
  • Help with AMQP Asynchronous Consumer

    I am trying to build a simple proof of concept (POC) that asynchronously retrieves messages from RabbitMQ as a third-party provider posts them. I have written a simple provider that publishes these messages to RabbitMQ. Now I am trying to write the consumer and am having some difficulty.

    Problem: The test application runs fine inside Eclipse. But when packaged and deployed in Tomcat, the JsonToObjectTransformer fails with a Jackson UnrecognizedPropertyException.

    I am providing the full configuration and logs below. Any pointers or help will be greatly appreciated.

    Order – simple POJO

    Code:
    public class Order implements Serializable {
    		
    	private int orderId;
    
    	// getter setter…


    OrderGateway:

    Code:
    public interface OrderGateway {
    	
    	public Future<Order> receiveOrder();
    
    }

    order-consumer-si-context.xml:

    Code:
    <beans>
    	<context:spring-configured />
    
    	<context:component-scan base-package="com.company.sample.amqp.order.incoming"
    		annotation-config="true" />
    
    	<int-amqp:inbound-channel-adapter id="inboundOrder"
    		channel="fromAmqpJson" connection-factory="connectionFactory"
    		queue-names="order.unprocessed" acknowledge-mode="AUTO"
    		concurrent-consumers="2" task-executor="executor" error-handler="errorHandler" />
    
    	<bean name="errorHandler"
    		class="com.company.sample.amqp.order.incoming.LoggingErrorHandler" />
    
    	<bean name="jsonMessageConverter"
    		class="org.springframework.amqp.support.converter.JsonMessageConverter" />
    
    
    	<task:executor id="executor" keep-alive="3" pool-size="5-10"
    		queue-capacity="10" rejection-policy="ABORT" />
    
    
    	<int:channel id="fromAmqpJson">
    		<int:interceptors>
    			<int:wire-tap channel="logChannel" />
    		</int:interceptors>
    	</int:channel>
    
    	<int:json-to-object-transformer id="jsonToOrder"
    		input-channel="fromAmqpJson" output-channel="fromAmqp"
    		type="com.company.sample.amqp.order.Order" />
    
    	<int:channel id="fromAmqp">
    		<int:queue />
    		<int:interceptors>
    			<int:wire-tap channel="logChannel" />
    		</int:interceptors>
    	</int:channel>
    
    	<int:logging-channel-adapter id="logChannel"
    		level="INFO" log-full-message="true"></int:logging-channel-adapter>
    
    	<int:gateway id="orderGateway"
    		service-interface="com.company.sample.amqp.order.incoming.OrderGateway"
    		default-reply-channel="fromAmqp" default-request-channel="requestChannel">
    	</int:gateway>
    
    	<int:channel id="requestChannel">
    		<int:interceptors>
    			<int:wire-tap channel="logChannel" />
    		</int:interceptors>
    	</int:channel>
    
    	<!-- RABBIT MQ configuration -->
    	<rabbit:connection-factory id="connectionFactory"
    		host="localhost" username="guest" password="guest" />
    
    	<rabbit:template id="rabbitTemplate"
    		connection-factory="connectionFactory" queue="order.unprocessed"
    		exchange="customer.order.unprocessed" routing-key="order.unprocessed" />
    
    	<rabbit:topic-exchange name="customer.order.unprocessed"
    		auto-delete="false" durable="true">
    		<rabbit:bindings>
    			<rabbit:binding queue="order.unprocessed" pattern="order.*" />
    		</rabbit:bindings>
    	</rabbit:topic-exchange>
    
    	<rabbit:queue name="order.unprocessed" auto-delete="false"
    		durable="true" />
    
    	<!-- Spring AMQP Admin -->
    
    	<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"
    		auto-startup="true" />
    
    
    
    
    </beans>

    The test application works fine when I run it. Here is the test application:

    Code:
    public class OrderConsumerSIApp {
    
    	private Logger log = Logger.getLogger(getClass());
    
    	private OrderGateway orderGateway;
    
    	public void consumeOrders() throws InterruptedException,
    			ExecutionException, TimeoutException {
    
    		ApplicationContext applicationContext = new ClassPathXmlApplicationContext(
    				"order-consumer-si-context.xml");
    
    		orderGateway = applicationContext.getBean("orderGateway",
    				OrderGateway.class);
    
    		Thread newThrd = new Thread();
    		newThrd.start();
    		do {
    
    			try {
    				Future<Order> order = orderGateway.receiveOrder();
    				
    				handleOrderFuture(order);
    				
    				Thread.sleep(1000);
    			} catch (AmqpException exc) {
    				log.info("Exception : " + exc.getMessage());
    			}
    		} while (true);
    
    	}
    
    	private void handleOrderFuture(Future<Order> order) {
    		try {
    			log.info(order.get(10, TimeUnit.SECONDS ).getOrderId());
    		} catch (Exception e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		} 
    	}
    
    	public static void main(String... args) {
    		OrderConsumerSIApp orderConsumer = new OrderConsumerSIApp();
    		try {
    			orderConsumer.consumeOrders();
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }

    This is the output when I execute the App:

    Code:
    INFO: [Payload={"orderId":1864381373,"billingApartment":null,"billingCity":null,"billingCountry":null,"billingState":null,"billingStreetAddress":null,"billingZip":null,"customerEmail":null,"customerFullName":null,"orderStatus":null,"orderTimestamp":null,"orderTotal":0.0,"shippingAddressSameFlag":0,"shippingApartment":null,"shippingCountry":null,"shippingState":null,"shippingStreetAddress":null,"shippingZip":null,"orderItems":null,"payment":null}][Headers={timestamp=1328742495038, id=3b5d26fa-7532-4af2-b4ba-67cc53c77fa5, amqp_receivedRoutingKey=order.unprocessed, amqp_deliveryMode=PERSISTENT, amqp_contentType=text/plain, amqp_receivedExchange=customer.order.unprocessed, amqp_contentEncoding=UTF-8, amqp_redelivered=false, amqp_deliveryTag=1708}]
    Feb 8, 2012 6:08:15 PM org.springframework.integration.handler.LoggingHandler handleMessageInternal
    INFO: [Payload=com.company.sample.amqp.order.Order@1cf6f61][Headers={timestamp=1328742495038, id=72b771f8-765b-4eab-83d3-9325df2473d0, amqp_receivedRoutingKey=order.unprocessed, amqp_deliveryMode=PERSISTENT, amqp_contentType=text/plain, amqp_receivedExchange=customer.order.unprocessed, amqp_contentEncoding=UTF-8, amqp_deliveryTag=1708, amqp_redelivered=false}]
    18:08:15,688  INFO main order.OrderConsumerSIApp:59 - 532448680
    18:08:16,670  INFO main order.OrderConsumerSIApp:59 - 495375727
    18:08:24,698  INFO main order.OrderConsumerSIApp:59 - 1073209887
    However, when I deploy it in Tomcat, the JSON-to-object transformation fails, and this is the root cause of the error:
    Code:
    Caused by: org.codehaus.jackson.map.exc.UnrecognizedPropertyException: Unrecognized field "orderId" (Class com.company.sample.amqp.order.Order), not marked as ignorable
    Last edited by tanmoy.banerjee; Feb 9th, 2012, 08:24 AM. Reason: mis typing
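
    A side note on the test application above: handleOrderFuture catches every Exception the same way, so a timeout looks the same in the log as a failed transformation. Below is a minimal sketch of separating the outcomes. It uses only the JDK, with a CompletableFuture standing in for the gateway's Future<Order>. The names FutureOutcome and describe are hypothetical and not part of the original application.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureOutcome {

    /** Waits for the future and reports which of the possible outcomes occurred. */
    static String describe(Future<Integer> future, long timeoutMillis) {
        try {
            return "received " + future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Nothing arrived within the timeout; usually safe to poll again.
            return "timed out";
        } catch (ExecutionException e) {
            // The producing side failed; the cause carries the real error.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can react to it.
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-ins for what a gateway call might return.
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("transformation failed"));

        System.out.println(describe(CompletableFuture.completedFuture(42), 100));
        System.out.println(describe(new CompletableFuture<Integer>(), 50));
        System.out.println(describe(failed, 100));
    }
}
```

    Reporting the cause of an ExecutionException separately keeps a transformation error from being mistaken for an empty queue.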

  • #2
    Attaching the complete error log:


    Code:
    INFO: Initializing ExecutorService  'taskScheduler'
    Feb 8, 2012 4:51:23 PM org.springframework.integration.endpoint.AbstractEndpoint start
    INFO: started orderGateway
    Feb 8, 2012 4:51:23 PM org.springframework.integration.endpoint.AbstractEndpoint start
    INFO: started orderGateway
    Feb 8, 2012 4:51:24 PM org.springframework.scheduling.concurrent.ExecutorConfigurationSupport initialize
    INFO: Initializing ExecutorService
    Feb 8, 2012 4:51:24 PM org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer validateConfiguration
    WARNING: CachingConnectionFactory's channelCacheSize can not be less than the number of concurrentConsumers so it was reset to match: 2
    Feb 8, 2012 4:51:25 PM org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup start
    INFO: Starting beans in phase -2147483648
    Feb 8, 2012 4:51:25 PM org.springframework.integration.endpoint.EventDrivenConsumer logComponentSubscriptionEvent
    INFO: Adding {:jsonToOrder} as a subscriber to the 'fromAmqpJson' channel
    Feb 8, 2012 4:51:25 PM org.springframework.integration.channel.AbstractSubscribableChannel subscribe
    INFO: Channel 'fromAmqpJson' has 1 subscriber(s).
    Feb 8, 2012 4:51:25 PM org.springframework.integration.endpoint.AbstractEndpoint start
    INFO: started jsonToOrder
    Feb 8, 2012 4:51:25 PM org.springframework.integration.endpoint.EventDrivenConsumer logComponentSubscriptionEvent
    INFO: Adding {logging-channel-adapter:logChannel.adapter} as a subscriber to the 'logChannel' channel
    Feb 8, 2012 4:51:25 PM org.springframework.integration.channel.AbstractSubscribableChannel subscribe
    INFO: Channel 'logChannel' has 1 subscriber(s).
    Feb 8, 2012 4:51:25 PM org.springframework.integration.endpoint.AbstractEndpoint start
    INFO: started logChannel.adapter
    Feb 8, 2012 4:51:25 PM org.springframework.integration.endpoint.EventDrivenConsumer logComponentSubscriptionEvent
    INFO: Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
    Feb 8, 2012 4:51:25 PM org.springframework.integration.channel.AbstractSubscribableChannel subscribe
    INFO: Channel 'errorChannel' has 1 subscriber(s).
    Feb 8, 2012 4:51:25 PM org.springframework.integration.endpoint.AbstractEndpoint start
    INFO: started _org.springframework.integration.errorLogger
    Feb 8, 2012 4:51:25 PM org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup start
    INFO: Starting beans in phase 0
    Feb 8, 2012 4:51:25 PM org.springframework.integration.endpoint.AbstractEndpoint start
    INFO: started inboundOrder
    Feb 8, 2012 4:51:25 PM org.springframework.web.context.ContextLoader initWebApplicationContext
    INFO: Root WebApplicationContext: initialization completed in 3001 ms
    Feb 8, 2012 4:51:25 PM org.apache.catalina.startup.HostConfig deployDirectory
    INFO: Deploying web application directory docs
    16:51:25,821 ERROR executor-1 incoming.LoggingErrorHandler:25 - *****ERROR*****
    org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
            at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:590)
            at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:529)
            at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:472)
            at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:56)
            at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:103)
            at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:560)
            at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:452)
            at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:436)
            at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:420)
            at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$200(SimpleMessageListenerContainer.java:56)
            at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:505)
            at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
            at java.lang.Thread.run(Thread.java:619)
    Caused by: org.springframework.integration.transformer.MessageTransformationException: failed to transform message
            at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44)
            at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:67)
            at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:97)
            at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
            at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:114)
            at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:101)
            at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
            at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
            at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
            at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
            at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
            at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:92)
            at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$200(AmqpInboundChannelAdapter.java:39)
            at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$1.onMessage(AmqpInboundChannelAdapter.java:73)
            at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:527)
            ... 12 more
    Caused by: org.codehaus.jackson.map.exc.UnrecognizedPropertyException: Unrecognized field "orderId" (Class com.company.sample.amqp.order.Order), not marked as ignorable
     at [Source: java.io.StringReader@51f88fbd; line: 1, column: 22] (through reference chain: com.company.sample.amqp.order.Order["orderId"])
            at org.codehaus.jackson.map.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:53)
            at org.codehaus.jackson.map.deser.StdDeserializationContext.unknownFieldException(StdDeserializationContext.java:246)
            at org.codehaus.jackson.map.deser.StdDeserializer.reportUnknownProperty(StdDeserializer.java:604)
            at org.codehaus.jackson.map.deser.StdDeserializer.handleUnknownProperty(StdDeserializer.java:590)
            at org.codehaus.jackson.map.deser.BeanDeserializer.handleUnknownProperty(BeanDeserializer.java:689)
            at org.codehaus.jackson.map.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:514)
            at org.codehaus.jackson.map.deser.BeanDeserializer.deserialize(BeanDeserializer.java:350)
            at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2395)
            at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1595)
            at org.springframework.integration.json.JsonToObjectTransformer.transformPayload(JsonToObjectTransformer.java:49)
            at org.springframework.integration.json.JsonToObjectTransformer.transformPayload(JsonToObjectTransformer.java:30)
            at org.springframework.integration.transformer.AbstractPayloadTransformer.doTransform(AbstractPayloadTransformer.java:33)
            at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:33)
            ... 26 more



    • #3
      I don't know why Tomcat makes a difference. It looks like your JSON transformation targets a different class when there is an error: it is trying to create an instance of com.company.sample.amqp.order.Order, whereas when it succeeds we see com.company.sample.sample.order.Order. Do you have some old classes or JARs on the webapp's classpath?
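
      One way to check for stale classes is to log where a class was actually loaded from in each environment. The sketch below uses only the JDK. ClassOrigin and describeOrigin are hypothetical names, and inside the web application you would pass Order.class instead of the placeholder classes used here.

```java
import java.net.URL;
import java.security.CodeSource;

public class ClassOrigin {

    /** Describes the location and class loader a class came from, where the JVM exposes them. */
    static String describeOrigin(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        URL location = (source == null) ? null : source.getLocation();
        ClassLoader loader = type.getClassLoader();
        return type.getName()
                + " loaded from " + (location == null ? "<unknown/bootstrap>" : location.toString())
                + " by " + (loader == null ? "<bootstrap loader>" : loader.getClass().getName());
    }

    public static void main(String[] args) {
        // Placeholder classes; in the webapp, log describeOrigin(Order.class) instead.
        System.out.println(describeOrigin(ClassOrigin.class));
        System.out.println(describeOrigin(String.class));
    }
}
```

      Comparing this output between the Eclipse run and the Tomcat deployment would show whether both pick up the same copy of the class.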



      • #4
        cross check

        Hi Dave,

        No, the Order classes are the same. The mismatch came from a bad find-and-replace I did to remove company names and the like.



        • #5
          Update to this - temporary resolution

          In my first post, I mentioned that Order has an orderId field of type int rather than Integer. We did this deliberately because we wanted a primitive int. However, this is what led to the issue I was getting in Tomcat.

          The JSON transformer (actually Jackson) was failing to convert the JSON back to an int. Once I changed the id from int to Integer, everything worked fine in Tomcat.

          However, I still wonder why it worked in the test application in Eclipse but failed in Tomcat. If deserializing into the primitive type int is a limitation, it should have failed in the test application too.
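
          To compare what the two environments actually see, it can help to list the setter-based properties of the class that was loaded. The sketch below is a rough JDK-only approximation and not Jackson's own detection logic. The three stand-in classes and the setterProperties helper are hypothetical, not the real Order class.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

public class SetterProperties {

    /** Hypothetical stand-in: primitive id with accessors. */
    public static class PrimitiveOrder {
        private int orderId;
        public int getOrderId() { return orderId; }
        public void setOrderId(int orderId) { this.orderId = orderId; }
    }

    /** Hypothetical stand-in: boxed id with accessors. */
    public static class BoxedOrder {
        private Integer orderId;
        public Integer getOrderId() { return orderId; }
        public void setOrderId(Integer orderId) { this.orderId = orderId; }
    }

    /** Hypothetical stand-in: private field only, no accessors. */
    public static class FieldOnlyOrder {
        private int orderId;
    }

    /** Maps property name to parameter type for each public, non-static setX(value) method. */
    static Map<String, String> setterProperties(Class<?> type) {
        Map<String, String> result = new TreeMap<>();
        for (Method m : type.getMethods()) {
            String name = m.getName();
            if (name.length() > 3 && name.startsWith("set")
                    && m.getParameterCount() == 1
                    && !Modifier.isStatic(m.getModifiers())) {
                String property = Character.toLowerCase(name.charAt(3)) + name.substring(4);
                // Overloaded setters would overwrite each other here; fine for a quick check.
                result.put(property, m.getParameterTypes()[0].getSimpleName());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(setterProperties(PrimitiveOrder.class)); // {orderId=int}
        System.out.println(setterProperties(BoxedOrder.class));     // {orderId=Integer}
        System.out.println(setterProperties(FieldOnlyOrder.class)); // {}
    }
}
```

          If the Order class deployed in Tomcat turned out to lack a setOrderId method, or to have one with an unexpected parameter type, that would point toward a different class version on Tomcat's classpath rather than a limitation of int itself. That is an assumption to verify, not a confirmed diagnosis.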
