  • JDBC message store serializes messages

    I have a project where I want to use Spring Integration as a persistent message queue, essentially to fire and forget (relatively) long-running tasks. In the attached test project (Attachment ), the relevant context configuration is:
    <task:executor id="pool" pool-size="10" queue-capacity="4" rejection-policy="CALLER_RUNS" />
    <int-jdbc:message-store id="store" data-source="dataSource" />
    <bean id="store1" class="" />
    <int:gateway id="sender" default-request-channel="inputChannel" service-interface="com.invenia.test.spring.integration.Sender" />
    <int:channel id="inputChannel">
        <int:queue message-store="store" />
    </int:channel>
    <int:bridge input-channel="inputChannel" output-channel="outputChannel">
        <int:poller fixed-delay="100" receive-timeout="500" max-messages-per-poll="1" task-executor="pool">
            <int:transactional propagation="REQUIRED" isolation="DEFAULT" timeout="5" transaction-manager="transactionManager" />
        </int:poller>
    </int:bridge>
    <int:channel id="outputChannel" />
    <int:service-activator input-channel="outputChannel">
        <bean class="com.invenia.test.spring.integration.ServiceActivator" />
    </int:service-activator>
    The test code just sends 10 messages to the gateway, while the service activator sleeps for 2 seconds on each message received.
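    For readers without the attachment, here is a minimal sketch of what the two classes referenced in the config might look like. Only the names `Sender` and `ServiceActivator` and the `void send(String message)` signature come from this thread; the `handle` method, the configurable delay and the processed counter are hypothetical. The package (`com.invenia.test.spring.integration`) is left out and the types are package-private so the snippet compiles as a single file.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Gateway interface: Spring Integration generates a proxy for it from
// <int:gateway service-interface="...Sender"/>; no hand-written implementation.
interface Sender {
    void send(String message);
}

// Hypothetical POJO for <int:service-activator>. It has a single public method,
// returns void (the void gateway expects no reply) and sleeps to simulate work.
class ServiceActivator {
    private final long delayMillis;
    private final AtomicInteger processed = new AtomicInteger();

    ServiceActivator() {
        this(2000); // 2 seconds per message, as described in the post
    }

    ServiceActivator(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    public void handle(String payload) {
        try {
            TimeUnit.MILLISECONDS.sleep(delayMillis); // the "long-running task"
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return;
        }
        processed.incrementAndGet();
    }

    // Hypothetical helper for checking how many messages were handled.
    int processedCount() {
        return processed.get();
    }
}
```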

    If I use store1 (the SimpleMessageStore), the messages are processed in parallel, whereas if I use store (the JdbcMessageStore) backed by MySQL, the messages are processed serially. Not only that, but message sending also blocks after each message is sent. I've also tried this with PostgreSQL, with the same result (using a development driver, as the released driver version does not implement setQueryTimeout; why do you include schema SQL for a database that is not able to run the code?).

    If I haven't completely misunderstood the configuration of the message store (in which case I would be delighted to be corrected), the JDBC message store as it stands now is relatively useless. Has anyone else had the same experience?

  • #2
    The way I see it, everything works exactly as it's supposed to.
    So let's parse it out and see what's going on.

    JdbcMessageStore indeed serializes Messages, as that is the main job of a MessageStore: to serialize a Message to a persistent data store specific to its implementation (e.g., JDBC, Mongo, Redis, GemFire, etc.). With such serialization you obviously pay a cost, which in many cases depends on the configuration of the third-party system (RDBMS, Mongo, Redis, etc.).
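    To make the write and read cost concrete: a persistent store has to turn each message into bytes before it can insert it, and back into an object when it reads it. The sketch below round-trips a stand-in message through plain Java serialization. That this mirrors what JdbcMessageStore does (storing each message as a serialized BLOB) is an assumption for illustration; `StoredMessage` and `MessageSerialization` are hypothetical and are not Spring Integration's `Message` API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for a message: an ID, a payload and headers.
class StoredMessage implements Serializable {
    private static final long serialVersionUID = 1L;
    final UUID id;
    final String payload;
    final HashMap<String, Object> headers;

    StoredMessage(String payload, Map<String, ?> headers) {
        this.id = UUID.randomUUID();
        this.payload = payload;
        this.headers = new HashMap<>(headers);
    }
}

class MessageSerialization {
    // Cost paid on every write: object graph -> byte[] (e.g. for a BLOB column).
    static byte[] toBytes(StoredMessage message) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(message);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray(); // stream is flushed and closed by now
    }

    // Cost paid on every read: byte[] -> object graph.
    static StoredMessage fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (StoredMessage) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

    With an in-memory store neither step happens: the same object reference is simply put on and taken off a queue.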
    So what does this mean in the context of your sample?
    It means that when you are using SimpleMessageStore there is practically no cost to pay, since the Message is stored in memory, just as if you had not specified a message-store at all (we use SimpleMessageStore by default). So there is no cost on write and no cost on read.
    With JdbcMessageStore you pay the price on both write and read.
    This means:
    - when you say the gateway blocks on send, that is because the write happens on the same thread as the send.
    - when you say the Messages are processed serially, that is because your poller configuration is practically set up to create the illusion of serial processing (with max-messages-per-poll set to 1), and the only reason it looks parallel with SMS is that there is no price to pay on the read while you are adding an artificial delay in the ServiceActivator.
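    Both points can be illustrated with a toy model (plain Java, not Spring Integration code): a store-backed queue channel whose `send` performs the store write synchronously on the caller's thread, and a poll cycle that takes at most `maxMessagesPerPoll` messages. `ModelQueueChannel` and its methods are hypothetical, and the write cost is a simple sleep standing in for the JDBC round trip.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model of a message-store-backed queue channel.
class ModelQueueChannel {
    private final Deque<String> store = new ArrayDeque<>();
    private final List<String> writerThreads = new ArrayList<>();
    private final long writeCostMillis;

    ModelQueueChannel(long writeCostMillis) {
        this.writeCostMillis = writeCostMillis;
    }

    // The "INSERT" runs inside send(), so the calling thread pays its cost.
    synchronized void send(String message) {
        writerThreads.add(Thread.currentThread().getName());
        simulateWrite(writeCostMillis);
        store.addLast(message);
    }

    // One poller cycle: take at most maxMessagesPerPoll messages.
    synchronized List<String> poll(int maxMessagesPerPoll) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxMessagesPerPoll && !store.isEmpty()) {
            batch.add(store.pollFirst());
        }
        return batch;
    }

    // Names of the threads that performed each write, in order.
    synchronized List<String> writerThreads() {
        return new ArrayList<>(writerThreads);
    }

    private static void simulateWrite(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

    In the model, each `poll` call corresponds to one poller cycle; with a limit of 1, a cycle never hands more than one message downstream, and every write is recorded against the thread that called `send`.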

    We could certainly help you fix the configuration once we know what you are trying to accomplish with this prototype. What is the use case? Can you describe it from the user's perspective (not the technology's)?


    • #3
      Thank you for your reply.

      What I want to achieve is to kick off a long-running background task (report generation, file transfer, etc.) so that the foreground thread may return. I also need these tasks to be persisted, so that in the event of failure, we may restart the operation.

      When running the example with SMS, I can see that all messages are put in the message store before the poller even starts to consume messages. This is expected, as the SMS is an in-memory queue. But when using the JDBC message store, the first message put on the store returns immediately, while the subsequent message puts block until the previous message has been processed. I would expect to be able to deliver all messages within a very short timespan, even though the message processing has started.

      I know that this is a simplification of events (the timings would be different on another machine, etc.), but it seems to me that the problem is that removeMessageFromGroup() updates the message group table, effectively locking the message group for the duration of the transaction and blocking any further message insertion or removal. Therefore, message insertion and removal are forced to proceed in lock-step. This behavior also occurs when max-messages-per-poll is increased, but it is harder to see.
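      The lock-step behavior described above can be reproduced in a toy model, assuming (as the post suggests) that both inserting into and removing from a group touch the same group row, and that the poller's transaction holds that row lock until it commits. A binary `Semaphore` stands in for the database row lock; `GroupRowLockModel` and its methods are hypothetical and are not JdbcMessageStore code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;

// Hypothetical toy model: one lock per message-group row, held until commit.
class GroupRowLockModel {
    private final Semaphore groupRow = new Semaphore(1); // non-reentrant "row lock"
    private final Deque<String> messages = new ArrayDeque<>();

    // Producer side: the insert also updates the group row, so it needs the lock.
    // Returns false when another open transaction holds the row; a real insert
    // would block (or time out) instead of failing fast.
    boolean tryAdd(String message) {
        if (!groupRow.tryAcquire()) {
            return false;
        }
        try {
            synchronized (messages) {
                messages.addLast(message);
            }
            return true;
        } finally {
            groupRow.release(); // the producer's transaction commits right away
        }
    }

    // Consumer side: the poller's transaction removes a message (updating the
    // group row) and keeps the lock while the handler runs. Always call commit(),
    // even if this returns null because the group was empty.
    String beginPollTransaction() {
        groupRow.acquireUninterruptibly();
        synchronized (messages) {
            return messages.pollFirst();
        }
    }

    void commit() {
        groupRow.release();
    }
}
```

      In the posted configuration the service activator sits behind a direct channel, so it runs on the poller thread inside that transaction; in this model the "row" would therefore stay locked for the whole 2-second sleep.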


      • #4
        A few questions:
        Is your Gateway method annotated with @Transactional?
        Also, you are using a synchronous gateway. Have you tried an async gateway?
        Also, what version of SI are you using? The 'removeMessageFromGroup' method and the entire JdbcMessageStore changed quite a bit in the latest 2.2 M1 and M2.


        • #5
          1. As of now, the gateway method is not transactional. I call Sender.send() (Sender is the gateway service interface) from a test case inheriting from AbstractJUnit4SpringContextTests. But in production, it will be run in a transactional context.

          2. This is definitely a step forward! Replacing 'void send(String message)' with 'Future<Void> send(String message)' allows the sender to proceed at its own pace. This seems to hold true even when run as an AbstractTransactionalJUnit4SpringContextTests, which gives it a transactional context.

          3. I'm now using SI 2.2.0M2, although the attached example uses M1.
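          The change in point 2 is only to the gateway's return type. The sketch below shows the changed interface plus a hand-rolled stand-in for what the generated proxy does conceptually: hand the send off to another thread and return a future straight away, so the caller no longer pays for the store write. `AsyncSender` (the thread keeps the name `Sender`) and `ExecutorBackedSender` are hypothetical; in Spring Integration you would declare only the interface and let `<int:gateway>` generate the implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// The gateway interface after the change: a Future return type makes the
// gateway call asynchronous.
interface AsyncSender {
    Future<Void> send(String message);
}

// Hypothetical stand-in for the generated proxy, for illustration only.
class ExecutorBackedSender implements AsyncSender {
    private final ExecutorService executor;
    private final Consumer<String> downstream; // e.g. the write to inputChannel

    ExecutorBackedSender(ExecutorService executor, Consumer<String> downstream) {
        this.executor = executor;
        this.downstream = downstream;
    }

    @Override
    public CompletableFuture<Void> send(String message) {
        // Covariant return type: CompletableFuture implements Future.
        // The downstream work runs on an executor thread, not the caller's.
        return CompletableFuture.runAsync(() -> downstream.accept(message), executor);
    }
}
```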

          Even though #2 improves things a lot, there are still 2 issues:

          1. The lock created by the updateMessageGroup() call in removeMessageFromGroup() still prevents the other threads in the thread pool from retrieving messages and processing them in parallel. It also shows up as several transaction timeouts in the pool threads, as they never get a chance to finish polling the message store.

          2. This is actually a different issue, but the same message may be delivered several times. Is there any way to prevent this (once-and-only-once delivery)?



          • #6
            Well, I think you practically answered your own question. With the JdbcMessageStore you are relying on a shared datastore, so although your program uses two different threads (even without the async gateway), they still operate on the same shared resource, which one thread can lock at the DB level. That is where DB isolation levels come in: they let you manage the locking strategy. So that should answer #1.

            As for #2, when you say "the same message", what do you mean? Is that a business definition? How would the framework know that this message is a duplicate?
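            One way a consumer can guard against duplicate delivery, independent of the framework, is the Idempotent Receiver pattern: remember the IDs of messages already handled and skip repeats. A minimal sketch, assuming the message ID is what defines a duplicate (for example the UUID in the `id` header that Spring Integration assigns to every message). `IdempotentHandler` is hypothetical, and its in-memory set does not survive a restart, so a real version would record processed IDs in the database within the same transaction.

```java
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical idempotent receiver: runs the delegate at most once per message ID.
class IdempotentHandler<T> {
    private final Set<UUID> processedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<T> delegate;

    IdempotentHandler(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    // Returns true if the message was handled, false if it was a duplicate.
    boolean handle(UUID messageId, T payload) {
        // add() is atomic: only the first thread to claim this ID gets true.
        if (!processedIds.add(messageId)) {
            return false;
        }
        try {
            delegate.accept(payload);
        } catch (RuntimeException e) {
            processedIds.remove(messageId); // allow a retry after a failure
            throw e;
        }
        return true;
    }
}
```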


            • #7
              So what you're in effect saying is that if we use the JdbcMessageStore, there is no way to process messages in parallel? That would render it quite useless. What we hoped to achieve was to replace JMS queues with Spring Integration queues backed by a database, but this seems to be impossible right now.

              #2 is kind of irrelevant now, but as it stands, the JdbcMessageStore may deliver the same message (as stored in the message store) to several threads in the thread pool. But since it is not possible to parallelize processing of the messages, there is no need for a thread pool, and hence no duplicate message delivery...