Questions about parallel job execution

  • Questions about parallel job execution

    Hi there, I ran ParallelJobFunctionalTests on 1.0.0.FINAL and have some questions.

    1. Thread number
    As I understand it (correct me if I'm wrong), SimpleAsyncTaskExecutor does not limit the number of concurrent threads by default. So if each thread processes one item, the number of threads should equal the total item count. But in the log of ParallelJobFunctionalTests the thread count is larger than the item count: 5 items were processed, yet 9 SimpleAsyncTaskExecutor threads appear, all of them logging from StagingItemReader.

    Code:
    17:44:50,428  INFO main StagingItemReader:75 - Keys obtained for staging.
    17:44:50,460 DEBUG main StagingItemReader:184 - Clearing buffer on commit: []
    17:44:50,475 DEBUG SimpleAsyncTaskExecutor-1 StagingItemReader:140 - Retrieved key from list: 231
    17:44:50,569 DEBUG SimpleAsyncTaskExecutor-1 TradeWriter:50 - Trade: [isin=UK21341EAH41,quantity=211,price=31.11,customer=customer1]
    17:44:50,585 DEBUG SimpleAsyncTaskExecutor-1 JdbcTradeDao:53 - Processing: Trade: [isin=UK21341EAH41,quantity=211,price=31.11,customer=customer1]
    17:44:50,616  INFO SimpleAsyncTaskExecutor-1 ProcessorLogAdvice:53 - Processed: Trade: [isin=UK21341EAH41,quantity=211,price=31.11,customer=customer1]
    17:44:50,632 DEBUG SimpleAsyncTaskExecutor-1 StagingItemReader:184 - Clearing buffer on commit: [231]
    17:45:03,679 DEBUG SimpleAsyncTaskExecutor-2 StagingItemReader:140 - Retrieved key from list: 232
    17:45:03,679 DEBUG SimpleAsyncTaskExecutor-3 StagingItemReader:140 - Retrieved key from list: 233
    17:45:03,726 DEBUG SimpleAsyncTaskExecutor-3 TradeWriter:50 - Trade: [isin=UK21341EAH43,quantity=213,price=33.11,customer=customer3]
    17:45:03,757 DEBUG SimpleAsyncTaskExecutor-3 JdbcTradeDao:53 - Processing: Trade: [isin=UK21341EAH43,quantity=213,price=33.11,customer=customer3]
    17:45:03,772  INFO SimpleAsyncTaskExecutor-3 ProcessorLogAdvice:53 - Processed: Trade: [isin=UK21341EAH43,quantity=213,price=33.11,customer=customer3]
    17:45:03,788 DEBUG SimpleAsyncTaskExecutor-3 StagingItemReader:184 - Clearing buffer on commit: [233]
    17:45:05,898 DEBUG SimpleAsyncTaskExecutor-2 TradeWriter:50 - Trade: [isin=UK21341EAH42,quantity=212,price=32.11,customer=customer2]
    17:45:05,944 DEBUG SimpleAsyncTaskExecutor-2 JdbcTradeDao:53 - Processing: Trade: [isin=UK21341EAH42,quantity=212,price=32.11,customer=customer2]
    17:45:06,304  INFO SimpleAsyncTaskExecutor-2 ProcessorLogAdvice:53 - Processed: Trade: [isin=UK21341EAH42,quantity=212,price=32.11,customer=customer2]
    17:45:24,023 DEBUG SimpleAsyncTaskExecutor-4 StagingItemReader:140 - Retrieved key from list: 234
    17:45:25,273 DEBUG SimpleAsyncTaskExecutor-4 TradeWriter:50 - Trade: [isin=UK21341EAH44,quantity=214,price=34.11,customer=customer4]
    17:45:25,617 DEBUG SimpleAsyncTaskExecutor-4 JdbcTradeDao:53 - Processing: Trade: [isin=UK21341EAH44,quantity=214,price=34.11,customer=customer4]
    17:45:26,085  INFO SimpleAsyncTaskExecutor-4 ProcessorLogAdvice:53 - Processed: Trade: [isin=UK21341EAH44,quantity=214,price=34.11,customer=customer4]
    17:45:49,758 DEBUG SimpleAsyncTaskExecutor-5 StagingItemReader:140 - Retrieved key from list: 235
    17:45:49,805 DEBUG SimpleAsyncTaskExecutor-2 StagingItemReader:184 - Clearing buffer on commit: [232]
    17:45:50,320 DEBUG SimpleAsyncTaskExecutor-4 StagingItemReader:184 - Clearing buffer on commit: [234]
    17:45:50,461 DEBUG SimpleAsyncTaskExecutor-6 StagingItemReader:184 - Clearing buffer on commit: []
    17:45:50,539 DEBUG SimpleAsyncTaskExecutor-7 StagingItemReader:184 - Clearing buffer on commit: []
    17:45:50,633 DEBUG SimpleAsyncTaskExecutor-8 StagingItemReader:184 - Clearing buffer on commit: []
    17:45:50,711 DEBUG SimpleAsyncTaskExecutor-9 StagingItemReader:184 - Clearing buffer on commit: []
    17:45:51,148 DEBUG SimpleAsyncTaskExecutor-5 TradeWriter:50 - Trade: [isin=UK21341EAH45,quantity=215,price=35.11,customer=customer5]
    17:45:51,492 DEBUG SimpleAsyncTaskExecutor-5 JdbcTradeDao:53 - Processing: Trade: [isin=UK21341EAH45,quantity=215,price=35.11,customer=customer5]
    17:45:52,133  INFO SimpleAsyncTaskExecutor-5 ProcessorLogAdvice:53 - Processed: Trade: [isin=UK21341EAH45,quantity=215,price=35.11,customer=customer5]
    17:45:53,383 DEBUG SimpleAsyncTaskExecutor-5 StagingItemReader:184 - Clearing buffer on commit: [235]
    17:45:53,680  INFO main SimpleJobLauncher:87 - Job: [SimpleJob: [name=parallelJob]] completed successfully with the following parameters: [{}{}{}{}]
    17:45:53,820  INFO Thread-3 SessionFactoryImpl:769 - closing

    2. Usage of concurrencyLimit in SimpleAsyncTaskExecutor

    According to the Spring 2.5.x Javadoc, SimpleAsyncTaskExecutor supports limiting the number of concurrent threads through the "concurrencyLimit" bean property. But when I set concurrencyLimit to 2, the log still shows 7 threads logging from StagingItemReader:

    Code:
    17:47:34,744  INFO main StagingItemReader:75 - Keys obtained for staging.
    17:47:34,807 DEBUG main StagingItemReader:184 - Clearing buffer on commit: []
    17:47:34,822 DEBUG SimpleAsyncTaskExecutor-1 StagingItemReader:140 - Retrieved key from list: 261
    17:47:34,900 DEBUG SimpleAsyncTaskExecutor-1 TradeWriter:50 - Trade: [isin=UK21341EAH41,quantity=211,price=31.11,customer=customer1]
    17:47:34,947 DEBUG SimpleAsyncTaskExecutor-1 JdbcTradeDao:53 - Processing: Trade: [isin=UK21341EAH41,quantity=211,price=31.11,customer=customer1]
    17:47:35,088  INFO SimpleAsyncTaskExecutor-1 ProcessorLogAdvice:53 - Processed: Trade: [isin=UK21341EAH41,quantity=211,price=31.11,customer=customer1]
    17:47:35,135 DEBUG SimpleAsyncTaskExecutor-1 StagingItemReader:184 - Clearing buffer on commit: [261]
    17:47:46,276 DEBUG SimpleAsyncTaskExecutor-2 StagingItemReader:140 - Retrieved key from list: 262
    17:47:46,276 DEBUG SimpleAsyncTaskExecutor-3 StagingItemReader:140 - Retrieved key from list: 263
    17:47:46,416 DEBUG SimpleAsyncTaskExecutor-2 TradeWriter:50 - Trade: [isin=UK21341EAH42,quantity=212,price=32.11,customer=customer2]
    17:47:46,447 DEBUG SimpleAsyncTaskExecutor-2 JdbcTradeDao:53 - Processing: Trade: [isin=UK21341EAH42,quantity=212,price=32.11,customer=customer2]
    17:47:46,479  INFO SimpleAsyncTaskExecutor-2 ProcessorLogAdvice:53 - Processed: Trade: [isin=UK21341EAH42,quantity=212,price=32.11,customer=customer2]
    17:47:46,494 DEBUG SimpleAsyncTaskExecutor-2 StagingItemReader:184 - Clearing buffer on commit: [262]
    17:47:46,510 DEBUG SimpleAsyncTaskExecutor-4 StagingItemReader:140 - Retrieved key from list: 264
    17:47:46,713 DEBUG SimpleAsyncTaskExecutor-4 TradeWriter:50 - Trade: [isin=UK21341EAH44,quantity=214,price=34.11,customer=customer4]
    17:47:46,744 DEBUG SimpleAsyncTaskExecutor-4 JdbcTradeDao:53 - Processing: Trade: [isin=UK21341EAH44,quantity=214,price=34.11,customer=customer4]
    17:47:46,823  INFO SimpleAsyncTaskExecutor-4 ProcessorLogAdvice:53 - Processed: Trade: [isin=UK21341EAH44,quantity=214,price=34.11,customer=customer4]
    17:47:46,838 DEBUG SimpleAsyncTaskExecutor-4 StagingItemReader:184 - Clearing buffer on commit: [264]
    17:47:46,854 DEBUG SimpleAsyncTaskExecutor-5 StagingItemReader:140 - Retrieved key from list: 265
    17:47:46,916 DEBUG SimpleAsyncTaskExecutor-3 TradeWriter:50 - Trade: [isin=UK21341EAH43,quantity=213,price=33.11,customer=customer3]
    17:47:46,948 DEBUG SimpleAsyncTaskExecutor-3 JdbcTradeDao:53 - Processing: Trade: [isin=UK21341EAH43,quantity=213,price=33.11,customer=customer3]
    17:47:46,994  INFO SimpleAsyncTaskExecutor-3 ProcessorLogAdvice:53 - Processed: Trade: [isin=UK21341EAH43,quantity=213,price=33.11,customer=customer3]
    17:47:47,010 DEBUG SimpleAsyncTaskExecutor-3 StagingItemReader:184 - Clearing buffer on commit: [263]
    17:47:47,010 DEBUG SimpleAsyncTaskExecutor-5 TradeWriter:50 - Trade: [isin=UK21341EAH45,quantity=215,price=35.11,customer=customer5]
    17:47:47,057 DEBUG SimpleAsyncTaskExecutor-6 StagingItemReader:184 - Clearing buffer on commit: []
    17:47:47,057 DEBUG SimpleAsyncTaskExecutor-5 JdbcTradeDao:53 - Processing: Trade: [isin=UK21341EAH45,quantity=215,price=35.11,customer=customer5]
    17:47:47,073  INFO SimpleAsyncTaskExecutor-5 ProcessorLogAdvice:53 - Processed: Trade: [isin=UK21341EAH45,quantity=215,price=35.11,customer=customer5]
    17:47:47,088 DEBUG SimpleAsyncTaskExecutor-7 StagingItemReader:184 - Clearing buffer on commit: []
    17:47:47,135 DEBUG SimpleAsyncTaskExecutor-5 StagingItemReader:184 - Clearing buffer on commit: [265]
    17:47:47,432  INFO main SimpleJobLauncher:87 - Job: [SimpleJob: [name=parallelJob]] completed successfully with the following parameters: [{}{}{}{}]
    17:47:47,463  INFO Thread-3 SessionFactoryImpl:769 - closing

    Could anyone tell me what's going on?

  • #2
    The SimpleAsyncTaskExecutor simply creates a new Thread for each task, so seeing N thread ids in the logs does not mean that there were N concurrent executions. (As far as I know, there is no way to tell the level of concurrency from these logs.) ThreadPoolTaskExecutor reuses pooled threads, so it might behave more as you expect, in the logs at least.
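
    To illustrate the point, here is a minimal plain-Java sketch. It is not Spring's code: ThreadPerTaskDemo, its Result class and the "DemoExecutor-" thread names are made up for this example, and the Semaphore is only an assumed stand-in for what concurrencyLimit does. It starts one new thread per task while allowing at most a fixed number to run at once, then reports how many distinct thread names appeared and how many tasks actually overlapped.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a thread-per-task executor with a concurrency limit.
// Not Spring's implementation; names and structure are assumptions for illustration.
public class ThreadPerTaskDemo {

    /** Outcome of one run: thread names seen vs. tasks that really ran at the same time. */
    public static final class Result {
        public final int distinctThreads;
        public final int peakConcurrency;

        Result(int distinctThreads, int peakConcurrency) {
            this.distinctThreads = distinctThreads;
            this.peakConcurrency = peakConcurrency;
        }
    }

    public static Result run(int taskCount, int concurrencyLimit) throws InterruptedException {
        Semaphore permits = new Semaphore(concurrencyLimit);
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(taskCount);

        for (int i = 1; i <= taskCount; i++) {
            // The submitter waits here until a slot is free (assumed throttling behaviour).
            permits.acquire();
            // A brand-new thread, with a new name, for every single task.
            Thread worker = new Thread(() -> {
                try {
                    threadNames.add(Thread.currentThread().getName());
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(20); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    permits.release();
                    done.countDown();
                }
            }, "DemoExecutor-" + i);
            worker.start();
        }

        done.await(); // wait for every task to finish
        return new Result(threadNames.size(), peak.get());
    }

    public static void main(String[] args) throws InterruptedException {
        Result r = run(7, 2);
        System.out.println("distinct thread names: " + r.distinctThreads);
        System.out.println("peak concurrent tasks: " + r.peakConcurrency);
    }
}
```

    With 7 tasks and a limit of 2, the sketch reports 7 distinct thread names while the peak number of overlapping tasks never exceeds 2. Counting thread ids in a log therefore measures how many tasks were started, not how many ran concurrently.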
