
  • Message properties contains java.io.DataInputStream

    RabbitMQ version 3
    org.springframework.amqp 1.0.0.RELEASE


    Hi, I am new to Spring and AMQP and I have an error in a project.

    First, here is the code that puts the value into org.springframework.amqp.core.MessageProperties:

    Code:
     public final void sendClientUpdates(final String queueName, final String textMessage,
                                            final Object updateIds, final String replyUrl) {
            MessageProperties messageProperties = new MessageProperties();
            ByteArrayOutputStream output = new ByteArrayOutputStream();
            XMLEncoder encoder = new XMLEncoder(output);
            encoder.writeObject(updateIds);
            try {
                encoder.close();
                output.close();
            } catch (IOException ex) {
                logger.warn("", ex);
            }
            messageProperties.setHeader(UPDATE_ID_PROPERTY, output.toString());

    When reading the message from the queue

    Code:
    public void onMessage(final Message message) {
            Map<String, Object> headers = message.getMessageProperties().getHeaders();
            XMLDecoder decoder = new XMLDecoder(
                        new ByteArrayInputStream(((String) headers.get(UPDATE_ID_PROPERTY)).getBytes()));
    I get a
    java.lang.ClassCastException: java.io.DataInputStream cannot be cast to java.lang.String

    Suddenly UPDATE_ID_PROPERTY is a java.io.DataInputStream.

    This does not happen all the time... but very often.

    The UPDATE_ID_PROPERTY would normally look like this

    UPDATE_ID_PROPERTY : <?xml version="1.0" encoding="UTF-8"?>
    <java version="1.7.0_13" class="java.beans.XMLDecoder">
    <object class="java.util.ArrayList">
    <void method="add">
    <long>162</long>
    </void>
    </object>
    </java>

  • #2
    Please provide a full stack trace for the exception as well as a DEBUG log that includes the message delivery (and exception) if possible.

    Also, spring-amqp 1.0.0 is very old; please update to 1.1.4.RELEASE.



    • #3
      The error is in our live environment, but so far I haven't been able to reproduce it in a test environment.
      Here is the full stack trace:

      [ERROR] 2013-03-08 16:10:36,972 [com.farheap.jsi.mdp.MessageListenerJSI3] - Unknown error while sending updates from message (Body:'[B@54e0f629(byte[12156])'; ID:null; Content:application/octet-stream; Headers:{ENDPOINT_URL=https://xxxxx/php/classes/JSI3Receiver.class.php, UPDATE_ID=java.io.DataInputStream@7d412947}; Exchange:; RoutingKey:jsi.to.external; Reply:null; DeliveryMode:PERSISTENT; DeliveryTag:118)
      java.lang.ClassCastException: java.io.DataInputStream cannot be cast to java.lang.String
          at com.farheap.jsi.mdp.MessageListenerJSI3.onMessage(MessageListenerJSI3.java:74) ~[jsi-webservice-3.jar:na]
          at com.farheap.jsi.mdp.MessageListenerJSI3$$FastClassByCGLIB$$91448113.invoke(<generated>) [cglib-nodep-2.1_3.jar:na]
          at net.sf.cglib.proxy.MethodProxy.invoke(MethodProxy.java:149) [cglib-nodep-2.1_3.jar:na]
          at org.springframework.aop.framework.Cglib2AopProxy$CglibMethodInvocation.invokeJoinpoint(Cglib2AopProxy.java:688) [spring-aop-3.0.5.RELEASE.jar:3.0.5.RELEASE]
          at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150) [spring-aop-3.0.5.RELEASE.jar:3.0.5.RELEASE]
          at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:110) [spring-tx-3.0.5.RELEASE.jar:3.0.5.RELEASE]
          at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:161) [spring-aop-3.0.5.RELEASE.jar:3.0.5.RELEASE]
          at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:110) [spring-tx-3.0.5.RELEASE.jar:3.0.5.RELEASE]
          at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:161) [spring-aop-3.0.5.RELEASE.jar:3.0.5.RELEASE]
          at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:89) [spring-aop-3.0.5.RELEASE.jar:3.0.5.RELEASE]
          at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) [spring-aop-3.0.5.RELEASE.jar:3.0.5.RELEASE]
          at org.springframework.aop.framework.Cglib2AopProxy$DynamicAdvisedInterceptor.intercept(Cglib2AopProxy.java:621) [spring-aop-3.0.5.RELEASE.jar:3.0.5.RELEASE]
          at com.farheap.jsi.mdp.MessageListenerJSI3$$EnhancerByCGLIB$$63a53782.onMessage(<generated>) [cglib-nodep-2.1_3.jar:na]
          at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:328) [spring-rabbit-1.0.0.RELEASE.jar:na]
          at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:506) [spring-rabbit-1.0.0.RELEASE.jar:na]
          at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:470) [spring-rabbit-1.0.0.RELEASE.jar:na]
          at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:56) [spring-rabbit-1.0.0.RELEASE.jar:na]
          at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:103) [spring-rabbit-1.0.0.RELEASE.jar:na]
          at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:560) [spring-rabbit-1.0.0.RELEASE.jar:na]
          at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:452) [spring-rabbit-1.0.0.RELEASE.jar:na]
          at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:436) [spring-rabbit-1.0.0.RELEASE.jar:na]
          at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:420) [spring-rabbit-1.0.0.RELEASE.jar:na]
          at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$200(SimpleMessageListenerContainer.java:56) [spring-rabbit-1.0.0.RELEASE.jar:na]
          at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:505) [spring-rabbit-1.0.0.RELEASE.jar:na]
          at java.lang.Thread.run(Thread.java:662) [na:1.6.0_26]



      • #4
        After some investigation, I see the MessagePropertiesConverter has a threshold for the size of the property...

        Code:
        /**
         * Converts a LongString value to either a String or DataInputStream based on a length-driven threshold. If the
         * length is 1024 bytes or less, a String will be returned, otherwise a DataInputStream is returned.
         */
        private Object convertLongString(LongString longString, String charset) {
        	try {
        		if (longString.length() <= 1024) {
        			return new String(longString.getBytes(), charset);
        		} else {
        			return longString.getStream();
        		}
        	} catch (Exception e) {
        		throw RabbitUtils.convertRabbitAccessException(e);
        	}
        }
        So, a quick fix would be to detect the type in your listener and, when the header is a DataInputStream, read its contents from the stream.

        The threshold (1024) perhaps should be configurable; if you feel strongly please open up an 'Improvement' JIRA issue.

        We should at least document this in the reference manual.
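
The type check suggested above can be sketched with plain JDK classes. This is a minimal sketch, not the library's own API: the class name `HeaderValues` and the method `asString` are hypothetical, and the UTF-8 charset is an assumption about how the header was encoded. Note that a stream can be read only once, so the converted value should be kept rather than read from the header map again.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: normalizes a header value that may arrive either as a
// String (short values) or as a DataInputStream (values over the threshold).
public class HeaderValues {

    public static String asString(Object header, Charset charset) {
        if (header == null) {
            return null;
        }
        if (header instanceof String) {
            return (String) header;
        }
        if (header instanceof InputStream) {
            // DataInputStream is an InputStream, so it is handled here.
            InputStream in = (InputStream) header;
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            try {
                int read;
                while ((read = in.read(buffer)) != -1) {
                    out.write(buffer, 0, read);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return new String(out.toByteArray(), charset);
        }
        throw new IllegalArgumentException(
                "Unexpected header type: " + header.getClass().getName());
    }

    public static void main(String[] args) {
        // Build a value longer than 1024 bytes to stand in for a long header.
        StringBuilder big = new StringBuilder();
        for (int i = 0; i < 500; i++) {
            big.append(i).append(';');
        }
        byte[] bytes = big.toString().getBytes(StandardCharsets.UTF_8);

        Object shortHeader = "162;163;";
        Object longHeader = new DataInputStream(new ByteArrayInputStream(bytes));

        System.out.println(asString(shortHeader, StandardCharsets.UTF_8));
        System.out.println(asString(longHeader, StandardCharsets.UTF_8).length());
    }
}
```

In a listener, the result of `asString(headers.get(UPDATE_ID_PROPERTY), ...)` would replace the direct `(String)` cast that raised the ClassCastException.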
        Last edited by Gary Russell; Mar 9th, 2013, 06:13 PM.



        • #5
          Updating to 1.1.4 didn't help.

          I fixed the problem by doing this :

          MessageProperties messageProperties = new MessageProperties();
          StringBuilder builder = new StringBuilder();
          for (Iterator<Long> iterator = updateIds.iterator(); iterator.hasNext();) {
              Long long1 = (Long) iterator.next();
              builder.append(long1).append(";");
          }
          messageProperties.setHeader(MessageListenerJSI3.UPDATE_ID_PROPERTY, builder.toString());
          and

          Map<String, Object> headers = message.getMessageProperties().getHeaders();

          try {
              String sEndpointUrl = (String) headers.get(ENDPOINT_URL_PROPERTY);
              updateIds = new ArrayList<Long>();
              String idsStr = (String) headers.get(UPDATE_ID_PROPERTY);
              StringTokenizer stringTokenizer = new StringTokenizer(idsStr, ";");
              while (stringTokenizer.hasMoreTokens()) {
                  String nextElement = (String) stringTokenizer.nextElement();
                  updateIds.add(Long.parseLong(nextElement));
              }



          • #6
            Oh I see Gary.. that's the reason..

            So my code fix will actually not really fix the problem. It could still happen.
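
To see why the shorter encoding only postpones the problem, here is a rough sketch that measures both encodings against the 1024 threshold from the converter code quoted in #4. The class name `HeaderSizeDemo` and the sample ids (three-digit values starting at 100) are made up for illustration; real ids will give different lengths.

```java
import java.beans.XMLEncoder;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo: compares header sizes for the two encodings in this thread.
public class HeaderSizeDemo {

    // Threshold taken from convertLongString as quoted in #4.
    public static final int THRESHOLD = 1024;

    // Sample ids 100, 101, ... (all three digits while count <= 900).
    public static List<Long> ids(int count) {
        List<Long> ids = new ArrayList<Long>();
        for (int i = 0; i < count; i++) {
            ids.add(100L + i);
        }
        return ids;
    }

    // The semicolon-delimited encoding from #5.
    public static String delimited(List<Long> ids) {
        StringBuilder builder = new StringBuilder();
        for (Long id : ids) {
            builder.append(id).append(';');
        }
        return builder.toString();
    }

    // Size in bytes of the XMLEncoder output used in the original post.
    public static int xmlEncodedLength(List<Long> ids) {
        ByteArrayOutputStream output = new ByteArrayOutputStream();
        XMLEncoder encoder = new XMLEncoder(output);
        encoder.writeObject(new ArrayList<Long>(ids));
        encoder.close();
        return output.size();
    }

    public static void main(String[] args) {
        int[] counts = {1, 10, 100, 256, 257};
        for (int count : counts) {
            List<Long> sample = ids(count);
            int delimitedLength = delimited(sample).length();
            int xmlLength = xmlEncodedLength(sample);
            System.out.println(count + " ids: delimited=" + delimitedLength
                    + " (over threshold: " + (delimitedLength > THRESHOLD) + ")"
                    + ", xml=" + xmlLength
                    + " (over threshold: " + (xmlLength > THRESHOLD) + ")");
        }
    }
}
```

With three-digit ids each entry costs four characters in the delimited form, so 256 ids is exactly 1024 characters and the 257th pushes the header past the threshold. The XML form crosses it with far fewer ids, which fits the error showing up often but not always.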

            I made this change to handle DataInputStream

            Map<String, Object> headers = message.getMessageProperties().getHeaders();
            String idsStr = "";
            try {
                String sEndpointUrl = (String) headers.get(ENDPOINT_URL_PROPERTY);
                updateIds = new ArrayList<Long>();
                Object object = headers.get(UPDATE_ID_PROPERTY);
                if (object instanceof DataInputStream) {
                    DataInputStream dataInputStream = (DataInputStream) object;
                    idsStr = IOUtils.toString(dataInputStream);
                } else {
                    idsStr = (String) headers.get(UPDATE_ID_PROPERTY);
                }
            Last edited by klind; Mar 9th, 2013, 07:13 PM.
