  • Memory increasing indefinitely

    Hi,

    I use a UDP inbound channel adapter to receive UDP datagrams. I know there is one thread that listens for UDP datagrams and that this thread delegates the processing to other threads (by default, to the 5 threads present in the pool).

    I've written a simple application that sends UDP datagrams to a certain IP/Port at a high rate (the meanSendCount attribute reaches 480 messages/second). I've tried two different message channels: queue based and publish/subscribe.

    I've profiled my application and I see that in both cases, the thread that listens on the network (i.e. the one called task-scheduler-1) is always running. But the 5 threads that process the incoming messages are not always running.

    I'm trying to reach high performance (i.e. to be able to support a high rate of incoming UDP datagrams). In the queue-based model, the queue never stops growing, which is not acceptable. In the publish/subscribe model, I just see that the memory increases a lot (and I guess that if I left the application running for a long time, I would see my heap size increasing "indefinitely").

    Which parameters should I play with in order to increase performance?

    Thanks,
    Mickael

  • #2
    You can increase the pool-size, or even inject a custom-configured task executor with whatever properties you want.



    • #3
      For the queue scenario, I see that the threads named UDP-Incoming-Msg-Handler only put the datagrams received from the thread named task-scheduler-1 into the queue. I also see that the thread named task-scheduler-10 is the one that receives the messages present in the queue. But, concerning the other task-scheduler threads (2 to 9), they are all waiting and never run (I see that with VisualVM). I expected these threads to poll the queue to receive the messages present inside it but it seems it doesn't happen.

      How is it related to the pool-size? As you said, it seems related to the default taskExecutor being used, but I expected the default task executor to behave differently.

      Any idea?

      EDIT: The thread named task-scheduler-1 is always running, as I explained in my message. The other always-running thread is not always task-scheduler-10; it may be another one from the pool. But, as I said, only two threads are always running; all the other task-scheduler threads are always waiting.
      Last edited by Miko95; Jun 14th, 2012, 03:45 PM.



      • #4
        Can you attach your config and a debug log? What you are saying doesn't make sense; all we do is hand over to the task executor's thread pool via executor.execute().



        • #5
          My application context:

          Code:
          <int-ip:udp-inbound-channel-adapter
          		id="udbInboundAdapter" channel="receiveChannel" lookup-host="false"
          		local-address="${listeningInterface}" port="${listeningPort}"
          		receive-buffer-size="2048" multicast="false"
          		check-length="false" pool-size="5" />
          
          <int:channel id="receiveChannel">
          	<int:queue capacity="100000"/>
          </int:channel>
          
          <int:chain input-channel="receiveChannel">
          	<int:filter ref="myFilter" />
          	<int:transformer ref="messageTransformer" />
          	<int:service-activator ref="updateDatabaseService"
          			method="processMessage" />
          	<int:poller fixed-delay="${pollTime}" time-unit="MILLISECONDS" />
          </int:chain>
          Thanks



          • #6
            You shouldn't use a <queue/> channel immediately downstream of the udp inbound adapter.

            Because of the nature of UDP, we always do an asynch handoff within the adapter so the main thread can get back to reading packets ASAP.

            By doing another asynch handoff (in the QueueChannel), you are funneling the work to just one thread. The internal asynch handoff is very short-lived (it just runs to insert into the queue), and then the single poller thread picks up all the work.

            Just remove the <int:queue .../> sub element in receiveChannel.

            After doing that, you can control the concurrency via the adapter's pool-size attribute.
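
To illustrate the funneling effect described above, here is a minimal plain-JDK sketch. It does not use Spring Integration, and the class and method names are hypothetical. It assumes the adapter's pool behaves like a fixed thread pool and the poller like a single consumer thread, then records which threads actually process messages in each layout.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: it only mimics the threading pattern, not the adapter itself.
class HandoffDemo {

    // Double hand-off: pool threads only enqueue, one poller thread does all the processing.
    static Set<String> workersWithQueue(int poolSize, int messages) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Set<String> workers = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(messages);

        Thread poller = new Thread(() -> {
            try {
                while (true) {
                    queue.take();                                  // "process" one message
                    workers.add(Thread.currentThread().getName());
                    done.countDown();
                }
            } catch (InterruptedException e) {
                // interrupted once every message has been handled
            }
        }, "poller");
        poller.start();

        for (int i = 0; i < messages; i++) {
            final int message = i;
            pool.execute(() -> queue.add(message));                // short-lived hand-off only
        }
        done.await();
        poller.interrupt();
        pool.shutdown();
        return workers;
    }

    // Single hand-off: the pool threads do the processing themselves.
    static Set<String> workersDirect(int poolSize, int messages) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> workers = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(messages);

        for (int i = 0; i < messages; i++) {
            pool.execute(() -> {
                workers.add(Thread.currentThread().getName());     // "process" one message
                done.countDown();
            });
        }
        done.await();
        pool.shutdown();
        return workers;
    }
}
```

In the first layout only the thread named "poller" ever does real work, whatever the pool size, which matches the VisualVM observation earlier in the thread. In the second layout the work is spread over the pool, so the pool size becomes the concurrency knob.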



            • #7
              Thanks Gary, it solves the issue.

              But now I have another issue. I use a ThreadPoolTaskExecutor (that I define in the UDP inbound channel adapter) and after some time, I see that incoming tasks are rejected. At first, only the corePoolSize threads are used. After some time, it seems that the queue is full, and new threads are created until maxPoolSize threads are used, and then I get the TaskRejected errors.

              Is there a way to monitor the ThreadPoolTaskExecutor using JMX? I know that I can export it as an MBean, but that doesn't help me since I can't access the internal ThreadPoolExecutor to monitor the status of the queue.

              Is it possible that whatever values I use for corePoolSize, maxPoolSize and queueCapacity, after a certain time (that may be very long), I'll receive TaskRejected errors? I'm suspecting that the arrival rate of tasks is greater than my service rate. What should I do?
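
The saturation sequence described in this post (core threads first, then the queue, then extra threads up to the maximum, then rejection) is how `java.util.concurrent.ThreadPoolExecutor` behaves with a bounded queue. The following plain-JDK sketch reproduces it with deliberately tiny, made-up sizes. The class name is hypothetical, and the tasks simply block on a latch to stand in for a service rate that is slower than the arrival rate.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows when a bounded ThreadPoolExecutor starts rejecting tasks.
class SaturationDemo {

    // Submits 'tasks' jobs that cannot finish until released, and counts the rejections.
    static int countRejected(int core, int max, int queueCapacity, int tasks)
            throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                core, max, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity));   // default policy aborts when full
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;

        for (int i = 0; i < tasks; i++) {
            try {
                executor.execute(() -> {
                    try {
                        release.await();                    // simulate slow processing
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;                                 // pool and queue are both full
            }
        }

        release.countDown();                                // let the accepted tasks finish
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return rejected;
    }
}
```

The pool can hold at most `max + queueCapacity` unfinished tasks, so larger values only delay the first rejection if tasks keep arriving faster than they complete. If that is the case here, the options are to make each task cheaper, to add threads when the work is mostly waiting on I/O such as the database, or to decide explicitly which datagrams to drop.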



              • #8
                The "internal" task executor uses the default queue size (Integer.MAX_VALUE)...

                Code:
                    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
                        return new ThreadPoolExecutor(nThreads, nThreads,
                                                      0L, TimeUnit.MILLISECONDS,
                                                      new LinkedBlockingQueue<Runnable>(),
                                                      threadFactory);
                    }
                
                    public LinkedBlockingQueue() {
                        this(Integer.MAX_VALUE);
                    }
                Regarding JMX, just define your own Executor and inject it into the UDP adapter using the task-executor attribute. Then it will use that instead of creating an internal one.



                • #9
                  Originally posted by Gary Russell
                  Regarding JMX, just define your own Executor and inject it into the UDP adapter using the task-executor attribute. Then it will use that instead of creating an internal one.
                  I use a custom task executor in a publish/subscribe channel. But I can't see it using JMX.



                  • #10
                    You will need to export it with an MBean exporter.
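
For the earlier question about watching the executor's queue over JMX, one possible approach is a small MXBean wrapper around the underlying `java.util.concurrent.ThreadPoolExecutor`. This is a plain-JDK sketch rather than Spring's MBean exporter: the class, interface, and `demo.executor` domain names are hypothetical. With Spring's `ThreadPoolTaskExecutor`, the underlying pool should be reachable through `getThreadPoolExecutor()` once the bean has been initialized.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.ThreadPoolExecutor;
import javax.management.JMException;
import javax.management.ObjectName;

// Hypothetical helper: publishes basic pool and queue figures on the platform MBean server.
class ExecutorJmxDemo {

    // Management interface; the MXBean suffix lets JMX discover the attributes.
    public interface PoolStatsMXBean {
        int getActiveCount();
        int getPoolSize();
        int getQueueSize();
        int getQueueRemainingCapacity();
        long getCompletedTaskCount();
    }

    // Read-only view over a live executor; every getter queries the pool on demand.
    static class PoolStats implements PoolStatsMXBean {
        private final ThreadPoolExecutor executor;

        PoolStats(ThreadPoolExecutor executor) {
            this.executor = executor;
        }

        public int getActiveCount() { return executor.getActiveCount(); }
        public int getPoolSize() { return executor.getPoolSize(); }
        public int getQueueSize() { return executor.getQueue().size(); }
        public int getQueueRemainingCapacity() { return executor.getQueue().remainingCapacity(); }
        public long getCompletedTaskCount() { return executor.getCompletedTaskCount(); }
    }

    // Registers the view under an illustrative object name and returns that name.
    static ObjectName register(ThreadPoolExecutor executor, String name) throws JMException {
        ObjectName objectName = new ObjectName("demo.executor:type=PoolStats,name=" + name);
        ManagementFactory.getPlatformMBeanServer()
                .registerMBean(new PoolStats(executor), objectName);
        return objectName;
    }
}
```

After registration, attributes such as `QueueSize` and `QueueRemainingCapacity` should show up in JConsole or VisualVM under the chosen domain, which makes it possible to see whether the queue is steadily filling before the first rejection.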
