Announcement Announcement Module
Collapse
No announcement yet.
asynchronous and concurrent message processing from a queue Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • asynchronous and concurrent message processing from a queue

    I have two code samples below for concurrently processing messages from a TIBCO JMS server queue with multiple consumers (two), each created from a different JMS session instance.
    Questions: 1) I would like to understand the difference in behavior between the two. One explicitly starts two threads, and the other just stops after creating a couple of consumers. When I run them, both seem to behave the same way, so is starting two threads redundant or not required?
    2) What happens when the main thread returns normally without calling connection.close() or System.exit() at the end of the program? Will the listeners still be able to receive messages?
    3) Is the JMS session/server responsible for creating the threads that call the onMessage method of each consumer?
    4) Are they daemon or non-daemon threads, and how do I find out?
    A) CODE1: with threads created
    Code:
    public class QueueConsumer implements Runnable, MessageListener {
     
     
     
    private static ConnectionFactory qcf = null;
    private static Connection queueConnection = null;
     
     
    private Session ses = null;
    private Destination queue = null;
    private MessageConsumer msgConsumer = null;
     
    public static final Logger logger = LoggerFactory
            .getLogger(QueueConsumer.class);
     
    public QueueConsumer() {
        super();
    }
     
    public static void main(String[] args) {
     
           try {
                qcf = new JMSConnectionFactory("oms");
                queueConnection = qcf.createConnection();
            } catch (JMSException e1) {
     
                e1.printStackTrace();
                System.exit(-1);
            }
     
     
     
        QueueConsumer consumer1 = new QueueConsumer();
        QueueConsumer consumer2 = new QueueConsumer();
        try {
            consumer1.init("oms", "US.Q.CHECKOUT-ORDER.1.0.JSON");
            consumer2.init("oms","US.Q.CHECKOUT-ORDER.1.0.JSON");
        } catch (JMSException ex) {
            ex.printStackTrace();
            System.exit(-1);
        }
     
     
        Thread newThread1 = new Thread(consumer1);
     
        Thread newThread2 = new Thread(consumer2);
     
        newThread1.start();
     
        newThread2.start();
     
     
        InputStreamReader aISR = new InputStreamReader(System.in);
     
            char aAnswer = ' ';
     
            do {
     
                try {
     
                    aAnswer = (char) aISR.read();
     
                } catch (IOException e) {
     
                    e.printStackTrace();
     
                }
     
            } while ((aAnswer != 'q') && (aAnswer != 'Q'));
     
            newThread1.interrupt();
     
            newThread2.interrupt();
     
            try {
                newThread1.join();
     
                newThread2.join();
     
            } catch (InterruptedException e) {
     
     
                e.printStackTrace();
     
            }
     
              try {
     
                queueConnection.close();
     
            } catch (JMSException e) {
     
                e.printStackTrace();
            }
    }
     
     
     
    public void onMessage(Message msg) {
        if (msg instanceof TextMessage) {
            try {
     
                //process message
     
            } catch (JMSException ex) {
                ex.printStackTrace();
     
            }
        }
     
    }
     
    public void run() {
     
        try {
            queueConnection.start();
        } catch (JMSException e) {
     
            e.printStackTrace();
     
            System.exit(-1);
        }
        while (!Thread.currentThread().isInterrupted()) {
            synchronized (this) {
                try {
                    wait();
                } catch (InterruptedException ex) {
                    break;
                }
            }
        }
     
    }
     
     
     
    public void init(String factoryName, String queue2) throws JMSException {
        try {
     
            ses = queueConnection.createSession(false,
                    Session.CLIENT_ACKNOWLEDGE);
            queue = ses.createQueue(queue2);
            logger.info("Subscribing to destination: " + queue2);
     
            msgConsumer = ses.createConsumer(queue);
     
     
            msgConsumer.setMessageListener(this);
     
            System.out.println("Listening on queue " + queue2);
     
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
     
    }
     
    private static void setConnectionFactoryName(String name) {
        connectionFactoryName = name;
    }
     
    private static String getQueueName() {
        return queueName;
    }
     
    private static void setQueueName(String name) {
        queueName = name;
    }
    }
    B) CODE2: with no explicit threads created
    Code:
    public class QueueConsumer implements MessageListener {
     
     
     
    private static ConnectionFactory qcf = null;
    private static Connection queueConnection = null;
     
     
    private Session ses = null;
    private Destination queue = null;
    private MessageConsumer msgConsumer = null;
     
    public static final Logger logger = LoggerFactory
            .getLogger(QueueConsumer.class);
     
    public QueueConsumer() {
        super();
    }
     
    public static void main(String[] args) {
     
           try {
                qcf = new JMSConnectionFactory("oms");
                queueConnection = qcf.createConnection();
            } catch (JMSException e1) {
     
                e1.printStackTrace();
                System.exit(-1);
            }
     
     
     
        QueueConsumer consumer1 = new QueueConsumer();
        QueueConsumer consumer2 = new QueueConsumer();
        try {
            consumer1.init("oms", "US.Q.CHECKOUT-ORDER.1.0.JSON");
            consumer2.init("oms","US.Q.CHECKOUT-ORDER.1.0.JSON");
        } catch (JMSException ex) {
            ex.printStackTrace();
            System.exit(-1);
        }
     
        InputStreamReader aISR = new InputStreamReader(System.in);
            char aAnswer = ' ';
            do {
                try {
                    aAnswer = (char) aISR.read();
                } catch (IOException e) {
     
                    e.printStackTrace();
     
                }
     
            } while ((aAnswer != 'q') && (aAnswer != 'Q'));
     
           try {
                queueConnection.close();
            } catch (JMSException e) {
     
                e.printStackTrace();
            }
     
     
     
    }
     
     
     
    public void onMessage(Message msg) {
        if (msg instanceof TextMessage) {
            try {
     
                //process message
     
            } catch (JMSException ex) {
                ex.printStackTrace();
     
            }
        }
     
    }
     
     
     
     
     
    public void init(String factoryName, String queue2) throws JMSException {
        try {
     
            ses = queueConnection.createSession(false,
                    Session.CLIENT_ACKNOWLEDGE);
            queue = ses.createQueue(queue2);
            logger.info("Subscribing to destination: " + queue2);
     
            msgConsumer = ses.createConsumer(queue);
     
     
            msgConsumer.setMessageListener(this);
     
            System.out.println("Listening on queue " + queue2);
     
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
     
    }
     
    private static void setConnectionFactoryName(String name) {
        connectionFactoryName = name;
    }
     
    private static String getQueueName() {
        return queueName;
    }
     
    private static void setQueueName(String name) {
        queueName = name;
    }
    }
    Last edited by Gary Russell; Jun 14th, 2013, 06:32 PM.

  • #2
    My question is why you are asking this question in a Spring forum when you are not using Spring at all; you are coding to the JMS APIs directly.

    If you want to use Spring, I suggest you read the JMS section in the Spring Reference Documentation.

    That said, with your existing code, you need to refer your questions to Tibco because they are managing the threads and I have no idea whether they create daemons etc. It's a function of their library.

    Comment

    Working...
    X