Announcement Announcement Module
Collapse
No announcement yet.
Nested Splitters/Aggregators, correlationIds, and a NullPointerException Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Nested Splitters/Aggregators, correlationIds, and a NullPointerException

    I have the following setup using Spring Integration 2.0.0.RELEASE: service activator (indexerWorker) -> gateway (indexService) -> splitter (jobToItemsSplitter) -> transformer (itemToDocumentTransformer) -> aggregator (documentsForItemAggregator) -> splitter (documentToSolrCoreSplitter) -> router (coreRouter) -> service activator (solrWriter) -> aggregator (indexedDocumentsToIndexedItemResultAggregator) -> (response channel back to first service gateway).

    I'm noticing that the gateway is creating a message id, which the jobToItemsSplitter is using as the correlationId in the message headers it sends out. The interesting thing is that the documentForItemAggregator leaves the correlationId in the header of the message it sends once it has gathered together the documents for an item. The problem arises the moment the documentToSolrCoreSplitter creates document bundles from the set sent from documentForItemAggregator. Since the correlationId has been left by the aggregator, the code around the documentToSolrCoreSplitter, creates a sequence details using the correlationId and a null for the sequence number and the sequence size. This will finally cause the code around indexerWorker (specifially MessageBuilder.popSequenceDetails) to error with a NullPointerException because the sequence number and total numbers are null.

    Here's an example of the logs:
    Code:
    Message from Gateway (indexService):
    2010/12/15 14:34:17,496 [jobItemExecutor-1] INFO  org.springframework.integration.handler.LoggingHandler  - [Payload=111][Headers={timestamp=1292445257495, id=804b86fa-c8c5-4b0e-b5a9-6df67500f025, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@4e9b7d, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@4e9b7d}]
    
    Message from Splitter (jobToItemsSplitter):
    2010/12/15 14:34:17,780 [jobItemExecutor-1] INFO  org.springframework.integration.handler.LoggingHandler  - [Payload=com.peapod.indexer.worker.IndexedProduct@96ac47][Headers={timestamp=1292445257780, id=a51c6f75-1237-47f1-9a7c-c52cae1486e2, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@4e9b7d, correlationId=804b86fa-c8c5-4b0e-b5a9-6df67500f025, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@4e9b7d, sequenceSize=900, sequenceNumber=1}]
    
    Message from Aggregator (documentForItemAggregator):
    2010/12/15 15:04:12,148 [docGeneratorExecutor-59] INFO  org.springframework.integration.handler.LoggingHandler  - [Payload=com.peapod.indexer.worker.IndexedProducts@17533df][Headers={timestamp=1292445567519, id=5bdc5a42-d251-4559-a5f2-fcfa30d9b86d, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@4e9b7d, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@4e9b7d, correlationId=804b86fa-c8c5-4b0e-b5a9-6df67500f025}]
    
    Message from Splitter (documentToSolrCoreSplitter):
    2010/12/15 15:04:14,038 [docGeneratorExecutor-59] INFO  org.springframework.integration.handler.LoggingHandler  - [Payload=com.peapod.indexer.worker.IndexedProductBunch@86558][Headers={timestamp=1292447054038, id=9e99e91a-f75c-4972-9aa9-896a8613a1a5, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@4e9b7d, correlationId=5bdc5a42-d251-4559-a5f2-fcfa30d9b86d, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@4e9b7d, sequenceSize=1, sequenceDetails=[[804b86fa-c8c5-4b0e-b5a9-6df67500f025, null, null]], sequenceNumber=1}]
    Now the (indexedDocumentsToIndexedItemResultAggregator) will use the correlation id sequenceSize and sequenceNumber fields out of the header, leaving the sequenceDetails in place. And here's the stack trace when the final response is sent back to the gateway and on to the service activator.

    Code:
    Exception in thread "main" org.springframework.integration.MessageHandlingException: 
    	at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:76)
    	at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:64)
    ....
    Caused by: java.lang.NullPointerException
    	at org.springframework.integration.support.MessageBuilder.popSequenceDetails(MessageBuilder.java:210)
    	at org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor.processMessageGroup(AbstractAggregatingMessageGroupProcessor.java:53)
    	at org.springframework.integration.aggregator.CorrelatingMessageHandler.completeGroup(CorrelatingMessageHandler.java:306)
    	at org.springframework.integration.aggregator.CorrelatingMessageHandler.handleMessageInternal(CorrelatingMessageHandler.java:179)
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:44)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
    	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:176)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:160)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:125)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:119)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:101)
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:44)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
    	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    	at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:220)
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.access$000(UnicastingDispatcher.java:51)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:92)
    	... 4 more
    So my questions are:

    1. Should the code around the documentsForItemAggregator set the id of the message it sends to the correlationId used in the messages it received?

    2. If the answer to question one is no, should the code around the documentToSolrCoreSplitter be checking to see if the sequenceNumber and sequenceSize are null prior to setting them in the sequenceDetails, and should it set the values to 1?

    3. Could the code around the documentToSolrCoreSplitter simply check for the absence of a sequenceNumber and sequenceSize in the header of the message it receives and then simply use the correlationId (in the header of the received message) as the correlationId in the set of messages it sends out?

    My apologies if the phrase "the code around" is too vague, it just means the Spring Integration framework code. I would like to know the best approach to fixing this because I'll need to add it myself until a proper fix can be released. (That's assuming that my use of the integration framework is not in error.)
    Last edited by j.c.watts; Dec 15th, 2010, 05:02 PM.

  • #2
    You could try adding a <header-filter/> to the flow after the documentForItemAggregator to remove the correlation id.

    Might be a quick work-around.

    Comment


    • #3
      That's an interesting idea, but would I be losing the context (original message id) that would enable the gateway/activator to match up the response with the request?

      If I'm understanding how the integration framework works, a gateway creates a header with a message id. That original id will be used as the correlationId by any other component in pipeline, when it's passing the message along. The exception to this, that I see, is in the aggregator/splitter, which uses the sequenceDetails as a kind of stack to preserve the correlationId and allow nested splitters and aggregators. If I tossed the correlationId out after the documentForItemAggregator, I don't know how the final message sent back to the gateway/activator would be matched with the original request.

      That said, I have only been using the framework for about a couple of weeks, and I might be missing something.

      Comment


      • #4
        You don't have to worry about that, the end of the flow sends the result to the replyChannel header which is how it gets back to the gateway...

        Code:
        replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@4e9b7d
        If you want to retain the original message id for your own purposes (logging etc), you can use a <header-enricher/> to add a custom header.

        Comment


        • #5
          BTW, just re-read your original post - your final aggregator doesn't need an output channel; the replyChannel header will be used.

          Comment


          • #6
            BTW #2.

            Removing the header is just a work-around; I do think there's a bug here; we should certainly not throw an NPE.

            https://jira.springsource.org/browse/INT-1690
            Last edited by Gary Russell; Dec 15th, 2010, 05:55 PM.

            Comment


            • #7
              Originally posted by Gary Russell View Post
              You don't have to worry about that, the end of the flow sends the result to the replyChannel header which is how it gets back to the gateway...

              Code:
              replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@4e9b7d
              If you want to retain the original message id for your own purposes (logging etc), you can use a <header-enricher/> to add a custom header.
              That makes sense. I got so focused on following the correlationId that I missed the replyChannel. Thanks for the quick replies.

              Comment

              Working...
              X