  • Integration and simple multithreading

    Hello guys,

    I'd like to use multithreading for each component of the Spring Integration flow in my project. Here is an example XML configuration:
    Code:
    <poller default="true" id="poller" time-unit="SECONDS" fixed-delay="60"/>
    
    	<beans:bean id="inboundFileToStringsSplitter" class="com.shopping.si.splitter.InboundFileToStringsSplitter" />
    	<beans:bean id="inboundStringToOscarSaleDataTransformer" class="com.shopping.si.transformer.InboundStringToOscarSaleDataTransformer" />
    	<beans:bean id="inboundOscarSaleDataProcessor" class="com.shopping.si.sa.InboundOscarSaleDataSA"/>
    	<beans:bean id="successPointsServiceActivator" class="com.shopping.si.sa.SuccessPointsSA" />
    	
    	<channel id="inboundFileChannel"/>
    	<channel id="inboundStringChannel"/>
    	<channel id="inboundOscarSaleDataChannel"/>
    	<channel id="successPointsChannel" />
    
    	<file:inbound-channel-adapter directory="${com.shopping.listen.dir}" channel="inboundFileChannel" id="fileChannelAdapter" />
    
    	<splitter id="fileToStringsSplitter" input-channel="inboundFileChannel" output-channel="inboundStringChannel" ref="inboundFileToStringsSplitter" />
    	<transformer id="stringToOscarSaleDataTransformer" input-channel="inboundStringChannel" output-channel="inboundOscarSaleDataChannel" ref="inboundStringToOscarSaleDataTransformer" />
    
    	<service-activator id="saInboundOscarSaleDataProcessor" input-channel="inboundOscarSaleDataChannel" ref="inboundOscarSaleDataProcessor" />
    	<service-activator id="saSuccessPointsServiceActivator" input-channel="successPointsChannel" ref="successPointsServiceActivator" />
    I've read that you can use task executors for multithreading. But as I understand it, you have to define one for each channel and each component. That sucks, IMHO. Please correct me if I'm wrong.
    P.S. A slightly off-topic question: what does the queue in a channel actually do? As I understand it, the channel collects N messages, where N is defined by the queue property, and only then processes them. Am I right or wrong?

    My idea is that Spring would let you define some property, like "threads", on most components (not necessarily every one), or better yet handle this automatically, and would then create the defined number of threads to process messages.
    To illustrate what I want to accomplish, using the example above:

    1) <file:inbound-channel-adapter directory="${com.shopping.listen.dir}" channel="inboundFileChannel" id="fileChannelAdapter" />
    Well, I don't really mind if that one handles one file at a time without parallel processing. But as an option it would be nice to have something like:
    Code:
    <file:inbound-channel-adapter directory="${com.shopping.listen.dir}" channel="inboundFileChannel" id="fileChannelAdapter" concurrent-files="2" />
    That would mean: InboundChannelAdapter, if you see 2 files in the specified directory, please create 2 instances/threads of inboundFileToStringsSplitter and process the files simultaneously. In effect, this should tell the splitter to create 2 instances of itself as well.
    Maybe that is hard to handle, and that's OK: I can accept that it works with only 1 file at a time. But let's continue.

    2) inboundFileToStringsSplitter splits the file's lines into strings and sends them to the transformer via inboundStringChannel. Here I DEFINITELY want to have the following option:
    Code:
    <transformer id="stringToOscarSaleDataTransformer" input-channel="inboundStringChannel" output-channel="inboundOscarSaleDataChannel" ref="inboundStringToOscarSaleDataTransformer" threads="100" />
    So if 100 lines were passed to inboundStringChannel, it would create 100 instances/threads of the transformer and process them simultaneously. I think this is pretty easy to implement.

    3) The transformer sends data to inboundOscarSaleDataChannel, which is handled by a ServiceActivator. Here I want the same threads property, to process incoming messages simultaneously. For example: Spring Integration sees that there are more messages in that channel but the current service activator is still busy, so it just goes ahead, creates a new instance of the SA and processes the next message.

    So in conclusion, I'd like to ask the following two questions:
    1) Why does Spring still not have such easy-to-implement features? Or are they not that easy to implement after all? What difficulties would one face?

    2) What is currently the best and easiest way to implement the algorithm I've described with the current version of Spring Integration?

    Thanks all!

  • #2
    I think you are misunderstanding the framework.

    It wouldn't make sense to have threading on every (or even many) components.

    You simply identify the point in your flow where you want to multiprocess and add an ExecutorChannel there.

    For example, after your file inbound adapter, you make the channel an ExecutorChannel.

    Then the entire downstream flow for each message runs on an executor thread, in parallel with the flows for other messages. If somewhere downstream you want further threading (e.g. after your splitter), you add an ExecutorChannel there.
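
    As a rough sketch of what that could look like in the XML from the first post: a channel becomes an ExecutorChannel when it is given a dispatcher with a task executor. The executor ids (fileExecutor, lineExecutor) and pool sizes below are made up for illustration, and the fragment assumes the Spring "task" namespace is declared alongside the existing ones.

    ```xml
    <!-- Assumption: the root element also declares
         xmlns:task="http://www.springframework.org/schema/task" -->

    <!-- Hypothetical pools; ids and sizes are illustrative only -->
    <task:executor id="fileExecutor" pool-size="2"/>
    <task:executor id="lineExecutor" pool-size="5"/>

    <!-- Hand-off point 1: each file emitted by the inbound adapter is
         dispatched on a fileExecutor thread; the splitter and everything
         after it run on that thread -->
    <channel id="inboundFileChannel">
        <dispatcher task-executor="fileExecutor"/>
    </channel>

    <!-- Hand-off point 2: each line produced by the splitter is handed to a
         lineExecutor thread, so the transformer and the service activator
         behind it can work on several lines at once -->
    <channel id="inboundStringChannel">
        <dispatcher task-executor="lineExecutor"/>
    </channel>
    ```

    The other channels stay as plain `<channel/>` definitions, so the components themselves need no per-component thread settings.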

    It would be very unusual to have more than a few such points in even some of the most sophisticated flows.

    An ExecutorChannel hands the message off to another thread (which could be from a pool to allow throttling, or from one of several types of task executor).

    The QueueChannel is an alternative mechanism for handing off to another thread. The difference is that the consumer needs a poller (with some schedule/trigger) to pull messages off the queue.

    It doesn't have to "fill up" before messages are consumed - it's entirely based on the poller's trigger.
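
    A minimal sketch of the queue variant, reusing the channel and service activator from the first post. The capacity and poller values are arbitrary examples, not recommendations. Without the nested poller, the consumer would presumably fall back to the default poller already defined in that config (fixed-delay of 60 seconds).

    ```xml
    <!-- The queue only buffers messages; capacity is an upper bound,
         not a batch size that must be reached before delivery -->
    <channel id="inboundOscarSaleDataChannel">
        <queue capacity="100"/>
    </channel>

    <!-- The consumer pulls from the queue whenever its poller fires:
         here once per second, taking up to 10 messages per poll -->
    <service-activator id="saInboundOscarSaleDataProcessor"
                       input-channel="inboundOscarSaleDataChannel"
                       ref="inboundOscarSaleDataProcessor">
        <poller fixed-delay="1" time-unit="SECONDS" max-messages-per-poll="10"/>
    </service-activator>
    ```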

    An 'aggregator' can be used to batch up messages if that's what you need.
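
    For illustration only, here is one way an aggregator might be placed after the transformer in the flow from the first post. The channel id batchChannel and the aggregator id are hypothetical. The sketch relies on the aggregator's default strategies, which group messages by the correlation and sequence headers that a splitter normally sets, so all lines split from one file would be released together. It also assumes the transformer leaves those headers intact and that the service activator bean can accept a collection of payloads.

    ```xml
    <!-- Hypothetical channel that carries one message per completed batch -->
    <channel id="batchChannel"/>

    <!-- Default correlation/release strategies: collect the messages that
         came from the same split and release them once all have arrived -->
    <aggregator id="saleDataAggregator"
                input-channel="inboundOscarSaleDataChannel"
                output-channel="batchChannel"/>

    <!-- The service activator now consumes batches instead of single items -->
    <service-activator id="saInboundOscarSaleDataProcessor"
                       input-channel="batchChannel"
                       ref="inboundOscarSaleDataProcessor"/>
    ```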


    • #3
      Thanks a lot Gary.
      Yeah, after I dived deeper into the documentation I realised how it should work.
      Thanks as well for describing how it could be implemented; I found it very useful.


      • #4
        One other point: the most efficient settings for a thread pool are usually based on the number of cores on a machine (e.g. n+1) IF the tasks being handled are CPU-bound (not blocking I/O). So, in the case that you have 100 items and send those downstream to a CPU-bound handler, you don't usually want 100 threads (unless you have 100 cores). Hope that makes sense.
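
        To make that concrete, a bounded pool might be declared like this. The numbers assume a 4-core machine (n+1 = 5) and are purely illustrative, the executor id is made up, and the Spring "task" namespace is assumed to be declared. The queue-capacity and rejection-policy attributes are one possible way to throttle the sender when all threads are busy.

        ```xml
        <!-- Assumption: xmlns:task="http://www.springframework.org/schema/task"
             is declared on the root element -->

        <!-- 5 threads for CPU-bound work on an assumed 4-core machine.
             Up to 50 tasks wait in the executor's queue; beyond that,
             CALLER_RUNS makes the sending thread do the work itself,
             which slows the producer down instead of dropping messages -->
        <task:executor id="cpuBoundExecutor"
                       pool-size="5"
                       queue-capacity="50"
                       rejection-policy="CALLER_RUNS"/>

        <!-- 100 lines sent here are processed 5 at a time, not on 100 threads -->
        <channel id="inboundStringChannel">
            <dispatcher task-executor="cpuBoundExecutor"/>
        </channel>
        ```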


        • #5
          Originally posted by Mark Fisher:
          One other point: the most efficient settings for a thread pool are usually based on the number of cores on a machine (e.g. n+1) IF the tasks being handled are CPU-bound (not blocking I/O). So, in the case that you have 100 items and send those downstream to a CPU-bound handler, you don't usually want 100 threads (unless you have 100 cores). Hope that makes sense.
          Yeah, that makes sense and is true.
          But that is about CPU-bound parallelism.
          Sometimes ordinary threads, even when running on the same processor, give you some performance gain over running just one thread. That is the case I was talking about when referring to multithreading.

          So your answer brought a question to mind: which behaviour will Spring Integration use? Will it make the number of threads equal to the number of CPUs (CPU-bound threads), or will it still work with "virtual"/single-processor threads?
          So, for example, if I have just one CPU, does it make no sense to use the async SI behaviour?
