Announcement Announcement Module
Collapse
No announcement yet.
How to stop a MessageChannel Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • How to stop a MessageChannel

    Hello,

    I'm playing with SI for a while and I like it very much.
    But I have one question:

    The MessageChannel interface provides the methods receive() and receive(long timeout). Both methods will block until a message is available. When a consumer thread calls the first method or the second with a long timeout how can the thread be interrupted when the MessageBus in which the channel is registered will be stopped?

    Thank's,
    Thomas

  • #2
    Code:
    	/**
    	 * Receive the first available message from this channel. If the channel
    	 * contains no messages, this method will block.
    	 * 
    	 * @return the first available message or <code>null</code> if the
    	 * receiving thread is interrupted.
    	 */
    The thread will automatically be interrupted when you stop the MessageBus I would expect. Doesn't this happen?

    Comment


    • #3
      ... no, I have checked this:

      Code:
          public void testStopMessageChannel() throws Exception {
              MessageBus bus = new MessageBus();
              final MessageChannel channel = new SimpleChannel();
              bus.registerChannel("myChannel", channel);
              bus.start();
              final CountDownLatch latch = new CountDownLatch(1);
              Runnable consumer = new Runnable() {
                  public void run() {
                      channel.receive();
                      latch.countDown();
                  }
              };
              Thread consumerThread = new Thread(consumer);
              consumerThread.start();
              Thread.sleep(500);
              bus.stop();
              latch.await(1500, TimeUnit.MILLISECONDS);
              Assert.assertEquals("message channel not stopped", 0, latch.getCount());
          }
      - Thomas

      Comment


      • #4
        Originally posted by Thomas Ziem View Post
        ... no, I have checked this:

        Code:
            public void testStopMessageChannel() throws Exception {
                MessageBus bus = new MessageBus();
                final MessageChannel channel = new SimpleChannel();
                bus.registerChannel("myChannel", channel);
                bus.start();
                final CountDownLatch latch = new CountDownLatch(1);
                Runnable consumer = new Runnable() {
                    public void run() {
                        channel.receive();
                        latch.countDown();
                    }
                };
                Thread consumerThread = new Thread(consumer);
                consumerThread.start();
                Thread.sleep(500);
                bus.stop();
                latch.await(1500, TimeUnit.MILLISECONDS);
                Assert.assertEquals("message channel not stopped", 0, latch.getCount());
            }
        - Thomas
        Thomas,

        The threads which are managed by the MessageBus (for dispatching messages) will be interrupted, but if you are consuming messages from external threads, i.e. directly from the channel, the MessageBus doesn't know anything about those threads, therefore it can't interrupt them.

        The MessageChannels, by the way, can be used in the absence of a MessageBus, which is something that, I think, explains better, why an external consumer is completely decoupled from the MessageBus.

        In this case having a shorter wait time, backed by a retry mechanism, could give you a guarantee that the consumer will not block indefinitely.
        In fact, in this case, if you know when the message bus is stopped, you can interrupt the consumers as well, as the whole process is under your control.

        Please let me know if this clarifies things,
        Marius

        Comment


        • #5
          Thomas,

          this might be of some help to you, as it addresses the issue of being notified when the MessageBus stops.

          http://jira.springframework.org/browse/INT-256

          Marius

          Comment


          • #6
            Hi Marius,

            thanks for your reply.
            I understand that the MessageBus doesn't know about external threads. The solution for my problem will be that I will use a short timeout and then check if the MessageBus was already stopped.
            Another solution could be an implementation of an cancelable MessageChannel. Together with a listener approach, the listener could stop all MessageChannels when the MessageBus was stopped. This presumes that you can ask the MessageBus (ChannelRegistry) about all registered channels.

            Or is there another solution for that? My consumer thread is an Ajax thread within a web environment. The Ajax client starts a request to the web server where the thread will be blocked until a message is available in the MessageChannel. The MessageBus will be stopped from another thread.
            I also could register a MessageHandler for the specific channel, but than I have to queue the message again. Therefore the MessageHandler must implement the Lifecycle interface. The MessageBus than will stop all registered MessageHandler. But I don't like the double queuing of messages.

            Regards,
            Thomas

            Comment


            • #7
              Hi Thomas,

              What currently happens, is that the Message Bus will stop the subscriptions to the channels. The channels themselves cannot be stopped as they're passive elements (to understand better, the model is one of a Blocking Queue). What does stopping the subscription effectively mean, is that the messages will not be passed on to endpoints (and from there to reply channels). Now, if you're having external components listening to the channels, they will block until a message is available there.

              What you can do in this case, is to simply register a MessageBusListener (newly added) and be notified whenever the message bus has been stopped.
              Thus, you can interrupt your client threads from the MessageBusListener itself (which will cause receive() to throw an InterruptedException).

              Hope that helps,
              Marius

              Comment


              • #8
                Hi Marius,

                to interrupt the clients, I have to register these clients (threads) to the MessageBusListener before calling receive(). Am I right?

                - Thomas

                Comment

                Working...
                X