Multiple receive on JMS queues

  • Multiple receive on JMS queues

    Hi. We use Spring Integration with ActiveMQ to handle tasks and task flows in our application.
    We are having a problem: the tasks bound to queues are being called multiple times.

    To explain, here is the flow in the config:
    The application receives an online order for a photobook (an XML file containing its data) and creates the photobook pages from it.

    We have defined channels, service activators and beans like this:
    Code:
    	<sijms:channel id="retrieveImagesChannel" queue-name="retrieveImagesQueue" error-handler="messagePublishError" />
    	<sijms:channel id="createPrintPagesChannel" queue-name="createPrintPagesQueue" error-handler="messagePublishError" />
    
    	<si:service-activator input-channel="retrieveImagesChannel" ref="retrieveImages" method="executeTask" output-channel="transformImagesChannel" />
    	<si:service-activator input-channel="createPrintPagesChannel" ref="createPrintPages" method="executeTask" output-channel="createPagesChannel" />
    
    	<bean id="retrieveImages" class="com.tvi.fusion.task.impl.RetrieveImages" />
    	<bean id="createPrintPages" class="com.tvi.fusion.task.impl.CreatePrintPages" />
    We also have this:

    Code:
    	<si:poller default="true" >
    		<si:interval-trigger interval="100"/>
    	</si:poller>
    And an errorHandler:

    Code:
    	<!-- error message -->
    	<bean id="messagePublishError" class="org.springframework.integration.channel.MessagePublishingErrorHandler">
    	        <property name="defaultErrorChannel" ref="jmsErrorChannel" />
    	</bean>
    (There's a bit more than this, but it does not concern my problem)

    This flow works, and everything goes from start to finish, but we have a big problem:
    the executeTask method in the RetrieveImages class is being called multiple times.
    That's really not good. For each order, there should be exactly one message put in the queue and one call to the method.

    I have no idea why.

    Any help on this would be appreciated.
    NOTE: we are using version 2.0.0 M5.

    Llaurick

  • #2
    Are you monitoring the underlying ActiveMQ Queues, and if so are you seeing the expected number of Messages being enqueued and dequeued?
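One way to check the enqueue and dequeue counts without extra tooling is to read them over JMX. The following is only a minimal sketch, assuming the broker has JMX enabled and exposes `EnqueueCount` and `DequeueCount` attributes on its queue MBeans. The class name `QueueCounters`, the service URL and the queue object name are hypothetical placeholders; the exact object name depends on the ActiveMQ version and is easiest to look up in JConsole.

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class QueueCounters {

    /** Reads one numeric attribute (for example "EnqueueCount") from an MBean. */
    public static long readCounter(MBeanServerConnection conn, ObjectName mbean, String attribute)
            throws Exception {
        return ((Number) conn.getAttribute(mbean, attribute)).longValue();
    }

    /**
     * Connects to a remote JMX endpoint and prints both counters for one queue.
     * serviceUrl and queueObjectName are supplied by the caller; typical values
     * such as "service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi" are assumptions
     * about the broker setup, not something taken from this thread.
     */
    public static void printQueueCounters(String serviceUrl, String queueObjectName)
            throws Exception {
        JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(serviceUrl));
        try {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            ObjectName queue = new ObjectName(queueObjectName);
            System.out.println("enqueued=" + readCounter(conn, queue, "EnqueueCount")
                    + " dequeued=" + readCounter(conn, queue, "DequeueCount"));
        } finally {
            connector.close();
        }
    }
}
```

If the enqueue count for retrieveImagesQueue grows by more than one per order, the duplicates are produced upstream of the queue; if it grows by exactly one, the extra calls come from the consuming side.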



    • #3
      Originally posted by Mark Fisher
      Are you monitoring the underlying ActiveMQ Queues, and if so are you seeing the expected number of Messages being enqueued and dequeued?
      No. I don't really know how to do that with Spring Integration and ActiveMQ.

      I also forgot to mention a very weird thing. In the stack trace of the call to the method, I see this:

      at com.tvi.fusion.task.ProjectTask.executeTask(ProjectTask.java:52)
      at com.tvi.fusion.task.impl.RetrieveImages.route(RetrieveImages.java:48)
      at com.tvi.fusion.task.impl.RetrieveImages$$FastClassByCGLIB$$e1f3a28f.invoke(<generated>)

      The problem is that the RetrieveImages class does NOT have a route method, and never had one.
      Also, when I look at the whole call trace, I don't see any ActiveMQ classes in it. Is this normal?

      Here is the trace I see. We get it because we validate the status before handling, so the first message goes through and the others throw an exception:
      Code:
      15:47:23,476 TRACE: NullableType::binding 'com.tvi.fusion.OrderException: The required status is not valid CREATED != ERROR
              at com.tvi.fusion.task.ProjectTask.validateStatus(ProjectTask.java:96)
              at com.tvi.fusion.task.ProjectTask.executeTask(ProjectTask.java:52)
              at com.tvi.fusion.task.impl.RetrieveImages.route(RetrieveImages.java:48)
              at com.tvi.fusion.task.impl.RetrieveImages$$FastClassByCGLIB$$e1f3a28f.invoke(<generated>)
              at net.sf.cglib.proxy.MethodProxy.invoke(MethodProxy.java:191)
              at org.springframework.aop.framework.Cglib2AopProxy$CglibMethodInvocation.invokeJoinpoint(Cglib2AopProxy.java:688)
              at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
              at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:110)
              at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
              at org.springframework.aop.framework.Cglib2AopProxy$DynamicAdvisedInterceptor.intercept(Cglib2AopProxy.java:621)
              at com.tvi.fusion.task.impl.RetrieveImages$$EnhancerByCGLIB$$2e5bc5bf.route(<generated>)
              at sun.reflect.GeneratedMethodAccessor79.invoke(Unknown Source)
              at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
              at java.lang.reflect.Method.invoke(Method.java:597)
              at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:58)
              at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:77)
              at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:57)
              at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:102)
              at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:102)
              at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:100)
              at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:221)
              at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:120)
              at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:73)
              at org.springframework.integration.router.AbstractMessageProcessingRouter.getChannelIndicatorList(AbstractMessageProcessingRouter.java:58)
              at org.springframework.integration.router.AbstractChannelNameResolvingMessageRouter.determineTargetChannels(AbstractChannelNameResolvingMessageRouter.java:117)
              at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:114)
              at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:77)
              at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
              at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
              at org.springframework.integration.jms.JmsDestinationBackedMessageChannel.onMessage(JmsDestinationBackedMessageChannel.java:212)
              at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:560)
              at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:498)
              at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:467)
              at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:323)
              at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:261)
              at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1056)
              at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1048)
              at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)
              at java.lang.Thread.run(Thread.java:619)
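
The status validation mentioned above only reveals repeated deliveries indirectly, through the exception. To confirm how many times each order actually reaches the handler, the calls can be counted per order. This is a minimal sketch, not part of the application: the `DeliveryCounter` class and the idea of keying on an order id string are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical diagnostic helper: counts how often each order id is handled. */
public class DeliveryCounter {

    private final Map<String, AtomicInteger> counts = new ConcurrentHashMap<>();

    /** Records one delivery and returns the total seen so far for this order id. */
    public int record(String orderId) {
        return counts.computeIfAbsent(orderId, id -> new AtomicInteger()).incrementAndGet();
    }

    /** Returns how many deliveries have been recorded for this order id. */
    public int deliveries(String orderId) {
        AtomicInteger count = counts.get(orderId);
        return count == null ? 0 : count.get();
    }
}
```

Calling `record` at the top of executeTask and logging any return value greater than 1, together with the current thread name, would show whether the repeats arrive on the same listener thread or on different ones.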
