Announcement Announcement Module
Collapse
No announcement yet.
Spring Integration Help - newbie Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Spring Integration Help - newbie

    Hello,
    I am trying to use spring and spring integration in a new project that i am working on, and i was wondering how do i accomplish this with spring integration. Below is the scenario i have.

    I have the spring integration app set up on the back end with a generic servlet. I am NOT using spring-mvc, as our project does not need it, thus i have configured the war using spring and a generic servlet. I use spring integration because i read that it has very poweful capabilites. To tap into spring integration from the code which the servlet calls, i use the spring integration gateway. What the gateway then needs to do is call three different webservices and aggregate the response, and store it as one result set.

    To achieve the above i have the gateway call a pub sub channel which spawns two threads, one to get make a call to 1 web service, the other two do some transformation and make the remaining calls to two other web services. The two other calls are made via a pub sub channel as well which spawns two different threads and both these threads call a web service. what i am trying to do based on my little knowledge on spring integration after the above 3 major web service calls via different threads at different points, i have all the web service response into the same aggregator at the end, as this is the place where i need to combine the three different responses to be able to use it. I could have done the above processes synchronously, i.e make 1 web service calls after the other but then that would be slow, and that's why i use this approach which i believe will help me a lot in terms of efficiency.

    The problem i face in the above is if i have the apply sequence property set to true on the pub sub channels, the aggregation of all the three messages do not happen as intended.. i get a batch of 2 and 1, thus i thought, i'd set up a custom release strategy, and to do this i make the apply sequence property to false, and now i am able to release the aggregation after the three responses come back... and the release strategy i have set up is a basic one, which says if payload.size() == 3, then release, which works fine for one request to the gateway and then one response from the aggregation. But now i do not how would i scale this to multiple requests to the gateway from different threads as in the ideal world all these requests would be made from different browsers, which will thus be multithreaded, and i am afraid when this situation happens, how would it recognize which of the 3 payloads it should consider to be able to release it?. to add to a note is, when i tried two requests to this process one after the other, then the first request worked fine as expected, but then the second request got lost somewhere and the release strategy was never called. so the second request was not served.

    what i need to know is how should i use spring integration to accomplish my needs, i am sure there must be a fairly simple way to do this, but i am not just able to go about it. Any suggestions would be greatly appreciated.

    I am not sure if the above makes any sense, but if you have any questions, please let me know, and i'll try my best to explain myself.

  • #2
    apply-sequence on the pub-sub channel does 2 things - it sets the correlationId header and sets a sequence number header.

    The correlation id is the mechanism by which the aggregator knows to which group a message belongs.

    So, you either need to set the correlation id, or simply use apply-sequence and just ignore the sequence number in your custom release strategy.

    Comment


    • #3
      how do i ignore the sequence number in the custom release strategy. Below is my aggregator def.
      <int:aggregator id="profileAggregator"
      input-channel="customerProfileResponseChannel" output-channel="customerProfileResponse"
      ref="customerProfileResponseMapper" method="getCustomerProfileFromCustomerProfileRetri eveResponse"
      correlation-strategy="customerProfileResponseMapper" correlation-strategy-method="applyCorrelationStrategy"
      release-strategy="customerProfileResponseMapper" release-strategy-method="isAggregationComplete" />
      and below are the custom methods for release strategy and correlation strategy:

      @Override
      public boolean isAggregationComplete(List<?> payload) {
      log.info("isAggregationComplete()" + payload.size());
      return (payload.size() == 3) ? true : false;
      }

      @Override
      public boolean applyCorrelationStrategy(Object payload) {
      log.info("applyCorrelationStrategy()" + payload);
      return true;
      }

      while using the above i have the apply-sequence set to true on the pub-sub channel.

      what happens here is that the aggregation method is called only twice, i am expecting it to be called thrice so that it can aggregate the 3 different process.

      above u mentioned to ignore the sequence number(how do i ignore it where do i get the sequence no.) the input parameters that come into this method are a list of payload on the release strategy and an Object payload on the correlation strategy.

      please keep in mind that the end goal is to have this run without fail when multiple threads are trying to come get the data from this entire process mentioned above.

      Comment


      • #4
        Attachment

        please see the attached jpg for what i am trying to explain.

        if apply sequence is set to true or not, the release strategy method is only called when the server initially starts up, after that once it released or not, for any further requests the release strategy is never invoked...
        Attached Files

        Comment


        • #5
          I suggest you run with debug logging and follow the messages through.

          You are already ignoring the sequence number by using a POJO for your release strategy.

          Comment


          • #6
            I have changed my aggregator to the following so that it implements the default correlation and release strategy
            <int:aggregator id="profileAggregator"
            input-channel="customerProfileResponseChannel" output-channel="customerProfileResponse"
            ref="customerProfileResponseMapper" method="getCustomerProfileFromCustomerProfileRetri eveResponse"
            />
            After enabling debugging i see the following:

            DEBUG: org.springframework.integration.aggregator.Abstrac tCorrelatingMessageHandler - Handling message with correlationKey [73868882-b9e1-400a-a40a-e717f1d91df2]: [Payload=com.aa.ct.custhub.loyaltymemberinformation v1.MemberInformationRetrieveResponse@1b7b061][Headers={timestamp=1343687713274, id=97d0cabe-a06b-49d0-b8c1-ff3ea26b495e, errorChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@d8e140, correlationId=73868882-b9e1-400a-a40a-e717f1d91df2, replyChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@d8e140, sequenceSize=2, sequenceNumber=2}]

            DEBUG: org.springframework.integration.aggregator.Abstrac tCorrelatingMessageHandler - Handling message with correlationKey [73868882-b9e1-400a-a40a-e717f1d91df2]: [Payload=com.aa.ct.custhub.managecustomerprofilev3. CustomerProfileRetrieveResponse@3a0ddf][Headers={timestamp=1343687713646, id=fc25b5df-f908-4e85-8a1b-b52142a6886e, errorChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@d8e140, replyChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@d8e140, correlationId=73868882-b9e1-400a-a40a-e717f1d91df2, sequenceSize=2, sequenceNumber=1}]

            DEBUG: org.springframework.integration.aggregator.Abstrac tCorrelatingMessageHandler - Completing group with correlationKey [73868882-b9e1-400a-a40a-e717f1d91df2]

            DEBUG: org.springframework.integration.aggregator.Abstrac tCorrelatingMessageHandler - Handling message with correlationKey [73868882-b9e1-400a-a40a-e717f1d91df2]: [Payload=com.aa.ct.custhub.customerindexv2.Retrieve CustomerIndexResponse@1cae63b][Headers={timestamp=1343687714107, id=cc349d26-520d-43e2-ad35-bd26eac92383, errorChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@d8e140, replyChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@d8e140, correlationId=73868882-b9e1-400a-a40a-e717f1d91df2, sequenceSize=2, sequenceNumber=1}]

            All the three messages have the same correlation id=73868882-b9e1-400a-a40a-e717f1d91df2. But why is it grouping after the first 2 messages and not all the three since the correlation ids are the same.

            Please see the diagram for how the three messages are expected to come to the aggregator for an end result. Based on the documentation by default if the correlation id is the same it should group all the three messages together, but it is not doing in our case. Please help.. i have been struggling with this for a long time now and it does not work. Where am i going wrong.? or what would be a correct way to implement to achieve the expected result.

            Comment


            • #7
              the timestamps above in the messages are as follows:
              1343687713274=Mon Jul 30 18:35:13 EDT 2012
              1343687713646=Mon Jul 30 18:35:13 EDT 2012
              1343687714107=Mon Jul 30 18:35:14 EDT 2012

              in case it helps. Please remember that the above three messages are coming from different channels spawned by different threads as per the diagram.

              Comment


              • #8
                Look at sequenceSize=2, sequenceNumber=2. You are saying that your sequence size is 2 and message with sequence number 1 and 2 came in. That satisfies the default correlation strategy.
                Correlation id is what tells the aggregator how to aggregate (which messages are related), but not when to release. That is the release strategy and default one is based on matching Sequence size with sequence number of Messages.

                Comment


                • #9
                  So how would i achieve what i am trying to do. Please see the diagram for what i am doing. The messages that are coming in the aggregator based on what is defined in the configuration xmls are 3. one from an earlier and thread and 2 from a latter thread, but those messages are related and need to be aggregated in the end to be processed simultaneously. I have tried implementing a release strategy if you read from my earlier post, but that fails, it only get's called the first time during server start up, and on subsequent requests it does not get called. If i were to implement a custom correlation strategy, how would i do it, as when i ran the app in debug mode, all the messages came into the correlation method separately:

                  @Override
                  public boolean applyCorrelationStrategy(Object payload) {
                  log.info("applyCorrelationStrategy()" + payload);
                  return true;
                  }


                  and the message that came in were the objects that i am expecting and not the header information. so how do i accomplish what i am trying to do here.?

                  Comment


                  • #10
                    does some one have further insight into this..? Please let me know if i am not being clear in what i am trying to achieve.? Thanks.!

                    Comment


                    • #11
                      anyone has an idea, or the above can't be done with spring integration?

                      Comment

                      Working...
                      X