udp-inbound-adapter missing sequence correlationID headers

  • udp-inbound-adapter missing sequence correlationID headers

    Hi

    I'm trying to send files of unknown size across a network using UDP (using UDP is a requirement). At the moment I use a file:inbound-channel-adapter, which polls a directory for files, followed by a file-to-bytes-transformer, whose output I feed into a splitter. Because the files vary in size, I break the byte streams into chunks of a known size so they can be aggregated after the transfer.

    file-inbound-adapter--->file-to-bytes-transformer--->splitter--->channel--->udp-outbound-channel-adapter

    Looking at the output of the splitter through a wire-tap on the channel just before the udp-outbound-channel-adapter, I get the following payload/header info:

    [Payload=[B@b3e15f7][Headers={timestamp=1341442699646, id=39294f3a-5eac-4071-8fea-70b2ebca67d1, correlationId=6707d7c8-c225-43cf-b0a5-8f1bd259f307, sequenceSize=12, file_originalFile=/path/to/files/Attorney Questions.odt, file_name=Attorney Questions.odt, sequenceNumber=1}]
    08:58:19,648 INFO ework.integration.handler.LoggingHandler: 121 - [Payload=[B@44023756][Headers={timestamp=1341442699648, id=7640bb42-1d09-4b3f-9aab-5f90f0d4dd1d, correlationId=6707d7c8-c225-43cf-b0a5-8f1bd259f307, sequenceSize=12, file_originalFile=/path/to/files/Attorney Questions.odt, file_name=Attorney Questions.odt, sequenceNumber=2}]
    ...
    ...
    Payload=[B@120540c][Headers={timestamp=1341442699654, id=9782fffe-b5cf-497a-a44d-808ff9168d84, correlationId=6707d7c8-c225-43cf-b0a5-8f1bd259f307, sequenceSize=12, file_originalFile=/path/to/files/Attorney Questions.odt, file_name=Attorney Questions.odt, sequenceNumber=12}]

    On the receiving end I use a udp-inbound-channel-adapter:

    udp-inbound-channel-adapter--->receiveUdpChannel

    Looking at the input to receiveUdpChannel with a wire-tap, I get the following payload/header info. The number of records matches the number that were sent, but the information required for correlation has been stripped from the headers somewhere:

    Payload=[B@d5e92d7][Headers={timestamp=1341443822722, id=8da5a0cf-53c0-4ee3-9335-9ca97cc878aa, ip_address=127.0.0.1, ip_hostname=localhost}]

    This in turn leads to:
    Exception in thread "UDP-Incoming-Msg-Handler" org.springframework.integration.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.CorrelatingMessageHandler#0]
    ...
    Caused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
    I've included the context below. Any help is, as always, appreciated. I'm using STS Version: 2.9.2.RELEASE
    Build Id: 201205071000


    Code:
    <!-- Beans used as handlers  -->
    	<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
    	<bean id="writePayloadToFileHandler" class="org.springintegration.cds.WritePayloadToFileMessageHandler" />
    	<bean id="handleSplitter" class="org.springintegration.cds.SplitMessageHandler" />
    		
    		
    	<!-- polls for files in the specified directory using the polling settings below -->
    	<file:inbound-channel-adapter id="filesIn"
    		directory="file:/home/john/input"  />
    
    	<!-- Poller used to poll the specified directory -->
    	<integration:poller id="poller" default="true"
    		fixed-delay="5000" />
    
    	<!-- File object to byte stream transformer required to use the udp outbound channel adapter -->
    	<file:file-to-bytes-transformer
    		input-channel="filesIn" output-channel="bytesOut" />
    	
    	<!-- split byte[] into known sizes for aggregation after network transfer -->
    	<int:splitter input-channel="bytesOut" apply-sequence="true"
    		output-channel="sendUdp" ref="handleSplitter" id="splitterChannel" />
    
    
    	<!-- udp output channel adapter -->
    	<ip:udp-outbound-channel-adapter id="udpOut"
    									 host="127.0.0.1" 
    									 port="19876" 
    									 multicast="false" 
    									 check-length="false" 
    									 so-send-buffer-size="4096"
    									 channel="sendUdp" />
                                        
    	<!-- inbound udp adapter -->
    	<ip:udp-inbound-channel-adapter id="udpIn" port="19876" 
    		channel="receiveUdp" multicast="false" check-length="false"  receive-buffer-size="4096" />
    	 
    	<!-- aggregate messages into one message -->
    	<int:aggregator input-channel="receiveUdp"
    		release-strategy="releaseStrategy"
    		correlation-strategy="correlationStrategy" id="fileMessageAggregator"
    		output-channel="aggragatorOutChannel">
    	</int:aggregator>
    	
    	<!-- correlation strategy to use -->
    	<beans:bean id="correlationStrategy" class="org.springframework.integration.aggregator.HeaderAttributeCorrelationStrategy">
    		<beans:constructor-arg value="CONVERSATION_ID" />
    	</beans:bean>  
    	
    	<!-- release strategy -->
    	<beans:bean id="releaseStrategy" class="org.springframework.integration.aggregator.SequenceSizeReleaseStrategy">
    		<beans:constructor-arg value="false" />
    	</beans:bean>
        
        <!-- output channel of aggregator -->
    	<int:channel id="aggragatorOutChannel">
    			<int:interceptors>
    			<int:wire-tap channel="logger" />
    		</int:interceptors>
    	</int:channel>
    	
    	<!-- Convert payload to File -->	
    	 <integration:service-activator output-channel="filesOut"
                                       ref="writePayloadToFileHandler" input-channel="aggragatorOutChannel"/>                               
         
        <!-- write files to output directory -->                               
        <file:outbound-channel-adapter id="filesOut"
        	directory="file:/home/john/output" />
    Last edited by squidVicious; Jul 4th, 2012, 06:42 PM. Reason: additional info

  • #2
    Unlike HTTP, JMS, etc., UDP/TCP has no standard way to transmit a "message" with headers and a payload. Therefore, currently, these adapters transfer the payload only.

    If you wish to transfer correlation data, you need to include it in the payload somehow, then move it back to the headers on the receiving side.

    We do have an open JIRA to transfer an entire message over TCP, but doing the same with UDP garnered little interest; in addition UDP has a very limited packet size.
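
    For a byte[] payload such as the file chunks in the original post, one way to "include it in the payload" might look like the sketch below. This is only an illustration in plain Java: the ChunkFrame class, its 24-byte header layout, and the assumption that the correlationId is a UUID (as it appears in the wire-tap output above) are all hypothetical and not part of Spring Integration.

```java
import java.nio.ByteBuffer;
import java.util.UUID;

/**
 * Hypothetical helper (not a Spring Integration class): prefixes a chunk with
 * the sequence details an aggregator needs, and reads them back on receipt.
 */
public final class ChunkFrame {

    /** Assumed layout: 16 bytes for the UUID, then two 4-byte ints. */
    public static final int HEADER_LENGTH = 24;

    public final UUID correlationId;
    public final int sequenceNumber;
    public final int sequenceSize;
    public final byte[] chunk;

    public ChunkFrame(UUID correlationId, int sequenceNumber, int sequenceSize, byte[] chunk) {
        this.correlationId = correlationId;
        this.sequenceNumber = sequenceNumber;
        this.sequenceSize = sequenceSize;
        this.chunk = chunk;
    }

    /** Sending side: build the bytes that go into a single datagram. */
    public byte[] encode() {
        ByteBuffer buffer = ByteBuffer.allocate(HEADER_LENGTH + chunk.length);
        buffer.putLong(correlationId.getMostSignificantBits());
        buffer.putLong(correlationId.getLeastSignificantBits());
        buffer.putInt(sequenceNumber);
        buffer.putInt(sequenceSize);
        buffer.put(chunk);
        return buffer.array();
    }

    /** Receiving side: split a datagram back into header values and chunk. */
    public static ChunkFrame decode(byte[] datagram) {
        if (datagram.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("datagram shorter than frame header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(datagram);
        long mostSignificant = buffer.getLong();
        long leastSignificant = buffer.getLong();
        int sequenceNumber = buffer.getInt();
        int sequenceSize = buffer.getInt();
        byte[] chunk = new byte[buffer.remaining()];
        buffer.get(chunk);
        return new ChunkFrame(new UUID(mostSignificant, leastSignificant),
                sequenceNumber, sequenceSize, chunk);
    }
}
```

    On the sending side, a transformer placed after the splitter could call encode() with the correlationId, sequenceNumber and sequenceSize header values; on the receiving side, a transformer or header-enricher placed before the aggregator could call decode() and copy the values back into the headers. The 24 header bytes count against the datagram size, so the split size would need to shrink accordingly.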



    • #3
      Hi Gary

      Thanks for the reply. I was aware that TCP did not include header info, but I got thrown a bit by the fact that there did seem to be header information coming through with the payload, i.e. [Headers={timestamp=1341443822722, id=8da5a0cf-53c0-4ee3-9335-9ca97cc878aa, ip_address=127.0.0.1, ip_hostname=localhost}]. I'm assuming, then, that this information is somehow being packaged with the payload in such a way that the wire-tap can split it out of the actual payload. Is there a preferred or recommended way of including the header details in the payload? Thanks again.



      • #4
        It depends on the payload type; if it's a simple string, you could do something like:

        client:
        Code:
        	<int:transformer output-channel="foo" input-channel="bar"
        		expression="headers.correlationId + ':' + payload" />
        server:
        Code:
        	<int:chain input-channel="foo" output-channel="bar">
        		<int:header-enricher>
        			<int:correlation-id expression="payload.substring(0, payload.indexOf(':'))"/>
        		</int:header-enricher>
        		<int:transformer expression="payload.substring(payload.indexOf(':') + 1)"/>
        	</int:chain>
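
        To make the string handling in those expressions explicit, here is the same logic as a small plain-Java sketch. The CorrelationPrefix class and its method names are hypothetical, for illustration only. It assumes the correlationId itself contains no ':' (true for the UUID values in the wire-tap output above), so the first ':' is always the separator even when the payload contains colons.

```java
/**
 * Hypothetical helper mirroring the SpEL expressions above:
 * the sender writes "correlationId:payload", the receiver splits at the first ':'.
 */
public final class CorrelationPrefix {

    private static final char SEPARATOR = ':';

    private CorrelationPrefix() {
    }

    /** Client side: same result as headers.correlationId + ':' + payload. */
    public static String prepend(String correlationId, String payload) {
        return correlationId + SEPARATOR + payload;
    }

    /** Server side: the text before the first ':' is the correlation id. */
    public static String correlationIdOf(String wire) {
        return wire.substring(0, separatorIndex(wire));
    }

    /** Server side: everything after the first ':' is the original payload. */
    public static String payloadOf(String wire) {
        return wire.substring(separatorIndex(wire) + 1);
    }

    // Unlike the bare expressions, this fails with a clear message
    // when the separator is missing.
    private static int separatorIndex(String wire) {
        int index = wire.indexOf(SEPARATOR);
        if (index < 0) {
            throw new IllegalArgumentException("no ':' separator in payload");
        }
        return index;
    }
}
```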

