Announcement Announcement Module
Collapse
No announcement yet.
Dispatcher has no subscribers when creating a new source Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Dispatcher has no subscribers when creating a new source

    Hi there, I'm writing a simple source module. I read the docs, and it's somehow similar to the Netty Http source module.

    Here's my xml config:

    Code:
     <?xml version="1.0" encoding="UTF-8"?>
      <beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:context="http://www.springframework.org/schema/context"
      xmlns:int="http://www.springframework.org/schema/integration"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
              http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
              http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.0.xsd">
       
      <int:channel id="output"/>
       
       
      <bean id="randomData" class="com.gopivotal.dataloader.RandomDataProducer">
      <property name="outputChannel" ref="output"/>
      <property name="resource" value="${xd.config.home}/${configFile:stream}.yml">
      </property>
      </bean>
       
      </beans>
     
      public class RandomDataProducer extends MessageProducerSupport{
       
      private long startTime;
          
      private volatile long itemCount = 0L;
          
          private StreamDefinition definition;
          
          private Resource resource;
          
          
      @PostConstruct
          public void setup() throws IOException{
              Yaml yaml = new Yaml();
              Map<String,Object> root = yaml.loadAs(resource.getInputStream(),Map.class);
              Map<String,Object> loaderMap = (Map<String, Object>) root.get("loader");
              LoaderDefinition loader = new LoaderDefinition(loaderMap);
              this.definition = loader.getDefinitions().get(0);
              run();
          }
          
      public void run(){
              while(canRun()){
                  for(ColumnDefinition column : definition.getColumns()){
                      StringBuilder buffer = new StringBuilder();
                      buffer.append(column.getTypeDefinition().nextValue());
                      buffer.append(definition.getDelimiter());
                      Message<String> msg = MessageBuilder.withPayload(buffer.toString()).build();
                      sendMessage(msg);
                  }
                  this.itemCount++;
              }
          }

    The source is just a random generator of data, but after deploying the module and trying to create a simple stream:

    stream create --name myrandomstream --definition "random | file"

    I'm getting an exception saying that there's no dispatcher configured for my channel.

    1:46:13,598 ERROR http-nio-9393-exec-4 rest.RestControllerAdvice:186 - Caught exception while handling a request
    org.springframework.integration.MessageHandlingExc eption: error occurred in message handler [moduleDeployer]
    at org.springframework.integration.handler.AbstractMe ssageHandler.handleMessage(AbstractMessageHandler. java:79)
    at org.springframework.integration.config.ServiceActi vatorFactoryBean$1.handleRequestMessage(ServiceAct ivatorFactoryBean.java:83)
    at org.springframework.integration.handler.AbstractRe plyProducingMessageHandler.handleMessageInternal(A bstractReplyProducingMessageHandler.java:142)
    at org.springframework.integration.handler.AbstractMe ssageHandler.handleMessage(AbstractMessageHandler. java:73)
    at org.springframework.integration.dispatcher.Unicast ingDispatcher.doDispatch(UnicastingDispatcher.java :115)
    at org.springframework.integration.dispatcher.Unicast ingDispatcher.dispatch(UnicastingDispatcher.java:1 02)
    at org.springframework.integration.channel.AbstractSu bscribableChannel.doSend(AbstractSubscribableChann el.java:77)
    at org.springframework.integration.channel.AbstractMe ssageChannel.send(AbstractMessageChannel.java:183)
    at org.springframework.integration.channel.AbstractMe ssageChannel.send(AbstractMessageChannel.java:153)
    at org.springframework.messaging.core.GenericMessagin gTemplate.doSend(GenericMessagingTemplate.java:114 )
    at org.springframework.messaging.core.GenericMessagin gTemplate.doSend(GenericMessagingTemplate.java:44)
    at org.springframework.messaging.core.AbstractMessage SendingTemplate.send(AbstractMessageSendingTemplat e.java:92)
    at org.springframework.integration.handler.AbstractRe plyProducingMessageHandler.sendMessage(AbstractRep lyProducingMessageHandler.java:228)
    at org.springframework.integration.handler.AbstractRe plyProducingMessageHandler.sendReplyMessage(Abstra ctReplyProducingMessageHandler.java:212)
    at org.springframework.integration.handler.AbstractRe plyProducingMessageHandler.produceReply(AbstractRe plyProducingMessageHandler.java:177)
    at org.springframework.integration.handler.AbstractRe plyProducingMessageHandler.handleResult(AbstractRe plyProducingMessageHandler.java:171)
    at org.springframework.integration.handler.AbstractRe plyProducingMessageHandler.handleMessageInternal(A bstractReplyProducingMessageHandler.java:149)
    at org.springframework.integration.handler.AbstractMe ssageHandler.handleMessage(AbstractMessageHandler. java:73)
    at org.springframework.integration.dispatcher.Unicast ingDispatcher.doDispatch(UnicastingDispatcher.java :115)
    at org.springframework.integration.dispatcher.Unicast ingDispatcher.dispatch(UnicastingDispatcher.java:1 02)
    at org.springframework.integration.channel.AbstractSu bscribableChannel.doSend(AbstractSubscribableChann el.java:77)
    at org.springframework.integration.channel.AbstractMe ssageChannel.send(AbstractMessageChannel.java:183)
    at org.springframework.integration.channel.AbstractMe ssageChannel.send(AbstractMessageChannel.java:153)
    at org.springframework.xd.dirt.stream.DeploymentMessa geSender.sendDeploymentRequests(DeploymentMessageS ender.java:57)
    at org.springframework.xd.dirt.stream.AbstractDeploye r.sendDeploymentRequests(AbstractDeployer.java:163 )
    at org.springframework.xd.dirt.stream.AbstractDeploye r.basicDeploy(AbstractDeployer.java:204)
    at org.springframework.xd.dirt.stream.AbstractInstanc ePersistingDeployer.deploy(AbstractInstancePersist ingDeployer.java:78)
    at org.springframework.xd.dirt.rest.XDController.save (XDController.java:229)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Nativ e Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Native MethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(De legatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.springframework.web.method.support.InvocableHa ndlerMethod.invoke(InvocableHandlerMethod.java:214 )
    at org.springframework.web.method.support.InvocableHa ndlerMethod.invokeForRequest(InvocableHandlerMetho d.java:132)
    at org.springframework.web.servlet.mvc.method.annotat ion.ServletInvocableHandlerMethod.invokeAndHandle( ServletInvocableHandlerMethod.java:104)
    at org.springframework.web.servlet.mvc.method.annotat ion.RequestMappingHandlerAdapter.invokeHandleMetho d(RequestMappingHandlerAdapter.java:748)
    at org.springframework.web.servlet.mvc.method.annotat ion.RequestMappingHandlerAdapter.handleInternal(Re questMappingHandlerAdapter.java:689)
    at org.springframework.web.servlet.mvc.method.Abstrac tHandlerMethodAdapter.handle(AbstractHandlerMethod Adapter.java:83)
    at org.springframework.web.servlet.DispatcherServlet. doDispatch(DispatcherServlet.java:945)
    at org.springframework.web.servlet.DispatcherServlet. doService(DispatcherServlet.java:876)
    at org.springframework.web.servlet.FrameworkServlet.p rocessRequest(FrameworkServlet.java:931)
    at org.springframework.web.servlet.FrameworkServlet.d oPost(FrameworkServlet.java:833)
    at javax.servlet.http.HttpServlet.service(HttpServlet .java:647)
    at org.springframework.web.servlet.FrameworkServlet.s ervice(FrameworkServlet.java:807)
    at javax.servlet.http.HttpServlet.service(HttpServlet .java:728)
    at org.apache.catalina.core.ApplicationFilterChain.in ternalDoFilter(ApplicationFilterChain.java:305)
    at org.apache.catalina.core.ApplicationFilterChain.do Filter(ApplicationFilterChain.java:210)
    at org.springframework.boot.actuate.trace.WebRequestT raceFilter.doFilter(WebRequestTraceFilter.java:114 )
    at org.apache.catalina.core.ApplicationFilterChain.in ternalDoFilter(ApplicationFilterChain.java:243)
    at org.apache.catalina.core.ApplicationFilterChain.do Filter(ApplicationFilterChain.java:210)
    at org.springframework.boot.actuate.autoconfigure.End pointWebMvcAutoConfiguration$1.doFilterInternal(En dpointWebMvcAutoConfiguration.java:126)
    at org.springframework.web.filter.OncePerRequestFilte r.doFilter(OncePerRequestFilter.java:108)
    at org.apache.catalina.core.ApplicationFilterChain.in ternalDoFilter(ApplicationFilterChain.java:243)
    at org.apache.catalina.core.ApplicationFilterChain.do Filter(ApplicationFilterChain.java:210)
    at org.springframework.web.filter.HiddenHttpMethodFil ter.doFilterInternal(HiddenHttpMethodFilter.java:7 7)
    at org.springframework.web.filter.OncePerRequestFilte r.doFilter(OncePerRequestFilter.java:108)
    at org.apache.catalina.core.ApplicationFilterChain.in ternalDoFilter(ApplicationFilterChain.java:243)
    at org.apache.catalina.core.ApplicationFilterChain.do Filter(ApplicationFilterChain.java:210)
    at org.springframework.web.filter.HttpPutFormContentF ilter.doFilterInternal(HttpPutFormContentFilter.ja va:88)
    at org.springframework.web.filter.OncePerRequestFilte r.doFilter(OncePerRequestFilter.java:108)
    at org.apache.catalina.core.ApplicationFilterChain.in ternalDoFilter(ApplicationFilterChain.java:243)
    at org.apache.catalina.core.ApplicationFilterChain.do Filter(ApplicationFilterChain.java:210)
    at org.springframework.boot.actuate.autoconfigure.Met ricFilterAutoConfiguration$MetricsFilter.doFilter( MetricFilterAutoConfiguration.java:97)
    at org.springframework.boot.actuate.autoconfigure.Met ricFilterAutoConfiguration$MetricsFilter.doFilter( MetricFilterAutoConfiguration.java:82)
    at org.apache.catalina.core.ApplicationFilterChain.in ternalDoFilter(ApplicationFilterChain.java:243)
    at org.apache.catalina.core.ApplicationFilterChain.do Filter(ApplicationFilterChain.java:210)
    at org.apache.catalina.core.StandardWrapperValve.invo ke(StandardWrapperValve.java:222)
    at org.apache.catalina.core.StandardContextValve.invo ke(StandardContextValve.java:123)
    at org.apache.catalina.authenticator.AuthenticatorBas e.invoke(AuthenticatorBase.java:472)
    at org.apache.catalina.valves.RemoteIpValve.invoke(Re moteIpValve.java:680)
    at org.apache.catalina.core.StandardHostValve.invoke( StandardHostValve.java:171)
    at org.apache.catalina.valves.ErrorReportValve.invoke (ErrorReportValve.java:99)
    at org.apache.catalina.core.StandardEngineValve.invok e(StandardEngineValve.java:118)
    at org.apache.catalina.connector.CoyoteAdapter.servic e(CoyoteAdapter.java:407)
    at org.apache.coyote.http11.AbstractHttp11Processor.p rocess(AbstractHttp11Processor.java:1004)
    at org.apache.coyote.AbstractProtocol$AbstractConnect ionHandler.process(AbstractProtocol.java:589)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProce ssor.run(NioEndpoint.java:1680)
    at java.util.concurrent.ThreadPoolExecutor.runWorker( ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
    Caused by: org.springframework.beans.factory.BeanCreationExce ption: Error creating bean with name 'randomData': Invocation of init method failed; nested exception is org.springframework.messaging.MessageDeliveryExcep tion: Dispatcher has no subscribers for channel 'SimpleModule [name=random, type=source, group=http2file, index=0 @336395be]:9393.output'.
    at org.springframework.beans.factory.annotation.InitD estroyAnnotationBeanPostProcessor.postProcessBefor eInitialization(InitDestroyAnnotationBeanPostProce ssor.java:136)
    at org.springframework.beans.factory.support.Abstract AutowireCapableBeanFactory.applyBeanPostProcessors BeforeInitialization(AbstractAutowireCapableBeanFa ctory.java:407)
    at org.springframework.beans.factory.support.Abstract AutowireCapableBeanFactory.initializeBean(Abstract AutowireCapableBeanFactory.java:1545)
    at org.springframework.beans.factory.support.Abstract AutowireCapableBeanFactory.doCreateBean(AbstractAu towireCapableBeanFactory.java:539)
    at org.springframework.beans.factory.support.Abstract AutowireCapableBeanFactory.createBean(AbstractAuto wireCapableBeanFactory.java:475)
    at org.springframework.beans.factory.support.Abstract BeanFactory$1.getObject(AbstractBeanFactory.java:3 04)
    at org.springframework.beans.factory.support.DefaultS ingletonBeanRegistry.getSingleton(DefaultSingleton BeanRegistry.java:228)
    at org.springframework.beans.factory.support.Abstract BeanFactory.doGetBean(AbstractBeanFactory.java:300 )
    at org.springframework.beans.factory.support.Abstract BeanFactory.getBean(AbstractBeanFactory.java:195)
    at org.springframework.beans.factory.support.DefaultL istableBeanFactory.preInstantiateSingletons(Defaul tListableBeanFactory.java:700)
    at org.springframework.context.support.AbstractApplic ationContext.finishBeanFactoryInitialization(Abstr actApplicationContext.java:760)
    at org.springframework.context.support.AbstractApplic ationContext.refresh(AbstractApplicationContext.ja va:482)
    at org.springframework.boot.SpringApplication.refresh (SpringApplication.java:609)
    at org.springframework.boot.SpringApplication.run(Spr ingApplication.java:321)
    at org.springframework.boot.builder.SpringApplication Builder.run(SpringApplicationBuilder.java:130)
    at org.springframework.xd.module.core.SimpleModule.in itialize(SimpleModule.java:181)
    at org.springframework.xd.dirt.module.ModuleDeployer. deploy(ModuleDeployer.java:264)
    at org.springframework.xd.dirt.module.ModuleDeployer. deployAndStore(ModuleDeployer.java:254)
    at org.springframework.xd.dirt.module.ModuleDeployer. handleDeploy(ModuleDeployer.java:249)
    at org.springframework.xd.dirt.module.ModuleDeployer. handleSingleModuleMessage(ModuleDeployer.java:227)
    at org.springframework.xd.dirt.module.ModuleDeployer. handleMessageInternal(ModuleDeployer.java:154)
    at org.springframework.integration.handler.AbstractMe ssageHandler.handleMessage(AbstractMessageHandler. java:73)
    ... 79 more
    Caused by: org.springframework.messaging.MessageDeliveryExcep tion: Dispatcher has no subscribers for channel 'SimpleModule [name=random, type=source, group=http2file, index=0 @336395be]:9393.output'.
    at org.springframework.integration.channel.AbstractSu bscribableChannel.doSend(AbstractSubscribableChann el.java:81)
    at org.springframework.integration.channel.AbstractMe ssageChannel.send(AbstractMessageChannel.java:183)
    at org.springframework.integration.channel.AbstractMe ssageChannel.send(AbstractMessageChannel.java:153)
    at org.springframework.messaging.core.GenericMessagin gTemplate.doSend(GenericMessagingTemplate.java:114 )
    at org.springframework.messaging.core.GenericMessagin gTemplate.doSend(GenericMessagingTemplate.java:44)
    at org.springframework.messaging.core.AbstractMessage SendingTemplate.send(AbstractMessageSendingTemplat e.java:92)
    at org.springframework.integration.endpoint.MessagePr oducerSupport.sendMessage(MessageProducerSupport.j ava:92)
    at com.gopivotal.dataloader.RandomDataProducer.run(Ra ndomDataProducer.java:47)
    at com.gopivotal.dataloader.RandomDataProducer.setup( RandomDataProducer.java:37)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Nativ e Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Native MethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(De legatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.springframework.beans.factory.annotation.InitD estroyAnnotationBeanPostProcessor$LifecycleElement .invoke(InitDestroyAnnotationBeanPostProcessor.jav a:349)
    at org.springframework.beans.factory.annotation.InitD estroyAnnotationBeanPostProcessor$LifecycleMetadat a.invokeInitMethods(InitDestroyAnnotationBeanPostP rocessor.java:300)
    at org.springframework.beans.factory.annotation.InitD estroyAnnotationBeanPostProcessor.postProcessBefor eInitialization(InitDestroyAnnotationBeanPostProce ssor.java:133)
    ... 100 more
    Caused by: org.springframework.integration.MessageDispatching Exception: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.Unicast ingDispatcher.doDispatch(UnicastingDispatcher.java :109)
    at org.springframework.integration.dispatcher.Unicast ingDispatcher.dispatch(UnicastingDispatcher.java:1 02)
    at org.springframework.integration.channel.AbstractSu bscribableChannel.doSend(AbstractSubscribableChann el.java:77)
    ... 115 more
    Any ideas? I can't find where my mistake might be.

    Regards
Working...
X