  • Explicit ACK with DMLC

    Is this design a workable one?

    My app has a DMLC with a single listener thread. That thread hands each work item it receives from MQ to a pool of worker threads.

    Each worker thread when it completes its job updates the state of the Listener with a flag indicating that the job is finished and that the message can be acknowledged.

    The listener will hold a reference to each message that has not been explicitly acknowledged. The Listener at the end of the onMessage() processes all the flags and explicitly acknowledges a particular message with a message.acknowledge() call. The message it acknowledges is determined by knowing if each and every single message prior to that particular message was successfully processed by any of the worker threads. In other words, the Listener keeps an ordered list of all received messages.

    I have done some testing with DMLC and ActiveMQ.

    1. Is it possible for a single DMLC thread to explicitly call message.acknowledge() on a particular message and thereby trigger ActiveMQ to remove all messages prior to the one that was acknowledged? It does not work for me. The messages are still in the queue even when I acknowledge.

    I am using this strategy to avoid message losses. When the app / session / connection is restarted, ActiveMQ should redeliver. It does but when I send a message.acknowledge(), the messages remain on ActiveMQ.

    Another thing I can't figure out is the configuration:

    <bean id="jmsContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="cachedConnectionFactory" />
    <property name="concurrentConsumers" value="1" />
    <property name="destination" ref="receiveQueue" />
    <property name="messageListener" ref="MessageListener" />
    <property name="recoveryInterval" value="2000"/>
    <!-- property name="sessionTransacted" value="true"/-->
    <property name="sessionAcknowledgeMode" value="3"/>
    <property name="cacheLevelName" value="CACHE_CONSUMER"/>
    </bean>

    If I set "sessionAcknowledgeMode" to 1, 2, or 3, the message automatically disappears from ActiveMQ after it is delivered. If I set it to 4, it remains on the queue. But I don't believe "4" is a valid value.

    Thanks
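
The ordered-list bookkeeping described above can be sketched in plain Java. This is only an illustration under stated assumptions: `OrderedAckTracker` and its method names are hypothetical (nothing in Spring or ActiveMQ provides them), messages are assumed to be distinct objects, and the caller is assumed to invoke `acknowledge()` itself on the session's own thread. Note also that, per the JMS `Message.acknowledge()` contract, a CLIENT_ACKNOWLEDGE acknowledgement covers all messages the session has consumed so far, so the tracker on its own does not limit what the broker treats as acknowledged.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical helper: remembers delivery order and reports the newest
 * message whose predecessors have all been processed.
 */
final class OrderedAckTracker<M> {

    // Messages in delivery order, oldest first, not yet released for acknowledgement.
    private final Deque<M> pending = new ArrayDeque<>();

    // Messages a worker has finished but that are still in 'pending'.
    private final Set<M> finished = new HashSet<>();

    /** Called by the receiving thread for every delivered message. */
    synchronized void received(M message) {
        pending.addLast(message);
    }

    /** Called by a worker thread when its job is done. */
    synchronized void completed(M message) {
        finished.add(message);
    }

    /**
     * Drops the contiguous run of finished messages at the head of the
     * queue and returns the last one dropped, or null if the oldest
     * pending message is still being processed.
     */
    synchronized M pollAckable() {
        M last = null;
        while (!pending.isEmpty() && finished.remove(pending.peekFirst())) {
            last = pending.removeFirst();
        }
        return last;
    }
}
```

A receiving loop would call `received(m)` for each delivery, let workers call `completed(m)`, and acknowledge whatever `pollAckable()` returns when it is not null.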

  • #2
    OK, I realized that ActiveMQ does allow a "4" value:
    public static final int INDIVIDUAL_ACKNOWLEDGE = 4;

    But I'd still like to know if my strategy will work and how to trigger the provider to remove all messages up to the message that was acknowledged. What should the value of sessionAcknowledgeMode be? It looks like all of these with the exception of SESSION_TRANSACTED automatically acknowledge the message in some way.

    Thanks
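
For reference, the numeric values discussed above map to names as in the small lookup below. It is a self-contained sketch: `AckModes` is a hypothetical class that only mirrors the constants of `javax.jms.Session` (0 to 3) and the ActiveMQ-specific `ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE` (4), and it does not touch a broker. One thing it makes visible is that `value="3"` in the container configuration above selects DUPS_OK_ACKNOWLEDGE, not CLIENT_ACKNOWLEDGE.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical lookup table mirroring the JMS acknowledge-mode constants. */
final class AckModes {

    private static final Map<Integer, String> NAMES = new LinkedHashMap<>();

    static {
        // Values as defined on javax.jms.Session.
        NAMES.put(0, "SESSION_TRANSACTED");
        NAMES.put(1, "AUTO_ACKNOWLEDGE");
        NAMES.put(2, "CLIENT_ACKNOWLEDGE");
        NAMES.put(3, "DUPS_OK_ACKNOWLEDGE");
        // Not part of the JMS API: defined by ActiveMQ on ActiveMQSession.
        NAMES.put(4, "INDIVIDUAL_ACKNOWLEDGE (ActiveMQ only)");
    }

    private AckModes() {
    }

    /** Returns the constant name for a numeric mode, or a marker for unknown values. */
    static String describe(int mode) {
        return NAMES.getOrDefault(mode, "unknown (" + mode + ")");
    }
}
```

In Spring configuration the `sessionAcknowledgeModeName` property (used on the JmsTemplate later in this thread) accepts the standard JMS names and avoids the magic numbers.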


      • #4
        Why is there a difference between DMLC and javax.jms.Session in how the acknowledgement mode is described?

        DMLC:
        "sessionAcknowledgeMode" set to "CLIENT_ACKNOWLEDGE": Automatic message acknowledgment after successful listener execution; no redelivery in case of exception thrown.

        javax.jms.Session:
        CLIENT_ACKNOWLEDGE
        With this acknowledgment mode, the client acknowledges a consumed message by calling the message's acknowledge method.

        Does this mean that DMLC does not allow calling message.acknowledge()?



        • #5
          Well, the DMLC is the client, so it does the acknowledgement for CLIENT_ACKNOWLEDGE after the listener returns (in AbstractMessageListenerContainer.commitIfNecessary()).

          If you want to manually ack the messages, you can't use the DMLC (or SMLC).



          • #6
            individual manual ack for messages

            Originally posted by Gary Russell
            Well, the DMLC is the client, so it does the acknowledgement for CLIENT_ACKNOWLEDGE after the listener returns (in AbstractMessageListenerContainer.commitIfNecessary()).

            If you want to manually ack the messages, you can't use the DMLC (or SMLC).

            Hi Gary,
            Is there a way to send a manual acknowledgement after consuming the messages? Acknowledgements need to be sent for individual messages.



            • #7
              Not with the listener container - you would have to create your own consumer, perhaps by using a JmsTemplate execute method (with a SessionCallback).



              • #8
                Can you please provide a sample implementation, or just the context XML?

                Originally posted by Gary Russell
                Not with the listener container - you would have to create your own consumer, perhaps by using a JmsTemplate execute method (with a SessionCallback).
                Can you please provide a link to a sample implementation, a sample application zip, or just the context XML?



                • #9
                  This sends the ack every 10 messages...

                  Code:
                  	<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
                  		<property name="targetConnectionFactory">
                  			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
                  				<property name="brokerURL" value="vm://localhost?broker.persistent=false"/>
                  			</bean>
                  		</property>
                  		<property name="sessionCacheSize" value="10"/>
                  	</bean>
                  
                  	<bean id="requestQueue" class="org.apache.activemq.command.ActiveMQQueue">
                  		<constructor-arg value="queue.demo"/>
                  	</bean>
                  
                  	<bean id="template" class="org.springframework.jms.core.JmsTemplate">
                  		<property name="connectionFactory" ref="connectionFactory" />
                  		<property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE" />
                  		<property name="defaultDestination" ref="requestQueue" />
                  	</bean>
                  Code:
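                  import static org.junit.Assert.assertTrue;
                  
                  import java.util.concurrent.CountDownLatch;
                  import java.util.concurrent.TimeUnit;
                  
                  import javax.jms.JMSException;
                  import javax.jms.Message;
                  import javax.jms.MessageConsumer;
                  import javax.jms.Session;
                  
                  import org.junit.Test;
                  import org.junit.runner.RunWith;
                  import org.springframework.beans.factory.annotation.Autowired;
                  import org.springframework.jms.core.JmsTemplate;
                  import org.springframework.jms.core.SessionCallback;
                  import org.springframework.test.context.ContextConfiguration;
                  import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
                  
                  @ContextConfiguration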
                  @RunWith(SpringJUnit4ClassRunner.class)
                  public class ManualAcks {
                  
                  	@Autowired
                  	private JmsTemplate template;
                  
                  	@Test
                  	public void test() throws Exception {
                  
                  		for (int i = 0; i < 20; i++) {
                  			template.convertAndSend("foo" + i);
                  		}
                  
                  		final CountDownLatch latch = new CountDownLatch(20);
                  
                  		template.execute(new SessionCallback<Object>() {
                  
                  			@Override
                  			public Object doInJms(final Session session) throws JMSException {
                  
                  				MessageConsumer consumer = session.createConsumer(template.getDefaultDestination());
                  				int n = 0;
                  				while (true) {
                  					Message m = consumer.receive(1000);
                  					if (m != null) {
                  						System.out.println(m);
                  						if (++n % 10 == 0) {
                  							m.acknowledge();
                  						}
                  						latch.countDown();
                  					}
                  					else {
                  						System.out.println("No message (" + n + ")");
                  					}
                  					if (n == 20) {
                  						break;
                  					}
                  				}
                  				return null;
                  			}
                  		}, true); // <<<< start the connection
                  
                  		assertTrue(latch.await(10, TimeUnit.SECONDS));
                  	}
                  
                  }
