
  • Synchronous Messaging

    Hi all:

    I just started testing SI and have never worked with any other Spring modules before. First, my congratulations to the developers. I'm normally very suspicious of everything that has "framework" in its name (they tend to be bloated and over-complex, and tend to hide things that shouldn't be hidden). So far I haven't found any of these pitfalls in SI.

    So far (one day and a half!) my tests have gone very well, and I'm about to recommend the use of SI over other similar "frameworks".

    The only thing I haven't figured out in my tests so far is how to do "synchronous messages", or at least how to force a process to wait for a response to a previously sent message.

    Basically, what I have now is an "adapter" (HTTP for now, i.e. a servlet, to be extended to other adapter protocols) that invokes a "resource" that dispatches messages to "services", which finally put some "responses" back in a queue.

    All this is working ok asynchronously, but now I wanted a way for my servlet to wait for the "service" response.

    What's the best way to do this?

    Thanks for your help.

  • #2
    The feature you are describing is currently in progress: http://jira.springframework.org/browse/INT-86

    This will be included in the Milestone 2 release (next week). However, the first cut should be available in SVN later today if you would like to try it out.

    -Mark

    • #3
      Hi. Thanks for that. I will check it.

      If you think I can be of assistance, either testing or developing it, please let me know.

      Cheers.

      • #4
        Actually, if I do just

        responseChannel.doReceive(-1).getPayload()

        wouldn't I get a synchronous response?

        • #5
          That's the general idea

          My plan is to also implement the MessageHandler interface. So, it would be something like this:

          MessageHandler handler = new SynchronousHandler(channelToSendTo);
          Message reply = handler.handle(messageToSend);

          (internally, that would set the replyChannel and then wait for a response)
          Also, the timeouts could be set...
          handler.setSendTimeout(..)
          handler.setReceiveTimeout(..)

          How does that look?

          -Mark
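
A minimal sketch of the "set a replyChannel, then wait" idea, assuming plain java.util.concurrent queues in place of SI channels. `Request` and `SyncRequester` are hypothetical names used only for illustration; they are not the SynchronousHandler API discussed above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical request envelope: a payload plus the private queue the reply should go to. */
final class Request {
    final String payload;
    final BlockingQueue<String> replyQueue;

    Request(String payload, BlockingQueue<String> replyQueue) {
        this.payload = payload;
        this.replyQueue = replyQueue;
    }
}

/** Hypothetical synchronous caller: sends a request, then blocks until a reply or a timeout. */
final class SyncRequester {
    private final BlockingQueue<Request> requestQueue;

    SyncRequester(BlockingQueue<Request> requestQueue) {
        this.requestQueue = requestQueue;
    }

    /** Returns the reply, or null if either the send or the receive timed out. */
    String request(String payload, long sendTimeoutMs, long receiveTimeoutMs)
            throws InterruptedException {
        // Temporary reply queue, visible only to this caller and to whoever handles the request.
        BlockingQueue<String> replyQueue = new ArrayBlockingQueue<>(1);
        Request request = new Request(payload, replyQueue);
        if (!requestQueue.offer(request, sendTimeoutMs, TimeUnit.MILLISECONDS)) {
            return null; // request queue stayed full for the whole send timeout
        }
        return replyQueue.poll(receiveTimeoutMs, TimeUnit.MILLISECONDS);
    }
}

final class SyncRequesterDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Request> requests = new LinkedBlockingQueue<>(10);
        // Stand-in for a service: takes one request and answers on its private reply queue.
        Thread service = new Thread(() -> {
            try {
                Request r = requests.take();
                r.replyQueue.offer(r.payload.toUpperCase());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        service.setDaemon(true);
        service.start();
        System.out.println(new SyncRequester(requests).request("ping", 1000, 1000));
    }
}
```

Because each caller owns its reply queue, no correlation is needed here; the cost is one short-lived queue per request.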

          • #6
            But that will take care of the "temporary replyChannel" case only, right? How about a "shared channel with correlationId"? That cannot be handled with a BlockingQueue, meaning you would have to make major changes in SimpleChannel or write another type of channel.

            And that second approach is very important, so that response messages are not left waiting for other tasks to finish.

            I'm going to do some more testing.

            Cheers.

            • #7
              Even a shared channel can still be based on a BlockingQueue internally; the synchronous handler would simply receive in a loop. When a message arrives, it will check the correlationId and return the message to the correct caller (still likely using individual SynchronousQueues to block each caller). The main advantage I see with the shared channel approach is the ability to use interceptors, set a capacity, etc. Are there any other ideas or use-cases that you can foresee here?

              Thanks,
              -Mark
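
A rough sketch of the receive-in-a-loop idea, assuming plain java.util.concurrent types rather than SI channels. `Reply` and `ReplyDispatcher` are hypothetical names. One deliberate swap: each caller gets a one-slot ArrayBlockingQueue instead of the SynchronousQueue mentioned above, so the loop never blocks on a caller that has not started waiting yet.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical reply message carrying the id of the request it answers. */
final class Reply {
    final Object correlationId;
    final String payload;

    Reply(Object correlationId, String payload) {
        this.correlationId = correlationId;
        this.payload = payload;
    }
}

/** Reads one shared reply queue in a loop and routes each reply to the caller waiting on its id. */
final class ReplyDispatcher implements Runnable {
    private final BlockingQueue<Reply> sharedReplies;
    private final Map<Object, BlockingQueue<Reply>> waiters = new ConcurrentHashMap<>();

    ReplyDispatcher(BlockingQueue<Reply> sharedReplies) {
        this.sharedReplies = sharedReplies;
    }

    /** Call before sending the request, so a fast reply cannot arrive unobserved. */
    void register(Object correlationId) {
        waiters.put(correlationId, new ArrayBlockingQueue<>(1));
    }

    /** Blocks until the reply for this id arrives; returns null on timeout. */
    Reply await(Object correlationId, long timeoutMs) throws InterruptedException {
        BlockingQueue<Reply> mailbox = waiters.get(correlationId);
        if (mailbox == null) {
            throw new IllegalStateException("not registered: " + correlationId);
        }
        try {
            return mailbox.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            waiters.remove(correlationId);
        }
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Reply reply = sharedReplies.take();
                BlockingQueue<Reply> mailbox = waiters.get(reply.correlationId);
                if (mailbox != null) {
                    mailbox.offer(reply); // non-blocking; replies with unknown ids are dropped
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

final class ReplyDispatcherDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Reply> shared = new LinkedBlockingQueue<>();
        ReplyDispatcher dispatcher = new ReplyDispatcher(shared);
        Thread loop = new Thread(dispatcher);
        loop.setDaemon(true);
        loop.start();

        dispatcher.register("req-1");
        shared.put(new Reply("req-1", "done")); // stands in for the service's reply
        Reply reply = dispatcher.await("req-1", 1000);
        System.out.println(reply == null ? "timed out" : reply.payload);
    }
}
```

Since the shared queue is still an ordinary BlockingQueue, capacity limits and interception could be layered on it without touching the callers.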

              • #8
                Well, for our situation we're going to need at least some way of doing prioritization of messages, auditing and persistence.

                Persistence will only be needed in a future installment (the first will be intra-VM only and will need no persistence), and it will probably rely on the JMS and transactional support that I see you are already working on.

                Auditing will probably be doable with interceptors/filters.

                Prioritizing messages, however, and as I see it now, will probably need some sort of unsynchronized, non-blocking queue, like a PriorityQueue.

                Please note that I'm not an expert on these issues, so I may well be wrong about this.

                Once you have something on SVN please let me know, I can probably develop something on it to test my use cases.

                Cheers.

                • #9
                  There is a PriorityBlockingQueue. The main difference (other than prioritization) is that it is unbounded, whereas the current "SimpleChannel" allows configuration of a capacity. A priority-based channel is definitely going to be a necessary addition, but I think the queue will need to be wrapped and combined with a semaphore so that capacity can be limited. We don't want the limit to be enforced by OutOfMemoryErrors.

                  -Mark
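
One way the "wrap the queue and combine it with a semaphore" idea could look, as a sketch only. `BoundedPriorityQueue` is a hypothetical name, not the SI priority channel; it relies on the elements' natural ordering, so the smallest element is delivered first.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical wrapper: priority ordering from the queue, capacity limit from the semaphore. */
final class BoundedPriorityQueue<E extends Comparable<E>> {
    private final PriorityBlockingQueue<E> queue = new PriorityBlockingQueue<>();
    private final Semaphore permits; // one permit per free slot

    BoundedPriorityQueue(int capacity) {
        this.permits = new Semaphore(capacity);
    }

    /** Waits up to timeoutMs for a free slot; returns false if none became available. */
    boolean offer(E element, long timeoutMs) throws InterruptedException {
        if (element == null) {
            throw new NullPointerException("element");
        }
        if (!permits.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
            return false;
        }
        queue.offer(element); // always succeeds: the underlying queue is unbounded
        return true;
    }

    /** Waits up to timeoutMs for an element; returns null on timeout. */
    E poll(long timeoutMs) throws InterruptedException {
        E element = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (element != null) {
            permits.release(); // the slot is free again
        }
        return element;
    }
}

final class BoundedPriorityQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BoundedPriorityQueue<Integer> queue = new BoundedPriorityQueue<>(2);
        queue.offer(5, 100);
        queue.offer(1, 100);
        System.out.println(queue.offer(3, 100)); // queue is full at this point
        System.out.println(queue.poll(100));     // lowest value comes out first
    }
}
```

The semaphore is acquired before the element is queued and released only after one is removed, so the number of queued elements can never exceed the configured capacity.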

                  • #10
                    But a PriorityBlockingQueue is synchronized and blocking. In my first tests (without using SI yet) I wasn't able to correctly implement priority *and* synchronous messages using synchronized, blocking queues.

                    I'll try to put up a simple case scenario tomorrow and test it again.

                    Thanks for your prompt replies.

                    • #11
                      I've added an issue for this: http://jira.springframework.org/browse/INT-115

                      It might get pushed into M3, but I have managed to create a simple prototype with an enforced capacity limit and timeouts for send/receive.

                      -Mark

                      • #12
                        I can't find any new code on SVN, and I really want to try it and test it.

                        Cheers.

                        • #13
                          I just committed the initial version of RequestReplyTemplate (in the org.springframework.integration.channel package).

                          For example usage, see: org.springframework.integration.channel.RequestReplyTemplateTests (in the src/test directory). Currently there are only 2 tests, but they demonstrate both synchronous (blocking) and asynchronous (with callback) usage.

                          Here are a couple quick examples of the usage model...

                          Synchronous:
                          Code:
                          RequestReplyTemplate template = new RequestReplyTemplate(requestChannel);
                          Message<?> replyMessage = template.request(requestMessage);
                          Asynchronous:
                          Code:
                          ReplyHandler replyHandler = new ReplyHandler() {
                              public void handle(Message<?> replyMessage, MessageHeader originalMessageHeader) {
                                  // handle reply message
                              }
                          };
                          RequestReplyTemplate template = new RequestReplyTemplate(requestChannel);
                          template.request(requestMessage, replyHandler); // returns without blocking
                          The "shared channel" scenario is on the way.

                          If you get a chance to try it out, please let me know how it goes.

                          -Mark

                          • #14
                            Hi:

                            Great work, thanks for that. Until now my tests are going swell.

                            I just made some changes to your code in RequestReplyTemplate::request like this:

                            Code:
                            final MessageHeader header = requestMessage.getHeader();
                            // added: reuse the request message id as the correlation id
                            final Object correlationId = requestMessage.getId();
                            header.setCorrelationId(correlationId);
                            this.executor.submit(new Runnable() {
                                public void run() {
                                    Message<?> reply = request(requestMessage, sendTimeout, receiveTimeout);
                                    // added: copy the correlation id onto the reply
                                    reply.getHeader().setCorrelationId(correlationId);
                                    replyHandler.handle(reply, header);
                                }
                            });
                            Basically, for asynchronous calls I need to establish a correlationId so I can give the user a way to access the resulting message once it's done (for example, returning that Id in the HTTP response, providing a URI based on that Id that gives access to the result message once it's done, or sending an email containing that URI/Id once the job is done).

                            I'm just using the requestMessage Id for that, so

                            requestMessage.getId()==requestMessage.getHeader().getCorrelationId()==responseMessage.getHeader().getCorrelationId()

                            Maybe it would be better to let implementers plug in their own scheme and only fall back to this one if they don't. Well, you get the point...

                            I wonder also if the method should return some value, maybe the correlationId itself or maybe the original requestMessage with the header correlationId set.
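
To make the "return the correlationId and let the user fetch the result later" idea concrete, here is a small sketch built on the JDK's CompletableFuture rather than on RequestReplyTemplate. `AsyncResultRegistry` and its methods are hypothetical; the id generator is pluggable, with a UUID-based default standing in for the fallback scheme.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical registry: runs requests asynchronously and keeps results by correlation id. */
final class AsyncResultRegistry {
    private final Map<String, CompletableFuture<String>> results = new ConcurrentHashMap<>();
    private final Executor executor;
    private final Supplier<String> idGenerator;

    /** Pass null as idGenerator to fall back to random UUIDs. */
    AsyncResultRegistry(Executor executor, Supplier<String> idGenerator) {
        this.executor = executor;
        this.idGenerator = (idGenerator != null)
                ? idGenerator
                : () -> UUID.randomUUID().toString();
    }

    /** Starts the work without blocking and returns the id the caller can hand to the user. */
    String submit(String requestPayload, Function<String, String> service) {
        String correlationId = idGenerator.get();
        results.put(correlationId,
                CompletableFuture.supplyAsync(() -> service.apply(requestPayload), executor));
        return correlationId;
    }

    /**
     * Returns the reply if it is ready, or null if the id is unknown or the work is
     * still running. Throws CompletionException if the service call itself failed.
     */
    String lookup(String correlationId) {
        CompletableFuture<String> future = results.get(correlationId);
        return (future == null) ? null : future.getNow(null);
    }
}
```

A servlet could put the id returned by `submit` into its HTTP response and serve `lookup(id)` from a second URI; that wiring is left out here.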

                            I'll keep on testing.

                            Cheers.
                            Last edited by amsmota; Feb 21st, 2008, 07:27 AM.

                            • #15
                              Another thing: in the case of asynchronous messages, the reply should go silently to the predefined default-output-channel unless explicitly stated otherwise.

                              Now I have

                              <endpoint input-channel="requestChannel" handler-ref="serviceimpl" handler-method="execute" default-output-channel="responseChannel"/>

                              and in asynchronous mode I have to do

                              Code:
                              ReplyHandler replyHandler = new ReplyHandler() {
                                  public void handle(Message<?> replyMessage, MessageHeader originalMessageHeader) {
                                      responseChannel.send(replyMessage);
                                  }
                              };
                              template.request(requestMessage, replyHandler);
                              If I have simply

                              Code:
                              template.request(requestMessage, null);
                              the message should go to the default responseChannel.
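
The fallback being asked for could be as small as the following sketch. `ReplyRouter` is a hypothetical helper, with a plain BlockingQueue standing in for the responseChannel; it does not describe how RequestReplyTemplate actually treats a null handler.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical router: uses the caller's handler when given, else the default output queue. */
final class ReplyRouter {
    private final BlockingQueue<String> defaultOutput;

    ReplyRouter(BlockingQueue<String> defaultOutput) {
        this.defaultOutput = defaultOutput;
    }

    /** Returns false only if the reply was bound for the default output and that queue was full. */
    boolean deliver(String reply, Consumer<String> replyHandler) {
        if (replyHandler != null) {
            replyHandler.accept(reply); // explicit handler wins
            return true;
        }
        return defaultOutput.offer(reply); // silent fallback to the default output
    }
}

final class ReplyRouterDemo {
    public static void main(String[] args) {
        BlockingQueue<String> responses = new LinkedBlockingQueue<>();
        ReplyRouter router = new ReplyRouter(responses);
        router.deliver("reply-1", null);               // lands in the default queue
        router.deliver("reply-2", System.out::println); // handled by the caller instead
        System.out.println(responses);
    }
}
```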
