  • Amazon SQS and Message Acknowledgement

    Hello folks,

    Trying to use Spring Integration with Amazon's SQS (well, actually a wrapper around SQS that gives us hooks to do encryption etc.) and we're running into some trouble. The basic problem is that we need to be able to read a message off of SQS, do some processing on it, and once the message is processed, delete it from the queue. A bit of detail on the lifecycle of an SQS message is here - http://aws.amazon.com/sqs/#details.

    Our first attempt at this involved using a Channel Adapter to read messages off of SQS and onto a message channel. The problem with this is that it doesn't give us any hook to delete the message once processing is successful. It means we need to delete it at the same time we read it off the queue, so if our processing step fails, we lose the message.

    Read - process - ack is a pretty common pattern for messaging. JMS has a way to ack messages, as does the AMQP spec, so I'm guessing there's some way to do this baked into Spring Integration that I'm just failing to find.

    Little help?

    Thanks,
    Mike

  • #2
    Alex Peters seems to have this problem solved here http://github.com/alpe/spring-integr...ndGateway.java He uses TransactionSynchronizationManager.registerSynchronization so that the message is deleted when a transaction commits. This assumes that you have some ultimate transaction downstream that will commit and thus trigger the delete (or roll back, which in this case simply means it'll expire).

    • #3
      Hm thanks for that,

      I'm a little confused as to how I could use this, and just what a MessageSource is supposed to be used for. From what I've seen, one of the central abstractions in Spring Integration (and the book it's based on, Enterprise Integration Patterns) is a MessageChannel.

      I was expecting to be able to do something like configure a MessageChannel that would represent my SQS queue, possibly with some small bit of translation code somewhere. It looked to me that a ChannelAdapter is what's supposed to be used for that translation.

      Where exactly does a MessageSource fit into Spring Integration?

      • #4
        Right. OK, so, the immediate answer is that a MessageSource implementation can be used to teach Spring Integration about an external system if it doesn't already have an adapter for it. You can use the MessageSource implementation with the inbound-channel-adapter element in the Spring Integration configuration. The inbound-channel-adapter polls the MessageSource implementation for new messages by calling the receive method periodically. The period, in this case, is dictated by a poller element (specified in the inbound-channel-adapter element, or globally).
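
        As a rough illustration of that polling relationship, here is a dependency-free sketch. SimpleSource, PollingAdapter, InMemorySource and pollOnce are hypothetical stand-ins invented for this example, not Spring Integration types; the real MessageSource returns a Message wrapper, and the real adapter is driven by the configured poller rather than by hand.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical stand-in for Spring Integration's MessageSource: returns the
// next payload, or null when the external system has nothing to offer.
interface SimpleSource<T> {
    T receive();
}

// Hypothetical stand-in for the inbound channel adapter: each poll asks the
// source for one message and, if there is one, hands it to the channel.
final class PollingAdapter<T> {
    private final SimpleSource<T> source;
    private final Consumer<T> channel;

    PollingAdapter(SimpleSource<T> source, Consumer<T> channel) {
        this.source = source;
        this.channel = channel;
    }

    // One poll cycle; a scheduler would call this at the poller's interval.
    // Returns true if a message was forwarded to the channel.
    boolean pollOnce() {
        T payload = source.receive();
        if (payload == null) {
            return false;
        }
        channel.accept(payload);
        return true;
    }
}

// Toy source backed by an in-memory queue instead of a real external system.
final class InMemorySource implements SimpleSource<String> {
    private final Queue<String> pending = new ArrayDeque<>();

    void add(String payload) {
        pending.add(payload);
    }

    @Override
    public String receive() {
        return pending.poll();
    }
}
```

        An SQS-backed source would replace InMemorySource, with receive() doing the actual queue read; everything downstream of the channel stays the same.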

        To connect that adapter with some other component (say you want to react to the message, or print it out, or do some processing, etc., using a service-activator), you define a named channel, which acts as a conduit: messages coming 'out' of the adapter get sent over the channel and 'in' to the service-activator, like so:

        <channel id="c" />
        <inbound-channel-adapter ref="messageSourceBeanId" channel="c" />
        <service-activator ref="serviceActivatorBeanId" method="methodToCall" input-channel="c" />

        So, the presumption of the code I linked to is that you are going to have a Spring-managed transaction downstream, in the same thread (perhaps in the service activator when talking to a database or something?), and that it will ultimately commit and cause the TransactionSynchronization delegate to be invoked (which will in turn tell SQS to delete your message). If nothing commits (and thus nothing deletes the message), then eventually SQS will expire the read and the message becomes visible on the queue again, which amounts to a rollback.
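
        To make the mechanism concrete, here is a minimal, dependency-free imitation of the idea. ToyTransaction and ToyQueue are hypothetical classes written for this sketch; they are not the linked code and not Spring's API. In real code the registration would go through TransactionSynchronizationManager.registerSynchronization, and the commit would come from whatever transaction manager is in play. The ThreadLocal shows why the downstream transaction has to be on the same thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, much-simplified imitation of a thread-bound transaction that
// runs registered callbacks after a successful commit.
final class ToyTransaction {
    private static final ThreadLocal<ToyTransaction> CURRENT = new ThreadLocal<>();
    private final List<Runnable> afterCommit = new ArrayList<>();

    static ToyTransaction begin() {
        ToyTransaction tx = new ToyTransaction();
        CURRENT.set(tx);
        return tx;
    }

    // Called by the message source right after it reads a message.
    static void registerAfterCommit(Runnable callback) {
        ToyTransaction tx = CURRENT.get();
        if (tx == null) {
            throw new IllegalStateException("no active transaction on this thread");
        }
        tx.afterCommit.add(callback);
    }

    // Commit: unbind from the thread, then fire the callbacks (the deletes).
    void commit() {
        CURRENT.remove();
        afterCommit.forEach(Runnable::run);
    }

    // Rollback: unbind and drop the callbacks, so nothing is deleted.
    void rollback() {
        CURRENT.remove();
    }
}

// Toy queue: reading does not remove a message; only the after-commit
// callback does. This toy has no visibility timeout, so an unacknowledged
// message simply stays readable.
final class ToyQueue {
    private final List<String> messages = new ArrayList<>();

    void send(String body) {
        messages.add(body);
    }

    String receiveAndDeleteOnCommit() {
        if (messages.isEmpty()) {
            return null;
        }
        String body = messages.get(0);
        ToyTransaction.registerAfterCommit(() -> messages.remove(body));
        return body;
    }

    int size() {
        return messages.size();
    }
}
```

        If the processing step throws and the transaction rolls back, the callback never runs and the message stays on the queue for another attempt.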

        • #5
          Also, I'd like to add that if you look at the existing inbound Channel Adapter implementations for Spring Integration, you will find that any that are Polling Consumers (e.g. File, POP3) are internally delegating to a MessageSource implementation. We simply wrap that up and provide namespace support. Eventually, we will very likely do the same for SQS. In the meantime, Josh's example with the "generic" inbound-channel-adapter should suffice.

          -Mark

          • #6
            Ah okay,

            Starting to grok it. An additional TransactionSynchronization is added to an existing transaction manager, so that an additional callback fires when the transaction commits. Hopefully that'll work for us since we are indeed writing to a database so we do have a TransactionManager in play.

            Out of curiosity, is this the way you guys think things should work when dealing with queues that support this sort of delete/ack semantics, or has any consideration been given to adding some way to have a delete/ack callback on a MessageChannel?

            IMHO, requiring a transaction manager might be a bit of overkill in many situations. There are some cases where we'll be writing to, say, a key/value store and wouldn't have an existing Spring TransactionManager around. Quite a few messaging systems I'm familiar with work this way (JMS, SQS, also AMQP and Rabbit - http://www.rabbitmq.com/faq.html#atomic-operations); they all have something similar...
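
            For what it's worth, the kind of callback being asked about could look roughly like the sketch below. AckableMessage and AckingHandler are hypothetical names made up for this illustration; nothing here is an existing Spring Integration API. The message carries its own acknowledgement action, and the handler runs it only when processing returns normally, with no transaction manager involved.

```java
import java.util.function.Consumer;

// Hypothetical message type that carries its own acknowledgement callback
// (for SQS, the callback would issue the delete for this message).
final class AckableMessage<T> {
    private final T payload;
    private final Runnable ackAction;
    private boolean acked;

    AckableMessage(T payload, Runnable ackAction) {
        this.payload = payload;
        this.ackAction = ackAction;
    }

    T payload() {
        return payload;
    }

    // Idempotent: the underlying ack action runs at most once.
    void ack() {
        if (!acked) {
            acked = true;
            ackAction.run();
        }
    }

    boolean isAcked() {
        return acked;
    }
}

final class AckingHandler {
    private AckingHandler() {
    }

    // Runs the processing step and acks only if it returns normally.
    // Returns false when processing failed and the message was left unacked,
    // so the broker can redeliver it later.
    static <T> boolean handle(AckableMessage<T> message, Consumer<T> processor) {
        try {
            processor.accept(message.payload());
        } catch (RuntimeException e) {
            return false;
        }
        message.ack();
        return true;
    }
}
```

            The trade-off compared with the transaction-synchronization approach is that the ack is tied to the handler returning, not to a database commit, so a crash between the store write and the ack means the message gets processed again.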

            Anyhow, thanks a ton for the assist!

            • #7
              Message Acknowledgement

              Hello,

              I have a similar problem: I need some way to ACK a message after receiving it on a channel.

              I want to write a client that reads messages from a server through a web service interface. The client should read a message from the channel, then process it, and then ACK or reject the message, or let the delivery time out. The web service should provide messages from a Spring Integration Queue Channel.

              So I want to implement my own SQS-like solution based on Spring Integration.

              I cannot use a distributed transaction context for the acknowledgement. The web service could be a SOAP or REST implementation, and the client could be Android, Web/AJAX or some other technology.

              It might look like an implementation of a STOMP adapter that communicates with an SI channel.

              Is there any support for this kind of solution? Or is support for explicit ACK on channels planned?
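
              To pin down the semantics being described (ACK, reject, timeout), here is a small in-memory sketch. LeaseQueue and Delivery are hypothetical classes invented for this example and have nothing to do with an existing Spring Integration channel; the current time is passed in explicitly as a plain number, an assumption made only so the behaviour is easy to follow and test.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// What a client gets back from receive(): the body plus a receipt handle.
final class Delivery {
    final long handle;
    final String body;

    Delivery(long handle, String body) {
        this.handle = handle;
        this.body = body;
    }
}

// Hypothetical SQS-like queue: a received message is leased, not removed.
final class LeaseQueue {
    private static final class Lease {
        final String body;
        final long expiresAt;

        Lease(String body, long expiresAt) {
            this.body = body;
            this.expiresAt = expiresAt;
        }
    }

    private final Deque<String> visible = new ArrayDeque<>();
    private final Map<Long, Lease> inFlight = new LinkedHashMap<>();
    private final long leaseMillis;
    private long nextHandle = 1;

    LeaseQueue(long leaseMillis) {
        this.leaseMillis = leaseMillis;
    }

    synchronized void send(String body) {
        visible.addLast(body);
    }

    // Leases the next visible message, or returns null if none is visible.
    synchronized Delivery receive(long now) {
        requeueExpired(now);
        String body = visible.pollFirst();
        if (body == null) {
            return null;
        }
        long handle = nextHandle++;
        inFlight.put(handle, new Lease(body, now + leaseMillis));
        return new Delivery(handle, body);
    }

    // ACK: removes the message for good. False if the lease is unknown or expired.
    synchronized boolean ack(long handle, long now) {
        requeueExpired(now);
        return inFlight.remove(handle) != null;
    }

    // Reject: makes the message visible again immediately, at the front.
    synchronized boolean reject(long handle, long now) {
        requeueExpired(now);
        Lease lease = inFlight.remove(handle);
        if (lease == null) {
            return false;
        }
        visible.addFirst(lease.body);
        return true;
    }

    // Timeout: any lease that has run out puts its message back on the queue.
    private void requeueExpired(long now) {
        Iterator<Lease> it = inFlight.values().iterator();
        while (it.hasNext()) {
            Lease lease = it.next();
            if (lease.expiresAt <= now) {
                visible.addLast(lease.body);
                it.remove();
            }
        }
    }
}
```

              A web service front end would expose receive, ack and reject as three separate calls, with the receipt handle travelling to the client and back, so no transaction has to span the remote boundary.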

              • #8
                This may solve the problem: Nevado

                I was just searching for a way to integrate Amazon SQS with Spring and I found this project:

                http://nevado.skyscreamer.org/

                Nevado JMS is a JMS driver for Amazon SQS.
                • Leverage cloud services in Amazon using standard Spring/J2EE metaphors
                • Avoid tight coupling to cloud-specific API's inside your code
                • Rapidly prototype enterprise clients in the cloud. Then deploy them anywhere.

                How you get started:

                Code:
                <dependency>
                    <groupId>org.skyscreamer</groupId>
                    <artifactId>nevado-jms</artifactId>
                    <version>1.0.0</version>
                </dependency>
                Code:
                <!-- Pick your AWS SDK.  Typica is pretty fast. -->
                <bean id="sqsConnectorFactory" class="org.skyscreamer.nevado.jms.connector.typica.TypicaSQSConnectorFactory" />
                
                <!-- And this is an implementation of javax.jms.ConnectionFactory -->
                <bean id="connectionFactory" class="org.skyscreamer.nevado.jms.NevadoConnectionFactory">
                    <property name="sqsConnectorFactory" ref="sqsConnectorFactory" />
                    <property name="awsAccessKey" value="${aws.accessKey}" /> <!-- Set this -->
                    <property name="awsSecretKey" value="${aws.secretKey}" /> <!-- And this -->
                </bean>
                And that's it ... maybe you should give it a try.

                • #9
                  I could not find out how to use Nevado with an HTTP proxy to set up the connection to Amazon Web Services.
                  The AWS Java connection code lets you do it through the ClientConfiguration class, but I have not seen anything similar with these solutions.
                  Regards
