Chunk processing, poor performance with channels

  • Chunk processing, poor performance with channels

    Hi guys. I have been tackling a problem for a while now and would like to hear your ideas about what the solution might be.
    I have a large XML file in which, let's say, 300,000 items are stored. I use a SAX parser to read data from the file and send the items in chunks to a pollable channel. On the receiving end each item is retrieved, processed, and stored in a database. And while the task seems mundane, I am getting very poor performance, and it degrades over time. The solution runs on WebLogic 10+ and Oracle 10+. Here is my config:

    Code:
     
    <task:executor id="taskExecutor" pool-size="5-50" queue-capacity="0" rejection-policy="CALLER_RUNS" keep-alive="20"/>
    <task:executor id="defaultPollerTaskExecutor" pool-size="10-50" queue-capacity="0" rejection-policy="CALLER_RUNS" keep-alive="5"/>
    <task:executor id="pollerTaskExecutor" pool-size="5-80" queue-capacity="0" rejection-policy="CALLER_RUNS" keep-alive="10"/>
    <int:poller task-executor="defaultPollerTaskExecutor" fixed-rate="50" max-messages-per-poll="500" default="true"/>
    
    <int:channel id="docChargeInChannel">
      <int:queue capacity="1000"/>
    </int:channel>
    <int:channel id="docChargeOutChannel">
      <int:queue capacity="1000"/>
    </int:channel>
    
    <int:bridge input-channel="docChargeInChannel" output-channel="docChargeOutChannel"/>
    
    <file:inbound-channel-adapter channel="filesInChannel" id="filesIn" directory="${somedir}" prevent-duplicates="true">
     <int:poller fixed-delay="5000"/>
    </file:inbound-channel-adapter>
    <bean id="fileSplitter" class="*.DirectoryFileSplitter"/>
    <int:splitter ref="fileSplitter"
                  method="split"
                  input-channel="filesInChannel"
                  output-channel="docChargeInChannel"/>
    
     <int:channel id="errorChargeChannel">
      <int:queue capacity="1500"/>
     </int:channel>
      <file:outbound-channel-adapter id="errorFileAdapter"
                                      directory="${someOutDirectory}" 
                                      channel="errorChargeChannel" charset="UTF-8"/>
    
    
    <int:outbound-channel-adapter id="docChargeAdapter" channel="docChargeOutChannel" method="handleMessage" ref="pollerBean">
     <int:poller fixed-rate="5" task-executor="pollerTaskExecutor"/>
    </int:outbound-channel-adapter>
    
    <bean id="pollerTarget" class="com.bssys.ebpp.file.charge.jms.FileChargePoller" scope="prototype"/>
      <bean id="pollerPool" class="org.springframework.aop.target.CommonsPoolTargetSource">
        <property name="targetBeanName" value="pollerTarget" />
        <property name="maxSize" value="100" />
      </bean>
      <bean id="pollerBean" class="org.springframework.aop.framework.ProxyFactoryBean">
        <qualifier value="pollerBean" />
        <property name="targetSource" ref="pollerPool" />
      </bean>
    Here is the class for sending data to the channel:

    Code:
    public class DirectoryFileSplitter
    {
    	private final Logger		log		= LoggerFactory.getLogger(DirectoryFileSplitter.class);
    	private SAXParserFactory	factory	= SAXParserFactory.newInstance();
    	@Autowired
    	@Qualifier("docChargeOutChannel")
    	private MessageChannel		processChannel;
    	
    	@Autowired
    	private XQueryTransformer transformer;
    
    	public List<Message<String>> split(Message<?> message)
    	{
    		InputStream inputStream = null;
    		SaxHandler handler = new SaxHandler();
    		try {
    			handler.setCallback(new CallbackChunk()
    			{
    				@Override
    				public void sendChunk(Collection<String> collection)
    				{
    					log.debug("Start sending chunk {}", collection.size());
    					for (String item : collection) {
    						long start = System.currentTimeMillis();
    						String chargeString = extractCharge(item);
    						String msg = transformer.transform(item);
    						processChannel.send(MessageBuilder.withPayload(msg).setHeader("splitTimeStamp", System.currentTimeMillis()).setHeader("initialSource", item)
    								.setHeader("src", msg).setHeader("transformTime", System.currentTimeMillis() - start).setHeader("signedPayload", chargeString).build());
    
    					}
    					log.debug("End");
    				}
    			});
    
    			log.debug("Start splitting message from file adapter");
    			final File filePayload = (File) message.getPayload();
    			SAXParser saxParser = factory.newSAXParser();
    			inputStream = new FileInputStream(filePayload);
    			Reader reader = new InputStreamReader(inputStream, "UTF-8");
    
    			InputSource is = new InputSource(reader);
    			is.setEncoding("UTF-8");
    			saxParser.parse(is, handler);
    
    			int leftCount = handler.getDocumentList().size();
    			log.debug(MessageFormat.format("Items sent by callback {0}, items left to be sent {1}", handler.getItemsRead()
    					- leftCount, leftCount));
    
    		} catch (Exception e) {
    			log.error(ExceptionUtils.getFullStackTrace(e));
    		}
    		List<Message<String>> listt = new ArrayList<Message<String>>();
    		if (handler.getDocumentList().size() > 0){
    			for (String item : handler.getDocumentList()){
    				long start = System.currentTimeMillis();
    				String msg = transformer.transform(item);
    				String chargeString = extractCharge(item);
    				Message<String> msg2 = MessageBuilder.withPayload(msg).setHeader("splitTimeStamp", System.currentTimeMillis()).setHeader("initialSource", item)
    						.setHeader("src", item).setHeader("transformTime", System.currentTimeMillis() - start).setHeader("signedPayload", chargeString).build();
    				listt.add(msg2);
    			}
    		}
    		return listt;
    	}
    	
    	private String extractCharge(String someString){
    		// some logic here (elided in the original post)
    		return someString;
    	}
    
    }
    And here is the SAX handler:

    Code:
    public class SaxHandler extends DefaultHandler
    {
    	private final Logger log	= LoggerFactory.getLogger(SaxHandler.class);
    	private StringBuilder	builder		= new StringBuilder();
    	boolean					startEvent	= false;
    	private long			itemsRead	= 0;
    	private long sendCounter = 0;
    	private final int chunkSize = 1000;
    	private Deque<String> documentList = new LinkedList<String>();
    	
    	private CallbackChunk callback;
    	
    	public Deque<String> getDocumentList()
    	{
    		return documentList;
    	}
    
    	public void setDocumentList(Deque<String> documentList)
    	{
    		this.documentList = documentList;
    	}
    
    	public CallbackChunk getCallback()
    	{
    		return callback;
    	}
    
    	public void setCallback(CallbackChunk callback)
    	{
    		this.callback = callback;
    	}
    
    	public SaxHandler()
    	{
    	}
    
    	@Override
    	public void startElement(String uri, String localName, String qName, Attributes attributes) throws SAXException
    	{
    		boolean chargeFound = false;
    		if (qName.contains("UnifoTransferMsg")) {
    			chargeFound = true;
    	    }
    		if (chargeFound || startEvent) {
    			if (chargeFound){
    				builder = new StringBuilder();
    			}
    			startEvent = true;
    			String attrs = getAttributes(attributes);
    			builder.append("<").append(qName).append(attrs.isEmpty() ? "" : " ").append(attrs).append(">");
    		}
    	}
    
    	public long getItemsRead()
    	{
    		return itemsRead;
    	}
    
    	public void setItemsRead(long itemsRead)
    	{
    		this.itemsRead = itemsRead;
    	}
    
    	private String getAttributes(Attributes attributes)
    	{
    		StringBuilder attrBuilder = new StringBuilder();
    		int i = 0;
    		while (i < attributes.getLength()) {
    				String qName = attributes.getQName(i);
    				String value = attributes.getValue(i++);
    				attrBuilder.append(qName).append("=").append("\"").
    				append(value).append("\"").append(" ");
    			
    		}
    		return attrBuilder.toString();
    	}
    
    	@Override
    	public void endElement(String uri, String localName, String qName) throws SAXException
    	{
    		boolean chargeFound = false;
    		if (qName.contains("UnifoTransferMsg")) {
    			chargeFound = true;
    	    }
    		if (chargeFound || startEvent) {
    			if (chargeFound) {
    				startEvent = false;
    				itemsRead++;
    				sendCounter++;
    			}
    			builder.append("</").append(qName).append(">");
    			if (chargeFound){
    				String item = builder.toString();
    				documentList.add(item);
    				if (sendCounter >=chunkSize){
    					log.debug("Start sending chunk, counter {}", sendCounter);
    					Collection<String> els = popElements(documentList, chunkSize);
    					int elsSize = els.size();
    					log.debug("Start sending chunk, size {}, items count so far, size {}", elsSize, itemsRead);
    					
    					callback.sendChunk(els);
    					
    					sendCounter-=elsSize;
    					log.debug("End sending chunk, counter {}", sendCounter);
    					els = null;
    					builder = new StringBuilder();
    				}
    			}
    		}
    	}
    	
    	private Collection<String> popElements(Deque<String> deque, int size){
    		List<String> elementSet = new ArrayList<String>();
    		while (size-- > 0 && !deque.isEmpty()){
    			elementSet.add(deque.poll());
    		}
    		return elementSet;
    	}
    
    	@Override
    	public void characters(char[] ch, int start, int length) throws SAXException
    	{
    		if (startEvent) {
    			builder.append(ch, start, length);
    		}
    	}
    
    }
    And there is a log:
    Code:
    # [Items processed successfully 8, overall took 0 ms, transaction commit 12137, default poller executor 42, file charge executor 72]
    
    # [Items processed successfully 77, overall took 11357 ms, transaction commit 7675, default poller executor 42, file charge executor 72]
    
    # [Items processed successfully 1168, overall took 100324 ms, transaction commit 7534, default poller executor 40, file charge executor 72]
    
    # [Items processed successfully 10473, overall took 49390 ms, transaction commit 3463, default poller executor 46, file charge executor 72]
    
    # [Items processed successfully 15203, overall took 36098 ms, transaction commit 3947, default poller executor 44, file charge executor 72]
    
    # [Items processed successfully 30380, overall took 46878 ms, transaction commit 608, default poller executor 50, file charge executor 72]
    As we can see, the number of items processed grows, and so does the time required to poll a message from one channel to the other. I have played with the thread executor configuration a little, but to no avail. Do you have any ideas why it is slow? Any idea or even a remote suggestion would be of great help. Thanks for your time, fellas.
    Last edited by Fandorin; Jul 31st, 2012, 04:56 AM.

  • #2
    And here is the processor itself:
    Code:
    public class FileChargePoller implements MessageHandler
    {
    	private final Logger	log	= LoggerFactory.getLogger(FileChargePoller.class);
    	private final Logger	countLogger	= LoggerFactory.getLogger("countLogger");
    	
    	@Autowired
    	private JAXBUtil jaxb;
    	private final String senderId = "******";
    	
    	@Autowired
    	private CounterAtomic counter;
    	
    	@Autowired
    	@Qualifier("pollerTaskExecutor")
    	private ThreadPoolTaskExecutor taskExecutor;
    	
    	@Autowired
    	@Qualifier("defaultPollerTaskExecutor")
    	private ThreadPoolTaskExecutor defPoller;
    	
    	@Autowired
    	@Qualifier("errorChargeChannel")
    	private MessageChannel errChannel;
    
    	public void handleMessage(Message<?> message) throws MessagingException
    	{
    		String msgId = null;
    		long tookTimeToSplit = 0;
    		long timeToFinish = System.currentTimeMillis();
    		long jaxbStart = 0;
    		long jaxbEnd = 0;
    		try {
    			log.debug("Start general transform {}", hashCode());
    			DocTransfer docProcessor = ContextLoader.getCurrentWebApplicationContext().getBean("documentProcessorBean", DocTransfer.class);
    			XPathInterface transformProcessor = ContextLoader.getCurrentWebApplicationContext().getBean("xPathProcessorBean", XPathInterface.class);
    			log.debug(MessageFormat.format("File charge input message received. This {0}, xpath proc {1}, Active count of running thread on executor {2}, doc proc {3}",  
    					this.hashCode(), transformProcessor.hashCode(), taskExecutor.getActiveCount(), docProcessor.hashCode()));
    			String msgPayload = (String) message.getPayload();
    			tookTimeToSplit = System.currentTimeMillis() - message.getHeaders().get("splitTimeStamp", Long.class);
    			log.debug("Start conversion to doc");
    			Document doc = transformProcessor.stringToDom(msgPayload);
    			log.debug("End conversion to doc");
    			String billtransferString = transformProcessor.getStringForBillTransfer(doc);
    
    			jaxbStart = System.currentTimeMillis();
    			BillTransferMsgRqType billTransfer = null;
    			String msgHdr = transformProcessor.getStringForMsgHdr(doc);
    			if (!StringUtils.isBlank(msgHdr)){
    				MsgHdrType hdr = jaxb.stringToJaxbObject(msgHdr, MsgHdrType.class);
    				msgId = hdr.getMsgId();
    				log.debug("Msg header parsed, msg initial id {}", msgId);
    			}
    			
    			SourcesType sources = new SourcesType();
    			SourceType srcType = new SourceType();
    			srcType.setRequest(message.getHeaders().get("signedPayload", String.class).getBytes("UTF-8"));
    			sources.getSource().add(srcType); 
    			billTransfer = jaxb.stringToJaxbObject(billtransferString, BillTransferMsgRqType.class);
    			jaxbEnd = System.currentTimeMillis();
    			docProcessor.loadCharges(senderId, billTransfer, sources);
    			
    			counter.getCounter().incrementAndGet();
    			log.debug("Message with id {} processed successfully, hashCode {}", msgId, hashCode());
    		} catch (Exception ex) {
    			String msg =  (String) message.getPayload();
    			StringBuilder builder = new StringBuilder(256);
    			String fullStack = ExceptionUtils.getFullStackTrace(ex); 
    			builder.append("Error details: \n").append(fullStack);
    			builder.append("\n\n -------------\nPayload:\n").append(msg);
    			log.error(fullStack);
    			errChannel.send(MessageBuilder.withPayload(builder.toString()).build());
    			
    		}
    		countLogger.warn(MessageFormat.format("Items processed successfully {0}, overall took {1} ms, transaction commit {2}, default poller executor {3}, file charge executor {4}", 
    				counter.getCounter().longValue(), tookTimeToSplit, System.currentTimeMillis() - timeToFinish, defPoller.getActiveCount(), taskExecutor.getActiveCount()));
    	}
    
    }



    • #3
      There are definitely too many variables in your configuration for anyone to sift through. When submitting a question like this, it is highly recommended to provide a minimal configuration that demonstrates the problem, so that whoever is trying to respond can reproduce it; otherwise it's not practical. It would also help if you could explain your problem instead of asking for help with your solution, as the solution may be wrong and we definitely do not want to mislead you.
      But what I'll try to do is give you some pointers anyway.

      - Most of the components I see in your config are PollingConsumers. When you're dealing with PollingConsumers, you must realize that a polling segment can never be any faster than the poller rate. If your flow has more than one polling segment, you could easily get into a timing situation where an input to the next poller was produced just a tick after the downstream poller executed its poll, and now it waits for the next poll even though the data from upstream is ready. So knowing your system and tuning it accordingly helps. Why do you have so many polling consumers?
      - Queue capacity. Although it protects you from OOM errors, it can also produce a bottleneck if messages are not retrieved in time, since once the queue reaches its capacity the sending thread blocks.
      - Practically every endpoint I see is a ref to a bean, which means it's a ref to custom code that needs to be profiled separately.
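      The queue-capacity point can be illustrated with plain java.util.concurrent (a standalone sketch, not Spring Integration itself; a bounded QueueChannel throttles producers in the same way):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueCapacityDemo {
    public static void main(String[] args) {
        // A queue channel with a fixed capacity behaves like a bounded BlockingQueue:
        // once it is full, the sending side blocks (or is rejected, with offer())
        // until a consumer polls a message off. This is how a slow consumer
        // becomes a bottleneck for the producer.
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(2);

        System.out.println(channel.offer("msg-1")); // true  - slot available
        System.out.println(channel.offer("msg-2")); // true  - queue now at capacity
        System.out.println(channel.offer("msg-3")); // false - a blocking send would stall here

        channel.poll();                             // a consumer frees one slot
        System.out.println(channel.offer("msg-3")); // true  - producer can proceed again
    }
}
```

      So a large capacity only delays the moment the producer stalls; it does not remove the need for consumers to keep up.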

      General advice: start profiling your system in logical segments. For example, I would measure this segment first:
      Code:
      <file:inbound-channel-adapter channel="filesInChannel" id="filesIn" directory="${somedir}" prevent-duplicates="true">
       <int:poller fixed-delay="5000"/>
      </file:inbound-channel-adapter>
      <bean id="fileSplitter" class="*.DirectoryFileSplitter"/>
      <int:splitter ref="fileSplitter"
                    method="split"
                    input-channel="filesInChannel"
                    output-channel="docChargeInChannel"/>
      . . . and then continue



      • #4
        Also, I see that your configuration is a mix of injection styles (XML, annotations, and context.getBean()), which makes it next to impossible to follow without full access to your code and runtime environment, so cleaning it up would help.



        • #5
          Hello, Oleg. Thanks for the reply. I will clean things up a little and post a concise version of the configuration.
          Last edited by Fandorin; Jul 26th, 2012, 11:15 AM.



          • #6
            I took your advice and profiled the segments separately. I also logged the time when a message was submitted to a channel and the time when it was retrieved, calculating the delay.
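            As a rough sketch, that measurement boils down to stamping the send time into a header (the header name is borrowed from the splitter code earlier in the thread; this is an illustration, not the actual production code):

```java
import java.util.HashMap;
import java.util.Map;

public class ChannelDelayProbe {
    public static void main(String[] args) throws InterruptedException {
        // Sending side: stamp the send time into a message header, as the
        // splitter does with its "splitTimeStamp" header.
        Map<String, Object> headers = new HashMap<>();
        headers.put("splitTimeStamp", System.currentTimeMillis());

        Thread.sleep(25); // stand-in for time the message spends in the queue channel

        // Receiving side: delay = receive time minus the stamped send time.
        long delayMs = System.currentTimeMillis() - (Long) headers.get("splitTimeStamp");
        System.out.println("delay at least 10 ms: " + (delayMs >= 10));
    }
}
```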

            The first segment, which reads the huge file, is rather fast. There is only one file in the directory, so it can be profiled easily. It reads (using the SAX parser) and sends 10,000 messages to "docChargeOutChannel" within roughly 40 seconds, which works out to ~240 messages per second, with no significant delay (~1 ms).
            Diagram: chunk is read -> sent to docChargeOut -> retrieved from the channel -> log -> ... -> next iteration

            Input channel config:
            Code:
            <int:channel id="docChargeInChannel">
            <int:queue capacity="3000"/>
            </int:channel>
            <int:bridge input-channel="docChargeInChannel" output-channel="docChargeOutChannel"/>
                
            <int:channel id="filesInChannel"/>
                
            <bean id="fileSplitter" class="DirectoryFileSplitter"/>
            <int:splitter ref="fileSplitter"
                          method="split"
                          input-channel="filesInChannel"
                          output-channel="docChargeInChannel"/>
                               
            <file:inbound-channel-adapter channel="filesInChannel"
                                          id="filesIn"
                                          directory="..."
                                          prevent-duplicates="true">
                <int:poller fixed-delay="5000"/>
            </file:inbound-channel-adapter>
            Out channel config:
            Code:
            <int:channel id="docChargeOutChannel">
              <int:queue capacity="3000"/>
            </int:channel>
            <int:outbound-channel-adapter id="docChargeAdapter" channel="docChargeOutChannel" method="handleMessage" ref="pollerBean">
            <int:poller fixed-rate="1" max-messages-per-poll="1" task-executor="pollerTaskExecutor"/>
            </int:outbound-channel-adapter>
            Pollers and task executors config:

            Code:
            <task:executor id="taskExecutor" pool-size="5-50" queue-capacity="0" rejection-policy="CALLER_RUNS" keep-alive="2"/>
            <task:executor id="defaultPollerTaskExecutor" pool-size="10-60" queue-capacity="0" rejection-policy="CALLER_RUNS" keep-alive="1"/>
            <task:executor id="pollerTaskExecutor" pool-size="5-60" queue-capacity="0" rejection-policy="CALLER_RUNS" keep-alive="5"/>
            <int:poller task-executor="defaultPollerTaskExecutor" fixed-rate="5" max-messages-per-poll="10" default="true"/>
            The second profiling run covers reading the file, sending messages to the channel, retrieving them, and running the full processing cycle apart from the database processing (insert operations). This gave me ~169 messages per second and almost no delay.

            Diagram: chunk is read -> sent to docChargeOut -> retrieved from the channel -> xpath transformation -> log -> ... -> next iteration

            But when I switched database processing on, the delay kept increasing, reaching a full ten seconds after only 20 messages had been processed, and counting. The Oracle connection pool has 50 connections. Moreover, the transaction time stays almost constant (~3-4 seconds) as messages are being processed. I surmise it has to do with the poller configuration.

            Diagram: chunk is read -> sent to docChargeOut -> retrieved from the channel -> xpath transformation -> database processing -> log -> ... -> next iteration

            The reason for having a lot of polling consumers is to allow multithreaded processing: the big file is sent in chunks to a channel that is polled by a task executor and many identical consumers. Am I getting something wrong, and is there a better solution? I am open to any solution and willing to experiment with different configurations.

            I tried to keep the code and config as concise as possible. :)
            Last edited by Fandorin; Jul 31st, 2012, 11:40 AM.



            • #7
              It turned out I had misconfigured the poller timing options. I toyed with them a little and it worked. Now, on an i5 processor, it comes to about 200 messages per second. Oleg, thanks for the idea of tuning the pollers.



              • #8
                Also, if you need multithreading, you can also rely on an ExecutorChannel.
                For example:
                Code:
                <int:channel id="myExecChannel">
                      <int:dispatcher task-executor="ref_to_instance_of_java.util.concurrent.Executor"/>
                </int:channel>
                This way you'll get asynchronous, event-driven message distribution.
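                For instance, the dispatcher could be wired to a `<task:executor>` like the ones already shown in this thread (the `dispatchExecutor` id and pool sizes below are made up for illustration; any bean implementing java.util.concurrent.Executor works):

```xml
<!-- hypothetical executor id and sizing; a sketch, not a recommendation -->
<task:executor id="dispatchExecutor" pool-size="5-20" queue-capacity="0"
               rejection-policy="CALLER_RUNS"/>

<int:channel id="myExecChannel">
    <int:dispatcher task-executor="dispatchExecutor"/>
</int:channel>
```

                Unlike a queue channel plus poller, the dispatcher hands each message to an executor thread at send time, so there is no poll interval to tune.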



                • #9
                  I will certainly give it a try. Thank you, Oleg.

