
  • RabbitMQ and TopicExchange with Spring

    Hi,
    as the title says, I have a question about using a TopicExchange with Spring. My consumer and producer configurations are both based on the class ServiceAMQPConfiguration:

    Code:
    @Configuration
    public class ServiceAMQPConfiguration {
    	
    	protected final String vhost = "vhost";
    	protected final String vhostUser = "user";
    	protected final String vhostPassword = "passwd";
    	
    	private int port = 5672;
    	
    	protected final String queueName = "queue";
    	protected final String topicExchange = "queue.command";
    
    	@Bean
    	public ConnectionFactory connectionFactory() {
    		CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
    		connectionFactory.setVirtualHost(vhost);
    		connectionFactory.setPort(port);
    		connectionFactory.setUsername(vhostUser);
    		connectionFactory.setPassword(vhostPassword);
    		return connectionFactory;
    	}
    	
    	@Bean
    	public AmqpAdmin amqpAdmin() {
    		return new RabbitAdmin(connectionFactory());
    	}
    
    	@Bean
    	public RabbitTemplate rabbitTemplate() {
    		RabbitTemplate template = new RabbitTemplate(connectionFactory());
    		template.setQueue(this.queueName);
    		template.setExchange(topicExchange);
    		template.setRoutingKey(this.queueName);
    		return template;
    	}
    
    	@Bean
    	public Queue serviceQueue() {
    		Queue queue = new Queue(this.queueName);
    		return queue;
    	}		
    	
    }
    My consumer configuration looks as follows:

    Code:
    @Configuration
    public class ConsumerConfiguration extends ServiceAMQPConfiguration {
    	
    		
    	@Bean
    	public SimpleMessageListenerContainer listenerContainer() {
    		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    		
    		container.setConnectionFactory(connectionFactory());
    		container.setQueueNames(this.queueName);
    		container.setMessageListener(messageListenerAdapter());
    		
    		return container;
    	}
    	
    	@Bean 
    	public MessageListenerAdapter messageListenerAdapter() {
    		return new MessageListenerAdapter(new ServiceBroker());		
    	}
    
    	
    }
    My producer configuration looks like this:

    Code:
    	@Bean
    	public ScheduledProducer scheduledProducer() {
    		return new ScheduledProducer();
    	}
    
    	@Bean
    	public BeanPostProcessor postProcessor() {
    		return new ScheduledAnnotationBeanPostProcessor();
    	}
    
    
    	static class ScheduledProducer {
    
    		@Autowired
    		private volatile RabbitTemplate rabbitTemplate;
    		protected final String queueName = "queue";
    		protected final String topicExchange = "queue.command";
    
    		
    		@Scheduled(fixedRate = 3000)
    		public void sendMessage() {
    			rabbitTemplate.convertAndSend(topicExchange, queueName, "This is a test message");
    		}
    	}
    So my questions are the following:

    - No messages are actually sent. Why?
    - Do I have a basic misunderstanding regarding the setup?

    Could someone give me a hint, based on this code structure?

  • #2
    Do you have a binding for that Queue to that TopicExchange already configured on the broker? If not, I think that's your missing piece. Also, if you are just mapping the literal name of the Queue (i.e. no patterns), then you might be okay with a DirectExchange. Is there a specific reason you are using a TopicExchange?
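
To illustrate why a missing binding makes messages vanish, here is a toy in-memory model. It is not the RabbitMQ or Spring AMQP API; `ToyDirectExchange` and its methods are hypothetical names invented for this sketch. It only shows that a direct exchange delivers to queues bound with exactly the routing key, and drops the message when no such binding exists.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a direct exchange (hypothetical, for illustration only). */
class ToyDirectExchange {

    // binding key -> queues bound with that key (a queue is just a list of messages here)
    private final Map<String, List<List<String>>> bindings = new HashMap<>();

    /** Binds a queue to this exchange under the given binding key. */
    void bind(List<String> queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    /**
     * Delivers the message to every queue bound with exactly this routing key.
     * Returns the number of queues reached; 0 means the message was dropped.
     */
    int publish(String routingKey, String message) {
        List<List<String>> targets = bindings.getOrDefault(routingKey, new ArrayList<>());
        for (List<String> queue : targets) {
            queue.add(message);
        }
        return targets.size();
    }
}
```

In this model, publishing with the routing key "queue" before any call to `bind` reaches zero queues, which corresponds to the "no messages" symptom in the first post.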

    • #3
      Hi Mark,
      thanks for your quick reply. No, at the moment I don't have any binding. I need to put the Binding into the consumer, right? In future, queue.command should become queue.command.*. How do I define the Binding properly and wire it into my code using Spring?

      • #4
        Check out this Reference Manual section: http://static.springsource.org/sprin...-configuration

        That shows both XML namespace and @Bean configuration options with the latter using BindingBuilder.
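
A minimal sketch of the @Bean style with BindingBuilder, using the queue and exchange names from the first post and the pattern mentioned in post #3. The class name `CommandBindingConfiguration` and the bean method names are hypothetical. The sketch assumes a RabbitAdmin bean (such as amqpAdmin() from the first post) lives in the same application context, so that the queue, exchange, and binding beans get declared on the broker.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CommandBindingConfiguration { // hypothetical class name

    // The named queue the listener container consumes from
    @Bean
    public Queue serviceQueue() {
        return new Queue("queue");
    }

    // The topic exchange the producer publishes to
    @Bean
    public TopicExchange commandExchange() {
        return new TopicExchange("queue.command");
    }

    // Binds the named queue to the exchange; '*' stands for exactly one word
    @Bean
    public Binding commandBinding() {
        return BindingBuilder.bind(serviceQueue())
                .to(commandExchange())
                .with("queue.command.*");
    }
}
```

With a binding like this, the producer would have to send with a routing key that fits the pattern (three dot-separated words starting with queue.command), not with the bare queue name.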

        Hope that helps.
        -Mark

        • #5
          Hi Mark,
          okay, I changed my producer and consumer configuration as follows:

          Consumer:
          Code:
          	@Value("${service.command.pattern}")
          	private String commandRoutingKey;
          		
          	@Bean
          	public SimpleMessageListenerContainer listenerContainer() {
          		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
          		
          		container.setConnectionFactory(connectionFactory());
          		container.setQueueNames(this.queueName);
          		container.setMessageListener(messageListenerAdapter());
          		
          		return container;
          	}
          	
          	@Bean 
          	public MessageListenerAdapter messageListenerAdapter() {
          		return new MessageListenerAdapter(new ServiceBroker());		
          	}
          	
          	@Bean
              public Queue commandQueue() {
                  return amqpAdmin().declareQueue();
              }
          
              @Bean
              public Binding marketDataBinding() {
                  return BindingBuilder.bind(
                          commandQueue()).to(commandExchange()).with(commandRoutingKey);
              }
          Producer:
          Code:
          @Bean
          	public ScheduledProducer scheduledProducer() {
          		return new ScheduledProducer();
          	}
          
          	@Bean
          	public BeanPostProcessor postProcessor() {
          		return new ScheduledAnnotationBeanPostProcessor();
          	}
          
          
          	static class ScheduledProducer {
          
          		@Autowired
          		private volatile RabbitTemplate rabbitTemplate;
          		protected final String queueName = "service";
          		protected final String topicExchange = "service.command";
          
          		
          		@Scheduled(fixedRate = 3000)
          		public void sendMessage() {
          			rabbitTemplate.convertAndSend(topicExchange, queueName, "This is a test message");
          		}
          	}
          But still no messages... :-(

          AbstractConfiguration looks like this:

          Code:
          protected final String vhost = "";
          	protected final String vhostUser = "guest";
          	protected final String vhostPassword = "guest";
          	
          	private int port = 5672;
          	
          	protected final String queueName = "service";
          	protected final String topicExchange = "service.command";
          
          	@Bean
          	public ConnectionFactory connectionFactory() {
          		CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
          		//connectionFactory.setVirtualHost(vhost);
          		connectionFactory.setPort(port);
          		connectionFactory.setUsername(vhostUser);
          		connectionFactory.setPassword(vhostPassword);
          		return connectionFactory;
          	}
          	
          	@Bean
          	public AmqpAdmin amqpAdmin() {
          		return new RabbitAdmin(connectionFactory());
          	}
          	
          
          	@Bean
          	public RabbitTemplate rabbitTemplate() {
          		RabbitTemplate template = new RabbitTemplate(connectionFactory());
          		template.setQueue(this.queueName);
          		template.setExchange(this.topicExchange);
          		template.setRoutingKey(this.queueName);
          		return template;
          	}
          
          	@Bean
          	public Queue serviceQueue() {
          		Queue queue = new Queue(this.queueName);
          		return queue;
          	}	
          	
          	@Bean
          	public TopicExchange commandExchange() {
          		return new TopicExchange(topicExchange, true, false);
          	}
          Could you help me out with that? I don't know what I am actually doing wrong... :-(

          Thanks Johannes

          • #6
            Are you sure that ${service.command.pattern} is being replaced with something that matches the routing key (log it in your config method maybe)? Are you sure you are actually sending messages (e.g. put a log statement in your producer)?

            • #7
              Hi Dave,
              yes, I think so in both cases.

              Question one: ${service.command.pattern} is loaded from the config file. To prove it, I replaced it in the producer with a hardcoded value:
              private String commandRoutingKey = "service.command.*";

              I also changed the producer here:
              Code:
              static class ScheduledProducer {
              
              		@Autowired
              		private volatile RabbitTemplate rabbitTemplate;
              		protected final String queueName = "service";
              		protected final String topicExchange = "service.command.POWER";
              
              		
              		@Scheduled(fixedRate = 3000)
              		public void sendMessage() {
              			rabbitTemplate.convertAndSend(topicExchange, queueName, "This is a test message");
              		}
              	}
              The important line: protected final String topicExchange = "service.command.POWER";

              When I set a breakpoint at rabbitTemplate.convertAndSend(...); I can see the method being executed. I am totally out of ideas. :-(

              • #8
                I think you are almost there, but your bindings and routing keys do not match. It's hard to keep track because we are changing things in every post, and I don't think I've seen a complete end-to-end example yet.

                Correct me if I'm wrong, but you have:

                Code:
                topic exchange name = service.command.POWER (changed from last post)
                queue name = <auto-generated and irrelevant>
                binding key (between the two): service.command.*
                routing key (confusingly called "queueName" in the code) = service
                Then you send a message to the exchange with the routing key service (queueName). This routing key does not match the binding, so the message is discarded. Possibly you meant to use the routing key service.command.POWER? The exchange name is irrelevant as long as the binding and the producer use the same value, just as the queue name is irrelevant as long as the consumer and binding agree.
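
The mismatch between binding key and routing key can be checked with a small stand-alone sketch of topic matching. It assumes the standard AMQP topic rules: words are separated by dots, `*` matches exactly one word, and `#` matches zero or more words. `TopicMatcher` is a hypothetical helper written for illustration; it is not part of Spring AMQP or the RabbitMQ client.

```java
/** Hypothetical helper that mimics topic-exchange matching (illustration only). */
class TopicMatcher {

    /** Returns true if the routing key fits the binding key under the topic rules. */
    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    // Compares pattern words p[i..] against routing-key words w[j..]
    private static boolean matches(String[] p, int i, String[] w, int j) {
        if (i == p.length) {
            return j == w.length; // pattern used up: match only if the key is used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split
            for (int k = j; k <= w.length; k++) {
                if (matches(p, i + 1, w, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == w.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(w[j])) {
            return matches(p, i + 1, w, j + 1); // '*' or a literal word consumes one word
        }
        return false;
    }

    public static void main(String[] args) {
        String bindingKey = "service.command.*";
        for (String routingKey : new String[] {"service", "service.command.POWER"}) {
            System.out.println(routingKey + " -> " + matches(bindingKey, routingKey));
        }
    }
}
```

With the binding key service.command.*, `matches` returns false for the routing key service and true for service.command.POWER, which is the situation described in this post.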

                • #9
                  Hi,
                  okay, I played around a little and followed the more complex code sample provided in the Spring & RabbitMQ downloads. My queue configuration looks like this:

                  Code:
                  // In the AbstractConfiguration:
                  protected final String QUEUE_NAME = "service";
                  protected final String ROUTING_KEY_NAME = QUEUE_NAME;
                  protected final String EXCHANGE_NAME = "service.command";
                  
                  // In the ProducerConfiguration I added:
                  private String ROUTING_KEY = "service.command.*";
                  Running list_bindings on the RabbitMQ server results in:

                  Code:
                  Listing bindings ...
                          exchange        650dfac8-c928-4f14-9f0b-e475822babdf    queue   650dfac8-c928-4f14-9f0b-e475822babdf    []
                          exchange        service queue   service []
                  service.command exchange        650dfac8-c928-4f14-9f0b-e475822babdf    queue   service.command.*   []
                  ...done.
                  This is not what I expected, any idea? Please help me, it is driving me nuts! :-(
                  Last edited by johanneshiemer; Oct 20th, 2011, 03:30 AM.

                  • #10
                    Okay, solved. Got it running.
