
  • CachingConnectionFactory leaks channels in when synchronized with a transaction?

    Hello everyone,

    I am switching to transacted channels for guaranteed delivery, and I have started observing some odd behavior in my connection's channels.

    After several publishes, the broker is running out of memory: channels are apparently never returned to the cache when channelTransacted=true AND the publish happens on a thread with an ACTIVE transaction that the PlatformTransactionManager knows about (i.e. when the RabbitResourceHolder is synchronized with the current DB transaction).
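    For context, here is a minimal sketch of the kind of setup that triggers this (the class names are from Spring AMQP 1.0.x; the host, exchange, and routing key are made-up placeholders, and this fragment assumes it runs inside a method on a thread with an active DB transaction):

```java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

public class TransactedPublisherSketch {

    private final CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory("localhost");
    private final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

    public TransactedPublisherSketch() {
        // channel.txCommit()/close() are deferred to the transaction manager
        rabbitTemplate.setChannelTransacted(true);
    }

    // Called from code running inside a DB transaction, so the
    // RabbitResourceHolder gets synchronized with that transaction.
    public void publish(Object payload) {
        rabbitTemplate.convertAndSend("some.exchange", "some.key", payload);
    }
}
```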

    I've stepped through the code and see that the channel.commit() and channel.close() operations are deferred until after the transaction manager has committed the transaction. This is a good thing.

        public static void releaseResources(RabbitResourceHolder resourceHolder) {
            if (resourceHolder == null || resourceHolder.isSynchronizedWithTransaction()) {
                return; // no-op while the holder is still synchronized
            }
            ...
        }
    The difficulty occurs when the ResourceHolder's afterCompletion() fires after the transaction has been completed.

                if (releaseNecessary) {
                    releaseResource(this.resourceHolder, this.resourceKey);
    This should properly close the channel and the connection (returning the channel to the CachingConnectionFactory, and not actually closing the connection, since the connection is reused for AMQP).

    The difficulty is that $RabbitResourceSynchronization:256
            protected void releaseResource(RabbitResourceHolder resourceHolder, Object resourceKey) {
    calls releaseResources()... but at that point the resourceHolder is still marked as synchronized with the transaction. So this method is a no-op, and the channel stays unclosed. Subsequent publishes therefore find no cached channels in the CachingConnectionFactory, and every transacted publish within the scope of an existing transaction opens a new channel until the broker runs out of memory.
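    The leak described above can be modeled with a small stdlib-only sketch (these are NOT the real Spring classes, just stand-ins that mirror the guard logic): while the holder stays marked as synchronized, releaseResources() returns early, the channel is never re-cached, and every publish opens a fresh one.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ChannelLeakModel {

    /** Stand-in for RabbitResourceHolder (not the real Spring class). */
    static class ResourceHolder {
        final Object channel;
        boolean synchronizedWithTransaction;
        ResourceHolder(Object channel, boolean sync) {
            this.channel = channel;
            this.synchronizedWithTransaction = sync;
        }
    }

    /** Stand-in for the CachingConnectionFactory's channel cache. */
    static final Deque<Object> cachedChannels = new ArrayDeque<>();
    static int channelsOpened;

    static Object borrowChannel() {
        Object ch = cachedChannels.poll();
        if (ch == null) {
            channelsOpened++;          // cache miss: a brand-new channel is opened
            ch = new Object();
        }
        return ch;
    }

    /** Mirrors the guard in releaseResources(): a no-op while still synchronized. */
    static void releaseResources(ResourceHolder holder) {
        if (holder == null || holder.synchronizedWithTransaction) {
            return;                    // channel silently dropped instead of re-cached
        }
        cachedChannels.push(holder.channel);
    }

    /**
     * Simulates n transacted publishes. With stillSynchronized=true (the
     * reported bug: the flag is never cleared before afterCompletion runs),
     * every publish opens a fresh channel; with false, the first channel
     * is returned to the cache and reused.
     */
    static int openChannelsAfterPublishes(int n, boolean stillSynchronized) {
        cachedChannels.clear();
        channelsOpened = 0;
        for (int i = 0; i < n; i++) {
            ResourceHolder holder =
                    new ResourceHolder(borrowChannel(), stillSynchronized);
            releaseResources(holder);  // what afterCompletion ends up doing
        }
        return channelsOpened;
    }

    public static void main(String[] args) {
        System.out.println(openChannelsAfterPublishes(5, true));   // prints 5: one leak per publish
        System.out.println(openChannelsAfterPublishes(5, false));  // prints 1: channel reused
    }
}
```

    The contrast between the two calls in main() is the whole bug: clearing the synchronized flag before release would let the cache see the channel again.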

    Am I missing some configuration somewhere?

    My workaround (which is not acceptable for guaranteed delivery) is to set channelTransacted = false.

    $ mvn dependency:list | grep amqp
    [INFO] com.rabbitmq:amqp-client:jar:2.5.0:compile
    [INFO] org.springframework.amqp:spring-amqp:jar:1.0.0.RELEASE:compile
    [INFO] org.springframework.amqp:spring-rabbit:jar:1.0.0.RELEASE:compile
    [INFO] org.springframework.integration:spring-integration-amqp:jar:2.1.0.M1:compile

    Last edited by neoha; Sep 10th, 2011, 09:46 PM. Reason: spelling

  • #2
    This looks like it might be a good catch. I don't think it should matter if you are using an async consumer (MessageListenerContainer), because the Channel is not supposed to be released in that case anyway - so your use case is synchronous? You could raise a JIRA issue and we can look at it for 1.0.1, or you could submit a pull request. Another workaround is not to use a transaction manager - I don't know enough about your use case to say whether that would work for you.


    • #3

      Thanks for the quick reply.
      My use case is actually on the publishing side, so yes, it's "synchronous". (MessageListenerContainer doesn't apply, and indeed I am *not* seeing any channel "leaks" on the consuming side - only the publishing side.) I'll look into raising a JIRA shortly, and possibly a pull request if I can scrounge up some time.

      Oh, and I *do* need the transaction manager: if the publish to Rabbit fails, I want to roll back the database transaction (I'm recording whether or not a given entity has been published to Rabbit).
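      The publish-and-record pattern described above might look roughly like this (a hedged sketch only; the method, entity, and repository names are my invention, not from the thread, and rabbitTemplate/entityRepository are assumed to be injected):

```java
// Hypothetical sketch: record the publish in the same DB transaction,
// so a failed publish rolls the "published" flag back as well.
@Transactional
public void publishAndRecord(MyEntity entity) {
    rabbitTemplate.convertAndSend("some.exchange", "some.key", entity.getPayload());
    entity.setPublishedToRabbit(true);   // persisted by the surrounding DB transaction
    entityRepository.save(entity);
    // If convertAndSend throws, the @Transactional method rolls back and
    // the entity is never marked as published.
}
```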

      Last edited by neoha; Sep 10th, 2011, 09:51 PM. Reason: adding in why I need transaction manager...


      • #4
        JIRA issue