Announcement Announcement Module
Collapse
No announcement yet.
JMS queues and gateway Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • JMS queues and gateway

    I am trying to create a gateway component.

    I want my gateway to send/receive messages from jms queues. The gateway interface is, approxmately, like this:

    interface HotelService {
    Collection<HotelReservation> findAvailableHotels( HotelSearch search ) ;
    }

    If I invoke the method ad pass in a search, it correctly sends that search to an utbound jms gateway and the message arrives on the queue and - in another process - invokes the message driven pojos method and it even responds, build a Sprign Integration Message and packaging it with a payload of type Collection<HotelReservation> and sending it out on the jms queue which feeds into the reply channel on the gatewa in the first process. I can even add a WireTap interceptor to the reply channel to confirm that the channel is indeed being polled correctly and the payload is there, its just that the gateway never returns it. its like its idling. I suspected it might have something to do with correlation, or perhaps payload type. So Ive ensured that a correlation id is propogated from both processes any time a message is set.

    oddly, the gateway works if i simply declare two channels and fashion the message driven pojo out of the same channels as is used in the gateway (essentially making the message driven pojo and gateway share the same process space, which is what im trying to avoid. i want them decoupled via a JMS queue.)

    thanks

  • #2
    Josh,

    Can you provide the configuration file as well?

    Thanks,
    Mark

    Comment


    • #3
      Hi Mark,

      There are two files. One for the client (which contains just enough configuration to (as soon as we find what simple thing I've overlooked!) receive and send to the JMS queue) and one for the service (which contains the configuration for the message driven pojo that receives the inbound JMS message and sends it out on the outbound JMS queue)

      I'm including both. Pardon the thrashing in the configuration, and - as usual - thank you for your time.

      ===================
      SERVICE
      ===================
      Code:
      <?xml version="1.0" encoding="UTF-8"?>
      <beans:beans
      	xmlns:beans="http://www.springframework.org/schema/beans"
      	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      	xmlns="http://www.springframework.org/schema/integration"
      	xmlns:context="http://www.springframework.org/schema/context"
      	xmlns:jms="http://www.springframework.org/schema/integration/jms"
      	xsi:schemaLocation="http://www.springframework.org/schema/beans
                   http://www.springframework.org/schem...-beans-2.5.xsd http://www.springframework.org/schema/context
                   http://www.springframework.org/schem...ontext-2.5.xsd http://www.springframework.org/schema/integration
                   http://www.springframework.org/schem...ration-1.0.xsd
                   http://www.springframework.org/schema/integration/jms http://www.springframework.org/schem...on-jms-1.0.xsd
                   ">
      
      	<context:annotation-config />
      
      	<beans:bean
      		id="connectionFactory"
      		class="org.springframework.jms.connection.CachingConnectionFactory">
      		<beans:property
      			name="targetConnectionFactory">
      			<beans:bean
      				class="org.apache.activemq.ActiveMQConnectionFactory">
      				<beans:property
      					name="brokerURL"
      					value="tcp://localhost:61616" />
      			</beans:bean>
      		</beans:property>
      		<beans:property
      			name="sessionCacheSize"
      			value="10" />
      		<beans:property
      			name="cacheProducers"
      			value="false" />
      	</beans:bean>
      
      	<beans:bean
      		id="jmsTemplate"
      		class="org.springframework.jms.core.JmsTemplate">
      		<beans:property
      			name="connectionFactory"
      			ref="connectionFactory" />
      	</beans:bean>
      
      	<beans:bean
      		id="vacationServiceImpl"
      		class="com....myholiday.VacationServiceImpl" />
      
      	<channel
      		id="outboundHotelReservationSearchResultsChannel">
      		<interceptors>
      			<beans:bean
      				class="org.springframework.integration.channel.interceptor.WireTap">
      				<beans:constructor-arg
      					ref="spy" />
      			</beans:bean>
      		</interceptors>
      	</channel>
      
      	<jms:message-driven-channel-adapter
      		channel="inboundHotelReservationSearchChannel"
      		extract-payload="false"
      		connection-factory="connectionFactory"
      		destination-name="inboundHotelReservationSearchDestination" />
      
      	<channel
      		id="spy" />
      
      	<channel
      		id="inboundHotelReservationSearchChannel" />
      
      	<service-activator
      		input-channel="spy"
      		ref="vacationServiceImpl"
      		method="spy" />
      
      
      	<service-activator
      		input-channel="inboundHotelReservationSearchChannel"
      		ref="vacationServiceImpl"
      		method="findHotels"
      		output-channel="outboundHotelReservationSearchResultsChannel" />
      
      	<jms:outbound-channel-adapter extract-payload="false"
      		destination-name="outboundHotelReservationSearchResultsDestination"
      		channel="outboundHotelReservationSearchResultsChannel"
      		connection-factory="connectionFactory" />
      
      </beans:beans>


      ===================
      CLIENT
      ===================

      Code:
      <?xml version="1.0" encoding="UTF-8"?>
      <beans:beans
      	xmlns:beans="http://www.springframework.org/schema/beans"
      	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      	xmlns="http://www.springframework.org/schema/integration"
      	xmlns:context="http://www.springframework.org/schema/context"
      	xmlns:jms="http://www.springframework.org/schema/integration/jms"
      	xsi:schemaLocation="http://www.springframework.org/schema/beans
                   http://www.springframework.org/schem...-beans-2.5.xsd http://www.springframework.org/schema/context
                   http://www.springframework.org/schem...ontext-2.5.xsd http://www.springframework.org/schema/integration
                   http://www.springframework.org/schem...ration-1.0.xsd
                   http://www.springframework.org/schema/integration/jms http://www.springframework.org/schem...on-jms-1.0.xsd
                   ">
      	<channel
      		id="responses" />
      	<channel
      		id="inboundHotelReservationSearchResultsChannel" />
      	<channel
      		id="requests">
      		<interceptors>
      			<beans:bean			class="com....myholiday.MyCustomInterceptorToAddCorrelation" />
      		</interceptors>
      	</channel>
      	
      	<poller
      		default="true">
      		<interval-trigger
      			interval="1000" />
      	</poller>
      
      	<beans:bean
      		id="jmsTemplate"
      		class="org.springframework.jms.core.JmsTemplate">
      		<beans:property
      			name="connectionFactory"
      			ref="connectionFactory" />
      	</beans:bean>
      
      	<jms:outbound-channel-adapter
      		channel="requests"
      		extract-payload="false"
      		connection-factory="connectionFactory"
      		destination-name="inboundHotelReservationSearchDestination" />
      
      	<jms:inbound-channel-adapter
      		extract-payload="false"
      		channel="responses"
      		connection-factory="connectionFactory"
      		destination-name="outboundHotelReservationSearchResultsDestination" />
      
      
      	<gateway
      		id="vacationService"
      		service-interface="com....myholiday.VacationService" />
      
      	<beans:bean
      		id="connectionFactory"
      		class="org.springframework.jms.connection.CachingConnectionFactory">
      		<beans:property
      			name="targetConnectionFactory">
      			<beans:bean
      				class="org.apache.activemq.ActiveMQConnectionFactory">
      				<beans:property
      					name="brokerURL"
      					value="tcp://localhost:61616" />
      			</beans:bean>
      		</beans:property>
      		<beans:property
      			name="sessionCacheSize"
      			value="10" />
      		<beans:property
      			name="cacheProducers"
      			value="false" />
      	</beans:bean>

      Comment


      • #4
        Have you tried using the jms outbound-gateway for the client and the jms inbound-gateway for the server?

        Comment


        • #5
          OK, gven your advice, I made the changes. It's giving me the rather obvious error that :

          Code:
          org.springframework.integration.message.MessageTimeoutException: failed to receive JMS response within timeout of: 5000ms
          It's obviously trying to do the right thing. I'm still debugging where the disconnect is...

          Anyway, here are the revised configurations:

          ==========
          SERVICE
          ==========
          Code:
          <?xml version="1.0" encoding="UTF-8"?>
          <beans:beans
          	xmlns:beans="http://www.springframework.org/schema/beans"
          	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          	xmlns="http://www.springframework.org/schema/integration"
          	xmlns:context="http://www.springframework.org/schema/context"
          	xmlns:jms="http://www.springframework.org/schema/integration/jms"
          	xsi:schemaLocation="http://www.springframework.org/schema/beans
                       http://www.springframework.org/schem...-beans-2.5.xsd http://www.springframework.org/schema/context
                       http://www.springframework.org/schem...ontext-2.5.xsd http://www.springframework.org/schema/integration
                       http://www.springframework.org/schem...ration-1.0.xsd
                       http://www.springframework.org/schema/integration/jms http://www.springframework.org/schem...on-jms-1.0.xsd
                       ">
          	<beans:import
          		resource="solution041.xml" />
          	<context:annotation-config />
          	<channel
          		id="spy" />
          	<service-activator
          		input-channel="spy"
          		ref="errorBean"></service-activator>
          	<channel
          		id="inboundHotelReservationSearchChannel" />
          	<channel
          		id="outboundHotelReservationSearchResultsChannel">
          		<interceptors>
          			<beans:bean
          				class="org.springframework.integration.channel.interceptor.WireTap">
          				<beans:constructor-arg
          					ref="spy" />
          			</beans:bean>
          		</interceptors>
          	</channel>
          	<beans:bean
          		id="vacationServiceImpl"
          		class="com...VacationServiceImpl" />
          	<jms:inbound-gateway
          		request-channel="inboundHotelReservationSearchChannel"
          		request-destination-name="inboundHotelReservationSearchDestination"
          		connection-factory="connectionFactory" />
          	<service-activator
          		input-channel="inboundHotelReservationSearchChannel"
          		ref="vacationServiceImpl"
          		method="findHotels"
          		output-channel="outboundHotelReservationSearchResultsChannel" />
          	<jms:outbound-channel-adapter
          		extract-payload="false"
          		destination-name="outboundHotelReservationSearchResultsDestination"
          		channel="outboundHotelReservationSearchResultsChannel"
          		connection-factory="connectionFactory" />
          </beans:beans>
          ==========
          CLIENT
          ==========
          Code:
          <?xml version="1.0" encoding="UTF-8"?>
          <beans:beans
          	xmlns:beans="http://www.springframework.org/schema/beans"
          	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          	xmlns="http://www.springframework.org/schema/integration"
          	xmlns:context="http://www.springframework.org/schema/context"
          	xmlns:jms="http://www.springframework.org/schema/integration/jms"
          	xsi:schemaLocation="http://www.springframework.org/schema/beans
                       http://www.springframework.org/schem...-beans-2.5.xsd http://www.springframework.org/schema/context
                       http://www.springframework.org/schem...ontext-2.5.xsd http://www.springframework.org/schema/integration
                       http://www.springframework.org/schem...ration-1.0.xsd
                       http://www.springframework.org/schema/integration/jms http://www.springframework.org/schem...on-jms-1.0.xsd
                       ">
          	<beans:import
          		resource="solution041.xml" />
          	<context:annotation-config />
          	<channel
          		id="responses" />
          	<channel
          		id="requests" />
          	<poller
          		default="true">
          		<interval-trigger
          			interval="1000" />
          	</poller>
          	<jms:outbound-gateway
          		request-destination-name="inboundHotelReservationSearchDestination"
          		reply-destination-name="outboundHotelReservationSearchResultsDestination"
          		request-channel="requests"
          		reply-channel="responses"
          		connection-factory="connectionFactory" />
          	<gateway
          		id="vacationService"
          		service-interface="com.....VacationService" />
          </beans:beans>
          Also, the solution01.xml just contains the shared definitions for the JMS connection factory and other things.

          Thanks,
          Josh

          Comment


          • #6
            It looks like you left the jms outbound-channel-adapter in your server configuration. Can you try removing that?

            Comment


            • #7
              I did that, and it didn't work (same reasons, jms timeout..), and I'm not sure I understood why it should have worked.

              The channel-adapter on the service was meant to let the service-activator do something and then send the output to the response jms destination.

              i was hoping that, if it was sent to the response destination, the gateway in the client would pick it up and somehow resolve it as the response.

              but if we get rid of the outbound channel adapter, and there's no way to specify a reply-channel on the inbound jms gateway, then the only thing that could possibly bind the the request and response together would be maybe a replyTo header or some use of a correlationId header on the request message, and thats only assuming the inbound-gateway knew/did something with our destination queue (the only option that looks even close to do doing that is inbound-gateway's default-reply-destination attribute..)

              So, what needs to happen for the gateway in vm1 to understand that message x on a particular destination 'response' is the reply to a request sent on destination 'request' - what does it use to tie them together?

              Thanks,
              Josh

              Comment


              • #8
                Try removing the output-channel from the service-activator. Then, when that service-activator sends its response it will rely on the temporary reply channel generated by the inbound-gateway and configured on the Spring Integration Message created from the JMS Message. I realize what might be confusing is the fact that the inbound-gateway does not itself have a 'reply-channel' option, and that is most likely why you were creating the additional outbound-channel-adapter.

                The jms outbound-gateway on the client side does set the JMSReplyTo header on the JMS Message that it sends. The inbound-gateway then sends the reply to that JMS Destination automatically which is why you don't need anything on the service side that has a reference to the response Destination (the JMSCorrelationId is also used to coordinate that client-server exchange).

                The main reason for that behavior is that typically multiple clients would be using the service so that the service cannot possibly know where it needs to send each response. Instead it expects the client to provide that (and then falls back to the default-response-destination setting if there is no JMSReplyTo for a received Message).

                I hope that clarifies things a bit. Let me know if it works and/or if you have any other questions.

                Comment


                • #9
                  That's fantastic! I'm going to write about this! Thanks, and cheers Mark.

                  Comment


                  • #10
                    Glad to hear that! I assume your response also means that you got it working after that last suggestion?

                    Regards,
                    Mark

                    Comment


                    • #11
                      JMS Outbound Gateway Timeout

                      Hi Mark,
                      I encountered similar problem. In my case jms outbound gateway always timed out even when there were reply messages in reply queue.

                      Here is my config:

                      Code:
                       	<integration:channel id="helloReply" />
                         	<integration:gateway id="hello" 
                         			default-request-channel="helloReq" 
                      			default-reply-channel="helloReply" 
                      			service-interface="com.capgroup.ta.Hello"/>
                      
                       	<jms:outbound-gateway request-channel="helloReq" reply-channel="helloReply" 
                       		request-destination="reqQueue" reply-destination="replyQueue" receive-timeout="3000"/>
                        
                        		  <!-- JMS connection configuration -->
                      	<bean id="reqQueue" class="com.tibco.tibjms.TibjmsQueue">
                      		<constructor-arg value="hello.req"/>
                      	</bean>
                        	<bean id="replyQueue" class="com.tibco.tibjms.TibjmsQueue">
                      		<constructor-arg value="hello.reply"/>
                      	</bean>
                      	<bean id="connectionFactory" class="com.tibco.tibjms.TibjmsQueueConnectionFactory">
                      	    <constructor-arg type="java.lang.String" value="tcp://mulan-z2:7201" />
                      	    <property name="userName" value="fxp" />
                      	    <property name="userPassword" value="fxppass" />
                      	</bean>
                      Error message:
                      Code:
                      -------------------------------------------------------------------------------
                      Test set: ron.test.TestJMSOutboundGateway
                      -------------------------------------------------------------------------------
                      Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 4.174 sec <<< FAILURE!
                      testJMSOutboundGateway(ron.test.TestJMSOutboundGateway)  Time elapsed: 3.236 sec  <<< ERROR!
                      org.springframework.integration.message.MessageTimeoutException: failed to receive JMS response within timeout of: 3000ms
                      	at org.springframework.integration.jms.JmsOutboundGateway.handleRequestMessage(JmsOutboundGateway.java:296)
                      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:91)
                      	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:48)
                      	at org.springframework.integration.dispatcher.AbstractDispatcher.sendMessageToHandler(AbstractDispatcher.java:85)
                      	at org.springframework.integration.dispatcher.AbstractUnicastDispatcher.dispatch(AbstractUnicastDispatcher.java:54)
                      	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:56)
                      	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:116)
                      	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:94)
                      	at org.springframework.integration.channel.MessageChannelTemplate.doSend(MessageChannelTemplate.java:223)
                      	at org.springframework.integration.channel.MessageChannelTemplate.doSendAndReceive(MessageChannelTemplate.java:248)
                      	at org.springframework.integration.channel.MessageChannelTemplate.sendAndReceive(MessageChannelTemplate.java:215)
                      	at org.springframework.integration.gateway.AbstractMessagingGateway.sendAndReceiveMessage(AbstractMessagingGateway.java:176)
                      	at org.springframework.integration.gateway.AbstractMessagingGateway.sendAndReceive(AbstractMessagingGateway.java:159)
                      	at org.springframework.integration.gateway.AbstractMessagingGateway.sendAndReceive(AbstractMessagingGateway.java:150)
                      	at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:203)
                      	at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:172)
                      	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
                      	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
                      	at $Proxy7.sayHelloBack(Unknown Source)
                      	at ron.test.TestJMSOutboundGateway.testJMSOutboundGateway(TestJMSOutboundGateway.java:29)
                      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
                      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
                      	at java.lang.reflect.Method.invoke(Method.java:597)
                      	at org.springframework.test.context.junit4.SpringTestMethod.invoke(SpringTestMethod.java:198)
                      	at org.springframework.test.context.junit4.SpringMethodRoadie.runTestMethod(SpringMethodRoadie.java:274)
                      	at org.springframework.test.context.junit4.SpringMethodRoadie$2.run(SpringMethodRoadie.java:207)
                      	at org.springframework.test.context.junit4.SpringMethodRoadie.runBeforesThenTestThenAfters(SpringMethodRoadie.java:254)
                      	at org.springframework.test.context.junit4.SpringMethodRoadie.runWithRepetitions(SpringMethodRoadie.java:234)
                      	at org.springframework.test.context.junit4.SpringMethodRoadie.runTest(SpringMethodRoadie.java:204)
                      	at org.springframework.test.context.junit4.SpringMethodRoadie.run(SpringMethodRoadie.java:146)
                      	at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.invokeTestMethod(SpringJUnit4ClassRunner.java:151)
                      Seems like the gateway never receive any messages from the JMS reply queue configured for the gateway. I got the same error message when I removed 'reply-destination="replyQueue"' from the config.

                      Any suggestions on what could be wrong?

                      Thanks
                      Ron
                      Last edited by Ron; Aug 21st, 2009, 01:30 PM.

                      Comment


                      • #12
                        Never mind.
                        Found that message id should be used in message selector when using static queue for JMS outbound gateway.
                        Ron

                        Comment


                        • #13
                          Originally posted by starbuxman View Post
                          That's fantastic! I'm going to write about this! Thanks, and cheers Mark.
                          Hello did you get a chance to write a blog or tutorial on this? Do share the URL with us.

                          Comment


                          • #14
                            Originally posted by Saravan View Post
                            Hello did you get a chance to write a blog or tutorial on this? Do share the URL with us.
                            On the client side, with the JMS outbound gateway configration in my previous post, there are several options to implement a server.

                            1. The most simple way is to use Spring Integration JMS inbound gateway. It sets the correlation ID for you. Here is my configuration.
                            Code:
                             <jms:inbound-gateway request-channel="helloIn" request-destination="reqQueue"/>
                             <integration:service-activator input-channel="helloIn" ref="helloService" method="sayHelloBack"/>
                            <bean id="reqQueue" class="com.tibco.tibjms.TibjmsQueue">
                            	<constructor-arg value="hello.req"/>
                            </bean>
                            2. I have to use a JMS header transformer to set the correlation ID with JMS message ID from the request message before sending it back as response when use JMS inbound adapter and outbound adapter. Here is my configuration.

                            Code:
                            	<jms:message-driven-channel-adapter destination="reqQueue" channel="helloIn"/>
                            	<jms:outbound-channel-adapter destination="replyQueue" channel="helloOut"/> 	
                            	<integration:chain input-channel="helloIn" output-channel="helloOut">
                             	 	<integration:service-activator ref="helloService" method="sayHelloBack"/>
                             	 	<integration:transformer ref="headerEnricher"/>
                             	</integration:chain>
                             	
                             	<bean id="reqQueue" class="com.tibco.tibjms.TibjmsQueue">
                            		<constructor-arg value="hello.req"/>
                            	</bean>
                            	<bean id="replyQueue" class="com.tibco.tibjms.TibjmsQueue">
                            		<constructor-arg value="hello.reply"/>
                            	</bean>
                            and the transformer code for setting the JMS header.
                            Code:
                            MessageEndpoint
                            public class HeaderEnricher {
                                protected final  Log log = LogFactory.getLog(this.getClass());
                                
                            	@Transformer
                            	public Message<?> transform(Message<?> msg) {
                            		return MessageBuilder.fromMessage(msg).
                            				setHeader("springintegration_jms_correlationId", msg.getHeaders().
                            				get("springintegration_jms_messageId")).build();
                            	}
                            }
                            3. In my case, the server is implemented in Tibco BusinessWorks, therefore JMSCorrelationID has to be set correctly to work with Spring Integration JMS outbound gateway. In Java, the code would be similar to what is implemented in ChannelPublishingJmsMessageListener class used in the JMS inbound gateway.

                            Code:
                            	if (replyMessage != null) {
                            		javax.jms.Message jmsReply = this.messageConverter.toMessage(replyMessage, session);
                            		if (jmsReply.getJMSCorrelationID() == null) {
                            					jmsReply.setJMSCorrelationID(jmsMessage.getJMSMessageID());
                            		}
                            		MessageProducer producer = session.createProducer(jmsMessage.getJMSReplyTo());
                            		producer.send(jmsMessage.getJMSReplyTo(), jmsReply);
                            	}
                            Ron

                            Comment

                            Working...
                            X