Announcement Announcement Module
Collapse
No announcement yet.
how to do to send other message before close connection in single use client ? Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • how to do to send other message before close connection in single use client ?

    Hi, Is there any way to send other message ( different the original ) before tcpnetconnection run closeconnection?? I'm connecting to a linux service and I need a message after receiving response and befor close connection.

  • #2
    See http://static.springsource.org/sprin...p-interceptors

    You can add an interceptor and intercept the onMessage() method and send the data either before or after calling super.onMessage() (which will send the reply to the sender).

    When the intercepted method returns, the connection will be closed.

    Comment


    • #3
      hi Gary, I implemented a interceptor factory class and also implemented a interceptor with onMessage method ( based on AbstractTcpConnectionInterceptor) :


      public boolean onMessage(Message<?> message) {
      if (this.tcpListener == null) {
      throw new NoListenerException("No listener registered for message reception");
      }
      boolean b = this.tcpListener.onMessage(message);
      try {
      this.theConnection.send( MessageBuilder.withPayload("XXXXXX").build());
      } catch (Exception e) {
      log.debug(e);
      // TODO Auto-generated catch block
      throw new NoListenerException("ERROR");
      }
      return b;
      }


      XXXX is the message. The problem is sometimes appear in the log:
      16:33:06,072|ERROR||org.springframework.integratio n.ip.tcp.TcpOutboundGateway|Cannot correlate response - no pending reply
      or
      16:33:06,072|DEBUG||com.booleancore.ws.ConexionInt erceptor|java.net.SocketException: Broken pipe

      I used a tcp-connection-factory type client and tcp-outbound-gateway,

      like I said , I need to send a message with the gateway ( because i need the response ) but I need to send a message ( without response ) before close connection.

      Please help me, thks!

      Comment


      • #4
        I am not sure what's going on for you, but I just wrote this test case and all works as expected for me...

        Code:
        	@Test
        	public void testInterceptSendAck() throws Exception {
        		final int port = SocketUtils.findAvailableServerSocket();
        		AbstractConnectionFactory ccf = new TcpNetClientConnectionFactory("localhost", port);
        		final CountDownLatch latch = new CountDownLatch(1);
        		final AtomicBoolean done = new AtomicBoolean();
        		final AtomicReference<Object> ack = new AtomicReference<Object>();
        		final CountDownLatch ackLatch = new CountDownLatch(1);
        		Executors.newSingleThreadExecutor().execute(new Runnable() {
        			public void run() {
        				try {
        					ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(port, 100);
        					latch.countDown();
        					while (true) {
        						Socket socket = server.accept();
        						ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
        						ois.readObject();
        						ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
        						oos.writeObject("bar");
        						ois = new ObjectInputStream(socket.getInputStream());
        						ack.set(ois.readObject());
        						ackLatch.countDown();
        					}
        				} catch (Exception e) {
        					if (!done.get()) {
        						e.printStackTrace();
        					}
        				}
        			}
        		});
        		ccf.setSerializer(new DefaultSerializer());
        		ccf.setDeserializer(new DefaultDeserializer());
        		ccf.setSoTimeout(10000);
        		ccf.setSingleUse(true);
        		TcpConnectionInterceptorFactoryChain interceptorFactoryChain = new TcpConnectionInterceptorFactoryChain();
        		interceptorFactoryChain.setInterceptors(new TcpConnectionInterceptorFactory[] {new TcpConnectionInterceptorFactory() {
        			public TcpConnectionInterceptor getInterceptor() {
        				return new AbstractTcpConnectionInterceptor() {
        
        					@Override
        					public boolean onMessage(Message<?> message) {
        						boolean b = super.onMessage(message);
        						try {
        							this.send(MessageBuilder.withPayload("baz").build());
        						}
        						catch (Exception e) {
        							e.printStackTrace();
        						}
        						return b;
        					}
        				};
        			}
        		}});
        		ccf.setInterceptorFactoryChain(interceptorFactoryChain );
        		ccf.afterPropertiesSet();
        		ccf.start();
        		assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
        		TcpOutboundGateway gateway = new TcpOutboundGateway();
        		gateway.setConnectionFactory(ccf);
        		QueueChannel replyChannel = new QueueChannel();
        		gateway.setRequiresReply(true);
        		gateway.setOutputChannel(replyChannel);
        		gateway.afterPropertiesSet();
        		gateway.handleMessage(MessageBuilder.withPayload("foo").build());
        		assertTrue(ackLatch.await(10000, TimeUnit.MILLISECONDS));
        		Message<?> reply = replyChannel.receive(10000);
        		assertNotNull(reply);
        		assertEquals("bar", reply.getPayload());
        		assertEquals("baz", ack.get());
        	}
        Here's what I see in the log...

        Code:
        2012-12-17 07:37:30,542 [main] DEBUG: org.springframework.integration.ip.tcp.connection.TcpNetConnection - Message sent [Payload=foo][Headers={timestamp=1355747850530, id=cd45d182-0858-4901-ab40-2a3063adee04}]
        2012-12-17 07:37:30,546 [pool-2-thread-1] DEBUG: org.springframework.integration.ip.tcp.connection.TcpNetConnection - Message received [Payload=bar][Headers={timestamp=1355747850546, id=a08c45da-3817-4876-9975-6ffeb2f5cbf6, ip_tcp_remotePort=59336, ip_address=127.0.0.1, ip_hostname=localhost, ip_connectionId=localhost:59336:558c1d07-67f9-498c-9329-c3c1b3ec6135}]
        2012-12-17 07:37:30,547 [main] DEBUG: org.springframework.integration.ip.tcp.TcpOutboundGateway - Respose [Payload=bar][Headers={timestamp=1355747850546, id=a08c45da-3817-4876-9975-6ffeb2f5cbf6, ip_tcp_remotePort=59336, ip_address=127.0.0.1, ip_hostname=localhost, ip_connectionId=localhost:59336:558c1d07-67f9-498c-9329-c3c1b3ec6135}]
        2012-12-17 07:37:30,547 [pool-2-thread-1] DEBUG: org.springframework.integration.ip.tcp.connection.TcpNetConnection - Message sent [Payload=baz][Headers={timestamp=1355747850546, id=bb2047f3-13d3-414f-a2e8-25b974ee773c}]
        2012-12-17 07:37:30,547 [pool-2-thread-1] DEBUG: org.springframework.integration.ip.tcp.connection.TcpNetConnection - Closing single use socket after inbound message localhost:59336:558c1d07-67f9-498c-9329-c3c1b3ec6135
        I suggest you compare your log with this.

        I would also suggest you subclass AbstractTcpConnectionInterceptor rather than "basing" your interceptor on it.

        Comment


        • #5
          Thks Gary, your example is perfect! but the weird in my testing and production, this run good 1 time and the next time not, then run good again and then not, and go on... i dont know the reason. The time is bad in the log shows:

          |ERROR||org.springframework.integration.ip.tcp.Tcp OutboundGateway|Cannot correlate response - no pending reply

          when is good, dont show that. The linux service only return values under request, but the other message before close connection doesnt response ( and it's correct ) , but i dont understand why run ok one time and the next not.

          Comment


          • #6
            That's weird; it implies a reply is being received before the request was sent out.

            Can you post a DEBUG log showing a good and bad event?

            Comment


            • #7
              the correct is:

              Code:
              16:29:49,365|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory|Opening new socket connection to 192.168.1.2:1025
              16:29:49,374|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Reading...
              16:29:49,375|DEBUG||com.mylib.ws.ByteToStringSerializer|Bytes disponibles para lectura:0
              16:29:49,375|DEBUG||org.springframework.integration.ip.tcp.TcpOutboundGateway|Added 192.168.1.2:1025:5578e9c1-adb1-4651-9542-c81342439656
              16:29:49,376|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Message sent [Payload=XXXXX][Headers={timestamp=1355779789363, id=2703f41e-8542-4ebf-87f3-15b00d079485, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2d0d719, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2d0d719}]
              16:29:49,919|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Message received [Payload=[B@1bcbb330][Headers={timestamp=1355779789919, id=6622103b-53d4-4d73-9428-ebcf11ad4fb0, ip_tcp_remotePort=1025, ip_address=192.168.1.2, ip_hostname=192.168.1.2, ip_connectionId=192.168.1.2:1025:5578e9c1-adb1-4651-9542-c81342439656}]
              16:29:49,919|DEBUG||com.mylib.ConexionInterceptor|INIT
              16:29:49,919|DEBUG||com.mylib.ConexionInterceptor|BEFORE
              16:29:49,920|DEBUG||com.mylib.ConexionInterceptor|ENTER
              16:29:49,920|DEBUG||org.springframework.integration.ip.tcp.TcpOutboundGateway|Respose [Payload=[B@1bcbb330][Headers={timestamp=1355779789919, id=6622103b-53d4-4d73-9428-ebcf11ad4fb0, ip_tcp_remotePort=1025, ip_address=192.168.1.2, ip_hostname=192.168.1.2, ip_connectionId=192.168.1.2:1025:5578e9c1-adb1-4651-9542-c81342439656}]
              16:29:49,921|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Message sent [Payload=YYYY][Headers={timestamp=1355779789920, id=24ac770b-c398-46ad-9280-e2cee805ad24}]
              16:29:49,921|DEBUG||org.springframework.integration.ip.tcp.TcpOutboundGateway|handler 'org.springframework.integration.ip.tcp.TcpOutboundGateway#0' sending reply Message: [Payload=[B@1bcbb330][Headers={timestamp=1355779789921, id=40fa7230-ce9d-4da2-9752-a8194f9c1ec6, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2d0d719, ip_address=192.168.1.2, ip_tcp_remotePort=1025, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2d0d719, ip_hostname=192.168.1.2, ip_connectionId=192.168.1.2:1025:5578e9c1-adb1-4651-9542-c81342439656}]
              16:29:49,921|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Closing single use socket after inbound message 192.168.1.2:1025:5578e9c1-adb1-4651-9542-c81342439656
              16:29:49,921|DEBUG||org.springframework.beans.factory.support.DefaultListableBeanFactory|Returning cached instance of singleton bean 'integrationConversionService'
              16:29:49,922|DEBUG||org.springframework.integration.channel.QueueChannel|preSend on channel 'reply', message: [Payload=RXXXXXX][Headers={timestamp=1355779789922, id=eb6d2237-59b8-460b-9bca-d363dc5d8846, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2d0d719, ip_tcp_remotePort=1025, ip_address=192.168.1.2, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2d0d719, ip_hostname=192.168.1.2, ip_connectionId=10.10.183.114:1025:5578e9c1-adb1-4651-9542-c81342439656}]
              and the bad is:


              Code:
              16:29:52,904|DEBUG||org.springframework.integration.channel.DirectChannel|preSend on channel 'input', message: [Payload=XXXXX][Headers={timestamp=1355779792904, id=e90e1f6d-910b-462d-b566-e5f72b15bc7f, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338}]
              16:29:52,904|DEBUG||org.springframework.integration.ip.tcp.TcpSendingMessageHandler|org.springframework.integration.ip.tcp.TcpSendingMessageHandler#0 received message: [Payload=XXXXX][Headers={timestamp=1355779792904, id=e90e1f6d-910b-462d-b566-e5f72b15bc7f, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338}]
              16:29:52,904|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory|Opening new socket connection to 192.168.1.2:1025
              16:29:52,905|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Reading...
              16:29:52,905|DEBUG||org.springframework.integration.ip.tcp.TcpSendingMessageHandler|Got Connection 192.168.1.2:1025:b2a886c1-568f-48dd-b70d-209191d34bff
              16:29:52,905|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Message sent [Payload=XXXXX][Headers={timestamp=1355779792904, id=e90e1f6d-910b-462d-b566-e5f72b15bc7f, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338}]
              16:29:52,906|DEBUG||org.springframework.integration.channel.DirectChannel|postSend (sent=true) on channel 'input', message: [Payload=XXXXX][Headers={timestamp=1355779792904, id=e90e1f6d-910b-462d-b566-e5f72b15bc7f, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338}]
              16:29:52,952|TRACE||org.springframework.integration.channel.QueueChannel|postReceive on channel 'reply', message is null
              16:29:52,952|DEBUG||org.springframework.integration.endpoint.PollingConsumer|Received no Message during the poll, returning 'false'
              16:29:52,963|TRACE||org.springframework.integration.channel.QueueChannel|preReceive on channel 'reply'
              16:29:53,049|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Message received [Payload=[B@3179aaa6][Headers={timestamp=1355779793049, id=ab69790f-5854-433b-9729-e762b97fa48c, ip_tcp_remotePort=1025, ip_address=192.168.1.2, ip_hostname=192.168.1.2, ip_connectionId=192.168.1.2:1025:b2a886c1-568f-48dd-b70d-209191d34bff}]
              16:29:53,049|DEBUG||com.mylib.ConexionInterceptor|INIT
              16:29:53,049|DEBUG||com.mylib.ConexionInterceptor|BEFORE
              16:29:53,049|ERROR||org.springframework.integration.ip.tcp.TcpOutboundGateway|Cannot correlate response - no pending reply
              16:29:53,049|DEBUG||com.mylib.ConexionInterceptor|ENTER
              16:29:53,050|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Message sent [Payload=YYYY][Headers={timestamp=1355779793049, id=fe4e694c-06e1-44b8-aa22-718ffa249754}]
              16:29:53,050|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Closing single use socket after inbound message 192.168.1.2:1025:b2a886c1-568f-48dd-b70d-209191d34bff
              16:29:53,908|TRACE||org.springframework.integration.core.MessagingTemplate|failed to receive message from channel 'org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338' within timeout: 1000
              the interceptor class is the copy from AbstractTcpConnectionInterceptor

              Code:
              public class ConexionInterceptor  implements TcpConnectionInterceptor {
              	 protected final Logger log = Logger.getLogger(getClass());
              ....
              
              	public boolean onMessage(Message<?> message) {
              		log.debug("INIT");
              		if (this.tcpListener == null) {
              			throw new NoListenerException("No listener registered for message reception");
              		}
              		log.debug("BEFORE");
              		boolean b = this.tcpListener.onMessage(message);
              		try {
              			log.debug("ENTER");
              			
              			this.theConnection.send( MessageBuilder.withPayload("YYYY").build());
              			
              		} catch (Exception e) {
              			log.debug("ERROR");
              			log.debug(e);
              			
              			
              			
              			// TODO Auto-generated catch block
              			throw new NoListenerException("ERROR");
              		}
              		return b;
              	}



              The linux service doesnt have problems. The bad code doesnt return the response.

              Comment


              • #8
                Can you show you SI configuration? It looks like you have both an <ip:outbound-gateway/> and an <ip:outbound-channel-adapter/> subscribed to channel 'input'.

                When more than one subscriber is subscribed to a direct channel, the framework round-robins the messages between the subscribers. Hence the first goes through the gateway, the second goes through the adapter. The gateway doesn't know anything about messages sent through the adapter (which is for one-way integration), there is no way it can handle the reply...

                Good...
                Code:
                16:29:49,375|DEBUG||org.springframework.integration.ip.tcp.TcpOutboundGateway|Added 192.168.1.2:1025:5578e9c1-adb1-4651-9542-c81342439656
                Bad...
                Code:
                16:29:52,904|DEBUG||org.springframework.integration.channel.DirectChannel|preSend on channel 'input', message: [Payload=XXXXX][...
                ...
                16:29:52,904|DEBUG||org.springframework.integration.ip.tcp.TcpSendingMessageHandler|org.springframework.integration.ip.tcp.TcpSendingMessageHandler#0 received message: [Payload=XXXXX][Headers=...
                ...
                16:29:53,049|ERROR||org.springframework.integration.ip.tcp.TcpOutboundGateway|Cannot correlate response - no pending reply
                The good/bad will alternate because the dispatcher alternates the requests between the gateway and the adapter.
                Last edited by Gary Russell; Dec 18th, 2012, 08:56 AM.

                Comment


                • #9
                  You're in the correct!. Everything OK right now, Thank you very much, and congratulations for this excellent module.

                  Comment

                  Working...
                  X