Announcement Announcement Module
Collapse
No announcement yet.
Testing Guranteed Delivery Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Testing Guranteed Delivery

    Good afternoon.
    I am trying to test guaranteed delivery using spring amqp.
    Making sure that:
    1. durable queue is used.
    2. delivery mode as PERSISTENT_MODE
    3. channelTransacted.

    To test the guaranteed delivery scenario I am doing the following:
    - publish 100 messages
    - In the consumer I am incrementing the counter.

    - When I see the count of 10 messages, bringing down RabbitMQ broker.

    - Bring up RabbitMQ broker, using RabbitMQ Management utility checking how many messages_ready are there.
    I found that there are no messages_ready ( in other words messages_ready count is zero).

    - Bring up the consumer expecting to process rest of the 90 messages.
    Since there are messages_ready in the queue I don't see any count print stataments.
    This means I lost 90 messages when I brought down RabbitMQ broker.

    I have repeated this test by publishing 20000 messages.
    Before I bring down RabbitMQ broker, 645 messages are processed.
    Bring down Broker and bring it up and checked messages_ready in the queue. The count is 17609.

    17609 + 645 = 18254 messages in tact out of 20000.
    1746 messages are missing.

    Could you please let me know if I am doing something wrong in my queueing configuration.

    Following is the queueing configuration I am using:
    ================================================== =

    @Configuration
    public abstract class AbstractMDBRabbitConfiguration extends AbstractRabbitConfiguration{

    protected abstract void configureMDBTemplate(DMBTemplate template);

    @Value("${rabbitmq.user}")
    private String rabbitMQUser;

    @Value("${rabbitmq.password}")
    private String rabbitMQPassword;

    @Value("${rabbitmq.host}")
    private String rabbitMQHost;



    @Bean
    public ConnectionFactory connectionFactory() {
    SingleConnectionFactory connectionFactory = new SingleConnectionFactory(rabbitMQHost);
    connectionFactory.setUsername(rabbitMQUser);
    connectionFactory.setPassword(rabbitMQPassword);
    return connectionFactory;
    }


    public DMBTemplate rabbitTemplate() {
    DMBTemplate template = new DMBTemplate(connectionFactory());
    template.setMessageConverter(messageConverter());
    configureMDBTemplate(template);
    return template;
    }


    @Bean
    public MessageConverter messageConverter() {
    return new SimpleMessageConverter();
    }
    }
    ================================================== ==============================================
    @Configuration
    public class SecondQConfiguration extends AbstractMDBRabbitConfiguration{

    private static final String MDB_SECQ_EXCHANGE_NAME = "mdb.testq.exchange";

    private static final String MDB_SECQ_NAME = "mdb.testq.queue";

    private static final String MDB_SECQ_ROUTING_KEY = MDB_SECQ_NAME;


    @Override
    protected void configureMDBTemplate(DMBTemplate template) {
    template.setExchange(MDB_SECQ_EXCHANGE_NAME);
    template.setRoutingKey(MDB_SECQ_NAME);
    }

    @Bean
    public DMBTemplate secondQTemplate() {
    return rabbitTemplate();
    }

    @Bean
    public Queue mdbSecQ() {
    Queue q = new Queue(MDB_SECQ_NAME);
    q.setAutoDelete(false);
    q.setDurable(true);---------------------> Making sure that queue is durable
    return q;
    }


    @Bean
    public TopicExchange mdbSecQExchange() {
    TopicExchange t = new TopicExchange(MDB_SECQ_EXCHANGE_NAME);
    t.setAutoDelete(false);
    t.setDurable(true);
    return t;
    }


    @Bean
    public Binding mdbSecQBinding() {
    return BindingBuilder.from(mdbSecQ()).to(mdbSecQExchange( )).with(MDB_SECQ_ROUTING_KEY);
    }
    }
    ================================================== ================================================== ======
    public class DMBTemplate extends RabbitTemplate {

    public DMBTemplate() {
    super();
    }

    public DMBTemplate(ConnectionFactory connectionFactory) {
    super(connectionFactory);
    }

    @Override
    public void convertAndSend(Object object) throws DMBMessagingException {
    try {
    super.convertAndSend(object);
    } catch (AmqpException amqpe) {
    throw new DMBMessagingException(amqpe);
    }
    }


    @Override
    public void convertAndSend(String exchange, String routingKey,
    final Object message) throws AmqpException {
    MessageProperties props =new MessageProperties();
    props.setDeliveryMode(MessageDeliveryMode.PERSISTE NT); -------> setting delivery mode as PERSISTENT
    MessageConverter msgConverter = getMessageConverter();
    send(exchange, routingKey, getMessageConverter().toMessage(message, props));
    }
    }

    ================================================== ================================================== ==========
    Handler that processes messages:
    ================================
    public class SecondQConsumer implements IDMBProcessor{
    private int count = 0;

    public void handleMessage(DMBMessage msg) {
    DMBMessagePayload pl = msg.getMessagePayload();
    String message = (String)pl.getMessage();
    count++;
    System.out.println("count:"+ count);
    }

    }

    ================================================== ================================================== =====
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jaxws="http://cxf.apache.org/jaxws"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schem...-beans-3.0.xsd
    http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <!-- pick up rabbit broker configuration -->

    <context:component-scan base-package="com.test">
    </context:component-scan>



    <contextroperty-placeholder location="file:///var/test/amqp.properties"/>




    <bean id="messageListenerContainer" class="org.springframework.amqp.rabbit.listener.Si mpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueName" value="mdb.testq.queue"/>
    <property name="concurrentConsumers" value="1" />
    <property name="messageListener" ref="messageListenerAdapter" />
    <property name="channelTransacted" value="true" />

    <property name="transactionManager" ref="txnManager" />

    </bean>

    <bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.ad apter.MessageListenerAdapter">
    <property name="delegate" ref="testHandler" />
    <property name="messageConverter" ref="messageConverter" />
    </bean>



    <bean id="testHandler" class="com.test.SecondQConsumer">
    </bean>

    <bean id="txnManager" class="org.springframework.batch.support.transacti on.ResourcelessTransactionManager" />


    </beans>


    To make sure messages rolled back on exception, using channelTransacted = true and using transactionManager as
    org.springframework.batch.support.transaction.Reso urcelessTransactionManager.
    ================================================== ================================================== ======================

    Thanks
    Venkat

  • #2
    I can't see anything wrong there. The transaction manager is not adding anything and you should remove it, but it shouldn't break anything either. You could also set the prefetch count in the container to limit the number of messages delivered before any acks are sent.

    It looks like you are using an older version so if you can use a snapshot, or wait till tomorrow and pick up 1.0.0.m3 we might make more progress (some bugs in the message listener container were fixed recently).

    One question: when you say you "bring down the broker" what exactly do you mean? How do you know that exactly 10 messages were delivered when you did that?

    Comment


    • #3
      Good morning Dave.
      I was running the Consumer and when 10 messages are processed by SecondQConsumer which is handler (I am making sure this by printing the count in this handler), I stop RabbitMQ broker which is Running from C:\Program Files\RabbitMQ\rabbitmq_server2.0.0\sbin. In other words killing rabbitmq-server.bat. Once RabbitMQ broker is down, I bring it up and run the command: rabbitmqctl list_queues messages_ready (from C:\Program Files\RabbitMQ\rabbitmq_server2.0.0\sbin. This should show me the number of messages ready to be consumed by the handler. Since I am using Persistent Mode as delivery mode, my expectation is that even if the RabbitMQ broker is down, messages are saved and this number has to be 90. Since 10 of them are already processed.

      As per your suggestion I will try with 1.0.0.M3.
      Dave I have one question. If I remove Transaction Manger, then I should remove channelTransacted property also. By removing these two I believe the rollback mechanism will not work if an exception is thrown in handleMessage of the handler.

      Thanks
      Venkat

      Comment


      • #4
        Originally posted by vveludan View Post
        This should show me the number of messages ready to be consumed by the handler.
        And what was the result? Were there any unacked messages?

        As per your suggestion I will try with 1.0.0.M3.
        If you can try a snapshot, you will have a chance to find any problems before M3 is released. Anyway, thanks for trying it and reporting on your experience.

        If I remove Transaction Manger, then I should remove channelTransacted property also.
        No, those two properties are independent (otherwise we wouldn't have provided both). Your transaction manager does nothing, so it can be removed, until your transaction needs it (probably never, I would guess, unless you start using Spring Batch stuff inside your listener).
        Last edited by Dave Syer; Mar 7th, 2011, 04:21 AM. Reason: formatting

        Comment


        • #5
          Good afternoon Dave.

          And what was the result? Were there any unacked messages?

          Dave I took the snap shot of M3 and retried the test.
          - Published 10 messages
          - After consuming 5 messages, shutdown the rabbitmq broker. Message Conatiner Listener stopped throwing java.net.ConnectException: Connection refused: connect (which is expected).
          - Brought up rabbitmq broker. Used the command: rabbitmqctl list_queues messages_ready messages_unacknowledged. It is found that both messages_ready and messages_unacknowledged count is zero.

          You are right, by just setting channelTransacted I was able to test the functionality of rolling back the message when an exception is thrown in the handler.

          Thanks
          Venkat

          Comment


          • #6
            With M3 if you don't set the acknowledgeMode=AUTO the broker will not expect any acks, so it will send all the messages (in blocks of 100 by default I think). You are probably receiving them all, but not giving the listener a chance to process them. Can you try again with the acknowledgeMode=AUTO. (Maybe we should make this the default if the channel is transacted - I actually thought it would be an error, but apparently not.)

            Comment


            • #7
              Good morning Dave. As per your suggestion
              I have set autoAcknowledgeMode = AUTO:
              <bean id="messageListenerContainer" class="org.springframework.amqp.rabbit.listener.Si mpleMessageListenerContainer">
              <property name="connectionFactory" ref="connectionFactory" />
              <property name="queueName" value="mdb.testq.queue"/>
              <property name="concurrentConsumers" value="1" />
              <property name="messageListener" ref="messageListenerAdapter" />
              <property name="channelTransacted" value="true" />
              <property name="acknowledgeMode" value="AUTO"/>
              </bean>


              With this configuration I have rerun the test.
              Stopped the broker after processing 5 messages. After brining back RabbitMQ broker, checked for messages_ready and messages_unacknowledged and found that the messages_ready count is 10 (instead of 5, in other words all messages were rolled back). The count of messages_unacknowledged count is zero. At this point I started the Consumer (Message Listener Container). For some reason it was not processing any messages from the queue though the messages_ready count is zero. When I waited for some time and checked again for messages_ready and messages_unacknowledged, the messages_ready count is changed to and messages_unacknowledged count changed to 1. But I didn't see anything processed in handler (The print count was not printed to the console of Message Listener Container).

              Thanks
              Venkat

              Comment


              • #8
                Good afternoon Dave in my previous post I found the following typo mistakes:

                At this point I started the Consumer (Message Listener Container). For some reason it was not processing any messages from the queue though the messages_ready count is zero.--> 10When I waited for some time and checked again for messages_ready and messages_unacknowledged, the messages_ready count is changed to 9 and messages_unacknowledged count changed to 1.

                Thanks
                Venkat

                Comment


                • #9
                  I'm not really sure how the SingleConnectionFactory would behave if the broker isn't stable. Try the CachingConnectionFactory (which should be the default for everyone from M3 onwards). At least you aren't losing any messages now, so the container is getting something right.

                  Comment


                  • #10
                    Good morning Dave.
                    Dave I tried with CachingConnectionFactory. Messages were not getting consumed on start of Listener.
                    I am finding another strange issue.
                    With autoAcknowledgeMode=TRUE, when the broker was down, messages were rolled back. They were not getting consumed on start of spring-amqp Message Listener. At this point I used a consumer code with plain RabbitMQ API to consume all messages.
                    When the messages are cleared I tried to reproduce the problem.
                    - Publish 100 messages
                    - Bring the broker down after 10 messages are consumed.
                    - Bring up the broker
                    - check messages_ready. I see messages are zero.
                    This is strange, if I consume messages with RabbitMQ API consumer,
                    can't reproduce the problem.

                    Thanks
                    Venkat

                    Comment


                    • #11
                      Dave to avoid confusion I have begun using new exchange and new queue.
                      using the same configuration as before

                      <bean id="messageListenerContainer" class="org.springframework.amqp.rabbit.listener.Si mpleMessageListenerContainer">
                      <property name="connectionFactory" ref="connectionFactory" />
                      <property name="queueName" value="rabbitmq.test.queue"/>
                      <property name="concurrentConsumers" value="1" />
                      <property name="messageListener" ref="messageListenerAdapter" />
                      <property name="channelTransacted" value="true" />
                      <property name="acknowledgeMode" value="AUTO"/>
                      </bean>


                      In AbstractMDBRabbitConfiguration: tried with both SingleConnectionFactory and CachingConnectionFactory:
                      @Bean
                      public ConnectionFactory connectionFactory() {
                      //CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitMQHost);
                      SingleConnectionFactory connectionFactory = new SingleConnectionFactory(rabbitMQHost);
                      connectionFactory.setUsername(rabbitMQUser);
                      connectionFactory.setPassword(rabbitMQPassword);
                      //connectionFactory.setChannelCacheSize(1);
                      return connectionFactory;
                      }


                      Now messages are not at all saved to the disk/queue on failure of RabbitMQ broker.

                      Thanks
                      Venkat

                      Comment


                      • #12
                        [QUOTE=vveludan;350593
                        This is strange, if I consume messages with RabbitMQ API consumer,
                        can't reproduce the problem.
                        [/QUOTE]

                        What version of the broker do you have (rabbitmqctl status tells you). It should be 2.3.1 (the latest). Otherwise there was a bug in the broker that prevented messages from being delivered to a restarted MessageListenerContainer.

                        Comment


                        • #13
                          Good morning Dave.
                          I am using RabbitMQ version 2.0. That could be the problem. I will work with the version 2.3.1. I appreciate and thank you for the feed back.

                          Thanks
                          Venkat

                          Comment


                          • #14
                            Dave I have updated RabbitMQ Broker to 2.3.1 version. Also updated OTP Erlang to otp_win32_R14B01 and RabbitMQ Java client to 2.3.1. After these updates I have rerun the test.
                            Following are my observations with 1.0.0M3:
                            - Bring up RabbitMQ Broker
                            - Bring Message Listener Container.
                            - Publish 100 Messages.
                            Dave Simple Message Listener Container is not responding at all to the oublished messages. It is up and running but when messages are put in the queue. It simply does not process. In other words the handler never get executed.

                            Following are observations with 1.0.0M2:
                            - Bring up Broker
                            - Bring up Message Listener Container
                            - Publish 100 messages.

                            Messages are consumed without any problem.

                            The other observation is:

                            -Have Broker running
                            - Have Listener Running.
                            - Publish 100 messages
                            - Bring down messages after 10 messages consumed.
                            - Bring up Broker and check for messages_ready
                            - It shows 100 instead of 90. That is fine. The good thing is messages are saved when the broker is down. When I bring up Listener it process messages in the queue. Though 10 messages are reprocessed (considered as duplicate messages), at least messages are not lost when the broker is down. With M2 I could not set acknowledgeMode = TRUE as it was throwing:
                            org.springframework.beans.NotWritablePropertyExcep tion: Invalid property 'acknowledgeMode'.

                            The drawback of M2 is, when I bring down RabbitMQ Broker, Message Listener Container gets terminated silently without throwing any exception.

                            Coming back to M3, Dave could you please let me know what's happening with Message Listener Container. As I mentioned earlier, it's up and running but never pick up messages from queue to process.

                            I appreciate and thank you for your time.

                            Venkat

                            Comment


                            • #15
                              Originally posted by vveludan View Post
                              Coming back to M3, Dave could you please let me know what's happening with Message Listener Container. As I mentioned earlier, it's up and running but never pick up messages from queue to process.
                              All I can say is it works for me (and everyone else). Can you grab the samples and verify that they work? Run the integration tests?

                              Comment

                              Working...
                              X