  • Queue channel and multiple consumers per message

    Hi,

    I use a UDP inbound channel adapter to receive UDP datagrams. Each incoming message is stored in a queue channel.

    I want two services to process each incoming message but I know that each message in the queue may be consumed by exactly one consumer.

    I thought about using a publish/subscribe scheme but I don't know if it will work since I may receive a lot of messages at the same time and I don't know how this situation is handled.

    I've tried using a chain with a transformer, one service activator for the first service and another for the second service. However, only the first declared service is called. Is that expected?

    What is the best solution for this case?

    Thanks,
    Mickael

  • #2
    A <publish-subscribe/> channel will work fine for you. You don't have to worry about there being many messages; the framework will send each message to each of the subscribers.

    You could also choose to use the <recipient-list-router />, described here - http://static.springsource.org/sprin...ientlistrouter



    • #3
      How does the publish/subscribe model work if there are many messages? The UDP inbound channel adapter has several threads (5 by default) that are ready to receive incoming datagrams. How are the messages then dispatched to the different subscribers? Are there as many subscriber threads as there are incoming messages?

      I have some difficulty understanding this model where multi-threading is concerned.

      Thanks for your help.



      • #4
        There is one thread that does nothing except read packets from the socket. We don't do any processing on this thread, to give the best chance to avoid missing any packets. This thread is obtained from the Spring Integration global task scheduler.

        When a packet is received, it is handed off to another thread; the send to the channel is asynchronously processed by creating the message and executing the send via a TaskExecutor. You can supply your own task executor (via setTaskExecutor()) such as a CachedThreadPool, which has no limits on the number of threads. If you don't supply your own TaskExecutor, you get a FixedThreadPoolTaskExecutor, with a (default) thread pool size of 5.

        When each message is processed, if the channel is <publish-subscribe/> the selected thread will send the message to the first consumer and then to the second consumer.

        I hope that helps to explain the threading model.
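
The dispatch behaviour described above can be sketched with plain JDK classes. This is only an illustration under stated assumptions: `SimplePubSub` is a hypothetical stand-in, not the Spring Integration channel, and the fixed pool of worker threads plays the role of the adapter's task executor. Each worker thread delivers a message to the first subscriber and then to the second, so both deliveries for a given message happen on the same thread, in order.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for a publish-subscribe channel (not the Spring Integration class).
class SimplePubSub {
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Delivers the message to every subscriber in turn, on the caller's thread.
    void send(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }

    // Sends `count` messages through a pool of `poolSize` threads and
    // returns a log of "subscriber:message:thread" entries.
    static List<String> demo(int poolSize, int count) {
        List<String> log = new CopyOnWriteArrayList<>();
        SimplePubSub channel = new SimplePubSub();
        channel.subscribe(m -> log.add("first:" + m + ":" + Thread.currentThread().getName()));
        channel.subscribe(m -> log.add("second:" + m + ":" + Thread.currentThread().getName()));

        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < count; i++) {
            String message = "datagram-" + i;
            // Asynchronous hand-off: the submitting thread does no processing itself.
            pool.execute(() -> channel.send(message));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        demo(5, 3).forEach(System.out::println);
    }
}
```

Running `main` prints six log lines (two per message); the interleaving between different messages varies from run to run, but each message's "first" entry precedes its "second" entry and names the same thread.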



        • #5
          Originally posted by Gary Russell
          There is one thread that does nothing except read packets from the socket. We don't do any processing on this thread, to give the best chance to avoid missing any packets. This thread is obtained from the Spring Integration global task scheduler.

          When a packet is received, it is handed off to another thread; the send to the channel is asynchronously processed by creating the message and executing the send via a TaskExecutor. You can supply your own task executor (via setTaskExecutor()) such as a CachedThreadPool, which has no limits on the number of threads. If you don't supply your own TaskExecutor, you get a FixedThreadPoolTaskExecutor, with a (default) thread pool size of 5.

          Ok, so if many UDP datagrams arrive within a very short period, I guess 5 threads may not be enough? In that case, is there a way to add more threads to the FixedThreadPoolTaskExecutor at runtime? I see that the Executor is created when the thread is started (method checkTaskExecutor), so I don't think it is possible to add more threads through the poolSize property. Am I right?

          Should I use my own TaskExecutor in that case? Does the CachedThreadPool provide this feature (configuration at runtime)?


          Originally posted by Gary Russell
          When each message is processed, if the channel is <publish-subscribe/> the selected thread will send the message to the first consumer and then to the second consumer.

          If a DirectChannel is used, the consumers' processing is done by default inside the publisher's send method (on the selected thread). If this processing takes a long time, many threads from the CachedThreadPool will be in use, since UDP datagrams keep arriving and more and more pool threads are busy executing the consumers' processing. The other possibility would be to execute the consumers' processing on new threads (by using an ExecutorChannel?). But in that case, there will be more threads on the consumer side.

          Why not use a queue and use fewer threads? What is the tradeoff? Am I forced to follow the publish/subscribe model only because the QueueChannel doesn't allow multiple consumers to receive the same message?

          Thanks for your clear explanation.



          • #6
            The default is a Fixed thread pool so, no, it can't be adjusted at run time.

            There are lots of Executors; you can pick whichever one you want. ThreadPoolTaskExecutor has a core pool size and a max pool size, which means the pool is adjusted as needed; CachedThreadPool reuses threads but has no limit. You simply choose one that's appropriate for your environment.
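
As a rough illustration of an executor whose size can change after creation, the sketch below uses the JDK's `java.util.concurrent.ThreadPoolExecutor` directly. It is an assumption-laden example, not the adapter's own code: `ResizablePool`, `create` and `resize` are hypothetical names, and how such an executor is handed to the adapter is as described in the thread (`setTaskExecutor()`). Note that with an unbounded work queue the pool never grows past its core size, so the helper keeps core and max equal.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper around the JDK ThreadPoolExecutor.
class ResizablePool {

    // Creates a pool with the given core and maximum sizes and an unbounded queue.
    static ThreadPoolExecutor create(int core, int max) {
        return new ThreadPoolExecutor(core, max, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
    }

    // Changes the pool size at runtime. The order of the two setters matters:
    // the core size must never exceed the maximum size at any point.
    static void resize(ThreadPoolExecutor pool, int size) {
        if (size > pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(size); // growing: raise the ceiling first
            pool.setCorePoolSize(size);
        } else {
            pool.setCorePoolSize(size);    // shrinking: lower the core first
            pool.setMaximumPoolSize(size);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = create(5, 5);
        resize(pool, 10);
        System.out.println(pool.getCorePoolSize() + " / " + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

Whether resizing at runtime is worthwhile depends on the environment; a pool that is simply sized generously up front is often enough.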

            Let's say you have a fixed pool of (max) 5 threads, and 10 messages show up; this doesn't mean you'll lose messages; just that #6 through #10 will only be processed when one of the other messages is consumed (by both consumers).
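
The fixed-pool scenario above (5 threads, 10 messages) can be reproduced with a small JDK-only sketch. The class name `FixedPoolBacklog` and the 50 ms sleep standing in for consumer processing are assumptions made for illustration. The extra messages wait in the executor's internal queue until a thread becomes free, so all of them are eventually processed while no more than `poolSize` run at once.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: more messages than threads in a fixed pool.
class FixedPoolBacklog {

    // Returns {number of messages processed, peak number processed concurrently}.
    static int[] run(int poolSize, int messages) {
        AtomicInteger processed = new AtomicInteger();
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < messages; i++) {
            pool.execute(() -> {
                int now = active.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the highest concurrency seen
                try {
                    Thread.sleep(50); // stand-in for both consumers handling the message
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                active.decrementAndGet();
                processed.incrementAndGet();
            });
        }
        pool.shutdown(); // already-queued tasks still run after shutdown()
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { processed.get(), peak.get() };
    }

    public static void main(String[] args) {
        int[] result = run(5, 10);
        System.out.println("processed=" + result[0] + ", peak concurrency=" + result[1]);
    }
}
```

This only models messages that have already been handed to the executor; it says nothing about datagrams dropped before they are read from the socket.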

            For this particular adapter, there is no benefit of using a queue channel because there is already an async handoff in the adapter (needed because of the nature of UDP).

            I don't think you need to worry; just use a <publish-subscribe-channel/> and an executor with the characteristics you want.
