
  • Group message and JMS transaction

    Dear All,

    I have a simple question.
    Can I handle all the elements (messages) of a message group in one JMS transaction?

    Message handling without a transaction already works. But if one element of the group fails, only the last message is rolled back. My problem is also a bit more complicated: processing of a message is not finished at the point where the message is received. I need to roll back every part of the group when processing of a message does not succeed (malformed XML, invalid message type, and so on; in other words, processing errors).

    My configuration:

    Code:
    	<jms:message-driven-channel-adapter 
    		id="MQInputBean"
    		destination="inputDestination"
    		connection-factory="mqConnFact"
    		channel="inputChannel" 
    		error-channel="errorChannel"
    		auto-startup="true"
    		transaction-manager="jmsTrnManager"  
    		acknowledge="transacted"/> 
    
    	             
    	<bean id="testTransformerBean" class="MQTransformer" />
    
    	<bean id="messAggregator" class="MQMessageAggregator" />
    	
    	<bean id="inputRouterProc" class="InputRouterProc" />
    
    	<int:channel id="routingMessage" />
    
    	<int:chain input-channel="inputChannel" output-channel="routingMessage">
    		<int:transformer ref="testTransformerBean" method="transform" />
    		<int:aggregator ref="messAggregator" method="add" 
    		                release-strategy="messAggregator" release-strategy-method="messageEnd" 
    		                expire-groups-upon-completion="true"
    		                correlation-strategy="messAggregator" correlation-strategy-method="getCorrelId" />
    	</int:chain>
    
    	<int:router id="inputRouter" input-channel="routingMessage" ref="inputRouterProc" />
            
            Targets of the router:
    
    	<int:chain input-channel="startHolydayProcess" >
    		<int-xml:xpath-splitter create-documents="true" >
    			<int-xml:xpath-expression expression="/xxxx/APPLICATION[APP.NAME='HOLIDAY']/RECORD" />
    		</int-xml:xpath-splitter>
    		<int:transformer ref="holidayUnmarshaller" />
    		<int:aggregator />
    		<int:service-activator ref="holidayProcessor" method="process" />
    	</int:chain>
    
    	<int:chain input-channel="startGlobusProcess"> 
    		<int:service-activator ref="processXmlObjConv" method="getRpObj" />
    		<int:service-activator ref="processGlobusAnswer" method="procAnswer" />
    	</int:chain>
    The transaction should finish here.

    Do you have any idea?

    Thank you!
    Feri

  • #2
    No; each inbound message from the message-driven-adapter is in its own transaction.

    You would need your own custom code (not a message listener container) to consume multiple messages in the same transaction, and then send them into your Spring Integration flow, perhaps using a <gateway/>.


    • #3
      Hi Gary,

      Thank you for your fast answer.
      So you are saying that I have to write my own poller, message receiver, and aggregator to handle the transaction?
      What do you think: can I use the API, or do I need JMS?

      Thx!
      Feri


      • #4
        Use a JmsTemplate configured to create transactional sessions.

        Use the execute() method with a SessionCallback: create a consumer, receive messages from it, build a list of payloads, and send them to your <router/> using a <gateway/>. When the thread returns, call session.commit() (or rollback() if you catch an exception).
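
        The steps above can be sketched as follows. This is a minimal, JDK-only model rather than real JMS code: FakeTransactedSession is a hypothetical stand-in for a transacted javax.jms.Session together with its MessageConsumer, the "LAST:" prefix is an assumed end-of-group marker, and the Consumer argument plays the role of the <gateway/> call. With a JmsTemplate, the body of receiveGroup would sit inside SessionCallback.doInJms(Session).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a transacted JMS session plus consumer; NOT the javax.jms API. */
final class FakeTransactedSession {
    private final Deque<String> queue = new ArrayDeque<>();
    private final List<String> uncommitted = new ArrayList<>();

    FakeTransactedSession(List<String> messages) {
        queue.addAll(messages);
    }

    /** Returns the next message, or null when the queue is empty (like a receive timeout). */
    String receive() {
        String message = queue.poll();
        if (message != null) {
            uncommitted.add(message);
        }
        return message;
    }

    /** Makes every receive since the last commit or rollback permanent. */
    void commit() {
        uncommitted.clear();
    }

    /** Puts every uncommitted message back at the head of the queue, in the original order. */
    void rollback() {
        for (int i = uncommitted.size() - 1; i >= 0; i--) {
            queue.addFirst(uncommitted.get(i));
        }
        uncommitted.clear();
    }

    int remaining() {
        return queue.size();
    }
}

final class GroupReceiver {
    /** Assumed end-of-group marker; a real flow would more likely check a JMS property. */
    static final String LAST = "LAST:";

    /**
     * Receives one whole group, hands the payloads to the flow, then commits.
     * An incomplete group or any exception from the flow rolls back every received message.
     */
    static boolean receiveGroup(FakeTransactedSession session, Consumer<List<String>> flow) {
        List<String> payloads = new ArrayList<>();
        try {
            boolean complete = false;
            String message;
            while ((message = session.receive()) != null) {
                payloads.add(message);
                if (message.startsWith(LAST)) {
                    complete = true;
                    break;
                }
            }
            if (!complete) {
                session.rollback(); // the group did not arrive in full; try again later
                return false;
            }
            flow.accept(payloads); // stands in for the <gateway/> call into the flow
            session.commit();
            return true;
        } catch (RuntimeException e) {
            session.rollback(); // a processing error undoes the whole group, not only the last message
            return false;
        }
    }
}
```

        Because commit() is reached only after the flow returns, a processing error anywhere downstream puts every message of the group back on the queue, assuming the flow runs on the calling thread so that the exception propagates back to the caller.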


        • #5
          Spring Batch provides something along these lines:
          http://static.springsource.org/sprin...Container.html


          • #6
            Hi Gary,

            I tried to do what you suggested, but I think my knowledge is not enough.

            My new config:
            Code:
            	<bean id="mqConnFact" class="com.ibm.mq.jms.MQQueueConnectionFactory"> 
            		<property name="hostName" value="${MQ_HOST}" />
            		<property name="port" value="${MQ_PORT}" />
            		<property name="queueManager" value="${MQ_QUEUE_MANAGGER}" />
            		<property name="CCSID" value="${MQ_CCSID}" />
             		<property name="channel" value="${MQ_CHANNEL}" />
            		<property name="transportType">
            			<util:constant static-field="com.ibm.mq.jms.JMSC.MQJMS_TP_CLIENT_MQ_TCPIP" />
            		</property>
            	</bean>
            
                <bean id="jmsTrnManager" class="org.springframework.jms.connection.JmsTransactionManager">
                    <property name="connectionFactory" ref="mqConnFact" />
                    <property name="" />
                </bean>
            
            	<int:annotation-config />
            
            	<bean id="inputDestination" class="com.ibm.mq.jms.MQQueue">
            		<constructor-arg value="${MQ_OUTPUT_QUEUE}" />
            	</bean>
            
            	<!-- JMS Queue Template -->
            	<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
            		<property name="connectionFactory" ref="jmsQueueConnectionFactory" />
            		<property name="destinationResolver" ref="jmsDestinationResolver" />
            		<property name="pubSubDomain" value="false" />
            		<property name="receiveTimeout" value="20000" />
            		<property name="sessionTransacted" value="true"/>
            	</bean>
             	
                <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
                    <property name="connectionFactory" ref="mqConnFact" />
                    <property name="sessionTransacted" value="true" />
                    <property name="transactionManager" ref="jmsTrnManager" />
                    <property name="destination" ref="inputDestination" />
                    <property name="messageListener" ref="imputXMLByTemplate" />
                    <property name="autoStartup" value="true" />
                </bean>
            The listener proc:
            Code:
                @Transactional(value="jmsTrnManager", rollbackFor={Exception.class})
                public void onMessage(javax.jms.Message message)
                {
                    Message<Object> convertedMessage = null;
            
                    try
                    {
                        Map<String, Object> mappedHeaders = this.headerMapper.toHeaders(message);
                        MessageConverter converter = this.jmsTemplate.getMessageConverter();
                        Object convertedObject = converter.fromMessage(message);
                        MessageBuilder<Object> builder;
                        builder = (convertedObject instanceof Message) ? MessageBuilder.fromMessage((Message<Object>) convertedObject) : MessageBuilder.withPayload(convertedObject);
                        convertedMessage = builder.copyHeadersIfAbsent(mappedHeaders).build();
            
                        MessagingTemplate mt = new MessagingTemplate();
                        mt.send(inputChannel, convertedMessage);
                    }
                    catch (Exception e)
                    {
                        e.printStackTrace();
                    }
                }
            But it is not working.
            Could you send me an example?

            Thank you!
            Feri


            • #7
              As Mark pointed out above, probably the best solution is to use the BatchMessageListenerContainer instead of the DefaultMessageListenerContainer.

              But there's an example of how to use the JmsTemplate with a SessionCallback here: http://forum.springsource.org/showth...842#post441842

              That example shows how to batch acks; you'd use a similar technique to commit the session after some number of messages.
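
              Committing after a fixed number of messages might look like the sketch below. It is a JDK-only model, not the linked example: CountingSession is a hypothetical stand-in for a transacted JMS session (not the javax.jms API), and batchSize is an assumed parameter.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for a transacted session that records its commits; NOT the javax.jms API. */
final class CountingSession {
    private final Deque<String> queue;
    private int pending = 0;
    int commits = 0;
    int committedMessages = 0;

    CountingSession(List<String> messages) {
        this.queue = new ArrayDeque<>(messages);
    }

    /** Returns the next message, or null when the queue is empty (like a receive timeout). */
    String receive() {
        String message = queue.poll();
        if (message != null) {
            pending++;
        }
        return message;
    }

    /** Commits every message received since the previous commit. */
    void commit() {
        commits++;
        committedMessages += pending;
        pending = 0;
    }
}

final class BatchCommitter {
    /**
     * Drains the session, committing after every batchSize messages and once more
     * for a trailing partial batch. Returns the number of messages handled.
     */
    static int drain(CountingSession session, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        int inBatch = 0;
        int total = 0;
        while (session.receive() != null) {
            total++;
            inBatch++;
            if (inBatch == batchSize) {
                session.commit();
                inBatch = 0;
            }
        }
        if (inBatch > 0) {
            session.commit(); // do not leave the last partial batch uncommitted
        }
        return total;
    }
}
```

              For message groups, the fixed count would be replaced by an end-of-group check, so that a commit never splits a group.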


              • #8
                Hi Mark,

                I have a small problem. I tried to use the container you suggested, but I cannot find it. I added spring-batch-core and spring-batch-integration to my pom, but they did not pull in the jar that contains it.
                Could you help me?

                Thank you!
                Feri
