MDP not getting invoked on using DMLC

  • MDP not getting invoked on using DMLC

    I have an MDP that implements the MessageListener interface and overrides its onMessage() method.

    Below is the listener configuration in the Spring config file:
    Code:
    <bean id="jmsListener_V2_SCH_CT_Res_Queue" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="v2_CT_SCH_RES_WMQQueueConnectionFactory" />
        <property name="destination" ref="v2_CT_SCH_RES_WMQQueue" />
        <property name="messageListener" ref="ersOrcSvcResMDP"/>
        <property name="concurrentConsumers" value="1" />
        <property name="maxConcurrentConsumers" value="10" />
    </bean>
    I'm facing a sporadic issue: the MDP stops being invoked when the number of concurrent messages on the queue it listens to increases.
    I haven't been able to find a pattern, but more than once I've observed that when more than 5 "concurrent" messages are sent to the queue, the MDP processes just one or two of them and then fails to consume the rest.
    Even if I continue to send more messages to the queue, the MDP still does not get invoked.

    The only option I'm left with is to restart the server.

    I've tried playing around with the "concurrentConsumers" and "maxConcurrentConsumers" values, but it doesn't help much. For example, with values of 10 and 50 respectively, my MDP was able to process more than 5 concurrent messages, but it still stopped being invoked once the number of concurrent messages reached a certain level (well below 50, i.e., below the value of "maxConcurrentConsumers").

    Any pointers on what the issue could be and how I can debug the exact cause?

    I'm concerned that there may be some criterion by which the MDP fails to be invoked when the number of concurrent messages on the queue reaches a threshold (in other words, if the number of concurrent messages on the queue exceeds "maxConcurrentConsumers", will the Spring container shut down the DMLC so that it processes no further messages)?

    Why does the DMLC not process the pending messages in the queue once it has finished processing the number of messages defined by the "maxConcurrentConsumers" property?

    Any help/guidance/suggestion shall be appreciated.

    Best Regards
    LB
    Last edited by lbvirgo; Jun 19th, 2013, 01:54 PM.

  • #2
    Is the MessageListener thread-safe?

    Typically, problems like this are caused by user code hanging up the container thread(s).

    The consumers know nothing about the size of the queue.

    The best way to debug would be to take a thread dump with jstack or VisualVM to see where the container threads are hung.
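
    Where running jstack or attaching VisualVM is not possible, a comparable snapshot can be taken from inside the JVM with the standard java.lang.management API. The following is only a minimal sketch: the class name ThreadDumper is hypothetical, and how it would be triggered in a deployed application (a log statement, an admin hook) is left open.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper, not part of Spring: builds a jstack-like text dump.
class ThreadDumper {

    /** Returns the name, state and stack of every live thread in this JVM. */
    static String dump() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        StringBuilder out = new StringBuilder();
        // false, false: skip locked-monitor and synchronizer details,
        // which not every JVM supports.
        for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
            out.append('"').append(info.getThreadName()).append("\" ")
               .append(info.getThreadState()).append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                out.append("    at ").append(frame).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(dump());
    }
}
```

    In the output, the container's listener threads are the ones to look at: a thread that stays WAITING or BLOCKED inside application or gateway code across several dumps is the likely culprit.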

    Also, when using a variable number of threads, you must not use a CachingConnectionFactory (which is really unnecessary for listener containers anyway).



    • #3
      Thanks for responding, Gary.

      Is the MessageListener thread-safe?
      How do I check this? Just to elaborate, my MessageListener is a thin consumer that consumes the message and then puts the payload on a Spring Integration gateway (where in turn it gets processed by going through various chains).

      Also, when using a variable number of threads, you must not use a CachingConnectionFactory (which is really unnecessary for listener containers anyway).
      What configuration do I need to avoid using CachingConnectionFactory?

      Best Regards.
      LB



      • #4
        It's not just the listener that needs to be thread safe; it's any downstream component that's invoked on the container's thread (SI framework components are thread-safe). It's beyond the scope of this forum to explain thread-safety; there are lots of resources on the web. Essentially, you can't have any shared state (instance variables) unless they're protected with locks (but stateless is generally better wherever possible).
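
        As a rough illustration of the shared-state point, here is a plain-JDK sketch. The class names are hypothetical, and onMessage takes a String instead of a JMS Message so the example runs without any JMS or Spring dependency.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class ListenerStateDemo {

    /** Unsafe: an unguarded instance field mutated by every consumer thread. */
    static class UnsafeListener {
        private long processed;                 // shared mutable state, no lock
        void onMessage(String payload) { processed++; }  // read-modify-write race
        long processed() { return processed; }
    }

    /** Safe: the only shared state is an atomic counter. */
    static class SafeListener {
        private final AtomicLong processed = new AtomicLong();
        void onMessage(String payload) { processed.incrementAndGet(); }
        long processed() { return processed.get(); }
    }

    /** Calls onMessage from several threads, roughly as concurrent consumers would. */
    static void deliver(Runnable onMessage, int threads, int perThread) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) onMessage.run();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        UnsafeListener unsafe = new UnsafeListener();
        SafeListener safe = new SafeListener();
        deliver(() -> unsafe.onMessage("m"), 8, 100_000);
        deliver(() -> safe.onMessage("m"), 8, 100_000);
        System.out.println("unsafe count: " + unsafe.processed() + " (may be below 800000)");
        System.out.println("safe count:   " + safe.processed());
    }
}
```

        The unsafe variant can silently lose updates under load; the same reasoning applies to any instance field touched by code that runs on the container's threads.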

        Like I said, a thread dump will tell you immediately where the threads are hung.

        Unless you explicitly wrap the vendor's connection factory in a CachingConnectionFactory, you are NOT using one. Your factory's bean name implies you are not but I needed to point it out just in case.

        If your listener is as lightweight as you say, why not use a <int-jms:message-driven-channel-adapter/> instead of rolling your own?



        • #5
          Hi Gary
          I do not have control over the remote server where my MDP is hosted. Moreover, I'm not aware whether they have opened up a JMX port on the server. So getting a thread dump may not be feasible at the moment.

          However, I would like to share some observations here:
          1) I see that the number of listeners started on server startup is up to "maxConcurrentConsumers", which is expected.
          2) Each listener is picking up a message from the queue and is "successfully" processing it (i.e., there is absolutely no exception at application level).
          3) I even added a "finally" block with a "return" statement (my MDP code is wrapped in a try-catch) just to ensure that the MDP returns control to the container. I also put a few log statements in the "finally" block.
          I can see that all the log statements in the "finally" block are printed (from which I infer that control is returning to the container from the MDP).

          Any further suggestions? This issue is now looking like a showstopper for my application going live.

          I also need guidance on the use of <int-jms:message-driven-channel-adapter/>. My current MDP code extracts the payload from the JMS message, creates the Gateway bean and then invokes its interface method by placing the payload on it. The Gateway then takes care of the further business logic. I need help understanding whether I can achieve the same through <int-jms:message-driven-channel-adapter/> so that I can keep my code changes to a minimum (I'm planning to use this approach in case the above issue is not resolved with the MDP).

          Best Regards
          LB
          Last edited by lbvirgo; Jun 21st, 2013, 11:14 PM.



          • #6
            You can't get the operations people to run 'jstack <pid>' ??

            It's hard to say without a thread dump. Your finally block seems to indicate that all is well, but I seriously doubt there is a problem with the container; there are probably many thousands of applications using it at very high volumes/concurrency.

            Another thing to try is to turn on TRACE level logging; the container logs all the receive attempts at that level.

            Yes, the message-driven adapter can probably eliminate the need for your custom listener.



            • #7
              Hi - I've managed to replicate the issue in my localhost server and have captured the Thread dump using VisualVM.

              Though I'm not skilled enough to analyze the dump, I see that the listener threads I'm having trouble with are in state WAITING (parking).

              What are they waiting for?

              Pls find attached the dump (Thread-Dump.zip). Do you see issues anywhere ?
              Also attaching my MDP class and the spring integration config files for your reference.

              The listener that listens to the queue (with which I'm facing issue) is jmsListener_ERSsvc_SCH_CT_Req_Queue

              If you look at the dump, three threads are present for this listener (which is expected as my "maxConcurrentConsumers" is 3). Though I had more than 3 messages in the queue, this listener read only 3 of them.

              Also, sorry for the incorrect statement in my previous response. I just noticed in my local testing that control is *not* coming back to the MDP after it has invoked the Gateway method (there are no exceptions). I got confused by similar logs and thought they were from the "finally" block of my MDP.

              Best Regards
              Lalit
              Last edited by lbvirgo; Jun 22nd, 2013, 03:50 AM.



              • #8
                I don't have time to analyze your entire application, but the problem is your threads are waiting for a reply in the gateway...

                ...
                at org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel.receive(MessagingTemplate.java:378)
                at org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel.receive(MessagingTemplate.java:372)
                at org.springframework.integration.core.MessagingTemplate.doReceive(MessagingTemplate.java:301)
                at org.springframework.integration.core.MessagingTemplate.doSendAndReceive(MessagingTemplate.java:319)
                at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:239)
                at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:233)
                at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:207)
                at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:306)
                at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:269)
                at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:260)
                at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
                at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
                at com.sun.proxy.$Proxy18.processResponse(Unknown Source)
                at com.bnym.ecs.report.service.orchestration.jms.listeners.ERSOrchestrationSvcResMDP.onMessage(ERSOrchestrationSvcResMDP.java:104)
                ...
                You are assigning the result of your flow to a variable 'aObj' which you then do nothing with. Do you really expect a reply from your flow? If so, what will you do with it?

                (However, it's not got there yet, it's waiting for a reply from processRequest).

                If you don't need a reply, make your gateway method return void.

                If you ARE expecting a reply, turn on DEBUG logging and follow the messages through the flow to see why you are not getting one.

                You could also add a 'default-reply-timeout' to the <gateway/> (or a 'reply-timeout' to the method) to avoid hanging your thread forever. If the gateway times out, it will return null.
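
                The hang, and the effect of a reply timeout, can be reproduced without Spring Integration. The sketch below is a plain-JDK analogy and not the framework's implementation: sendAndReceive, the BiConsumer "flow" and the queue standing in for the temporary reply channel are all hypothetical names chosen for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

class ReplyTimeoutDemo {

    /**
     * Hands the request to the flow, then waits for a reply on a temporary queue.
     * Returns null if no reply arrives within timeoutMillis.
     */
    static String sendAndReceive(String request,
                                 BiConsumer<String, BlockingQueue<String>> flow,
                                 long timeoutMillis) {
        BlockingQueue<String> replyChannel = new LinkedBlockingQueue<>();
        flow.accept(request, replyChannel);
        try {
            // replyChannel.take() here would park the calling thread forever
            // when the flow never replies; poll() gives up after the timeout.
            return replyChannel.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        // A flow that replies, like a synchronous request/response caller expects.
        BiConsumer<String, BlockingQueue<String>> replying =
                (req, replies) -> replies.offer("processed " + req);
        // A flow that ends without producing a reply.
        BiConsumer<String, BlockingQueue<String>> oneWay = (req, replies) -> { };

        System.out.println(sendAndReceive("a", replying, 500)); // prints "processed a"
        System.out.println(sendAndReceive("b", oneWay, 500));   // prints "null" after about 500 ms
    }
}
```

                With an unbounded wait, every listener thread that calls a flow which never replies stays parked, which matches the WAITING (parking) threads in the dump.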
                Last edited by Gary Russell; Jun 22nd, 2013, 04:34 AM.



                • #9
                  Thanks.
                  In fact, I really do not expect a reply. The only reason I added a non-void return type for my gateway method is because in case there is any exception, I'm sending a meaningful failed response back to client. And I'm handling this through the global "error-channel" attribute of gateway.
                  Just to elaborate - the same gateway method is used for handling request via REST (synchronous call) and via JMS (async call).
                  I need to send a meaningful response to client only for sync calls.

                  And the code of 'aObj' is just a test code, it is not part of actual business logic.

                  However, I can now see the probable cause of this issue. I'll try to separate my gateway methods so that I don't mix sync and async calls.

                  Let me test that change and report back.

                  As always, thanks for all the guidance.

                  Best Regards
                  LB
                  Last edited by lbvirgo; Jun 22nd, 2013, 05:32 AM.



                  • #10
                    I am not sure why adding an error-channel makes you think you need a return type.

                    Simply return void and you should be good - assuming you also remove the

                    Object aObj = ersOrchestrationSvcReqGateway.receive();



                      • #12
                        Gary - Below is the use case for the reason I need to add "error-channel"

                        Client makes a sync (REST) call. My POJO invokes the processRequest method on the Gateway and waits for a response. This response is then returned to the client.
                        In case there is an exception, the error message goes to the channel defined via "error-channel". I have a chain that reads from this error channel, gets the "failed message" out of it, forms a meaningful failed response for the client and returns this response to the calling layer.
                        This response is then returned to my POJO that had initially invoked the processRequest method.
                        This way, I make sure that, exception or not, I'm sending a response back to the client.

                        However, in an async call this is not the case. The client just posts a message on a queue and forgets (it's a fire-and-forget scenario). So here I need not send a response back to the client (whether a valid or a failed response).

                        Based on your guidelines, I separated my gateways for sync and async calls. Earlier I had just one gateway for both.
                        I've now tested my changes and it seems to work fine. All the messages are being read from my queue in the case of async calls.

                        I believe the issue is now resolved. However, I will have to deploy the code on the remote server and test the use cases in real time.

                        The issue basically (I think, probably you can confirm as well) was that I had an MDP handling async calls and a POJO handling sync calls, and both were calling the same gateway method, which had a non-void return. Sync calls were all fine as they were returning the response to the client, but since the MDP was doing nothing with the return, threads were hanging there.

                        Many thanks & Best Regards
                        Lalit Bansal
                        Last edited by lbvirgo; Jun 22nd, 2013, 06:46 AM.



                        • #13
                          I think, probably you can confirm as well...
                          That is correct; you can't simply use the same gateway method for requests that sometimes reply and sometimes not.

                          You can play some tricks with timeouts and Future<?>s, to make it work, but it's better to use separate methods, or coerce the flow to always return a reply.
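
                          The Future idea can be pictured with a plain-JDK sketch. This is an analogy only, under the assumption that the caller receives a handle to a possible reply; process, callSync and callAsync are hypothetical names, not gateway API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureReplyDemo {

    /** Hypothetical gateway-like method: returns at once with a handle to a possible reply. */
    static Future<String> process(String request, boolean flowReplies) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (flowReplies) {
            reply.complete("processed " + request);
        }
        // When the flow never replies, the future simply stays incomplete.
        return reply;
    }

    /** Synchronous caller: waits for the reply, but only for a bounded time. */
    static String callSync(String request, boolean flowReplies, long timeoutMillis) {
        try {
            return process(request, flowReplies).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // no reply in time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    /** Fire-and-forget caller: never calls get(), so it cannot block on a reply. */
    static void callAsync(String request) {
        process(request, false);
    }
}
```

                          Even in this small form the caller has to know which behaviour to expect, which is part of why separate methods are the simpler option.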



                          • #14
                            Thanks Gary.

                            On a different topic, if I may use the same post for it, can you please clarify two points:

                            1) Though I've read the SI documentation on channel adapters, I'm not clear on how they act as the "end of the road" inside a chain. To elaborate, I had a chain whose last component was a service activator, and the method it invoked had a non-void return. Since I didn't define any "output-channel" for my chain, I was getting an error, which I then realized was expected. However, on placing a logging-channel-adapter as the last component in my chain, I found that I no longer needed to define an "output-channel" for the chain.
                            Am I to interpret from this that all channel adapters are internally written to always have a void return?

                            2) As I mentioned earlier, on receiving a request, I put the payload on a gateway method that has a "request-channel" defined.
                            I've defined a "global" interceptor on this "request-channel" with a negative order. The "request-channel" also has a local interceptor. This means that my global interceptor gets invoked first (both interceptors override the "pre-send" method).
                            In the global interceptor, I validate my payload for some mandatory fields needed by the business logic.
                            If a mandatory field is not present, I throw a customized RuntimeException (i.e., I've defined my own exception class that extends RuntimeException).
                            My gateway has an "error-channel" attribute defined, which a Transformer reads from. This transformer gets the "MessagingException" through the getPayload() method and then gets the "failed message" out of it. I then apply some business logic before returning control.
                            What I'm observing is below:
                            I sent a payload with one of the mandatory fields missing. The global interceptor validated the payload and threw my customized RuntimeException.
                            This exception was caught on the "error-channel" by the Transformer.
                            However, when I do "(MessagingException)message.getPayload()", I get a ClassCastException.

                            I know the spec says that Spring will wrap an exception in a MessagingException only if it's within the SI domain, but isn't a global interceptor part of the SI domain? Why is my customized RuntimeException not being wrapped?

                            Just to elaborate further: all my application code is wrapped in try-catch blocks and I throw my customized RuntimeExceptions from the catch blocks. In those code blocks, I never got a ClassCastException. It happens only with the global interceptor.

                            Any suggestions on where I could be making a mistake?

                            Best Regards
                            LB
                            Last edited by lbvirgo; Jun 23rd, 2013, 08:15 AM.



                            • #15
                              1. If the final element in the chain returns a reply, you must have an output channel; if you just want to discard the result, you can set the output-channel to 'nullChannel'.

                              2. Interceptors are a "special case"; exceptions thrown in 'preSend()' are not wrapped, whereas exceptions in 'postSend()' are wrapped in a MessageDeliveryException - if it's not already a MessagingException (see 'send()' in AbstractMessageChannel).

                              I'll need to talk to Mark to see if there was a specific reason for not wrapping preSend exceptions (if not, we might be able to make a change in 3.0).

                              For now, I suggest you either change your exception to subclass MessagingException, or wrap it in a MessagingException in your interceptor.
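
                              The wrapping option, together with an error-side check that avoids the blind cast, might look roughly like the sketch below. The nested MessagingException here is a simplified stand-in declared only so the example compiles on its own; it is NOT the framework class, and ValidationException, validate and describe are hypothetical names.

```java
class PreSendWrappingDemo {

    /** Simplified stand-in for the framework's MessagingException (assumption, not the real class). */
    static class MessagingException extends RuntimeException {
        private final Object failedMessage;
        MessagingException(Object failedMessage, Throwable cause) {
            super(cause);
            this.failedMessage = failedMessage;
        }
        Object getFailedMessage() { return failedMessage; }
    }

    /** Hypothetical application exception, like the custom RuntimeException in the question. */
    static class ValidationException extends RuntimeException {
        ValidationException(String msg) { super(msg); }
    }

    /** What a validating interceptor could do: wrap the custom exception before throwing. */
    static void validate(String payload) {
        if (payload == null || payload.isEmpty()) {
            throw new MessagingException(payload,
                    new ValidationException("mandatory field missing"));
        }
    }

    /** Error-side handler that tolerates both wrapped and unwrapped throwables. */
    static String describe(Object errorPayload) {
        if (errorPayload instanceof MessagingException) {
            MessagingException me = (MessagingException) errorPayload;
            return "failed message [" + me.getFailedMessage() + "]: "
                    + me.getCause().getMessage();
        }
        if (errorPayload instanceof Throwable) {
            return "unwrapped: " + ((Throwable) errorPayload).getMessage();
        }
        return "unexpected payload: " + errorPayload;
    }
}
```

                              Checking with instanceof before casting keeps the error handler from failing with a ClassCastException when an exception reaches it unwrapped.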
