How to configure consumers with spring xml-namespace

  • How to configure consumers with spring xml-namespace

    Hi all,

    I am new to Spring AMQP and am playing around with RabbitMQ. I just studied some examples from a talk by Rob (http://www.infoq.com/presentations/M...-with-RabbitMQ), given in France this year (I guess), where he introduces the stock price example being consumed by different clients and languages. Inspired by that example, I wanted to implement a little web application where I can subscribe to news channels and receive news as soon as a news producer publishes to that channel.

    So I have a little MVC controller where I can subscribe to three different news channels. After subscribing, the application switches to a screen that uses long polling to check whether there is news in the channel I just subscribed to. On the publisher side there is a little controller where I can create a news item and assign it to one or more channels.

    So far so good. With only one subscriber this works fine. But if a second subscriber subscribes to the same channel, the messages are distributed round-robin between the subscribers instead of each subscriber receiving every message. In the stock price example Rob fixed this by changing the implementation of the consumer (subscriber) part, using the low-level API of the RabbitMQ client. My question now is: how can I configure Rabbit correctly to achieve the goal I described above? I've tried different things on the publisher side with different exchange types etc. Actually I think I can go with the topic exchange type, where I have one topic exchange "news.topic" and multiple bindings to different queues.

    Here is my configuration:
    Code:
    	<rabbit:connection-factory id="connectionFactory" username="guest" password="guest" host="localhost"/>
    	
    	<rabbit:template id="news.publish.rabbitTemplate" exchange="news.topic" connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>
    	
    	<rabbit:admin connection-factory="connectionFactory" />
    
     	<rabbit:topic-exchange name="news.topic">
    		<rabbit:bindings>
    			<rabbit:binding queue="news.sport" pattern="news.sport"/>
    			<rabbit:binding queue="news.economy" pattern="news.economy"/>
    			<rabbit:binding queue="news.international" pattern="news.international"/>
    		</rabbit:bindings>
    	</rabbit:topic-exchange>
    
    	<rabbit:queue name="news.sport" />
    	<rabbit:queue name="news.economy"/>
    	<rabbit:queue name="news.international" />
    I know a lot goes on under the hood when configuring with the namespace... so maybe I just missed some configuration properties.
    The consumer consumes the messages asynchronously with just this code:

    Code:
    ...
    	@Inject
    	@Named("connectionFactory")
    	private ConnectionFactory rabbitConnectionFactory;
    
    	@Override
    	public void subscribe(	String newsChannel,
    							MessageListener listener)
    	{
    		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    		container.setConnectionFactory(this.rabbitConnectionFactory);
    		container.setQueueNames(newsChannel);
    		container.setMessageListener(listener);
    		container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    		container.start();
    	}
    ...
    Can someone point me to the missing "magic"? :-)

    Thanks and cheers,
    Sandro

  • #2
    We just found the solution and wanted to share it... In this case the namespace config (we think) does not work, but you can do it manually yourself (or with Java config). The simple part was:

    Code:
    ...
    	@Inject
    	@Named("connectionFactory")
    	private ConnectionFactory rabbitConnectionFactory;
    
    	@Inject
    	private RabbitAdmin rabbitAdmin;
    
    	private Map<Integer, SimpleMessageListenerContainer> containers = new HashMap<Integer, SimpleMessageListenerContainer>();
    
    	@Override
    	public void subscribe(	String newsChannel,
    							MessageListener listener)
    	{
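    		// Each subscriber gets its own broker-named queue.
    		Queue q = new AnonymousQueue();
    		rabbitAdmin.declareQueue(q);
    		// Bind the private queue with the channel name as routing pattern.
    		// Note: "amq.topic" is RabbitMQ's built-in topic exchange; the publisher
    		// must send to the same exchange that is used here.
    		Binding binding = BindingBuilder.bind(q).to(new TopicExchange("amq.topic")).with(newsChannel);
    		rabbitAdmin.declareBinding(binding);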
    
    		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    		container.setConnectionFactory(rabbitConnectionFactory);
    		container.setMessageListener(listener);
    		container.setQueues(q);
    		container.start();
    
    		containers.put(listener.hashCode(), container);
    	}
    ...
    The key point here is to use the RabbitAdmin component to declare the new queue and its binding on Rabbit, so that every subscriber gets its own queue instead of sharing one...

    That's it... :-)
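
    For anyone wondering why a queue per subscriber helps: below is a rough plain-Java model of the two setups, not Spring AMQP code. It assumes the broker hands messages from a shared queue to its consumers in turn, and copies each message into every queue bound with a matching key. The class name QueueModelDemo, its methods and the subscriber names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: no broker involved, just the two delivery patterns.
public class QueueModelDemo {

    // One shared queue with several consumers:
    // each message goes to exactly one consumer, in turn (round-robin).
    public static Map<String, List<String>> sharedQueue(List<String> subscribers,
                                                        List<String> messages) {
        Map<String, List<String>> received = newInboxes(subscribers);
        if (subscribers.isEmpty()) {
            return received; // nobody is consuming, nothing to hand out
        }
        for (int i = 0; i < messages.size(); i++) {
            String next = subscribers.get(i % subscribers.size());
            received.get(next).add(messages.get(i));
        }
        return received;
    }

    // One private queue per subscriber, all bound with the same key:
    // the exchange copies every message into every bound queue.
    public static Map<String, List<String>> queuePerSubscriber(List<String> subscribers,
                                                               List<String> messages) {
        Map<String, List<String>> received = newInboxes(subscribers);
        for (String message : messages) {
            for (String subscriber : subscribers) {
                received.get(subscriber).add(message);
            }
        }
        return received;
    }

    // Creates an empty inbox per subscriber, keeping insertion order.
    private static Map<String, List<String>> newInboxes(List<String> subscribers) {
        Map<String, List<String>> inboxes = new LinkedHashMap<>();
        for (String subscriber : subscribers) {
            inboxes.put(subscriber, new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> subscribers = Arrays.asList("alice", "bob");
        List<String> news = Arrays.asList("n1", "n2", "n3", "n4");
        System.out.println("shared queue:         " + sharedQueue(subscribers, news));
        System.out.println("queue per subscriber: " + queuePerSubscriber(subscribers, news));
    }
}
```

    With two subscribers and four news items, the shared queue gives each subscriber only every second item, while the queue-per-subscriber variant gives both subscribers all four. That is the behaviour the AnonymousQueue plus binding above is meant to produce on the real broker.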
