Rabbit listener - too much processed messages

  • Rabbit listener - too much processed messages

    I have an application that uses RabbitMQ and Grails. I defined the following listener:
    Code:
    <bean id="rabbitListenerCreateDocument" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="queues" ref="rabbitQueuePageInstanceCreated"/>
        <property name="defaultRequeueRejected" value="false"/>
        <property name="messageListener" ref="createPageService"/>
        <property name="errorHandler" ref="errorHandlerService"/>
        <property name="transactionManager" ref="mongoTransactionManager"/>
        <property name="autoStartup" value="true"/>
        <property name="concurrentConsumers" value="0"/>
    </bean>
    The problem is that the onMessage method of createPageService runs too many times. I think it is because of this error (visible when I turn on Spring AMQP logging):
    Code:
    2014-05-13 12:08:40,669 [SimpleAsyncTaskExecutor-2] TRACE listener.SimpleMessageListenerContainer  - Waiting for message from consumer.
    2014-05-13 12:08:40,669 [SimpleAsyncTaskExecutor-2] DEBUG listener.BlockingQueueConsumer  - Retrieving delivery for Consumer: tag=[amq.ctag-ai0NxtUB9-WZROZHFQn7Ag], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672tesse_bps_karol,9), acknowledgeMode=MANUAL local queue size=0
    2014-05-13 12:08:40,671 [SimpleAsyncTaskExecutor-2] WARN  listener.SimpleMessageListenerContainer  - Consumer raised exception, processing can restart if the connection factory supports it
    com.rabbitmq.client.ShutdownSignalException: connection error; reason: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 10
            at java.lang.Thread.run(Thread.java:744)
    Caused by: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 10
            at com.rabbitmq.client.impl.ChannelManager.getChannel(ChannelManager.java:74)
            at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)
    2014-05-13 12:08:40,671 [SimpleAsyncTaskExecutor-2] INFO  listener.SimpleMessageListenerContainer  - Restarting Consumer: tag=[amq.ctag-ai0NxtUB9-WZROZHFQn7Ag], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672tesse_bps_karol,9), acknowledgeMode=MANUAL local queue size=0
    2014-05-13 12:08:40,671 [SimpleAsyncTaskExecutor-2] DEBUG listener.BlockingQueueConsumer  - Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672fis_bps_karol,9)
    2014-05-13 12:08:40,673 [SimpleAsyncTaskExecutor-3] DEBUG listener.BlockingQueueConsumer  - Starting consumer Consumer: tag=[null], channel=null, acknowledgeMode=MANUAL local queue size=0
    2014-05-13 12:08:40,679 [SimpleAsyncTaskExecutor-3] DEBUG connection.CachingConnectionFactory  - Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672fis_bps_karol,9)
    2014-05-13 12:08:40,687 [SimpleAsyncTaskExecutor-3] DEBUG listener.BlockingQueueConsumer  - Started on queue 'tesse-page.srv.document.instance.created': Consumer: tag=[null], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:tesse_bps_karol,9), acknowledgeMode=MANUAL local queue size=0
    After some time I figured out that Spring doesn't send the ack to RabbitMQ when this exception is thrown.
    Everything should look fine, but in the onMessage method I create a document in MongoDB, and I end up with too many documents. This is CRITICAL in my app.
    I'm using Mongo to store some data, and as far as I know MongoDB is not transactional. I think the scenario looks like this:
    1. The listener receives a message
    2. The onMessage method of createPageService runs
    3. The method creates a document in Mongo
    4. The onMessage method reaches its end
    5. Spring wants to send the ACK to RabbitMQ but gets an UnknownChannelException
    6. The TransactionManager wants to roll back but can't: there is no transaction on MongoDB
    7. The same message arrives one more time, and now everything works OK (there is no exception)
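    The seven steps above can be modelled without RabbitMQ or MongoDB. The following is only a toy sketch in plain Java: `RedeliveryDemo`, `DocumentStore` and `simulate` are hypothetical names, the store is an in-memory list standing in for a non-transactional database, and the failed ack is simulated by a flag. The `deduplicate` switch shows one common guard (skipping a message whose ID was already handled); it assumes every message carries a stable identifier, which the original setup may or may not provide.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of the redelivery scenario; no real broker or database is involved. */
final class RedeliveryDemo {

    /** Stand-in for a non-transactional store: an insert cannot be rolled back. */
    static final class DocumentStore {
        final List<String> documents = new ArrayList<>();
        final Set<String> seenMessageIds = new HashSet<>();
    }

    /** Hypothetical listener body: creates one document per delivery (step 3). */
    static void onMessage(String messageId, DocumentStore store, boolean deduplicate) {
        // Optional guard: ignore a message ID that was already handled.
        if (deduplicate && !store.seenMessageIds.add(messageId)) {
            return;
        }
        store.documents.add("document-for-" + messageId);
    }

    /**
     * Delivers a single message. The first ack is lost (step 5), nothing is
     * rolled back (step 6), and the message is delivered again (step 7).
     * Returns how many documents exist afterwards.
     */
    static int simulate(boolean deduplicate) {
        DocumentStore store = new DocumentStore();
        String messageId = "msg-1";
        boolean acked = false;
        int attempt = 0;
        while (!acked) {
            attempt++;
            onMessage(messageId, store, deduplicate);
            // Simulated channel failure: only the second ack gets through.
            acked = attempt > 1;
        }
        return store.documents.size();
    }

    public static void main(String[] args) {
        System.out.println("documents without guard: " + simulate(false));
        System.out.println("documents with guard:    " + simulate(true));
    }
}
```

    Without the guard the single message leaves two documents behind, which matches the duplicates described above; with the guard only one is created. In a real application the set of seen IDs would have to live in the database itself (for example as a unique key), which this sketch does not attempt.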
    I tried some modifications to solve this, but they don't work:
    I tried to add an Around aspect on the onMessage method, but it doesn't work. I'm getting:
    Error creating bean with name 'grailsApplicationPostProcessor': BeanPostProcessor before instantiation of bean failed; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.transaction.config.internalTransactionAdvisor': Cannot resolve reference to bean 'org.springframework.transaction.annotation.AnnotationTransactionAttributeSource#0' while setting bean property 'transactionAttributeSource'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.transaction.annotation.AnnotationTransactionAttributeSource#0': BeanPostProcessor before instantiation of bean failed; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.aop.aspectj.AspectJPointcutAdvisor#0': Instantiation of bean failed; nested exception is org.springframework.beans.BeanInstantiationException: Could not instantiate bean class [org.springframework.aop.aspectj.AspectJPointcutAdvisor]: Constructor threw exception; nested exception is java.lang.IllegalArgumentException: error at ::0 formal unbound in pointcut

    Does anybody have an idea how to solve this problem?

  • #2
    What version of Spring AMQP?

    What does your createPageService look like - is it a ChannelAwareMessageListener? Are you doing anything unusual with the Channel?

    I suggest you turn on TRACE level logging and it should become clear what's happening.



    • #3
      In the project we are using spring-rabbit:1.2.1.RELEASE
      with amqp-client:3.1.3

      My createPageService looks like:

      Code:
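      import org.springframework.amqp.core.Message
      import org.springframework.amqp.core.MessageListener

      class CreatePageService implements MessageListener {

          @Override
          void onMessage(Message message) {
              // body not shown in the post
          }
      }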
      And I don't use the Channel directly in the onMessage method.
      I will try to set logging to TRACE and check it.
      Last edited by lpyrkosz; May 16th, 2014, 02:39 AM.



      • #4
        And my log looks like this:
        http://pastebin.com/S8zpRHwB

        I pasted the whole log from my app there (captured while it received 1500 documents).
        Last edited by lpyrkosz; May 16th, 2014, 04:22 AM.



        • #5
          I don't see any of those errors (unknown channel) in the log, just a few "normal" closes (200 OK).
