ActiveMQ JMS Topic and Subscribers setup and visible in JMX but messages not consumed

  • ActiveMQ JMS Topic and Subscribers setup and visible in JMX but messages not consumed

    Hi Guys,

    I have an ActiveMQ JMS topic with 3 subscribers set up. Both are visible in JMX, but messages are never consumed from the topic.

    Below I've extracted the parts of the configuration that I think matter, where I've tried to set up an ActiveMQ topic with 3 listeners consuming messages from it (the full XML config is at the end of the post).

    I believe I can see the listeners, and the messages are enqueued on the topic, but they are never consumed.

    The classes configured as listeners all implement the MessageListener interface, and I've also tried adding one as an MDP (message-driven POJO). I can see the classes being initialised; that code is also included at the end.


    Here are the elements from the config XML that set up the listeners:

    Code:
        <jms:listener-container container-type="default" 
                                connection-factory="jmsConsumerConnectionFactory"
                                acknowledge="auto" 
                                destination-type="topic">
            <jms:listener destination="smarter.jms.topic" ref="asyncFCPolicyHandler" subscription="subscription"/>
            <jms:listener destination="smarter.jms.topic" ref="asyncTPOPolicyHandler" />
        </jms:listener-container>
    
        <bean id="mdpMessageListener" class="ie.smarter.jms.AsyncFCPolicyHandler" />


    I've used a little tool to inspect the JMX console. I can see that the topic is set up and that it has 3 subscribers, but messages are never taken off the topic: "EnqueueCount" is 100 while "DequeueCount" stays at 0.

    Any help on where I've gone wrong is appreciated! I'm sure it's something dumb I'm doing.

    Here's the output from the JMX console using JMXTerm (I changed the order of the 'get *' output for easier reading):

    Code:
    Oisin-Kims-MacBook-Air:SpringJMS oisin$ java -jar jmxterm-1.0-alpha-4-uber.jar 
    Welcome to JMX terminal. Type "help" for available commands.
    
    $>jvms
    9712     ( ) - org.eclipse.jdt.internal.junit.runner.RemoteTestRunner -version 3 -port 54604 -testLoaderClass org.eclipse.jdt.internal.junit4.runner.JUnit4TestLoader -loaderpluginname org.eclipse.jdt.junit4.runtime -classNames ie.smarter.jms.JmsMessageListenerTest
    9693     ( ) - jmxterm-1.0-alpha-4-uber.jar
    
    $>open 9712
    #Connection to 9712 is opened
    
    $>domain org.apache.activemq
    #domain is set to org.apache.activemq
    
    $>bean
    org.apache.activemq:BrokerName=localhost,Destination=smarter.jms.topic,Type=Topic
    
    $>get *    
    #mbean = org.apache.activemq:BrokerName=localhost,Destination=smarter.jms.topic,Type=Topic:
    Name = smarter.jms.topic;
    
    ConsumerCount = 3;
    
    EnqueueCount = 100;
    
    QueueSize = 100;
    
    Subscriptions = [ org.apache.activemq:BrokerName=localhost,Type=Subscription,persistentMode=Non-Durable,destinationType=Topic,destinationName=smarter.jms.topic,clientId=ID_Oisin-Kims-MacBook-Air.local-54612-1319298591637-3_1,consumerId=ID_Oisin-Kims-MacBook-Air.local-54612-1319298591637-2_1_3_1, org.apache.activemq:BrokerName=localhost,Type=Subscription,persistentMode=Non-Durable,destinationType=Topic,destinationName=smarter.jms.topic,clientId=ID_Oisin-Kims-MacBook-Air.local-54612-1319298591637-3_1,consumerId=ID_Oisin-Kims-MacBook-Air.local-54612-1319298591637-2_1_2_1, org.apache.activemq:BrokerName=localhost,Type=Subscription,persistentMode=Non-Durable,destinationType=Topic,destinationName=smarter.jms.topic,clientId=ID_Oisin-Kims-MacBook-Air.local-54612-1319298591637-3_1,consumerId=ID_Oisin-Kims-MacBook-Air.local-54612-1319298591637-2_1_1_1 ];
    
    ProducerFlowControl = true;
    
    MemoryPercentUsage = 0;
    
    MemoryLimit = 67108864;
    
    MaxProducersToAudit = 1024;
    
    MaxAuditDepth = 2048;
    
    MaxPageSize = 200;
    
    UseCache = true;
    
    DispatchCount = 0;
    
    DequeueCount = 0;
    
    InFlightCount = 0;
    
    ExpiredCount = 0;
    
    ProducerCount = 0;
    
    MemoryUsagePortion = 1.0;
    
    MaxEnqueueTime = 0;
    
    MinEnqueueTime = 0;
    
    AverageEnqueueTime = 0.0;
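
    The same counters can also be read from Java through the standard JMX API rather than JMXTerm. The following is a minimal sketch, not part of the original post: the class TopicCounters and the stand-in FakeTopic MBean are hypothetical names added so the example runs without a broker, and the attribute names are the ones shown in the 'get *' output above.

    ```java
    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;

    class TopicCounters {

        // Hypothetical stand-in for the broker's topic MBean, so the sketch
        // runs without ActiveMQ. The values mirror the JMXTerm output above.
        public interface FakeTopicMBean {
            long getConsumerCount();
            long getEnqueueCount();
            long getDispatchCount();
            long getDequeueCount();
        }

        public static class FakeTopic implements FakeTopicMBean {
            public long getConsumerCount() { return 3; }
            public long getEnqueueCount()  { return 100; }
            public long getDispatchCount() { return 0; }
            public long getDequeueCount()  { return 0; }
        }

        // Reads the four counters that matter for this problem and joins
        // them into one line, e.g. "ConsumerCount=3, EnqueueCount=100, ...".
        static String summarize(MBeanServerConnection conn, ObjectName topic) throws Exception {
            String[] attrs = {"ConsumerCount", "EnqueueCount", "DispatchCount", "DequeueCount"};
            StringBuilder sb = new StringBuilder();
            for (String attr : attrs) {
                if (sb.length() > 0) {
                    sb.append(", ");
                }
                sb.append(attr).append('=').append(conn.getAttribute(topic, attr));
            }
            return sb.toString();
        }

        public static void main(String[] args) throws Exception {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            // Same object name as the bean listed by JMXTerm above.
            ObjectName name = new ObjectName(
                "org.apache.activemq:BrokerName=localhost,Destination=smarter.jms.topic,Type=Topic");
            // Assumption: no real broker is running in this JVM, so register the stand-in.
            if (!server.isRegistered(name)) {
                server.registerMBean(new FakeTopic(), name);
            }
            System.out.println(summarize(server, name));
        }
    }
    ```

    With an embedded broker running in the same JVM, the registration step would be unnecessary, since the broker registers its own MBean under that name. For a broker in another JVM, the MBeanServerConnection would have to come from a remote JMX connector instead of the platform MBean server.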

    Here's my full XML config:

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:p="http://www.springframework.org/schema/p"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:jms="http://www.springframework.org/schema/jms"
           xmlns:amq="http://activemq.apache.org/schema/core"
           xsi:schemaLocation="http://www.springframework.org/schema/beans 
                               http://www.springframework.org/schem...ring-beans.xsd
                               http://www.springframework.org/schema/context 
                               http://www.springframework.org/schem...ng-context.xsd
                               http://www.springframework.org/schema/jms 
                               http://www.springframework.org/schem...spring-jms.xsd
                               http://activemq.apache.org/schema/core 
                               http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd">
     
        <context:component-scan base-package="ie.smarter.jms" />
    
        <!--  Embedded ActiveMQ Broker -->
        <amq:broker id="broker" useJmx="true" persistent="false">
            <amq:transportConnectors>
                <amq:transportConnector uri="tcp://localhost:0" />
            </amq:transportConnectors>
        </amq:broker>
    
        <!--  ActiveMQ Destination  -->
        <amq:topic id="topic.destination" physicalName="smarter.jms.topic" name="smarter.jms.topic"/>
        
        <!-- JMS ConnectionFactory to use, configuring the embedded broker using XML -->
        <amq:connectionFactory id="jmsFactory" brokerURL="vm://localhost" />
    
        <!-- JMS Producer Configuration -->
        <bean id="jmsProducerConnectionFactory" 
              class="org.springframework.jms.connection.SingleConnectionFactory"
              depends-on="broker"
              p:targetConnectionFactory-ref="jmsFactory"
             />
    
        <bean id="jmsProducerTemplate" class="org.springframework.jms.core.JmsTemplate"
              p:connectionFactory-ref="jmsProducerConnectionFactory"
              p:defaultDestination-ref="topic.destination"
              p:pubSubDomain ="true"
              />
    
        <!-- JMS Consumer Configuration -->
        <bean id="jmsConsumerConnectionFactory" 
              class="org.springframework.jms.connection.SingleConnectionFactory"
              depends-on="broker"
              p:targetConnectionFactory-ref="jmsFactory" />
                 
        <jms:listener-container container-type="default" 
                                connection-factory="jmsConsumerConnectionFactory"
                                acknowledge="auto" 
                                destination-type="topic">
            <jms:listener destination="smarter.jms.topic" ref="asyncFCPolicyHandler" subscription="subscription"/>
            <jms:listener destination="smarter.jms.topic" ref="asyncTPOPolicyHandler" />
        </jms:listener-container>
        
        
        <!-- Adding a 3rd Message Driven POJO (MDP) listener -->
        <bean id="mdpMessageListener" class="ie.smarter.jms.AsyncFCPolicyHandler" />
    
        <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="jmsConsumerConnectionFactory"/>
            <property name="destination" ref="topic.destination"/>
            <property name="messageListener" ref="mdpMessageListener" />
            <property name="pubSubDomain" value="true" />  
        </bean>
    
        <!-- Counter for consumer to increment and test to verify count -->
        <bean id="counter" class="java.util.concurrent.atomic.AtomicInteger" />
        
    </beans>

    Here's a sample listener class, which I also tried as an MDP:

    Code:
    package ie.smarter.jms;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumes messages from a JMS topic.
     * 
     * @author Oisin Kim
     */
    @Component
    public class AsyncFCPolicyHandler implements MessageListener { 
    
        private static final Logger logger = LoggerFactory.getLogger(AsyncFCPolicyHandler.class);
    
        @Autowired
        private AtomicInteger counter = null;
        
        static {
            logger.info("AsyncFCPolicyHandler initialised");
        }
    
        /**
         * Implementation of <code>MessageListener</code>.
         */
        public void onMessage(Message message) {
            try {
                long policyId = message.getLongProperty(JmsMessageProducer.POLICY_ID);
                if (logger.isInfoEnabled()) {
                    logger.info("(1) Handling FC request to process message for Policy [" + policyId + "]");
                }
    
                // Count each consumed message so the test can verify the total
                counter.incrementAndGet();
            } catch (JMSException e) {
                logger.error(e.getMessage(), e);
            }
        }
        
    }

  • #2
    As a note, there was nothing wrong with the sample code; the problem was in the testing!
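
    The thread does not record what exactly was wrong with the test. One common pitfall with asynchronous listeners, offered here purely as an assumption, is reading the counter before the listener threads have had a chance to run. A minimal sketch of waiting with a CountDownLatch follows; AsyncCountCheck and deliverAndWait are hypothetical names, and a single-threaded executor stands in for the listener container so the example needs no broker.

    ```java
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    class AsyncCountCheck {

        // Hands each "message" to another thread, as a listener container
        // would, then waits for all of them before returning the count.
        static int deliverAndWait(int messages, long timeoutMillis) throws InterruptedException {
            AtomicInteger counter = new AtomicInteger();
            CountDownLatch done = new CountDownLatch(messages);
            ExecutorService listenerThread = Executors.newSingleThreadExecutor();
            try {
                for (int i = 0; i < messages; i++) {
                    listenerThread.submit(() -> {
                        counter.incrementAndGet(); // what onMessage does in the post
                        done.countDown();
                    });
                }
                // Block until every message is handled or the timeout passes,
                // instead of reading the counter right after sending.
                boolean allHandled = done.await(timeoutMillis, TimeUnit.MILLISECONDS);
                if (!allHandled) {
                    throw new IllegalStateException("timed out; handled " + counter.get());
                }
                return counter.get();
            } finally {
                listenerThread.shutdownNow();
            }
        }

        public static void main(String[] args) throws InterruptedException {
            System.out.println(deliverAndWait(100, 5000));
        }
    }
    ```

    In a real test the latch would be counted down from the listener's onMessage, and the assertion on the shared AtomicInteger would come only after await returns true.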
