Announcement Announcement Module
Collapse
No announcement yet.
Error handling using message-driven-channel-adapter Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Error handling using message-driven-channel-adapter

    Hi,

    I'm using spring integration 2.1.3.RELEASE with ActiveMQ.
    This is my configuration :

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:context="http://www.springframework.org/schema/context"
    	xmlns:p="http://www.springframework.org/schema/p" xmlns:int-stream="http://www.springframework.org/schema/integration/stream" xmlns:mongo="http://www.springframework.org/schema/data/mongo"
    	xmlns:int-jmx="http://www.springframework.org/schema/integration/jmx" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
    	xsi:schemaLocation="http://www.springframework.org/schema/data/mongo http://www.springframework.org/schema/data/mongo/spring-mongo-1.0.xsd
            http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp-2.1.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.1.xsd
            http://www.springframework.org/schema/integration/jmx http://www.springframework.org/schema/integration/jmx/spring-integration-jmx-2.1.xsd
            http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.1.xsd
            http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream-2.1.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    
    	<!-- *************** 1. General Configuration *************** -->
    	
        <!-- 1.1 Model-to-message converter.
             Project converter bound to the Bidon model class; it is referenced by
             every JMS channel adapter in this file through message-converter. -->
        <bean id="jaxbMessageConverter" class="ca.sunmedia.test.converter.JaxbMessageConverter">
            <!-- Marshaller settings, keyed by JAXB property name. -->
            <property name="marshallerProperties">
                <map>
                    <entry key="jaxb.encoding">
                        <value type="java.lang.String">UTF-8</value>
                    </entry>
                    <entry key="jaxb.formatted.output">
                        <value type="java.lang.Boolean">true</value>
                    </entry>
                </map>
            </property>
            <!-- Classes made known to the converter. -->
            <property name="classesToBeBound">
                <list>
                    <value>ca.sunmedia.test.model.Bidon</value>
                </list>
            </property>
        </bean>
    
    
    	<!-- 1.2 Activates annotation processing on the beans of this context
    	     (BidonService relies on @Autowired and @Qualifier for entryChannel). -->
    	<context:annotation-config />
    
    	<!-- 1.3 Activates message history tracking for all messages -->
    	<int:message-history />
    
    	<!-- 1.4 Component scanning: registers the annotated classes found under the
    	     service implementation package (the package BidonService is declared in) -->
    
    	<context:component-scan base-package="ca.sunmedia.test.service.impl" />
    
    
    	<!-- *************** 2 ActiveMQ configurations *************** -->
    
    	<!-- 2.1 ActiveMQ connection configuration.
    	     A caching connection factory wrapping an ActiveMQ connection factory that
    	     targets the broker at tcp://localhost:61616. Simple values are supplied
    	     through the p-namespace declared on the root element. -->
    	<bean id="connectionFactory"
    		class="org.springframework.jms.connection.CachingConnectionFactory"
    		p:sessionCacheSize="10"
    		p:cacheProducers="false">
    		<property name="targetConnectionFactory">
    			<bean class="org.apache.activemq.ActiveMQConnectionFactory"
    				p:brokerURL="tcp://localhost:61616" />
    		</property>
    	</bean>
    		
    	<!-- 2.2 Channels: one per step of the flow wired further down
    	     (entry, after splitter, after process, after aggregator). -->
    	<int:channel id="entryChannel" />
    	<int:channel id="afterSplitterChannel" />
    	<int:channel id="afterProcessChannel" />
    	<int:channel id="afterAggregatorChannel" />
    
    	<!-- 2.3 Queue and adapter pairs.
    	     Every channel above is bound to one ActiveMQ queue in both directions:
    	       * message-driven-channel-adapter: queue to channel
    	       * outbound-channel-adapter:       channel to queue (persistent delivery)
    	     All adapters share the jaxbMessageConverter bean.
    	     NOTE(review): none of the message-driven adapters declares an error-channel
    	     attribute; presumably an exception thrown downstream therefore reaches the
    	     JMS listener container. TODO confirm against the redelivery loop described
    	     in the post.
    	     NOTE(review): each channel is consumed both by its outbound adapter and by
    	     the endpoint declared for it further down. Verify that this double
    	     subscription is intended. -->

    	<!-- Queue "1.entry", bound to entryChannel -->
    	<bean id="entryQueue" class="org.apache.activemq.command.ActiveMQQueue">
    		<constructor-arg value="1.entry" />
    	</bean>
    	<int-jms:message-driven-channel-adapter channel="entryChannel" destination="entryQueue" message-converter="jaxbMessageConverter"/>
    	<int-jms:outbound-channel-adapter channel="entryChannel" destination="entryQueue" explicit-qos-enabled="true" delivery-persistent="true" extract-payload="true" message-converter="jaxbMessageConverter"/>
    	
    	<!-- Queue "2.afterSplitter", bound to afterSplitterChannel -->
    	<bean id="afterSplitterQueue" class="org.apache.activemq.command.ActiveMQQueue">
    		<constructor-arg value="2.afterSplitter" />
    	</bean>	
    	<int-jms:message-driven-channel-adapter channel="afterSplitterChannel" destination="afterSplitterQueue" message-converter="jaxbMessageConverter"/>
    	<int-jms:outbound-channel-adapter channel="afterSplitterChannel" destination="afterSplitterQueue" explicit-qos-enabled="true" delivery-persistent="true" extract-payload="true" message-converter="jaxbMessageConverter"/>
    	
    	<!-- Queue "3.afterProcess", bound to afterProcessChannel -->
    	<bean id="afterProcessQueue" class="org.apache.activemq.command.ActiveMQQueue">
    		<constructor-arg value="3.afterProcess" />
    	</bean>	
    	<int-jms:message-driven-channel-adapter channel="afterProcessChannel" destination="afterProcessQueue" message-converter="jaxbMessageConverter"/>
    	<int-jms:outbound-channel-adapter channel="afterProcessChannel" destination="afterProcessQueue" explicit-qos-enabled="true" delivery-persistent="true" extract-payload="true" message-converter="jaxbMessageConverter"/>
    		
    	<!-- Queue "4.afterAggregator", bound to afterAggregatorChannel -->
    	<bean id="afterAggregatorQueue" class="org.apache.activemq.command.ActiveMQQueue">
    		<constructor-arg value="4.afterAggregator" />
    	</bean>	
    	<int-jms:message-driven-channel-adapter channel="afterAggregatorChannel" destination="afterAggregatorQueue" message-converter="jaxbMessageConverter" />
    	<int-jms:outbound-channel-adapter channel="afterAggregatorChannel" destination="afterAggregatorQueue" explicit-qos-enabled="true" delivery-persistent="true" extract-payload="true" message-converter="jaxbMessageConverter"/>
    	 
    	<!-- 3. Message flow. Every endpoint delegates to the bidonService bean:
    	       entryChannel           : split               to afterSplitterChannel
    	       afterSplitterChannel   : processMessage      to afterProcessChannel
    	       afterProcessChannel    : aggregate           to afterAggregatorChannel
    	       afterAggregatorChannel : readAggregatedResult (no output channel) -->
    	<int:splitter input-channel="entryChannel" output-channel="afterSplitterChannel" ref="bidonService" method="split" />
    	<int:service-activator input-channel="afterSplitterChannel" output-channel="afterProcessChannel" ref="bidonService" method="processMessage" />
    	
    	<!-- Disabled: stand-alone correlation strategy bean. The aggregator below
    	     names getCorrelationKey as its correlation-strategy-method instead. -->
    	<!-- 
    	<bean id="JMSCorrelationStrategy"
            class="ca.sunmedia.test.aggregator.strategy.JMSCorrelationStrategy">
        </bean>
        -->
        
    	<int:aggregator input-channel="afterProcessChannel" output-channel="afterAggregatorChannel" ref="bidonService" method="aggregate" correlation-strategy-method="getCorrelationKey"/>
    	<int:service-activator input-channel="afterAggregatorChannel" ref="bidonService" method="readAggregatedResult" />
    	
    
    	
    	
    </beans>
    The services are in this class :

    Code:
    package ca.sunmedia.test.service.impl;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.integration.Message;
    import org.springframework.integration.MessageChannel;
    import org.springframework.integration.annotation.CorrelationStrategy;
    import org.springframework.integration.jms.JmsHeaders;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    
    import ca.sunmedia.test.model.Bidon;
    
    
    /**
     * Demo service backing the split / process / aggregate flow declared in the
     * Spring Integration configuration. Each public method is referenced by name
     * from an endpoint of that configuration (splitter, service-activator,
     * aggregator), so method names and signatures must stay in sync with the XML.
     */
    @Component(value="bidonService")
    public class BidonService {
    
    	/** Number of messages produced by {@link #split(Message)}; also used as the sequence size. */
    	private static final int SPLIT_COUNT = 5;
    
    	/** Correlation id stamped on every message produced by {@link #split(Message)}. */
    	private static final String CORRELATION_VALUE = "test123";
    
    	@Autowired
    	@Qualifier("entryChannel")
    	private MessageChannel entryChannel;
    
    	/**
    	 * Starts the flow by sending one {@link Bidon} with id "start" to the entry channel.
    	 */
    	public void start() {
    		Bidon bidon = new Bidon();
    		bidon.setId("start");
    		Message<Bidon> message = MessageBuilder.withPayload(bidon).build();
    		entryChannel.send(message);
    	}
    
    	/**
    	 * Produces {@link #SPLIT_COUNT} messages whose payload ids are "1".."5".
    	 * The incoming message is only a trigger: its content is not read.
    	 *
    	 * @param input the message that triggered the split (unused)
    	 * @return the generated messages, each carrying the shared correlation id,
    	 *         the sequence size and its own sequence number
    	 */
    	public List<Message<Bidon>> split(Message<Bidon> input) {
    		List<Message<Bidon>> messages = new ArrayList<Message<Bidon>>(SPLIT_COUNT);
    
    		for (int i = 1; i <= SPLIT_COUNT; i++) {
    			Bidon bidon = new Bidon();
    			bidon.setId(String.valueOf(i));
    			Message<Bidon> message = MessageBuilder.withPayload(bidon)
    					.setHeader(JmsHeaders.CORRELATION_ID, CORRELATION_VALUE)
    					.setSequenceSize(SPLIT_COUNT)
    					.setSequenceNumber(i)
    					.build();
    			messages.add(message);
    		}
    
    		return messages;
    	}
    
    	/**
    	 * Correlation key used by the aggregator: the JMS correlation id header.
    	 *
    	 * @param message the message to correlate
    	 * @return the value of the {@link JmsHeaders#CORRELATION_ID} header, or
    	 *         {@code null} when the header is absent
    	 */
    	@CorrelationStrategy
    	public Object getCorrelationKey(Message<?> message) {
    		return message.getHeaders().get(JmsHeaders.CORRELATION_ID);
    	}
    
    	/**
    	 * Processes one split message and returns a copy carrying the same
    	 * correlation id and sequence details.
    	 *
    	 * @param message the message to process
    	 * @return a new message with the same payload, correlation id, sequence
    	 *         size and sequence number
    	 * @throws Exception always, in the current state of the code (see below)
    	 */
    	public Message<Bidon> processMessage(Message<Bidon> message) throws Exception {
    		System.out.println("processed " + message.getPayload().getId());
    
    		// Deliberate failure used to reproduce the redelivery loop. Written as
    		// if(true) so the code below still compiles; remove to let the flow complete.
    		if(true) {
    			throw new Exception("dummy exception");
    		}
    
    		// Typed as Message<Bidon> (previously a raw Message) so the return is
    		// checked by the compiler instead of relying on an unchecked conversion.
    		Message<Bidon> returnMessage = MessageBuilder.withPayload(message.getPayload())
    				.setHeader(JmsHeaders.CORRELATION_ID, message.getHeaders().get(JmsHeaders.CORRELATION_ID))
    				.setSequenceSize(message.getHeaders().getSequenceSize())
    				.setSequenceNumber(message.getHeaders().getSequenceNumber())
    				.build();
    
    		return returnMessage;
    	}
    
    	/**
    	 * Merges a group of messages into one whose payload id is the concatenation
    	 * of the group's payload ids, in list order.
    	 *
    	 * @param messages the correlated messages
    	 * @return a single message holding the concatenated id
    	 */
    	public Message<Bidon> aggregate(List<Message<Bidon>> messages) {
    		StringBuilder sb = new StringBuilder();
    
    		for(Message<Bidon> message : messages) {
    			sb.append(message.getPayload().getId());
    		}
    
    		Bidon bidon = new Bidon();
    		bidon.setId(sb.toString());
    
    		return MessageBuilder.withPayload(bidon).build();
    	}
    
    	/**
    	 * Final step of the flow: prints the aggregated payload id to standard output.
    	 *
    	 * @param message the aggregated message
    	 */
    	public void readAggregatedResult(Message<Bidon> message) {
    		System.out.println("end : " + message.getPayload().getId());
    	}
    
    }
    The problem is that when an exception occurs, the message is reinserted into the queue, which creates a loop. In the above class, I generate a dummy exception in the processMessage method.

    I've read about specifying the error-channel attribute on the message-driven-channel-adapter, but that didn't fix my loop problem.

    Instead of an infinite loop, I would like the exception to be stored in a channel. On this channel, I would log its content.

    Does anyone know why the message keeps being requeued?

    I have uploaded the maven eclipse project to be able to test.

    Thank you !

  • #2
    Maybe it is caused by the redistributed flag being set to true by default. I tried to set it to false this way, without success:

    <!-- ActiveMQ redelivery settings, referenced by the connection factory below.
         maximumRedeliveries is 0 here, the setting the post tried in order to stop
         the redelivery loop. -->
    <bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <property name="initialRedeliveryDelay" value="15000" />
        <property name="maximumRedeliveries" value="0" />
        <property name="useExponentialBackOff" value="true" />
        <property name="backOffMultiplier" value="2" />
    </bean>

    <!-- Caching connection factory wrapping an ActiveMQ connection factory that
         uses the redeliveryPolicy bean. The class names are written without the
         stray spaces the forum software inserted ("CachingC onnectionFactory",
         "ActiveMQConnectionFacto ry"), which made them unresolvable. -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://localhost:61616" />
                <property name="redeliveryPolicy" ref="redeliveryPolicy" />
            </bean>
        </property>
        <property name="sessionCacheSize" value="10" />
        <property name="cacheProducers" value="false" />
    </bean>

    I still don't know how to tell SI to send exceptions to the errorChannel.

    Comment

    Working...
    X