Announcement Announcement Module
Collapse
No announcement yet.
Message Redelivery with DMLC + ActiveMQ + TxManager not working Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Message Redelivery with DMLC + ActiveMQ + TxManager not working

    We are currently trying to integrate ActiveMQ (5.5.0) into a product using Spring DMLC (2.5.6). The integration works so far except that the message redelivery is broken and we have no clue how to solve this. We want to rollback the transaction programmatically when there are certain error cases hit (no throwing of RuntimeExceptions).

    Config:

    <bean id="queueConnectionFactory"
    class="org.apache.activemq.pool.PooledConnectionFa ctory">
    <property name="connectionFactory">
    <bean class="org.apache.activemq.ActiveMQConnectionFacto ry">
    <property name="brokerURL">
    <value>${amq.broker.url}</value>
    </property>
    <property name="redeliveryPolicy" ref="amqRedeliveryPolicy" />
    </bean>
    </property>
    </bean>

    with a defined Redelivery Policy:

    <bean id="amqRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    <property name="useExponentialBackOff" value="true" />
    <property name="backOffMultiplier" value="2" />
    <property name="initialRedeliveryDelay" value="1000" />
    <property name="maximumRedeliveries" value="5" />
    </bean>

    <bean id="txManager" class="org.springframework.jms.connection.JmsTrans actionManager">
    <property name="connectionFactory" ref="queueConnectionFactory"/>
    </bean>

    JMS Consumer:
    <property name="connectionFactory" ref="queueConnectionFactory"/>
    <property name="transactionManager" ref="springPlatformTxManager"/>
    <property name="cacheLevelName" value="CACHE_AUTO"/>

    The problem is that we can only either get a setup with an external transaction manager using the correct transactional behaviour, but with incorrect redelivery behaviour to work OR the correct redelivery behaviour if using a single consumer, but without external control over a TM (autocommit and rollback via exceptions).

    The ActiveMQ redelivery handling is done on the consumer side to avoid overhead in the broker (change from 3.x to 4.x, see http://activemq.apache.org/message-r...andling.html):

    "On rollback, since nothing has been acked yet, and all messages are still available in an internal consumer queue, the messages are re-dispatched from the internal consumer queue. This reduces redelivery dispatch overhead, at the cost of the broker not being aware that redeliveries are occurring."

    If redelivery is handled on a consumer level, all consumers need to share the current state and coordinate their work. You can also find a lot of user complaints in the Spring forums struggling with the same problem and the answer is always the same � use consumer caching (cacheLevel=CACHE_CONSUMER).

    It looks like the broker is delivering the message again, although it should not have received and ack from the client in the case we are doing the rollback. I suspect that if we have a transaction, something in the code is sending the broker the information, that he can resend the message and another consumer will pick it and handle it as a first time delivery � so completely without any delay in between.

    Maybe someone has an idea how to get this combination running (without the consumer caching workaround and with a transaction manager).

    Thanks!

    Marek

  • #2
    The common solution here (as you already stated) is to enable consumer caching. What I don't understand is why you don't seem to want to use it? And, if I recall correctly, you will also need to enable sessionTransacted on the message listener config. These two config options must be used together for redelivery to work properly.

    Bruce

    Comment


    • #3
      Hi Bruce,

      AFAIK using sessionTransacted=true implies that transactions are managed by ActiveMQ - but we need to use an external transaction manager (using local transactions) to commit and rollback the transactions (depending on custom mediation logic in the product). The combination CACHE_CONSUMER + SESSION_TRANSACTED + Transaction Manager does not work for redelivery in ActiveMQ. We are currently trying to patch the core code to make this combination work. We have also added https://issues.apache.org/jira/browse/AMQ-3597 to make the redelivery independent of the consumer (which was done to decrease load on the broker but fails to work in the above combination).

      Thanks!

      Marek

      Comment


      • #4
        When I last tested this back around 2009, despite the DMLC docs regarding JMS transactions stating that the sessionTransacted property is only for use with local transactions, this property needed to be enabled to activate rollbacks even when using an external transaction manager. If I recall correctly, this had something to do with whether the session was closed or held open upon exception. When I tested this I was using Atomikos and it worked correctly. I encourage you to test it out in your environment.

        After a quick Google search, I found a blog post on this very topic with some discussion in the comments amongst three transactions aficionados.

        Bruce

        Comment


        • #5
          I'm interrested for the solution or a clear explanation on this because all this is complicated...

          * SessionTransacted=true: means ActiveMQ manage the transaction? Doesn't seem
          * Setting a transactionManager in DefaultMessageListenerContainer disables the CacheLevel (this.cacheLevel = (getTransactionManager() != null ? CACHE_NONE : CACHE_CONSUMER)
          * Setting a transactionManager automatically activates the sessionTransacted
          Code:
          (AbstractPollingMessageListenerContainer)
          	public void initialize() {
          		// Set sessionTransacted=true in case of a non-JTA transaction manager.
          		if (!this.sessionTransactedCalled &&
          				this.transactionManager instanceof ResourceTransactionManager &&
          				!TransactionSynchronizationUtils.sameResourceFactory(
          						(ResourceTransactionManager) this.transactionManager, getConnectionFactory())) {
          			super.setSessionTransacted(true);
          		}
          * RedeliveryPolicy doesn't seem active when using transactionManager.
          * Message are not redelivered even when settings RedeliveryCounter to -1 (NO_MAXIMUM_REDELIVERIES). Worst, they are dropped off in DLQ which is absolutely not good when I'm trying to get message redelivered infinitely in case of problem (code crash, I don't want to try another message when ordering is important)
          * Using CACHE_CONSUMER, if I see what's going on, shares the session between all consumers. When one crash, session is closed and all other consumer gets a session-closed-exception
          * Not using CACHE_CONSUMER: when a MessageListener throws an exception, JmsTransactionManager tells "Rolling back transactions", but message isn't redelivered anyway...
          * I don't have a J2EE application server under the hand, so can't use JtaTransactionManagement...

          Pfff, not not clear.... ((

          Comment


          • #6
            Ok, I found a solution that works...

            Code:
            <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager" p:connectionFactory-ref="connectionFactory"/>
            
            <bean name="listenerContainer" abstract="true" class="org.springframework.jms.listener.DefaultMessageListenerContainer" p:connectionFactory-ref="connectionFactory" p:sessionTransacted="true" p:transactionManager-ref="jmsTransactionManager" p:receiveTimeout="60000" p:cacheLevelName="CACHE_CONSUMER"/>
            
            <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616" p:redeliveryPolicy-ref="amqRedeliveryPolicy"/>
            
            <bean id="amqRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy" p:maximumRedeliveries="-1" p:initialRedeliveryDelay="0"/>
            
            <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory" p:targetConnectionFactory-ref="amqConnectionFactory"/>
            I found a key solution at the initialRedeliveryDelay. By default, it's set to 1000 (1 second) meaning that failed message will be redelivered after 1 second. In my case, I set it to 0 to be sure that message#2 won't be read before message#1 is successfully processed. Also CACHE_CONSUMER seems to be THE solution.

            By the way, I'm using Spring 3.0.5 with ActiveMQ 5.5.0.

            Comment


            • #7
              Small modification compared to what I said before.

              Using CachingConnectionFactory seems to share connection & session between multiple consumer which seems to fail on rollback.
              It's not so clear but I think it's not really thread-safe.

              Anyway, I changed to PooledXFactory and removed CACHE_CONSUMER. Cache_consumer also seems to do some sharing of session which makes some commit or rollback to be mixed between different consumers.

              Code:
              <bean name="listenerContainer" abstract="true" class="org.springframework.jms.listener.DefaultMessageListenerContainer" p:connectionFactory-ref="connectionFactory" p:sessionTransacted="true" p:transactionManager-ref="jmsTransactionManager" p:receiveTimeout="60000" p:cacheLevelName="CACHE_NONE"/>
              
              <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" p:connectionFactory-ref="amqConnectionFactory"/>

              Comment

              Working...
              X