
  • How to redirect the message to an error channel using spring integration.

    I am trying to redirect my message to an error channel when an exception would otherwise be thrown. I have the following in my Java code:

    Code:
        {
    
            if (eventName == null)
            {
                throw new IllegalArgumentException("EventName was null");
            }
    
            if (eventType == null)
            {
                throw new IllegalArgumentException("EventType was null");
            }
            .
            .
            .
            .
            .
    So instead of throwing an exception, I want to redirect the message to an error channel.


    I am using direct channels in my XML file.

    In my Java code I was able to do the following to send the message to a direct channel:


    DirectChannel channel;
    context = new ClassPathXmlApplicationContext("com.comp.asdt.auditing/integration.xml");
    channel = (DirectChannel) context.getBean("annotatorChannel");


    and then do:
    channel.send(MessageBuilder.withPayload(auditEvent).build());

    to send the payload.

    I am trying to do something similar with the error channel namely something like:

    ErrorChannel errorChannel;
    errorChannel = (ErrorChannel) context.getBean("errorChannel");

    and then do the following in my java code above:

    Code:
    {
    
            if (eventName == null)
            {
                errorChannel.send(MessageBuilder.withPayload(auditEvent).build());
            }
    
            if (eventType == null)
            {
                errorChannel.send(MessageBuilder.withPayload(auditEvent).build());
            }

    However, I am not able to do this. Unlike DirectChannel, there seems to be no class named ErrorChannel that I can use. Does anyone know how I can do this?
    Last edited by albert_newton; Jun 15th, 2010, 02:28 PM. Reason: Fixed Grammar
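
As a rough illustration of the intent described in this post (divert an invalid event instead of throwing), here is a minimal plain-Java sketch. It deliberately avoids Spring Integration types: `AuditEvent` is a hypothetical two-field stand-in, and the two `Consumer` parameters are assumed placeholders for the real channels. One detail it makes explicit is returning right after diverting, so an event with both fields missing is sent to the error side only once and is not processed further.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch; the names below are illustrative, not Spring Integration API.
class ValidationRoutingSketch {

    // Hypothetical event carrying only the two fields checked in the post.
    static class AuditEvent {
        final String eventName;
        final String eventType;

        AuditEvent(String eventName, String eventType) {
            this.eventName = eventName;
            this.eventType = eventType;
        }
    }

    // Sends a valid event to mainChannel and an invalid one to errorChannel.
    // Returns true when the event went to mainChannel, false when it was diverted.
    static boolean route(AuditEvent event,
                         Consumer<AuditEvent> mainChannel,
                         Consumer<AuditEvent> errorChannel) {
        if (event.eventName == null || event.eventType == null) {
            errorChannel.accept(event);
            return false; // stop here so the invalid event is not also sent onward
        }
        mainChannel.accept(event);
        return true;
    }

    public static void main(String[] args) {
        List<AuditEvent> delivered = new ArrayList<>();
        List<AuditEvent> rejected = new ArrayList<>();

        route(new AuditEvent("login", "AUTH"), delivered::add, rejected::add);
        route(new AuditEvent(null, null), delivered::add, rejected::add);

        System.out.println("delivered=" + delivered.size() + ", rejected=" + rejected.size());
    }
}
```

In the real application the two consumers would be replaced by calls to `send` on the looked-up channels.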

  • #2
    The simplest option is to use the MessageChannel type in your getBean call:
    Code:
    MessageChannel errorChannel = (MessageChannel) context.getBean("errorChannel");
    Also, note that if you are using Spring 3.0+, you can avoid the cast by providing the expected type:
    Code:
    MessageChannel errorChannel = context.getBean("errorChannel", MessageChannel.class);
    Hope that helps.
    -Mark
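
The idea behind this answer (depend on the channel interface rather than on a concrete class, and pass the expected type to the lookup to avoid a cast) can be sketched in plain Java. This is only an analogy: `Channel`, `CollectingChannel`, and the tiny `beans` registry are hypothetical stand-ins, not Spring classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java analogy; these types are hypothetical stand-ins, not Spring classes.
class TypedLookupSketch {

    // The contract callers depend on (plays the role of MessageChannel).
    interface Channel {
        boolean send(String payload);
    }

    // One concrete implementation; callers never need to name this class.
    static class CollectingChannel implements Channel {
        final List<String> received = new ArrayList<>();

        @Override
        public boolean send(String payload) {
            return received.add(payload);
        }
    }

    // Minimal registry mimicking a lookup by bean name.
    static final Map<String, Object> beans = new HashMap<>();

    // Untyped lookup: the caller has to cast the result.
    static Object getBean(String name) {
        return beans.get(name);
    }

    // Typed lookup: the expected type is passed in, so no cast at the call site.
    static <T> T getBean(String name, Class<T> type) {
        return type.cast(beans.get(name));
    }

    public static void main(String[] args) {
        beans.put("errorChannel", new CollectingChannel());

        Channel viaCast = (Channel) getBean("errorChannel");
        Channel viaType = getBean("errorChannel", Channel.class);

        viaType.send("bad event");
        System.out.println("same bean: " + (viaCast == viaType));
    }
}
```

Either lookup returns the same object; the typed variant simply moves the type check into the lookup itself.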


    • #3
      Originally posted by Mark Fisher
      The simplest option is to use the MessageChannel type in your getBean call:
      Code:
      MessageChannel errorChannel = (MessageChannel) context.getBean("errorChannel");
      Also, note that if you are using Spring 3.0+, you can avoid the cast by providing the expected type:
      Code:
      MessageChannel errorChannel = context.getBean("errorChannel", MessageChannel.class);
      Hope that helps.
      -Mark
      Thanks for the reply. I changed that in my Java code, but it still does not seem to send the event to my error channel. I think the problem is in my XML file, so here is what I am doing there:

      Code:
      <?xml version="1.0" encoding="UTF-8"?>
      <beans xmlns="http://www.springframework.org/schema/beans"
      	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      	xmlns:jee="http://www.springframework.org/schema/jee"
      	xmlns:int="http://www.springframework.org/schema/integration"
      	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
      	http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd">
      
      	<int:channel id="annotatorChannel" datatype="com.comp.asfd.auditing.auditservice.AuditEvent">
      	</int:channel>
      	
      	<int:channel id="destinationChannel" datatype="com.comp.asdf.auditing.auditservice.AuditEvent">
      	</int:channel>	
      	
      	<int:poller id="defaultPoller" default="true" max-messages-per-poll="10">
      	    <int:interval-trigger interval="1" time-unit="SECONDS"/>
      	 </int:poller>
      	 
      	<int:outbound-channel-adapter channel="destinationChannel" ref ="destinationAdapter" method="handleAuditEvent">
      	 </int:outbound-channel-adapter>
      	
      	<int:transformer id="annotationTransformer" ref="annotationAdapter" input-channel="annotatorChannel" method="annotate"  output-channel="destinationChannel" />
      
      	<bean id="annotationAdapter" class="com.comp.asdf.auditing.spring.auditservice.integration.AnnotatingTransformer">
      		<property name="annotators">
      			<list>
      				<ref bean="angulataAnnotator" />
      			</list>
      		</property>
      	</bean>	
      		
      	<bean id="destinationAdapter" class="com.comp.asdf.auditing.destination.DestinationAdapter">
      		<property name="destinations">
      			<list>
      				<ref bean="sensageDestination" />
      			</list>
      		</property>
      	</bean>
      	
      	<bean id="com.comp.asdf.auditing.auditservice.AuditService"
      		class="com.comp.asdf.auditing.spring.auditservice.AuditServiceImpl">
      	</bean>	
      	
      	
      	<bean id="destinationAdapter1" class="com.comp.asdf.auditing.destination.DestinationAdapter">
      		<property name="destinations">
      			<list>
      				<ref bean="sensageDestination" />
      			</list>
      		</property>
      	</bean>
      
      	
      	<bean id="sensageDestination" class="com.comp.asdf.auditing.destination.SensageDestination" />
      	<bean id="angulataAnnotator" class="com.comp.asdf.auditing.spring.auditservice.AngulataAnnotationEngine" />
      </beans>


      The way it works right now, the event passes from the annotator channel to the destination channel; that is all the wiring in the XML file. When one of my classes calls send on the annotator channel (from the same class and method where I want to add the exception handling), the transformer on that channel processes the event and sends it to the destination channel, where it is processed again by the handleAuditEvent method. I want to add the error handling code in the Java class, before send is called on the annotator channel, so I put in this part:

      Code:
       if (eventName == null)
              {
                  errorChannel.send(MessageBuilder.withPayload(auditEvent).build());
              }
      But I am not sure how to fit this into my XML file for this scenario. Here is what I tried adding to the XML file above:

      Code:
      <int:channel id="errorChannel" datatype="com.comp.asdf.auditing.auditservice.AuditEvent">
           <int:queue capacity="500"/>
       </int:channel>
      
      <int:chain input-channel="errorChannel" >
         <int:header-enricher error-channel="errorChannel"/>
         <int:service-activator ref = "com.comp.asdf.auditing.auditservice.AuditService" method="log"/>
      </int:chain>
      where log is the method that contains my exception statements. This does not seem to write the error events to the errorChannel, so either I have the error channel configured incorrectly or I am missing something in my XML file, but I am not sure which.
      Last edited by albert_newton; Jun 15th, 2010, 04:25 PM. Reason: Changed text.


      • #4
        Can you verify that the following method is invoked:

        Code:
        ...
        errorChannel.send(MessageBuilder.withPayload(auditEvent).build());
        ...
        Can you put a log statement there?


        • #5
          Originally posted by oleg.zhurakousky
          Can you verify that the following method is invoked:

          Code:
          ...
          errorChannel.send(MessageBuilder.withPayload(auditEvent).build());
          ...
          Can you put a log statement there?
          I put a System.out statement in that section, but the code does not even reach it when I run my JUnit test. It actually fails while loading the bean definitions from the XML file, with an out-of-memory error for the Java heap space. It seems to fail while loading the errorChannel bean or something associated with it, such as the selector. Here is the output and stack trace:

          Code:
          org.springframework.integration.scheduling.SimpleTaskScheduler destroy
          INFO: shutting down TaskExecutor
          Jun 16, 2010 9:29:51 AM org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor shutdown
          INFO: Shutting down ThreadPoolExecutor
          Jun 16, 2010 9:29:51 AM org.springframework.beans.factory.support.DefaultSingletonBeanRegistry destroySingletons
          INFO: Destroying singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@1549f94: defining beans [errorChannel,org.springframework.integration.channel.MessagePublishingErrorHandler#0,taskScheduler,org.springframework.integration.selector.PayloadTypeSelector#0,org.springframework.integration.channel.interceptor.MessageSelectingInterceptor#0,annotatorChannel,org.springframework.integration.selector.PayloadTypeSelector#1,org.springframework.integration.channel.interceptor.MessageSelectingInterceptor#1,destinationChannel,org.springframework.integration.selector.PayloadTypeSelector#2,org.springframework.integration.channel.interceptor.MessageSelectingInterceptor#2,org.springframework.integration.scheduling.IntervalTrigger#0,defaultPoller,org.springframework.integration.handler.MethodInvokingMessageHandler#0,org.springframework.integration.config.ConsumerEndpointFactoryBean#0,org.springframework.integration.transformer.MethodInvokingTransformer#0,org.springframework.integration.transformer.MessageTransformingHandler#0,annotationTransformer,org.springframework.integration.transformer.HeaderEnricher#0,org.springframework.integration.transformer.MessageTransformingHandler#1,org.springframework.integration.handler.ServiceActivatingHandler#0,annotationAdapter,destinationAdapter,com.comp.asdf.auditing.auditservice.AuditService,destinationAdapter1,persistenceInterceptor,sensageDestination,angulataAnnotator]; root of factory hierarchy
          Jun 16, 2010 9:29:51 AM org.springframework.integration.scheduling.SimpleTaskScheduler destroy
          INFO: shutting down TaskExecutor
          Jun 16, 2010 9:29:51 AM org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor shutdown
          INFO: Shutting down ThreadPoolExecutor
          java.lang.OutOfMemoryError: Java heap space
          	at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:99)
          	at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:393)
          	at java.lang.StringBuffer.append(StringBuffer.java:225)
          	at java.io.StringWriter.write(StringWriter.java:90)
          	at java.io.PrintWriter.write(PrintWriter.java:384)
          	at java.io.PrintWriter.write(PrintWriter.java:401)
          	at java.io.PrintWriter.print(PrintWriter.java:532)
          	at java.io.PrintWriter.println(PrintWriter.java:669)
          	at java.lang.Throwable.printStackTraceAsCause(Throwable.java:539)
          	at java.lang.Throwable.printStackTraceAsCause(Throwable.java:546)
          	at java.lang.Throwable.printStackTraceAsCause(Throwable.java:546)
          	at java.lang.Throwable.printStackTraceAsCause(Throwable.java:546)
          	at java.lang.Throwable.printStackTraceAsCause(Throwable.java:546)
          	                                  .
                                                    .
                                                    .
                                                    .
          So it seems that something about the error channel is configured incorrectly.
          Last edited by albert_newton; Jun 16th, 2010, 09:59 AM. Reason: Corrected text


          • #6
            Can you please provide a minimal configuration that replicates the problem. The exception you are showing could be caused by any bean within your configuration


            • #7
              Originally posted by oleg.zhurakousky
              Can you please provide a minimal configuration that replicates the problem. The exception you are showing could be caused by any bean within your configuration

              Actually, it was working fine before I added the error handling part, which I added later. The following configuration (with all my other beans commented out) gives the same exception:

              Code:
              <?xml version="1.0" encoding="UTF-8"?>
              <beans xmlns="http://www.springframework.org/schema/beans"
              	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              	xmlns:jee="http://www.springframework.org/schema/jee"
              	xmlns:int="http://www.springframework.org/schema/integration"
              	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
              	http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd">
              
              
              <int:channel id="errorChannel" datatype="com.comp.asdf.auditing.auditservice.AuditEvent">
                   <int:queue capacity="200"/>
                   		<int:interceptors>
              			<int:ref bean="persistenceInterceptor" />
              		</int:interceptors>
               </int:channel>
              
              <int:chain input-channel="errorChannel" >
                 <int:header-enricher error-channel="errorChannel"/>
                 <int:service-activator ref = "com.comp.asdf.auditing.auditservice.AuditService" method="log"/>
              </int:chain>
              	
              	<int:poller id="defaultPoller" default="true" max-messages-per-poll="10">
              	    <int:interval-trigger interval="1" time-unit="SECONDS"/>
              	 </int:poller>
              
              <bean id="com.comp.asdf.auditing.auditservice.AuditService" class="com.comp.asdf.auditing.spring.auditservice.AuditServiceImpl">
              	</bean>		
              
              <bean id="persistenceInterceptor" class="com.comp.asdf.auditing.spring.auditservice.integration.FilePersistedChannelInterceptor" />
              
              </beans>


              • #8
                Looking through the Spring Integration documentation, I found the following under the section titled 'B.4 Error Handling':

                The most important thing to understand here is that the messaging-based error handling will only apply to Exceptions that are thrown by a Spring Integration task that is executing within a TaskExecutor. This does not apply to Exceptions thrown by a handler that is operating within the same thread as the sender (e.g. through a DirectChannel as described above).


                Does this mean that, since I am using a direct channel, I cannot do error handling and send the bad messages to the error channel?


                • #9
                  redirect to error channel

                  I have the same problem: I am trying to redirect to an error channel, but the message never reaches it.

                  <si:chain input-channel="securityChannel">
                      <si:header-enricher>
                          <si:error-channel ref="fail"/>
                      </si:header-enricher>
                      <si:service-activator ref="securityService" method="getUser"/>
                  </si:chain>
                  <si:channel id="fail"/>

                  <si:chain input-channel="fail">
                      <si:transformer ref="errorUnwrapper" method="transform"/>
                      <si:service-activator ref="securityService" method="sysout"/>
                  </si:chain>


                  Help..


                  • #10
                    That's correct.
                    As the previous user noted, the error-channel *only* plays a role in two cases:
                    1. Inbound gateways/adapters
                    2. When thread boundaries are broken (e.g., poller, async channel, etc.)

                    In the first scenario, an inbound messaging gateway by definition encapsulates the entry point to a messaging system, and thus defines the error-channel attribute to allow pluggable error-handling logic.
                    In the second scenario, you, the caller, do not own the thread but would still like to know if anything went wrong, which is why the error-channel plays a role there.
                    Basically, think of it this way: the caller into the messaging system must handle errors.
                    When you send a message to a DirectChannel, you are the caller into the messaging system and thus responsible for handling errors. If you are using a gateway, the gateway is the caller and thus has the capability to handle errors. And when a thread boundary is part of the messaging flow, each new thread is essentially a caller into the messaging system and provides error-handling capabilities.

                    I hope that clears it up
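
The "caller must handle errors" rule can be pictured with a plain-Java analogy. The sketch below is not Spring Integration code: `sendDirect`, `sendAsync`, and `errorSink` are hypothetical names, with an `ExecutorService` standing in for a thread boundary such as a poller. In the same-thread case the handler's exception propagates to the sender, who can catch it; in the cross-thread case the sender has already returned, so the worker thread has to catch the failure and route it somewhere.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Plain-Java analogy only; none of these names are Spring Integration API.
class ThreadBoundarySketch {

    // Hypothetical stand-in for an error channel: it just collects failures.
    static final List<Throwable> errorSink = new CopyOnWriteArrayList<>();

    // Same-thread handoff (like a DirectChannel): the handler runs on the
    // caller's thread, so its exception propagates straight back to the caller.
    static void sendDirect(String payload, Consumer<String> handler) {
        handler.accept(payload);
    }

    // Handoff across a thread boundary (like a poller or async channel): the
    // caller has already returned, so the worker thread itself must catch the
    // failure and route it to the error sink.
    static void sendAsync(String payload, Consumer<String> handler, ExecutorService executor) {
        executor.execute(() -> {
            try {
                handler.accept(payload);
            } catch (RuntimeException e) {
                errorSink.add(e);
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        Consumer<String> failingHandler = p -> {
            throw new IllegalArgumentException("bad payload: " + p);
        };

        // Direct case: the caller owns the thread and handles the error itself.
        try {
            sendDirect("event-1", failingHandler);
        } catch (IllegalArgumentException e) {
            System.out.println("caller caught: " + e.getMessage());
        }

        // Async case: nothing is thrown to the caller; the error lands in the sink.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        sendAsync("event-2", failingHandler, executor);
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("errors routed to sink: " + errorSink.size());
    }
}
```

Under this picture, a sender on a direct channel would wrap its own `send` call in a try/catch and decide there whether to forward the failure to an error channel.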


                    • #11
                      Yes, thank you so much, this is very clear.
                      Thanks again for your quick response.
