
  • JMS Input -> Service Problemo

    Hello,

    I was wondering if anybody can help.

    I'm trying to post a message onto a queue and have a spring-integration pipeline process it, ending in a service call.

    I've got a couple of integration tests set up. One tests the pipeline without any JMS, and that one works.


    The second test brings up an embedded ActiveMQ broker and posts a message onto it. This is the one that fails.

    It seems that I post my message OK, but it doesn't progress down the channels, filters, etc. until the test has ended and the beans get destroyed! At that point the very first filter gets hit, and then an exception gets thrown.

    The exception is shown below, followed by the skeleton config. I've been tearing my hair out trying to figure out what's going on.

    I haven't got much hair.

    To save the few remaining strands could somebody tell me if I'm doing something stupid here?

    Thanks,

    Vince.

    The exception is thrown when the test is torn down:

    Code:
    org.springframework.integration.dispatcher.AggregateMessageDeliveryException: All attempts to deliver Message to MessageHandlers failed.
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.handleExceptions(UnicastingDispatcher.java:143)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:112)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:90)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:43)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:116)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:94)
    	at org.springframework.integration.channel.MessageChannelTemplate.doSend(MessageChannelTemplate.java:223)
    	at org.springframework.integration.channel.MessageChannelTemplate.send(MessageChannelTemplate.java:180)
    	at org.springframework.integration.channel.MessageChannelTemplate.send(MessageChannelTemplate.java:168)
    	at org.springframework.integration.jms.ChannelPublishingJmsMessageListener.onMessage(ChannelPublishingJmsMessageListener.java:203)
    	at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:518)
    	at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:479)
    	at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:451)
    	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:323)
    	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:261)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:982)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:974)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:876)
    	at java.lang.Thread.run(Thread.java:735)

    Config:


    Code:
     <amq:broker>
            <amq:transportConnectors>
                <amq:transportConnector uri="tcp://localhost:0"/>
            </amq:transportConnectors>
        </amq:broker>
    
        <!--  ActiveMQ destinations to use  -->
        <amq:queue id="inputQueue" physicalName="blah.input.name"/>
    
        <!-- JMS ConnectionFactory to use, configuring the embedded broker using XML -->
        <amq:connectionFactory id="connectionFactory" brokerURL="vm://localhost"/>
    
    
        <!-- Spring JMS Template -->
        <bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory">
                <!-- wrap in a SingleConnectionFactory (a single shared connection, not a pool) to avoid creating a connection per send -->
                <bean class="org.springframework.jms.connection.SingleConnectionFactory">
                    <property name="targetConnectionFactory">
                        <ref local="connectionFactory"/>
                    </property>
                </bean>
            </property>
        </bean>
    
    
        <!-- a pojo to put messages onto an input jms queue -->
        <bean id="testProducer" class="blah.TestProducer">
            <property name="jmsTemplate">
                <ref bean="myJmsTemplate"></ref>
            </property>
            <property name="queue">
                <ref bean="inputQueue"/>
            </property>
        </bean>
    
    
     <!-- =============== Receives messages from jms   ===============  -->
        <jms:message-driven-channel-adapter id="messageIn"
                                            destination="inputQueue"
                                            channel="wireFormatFIXMLTradeChannel"/>
        
    
    <channel id="wireFormatFIXMLTradeChannel">        
            <interceptors>
                <ref bean="infoLoggingChannelInterceptor"/>
            </interceptors>
        </channel>
    
        <!-- add a filter to remove messages we dont care about -->
        <filter input-channel="wireFormatFIXMLTradeChannel"
                output-channel="filteredFIXMLTradeChannel"
                discard-channel="rejectedFIXMLTradeChannel"
                ref="fixmlFilter"/>
    
        <!-- any messages filtered out get put on this -->
        <channel id="rejectedFIXMLTradeChannel"/>
    
        <!-- messages that reach this channel have been filtered -->
        <channel id="filteredFIXMLTradeChannel"/>
    
    
        <!-- FIXMLUnmarshaller -->
        <!-- Takes FIXML raw XML
             Unmarshals into FIXML POJOs
             Outputs FIXML POJOs -->
        <transformer id="FIXMLUnmarshaller"
                     input-channel="filteredFIXMLTradeChannel"
                     ref="FIXMLJAXBUnmarshaller"
                     output-channel="FIXMLTradeChannel"/>
    
        <!-- FIXMLTradeChannel -->
        <!-- Data on here are FIXML POJOs -->
        <channel id="FIXMLTradeChannel"/>
    
        <!-- takes the JAXB objects and maps them to a Message with a Collection<Trade> payload -->
        <transformer id="tradeTransformer"
                     input-channel="FIXMLTradeChannel"
                     ref="fixmlTradeTransformer"
                     output-channel="tradeCollectionOutputChannel"/>
    
        <!-- channel contains Messages with a Collection<Trade> payload -->
        <channel id="tradeCollectionOutputChannel"/>
    
        <!-- filters out invalid trades -->
        <filter input-channel="tradeCollectionOutputChannel"
                output-channel="validatedTradeChannel"
                discard-channel="failedTradeValidationChannel"
                ref="validTradeFilter"/>
    
        <!-- trades that pass validation get put on here -->
        <channel id="validatedTradeChannel"/>
        <!-- trades that fail validation get put on here -->
        <channel id="failedTradeValidationChannel"/>
    
        <service-activator id="tradeProcessorServiceActivator"
                           input-channel="validatedTradeChannel"                       
                           ref="tradeProcessorActivator"
                           method="processTrades"/>  
    
      <!-- there are some reference beans here, but I don't think they're important. -->

  • #2
    The only thing I notice initially is that you have "connectionFactory" as the bean name for your unwrapped ConnectionFactory, and the message-driven-channel-adapter will use that (based on its bean name) by default (unless you add the 'connection-factory' attribute explicitly). However, I don't see how that would impact anything here.

    When you say it's working without JMS, you mean you are testing by sending directly to the filter's input-channel?



    • #3
      Thanks for the response,

      I'm posting via a JMS producer that looks like this:


      Code:
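      import javax.jms.JMSException;
      import javax.jms.Message;
      import javax.jms.Queue;
      import javax.jms.Session;

      import org.springframework.jms.core.JmsTemplate;
      import org.springframework.jms.core.MessageCreator;

      public class TestProducer {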
      
          private JmsTemplate jmsTemplate;
          private Queue queue;
      
          public void setJmsTemplate(JmsTemplate jmsTemplate) {
              this.jmsTemplate = jmsTemplate;
          }
      
          public void setQueue(Queue queue) {
              this.queue = queue;
          }
      
          public void send(final String message) {        
              this.jmsTemplate.send(this.queue, new MessageCreator() {
                  public Message createMessage(Session session) throws JMSException {
                      return session.createTextMessage(message);
                  }
              });
          }
      
      }
      It's called from within an integration test that looks like this (I've removed some unrelated parts so that it's easier to read):

      Code:
      @RunWith(SpringJUnit4ClassRunner.class)
      public class TradePipelineJmsIntegrationTest {
      
      
          /**
           * used to send messages to an input queue.
           */
          @Autowired
          private TestProducer testProducer;
      
             
          @Test    
          public void testPipelineJMS() throws Exception {
              //this call just reads an xml string from a file.
              String stringXml = getFileAsString("/path/to/input/input_fixml.xml");
              testProducer.send(stringXml);
              // some tests follow
          }
      }
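
A note on the test above: the stack trace in the first post shows the message being delivered on the DefaultMessageListenerContainer's own thread, so the test method may return, and the context start shutting down, before the message has moved through the pipeline. That is only a guess from the trace, not something confirmed in this thread. Below is a minimal sketch of making a test wait for asynchronous delivery with a CountDownLatch. The class and method names (AsyncDeliverySketch, LatchingHandler, sendAndAwait) are hypothetical, and a plain in-memory queue plus a background thread stand in for the JMS queue and listener, so this is not the poster's actual setup.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: a test-side wait for a message consumed on another thread. */
public class AsyncDeliverySketch {

    /** Stand-in for the end of the pipeline: records the payload and releases the latch. */
    static class LatchingHandler {
        private final CountDownLatch latch = new CountDownLatch(1);
        private final AtomicReference<String> received = new AtomicReference<String>();

        void handle(String payload) {
            received.set(payload);
            latch.countDown(); // signal the waiting test thread
        }

        boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return latch.await(timeout, unit);
        }

        String received() {
            return received.get();
        }
    }

    /**
     * Puts a message on an in-memory queue, lets a background thread (the stand-in
     * for the JMS listener) consume it, and blocks until it is handled or the
     * timeout expires. Returns the handled payload, or null on timeout.
     */
    static String sendAndAwait(String message, long timeoutMillis) throws InterruptedException {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        final LatchingHandler handler = new LatchingHandler();

        Thread listener = new Thread(new Runnable() {
            public void run() {
                try {
                    handler.handle(queue.take()); // blocks until a message arrives
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        listener.setDaemon(true);
        listener.start();

        queue.put(message); // corresponds to testProducer.send(...) in the real test
        boolean delivered = handler.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return delivered ? handler.received() : null;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sendAndAwait("<FIXML/>", 2000));
    }
}
```

In the real test, the equivalent would be to have whatever sits at the end of the pipeline (for example a test double behind tradeProcessorActivator) count down a latch, and to await that latch after testProducer.send(...) before asserting anything.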



      • #4
        Vincent/Mark,

        I am having the same issue with a similar kind of set-up. Is there a solution to it?

        Also, when I look at the stack trace, I see another exception: "Not allowed to create destination; nested exception is javax.jms.InvalidDestinationException: Not allowed to create destination".
