Announcement Announcement Module
Collapse
No announcement yet.
Resequencer discarding messages to discard-channel Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Resequencer discarding messages to discard-channel

    I have a scenario where messages are resequenced based on simple sequence number and then sent to an aggregator which has a simple aggregation strategy of appending payloads. I need to sequence the incoming packets correctly to ensure sanity in the aggregated payload.

    I am noticing that the resequencer is randomly (it works sometimes) sending messages to the discard-channel which are eventually sent to the DLQ for lack of handling. I have made no configuration to set an explicit timeout anywhere. Here is my configuration in short. Why are messages being sent to discard even though the sequencing is not complete ?

    <resequencer input-channel = "aggregatorInputChannel"
    discard-channel = "resequencerDiscardChannel"
    output-channel = "reseqAggreChannel"
    release-partial-sequences = "true"
    send-partial-result-on-expiry = "false"
    />
    <aggregator input-channel = "reseqAggreChannel"
    output-channel = "decryptionChannel"
    discard-channel = "aggregatorDiscardChannel"
    send-partial-result-on-expiry = "false"
    ref = "messageAggregatorBean" method = "aggregate"
    expire-groups-upon-completion = "true">
    </aggregator>



    <!-- The aggregate() method in the PacketAggregator aggregates the messages -->
    <beans:bean id = "messageAggregatorBean" class = "com.auric.qrev.integration.aggregator.PacketAggre gator" />

    <!-- JMS backed channels. These need to be JDBC backed channels -->
    <int-jms:channel id = "aggregatorDiscardChannel" queue-name="aggregatorDiscardChannel"
    connection-factory = "connectionFactory" />
    <int-jms:channel id = "resequencerDiscardChannel" queue-name="resequencerDiscardChannel"
    connection-factory = "connectionFactory" />
    <int-jms:channel id = "reseqAggreChannel" queue-name="reseqAggreChannel"
    connection-factory = "connectionFactory" />


    I have made sure that the CORRELATION_ID, SEQUENCE_NUMBER and SEQUENCE_SIZE are correct as seen below from some logging statements.

    23:17:40,419 DEBUG .integration.handler.MessageHandlerChain: 67 - org.springframework.integration.handler.MessageHan dlerChain#0 received message: [Payload=2!3.0!1*02216601!3.0!1*02232101!2.0!1*0225 5601!2.0!1*02097601!2.0!1*02104101!2.0!1][Headers={timestamp=1349372860403, id=99916ce4-1375-4908-a9d2-412275a73eed, customerId=919952028650, correlationId=9199520286504, expirationDate=1349459260403, sequenceSize=3, sequenceNumber=3}]
    23:17:40,419 DEBUG on.aggregator.ResequencingMessageHandler: 67 - org.springframework.integration.aggregator.Reseque ncingMessageHandler#dd1e765 received message: [Payload=2!3.0!1*02216601!3.0!1*02232101!2.0!1*0225 5601!2.0!1*02097601!2.0!1*02104101!2.0!1][Headers={timestamp=1349372860403, id=99916ce4-1375-4908-a9d2-412275a73eed, customerId=919952028650, correlationId=9199520286504, expirationDate=1349459260403, sequenceSize=3, sequenceNumber=3}]
    23:17:40,419 DEBUG egator.AbstractCorrelatingMessageHandler: 177 - Handling message with correlationKey [9199520286504]: [Payload=2!3.0!1*02216601!3.0!1*02232101!2.0!1*0225 5601!2.0!1*02097601!2.0!1*02104101!2.0!1][Headers={timestamp=1349372860403, id=99916ce4-1375-4908-a9d2-412275a73eed, customerId=919952028650, correlationId=9199520286504, expirationDate=1349459260403, sequenceSize=3, sequenceNumber=3}]
    23:17:40,420 DEBUG org.apache.activemq.ActiveMQSession: 558 - ID:libra-55976-1349369389207-0:12:6 Transaction Commit :TX:ID:libra-55976-1349369389207-0:12:18
    23:17:40,420 DEBUG org.apache.activemq.TransactionContext: 290 - Commit: TX:ID:libra-55976-1349369389207-0:12:18 syncCount: 1

    And the others are simply with the needed changes in the sequenceNumber and sequenceSize.
    Any clues why the resequencer is behaving like this. I had originally chained the resequencer and aggregator.

  • #2
    I can't see how this could happen. I mean I can't see how your configuration can send messages to a discard-channel. The 'discard-channel' only plays a role if 'send-partial-result-on-expiry' is set to 'true' and in your case its 'false'. Further more, 'send-partial-result-on-expiry' is meaningless unless you have a MessageGroupStoreReaper configured or expiring MessageGrops manually (e.g., JMX)

    Can you try to isolate the problem into a reproducible piece of code (e.g., test case) so we can take a look?

    Comment


    • #3
      Resequencer discarding messages to discard-channel

      I am working on isolating the issue into a simple test case. A quick question before that

      Under what circumstances besides the ones you have mentioned above does the discard-channel for resequencer come into the picture. Quite some time ago, I had a posted a question for aggregator

      http://forum.springsource.org/showth...s&goto=newpost

      There is no "expire-groups-upon-completion" for resequencer. How then does it react to messages coming in with the same correlation IDs ? I am definitely seeing movement of messages onto the discard channel. Is there a mechanism for me to tap into and determine the reason why exceptions are thrown, causing the transaction to roll back and eventually the message being sent to resequencer-discard and finally to DLQ. Also, by default I have kept the Expiry of the message to be ONE day after the creation time. I assume, that after the messages are consumed by a consumer, identical correlation IDs wouldnt matter, right ?

      Would this kind of configuration cause issues (see Below) ?

      <chain input-channel = "aggregatorInputChannel" output-channel = "decryptionChannel"
      send-timeout = "60000">

      <resequencer discard-channel = "resequencerDiscardChannel"
      release-partial-sequences = "true"
      send-partial-result-on-expiry = "false"
      />
      <aggregator discard-channel = "aggregatorDiscardChannel"
      send-partial-result-on-expiry = "false"
      ref = "messageAggregatorBean" method = "aggregate"
      expire-groups-upon-completion = "true">
      </aggregator>

      </chain>

      Comment


      • #4
        I can't think of any scenario where it would come into play since Messages are sent to a discard channel *only* if MessageGroup is complete. In the Aggregator case MessageGroup will be marked as complete as soon as MessageGroup is released. However, for the Resequencer there is no code that will mark it complete ever. So the only way for Messages to ever go to the discard channel in the case of Resequencer IF some other process completed the MessageGroup (e.g., JMX)

        Comment

        Working...
        X