  • Some questions about Spring integration with AMQP

    Hi all,

    I am an experienced Java developer, but fairly new to Spring in general and Spring Integration in particular, although I've read a LOT of stuff on the web about it and about AMQP/RabbitMQ in Spring/Java.

    We are building a software component (a Java Daemon) and we want to use Spring Integration with AMQP (RabbitMQ) to abstract the low-level messaging stuff and we are having some issues/questions on how to proceed.

    Basically, our component receives AMQP messages (commands) from a predefined RabbitMQ inbound channel. At runtime, it can also create message receivers dynamically to get messages from notification producers. Upon receiving those notifications, some business logic is executed and a newly created notification message is sent to a topic exchange on an outbound channel, using a routing key that is built from the contents of the notification received on the dynamic notification inbound channel.

    So it looks like this:


    command channel (p2p, preconfigured, inbound) -->
    Our Java daemon
    notif. channel (topic, preconfigured, outbound) <-- <-- Notif. channel (p2p, dynamic, inbound)


    I was able to get the inbound command channel somewhat working, but I'm not sure how to proceed for the other channels.

    My spring xml configuration is the following:
    Code:
    <!-- inbound command channel -->
    <int-amqp:inbound-channel-adapter id="commandChannelAdapter" channel="commandChannel" queue-names="commandQueue" connection-factory="commandConnectionFactory" concurrent-consumers="5" task-executor="commandTaskExecutor"/>
    
    <int:channel id="commandChannel"/>
    	
    <int:service-activator id="commandServiceActivator" input-channel="commandChannel" ref="commandReceiver"/>
    
    <bean class="MyCommandReceiver" id="commandReceiver"/>
    
    <task:executor id="commandTaskExecutor" pool-size="5-25" queue-capacity="100" rejection-policy="CALLER_RUNS"/>
    
    <rabbit:admin connection-factory="commandConnectionFactory" />
    
    <rabbit:template id="amqpTemplate" connection-factory="commandConnectionFactory" />
                   
    <rabbit:connection-factory id="commandConnectionFactory" host="commandHost" port="8090" username="user" password="password" virtual-host="/"/>
    
    <rabbit:queue name="commandQueue" exclusive="true" />
    
    <!-- outbound notification channel -->
    <int-amqp:outbound-channel-adapter id="notificationChannelAdapter" channel="notificationChannel" exchange-name="notificationExchange" routing-key-expression="@notificationSender.getRoutingKey(elementID)"/>
        
    <bean class="MyNotificationSender" id="notificationSender"/>
    
    <int:channel id="notificationChannel"/>
    
    <rabbit:template id="amqpTemplate" connection-factory="notificationConnectionFactory" />
    
    <rabbit:connection-factory id="notificationConnectionFactory" host="notificationHost" port="8091" username="username" password="password" virtual-host="/"/>
    		    		
    <rabbit:topic-exchange name="notificationExchange" />

    The class that receives the command messages:
    Code:
    @Component
    public class MyCommandReceiver {
    
       public void receiveMessage(Object message) {
          System.out.println("Received " + message);
       }
    }

    The class that sends the notification messages:
    Code:
    @Component
    public class MyNotificationSender {
       
       public String getRoutingKey(int elementID) {
          return "test.key";
       }
    
       @Publisher(channel = "notificationChannel")
       public String publishNotification(Object notification) {
          return "Test Notification Message";
       }
    
    }
    So my questions are:

    1) For receiving command messages, I configured the inbound channel adapter to dispatch message reception to a task executor. However when I trace, I only ever see the first task executor handling messages, even if I send thousands of them. Why don't my message receivers work concurrently on the task executor threads, as configured?

    2) For the inbound notification channels, they are created dynamically at runtime, as I understand the sprint integration is for channels that are preconfigured. How do I use sprint/sprint integration to create message receivers at runtime? I know how to do that in java using the lower level RabbitMQ APIs, but that defeats the purpose of what we want to do.

    3) How do you specify/build the messages to send on the outbound channel, and how do you send them from Java code? I have a @Publisher-annotated method in the MyNotificationSender class, but I'm not at all sure I'm doing the right thing. If so, how do I call the sending method? If not, how is this done?

    4) When sending the outbound notification message to our topic exchange, we need the routing key to be built dynamically from information received in the inbound notification message. I use routing-key-expression="@notificationSender.getRoutingKey(elementID)" on the outbound-channel-adapter to have the MyNotificationSender#getRoutingKey method called, but again I'm not at all sure this is the way to do it. How do you do this?


    Thanks in advance for your answers,
    /Sebastien

  • #2
    Hi!

    You've asked several questions, so please be patient; we'll try to answer them all ASAP.
    To start with the first one:
    However when I trace, I only ever see the first task executor handling messages, even if I send thousands of them.
    Not sure what's going on.
    My test-case:
    Config
    HTML Code:
    <channel id="toRabbit"/>
    
    <amqp:outbound-channel-adapter channel="toRabbit"
    					   exchange-name="si.test.exchange"
    					   routing-key="si.test.binding"
    					   amqp-template="amqpTemplate"/>
    
    <amqp:inbound-channel-adapter channel="fromRabbit"
    					  queue-names="si.test.queue"
    					  connection-factory="connectionFactory"
    					  concurrent-consumers="5"
    					  task-executor="commandTaskExecutor"/>
    
    <task:executor id="commandTaskExecutor" pool-size="5-25" queue-capacity="100" rejection-policy="CALLER_RUNS"/>
    
    <logging-channel-adapter id="fromRabbit" logger-name="test.logger"/>
    
    <rabbit:connection-factory id="connectionFactory"/>
    
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
    
    <rabbit:admin connection-factory="connectionFactory"/>
    
    <rabbit:queue name="si.test.queue" exclusive="true"/>
    
    <rabbit:direct-exchange name="si.test.exchange">
    	<rabbit:bindings>
    		<rabbit:binding queue="si.test.queue" key="si.test.binding"/>
    	</rabbit:bindings>
    </rabbit:direct-exchange>
    Code
    Code:
    ApplicationContext ctx = new ClassPathXmlApplicationContext("EchoSample-context.xml", EchoSample.class);
    MessageChannel toRabbit = ctx.getBean("toRabbit", MessageChannel.class);
    for (int i = 0; i < 10; i++) {
        toRabbit.send(new GenericMessage<String>("test" + i));
    }
    And the logs
    Code:
    2013-07-12 09:24:12,276 INFO (commandTaskExecutor-3) [test.logger] - <test2>
    2013-07-12 09:24:12,276 INFO (commandTaskExecutor-1) [test.logger] - <test1>
    2013-07-12 09:24:12,276 INFO (commandTaskExecutor-5) [test.logger] - <test3>
    2013-07-12 09:24:12,276 INFO (commandTaskExecutor-2) [test.logger] - <test0>
    2013-07-12 09:24:12,292 INFO (commandTaskExecutor-2) [test.logger] - <test5>
    2013-07-12 09:24:12,293 INFO (commandTaskExecutor-3) [test.logger] - <test7>
    2013-07-12 09:24:12,293 INFO (commandTaskExecutor-5) [test.logger] - <test6>
    2013-07-12 09:24:12,294 INFO (commandTaskExecutor-2) [test.logger] - <test8>
    2013-07-12 09:24:12,294 INFO (commandTaskExecutor-3) [test.logger] - <test9>
    2013-07-12 09:24:12,292 INFO (commandTaskExecutor-1) [test.logger] - <test4>
    As you can see, with a config similar to yours everything looks good: several threads are involved in processing messages from the queue, and no more than 5.
    Maybe you have enabled some specific RabbitMQ setting that allows only one consumer on the queue?

    Take care,
    Artem
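
As a side note on the executor settings used in both configs: pool-size="5-25" with queue-capacity="100" corresponds to a java.util.concurrent.ThreadPoolExecutor with a core size of 5, a maximum of 25 and a bounded queue of 100. The following is a minimal plain-JDK sketch (no Spring involved; the class name PoolGrowthSketch, the method names and the task counts are made up for illustration) of how such a pool grows: threads beyond the core size are only added once the queue is full. It does not by itself explain the single-thread behaviour reported in the question.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthSketch {

    /** Plain-JDK pool with the same sizing as pool-size="5-25", queue-capacity="100", CALLER_RUNS. */
    static ThreadPoolExecutor newCommandPool() {
        return new ThreadPoolExecutor(
                5, 25, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Submits taskCount tasks that all block until released, then reports how many
     * threads the pool created. Keep taskCount at or below 125 (25 threads + 100 queued);
     * beyond that, CALLER_RUNS would run a blocking task on the calling thread.
     */
    static int poolSizeAfterSubmitting(int taskCount) {
        ThreadPoolExecutor pool = newCommandPool();
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();     // queued tasks still run, then the threads exit
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeAfterSubmitting(3));   // 3: one new thread per task below the core size
        System.out.println(poolSizeAfterSubmitting(50));  // 5: the extra 45 tasks wait in the queue
        System.out.println(poolSizeAfterSubmitting(110)); // 10: the queue is full, so 5 extra threads are added
    }
}
```

With this sizing, a lightly loaded pool would be expected to stay at 5 threads, which matches the "no more than 5" thread names in the log above.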


    • #3
      The second question isn't clear.
      For the inbound notification channels, they are created dynamically at runtime,
      What do you mean? The RabbitMQ queues?
      How do I use sprint/sprint integration to create message receivers at runtime?
      Spring/Spring Integration
      A receiver is a consumer, or a listener in Spring AMQP terms. It should be configured for a concrete queue, as you've done via <int-amqp:inbound-channel-adapter>.
      What are you trying to achieve with this?


      • #4
        The third one:
        @Publisher is OK. It is one of the ways to connect your application to the messaging infrastructure.
        There are other components that do something similar: <service-activator>, @Gateway, etc.
        As far as I can see, your MyNotificationSender is just a prototype, so try digging deeper into the Spring Integration Reference Manual to find an answer for your case.
        Right now it's not clear how it should work within your application.


        • #5
          The fourth:
          No, it's OK. routing-key-expression is one way to determine the actual routing key at runtime, based on the incoming Message.
          I don't know what logic you have here, but at a glance, invoking a bean method from this expression is fine.
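
For illustration, the bean method called from such an expression could build the key from fields of the notification instead of returning a fixed string. This is only a sketch under assumptions: the class name NotificationRoutingKeys, the method forElement, the eventType parameter and the "notification.element.<id>.<type>" layout are all hypothetical and not taken from the original application. Assuming the expression is evaluated against the incoming message, the argument would typically be written as something like payload.elementID or headers.elementID rather than a bare elementID.

```java
import java.util.Locale;

class NotificationRoutingKeys {

    /**
     * Builds a topic routing key of the (hypothetical) form
     * notification.element.<elementId>.<eventType>.
     * Topic routing keys are dot-separated words, so characters that could
     * interfere with matching ('.', '*', '#', spaces, ...) are replaced by '_'.
     */
    static String forElement(int elementId, String eventType) {
        if (eventType == null || eventType.trim().isEmpty()) {
            throw new IllegalArgumentException("eventType must not be empty");
        }
        String word = eventType.trim()
                .toLowerCase(Locale.ROOT)
                .replaceAll("[^a-z0-9_-]", "_");
        return "notification.element." + elementId + "." + word;
    }

    public static void main(String[] args) {
        System.out.println(forElement(42, "Status Changed")); // notification.element.42.status_changed
    }
}
```

A consumer could then bind a queue to the topic exchange with a pattern such as notification.element.42.* or notification.element.#.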


          • #6
            Originally posted by Cleric
            The second question isn't clear.

            What do you mean? The RabbitMQ queues?

            Spring/Spring Integration
            A receiver is a consumer, or a listener in Spring AMQP terms. It should be configured for a concrete queue, as you've done via <int-amqp:inbound-channel-adapter>.
            What are you trying to achieve with this?

            Thanks for the quick answers!

            We are creating the consumers dynamically, i.e. the producer(s) that will produce messages are not known at startup and will be added later. So we need a way to create the consumers dynamically and bind them to the producer/queue at runtime.


            • #7
              Originally posted by Cleric
              The third one:
              @Publisher is OK. It is one of the ways to connect your application to the messaging infrastructure.
              There are other components that do something similar: <service-activator>, @Gateway, etc.
              As far as I can see, your MyNotificationSender is just a prototype, so try digging deeper into the Spring Integration Reference Manual to find an answer for your case.
              Right now it's not clear how it should work within your application.

              Basically, what I want to know is how you "send" a Message with my config from Java code, i.e. which method you call to send the message to the outbound channel. In all the examples I've seen this is not clear.

              I know how to do it using straight Spring AMQP:

              class MyRabbitSender {

                 @Autowired
                 private volatile RabbitTemplate template;

                 ...
                 public void sendMessage(String routingKey, Object message) {
                    template.convertAndSend(routingKey, message);
                 }
              }
              But how do you achieve the same result using the Spring Integration abstractions?


              • #8
                You can't add queues dynamically to existing listener containers, you would need to spin up a SimpleMessageListenerContainer yourself.

                You can stop the container, change the queues and restart it.

                Simply declare the container as a <bean/> and provide it to the adapter via the container attribute.

                If your "commandReceiver" method returns a reply

                Code:
                public String receiveMessage(Object message)
                set the output-channel of the <service-activator/> to "notificationChannel" and the reply (if not null) will go to the outbound adapter.


                • #9
                  Originally posted by Gary Russell
                  You can't add queues dynamically to existing listener containers, you would need to spin up a SimpleMessageListenerContainer yourself.

                  You can stop the container, change the queues and restart it.

                  Simply declare the container as a <bean/> and provide it to the adapter via the container attribute.

                  OK, so what you mean is that it is not possible to create inbound Spring Integration channels dynamically at runtime and I have to tap into the lower-level amqp api directly. Correct?

                  In my case, the RabbitMQ configuration (RabbitMQ server host, port, etc.) for the "new" producers will be provided to the consumers at runtime. So the consumers/message receivers/queues will be created at runtime.

                  Originally posted by Gary Russell
                  If your "commandReceiver" method returns a reply

                  Code:
                  public String receiveMessage(Object message)
                  set the output-channel of the <service-activator/> to "notificationChannel" and the reply (if not null) will go to the outbound adapter.

                  That wouldn't work because, as described above, the input channels for the notifications are the dynamically configured ones. The command channel receiver does not return a reply; it just executes some internal logic through an internal API.


                  • #10
                    lower-level amqp api directly.
                    Well the lower level Spring-AMQP classes, yes (not the lower level rabbit client).

                    However, this thread discusses a technique for spinning up child application contexts and hooking them up to an inbound channel in the main context.

                    Although that discussion is about FTP, the same techniques can be used for rabbit adapters.


                    If you decide to spin up your own containers programmatically, the preferred way of sending messages into a Spring Integration flow (either in this case, or for your notification case) is to use a Messaging Gateway, where you inject a framework-generated implementation of an interface into your "sender" bean.
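
To make the gateway idea concrete: the sender bean depends only on a plain interface, and the framework supplies an implementation that forwards each call to a channel. The sketch below imitates that with a JDK dynamic proxy and no Spring at all, so it is an illustration of the pattern rather than the framework's actual implementation. The names NotificationGateway, GatewaySketch, gatewayFor and sendAll are hypothetical, and a Consumer<Object> stands in for the message channel. In Spring Integration itself, the equivalent would be declared with something like <int:gateway service-interface="..." default-request-channel="notificationChannel"/>.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical POJO interface that the "sender" bean would depend on. */
interface NotificationGateway {
    void send(Object notification);
}

class GatewaySketch {

    /** Stand-in for the framework: generates an implementation that forwards send(...) to the channel. */
    static NotificationGateway gatewayFor(Consumer<Object> channel) {
        return (NotificationGateway) Proxy.newProxyInstance(
                NotificationGateway.class.getClassLoader(),
                new Class<?>[] {NotificationGateway.class},
                (proxy, method, args) -> {
                    if (!"send".equals(method.getName())) {
                        throw new UnsupportedOperationException(method.getName());
                    }
                    channel.accept(args[0]); // the payload goes to the "channel"
                    return null;             // send(...) is void
                });
    }

    /** Sends each notification through a gateway and returns what the list-backed "channel" received. */
    static List<Object> sendAll(Object... notifications) {
        List<Object> received = new ArrayList<>();
        NotificationGateway gateway = gatewayFor(received::add);
        for (Object notification : notifications) {
            gateway.send(notification);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(sendAll("first", "second")); // [first, second]
    }
}
```

The calling code never sees a Message or a MessageChannel, which is the point of the pattern: the messaging types stay out of the application code.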


                    • #11
                      Originally posted by Gary Russell
                      Well the lower level Spring-AMQP classes, yes (not the lower level rabbit client).

                      However, this thread discusses a technique for spinning up child application contexts and hooking them up to an inbound channel in the main context.

                      Although that discussion is about FTP, the same techniques can be used for rabbit adapters.

                      Thanks, I'll look into that.

                      Originally posted by Gary Russell
                      If you decide to spin up your own containers programmatically, the preferred way of sending messages into a Spring Integration flow (either in this case, or for your notification case) is to use a Messaging Gateway, where you inject a framework-generated implementation of an interface into your "sender" bean.

                      OK, but can't I use something simple like this to send the message to the outbound notification channel?

                      Code:
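                      import org.springframework.beans.factory.annotation.Autowired;
                      import org.springframework.beans.factory.annotation.Qualifier;
                      import org.springframework.integration.Message;
                      import org.springframework.integration.MessageChannel;
                      import org.springframework.integration.support.MessageBuilder;
                      import org.springframework.stereotype.Component;
                      
                      @Component
                      public class MyNotificationSender {
                         
                         @Autowired
                         @Qualifier("notificationChannel")  // the <int:channel id="notificationChannel"/> is defined in the XML config
                         MessageChannel channel;
                      
                         // Wraps a (for now hard-coded) payload in a Message and sends it to the channel
                         public void sendNotification(Object notification) {
                            Message<String> message = MessageBuilder.withPayload("test notification").build();
                            channel.send(message);
                         }
                      }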


                      • #12
                        Yes, of course; using the <gateway/> simply avoids you having any direct framework compile-time dependencies in your code.

                        We generally recommend coding with POJOs where possible and only introducing dependencies (such as Message<?>) when absolutely necessary.


                        • #13
                          All right, thanks a lot for your help :-)

                          /Sebastien


                          • #14
                            Ok, I got almost everything I wanted to work, except this:

                            I'm getting the following exception when trying to send a message on my outbound channel

                            [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.2.1:java (default-cli) on project com.ericsson.ericloud.commander: An exception occured while executing the Java class. null: InvocationTargetException: Expression evaluation failed: @notificationSender.getRoutingKey(elementID): EL1057E:(pos 1): No bean resolver registered in the context to resolve access to bean 'notificationSender' -> [Help 1]


                            but the bean referred to by notificationSender is indeed configured and the method getRoutingKey exists:

                            Code:
                            ...
                            	<bean
                                	class="com.ericsson.ericloud.commander.notifications.NotificationSender"
                                	id="notificationSender"/>
                                	
                            	<int-amqp:outbound-channel-adapter 
                            		id="notificationChannelAdapter"
                            		channel="notificationChannel"
                            		exchange-name="notificationExchange"
                            		routing-key-expression="@notificationSender.getRoutingKey(elementID)" />
                            
                            ...
                            Code:
                            @Component
                            public class NotificationSender {
                            ...
                               public String getRoutingKey(int elementID) {
                                  ...
                               }
                            ...
                            }
                            What am I doing wrong here?

                            Thx,
                            /Sebastien


                            • #15
                              You are not doing anything wrong.

                              Unfortunately that was a bug; the fix is currently only available in the 3.0.0.M2 release (available from the Spring milestone repository).

                              https://jira.springsource.org/browse/INT-2999

                              However, the forum post mentioned in the JIRA issue has a workaround, if you don't want to use a milestone release.
