  • Not able to restart AmqpInboundChannelAdapter programmatically

    I have a scenario where the queues are dynamically created by the Producer module and the messages in the dynamically created queues need to be consumed by the Consumer.
    Below is rabbitBeansContext.xml

    HTML Code:
    <beans:bean id="servicerHighPriorityListener" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
       <beans:property name="queueNames" value="#{ servicerService.highPriorityQueueNames }"></beans:property>
       <beans:property name="connectionFactory" ref="amqpConn" />
       <beans:property name="taskExecutor" ref="highPriorityThreadPoolTaskExecutor" />
       <beans:property name="autoStartup" value="true" />		
       <beans:property name="acknowledgeMode" value="AUTO" />		
    </beans:bean>
    
    <int-amqp:inbound-channel-adapter id="servicerHighPriorityConsumerAdapter" 
    		channel="servicerHighPriorityConsumerChannel" listener-container="servicerHighPriorityListener"/>
    <int:channel id="servicerHighPriorityConsumerChannel"></int:channel>
    <int:service-activator input-channel="servicerHighPriorityConsumerChannel"
    		ref="loanFilesCopier" method="copyImages" output-channel="highPriorityChecksumOutboundChannel"></int:service-activator>
    <int:channel id="highPriorityChecksumOutboundChannel"></int:channel>
    <int-amqp:outbound-channel-adapter
    		channel="highPriorityChecksumOutboundChannel" routing-key="checksum.high.key"
    		amqp-template="amqpTemplate" exchange-name="cis.Checksum.Exchange" id="highPriorityChecksumOutboundAdapter"/>
    The queue names are dynamically assigned to the message listener container when the server is started.
    However, if a new queue is created dynamically after the server has started, the adapter does not recognize it.
    So I am trying to restart the AmqpInboundChannelAdapter whenever a new queue is created, so that the consumer consumes the messages from the new queue.
    Below is the code that I have written to restart the adapter:

    Code:
    AmqpInboundChannelAdapter adapter = (AmqpInboundChannelAdapter) applicationContext.getBean(adapterName);
    ConnectionFactory connectionFactory = (ConnectionFactory) applicationContext.getBean("amqpConn");
    adapter.stop();
    Thread.sleep(1000);
    SimpleMessageListenerContainer listener=null;
    listener = (SimpleMessageListenerContainer) applicationContext.getBean("servicerHighPriorityListener");
    listener.setQueueNames(servicerService.getHighPriorityQueueNames());
    listener.setAutoStartup(true);
    listener.setAcknowledgeMode(AcknowledgeMode.AUTO);
    listener.setConnectionFactory(connectionFactory);
    ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) applicationContext.getBean("highPriorityThreadPoolTaskExecutor");
    listener.setTaskExecutor(executor);
    listener.setMessageListener(adapter);
    listener.afterPropertiesSet();
    adapter.start();
    listener.start();
    But the above code does not restart the adapter, and the messages from the newly created queue are not consumed.
    If I restart the Tomcat server, however, the messages are consumed.

    Can anyone please help me?
    Thanks in advance.

  • #2
    This works fine for me...

    Code:
    		context.getBean(AmqpInboundChannelAdapter.class).stop();
    		context.getBean(SimpleMessageListenerContainer.class).setQueueNames("si.test.queue", "test.queue.2");
    		context.getBean(AmqpInboundChannelAdapter.class).start();
    Look for messages like this in the log (with DEBUG)...

    Code:
    ...BlockingQueueConsumer] Started on queue 'si.test.queue': Consumer...
    
    ...
    
    
    ...BlockingQueueConsumer] Started on queue 'si.test.queue': Consumer...
    ...BlockingQueueConsumer] Started on queue 'test.queue.2': Consumer...

    • #3
      Thanks Gary for the reply. I tried what you suggested, but I am getting the exception below after the changes:

      Code:
      ERROR SimpleAsyncTaskExecutor-1 com.asps.cis.common.adapter.AdapterRestarter - Failed while restarting the adapter servicerHighPriorityConsumerAdapter .
      org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup
      	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doStart(SimpleMessageListenerContainer.java:309)
      	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:358)
      	at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.doStart(AmqpInboundChannelAdapter.java:82)
      	at org.springframework.integration.endpoint.AbstractEndpoint.start(AbstractEndpoint.java:84)
      	at com.asps.cis.common.adapter.AdapterRestarter.restartAdapter(AdapterRestarter.java:96)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
      	at java.lang.reflect.Method.invoke(Method.java:597)
      	at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:69)
      	at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:109)
      	at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:57)
      	at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:102)
      	at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:102)
      	at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:126)
      	at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:225)
      	at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:125)
      	at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:73)
      	at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:64)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:97)
      	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
      	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:114)
      	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:101)
      	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
      	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
      	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
      	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
      	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
      	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:92)
      	at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$200(AmqpInboundChannelAdapter.java:39)
      	at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$1.onMessage(AmqpInboundChannelAdapter.java:73)
      	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:527)
      	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:472)
      	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:56)
      	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:103)
      	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:560)
      	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:452)
      	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:436)
      	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:420)
      	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$200(SimpleMessageListenerContainer.java:56)
      	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:505)
      	at java.lang.Thread.run(Thread.java:662)
      Caused by: org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
      	at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:192)
      	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:489)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      	... 1 more
      Caused by: java.io.IOException
      	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:107)
      	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:131)
      	at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:643)
      	at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:59)
      	at sun.reflect.GeneratedMethodAccessor67.invoke(Unknown Source)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
      	at java.lang.reflect.Method.invoke(Method.java:597)
      	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:298)
      	at $Proxy106.queueDeclarePassive(Unknown Source)
      	at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:188)
      	... 4 more
      Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'cis.SAXON.High,cis.JPMC.High,cis.TBW.High,cis.LITTON.High,cis.PRISM.High' in vhost '/', class-id=50, method-id=10),null,""}
      	at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
      	at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
      	at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:328)
      	at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:201)
      	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:125)
      	... 12 more
      Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'cis.SAXON.High,cis.JPMC.High,cis.TBW.High,cis.LITTON.High,cis.PRISM.High' in vhost '/', class-id=50, method-id=10),null,""}
      	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:384)
      	at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:235)
      	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:151)
      	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:96)
      	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:441)

      • #4
        Looks like you are supplying a comma-delimited list instead of an array (or varargs) ...

        Code:
        Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'cis.SAXON.High,cis.JPMC.High,cis.TBW.High,cis.LITTON.High,cis.PRISM.High' in vhost '/', class-id=50, method-id=10),null,""}
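
The broker is looking for a single queue whose name is the whole comma-separated string. A minimal sketch of one way around that, assuming the queue names arrive as one comma-delimited String (the exception suggests this, but the thread does not show what `getHighPriorityQueueNames()` returns). The `QueueNames` class and its `parse` method are hypothetical names for illustration, not part of Spring AMQP.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring AMQP.
final class QueueNames {

    private QueueNames() {
    }

    // Splits a comma-delimited string into individual queue names,
    // trimming surrounding whitespace and skipping empty entries.
    static String[] parse(String commaDelimited) {
        List<String> names = new ArrayList<>();
        if (commaDelimited != null) {
            for (String part : commaDelimited.split(",")) {
                String name = part.trim();
                if (!name.isEmpty()) {
                    names.add(name);
                }
            }
        }
        return names.toArray(new String[0]);
    }
}
```

The resulting array can be passed straight to `setQueueNames(String...)`, so each name is treated as its own queue.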

        • #5
          Thanks Gary for the reply.

          I resolved the above issue, but I am still getting an error while restarting the adapter. The processor module is still not able to consume messages after the adapter is restarted; it does not pick up the queue names dynamically after the restart.

          Below is the code that I have written:

          Code:
          AmqpInboundChannelAdapter adapter = (AmqpInboundChannelAdapter) applicationContext.getBean(adapterName);
          adapter.stop();
          Thread.sleep(1000);
          SimpleMessageListenerContainer listener= (SimpleMessageListenerContainer) applicationContext.getBean("servicerHighPriorityListener");
          listener.setQueueNames("cis.SAXON.High","cis.JPMC.High","cis.TBW.High","cis.LITTON.High","cis.PRISM.High","cis.TEST.High");
          listener.setAutoStartup(true);
          listener.initialize();	
          adapter.start();
          Code:
          15:25:19.660 [highPriorityChecksumThreadPoolTaskExecutor-2] WARN  o.s.a.r.l.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it
          com.rabbitmq.client.ShutdownSignalException: connection error; reason: #method<connection.close>(reply-code=541, reply-text=Internal error in Consumer null (amq.ctag-g_HSG8z6OfQ3QlqVWNzhd3) method handleCancelOk for channel AMQChannel(amqp://guest@172.18.98.68:5672/,7), class-id=0, method-id=0)
                  at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:589) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:695) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:673) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.DefaultExceptionHandler.handleChannelKiller(DefaultExceptionHandler.java:67) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.DefaultExceptionHandler.handleConsumerException(DefaultExceptionHandler.java:52) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:824) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:814) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.handleCommand(AMQChannel.java:319) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:154) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:96) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:441) ~[amqp-client-2.5.1.jar:na]
          Caused by: java.lang.NullPointerException: null
                  at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:822) ~[amqp-client-2.5.1.jar:na]
                  ... 5 common frames omitted
          15:25:19.660 [highPriorityChecksumThreadPoolTaskExecutor-2] INFO  o.s.a.r.l.SimpleMessageListenerContainer - Restarting Consumer: tag=[amq.ctag-gpIbKlBZ3IcWaIgbIx-VVZ], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@172.18.98.68:5672/,1), acknowledgeMode=AUTO local queue size=0
          15:25:19.660 [highPriorityChecksumThreadPoolTaskExecutor-2] DEBUG o.s.a.r.l.BlockingQueueConsumer - Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@172.18.98.68:5672/,1)
          15:25:19.660 [errorThreadPoolTaskExecutor-1] WARN  o.s.a.r.l.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it
          com.rabbitmq.client.ShutdownSignalException: connection error; reason: #method<connection.close>(reply-code=541, reply-text=Internal error in Consumer null (amq.ctag-g_HSG8z6OfQ3QlqVWNzhd3) method handleCancelOk for channel AMQChannel(amqp://guest@172.18.98.68:5672/,7), class-id=0, method-id=0)
                  at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:589) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:695) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:673) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.DefaultExceptionHandler.handleChannelKiller(DefaultExceptionHandler.java:67) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.DefaultExceptionHandler.handleConsumerException(DefaultExceptionHandler.java:52) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:824) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:814) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.handleCommand(AMQChannel.java:319) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:154) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:96) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:441) ~[amqp-client-2.5.1.jar:na]
          Caused by: java.lang.NullPointerException: null
                  at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:822) ~[amqp-client-2.5.1.jar:na]
                  ... 5 common frames omitted
          15:25:19.660 [normalPriorityChecksumThreadPoolTaskExecutor-2] WARN  o.s.a.r.l.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it
          com.rabbitmq.client.ShutdownSignalException: connection error; reason: #method<connection.close>(reply-code=541, reply-text=Internal error in Consumer null (amq.ctag-g_HSG8z6OfQ3QlqVWNzhd3) method handleCancelOk for channel AMQChannel(amqp://guest@172.18.98.68:5672/,7), class-id=0, method-id=0)
                  at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:589) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:695) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:673) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.DefaultExceptionHandler.handleChannelKiller(DefaultExceptionHandler.java:67) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.DefaultExceptionHandler.handleConsumerException(DefaultExceptionHandler.java:52) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:824) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:814) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.handleCommand(AMQChannel.java:319) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:154) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:96) ~[amqp-client-2.5.1.jar:na]
                  at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:441) ~[amqp-client-2.5.1.jar:na]
          Caused by: java.lang.NullPointerException: null
                  at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:822) ~[amqp-client-2.5.1.jar:na]
                  ... 5 common frames omitted
          15:25:19.660 [errorThreadPoolTaskExecutor-1] INFO  o.s.a.r.l.SimpleMessageListenerContainer - Restarting Consumer: tag=[amq.ctag-An8l7NlH4ckcppOR4O6SGi], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@172.18.98.68:5672/,5), acknowledgeMode=AUTO local queue size=0
          15:25:19.660 [normalPriorityChecksumThreadPoolTaskExecutor-2] INFO  o.s.a.r.l.SimpleMessageListenerContainer - Restarting Consumer: tag=[amq.ctag-wuWLKksulBf30YmE6yuzaH], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@172.18.98.68:5672/,4), acknowledgeMode=AUTO local queue size=0
          15:25:19.660 [errorThreadPoolTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@172.18.98.68:5672/,5)
          15:25:19.660 [normalPriorityChecksumThreadPoolTaskExecutor-2] DEBUG o.s.a.r.l.BlockingQueueConsumer - Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@172.18.98.68:5672/,4)

          • #6
            I am not sure if this is the problem but you shouldn't call initialize() on the container.

            Just stop() the adapter, change the queues and start() the adapter as I showed above.
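
The three-step sequence can be sketched as a small helper. To keep the example runnable without Spring on the classpath, it is written against two stand-in interfaces, `RestartableAdapter` and `QueueConfigurable`, which are hypothetical and only mirror the `stop()`, `start()`, and `setQueueNames(String...)` calls used in this thread; `AdapterQueueUpdater` is likewise an illustrative name.

```java
// Stand-in for the two lifecycle calls made on AmqpInboundChannelAdapter in this thread (hypothetical type).
interface RestartableAdapter {
    void stop();

    void start();
}

// Stand-in for the one SimpleMessageListenerContainer call made in this thread (hypothetical type).
interface QueueConfigurable {
    void setQueueNames(String... queueNames);
}

// Hypothetical helper that applies the stop / change queues / start order.
final class AdapterQueueUpdater {

    private AdapterQueueUpdater() {
    }

    // Stops the adapter, replaces the container's queue list, then starts the adapter again.
    // It deliberately makes no initialize() or afterPropertiesSet() call on the container.
    static void updateQueues(RestartableAdapter adapter, QueueConfigurable container, String... queueNames) {
        adapter.stop();
        container.setQueueNames(queueNames);
        adapter.start();
    }
}
```

In the application, the same three calls would be made directly on the `AmqpInboundChannelAdapter` and `SimpleMessageListenerContainer` beans, as in reply #2, with each queue name passed as a separate argument.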
