Announcement Announcement Module
Collapse
No announcement yet.
Auto declare queue for unroutable message?' Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Auto declare queue for unroutable message?'

    I have a scenario that need to declare separate queue for every Type of message. But "Type" of message is out of my control (decided by upper apps), so I decide to implements a auto-adaptive queue declaration. Eg, use message's type as it's routingKey, if it is unroutable, in ReturnCallback of rabbitTemplate declare a new Queue and Binding, and resend the unroutable message. But the ReturnCallback of rabbitTemplate is hanging when i declare new Queue in it(even though the queue has been successfully declared in Broker).

    Another question:
    Can Spring AMQP has some APIs can help me to retrive queue's info from Message Broker? just like rabbitmqctl list_queues did?

    Any response would be greatly appreciated.
    Regards.


  • #2
    Try to declare a new queue outside of ReturnCallback's Thread. I mean send the command to create queue and resend message to some TaskExecutor.

    Can Spring AMQP has some APIs can help me to retrive queue's info from Message Broker? just like rabbitmqctl list_queues did?
    No, Spring AMQP doesn't provide similar API.
    However you can do it using RestTemplate from your app and RabbitManagement Rest API: https://www.rabbitmq.com/management.html

    Comment


    • #3
      Originally posted by Artem Bilan View Post
      Try to declare a new queue outside of ReturnCallback's Thread. I mean send the command to create queue and resend message to some TaskExecutor.


      No, Spring AMQP doesn't provide similar API.
      However you can do it using RestTemplate from your app and RabbitManagement Rest API: https://www.rabbitmq.com/management.html
      Hi Artem, thank you so much for your prompt response! It works!

      Can I ask why declare new queue for unroutable message and resend it again need outside of ReturnCallback's Thread?

      And one more concern:
      Since ReturnCallback is asynchronous, for a bunch of simultaneously incoming messages that belong to a new Type, the operation of declare new queue for this Type of message probably execute mutiple times, is there any potential threats with that?


      Cheers.
      --Jason

      Comment


      • #4
        why declare new queue for unroutable message and resend it again need outside of ReturnCallback's Thread?
        Because you are going to get deal with RabbitMQ and it will be some admin operations, so you should do it within a new AMQP Channel and allow for that failed to finish its work properly.

        the operation of declare new queue for this Type of message probably execute mutiple times, is there any potential threats with that?
        It doesn't hurt by AMQP protocol at all: any attempt to declare an existing AMQP entity finishes good without issues

        Comment


        • #5
          Here is my solution:

          On the Producer:
          Code:
              @Override
              public void produce(EmailForm emailForm) {
                  this.rabbitTemplate.convertAndSend("prefix.".concat(emailForm.getTag()), emailForm, new MessagePostProcessor() {
                      public Message postProcessMessage(Message message) throws AmqpException {
                          message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
                          if(logger.isTraceEnabled()) {
                              logger.trace("PUBLISHED Message - {}", message);
                          }
                          return message;
                      }
                  });
              }

          ReturnCallback of rabbitTemplate:
          Code:
              @PostConstruct
              public void init() {
                  this.rabbitTemplate.setReturnCallback(new ReturnCallback(){
                      @Override
                      public void returnedMessage(Message message, int replyCode,
                              String replyText, String exchange, String routingKey) {
                          if (AMQP.NO_ROUTE == replyCode) {
                              if (logger.isInfoEnabled()) {
                                  logger.info("Unroutable Message: \n\t exchange : {}\n\t routingKey : {}",
                                          new Object[]{exchange, routingKey});
                              }
                              taskExecutor.execute(new ResendUnroutedTask(message, routingKey));
                          } else {
                              if (logger.isInfoEnabled()) {
                                  logger.info("Publishing failed - replyCode:[{}] replyText:[{}] exchange:[{}] routingKey:[{}]",
                                          new Object[]{replyCode, replyText, exchange, routingKey});
                              }
                          }
                      }
                  });
              }
          
              public class ResendUnroutedTask implements Runnable {
          
                  private Message message;
                  private String routingKey;
          
                  public ResendUnroutedTask(Message message, String routingKey) {
                      this.message = message;
                      this.routingKey = routingKey;
                  }
          
                  @Override
                  public void run() {
                      Queue queue = new Queue(routingKey, true, false, false, defaultQueueArgs);
                      rabbitAdmin.declareQueue(queue);
                      rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(emailTopicExchange).with(routingKey));
                      //send message again
                      rabbitTemplate.send(emailTopicExchange.getName(), routingKey, message);
                      //send name of declared queue to a special queue
                      rabbitTemplate.convertAndSend("nameOfSpecialQueue", queue.getName());
                  }
          
              }
          Consumer of SpecialQueue:
          Code:
              private static final Object lock = new Object();
          
              @Autowired
              private SimpleMessageListenerContainer messageListenerContainer;
          
              @Override
              public void handleNewTemplateQueue(String queueName) {
                  synchronized (lock) {
                      String[] consumedQueueNames = messageListenerContainer.getQueueNames();
          
                      if (!Arrays.asList(consumedQueueNames).contains(queueName)) {
                          if (logger.isInfoEnabled()) {
                              logger.info("Consuming new queue: [{}]", queueName);
                          }
                          messageListenerContainer.addQueueNames(queueName);
                      } else {
                          if (logger.isInfoEnabled()) {
                              logger.info("Already consumed queue: [{}]", queueName);
                          }
                      }
                  }
              }

          Comment

          Working...
          X