Announcement Announcement Module
No announcement yet.
Spring integration flow in single transaction Page Title Module
Move Remove Collapse
Conversation Detail Module
  • Filter
  • Time
  • Show
Clear All
new posts

  • Spring integration flow in single transaction


    I need to process message in single transaction. In future it will be rich flow with XA transaction witch must be rolled back in case of exceptions.
    But now I cannot understand how to keep transaction during sending message from one channel to another.
    This is my config:
     <int:channel id="in">
            <int:queue capacity="10"/>
        <int:channel id="internalChannel">
            <int:queue capacity="10"/>
        <int:channel id="out">
            <int:queue capacity="10"/>
    <int:transformer input-channel="in" output-channel="internalChannel" ref="myTrasnformer">
           <int:poller max-messages-per-poll="1" fixed-rate="10000">
                <int:transactional transaction-manager="jtaTransactionManager" propagation="REQUIRED"/>
                update="insert into message (id, message) values (1,'Hello')"
                request-channel="internalChannel" reply-channel="out" data-source="dataSource1">
            <int:poller max-messages-per-poll="1" fixed-rate="10000">
                <int:transactional transaction-manager="jtaTransactionManager" propagation="MANDATORY"/>
       <bean id="jdbcDataSource1" class="org.hsqldb.jdbc.pool.JDBCXADataSource">
            <property name="url" value="jdbc:hsqldb:hsql://localhost/testdb"/>
            <property name="user" value="sa"/>
            <property name="password" value=""/>
        <bean id="dataSource1" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init"
            <property name="xaDataSource" ref="jdbcDataSource1"/>
            <property name="uniqueResourceName" value="database1"/>
     <bean id="jtaTransactionManager"
              class="org.springframework.transaction.jta.JtaTransactionManager" >
            <property name="transactionManager" ref="atomikosTransactionManager"/>
            <property name="userTransaction" ref="atomikosUserTransaction"/>
        <bean id="atomikosTransactionManager"
              init-method="init" destroy-method="close">
            <property name="forceShutdown" value="false"/>
        <bean id="atomikosUserTransaction"
            <property name="transactionTimeout" value="300"/>
    As I understand we starting transaction in poller, but when we sent message to outbound-gateway we don't have it. So getting exception:

    No existing transaction found for transaction marked with propagation 'mandatory'
    I need to keep single transaction during all flow. Please advice me, how I can do that

  • #2
    You must use DirectChannels instead of QueueChannels (remove the <queue/> elements on your channels).

    With direct channels the downstream elements are invoked on the calling thread (in this case your transactional poller).


    • #3
      Thanks Gary!

      I read this documentations, but cannot get some things from it.
      If I will start using direct channel, I will be need to remove pollers, because DirectChannel not pollable channel. But I need poller to start transaction.


      • #4
        It depends on how you start your flow. If it polls a message source, you need an initial poller but you must not have a poller on the outbound gateway.


        • #5
          I have a MessageChannel (in) which is autowired in code. I directly send message in it.
          1) How I can create transaction? I need poller here to create transaction?
          2) If I must not have poller on outbound gateway have I can check that this call is under transaction?



          • #6
            Every time you have a <queue/> channel, you hand off to another thread.

            In your case, you need to remove the <queue/> from internalChannel and the poller from the gateway.

            The transformer's poller will start the transaction and remove the message from 'in' and the gateway will be called on the same thread (and thus participate in the transaction).

            An alternative would be to also remove the <queue/> from 'in' and start the transaction in your calling code; then the calling thread will call the transformer, and then the gateway and then, again, everything runs in the same transaction.