
  • JmsTemplate is a bottleneck in performance

    Hi, I am comparing the performance of ActiveMQ with and without Spring.
    The Spring version seems to be much slower.
    The problem appears to be on the sending side, not the receiving side: sending with JmsTemplate is much slower.
    Any ideas why?
    How can I improve the performance of JmsTemplate?

    Please note that I tested these two configurations:

    Code:
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" lazy-init="true">
             <property name="connectionFactory" ref="pooledConnectionFactory"/>  
         </bean>
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" lazy-init="true">
             <property name="connectionFactory" ref="cachingConnectionFactory"/>  
         </bean>
    thanks
    Last edited by michals; Dec 5th, 2012, 06:15 AM.

  • #2
    Post your tests... My guess is that you are comparing apples and oranges...

    Also, when posting code, use [ code][/code ] tags so that it remains readable.



    • #3
      I can't post my entire test; I don't see where I can attach files.

      I am testing this flow:
      1. Client -> sends request -> Server.
      2. Server -> sends response -> Client.
      3. Client -> receives the response and sends a message to a measurement client.
      4. Measurement client -> receives the message; after receiving all messages it stops the timer.

      The full flow is tested with 300k messages,
      and ActiveMQ without Spring takes half the time that it takes with Spring.

      Here is the Spring XML I am using.
      Code:
      <!-- ********************** Consumer ********************************* --> 
      	<bean id="clientListener" class="example.ClientReceive">
      		<property name="receiveQueue" ref="receiveQueue" />
      		<property name="sinkQueue" ref="sinkQueue" />
      		<property name="clientSender" ref="clientSender"/> 
      	</bean>
      
      	<bean id="containerClient"
      		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
      		<property name="concurrentConsumers" value="20"/><!-- if it is not 1 - it is difficult to see distribution to other services!!!change to 50,20 -->
                  <property name="maxConcurrentConsumers" value="50"/><!-- if it is not 1 - it is difficult to see distribution to other services!!!change to 50-->
                  <property name="idleTaskExecutionLimit" value="10"/>
                  <!-- <property name="maxMessagesPerTask" value="30"/> -->
      		<property name="connectionFactory" ref="singleConnectionFactory" />
      		<property name="destination" ref="receiveQueue" />
      		<property name="messageListener" ref="clientListener" />
      		<property name="sessionAcknowledgeMode" value="1"/>
      	</bean>
      	
      	
      	
      	<bean id="receiveQueue" class="org.apache.activemq.command.ActiveMQQueue">
              <constructor-arg value="queueA"/>
          </bean>
          <bean id="sinkQueue" class="org.apache.activemq.command.ActiveMQQueue">
              <constructor-arg value="sinkQueue"/>
          </bean>
          
          <!-- **********************  END-Consumer *****************************	-->
          
      	
      	<!-- ********************** Producer ********************************* --> 
      	<bean id="clientSender" class="example.ClientSend">
      		 <property name="sendQueue" ref="sendQueue" /> 
      		 <property name="jmsTemplate" ref="jmsTemplate"/>  
      	</bean>
      	
      	 <bean id="sendQueue" class="org.apache.activemq.command.ActiveMQQueue">
              <constructor-arg value="queueB"/>
          </bean> 
          
           <!-- for send response on temp q-->
          <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" lazy-init="true">
               <property name="connectionFactory" ref="cachingConnectionFactory"/>  
           </bean>
      	
      	<!-- ********************** END Producer ********************************* -->
       
          
          
          <!-- ********************** Common ********************************* -->
          
      	<bean id="connectionFactory" 
      		class="org.apache.activemq.ActiveMQConnectionFactory">
      		 <property name="brokerURL" value="failover:(tcp://localhost:61616)?randomize=false" />    <!-- Single broker-->
      		 <!-- <property name="brokerURL" value="tcp://localhost:61616" /> --> 
      		  <!-- <property name="brokerURL" value="failover:(tcp://localhost:61616?wireFormat.tightEncodingEnabled=false)?randomize=false" /> --><!-- for vertical scalability-->
      		 <!-- <property name="brokerURL" value="failover:(tcp://localhost:61616,tcp://dev15:61616)?randomize=true"/> --> <!-- network of brokers -->
      		<!-- <property name="brokerURL" value="failover:(tcp://dev15:61616)?randomize=true"/> -->
      		<!-- <property name="brokerURL" value="vm://amq3?brokerConfig=xbean:amq3.xml" /> --><!-- embedded broker for network of brokers -->
      		<!-- <property name="brokerURL" value="failover:(tcp://0.0.0.0:61618,tcp://0.0.0.0:61617)" />  --><!-- service broker for network of brokers : connect to amq3-->
      		<!-- <property name="brokerURL" value="failover:(tcp://0.0.0.0:61618)" /> --> <!-- connection amq3 -->
      	</bean>
      	
          <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
          	 <property name="targetConnectionFactory"
          		ref="connectionFactory" />
          		<property name="reconnectOnException" value="true"/>
          </bean>
          
           <bean id="pooledConnectionFactory"  
      		class="org.apache.activemq.pool.PooledConnectionFactory">
      		<constructor-arg ref="connectionFactory"/>
      		<property name="maxConnections" value="10" /> 
      	</bean>    
      	
      	  <bean id="cachingConnectionFactory"  
      		class="org.springframework.jms.connection.CachingConnectionFactory">
      		<constructor-arg ref="singleConnectionFactory"/>
      		<property name="sessionCacheSize" value="100" /> 
      	</bean>   
      	
          <!-- <bean id="sinkQueue" class="org.apache.activemq.command.ActiveMQQueue">
              <constructor-arg value="sinkQueue"/>
          </bean> -->
      	
           
      </beans>
      I am not sure what you mean by "comparing apples and oranges"... I am testing the same code, once with Spring and once with
      creating my own ActiveMQ objects and using



      • #4
        I am not sure what you mean by "comparing apples and oranges"... I am testing the same code, once with Spring and once with
        creating my own ActiveMQ objects and using
        Which is exactly what I mean by comparing apples and oranges...

        Hence my request for the code you wrote/use (including the configuration)...
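
        One difference that is visible in the code posted in this thread: the plain ActiveMQ server (post #10) calls producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT), while the posted jmsTemplate beans set only a connectionFactory. JmsTemplate applies its delivery mode, priority and time-to-live settings only when explicitQosEnabled is true; otherwise messages go out with the provider default, which is persistent delivery. The fragment below is a minimal sketch of a template configured to match the plain test. It reuses the cachingConnectionFactory bean from the posted XML. Whether this accounts for the measured gap is an assumption, since the plain client code is not shown.

        ```xml
        <!-- Sketch only: mirrors the NON_PERSISTENT setting used by the plain ActiveMQ server -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" lazy-init="true">
            <property name="connectionFactory" ref="cachingConnectionFactory"/>
            <!-- QoS settings on JmsTemplate are ignored unless this flag is true -->
            <property name="explicitQosEnabled" value="true"/>
            <!-- false selects DeliveryMode.NON_PERSISTENT -->
            <property name="deliveryPersistent" value="false"/>
        </bean>
        ```

        With both tests sending non-persistent messages, the remaining difference would be closer to the overhead of JmsTemplate itself.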



        • #5
          Is there a way to attach files other than images?



          • #6
            By simply attaching them (you might need to go to advanced mode for that)... If the code isn't that large you could copy/paste in the post using [ code][/code ] tags to maintain formatting.



            • #7
              This is a big example, and for some reason when I try to attach the files it reports that the file is bad.
              I will add some of the code here, which shows how the client is implemented. The server is basically the same,
              so I didn't add it.
              Thanks

              Code:
              spring-client.xml:
              <beans xmlns="http://www.springframework.org/schema/beans"
              	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
              	xmlns:jms="http://www.springframework.org/schema/jms"
              	xsi:schemaLocation="http://www.springframework.org/schema/beans 
              	       	http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
              	       	http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
                     		http://www.springframework.org/schema/context
                     		http://www.springframework.org/schema/context/spring-context-3.0.xsd">
              
              
              	<!-- ********************** Consumer ********************************* --> 
              	<bean id="clientListener" class="example.ClientReceive">
              		<property name="receiveQueue" ref="receiveQueue" />
              		<property name="sinkQueue" ref="sinkQueue" />
              		<property name="clientSender" ref="clientSender"/> 
              	</bean>
              
              	<bean id="containerClient"
              		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
              		<property name="concurrentConsumers" value="20"/><!-- if it is not 1 - it is difficult to see distribution to other services!!!change to 50,20 -->
                          <property name="maxConcurrentConsumers" value="50"/><!-- if it is not 1 - it is difficult to see distribution to other services!!!change to 50-->
                          <property name="idleTaskExecutionLimit" value="10"/>
                          <!-- <property name="maxMessagesPerTask" value="30"/> -->
              		<property name="connectionFactory" ref="singleConnectionFactory" />
              		<property name="destination" ref="receiveQueue" />
              		<property name="messageListener" ref="clientListener" />
              		<property name="sessionAcknowledgeMode" value="1"/>
              	</bean>
              	
              	
              	
              	<bean id="receiveQueue" class="org.apache.activemq.command.ActiveMQQueue">
                      <constructor-arg value="queueA"/>
                  </bean>
                  <bean id="sinkQueue" class="org.apache.activemq.command.ActiveMQQueue">
                      <constructor-arg value="sinkQueue"/>
                  </bean>
                  
                  <!-- **********************  END-Consumer *****************************	-->
                  
              	
              	<!-- ********************** Producer ********************************* --> 
              	<bean id="clientSender" class="example.ClientSend">
              		 <property name="sendQueue" ref="sendQueue" /> 
              		 <property name="jmsTemplate" ref="jmsTemplate"/>  
              	</bean>
              	
              	 <bean id="sendQueue" class="org.apache.activemq.command.ActiveMQQueue">
                      <constructor-arg value="queueB"/>
                  </bean> 
                  
                   <!-- for send response on temp q-->
                  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" lazy-init="true">
                       <property name="connectionFactory" ref="cachingConnectionFactory"/>  
                   </bean>
              	
              	<!-- ********************** END Producer ********************************* -->
               
                  
                  
                  <!-- ********************** Common ********************************* -->
                  
              	<bean id="connectionFactory" 
              		class="org.apache.activemq.ActiveMQConnectionFactory">
              		 <property name="brokerURL" value="failover:(tcp://localhost:61616)?randomize=false" />    <!-- Single broker-->
              		 <!-- <property name="brokerURL" value="tcp://localhost:61616" /> --> 
              		  <!-- <property name="brokerURL" value="failover:(tcp://localhost:61616?wireFormat.tightEncodingEnabled=false)?randomize=false" /> --><!-- for vertical scalability-->
              		 <!-- <property name="brokerURL" value="failover:(tcp://localhost:61616,tcp://dev15:61616)?randomize=true"/> --> <!-- network of brokers -->
              		<!-- <property name="brokerURL" value="failover:(tcp://dev15:61616)?randomize=true"/> -->
              		<!-- <property name="brokerURL" value="vm://amq3?brokerConfig=xbean:amq3.xml" /> --><!-- embedded broker for network of brokers -->
              		<!-- <property name="brokerURL" value="failover:(tcp://0.0.0.0:61618,tcp://0.0.0.0:61617)" />  --><!-- service broker for network of brokers : connect to amq3-->
              		<!-- <property name="brokerURL" value="failover:(tcp://0.0.0.0:61618)" /> --> <!-- connection amq3 -->
              	</bean>
              	
                  <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
                  	 <property name="targetConnectionFactory"
                  		ref="connectionFactory" />
                  		<property name="reconnectOnException" value="true"/>
                  </bean>
                  
                   <bean id="pooledConnectionFactory"  
              		class="org.apache.activemq.pool.PooledConnectionFactory">
              		<constructor-arg ref="connectionFactory"/>
              		<property name="maxConnections" value="10" /> 
              	</bean>    
              	
              	  <bean id="cachingConnectionFactory"  
              		class="org.springframework.jms.connection.CachingConnectionFactory">
              		<constructor-arg ref="singleConnectionFactory"/>
              		<property name="sessionCacheSize" value="100" /> 
              	</bean>   
              	
                  <!-- <bean id="sinkQueue" class="org.apache.activemq.command.ActiveMQQueue">
                      <constructor-arg value="sinkQueue"/>
                  </bean> -->
              	
                   
              </beans>
              
              public class ClientReceive implements MessageListener{
              	Connection connection = null;
              	Destination receiveQueue = null;
              	Destination sinkQueue = null;
              	ClientSend clientSender = null;
              	private AtomicInteger counter = new AtomicInteger(0);
              	private String id = null;
              	private boolean verbose = false; 
              	
              	
              	
              	public ClientSend getClientSender() {
              		return clientSender;
              	}
              	public void setClientSender(ClientSend clientSender) {
              		this.clientSender = clientSender;
              	}
              	public Destination getSinkQueue() {
              		return sinkQueue;
              	}
              	public void setSinkQueue(Destination sinkQueue) {
              		this.sinkQueue = sinkQueue;
              	}
              	
              	public Destination getReceiveQueue() {
              		return receiveQueue;
              	}
              	public void setReceiveQueue(Destination receiveQueue) {
              		this.receiveQueue = receiveQueue;
              	}
              	public void start(String id, boolean verbose) throws JMSException {
              		this.id = id;
              		this.verbose = verbose;
              		clientSender.start(id, verbose); 
              	}
              	public void onMessage(Message message) { 
              		try {
              			message.acknowledge();//otherwise the messages get stuck, check why?????
              
              			LogicResponse response = (LogicResponse)(((ObjectMessage)message).getObject());
              			
              			// send to sink notification
              			clientSender.sendRequest(null, sinkQueue);
              
              			if (this.verbose == true){
              				System.out.println("Received response: " + response.getData());
              			}
              			
              		} 
              		catch (Exception e) {
              				e.printStackTrace();
              		}
              	}
              	public class SinkMessageCreator implements MessageCreator {
              
                  	//message is of type javax.jms.Message
                  	private Message message = null;
                  	private Destination destination = null;
                  	
                  	public SinkMessageCreator(Destination destination)
                  	{
                  		this.destination = destination;
                  	}
                  	
                  	public Message createMessage(Session session) throws JMSException 
                  	{
                  		message = session.createTextMessage();
                  		return message;
                  	}
              	}
              }
              
              
              public class ClientSend{
              	Connection connection = null;
              	Session session = null;
              	String brokerUrl = "tcp://localhost:61616";
              	MessageProducer producer = null;
              	Destination sendQueue = null;
                  private AtomicInteger counter = new AtomicInteger(0);
              
              	long end = 0;
              	long start = 0;
              	private String id = null;
              	private boolean verbose = false; 
              	
              	JmsTemplate jmsTemplate = null;
              	
              	public JmsTemplate getJmsTemplate() {
              		return jmsTemplate;
              	}
              	public void setJmsTemplate(JmsTemplate jmsTemplate) {
              		this.jmsTemplate = jmsTemplate;
              	}
              	public Destination getSendQueue() {
              		return sendQueue;
              	}
              	public void setSendQueue(Destination sendQueue) {
              		this.sendQueue = sendQueue;
              	}
              	
              	public void start(String id, boolean verbose) throws JMSException {
              		this.id = id;
              		this.verbose = verbose;
              	}
              
              	public void sendRequest(LogicRequest request) throws JMSException {
              		sendRequest(request, sendQueue);
              	}
              	public void sendRequest(LogicRequest request, Destination queue) throws JMSException {
              		jmsTemplate.send(queue, new RequestMessageCreator(request));
              
              	}
              	
              	public class RequestMessageCreator implements MessageCreator {
              
                  	//message is of type javax.jms.Message
                  	private Message message = null;
                  	private Object request = null;
                  	private String correlationId = null;
                  	
                  	public RequestMessageCreator(Object request)
                  	{
                  		this.request = request;
                  	}
                  	
                  	public Message createMessage(Session session) throws JMSException 
                  	{
                  		correlationId = UUID.randomUUID().toString(); 
                  		 
                  		if (this.request != null){
                  			message = session.createObjectMessage((Serializable)this.request);
                  		}
                  		else{
                  			message = session.createTextMessage();
                  		}
                  		message.setJMSCorrelationID(correlationId);
                  		return message;
                  	}
              
              		public Object getRequest() {
              			return request;
              		}
              
              		public void setRequest(Object request) {
              			this.request = request;
              		}
              
                  }
              }



              • #8
                spring-client.xml:
                Code:
                <!-- ********************** Consumer ********************************* --> 
                	<bean id="clientListener" class="example.ClientReceive">
                		<property name="receiveQueue" ref="receiveQueue" />
                		<property name="sinkQueue" ref="sinkQueue" />
                		<property name="clientSender" ref="clientSender"/> 
                	</bean>
                
                	<bean id="containerClient"
                		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
                		<property name="concurrentConsumers" value="20"/><!-- if it is not 1 - it is difficult to see distribution to other services!!!change to 50,20 -->
                            <property name="maxConcurrentConsumers" value="50"/><!-- if it is not 1 - it is difficult to see distribution to other services!!!change to 50-->
                            <property name="idleTaskExecutionLimit" value="10"/>
                            <!-- <property name="maxMessagesPerTask" value="30"/> -->
                		<property name="connectionFactory" ref="singleConnectionFactory" />
                		<property name="destination" ref="receiveQueue" />
                		<property name="messageListener" ref="clientListener" />
                		<property name="sessionAcknowledgeMode" value="1"/>
                	</bean>
                	
                	<bean id="receiveQueue" class="org.apache.activemq.command.ActiveMQQueue">
                        <constructor-arg value="queueA"/>
                    </bean>
                    <bean id="sinkQueue" class="org.apache.activemq.command.ActiveMQQueue">
                        <constructor-arg value="sinkQueue"/>
                    </bean>
                    
                    <!-- **********************  END-Consumer *****************************	-->
                    
                	<!-- ********************** Producer ********************************* --> 
                	<bean id="clientSender" class="example.ClientSend">
                		 <property name="sendQueue" ref="sendQueue" /> 
                		 <property name="jmsTemplate" ref="jmsTemplate"/>  
                	</bean>
                	
                	 <bean id="sendQueue" class="org.apache.activemq.command.ActiveMQQueue">
                        <constructor-arg value="queueB"/>
                    </bean> 
                    
                     <!-- for send response on temp q-->
                    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" lazy-init="true">
                         <property name="connectionFactory" ref="cachingConnectionFactory"/>  
                     </bean>
                	
                	<!-- ********************** END Producer ********************************* -->
                    
                    <!-- ********************** Common ********************************* -->
                    
                	<bean id="connectionFactory" 
                		class="org.apache.activemq.ActiveMQConnectionFactory">
                		 <property name="brokerURL" value="failover:(tcp://localhost:61616)?randomize=false" />    <!-- Single broker-->
                	</bean>
                	
                    <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
                    	 <property name="targetConnectionFactory"
                    		ref="connectionFactory" />
                    		<property name="reconnectOnException" value="true"/>
                    </bean>
                    
                     <bean id="pooledConnectionFactory"  
                		class="org.apache.activemq.pool.PooledConnectionFactory">
                		<constructor-arg ref="connectionFactory"/>
                		<property name="maxConnections" value="10" /> 
                	</bean>    
                	
                	  <bean id="cachingConnectionFactory"  
                		class="org.springframework.jms.connection.CachingConnectionFactory">
                		<constructor-arg ref="singleConnectionFactory"/>
                		<property name="sessionCacheSize" value="100" /> 
                	</bean>   
                	
                    <!-- <bean id="sinkQueue" class="org.apache.activemq.command.ActiveMQQueue">
                        <constructor-arg value="sinkQueue"/>
                    </bean> -->
                	
                     
                </beans>
                Client receive:
                Code:
                public class ClientReceive implements MessageListener{
                	Destination receiveQueue = null;
                	Destination sinkQueue = null;
                	ClientSend clientSender = null;
                	private AtomicInteger counter = new AtomicInteger(0);
                			
                	public ClientSend getClientSender() {
                		return clientSender;
                	}
                	public void setClientSender(ClientSend clientSender) {
                		this.clientSender = clientSender;
                	}
                	public Destination getSinkQueue() {
                		return sinkQueue;
                	}
                	public void setSinkQueue(Destination sinkQueue) {
                		this.sinkQueue = sinkQueue;
                	}
                	
                	public Destination getReceiveQueue() {
                		return receiveQueue;
                	}
                	public void setReceiveQueue(Destination receiveQueue) {
                		this.receiveQueue = receiveQueue;
                	}
                	public void start(String id, boolean verbose) throws JMSException {
                		clientSender.start(id, verbose); 
                	}
                	public void onMessage(Message message) { 
                		try {
                			message.acknowledge();//otherwise the messages get stuck, check why?????
                
                			LogicResponse response = (LogicResponse)(((ObjectMessage)message).getObject());
                			
                			// send to sink notification
                			clientSender.sendRequest(null, sinkQueue);
                
                		} 
                		catch (Exception e) {
                				e.printStackTrace();
                		}
                	}
                	public class SinkMessageCreator implements MessageCreator {
                
                    	//message is of type javax.jms.Message
                    	private Message message = null;
                    	private Destination destination = null;
                    	
                    	public SinkMessageCreator(Destination destination)
                    	{
                    		this.destination = destination;
                    	}
                    	
                    	public Message createMessage(Session session) throws JMSException 
                    	{
                    		message = session.createTextMessage();
                    		return message;
                    	}
                	}
                }
                Client send:
                Code:
                public class ClientSend{
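                	Destination sendQueue = null;
                    private AtomicInteger counter = new AtomicInteger(0);
                	// fields assigned in start(); they were missing from this trimmed listing
                	private String id = null;
                	private boolean verbose = false;

                	JmsTemplate jmsTemplate = null;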
                	
                	public JmsTemplate getJmsTemplate() {
                		return jmsTemplate;
                	}
                	public void setJmsTemplate(JmsTemplate jmsTemplate) {
                		this.jmsTemplate = jmsTemplate;
                	}
                	public Destination getSendQueue() {
                		return sendQueue;
                	}
                	public void setSendQueue(Destination sendQueue) {
                		this.sendQueue = sendQueue;
                	}
                	
                	public void start(String id, boolean verbose) throws JMSException {
                		this.id = id;
                		this.verbose = verbose;
                	}
                
                	public void sendRequest(LogicRequest request) throws JMSException {
                		sendRequest(request, sendQueue);
                	}
                	public void sendRequest(LogicRequest request, Destination queue) throws JMSException {
                		jmsTemplate.send(queue, new RequestMessageCreator(request));
                
                	}
                	
                	public class RequestMessageCreator implements MessageCreator {
                
                    	//message is of type javax.jms.Message
                    	private Message message = null;
                    	private Object request = null;
                    	private String correlationId = null;
                    	
                    	public RequestMessageCreator(Object request)
                    	{
                    		this.request = request;
                    	}
                    	
                    	public Message createMessage(Session session) throws JMSException 
                    	{
                    		correlationId = UUID.randomUUID().toString(); 
                    		 
                    		if (this.request != null){
                    			message = session.createObjectMessage((Serializable)this.request);
                    		}
                    		else{
                    			message = session.createTextMessage();
                    		}
                    		message.setJMSCorrelationID(correlationId);
                    		return message;
                    	}
                
                		public Object getRequest() {
                			return request;
                		}
                
                		public void setRequest(Object request) {
                			this.request = request;
                		}
                
                    }
                }
                Last edited by michals; Dec 6th, 2012, 03:22 AM.



                • #9
                  It must be me, but I only see Spring stuff, no plain JMS stuff. Also, could you separate the classes into different code blocks? One big gulp isn't really readable.



                  • #10
                    Oh, I had only added the Spring test. I will also add the test that does not use Spring.
                    I also fixed the previous post to be more readable.

                    Here is the code for just plain AMQ:

                    Server:
                    Code:
                    public class Server implements MessageListener {
                    	BrokerService broker = null;
                    	String brokerUrl = "failover:(tcp://localhost:61616)?randomize=false";//failover:(tcp://localhost:61616)
                    	Session session;
                    	MessageProducer producer = null;
                    	MessageConsumer consumer = null;
                    	private AtomicInteger counter = new AtomicInteger(0);
                    	ActiveMQConnectionFactory connectionFactory = null;
                    	Connection connection = null;
                    	Destination tasksQueue = null;
                    	
                    	public void start() throws Exception {
                    //		createBroker(); 
                    		setupConsumer();
                    	}
                    	
                    	private void createBroker() throws Exception { 
                    		broker = new BrokerService(); 
                    		broker.setPersistent(false); 
                    		broker.setUseJmx(false); 
                    		broker.addConnector(brokerUrl); 
                    		broker.start();
                    	}
                    	private void setupConsumer() throws JMSException {
                    		connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
                    		connection = connectionFactory.createConnection(); 
                    		connection.start(); 
                    		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
                    		tasksQueue = session.createQueue(JmsConfiguration.AGENT_QUEUE_NAME);
                    		producer = session.createProducer(null); 
                    		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                    		consumer = session.createConsumer(tasksQueue);
                    		
                    		consumer.setMessageListener(this); 
                    	}
                    	public void onMessage(Message message) {
                    		try {
                    			LogicResponse response = null;
                    			if (message instanceof ObjectMessage) {
                    				LogicRequest request = (LogicRequest)(((ObjectMessage)message).getObject());
                    				String param = (String)request.getParam("test");
                    				response = new LogicResponse(handleRequest(param));
                    			}
                    			// Reply on the destination named by the request, echoing its correlation ID.
                    			ObjectMessage responseObj = session.createObjectMessage(response);
                    			responseObj.setJMSCorrelationID(message.getJMSCorrelationID());
                    			producer.send(message.getJMSReplyTo(), responseObj);
                    		}
                    		catch (JMSException e) {
                    			e.printStackTrace();
                    		}
                    	}
                    	
                    	public String handleRequest(String messageText) { 
                    		return "Response to '" + messageText + "'";
                    	} 
                    	public void stop() throws Exception { 
                    		if (producer != null){
                    			producer.close();
                    		}
                    
                    		if (consumer != null){
                    			consumer.setMessageListener(null);
                    			consumer.close();
                    			consumer = null;
                    		}
                    		if (session != null){
                    			session.close(); 
                    		}
                    		if (broker != null){
                    			broker.stop();
                    		}
                    		if (connection != null){
                    			connection.close(); // close() also stops the connection and releases its resources
                    		}
                    		
                    	
                    	}
                    }
                    Client:
                    Code:
                    public class Client implements MessageListener{
                    	Connection connection = null;
                    	Session session = null;
                    	MessageProducer producer = null;
                    	MessageProducer sinkProducer = null;
                    	MessageConsumer consumer = null;
                    	Destination tasksQueue = null;
                    	Destination sinkQueue = null;
                    	Destination responseQueue = null;
                    	String jmsBrokerIp = ManagerConsts.JMS_BROKER_IP;//System.getProperty(ManagerConsts.JMS_BROKER_IP);
                    	String jmsBrokerPort = ManagerConsts.JMS_BROKER_PORT;//System.getProperty(ManagerConsts.JMS_BROKER_PORT);
                    	String jmsBrokerUrl = "failover:(tcp://" + jmsBrokerIp + ":" + jmsBrokerPort + ")?randomize=false";//failover:(tcp://localhost:61616)
                    	private AtomicInteger count = new AtomicInteger(0);
                    	long end = 0;
                    	long start = 0;
                    	
                    	public void start() throws JMSException {
                    		ActiveMQConnectionFactory connectionFactory = 
                    		new ActiveMQConnectionFactory(jmsBrokerUrl);
                    		connection = connectionFactory.createConnection(); 
                    		connection.start(); 
                    		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
                    	
                    		tasksQueue = session.createQueue(JmsConfiguration.AGENT_QUEUE_NAME);
                    		producer = session.createProducer(tasksQueue); 
                    		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                    		responseQueue = session.createQueue(JmsConfiguration.AGENT_DEST_QUEUE_NAME);
                    		consumer = session.createConsumer(responseQueue);
                    		consumer.setMessageListener(this);
                    		
                    		sinkQueue = session.createQueue(JmsConfiguration.SINK_QUEUE);
                    		sinkProducer = session.createProducer(sinkQueue); 
                    		sinkProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                    		
                    	}
                    	public void request(LogicRequest request) throws JMSException {
                    		ObjectMessage message = session.createObjectMessage(request);
                    		message.setJMSReplyTo(responseQueue);
                    		String correlationId = UUID.randomUUID().toString(); 
                    		message.setJMSCorrelationID(correlationId); 
                    		this.producer.send(message);
                    	}
                    	
                    	public void onMessage(Message message) { 
                    		try {
                    			int countCurr = count.incrementAndGet();
                    			
                    			TextMessage stam = session.createTextMessage();
                    			this.sinkProducer.send(stam);
                    
                    		} 
                    		catch (Exception e) {
                    				e.printStackTrace();
                    		}
                    	}
                    	public void stop() throws JMSException { 
                    		producer.close(); 
                    		consumer.close(); 
                    		session.close(); 
                    		connection.close();
                    	}
                    }
                    Sink:
                    Code:
                    public class Sink implements MessageListener{
                    	Connection connection = null;
                    	Session session = null;
                    //	String brokerUrl = "tcp://localhost:61616";
                    	MessageConsumer consumer = null;
                    	
                    	Destination sinkQueue = null;
                    	String jmsBrokerIp = ManagerConsts.JMS_BROKER_IP;//System.getProperty(ManagerConsts.JMS_BROKER_IP);
                    	String jmsBrokerPort = ManagerConsts.JMS_BROKER_PORT;//System.getProperty(ManagerConsts.JMS_BROKER_PORT);
                    	String jmsBrokerUrl = "failover:(tcp://" + jmsBrokerIp + ":" + jmsBrokerPort + ")?randomize=false";//failover:(tcp://localhost:61616)
                    	private AtomicInteger count = new AtomicInteger(0);
                    	long end = 0;
                    	long start = 0;
                    	
                    	public void start() throws JMSException {
                    		ActiveMQConnectionFactory connectionFactory = 
                    		new ActiveMQConnectionFactory(jmsBrokerUrl);
                    		connection = connectionFactory.createConnection(); 
                    		connection.start(); 
                    		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
                    		sinkQueue = session.createQueue(JmsConfiguration.SINK_QUEUE);
                    		consumer = session.createConsumer(sinkQueue);
                    		consumer.setMessageListener(this);
                    	}
                    
                    	public void onMessage(Message message) { 
                    		try {
                    //			message.acknowledge();
                    			int countCurr = count.incrementAndGet();
                    			
                    			if (countCurr == 1){
                    				System.out.println("started timing ...");
                    				start = System.currentTimeMillis();// ////////////////TEST
                    			}
                    
                    			else if (countCurr == (RaiseClient.COUNT_MESSAGES*RaiseClient.CONTROLLERS_COUNT)){
                    			
                    				end = System.currentTimeMillis();// ////////////////TEST
                    				printDebugTime(start, end, "TestSink:"+countCurr);
                    				stop();
                    				System.exit(0); // the run completed normally, so exit with status 0
                    			}
                    			System.out.print(countCurr+".");
                    			System.out.flush();
                    
                    		} 
                    		catch (Exception e) {
                    				e.printStackTrace();
                    		}
                    	}
                    	public void stop() throws JMSException { 
                    		consumer.close(); 
                    		session.close(); 
                    		connection.close();
                    	} 
                    		
                    }
                    Last edited by michals; Dec 6th, 2012, 03:23 AM.

                    Comment


                    • #11
                      Which is also where the difference is... When using JmsTemplate, the producer is recreated on each send (and, if you are unlucky, the session too), which takes time. Your plain solution creates a session/producer once when constructed and always reuses it. So there is a difference there.

                      Also, you are using a single connection for both receiving and sending in the Spring sample, which might have an influence.

                      To get more information on what happens inside Spring, you might want to enable TRACE logging (this will impact performance!) for the org.springframework.jms package. That way you can see what happens.
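
                      A minimal sketch of such a logging setup, assuming the test logs through log4j 1.x configured with a log4j.xml file (the thread does not say which logging framework is in use, so treat this as an assumption). The fragment would go inside the existing log4j:configuration element:

                      ```xml
                      <!-- Assumption: log4j 1.x XML configuration; adapt to the logging framework actually in use. -->
                      <logger name="org.springframework.jms">
                          <level value="trace"/>
                      </logger>
                      ```

                      It is probably best to remove it again before measuring, since the extra log output slows down every send.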

                      Another note: the CachingConnectionFactory is already a SingleConnectionFactory, so you don't need to point it to another SingleConnectionFactory.

                      Your message listener container isn't caching anything (except the connection). Using a JmsTransactionManager will lead to the consumer being cached instead of recreated. Alternatively, set the cacheLevel property to 3 to cache/reuse the consumer, or use the CachingConnectionFactory instead of the SingleConnectionFactory.
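
                      A minimal sketch of the cacheLevel option, assuming a container that points directly at the plain ActiveMQ connection factory. The bean ids (containerClient, connectionFactoryConsumer, receiveQueue, clientListener) are illustrative and may not match your file:

                      ```xml
                      <bean id="containerClient"
                            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
                          <property name="connectionFactory" ref="connectionFactoryConsumer"/>
                          <property name="destination" ref="receiveQueue"/>
                          <property name="messageListener" ref="clientListener"/>
                          <!-- Equivalent to cacheLevel=3: each listener thread keeps its Session and MessageConsumer. -->
                          <property name="cacheLevelName" value="CACHE_CONSUMER"/>
                      </bean>
                      ```

                      Using cacheLevelName with the constant's name instead of the number 3 is only a readability choice; both select the same cache level.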

                      Comment


                      • #12
                        Thanks for the reply.
                        I have a few questions:
                        1. If I understand this correctly, since I am using cachingConnectionFactory for jmsTemplate, it reuses the session/producer.
                        Doesn't that mean performance should improve, since creating the session and producer each time is costly?

                        2. As for reusing the connection for sending and receiving, do you mean that I should separate this and use different connections for the consumer and the producer? Even if the consumer and producer are in the same process? (I have one consumer and one producer per process; they don't communicate within the same process.)

                        3. You say that the MessageListener does not cache the consumers, but I defined concurrentConsumers=20. Doesn't that mean that 20 are kept and reused?

                        Thanks

                        Comment


                        • #13
                          1. In theory it should reuse them, but a key is generated to determine whether to reuse or not (hence my suggestion to turn on TRACE logging to see if this is actually happening).

                          2. In your plain sample you also have different connections, so to create a consistent sample you should do the same for the Spring one (or change the plain one to use a single connection).

                          3. No... It means that there are 20 threads consuming messages; it doesn't mean that it reuses MessageConsumers (in general, after a poll they are closed!).

                          Edit: Another difference is that Spring uses PERSISTENT by default to send messages, while you use NON_PERSISTENT. Switching that made a huge difference (at least for the plain client) when sending messages.
                          Last edited by Marten Deinum; Dec 6th, 2012, 05:41 AM.

                          Comment


                          • #14
                            Thanks for the great reply.
                            I tested what you suggested:
                            1. Separating the connections between producer and consumer increased performance greatly (300k in each of 2 directions, ~600k messages, took 45 seconds instead of about 75 seconds).
                            2. Switching the consumer from the single to the caching connection factory and setting cacheLevel to 3 had no impact on performance. I tried the transaction manager, but for some reason
                            my computer got stuck every time I ran it and I had to restart the machine. I am not sure how this relates; maybe many connections are created.
                            3. I updated the deliveryMode in jmsTemplate to non-persistent, but I didn't see any impact on performance, which is weird.

                            This is the new spring xml file:
                            Code:
                            <!-- ********************** Consumer ********************************* --> 
                            	<bean id="clientListener" class="example.ClientReceive">
                            		<property name="receiveQueue" ref="receiveQueue" />
                            		<property name="sinkQueue" ref="sinkQueue" />
                            		<property name="clientSender" ref="clientSender"/> 
                            	</bean>
                            
                            	<bean id="containerClient"
                            		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
                            		<property name="concurrentConsumers" value="20"/><!-- if it is not 1 - it is difficult to see distribution to other services!!!change to 50,20 -->
                                        <property name="maxConcurrentConsumers" value="50"/><!-- if it is not 1 - it is difficult to see distribution to other services!!!change to 50-->
                                        <property name="idleTaskExecutionLimit" value="10"/>
                            		 <property name="connectionFactory" ref="cachingConnectionFactoryConsumer" />
                            		<property name="destination" ref="receiveQueue" />
                            		<property name="messageListener" ref="clientListener" />
                            		<property name="cacheLevel" value="3"/> <!-- CACHE_CONSUMER: cache a shared JMS Connection, a JMS Session, and a JMS MessageConsumer for each listener thread. -->
                            		<property name="sessionAcknowledgeMode" value="1"/>
                            	</bean>
                            	
                            	<bean id="receiveQueue" class="org.apache.activemq.command.ActiveMQQueue">
                                    <constructor-arg value="queueA"/>
                                </bean>
                                <bean id="sinkQueue" class="org.apache.activemq.command.ActiveMQQueue">
                                    <constructor-arg value="sinkQueue"/>
                                </bean>
                                
                                <bean id="connectionFactoryConsumer" 
                            		class="org.apache.activemq.ActiveMQConnectionFactory">
                            		 <property name="brokerURL" value="failover:(tcp://localhost:61616)?randomize=false" />    <!-- Single broker-->
                            	</bean>
                            	
                                <bean id="singleConnectionFactoryConsumer" class="org.springframework.jms.connection.SingleConnectionFactory">
                                	 <property name="targetConnectionFactory"
                                		ref="connectionFactoryConsumer" />
                                		<property name="reconnectOnException" value="true"/>
                                </bean>
                                
                                 <bean id="cachingConnectionFactoryConsumer"  
                            		class="org.springframework.jms.connection.CachingConnectionFactory">
                            		<!-- <constructor-arg ref="singleConnectionFactoryConsumer"/> -->
                            		<constructor-arg ref="connectionFactoryConsumer"/>
                            		<property name="sessionCacheSize" value="100" /> 
                            	</bean>  
                            	<!--<bean id="jmsTransactionManagerConsumer" class="org.springframework.jms.connection.JmsTransactionManager">
                            		<property name="connectionFactory" ref="connectionFactoryConsumer" />
                            	</bean>-->
                                <!-- **********************  END-Consumer *****************************	-->
                                
                            	
                            	<!-- ********************** Producer ********************************* --> 
                            	<bean id="clientSender" class="example.ClientSend">
                            		 <property name="sendQueue" ref="sendQueue" /> 
                            		 <property name="jmsTemplate" ref="jmsTemplate"/>  
                            	</bean>
                            	
                            	 <bean id="sendQueue" class="org.apache.activemq.command.ActiveMQQueue">
                                    <constructor-arg value="queueB"/>
                                </bean> 
                                
                                 <!-- for send response on temp q-->
                                <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" lazy-init="true">
                                     <property name="connectionFactory" ref="cachingConnectionFactoryProducer"/>
                                     <property name="deliveryMode" value="1"/>  
                                 </bean>
                            	
                            	  <bean id="connectionFactoryProducer" 
                            		class="org.apache.activemq.ActiveMQConnectionFactory">
                            		 <property name="brokerURL" value="failover:(tcp://localhost:61616)?randomize=false" />    <!-- Single broker-->
                            	</bean>
                            	
                            	<bean id="cachingConnectionFactoryProducer"  
                            		class="org.springframework.jms.connection.CachingConnectionFactory">
                            		<constructor-arg ref="connectionFactoryProducer"/>
                            		<property name="sessionCacheSize" value="100" /> 
                            	</bean>   
                            	
                            	<!-- ********************** END Producer ********************************* -->

                            Comment


                            • #15
                              Did more or less the same.

                              Sending 5K messages with a plain producer takes ~1 second less than with JmsTemplate. This is without connection sharing between the listener container and the template. When I enable that, JmsTemplate takes 3 times as long as the plain version, but that is probably due to the multithreading and contention for the pool/caching mechanism.

                              When using the CachingConnectionFactory you don't need to additionally set the cache level; caching is now handled by the connection factory and its related classes, so this doesn't impact performance (at least not noticeably, I suspect).

                              Also tried playing around with (NON_)PERSISTENT, but that appears to change nothing for the JmsTemplate; it has a great impact on the plain client when setting it to PERSISTENT.

                              Edit: In addition to setting the delivery mode, you also need to enable explicitQosEnabled on the JmsTemplate (forgot about that).
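
                              A minimal sketch of that setting, assuming the jmsTemplate bean and the cachingConnectionFactoryProducer ref from the XML in post #14. Without explicitQosEnabled, JmsTemplate does not apply its deliveryMode, priority and timeToLive properties when sending, so the producer's default delivery mode (PERSISTENT in JMS) is used:

                              ```xml
                              <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" lazy-init="true">
                                  <property name="connectionFactory" ref="cachingConnectionFactoryProducer"/>
                                  <!-- Needed for deliveryMode, priority and timeToLive to be applied on send. -->
                                  <property name="explicitQosEnabled" value="true"/>
                                  <!-- 1 = javax.jms.DeliveryMode.NON_PERSISTENT -->
                                  <property name="deliveryMode" value="1"/>
                              </bean>
                              ```

                              With this in place, the Spring test and the plain test should both send NON_PERSISTENT messages, which makes the comparison fairer.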
                              Last edited by Marten Deinum; Dec 6th, 2012, 06:43 AM.

                              Comment
