
  • ERROR: Dispatcher has no subscribers

    Hi,
    I have a web service endpoint that receives a collection of search requests and passes it to a Spring Integration gateway interface. The flow then splits the collection into individual search requests and routes each request to two channels at the same time (recipient list router). Some elements of a search request have to be processed against the database, and other elements need to be sent to an external web service to get a response. So I have defined two service activator methods, each receiving the search request from its own channel. The service activators send their responses to the aggregator, which finally returns the aggregated response to the client. Correlation is implemented using the transaction number associated with the search response.
    When I fire the request from SOAP UI, acting as the external client that invokes the web service, I can see the request coming in. But it fails and throws the following exception.
    I am using Spring Web Services 1.5, Spring Framework 2.5.6, Spring Integration 1.0.3, Spring Security 2.0.4, JDK 1.6.13, Windows XP development environment, Tomcat 6.0.20 servlet container.
    I would greatly appreciate it if anyone could shed some light on the problem area that needs to be addressed.

    Code:
    2009-09-04 11:36:18,857 DEBUG [org.springframework.integration.channel.DirectChannel] - preSend on channel 'inputRequest', message: [Payload=[email protected]e1][Headers={springintegration_replyChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@1c9d17e, springintegration_timestamp=1252078578857, springintegration_id=413e9881-ad3b-47af-b5dc-aeee89c7eb11, springintegration_errorChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@1c9d17e}]
    2009-09-04 11:36:18,857 DEBUG [org.springframework.jdbc.datasource.DataSourceTransactionManager] - Initiating transaction rollback
    2009-09-04 11:36:18,857 DEBUG [org.springframework.jdbc.datasource.DataSourceTransactionManager] - Rolling back JDBC transaction on Connection [ProxyConnection[PooledConnection[com.ibm.db2.jcc.a.b@1de530a]]]
    2009-09-04 11:36:18,873 DEBUG [org.springframework.jdbc.datasource.DataSourceUtils] - Resetting read-only flag of JDBC Connection [ProxyConnection[PooledConnection[com.ibm.db2.jcc.a.b@1de530a]]]
    2009-09-04 11:36:18,873 DEBUG [org.springframework.jdbc.datasource.DataSourceTransactionManager] - Releasing JDBC Connection [ProxyConnection[PooledConnection[com.ibm.db2.jcc.a.b@1de530a]]] after transaction
    2009-09-04 11:36:18,873 DEBUG [org.springframework.jdbc.datasource.DataSourceUtils] - Returning JDBC Connection to DataSource
    2009-09-04 11:36:18,873 DEBUG [org.springframework.ws.soap.server.SoapMessageDispatcher] - Endpoint invocation resulted in exception - responding with Fault
    org.springframework.integration.message.MessageDeliveryException: Dispatcher has no subscribers.
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:97)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:90)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:43)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:116)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:94)
    	at org.springframework.integration.channel.MessageChannelTemplate.doSend(MessageChannelTemplate.java:223)
    	at org.springframework.integration.channel.MessageChannelTemplate.doSendAndReceive(MessageChannelTemplate.java:248)
    	at org.springframework.integration.channel.MessageChannelTemplate.sendAndReceive(MessageChannelTemplate.java:215)
    	at org.springframework.integration.gateway.AbstractMessagingGateway.sendAndReceiveMessage(AbstractMessagingGateway.java:176)
    	at org.springframework.integration.gateway.AbstractMessagingGateway.sendAndReceive(AbstractMessagingGateway.java:159)
    	at org.springframework.integration.gateway.AbstractMessagingGateway.sendAndReceive(AbstractMessagingGateway.java:150)
    	at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:203)
    	at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:172)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
    	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
    	at $Proxy17.getIrpSearchResponse(Unknown Source)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    	at java.lang.reflect.Method.invoke(Method.java:597)
    	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:307)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:182)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:149)
    	at org.springframework.security.intercept.method.aopalliance.MethodSecurityInterceptor.invoke(MethodSecurityInterceptor.java:66)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
    	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:106)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
    	at org.springframework.aop.framework.adapter.MethodBeforeAdviceInterceptor.invoke(MethodBeforeAdviceInterceptor.java:50)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:160)
    	at org.springframework.aop.framework.adapter.MethodBeforeAdviceInterceptor.invoke(MethodBeforeAdviceInterceptor.java:50)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:160)
    	at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:89)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
    	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
    	at $Proxy36.getIrpSearchResponse(Unknown Source)
    	at gov.dc.dmv.destiny.eis.services.irp.ws.IrpWebServiceMarshallingEndpoint.getSchemaStopDataResponse(IrpWebServiceMarshallingEndpoint.java:87)
    	at gov.dc.dmv.destiny.eis.services.irp.ws.IrpWebServiceMarshallingEndpoint.getStopDataResponse(IrpWebServiceMarshallingEndpoint.java:62)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    	at java.lang.reflect.Method.invoke(Method.java:597)
    	at org.springframework.ws.server.endpoint.MethodEndpoint.invoke(MethodEndpoint.java:115)
    	at org.springframework.ws.server.endpoint.adapter.MarshallingMethodEndpointAdapter.invokeInternal(MarshallingMethodEndpointAdapter.java:135)
    	at org.springframework.ws.server.endpoint.adapter.AbstractMethodEndpointAdapter.invoke(AbstractMethodEndpointAdapter.java:58)
    	at org.springframework.ws.server.MessageDispatcher.dispatch(MessageDispatcher.java:221)
    	at org.springframework.ws.server.MessageDispatcher.receive(MessageDispatcher.java:168)
    	at org.springframework.ws.transport.support.WebServiceMessageReceiverObjectSupport.handleConnection(WebServiceMessageReceiverObjectSupport.java:88)
    	at org.springframework.ws.transport.http.WebServiceMessageReceiverHandlerAdapter.handle(WebServiceMessageReceiverHandlerAdapter.java:57)
    	at org.springframework.ws.transport.http.MessageDispatcherServlet.doService(MessageDispatcherServlet.java:230)
    	at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:571)
    	at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:511)
    	at javax.servlet.http.HttpServlet.service(HttpServlet.java:637)
    	at javax.servlet.http.HttpServlet.service(HttpServlet.java:717)
    	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290)
    	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
    	at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:233)
    	at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:191)
    	at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:128)
    	at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:102)
    	at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)
    	at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:293)
    	at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:849)
    	at org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.process(Http11Protocol.java:583)
    	at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:454)
    	at java.lang.Thread.run(Thread.java:619)
    Thanks & Regards,
    Vigil
    Last edited by vbose; Sep 4th, 2009, 01:14 PM.

  • #2
    It looks like your input channel has no subscribers. Can you provide the relevant part of the configuration?
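To illustrate what this exception means, here is a minimal toy model in plain Java. It is a sketch, not Spring Integration code: `ToyDirectChannel` and `NoSubscriberDemo` are hypothetical names, and the only assumption is that a direct channel hands each message synchronously to a subscribed handler and fails when none is registered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: these are NOT Spring Integration classes.
final class ToyDirectChannel<T> {
    private final String name;
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    ToyDirectChannel(String name) {
        this.name = name;
    }

    void subscribe(Consumer<T> handler) {
        subscribers.add(handler);
    }

    // Hands the message to the first subscriber on the caller's thread.
    void send(T message) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException(
                    "Dispatcher has no subscribers (channel '" + name + "')");
        }
        subscribers.get(0).accept(message);
    }
}

final class NoSubscriberDemo {
    // Returns the failure message, or null when the send succeeded.
    static String trySend(ToyDirectChannel<String> channel, String message) {
        try {
            channel.send(message);
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        ToyDirectChannel<String> inputRequest = new ToyDirectChannel<>("inputRequest");
        // Nothing has subscribed yet, so this send fails.
        System.out.println(trySend(inputRequest, "search"));
        // Once a handler is registered, the same send goes through.
        inputRequest.subscribe(m -> System.out.println("handled " + m));
        System.out.println(trySend(inputRequest, "search"));
    }
}
```

In this model the channel itself is declared correctly; the send fails only because no endpoint was ever attached to it, which is why the configuration is the next thing to look at.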


    • #3
      Mark,

      Thanks for looking into this. Here is what you asked for.

      Gateway Interface:

      Code:
      /**
       * IIrpService.java :The {@link IIrpService} interface exposes the API
       * that implements the business logic to retrieve the stop information
       * for the given IRP search request parameters.
       * @author Vigil Bose 
       */
      public interface IIrpService {
      
      	/**
      	 * The API getIrpSearchResponse() is used to get the response that contains
      	 * the stop information for the given {@link IrpSearchRequest}
      	 * @param request - The search object
      	 * @return an instance of {@link IrpSearchResponse}
      	 * @throws InvalidDataException
      	 * @throws DataAccessFailedException
      	 * @throws InvalidRegionException
      	 * @throws DatatypeConfigurationException
      	 */
      	@Gateway(requestChannel="inputRequest", replyChannel="searchResponse")
      	@Transactional (readOnly = true)
      	@Secured({RoleConstants.ROLE_STOP_REQUEST})
      	@BusinessOperation (IIrpOperations.SEARCH)
      	public IrpSearchResponse getIrpSearchResponse(IrpSearchRequest request)
      		throws InvalidDataException, DataAccessFailedException, InvalidRegionException, DatatypeConfigurationException;
      }
      Splitter:

      Code:
      @MessageEndpoint
      public class SearchRequestSplitter {
      
      	private static final Log logger = LogFactory.getLog(SearchRequestSplitter.class);
      	
      	@Splitter(inputChannel="inputRequest", outputChannel="searchRequest")
      	public List<SearchRequest> split(IrpSearchRequest request){
      		
      		logger.info("The Stop Splitter recieves the request to split the search request data ");
      		return request.getSearchRequestList();
      	}
      }
      Router:
      Code:
      @MessageEndpoint
      public class SearchRouter {
      
      	@Router(inputChannel="searchRequest")
      	public List<String> route(SearchRequest searchRequest){
      		
      		List<String> channelList = new ArrayList<String>(2);
      		channelList.add("stopRequest");
      		channelList.add("ivsRequest");
      		
      		return channelList;
      	}
      }
      Service Activator:
      Code:
      /**
      	 * The API getIrpSearchResponse() is used to get the response that contains
      	 * the stop information for the given {@link IrpSearchRequest}
      	 * @param request - The search object
      	 * @return an instance of {@link IrpSearchResponse}
      	 * @throws InvalidDataException
      	 * @throws DataAccessFailedException
      	 * @throws InvalidRegionException
      	 * @throws DatatypeConfigurationException 
      	 */
      	@ServiceActivator(inputChannel="stopRequest", outputChannel="responseAggregator")
      	public SearchResponse getStopResponse(SearchRequest searchRequest){
                  
                  // Implemented database interaction and return an instance of Search Response
              }
              @ServiceActivator(inputChannel="ivsRequest", outputChannel="responseAggregator")
      	public SearchResponse getIvsResponse(SearchRequest searchRequest){
                  
                  //Implemented the logic external web service invocation to get the response to some request parameters
              }
      Aggregator:
      Code:
      @MessageEndpoint
      public class SearchResponseAggregator {
      
      	 private static final Log logger = LogFactory.getLog(SearchResponseAggregator.class);
      	
      	 @Aggregator(inputChannel = "responseAggregator", outputChannel = "searchResponse", timeout = 5 * 60 * 1000)
      	 public IrpSearchResponse getIrpSearchResponse(List<SearchResponse> responseList){
      		assert(responseList.size() > 0);
      		logger.info("The aggregator receives search responses :"+responseList.get(0).getTransactionId());
      		return new IrpSearchResponse(responseList);
      	 }
      	 
      	 @CorrelationStrategy
      	 public Long correlateByTransactionNumber(SearchResponse response){
      		 return response.getTransactionId();
      	 }	 
      }
      Application Context :
      Code:
      <?xml version="1.0" encoding="UTF-8"?>
      <beans:beans xmlns="http://www.springframework.org/schema/integration"
      	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      	xmlns:beans="http://www.springframework.org/schema/beans"
      	xmlns:context="http://www.springframework.org/schema/context"
      	xmlns:ws="http://www.springframework.org/schema/integration/ws"
      	xsi:schemaLocation="http://www.springframework.org/schema/beans
      			http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
      			http://www.springframework.org/schema/context
      			http://www.springframework.org/schema/context/spring-context-2.5.xsd
      			http://www.springframework.org/schema/integration
      			http://www.springframework.org/schema/integration/spring-integration-1.0.xsd
      			http://www.springframework.org/schema/integration/ws
      			http://www.springframework.org/schema/integration/ws/spring-integration-ws-1.0.xsd">
      			
         <gateway id="irpService" service-interface="gov.dc.dmv.destiny.eis.services.irp.service.IIrpService"/>	
         
         <channel id="inputRequest"/>
         <channel id="searchRequest"/> 
         <channel id="stopRequest"><queue capacity="25"/></channel>
         <channel id="ivsRequest"><queue capacity="25"/></channel>    
         <channel id="responseAggregator"/>
         <channel id="searchResponse"/>   
         
      			
      </beans:beans>
      Last edited by vbose; Sep 4th, 2009, 01:30 PM.


      • #4
        Do you have a <context:component-scan ../> element? If not, you need to add that to pick up the various annotated objects. Also, you should provide the <annotation-driven/> element from the Spring Integration core namespace.


        • #5
          I have another context file that imports the SI application context, and it has the <context:component-scan base-package/> element defined. Please see the following context:


          Code:
          <?xml version="1.0" encoding="UTF-8"?>
          <beans xmlns="http://www.springframework.org/schema/beans"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                 xmlns:tx="http://www.springframework.org/schema/tx"
                 xmlns:context="http://www.springframework.org/schema/context"
                 xmlns:aop="http://www.springframework.org/schema/aop"
                 xmlns:util="http://www.springframework.org/schema/util"          
                 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                 http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
                 http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd
                 http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
                 http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.5.xsd">
          
          	<import resource="applicationContext-integration.xml"/>
          	
              <tx:annotation-driven/>
          
              <context:annotation-config/>
              
              <context:component-scan base-package="gov.dc.dmv.destiny.eis.services"/>
              
              <aop:aspectj-autoproxy/>    
              
              .....


          • #6
            Okay. In that case, I think the only missing piece is the <annotation-driven/> element in your Spring Integration configuration.


            • #7
              Is it <annotation-config/> or <annotation-driven/>? Please confirm. When I use <annotation-driven/>, the STS IDE shows an error.

              <annotation-config/> is already defined in the application context that imports the SI application context.


              • #8
                Sorry, it is <annotation-config/> and it's separate from the one you already have defined. Basically, the one in the "context" namespace turns on core Spring functionality like "@Autowired" etc. In fact you do not need that since it's already enabled by the <component-scan/> element.

                The "spring-integration" XSD also defines an <annotation-config/> element, and that one enables post-processing of any @MessageEndpoint classes to find @Splitter, @Router, etc. and create the necessary endpoints. If you add that to the configuration that contains your channels, I think you'll be up and running (or at least one big step closer).
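The post-processing idea described above can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions: `ToyHandler`, `ToySplitterBean` and `ToyAnnotationProcessor` are hypothetical names, not Spring Integration types, and the real post-processor does considerably more. The point is that an annotation is inert metadata until something scans the bean and registers a subscriber for the named channel.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for annotations such as @Splitter or @Router.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ToyHandler {
    String inputChannel();
}

// A bean whose method is annotated but not yet wired to anything.
class ToySplitterBean {
    final List<String> seen = new ArrayList<>();

    @ToyHandler(inputChannel = "inputRequest")
    public void handle(String payload) {
        seen.add(payload);
    }
}

// Toy registry: channel name -> (bean, method) pairs found by scanning.
final class ToyAnnotationProcessor {
    private final Map<String, List<Object[]>> subscribers = new HashMap<>();

    // The "post-processing" step: without this call the annotation does nothing.
    void postProcess(Object bean) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            ToyHandler handler = method.getAnnotation(ToyHandler.class);
            if (handler != null) {
                subscribers.computeIfAbsent(handler.inputChannel(), k -> new ArrayList<>())
                        .add(new Object[] {bean, method});
            }
        }
    }

    int subscriberCount(String channel) {
        List<Object[]> targets = subscribers.get(channel);
        return targets == null ? 0 : targets.size();
    }

    // Delivers to the first registered subscriber, or fails if there is none.
    void send(String channel, String payload) {
        List<Object[]> targets = subscribers.get(channel);
        if (targets == null || targets.isEmpty()) {
            throw new IllegalStateException("Dispatcher has no subscribers");
        }
        Object[] target = targets.get(0);
        try {
            ((Method) target[1]).invoke(target[0], payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Handler invocation failed", e);
        }
    }
}
```

In this model, a bean that was created but never passed to `postProcess` leaves its channel without subscribers, which is the toy equivalent of component scanning being on while the integration-specific annotation processing is off.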


                • #9
                  I did that. Now I get a different error.

                  Code:
                  2009-09-04 13:50:01,826 ERROR [org.springframework.web.context.ContextLoader] - Context initialization failed
                  org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'irpServiceActivator' defined in file [C:\software\apache-tomcat-6.0.20\webapps\irp-ivs-ws\WEB-INF\classes\gov\dc\dmv\destiny\eis\services\irp\service\impl\IrpServiceImpl.class]: Initialization of bean failed; nested exception is java.lang.IllegalArgumentException: The input channel for an Annotation-based endpoint must be a SubscribableChannel.
                  	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:480)
                  	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory$1.run(AbstractAutowireCapableBeanFactory.java:409)
                  	at java.security.AccessController.doPrivileged(Native Method)
                  	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:380)
                  	at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:264)
                  	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
                  	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:261)
                  	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:185)
                  	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164)
                  	at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:429)
                  	at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:728)
                  	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:380)
                  	at org.springframework.web.context.ContextLoader.createWebApplicationContext(ContextLoader.java:255)
                  	at org.springframework.web.context.ContextLoader.initWebApplicationContext(ContextLoader.java:199)
                  	at org.springframework.web.context.ContextLoaderListener.contextInitialized(ContextLoaderListener.java:45)
                  	at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:3934)
                  	at org.apache.catalina.core.StandardContext.start(StandardContext.java:4429)
                  	at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:791)
                  	at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:771)
                  	at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:526)
                  	at org.apache.catalina.startup.HostConfig.deployDescriptor(HostConfig.java:630)
                  	at org.apache.catalina.startup.HostConfig.deployDescriptors(HostConfig.java:556)
                  	at org.apache.catalina.startup.HostConfig.deployApps(HostConfig.java:491)
                  	at org.apache.catalina.startup.HostConfig.start(HostConfig.java:1206)
                  	at org.apache.catalina.startup.HostConfig.lifecycleEvent(HostConfig.java:314)
                  	at org.apache.catalina.util.LifecycleSupport.fireLifecycleEvent(LifecycleSupport.java:119)
                  	at org.apache.catalina.core.ContainerBase.start(ContainerBase.java:1053)
                  	at org.apache.catalina.core.StandardHost.start(StandardHost.java:722)
                  	at org.apache.catalina.core.ContainerBase.start(ContainerBase.java:1045)
                  	at org.apache.catalina.core.StandardEngine.start(StandardEngine.java:443)
                  	at org.apache.catalina.core.StandardService.start(StandardService.java:516)
                  	at org.apache.catalina.core.StandardServer.start(StandardServer.java:710)
                  	at org.apache.catalina.startup.Catalina.start(Catalina.java:583)
                  	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                  	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
                  	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
                  	at java.lang.reflect.Method.invoke(Method.java:597)
                  	at org.apache.catalina.startup.Bootstrap.start(Bootstrap.java:288)
                  	at org.apache.catalina.startup.Bootstrap.main(Bootstrap.java:413)
                  Caused by: java.lang.IllegalArgumentException: The input channel for an Annotation-based endpoint must be a SubscribableChannel.
                  	at org.springframework.util.Assert.isTrue(Assert.java:65)
                  	at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.createEndpoint(AbstractMethodAnnotationPostProcessor.java:89)
                  	at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.postProcess(AbstractMethodAnnotationPostProcessor.java:72)
                  	at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor$1.doWith(MessagingAnnotationPostProcessor.java:104)
                  	at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:466)
                  	at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:443)
                  	at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.postProcessAfterInitialization(MessagingAnnotationPostProcessor.java:97)
                  	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:361)
                  	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1344)
                  	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:473)
                  	... 38 more


                  • #10
                    Okay. That is because channels that must be polled (the 2 that you have with 'queue's) should not be referenced by annotations. The original rationale for that was that polling configuration should not be hard-coded in the Java code.

                    Generally, pollable channels are discouraged since they break the transaction boundary, so the first question is: do you *really* need the asynchronous, buffering behavior there?

                    If so, then you can provide a "bridge" that allows the polling metadata to be externalized in the configuration and whose output-channel is the one referenced by the annotation property.
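A rough plain-Java model of the bridge idea, for illustration only. None of these types are Spring Integration classes; `ToyQueueChannel`, `ToySubscribableChannel`, `ToyBridge`, `pollOnce` and `maxMessages` are hypothetical names. The assumption is simply that a queue channel only buffers messages, so something has to poll it and forward each message to a channel that a handler can subscribe to.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Pollable toy channel: send() only buffers, nobody is invoked.
final class ToyQueueChannel {
    private final Queue<String> buffer = new ArrayDeque<>();

    void send(String message) {
        buffer.add(message);
    }

    // Returns the next buffered message, or null when the queue is empty.
    String receive() {
        return buffer.poll();
    }
}

// Subscribable toy channel: send() invokes the handler directly.
final class ToySubscribableChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    void send(String message) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("Dispatcher has no subscribers");
        }
        subscribers.get(0).accept(message);
    }
}

// The bridge owns the polling decision, so the handler code never sees it.
final class ToyBridge {
    private final ToyQueueChannel input;
    private final ToySubscribableChannel output;

    ToyBridge(ToyQueueChannel input, ToySubscribableChannel output) {
        this.input = input;
        this.output = output;
    }

    // One polling cycle: forwards up to maxMessages and returns how many were moved.
    int pollOnce(int maxMessages) {
        int forwarded = 0;
        String message;
        while (forwarded < maxMessages && (message = input.receive()) != null) {
            output.send(message);
            forwarded++;
        }
        return forwarded;
    }
}
```

The handler subscribes only to the subscribable side, and how often `pollOnce` runs is decided outside the handler class. That mirrors the rationale given above for keeping polling details out of the annotated Java code.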


                    • #11
                      Thanks Mark, let me remove the asynchronous behavior and try it. Actually I do not really need the asynchronous behavior. The invocation is all synchronous.


                      • #12
                        Now my message is getting processed. The database response is sent to the aggregator, but the web service response is not. Do I need a completion strategy here? The two partial responses should be combined into a single response, using the transaction ID (114880765489) as the correlation key.

                        I see the following entry in the log after the first response received in the aggregator:
                        2009-09-04 16:10:00,669 DEBUG [org.springframework.integration.aggregator.MethodInvokingAggregator] - Handling of Message group with correlationKey '114880765489' has already completed or timed out.

                        Any insight would be helpful

                        SearchResponse.java
                        Code:
                        public class SearchResponse extends TransactionInfo implements Serializable {
                        
                        	private static final long serialVersionUID = 1L;
                        	
                        	private IvsResponse ivsResponse;
                        	private StopResponse stopResponse;
                        	
                        	/**
                        	 * @return the ivsResponse
                        	 */
                        	public IvsResponse getIvsResponse() {
                        		return ivsResponse;
                        	}
                        	/**
                        	 * @param ivsResponse the ivsResponse to set
                        	 */
                        	public void setIvsResponse(IvsResponse ivsResponse) {
                        		this.ivsResponse = ivsResponse;
                        	}
                        	/**
                        	 * @return the stopResponse
                        	 */
                        	public StopResponse getStopResponse() {
                        		return stopResponse;
                        	}
                        	/**
                        	 * @param stopResponse the stopResponse to set
                        	 */
                        	public void setStopResponse(StopResponse stopResponse) {
                        		this.stopResponse = stopResponse;
                        	}
                        }
                        Service Activator API implementation logic:

                        Code:
                        @ServiceActivator(inputChannel="stopRequest", outputChannel="responseAggregator")
                        	public SearchResponse getStopResponse(SearchRequest searchRequest)
                        			throws InvalidDataException, 
                        						DataAccessFailedException,
                        							  InvalidRegionException, 
                        							  		DatatypeConfigurationException {
                        		
                        		SearchResponse searchResponse = null;
                        				
                        		try{			
                        			
                        			StopResponse stopResponse;
                        			List<Messages> messageList;
                        			
                        			StopRequest stopRequest = new StopRequest();
                        			stopRequest.setBusiness(searchRequest.getBusiness());
                        			stopRequest.setIndividual(searchRequest.getIndividual());
                        			stopRequest.setVehicle(searchRequest.getVehicle());
                        
                        				
                        			stopResponse = this.irpServiceJdbcRepository.getStopDetailsList(stopRequest);
                        			stopResponse.setTransactionId(searchRequest.getTransactionId());
                        
                        			messageList = getMessageList(stopResponse);				
                        			if (messageList != null){
                        				stopResponse.setMessageList(messageList);
                        			}
                        			searchResponse = new SearchResponse();
                        			searchResponse.setStopResponse(stopResponse);
                        			searchResponse.setTransactionId(searchRequest.getTransactionId());			
                        		
                        		}catch (DataAccessException e){
                        			logger.error(e);	
                        			e.printStackTrace();
                        			throw new DataAccessFailedException("The IRP request to the Destiny Data Access Layer has been failed");
                        		}		
                        		return searchResponse;
                        	}
                        
                        
                        @ServiceActivator(inputChannel="ivsRequest", outputChannel="responseAggregator")
                        	public SearchResponse getIvsResponse(SearchRequest searchRequest)
                        			throws InvalidDataException, 
                        					  DataAccessFailedException,
                        					  	 InvalidRegionException, 
                        		  		              DatatypeConfigurationException {
                        		
                        		
                        		SearchResponse searchResponse = null;
                        		
                        		try{
                        			
                        			IvsResponse ivsResponse;
                        			
                        			IvsRequest ivsRequest = new IvsRequest();
                        			ivsRequest.setOwnerType(searchRequest.getOwnerType());
                        			ivsRequest.setPolicyInfo(searchRequest.getPolicyInfo());
                        			ivsRequest.setRegion(searchRequest.getRegion());
                        			ivsRequest.setTransactionId(searchRequest.getTransactionId());
                        			ivsRequest.setTransactionSource(searchRequest.getTransactionSource());
                        			ivsRequest.setVerificationDate(searchRequest.getVerificationDate());
                        			
                        			ivsResponse = this.getInsuranceVerificationResponse.getInsuranceVerificationResponseData(ivsRequest);
                        			
                        			logger.info("Primary Response :"+ivsResponse.getPrimaryResponse());
                        			logger.info("Reason Code :"+ivsResponse.getReasonCode());
                        						
                        			searchResponse = new SearchResponse();
                        			searchResponse.setIvsResponse(ivsResponse);
                        			searchResponse.setTransactionId(searchRequest.getTransactionId());
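                        		} catch (Exception e) {
                        			// Log the exception with its stack trace through the logger rather than printStackTrace()
                        			logger.error("The IRP request to the IVS Service failed", e);
                        			throw new DataAccessFailedException("The IRP request to the IVS Service failed");
                        		}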
                        		return searchResponse;
                        	}
                        Last edited by vbose; Sep 4th, 2009, 04:33 PM.



                        • #13
                          Are you always expecting exactly 2 items per transaction ID?



                          • #14
                            The answer is no. However, I did get it to work once by changing the correlation strategy from the transaction ID to a search ID. I now assign a unique ID to each search request in the collection, so the aggregator can group the search responses by search ID. But now the aggregator is timing out, as the debug statements in the log below show, and no response comes back to the client. Each service activator invocation completes its task; only the aggregator does not finish.

                            Code:
                            2009-09-07 00:39:44,875 DEBUG [org.springframework.integration.handler.ServiceActivatingHandler] - handler 'ServiceActivator for [org.springframework.integration.handler.MessageMappingMethodInvoker@e7ccd7]' sending reply Message: [Payload=gov.dc.dmv.destiny.eis.services.irp.domain.SearchResponse@598a5d][Headers={springintegration_sequenceSize=1, springintegration_replyChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@b7cee4, springintegration_timestamp=1252298384718, springintegration_id=47d26a08-e5f3-419a-bf41-d1fd1d83b8bd, springintegration_correlationId=0cfeb83e-385e-4bbf-924b-58982fa78f88, springintegration_sequenceNumber=1, springintegration_errorChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@b7cee4}]
                            2009-09-07 00:39:44,875 DEBUG [org.springframework.integration.channel.DirectChannel] - preSend on channel 'responseAggregator', message: [Payload=gov.dc.dmv.destiny.eis.services.irp.domain.SearchResponse@598a5d][Headers={springintegration_sequenceSize=1, springintegration_replyChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@b7cee4, springintegration_timestamp=1252298384718, springintegration_id=47d26a08-e5f3-419a-bf41-d1fd1d83b8bd, springintegration_correlationId=0cfeb83e-385e-4bbf-924b-58982fa78f88, springintegration_sequenceNumber=1, springintegration_errorChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@b7cee4}]
                            2009-09-07 00:39:44,875 DEBUG [org.springframework.integration.aggregator.MethodInvokingAggregator] - org.springframework.integration.aggregator.MethodInvokingAggregator@c1429c received message: [Payload=gov.dc.dmv.destiny.eis.services.irp.domain.SearchResponse@598a5d][Headers={springintegration_sequenceSize=1, springintegration_replyChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@b7cee4, springintegration_timestamp=1252298384718, springintegration_id=47d26a08-e5f3-419a-bf41-d1fd1d83b8bd, springintegration_correlationId=0cfeb83e-385e-4bbf-924b-58982fa78f88, springintegration_sequenceNumber=1, springintegration_errorChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@b7cee4}]
                            2009-09-07 00:39:44,875 DEBUG [org.springframework.integration.util.DefaultMethodInvoker] - Invoking method [public int gov.dc.dmv.destiny.eis.services.irp.service.impl.SearchResponseAggregator.correlateBySearchId(gov.dc.dmv.destiny.eis.services.irp.domain.SearchResponse)] with arguments [{gov.dc.dmv.destiny.eis.services.irp.domain.SearchResponse@598a5d}]
                            2009-09-07 00:39:44,875 DEBUG [org.springframework.integration.util.DefaultMethodInvoker] - Invoking method [public boolean gov.dc.dmv.destiny.eis.services.irp.service.impl.SearchResponseAggregator.completionChecker(java.util.List)] with arguments [{[gov.dc.dmv.destiny.eis.services.irp.domain.SearchResponse@598a5d]}]
                            2009-09-07 00:39:44,875 DEBUG [org.springframework.integration.channel.DirectChannel] - postSend (sent=true) on channel 'responseAggregator', message: [Payload=gov.dc.dmv.destiny.eis.services.irp.domain.SearchResponse@598a5d][Headers={springintegration_sequenceSize=1, springintegration_replyChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@b7cee4, springintegration_timestamp=1252298384718, springintegration_id=47d26a08-e5f3-419a-bf41-d1fd1d83b8bd, springintegration_correlationId=0cfeb83e-385e-4bbf-924b-58982fa78f88, springintegration_sequenceNumber=1, springintegration_errorChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@b7cee4}]
                            2009-09-07 00:39:44,875 DEBUG [org.springframework.integration.channel.DirectChannel] - postSend (sent=true) on channel 'stopRequest', message: [Payload=gov.dc.dmv.destiny.eis.services.irp.domain.SearchRequest@7e05c4][Headers={springintegration_sequenceSize=1, springintegration_replyChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@b7cee4, springintegration_timestamp=1252298384718, springintegration_id=47d26a08-e5f3-419a-bf41-d1fd1d83b8bd, springintegration_correlationId=0cfeb83e-385e-4bbf-924b-58982fa78f88, springintegration_sequenceNumber=1, springintegration_errorChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@b7cee4}]
                            2009-09-07 00:39:44,875 DEBUG [org.springframework.integration.channel.DirectChannel] - preSend on channel 'ivsRequest', message: [Payload=gov.dc.dmv.destiny.eis.services.irp.domain.SearchRequest@7e05c4][Headers={springintegration_sequenceSize=1, springintegration_replyChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@b7cee4, springintegration_timestamp=1252298384718, springintegration_id=47d26a08-e5f3-419a-bf41-d1fd1d83b8bd, springintegration_correlationId=0cfeb83e-385e-4bbf-924b-58982fa78f88, springintegration_sequenceNumber=1, springintegration_errorChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@b7cee4}]
                            2009-09-07 00:39:44,875 DEBUG [org.springframework.integration.handler.ServiceActivatingHandler] - ServiceActivator for [org.springframework.integration.handler.MessageMappingMethodInvoker@9578c1] received message: [Payload=gov.dc.dmv.destiny.eis.services.irp.domain.SearchRequest@7e05c4][Headers={springintegration_sequenceSize=1, springintegration_replyChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@b7cee4, springintegration_timestamp=1252298384718, springintegration_id=47d26a08-e5f3-419a-bf41-d1fd1d83b8bd, springintegration_correlationId=0cfeb83e-385e-4bbf-924b-58982fa78f88, springintegration_sequenceNumber=1, springintegration_errorChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@b7cee4}]
                            2009-09-07 00:39:44,890 DEBUG [org.springframework.integration.util.DefaultMethodInvoker] - Invoking method [public gov.dc.dmv.destiny.eis.services.irp.domain.SearchResponse gov.dc.dmv.destiny.eis.services.irp.service.impl.IrpServiceImpl.getIvsResponse(gov.dc.dmv.destiny.eis.services.irp.domain.SearchRequest) throws gov.dc.dmv.destiny.eis.services.common.exception.InvalidDataException,gov.dc.dmv.destiny.eis.services.common.exception.DataAccessFailedException,gov.dc.dmv.destiny.eis.services.common.exception.InvalidRegionException,javax.xml.datatype.DatatypeConfigurationException] with arguments [{gov.dc.dmv.destiny.eis.services.irp.domain.SearchRequest@7e05c4}]
                            
                            2009-09-07 00:39:44,921 DEBUG [org.springframework.ws.client.core.WebServiceTemplate] - Opening [[email protected]1] to [https://web.dcivs.org/verifyinsurance.asmx]
                            
                            2009-09-07 00:39:57,171 DEBUG [org.springframework.ws.client.MessageTracing.received] - Received response [SaajSoapMessage {http://tempuri.org/IVS/VerifyInsurance}VerifyInsuranceResponse] for request [SaajSoapMessage {http://tempuri.org/IVS/VerifyInsurance}VerifyInsurance]
                            
                            2009-09-07 00:39:57,171 DEBUG [org.springframework.integration.channel.DirectChannel] - postSend (sent=true) on channel 'responseAggregator',
                            2009-09-07 00:39:57,171 DEBUG [org.springframework.integration.channel.DirectChannel] - postSend (sent=true) on channel 'ivsRequest', message: [Payload=gov.dc.dmv.destiny.eis.services.irp.domain.SearchRequest@7e05c4][Headers={springintegration_sequenceSize=1, springintegration_replyChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@b7cee4, springintegration_timestamp=1252298384718, springintegration_id=47d26a08-e5f3-419a-bf41-d1fd1d83b8bd, springintegration_correlationId=0cfeb83e-385e-4bbf-924b-58982fa78f88, springintegration_sequenceNumber=1, springintegration_errorChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@b7cee4}]
                            2009-09-07 00:39:57,171 DEBUG [org.springframework.integration.channel.DirectChannel] - postSend (sent=true) on channel 'searchRequest', message: [Payload=gov.dc.dmv.destiny.eis.services.irp.domain.SearchRequest@7e05c4][Headers={springintegration_sequenceSize=1, springintegration_replyChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@b7cee4, springintegration_timestamp=1252298384718, springintegration_id=47d26a08-e5f3-419a-bf41-d1fd1d83b8bd, springintegration_correlationId=0cfeb83e-385e-4bbf-924b-58982fa78f88, springintegration_sequenceNumber=1, springintegration_errorChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@b7cee4}]
                            2009-09-07 00:39:57,171 DEBUG [org.springframework.integration.channel.DirectChannel] - postSend (sent=true) on channel 'inputRequest', message: [Payload=[email protected]35]
                            2009-09-07 00:44:45,595 DEBUG [org.springframework.integration.aggregator.MethodInvokingAggregator] - Handling of Message group with correlation key '1' has timed out.
                            2009-09-07 00:44:45,595 DEBUG [org.springframework.integration.aggregator.MethodInvokingAggregator] - Handling of Message group with correlation key '1' has timed out.
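The log above shows `completionChecker` being invoked with a one-element list, a `springintegration_sequenceSize` header of 1, and the group for correlation key '1' later timing out, which suggests the group was never judged complete. The following is only a minimal plain-Java sketch of the correlate-then-check-completion logic described in this post, not the actual `SearchResponseAggregator` and not the framework's aggregator. The method names `correlateBySearchId` and `completionChecker` come from the log; the simplified `SearchResponse` fields, the `handle` and `aggregate` methods, and the completion rule itself (one stop part plus one IVS part per search ID) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch; no Spring types are used, so this is NOT the framework's aggregator.
class AggregatorSketch {

    // Hypothetical, simplified stand-in for the SearchResponse domain class.
    static class SearchResponse {
        final int searchId;
        final String stopResponse; // set only by the database (stop) activator
        final String ivsResponse;  // set only by the web service (IVS) activator

        SearchResponse(int searchId, String stopResponse, String ivsResponse) {
            this.searchId = searchId;
            this.stopResponse = stopResponse;
            this.ivsResponse = ivsResponse;
        }
    }

    // Groups that are not yet complete, keyed by search id.
    private final Map<Integer, List<SearchResponse>> pending =
            new HashMap<Integer, List<SearchResponse>>();

    // Correlation key: responses with the same search id belong together.
    static int correlateBySearchId(SearchResponse response) {
        return response.searchId;
    }

    // Assumed completion rule: one stop part and one IVS part have arrived.
    static boolean completionChecker(List<SearchResponse> group) {
        boolean hasStop = false;
        boolean hasIvs = false;
        for (SearchResponse r : group) {
            hasStop = hasStop || r.stopResponse != null;
            hasIvs = hasIvs || r.ivsResponse != null;
        }
        return hasStop && hasIvs;
    }

    // Merges the parts of a complete group into a single response.
    static SearchResponse aggregate(List<SearchResponse> group) {
        String stop = null;
        String ivs = null;
        for (SearchResponse r : group) {
            if (r.stopResponse != null) stop = r.stopResponse;
            if (r.ivsResponse != null) ivs = r.ivsResponse;
        }
        return new SearchResponse(group.get(0).searchId, stop, ivs);
    }

    // Returns the merged response once the group is complete, otherwise null.
    SearchResponse handle(SearchResponse response) {
        int key = correlateBySearchId(response);
        List<SearchResponse> group = pending.get(key);
        if (group == null) {
            group = new ArrayList<SearchResponse>();
            pending.put(key, group);
        }
        group.add(response);
        if (!completionChecker(group)) {
            return null; // keep waiting for the other part
        }
        pending.remove(key);
        return aggregate(group);
    }
}
```

With this rule, the first response for a search ID leaves the group pending, and the second one (from the other activator) releases the merged result. If either activator never delivers its part, the group stays incomplete, which matches the timeout symptom in the log.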

