Announcement Announcement Module
Collapse
No announcement yet.
Strange behaviour with message-channel-adapter Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Strange behaviour with message-channel-adapter

    I have an message-driven-channel-adapter that is exhibiting some strange behaviour. It is defined as follows

    [code]<jms:message-driven-channel-adapter id="requestsGateway" channel="requestsChannel" message-converter="messageConverter" connection-factory="tibcoTopicConnectionFactory" destination="rxTopic" />[code]

    This is writing messages to the following channel definition
    [code]
    <si:thread-pool-task-executor id="requestsPool" core-size="1" max-size="1"/>
    <siublish-subscribe-channel id="requestsChannel" task-executor="requestsPool"/>
    [code]

    What I am trying to achieve is to have messages read from a topic and queued in my app for processing then have them processed by a single thread as soon as messages are available.

    The publish-subscribe-channel seems to work well in 99% of cases. However I am seeing occasionally that the MessageListener thread will still be used in some circumstances for processing messages.

    Below is an example

    [code]
    2008-12-12 09:08:36,986 INFO [org.springframework.jms.listener.DefaultMessageLis tenerContainer#0-1] order.TxMessageConverter - Got message : ObjectMessage={ Header={ JMSMessageID={ID:EMS-SERVER.1944928C608279A:880} JMSDestination={Topic[topic.rx]} JMSReplyTo={null} JMSDeliveryMode={PERSISTENT} JMSRedelivered={false} JMSCorrelationID={null} JMSType={null} JMSTimestamp={Fri Dec 12 09:08:36 GMT 2008} JMSExpiration={0} JMSPriority={4} } Properties={ spring.integration.timestamp={Long:1229072916986} } Object={} }
    2008-12-12 09:08:36,986 INFO [requestsPool-1] order.OmsActionProcessorImpl - requestValidator received : ....
    .
    2008-12-12 09:08:37,345 INFO [org.springframework.jms.listener.DefaultMessageLis tenerContainer#0-1] order.TxMessageConverter - Got message : ObjectMessage={ Header={ JMSMessageID={ID:EMS-SERVER.1944928C608279D:5883} JMSDestination={Topic[topic.rx]} JMSReplyTo={null} JMSDeliveryMode={PERSISTENT} JMSRedelivered={false} JMSCorrelationID={null} JMSType={null} JMSTimestamp={Fri Dec 12 09:08:37 GMT 2008} JMSExpiration={0} JMSPriority={4} } Properties={ spring.integration.timestamp={Long:1229072917345} } Object={} }
    2008-12-12 09:08:37,345 INFO [requestsPool-1] order.OmsActionProcessorImpl - requestValidator received : ....
    .
    2008-12-12 09:08:37,377 INFO [org.springframework.jms.listener.DefaultMessageLis tenerContainer#0-1] order.TxMessageConverter - Got message : ObjectMessage={ Header={ JMSMessageID={ID:EMS-SERVER.1944928C608886:881} JMSDestination={Topic[topic.rx]} JMSReplyTo={null} JMSDeliveryMode={PERSISTENT} JMSRedelivered={false} JMSCorrelationID={null} JMSType={null} JMSTimestamp={Fri Dec 12 09:08:37 GMT 2008} JMSExpiration={0} JMSPriority={4} } Properties={ spring.integration.timestamp={Long:1229072917361} spring.integration.sequence-number={Integer:1} spring.integration.sequence-size={Integer:1} } Object={} }
    2008-12-12 09:08:37,377 INFO [requestsPool-1] order.OmsActionProcessorImpl - requestValidator received : ...
    .
    2008-12-12 09:08:37,439 INFO [org.springframework.jms.listener.DefaultMessageLis tenerContainer#0-1] order.TxMessageConverter - Got message : ObjectMessage={ Header={ JMSMessageID={ID:EMS-SERVER.1944928C608885:145} JMSDestination={Topic[topic.rx]} JMSReplyTo={null} JMSDeliveryMode={PERSISTENT} JMSRedelivered={false} JMSCorrelationID={null} JMSType={null} JMSTimestamp={Fri Dec 12 09:08:37 GMT 2008} JMSExpiration={0} JMSPriority={4} } Properties={ spring.integration.timestamp={Long:1229072917439} spring.integration.sequence-number={Integer:1} spring.integration.sequence-size={Integer:1} } Object={} }
    2008-12-12 09:08:37,439 INFO [org.springframework.jms.listener.DefaultMessageLis tenerContainer#0-1] order.OmsActionProcessorImpl - requestValidator received : ...
    .
    2008-12-12 09:08:37,455 INFO [org.springframework.jms.listener.DefaultMessageLis tenerContainer#0-1] order.TxMessageConverter - Got message : ObjectMessage={ Header={ JMSMessageID={ID:EMS-SERVER.1944928C608886:882} JMSDestination={Topic[topic.rx]} JMSReplyTo={null} JMSDeliveryMode={PERSISTENT} JMSRedelivered={false} JMSCorrelationID={null} JMSType={null} JMSTimestamp={Fri Dec 12 09:08:37 GMT 2008} JMSExpiration={0} JMSPriority={4} } Properties={ spring.integration.timestamp={Long:1229072917455} spring.integration.sequence-number={Integer:1} spring.integration.sequence-size={Integer:1} } Object={} }
    2008-12-12 09:08:37,455 INFO [requestsPool-1] order.OmsActionProcessorImpl - requestValidator received : ...
    .
    [code]

    As you can seen from the messge at 2008-12-12 09:08:37,439 is processing by the MesageListener thread and not handed off to the requestPool thread. Is there a reason for this ?

  • #2
    This is a result of the thread pool's rejection policy. The default is "CallerRuns". That means that anytime a task is submitted but all threads in that pool are already occupied with a task, the calling thread will run that task. This provides a very natural way to "throttle" requests since the calling thread is unable to keep throwing tasks to the executor while it is running the otherwise rejected task.

    The policy can be changed. Even with the <thread-pool-task-executor/> element we provide a "rejection-policy" attribute, and it accepts any of the following enumerated values:
    <xsd:enumeration value="ABORT"/>
    <xsd:enumeration value="CALLER_RUNS"/>
    <xsd:enumeration value="DISCARD"/>
    <xsd:enumeration value="DISCARD_OLDEST"/>

    So, as you can see, there are other options. For example, 'ABORT' will throw RejectedExecutionExceptions or 'DISCARD' will ignore the current task altogether. For more information, check out the JavaDoc for java.util.concurrent.ThreadPoolExecutor where each of these are defined as inner classes implementing the 'RejectedExecutionHandler' interface.

    For most Spring Integration use cases, I would expect that CallerRuns is the most logical policy, and that is why we have chosen that as a default. There are valid reasons to change it however. If you have high throughput "events" where there is no harm in ignoring events that arrive very quickly because you will receive another event immediately, you may decide that dropping messages is the best way to handle an overwhelmed task executor. Obviously, that is not a very good choice for the default.

    I hope that clarifies things a bit. Let me know if you have any more questions or comments.

    Comment


    • #3
      Were you able to confirm that this behavior was changing based on the rejection policy?

      Comment


      • #4
        I see what your intention is for this. However I have a couple of points to raise

        1. Each message had completed processing before the next message was recieved so the CALLER_RUNS policy shouldn't have need to kick in. The request pool should have been able to process the request.

        2. I seem to need a different solution to my problem. I want a high speed reader from a pub/sub bus that then queues messages for processing in recieved sequence by a single thread. Also the single thread should be event driven so when it is idle it will immiediately kickoff when a message is new message is recieved.

        Comment


        • #5
          Originally posted by Mark Fisher View Post
          For most Spring Integration use cases, I would expect that CallerRuns is the most logical policy, and that is why we have chosen that as a default. There are valid reasons to change it however. If you have high throughput "events" where there is no harm in ignoring events that arrive very quickly because you will receive another event immediately, you may decide that dropping messages is the best way to handle an overwhelmed task executor. Obviously, that is not a very good choice for the default.
          Hi Mark!

          First of all, thanks for the great tool which the Spring Integration is!

          About the rejection policy:

          I've just also run into this and first was surprised that my service activator occasionally got started in a different thread pool (in a thread named 'task-scheduler-<a_number>') even though I had a separate task executor with its own pool defined for the service activator's poller. So I think that for such a case when there is a thread pool defined for a poller, the default of 'CALLER_RUNS' looks a bit confusing.

          I also think that 'DISCARD' as a default would be really a good choice. Because as I can see in the code and how it works, is that no messages get actually lost with 'DISCARD'. Poller get's scheduled by the main SI's task scheduler (by default), but the scheduler tries to execute the poller task in the task executor defined for the poller (if the poller has its task executor). The poller task consists of first polling the channel queue and then calling the message handler with the message received. And so if the poller's task executor is busy, it would just reject the poller task, that is the channel simply doesn't get polled when there's no thread available to process the message. I think this behavior is good. The only thing is that the most suitable rejection policy for the case seems to be 'DISCARD'.

          Comment

          Working...
          X