
  • Asynchronous Messaging using Spring/JMS/ActiveMQ

    Hello,

    I need to integrate JMS in my Spring based application. Initially, I tried using JmsTemplate in Spring 1.1 and ran into issues, so I bought a new book called 'Pro Spring', which says that Spring 1.1 only supports synchronous messaging. The book *previews* Spring 1.2 JMS support.

    As per the instructions in the book, I added various beans such as JmsConnectionFactoryBean, JmsSessionFactoryBean to my application context. I tried using Spring 1.2-RC2, but realized that these classes are NOT available in there! I looked at CVS and I don't see them in there either! Has the implementation of JMS changed since this book was written? Am I not looking at the correct CVS repository? Someone please help!

    I was going through the archives and saw an email which indicates that ActiveMQ can be integrated using a JCA Container. It directs us to the following url: http://activemq.org/JCA+Container

    Is this the best and preferred way at this time? Do I need Spring 1.2 for this?

    Someone please help.... soon! Thanks.

  • #2
    Indeed, Spring 1.2 does not ship with any JMS support classes for asynchronous messaging. We do plan to come up with a comprehensive solution there, but have deferred this to the Spring 1.3 timeframe.

    For the time being, you can find those classes that Pro Spring mentions in our sandbox in CVS (sandbox/src directory). It's not clear whether we're ever gonna release them in that fashion, though: Instead, we intend to find a solution where all you need to implement is a JMS MessageListener, managed as a Spring bean (similar to an MDB in an EJB container).

    I would currently recommend either ActiveMQ's JCA container (which should work nicely on Spring 1.1) or traditional MDBs if you need multi-threaded asynchronous reception of JMS messages.

    Juergen

    • #3
      Of course, you can always use the JMS API directly to create message consumers with a MessageListener. Let your JMS client object receive a ConnectionFactory, create a Connection, create a Session, create a MessageConsumer, set a MessageListener. The FactoryBeans from our sandbox don't do anything else, actually.

      The limitation of this approach (both in case of direct usage and with the FactoryBeans from a sandbox) is that it isn't suitable for concurrent message reception. You'd need to explicitly create multiple Sessions with a MessageConsumer/MessageListener each for this.

      FYI, we have a sophisticated JMS listener solution scheduled for Spring 1.3 (properly supporting concurrent reception, possibly also transactional reception), and are already beginning to get it into shape in our sandbox. Stay tuned for any updates in that area.

      Juergen

      • #4
        Juergen,

        Thanks for the replies. I tried to use ActiveMQ's JCA container, but ran into classpath issues related to the classes in the J2EE.jar file, so I decided to use the JMS APIs directly. Everything seems to be working fine now, but I am concerned because you said that this isn't suitable for concurrent message reception. As far as I can tell, my logic *should* work for concurrent messages as well. Can you please quickly review the following code snippets and tell me why this solution will not work in a multi-threaded environment?

        (My next option is to use the traditional MDBs, but as far as possible I would like to avoid that).

        Anyway, here's a code snippet from my Consumer class:

        Code:
            // This method will be triggered only once, via init-method="start"
            public void start() throws Exception {
        
                try {
                    connection = getJmsConf().getJmsConnectionFactory().createQueueConnection();
                    QueueSession session = null;
                    QueueReceiver queueReceiver = null;
        
                    //TODO: Set appropriate client id
                    connection.setClientID("foo");
                    connection.start();
        
                    session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                    Queue queue = session.createQueue(getJmsConf().getQueueName());
                    queueReceiver = session.createReceiver(queue);
                    queueReceiver.setMessageListener(this);
                }
                catch (Exception ex) {
                    logger.error(ex.getMessage());
                    throw ex;
                }
            }
        
            // This *light-weight* method delegates processing to a SSB
            public void onMessage(Message message) {
        
                TextMessage txt = (TextMessage) message;
                try {
                    String text = txt.getText();
                    if (message.getJMSRedelivered()) {
                        logger.error("This message was redelivered:" + text);
                    } else {
                        logger.debug("This message is new: " + text);
                        MessageDelegate.processMessage(_testUser, text);
                    }
                    message.acknowledge();
                }
                catch (JMSException e) {
                    logger.error("Message processing failed!");
                }
            }
        When a message is received, the MessageDelegate sends it to a Stateless Session Bean EJB for processing.

        As far as I can tell, my 'onMessage' method is thread-safe and it should process messages received concurrently safely. Am I missing something? Your help will be greatly appreciated.

        Thanks.

        - Ajay

        • #5
          This looks OK to me in general, but be aware that a single JMS Session will essentially work single-threaded: that is, it will invoke your MessageListener from a single thread. Consequently, your MessageListener won't be invoked concurrently, which might lead to a bottleneck if many corresponding messages arrive in parallel.

          What we will provide in Spring 1.3 is essentially pretty similar to what you're setting up there in a programmatic fashion, except that it's gonna use multiple Sessions in parallel (freshly created Sessions or a Session pool), with concurrent delivery to the MessageListener.

          Juergen
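
To make the single-threaded point concrete, here is a minimal JMS-free sketch. It assumes that each Session can be modelled as one delivery thread (a single-thread executor from java.util.concurrent). The class and method names are made up for illustration and are not part of the JMS or Spring API; real providers manage their delivery threads internally.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simulation only: one single-thread executor stands in for one JMS Session.
class SessionDeliverySim {

    // Spreads messageCount messages over sessionCount simulated sessions and
    // returns how many distinct threads ended up running the listener code.
    static int distinctListenerThreads(int sessionCount, int messageCount) {
        Set<Thread> listenerThreads = ConcurrentHashMap.newKeySet();
        ExecutorService[] sessions = new ExecutorService[sessionCount];
        for (int i = 0; i < sessionCount; i++) {
            sessions[i] = Executors.newSingleThreadExecutor();
        }
        for (int m = 0; m < messageCount; m++) {
            // The lambda body plays the role of onMessage(...).
            sessions[m % sessionCount].execute(() -> {
                listenerThreads.add(Thread.currentThread());
            });
        }
        try {
            for (ExecutorService session : sessions) {
                session.shutdown();
                if (!session.awaitTermination(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("delivery did not finish in time");
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return listenerThreads.size();
    }

    public static void main(String[] args) {
        System.out.println("1 session:  " + distinctListenerThreads(1, 20) + " listener thread(s)");
        System.out.println("4 sessions: " + distinctListenerThreads(4, 20) + " listener thread(s)");
    }
}
```

With one simulated session the listener only ever runs on one thread, however many messages arrive. Several sessions are what give the listener a chance to run in parallel, which is also why the listener then has to be thread-safe.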

          • #6
            Juergen,

            Thanks for the reply. Your point regarding the MessageListener becoming a *bottleneck* is well taken. For now we will proceed with our solution, and as soon as Spring 1.3 is available, we will switch to the *Multiple Listeners* solution offered by Spring.

            Thanks again!

            - Ajay

            PS: By the way, I would like to point out to everyone that, even though the 'Pro Spring' book was not helpful for implementing JMS, it is nonetheless an EXCELLENT book for anyone who needs an introduction to Spring as well as AOP.

            • #7
              Hi Juergen and Team:

              Re 1.3 Async JMS, will it be tightly coupled to ActiveMQ or a particular JMS implementation, or will it support any JMS implementation? Or those that provide JCA resource adapters?

              I'm looking at porting a JBossMQ client app to Spring.

              • #8
                Originally posted by mparaz
                Hi Juergen and Team:

                Re 1.3 Async JMS, will it be tightly coupled to ActiveMQ or a particular JMS implementation, or will it support any JMS implementation? Or those that provide JCA resource adapters?

                I'm looking at porting a JBossMQ client app to Spring.
                BTW, if you're using Spring, then I'd suggest not using JBossMQ. It's one of the worst JMS providers around: it's very slow and has few features.

                I'd definitely recommend you look at either ActiveMQ or Joram as they are the 2 best OSS providers by far. If you want high performance then ActiveMQ gets the nod; but Joram is a reasonable JMS provider too.

                • #9
                  Originally posted by Juergen Hoeller
                  Indeed, Spring 1.2 does not ship with any JMS support classes for asynchronous messaging. We do plan to come up with a comprehensive solution there, but have deferred this to the Spring 1.3 timeframe.

                  Juergen
                  Is there currently any release date for Spring 1.3?
                  Which solution do you recommend in the meantime, to be able to use asynchronous messaging?

                  Thanks,
                  Sami Dalouche

                  • #10
                    Originally posted by samokk
                    Which solution do you recommend in the meantime, to be able to use asynchronous messaging?
                    Ok, sorry, I saw you already posted the answer in the same thread

                    • #11
                      Originally posted by mparaz
                      Re 1.3 Async JMS, will it be tightly coupled to ActiveMQ or a particular JMS implementation, or will it support any JMS implementation? Or those that provide JCA resource adapters?
                      Actually, the goal is to provide a MessageListenerContainer that only works with JMS API and does not impose any special requirements on the setup of your JMS provider.

                      A JCA-based listener container can offer XA transactions that include the reception of the message, which the plain JMS approach can't. On the other hand, the plain JMS approach is significantly simpler and imposes fewer requirements on the JMS provider, so IMO it's definitely worth going down that route as well.

                      I expect the plain JMS approach to cover ~80% of all needs, in particular as receiving the message as part of an XA transaction is often not even desired: Messages that lead to transaction failure are often forwarded to some "invalid message" destination, without retrying them. An XA transaction rollback that includes message reception, on the other hand, would cause an immediate resend of the same message.

                      Juergen
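
The difference between the two failure strategies can be sketched without any JMS provider. The toy model below assumes an in-memory queue, a made-up rule that any message starting with "bad" fails processing, and an artificial cap on attempts so the loop terminates. All names are hypothetical, and real providers usually apply their own redelivery limits and delays.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model of the two strategies; no JMS or XA involved.
class PoisonMessageSim {

    // Hypothetical processing rule: messages starting with "bad" always fail.
    static boolean process(String message) {
        return !message.startsWith("bad");
    }

    // Rollback-style reception: a failed message goes back to the head of the
    // queue and is delivered again immediately. Returns the number of delivery
    // attempts made before the queue drained or maxAttempts was reached.
    static int attemptsWithRollback(List<String> messages, int maxAttempts) {
        Deque<String> queue = new ArrayDeque<>(messages);
        int attempts = 0;
        while (!queue.isEmpty() && attempts < maxAttempts) {
            String message = queue.pollFirst();
            attempts++;
            if (!process(message)) {
                queue.addFirst(message); // simulated rollback: same message again
            }
        }
        return attempts;
    }

    // Forward-style reception: a failed message is consumed once and moved to
    // an "invalid message" list instead of being retried.
    static List<String> forwardInvalid(List<String> messages) {
        List<String> invalid = new ArrayList<>();
        for (String message : messages) {
            if (!process(message)) {
                invalid.add(message);
            }
        }
        return invalid;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("ok-1", "bad-1", "ok-2");
        System.out.println("rollback attempts: " + attemptsWithRollback(input, 10));
        System.out.println("forwarded invalid: " + forwardInvalid(input));
    }
}
```

In the rollback variant the failing message blocks everything behind it until the cap is hit, while the forwarding variant touches each message once and sets the bad one aside.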

                      • #12
                        Originally posted by samokk
                        Is there currently any release date for Spring 1.3?
                        Spring 1.3 RC1 is currently scheduled for end of August, with Portlet support and asynchronous JMS listening as main new features, plus serving as basis for Spring Web Flow 1.0 RC1. Spring 1.3 final will then probably be available by October.

                        Juergen

                        • #13
                          There is one issue I've run into in regard to producing messages. We configure our JMS provider (ActiveMQ) entirely in Spring. If you want to produce messages and participate in the surrounding JTA transaction, you then have to either 1) find a JCA container for Spring that can handle automatically registering the XASession with JTA (and handle pooling), 2) manually register the XASession with JTA, or 3) write some pooling connection factory that also handles automatically registering and keeping track of XA sessions.
                          As for option #1, it doesn't look like there is (yet) a JCA container implementation for Spring that is oriented toward producing messages that will automatically register JMS sessions with JTA. Note that ActiveMQ's JCA container will participate in the surrounding JTA transaction, but only if you are producing within a MessageListener that happened to receive a message within a transaction - which is not always the case for us as we have components that produce messages that are not in response to a received message.
                          As a temporary solution, I've written a PooledSpringXAConnectionFactory that handles pooling and XA resource enlistment, while also ensuring that you will use the same XASession per thread per transaction. Take a look at http://jira.logicblaze.com/jira/browse/AMQ-303 - again, this is primarily for producing, though it can be made to work for consumption as well.

                          - Andy
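
The "same XASession per thread per transaction" rule can be illustrated in isolation. The sketch below is not the PooledSpringXAConnectionFactory from the JIRA issue; it is a hypothetical, generic cache keyed by the current thread and a transaction id string, where the type parameter S stands in for whatever session type a pool would hand out. XA enlistment and pooling are deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical helper: binds one session object to each (thread, transaction) pair.
class PerThreadTransactionCache<S> {

    // Each thread sees its own map, so no locking is needed here.
    private final ThreadLocal<Map<String, S>> byTransaction =
            ThreadLocal.withInitial(HashMap::new);
    private final Supplier<S> sessionFactory;

    PerThreadTransactionCache(Supplier<S> sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    // Returns the session already bound to this thread and transaction,
    // creating one through the factory on first use.
    S sessionFor(String transactionId) {
        return byTransaction.get().computeIfAbsent(transactionId, id -> sessionFactory.get());
    }

    // Should be called when the transaction completes so entries do not pile up.
    void release(String transactionId) {
        byTransaction.get().remove(transactionId);
    }

    public static void main(String[] args) {
        PerThreadTransactionCache<Object> cache = new PerThreadTransactionCache<>(Object::new);
        Object first = cache.sessionFor("tx-1");
        System.out.println("same session within tx-1: " + (first == cache.sessionFor("tx-1")));
        System.out.println("different session for tx-2: " + (first != cache.sessionFor("tx-2")));
    }
}
```

A real implementation would obtain the transaction from the JTA transaction manager rather than a string id, and would release the entry from a transaction completion callback.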

                          • #14
                            What is a suitable destroy() Method for Message Driven POJO?

                            Hi all,

                            I have done something similar to what is described here by Ajay, regardless of this being non-transacted and single-threaded.

                            Code:
                                // This method will be triggered only once, via init-method="start"
                                public void start() throws Exception {
                            
                                    try {
                                        connection = getJmsConf().getJmsConnectionFactory().createQueueConnection();
                                        QueueSession session = null;
                                        QueueReceiver queueReceiver = null;
                            
                                        //TODO: Set appropriate client id
                                        connection.setClientID("foo");
                                        connection.start();
                            
                                        session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                                        Queue queue = session.createQueue(getJmsConf().getQueueName());
                                        queueReceiver = session.createReceiver(queue);
                                        queueReceiver.setMessageListener(this);
                                    }
                                    catch (Exception ex) {
                                        logger.error(ex.getMessage());
                                        throw ex;
                                    }
                                }
                            
                                // This *light-weight* method delegates processing to a SSB
                                public void onMessage(Message message) {
                            
                                    TextMessage txt = (TextMessage) message;
                                    try {
                                        String text = txt.getText();
                                        if (message.getJMSRedelivered()) {
                                            logger.error("This message was redelivered:" + text);
                                        } else {
                                            logger.debug("This message is new" + text);
                                            MessageDelegate.processMessage(_testUser, text);
                                        }
                                        message.acknowledge();
                                    }
                                    catch (JMSException e) {
                                        logger.error("Message processing failed!");
                                    }
                                }
                            I am using WL8.1 SP4 JMS Provider, and Ant to automatically deploy and undeploy my app.

                            I have registered this bean with an init method, which creates and sets up a listener to a Queue. Queue and ConnectionFactories are wired into my bean by Spring.

                            Given the above scenario, what would be a proper destroy method? I have already coded the following, due to a javax.jms.InvalidClientIDException during redeploy.

                            Code:
                                /**
                                 * Removes message listeners and consumers from Queue. Used to simplify redeployment of SITE app.
                                 * @throws JMSException 
                                 */
                                public void destroy() throws JMSException {
                                    queueReceiver.setMessageListener(null);
                                    queueReceiver.close();
                                    queueReceiver = null;
                                    connection.stop();
                                    connection = null;
                                }
                            But I still get that horrible InvalidClientIDException whenever I try to redeploy, or even when trying a deploy-undeploy-deploy sequence. The only way is resetting my server, which is a little time-consuming. This is strange, since I would think that once my app is undeployed, connections stopped and receivers closed, all consumers registered with this Queue would shut down. Perhaps I am missing some other object which needs to be closed or nulled?

                            What should I do in order to redeploy my app without this Exception?

                            Help please.

                            J.

                            • #15
                              Re: What is a suitable destroy() Method for Message Driven P

                              Originally posted by JSRamos
                              This is strange, since I would think that once my app is undeployed, connections stopped and receivers closed, all consumers registered with this Queue would shut down. Perhaps I am missing some other object which needs to be closed or nulled?

                              What should I do in order to redeploy my app without this Exception?
                              Duh! My mistake. I was stopping the connection instead of closing it. Closing the connection is what unregisters all message consumers and frees the client ID. Here's the complete destroy() method:

                              Code:
                                  public void destroy() throws JMSException {
                                      queueReceiver.setMessageListener(null);
                                      queueReceiver.close();
                                      LOG.debug("SITE - SPEI - Closed queue receiver.");
                                      queueReceiver = null;
                                      connection.close();
                                      LOG.debug("SITE - SPEI - Closed connection.");
                                      connection = null;
                                  }
                              Thank you all for lending an ear.

                              J.
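
Why stop() was not enough can be shown with a small toy model. It assumes a broker that keeps a set of client IDs in use and only frees an ID when the owning connection is closed. The classes below are invented for illustration and are not the JMS or WebLogic API; where the toy throws IllegalStateException, a real provider would throw javax.jms.InvalidClientIDException.

```java
import java.util.HashSet;
import java.util.Set;

// Toy broker: remembers which client IDs are currently held by a connection.
class ToyBroker {
    private final Set<String> clientIdsInUse = new HashSet<>();

    ToyConnection connect(String clientId) {
        if (!clientIdsInUse.add(clientId)) {
            // Stand-in for javax.jms.InvalidClientIDException in a real provider.
            throw new IllegalStateException("Client ID already in use: " + clientId);
        }
        return new ToyConnection(this, clientId);
    }

    void release(String clientId) {
        clientIdsInUse.remove(clientId);
    }
}

// Toy connection: stop() only pauses delivery, close() gives the client ID back.
class ToyConnection {
    private final ToyBroker broker;
    private final String clientId;
    private boolean delivering = true;

    ToyConnection(ToyBroker broker, String clientId) {
        this.broker = broker;
        this.clientId = clientId;
    }

    void stop() {
        delivering = false; // connection and client ID stay registered
    }

    void close() {
        delivering = false;
        broker.release(clientId); // client ID becomes available again
    }

    boolean isDelivering() {
        return delivering;
    }
}

class RedeployDemo {
    // Simulates undeploy followed by redeploy with the same client ID.
    static boolean canReconnect(boolean closeOnUndeploy) {
        ToyBroker broker = new ToyBroker();
        ToyConnection first = broker.connect("foo");
        if (closeOnUndeploy) {
            first.close();
        } else {
            first.stop();
        }
        try {
            broker.connect("foo");
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("redeploy after stop():  " + canReconnect(false));
        System.out.println("redeploy after close(): " + canReconnect(true));
    }
}
```

In this model the second connect succeeds only after close(), which matches the behaviour described in the post: a stopped connection still holds its client ID until it is closed or the server is restarted.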
