How to indicate that a JMSTemplate.receive() should use a DurableSubscriber

  • How to indicate that a JMSTemplate.receive() should use a DurableSubscriber

    I want to do this without using the XML configuration. I am toying with Spring JMS to see whether it meets my needs and, if it does, to adopt it for our project. Is there any way of doing a synchronous receive using

    Code:
    JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
    jmsTemplate.receive();
    so that it is equivalent to the pure JMS call:

    Code:
    MessageConsumer consumer = session.createDurableSubscriber(topic, "durable name"); 
    Message message = consumer.receive();
    without needing to configure it through XML?

  • #2
    Again, it's better to use a DefaultMessageListenerContainer for this use case, where you can simply configure a durable subscription by calling setSubscriptionDurable and setDurableSubscriptionName.
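Since the original question asks for a setup without XML, here is a minimal sketch of such a container configured in Java. The class and method names (`DurableContainerSketch`, `durableContainer`) and every parameter are hypothetical. The sketch assumes the `javax.jms` API used in this thread (newer Spring versions use `jakarta.jms` instead) and that the client ID a durable subscription needs is already set on the connection factory passed in.

```java
import javax.jms.ConnectionFactory;
import javax.jms.MessageListener;

import org.springframework.jms.listener.DefaultMessageListenerContainer;

public final class DurableContainerSketch {

    private DurableContainerSketch() {
    }

    /**
     * Builds a listener container that holds a durable topic subscription.
     * All argument values are supplied by the caller; none come from the thread.
     */
    public static DefaultMessageListenerContainer durableContainer(
            ConnectionFactory connectionFactory, String topicName,
            String subscriptionName, MessageListener listener) {

        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setPubSubDomain(true);                 // resolve topicName as a topic
        container.setDestinationName(topicName);
        container.setSubscriptionDurable(true);
        container.setDurableSubscriptionName(subscriptionName);
        container.setSessionTransacted(true);            // local JMS transaction per message
        container.setMessageListener(listener);
        container.afterPropertiesSet();                  // needed when no Spring context manages the bean
        return container;
    }
}
```

When the container is created this way, outside a Spring application context, the caller still has to invoke `start()` on the returned container.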

    With the JmsTemplate you'd have to use one of the execute methods that take a SessionCallback, create your own durable consumer, and receive from it.
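A hedged sketch of the SessionCallback route, for comparison. `DurableReceiveSketch` and `receiveDurable` are hypothetical names. The sketch assumes the `javax.jms` API, a template whose sessions are not transacted (the default), and a connection factory that already has a client ID set. It uses the `execute` overload with a `startConnection` flag because a consumer only receives messages on a started connection.

```java
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Topic;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.support.JmsUtils;

public final class DurableReceiveSketch {

    private DurableReceiveSketch() {
    }

    /**
     * Performs one synchronous receive from a durable subscription.
     * Returns null if nothing arrives within timeoutMillis (0 blocks indefinitely).
     */
    public static Message receiveDurable(JmsTemplate jmsTemplate, String topicName,
            String subscriptionName, long timeoutMillis) {

        SessionCallback<Message> callback = session -> {
            Topic topic = session.createTopic(topicName);
            MessageConsumer consumer = session.createDurableSubscriber(topic, subscriptionName);
            try {
                return consumer.receive(timeoutMillis);
            } finally {
                // Closing the consumer does not remove the durable subscription itself.
                JmsUtils.closeMessageConsumer(consumer);
            }
        };
        // "true" asks the template to start the connection before running the callback.
        return jmsTemplate.execute(callback, true);
    }
}
```

Error handling and recovery remain the caller's job with this approach, which is the trade-off against the listener container.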



    • #3
      Thanks for the reply Gary,

      The problem is the kind of use case I have. I want a single connection to act as the MessageConsumer for Topic A and as the MessageProducer for Topic B. If I use DefaultMessageListenerContainer, the code looks like this:

      Code:
      dmlc = (DefaultMessageListenerContainer) factory.getBean("messageListener");
      dmlc.start();
      When start() is called, createConnection() is called on the SingleConnectionFactory for this thread.

      However, I want access to the same connection in the onMessage() method because I want to perform a publish() within the same session (this lets the session commit() affect both Topic A and Topic B simultaneously). This is not possible because onMessage() runs on a different thread, which results in the SingleConnectionFactory creating a new connection for me.

      If I instead use a synchronous receive(), I lose the benefits of the message listener container and will have to do the error handling and recovery by hand.



      • #4
        But if you do the send() on the container's thread - the one that calls onMessage() (and the session is transacted), any JmsTemplate sends will all be done on the same session.
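A minimal sketch of a listener that sends from the container's thread. `ForwardingListener` and `outboundTopicName` are hypothetical. The sketch assumes the `javax.jms` API, incoming messages of type `TextMessage`, and a `JmsTemplate` that was built on the same `ConnectionFactory` instance as the container and has `setPubSubDomain(true)`; with a different connection factory the template would most likely not pick up the container's session.

```java
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.jms.core.JmsTemplate;

public class ForwardingListener implements MessageListener {

    private final JmsTemplate jmsTemplate;
    private final String outboundTopicName;

    public ForwardingListener(JmsTemplate jmsTemplate, String outboundTopicName) {
        this.jmsTemplate = jmsTemplate;
        this.outboundTopicName = outboundTopicName;
    }

    @Override
    public void onMessage(Message message) {
        try {
            // Runs on the container's thread, so the send happens inside the listener invocation.
            String body = ((TextMessage) message).getText();
            jmsTemplate.send(outboundTopicName, session -> session.createTextMessage(body));
        } catch (JMSException ex) {
            // An exception leaving onMessage() makes a transacted container roll back.
            throw new IllegalStateException("Could not forward message", ex);
        }
    }
}
```

The template is injected once through the constructor, so no new connection factory is created per message.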



        • #5
          Yes, that makes sense. I am just wondering whether every onMessage() runs on a new thread. If that's the case, I will have a new JMS Connection on every call, because the SingleConnectionFactory gives a single connection per thread. Let me implement this and see if it works. Thanks a lot.



          • #6
            No; the SingleConnectionFactory caches a single connection, period.

            Each thread (controlled by concurrentConsumers) runs in a separate session on that single connection.

            If you use the subclass (CachingConnectionFactory), the sessions, consumers, producers are also cached - avoiding the overhead. By default, only one session is cached.



            • #7
              I see. I tested this and I get a

              javax.jms.InvalidClientIDException: clientId already exists

              for every send within an onMessage() (except for the first onMessage()).
              I can see that all the onMessage() calls take place on the same thread. However, it seems that jmsTemplate.send() is trying to call createConnection(). I tried both SingleConnectionFactory and CachingConnectionFactory.

              Portion of stack trace for the javax.jms.InvalidClientIDException exception:

              Code:
              	at org.springframework.jms.connection.SingleConnectionFactory.doCreateConnection(SingleConnectionFactory.java:342)
              	at org.springframework.jms.connection.SingleConnectionFactory.initConnection(SingleConnectionFactory.java:288)
              	at org.springframework.jms.connection.SingleConnectionFactory.createConnection(SingleConnectionFactory.java:225)
              	at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:184)
              	at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:456)
              	at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:534)


              I also wrote a CustomConnectionFactory that extends SingleConnectionFactory:

              Code:
              public class CustomConnectionFactory extends SingleConnectionFactory{
              
              	ConnectionFactory factory = null;
              	Connection connection = null;
              	int count = 0;
              
              	public CustomConnectionFactory(ConnectionFactory factory){
              		super(factory);
              		this.factory = factory;
              	}
              
              	public Connection createConnection()
              	throws JMSException{
              		count++;
              		System.out.println("Create connection was called  " + count + " times in the thread " + Thread.currentThread().getId());
              		if(null==connection){
              			System.out.println("Connection was NULL creating new connection!");
              			connection = factory.createConnection();
              			System.out.println("Connection created with clientId " + connection.getClientID());
              		}
              		connection.setExceptionListener(new ExceptionListener() {
              			public void onException(JMSException ex) {
              				System.out.println("Connection got closed!!!!!!!!!!!!!" + ex.getMessage());
              
              			}
              		});
              		return connection;
              	}
              
              }
              CustomConnectionFactory works for me because the connection gets closed at the end of every send() call and recreated at the next send() call. But in any case I wouldn't want a new connection to be opened every time send() is called.
              This is what my config looks like:

              Code:
              	<bean id="pubCachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
              		<property name="targetConnectionFactory" ref="pubConnectionFactory" />
              		<property name="reconnectOnException" value="true"/>
              		<property name="sessionCacheSize" value="1" />
              	</bean>
              Any idea why the CachingConnectionFactory would not simply reuse the previously created connection?



              • #8
                Did you set sessionTransacted to true on the DMLC?



                • #9
                  Originally posted by Gary Russell
                  But if you do the send() on the container's thread - the one that calls onMessage() (and the session is transacted), any JmsTemplate sends will all be done on the same session.
                  Can I also control the commit() and rollback() of the receive() from within onMessage()? Sometimes I might need to roll back or commit a transaction manually.



                  • #10
                    Originally posted by Gary Russell
                    Did you set sessionTransacted to true on the DMLC?
                    Yes, here's my config:

                    Code:
                    <bean id="stagingMessageListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
                    		<property name="connectionFactory" ref="subCachingConnectionFactory"/>
                    		<property name="destination" ref="stagingTopic"/>
                    		<property name="messageListener" ref="messageListener"/>
                    		<property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE"/>
                    		<property name="subscriptionDurable" value="true"/>
                    		<!-- TODO: name the durable -->
                    		<property name="durableSubscriptionName" value="durable name" />
                    		<!--Each message listener invocation will operate within an active JMS transaction, 
                    			with message reception rolled back in case of listener execution failure-->
                    		<property name="sessionTransacted" value="true" />
                    		<!--cache a shared JMS Connection, a JMS Session, and a JMS MessageConsumer (CACHE_CONSUMER = 3)-->
                    		<property name="cacheLevel" value="3"/>
                    		<!-- use only a single MessageConsumer -->
                    		<property name="maxConcurrentConsumers" value="1"/>
                    		<property name="autoStartup" value="false"/>
                    		
                    	</bean>



                    • #11
                      I think I solved that problem. The reason was that I was calling factory = new ClassPathXmlApplicationContext(configLocation) within every onMessage() and using it to initialize my sender bean. The CachingConnectionFactory works fine.
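One plausible reading of why that produced the InvalidClientIDException on every send except the first: each new application context builds a fresh connection factory bean, which opens its own connection with the same client ID while the earlier one is still open. The toy model below uses only the JDK to illustrate that reasoning; `ToyBroker` and `ToyFactory` are hypothetical stand-ins, not Spring or JMS classes.

```java
import java.util.HashSet;
import java.util.Set;

final class ClientIdDemo {

    /** Toy broker: refuses a second open connection that uses the same client ID. */
    static final class ToyBroker {
        private final Set<String> openClientIds = new HashSet<>();

        void connect(String clientId) {
            if (!openClientIds.add(clientId)) {
                throw new IllegalStateException("clientId already exists: " + clientId);
            }
        }
    }

    /** Toy caching factory: opens one connection on first use and reuses it afterwards. */
    static final class ToyFactory {
        private final ToyBroker broker;
        private final String clientId;
        private boolean connected;

        ToyFactory(ToyBroker broker, String clientId) {
            this.broker = broker;
            this.clientId = clientId;
        }

        void send() {
            if (!connected) {
                broker.connect(clientId);
                connected = true;
            }
        }
    }

    /** Builds a new factory for every message, as a per-message application context would. */
    static int failuresWithFactoryPerMessage(int messages) {
        ToyBroker broker = new ToyBroker();
        int failures = 0;
        for (int i = 0; i < messages; i++) {
            ToyFactory factory = new ToyFactory(broker, "publisher");
            try {
                factory.send();
            } catch (IllegalStateException ex) {
                failures++;
            }
        }
        return failures;
    }

    /** Builds the factory once and reuses it for every message. */
    static int failuresWithSharedFactory(int messages) {
        ToyBroker broker = new ToyBroker();
        ToyFactory factory = new ToyFactory(broker, "publisher");
        int failures = 0;
        for (int i = 0; i < messages; i++) {
            try {
                factory.send();
            } catch (IllegalStateException ex) {
                failures++;
            }
        }
        return failures;
    }
}
```

In this model, three messages with a factory per message give two failures (every send after the first), while the shared factory gives none, which matches the symptom described earlier in the thread.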



                      • #12
                        I think I solved that problem.
                        Ah; good.

                        Regarding rollback; if the listener throws an exception, the transaction will be rolled back; if it exits normally, it will be committed.



                        • #13
                          One last question.
                          If I use dmlc.start() (Connection 1) and within onMessage() do a jmsTemplate.send() (Connection 2), and an exception occurs on Connection 2, will the message on Connection 1 be rolled back safely?

                          I guess my question is: is this completely atomic ((connection1->session.rollback) and (connection2->session.rollback))?
                          The same question applies to commit(): will the commit be atomic too? Since these are two different transactions taking place on connections that are on different threads, can we guarantee that they will either happen together or not happen at all without using an external transaction manager?
                          Does the DMLC guarantee this?



                          • #14
                            There still seems to be some confusion. What do you mean by connection 1 and connection 2? There is only one connection if you use a (Single|Caching)ConnectionFactory.

                            * If the session is transacted and
                            * if the jmsTemplate send is done in onMessage(), it will use the same session and will be atomic: the receive() and any other jmsTemplate operations will be either committed or rolled back together.

                            If you do the send on a separate thread, session, connection, whatever; then, no, they won't be atomic.

                            You cannot have an atomic transaction across multiple sessions or connections.

                            To summarize, the DMLC does a receive and binds the session to the thread; the JmsTemplate (when running on the same thread) will use the same session; when onMessage() returns, the transaction is committed (or rolled back if an exception is thrown).
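The summary above can be illustrated with a JDK-only toy model. It is not Spring's implementation: `ToySession`, `deliver`, and `send` are hypothetical stand-ins for a transacted session, the container's receive loop, and a template send. The model only shows the idea of binding a session to the thread, reusing it for sends on that thread, and committing on normal return or rolling back on an exception.

```java
import java.util.ArrayList;
import java.util.List;

final class ThreadBoundSessionDemo {

    /** Toy transacted session: sends are buffered until commit and dropped on rollback. */
    static final class ToySession {
        final List<String> pending = new ArrayList<>();
        final List<String> committed = new ArrayList<>();

        void send(String message) {
            pending.add(message);
        }

        void commit() {
            committed.addAll(pending);
            pending.clear();
        }

        void rollback() {
            pending.clear();
        }
    }

    /** Session that the toy container has bound to the current thread, if any. */
    private static final ThreadLocal<ToySession> BOUND = new ThreadLocal<>();

    /** Toy template: joins the thread-bound session, else uses its own session and commits at once. */
    static ToySession send(String message) {
        ToySession session = BOUND.get();
        if (session != null) {
            session.send(message);
            return session;
        }
        ToySession own = new ToySession();
        own.send(message);
        own.commit();
        return own;
    }

    /** Toy container: binds a session, runs the listener, then commits or rolls back. */
    static ToySession deliver(Runnable listener) {
        ToySession session = new ToySession();
        BOUND.set(session);
        try {
            listener.run();
            session.commit();
        } catch (RuntimeException ex) {
            session.rollback();
        } finally {
            BOUND.remove();
        }
        return session;
    }
}
```

For example, `deliver(() -> send("reply"))` leaves "reply" in the container session's committed list, whereas a listener that sends and then throws leaves that list empty. A send issued from another thread never sees the bound session, which mirrors the point that work on a separate thread is not part of the same transaction.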



                            • #15
                              Thanks for clarifying that, Gary. As long as the receive() and the onMessage() calls take place on the same thread (and consequently on the same session/connection, because I use the CachingConnectionFactory), my use case is met.
