  • DMLC maxMessagesPerTask causes inability to scale down

    I was going to put in a bug/enhancement request, but decided to discuss here first. Keep in mind, what started me down this path was noticing that my DMLC was able to scale up fine, but it was never scaling back down. After digging through the source, I believe this is due to how maxMessagesPerTask is implemented, which is somewhat out of line with the API documentation. I say somewhat because it is mentioned in the text for setIdleTaskExecutionLimit, but there is no mention of it in setMaxMessagesPerTask. The name of the property itself also ends up being misleading.

    The max is also a min. Unless the container is stopped, it will look for messages until it reaches maxMessagesPerTask; if you use the default (-1), it will look forever. A side effect is that the check to scale down doesn't occur until after maxMessagesPerTask is reached, hence it is also a minimum. Again, if you use the default maxMessagesPerTask setting (-1), it will never scale down! I don't think this is the intended behavior, as the class documentation implies I should be able to set maxConcurrentConsumers higher than concurrentConsumers to achieve dynamic scaling, which includes scaling down.

    IMHO, what should happen is that the task should be considered idle as soon as it doesn't receive a message. Once it doesn't receive a message, it should check idleTaskExecutionLimit. Of course, if maxMessagesPerTask is > 0, it still needs to check that after every receive call.

    Am I off base here?
    Last edited by bbohl; Dec 19th, 2012, 09:26 AM.
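
    For reference, the interplay described above can be sketched as a container configuration that does allow scale-down under the current implementation, by using a finite maxMessagesPerTask. This is a minimal sketch only; the connection factory, queue name, and listener are placeholders, not from this thread:

    ```java
    // Sketch: connectionFactory, "example.queue", and listener are placeholders.
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);   // your JMS ConnectionFactory
    container.setDestinationName("example.queue");       // hypothetical destination
    container.setMessageListener(listener);              // your MessageListener
    container.setConcurrentConsumers(2);
    container.setMaxConcurrentConsumers(10);
    // With the default (-1) a task loops indefinitely and the scale-down check
    // never runs; a finite value forces the task to return so idle checks happen.
    container.setMaxMessagesPerTask(10);
    // A surplus task that comes up empty this many times in a row may be stopped.
    container.setIdleTaskExecutionLimit(2);
    ```

    Whether these exact semantics hold for a given Spring version is worth verifying against the setIdleTaskExecutionLimit JavaDoc.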

  • #2
    Saw this same behavior, with an additional problem: idle tasks didn't seem to commit the message, so when my app restarted, all the pending messages were rolled back into the queues.



    • #3
      That sounds like the "stuck" threads are in user code and never returned to the container.



      • #4
        Originally posted by Gary Russell
        That sounds like the "stuck" threads are in user code and never returned to the container.
        Hi Gary,

        I'm sure you know more about DMLC than I do... but this is what I've been seeing in our application. When I leave maxMessagesPerTask at the default (-1), my messages are committed and removed from the queue; however, the number of active consumers on our queues never decreases. When I change maxMessagesPerTask, the consumers do become idle and stop listening as I wanted, but they leave unacknowledged messages behind, according to OpenMQ. Debugging shows that the consumer threads do go away and are not stuck. Perhaps the messages get stuck in our ConnectionFactory?


        While debugging this morning, I found that the code path is different when maxMessagesPerTask > 0, at line 984 of DMLC.AsyncMessageListenerInvoker.run() (Spring 3.2.2.RELEASE):

        Code:
        if (maxMessagesPerTask < 0) {
            messageReceived = executeOngoingLoop();
        }
        else {
            int messageCount = 0;
            while (isRunning() && messageCount < maxMessagesPerTask) {
                messageReceived = (invokeListener() || messageReceived);
                messageCount++;
            }
        }
        By default, executeOngoingLoop() is run; in the other branch, invokeListener() is. Not sure if this is the problem (or perhaps I'm not committing the message?), but in executeOngoingLoop my JMS sessions are 'committed', while under invokeListener they are not.

        Here's the code I use to construct my beans. We use a custom MessageListenerAdapter, but I changed its behavior to match Spring's MessageListenerAdapter and still saw the problem. We are not using a transaction manager, only local JMS transactions.

        Code:
        @Bean
        public Dispatcher dispatcher() {
          Dispatcher dispatcher = new Dispatcher();
          return dispatcher;
        }

        @Bean
        public MessageListenerAdapter dispatcherListenerAdapter() {
          MessageListenerAdapter adapter = new MessageListenerAdapter();
          adapter.setDelegate(dispatcher());
          adapter.setMessageConverter(messageConverter());
          adapter.setJmsTemplate(jmsTemplate());
          adapter.setRetryQueue(retryQueue());
          return adapter;
        }

        @Bean
        public DefaultMessageListenerContainer dispatcherContainer() {
          DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
          container.setMessageListener(dispatcherListenerAdapter());

          // Normally, this is 5 consumers; bump up so we keep ahead.
          container.setMaxConcurrentConsumers(20);
          container.setMaxMessagesPerTask(50);

          container.setDestinationName(dispatcherQueue());
          container.setConnectionFactory(connectionFactory());
          container.setSessionTransacted(true);
          container.setAutoStartup(false);
          return container;
        }
        Hope this helps!
        -dan



        • #5
          invokeListener() is also invoked from executeOngoingLoop(); if you drill up the stack you'll see (when using local transactions) the commit is done in doExecuteListener().

          The management of the container threads has nothing to do with when transactions are committed.



          • #6
            Originally posted by Gary Russell
            invokeListener() is also invoked from executeOngoingLoop(); if you drill up the stack you'll see (when using local transactions) the commit is done in doExecuteListener().

            The management of the container threads has nothing to do with when transactions are committed.
            Yup, figured that out. Not sure what's happening to my app. Will go back to what works for now. Thanks for the comments.



            • #7
              Is it possible that a CachingConnectionFactory could cause this problem? Things worked as expected when I took it out; each DMLC now connects directly to my OpenMQ connection factory.



              • #8
                By default, the consumer will be cached (and reused by a future thread); but it shouldn't have any impact on whether transactions are committed or not.

                You can set cacheConsumers to false.
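
                For anyone following along, that setting looks like this; targetConnectionFactory here is a placeholder for your broker's own factory:

                ```java
                // Sketch: targetConnectionFactory is a placeholder for the broker's factory.
                CachingConnectionFactory ccf = new CachingConnectionFactory();
                ccf.setTargetConnectionFactory(targetConnectionFactory);
                ccf.setSessionCacheSize(10);
                ccf.setCacheProducers(true);    // producer caching is unrelated to this issue
                ccf.setCacheConsumers(false);   // don't cache consumers when the DMLC scales down
                ```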



                • #9
                  Setting cacheConsumers to false worked. In testing with cacheConsumers true, I would see many consumers for a queue (beyond the number of listening threads in the container). They did not seem to properly commit the messages.

                  Could this be an OpenMQ issue? We've had strangeness like this in the past. I'll try ActiveMQ on our test server.

                  Thanks for the help. I think I have a working solution, but I'm still confused why cached consumers would not commit messages, though they claim to.

                  Edit:

                  Switching to ActiveMQ with cacheConsumers(true) exhibited the same behavior. The containers started enough consumers to process the messages, but did not seem to commit them properly. It might be possible to write a unit test showing this unexpected behavior, if you could point me in the right direction.
                  Last edited by blezek; May 23rd, 2013, 11:12 AM.



                  • #10
                    Let's take a step back - what makes you think that messages are not being committed?

                    Like we have discussed, the commit is done much further up the stack.

                    If you turn on debug logging you should see the commits (I'm pretty sure AMQ does, not sure about OpenMQ).

                    Maybe you can post your DMLC configuration?
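
                    As a reference for the debug-logging suggestion, enabling DEBUG for Spring's JMS package would look something like this with log4j 1.x (an assumption; adjust for your logging framework):

                    ```
                    # log4j 1.x properties sketch; the category is Spring's JMS package
                    log4j.logger.org.springframework.jms=DEBUG
                    ```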



                    • #11
                      DMLC config:

                      Code:
                      @Bean
                      public DefaultMessageListenerContainer dispatcherContainer() {
                        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
                        container.setMessageListener(dispatcherListenerAdapter());

                        // Normally, this is 5 consumers; bump up so we keep ahead.
                        container.setMaxConcurrentConsumers(20);
                        container.setMaxMessagesPerTask(10);

                        container.setDestinationName(dispatcherQueue());
                        container.setConnectionFactory(connectionFactory());
                        container.setSessionTransacted(true);
                        container.setAutoStartup(false);
                        return container;
                      }
                      Connection Factory:
                      Code:
                      @Bean
                      public ConnectionFactory connectionFactory() {
                        // Formerly this used a CachingConnectionFactory, but this caused problems
                        // with dynamic scaling. Just use the standard deweyConnectionFactory.
                        logger.debug("Creating connectionFactory");
                        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
                        connectionFactory.setTargetConnectionFactory(deweyConnectionFactory);
                        connectionFactory.setSessionCacheSize(15);
                        connectionFactory.setCacheProducers(true);
                        connectionFactory.setCacheConsumers(true);
                        return connectionFactory;
                      }
                      I give the message queue some work. In the debugger, I can see that commit is being called in AbstractMessageListenerContainer.commitIfNecessary().
                      Watching the OpenMQ stats at startup (for dewey.dispatcher):

                      Code:
                      dewey-integration Thu May 23 13:14:09 CDT 2013
                                Name            Type    State      Producers        Consumers                  Msgs               
                                                                Total  Wildcard  Total  Wildcard  Count  Remote  UnAck  Avg Size
                      ----------------------------------------------------------------------------------------------------------
                      dewey.dispatcher          Queue  RUNNING  0      -         1      -         0      0       0      0.0
                      We have 0 producers and 1 consumer. My DMLC reports 1 active consumer, 1 scheduled, 20 max. Over time, the number of active consumers peaks around 20, then drops back down to 1. The queue should have been drained, with no messages waiting. However, I see this:

                      Code:
                      dewey-integration Thu May 23 13:21:50 CDT 2013
                                Name            Type    State      Producers        Consumers                  Msgs               
                                                                Total  Wildcard  Total  Wildcard  Count  Remote  UnAck  Avg Size
                      ----------------------------------------------------------------------------------------------------------
                      dewey.dispatcher          Queue  RUNNING  20     -         13     -         10     0       24     1750.5
                      10 messages remain in the queue, with 24 unacknowledged/uncommitted messages and 13 consumers listening on the dewey.dispatcher queue. The messages may have been committed, but they are unacknowledged and roll back into the queue when I stop the application:

                      Code:
                      dewey-integration Thu May 23 13:26:13 CDT 2013
                                Name            Type    State      Producers        Consumers                  Msgs               
                                                                Total  Wildcard  Total  Wildcard  Count  Remote  UnAck  Avg Size
                      ----------------------------------------------------------------------------------------------------------
                      dewey.dispatcher          Queue  RUNNING  0      -         0      -         14     0       0      598.2143
                      Is this enough info to help? If I change:

                      Code:
                          connectionFactory.setCacheConsumers(true);
                      to

                      Code:
                          connectionFactory.setCacheConsumers(false);
                      the number of consumers matches the DMLC's number of active consumers, and the queue drains to 0 messages and 0 unacknowledged messages, as expected.

                      Thoughts?



                      • #12
                        Bingo!

                        Yes, of course; the consumer is cached and the broker doesn't know there's not actually someone really receive()ing from it, so he sends a message to it and nobody's home.

                        Duh - sometimes one just needs to see the wood instead of the trees.

                        Caching consumers is simply not compatible with dynamic scaling down of listener threads.

                        Sorry it took so long for the light to turn on.



                        • #13
                          Originally posted by Gary Russell
                          Bingo!

                          Yes, of course; the consumer is cached and the broker doesn't know there's not actually someone really receive()ing from it, so he sends a message to it and nobody's home.

                          Duh - sometimes one just needs to see the wood instead of the trees.

                          Caching consumers is simply not compatible with dynamic scaling down of listener threads.

                          Sorry it took so long for the light to turn on.
                          Great! I'm glad you figured this out: I'm not crazy after all. Is there some way to document this or give a warning to unsuspecting developers?



                          • #14
                            Is there some way to document this or give a warning
                            I suggest you open a Spring Framework JIRA 'Documentation' issue suggesting additions to the JavaDoc and/or a warning in the reference documentation, and/or emitting a WARN log when a caching connection factory with cached consumers is used.



                            • #15
                              JIRA ticket logged. Our dev team discussed this issue today and could not come up with a scenario where you would want to use a cached consumer. We dropped the caching from our app.

                              Thanks much!

