amqp messageListener not working programmatically?

  • amqp messageListener not working programmatically?

    Hi guys,

    I've got a TopicExchange configured via the XML namespace, and it works fine if I define the consumers (queues and bindings) via XML. But when I try to do it programmatically from my code, my listener POJO never gets called.

    I can confirm that the binding is successful, and I can even see the messages enqueued, but for some reason my "Message-driven POJO" is never called when a message is available.

    This is how I defined my exchange
    Code:
    <rabbit:topic-exchange name="app.alerts">
        <rabbit:bindings>
            <rabbit:binding queue="app.log.queue" pattern="app.alerts.#"/>
            <rabbit:binding queue="app.web.queue" pattern="app.alerts.web.*"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    This is one of my working listeners

    Code:
    ....
    <bean id="logSystemNotifications" class="com.mycompany.amqp.consumers.logger"/>

    <rabbit:listener-container message-converter="jsonMessageConverter" connection-factory="connectionFactory">
        <rabbit:listener ref="logSystemNotifications" method="handleMessage" queue-names="app.log.queue"/>
    </rabbit:listener-container>
    ....
    And this is one of my attempts to do it programmatically
    Code:
    ....
    public class NotificationsConsumer implements MessageListener {
        private static Logger log = LoggerFactory.getLogger(NotificationsConsumer.class);

        public NotificationsConsumer(final AmqpAdmin amqpAdmin, final TopicExchange exchange, final Queue queue, final ConnectionFactory connectionFactory) {
            amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("app.alerts.#"));

            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(queue.getName());
            container.setMessageListener(this);
        }

        /* (non-Javadoc)
         * @see org.springframework.amqp.core.MessageListener#onMessage(org.springframework.amqp.core.Message)
         */
        @Override
        public void onMessage(Message message) {
            System.out.println(message);
        }
    }

    I also tried it using the "MessageListenerAdapter", with the same result. What am I doing wrong?

    Can't the two configuration styles, XML and programmatic, be mixed?

    Code:
    ...
    public class NotificationsConsumer {
        public NotificationsConsumer(final AmqpAdmin amqpAdmin, final TopicExchange exchange, final Queue queue, final ConnectionFactory connectionFactory) {
            amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("app.alerts.#"));

            MessageListener listener = new MessageListenerAdapter(this, new JsonMessageConverter());

            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(queue.getName());
            container.setMessageListener(listener);
        }

        public void handleMessage(String message) {
            log.error(message);
        }
    }


  • #2
    Yes, you should be able to mix the configurations. Silly question: are you allowing time for the message listener to get called? Your message listener runs in a different thread, and if your main thread shuts down too early, the listener will not get called.
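
    The threading point can be illustrated without RabbitMQ at all. The following is only a minimal sketch in plain Java, not Spring AMQP code: the class ListenerWaitDemo, the Listener interface and the deliverAndWait method are hypothetical names made up for this example. It hands a message to a listener on a background thread and makes the calling thread wait on a CountDownLatch until the listener has run or a timeout expires.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class ListenerWaitDemo {

    /** Hypothetical stand-in for a message listener; NOT the Spring AMQP interface. */
    public interface Listener {
        void onMessage(String message);
    }

    /**
     * Hands one message to a listener on a background thread and waits up to
     * timeoutMillis for it to be handled. Returns the handled message, or
     * null if the wait timed out or was interrupted.
     */
    public static String deliverAndWait(String message, long timeoutMillis) {
        CountDownLatch handled = new CountDownLatch(1);
        AtomicReference<String> received = new AtomicReference<>();

        Listener listener = m -> {
            received.set(m);
            handled.countDown(); // signal the waiting thread
        };

        // The listener runs on its own thread, not on the caller's thread.
        Thread consumer = new Thread(() -> listener.onMessage(message), "consumer");
        consumer.setDaemon(true); // a daemon thread does not keep the JVM alive
        consumer.start();

        try {
            // Without this wait the caller could return, and main could exit,
            // before the consumer thread has had a chance to run.
            boolean done = handled.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return done ? received.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("received: " + deliverAndWait("app.alerts.test", 1000));
    }
}
```

    In a real test against the broker, the same idea applies: keep the main thread alive (for example by waiting on a latch that the listener counts down) long enough for a message to arrive.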


    nicolas.loriente
