Announcement Announcement Module
Collapse
No announcement yet.
MethodInvokingAggregator requires the 'correlationKey' property? Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • MethodInvokingAggregator requires the 'correlationKey' property?

    Hello,

    I am using SI 1.04. I have configured an aggregator which is thowing an error saying "MethodInvokingAggregator requires the 'correlationKey' property".

    My aggregator follows s publish-subscribe channel with the apply-sequence="true" (See below):

    <siublish-subscribe-channel id="someRequests" task-executor="taskExecutor" apply-sequence="true"/>

    I have configured my aggregator like this:

    <si:aggregator
    input-channel="myInputChannel"
    method="aggregateMessages"
    output-channel="myResponseChannel"
    timeout="5000">
    <bean class="somepackage.MyMessageAggregator"/>
    </si:aggregator>

    "MyMessageAggregator" does not extend any other class. The "aggregateMessages" method conforms to the signature of:

    public SomeObject aggregateMessages(List<?> items)

    I am using the "?" because I can receive two different types of messages.

    Based on the SI documentation, it looks like the MethodInvokingMessageAggregator defaults the CompletionStrategy to SequenceSizeCompletionStrategy and the CorrelationStrategy to HeaderAttributeCorrelationStrategy.

    If my understanding is correct, shouldn't the HeaderAttributeCorrelationStrategy be providing this value?

  • #2
    Yes, the default correlation strategy *should* find that value, and the 'apply-sequence' flag on the publish-subscribe-channel *should* be providing it.

    Is there any chance that you could boil this down to a simple, failing unit-test and attach it?

    Thanks,
    Mark

    Comment


    • #3
      I think I know what it is...

      Mark,

      I appreciate your help. I think I know what the problem is... Here is the message flow:

      publish-subscribe-channel --> Transformer --> Header Enricher --> HTTP Gateway --> Response --> Aggregator

      The Transformer --> Header Enricher --> HTTP Gateway --> Response leg is repeated for a parallel leg. I suspect that the information placed in the header from the publish-subscribe-channel is lost after the HTTP Gateway and thats why the aggregator is complaining

      Questions:

      1. Both the Transformer and Header Enricher are within a Chain. Is there a way to conveniently dump the contents of headers (and for future knowledge the payload) within a Chain similar to <stream:stdin-channel-adapter id="stdin"/>?
      2. If my suspicion is correct, is there anyway to copy the header values from the input message to the output message within the http gateway?

      Comment


      • #4
        Found out how to log headers...

        Reading the documentation:

        <si:channel id="in">
        <si:interceptors>
        <si:wire-tap channel="logger"/>
        </si:interceptors>
        </si:channel>

        <si:logging-channel-adapter id="logger" log-full-message="true" level="DEBUG"/>

        log-full-message="true" will log the values of the headers (default = false)

        Comment


        • #5
          By the way, in 2.0 M4, you can even provide an "expression" attribute on the <logging-channel-adapter/> instead and evaluate any SpEL expression against the Message payload/headers, e.g. "headers.foo".

          Comment


          • #6
            Is there a way to copy header attributes from input message?

            Mark,

            Is there a way to copy the SEQUENCE_SIZE, SEQUENCE_NUMBER and CORRELATION_ID headers added by the publish-subscribe-channel back to the output message returned from the http gateway? I think this is why the aggregator is complaining.

            Comment


            • #7
              Mark,

              I put together a brief example that demonstrates the exception mentioned earlier. I am sure that is something we missed. Thanks for taking a look.

              -Joshua

              Comment


              • #8
                Ok, so i tried your zip file and i could not get it to run since it hangs on your http-gateways or so, but based on your description i created a watered down version of the scenario that you describe, which is Gateway -> PubSub-Channel -> two HttpGateways -> Aggregator

                Here is the configuration:
                Code:
                <si:gateway id="enhancedTimelineGateway" 
                                service-interface="integration.EnhancedTimelineGateway"
                                default-request-channel="enhancedTimelineRequest"/>
                
                <si:publish-subscribe-channel id="enhancedTimelineRequest" apply-sequence="true"/>
                
                <si-http:outbound-gateway
                		request-channel="enhancedTimelineRequest"
                		default-url="http://localhost:8080/test"
                		extract-request-payload="false"
                		charset="UTF-8"
                		request-timeout="1234"	
                		reply-channel="aggregatorInputChannel"/>
                	
                <si-http:outbound-gateway
                		request-channel="enhancedTimelineRequest"
                		default-url="http://localhost:8080/test"
                		extract-request-payload="false"
                		charset="UTF-8"
                		request-timeout="1234"	
                		reply-channel="aggregatorInputChannel"/>
                
                <si:aggregator input-channel="aggregatorInputChannel"  method="prepareEnhancedTimeline">
                        <bean class="integration.MessageAggregator"/>
                </si:aggregator>
                Unfortunately everything works and all the headers are copied and the final message is successfully aggregated.

                Let me know what I might be missing. Better off,provide a bare minimum version of your system that replicates the problem. Current version has 36 classes in it

                Comment


                • #9
                  Default request method for http outbound gateway?

                  Oleg, thanks for taking the time to look into this. Just out of curiosity, I was using a separate header enricher to add the following header:

                  springintegration_http_requestMethod=GET

                  Does the http outbound gateway always use a GET (or at least by default)? Reading the documentation, it is unclear how this behaves. If so, I could certainly simplify my configuration.

                  -Joshua

                  Comment


                  • #10
                    Joshua

                    The default one is actually POST, so i added header-enricher in my config and everything still works as expected:
                    Code:
                    <si:gateway id="enhancedTimelineGateway" 
                                    service-interface="integration.EnhancedTimelineGateway"
                                    default-request-channel="enrichereChannel"/>
                                    
                    <si:header-enricher input-channel="enrichereChannel" output-channel="enhancedTimelineRequest">
                         <si:header name="springintegration_http_requestMethod" value="GET"/>
                    </si:header-enricher>
                    
                    <si:publish-subscribe-channel id="enhancedTimelineRequest" apply-sequence="true"/>
                        
                    <si-http:outbound-gateway
                    		request-channel="enhancedTimelineRequest"
                    		default-url="http://localhost:8080/test"
                    		charset="UTF-8"
                    		request-timeout="1234"	
                    		reply-channel="aggregatorInputChannel"/>
                    
                    <si-http:outbound-gateway
                    		request-channel="enhancedTimelineRequest"
                    		default-url="http://localhost:8080/test"
                    		charset="UTF-8"
                    		request-timeout="1234"	
                    		reply-channel="aggregatorInputChannel"/>
                    	
                    <si:aggregator input-channel="aggregatorInputChannel"  method="prepareEnhancedTimeline">
                            <bean class="integration.MessageAggregator"/>
                    </si:aggregator>
                    Can you try to use this config as basis for yours and see where it will take you?

                    Comment


                    • #11
                      Two different gateways and messages

                      Oleg,

                      As you know from my example, I need two call two different services that each have different request/response messages. This is what makes this example more complicated. Before each gateway, I have a transformer that transforms the original message into a valid request message for each of the two services.

                      The reply messages (which are different), are sent to the same channel where they are unmarshalled into object based messages and then sent to the aggregator.

                      To test things out, I suppose I could use duplicate gateways and request/response messages.

                      Is there a chance that the headers are being lost in any of the extra steps that I mentioned (transformers, unmarshaller)?

                      -Joshua

                      Comment


                      • #12
                        Possible, let me look at it in the morning. I also try to use your transformers as a model

                        Comment


                        • #13
                          User Error

                          Oleg,

                          I apologize, this appears to be user error. If you look at the gateway class, you will see the following annotation:

                          @Gateway(requestChannel="enhancedTimelineRequest", replyChannel="enhancedTimelineResponse")

                          But the request channel is actually specified as:

                          <siublish-subscribe-channel id="enhancedTimelineRequests" task-executor="taskExecutor" apply-sequence="true"/>

                          I did not receive any error indicating that the "enhancedTimelineRequest" channel didn't exist. I would like to better understand this behavior. When to channels need to be explicitly defined, and when do they not?

                          -Joshua

                          Comment


                          • #14
                            OK, i am glad you've figured it out, because i've just added the transformer and everything still works fine.
                            Here is the config I am using:
                            Code:
                            <si:gateway id="enhancedTimelineGateway" 
                                            service-interface="integration.EnhancedTimelineGateway"/>
                                            
                            <si:header-enricher input-channel="enricherChannel" output-channel="enhancedTimelineRequest">
                                	<si:header name="springintegration_http_requestMethod" value="GET"/>
                            </si:header-enricher>
                            
                            <si:publish-subscribe-channel id="enhancedTimelineRequest" apply-sequence="true"/>
                                
                            <si:transformer input-channel="enhancedTimelineRequest" output-channel="httpGatewayA-channel">
                                	<bean class="integration.TwitterMessageTransformer"/>
                            </si:transformer>
                            <si-http:outbound-gateway
                            		request-channel="httpGatewayA-channel"
                            		default-url="http://localhost:8080/test"
                            		charset="UTF-8"
                            		request-timeout="1234"	
                            		reply-channel="aggregatorInputChannel"/>
                            		
                            <si:transformer input-channel="enhancedTimelineRequest" output-channel="httpGatewayB-channel">
                                	<bean class="integration.WeatherUndergroundMessageTransformer"/>
                            </si:transformer>	
                            <si-http:outbound-gateway
                            		request-channel="httpGatewayB-channel"
                            		default-url="http://localhost:8080/test"
                            		charset="UTF-8"
                            		request-timeout="1234"	
                            		reply-channel="aggregatorInputChannel"/>
                            	
                            <si:aggregator input-channel="aggregatorInputChannel"  method="prepareEnhancedTimeline">
                                    <bean class="integration.MessageAggregator"/>
                            </si:aggregator>
                            As far as the second question, you don't have to define channels explicitly if:
                            1) there is an element that contains attribute input-channel
                            2) you are OK with it being Direct channel
                            For example, this configuration is sufficient:
                            Code:
                            <service-activator input-channel="foo".../>
                            foo channel will be auto-created as Direct channel.
                            However if you want it to be any other channel (pub-sub, queue etc.), then you would have to define it explicitly.

                            Comment


                            • #15
                              ConversionNotSupportedException

                              My next headache...

                              If I take out my unmarshaller and have two different message types returned to my aggregator, everything works fine. My message signature looks like this:

                              public EnhancedTimeline prepareEnhancedTimeline(List<?> items)

                              I am using the wildcard because I expect two different types of messages to be flowing into the aggregator.

                              When I put my unmarshaller (which uses jaxb to unmarshall to two different objects) in between, I receive the following exception:

                              Exception in thread "main" org.springframework.beans.ConversionNotSupportedEx ception: Failed to convert value of type 'org.springframework.integration.transformer.Messa geTransformationException' to required type 'integration.EnhancedTimeline'; nested exception is java.lang.IllegalStateException: Cannot convert value of type [org.springframework.integration.transformer.Messag eTransformationException] to required type [integration.EnhancedTimeline]: no matching editors or conversion strategy found
                              at org.springframework.beans.SimpleTypeConverter.conv ertIfNecessary(SimpleTypeConverter.java:53)
                              at org.springframework.beans.SimpleTypeConverter.conv ertIfNecessary(SimpleTypeConverter.java:41)
                              at org.springframework.integration.gateway.GatewayPro xyFactoryBean.invokeGatewayMethod(GatewayProxyFact oryBean.java:210)
                              at org.springframework.integration.gateway.GatewayPro xyFactoryBean.invoke(GatewayProxyFactoryBean.java: 172)
                              at org.springframework.aop.framework.ReflectiveMethod Invocation.proceed(ReflectiveMethodInvocation.java :172)
                              at org.springframework.aop.framework.JdkDynamicAopPro xy.invoke(JdkDynamicAopProxy.java:202)
                              at $Proxy1.getEnhancedTimeline(Unknown Source)
                              at integration.EnhancedTimelineServiceImpl.getEnhance dTimeline(EnhancedTimelineServiceImpl.java:18)
                              at integration.AggregationDemo.performDemo(Aggregatio nDemo.java:32)
                              at integration.AggregationDemo.main(AggregationDemo.j ava:23)
                              Caused by: java.lang.IllegalStateException: Cannot convert value of type [org.springframework.integration.transformer.Messag eTransformationException] to required type [integration.EnhancedTimeline]: no matching editors or conversion strategy found
                              at org.springframework.beans.TypeConverterDelegate.co nvertIfNecessary(TypeConverterDelegate.java:291)
                              at org.springframework.beans.TypeConverterDelegate.co nvertIfNecessary(TypeConverterDelegate.java:105)
                              at org.springframework.beans.SimpleTypeConverter.conv ertIfNecessary(SimpleTypeConverter.java:47)
                              ... 9 more


                              I have tested the unmarshaller earlier and it works fine. The weird thing is that the only class that is interested in creating/returning a EnhancedTimeline class is the aggregator (which is currently just returning an empty EnhancedTimeline object) and it does not complain when the unmarshaller is taken out of the mix.

                              Can someone help me understand what might be happening here?

                              Comment

                              Working...
                              X