Announcement Announcement Module
Collapse
No announcement yet.
Is it possible to give a custom scope to a direct channel? (no subscriber issue) Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Is it possible to give a custom scope to a direct channel? (no subscriber issue)

    Hello,

    I have the following requirement:
    - each simulation must run on a dedicated thread (which must be the same thread for all consecutive simulations with the same id)

    Therefore I have implemented a custom scope (which works fine), called simulation scope. Spring creates an instance per value for that scope. Eg.

    Code:
    @Scope("simulation")
    public class Foo { .. }
    When the value for my scope is set to simulation-1, I'll get an instance of Foo, eg. Foo-1. When the value for scope is set to simulation-2 , it results in a new instance of Foo, eg. Foo-2. When yet another message contains scope simulation-1, the instance Foo-1 is reused. This all works fine.

    Some information about my integration setup:

    My requests contain a simulation-id and a security-token. When a request enters the system, the request is spawned to a new thread. A ThreadLocal containing the security information is attached to the thread. Security is applied. So far, so good.

    After the security check is passed, my requests for a given simulation(depending on the simulation-id in the header), should be executed on the SAME dedicated thread for THAT given simulation-id.

    Eg. simulation-1 messages should always be executed on sim-exec-thread-1, and simulation-2 messages should always be executed on sim-exec-thread-2.

    I have tried to accomplish this by using the following construct:

    Code:
      
    <integration:channel id="singleThreadedSimulationChannel">
      <integration:dispatcher task-executor="dedicatedSingleThreadedTaskExecutor"/>
    </integration:channel>
    
    <!-- poolsize = ! : this fulfills my requirement; messages are moved to the SAME dedicated thread.. 
           However, HOW do I create such a SCOPED dedicated thread.. 
    -->
    <bean id="dedicatedSingleThreadedTaskExecutor"
            class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" scope="simulation">
      <property name="corePoolSize" value="1"/>
      <property name="maxPoolSize" value="1"/>
      <property name="threadNamePrefix" value="Sim-Exec-Thread-"/>
    </bean>
    With the code above, my messages for simulation-2 are handled by the same thread thus the thread for simulation-1. I tried to solve this by setting the scope for singleThreadedSimulationChannel to scope="simulation". However, this results in the dreaded message "dispatcher has no subscribers".

    Note that all my other code eg. ServiceActivators are also scoped to scope="simulation", and they work fine. The only problem I have regarding scope is when I try to move the messages from their individual threads to a dedicated thread (which should be dedicated, per scope, thus one instance per scope eg. one instance for scope-value=simulation-1 and one instance for scope-value=simulation-2)

    Does anyone has a clue how to solve this?
    Thanks in advance,
    David

  • #2
    One step closer to a solution

    I've managed to find a partial solution. I now have an instance of my serviceActivator(EchoService) per simulation. But I am still searching for a way to have a DEDICATED thread per simulation.

    Code:
    @Component
    @Scope( "game" )
    public class ScopedThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
      
      private static final int CORE_POOL_SIZE = 1;
      private static final int MAX_POOL_SIZE = 1;
    
      public ScopedThreadPoolTaskExecutor() {
        super();
        setCorePoolSize( CORE_POOL_SIZE );
        setMaxPoolSize( MAX_POOL_SIZE );
      }
    }
    and

    Code:
    <integration:channel id="singleThreadedSimulationChannel">
      <integration:dispatcher task-executor="scopedThreadPoolTaskExecutor"/>
    </integration:channel>
    solved the problem.

    For completeness of the example; this is my serviceActivator definition
    Code:
    <integration:service-activator input-channel="singleThreadedSimulationChannel" output-channel="outChannel"
                                     ref="echoService"/>
    There where my previous solution always pointed to the same instance of my serviceActivator(EchoService), the new solution points to an instance per simulation.

    Eg. log output before changes:
    Code:
    DEBUG[Sim-Exec-Thread-1] org.acme.EchoService - ## instance = org.acme.EchoService@150382b5
    DEBUG[Sim-Exec-Thread-1] org.acme.EchoService - ## message = Message for simulation 2
    
    DEBUG[Sim-Exec-Thread-1] org.acme.EchoService - ## instance = org.acme.EchoService@150382b5
    DEBUG[Sim-Exec-Thread-1] org.acme.EchoService - ## message = Mensaje para simulation 3
    
    DEBUG[Sim-Exec-Thread-1] org.acme.EchoService - ## instance = org.acme.EchoService@150382b5
    DEBUG[Sim-Exec-Thread-1] org.acme.EchoService - ## message = Mensaje 2 para simulation 3
    
    DEBUG[Sim-Exec-Thread-1] org.acme.EchoService - ## instance = org.acme.EchoService@150382b5
    DEBUG[Sim-Exec-Thread-1] org.acme.EchoService - ## message = Other message for simulation 2
    vs. log output after changes:
    Code:
    DEBUG[Sim-Exec-Thread-1] org.acme.EchoService - ## instance = org.acme.EchoService@702b4b78
    DEBUG[Sim-Exec-Thread-1] org.acme.EchoService - ## message = Message for simulation 2
    
    DEBUG[Sim-Exec-Thread-1] org.acme.EchoService - ## instance = org.acme.EchoService@7abbf606
    DEBUG[Sim-Exec-Thread-1] org.acme.EchoService - ## message = Mensaje para simulation 3
    
    DEBUG[Sim-Exec-Thread-1] org.acme.EchoService - ## instance = org.acme.EchoService@7abbf606
    DEBUG[Sim-Exec-Thread-1] org.acme.EchoService - ## message = Mensaje 2 para simulation 3
    
    DEBUG[Sim-Exec-Thread-1] org.acme.EchoService - ## instance = org.acme.EchoService@702b4b78
    DEBUG[Sim-Exec-Thread-1] org.acme.EchoService - ## message = Other message for simulation 2
    They are still all on Sim-Exec-Thread-1, but other instances are handling the messages, based on their simulation-id. Note that I can verify this because the EchoService also writes the message in the database for the given simulation. This way, I can assure myself that the code works as desired.

    However, one problem remains: HOW do I move execution of the messages to a DEDICATED thread per simulation?

    Comment


    • #3
      Issue fixed and closed

      Adding some randomness to my code, clarified that different threads are used. It seems that, for every instance, my ThreadPoolExecutor bean starts counting from zero.

      Thus:

      instance1 spawns a new thread, the first and only for this this instance, with name Sim-Exec-Thread-1.
      instance2 spawns a new thread the first and only for this this instance, with name Sim-Exec-Thread-1.

      While they share the same name, they are not the same threads.

      Problem solved.

      Comment


      • #4
        I was just thinking about this and there is probably a simpler solution we can do within the framework itself.
        Would you mind opening a JIRA https://jira.springsource.org/browse/INT

        Comment


        • #5
          Done, see https://jira.springsource.org/browse/INT-2654

          Comment


          • #6
            Did you try adding the "scope" attribute to the channel itself? It seems like that fits your use-case (and your question).

            Comment


            • #7
              Yes Marc, I did. It resulted in a "dispatcher has no subscribers" error. Sidenote: When the application starts, all beans are instantiated in an invalid scope, which I defaulted to SCOPE_NO_SIMULATION_STARTED. Then, when my first simulation starts, a message enters the system, with a simulation-id of eg SIM-1. This instantiates the beans in the correct scope. I think it might be related to the fact that the beans that get created when the application context is starting, are not used throughout the application because they are in a scope which is never used.

              Comment


              • #8
                I don't fully understand the reason for creating the "invalid scope". Are you using aop:scoped-proxy? (the idea is that a proxy exists which can lazily be "filled" when the necessary scope is active). As long as you are sending Messages only after a scope is activated, you should be able to satisfy your use-case with the "scope" attribute on a channel.

                Comment


                • #9
                  I don't use scoped-proxy. We (ab)use the "invalid scope" for some integration tests, but not in production code.

                  Comment


                  • #10
                    David

                    Sorry, I am just getting to this issue and i have a question.
                    Would it be true to say that your message caries or may carry some unique data that can be used to determine which thread to use?

                    Comment


                    • #11
                      Oleg,

                      That's correct. The message carries a header containing a unique piece of data which can be used to determine the destination thread.

                      Comment


                      • #12
                        Thanks, and the other question is are you storing something in thread local? I mean can the same thread be shared by more then one Messages, as long as its the same thread?

                        Comment


                        • #13
                          The only thing I store is an InheritableThreadLocal which contains the same value for all Messages which are processed by the same thread.

                          An example. Suppose we have incoming messages on different working threads. On the worker thread, I create a threadlocal containing the value for the 'destination' header ( I do this because we apply a security check on these threads, and the security setup is related to the 'destination'.)

                          All messages on the worker threads are then routed to their single 'dedicated thread', the threadlocal variable is passed to that dedicated thread, but because the routing mechanism only routes threads with the same 'destination' to the dedicated thread, it actually stays the same. But in short: yes, I am storing something in an (inheritable)threadlocal, but this value is always the same and can't and may not be altered.

                          MSG-1 destination=dedicatedThread1,.. Worker thread 3 dedicatedThread1
                          MSG-2 destination=dedicatedThread2,.. Worker thread 5 dedicatedThread2
                          MSG-3 destination=dedicatedThread1,.. Worker thread 9 dedicatedThread1

                          Comment


                          • #14
                            I don't use scoped-proxy. We (ab)use the "invalid scope" for some integration tests, but not in production code.

                            Comment

                            Working...
                            X