Asynchronous / multi-threaded processing of AMQP messages

  • Asynchronous / multi-threaded processing of AMQP messages

    Greetings,

    As a proof of concept, I recently started using the base Spring AMQP module (RC1) to put serialized JSON messages on a RabbitMQ instance and fetch them back. For reading the messages, I was using SimpleMessageListenerContainer. This was great, as I was able to specify the number of concurrent consumers on the queue via the concurrentConsumers property.
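
For readers new to the idea, the effect of a concurrent-consumers setting can be modeled in plain Java. This is only a sketch under stated assumptions: it uses an in-memory `BlockingQueue` in place of RabbitMQ, and the class and method names (`ConcurrentConsumersSketch`, `drain`) are hypothetical and not part of Spring AMQP.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java model of "N concurrent consumers on one queue".
// Assumption: an in-memory queue stands in for the broker; no Spring or RabbitMQ is involved.
final class ConcurrentConsumersSketch {

    /** Starts `consumers` threads that drain `queue` and returns how many messages were handled. */
    static int drain(BlockingQueue<String> queue, int consumers) {
        AtomicInteger handled = new AtomicInteger();
        Thread[] threads = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    // Each consumer competes for the next message; a poll timeout
                    // lets the thread exit once the queue stays empty.
                    while (queue.poll(100, TimeUnit.MILLISECONDS) != null) {
                        handled.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "consumer-" + i);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join(); // wait for every consumer to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return handled.get();
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 20; i++) {
            queue.add("{\"id\":" + i + "}");
        }
        System.out.println("handled=" + drain(queue, 4));
    }
}
```

Each message is taken by exactly one consumer thread, which is the property that makes the setting useful for spreading work across workers.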

    Since then, I have moved on to trying to prototype Spring Integration to build a loosely-coupled pipeline that performs operations triggered by messages delivered via AMQP. I would like to have a configurable number of workers reading from the queue and independently processing the requests, but I am unable to find a straightforward way to do this with the AMQP inbound channel adapter. As I am new to SI in general, I'm wondering if I'm missing some fundamental knowledge that makes this possible. This seems like a fundamental need across most of the Spring Integration stack, so it is clear that my understanding is flawed. Is there anyone out there willing to point me in the right direction?

    Thanks,
    Will

  • #2
    Will, Spring Integration's AMQP inbound-channel-adapter builds directly on top of the Spring AMQP SimpleMessageListenerContainer. Knowing that... if we add the concurrent-consumers attribute on the adapter element's configuration, would that be sufficient for your needs?

    Thanks,
    Mark


    • #3
      Originally posted by Mark Fisher:
      Will, Spring Integration's AMQP inbound-channel-adapter builds directly on top of the Spring AMQP SimpleMessageListenerContainer. Knowing that... if we add the concurrent-consumers attribute on the adapter element's configuration, would that be sufficient for your needs?

      Thanks,
      Mark

      Thanks for the quick reply, Mark. I believe that would be sufficient, but I'm so early in this project that I wouldn't be willing to bet my firstborn on it. Regardless, it would certainly go a long way towards helping to implement distributed workloads being delivered via AMQP.


      • #4
        Mark, I'm not certain what the etiquette or process is for this kind of thing, but I've attached the patch I threw together to be able to do this locally.


        • #5
          Mark, Oleg helped me understand the concept of channels and task executors performing many of the asynchronous tasks. With that in mind, I think that using a task executor to asynchronously pull messages from the AMQP input channel is a strategy that is more in line with the "Spring Integration Way". I'm no longer convinced that exposing the concurrent consumers attribute is necessary.
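
The handoff described here can be pictured in plain Java as a single receiving thread that passes each message to a pool of workers. This is a rough sketch, not Spring Integration code: the class and method names (`ExecutorHandoffSketch`, `dispatch`) are hypothetical, and the uppercase conversion merely stands in for real processing.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-Java model of handing received messages to a task executor.
// Assumption: the caller of dispatch() plays the role of the single receiving thread.
final class ExecutorHandoffSketch {

    /** Submits every message to a fixed pool of `workers` threads and returns the processed results. */
    static List<String> dispatch(List<String> messages, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        List<String> processed = new CopyOnWriteArrayList<>();
        for (String message : messages) {
            // The receiving thread returns immediately; a worker does the processing.
            pool.execute(() -> processed.add(message.toUpperCase()));
        }
        pool.shutdown(); // accept no new tasks, let queued ones finish
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> results = dispatch(Arrays.asList("a", "b", "c"), 2);
        // Completion order depends on thread scheduling, so only the count is printed.
        System.out.println("processed " + results.size() + " messages");
    }
}
```

Note that the receiving thread has finished with a message as soon as it is submitted, regardless of whether the worker later succeeds.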


          • #6
            There is one important thing to consider however: transactions.

            If you don't need them (i.e. it's ok to lose messages... typical eventing rather than important-data-to-be-processed), then it's fine to use the async channel and hand off to another thread. However, if you are sending AMQP Messages that contain data that must be processed (account debit/credit, stock trade request, etc.), you probably want to have transaction support. In that case, you would be better off with the concurrent-consumers on the listener container since those consumer threads are actually able to participate in the transaction and commit or rollback based on downstream success or failure, respectively.

            Does that make sense?... and if so, which type of behavior best describes your use-case?
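
The difference between the two approaches can be illustrated with a deliberately simplified, single-threaded Java model. It is a sketch under loud assumptions: a `Deque` stands in for the broker queue, "rollback" is modeled as putting the message back, and the names (`AckTimingSketch`, `processOnConsumerThread`, `processAfterHandoff`) are hypothetical and do not correspond to any Spring or RabbitMQ API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Simplified model of when a message leaves the queue relative to its processing.
// Assumption: the handler returns false to signal a downstream failure.
final class AckTimingSketch {

    /** Consumer-thread style: a failed message is put back on the queue (rollback). Returns the requeue count. */
    static int processOnConsumerThread(Deque<String> queue, Predicate<String> handler) {
        int requeued = 0;
        int initialSize = queue.size();
        for (int i = 0; i < initialSize; i++) {
            String message = queue.pollFirst();
            if (!handler.test(message)) {
                queue.addLast(message); // processing failed, so the message survives
                requeued++;
            }
        }
        return requeued;
    }

    /** Handoff style: messages leave the queue before processing, so a failure loses them. Returns the loss count. */
    static int processAfterHandoff(Deque<String> queue, Predicate<String> handler) {
        List<String> handedOff = new ArrayList<>(queue);
        queue.clear(); // the receiving side is done before any worker runs
        int lost = 0;
        for (String message : handedOff) {
            if (!handler.test(message)) {
                lost++; // nothing is left on the queue to retry
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        Predicate<String> handler = m -> !m.startsWith("bad");
        Deque<String> first = new ArrayDeque<>(Arrays.asList("ok-1", "bad-2", "ok-3"));
        Deque<String> second = new ArrayDeque<>(first);
        System.out.println("requeued=" + processOnConsumerThread(first, handler) + " remaining=" + first);
        System.out.println("lost=" + processAfterHandoff(second, handler) + " remaining=" + second);
    }
}
```

In the first method the failed message is still on the queue afterwards and can be retried; in the second it is gone, which is acceptable for eventing but not for data that must be processed.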


            • #7
              Originally posted by Mark Fisher:
              There is one important thing to consider however: transactions.

              If you don't need them (i.e. it's ok to lose messages... typical eventing rather than important-data-to-be-processed), then it's fine to use the async channel and hand off to another thread. However, if you are sending AMQP Messages that contain data that must be processed (account debit/credit, stock trade request, etc.), you probably want to have transaction support. In that case, you would be better off with the concurrent-consumers on the listener container since those consumer threads are actually able to participate in the transaction and commit or rollback based on downstream success or failure, respectively.

              Does that make sense?... and if so, which type of behavior best describes your use-case?

              Excellent point. For my current use case, transaction support is not required, but it sounds like exposing concurrent-consumers is important for those instances.
