
  • Custom poller use

    Hi guys,
    I would like to use my own poller implementation. I defined a class extending AbstractPollerConsummer, but I don't know how to use it: with the following configuration, an instance of PollingConsumer is always used.
    Code:
    <int:service-activator 
    	input-channel="inChannel" 
    	output-channel="outChannel" 
    	ref="myBean" 
    	method="myMethod">
    	<int:poller ref="myPollerMetadata"/>
    </int:service-activator>
    How can I inject my own poller into the service-activator configuration without using the <poller> tag?
    Last edited by madecreton; May 31st, 2011, 12:07 PM. Reason: poller

  • #2
    Can you show the bean whose id is "myPollerMetadata"?

    Also, can you explain the reasons you need to implement your own poller? (there might be a simpler, strategy-based way to configure the app).



    • #3
      Hi Mark,
      For the moment, here is my configuration:
      Code:
      <int:poller id="myPollerMetadata" task-executor="pollerTaskExecutor" fixed-rate="3000">
        <int:transactional transaction-manager="fileSendingTransactionManager" propagation="REQUIRES_NEW" isolation="DEFAULT" timeout="60000" read-only="false"/>
      </int:poller>
      <bean id="pollerTaskExecutor" class="org.springframework.core.task.SyncTaskExecutor"/>
      I want to define my own poller because I want to override the "doPoll" method. In the configuration above, you can see that I defined a transactionManager for my poller. My problem is that I have to manage more than one database, so I have to deal with several transactionManagers. I intend to write a doPoll method like this:
      Code:
      @Override
      protected boolean doPoll() {
          Message<?> message = (this.receiveTimeout >= 0)
                  ? this.inputChannel.receive(this.receiveTimeout)
                  : this.inputChannel.receive();
          if (message == null) {
              return false;
          }

          // My correlationId defines my transactionManager
          Object correlationId = message.getHeaders().getCorrelationId();

          // Defines my TransactionDefinition
          TransactionDefinition td = ...

          TransactionStatus ts = this.transactionManagerProxy.getTransaction(correlationId, td);
          try {
              this.handler.handleMessage(message);
              this.transactionManagerProxy.commit(correlationId, ts);
          } catch (Exception e) {
              this.transactionManagerProxy.rollback(correlationId, ts);
          }
          return true;
      }
      The transactionManagerProxy manages a pool of transactionManagers and determines dynamically (depending on the incoming message) which one to use.



      • #4
        Just to add to what Mark has said, your need for a custom poller might actually be a need for a custom trigger.



        • #5
          It actually looks to me like you should probably consider implementing your own PlatformTransactionManager. That said, I would also recommend reading this: http://www.javaworld.com/javaworld/j...nsactions.html

          If you don't choose that, then I would actually just implement Around advice (AOP), then add that to the advice-chain for the poller. It would be very similar to what you have now, but would not require extending our class. It would also be reusable as a cross-cutting concern.

          Hope that helps.
          -Mark
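For readers following the advice-chain suggestion, a minimal configuration sketch might look like the fragment below. The bean id `routingTxAdvice` and the class `com.example.RoutingTransactionAdvice` are hypothetical names that do not appear in the thread; the class is assumed to implement `org.aopalliance.intercept.MethodInterceptor`. The poller attributes are copied from the configuration posted earlier.

```xml
<!-- Hypothetical advice bean: the class name is an assumption, not taken from the thread.
     It is assumed to implement org.aopalliance.intercept.MethodInterceptor and to wrap
     invocation.proceed() in the required transaction logic. -->
<bean id="routingTxAdvice" class="com.example.RoutingTransactionAdvice"/>

<!-- Poller attributes as posted in the thread; the advice-chain replaces the custom doPoll(). -->
<int:poller id="myPollerMetadata" task-executor="pollerTaskExecutor" fixed-rate="3000">
  <int:advice-chain>
    <ref bean="routingTxAdvice"/>
  </int:advice-chain>
</int:poller>
```

One point to keep in mind: as far as I can tell, the advice surrounds the whole poll (receive plus handling), so the message and its headers are probably not yet available when the advice starts. Choosing a transaction manager from the correlationId would then need some other lookup.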



          • #6
            Actually, you don't need any of that code. I thought you might have requirements around a custom trigger, but you simply poll every 3 seconds, and the rest is programmatic TX management, which you don't need at all. All of it could be handled declaratively via the poller's transactional settings (propagation, isolation, etc.), and based on your code it seems the default settings are all you need.
            Please read more here: http://static.springsource.org/sprin.../#transactions



            • #7
              Just to clarify what I meant when I said 'programmatic TX management which you don't need at all': I meant that TX could be handled declaratively by bootstrapping your poller with the appropriate transaction manager (a custom one in your case). The actual poller configuration would be the same regardless of the implementation of PlatformTransactionManager (as Mark suggested).

              Also, looking at your code again, you seem to be using the correlationId when committing the TX. Could you elaborate on your TX requirements, and especially on what your transactional resource is (RDBMS, etc.)? I am not even sure you need to use the correlationId explicitly, as it will be handled by the TX manager implicitly.
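To illustrate the declarative route described above, here is a hedged sketch in which only the bean behind the transaction manager id changes. The class `com.example.RoutingTransactionManager` is a hypothetical name, assumed to implement `org.springframework.transaction.PlatformTransactionManager` and to delegate to one of several underlying transaction managers. How it would pick the delegate is left open here, as it is in the thread.

```xml
<!-- Hypothetical bean: the class name is an assumption, not taken from the thread.
     It is assumed to implement PlatformTransactionManager and to delegate to
     one of the cached transaction managers. -->
<bean id="fileSendingTransactionManager" class="com.example.RoutingTransactionManager"/>

<!-- Same poller definition as posted in the thread; nothing here depends on
     which PlatformTransactionManager implementation sits behind the id. -->
<int:poller id="fileSendingTransactionalPoller" task-executor="pollerTaskExecutor" fixed-rate="3000">
  <int:transactional transaction-manager="fileSendingTransactionManager"
                     propagation="REQUIRES_NEW" isolation="DEFAULT"
                     timeout="60000" read-only="false"/>
</int:poller>
```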



              • #8
                To clarify too, let's say that my flow will receive a lot of messages. My flow uses an aggregator, and we decide to use many MessageStores (mainly to avoid performance issues). I have a configuration table defining the dataSource to be used depending on the correlationId:
                Attachment
                So I instantiate the transactionManagers at startup while populating a cache.

                Now, here is part of my flow configuration: before sending my message to the aggregator, I use a poller to delimit my transaction.
                Code:
                <int:channel id="fileSendingCorrelationKeyCacheRemoverChannel"><int:queue/></int:channel>
                <int:service-activator
                    input-channel="fileSendingCorrelationKeyCacheRemoverChannel"
                    output-channel="fileSendingAggregatorChannel"
                    ref="fileSendingManagedCorrelationKeyCache"
                    method="removeFromCache">
                    <int:poller ref="fileSendingTransactionalPoller"/>
                </int:service-activator>

                <int:poller id="fileSendingTransactionalPoller" task-executor="pollerTaskExecutor" fixed-rate="3000">
                    <int:transactional transaction-manager="fileSendingTransactionManager" propagation="REQUIRES_NEW" isolation="DEFAULT" timeout="60000" read-only="false"/>
                </int:poller>
                <bean id="pollerTaskExecutor" class="org.springframework.core.task.SyncTaskExecutor"/>
                But how do I know which transactionManager to use? I can determine my poller's transactionManager only at runtime, when the poller receives a message, because this piece of information is contained in the message header.
                I could also define as many pollers as I have transactionManagers, but how would I bind them dynamically to the service-activator?


                • #9
                  My flow uses an aggregator, and we decide to use many MessageStores (mainly to avoid performance issues).
                  Could you please clarify this one? Do you simply want to distribute messages across many datasources? Since you mentioned performance, how many messages per second are you targeting?

                  In any event, if you want to scale horizontally, then why are you using a single Application Context? Don't you want to scale that as well? This way you'll end up with a much cleaner approach, with a single TX manager per application context, while balancing the load between application contexts.

                  Having said that, I do suspect a case of 'premature optimization', so let's figure out what you need first, and then we'll try to come up with the most appropriate solution.



                  • #10
                    Oleg,
                    In our case, aggregators are used to produce summaries of the daily activity: we store each message sent during the day, and in the evening we aggregate the messages.

                    As we don't release the aggregator very frequently, our messageStore can be quite populated (roughly 15000 messages per flow per day, and we'll have 40 flows).

                    So our idea was to implement an API which is in charge of managing the aggregation for every flow. However, we wanted each flow to manage its own storage (its database, tx manager and message store table).

                    As it is quite complex to manage many databases for a single aggregator bean, we simplified the solution by forcing flows to store in a single database. So the tx manager is always the same. The only thing that is still configurable is the message store prefix. It works well.

                    Thank you for your help Oleg
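For anyone landing on this thread later, the simplified setup described above (one database, one tx manager, a per-flow table prefix) could be sketched roughly as follows. This is not the poster's actual configuration: the bean ids `dataSource` and `flowAMessageStore`, the prefix `FLOW_A_`, and the output channel `fileSendingSummaryChannel` are all assumptions. `JdbcMessageStore` and its `tablePrefix` property come from the Spring Integration JDBC module.

```xml
<!-- One message store per flow, all on the same DataSource (bean id "dataSource" is assumed).
     The prefix value is a made-up example; the prefixed tables must already exist. -->
<bean id="flowAMessageStore" class="org.springframework.integration.jdbc.JdbcMessageStore">
  <constructor-arg ref="dataSource"/>
  <property name="tablePrefix" value="FLOW_A_"/>
</bean>

<!-- Aggregator using that store; the output channel name is hypothetical. -->
<int:aggregator input-channel="fileSendingAggregatorChannel"
                output-channel="fileSendingSummaryChannel"
                message-store="flowAMessageStore"/>
```

With this layout the poller can keep a single `transaction-manager` reference, since every flow writes to the same database.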
