  • sending a message on RabbitMQ queue via SMTP with Spring Integration

    Hi, I am relatively new to RabbitMQ and Spring Integration, but following the advice in this thread I have successfully configured a program, based on the Integration Samples for Mail and AMQP, that periodically checks a POP/SSL mailbox and puts any messages on a RabbitMQ queue via AMQP. Kudos to the Spring developers who have made this type of integration straightforward.

    Conversely, I would like to have a separate program that checks the contents of a queue via AMQP and submits the message to a transformer class, which in turn constructs an email from the original RMQ message payload and sends it to a recipient via SMTP (communicating with the server over SSL).

    I am not sure about how the XML should look to move the data from Rabbit to the transform class. I read a good example of how to get started on the SMTP side but the input was a file instead of Rabbit.

    So far I have this:
    amqpsmtp-config.xml
    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xmlns:int="http://www.springframework.org/schema/integration"
    	xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
    	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    	xmlns:int-mail="http://www.springframework.org/schema/integration/mail"
            xmlns:util="http://www.springframework.org/schema/util"
    	xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
    		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    		http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd	
    		http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
    		http://www.springframework.org/schema/integration/mail http://www.springframework.org/schema/integration/mail/spring-integration-mail.xsd
    		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    
    
    
        <int-amqp:inbound-channel-adapter id="ackqueue"
    				      channel="fromRabbit" 
                                          queue-names="immunization.acknowledge.queue"
                                          connection-factory="connectionFactory"/>
    
       <int-amqp:inbound-channel-adapter  id="ackerror"
    				      channel="fromRabbit" 
                                          queue-names="immunization.error.queue"
                                          connection-factory="connectionFactory"/>
    
        <int:channel id="fromRabbit">
           <int:interceptors>
                <int:wire-tap channel="loggingChannel"/>
            </int:interceptors>
        </int:channel>
    
        <int:transformer input-channel="fromRabbit" output-channel="outgoingMail" ref="MailTransformer" method="doTransform"/>
    
        <bean id="mailSender" class="org.springframework.mail.javamail.JavaMailSenderImpl">
    		<property name="host" value="XXXXXXXXX"/>
    		<property name="username" value="XXXXXXX"/>
    		<property name="password" value="XXXXXXXX"/>
        </bean>
    
        <bean id="MailTransformer" class="com.ehrdoctors.amqp.amqpsmtp.MailTransformer">
    	<property name="mailSender" ref="mailSender"/>
        </bean>	
    
        <int:channel id="outgoingMail">
           <int:interceptors>
                <int:wire-tap channel="loggingChannel"/>
            </int:interceptors>
        </int:channel>
    
        <int-mail:outbound-channel-adapter 
    	id="outboundMailAdapter" 
    	channel="outgoingMail" 
    	mail-sender="mailSender"
    	java-mail-properties="javaMailProperties"/>
    
    <!-- Sends the mail to the mailbox with the credentials defined by mailSender bean -->
    
        <util:properties id="javaMailProperties">
    		<prop key="mail.debug">true</prop>
    		<prop key="mail.smtp.auth">true</prop>
    		<prop key="mail.smtp.port">25</prop>
    <!--SSL Parameters -->
    <!--		<prop key="mail.smtp.socketFactory.class">javax.net.ssl.SSLSocketFactory</prop>
    		<prop key="mail.smtp.socketFactory.port">465</prop> 
    		<prop key="mail.smtp.socketFactory.fallback">false</prop>
    -->
        </util:properties>
     
        <int:logging-channel-adapter id="loggingChannel" log-full-message="true" level="DEBUG"/>
    
        <!-- Infrastructure -->
    
        <rabbit:connection-factory id="connectionFactory" host="localhost" port="5672" virtual-host="immunization" username="XXXXX" password="XXXXXX" />
        <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
        <rabbit:admin connection-factory="connectionFactory" />
        <rabbit:queue name="immunization.acknowledge.queue" />
        <rabbit:queue name="immunization.error.queue" />
    
        <!-- One exchange declaration carrying both bindings; declaring the same exchange twice is redundant -->
        <rabbit:topic-exchange name="immunization.message.exchange">
            <rabbit:bindings>
                <rabbit:binding queue="immunization.acknowledge.queue" pattern="immunization.acknowledge.binding" />
                <rabbit:binding queue="immunization.error.queue" pattern="immunization.error.binding" />
            </rabbit:bindings>
        </rabbit:topic-exchange>
    
    
    </beans>
    MailTransformer.java
    Code:
    package com.ehrdoctors.amqp.amqpsmtp;
    
    import org.springframework.mail.javamail.MimeMailMessage;
    import org.springframework.mail.javamail.MimeMessageHelper;
    import javax.mail.internet.MimeMessage;
    import javax.xml.parsers.DocumentBuilder;
    import javax.xml.parsers.DocumentBuilderFactory;
    import org.w3c.dom.CharacterData;
    import org.w3c.dom.Document;
    import org.w3c.dom.Element;
    import org.w3c.dom.Node;
    import org.w3c.dom.NodeList;
    import org.xml.sax.InputSource;
    import javax.xml.xpath.*;
    import java.io.StringReader;
    import java.io.ByteArrayInputStream;
    import java.util.Date;
    import org.springframework.integration.Message;
    import org.springframework.mail.javamail.JavaMailSenderImpl;
    import org.apache.log4j.Logger;
    import org.springframework.integration.transformer.AbstractTransformer;
    import org.springframework.core.io.ByteArrayResource;
    import org.apache.commons.codec.binary.Base64;
     
    public class MailTransformer  extends AbstractTransformer {
    private static Logger logger = Logger.getLogger(MailTransformer.class);
    private JavaMailSenderImpl mailSender;
    public void setMailSender(JavaMailSenderImpl jms) {
    	this.mailSender = jms;
    }
    public JavaMailSenderImpl getMailSender() {
           return mailSender;
    }
    
    @Override
    protected Object doTransform(Message<?> message) throws Exception {
    			/* Create the mime message using spring.framework.mail packages */
    			MimeMessage mimeMsg = mailSender.createMimeMessage();
    			try{
    				
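    				Object payload = message.getPayload();
    				//AMQP payloads may arrive as byte[]; decode them instead of calling toString() on the array
    				String xml = (payload instanceof byte[]) ? new String((byte[]) payload, "UTF-8") : payload.toString();
    				DocumentBuilder db = DocumentBuilderFactory.newInstance().newDocumentBuilder();
    				InputSource is = new InputSource();
    				is.setCharacterStream(new StringReader(xml));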
    				Document doc = db.parse(is);
        				XPathFactory  factory=XPathFactory.newInstance();
    
    				//Create an XPath object from the XPathFactory object with the newXPath method.
    				XPath xPath=factory.newXPath();
    				//Extract the To, From, and subject from the Message
    				String mailFrom = xPath.evaluate("/Message/From", doc);
    				String mailTo = xPath.evaluate("/Message/To", doc);
    				String mailSubject = xPath.evaluate("/Message/Subject", doc);
    				
    				//The body is a little different, we use the text() method because the Content is in CDATA
    				String body = xPath.evaluate("/Message/Content/text()", doc);
    				
    				//We may have more than one attachment, so we need to create a NodeList of attachments
    				//to iterate through
    				NodeList attachments = (NodeList)xPath.evaluate("/Message/attachments", doc, XPathConstants.NODESET);
    				//A NODESET result is an empty NodeList rather than null when nothing matches
    				boolean multipart = attachments != null && attachments.getLength() > 0;
    
    				MimeMessageHelper helper = new MimeMessageHelper(mimeMsg , multipart);
    				helper.setTo(mailTo);	
    				helper.setFrom(mailFrom);
    				helper.setSubject(mailSubject);
    				helper.setSentDate(new Date());
    				helper.setText(body);
    
    				
    				if(attachments!=null){
    					for (int i = 0; i < attachments.getLength(); i++) {
    				      		logger.info("Adding attachment: " + i);
    				      		Element attachment = (Element) attachments.item(i);
          				      		String strAttachment = xPath.evaluate("attachment", attachment);
    				     		String strFileName = xPath.evaluate("attachment/@filename",attachment);
    				     		logger.info("Found attachment: " + strAttachment + " with filename:" + strFileName);
    				      
    				      		if(!strAttachment.isEmpty() && !strFileName.isEmpty()){
    					  		byte[] decoded = Base64.decodeBase64(strAttachment); 
    				      			helper.addAttachment(strFileName, new ByteArrayResource(decoded));
    							logger.info("helpEncoding:" + helper.getEncoding());
    				     		}	 	
                          	     	
    					}
    				}
    				//Wrap in Spring's MimeMailMessage so the outbound mail adapter sees a MailMessage payload
    				return new MimeMailMessage(helper);
    
    
    			}
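    			catch(Exception e){
    				//Fail loudly: returning the original message here would pass the raw payload on to the mail adapter
    				logger.error("error building mail message", e);
    				throw e;
    			}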
    			
    		}
    }
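    A note on the attachment handling above: an XPath NODESET evaluation returns an empty NodeList rather than null when nothing matches, so checking the list length is the more reliable test for "has attachments". The following is a minimal, JDK-only sketch of that idea, not the code from this thread. The class name AttachmentExtractor, the sample filename, and the <attachments>/<attachment filename="..."> layout are assumptions inferred from the XPath expressions in the transformer, and java.util.Base64 stands in for the commons-codec call.

```java
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

public class AttachmentExtractor {

    // Returns filename -> decoded bytes for every <attachment> element (assumed layout).
    static Map<String, byte[]> extract(String xml) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new InputSource(new StringReader(xml)));
        XPath xpath = XPathFactory.newInstance().newXPath();

        // An empty NodeList (length 0) comes back when the message has no attachments.
        NodeList nodes = (NodeList) xpath.evaluate(
                "/Message/attachments/attachment", doc, XPathConstants.NODESET);

        Map<String, byte[]> result = new LinkedHashMap<>();
        for (int i = 0; i < nodes.getLength(); i++) {
            Element attachment = (Element) nodes.item(i);
            String fileName = attachment.getAttribute("filename");
            String encoded = attachment.getTextContent().trim();
            if (!fileName.isEmpty() && !encoded.isEmpty()) {
                // The MIME decoder tolerates line breaks inside the Base64 text.
                result.put(fileName, Base64.getMimeDecoder().decode(encoded));
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        String withAttachment = "<Message><attachments>"
                + "<attachment filename=\"note.txt\">aGVsbG8=</attachment>"
                + "</attachments></Message>";
        Map<String, byte[]> found = extract(withAttachment);
        System.out.println("attachments found: " + found.size());
        System.out.println("note.txt decodes to: "
                + new String(found.get("note.txt"), StandardCharsets.UTF_8));

        // No <attachments> element: the map is empty, so a multipart flag would stay false.
        System.out.println("multipart needed: " + !extract("<Message/>").isEmpty());
    }
}
```

    In the transformer, the size of such a result (or attachments.getLength()) could then decide whether MimeMessageHelper is created in multipart mode.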

    I run it like this:
    Code:
    bash$: java -cp target/amqp-2.1.0.BUILD-SNAPSHOT.jar:lib/* -Djavax.net.ssl.trustStore=cacerts.jks org.springframework.integration.samples.amqp.Main;

    Cheers,

    Richard
    Last edited by ehrdoctors; Nov 12th, 2012, 03:48 PM.

  • #2
    I am continuing to update this thread as I progress. I will post the solution when I have it for others to use. I can be reached via pm.
    Last edited by ehrdoctors; Sep 21st, 2012, 10:29 PM.

    • #3
      I suggest you break it down into manageable chunks.

      First of all - are you getting messages from the inbound channel adapter? Run with debug logging and you will see detailed message flows through the system.

      If you are not seeing messages arriving at the adapter, it is likely a configuration issue.

      Regarding message format: yes, XML is a reasonable choice. You also might want to look at JSON; SI has first-class support for marshaling between JSON and Java objects (and XML too).

      Once you have your message transformed to a Java object (e.g. using a json-to-object-transformer), you can then use a mail header-enricher...

      Code:
      	<header-enricher input-channel="fromAmqp" output-channel="toSMTP">
      		<to expression="payload.to"/>
      		<cc expression="payload.cc"/>
      		<bcc expression="payload.bcc"/>
      		<from expression="payload.from"/>
      		<reply-to expression="payload.replyto"/>
      		<subject expression="payload.subject"/>
      	</header-enricher>
      You should be able to achieve your objective without writing any custom java code.

      • #4
        Originally posted by Gary Russell
        I suggest you break it down into manageable chunks.

        First of all - are you getting messages from the inbound channel adapter? Run with debug logging and you will see detailed message flows through the system.

        If you are not seeing messages arriving at the adapter, it is likely a configuration issue.
        Gary, first, thank you for the reply.

        I do think that the messages are arriving at the adapter. I see a line like the one below, and the message is no longer in the queue when I run list_queues.

        Code:
        18:47:32.118 INFO  [SimpleAsyncTaskExecutor-1][org.springframework.integration.samples.amqp.Main] Message: [Payload=[B@12c3327][Headers={timestamp=1348512261777, id=953c743b-52d5-4a4b-9ca0-c0d6514b3aa1, amqp_receivedRoutingKey=si.test.binding, amqp_deliveryMode=PERSISTENT, amqp_contentType=application/octet-stream, amqp_receivedExchange=si.test.exchange, amqp_redelivered=true, amqp_deliveryTag=1}]
        However, I do not see any messages being sent to my SMTP server. I will take your recommendation, ditch my attempt to use the MailTransformer class, and instead try to do this all in the Spring config file.


        Regarding message format; yes, XML is a reasonable choice, you also might want to look at JSON; SI has first class support for marshaling between JSON and java objects (and XML too).
        Once you have your message transformed to a Java object (e.g. using a json-to-object-transformer),
        I understand you are saying I can accomplish my objective without having a custom MailTransformer class. I think JSON is a good choice for message format.

        I imagine I would use something like this:

        Code:
        <int:json-to-object-transformer id="incomingJsonConverter" input-channel="incoming" type="java.util.HashMap"  />

        you can then use a mail header-enricher...

        Code:
        	<header-enricher input-channel="incoming" output-channel="outgoing">
        		<to expression="payload.to"/>
        		<cc expression="payload.cc"/>
        		<bcc expression="payload.bcc"/>
        		<from expression="payload.from"/>
        		<reply-to expression="payload.replyto"/>
        		<subject expression="payload.subject"/>
        	</header-enricher>
        You should be able to achieve your objective without writing any custom java code.
        Where does the body of the message go? What about attachments?

        • #5
          We decided on an XML format to represent our message to be converted into a mime message:

          Code:
          <Message>
            <Subject>test</Subject>
            <From>sender@domain</From>
            <ReplyTo>sender@domain</ReplyTo>
            <To>recipient@domain</To>
            <Content>
              <![CDATA[test CCD]]>
            </Content>
          </Message>
          Last edited by ehrdoctors; Oct 28th, 2012, 02:43 AM.
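          For readers who want to try this format without a running broker, here is a small JDK-only sketch that pulls the fields out of a message like the one above with XPath. It is an illustration under assumptions, not the thread's transformer: the class name MessageXmlFields and the use of a Map are made up for the example. The body is read as the string value of the whole Content element and trimmed, which drops the whitespace around the CDATA section.

```java
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

public class MessageXmlFields {

    // Extracts the header-like fields and the body from a <Message> document.
    static Map<String, String> extract(String xml) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new InputSource(new StringReader(xml)));
        XPath xpath = XPathFactory.newInstance().newXPath();

        Map<String, String> fields = new LinkedHashMap<>();
        for (String name : new String[] {"Subject", "From", "ReplyTo", "To"}) {
            // evaluate() returns an empty string when the element is missing.
            fields.put(name, xpath.evaluate("/Message/" + name, doc));
        }
        // String value of <Content> includes the CDATA text plus surrounding whitespace.
        fields.put("Content", xpath.evaluate("/Message/Content", doc).trim());
        return fields;
    }

    public static void main(String[] args) throws Exception {
        String xml = "<Message>\n"
                + "  <Subject>test</Subject>\n"
                + "  <From>sender@domain</From>\n"
                + "  <ReplyTo>sender@domain</ReplyTo>\n"
                + "  <To>recipient@domain</To>\n"
                + "  <Content>\n"
                + "    <![CDATA[test CCD]]>\n"
                + "  </Content>\n"
                + "</Message>";
        for (Map.Entry<String, String> field : extract(xml).entrySet()) {
            System.out.println(field.getKey() + " = " + field.getValue());
        }
    }
}
```

          The extracted values map directly onto the setTo, setFrom, setSubject and setText calls of a MimeMessageHelper.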

          • #6
            Still not sending, but close...

            I am still not seeing the mail leave via SMTP. Everything else works: the message is taken from the Rabbit queue and transformed (loaded into a DOM, parsed with XPath, built into a MimeMessage). I updated the code in my initial post to show my latest configuration. The MimeMessage should hit the outgoingMail channel and get sent out via the outgoing mail adapter, but it's failing somewhere. There is nothing telling in the logs; in fact, no SMTP traffic shows up at all. Any ideas? Thanks in advance.

            Code:
            06:04:20.699 DEBUG [SimpleAsyncTaskExecutor-1][org.springframework.integration.mail.MailSendingMessageHandler] org.springframework.integration.mail.MailSendingMessageHandler#0 received message: [Payload=org.springframework.mail.javamail.MimeMailMessage@18a6e6e][Headers={timestamp=1351404260698, id=63cda341-1fa4-4412-bef8-2130f327ae01, amqp_receivedRoutingKey=immunization.messaging.binding, amqp_deliveryMode=PERSISTENT, amqp_contentType=text/plain, amqp_receivedExchange=immunization.message.exchange, amqp_contentEncoding=UTF-8, amqp_deliveryTag=7, amqp_redelivered=true}]

            • #7
              Working now.

              I got it working and will post the final source tomorrow as a good example of how to do AMQP->SMTP outbound.

              • #8
                I changed the code now shown in post #1 to something that was working, but now it's not. I have the XML from post #5 above on the Rabbit queue as a string. I am trying to create a MimeMessage from the XML, and I have a custom MailTransformer class that overrides the doTransform method of AbstractTransformer.

                I get an error from a MailSendingMessageHandler class that I have never seen before and had no intention of running. I was under the impression that there was no default class for transforming Rabbit messages into MimeMessages, but apparently there is. I would just as soon use my XML-to-MimeMessage MailTransformer, but I am stuck on this error:
                org.springframework.integration.mail.MailSendingMessageHandler.createMailMessageFromByteArrayMessage(MailSendingMessageHandler.java:130)
                at org.springframework.integration.mail.MailSendingMessageHandler.convertMessageToMailMessage(MailSendingMessageHandler.java:95)
                at

                13:33:57.898 DEBUG [SimpleAsyncTaskExecutor-1][org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer] Execution of Rabbit message listener failed, and no ErrorHandler has been set.
                org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
                at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:590)
                at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:529)
                at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:472)
                at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:56)
                at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:103)
                at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:560)
                at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:452)
                at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:436)
                at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:420)
                at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$200(SimpleMessageListenerContainer.java:56)
                at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:505)
                at java.lang.Thread.run(Thread.java:662)
                Caused by: org.springframework.integration.mapping.MessageMappingException: Header 'mail_attachmentFilename' is required when mapping a Message with a byte array payload to a MailMessage.
                at org.springframework.integration.mail.MailSendingMessageHandler.createMailMessageFromByteArrayMessage(MailSendingMessageHandler.java:130)
                at org.springframework.integration.mail.MailSendingMessageHandler.convertMessageToMailMessage(MailSendingMessageHandler.java:95)
                at org.springframework.integration.mail.MailSendingMessageHandler.handleMessageInternal(MailSendingMessageHandler.java:71)
                at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
                at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:114)
                at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:101)
                at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
                at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
                at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
                at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
                at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
                at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:175)

                • #9
                  Ah ha

                  OK, so I now see that the MailSendingMessageHandler converts the payload itself when it is a byte[] or a String. For a byte[] it maps the payload on its own (which is where the 'mail_attachmentFilename' header requirement comes from); for a String it just sets the String as the message body. That's why it worked before and not now: during my testing the payload was a String, and in the new environment the file was (mistakenly) put on the queue as a byte[]. Can you override this MessageHandler, which is effectively doing a transformation without being asked to?


                  Code:
                  private MailMessage convertMessageToMailMessage(Message<?> message) {
                          MailMessage mailMessage = null;
                          if (message.getPayload() instanceof MailMessage) {
                                  mailMessage = (MailMessage) message.getPayload();
                          }
                          else if (message.getPayload() instanceof byte[]) {
                                  mailMessage = this.createMailMessageFromByteArrayMessage((Message<byte[]>) message);
                          }
                          else if (message.getPayload() instanceof String) {
                                  mailMessage = new SimpleMailMessage();
                                  mailMessage.setText((String) message.getPayload());
                          }
                          else {
                                  throw new MessageHandlingException(message, "Unable to create MailMessage from payload type ["
                                                  + message.getPayload().getClass().getName() + "], expected byte array or String.");
                          }
                          this.applyHeadersToMailMessage(mailMessage, message.getHeaders());
                          return mailMessage;
                  }

                  • #10
                    No; unfortunately, that behavior is hard-wired into the MessageHandler.

                    The mail conversion should probably be delegated to a strategy interface, to allow custom strategies to be injected. Please open an 'Improvement' JIRA ticket.

                    You could convert the byte[] to a String with a simple <transformer ... expression="new String(payload)" ... />.

                    The only other alternative would (currently) be to write a custom MessageHandler and wire it up using <bean/> definitions instead of the <int-mail:.../> namespace (add a ConsumerEndpointFactoryBean wired up with your custom handler).
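                    To see why the byte[] payload needs an explicit conversion, here is a minimal plain-Java sketch of what the new String(payload) expression amounts to. The class and method names (PayloadText, asText) are hypothetical, and UTF-8 is an assumption based on the amqp_contentEncoding header in the log from post #6. Calling toString() on a byte[] yields a type tag and hash such as the [B@12c3327 seen in the log from post #4, not the text.

```java
import java.nio.charset.StandardCharsets;

public class PayloadText {

    // Returns the payload as text, decoding byte[] explicitly instead of calling toString() on it.
    static String asText(Object payload) {
        if (payload instanceof byte[]) {
            // UTF-8 is assumed here; use whatever encoding the producer actually wrote.
            return new String((byte[]) payload, StandardCharsets.UTF_8);
        }
        return String.valueOf(payload);
    }

    public static void main(String[] args) {
        byte[] raw = "<Message><Subject>test</Subject></Message>"
                .getBytes(StandardCharsets.UTF_8);

        // Array toString() gives the JVM's array type tag plus an identity hash, never the content.
        System.out.println("toString() starts with [B@: " + raw.toString().startsWith("[B@"));

        // Explicit decoding recovers the XML text that an XML parser can read.
        System.out.println("decoded: " + asText(raw));
    }
}
```

                    Once the payload is a String, either the custom transformer can parse it as XML, or the mail adapter will use it as the message body.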
