SI and BlazeDS failure when no consumers (blazeds sessions)

  • SI and BlazeDS failure when no consumers (blazeds sessions)

    Hello. I'm trying to build a solution in which a Spring Integration process picks up files from the file system and sends them through the pipeline. Eventually they are pushed through JMS and come out the other end via a message-driven JMS adapter, onto a channel, and straight to the destination configured by <flex:integration-message-destination ... />. This works fine IF somebody is logged in. It blows up if I restart the server and drop a file because, reasonably enough, the message-driven adapter complains that there are no subscribers when it tries to send the message out. Which is true, but there's hardly anything I can do about that.

    I know of a few ways to work around this. I can set ignore-failures on the dispatcher, but that misses the point: it isn't a failure, per se. I could also create a component before the final channel that injects the channel, sends the message manually, and eats the success/failure of the send operation. Then, as far as Spring Integration is concerned, the processing dead-ends right before the Flex destination. I can afford to ignore these messages because they're not document messages but event messages: they serve only to refresh the UI that's already loaded on a given client's screen. If nobody's logged on, there's no screen to refresh. The next time they log in they'll get a fresh view of the data, no matter what.

    One approach I thought about was using a ChannelInterceptor on the channel and vetoing the send if there are no subscribers. It's just that I don't know how to ascertain, from inside a ChannelInterceptor, that there are no subscribers.
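
For illustration, here is a minimal sketch of the "eat the result of the send" workaround mentioned above. It deliberately uses tiny stand-in types rather than the real Spring Integration classes, so every name in it (EventHandler, SubscribableEventChannel, BestEffortForwarder) is hypothetical. It only shows the shape of the idea: the last component sends to the channel itself and treats "nobody subscribed" as a dropped event rather than a failure.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Simplified stand-ins for illustration only; these are NOT the Spring Integration types.
interface EventHandler {
    void handle(String payload);
}

// Models a subscribable channel whose send() reports false when nobody is subscribed.
class SubscribableEventChannel {
    private final List<EventHandler> handlers = new CopyOnWriteArrayList<>();

    void subscribe(EventHandler handler) {
        handlers.add(handler);
    }

    boolean send(String payload) {
        boolean dispatched = false;
        for (EventHandler handler : handlers) {
            handler.handle(payload);
            dispatched = true;
        }
        return dispatched;
    }
}

// Hypothetical terminal component: forwards the event and swallows the outcome,
// so the upstream flow never sees "no subscribers" as a failure.
class BestEffortForwarder {
    private final SubscribableEventChannel target;
    private int dropped = 0;

    BestEffortForwarder(SubscribableEventChannel target) {
        this.target = target;
    }

    void forward(String payload) {
        if (!target.send(payload)) {
            dropped++; // nobody is logged in, so the UI refresh event is simply discarded
        }
    }

    int droppedCount() {
        return dropped;
    }
}
```

In a real flow the forwarder would be the last endpoint Spring Integration knows about, with the Flex-facing channel injected into it. Counting dropped events is optional, but it keeps the discards visible in logs or metrics.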

    Any insight into this would be appreciated. I wasn't sure if I should post this on the Spring Integration forum or on the BlazeDS forum since it's definitely a problem unique to BlazeDS in conjunction with SI...

    Thanks,

  • #2
    If the messages are really event messages, I would typically recommend using a "publish-subscribe-channel" instead. That would make it easy to add additional consumers for one thing, but it would also avoid the dispatcher failure.

    I've considered this general issue a bit, and I think there are two things we could do within Spring Integration: 1) add a boolean attribute to the 'channel' element ('require-subscribers' or something) or 2) support a header within the message itself. The latter option would allow for explicit labeling of "event" messages.

    Let me know if there is some reason you *don't* want to use "publish-subscribe-channel", and if you have a preference for the 2 options I mentioned above, let me know that as well.

    Thanks,
    Mark



    • #3
      I did try the publish-subscribe option (thinking that it should work as you describe). Apologies for not having mentioned this.

      Here's my integration (roughly):

      Code:
       
      <?xml version="1.0" encoding="UTF-8"?>
      <beans xmlns:integration="http://www.springframework.org/schema/integration"
      	xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context"
      	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
      	xmlns:util="http://www.springframework.org/schema/util" xmlns:tool="http://www.springframework.org/schema/tool"
      	xmlns:lang="http://www.springframework.org/schema/lang" xmlns:jms="http://www.springframework.org/schema/integration/jms"
          xmlns:file="http://www.springframework.org/schema/integration/file"
      	xmlns:amq="http://activemq.apache.org/schema/core" xmlns:flex="http://www.springframework.org/schema/flex"
      	xsi:schemaLocation="
          	http://www.springframework.org/schema/util
          	http://www.springframework.org/schem...g-util-3.0.xsd
      		http://www.springframework.org/schema/tool
      		http://www.springframework.org/schem...g-tool-3.0.xsd
      		http://www.springframework.org/schema/lang
      		http://www.springframework.org/schem...g-lang-3.0.xsd
              http://www.springframework.org/schema/beans
              http://www.springframework.org/schem...-beans-3.0.xsd
              http://www.springframework.org/schema/context
      		http://www.springframework.org/schem...ontext-3.0.xsd
              http://www.springframework.org/schema/integration
      		http://www.springframework.org/schem...ration-1.0.xsd
              http://www.springframework.org/schema/integration/file
      		http://www.springframework.org/schem...n-file-1.0.xsd
      		http://www.springframework.org/schema/integration/jms
      		http://www.springframework.org/schem...on-jms-1.0.xsd
      		http://activemq.apache.org/schema/core
      		http://activemq.apache.org/schema/co...core-5.3.0.xsd
      		http://www.springframework.org/schema/flex
              http://www.springframework.org/schem...g-flex-1.0.xsd
              ">
      
      
          <!-- general Spring stuff -->
      	<context:annotation-config />
      	<context:component-scan base-package="c.a.swr.flex.auction" />
      
      
          <!-- bus stuff -->
      	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" p:connectionFactory-ref="connectionFactory" />
      	<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory" p:sessionCacheSize="10" p:cacheProducers="false">
      		<property name="targetConnectionFactory">
      			<amq:connectionFactory   brokerURL="tcp://localhost:61616" />
      		</property>
      	</bean>
      
          <amq:topic id="itemPosted" name="itemPosted" physicalName="itemPosted" />
          <amq:topic id="bidPosted" name="bidPosted" physicalName="bidPosted" />
          <amq:topic id="bidAccepted" name="bidAccepted" physicalName="bidAccepted" />
      
          <bean id="itemFilesMount" class="org.springframework.core.io.FileSystemResource"  >
              <constructor-arg value="#{ systemProperties['user.home']+'/flexCsvFiles' }"/>
          </bean>
      
          <integration:channel id="inboundItemFiles"/>
          <integration:channel id="inboundItemFileStrings"/>
          <integration:channel id="inboundItems"/>
          <integration:publish-subscribe-channel  id="inboundItemsPosted" />
      
      
          <file:inbound-channel-adapter auto-create-directory="true" directory="#{itemFilesMount.file.absolutePath}" channel="inboundItemFiles">
              <integration:poller> <integration:interval-trigger interval="10000"/> </integration:poller>
          </file:inbound-channel-adapter>
          <file:file-to-string-transformer input-channel="inboundItemFiles" output-channel="inboundItemFileStrings" delete-files="true" />
          <integration:transformer input-channel="inboundItemFileStrings" ref="fileToItemTransformer" output-channel="inboundItems"/>
          <integration:service-activator input-channel="inboundItems" ref="itemCreationServiceActivator"/>
          <jms:message-driven-channel-adapter  connection-factory="connectionFactory" destination="itemPosted" channel="inboundItemsPosted" />
      
      	<flex:message-broker services-config-path="/WEB-INF/flex/services-config.xml">
      		<flex:message-service default-channels="my-amf" />
      	</flex:message-broker>
      
          <flex:integration-message-destination
                  channels="my-amf"
                  id="itemPostedDestination"
                  message-channel="inboundItemsPosted"            
                  />
          
         
         
      </beans>
      Note the channel named 'inboundItemsPosted'. It takes the inbound messages from the JMS message-driven channel adapter and passes them on to the integration-message-destination. I debugged this and found that the dispatch(Message<?> message) method of BroadcastingDispatcher contains the following logic:

      Code:
      public boolean dispatch(Message<?> message) {
          boolean dispatched = false;
          int sequenceNumber = 1;
          List<MessageHandler> handlers = this.getHandlers();
          int sequenceSize = handlers.size();
          for (final MessageHandler handler : handlers) {
              final Message<?> messageToSend = (!this.applySequence) ? message
                      : MessageBuilder.fromMessage(message)
                              .setSequenceNumber(sequenceNumber++)
                              .setSequenceSize(sequenceSize)
                              .setCorrelationId(message.getHeaders().getId())
                              .setHeader(MessageHeaders.ID, UUID.randomUUID())
                              .build();
              if (this.taskExecutor != null) {
                  this.taskExecutor.execute(new Runnable() {
                      public void run() {
                          invokeHandler(handler, messageToSend);
                      }
                  });
                  dispatched = true;
              }
              else {
                  boolean success = this.invokeHandler(handler, messageToSend);
                  dispatched = (success || dispatched);
              }
          }
          return dispatched;
      }
      As there are no handlers in the handlers collection, we never get a chance to set dispatched = true, so the method returns false as its success status. That boolean is then returned from protected boolean doSend(Message<?> msg, long timeout) in AbstractSubscribableChannel, which is invoked by public final boolean send(Message<?> msg, long timeout) in AbstractMessageChannel, which in turn hands it back through the other variant, send(Message<?> msg), as the indication of whether the message was sent. Anyway, this bubbles up all the way back to MessageChannelTemplate's doSend method.
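
To make the empty-handler case concrete, here is a stripped-down model of that behavior. It is only a sketch: SimpleHandler, MiniBroadcastingDispatcher and MiniChannel are made-up stand-ins, not the framework classes. It mirrors just the synchronous branch of the loop and a single pass-through layer standing in for the doSend/send chain.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a message handler; NOT the Spring Integration interface.
interface SimpleHandler {
    boolean handle(String message);
}

// Mirrors only the synchronous branch of the dispatch loop quoted above.
class MiniBroadcastingDispatcher {
    private final List<SimpleHandler> handlers = new ArrayList<>();

    void addHandler(SimpleHandler handler) {
        handlers.add(handler);
    }

    boolean dispatch(String message) {
        boolean dispatched = false;
        for (SimpleHandler handler : handlers) {
            boolean success = handler.handle(message);
            dispatched = (success || dispatched);
        }
        return dispatched; // still false if the loop body never ran
    }
}

// Stand-in for the channel layers that pass the dispatcher's result upward unchanged.
class MiniChannel {
    final MiniBroadcastingDispatcher dispatcher = new MiniBroadcastingDispatcher();

    boolean send(String message) {
        return dispatcher.dispatch(message);
    }
}
```

With no handlers registered, send returns false, and the caller has no way to tell "nobody is listening" apart from "every handler failed". That is exactly the ambiguity behind the error.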

      So, I understand why it works that way, normally. Most integrations deal with static components that don't subscribe and disconnect as frequently as users of a web app, so I'm not recommending we change that fundamental behavior. Instead, I like the idea of the opt-in configuration option at the higher level that you suggested. I'm honestly unsure which option would be better, too. Functionality exposed as a message header is definitely the most flexible, since it could be dynamic / arbitrary and still be opt-in.
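
Just to make the header variant tangible, here is a rough sketch of how an opt-in header could behave. Nothing here exists in Spring Integration: the header name "requireSubscribers", and the EventMessage, EventMessageHandler and OptInDispatcher types, are all assumptions invented for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal message with headers; a stand-in, not the framework's Message type.
class EventMessage {
    final String payload;
    final Map<String, Object> headers;

    EventMessage(String payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
    }
}

interface EventMessageHandler {
    void handle(EventMessage message);
}

class OptInDispatcher {
    // Hypothetical header name, chosen only for this sketch.
    static final String REQUIRE_SUBSCRIBERS = "requireSubscribers";

    private final List<EventMessageHandler> handlers = new ArrayList<>();

    void addHandler(EventMessageHandler handler) {
        handlers.add(handler);
    }

    boolean dispatch(EventMessage message) {
        if (handlers.isEmpty()) {
            // Default stays strict: only an explicit Boolean.FALSE header turns
            // "no subscribers" into a successful no-op.
            return Boolean.FALSE.equals(message.headers.get(REQUIRE_SUBSCRIBERS));
        }
        for (EventMessageHandler handler : handlers) {
            handler.handle(message);
        }
        return true;
    }
}
```

Because the existing behavior remains the default, ordinary "document" messages would still fail loudly when nobody is subscribed. Only messages explicitly labeled as events would be dropped quietly.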

      Anyway, all input's greatly appreciated.

      Thanks,
