Announcement Announcement Module
Collapse
No announcement yet.
How to add the suscribers to a publisher? Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • How to add the suscribers to a publisher?

    Hi!

    Ive a spring context like this:
    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:context="http://www.springframework.org/schema/context"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:i="http://www.springframework.org/schema/integration"
    	xmlns:jdbc="http://www.springframework.org/schema/integration/jdbc"
    	xsi:schemaLocation="http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
    		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    		http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
    		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    
    	<bean id="dataSource" class="oracle.jdbc.pool.OracleDataSource"
    		destroy-method="close">
    		<property name="connectionCachingEnabled" value="true" />
    		<property name="URL" value="jdbc:oracle:thin:@//localhost:1521/XE" />
    		<property name="password" value="admin" />
    		<property name="user" value="admin" />
    		<property name="connectionCacheProperties">
    			<props merge="default">
    				<prop key="MinLimit">3</prop>
    				<prop key="MaxLimit">20</prop>
    			</props>
    		</property>
    	</bean>
    
    	<i:channel id="coffeeChannel" />
    
    	<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    	    <property name="dataSource" ref="dataSource"/>
    	</bean>
    
    	<bean id="CoffeBeverageMapper" class="org.springframework.integration.support.CoffeBeverageMapper" />
    
    	<jdbc:inbound-channel-adapter id="findAllCoffeeBeverages"
    		channel="coffeeChannel" data-source="dataSource"
    		query="SELECT id, coffee_name, coffee_description FROM coffee_beverages ORDER BY ID"
    		row-mapper="CoffeBeverageMapper"
    		max-rows-per-poll="100">
    		<i:poller trigger="trigerConsulta">
    			<i:transactional propagation="REQUIRED"
    				transaction-manager="transactionManager" />
    		</i:poller>
    	</jdbc:inbound-channel-adapter>
    
    	<bean id="trigerConsulta" class="org.springframework.scheduling.support.PeriodicTrigger">
    		<constructor-arg name="period" value="15000"/>
    		<property name="initialDelay" value="10000"/>
    	</bean>
    
    </beans>
    and I call it in the main like this:
    Code:
    final AbstractApplicationContext context =
    			new ClassPathXmlApplicationContext("classpath:META-INF/spring/integration/*-context.xml");
    
    		context.registerShutdownHook();
    				List<CoffeeBeverage> coffeeBeverages = service.findAllCoffeeBeverages();
    
    				if (coffeeBeverages!=null) {
    					for (CoffeeBeverage coffeeBeverage : coffeeBeverages) {
    						System.out.println(String.format("%s - %s", coffeeBeverage.getId(),
    																	coffeeBeverage.getName()));
    					}
    				} else {
    					System.out.println("coffeeBeverages: null");
    				}
    So when I run it with maven I get:
    Code:
    12:03:51.528 ERROR [task-scheduler-1][org.springframework.integration.handler.LoggingHandler] org.springframework.integration.MessageDeliveryException: Dispatcher has no subscribers for channel coffeeChannel.
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:82)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
    	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    	at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:97)
    	at org.springframework.integration.endpoint.AbstractTransactionSynchronizingPollingEndpoint.doPoll(AbstractTransactionSynchronizingPollingEndpoint.java:82)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:144)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    	at java.lang.reflect.Method.invoke(Method.java:597)
    	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:319)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
    	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:110)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
    	at $Proxy15.call(Unknown Source)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:236)
    	at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
    	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
    	at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:231)
    	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53)
    	at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    	at java.lang.Thread.run(Thread.java:662)
    Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:109)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    	... 34 more
    1. Why is it asking me for suscribers if Im not using a Publish channel?
    2. So how do I add the suscribers?

    Thanks in advance.

  • #2
    Dispatcher has no subscribers for channel coffeeChannel.
    This simply means you have no endpoint declared to receive messages from coffeeChannel.

    Your JDBC inbound adapter is polled, creates a message and sends it to 'coffeeChannel' but there is nothing in your configuration to receive that message.

    What do you want to "do" with the message?

    Typically, you'd have some service invoked, such as

    Code:
    <i:service-activator input-channel="coffeeChannel" ref="myBean" output-channel="nextChannel" />

    Comment


    • #3
      I just want to print the message in console and sent it as you suggest to the next channel.
      So I did as you suggested but now I get:
      Code:
       Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.2:java (default-cli) on project oracle-stored-procedures: An exception occured while executing the Java class. null: InvocationTargetException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0': Cannot resolve reference to bean 'org.springframework.integration.config.ServiceActivatorFactoryBean#0' while setting bean property 'handler'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ServiceActivatorFactoryBean#0': FactoryBean threw exception on object creation; nested exception is java.lang.IllegalArgumentException: Found ambiguous parameter type [interface java.util.List] for method match: [public void org.springframework.integration.model.CoffeeBeverage.setDescription(java.lang.String), public void org.springframework.integration.model.CoffeeBeverage.setId(java.lang.Integer), public java.lang.String org.springframework.integration.model.CoffeeBeverage.getDescription()] -> [Help 1]
      org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.2:java (default-cli) on project oracle-stored-procedures: An exception occured while executing the Java class. null

      Comment


      • #4
        Well, without seeing your configuration, I can't really tell what you did - the error message indicates you sent it to a bean with more than one method and the framework couldn't figure out which method to call - in that case, you have to add a 'method' attribute to the service-activator.

        However, if you just want to send it to the console, add a <int-stream:stdout-channel-adapter channel="coffeeChannel" /> and send it there.

        I suggest you browse the other samples (e.g. JMS) for examples.

        Comment


        • #5
          Ok I decided to simplify it, so I let my spring context like this:

          Code:
          <?xml version="1.0" encoding="UTF-8"?>
          <beans xmlns="http://www.springframework.org/schema/beans"
          	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
          	xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
          	xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
          		http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
          		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
          
          	<bean id="dataSource" class="oracle.jdbc.pool.OracleDataSource"
          		destroy-method="close">
          		<property name="connectionCachingEnabled" value="true" />
          		<property name="URL" value="jdbc:oracle:thin:@//localhost:1521/XE" />
          		<property name="password" value="admin" />
          		<property name="user" value="admin" />
          		<property name="connectionCacheProperties">
          			<props merge="default">
          				<prop key="MinLimit">3</prop>
          				<prop key="MaxLimit">20</prop>
          			</props>
          		</property>
          	</bean>
          
          	<int:channel id="findCoffeeProcedureRequestChannel" />
          	<int:channel id="findAllProcedureRequestChannel" />
          	<int:channel id="insertCoffee" />
          
          	<int:gateway id="gateway" default-request-timeout="5000"
          		default-reply-timeout="5000"
          		service-interface="org.springframework.integration.service.CoffeeService">
          		<int:method name="findCoffeeBeverage" request-channel="findCoffeeProcedureRequestChannel" />
          		<int:method name="findAllCoffeeBeverages" request-channel="findAllProcedureRequestChannel" />
          		<int:method name="insertCoffee" request-channel="insertCoffee" />
          	</int:gateway>
          
          	<int-jdbc:stored-proc-outbound-gateway
          		id="outbound-gateway-storedproc-find-coffee" data-source="dataSource"
          		request-channel="findCoffeeProcedureRequestChannel"
          		skip-undeclared-results="true" stored-procedure-name="FIND_COFFEE_BEVERAGES"
          		expect-single-result="true" is-function="true">
          		<int-jdbc:parameter name="PID" expression="payload" />
          	</int-jdbc:stored-proc-outbound-gateway>
          
          	<int-jdbc:stored-proc-outbound-gateway
          		id="outbound-gateway-storedproc-find-all" data-source="dataSource"
          		request-channel="findAllProcedureRequestChannel" expect-single-result="true"
          		stored-procedure-name="FIND_ALL_COFFEE_BEVERAGES" is-function="true">
          		<int-jdbc:returning-resultset name="coffeeBeverages"
          			row-mapper="org.springframework.integration.support.CoffeBeverageMapper" />
          	</int-jdbc:stored-proc-outbound-gateway>
          
          	<int-jdbc:stored-proc-outbound-gateway id="storeProcedureOutbound"
          		request-channel="insertCoffee" data-source="dataSource"
          		stored-procedure-name="agregar_edi">
          		<int-jdbc:parameter name="p_COFFEE_NAME" expression="payload.p_COFFEE_NAME" />
          		<int-jdbc:parameter name="p_COFFEE_DESCRIPTION" expression="payload.p_COFFEE_DESCRIPTION" />
          	</int-jdbc:stored-proc-outbound-gateway>
          
          </beans>
          My main is like this:
          Code:
          		final AbstractApplicationContext context =
          			new ClassPathXmlApplicationContext("classpath:META-INF/spring/integration/*-context.xml");
          
          		context.registerShutdownHook();
          
          		final CoffeeService service = context.getBean(CoffeeService.class);
          service.insertCoffee("Nescafe", "Huele rico!");
          So when I run it I get:
          Code:
          Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.2:java (default-cli) on project oracle-stored-procedures: An exception occured while executing the Java class. null: InvocationTargetException: failed to convert object to Message: At most one parameter (or expression via method-level @Payload) may be mapped to the payload or Message. Found more than one on method [public abstract void org.springframework.integration.service.CoffeeService.insertCoffee(java.lang.String,java.lang.String)] -> [Help 1]
          org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.2:java (default-cli) on project oracle-stored-procedures: An exception occured while executing the Java class. null
          	at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:217)
          	at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
          	at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
          	at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:84)
          	at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:59)
          	at org.apache.maven.lifecycle.internal.LifecycleStarter.singleThreadedBuild(LifecycleStarter.java:183)
          	at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:161)
          	at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:320)
          	at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:156)
          	at org.apache.maven.cli.MavenCli.execute(MavenCli.java:537)
          	at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:196)
          	at org.apache.maven.cli.MavenCli.main(MavenCli.java:141)
          	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
          	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
          	at java.lang.reflect.Method.invoke(Method.java:597)
          	at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:290)
          	at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:230)
          	at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:409)
          	at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:352)
          Caused by: org.apache.maven.plugin.MojoExecutionException: An exception occured while executing the Java class. null
          	at org.codehaus.mojo.exec.ExecJavaMojo.execute(ExecJavaMojo.java:346)
          	at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:101)
          	at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:209)
          	... 19 more
          why is it failing to convert my object to a message?
          Thanks in advance.

          Comment


          • #6
            A message can only have one payload - you are sending 2 strings - the framework can't figure out which is the payload.

            In any case, your stored procedure is looking for two properties on the payload

            Code:
            <int-jdbc:parameter name="p_COFFEE_NAME" expression="payload.p_COFFEE_NAME" />
            <int-jdbc:parameter name="p_COFFEE_DESCRIPTION" expression="payload.p_COFFEE_DESCRIPTION" />
            This implies the payload is a java bean with two properties, and 2 getters for those properties...

            Code:
            getP_COFFEE_NAME() and getP_COFFEE_DESCRIPTION()
            I suggest you browse the other samples, and the documentation (http://static.springsource.org/sprin...eference/html/) to get a better understanding of the framework.
            Last edited by Gary Russell; Feb 28th, 2013, 02:48 PM.

            Comment


            • #7
              Hi!
              Exception message says to you an advice
              At most one parameter (or expression via method-level @Payload) may be mapped to the payload or Message. Found more than one on method
              So, you should mark one of parameter of your 'insertCoffee' gateway's method as @Payload. From other side: another one should be marked with @Header: http://static.springsource.org/sprin...on-annotations

              Cheers,
              Artem

              Comment


              • #8
                Ok I read the documentation and I found 2 ways

                stored-proc-outbound-gateway (in the explanation they have the xml shows a definition with 2 parameters)

                Code:
                	<int-jdbc:stored-proc-outbound-gateway
                		id="storeProcedureOutbound" request-channel="insertCoffee" data-source="dataSource"
                		stored-procedure-name="agregar_edi">
                		<int-jdbc:sql-parameter-definition name="P_COFFEE_NAME"/>
                		<int-jdbc:sql-parameter-definition name="P_COFFEE_DESCRIPTION"/>
                		<int-jdbc:parameter name="P_COFFEE_NAME" />
                		<int-jdbc:parameter name="P_COFFEE_DESCRIPTION"  />
                	</int-jdbc:stored-proc-outbound-gateway>

                and

                stored-proc-outbound-channel-adapter

                Code:
                	<int-jdbc:stored-proc-outbound-channel-adapter
                		id="storeProcedureOutbound" channel="insertCoffee" data-source="dataSource"
                		stored-procedure-name="agregar_edi">
                		<int-jdbc:sql-parameter-definition name="P_COFFEE_NAME"/>
                		<int-jdbc:sql-parameter-definition name="P_COFFEE_DESCRIPTION"/>
                		<int-jdbc:parameter name="P_COFFEE_NAME" />
                		<int-jdbc:parameter name="P_COFFEE_DESCRIPTION"  />
                	</int-jdbc:stored-proc-outbound-channel-adapter>
                in the documentation (http://static.springsource.org/sprin...hannel-adapter)

                it shows an example with 2 parameters but it still tells me that I can have 2 parameters.

                I try both, but both return the exception about only accepting 1 parameter.

                Comment

                Working...
                X