
  • TCP Collaborative Adapters

    Here is my flow :

    Scenario 1:
    XML => TCP Gateway -> XmlTransformer -> Splitter [with Dispatcher] -> Service Activator[Parallel Treatment] -> Aggregator -> XmlTransformer -> Return to Gateway.

    Scenario 2:
    XML => TCP Inbound Adapter -> XmlTransformer -> Splitter [with Dispatcher] -> Service Activator[Parallel Treatment] -> Aggregator -> XmlTransformer -> TCP Outbound Adapter.

    In both scenarios, I have problems after the aggregator.

    In scenario 1, the reply channel is a temporary channel for the gateway's return value, so the reply bypasses the last XML transformer.

    In scenario 2, there isn't any reply channel set, and I get the following exception after aggregation: org.springframework.integration.MessageHandlingException: java.lang.IllegalArgumentException: no outputChannel or replyChannel header available.

    I've tried to set it with a header-enricher, without success.

    Here is my context [for scenario 2]:
    Code:
    <ip:tcp-inbound-channel-adapter channel="tcpChannel" connection-factory="server"/>
    <ip:tcp-outbound-channel-adapter channel="replyChannel" connection-factory="server"/>
    
    <chain input-channel="tcpChannel" output-channel="parallelTxChannel"  >
       <service-activator ref="tcpProcess" method="process" />	
    </chain>
    	
    <chain input-channel="parallelTxChannel" output-channel="replyChannel">
    	<splitter ref="splitterBean" />
    	<service-activator ref="timerBean" />
    	<service-activator ref="aggregatorBean" />
    	<service-activator ref="xmlTransformer"/>	
    </chain>
    
    
    <channel id="parallelTxChannel">
    	<dispatcher task-executor="asmTxExecutor"/>
    </channel>
    
    <task:executor id="asmTxExecutor" pool-size="2"/>
    
    <beans:bean id="aggregatorBean" class="org.springframework.integration.aggregator.CorrelatingMessageHandler">
      <beans:constructor-arg>
    	<beans:bean class="com.ibm.brmt.ats.MessageGroupProcessorImpl"/>
      </beans:constructor-arg>
    </beans:bean>
    ---------------------------------------------------------------------

    Any idea why I get this exception after aggregation, and why the message is not passed on to the last XML transformer?

    Also, the documentation says we should prefer collaborative adapters (Scenario 2) to a gateway (Scenario 1), so I tried them thinking it would solve my problem. Is that a good avenue?

    The whole flow is not a single transaction; it handles multiple parallel transactions that are independent of each other. Is that what you call a nested transaction?

    Any help would be greatly appreciated!!
    Last edited by Olivier Quirion; Mar 29th, 2011, 11:27 AM.

  • #2
    I believe you are seeing this because you are wrapping the aggregator in a <service-activator/>. The implied output channel is assigned to the service activator. The aggregator itself has no output channel, so it sends the output to the reply channel in the header.

    If you change the <service-activator/> to an <aggregator/> you should be ok.

    (Or define the last two outside of a chain).

    BTW, please use [ code ] and [ /code ] tags around xml and code (no spaces inside the brackets).



    • #3
      Hi Gary, thanks for the quick response.


      This is what I tried, and I got the following exception:

      Code:
      <chain input-channel="parallelTxChannel" output-channel="replyChannel">
      	<splitter ref="splitterBean" />
      	<service-activator ref="timerBean" />
      	<aggregator ref="aggregatorBean" />
      	<service-activator ref="xmlTransformer"/>	
      </chain>
      
      //Here is the cut down stack trace.
      Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.aggregator.CorrelatingMessageHandler#21962196': Cannot create inner bean '(inner bean)' of type [org.springframework.integration.aggregator.MethodInvokingMessageGroupProcessor] while setting constructor argument; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name '(inner bean)': Instantiation of bean failed; nested exception is org.springframework.beans.BeanInstantiationException: Could not instantiate bean class [org.springframework.integration.aggregator.MethodInvokingMessageGroupProcessor]: Constructor threw exception; nested exception is java.lang.IllegalArgumentException: Found ambiguous parameter type [interface java.util.List] for method match: [public final java.lang.String org.springframework.integration.context.IntegrationObjectSupport.getComponentName()]
      ...
      Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name '(inner bean)': Instantiation of bean failed; nested exception is org.springframework.beans.BeanInstantiationException: Could not instantiate bean class [org.springframework.integration.aggregator.MethodInvokingMessageGroupProcessor]: Constructor threw exception; nested exception is java.lang.IllegalArgumentException: Found ambiguous parameter type [interface java.util.List] for method match: [public final java.lang.String org.springframework.integration.context.IntegrationObjectSupport.getComponentName()]
      ...
      Caused by: org.springframework.beans.BeanInstantiationException: Could not instantiate bean class [org.springframework.integration.aggregator.MethodInvokingMessageGroupProcessor]: Constructor threw exception; nested exception is java.lang.IllegalArgumentException: Found ambiguous parameter type [interface java.util.List] for method match: [public final java.lang.String org.springframework.integration.context.IntegrationObjectSupport.getComponentName()]
      ...
      Caused by: java.lang.IllegalArgumentException: Found ambiguous parameter type [interface java.util.List] for method match: [public final java.lang.String org.springframework.integration.context.IntegrationObjectSupport.getComponentName()]
      It seems as if I have to specify a 'method' for the aggregator, but I have no idea which one.
      Last edited by Olivier Quirion; Mar 29th, 2011, 11:29 AM.



      • #4
        Looking at your original post, you are injecting a custom MessageGroupProcessor into the aggregator.

        We generally recommend using a POJO for such customization, rather than tying your implementation to framework APIs. Then, a framework component (MethodInvokingMessageGroupProcessor) will invoke your POJO.

        There is no way to override the default MGP using the namespace support (I presume that's why you went with a <bean/> definition). You can either reimplement your custom MGP code as a POJO or use discrete component definitions instead of a <chain/>.



        • #5
          Here is what I did to solve this problem:

          Code:
          <chain input-channel="tcpInputChannel" output-channel="parallelTxChannel">
          	<transformer ref="unmarshallingTransformer" />
          	<splitter ref="splitter" />
          </chain>
          
          <chain input-channel="parallelTxChannel" output-channel="tcpOutpuChannel">
          	<service-activator ref="transactionExecutorService" />
          	<aggregator ref="aggregator" />
          	<transformer ref="marshallingTransformer" />
          </chain>
          The key here is to have two separate chains, because the splitter/aggregator pair is a one-to-many relationship and the two can't be part of the same chain structure.

          I've also used POJO implementations for the splitter and aggregator:

          Code:
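          import java.util.List;

          // POJO splitter: hands the framework the nested requests returned by getRequests().
          public class SplitterImpl {

          	public Object splitMessage(AtsRequest atsRequest) {
          		return atsRequest.getRequests();
          	}
          }

          // POJO aggregator: collects the individual responses into a single AtsResponse.
          public class AggregatorImpl {

          	public AtsResponse aggregate(List<Response> responses) {
          		AtsResponse atsResponse = new AtsResponse();

          		for (Response response : responses) {
          			atsResponse.getResponses().add(response);
          		}
          		return atsResponse;
          	}
          }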
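
          For completeness, here is a minimal sketch of how these two POJOs might be declared and referenced from the chains above. The package name com.example is an assumption (the post does not give one), and the method attributes are added here only so that the framework does not have to guess which method to invoke; the channel and bean names are copied from the configuration above as written.

          ```xml
          <!-- Assumed package name; replace with the package the classes actually live in. -->
          <beans:bean id="splitter" class="com.example.SplitterImpl"/>
          <beans:bean id="aggregator" class="com.example.AggregatorImpl"/>

          <chain input-channel="tcpInputChannel" output-channel="parallelTxChannel">
          	<transformer ref="unmarshallingTransformer"/>
          	<!-- Invoke SplitterImpl.splitMessage(AtsRequest) explicitly. -->
          	<splitter ref="splitter" method="splitMessage"/>
          </chain>

          <chain input-channel="parallelTxChannel" output-channel="tcpOutpuChannel">
          	<service-activator ref="transactionExecutorService"/>
          	<!-- Invoke AggregatorImpl.aggregate(List<Response>) explicitly. -->
          	<aggregator ref="aggregator" method="aggregate"/>
          	<transformer ref="marshallingTransformer"/>
          </chain>
          ```

          With the default correlation and release strategies, the aggregator relies on the correlation and sequence headers that the splitter sets, so no extra strategy beans should be needed for this simple split/aggregate round trip.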

