Announcement Announcement Module
Collapse
No announcement yet.
Poller removing message from queue before service activator complete Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Poller removing message from queue before service activator complete

    I have a channel backed with a message queue. I then have a service activator which uses a poller to and a taskExecuter to poll the queue and invoke a service when a message is received.

    I have made the queue persistent using hypersonic (hsql) and is setup to persist to the filesystem rather than memory. This all works under normal conditions. However, if I run my server and set a breakpoint inside the service so the breakpoint is hit on a thread from the taskExecutor and then I stop the Server, when I restart the message that was being serviced when I stopped the server is lost.

    That is, stopping the server in the time between getting the message from the queue and processing it. This is a test to check what could happen due to a server failure and it looks like any messages that have been removed from the queue and are being processed will be lost. However this is why I made the queue persistent in the first place.

    Looking at the HSQL logs, it appears that a message is deleted from the database when it is removed from the queue, which ordinarily makes sense I guess.

    I need the persistent store to not delete/comit the message until processing is complete and the call stack unwinds back into the polling/task execute code.

    Anyone know how to do this, prevent the delete from the channel queue until after the service has completed processing the message rather than when the message is read from the queue?

  • #2
    backed with a message queue
    It sounds like you mean message store.

    You need to make the poller transactional (add a <transactional/> child element). Bear in mind there must be no async handoff to another thread, by using queue or executor channels etc further downstream, or the commit will happen right after the handoff. It's ok to have a task executor on the poller, though.

    http://static.springsource.org/sprin...nsactions.html

    Comment


    • #3
      Originally posted by Gary Russell View Post
      It sounds like you mean message store.

      You need to make the poller transactional (add a <transactional/> child element). Bear in mind there must be no async handoff to another thread, by using queue or executor channels etc further downstream, or the commit will happen right after the handoff. It's ok to have a task executor on the poller, though.

      http://static.springsource.org/sprin...nsactions.html
      I did actually mean a queue (cant you have channels without a queue?), which is then backed by a persistent store. Maybe "backed by a queue" isnt the correct way to describe it though.

      Anyway, I'm a bit confused with the transaction side of things because there's effectively two databases. When the Service Activator invokes my service class method, I have a @Transactional(value="transactionManagerAdmin") on that method. The attribute identifies the transactionManager (and hence implicitly the datasource/connection) I want the transaction to run on.

      Then theres the persistent store that backs the queue.
      Code:
      <bean id="transactionManagerAsyncChannel" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
          	<property name="dataSource" ref="intDataSource"/>
      </bean>	
      
        <bean id="store" class="org.foo.integration.jdbc.store.JdbcChannelMessageStore">
          <property name="dataSource" ref="intDataSource"/>
          <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
          <property name="region" value="TX_TIMEOUT"/>
          <property name="usingIdCache" value="true"/>
       </bean>	
      
      <int:service-activator ref="legacyTransactionHandlerAsyncService" method="processMessage" input-channel="legacyTransactionHandlerAsyncServiceTransactionQueue">
          <int:poller trigger="rxPollingTrigger" max-messages-per-poll="1"  task-executor="taskExecutor">
               <int:transactional transaction-manager="transactionManagerAsyncChannel"/>
          </int:poller>
      </int:service-activator>
      
      <task:executor id="taskExecutor" pool-size="25" keep-alive="120" />
      So I added a transaction manager to the poller. Now when I run after some period of time running a load test I get the following Exceptions thrown:

      Caused by: org.apache.tomcat.jdbc.pool.PoolExhaustedException : ["http-bio-8080"-exec-3] Timeout: Pool empty. Unable to fetch a connection in 30 seconds, none available[size:25; busy:25; idle:0; lastwait:30000].
      at org.apache.tomcat.jdbc.pool.ConnectionPool.borrowC onnection(ConnectionPool.java:675)
      at org.apache.tomcat.jdbc.pool.ConnectionPool.getConn ection(ConnectionPool.java:188)


      So I increased the size of the pool and changed some other things like waitTime and abandoned timeout etc. This fixed the above Exception but I started to get othe errors which was probably a symptom of connections being abandonded early, though im not sure.

      What I dont understand quite is where the transaction is started and where it ends. Does the transaction start when the message is put into the queue? or when its taken from the queue and comitted once the service is complete and the delete has happened?

      Im going to try and use a simple datasource instead of a pooled one. I think I read in the HSQL docs someplace you dont need to use a pool because its in memory and connection setup is really quick (I gues no IP/TCP and handshaking required etc).
      Last edited by Gary Russell; Aug 20th, 2013, 09:23 PM.

      Comment


      • #4
        Please use [ code ] ... [ /code ] tags when posting code/config (I edited your post).

        You need to be a little more precise with your language.

        It sounds like you mean message store.
        I did actually mean a queue
        and then you provide configuration that shows a JDBC message store.

        It also doesn't help very much when you only show partial configuration, so I'll make some assumptions...

        Assuming legacyTransactionHandlerAsyncServiceTransactionQue ue is your messageStore-backed channel; the transaction starts when the poller (actually the taskExecutor thread) polls a message from the channel. The thread then invokes your processMessage() method and, since you don't have an output channel, the commit occurs after the method returns (actually when the task execution invoked by the poller completes).

        When debugging Spring transactions, I usually find enabling TRACE level debugging to be useful. All should become clear.

        Comment


        • #5
          ok,

          Sorry about the language. As I think you have figured out I have a service-activator which is configured with a poller to a channel and the poller has a taks executor.

          The input channel is a (has a?) queue, and the queue is backed with a message store - the HSQL database.

          I'm now using an apache jdbc connection pool created and configured in spring using @Bean. (I was having trouble getting the datasource to appear as an MBean so I create it in code and use the @Bean annotation rather than straight spring xml)

          After looking at the MBean for the HSQL connection pool while running a performance test, I notice the "Active" attribute is getting upto 10-15. I dont understand this since my performace test is only sending maximum of 5 requirests at once (it simulates 5 clients and they only send messages one at a time). So the channel should have a maximum of 5 messages, the clients get their async response from a different route and wont send another message until the current one is completed, which involves the invocation of the service.

          The service that is invoked by the service activator is transactional and the associated connection pool to my enterprise database never goes much beyond 2. I dont understand that if my service invocations are not going over 2 (max 5) in terms of concurrent access, why the HSQL persistent active connections are getting upto 15. I can only think that its freeing up connections not after the service returns back to the poller. But if this is the transaction boundary then surely it should be?

          Ill try switching on tracing. The HSQL log also has a lot of DISSCONNECTS all in a row, implying that connections are getting disconnected in batch? And I have no clue why it keeps sending SET SCHEMA PUBLIC

          eg:

          DELETE FROM INT_CHANNEL_MESSAGE WHERE GROUP_KEY='952b9a8a-d08b-3147-a819-8a34535e1bad' AND MESSAGE_ID='3f939a71-7441-465f-ac3e-1dc58656e88a' AND REGION='TX_TIMEOUT'
          COMMIT
          DISCONNECT
          /*C1989*/SET SCHEMA PUBLIC
          INSERT INTO INT_CHANNEL_MESSAGE VALUES('c23a0bf5-e227-41e7-a55c-ca47a7f8f207','952b9a8a-d08b-3147-a819-8a34535e1bad',1377054422657,'aced0005737200366f726 72e737snip','TX_TIMEOUT')
          COMMIT
          DISCONNECT
          /*C1986*/DISCONNECT
          /*C1980*/SET SCHEMA PUBLIC
          DISCONNECT
          /*C1978*/SET SCHEMA PUBLIC
          DISCONNECT
          /*C1979*/SET SCHEMA PUBLIC
          DISCONNECT
          /*C1977*/SET SCHEMA PUBLIC
          DISCONNECT
          /*C1976*/SET SCHEMA PUBLIC
          DISCONNECT
          /*C1975*/SET SCHEMA PUBLIC
          DISCONNECT
          /*C1974*/SET SCHEMA PUBLIC
          DISCONNECT
          /*C1972*/SET SCHEMA PUBLIC
          DISCONNECT
          /*C1982*/DISCONNECT
          /*C1917*/SET SCHEMA PUBLIC
          DISCONNECT
          /*C1990*/SET SCHEMA PUBLIC
          DELETE FROM INT_CHANNEL_MESSAGE WHERE GROUP_KEY='952b9a8a-d08b-3147-a819-8a34535e1bad' AND MESSAGE_ID='c23a0bf5-e227-41e7-a55c-ca47a7f8f207' AND REGION='TX_TIMEOUT'
          COMMIT
          /*C1991*/SET SCHEMA PUBLIC
          Last edited by tim.taylor; Aug 20th, 2013, 10:32 PM.

          Comment


          • #6
            It depends on the transaction configuration of your service. The poller will start a transaction before it pulls a row from the channel. If the service is configured to participate in an existing transaction, it will do so, and the transaction will commit when the poller returns.

            Again, turn on TRACE logging to see what's going on.

            Comment


            • #7
              BTW thanks for your help on this. I appreciate it

              Comment


              • #8
                Originally posted by Gary Russell View Post
                It depends on the transaction configuration of your service. The poller will start a transaction before it pulls a row from the channel. If the service is configured to participate in an existing transaction, it will do so, and the transaction will commit when the poller returns.

                Again, turn on TRACE logging to see what's going on.
                The Service is using a different transaction manager and datasource (its pointing to SQL Server) so it runs within its own transaction. Im wondering if the poller and task Executor is pulling off my messages more than once, So a single message is processed twice?


                Ill try tracing.
                Last edited by tim.taylor; Aug 20th, 2013, 10:53 PM.

                Comment

                Working...
                X