reply-channel for tcp-inbound-gateway

  • #16
    Another confusing part is which tasks the executor will run: all the subscribers of the publish-subscribe channel? For example, in your case, you defined a thread pool with pool size 10.

    You can choose whatever task executor you want; the easiest is to use the <task: /> namespace, e.g.

    Code:
    	<task:executor id="executor" pool-size="10"/>
    Does that mean all the subscribers will be run on this thread pool?

    Thanks,

    John



    • #17
      ...Does that mean the server will keep reading data from the socket and trying to process it even if the input is empty?
      It depends on whether using-nio is true or false. Please read the reference documentation; if it's not clear there, please let us know.

      To summarize...

      If not using NIO, we have to dedicate a thread to each socket, and that thread blocks on the read until new data arrives. When data arrives, we keep reading until we detect the end of the message; the message is then put on the channel and we wait for a reply. Then we go back and wait for (read) the next message. In this case, the thread is never returned to the pool (until the socket closes).

      If using NIO, threads are shared between sockets and a thread is used to read/assemble a message until it is complete; it then passes it to another thread that sends the message and waits for the reply. The initial thread is put back into the pool as soon as the message has been assembled; the second thread is put back into the pool when the reply is received. Reader threads and sender threads are continually being grabbed and returned to the pool.

      NIO provides an event driven architecture so we know when new data is available on a socket; without NIO, we have no choice but to block on a read.
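
To illustrate the event-driven point, here is a minimal sketch in plain Java (not Spring Integration code). It assumes a java.nio Pipe as a stand-in for a socket; the class and method names are hypothetical. The selector reports the channel as readable only after data has been written, so no thread has to sit in a blocking read.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

public class SelectorSketch {

    /** Returns how many registered channels the selector reports as readable. */
    static int readableChannels(boolean writeData) {
        try (Selector selector = Selector.open()) {
            // A Pipe stands in for a socket here (an assumption made for the sketch).
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                pipe.source().register(selector, SelectionKey.OP_READ);
                if (writeData) {
                    byte[] bytes = "ping\r\n".getBytes(StandardCharsets.US_ASCII);
                    pipe.sink().write(ByteBuffer.wrap(bytes));
                }
                // Wait briefly for a readiness event instead of blocking in read().
                return selector.select(200);
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("with data:    " + readableChannels(true));
        System.out.println("without data: " + readableChannels(false));
    }
}
```

Without a selector, the only way to learn that data has arrived is to call read() and wait, which is why the non-NIO case ties up one thread per socket.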



      • #18
        ...Does that mean all the subscribers will be run on this thread pool?
        Channels, by default, are DirectChannels; that means the message consumer is run on the same thread as the producer. If you do an async hand-off (perhaps by using a pub-sub channel with a task-executor, or a QueueChannel), then the thread used for the hand-off will come from whatever task executor you configure for that purpose. If you configure the same task executor to be used in different places (e.g. poller, pub-sub channel, tcp connection factory, etc.) then, yes, they will share the same pool. But there is nothing stopping you from configuring a different executor for each place.
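
The difference between a direct hand-off and an executor hand-off can be sketched in plain Java, without any Spring Integration classes. The class, method, and thread names below are hypothetical; the pool size of 10 mirrors the <task:executor/> example earlier in the thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HandoffSketch {

    // Stand-in for a subscriber: it reports the thread it was invoked on.
    private static final Callable<String> SUBSCRIBER =
            () -> Thread.currentThread().getName();

    /** Direct hand-off: the subscriber runs on the caller's (producer's) thread. */
    static String direct() {
        try {
            return SUBSCRIBER.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /** Executor hand-off: the subscriber runs on a thread taken from the pool. */
    static String viaExecutor() {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(
                10, r -> new Thread(r, "executor-" + counter.incrementAndGet()));
        try {
            return pool.submit(SUBSCRIBER).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct:   " + direct());
        System.out.println("executor: " + viaExecutor());
    }
}
```

If the same pool were passed to several components, all of their hand-offs would compete for those 10 threads, which is the sharing described above.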



        • #19
          In my previous test, I did not use NIO. I then set using-nio to true, but the empty input string "" still arrives and leads to errors.

          Originally posted by Gary Russell
          It depends on whether using-nio is true or false. Please read the reference documentation; if it's not clear there, please let us know.
          I have no objection to blocking the thread for the socket, but I am curious why the empty input got passed to the message channel.
          For the tcp-connection-factory, I didn't specify the serializer and deserializer, so it should use the CRLF ones, right?
          Code:
              <ip:tcp-connection-factory id="crLfServer"
                  type="server"
                  pool-size="40"
                  using-nio="true"
                  port="8888"/>
          The empty string certainly does not contain a CRLF, so the server should just keep waiting until a CRLF-terminated message arrives. I just cannot understand why the empty string got passed in. Or am I misunderstanding something here?

          Thanks,

          John



          • #20
            I think this is your problem...

            DEBUG: org.springframework.integration.transformer.MessageTransformingHandler - org.springframework.integration.transformer.MessageTransformingHandler@2e8aeed0 received message: [Payload={"products":[{"type":"BOOKS","ean":"2000003602803"},{"type": "MERCH","ean":"2000003602888"},{"type":"WORKS","ean" : "2000003602999"}]}
            ][Headers={timestamp=1297220122633, id=f3f3ffbc-3f2f-401d-8c58-9228c0bc1ccb, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@46e06703,
            Notice the newline at the end of the payload (between your final } and the ][Headers...)

            Looks to me like your payload already has a CRLF at the end. The serializer on the client side is adding another CRLF and we are reading that as a second message...

            ...
            2011-02-09 12:57:39,112 (pool-1-thread-2) [ByteArrayCrLfSerializer.deserialize]
            DEBUG - Available to read:2
            2011-02-09 12:57:39,112 (pool-1-thread-2) [TcpNetConnection.run]
            DEBUG - Message received [Payload=[B@53458dcb][Headers={timestamp=1297274259112, id=10f502df-6ac4-445c-8cad-9ce8bd87a47a, ip_address=127.0.0.1, ip_connection_seq=2, ip_hostname=localhost.localdomain, ip_tcp_remote_port=57415, ip_connection_id=localhost.localdomain:57415:1774197673}]
            ...
            You need to strip off the CRLF from your payload or write a custom serializer that does not add CRLF on the client side (I just checked in a ByteArrayRawSerializer which would work) - you need to retain the existing deserializer on the server side.
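
A minimal sketch of why the extra CRLF shows up as an empty second message. This is plain Java, not the actual ByteArrayCrLfSerializer; the class and method names are hypothetical, and the framing rule (append CRLF on write, split on CRLF on read) is an assumption based on the behavior described above.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class CrLfFramingSketch {

    /** Client side: appends CRLF to the payload, as a CRLF-terminating serializer would. */
    static byte[] frame(String payload) {
        return (payload + "\r\n").getBytes(StandardCharsets.US_ASCII);
    }

    /** Server side: splits the bytes at each CRLF; the terminator is not part of the message. */
    static List<String> readMessages(byte[] wire) {
        List<String> messages = new ArrayList<>();
        ByteArrayOutputStream current = new ByteArrayOutputStream();
        int previous = -1;
        for (byte b : wire) {
            if (previous == '\r' && b == '\n') {
                byte[] bytes = current.toByteArray();
                // Drop the '\r' that was buffered just before this '\n'.
                messages.add(new String(bytes, 0, bytes.length - 1, StandardCharsets.US_ASCII));
                current.reset();
                previous = -1;
            } else {
                current.write(b);
                previous = b;
            }
        }
        // Bytes after the last CRLF form an incomplete message and are ignored here.
        return messages;
    }

    public static void main(String[] args) {
        // The payload already ends with CRLF, so the wire carries two terminators.
        System.out.println(readMessages(frame("{\"products\":[]}\r\n")).size());
        // With the trailing CRLF stripped, only one message is read.
        System.out.println(readMessages(frame("{\"products\":[]}")).size());
    }
}
```

In the first call the reader sees two terminators, so it emits the JSON payload followed by an empty message, which is consistent with the "Available to read:2" line in the log.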



            • #21
              Yes, you are right. I added a CRLF because I didn't know the client would automatically add a CRLF to the message. After stripping off the CRLF, the error is gone.

              Thanks again for your help.

              John



              • #22
                Great!

                I would still recommend you protect yourself, though, by filtering out the replyChannel with a <header-filter/> on the aggregatedProductChannel leg, just in case.



                • #23
                  Sorry for bothering you again. Here are the latest connection factories:

                  Code:
                      <beans:bean id="tcpSerializer"
                            class="org.springframework.integration.ip.tcp.serializer.ByteArrayLengthHeaderSerializer">
                          <beans:property name="maxMessageSize" value="20480"/>
                      </beans:bean>

                      <beans:bean id="tcpDeserializer"
                            class="org.springframework.integration.ip.tcp.serializer.ByteArrayLengthHeaderSerializer">
                          <beans:property name="maxMessageSize" value="20480"/>
                      </beans:bean>

                      <ip:tcp-connection-factory id="client"
                          serializer="tcpSerializer"
                          deserializer="tcpDeserializer"
                          type="client"
                          host="localhost"
                          port="8888"
                          using-nio="true"
                          single-use="true"
                          so-timeout="10000"/>

                      <ip:tcp-connection-factory id="crLfServer"
                          serializer="tcpSerializer"
                          deserializer="tcpDeserializer"
                          type="server"
                          pool-size="40"
                          using-nio="true"
                          port="8888"/>

                  But I keep seeing the following error message.

                  2011-02-09 16:34:19,863 (pool-1-thread-12) [TcpNioConnection.readPacket]
                  ERROR - Exception on Read localhost.localdomain:58631:404837374 Connection reset by peer
                  Any idea how to get rid of this kind of error?

                  Thanks,

                  John



                  • #24
                    Originally posted by Gary Russell
                    Great!

                    I would still recommend you protect yourself, though, by filtering out the replyChannel with a <header-filter/> on the aggregatedProductChannel leg, just in case.
                    Thanks, I renamed the aggregatedProductChannel to productPublishChannel and did what you suggested as follows.

                    Code:
                        <publish-subscribe-channel id="productPublishChannel" task-executor="executor" ignore-failures="true"/>
                    
                        <task:executor id="executor" pool-size="40"/>
                    
                        <poller id="poller" default="true" max-messages-per-poll="1" fixed-rate="1000" task-executor="executor"/>
                    
                        <service-activator input-channel="productPublishChannel" output-channel="replyChannel"
                            ref="productValidator" method="validate"/>
                    
                        <chain input-channel="productPublishChannel" output-channel="productsChannel">
                            <header-filter header-names="replyChannel"/>
                            <service-activator ref="productSplitter"  method="split"/>
                        </chain>
                    The productValidator should return either "successful" or "failed" to the client. But the client still receives the product JSON message instead of the "successful" or "failed" message. Did I do anything wrong here?

                    Thanks,

                    John



                    • #25
                      Any idea how to get rid of this kind of error?
                      Try upgrading to Spring-Integration 2.0.2.RELEASE (released yesterday) or try setting so-linger to, say, 5000. (See https://jira.springsource.org/browse/INT-1707).



                      • #26
                        It seems the header-filter does not take effect; see the log here.

                        DEBUG - postSend (sent=true) on channel 'productsChannel', message: [Payload=[Product{ean='20001191', type='BOOKS'}, Product{ean='20001192', type='BOOKS'}, Product{ean='20001193', type='BOOKS'}, Product{ean='20001194', type='BOOKS'}, Product{ean='20001195', type='BOOKS'}, Product{ean='20001196', type='BOOKS'}, Product{ean='20001197', type='BOOKS'}, Product{ean='20001198', type='BOOKS'}, Product{ean='20001199', type='BOOKS'}, Product{ean='20001200', type='BOOKS'}, Product{ean='20001201', type='BOOKS'}, Product{ean='20001202', type='BOOKS'}, Product{ean='20001203', type='BOOKS'}, Product{ean='20001204', type='BOOKS'}, Product{ean='20001205', type='BOOKS'}, Product{ean='20001206', type='BOOKS'}, Product{ean='20001207', type='BOOKS'}, Product{ean='20001208', type='BOOKS'}, Product{ean='20001209', type='BOOKS'}, Product{ean='20001210', type='BOOKS'}, Product{ean='20001211', type='BOOKS'}, Product{ean='20001212', type='BOOKS'}, Product{ean='20001213', type='BOOKS'}, Product{ean='20001214', type='BOOKS'}, Product{ean='20001215', type='BOOKS'}, Product{ean='20001216', type='BOOKS'}, Product{ean='20001217', type='BOOKS'}, Product{ean='20001218', type='BOOKS'}, Product{ean='20001219', type='BOOKS'}, Product{ean='20001220', type='BOOKS'}, Product{ean='20001221', type='BOOKS'}, Product{ean='20001222', type='BOOKS'}, Product{ean='20001223', type='BOOKS'}, Product{ean='20001224', type='BOOKS'}, Product{ean='20001225', type='BOOKS'}, Product{ean='20001226', type='BOOKS'}, Product{ean='20001227', type='BOOKS'}, Product{ean='20001228', type='BOOKS'}, Product{ean='20001229', type='BOOKS'}, Product{ean='20001230', type='BOOKS'}, Product{ean='20001231', type='BOOKS'}, Product{ean='20001232', type='BOOKS'}, Product{ean='20001233', type='BOOKS'}, Product{ean='20001234', type='BOOKS'}, Product{ean='20001235', type='BOOKS'}, Product{ean='20001236', type='BOOKS'}, Product{ean='20001237', type='BOOKS'}, Product{ean='20001238', type='BOOKS'}, Product{ean='20001239', type='BOOKS'}, Product{ean='20001240', type='BOOKS'}, Product{ean='20001241', type='BOOKS'}]][Headers={timestamp=1297289282817, id=7c11b459-a421-4945-b6b0-4cc28df0c5e8, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@76d78df0, ip_address=127.0.0.1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@76d78df0, ip_connection_seq=1, ip_hostname=localhost.localdomain, ip_tcp_remote_port=44213, ip_connection_id=localhost.localdomain:44213:1952703843}]
                        The replyChannel is still there.



                        • #27
                          2.0.2 is out? Great. Will try that. Hope it is in the Maven repo now.

                          Originally posted by Gary Russell
                          Try upgrading to Spring-Integration 2.0.2.RELEASE (released yesterday) or try setting so-linger to, say, 5000. (See https://jira.springsource.org/browse/INT-1707).



                          • #28
                            The replyChannel is still there.
                            Hmmm... can't explain that (and I can't reproduce it either with a simpler test case). Can you share more of the log?



                            • #29
                              Attached please find the log file and the xml configurations for the server and the client.

                              The client now receives the "successful" message, but the "Channel closed" error is still there even though I upgraded to the SI 2.0.2 release.

                              Thanks,

                              John



                              • #30
                                Thanks

                                2011-02-09 18:14:51,905 (pool-1-thread-4) [TcpNioConnection.readPacket]
                                ERROR - Exception on Read localhost.localdomain:37779:343935649 Channel closed
                                At least we are getting a close now, instead of a reset (which is what the 2.0.2 fix was about). But, given your configuration, I wouldn't expect to see a thread in readPacket() at this point (when the client closed the socket): there should be no pending data in the socket, so we shouldn't have received any indication that we needed to read more data. Threads are only dispatched into readPacket() when the selector has indicated there is data to read.

                                2011-02-09 18:14:51,824 (pool-1-thread-2) [TcpNioConnection.doRead]
                                DEBUG - Read 135 into raw buffer
                                2011-02-09 18:14:51,825 (pool-1-thread-3) [ByteArrayLengthHeaderSerializer.read]
                                DEBUG - Read 4 bytes, buffer is now at 4 of 4
                                2011-02-09 18:14:51,825 (pool-1-thread-3) [ByteArrayLengthHeaderSerializer.deserialize]
                                DEBUG - Message length is 131
                                2011-02-09 18:14:51,825 (pool-1-thread-3) [ByteArrayLengthHeaderSerializer.read]
                                DEBUG - Read 131 bytes, buffer is now at 131 of 131
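
The numbers in these log lines fit length-header framing: 135 raw bytes are a 4-byte header plus a 131-byte payload. Below is a minimal plain-Java sketch of that framing, assuming a 4-byte big-endian length header (which matches the "Read 4 bytes" and "Message length is 131" lines). It is not the ByteArrayLengthHeaderSerializer source; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

public class LengthHeaderSketch {

    /** Prefixes the payload with its length as a 4-byte big-endian integer. */
    static byte[] frame(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)   // ByteBuffer is big-endian by default
                .put(payload)
                .array();
    }

    /** Reads the 4-byte header, validates it, then reads exactly that many payload bytes. */
    static byte[] unframe(byte[] wire, int maxMessageSize) {
        ByteBuffer buffer = ByteBuffer.wrap(wire);
        int length = buffer.getInt();
        if (length < 0 || length > maxMessageSize) {
            throw new IllegalArgumentException("Message length " + length + " is out of range");
        }
        if (buffer.remaining() < length) {
            throw new IllegalArgumentException("Incomplete message: need " + length + " bytes");
        }
        byte[] payload = new byte[length];
        buffer.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] wire = frame(new byte[131]);
        System.out.println("bytes on the wire: " + wire.length);
        System.out.println("payload length:    " + unframe(wire, 20480).length);
    }
}
```

The 20480 limit mirrors the maxMessageSize value from the configuration posted in #23. Because the header states the length up front, the reader knows when a message is complete without scanning for a terminator.
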
                                What operating system are you running on?

                                Do you see a similar ERROR when using-nio="false"?

                                Can you provide a TRACE level log from the server with using-nio="true"? (Please also send the corresponding client-side log).

                                What happens if you set single-use="true" (with using-nio="true") on the server connection factory?

                                Thanks!
