Announcement Announcement Module
No announcement yet.
Why threads are terminated ? Page Title Module
Move Remove Collapse
Conversation Detail Module
  • Filter
  • Time
  • Show
Clear All
new posts

  • Why threads are terminated ?

    I create some thread by using dispatcher in channel and task-executor.
    But when method(where SI was invoked) finished and his thread is terminated, all other SI threads die.
    public interface Foo {
        void doSmth(Complex complex);
    <gateway id="doingSmth" service-interface="Foo"
    		default-request-channel="complex" />
    	<channel id="complex" />
    	<splitter input-channel="complex" method="split"
    		output-channel="item" ref="..." />
    	<channel id="item">
    		<dispatcher task-executor="ex1" />
    			<wire-tap channel="testAfterSplitter" />
    	<task:executor id="ex1" pool-size="..."
    		queue-capacity="..." rejection-policy="..." />
    	<service-activator id="ch1" ref="..."
    		input-channel="item" output-channel="toRoute1">
        public void testDoSmth() {
            for (int i = 0; i < 100; i++) {
            try {
            } catch (InterruptedException e) {
    If I run with timeout 10000 or 20000 all items are processed.
    Otherwise, if 1000 or 5000 timeout, or without him, only first part of items was processed.

    What can I do that all items are being processed (after finish of my tests or main() metod) as much time as they needed.

    I was trying put different parameters for task-executor, but result is the same for each time.

  • #2
    The test method is going to exit as soon as the timeout occurs, because that is the side that is doing the async handoffs (so, as far as it's concerned, there's nothing left to do). For testing purposes, if you want to ensure that the test remains active as long as the downstream processing is happening, then you can use an approach like CountDownLatch that is shared by the test and the downstream service.


    • #3
      Is that mean, sleep is fully normal. And there isn't other way for me in this case ?

      How I must invoke doSmth() to full processing all of the items?


      • #4
        Well, this is a situation that pertains to the unit test execution behavior. If you do the same thing from a main() method, does it complete?


        • #5

          Currently it is a little bit tricky to make sure that all messages send into the message infrastructure are processed before going down or checking something.
          I think that the spring-integration team is working for a solution for 2.1.0 .

          I do not know what you really want to achieve with your code, but if the scenario is to shutdown safe (process everything before going down), than one possible solution is:
          1. setWaitForTasksToCompleteOnShutdown(true) in your TaskScheduler (possibly this could not be set with the task tag and you have to create the executor with the bean tag).
          2. Call the destroy() method on the TaskScheduler.
          3. Get the underlying ThreadPoolExecutor from the TaskScheduler and call awaitTermination(...) with infinite timeout, which will block until all Runnables in the ThreadPoolExecutor were processed.
          4. Let spring shutdown the application context, whatever.

          This is just one possibility, which might not work very well, if you have got more than one test method.



          • #6
            Actually, Philipp's suggestion is right on track. As he mentioned, you would need to create the TaskExecutor instance as a normal <bean> definition in order to set the 'waitForTasksToCompleteOnShutdown' property since that is not exposed via the namespace schema (feel free to open a feature request on the Spring Framework itself for that one).


            • #7
              Thanks for good idea!
              But I did next hook.
              public class MyThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
                  protected ExecutorService initializeExecutor(ThreadFactory threadFactory,
                          RejectedExecutionHandler rejectedExecutionHandler) {
                      super.initializeExecutor(threadFactory, rejectedExecutionHandler);
                      MyThreadPoolExecutor executor = new MyThreadPoolExecutor(getThreadPoolExecutor());
                      try {
                          Field threadPoolExecutor = ThreadPoolTaskExecutor.class
                          threadPoolExecutor.set(this, executor);
                      } catch 
                      return executor;
                  volatile private int threadNumber = 0;
                  public void setThreadNumber(int n) {
                      threadNumber = n;
                      ((MyThreadPoolExecutor) getThreadPoolExecutor()).reset();
                  public boolean isNotComplete() {
                      int currentNumberOfThreades = ((MyThreadPoolExecutor) getThreadPoolExecutor()).getCount();
                      return currentNumberOfThreades < threadNumber;
              public class MyThreadPoolExecutor extends ThreadPoolExecutor {
                  public MyThreadPoolExecutor(ThreadPoolExecutor executor) {
                      super(executor.getCorePoolSize(), executor.getMaximumPoolSize(), executor
                              .getKeepAliveTime(TimeUnit.SECONDS), TimeUnit.SECONDS, executor.getQueue(),
                              executor.getThreadFactory(), executor.getRejectedExecutionHandler());
                  private int numberOfFinishedThreads = 0;
                  public void afterExecute(Runnable r, Throwable t) {
                  public int getCount() {
                      return numberOfFinishedThreads;
                  public void reset() {
                      numberOfFinishedThreads = 0;
              and this kind of timeout in junit tests , where ex1 is task-executor from context
                  private MyThreadPoolTaskExecutor ex1;
                  boolean isNotComplete = ex1.isNotComplete();
                  while (isNotComplete) {
                      isNotComplete = ex1.isNotComplete();
              I do not know whether this is correct, but it works exactly as I need


              • #8

                I do not get all of your code, but if you only want to poll if a well known number of Runnables (not Threads!) were processed, than the following code would also do the job for you:

                long numberOfCompletedRunnablesBefore = threadPoolExecutor.getCompletedTaskCount();
                //Now your message input test code follows
                while(threadPoolExecutor.getCompletedTaskCount() - numberOfCompletedRunnablesBefore < wellKnownNumberOfToBeProcessedRunnables) {
                //Now your test validation code follows

                You can also enhance the ThreadPoolTaskExecutor in that way, that it will inform you, if committedRunnables == processedRunnables. Or take a close look at the Quartz Scheduler project which offers nice job listening support (notification when job started, finished, ...).
                You can than work with a java.util.concurrent.locks.Condition or simple object.notify() to wake up your test method, if jobStartedCount == jobFinishedCount.
                Spring gives support for the Quartz Scheduler project out of the box.