Add Transactional Boundaries to Inbound XMPP Adapter

  • Add Transactional Boundaries to Inbound XMPP Adapter

    I'm trying to add a transactional boundary beginning with the inbound XMPP message adapter. I don't see any XML namespace support for adding a poller within the <xmpp:inbound-channel-adapter> element, so I tried just using the generic <int:inbound-channel-adapter> element, which does provide namespace support for pollers.

    The issue is that when I set up the inbound-channel-adapter, it requires a reference to a MessageSource, which I assumed to be the ChatMessageListenerEndpoint. When I wired this config up, the application context failed to start, saying it couldn't convert ChatMessageListenerEndpoint to a MessageSource. Any guidance on how to add transactional support with the XMPP inbound adapter?

    Code:
    <int:channel id="xmppInboundChannel" />
    <bean id="chatMessageListenerEndpoint" class="org.springframework...ChatMessageListenerEndpoint" >
      <constructor-arg ref="xmppConnection" />
      <property name="extractPayload" value="false" />
      <property name="requestChannel" ref="xmppInboundChannel" />
    </bean>
    <int:inbound-channel-adapter channel="xmppInboundChannel" ref="chatMessageListenerEndpoint">
      <int:poller max-messages-per-poll="1" fixed-rate="1000">
        <int:transactional />
      </int:poller>
    </int:inbound-channel-adapter>

  • #2
    Yeah, ChatMessageListenerEndpoint is not a PollingConsumer but an EventDrivenConsumer, so what you can do is this:

    XMPP -> QueueChannel -> Poller

    . . . which would look something like this
    Code:
    <int-xmpp:inbound-channel-adapter id="xmppInboundAdapter"
          channel="xmppInbound" 
          xmpp-connection="testConnection"/>
    
    <int:channel id="xmppInbound">
         <int:queue capacity="10000"/>
    </int:channel>
    
    <!-- and here whatever polling consumer you want (e.g., service-activator)-->
    <int:service-activator  input-channel="xmppInbound" . . .>
         <int:poller max-messages-per-poll="1" fixed-rate="10000">
    		<int:transactional transaction-manager="txManager" />
         </int:poller>
    </int:service-activator>
    Does that explain it?
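
To make the XMPP -> QueueChannel -> Poller idea concrete, here is a minimal plain-Java sketch of the same shape. It does not use Spring Integration at all: `PolledQueueSketch`, `onInboundMessage`, `pollOnce` and the `events` list are hypothetical names made up for illustration, and the "transaction" is only a begin/commit/rollback record. The assumption it illustrates is that the event-driven side merely enqueues, while the polling side receives and handles one message inside one transactional unit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical plain-Java model of the flow; none of these names are Spring APIs. */
final class PolledQueueSketch {
    // Stands in for the queue-backed channel (capacity mirrors the config above).
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(10000);
    // Records the simulated transaction events so the boundary is visible.
    final List<String> events = new ArrayList<>();

    /** Event-driven side: the adapter's listener thread only enqueues. */
    boolean onInboundMessage(String payload) {
        return queue.offer(payload);
    }

    /** Polling side: at most one message per poll, handled inside one "transaction". */
    boolean pollOnce(Consumer<String> handler) {
        events.add("begin");
        try {
            String message = queue.poll(); // non-blocking receive
            if (message != null) {
                handler.accept(message);
            }
            events.add("commit");
            return message != null;
        } catch (RuntimeException e) {
            // In this sketch the failed message is NOT put back on the queue.
            events.add("rollback");
            return false;
        }
    }

    public static void main(String[] args) {
        PolledQueueSketch flow = new PolledQueueSketch();
        flow.onInboundMessage("hello");
        flow.pollOnce(m -> flow.events.add("handled " + m));
        flow.onInboundMessage("boom");
        flow.pollOnce(m -> { throw new IllegalStateException(m); });
        System.out.println(flow.events);
    }
}
```

Note that in this sketch a message whose handler fails is simply gone after the rollback, because the in-memory queue takes no part in the transaction.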


    • #3
      Yes, I'll give that a try.


      • #4
        Brandon

        We were just discussing it, and although the solution that I described will work, we agree that we must provide support for initiating transactional flows from event-driven consumers, so we just created this issue: https://jira.springsource.org/browse/INT-1685
        You can follow its progress.

        Also, you can try another solution which is more event-driven:

        XMPP -> service-activator -> Gateway

        The whole point is to avoid creating a point of failure with the queue channel and to avoid relying on the poller.
        So all you need to do is define a Gateway and invoke this gateway from a service-activator bean method which is annotated with @Transactional.

        Code:
        @Transactional
        public void processXmppMessage(Message message) {
            // The gateway call runs inside the transaction started for this method
            myGateway.whatever();
        }


        • #5
          I followed your setup, with the service activator calling the gateway, wrapped in a transaction. However, I get a ChannelResolutionException saying "no output-channel or replyChannel header available".

          Setup is as follows:

          Code:
          <xmpp-inbound-channel-adapter .... channel="openfireInboundChannel" />
          
          <int:service-activator input-channel="openfireInboundChannel" output-channel="openfireOutputChannel" >
            <bean class="org.mycompany.messaging.XmppMessageHandlingService" />
          </int:service-activator>
          
          <int:gateway id="xmppMessageHandlingService" service-interface="org.mycompany.messaging.XmppMessageHandlingGateway" 
             default-request-channel="xmppInboundTransformerChannel" 
             default-reply-channel="xmppOutboundSmackConvertedChannel" />
          
          ...
          
          <int:transformer input-channel="messageProcessedChannel" output-channel="xmppOutboundSmackConvertedChannel" 
            ref="messageTransformer" method="convertPayloadToSmackMessage" />
          Both the service and the gateway take a Smack message as their parameter and return a Smack message. I expected the gateway to just return the result, which is a Smack message, to the service, which would then send the message to the configured outbound channel. From the logs, the message makes it back to the gateway, which apparently doesn't know what to do with it.
          Last edited by jzcfk9; Dec 19th, 2010, 10:26 AM.


          • #6
            OK, I changed the gateway and service to both return void. I then changed the last step in the workflow, which converts the SI message into a Smack message, to send the Smack message to the "openfireOutputChannel", which then forwards it through the outbound XMPP message-sending adapter.

            This all seems to work now, so I guess my confusion in all of this was about the role the gateway plays in the workflow. I was assuming I could have the gateway initiate the workflow (minus the inbound and outbound adapters) and then let the gateway return the resulting Smack message back to the service, which would then send the message back through the outbound XMPP adapter:

            In/Out Xmpp Adapter <--> Service Activator <--> Gateway <--> workflow

            But rather, it seems the process is now linear:

            In Xmpp Adapter --> SA --> Gateway --> workflow --> outbound Xmpp Adapter

            I'm not sure there is an advantage or disadvantage to this linear flow, but it would be nice if everything started and ended with the service, which is where the transaction begins and ends.


            • #7
              Brandon

              Gateways are the only components that could be looked at as bidirectional, because they encapsulate both the producer and, optionally, the consumer, thus providing you with a single send/receive type of entry point to a messaging system.
              Adapters are always unidirectional, and in fact, if you look inside the Gateway, its parts are unidirectional too. It's the fact that a single Gateway encapsulates a message producer (the first component in the message flow, which sends a message to a channel) and the *last* message consumer (the last component, which receives the message from the channel) that makes it look bidirectional.
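
As a rough illustration of that point, here is a plain-Java sketch (not Spring Integration code) of a gateway that is both the producer at the start of a flow and the consumer at its end. `GatewaySketch`, `Msg` and `replyTo` are hypothetical names; the only thing borrowed from the thread is the `replyChannel` header name quoted in the exception message in #5. The assumption is that the gateway attaches a temporary reply channel to the request and that whatever component ends the flow sends its result there.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical model of a send/receive gateway; not the Spring Integration API. */
final class GatewaySketch {
    /** Minimal message: a payload plus mutable headers. */
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers = new HashMap<>();
        Msg(Object payload) { this.payload = payload; }
    }

    // The first channel of the flow; the gateway acts as its producer.
    private final Consumer<Msg> requestChannel;

    GatewaySketch(Consumer<Msg> requestChannel) {
        this.requestChannel = requestChannel;
    }

    /** Sends the payload into the flow and returns whatever the flow replied with. */
    Object sendAndReceive(Object payload) {
        AtomicReference<Object> reply = new AtomicReference<>();
        Consumer<Object> replyChannel = reply::set; // the gateway is the *last* consumer
        Msg request = new Msg(payload);
        request.headers.put("replyChannel", replyChannel);
        requestChannel.accept(request); // synchronous, same thread
        return reply.get();             // null if the flow never replied
    }

    /** What the final component of a flow does when it produces a result. */
    @SuppressWarnings("unchecked")
    static void replyTo(Msg request, Object result) {
        Consumer<Object> replyChannel =
                (Consumer<Object>) request.headers.get("replyChannel");
        if (replyChannel == null) {
            throw new IllegalStateException("no replyChannel header available");
        }
        replyChannel.accept(result);
    }

    public static void main(String[] args) {
        GatewaySketch gateway = new GatewaySketch(
                request -> replyTo(request, request.payload.toString().toUpperCase()));
        System.out.println(gateway.sendAndReceive("hello"));
    }
}
```

Each piece here is one-way (a send into the flow, a send back to the reply channel); only the pairing of the two inside one object makes the gateway look like a request/response call.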

              Also, with some refactoring that was done for SI 2.0.1 (released 2 days ago), you no longer have to define a custom service-interface for a gateway that is invoked by a service-activator.
              See this for more details https://jira.springsource.org/browse/INT-1688
              The following should be enough:
              Code:
              <int:gateway id="xmppMessageHandlingService" 
                 default-request-channel="xmppInboundTransformerChannel"/>
              Let us know


              • #8
                Just to be clear with the changes in 2.0.1 related to service-interface, do I still need to create the XmppMessageHandlingService class annotated with @Transactional, or is a proxy gateway created automatically for me?

                Here's what I had before:

                Code:
                @Service
                @Transactional
                public class XmppMessageHandlingService
                {
                  @Autowired
                  XmppMessageHandlingGateway messageGateway;
                
                  @Transactional
                  public void processMessage(Message message)
                  {
                    messageGateway.handleMessage(message);
                  }
                }
                
                public interface XmppMessageHandlingGateway
                {
                  public void handleMessage(Message message);
                }

                Now, I removed the <int:service-activator> tag where I defined the XmppMessageHandlingService, and replaced it with just the <int:gateway id="xmppMessageHandlingService"> tag, but I'm wondering if it's respecting my original @Service bean annotated with @Transactional.


                • #9
                  Yes, it should.
                  My concern is that XMPP does not expose a TransactionManager, so any TX configuration on it is meaningless.

                  Could you elaborate on your use case a bit?
                  What are you trying to accomplish with this:
                  Code:
                  @Transactional
                    public void processMessage(Message message)
                    {
                      messageGateway.handleMessage(message);
                    }


                  • #10
                    I'm performing various persistence layer operations based on incoming messages. In the beginning, I'm retrieving the object referenced in the incoming message from the persistence layer. Based on message headers, I may be performing any number of CRUD operations on the object. At the very end of the workflow, I am marshalling the object to XML before sending the message outbound through the XMPP adapter. I sometimes encounter a closed Hibernate session, where it's unable to lazy-load some of the associations contained in the object that I'm trying to marshal to XML. I was hoping that by starting a transaction via the gateway, the Hibernate session would remain open throughout the messaging sequence, similar to the OpenSessionInView filter that's available for Controllers, which forces the session to remain open until the view is rendered.


                    • #11
                      Brandon, without looking at the full application with all the configs etc., everything I would say would be speculation.
                      One thing I can say for sure is that adding @Transactional to the XMPP adapter would not help.

                      A TX is bound to the current thread (via a thread-local), so anything on the stack of that thread runs under the transaction.
                      For example:

                      @Transactional
                      ComponentA -> ComponentB -> ComponentC -> . . .

                      In the above, because ComponentA is transactional, ComponentB and ComponentC are also transactional because they are part of the same stack.

                      Now, in messaging that is not always true, since it can be affected by the nature of your flow.
                      For example, if you break the single-threaded nature of your flow (e.g., by adding an executor or a pollable channel in between the components), you will also break your transactional boundaries.

                      But unless you are doing that, I would honestly ask the question on the Spring DataAccess forum, or you can also try the Hibernate forum.
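
The thread-bound nature of a transaction can be sketched in plain Java without Spring or Hibernate. Everything below is hypothetical and only for illustration: `TxBoundaryDemo` and `CURRENT_TX` stand in for whatever a real transaction manager binds to the thread, and `componentB` stands in for any downstream component. The sketch assumes a direct call stays on the caller's stack, while a handoff through an executor runs on a different thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical stand-in for a thread-bound transaction; not a Spring API. */
final class TxBoundaryDemo {
    // Mimics how transactional state is bound to the current thread.
    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** Downstream component: reports whether it runs under a transaction. */
    static boolean componentB() {
        return CURRENT_TX.get() != null;
    }

    /** Direct handoff: componentB runs on the caller's stack. */
    static boolean callDirectly() {
        CURRENT_TX.set("tx-1"); // "begin"
        try {
            return componentB();
        } finally {
            CURRENT_TX.remove(); // "end"
        }
    }

    /** Handoff through an executor: componentB runs on another thread. */
    static boolean callThroughExecutor() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CURRENT_TX.set("tx-1"); // "begin" on the calling thread only
        try {
            Callable<Boolean> task = TxBoundaryDemo::componentB;
            Future<Boolean> result = executor.submit(task);
            return result.get(); // wait for the worker thread's answer
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CURRENT_TX.remove();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct call sees tx:   " + callDirectly());        // true
        System.out.println("executor call sees tx: " + callThroughExecutor()); // false
    }
}
```

The second call returns false because the worker thread has its own, empty thread-local slot, which is the same reason an executor or a polled queue channel in the middle of a flow ends the transactional boundary.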
