
  • Message Filter issue

    We have a message flow that behaves differently for two nearly identical messages. Here is the log entry from the flow that fails to complete:

    Code:
    12:47:28.701 DEBUG [SimpleAsyncTaskExecutor-1][org.springframework.integration.channel.DirectChannel] preSend on channel 'publishRouterOut', message: [Payload={profileId=3181, groupName=0, oldGroupName=0, userId=48640, broadcastStreamName=48640-1717760938-165050000, serverName=xxxx.xx.xxx.xx, appInstName=h14, jobType=add, action=updateJob, username=h14, userpic=http://xxxxx/xxxx.jpg, t=1371214056351}][Headers={timestamp=1371214048700, id=a49bf251-bf88-49af-9713-ef04d9459284, content-type=application/octet-stream, amqp_receivedRoutingKey=xxxxx.publish, amqp_deliveryMode=PERSISTENT, instanceaction=[Payload={UPDATED=1}][Headers={timestamp=1371214048575, id=ccc4d706-c057-40d5-9594-c4025de918ad, content-type=application/octet-stream, amqp_receivedRoutingKey=xxxxx.publish, amqp_deliveryMode=PERSISTENT, amqp_receivedExchange=xxxxxx, amqp_redelivered=false, amqp_deliveryTag=4}], route=ip-xx-xx-xxx-x.ec2.internal, amqp_receivedExchange=xxxxx, idinstance=1959, amqp_deliveryTag=4, amqp_redelivered=false}]
    The one that succeeds is almost identical at the same point.



    Code:
    12:47:28.850 DEBUG [SimpleAsyncTaskExecutor-1][org.springframework.integration.channel.DirectChannel] preSend on channel 'publishRouterOut', message: [Payload={profileId=3181, groupName=0, oldGroupName=0, userId=64518, broadcastStreamName=64518-1319123379-463550000, serverName=xxx.xxx.xxxx.xxx, appInstName=h11, jobType=add, action=updateJob, username=h11, userpic=xxxxxxx.jpg, t=1371214056358}][Headers={timestamp=1371214048849, id=0c272126-1bdd-48e5-a8a1-9ae9e8f0d25d, content-type=application/octet-stream, amqp_receivedRoutingKey=xxxxx.publish, amqp_deliveryMode=PERSISTENT, instanceaction=[Payload={UPDATED=1}][Headers={timestamp=1371214048784, id=6b16c80e-cc66-4a1b-a573-532d7f34cd35, content-type=application/octet-stream, amqp_receivedRoutingKey=xxxxx.publish, amqp_deliveryMode=PERSISTENT, amqp_receivedExchange=xxxxx, amqp_redelivered=false, amqp_deliveryTag=5}], route=ip-XX-XX-XXX-X.ec2.internal, amqp_receivedExchange=xxxxx, idinstance=1960, amqp_deliveryTag=5, amqp_redelivered=false}
    but it continues to hit the filter bean on the next line of the log:

    Code:
    12:47:28.850 DEBUG [SimpleAsyncTaskExecutor-1][org.springframework.integration.filter.MessageFilter] org.springframework.integration.filter.MessageFilter@261ca3cb received message: [Payload={profileId=3181, groupName=0, oldGroupName=0, userId=64518, broadcastStreamName=64518-1319123379-463550000, serverName=XXX.XX.XXX.XX, appInstName=h11, jobType=add, action=updateJob, username=h11, userpic=http://xxxxxx/xxxxx.jpg, t=1371214056358}][Headers={timestamp=1371214048849, id=0c272126-1bdd-48e5-a8a1-9ae9e8f0d25d, content-type=application/octet-stream, amqp_receivedRoutingKey=XXXXX.publish, amqp_deliveryMode=PERSISTENT, instanceaction=[Payload={UPDATED=1}][Headers={timestamp=1371214048784, id=6b16c80e-cc66-4a1b-a573-532d7f34cd35, content-type=application/octet-stream, amqp_receivedRoutingKey=XXXXX.publish, amqp_deliveryMode=PERSISTENT, amqp_receivedExchange=xxxxx, amqp_redelivered=false, amqp_deliveryTag=5}], route=ip-XX-XX-XXX-X.ec2.internal, amqp_receivedExchange=xxxxx, idinstance=1960, amqp_deliveryTag=5, amqp_redelivered=false}]
    12:47:28.850 DEBUG [SimpleAsyncTaskExecutor-1][org.springframework.integration.filter.MessageFilter] handler 'org.springframework.integration.filter.MessageFilter@261ca3cb' sending reply Message: [Payload={profileId=3181, groupName=0, oldGroupName=0, userId=64518, broadcastStreamName=64518-1319123379-463550000, serverName=XXX.XX.XXX.XX, appInstName=h11, jobType=add, action=updateJob, username=h11, userpic=http://xxxxxxx/xxxx.jpg, t=1371214056358}][Headers={timestamp=1371214048849, id=0c272126-1bdd-48e5-a8a1-9ae9e8f0d25d, content-type=application/octet-stream, amqp_receivedRoutingKey=xxxx.publish, amqp_deliveryMode=PERSISTENT, instanceaction=[Payload={UPDATED=1}][Headers={timestamp=1371214048784, id=6b16c80e-cc66-4a1b-a573-532d7f34cd35, content-type=application/octet-stream, amqp_receivedRoutingKey=xxxxx.publish, amqp_deliveryMode=PERSISTENT, amqp_receivedExchange=xxxxx, amqp_redelivered=false, amqp_deliveryTag=5}], route=ip-xxx-xxx-xxx-x.ec2.internal, amqp_receivedExchange=xxxxx, idinstance=1960, amqp_deliveryTag=5, amqp_redelivered=false}]
    whereas the one that fails never reaches the filter, is never sent to the createStreamOut channel, and as a result no record is inserted into the database.

    I don't think the filter is being applied to the failing message, because the logs don't show it. It should not filter that message out anyway, because the SpEL expression we filter on gives the same result for both messages: payload[jobType].equals('add'). As you can see in the messages above, jobType=add. See the filter setup below. The one that succeeds makes it to createStreamOut and creates a record in the database table.

    Code:
    <int:channel id="publishRouterOut"/>
    	
    	<int:filter input-channel="publishRouterOut" output-channel="createStreamOut" expression="payload[jobType].equals('add')"/>
    	
    	<int:channel id="createStreamOut"/>
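
    One way to rule out a silent rejection by the filter would be to attach a discard channel and log whatever lands there. This is only a sketch, not part of our current setup: the ids addJobFilter and rejectedJobs are made-up names, and the rest is the configuration above unchanged.

    ```xml
    <int:channel id="publishRouterOut"/>

    <!-- The id and discard-channel values are hypothetical names chosen for this sketch -->
    <int:filter id="addJobFilter"
                input-channel="publishRouterOut"
                output-channel="createStreamOut"
                discard-channel="rejectedJobs"
                expression="payload[jobType].equals('add')"/>

    <!-- Logs every message the filter rejects; with no channel attribute, the id doubles as the channel name -->
    <int:logging-channel-adapter id="rejectedJobs" level="WARN" log-full-message="true"/>

    <int:channel id="createStreamOut"/>
    ```

    If nothing is logged on rejectedJobs for the failing message, the filter did not reject it, which would match what the DEBUG output suggests.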
    Last edited by ehrdoctors; Jun 14th, 2013, 01:27 PM.

  • #2
    Hi!

    Do you see where the Message fails?
    Add <int:message-history/> to your config and check the logs to see how your messages travel.
    If there is an Exception, please post it here!
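
    A minimal sketch of what that could look like, assuming the same "int" namespace prefix as in the config above. Message history only records components that have an id, so the filter is given one here; the name addJobFilter is just an example.

    ```xml
    <!-- Enables message history for the whole application context -->
    <int:message-history/>

    <int:channel id="publishRouterOut"/>

    <!-- The id is a hypothetical name; without an id the filter would not appear in the history -->
    <int:filter id="addJobFilter"
                input-channel="publishRouterOut"
                output-channel="createStreamOut"
                expression="payload[jobType].equals('add')"/>

    <int:channel id="createStreamOut"/>
    ```

    With this in place, the logged headers of each message should list the named channels and endpoints it has passed through, so the two flows can be compared step by step.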

    Take care,
    Artem
