Hi all,

I have a problem in my application using Spring Framework 3.1.0.RELEASE and Spring JMS.
I want in my application to have only one consumer per queue.
Because I have many queues I want to use a caching class for connections/sessions/ preferable for consumers.
I am using JNDI with glassfish 3.1.2 Release for JMS Connections and Queues.
In glassfish I configured the ActiveMQ Connection Pool and the queues.
See the configuration:

Code:
  <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
    <property name="environment">
      <props>
        <prop key="java.naming.factory.initial">${jndi.NamingFactory}</prop>
        <prop key="java.naming.provider.url">${jndi.NamingProviderURL}</prop>
        <prop key="java.naming.factory.url.pkgs">com.sun.enterprise.naming</prop>
      </props>
    </property>
  </bean>

  <bean id="jmsConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory" >
    <property name="targetConnectionFactory">
      <bean id="jmsJndiConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
        <property name="jndiTemplate" ref="jndiTemplate"/>
        <property name="jndiName" value="${jms.jndi.ConnectionFactory}"/>
        <property name="proxyInterface" value="javax.jms.ConnectionFactory"/>
        <property name="cache" value="true" />
        <property name="lookupOnStartup" value="true"/>  
      </bean>
    </property>
    <property name="cacheConsumers" value="false"/>
    <property name="cacheProducers" value="false"/> 
  </bean>

  <bean id="destinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver"/>

    <bean id="pageUpdateSlaveMessageListener" class="com.tsystems.ac.fids.web.server.jms.JMSMessageReceiver">
    <property name="eventClass" value="com.tsystems.ac.fids.web.server.business.event.PageUpdateEvent"/>
    <property name="messageClass" value="com.tsystems.ac.fids.web.server.interfaces.pojoxml.Command"/>
    <property name="messageConverter" ref="jmsMasterMessageConverter" />
  </bean>

     <bean id="configurationUpdateSlaveMessageListener" class="com.tsystems.ac.fids.web.server.jms.JMSMessageReceiver">
    <property name="eventClass" value="com.tsystems.ac.fids.web.server.business.event.ConfigurationUpdateEvent"/>
    <property name="messageClass" value="com.tsystems.ac.fids.web.server.interfaces.pojoxml.Command"/>
    <property name="messageConverter" ref="jmsMasterMessageConverter" />
  </bean>
  
  <bean id="deviceUpdateSlaveMessageListener" class="com.tsystems.ac.fids.web.server.jms.JMSMessageReceiver">
    <property name="eventClass" value="com.tsystems.ac.fids.web.server.business.event.DeviceCommandEvent"/>
    <property name="messageClass" value="com.tsystems.ac.fids.web.server.interfaces.pojoxml.Command"/>
    <property name="messageConverter" ref="jmsMasterMessageConverter" />
  </bean>

   <bean id="executorAsync" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
   <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
      <property name="entityManagerFactory" ref="entityManagerFactory"/>
   </bean>
  <!--
   ...
   to hide complexity I took out all the persistence beans except the JTA Transaction Manager.
  -->
 
  <jms:listener-container task-executor="executorAsync" connection-factory="jmsConnectionFactory" container-type="default"
                          destination-resolver="destinationResolver" transaction-manager="transactionManager" concurrency="1-1">
      <jms:listener destination="${jms.jndi.pageCommandSlaveDestination}" ref="pageUpdateSlaveMessageListener" /> 
      <jms:listener destination="${jms.jndi.configurationCommandSlaveDestination}" ref="configurationUpdateSlaveMessageListener" />
      <jms:listener destination="${jms.jndi.deviceCommandSlaveDestination}" ref="deviceUpdateSlaveMessageListener" />
  </jms:listener-container>


