  • JMS CachingConnectionFactory starts more consumers than configured

    Hi all,

    I have a problem in my application, which uses Spring Framework 3.1.0.RELEASE and Spring JMS.
    I want exactly one consumer per queue.
    Because I have many queues, I want to use a caching class for connections and sessions, and preferably for consumers as well.
    I am using JNDI with GlassFish 3.1.2 to look up the JMS connections and queues.
    In GlassFish I configured the ActiveMQ connection pool and the queues.
    Here is the configuration:

    Code:
      <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
        <property name="environment">
          <props>
            <prop key="java.naming.factory.initial">${jndi.NamingFactory}</prop>
            <prop key="java.naming.provider.url">${jndi.NamingProviderURL}</prop>
            <prop key="java.naming.factory.url.pkgs">com.sun.enterprise.naming</prop>
          </props>
        </property>
      </bean>
    
      <bean id="jmsConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory" >
        <property name="targetConnectionFactory">
          <bean id="jmsJndiConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
            <property name="jndiTemplate" ref="jndiTemplate"/>
            <property name="jndiName" value="${jms.jndi.ConnectionFactory}"/>
            <property name="proxyInterface" value="javax.jms.ConnectionFactory"/>
            <property name="cache" value="true" />
            <property name="lookupOnStartup" value="true"/>  
          </bean>
        </property>
        <property name="cacheConsumers" value="false"/>
        <property name="cacheProducers" value="false"/> 
      </bean>
    
      <bean id="destinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver"/>
    
      <bean id="pageUpdateSlaveMessageListener" class="com.tsystems.ac.fids.web.server.jms.JMSMessageReceiver">
        <property name="eventClass" value="com.tsystems.ac.fids.web.server.business.event.PageUpdateEvent"/>
        <property name="messageClass" value="com.tsystems.ac.fids.web.server.interfaces.pojoxml.Command"/>
        <property name="messageConverter" ref="jmsMasterMessageConverter" />
      </bean>
    
      <bean id="configurationUpdateSlaveMessageListener" class="com.tsystems.ac.fids.web.server.jms.JMSMessageReceiver">
        <property name="eventClass" value="com.tsystems.ac.fids.web.server.business.event.ConfigurationUpdateEvent"/>
        <property name="messageClass" value="com.tsystems.ac.fids.web.server.interfaces.pojoxml.Command"/>
        <property name="messageConverter" ref="jmsMasterMessageConverter" />
      </bean>
      
      <bean id="deviceUpdateSlaveMessageListener" class="com.tsystems.ac.fids.web.server.jms.JMSMessageReceiver">
        <property name="eventClass" value="com.tsystems.ac.fids.web.server.business.event.DeviceCommandEvent"/>
        <property name="messageClass" value="com.tsystems.ac.fids.web.server.interfaces.pojoxml.Command"/>
        <property name="messageConverter" ref="jmsMasterMessageConverter" />
      </bean>
    
      <bean id="executorAsync" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
      <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="entityManagerFactory" ref="entityManagerFactory"/>
      </bean>
      <!--
       ...
       to hide complexity I took out all the persistence beans except the JTA Transaction Manager.
      -->
     
      <jms:listener-container task-executor="executorAsync" connection-factory="jmsConnectionFactory" container-type="default"
                              destination-resolver="destinationResolver" transaction-manager="transactionManager" concurrency="1-1">
          <jms:listener destination="${jms.jndi.pageCommandSlaveDestination}" ref="pageUpdateSlaveMessageListener" /> 
          <jms:listener destination="${jms.jndi.configurationCommandSlaveDestination}" ref="configurationUpdateSlaveMessageListener" />
          <jms:listener destination="${jms.jndi.deviceCommandSlaveDestination}" ref="deviceUpdateSlaveMessageListener" />
      </jms:listener-container>
    
    
    Here is the consumer class:
    
      public class JMSMessageReceiver implements MessageListener {
    	private static final Logger log = LoggerFactory
    			.getLogger(JMSMessageReceiver.class);
    
    	private MessageConverter messageConverter;
    
    	@Autowired
    	ApplicationContext applicationContext;
    
    	private Class messageClass;
    	private Class eventClass;
    	private Class textEventClass = null;
    
    
    
    	/**
    	 * Will be called by Spring for each message.
    	 */
    	@Override
    	@Transactional
    	public void onMessage(Message message) {
    		if (message == null) {
    			return;
    		}
    		try {
    			log.info("{} - Received JMS telegram: {}", message.getJMSDestination(), message.getJMSMessageID());
    			if (message instanceof TextMessage) {
    				String text = ((TextMessage) message).getText();
    				log.debug("--- text: {}", text);
    				if (messageConverter == null) {
    					processText(text);
    				} else {
    					try {
    						Object mess = messageConverter.fromMessage(message);
    						mess = messageClass.cast(mess);
    						processMessage(mess);
    					} catch (ClassCastException e) {
    						log.error("Received object, but parsing failed: "+ message, e);
    						processText(text);
    					} catch (Exception e) {
    						log.error("Received object, but parsing failed: "+ message, e);
    						processText(text);
    					}
    				}
    			} else if (message instanceof BytesMessage) {
    				Object mess = messageConverter.fromMessage(message);
    				log.debug("--- object, expecting {}",
    						messageClass.getName());
    				mess = messageClass.cast(mess);
    				processMessage(mess);
    			}
    		} catch (ClassCastException e) {
    			throw new RuntimeException("Message type is incorrect");
    		} catch (Exception e) {
    			throw new RuntimeException(e);
    		}
    	}
    
    	// ...
    	// To reduce complexity I took out some code.
    }
    
    Here is my log file from GlassFish:
    
    [#|2012-06-20T15:38:27.760+0200|INFO|glassfish3.1.2|javax.enterprise.system.std.com.sun.enterprise.server.logging|_ThreadID=18;_ThreadName=SimpleAsyncTaskExecutor-2;|15:38:27.758 [SimpleAsyncTaskExecutor-2] DEBUG o.s.t.jta.JtaTransactionManager - Initiating transaction commit
    |#]
    
    [#|2012-06-20T15:38:27.760+0200|INFO|glassfish3.1.2|javax.enterprise.system.std.com.sun.enterprise.server.logging|_ThreadID=18;_ThreadName=SimpleAsyncTaskExecutor-2;|15:38:27.760 [SimpleAsyncTaskExecutor-2] DEBUG o.s.t.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#1]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
    |#]
    
    [#|2012-06-20T15:38:27.761+0200|INFO|glassfish3.1.2|javax.enterprise.system.std.com.sun.enterprise.server.logging|_ThreadID=18;_ThreadName=SimpleAsyncTaskExecutor-2;|15:38:27.761 [SimpleAsyncTaskExecutor-2] DEBUG o.s.j.c.CachingConnectionFactory - Creating cached JMS Session for mode 1: ManagedSessionProxy { ActiveMQSession {id=ID:W4DE3MSY4167457-55011-1340199497309-0:1:10,started=true} }
    |#]
    
    [#|2012-06-20T15:38:27.772+0200|INFO|glassfish3.1.2|javax.enterprise.system.std.com.sun.enterprise.server.logging|_ThreadID=19;_ThreadName=SimpleAsyncTaskExecutor-3;|15:38:27.772 [SimpleAsyncTaskExecutor-3] DEBUG o.a.activemq.ActiveMQMessageConsumer - remove: ID:W4DE3MSY4167457-55011-1340199497309-0:1:4:7, lastDeliveredSequenceId:0
    |#]

    Everything in the application works correctly with one exception: the CachingConnectionFactory sometimes starts 2 consumers instead of the 1 that I configured in the jms:listener-container.
    My configuration says concurrency="1-1", which means one consumer is started at the beginning and at most one is allowed.
    Most of the time I have one consumer per queue, but sometimes 2 are started. I never get more than 2 consumers per queue.
    This happens even when the queue contains no messages.
    As you can see, I disabled consumer caching in the CachingConnectionFactory. If I don't, it gets worse: I have 2 consumers per queue almost all the time.
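
    For comparison, here is a rough sketch of how the same limit could be written as a plain bean definition for one of the three queues. It assumes that concurrency="1-1" corresponds to the concurrentConsumers and maxConcurrentConsumers properties of DefaultMessageListenerContainer. The bean id pageUpdateSlaveContainer is made up for illustration, and the namespace element may apply further defaults that are not reproduced here.

    ```xml
    <!-- Sketch only: explicit container for one queue; the bean id is hypothetical -->
    <bean id="pageUpdateSlaveContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
      <property name="connectionFactory" ref="jmsConnectionFactory"/>
      <property name="destinationResolver" ref="destinationResolver"/>
      <property name="destinationName" value="${jms.jndi.pageCommandSlaveDestination}"/>
      <property name="messageListener" ref="pageUpdateSlaveMessageListener"/>
      <property name="taskExecutor" ref="executorAsync"/>
      <property name="transactionManager" ref="transactionManager"/>
      <!-- lower bound: consumers started initially -->
      <property name="concurrentConsumers" value="1"/>
      <!-- upper bound: the container should not scale beyond this -->
      <property name="maxConcurrentConsumers" value="1"/>
    </bean>
    ```

    Writing the container out like this mainly makes the two bounds visible. It is not expected to behave differently from the namespace version.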

    The side effect of this problem is that I cannot receive all messages from the queue. The messages are in the queue, but neither of the 2 consumers receives them.
    I check the number of consumers with the ActiveMQ web administration console.

    I have read that with the CachingConnectionFactory consumers and JMS sessions are not always closed, which produces leaks.
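
    For context, the sketch below lists the cache-related properties that CachingConnectionFactory exposes. The values shown are the documented defaults, not the values used in the configuration above. It assumes the JNDI lookup bean jmsJndiConnectionFactory is declared as a top-level bean so that it can be referenced. Whether changing any of these settings affects the extra consumer has not been established.

    ```xml
    <!-- Sketch only: assumes jmsJndiConnectionFactory is a top-level bean -->
    <bean id="jmsConnectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
      <property name="targetConnectionFactory" ref="jmsJndiConnectionFactory"/>
      <!-- maximum number of cached sessions per acknowledge mode; default is 1 -->
      <property name="sessionCacheSize" value="1"/>
      <!-- both flags default to true; the configuration above sets them to false -->
      <property name="cacheProducers" value="true"/>
      <property name="cacheConsumers" value="true"/>
    </bean>
    ```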


    Is this the cause?
    Any idea what is wrong?
    Is there a workaround for this problem?

    In my opinion it is a concurrency bug in the CachingConnectionFactory and/or in the DefaultMessageListenerContainer class, assuming of course that my configuration is correct.

    PS: Replacing CachingConnectionFactory with org.apache.activemq.pool.PooledConnectionFactory solves the problem, but I don't want any dependency on ActiveMQ in the project.
    If nobody can help me, I will open a bug against Spring JMS for CachingConnectionFactory.

    Code:
      <bean id="jmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL">
              <value>tcp://localhost:61616</value>
            </property>
          </bean>
        </property>
      </bean>
    Last edited by vasim; Jun 21st, 2012, 11:07 AM.