Announcement Announcement Module
Collapse
No announcement yet.
Message ordering when using poller with multiple consumers Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Message ordering when using poller with multiple consumers

    Hi,
    I have a question about the message ordering when pooling consumers are used.
    This is an example:
    Code:
    <service-activator input-channel="testChannel" ref="orderingHandler"
    		method="handle">
    		<poller task-executor="testPool" max-messages-per-poll="1">
    			<interval-trigger interval="0" />
    		</poller>
    	</service-activator>
    	<beans:bean id="orderingHandler" class="OrderingHandler" />
    	<thread-pool-task-executor id="testPool"
    		max-size="4" />
    In this example, we send 2 messages (supposing must be processed according to the reception order and this occurs) that are handled at the same time but in this case through 2 different threads. The problem is that the handler of the second message may finish first and therefore the order of the whole processing is altered.

    Is it possible to send the 2 messages (of the same group) and handle them one after the other in the same thread despite using multiple consumers?

    Thanks,

  • #2
    I don't know if it's relevant to this particular use-case (depends on where the sequence order needs to be maintained), but you might want to have a look at the Resequencer component: http://static.springframework.org/sp...ml#resequencer

    Comment


    • #3
      Resequencer problem

      Hi,
      I tried to use a Sequencer but it doesn’t work.
      I created a transformer to add the correlation id between the gateway and the sequencer. It seems that all messages are discarded and the payload sent to the discardHandler is different, an Integer with the value = 0.
      Here is the configuration:
      Code:
      	<channel id="channel1">
      		<queue capacity="5" />
      	</channel>
      	<gateway id="testGateway"
      		service-interface="es.dtg.nm.process.ordering.resequencer.TestGateway"
      		default-request-channel="channel1" />
      
      	<poller id="poller" default="true" max-messages-per-poll="10">
      		<interval-trigger interval="1000" />
      	</poller>
      
      	<transformer ref="testTransformer" method="transform"
      		input-channel="channel1" output-channel="channel2"></transformer>
      	<beans:bean id="testTransformer"
      		class="es.dtg.nm.process.ordering.resequencer.TestTransformer" />
      
      	<channel id="channel2">
      		<queue capacity="5" />
      	</channel>
      	<resequencer id="resequencer" input-channel="channel2"
      		output-channel="channel3" discard-channel="discardChannel"
      		release-partial-sequences="false" timeout="4200"
      		send-partial-result-on-timeout="false" reaper-interval="135"
      		tracked-correlation-id-capacity="99" send-timeout="9999" />
      
      	<channel id="discardChannel">
      		<queue capacity="5" />
      	</channel>
      	<service-activator input-channel="discardChannel" ref="discardHandler"
      		method="handle"/>
      	<beans:bean id="discardHandler"
      		class="es.dtg.nm.process.ordering.resequencer.DiscardHandler" />
      
      	<channel id="channel3">
      		<queue capacity="5" />
      	</channel>
      	<service-activator input-channel="channel3" ref="testHandler"
      		method="handle">
      		<!--
      			<poller task-executor="testPool" max-messages-per-poll="8">
      			<interval-trigger interval="0" /> </poller>
      		-->
      	</service-activator>
      	<beans:bean id="testHandler"
      		class="es.dtg.nm.process.ordering.resequencer.TestHandler" />
      	<thread-pool-task-executor id="testPool"
      		max-size="4" />
      Thanks,

      Comment


      • #4
        Can you post the TestTransformer code?

        Comment


        • #5
          Here is the TestTransformer class code:
          Code:
          public class TestTransformer {
          
          	public Message transform(Message msg){
          		System.out.println("input: " + msg);
          		Message result = MessageBuilder.withPayload(msg.getPayload()).setCorrelationId(msg.getPayload().toString()).build();
          		System.out.println("transformed: " + result);
          		return result;
          	}
          }
          I do setCorrelationId(payload). The payload are String values: "1", "2".

          Comment


          • #6
            Okay. First, you need to provide a common 'correlationId' for all Messages within a sequence. The Resequencer uses that to determine what message sequence the current message belongs to. Then, you need to set the 'sequenceNumber' and 'sequenceSize' headers as well. The Resequencer uses 'sequenceNumber' to determine whether the Message can be released (if the Messages with preceding sequenceNumbers have already been released). It uses the 'sequenceSize' to recognize when the sequence is complete. For example, you should have something like this:
            Code:
            Message result = MessageBuilder.withPayload(msg.getPayload())
                    .setCorrelationId(groupId)
                    .setSequenceNumber(currentIndex)
                    .setSequenceSize(groupSize)
                    .build();

            Comment


            • #7
              Thank you for replaying. Ok, I assigned all sequence headers but it still not working correctly. I attached a zip file with the example code.
              It discard one message with payload = 0 (not sent by me) and the first message sent. Moreover, the handler of the resequencer output channel doesn’t receive any message.
              I have another question, do the resequencer support both batch and stream resequencing using only “release-partial-sequences” parameter?
              Thanks,

              Comment


              • #8
                Hicyo,

                The message with payload 0 has been fixed as part of INT-602.

                The other problem is caused by a restriction in the Resequencer (explanation added to the javadoc as of last night).

                In order to be able to assess that a sequence has been processed correctly, the resequencer assumes that the incoming messages have sequence numbers starting with 1 up to sequenceSize. In your case, one of the messages has been rejected because its sequence number was larger than the sequence size. According to the same rule, the first message should have been rejected too (and wasn't), something that has been now made consistent. I added INT-603 to allow for customizing the ordering strategy as well as relaxing the rule.

                In your case, for getting the expected result, just get the latest version and increase the sequence size to 92 in the Transformer.

                Cheers,
                Marius

                Comment


                • #9
                  run 'org.springframework.integration.samples' jms problem

                  Hi-
                  i am trying to learn more about spring integration jms
                  within the 'org.springframework.integration.samples' module.

                  When I run the ChannelAdapterDemo or GatewayDemo, I got this warning:
                  Is my eclipse environment setting problem?
                  many thanks


                  WARNING: Ignored XML validation warning
                  org.xml.sax.SAXParseException: schema_reference.4: Failed to read schema document '.springframework.org/schema/integration/stream/spring-integration-stream-1.0.xsd', because 1) could not find the document; 2) the document could not be read; 3) the root element of the document is not <xsd:schema>.
                  at com.sun.org.apache.xerces.internal.util.ErrorHandl erWrapper.createSAXParseException(Unknown Source)

                  Comment


                  • #10
                    My warning is resolved by adding a proxy in vm arg for runtime

                    Comment

                    Working...
                    X