
  • UDP INBOUND, Buffering

    Hi!
    I am just starting with Spring Integration...
    Could anyone suggest a good approach for the following scenario?

    In my flow I have:
    - udp-inbound-channel-adapter - receive messages
    - transformer - byte array to POJO
    - service-activator - checks sequence number of received messages

    There is a problem...

    When I detect a mismatch in the sequence numbers of received messages (a gap), I have to request a retransmission of the lost message and, for a given amount of time, stop processing other messages (although they are still being received by the inbound adapter).
    When I get the missing message from the external source, I have to resume processing the messages that were queued while I was waiting, as well as the messages arriving in real time...

  • #2
    Sounds like you need a resequencer, with the functionality described here... https://jira.springsource.org/browse/INT-1769

    Until that's implemented (hopefully in the next month or so), you'll need to use a custom stateful <service-activator/>

    Something like...

    Code:
    List<Message<?>> resequence(Message<?> message) {
       if (!sequence ok) {
            save message in ordered list
            return null
       }
       if (message fills gap) {
           return a new list from the ordered messages up to next gap
       }
       if (sequence ok and list empty) {
           return a new list with this one message
       }
       add message to ordered list
       return null
    }
    Then, follow this <service-activator/> with <splitter... expression="payload"/>

    This splitter will split the list into individual messages.
    Last edited by Gary Russell; May 26th, 2011, 07:07 AM.
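The pseudocode above can be sketched in plain Java roughly as follows. This is only a sketch under stated assumptions: `GapBufferingResequencer` is a hypothetical class name, it uses no Spring types, and the sequence number is passed in explicitly instead of being read from a message header or payload. Returning null when nothing can be released mirrors the pseudocode.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical stand-in for a stateful service bean; not a Spring type. */
class GapBufferingResequencer<T> {
    // Held messages, ordered by sequence number.
    private final TreeMap<Long, T> held = new TreeMap<Long, T>();
    // Next sequence number that is allowed to flow downstream.
    private long nextExpected;

    GapBufferingResequencer(long firstSequence) {
        this.nextExpected = firstSequence;
    }

    /**
     * Returns the messages that can be released now, in order,
     * or null when the message has to wait behind a gap.
     */
    synchronized List<T> resequence(long sequence, T message) {
        if (sequence < nextExpected) {
            return null; // duplicate of something already released
        }
        held.put(sequence, message);
        List<T> released = new ArrayList<T>();
        // Drain the contiguous run starting at nextExpected, stopping at the next gap.
        while (!held.isEmpty() && held.firstKey() == nextExpected) {
            released.add(held.pollFirstEntry().getValue());
            nextExpected++;
        }
        return released.isEmpty() ? null : released;
    }
}
```

With a first sequence of 1, message 1 is released immediately, 3 and 4 are held, and the arrival of 2 releases 2, 3 and 4 together as one list, which a splitter could then break into individual messages.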



    • #3
      Maybe I got it wrong, but...

      Messages arrive in the order 1,2,3,4 from the external UDP source.
      They must be transmitted in exactly that order, so the output (after processing) must also be 1,2,3,4.

      So when a gap appears I have 1,2,3,4...GAP...8,9,10 (when I see that there is a gap, I send a TCP request for retransmission of messages 5,6,7).
      In the meantime, messages are buffered, so we have something like: 1,2,3,4,8,9,10,11,12,13...
      When the retransmission response (with the lost messages) arrives, we have: 1,2,3,4,8,9,10,11,12,13,14,15,5,6,7
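The gap detection step in this scenario could look something like the sketch below. `GapDetector` and `missingBetween` are hypothetical names used only for illustration; how the TCP retransmission request itself is sent is not covered here.

```java
import java.util.ArrayList;
import java.util.List;

class GapDetector {
    /**
     * Returns the sequence numbers missing between the last in-order
     * message and a newly arrived one; empty when there is no gap.
     */
    static List<Integer> missingBetween(int lastInOrder, int arrived) {
        List<Integer> missing = new ArrayList<Integer>();
        for (int seq = lastInOrder + 1; seq < arrived; seq++) {
            missing.add(seq);
        }
        return missing;
    }
}
```

For the example above, `GapDetector.missingBetween(4, 8)` yields 5, 6 and 7, which are the messages to ask for in the retransmission request.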

      I have managed to do it using <resequencer />

      Code:
      	
      <int:resequencer id="seqNumberResequencer" comparator="messageComparator"
      	input-channel="filteredChannel" output-channel="logger"
      	release-strategy="sequenceReleaseStrategy" />
      <int:channel id="sequencedChannel" />
      Where:
      messageComparator - orders messages by sequence number (the default could be used instead; per the Spring docs, it "defaults to comparing the sequence number header")
      sequenceReleaseStrategy
      correlationId - I am using a constant that is common to all messages in the flow

      Code:
      class SequenceReleaseStrategy implements ReleaseStrategy
      Code:
      public boolean canRelease(MessageGroup group) {
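      		final Collection<Message<?>> collection = group.getUnmarked();
      		final int size = collection.size();
      		if (size == 0) {
      			// Nothing buffered yet, so there is nothing to release
      			// (and no last packet to inspect).
      			return false;
      		}
      		// Inspect the most recently added message in the group.
      		final Packet lastPacket = (Packet) ((Message<Packet>) collection.toArray()[size - 1]).getPayload();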
      		if (lastPacket.getHeader().getPacketSeqNum() - 1 == currSeqHolder.getCurrentSequnceNo().get()) {
      			currSeqHolder.getCurrentSequnceNo().set(lastPacket.getHeader().getPacketSeqNum());
      			
      			if(missedChunks > 0 ) {
      				System.out.println("Got retransmission chunk [" + missedChunks + "/" + maxChunks + "]");
      				--missedChunks;				
      				if(missedChunks == 0 ) {
      					currSeqHolder.getCurrentSequnceNo().set(((Packet) ((Message<Packet>) collection.toArray()[group.getUnmarked().size() - maxChunks - 1]).getPayload()).getHeader().getPacketSeqNum());
      				}
      			}						
      			if(missedChunks == 0 ) {				
      				maxChunks = 0;
      				return true;	
      			}
      			return false;			
      		} else {
      			System.err.println("Missed packet, currentSequnceNo="
      					+ currSeqHolder.getCurrentSequnceNo() + " != packetSeqNum="
      					+ lastPacket.getHeader().getPacketSeqNum());
      			
      			// TODO
      			// Send event to service registered on this JVM to get from local
      			// cache. Consider getting data from remote JVM or make TCP/IP
      			// request
      			
      			if(missedChunks == 0) {
      				missedChunks = (lastPacket.getHeader().getPacketSeqNum() - 1) - currSeqHolder.getCurrentSequnceNo().get();
      				maxChunks = missedChunks;
      				System.out.println("Missed chunks: " + missedChunks);
      			}
      			return false;
      		}		
      	}
      The point is that messages are queued until all the "missed messages" arrive; they are stored in a MessageGroup under a correlationId.
      When all the lost messages have been received I return true, so the whole collection of messages (LinkedQueue) is pushed out...
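As an aside, the same release condition can also be expressed without counting chunks, by checking that the buffered sequence numbers are contiguous and follow directly after the last released one. This is an alternative sketch, not the strategy posted above; `ContiguityCheck` is a hypothetical helper in plain Java with no Spring types.

```java
import java.util.Collection;
import java.util.TreeSet;

class ContiguityCheck {
    /**
     * True when the buffered sequence numbers continue directly from the
     * last released number and contain no gaps among themselves.
     */
    static boolean canRelease(int lastReleased, Collection<Integer> buffered) {
        if (buffered.isEmpty()) {
            return false;
        }
        TreeSet<Integer> sorted = new TreeSet<Integer>(buffered);
        int first = sorted.first();
        int last = sorted.last();
        // No gap before the buffer, and as many distinct numbers as the range spans.
        return first == lastReleased + 1 && last - first + 1 == sorted.size();
    }
}
```

With 4 as the last released number, a buffer of 8,9,10 is held back, while 8,9,10,5,6,7 can be released because the retransmitted messages close the gap.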



      • #4
        I assumed you didn't want to hold up 1,2,3,4, but let them flow, then, when 8 arrives, put it in a holding (ordered) list. When 9, 10 etc arrive, put them behind 8, if 5 arrives, let it go, if 7 arrives, put it at the front of the list, when 6 arrives, let it go, as well as 7, 8, 9, 10 etc.

        The problem with using the current <resequencer/> is it does not work well with unbounded groups. If the sequence size header is 0, the group is always completed and removed when a partial release is done. The next set will then never complete.

        If you set the sequence size header to, say, Integer.MAX_VALUE, the group will never complete, and you'll eventually run out of memory. If you don't allow partial releases, you'll never release anything. Hence I opened the JIRA I cited.

        Here is the code that implements this behavior (in CorrelatingMessageHandler).

        Code:
        	private void cleanUpForReleasedGroup(MessageGroup group, Collection<Message> completedMessages) {
        		if (group.isComplete() || group.getSequenceSize() == 0) {
        			// The group is complete or else there is no
        			// sequence so there is no more state to track
        			remove(group);
        		} else {
        			// Mark these messages as processed, but do not
        			// remove the group from store
        			if (completedMessages == null) {
        				mark(group);
        			} else {
        				mark(group, completedMessages);
        			}
        		}
        	}
        I do believe a custom service such as the one I suggested should work for you.

        Now, if you have a known group size, then, yes, a standard resequencer will work, just set the sequence size and sequence number headers appropriately, before you enter the <resequencer/>. In that case, you'll need no custom code at all in the resequencer.



        • #5
          "I assumed you didn't want to hold up 1,2,3,4, but let them flow"
          Yes, you are right!

          "When 9, 10 etc arrive, put them behind 8, if 5 arrives, let it go, if 7 arrives, put it at the front of the list, when 6 arrives, let it go, as well as 7, 8, 9, 10 etc."
          This will never happen... When the retransmission response arrives, it will be in sequence 5,6,7, so they can simply be appended to the BlockingQueue (that you are using internally in the resequencer). At this step I have: ...,8,9,10,5,6,7

          In the method:
          Code:
          public boolean canRelease(MessageGroup group)
          I return true, so it flushes the buffered queue to the output...

          After reordering, at the output of the resequencer component, I have 5,6,7,8,9,10 (because I am using sequence number ordering). And that's it!

          "The problem with using the current <resequencer/> is it does not work well with unbounded group"
          I understand these are unlimited groups (groups with an unknown sequence size?)

          "If you set the sequence size header to, say, Integer.MAX_VALUE, the group will never complete, and you'll eventually run out of memory. If you don't allow partial releases, you'll never release anything."
          I don't set the sequence size header anywhere... I just wait for the retransmission response to arrive (I know internally how many packets I missed, so I know when I have all the lost packets) and then flush the internal queue (with the messages that were buffered in the background).



          • #6
            When will the functionality described here https://jira.springsource.org/browse/INT-1769 be implemented?



            • #7
              It is scheduled for 2.1.

              I have a lot of travel in the near future so I can't give you a specific date when it will be available.



              • #8
                Could you please provide some details about this functionality? A use case? The algorithm? How will it work?



                • #9
                  It will work the same as the existing resequencer; the difference is that it will purge all released messages, except the last. (The current resequencer retains all the messages in the group, including any that have been released, until the entire group has been received). This would cause an out-of-memory condition for an unbounded group.

                  So, if the first five of an unbounded message group arrive in this sequence...

                  2, 1, 5, 4, 3,

                  When 2 arrives, it will be put in the store.

                  When 1 arrives, we will see that the sequence is complete, and release 1 then 2; we will delete 1 from the store.

                  When 5 arrives, we see it is not the next so we put it in the store.

                  When 4 arrives, we see it is not the next, so we put it in the store.

                  When 3 arrives, we see that the sequence is complete, and release 3, 4, 5; we then delete 3 and 4, keeping 5 in the store so we can check the sequence of the next message.

                  This will still only work in relatively simple environments out of the box; a custom release strategy will be needed if, say, the first logical message doesn't start with sequence 1.

                  But the key is an underlying message store that supports unbounded groups by removing all except the last released message.
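The walk-through above can be modelled with a small plain-Java simulation. It is a sketch of the described behavior, not the planned implementation: `UnboundedGroupModel` is a hypothetical class, it stores only sequence numbers, and the configurable first sequence number is an assumption added for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical model of an unbounded group; only sequence numbers are stored. */
class UnboundedGroupModel {
    private final TreeSet<Integer> store = new TreeSet<Integer>();
    private final int firstSequence;
    private Integer lastReleased; // kept in the store as the reference point

    UnboundedGroupModel(int firstSequence) {
        this.firstSequence = firstSequence;
    }

    /** Adds one arrival and returns the sequence numbers it releases, in order. */
    List<Integer> arrive(int sequence) {
        List<Integer> released = new ArrayList<Integer>();
        if (sequence < firstSequence || (lastReleased != null && sequence <= lastReleased)) {
            return released; // stale or duplicate arrival, nothing to do
        }
        store.add(sequence);
        int next = (lastReleased == null) ? firstSequence : lastReleased + 1;
        while (store.contains(next)) {
            released.add(next);
            next++;
        }
        if (!released.isEmpty()) {
            // Purge everything before the last released number; only that one stays.
            lastReleased = released.get(released.size() - 1);
            store.headSet(lastReleased).clear();
        }
        return released;
    }

    /** Snapshot of what is still held after the purge. */
    List<Integer> stored() {
        return new ArrayList<Integer>(store);
    }
}
```

Feeding it 2, 1, 5, 4, 3 with a first sequence of 1 releases 1 and 2 when 1 arrives (leaving only 2 in the store) and releases 3, 4 and 5 when 3 arrives (leaving only 5), so the store never grows with the number of released messages.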



                  • #10
                    Thank you very much...

                    "..say, the first logical message doesn't start with sequence 1."
                    Why must the first message's sequence number be 1? Shouldn't that be fixed? I would like to set it before processing...



                    • #11
                      One more thing... Is the group (which is put in the store) formed based on gap detection in the sequence numbers? So when there is no gap, there is no spooling, right?



                      • #12
                        ...Why must the first message's sequence number be 1?
                        That's just the way the default works. If, say, the first message is sequence 23, how would we know it's the first? (Vs. 1-22 being missing).

                        If you have some other mechanism that can determine that message #23 is really the first, then simply provide your own release strategy.

                        ...when there is no gap, there is no spooling, right?
                        Correct.
