Announcement Announcement Module
Collapse
No announcement yet.
AMQP init not working against RabbitMQ server behind amazon load balancer Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • AMQP init not working against RabbitMQ server behind amazon load balancer

    Hi,

    I'm successfully running a Spring 3.2 MVC APP with Spring AMQP version 1.1.3.RELEASE to a RabbitMQ Server (AWS EC2 Ubuntu) version 3.0.2 with SSL to port 5671. Queues, exchanges, etc... created fine and listener subscribed to topic queues come up okay and everything works.

    However, when I simply put the RabbitMQ Server behind an Amazon Elastic Load Balancer and point my Spring AMQP client to the load balancer, I see no errors during initialization, but then see the following errors repeat every 20 seconds on the listeners in the Tomcat log:

    19:33:23.469 [threadPoolTaskExecutorPrivate-10] WARN o.s.a.r.l.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpIOException: java.io.IOException
    19:33:33.483 [threadPoolTaskExecutorWellKnown-10] WARN o.s.a.r.l.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpIOException: java.io.IOException


    Note that the Queues, Exchanges are not created on the RabbitMQ, so the init is failing, but I see no error indications in the Tomcat Log until the listeners try to connect.

    Here's my AMQP code:

    Code:
        @Bean
        public ConnectionFactory connectionFactory() {
    	final com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
    	connectionFactory.setUsername(this.queueServerUsername);
    	connectionFactory.setPassword(this.queueServerPassword);
    	connectionFactory.setPort(this.queueServerPort);
    	connectionFactory.setHost(this.queueServerAddr);
    	connectionFactory.setRequestedHeartbeat(30);
    
    	if (this.sslOn.equals("true")) {
    	    connectionFactory.setPort(this.queueServerSslPort);
    	    try {
    		final KeyStore tks = KeyStore.getInstance("JKS");
    		final InputStream certsInputStream = this.getClass().getResourceAsStream(this.keyStore);
    		tks.load(certsInputStream, this.keyStorePassword.toCharArray());
    
    		// Set up key manager factory to use our key store final
    		final KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
    		kmf.init(tks, this.keyStorePassword.toCharArray());
    		final TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
    		tmf.init(tks);
    		final SSLContext c = SSLContext.getInstance("SSLv3");
    		c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
    		connectionFactory.useSslProtocol(c);
    	    } catch (final Exception e) {
    		e.printStackTrace();
    	    }
    
    	}
    	final CachingConnectionFactory ccf = new CachingConnectionFactory(connectionFactory);
    	return ccf;
        }
    
        @Bean
        public AmqpAdmin amqpAdmin() {
    	return new RabbitAdmin(this.connectionFactory());
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate() {
    	final RabbitTemplate template = new RabbitTemplate(this.connectionFactory());
    	template.setRoutingKey(RabbitMqConfiguration.this.wellKnownQueue);
    	template.setQueue(RabbitMqConfiguration.this.wellKnownQueue);
    	return template;
        }
    
        @Bean
        public TopicExchange applicationWellKnownExchange() {
    	return new TopicExchange(RabbitMqConfiguration.this.wellKnownQueue);
        }
    
        @Bean
        public TopicExchange applicationPrivateExchange() {
    	return new TopicExchange(RabbitMqConfiguration.this.PrivateQueueName);
        }
    
        @Bean
        public Queue wellKnownQueue() {
    	return new Queue(RabbitMqConfiguration.this.wellKnownQueue);
        }
    
        @Bean
        public Queue privateQueue() {
    	return new Queue(RabbitMqConfiguration.this.PrivateQueueName, true, false, true);
        }
    
        @Bean
        public MessageConverter jsonMessageConverter() {
    	return new JsonMessageConverter();
        }
    
        @Bean
        public SimpleMessageListenerContainer messageListenerContainerWellKnown() {
    	try {
    	    final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactory());
    	    container.setQueueNames(RabbitMqConfiguration.this.wellKnownQueue);
    	    container.setMessageListener(this.messageListenerWellKnownAdapter());
    	    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    	    container.setTaskExecutor(this.threadPoolTaskExecutorWellKnown());
    	    return container;
    	} catch (final AmqpIOException e) {
    	    e.printStackTrace();
    	} catch (final Exception e) {
    	    e.printStackTrace();
    	}
    	return null;
        }
    
        @Bean
        public MessageListenerAdapter messageListenerWellKnownAdapter() {
    	final MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(this.applicationClientRestController(), this.jsonMessageConverter());
    	messageListenerAdapter.setDefaultListenerMethod("handleWellKnownQueueMessage");
    	return messageListenerAdapter;
        }
    
        @Bean
        public ThreadPoolTaskExecutor threadPoolTaskExecutorWellKnown() {
    	final ThreadPoolTaskExecutor threadPoolTaskExecutorWellKnown = new ThreadPoolTaskExecutor();
    	threadPoolTaskExecutorWellKnown.setCorePoolSize(this.queueWellknownCorePoolSize);
    	threadPoolTaskExecutorWellKnown.setMaxPoolSize(this.queueWellknownMaxPoolSize);
    	return threadPoolTaskExecutorWellKnown;
        }
    
        @Bean
        public SimpleMessageListenerContainer messageListenerContainerPrivate() {
    	final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactory());
    	container.setQueueNames(RabbitMqConfiguration.this.PrivateQueueName);
    	container.setMessageListener(this.messageListenerPrivateAdapter());
    	container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    	container.setTaskExecutor(this.threadPoolTaskExecutorPrivate());
    	return container;
        }
    
        @Bean
        public MessageListenerAdapter messageListenerPrivateAdapter() {
    	final MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(this.applicationClientRestController(), this.jsonMessageConverter());
    	messageListenerAdapter.setDefaultListenerMethod("handlePrivateQueueMessage");
    	return messageListenerAdapter;
        }
    
        @Bean
        public ThreadPoolTaskExecutor threadPoolTaskExecutorPrivate() {
    	final ThreadPoolTaskExecutor threadPoolTaskExecutorPrivate = new ThreadPoolTaskExecutor();
    	threadPoolTaskExecutorPrivate.setCorePoolSize(this.queuePrivateCorePoolSize);
    	threadPoolTaskExecutorPrivate.setMaxPoolSize(this.queuePrivateMaxPoolSize);
    	return threadPoolTaskExecutorPrivate;
        }
    
        @Bean
        public ApplicationClientRestController applicationClientRestController() {
    	return new ApplicationClientRestController();
        }
    
        @Bean
        public Binding rabbitMqBinding() {
    	return BindingBuilder.bind(this.wellKnownQueue()).to(this.applicationWellKnownExchange()).with(RabbitMqConfiguration.this.wellKnownQueue);
        }
    
        @Bean
        public Binding rabbitMqBinding2() {
    	return BindingBuilder.bind(this.privateQueue()).to(this.applicationPrivateExchange()).with(RabbitMqConfiguration.this.PrivateQueueName);
        }
    Any ideas why this works fine directly to the RabbitMQ Server but not to a load balancer (same port as the RabbitMQ Server) that has the RabbitMQ Server behind it?

    Thanks,
    Brian
    Last edited by bhmass; Feb 28th, 2013, 01:46 PM.

  • #2
    No ideas, but it's hard to debug stuff like this without "being there".

    One thing that will help is turning on DEBUG logging for category

    Code:
    org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
    Then, you'll get a full stack trace for the
    org.springframework.amqp.AmqpIOException: java.io.IOException
    in the
    Consumer raised exception
    logs, instead of just the
    Exception summary:

    Comment


    • #3
      Thanks Gary,

      I turned on the debugging and see the details of the listener errors. I won't print them all here but it's a SocketTimeoutException, which I expect since the initialization code never created the queues (I can go to the RabbitMQ Admin console and see no queues, exchanges, etc... not created).

      I guess I would expect to see some errors if the connectionFactory fails to connect and create the queues, exchanges, but I see nothing.

      A simpler question to ask is: Has anyone successfully run Spring AMQP against an AWS Elastic load balancer where the RabbitMQ servers are clustered and behind the load balancer on EC2s? If so, are there any additional Spring AMQP initialization parameters to set? Again, running directly to the RabbitMQ servers works fine, so wondering if Spring AMQP has issue of connecting to RabbitMQ servers indirectly (load balancer is middle-man).

      A colleague has it running okay this way without using Spring AMQP (using rabbitMQ libs directly). That may be my next step.

      Thanks,
      Brian

      Comment


      • #4
        I doubt it has anything to do with the queues not being created; I suspect that's because the RabbitAdmin is having the same problem as the listeners (they all - including the admin - use the same connection). The listeners won't time out if the queue doesn't exist, they will fail to start altogether (after a number of attempts at passive declaration).

        I seriously doubt using the client directly will make any difference in this case - Spring AMQP uses the client under the covers.

        I don't know the answer to your "Has anyone...?".

        I am not personally familiar with that environment, but I would suspect the configuration of the load balancers.

        Comment


        • #5
          Thanks again Gary,

          I have figured it out. Stupid mistake on my part in configuring the AWS Elastic Load Balancer. When configuring listeners for SSL on port 5671, use SSL (Secure TCP), not HTTPS (Secure HTTP) as I had done initially.

          I am happy to report that Spring AMQP works great talking to clustered RabbitMQ servers running on AWS EC2s running behind an AWS Elastic Load Balancer securely (SSL). Simply point your client to the load balancer (and configure the load balancer listener ports properly, DOH!!!).

          Thanks,
          Brian

          Comment

          Working...
          X