RabbitMQ Multiple consumers of the same queue don't receive the desired messages

  • RabbitMQ Multiple consumers of the same queue don't receive the desired messages

    Hello.

    We have stumbled over this issue and would really appreciate any help.

    We are using Spring Integration (SI) and RabbitMQ to create multiple consumers (on different machines) for a single queue (called messagesQueue).

    We want to configure an environment in which, if you send a message to the queue with a certain custom header value, that message will be sent to the consumer that accepts that header value.

    We publish multiple messages with different header values. Each consumer is configured to accept only messages with a certain header value.

    e.g. : Consumer1 - only accepts messages with header = 'C1', Consumer2 with header = 'C2' and so on...

    Our problem is that the messages are delivered to the consumers 'randomly' (our tests suggest a round-robin fashion). If the consumer that receives a message does not want it, the message ends up in the discard channel (all consumers are set up with the same discard-channel).

    e.g. Consumer1 receives a message with header 'C2' - it gets discarded; Consumer2 receives a message with header 'C3' - it gets discarded and so on...


    Here are some configuration snippets:

    Common config :

    Code:
       <rabbit:connection-factory id="connectionFactory"
            host="host" username="username" password="password" />
        <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
        <rabbit:admin connection-factory="connectionFactory" />
        <rabbit:queue name="messagesQueue" />
        <rabbit:queue name="discardQueue"/>
      
        <channel id="messagesChannel_FROM_Rabbit" />
        <channel id="discardChannel" />
        
        <rabbit:direct-exchange name="test.exchange" >
            <rabbit:bindings>
                <rabbit:binding queue="messagesQueue" key="messagesQueue.binding"></rabbit:binding>
                <rabbit:binding queue="discardQueue" key="discardQueue.binding"></rabbit:binding>
            </rabbit:bindings>
        </rabbit:direct-exchange>
    
         <int-amqp:inbound-channel-adapter channel="messagesChannel_FROM_Rabbit" queue-names="messagesQueue"
            connection-factory="connectionFactory" mapped-request-headers="MY_HEADER" />
    
        <int-amqp:outbound-channel-adapter channel="discardChannel" amqp-template="amqpTemplate" mapped-request-headers="MY_HEADER" exchange-name="test.exchange" routing-key="discardQueue.binding" />

    Consumer A :

    Code:
        <beans:bean id="headerFilterRabbitA" class="com.springintegration.test.headerFilter.HeaderFilterRabbitA" />
    
        <beans:bean id="expressionSelectorA"
            class="org.springframework.integration.filter.ExpressionEvaluatingSelector">
            <beans:constructor-arg value="headers[MY_HEADER] eq 'A'" />
        </beans:bean>
    
        <chain id="chain_FROM_RabbitA" input-channel="messagesChannel_FROM_Rabbit">
            <filter ref="expressionSelectorA" />
            <service-activator ref="headerFilterRabbitA" method="filterRabbitA"  />
        </chain>
    Consumer B :
    Code:
        <beans:bean id="headerFilterRabbitB" class="com.springintegration.test.headerFilter.HeaderFilterRabbitB" />
    
        <beans:bean id="expressionSelectorB"
            class="org.springframework.integration.filter.ExpressionEvaluatingSelector">
            <beans:constructor-arg value="headers[MY_HEADER] eq 'B'" />
        </beans:bean>
    
         <chain id="chain_FROM_RabbitB" input-channel="messagesChannel_FROM_Rabbit">
             <filter ref="expressionSelectorB"/>
             <service-activator ref="headerFilterRabbitB" method="filterRabbitB" />
         </chain>

    The results vary a lot. We ran a test in which we started 2 consumers that accept 'C1' and 'C2'.
    We then sent 5 messages to the queue with headers 'C1', 'C1', 'C1', 'C2' and 'C3'. In some runs Consumer1 ends up with 1 message, in others with 2. Consumer2 sometimes gets 1 message, sometimes none. The message with 'C3' is always sent to the discard channel (as intended), but some messages with 'C1' and 'C2' are also sent there.


    Dependencies used :

    Code:
            
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-amqp</artifactId>
            <version>3.0.2.RELEASE</version>
        </dependency>
    RabbitMQ version : 3.2.4

    Is there a way to properly configure the environment to achieve the desired result?

    Thank you for your time!
    Last edited by AlexFlorian; May 19th, 2014, 10:51 AM.

  • #2
    You need to use a headers exchange in RabbitMQ instead of the direct exchange, and give each consumer its own queue; that way the messages are routed by the broker and the consumers are not competing for the same messages.
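
    A minimal sketch of what the consumer side could look like under this approach. The queue name queue-a is a placeholder (not taken from the original configuration), and the adapter attributes simply mirror the ones shown in the first post:

    ```xml
    <!-- Consumer A only: it listens on its own queue, so it never competes
         with other consumers and needs no header filter. -->
    <rabbit:queue name="queue-a" />

    <int-amqp:inbound-channel-adapter channel="messagesChannel_FROM_Rabbit"
        queue-names="queue-a" connection-factory="connectionFactory"
        mapped-request-headers="MY_HEADER" />
    ```

    The binding between the headers exchange and each queue then decides which messages reach this adapter.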



    • #3
      Thank you for the suggestion. We had that in mind, but we were hoping to solve the issue just by filtering. If that's not possible, we'll just have to use a queue for every consumer.
      Thanks again for the help.



      • #4
        We have tried it like this and it works.

        Producer code :
        Code:
            <rabbit:queue name="queue-a"/>
            <rabbit:queue name="queue-b"/>
            <rabbit:queue name="queue-c"/>
        
            <channel id="messagesChannel_TO_Rabbit" />
        
            <rabbit:headers-exchange name="someHeadersExchange">
                <rabbit:bindings>
                    <rabbit:binding key="MY_HEADER" value="A" queue="queue-a"/>
                    <rabbit:binding key="MY_HEADER" value="B" queue="queue-b"/>
                    <rabbit:binding key="MY_HEADER" value="C" queue="queue-c"/>        
                </rabbit:bindings>
            </rabbit:headers-exchange>
        However, is there any chance of making this routing dynamic, i.e. defining a rule for routing rather than listing specific routes? For example:

        Code:
        <rabbit:binding key="MY_HEADER" queue="queue-${header.value}"/>
        We do not want to specify the queues in the configuration (or add them at runtime). Could we create some kind of rule like this, in which the exchange knows which header's value to check and which queue to send the message to, based on an expression that we provide? Is there any other way of achieving this if it's not possible with a headers exchange?

        Thank you.



        • #5
          How about having only one AMQP queue and building the routing logic with a Spring Integration router?
          In this case you will have only one consumer, and a HeaderValueRouter provides the logic to invoke the desired service.
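
          A rough sketch of that idea, reusing the channel and bean names from the first post. The channels channelA and channelB are hypothetical names introduced here for illustration, and sending unmatched messages to discardChannel is an assumption about the desired behaviour:

          ```xml
          <!-- One consumer reads messagesQueue; the router picks the target
               channel from the value of the MY_HEADER header. -->
          <header-value-router input-channel="messagesChannel_FROM_Rabbit"
              header-name="MY_HEADER"
              default-output-channel="discardChannel"
              resolution-required="false">
              <mapping value="A" channel="channelA" />
              <mapping value="B" channel="channelB" />
          </header-value-router>

          <channel id="channelA" />
          <channel id="channelB" />

          <service-activator input-channel="channelA" ref="headerFilterRabbitA" method="filterRabbitA" />
          <service-activator input-channel="channelB" ref="headerFilterRabbitB" method="filterRabbitB" />
          ```

          Note that this only fits when all services can run in the same application, since a single consumer receives every message.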



          • #6
            You can go back to a direct exchange (or even the default exchange) and still have a queue for each consumer, bound by routing key. Queues are automatically bound to the default exchange with their queue names as their routing keys.

            And, on the outbound side, use
            Code:
            routing-key-expression="'queue-' + headers['MY_HEADER']"
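
            In context, the outbound adapter might look roughly like the sketch below. The channel and template names are borrowed from earlier posts; leaving out exchange-name is an assumption that the template publishes to the default exchange:

            ```xml
            <!-- The routing key is computed per message, e.g. queue-A for
                 MY_HEADER = A, and must match the name of a declared queue. -->
            <int-amqp:outbound-channel-adapter channel="messagesChannel_TO_Rabbit"
                amqp-template="amqpTemplate"
                mapped-request-headers="MY_HEADER"
                routing-key-expression="'queue-' + headers['MY_HEADER']" />
            ```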



            • #7
              Thanks Gary. That works great with just a little hitch. The routing prefix is environment specific. We have queueName-dev-HEADER_VALUE, queueName-stage-HEADER_VALUE and so on.

              Can we take the first part of the routing-key-expression from a properties file?

              We have a properties file like this :

              Code:
              <util:properties id="rabbitmqProps" location="classpath:/rabbitMQ/rabbitmq-${INSTANCE-TYPE}.properties" />
              We tried something like :

              Code:
              routing-key-expression="#{rabbitmqProps['rabbitmq.subscriptionsQueue']} + '-' + headers['MY_HEADER']"
              However, this expression is not evaluated correctly. Is the expression incorrect or is this not supported?



              • #8
                Right: your SpEL isn't correct.
                Should be like this:
                Code:
                routing-key-expression="'#{rabbitmqProps['rabbitmq.subscriptionsQueue']}-' + headers['MY_HEADER']"
                Here you use a bean definition expression (#{...}) to resolve a literal value during the definition phase, while at the same time building another expression that is evaluated at runtime. The result of the first expression must therefore end up inside a quoted string literal; otherwise the runtime expression tries to resolve it as an evaluation context variable.
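
                To make the two phases concrete, here is a sketch that assumes the properties file contains rabbitmq.subscriptionsQueue=queueName-dev (a hypothetical value based on the naming scheme described above). The adapter attributes other than routing-key-expression are borrowed from earlier posts:

                ```xml
                <!-- Assumed content of the properties file (hypothetical value):
                     rabbitmq.subscriptionsQueue=queueName-dev -->
                <int-amqp:outbound-channel-adapter channel="messagesChannel_TO_Rabbit"
                    amqp-template="amqpTemplate" mapped-request-headers="MY_HEADER"
                    routing-key-expression="'#{rabbitmqProps['rabbitmq.subscriptionsQueue']}-' + headers['MY_HEADER']" />

                <!-- Phase 1 (bean definition): #{...} is replaced by the property value, giving
                       'queueName-dev-' + headers['MY_HEADER']
                     Phase 2 (per message): with MY_HEADER = A the routing key is
                       queueName-dev-A
                     Without the outer quotes, phase 1 would yield
                       queueName-dev + '-' + headers['MY_HEADER']
                     where queueName-dev is no longer a string literal. -->
                ```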



                • #9
                  That did the trick. It's now working as we want it. Thank you both for your help.

