  • JMS listener-container concurrentConsumers Issue

    I'm seeing an issue with concurrent consumption of JMS messages. I've written a simple test which demonstrates what is occurring. It seems that waiting messages are NOT being handed to consumer threads as those threads become free; instead, they are being assigned based on the order in which the threads were originally used.

    Simple example (with concurrentConsumers=3, 5 concurrent requests are sent initially):

    1st consumer/thread starts and sleeps for 10 seconds, completes.
    2nd consumer/thread starts and sleeps for 5 seconds, completes.
    3rd consumer/thread starts and sleeps for 3 seconds, completes.
    4th REQUEST waits for the 1st consumer/thread to complete before being processed
    5th REQUEST waits for the 2nd consumer/thread to complete

    I would expect that the 4th Request would be handled when the 3rd request completes (3rd request completes before 1st and 2nd) and that the 5th Request would be handled when the 2nd Request completes.

    I'm using ActiveMQ 5.4.1, Spring 3.0.4. Following is a log showing the behavior with 10 consumers (and 13 initially queued messages), followed by my configuration.

    This seems pretty basic; am I missing something? This is certainly not the default behavior I would expect.

    --- --- ---

    08:45:03,864 | INFO | ad-1 | uniworks.core.ThreadTester | ThreadTester: Sending Requests
    08:45:03,904 | INFO | er-8 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 15000
    08:45:03,919 | INFO | er-7 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 12000
    08:45:03,949 | INFO | er-3 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 11000
    08:45:03,968 | INFO | er-2 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 10000
    08:45:03,990 | INFO | er-4 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 9000
    08:45:04,017 | INFO | r-10 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 8000
    08:45:04,038 | INFO | er-1 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 7000
    08:45:04,056 | INFO | er-5 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 6000
    08:45:04,084 | INFO | er-6 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 5000
    08:45:04,106 | INFO | er-9 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 4000
    08:45:04,160 | INFO | ad-1 | uniworks.core.ThreadTester | ThreadTester: All Requests Sent
    08:45:08,107 | INFO | er-9 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 4000
    08:45:09,085 | INFO | er-6 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 5000
    08:45:10,056 | INFO | er-5 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 6000
    08:45:11,038 | INFO | er-1 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 7000
    08:45:12,017 | INFO | r-10 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 8000
    08:45:12,990 | INFO | er-4 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 9000
    08:45:13,968 | INFO | er-2 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 10000
    08:45:14,949 | INFO | er-3 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 11000
    08:45:14,954 | INFO | er-3 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 1000
    08:45:15,919 | INFO | er-7 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 12000
    08:45:15,924 | INFO | er-7 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 2000
    08:45:15,954 | INFO | er-3 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 1000
    08:45:17,924 | INFO | er-7 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 2000
    08:45:18,904 | INFO | er-8 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 15000
    08:45:18,908 | INFO | er-8 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 3000
    08:45:21,909 | INFO | er-8 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 3000

    --- --- ---

    <jms:listener-container
        container-type="default"
        connection-factory="pooledConnectionFactory"
        acknowledge="auto"
        concurrency="10-10">
      <jms:listener destination="springThreadTestService" ref="threadTestServiceImpl" method="onMessage" />
    </jms:listener-container>


    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
      <property name="brokerURL" value="${jms.url}"/>
    </bean>

    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
      <property name="maxConnections" value="25"/>
      <property name="maximumActive" value="500"/>
      <property name="connectionFactory" ref="jmsConnectionFactory"/>
    </bean>

  • #2
    I think you *might* be seeing prefetch behavior, i.e. the 4th request was actually prefetched by that first consumer. Can you run this same example with a prefetch of 0?
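
To make the prefetch idea concrete, here is a toy model of the 3-consumer example from the first post. It is only a sketch and not ActiveMQ's actual dispatch code: the class and method names are made up for illustration, time is in seconds, and the durations of the 4th and 5th requests (4 and 1) are assumed, since the original post does not give them. With prefetch, the model hands messages to consumers round-robin up front, so a message can sit behind a slow consumer. Without prefetch, each message goes to whichever consumer frees up first.

```java
import java.util.Arrays;

// Toy model only: names and dispatch rules are assumptions for illustration.
class PrefetchSketch {

    // Prefetch-like behavior: messages are assigned round-robin up front,
    // so message i waits for consumer (i % consumers) even if others are idle.
    static long[] startTimesWithPrefetch(int consumers, long[] durations) {
        long[] freeAt = new long[consumers];   // time at which each consumer is next idle
        long[] start = new long[durations.length];
        for (int i = 0; i < durations.length; i++) {
            int c = i % consumers;
            start[i] = freeAt[c];
            freeAt[c] += durations[i];
        }
        return start;
    }

    // No prefetch: each message goes to the consumer that becomes idle first.
    static long[] startTimesWithoutPrefetch(int consumers, long[] durations) {
        long[] freeAt = new long[consumers];
        long[] start = new long[durations.length];
        for (int i = 0; i < durations.length; i++) {
            int c = 0;
            for (int k = 1; k < consumers; k++) {
                if (freeAt[k] < freeAt[c]) {
                    c = k;                     // earliest-idle consumer wins
                }
            }
            start[i] = freeAt[c];
            freeAt[c] += durations[i];
        }
        return start;
    }

    public static void main(String[] args) {
        long[] durations = {10, 5, 3, 4, 1};   // last two values are assumed
        System.out.println(Arrays.toString(startTimesWithPrefetch(3, durations)));
        System.out.println(Arrays.toString(startTimesWithoutPrefetch(3, durations)));
    }
}
```

In this model the 4th request starts at t=10 with prefetch (stuck behind the 10-second consumer) but at t=3 without it, which is the difference between what was observed and what was expected.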



    • #3
      I tried setting prefetch="0" on the listener container, but that was not allowed; it threw a "maxMessagesPerTask can't be 0" exception. I also tried prefetch="1", which didn't affect how it executed.

      Prefetch seemed like a likely candidate. Could the cause be that the container is not configured with a TransactionManager?

      Any other ideas or areas to look at?



      • #4
        Sorry, I was actually thinking about the ActiveMQ prefetch setting - not the value on the MessageListener container.



        • #5
          solved

          Thanks! Setting the prefetch to 0 on the ActiveMQ JMS URI did fix the issue. It also made some of the confusing startup sequence go away.

          tcp://localhost:61616?jms.prefetchPolicy.all=0
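
If the broker URL is assembled in code rather than typed into a properties file, something like the following could append the option. This is a minimal sketch: `BrokerUrlSketch` and `withPrefetch` are hypothetical names, and it assumes a plain `tcp://` style URL like the one above (composite URLs such as failover ones would need different handling). The option name `jms.prefetchPolicy.all` is the one used in the URL above.

```java
// Hypothetical helper, shown only to illustrate the URL format used above.
class BrokerUrlSketch {

    // Appends the prefetch option, using '&' if the URL already has a query string.
    static String withPrefetch(String brokerUrl, int prefetch) {
        if (prefetch < 0) {
            throw new IllegalArgumentException("prefetch must be >= 0");
        }
        String separator = brokerUrl.contains("?") ? "&" : "?";
        return brokerUrl + separator + "jms.prefetchPolicy.all=" + prefetch;
    }

    public static void main(String[] args) {
        System.out.println(withPrefetch("tcp://localhost:61616", 0));
    }
}
```

The resulting string is what would go into the `${jms.url}` property referenced by the `brokerURL` setting in the configuration from the first post.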



          • #6
            Glad to hear it! That's a common source of confusion. Of course, you might want to consider different prefetch values now that you understand what's happening when prefetch is enabled.
