  • Thread un-safety in jms:outbound-gateway with reply listener?

    ***My apologies for the double-post. I did not notice the JMS sub-forum at first***

    Hello:

    I've been using SI for a while now with great success, but recently heavier loading has exposed a problem in my JMS outbound-gateway with a reply listener. The problem seems to occur when the loading causes more than one thread to overlap on invoking the gateway.

    I have set up a unit test where I beat on this gateway with multiple threads. In this test the request/reply payload and the service activator are both stubs of no consequence. If I serialize the messaging, everything works perfectly. If I allow the threads to hit the gateway in parallel, things go south very quickly. The problem manifests as replies that are not correlated correctly or are otherwise discarded (they are de-queued), and then reply timeouts occur.

    I am still trying to debug through it, but between the multithreading and the asynchronous behaviors it is a beast to follow. Any help you could offer would be greatly appreciated.

    Below is the relevant config. Does anything jump out at you? Please note that it works flawlessly outside of multithreading, so we can skip the basic invalid configuration questions.

    Code:
            <!-- The outbound gateway bean -->
    	<int:gateway id="asyncActionRequestGateway"
    		service-interface="xxx">
    		<int:method name="executeAsyncAction" request-channel="asyncActionRequestChannel" />
    	</int:gateway>
    
            <!-- The request channel -->
            <!-- 'integrationListenerTaskExecutor' is prototype-scoped -->
    	<int:channel id="asyncActionRequestChannel">
    		<int:dispatcher task-executor="integrationListenerTaskExecutor"/>
    	</int:channel>
    
    
            <!-- The reply channel -->
            <!-- 'integrationListenerTaskExecutor' is prototype scoped -->
    	<int:channel id="asyncActionReplyChannel">
    		<int:dispatcher task-executor="integrationListenerTaskExecutor"/>
    	</int:channel>
    	
    
            <!-- The outbound JMS rigging -->
            <!-- ${integration.listener.threads.maximum} == 'integrationListenerTaskExecutor'.corePoolSize -->
    	<int-jms:outbound-gateway request-channel="asyncActionRequestChannel"
    		request-destination-name="${jms.destination.name.asyncactionrequest}"
    		reply-channel="asyncActionReplyChannel" reply-destination-name="${jms.destination.name.asyncactionreply}"
    		message-converter="asyncActionMessageConverter" header-mapper="asyncActionMessageConverter" 
    		reply-timeout="${bpm.actionservice.reply.timeout}" receive-timeout="${bpm.actionservice.receive.timeout}">
    		<int-jms:reply-listener concurrent-consumers="${integration.listener.threads.maximum}" />
    	</int-jms:outbound-gateway>
    
    
            <!-- The inbound JMS rigging -->
            <!-- ${integration.listener.threads.maximum} == 'integrationListenerTaskExecutor'.corePoolSize -->
    	<int-jms:inbound-gateway request-channel="asyncActionRequestChannel"
    		request-destination-name="${jms.destination.name.asyncactionrequest}" 
    		reply-channel="asyncActionReplyChannel" selector="${jms.asyncaction.selector.expression}"
    		concurrent-consumers="${integration.listener.threads.maximum}"
    		reply-timeout="${bpm.actionservice.reply.timeout}" 
    		receive-timeout="${bpm.actionservice.receive.timeout}"
    		request-timeout="${bpm.actionservice.request.timeout}" />

  • #2
    Please provide your settings for all placeholders, especially ${bpm.actionservice.receive.timeout} and ${bpm.actionservice.reply.timeout}.

    We have extensive multi-threaded tests; they have failed under load from time to time, but it usually came down to the receive-timeout on the outbound gateway being too small when the build server was busy doing other things. In that environment, the sending thread timed out, and the reply could then not be correlated because there was no longer an entry in the 'replies' map (which maps the correlationId to a blocking queue on which the sending thread waits for the reply).

    On the inbound side, the reply-timeout is also important because the gateway gives up when that is exceeded, and no reply will be sent to the outbound gateway.

    Both these timeouts need to be sufficiently large to avoid these issues.
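
    Roughly speaking, the correlation mechanism looks like this (just a sketch to illustrate the description above, not the actual JmsOutboundGateway code):

    Code:
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    import javax.jms.Message;

    // Illustration only -- not the real gateway code.
    public class ReplyCorrelationSketch {

        private final Map<String, LinkedBlockingQueue<Message>> replies =
                new ConcurrentHashMap<String, LinkedBlockingQueue<Message>>();

        // Called on the sending thread after the request is sent.
        public Message awaitReply(String correlationId, long receiveTimeout) throws InterruptedException {
            LinkedBlockingQueue<Message> queue = new LinkedBlockingQueue<Message>(1);
            this.replies.put(correlationId, queue);
            try {
                // If this times out, the entry is removed below and a late reply
                // can no longer be correlated -- it is effectively discarded.
                return queue.poll(receiveTimeout, TimeUnit.MILLISECONDS);
            }
            finally {
                this.replies.remove(correlationId);
            }
        }

        // Called on the reply-listener thread when a reply arrives.
        public void onReply(String correlationId, Message reply) {
            LinkedBlockingQueue<Message> queue = this.replies.get(correlationId);
            if (queue != null) {
                queue.offer(reply);
            }
            // else: "received early or late" -- nobody is waiting for this reply any more
        }
    }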

    You said you were seeing replies that could not be correlated, so the first condition is the most likely cause.

    The "easiest" way to debug these issues is to run with debug logging; look for

    Code:
    logger.debug("Reply for correlationId " + correlationId + " received early or late");
    log entries, and then find the other log entries for that correlationId.

    You will likely find

    Code:
    logger.debug(this.getComponentName() + " Timed out waiting for reply with CorrelationId " + correlationId);
    earlier in the log. It's much easier to debug if you include timestamps (milliseconds) and threads (%t) in the debug logs.

    If you see something else, please attach a zip with the log.



    • #3
      BTW, I noticed you have the same placeholder for the reply-timeout on the in and out gateways.

      These timeouts are quite different; the reply-timeout is rarely important on the outbound gateway because it is only used if the reply channel can block. In your case, that would require a RejectedExecutionHandler that blocks when the task executor has no thread available to run the task immediately.
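
      For illustration, such a "blocking" rejection handler would look something like this (a hypothetical sketch, not something from your configuration):

      Code:
      import java.util.concurrent.RejectedExecutionException;
      import java.util.concurrent.RejectedExecutionHandler;
      import java.util.concurrent.ThreadPoolExecutor;

      // Hypothetical sketch -- a handler that blocks the submitting thread until the
      // executor's queue has room; only in a setup like this would the outbound
      // gateway's reply-timeout on an executor-backed reply channel come into play.
      public class BlockingRejectedExecutionHandler implements RejectedExecutionHandler {

          public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
              try {
                  // put() blocks until space is available in the work queue
                  executor.getQueue().put(task);
              }
              catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                  throw new RejectedExecutionException("Interrupted while waiting to enqueue task", e);
              }
          }
      }
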
      Last edited by Gary Russell; Jun 8th, 2013, 07:27 AM.



      • #4
        I just looked at the code again, and I think I've found a bug, when using the default correlation technique.

        Can you try setting
        Code:
        correlation-key="JMSCorrelationID"
        on the outbound gateway?

        That should eliminate the issue; in the meantime I'll see if I can create a test to prove out what I have seen.

        Thanks.



        • #5
          After further review, what I thought might be an issue is not one; the correlation will work with a container regardless of the correlation-key setting (but see below).

          After looking at your configuration more carefully, you have a specific reply destination, but no correlation-key.

          When using default correlation (no correlation-key), you cannot use the same reply queue across multiple gateway instances because we have no way to put a message selector on the listener container. Since we have no way to determine whether the gateway is the sole consumer, we disable the listener container in this case and emit a warning...

          Code:
          if (logger.isWarnEnabled()) {
          	logger.warn("The gateway cannot use a reply listener container with a specified destination(Name/Expression) " +
          			"without a 'correlation-key'; " +
          			"a container will NOT be used; " +
          			"to avoid this problem, set the 'correlation-key' attribute; " +
          			"some consumers, including the Spring Integration <jms:inbound-gateway/>, " +
          			"support the use of the value 'JMSCorrelationID' " +
          			"for this purpose. Alternatively, do not specify a reply destination " +
          			"and a temporary queue will be used for replies.");
          }
          So your situation continues to intrigue me, because the gateway will fall back to creating a consumer per message, with a message selector specific to that message id.

          So I am back to my original hypothesis that the receiveTimeout is too small, but even with that fixed, you'd gain some benefit by using a correlation-key (e.g. JMSCorrelationID).
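
          In case it helps to visualize the fallback, it is essentially the standard JMS consumer-per-message pattern, roughly like this (a sketch of the pattern only, not the actual gateway code):

          Code:
          import javax.jms.Destination;
          import javax.jms.JMSException;
          import javax.jms.Message;
          import javax.jms.MessageConsumer;
          import javax.jms.Session;

          // Sketch of the consumer-per-message fallback described above (illustration only).
          public class PerMessageReplyReceiver {

              public Message receiveReply(Session session, Destination replyQueue,
                      String requestMessageId, long receiveTimeout) throws JMSException {
                  // A selector ties this consumer to the single expected reply.
                  String selector = "JMSCorrelationID = '" + requestMessageId + "'";
                  MessageConsumer consumer = session.createConsumer(replyQueue, selector);
                  try {
                      return consumer.receive(receiveTimeout); // null if the timeout elapses
                  }
                  finally {
                      consumer.close();
                  }
              }
          }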



          • #6
            Thanks so much for the fast response! I don't like it when people foist their emergencies on forums and demand fast action, but this one is a big problem for our project so I really appreciate your attention.

            I will try to catch up with your questions:

            Here is the non-placeholdered config:

            HTML Code:
            	<int:channel id="asyncActionReplyChannel">
            		<int:dispatcher task-executor="integrationListenerTaskExecutor"/>
            	</int:channel>
            	
            	<int:channel id="asyncActionRequestChannel">
            		<int:dispatcher task-executor="integrationListenerTaskExecutor"/>
            	</int:channel>
            
            	<int-jms:outbound-gateway request-channel="asyncActionRequestChannel"
            		request-destination-name="asyncActionRequestQueue"
            		reply-channel="asyncActionReplyChannel" reply-destination-name="asyncActionReplyQueue"
            		message-converter="asyncActionMessageConverter" header-mapper="asyncActionMessageConverter" 
            		reply-timeout="10000" receive-timeout="10000" correlation-key="JMSCorrelationID">
            		<int-jms:reply-listener concurrent-consumers="10" />
            	</int-jms:outbound-gateway>
            
            	<int-jms:inbound-gateway request-channel="asyncActionRequestChannel"
            		request-destination-name="asyncActionRequestQueue" 
            		reply-channel="asyncActionReplyChannel" 
            		selector="processActionName='DerivationExecutionAction'"
            		concurrent-consumers="10"
            		reply-timeout="10000" 
            		receive-timeout="10000"
            		request-timeout="10000"
            		correlation-key="JMSCorrelationID" />
            I do know that reply-timeout and receive-timeout are two very different things - I just keep forgetting which is which, so around 4 AM last night I took the safe approach and made them both high. In the serialized unit test the replies are all in the < 1s range, so I figure 10 seconds is plenty for all timeouts.

            Another important note about the thread pools I use is that I am using the CallerRuns rejection policy. Earlier on I did have dropped traffic caused by rejected consumer executions but that is no longer the case. Note also that the test is being driven by a threadpool of equal size to the pools used for the SI components, so there ought to be a consistent # of slots available all the way through.
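
            For reference, that executor is set up roughly like this (a simplified sketch, not my exact bean definition):

            Code:
            import java.util.concurrent.ThreadPoolExecutor;

            import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

            // Simplified sketch of the 'integrationListenerTaskExecutor' described above
            // (illustrative; not the exact bean definition).
            public class TaskExecutorFactory {

                public static ThreadPoolTaskExecutor createListenerTaskExecutor(int poolSize) {
                    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
                    executor.setCorePoolSize(poolSize);
                    executor.setMaxPoolSize(poolSize);
                    executor.setThreadNamePrefix("JMS Listener #");
                    // CallerRuns: rather than dropping work, the submitting thread runs it itself
                    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
                    executor.initialize();
                    return executor;
                }
            }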

            I think I may have seen the same thing you were referring to about the correlation-key, which spurred me to try adding it, but it does not make a difference. I have tried it on both the inbound and outbound gateways, and on each one alone, with no change.

            Below is the DEBUG log from this last test:
            The "Queueing action" log is from the multithreaded test calling the gateway.
            The "received message" log is from the stub service behind the activator.

            Now that I look at these logs more closely, one thing seems curious:
            I have supplied all my own thread pools for SI, with thread naming "JMS Listener ##", and yet I have many logs from "DefaultMessageListenerContainer". What task executor is running these? Are more things being scheduled in my executor than strictly the max-concurrent-consumers setting? This might be an issue, since I set the pool size to be equal to that setting.

            I am attaching the logs in a zip since they are too big for the post.

            I will create another zip that I can give you to test with as soon as I can. Please let me know if these logs expose anything for you.

            Attachment
            Attached Files



            • #7
              I am continuing to scrutinize these logs, and another thing jumped out at me:

              At the first outbound message, I see the creation of the reply bridge:

              Code:
              2013-06-08 12:13:02,282 DEBUG | org.springframework.jms.listener.DefaultMessageListenerContainer#0-2 | o.s.integration.jms.ChannelPublishingJmsMessageListener | converted JMS Message [HornetQMessage[ID:708cf75b-d06f-11e2-8648-e4d53d9651fd]:PERSISTENT] to integration Message payload [[email protected]f67bf] 
              2013-06-08 12:13:02,284  INFO | org.springframework.jms.listener.DefaultMessageListenerContainer#0-2 | o.s.integration.endpoint.EventDrivenConsumer            | Adding {bridge:null} as a subscriber to the 'asyncActionReplyChannel' channel 
              2013-06-08 12:13:02,284  INFO | org.springframework.jms.listener.DefaultMessageListenerContainer#0-2 | org.springframework.integration.channel.ExecutorChannel | Channel 'asyncActionReplyChannel' has 1 subscriber(s). 
              2013-06-08 12:13:02,284  INFO | org.springframework.jms.listener.DefaultMessageListenerContainer#0-2 | o.s.integration.endpoint.EventDrivenConsumer            | started [email protected]61a1 
              2013-06-08 12:13:02,285 DEBUG | org.springframework.jms.listener.DefaultMessageListenerContainer#0-2 | org.springframework.integration.channel.ExecutorChannel | preSend on channel 'asyncActionRequestChannel', message: [Payload=[email protected]f67bf][Headers={timestamp=1370718782285, id=0b3f2ae6-dd96-43f3-9868-10721b52877b, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@7bb5a43c, JMSXDeliveryCount=1, jms_timestamp=1370718782277, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@7bb5a43c, jms_redelivered=false, processActionName=DerivationExecutionAction, jms_correlationId=c23d122b-baf2-4243-9c9f-3f178958229f_2, jms_replyTo=HornetQQueue[asyncActionReplyQueue], jms_messageId=ID:708cf75b-d06f-11e2-8648-e4d53d9651fd}] 
              2013-06-08 12:13:02,285 DEBUG | org.springframework.jms.listener.DefaultMessageListenerContainer#0-2 | org.springframework.integration.channel.ExecutorChannel | postSend (sent=true) on channel 'asyncActionRequestChannel', message: [Payload=[email protected]f67bf][Headers={timestamp=1370718782285, id=0b3f2ae6-dd96-43f3-9868-10721b52877b, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@7bb5a43c, JMSXDeliveryCount=1, jms_timestamp=1370718782277, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@7bb5a43c, jms_redelivered=false, processActionName=DerivationExecutionAction, jms_correlationId=c23d122b-baf2-4243-9c9f-3f178958229f_2, jms_replyTo=HornetQQueue[asyncActionReplyQueue], jms_messageId=ID:708cf75b-d06f-11e2-8648-e4d53d9651fd}]
              But I never see another one after that. I would have expected to see as many of these as the concurrent-consumers setting on the reply-listener. Is that the wrong expectation?

              I seem to recall there being a separation of settings between concurrent threads (tasks) and the number of messages to handle per task, and the above makes me think that might be significant. I will dig in some more.



              • #8
                One other thing that strikes me as potentially out of the ordinary about my setup is that I have the inbound and outbound gateways defined in the same place using the same channel refs. I would expect the more typical scenario to be that the service activator is in another application that is being integrated via JMS. I do this because I want these messages to be distributed when I have multiple deployments.

                I tried separating out the channels so that there were distinct in/out channels for the request and the reply, giving each gateway its own channel refs, but in that configuration it would not send the message at all.

                I'm mobile at the moment so I can't post that config, but do you think there is something to this?



                • #9
                  I have the inbound and outbound gateways defined in the same place using the same channel refs.
                  You can't do that; the default dispatcher round-robins requests to subscribers. You have the outbound gateway and a header value router subscribed to the 'asyncActionRequestChannel', so some requests go to the gateway and some skip the gateway. The same thing happens on the reply channel.

                  Similarly, every other message the inbound gateway receives will be dispatched straight back to the outbound gateway (and hence back out over JMS)...

                  Code:
                  2013-06-08 12:13:02,293 DEBUG | org.springframework.jms.listener.DefaultMessageListenerContainer#0-1 | org.springframework.integration.channel.ExecutorChannel | postSend (sent=true) on channel 'asyncActionRequestChannel', message: [Payload=com.etp.bpm.services.jaxwsbindings.data.ProcessActionInstance@d39b469][Headers={timestamp=1370718782293, id=d5ee700f-342b-42ea-9511-34698a4b9c3f, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@1f19b44d, JMSXDeliveryCount=1, jms_timestamp=1370718782275, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@1f19b44d, jms_redelivered=false, processActionName=DerivationExecutionAction, jms_correlationId=c23d122b-baf2-4243-9c9f-3f178958229f_1, jms_replyTo=HornetQQueue[asyncActionReplyQueue], jms_messageId=ID:708ca937-d06f-11e2-8648-e4d53d9651fd}]
                  2013-06-08 12:13:02,294 DEBUG | JMS Listener #19 | org.springframework.integration.jms.JmsOutboundGateway | org.springframework.integration.jms.JmsOutboundGateway#0 received message: [Payload=com.etp.bpm.services.jaxwsbindings.data.ProcessActionInstance@d39b469][Headers={timestamp=1370718782293, id=d5ee700f-342b-42ea-9511-34698a4b9c3f, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@1f19b44d, JMSXDeliveryCount=1, jms_timestamp=1370718782275, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@1f19b44d, jms_redelivered=false, processActionName=DerivationExecutionAction, jms_correlationId=c23d122b-baf2-4243-9c9f-3f178958229f_1, jms_replyTo=HornetQQueue[asyncActionReplyQueue], jms_messageId=ID:708ca937-d06f-11e2-8648-e4d53d9651fd}]
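
                  To see the round-robin behavior in isolation, here is a tiny stand-alone example (my own illustration using the SI 2.x class locations; nothing here is taken from your config):

                  Code:
                  import org.springframework.integration.Message;
                  import org.springframework.integration.channel.DirectChannel;
                  import org.springframework.integration.core.MessageHandler;
                  import org.springframework.integration.support.MessageBuilder;

                  // Illustration of the default round-robin dispatching on a channel with
                  // two subscribers (not your configuration).
                  public class RoundRobinDemo {

                      public static void main(String[] args) {
                          DirectChannel channel = new DirectChannel();
                          channel.subscribe(new MessageHandler() {
                              public void handleMessage(Message<?> message) {
                                  System.out.println("subscriber A got " + message.getPayload());
                              }
                          });
                          channel.subscribe(new MessageHandler() {
                              public void handleMessage(Message<?> message) {
                                  System.out.println("subscriber B got " + message.getPayload());
                              }
                          });
                          // With two subscribers, consecutive sends alternate between A and B --
                          // which is why sharing one channel between two gateways misroutes messages.
                          channel.send(MessageBuilder.withPayload("request 1").build());
                          channel.send(MessageBuilder.withPayload("request 2").build());
                      }
                  }
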
                  As a general observation, you don't need so much async processing; making one of the channels async in each flow is typically all you need.

                  For example, on the client side, making both channels executor channels is redundant. Your 'Deferred Command' thread hands off to a 'JMS Listener' thread, which blocks in the ob gateway waiting for a reply. When the reply is received, that JMS listener thread hands the reply off to another JMS listener thread - why not just process the reply on the first "listener" thread? Also, on the inbound side, you should manage concurrency with the inbound gateway rather than immediately handing off to another thread.

                  Further, it is unusual to need more than one consumer on the <reply-listener/>; the action performed on this thread is extremely lightweight - all it does is hand the reply off to the thread that's waiting for it; it does no processing on the message at all (aside from message conversion).

                  Re post #6 - those threads are created by the DMLC used within the inbound gateway; if you want to specify that task executor, you'll need to declare a DMLC (using a <bean/> definition) and provide it to the gateway using the 'container' attribute. The <reply-listener/> currently does not permit the specification of a task executor. I have created an issue for that but, as I mentioned above, it typically needs only one thread.
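
                  A rough sketch of such an externally defined container (the destination and selector values are borrowed from your resolved config; treat the wiring itself as illustrative of a <bean/> definition, not a drop-in):

                  Code:
                  import javax.jms.ConnectionFactory;

                  import org.springframework.core.task.TaskExecutor;
                  import org.springframework.jms.listener.DefaultMessageListenerContainer;

                  // Sketch of a listener container that an <int-jms:inbound-gateway container="..."/>
                  // could reference by bean name (illustrative values only).
                  public class ListenerContainerFactory {

                      public static DefaultMessageListenerContainer inboundGatewayContainer(
                              ConnectionFactory connectionFactory, TaskExecutor taskExecutor) {
                          DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
                          container.setConnectionFactory(connectionFactory);
                          container.setDestinationName("asyncActionRequestQueue");
                          container.setMessageSelector("processActionName='DerivationExecutionAction'");
                          container.setConcurrentConsumers(10);
                          // This is what puts your own executor (and its thread naming) behind the DMLC threads.
                          container.setTaskExecutor(taskExecutor);
                          return container;
                      }
                  }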

                  Re post#7 - that message is from the inbound gateway (not the reply listener); it is simply initial setup of an internal bridge in the gateway; it's only needed once.

                  ...configuration it would not send the message at all.
                  That must have been a different configuration issue; sharing channels like this will never work.


                  Finally, in order to ease debugging multi-system messaging, it would be advantageous if your 'ProcessActionInstance' had a toString() method that included some unique identifier. That way you can use simple greps/finds across systems to track the flow of a particular request/reply in the logs.
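
                  Something as simple as this would do (the field name here is invented, just to show the idea):

                  Code:
                  // Minimal illustration of the suggestion above; the identifier field is hypothetical.
                  public class ProcessActionInstance {

                      private String actionInstanceId;

                      @Override
                      public String toString() {
                          // A unique, stable identifier makes it easy to grep for one request/reply
                          // across the client and server logs.
                          return "ProcessActionInstance[actionInstanceId=" + actionInstanceId + "]";
                      }
                  }
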
                  Last edited by Gary Russell; Jun 9th, 2013, 04:58 AM.



                  • #10
                    Thanks again, Gary - this is great information.

                    Right around the time I started putting task executors on those re-used channels to troubleshoot was when it started to smell fishy to me. I thought perhaps SI was doing something extra clever here and detecting the dual-use nature of the channels, but your explanation makes perfect sense as to why that was a poor assumption.

                    I have changed things around to the config below, which I believe is more in line with your recommendations. The problem is that I still encounter issues when the test is multithreaded. This time it manifests as indefinite hanging in the call to the gateway. I have attached a new log. I would put toString() on the payload classes, but they are being generated from XSD on execution. I have been using the @xxxxxx objectId, which seems to do fine as a log correlator.

                    I am also still working on getting you a zip to test with.

                    Code:
                    	<int:gateway id="asyncActionRequestGateway"
                    		service-interface="com.etp.bpm.actionservice.ProcessActionService">
                    		<int:method name="executeAsyncAction" request-channel="asyncActionRequestOutChannel" reply-channel="asyncActionReplyInChannel"/>
                    	</int:gateway>
                    
                    	<int:channel id="asyncActionRequestOutChannel"/>
                    	
                    	<int:channel id="asyncActionRequestInChannel">
                    		<int:dispatcher task-executor="integrationListenerTaskExecutor"/>
                    	</int:channel>
                    
                    	<int:channel id="asyncActionReplyInChannel"/>
                    	
                    	<int:channel id="asyncActionReplyOutChannel"/>
                    
                    
                    	<int-jms:outbound-gateway request-channel="asyncActionRequestOutChannel"
                    		request-destination-name="${jms.destination.name.asyncactionrequest}"
                    		reply-channel="asyncActionReplyInChannel" 
                    		reply-destination-name="${jms.destination.name.asyncactionreply}"
                    		message-converter="asyncActionMessageConverter" 
                    		header-mapper="asyncActionMessageConverter" 
                    		reply-timeout="${bpm.actionservice.reply.timeout}" 
                    		receive-timeout="${bpm.actionservice.receive.timeout}" 
                    		correlation-key="JMSCorrelationID">
                    	</int-jms:outbound-gateway>
                    
                    	<int-jms:inbound-gateway request-channel="asyncActionRequestInChannel"
                    		request-destination-name="${jms.destination.name.asyncactionrequest}" 
                    		reply-channel="asyncActionReplyOutChannel" 
                    		selector="${jms.asyncaction.selector.expression}"
                    		concurrent-consumers="${integration.listener.threads.maximum}"/>
                    Attachment
                    Attached Files



                    • #11
                      Correction: The hang is not indefinite - I just forgot to restore my much increased timeout setting that I was using to debug.

                      All timeouts are set at 10000, and the behavior is essentially the same as when I started (outbound gateway throws timeout exception after 10 seconds).



                      • #12
                        OK, here is the test case. It is in the form of a Maven project - if that is a problem let me know and I can package the dependencies for you.

                        Attachment

                        In the test config I have included a header value router that is representative of a similar one in my application. When I run the test I get the same result with or without it, but I thought it would be best to include it for the sake of being thorough.

                        In the test class, if you uncomment the Future.get() line in the sending loop and serialize the messaging, the test will complete correctly.
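
                        For reference, the sending loop has roughly this shape (a simplified sketch; the real test class is in the attached project, and 'sendOneRequest' here just stands in for the call to the gateway proxy):

                        Code:
                        import java.util.ArrayList;
                        import java.util.List;
                        import java.util.concurrent.Callable;
                        import java.util.concurrent.ExecutorService;
                        import java.util.concurrent.Executors;
                        import java.util.concurrent.Future;

                        // Rough shape of the sending loop described above -- a sketch, not the attached test class.
                        public class GatewayLoadTestSketch {

                            public static void runTest(Callable<Object> sendOneRequest, int threads, int requests)
                                    throws Exception {
                                ExecutorService pool = Executors.newFixedThreadPool(threads);
                                List<Future<Object>> futures = new ArrayList<Future<Object>>();
                                for (int i = 0; i < requests; i++) {
                                    Future<Object> future = pool.submit(sendOneRequest);
                                    futures.add(future);
                                    // future.get(); // uncommenting this line serializes the messaging
                                }
                                for (Future<Object> future : futures) {
                                    future.get(); // propagates any reply-timeout exceptions from the gateway
                                }
                                pool.shutdown();
                            }
                        }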

                        Here is the output from this test.

                        Attachment
                        Attached Files



                        • #13
                          I'll look at the logs in the morning; I am overseas at the moment and it's the middle of the night here.

                          The problem with using the @xxxxxx is that you are using Java serialization, so the @xxxxxx will change to @yyyyyy on the "other side" of HornetQ.

                          So you end up having to use @xxxxxx up to the ob gateway, then grab the correlation id in the log just before the send to "jump" to the other side. Manageable, for sure; it's just easier to debug if you can track a payload (or header) across systems. If you can't add a toString() to the payload, an alternative would be to add a custom (unique) header and change the header mappings on the gateways to pass it over JMS.
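
                          For example, something along these lines upstream of the outbound gateway (the header name 'trackingId' is made up, your header mapper would also have to pass it; SI 2.x package names):

                          Code:
                          import java.util.UUID;

                          import org.springframework.integration.Message;
                          import org.springframework.integration.support.MessageBuilder;

                          // Sketch of adding a unique tracking header to the outbound message
                          // (header name is hypothetical; the mapper must be configured to map it to JMS).
                          public class TrackingHeaderSketch {

                              public static Message<Object> withTrackingHeader(Object payload) {
                                  return MessageBuilder.withPayload(payload)
                                          .setHeader("trackingId", UUID.randomUUID().toString())
                                          .build();
                              }
                          }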

                          It would also be helpful if you could share your full configuration (server side too) and include in the logs the full org.springframework DEBUG log right from the beginning, showing all the bean instantiation and channel wiring.

                          You still have an executor on the request 'in' channel - as I said, concurrency on the inbound side is managed by the container and you don't need to hand off to another thread.

                          Perhaps you intended to put it on the request 'out' channel, to free up your 'Deferred Command' thread?



                          • #14
                            Sorry - that response was to your post #10 - I didn't notice we'd spilled over to a second page.



                            • #15
                              No problem - thanks for taking the time.

                              Good point on the ID. I thought of that a little bit after posting, so in the test case the payload has an internal ID that it toString()s to.

                              I'm a little confused about what you said regarding the channel executor though. I see why I only need a single thread for the reply-listener since it is just dispatching notifications to the waiting outbound-gateway threads, but wouldn't that be the 'asyncActionReplyInChannel' channel?

                              I have the executor on the 'asyncActionRequestInChannel', which I thought was giving me the ability to have multiple parallel invocations of 'derivationActionService' by the service-activator.

                              Or are you saying that the inbound-gateway consumer is also just a dispatcher to a task executor? If that is the case, where do I configure that task executor?

