
  • RabbitMQ and Control Bus

    Hi,

    Continuing from a previous reply to a thread (http://forum.springsource.org/showth...I&goto=newpost): I tried to use the control bus and have looked at the control bus example, but I still have a few questions. I modeled my consumer process on that example.

    My AMQP Rabbit adapter has id="rabbitAdapter". Should the following statement stop the rabbitAdapter from listening?

    controlChannel.send(new GenericMessage<String>("@rabbitAdapter.start()"));

    I modeled this on the control bus example, where it is done as follows:

    controlChannel.send(new GenericMessage<String>("@inboundAdapter.start()"));

    Let me know your suggestions.

  • #2
    If you want to *stop* it from listening, you should call stop() rather than start().



    • #3
      My bad, I meant controlChannel.send(new GenericMessage<String>("@rabbitAdapter.stop()"))



      • #4
        I just modified the amqp sample as follows...

        Code:
                MessageChannel controlBus = context.getBean("controlBus", MessageChannel.class);
        
                controlBus.send(new GenericMessage<String>("@rabbitAdapter.start()"));
        
                Thread.sleep(6000);
        
                controlBus.send(new GenericMessage<String>("@rabbitAdapter.stop()"));
        Code:
            <!-- From RabbitMQ To STDOUT -->
        
        	<int:channel id="controlBus" />
        	
        	<int:control-bus input-channel="controlBus"/>
        
            <int-amqp:inbound-channel-adapter channel="fromRabbit" id="rabbitAdapter"
                                              queue-names="si.test.queue" auto-startup="false"
                                              connection-factory="connectionFactory" />
        And all worked fine...

        Code:
        2012-05-24 12:52:11,243  INFO | main | o.s.integration.amqp.inbound.AmqpInboundChannelAdapter  | started rabbitAdapter 
        xxx
        2012-05-24 12:52:16,140  INFO | SimpleAsyncTaskExecutor-1 | org.springframework.integration.handler.LoggingHandler  | [Payload=xxx][Headers={timestamp=1337878336139, id=9fdbf001-7b86-42aa-9f09-6e652e1c4858, amqp_receivedRoutingKey=si.test.binding, amqp_deliveryMode=PERSISTENT, amqp_contentType=text/plain, amqp_receivedExchange=si.test.exchange, amqp_contentEncoding=UTF-8, amqp_redelivered=false, amqp_deliveryTag=1}] 
        xxx
        2012-05-24 12:52:17,245  INFO | main | o.s.amqp.rabbit.listener.SimpleMessageListenerContainer | Waiting for workers to finish. 
        2012-05-24 12:52:18,144  INFO | main | o.s.amqp.rabbit.listener.SimpleMessageListenerContainer | Successfully waited for workers to finish. 
        2012-05-24 12:52:18,144  INFO | main | o.s.integration.amqp.inbound.AmqpInboundChannelAdapter  | stopped rabbitAdapter

        Notice auto-startup="false" to prevent the adapter from immediately starting.
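
        To make the mechanics concrete, here is a toy model of what the control bus does with a payload such as "@rabbitAdapter.stop()": it looks up the bean named after the @ and invokes the named method on it. This is only a sketch in plain Java, not Spring code. ToyControlBus, ToyAdapter and the regex-based parsing are hypothetical stand-ins; the real control bus evaluates the payload as a SpEL expression against the application context.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy model only: every name here is hypothetical and none of it is Spring API.
public class ToyControlBus {

    /** Minimal stand-in for a component that can be started and stopped. */
    public interface Lifecycle {
        void start();
        void stop();
        boolean isRunning();
    }

    /** Stand-in for the inbound adapter; it begins stopped, like auto-startup="false". */
    public static class ToyAdapter implements Lifecycle {
        private volatile boolean running;
        @Override public void start() { running = true; }
        @Override public void stop() { running = false; }
        @Override public boolean isRunning() { return running; }
    }

    // Accepts only commands of the form "@beanId.start()" or "@beanId.stop()".
    private static final Pattern COMMAND =
            Pattern.compile("@(\\w+)\\.(start|stop)\\(\\)");

    private final Map<String, Lifecycle> beans = new HashMap<>();

    public void register(String id, Lifecycle bean) {
        beans.put(id, bean);
    }

    /** Parses the command, looks up the bean by id, and calls start() or stop() on it. */
    public void send(String command) {
        Matcher m = COMMAND.matcher(command.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported command: " + command);
        }
        Lifecycle bean = beans.get(m.group(1));
        if (bean == null) {
            throw new IllegalArgumentException("No bean named " + m.group(1));
        }
        if (m.group(2).equals("start")) {
            bean.start();
        } else {
            bean.stop();
        }
    }

    public static void main(String[] args) {
        ToyControlBus bus = new ToyControlBus();
        ToyAdapter adapter = new ToyAdapter();
        bus.register("rabbitAdapter", adapter);

        bus.send("@rabbitAdapter.start()");
        System.out.println("running after start: " + adapter.isRunning());

        bus.send("@rabbitAdapter.stop()");
        System.out.println("running after stop: " + adapter.isRunning());
    }
}
```

        The point of the model is that the bean id in the message must match the id on the adapter, and that start() and stop() act on that one component only.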



        • #5
          Thank you so much for your help, Russell. It looks like that stopped it. I do not know why it was not stopping earlier; maybe I had another adapter running elsewhere.

          A follow-up question: does stopping the adapter stop the broker, or just the consumer? I would guess the consumer.
          Additionally, the console output I get when stopping looks different from the one you pasted above. Here is what I get:


          2012-05-24 13:28:09,983 INFO | Thread-0 | o.s.context.support.ClassPathXmlApplicationContext | Closing org.springframework.context.support.ClassPathXmlApplicationContext@2d58f9d3: startup date [Thu May 24 13:28:08 EDT 2012]; root of context hierarchy
          2012-05-24 13:28:09,987 INFO | Thread-0 | o.s.context.support.DefaultLifecycleProcessor | Stopping beans in phase 0
          2012-05-24 13:28:09,987 INFO | Thread-0 | o.s.context.support.DefaultLifecycleProcessor | Stopping beans in phase -2147483648
          2012-05-24 13:28:09,988 INFO | Thread-0 | o.s.integration.endpoint.EventDrivenConsumer | Removing {service-activator} as a subscriber to the 'controlChannel' channel
          2012-05-24 13:28:09,988 INFO | Thread-0 | org.springframework.integration.channel.DirectChannel | Channel 'controlChannel' has 0 subscriber(s).
          2012-05-24 13:28:09,989 INFO | Thread-0 | o.s.integration.endpoint.EventDrivenConsumer | stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#0
          2012-05-24 13:28:09,989 INFO | Thread-0 | o.s.integration.endpoint.EventDrivenConsumer | Removing {message-handler:consoleOut} as a subscriber to the 'fromRabbit' channel
          2012-05-24 13:28:09,989 INFO | Thread-0 | org.springframework.integration.channel.DirectChannel | Channel 'fromRabbit' has 0 subscriber(s).
          2012-05-24 13:28:09,989 INFO | Thread-0 | o.s.integration.endpoint.EventDrivenConsumer | stopped consoleOut
          2012-05-24 13:28:09,989 INFO | Thread-0 | o.s.integration.endpoint.EventDrivenConsumer | Removing {logging-channel-adapter:loggingChannel.adapter} as a subscriber to the 'loggingChannel' channel
          2012-05-24 13:28:09,989 INFO | Thread-0 | org.springframework.integration.channel.DirectChannel | Channel 'loggingChannel' has 0 subscriber(s).
          2012-05-24 13:28:09,989 INFO | Thread-0 | o.s.integration.endpoint.EventDrivenConsumer | stopped loggingChannel.adapter
          2012-05-24 13:28:09,989 INFO | Thread-0 | o.s.integration.endpoint.EventDrivenConsumer | Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
          2012-05-24 13:28:09,989 INFO | Thread-0 | o.s.integration.channel.PublishSubscribeChannel | Channel 'errorChannel' has 0 subscriber(s).
          2012-05-24 13:28:09,989 INFO | Thread-0 | o.s.integration.endpoint.EventDrivenConsumer | stopped _org.springframework.integration.errorLogger
          2012-05-24 13:28:09,989 INFO | Thread-0 | o.s.beans.factory.support.DefaultListableBeanFactory | Destroying singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@3caa4b: defining beans [channelInitializer,$autoCreateChannelCandidates,org.springframework.integration.internalDefaultConfiguringBeanFactoryPostProcessor,org.springframework.integration.config.ExpressionControlBusFactoryBean#0,org.springframework.integration.config.ConsumerEndpointFactoryBean#0,controlChannel,rabbitAdapter,org.springframework.integration.channel.interceptor.WireTap#0,fromRabbit,org.springframework.integration.stream.CharacterStreamWritingMessageHandler#0,consoleOut,loggingChannel,org.springframework.integration.handler.LoggingHandler#0,loggingChannel.adapter,connectionFactory,amqpTemplate,org.springframework.amqp.rabbit.core.RabbitAdmin#0,org.springframework.amqp.core.Queue#0,org.springframework.amqp.rabbit.config.BindingFactoryBean#0,org.springframework.amqp.core.DirectExchange#0,nullChannel,errorChannel,_org.springframework.integration.errorLogger,taskScheduler,org.springframework.integration.config.IdGeneratorConfigurer#0]; root of factory hierarchy
          2012-05-24 13:28:09,990 INFO | Thread-0 | o.s.scheduling.concurrent.ThreadPoolTaskScheduler | Shutting down ExecutorService 'taskScheduler'


          Is there any way to pause and resume the consumer?



          • #6
            Just the consumer.

            Looks like your entire context is shutting down. How are you running your test? You can't let your main() exit before you are ready.

            With the amqp example above, I can start and stop the adapter as many times as I like.
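
            One common way to keep main() from returning too early is to block it on a latch until the application decides it is finished. The following is a minimal sketch in plain Java, not part of the amqp sample; KeepMainAlive, awaitDone and the worker thread are hypothetical names used only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: block main() until some other thread says "done".
public class KeepMainAlive {

    /**
     * Blocks the caller until the latch is counted down or the timeout elapses.
     * Returns true if the latch was counted down, false on timeout or interruption.
     */
    public static boolean awaitDone(CountDownLatch done, long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch done = new CountDownLatch(1);

        // Stand-in for whatever decides the application is finished,
        // for example a listener that has processed its last message.
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.countDown();
        });
        worker.start();

        // main() waits here instead of returning immediately.
        boolean signalled = awaitDone(done, 5000);
        System.out.println("main exiting, signalled=" + signalled);
    }
}
```

            In the sample above, Thread.sleep(6000) plays the same role: it simply holds main() open long enough for the adapter to receive messages.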



            • #7
              Well, I was just trying to stop it, so I had not written it as a unit test; it was in my main(). I do not understand what you mean by "before you are ready".

              How are you stopping and starting the adapter as many times as you want? It would be a great help if you could share code as you did above; it really clarifies things.

              Thanks.



              • #8
                I just meant I can do this...

                Code:
                        controlBus.send(new GenericMessage<String>("@rabbitAdapter.start()"));
                
                        Thread.sleep(6000);
                
                        controlBus.send(new GenericMessage<String>("@rabbitAdapter.stop()"));
                
                        controlBus.send(new GenericMessage<String>("@rabbitAdapter.start()"));
                
                        Thread.sleep(6000);
                
                        controlBus.send(new GenericMessage<String>("@rabbitAdapter.stop()"));
                This is in the main() of the amqp sample. Can you show us your main() so we can see what you are doing?



                • #9
                  I think I know what was going wrong ...

                  I had left context.registerShutdownHook() in main(). After removing it, I get the same output as you.

                  Thanks for your help !

                  Really appreciate all the help... Spring Integration rocks ! \m/
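
                  For anyone hitting the same thing: registerShutdownHook() asks the JVM to close the context when the JVM shuts down, and the JVM starts shutting down once main() returns and no non-daemon threads remain. That is why the earlier log shows the whole context closing on Thread-0 rather than just the adapter stopping. The sketch below reproduces the effect in plain Java without Spring; ShutdownHookDemo and registerHook are hypothetical names, and the println calls only simulate the context and the adapter.

```java
// Hypothetical illustration of a JVM shutdown hook firing when main() returns.
public class ShutdownHookDemo {

    /** Registers the task as a JVM shutdown hook and returns the hook thread. */
    public static Thread registerHook(Runnable task) {
        Thread hook = new Thread(task, "demo-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        // Simulates what a registered context shutdown hook would do at JVM exit.
        registerHook(() -> System.out.println("hook: closing context (simulated)"));

        System.out.println("main: adapter stopped (simulated), returning from main()");
        // Nothing keeps the JVM alive after this point, so it begins shutting
        // down and runs the hook registered above.
    }
}
```

                  Running this prints the line from main() first and the hook's line afterwards, on a different thread, much like the "Closing ... ClassPathXmlApplicationContext" entry in the log above.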
