Announcement Announcement Module
Collapse
No announcement yet.
error-channel on JMS adaptor Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • error-channel on JMS adaptor

    Hi Guys,

    I am using error-channel attribute on message driven endpoint. The flow is like

    MDEndPoint-->SA1-->SA2-->SI Channel-->PutMessageOnResponseQueue(Topic).

    If error occurs anywhere in the chain above then message goes to error-channel. The SA connected to error-channel constructs custom response and put on SI channel, which puts ultimately on responseQueue(Topic).

    Behavior when acknowledge attribute is set as below on the message adaptor
    1. if acknowledge= AUTO then original message on requestQueue gets deleted, which is what I want -- I mean the message should be removed from queue.
    1. if acknowledge= transacted/client then original message on requestQueue DO NOT gets deleted, which is what I don't want. As, I would not like to keep the failed message there on input request queue or would like to route the message to some ERROR queue.

    For all above cases, log entries are below.

    Code:
    [org.springframework.jms.listener.DefaultMessageListenerContainer#1-4][org.springframework.jms.listener.DefaultMessageListenerContainer] Execution of JMS message listener failed, and no ErrorHandler has been set. 
    org.springframework.integration.MessagingException: failed to send message
    	at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:193)
    Code:
    <jms:message-driven-channel-adapter id="jmsRequestIn" destination="requestQueue" channel="priorityMessageChannelIn" max-concurrent-consumers="5" 
    		acknowledge="transacted" connection-factory="jmsQueueConnectionFactory" extract-payload="true" header-mapper="customHeaderMapper" max-messages-per-task="5" error-channel="busErrorChannel"/>

    Please let me know what I need to set or I am missing so that I get the original message deleted/consumed from the requestQueue once error-channel activator has done the job. And, I do not want to use AUTO acknowledge mode as I want message to be removed after listener execution.

    Thanks
    Last edited by rock_star; Feb 16th, 2011, 01:17 PM.

  • #2
    Can you clarify what you mean by the two #1 items you've listed there?

    Thanks,
    Mark

    Comment


    • #3
      Mark, I edited the two points in my first message. please let me know if it is clear now. Sorry for the ambiguity.

      Comment


      • #4
        If you are specifying acknowledge=transacted, it should only rollback if an Exception is propagated. Otherwise, the transaction should commit and the message should be removed from the Queue at that point. If you are seeing different behavior, can you provide a bit more detail, perhaps some configuration excerpts?

        Thanks,
        Mark

        Comment


        • #5
          Hi Mark,

          Please see the config below. I am throwing an exception from messageUnMarshallingTransformer and that ultimately comes to
          Code:
          jms:message-driven-channel-adapter id="jmsPriorityQueueMessagesIn"
          ,
          which has error-channel defined and hence should have removed the message from queue. But does not seem to be the behavior.

          Please let me know if you need more details. Hope I am not missing something.

          Thanks


          Code:
          <!--Error channel -->
          <si:channel id="busErrorChannel">
             <si:interceptors>
          		<si:ref bean="channelMessageInterceptor"/>
          	</si:interceptors>
           </si:channel>
          
          <!-- SA for handling messaages arriving on Error channel -->
          <si:service-activator ref = "errorMessageHandler" method="messageHandler" 
          	input-channel="busErrorChannel" output-channel="responseOutputChannel"/>
          
          <!-- custom erro handler bean which transforms the message and sends respons back on the 
          responseOutputChannel -->
          <bean id="errorMessageHandler" class="com.abc.exception.ErrorMessageHandler"/>
          
          <!--
          		channel adaptor to send the response message on the  Topic
          -->
          <jms:outbound-channel-adapter id="jmsResponsOut" destination="responseTopic"
          	channel="responseOutputChannel" connection-factory="jmsTopicConnectionFactory">
          </jms:outbound-channel-adapter>
          	
          <!-- Incoming messsage request queue -->
          <jms:message-driven-channel-adapter id="jmsPriorityQueueMessagesIn" destination="priorityQueue" 
          channel="unmarshallRequestMessageChannel" max-concurrent-consumers="1" acknowledge="transacted" 
          connection-factory="jmsQueueConnectionFactory" extract-payload="true" header-mapper="headerMapper" 
          max-messages-per-task="5" error-channel="busErrorChannel"/>
           
           <!--  Unmarshaller definition for requests -->
          <si:transformer id="messageUnmarshaller" input-channel="unmarshallRequestMessageChannel" 
          output-channel="controllerInChannel" 	ref="messageUnMarshallingTransformer" ></si:transformer>

          Comment


          • #6
            If i understand correctly the errors should be delegated to ErrorMessageHandler which is subscribed to the 'busErrorChannel'. There the ErrorMessage is transformed to what?

            Comment


            • #7
              So, what i do on ErrorMessageHandler is:

              1) Extract the failed message using

              Code:
              MessagingException exception = (MessagingException) messageInput
              					.getPayload();
              			failedMessage= exception.getFailedMessage();
              Response response = ressponseBuilder.createSpbResponse(failedMessage, exceptionMessage);
              String responseMessage = (String)messageMarshallingTransformer.doTransform(response, messageTypeStr);
              return MessageBuilder.withPayload(responseMessage).setHeader("status", 
              response.getResponseStatus().toString()).
              copyHeadersIfAbsent(failedMessage.getHeaders()).build();
              2) And, then construct a custom String XML response and put on the responseOutputChannel, which has a outbound channel adaptor subscriber putting message on the Topic. I created a consumer/listener on Topic and I see the message reaching the response Topic but the request message remains on request queue.

              Comment


              • #8
                Well, i just took your configuration and tried to simulate your scenario with ActiveMq and it behaves as I expected where the error-channel and handler attached to it essentially intercepts the error which if you handle it properly (you do) will not be propagated to JMS. In other words JMS has no idea that exception has happened downstream so I am a bit surprised. Could you verify that what you showing us is all it is there and that your error handling logic itself does not result in exception?

                Comment


                • #9
                  I would have thought so. But that would mean the message would not have travelled to response Topic, no?

                  May be something to do with Tibco EMS servers? ActiveMq seems to be nice but Tib world has always made me unhappy

                  P.S:

                  Oleg, you still in London or back to USA? I am near London Bridge, can catch up for a quick one in evening.

                  Comment


                  • #10
                    I am still surprised regardless of Tibco or ActiveMq. The main point is how does Tibco knows that there is an exception. The error-channel pattern should essentially suppress the exception thus making Tibco completely unaware of it.

                    P.S. I am still in London. Let me see how my time is and i'll let you know through the email.

                    Comment


                    • #11
                      Yes, in theory error-channel should do the stuff as mentioned by you but sadly it is not doing for me.

                      As soon as I turn the transacted ack on the mesasge gets retried infinitely as it is not removed/deleted.

                      I am surprised that it has message like "no ErrorHandler has been set". Though the ERROR message do get routed to error-channel( may be it is crying for unmarshalling chani) but eventually finds one on MDAdaptor.

                      Also, I added few logging statements in catch(Throwable) of the ErrorHandler class to ensure that I come to know of any failure in the ErrorHandler.
                      Just to add the error it encounters before propagating the exception to error-handler.

                      Code:
                      Execution of JMS message listener failed, and no ErrorHandler has been set. 
                      org.springframework.integration.MessagingException: failed to send message
                      	at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:193)
                      	at org.springframework.integration.jms.ChannelPublishingJmsMessageListener.onMessage(ChannelPublishingJmsMessageListener.java:225)
                      	at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:535)
                      	at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:495)
                      	at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:467)
                      	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)
                      	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
                      	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1058)
                      	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:952)
                      	at java.lang.Thread.run(Thread.java:619)
                      Caused by: org.springframework.integration.transformer.MessageTransformationException: failed to transform message
                      	at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44)
                      	at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:67)
                      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:98)
                      	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
                      	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
                      	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
                      	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:44)
                      	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
                      	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
                      	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
                      	at org.springframework.integration.core.MessagingTemplate.convertAndSend(MessagingTemplate.java:189)
                      	at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:184)
                      	... 9 more
                      Caused by: java.lang.NullPointerException
                      	at com.abc.request.transformer.MessageUnMarshallingTransformer.doTransform(MessageUnMarshallingTransformer.java:111)
                      	at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:33)
                      	... 20 more

                      Comment


                      • #12
                        Can you try to bypass SI and simply register a listener with try/catch Throwable, force new RuntimeException in the try, handle it in the catch and see if the message is still on the destination.
                        It might also be Tibco specific configurations with regard to acknowledging transactions.

                        Comment


                        • #13
                          Is the stack trace you posted above revealing that an Exception is being thrown from your error-handling flow?, specifically:
                          Code:
                          Caused by: java.lang.NullPointerException
                          	at com.abc.request.transformer.MessageUnMarshallingTransformer.doTransform(
                          If an Exception is thrown after sending to the error-channel, it *will* propagate and roll back the transaction. Think of the error-handling flow as your "last chance" to create a successful response Message.

                          Comment


                          • #14
                            Hi Mark, the null pointer one was not from error-handling flow. I thought(wished was the case) the same earlier when Oleg answered few.

                            Ok, here i made a sample class(attached).

                            Behavior:
                            • AUTO ack: Message gets removed whether session.commit gets called or not
                            • CLIENT_ACK*/TRANSACTED acknowledge will require explicit commit call on the session for failure/success to have the message removed from the queue. Otherwise message will remain on queue.

                            Mark/Oleg,
                            After error handling flow, can we call the commit or it should have been called automatically. I am looking through DefaultMessageListenerContainer and AbstractMessageListenerContainer but have not reached to any good stage.

                            Thanks.

                            Comment


                            • #15
                              I think doExecuteListener will never commit if there was a runtimeException as I dont see handleListenerException(ex) ding anything.

                              I am assuming it is a RUnTimeException thrown when my system does not recognize a request message and as per code below commit will never happen.

                              I could be wrong but this is what I can figure out in AbstractMessageListenerContainer class. May be time to debug this one.

                              Code:
                              /**
                              	 * Execute the specified listener,
                              	 * committing or rolling back the transaction afterwards (if necessary).
                              	 * @param session the JMS Session to operate on
                              	 * @param message the received JMS Message
                              	 * @see #invokeListener
                              	 * @see #commitIfNecessary
                              	 * @see #rollbackOnExceptionIfNecessary
                              	 * @see #handleListenerException
                              	 */
                              	protected void executeListener(Session session, Message message) {
                              		try {
                              			doExecuteListener(session, message);
                              		}
                              		catch (Throwable ex) {
                              			handleListenerException(ex);
                              		}
                              	}
                              
                              	/**
                              	 * Execute the specified listener,
                              	 * committing or rolling back the transaction afterwards (if necessary).
                              	 * @param session the JMS Session to operate on
                              	 * @param message the received JMS Message
                              	 * @throws JMSException if thrown by JMS API methods
                              	 * @see #invokeListener
                              	 * @see #commitIfNecessary
                              	 * @see #rollbackOnExceptionIfNecessary
                              	 * @see #convertJmsAccessException
                              	 */
                              	protected void doExecuteListener(Session session, Message message) throws JMSException {
                              		if (!isAcceptMessagesWhileStopping() && !isRunning()) {
                              			if (logger.isWarnEnabled()) {
                              				logger.warn("Rejecting received message because of the listener container " +
                              						"having been stopped in the meantime: " + message);
                              			}
                              			rollbackIfNecessary(session);
                              			throw new MessageRejectedWhileStoppingException();
                              		}
                              		try {
                              			invokeListener(session, message);
                              		}
                              		catch (JMSException ex) {
                              			rollbackOnExceptionIfNecessary(session, ex);
                              			throw ex;
                              		}
                              		catch (RuntimeException ex) {
                              			rollbackOnExceptionIfNecessary(session, ex);
                              			throw ex;
                              		}
                              		catch (Error err) {
                              			rollbackOnExceptionIfNecessary(session, err);
                              			throw err;
                              		}
                              		commitIfNecessary(session, message);
                              	}

                              Comment

                              Working...
                              X