  • acknowledge="transacted" problems with JMS

    I'm dealing with a situation where messages in a given context must be consumed in order, but I have many contexts. The workload is such that it must be distributed across many hosts. I had thought to implement one JMS queue for each context and have a single listener for each context on each host in the cluster. I figured a message for a given queue would be delivered to a single host, which wouldn't acknowledge the message until it had finished processing it, which would prevent any other messages from that queue from being delivered. I'm testing with openMQ purely because it was already on my laptop but will eventually be running against sonic.

    I've implemented listeners in every possible way and run into problems with every one of them.

    At least with openMQ, if x consumers show up on a single queue, then the first x messages will be delivered immediately. If any are rolled back, they will be re-delivered. This allows messages to be consumed out of order. If I configure the server side to only allow a single consumer per queue, then the first host that starts up becomes the consumer on every queue and all subsequent hosts just wind up blocked until the first host dies, at which point the second host takes over. So either my messages get consumed out of order or my workload doesn't get distributed across the cluster at all (I realize that a single queue would be serialized if my method worked, but with many queues, I should still have some work being performed on all hosts if I get random or round-robin assignment of consumers).

    This proved to be the case whether I used spring-integration via a message-driven-channel-adapter feeding a service-activator, or plain spring-jms with a DefaultMessageListenerContainer and a MessageListener.
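
    For reference, the plain spring-jms variant was along these lines; the connection factory and queue name are placeholders and the exact settings are approximate, but it shows the single-consumer, transacted style of setup I mean:

    Code:
        import javax.jms.ConnectionFactory;
        import javax.jms.Message;
        import javax.jms.MessageListener;
        import javax.jms.TextMessage;
        import org.springframework.jms.listener.DefaultMessageListenerContainer;

        public class PlainJmsWorkerSetup {

            /* One container per context queue, a single consumer each. The caller
               is expected to call start() on the returned container. */
            public static DefaultMessageListenerContainer containerFor(ConnectionFactory cf, String queueName) {
                DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
                container.setConnectionFactory(cf);
                container.setDestinationName(queueName);   // e.g. "jmsWorker1"
                container.setSessionTransacted(true);      // roll back on listener exception
                container.setConcurrentConsumers(1);       // one consumer per queue
                container.setMessageListener(new MessageListener() {
                    public void onMessage(Message message) {
                        try {
                            System.out.println("JMS Work: " + ((TextMessage) message).getText());
                        }
                        catch (Exception ex) {
                            throw new RuntimeException(ex); // triggers rollback and re-delivery
                        }
                    }
                });
                container.afterPropertiesSet();
                return container;
            }
        }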

    I then thought that my problem was using message-driven consumers, where I don't have any control over how the jms client connections are utilized. So I switched to using an inbound-channel-adapter with a poller configured with a very short receive-timeout and a short fixed-delay. I thought that would cause no single consumer to remain attached to the queue for very long, allowing consumers from other hosts an opportunity to switch in and do some of the work. Instead, I ran into a completely different problem:

    No messages are ever acknowledged, regardless of whether an exception is thrown by the service activator method. With messages never being acknowledged, I never get past the first message on the queue, but it doesn't look as though any context switching between the various consumers is happening, even though my poller is frequently timing out on the receive. This feels like a bug in spring-integration - messages should be getting acknowledged, but they are not. I have verified that I am not throwing an exception I am unaware of by placing a try/catch around my entire service activator method.

    Code:
        <jmsi:inbound-channel-adapter id="inPoll1" destination="jmsWorker1" channel="worker1Input" acknowledge="transacted">
            <int:poller max-messages-per-poll="1" receive-timeout="1" fixed-delay="50"/>
        </jmsi:inbound-channel-adapter>
        <jmsi:inbound-channel-adapter id="inPoll2" destination="jmsWorker2" channel="worker2Input" acknowledge="transacted">
            <int:poller max-messages-per-poll="1" receive-timeout="1" fixed-delay="50"/>
        </jmsi:inbound-channel-adapter>
        <jmsi:inbound-channel-adapter id="inPoll3" destination="jmsWorker3" channel="worker3Input" acknowledge="transacted">
            <int:poller max-messages-per-poll="1" receive-timeout="1" fixed-delay="50"/>
        </jmsi:inbound-channel-adapter>
                
        <int:channel id="worker1Input"/>
        <int:channel id="worker2Input"/>
        <int:channel id="worker3Input"/>
    
        <!-- find workerBean via annotation -->
        <context:component-scan base-package="my.package.name"/>     
        
        <!-- assign workerBean to be listener on input channels -->       
        <int:service-activator input-channel="worker1Input" ref="workerBean"/>
        <int:service-activator input-channel="worker2Input" ref="workerBean"/>
        <int:service-activator input-channel="worker3Input" ref="workerBean"/>
    And the ServiceActivator bean looks like this:

    Code:
    import org.springframework.integration.annotation.MessageEndpoint;
    import org.springframework.integration.annotation.ServiceActivator;

    @MessageEndpoint
    public class WorkerBean {

        @ServiceActivator
        public void upperCase(String text) {
            try {
                // simulate some work on the payload
                System.out.println("JMS Work: " + text);
                Thread.sleep(3000);
            }
            catch (Exception ex) {
                throw new RuntimeException(ex);
            }
            /* roll back at random, roughly one message in four */
            if (Math.random() * 4 < 1) {
                System.out.println("rolling back");
                throw new RuntimeException("roll back");
            }
            System.out.println("done");
        }
    }
    Incidentally, if I change my @ServiceActivator method to take a Message as a parameter instead of the payload string, then I never receive messages at all. This prevents me from being able to use the message.acknowledge() method to try to acknowledge them manually.
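
    To be clear, the Message-typed variant I'm referring to looks like the sketch below (using the Spring Integration 2.2 Message type; the class name is just for illustration). As far as I can tell, the raw javax.jms.Message and its acknowledge() method aren't reachable from this signature either:

    Code:
        import org.springframework.integration.Message;
        import org.springframework.integration.annotation.MessageEndpoint;
        import org.springframework.integration.annotation.ServiceActivator;

        @MessageEndpoint
        public class MessageAwareWorkerBean {

            @ServiceActivator
            public void upperCase(Message<String> message) {
                // JMS properties arrive as Spring Integration headers; the underlying
                // javax.jms.Message itself is not passed along by the adapter.
                System.out.println("JMS Work: " + message.getPayload());
            }
        }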

    Can anyone out there comment on solving my core problem - consuming messages in order despite having multiple listeners on each queue? If not, can anyone comment on using acknowledge="transacted" with a service-activator? If I remove that attribute entirely, messages do get acknowledged, though they get acknowledged whether I throw an exception or not, which defeats my purpose. With the attribute present, no messages get acknowledged, even when no exception is thrown. That's got to be a bug, I think.

    I'm using Spring-Integration 2.2.0.RELEASE

  • #2
    As a basic rule of thumb, you can only have a single thread consuming messages if you want proper ordering of messages.

    This holds true even when not using Spring Integration. In your scenario you have competing consumers on the same queue, and therefore ordering is not guaranteed. Even with a single thread, some brokers will re-queue a message at the back of the queue when a failure has occurred, and this can impact the ordering of messages. The last major factor (there may be more) is how to process the queue when a message is sent to the Dead Letter Queue, as this can also cause messages to be processed out of order.

    Finally, many systems don't really need to process the *entire* queue in FIFO order; they only need to ensure that messages for the same business entity are processed in order. For example: all updates for invoice id: 12345 need to be processed in order, but updates for invoice id: 3456 can happen in parallel. Here are some ideas, depending on how strict your ordering solution needs to be:

    1. All messages in the queue must be processed in FIFO order - Use a single thread to pull messages. I know sonicmq allows you to create an exclusive consumer, which will allow the process to fail over if you lose a node in the cluster.

    2. Messages should be ordered by business entity id - Many brokers (sonicmq, activemq) have a "message group" feature that ensures messages with the same "group id" are processed on the same thread (a short producer-side sketch follows after this list). The activemq docs describe the feature here: http://activemq.apache.org/message-groups.html. This doesn't handle the Dead Letter Queue use cases, so it may still break ordering.

    3. Strict ordering by business entity id - This moves the ordering functionality into the message bus (in this case Spring Integration). It provides the greatest control over ordering as well as failed messages, and it has the benefit of being reusable outside JMS scenarios. Gary Russell put together an implementation that can be seen here: http://www.infoq.com/presentations/I...ng-Integration. The code can be found here: https://github.com/garyrussell/sprin...ration-ha-demo
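
    To make option 2 a bit more concrete, here is a rough producer-side sketch of the message group idea: the sender stamps each message with a group id (the JMSXGroupID property is what activemq uses; sonicmq has an equivalent feature), and the broker then pins all messages in that group to a single consumer. The JmsTemplate wiring, destination name, and id values below are just placeholders:

    Code:
        import javax.jms.JMSException;
        import javax.jms.Message;
        import org.springframework.jms.core.JmsTemplate;
        import org.springframework.jms.core.MessagePostProcessor;

        public class GroupedSender {

            private final JmsTemplate jmsTemplate;

            public GroupedSender(JmsTemplate jmsTemplate) {
                this.jmsTemplate = jmsTemplate;
            }

            public void send(final String invoiceId, String payload) {
                // Messages carrying the same JMSXGroupID are delivered in order to the
                // same consumer; different groups can be processed in parallel.
                jmsTemplate.convertAndSend("jmsWorker1", payload, new MessagePostProcessor() {
                    public Message postProcessMessage(Message message) throws JMSException {
                        message.setStringProperty("JMSXGroupID", "invoice-" + invoiceId);
                        return message;
                    }
                });
            }
        }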

    Hope that helps.
    Brian
