  • BroadcastingDispatcher bug

    I think I found a bug; but figured I'd check here before posting a JIRA.

    I'm seeing the following symptom:

    I'm putting a PublishSubscribeChannel in the OSGi service registry. Another bundle gets it out and subscribes. In the meantime the first bundle may have started to put messages onto it. The messages are being put in relatively quickly (~30 Hz), and it appears a new subscriber is coming on while a message is being processed. It doesn't happen all the time, but it does happen. Delaying the message writes for a bit makes the problem go away, so I'm fairly confident that adding a new subscriber while a message is being processed is the problem.

    Exception is:

    Caused by: java.util.ConcurrentModificationException
    at java.util.AbstractList$Itr.checkForComodification(AbstractList.java:372)
    at java.util.AbstractList$Itr.next(AbstractList.java:343)
    at java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1010)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:53)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:56)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:116)
    at org.springframework.integration.channel.MessageChannelTemplate.doSend(MessageChannelTemplate.java:222)
    at org.springframework.integration.channel.MessageChannelTemplate.send(MessageChannelTemplate.java:180)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:119)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:107)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:48)
    ... 11 more

    Now I think I've found the problem.

    AbstractDispatcher.java defines getHandlers() as

    protected List<MessageHandler> getHandlers() {
        return Collections.unmodifiableList(handlers);
    }

    BroadcastingDispatcher then uses it like this:

    for (final MessageHandler handler : getHandlers()) {
    ...

    I believe this is incorrect behavior. The Javadoc for Collections.unmodifiableList reads:

    "It is imperative that the user manually synchronize on the returned list when iterating over it:
    ...
    Failure to follow this advice may result in non-deterministic behavior."

    Will just following this advice even be enough? As the Javadoc also adds:

    "In order to guarantee serial access, it is critical that all access to the backing list is accomplished through the returned list."

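The symptom described above does not need two threads to reproduce. A minimal single-threaded sketch, assuming plain String values as stand-ins for MessageHandler instances (the class and method names here are hypothetical, not Spring Integration code): Collections.unmodifiableList returns a read-only view of the backing list, so a structural change to the backing list during iteration is detected by the view's iterator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;

class UnmodifiableViewDemo {

    // Iterates over a read-only view while the backing list grows.
    // Returns true if the iterator detected the modification.
    static boolean iterateWhileModifying() {
        List<String> handlers = new ArrayList<String>();
        handlers.add("handlerA");
        handlers.add("handlerB");

        // A view, not a copy: reads go straight through to 'handlers'.
        List<String> view = Collections.unmodifiableList(handlers);
        try {
            for (String handler : view) {
                // Stands in for a subscriber arriving in the middle of a dispatch.
                handlers.add("lateSubscriber");
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + iterateWhileModifying());
    }
}
```

In the real dispatcher the add comes from another thread, so the failure is intermittent rather than guaranteed, but the mechanism is the same modification-count check seen in the stack trace.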
  • #2
    What version are you using?

    I believe this bug has already been fixed by the following change to AbstractDispatcher (returning a new, copied List instance):
    Code:
    protected List<MessageHandler> getHandlers() {
        return Collections.unmodifiableList(new ArrayList(this.handlers));
    }


    • #3
      I was using 1.0.2; so I'll give this a try.


      Looking at 1.0.3 I do think there is still a problem in this code. While the list is being copied by the ArrayList constructor it is possible for a subscriber to be added or removed and modify the list. These operations need to be synchronized.

      This is a much smaller window of opportunity for it to happen than in 1.0.2 of course; but it could still happen.


      • #4
        The idea there is for each dispatch to use a snapshot of the handlers at the moment that dispatch() is invoked. Therefore, if a handler subscribes while a dispatch is in progress, it will not be included in that dispatch, but it will be included in the next invocation. Conversely, if a handler unsubscribes while a dispatch is in progress, it can still be invoked as part of that dispatch, but it will not be included in the next invocation. It is the state of the subscribers at the very moment of the dispatch() invocation that matters. Does that seem reasonable?
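
The snapshot semantics described here can be sketched in a few lines. This is a simplified, single-threaded illustration with hypothetical names and String handlers, not the actual dispatcher code; it shows only which handlers a given dispatch sees, and it does not address whether taking the copy is itself thread-safe.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SnapshotDispatchDemo {

    private final List<String> handlers = new ArrayList<String>();

    void subscribe(String handler) {
        handlers.add(handler);
    }

    // Copy taken at the moment a dispatch starts.
    List<String> getHandlers() {
        return Collections.unmodifiableList(new ArrayList<String>(handlers));
    }

    // Returns the handlers this dispatch invoked. If lateSubscriber is
    // non-null, it subscribes while the dispatch is still in progress.
    List<String> dispatch(String lateSubscriber) {
        List<String> invoked = new ArrayList<String>();
        for (String handler : getHandlers()) {
            invoked.add(handler);
            if (lateSubscriber != null && !handlers.contains(lateSubscriber)) {
                subscribe(lateSubscriber);
            }
        }
        return invoked;
    }

    public static void main(String[] args) {
        SnapshotDispatchDemo dispatcher = new SnapshotDispatchDemo();
        dispatcher.subscribe("a");
        dispatcher.subscribe("b");
        System.out.println(dispatcher.dispatch("c"));  // prints [a, b]
        System.out.println(dispatcher.dispatch(null)); // prints [a, b, c]
    }
}
```

The late subscriber is skipped by the dispatch that was already running and picked up by the next one.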


        • #5
          I agree with that; but believe it CAN throw an Exception if the stars align JUST right.

          Look at this code for example; it will throw an Exception (sometimes) because I'm copying the list in one thread while another thread is updating it. That's the case I'm worried about; modifying the handler list WHILE it is being copied.

          Code:
          package alt;
          
          import java.util.ArrayList;
          
          public class Tester {
          
          	/**
          	 * @param args
          	 */
          	public static void main(String[] args) {
          		final ArrayList<Object> list = new ArrayList<Object>();
          		Thread thread1 = new Thread(new Runnable() {
          			
          			public void run() {
          				try {
          					for (int i=0; i < 30000; i++) {
          						list.add(i);
          					}
          				} catch (Exception e) {
          					e.printStackTrace();
          					System.exit(-1);
          				}
          			}
          		});
          		Thread thread2 = new Thread(new Runnable() {
          			public void run() {
          				try {
          					for (int i=0; i < 20000; i++) {
          						ArrayList<Object> copy = new ArrayList<Object>(list);
          					}
          				} catch (Exception e) {
          					e.printStackTrace();
          					System.exit(-1);
          				}
          			}
          		});
          		thread2.start();
          		thread1.start();
          		
          		try {
          			thread1.join();
          			thread2.join();
          		} catch (Exception e) {
          			e.printStackTrace();
          		}
          	}
          }


          • #6
            Are you saying that you have actually produced an Exception with that code?


            • #7
              Yes.

              java.lang.ArrayIndexOutOfBoundsException
              at java.lang.System.arraycopy(Native Method)
              at java.util.ArrayList.toArray(ArrayList.java:304)
              at java.util.ArrayList.<init>(ArrayList.java:136)
              at alt.Tester$2.run(Tester.java:29)
              at java.lang.Thread.run(Thread.java:595)


              • #8
                Okay. I have been unable to align the stars I guess ... even after increasing both loops to the millions (but the index out of bounds is definitely a possibility as you have managed to demonstrate). We could use a CopyOnWriteArrayList. However, it is actually the Collection being copied into this list that matters, since the one being created in getHandlers() will not itself be modifiable.
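
As a rough sketch of the CopyOnWriteArrayList option, assuming it replaces the backing collection itself (which is the collection that matters, per the point above): every iterator works on an immutable snapshot of the array, so a reader never observes a half-applied add. The class name, the Integer handlers, and the loop counts are illustrative only.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CopyOnWriteDemo {

    // One thread adds 0, 1, 2, ... while another iterates repeatedly.
    // Returns the first failure seen by the reader, or null if there was none.
    static Throwable run(final int writes, final int reads) {
        final List<Integer> handlers = new CopyOnWriteArrayList<Integer>();
        final Throwable[] failure = new Throwable[1];

        Thread writer = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < writes; i++) {
                    handlers.add(i);
                }
            }
        });
        Thread reader = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < reads; i++) {
                        int expected = 0;
                        // Each iteration sees a consistent prefix 0..n-1.
                        for (Integer handler : handlers) {
                            if (handler.intValue() != expected++) {
                                throw new IllegalStateException("inconsistent snapshot");
                            }
                        }
                    }
                } catch (Throwable t) {
                    failure[0] = t;
                }
            }
        });

        reader.start();
        writer.start();
        try {
            writer.join();
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
        return failure[0];
    }

    public static void main(String[] args) {
        System.out.println("failure: " + run(2000, 2000));
    }
}
```

The trade-off is that every add or remove copies the whole array, which is usually acceptable when subscriptions change rarely compared with how often messages are dispatched.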


                • #9
                  I still think you need a synchronized block or a lock.

                  Code:
                  ArrayList newList = new ArrayList(oldList)
                  needs to be protected if oldList might be modified by another thread during this operation, and the add and remove methods in the abstract class do modify it.

                  That means either synchronized (oldList) {} blocks or one of the locks from the concurrent package (ReadWriteLock is nice since it allows multiple "readers", and writes almost never occur).

                  Does that make sense or do I have something fundamentally wrong?
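
The ReadWriteLock suggestion could look roughly like the following. This is a minimal sketch under assumptions: LockedHandlerRegistry is a hypothetical stand-in for the handler bookkeeping in the abstract dispatcher, not the code that was eventually committed. The stress method has the same shape as the Tester program earlier in the thread, with the copy and the adds now guarded by the same lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for the dispatcher's handler bookkeeping.
class LockedHandlerRegistry<H> {

    private final List<H> handlers = new ArrayList<H>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    boolean add(H handler) {
        lock.writeLock().lock();
        try {
            return handlers.add(handler);
        } finally {
            lock.writeLock().unlock();
        }
    }

    boolean remove(H handler) {
        lock.writeLock().lock();
        try {
            return handlers.remove(handler);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Copies under the read lock, so no add or remove can run during the copy.
    List<H> snapshot() {
        lock.readLock().lock();
        try {
            return new ArrayList<H>(handlers);
        } finally {
            lock.readLock().unlock();
        }
    }

    // One thread adds while another copies; returns the first failure or null.
    static Throwable stress(final int adds, final int copies) {
        final LockedHandlerRegistry<Integer> registry = new LockedHandlerRegistry<Integer>();
        final Throwable[] failure = new Throwable[1];
        Thread writer = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < adds; i++) {
                    registry.add(i);
                }
            }
        });
        Thread copier = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < copies; i++) {
                        registry.snapshot();
                    }
                } catch (Throwable t) {
                    failure[0] = t;
                }
            }
        });
        copier.start();
        writer.start();
        try {
            writer.join();
            copier.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
        return failure[0];
    }

    public static void main(String[] args) {
        System.out.println("failure: " + stress(30000, 20000));
    }
}
```

Multiple dispatching threads can hold the read lock at once, so concurrent snapshots do not block each other; only the rare subscribe or unsubscribe takes the exclusive write lock.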


                  • #10
                    paquettd, I think you're pretty much on top of things. Thanks!

                    http://jira.springframework.org/browse/INT-748, I'll have a look now.


                    • #11
                      Please take a look at this code and reopen the issue if you think there's something off with it.

                      https://fisheye.springsource.org/bro...2=2366&r1=2279


                      • #12
                        I just made a few minor changes:
                        https://fisheye.springsource.org/cha...ation/?cs=2367

                        However, we might be able to push all of the locking down into the underlying OrderedAwareLinkedHashSet implementation. At this point, the only relevant methods there that lack a lock are the toArray() methods.

                        -Mark


                        • #13
                          Dan,

                          Have a look at the most recent changes:
                          https://fisheye.springsource.org/cha...ation/?cs=2368

                          All concurrent access safety is being managed within the OrderedAwareLinkedHashSet implementation now. Let me know what you think, and if you have time to bombard that collection with some similar tests, that would be really helpful.

                          Regards,
                          Mark
