Announcement Announcement Module
Collapse
No announcement yet.
RabbitMQ multiple consumers using HttpClient not keeping state Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • RabbitMQ multiple consumers using HttpClient not keeping state

    Hello,

    I think I have got myself into a muddle and would appreciate being set straight.

    I have a message consumer with a pojo as the handler. I have a SimpleMessageListenerContainer which I extend to start 5 consumers. (When I used 1 consumer, the messages where being processed sequentially which was not acceptable)

    MessageConsumer:
    Code:
    public void setContainer(MessageConsumerSimpleMessageListenerContainer container) {
    	this.container = container;
    	this.container.setConnectionFactory(getFactory());
    	this.container.setQueueNames(getQueueName());
    	this.container.setConcurrentConsumers(this.numOfConsumer);
    	this.container.setMessageListener(new MessageListenerAdapter(getHandler()));
    }
    Code:
    public void startConsumers() {
    	try {
    		getContainer().startConsumers();
    	} catch (Exception e) {
    		e.printStackTrace();
    	}
    }
    With it just consuming the message - no issues. However the handler has to make a couple of http calls for each message that comes in (authentication, POST, GET, GET). Say I have 20 messages come in that should ultimately produce 20 reports (this is for Jasper (though I not displaying the reports - just running and getting file back)) I loose a couple and work that should occur (update a mongo record when report generated) is haphazard.

    So I was getting the apache Invalid use of SingleClientConnManager: connection still allocated error when I switched to 5 consumers. Read up on it and implemented the following:

    Code:
    <bean id="httpClient" class="org.apache.http.impl.client.DefaultHttpClient">
    	<constructor-arg index="0">
    		<bean class="org.apache.http.impl.conn.PoolingClientConnectionManager">
    			<property name="maxTotal" value="100" />
    		</bean>
    	</constructor-arg>
    </bean>
    and
    Code:
    <bean id="reportConsumerHandler" class="com.xxx.report.jasper.ReportConsumerHandler">
    		<constructor-arg ref="httpClient" />
    		<property name="reportDao" ref="reportDocumentDaoImpl" />
    		<property name="fileDao" ref="reportFileDaoImpl" />
    		<property name="reportEmailProducer" ref="reportEmailProducer" />
    		<property name="messageGenerator" ref="messageGenerator" />
    </bean>
    <bean id="messageGenerator" class="com.k12foodservice.report.MessageGenerator">
    		<constructor-arg ref="k12gsonBuilder"/>
    </bean>
    	
    <bean id="k12gsonBuilder" class="com.google.gson.GsonBuilder">
    		<property name="dateFormat" value="yyyy-MM-dd" />
    </bean>
    The Google stuff was for an error message being generated when I switched to PoolingClientConnectionManager. As I understand this I am passing in a single httpClient instance to my consumer handler and ensuring state won't be lost.

    My handler constructor:
    Code:
    ReportConsumerHandler(DefaultHttpClient httpClient) {
    	this.httpClient = httpClient;
    	HttpParams params = this.httpClient.getParams();
    	HttpConnectionParams.setConnectionTimeout(params, 600000);
    	HttpConnectionParams.setSoTimeout(params, 600000);
    	this.cookieStore = new BasicCookieStore();
    	this.httpContext = new BasicHttpContext();
    	httpContext.setAttribute(ClientContext.COOKIE_STORE, cookieStore);
    }
    and the handleMessage:

    Code:
    public void handleMessage(byte[] data) {
    ....
    try {
    ....
    } finally {
    	try {
    		if (httpRes.getEntity().getContent().available() != 0) {
    			httpRes.getEntity().getContent();
    		}
    	} catch (IllegalStateException e) {
    				logger.error("handleMessage() IllegalStateException Exception error");
    	} catch (IOException e) {
    			logger.error("handleMessage() IOException Exception error");
    	}
    	httpClient.getConnectionManager().closeExpiredConnections();		
    }
    The http Request code:
    Code:
    public HttpResponse sendRequest(HttpRequestBase req, String service, List<NameValuePair> qparams) throws Exception {
    	URI uri = createURI(service, qparams, false);
    	req.setURI(uri);
    	logger.info("sending Request. url: " + uri.toString() + " req verb: " + req.getMethod());
    	httpRes = httpClient.execute(req, httpContext);
    	logger.info(id + ". response status line: " + httpRes.getStatusLine());
    	return httpRes;
    }
    Tomcat7 logs shows running asynchronously but sending 20 reports that individually all run without issue, don't when sent all together (20 individual messages).

    Not sure how to fix... Please advise and thanks for looking.
Working...
X