Announcement Announcement Module
Collapse
No announcement yet.
Competing Consumers Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Competing Consumers

    I'm trying to refactor the Cafe sample to use Competing Consumers [EIP502] (on the hotDrinks channel).

    First I hoped to just define two barista's and hook them up to the same queue, but from the results and documentation it seems that only one handler is being used by design.

    I know I could use a router and route to two different queues (one for each barista), but I don't want that because it seems overkill to me.

    I know I could use concurrency on the endpoint, but that is fundamentally different from using two different baristas

    Why doesn't this just work?

    Code:
            <endpoint input-channel="hotDrinks" handler-ref="barista1"
    		handler-method="prepareHotDrink"/>
    	<endpoint input-channel="hotDrinks" handler-ref="barista2"
    		handler-method="prepareHotDrink"/>

  • #2
    Iwein,

    I have added an issue for this: http://jira.springframework.org/browse/INT-152

    In the immediate term, I believe this should provide the behavior you are expecting:
    Code:
    <endpoint input-channel="hotDrinks" handler-ref="barista2" handler-method="prepareHotDrink">
        <concurrency core="1" max="1"/>
    </endpoint>
    <endpoint input-channel="hotDrinks" handler-ref="barista3" handler-method="prepareHotDrink">
        <concurrency core="1" max="1"/>
    </endpoint>
    The problem is that with a null ConcurrencyPolicy (the current default), the endpoint's handler is not being decorated with ConcurrentHandler (the one that uses a thread pool). Therefore the dispatcher's thread is actually executing the handle(message) method rather than returning immediately to process more incoming messages.

    Could you add the explicit concurrency settings as shown above and let me know if that works as you would expect? I'm considering whether the no-arg constructor of ConcurrencyPolicy should always be used as the default (that has core=1 and max=10 currently).

    -Mark

    Comment


    • #3
      Done and done. This works exactly as I intended. I actually can't think of any reason why I didn't try this before...

      I've used different concurrency settings and those seem to work fine too. The only thing that I didn't expect is that the second thread on the first barista goes before the first thread on the second.

      Since the only use case I can think of is some sort of load balancing the last comment shouldn't be a problem.

      On the ConcurrencyPolicy I'm not completely sure. On the one hand, as long as no threads are wasted it seems like a fine solution to me to always wrap. On the other hand it introduces concurrency implicitly and that might be confusing.

      Comment


      • #4
        process the same message

        This seem to work, but both barista2, and 3 process the same drink (same message). Is this right? How do I configure to have barista2 and 3 process different message?

        Originally posted by Mark Fisher View Post
        Iwein,

        I have added an issue for this: http://jira.springframework.org/browse/INT-152

        In the immediate term, I believe this should provide the behavior you are expecting:
        Code:
        <endpoint input-channel="hotDrinks" handler-ref="barista2" handler-method="prepareHotDrink">
            <concurrency core="1" max="1"/>
        </endpoint>
        <endpoint input-channel="hotDrinks" handler-ref="barista3" handler-method="prepareHotDrink">
            <concurrency core="1" max="1"/>
        </endpoint>
        The problem is that with a null ConcurrencyPolicy (the current default), the endpoint's handler is not being decorated with ConcurrentHandler (the one that uses a thread pool). Therefore the dispatcher's thread is actually executing the handle(message) method rather than returning immediately to process more incoming messages.

        Could you add the explicit concurrency settings as shown above and let me know if that works as you would expect? I'm considering whether the no-arg constructor of ConcurrencyPolicy should always be used as the default (that has core=1 and max=10 currently).

        -Mark

        Comment


        • #5
          I had the same experience, but you're wrong about them processing the same drinks, I think. The baristas have a counter that they use to tell you which drink they are processing. That is why they tell you that they are processing the same drink. If you print the hashcode of the drink object you'll see that they are in fact different.

          You can also see by the counters that they process 100 drinks together. I modified my sample to have a counter property inside the drink and set it before deviding over different baristas. It looked ok then.

          Comment


          • #6
            I guess another option is to make the 'hotDrinkCounter' and 'coldDrinkCounter' static.

            -Mark

            Comment


            • #7
              I am trying to do this modification to the Cafe demo.
              When I look at the XSD schema documents I cannot find an endpoint element anywhere,
              Also when I get Eclipse to do a validation against the schema, it returns the following error:
              Description Resource Path Location Type
              cvc-complex-type.2.4.c: The matching wildcard is strict, but no declaration can be found for element 'endpoint'. cafeDemo.xml SpringIntegration/src/org/springframework/integration/samples/cafe/xml line 47 XML Problem

              Is there definitive comprehensive documentation for ALL the elements that Spring Integration supports somewhere that I seem to be missing?

              Comment


              • #8
                The "endpoint" elements were replaced quite a while ago with more direct mappings to the patterns described in Enterprise Integration Patterns.

                The best place to start is the reference manual:
                http://static.springframework.org/sp...reference.html

                The endpoint and element discussions start with chapter 5.

                Also, you can checkout Spring Integration and import the projects into Eclipse or IDEA (see the readme.txt). Then, with the XSD editor, you will see a list of all of the elements. The XSD is documented albeit sparsely. We will be enhancing that documentation as well in upcoming releases.

                Hope that helps.
                Mark

                Comment


                • #9
                  How would you do competing consumers with the current syntax? How would you specify how many concurrent consumers etc.

                  Comment


                  • #10
                    When you are consuming from a PollableChannel (one with a <queue/> sub-element), then the <poller/> sub-element can contain a "task-executor" reference. The reference object can be any instance that implements the Spring TaskExecutor interface. The most common case would be a ThreadPoolTaskExecutor for which the concurrent consumers would be determined by the core-size/max-size of the pool. Here's an example with a modification in cafeDemo.xml:
                    Code:
                    <service-activator input-channel="hotDrinks" ref="barista" method="prepareHotDrink" output-channel="preparedDrinks">
                       <poller task-executor="pool">
                          <interval-trigger interval="1000"/>
                       </poller>
                    </service-activator>
                    
                    <thread-pool-task-executor id="pool" max-size="10"/>
                    Note that the 'thread-pool-task-executor' element is provided for convenience, but again - any implementation could be provided (e.g. WorkManagerTaskExecutor)

                    Comment

                    Working...
                    X