Announcement Announcement Module
Collapse
No announcement yet.
Handling connectivity issues in web service outbound gateway (retry) Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Handling connectivity issues in web service outbound gateway (retry)

    Currently, we have a requirement to attempt a WS request, and in the case that the remote endpoint is down, queue the request for retry. This queue should also be backed by a database, so that if the application goes down, the requests will be retried once it is restarted.

    I am hoping that I can achieve this purely with Spring Integration. My thought was that I could use a WS Outbound Gateway, with a message-store-backed queue channel set as the error channel. The error channel would be polled, and the message would be sent to a retry gateway (via Service Activator) which would also hook up to the WS Outbound Gateway, and basically form a processing loop.

    The idea of the Service Activator and retry Gateway was taken from one of Oleg's SpringOne 2GX examples. The polling is happening properly, but I can't figure out how to "complete the loop" and actually get the retry to happen. Oddly, the poller seems to stop after the retry gateway is invoked.

    Here is what I have currently:

    HTML Code:
            <!-- Will replace this with a JDBC store later -->
            <bean id="testMessageStore" class="org.springframework.integration.store.SimpleMessageStore" />
    
            <int:channel id="wsRequestChannel" />
            <int:channel id="wsReplyChannel" />
            <int:channel id="wsErrorChannel">
                <int:queue message-store="testMessageStore" />
            </int:channel>
    
            <int:channel id="testChannel" />
            <int:chain input-channel="wsErrorChannel" auto-startup="true">
                <int:poller fixed-rate="5000">
                    <int:transactional transaction-manager="transactionManager" />
                </int:poller>
                <int:transformer expression="payload.getFailedMessage().getPayload() + ' '" />
                <!-- Everything seems to work fine up to here, but how do I get it to retry the WS? -->
                <int:service-activator ref="retryGateway" />
            </int:chain>
    
            <int:chain input-channel="wsRequestChannel" output-channel="wsReplyChannel">
                <int-ws:header-enricher>
                    <int-ws:soap-action value="theTargetSoapAction" />
                </int-ws:header-enricher>
                <int-ws:outbound-gateway id="testWsGateway" uri="https://someSOAPService" interceptor="wsSecurityInterceptor" />
            </int:chain>
    
            <int:gateway id="mainGateway" default-request-channel="wsRequestChannel" default-reply-channel="wsReplyChannel" error-channel="wsErrorChannel" service-interface="com.mypackage.IntegrationTestService">
                <int:method name="sendMessage" />
            </int:gateway>
            <int:gateway id="retryGateway" default-request-channel="wsRequestChannel" default-reply-channel="wsReplyChannel" error-channel="wsErrorChannel" />

    Can anyone help with this, or offer up suggestions for alternative approaches?
    Thanks!
    Last edited by ach; Jul 18th, 2012, 09:03 AM.

  • #2
    Hello

    If you're going to use persistent MessageStore for <queue>, you just need to configure <transactional> <poller> on <ws:outbound-gateway> and after Exception on service the transaction will be rollbacked and your failed Message won't be removed from MessageStore. And on the next poll the last one will be sended to service again. So, to make simple retry it's enough to configure message-flow like this:
    HTML Code:
     <int:channel id="wsRequestChannel">
       <int:queue message-store="messageStore" />
    </int:channel>
    
    <int:chain input-channel="wsRequestChannel" output-channel="wsReplyChannel">
       <int:poller fixed-rate="5000">
           <int:transactional/>
        </int:poller>
        <int-ws:header-enricher>
            <int-ws:soap-action value="theTargetSoapAction" />
        </int-ws:header-enricher>
        <int-ws:outbound-gateway id="testWsGateway" uri="https://someSOAPService" interceptor="wsSecurityInterceptor" />
    </int:chain>
    However be careful about <gateway> as message-flow entry point: Message Store. "Important" paragraph.

    Take care,
    Artem Bilan

    Comment


    • #3
      Thanks for the reply. The reason why my approach may seem a bit convoluted, is that I would like for the first attempt at the web service call to be synchronous -- only when it fails should it be retried asynchronously. In this scenario, the poller rate would be something much slower: 5 mins, for example.

      Comment


      • #4
        OK, let it be
        But <gateway> is still bad idea for <queue> with persistent MessageStore.
        Nevertheless, how do you plan to process WS-response in the asynchronous retry sub-flow?

        By the way, what you're looking for looks like this: http://forum.springsource.org/showthread.php?111867

        Give me, please, more time to figure out how it should be in your case...

        Comment


        • #5
          Originally posted by Cleric View Post
          OK, let it be
          But <gateway> is still bad idea for <queue> with persistent MessageStore.
          The use of a Gateway was mainly just because it's an easy way to start the message flow for my small proof-of-concept. I don't necessarily have to use it.

          Originally posted by Cleric View Post
          Nevertheless, how do you plan to process WS-response in the asynchronous retry sub-flow?
          Probably just via some service method (is this possible with a Service Activator?).

          Originally posted by Cleric View Post
          By the way, what you're looking for looks like this: http://forum.springsource.org/showthread.php?111867
          Thanks, I will take a look at that.
          Last edited by ach; Jul 18th, 2012, 10:31 AM.

          Comment


          • #6
            Hi, again

            So, I've builded some approximate test for your use case:
            HTML Code:
            <int-jdbc:message-store id="messageStore" data-source="dataSource"/>
            
            	<channel id="input"/>
            
            	<channel id="output">
            		<queue/>
            	</channel>
            
            	<channel id="retryChannel">
            		<!--<queue/>-->
            		<queue message-store="messageStore"/>
            	</channel>
            
            	<gateway service-interface="org.springframework.integration.jdbc.RetryViaMessageStoreTests$TestGateway"
            			 error-channel="retryChannel">
            		<method name="send" request-channel="input"/>
            		<method name="receive" reply-channel="output"/>
            	</gateway>
            
            	<service-activator input-channel="input" output-channel="output" ref="service" method="serviceIt"/>
            
            	<beans:bean id="service" class="org.springframework.integration.jdbc.RetryViaMessageStoreTests$TestService"/>
            
            	<chain input-channel="retryChannel">
            		<poller fixed-rate="100">
            			<transactional/>
            		</poller>
            		<transformer expression="payload.failedMessage.payload"/>
            		<gateway request-channel="input"/>
            	</chain>
            Code:
            @ContextConfiguration
            @RunWith(SpringJUnit4ClassRunner.class)
            public class RetryViaMessageStoreTests {
            
            	@Autowired
            	private TestGateway gateway;
            
            	@Test
            	public void testIt() throws InterruptedException {
            		gateway.send("test");
            		assertEquals("TEST", gateway.receive());
            	}
            
            
            	private static interface TestGateway {
            
            		void send(String s);
            
            		String receive();
            	}
            
            	private static class TestService {
            
            		private AtomicInteger counter = new AtomicInteger();
            
            		public String serviceIt(String s) {
            			System.out.println(counter);
            			if (10 == counter.getAndIncrement()) {
            				return s.toUpperCase();
            			}
            			throw new RuntimeException("intentional");
            
            		}
            
            	}
            
            }
            Here you can replace my <service-activator> with your WS-flow. If you change the 'retryChannel' <queue> to SimpleMessageStore you're going to lose message after first retry effort and your upstream invocation will hang on 'gateway.receive()'.
            My test is passed after 10 retries.
            Nevertheless you should try to use real robust solution with Spring-retry as it will be done after solving the issue: https://jira.springsource.org/browse/INT-343

            Good luck

            Comment


            • #7
              Thank you again for your help!

              I tried adapting your example, but for some reason the retry is only happening once... here is my configuration:

              HTML Code:
                      <int-jdbc:message-store id="testMessageStore" data-source="commonDataSource" region="TestRegion" />
              
                      <int:channel id="wsRequestChannel" />
                      <int:channel id="wsReplyChannel">
                          <int:queue/>
                      </int:channel>
                      <int:channel id="wsRetryChannel">
                          <int:queue message-store="testMessageStore" />
                      </int:channel>
              
                      <int:gateway error-channel="wsRetryChannel" service-interface="com.myorg.myapp.ws.IntegrationTestService">
                          <int:method name="sendMessage" request-channel="wsRequestChannel" />
                          <int:method name="receiveMessage" reply-channel="wsReplyChannel" />
                      </int:gateway>
              
                      <int:chain input-channel="wsRetryChannel">
                          <int:poller fixed-rate="5000">
                              <int:transactional />
                          </int:poller>
                          <int:transformer expression="payload.failedMessage.payload" />
                          <int:gateway request-channel="wsRequestChannel" />
                      </int:chain>
              
                      <int:chain input-channel="wsRequestChannel" output-channel="wsReplyChannel">
                          <int-ws:header-enricher>
                              <int-ws:soap-action value="mySoapAction" />
                          </int-ws:header-enricher>
                          <int-ws:outbound-gateway id="testWsGateway" uri="https://myServiceUrl" interceptor="wsSecurityInterceptor" />
                      </int:chain>
              And here are the last log statements that I see after the first retry:

              Code:
              Caused by: org.springframework.ws.client.WebServiceTransportException: Not Found [404]
              	at org.springframework.ws.client.core.WebServiceTemplate.handleError(WebServiceTemplate.java:627)
              	at org.springframework.ws.client.core.WebServiceTemplate.doSendAndReceive(WebServiceTemplate.java:551)
              	at org.springframework.ws.client.core.WebServiceTemplate.sendAndReceive(WebServiceTemplate.java:502)
              	at org.springframework.integration.ws.SimpleWebServiceOutboundGateway.doHandle(SimpleWebServiceOutboundGateway.java:88)
              	at org.springframework.integration.ws.AbstractWebServiceOutboundGateway.handleRequestMessage(AbstractWebServiceOutboundGateway.java:176)
              	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:97)
              	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
              	... 83 more
              2012-07-18 13:25:00 QueueChannel [DEBUG] preSend on channel 'wsErrorChannel', message: [Payload=org.springframework.integration.MessageHandlingException: error occurred in message handler [org.springframework.integration.ws.SimpleWebServiceOutboundGateway#882dfc]][Headers={timestamp=1342632300160, id=2f7ed54b-d912-400c-8f61-3a90241272ef, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2b9562b6, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2b9562b6}]
              2012-07-18 13:25:00 QueueChannel [DEBUG] postSend (sent=true) on channel 'wsErrorChannel', message: [Payload=org.springframework.integration.MessageHandlingException: error occurred in message handler [org.springframework.integration.ws.SimpleWebServiceOutboundGateway#882dfc]][Headers={timestamp=1342632300160, id=2f7ed54b-d912-400c-8f61-3a90241272ef, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2b9562b6, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2b9562b6}]
              The above is the same behavior I was seeing with the configuration that I posted initially.

              Comment


              • #8
                Please disregard my last post. There was an issue with my IDE not picking up the changes to the config file -- it was not picking up my change to the MessageStore. This is exactly what I was looking for, thanks!

                Comment


                • #9
                  I'm interested in somehow packaging the above up in some kind of a reusable bundle, with certain parameters like dataSource/region (for the message store), soapAction/wsEndpointUri (for the WS gateway), etc. Initially I thought that I could implement my own Gateway that creates the various components programmatically, but it doesn't seem like the SI API is well suited for that (and the documentation doesn't cover it very well). Do you have any suggestion on the best way for me to achieve this?

                  Comment


                  • #10
                    Hello

                    At a glance it seems like you need this one: https://jira.springsource.org/browse/INT-1685
                    But you say this one:
                    with certain parameters like dataSource/region (for the message store), soapAction/wsEndpointUri (for the WS gateway), etc
                    So, try to investigate this sample: https://github.com/SpringSource/spri...ed/dynamic-ftp

                    Take care,
                    Artem

                    Comment


                    • #11
                      Originally posted by Cleric View Post
                      Hello
                      At a glance it seems like you need this one: https://jira.springsource.org/browse/INT-1685
                      Yes, this would seem to fit my requirements. Unfortunately, this is needed for current development so I can probably not wait for 2.2 to be released.

                      Originally posted by Cleric View Post
                      So, try to investigate this sample: https://github.com/SpringSource/spri...ed/dynamic-ftp
                      Spawning multiple application contexts seems a bit... heavyweight for our purposes. We've also run into production issues related to dynamically instantiating app contexts in the past, so that's probably a no-go.

                      Just to reiterate: Basically, the idea is that we will be using this pattern in multiple places in the application (and potentially in other apps as well):
                      1. Make a WS call
                      2. If the endpoint is down, queue the message for retry (persistent queue)
                      3. Poll the queue and retry the WS call

                      If I wanted to implement the above for multiple endpoints in the same application, I would need to copy/paste a giant chunk of XML configuration for each one. So, I want to encapsulate that pattern for re-use.

                      I have tried to programmatically create the SI objects and wire them together but it is proving to be difficult.

                      Comment


                      • #12
                        I apologize for bumping this again... is it feasible to programmatically create these components and wire them together, for the sake of reuse? Is there a resource (other than the API docs) that might be able to help me with this?

                        Comment

                        Working...
                        X