  • Enable Multithreading in Channel

    Hi everyone,
    I started using Spring Integration about a month ago. One requirement for my system is to enable multithreading when processing data from a JDBC inbound channel adapter. From the documentation and training sources (books etc.), I found that I can make a channel multithreaded by adding a queue element as a child of the channel element.
    Below is my code:
    Code:
    	<jdbc:inbound-channel-adapter id="jdbcNotificationIn"
    		data-source="dataSource"
    		query="select * from myTable where status='OPEN' order by Priority"
    		channel="jdbcToChainOut" 
    		max-rows-per-poll="2"
    		update="update myTable set status='INPROCESS', StartDateTime=GETDATE() 
        		where Id in (:NotificationJobId)">
    			<poller ref="jdbc-inbound-poller"/>
    	</jdbc:inbound-channel-adapter>
    
    	<channel id="jdbcToChainOut">
    		<queue capacity="10"/>
    	</channel>
    
    	<chain input-channel="jdbcToChainOut">
    		<header-enricher>
    			<error-channel ref="chainToFailedOut"></error-channel>
    		</header-enricher>
    		<splitter ref="notificationSplitter" />
    		<transformer ref="notificationTransformer" />
    		<jms:outbound-channel-adapter id="jmsNotificationOut"
    			destination="notificationQueue" />
    	</chain>
    
    	<channel id="chainToFailedOut" />
    
    	<chain input-channel="chainToFailedOut">
    		<transformer ref="errorTransformer" />
    		<jdbc:outbound-channel-adapter
    			data-source="dataSource"
    			query="update myTable 
    			set status='FINISH WITH ERROR', EndDateTime=GETDATE(), Notes=(:payload[Notes]) 
    			where Id in (:headers[Ids])" />
    	</chain>
    
    	<!-- If you don't listen to the default error channel you risk losing track 
    		of exceptions, as they cannot be passed back to the sender in band. It is 
    		recommended to have a generic error handler in your configuration to prevent 
    		this. -->
    	<stream:stderr-channel-adapter channel="errorChannel"
    		append-newline="true" />
    This is my setup code:
    Code:
    	<bean id="transactionManager"
    		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    		<property name="dataSource" ref="dataSource" />
    	</bean>
    
    	<integration:poller id="jdbc-inbound-poller"
    		default="true" fixed-delay="5000">
    		<integration:transactional />
    	</integration:poller>
    My questions are:
    1. The part I highlighted in red is what I believe makes my application multithreaded. Have I enabled multithreading the right way, or is there a better way to do it?
    2. Am I handling errors correctly, or is there a better way?
    3. If I want to log the error stack trace to a file, what is the best way to do it? In my other Spring projects I used log4j to log error stack traces to a file.
    4. What is the meaning of queue in this context?
    5. What is the best practice for making a channel multithreaded? From what I read in the Spring Integration book, we can enable multithreading in a channel simply by adding a queue element or a task executor attribute. Under what conditions should we use a queue, and under what conditions a task executor?

    Thanks & Regards.
    Last edited by willy.juisan; Jun 13th, 2013, 12:18 AM.

  • #2
    1. No; for a polled channel adapter (such as jdbc) you would not use a QueueChannel.
    2. No; you should add an error-channel to the poller (but see below).
    3. See below.
    4. It's meaningless in this context.
    5. It depends on the message source and the application requirements. I'll explain further after answering your specific questions.

    You should remove the <queue/> element.
    You should add a specific <poller/> element to the jdbcToChainOut

    (Note: most people now use the xsd prefix 'int' instead of 'integration', and likewise 'int-jdbc' etc.)

    Code:
    <jdbc:inbound-channel-adapter ...>
        <int:poller task-executor="exec" fixed-delay="500" error-channel="chainToFailedOut">
            <int:transactional/>
        </int:poller>
    </jdbc:inbound-channel-adapter>
    <!-- pool-size: how many threads you want to run at once -->
    <task:executor id="exec" pool-size="10"/>
    You should remove the header-enricher that adds the error channel.

    To log as well as update the db, make chainToFailedOut a <publish-subscribe-channel/> and add a second subscriber (<int:logging-channel-adapter/>).
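
    A minimal sketch of that arrangement, assuming the bean and channel names from the configuration in the first post and the 'int' / 'int-jdbc' prefixes. The level="ERROR" setting is an illustrative choice so that the entry is written without enabling DEBUG for the logger:
    Code:
    <!-- Every subscriber of a publish-subscribe channel receives each error message -->
    <int:publish-subscribe-channel id="chainToFailedOut"/>
    
    <!-- Subscriber 1: the existing chain that records the failure in the database -->
    <int:chain input-channel="chainToFailedOut">
        <int:transformer ref="errorTransformer"/>
        <int-jdbc:outbound-channel-adapter data-source="dataSource"
            query="update myTable
                set status='FINISH WITH ERROR', EndDateTime=GETDATE(), Notes=(:payload[Notes])
                where Id in (:headers[Ids])"/>
    </int:chain>
    
    <!-- Subscriber 2: writes the full error message to the log (level is an assumption) -->
    <int:logging-channel-adapter channel="chainToFailedOut"
        log-full-message="true" level="ERROR"/>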

    You will need to tune the max-rows-per-poll and the poller fixed-delay to achieve your desired concurrency.

    As to the general question:

    ExecutorChannel and QueueChannel both provide async capabilities with slightly different semantics; they are only used when messages enter the queueing system from some external source on a thread (such as a <gateway/> or some other inbound endpoint that is not polled). It is important to understand that in Spring, transactions are bound to threads and these channels break the thread boundary, so transactions may not work as expected.

    In your case, a transactional poller works fine because each poller thread starts its own transaction.
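
    For reference, a minimal sketch of how the two channel types are declared. The channel ids, the 'exec' executor, and the 'someHandler' bean with its 'handle' method are hypothetical names used only for illustration:
    Code:
    <!-- ExecutorChannel: each message is handed off to a thread from 'exec' -->
    <int:channel id="executorBacked">
        <int:dispatcher task-executor="exec"/>
    </int:channel>
    
    <!-- QueueChannel: messages are buffered until a consumer polls for them -->
    <int:channel id="queueBacked">
        <int:queue capacity="10"/>
    </int:channel>
    
    <!-- A consumer on a QueueChannel needs its own poller -->
    <int:service-activator input-channel="queueBacked" ref="someHandler" method="handle">
        <int:poller fixed-delay="500"/>
    </int:service-activator>
    
    <task:executor id="exec" pool-size="10"/>
    In both cases the downstream flow runs on a different thread from the sender, which is why a transaction started by the sender does not extend to it.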
    Last edited by Gary Russell; Jun 13th, 2013, 03:08 AM.


    • #3
      Hi Gary, thank you for your fast response.

      You should add a specific <poller/> element to the jdbcToChainOut
      Does this mean that I have two pollers in my application, the first for the JDBC inbound channel adapter and the second for the jdbcToChainOut channel? How do I tune these two pollers so that they work well together? Could you give me a guideline for setting up those two poller elements?

      (Note: most people now use the xsd prefix 'int' instead of 'integration', and likewise 'int-jdbc' etc.)
      Why should I use the xsd prefix 'int' instead of 'integration'? What benefit do I get from it? Would it be better to set the default xmlns to the Spring Integration schema, so that I don't have to write the 'int' prefix when I use Spring Integration elements?

      To log as well as update the db, make chainToFailedOut a <publish-subscribe-channel/> and add a second subscriber (<int:logging-channel-adapter/>).
      Is there any example code or a link where I can learn more about the publish-subscribe channel?

      Sorry if I am asking too many questions.
      Thanks & Regards.


      • #4
        Sorry, my mistake, I meant to put the poller on the adapter (the poller would be needed on the chain if the channel was a queue channel, in which case transactions wouldn't work, as I described).

        The default poller is just a convenience if you need many pollers that need the same configuration. It is not an active component, it is just a set of metadata that defines polling attributes for a polling endpoint that doesn't have its own poller. It's generally clearer to understand a flow if you define the poller on the endpoint itself (chain in this case). You can delete the default poller; it's not used.

        'int' is just less typing than 'integration', that's all; yes the prefix would need to be changed in the xmlns declaration as well.
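
        A sketch of the corresponding declaration on the root element, using the standard Spring namespace URIs; only the prefixes (the part after 'xmlns:') are your choice:
        Code:
        <beans xmlns="http://www.springframework.org/schema/beans"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xmlns:int="http://www.springframework.org/schema/integration"
            xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
            xsi:schemaLocation="
                http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/integration
                http://www.springframework.org/schema/integration/spring-integration.xsd
                http://www.springframework.org/schema/integration/jdbc
                http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd">
        
            <!-- elements are then written as int:channel, int-jdbc:inbound-channel-adapter, and so on -->
        
        </beans>
        Making the integration schema the default xmlns is also possible; the <bean/> elements then need a prefix instead.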

        http://static.springsource.org/sprin...bscribechannel

        http://static.springsource.org/sprin...-pubsubchannel


        • #5
          Sorry, my mistake, I meant to put the poller on the adapter (the poller would be needed on the chain if the channel was a queue channel, in which case transactions wouldn't work, as I described).
          It's OK, I figured it out when I read your first reply; the 'jdbcToChainOut' channel doesn't have any poller child element after all. So I have put the poller with the task executor inside the first chain (the one with input-channel 'jdbcToChainOut').

          The default poller is just a convenience if you need many pollers that need the same configuration. It is not an active component, it is just a set of metadata that defines polling attributes for a polling endpoint that doesn't have its own poller. It's generally clearer to understand a flow if you define the poller on the endpoint itself (chain in this case). You can delete the default poller; it's not used.
          So in my case, I only need one poller, inside the first chain element? Then how can I configure the fixed delay for the JDBC inbound channel adapter?

          Thanks & Regards.


          • #6
            Additional question:
            How do I know that my application is actually running multithreaded? Is there any tool to confirm it? How can I test the multithreading?

            Thanks & Regards.


            • #7
              No; I edited my answer. You only need one poller, inside the inbound adapter element; you don't need a poller in the chain (that was my error).

              I doubt you'll see much multi-threading here because you are doing so little, unless you have a large amount of data and increase the max-messages-per-poll.

              You can see it by turning on debug logging.


              • #8
                Hi Gary,
                I just finished trying your suggestions, and here is my code now:
                Code:
                	<int-jdbc:inbound-channel-adapter id="jdbcNotificationIn"
                		data-source="dataSource"
                		query="select *  
                			from myTable 
                			where status='OPEN' order by Priority"
                		channel="jdbcToChainOut"
                		update="update myTable set status='INPROCESS', StartDateTime=GETDATE() 
                    		where Id in (:Id)"
                		max-rows-per-poll="5">
                		<int:poller fixed-delay="1000">
                			<int:transactional />
                		</int:poller>
                	</int-jdbc:inbound-channel-adapter>
                
                	<int:channel id="jdbcToChainOut">
                		<int:queue capacity="5" />
                	</int:channel>
                
                	<int:chain input-channel="jdbcToChainOut">
                		<int:splitter ref="notificationSplitter" />
                		<int:transformer ref="notificationTransformer" />
                		<int-jms:outbound-channel-adapter id="jmsNotificationOut"
                			destination="notificationQueue" />
                		<int:poller task-executor="notificationExecutor"
                			fixed-delay="1000" error-channel="chainToFailedOut">
                			<int:transactional />
                		</int:poller>
                	</int:chain>
                
                	<task:executor id="notificationExecutor" pool-size="5" />
                
                	<int:publish-subscribe-channel id="chainToFailedOut" />
                
                	<int:chain input-channel="chainToFailedOut">
                		<int:transformer ref="errorTransformer" />
                		<int-jdbc:outbound-channel-adapter
                			data-source="dataSource"
                			query="update myTable 
                			set status='FINISH WITH ERROR', EndDateTime=GETDATE(), Notes=(:payload[Notes]) 
                			where Id in (:headers[Id])" />
                	</int:chain>
                
                	<int:logging-channel-adapter channel="chainToFailedOut"
                		log-full-message="true" level="DEBUG"></int:logging-channel-adapter>
                
                	<!-- If you don't listen to the default error channel you risk losing track 
                		of exceptions, as they cannot be passed back to the sender in band. It is 
                		recommended to have a generic error handler in your configuration to prevent 
                		this. -->
                	<int-stream:stderr-channel-adapter
                		channel="errorChannel" append-newline="true" />
                The code highlighted in red is the code I changed based on your advice. My questions are:
                1. The logging-channel-adapter seems to do nothing when I run my application. What should I do with it? Note that I tried raising a divide-by-zero exception in my transformer object.
                2. I tried removing the default poller from my setup code, and when I run the application it keeps throwing an exception saying that the JDBC inbound channel adapter has no poller and there is no default poller either. So I still have to add an internal poller inside the JDBC inbound channel adapter. Is it OK if I do this, or do you have another recommendation?
                3. The channel 'jdbcToChainOut' still needs the internal queue element, because every time I try to remove it I get an exception saying that the channel is a subscribable channel, not a pollable one. Is it OK if I don't remove this queue element?
                4. Have I configured the internal poller element inside the first chain element correctly?
                5. Last but not least, I just want to confirm my understanding of what I have done so far.
                First, the JDBC inbound channel adapter pulls 5 rows every second and puts them on a queue channel called jdbcToChainOut, which has a maximum capacity of 5 messages. Then there is a second poller, which every second starts 5 tasks in 5 different threads to run the first chain.
                When I check the result in the ActiveMQ console, I see 5 messages enqueued: 1 message finished at time x, 2 messages at time y, and 2 messages at time z. I think my application is already running multithreaded, am I right? Please advise me if I'm wrong.

                Thanks & Regards.
                Last edited by willy.juisan; Jun 13th, 2013, 05:23 AM.


                • #9
                  You are clearly not completely reading or understanding my replies. Once again...

                  1. Remove the <queue/> from the channel; otherwise the transaction won't work.
                  2. Remove the <poller/> from the chain.
                  3. Set the task-executor on the adapter's poller

                  The logging channel adapter will log the message, as long as you have DEBUG logging level set.


                  • #10
                    Originally posted by Gary Russell
                    You are clearly not completely reading or understanding my replies. Once again...

                    1. Remove the <queue/> from the channel; otherwise the transaction won't work.
                    2. Remove the <poller/> from the chain.
                    3. Set the task-executor on the adapter's poller
                    OK, so the only poller will be in the JDBC inbound channel adapter, and it uses a task executor. My code now looks like this:
                    Code:
                    	<int-jdbc:inbound-channel-adapter id="jdbcNotificationIn"
                    		data-source="dataSource"
                    		query="select *  
                    			from myTable 
                    			where status='OPEN' order by Priority"
                    		channel="jdbcToChainOut"
                    		update="update myTable set status='INPROCESS', StartDateTime=GETDATE() 
                        		where Id in (:Id)"
                    		max-rows-per-poll="5">
                    		<int:poller task-executor="notificationExecutor"
                    			fixed-delay="1000" error-channel="chainToFailedOut">
                    			<int:transactional />
                    		</int:poller>
                    	</int-jdbc:inbound-channel-adapter>
                    
                    	<int:channel id="jdbcToChainOut"/>
                    
                    	<int:chain input-channel="jdbcToChainOut">
                    		<int:splitter ref="notificationSplitter" />
                    		<int:transformer ref="notificationTransformer" />
                    		<int-jms:outbound-channel-adapter id="jmsNotificationOut"
                    			destination="notificationQueue" />
                    	</int:chain>
                    
                    	<task:executor id="notificationExecutor" pool-size="5" />
                    
                    	<int:publish-subscribe-channel id="chainToFailedOut" />
                    
                    	<int:chain input-channel="chainToFailedOut">
                    		<int:transformer ref="errorTransformer" />
                    		<int-jdbc:outbound-channel-adapter
                    			data-source="dataSource"
                    			query="update myTable 
                    			set status='FINISH WITH ERROR', EndDateTime=GETDATE(), Notes=(:payload[Notes]) 
                    			where Id in (:headers[Id])" />
                    	</int:chain>
                    
                    	<int:logging-channel-adapter channel="chainToFailedOut"
                    		log-full-message="true" level="DEBUG" />
                    
                    	<!-- If you don't listen to the default error channel you risk losing track 
                    		of exceptions, as they cannot be passed back to the sender in band. It is 
                    		recommended to have a generic error handler in your configuration to prevent 
                    		this. -->
                    	<int-stream:stderr-channel-adapter
                    		channel="errorChannel" append-newline="true" />
                    Is this OK, Gary? With this code, should the multithreading and the transaction work now?

                    The logging channel adapter will log the message, as long as you have DEBUG logging level set.
                    So how can I set the logging level to DEBUG?

                    Thanks & Regards.


                    • #11
                      I just want to confirm whether my understanding of the code is right:
                      there is only one poller, which queries the database every 1000 ms and fetches at most 5 rows, to be processed in the chain using up to 5 concurrent threads.

                      Could somebody please explain the meaning of this code to me? I am still confused about it.

                      Thanks & Regards.


                      • #12
                        To turn on debug logging consult your logging subsystem documentation (e.g. log4j).
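
                        As one possibility with log4j 1.x, a log4j.xml sketch. The appender name, file name, and pattern are assumptions; %t prints the thread name, which makes it possible to tell concurrent poller threads apart in the output:
                        Code:
                        <?xml version="1.0" encoding="UTF-8"?>
                        <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
                        <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
                        
                            <!-- Hypothetical file appender; adjust the path to your environment -->
                            <appender name="file" class="org.apache.log4j.FileAppender">
                                <param name="File" value="integration.log"/>
                                <layout class="org.apache.log4j.PatternLayout">
                                    <param name="ConversionPattern" value="%d [%t] %-5p %c - %m%n"/>
                                </layout>
                            </appender>
                        
                            <!-- DEBUG for Spring Integration only, including the logging channel adapter -->
                            <logger name="org.springframework.integration">
                                <level value="DEBUG"/>
                            </logger>
                        
                            <root>
                                <level value="INFO"/>
                                <appender-ref ref="file"/>
                            </root>
                        </log4j:configuration>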

                        With this configuration, the database will be polled once a second and up to 5 rows will be retrieved. You will only get more than one thread if it takes more than 1 second to process those rows.

                        The fixed-delay determines how often we ask for rows; the task executor defines the maximum number of threads.

                        If you have many rows to process, you should poll more often so that you get more threads running.
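
                        For example, a sketch of the same poller with a shorter delay. The value 200 is only illustrative and would need tuning against your data volume; the adapter's other attributes are elided:
                        Code:
                        <int-jdbc:inbound-channel-adapter ...>
                            <!-- wait 200 ms between polls; each poll is handed to a thread from notificationExecutor -->
                            <int:poller task-executor="notificationExecutor"
                                fixed-delay="200" error-channel="chainToFailedOut">
                                <int:transactional/>
                            </int:poller>
                        </int-jdbc:inbound-channel-adapter>
                        
                        <!-- pool-size caps the number of polls being processed at the same time -->
                        <task:executor id="notificationExecutor" pool-size="5"/>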
