Announcement Announcement Module
Collapse
No announcement yet.
RabbitMQ channels not getting released and connections hanging in blocked/blocking st Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • RabbitMQ channels not getting released and connections hanging in blocked/blocking st

    Good afternoon Dave.
    Dave we are still using spring-amqp 1.0.0M3 version. We didn't get a chance to migrate to 1.0.0 release.
    We are using channel transaction while sending and consuming messages.
    For example DMBTemplate which extends RabbitTemplate is set with channel transacted as true and
    also the consumer configuration is set with channel transacted as true. In addition we are injecting
    transactionManager as org.springframework.batch.support.transaction.Reso urcelessTransactionManager.
    In one of the previous threads you suggested not to use transactionManager since it is not doing anything.
    The reason for including it is: referring to SecondQConsumer.handleMessage() method, in this method
    the business logic required to split a message and send those messages to another queue.
    If we don't include transactionManger what we observed is that after splitting messages and making calls to
    send messages to another queue, if an exception is thrown, messages were still sent to next queue.
    When we added transactionManager, we found that it is behaving as a unit of work. That's the reason why transactionManager is used.

    Unfortutantely we are running into another problem. One of the developers coded two queues.
    In the first queue handler, when the message is received it checks for some criteria, if the criteria is not met then
    sleep for 3 seconds and send it to a delayQueue where it waits for another 3-5 seconds and come back to the first queue.
    There was no business logic coded to handle a message that has bad data. When QA team performed endurance test running a testcase
    for 4 concurrent users for about 8 hours, we found that about 35000 channels with transaction are hanging and not getting released.
    We also found that about 40 connections some of them in blocking and some of them in blocked state. We used rabbitmqctl list_channels
    to list channels and rabbitmqctl list_connections to list connections.
    When QA team disabled this problematic testcase we found that only 27 channels hanging around and no connections in blocked/blocking state. In other words they have run test cases that have queues with channelTransacted as flase and no transactionMagarer injected into SimpleMessageListenerContainer.

    Dave the incorrect business logic was causing rolling messages from one queue to another without getting consumed.
    This we will correct it.
    Could you please let me know what could be the cause of not releasing channels and letting connections in blocked/blocking state.
    Do you think using transactionManager org.springframework.batch.support.transaction.Reso urcelessTransactionManager posing this problem.
    I have attached configurations and classes for reference.
    I appreciate and thank you for your feedback.

    Thanks
    Venkat
    ================================================== ===
    @Configuration
    public abstract class AbstractMDBRabbitConfiguration extends AbstractRabbitConfiguration{

    protected abstract void configureMDBTemplate(DMBTemplate template);

    @Bean
    public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
    connectionFactory.setChannelCacheSize(100);
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    return connectionFactory;
    }

    @Bean
    public DMBTemplate rabbitTemplate() {
    DMBTemplate template = new DMBTemplate(connectionFactory());
    template.setMessageConverter(messageConverter());
    configureMDBTemplate(template);
    return template;
    }


    @Bean
    public MessageConverter messageConverter() {
    return new SimpleMessageConverter();
    }
    }
    ----------------------------------------------------
    @Configuration
    public class SecondQConfiguration extends AbstractMDBRabbitConfiguration{

    private static final String MDB_SECQ_EXCHANGE_NAME = "mdb.testq.exchange";

    private static final String MDB_SECQ_NAME = "mdb.testq.queue";

    private static final String MDB_SECQ_ROUTING_KEY = MDB_SECQ_NAME;


    @Override
    protected void configureMDBTemplate(DMBTemplate template) {
    template.setExchange(MDB_SECQ_EXCHANGE_NAME);
    template.setRoutingKey(MDB_SECQ_NAME);
    }

    @Bean
    public DMBTemplate rabbitTemplate() {
    DMBTemplate template = new DMBTemplate(connectionFactory());
    template.setChannelTransacted(true);
    template.setMessageConverter(messageConverter());
    configureMDBTemplate(template);
    return template;
    }

    @Bean
    public Queue mdbSecQ() {
    Queue q = new Queue(MDB_SECQ_NAME);
    q.setAutoDelete(false);
    q.setDurable(true);
    return q;
    }


    @Bean
    public TopicExchange mdbSecQExchange() {
    TopicExchange t = new TopicExchange(MDB_SECQ_EXCHANGE_NAME);
    t.setAutoDelete(false);
    t.setDurable(true);
    return t;
    }


    @Bean
    public Binding mdbSecQBinding() {
    return BindingBuilder.from(mdbSecQ()).to(mdbSecQExchange( )).with(MDB_SECQ_ROUTING_KEY);
    }
    }
    ----------------------------------------------------------------------------------------------------
    Utility class to get hold of rabbit template to send the message:
    ================================================== ===============
    public class ApplicationContextUtil {
    private static final AbstractApplicationContext cac3;
    static {
    cac3 = new ClassPathXmlApplicationContext("producer-secondq-bootstrap-config.xml");
    cac3.registerShutdownHook();
    }

    public static DMBTemplate getDMBTemplate() {
    return (DMBTemplate) cac3.getBean("rabbitTemplate");
    }

    }

    For example DMBMessage is put on quueue "queue.one" as follows:
    DMBTemplate template = ApplicationContextUtil.getDMBTemplate();
    DMBMessage dmbMsg = new DMBMessage();
    DMBMessagePayload payload = new DMBMessagePayload();
    payload.set<essage("payload");
    dmbMsg.setPayload(payload);
    template.convertAndSend(dmbMsg);


    -----------------------------------------------------------------------------------
    Handler that processes messages:
    ================================
    public class SecondQConsumer implements IDMBProcessor{
    private int count = 0;

    public void handleMessage(DMBMessage msg) {
    //consume the message
    DMBMessagePayload pl = msg.getMessagePayload();
    String message = (String)pl.getMessage();
    // Here there is logic to wait for 3 seconds and if the criteria doesn't meet put the message back into queue.
    // In other cases the logic split xml message into peices and send them to another queue one by one.
    // To make sure here the unit of work executes or rolled back by injecting org.springframework.batch.support.transacti on.ResourcelessTransactionManager
    // into messageListenerContainer as transactionManager
    }

    }
    -----------------------------------------------------------------------------------------------
    Producer config: producer-secondq-bootstrap-config.xml
    ================================================== ====
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jaxws="http://cxf.apache.org/jaxws"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schem...-beans-3.0.xsd
    http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <!-- pick up rabbit broker configuration -->

    <context:component-scan base-package="com.test">
    </context:component-scan>
    </beans>
    ---------------------------------------------------------------------------
    Consumer configuration: consumer--secondq-bootstrap-config.xml
    ================================================== ============
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schem...-beans-3.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <!-- pick up rabbit broker configuration -->

    <context:component-scan base-package="com.test">
    </context:component-scan>

    <bean id="messageListenerContainer" class="org.springframework.amqp.rabbit.listener.Si mpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueName" value="mdb.testq.queue"/>
    <property name="concurrentConsumers" value="1" />
    <property name="messageListener" ref="messageListenerAdapter" />
    <property name="channelTransacted" value="true" />
    <property name="transactionManager" ref="txnManager" />
    </bean>

    <bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.ad apter.MessageListenerAdapter">
    <property name="delegate" ref="testHandler" />
    <property name="messageConverter" ref="messageConverter" />
    </bean>

    <bean id="testHandler" class="com.test.SecondQConsumer">
    </bean>

    <bean id="txnManager" class="org.springframework.batch.support.transacti on.ResourcelessTransactionManager" />

    </beans>
    ---------------------------------------------------------------------------------------------------------------------------

  • #2
    Sounds like you re-discovered this issue: https://jira.springsource.org/browse/AMQP-190?

    Comment


    • #3
      Good morning Dave.
      Thanks for your feedback.
      Dave migrating from spring-amqp 1.0.0M3 to 1.00 should resolve this problem?.

      Thanks
      Venkat

      Comment


      • #4
        No, the issue is marked as resolved in 1.0.1. Workarounds are discussed in the issue comments.

        Comment


        • #5
          Good morning Dave.
          I appreciate and thank you for your feedback.
          Dave my apology to you for bothering with one last question.
          We have disabled the testcase that is using TransactionManager and we found that there are no thousands of channels created.
          When I run rabbitmqctl list_channels there are 29 channels listed. This number remains constant over the period of 45 hours of test.
          When I run rabbitmqctl list_connections there are 47 connections running and this remain constant for 45 hours.
          This is good the numbers are under control. There is no abnormal growth of channel creation because we don't have TransactionManager in action.
          Dave one question is 29 channels and 47 connections are listed because connections are cached and therefore their respective channels are active.

          Thanks
          Venkat

          Comment

          Working...
          X