From 8d1d4fc8550e13f592cd6667733ed3c84d982b21 Mon Sep 17 00:00:00 2001 From: Marnie McCormack Date: Tue, 28 Nov 2006 15:29:16 +0000 Subject: Further example tidy up git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@480089 13f79535-47bb-0310-9956-ffa450edef68 --- .../example/publisher/FileMessageDispatcher.java | 43 ++++--- .../qpid/example/publisher/FileMessageFactory.java | 134 +++++++++++++++++++++ .../qpid/example/publisher/MessageFactory.java | 120 ------------------ .../publisher/MonitorMessageDispatcher.java | 33 +++-- .../apache/qpid/example/publisher/Publisher.java | 27 +++-- 5 files changed, 200 insertions(+), 157 deletions(-) create mode 100644 java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageFactory.java delete mode 100644 java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactory.java (limited to 'java') diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java b/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java index ca3e5ce3f5..b199d41432 100644 --- a/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java +++ b/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java @@ -19,7 +19,7 @@ package org.apache.qpid.example.publisher; import org.apache.log4j.Logger; -import java.util.Properties; + import java.io.File; import org.apache.qpid.example.shared.FileUtils; @@ -34,12 +34,17 @@ import javax.jms.JMSException; */ public class FileMessageDispatcher { - private static final Logger _logger = Logger.getLogger(FileMessageDispatcher.class); - - private static Publisher _publisher = null; + protected static final Logger _logger = Logger.getLogger(FileMessageDispatcher.class); - private static final String DEFAULT_PUB_NAME = "Publisher"; + protected static Publisher _publisher = null; + /** + * To use this main method you need to specify a path or file to use for input + * This class then uses file contents from the dir/file specified to generate + * messages to publish + * Intended to be a very simple way to get going with publishing using the broker + * @param args - must specify one value, the path to file(s) for publisher + */ public static void main(String[] args) { @@ -52,7 +57,7 @@ public class FileMessageDispatcher { { try { - //publish message(s) from file(s) and send message to monitor queue + //publish message(s) from file(s) to configured queue publish(args[0]); //Move payload file(s) to archive location as no error @@ -60,7 +65,8 @@ public class FileMessageDispatcher { } catch(Exception e) { - System.err.println("Error trying to dispatch message: " + e); + //log error and exit + _logger.error("Error trying to dispatch message: " + e); System.exit(1); } finally @@ -81,8 +87,12 @@ public class FileMessageDispatcher { System.exit(0); } - - //Publish files or file as message + /** + * Publish the content of a file or files from a directory as messages + * @param path - from main args + * @throws JMSException + * @throws MessageFactoryException - if cannot create message from file content + */ public static void publish(String path) throws JMSException, MessageFactoryException { File tempFile = new File(path); @@ -100,7 +110,7 @@ public class FileMessageDispatcher { for (File file : files) { //Create message factory passing in payload path - MessageFactory factory = new MessageFactory(getPublisher().getSession(), file.toString()); + FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), file.toString()); //Send the message generated from the payload using the _publisher getPublisher().sendMessage(factory.createEventMessage()); @@ -110,16 +120,18 @@ public class FileMessageDispatcher { } else { - //handle as single file + //handle a single file //Create message factory passing in payload path - MessageFactory factory = new MessageFactory(getPublisher().getSession(),tempFile.toString()); + FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(),tempFile.toString()); //Send the message generated from the payload using the _publisher getPublisher().sendMessage(factory.createEventMessage()); } } - //cleanup publishers + /** + * Cleanup before exit + */ public static void cleanup() { if (getPublisher() != null) @@ -128,8 +140,8 @@ public class FileMessageDispatcher { } } - /* - * Returns a _publisher for a queue + /** + * @return A Publisher instance */ private static Publisher getPublisher() { @@ -141,7 +153,6 @@ public class FileMessageDispatcher { //Create a _publisher _publisher = new Publisher(); - _publisher.setName(DEFAULT_PUB_NAME); return _publisher; } diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageFactory.java b/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageFactory.java new file mode 100644 index 0000000000..88bcbbbccb --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageFactory.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.example.publisher; + +import org.apache.qpid.example.shared.FileUtils; +import org.apache.qpid.example.shared.Statics; + +import java.io.*; +import javax.jms.*; + +public class FileMessageFactory +{ + protected final Session _session; + protected final String _payload; + protected final String _filename; + + /** + * Contructs and instance using a filename from which content will be used to create message + * @param session + * @param filename + * @throws MessageFactoryException + */ + public FileMessageFactory(Session session, String filename) throws MessageFactoryException + { + try + { + _filename = filename; + _payload = FileUtils.getFileContent(filename); + _session = session; + } + catch (IOException e) + { + throw new MessageFactoryException(e.toString()); + } + } + + /** + * Creates a text message and sets filename property on it + * The filename property is purely intended to provide visibility + * of file content passing trhough the broker using example classes + * @return Message - a TextMessage with content from file + * @throws JMSException + */ + public Message createEventMessage() throws JMSException + { + TextMessage msg = _session.createTextMessage(); + msg.setText(_payload); + msg.setStringProperty(Statics.FILENAME_PROPERTY,new File(_filename).getName()); + return msg; + } + + /** + * Creates message from a string for use by the monitor + * @param session + * @param textMsg - message content + * @return Message - TextMessage with content from String + * @throws JMSException + */ + public static Message createSimpleEventMessage(Session session, String textMsg) throws JMSException + { + TextMessage msg = session.createTextMessage(); + msg.setText(textMsg); + return msg; + } + + public Message createShutdownMessage() throws JMSException + { + return _session.createTextMessage("SHUTDOWN"); + } + + public Message createReportRequestMessage() throws JMSException + { + return _session.createTextMessage("REPORT"); + } + + public Message createReportResponseMessage(String msg) throws JMSException + { + return _session.createTextMessage(msg); + } + + public boolean isShutdown(Message m) + { + return checkText(m, "SHUTDOWN"); + } + + public boolean isReport(Message m) + { + return checkText(m, "REPORT"); + } + + public Object getReport(Message m) + { + try + { + return ((TextMessage) m).getText(); + } + catch (JMSException e) + { + e.printStackTrace(System.out); + return e.toString(); + } + } + + private static boolean checkText(Message m, String s) + { + try + { + return m instanceof TextMessage && ((TextMessage) m).getText().equals(s); + } + catch (JMSException e) + { + e.printStackTrace(System.out); + return false; + } + } +} + diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactory.java b/java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactory.java deleted file mode 100644 index f9944284c8..0000000000 --- a/java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactory.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpid.example.publisher; - -import org.apache.qpid.example.shared.FileUtils; -import org.apache.qpid.example.shared.Statics; - -import java.io.*; -import javax.jms.*; - -public class MessageFactory -{ - private final Session _session; - private final String _payload; - private final String _filename; - - public MessageFactory(Session session, String filename) throws MessageFactoryException - { - try - { - _filename = filename; - _payload = FileUtils.getFileContent(filename); - _session = session; - } - catch (IOException e) - { - throw new MessageFactoryException(e.toString()); - } - } - - /* - * Creates message and sets filename property on it - */ - public Message createEventMessage() throws JMSException - { - TextMessage msg = _session.createTextMessage(); - msg.setText(_payload); - msg.setStringProperty(Statics.FILENAME_PROPERTY,new File(_filename).getName()); - return msg; - } - - /* - * Creates message from a string for use by the monitor - */ - public static Message createSimpleEventMessage(Session session, String textMsg) throws JMSException - { - TextMessage msg = session.createTextMessage(); - msg.setText(textMsg); - return msg; - } - - public Message createShutdownMessage() throws JMSException - { - return _session.createTextMessage("SHUTDOWN"); - } - - public Message createReportRequestMessage() throws JMSException - { - return _session.createTextMessage("REPORT"); - } - - public Message createReportResponseMessage(String msg) throws JMSException - { - return _session.createTextMessage(msg); - } - - public boolean isShutdown(Message m) - { - return checkText(m, "SHUTDOWN"); - } - - public boolean isReport(Message m) - { - return checkText(m, "REPORT"); - } - - public Object getReport(Message m) - { - try - { - return ((TextMessage) m).getText(); - } - catch (JMSException e) - { - e.printStackTrace(System.out); - return e.toString(); - } - } - - private static boolean checkText(Message m, String s) - { - try - { - return m instanceof TextMessage && ((TextMessage) m).getText().equals(s); - } - catch (JMSException e) - { - e.printStackTrace(System.out); - return false; - } - } -} - diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java index 16b32da22a..8784d340da 100644 --- a/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java +++ b/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java @@ -20,8 +20,9 @@ package org.apache.qpid.example.publisher; import org.apache.log4j.Logger; import org.apache.log4j.BasicConfigurator; -import org.apache.qpid.example.shared.Statics; + import javax.jms.*; + import java.util.Properties; /** @@ -32,14 +33,18 @@ public class MonitorMessageDispatcher { private static final Logger _logger = Logger.getLogger(MonitorMessageDispatcher.class); - private static MonitorPublisher _monitorPublisher = null; + protected static MonitorPublisher _monitorPublisher = null; - private static final String DEFAULT_MONITOR_PUB_NAME = "MonitorPublisher"; + protected static final String DEFAULT_MONITOR_PUB_NAME = "MonitorPublisher"; + /** + * Easy entry point for running a message dispatcher for monitoring consumption + * @param args + */ public static void main(String[] args) { - //@TODO switch on logging appropriately at your app level + //Switch on logging appropriately for your app BasicConfigurator.configure(); try @@ -61,7 +66,7 @@ public class MonitorMessageDispatcher { } catch(UndeliveredMessageException a) { - //@TODO trigger application specific failure handling here + //trigger application specific failure handling here _logger.error("Problem delivering monitor message"); break; } @@ -69,8 +74,7 @@ public class MonitorMessageDispatcher { } catch(Exception e) { - - System.err.println("Error trying to dispatch AMS monitor message: " + e); + _logger.error("Error trying to dispatch AMS monitor message: " + e); System.exit(1); } finally @@ -84,15 +88,21 @@ public class MonitorMessageDispatcher { System.exit(1); } - //Publish heartbeat message + /** + * Publish heartbeat message + * @throws JMSException + * @throws UndeliveredMessageException + */ public static void publish() throws JMSException, UndeliveredMessageException { //Send the message generated from the payload using the _publisher getMonitorPublisher().sendImmediateMessage - (MessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis())); + (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis())); } - //cleanup publishers + /** + * Cleanup publishers + */ public static void cleanup() { if (getMonitorPublisher() != null) @@ -114,9 +124,6 @@ public class MonitorMessageDispatcher { return _monitorPublisher; } - //Create _publisher using system properties - Properties props = System.getProperties(); - //Create a _publisher using failover details and constant for monitor queue _monitorPublisher = new MonitorPublisher(); diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java b/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java index d64fd9b142..be42e0e413 100644 --- a/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java +++ b/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java @@ -22,14 +22,14 @@ import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.jms.Session; - import javax.jms.JMSException; import javax.jms.Message; import javax.jms.DeliveryMode; import javax.jms.Queue; import javax.jms.MessageProducer; import javax.jms.Connection; +import javax.jms.Session; + import javax.naming.InitialContext; import org.apache.qpid.example.shared.InitialContextHelper; @@ -44,7 +44,7 @@ public class Publisher protected Session _session; - private MessageProducer _producer; + protected MessageProducer _producer; protected String _destinationDir; @@ -54,7 +54,10 @@ public class Publisher protected static final String _defaultDestinationDir = "/tmp"; - //constructor for use with a single host + /** + * Creates a Publisher instance using properties from example.properties + * See InitialContextHelper for details of how context etc created + */ public Publisher() { try @@ -68,7 +71,7 @@ public class Publisher _connection = cf.createConnection(); //create a transactional session - _session = (Session) _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + _session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //lookup the example queue and use it //Queue is non-exclusive and not deleted when last consumer detaches @@ -90,8 +93,9 @@ public class Publisher } /** - * Publishes a non-persistent message using transacted session - **/ + * Publishes a non-persistent message using transacted session + * Note that persistent is the default mode for send - so need to specify for transient + */ public boolean sendMessage(Message message) { try @@ -124,6 +128,9 @@ public class Publisher return true; } + /** + * Cleanup resources before exit + */ public void cleanup() { try @@ -138,11 +145,15 @@ public class Publisher } catch(Exception e) { - System.err.println("Error trying to cleanup publisher " + e); + _log.error("Error trying to cleanup publisher " + e); System.exit(1); } } + /** + * Exposes session + * @return Session + */ public Session getSession() { return _session; -- cgit v1.2.1