summaryrefslogtreecommitdiff
path: root/java/client/src/test
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-03-13 10:35:42 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-03-13 10:35:42 +0000
commitc4f018f7c10d2169ced4c59e776844ee5cf52d33 (patch)
tree421d85cf41bf382bab618587298d9d6bef825bdc /java/client/src/test
parent685ad5615e73f02f76f69841162fb9aa126892d2 (diff)
downloadqpid-python-c4f018f7c10d2169ced4c59e776844ee5cf52d33.tar.gz
QPID-346,QPID-386,QPID-403, QPID-410 Rollback, Basic-Reject, QueueBrowser NO_ACK.
QPID-346 Message loss after rollback\recover QPID-386 Updated Transactional Tests to cover underlying AMQP/Qpid state. QPID-403 Implement Basic.Reject QPID-410 Queue Browsers should use not acknowledge messages. ------------------------------------- Broker TxAck - Added comment and fixed white space UnacknowledgedMessage - Added comment for messageDecrement AMQChannel - Added extra debugging. + Created a NonTransactionalContext for requeuing messages as using txContext will tie the requeue to any runing transaction. + Updated message reference counting. So it is in terms of queues don't increment when giving to client. BasicCancelMethodHandler - Added Debug log. BasicConsumeMethodHandler - Reverted to directly writes frames to the session, throwing ChannelException caused problems. Added Trace and debug logging. BasicRejectMethodHandler, ChannelCloseHandler, ConnectionCloseMethodHandler - Added Debug logging AMQPFastProtocolHandler - moved error log to before session.write AMQMessage - Added additional debug via debugIdentity() and comments AMQQueue - Decoupled reference counting from dequeue operation. ConcurrentSelectorDeliveryManager - Added comments and increased info in debug logging SubscriptionImpl - Disabled use of acks for browsers. For now put setDeliveredToConsumer back in the finally block. commented that I'm not sure this is correct as even an error writing to client will cause msg to be marked delivered to consumer. + On Close ensured that it is only called once. + Had problem where closing browser was causing two CancelOk frames to be sent back to client. RequiredDeliveryException - Added comment to explain incrementReference LocalTransactionalContext - Commented out incrementReference as it shouldn't be required here. NonTransactionalContext - Removed incrementReference on deliver + - Fixed bug where browsers - acks would cause messages to be discarded. new JIRA this needs tidied up. TxnBuffer - Added debug logging. Client ------ AMQQueueBrowser - Added comments AMQSession - Added comments and debug + Updated to cause closed consumer to reject messages rather than receive them. + Prevented NoConsumer's from rollingback and rejecting.. they simply clear their SyncQueue - JIRA to ensure clean state with rollback BasicMessageConsumer - Added trace level debuging on close calls + Forced noConsume-rs to use NO_ACK + added more logging Closeable - Updated to use isClosed rather than directly calling _closed.get() to aid in future work on ensuring multi threaded close still allows pending acks to be processed first. ChannelCloseOkMethodHandler - updated comment AMQProtocolSession - Update comments,whitespace TransportConnection - removed static block FlowControllingBlockingQueue - Added isEmpty() Method PropertyValueTest - Added VM Broker setup + Updated test to run once and 50 times to pull out delivery tag problems that were occuring. + Adjusted logging level to be more helpful. moved some info down to trace and debug. MessageRequeueTest - Moved QpidClientConnection its own file. + Fixed it so it actually runs more than one consumer, concurrently.Now 3 was 1. ConcurrentLinkedMessageQueueAtomicSize - Implemented iterator(). Added QueueBrowserTest to system tests to test QueueBrowsering. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@517638 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/test')
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java71
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java314
-rw-r--r--java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java268
3 files changed, 363 insertions, 290 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
index fe15fa5155..1e50a62fee 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
@@ -39,8 +39,8 @@ import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.client.message.JMSTextMessage;
-import org.apache.qpid.testutil.VMBrokerSetup;
public class PropertyValueTest extends TestCase implements MessageListener
{
@@ -59,19 +59,13 @@ public class PropertyValueTest extends TestCase implements MessageListener
protected void setUp() throws Exception
{
super.setUp();
- try
- {
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
- }
- catch (Exception e)
- {
- fail("Unable to initialilse connection: " + e);
- }
+ TransportConnection.createVMBroker(1);
}
protected void tearDown() throws Exception
{
super.tearDown();
+ TransportConnection.killVMBroker(1);
}
private void init(AMQConnection connection) throws Exception
@@ -91,14 +85,48 @@ public class PropertyValueTest extends TestCase implements MessageListener
connection.start();
}
- public void test() throws Exception
+ public void testOnce()
{
- int count = _count;
- send(count);
- waitFor(count);
- check();
- _logger.info("Completed without failure");
- _connection.close();
+ runBatch(1);
+ }
+
+ public void test50()
+ {
+ runBatch(50);
+ }
+
+ private void runBatch(int runSize)
+ {
+ try
+ {
+ int run = 0;
+ while (run < runSize)
+ {
+ _logger.error("Run Number:" + run++);
+ try
+ {
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
+ }
+ catch (Exception e)
+ {
+ fail("Unable to initialilse connection: " + e);
+ }
+
+ int count = _count;
+ send(count);
+ waitFor(count);
+ check();
+ _logger.info("Completed without failure");
+ _connection.close();
+
+ _logger.error("End Run Number:" + (run - 1));
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error(e.getMessage(), e);
+ e.printStackTrace();
+ }
}
void send(int count) throws JMSException
@@ -138,7 +166,7 @@ public class PropertyValueTest extends TestCase implements MessageListener
m.setJMSReplyTo(q);
m.setStringProperty("TempQueue", q.toString());
- _logger.info("Message:" + m);
+ _logger.trace("Message:" + m);
Assert.assertEquals("Check temp queue has been set correctly",
m.getJMSReplyTo().toString(), m.getStringProperty("TempQueue"));
@@ -150,7 +178,7 @@ public class PropertyValueTest extends TestCase implements MessageListener
m.setShortProperty("Short", (short) Short.MAX_VALUE);
m.setStringProperty("String", "Test");
- _logger.info("Sending Msg:" + m);
+ _logger.debug("Sending Msg:" + m);
producer.send(m);
}
}
@@ -206,8 +234,11 @@ public class PropertyValueTest extends TestCase implements MessageListener
Assert.assertEquals("Check String properties are correctly transported",
"Test", m.getStringProperty("String"));
}
+ received.clear();
assertEqual(messages.iterator(), actual.iterator());
+
+ messages.clear();
}
private static void assertEqual(Iterator expected, Iterator actual)
@@ -269,11 +300,11 @@ public class PropertyValueTest extends TestCase implements MessageListener
{
test._count = Integer.parseInt(argv[1]);
}
- test.test();
+ test.testOnce();
}
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(PropertyValueTest.class));
+ return new junit.framework.TestSuite(PropertyValueTest.class);
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
index a56bae3d70..7762cb3fe9 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
@@ -42,6 +42,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.testutil.QpidClientConnection;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
@@ -62,14 +63,14 @@ public class MessageRequeueTest extends TestCase
private boolean testReception = true;
private long[] receieved = new long[numTestMessages + 1];
- private boolean passed=false;
+ private boolean passed = false;
protected void setUp() throws Exception
{
super.setUp();
TransportConnection.createVMBroker(1);
- QpidClientConnection conn = new QpidClientConnection();
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
// clear queue
@@ -85,21 +86,28 @@ public class MessageRequeueTest extends TestCase
{
super.tearDown();
- if (!passed)
+ if (!passed) // clean up
{
- QpidClientConnection conn = new QpidClientConnection();
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
// clear queue
conn.consume(queue, consumeTimeout);
+
+ conn.disconnect();
}
TransportConnection.killVMBroker(1);
}
- /** multiple consumers */
+ /**
+ * multiple consumers
+ *
+ * @throws javax.jms.JMSException if a JMS problem occurs
+ * @throws InterruptedException on timeout
+ */
public void testDrain() throws JMSException, InterruptedException
{
- QpidClientConnection conn = new QpidClientConnection();
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
@@ -170,6 +178,7 @@ public class MessageRequeueTest extends TestCase
assertEquals(list.toString(), 0, failed);
_logger.info("consumed: " + messagesReceived);
conn.disconnect();
+ passed = true;
}
/** multiple consumers */
@@ -186,8 +195,8 @@ public class MessageRequeueTest extends TestCase
Thread t4 = new Thread(c4);
t1.start();
-// t2.start();
-// t3.start();
+ t2.start();
+ t3.start();
// t4.start();
try
@@ -230,7 +239,7 @@ public class MessageRequeueTest extends TestCase
}
assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed);
assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed);
- passed=true;
+ passed = true;
}
class Consumer implements Runnable
@@ -248,7 +257,7 @@ public class MessageRequeueTest extends TestCase
try
{
_logger.info("consumer-" + id + ": starting");
- QpidClientConnection conn = new QpidClientConnection();
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
@@ -318,286 +327,51 @@ public class MessageRequeueTest extends TestCase
}
- public class QpidClientConnection implements ExceptionListener
+ public void testRequeue() throws JMSException, AMQException, URLSyntaxException
{
- private boolean transacted = true;
- private int ackMode = Session.CLIENT_ACKNOWLEDGE;
- private Connection connection;
-
- private String virtualHost;
- private String brokerlist;
- private int prefetch;
- protected Session session;
- protected boolean connected;
-
- public QpidClientConnection()
- {
- super();
- setVirtualHost("/test");
- setBrokerList(BROKER);
- setPrefetch(5000);
- }
-
-
- public void connect() throws JMSException
- {
- if (!connected)
- {
- /*
- * amqp://[user:pass@][clientid]/virtualhost?
- * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
- * [&failover='method[?option='value'[&option='value']]']
- * [&option='value']"
- */
- String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
- try
- {
- AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
- _logger.info("connecting to Qpid :" + brokerUrl);
- connection = factory.createConnection();
-
- // register exception listener
- connection.setExceptionListener(this);
-
- session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
-
-
- _logger.info("starting connection");
- connection.start();
-
- connected = true;
- }
- catch (URLSyntaxException e)
- {
- throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
- }
- }
- }
-
- public void disconnect() throws JMSException
- {
- if (connected)
- {
- session.commit();
- session.close();
- connection.close();
- connected = false;
- _logger.info("disconnected");
- }
- }
-
- public void disconnectWithoutCommit() throws JMSException
- {
- if (connected)
- {
- session.close();
- connection.close();
- connected = false;
- _logger.info("disconnected without commit");
- }
- }
-
- public String getBrokerList()
- {
- return brokerlist;
- }
-
- public void setBrokerList(String brokerlist)
- {
- this.brokerlist = brokerlist;
- }
-
- public String getVirtualHost()
+ int run = 0;
+ while (run < 10)
{
- return virtualHost;
- }
-
- public void setVirtualHost(String virtualHost)
- {
- this.virtualHost = virtualHost;
- }
-
- public void setPrefetch(int prefetch)
- {
- this.prefetch = prefetch;
- }
+ run++;
-
- /** override as necessary */
- public void onException(JMSException exception)
- {
- _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
- }
-
- public boolean isConnected()
- {
- return connected;
- }
-
- public Session getSession()
- {
- return session;
- }
-
- /**
- * Put a String as a text messages, repeat n times. A null payload will result in a null message.
- *
- * @param queueName The queue name to put to
- * @param payload the content of the payload
- * @param copies the number of messages to put
- *
- * @throws javax.jms.JMSException any exception that occurs
- */
- public void put(String queueName, String payload, int copies) throws JMSException
- {
- if (!connected)
- {
- connect();
- }
-
- _logger.info("putting to queue " + queueName);
- Queue queue = session.createQueue(queueName);
-
- final MessageProducer sender = session.createProducer(queue);
-
- for (int i = 0; i < copies; i++)
- {
- Message m = session.createTextMessage(payload + i);
- m.setIntProperty("index", i + 1);
- sender.send(m);
- }
-
- session.commit();
- sender.close();
- _logger.info("put " + copies + " copies");
- }
-
- /**
- * GET the top message on a queue. Consumes the message. Accepts timeout value.
- *
- * @param queueName The quename to get from
- * @param readTimeout The timeout to use
- *
- * @return the content of the text message if any
- *
- * @throws javax.jms.JMSException any exception that occured
- */
- public Message getNextMessage(String queueName, long readTimeout) throws JMSException
- {
- if (!connected)
+ if (_logger.isInfoEnabled())
{
- connect();
+ _logger.info("testRequeue run " + run);
}
- Queue queue = session.createQueue(queueName);
+ String virtualHost = "/test";
+ String brokerlist = BROKER;
+ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
- final MessageConsumer consumer = session.createConsumer(queue);
+ Connection conn = new AMQConnection(brokerUrl);
+ Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue q = session.createQueue(queue);
- Message message = consumer.receive(readTimeout);
- session.commit();
- consumer.close();
-
- Message result;
+ _logger.debug("Create Consumer");
+ MessageConsumer consumer = session.createConsumer(q);
- // all messages we consume should be TextMessages
- if (message instanceof TextMessage)
- {
- result = ((TextMessage) message);
- }
- else if (null == message)
+ try
{
- result = null;
+ Thread.sleep(2000);
}
- else
+ catch (InterruptedException e)
{
- _logger.info("warning: received non-text message");
- result = message;
+ //
}
- return result;
- }
+ _logger.debug("Receiving msg");
+ Message msg = consumer.receive(1000);
- /**
- * GET the top message on a queue. Consumes the message.
- *
- * @param queueName The Queuename to get from
- *
- * @return The string content of the text message, if any received
- *
- * @throws javax.jms.JMSException any exception that occurs
- */
- public Message getNextMessage(String queueName) throws JMSException
- {
- return getNextMessage(queueName, 0);
- }
+ assertNotNull("Message should not be null", msg);
- /**
- * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
- *
- * @param queueName The Queue name to consume from
- * @param readTimeout The timeout for each consume
- *
- * @throws javax.jms.JMSException Any exception that occurs during the consume
- * @throws InterruptedException If the consume thread was interrupted during a consume.
- */
- public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
- {
- if (!connected)
- {
- connect();
- }
-
- _logger.info("consuming queue " + queueName);
- Queue queue = session.createQueue(queueName);
-
- final MessageConsumer consumer = session.createConsumer(queue);
- int messagesReceived = 0;
-
- _logger.info("consuming...");
- while ((consumer.receive(readTimeout)) != null)
- {
- messagesReceived++;
- }
- session.commit();
+ // As we have not ack'd message will be requeued.
+ _logger.debug("Close Consumer");
consumer.close();
- _logger.info("consumed: " + messagesReceived);
- }
- }
-
-
- public void testRequeue() throws JMSException, AMQException, URLSyntaxException
- {
- String virtualHost = "/test";
- String brokerlist = "vm://:1";
- String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
- Connection conn = new AMQConnection(brokerUrl);
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue q = session.createQueue(queue);
-
- _logger.info("Create Consumer");
- MessageConsumer consumer = session.createConsumer(q);
-
- try
- {
- Thread.sleep(2000);
- }
- catch (InterruptedException e)
- {
- //
+ _logger.debug("Close Connection");
+ conn.close();
}
-
- _logger.info("Receiving msg");
- Message msg = consumer.receive();
-
- assertNotNull("Message should not be null", msg);
-
- _logger.info("Close Consumer");
- consumer.close();
-
- _logger.info("Close Connection");
- conn.close();
}
} \ No newline at end of file
diff --git a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
new file mode 100644
index 0000000000..f2afa472ab
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
@@ -0,0 +1,268 @@
+package org.apache.qpid.testutil;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.log4j.Logger;
+
+import javax.jms.ExceptionListener;
+import javax.jms.Session;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.MessageProducer;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+public class QpidClientConnection implements ExceptionListener
+{
+
+ private static final Logger _logger = Logger.getLogger(QpidClientConnection.class);
+
+ private boolean transacted = true;
+ private int ackMode = Session.CLIENT_ACKNOWLEDGE;
+ private Connection connection;
+
+ private String virtualHost;
+ private String brokerlist;
+ private int prefetch;
+ protected Session session;
+ protected boolean connected;
+
+ public QpidClientConnection(String broker)
+ {
+ super();
+ setVirtualHost("/test");
+ setBrokerList(broker);
+ setPrefetch(5000);
+ }
+
+
+ public void connect() throws JMSException
+ {
+ if (!connected)
+ {
+ /*
+ * amqp://[user:pass@][clientid]/virtualhost?
+ * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
+ * [&failover='method[?option='value'[&option='value']]']
+ * [&option='value']"
+ */
+ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
+ try
+ {
+ AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
+ _logger.info("connecting to Qpid :" + brokerUrl);
+ connection = factory.createConnection();
+
+ // register exception listener
+ connection.setExceptionListener(this);
+
+ session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
+
+
+ _logger.info("starting connection");
+ connection.start();
+
+ connected = true;
+ }
+ catch (URLSyntaxException e)
+ {
+ throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
+ }
+ }
+ }
+
+ public void disconnect() throws JMSException
+ {
+ if (connected)
+ {
+ session.commit();
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected");
+ }
+ }
+
+ public void disconnectWithoutCommit() throws JMSException
+ {
+ if (connected)
+ {
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected without commit");
+ }
+ }
+
+ public String getBrokerList()
+ {
+ return brokerlist;
+ }
+
+ public void setBrokerList(String brokerlist)
+ {
+ this.brokerlist = brokerlist;
+ }
+
+ public String getVirtualHost()
+ {
+ return virtualHost;
+ }
+
+ public void setVirtualHost(String virtualHost)
+ {
+ this.virtualHost = virtualHost;
+ }
+
+ public void setPrefetch(int prefetch)
+ {
+ this.prefetch = prefetch;
+ }
+
+
+ /** override as necessary */
+ public void onException(JMSException exception)
+ {
+ _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
+ }
+
+ public boolean isConnected()
+ {
+ return connected;
+ }
+
+ public Session getSession()
+ {
+ return session;
+ }
+
+ /**
+ * Put a String as a text messages, repeat n times. A null payload will result in a null message.
+ *
+ * @param queueName The queue name to put to
+ * @param payload the content of the payload
+ * @param copies the number of messages to put
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public void put(String queueName, String payload, int copies) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("putting to queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageProducer sender = session.createProducer(queue);
+
+ for (int i = 0; i < copies; i++)
+ {
+ Message m = session.createTextMessage(payload + i);
+ m.setIntProperty("index", i + 1);
+ sender.send(m);
+ }
+
+ session.commit();
+ sender.close();
+ _logger.info("put " + copies + " copies");
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message. Accepts timeout value.
+ *
+ * @param queueName The quename to get from
+ * @param readTimeout The timeout to use
+ *
+ * @return the content of the text message if any
+ *
+ * @throws javax.jms.JMSException any exception that occured
+ */
+ public Message getNextMessage(String queueName, long readTimeout) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ Message message = consumer.receive(readTimeout);
+ session.commit();
+ consumer.close();
+
+ Message result;
+
+ // all messages we consume should be TextMessages
+ if (message instanceof TextMessage)
+ {
+ result = ((TextMessage) message);
+ }
+ else if (null == message)
+ {
+ result = null;
+ }
+ else
+ {
+ _logger.info("warning: received non-text message");
+ result = message;
+ }
+
+ return result;
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message.
+ *
+ * @param queueName The Queuename to get from
+ *
+ * @return The string content of the text message, if any received
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public Message getNextMessage(String queueName) throws JMSException
+ {
+ return getNextMessage(queueName, 0);
+ }
+
+ /**
+ * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
+ *
+ * @param queueName The Queue name to consume from
+ * @param readTimeout The timeout for each consume
+ *
+ * @throws javax.jms.JMSException Any exception that occurs during the consume
+ * @throws InterruptedException If the consume thread was interrupted during a consume.
+ */
+ public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("consuming queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+ int messagesReceived = 0;
+
+ _logger.info("consuming...");
+ while ((consumer.receive(readTimeout)) != null)
+ {
+ messagesReceived++;
+ }
+
+ session.commit();
+ consumer.close();
+ _logger.info("consumed: " + messagesReceived);
+ }
+}
+