Send on a PriorityChannel does not block

  • Send on a PriorityChannel does not block

    Hi - I am working on an application that uses priority channels, e.g.

    Code:
    	<si:channel id="inbound_soi_lookup">
    		<si:priority-queue capacity="100"/>
    	</si:channel>
    During load testing, I have noticed several occurrences of the following exception:

    Code:
    16:01:52.486 [subscriptionService-3] ERROR o.s.i.handler.LoggingHandler - org.springframework.integration.message.MessageDeliveryException: Router failed to send to channel: inbound_soi_lookup
    	at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:118)
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:59)
    	at org.springframework.integration.endpoint.PollingConsumer.doPoll(PollingConsumer.java:59)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.innerPoll(AbstractPollingEndpoint.java:232)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.poll(AbstractPollingEndpoint.java:216)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.access$0(AbstractPollingEndpoint.java:213)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:204)
    	at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:49)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    	at java.lang.Thread.run(Thread.java:619)
    This is happening when the priority channel's internal queue is already at maximum capacity, and another message is sent to the channel. The call to send() returns false because acquirePermitIfNecessary in PriorityChannel.doSend() returns false:

    Code:
    	@Override
    	protected boolean doSend(Message<?> message, long timeout) {
    		if (!acquirePermitIfNecessary(timeout)) {
    			return false;
    		}
    		return super.doSend(message, 0);
    	}
    I also tried changing the PriorityChannels to regular QueueChannels to verify that the issue was related to priority channels in particular. When I did this, the exception did not occur. Presumably this is because when using a QueueChannel, the call to send() will block until there is space on the queue. This behaviour is noted in the reference manual:

    A channel that has not reached its capacity limit will store messages in its internal queue, and the send() method will return immediately even if no receiver is ready to handle the message. If the queue has reached capacity, then the sender will block until room is available.
    The behaviour described above is what I was expecting would happen with the PriorityChannels as well (eventually leading to a backlog of messages on the 'external' MQ queue). Is there a particular reason why PriorityChannel cannot be implemented so that it also blocks on send() when the queue is full? Can you suggest a workaround for this issue, other than increasing the capacity on the queue?
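The difference between a send that blocks on a full queue and one that gives up immediately can be sketched with plain `java.util.concurrent` classes. This is only an illustration under stated assumptions: `BoundedQueueSendDemo` and its method names are hypothetical, and the code does not use or reproduce any Spring Integration channel.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; it only uses JDK queues, not Spring Integration channels.
class BoundedQueueSendDemo {

    // Non-blocking send: returns false straight away if the queue is full.
    static boolean sendWithoutWaiting(BlockingQueue<String> queue, String payload) {
        return queue.offer(payload);
    }

    // Blocking send: waits until a slot is free. Returns false only if interrupted.
    static boolean sendBlocking(BlockingQueue<String> queue, String payload) {
        try {
            queue.put(payload);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        sendBlocking(queue, "first"); // fills the single slot

        // The queue is full, so the non-blocking send is rejected.
        System.out.println(sendWithoutWaiting(queue, "second")); // prints false

        // A consumer frees the slot after a short delay.
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(100);
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // The blocking send waits for the consumer and then succeeds.
        System.out.println(sendBlocking(queue, "third")); // prints true
        consumer.join();
    }
}
```

With the blocking variant the producer simply slows down to the consumer's pace, which is the back-pressure effect described above.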

    Thanks
    -Matt

  • #2
    The call to acquirePermitIfNecessary(timeout) should be blocking. What are you passing as the timeout value?


    • #3
      Hi Mark - I have not defined any specific timeout for the channel, so the timeout parameter for acquirePermitIfNecessary has a value of -1.

      Rgds
      -Matt


      • #4
        Matt, what version are you using?


        • #5
          Version 1.0.4

          Thanks
          Matt


          • #6
            Matt,

            Can you open an issue for this? The problem is that we normally interpret -1 as an "indefinite" timeout value (as on QueueChannel), but in this case we are passing that value directly to tryAcquire on the Semaphore. The javadoc from that method explains why this is not working as you expect:
            Code:
              * <p>If the specified waiting time elapses then the value {@code false}
              * is returned.  If the time is less than or equal to zero, the method
              * will not wait at all.
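The javadoc behaviour quoted above is easy to confirm in isolation. A minimal sketch, assuming nothing beyond the JDK: `TryAcquireTimeoutDemo` and `tryWithTimeout` are hypothetical names, and the semaphore is created with zero permits to stand in for a full queue.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Spring Integration.
class TryAcquireTimeoutDemo {

    // Tries to take a permit from a semaphore that has none available.
    static boolean tryWithTimeout(long timeoutMillis) {
        Semaphore noPermits = new Semaphore(0);
        try {
            return noPermits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // A timeout of -1 is not treated as "wait forever": the call does not wait at all.
        System.out.println(tryWithTimeout(-1)); // prints false
        // Zero behaves the same way.
        System.out.println(tryWithTimeout(0));  // prints false
    }
}
```

So a caller that uses -1 to mean "indefinite" has to translate that value itself, for example by calling `acquire()` instead.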
            Thanks,
            Mark


            • #7
              Hi Mark - this is now raised as issue 1275: http://jira.springframework.org/browse/INT-1275. Can you suggest a workaround that we could use in the meantime?

              Thanks
              -Matt


              • #8
                From your stack trace, it looks like you can set the timeout in the router. Does that work (if you set it to something long, that is)?
                Last edited by Dave Syer; Jul 27th, 2010, 05:36 AM. Reason: clarification


                • #9
                  Hi Dave -- yes, I could try to avoid it by setting a timeout or increasing the queue capacity, but there's still a risk of the condition occurring and therefore of one or more messages failing to process. Ideally, I want the same blocking behaviour as a standard QueueChannel. So, as a workaround, I've created a new class, PriorityBlockingChannel, that calls Semaphore.acquire() instead of Semaphore.tryAcquire(timeout) when timeout is <= 0 (see below).

                  This seems to do the trick, but let me know if you think there will be any problems with this approach.

                  Thanks
                  -Matt

                  Code:
                  ...
                  
                  public class PriorityBlockingChannel extends QueueChannel {
                  	private final Semaphore semaphore;
                  
                  ...
                  
                  	@Override
                  	protected boolean doSend(Message<?> message, long timeout) {
                  		if (!acquirePermitIfNecessary(timeout)) {
                  			return false;
                  		}
                  		return super.doSend(message, 0);
                  	}
                  
                  ...
                  
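                  	// Takes one permit per message; returns false if no permit could be obtained.
                  	private boolean acquirePermitIfNecessary(long timeoutInMilliseconds) {
                  		if (this.semaphore != null) {
                  			try {
                  				if (timeoutInMilliseconds > 0) {
                  					// Positive timeout: wait at most that long for a free slot.
                  					return this.semaphore.tryAcquire(timeoutInMilliseconds, TimeUnit.MILLISECONDS);
                  				}
                  				else {
                  					// Zero or negative timeout (including the default of -1): block until a slot is free.
                  					this.semaphore.acquire();
                  					return true;
                  				}
                  			}
                  			catch (InterruptedException e) {
                  				// Restore the interrupt flag and report the send as failed.
                  				Thread.currentThread().interrupt();
                  				return false;
                  			}
                  		}
                  		return true;
                  	}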
                  
                  ...
                  
                  }
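Because the workaround above is posted with elisions, here is a separate, self-contained sketch of the same idea: a semaphore caps the size of an otherwise unbounded `PriorityBlockingQueue`, and a non-positive timeout means "block until there is room". `BoundedPriorityBuffer` is a hypothetical stand-in written only against the JDK; it is not the PriorityBlockingChannel class from this post, nor the fix that went into Spring Integration.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a capacity-limited priority channel (JDK only).
class BoundedPriorityBuffer<T extends Comparable<T>> {

    private final PriorityBlockingQueue<T> queue = new PriorityBlockingQueue<>();
    private final Semaphore permits;

    BoundedPriorityBuffer(int capacity) {
        this.permits = new Semaphore(capacity, true);
    }

    // timeoutMillis > 0: wait up to that long for a free slot.
    // timeoutMillis <= 0: wait indefinitely (the convention used in the workaround).
    boolean send(T item, long timeoutMillis) {
        try {
            if (timeoutMillis > 0) {
                if (!permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    return false;
                }
            } else {
                permits.acquire();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        // The underlying queue is unbounded, so offer() always succeeds here.
        return queue.offer(item);
    }

    // Non-blocking receive: returns the smallest item, or null if the buffer is empty.
    T receive() {
        T item = queue.poll();
        if (item != null) {
            permits.release(); // hand the slot back to waiting senders
        }
        return item;
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedPriorityBuffer<Integer> buffer = new BoundedPriorityBuffer<>(2);
        buffer.send(5, -1);
        buffer.send(1, -1); // buffer is now full

        System.out.println(buffer.send(3, 50)); // prints false: timed out waiting for room

        // A consumer frees one slot after a short delay.
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            buffer.receive();
        });
        consumer.start();

        System.out.println(buffer.send(3, -1)); // prints true once the consumer has made room
        consumer.join();
    }
}
```

One consequence of this convention is that a timeout of exactly 0 also blocks, whereas `Semaphore.tryAcquire` treats 0 as "do not wait". Whether that matters depends on whether any caller relies on 0 meaning a non-blocking attempt.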


                  • #10
                    Thanks again for raising the issue. I've fixed it on trunk as well as the 1.0.x branch. The 2.0 M6 release will be out very soon (probably today), and 1.0.5 will likely be out within days or a week.


                    • #11
                      Mark - Thanks for the quick response.

                      Regards
                      -Matt
