  • Channel scaling limitations

    I'm building a Spring MVC app and I'm looking to route messages from various server components to individual users. I'm thinking about getting my feet wet with Spring Integration and using that to send messages around the system. I have a comet servlet that is a channel endpoint. So my users are connected to that and waiting for messages. Right now all users get all messages on that channel.

    If I want a message to go to a specific user, do I need a channel per user? Does having that many channels scale if there are thousands of users logged in at a time? Can you create and destroy channels on the fly? Is Spring Integration the right way to go for this scenario?

    Thanks for helping a beginner get going on this.

  • #2
    Have you taken a look at our spring-integration-comet project yet? (it's in the sandbox repository for now):
    http://git.springsource.org/spring-i...egration-comet

    We would love to get your input as that evolves. It sounds like exactly what you need.

    -Mark


    • #3
      Originally posted by Mark Fisher
      Have you taken a look at our spring-integration-comet project yet? (it's in the sandbox repository for now):
      http://git.springsource.org/spring-i...egration-comet

      We would love to get your input as that evolves. It sounds like exactly what you need.

      -Mark
      Yes, I found that project a couple of days ago and have been playing with it quite a bit. Right now I'm actually tearing apart AsyncHttpRequestHandlingMessageAdapter to see exactly how it works and whether I can get it to work for my scenario. I think passing messages to specific users is quite a common use case for comet. I'd love to see Spring improve its support for comet behavior.

      From what I can tell, AsyncHttpRequestHandlingMessageAdapter is basically your comet servlet and an endpoint for 1 channel. So users connecting to this comet servlet are getting all the messages on that channel. That seems to be working fine. But I only want users to get messages that are specific to them. So I think there are 2 possible options.

      Option 1 is to have each user have their own channel and their own AsyncHttpRequestHandlingMessageAdapter. They would have to be created and destroyed at runtime, which I'm not exactly sure how to do and even if I could do it I'm not sure that's the best route.

      Option 2 is having AsyncHttpRequestHandlingMessageAdapter scan the incoming messages and only broadcast the messages directed to the user in the session. So all users are listening to the same channel and inside each message payload is a user identifier or something. If that matches the user in the current http session then continue broadcasting it to the user.
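
      A minimal sketch of Option 2, assuming a hypothetical AddressedMessage type standing in for a payload that carries a user identifier (neither class below is part of Spring Integration or spring-integration-comet). Every connection sees the shared channel's messages, and the filter keeps only those whose user id matches the user in the session:

      ```java
      import java.util.ArrayList;
      import java.util.List;

      // Hypothetical stand-in for a message payload that names its recipient.
      final class AddressedMessage {
          final int userId;
          final String payload;

          AddressedMessage(int userId, String payload) {
              this.userId = userId;
              this.payload = payload;
          }
      }

      // Hypothetical helper: all users listen to one channel, each sees only their own messages.
      final class SharedChannelFilter {
          /** Returns the payloads addressed to the user who owns the current session. */
          static List<String> payloadsFor(int sessionUserId, List<AddressedMessage> channelMessages) {
              List<String> result = new ArrayList<>();
              for (AddressedMessage message : channelMessages) {
                  if (message.userId == sessionUserId) {
                      result.add(message.payload);
                  }
              }
              return result;
          }
      }
      ```

      In the real adapter the session user id would presumably come from the HTTP session rather than from a method argument.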

      I'd love to help out with whatever I can to get comet more integrated into Spring. I think there's a huge benefit of getting these together.


      • #4
        At least from the point of view of the Atmosphere API, the way to do that is described here:
        http://atmosphere-users-mailling-lis...td5721315.html

        Right now, as you have pointed out, we just broadcast any given message to all suspended instances of AtmosphereResource (which essentially equates to all of the "subscribed" clients).

        So basically what would be needed is:

        1. Some logic to track the AtmosphereResource associated with a given user

        2. A way to add some additional routing logic to the actual broadcast so that you could, for example, broadcast to some subset of suspended AtmosphereResources based on some information in the SI Message (a custom userId header, for example).
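
        As a rough illustration of those two pieces, here is a sketch of a registry that tracks connections per user and resolves the targets for a message. UserResourceRegistry is a hypothetical name, and the type parameter R stands in for AtmosphereResource so the sketch compiles without Atmosphere on the classpath. Treating a missing userId header as "send to everyone" is an assumption, not existing behavior:

        ```java
        import java.util.Collections;
        import java.util.HashSet;
        import java.util.Map;
        import java.util.Set;
        import java.util.concurrent.ConcurrentHashMap;

        // Hypothetical registry; R stands in for AtmosphereResource.
        final class UserResourceRegistry<R> {
            private final Map<String, Set<R>> byUser = new ConcurrentHashMap<>();

            /** Step 1: remember which connection belongs to which user. */
            void register(String userId, R resource) {
                byUser.compute(userId, (key, set) -> {
                    if (set == null) {
                        set = ConcurrentHashMap.newKeySet();
                    }
                    set.add(resource);
                    return set;
                });
            }

            /** Forget a connection, dropping the user entry once it has no connections left. */
            void unregister(String userId, R resource) {
                byUser.computeIfPresent(userId, (key, set) -> {
                    set.remove(resource);
                    return set.isEmpty() ? null : set;
                });
            }

            /** Step 2: pick the targets for a message; a null userId means all connections. */
            Set<R> targetsFor(String userId) {
                Set<R> targets = new HashSet<>();
                if (userId == null) {
                    for (Set<R> set : byUser.values()) {
                        targets.addAll(set);
                    }
                } else {
                    targets.addAll(byUser.getOrDefault(userId, Collections.emptySet()));
                }
                return targets;
            }
        }
        ```

        A user with several open browser tabs would simply have several entries in their set.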

        I'm not sure off the top of my head if there's some more idiomatic SI way of doing this (I'm sure Mark will chime in if there is), but my immediate instinct would be to add some extension hooks to AsyncHttpRequestHandlingMessageAdapter. Something like:

        Code:
        protected void onSubscribe(AtmosphereResource<HttpServletRequest, HttpServletResponse> resource)
        That would get called after a successful subscription to the endpoint, where the default implementation would be a no-op.

        Then for selecting a subset of AtmosphereResources to which to send a given Message:

        Code:
        protected Set<AtmosphereResource> selectBroadcastResources(Message<?> message, Broadcaster broadcaster)
        where the default implementation just returns broadcaster.getAtmosphereResources() or equivalent.
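
        To show how the two hooks might fit together, here is a self-contained sketch. Conn, BroadcastAdapter, PerUserAdapter, and the "userId" header name are all hypothetical stand-ins (Conn plays the role of an AtmosphereResource, a plain Map plays the role of the message headers); this is not the adapter's actual code:

        ```java
        import java.util.Collections;
        import java.util.LinkedHashSet;
        import java.util.Map;
        import java.util.Set;

        // Hypothetical stand-in for a suspended AtmosphereResource and the user behind it.
        final class Conn {
            final String user;

            Conn(String user) {
                this.user = user;
            }
        }

        // Hypothetical stand-in for the adapter; only the two hooks mirror the proposal above.
        class BroadcastAdapter {
            private final Set<Conn> subscribed = Collections.synchronizedSet(new LinkedHashSet<>());

            /** Hook called after a successful subscription; a no-op by default. */
            protected void onSubscribe(Conn resource) {
            }

            /** Hook choosing the recipients of one message; everyone by default. */
            protected Set<Conn> selectBroadcastResources(Map<String, Object> headers, Set<Conn> all) {
                return all;
            }

            final void subscribe(Conn resource) {
                subscribed.add(resource);
                onSubscribe(resource);
            }

            /** Takes a snapshot of the subscribers and lets the hook narrow it down. */
            final Set<Conn> resolveTargets(Map<String, Object> headers) {
                Set<Conn> snapshot;
                synchronized (subscribed) {
                    snapshot = new LinkedHashSet<>(subscribed);
                }
                return selectBroadcastResources(headers, snapshot);
            }
        }

        // Subclass that narrows the broadcast when the (hypothetical) "userId" header is present.
        class PerUserAdapter extends BroadcastAdapter {
            @Override
            protected Set<Conn> selectBroadcastResources(Map<String, Object> headers, Set<Conn> all) {
                Object userId = headers.get("userId");
                if (userId == null) {
                    return all;
                }
                Set<Conn> selected = new LinkedHashSet<>();
                for (Conn conn : all) {
                    if (userId.equals(conn.user)) {
                        selected.add(conn);
                    }
                }
                return selected;
            }
        }
        ```

        Keeping the defaults as "no-op" and "everyone" means existing users of the adapter would see no change unless they override a hook.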

        I won't have time to work on this myself until after the holiday break, but feel free to fork the code and give these changes a shot and send me a pull request if you get it working.
        Last edited by jeremyg484; Dec 23rd, 2010, 03:07 PM.


        • #5
          Thanks so much for the response, Jeremy. I hacked together something that seems to work, using a slightly different approach than yours. I didn't touch AsyncHttpRequestHandlingMessageAdapter; instead I changed the broadcast method in HttpMessageBroadcaster. Mine only works for users authenticated through Spring Security, and only for messages whose payload is my UserMessage bean, which has a userId field. I could easily combine your broadcast with mine so it handles all cases, but for now this is the only case I need.

          Code:
            protected void broadcast(AtmosphereResource<?, ?> resource, AtmosphereResourceEvent event) {
              HttpServletRequest request = (HttpServletRequest) resource.getRequest();
              HttpServletResponse response = (HttpServletResponse) resource.getResponse();
              SecurityContext securityContext = (SecurityContext)request.getSession().getAttribute(HttpSessionSecurityContextRepository.SPRING_SECURITY_CONTEXT_KEY);
              Integer userId = null;
              if (securityContext != null && securityContext.getAuthentication() != null && securityContext.getAuthentication().getPrincipal() instanceof User) {
                userId = ((User)securityContext.getAuthentication().getPrincipal()).getUserId();
              }
              else {
                throw new MessagingException("No authenticated user for user broadcast!");
              }
          
              List<HttpBroadcastMessage> messages = new ArrayList<HttpBroadcastMessage>();
              if (event.getMessage() instanceof List) {
                List messageBacklog = (List) event.getMessage();
                if (!messageBacklog.isEmpty() && messageBacklog.get(0) instanceof HttpBroadcastMessage) {
                  for (Object message : messageBacklog) {
                    // equals() rather than ==, in case getUserId() returns a boxed Integer
                    if (message instanceof HttpBroadcastMessage && ((HttpBroadcastMessage) message).getMessage().getPayload() instanceof UserMessage && userId.equals(((UserMessage) ((HttpBroadcastMessage) message).getMessage().getPayload()).getUserId())) {
                      HttpBroadcastMessage httpBroadcastMessage = (HttpBroadcastMessage)message;
                      messages.add(httpBroadcastMessage);
                    }
                  }
                }
              }
              else if (HttpBroadcastMessage.class.isAssignableFrom(event.getMessage().getClass()) && ((HttpBroadcastMessage) event.getMessage()).getMessage().getPayload() instanceof UserMessage && userId.equals(((UserMessage) ((HttpBroadcastMessage) event.getMessage()).getMessage().getPayload()).getUserId())) {
                messages.add((HttpBroadcastMessage)event.getMessage());
              }
              else {
                throw new MessagingException("Illegal message for user broadcast!");
              }
          
              if (messages.isEmpty()) {
                log.info("User "+userId+" has no messages.");
                return;
              }
          
              Message<?> broadcastMessage = mergeMessagesForBroadcast(messages);
          
              // The rest of the code after this point is the same as Jeremy Grelle's version.
              try {
                response.getOutputStream();
              }
              catch (Exception ex) {
                throw new MessagingException("Cannot get the Servlet OutputStream for delivering async Message to browser client.  " + "Ensure that you've set 'useStreamForFlushingComments' to true in the AtmosphereServlet.", ex);
              }
          
              HttpMessageBroadcasterResponseWrapper responseWrapper = new HttpMessageBroadcasterResponseWrapper(response);
              messageMapper.writeMessage(request, responseWrapper, broadcastMessage, true, headerMapper);
              try {
                response.getOutputStream().write(responseWrapper.toByteArray());
                if (log.isInfoEnabled()) {
                  log.info("Wrote " + responseWrapper.toByteArray().length + " bytes to response.");
                }
                response.getOutputStream().flush();
              }
              catch (IOException ex) {
                throw new MessagingException("Failed to write async Message to browser client.", ex);
              }
          
              Boolean resumeOnBroadcast = (Boolean) request.getAttribute(AtmosphereServlet.RESUME_ON_BROADCAST);
              if (resumeOnBroadcast != null && resumeOnBroadcast) {
                resource.resume();
              }
            }
          Not sure if my way of changing the broadcaster is a good way of doing it, so I think I'll explore your suggestions a bit more. The one annoying thing is that the BroadcasterCache isn't working. I see your comments at the top saying it's not working and that you have a message queue and threshold as a temporary solution. I'm trying to figure out why the cache isn't working in your code. Any suggestions on where I should look?


          • #6
            After more experimenting I'm now using your HttpMessageBroadcaster instead of the UserHttpMessageBroadcaster from my previous post. To get spring-integration-comet to work with per-user messages, I just changed AsyncHttpRequestHandlingMessageAdapter's handleMessage method to include the following code, which builds the set of resources to send the message to. If the payload of the incoming message is a UserMessage then I know it has a userId field, and I compare that to the Spring Security user id in the session. Pretty simple.

            The biggest problem I've been having is with the BroadcasterCache. I've spent hours trying to figure out what's going on without any success. Any suggestions? Thanks!

            Code:
                  Set<AtmosphereResource<?,?>> resources = new HashSet<AtmosphereResource<?,?>>();
                  if (httpMessage.getMessage().getPayload() instanceof UserMessage) {
                    int messageUserId = ((UserMessage) httpMessage.getMessage().getPayload()).getUserId();
                    for (AtmosphereResource<?,?> resource : broadcaster.getAtmosphereResources()) {
                      if (messageUserId == ((User)((SecurityContext)((HttpServletRequest)resource.getRequest()).getSession().getAttribute(HttpSessionSecurityContextRepository.SPRING_SECURITY_CONTEXT_KEY)).getAuthentication().getPrincipal()).getUserId()) {
                        resources.add(resource);
                      }
                    }
                  }
                  else {
                    resources.addAll(broadcaster.getAtmosphereResources());
                  }
            
            ...
            
                  Future future = broadcaster.broadcast(broadcastMessages, resources);
