Confused about message gateway with JMS

  • Confused about message gateway with JMS

    Hi,

    I am trying to port one of the Spring Batch samples to JMS. The sample is available here [1].

    Basically, the ChunkMessageChannelItemWriter sends a request object to the request channel and waits for the responses to come back on the response channel.

    The message endpoint is the ChunkProcessorChunkHandler.

    I would like to upgrade this to a JMS-based example so that:

    * The request and response objects are handled by JMS queues
    * The message endpoint is a MessageListener where the number of concurrent threads can be configured

    (Long story short: upgrade this sample to a multi-threaded / multi-process sample.)

    I have tried the following, but everything executes in a single thread (I am probably missing an indirection):

    Code:
    <integration:annotation-config/>
        <integration:channel id="requests"/>
        <integration:channel id="replies">
            <integration:queue/>
        </integration:channel>
    
        <integration:poller max-messages-per-poll="1" id="defaultPoller"
                            default="true">
            <integration:interval-trigger interval="3000"/>
        </integration:poller>
    
        <bean id="messagingGateway" class="org.springframework.integration.gateway.SimpleMessagingGateway">
            <property name="requestChannel" ref="requests"/>
            <property name="replyChannel" ref="replies"/>
            <property name="replyTimeout" value="1000"/>
        </bean>
    
        <jms:message-driven-channel-adapter id="jmsIn" destination="batchChunkInQueue" channel="requests"/>
        <jms:outbound-channel-adapter id="jmsOut" destination="batchChunkOutQueue" channel="replies"/>
        <integration:service-activator input-channel="requests" output-channel="replies" ref="chunkHandler"/>
    Any advice or a reference to the documentation would be appreciated.

    Thanks!

    [1] https://src.springframework.org/svn/...ts-context.xml

  • #2
    The default number of concurrent consumers for the underlying message listener container is 1. You can modify that setting through the 'concurrent-consumers' and/or 'max-concurrent-consumers' attributes of the "message-driven-channel-adapter" element.

    Please post back to let us know if that solves your problem.
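
    A minimal sketch of that suggestion, reusing the adapter from the first post. The values 5 and 10 are illustrative assumptions, not recommendations:

    ```xml
    <!-- Sketch: start with 5 listener threads and let the container scale up to 10.
         Both numbers are placeholders chosen for illustration. -->
    <jms:message-driven-channel-adapter id="jmsIn"
                                        destination="batchChunkInQueue"
                                        channel="requests"
                                        concurrent-consumers="5"
                                        max-concurrent-consumers="10"/>
    ```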



    • #3
      No, it did not. I think I am missing some basic concepts here.

      Here's what I want:

      * Request is a JMS queue
      * Response is a JMS queue
      * ChunkHandler receives an object from the request queue and sends back a response to the response queue (Service Activator)
      * Regarding the messaging gateway, a call to send should send a JMS message on the request queue and a call to receive should act as a standard MessageListener on the response queue
      * The number of concurrent ChunkHandler instances reading messages can be configured (ideally on the fly as well)


      Code:
      <integration:channel id="requests"/>
          <integration:channel id="replies">
              <integration:queue/>
          </integration:channel> 
      
          <integration:poller max-messages-per-poll="1" id="defaultPoller"
                              default="true">
              <integration:interval-trigger interval="3000"/>
          </integration:poller>
      
          <bean id="messagingGateway" class="org.springframework.integration.gateway.SimpleMessagingGateway">
              <property name="requestChannel" ref="requests"/>
              <property name="replyChannel" ref="replies"/>
              <property name="replyTimeout" value="1000"/>
          </bean>
      
          <jms:message-driven-channel-adapter id="jmsIn" destination="batchChunkInQueue"
                                              channel="requests" concurrent-consumers="5"/>
          <jms:outbound-channel-adapter id="jmsOut" destination="batchChunkOutQueue" channel="replies"/>
          <integration:service-activator input-channel="requests" output-channel="replies" ref="chunkHandler"/>
      Thanks
      Last edited by snicoll; Dec 21st, 2009, 09:24 AM.



      • #4
        OK, things are improving a bit, but I am still puzzled that I can't find a working example of a JMS request/reply mechanism with SI.

        Here's the updated config (probably ugly and overcomplicated):

        Code:
         <integration:annotation-config/>   
        
            <integration:channel id="chunkOutput"/>
            <integration:channel id="chunkStatus"/>
            
             <bean id="messagingGateway" class="org.springframework.integration.gateway.SimpleMessagingGateway">
                <property name="requestChannel" ref="chunkOutput"/>
                <property name="replyChannel" ref="chunkStatus"/>
                <property name="replyTimeout" value="1000"/>
            </bean>
        
            <jms:outbound-gateway id="chunkSender"
                                  request-channel="chunkOutput"
                                  request-destination="batchChunkInQueue"/>
            <jms:inbound-gateway id="chunkResponseReceiver" request-channel="chunkStatus"
                                 request-destination="batchChunkOutQueue"/>
        
        
            <!-- Messages are received from the batchChunkInQueue and sent to the chunkRequests channel. These
                 are processed by the chunkHandler service activator -->
            <integration:channel id="chunkRequests"/>
            <integration:channel id="chunkResponses"/>
        
            <jms:inbound-gateway id="chunkReceiver" request-channel="chunkRequests"
                                 request-destination="batchChunkInQueue"/>
            <jms:outbound-gateway id="chunkResponseSender"
                                  request-channel="chunkResponses"
                                  request-destination="batchChunkOutQueue"/>
        
            <integration:service-activator input-channel="chunkRequests" output-channel="chunkResponses" ref="chunkHandler"/>
        All works well until the receiver is supposed to dispatch the response:

        Code:
        2009-12-22 11:20:29 [org.springframework.jms.listener.DefaultMessageListenerContainer#0-1] DirectChannel [DEBUG] preSend on channel 'chunkStatus', message: [Payload=ChunkResponse: jobId=1, stepContribution=[StepContribution: read=0, written=0, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=false][Headers={springintegration_jms_messageId=ID:llnp186-2335-1261477138812-2:3:1:1:1, springintegration_replyChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@30cd64, springintegration_timestamp=1261477229031, springintegration_id=d77a2055-13e4-4327-a38c-7f4e4995c380, springintegration_jms_redelivered=false, springintegration_jms_replyTo=temp-queue://ID:llnp186-2335-1261477138812-2:3:1, springintegration_errorChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@30cd64}]
        2009-12-22 11:20:29 [org.springframework.jms.listener.DefaultMessageListenerContainer#0-1] DefaultMessageListenerContainer [WARN] Execution of JMS message listener failed
        org.springframework.integration.message.MessageDeliveryException: Dispatcher has no subscribers.
        	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:97)
        	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:90)
        	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:43)
        	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:116)
        	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:94)
        	at org.springframework.integration.channel.MessageChannelTemplate.doSend(MessageChannelTemplate.java:223)
        	at org.springframework.integration.channel.MessageChannelTemplate.doSendAndReceive(MessageChannelTemplate.java:248)
        	at org.springframework.integration.channel.MessageChannelTemplate.sendAndReceive(MessageChannelTemplate.java:215)
        	at org.springframework.integration.channel.MessageChannelTemplate.sendAndReceive(MessageChannelTemplate.java:203)
        	at org.springframework.integration.jms.ChannelPublishingJmsMessageListener.onMessage(ChannelPublishingJmsMessageListener.java:209)
        	at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:518)
        	at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:479)
        	at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:451)
        	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:323)
        	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:261)
        	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:982)
        	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:974)
        	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:876)
        	at java.lang.Thread.run(Thread.java:619)
        Apparently, my messaging gateway is not subscribed to the chunkStatus channel?



        • #5
          Replying to myself:

          Code:
              <integration:annotation-config/>
          
          
              <integration:channel id="chunkOutput"/>
              <integration:channel id="chunkStatus">
                  <integration:queue/>
              </integration:channel>
          
              <bean id="messagingGateway" class="org.springframework.integration.gateway.SimpleMessagingGateway">
                  <property name="requestChannel" ref="chunkOutput"/>
                  <property name="replyChannel" ref="chunkStatus"/>
                  <property name="replyTimeout" value="1000"/>
              </bean>
          
          
              <jms:outbound-channel-adapter id="chunkSender" destination="batchChunkInQueue" channel="chunkOutput"/>
              <jms:message-driven-channel-adapter id="chunkResponseReceiver" destination="batchChunkOutQueue"
                                                  channel="chunkStatus"/>
          
          
              <!-- Messages are received from the batchChunkInQueue and sent to the chunkRequests channel. These
                  are processed by the chunkHandler service activator -->
              <integration:channel id="chunkRequests"/>
              <integration:channel id="chunkResponses"/>
          
              <jms:message-driven-channel-adapter id="chunkReceiver" destination="batchChunkInQueue" channel="chunkRequests"/>
              <jms:outbound-channel-adapter id="chunkResponseSender" destination="batchChunkOutQueue" channel="chunkResponses"/>
          
              <integration:service-activator input-channel="chunkRequests" output-channel="chunkResponses" ref="chunkHandler"/>
          Any comment on this config is welcome, of course.



          • #6
            Thanks snicoll, this is a very helpful JMS sample configuration for a distributed scenario, just what I was looking for. For other folks looking at the solution: the queue element in the chunkStatus channel is essential to get rid of the "Dispatcher has no subscribers" error message.
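
            To make the difference concrete, here are the two declarations of chunkStatus from posts #4 and #5 side by side. They are alternatives and are not meant to coexist in one context. The explanation in the comments is a reading of the stack trace in post #4, not something confirmed in this thread:

            ```xml
            <!-- Post #4: no nested element, so this is a DirectChannel (as the log line shows).
                 A send fails with "Dispatcher has no subscribers" unless an endpoint is subscribed. -->
            <integration:channel id="chunkStatus"/>

            <!-- Post #5: the nested queue makes this a buffering, pollable channel.
                 Messages wait in the queue until a receiver (here, presumably the gateway) polls them. -->
            <integration:channel id="chunkStatus">
                <integration:queue/>
            </integration:channel>
            ```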



            • #7
              Give me complete config XML

              Hi,

              Can anybody send me the complete XML, with namespaces, for this configuration? I am trying to use it in some of my batch code, but I am getting namespace errors.

              Thanks in advance
              Vipin Goel
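
              A possible skeleton for the enclosing beans element, assuming the integration and jms prefixes used earlier in this thread map to the Spring Integration core and JMS namespaces (the same URIs that appear in the configuration in post #8). The version-less schema locations are an assumption and may need a version suffix, depending on the Spring Integration release on the classpath:

              ```xml
              <?xml version="1.0" encoding="UTF-8"?>
              <beans xmlns="http://www.springframework.org/schema/beans"
                     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                     xmlns:integration="http://www.springframework.org/schema/integration"
                     xmlns:jms="http://www.springframework.org/schema/integration/jms"
                     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                         http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                         http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">

                  <!-- The channels, gateway, JMS adapters and service activator from post #5 go here. -->

              </beans>
              ```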



              • #8
                Please help with jms request-reply configuration

                Hello,
                First of all, thank you, snicoll, for posting your JMS request-reply configuration. It took me ages to get past the "Dispatcher has no subscribers" issue until I stumbled upon your solution. However, in my case the client still fails to receive the reply: it blocks waiting for the reply and hangs indefinitely despite the default-reply-timeout setting. I also tried the same example without specifying a named reply channel, so that the reply would be delivered via a temporary reply queue, but that never worked for me: I got "no output-channel or replyChannel header available" on the service side. I wonder if anyone could help debug my configuration. Below is the project code:
                Client configuration
                Code:
                <?xml version="1.0" encoding="UTF-8"?>
                <beans xmlns="http://www.springframework.org/schema/beans"
                    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                    xmlns:context="http://www.springframework.org/schema/context"
                    xmlns:util="http://www.springframework.org/schema/util"
                    xmlns:jms="http://www.springframework.org/schema/jms"
                    xmlns:si="http://www.springframework.org/schema/integration"
                    xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
                    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
                            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.1.xsd
                            http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.1.xsd
                            http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd" >
                
                <import resource="hornetq-connection-factory.xml"/>
                 
                    <bean id="htmlProcessorMessageTemplate"  class="org.springframework.jms.core.JmsTemplate">
                                <property name="connectionFactory" ref="connectionFactory"/>
                                <property name="defaultDestinationName" value="htmlProcessorRequest" />
                                <property name="explicitQosEnabled" value="true"/> 
                                <property name="deliveryMode">
                                    <util:constant static-field="javax.jms.DeliveryMode.NON_PERSISTENT" /></property>                                      
                                <property name="sessionAcknowledgeMode">
                                    <util:constant static-field="javax.jms.Session.AUTO_ACKNOWLEDGE" />
                                </property>
                     </bean>
                
                    <si:gateway id="htmlProcessorGateway" service-interface="com.si.investigation.HtmlProcessorGateway"
                          default-request-channel="htmlProcessorRequestChannel" default-reply-channel="htmlProcessorReplyChannel" default-reply-timeout="10" />
                
                    <si:channel id="htmlProcessorRequestChannel" />
                
                    <int-jms:outbound-channel-adapter channel="htmlProcessorRequestChannel" jms-template="htmlProcessorMessageTemplate"  />
                
                    <si:channel id="htmlProcessorReplyChannel" > <si:queue capacity = "100"/></si:channel>
                
                </beans>
                Service configuration
                Code:
                <?xml version="1.0" encoding="UTF-8"?>
                <beans xmlns="http://www.springframework.org/schema/beans"
                    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                    xmlns:context="http://www.springframework.org/schema/context"
                    xmlns:util="http://www.springframework.org/schema/util"
                    xmlns:jms="http://www.springframework.org/schema/jms"
                    xmlns:si="http://www.springframework.org/schema/integration"
                    xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
                    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
                            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.1.xsd
                            http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.1.xsd
                            http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd" >
                
                    <import resource="hornetq-connection-factory.xml"/>
                   
                    <bean id="htmlProcessorServiceActivator" class="com.si.vestigation.HtmlProcessorServiceActivator" />
                    <si:service-activator input-channel="htmlProcessorRequestChannel" ref="htmlProcessorServiceActivator"
                      method="reply" output-channel="htmlProcessorReplyChannel" />
                     
                     <!-- Listen to incoming messages on the JMS request queue -->
                    <si:channel id="htmlProcessorRequestChannel" />
                    
                    <int-jms:message-driven-channel-adapter id="htmlProcessorRequestChannelAdaptor"
                      channel="htmlProcessorRequestChannel" destination-name="htmlProcessorRequest"
                      connection-factory="connectionFactory" /> 
                
                     <si:channel id="htmlProcessorReplyChannel" />  
                     <int-jms:outbound-channel-adapter channel="htmlProcessorReplyChannel" destination-name="htmlProcessorReply"  connection-factory="connectionFactory" />
                     
                </beans>
                Code:
                public interface HtmlProcessorGateway {
                
                    @Gateway  
                    String sendAndReceiveString(String msgout);
                    
                }
                Many thanks



                • #9
                  If you want to do

                  gateway->jms-outbound->jms-inbound->processandsendreply

                  It would be better to use

                  gateway->jms-outbound-gateway->jms-inbound-gateway->process

                  because the framework will take care of all the message correlation and the result of the process will get back to the originating gateway.

                  Hope that helps.
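
                  A sketch of what that gateway-based flow could look like, reusing the bean, channel and queue names from post #8. The two halves live in separate application contexts (client and service), which is why the channel id can repeat. This is an untested outline, not the poster's final configuration:

                  ```xml
                  <!-- Client context: the proxy gateway sends to a channel that feeds the JMS outbound gateway.
                       No reply channel is named; the framework correlates the reply back to the caller. -->
                  <si:gateway id="htmlProcessorGateway"
                              service-interface="com.si.investigation.HtmlProcessorGateway"
                              default-request-channel="htmlProcessorRequestChannel"/>

                  <si:channel id="htmlProcessorRequestChannel"/>

                  <int-jms:outbound-gateway request-channel="htmlProcessorRequestChannel"
                                            request-destination-name="htmlProcessorRequest"
                                            connection-factory="connectionFactory"/>

                  <!-- Service context: the JMS inbound gateway listens on the request queue and
                       returns whatever the service activator produces as the JMS reply. -->
                  <int-jms:inbound-gateway request-channel="htmlProcessorRequestChannel"
                                           request-destination-name="htmlProcessorRequest"
                                           connection-factory="connectionFactory"/>

                  <si:channel id="htmlProcessorRequestChannel"/>

                  <!-- No output-channel: the return value goes to the reply channel set by the inbound gateway. -->
                  <si:service-activator input-channel="htmlProcessorRequestChannel"
                                        ref="htmlProcessorServiceActivator"
                                        method="reply"/>
                  ```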



                  • #10
                    Many thanks, it really helps. I have reconfigured as you suggested. There is one problem that I cannot solve with gateways: how can I force messages to be non-persistent? I figured out how to do this with channel adapters, but not with gateways. Once again, thank you for your reply.



                    • #11
                      Here are some examples from the test cases. You can control the persistence of both the request and the reply...

                      Code:
                      <jms:outbound-gateway id="jmsGateway"
                                            request-destination-name="requestQueue"
                                            request-channel="requestChannel"
                                            explicit-qos-enabled="true"
                                            delivery-persistent="true"/>

                      <jms:inbound-gateway id="gatewayWithReplyQos"
                                           request-destination-name="testDestinationName"
                                           request-channel="requestChannel"
                                           reply-time-to-live="12345"
                                           reply-priority="7"
                                           reply-delivery-persistent="false"
                                           explicit-qos-enabled-for-replies="true"/>
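
                      For the non-persistent case asked about in post #10, a sketch would flip delivery-persistent on the outbound gateway. Only attributes from the test-case examples above are used, and the id, channel and destination names are reused from them:

                      ```xml
                      <!-- Sketch: non-persistent requests. explicit-qos-enabled stays true so that
                           the delivery mode set here is applied instead of the JMS defaults. -->
                      <jms:outbound-gateway id="jmsGateway"
                                            request-destination-name="requestQueue"
                                            request-channel="requestChannel"
                                            explicit-qos-enabled="true"
                                            delivery-persistent="false"/>
                      ```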



                      • #12
                        Gary,

                        I stumbled upon your post while trying to configure AMQ with Spring Batch. Could you post your solution, or the XML files associated with it, on GitHub?



                        • #13
                          This is a really old thread that was recently reawakened with a question about configuring message persistence.

                          I suggest you start a new thread, and explain exactly what you need.



                          • #14
                            Thanks in advance, will do that.
