CannotCreateTransactionException when using splitter and dispatcher

  • CannotCreateTransactionException when using splitter and dispatcher

    I have a chain that uses a splitter to split a request into 10 separate messages, which are then processed on separate threads via a dispatcher with a task executor.

    We use Spring Data repositories to retrieve data. I can retrieve data before the splitter splits the request, but when I query for data after the split I get a CannotCreateTransactionException.

    Code:
    Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is java.lang.NullPointerException
    	at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:382)
    	at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:371)
    	at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:335)
    	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:105)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    	at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:155)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    My pipeline configuration is as follows:
    Code:
        <si:chain input-channel="emsValuationServiceIn" output-channel="requestSplitterChannel">
            <si:filter ref="requestFilter" method="accept" />
            <si:transformer ref="valuationProcessor" />
            <si:splitter ref="positionContextSplitter" method="splitRequest" />
        </si:chain>
        <si:channel id="requestSplitterChannel">
            <si:dispatcher task-executor="positionRequestTaskExecutor" />
        </si:channel>
        <bean id="positionRequestTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="corePoolSize" value="10" />
            <property name="daemon" value="false" />
        </bean>
        <si:chain input-channel="requestSplitterChannel" output-channel="aggregatorRequestChannel">
            <si:transformer ref="processorA" />
            <si:transformer ref="processorB" />
        </si:chain>
    The executor channel documentation does mention that "this does break the "single-threaded" execution context between sender and receiver so that any active transaction context will not be shared by the invocation of the handler". Since a new transaction is therefore needed after the 'requestSplitterChannel' above, we added @Transactional to processorA and processorB, assuming that would tell Spring to create a new transaction within those threads. That does not work, however, and gives us the exception shown above.
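
The quoted point about the broken single-threaded context can be illustrated without Spring. Below is a minimal sketch, assuming only that the transaction context is bound to the calling thread (Spring keeps it in thread-local storage). The `CURRENT_TX` holder and the class name are hypothetical stand-ins, not part of any Spring API. It shows only why the upstream transaction is not visible on the pool thread; it does not reproduce the NullPointerException itself.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadBoundContextDemo {

    // Hypothetical stand-in for thread-bound transaction state.
    static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    // Reads the context from a pool thread, roughly as a handler
    // behind an executor channel would see it.
    static String readOnWorkerThread() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Callable<String> readContext = CURRENT_TX::get;
            return pool.submit(readContext).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        CURRENT_TX.set("tx-1"); // bound to the calling (sender) thread only
        System.out.println("caller thread sees: " + CURRENT_TX.get());
        System.out.println("worker thread sees: " + readOnWorkerThread());
    }
}
```

The worker thread starts with an empty context, so anything transactional on that side has to begin its own transaction.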

    Is there a way of introducing a new transaction here? Any help would be appreciated.

    P.S: I am using Spring Integration 2.0.3.RELEASE.

  • #2
    Can you post the complete exception stack trace?



    • #3
      Here it is:

      Code:
      2011-11-18 12:07:36,904 ERROR (positionRequestTaskExecutor-1)[notification.ExceptionNotificationHandler] 
       Error occured - processing on item will not continue 
      org.springframework.integration.transformer.MessageTransformationException: org.springframework.integration.MessageHandlingException: com.nomura.spb.infrastructure.exception.notification.ErrorException: Error processing message in the pipeline
      	at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:73)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:98)
      	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
      	at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:150)
      	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
      	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:176)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:160)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:125)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:119)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:101)
      	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
      	at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:150)
      	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
      	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:176)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:160)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:125)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:119)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:101)
      	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
      	at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:133)
      	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
      	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
      	at org.springframework.integration.dispatcher.UnicastingDispatcher.access$000(UnicastingDispatcher.java:51)
      	at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:92)
      	at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      	at java.lang.Thread.run(Thread.java:619)
      Caused by: org.springframework.integration.MessageHandlingException: com.nomura.spb.infrastructure.exception.notification.ErrorException: Error processing message in the pipeline
      	at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:76)
      	at org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:56)
      	at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:67)
      	... 29 more
      Caused by: com.nomura.spb.infrastructure.exception.notification.ErrorException: Error processing message in the pipeline
      	at com.nomura.spb.infrastructure.exception.notification.PublishingExceptionNotifier.error(PublishingExceptionNotifier.java:28)
      	at com.nomura.spb.services.valuationservice.pipeline.base.PositionContextTransformer.handleUnexpectedException(PositionContextTransformer.java:21)
      	at com.nomura.spb.services.valuationservice.pipeline.base.ExceptionHandlingPipelineElement.doHandle(ExceptionHandlingPipelineElement.java:39)
      	at com.nomura.spb.services.valuationservice.pipeline.base.TransformerPipelineElement.handle(TransformerPipelineElement.java:32)
      	at sun.reflect.GeneratedMethodAccessor400.invoke(Unknown Source)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
      	at java.lang.reflect.Method.invoke(Method.java:597)
      	at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:69)
      	at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:84)
      	at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:57)
      	at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:102)
      	at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:102)
      	at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:114)
      	at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:223)
      	at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:123)
      	at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:73)
      	... 31 more
      Caused by: com.nomura.spb.services.valuationservice.exception.PipelineException: com.nomura.spb.services.valuationservice.exception.SwapException: org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is java.lang.NullPointerException
      	at com.nomura.spb.services.valuationservice.exception.ExceptionContext.forPipeline(ExceptionContext.java:69)
      	... 46 more
      Caused by: com.nomura.spb.services.valuationservice.exception.SwapException: org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is java.lang.NullPointerException
      	... 47 more
      Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is java.lang.NullPointerException
      	at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:382)
      	at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:371)
      	at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:335)
      	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:105)
      	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
      	at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:155)
      	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
      	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
      	at $Proxy241.findOne(Unknown Source)
      	at com.nomura.spb.services.valuationservice.pipeline.DividendCashflowPropertyUtility.getMarketBySecurity(DividendCashflowPropertyUtility.java:55)
      	at com.nomura.spb.services.valuationservice.pipeline.DividendCashflowPropertyUtility.getDividendConventionBySecurity(DividendCashflowPropertyUtility.java:39)
      	at com.nomura.spb.services.valuationservice.pipeline.DividendsProcessor.process(DividendsProcessor.java:132)
      	at com.nomura.spb.services.valuationservice.pipeline.base.ExceptionHandlingPipelineElement.doHandle(ExceptionHandlingPipelineElement.java:37)
      	... 44 more
      Caused by: java.lang.NullPointerException
      	at org.hibernate.transaction.JDBCTransaction.begin(JDBCTransaction.java:81)
      	at org.hibernate.impl.SessionImpl.beginTransaction(SessionImpl.java:1473)
      	at org.hibernate.ejb.TransactionImpl.begin(TransactionImpl.java:60)
      	at org.springframework.orm.jpa.DefaultJpaDialect.beginTransaction(DefaultJpaDialect.java:70)
      	at org.springframework.orm.jpa.vendor.HibernateJpaDialect.beginTransaction(HibernateJpaDialect.java:57)
      	at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:332)
      	... 56 more



      • #4
        Hi Pankil,
        I developed a similar sample in which a message arrives over a channel at the splitter, which in turn puts the split messages on an executor output channel.
        The executor channel is the input to a transformer, which transforms the message and puts it on another output channel. This transformer also stores the original and transformed message in the database using JPA; I put a @Transactional annotation on the method and injected an EntityManager into the bean.
        This whole setup works fine for me without any problem.

        What version of Hibernate EntityManager are you using? Could you possibly try a different version? I used version 3.5.5-Final of Hibernate EntityManager with Spring 3.0.6.

        Let's see if this solves your problem; if not, we can look at some other way to sort it out.



        • #5
          Thanks Amol for trying this.

          We use Spring 3.1.0.M2 and Hibernate Entity Manager 3.6.0.FINAL. Would it be possible for you to attach your sample project if it is not too large?

          Thanks again!



          • #6
            Hi Pankil,

            I went through your stack trace, and I guess the @Transformer method in your transformer bean needs to be looked at. I'll try attaching the code here; meanwhile, could you possibly use the Spring and Hibernate versions I recommended and check whether you still face the same issue?



            • #7
              The Spring config file used

              Code:
              <?xml version="1.0" encoding="UTF-8"?>
              <beans xmlns="http://www.springframework.org/schema/beans"
              	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              	xmlns:integration="http://www.springframework.org/schema/integration"
              	xmlns:tx="http://www.springframework.org/schema/tx"
              	xmlns:task="http://www.springframework.org/schema/task"	
              	xsi:schemaLocation=
              	"http://www.springframework.org/schema/beans http://www.springframework.org/schem...-beans-3.0.xsd
              	http://www.springframework.org/schema/task	http://www.springframework.org/schem...g-task-3.0.xsd
              	http://www.springframework.org/schema/integration http://www.springframework.org/schem...ration-2.1.xsd
              	http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
              	
              	<tx:annotation-driven/>
              	
              	<integration:channel id="inputChannel"/>
              	
              	<integration:channel id="spiltterOutputChannel">
              		<integration:dispatcher task-executor="taskExecutor"/>
              	</integration:channel>
              	
              	<integration:channel id="outputChannel"/>
              	
              	<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
              		<property name="corePoolSize" value="10" />	
              	</bean>
              	
              	<bean id="splitterBean" class="com.example.integration.test.forum.SplitterBean"/>
              	<integration:splitter input-channel="inputChannel" output-channel="spiltterOutputChannel" ref="splitterBean"/>
              	
              	<bean id="transformer" class="com.example.integration.test.forum.TransformerBean"/>
              	
              	<integration:transformer input-channel="spiltterOutputChannel" output-channel="outputChannel" ref="transformer"/>
              	
              	 
              	
              	<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
              		<property name="driverClassName" value="org.apache.derby.client.ClientXAConnection"/>
              		<property name="url" value="jdbc:derby://localhost:1527/TestDatabase"/>
              	</bean>	
              		
              
              	 
              	<bean class="org.springframework.orm.jpa.support.PersistenceAnnotationBeanPostProcessor"/>
              	 
              	<bean id="lc" class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
              		<property name="dataSource" ref="dataSource"/>	
              	</bean>	
              
              	
              	<bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
              		<constructor-arg ref="lc"/>		
              	</bean>
              	 
              	
              </beans>
              The Splitter bean
              Code:
              public class SplitterBean {
              
              	
              	@Splitter
              	public Collection<String> split(String payload) {
              		String[] splits = payload.split(",");		
              		return Arrays.asList(splits);
              	}
              }
              and transformer

              Code:
              public class TransformerBean {
              
              	@PersistenceContext
              	private EntityManager em;
              
              	@Transformer
              	@Transactional(propagation=Propagation.REQUIRED)
              	public String transform(String payload) {
              		String transformed = payload.toUpperCase();
              		SplitAudit audit = new SplitAudit();
              		audit.setSplit(transformed);
              		audit.setTime(new Date());
              		em.persist(audit);
              		return transformed;
              	}
              }
              SplitAudit is a simple POJO JPA entity and hence not included. (Please note that the example is a very poor use case; it was implemented just to check whether the problem occurs.)

              The input to the splitter is a simple String message with a payload such as "test,test1,test2".

              Also, can you try invoking the @Transformer-annotated method of your transformer bean from a JUnit test, passing it some dummy input value? It would be interesting to see the result. Ideally it should fail there too.



              • #8
                Thanks Amol for trying this out. We tried to replicate the issue with your config.

                If we make TransformerBean implement any interface, e.g. Cloneable, we get the stack trace below. The only way around it seems to be to make it also implement Transformer.

                Code:
                Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1004E:(pos 8): Method call: Method transform(org.springframework.core.convert.TypeDescriptor) cannot be found on $Proxy147 type
                	at org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:182)
                	at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:106)
                	at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:57)
                	at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:102)
                	at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:102)
                	at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:114)
                	at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:223)
                	at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:123)
                	at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:73)

                Can anyone from the Spring Integration team comment on whether this is an issue and whether it can be fixed?



                • #9
                  Hi Pankil,

                  Any reason why you want the transformer bean to implement the Cloneable interface?

                  You are getting this exception because the generated proxy is a JDK proxy as soon as your bean implements an interface. Using a CGLIB proxy should solve your problem.

                  You may implement any interfaces you want, but add <aop:config proxy-target-class="true"/> to your Spring config file.

                  This will ensure CGLIB proxies are generated for all eligible beans, and your exception will go away.



                  • #10
                    The CGLIB proxy should indeed solve the immediate issue, or you should be able to simply provide the method name in the config of your "transformer" element (e.g. method="transform" in addition to the ref).

                    Otherwise, you can also provide your own interface (like Transformer with the transform() method). That way that interface will also be implemented by the JDK proxy. If you take that approach, try moving the @Transformer annotation to the interface method declaration instead of the implementation.
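
The JDK proxy behaviour described in this thread can be reproduced with plain `java.lang.reflect.Proxy`. This is a minimal sketch, not the Spring Integration lookup itself: the `PayloadTransformer` interface and the helper methods are hypothetical names introduced for illustration. It shows that a JDK proxy exposes only the methods of the interfaces it was created for, so `transform` cannot be found when the bean implements only Cloneable.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

class JdkProxyMethodLookupDemo {

    // Hypothetical user-defined interface, as suggested in the thread.
    interface PayloadTransformer {
        String transform(String payload);
    }

    static class TransformerBean implements PayloadTransformer, Cloneable {
        @Override
        public String transform(String payload) {
            return payload.toUpperCase();
        }
    }

    // Builds a JDK proxy around the target that exposes only the given interfaces.
    static Object proxyFor(Object target, Class<?>... interfaces) {
        InvocationHandler handler = (proxy, method, args) -> method.invoke(target, args);
        return Proxy.newProxyInstance(
                JdkProxyMethodLookupDemo.class.getClassLoader(), interfaces, handler);
    }

    // True if a public transform(String) method is visible on the object's class.
    static boolean hasTransformMethod(Object candidate) {
        try {
            candidate.getClass().getMethod("transform", String.class);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        TransformerBean bean = new TransformerBean();
        Object cloneableOnly = proxyFor(bean, Cloneable.class);
        Object withOwnInterface = proxyFor(bean, Cloneable.class, PayloadTransformer.class);

        System.out.println("Cloneable-only proxy exposes transform: "
                + hasTransformMethod(cloneableOnly));
        System.out.println("own-interface proxy exposes transform: "
                + hasTransformMethod(withOwnInterface));
        System.out.println(((PayloadTransformer) withOwnInterface).transform("test"));
    }
}
```

Declaring the method on an interface of your own makes it part of the proxy's public surface, which is why that approach works without forcing CGLIB.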



                    • #11
                      Thanks all. Just to answer Amol: we do not actually want to implement Cloneable; it was just an example to try with the simple project posted on the forum.

                      One of my teammates has been trying a few things, and this is what we found:
                      Providing the method name in the transformer config does not work, but providing my own Transformer interface or adding <aop:config proxy-target-class="true"/> does.

                      We have got it working with the <aop:config proxy-target-class="true"/> approach. However, it seems that if one of our JPA repository interfaces has any method that is @Transactional, we get the exception below. We do not actually need to make this method @Transactional, but we would be interested to know how to resolve this in future.

                      Code:
                      Caused by: org.springframework.aop.framework.AopConfigException: Could not generate CGLIB subclass of class [class $Proxy132]: Common causes of this problem include using a final class or a non-visible class; nested exception is java.lang.IllegalArgumentException: Cannot subclass final class class $Proxy132
                      	at org.springframework.aop.framework.Cglib2AopProxy.getProxy(Cglib2AopProxy.java:212)
                      	at org.springframework.aop.framework.ProxyFactory.getProxy(ProxyFactory.java:112)
                      	at org.springframework.aop.framework.autoproxy.AbstractAutoProxyCreator.createProxy(AbstractAutoProxyCreator.java:476)
                      	at org.springframework.aop.framework.autoproxy.AbstractAutoProxyCreator.wrapIfNecessary(AbstractAutoProxyCreator.java:362)
                      	at org.springframework.aop.framework.autoproxy.AbstractAutoProxyCreator.postProcessAfterInitialization(AbstractAutoProxyCreator.java:322)
                      	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:407)
                      	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.postProcessObjectFromFactoryBean(AbstractAutowireCapableBeanFactory.java:1563)
                      	at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:162)
                      	... 38 more
                      Caused by: java.lang.IllegalArgumentException: Cannot subclass final class class $Proxy132
                      	at net.sf.cglib.proxy.Enhancer.generateClass(Enhancer.java:446)
                      	at net.sf.cglib.transform.TransformingClassGenerator.generateClass(TransformingClassGenerator.java:33)
                      	at net.sf.cglib.core.DefaultGeneratorStrategy.generate(DefaultGeneratorStrategy.java:25)
                      	at net.sf.cglib.core.AbstractClassGenerator.create(AbstractClassGenerator.java:216)
                      	at net.sf.cglib.proxy.Enhancer.createHelper(Enhancer.java:377)
                      	at net.sf.cglib.proxy.Enhancer.create(Enhancer.java:285)
                      	at org.springframework.aop.framework.Cglib2AopProxy.getProxy(Cglib2AopProxy.java:200)
                      	... 45 more



                      • #12
                        CGLIB creates a proxy by subclassing the class and overriding its methods. If a class is marked 'final', it can't be subclassed (as the exception states). CGLIB also can't override final methods.

                        I prefer the first solution (adding an interface to your transformer). This is generally considered good practice anyway (coding to interfaces), and it avoids these issues with forcing CGLIB proxying.
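
One likely reason for the "Cannot subclass final class $Proxy132" error is that the class being proxied is itself a JDK dynamic proxy, and JDK proxy classes are generated as final. The sketch below checks this with the JDK alone; `MarketRepository` is a hypothetical stand-in for a repository interface whose implementation is supplied as a JDK proxy, which is an assumption about the setup in this thread.

```java
import java.lang.reflect.Modifier;
import java.lang.reflect.Proxy;

class FinalProxyClassDemo {

    // Hypothetical repository-style interface; the name is illustrative only.
    interface MarketRepository {
        String findOne(String id);
    }

    // Creates a JDK dynamic proxy implementing the interface.
    static MarketRepository jdkProxy() {
        return (MarketRepository) Proxy.newProxyInstance(
                FinalProxyClassDemo.class.getClassLoader(),
                new Class<?>[] {MarketRepository.class},
                (proxy, method, args) -> {
                    if (method.getName().equals("findOne")) {
                        return "market-" + args[0];
                    }
                    throw new UnsupportedOperationException(method.getName());
                });
    }

    // True if the runtime class of the object is declared final.
    static boolean isFinalClass(Object candidate) {
        return Modifier.isFinal(candidate.getClass().getModifiers());
    }

    public static void main(String[] args) {
        MarketRepository repository = jdkProxy();
        System.out.println("is JDK proxy: " + Proxy.isProxyClass(repository.getClass()));
        System.out.println("class is final: " + isFinalClass(repository));
    }
}
```

Because a final class cannot be subclassed, a subclass-based proxy cannot be layered on top of such an object, whereas an interface-based proxy can wrap it.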



                        • #13
                          I agree that programming to interfaces is the better solution, so we will probably just go with that if we need to. But we do not have a final or private class, so I guess the proxies generated by JPA are the problem.

                          If you change the simple example posted by Amol to introduce a gateway like this, you will get the same exception. I guess any generated proxy has to be final?

                          Code:
                              <integration:service-activator input-channel="spiltterOutputChannel" output-channel="outputChannel" ref="transform.tx" method="transform"/>
                              <integration:gateway id="transform.tx" default-request-channel="transformChannel" service-interface="simple.Transformer" />
                              
                              <bean id="transformer" class="com.nomura.spb.services.valuationservice.TransformerBean"/>
                              <integration:transformer input-channel="transformChannel" ref="transformer"/>
                          
                          public interface Transformer {
                          
                              @Transactional
                              public String transform(String payload);
                          
                          }
