
  • Spring Integration Messages sent but not received from ActiveMQ

    I would like to create a simple test in which messages are sent to and received from the FORWARD queue. When a message is received, a service should be invoked.

    Unfortunately, the messages are sent with
    Code:
    Message<?> message = MessageBuilder.withPayload(params).build();
    forwardGateway.sendPremMessage(message);
    but are not received, which is probably why process() on the service is never called.

    This is my config:

    Code:
     <!-- SENDER -->
    
    <si:gateway id="forwardGateway" 
        service-interface="com.ucware.ucpo.forward.jms.MessageGateway" 
        default-request-channel="inputChannel"/>
    
    <si:channel id="inputChannel"/>
    
        <!-- Subscriber to a channel -->
    <int-jms:outbound-channel-adapter 
    channel="inputChannel"
    connection-factory="connectionFactory" 
    destination-name="FORWARD" />
    
    
    <!-- RECEIVER -->
    <int:channel id="jmsInChannel"/>
    
     <!-- Subscriber to jmsInChannel. Used instead of inbound channel adapter -->
    <int-jms:message-driven-channel-adapter id="messageDrivenAdapter"
        channel="jmsInChannel" destination-name="FORWARD" connection-factory="connectionFactory"
        concurrent-consumers="1" auto-startup="true" acknowledge="auto"/>
    
     <!-- This service should be invoked but it is not -->
    <si:service-activator id="activator" 
        input-channel="jmsInChannel"
        ref="messageService" 
        method="process"/>
    The service is defined as :
    Code:
    @MessageEndpoint
    public class MessageService {
    
        public void process(Message<?> message) {
            // ...
        }
    }
    and the gateway as :
    Code:
    public interface MessageGateway  {
    
    @Gateway 
    public void sendPremMessage(Message<?> message);
    }

  • #2
    Your configuration works perfectly fine for me; I suggest you turn on TRACE level logging to see the message flow on both sides. (TRACE gives details of the message listener container activity on the receiving side).


    If you are using in-memory ActiveMQ you need to be careful about the timing - don't send a message until the container has started. Alternatively be sure to use a `CachingConnectionFactory`.
    Code:
    	<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    		<property name="targetConnectionFactory">
    			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
    				<property name="brokerURL" value="vm://localhost"/>
    			</bean>
    		</property>
    	</bean>
    That way, the broker will stay running after the send.
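Timing also matters on the receiving side of a test: the message is handled on a listener container thread, so a test method that returns right after the send can finish before process() runs. A minimal sketch of one common way to make the test wait, using only the JDK. `LatchingService` and `LatchDemo` are hypothetical names, and the executor merely simulates the container thread; no JMS is involved.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for MessageService: records the payload and releases a latch.
class LatchingService {
    private final CountDownLatch received = new CountDownLatch(1);
    private final AtomicReference<Object> lastPayload = new AtomicReference<>();

    // Plays the role of the method wired to the service activator.
    public void process(Object payload) {
        lastPayload.set(payload);
        received.countDown();
    }

    // Blocks the calling (test) thread until process() has run or the timeout expires.
    public boolean awaitMessage(long timeout, TimeUnit unit) throws InterruptedException {
        return received.await(timeout, unit);
    }

    public Object lastPayload() {
        return lastPayload.get();
    }
}

class LatchDemo {
    // Simulates asynchronous delivery on another thread (assumption: this
    // executor stands in for the listener container), then waits for it.
    static boolean sendAndWait(LatchingService service, Object payload) throws InterruptedException {
        ExecutorService listenerThread = Executors.newSingleThreadExecutor();
        try {
            listenerThread.execute(() -> service.process(payload));
            return service.awaitMessage(5, TimeUnit.SECONDS);
        } finally {
            listenerThread.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LatchingService service = new LatchingService();
        boolean delivered = sendAndWait(service, "HELLO");
        System.out.println("delivered=" + delivered + ", payload=" + service.lastPayload());
    }
}
```

In a real test the send would go through the gateway, and the assertion would be made on the result of awaitMessage() rather than immediately after the send.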
    Last edited by Gary Russell; Jul 18th, 2013, 08:05 AM.



    • #3
      Originally posted by Gary Russell
      Your configuration works perfectly fine for me; I suggest you turn on TRACE level logging to see the message flow on both sides. (TRACE gives details of the message listener container activity on the receiving side).
      I did as you suggested and added the CachingConnectionFactory, but still nothing. However, when I added a message listener:

      Code:
      <jms:listener-container connection-factory="connectionFactory"> 
              <jms:listener destination="FORWARD" ref="messageListener"/> 
          </jms:listener-container>
      the listener seems to receive the messages. Maybe the listener can be replaced by a service-activator?

      18.07.2013 15:52:16.036 [DirectChannel] [AbstractMessageChannel.java] [DEBUG] [main]
      postSend (sent=true) on channel 'inputChannel', message: [Payload={FORWARD_ALL=3000, LINE=601}
      [Headers={timestamp=1374155536031, id=c2895e24-2260-4af8-9b23-a226ae95c31f,
      source=PRESENCE_ENGINE,userid=[email protected], type=FORWARD}]

      18.07.2013 15:52:16.036 [ActiveMQMessageConsumer] [ActiveMQMessageConsumer.java] [TRACE] [org.springframework.jms.listener.DefaultMessageListenerContainer#1-1] ID:Lmiroslaw-PC-59127-1374155535776-1:1:1:1 **received message: MessageDispatch** {commandId = 0, responseRequired = false, consumerId = ID:Lmiroslaw-PC-59127-1374155535776-1:1:1:1, destination = queue://FORWARD, message = ActiveMQObjectMessage {commandId = 11, responseRequired = true, messageId = ID:Lmiroslaw-PC-59127-1374155535776-1:1:3:1:3, originalDestination = null, originalTransactionId = null, producerId = ID:Lmiroslaw-PC-59127-1374155535776-1:1:3:1, destination = queue://FORWARD, transactionId = null, expiration = 0, timestamp = 1374155536032, arrival = 0, brokerInTime = 1374155536013, brokerOutTime = 1374155536013, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@1bc4ec8, marshalledProperties = org.apache.activemq.util.ByteSequence@1d840d9, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {timestamp=1374155536031, userid=[email protected], source=PRESENCE_ENGINE, type=FORWARD}, readOnlyProperties = true, readOnlyBody = true, droppable = false}, redeliveryCounter = 0}

      18.07.2013 15:52:16.037 [DefaultMessageListenerContainer] [AbstractPollingMessageListenerContainer.java] [DEBUG] [org.springframework.jms.listener.DefaultMessageListenerContainer#1-1] **Received message of** type [class org.apache.activemq.command.ActiveMQObjectMessage] from consumer [Cached JMS MessageConsumer: ActiveMQMessageConsumer { value=ID:Lmiroslaw-PC-59127-1374155535776-1:1:1:1, started=true }] of session [Cached JMS Session: ActiveMQSession {id=ID:Lmiroslaw-PC-59127-1374155535776-1:1:1,started=true}]
      Last edited by Gary Russell; Jul 18th, 2013, 09:22 AM.



      • #4
        I do not know if this helps, but when I set extract-payload="false" on the int-jms:outbound-channel-adapter, I can see that the service is invoked during the 'shutdown' procedure:

        18.07.2013 16:12:01.071 [DefaultMessageListenerContainer] [DefaultMessageListenerContainer.java] [DEBUG] [Thread-2 ] Still waiting for shutdown of 1 message listener invokers
        Service invoked and received: [Payload=HELLO][Headers={timestamp=1374156721064, id=df32e4f0-acff-4d15-9dae-2c21a47e0c17, jms_timestamp=1374156721054, jms_redelivered=false, CTI_LOGIN=LOGIN, jms_messageId=ID:Lmiroslaw-PC-59366-1374156720827-1:1:3:1:2}]
        18.07.2013 16:12:01.072 [ServiceActivatingHandler] [AbstractReplyProducingMessageHandler.java] [DEBUG] [org.springframework.jms.listener.DefaultMessageListenerContainer#1-1] handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@ed9f47]' produced no reply for request Message: [Payload=HELLO][Headers={timestamp=1374156721064, id=df32e4f0-acff-4d15-9dae-2c21a47e0c17, jms_timestamp=1374156721054, jms_redelivered=false, CTI_LOGIN=LOGIN, jms_messageId=ID:Lmiroslaw-PC-59366-1374156720827-1:1:3:1:2}]
        Last edited by Gary Russell; Jul 18th, 2013, 09:21 AM.



        • #5
          What does the code in process() look like?

          Please show ALL the log after the

          ...**Received message of**...



          • #6
            Code:
            @MessageEndpoint
            public class MessageService {
            
                private static final Logger LOGGER = LoggerFactory.getLogger(MessageService.class);
            
                @ServiceActivator
                public void process(Message<?> message) {
                    // Log every message that reaches the service activator
                    LOGGER.debug("PROCESS. message received: {}", message);
                }
            }
            Code:
            19.07.2013 07:20:54.117 [ActiveMQSession] [ActiveMQSession.java] [TRACE] [main      ] ID:Lmiroslaw-PC-59579-1374211253886-1:1:3 sending message: ActiveMQObjectMessage {commandId = 0, responseRequired = false, messageId = ID:Lmiroslaw-PC-59579-1374211253886-1:1:3:1:2, originalDestination = null, originalTransactionId = null, producerId = ID:Lmiroslaw-PC-59579-1374211253886-1:1:3:1, destination = queue://FORWARD, transactionId = null, expiration = 0, timestamp = 1374211254117, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@1a8d460, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {timestamp=1374211254116, CTI_LOGIN=LOGIN}, readOnlyProperties = true, readOnlyBody = true, droppable = false}
            19.07.2013 07:20:54.119 [DirectChannel] [AbstractMessageChannel.java] [DEBUG] [main      ] postSend (sent=true) on channel 'inputChannel', message: [Payload=HELLO][Headers={timestamp=1374211254116, id=7d5a2e69-552e-4940-8c6f-77795ae4fbcd, CTI_LOGIN=LOGIN}]
            Sending message from CHANNEL HELLO
            19.07.2013 07:20:54.120 [DependencyInjectionTestExecutionListener] [DependencyInjectionTestExecutionListener.java] [DEBUG] [main      ] Performing dependency injection for test context [[TestContext@1f26605 testClass = SpringIntegrationTest, testInstance = com.ucware.ucpo.forward.jms.SpringIntegrationTest@bf7916, testMethod = [null], testException = [null], mergedContextConfiguration = [MergedContextConfiguration@10f11b8 testClass = SpringIntegrationTest, locations = '{classpath:test-integration.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]].
            19.07.2013 07:20:54.120 [CacheAwareContextLoaderDelegate] [CacheAwareContextLoaderDelegate.java] [DEBUG] [main      ] Retrieved ApplicationContext from cache with key [[MergedContextConfiguration@10f11b8 testClass = SpringIntegrationTest, locations = '{classpath:test-integration.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]].
             
            19.07.2013 07:20:54.120 [PooledTaskRunner] [PooledTaskRunner.java] [TRACE] [ActiveMQ Session Task-1] Run task done: org.apache.activemq.ActiveMQSessionExecutor@1e2afb2
            19.07.2013 07:20:54.120 [DefaultListableBeanFactory] [AbstractBeanFactory.java] [DEBUG] [main      ] Returning cached instance of singleton bean 'inputChannel'
             
            19.07.2013 07:20:54.121 [DefaultListableBeanFactory] [AbstractBeanFactory.java] [DEBUG] [main      ] Returning cached instance of singleton bean 'forwardGateway'
            19.07.2013 07:20:54.121 [ActiveMQMessageConsumer] [ActiveMQMessageConsumer.java] [TRACE] [org.springframework.jms.listener.DefaultMessageListenerContainer#0-1] ID:Lmiroslaw-PC-59579-1374211253886-1:1:2:1 received message: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:Lmiroslaw-PC-59579-1374211253886-1:1:2:1, destination = queue://FORWARD, message = ActiveMQObjectMessage {commandId = 10, responseRequired = true, messageId = ID:Lmiroslaw-PC-59579-1374211253886-1:1:3:1:2, originalDestination = null, originalTransactionId = null, producerId = ID:Lmiroslaw-PC-59579-1374211253886-1:1:3:1, destination = queue://FORWARD, transactionId = null, expiration = 0, timestamp = 1374211254117, arrival = 0, brokerInTime = 1374211254716, brokerOutTime = 1374211254716, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@1c09624, marshalledProperties = org.apache.activemq.util.ByteSequence@c1186f, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {timestamp=1374211254116, CTI_LOGIN=LOGIN}, readOnlyProperties = true, readOnlyBody = true, droppable = false}, redeliveryCounter = 0}
            19.07.2013 07:20:54.121 [DirectChannel] [AbstractMessageChannel.java] [DEBUG] [main      ] preSend on channel 'inputChannel', message: [Payload={FORWARD_ALL=3000, LINE=601}][Headers={timestamp=1374211254121, id=01a33cbc-3365-46ce-9e7a-4e7884f0793f, source=PRESENCE_ENGINE, userid=[email protected], type=FORWARD}]
            19.07.2013 07:20:54.121 [JmsSendingMessageHandler] [AbstractMessageHandler.java] [DEBUG] [main      ] org.springframework.integration.jms.JmsSendingMessageHandler#0 received message: [Payload={FORWARD_ALL=3000, LINE=601}][Headers={timestamp=1374211254121, id=01a33cbc-3365-46ce-9e7a-4e7884f0793f, source=PRESENCE_ENGINE, userid=[email protected], type=FORWARD}] 
            19.07.2013 07:20:54.122 [DefaultMessageListenerContainer] [AbstractPollingMessageListenerContainer.java] [DEBUG] [org.springframework.jms.listener.DefaultMessageListenerContainer#0-1] Received message of type [class org.apache.activemq.command.ActiveMQObjectMessage] from consumer [Cached JMS MessageConsumer: ActiveMQMessageConsumer { value=ID:Lmiroslaw-PC-59579-1374211253886-1:1:2:1, started=true }] of session [Cached JMS Session: ActiveMQSession {id=ID:Lmiroslaw-PC-59579-1374211253886-1:1:2,started=true}]
            19.07.2013 07:20:54.122 [DynamicJmsTemplate] [JmsTemplate.java] [DEBUG] [main      ] Sending created message: ActiveMQObjectMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@5bf624, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {timestamp=1374211254121, source=PRESENCE_ENGINE, userid=[email protected], type=FORWARD}, readOnlyProperties = false, readOnlyBody = false, droppable = false}
            19.07.2013 07:20:54.123 [ActiveMQSession] [ActiveMQSession.java] [TRACE] [main      ] ID:Lmiroslaw-PC-59579-1374211253886-1:1:3 sending message: ActiveMQObjectMessage {commandId = 0, responseRequired = false, messageId = ID:Lmiroslaw-PC-59579-1374211253886-1:1:3:1:3, originalDestination = null, originalTransactionId = null, producerId = ID:Lmiroslaw-PC-59579-1374211253886-1:1:3:1, destination = queue://FORWARD, transactionId = null, expiration = 0, timestamp = 1374211254122, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@5bf624, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {timestamp=1374211254121, source=PRESENCE_ENGINE, userid=[email protected], type=FORWARD}, readOnlyProperties = true, readOnlyBody = true, droppable = false}
            2013-07-19 07:20:54,124 INFO [com.ucware.ucpo.forward.mess.MyMessageListener] - <Consumed message: ActiveMQObjectMessage {commandId = 10, responseRequired = true, messageId = ID:Lmiroslaw-PC-59579-1374211253886-1:1:3:1:2, originalDestination = null, originalTransactionId = null, producerId = ID:Lmiroslaw-PC-59579-1374211253886-1:1:3:1, destination = queue://FORWARD, transactionId = null, expiration = 0, timestamp = 1374211254117, arrival = 0, brokerInTime = 1374211254716, brokerOutTime = 1374211254716, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@1c09624, marshalledProperties = org.apache.activemq.util.ByteSequence@c1186f, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {timestamp=1374211254116, CTI_LOGIN=LOGIN}, readOnlyProperties = true, readOnlyBody = true, droppable = false}>
            2013-07-19 07:20:54,124 WARN [com.ucware.ucpo.forward.mess.MyMessageListener] - <Unknown message received. >



            • #7
              That is very strange; there is no Spring Integration activity in that log (on the receiving side).

              I would also have expected more TRACE activity - did you edit the log?

              Perhaps there is some delay converting (deserializing) the object payload (it's an ActiveMQObjectMessage).
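A JMS ObjectMessage carries a Java-serialized object, so one rough way to check whether the payload itself is expensive to convert is to time a serialization round trip outside the broker. The sketch below uses only the JDK; `PayloadRoundTrip` is a hypothetical name, and the String value types in the map are an assumption based on the payload shown in the logs. It does not reproduce ActiveMQ's own code path.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

class PayloadRoundTrip {
    // Serializes the payload with standard Java serialization.
    static byte[] serialize(Serializable payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(payload);
        }
        return bytes.toByteArray();
    }

    // Reads the object back from its serialized form.
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumed payload shape: a small map like the one in the logs.
        HashMap<String, String> params = new HashMap<>();
        params.put("FORWARD_ALL", "3000");
        params.put("LINE", "601");

        long start = System.nanoTime();
        Object copy = deserialize(serialize(params));
        long micros = (System.nanoTime() - start) / 1_000;

        System.out.println("round trip took " + micros + " us, equal=" + params.equals(copy));
    }
}
```

If the round trip is fast for the real payload class, a conversion delay becomes a less likely explanation.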

              The next step would be to take a thread dump to find out what this thread

              org.springframework.jms.listener.DefaultMessageListenerContainer#0-1

              is doing. You can use jstack or visualvm to get a thread dump.
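When attaching jstack or visualvm to a short-lived test JVM is awkward, the same information can be captured from inside the JVM. A minimal sketch using the JDK's Thread.getAllStackTraces(); `ListenerThreadDump` is a hypothetical helper, and the name fragment passed to it is taken from the thread name in the logs.

```java
import java.util.Map;

class ListenerThreadDump {
    // Returns the stack traces of all live threads whose name contains the fragment.
    static String dump(String nameFragment) {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            if (!thread.getName().contains(nameFragment)) {
                continue;
            }
            out.append('"').append(thread.getName()).append("\" state=")
               .append(thread.getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                out.append("    at ").append(frame).append('\n');
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // In the test this would be called while the listener appears stuck.
        System.out.print(dump("DefaultMessageListenerContainer"));
        // No container runs in this standalone demo, so also dump the current thread.
        System.out.print(dump(Thread.currentThread().getName()));
    }
}
```

Calling dump("DefaultMessageListenerContainer") from the test a second or two after the send would show whether the container thread is blocked, waiting, or already back in its receive loop.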
