Announcement Announcement Module
Collapse
No announcement yet.
How to read from a message-driven-channel-adapter Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • How to read from a message-driven-channel-adapter

    Greetings,

    How can a Java program read from a message-driven-channel-adapter?

    I have declared the following outbond-channel-adapter:
    Code:
    <channel id="myOutboundChannel"/>
    <jms:outbound-channel-adapter id="jmsout" channel="myOutboundChannel" destination="outboundJmsQueue"/>


    Here is the Java code I use to send a message to the channel:
    Code:
    ChannelResolver channelResolver = new BeanFactoryChannelResolver(applicationContext);
    
    MessageChannel outputChannel = channelResolver.resolveChannelName("myOutboundChannel");
    
    Jaxb2Marshaller marshaller = Jaxb2Marshaller)applicationContext.getBean("marshaller");
            
    outputChannel.send(new StringMessage(marshaller.marshal(message)));


    Similarly, I define a message-driven-channel-adapter (based on the Spring Integration examples):
    Code:
    <jms:message-driven-channel-adapter id="jmsin"	destination="inboundJmsQueue" channel="jmsinToStdoutChannel"/>
    
    <channel id="jmsinToStdoutChannel"/>
    
    <stream:stdout-channel-adapter id="stdout" channel="jmsinToStdoutChannel" append-newline="true"/>


    All the above works great: I send an XML message and receive an XML reply that gets displayed on the console.

    What I'd like to be able to do is process the received XML message in my Java program.

    I've removed the stdout-channel-adapter and changed the message-driven-channel-adapter definition to:
    Code:
    <channel id="myInboundChannel"/>
    <jms:message-driven-channel-adapter id="jmsin"	destination="inboundJmsQueue" channel="myInboundChannel"/>


    I'd like to be able to read the XML message from "myInboundChannel" as a String, convert (unmarshall) the String into an object and process the object.

    I've not been able to find any sample code and thought someone on this forum would be able to point me in the right direction.

    Thanks!

  • #2
    First of all, on the sending side, consider isolating your code from Spring Integration, and let Spring Integration do the marshalling...

    Code:
    public interface MyGateway {
       public void send(MyObject theObject);
    }
    Code:
    <int:gateway id="toJmsGw"
       service-interface="my.company.MyGateway" default-request-channel="in"/>
    <int-xml:marshalling-transformer input-channel="in" output-channel="toJms"
        marshaller="marshaller" ... etc, etc  />
    <jms:outbound-channel-adapter id="jmsout" channel="toJms" destination="outboundJmsQueue"/>
    Your calling code then just needs to get a reference to the toJmsGw and call send().

    On the receiving side, you just need a simple Pojo

    Code:
    public class MyService {
        public void process(MyObject theObject) {
    ...
        }
    }
    Then, connect the message-driven-adapter to an xml unmarshalling-transformer, and connect that to a

    Code:
    <service-activator ref="myServiceInstance"/>
    Hope that helps.

    Comment


    • #3
      Hi Gary,
      Thanks for your reply. I should clarify the scenario. I have a client/server application. The server is implemented using the service-activator component as well as relying on Spring to do the marshaling and unmarshaling. I showed the code that the client is using to send a request message. The client receives the response from the server and displays it out on the console. So, the entire round trip processing works up to that point.

      ActiveMQ is the JMS implementation. It allows individual messages to be inspected. Everything looks good there.

      The piece that is missing is the client being able to read from the message channel (to read the reply from the server) as opposed to letting Spring display the message on the console.

      I will have to follow up on your suggestion to let Spring handle the marshaling on the client side.
      W2M

      Comment


      • #4
        What I am trying to say is that you don't have to "read" the reply from a channel; the message-driven adapter will call your code (service activator) on its thread.

        If it is a true request/reply scenario then consider using a jms outbound-gateway instead of a an outbound adapter; that way, the response will come back to you. Just be sure to unmarshall it first.

        See the gateway sample...

        Code:
        <jms:outbound-gateway request-channel="stdinToJmsoutChannel"
        					  request-destination="requestQueue"
        					  reply-channel="jmsReplyToStdoutChannel"/>
        You can specify a reply-destination too (inboundJmsQueue in your case).

        Route the message to the jms outbound gateway (after marshalling) and the reply will come back to you. as a response to the call (your calling thread will block in the gateway).

        Code:
        public interface myGateway {
        
           public MyResponseType sendRequest(MyRequestType object);
        
        }

        If you really want to roll your own, using your existing strategy, then simply make the message-driven adapter's channel a queue channel; then you can get a reference to that channel and do a receive() on it.

        Code:
        <channel id="replyChannel">
           <queue/>
        </channel>
        But we generally don't recommend that style. It's better to not to have your code depend directly on SI classes.

        Comment


        • #5
          Hi Gary,
          Your suggestion looks really neat (and is not one I would have considered). I've looked at the sample code and can define the following:
          Code:
          <channel id="myOutboundChannel"/>
          <channel id="myInboundChannel"/>
          <jms:outbound-gateway   request-channel="myOutboundChannel"
          			  request-destination="outboundJmsQueue"
          			  reply-channel="myInboundChannel"
                                              reply-destination="inboundJmsQueue" />
          The samples present everything in terms of standard I/O. So how do I get the bean that implements your "myGateway" interface hooked into the jms:outbound-gateway shown above? If I can do that, then the client code can easily retrieve the bean and call the sendRequest method. Of course, if I put the marshaler code before and after jms:outbound-gateway, I'll have to tie "myGateway" to the marshalers.

          Thanks again!

          Comment


          • #6
            Something like this...

            Code:
            public interface MyGateway {
            
               public MyResponseType sendRequest(MyRequestType object);
            
            }
            Notice you don't provide implementation of this interface; Spring Integration (SI) does that; in effect, he takes object, wraps it in a message, waits for a response message, unwraps it, and returns it.

            Code:
            <gateway id="gw" service-interface="my.company.MyGateway"
               default-request=channel="toMarshaller"/>
            
            <xml:marshalling-transformer input-channel="toMarshaller"
               output-channel="toJms" ... rest of config />
            
            <jms:outbound-gateway request-channel="toJms"
                 reply-channel="toUnmarshaller ... rest of config/>
            
            <xml:unmarshalling-transformer input-channel="toUnMarshaller"
              ... rest of config />  <!-- note: NO output-channel -->
            getBean("gw", MyGateway.class); and call sendRequest().

            Now, since the last element in the flow (unmarshaller) has no output channel, SI will route the reply to a reply channel that was automatically added to the original inbound message by the gateway. The will go back to the "gw" bean and your thread (which has been suspended all this time) will be resumed and the gateway method will return the result. By default, your thread will block indefinitely, but you can add a timeout if you wish.

            I don't want to cause any confusion but, if you want to do some work between calling the gateway and getting the result, simply make the return type Future<MyResponseType> and you can later do a get() on the Future to get the result. This technique is a little more advanced, and is described in 'Asynchronous Gateway' section in the reference guide.
            Last edited by Gary Russell; Jun 18th, 2011, 05:47 PM.

            Comment


            • #7
              Very cool. Thanks for this. It will simplify the client code. The client code is also a bean. It must be possible for it to be injected with the reference to the gateway? What type would I use?
              Thanks!

              Comment


              • #8
                Absolutely!

                Either

                Code:
                public class MyClientBean {
                
                  @Autowired
                  MyGateway gw;
                
                ...
                
                   // somemethod
                
                      MyResponse resp = gw.sendRequest(req);
                or

                Code:
                public class MyClientBean {
                
                  MyGateway gw;
                
                
                  public MyClientBean(MyGateway gw) { // <constructor-arg ref="gw"/>
                      this.gw = gw;
                  }
                
                
                ...
                
                  public void setGateway (MyGateway gw) { // <property name="gateway" ref="gw"/>
                    this.gw = gw;
                  }

                Comment


                • #9
                  Yes, of course, use the declared interface! It makes for an elegant design!
                  Thanks again!

                  Comment


                  • #10
                    One more thing... the jms outbound-gateway has a default timeout of 5 seconds; if you expect your server to take longer than that, you can increase it via the receive-timeout attribute.

                    If the timeout occurs, your thread calling the gateway will get an exception (unless you declare an error-channel on the <gateway/>, in which case you can handle it via a subscriber to that channel and return some other result to the client).

                    Note that the error-channel is declared on *your* gateway, not the jms gateway; think of an error channel as a sort-of 'try-catch' block on the integration flow.

                    Comment


                    • #11
                      Hi Again,

                      I made all the changes and the code is working half way. With the gateway, I can send a message and the server picks it up and generates a reply. However, the gateway never picks up the reply and instead times out. I increased the receive-timeout but that had no impact. In fact, after several attempts, the reply queue had several messages in it so the gateway would not have had to wait at all to pick the first one off the queue.

                      Here is the relevant code:
                      Code:
                      <gateway id="authenticationGateway" service-interface="com.xyz.MyAuthenticationGateway" default-request-channel="toMarshaller"	/>
                      
                      <channel id="toMarshaller"/>
                      	
                      <transformer id="object-to-xml" ref="marshaller" input-channel="toMarshaller" method="marshal" output-channel="jmsoutputchannel"/>	    
                      	
                      <channel id="jmsoutputchannel" />
                      	
                      <jms:outbound-gateway request-channel="jmsoutputchannel" request-destination="requestJmsQueue" 
                      	                      reply-channel="toUnmarshaller"     reply-destination="replyJmsQueue"/>
                      	
                      <channel id="toUnmarshaller" />
                      	
                      <transformer id="xml-to-object" ref="marshaller" input-channel="toUnmarshaller"  method="unmarshal" />
                      Notice that I used transformer elements as opposed to marshalling-transformer, etc. The former I was already using on the server and the latter I did not know how to configure.

                      Also, autowiring the gateway did not work, I had to use getBean on the application context.

                      Do you have any suggestions as to what I should try?

                      Thanks!

                      Comment


                      • #12
                        How does your server correlate a reply to a request?

                        When the reply arrives, the gateway needs to know which outbound message to correlate the response to.

                        If you use an inbound-gateway on the server, this will all be taken care of for you. If you are using disjoint adapters on the server (like you were trying to do on the client), you need to provide correlation information by giving the outbound gateway a correlation key.

                        The easiest thing to do would be to look at using an inbound-gateway on the server.

                        In any case, DEBUG logs are your friend; they provide lots of information.

                        Comment


                        • #13
                          Disjoint adapters are being used on the server.

                          To replace these with an inbound gateway, would I use the same approach to configuring it as I used with the outbound gateway? Is there a need to define a service-interface as was done with the outbound gateway?

                          On the server the same two transformers are used and are connected to a service activator. Would this piece have to change? I don't have the existing code at hand to show you.

                          I imagine that the new code would be the complement of the code that I listed in my previous post (#11) except as follows: The outbound-gateway becomes an inbound-gateway, the gateway with the service-interface attribute is replaced with the service activator and the input-channel and output-channel of both transformers are fully connected.

                          Does this seem correct?

                          Comment


                          • #14
                            Yes, the configuration would be a complement of the client...

                            inbound-gateway->unmarshaller->service-activator->marshaller

                            1. No output-channel on the marshaller, SI will automatically route the output to the gateway
                            2. No reply destination required on the inbound gateway (the oubound gateway puts the reply-destination in the JMS replyTo header). There is a default-reply-destination attribute for messages that don't have a replyTo header.
                            3. No need to define any service-interface

                            There is another timeout you need to worry about - this time, the default timeout of the inbound gateway is 1 second.

                            To summarize, there are 3 timeouts involved...

                            1. Timeout on programmatic gateway (interface); default infinity.
                            2. Timeout on outbound gateway; default 5 seconds for JMS.
                            3. Timeout on inbound gateway; default 1 second for JMS (actually, all inbound-gateways).

                            These need to be set to appropriate values for your application.

                            Comment


                            • #15
                              Beautiful. I followed your steps and everything worked the first time! You gave a number of insights that were essential to solving the problem but perhaps the most important key was in your post #12 about correlating requests with replies. So, besides working code, you've given me much food for thought! Thanks so very much for all your help.

                              Comment

                              Working...
                              X