  • Durable Topic with jms:publish-subscribe-channel

    First let me say I really enjoyed working with Spring Integration, along with ActiveMQ. Good Stuff.

    I am prototyping an application with Spring Integration 2.0.0.M1 and ActiveMQ 5.3. Is there a way to use the nice jms:publish-subscribe-channel with a durable topic? I did not see any obvious configuration options for the JMS-backed channel, so I started defining a MessageListenerContainer and related beans myself. That did appear to create a durable consumer; however, the explicitly defined MessageListenerContainer seemed to have no effect on the JMS-backed channel.

    Oh, the JMS backed channel worked nicely with a regular non-durable ActiveMQ topic.

    Any pointers will be much appreciated.

    -Feng

  • #2
    Figured it out using the message-driven inbound adapter

    I finally got it to work, after shifting priorities took me to some other projects since I last posted the question above. I'll document my configuration in case anyone is interested.

    First of all, here is my consumer configuration that did not work, from a durable consumer standpoint:

    Code:
      <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory">
          <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616"/>
          </bean>
        </property>
        <property name="sessionCacheSize" value="10"/>
        <property name="cacheProducers" value="false"/>
        <property name="clientId" value="myDurableConsumer.instance.01"/>
      </bean>
    
      <!-- jms backed channel works nicely however cannot be made durable with spring integration 2.0.0.M1 -->
      <jms:publish-subscribe-channel id="accounts" topic-name="topic.categorized.events" />
    
      <si:chain input-channel="accounts" output-channel="accountsOut">
        <si:transformer id="accountTransformer" ref="accountTransformerBean"/>
        <si:filter ref="myFilter" method="accept"/>
      </si:chain>
    The above configuration does create a consumer; however, it is not durable. From what I can find in the docs and online, it does not seem possible to configure a durable subscriber this way.

    What worked is the message-driven inbound adapter, which I can hook up to a DefaultMessageListenerContainer, like so:

    Code:
      <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory">
          <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616"/>
          </bean>
        </property>
        <property name="sessionCacheSize" value="10"/>
        <property name="cacheProducers" value="false"/>
        <property name="clientId" value="myDurableConsumer.instance.01"/>
      </bean>
    
      <bean id="defaultMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="concurrentConsumers" value="1"/>
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destinationName" value="topic.categorized.events"/>
        <!--property name="messageListener" ref="notReallyNeededHere"/-->
        <property name="sessionTransacted" value="true"/>
        <property name="pubSubDomain" value="true"/>
        <property name="subscriptionDurable" value="true"/>
        <property name="durableSubscriptionName" value="mailer"/>
      </bean>
    
      <jms:message-driven-channel-adapter id="jmsInAdapter" channel="accounts" container="defaultMessageListenerContainer" />
    
      <si:chain input-channel="accounts" output-channel="accountsOut">
        <si:transformer id="accountTransformer" ref="accountTransformerBean"/>
        <si:filter ref="myFilter" method="accept"/>
      </si:chain>
    The only minor issue I saw is a couple of WARN messages that appeared in the logs when I took the consumer application down and brought it back up again; see below. If, and only if, there are messages waiting on the topic for the consumer, the following messages are logged twice at consumer application startup. Everything else seems to work fine, and subsequent new messages do not trigger any more WARN messages.

    I have not looked into the source code of DefaultMessageListenerContainer to see whether that warning can be turned off. In the configuration above, the jmsInAdapter is effectively the "messageListener" of the defaultMessageListenerContainer, so ideally the container should not have complained. Could this be a timing issue? It may be that at startup the defaultMessageListenerContainer already starts receiving messages before Spring has created the jmsInAdapter. The log messages below do suggest that jmsInAdapter was started after defaultMessageListenerContainer had already started, received messages from the topic, and tried to invoke its "messageListener".

    Code:
    [INFO] ThreadPoolTaskScheduler - Initializing ExecutorService  'taskScheduler'
    [INFO] EventDrivenConsumer - started logger.adapter
    [INFO] ThreadPoolTaskExecutor - Initializing ExecutorService  'requestExecutor'
    [INFO] CachingConnectionFactory - Established shared JMS Connection: ActiveMQConnection {id=ID:zhanlu-61442-1266020738529-0:0,clientId=courier.instance.01,started=false}
    [WARN] DefaultMessageListenerContainer - Execution of JMS message listener failed, and no ErrorHandler has been set. <java.lang.IllegalStateException: No message listener specified - see property 'messageListener'>java.lang.IllegalStateException: No message listener specified - see property 'messageListener'
    	at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:505)
    	at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:467)
    	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:323)
    	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:261)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:976)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:968)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:870)
    	at java.lang.Thread.run(Thread.java:637)
    
    [WARN] DefaultMessageListenerContainer - Execution of JMS message listener failed, and no ErrorHandler has been set. <java.lang.IllegalStateException: No message listener specified - see property 'messageListener'>java.lang.IllegalStateException: No message listener specified - see property 'messageListener'
    	at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:505)
    	at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:467)
    	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:323)
    	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:261)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:976)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:968)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:870)
    	at java.lang.Thread.run(Thread.java:637)
    
    [INFO] EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#0
    [INFO] EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#1
    [INFO] EventDrivenConsumer - started org.springframework.integration.endpoint.EventDrivenConsumer#0
    [INFO] JmsMessageDrivenEndpoint - started jmsInAdapter
    By the way, I am using Spring 3.0 RC1 and Spring Integration 2.0.0.M1.
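
    If the warnings really are a startup-ordering issue, one thing that might be worth trying is to stop the container from starting on its own. The fragment below is an untested sketch, not a confirmed fix: it reuses the container definition from above and only adds the "autoStartup" property, which DefaultMessageListenerContainer inherits from its base class. Whether the jmsInAdapter then starts the container itself once it has registered its listener is an assumption that depends on the Spring Integration version; if it does not, the container would have to be started explicitly.

    ```xml
    <!-- Sketch only: same durable-subscriber container as above, but not auto-started. -->
    <!-- Assumption: the adapter (or application code) starts the container later, -->
    <!-- after the message listener has been set on it. -->
    <bean id="defaultMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
      <property name="concurrentConsumers" value="1"/>
      <property name="connectionFactory" ref="connectionFactory"/>
      <property name="destinationName" value="topic.categorized.events"/>
      <property name="sessionTransacted" value="true"/>
      <property name="pubSubDomain" value="true"/>
      <property name="subscriptionDurable" value="true"/>
      <property name="durableSubscriptionName" value="mailer"/>
      <!-- Do not begin receiving until something calls start() on the container -->
      <property name="autoStartup" value="false"/>
    </bean>
    ```

    Since sessionTransacted is true, a message that hits the "No message listener specified" failure should be rolled back and redelivered rather than lost, which would be consistent with everything else working fine.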

    Thanks.


    • #3
      I am very interested in this topic. Can anyone confirm whether there is a better way now that both tools have advanced a few iterations? Information on this subject appears to be scarce. What about ActiveMQ "Virtual Destinations"?

      Thanks in advance!
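
      For reference, this is roughly how I imagine the Virtual Destinations variant would look; it is an untested sketch, not something confirmed in this thread. It assumes ActiveMQ's default virtual topic naming, where the producer publishes to a topic named "VirtualTopic.<name>" and each logical subscriber reads from a queue named "Consumer.<subscriber>.VirtualTopic.<name>". The destination name and the container bean id below are hypothetical, and the producer side would have to publish to "VirtualTopic.categorized.events" instead of the topic used above.

      ```xml
      <!-- Sketch only: consume from the queue side of an ActiveMQ virtual topic. -->
      <!-- Hypothetical names; assumes the broker's default VirtualTopic naming convention. -->
      <bean id="virtualTopicListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <!-- A queue, so no durable subscription name is configured here -->
        <property name="destinationName" value="Consumer.mailer.VirtualTopic.categorized.events"/>
        <property name="pubSubDomain" value="false"/>
        <property name="sessionTransacted" value="true"/>
      </bean>

      <jms:message-driven-channel-adapter id="jmsInAdapter" channel="accounts" container="virtualTopicListenerContainer" />
      ```

      Because the consumer reads from a queue, the broker holds messages while the consumer is down, much like a durable subscription would.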
