diff options
Diffstat (limited to 'java/client/example/src')
2 files changed, 76 insertions, 36 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java index 8784d340da..b6544db995 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java @@ -18,18 +18,18 @@ */ package org.apache.qpid.example.publisher; -import org.apache.log4j.Logger; import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.Logger; -import javax.jms.*; - -import java.util.Properties; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; /** - * Class that sends heartbeat messages to allow monitoring of message consumption - * Sends regular (currently 20 seconds apart) heartbeat message + * Class that sends heartbeat messages to allow monitoring of message consumption Sends regular (currently 20 seconds + * apart) heartbeat message */ -public class MonitorMessageDispatcher { +public class MonitorMessageDispatcher +{ private static final Logger _logger = Logger.getLogger(MonitorMessageDispatcher.class); @@ -39,17 +39,18 @@ public class MonitorMessageDispatcher { /** * Easy entry point for running a message dispatcher for monitoring consumption + * * @param args */ public static void main(String[] args) { - //Switch on logging appropriately for your app BasicConfigurator.configure(); try { - while(true) + int i =0; + while (i < 1000) { try { @@ -62,9 +63,10 @@ public class MonitorMessageDispatcher { } //sleep for twenty seconds and then publish again - change if appropriate - Thread.sleep(20000); + //Thread.sleep(1000); + i++ ; } - catch(UndeliveredMessageException a) + catch (UndeliveredMessageException a) { //trigger application specific failure handling here _logger.error("Problem delivering monitor message"); @@ -72,7 +74,7 @@ public class MonitorMessageDispatcher { } } } - catch(Exception e) + catch (Exception e) { _logger.error("Error trying to dispatch AMS monitor message: " + e); System.exit(1); @@ -81,7 +83,7 @@ public class MonitorMessageDispatcher { { if (getMonitorPublisher() != null) { - getMonitorPublisher().cleanup(); + getMonitorPublisher().cleanup(); } } @@ -90,19 +92,24 @@ public class MonitorMessageDispatcher { /** * Publish heartbeat message + * * @throws JMSException * @throws UndeliveredMessageException */ public static void publish() throws JMSException, UndeliveredMessageException { //Send the message generated from the payload using the _publisher - getMonitorPublisher().sendImmediateMessage - (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis())); +// getMonitorPublisher().sendImmediateMessage +// (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis())); + + getMonitorPublisher().sendMessage + (getMonitorPublisher()._session, + FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(), "monitor:" + System.currentTimeMillis()), + DeliveryMode.PERSISTENT, false, true); + } - /** - * Cleanup publishers - */ + /** Cleanup publishers */ public static void cleanup() { if (getMonitorPublisher() != null) @@ -119,16 +126,16 @@ public class MonitorMessageDispatcher { //Returns a _publisher for the monitor queue private static MonitorPublisher getMonitorPublisher() { - if (_monitorPublisher != null) - { - return _monitorPublisher; - } + if (_monitorPublisher != null) + { + return _monitorPublisher; + } - //Create a _publisher using failover details and constant for monitor queue - _monitorPublisher = new MonitorPublisher(); + //Create a _publisher using failover details and constant for monitor queue + _monitorPublisher = new MonitorPublisher(); - _monitorPublisher.setName(MonitorMessageDispatcher.DEFAULT_MONITOR_PUB_NAME); - return _monitorPublisher; + _monitorPublisher.setName(MonitorMessageDispatcher.DEFAULT_MONITOR_PUB_NAME); + return _monitorPublisher; } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java index 233c3fea0a..a67b602e58 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java @@ -18,15 +18,17 @@ */ package org.apache.qpid.example.publisher; -import javax.jms.Message; +import org.apache.log4j.Logger; +import org.apache.qpid.client.BasicMessageProducer; + import javax.jms.DeliveryMode; import javax.jms.JMSException; -import org.apache.qpid.client.BasicMessageProducer; -import org.apache.log4j.Logger; +import javax.jms.Message; +import javax.jms.Session; /** - * Subclass of Publisher which uses QPID functionality to send a heartbeat message - * Note immediate flag not available via JMS MessageProducer + * Subclass of Publisher which uses QPID functionality to send a heartbeat message Note immediate flag not available via + * JMS MessageProducer */ public class MonitorPublisher extends Publisher { @@ -40,14 +42,45 @@ public class MonitorPublisher extends Publisher super(); } - /* - * Publishes a non-persistent message using transacted session - */ + /* + * Publishes a message using given details + */ + public boolean sendMessage(Session session, Message message, int deliveryMode, + boolean immediate, boolean commit) throws UndeliveredMessageException + { + try + { + _producer = (BasicMessageProducer) session.createProducer(_destination); + + _producer.send(message, deliveryMode, immediate); + + if (commit) + { + //commit the message send and close the transaction + _session.commit(); + } + + } + catch (JMSException e) + { + //Have to assume our commit failed but do not rollback here as channel closed + _log.error(e); + e.printStackTrace(); + throw new UndeliveredMessageException("Cannot deliver immediate message", e); + } + + _log.info(_name + " finished sending message: " + message); + return true; + } + + /* + * Publishes a non-persistent message using transacted session + */ public boolean sendImmediateMessage(Message message) throws UndeliveredMessageException { try { - _producer = (BasicMessageProducer)_session.createProducer(_destination); + _producer = (BasicMessageProducer) _session.createProducer(_destination); //Send message via our producer which is not persistent and is immediate //NB: not available via jms interface MessageProducer @@ -62,7 +95,7 @@ public class MonitorPublisher extends Publisher //Have to assume our commit failed but do not rollback here as channel closed _log.error(e); e.printStackTrace(); - throw new UndeliveredMessageException("Cannot deliver immediate message",e); + throw new UndeliveredMessageException("Cannot deliver immediate message", e); } _log.info(_name + " finished sending message: " + message); |