Announcement Announcement Module
Collapse
No announcement yet.
Receiving multiple messages in consumer Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Receiving multiple messages in consumer

    Hi,

    I am trying to make a sample app work with the rc2 jars. The only problem I got is that my consumer is showing multiple messages as received , like this:

    MsgConsumer:handleMessage
    Decrypted message is : Hello world
    MsgConsumer:handleMessage
    Decrypted message is : Hello world
    MsgConsumer:handleMessage
    Decrypted message is : Hello world
    MsgConsumer:handleMessage
    Decrypted message is : Hello world

    Sometime this message comes just once, but many a times mutiple, (2/3/4/5)

    But I am just sending a single message. What am I missing here ?

    Following is the code.

    Invoker.java
    ==========

    Code:
    	
    public static void main(String[] args) {
    		Invoker i = new Invoker();
    		i.declareExchangeAndQueue();
    		i.publishMsg();
    	}
    
    	private void declareExchangeAndQueue() {
    		ConfigurableApplicationContext rabbitMQAdminCntxt = new ClassPathXmlApplicationContext(CLASSPATH_RABBIT_MQ_ADMIN_CONFIG_XML);
    		AmqpAdmin amqpAdmin = (AmqpAdmin) rabbitMQAdminCntxt.getBean(AMQP_ADMIN_BEAN);
    		DirectExchange exchange = createExchange(amqpAdmin);
    		Queue queue = createQueue(amqpAdmin);
    		amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(""));
    	}
    
    	private void publishMsg() {
    		ConfigurableApplicationContext eventingCntxt = new ClassPathXmlApplicationContext(CLASSPATH_EVENTING_CONFIG_XML);
    		MsgPublisher messagePublisher = (MsgPublisher) eventingCntxt.getBean(MSG_PUBLISHER_BEAN);
    		messagePublisher.publishMsg(SAMPLE_EXCHANGE, HELLO_WORLD);		
    	}
    
    	private Queue createQueue(AmqpAdmin amqpAdmin) {
    		Queue queue = new Queue(SAMPLE_QUEUE, true);
    		amqpAdmin.declareQueue(queue);
    		return queue;
    	}
    
    	private DirectExchange createExchange(AmqpAdmin amqpAdmin) {
    		DirectExchange exchange = new DirectExchange(SAMPLE_EXCHANGE, true, false);
    		amqpAdmin.declareExchange(exchange);
    		return exchange;
    	}
    CLASSPATH_RABBIT_MQ_ADMIN_CONFIG_XML
    ======================================
    Code:
    	
    	<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory" scope="singleton">
    		<constructor-arg value="localhost" />
    		<property name="username" value="guest" />
    		<property name="password" value="guest" />
    	</bean>
    	
    	<!-- Configure the admin class -->
    	<bean id="amqpAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
    		<constructor-arg ref="connectionFactory" />
    	</bean>

    CLASSPATH_EVENTING_CONFIG_XML
    ================================
    Code:
    	
    	<import resource="classpath:config/rabbitMQAdminConfig.xml"/>
    	
    	<!-- Publisher settings starts -->
    	<!-- Configure spring AMQP Template for publisher -->
    	<bean id="amqpPublisherTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate" p:connectionFactory-ref="connectionFactory" />
    
    	<!-- Message Publisher settings -->
    	<bean id="msgPublisher" class="com.eventing.publisher.MsgPublisher">
    		<property name="amqpPublisherTemplate" ref="amqpPublisherTemplate" />
    	</bean>
    	
    	
    	<!-- Consumers' settings starts -->
    	<!-- Message handler settings -->
    	<bean id="msgConsumer" class="com.eventing.subscriber.handlers.MsgConsumer" />
    	
    	<!-- Message handler Adaptor settings -->
    	<bean id="msgConsumerAdaptor" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
    		<constructor-arg ref="msgConsumer" />
    	</bean>
    
    	<!-- Message Listener settings -->
    	<bean id="msgListener" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"
    		p:connectionFactory-ref="connectionFactory" p:queueNames="sampleQueue" p:acknowledgeMode="AUTO" p:messageListener-ref="msgConsumerAdaptor" />
    MsgPublisher.java
    ================
    Code:
    	private AmqpTemplate amqpPublisherTemplate;
    	
    	public void publishMsg(String exchange, String xmlMessage) {
    		String encMsg = Base64Util.encode(xmlMessage);
    		getAmqpPublisherTemplate().convertAndSend(exchange,null,encMsg);
    	}
    
    	public AmqpTemplate getAmqpPublisherTemplate() {
    		return amqpPublisherTemplate;
    	}
    	
    	public void setAmqpPublisherTemplate(AmqpTemplate amqpPublisherTemplate) {
    		this.amqpPublisherTemplate = amqpPublisherTemplate;
    	}
    Code:
    MsgConsumer.java
    ================
    	// All the variables are method-local except subscriptionsDao,noOfEmailWorkerThreads and applicationContext 
    	// (they are not being modified in this method, just being read)
    	public void handleMessage(String msg) {
    		String decMsg = Base64Util.decode(msg);
    		System.out.println("MsgConsumer:handleMessage \nDecrypted message is : " + decMsg);
    	}

    Following is the bean loading :
    ========================

    Code:
    31 Jul, 2011 3:28:07 PM org.springframework.context.support.AbstractApplicationContext prepareRefresh
    INFO: Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@4def8cf3: startup date [Sun Jul 31 15:28:07 IST 2011]; root of context hierarchy
    31 Jul, 2011 3:28:07 PM org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions
    INFO: Loading XML bean definitions from class path resource [config/rabbitMQAdminConfig.xml]
    31 Jul, 2011 3:28:07 PM org.springframework.beans.factory.support.DefaultListableBeanFactory preInstantiateSingletons
    INFO: Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@19632847: defining beans [connectionFactory,amqpAdmin]; root of factory hierarchy
    31 Jul, 2011 3:28:07 PM org.springframework.context.support.AbstractApplicationContext prepareRefresh
    INFO: Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@39ed1b0b: startup date [Sun Jul 31 15:28:07 IST 2011]; root of context hierarchy
    31 Jul, 2011 3:28:07 PM org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions
    INFO: Loading XML bean definitions from class path resource [config/eventingConfig.xml]
    31 Jul, 2011 3:28:07 PM org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions
    INFO: Loading XML bean definitions from class path resource [config/rabbitMQAdminConfig.xml]
    31 Jul, 2011 3:28:07 PM org.springframework.beans.factory.support.DefaultListableBeanFactory preInstantiateSingletons
    INFO: Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@5d7b6643: defining beans [connectionFactory,amqpAdmin,amqpPublisherTemplate,msgPublisher,msgConsumer,msgConsumerAdaptor,msgListener]; root of factory hierarchy
    31 Jul, 2011 3:28:07 PM org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup start
    INFO: Starting beans in phase 2147483647
    MsgConsumer:handleMessage 
    Decrypted message is : Hello world
    MsgConsumer:handleMessage 
    Decrypted message is : Hello world

  • #2
    The main problem I see is that your client is short lived, and yet it makes no attempt to wait until the message is fully consumed before stopping. It also doesn't close the ApplicationContext, so the shutdown behaviour is essentially undefined. Put those facts together and I would guess that the acks sometimes don't get sent from the consumer, so the broker redelivers messages from the last n runs next time you run your app. You can verify that by looking at the queue using rabbitmqctl or one of the admin tools. You probably should make an attempt to reset the broker to a known state (e.g. by deleting the queue) before you start anything, if you really want to see the effect only of your single message producer.

    Comment


    • #3
      Thanks

      Thanks for your suggestion. By deleting the exchange and the queue as the 1st step, I get only a single message on the consumer side now .
      Can you please share your thoughts on :
      1. I am wondering that how big the consumer's processing should be, so that it can't be called as "short-lived".
      2. Since producer and consumer are not coupled, I think that consumer should not be closing down the app context started by the producer. Should I have separate app-contexts for Producer and consumer?

      Regards,
      Kshitiz
      Last edited by gargkshitiz; Aug 1st, 2011, 06:21 AM.

      Comment


      • #4
        1. It wasn't the work that the consumer does that led me to say your client is short lived - it's because it is a main() method with no obvious mechanism to stop it from exiting as soon as you have sent the message. Sometimes you might have a non-daemon thread in the ApplicationContext that would keep the JVM alive (in this case you don't because you use the default task executor settings), but even then you need to close the context to shut it down gracefully.

        2. It's up to you, and it really depends on the application. If you are using messaging, the chances are that you need multiple physical deployments, but whether they have different ApplicationContexts really depends on the business problem. Typically they would if they have different business concerns.

        Comment


        • #5
          Thanks Dave, I understood it now

          Comment

          Working...
          X