Announcement Announcement Module
Collapse
No announcement yet.
Start / Stop a JMS Bridge with Control Bus Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Start / Stop a JMS Bridge with Control Bus

    Hi
    I'm a bit confused how to start/stop a bridge between 2 JMS adapters (inbound & outbound).

    This is the config I'm using :


    Code:
    	<task:scheduler id="taskScheduler" pool-size="10" />
    
    	<poller id="defaultPoller" fixed-delay="1000"
    		max-messages-per-poll="50" receive-timeout="60000" default="true">
    		<transactional />
    	</poller>
    
    	<!-- Inbound / Outbound channel adatpers -->
    	<jms:inbound-channel-adapter id="inboundAdapter" destination="inputQueue" />
    	<jms:outbound-channel-adapter id="outboundAdapter" destination="outputQueue" />
    
    	<!-- JMS bridges -->
    	<bridge id="bridge" input-channel="inboundAdapter" output-channel="outboundAdapter" />
    
    	<!-- Control channel -->
    	<channel id="controlChannel" />
    	<control-bus input-channel="controlChannel" />
    And

    Code:
    ApplicationContext ac = new ClassPathXmlApplicationContext("spring-context.xml");
    		MessageChannel controlChannel = ac.getBean("controlChannel", MessageChannel.class);
    	
    		Thread.sleep(2000);
    		System.out.println("Stopping bridge....");
    		controlChannel.send(new GenericMessage<String>("@inboundAdapter.stop()"));
    
    		Thread.sleep(5000);
    		System.out.println("Starting bridge....");
    		controlChannel.send(new GenericMessage<String>("@inboundAdapter.start()"));

    But this does not work.

    Exception in thread "main" org.springframework.integration.MessageHandlingExc eption: error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.Expression CommandMessageProcessor@3eb52a28]]
    at org.springframework.integration.handler.AbstractMe ssageHandler.handleMessage(AbstractMessageHandler. java:84)
    at org.springframework.integration.dispatcher.Unicast ingDispatcher.doDispatch(UnicastingDispatcher.java :110)
    at org.springframework.integration.dispatcher.Unicast ingDispatcher.dispatch(UnicastingDispatcher.java:9 7)
    at org.springframework.integration.channel.AbstractSu bscribableChannel.doSend(AbstractSubscribableChann el.java:44)
    at org.springframework.integration.channel.AbstractMe ssageChannel.send(AbstractMessageChannel.java:157)
    at org.springframework.integration.channel.AbstractMe ssageChannel.send(AbstractMessageChannel.java:128)
    at com.sfr.bps.bridges.Main.main(Main.java:17)
    Caused by: org.springframework.expression.EvaluationException : The method 'stop' is not supported by this command processor. If using the Control Bus, consider adding @ManagedOperation or @ManagedAttribute.
    at org.springframework.integration.handler.Expression CommandMessageProcessor$ExpressionCommandMethodRes olver.validateMethod(ExpressionCommandMessageProce ssor.java:100)
    at org.springframework.integration.handler.Expression CommandMessageProcessor$ExpressionCommandMethodRes olver.resolve(ExpressionCommandMessageProcessor.ja va:81)
    at org.springframework.expression.spel.ast.MethodRefe rence.findAccessorForMethod(MethodReference.java:1 74)
    at org.springframework.expression.spel.ast.MethodRefe rence.getValueInternal(MethodReference.java:107)
    at org.springframework.expression.spel.ast.CompoundEx pression.getValueInternal(CompoundExpression.java: 57)
    at org.springframework.expression.spel.ast.SpelNodeIm pl.getTypedValue(SpelNodeImpl.java:102)
    at org.springframework.expression.spel.standard.SpelE xpression.getValue(SpelExpression.java:102)
    at org.springframework.integration.util.AbstractExpre ssionEvaluator.evaluateExpression(AbstractExpressi onEvaluator.java:98)
    at org.springframework.integration.util.AbstractExpre ssionEvaluator.evaluateExpression(AbstractExpressi onEvaluator.java:94)
    at org.springframework.integration.handler.Expression CommandMessageProcessor.processMessage(ExpressionC ommandMessageProcessor.java:63)
    at org.springframework.integration.handler.ServiceAct ivatingHandler.handleRequestMessage(ServiceActivat ingHandler.java:64)
    at org.springframework.integration.handler.AbstractRe plyProducingMessageHandler.handleMessageInternal(A bstractReplyProducingMessageHandler.java:98)
    at org.springframework.integration.handler.AbstractMe ssageHandler.handleMessage(AbstractMessageHandler. java:78)
    ... 6 more


    I've find out a little workaround which is to get the inboundChannel.adapter bean, and directly call stop() and start() method, but does not use the control bus...
    Code:
    AbstractEndpoint adapter = ac.getBean("inboundAdapter.adapter", AbstractEndpoint.class);
    adapter.stop();
    But this logs an InteruptedException.

    Any ideas ? What am I doing wrong ?
    Thanks a lot for your help.
    Last edited by osgiliath; May 30th, 2011, 08:57 AM. Reason: precisions

  • #2
    If you use the adapter id to name the channel (instead of explicitly configuring a 'channel' attribute), then the adapter gets a bean name of <channel>.adapter; in your case it would be '@inboundadapter.adapter.stop()' (which you figured out in your workaround). @inboundAdapter.stop() is trying to 'stop' a channel.

    You can explicitly name the channel to, say, 'inbound' instead of relying of the automated naming and then you can stop the adapter using its id.

    Comment


    • #3
      Hi,
      Thanks for the answer.

      It's working, I tried with
      controlChannel.send(new GenericMessage<String>("@'inboundAdapter.adapter'. stop()"));
      controlChannel.send(new GenericMessage<String>("@'inboundAdapter.adapter'. start()"));
      but I still have some exceptions logs which are not really clean I think

      INFO SourcePollingChannelAdapter - stopped inboundAdapter.adapter
      ERROR LoggingHandler - org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: java.lang.InterruptedException
      at org.springframework.jms.support.JmsUtils.convertJm sAccessException(JmsUtils.java:316)
      at org.springframework.jms.support.JmsAccessor.conver tJmsAccessException(JmsAccessor.java:168)
      at org.springframework.jms.core.JmsTemplate.execute(J msTemplate.java:469)
      at org.springframework.jms.core.JmsTemplate.receiveSe lected(JmsTemplate.java:695)
      at org.springframework.integration.jms.JmsDestination PollingSource.doReceiveJmsMessage(JmsDestinationPo llingSource.java:115)
      at org.springframework.integration.jms.JmsDestination PollingSource.receive(JmsDestinationPollingSource. java:93)
      at org.springframework.integration.endpoint.SourcePol lingChannelAdapter.doPoll(SourcePollingChannelAdap ter.java:89)
      at org.springframework.integration.endpoint.AbstractP ollingEndpoint$1.call(AbstractPollingEndpoint.java :145)
      at org.springframework.integration.endpoint.AbstractP ollingEndpoint$1.call(AbstractPollingEndpoint.java :143)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Nativ e Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(Native MethodAccessorImpl.java:39)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(De legatingMethodAccessorImpl.java:25)
      at java.lang.reflect.Method.invoke(Method.java:597)
      at org.springframework.aop.support.AopUtils.invokeJoi npointUsingReflection(AopUtils.java:309)
      at org.springframework.aop.framework.ReflectiveMethod Invocation.invokeJoinpoint(ReflectiveMethodInvocat ion.java:183)
      at org.springframework.aop.framework.ReflectiveMethod Invocation.proceed(ReflectiveMethodInvocation.java :150)
      at org.springframework.transaction.interceptor.Transa ctionInterceptor.invoke(TransactionInterceptor.jav a:110)
      at org.springframework.aop.framework.ReflectiveMethod Invocation.proceed(ReflectiveMethodInvocation.java :172)
      at org.springframework.aop.framework.JdkDynamicAopPro xy.invoke(JdkDynamicAopProxy.java:202)
      at $Proxy4.call(Unknown Source)
      at org.springframework.integration.endpoint.AbstractP ollingEndpoint$Poller$1.run(AbstractPollingEndpoin t.java:206)
      at org.springframework.integration.util.ErrorHandling TaskExecutor$1.run(ErrorHandlingTaskExecutor.java: 52)
      at org.springframework.core.task.SyncTaskExecutor.exe cute(SyncTaskExecutor.java:48)
      at org.springframework.integration.util.ErrorHandling TaskExecutor.execute(ErrorHandlingTaskExecutor.jav a:49)
      at org.springframework.integration.endpoint.AbstractP ollingEndpoint$Poller.run(AbstractPollingEndpoint. java:201)
      at org.springframework.scheduling.support.DelegatingE rrorHandlingRunnable.run(DelegatingErrorHandlingRu nnable.java:51)
      at org.springframework.scheduling.concurrent.Reschedu lingRunnable.run(ReschedulingRunnable.java:81)
      at java.util.concurrent.Executors$RunnableAdapter.cal l(Executors.java:441)
      at java.util.concurrent.FutureTask$Sync.innerRun(Futu reTask.java:303)
      at java.util.concurrent.FutureTask.run(FutureTask.jav a:138)
      at java.util.concurrent.ScheduledThreadPoolExecutor$S cheduledFutureTask.access$301(ScheduledThreadPoolE xecutor.java:98)
      at java.util.concurrent.ScheduledThreadPoolExecutor$S cheduledFutureTask.run(ScheduledThreadPoolExecutor .java:206)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run Task(ThreadPoolExecutor.java:886)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:908)
      at java.lang.Thread.run(Thread.java:680)
      Caused by: javax.jms.JMSException: java.lang.InterruptedException
      at org.apache.activemq.util.JMSExceptionSupport.creat e(JMSExceptionSupport.java:62)
      at org.apache.activemq.ActiveMQMessageConsumer.dequeu e(ActiveMQMessageConsumer.java:483)
      at org.apache.activemq.ActiveMQMessageConsumer.receiv e(ActiveMQMessageConsumer.java:504)
      at org.springframework.jms.core.JmsTemplate.doReceive (JmsTemplate.java:777)
      at org.springframework.jms.core.JmsTemplate.doReceive (JmsTemplate.java:741)
      at org.springframework.jms.core.JmsTemplate.doReceive (JmsTemplate.java:722)
      at org.springframework.jms.core.JmsTemplate$9.doInJms (JmsTemplate.java:697)
      at org.springframework.jms.core.JmsTemplate$9.doInJms (JmsTemplate.java:1)
      at org.springframework.jms.core.JmsTemplate.execute(J msTemplate.java:466)
      ... 32 more
      Caused by: java.lang.InterruptedException
      at java.lang.Object.wait(Native Method)
      at java.lang.Object.wait(Object.java:485)
      at org.apache.activemq.SimplePriorityMessageDispatchC hannel.dequeue(SimplePriorityMessageDispatchChanne l.java:87)
      at org.apache.activemq.ActiveMQMessageConsumer.dequeu e(ActiveMQMessageConsumer.java:452)
      ... 39 more

      Starting bridge....
      INFO SourcePollingChannelAdapter - started inboundAdapter.adapter
      It is the expected behaviour ?
      Thanks.

      Comment


      • #4
        Yes, given that you are using a polling channel adapter, we have to interrupt the thread that's hanging on the receive; I suppose we could consider suppressing the log message if the error is a result of stopping the adapter. Consider entering an 'Improvement' JIRA issue if you feel strongly about it.

        Also, consider using a message-driven-channel-adapter instead; in that case, I believe you will only get a log message if you happen to stop the adapter just as a message is being received, forcing it to be rolled-back onto the queue.

        Comment


        • #5
          OK, but I'll keep the poller since I need the max-message-per-poll and fixed-delay properties in order to have a flow controler.
          Thanks.

          Comment

          Working...
          X