Announcement Announcement Module
Collapse
No announcement yet.
Lightweight message solution Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Lightweight message solution

    Asynchronous communication can be handy to decouple components, and JMS was made for this. Only I find JMS to heavy if I need channels/pipes-filters in my system and I don`t care about the remoting aspect. Therefore I have created a lightweight channel implemention that doesn`t rely on jms (or anything else)

    The basis of the system is the InputChannel (you can take messages from) and the OutputChannel (you can put messages into). These channels are blocking, or are blocking with a timeout, this makes it very easy to use them in a production line.

    I have been looking for a good implemenation for some time, but I didn`t find anything lightweight that fits my needs, so I wrote my own. A lot of components are based on the book: Enterprise Integration Patterns, so people who read it know how those components work.

    In the near future (as soon as I have the time) I want to make the project opensource, but I first want to get some comment on it. Be warned.. the documention is partly in dutch.. there is not nearly enough documention.. the unit tests are missing.. so you don`t have to read a lot


    the link:
    http://members.home.nl/peter-veentjer01/index.htm

    btw:
    I have to create a new homepage also... (but I don`t like building webpages).

    This is an example I use in my Lucene implementation:

    These are the channels I`m currently using in this part of the system.
    -------------------------------------------------------------------
    crawler->deleteChannelBeginPoint->indexUpdater
    -------------------------------------------------------------------
    crawler->normalCrawlBeginPoint->retrieveOrCreateDocumentChannel->loadAndAnalyzeContentChannel
    ->metadataAddingChannel>writingEndPoint->indexUpdater
    -------------------------------------------------------------------

    this is a part of the configration in Spring:
    Code:
    	<bean id="crawler"
    		  class="com.jph.lucene.adoc.crawler.DirMappingTwoWayCrawler">
    
    		<constructor-arg index="0">
    			<bean class="com.jph.lucene.crawlers.SimpleCrawlerId">
    				<constructor-arg index="0" value="standaard"/>
    			</bean>
    		</constructor-arg>
    
    		<constructor-arg index="1" ref="index"/>
    		<constructor-arg index="2" ref="docConverter"/>
    		<constructor-arg index="3" ref="normalCrawlBeginPoint"/>
    		<constructor-arg index="4" ref="deleteChannelBeginPoint"/>
    		<constructor-arg index="5" ref="dirMappingRepository"/>
    	</bean>
    
    	<bean id="processingMap"
    		  class="org.jph.channel.StdProcessingMap"/>
    
    	<bean id="deleteQueue"
    		  class="java.util.concurrent.LinkedBlockingQueue">
    		<constructor-arg index="0" value="50000"/>
    	</bean>
    
    	<bean id="deleteChannelBeginPoint"
    		class="org.jph.channel.StdActiveOuputChannel">
    
    		<constructor-arg index="0">
    			<bean
    					class="org.jph.concurrent.BlockingThreadPoolExecutor"
    					destroy-method="shutdown">
    				<!-- core pool size -->
    				<constructor-arg index="0" value="1"/>
    				<!-- max pool size -->
    				<constructor-arg index="1" value="1"/>
    				<!-- die-time -->
    				<constructor-arg index="2" value="5000"/>
    				<!-- de tijdeenheid waarmee deze executorservice gaat werken -->
    				<constructor-arg index="3" ref="java.util.concurrent.TimeUnit.MILLISECONDS"/>
    				<!-- de queue die gebruikt wordt om taken in op te slaan -->
    				<constructor-arg index="4" ref="retrieveOrCreateDocumentQueue"/>
    					<!--
    						- de threadfactory die deze executorservice gebruikt om threads voor de threadpool
    						- aan te maken.
    						 -->
    				<constructor-arg index="5">
    					<bean class="org.jph.concurrent.StdThreadFactory">
    					    <!-- priority -->
    						<constructor-arg index="0" value="1"/>
    
    						<!-- De naam van ThreadGroup waar de Threads onder vallen -->
    						<constructor-arg index="1" value="deleteQueue"/>
    					</bean>
    				</constructor-arg>
    			</bean>
    		</constructor-arg>
    
    		<constructor-arg index="1">
    			<bean class="com.jph.lucene.adoc.standard.DeleteChannel">
    				<constructor-arg index="0" ref="indexUpdater"/>
    				<constructor-arg index="1" ref="pnlDocumentIndexReaderProvider"/>
    				<constructor-arg index="2" ref="processingMap"/>
    			</bean>
    		</constructor-arg>
    	</bean>
    	
    	<bean id="normalCrawlBeginPoint"
    		  class="anchormen.pnl.search.NormalCrawlStartOutputChannel">
    
    		<constructor-arg index="0" ref="retrieveOrCreateDocumentChannel"/>
    		<constructor-arg index="1" ref="processingMap"/>
    	</bean>
    
    	<bean id="retrieveOrCreateDocumentQueue"
    		  class="java.util.concurrent.LinkedBlockingQueue">
    		<constructor-arg index="0" value="500000"/>
    	</bean>
    
    	<bean id="loadAndAnalyzeContentQueue"
    		  class="java.util.concurrent.LinkedBlockingQueue">
    		<constructor-arg index="0" value="5000"/>
    	</bean>
    
    	<bean id="metadataAddingQueue"
    		  class="java.util.concurrent.LinkedBlockingQueue">
    		<constructor-arg index="0" value="5000"/>
    	</bean>
    
    	<bean id="retrieveOrCreateDocumentChannel"
    		  class="org.jph.channel.StdActiveOuputChannel">
    
    		<constructor-arg index="0">
    			<bean
    					class="org.jph.concurrent.BlockingThreadPoolExecutor"
    					destroy-method="shutdown">
    				<!-- core pool size -->
    				<constructor-arg index="0" value="1"/>
    				<!-- max pool size -->
    				<constructor-arg index="1" value="1"/>
    				<!-- die-time -->
    				<constructor-arg index="2" value="5000"/>
    				<!-- de tijdeenheid waarmee deze executorservice gaat werken -->
    				<constructor-arg index="3" ref="java.util.concurrent.TimeUnit.MILLISECONDS"/>
    				<!-- de queue die gebruikt wordt om taken in op te slaan -->
    				<constructor-arg index="4" ref="retrieveOrCreateDocumentQueue"/>
    
    				<constructor-arg index="5">
    					<bean class="org.jph.concurrent.StdThreadFactory">
    						<!-- priority -->
    						<constructor-arg index="0" value="1"/>
    
    						<!-- De naam van ThreadGroup waar de Threads onder vallen -->
    						<constructor-arg index="1" value="retrieveOrCreateDocument"/>
    					</bean>
    				</constructor-arg>
    			</bean>
    		</constructor-arg>
    
    		<constructor-arg index="1">
    			<bean class="org.jph.channel.StdProcessingOutputChannel">
    				<constructor-arg index="0" ref="loadAndAnalyzeContentChannel"/>
    
    				<constructor-arg index="1" >
    					<bean class="anchormen.pnl.search.RetrieveOrCreateDocumentMsgProcessor">
    						<constructor-arg index="0" ref="pnlDocumentIndexReaderProvider"/>
    						<constructor-arg index="1" ref="idFactory"/>
    					</bean>
    				</constructor-arg>
    			</bean>
    		</constructor-arg>
    	</bean>
    
    
    	<bean id="loadAndAnalyzeContentChannel"
    		  class="org.jph.channel.StdActiveOuputChannel">
    
    		<constructor-arg index="0">
    			<bean	class="org.jph.concurrent.BlockingThreadPoolExecutor"
    				destroy-method="shutdown">
    				<!-- core pool size -->
    				<constructor-arg index="0" value="2"/>
    				<!-- max pool size -->
    				<constructor-arg index="1" value="6"/>
    				<!-- die-time -->
    				<constructor-arg index="2" value="5000"/>
    				<!-- de tijdeenheid waarmee deze executorservice gaat werken -->
    				<constructor-arg index="3" ref="java.util.concurrent.TimeUnit.MILLISECONDS"/>
    				<!-- de queue die gebruikt wordt om taken in op te slaan -->
    				<constructor-arg index="4" ref="loadAndAnalyzeContentQueue"/>
    
    				<constructor-arg index="5">
    					<bean class="org.jph.concurrent.StdThreadFactory">
    						<!-- priority -->
    						<constructor-arg index="0" value="1"/>
    						<!-- De naam van ThreadGroup waar de Threads onder vallen -->
    						<constructor-arg index="1" value="loadersAndAnalyzers"/>
    					</bean>
    				</constructor-arg>
    			</bean>
    		</constructor-arg>
    
    		<constructor-arg index="1">
    			<bean class="org.jph.channel.StdProcessingOutputChannel">
    				<constructor-arg index="0" ref="metadataAddingChannel"/>
    
    				<constructor-arg index="1">
    					<bean class="anchormen.pnl.search.LoadAndAnalyzeContentMsgProcessor">
    						<constructor-arg index="0" ref="dirMappingRepository"/>
    					</bean>
    				</constructor-arg>
    			</bean>
    		</constructor-arg>
    
    
    	</bean>
    
    	<bean id="metadataAddingChannel"
    		  class="org.jph.channel.StdActiveOuputChannel">
    
    		<constructor-arg index="0">
    			<bean
    					class="org.jph.concurrent.BlockingThreadPoolExecutor"
    					destroy-method="shutdown">
    
    				<!-- core pool size -->
    				<constructor-arg index="0" value="1"/>
    				<!-- max pool size -->
    				<constructor-arg index="1" value="1"/>
    				<!-- die-time -->
    				<constructor-arg index="2" value="5000"/>
    				<!-- de tijdeenheid waarmee deze executorservice gaat werken -->
    				<constructor-arg index="3" ref="java.util.concurrent.TimeUnit.MILLISECONDS"/>
    				<!-- de queue die gebruikt wordt om taken in op te slaan -->
    				<constructor-arg index="4" ref="metadataAddingQueue"/>
    					<!--
    					- de threadfactory die deze executorservice gebruikt om threads voor de threadpool
    					- aan te maken.
    					 -->
    				<constructor-arg index="5">
    					<bean class="org.jph.concurrent.StdThreadFactory">
    						<!-- priority -->
    						<constructor-arg index="0" value="1"/>
    						<!-- De naam van ThreadGroup waar de Threads onder vallen -->
    						<constructor-arg index="1" value="analyzers"/>
    					</bean>
    				</constructor-arg>
    			</bean>
    		</constructor-arg>
    
    		<constructor-arg index="1">
    			<bean class="org.jph.channel.StdProcessingOutputChannel">
    				<constructor-arg index="0" ref="writingEndPoint"/>
    
    				<constructor-arg index="1">
    					<bean class="anchormen.pnl.search.MetadataAddingMsgProcessor">
    						<constructor-arg index="0" ref="pnlManager"/>
    					</bean>
    				</constructor-arg>
    			</bean>
    		</constructor-arg>
    	</bean>
    
    	<bean id="writingEndPoint"
    		  class="anchormen.pnl.search.WritingOutputChannel">
    
    		<constructor-arg index="0" ref="indexUpdater"/>
    	</bean>

  • #2
    I have changed a few structures a little. It is now possible to make all channels active..

    OutputChannel channel = the channel to proces;
    //The messageprocessor does some heavy calculations..
    //and you don`t want to hit the thread(s) that posts message on the 'channel to proces'
    //to much. So... make it multithreaded (active)
    channel = new StdProcessingOutputChannel(channelProcessor, channel);
    BlockingExecutor executor = BlockingThreadPoolExecutor(....)
    channel = new StdActiveOutputChannel(channel, executor);

    But nobody has an comment?

    Comment


    • #3
      BTW JMS is a standard API to messaging; but that doesn't mean it has to be heavyweight - its just an API.

      e.g. ActiveMQ can be used purely in JVM without any networking or persistence to exchange POJOs, which may or may not be inside the same classloader.

      http://activemq.org

      Comment


      • #4
        Originally posted by jstrachan
        BTW JMS is a standard API to messaging; but that doesn't mean it has to be heavyweight - its just an API.
        Maybe.. but does it give me the control I need (especially the blocking and timeout parts)? and the check the interfaces.. there is not much an outputchannel/inputchannel needs and there is no message interface I need to implement, so every pojo will do. Making a custom implementation is very very easy.. is it just as easy with a JMS-interface? I hate dragging a lot of unneeded code with me..

        Comment


        • #5
          Originally posted by Alarmnummer
          Originally posted by jstrachan
          BTW JMS is a standard API to messaging; but that doesn't mean it has to be heavyweight - its just an API.
          Maybe.. but does it give me the control I need (especially the blocking and timeout parts)? and the check the interfaces.. there is not much an outputchannel/inputchannel needs and there is no message interface I need to implement, so every pojo will do. Making a custom implementation is very very easy.. is it just as easy with a JMS-interface? I hate dragging a lot of unneeded code with me..
          Its just a single Jar to put on your classpath; its all open source so you're welcome to surf the code if you like

          Comment


          • #6
            THe message interface:
            http://java.sun.com/j2ee/sdk_1.3/tec...s/Message.html
            The message consumer interface.
            http://java.sun.com/j2ee/sdk_1.3/tec...eConsumer.html

            There are a lot of unneeded methods I need the implement and the message interface is big.. my message channel accepts all objects.. I like it lightweight and that is the reason I wrote my implementation (it was extracted from another project).

            Btw:
            good to see there is the same amount of timeout control.

            Comment

            Working...
            X