  • DefaultSplitter and java.util.ConcurrentModificationException

    Hi!
    I've implemented a class 'MsgTypeSplitter' that extends DefaultSplitter. Its main purpose is, of course, to split a single message into multiple messages (which are "parsed" from the payload).
    Code:
    @Component
    public class MsgTypeSplitter extends DefaultSplitter {
    
    	private Logger log = LoggerFactory.getLogger(MsgTypeSplitter.class);
    	
    	@Override
    	protected Object splitMessage(Message<?> message) {
    		List<Object> messageList = new ArrayList<Object>();
    		final List<IData> dataList = ((Packet) message.getPayload()).getDataList();
    		try {
    			for (IData data : dataList) {
    				switch (data.getMsgType()) {
    				case 230:
    					messageList.add(transformer.prepare(data.getHeader(), data.getDataPacket()));
    					break;
    				case 231:
    					break;
    				default:
    					log.warn("Unknown 'MsgType' field: " + data.getMsgType());
    					break;
    				}
    			}
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    		return messageList;
    	}
    }
    The splitter is connected to a filter through a direct channel (it receives its data from the filter); after splitting, it pushes the data into the 'payload-type-router'.
    In some cases I get a 'java.util.ConcurrentModificationException':
    Code:
    Exception in thread "UDP-Incoming-Msg-Handler" org.springframework.integration.MessageHandlingException: error occurred in message handler [msgTypeSplitter]
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
    in line:
    Code:
    for (IData data : dataList) {
    Could you please help?
    Last edited by snc85; Jun 14th, 2011, 05:39 AM.

  • #2
    I don't see a ConcurrentModificationException in your stack trace. Could you please post the stack trace that includes the ConcurrentModificationException?

    • #3
      Code:
      13:34:43.349 DEBUG c.c.b.s.MsgTypeSplitter : handler 'msgTypeSplitter' produced no reply for request Message: [Payload=Packet [header=Header(packetLength=64, packetType=994, packetSeqNum=6210, sendTime=479001287, serviceID=1, deliveryFlag=0, numberMsgEntries=1), dataList=[DataPacket [msgType=247]], packet=[0, 64, 3, -30, 0, 0, 24, 66, 28, -116, -6, -57, 0, 1, 0, 1, 0, 46, 0, -9, 0, 0, 0, 113, 2, -51, 30, 8, 0, 0, 31, 118, 0, 1, 0, 1, 0, 0, 3, 36, 0, 0, 3, 36, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 0, 0, 0]]][Headers={timestamp=1308051283349, id=2bc492e7-d2a0-4d0c-8956-8b376a3ff0d0, ip_address=10.132.26.168, correlationId=1, ip_hostname=nbkrzyzak.krakow.comarch, sequenceNumber=6210}]
      Exception in thread "UDP-Incoming-Msg-Handler" org.springframework.integration.MessageHandlingException: error occurred in message handler [msgTypeSplitter]
      	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
      	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
      	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
      	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
      	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
      	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
      	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
      	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:176)
      13:34:43.350 DEBUG c.c.b.a.c.PacketControlAcumulator : CONTROL PACKET, type: 994, seq: 6211, serviceID: 1, len: 64
      13:34:43.351 DEBUG c.c.b.s.MsgTypeSplitter : msgTypeSplitter received message: [Payload=Packet [header=Header(packetLength=64, packetType=994, packetSeqNum=6211, sendTime=479001289, serviceID=1, deliveryFlag=0, numberMsgEntries=1), dataList=[DataPacket [msgType=247]], packet=[0, 64, 3, -30, 0, 0, 24, 67, 28, -116, -6, -55, 0, 1, 0, 1, 0, 46, 0, -9, 0, 0, 0, 105, 2, -51, 30, 8, 0, 0, 31, 119, 0, 1, 0, 1, 0, 0, 11, 94, 0, 0, 11, 94, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 0, 0, 0]]][Headers={timestamp=1308051283351, id=99d31e44-9c78-4bfc-ad9f-c751415e1a63, ip_address=10.132.26.168, correlationId=1, ip_hostname=nbkrzyzak.krakow.comarch, sequenceNumber=6211}]
      
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:160)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:125)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:119)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:101)
      	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
      	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
      	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
      	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
      	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
      	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
      	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
      	at org.springframework.integration.aggregator.CorrelatingMessageHandler.sendReplyMessage(CorrelatingMessageHandler.java:339)
      	at org.springframework.integration.aggregator.CorrelatingMessageHandler.sendReplies(CorrelatingMessageHandler.java:329)
      	at org.springframework.integration.aggregator.CorrelatingMessageHandler.completeGroup(CorrelatingMessageHandler.java:312)
      	at org.springframework.integration.aggregator.CorrelatingMessageHandler.handleMessageInternal(CorrelatingMessageHandler.java:179)
      	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
      	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
      	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
      	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
      	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
      	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
      	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
      	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:176)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:160)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:125)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:119)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:101)
      	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
      	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
      	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
      	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
      	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
      	at org.springframework.integration.handler.MessageHandlerChain$ReplyForwardingMessageChannel.send(MessageHandlerChain.java:210)
      	at org.springframework.integration.handler.MessageHandlerChain$ReplyForwardingMessageChannel.send(MessageHandlerChain.java:203)
      	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
      	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:176)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:160)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:125)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:119)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:101)
      13:34:43.351 WARN  c.c.b.s.MsgTypeSplitter : Unknown 'MsgType' field: 247

      • #4
        Code:
        13:34:43.351 DEBUG c.c.b.s.MsgTypeSplitter : handler 'msgTypeSplitter' produced no reply for request Message: [Payload=Packet [header=Header(packetLength=64, packetType=994, packetSeqNum=6211, sendTime=479001289, serviceID=1, deliveryFlag=0, numberMsgEntries=1), dataList=[DataPacket [msgType=247]], packet=[0, 64, 3, -30, 0, 0, 24, 67, 28, -116, -6, -55, 0, 1, 0, 1, 0, 46, 0, -9, 0, 0, 0, 105, 2, -51, 30, 8, 0, 0, 31, 119, 0, 1, 0, 1, 0, 0, 11, 94, 0, 0, 11, 94, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 0, 0, 0]]][Headers={timestamp=1308051283351, id=99d31e44-9c78-4bfc-ad9f-c751415e1a63, ip_address=10.132.26.168, correlationId=1, ip_hostname=nbkrzyzak.krakow.comarch, sequenceNumber=6211}]
        
        	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
        	at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:154)
        	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
        	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
        	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:176)
        	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:160)
        	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:125)
        	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:119)
        	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:101)
        	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
        	at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:137)
        	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
        	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
        	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
        	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
        	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
        	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
        	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
        	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
        	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:92)
        	at org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter.access$300(UnicastReceivingChannelAdapter.java:43)
        	at org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter$1.run(UnicastReceivingChannelAdapter.java:151)
        	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        	at java.lang.Thread.run(Thread.java:662)
        Caused by: java.util.ConcurrentModificationException
        	at java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:761)
        	at java.util.LinkedList$ListItr.next(LinkedList.java:696)
        	at splitter.MsgTypeSplitter.splitMessage_aroundBody0(MsgTypeSplitter.java:28)
        	at splitter.MsgTypeSplitter.splitMessage_aroundBody1$advice(MsgTypeSplitter.java:21)
        	at splitter.MsgTypeSplitter.splitMessage(MsgTypeSplitter.java:1)
        	at org.springframework.integration.splitter.AbstractMessageSplitter.handleRequestMessage(AbstractMessageSplitter.java:50)
        	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:98)
        	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
        	... 75 more

        • #5
          Hello
          What is the advice around MsgTypeSplitter.splitMessage?
          I think you must show your config...

          Artem

          • #6
            The exception seems to happen in your code
            Code:
            	at java.util.LinkedList$ListItr.next(LinkedList.java:696)
            	at splitter.MsgTypeSplitter.splitMessage_aroundBody0(MsgTypeSplitter.java:28)
            	at splitter.MsgTypeSplitter.splitMessage_aroundBody1$advice(MsgTypeSplitter.java:21)
            	at splitter.MsgTypeSplitter.splitMessage(MsgTypeSplitter.java:1)
            so, I would look there.

            Also, your custom implementation does not have to extend DefaultSplitter or any other SI class. It could simply be a POJO, as long as you follow the convention of having a method that takes an Object (e.g., Message, payload, headers, etc.) and returns a Collection or an Array.
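
A minimal sketch of the POJO convention described above, under simplifying assumptions: the class name `MsgTypePojoSplitter`, the `split` method and the use of plain integer type codes in place of the poster's `Packet`/`IData` types are all hypothetical. It only illustrates the shape of such a method (takes the payload, returns a Collection) and is not the poster's actual splitter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical POJO splitter: it extends no Spring Integration class.
public class MsgTypePojoSplitter {

    // Takes the payload (simplified here to a list of message-type codes)
    // and returns a Collection; each element would become its own message.
    public List<String> split(List<Integer> msgTypes) {
        List<String> result = new ArrayList<String>();
        for (Integer msgType : msgTypes) {
            if (msgType == 230) {
                // Stand-in for the real transformation of a type-230 entry.
                result.add("type-230");
            }
            // All other types are skipped in this sketch.
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> parts = new MsgTypePojoSplitter().split(Arrays.asList(230, 231, 230));
        System.out.println(parts);
    }
}
```

Such a class would typically be registered as a bean and referenced from the splitter configuration by bean name and method name; the exact wiring depends on the configuration, which is not shown in this thread.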

            • #7
              Code:
              public abstract aspect Profiling {
              
                  private Logger<ProfilingData> logger = new Logger<ProfilingData>(600000);
              
                  protected pointcut profiled() :
                      (Pointcuts.cbsPackages() &&
                      !Pointcuts.aopPackages() &&
                      Pointcuts.publicMethods() &&
                      !Pointcuts.gettersOrSetters());
              
                  Object around(): profiled() {
                      Signature signature = thisJoinPointStaticPart.getSignature();
                      ProfilingData profilingData = new ProfilingData(signature.getDeclaringTypeName(), signature.getName());
                      try {
                          return proceed();
                      } finally {
                          profilingData.stop();
                          logger.add(profilingData);
                      }
                  }
              }

              • #8
                Not sure how many aspects are in play here (it is not clear from the code you posted above), but what is clear is that the exception originates from the instrumented MsgTypeSplitter, so I would suggest looking into the config and all the aspects that are woven into the final representation of the class.

                • #9
                  How about "without" aspects?

                  • #10
                    Now I am getting:
                    Code:
                    Caused by: java.util.ConcurrentModificationException
                    	at java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:761)
                    	at java.util.LinkedList$ListItr.next(LinkedList.java:696)
                    	at java.util.AbstractCollection.toString(AbstractCollection.java:421)
                    	at java.lang.String.valueOf(String.java:2826)
                    	at java.lang.StringBuilder.append(StringBuilder.java:115)
                    	at packet.Packet.toString(Packet.java:56)
                    	at java.lang.String.valueOf(String.java:2826)
                    	at java.lang.StringBuilder.append(StringBuilder.java:115)
                    	at org.springframework.integration.message.GenericMessage.toString(GenericMessage.java:82)
                    	at java.lang.String.valueOf(String.java:2826)
                    	at java.lang.StringBuilder.append(StringBuilder.java:115)
                    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:72)
                    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
                    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
                    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
                    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
                    	... 71 more
                    I've changed:
                    Code:
                    	@Override
                    	public String toString() {
                    		return "Packet [header=" + header + ", dataList=" + dataList
                    				+ ", packet=" + Arrays.toString(packet) + "]";
                    	}
                    to:
                    Code:
                    	@Override
                    	public String toString() {
                    		return "Packet [header=" + header + "]";
                    	}
                    ...and it looks like it works. Any comments on that?

                    • #11
                      Well, 'dataList' is a LinkedList, and if it is structurally modified elsewhere while it is being iterated, you get this exception. In your case it seems that while toString() is executing, the contents of 'dataList' are being modified by another thread.
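
The fail-fast behaviour behind this exception can be reproduced deterministically in a single thread, which may make the multi-threaded case easier to reason about. The sketch below is illustrative only: the class `ComodificationSketch` and its method names are hypothetical and not part of the poster's code. It shows a `LinkedList` iterator throwing once the list is structurally modified during iteration, and that iterating over a private snapshot avoids the problem.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.LinkedList;
import java.util.List;

public class ComodificationSketch {

    // Iterates the list while structurally modifying it; returns true if the
    // fail-fast iterator detected the modification.
    static boolean iterateWhileModifying(List<Integer> list) {
        try {
            for (Integer value : list) {
                list.add(value + 1); // structural change during iteration
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Iterates over a private copy, so changes to the original list
    // made during the loop do not affect the iteration.
    static int sumOverSnapshot(List<Integer> list) {
        List<Integer> snapshot = new ArrayList<Integer>(list);
        int sum = 0;
        for (Integer value : snapshot) {
            list.add(value); // modifies the original, not the snapshot
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> first = new LinkedList<Integer>(Arrays.asList(1, 2));
        System.out.println(iterateWhileModifying(first));

        List<Integer> second = new LinkedList<Integer>(Arrays.asList(1, 2, 3));
        System.out.println(sumOverSnapshot(second));
    }
}
```

Note that when a second thread really is writing to the list, taking the snapshot is itself an iteration and is not safe on its own; the copy would have to be made under the same lock the writer uses, or the list replaced with a thread-safe collection such as `java.util.concurrent.CopyOnWriteArrayList`.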

                      • #12
                        I removed the aspects and I still get the exception:
                        Code:
                        Caused by: java.util.ConcurrentModificationException
                        	at java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:761)
                        	at java.util.LinkedList$ListItr.next(LinkedList.java:696)
                        	at splitter.MsgTypeSplitter.splitMessage(MsgTypeSplitter.java:25)
                        	at org.springframework.integration.splitter.AbstractMessageSplitter.handleRequestMessage(AbstractMessageSplitter.java:50)
                        	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:98)
                        	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
                        	... 75 more

                        • #13
                          Look at line 25 of MsgTypeSplitter:
                          Code:
                          at splitter.MsgTypeSplitter.splitMessage(MsgTypeSplitter.java:25)

                          • #14
                            it is:
                            Code:
                            for (IData data : dataList) {
                            ...
                            Code:
                            List<IData> dataList = ((Packet) message.getPayload()).getDataList();

                            • #15
                              @snc85

                              I am sorry, but at this point it has clearly become a pure Java problem in user code. I think we are all convinced that nothing in the framework causes this problem.
                              The responses Artem and I have given you are meant to help you (give you a hint) to locate the problem. But it is you who eventually has to find and resolve it, since only you have access to all the moving pieces. Sending short responses as you just did, and expecting us to sit here trying to figure out what it all means without any context, does not help.

                              Think about forum ethics and consider that people here are doing this in their own free time and of their own free will. If you want help, then help us as well by spending a little time applying what you've learned from the previous reply before sending another question or a meaningless reply, as you just did.
