
  • Random Access Queue

    I'm in the process of extending AbstractMessageChannel to obtain a Random Access Queue, is there something similar under way? Any suggestions on this? Should I publish the code in here?

    It's nothing complicated, though, just a simple wrapper around a Map.

  • #2
    Can you elaborate a bit on the motivation/use-case?

    I'm curious if it's something that could be handled with a ChannelInterceptor or ChannelPurger.



    • #3
      Well, as I said in another post, I'm building a REST-based "service layer" for our in-house framework. In it I can have (at least) 3 types of invocation, plus a follow-up call for the third:

      1) Fire & Forget
      2) Request-and-wait-for-Response
      3) Request-and-get-Id
      4) get the response by id from 3)

      So for 3) and 4) I can have something like

      A) GET /slayer/reports/reportABC -> 8e596fd2-216e-4015-8609-463f3b83a253

      B) GET /slayer/reports -> list of reports on the "reports" queue not yet consumed

      C) GET /slayer/reports/8e596fd2-216e-4015-8609-463f3b83a253 -> the report from A)

      So I have to access the queue in a random way.

      What do you think?
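
      A minimal sketch of calls A), B) and C) in plain Java, with no Spring types involved. The class and method names are hypothetical, and it simplifies the flow by storing the report as soon as the id is issued instead of waiting for a service to produce it later.

      ```java
      import java.util.ArrayList;
      import java.util.LinkedHashMap;
      import java.util.List;
      import java.util.Map;
      import java.util.UUID;

      // Hypothetical stand-in for the "reports" queue; not part of Spring Integration.
      class KeyedReportQueue {

          // Insertion-ordered, so listing shows the oldest entries first.
          private final Map<String, String> pending = new LinkedHashMap<String, String>();

          // A) Store a payload and hand back a generated id.
          synchronized String submit(String payload) {
              String id = UUID.randomUUID().toString();
              pending.put(id, payload);
              return id;
          }

          // B) List the ids that have not been consumed yet.
          synchronized List<String> pendingIds() {
              return new ArrayList<String>(pending.keySet());
          }

          // C) Consume one entry by id; null means nothing is stored under that id.
          synchronized String take(String id) {
              return pending.remove(id);
          }
      }
      ```

      The point of the sketch is only that C) removes by key rather than from the head, which is what a FIFO queue cannot offer.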


      • #4
        BTW, how do I define what type of channel to use? Is it through the datatype attribute?

        <channel id="responseChannel" datatype="XXXXXXXX"/>

        And on a completely different matter: since I updated Spring Integration from trunk today I always get

        BeanDefinitionParsingException: Configuration problem: Unable to locate Spring NamespaceHandler for XML schema namespace []

        Have I missed anything?



        • #5
          The "datatype" attribute is for the accepted types of Message payload (see: ttp://

          When using <channel/> currently it will always create a SimpleChannel (with different internal queue based on the capacity you provide). If you want to use a PriorityChannel, you would need to define a generic <bean/> at this point (the namespace support for that is on the M3 roadmap).

          As far as the XSD resolution, can you post your xmlns declarations from the top-level element? (or try this... it was temporarily expecting "spring-integration-adapters-1.0.xsd" but now should be just "spring-integration-1.0.xsd").



          • #6
            Ok, no problem with that. But I have another question. (moved to here)
            Last edited by amsmota; Mar 3rd, 2008, 10:41 AM.


            • #7
              I don't know if it's of general interest, but here's a draft of a RandomAccessChannel, that wraps a Random Access Queue.

              I have 3 questions about this, though:

              1) What should I do with the timeouts (see the TODOs)?
              2) Should I enforce Map<String, Message<?>> instead of Map<Object, Message<?>>?
              3) What is the Java version dependency of Spring Integration? I was thinking about using a ConcurrentMap instead of a Map, but that implies Java 1.5.

              Here's the code so far:

              import java.util.Hashtable;
              import java.util.Iterator;
              import java.util.List;
              import java.util.Map;
              import java.util.Set;
              import java.util.Vector;
              import org.springframework.integration.message.Message;
              import org.springframework.integration.message.selector.MessageSelector;
              import org.springframework.util.Assert;

              // AbstractMessageChannel and DispatcherPolicy are not imported; this assumes
              // the class is placed in the same package as those two types.
              public class RandomAccessChannel extends AbstractMessageChannel {
                  // Messages keyed by the String form of their correlation id.
                  private final Map<Object, Message<?>> queue;

                  public RandomAccessChannel() {
                      this(null, null);
                  }
                  public RandomAccessChannel(Map<Object, Message<?>> queue) {
                      this(null, queue);
                  }
                  public RandomAccessChannel(DispatcherPolicy dispatcherPolicy) {
                      this(dispatcherPolicy, null);
                  }
                  public RandomAccessChannel(DispatcherPolicy dispatcherPolicy, Map<Object, Message<?>> queue) {
                      super((dispatcherPolicy != null) ? dispatcherPolicy : new DispatcherPolicy());
                      this.queue = (queue != null) ? queue : new Hashtable<Object, Message<?>>();
                  }

                  protected boolean doSend(Message<?> message, long timeout) {
                      // TODO: what to do with the timeout?
                      return doSend(message);
                  }

                  protected Message<?> doReceive(long timeout) {
                      // TODO: what to do with the timeout?
                      synchronized (this.queue) {
                          // Take an arbitrary entry; return null rather than fail when empty.
                          Iterator<Object> keys = this.queue.keySet().iterator();
                          return keys.hasNext() ? doReceive(keys.next()) : null;
                      }
                  }

                  public List<Message<?>> clear() {
                      synchronized (this.queue) {
                          List<Message<?>> list = new Vector<Message<?>>(this.queue.values());
                          this.queue.clear(); // assumed: the posted draft never emptied the map
                          return list;
                      }
                  }

                  public List<Message<?>> purge(MessageSelector selector) {
                      List<Message<?>> list = new Vector<Message<?>>();
                      synchronized (this.queue) {
                          // The loop body is missing from the post. Assumed behaviour: remove
                          // and return every message the selector does not accept.
                          for (Iterator<Message<?>> it = this.queue.values().iterator(); it.hasNext();) {
                              Message<?> message = it.next();
                              if (!selector.accept(message)) {
                                  it.remove();
                                  list.add(message);
                              }
                          }
                      }
                      return list;
                  }

                  public boolean doSend(Message<?> message) {
                      Assert.notNull(message, "'message' must not be null");
                      this.queue.put(message.getHeader().getCorrelationId().toString(), message);
                      return true;
                  }
                  public Message<?> doReceive(Object id) {
                      return this.queue.remove(id);
                  }
                  public Message<?> peek(Object id) {
                      return this.queue.get(id);
                  }
                  public boolean containsMessageForId(Object key) {
                      return this.queue.containsKey(key);
                  }
                  // Despite the name, this returns the key set, not an Iterator.
                  public Set<Object> iterator() {
                      return queue.keySet();
                  }
              }
              Last edited by amsmota; Mar 4th, 2008, 09:57 AM.
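
              On question 1), one possible way to honour a timeout on a receive-by-id is to wait on the store's monitor until the key shows up or the deadline passes. The sketch below is plain Java with hypothetical names (TimedKeyedStore, take); it is not how Spring Integration handles timeouts, only an illustration of the idea, and it assumes timeouts small enough that the deadline arithmetic does not overflow.

              ```java
              import java.util.HashMap;
              import java.util.Map;

              // Hypothetical keyed store; shows one way a receive-by-id could honour a timeout.
              class TimedKeyedStore<V> {

                  private final Map<String, V> entries = new HashMap<String, V>();

                  // Store a value and wake any thread waiting in take().
                  synchronized void put(String key, V value) {
                      entries.put(key, value);
                      notifyAll();
                  }

                  // Remove and return the value for key, waiting at most timeoutMillis for it
                  // to arrive. A timeout of 0 or less never waits. Returns null on timeout or
                  // if the waiting thread is interrupted.
                  synchronized V take(String key, long timeoutMillis) {
                      long deadline = System.currentTimeMillis() + timeoutMillis;
                      while (!entries.containsKey(key)) {
                          long remaining = deadline - System.currentTimeMillis();
                          if (remaining <= 0) {
                              return null;
                          }
                          try {
                              wait(remaining);
                          } catch (InterruptedException e) {
                              Thread.currentThread().interrupt(); // keep the interrupt visible
                              return null;
                          }
                      }
                      return entries.remove(key);
                  }
              }
              ```

              With a timeout of 0 this behaves like the non-blocking remove in the draft above, so the same method could serve both styles of caller.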


              • #8
                I'm not sure, but it smells to me like you're trying to give the channel a responsibility that is not related to channeling.

                If you create a registry that stores the state of the reports, you can update that registry from a ChannelInterceptor. That would decouple your business logic (keeping track of the state of the message) from the Channel.

                In essence you would just copy the code that is your business from the RAQ to your registry and glue it to the channel with a ChannelInterceptor. Et voila, your business code is independent from Spring again.

                I'd have to check, but maybe there is already a MethodInvokingChannelInterceptor?
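
                A rough sketch of that split, assuming a plain Java registry plus a thin glue class. All names here are hypothetical, and SendHook is only a stand-in for whatever callback the real ChannelInterceptor exposes; its actual signature is not reproduced.

                ```java
                import java.util.HashSet;
                import java.util.Map;
                import java.util.Set;
                import java.util.concurrent.ConcurrentHashMap;

                // Plain Java registry with no Spring dependency (hypothetical name).
                class ReportRegistry {
                    private final Map<String, Object> reports = new ConcurrentHashMap<String, Object>();

                    void store(String correlationId, Object report) {
                        reports.put(correlationId, report);
                    }

                    // Returns null when nothing is stored under that id.
                    Object remove(String correlationId) {
                        return reports.remove(correlationId);
                    }

                    // Snapshot of the ids currently held.
                    Set<String> ids() {
                        return new HashSet<String>(reports.keySet());
                    }
                }

                // Hypothetical callback standing in for a channel interceptor hook.
                interface SendHook {
                    void afterSend(String correlationId, Object payload);
                }

                // Glue: the only class that knows about both the hook and the registry.
                class RegistryHook implements SendHook {
                    private final ReportRegistry registry;

                    RegistryHook(ReportRegistry registry) {
                        this.registry = registry;
                    }

                    public void afterSend(String correlationId, Object payload) {
                        registry.store(correlationId, payload);
                    }
                }
                ```

                Only RegistryHook would need to change if the interceptor API changes; the registry itself stays testable on its own.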


                • #9
                  I think I don't understand your reply. It seems to me that I don't have any business logic in my use case. Let me try to explain myself a little better.

                  I have two queues, a "request" queue and a "response" queue. The user-agent puts a request in the request queue and gets back the messageId of the message generated on that queue. The user can then query the response queue using that messageId whenever he wants. Of course a message with that Id may or may not exist on the response queue at a given time. Or he can obtain a list of all the messages that are in the response queue and see if the message with the corresponding Id already exists in there.

                  Now the message in the request queue is eventually consumed by a service, which does whatever it needs, taking the time it needs (in my case it seems that can be a long period), and when it finishes it puts its results in a message on the response queue, with a correlationId equal to the consumed messageId. Once it's there it can be consumed by the user-agent through the original messageId.

                  It's for this last "response queue" that I need the RAQ channel.

                  So I don't have any "status" of messages, just a response queue with messages to be consumed at any time after they are put there, that can be consumed through a "key".

                  Now I don't know if this is a clear explanation, so bear with me...


                  • #10
                    Yes, the term 'business logic' was out of order. But I do think I understand.

                    The RAQ is responsible for two things:
                    1. transporting messages with the result of the slow service
                    2. holding them for a user agent to query by id

                    I'm looking for a better place for the second responsibility.


                    • #11
                      If I understand correctly, I believe that this issue should address your problem without the need for a new channel implementation:

                      The first draft of that implementation should be available by the end of today (or tomorrow at the latest). I'll post back here and would be very interested in your feedback - if that does in fact address your need.



                      • #12
                        How's the situation on this? I noticed today there are lots of updates in the SVN. (Nevertheless, my channel is working OK for me...)


                        • #13
                          Check out the recently added ResponseCorrelator. There are still a number of possibilities for extending this (see the JIRA comments for a start), but I'm interested in how this may apply to your use case.



                          • #14
                            Hello again:

                            I started testing the ResponseCorrelator today and I'm having a heck of a time doing it. First, let me list the things my RandomAccessChannel was doing that I can't accomplish with ResponseCorrelator.

                            - a non-blocking request-response mechanism. When I try to get a response from the queue and it isn't there, I want to return something similar to a 404 - Not Found, so the user can keep requesting the resource whenever he wants until he gets the response.

                            - an iterator over the messages currently on hold in the queue. That way, a user can query the responseChannel without a msgId, in which case he receives a list of all the msgIds currently on the queue; when working with an HTTP connector, these come as hyperlinks to those msgs (very RESTful).
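
                            The two wishes above could look roughly like this on the HTTP side. This is a sketch with hypothetical names (ResponseView, statusFor, links), independent of ResponseCorrelator and of any real HTTP connector; the base path is just the one used earlier in the thread.

                            ```java
                            import java.util.ArrayList;
                            import java.util.List;
                            import java.util.Map;
                            import java.util.TreeMap;

                            // Hypothetical REST-facing view of a keyed response store.
                            class ResponseView {
                                private final Map<String, String> responses = new TreeMap<String, String>();
                                private final String basePath;

                                ResponseView(String basePath) {
                                    this.basePath = basePath;
                                }

                                synchronized void store(String msgId, String body) {
                                    responses.put(msgId, body);
                                }

                                // Non-blocking lookup: 200 when the response is there, 404 otherwise,
                                // so the caller can simply retry later.
                                synchronized int statusFor(String msgId) {
                                    return responses.containsKey(msgId) ? 200 : 404;
                                }

                                // One link per response currently held, sorted by id.
                                synchronized List<String> links() {
                                    List<String> links = new ArrayList<String>();
                                    for (String msgId : responses.keySet()) {
                                        links.add(basePath + "/" + msgId);
                                    }
                                    return links;
                                }
                            }
                            ```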

                            Besides these "wishes", I haven't even managed to test it with a simple asynchronous request/response test: the ids never match...

                            But I'll keep trying. I was testing with the M3 from the repository; I'll try now with the version I downloaded from the trunk.

                            Thanks a lot for your time.


                            • #15
                              So basically I can rewrite my code to use a ResponseCorrelator instead of my response channel, but then I can't configure it the way I do now

                              <channel id="input"/>
                              <channel id="responses"/>
                              <endpoint input-channel="input" handler-ref="serviceimpl" handler-method="execute" default-output-channel="responses"/>

                              or with my RandomAccessChannel

                              <beans:bean id="responses" class="RandomAccessChannel"/>

                              because I have to hand-code the ResponseCorrelator, which has its own queue. I also added an iterator to ResponseCorrelator, RetrievalBlockingMessageStore, MessageStore and SimpleMessageStore. I miss a peek method, but I can get the same effect by checking for null after ResponseCorrelator.getResponse(key).

                              I'll keep on testing tomorrow.