Announcement Announcement Module
Collapse
No announcement yet.
jms messages stuck in JMS MQ Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • jms messages stuck in JMS MQ

    Hi All!

    I don't know, if it's feature or a bug, but the following thing gives me lot of headache and I can't solve it, or just find out what is the cause.

    I have a server, where I have spring integration app, which reads messages from JMS message queue (openMQ) and if there is a message, then it sends the message to a service activator, which puts back the message to the JMS message queue if the call is successfull.
    It works fine, if the service activator works in syncronous mode.
    But when I change it to asyncronous, then a few messages stuck in the JMS message queue. I can see those messages in the message queue, but the jms inbound channel doesen't want to read it out from it.
    If I restart my app, then it reads out the messages from the message queue successfully.
    I tried to play with the queue size and with the thread-pool-task-executor parameteres with no success.
    I can't see the logic, when and why stuck some message in the message queue. I don't get any error messages, or warnings in my logs, so I totally pointless what is happening.


    My config look like this:
    Code:
        <jms:inbound-channel-adapter id="jmsInChannel"
                                     connection-factory="springconnectionfactory"
                                     channel="fromMQ"
                                     destination="toSAQueue" >
            <poller max-messages-per-poll="1" receive-timeout="5000" 
    send-timeout="60000" >
                <interval-trigger interval="1000" time-unit="MILLISECONDS" />
            </poller>
        </jms:inbound-channel-adapter>
    
        <beans:bean id="toSAQueue"
                    class="com.sun.messaging.Queue">
            <beans:constructor-arg index="0" value="toSA.queue"/>
        </beans:bean>
    
        <channel id="fromMQ" >
            <rendezvous-queue/>
        </channel>
    
        <service-activator id="MyServiceActivator" 
    input-channel="fromMQ" output-channel="toMQ"
                                method="callMyService"
                                ref="myService">
             <poller  max-messages-per-poll="10"> <!--  
    task-executor="threadPoolExecutor" -->
                 <interval-trigger interval="50" time-unit="MILLISECONDS"/>
             </poller>
        </service-activator>
        <thread-pool-task-executor id="threadPoolExecutor" queue-capacity="10" 
    core-size="1" keep-alive-seconds="60" max-size="1"/>
    
        <beans:bean id="myService" 
    class="hu.treff7es.myService"
         <beans:bean id="serviceImpl" 
    class="hu.treff7es.myService.ServicesImpl"/>
    
        <channel id="toMQ"/>
    
        <jms:outbound-channel-adapter id="jmsMyServiceOutChannel"
                                      connection-factory="springconnectionfactory"
                                      channel="toMQ"
                                      destination="toSaveQueue" 
    extract-payload="true"/>
    
        <beans:bean id="toSaveQueue"
              class="com.sun.messaging.Queue">
            <beans:constructor-arg index="0" value="toSave2.queue"/>
        </beans:bean>
    I would be happy, if somebody could give me any advice or idea to solve this problem!
    I have spent a lot of time to solve this problem with no success!

    Thanx for you in advance!

  • #2
    still not working

    Well, unfortunatelly we played in the meantime with the polling parameters and with the queue sizes, but we wasn't able to figure out why are messages stuck in the jms MQ.
    We can't figure out, why those messages aren't read out by the jms inbound channel. (and why it read out after a restart of the application (the application where the jms inbound channel is define))
    Any Idea would be appreciated!

    Comment


    • #3
      Have you tried setting the log level to DEBUG and also checking the logs for any errors picked up from the errorChannel?

      Also, what is the reason you need polling in both the service-activator and the JMS queue?

      Comment


      • #4
        Thanks for the quick answer!

        The poller at the JMS queue was just a desperate try.
        In the original version we didn't have a poller at the JMS queue.

        Today we ran the program with DEBUG enabled. (the configuration was the same as the attached with 2 modification:
        1. We deleted the poller at the jms queue
        2. we increased the number of threads at thread-pool-task-executor (core-size="2" and max-size="2")

        Results:
        32 messages out of 50 messages were stucked (we could see those messages in the MQ) in the MQ. The other 18 were processed successfully.

        It's interesting, that we can't see any log message for the stucked messages, but we can for the processed 18 messages (
        like this:
        Code:
        2009-11-30 18:16:07,163 DEBUG task-scheduler-3 org.springframework.integration.jms.HeaderMappingMessageConverter (
        HeaderMappingMessageConverter.java:158) - converted JMS Message [
        Class:			com.sun.messaging.jmq.jmsclient.BytesMessageImpl
        getJMSMessageID():	ID:2429-10.0.0.111(a3:18:1f:46:7b:24)-3599-1259601355646
        getJMSTimestamp():	1259601355646
        getJMSCorrelationID():	null
        JMSReplyTo:		null
        JMSDestination:		toSA.queue
        getJMSDeliveryMode():	PERSISTENT
        getJMSRedelivered():	false
        getJMSType():		null
        getJMSExpiration():	0
        getJMSPriority():	3
        ]
        )

        If we put in a new message in the queue, then it started to process that messages, but it left the stucked messages in the MQ)

        After a restart the program started to process the stucked messages.
        I can give more logs, or to do any modification.

        So we are confused now with the problem!
        Any idea, what are we doing wrong?

        Comment


        • #5
          Which messaging provider are you using?

          This sounds like a prefetch issue.

          Comment


          • #6
            We are using OpenMQ.
            What can we do?
            The problem only happens in asyncronous mode. In syncronous mode it works great, but the service, which we call with the service acticator is too slow to use in syncronous mode.

            Originally posted by chudak View Post
            Which messaging provider are you using?

            This sounds like a prefetch issue.

            Comment


            • #7
              We've been having the same exact problem with SI 2.0.0.M1 and haven't made any headway. In our case, we send 200 messages to the queue, processing from the message queue stops around 100 messages, and 100 remain. When the app is restarted, the remaining 100 messages are processed. Unlike treff7es above, we're running synchronously. We've tried both ActiveMQ and OpenMQ with identical results.

              Our application scenario is that we receive a request to a webservice which, in turn, invokes a Spring bean. We intercept the invocation via an SI Gateway which puts the request onto a message queue. An SI service activator pulls the message off of the queue and invokes the target service. Then, a response is returned back through the chain. This works fine with the only issue being that inbound messages are left on the queue.

              This is our first SI project, so I'm sure it's just a configuration error on our part. Any insight or suggestions would be greatly appreciated. I'm sure we can figure this out together.

              Thanks!

              Here is our configuration file.

              Code:
              <!--  Sun OpenMQ declarations [BEGIN] -->
              	<bean id="mqConnectionFactoryInitialiser" 	 class="com.mycopmany.mq.MqConnectionFactoryInitialiser">
                      <property name="configuration">
                          <props>
                              <prop key="imqAddressList">localhost:7676</prop>
                              <prop key="imqAddressListBehavior">RANDOM</prop>
                          </props>
                      </property>
                  </bean>
                  <bean id="mqConnectionFactory"
                   	factory-bean="mqConnectionFactoryInitialiser"
                   	factory-method="createConnectionFactory"/>   
                  
                  <alias name="mqConnectionFactory" alias="connectionFactory"/>
              <!--  Sun OpenMQ declarations [END] -->
              
              
                  <bean id="ProductLookupRequestQueue" class="com.sun.messaging.Queue">
                      <constructor-arg type="java.lang.String" value="queue.ProductLookupRequest"/>
                  </bean>   
                  <bean id="ProductLookupResponseQueue" class="com.sun.messaging.Queue">
                      <constructor-arg type="java.lang.String" value="queue.ProductLookupResponse"/>
                  </bean>   
              
                  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
                      <property name="connectionFactory" ref="mqConnectionFactory"/>
                      <property name="defaultDestination" ref="ProductLookupRequestQueue"/>
                      <property name="receiveTimeout" value="20000"/>
                  </bean>   
              <!--  Sun OpenMQ declarations [END] -->
              
              
              	<si:poller id="poller" default="true">
              		<si:interval-trigger interval="1000"/>
              	</si:poller>
              	
              	 
              	
              	
              	<!-- JMS gateway declaration -->	
              	<jms:inbound-gateway id="jmsin" 
                        request-destination="ProductLookupRequestQueue"
                        request-channel="ProductLookup-Request"/>
                    
              	<jms:outbound-gateway request-channel="ProductLookup-Response"
              						  request-destination="ProductLookupRequestQueue"
               						  reply-channel="ProductLookup-Response"/>
              	
              	<jms:channel id="ProductLookup-Request" queue="ProductLookupRequestQueue"/>
              	
              
               
              	<si:service-activator 
              		input-channel="ProductLookup-Request"
              		output-channel="ProductLookup-Response"
              		ref="lookupProdOrchestrator"
              		method="invoke" /> 
              	
              
              	<si:gateway id="lookupProdGateway"
              		service-interface="com.mycompany.gateway.I_ProductLookupGateway"
              		default-request-channel="ProductLookup-Request"
              		default-reply-channel="ProductLookup-Response" />
              Last edited by Solstice; Dec 8th, 2009, 10:50 AM.

              Comment


              • #8
                Chudak gave me an idea and I turned off cacheConsumer from the ConnectionFactory and it solved our problem.

                Our connection factory now looks like this:
                Code:
                <bean id="springconnectionfactory" class="org.springframework.jms.connection.CachingConnectionFactory">
                      <property name="targetConnectionFactory"  ref="connectionfactory" />
                      <property name="sessionCacheSize" value="10"/>
                      <property name="cacheProducers" value="false"/>
                      <property name="cacheConsumers" value="false"/>
                    </bean>
                I would appreciate, if somebody could explain to me, what happened and why cacheConsumer helped to us. (i googled a lot, but I couldn't find the exact answer)

                Comment


                • #9
                  I deleted this post so as not to cause any confusion - Sol
                  Last edited by Solstice; Dec 1st, 2009, 12:17 PM.

                  Comment


                  • #10
                    @Solstice: I think you are confusing the Camel "Cache Consumer" with the "cacheConsumers" boolean property on Spring's CachingConnectionFactory.

                    The setting on the Spring CachingConnectionFactory has nothing to do with Camel. It is an indication of whether JMS MessageConsumer instances should be cached along with their Session.

                    Comment


                    • #11
                      Originally posted by Mark Fisher View Post
                      @Solstice: I think you are confusing the Camel "Cache Consumer" with the "cacheConsumers" boolean property on Spring's CachingConnectionFactory.

                      The setting on the Spring CachingConnectionFactory has nothing to do with Camel. It is an indication of whether JMS MessageConsumer instances should be cached along with their Session.
                      Ah, you're right, my mistake. I was searching through the ActiveMQ documentation for information on it and found the reference within Camel. I have redacted the post so as not to cause any confusion.

                      At any rate, can you offer some thoughts on our similar problems? Or would it be more appropriate to move my question to a new thread? It's not my intention to threadjack.
                      Last edited by Solstice; Dec 1st, 2009, 12:17 PM.

                      Comment


                      • #12
                        Well, I tried switching over to ActiveMQ and setting and setting the cacheConsumers property to false and messages are still being left on the queue. I'm still not sure why disabling the cache would solve your problem, treff7es.

                        Mark, any thoughts on why it would prevent this problem?

                        As an aside, reading through the SI forum, I see that Mark has been extremely active and helpful. I do hope you know that your efforts are greatly appreciated by the community.

                        Comment


                        • #13
                          I'm not sure too. So, I hope Mark or somebody else can give us some answer!
                          We would appreciate that!

                          Originally posted by Solstice View Post
                          Well, I tried switching over to ActiveMQ and setting and setting the cacheConsumers property to false and messages are still being left on the queue. I'm still not sure why disabling the cache would solve your problem, treff7es.

                          Mark, any thoughts on why it would prevent this problem?

                          As an aside, reading through the SI forum, I see that Mark has been extremely active and helpful. I do hope you know that your efforts are greatly appreciated by the community.

                          Comment


                          • #14
                            Pressing on, in addition to messages being left on the queue, I've noticed that each invocation of the gateway is resulting in two messages being enqueued. I haven't been able to figure out why this has been happening. I suspect that eliminating the duplicate messages will also resolve the stuck messages as well.

                            Again, the application flow is this: An object calls a service. The request is intercepted by a gateway and is placed on a channel. The request is then placed on a JMS queue, which is managed by ActiveMQ. The message is received at the other end, dequeued, and handled by a service activator which invokes the target service. A response is then delivered back along the chain to the requesting object in the first step. This is intended to be a synchronous request/reply message.

                            Our goal is to physically separate the clients (in this case a webservice endpoint) and the services and to facilitate communication via a message bus. Am I even taking the correct approach here?

                            Any advice is greatly appreciated.

                            Here is the current state of our configuration:
                            Code:
                            <!--  ActiveMQ declarations [BEGIN] -->			
                            	<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
                            		<property name="targetConnectionFactory">
                            			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
                            				<property name="brokerURL" value="tcp://localhost:61616" />
                            			</bean>
                            		</property>
                            		<property name="sessionCacheSize" value="10"/>
                            		<property name="cacheProducers" value="false"/>
                            	</bean>
                            
                            	<bean id="requestQueue" class="org.apache.activemq.command.ActiveMQQueue">
                            		<constructor-arg value="ProductLookupRequestQueue"/>
                            	</bean>
                            <!--  ActiveMQ declarations [END] -->
                            
                            
                                <bean id="ProductLookupRequestQueue" class="com.sun.messaging.Queue">
                                    <constructor-arg type="java.lang.String" value="ProductLookupRequestQueue"/>
                                </bean>   
                                <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
                                    <property name="connectionFactory" ref="connectionFactory"/>
                                    <property name="defaultDestination" ref="ProductLookupRequestQueue"/>
                                    <property name="receiveTimeout" value="10000"/>
                                </bean>   
                            
                            	
                            	<!-- JMS gateway declaration -->	
                            	<jms:inbound-gateway id="jmsin" 
                                   request-destination="ProductLookupRequestQueue"    
                                   request-channel="ProductLookup-Request"/>
                            
                            	<jms:channel id="ProductLookup-Request" queue="ProductLookupRequestQueue" />
                            	 
                            	<si:service-activator 
                            		input-channel="ProductLookup-Request"		
                            		ref="lookupProdDOrchestrator"
                            		method="invoke"/> 
                            		
                            	<si:gateway id="lookupProdDGateway"
                            		service-interface="company.app.gateway.I_ProductLookupGateway"
                            		default-request-channel="ProductLookup-Request"/>

                            Thanks!
                            Last edited by Solstice; Dec 4th, 2009, 04:14 PM.

                            Comment


                            • #15
                              Mark, can you offer any suggestions of what to try?

                              Thanks in advance,

                              Sol

                              Comment

                              Working...
                              X