  • Maintaining individual transactions for service activations

    We are using Spring Integration and want to maintain individual transaction boundaries for services that are arranged in a pipeline, as shown below.

    Code:
    	<int-jdbc:inbound-channel-adapter channel="tablePoller" 
    	query="select * from table1 where column1 = 'value1' and column2 = 'value2' "
    	data-source="dataSource"
    	update="update table1 set column1 = 'value3' where id in (:id)" >
    		<int:poller fixed-rate="10000">
    			<int:transactional />
    		</int:poller>
    	</int-jdbc:inbound-channel-adapter>
    	
    		
    	<!--  split List<id's> into id -->
    	<int:splitter input-channel="tablePoller" output-channel="getid" ></int:splitter>
    	
    	<int:transformer input-channel="getid" expression="payload.get('id')" output-channel="nextChannel"></int:transformer>  
    
    	
    	<!-- wish to maintain individual transaction here -->
    	<bean id="aService" class="a.b.c.AService">
    		<property name="aDao" ref="aDao"></property>
    	</bean>
    	<int:service-activator input-channel="nextChannel" ref="aService" method="aService" output-channel="router"></int:service-activator>
    	
    	<int:router input-channel="router" expression="payload.variable.contains('aValue') ? 'bChannel' : 'cChannel'"></int:router>
    	
    	
    	<!-- wish to maintain individual transaction here -->
    	<bean id="bService" class="a.b.c.BService">
    		<property name="aDao" ref="aDao"></property>
    	</bean>
    	<int:service-activator input-channel="bChannel" ref="bService" method="bService" output-channel="stdout"></int:service-activator>
    	
    	<bean id="cService" class="a.b.c.CService">
    		<property name="aDao" ref="aDao"></property>
    	</bean>
    	<int:service-activator input-channel="cChannel" ref="cService" method="cService" output-channel="stdout"></int:service-activator>
    Since we are using direct channels, a kind of 'super' transaction spans the whole flow. I tried setting propagation=REQUIRES_NEW on both the poller and the services (aService and bService), but I could not achieve individual transaction boundaries for the services.
    If I set REQUIRES_NEW propagation only on aService, the poller does not kick off and send messages. Why?
    What would be the best way to demarcate these transactions? We want to persist the state of each message as the pipeline performs its business logic.
    Our DAOs use JPA transaction management.

    thanks in advance

  • #2
    Well, if you are using a Poller then you are not using DirectChannels, but that's OK; I just wanted to make sure you understand that.

    So let's say you do have DirectChannels (as you are showing in the config), what happens if you annotate your service methods with @Transactional(propagation=REQUIRES_NEW)?

    Also, could you please include your TX Manager configuration.
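
    For reference, the same propagation setting can also be declared in XML instead of on the class. The following is only a minimal sketch: it assumes the `tx` and `aop` namespaces are declared, it reuses the `transactionManager` and `a.b.c.AService` names from this thread, and the `newTxAdvice` id and the pointcut expression are made up for illustration.

    ```xml
    <!-- Option 1: required for @Transactional annotations to take effect at all -->
    <tx:annotation-driven transaction-manager="transactionManager"/>

    <!-- Option 2 (no annotations): every AService method runs in its own new transaction -->
    <tx:advice id="newTxAdvice" transaction-manager="transactionManager">
        <tx:attributes>
            <tx:method name="*" propagation="REQUIRES_NEW"/>
        </tx:attributes>
    </tx:advice>

    <aop:config>
        <!-- Hypothetical pointcut: adjust the class name to the real service -->
        <aop:advisor advice-ref="newTxAdvice"
                     pointcut="execution(* a.b.c.AService.*(..))"/>
    </aop:config>
    ```

    In both cases the transaction is applied by a Spring proxy, so it only takes effect when the method is invoked through the bean reference, which is what a service-activator with `ref="aService"` does.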



    • #3
      Thanks,

      I tried this config:

      Code:
      <int-jdbc:inbound-channel-adapter channel="tablePoller" 
      	query="select * from table1 where column1 = 'value1' and column2 = 'value2' "
      	data-source="dataSource"
      	update="update table1 set column1 = 'value3' where id in (:id)" >
      		<int:poller fixed-rate="10000">
      			<int:transactional propagation="REQUIRES_NEW"/>
      		</int:poller>
      	</int-jdbc:inbound-channel-adapter>
      With this config, no messages are received on nextChannel.

      If I instead annotate the service method with @Transactional(propagation=REQUIRES_NEW) and leave the poller as it was, the service either does not receive messages or does not push messages to the router.

      The transactionManager configuration:
      Code:
      	<bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
      		<property name="entityManagerFactory" ref="entityManagerFactory" />
      	</bean>
      
      	<!-- jpa -->
      	<bean id="jpaAdapter"
      		class="org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter">
      		<property name="database" value="POSTGRESQL" />
      		<property name="showSql" value="true" />
      	</bean>
      
      	<!-- entity manager -->
      	<bean id="entityManagerFactory"
      		class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
      		<property name="dataSource" ref="dataSource" />
      		<property name="jpaVendorAdapter" ref="jpaAdapter" />
      		<property name="persistenceUnitName" value="customPU" />
      		<property name="jpaProperties">
      			<props>
      				<prop key="hibernate.dialect">org.hibernatespatial.postgis.PostgisDialect</prop>
      			</props>
      		</property>
      	</bean>



      • #4
        Let's try to isolate the problem.
        Can you try the following configuration
        Code:
        <int-jdbc:inbound-channel-adapter channel="tablePoller" 
        	query="select * from table1 where column1 = 'value1' and column2 = 'value2' "
        	data-source="dataSource"
        	update="update table1 set column1 = 'value3' where id in (:id)" >
        		<int:poller fixed-rate="5000">
        			<int:transactional />
        		</int:poller>
        </int-jdbc:inbound-channel-adapter>
        
        <int:channel id="tablePoller"/>
        
        <int:logging-channel-adapter channel="tablePoller" level="WARN" log-full-message="true"/>
        and see if you can see a log statement every 5 seconds.



        • #5
          I was trying different configurations at my end,
          but the poller just stopped working, even in a standalone scenario.
          I would guess this has nothing to do with transactions for now, though.
          The update statement does not work:
          "update table1 set column1 = 'value3' where id in (:id)"
          which is strange.




          • #6
            Good, that means the update statement results in an error and the poller generates an ErrorMessage. Since you did not configure an error-channel explicitly on the poller, the ErrorMessage goes to the default error channel (the channel named 'errorChannel').
            So to debug your issue, you can add another logger on the errorChannel:
            Code:
            <int:logging-channel-adapter channel="errorChannel" level="WARN" log-full-message="true"/>



            • #7
              There is a problem with the update statement, but the poller does not generate any ErrorMessage. I had this working yesterday; the only difference is that I have created a fresh development environment.
              Is there any way I can look into what is happening in the adapter?


              Code:
              <int-jdbc:inbound-channel-adapter channel="logput" 
              	query="select * from schema.table_test where status = 'something' and service = 'somevalue'"
              	data-source="dataSource"
              	update="update schema.table_test set status= 'newValue' where id in (:id)" >
              		<int:poller fixed-rate="10000" error-channel="errorChannel" >
              			<int:transactional />
              		</int:poller>
              	</int-jdbc:inbound-channel-adapter>


              • #8
                This somehow worked. I will go back to addressing the original problem.



                • #9
                  It is probably related to a bug that was discovered last week and is already fixed: https://jira.springsource.org/browse/INT-1952.
                  But the good part is that you are seeing the error message, so the poller behaves as expected.



                  • #10
                    Coming back to the original problem: I tried your suggestion, and my poller now works as expected.

                    The service activators are services that perform some business logic and update the state of a table accessed through aDao. The poller works on the same table. The transaction manager is the JPA one shown earlier.

                    My problem is that I want to define individual transaction boundaries for each of my service activators.

                    a) If I use @Transactional(propagation=REQUIRES_NEW) on both the first and the second service, my table state is still the one set by the poller (the first step in the pipeline).
                    b) If I use @Transactional(propagation=REQUIRES_NEW) on the first service and plain @Transactional on the second service, my second service does not receive a message.
                    c) If my second service throws a checked exception, I want to persist that state in the DB; however, the state in the DB is still the one that triggers the poller. Since my poller is configured to pick up rows in that state, why does it not run again?

                    After a few tries with the above, the update statement stops working again (although polling itself works fine).

                    What would be the way around this, or which channel type should I use?

                    Code:
                    <int-jdbc:inbound-channel-adapter channel="tablePoller" 
                    	query="select * from table1 where column1 = 'value1' and column2 = 'value2' "
                    	data-source="dataSource"
                    	update="update table1 set column1 = 'value3' where id in (:id)" >
                    		<int:poller fixed-rate="10000">
                    			<int:transactional />
                    		</int:poller>
                    	</int-jdbc:inbound-channel-adapter>
                    	
                    		
                    	<!--  split List<id's> into id -->
                    	<int:splitter input-channel="tablePoller" output-channel="getid" ></int:splitter>
                    	
                    	<int:transformer input-channel="getid" expression="payload.get('id')" output-channel="nextChannel"></int:transformer>  
                    
                    	
                    	<!-- wish to maintain individual transaction here -->
                    	<bean id="aService" class="a.b.c.AService">
                    		<property name="aDao" ref="aDao"></property>
                    	</bean>
                    	<int:service-activator input-channel="nextChannel" ref="aService" method="aService" output-channel="router"></int:service-activator>
                    	
                    	<int:router input-channel="router" expression="payload.variable.contains('aValue') ? 'bChannel' : 'cChannel'"></int:router>
                    	
                    	
                    	<!-- wish to maintain individual transaction here -->
                    	<bean id="bService" class="a.b.c.BService">
                    		<property name="aDao" ref="aDao"></property>
                    	</bean>
                    	<int:service-activator input-channel="bChannel" ref="bService" method="bService" output-channel="stdout"></int:service-activator>
                    	
                    	<bean id="cService" class="a.b.c.CService">
                    		<property name="aDao" ref="aDao"></property>
                    	</bean>
                    	<int:service-activator input-channel="cChannel" ref="cService" method="cService" output-channel="stdout"></int:service-activator>



                    • #11
                      Hi,
                      I have attached a sample Maven project that demonstrates my problem. It also includes the script for Postgres.
                      The config file is shown below.
                      The poller polls rTable and updates a column.
                      After the list is split and each row is transformed into an individual result, the results are fed to aService.
                      The doBusiness1 method is annotated with @Transactional and updates a column in rTable.

                      Since I need individual transaction boundaries for each service activator, I tried to separate the transaction using @Transactional(propagation=REQUIRES_NEW). In this case rTable does not get the new value.

                      If I leave it as is, i.e. plain @Transactional, the value is updated. But if the pipeline is extended with a further bService and that service fails, the value of the column in rTable is the initial one (the state at which the poller begins), whereas I want to store the state of the service at which it failed. And when it does fail, the poller does not trigger again, even though the table is in the state that is supposed to trigger it.

                      Have I misunderstood something?



                      Code:
                      	<tx:annotation-driven/>
                      	<context:annotation-config></context:annotation-config>
                      	  
                      	  
                      	 <!-- test to push data --> 
                      	<int:channel id="input"></int:channel>
                      	<int-jdbc:outbound-channel-adapter 
                      		query="insert into abc.rtable (service, service_status, tracking_uuid) values (:payload.service,:payload.serviceStatus,:payload.trackingUuid)"
                      		data-source="dataSource" channel="input" />
                      	
                      	
                      	
                      	<int-jdbc:inbound-channel-adapter channel="tablePoller" 
                      	query="select * from abc.rtable where service_status = 'start' and service = 'aService' and tracking_uuid is not null "
                      	data-source="dataSource"
                      	update="update abc.rtable set service_status= 'aService', service='starting' where id in (:id)" >
                      		<int:poller fixed-rate="10000" error-channel="errorChannel" >
                      			<int:transactional/>
                      		</int:poller>
                      	</int-jdbc:inbound-channel-adapter>
                      	
                      
                      	
                      	<int:splitter input-channel="tablePoller" output-channel="detectUUID" ></int:splitter>
                      	<int:transformer input-channel="detectUUID" expression="payload.tracking_uuid" output-channel="aChannel"></int:transformer>  
                      
                      
                          <!-- required an individual transaction boundary here-->	
                      	<bean id="rDao" class="a.b.c.RDao"></bean>
                      	<bean id="aService" class="a.b.c.AService">
                      		<property name="rdao" ref="rDao"></property>
                      	</bean>
                      	<int:service-activator input-channel="aChannel" ref="aService" method="doBusiness1" output-channel="logChannel"></int:service-activator>
                      	
                      	
                      	<int:channel id="logChannel"></int:channel>
                      	<int:logging-channel-adapter channel="logChannel" level="WARN" log-full-message="true"></int:logging-channel-adapter>
                      	
                      	<int:channel id="errorChannel"></int:channel>
                      	<int:logging-channel-adapter channel="errorChannel" level="WARN" log-full-message="true"></int:logging-channel-adapter>	
                      	
                      	<!-- datasource -->
                      	
                      	<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource">
                      		<property name="driverClassName" value="org.postgresql.Driver" />
                      		<property name="url" value="${url}" />
                      		<property name="username" value="postgres" />
                      		<property name="password" value="${pwd}" />
                      	</bean>
                      
                      	<bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
                      		<property name="entityManagerFactory" ref="entityManagerFactory" />
                      	</bean>
                      
                      	<!-- jpa -->
                      	<bean id="jpaAdapter"
                      		class="org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter">
                      		<property name="database" value="POSTGRESQL" />
                      		<property name="showSql" value="true" />
                      	</bean>
                      
                      	<!-- entity manager -->
                      	<bean id="entityManagerFactory"
                      		class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
                      		<property name="dataSource" ref="dataSource" />
                      		<property name="jpaVendorAdapter" ref="jpaAdapter" />
                      		<property name="persistenceUnitName" value="myPU" />
                      		<property name="jpaProperties">
                      			<props>
                      				<prop key="hibernate.dialect">org.hibernatespatial.postgis.PostgisDialect</prop>
                      			</props>
                      		</property>
                      	</bean>
                      
                      </beans>



                      • #12
                        After reading the documentation a little more carefully, I changed my channels from direct channels to queue channels, and it works. My requirement was quite different from what the documentation clearly states a direct channel is for:
                        "The key motivation for providing a channel implementation with this behavior is to support transactions that must span across the channel while still benefiting from the abstraction and loose coupling that the channel provides."
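
                        For readers who want to see what such a change might look like, here is a rough sketch, not the actual configuration used. It reuses the channel and bean names from the earlier posts; the queue capacity and the polling interval are arbitrary values chosen for illustration.

                        ```xml
                        <!-- A queue channel buffers messages, so the sender's transaction does not extend past it -->
                        <int:channel id="aChannel">
                            <int:queue capacity="100"/>
                        </int:channel>

                        <!-- The consumer pulls from the queue on its own poller thread, in its own transaction -->
                        <int:service-activator input-channel="aChannel" ref="aService" method="doBusiness1"
                                               output-channel="logChannel">
                            <int:poller fixed-rate="1000">
                                <int:transactional transaction-manager="transactionManager"/>
                            </int:poller>
                        </int:service-activator>
                        ```

                        One trade-off to keep in mind: the default in-memory queue is not transactional, so a message that has been taken from it is not put back if the consumer's transaction rolls back.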



                        • #13
                          Right, but with polling channels you also change the protocol for message exchange from 'PUSH' to 'PULL'.
                          If you still want to maintain PUSH behavior, you may want to use this hybrid service-activator-gateway pattern with an async channel. I'll explain:
                          Code:
                          <int:channel id="serviceInputChannel">
                               <int:dispatcher task-executor="executor"/>
                          </int:channel>
                          
                          
                          <int:gateway id="txGateway" service-interface="foo.Bar" default-request-channel="txBeginChannel"/>
                          
                          <int:service-activator input-channel="serviceInputChannel" output-channel="serviceOutputChannel" ref="txGateway"/>
                          
                          <task:executor id="executor" pool-size="10"/>
                          
                          <!-- define your other channels -->
                          
                          
                          // Java: the gateway's service interface (foo.Bar)
                          public interface Bar {
                               @Transactional
                               public Foo process(Something something);
                          }
                          So in the above we have 'serviceInputChannel', which is an async channel because it uses a TaskExecutor. This is the input channel to the service-activator. The bean method for this service-activator could obviously be annotated as Transactional with whatever attributes you want. However, to isolate the transaction requirements from the actual service, in the above configuration I am using a slightly different approach. My service-activator references a Gateway (Bar), which defines a method annotated with @Transactional. So now my service-activator simply invokes the gateway, which essentially marks the beginning of a transaction without modifying any classes.
                          In other words, your service classes might need to be transactional in some scenarios and non-transactional in others, or they might have different transactional requirements. Instead of embedding those in the service class, you can use this hybrid service-activator-gateway pattern (we made it up) to keep them separate.
                          Does it make sense?
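
                          The "other channels" placeholder in the configuration above could be filled in roughly as follows. This is only a sketch under assumptions: it borrows the `aService` bean and `doBusiness1` method from the earlier posts as the real business service, and it assumes that method's parameter and return types match `Bar.process`.

                          ```xml
                          <!-- Direct channel: the flow behind the gateway runs inside the transaction started by Bar.process -->
                          <int:channel id="txBeginChannel"/>

                          <!-- The real business service; with no output-channel, its return value becomes the gateway's reply -->
                          <int:service-activator input-channel="txBeginChannel" ref="aService" method="doBusiness1"/>

                          <!-- Receives the gateway's result from the outer service-activator -->
                          <int:channel id="serviceOutputChannel"/>
                          ```

                          Because 'txBeginChannel' is a direct channel, the business service runs on the same thread as the gateway call, so the transaction opened at the gateway ends when `process` returns.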



                          • #14
                            I was essentially creating wrappers around the actual services I wanted to invoke, to achieve pretty much the same thing.
                            If I understand your approach, I could have my wrappers implement this interface and then inject the services into the wrappers to consume payloads.
                            I will give it a try in our next iteration.

                            Thanks for your help.