Here the consumer class:

  public class JMSMessageReceiver implements MessageListener {
	private static final Logger log = LoggerFactory
			.getLogger(JMSMessageReceiver.class);

	private MessageConverter messageConverter;

	@Autowired
	ApplicationContext applicationContext;

	private Class messageClass;
	private Class eventClass;
	private Class textEventClass = null;



	/**
	 * Will be called by Spring for each message.
	 */
	@Override
	@Transactional
	public void onMessage(Message message) {
		if (message == null) {
			return;
		}
		try {
			log.info("{} - Received JMS telegram: {}", message.getJMSDestination(), message.getJMSMessageID());
			if (message instanceof TextMessage) {
				String text = ((TextMessage) message).getText();
				log.debug("--- text: {}", text);
				if (messageConverter == null) {
					processText(text);
				} else {
					try {
						Object mess = messageConverter.fromMessage(message);
						mess = messageClass.cast(mess);
						processMessage(mess);
					} catch (ClassCastException e) {
						log.error("Received object, but parsing failed: "+ message, e);
						processText(text);
					} catch (Exception e) {
						log.error("Received object, but parsing failed: "+ message, e);
						processText(text);
					}
				}
			} else if (message instanceof BytesMessage) {
				Object mess = messageConverter.fromMessage(message);
				log.debug("--- object, expecting {}",
						messageClass.getName());
				mess = messageClass.cast(mess);
				processMessage(mess);
			}
		} catch (ClassCastException e) {
			throw new RuntimeException("Message type is incorrect");
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

   <!-- 
    ...
    for less complexity I took out some code -->
}

Here is my log gile from glassfish:

 [#|2012-06-20T15:38:27.760+0200|INFO|glassfish3.1.2|javax.enterprise.system.std.
com.sun.enterprise.server.logging|_ThreadID=18;_ThreadName=SimpleAsyncTaskExecut
or-2;|15:38:27.758 [SimpleAsyncTaskExecutor-2] DEBUG o.s.t.jta.JtaTransactionMan
ager - Initiating transaction commit
|#]

[#|2012-06-20T15:38:27.760+0200|INFO|glassfish3.1.2|javax.enterprise.system.std.
com.sun.enterprise.server.logging|_ThreadID=18;_ThreadName=SimpleAsyncTaskExecut
or-2;|15:38:27.760 [SimpleAsyncTaskExecutor-2] DEBUG o.s.t.jta.JtaTransactionMan
ager - Creating new transaction with name [org.springframework.jms.listener.Defa
ultMessageListenerContainer#1]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
|#]

[#|2012-06-20T15:38:27.761+0200|INFO|glassfish3.1.2|javax.enterprise.system.std.
com.sun.enterprise.server.logging|_ThreadID=18;_ThreadName=SimpleAsyncTaskExecut
or-2;|15:38:27.761 [SimpleAsyncTaskExecutor-2] DEBUG o.s.j.c.CachingConnectionFa
ctory - Creating cached JMS Session for mode 1: ManagedSessionProxy { ActiveMQSe
ssion {id=ID:W4DE3MSY4167457-55011-1340199497309-0:1:10,started=true} }
|#]

[#|2012-06-20T15:38:27.772+0200|INFO|glassfish3.1.2|javax.enterprise.system.std.
com.sun.enterprise.server.logging|_ThreadID=19;_ThreadName=SimpleAsyncTaskExecut
or-3;|15:38:27.772 [SimpleAsyncTaskExecutor-3] DEBUG o.a.activemq.ActiveMQMessag
eConsumer - remove: ID:W4DE3MSY4167457-55011-1340199497309-0:1:4:7, lastDelivere
dSequenceId:0
|#]
Not everything works corectly in the application with one Exception, the CachingConnectionFactory it starts sometimes 2 consumers instead of 1, like I configured in the jms:listener-container.
My configuration says :
concurrency="1-1" this means one consumer started at the beginning and maximal one.
Most of the times I have one consumer per queue but sometimes are just 2 started, I never get more then 2 consumers per queue.
I mean in the queue I have no messages and still I get 2 consumers.
As you can see I disabled the consumers cache from the CachingConnectionFactory, If I dont do it then it is worst, having almost all the times 2 consumers per queue.

The side efect of this problem it is that I cannot read/receive all messages from the queue. The messages are in the queue but no one of the 2 consumers are reading/receiving the messages.
I check the number of consumers with the ActiveMQ Web Administrator app.

I read something about the CachingConnectionFactory that the consumers/JMS Sessions are not closed and leaks are produces.


It is this the cause ?
Any idea what is wrong ?
Can I find a work around for this problem ?

In my opinion it is a concurency bug in the CachingConnectionFactory and/or in the DefaultMessageListenerContainer class if of course my configuration is correct.

PS: Replacing CachingConnectionFactory with activemq.pool.PooledConnectionFactory will solve the problem. But I don't want to have any dependency to ActiveMQ in the project.
If nobody can help me I will open a bug in JMS Spring for CachingConnectionFactory.

Code:
  <bean id="jmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory">
    <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
          <value>tcp://localhost:61616</value>
        </property>
      </bean>
    </property>
  </bean>