  • Using ActiveMq Topics Instead of Queues

    Hi all,

    I'm trying to configure an integration between two ActiveMQ queues using JMS. Both publish topics for possible subscribers. I have this app-context configuration:

    Code:
    		<!-- Channel Definition -->
    		
    		<!-- This channel supports all the error messages from other channels -->		
    		<integration:channel id="errorChannel"></integration:channel>
    		
    		
    		<!-- Correlator channels -->		 
    		<integration:channel id="channel1"></integration:channel>
    		<!-- End correlator channels -->
    		
    		<!-- End Channel Definition -->
    		
    		<!-- Channel adapter definitions -->		
    		
    		<spring-jms:inbound-channel-adapter id="adapter1" channel="channel1"
    			connection-factory="connectionFact" destination="topicOut">
    		</spring-jms:inbound-channel-adapter>
    
    		<spring-jms:outbound-channel-adapter id="adapter2" channel="channel1"	connection-factory="connectionFact" destination="topicIn">
    		</spring-jms:outbound-channel-adapter>
    	
    		<!-- End channel adapter definitions -->
    		
    		<!-- ActiveMq Configuration -->
    		 
    		<bean id="connectionFact" class="org.apache.activemq.pool.PooledConnectionFactory" 
    			destroy-method="stop">
    			<property name="connectionFactory">
    				<bean class="org.apache.activemq.ActiveMQConnectionFactory">
    					<property name="brokerURL">
    					<value>tcp://localhost:61616</value>
    					</property>
    				</bean>
    			</property>
    		</bean>
    			
    		<!-- Topics -->    	    	
        	<bean id="topicOut" class="org.apache.activemq.command.ActiveMQTopic">
            	<property name="physicalName" value="${topic.event.out}"/>
        	</bean>
        	    	
        	<bean id="topicIn" class="org.apache.activemq.command.ActiveMQTopic">
            	<property name="physicalName" value="${topic.event.in}"/>
        	</bean>				
    		<!-- End Topics -->
    And I'm getting this error:

    Code:
    ERROR org.springframework.integration.channel.MessagePublishingErrorHandler  - failure occurred in messaging task with message: {message description}
    org.springframework.integration.message.MessageDeliveryException: Dispatcher has no subscribers.
    	at org.springframework.integration.dispatcher.SimpleDispatcher.dispatch(SimpleDispatcher.java:39)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:56)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:116)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:94)
    	at org.springframework.integration.channel.MessageChannelTemplate.doSend(MessageChannelTemplate.java:222)
    	at org.springframework.integration.channel.MessageChannelTemplate.send(MessageChannelTemplate.java:179)
    	at org.springframework.integration.endpoint.SourcePollingChannelAdapter.doPoll(SourcePollingChannelAdapter.java:78)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.innerPoll(AbstractPollingEndpoint.java:233)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.poll(AbstractPollingEndpoint.java:217)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:210)
    	at org.springframework.integration.scheduling.SimpleTaskScheduler$ErrorHandlingRunnableWrapper.run(SimpleTaskScheduler.java:307)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    	at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
    	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
    	at org.springframework.integration.scheduling.SimpleTaskScheduler$TriggeredTask.run(SimpleTaskScheduler.java:256)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    	at java.lang.Thread.run(Thread.java:619)
    Can anyone help me?

    Thanks A Lot!!!!

    NEStabur
    Last edited by nestabur; Nov 21st, 2008, 03:38 AM.

  • #2
    It looks like you need to add a handler subscribed to the 'errorChannel' or else add a <queue/> sub-element so that it is not a dispatching channel (that is what throws the exception - dispatcher has no subscribers). Also, note that a default 'errorChannel' (with a queue) is created behind the scenes automatically, so you can leave that out unless you have some custom configuration requirements.
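
    To illustrate the difference, here is a toy sketch in plain Java. The classes ToyDirectChannel and ToyQueueChannel are hypothetical stand-ins invented for this example, not the Spring Integration classes. It only models the idea that a dispatching channel fails when nobody is subscribed, while a queue-backed channel buffers the message until something polls it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical model of a dispatching channel (NOT the Spring Integration API).
class ToyDirectChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    // Hands the message straight to the first subscriber; fails if there is none.
    void send(String message) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("Dispatcher has no subscribers.");
        }
        subscribers.get(0).accept(message);
    }
}

// Hypothetical model of a channel with a queue (NOT the Spring Integration API).
class ToyQueueChannel {
    private final Queue<String> queue = new ConcurrentLinkedQueue<>();

    // Always succeeds: the message waits in the queue until it is polled.
    void send(String message) {
        queue.add(message);
    }

    // Returns the next buffered message, or null when the queue is empty.
    String receive() {
        return queue.poll();
    }
}

class ChannelDemo {
    public static void main(String[] args) {
        ToyDirectChannel direct = new ToyDirectChannel();
        try {
            direct.send("test");
        } catch (IllegalStateException e) {
            System.out.println("direct channel: " + e.getMessage());
        }

        ToyQueueChannel queued = new ToyQueueChannel();
        queued.send("test");
        System.out.println("queue channel: " + queued.receive());
    }
}
```

    In this model, subscribing a handler to the direct channel before sending also removes the failure, which corresponds to the first option mentioned above.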

    • #3
      Thanks Mark, it works. But now I don't receive anything from the output of the channel ...

      The code is the same ...

      • #4
        Can you provide some more details? How are you passing a Message?

        • #5
          As follows:

          Code:
          _jmsTemplate.send(_des, new MessageCreator() {
          					public Message createMessage(Session session)
          							throws JMSException {
          						// Empty JMS message carrying a single string property
          						Message msg = session.createMessage();
          						msg.setStringProperty("message", "test");
          						return msg;
          					}
          				});

          • #6
            And, how are you receiving the JMS reply?

            Also, you may want to enable DEBUG-level logging for now to see if that provides more insight.

            • #7
              Hi Mark,

              I'm not receiving any response from the channel. Even when I activate the error channel, all the messages are delivered there and the message is empty ...

              Here is my message-sending code:
              Code:
              _jmsTemplate.send(_des, new MessageCreator() {
              
              					public Message createMessage(Session session)
              							throws JMSException {
              						// TODO Auto-generated method stub
              						Message msg = session.createMessage();
              						msg.setStringProperty("message", "test");
              						return msg;
              					}
              				});
              				try {
              					sleep(_delay);
              				}
              				catch (InterruptedException e) {
              					// TODO Auto-generated catch block
              					WriteLog(e.getMessage());
              				}

              • #8
                I've activated the DEBUG level, but Spring didn't write anything to the log.

                Here is my trace:

                Code:
                log4j:ERROR Could not find value for key log4j.appender.myMail
                log4j:ERROR Could not instantiate appender named "myMail".
                0    [main] INFO  org.springframework.context.support.ClassPathXmlApplicationContext  - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@bfc8e0: display name [org.springframework.context.support.ClassPathXmlApplicationContext@bfc8e0]; startup date [Fri Nov 21 16:43:17 CET 2008]; root of context hierarchy
                84   [main] INFO  org.springframework.beans.factory.xml.XmlBeanDefinitionReader  - Loading XML bean definitions from class path resource [ApplicationContextInfraestructure.xml]
                353  [main] INFO  org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor  - Initializing ThreadPoolExecutor
                367  [main] INFO  org.springframework.beans.factory.support.DefaultListableBeanFactory  - Overriding bean definition for bean 'errorChannel': replacing [Root bean: class [org.springframework.integration.channel.QueueChannel]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] with [Generic bean: class [org.springframework.integration.channel.QueueChannel]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null]
                383  [main] INFO  org.springframework.beans.factory.support.DefaultListableBeanFactory  - Overriding bean definition for bean 'correlatedBasicEventOutput': replacing [Generic bean: class [org.springframework.integration.config.ConsumerEndpointFactoryBean]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] with [Generic bean: class [org.springframework.integration.config.ConsumerEndpointFactoryBean]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null]
                384  [main] INFO  org.springframework.beans.factory.support.DefaultListableBeanFactory  - Overriding bean definition for bean 'correlatedBasicEventInput': replacing [Generic bean: class [org.springframework.integration.endpoint.SourcePollingChannelAdapter]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] with [Generic bean: class [org.springframework.integration.config.ConsumerEndpointFactoryBean]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null]
                387  [main] INFO  org.springframework.beans.factory.xml.XmlBeanDefinitionReader  - Loading XML bean definitions from class path resource [testcontext.xml]
                442  [main] INFO  org.springframework.context.support.ClassPathXmlApplicationContext  - Bean factory for application context [org.springframework.context.support.ClassPathXmlApplicationContext@bfc8e0]: org.springframework.beans.factory.support.DefaultListableBeanFactory@16546ef
                525  [main] INFO  org.springframework.beans.factory.config.PropertyPlaceholderConfigurer  - Loading properties file from class path resource [config.properties]
                642  [main] INFO  org.springframework.integration.endpoint.PollingConsumer  - started correlatedBasicEventInput
                644  [main] INFO  org.springframework.integration.endpoint.PollingConsumer  - started correlatedBasicEventOutput
                645  [main] INFO  org.springframework.beans.factory.support.DefaultListableBeanFactory  - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@16546ef: defining beans [propertyConfigurer,errorChannel,org.springframework.integration.channel.MessagePublishingErrorHandler#0,taskScheduler,com.s2grupo.triton.integration.interceptors.ChannelInterceptor#0,com.s2grupo.triton.integration.interceptors.ChannelInterceptor#1,correlatedBasicEvent,correlatedBasicAlarm,org.springframework.integration.jms.JmsDestinationPollingSource#0,correlatedBasicEventInput,org.springframework.integration.jms.JmsSendingMessageHandler#0,correlatedBasicEventOutput,org.springframework.integration.jms.JmsDestinationPollingSource#1,correlatedBasicAlarmInput,org.springframework.integration.jms.JmsSendingMessageHandler#1,org.springframework.integration.jms.JmsSendingMessageHandler#2,tritonJmsFactory,emasJmsFactory,alarmTopicOut,alarmTopicIn,eventTopicOut,eventTopicIn,errorTopic,eventTopicTest,jmsFactory,testConnectionFactory,listenerTest,jmsContainer,errorListener,jmsErrorContainer]; root of factory hierarchy
                665  [main] INFO  org.springframework.integration.endpoint.SourcePollingChannelAdapter  - started correlatedBasicAlarmInput
                1007 [main] INFO  org.springframework.integration.scheduling.SimpleTaskScheduler  - started [email protected]3beb5
                1007 [main] INFO  Main  - ====> Cargando test de envio.
                1008 [task-scheduler-2] INFO  Main  - Receiving in channelerrorChannel
                1008 [Thread-3] INFO  Main  - Escribiendo...
                1008 [main] INFO  Main  - ====> Cargado test de envio.
                1048 [Thread-3] INFO  Main  - Mensage test enviado
                2009 [task-scheduler-5] INFO  Main  - Receiving in channelerrorChannel
                3009 [task-scheduler-4] INFO  Main  - Receiving in channelerrorChannel
                3061 [Thread-3] INFO  Main  - Escribiendo...
                3072 [Thread-3] INFO  Main  - Mensage test enviado
                4010 [task-scheduler-5] INFO  Main  - Receiving in channelerrorChannel
                5010 [task-scheduler-2] INFO  Main  - Receiving in channelerrorChannel

                • #9
                  As you can see in the post, all the messages are delivered to channel error.

                  • #10
                    In your error channel handler, grab the Message payload (message.getPayload()). It will be an Exception instance. Then, call printStackTrace() on that.
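
                    As a rough sketch of that idea in plain Java: ToyMessage below is a hypothetical minimal stand-in for the framework's Message type, and ErrorPayloadPrinter is an invented helper name. The sketch checks the payload type before casting, so an unexpected payload is reported instead of causing a ClassCastException.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical minimal stand-in for a message wrapper (NOT the Spring Integration type).
class ToyMessage<T> {
    private final T payload;

    ToyMessage(T payload) {
        this.payload = payload;
    }

    T getPayload() {
        return payload;
    }
}

// Invented helper: turns an error message into printable text.
class ErrorPayloadPrinter {

    // Returns the stack trace when the payload is a Throwable,
    // otherwise a short note describing the unexpected payload.
    static String describe(ToyMessage<?> message) {
        Object payload = message.getPayload();
        if (payload instanceof Throwable) {
            StringWriter buffer = new StringWriter();
            ((Throwable) payload).printStackTrace(new PrintWriter(buffer, true));
            return buffer.toString();
        }
        return "Unexpected payload: " + payload;
    }

    public static void main(String[] args) {
        ToyMessage<Exception> error =
                new ToyMessage<>(new IllegalStateException("Dispatcher has no subscribers."));
        System.out.println(describe(error));
    }
}
```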

                    • #11
                      Hi again,

                      I have this config for my errorChannel:

                      Code:
                      		<integration:channel id="errorChannel">
                      			<integration:queue/>
                      		</integration:channel>	
                      
                      		<bean id="channelErrorAdapter" class="ChannelErrorAdapter">
                      		</bean>
                      		
                      		<integration:outbound-channel-adapter ref="channelErrorAdapter" method="OnMessageReceive"
                      			channel="errorChannel">
                      		</integration:outbound-channel-adapter>
                      And here is my ChannelErrorAdapter implementation:

                      Code:
                      	public void OnMessageReceive(Message<?> msg)
                      	{
                      		((Exception)msg.getPayload()).printStackTrace();
                      	}
                      In the trace I'm not receiving any error ...

                      • #12
                        What do you mean by this?
                        "In the trace I'm not receiving any error ..."
                        Earlier, you said this:
                        "As you can see in the post, all the messages are delivered to channel error."
                        Are you receiving ErrorMessages or not?

                        If the ChannelErrorAdapter.OnMessageReceive(..) method is not being invoked, then you are not receiving ErrorMessages on that channel.

                        • #13
                          Yes I know, but if I put an interceptor in the errorChannel I receive this one:

                          Code:
                          		<integration:channel id="errorChannel">
                          			<integration:queue/>
                          			<integration:interceptors>
                          				<bean class="ChannelInterceptor"></bean>
                          			</integration:interceptors>
                          		</integration:channel>
                          Here is my interceptor:

                          Code:
                              @Override
                              public boolean preReceive(org.springframework.integration.core.MessageChannel channel) {
                                  WriteLog("Receiving in channel" + channel.getName());
                                  return true;
                              }

                              @Override
                              public org.springframework.integration.core.Message<?> preSend(org.springframework.integration.core.Message<?> message,
                                      org.springframework.integration.core.MessageChannel channel) {
                                  WriteLog("Sending to channel " + channel.getName() + " message " + message.toString());
                                  return message;
                              }
                          As far as I can see, the error channel is receiving messages, but they are not delivered to my outbound-channel-adapter.

                          Sorry, but I can't understand the situation ... I've just followed the integration documentation ...

                          • #14
                            The 'preReceive' call will be invoked every time that channel is polled regardless of whether there are any Messages on the channel.

                            Therefore, if you are not seeing the 'Sending...' log output from your interceptor, then there are no messages on the error channel.

                            It looks like you are in fact not seeing those log messages. Is that correct?
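
                            A toy model in plain Java may make this clearer. ToyPollableChannel is a hypothetical class written for this example, not the framework's channel, and the log entries simply mimic the two interceptor callbacks: polling an empty channel records a 'preReceive' each time, while 'preSend' appears only when something is actually sent.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical pollable channel that records interceptor-style callbacks
// (NOT the Spring Integration API).
class ToyPollableChannel {
    private final Queue<String> queue = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    // Recorded only when a message is actually sent to the channel.
    void send(String message) {
        log.add("preSend");
        queue.add(message);
    }

    // Recorded on every poll, even when the queue is empty.
    String receive() {
        log.add("preReceive");
        return queue.poll();
    }
}

class InterceptorLogDemo {
    public static void main(String[] args) {
        ToyPollableChannel errorChannel = new ToyPollableChannel();

        // Three polls with nothing on the channel.
        for (int i = 0; i < 3; i++) {
            errorChannel.receive();
        }
        System.out.println(errorChannel.log);
    }
}
```

                            So in this model, a log that contains only 'preReceive' entries means the channel was polled but never had a message on it.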

                            • #15
                              Yes it is,

                              But I've put that interceptor on all the channels and I can see only one 'Sending' message:

                              Code:
                              5227 [task-scheduler-4] INFO  Main  - Sending to channel correlatedBasicEvent message [Payload=ActiveMQMessage {commandId = 5, responseRequired = true, messageId = ID:pc-nestor-39806-1227284819436-0:5:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:pc-nestor-39806-1227284819436-0:5:1:1, destination = topic://CORRELATED.BASIC_EVENT.OUT, transactionId = null, expiration = 0, timestamp = 1227284823765, arrival = 0, brokerInTime = 1227284823766, brokerOutTime = 1227284823767, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@b51c29, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {message=test}, readOnlyProperties = true, readOnlyBody = true, droppable = false}][Headers={spring.integration.id=242a49aa-3428-467d-8e86-840ed881120e, spring.integration.user.jms.message=test, spring.integration.transport.jms.redelivered=false, spring.integration.timestamp=1227284823768}]
                              6170 [task-scheduler-6] INFO  Main  - Receiving in channelerrorChannel
                              7171 [task-scheduler-6] INFO  Main  - Receiving in channelerrorChannel
