Missing message ID for stateful retry operations

  • Missing message ID for stateful retry operations

    I'm trying to configure StatefulRetryOperationsInterceptorFactoryBean to reproduce the retry and dead letter queue behavior I have in a JMS implementation. But the retry is raising a fatal exception because the message ID header is absent.

    How do I add a message ID header to my messages, which are sent through a Spring Integration AMQP outbound channel adapter?

    Code:
    	<int-amqp:outbound-channel-adapter channel="toRabbitJson" amqp-template="amqpTemplate" />
    Code:
    	<bean id="retryOperationsInterceptor" class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
    		<property name="retryOperations">
    			<bean class="org.springframework.retry.support.RetryTemplate">
    				<property name="retryPolicy">
    					<bean class="org.springframework.retry.policy.SimpleRetryPolicy">
    						<property name="maxAttempts" value="2" />
    					</bean>
    				</property>
    			</bean>
    		</property>
    		<property name="messageRecoverer">
    			<bean class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer" />
    		</property>
    	</bean>

  • #2
    The sending system needs to set the messageId header on the message to a unique value.
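
    Some background on why the ID matters: stateful retry has to recognise a redelivered message as the same one it already saw fail, and by default the message ID is the key it uses for that. Below is a minimal, library-free Java sketch of this bookkeeping. The class `StatefulRetrySketch` and its method `recordFailure` are hypothetical names chosen for illustration; they are not part of Spring AMQP or Spring Retry, and the real interceptor is more involved.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: not a Spring AMQP or Spring Retry class.
final class StatefulRetrySketch {
    // Failed attempts seen so far, keyed by message ID.
    private final Map<String, Integer> attemptsById = new HashMap<>();
    private final int maxAttempts;

    StatefulRetrySketch(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /**
     * Records one failed delivery of the given message.
     * Returns true if the message should be redelivered,
     * false once maxAttempts failures have been recorded.
     */
    boolean recordFailure(String messageId) {
        if (messageId == null) {
            // Without a key there is no way to match redeliveries to earlier failures.
            throw new IllegalStateException("Cannot track retries without a message ID");
        }
        int attempts = attemptsById.merge(messageId, 1, Integer::sum);
        if (attempts >= maxAttempts) {
            attemptsById.remove(messageId); // retries exhausted, forget the state
            return false;
        }
        return true;
    }
}
```

    With `maxAttempts` set to 2, as in the configuration above, the first failure of a given ID asks for a redelivery and the second reports that retries are exhausted. A message without an ID cannot be tracked at all, which corresponds to the fatal exception described in the question.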



    • #3
      On the outbound adapter, inject a SimpleMessageConverter, that has the 'createMessageIds' property set to true, into the 'amqpTemplate'.
      Last edited by Gary Russell; May 6th, 2013, 01:30 PM.



      • #4
        Correction: the property name is 'createMessageIds'.



        • #5
          I can't find in the documentation how to inject the message converter on the outbound adapter, only on the inbound adapter. http://static.springsource.org/sprin...html/amqp.html

          I declared the bean here:
          Code:
          	<bean id="simpleMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter">
          		<property name="createMessageIds" value="true" />
          	</bean>
          But I can't inject it into the outbound adapter.



          • #6
            I updated the post a little while ago to clarify that it goes on the template.



            • #7
              Great, it worked!

              My RabbitMQ AMQP producer, which now generates a Java UUID for the message ID header:
              Code:
              	<bean id="simpleMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter">
              		<property name="createMessageIds" value="true" />
              	</bean>
              	<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="amqp.direct" message-converter="simpleMessageConverter"/>
              And the consumer, with stateful retries and reprocessing intervals (backoff policy):
              Code:
              	<int-amqp:inbound-channel-adapter channel="fromRabbit" queue-names="ha.queue" connection-factory="connectionFactory" channel-transacted="true" concurrent-consumers="20" advice-chain="retryOperationsInterceptor" />
              	<bean id="retryOperationsInterceptor" class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
              		<property name="retryOperations">
              			<bean class="org.springframework.retry.support.RetryTemplate">
              				<property name="retryPolicy">
              					<bean class="org.springframework.retry.policy.SimpleRetryPolicy">
              						<property name="maxAttempts" value="5" />
              					</bean>
              				</property>
              				<property name="backOffPolicy">
              					<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
              						<property name="initialInterval" value="5000" />
              						<property name="maxInterval" value="120000" />
              						<property name="multiplier" value="2" />
              					</bean>		
              				</property>				
              			</bean>
              		</property>
              		<property name="messageRecoverer">
              			<bean class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer" />
              		</property>
              	</bean>
              Thanks!
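
              To make the reprocessing intervals concrete, here is a small sketch of the wait schedule those backoff values imply. It assumes plain exponential backoff: one wait between consecutive attempts, starting at `initialInterval`, multiplied by `multiplier` after each wait, and capped at `maxInterval`. `BackOffSchedule` and `delays` are hypothetical names, and the code does not call Spring Retry, so treat it as an approximation of the policy rather than its implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: computes the waits, does not use Spring Retry.
final class BackOffSchedule {

    /**
     * Returns the wait in milliseconds before each retry.
     * maxAttempts attempts are separated by maxAttempts - 1 waits.
     */
    static List<Long> delays(long initialInterval, double multiplier,
                             long maxInterval, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        long interval = initialInterval;
        for (int wait = 1; wait < maxAttempts; wait++) {
            result.add(Math.min(interval, maxInterval));
            // Grow the interval for the next wait, never beyond the cap.
            interval = (long) Math.min(interval * multiplier, (double) maxInterval);
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the consumer configuration above.
        System.out.println(delays(5000, 2.0, 120000, 5)); // prints [5000, 10000, 20000, 40000]
    }
}
```

              Under these assumptions the five attempts are spread over about 75 seconds of waiting (5 s + 10 s + 20 s + 40 s), and the 120000 ms cap would only come into play if `maxAttempts` were raised to 7 or more.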

