Announcement Announcement Module
Collapse
No announcement yet.
Simple AMQP question Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Simple AMQP question

    Hello,

    I've created a sample application using the build snapshot of spring integration and amqp. I'm running RabbitMQ and am able to run the sample tests included with the integration-amqp source.

    I've got a sample client that creates a simple message, for which I would like to get a reply after a sequence of hops have been made through the messaging system. The topology is this:
    1. inboundChannel
    2. serviceActivator
    3. router
    4. replyChannel or invalidChannel

    The client code posts the message to the inboundChannel and registers itself for replies on both the reply and invalid channels.

    My spring integration config looks like this:

    Code:
    <beans:bean id="serviceActivator" class="com.sms.connect.adapter.rest.smsRestAdaptor">
            <beans:constructor-arg value="http"/>
            <beans:constructor-arg value="localhost"/>
            <beans:constructor-arg value="8080"/>
            <beans:constructor-arg value="api"/>
            <beans:constructor-arg value="v1"/>
        </beans:bean>
    
        <beans:bean id="router" class="com.sms.connect.adapter.rest.smsRestRouter"/>
    
        <amqp:inbound-channel-adapter channel="smsInboundChannel"
            queue-names="sms-inbound-queue" connection-factory="rabbitConnectionFactory" />
    
        <channel id="smsInboundChannel">
            <interceptors>
                <wire-tap channel="logger" />
            </interceptors>
        </channel>
    
        <service-activator input-channel="smsInboundChannel"
            ref="serviceActivator" method="handleMessage" output-channel="smsOutputChannel" />
    
        <channel id="smsOutputChannel">
            <interceptors>
                <wire-tap channel="logger" />
            </interceptors>
        </channel>
    
        <router input-channel="smsOutputChannel" ref="router" method="route"/>
    
        <channel id="smsInvalidChannel"/>
        <channel id="smsReplyChannel"/>
    
        <logging-channel-adapter id="logger" level="ERROR" />
    
        <rabbit:queue name="sms-inbound-queue"/>
    My router simply returns smsInvalidChannel or smsReplyChannel.

    The behavior I am seeing is that the message keeps repeating. Perhaps there is a poller that reexecutes the same message? The message is never removed from RabbitMQ.

    So first I'd like to know how to stop the message from repeating and have it removed from the queue. Second, how do I specify a queue name on which to publish the response on the invalid and reply channels.

    So far my client looks like this (it is not polling for a reply right now--it's only submitting the request and looking for a response. I'm new to this...

    Code:
    public class RabbitMQClientTest {
        @Autowired
        private AmqpAdmin admin;
        @Autowired
        private AmqpTemplate template;
    
        @Test
        public void simpleProducerConsumerTest() {
            try {
                String sent = "gdvs";
                admin.declareQueue(new Queue("sms-inbound-queue"));
    
                Message message = new Message(sent.getBytes(), null);
    
                // write message
                template.convertAndSend(sent);
                // read message
                Object results = template.receiveAndConvert();
    
                //System.out.println("Msg: " + received);
                //Assert.assertEquals(sent, received);
    
            } catch (Exception e) {
                Assert.fail("Test failed: " + e.getLocalizedMessage());
            }
        }
    
    }
    Thanks,

    Scott

  • #2
    I imagine the repeat deliveries you are seeing are caused by an exception in the message listener - by default an exception causes the message to be re-queued on the broker. The exception is probably logged for you, but unless there is some more config that we haven't seen, I can guess that it is Spring Integration telling you that it has no way to deliver the messages you send to the channels downstream of the router (they are not connected to anything). That is related to one of your questions: if you want to send the messages back to the Rabbit broker, then you need an outbound adapter attached to each of the outbound channels.

    Your test looks like a good start, and it should at least send a message, but there are some problems. First, it seems to be sending and receiving from the same queue - the default queue that you have configured in the unseen autowired RabbitTemplate (probably not what you wanted). Second, you don't wait for a response, so at least some of the time the result from receiveAndConvert() will be null. Third (cosmetic only), the Message you declare is never used.

    Since a lot of this is really about Spring Integration, you might find it helpful to analyse the behaviour by taking AMQP out of the equation to start with. Also the Integration forum will be able to provide more detail if you still have questions about vanilla Spring Integration.

    Comment


    • #3
      I noticed toward the end of the day yesterday as I traced through the spring source that some kind of exception was being thrown. This morning I'm working on my example to create two reply queues that I will listen to asynchronously. If I can do that then I am in business.

      Thanks for the reply. I'll update when I know more.

      Comment


      • #4
        I was able to get an example of what I'm looking for to work using Spring AMQP, but not with Integration.

        Is there a working Spring Integration sample available that has the following features:
        1. A client that initiates message specifying a return address
        2. The client monitors the named queue (specified in the return address) for the reply
        3. An endpoint that receives the message
        4. The message is sent back on the queue specified by the client as the return address
        5. The client getting the message it was waiting for

        Surprisingly I have not been able to make this happen.

        My client looks like this when it sends the message:

        Code:
                    template.convertAndSend( msg, new MessagePostProcessor() {
                        public Message postProcessMessage(Message message) throws AmqpException {
                            message.getMessageProperties().setReplyTo("sms-reply-queue");
                            try {
                                message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString().getBytes("UTF-8"));
                            }
                            catch (UnsupportedEncodingException e) {
                                throw new AmqpException(e);
                            }
                            return message;
                        }
                    });
        This specifies the queue name as
        sms-reply-queue
        The client monitors messages on the reply queue like this:
        Code:
                SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        	container.setConnectionFactory((ConnectionFactory) context.getBean("rabbitConnFactory"));
        	container.setQueueNames("sms-inbound-queue");
        	container.setMessageListener(new MessageListenerAdapter(new MessageListenerAdapter() {
                    public void handleMessage(String text) {
                        System.out.println("Message received: " + text);
                    }
                }));
        I can't get my integration solution to call back onto this reply queue, though I can get a similar solution to work without integration. This is a simple matter of semantics. Can anyone tell me what my spring integration config should look like to make this happen?

        As a starting point, here is what I have right now:

        Code:
        <amqp:inbound-channel-adapter channel="smsInboundChannel"
                queue-names="sms-inbound-queue" connection-factory="rabbitConnectionFactory" />
        
        <channel id="smsInboundChannel"/>
        
        <service-activator input-channel="smsInboundChannel" ref="smsRestAdaptor" method="handleMessage" />
        
        <channel id="smsOutputChannel"/>
        <!-- <stream:stdout-channel-adapter id="smsOutputChannel"/> -->
        
        <rabbit:queue name="sms-inbound-queue"/>
        <rabbit:queue name="sms-reply-queue"/>
        I haven't defined anything to associate with the smsOutputChannel and so I have an error. You can see that I've commented out the stdout-channel-adapter line, which I've done because I want the reply to go to queue the client named. So what I have definitely won't work; what I need is the last piece of the puzzle. How do I define an output channel for my service-activator.

        Keep in mind too that this is a simplified case. Later there will be several chained steps leading up to and away from my service activator. I will want to maintain the reply queue name throughout and send back to that named queue when the chain is complete.

        Thanks,
        Scott

        Comment


        • #5
          Scott, it sounds to me like you just want to use the amqp:outbound-gateway and amqp:inbound-gateway. Those handle the request/reply logic for you.

          Here's a sample configuration: https://github.com/SpringSource/spri...ts-context.xml

          Comment


          • #6
            Mark,

            All the samples provided in the integration tests point to the stdout-channel-adapter. There are no examples of spring configuration that leaves the choice of the reply queue/channel up to the requester. The GatewayEchoTest-context.xml file routes replies as such:

            Code:
            <amqp:outbound-gateway request-channel="outboundGatewayRequests" reply-channel="outboundGatewayReplies" routing-key="si.test.gateway.echo" amqp-template="amqpTemplate" />
            
            <console:stdout-channel-adapter id="outboundGatewayReplies" append-newline="true" />
            If I am understanding this correctly, the reply is sent to stdout and is not sent back to the requester--or is it?

            Comment


            • #7
              You can leave the "reply-channel" off of the outbound-gateway element, and it will instead use the replyChannel header that had been established by the requestor.

              Comment


              • #8
                Mark,

                When I attempt to leave off the reply-channel gateway element, the spring integration schema is invalid--the reply-channel attribute is required.

                Scott

                Comment


                • #9
                  What version are you using?

                  Comment


                  • #10
                    Right now it is a build snapshot version, since the spring integration for amqp is not available in the current release (as of two weeks ago). The versions are:

                    Spring AMQP: 1.0.BUILD-SNAPSHOT
                    Rabbit MQ Client: 2.5.0
                    Spring Framework: 3.0.6.RELEASE
                    Spring Integration: 2.1.0.BUILD-SNAPSHOT

                    Should I get the latest builds for each of these?

                    Thanks for your interest in helping me on this.

                    Scott

                    Comment


                    • #11
                      Just checking in to see what you think of this issue so far. Is it a bug that needs to be fixed, or a scenario that has not been tested? No pressure, just wondering if you haven't had time to look at it yet, or if you have been looking at it and it's been some work to figure out.

                      Comment


                      • #12
                        I didn't really have time to look any further since posting the question. However, the attribute should not be *required* according to the latest version of the XSD. Can you make sure you have the latest 2.1.0.BUILD-SNAPSHOT for Spring Integration (or maybe try 2.1.M2 if you want something more "bookmarkable").

                        The Spring AMQP, Spring Framework, and RabbitMQ client dependencies could be loaded transitively, and would be 1.0.0.RELEASE, 3.0.6.RELEASE, and 2.5.0 respectively.

                        Comment


                        • #13
                          I haven't gotten back to trying this yet. Since I had issues and limited time, I dropped to the Spring-Amqp level without using integration, and implemented my solution there, which works. I'm trying to get my project set up using Maven and it appears that in order to use integration, I'll have to pull a snapshot build in order to use SpringAMQP.

                          Is this the way that I need to proceed? When is this going to be part of the official release of integration?

                          Comment


                          • #14
                            You can grab 2.1.0.M3. It was released last week. It is a milestone (not yet GA), but our next release will be RC1 and then GA will follow shortly (aiming for end of November).

                            Comment


                            • #15
                              Excellent. I pulled that and now I'll run through my scenario again. Maybe with the latest build I can get it to work.

                              Thanks,

                              Scott

                              Comment

                              Working...
                              X