Error handling, Aggregator and response problems

  • Error handling, Aggregator and response problems

    I'm trying to implement error aggregation based on the sample here:
    https://github.com/olegz/s12gx.2011/...egration/error

    I'm using Spring Integration 2.0.5, so the aggregator is implemented by the CorrelatingMessageHandler class (I noticed that in later versions it has been broken up into more classes).

    All the internal wiring works fine, except that I do not get the response back.
    Here is the structure I'm dealing with:
    Request
    > has Operations (which are split and aggregated), for example a list of "Retrieve id=1, Retrieve id=2".

    The service activator for each Operation throws an exception, which ends up in the error-channel; that channel has some components attached to it to deal with the exception message. Previously I simply tried to route the error message from the error channel to the aggregator (as in the sample), but it complained that the CorrelationId is null: "Null correlation not allowed. Maybe the CorrelationStrategy is failing?" Why is the CorrelationId not propagated from the original message?

    So I inspected the error message and realized that it contains the "failedMessage", which has the appropriate CorrelationId in its headers. I therefore inserted a transformer that creates a new message with the desired payload and adjusted headers before sending the error message to the aggregator.

    The transformer copies all headers from the original failed message. This gives me the correlationId I want and the aggregator works properly, but I seem to lose the message after that (I noticed that errorChannel has a message sitting in it, while non-error messages do not have this).

    What do I need to do in order to properly get the failed message to the aggregator and out along with other success results?

    I have the following chain:

    Code:
    	<!-- Gateway -->
    	<int-ws:inbound-gateway id="passport-integration-services"
    		request-channel="primaryRequestChannel" reply-channel="primaryResponseChannel"
    		error-channel="operationExceptionChannel" />
    
    	<int:channel id="primaryRequestChannel">
    	</int:channel>
    
    	<!-- When an error is thrown, it gets to this error channel -->
    	<int:channel id="operationExceptionChannel">
    	</int:channel>
    
    	<!-- Delegates exception message transformation to the operationExceptionTransformer. 
    		Produces Error and directs it to Aggregator(via operationHandlerChannel). -->
    	<int:transformer input-channel="operationExceptionChannel"
    		output-channel="operationResponseOutputChannel" ref="operationExceptionTransformer" />
    
            ... (splitter > routers > service activator> result ends up in channel below)
    	
    	<int:publish-subscribe-channel id="operationResponseOutputChannel" />
    
    	<!-- Aggregates operation results into a Response -->
    	<int:aggregator input-channel="operationResponseOutputChannel"
    		method="aggregate" ref="responseAggregator" output-channel="responseOutputPojoChannel">
    	</int:aggregator>
    Thank you.
    Last edited by yermolovd; Jul 23rd, 2013, 02:28 PM.

  • #2
    When copying the failedMessage headers, DO NOT replace the replyChannel header or errorChannel header. If you are using a MessageBuilder, use copyHeadersIfAbsent().

    The "old" replyChannel and errorChannel headers in the failedMessage are spent; you must use the ones from the ErrorMessage instead.

    In 3.0, we have added warning messages for this case (where someone uses an invalid channel header).
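
    The same header rule can be sketched in XML for anyone who prefers configuration over a custom transformer. This is only a sketch under several assumptions: that the ErrorMessage payload is a MessagingException exposing failedMessage, that the 2.0.x header-enricher accepts the expression attribute on <int:header>, and that overwriting is left at its default (off). The channel name operationExceptionEnrichedChannel is hypothetical; the other names come from the configuration in the first post.

    ```xml
    <!-- Copy only the correlation headers from the failed message.
         replyChannel and errorChannel are deliberately not listed, so the
         values already present on the ErrorMessage are left untouched. -->
    <int:header-enricher input-channel="operationExceptionChannel"
        output-channel="operationExceptionEnrichedChannel">
        <int:header name="correlationId"
            expression="payload.failedMessage.headers.correlationId" />
        <int:header name="sequenceNumber"
            expression="payload.failedMessage.headers.sequenceNumber" />
        <int:header name="sequenceSize"
            expression="payload.failedMessage.headers.sequenceSize" />
    </int:header-enricher>

    <!-- Hypothetical intermediate channel feeding the existing transformer -->
    <int:channel id="operationExceptionEnrichedChannel" />

    <int:transformer input-channel="operationExceptionEnrichedChannel"
        output-channel="operationResponseOutputChannel" ref="operationExceptionTransformer" />
    ```

    This only restores the correlation headers on the error message. It does not change what happens to the channel headers once the messages are aggregated.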

    • #3
      But I see you are trying to aggregate the error with good results.

      This won't work because the messages in the aggregator still have the stale channels in their headers; when the framework tries to aggregate the headers, it will detect different values and won't know what to do (so it simply removes the replyChannel from the aggregated result).

      See this thread where the poster had a similar issue; I posted a solution towards the end of the thread.

      http://forum.springsource.org/showth...with-agregator

      • #4
        Yes, that is what we wanted to do. I have already seen this behave exactly as you described, and it was going to be the next thing I tried to resolve. Thanks for letting me know it won't work.
        The reason we want to aggregate good and bad results is that we treat each Operation as a "transaction"; if one fails, we still want to keep the result of every operation and return the results to the client. I'll look at the other thread. Thank you.

        • #5
          The main thing to understand from that gist is that you insert an error-handling gateway and the reply message from that gateway is sent to the aggregator; that way, the aggregated messages will always have the same replyChannel header, regardless of whether the flow downstream of the interim gateway succeeded or failed.

          • #6
            So I tried to simplify the example from the other thread. I chopped off the second error-handling gateway; it looks like extra stuff that is not important for a basic setup.
            Please let me know if I understand the underlying flow correctly:

            A message comes in to "gateway" > goes out to "input" > the first error-handling activator takes it and uses "service1gw", which eventually sends it to "service1", the actual service.
            In the error case, "service1" throws an error: the error goes to the error-channel of "service1gw", which sends it off to "ec"; it then ends up in the aggregator "agg" and goes out to "reply".
            In the good case, "service1" completes, the result gets back to "service1gw", then back to the error-handling activator, which puts it on its output-channel "agg" (the aggregator), and then it goes out to "reply".

            Do I have the right flow? If so, I'm going to try to integrate this sort of structure in my application tomorrow.

            Code:
            	<int:gateway id="gateway" service-interface="foo.Sample$MyGateway"
            		default-reply-channel="reply" default-request-channel="input" />
            
            	<int:channel id="reply" />
            
            	<!-- use a pub-sub channel to insert aggregation headers -->
            	<int:publish-subscribe-channel id="input"
            		apply-sequence="true" task-executor="exec" />
            	<task:executor id="exec" pool-size="5" />
            
            	<!-- First error-handling gateway -->
            	<int:service-activator ref="service1gw"
            		input-channel="input" output-channel="agg" />
            
            	<int:gateway id="service1gw" default-request-channel="input1"
            		error-channel="ec" />
            
            	<!-- Error flow ("catch" block) -->
            	<int:channel id="ec" />
            	<int:transformer input-channel="ec" ref="foo" method="errorHandler" />
            
            	<!-- Aggregates returned results from the error handling gateways -->
            	<int:aggregator input-channel="agg" ref="foo" method="aggregate"
            		output-channel="reply" />
            
            	<int:channel id="input1" />
            	<int:service-activator id="service1"
            		input-channel="input1" ref="foo" method="service" />
            
            	<!-- bean with methods for services and transformer -->
            	<bean id="foo" class="foo.Sample" />

            • #7
              Yes, that is correct. In this case, though, it looks like you don't need the pub/sub channel and task executor; the correlation headers are added by the splitter later in your flow. I'd need to understand the threading on your downstream flow to advise further.
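
              As a rough sketch of that simplification, the "input" channel from the sample could become a plain direct channel with the executor bean removed. This assumes a splitter elsewhere in the real flow sets the correlation headers; without one, the aggregator in the sample would have nothing to correlate on.

              ```xml
              <!-- Plain direct channel: no apply-sequence, no task executor.
                   Assumes a splitter in the real flow sets correlationId,
                   sequenceNumber and sequenceSize on each message. -->
              <int:channel id="input" />

              <!-- First error-handling gateway, unchanged from the sample -->
              <int:service-activator ref="service1gw"
                  input-channel="input" output-channel="agg" />

              <int:gateway id="service1gw" default-request-channel="input1"
                  error-channel="ec" />
              ```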

              • #8
                I put in the changes and am attaching the flow configuration below.
                It works for separate cases: positive and exception.

                It does not work when I send 2 operations in one request and, for example, the last one fails. My aggregator only gets the Error message for the 2nd operation, but not the successful result of the 1st. Following the order of events, I see Operation 1 execute successfully in the service activator, but its result never reaches the aggregator (CorrelatingMessageHandler); then Operation 2 throws an exception in the service activator and that does reach the aggregator. For Operation 1 I can't see which channel the result is actually sent to, because it uses the temporary channel from the header and I'm not clear where it ends up.

                The flow has more in it than the bare example (routing etc.). I'm not sure whether that really affects things, but I'm considering stripping out all the extras to get to the root of the problem.

                My message looks like:
                Request - has information processed by the "Session" handlers before the "error gateway" that I added per the other thread. I think this part can be ignored here, as there are no problems there. I want it to stay separate and not reach the aggregator; it is higher level and is handled by the error channel of the initial gateway.
                > Operation 1 (succeeds)
                > Operation 2 (fails)

                Please let me know if you can spot anything.

                Thank you.

                Code:
                	<int-ws:inbound-gateway id="passport-integration-services"
                		request-channel="gatewayInputChannel" reply-channel="primaryResponseChannel"
                		error-channel="processErrorChannel" />
                
                	<int:channel id="gatewayInputChannel">
                		<int:interceptors>
                			<!-- Load preferences -->
                			<ref bean="preferencesLoader" />
                			<ref bean="loggingChannelInterceptorAdapter" />
                		</int:interceptors>
                	</int:channel>
                
                	<!-- SOAP transformer: converts XML to service POJOs -->
                	<si-xml:unmarshalling-transformer id="soapUnmarshallingTransformer"
                		input-channel="gatewayInputChannel" output-channel="preProcessRequestChannel"
                		unmarshaller="jaxbMarshaller" />
                
                	<!-- Send message for Authentication first, then if passed authentication, 
                		for processing -->
                	<int:recipient-list-router id="requestRouter"
                		input-channel="preProcessRequestChannel">
                		<!-- Send Request for authentication -->
                		<int:recipient channel="sessionChannel" />
                		<!-- Send Request for processing -->
                		<int:recipient channel="processChannel" />
                	</int:recipient-list-router>
                
                	<!-- Activate SessionProcessor on Request -->
                	<int:service-activator input-channel="sessionChannel"
                		ref="sessionProcessor">
                	</int:service-activator>
                
                	<!-- Error Gateway preceding the actual service -->
                	<int:service-activator id="processActivator" ref="processGateway"
                		input-channel="processChannel" output-channel="operationResponseOutputChannel" />
                
                	<int:gateway id="processGateway" default-request-channel="splitterChannel"
                		error-channel="processErrorChannel" />
                
                	<!-- ************************ ERROR HANDLING COMPONENTS START ************************ -->
                	<!-- When an error is thrown, it gets to this error channel -->
                	<int:channel id="processErrorChannel">
                	</int:channel>
                
                	<!-- Checks error contents and if TRUE: directs to primary response channel 
                		(in case of Request level exception, like Session processor failed), if FALSE: lets the error gateway send it to aggregator (in 
                		case of Operation exception). -->
                	<int:filter input-channel="processErrorChannel" ref="exceptionFilter"
                		output-channel="requestExceptionChannel" discard-channel="operationExceptionChannel" />
                
                	<!-- Delegates Throwable transformation to the requestExceptionTransformer. 
                		Produces Response with Error and directs it to Marshalling/Output. -->
                	<int:transformer input-channel="requestExceptionChannel"
                		output-channel="responseOutputPojoChannel" ref="requestExceptionTransformer" />
                
                	<!-- Delegates Throwable transformation to the operationExceptionTransformer. 
                		Produces Message with Error and gateway directs it to Aggregator(via operationHandlerChannel). -->
                	<int:transformer input-channel="operationExceptionChannel"
                		ref="operationExceptionTransformer" />
                	<!-- ************************ ERROR HANDLING COMPONENTS END ************************ -->
                
                	<!-- Split big Request into Operations (Create, Retrieve etc.) -->
                	<int:splitter input-channel="splitterChannel" ref="requestSplitter"
                		output-channel="operationHandlerChannel" />
                
                	<!-- Activate RequestHandler on each Operation (Create, Retrieve etc.) -->
                	<int:service-activator input-channel="operationHandlerChannel"
                		ref="operationHandler">
                	</int:service-activator>
                
                	<!-- Aggregates operation results, such as Create, Retrieve etc. into a Response -->
                	<int:aggregator input-channel="operationResponseOutputChannel"
                		method="aggregate" ref="responseAggregator" output-channel="responseOutputPojoChannel">
                	</int:aggregator>
                
                	<!-- POJO transformer: converts service POJOs to XML -->
                	<si-xml:marshalling-transformer id="soapMarshallingTransformer"
                		input-channel="responseOutputPojoChannel" output-channel="primaryResponseChannel"
                		marshaller="jaxbMarshaller" result-transformer="resultToStringTransformer" />
                
                	<!-- Primary output channel -->
                	<int:channel id="primaryResponseChannel">
                		<int:interceptors>
                			<ref bean="loggingChannelInterceptorAdapter" />
                		</int:interceptors>
                	</int:channel>

                • #9
                  You need to put the error handling gateway AFTER the splitter - that way, each split result (success or otherwise) goes to the aggregator.
                  Last edited by Gary Russell; Jul 24th, 2013, 12:41 PM.
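
                  A minimal sketch of that reordering, reusing the bean and channel names from the configuration in post #8. The names operationGatewayChannel and operationGateway are hypothetical, and pointing the inner gateway's error-channel straight at operationExceptionChannel is an assumption; the original configuration routes errors through a filter first.

                  ```xml
                  <!-- Split the Request into Operations first -->
                  <int:splitter input-channel="splitterChannel" ref="requestSplitter"
                      output-channel="operationGatewayChannel" />

                  <!-- Error-handling gateway, now invoked once per split Operation.
                       Its reply (success or transformed error) goes to the aggregator. -->
                  <int:service-activator ref="operationGateway"
                      input-channel="operationGatewayChannel"
                      output-channel="operationResponseOutputChannel" />

                  <int:gateway id="operationGateway"
                      default-request-channel="operationHandlerChannel"
                      error-channel="operationExceptionChannel" />

                  <!-- The actual service for each Operation -->
                  <int:service-activator input-channel="operationHandlerChannel"
                      ref="operationHandler" />

                  <!-- "Catch" block: no output-channel, so the result is returned
                       to the gateway as its reply -->
                  <int:transformer input-channel="operationExceptionChannel"
                      ref="operationExceptionTransformer" />

                  <!-- Aggregates one reply per Operation into a Response -->
                  <int:aggregator input-channel="operationResponseOutputChannel"
                      method="aggregate" ref="responseAggregator"
                      output-channel="responseOutputPojoChannel" />
                  ```

                  With this layout the splitter would feed the gateway directly, so the earlier processActivator/processGateway pair in front of the splitter is no longer the piece that catches Operation errors.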

                  • #10
                    And that WORKS! Thank you very much.
