Announcement Announcement Module
Collapse
No announcement yet.
Using Transaction Synchronization w/o RabbitMQ transactions for better performance Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Using Transaction Synchronization w/o RabbitMQ transactions for better performance

    For those interested I have implemented a wrapper around Spring AMQP to make an in-memory list of pending messages that are then sent after the platform's transaction has commited.

    That is if you do something like:

    Code:
    @Transaction
    public void doSomeDbStuffAndSendMessage() {
        //Do some db stuff and put the id of a entity into Message m.
        customRabbitTemplate.send(m);
        // Maybe do some more db stuff.
    }
    With the current Spring AMQP/RabbitMQ model you would have to use RabbitMQ TX which are god awfully slow.
    The reality is most of the time you just want the messages to be sent after the transaction has committed.

    Below is a rough idea how I implemented this. I think that Spring AMQP could offer something like this as the above use case is a very common one (I have seen dozens of Stackoverflow Q. of why is my entity not persisted yet?).

    Code:
    	public static Optional<MessageBusResourceHolder> getTransactionalResourceHolder(TxMessageBus messageBus) {
    
    		if ( ! TransactionSynchronizationManager.isActualTransactionActive()) {
    			return Optional.absent();
    		}
    		
    		MessageBusResourceHolder o = (MessageBusResourceHolder) TransactionSynchronizationManager.getResource(messageBus);
    		if (o != null) return Optional.of(o);
    		
    		o = new MessageBusResourceHolder();
    		TransactionSynchronizationManager.bindResource(messageBus, o);
    		o.setSynchronizedWithTransaction(true);
    		if (TransactionSynchronizationManager.isSynchronizationActive()) {
    			TransactionSynchronizationManager.registerSynchronization(new MessageBusResourceSynchronization(o, messageBus));
    		}
    		return Optional.of(o);
    		
    	}
    	
    	private static class MessageBusResourceSynchronization extends ResourceHolderSynchronization<MessageBusResourceHolder, TxMessageBus> {
    		private final TxMessageBus messageBus;
    		private final MessageBusResourceHolder holder;
    
    		public MessageBusResourceSynchronization(MessageBusResourceHolder resourceHolder, TxMessageBus resourceKey) {
    			super(resourceHolder, resourceKey);
    			this.messageBus = resourceKey;
    			this.holder = resourceHolder;
    		}
    
    
    		@Override
    		protected void cleanupResource(MessageBusResourceHolder resourceHolder, TxMessageBus resourceKey,
    				boolean committed) {
    			resourceHolder.getPendingMessages().clear();
    		}
    
    		@Override
    		public void afterCompletion(int status) {
    			if (status == TransactionSynchronization.STATUS_COMMITTED) {
    				for (Object o : holder.getPendingMessages()) {
    					messageBus.post(o, false);
    				}
    			}
    			else {
    				holder.getPendingMessages().clear();
    			}
    			super.afterCompletion(status);
    		}
    		
    		
    	}

  • #2
    Hi!

    I will not go into the debate around the solution, but I say about this one:
    With the current Spring AMQP/RabbitMQ model you would have to use RabbitMQ TX
    There is no reason to use RabbitTransactionManager, if you already have one, e.g. DataSourceTransactionManager, which does excelent synchronizations: http://static.springsource.org/sprin...qp.html#d4e415

    Isn't it?

    Cheers,
    Artem

    Comment

    Working...
    X