
  • reply-channel for tcp-inbound-gateway

    Hi,

    I defined a tcp-inbound-gateway as follows.

    Code:
        <ip:tcp-connection-factory id="crLfServer"
            type="server"
            port="8888"/>

        <ip:tcp-inbound-gateway id="productGateway"
            connection-factory="crLfServer"
            request-channel="requestChannel"
            reply-channel="replyChannel"
            error-channel="errorChannel"/>

        <channel id="requestChannel"/>
        <channel id="replyChannel"/>

        <chain input-channel="requestChannel" output-channel="jsonProductsChannel">
            <transformer expression="new String(payload)"/>
            <transformer ref="productTransformer" method="transform"/>
        </chain>
        <transformer id="errorHandler"
            input-channel="errorChannel"
            expression="payload.failedMessage.payload + ':' + payload.cause.message"/>

        <channel id="jsonProductsChannel"/>

        <recipient-list-router id="customRouter" input-channel="jsonProductsChannel"
            timeout="1234"
            ignore-send-failures="false"
            apply-sequence="true">
            <recipient channel="aggregatedProductChannel"/>
            <recipient channel="productsForReplyChannel"/>
        </recipient-list-router>

        <channel id="aggregatedProductChannel"/>
        <channel id="productsForReplyChannel"/>

        <service-activator input-channel="productsForReplyChannel" output-channel="replyChannel"
            ref="productValidator"
            method="validate"/>
    The basic idea is to return a reply message from the downstream channel "productsForReplyChannel". Note that I used a publish-subscribe channel to dispatch the same message to both aggregatedProductChannel and productsForReplyChannel. The flow on aggregatedProductChannel does further processing, and the flow on productsForReplyChannel returns the response message to the TCP client. I don't know whether this is a valid channel definition.

    However, I got the following error message.

    2011-02-08 18:24:31,018 (pool-1-thread-2) [TcpInboundGateway.doSendAndReceive]
    WARN - failure occurred in gateway sendAndReceive
    org.springframework.integration.MessageHandlingException: handler 'org.springframework.integration.transformer.MessageTransformingHandler@59bdbfec' requires a reply, but no reply was received
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:104)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:150)
    at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:176)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:160)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:125)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:119)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:101)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:133)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:44)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
    at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    at org.springframework.integration.core.MessagingTemplate.doSendAndReceive(MessagingTemplate.java:318)
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:239)
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:234)
    at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:208)
    at org.springframework.integration.ip.tcp.TcpInboundGateway.onMessage(TcpInboundGateway.java:48)
    at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:136)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
    If my message channel definition is not correct, is there a way to send the response message back from one of the downstream channels?

    Thanks in advance,

    John

  • #2
    We may need to see more of your configuration together with a debug log.

    First of all, you're not really using a pub-sub channel; you are using a recipient list router. Also, you are setting apply-sequence to true. What is the completion strategy for the aggregator on the aggregatedProductChannel? If it's the default, it will never complete.

    The aggregatedProductChannel is first in the list and there is no async handoff, so it looks like the downstream flow on that channel is timing out, preventing the response from being sent.

    Consider using a <publish-subscribe-channel/> with a task executor, so that each message instance is processed on a different thread.
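
    A minimal sketch of that suggestion, reusing the channel names from the configuration in the first post. The executor id "pubSubExecutor", its pool size, the "task" namespace prefix, and the two bridges are illustrative assumptions, not part of the original configuration:

    ```xml
    <!-- Assumption: the Spring "task" namespace is declared with the prefix "task". -->
    <!-- Hypothetical executor; the id and pool size are placeholders. -->
    <task:executor id="pubSubExecutor" pool-size="2"/>

    <!-- Takes the place of <channel id="jsonProductsChannel"/> and the recipient-list-router.
         With a task executor, each subscriber is invoked on an executor thread. -->
    <publish-subscribe-channel id="jsonProductsChannel" task-executor="pubSubExecutor"/>

    <!-- Subscriber 1: hand the message to the existing further-processing flow. -->
    <bridge input-channel="jsonProductsChannel" output-channel="aggregatedProductChannel"/>

    <!-- Subscriber 2: hand the same message to the existing validate-and-reply flow. -->
    <bridge input-channel="jsonProductsChannel" output-channel="productsForReplyChannel"/>
    ```

    The bridges are only there so the existing endpoints can keep their current input channels; subscribing those endpoints directly to jsonProductsChannel should work just as well.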

    Also, why are you setting apply-sequence when the second leg will never go to the aggregation process?

    Finally, you don't really need a reply-channel here; the TCP gateway will create a temporary reply channel. If you omit the output-channel on the service activator, the reply will go directly to that channel.
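
    As a rough sketch of that last point, using only names from the configuration in the first post: the gateway drops its reply-channel attribute and the service activator drops its output-channel, so the value returned by validate() should be sent to the replyChannel header that the gateway sets on the request message.

    ```xml
    <!-- No reply-channel: the gateway waits on its own temporary reply channel. -->
    <ip:tcp-inbound-gateway id="productGateway"
        connection-factory="crLfServer"
        request-channel="requestChannel"
        error-channel="errorChannel"/>

    <!-- No output-channel: the reply is routed via the message's replyChannel header. -->
    <service-activator input-channel="productsForReplyChannel"
        ref="productValidator"
        method="validate"/>
    ```

    With this change the explicit <channel id="replyChannel"/> definition is no longer needed for the reply path.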

    HTH



    • #3
      Thanks for your quick reply. The name aggregatedProductChannel is a bit misleading; it is not an aggregation channel. If I understand correctly, the recipient list router will send the same message to both downstream channels.

      Here is the full channel definition.

      Code:
      	<annotation-config/>
      
      	<context:component-scan base-package="com.barnesandnoble.message"/>
      
      	<ip:tcp-connection-factory id="crLfServer"
      		type="server"
      		port="8888"/>
      
      	<ip:tcp-inbound-gateway id="productGateway"
      		connection-factory="crLfServer"
      		request-channel="requestChannel"
              reply-channel="replyChannel"
      		error-channel="errorChannel"/>
      
          <channel id="requestChannel"/>
          <channel id="replyChannel"/>
      
          <chain input-channel="requestChannel" output-channel="jsonProductsChannel">
              <transformer expression="new String(payload)"/>
      		<transformer ref="productTransformer" method="transform" />
      	</chain>
      	<transformer id="errorHandler"
      		input-channel="errorChannel"
      		expression="payload.failedMessage.payload + ':' + payload.cause.message"/>
          <channel id="jsonProductsChannel"/>
      
          <recipient-list-router id="customRouter" input-channel="jsonProductsChannel"
                                 timeout="1234"
                                 ignore-send-failures="false"
                                 apply-sequence="false">
              <recipient channel="aggregatedProductChannel"/>
              <recipient channel="replyChannel"/>
          </recipient-list-router>
          <channel id="aggregatedProductChannel"/>
       
          <service-activator input-channel="replyChannel"
      					   ref="productValidator"
      					    method="validate"/>
          <logging-channel-adapter id="logger" level="DEBUG"/>
      
          <beans:bean id="productValidator" class="com.barnesandnoble.message.bus.ProductValidator"/>
      
          <channel id="productsChannel"/>
      
          <channel id="BOOKS">
              <queue capacity="500"/>
          </channel>
      
          <channel id="WORKS">
              <queue capacity="500"/>
          </channel>
      
          <channel id="MERCH">
              <queue capacity="500"/>
          </channel>
      
          <channel id="discardChannel">
              <queue capacity="200"/>
          </channel>
      
      	<poller id="poller" default="true" max-messages-per-poll="1" fixed-delay="1000"/>
      
          <channel id="booksOutputChannel">
              <interceptors>
                  <wire-tap channel="logger"/>
              </interceptors>
          </channel>
      
          <beans:bean id="booksAggregatorBean" class="com.barnesandnoble.message.bus.ProductAggregator"/>
      
          <aggregator id="booksAggregator"
                      input-channel="BOOKS"
                      output-channel="booksOutputChannel"
                      ref="booksAggregatorBean"
                      method="aggregateProduct">
          </aggregator>
      
          <beans:bean id="booksMessageStore" class="org.springframework.integration.store.SimpleMessageStore"/>
      
          <beans:bean id="jsonConverter" class="com.barnesandnoble.message.bus.ProductJSONConverter"/>
          <service-activator input-channel="booksOutputChannel"
                             output-channel="booksJsonOutput" ref="jsonConverter"
                             method="toJson"/>
      
          <stream:stdout-channel-adapter id="booksJsonOutput" append-newline="true"/>
      
          <channel id="worksOutputChannel">
               <interceptors>
                  <wire-tap channel="logger"/>
              </interceptors>
          </channel>
      
          <beans:bean id="worksAggregatorBean" class="com.barnesandnoble.message.bus.ProductAggregator"/>
      
          <aggregator id="worksAggregator"
                      input-channel="WORKS"
                      output-channel="worksOutputChannel"
                      ref="worksAggregatorBean"
                      method="aggregateProduct">
          </aggregator>
      
          <beans:bean id="worksMessageStore" class="org.springframework.integration.store.SimpleMessageStore"/>
         <service-activator input-channel="worksOutputChannel"
                             output-channel="worksJsonOutput" ref="jsonConverter"
                             method="toJson"/>
      
          <stream:stdout-channel-adapter id="worksJsonOutput" append-newline="true"/>
      
          <channel id="merchOutputChannel">
               <interceptors>
                  <wire-tap channel="logger"/>
              </interceptors>
          </channel>
      
          <beans:bean id="merchAggregatorBean" class="com.barnesandnoble.message.bus.ProductAggregator"/>
      
          <aggregator id="merchAggregator"
                      input-channel="MERCH"
                      output-channel="merchOutputChannel"
                      ref="merchAggregatorBean"
                      method="aggregateProduct">
          </aggregator>
      
          <beans:bean id="merchMessageStore" class="org.springframework.integration.store.SimpleMessageStore"/>
          <service-activator input-channel="merchOutputChannel"
                             output-channel="merchJsonOutput" ref="jsonConverter"
                             method="toJson"/>
      
           <stream:stdout-channel-adapter id="merchJsonOutput" append-newline="true"/>
      One routing class is defined as follows.

      Code:
      @MessageEndpoint
      public class ProductRouter {
          private static Logger logger = LoggerFactory.getLogger(ProductRouter.class);

          @Router(inputChannel="productsChannel")
          public String resolveProductChannel(Product product) {
              logger.debug("routing incoming channel products to " + product.getType() + " channel");
              return product.getType();
          }
      }
      The splitter is defined as follows.

      Code:
      @MessageEndpoint
      public class ProductSplitter {
          private static Logger logger = LoggerFactory.getLogger(ProductSplitter.class);

          @Splitter(inputChannel="aggregatedProductChannel", outputChannel="productsChannel")
          public List<Product> split(Products products) {
              logger.debug("Split Products object into " + products.getProducts().size() + " products");
              return products.getProducts();
          }
      }
      The product validator is

      Code:
      public class ProductValidator {
          private static final String SUCCESSFUL = "successful";
          private static final String FAILED = "failed";
      
          public String validate(Products products){
              if(products != null && products.getProducts() !=null && products.getProducts().size() > 0){
                  return SUCCESSFUL;
              }
      
              return FAILED;
          }
      }
      The client is defined as

      Code:
      	<gateway id="productGateway"
              service-interface="com.barnesandnoble.message.service.iface.ProductService"
              default-request-channel="requestChannel"/>
      
      	<ip:tcp-connection-factory id="client"
      		type="client"
      		host="localhost"
      		port="8888"
      		single-use="true"
      		so-timeout="10000"/>
      
      	<channel id="requestChannel" />
      
      	<ip:tcp-outbound-gateway id="outGateway"
      		request-channel="requestChannel"
      		reply-channel="replyChannel"
      		connection-factory="client"
      		request-timeout="10000"
      		reply-timeout="10000"/>
      
          <channel id="replyChannel" />
      
       	<transformer id="serverBytes2String"
      		input-channel="requestChannel"
      		expression="new String(payload)"/>



      • #4
        Here is the log output:

        2011-02-08 21:54:52,341 (pool-1-thread-1) [TcpNetServerConnectionFactory.run]
        INFO - Listening on port 8888
        2011-02-08 21:54:52,344 (WrapperListener_start_runner) [PollingConsumer.start]
        INFO - started booksAggregator
        2011-02-08 21:54:52,345 (WrapperListener_start_runner) [PollingConsumer.start]
        INFO - started worksAggregator
        2011-02-08 21:54:52,346 (WrapperListener_start_runner) [PollingConsumer.start]
        INFO - started merchAggregator
        2011-02-08 21:54:52,434 (WrapperListener_start_runner) [MessageBusService.start]
        INFO - Message server started successfully
        2011-02-08 21:55:22,641 (pool-1-thread-1) [TcpNetServerConnectionFactory.run]
        DEBUG - Accepted connection from 127.0.0.1
        2011-02-08 21:55:22,646 (pool-1-thread-2) [TcpNetConnection.run]
        DEBUG - Reading...
        2011-02-08 21:55:22,646 (pool-1-thread-2) [ByteArrayCrLfSerializer.deserialize]
        DEBUG - Available to read:0
        2011-02-08 21:55:22,654 (pool-1-thread-2) [TcpNetConnection.run]
        DEBUG - Message received [Payload=[B@5288d319][Headers={timestamp=1297220122653, id=4f38184a-8d47-4449-ac32-d80d0d83847c, ip_address=127.0.0.1, ip_connection_seq=1, ip_hostname=localhost.localdomain, ip_tcp_remote_port=46056, ip_connection_id=localhost.localdomain:46056:948129019}]
        2011-02-08 21:55:22,655 (pool-1-thread-2) [EventDrivenConsumer.start]
        INFO - started org.springframework.integration.endpoint.EventDrivenConsumer@690bc995
        2011-02-08 21:55:22,656 (pool-1-thread-2) [DirectChannel.preSend]
        DEBUG - preSend on channel 'requestChannel', message: [Payload=[B@5288d319][Headers={timestamp=1297220122656, id=96a879a3-a26e-448d-84e8-70000344a7de, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_address=127.0.0.1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_hostname=localhost.localdomain, ip_connection_seq=1, ip_tcp_remote_port=46056, ip_connection_id=localhost.localdomain:46056:948129019}]
        2011-02-08 21:55:22,657 (pool-1-thread-2) [MessageHandlerChain.handleMessage]
        DEBUG - org.springframework.integration.handler.MessageHandlerChain#0 received message: [Payload=[B@5288d319][Headers={timestamp=1297220122656, id=96a879a3-a26e-448d-84e8-70000344a7de, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_address=127.0.0.1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_hostname=localhost.localdomain, ip_connection_seq=1, ip_tcp_remote_port=46056, ip_connection_id=localhost.localdomain:46056:948129019}]
        2011-02-08 21:55:22,658 (pool-1-thread-2) [MessageTransformingHandler.handleMessage]
        DEBUG - org.springframework.integration.transformer.MessageTransformingHandler@451415c8 received message: [Payload=[B@5288d319][Headers={timestamp=1297220122656, id=96a879a3-a26e-448d-84e8-70000344a7de, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_address=127.0.0.1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_hostname=localhost.localdomain, ip_connection_seq=1, ip_tcp_remote_port=46056, ip_connection_id=localhost.localdomain:46056:948129019}]
        2011-02-08 21:55:22,668 (pool-1-thread-2) [MessageTransformingHandler.sendReplyMessage]
        DEBUG - handler 'org.springframework.integration.transformer.MessageTransformingHandler@451415c8' sending reply Message: [Payload={"products":[{"type":"BOOKS","ean":"2000003602803"},{"type":"MERCH","ean":"2000003602888"},{"type":"WORKS","ean":"2000003602999"}]}][Headers={timestamp=1297220122668, id=2c9792d0-5ede-47ae-b85f-2aa4392255ac, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_address=127.0.0.1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_connection_seq=1, ip_hostname=localhost.localdomain, ip_tcp_remote_port=46056, ip_connection_id=localhost.localdomain:46056:948129019}]
        2011-02-08 21:55:22,669 (pool-1-thread-2) [MessageTransformingHandler.handleMessage]
        DEBUG - org.springframework.integration.transformer.MessageTransformingHandler@29a220e6 received message: [Payload={"products":[{"type":"BOOKS","ean":"2000003602803"},{"type":"MERCH","ean":"2000003602888"},{"type":"WORKS","ean":"2000003602999"}]}][Headers={timestamp=1297220122668, id=2c9792d0-5ede-47ae-b85f-2aa4392255ac, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_address=127.0.0.1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_connection_seq=1, ip_hostname=localhost.localdomain, ip_tcp_remote_port=46056, ip_connection_id=localhost.localdomain:46056:948129019}]
        2011-02-08 21:55:22,716 (pool-1-thread-2) [ProductTransformer.transform]
        DEBUG - Converted json {"products":[{"type":"BOOKS","ean":"2000003602803"},{"type":"MERCH","ean":"2000003602888"},{"type":"WORKS","ean":"2000003602999"}]} to Products object successfully
        2011-02-08 21:55:22,716 (pool-1-thread-2) [MessageTransformingHandler.sendReplyMessage]
        DEBUG - handler 'org.springframework.integration.transformer.MessageTransformingHandler@29a220e6' sending reply Message: [Payload=Products{Product{ean='2000003602803', type='BOOKS'} Product{ean='2000003602888', type='MERCH'} Product{ean='2000003602999', type='WORKS'} }][Headers={timestamp=1297220122716, id=31a484d8-e986-4356-adda-eaa2d242ea23, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_address=127.0.0.1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_hostname=localhost.localdomain, ip_connection_seq=1, ip_tcp_remote_port=46056, ip_connection_id=localhost.localdomain:46056:948129019}]
        2011-02-08 21:55:22,717 (pool-1-thread-2) [DirectChannel.preSend]
        DEBUG - preSend on channel 'jsonProductsChannel', message: [Payload=Products{Product{ean='2000003602803', type='BOOKS'} Product{ean='2000003602888', type='MERCH'} Product{ean='2000003602999', type='WORKS'} }][Headers={timestamp=1297220122716, id=31a484d8-e986-4356-adda-eaa2d242ea23, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_address=127.0.0.1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_hostname=localhost.localdomain, ip_connection_seq=1, ip_tcp_remote_port=46056, ip_connection_id=localhost.localdomain:46056:948129019}]
        2011-02-08 21:55:22,717 (pool-1-thread-2) [RecipientListRouter.handleMessage]
        DEBUG - org.springframework.integration.router.RecipientListRouter#0 received message: [Payload=Products{Product{ean='2000003602803', type='BOOKS'} Product{ean='2000003602888', type='MERCH'} Product{ean='2000003602999', type='WORKS'} }][Headers={timestamp=1297220122716, id=31a484d8-e986-4356-adda-eaa2d242ea23, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_address=127.0.0.1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_hostname=localhost.localdomain, ip_connection_seq=1, ip_tcp_remote_port=46056, ip_connection_id=localhost.localdomain:46056:948129019}]
        2011-02-08 21:55:22,718 (pool-1-thread-2) [DirectChannel.preSend]
        DEBUG - preSend on channel 'aggregatedProductChannel', message: [Payload=Products{Product{ean='2000003602803', type='BOOKS'} Product{ean='2000003602888', type='MERCH'} Product{ean='2000003602999', type='WORKS'} }][Headers={timestamp=1297220122716, id=31a484d8-e986-4356-adda-eaa2d242ea23, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_address=127.0.0.1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_hostname=localhost.localdomain, ip_connection_seq=1, ip_tcp_remote_port=46056, ip_connection_id=localhost.localdomain:46056:948129019}]
        2011-02-08 21:55:22,718 (pool-1-thread-2) [MethodInvokingSplitter.handleMessage]



        • #5
          More log output:

          DEBUG - _initHandlerFor_productSplitter received message: [Payload=Products{Product{ean='2000003602803', type='BOOKS'} Product{ean='2000003602888', type='MERCH'} Product{ean='2000003602999', type='WORKS'} }][Headers={timestamp=1297220122716, id=31a484d8-e986-4356-adda-eaa2d242ea23, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_address=127.0.0.1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_hostname=localhost.localdomain, ip_connection_seq=1, ip_tcp_remote_port=46056, ip_connection_id=localhost.localdomain:46056:948129019}]
          2011-02-08 21:55:22,719 (pool-1-thread-2) [ProductSplitter.split]
          DEBUG - Split Products object into 3 products
          2011-02-08 21:55:22,719 (pool-1-thread-2) [MethodInvokingSplitter.sendReplyMessage]
          DEBUG - handler '_initHandlerFor_productSplitter' sending reply Message: [Payload=Product{ean='2000003602803', type='BOOKS'}][Headers={timestamp=1297220122719, id=221d4691-fb89-4777-a322-ec9f0d8c2889, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_address=127.0.0.1, correlationId=31a484d8-e986-4356-adda-eaa2d242ea23, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, sequenceSize=3, ip_connection_seq=1, ip_hostname=localhost.localdomain, ip_tcp_remote_port=46056, sequenceNumber=1, ip_connection_id=localhost.localdomain:46056:948129019}]
          2011-02-08 21:55:22,720 (pool-1-thread-2) [DirectChannel.preSend]
          DEBUG - preSend on channel 'productsChannel', message: [Payload=Product{ean='2000003602803', type='BOOKS'}][Headers={timestamp=1297220122719, id=221d4691-fb89-4777-a322-ec9f0d8c2889, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_address=127.0.0.1, correlationId=31a484d8-e986-4356-adda-eaa2d242ea23, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, sequenceSize=3, ip_connection_seq=1, ip_hostname=localhost.localdomain, ip_tcp_remote_port=46056, sequenceNumber=1, ip_connection_id=localhost.localdomain:46056:948129019}]
          2011-02-08 21:55:22,720 (pool-1-thread-2) [MethodInvokingRouter.handleMessage]
          DEBUG - _initHandlerFor_productRouter received message: [Payload=Product{ean='2000003602803', type='BOOKS'}][Headers={timestamp=1297220122719, id=221d4691-fb89-4777-a322-ec9f0d8c2889, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_address=127.0.0.1, correlationId=31a484d8-e986-4356-adda-eaa2d242ea23, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, sequenceSize=3, ip_connection_seq=1, ip_hostname=localhost.localdomain, ip_tcp_remote_port=46056, sequenceNumber=1, ip_connection_id=localhost.localdomain:46056:948129019}]
          2011-02-08 21:55:22,721 (pool-1-thread-2) [ProductRouter.resolveProductChannel]
          DEBUG - routing incoming channel products to BOOKS channel
          2011-02-08 21:55:22,722 (pool-1-thread-2) [QueueChannel.preSend]
          DEBUG - preSend on channel 'BOOKS', message: [Payload=Product{ean='2000003602803', type='BOOKS'}][Headers={timestamp=1297220122719, id=221d4691-fb89-4777-a322-ec9f0d8c2889, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_address=127.0.0.1, correlationId=31a484d8-e986-4356-adda-eaa2d242ea23, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, sequenceSize=3, ip_connection_seq=1, ip_hostname=localhost.localdomain, ip_tcp_remote_port=46056, sequenceNumber=1, ip_connection_id=localhost.localdomain:46056:948129019}]
          2011-02-08 21:55:22,722 (pool-1-thread-2) [QueueChannel.postSend]
          DEBUG - postSend (sent=true) on channel 'BOOKS', message: [Payload=Product{ean='2000003602803', type='BOOKS'}][Headers={timestamp=1297220122719, id=221d4691-fb89-4777-a322-ec9f0d8c2889, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_address=127.0.0.1, correlationId=31a484d8-e986-4356-adda-eaa2d242ea23, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, sequenceSize=3, ip_connection_seq=1, ip_hostname=localhost.localdomain, ip_tcp_remote_port=46056, sequenceNumber=1, ip_connection_id=localhost.localdomain:46056:948129019}]
          2011-02-08 21:55:22,722 (task-scheduler-10) [QueueChannel.postReceive]
          DEBUG - postReceive on channel 'BOOKS', message: [Payload=Product{ean='2000003602803', type='BOOKS'}][Headers={timestamp=1297220122719, id=221d4691-fb89-4777-a322-ec9f0d8c2889, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_address=127.0.0.1, correlationId=31a484d8-e986-4356-adda-eaa2d242ea23, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, sequenceSize=3, ip_connection_seq=1, ip_hostname=localhost.localdomain, ip_tcp_remote_port=46056, sequenceNumber=1, ip_connection_id=localhost.localdomain:46056:948129019}]
          2011-02-08 21:55:22,723 (pool-1-thread-2) [DirectChannel.postSend]
          DEBUG - postSend (sent=true) on channel 'productsChannel', message: [Payload=Product{ean='2000003602803', type='BOOKS'}][Headers={timestamp=1297220122719, id=221d4691-fb89-4777-a322-ec9f0d8c2889, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_address=127.0.0.1, correlationId=31a484d8-e986-4356-adda-eaa2d242ea23, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, sequenceSize=3, ip_connection_seq=1, ip_hostname=localhost.localdomain, ip_tcp_remote_port=46056, sequenceNumber=1, ip_connection_id=localhost.localdomain:46056:948129019}]
          2011-02-08 21:55:22,724 (task-scheduler-10) [CorrelatingMessageHandler.handleMessage]
          DEBUG - org.springframework.integration.aggregator.CorrelatingMessageHandler#0 received message: [Payload=Product{ean='2000003602803', type='BOOKS'}][Headers={timestamp=1297220122719, id=221d4691-fb89-4777-a322-ec9f0d8c2889, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_address=127.0.0.1, correlationId=31a484d8-e986-4356-adda-eaa2d242ea23, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, sequenceSize=3, ip_connection_seq=1, ip_hostname=localhost.localdomain, ip_tcp_remote_port=46056, sequenceNumber=1, ip_connection_id=localhost.localdomain:46056:948129019}]



          • #6
            I skipped some parts; here is more:

            2011-02-08 21:55:22,747 (pool-1-thread-2) [DirectChannel.postSend]
            DEBUG - postSend (sent=true) on channel 'requestChannel', message: [Payload=[B@5288d319][Headers={timestamp=1297220122656, id=96a879a3-a26e-448d-84e8-70000344a7de, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_address=127.0.0.1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@41e8fa70, ip_hostname=localhost.localdomain, ip_connection_seq=1, ip_tcp_remote_port=46056, ip_connection_id=localhost.localdomain:46056:948129019}]
            2011-02-08 21:55:22,748 (pool-1-thread-2) [TcpNetConnection.send]
            DEBUG - Message sent [Payload=successful][Headers={timestamp=1297220122748, id=43964be2-0b63-4106-b3d2-beefb2077c0e, ip_address=127.0.0.1, ip_hostname=localhost.localdomain, ip_connection_seq=1, ip_tcp_remote_port=46056, ip_connection_id=localhost.localdomain:46056:948129019}]
            2011-02-08 21:55:22,749 (pool-1-thread-2) [ByteArrayCrLfSerializer.deserialize]
            DEBUG - Available to read:2
            2011-02-08 21:55:22,749 (pool-1-thread-2) [TcpNetConnection.run]
            DEBUG - Message received [Payload=[B@156a9424][Headers={timestamp=1297220122749, id=89f447a1-9f8f-4226-8aa5-d4d27bdcd509, ip_address=127.0.0.1, ip_connection_seq=2, ip_hostname=localhost.localdomain, ip_tcp_remote_port=46056, ip_connection_id=localhost.localdomain:46056:948129019}]
            2011-02-08 21:55:22,749 (pool-1-thread-2) [DirectChannel.preSend]
            DEBUG - preSend on channel 'requestChannel', message: [Payload=[B@156a9424][Headers={timestamp=1297220122749, id=24293e02-6627-4dce-add8-135014c22eb5, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@823d278, ip_address=127.0.0.1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@823d278, ip_hostname=localhost.localdomain, ip_connection_seq=2, ip_tcp_remote_port=46056, ip_connection_id=localhost.localdomain:46056:948129019}]
            2011-02-08 21:55:22,750 (pool-1-thread-2) [MessageHandlerChain.handleMessage]
            DEBUG - org.springframework.integration.handler.MessageHandlerChain#0 received message: [Payload=[B@156a9424][Headers={timestamp=1297220122749, id=24293e02-6627-4dce-add8-135014c22eb5, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@823d278, ip_address=127.0.0.1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@823d278, ip_hostname=localhost.localdomain, ip_connection_seq=2, ip_tcp_remote_port=46056, ip_connection_id=localhost.localdomain:46056:948129019}]
            2011-02-08 21:55:22,750 (pool-1-thread-2) [MessageTransformingHandler.handleMessage]
            DEBUG - org.springframework.integration.transformer.MessageTransformingHandler@451415c8 received message: [Payload=[B@156a9424][Headers={timestamp=1297220122749, id=24293e02-6627-4dce-add8-135014c22eb5, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@823d278, ip_address=127.0.0.1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@823d278, ip_hostname=localhost.localdomain, ip_connection_seq=2, ip_tcp_remote_port=46056, ip_connection_id=localhost.localdomain:46056:948129019}]
            2011-02-08 21:55:22,751 (pool-1-thread-2) [MessageTransformingHandler.sendReplyMessage]
            DEBUG - handler 'org.springframework.integration.transformer.MessageTransformingHandler@451415c8' sending reply Message: [Payload=][Headers={timestamp=1297220122751, id=a94bb621-7113-4eb9-b35d-5419f7cc4f3a, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@823d278, ip_address=127.0.0.1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@823d278, ip_connection_seq=2, ip_hostname=localhost.localdomain, ip_tcp_remote_port=46056, ip_connection_id=localhost.localdomain:46056:948129019}]
            2011-02-08 21:55:22,751 (pool-1-thread-2) [MessageTransformingHandler.handleMessage]
            DEBUG - org.springframework.integration.transformer.Messag eTransformingHandler@29a220e6 received message: [Payload=][Headers={timestamp=1297220122751, id=a94bb621-7113-4eb9-b35d-5419f7cc4f3a, errorChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@823d278, ip_address=127.0.0.1, replyChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@823d278, ip_connection_seq=2, ip_hostname=localhost.localdomain, ip_tcp_remote_port=46056, ip_connection_id=localhost.localdomain:46056:94812 9019}]
            2011-02-08 21:55:22,751 (pool-1-thread-2) [ProductTransformer.transform]
            ERROR - Error converting json to Products object, No content to map to Object due to end of input
            2011-02-08 21:55:22,753 (pool-1-thread-2) [ProductTransformer.transform]
            ERROR -
            No content to map to Object due to end of input
            org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2173)
            org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2125)
            org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1395)
            com.barnesandnoble.message.bus.ProductTransformer.transform(ProductTransformer.java:33)
            sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
            sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
            java.lang.reflect.Method.invoke(Method.java:597)
            org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:69)
            org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:83)
            org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:57)
            org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:102)
            org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:102)
            org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:114)
            org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:223)
            org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:123)
            org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:73)
            org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:56)
            org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:67)
            org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:98)
            org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
            org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:150)
            org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
            org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
            org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:176)



            • #7
              Sorry for the long message. More to come:

              org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:160)
              org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:125)
              org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:119)
              org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:101)
              org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
              org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:133)
              org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
              org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
              org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
              org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:44)
              org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
              org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
              org.springframework.integration.core.MessagingTemplate.doSendAndReceive(MessagingTemplate.java:318)
              org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:239)
              org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:234)
              org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:208)
              org.springframework.integration.ip.tcp.TcpInboundGateway.onMessage(TcpInboundGateway.java:48)
              org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:136)
              java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
              java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
              java.lang.Thread.run(Thread.java:662)

              2011-02-08 21:55:22,754 (pool-1-thread-2) [TcpInboundGateway.doSendAndReceive]
              WARN - failure occurred in gateway sendAndReceive
              org.springframework.integration.MessageHandlingException: handler 'org.springframework.integration.transformer.MessageTransformingHandler@29a220e6' requires a reply, but no reply was received
              at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:104)
              at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
              at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:150)
              at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
              at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
              at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:176)
              at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:160)
              at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:125)
              at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:119)
              at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:101)
              at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
              at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:133)
              at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
              at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
              at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
              at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:44)
              at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
              at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
              at org.springframework.integration.core.MessagingTemplate.doSendAndReceive(MessagingTemplate.java:318)
              at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:239)
              at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:234)
              at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:208)
              at org.springframework.integration.ip.tcp.TcpInboundGateway.onMessage(TcpInboundGateway.java:48)
              at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:136)
              at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
              at java.lang.Thread.run(Thread.java:662)
              2011-02-08 21:55:22,755 (pool-1-thread-2) [DirectChannel.preSend]
              DEBUG - preSend on channel 'errorChannel', message: [Payload=org.springframework.integration.MessageHandlingException: handler 'org.springframework.integration.transformer.MessageTransformingHandler@29a220e6' requires a reply, but no reply was received][Headers={timestamp=1297220122755, id=a278a5d8-d8cf-4bff-bd8c-eb87f7e28806, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@6af2da21, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@6af2da21}]
              2011-02-08 21:55:22,756 (pool-1-thread-2) [MessageTransformingHandler.handleMessage]
              DEBUG - org.springframework.integration.transformer.MessageTransformingHandler@23042fcc received message: [Payload=org.springframework.integration.MessageHandlingException: handler 'org.springframework.integration.transformer.MessageTransformingHandler@29a220e6' requires a reply, but no reply was received][Headers={timestamp=1297220122755, id=a278a5d8-d8cf-4bff-bd8c-eb87f7e28806, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@6af2da21, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@6af2da21}]
              2011-02-08 21:55:22,760 (pool-1-thread-2) [TcpNetConnection.run]



              • #8
                Here is the last part of the log message.

                ERROR - Exception sending meeeage: [Payload=[B@156a9424][Headers={timestamp=1297220122749, id=89f447a1-9f8f-4226-8aa5-d4d27bdcd509, ip_address=127.0.0.1, ip_connection_seq=2, ip_hostname=localhost.localdomain, ip_tcp_remote_port=46056, ip_connection_id=localhost.localdomain:46056:948129019}]
                org.springframework.integration.MessagingException: failure occurred in error-handling flow
                at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:253)
                at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:208)
                at org.springframework.integration.ip.tcp.TcpInboundGateway.onMessage(TcpInboundGateway.java:48)
                at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:136)
                at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
                at java.lang.Thread.run(Thread.java:662)
                Caused by: org.springframework.integration.transformer.MessageTransformationException: org.springframework.integration.MessageHandlingException: Expression evaluation failed: payload.failedMessage.payload + ':' + payload.cause.message
                at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:73)
                at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:98)
                at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
                at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
                at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
                at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:44)
                at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
                at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
                at org.springframework.integration.core.MessagingTemplate.doSendAndReceive(MessagingTemplate.java:318)
                at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:239)
                at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:250)
                ... 6 more
                Caused by: org.springframework.integration.MessageHandlingException: Expression evaluation failed: payload.failedMessage.payload + ':' + payload.cause.message
                at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:84)
                at org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor.processMessage(ExpressionEvaluatingMessageProcessor.java:67)
                at org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:56)
                at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:67)
                ... 16 more
                Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1007E:(pos 52): Field or property 'message' cannot be found on null
                at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:204)
                at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:71)
                at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:57)
                at org.springframework.expression.spel.ast.OpPlus.getValueInternal(OpPlus.java:63)
                at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:102)
                at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:102)
                at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:114)
                at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:80)
                ... 19 more
                2011-02-08 21:55:22,761 (pool-1-thread-2) [ByteArrayCrLfSerializer.deserialize]
                DEBUG - Available to read:0
                2011-02-08 21:55:22,761 (pool-1-thread-2) [TcpNetConnection.run]
                ERROR - Read exception localhost.localdomain:46056:948129019 SocketException:null:Connection reset

                The client received the first reply, "successful", but then an empty message was passed to the request channel and caused the error. Any hints?

                Thanks,

                John



                • #9
                  Here is the client log message

                  INFO : org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase -2147483648
                  DEBUG: org.springframework.context.support.DefaultLifecycleProcessor - Starting bean 'outGateway' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
                  INFO : org.springframework.integration.endpoint.EventDrivenConsumer - started outGateway
                  DEBUG: org.springframework.context.support.DefaultLifecycleProcessor - Successfully started bean 'outGateway'
                  DEBUG: org.springframework.context.support.DefaultLifecycleProcessor - Starting bean 'serverBytes2String' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
                  INFO : org.springframework.integration.endpoint.EventDrivenConsumer - started serverBytes2String
                  DEBUG: org.springframework.context.support.DefaultLifecycleProcessor - Successfully started bean 'serverBytes2String'
                  DEBUG: org.springframework.context.support.DefaultLifecycleProcessor - Starting bean '_org.springframework.integration.errorLogger' of type [class org.springframework.integration.endpoint.EventDrivenConsumer]
                  INFO : org.springframework.integration.endpoint.EventDrivenConsumer - started _org.springframework.integration.errorLogger
                  DEBUG: org.springframework.context.support.DefaultLifecycleProcessor - Successfully started bean '_org.springframework.integration.errorLogger'
                  INFO : org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase 0
                  DEBUG: org.springframework.context.support.DefaultLifecycleProcessor - Starting bean 'client' of type [class org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory]
                  DEBUG: org.springframework.context.support.DefaultLifecycleProcessor - Successfully started bean 'client'
                  DEBUG: org.springframework.beans.factory.support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'productGateway'
                  DEBUG: com.barnesandnoble.message.client.ProductClient - Processing 3 products
                  DEBUG: com.barnesandnoble.message.ProductTransformer - Converted products: {"products":[{"type":"BOOKS","ean":"2000003602803"},{"type":"MERCH","ean":"2000003602888"},{"type":"WORKS","ean":"2000003602999"}]}
                  DEBUG: com.barnesandnoble.message.client.ProductClient - Send request: {"products":[{"type":"BOOKS","ean":"2000003602803"},{"type":"MERCH","ean":"2000003602888"},{"type":"WORKS","ean":"2000003602999"}]}

                  DEBUG: org.springframework.integration.channel.DirectChannel - preSend on channel 'requestChannel', message: [Payload={"products":[{"type":"BOOKS","ean":"2000003602803"},{"type":"MERCH","ean":"2000003602888"},{"type":"WORKS","ean":"2000003602999"}]}
                  ][Headers={timestamp=1297220122633, id=f3f3ffbc-3f2f-401d-8c58-9228c0bc1ccb, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@46e06703, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@46e06703}]
                  DEBUG: org.springframework.integration.ip.tcp.TcpOutboundGateway - org.springframework.integration.ip.tcp.TcpOutboundGateway#0 received message: [Payload={"products":[{"type":"BOOKS","ean":"2000003602803"},{"type":"MERCH","ean":"2000003602888"},{"type":"WORKS","ean":"2000003602999"}]}
                  ][Headers={timestamp=1297220122633, id=f3f3ffbc-3f2f-401d-8c58-9228c0bc1ccb, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@46e06703, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@46e06703}]
                  DEBUG: org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory - Opening new socket connection to localhost:8888
                  DEBUG: org.springframework.integration.ip.tcp.connection.TcpNetConnection - Reading...
                  DEBUG: org.springframework.integration.ip.tcp.TcpOutboundGateway - Added localhost:8888:545732387
                  DEBUG: org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer - Available to read:0
                  DEBUG: org.springframework.integration.ip.tcp.connection.TcpNetConnection - Message sent [Payload={"products":[{"type":"BOOKS","ean":"2000003602803"},{"type":"MERCH","ean":"2000003602888"},{"type":"WORKS","ean":"2000003602999"}]}
                  ][Headers={timestamp=1297220122633, id=f3f3ffbc-3f2f-401d-8c58-9228c0bc1ccb, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@46e06703, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@46e06703}]
                  DEBUG: org.springframework.integration.ip.tcp.connection.TcpNetConnection - Message received [Payload=[B@317cfd38][Headers={timestamp=1297220122749, id=a16b6dba-9b54-418c-b888-93371a1e7a56, ip_address=127.0.0.1, ip_connection_seq=1, ip_hostname=localhost, ip_tcp_remote_port=8888, ip_connection_id=localhost:8888:545732387}]
                  DEBUG: org.springframework.integration.ip.tcp.connection.TcpNetConnection - Closing single use socket after inbound message localhost:8888:545732387
                  DEBUG: org.springframework.integration.ip.tcp.TcpOutboundGateway - Respose [Payload=[B@317cfd38][Headers={timestamp=1297220122749, id=a16b6dba-9b54-418c-b888-93371a1e7a56, ip_address=127.0.0.1, ip_connection_seq=1, ip_hostname=localhost, ip_tcp_remote_port=8888, ip_connection_id=localhost:8888:545732387}]
                  DEBUG: org.springframework.integration.ip.tcp.TcpOutboundGateway - handler 'org.springframework.integration.ip.tcp.TcpOutboundGateway#0' sending reply Message: [Payload=[B@317cfd38][Headers={timestamp=1297220122751, id=68840006-b279-42a0-a4cb-9bbfb4c7a982, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@46e06703, ip_address=127.0.0.1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@46e06703, ip_hostname=localhost, ip_connection_seq=1, ip_tcp_remote_port=8888, ip_connection_id=localhost:8888:545732387}]
                  DEBUG: org.springframework.integration.channel.DirectChannel - preSend on channel 'replyChannel', message: [Payload=[B@317cfd38][Headers={timestamp=1297220122751, id=68840006-b279-42a0-a4cb-9bbfb4c7a982, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@46e06703, ip_address=127.0.0.1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@46e06703, ip_hostname=localhost, ip_connection_seq=1, ip_tcp_remote_port=8888, ip_connection_id=localhost:8888:545732387}]
                  DEBUG: org.springframework.integration.transformer.MessageTransformingHandler - org.springframework.integration.transformer.MessageTransformingHandler@2e8aeed0 received message: [Payload={"products":[{"type":"BOOKS","ean":"2000003602803"},{"type":"MERCH","ean":"2000003602888"},{"type":"WORKS","ean":"2000003602999"}]}
                  ][Headers={timestamp=1297220122633, id=f3f3ffbc-3f2f-401d-8c58-9228c0bc1ccb, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@46e06703, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@46e06703}]
                  DEBUG: org.springframework.integration.transformer.MessageTransformingHandler - handler 'org.springframework.integration.transformer.MessageTransformingHandler@2e8aeed0' sending reply Message: [Payload={"products":[{"type":"BOOKS","ean":"2000003602803"},{"type":"MERCH","ean":"2000003602888"},{"type":"WORKS","ean":"2000003602999"}]}
                  ][Headers={timestamp=1297220122761, id=0d0cad9a-9f25-4866-8310-639e7b87af8a, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@46e06703, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@46e06703}]
                  DEBUG: org.springframework.integration.channel.DirectChannel - postSend (sent=true) on channel 'requestChannel', message: [Payload={"products":[{"type":"BOOKS","ean":"2000003602803"},{"type":"MERCH","ean":"2000003602888"},{"type":"WORKS","ean":"2000003602999"}]}
                  ][Headers={timestamp=1297220122633, id=f3f3ffbc-3f2f-401d-8c58-9228c0bc1ccb, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@46e06703, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@46e06703}]
                  DEBUG: org.springframework.integration.gateway.GatewayProxyFactoryBean - Unable to attempt conversion of Message payload types. Component 'productGateway' has no explicit ConversionService reference, and there is no 'integrationConversionService' bean within the context.
                  DEBUG: com.barnesandnoble.message.client.ProductClient - Received response: {"products":[{"type":"BOOKS","ean":"2000003602803"},{"type":"MERCH","ean":"2000003602888"},{"type":"WORKS","ean":"2000003602999"}]}



                  • #10
                    On closer inspection, the client did not receive the "successful" reply message; it received the product JSON message instead. This is very confusing behavior.



                    • #11
                      I haven't fully dissected your flow, but transformers have to return a result:

                      Code:
                      2011-02-08 21:55:22,751 (pool-1-thread-2) [ProductTransformer.transform]
                      ERROR - Error converting json to Products object, No content to map to Object due to end of input
                      2011-02-08 21:55:22,753 (pool-1-thread-2) [ProductTransformer.transform]
                      ERROR -
                      No content to map to Object due to end of input
                      
                      ...
                      
                      org.springframework.integration.MessageHandlingException: handler 'org.springframework.integration.transformer.MessageTransformingHandler@29a220e6' requires a reply, but no reply was received
                      
                      ...
                      That's why you are getting a 'requires a reply' error and why we never send the message to the second recipient. Again, running it on a separate thread will make that process independent of your first leg.

                      Also, you need to add a <header-filter/> to remove the replyChannel from the first leg so we don't try to send any reply created by that leg back to the tcp gateway.

                      In future, please attach logs in a zip; thanks.



                      • #12
                        Sorry. I didn't know that I could attach a zip file here.

                        The weird thing is that the transformer received an empty message, even though the client sent only one message with 3 products. It should never be empty.

                        Thanks,

                        John



                        • #13
                          Do you mean that whether a reply is sent back is controlled by header metadata? Are other behaviors controlled by header metadata as well?

                          I changed the router to a publish-subscribe-channel, but got the same result.

                          <publish-subscribe-channel id="productPublishChannel"/>

                          <service-activator input-channel="productPublishChannel" output-channel="replyChannel"
                          ref="productValidator" method="validate"/>

                          <beans:bean id="productSplitter" class="com.barnesandnoble.message.bus.ProductSplitter"/>

                          <service-activator input-channel="productPublishChannel" output-channel="productsChannel"
                          ref="productSplitter" method="split"/>
                          I saw publish-subscribe-channel comes with a task-executor attribute, but could not find an example for it.

                          <publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>
                          Is the task-executor a thread or a thread pool? If it is a thread pool, is it used only for the current (single) TCP request thread, or is it shared by all TCP request threads?

                          Also, does the server TCP connection factory create TCP request threads from a thread pool? If it does, how do I configure that thread pool?

                          Thanks,

                          John



                          • #14
                            Yes, when an inbound gateway (any gateway, not just TCP) puts a message on the request-channel, it includes a temporary reply channel in the replyChannel header. This channel is used to send a reply to the client; even if you explicitly send the reply to the reply-channel, that gets bridged to the temporary reply channel.

                            Since you are sending the message to two subflows, it only makes sense for one of those subflows to send a reply so you need to expunge the replyChannel header from one of the flows using a <header-filter/>.

                            The reason you are seeing the same result with a pub-sub with no task executor is again because the first leg is failing. You can set ignore-failures to true, or use a task-executor to run the reply on a separate thread, but in any case, you must remove the replyChannel header from the other flow.

                            You can choose whatever task executor you want; the easiest is to use the <task: /> namespace, e.g.

                            Code:
                            	<task:executor id="executor" pool-size="10"/>
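                            A minimal sketch of how the pieces above could fit together, reusing the channel and bean names from your post. The channel id "productsToSplitChannel" is hypothetical, and the sketch assumes the task namespace is declared with the prefix task: alongside your existing namespaces.

                            ```xml
                            <!-- Subscribers run on threads from the "executor" pool defined above,
                                 so a failure in one leg does not stop delivery to the other. -->
                            <publish-subscribe-channel id="productPublishChannel" task-executor="executor"/>

                            <!-- Leg 1: validates and replies; the replyChannel header is kept
                                 so the result is routed back to the tcp gateway. -->
                            <service-activator input-channel="productPublishChannel" output-channel="replyChannel"
                                               ref="productValidator" method="validate"/>

                            <!-- Leg 2: strip the replyChannel header first, so nothing produced
                                 by this leg is sent back to the tcp gateway. -->
                            <header-filter input-channel="productPublishChannel"
                                           output-channel="productsToSplitChannel"
                                           header-names="replyChannel"/>

                            <!-- Hypothetical intermediate channel between the filter and the splitter leg. -->
                            <channel id="productsToSplitChannel"/>

                            <service-activator input-channel="productsToSplitChannel" output-channel="productsChannel"
                                               ref="productSplitter" method="split"/>
                            ```

                            Which leg keeps the header is your choice; the point is only that exactly one of them should.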
                            With the tcp connection factory, you can specify an external task executor or, if you don't, we use a fixed thread pool task executor:

                            Code:
                            			if (this.taskExecutor == null) {
                            				this.taskExecutor = Executors.newFixedThreadPool(this.poolSize);
                            			}
                            Where the pool size defaults to 5.

                            This is all documented in Table 16.1. Connection Factory Attributes.
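                            As a rough sketch of the external executor option, based on the connection factory from your first post. The bean id "tcpExecutor" and the pool size of 10 are placeholders, and the task namespace is again assumed to be declared with the prefix task:.

                            ```xml
                            <!-- Hypothetical executor dedicated to the tcp connection factory. -->
                            <task:executor id="tcpExecutor" pool-size="10"/>

                            <!-- Same server factory as before, now using the external executor
                                 instead of the default fixed pool. -->
                            <ip:tcp-connection-factory id="crLfServer"
                                type="server"
                                port="8888"
                                task-executor="tcpExecutor"/>
                            ```

                            If you only want to resize the default pool, setting the pool-size attribute on the connection factory should be enough, with no external executor.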



                            • #15
                              Thanks for the reply. But that does not explain why the socket was read multiple times on the server side. The client sent the request only once.

                              2011-02-09 12:57:39,021 (pool-1-thread-1) [TcpNetServerConnectionFactory.run]
                              DEBUG - Accepted connection from 127.0.0.1
                              2011-02-09 12:57:39,025 (pool-1-thread-2) [TcpNetConnection.run]
                              DEBUG - Reading...
                              2011-02-09 12:57:39,025 (pool-1-thread-2) [ByteArrayCrLfSerializer.deserialize]
                              DEBUG - Available to read:0
                              2011-02-09 12:57:39,030 (pool-1-thread-2) [TcpNetConnection.run]
                              DEBUG - Message received [Payload=[B@6f03fcaa][Headers={timestamp=1297274259030, id=5481206f-897c-4887-bf74-45dc3f2d4363, ip_address=127.0.0.1, ip_connection_seq=1, ip_hostname=localhost.localdomain, ip_tcp_remote_port=57415, ip_connection_id=localhost.localdomain:57415:1774197673}]
                              ...
                              2011-02-09 12:57:39,112 (pool-1-thread-2) [ByteArrayCrLfSerializer.deserialize]
                              DEBUG - Available to read:2
                              2011-02-09 12:57:39,112 (pool-1-thread-2) [TcpNetConnection.run]
                              DEBUG - Message received [Payload=[B@53458dcb][Headers={timestamp=1297274259112, id=10f502df-6ac4-445c-8cad-9ce8bd87a47a, ip_address=127.0.0.1, ip_connection_seq=2, ip_hostname=localhost.localdomain, ip_tcp_remote_port=57415, ip_connection_id=localhost.localdomain:57415:1774197673}]
                              ...
                              2011-02-09 12:57:39,131 (pool-1-thread-2) [ByteArrayCrLfSerializer.deserialize]
                              DEBUG - Available to read:0
                              2011-02-09 12:57:39,132 (pool-1-thread-2) [TcpNetConnection.run]
                              ERROR - Read exception localhost.localdomain:57415:1774197673 SocketException:null:Connection reset
                              Does that mean the server will keep reading data from the socket and trying to process it even when the input is empty? That sounds weird to me, because I assumed the server would wait for a CRLF to mark the end of a message and would not process empty input. Assuming there is no exception, would the server keep processing empty input for as long as the request socket is open?

                              Thanks,

                              John
