Announcement Announcement Module
Collapse
No announcement yet.
Starting Multiple Consumers Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Starting Multiple Consumers

    I have been wondering about what would be the best way to go about implementing creating multiple consumers with Spring Integration Framework.

    So to elaborate on the question, I am more interested in scaling the consumers that are consuming messages from a RabbitMQ broker. In the amqp example that I could run, there is a producer and a consumer. Now lets say, I am just interested in the consumer part of it. That just makes up one consumer with a context.xml file where I can define the wiring required for that consumer. What if I have the need to incrementally add more consumers as and when needed from my application.

    Since I am a newbie, if I think of the layman approach, it would be having multiple context files for each consumer, but in that case I would need to know the number of consumers before hand which I do not know for my application. My application should be able to add consumers by mere clicks. Is this possible with Spring Integration? I am stuck since in the approach that I plan to take I would have to write the configuration for each consumer separately which I is not what I am looking for.

    Any thoughts or suggestions ??

    Thanks.

  • #2
    If you simply want more consumers within a single consumer-side process, then you can provide the 'concurrent-consumers' on your inbound adapter (e.g. amqp:inbound-channel-adapter).

    Comment


    • #3
      Thanks for the quick reply Mark. I did look at concurrent-consumers attribute as an option but then all my consumers would be getting their messages from one single adapter. What if I want to shut down one of the consumer out of all I have? I don't think that would be possible with this, since currently I have modeled a single consumer with the rabbitmq adapter and if i use concurrent-consumer for that adapter, and then with my control bus shutdown the rabbit adapter, that would imply all the remaining consumers on that adapter would also not receive message. I want to be able to scale and have full control over each consumer.

      Comment


      • #4
        Well, you can create multiple adapters that listen to the same queue(s). If you want each of those to have a single consumer, that's the default.

        That said, can you describe your rationale a bit more?

        Comment


        • #5
          Yes exactly what my current option is. But here is why it does not work for me. So I want to create a monitoring and management configuration such that I should be able to start/pause/resume and stop any consumer as and when required. Also, I require that I should be able to add consumers or in other words want to scale the consumers though my application. Now lets say, there application is a simple GUI that has a button to add consumers. Now if I need control over each of them, I need to create consumers the way you replied in the post above ( the default way each having their own adapter) But now this is not possible since the wiring would still need to be hard coded correct? I mean the xml will not get generated on the fly by mere click of add consumer. I will have to manually add these consumers each having their own rabbit adapter listening to the queue for messages eventually leading me to hard code the configuration/wiring for each consumer.

          Let me know if this is still not clear enough and I can may be try and explain better. Thanks and your help is greatly appreciated !

          Comment


          • #6
            Starting Multiple Consumers

            If you want to consume concurrently from a queue, then you must use a different session for each consumer.
            This is because you must have a session per thread. The JMS contract is that only 1 session is used by one thread at once - which if you're using consumers means that only 1 consumer can receive messages at once if using the same session. So if you want concurrent consumption of messages, you need to use different sessions for each consumer.
            However its your call on how many connections you have. In a normal network based mode, sharing the same connection across sessions will essentially multiplex traffic across a single socket (and so reduce resource usage at the cost of more synchronization). So sometimes there's a need to use more connections, as typically using different sockets concurrently tends to be a bit faster than using a single one).
            Incidentally in the JCA specifications in J2EE 1.4, Resource Adapters will typically create 1 connection for each session, so there's a 1-1 mapping between the two.

            Comment


            • #7
              I you want to control the life cycle of multiple consumers in the way you describe, one technique would be to use parent/child application contexts; the main (root) application would contain the bulk of your integration flow, it would start with a channel. Declare the inbound adapter in a separate application context and, when you create it, set the main context as the parent. This will give the child context access to the beans in the parent context, including the inbound channel, rabbit infrastructure, etc. The inbound contexts can then send messages into the main context.

              Now, you can create/start/stop/destroy child contexts as you please, with each one having a separate listener container/consumer.

              Comment


              • #8
                Ok. This looks like something I can start looking into...I do not have much experience with parent/child application context since I never had the need to use one, but I will dig into it and see what I can extract. Do you have any particular samples/links that you think may be helpful to me?

                Comment


                • #9
                  There's a sample here https://github.com/SpringSource/spri...ed/dynamic-ftp that dynamically creates application contexts on the outbound side (for dynamic ftp). However, it doesn't need parent/child contexts. For that, just use this constructor...

                  Code:
                  	/**
                  	 * Create a new ClassPathXmlApplicationContext with the given parent,
                  	 * loading the definitions from the given XML files and automatically
                  	 * refreshing the context.
                  	 * @param configLocations array of resource locations
                  	 * @param parent the parent context
                  	 * @throws BeansException if context creation failed
                  	 */
                  	public ClassPathXmlApplicationContext(String[] configLocations, ApplicationContext parent) throws BeansException {
                  		this(configLocations, true, parent);
                  	}
                  Just remember that beans in the child context can reference beans in the parent, but not vice versa.

                  Comment


                  • #10
                    Thanks a lot Gary. Looking into this ..

                    Comment


                    • #11
                      I am looking into the same issue, and think that this parent/child context seems like a work around (albeit feasible in the short term), not a long term solution to the problem.

                      In my configuration context file for a IMAP/AMQP connector I have a number of "routes in". Each one has an inbound IMAP adapter, a filter bean, a transformer bean, and and outbound AMQP adapter. There are some channels and some Rabbit Infrastructure also defined for each route in.

                      I would like to be able to start and stop, and restart the "routes in" independent of one another. I see the ClassPathXmlApplicationContext and I understand what you are saying in theory, The idea is to put the inbound adapter for each "route in" into its own separate child context using ClassPathApplicationContext.

                      The child context can be created or destroyed, which would be the start/stop mechanism. Am I understanding correctly? What would be the mechanism for sending start and stop commands? a std-in adapter?

                      I have some experience with Integration Engines in healthcare. The Open source Mirth is a java based one. These engines have similar functionality to SI when it comes to channels, adapters, transformers, filters, etc, but they add a UI. The UI has WSWYG editor for the configurations, and it has a management console (threads, start, stop). The engines also have logging and queuing of messages is also built in. Given AMQP provides a generic queuing prtocol and Rabbit provides persistent queues detached from the integration piece, it is probably superior to the tightly coupled queuing strategy deployed in other integration engines I have seen.

                      It seems SI could also be the generic integration engine if it had the UI and management piece found in other products on the market.

                      We are interested in exploring this option in further detail.

                      Comment


                      • #12
                        You can send commands to any adapter using a <control-bus/>; send '@adapterName.stop()'.

                        You can also use JMX to access the Lifecycle methods; Spring has a convenience API making JMX much easier to use - there is an example here: http://forum.springsource.org/showth...007#post432007 but you would use the Lifecycle interface for the proxyInterface property.

                        You can then invoke Lifecycle interface methods (stop/start etc).

                        Comment


                        • #13
                          Originally posted by Gary Russell View Post
                          You can send commands to any adapter using a <control-bus/>; send '@adapterName.stop()'.

                          You can also use JMX to access the Lifecycle methods; Spring has a convenience API making JMX much easier to use - there is an example here: http://forum.springsource.org/showth...007#post432007 but you would use the Lifecycle interface for the proxyInterface property.

                          You can then invoke Lifecycle interface methods (stop/start etc).
                          I turned on JMX which was most of what I was looking for .

                          I am still looking into control bus. Whats the advantage over JMX? .
                          The control box docs are kind of limited on details here:
                          http://static.springsource.org/sprin...e/#control-bus

                          I am not sure how to add a control box (or 2) to my config....

                          Comment


                          • #14
                            The monitoring example cited in that post shows how to use the control bus. Here's a snippet of the code from DefaultTwitterService...

                            Code:
                            	public void stopTwitterAdapter() {
                            
                            		Message<String> operation = MessageBuilder.withPayload("@twitter.stop()").build();
                            
                            		this.controlBusChannel.send(operation);
                            
                            		if (this.dummyTwitter != null) {
                            			this.controlBusChannel.send(MessageBuilder.withPayload("@dummyTwitter.stop()").build());
                            		}
                            	}
                            As far as advantages Vs. JMX - it's just a different technique (sending messages Vs. direct API calls - just like any use of Spring Integration).

                            There's also an open Pull Request for that sample that shows yet another technique (using Spring Integration JMX adapters as an interface to the MBeanServer). https://github.com/SpringSource/spri...-samples/pulls

                            Comment

                            Working...
                            X