Announcement Announcement Module
Collapse
No announcement yet.
How to implement invalid message channel pattern Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • How to implement invalid message channel pattern

    Hello,

    Has anyone successfully implemented the invalid message channel pattern? I'm finding it surprisingly difficult to pull off.

    I have a JMS message driven channel adapter connected to a DefaultMessageListenerContainer, with a JSON to Pojo converter, down stream there's a validator and a service activator all wrapped in a local transaction.

    I'd like to be able to intercept all exceptions from the converter, validator and service, determine if they are caused by an invalid message, if so send the message on an invalid message channel and commit the transaction. If not an invalid message, bubble the exception up so the container roles back the transaction.

    This jira entry would solve my problems but it's not been implemented.

    I've considered sub classing the DefaultMessageListenerContainer and overriding the doExecuteListener to filter the exceptions and commit the transaction if necessary, but to then send the message on to the invalid message channel would require a tight coupling of a class inherited from the spring JMS namespace to the spring integration namespace, first to convert the message into a spring integration message, then to send it on a channel.

    Thanks

  • #2
    I've just updated INT-907 so that it will be included in the 2.0 M5 roadmap.

    Are you using 2.0?

    One thing you might want to look at is the support for ErrorHandler strategy on the DefaultMessageListenterContainer (as of Spring 3.0). Spring Integration provides the MessagePublishingErrorHandler implementation of that. Would that help as an interim solution?

    Comment


    • #3
      Wow, that was prompt :-)

      Yeah, we're using 2.0 and having lots of fun with it so thanks for the good work!

      We've already wired in that error handler but we need to make a decision about committing the trxn or not before it bubbles up to the handler.

      We thought we could override the following method in the AbstractMessageListenerContainer, this is hit before the error handler right? So we need to catch the business exception decide whether to commit or not and then rethrow which calls the error handler but then we've lost the original message.

      Are we missing something?

      Code:
      protected void doExecuteListener(Session session, Message message) throws JMSException {
      		if (!isAcceptMessagesWhileStopping() && !isRunning()) {
      			if (logger.isWarnEnabled()) {
      				logger.warn("Rejecting received message because of the listener container " +
      						"having been stopped in the meantime: " + message);
      			}
      			rollbackIfNecessary(session);
      			throw new MessageRejectedWhileStoppingException();
      		}
      		try {
      			invokeListener(session, message);
      		}
      		catch (JMSException ex) {
      			rollbackOnExceptionIfNecessary(session, ex);
      			throw ex;
      		}
      		catch (RuntimeException ex) {
      			rollbackOnExceptionIfNecessary(session, ex);
      			throw ex;
      		}
      		catch (Error err) {
      			rollbackOnExceptionIfNecessary(session, err);
      			throw err;
      		}
      		commitIfNecessary(session, message);
      	}

      Comment


      • #4
        I would try to rely on the underlying commit/rollback behavior as much as possible. In other words, your ErrorHandler should rethrow in the case where you want to rollback, and handle (without propagating) in the case you want to commit. That handling would involve sending to the channel (your 'invalidMessageChannel').

        Assuming that would work, then I guess you are saying that you don't have the original Spring Integration Message for the "invalid" cases? (it would be available for most MessagingException instances via the getFailedMessage() method, but are you expecting to catch a raw Exception that has not been wrapped by framework handler code?).

        As far as a generic solution to this in the framework, it seems like the decision of what makes an "invalid" vs. fatal error has to be configurable. Perhaps we can just accept a list of Exception types that are treated as commit-but-publish-to-invalid-message-channel cases.

        Comment


        • #5
          The generic exception strategy you propose sounds like a good idea.

          Just to be clear on our problem, we have the following config:

          Code:
          <bean id="container" class="our.overriden.MessageListenerContainer">
                  <property name="errorHandler" ref="messagePublishErorr"/>
                  <property name="destinationName" value="destination"/>
                  <property name="connectionFactory" ref="connectionFactory"/>
                  <property name="transactionManager" ref="transactionManager"/>
          </bean>
          
          <bean id="messagePublishErorr"    class="org.springframework.integration.channel.MessagePublishingErrorHandler">
                  <property name="defaultErrorChannel" ref="messageError"/>
          </bean>
          
          <integration:channel id="messageError"/>
          
          Then we would have an ErrorMessageExceptionTypeRouter listening on messageError channel and sending to our jms outbound channel adapter which sends to the invalid queue.
          If we override the method in the container as follows:

          Code:
          protected void doExecuteListener(Session session, Message message) throws JMSException {
          		if (!isAcceptMessagesWhileStopping() && !isRunning()) {
          			if (logger.isWarnEnabled()) {
          				logger.warn("Rejecting received message because of the listener container " +
          						"having been stopped in the meantime: " + message);
          			}
          			rollbackIfNecessary(session);
          			throw new MessageRejectedWhileStoppingException();
          		}
          		try {
          			invokeListener(session, message);
          		}
                          catch(OurBusinessException e) {
                            //commit the trxn but want to send the original message onto invalid queue
                           commitIfNecessary(session, message);
                           throw new InvalidMessageException(e); 
                         }
          		catch (JMSException ex) {
          			rollbackOnExceptionIfNecessary(session, ex);
          			throw ex;
          		}
          		catch (RuntimeException ex) {
          			rollbackOnExceptionIfNecessary(session, ex);
          			throw ex;
          		}
          		catch (Error err) {
          			rollbackOnExceptionIfNecessary(session, err);
          			throw err;
          		}
          		commitIfNecessary(session, message);
          	}
          All exceptions happen within a SI context, so are you saying then that if catch our business exception and then rethrow a SI MessagingException we will be able to get hold of the original message in our subscribers to the ErrorMessageExceptionTypeRouter?

          Hope that's clear?

          Comment


          • #6
            I think that should achieve your thing.

            Guys,

            I was trying to expcetion handling for my thing as well. I have a queue listeneing for messages using jms:message-driven-adaptor. from this I put the message on a channel defined by <channel id="testRouter" />

            There is a router class waiting for messages on the testRouter channel. If i dont get some info that I am looking for in message then router method returns null and router config is as below

            <router ref="requestRouter" input-channel="testRouter" default-output-channel="errorChannel" method="route" />

            Now, if the route does return null as there are no mapping in the system for that message, the message goes to error-channel. And, error channel handler keeps on logging the message.

            Is there a way that I can just log the message and finish it off or i have to send this to some destination like queue/topic...

            2) I dont get messages on errorChannel( atleast the handler on errorChanel does not get any message) if I manually change the code in route method to throw messagingException. Is this again because of direct channel?

            Please adivse

            Comment


            • #7
              James,

              did your approach worked for you..?

              Comment


              • #8
                Hi rock_star,

                I'm sure James won't mind me answering for him as he's on holiday. We decided not to pursue the invalid message channel pattern for now, we're going to wait for http://jira.springframework.org/browse/INT-907. Instead we're using Active MQs dead letter channel.

                Comment


                • #9
                  Hi nholland, thanks for the reply. We have the prod deployment in next month. Not sure if we can wait for that fix .

                  But certainly a good fix/feature when it comes on SI-2.0 M-5!

                  Comment

                  Working...
                  X