  • Spring ApplicationEvent over AMQP

    Hi,
    I would like my Spring application, running on two separate machines, to share ApplicationEvents. I know about http://static.springsource.org/sprin...plicationevent

    However, I am looking for an example that passes application events from one application instance through a Rabbit queue, with a consumer at the other end reading them from the queue and firing those events in the other application instance.

    Or

    Can anyone suggest a better way of sharing application events?

    Thanks

  • #2
    Spring Integration provides Channel Adapters for both ApplicationEvents and RabbitMQ. So you can simply connect the two.

    See these two sections of the reference manual for more info:

    ApplicationEvents: http://static.springsource.org/sprin...plicationevent
    RabbitMQ: http://static.springsource.org/sprin...mlsingle/#amqp

    Hope that helps.
    -Mark


    • #3
      Thank you Mark for the suggestion.
      I created two applications:
      1) Producer
      - An event is fired
      - The event is picked up by the event inbound channel adapter
      - The event is passed to an events queue (a Rabbit queue)

      2) Consumer
      - Reads the event from the queue
      - Passes it to the event outbound channel adapter

      However, the event is not fired at the consumer end, even though I can print the contents of the event. While debugging, I realized that the source of the event on the consumer side is null (as it's a transient field). Can that be the cause?

      Here are the code snippets:
      Rabbit MQ)
      HTML Code:
      <rabbit:connection-factory id="connectionFactory" />
      <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
      <rabbit:admin connection-factory="connectionFactory" />

      <rabbit:queue name="eventsQueue" />
      <rabbit:direct-exchange name="eventsExchange">
          <rabbit:bindings>
              <rabbit:binding queue="eventsQueue" key="event" />
          </rabbit:bindings>
      </rabbit:direct-exchange>
      Producer)
      HTML Code:
      <int-event:inbound-channel-adapter
          channel="eventChannel" event-types="com.company.testevents.TestEvent" />

      <int:channel id="eventChannel" />

      <int-amqp:outbound-channel-adapter
          channel="eventChannel" amqp-template="amqpTemplate"
          exchange-name="eventsExchange" routing-key="event" />
      Consumer)
      HTML Code:
      <int-amqp:inbound-channel-adapter
          channel="printEventChannel" queue-names="eventsQueue"
          connection-factory="connectionFactory" />

      <int:channel id="returnEventChannel">
          <int:queue />
          <int:interceptors>
              <int:wire-tap channel="printEventChannel"/>
          </int:interceptors>
      </int:channel>

      <int-event:outbound-channel-adapter
          channel="returnEventChannel">
          <int:poller max-messages-per-poll="1" fixed-rate="100" />
      </int-event:outbound-channel-adapter>

      <int:channel id="printEventChannel"/>
      <int:service-activator ref="printer" input-channel="printEventChannel"/>
      Help would be much appreciated. Thanks,
      Rehan
      Last edited by rehanrauf; Jul 31st, 2012, 03:46 AM.


      • #4
        Hi

        First of all, please wrap your sources with [ CODE ] [ /CODE ] or [ HTML ] [ /HTML ] tags (without the whitespace) to make the post more readable.
        Now, to the matter at hand:
        "(as its a transient field). Can that be the cause ?"
        Yes it is.
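
The null source can be reproduced with the JDK alone, because ApplicationEvent inherits its source field from java.util.EventObject, where that field is declared transient. The sketch below is only an illustration under that assumption: SampleEvent and its text field are hypothetical names, and plain Java serialization stands in for whatever conversion happens on the way to the broker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.EventObject;

class TransientSourceDemo {

    // Hypothetical event; it extends EventObject just as ApplicationEvent does.
    static class SampleEvent extends EventObject {
        private static final long serialVersionUID = 1L;
        private final String text; // ordinary (non-transient) field, so it is serialized

        SampleEvent(Object source, String text) {
            super(source);
            this.text = text;
        }

        String getText() {
            return text;
        }
    }

    // Serializes the event to bytes and reads it back, as a second JVM would.
    static SampleEvent roundTrip(SampleEvent event) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(event);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            return (SampleEvent) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SampleEvent copy = roundTrip(new SampleEvent("producer", "hello"));
        System.out.println("source = " + copy.getSource()); // prints "source = null"
        System.out.println("text   = " + copy.getText());   // prints "text   = hello"
    }
}
```

Anything that has to reach the other application therefore needs to live in an ordinary field or in the message payload, not in the event source.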
        To fix your problem, try using the payload-expression attribute on the <int-event:inbound-channel-adapter>:
        HTML Code:
        <int-event:inbound-channel-adapter
        channel="eventChannel" event-types="com.company.testevents.TestEvent" payload-expression="source"/>
        Here the root object of the evaluation context is the entire ApplicationEvent.
        So only the source of your event will be sent to the Rabbit queue.

        On the other side, in the Consumer, the <int-event:outbound-channel-adapter/> picks up the message from the Rabbit queue and publishes into the ApplicationContext an instance of org.springframework.integration.event.core.MessagingEvent with the entire Message as the event source.

        Try that and let us know how it goes,
        Artem Bilan


        • #5
          Thanks Artem,
          However, adding the payload expression did not help. The source is always null at the other end.

          There are a couple of things I have been thinking about:
          1) The source is supposed to be an object in memory, so clearly the source cannot travel to an application running in a different JVM.

          2) Since ApplicationEvents are synchronous, how does connecting the "Channel Adapters for both ApplicationEvents and RabbitMQ" manage that? I don't think sharing application events between two application instances is as simple as connecting the two adapters. Can you guys comment on this, please?

          Help is much appreciated,
          Rehan


          • #6
            Hello

            1.
            "The source is always null at the other end"
            But you shouldn't send a Java object to an external system at all. Where is the guarantee that the other side is Java too and, moreover, that it has the same class on its CLASSPATH for deserialization?
            Messaging is just middleware. It has no ability to work with business objects. It just lets you transport data between different systems over many existing protocols. To connect these protocols to the messaging system there are specific components: channel adapters.
            One of them is <int-event:inbound-channel-adapter/>, which listens for an ApplicationEvent from the context and sends a Message into the Spring Integration bus. Inside this bus, Messages can carry any Object in their payload, including your business object, even if it does not implement Serializable, because your classes and the Spring Integration components are on the same CLASSPATH of your application. Sending a Message to an external system is a bit different and more complex. Here we have to think about the protocol, the framework's abilities and, above all, the data. The data is not important to the messaging system, but it has to be understandable to the system that will read it from the messaging system on the other side. So here we should use a common data representation: XML, JSON or a simple String; it can be byte[] too. These can be transported over the wire to the messaging system (AMQP) and stored there until a consumer reads them.
            The architecture and design of Spring Integration make it easy to build a messaging application that connects to external systems in the same way as it connects business services internally. That sometimes confuses developers. Please take a look at slide 11 of my presentation: http://www.slideshare.net/artembilan...n-but-not-only to see how it works inside a Spring Integration application.
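
To make the point about common data representations concrete, here is a minimal JDK-only sketch, assuming the event carries nothing but a piece of text. TextEvent, toWire and fromWire are hypothetical names, not part of Spring Integration; the idea is only that the producer puts plain UTF-8 bytes on the wire and the consumer builds a fresh event around a source object of its own.

```java
import java.nio.charset.StandardCharsets;
import java.util.EventObject;

class WireFormatSketch {

    // Hypothetical event type; each application can define its own version of it.
    static class TextEvent extends EventObject {
        private static final long serialVersionUID = 1L;
        private final String text;

        TextEvent(Object source, String text) {
            super(source);
            this.text = text;
        }

        String getText() {
            return text;
        }
    }

    // Producer side: keep only the data and encode it as UTF-8 bytes for the broker.
    static byte[] toWire(TextEvent event) {
        return event.getText().getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: decode the bytes and wrap them in a new event with a local source.
    static TextEvent fromWire(byte[] body, Object localSource) {
        return new TextEvent(localSource, new String(body, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] body = toWire(new TextEvent("producer-side object", "Hello through AMQP!"));
        TextEvent rebuilt = fromWire(body, "consumer-side object");
        System.out.println(rebuilt.getText());   // prints "Hello through AMQP!"
        System.out.println(rebuilt.getSource()); // prints "consumer-side object"
    }
}
```

The source on the consumer side is deliberately a different object: only the data crosses the broker, and the receiving application decides what the event source should be.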

            2.
            "how does connecting the Channel Adapters for both ApplicationEvents and RabbitMQ manage that"
            Manage what? It doesn't matter: the <int-event:inbound-channel-adapter/> receives the event and sends a Message into a <channel>. This channel has a subscriber in the form of the <int-amqp:outbound-channel-adapter/>, which connects to the AMQP broker and sends an AMQP message to the external queue. And that's all: everything runs on the same thread.
            On the Consumer side, for your use case, the components work the same way in the opposite direction.
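
The "same thread" remark can be pictured with a toy dispatcher. This is not Spring Integration's code, only a sketch of the idea, and DirectDispatch and handlerThreadFor are hypothetical names: when send() calls its subscribers directly, the handler runs on the thread that published the event, so the synchronous nature of ApplicationEvents is preserved up to the point where the message is handed to the broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SameThreadDispatchSketch {

    // Hypothetical stand-in for a subscribable channel: send() invokes subscribers directly.
    static class DirectDispatch<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        void send(T message) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(message); // runs on the caller's thread, no hand-off
            }
        }
    }

    // Sends one message and returns the name of the thread the subscriber ran on.
    static String handlerThreadFor(String message) {
        DirectDispatch<String> channel = new DirectDispatch<>();
        String[] seen = new String[1];
        channel.subscribe(m -> seen[0] = Thread.currentThread().getName());
        channel.send(message);
        return seen[0];
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        System.out.println(caller.equals(handlerThreadFor("event"))); // prints "true"
    }
}
```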

            Here is a test case for your use case as I understand it:
            Sender:
            HTML Code:
            <rabbit:connection-factory id="rabbitConnectionFactory" />

            <rabbit:admin connection-factory="rabbitConnectionFactory" />

            <rabbit:queue name="eventsQueue"/>

            <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"/>

            <int-event:inbound-channel-adapter channel="amqpChannel" payload-expression="source"
                event-types="com.my.EventAmqpTests$TestEvent"/>

            <amqp:outbound-channel-adapter id="amqpChannel" routing-key="eventsQueue" />
            Receiver:
            HTML Code:
            <rabbit:connection-factory id="rabbitConnectionFactory"/>

            <channel id="eventsQueue"/>

            <amqp:inbound-channel-adapter channel="publishEventChannel" queue-names="eventsQueue"/>

            <int-event:outbound-channel-adapter id="publishEventChannel"/>

            <beans:bean class="com.my.EventAmqpTests$TestListener"/>
            Test class:
            Code:
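            import static org.junit.Assert.assertEquals;
            import static org.junit.Assert.assertTrue;

            import java.util.concurrent.CountDownLatch;
            import java.util.concurrent.TimeUnit;

            import org.junit.Test;
            import org.springframework.context.ApplicationContext;
            import org.springframework.context.ApplicationEvent;
            import org.springframework.context.ApplicationListener;
            import org.springframework.context.support.ClassPathXmlApplicationContext;
            import org.springframework.integration.event.core.MessagingEvent;

            public class EventAmqpTests {

                private final static String TEST_MESSAGE = "Hello through AMQP!";

                // Released by TestListener once the event arrives on the receiving side.
                private final static CountDownLatch latch = new CountDownLatch(1);

                // Publishes a TestEvent; the sender context forwards its source to the Rabbit queue.
                @Test
                public void send() {
                    ApplicationContext ctx = new ClassPathXmlApplicationContext("EventSender-context.xml", EventAmqpTests.class);
                    ctx.publishEvent(new TestEvent(TEST_MESSAGE));
                }

                // Starts the receiver context and waits for the listener to see the message.
                @Test
                public void receive() throws InterruptedException {
                    new ClassPathXmlApplicationContext("EventReceiver-context.xml", EventAmqpTests.class);
                    assertTrue(latch.await(2, TimeUnit.SECONDS));
                }

                public static class TestEvent extends ApplicationEvent {

                    public TestEvent(Object source) {
                        super(source);
                    }
                }

                // Receives the MessagingEvent published by the event outbound channel adapter.
                public static class TestListener implements ApplicationListener<MessagingEvent> {

                    @Override
                    public void onApplicationEvent(MessagingEvent event) {
                        assertEquals(TEST_MESSAGE, event.getMessage().getPayload());
                        latch.countDown();
                    }
                }

            }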
            For a better understanding of how it works, please invoke the send() and receive() methods independently: call send() and look at the queue in the RabbitMQ console. Then call receive() and you'll see that the ApplicationEvent is received and the message has disappeared from the queue on the broker.

            Is this what you are looking for?

            Cheers,
            Artem

            P.S. Sorry if I was wordy...
