summaryrefslogtreecommitdiff
path: root/java/client/example/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-07-18 16:01:58 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-07-18 16:01:58 +0000
commit41cdc0f710669477e9c5c1a47a0e34ea059bc3b5 (patch)
tree637253cafd29fa2d2927ac32777667c84f563447 /java/client/example/src
parent0b3d5ba2d59301cc0dead3eaf643e2a84cc58e85 (diff)
downloadqpid-python-41cdc0f710669477e9c5c1a47a0e34ea059bc3b5.tar.gz
Update fix incorrect license headers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@557306 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/example/src')
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java59
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java53
2 files changed, 76 insertions, 36 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
index 8784d340da..b6544db995 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
@@ -18,18 +18,18 @@
*/
package org.apache.qpid.example.publisher;
-import org.apache.log4j.Logger;
import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Logger;
-import javax.jms.*;
-
-import java.util.Properties;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
/**
- * Class that sends heartbeat messages to allow monitoring of message consumption
- * Sends regular (currently 20 seconds apart) heartbeat message
+ * Class that sends heartbeat messages to allow monitoring of message consumption Sends regular (currently 20 seconds
+ * apart) heartbeat message
*/
-public class MonitorMessageDispatcher {
+public class MonitorMessageDispatcher
+{
private static final Logger _logger = Logger.getLogger(MonitorMessageDispatcher.class);
@@ -39,17 +39,18 @@ public class MonitorMessageDispatcher {
/**
* Easy entry point for running a message dispatcher for monitoring consumption
+ *
* @param args
*/
public static void main(String[] args)
{
-
//Switch on logging appropriately for your app
BasicConfigurator.configure();
try
{
- while(true)
+ int i =0;
+ while (i < 1000)
{
try
{
@@ -62,9 +63,10 @@ public class MonitorMessageDispatcher {
}
//sleep for twenty seconds and then publish again - change if appropriate
- Thread.sleep(20000);
+ //Thread.sleep(1000);
+ i++ ;
}
- catch(UndeliveredMessageException a)
+ catch (UndeliveredMessageException a)
{
//trigger application specific failure handling here
_logger.error("Problem delivering monitor message");
@@ -72,7 +74,7 @@ public class MonitorMessageDispatcher {
}
}
}
- catch(Exception e)
+ catch (Exception e)
{
_logger.error("Error trying to dispatch AMS monitor message: " + e);
System.exit(1);
@@ -81,7 +83,7 @@ public class MonitorMessageDispatcher {
{
if (getMonitorPublisher() != null)
{
- getMonitorPublisher().cleanup();
+ getMonitorPublisher().cleanup();
}
}
@@ -90,19 +92,24 @@ public class MonitorMessageDispatcher {
/**
* Publish heartbeat message
+ *
* @throws JMSException
* @throws UndeliveredMessageException
*/
public static void publish() throws JMSException, UndeliveredMessageException
{
//Send the message generated from the payload using the _publisher
- getMonitorPublisher().sendImmediateMessage
- (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis()));
+// getMonitorPublisher().sendImmediateMessage
+// (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis()));
+
+ getMonitorPublisher().sendMessage
+ (getMonitorPublisher()._session,
+ FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(), "monitor:" + System.currentTimeMillis()),
+ DeliveryMode.PERSISTENT, false, true);
+
}
- /**
- * Cleanup publishers
- */
+ /** Cleanup publishers */
public static void cleanup()
{
if (getMonitorPublisher() != null)
@@ -119,16 +126,16 @@ public class MonitorMessageDispatcher {
//Returns a _publisher for the monitor queue
private static MonitorPublisher getMonitorPublisher()
{
- if (_monitorPublisher != null)
- {
- return _monitorPublisher;
- }
+ if (_monitorPublisher != null)
+ {
+ return _monitorPublisher;
+ }
- //Create a _publisher using failover details and constant for monitor queue
- _monitorPublisher = new MonitorPublisher();
+ //Create a _publisher using failover details and constant for monitor queue
+ _monitorPublisher = new MonitorPublisher();
- _monitorPublisher.setName(MonitorMessageDispatcher.DEFAULT_MONITOR_PUB_NAME);
- return _monitorPublisher;
+ _monitorPublisher.setName(MonitorMessageDispatcher.DEFAULT_MONITOR_PUB_NAME);
+ return _monitorPublisher;
}
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java
index 233c3fea0a..a67b602e58 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java
@@ -18,15 +18,17 @@
*/
package org.apache.qpid.example.publisher;
-import javax.jms.Message;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.BasicMessageProducer;
+
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
-import org.apache.qpid.client.BasicMessageProducer;
-import org.apache.log4j.Logger;
+import javax.jms.Message;
+import javax.jms.Session;
/**
- * Subclass of Publisher which uses QPID functionality to send a heartbeat message
- * Note immediate flag not available via JMS MessageProducer
+ * Subclass of Publisher which uses QPID functionality to send a heartbeat message Note immediate flag not available via
+ * JMS MessageProducer
*/
public class MonitorPublisher extends Publisher
{
@@ -40,14 +42,45 @@ public class MonitorPublisher extends Publisher
super();
}
- /*
- * Publishes a non-persistent message using transacted session
- */
+ /*
+ * Publishes a message using given details
+ */
+ public boolean sendMessage(Session session, Message message, int deliveryMode,
+ boolean immediate, boolean commit) throws UndeliveredMessageException
+ {
+ try
+ {
+ _producer = (BasicMessageProducer) session.createProducer(_destination);
+
+ _producer.send(message, deliveryMode, immediate);
+
+ if (commit)
+ {
+ //commit the message send and close the transaction
+ _session.commit();
+ }
+
+ }
+ catch (JMSException e)
+ {
+ //Have to assume our commit failed but do not rollback here as channel closed
+ _log.error(e);
+ e.printStackTrace();
+ throw new UndeliveredMessageException("Cannot deliver immediate message", e);
+ }
+
+ _log.info(_name + " finished sending message: " + message);
+ return true;
+ }
+
+ /*
+ * Publishes a non-persistent message using transacted session
+ */
public boolean sendImmediateMessage(Message message) throws UndeliveredMessageException
{
try
{
- _producer = (BasicMessageProducer)_session.createProducer(_destination);
+ _producer = (BasicMessageProducer) _session.createProducer(_destination);
//Send message via our producer which is not persistent and is immediate
//NB: not available via jms interface MessageProducer
@@ -62,7 +95,7 @@ public class MonitorPublisher extends Publisher
//Have to assume our commit failed but do not rollback here as channel closed
_log.error(e);
e.printStackTrace();
- throw new UndeliveredMessageException("Cannot deliver immediate message",e);
+ throw new UndeliveredMessageException("Cannot deliver immediate message", e);
}
_log.info(_name + " finished sending message: " + message);