  • Point to Point Communication achieved

    I've managed to get point-to-point communication working, now that I've realized that channels are directional.

    Each request/response pair needs two queues and four channels. The XML is straightforward, but repeating it for many different requests would be error-prone, so I am trying to wrap this setup in a Proxy class and a Stub class.

    Code:
    	<channel id="statusRequestProducerChannel"/>
    	
    	<jms-target connection-factory="connectionFactory" destination="statusRequestQueue" channel="statusRequestProducerChannel"/>
    
        <beans:bean id="statusRequestQueue" class="org.apache.activemq.command.ActiveMQQueue">
        	<beans:constructor-arg><beans:value>statusRequest.que</beans:value></beans:constructor-arg>
        </beans:bean>
    
    
    	<channel id="statusRequestConsumerChannel"/>
    	
    	<jms-source connection-factory="connectionFactory" destination="statusRequestQueue" channel="statusRequestConsumerChannel"/>
    
     	<endpoint input-channel="statusRequestConsumerChannel" handler-ref="statusResponder">
    		<concurrency core="1" max="1" queue-capacity="1" keep-alive="10" />
    	</endpoint>    
    
    	<beans:bean id="statusResponder"
    		class="nz.co.bonusbonds.bbo.stub.manager.StatusResponder">
    	</beans:bean>
    
    	<channel id="statusResponseProducerChannel"/>
    
    	<jms-target connection-factory="connectionFactory" destination="statusResponseQueue" channel="statusResponseProducerChannel"/>
    
        <beans:bean id="statusResponseQueue" class="org.apache.activemq.command.ActiveMQQueue">
        	<beans:constructor-arg><beans:value>statusResponse.que</beans:value></beans:constructor-arg>
        </beans:bean>
    
    	<channel id="statusResponseConsumerChannel"/>
    
    	<jms-source connection-factory="connectionFactory" destination="statusResponseQueue" channel="statusResponseConsumerChannel"/>
    I can see how most of the Spring configuration translates into Java code, but I am not sure how the MessageChannel gets created or how the jms-target and the channel get linked. In the code below, new MessageChannel() does not compile because MessageChannel is an interface, and target.setChannel(input) does not compile because there is no such method. Nothing seems to exist in the other direction either (i.e. input.setTarget(target)).
    Code:
        @Override
        protected MessageChannel getInputChannel() {
    //        <channel id="statusRequestProducerChannel"/>
            MessageChannel input=new MessageChannel();
    
    //<beans:bean id="statusRequestQueue" class="org.apache.activemq.command.ActiveMQQueue">
    //    <beans:constructor-arg><beans:value>statusRequest.que</beans:value></beans:constructor-arg>
    //</beans:bean>
            Destination destination=new ActiveMQQueue(getRequestQueueName());
            
    //        <jms-target connection-factory="connectionFactory" destination="statusRequestQueue" channel="statusRequestProducerChannel"/>
            JmsTargetAdapter target=new JmsTargetAdapter();
            target.setConnectionFactory(getConnectionFactory());
            target.setDestination(destination);
            target.setChannel(input);
            
            return input;
    
        }
    Any ideas how I can hook this up? If it is possible, the setup of a request/response pair would reduce from the XML shown above to just
    Code:
            Proxy<String, String> proxy=new Proxy<String, String>(STATUS);
            Stub<String,String> stub=new Stub<String, String>(STATUS){
                public String handle(String payload) {
                    return "Processed "+payload;
                }
            };
            proxy.start();
            stub.start();
            String result=proxy.execute("Status: OK");
            proxy.stop();
            stub.stop();

  • #2
    MessageChannel channel = new SimpleChannel(); solves one problem.
    Creating the source adapter with new JmsMessageDrivenSourceAdapter() solves the second problem.

    I'm still not sure how to link the channel and the target adapter. (The JMS source adapters have a setChannel method, but the JmsTargetAdapter doesn't.)


    • #3
      So I checked out the JmsTargetAdapterParser, and it seems to create a DefaultMessageEndpoint with a Subscription for the channel and the JmsTargetAdapter as the handler for the endpoint. Following this approach, I can get the code to compile and run, but no messages are being sent (and therefore none are received).


      • #4
        OK. So now I am calling registerSourceAdapter() on the message bus (and for the endpoints), setting the name of the channel, calling afterPropertiesSet() on the adapters and endpoints, and calling .start() on the endpoints... and it is almost working.


        • #5
          Well, I still can't get it working; it seems that I am missing something. It should be possible to create this stuff dynamically in code, right (e.g. if you don't know your channel/queue names until runtime)?

          I'm at the point where I may just look at dynamically creating the xml for this, and then getting the AppContext to parse it, rather than plugging it in myself in code... but that seems like a dreadful solution...


          • #6
            You might want to have a look at the RequestReplyTemplate that was added in M2. Here's a quick example of how it works:
            Code:
            RequestReplyTemplate template = new RequestReplyTemplate(requestChannel);
            Message<?> reply = template.request(new StringMessage("test"));
            The target adapters are all just specialized implementations of MessageHandler, so endpoints delegate to them for the Messages they receive, and the channel's dispatcher polls the channel for messages based on a Subscription. In other words, the target adapter parsers are basically shorthand for something like the following:
            Code:
            <endpoint input-channel="inChannel" handler-ref="someTargetAdapter">
                <schedule period="5000"/>
            </endpoint>
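
The polling relationship described in this post (a scheduler polls a channel on a fixed period and hands each message to a handler) can be modelled in plain Java. This is only a conceptual sketch under stated assumptions: it uses java.util.concurrent, not the Spring Integration API, and the names PollingEndpointSketch, startPolling and deliver are hypothetical. A BlockingQueue stands in for the channel and a Consumer stands in for the MessageHandler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class PollingEndpointSketch {

    // Hypothetical stand-in for an endpoint with a <schedule period="..."/>:
    // every periodMillis, drain the channel and pass each message to the handler.
    static ScheduledExecutorService startPolling(BlockingQueue<String> channel,
                                                 Consumer<String> handler,
                                                 long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            String message;
            while ((message = channel.poll()) != null) {
                handler.accept(message);
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    // Puts the messages on the channel, polls until all are handled
    // (or two seconds pass), then stops the scheduler.
    static List<String> deliver(List<String> messages) {
        BlockingQueue<String> channel = new LinkedBlockingQueue<>(messages);
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(messages.size());
        ScheduledExecutorService scheduler = startPolling(channel, m -> {
            handled.add(m);
            done.countDown();
        }, 50);
        try {
            done.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow(); // always stop the poller when finished
        }
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        System.out.println(deliver(List.of("first", "second")));
    }
}
```

Note that the handler only ever sees a message when the poller runs, so in this model a longer period means a longer worst-case delay before a message is handled.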


            • #7
              OK. I got the dynamic creation of XML working, but it only works on the first try of the web application, since I am not unregistering producers/consumers on the queue, and I'm guessing that one of the existing ones is getting the reply message.
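
The suspicion in this post rests on point-to-point semantics: each message on a queue goes to exactly one consumer, so a consumer left over from an earlier run can take a reply before the current one sees it. A minimal sketch of that effect, assuming a plain in-memory BlockingQueue as a stand-in for the JMS response queue (StaleConsumerSketch and deliverOnce are hypothetical names, and whether this is what actually happened in the application is only the poster's guess):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class StaleConsumerSketch {

    // Returns what each of two consumers sees when a single reply is queued
    // and the leftover (stale) consumer happens to poll first.
    static String[] deliverOnce(String reply) {
        BlockingQueue<String> responseQueue = new LinkedBlockingQueue<>();
        responseQueue.add(reply);

        String seenByStale = responseQueue.poll();   // stale consumer takes the reply
        String seenByCurrent = responseQueue.poll(); // nothing left for the current one

        return new String[] { seenByStale, seenByCurrent };
    }

    public static void main(String[] args) {
        String[] seen = deliverOnce("reply-1");
        System.out.println("stale consumer saw:   " + seen[0]);
        System.out.println("current consumer saw: " + seen[1]);
    }
}
```

In this model the current consumer receives null, which is why stopping or closing old consumers before starting new ones matters.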


              • #8
                Originally posted by Mark Fisher
                You might want to have a look at the
                Code:
                <endpoint input-channel="inChannel" handler-ref="someTargetAdapter">
                    <schedule period="5000"/>
                </endpoint>
                Is 5 seconds the default? Does it force a poll when you do a receive()?


                • #9
                  WooHoo! I'm just a dumbass. My Stub was handling the message and returning the processed version of the message, NOT sending it to the response queue. Everything is working now, and I just have Stub and Proxy classes to deal with. Thanks for the help.
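
The fix described in this post (the stub must send its result to the response queue rather than merely return it) can be sketched in plain Java. This is a simplified model, not the Spring Integration or JMS API: two in-memory BlockingQueues stand in for the request and response queues, and the names RequestReplySketch, startStub, execute and roundTrip are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class RequestReplySketch {

    // Stub side: take a request, handle it, and SEND the result to the
    // response queue. Returning the result without sending it would leave
    // the proxy waiting forever.
    static Thread startStub(BlockingQueue<String> requests,
                            BlockingQueue<String> responses,
                            Function<String, String> handler) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String payload = requests.take();
                    responses.put(handler.apply(payload));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop was requested
            }
        });
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    // Proxy side: send the request, then wait for the reply (null on timeout).
    static String execute(BlockingQueue<String> requests,
                          BlockingQueue<String> responses,
                          String payload,
                          long timeoutMillis) {
        try {
            requests.put(payload);
            return responses.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // One full request/response exchange, stopping the stub afterwards.
    static String roundTrip(String payload) {
        BlockingQueue<String> requests = new LinkedBlockingQueue<>();
        BlockingQueue<String> responses = new LinkedBlockingQueue<>();
        Thread stub = startStub(requests, responses, p -> "Processed " + p);
        try {
            return execute(requests, responses, payload, 2000);
        } finally {
            stub.interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("Status: OK")); // prints "Processed Status: OK"
    }
}
```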


                  • #10
                    Hmmm, whatever I'm doing is only working about 75% of the time.

                    To clean up objects, should I call .stop() on endpoints and source adapters?

                    Also, is there going to be an unregister method on the message bus for each register method (and a lookup method)?
