JSON Filtering, Transforming, and Splitting in SI

  • JSON Filtering, Transforming, and Splitting in SI

    We have some working code that uses the RabbitMQ Management HTTP API. We need to poll the queues and look for a non-zero messages_unacknowledged count, since unacknowledged messages cause messages to build up in the queue. As you know, the Rabbit HTTP API returns JSON that looks like this:

    Code:
    {"memory":12712,"idle_since":"2013-07-08 20:51:54","exclusive_consumer_tag":"","messages_ready":0,"messages_unacknowledged":1,"messages":0,"consumers":1,"slave_nodes":[],"backing_queue_status":{"q1":0,"q2":0,"delta":["delta","undefined",0,"undefined"],"q3":0,"q4":0,"len":0,"pending_acks":0,"target_ram_count":"infinity","ram_msg_count":0,"ram_ack_count":0,"next_seq_id":19,"persistent_count":0,"avg_ingress_rate":0.0,"avg_egress_rate":0.0,"avg_ack_ingress_rate":0.0,"avg_ack_egress_rate":0.0},"messages_details":{"rate":0,"interval":35386563,"last_event":1373316714945},"messages_ready_details":{"rate":0,"interval":35386563,"last_event":1373316714945},"messages_unacknowledged_details":{"rate":0,"interval":35386563,"last_event":1373316714945},"name":"publish","vhost":"/","durable":true,"auto_delete":false,"arguments":{},"node":"rabbit@ip-XX-XXX-XXX-X"}
    Of course, multiple JSON objects are returned. We would like to filter on messages_unacknowledged > 0, then split the JSON into individual messages, sending the queues that have unacked messages (identified by name, e.g. "name":"publish") to a downstream channel for processing by a JDBC outbound adapter. I am not sure we can use a filter with SpEL on the JSON; I saw this issue: https://jira.springsource.org/browse/INTEXT-29. We are running SI 3 M2.

    Here are some of the questions I have:

    Does the JSON SpEL on the filter below look good? If not, I will implement a custom Filter.

    Also, will the JSON-to-object transformer defined below transform the JSON to a LinkedHashMap?

    Will the splitter split the JSON objects into individual messages?

    Here is our config so far:
    Code:
    <int:channel id="exceptionTransformationChannel"/>
    
    	<!--Requirement 11:  Lock servers with unacked queues -->
    	<int:inbound-channel-adapter channel="getRabbitQueues" expression="''">
    		<int:poller fixed-delay="60000"></int:poller>
    	</int:inbound-channel-adapter>
    	
    
    	<int-http:outbound-gateway id="rabbitQueueHttpGateway"
    		request-channel="getRabbitQueues"
    		url="http://localhost:55672/api/queues"
    		http-method="GET"
    		charset="UTF-8"
    		reply-timeout="5000"
    		reply-channel="rabbitQueues"
    		request-factory="httpClientFactory"> 
    	</int-http:outbound-gateway>
    
    	<int:channel id="rabbitQueues"/>
    	<int:filter id="filterUnackedQueues" input-channel="rabbitQueues" output-channel="unAckedQueues" expression="#json(payload.jsonProp, '/messages_unacknowledged') &gt; 1"/>
    	<int:channel id="unAckedQueues"/>	
    
    
    	<int:chain input-channel="unAckedQueues" output-channel="unAckedQueue">
    		<int:splitter />                                                         
    	</int:chain>
    
    	<int:channel id="unAckedQueue"/>	
    
    	
    	<int:json-to-object-transformer input-channel="unAckedQueue" output-channel="lockedQueue"
       	 type="java.util.LinkedHashMap"/>
    
    	<int:channel id="lockedQueue"/>	
    
    	<int:service-activator input-channel="lockedQueue" output-channel="nullChannel" expression="@lockServer.exchange(#root)" />  
    
    	<int:gateway id="lockServer" default-request-channel="updateLockonServer" error-channel="exceptionTransformationChannel" />                        
    
    	<int:chain input-channel="updateLockonServer" id="UpdateServerSetLock">
    		<int-jdbc:outbound-gateway update="update queue set locked = 1 where queue_name = :payload[name]" data-source="dataSource" />                                                         
    	</int:chain>

  • #2
    Hello!
    "Does the JSON SpEL on the filter below look good?"
    As you can see from that JIRA issue, the "JsonPath Support" feature isn't implemented yet, so the SpEL function #json() isn't available either. We are currently working on SpEL function support (https://github.com/SpringSource/spri...ation/pull/827), and out-of-the-box #xpath and #json functions will follow as later tasks. But first we need to implement "JsonPath Support" itself, independently of SpEL functions.
    So, as you suggested, you should:
    "I will implement a custom Filter"
    "Also will the JSON to Object transformer transform the JSON to a LinkedHashMap in the transformer defined below?"
    Yes, it will, and it already does so perfectly well.

    "Will the splitter split the JSON objects into individual messages?"
    Theoretically, yes. I assume the result of #json will be a List<String>, which is an appropriate type for the generic <splitter>.
    However, as you can see in the JIRA issue, we are going to introduce a <json:splitter>, something similar to <int-xml:xpath-splitter>.

    In summary: what you want isn't in the framework yet, so you will have to do some of the work yourself. If you have good ideas (any ideas), contributions are always welcome: https://github.com/SpringSource/spri...ONTRIBUTING.md

    Take care,
    Artem
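
    A minimal sketch of the custom-filter route discussed above. It assumes a hypothetical POJO class, com.example.UnackedQueueFilter, with a public boolean method named accept whose parameter type matches the message payload; neither the class nor the method name comes from this thread. The channel names are taken from the original configuration.

    ```xml
    <!-- Hypothetical bean: com.example.UnackedQueueFilter is an assumed class,
         not part of Spring Integration or of the original post. -->
    <bean id="unackedQueueFilter" class="com.example.UnackedQueueFilter"/>

    <!-- The message is forwarded to the output channel only when
         accept(..) returns true; otherwise it is dropped. -->
    <int:filter id="filterUnackedQueues"
                input-channel="rabbitQueues"
                output-channel="unAckedQueues"
                ref="unackedQueueFilter"
                method="accept"/>
    ```

    With this wiring the filtering logic lives in plain Java, so it does not depend on the #json() SpEL function.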



    • #3
      Transform JSON to LinkedHashMap Problem

      So we reworked our flow, taking into account that certain things are not implemented yet.

      Code:
      	<int:inbound-channel-adapter channel="getRabbitQueues" expression="''">
      		<int:poller fixed-delay="60000"></int:poller>
      	</int:inbound-channel-adapter>
      	
      
      	<int-http:outbound-gateway id="rabbitQueueHttpGateway"
      		request-channel="getRabbitQueues"
      		url="http://localhost:55672/api/queues"
      		http-method="GET"
      		charset="UTF-8"
      		reply-timeout="5000"
      		reply-channel="rabbitQueues"
      		request-factory="httpClientFactory"
      		expected-response-type="java.lang.String"
      	> 
      	</int-http:outbound-gateway>
      
      	<int:channel id="rabbitQueues"/>
      
      	<int:json-to-object-transformer input-channel="rabbitQueues" output-channel="hashedQueues"
         	 type="java.util.LinkedHashMap"/>
      
      	<int:channel id="hashedQueues"/>
      
      	<int:filter id="filterUnackedQueues" input-channel="hashedQueues" output-channel="unAckedQueues" expression="payload[messages_unacknowledged] &gt; '1'"/>
      	
      	<int:channel id="unAckedQueues"/>	
      
      
      	<int:chain input-channel="unAckedQueues" output-channel="unAckedQueue">
      		<int:splitter />                                                         
      	</int:chain>
      Unfortunately, we are having a problem converting the JSON to a LinkedHashMap.

      13:09:55.825 DEBUG [task-scheduler-1][org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler] handler 'org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler#0' sending reply Message: [Payload=[{"memory":9656,"idle_since":"2013-07-02 11:27:44","exclusive_consumer_tag":"","messages_ready":0,"messages_unacknowledged":0,"messages":0,"consumers":0,"slave_nodes":[],"backing_queue_status":{"q1":0,"q2":0,"delta":["delta","undefined",0,"undefined"],"q3":0,"q4":0,"len":0,"pending_acks":0,"target_ram_count":0,"ram_msg_count":0,"ram_ack_count":0,"next_seq_id":0,"persistent_count":0,"avg_ingress_rate":0.0,"avg_egress_rate":0.0,"avg_ack_ingress_rate":0.0,"avg_ack_egress_rate":0.0},"messages_details":{"rate":0,"interval":447012968,"last_event":1372764464447},"messages_ready_details":{"rate":0,"interval":447012968,"last_event":1372764464447},"messages_unacknowledged_details":{"rate":0,"interval":447012968,"last_event":1372764464447},"name":"database.errors","vhost":"/","durable":true,"auto_delete":false,"arguments":{},"node":"rabbit@ip-10-122-193-4"},REST OF JSON OMITTED}]][Headers={timestamp=1373461795824, id=d7f515f5-8044-4036-9dc8-077f285b77a4, history=getRabbitQueues,rabbitQueueHttpGateway, Date=1373461795000, Content-Length=6853, http_statusCode=200, Content-Type=application/json, Server=MochiWeb/1.1 WebMachine/1.9.0 (someone had painted it blue), Cache-Control=no-cache}]
      13:09:55.850 ERROR [task-scheduler-1][org.springframework.integration.handler.LoggingHandler] org.springframework.integration.transformer.MessageTransformationException: failed to transform message
      at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44)
      at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:67)
      at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:137)
      at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
      at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
      at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
      at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
      at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:178)
      at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:304)
      at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:165)
      at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:223)
      at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:207)
      at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:172)
      at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:166)
      at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:144)
      at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
      at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
      at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
      at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
      at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:178)
      at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:149)
      at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:304)
      at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:165)
      at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:97)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:199)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:51)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:143)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:141)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:273)
      at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
      at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
      at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:268)
      at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53)
      at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
      at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
      at java.util.concurrent.FutureTask.run(FutureTask.java:166)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:165)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      at java.lang.Thread.run(Thread.java:679)
      Caused by: org.codehaus.jackson.map.JsonMappingException: Can not deserialize instance of java.util.LinkedHashMap out of START_ARRAY token
      at [Source: java.io.StringReader@3c56b64c; line: 1, column: 1]
      at org.codehaus.jackson.map.JsonMappingException.from(JsonMappingException.java:163)
      at org.codehaus.jackson.map.deser.StdDeserializationContext.mappingException(StdDeserializationContext.java:198)
      at org.codehaus.jackson.map.deser.MapDeserializer.deserialize(MapDeserializer.java:151)
      at org.codehaus.jackson.map.deser.MapDeserializer.deserialize(MapDeserializer.java:25)
      at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2402)
      at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1602)
      at org.springframework.integration.json.JacksonJsonObjectMapper.fromJson(JacksonJsonObjectMapper.java:59)
      at org.springframework.integration.json.JsonToObjectTransformer.transformPayload(JsonToObjectTransformer.java:78)
      at org.springframework.integration.json.JsonToObjectTransformer.transformPayload(JsonToObjectTransformer.java:36)
      at org.springframework.integration.transformer.AbstractPayloadTransformer.doTransform(AbstractPayloadTransformer.java:33)
      at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:33)
      ... 42 more



      • #4
        H-m-m.
        I've just tested it with your json:
        Code:
        JsonToObjectTransformer<LinkedHashMap> transformer = new JsonToObjectTransformer<LinkedHashMap>(LinkedHashMap.class);
        		String jsonString = "{\"memory\":9656," +
        				"\"idle_since\":\"2013-07-02 11:27:44\"," +
        				"\"exclusive_consumer_tag\":\"\"," +
        				"\"messages_re ady\":0," +
        				"\"messages_unacknowledged\":0," +
        				"\"messages\":0," +
        				"\"consumers\":0," +
        				"\"slave_nodes\":[]," +
        				"\"backing_queue_status\":{\"q1\":0,\"q2\":0,\"delta\":[\"delta\",\"undefined\",0,\"undefined\"],\"q3\":0,\"q4\":0,\"len\":0,\"pending_acks\":0,\"target_ra m_count\":0,\"ram_msg_count\":0,\"ram_ack_count\":0,\"ne xt_seq_id\":0,\"persistent_count\":0,\"avg_ingress_rat e\":0.0,\"avg_egress_rate\":0.0,\"avg_ack_ingress_rate \":0.0,\"avg_ack_egress_rate\":0.0}," +
        				"\"messages_details \":{\"rate\":0,\"interval\":447012968,\"last_event\":1372764464447}," +
        				"\"messages_ready_details\":{\"rate\":0,\"int erval\":447012968,\"last_event\":1372764464447}," +
        				"\"mess ages_unacknowledged_details\":{\"rate\":0,\"interval\": 447012968,\"last_event\":1372764464447}," +
        				"\"name\":\"data base.errors\"," +
        				"\"vhost\":\"/\"," +
        				"\"durable\":true," +
        				"\"auto_delete\":false," +
        				"\"arguments\":{ }," +
        				"\"node\":\"rabbit@ip-10-122-193-4\"},REST OF JSON OMMITTED}";
        		LinkedHashMap result = transformer.transformPayload(jsonString);
        and the result is:
        Code:
        [0] = {java.util.LinkedHashMap$Entry@778}"memory" -> "9656"
        [1] = {java.util.LinkedHashMap$Entry@1154}"idle_since" -> "2013-07-02 11:27:44"
        [2] = {java.util.LinkedHashMap$Entry@1157}"exclusive_consumer_tag" -> ""
        [3] = {java.util.LinkedHashMap$Entry@1160}"messages_re ady" -> "0"
        [4] = {java.util.LinkedHashMap$Entry@1163}"messages_unacknowledged" -> "0"
        [5] = {java.util.LinkedHashMap$Entry@1165}"messages" -> "0"
        [6] = {java.util.LinkedHashMap$Entry@1167}"consumers" -> "0"
        [7] = {java.util.LinkedHashMap$Entry@1169}"slave_nodes" ->  size = 0
        [8] = {java.util.LinkedHashMap$Entry@1172}"backing_queue_status" ->  size = 16
        [9] = {java.util.LinkedHashMap$Entry@1175}"messages_details " ->  size = 3
        [10] = {java.util.LinkedHashMap$Entry@1178}"messages_ready_details" ->  size = 3
        [11] = {java.util.LinkedHashMap$Entry@1181}"mess ages_unacknowledged_details" ->  size = 3
        [12] = {java.util.LinkedHashMap$Entry@1184}"name" -> "data base.errors"
        [13] = {java.util.LinkedHashMap$Entry@1187}"vhost" -> "/"
        [14] = {java.util.LinkedHashMap$Entry@1190}"durable" -> "true"
        [15] = {java.util.LinkedHashMap$Entry@1193}"auto_delete" -> "false"
        [16] = {java.util.LinkedHashMap$Entry@1196}"arguments" ->  size = 0
        [17] = {java.util.LinkedHashMap$Entry@1199}"node" -> "rabbit@ip-10-122-193-4"
        So, can you debug JsonToObjectTransformer and see what it receives as the argument to transform?



        • #5
          I noticed that if I change the JSON-to-object transformer to deserialize to java.util.List, the transform seems to work, but then my filter expression fails with:
          Code:
           at java.lang.Thread.run(Thread.java:679)
          Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 7): Field or property 'messages_unacknowledged' cannot be found on object of type 'org.springframework.integration.message.GenericMessage'



          • #6
            Working Code

            Originally posted by ehrdoctors
            I noticed that if I change the JSON-to-object transformer to deserialize to java.util.List, the transform seems to work, but then my filter expression fails with:
            Code:
             at java.lang.Thread.run(Thread.java:679)
            Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 7): Field or property 'messages_unacknowledged' cannot be found on object of type 'org.springframework.integration.message.GenericMessage'

            The solution was to transform into a List (because the response contains more than one JSON object), then split (so that each message payload is a LinkedHashMap), then filter, which is easy SpEL on a LinkedHashMap.

            The working code looks like this:

            Code:
            	<int:inbound-channel-adapter channel="getRabbitQueues" expression="''">
            		<int:poller fixed-delay="1000"></int:poller>
            	</int:inbound-channel-adapter>
            	
            
            	<int-http:outbound-gateway id="rabbitQueueHttpGateway"
            		request-channel="getRabbitQueues"
            		url="http://localhost:55672/api/queues"
            		http-method="GET"
            		charset="UTF-8"
            		reply-timeout="5000"
            		reply-channel="rabbitQueues"
            		request-factory="httpClientFactory"
            		expected-response-type="java.lang.String"
            	> 
            	</int-http:outbound-gateway>
            
            	<int:channel id="rabbitQueues"/>
            
            	<int:json-to-object-transformer input-channel="rabbitQueues" output-channel="hashedQueues"
               	 type="java.util.List"/>
            
            	<int:channel id="hashedQueues"/>
            
            	<int:chain input-channel="hashedQueues" output-channel="hashedQueue">
            		<int:splitter />                                                         
            	</int:chain>
            
            	<int:channel id="hashedQueue"/>
            
            	<int:filter id="filterUnackedQueues" input-channel="hashedQueue" output-channel="unAckedQueue" expression="payload[messages_unacknowledged] &gt; 0"/>
            	
            	<int:channel id="unAckedQueue"/>		
            
            	<int:service-activator input-channel="unAckedQueue" output-channel="nullChannel" expression="@lockServer.exchange(#root)" />  
            
            	<int:gateway id="lockServer" default-request-channel="updateLockonServer" error-channel="exceptionTransformationChannel" />                        
            
            	<int:chain input-channel="updateLockonServer" id="UpdateServerSetLock">
            		<int-jdbc:outbound-gateway update="update queue set locked = 1 where name = :payload[name]" data-source="dataSource" />                                                         
            	</int:chain>



            • #7
              Great to hear you solved your task!

              Just some comments.
              By default, the Jackson mapper deserializes a JSON object to a Map.
              I figured that your payload from <int-http:outbound-gateway> was a List.
              There is no reason to wrap the <splitter> in a <chain>.
              expression="@lockServer.exchange(#root)" is redundant. Just use ref="lockServer" and method="exchange".
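
              A sketch of the ref/method form suggested above, reusing the lockServer gateway and the channel names from the working configuration. It assumes the gateway exposes the exchange method that the original expression already calls, and it has not been run against SI 3 M2.

              ```xml
              <!-- Same endpoint as the expression-based version, but the gateway bean
                   is referenced directly instead of through a SpEL expression. -->
              <int:service-activator input-channel="unAckedQueue"
                                     output-channel="nullChannel"
                                     ref="lockServer"
                                     method="exchange"/>
              ```

              Only the way the target method is named changes; the channels are the same as before.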



              • #8
                Originally posted by Cleric
                Great to hear you solved your task!
                There is no reason to wrap the <splitter> in a <chain>.
                How should the splitter be configured? I didn't think it had input/output channel attributes.



                • #9
                  "I didn't think it had input/output channel attributes?"
                  Why did you think so? http://static.springsource.org/sprin...ngle/#splitter
                  It's just an endpoint component, so it has the same channel attributes as a service-activator or a filter.
                  Please read the EIP book, Spring Integration in Action, and the SI Reference Manual; they will clear up many questions like this one.
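
                  As a minimal sketch of that point, the <chain> wrapper around the splitter in the working configuration earlier in the thread could be replaced with a standalone splitter. The channel names are copied from that configuration; the fragment itself has not been run against SI 3 M2.

                  ```xml
                  <!-- Standalone splitter: the default splitter emits one message per
                       element when the payload is a Collection or an array. -->
                  <int:splitter input-channel="hashedQueues" output-channel="hashedQueue"/>
                  ```

                  Because the payload on hashedQueues is a List of maps, each outgoing message on hashedQueue carries a single queue's map, which is what the downstream filter expects.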
