Announcement Announcement Module
No announcement yet.
Process multiple messages at once Page Title Module
Move Remove Collapse
Conversation Detail Module
  • Filter
  • Time
  • Show
Clear All
new posts

  • Process multiple messages at once


    Is it possible to get more than one message at once from a message channel? If yes, which kind of channel provides this feature? Publish/subscribe or queue?


  • #2

    Let's start from sample:
    HTML Code:
    <task:executor id="taskExecutor" pool-size="5"/>
    <channel id="queueChannel">
    <channel id="executorChannel">
    	<dispatcher task-executor="taskExecutor"/>
    <bridge input-channel="queue" output-channel="executorChannel">
    	<poller fixed-rate="1000"/>
    <service-activator input-channel="executorChannel"/>
    1. Your messages are received into queueChannel and stored in the real Queue object! ;-)
    2. <bridge> polls this channel via PollingTask every sec. But in one pass bridge polls messages from queue until it receives null: one-by-one and handles each of them immediately into output-channel
    3. But the last one is an Executor Channel ( with dispatcher on ThreadPoolTaskExecutor
    4. In the end some <service-activator> is subscribed to "executorChannel".
    5. And thanks for "taskExecutor" your messages are processed in parallel.

    Is it what are you looking for?

    If not: to read from channel more than one message (Collection) will be some anti-pattern for Messaging Architecture. ;-)
    However you can use <aggregator> ( to get in the end a Collection of you Messages' payalods.

    Artem Bilan


    • #3
      Hi Artem,

      First, thanks for helping me.

      Originally posted by Cleric View Post
      5. And thanks for "taskExecutor" your messages are processed in parallel.
      When I receive a message, I need to execute an INSERT on a certain database. So, in the case there are many messages in the queue, I thought about receiving N messages at once, so I can use the batch functionality of JdbcTemplate.

      I understand from your solution that it's not the case, am I right (since my messages will be processed in parallel)? What I want is more "My groups of N messages will be processed in parallel".

      Originally posted by Cleric View Post
      However you can use <aggregator> ( to get in the end a Collection of you Messages' payalods.
      May you provide an example, please?
      I would like to process N messages at once (or less if there are not enough messages in the channel). I have no idea of which Correlation Strategy to use. Which correlation key should I use for a given message?



      • #4

        What I see here:
        HTML Code:
        <aggregator correlation-strategy-expression="headers.customCorrelationKey"
                         release-strategy-expression="size() == 100"/>
        1. correlation-strategy just reads some header from message, which you can simply to add on your own
        2. release-strategy just checks how how much messages are received by aggregator
        3. And in the end <aggregator> returns into output-channel a Collection of your messages' payloads.

        That's all

        So, you still should know how much messages you need to aggregate. It also can be some Message's header.


        • #5
          So, I'm force to use a second channel for the Message groups? Is there a way to directly pass the Message group to a service activator?


          • #6
            No, you can't!
            It's anti-pattern too ;-)
            MessageGroup is some internal container for specific logic and we don't expose any API to manipulate with it.
            It will be a bit confused.

            What's problem to manipulate just with List of payloads?


            • #7
              Can you please explain why do you want to get "more then one Message from the channel at once"? What is your use case?


              • #8

                Since processing one message involves database access, I wanted to improve performance using batch functionality of JdbcTemplate. If I get multiple messages at once from the message channel, I also can update my database for all these messages at once.


                I just wanted to know if I can directly pass the List of payloads to my service activator (I used the word MessageGroup because I understood that a MessageGroup was put in the output channel of the aggregator.


                • #9

                  if I can directly pass the List of payloads to my service activator
                  Shortly: yes, you can


                  • #10
                    Then its simple
                    All you need to do is batch up messages in groups and send them to a channel as a single Message - which is a case for aggregation.
                    I recently presented it in one of the webinars
                    And here is the direct link to the code


                    • #11
                      I've tried with the following configuration:

                      		id="udbInboundAdapter" channel="receiveChannel" lookup-host="false"
                      		local-address="${listeningInterface}" port="${listeningPort}"
                      		receive-buffer-size="${receiveBufferSize}" multicast="false"
                      		check-length="false" />
                      	<int:publish-subscribe-channel id="receiveChannel"
                      		task-executor="taskExecutor" apply-sequence="true"/>
                      	<int:chain id="myChain" input-channel="receiveChannel">
                      		<int:filter ref="myFilter" />
                      		<int:transformer ref="myTransformer" />
                      		<int:aggregator send-partial-result-on-expiry="false" />
                      		<int:service-activator ref="myService"
                      			method="processMessage" />
                      I can see that the incoming messages have automatically the same CORRELATION ID (am I right? or do I have to process the headers?) but it is never processed by the service activator and I don't get any exception.


                      • #12

                        1. How much subsribers does your <publish-subscribe-channel> have? Sequence Details (corelationId, sequenceSize & sequenceNumber) are applied for all messages which are sended to those subsribers. So, if you have only one "myChain" your 'sequenceSize' will be "1".
                        2. I see you have <filter>, so, <aggregator> can't produce reply because it may never complete the group on 'corelationId'
                        3. Also keep in mind: if your <transformer> return full Message you lose all incoming headers if you don't put them manualy via MessageBuilder.

                        Can you debug that all?



                        • #13
                          Here's the last configuration I have:

                          		id="udbInboundAdapter" channel="receiveChannel" lookup-host="false"
                          		local-address="${listeningInterface}" port="${listeningPort}"
                          		receive-buffer-size="${receiveBufferSize}" multicast="false"
                          		check-length="false" pool-size="${listeningThreads}" />
                          	<int:publish-subscribe-channel id="receiveChannel"
                          		apply-sequence="true" />
                          	<int:chain id="preprocessChain" input-channel="receiveChannel"
                          		<int:filter ref="myFilter" />
                          		<int:transformer ref="myTransformer" />
                          			<int:correlation-id overwrite="true" value="0" />
                          		<int:aggregator release-strategy="timeoutCountReleaseStrategy"
                          			release-strategy-method="canRelease" />
                          	<bean id="timeoutCountReleaseStrategy"
                          		<constructor-arg value="5" />
                          		<constructor-arg value="2000" />
                          	<int:publish-subscribe-channel id="aggregatedChannel"
                          		task-executor="taskExecutor" />
                                  <bean id="taskExecutor"
                          		<property name="corePoolSize" value="5" />
                          		<property name="maxPoolSize" value="10" />
                          		<property name="queueCapacity" value="1000" />
                          	<int:service-activator id="myServiceActivator1"
                          		input-channel="aggregatedChannel" ref="myService1" method="processMessage" />
                          	<int:service-activator id="myServiceActivator2"
                          		input-channel="aggregatedChannel" ref="myService2" method="processMessage" />
                          	<bean id="myService1" class="" />
                          	<bean id="myService2" class="" />
                          I get the following exception:

                          Exception in thread "UDP-Incoming-Msg-Handler" org.springframework.integration.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#6f50a8]
                          	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(
                          	at java.util.concurrent.ThreadPoolExecutor$
                          Caused by: java.lang.IllegalStateException: Failed to process message list
                          	at org.springframework.integration.aggregator.MethodInvokingMessageListProcessor.process(
                          	at org.springframework.integration.aggregator.MethodInvokingReleaseStrategy.canRelease(
                          	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(
                          	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(
                          	... 109 more
                          Caused by: org.springframework.expression.AccessException: Unable to access property 'payload' through getter
                          	at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(
                          	at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(
                          	at org.springframework.expression.spel.ast.MethodReference.getValueInternal(
                          	at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(
                          	at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(
                          	at org.springframework.expression.spel.standard.SpelExpression.getValue(
                          	at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(
                          	at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(
                          	at org.springframework.integration.util.MessagingMethodInvokerHelper.process(
                          	at org.springframework.integration.aggregator.MethodInvokingMessageListProcessor.process(
                          	... 112 more
                          Caused by: java.lang.reflect.InvocationTargetException
                          	at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
                          	at sun.reflect.DelegatingMethodAccessorImpl.invoke(
                          	at java.lang.reflect.Method.invoke(
                          	... 122 more
                          Caused by: java.lang.IllegalStateException: Invalid method parameter for payload: was expecting collection.
                          	at org.springframework.util.Assert.state(
                          	at org.springframework.integration.util.MessagingMethodInvokerHelper$ParametersWrapper.getPayload(
                          	... 126 more
                          I know that the issue is related to the use of the TimeoutCountSequenceSizeReleaseStrategy because if I use the default release strategy, I don't get the exception. It also seems that the canRelease is never entered (using my debugger).
                          Last edited by Miko95; Jun 17th, 2012, 08:39 AM.


                          • #14
                            Okay, I solved this issue by removing release-strategy-method="canRelease" from the aggregator. Do you think it's a bug? I've expected that release-strategy-method is the method that decides if the MessageGroup should be released or not, and in this case, this method is "canRelease".

                            Now, I get a different issue. My services are called only one for the first MessageGroup. After that, I keep receiving new messages but the services are never called. Using JMX, I can see that the sendCount of the receiveChannel increases since messages are received. But, the sendCount of the aggregatedChannel keeps the value one. What is wrong with my configuration?

                            What I'm trying to do is as follows:

                            -I receive messages from an external system that doesn't use Spring integration.
                            -Each incoming message is filtered and if it is an expected message, it is transformed.
                            -The header enricher allows me to set the Correlation ID to a common value for all the incoming messages, so I can tell the framework that all the incoming messages may potentially be put in the same MessageGroup.
                            -So, I use the aggregator with the default correlation strategy. Concerning the release strategy, even if all the messages potentially belong to the same MessageGroup, I don't want to wait more than a certain amount of time before releasing. So, I use the Timeout+Count release strategy.

                            Since the apply-sequence flag of the channel named "receiveChannel" is set to true, I also have the sequenceNumber and sequenceSize of each incoming message set to 1. I think that it is causing the issue but I don't know why.
                            Last edited by Miko95; Jun 17th, 2012, 10:05 AM.


                            • #15

                              removing release-strategy-method="canRelease" from the aggregator. Do you think it's a bug?
                              No. It's expecteed behavior. 'ReleaseStrategy' and many other strategies in the framework should follow with some rules:
                              1. Or they implements some specific strategy interface (e.g. TimeoutCountSequenceSizeReleaseStrategy), then you can refer to them in the config without 'method'
                              2. Or 'strategy-method' should have some specific annotation (e.g. org.springframework.integration.annotation.Release Strategy)
                              3. Or you should just follow with 'method-invocation' pattern: when you refer to you Bean and mention his method.

                              I think that it is causing the issue but I don't know why.
                              I've asked you before: is there any reason to use here <publish-subscribe-channel>, specially apply-sequence="true"?
                              How your message flow feels without the last one?