Announcement Announcement Module
Collapse
No announcement yet.
Problem with Durability Subscription Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Problem with Durability Subscription

    Hi,
    I have been trying to integrate Spring + ActiveMQ + Jencks using the help available on the net. My requirement is for the program to have durable and non-durable subscribers on topics and to start listening on demand. I was able to get this working, but now I am facing a situation where my durable subscriber doesn't get the messages that were sent while it was down. My test case was as follows:

    1) Start both the durable and the non-durable subscriber, send one message to each, and check that the ActiveMQ console shows the messages as consumed and the durable subscriber as registered.
    2) Stop the program after step 1 and re-run it, this time without registering any subscribers, but still send messages on the topic the durable subscriber listens on.
    3) Stop the program after step 2 and re-run it, this time registering only the durable subscriber, and check whether the messages sent earlier are received.

    From my observation, at step 3 the durable subscriber comes up but doesn't get any messages. By trial and error, however, I found that when I bring both the durable and the non-durable subscriber up and publish messages as in step 1, both get the messages sent currently as well as the messages sent at step 2.

    I don't know if I am missing anything here. I had to use two Jencks containers because, when I used the code example provided on the Jencks website, the durable subscriber listened on the non-durable subscription and vice versa. All suggestions are welcome.
    Code:
    <!-- Service locator proxy implementing ISpringMsgPublisherFactory. -->
    <bean class="org.springframework.beans.factory.config.ServiceLocatorFactoryBean"
    		id="publisherFactory">
    		<property name="serviceLocatorInterface">
    			<value>com.activemq.spring.jms.interfaces.ISpringMsgPublisherFactory</value>
    		</property>
    	</bean>
    
    	<!--
    		Service locator proxy implementing ISpringMsgSubscriberFactory.
    		Per the original note, the 'subscriber' bean is looked up by *TYPE*.
    	-->
    	<bean class="org.springframework.beans.factory.config.ServiceLocatorFactoryBean"
    		id="subscriberFactory">
    		<property name="serviceLocatorInterface">
    			<value>com.activemq.spring.jms.interfaces.ISpringMsgSubscriberFactory</value>
    		</property>
    	</bean>
    	<!--
    		Pooled connection factory wrapping an ActiveMQ connection factory for
    		the broker at tcp://localhost:61616. Spring invokes stop() on the pool
    		when the bean is destroyed.
    	-->
    	<bean id="jmsConnectionFactory"
    		destroy-method="stop"
    		class="org.apache.activemq.pool.PooledConnectionFactory">
    		<property name="connectionFactory">
    			<bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    				<property name="brokerURL" value="tcp://localhost:61616" />
    			</bean>
    		</property>
    	</bean>
    	<!-- Abstract template definition; inherited by beans that declare parent="base". -->
    	<bean id="base"
    		abstract="true"
    		lazy-init="false"
    		class="com.activemq.activemq.impl.JmsBase" />
    	<!--
    		JmsTemplate that publishes in the pub/sub (topic) domain and uses the
    		topicDestination bean when no destination is given.

    		explicitQosEnabled must be true for JmsTemplate to apply its own
    		deliveryPersistent / priority / timeToLive settings when sending;
    		without it the deliveryPersistent property below is ignored.
    	-->
    	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    		<property name="connectionFactory" ref="jmsConnectionFactory" />
    		<property name="defaultDestination" ref="topicDestination" />
    		<property name="pubSubDomain" value="true" />
    		<property name="explicitQosEnabled" value="true" />
    		<property name="deliveryPersistent" value="true" />
    	</bean>
    	<!-- ActiveMQ topic named LOAD_EXT. -->
    	<bean class="org.apache.activemq.command.ActiveMQTopic" id="topicDestination">
    		<constructor-arg>
    			<value>LOAD_EXT</value>
    		</constructor-arg>
    	</bean>
    	<!-- ActiveMQ topic named EXT_LOAD. -->
    	<bean class="org.apache.activemq.command.ActiveMQTopic" id="durableTopicDestination">
    		<constructor-arg>
    			<value>EXT_LOAD</value>
    		</constructor-arg>
    	</bean>
    	<!-- Listener beans; the connector wrappers refer to them by bean name. -->
    	<bean class="com.activemq.spring.jms.NonDurableMsgListener"
    		id="topicListener" />
    	<bean class="com.activemq.spring.jms.DurableMsgListener"
    		id="durabletopicListener" />
    	<!--
    		Eagerly created bean inheriting from "base". Spring calls init() once the
    		two factories and the process name placeholder have been injected.
    	-->
    	<bean id="jencksSpringJmsTest"
    		parent="base"
    		class="com.activemq.spring.jms.activemqSimulator"
    		lazy-init="false"
    		init-method="init">
    		<property name="msgSubscriberFactory">
    			<ref bean="subscriberFactory" />
    		</property>
    		<property name="msgPublisherFactory">
    			<ref bean="publisherFactory" />
    		</property>
    		<property name="processNm">
    			<value>${com.activemq.process.activeMQ.name}</value>
    		</property>
    	</bean>
    			<!--
    				JMS common services publisher. Prototype scope: a new instance per
    				lookup, constructed from the non-durable topic list, the durable
    				topic list, the string ACTIVEMQ_PUBLISHER and the jmsTemplate bean.
    			-->
    	<bean id="commonServicesPublisher"
    		scope="prototype"
    		class="com.activemq.spring.jms.interfaces.SpringMsgPublisher">
    		<constructor-arg index="0">
    			<ref bean="nonDurableTopicDestinationList" />
    		</constructor-arg>
    		<constructor-arg index="1">
    			<ref bean="durableTopicDestinationList" />
    		</constructor-arg>
    		<constructor-arg index="2" value="ACTIVEMQ_PUBLISHER" />
    		<constructor-arg index="3">
    			<ref bean="jmsTemplate" />
    		</constructor-arg>
    	</bean>
    	<!-- Lazily initialised list holding the durableTopicDestination bean. -->
    	<bean class="org.springframework.beans.factory.config.ListFactoryBean"
    		id="durableTopicDestinationList"
    		lazy-init="true">
    		<property name="sourceList">
    			<list>
    				<ref bean="durableTopicDestination" />
    			</list>
    		</property>
    	</bean>
    	<!-- List holding the topicDestination bean. -->
    	<bean class="org.springframework.beans.factory.config.ListFactoryBean"
    		id="nonDurableTopicDestinationList">
    		<property name="sourceList">
    			<list>
    				<ref bean="topicDestination" />
    			</list>
    		</property>
    	</bean>
    	<!--
    		JMS subscribers. Both are prototype-scoped, and Spring calls init() on
    		each new instance after its properties are set.
    	-->
    	<bean id="subscriber"
    		scope="prototype"
    		init-method="init"
    		class="com.activemq.spring.jms.interfaces.SpringNonDurableMsgSubscriber">
    		<property name="msgConnectorWrapper">
    			<ref bean="listenerConnectorWrapper" />
    		</property>
    		<property name="clientId">
    			<value>ACTIVEMQ_SUBSCRIBER</value>
    		</property>
    	</bean>
    	<bean id="durableSubscriber"
    		scope="prototype"
    		init-method="init"
    		class="com.activemq.spring.jms.interfaces.SpringDurableMsgSubscriber">
    		<property name="msgConnectorWrapper">
    			<ref bean="durableListenerConnectorWrapper" />
    		</property>
    		<property name="clientId">
    			<value>ACTIVEMQ_DURABLE_SUBSCRIBER</value>
    		</property>
    	</bean>
    	
    	<!--  START SNIPPET: jca  -->
    	<!--
    		Two Jencks JCA containers with identical settings (shared transaction
    		manager, thread pool size 25), both backed by the same ActiveMQ
    		resource adapter. One is referenced by the non-durable connector
    		wrapper, the other by the durable one.
    	-->
    	<bean class="org.jencks.JCAContainer" id="jencksNonDurableJCAContainer">
    		<property name="transactionManager">
    			<ref bean="transactionManager" />
    		</property>
    		<property name="threadPoolSize">
    			<value>25</value>
    		</property>
    		<property name="resourceAdapter">
    			<ref bean="activeMQResourceAdapter" />
    		</property>
    	</bean>
    	<bean class="org.jencks.JCAContainer" id="jencksDurableJCAContainer">
    		<property name="transactionManager">
    			<ref bean="transactionManager" />
    		</property>
    		<property name="threadPoolSize">
    			<value>25</value>
    		</property>
    		<property name="resourceAdapter">
    			<ref bean="activeMQResourceAdapter" />
    		</property>
    	</bean>
    	<!-- The JCA resource adapter, connecting to tcp://localhost:61616. -->
    	<bean class="org.apache.activemq.ra.ActiveMQResourceAdapter" id="activeMQResourceAdapter">
    		<property name="serverUrl">
    			<value>tcp://localhost:61616</value>
    		</property>
    	</bean>
    	<!--
    		Connector wrappers. Each one combines the pooled connection factory, a
    		JCA container and an activation spec, and receives the listener's bean
    		name as a plain string value (not as a bean reference).
    	-->
    	<bean class="com.activemq.spring.jms.core.MsgListnerConnectorWrapper"
    		id="listenerConnectorWrapper">
    		<property name="connectionFactory">
    			<ref bean="jmsConnectionFactory" />
    		</property>
    		<property name="jcaContainer">
    			<ref bean="jencksNonDurableJCAContainer" />
    		</property>
    		<property name="activationSpec">
    			<ref bean="topicActivationSpec" />
    		</property>
    		<property name="refListener">
    			<value>topicListener</value>
    		</property>
    	</bean>
    	<bean class="com.activemq.spring.jms.core.MsgListnerConnectorWrapper"
    		id="durableListenerConnectorWrapper">
    		<property name="connectionFactory">
    			<ref bean="jmsConnectionFactory" />
    		</property>
    		<property name="jcaContainer">
    			<ref bean="jencksDurableJCAContainer" />
    		</property>
    		<property name="activationSpec">
    			<ref bean="durableTopicActivationSpec" />
    		</property>
    		<property name="refListener">
    			<value>durabletopicListener</value>
    		</property>
    	</bean>
    	<!-- Transaction manager built by the Jencks factory bean; injected into both JCA containers. -->
    	<bean class="org.jencks.factory.TransactionManagerFactoryBean"
    		id="transactionManager" />
        <!--
        	Activation specs for the two topic subscriptions.

        	Durable: topic EXT_LOAD, client id 101, subscription name
        	JencksDurableSubscriber.
        	NOTE(review): the durableSubscriber bean sets clientId
        	ACTIVEMQ_DURABLE_SUBSCRIBER while this spec uses 101; confirm which
        	id the broker connection actually uses.
        -->
        <bean class="org.apache.activemq.ra.ActiveMQActivationSpec"
        	id="durableTopicActivationSpec">
    		<property name="destination">
    			<value>EXT_LOAD</value>
    		</property>
    		<property name="destinationType">
    			<value>javax.jms.Topic</value>
    		</property>
    		<property name="subscriptionDurability">
    			<value>Durable</value>
    		</property>
    		<property name="clientId">
    			<value>101</value>
    		</property>
    		<property name="subscriptionName">
    			<value>JencksDurableSubscriber</value>
    		</property>
    	</bean>
    	<!--
    		Topic LOAD_EXT. No subscriptionDurability is set, so the resource
    		adapter's default applies (presumably NonDurable; confirm against the
    		ActiveMQ resource adapter documentation).
    	-->
    	<bean class="org.apache.activemq.ra.ActiveMQActivationSpec"
    		id="topicActivationSpec">
    		<property name="destination">
    			<value>LOAD_EXT</value>
    		</property>
    		<property name="destinationType">
    			<value>javax.jms.Topic</value>
    		</property>
    	</bean>
    Thanks
    Shishir
Working...
X