
  • Enable Multithreading in JMS Message Driven Channel Adapter

    Hi everyone,
    I have some questions about multithreading and the JMS message-driven channel adapter. I want my JMS message-driven channel adapter to be multithreaded, so that it runs several JMS consumers concurrently. After doing some research and reading the documentation, I found that I can get multiple concurrent consumers by adding the concurrent-consumers and max-concurrent-consumers attributes to the jms:message-driven-channel-adapter element. Here is my code:
    Code:
    	<jms:message-driven-channel-adapter
    		id="jmsNotificationIn" destination="notificationQueue" channel="jmsToRouterOut"
    		error-channel="errorChannel" concurrent-consumers="2" max-concurrent-consumers="2"/>
    
    	<channel id="jmsToRouterOut" />
    
    	<router ref="notificationRouter" input-channel="jmsToRouterOut"
    		method="route" />
    
    	<channel id="routerToStdOut" />
    
    	<stream:stdout-channel-adapter id="stdout"
    		channel="routerToStdOut" append-newline="true" />
    
    	<channel id="routerToEmailServiceOut" />
    
    	<service-activator input-channel="routerToEmailServiceOut"
    		output-channel="serviceToJdbcOut" ref="emailUtilities" method="send"></service-activator>
    
    	<channel id="serviceToJdbcOut" />
    
    	<jdbc:outbound-channel-adapter
    		data-source="dataSource" channel="serviceToJdbcOut"
    		query="update myTable set status='FINISH', EndDateTime=GETDATE() where Id in (:payload)" />
    
    	<chain input-channel="errorChannel">
    		<transformer ref="errorSender" />
    		<jdbc:outbound-channel-adapter
    			data-source="dataSource"
    			query="update myTable 
     			set status='FINISH WITH ERROR', EndDateTime=GETDATE(), Notes=(:payload[Notes])  
     			where Id in (:headers[Id])" />
    	</chain>
    	
    	<!-- If you don't listen to the default error channel you risk losing track 
    		of exceptions, as they cannot be passed back to the sender in band. It is 
    		recommended to have a generic error handler in your configuration to prevent 
    		this. -->
    	<stream:stderr-channel-adapter channel="errorChannel"
    		append-newline="true" />
    Here is my setup code:
    Code:
    	<bean id="connectionFactory"
    		class="org.springframework.jms.connection.CachingConnectionFactory">
    		<property name="targetConnectionFactory">
    			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
    				<property name="brokerURL" value="tcp://localhost:61616" />
    				<property name="redeliveryPolicy">
    					<bean class="org.apache.activemq.RedeliveryPolicy">
    						<property name="maximumRedeliveries" value="3"></property>
    					</bean>
    				</property>
    			</bean>
    		</property>
    		<property name="sessionCacheSize" value="10" />
    		<property name="cacheProducers" value="false" />
    	</bean>
    
    	<bean id="notificationQueue" class="org.apache.activemq.command.ActiveMQQueue">
    		<constructor-arg value="notification.messages" />
    	</bean>
    My questions are:
    1. Is what I am doing now the best way to process JMS messages on multiple threads?
    2. What is the best practice for setting the values of the concurrent-consumers and max-concurrent-consumers attributes?
    3. What type of connection factory fits my needs? Is the caching connection factory a good choice, or is there a better one?
    4. Can the redelivery policy still be used with concurrent consumers? (Pardon me if this is a noob question; I am still new to JMS and Spring Integration.)
    5. How do I set things up to avoid memory leaks when using multithreading?

    Thanks & Regards.
    Last edited by willy.juisan; Jun 13th, 2013, 12:20 AM.

  • #2
    1. Yes
    2. It's hard to say - it depends on your application, the resources available, and the desired concurrency.
    3. You typically don't need a caching connection factory when consuming using a listener container (message driven adapter); it may actually cause problems if you set max-consumers higher than consumers. A caching factory is typically only needed when sending to JMS.
    4. Yes, but that's a function of the broker, not JMS or Spring.
    5. It's not clear what you mean; there's no difference from single-threading.
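To picture what concurrent-consumers="2" means for the rest of the flow, here is a minimal plain-Java analogy. It is not how Spring's listener container is implemented; the class and method names are hypothetical and an in-memory queue stands in for the JMS destination. The point is only that several threads pull from one source and all call the same handler, so everything downstream of the adapter runs on those threads.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical analogy only: this is NOT Spring's listener container.
class ConcurrentConsumersSketch {

    /**
     * Drains the queue with a fixed number of consumer threads, all calling
     * the same handler, and returns how many messages were handled.
     */
    static int consume(BlockingQueue<String> queue, int consumers, Consumer<String> handler) {
        AtomicInteger handled = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                String message;
                // poll() returns null once the queue is empty, which ends this consumer
                while ((message = queue.poll()) != null) {
                    handler.accept(message);
                    handled.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return handled.get();
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 10; i++) {
            queue.add("notification-" + i);
        }
        int handled = consume(queue, 2,
                m -> System.out.println(Thread.currentThread().getName() + " handled " + m));
        System.out.println("handled=" + handled);
    }
}
```

Which thread handles which message is not deterministic, which is why the handler (the router, the service activator and so on in the flow above) has to tolerate being called from more than one thread at once.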


    • #3
      3. You typically don't need a caching connection factory when consuming using a listener container (message driven adapter); it may actually cause problems if you set max-consumers higher than consumers. A caching factory is typically only needed when sending to JMS.
      So in my case, which connection factory best fits my needs, especially since I want to use concurrent consumers to process JMS messages?

      5. It's not clear what you mean; there's no difference from single-threading.
      What I mean is: I would like to know the best practices for working with multithreading in a Spring Integration environment, especially in my case.

      Thanks & Regards.


      • #4
        Just use the native ActiveMQConnectionFactory

        But you specifically asked about memory leaks - there are no additional considerations for memory leaks; a memory leak is where code retains a reference to an object that is no longer needed; avoiding such things has nothing to do with threading.
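As a minimal sketch of that definition (hypothetical class names, deliberately run on a single thread to show that threading is not the cause), compare a handler that keeps every payload in a long-lived list with one that only touches the payload through its parameter:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; neither class comes from the code in this thread.
class LeakSketch {

    /** Leaky: every payload stays reachable through the long-lived 'history' list. */
    static final class LeakyHandler {
        private final List<byte[]> history = new ArrayList<>();

        void handle(byte[] payload) {
            history.add(payload); // reference kept after the payload is no longer needed
        }

        int retainedPayloads() {
            return history.size();
        }
    }

    /** Not leaky: the payload is only referenced by the caller and this method's parameter. */
    static final class CleanHandler {
        int handle(byte[] payload) {
            return payload.length;
        }
    }

    public static void main(String[] args) {
        LeakyHandler leaky = new LeakyHandler();
        CleanHandler clean = new CleanHandler();
        for (int i = 0; i < 1000; i++) {
            byte[] payload = new byte[1024];
            leaky.handle(payload);
            clean.handle(payload);
        }
        System.out.println("payloads still referenced by LeakyHandler: " + leaky.retainedPayloads());
    }
}
```

The leaky handler grows by one retained payload per message regardless of how many threads call it; the fix is to stop holding the reference, not to change the threading model.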

        Much more important is thread safety; objects like your emailUtilities must have no shared state (instance variables) or, if they do, it must be protected with locks (but it's better to avoid shared state altogether).
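The two options mentioned here can be sketched as follows. This is an illustration with hypothetical names, not code from the thread: a stateless class needs no locking at all, while a class with an instance variable guards every access to it with the same lock.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of "no shared state" versus "shared state behind a lock".
class ThreadSafetySketch {

    /** Thread safe without locks: no instance variables, only parameters and locals. */
    static final class StatelessLabeler {
        String label(long id) {
            return "notification-" + id;
        }
    }

    /** Shared state protected by a lock: every access to 'count' is synchronized. */
    static final class LockedCounter {
        private final Object lock = new Object();
        private long count;

        void increment() {
            synchronized (lock) {
                count++;
            }
        }

        long get() {
            synchronized (lock) {
                return count;
            }
        }
    }

    /** Increments one shared counter from several threads and returns the final value. */
    static long hammer(int threads, int incrementsPerThread) {
        LockedCounter counter = new LockedCounter();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.increment();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(hammer(4, 10_000));
    }
}
```

Without the synchronized blocks, concurrent count++ calls could lose updates. The stateless variant avoids the question entirely, which is why it is usually the better choice for beans invoked by concurrent consumers.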


        • #5
          Below is my emailUtilities class. Does it have any instance variables or shared state that would cause thread-safety problems?
          Code:
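          import java.io.File;
          import java.util.ArrayList;
          import java.util.List;
          import java.util.Map;
          
          import javax.mail.MessagingException;
          import javax.mail.internet.MimeMessage;
          
          import org.springframework.core.io.FileSystemResource;
          import org.springframework.mail.javamail.JavaMailSenderImpl;
          import org.springframework.mail.javamail.MimeMessageHelper;
          
          public class EmailUtil {
          	/**
          	 * Utility for sending email
          	 * 
          	 */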
          	public List<Long> send(Map<String, String> message) {
          		List<Long> result = new ArrayList<Long>();
          		
          		if (message != null) {
          			JavaMailSenderImpl mailSender = new JavaMailSenderImpl();
          			
          			//Setting sender with simple mail transfer protocol (smtp)
          			mailSender.setHost(message.get("SmtpServer"));
          			mailSender.setPort(Integer.valueOf(message.get("SmtpPort")));
          			
          			if (Boolean.valueOf(message.get("IsRequiredAuthentication"))) {
          				mailSender.setUsername(message.get("AuthenticationUsername"));
          				mailSender.setPassword(message.get("AuthenticationPassword"));
          			}
          			
          			try {
          				//Create MIME message
          				MimeMessage mimeMessage = mailSender.createMimeMessage();
          				MimeMessageHelper helper = new MimeMessageHelper(mimeMessage, true);
          				
          				helper.setSubject(message.get("Subject"));
          				helper.setFrom(message.get("From"));
          				if (message.get("To") != null && message.get("To").contains("@"))
          					helper.setTo(message.get("To").split(","));
          				if (message.get("Cc") != null && message.get("Cc").contains("@"))
          					helper.setCc(message.get("Cc").split(","));
          				if (message.get("Bcc") != null && message.get("Bcc").contains("@"))
          					helper.setBcc(message.get("Bcc").split(","));
          				
          				helper.setText(message.get("Content"), true);
          				//TODO: remove for deploy in production
          				//let's attach the infamous windows Sample file (this time copied to c:/)
          				FileSystemResource file = new FileSystemResource(new File("c:/Sample.png"));
          				helper.addAttachment("CoolImage.jpg", file);
          				
          				//Send MIME message
          				mailSender.send(mimeMessage);
          				
          				result.add(Long.valueOf(message.get("Id")));					
          			} catch (MessagingException me) {
          				me.printStackTrace();
          			}
          		}		
          		return result;
          	}
          }
          Thanks & Regards.
          Last edited by Gary Russell; Jun 13th, 2013, 04:39 AM.


          • #6
            I am sorry, but this is not the place to learn Java.

            But yes, this class is thread safe.

            However, I would ask why you wrote this code instead of using an <int-mail:outbound-channel-adapter/>.


            • #7
              Originally posted by Gary Russell
              I am sorry, but this is not the place to learn Java.

              But yes, this class is thread safe.

              However, I would ask why you wrote this code instead of using an <int-mail:outbound-channel-adapter/>.
              Because I need to send each mail using its own properties (SMTP server, port, username, password, etc.).
              Let's say I need to send 3 mails:
              - mailA will be sent using smtpserver=gmail, port=1, username=steve, password=stevecool
              - mailB will be sent using smtpserver=hotmail, port=2, username=bill, password=billrich
              - mailC will be sent using smtpserver=oracle, port=3, username=larry, password=larrygood
              The details of the SMTP server, port, username, etc. cannot be written down in the Spring XML file because they are too dynamic; we only know them at runtime (as far as I know; pardon me if that's not right). I would be happy to use the mail outbound channel adapter if it can achieve my goal.

              Thanks & Regards.


              • #8
                in that case, you are correct to use custom code.
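If the custom code stays, one possible way to keep the per-message settings tidy is to read them into a small immutable object before building the mail sender. The sketch below is an assumption, not something proposed in this thread: SmtpSettings and fromMessage are hypothetical names, and only the map keys are taken from the EmailUtil class posted above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the map keys are the ones used by EmailUtil in this thread.
final class SmtpSettings {
    final String host;
    final int port;
    final boolean authRequired;
    final String username;
    final String password;

    private SmtpSettings(String host, int port, boolean authRequired,
                         String username, String password) {
        this.host = host;
        this.port = port;
        this.authRequired = authRequired;
        this.username = username;
        this.password = password;
    }

    /** Reads and validates the SMTP settings carried by a single message. */
    static SmtpSettings fromMessage(Map<String, String> message) {
        String host = message.get("SmtpServer");
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException("SmtpServer is required");
        }
        int port;
        try {
            // parseInt(null) also throws NumberFormatException, so a missing port is caught here
            port = Integer.parseInt(message.get("SmtpPort"));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("SmtpPort must be a number", e);
        }
        boolean authRequired = Boolean.parseBoolean(message.get("IsRequiredAuthentication"));
        String username = authRequired ? message.get("AuthenticationUsername") : null;
        String password = authRequired ? message.get("AuthenticationPassword") : null;
        return new SmtpSettings(host, port, authRequired, username, password);
    }

    public static void main(String[] args) {
        Map<String, String> message = new HashMap<>();
        message.put("SmtpServer", "smtp.example.com");
        message.put("SmtpPort", "587");
        message.put("IsRequiredAuthentication", "true");
        message.put("AuthenticationUsername", "steve");
        message.put("AuthenticationPassword", "secret");
        SmtpSettings settings = fromMessage(message);
        System.out.println(settings.host + ":" + settings.port + " auth=" + settings.authRequired);
    }
}
```

Because the object is built per message and never shared, it keeps the send method free of instance state, so it stays safe to call from concurrent consumers.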


                • #9
                  Originally posted by Gary Russell
                  in that case, you are correct to use custom code.
                  Thanks Gary, you have helped me a lot. I should say that your support was excellent.
                  I really appreciate it.

                  Thanks & Regards.
