Announcement Announcement Module
No announcement yet.
Asynchronous Queue - handling duplicates Page Title Module
Move Remove Collapse
Conversation Detail Module
  • Filter
  • Time
  • Show
Clear All
new posts

  • Asynchronous Queue - handling duplicates

    I'm looking at using SI to process long running tasks asynchronously.

    The cafe example is similar to my needs - although I want to have multiple baristas - each handling a different type of order item.

    Also if an item is added to the queue and it matches an already queued item is it possible to have this item only processed once?

    Example: querying a store (that would take some time to respond) for a particular item repeatedly - ideally we would only perform the query for all duplicate items on the queue.

    Further to this - what would be the best practice for caching these queries so that subsequent requests within a time period return the same response?

  • #2
    For deduplication I would look at Idempotent Receiver. This is not something implemented in the framework, as it can only be handled in the Receiver (which is your job to write).

    You could create a deduping filter that stores all messages it has seen and passes only ones it hasn't seen before, but that is an inherent memory leak, or with a timeout, it isn't fool proof.


    • #3
      I also have a similar use-case and I need to prevent adding a message into a queue channel if the queue already contains a message with the same payload.

      My current solution for this is a channel interceptor that keeps track of messages sent to and received from the channel -- it just maintains an internal Set, adding payloads in preSend() and removing them in postReceive().

      I am wondering if there is a more elegant (and ideally 'out of the box') approach.
      I've looked through the SI's sources and cannot see one.

      What would help is a sort of a 'contains(Object payload)' method on QueueChannel. This way the duplicates rejecting interceptor would be simpler as it would only need to impl the preSend().


      • #4

        Your use-case sounds different from a typical Idempotent Receiver. For example, if a Polling Consumer receives a Message with payload X, but then someone places another Message with the same payload X on the channel, it would be accepted? In other words, you only care about duplicates that are actually *in* the Message Channel's queue at any given time. Is that correct? I'd like to hear more about the rationale so we can also consider that type of usage.



        • #5
          Hi Mark,

          thanks for the quick reply.

          Yes, exactly -- I only need to reject messages that are currently in the channel.

          In our case each message triggers a data synchronization process, the type of the data to be synchronized is defined by the payload. There are several such processes, each working on its own dataset. The process will run until it has nothing left to synchronize in its dataset.
          And we cannot run synchronization processes for different datasets in parallel because there are dependencies between the data, and the processes as they are can create duplicate data when 2 of them run at the same time. So we have to run them in a single thread one by one.

          There sometimes may be many repeated invocations for the same dataset. And we actually need only one message per dataset in the queue at a time because if the corresponding synchronization process hasn't yet started processing the message that is already in the queue to the moment when a new (duplicate) message arrives, then when the process finally gets to the first message and starts processing it, it will do all the synchronization (up to the point in time when the synchronization finally runs) in one run.

          Rejecting duplicate messages before they enter the queue helps to make sure the queue can accept new messages for other datasets (i.e. the queue doesn't get flooded by repeated identical messages).

          If we could run processes for different datasets in parallel, we could have a queue channel per dataset with a queue capacity set to 1.

          With a single thread for all processes, we still should be able to have a queue channel (with capacity = 1) + service activator per dataset, but with their pollers using one single common task executor. This should work I think but it looks a bit more complicated and would also mean a poller per dataset which I don't quite like.


          • #6
            What if you add a coordinator process between others that will be listening for all messges, routes each message to an appropriate synchronization process then waits reply for ending work and drops incoming duplicating messages.


            • #7

              Well, the coordinator would need a way to decide if it needs to drop a message. In fact it would need to know the contents of the queue. The easiest way to do that in SI seems to be with an interceptor. (BTW, where earlier I described the interceptor I used to reject duplicates -- of course it adds into the internal Set in postSend(), not in preSend(). In preSend() it just looks if the message needs to be rejected.)

              Actually we don't even need a reply from the sync processes when they are done. They get run one after another just because they use a common single-thread task executor.
              And yes, I have a service activator that knows the mapping between messages and sync processes and calls the right process based on the dataset info in the message.

              The whole configuration is simple and looks like this:

                -> [ Queue channel ] -> [ Service activator ] -> sync processes
                   ^--Interceptor--^    |---- Single-thread task executor ----|


              • #8
                I am wondering how an interceptor will know about what a message is duplicating.
                Last edited by ajones; Jun 28th, 2009, 04:51 AM.


                • #9
                  An interceptor sees every message entering and leaving the channel and so it can just keep messages (or payloads) that are currently in the channel in a collection. And then it can check if the collection contains an object equal to the new message payload.


                  • #10
                    You are going to duplicate the queue in fact. I guess it will be ok If the predictible average size of your queue will be enough small. Else you may consider another solution.


                    • #11
                      Actually in my case the channel is only meant for objects of a specific type and there may be a limited number of them (one per dataset in my case). And as the interceptor in preSend() rejects messages with a payload that's already in the interceptor's internal collection (and in the queue), neither queue nor the collection will grow beyond the max size equal to the number of datasets.

                      Yes, it would be a bit more efficient if the interceptor could just ask the channel if it contains a given object as a payload. But actually memory-wise -- the collection just references the same objects that are in the queue, not to mention they are really small in my case.


                      • #12
                        What is the criteria of а duplicating message? Its type or specific payload?


                        • #13
                          In my case a message is duplicating if the channel already (currently) contains a message with the same payload. Then when a messages leaves the channel, a new message with the same payload is not considered to be duplicating any more and now needs to be let into the channel.


                          • #14
                            Are the message produsers and the service activator executed on the same JMV or not?


                            • #15
                              Here's another option to consider.

                              You can provide a "ref" on the <queue/> element within a channel. That means you could actually define a bean for the queue you want to use. That queue could be a custom wrapper implementation (e.g. around LinkedBlockingQueue) that prevents duplicates. Whereas in the Spring Integration channel we've encapsulated the queue, if you have the queue instance yourself, you can call methods like queue.contains(Object).

                              That might be the simplest solution to your problem.

                              Just watch out for this issue that I just discovered: