Announcement Announcement Module
Collapse
No announcement yet.
performance issue with amqp adapters,aggregator,spring integration Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • performance issue with amqp adapters,aggregator,spring integration

    Hi,

    I have task, where i need to come up with an API for making aggregated service calls. i.e my clients will need an api, where they will call multiple services concurrently, Basically they will let me know the list of services they want to call and provide the argument for each service, I will need to make concurrent calls to each of the service and provide an aggregated result to them.

    Basically my clients makes a blocking call to my api, my aggregated service handler, makes concurrent calls and returns the consolidated result.

    As part of my solution, I am using scatter gather pattern of EAI.

    I create a message out of the list of arguments and use a splitter down stream to split this into multiple messages, and use an amp outbound gateway to place the messages to appropriate rabbit queues ( exchange,binding key are discovered with help of service name from a service registry).

    I have a dedicated AMQP queue where all the reply messages will be sent. I use an AMQP inbound adapter to listen to this queue, and use a aggregator aggregate the message, and then use my custom router ( extended from header value router, uses its own channel resolver instead of the app context file). and route the message back to the message handler from where the message originated.

    With this setup, i am able to achieve my requirement, but, i did some performance bench marking and found that my implementation is not able to scale very well.

    My environment

    linux :
    processor :12 cores (24 with hyper threading).



    When I have 1 client firing 10,000 requests of various group consisting of ( 1,2,5,10,15,20,30,50 and 45)requests. Each request is of size 400bytes.
    I see the system is able to handle groups upto size 45 well , but the performance slows down significantly for a group of size 50.

    I have 10 server threads listening the the request queue to serve these requests.

    I see that the avg response time for 10,000 requests increases gradually with increase in group size unto 45. with group size of 50, i am not able to handle 10000 request .I am getting java.lang.OutOfMemoryError: GC overhead limit exceeded exception.

    Repeated the test for 4k payload,and found very similar results.

    repeated the test with 10 clients instead of 1 and found that with10 clients firing 1000 requests each , i was not able to handle a group of size 30 ( runs well for group size =20).


    Please find attached the excel for various values of clients, servers threads, listeners threads,group size and payload sizes.


    I also found that my resident memory (not heap) was huge and grew unto 6.1 gb before throwing the outofMemory error.
    I am not sure if there is a memory leak some where.


    I also saw NIO in one of the stack traces, Does amp inbound/outbound adapters uses nio? and could the huge memory footprint (6.1g) be because of the NIOs ?

    Answer to any of these questions will be of much help :

    -Please let me know if there is some thing fundamentally incorrect with my design?
    - Any known performance issues with aggregator ,amp inbound/outbound adapters, rabbit java client api
    - Any alternate design which i can consider.

  • #2
    Which MessageStore implementation are you using?

    Comment


    • #3
      I am using the default message store of the aggregator which means it is using SimpleMessageStore of spring integration.

      Comment


      • #4
        The default message group store stores the messages in memory. You can look at using some alternate message group store like say the JDBC message group store. You can find the details here

        Comment


        • #5
          Thanks Amol, i will try this out. But what i also noticed is, my heap groups very large. I have lot of full gc happening ( full gc's run up 2 to 5 secs). And after a point in time, all my threads are in blocked state. Any idea what the issue could be?

          Comment


          • #6
            That would be difficult to conclude here. Chances are bright it is due to the default MessageGroupStore implementation you are using. That's the reason i suggested you use a different MessageGroupStore and then check if you still face the heap issue.
            Spring integration has a couple of key value pair MessageGroupStore implementation, a JDBC and Mongo DB implementation. You can use any of those or implement your own (which shouldn't store in memory, at least the heap memory )
            Try this out and let us know your test results.

            Comment


            • #7
              Thanks Amol, The im memory (simple message store) looks like the culprit, I used jprofiler to profile the heap dump and AggregatingMessageHandler to be the biggest object using around 96% of memory.

              I am not using a jdbc message store with embedded derby db.

              But i am having new set of problems now.

              Looks like amqp messages cannot persisted in message stores because of com.rabbitmq.client.impl.LongStringHelper.ByteArra yLongString not being serializable object.
              I am getting the
              org.springframework.core.serializer.support.Serial izationFailedException: Failed to serialize object using DefaultSerializer; nested exception is java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArra yLongString
              exception while adding message to jdbc store.
              Find attached complete stack trace.

              Looks like ampq-inbound-adapter converts Strings inside an ArrayList to .LongStringHelper$ByteArrayLongString .

              In my case the sequencedetails header seems to have the corrlation id as an LongStringHelper$ByteArrayLongString object(don know why!).
              Since this is not a serializable object, jdbcmessage store is not accepting the Message and throwing SerializationFailedException exception.

              Itried adding my own header mapper and modified the sequeuncedetails header to contain a string object instead of LongStringHelper$ByteArrayLongString, but this as well did not help.

              I am getting the same exception inspite of that.

              I am using spring-integration-2.1.0M2, which uses amqp-client-2.5.0.jar and spring-amqp-1.0.0.RELEASE.jar internally.

              Refered : https://jira.springsource.org/browse/AMQP-170 not sure if this is related.

              Regards,
              Madhu

              Comment


              • #8
                Here is the stack trace

                ERROR [org.springframework.integration.handler.LoggingHan dler] - <org.springframework.integration.MessageHandlingEx ception: error occurred in message handler [org.springframework.integration.aggregator.Aggrega tingMessageHandler#0]
                at org.springframework.integration.handler.AbstractMe ssageHandler.handleMessage(AbstractMessageHandler. java:84)
                at org.springframework.integration.dispatcher.Unicast ingDispatcher.doDispatch(UnicastingDispatcher.java :110)
                at org.springframework.integration.dispatcher.Unicast ingDispatcher.access$000(UnicastingDispatcher.java :51)
                at org.springframework.integration.dispatcher.Unicast ingDispatcher$1.run(UnicastingDispatcher.java:92)
                at org.springframework.integration.util.ErrorHandling TaskExecutor$1.run(ErrorHandlingTaskExecutor.java: 52)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run Task(ThreadPoolExecutor.java:886)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:908)
                at java.lang.Thread.run(Thread.java:680)
                Caused by: org.springframework.core.serializer.support.Serial izationFailedException: Failed to serialize object using DefaultSerializer; nested exception is java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArra yLongString
                at org.springframework.core.serializer.support.Serial izingConverter.convert(SerializingConverter.java:6 5)
                at org.springframework.integration.jdbc.JdbcMessageSt ore.addMessageToGroup(JdbcMessageStore.java:298)
                at org.springframework.integration.aggregator.Abstrac tCorrelatingMessageHandler.store(AbstractCorrelati ngMessageHandler.java:273)
                at org.springframework.integration.aggregator.Abstrac tCorrelatingMessageHandler.handleMessageInternal(A bstractCorrelatingMessageHandler.java:196)
                at org.springframework.integration.handler.AbstractMe ssageHandler.handleMessage(AbstractMessageHandler. java:78)
                ... 7 more
                Caused by: java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArra yLongString
                at java.io.ObjectOutputStream.writeObject0(ObjectOutp utStream.java:1164)
                at java.io.ObjectOutputStream.writeObject(ObjectOutpu tStream.java:330)
                at java.util.HashMap.writeObject(HashMap.java:1001)
                at sun.reflect.NativeMethodAccessorImpl.invoke0(Nativ e Method)

                Comment


                • #9
                  Madhu,

                  Could you please try upgrading? These two issues have been resolved:
                  https://jira.springsource.org/browse/INT-2121
                  https://jira.springsource.org/browse/AMQP-170

                  The second one is the "real" issue, but you are using an older version of the spring-amqp library (since it's an earlier version of spring-integration pulling in the dependency transitively).

                  I would recommend trying spring-integration 2.1 RC1 (but RC2 will be out within days, then GA shortly after that). We have resolved a lot of issues since M2, and from our perspective, the more users trying out the RC the better... in case there is anything to catch prior to GA (but time is running out!).

                  Thanks!
                  -Mark

                  Comment


                  • #10
                    Thanks Mark,

                    It worked with RC1.0. I had to create my own header mapper and seth the replyHeaderNames and requestHeaderNames.


                    With JDBCMessageStore (derby) my program do not throw out of memory error,but the performance decreases tremendously(no where compared to in memory message store).

                    Regards,
                    Madhu

                    Comment


                    • #11
                      Would it be possible for you to provide some type of a test harness that shows performance degradation? It would be very helpful

                      Comment

                      Working...
                      X