
  • Newbie - Queue Channel doubt

    Hi,

    I have a simple requirement but am struggling a bit to fit it into the Spring Integration framework...

    I want to have a queue into which I will put messages, and for which I have defined a queue channel like this:
    Code:
    <channel id="x">
      <queue capacity="30"/>
    </channel>
    Now, I have defined a poller on the queue like this

    Code:
    <poller id="poller" max-messages-per-poll="10">
     <trigger interval="10000"/>
    </poller>
    I have set an interval of 10 seconds, and each poll retrieves up to 10 messages. What I would like to achieve is for the next poll to block until these 10 messages have been processed. How can I achieve this? I do not want to use an event-driven consumer, because the program that does the processing is best suited to bulk work, so I want to use the poller...

    Any ideas/suggestions ?

    Thanks in advance

    Rgds...VJ

  • #2
    If the poller has only one thread you'll get what you want (if I understand you correctly).

    Code:
    <poller ... task-executor="singleThreadedPool">
    ...
    </poller>



    • #3
      It looks like a bug

      Originally posted by iwein:
      If the poller has only one thread you'll get what you want (if I understand you correctly).

      Code:
      <poller ... task-executor="singleThreadedPool">
      ...
      </poller>
      Actually this doesn't work with SI 1.0.3. This is my configuration:

      Code:
      <channel id="reception">
        <queue capacity="500"/>
      </channel>
      <service-activator input-channel="reception" ref="DARVAMessagesHandler" method="handleMessage">
        <poller ref="singleThreadedPool"/>
      </service-activator>
      When I test this configuration with two messages in the queue and intercept handleMessage() in the debugger, I see two threads: one belonging to my single-thread pool and a "task-scheduler-" one. So, when the only thread of my singleThreadedPool is busy, SI uses another thread to handle the second message... This is not what I expected.

      regards
      Vincent



      • #4
        I think what you are seeing is the rejection policy. Spring Integration uses "CallerRuns" by default, and that means that when a pool is exhausted, the caller thread will run a task that it submits. So, you are seeing the central "task-scheduler" thread pool (the caller in this case) executing the task since the 1 thread is already occupied. The reason we use that as a default is that it provides a natural way to throttle requests.

        If you want to change the behavior, you can choose a different option by adding the "rejection-policy" attribute in the poller configuration. However, the options are to throw an exception (abort) or to discard tasks (oldest or current). Those options are usually *not* desired... unless the application can afford to lose messages (such as real-time events that will be sent again soon).

        Another configuration option for the thread-pool is the queue-capacity. If you set a large size (even unbounded is an option but that could lead to OOM), then the tasks are submitted to the queue any time the thread pool is busy. Perhaps that's the direction you want to go?
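The caller-runs behaviour described above can be reproduced with plain java.util.concurrent, outside Spring Integration. The following is only a sketch under assumptions: the class name CallerRunsDemo, the thread name "single-pool-thread", and the zero-capacity SynchronousQueue hand-off are illustrative choices, not taken from the poster's configuration or from Spring Integration itself.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {

    /** Submits two tasks to a one-thread pool; returns the thread names in completion order. */
    public static List<String> run() {
        List<String> ranOn = new CopyOnWriteArrayList<>();
        CountDownLatch firstStarted = new CountDownLatch(1);
        CountDownLatch releaseFirst = new CountDownLatch(1);

        // One thread, no task queue (hypothetical setup), caller-runs on rejection.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                r -> new Thread(r, "single-pool-thread"),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            // First task occupies the only pool thread until it is released.
            pool.execute(() -> {
                firstStarted.countDown();
                try {
                    releaseFirst.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                ranOn.add(Thread.currentThread().getName());
            });
            firstStarted.await();

            // The pool is exhausted, so this task is rejected and runs
            // synchronously on the submitting (caller) thread.
            pool.execute(() -> ranOn.add(Thread.currentThread().getName()));

            releaseFirst.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(ranOn);
    }
}
```

The second task finishes first and reports the submitting thread's name, which matches the two threads seen in the debugger: the pool thread plus the thread that submitted the work.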



        • #5
          Actually, my use case is the following:
          - I receive a file containing several individual messages.
          - I use a file adapter to read the file (XML)
          - A transformer unmarshals the XML to an object list.
          - A splitter wraps each object in a message and sends it to a queue.
          - A poller takes each message from the queue and calls a service activator to update my business database.

          Unfortunately, some messages lead to concurrency exceptions in the database. One obvious solution is to read messages from the queue with only one thread (I have no performance issue in my use case, so a one-thread consumer is OK). Naively, I tried to limit the thread pool size, but that is definitely not a good option. The current rejection policy is OK for me, as I do not want to lose messages.

          Maybe a rendezvous channel and a bridge with the message queue could simulate one-thread polling (seems complicated...)? What are the options for controlling message consumption?

          Sincerely,
          Vincent



          • #6
            Changing the rejection policy to 'DISCARD' for my one-thread task executor solves my use case. However, according to Anna Labinskaya (see http://forum.springsource.org/showth...ler+throughput), this cannot be done with the DISCARD option.

            I tested it with a debugger, a breakpoint in the service activator method, and two messages: I blocked the thread for 5 minutes on the first message, released it, and saw the second message enter the service activator normally. Note that I use a queue size of 0 for my single-thread task executor, since I do not want to copy my message queue into a thread queue; I just want to serialize the service activator's message handling.
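One way to reason about why no message was lost in this test is to model it with plain java.util.concurrent. The sketch below assumes that the task handed to the executor is the poll itself (receive from the queue, then handle), so a discarded task never touched a message. That is an assumption about how the poller behaves, not something verified against the SI 1.0.3 source, and DiscardingPollerDemo and its timings are hypothetical.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DiscardingPollerDemo {

    /** Drains three messages through a one-thread, zero-queue, discard-on-reject executor. */
    public static List<String> run() {
        // Stand-in for the queue channel.
        BlockingQueue<String> channel = new LinkedBlockingQueue<>(List.of("m1", "m2", "m3"));
        List<String> handled = new CopyOnWriteArrayList<>();

        ThreadPoolExecutor worker = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new SynchronousQueue<>(),               // "queue size of 0"
                new ThreadPoolExecutor.DiscardPolicy()); // rejected polls are dropped

        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
        // Stand-in for the trigger: submit a poll task every few milliseconds.
        while (handled.size() < 3 && System.nanoTime() < deadline) {
            worker.execute(() -> {
                // The message is only taken once the task actually runs.
                String message = channel.poll();
                if (message != null) {
                    pause(20); // simulate slow handling
                    handled.add(message);
                }
            });
            pause(5);
        }
        worker.shutdown();
        return List.copyOf(handled);
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In this model many poll tasks are discarded while the worker is busy, yet all three messages are handled in order by one thread. If the submitted task instead carried a message that had already been taken off the queue, discarding it would lose that message, which is the risk the earlier reply warns about.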

            For my use case this is enough; I just wonder whether you think Anna L.'s conclusions are correct?

            Sincerely,
            Vincent

            Originally posted by vincentditlevinz:
            If you want to change the behavior, you can choose a different option by adding the "rejection-policy" attribute in the poller configuration. However, the options are to throw an exception (abort) or to discard tasks (oldest or current). Those options are usually *not* desired... unless the application can afford to lose messages (such as real-time events that will be sent again soon).



            • #7
              Let me ask a simple question first. Would you be okay with a single File being processed at a time? In other words, pushing the "single-threadedness" down to the level of the original File-poller?



              • #8
                Originally posted by Mark Fisher:
                Let me ask a simple question first. Would you be okay with a single File being processed at a time? In other words, pushing the "single-threadedness" down to the level of the original File-poller?
                The file is emitted by a third party, and I don't want to pre-process it to split it into N files: that would mean too much refactoring (JiBX XML-object binding, the specific adapter I developed for the third-party file exchange tool we use, changing the data workflow...).
                The DISCARD option seems OK, but I must say that I don't understand all the implications of this choice.



                • #9
                  Okay... perhaps I was making an incorrect assumption as well. You are not using Spring Integration's "inbound-channel-adapter" for the File polling are you?



                  • #10
                    Originally posted by Mark Fisher:
                    Okay... perhaps I was making an incorrect assumption as well. You are not using Spring Integration's "inbound-channel-adapter" for the File polling are you?
                    I am using a file:inbound-channel-adapter from the SI File adapter project.



                    • #11
                      Okay. So, let me re-phrase my last question... I was wondering if you would be able to modify the poller configuration on that inbound-channel-adapter so that *it* is only processing one File at a time?

                      If so, the subsequent channel could then be synchronous (remove its 'queue' sub-element) so that the same thread that polls the file system is also sending and later handling the message. If interval based, the file poller with a single thread task-executor, max-messages-per-poll=1, and a fixed-delay (so that it only polls after the preceding poll completes) should be moving in the right direction.
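The fixed-delay idea (poll again only after the preceding poll has completed) can be illustrated with a plain ScheduledExecutorService. This is a sketch of the scheduling semantics only, not of the file adapter's poller; FixedDelayPollDemo, minGapMillis, and the millisecond timings are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedDelayPollDemo {

    /** Runs at least `polls` polls; returns the smallest gap between one poll's end and the next start. */
    public static long minGapMillis(int polls, long delayMillis, long workMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<long[]> runs = new ArrayList<>(); // written only by the scheduler thread
        CountDownLatch done = new CountDownLatch(polls);

        // Fixed delay: the delay is counted from the END of the previous run.
        scheduler.scheduleWithFixedDelay(() -> {
            long start = System.nanoTime();
            try {
                Thread.sleep(workMillis); // simulate processing one batch
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return; // interrupted by shutdown: do not record a partial run
            }
            runs.add(new long[] {start, System.nanoTime()});
            done.countDown();
        }, 0, delayMillis, TimeUnit.MILLISECONDS);

        try {
            done.await(30, TimeUnit.SECONDS);
            scheduler.shutdownNow();
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        long minGap = Long.MAX_VALUE;
        for (int i = 1; i < runs.size(); i++) {
            minGap = Math.min(minGap, runs.get(i)[0] - runs.get(i - 1)[1]);
        }
        return TimeUnit.NANOSECONDS.toMillis(minGap);
    }
}
```

Calling minGapMillis(3, 10, 30) makes each simulated poll take longer than the delay, and the measured gap still does not drop below the delay, so polls never overlap however slow the processing is.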



                      • #12
                        Originally posted by Mark Fisher:
                        Okay. So, let me re-phrase my last question... I was wondering if you would be able to modify the poller configuration on that inbound-channel-adapter so that *it* is only processing one File at a time?

                        If so, the subsequent channel could then be synchronous (remove its 'queue' sub-element) so that the same thread that polls the file system is also sending and later handling the message. If interval based, the file poller with a single thread task-executor, max-messages-per-poll=1, and a fixed-delay (so that it only polls after the preceding poll completes) should be moving in the right direction.
                        I was unclear in my use case description:
                        I will receive only one XML file per day, so file system polling cannot help in this case.
                        The file input adapter takes this file and calls a transformer that extracts an ArrayList of N orders (through a JiBX transformation); then a splitter takes this list and wraps each item in an SI message before sending it to a queue.
                        For each message in the queue I have to update my database and, to prevent concurrency exceptions, I must handle one message at a time.
                        Your suggestion implies that I pre-process my daily file to obtain N files (one message per file) which means a lot of refactoring for me.



                        • #13
                          I understand. So, the next question ... if you want the processing of all rows to be handled by a single thread, can you simply remove the 'queue' sub-element from any channels in the process?

                          In such a case, a splitter will send each Message including the full downstream message flow (handling that Message) prior to the next split being sent. Does that make sense?
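The ordering this produces can be modelled without Spring Integration: with no queue between splitter and handler, "sending" is just a direct call on the same thread. The sketch below is a simplified model; DirectHandoffDemo and the log strings are hypothetical, and a real direct channel involves more than a method call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DirectHandoffDemo {

    /** Splits a list and hands each item straight to the handler; returns the event log. */
    public static List<String> run(List<String> orders) {
        List<String> log = new ArrayList<>();

        // Stand-in for the service activator: runs on whichever thread calls it.
        Consumer<String> handler = order -> log.add("handled " + order);

        // Stand-in for the splitter: each send returns only after the
        // downstream handling of that item has finished.
        for (String order : orders) {
            log.add("sent " + order);
            handler.accept(order);
        }
        return log;
    }
}
```

For the input ["a", "b"] the log reads: sent a, handled a, sent b, handled b. Each item is fully handled before the next one is sent, so the handling is serialized on a single thread.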



                          • #14
                            Originally posted by Mark Fisher:
                            I understand. So, the next question ... if you want the processing of all rows to be handled by a single thread, can you simply remove the 'queue' sub-element from any channels in the process?

                            In such a case, a splitter will send each Message including the full downstream message flow (handling that Message) prior to the next split being sent. Does that make sense?
                            Yes it makes sense for this specific use case.

                            Actually, I have built a common data workflow with SI to adapt a third-party file exchange product. Sometimes I have no concurrency problems but performance becomes an issue, and then I prefer to let several threads handle my messages. It depends on the input file content... With my current configuration I simply change the task executor parameters to achieve this. That's why I wonder whether changing the task executor's rejection policy is dangerous or not.
