
  • Adding a MessageEndPoint on runtime

    I would like to add an annotated message endpoint to the message bus at runtime. I am not sure how to use the Publisher/Subscriber AnnotationPostProcessor.

    Is there an easier way to do this?



  • #2
    In most cases there is no need to add an endpoint at runtime. You can either change the behavior of an endpoint at runtime or use a detour (for example, by changing an attribute on a @Router through JMX).

    If you really need to change the running system, you should consider OSGi. OSGi has semantics that are perfect for this kind of job. It shouldn't be that hard to figure out yourself, but we will publish something on the topic before the final release.

    Let me know if you are trying this; I can probably help you along the way.
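As a rough illustration of the JMX detour mentioned above, here is a plain-Java sketch using only the JDK's `javax.management` API. It is not Spring Integration code: `SimpleRouter`, `RoutingSettings`, the attribute name `TargetChannel`, and the object name are all made up for this example, and a real @Router would be exposed to JMX by other means.

```java
import java.lang.management.ManagementFactory;
import javax.management.Attribute;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;

class RouterJmxSketch {

    /** Management view (hypothetical); JMX derives the attribute "TargetChannel" from the getter/setter pair. */
    public interface RoutingSettings {
        String getTargetChannel();
        void setTargetChannel(String targetChannel);
    }

    /** Hypothetical router: it only decides which channel name a payload goes to. */
    static class SimpleRouter implements RoutingSettings {
        private volatile String targetChannel = "events";

        @Override public String getTargetChannel() { return targetChannel; }
        @Override public void setTargetChannel(String targetChannel) { this.targetChannel = targetChannel; }

        String route(String payload) { return targetChannel + " <- " + payload; }
    }

    static String demo() {
        try {
            SimpleRouter router = new SimpleRouter();
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName("com.example.sketch:type=SimpleRouter");
            server.registerMBean(new StandardMBean(router, RoutingSettings.class), name);
            try {
                String before = router.route("hello");
                // This is what a JMX console does when an operator edits the attribute.
                server.setAttribute(name, new Attribute("TargetChannel", "characterEvents"));
                return before + " | " + router.route("hello");
            } finally {
                server.unregisterMBean(name);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The endpoint itself is never added or removed; only the attribute that drives its routing decision changes while the system keeps running.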


    • #3
      That is exactly what I am trying to do right now with OSGi. I would like to be able to publish a plain Java class that is annotated with Spring Integration annotations and have it processed and added to the MessageBus as a MessageEndPoint.

      Since OSGi services will not contain any annotations, I can't have my message bus bundle add them dynamically. My approach is to process the annotations in the publisher/subscriber bundle and inject the endpoint into the message bus. The message bus is published as a service.


      • #4
        There are different ways to look at it, but my first impression is that it is easier to confine a message bus to a single bundle and use <service-activator/>s for all inter-bundle messaging. This might not match your view of what a channel is, but it will make things a lot less complicated on a technical level.

        Only one bundle needs to depend on SI. It also keeps a clearer separation between the concern of the channel (decoupling and enabling asynchronous messaging) and the implementation of the transport (OSGi).

        I really need to write a sample for this soon, because I haven't even checked whether a <service-activator/> on a <reference/> works, but if it doesn't I would consider that a bug.

        In your case, loading the bundle would load the internal message bus and the endpoints registered with it at the same time. This would allow SI to remain oblivious of OSGi, and that makes my job a lot easier too.
        Last edited by iwein; Sep 27th, 2008, 10:38 PM.


        • #5
          Thanks Iwein. I didn't like the message bus bundle having knowledge of the subscriber bundles through the service-activator. For every new subscriber bundle, I would need to export those classes, import them into the message bus bundle, and add a service-activator to the configuration. My approach is described below; please feel free to give me any comments.

          Bundle A is where I set up the message bus and the channels. It exports the internal.MessageBus to the OSGi service registry.

          <message-bus />
          <context:component-scan base-package="com.example.messagebus" />
          <channel id="events"/>	
          <channel id="characterEvents"/>	
          <publish-subscribe-channel id="presenceChangedEvents"/>
          <osgi:service ref="internal.MessageBus" interface="org.springframework.integration.bus.MessageBus" />
          Bundle B is the bundle where I want to publish messages to the channel.
          One problem I had with the @Publisher annotation is that my message did not get published when the class called its own annotated method. I originally had an init method in the DummyService that used a ScheduledExecutor to fire fake events. Once I used another class to invoke the annotated publisher method, it worked fine. The subscriber bundle does the same thing described below. One important thing is that you have to reference the message bus as "internal.MessageBus", since the PostProcessor looks for that bean definition name when trying to resolve the channel or add a new endpoint.

          <osgi:reference id="internal.MessageBus" interface="org.springframework.integration.bus.MessageBus" />
          <bean id="dummyService" 
            class="com.example.messagebus.publisher.DummyService" /> 
          However, this approach makes every bundle depend on SI. I don't think that will be a big deal, since every bundle needs Spring anyway.
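The self-invocation problem described in this post can be reproduced with a JDK dynamic proxy. This is only a sketch under the assumption that the publishing annotation is applied by an interceptor wrapped around the bean (the thread does not confirm how M6 does it); `EventService`, `fire`, and `fireFromInside` are hypothetical names, and the handler stands in for the publishing interceptor.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class SelfInvocationSketch {

    interface EventService {
        void fire(String event);           // imagine this method carries the publishing annotation
        void fireFromInside(String event); // calls fire() on "this"
    }

    static class DummyService implements EventService {
        @Override public void fire(String event) {
            // business logic only; publishing is added from the outside
        }
        @Override public void fireFromInside(String event) {
            fire(event); // plain call on "this": the proxy never sees it
        }
    }

    static List<String> demo() {
        List<String> published = new ArrayList<>();
        EventService target = new DummyService();

        // Stand-in for the publishing interceptor: publishes after calls to fire().
        InvocationHandler publishing = (proxy, method, args) -> {
            Object result = method.invoke(target, args);
            if (method.getName().equals("fire")) {
                published.add((String) args[0]);
            }
            return result;
        };

        EventService proxied = (EventService) Proxy.newProxyInstance(
                EventService.class.getClassLoader(),
                new Class<?>[] { EventService.class },
                publishing);

        proxied.fire("external");           // goes through the proxy, so it is published
        proxied.fireFromInside("internal"); // inner fire() bypasses the proxy, nothing published
        return published;
    }
}
```

Only "external" ends up in the list, which matches the observation that the message was published once a different class invoked the annotated method.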


          Last edited by markous; Sep 29th, 2008, 03:50 PM.


          • #6
            I might be off topic here... I am looking at the MessagePublishingInterceptor. I realized that the message is sent before the method is processed if you are using ARGUMENTS as the PayloadType.

            I think that is the correct behavior.

            However, most of the time one would want to publish a message only if the method completes successfully. Is it possible to make this flexible and have an option to send the arguments after the method is processed?

            Also, the MessagePublishingInterceptor is always going to send the object array to the channel unless there is a way to specify the MessageCreator. Is there a way to change the MessageCreator implementation? I guess that is something that would go into the @Publisher annotation?
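To make the request concrete, here is a minimal plain-Java sketch of "publish the argument only after the method returned normally", with a pluggable payload conversion. It does not use the MessagePublishingInterceptor API; `publishAfterSuccess` and `PayloadCreator` are hypothetical names, and a `Consumer` stands in for the channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class PublishAfterSuccessSketch {

    /** Hypothetical counterpart of a MessageCreator: maps the method argument to a payload. */
    interface PayloadCreator<A, P> {
        P create(A argument);
    }

    /** Wraps a one-argument method so that its argument is published only after a normal return. */
    static <A, R, P> Function<A, R> publishAfterSuccess(
            Function<A, R> target, PayloadCreator<A, P> creator, Consumer<P> channel) {
        return argument -> {
            R result = target.apply(argument);        // an exception here skips the publish
            channel.accept(creator.create(argument)); // reached only after a normal return
            return result;
        };
    }

    static List<String> demo() {
        List<String> channel = new ArrayList<>();
        Function<String, Integer> parse = Integer::parseInt;
        PayloadCreator<String, String> creator = arg -> "parsed:" + arg;
        Consumer<String> send = channel::add;
        Function<String, Integer> wrapped = publishAfterSuccess(parse, creator, send);

        wrapped.apply("42");
        try {
            wrapped.apply("not a number");
        } catch (NumberFormatException expected) {
            // the failed call must not have published anything
        }
        return channel;
    }
}
```

After `demo()` the channel holds a single entry, "parsed:42"; the call that threw left no message behind.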


            • #7
              Offtopic: I think it is time you moved away from M6 and started working on HEAD (until there is another milestone). You're obviously smart enough to deal with it and you probably don't want to get too attached to the @Publisher for example.

              At the moment HEAD is quite stable, and it has some drastic improvements over M6.


              • #8
                OSGi Integration Patterns

                Very interesting that you have a working sample that exposes the MB. Now I have seen three flavors:
                1. Expose MessageChannels (Dave Syer)
                2. Expose MessageBus (Michael)
                3. Expose custom service (Iwein (my sample is done since yesterday))

                The way I did it was this:

                Producer bundle
                Has a Timer-based component that pushes Strings to a reference to an InputGateway (which is defined in a separate bundle: Contracts):
                public interface InputGateway {
                	void consume(String string);
                }
                You could send any domain object of course.

                Consumer bundle
                Lets Spring Integration implement the InputGateway like this:
                	<gateway id="inputGateway" default-request-channel="filtered"
                		service-interface="com.springsource.samples.integration.osgi.input.InputGateway" />
                	<publish-subscribe-channel id="filtered" />
                	<beans:bean id="channelSubscriber"
                		class="com.springsource.samples.integration.osgi.consumer.DefaultChannelSubscriber" />
                That <gateway/> is just awesome: you don't even need to implement the interface.
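For readers wondering how an interface can work without an implementation class, the idea can be sketched with a JDK dynamic proxy. This is an illustration of the general technique, not a description of how <gateway/> is implemented; the `gateway` helper is hypothetical and a `Consumer` stands in for the request channel.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class GatewayProxySketch {

    interface InputGateway {
        void consume(String string);
    }

    /**
     * Builds an object implementing the given interface that forwards the first
     * argument of every call to the channel. Suitable for void, one-argument
     * methods only, which is all this sketch needs.
     */
    static <T> T gateway(Class<T> serviceInterface, Consumer<Object> requestChannel) {
        Object proxy = Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[] { serviceInterface },
                (self, method, args) -> {
                    if (method.getDeclaringClass() == Object.class) {
                        // toString/equals/hashCode are answered by the channel object
                        return method.invoke(requestChannel, args);
                    }
                    requestChannel.accept(args[0]);
                    return null;
                });
        return serviceInterface.cast(proxy);
    }

    static List<Object> demo() {
        List<Object> filtered = new ArrayList<>();
        InputGateway inputGateway = gateway(InputGateway.class, filtered::add);
        inputGateway.consume("tick"); // no class implementing InputGateway was written
        return filtered;
    }
}
```

The caller only sees the `InputGateway` contract, which is what makes it safe to export across bundles.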

                The DefaultChannelSubscriber implements:
                public interface OutputDispatcherSubscriber {
                	void subscribe(InputGateway consumer);
                	void unsubscribe(InputGateway consumer);
                }
                Reusing the InputGateway here, which might not be the way to go, but hey, this is just a sample.
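The DefaultChannelSubscriber itself is not shown in this post, so the following is only a guess at the shape such a class could take: it keeps the registered consumers in a list and hands each payload to all of them. `FanOutSubscriber` and `dispatch` are hypothetical names; the two interfaces are copied from the post so the block compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ChannelSubscriberSketch {

    interface InputGateway {
        void consume(String string);
    }

    interface OutputDispatcherSubscriber {
        void subscribe(InputGateway consumer);
        void unsubscribe(InputGateway consumer);
    }

    /** Hypothetical implementation: fans every payload out to the current subscribers. */
    static class FanOutSubscriber implements OutputDispatcherSubscriber {
        // Copy-on-write so bundles can subscribe or unsubscribe while a dispatch is in progress.
        private final List<InputGateway> consumers = new CopyOnWriteArrayList<>();

        @Override public void subscribe(InputGateway consumer) { consumers.add(consumer); }
        @Override public void unsubscribe(InputGateway consumer) { consumers.remove(consumer); }

        void dispatch(String payload) {
            for (InputGateway consumer : consumers) {
                consumer.consume(payload);
            }
        }
    }

    static List<String> demo() {
        List<String> sunk = new ArrayList<>();
        FanOutSubscriber subscriber = new FanOutSubscriber();
        InputGateway sink = sunk::add;

        subscriber.subscribe(sink);
        subscriber.dispatch("first");  // delivered to the sink
        subscriber.unsubscribe(sink);
        subscriber.dispatch("second"); // nobody is listening any more
        return sunk;
    }
}
```

A copy-on-write list is a design choice made here because OSGi services can come and go at any time; a plain list guarded by a lock would work as well.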

                The consumer bundle exports a service to input messages and a service to register for output like this:
                	<osgi:service id="collectorService" ref="inputGateway"
                		interface="com.springsource.samples.integration.osgi.input.InputGateway" />
                	<osgi:service id="subscriberService" ref="channelSubscriber"
                		interface="com.springsource.samples.integration.osgi.output.OutputDispatcherSubscriber" />
                Now where do the messages go?

                The sink bundle
                The sink has an implementation of the (reused) InputGateway that registers itself upon activation.
                public class Sink implements InputGateway {
                	private static final Log logger = LogFactory.getLog(Sink.class);
                	OutputDispatcherSubscriber subscriberService;
                	public void subscribeThis() {
                		// The start of this statement was lost in the post; a logger.info call is assumed.
                		logger.info("subscribing sink");
                	}
                	public void consume(String string) {
                		// The start of this statement was lost in the post; a logger.info call is assumed.
                		logger.info(string + " sunk in " + this.getClass().getPackage());
                	}
                }
                So this sets the stage: three flavors, exposing the MessageBus, MessageChannels, and a custom service respectively.

                What do you think?