Announcement Announcement Module
Collapse
No announcement yet.
Threads for Queue and Executor Channel Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Threads for Queue and Executor Channel

    I want to determine what channel to use in my application. My setup is this:

    1. A sender thread creates 2 request messages and put these in a Queue Channel (same time)
    2. A router will poll the queue and routes it to a channel of a service
    3. I created 2 services: the QueueChannelService which has a queue input channel and an ExecutorChannelService which has an executor input channel. When the process() methods are called, I delay it [Thread.sleep(5000)] for 5 seconds before putting the responses to a queue output channel
    4. A receiver thread polls the output channel for responses and prints the message

    These are the codes/cfg:

    Code:
    //FOR SENDER THREAD
    
    private QueueChannel channel;
    
    run() {
         channel.send(new GenericMessage<Request>(r));
         channel.send(new GenericMessage<Request>(r2));
    }
    
    //FOR RECEIVER THREAD
    
    private QueueChannel channel;
    
    while (!Thread.interrupted()) {
    	Response r = (Response) channel.receive().getPayload();
    	log.info("Received " + r);
    }
    
    //SERVICE ACTIVATORS CFG
    <si:channel id="channel1">
    	<si:dispatcher task-executor="pool"/>
    </si:channel>
    <bean id="pool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    	   <property name="corePoolSize"><value>10</value></property>
    	   <property name="maxPoolSize"><value>10</value></property>
    	   <property name="keepAliveSeconds"><value>120</value></property>
    </bean>
    <si:service-activator input-channel="channel1" ref="ExecutorChannelServiceBean" method="process" output-channel="outputChannel"/>
    
    <si:channel id="channel2">
    	<si:queue capacity="10"/>
    </si:channel>
    <si:service-activator input-channel="channel2" ref="QueueChannelServiceBean" method="process" output-channel="outputChannel"/>
    
    //ROUTER CFG
    <si:poller default="true" max-messages-per-poll="1000">
    	<si:interval-trigger interval="100"/>
    </si:poller>
    	
    <si:router id="requestRouter" ref="RouterBean" input-channel="inputChannel" method="route"/>
    
    //INPUT and OUTPUT CHANNELS
    <si:channel id="inputChannel">
    	<si:queue capacity="10"/>
    </si:channel>
    	
    <si:channel id="outputChannel">
    	<si:queue capacity="10"/>
    </si:channel>
    For first run, the Router routes the message to the executor channel service. This is the output:
    Code:
    2010-10-26 16:45:00,267 INFO  Sender [Thread-1] Sending Request[channel1, message1]
    2010-10-26 16:45:00,288 INFO  Sender [Thread-1] Sending Request[channel1, message1]
    2010-10-26 16:45:00,304 INFO  QueueChannelRouter [task-scheduler-2] Routing Request[channel1, message1]
    2010-10-26 16:45:00,309 INFO  ExecutorChannelService [pool-1] Processing request Request[channel1, message1]
    2010-10-26 16:45:00,310 INFO  QueueChannelRouter [task-scheduler-2] Routing Request[channel1, message1]
    2010-10-26 16:45:00,310 INFO  ExecutorChannelService [pool-2] Processing request Request[channel1, message1]
    2010-10-26 16:45:05,310 INFO  ExecutorChannelService [pool-1] Done processing request Request[channel1, message1]Response is Response[EXECUTOR, Request[channel1, message1]]
    2010-10-26 16:45:05,311 INFO  ExecutorChannelService [pool-2] Done processing request Request[channel1, message1]Response is Response[EXECUTOR, Request[channel1, message1]]
    2010-10-26 16:45:05,313 INFO  Receiver [Thread-2] Received Response[EXECUTOR, Request[channel1, message1]]
    2010-10-26 16:45:05,313 INFO  Receiver [Thread-2] Received Response[EXECUTOR, Request[channel1, message1]]
    As you can see, both request messages were put to the channel at 16:45:00. Since the delay I set was 5 secs, both responses were received at 16:45:05.

    For my second run, the Router routes the message to the queue channel service. This is the output:
    Code:
    2010-10-26 16:47:58,651 INFO  Sender [Thread-1] Sending Request[channel2, message2]
    2010-10-26 16:47:58,677 INFO  Sender [Thread-1] Sending Request[channel2, message2]
    2010-10-26 16:47:58,687 INFO  QueueChannelRouter [task-scheduler-2] Routing Request[channel2, message2]
    2010-10-26 16:47:58,690 INFO  QueueChannelRouter [task-scheduler-2] Routing Request[channel2, message2]
    2010-10-26 16:47:58,690 INFO  QueueChannelService [task-scheduler-10] Processing request Request[channel2, message2]
    2010-10-26 16:48:03,690 INFO  QueueChannelService [task-scheduler-10] Done processing request Request[channel2, message2]Response is Response[QUEUED, Request[channel2, message2]]
    2010-10-26 16:48:03,693 INFO  Receiver [Thread-2] Received Response[QUEUED, Request[channel2, message2]]
    2010-10-26 16:48:03,693 INFO  QueueChannelService [task-scheduler-10] Processing request Request[channel2, message2]
    2010-10-26 16:48:08,693 INFO  QueueChannelService [task-scheduler-10] Done processing request Request[channel2, message2]Response is Response[QUEUED, Request[channel2, message2]]
    2010-10-26 16:48:08,694 INFO  Receiver [Thread-2] Received Response[QUEUED, Request[channel2, message2]]
    Both requests were sent at 16:47:58 but responses were received at 16:48:03 and 16:48:08. Why is that? I thought Queued Channel and Executor Channel will have the same function. Is there something wrong with my queue channel cfg?

    Thanks!

  • #2
    What version of Spring Integration are you using?

    Thanks,
    Mark

    Comment


    • #3
      I'm using the stable release spring-integration-1.0.4.RELEASE

      By the way, instead of using <task:executor/>, I used Spring's org.springframework.scheduling.concurrent.ThreadPo olTaskExecutor

      Thanks!
      Last edited by racumin; Oct 26th, 2010, 08:56 PM.

      Comment

      Working...
      X