Announcement Announcement Module
Collapse
No announcement yet.
PriorityChannel Gotcha! Messages with same priority are not processed in FIFO order Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • PriorityChannel Gotcha! Messages with same priority are not processed in FIFO order

    We use PriorityChannels in our application, and during testing we discovered that PriorityChannel doesn't quite behave in the way we expected. As PriorityChannel is derived from QueueChannel, I assumed that the order in which messages are processed would be determined by the relative priority of the messages and their relative position on the queue. In other words, if two messages have the same priority, then the one that was placed on the queue first would be read off the queue first.

    But this is not the case - the messages are compared based on priority only, so messages with the same priority will be processed in an arbitrary order This came to light during our testing, when we saw some messages taking 15 minutes to process, but messages received immediately before and after took less than a second to process!

    Now, this may be by design - I note that PriorityChannel is backed by PriorityBlockingQueue, and in the JavaDoc for that class, it explicitly states that the class "makes no guarantees about the ordering of elements with equal priority" (and it even provides a code excerpt that can be used to provide priority+FIFO (first-in, first-out) ordering).

    However, I don't think this behaviour is particularly intuitive (or practical). At the very least, I think the documentation (JavaDoc and reference manual) should be explicit about how messages will be ordered when using PriorityChannel. Better still, I think that the default behaviour should be to order messages by both priority and their position on the queue.

    Regards
    -Matt

  • #2
    Please refer to the reference manual, wherin it states...

    ...However, for custom priority determination logic, a comparator of type Comparator<Message<?>> can be provided to the PriorityChannel's constructor. ...

    And, further, ...

    Code:
    <channel id="priorityChannel" datatype="example.Widget">
        <priority-queue comparator="widgetComparator" capacity="10"/>
    </channel>

    Comment


    • #3
      Gary, Thanks for your reply. I still think it would be helpful to state explicitly that messages with the same priority will be received in an arbitrary order. To me, anything that is based on a queue implies FIFO behaviour (all other things being equal), so the behaviour in this case seems counter-intuitive to me.

      Regards
      -Matt

      Comment


      • #4
        Matt, you might need to argue that case to Doug Lea and Josh Bloch

        We are simply delegating to java.util.PriorityQueue under the covers. Here's a relevant excerpt from its JavaDoc:
        Code:
         * <p>The <em>head</em> of this queue is the <em>least</em> element
         * with respect to the specified ordering.  If multiple elements are
         * tied for least value, the head is one of those elements -- ties are
         * broken arbitrarily.
        -Mark

        Comment


        • #5
          Sorry, I should have read the entire post before commenting I realize that you did mention the underlying JavaDoc, and I also realized that I accidentally quoted PriorityQueue and not PriorityBlockingQueue. So, this is actually the relevant JavaDoc excerpt that you were referring to I presume:
          Code:
           * <p>Operations on this class make no guarantees about the ordering
           * of elements with equal priority. If you need to enforce an
           * ordering, you can define custom classes or comparators that use a
           * secondary key to break ties in primary priority values.  For
           * example, here is a class that applies first-in-first-out
           * tie-breaking to comparable elements. To use it, you would insert a
           * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
          What Gary is suggesting is that you could implement that Comparator and provide it. I guess we could consider providing such a Comparator implementation within Spring Integration so that others may use it as well. I don't think we would want to assume that every provided Comparator should be wrapped though. As long as we provide the class that someone can configure as a bean (with a targetComparator property or something for the "primary" ordering), then that can always be defined as a bean itself. Thoughts?

          Comment


          • #6
            One question is what to use as the secondary key in the comparator.

            In all but the very simplest case, $timestamp won't work because messages may arrive at the channel in a different order to that in which the messages were originated. Reading Matt's original question, he wants the arrival sequence to be preserved (within priority).

            Wouldn't this need some header-enrichment immediately upstream of the channel; either to add a simple incrementing sequence header, or a new timestamp?

            Comment


            • #7
              Hi - this might not be suited to a framework solution, but here's what I did for this. I created a FIFOMessage class (see extract 1 below) that maintains a global sequence number. Then I have created my own subclass of QueueChannel (see extract 2 below) that wraps my message inside a FIFOMessage on a send, and unwraps it on a receive.

              You may ask why I have created my own class derived from QueueChannel - the reason is that I had already created this class as a workaround for a different issue (which is fixed in 1.0.5, but this is not released yet). See http://forum.springsource.org/showthread.php?t=92699 for details.

              Regards
              -Matt

              Code extract 1:
              Code:
              /**
               * A subclass of Message that maintains a sequence number as well as priority.
               * This is to enable messages to be processed in order of priority, but if
               * two messages have the same priority, then they will be ordered based on
               * their position in the queue (as determined by the sequence number).
               * In other words, messages with the same priority will be processed in
               * first-in, first-out (FIFO) order.
               *
               * @author carlmat
               *
               * @param <T>
               */
              @SuppressWarnings("serial")
              public class FIFOMessage<T> extends GenericMessage<T> implements Comparable<FIFOMessage<T>> {
              	final static AtomicLong seq = new AtomicLong();
              	final long seqNum;
              	
              	public FIFOMessage(T payload, MessagePriority priority) {
              		super(payload, getHeadersMap(priority));
              		seqNum = seq.getAndIncrement();		
              	}
              	
              	private static Map<String,Object> getHeadersMap(MessagePriority priority) {
              		Map<String, Object> headers = new HashMap<String,Object>();
              		headers.put(MessageHeaders.PRIORITY, priority);
              		return headers;
              	}
              
              	@Override
              	public int compareTo(FIFOMessage<T> other) {
              		// compare the priority of the messages first
              		MessagePriority priority1 = this.getHeaders().getPriority();
              		MessagePriority priority2 = other.getHeaders().getPriority();
              		priority1 = priority1 != null ? priority1 : MessagePriority.NORMAL;
              		priority2 = priority2 != null ? priority2 : MessagePriority.NORMAL;
              		int res = priority1.compareTo(priority2);
              
              		// for two messages of the same priority, compare their sequence number
              		if (res == 0 && other != this)
              			res = (seqNum < other.seqNum ? -1 : 1);
              		return res;
              	}
              }
              Code extract 2:
              Code:
              public class PriorityBlockingChannel extends QueueChannel {
              ...
              	@Override
              	protected boolean doSend(Message<?> message, long timeout) {
              		if (!acquirePermitIfNecessary(timeout)) {
              			return false;
              		}
              		
              		log.debug("Channel {} - sending message with timeout {}", this.getName(), timeout);
              		
              		FIFOMessage<Message<?>> fifoMessage = new FIFOMessage<Message<?>>(message, message.getHeaders().getPriority());
              		boolean ret = super.doSend(fifoMessage, 0);
              		
              		return ret;
              	}
              
              	@Override
              	protected Message<?> doReceive(long timeout) {
              		FIFOMessage<Message<?>> fifoMessage = (FIFOMessage<Message<?>>) super.doReceive(timeout);
              		if ( fifoMessage != null ) {
              			Message<?> message = fifoMessage.getPayload();
              			this.releasePermitIfNecessary();
              			return message;
              		}
              		
              		return null;
              	}
              ...
              }

              Comment

              Working...
              X