Getting original message back after service output

  • Getting original message back after service output

    Hi guys,

    first of all, great work with SI, it's excellent!

    I've found myself in a bit of a problem. In a related post http://forum.springsource.org/showth...p-batch-engine I'm trying to figure out a way to get Spring Batch and SI running together, and it's going quite well.

    What I'm unsure of is how to invoke services without making them the final endpoint or changing the payload.

    The flow:

    1) I have the original message from RabbitMQ
    2) transforming it into a Properties object for ease of handling
    3) downloading a file with a service using a property
    4) launching a job (with job parameters from the Properties object) and do stuff with the file
    5) add responses on a Queue channel, send files back

    However, the problem seems to be that I can't fire up a service that returns a File object (which I'm not using) while retaining the original message for the next service in line, which needs it.

    Is there any way of doing that? (I tried the claim check, but it seems to remove the message, and I need it persisted for later use.) Or should I just go with a custom solution?

    thanks,
    pejot

  • #2
    There are a number of ways to do it - <recipient-list-router/>, or <publish-subscribe-channel/>; either one will call subscribed services in turn (with the same message).
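    A minimal sketch of the publish-subscribe approach (the channel name and the second service ref are placeholders, not from this thread):

    Code:
    <!-- both subscribed endpoints receive the same message; with the default
         dispatcher they are invoked one after the other, in subscription order -->
    <int:publish-subscribe-channel id="fanout.channel" />

    <int:service-activator input-channel="fanout.channel"
            ref="file.download.service" output-channel="nullChannel" />

    <int:service-activator input-channel="fanout.channel"
            ref="next.service" output-channel="job.responses.channel" />

    Sending the downloader's reply to nullChannel discards the File it returns, while the second subscriber still sees the original message.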

    Or, promote the original file to a header (with a <header-enricher/>). There is a standard header often used for that (FileHeaders.ORIGINAL_FILE). Then, downstream in your flow, simply add a transformer (<int:transformer ... expression="headers['file_originalFile']" />) to restore the original file into the payload.
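    A rough sketch of the header approach (the header and channel names are illustrative; since the payload here comes from AMQP rather than a file adapter, a plain header name works just as well as FileHeaders.ORIGINAL_FILE):

    Code:
    <!-- stash the current payload in a header before the service replaces it -->
    <int:header-enricher input-channel="pre.download.channel" output-channel="download.channel">
        <int:header name="originalPayload" expression="payload" />
    </int:header-enricher>

    <int:service-activator input-channel="download.channel"
            ref="file.download.service" output-channel="post.download.channel" />

    <!-- restore the stashed payload for the next service in line -->
    <int:transformer input-channel="post.download.channel" output-channel="next.channel"
            expression="headers['originalPayload']" />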



    • #3
      Thanks very much for your answer Gary!

      Didn't occur to me, will try that!



      • #4
        Hmm, one thing though:

        I'd like the files to be downloaded in parallel, but the second service should not start before the first one (for that particular file) finishes.

        Is there a way of achieving both blocking until the download is done, and parallelism?

        Thanks,



        • #5
          As long as you use DirectChannels between the components, they will be invoked serially.

          To handle multiple requests in parallel, increase the concurrency on the inbound amqp adapter.
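          Put together, the idea looks roughly like this (attribute values are illustrative):

          Code:
          <!-- a DirectChannel (the default <int:channel/>) keeps the whole flow on the
               consumer thread, so the download finishes before the next service runs -->
          <int:channel id="input.channel" />

          <!-- parallelism comes from the listener container: one in-flight flow per consumer thread -->
          <int-amqp:inbound-channel-adapter queue-names="doc" channel="input.channel"
                  connection-factory="rabbitConnFactory" concurrent-consumers="10" />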



          • #6
            So assigning a TaskExecutor and changing the concurrent-consumers param on int-amqp:inbound-channel-adapter will pretty much make several concurrent tasks happen?



            • #7
              Yes, but you don't need an external task executor (unless you want more control); the container will use a SimpleAsyncTaskExecutor by default.



              • #8
                Awesome, will try that out. Thanks a lot Gary!



                • #9
                  It seems to be working brilliantly, except for one thing.

                  The input adapter gets concurrent tasks running:

                  Code:
                  <int-amqp:inbound-channel-adapter
                          queue-names="doc" channel="input.channel" connection-factory="rabbitConnFactory"
                          concurrent-consumers="20" task-executor="taskExecutor" tx-size="1" />
                  And the chains produce what they are supposed to

                  Code:
                  <si:chain id="downloader.chain" input-channel="input.channel"
                          output-channel="downloader.output.channel">
                      <si:transformer>
                          <bean class="com.example.converter.message.RabbitMessageTransformer" />
                      </si:transformer>

                      <si:header-enricher>
                          <si:header name="file_id" expression="payload['file_id']" />
                          <si:header name="file_org_download_url" expression="payload['file_org_download_url']" />
                          <si:header name="file_conv_upload_url" expression="payload['file_conv_upload_url']" />
                      </si:header-enricher>

                      <si:service-activator ref="file.download.service" />
                  </si:chain>

                  <si:chain id="batch.doc.chain" input-channel="downloader.output.channel"
                          output-channel="job.responses.channel">
                      <si:transformer expression="headers['file_id']" />
                      <si:transformer>
                          <bean class="com.example.converter.message.PropertiesToJobLaunchRequestTransformer">
                              <property name="job" ref="simpleJob" />
                          </bean>
                      </si:transformer>
                      <si:service-activator ref="simple.document.service" />
                  </si:chain>
                  But at the end,

                  Code:
                  <int:channel id="job.responses.channel">
                      <int:queue />
                      <int:interceptors>
                          <int:wire-tap channel="input.logging.channel" />
                      </int:interceptors>
                  </int:channel>
                  When I look in RabbitMQ's management panel, there are several unacknowledged messages, and it won't process any more. How do I acknowledge the messages after processing?



                  • #10
                    The ack should (will) happen when the thread returns to the container (when the flow is complete).

                    Take a thread dump (jstack or VisualVM) to see where the container's threads are.



                    • #11
                      OK, I figured it out.

                      I used the same TaskExecutor both for the int-amqp:inbound-channel-adapter handling incoming job requests and for the tasklet adapter itself. That wouldn't be a problem on its own, but I set the queue capacity on the TaskExecutor to 0 to prevent waiting jobs from building up (that's what the MQ is for), so when the job wanted to run and the thread pool was exhausted, it probably rejected the incoming calls (and no rejection policy had been set). Do I understand that right?
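                      If that is the cause, one way to sketch a fix is a dedicated executor with an explicit rejection policy (pool sizes here are made up):

                      Code:
                      <!-- ABORT (the default) throws TaskRejectedException when the queue is full;
                           CALLER_RUNS makes the submitting thread execute the task itself instead -->
                      <task:executor id="taskExecutor" pool-size="5-20"
                              queue-capacity="0" rejection-policy="CALLER_RUNS" />

                      Alternatively, keep two separate executors so the job threads can never starve the adapter's consumer threads.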



                      • #12
                        It's probably best for you to look at DEBUG logs to see what happened. I can't speculate.
