DefaultSplitter and java.util.ConcurrentModificationException

  • #16
    OK, thanks for your time.


    • #17
      @snc85
      The cause of your problem is that while you read the data in one thread, some other thread modifies it...


      • #18
        I know that...but the Splitter component should be thread-safe...am I wrong?


        • #19
          This is not a thread-safety problem in the Splitter.
          The problem is in the collection held by your message's payload.
          Try this:
          Code:
          final List<IData> dataList = new ArrayList<IData>(((Packet) message.getPayload()).getDataList());
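
          The idea behind the line above is to iterate over a private copy instead of the shared list. A minimal single-threaded sketch of that idea follows; the class and method names are made up for illustration and are not part of the original code. Note that the copy itself is not atomic: if another thread can modify the source list while it is being copied, the list still needs synchronization or a concurrent collection.

```java
import java.util.ArrayList;
import java.util.List;

public class SnapshotDemo {

    // Iterates over a private copy, so changes made to the shared list
    // during the loop cannot invalidate the iterator.
    // Returns the number of elements visited.
    static int countViaSnapshot(List<String> shared) {
        List<String> snapshot = new ArrayList<String>(shared);
        int visited = 0;
        for (String item : snapshot) {
            shared.add(item + "-copy"); // modifies the original, not the snapshot
            visited++;
        }
        return visited;
    }

    public static void main(String[] args) {
        List<String> shared = new ArrayList<String>();
        shared.add("a");
        shared.add("b");
        System.out.println(countViaSnapshot(shared)); // elements visited
        System.out.println(shared.size());            // size after the loop's additions
    }
}
```

          Iterating over `shared` directly with the same loop body would throw ConcurrentModificationException, because the list is modified while its iterator is in use.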


          • #20
            As Artem pointed out, the issue has nothing to do with the Splitter. You have a custom array or List that is part of the Message payload or headers, and that value is modified concurrently by more than one thread. Although Spring Integration does ensure Message immutability by creating a new Message for every exchange, we cannot possibly control how many components in your code have access to the same reference to the object that is stored in the Message.

            Just look at the following code:

            Code:
            Foo foo = new Foo();
            Message<Foo> msg1 = new GenericMessage<Foo>(foo);

            Message<Foo> msg2 = new GenericMessage<Foo>(foo);

            boolean msgSame = msg1.equals(msg2); // false: each message gets its own ID header
            boolean payloadSame = msg1.getPayload().equals(msg2.getPayload()); // true: both wrap the same Foo instance
            The messages are different, but the payload they carry is the same object.
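
            To make the consequence concrete without depending on Spring, here is a small sketch using a hypothetical `Envelope` class as a stand-in for a message (it is not the Spring Integration API). It shows that two immutable wrappers can still expose the same mutable payload, so a change made through one is visible through the other.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedPayloadDemo {

    // Hypothetical stand-in for a message: an immutable wrapper
    // around a payload reference. The wrapper cannot change,
    // but the object it points to still can.
    static final class Envelope<T> {
        private final T payload;

        Envelope(T payload) {
            this.payload = payload;
        }

        T getPayload() {
            return payload;
        }
    }

    // True when both envelopes point at the very same payload object.
    static boolean sharesPayload(Envelope<?> a, Envelope<?> b) {
        return a.getPayload() == b.getPayload();
    }

    public static void main(String[] args) {
        List<String> data = new ArrayList<String>();
        Envelope<List<String>> first = new Envelope<List<String>>(data);
        Envelope<List<String>> second = new Envelope<List<String>>(data);

        first.getPayload().add("x"); // mutate through the first envelope

        System.out.println(sharesPayload(first, second));  // same payload object?
        System.out.println(second.getPayload().size());    // change seen via the second envelope
    }
}
```

            If two threads each hold one of these envelopes, one can iterate the list while the other adds to it, which is the situation that produces the exception.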


            • #21
              Ok. THANK YOU GUYS!!!


              • #22
                Just to be clear; this code...

                Code:
                for (IData data : dataList) {
                is iterating over dataList. The exception is thrown because dataList is modified after the iterator is created. The modification might come from another thread; it might come from the current thread.

                When using this Java 5 for-each syntax, you are NOT allowed to modify the list (e.g. remove an element) within the loop. If you need to do that, you must use an explicit iterator and remove the item through the iterator...

                Code:
                Iterator<IData> iterator = dataList.iterator();
                while (iterator.hasNext()) {
                    IData iData = iterator.next();
                ...
                    iterator.remove();
                }
                If you are not removing elements in the loop, then some other thread must be modifying the list.
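
                Both cases can be reproduced in a single thread. The sketch below uses made-up names and plain Integer values rather than IData; it contrasts removal inside a for-each loop with removal through the iterator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

public class IteratorRemovalDemo {

    // Removes even numbers inside a for-each loop.
    // Returns true if the hidden iterator detected the modification.
    static boolean failsWithForEach(List<Integer> values) {
        try {
            for (Integer v : values) {
                if (v % 2 == 0) {
                    values.remove(v); // structural change behind the iterator's back
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Removes even numbers through the iterator itself, which is allowed.
    static List<Integer> removeEvens(List<Integer> values) {
        Iterator<Integer> iterator = values.iterator();
        while (iterator.hasNext()) {
            Integer v = iterator.next();
            if (v % 2 == 0) {
                iterator.remove(); // keeps the iterator's bookkeeping consistent
            }
        }
        return values;
    }

    public static void main(String[] args) {
        List<Integer> first = new ArrayList<Integer>(Arrays.asList(1, 2, 3, 4));
        System.out.println(failsWithForEach(first));

        List<Integer> second = new ArrayList<Integer>(Arrays.asList(1, 2, 3, 4));
        System.out.println(removeEvens(second));
    }
}
```

                The check is best-effort: ArrayList's fail-fast behaviour is not guaranteed to catch every modification, so the absence of an exception does not prove the list was left alone.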


                • #23
                  No, no...I am not removing anything from the list.
                  To sum up, I now have the following components:
                  - udp-inbound-channel-adapter
                  - transformer
                  - filter
                  - resequencer
                  - logging-channel-adapter

                  Code:
                  	<int-ip:udp-inbound-channel-adapter
                  		id="udpInput" channel="inputChannel" multicast="true"
                  		multicast-address="230.0.0.1" port="4446" receive-buffer-size="1400" />
                  	<int:channel id="inputChannel" />
                  
                  	<int:chain id="headerChain" input-channel="inputChannel"
                  		output-channel="headerTransformedChannel">
                  		<int:transformer ref="headerTransformer" />
                  		<int:header-enricher>
                  			<int:header name="correlationId" expression="payload.header.serviceID"
                  				overwrite="true" />
                  			<int:header name="sequenceNumber" expression="payload.header.packetSeqNum"
                  				overwrite="true" />
                  		</int:header-enricher>
                  	</int:chain>
                  	<int:channel id="headerTransformedChannel">
                  		<int:interceptors>
                  			<int:wire-tap channel="packetControlChannel" />
                  		</int:interceptors>
                  	</int:channel>
                  
                  	<int:filter input-channel="headerTransformedChannel"
                  		discard-channel="packetTypeErrorChannel" output-channel="packetFilteredChannel"
                  		ref="packetTypeFilter" />
                  	<int:channel id="packetFilteredChannel" />
                  	
                  	<int:resequencer id="seqNumberResequencer" comparator="messageComparator"
                  		input-channel="packetFilteredChannel" output-channel="logger"
                  		release-strategy="sequenceReleaseStrategy" />
                  
                  	<int:logging-channel-adapter id="logger" expression="payload.header.packetSeqNum" />
                  And I am getting:
                  Code:
                  Exception in thread "UDP-Incoming-Msg-Handler" org.springframework.integration.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.CorrelatingMessageHandler#0]
                  	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
                  	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
                  	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
                  	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
                  	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
                  	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
                  	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
                  	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
                  	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:176)
                  	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:160)
                  	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:125)
                  	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:119)
                  	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:101)
                  	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
                  	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
                  	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
                  	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
                  	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
                  	at org.springframework.integration.handler.MessageHandlerChain$ReplyForwardingMessageChannel.send(MessageHandlerChain.java:210)
                  	at org.springframework.integration.handler.MessageHandlerChain$ReplyForwardingMessageChannel.send(MessageHandlerChain.java:203)
                  	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
                  	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
                  	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:176)
                  	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:160)
                  	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:125)
                  	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:119)
                  	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:101)
                  	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
                  	at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:154)
                  	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
                  	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
                  	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:176)
                  	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:160)
                  	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:125)
                  	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:119)
                  	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:101)
                  	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
                  	at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:137)
                  	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
                  	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
                  	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
                  	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
                  	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
                  	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
                  	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
                  	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
                  	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:92)
                  	at org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter.access$300(UnicastReceivingChannelAdapter.java:43)
                  	at org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter$1.run(UnicastReceivingChannelAdapter.java:151)
                  	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
                  	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
                  	at java.lang.Thread.run(Thread.java:662)
                  Caused by: java.lang.NullPointerException
                  	at org.springframework.integration.aggregator.CorrelatingMessageHandler.handleMessageInternal(CorrelatingMessageHandler.java:169)
                  	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
                  	... 51 more
