
  • Channel and message transformation question

    I have been thinking about the following problem for a few days but I couldn't find any answers (although I have the book Enterprise Integration Patterns). So I am trying the Spring forum.

    I have a system that can be compared to a production line in a factory, transforming data and calculating new data. This system has to be very flexible, so components have to be loosely coupled. The system allows asynchronous communication (it doesn't depend on calculated results), and that makes threading very easy (also very important).

    I came up with a message/channel approach. One (or more) components generate messages and post those messages in a channel. The receiver of the channel could load the required data and post the result in the following channel. The next receiver could do some heavy calculation on this data and post the result in the following channel, and so on. This approach makes it easy to connect components in different ways, e.g. I could send the same message to a channel splitter so that a lot of channel endpoints receive the message. This is what I need.

    I was very happy with this design, but there is a big problem: it looks like those components could be connected in different ways, but they sometimes depend on each other. A receiver of a message could depend on data calculated by the sender; this makes it difficult to connect them in different ways. And how could new data (that is not part of the original message) be transported to a receiver (I calculate a lot of new data that is very specific and only specialized receivers know what to do with it)? I could use a map or wrap the old message with a new one and add new fields to the message. Now the receiver of that transformed message has to cast the message to the transformed message to receive those new fields. This doesn't make a great design imho.

    Should I drop the 'one fits all' message channel and create a new channel and message for every new type of message? Who has any experience with new calculated data in message channels?

    [edit]
    I can't create a message that has all the fields for all new calculated data. A single message could be sent to different receivers that all calculate different data and require different fields.

  • #2
    OK, let's see what I can contribute to your particular problem. First of all we need to find the most abstract level, so we can generalize most of the existing solutions to that point and check what matches and what is not helpful.

    So let's start with the things you have provided:

    I have a system that can be compared to a production line in a factory, transforming data and calculating new data. This system has to be very flexible, so components have to be loosely coupled. The system allows asynchronous communication (it doesn't depend on calculated results), and that makes threading very easy (also very important).
    We have several interesting catch phrases here.

    1. Asynchronous communication -> Event driven design / Event driven distributed networks -> (synchronization -> (Queues | clock tick | synchronization event | semaphore) | Commands | Action | Entity values / Representation)

    2. comparable to a production line in a factory -> (Processing stations | transport | Control -> finite state machine -> determinism -> cycle prevention | Flow -> Workflow )

    3. system needs to be very flexible -> components loosely coupled -> determinism -> Cycle prevention

    4. threading very important -> synchronisation -> (above)


    message/channel approach
    Abstraction:
    * message -> information -> immaterial thing -> object
    * channel -- associated with --> road / flow / bidirectional vs. unidirectional

    I was very happy with this design, but there is a big problem: it looks like those components could be connected in different ways, but they sometimes depend on each other.
    Depending on each other -> states -> finite state machine | workflow | processing steps | action hierarchy -> composition | aggregation | association | specialisation | generalisation

    A receiver of a message could depend on data calculated by the sender; this makes it difficult to connect them in different ways.
    Key requirements:

    * receiver of message could depend on data calculated by a sender
    -- generalize --> * one processing unit may depend on output of other processing unit

    * Connecting dependent receivers needs to be easy
    -- generalize --> * interconnecting processing units needs to be easy

    'easy' refers to complexity management, which means: Composite, Hierarchies, Grouping, Semantic checking (work plan satisfied in terms of dependencies)

    And how could new data (that is not part of the original message) be transported to a receiver (I calculate a lot of new data that is very specific and only specialized receivers know what to do with it)?
    Indicates a need for specialisation -> Polymorphism, Casting, Extension, Referred Objects, Associated Objects, Meta state etc.

    I could use a map or wrap the old message with a new one and add new fields to the message. Now the receiver of that transformed message has to cast the message to the transformed message to receive those new fields. This doesn't make a great design imho.
    I agree.

    Should I drop the 'one fits all' message channel and create a new channel and message for every new type of message? Who has any experience with new calculated data in message channels?
    OK, here is what I think:

    From the above shoot-from-the-hip analysis I got the following picture:

    We are talking about an object-oriented system (objects cooperate, and special objects have special behaviours). The objects do not exchange messages but information, so we should stop talking about messages here, since that refers to an unnecessary detail of the model.

    I also have a problem with the term information in this context. Information can be anything, and we are talking about information for a certain task. Since I used the terms work and workflow, we might apply the language of workflow systems.

    Let's use these terms:
    - Unit of Work, a.k.a. WorkUnit, instead of message
    - Processing Unit instead of receiver / sender
    - Processing instead of transformation / creation.

    There are some patterns related to workflow out there, but the ones I studied were poorly written, though very interesting. Sadly that was also about two years ago, so I have no concrete memory of them, and I am too lazy to skim through the resources right now. I don't think we need to do this anyway, since those patterns are mostly only interesting for learning new words and detail concepts; the rest is mostly just specialisation of things we already know.

    Back to work:

    Let's convert the message/channel picture to the workflow area:

    A WorkUnit is composed of information representing a certain state -> stateful WorkUnit.
    A WorkUnit is processed by Processing Units (processing is a transition from one state to another / function / transformation).
    A Processing Unit is only interested in certain types of WorkUnits -> Filtering | Selecting | Concurrency problem
    A Processing Unit may depend on the work of another Processing Unit -> Special state of a WorkUnit, dependency graph, cycle-free, stratification.
    Processing Units must be easy to place and configure.
    Multithreading scenario: more than one Processing Unit may work on different WorkUnits -> Synchronization, dispatching, queue, stack etc.

    Assumption: since you said asynchronous, I guess you are not trying to add timeliness to the system (adding timing constraints like deadlines, which would require scheduling).

    So let's follow the XP motto and think of the simplest solution we can think of:

    WorkUnit:
    -----------

    Needs to represent several states + flexible content
    From Odell we know that a state is nothing more than an attribute, and attributes can be expressed by associated objects (which can be a property, too) and by specialisation (polymorphism). I guess we want to make it easy, so let's go for a property here.

    I faced a similar problem with a measurement framework for code quality, where I used polymorphism of Result objects (like TypeReferringResult etc.). But we are lazy, so let's go for a String variable:

    Code:
    interface WorkUnit {
       String getType();
    }
    From the above analysis we learned the following:

    Originally posted by myself
    Indicates a need for specialisation -> Polymorphism, Casting, Extension, Referred Objects, Associated Objects, Meta state etc.
    The best would be to use associated objects in conjunction with extension (allowing things to be added).

    I am currently facing a similar problem with a document repository (also workflow related), where I need to map different types of documents to a meta document format. I simply used an XML representation, but this is an implementation detail. The 'what' part looks like this:

    Code:
    interface IDocument {
        DocumentType getType(); // enum constant
        String getContent(String section); // string representation of the section
    }
    I guess we can use this, too. Since we are lazy and we only talk about the Java world, we are going for serialisation. The final WorkUnit looks like this:

    Code:
    interface WorkUnit {
        String getType();
        Object getInformation(String informationSection);
    }
    We might go for <T extends Information> T getInformation(Class<T> type); instead, but we are lazy... .
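A minimal sketch of the type-keyed variant mentioned above, assuming plain Java generics. The names `Information`, `SimpleWorkUnit`, `LoadedText` and `WordCount` are hypothetical and only serve the illustration; the point is that a receiver can ask for the section it understands without casting.

```java
import java.util.HashMap;
import java.util.Map;

// Marker interface for anything that can be attached to a work unit (hypothetical).
interface Information {}

// Two illustrative information sections (hypothetical names).
final class LoadedText implements Information {
    final String text;
    LoadedText(String text) { this.text = text; }
}

final class WordCount implements Information {
    final int count;
    WordCount(int count) { this.count = count; }
}

// A work unit that stores at most one section per information class.
final class SimpleWorkUnit {
    private final String type;
    private final Map<Class<? extends Information>, Information> sections = new HashMap<>();

    SimpleWorkUnit(String type) { this.type = type; }

    String getType() { return type; }

    // Attaches (or replaces) the section stored under the given class.
    <T extends Information> void putInformation(Class<T> key, T value) {
        sections.put(key, value);
    }

    // Returns the section of the requested class, or null if none was attached.
    <T extends Information> T getInformation(Class<T> key) {
        return key.cast(sections.get(key));
    }
}
```

A receiver that only understands `WordCount` would call `unit.getInformation(WordCount.class)` and ignore every other section, so new sections can be added without touching existing receivers.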


    Now we need to deal with the channel replacement. We need a dispatcher / manager that dispatches a work unit to an appropriate (note I wrote 'an') ProcessingUnit.

    Code:
    interface WorkUnitDispatcher {
         WorkUnit dispatch(WorkUnit workUnit);
         void register(ProcessingUnit unit);
         boolean isRegistered(ProcessingUnit unit);
         void deregister(ProcessingUnit unit);
    }
    So we have an easy WorkUnitDispatcher whose task is to dispatch a given WorkUnit to the best ProcessingUnit interested in this kind of work unit.

    Next thing is our ProcessingUnit. In the above type we make the ProcessingUnit responsible for saying what kind of WorkUnit it is interested in. Since we are lazy, we don't use extensive pre-processing to map a ProcessingUnit to WorkUnit types. Let's use this one (I used this for the code quality measuring tool, too):

    Code:
    interface ProcessingUnit {
        boolean isProcessable(WorkUnit workUnit);
        WorkUnit process(WorkUnit workUnit);
    }
    So the dispatcher simply iterates over all registered ProcessingUnits, and if it finds one saying that the given WorkUnit is processable by it, the dispatcher simply passes the WorkUnit to that ProcessingUnit and returns the resulting work unit. If no ProcessingUnit is able to process a WorkUnit, we know we have a problem and that the WorkUnit is not processable by the system anymore.
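The dispatching rule just described could be sketched as follows. This is only an illustration under a few assumptions: the class name `SimpleWorkUnitDispatcher` is hypothetical, "best" is simplified to "first registered unit that accepts the work unit", and `WorkUnitNotProcessableException` is modelled as an unchecked exception.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

interface WorkUnit {
    String getType();
}

interface ProcessingUnit {
    boolean isProcessable(WorkUnit workUnit);
    WorkUnit process(WorkUnit workUnit);
}

// Thrown when no registered ProcessingUnit accepts a work unit (unchecked by assumption).
class WorkUnitNotProcessableException extends RuntimeException {
    WorkUnitNotProcessableException(String message) { super(message); }
}

// First-match dispatcher: registration order decides which unit wins.
class SimpleWorkUnitDispatcher {
    private final List<ProcessingUnit> units = new CopyOnWriteArrayList<>();

    void register(ProcessingUnit unit) { units.add(unit); }
    boolean isRegistered(ProcessingUnit unit) { return units.contains(unit); }
    void deregister(ProcessingUnit unit) { units.remove(unit); }

    // Passes the work unit to the first unit that accepts it and returns the result.
    WorkUnit dispatch(WorkUnit workUnit) {
        for (ProcessingUnit unit : units) {
            if (unit.isProcessable(workUnit)) {
                return unit.process(workUnit);
            }
        }
        throw new WorkUnitNotProcessableException(
                "no ProcessingUnit accepts type " + workUnit.getType());
    }
}
```

The copy-on-write list is one possible choice that lets worker threads iterate while units are registered or removed; any other thread-safe collection would do.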

    ----
    With the above solution we can process a single work unit by doing the following:

    Code:
    WorkUnit process(WorkUnit workUnit) {
        try {
           while (true) {
              // dispatch() is the method declared on WorkUnitDispatcher
              workUnit = workUnitDispatcher.dispatch(workUnit);
           }
        }
        catch (WorkUnitNotProcessableException e) {
             // no ProcessingUnit accepts the unit any more: return its last state
             return workUnit;
        }
    }
    Since we want to have concurrency and multi-threading we need a multi-thread version:

    That means we need a kind of concurrent queue (FIFO Stack/Queue). So it goes like this:

    Code:
    class ProcessingThread extends Thread {
         // concurrentQueue and workUnitDispatcher are shared collaborators (fields omitted)
         public void run() {
              while (true) {
                 processNextWorkUnit();
              }
         }
         void processNextWorkUnit() {
              WorkUnit unit = concurrentQueue.getNextWorkUnit();
              try {
                   unit = workUnitDispatcher.dispatch(unit);
                   concurrentQueue.addWorkUnit(unit);
              }
              catch (WorkUnitNotProcessableException e) {
                   handleErrorDueLockingOrWhatEver();
              }
         }
    }
    So now we can process a vast number of WorkUnits using concurrent processing by a flexible 'web' of ProcessingUnits (you might set up such a web using XML / a Spring definition / a contribution or plugin framework).

    So what is left?

    Since we want to be deterministic we need an end and a beginning and also cycle prevention, but remember we are dead lazy.

    The beginning is simple: just add a new WorkUnit to the concurrentQueue and watch a worker thread coming along and picking it up. The end is now a problem. We need an end state for the processing. One idea would be to abuse our ProcessingUnit model and add a processing unit that returns a null object when it processes a WorkUnit in an end state. That would work, but we are architects and not a bunch of kids playing in a sandbox. So we would like to have something better.

    Since everything is connected via the concurrentQueue, we just add another responsibility to it: passing every work unit to a matching WorkUnitConsumer. If no consumer for a given WorkUnit exists, the work unit is placed in the queue for further processing.

    The concurrentQueue gets renamed to WorkUnitQueue and gets the following methods:

    Code:
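    class WorkUnitQueue {
        // the concrete collection types are an assumption; any thread-safe ones will do
        private final Queue<WorkUnit> concurrentQueue = new ConcurrentLinkedQueue<WorkUnit>();
        private final List<WorkUnitConsumer> registeredConsumers = new CopyOnWriteArrayList<WorkUnitConsumer>();

        public void registerConsumer(WorkUnitConsumer consumer) {
            registeredConsumers.add(consumer);
        }
        public void addWorkUnit(WorkUnit workUnit) {
            for (WorkUnitConsumer consumer : registeredConsumers) {
                 if (consumer.isInterestedIn(workUnit)) {
                    consumer.consume(workUnit);
                    return;
                 }
            }
            concurrentQueue.add(workUnit);
        }
    }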
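To make the end-state idea concrete, here is a small self-contained sketch of such a queue together with a consumer that takes finished work units out of circulation. `EndStateConsumer`, `pollNextWorkUnit` and the type string "indexed" are assumptions made up for this illustration.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;

interface WorkUnit {
    String getType();
}

interface WorkUnitConsumer {
    boolean isInterestedIn(WorkUnit workUnit);
    void consume(WorkUnit workUnit);
}

// Collects work units whose type marks them as finished (the type string is an assumption).
class EndStateConsumer implements WorkUnitConsumer {
    final List<WorkUnit> finished = new CopyOnWriteArrayList<>();

    public boolean isInterestedIn(WorkUnit workUnit) {
        return "indexed".equals(workUnit.getType());
    }

    public void consume(WorkUnit workUnit) {
        finished.add(workUnit);
    }
}

class WorkUnitQueue {
    private final Queue<WorkUnit> pending = new ConcurrentLinkedQueue<>();
    private final List<WorkUnitConsumer> consumers = new CopyOnWriteArrayList<>();

    void registerConsumer(WorkUnitConsumer consumer) { consumers.add(consumer); }

    // Hands the unit to the first interested consumer, otherwise queues it for more processing.
    void addWorkUnit(WorkUnit workUnit) {
        for (WorkUnitConsumer consumer : consumers) {
            if (consumer.isInterestedIn(workUnit)) {
                consumer.consume(workUnit);
                return;
            }
        }
        pending.add(workUnit);
    }

    // Returns the next pending unit, or null if nothing is waiting.
    WorkUnit pollNextWorkUnit() { return pending.poll(); }
}
```

With this in place a work unit of type "indexed" never re-enters the queue, while every other unit stays available to the worker threads.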
    Now the only thing left in terms of determinism is cycle prevention. We could go for a stratification approach and test whether the given ProcessingUnits form a cycle-free web, but hey, we are lazy, and it might also be worth using cycles to design recursive WorkUnit processing, so we are going for an optimistic approach (testing beforehand is pessimistic and means extra effort, and we are lazy).

    So what can we do? Let's take a look at other areas where cycle prevention is done optimistically. The first example I can think of is the internet! Yeah, the internet was made by some smart and very lazy dudes. They were quite optimistic in terms of routing. They use something like a hop counter (the TTL field in IPv4, the hop limit in IPv6). Every packet that is delivered by the net carries a counter that is decremented by each router the packet passes through. When the counter reaches zero, the packet is thrown away.

    So let's go for a hop counter. If a logical(!) WorkUnit has been processed 100 times, we expect it to be in a livelock situation (everything moves but nothing gets any further), and it gets thrown away. So where do we want to implement it? Since we need a single point of responsibility, that point is the adding of a WorkUnit to the queue.

    But before we start adding additional logic to the WorkUnitQueue's add method, we think twice and do it the lazy way. Before we specialize we compose, and before we compose we aggregate. And so we remember the WorkUnitConsumer type. Let's just add a consumer for logical(!) WorkUnits being in a livelock:

    Code:
    class LiveLockedWorkUnitConsumer implements WorkUnitConsumer {
        public boolean isInterestedIn(WorkUnit workUnit) {
            // a unit that has been processed 100 times is assumed to be livelocked
            return workUnit.getHopCounter() >= 100;
        }
        public void consume(WorkUnit workUnit) {
            logTheLiveLockSituation(workUnit);
            doSomeErrorHandling();
        }
    }

    OK, this is a neat solution. But think again. A processing step of a certain logical WorkUnit might take ages, whereas another is done on the fly. We wouldn't want to take out the fast one after just 100 iterations (or 2 seconds) while the other one spends 2 hours of processing time before being taken out. So instead of hops we use a time-based approach, and we are lucky: we can simply specify a deadline timestamp. Every logical WorkUnit that spends more time than its specified maximum is taken out of the queue and logged. So we need to modify our consumer:


    Code:
    class LiveLockedWorkUnitConsumer implements WorkUnitConsumer {
        public boolean isInterestedIn(WorkUnit workUnit) {
            // the deadline is an absolute timestamp in milliseconds
            return workUnit.getDeadline() < System.currentTimeMillis();
        }
        public void consume(WorkUnit workUnit) {
            logTheLiveLockSituation(workUnit);
            doSomeErrorHandling();
        }
    }
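A self-contained variant of the deadline check, written so that it can be tried without waiting for real time to pass. The injectable clock (`LongSupplier`), the class name `DeadlineConsumer` and the `expired` list are assumptions for this sketch; in production one would pass `System::currentTimeMillis` as the clock.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.LongSupplier;

// Only the deadline matters for this sketch (absolute time in epoch milliseconds).
interface WorkUnit {
    long getDeadline();
}

interface WorkUnitConsumer {
    boolean isInterestedIn(WorkUnit workUnit);
    void consume(WorkUnit workUnit);
}

// Takes expired work units out of circulation; the clock is injectable for tests.
class DeadlineConsumer implements WorkUnitConsumer {
    private final LongSupplier clock;
    final List<WorkUnit> expired = new CopyOnWriteArrayList<>();

    DeadlineConsumer(LongSupplier clock) { this.clock = clock; }

    // A unit is expired once the current time is strictly past its deadline.
    public boolean isInterestedIn(WorkUnit workUnit) {
        return workUnit.getDeadline() < clock.getAsLong();
    }

    public void consume(WorkUnit workUnit) {
        expired.add(workUnit); // a real system would log and run error handling here
    }
}
```

Because the check only runs when a unit is added back to the queue, a single very long processing step is not interrupted; the unit is simply dropped the next time it passes through.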

    So in the end we got a timing specification, we have load balancing since we got threading (meaning time-sharing-style scheduling), and we have:

    * a scalable approach -> a faster machine results in faster processing
    * a flexible system which can easily be configured using Spring -> just add additional consumers to the WorkUnitQueue object and additional ProcessingUnits to the WorkUnitDispatcher.
    * easy livelock and deadlock prevention -> a consumer handles timeouts, and the queue handles WorkUnits that are no longer processable by the web of ProcessingUnits.

    And most importantly, we didn't receive the kiss of death, since everything is dead simple and neat.


    That's just my 2+x cents,

    Cheers,

    Martin (Kersten)

    PS: It was interesting, so I don't mind the time I spent writing this vast stuff; nice training, too... .
    PPS: Uff, too much to proofread, must get some sleep boys... sorry.



    • #3
      Boy if I think twice, this looks like the MVC framework. Just filters and pre/post processors + a bunch of validators... .



      • #4
        I'll read it first thing in the morning (I was hoping you would write something).

        btw: I don't have any problems with the rest of the design and the threading part (I'm a big fan of Doug Lea's concurrency library). I'm only interested in the message transformation part.



        • #5
          :lol: Hehe.

          Another addition, if you go for polymorphism for your WorkUnits (would make more sense in the long run), you might use the implementation class as a type string replacement:

          instead of: workUnit.getType().equals("FinancePreProcessedData");
          it would be: workUnit.getClass().getName().equals("org.mypackage.FinancePreProcessedData")
          or you go for direct class processing like instanceof or something similar. Anything goes... .
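The instanceof variant could look roughly like this. `FinancePreProcessedData` is the name from the example above; `RawFinanceData`, `FinanceReportUnit` and the `total` field are hypothetical and only there to make the sketch complete.

```java
// Marker base type for all work units (hypothetical).
interface WorkUnit {}

// A specialised work unit carrying its own calculated data.
final class FinancePreProcessedData implements WorkUnit {
    final double total;
    FinancePreProcessedData(double total) { this.total = total; }
}

// A different kind of work unit that the report unit must not accept.
final class RawFinanceData implements WorkUnit {}

// A processing unit that selects its input by class instead of by type string.
final class FinanceReportUnit {
    boolean isProcessable(WorkUnit workUnit) {
        return workUnit instanceof FinancePreProcessedData;
    }

    String process(WorkUnit workUnit) {
        // Safe after the isProcessable check; the cast is confined to this one place.
        FinancePreProcessedData data = (FinancePreProcessedData) workUnit;
        return "total=" + data.total;
    }
}
```

Compared with comparing class names as strings, instanceof also accepts subclasses of `FinancePreProcessedData` and survives a package rename.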


          Cheers,

          Martin (Kersten)



          • #6
            I have read your reply, but it doesn't answer my question. I have been thinking about it last night and this morning for quite some time, and finally came to the conclusion that I don't need a single type of channel (a channel that accepts all kinds of messages).

            You could see a piece of functionality that adds data to a specific message as a single messaging subsystem that requires its own message type and channel (which only allows messages of that message type). The components that act on that message type can be replaced, but I can't swap them for components that need other messages.

            So: every system that needs its own data is going to have its own channels and messages. Messages of different types can't flow through the channels anymore.

            And of course I can reuse a lot of functionality by using polymorphism (generics and subclassing).
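One way to read this conclusion in code, as a rough sketch only: a single generic channel class is reused, but each subsystem instantiates it with its own message type, so the compiler keeps the subsystems apart. All class names here (`Channel`, `DocumentMessage`, `AnalyzedDocumentMessage`, `RankingMessage`) are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// One channel implementation, parameterised by the message type of its subsystem.
final class Channel<M> {
    private final BlockingQueue<M> queue = new LinkedBlockingQueue<>();

    void send(M message) { queue.add(message); }

    // Returns the next message, or null if the channel is currently empty.
    M poll() { return queue.poll(); }
}

// Base message of one subsystem (hypothetical).
class DocumentMessage {
    final String url;
    DocumentMessage(String url) { this.url = url; }
}

// Subclassing adds calculated data without touching the base message.
class AnalyzedDocumentMessage extends DocumentMessage {
    final int tokenCount;
    AnalyzedDocumentMessage(String url, int tokenCount) {
        super(url);
        this.tokenCount = tokenCount;
    }
}

// Message type of a separate subsystem (hypothetical).
class RankingMessage {
    final String url;
    final double score;
    RankingMessage(String url, double score) { this.url = url; this.score = score; }
}
```

A `Channel<AnalyzedDocumentMessage>` then rejects a `RankingMessage` at compile time, and its receivers can read `tokenCount` without any cast.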

            /me loves asynchronous communication and threading.



            • #7
              I have read your reply, but it doesn't answer my question.
              That's a sad thing. :-) Nevertheless I enjoyed recapping and thinking about it. An hour well spent... . Also I got something out of it that I can use myself later this week. But maybe you want to drop talking about messages in such a domain anyway.

              Every system that needs its own data is going to have its own channels and messages. Messages of different types can't flow through the channels anymore.
              Never came across such a system. I often see either special receivers or general workflow systems, and your problem description sounded like a general workflow system with contributed ProcessingUnits. Wrong target, nice hit... .

              /me loves asynchronous communication and threading.
              Nothing wrong with this. I like the 'everything is a contribution' thingy :-).



              • #8
                Originally posted by Martin Kersten
                I have read your reply, but it doesn't answer my question.
                That's a sad thing. :-) Nevertheless I enjoyed recapping and thinking about it. An hour well spent... . Also I got something out of it that I can use myself later this week. But maybe you want to drop talking about messages in such a domain anyway.
                What is your problem with the 'message/channel' name? This is what it is called (also known as pipes/filters). Check: Enterprise Integration Patterns.

                Never came across such a system. I often see either special receivers or general workflow systems, and your problem description sounded like a general workflow system with contributed ProcessingUnits.
                One single messaging subsystem can use the same message, no problems here. So for the subsystem itself it is a generic messaging system.



                • #9
                  What is your problem with the 'message/channel' name? This is what it is called (also known as pipes/filters). Check: Enterprise Integration Patterns.
                  It's about the how. A message means nothing more than data / information passed from one point to another. It says nothing about the what. A message does not reveal the reason why it exists.

                  I don't know, but to me this smells like a workflow system. A production line is a workflow system. Since a dispatcher usually requires no time and we can replicate processing units out of thin air (something good old Mr. Ford was not able to do), a processing chain is only implemented logically (by making processing units responsible for processing certain kinds of work units). This is the good old inversion of responsibility happening here (in a positive way), and it pushes the system towards simplicity. But it depends on the point of view, and this is just the way I like to look at things. Nothing worth arguing about, just a habit.

                  Anyway, did you check whether your solution can profit from any of the workflow-related thinking?


                  One single messaging subsystem can use the same message, no problems here. So for the subsystem itself it is a generic messaging system.
                  The question is what a message really stands for. If you pass a serialized object to another node using messages, this is only half of the picture. Logically you see the object traveling to the other node or, if the object is cloned, you see a clone/copy traveling. A message is about the 'how', and that's what I don't care about, since it comes along naturally. It is usually the 'what' that causes pain in the end.

                  If I struggle implementing something, or if the test suite is screwing itself up, that is mostly the time when I realise that I don't know 'what' I am doing. Integration patterns are like design patterns. They have nothing to do with analysis patterns at all (different worlds), and if the analysis is flawed, then design patterns and integration patterns can only help to make things worse.

                  But I guess I was just too focused on this part:

                  Should I drop the 'one fits all' message channel and create a new channel and message for every new type of message? Who has any experience with new calculated data in message channels?
                  You know: a 'one fits all' message channel in conjunction with new calculated data. This kept me pondering and made me think: Workflow! And this turns everything like messages into implementation details, which means this question is not part of the solution but part of 'how' a solution can finally be implemented. That's why I still think that talking about workflow is more appropriate and has a good chance of helping. But as you might imagine, I am lacking the broader picture of what new calculated data and extension mean.

                  Maybe another idea would be to take a look at protocol extensions (e.g. certificate extensions for public key infrastructure, or the extensions of Java's byte code format, etc.). Maybe this might add some value here, too.


                  Cheers,

                  Martin (Kersten)



                  • #10
                    Originally posted by Martin Kersten
                    What is your problem with the 'message/channel' name? This is what it is called (also known as pipes/filters). Check: Enterprise Integration Patterns.
                    It's about the how. A message means nothing more than data / information passed from one point to another.
                    It depends. There are styles that allow methods to be added to the message, making the message like a command.

                    I don't know, but to me this smells like a workflow system. A production line is a workflow system. Since a dispatcher usually requires no time and we can replicate processing units out of thin air (something good old Mr. Ford was not able to do), a processing chain is only implemented logically (by making processing units responsible for processing certain kinds of work units). This is the good old inversion of responsibility happening here (in a positive way), and it pushes the system towards simplicity. But it depends on the point of view, and this is just the way I like to look at things. Nothing worth arguing about, just a habit.
                    I think workflow and messaging systems are two different views on the same type of system.

                    The question is what a message really stands for. If you pass a serialized object to another node using messages, this is only half of the picture. Logically you see the object traveling to the other node or, if the object is cloned, you see a clone/copy traveling. A message is about the 'how', and that's what I don't care about, since it comes along naturally. It is usually the 'what' that causes pain in the end.
                    It depends.. if you use the command message you also add the what (to do) part to the message.

                    This kept me pondering and made me think: Workflow!
                    Workflow can be done with message channels. Do you know Enterprise Integration Patterns: Designing, Building, and Deploying Messaging Solutions? Working with message channels creates a very flexible system... it's like plumbing.. just connecting the parts together.. splitters.. adapters.. stoppers... but you don't want to connect all the systems together.. you don't want to use the same kind of pipes for the water system as for the sewage system. A turd is too big for the water system.. so it depends on the needs which system to use.. I have different needs, so.. different systems that need their own kinds of messages...



                    • #11
                      Do you know Enterprise Integration Patterns: Designing, Building, and Deploying Messaging Solutions?
                      Yep, I own a copy of that book and have read it. As the name says, it's about integration patterns. But I will write myself a note and reread the parts you mentioned at the weekend, just to be sure. Maybe I am missing something here... .

                      It depends. There are styles that allow methods to be added to the message, making the message like a command.
                      Well, a command is nothing more than a special message, I guess. Someone tells someone to do something. That's communication, and communication is based on messages (OK, messages are an interpretation of a cause, but I guess you know what I mean).

                      In a command-driven approach (or message-driven approach) there are command processors (message consumers / producers) processing each command (message). And in the end everything comes down to event-driven design (which a workflow solution may also appear to be).

                      There is also this picture that if an object calls another object's method, it sends a message to the other object. I have a different picture of a work unit and a message, but maybe I am splitting hairs or I am just incompatible with common sense, I don't know.

                      What is the actual task you are trying to achieve with your solution? What is processed/calculated and how? Is it distributed among specialised nodes?


                      Cheers,

                      Martin (Kersten)



                      • #12
                        Originally posted by Martin Kersten
                        Do you know Enterprise Integration Patterns: Designing, Building, and Deploying Messaging Solutions?
                        What is the actual task you are trying to achieve with your solution? What is processed/calculated and how? Is it distributed among specialised nodes?
                        It is part of a search engine. One of the message producers is a crawler. A crawler could detect that a new file has been found, create a message, and drop it in a channel.

                        At the moment:
                        The receiver of that channel is a 'loader' that loads the data into memory and then drops the message with the loaded data in the following channel. The receiver of this channel strips all the stop words, does stemming, analyzes the text, and drops the result in the indexer.

                        But there are other scenarios possible. If I need topic detection, clustering analysis, or page ranking, I could use a channel splitter at the beginning and send the message both to the current messaging system and to another system that does different kinds of analysis.

                        If I want to reject files (too big, not allowed to be indexed, etc.), I could add a new channel receiver before the loader that filters the bad ones out.



                        • #13
                          So this is really event driven. I guess using a chain of responsibility, like every web app does, is similar. Did you check what Tomcat's implementation looks like? Actually they just have worker threads and a dispatcher up front. Then add the filters and interceptors (or whatever) to the picture.

                          But you are right: in such a particular situation I wouldn't talk about a WorkUnit, since there is information outside the scope of the message which only in conjunction with it makes up the actual unit of work. In a normal workflow system only the 'unit of work' is affected by processing directly (in the first instance). I don't know how far the information retrieval parts go and what else is done. But I guess there are additional resources being updated.

                          But I would dispatch the events according to the event types and use polymorphism. Maybe you can use this one, too:


                          Code:
                          interface CompositeMessage {

                               List<Message> getParts();
                               <T extends Message> List<T> getPartsByType(Class<T> type);
                          }
                          It would allow easy composition, and reuse of the message parts that are not altered by a message consumer... .
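A possible implementation of such a composite, as a sketch only. The immutable style, the `with` and `empty` helpers and the part classes `FileFound` and `LoadedContent` are assumptions for this illustration and not something the thread prescribes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

interface Message {}

// Two illustrative parts (hypothetical names).
final class FileFound implements Message {
    final String path;
    FileFound(String path) { this.path = path; }
}

final class LoadedContent implements Message {
    final String text;
    LoadedContent(String text) { this.text = text; }
}

// Immutable composite: adding a part yields a new message and leaves the old one untouched.
final class SimpleCompositeMessage implements Message {
    private final List<Message> parts;

    private SimpleCompositeMessage(List<Message> parts) {
        this.parts = Collections.unmodifiableList(new ArrayList<Message>(parts));
    }

    static SimpleCompositeMessage empty() {
        return new SimpleCompositeMessage(Collections.<Message>emptyList());
    }

    List<Message> getParts() { return parts; }

    // Returns all parts that are instances of the given class, in insertion order.
    <T extends Message> List<T> getPartsByType(Class<T> type) {
        List<T> result = new ArrayList<>();
        for (Message part : parts) {
            if (type.isInstance(part)) {
                result.add(type.cast(part));
            }
        }
        return result;
    }

    // Returns a copy of this message extended by one more part.
    SimpleCompositeMessage with(Message part) {
        List<Message> extended = new ArrayList<>(parts);
        extended.add(part);
        return new SimpleCompositeMessage(extended);
    }
}
```

A loader would then forward `message.with(new LoadedContent(text))`, and a later receiver picks out only `LoadedContent` parts via `getPartsByType(LoadedContent.class)` while the `FileFound` part travels along unchanged.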


                          Cheers,

                          Martin (Kersten)

                          PS: This discussion was continued and ended using ICQ... .
