Announcement Announcement Module
Collapse
No announcement yet.
HornetQ topic delayed messages Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • HornetQ topic delayed messages

    Hi,

    My enterprise application is divided into modules connected over JMS. First module acts as a dispatcher - it receives request from remote clients, does some request processing and forwards message to the JMS topic.
    Also, there are multiple other modules which are registered in topic. Every time message comes, it should be forwarded to all of these modules (consumers). Only one consumer is able to process one kind of messages - each one is doing some detection. If module detects that it should handle this kind of message, it does so, and replies to dispatcher. If, however, consumer will find out that it is not responsible to handle this kind of message, it sends processing to the nullChannel.

    Whole integration is based on SI components:

    Dispatcher [jms:outbound-gateway] - HORNETQ TOPIC - [jms:inbound-gateway] Consumer (multiple instances)

    Below is the typical use case (1 dispatcher, 2 consumers):
    1. remote client sends request to the dispatcher
    2. dispatcher does some processing on the message, and forwards it to JMS topic
    3a. first instance of consumer analyzes message content, and it found appropriate handler - it starts processing. When it will be finished, consumer replies to dispatcher with processed message
    3b. (at the same time when first consumer process message) second instance of consumer analyzes message content, and it cannot find an appropriate handler. It drops message by sending it to SI nullChannel.

    Lets assume that this scenario is reapeated e.g. 100 times - every time only first consumer can handle this type of message.

    Because consumer code is very fast, first consumer process every of these messages, and every time sends response to the dispatcher.
    Second consumer should 100 times drop message.

    The problem is:
    Messages comes to the first consumer from HornetQ very fast.
    Messages comes to the second consumer from HornetQ very slow.

    It is not the case that 2nd consumer is slow - I sniffed network link, and it looks like hornetq broker slows down messages delivering frequence to the second consumer

    This leads to the situation, that the first consumer has already processed all messages and send them back to dispatcher, and at the same time second consumer still receives delayed messages (client has already processed response).

    Is it possible that this situation is caused by sending JMS messages to the nullChannel? I've done integration testing before (using ActiveMQ broker), and then, consumers has been receiving messages simulatenously. Now, with HornetQ first consumer receives them really fast, second - really slow.

    Below is part of my configuration (consumer / producer):

    Code:
    <beans:bean id="connectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <beans:property name="targetConnectionFactory" ref="targetConnectionFactory" />
        <beans:property name="sessionCacheSize" value="10" />
        <beans:property name="cacheProducers" value="false" />
      </beans:bean>
    
    
        <beans:bean id="targetConnectionFactory" class="org.hornetq.jms.client.HornetQConnectionFactory">
        <beans:constructor-arg>
          <beans:bean class="org.hornetq.api.core.TransportConfiguration">
            <beans:constructor-arg
              value="org.hornetq.integration.transports.netty.NettyConnectorFactory" />
            <beans:constructor-arg>
              <beans:map key-type="java.lang.String" value-type="java.lang.Object">
                <beans:entry key="host" value="localhost" />
                <beans:entry key="port" value="5445" />
              </beans:map>
            </beans:constructor-arg>
          </beans:bean>
        </beans:constructor-arg>
        <beans:property name="clientID" value="AnossstherConsumer"/> <!-- different for every consumer -->
      </beans:bean>
    
    
      <beans:bean name="requestTopic" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createTopic">
        <beans:constructor-arg index="0" value="TopicRequestDemo"/>
      </beans:bean>
    
    
       <beans:bean name="responseTopic" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createTopic">
        <beans:constructor-arg index="0" value="TopicResponseDemo"/>
       </beans:bean>

    HornetQ configuration in hornetq-jms.xml:


    Code:
    <connection-factory name="NettyConnectionFactory">
      <connectors>
        <connector-ref connector-name="netty"/>
      </connectors>
      <entries>
        <entry name="/ConnectionFactory"/>
        <entry name="/XAConnectionFactory"/>
      </entries>
      <consumer-window-size>-1</consumer-window-size>
      <consumer-max-rate>-1</consumer-max-rate>
    </connection-factory>
    Thanks for any advices!
    Piotr
    Last edited by PiotrD; Apr 14th, 2011, 11:59 AM.

  • #2
    After investigating this issue I can say that most probably there is a flaw in integration of HornetQ with Spring Integration JMS system.

    Let's consider the following situation (HornetQ as a JMS broker):
    1. system A send message to topic using JMS outbound gateway
    2. system B receives messages from topic using inbound gateway
    3. system C receives messages from topic using inbound gateway
    4. System D receives messages using non-Spring Integration JMS mechanism

    System B process data and sends response to the system A
    Systems C and D do not respond to system A (system C forwards message processing to the null channel) ; these systems can for example be designed only to gather some data and basis on them, display statistics of requests.

    After couple of send messages system C starts receiving messages delayed (one-two seconds of delay for every message).
    At the same time systems B and D receives messages undelayed.
    (Everything is for the same message and for the same topic!)

    It's my suspicion only, but I think that it is caused because of temporary channels set up under the hood in JMS gateways. For each message this kind of channel is instantiated (as I could read from gateway source code), but no reply is send from the message listener. HornetQ thinks, that receiver is very slow (it doesn't send any response) and starts delaying messages to give some additional time for receiver.

    As I've written, it's only my suspicion, why this issue happens. If I try to repeat the same on e.g. ActiveMQ broker, messages are undelayed for each receiver, however after some time broker hangs up (too many temporary channels instantiated?)

    I would really appreciate if someone from Spring Integration team could take a look at this issue. We need such deployment type - one system responds, other system receives messages just to analyze them - everything over one topic.

    Thanks in advance,
    Piotr

    Comment

    Working...
    X