Announcement Announcement Module
Collapse
No announcement yet.
Using Spring with RabbitMQ Consumer Cancel Notification Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Using Spring with RabbitMQ Consumer Cancel Notification

    Newbie to RabbitMQ and new to Java.

    I'm attempting to write a listener that will use manual acks and handle consumer cancellation notifications using the java Spring AMQP abstraction. Can I accomplish both tasks by using the Spring abstraction?

    I want to write a listener that will pull messages from a queue and process that message (maybe write to a database or something). I planned on using manual acknowledgements so that if processing of the message fails or can't be completed for some reason, I can reject and requeue. So far I've found that in order to manually ack/nack/reject using Spring AMQP I have to use a ChannelAwareMessageListener.

    Additionally, I want to handle Consumer Cancel Notifications from RabbitMQ, however using the ChannelAwareMessageListener (and Spring AMQP in general) I don't really see a way to achieve this. The only way I see to handle CCN is to write code using the lower level Java Client API (instead of spring amqp) by calling channel.basicConsume() and passing a new DefaultConsumer instance which allows you to handle CCN's with the handleCancel method.

    I also don't see how I would set the clientProperties on the Spring version of the ConnectionFactory (to tell the broker I can handle the CCN).

    My pseudo code of the listener and creation of container is below.

    Code:
    public class MyChannelAwareListener implements ChannelAwareMessageListener
    {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception
        {
            msgProcessed = processMessage(message);
    
            if(msgProcessed)    
               channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            else
               channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);  
        }
    }
    public static void main(String[] args) throws Exception
    {
        ConnectionFactory rabbitConnectionFactory;
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext   (MY_CONTEXT_PATH);
        rabbitConnectionFactory = (ConnectionFactory)ctx.getBean("rabbitConnectionFactory");
    
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    
        MyChannelAwareListener listener = new MyChannelAwareListener();
        container.setMessageListener(listener);
        container.setQueueNames("myQueue");
        container.setConnectionFactory(rabbitConnectionFactory);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.start();
    }

  • #2
    You can't do this using the listener container but you can use the RabbitTemplate execute method, with a channel callback, in which case you can do whatever you want with the channel passed back in the callback (including consuming from it, receiving cancels etc).

    Code:
    	public <T> T execute(ChannelCallback<T> action);
    Code:
    public interface ChannelCallback<T> {
    
    	/**
    	 * Execute any number of operations against the supplied RabbitMQ
    	 * {@link Channel}, possibly returning a result.
    	 *
    	 * @throws Exception Not sure what else Rabbit Throws
    	 */
    	T doInRabbit(Channel channel) throws Exception;
    
    }
    You would put your code in the 'doInRabbit' method where you have complete control over the Channel.

    For the conection factory clientProperties (and other settings), you can simply wire up your own client factory and create a Spring CachingConnectionFactory by using the constructor that takes a client factory.

    Comment

    Working...
    X