summaryrefslogtreecommitdiff
path: root/java/client/src/test
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-03-06 14:12:47 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-03-06 14:12:47 +0000
commitf39d26362950a622eabbccbcc1616621d5015db1 (patch)
tree27a8e73ee10cb483dd64fe03928275a576f7587b /java/client/src/test
parent92910e32ef098b6f60b25781456157c2bcd1fe81 (diff)
downloadqpid-python-f39d26362950a622eabbccbcc1616621d5015db1.tar.gz
QPID-403 QPID-346 QPID-355 QPID-386 QPID-389 Updates to fix Transactional Rollback.
QPID-346 Message loss after rollback\recover QPID-355 Closing a consumer does not ensure messages delivery will stop for that subscription QPID-386 Updated Transactional Tests to cover underlying AMQP/Qpid state. QPID-389 Prefetched message are not correctly returned to the queue QPID-403 Implement Basic.Reject Broker UnacknowledgedMessage - Added toString for debug UnacknowledgedMessageMapImpl - Removed resendMessages method as all sending should go via DeliveryManager and Subscription. AMQChannel - Updated resend and requeue methods so they do not directly write messages to a subscriber. This was violating the suspension state. - Used a local non-transactional context to requeue messages as the internal requeuing of messages on the broker should not be part of any client transaction. - Maked messages as resent. - Removed warnings from IDE about missing JavaDoc text etc. BasicAckMethodHandler - Added debugging BasicRecoverMethodHandler - Removed session from the resend call. BasicRejectMethodHandler - Initial implementation. Hooks left for possible 'resend' bit. ChannelCloseHandler - Fixed bug where channel wasn't marked as fully closed on reception of a close from the client. TxRollbackHandler - Removed session from resend call. AMQMinaProtocolSession - Fixed bug where channel was marked as awaiting closure before it had actually been closed. This causes problems as the close looks up the channel by id again which will return null after it has been marked as awaiting closure. AMQMessage - Initial implementation of Rejection. Currently inactive in hasInterest() as we are miss-using reject to requeue prefetched messages from the client. AMQQueue - Removed debug method as it made reading the log very difficult as all the logs had the same line number ConcurrentSelectorDeliveryManager - Fixed clearAllMessages() as it didn't actually remove the messages. - Fixed bad logic in getNextMessage when using null subscriber. (as done by clearAllMessages) - Added more logging messages. Made more frequent logging a trace value. - Added debugIdentity() method to reduce over head in calculating standard log prefix. - Allowed messages to be added to the front of the queue. - Added currentStatus() to an overview of the queue's current state. SubscriptionImpl - Updated to handle closure correctly (QPID-355) -Updated the deliver method so it doesn't use a try->finally to do msg.setDeliveredToConsumer() as this would be done even in the event of an error. - Created an additional logger to log suspension calls rather than through the SI logger which logs a lot of detail. Client pom.xml - Excluded older version of log4j that commons-collections exposes. AMQSession - Added ability for dispatcher to start in stopped state. - Added dispatcher logger - Added checks around logging - Added message rejection if the dispatcher receives a message that it doesn't have a consumer for. - Updated message rejection to allow the dispatcher to perform the rejection if running this ensures that all queued messages are processed correctly and rejection occurs in order. - rollback() before calling rollback all pending queued messages must be rejected as rollback will clear unacked map which the rejects caused by rollback() will need. - fixed closedProducersAndConsumers so that it will rethrow any JMS Exception - recover() as for rollback() the rejects need to be done before the Recover Call to the broker. - Allowed delclareExchange to be done synchronously programatically - Updated confirmConsumerCancelled to use the dispatcher to perform the clean up. This required the creation of the dispatcher in stopped mode so that it does not start and message attempted to be delivered while the subscriber is being cancelled. BasicMessageConsumer - Updated close not to perform the deregistration. This is done in via BasicCancelOkMethodHandler - Added guards on logging - Record all messages that have been received so they can be rejected if rollback occurs. so had to change impl of acknowledgeLastDelivered. - Updated Rollback to initially reject all received messages that are still unAcked. - Added a recursive call should the queue not be empty at the end of the rollback.. with a warning. BasicCancelOkMethodHandler - White space changes to meet style guide. Added guard on logging. UnprocessedMessage - White space changes to meet style guide. StateWaiter - Added comment about timeout bug. FlowControllingBlockingQueue - Tidied imports RecoverTest - Updated as declareExchange is now Synchronous ChannelCloseTest - added guard on logging MessageRequeueTest - Added to better test underlying AMQP/Qpid state QPID-386 StreamMessageTest - Updated as declareExchange is now Synchronous CommitRollbackTest - added Additional test case to ensure prefetch queue is correctly purged. TransactedTest - Added logging and additional tests. Cluster SimpleClusterTest - updated in line with AMQSession.delcareExchange changes Common AMQConstant - Fixed error code 'not allowed' should be 530 not 507. ConcurrentLinkedMessageQueueAtomicSize - Updated to beable to get the size of messages on the 'head' queue along with additional debug Systests ReturnUnroutableMandatoryMessageTest - Updated as declareExchange is now Synchronous git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@515127 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/test')
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java6
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java10
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java603
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java3
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java106
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java122
6 files changed, 782 insertions, 68 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index 338404a431..4667a2b3fa 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -73,7 +73,8 @@ public class RecoverTest extends TestCase
Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(),new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
//force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+ //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+ // This is the default now
AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -130,7 +131,8 @@ public class RecoverTest extends TestCase
Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
//force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+ //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+ // This is the default now
AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
index 3431c56783..51bbe7d0e6 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
@@ -109,6 +109,10 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
}
catch (AMQException e)
{
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Exception occured was:" + e.getErrorCode());
+ }
assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode());
_connection = newConnection();
@@ -315,15 +319,15 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
}
catch (JMSException e)
{
- fail("Creating new connection when:"+e.getMessage());
+ fail("Creating new connection when:" + e.getMessage());
}
catch (AMQException e)
{
- fail("Creating new connection when:"+e.getMessage());
+ fail("Creating new connection when:" + e.getMessage());
}
catch (URLSyntaxException e)
{
- fail("Creating new connection when:"+e.getMessage());
+ fail("Creating new connection when:" + e.getMessage());
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
new file mode 100644
index 0000000000..a56bae3d70
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.close;
+
+import junit.framework.TestCase;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+import javax.jms.ExceptionListener;
+import javax.jms.Session;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.MessageProducer;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import javax.jms.MessageConsumer;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+import org.apache.log4j.Level;
+
+public class MessageRequeueTest extends TestCase
+{
+
+ private static final Logger _logger = Logger.getLogger(MessageRequeueTest.class);
+
+ protected static AtomicInteger consumerIds = new AtomicInteger(0);
+ protected final Integer numTestMessages = 150;
+
+ protected final int consumeTimeout = 3000;
+
+ protected final String queue = "direct://amq.direct//queue";
+ protected String payload = "Message:";
+
+ protected final String BROKER = "vm://:1";
+ private boolean testReception = true;
+
+ private long[] receieved = new long[numTestMessages + 1];
+ private boolean passed=false;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+
+ QpidClientConnection conn = new QpidClientConnection();
+
+ conn.connect();
+ // clear queue
+ conn.consume(queue, consumeTimeout);
+ // load test data
+ _logger.info("creating test data, " + numTestMessages + " messages");
+ conn.put(queue, payload, numTestMessages);
+ // close this connection
+ conn.disconnect();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ if (!passed)
+ {
+ QpidClientConnection conn = new QpidClientConnection();
+
+ conn.connect();
+ // clear queue
+ conn.consume(queue, consumeTimeout);
+ }
+ TransportConnection.killVMBroker(1);
+ }
+
+ /** multiple consumers */
+ public void testDrain() throws JMSException, InterruptedException
+ {
+ QpidClientConnection conn = new QpidClientConnection();
+
+ conn.connect();
+
+ _logger.info("consuming queue " + queue);
+ Queue q = conn.getSession().createQueue(queue);
+
+ final MessageConsumer consumer = conn.getSession().createConsumer(q);
+ int messagesReceived = 0;
+
+ long messageLog[] = new long[numTestMessages + 1];
+
+ _logger.info("consuming...");
+ Message msg = consumer.receive(1000);
+ while (msg != null)
+ {
+ messagesReceived++;
+
+ long dt = ((AbstractJMSMessage) msg).getDeliveryTag();
+
+ int msgindex = msg.getIntProperty("index");
+ if (messageLog[msgindex] != 0)
+ {
+ _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() +
+ ") more than once.");
+ }
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " +
+ "DT:" + dt +
+ "IN:" + msgindex);
+ }
+
+ if (dt == 0)
+ {
+ _logger.error("DT is zero for msg:" + msgindex);
+ }
+
+ messageLog[msgindex] = dt;
+
+ //get Next message
+ msg = consumer.receive(1000);
+ }
+
+ conn.getSession().commit();
+ consumer.close();
+ assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived);
+
+ int index = 0;
+ StringBuilder list = new StringBuilder();
+ list.append("Failed to receive:");
+ int failed = 0;
+
+ for (long b : messageLog)
+ {
+ if (b == 0 && index != 0) //delivery tag of zero shouldn't exist
+ {
+ _logger.error("Index: " + index + " was not received.");
+ list.append(" ");
+ list.append(index);
+ list.append(":");
+ list.append(b);
+ failed++;
+ }
+
+ index++;
+ }
+ assertEquals(list.toString(), 0, failed);
+ _logger.info("consumed: " + messagesReceived);
+ conn.disconnect();
+ }
+
+ /** multiple consumers */
+ public void testTwoCompetingConsumers()
+ {
+ Consumer c1 = new Consumer();
+ Consumer c2 = new Consumer();
+ Consumer c3 = new Consumer();
+ Consumer c4 = new Consumer();
+
+ Thread t1 = new Thread(c1);
+ Thread t2 = new Thread(c2);
+ Thread t3 = new Thread(c3);
+ Thread t4 = new Thread(c4);
+
+ t1.start();
+// t2.start();
+// t3.start();
+// t4.start();
+
+ try
+ {
+ t1.join();
+ t2.join();
+ t3.join();
+ t4.join();
+ }
+ catch (InterruptedException e)
+ {
+ fail("Uanble to join to Consumer theads");
+ }
+
+ _logger.info("consumer 1 count is " + c1.getCount());
+ _logger.info("consumer 2 count is " + c2.getCount());
+ _logger.info("consumer 3 count is " + c3.getCount());
+ _logger.info("consumer 4 count is " + c4.getCount());
+
+ Integer totalConsumed = c1.getCount() + c2.getCount() + c3.getCount() + c4.getCount();
+
+ // Check all messages were correctly delivered
+ int index = 0;
+ StringBuilder list = new StringBuilder();
+ list.append("Failed to receive:");
+ int failed = 0;
+
+ for (long b : receieved)
+ {
+ if (b == 0 && index != 0) //delivery tag of zero shouldn't exist (and we don't have msg 0)
+ {
+ _logger.error("Index: " + index + " was not received.");
+ list.append(" ");
+ list.append(index);
+ list.append(":");
+ list.append(b);
+ failed++;
+ }
+ index++;
+ }
+ assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed);
+ assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed);
+ passed=true;
+ }
+
+ class Consumer implements Runnable
+ {
+ private Integer count = 0;
+ private Integer id;
+
+ public Consumer()
+ {
+ id = consumerIds.addAndGet(1);
+ }
+
+ public void run()
+ {
+ try
+ {
+ _logger.info("consumer-" + id + ": starting");
+ QpidClientConnection conn = new QpidClientConnection();
+
+ conn.connect();
+
+ _logger.info("consumer-" + id + ": connected, consuming...");
+ Message result;
+ do
+ {
+ result = conn.getNextMessage(queue, consumeTimeout);
+ if (result != null)
+ {
+
+ long dt = ((AbstractJMSMessage) result).getDeliveryTag();
+
+ if (testReception)
+ {
+ int msgindex = result.getIntProperty("index");
+ if (receieved[msgindex] != 0)
+ {
+ _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) result).getDeliveryTag() +
+ ") more than once.");
+ }
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " +
+ "DT:" + dt +
+ "IN:" + msgindex);
+ }
+
+ if (dt == 0)
+ {
+ _logger.error("DT is zero for msg:" + msgindex);
+ }
+
+ receieved[msgindex] = dt;
+ }
+
+
+ count++;
+ if (count % 100 == 0)
+ {
+ _logger.info("consumer-" + id + ": got " + result + ", new count is " + count);
+ }
+ }
+ }
+ while (result != null);
+
+ _logger.info("consumer-" + id + ": complete");
+ conn.disconnect();
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ public Integer getCount()
+ {
+ return count;
+ }
+
+ public Integer getId()
+ {
+ return id;
+ }
+ }
+
+
+ public class QpidClientConnection implements ExceptionListener
+ {
+ private boolean transacted = true;
+ private int ackMode = Session.CLIENT_ACKNOWLEDGE;
+ private Connection connection;
+
+ private String virtualHost;
+ private String brokerlist;
+ private int prefetch;
+ protected Session session;
+ protected boolean connected;
+
+ public QpidClientConnection()
+ {
+ super();
+ setVirtualHost("/test");
+ setBrokerList(BROKER);
+ setPrefetch(5000);
+ }
+
+
+ public void connect() throws JMSException
+ {
+ if (!connected)
+ {
+ /*
+ * amqp://[user:pass@][clientid]/virtualhost?
+ * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
+ * [&failover='method[?option='value'[&option='value']]']
+ * [&option='value']"
+ */
+ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
+ try
+ {
+ AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
+ _logger.info("connecting to Qpid :" + brokerUrl);
+ connection = factory.createConnection();
+
+ // register exception listener
+ connection.setExceptionListener(this);
+
+ session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
+
+
+ _logger.info("starting connection");
+ connection.start();
+
+ connected = true;
+ }
+ catch (URLSyntaxException e)
+ {
+ throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
+ }
+ }
+ }
+
+ public void disconnect() throws JMSException
+ {
+ if (connected)
+ {
+ session.commit();
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected");
+ }
+ }
+
+ public void disconnectWithoutCommit() throws JMSException
+ {
+ if (connected)
+ {
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected without commit");
+ }
+ }
+
+ public String getBrokerList()
+ {
+ return brokerlist;
+ }
+
+ public void setBrokerList(String brokerlist)
+ {
+ this.brokerlist = brokerlist;
+ }
+
+ public String getVirtualHost()
+ {
+ return virtualHost;
+ }
+
+ public void setVirtualHost(String virtualHost)
+ {
+ this.virtualHost = virtualHost;
+ }
+
+ public void setPrefetch(int prefetch)
+ {
+ this.prefetch = prefetch;
+ }
+
+
+ /** override as necessary */
+ public void onException(JMSException exception)
+ {
+ _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
+ }
+
+ public boolean isConnected()
+ {
+ return connected;
+ }
+
+ public Session getSession()
+ {
+ return session;
+ }
+
+ /**
+ * Put a String as a text messages, repeat n times. A null payload will result in a null message.
+ *
+ * @param queueName The queue name to put to
+ * @param payload the content of the payload
+ * @param copies the number of messages to put
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public void put(String queueName, String payload, int copies) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("putting to queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageProducer sender = session.createProducer(queue);
+
+ for (int i = 0; i < copies; i++)
+ {
+ Message m = session.createTextMessage(payload + i);
+ m.setIntProperty("index", i + 1);
+ sender.send(m);
+ }
+
+ session.commit();
+ sender.close();
+ _logger.info("put " + copies + " copies");
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message. Accepts timeout value.
+ *
+ * @param queueName The quename to get from
+ * @param readTimeout The timeout to use
+ *
+ * @return the content of the text message if any
+ *
+ * @throws javax.jms.JMSException any exception that occured
+ */
+ public Message getNextMessage(String queueName, long readTimeout) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ Message message = consumer.receive(readTimeout);
+ session.commit();
+ consumer.close();
+
+ Message result;
+
+ // all messages we consume should be TextMessages
+ if (message instanceof TextMessage)
+ {
+ result = ((TextMessage) message);
+ }
+ else if (null == message)
+ {
+ result = null;
+ }
+ else
+ {
+ _logger.info("warning: received non-text message");
+ result = message;
+ }
+
+ return result;
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message.
+ *
+ * @param queueName The Queuename to get from
+ *
+ * @return The string content of the text message, if any received
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public Message getNextMessage(String queueName) throws JMSException
+ {
+ return getNextMessage(queueName, 0);
+ }
+
+ /**
+ * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
+ *
+ * @param queueName The Queue name to consume from
+ * @param readTimeout The timeout for each consume
+ *
+ * @throws javax.jms.JMSException Any exception that occurs during the consume
+ * @throws InterruptedException If the consume thread was interrupted during a consume.
+ */
+ public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("consuming queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+ int messagesReceived = 0;
+
+ _logger.info("consuming...");
+ while ((consumer.receive(readTimeout)) != null)
+ {
+ messagesReceived++;
+ }
+
+ session.commit();
+ consumer.close();
+ _logger.info("consumed: " + messagesReceived);
+ }
+ }
+
+
+ public void testRequeue() throws JMSException, AMQException, URLSyntaxException
+ {
+ String virtualHost = "/test";
+ String brokerlist = "vm://:1";
+ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
+
+ Connection conn = new AMQConnection(brokerUrl);
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue q = session.createQueue(queue);
+
+ _logger.info("Create Consumer");
+ MessageConsumer consumer = session.createConsumer(q);
+
+ try
+ {
+ Thread.sleep(2000);
+ }
+ catch (InterruptedException e)
+ {
+ //
+ }
+
+ _logger.info("Receiving msg");
+ Message msg = consumer.receive();
+
+ assertNotNull("Message should not be null", msg);
+
+ _logger.info("Close Consumer");
+ consumer.close();
+
+ _logger.info("Close Connection");
+ conn.close();
+ }
+
+} \ No newline at end of file
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
index 07ef5f04d4..fb5ea58174 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
@@ -80,7 +80,8 @@ public class StreamMessageTest extends TestCase
//force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+ //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+ // This is the default now
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index 0d75a6b968..2abc139ced 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -43,7 +43,8 @@ import javax.jms.TextMessage;
public class CommitRollbackTest extends TestCase
{
protected AMQConnection conn;
- protected final String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue";
+ protected String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue";
+ protected static int testMethod = 0;
protected String payload = "xyzzy";
private Session _session;
private MessageProducer _publisher;
@@ -57,6 +58,11 @@ public class CommitRollbackTest extends TestCase
{
super.setUp();
TransportConnection.createVMBroker(1);
+
+ testMethod++;
+ queue += testMethod;
+
+
newConnection();
}
@@ -84,7 +90,11 @@ public class CommitRollbackTest extends TestCase
TransportConnection.killVMBroker(1);
}
- /** PUT a text message, disconnect before commit, confirm it is gone. */
+ /**
+ * PUT a text message, disconnect before commit, confirm it is gone.
+ *
+ * @throws Exception On error
+ */
public void testPutThenDisconnect() throws Exception
{
assertTrue("session is not transacted", _session.getTransacted());
@@ -109,7 +119,11 @@ public class CommitRollbackTest extends TestCase
assertNull("test message was put and disconnected before commit, but is still present", result);
}
- /** PUT a text message, disconnect before commit, confirm it is gone. */
+ /**
+ * PUT a text message, disconnect before commit, confirm it is gone.
+ *
+ * @throws Exception On error
+ */
public void testPutThenCloseDisconnect() throws Exception
{
assertTrue("session is not transacted", _session.getTransacted());
@@ -140,6 +154,8 @@ public class CommitRollbackTest extends TestCase
/**
* PUT a text message, rollback, confirm message is gone. The consumer is on the same connection but different
* session as producer
+ *
+ * @throws Exception On error
*/
public void testPutThenRollback() throws Exception
{
@@ -160,7 +176,11 @@ public class CommitRollbackTest extends TestCase
assertNull("test message was put and rolled back, but is still present", result);
}
- /** GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection */
+ /**
+ * GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection
+ *
+ * @throws Exception On error
+ */
public void testGetThenDisconnect() throws Exception
{
assertTrue("session is not transacted", _session.getTransacted());
@@ -194,6 +214,8 @@ public class CommitRollbackTest extends TestCase
/**
* GET a text message, close consumer, disconnect before commit, confirm it is still there. The consumer is on the
* same connection but different session as producer
+ *
+ * @throws Exception On error
*/
public void testGetThenCloseDisconnect() throws Exception
{
@@ -230,6 +252,8 @@ public class CommitRollbackTest extends TestCase
/**
* GET a text message, rollback, confirm it is still there. The consumer is on the same connection but differnt
* session to the producer
+ *
+ * @throws Exception On error
*/
public void testGetThenRollback() throws Exception
{
@@ -266,6 +290,8 @@ public class CommitRollbackTest extends TestCase
/**
* GET a text message, close message producer, rollback, confirm it is still there. The consumer is on the same
* connection but different session as producer
+ *
+ * @throws Exception On error
*/
public void testGetThenCloseRollback() throws Exception
{
@@ -304,7 +330,11 @@ public class CommitRollbackTest extends TestCase
}
- /** Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order */
+ /**
+ * Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order
+ *
+ * @throws Exception On error
+ */
public void testSend2ThenRollback() throws Exception
{
assertTrue("session is not transacted", _session.getTransacted());
@@ -339,37 +369,41 @@ public class CommitRollbackTest extends TestCase
public void testSend2ThenCloseAfter1andTryAgain() throws Exception
{
-// assertTrue("session is not transacted", _session.getTransacted());
-// assertTrue("session is not transacted", _pubSession.getTransacted());
-//
-// _logger.info("sending two test messages");
-// _publisher.send(_pubSession.createTextMessage("1"));
-// _publisher.send(_pubSession.createTextMessage("2"));
-// _pubSession.commit();
-//
-// _logger.info("getting test message");
-// assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText());
-//
-// _consumer.close();
-//
-// _consumer = _session.createConsumer(_jmsQueue);
-//
-// _logger.info("receiving result");
-// Message result = _consumer.receive(1000);
-// _logger.error("1:" + result);
-//// assertNotNull("test message was consumed and rolled back, but is gone", result);
-//// assertEquals("1" , ((TextMessage) result).getText());
-//// assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
-//
-// result = _consumer.receive(1000);
-// _logger.error("2" + result);
-//// assertNotNull("test message was consumed and rolled back, but is gone", result);
-//// assertEquals("2", ((TextMessage) result).getText());
-//// assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered());
-//
-// result = _consumer.receive(1000);
-// _logger.error("3" + result);
-// assertNull("test message should be null:" + result, result);
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+ _logger.info("sending two test messages");
+ _publisher.send(_pubSession.createTextMessage("1"));
+ _publisher.send(_pubSession.createTextMessage("2"));
+ _pubSession.commit();
+
+ _logger.info("getting test message");
+ Message result = _consumer.receive(1000);
+
+ assertNotNull("Message received should not be null", result);
+ assertEquals("1", ((TextMessage) result).getText());
+ assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered());
+
+
+ _logger.info("Closing Consumer");
+ _consumer.close();
+
+ _logger.info("Creating New consumer");
+ _consumer = _session.createConsumer(_jmsQueue);
+
+ _logger.info("receiving result");
+ result = _consumer.receive(1000);
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ assertEquals("1", ((TextMessage) result).getText());
+ assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
+
+ result = _consumer.receive(1000);
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ assertEquals("2", ((TextMessage) result).getText());
+ assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
+
+ result = _consumer.receive(1000);
+ assertNull("test message should be null:" + result, result);
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
index 94cbb426e5..d994d4c141 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
@@ -62,69 +62,125 @@ public class TransactedTest extends TestCase
{
super.setUp();
TransportConnection.createVMBroker(1);
+ _logger.info("Create Connection");
con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test");
+
+ _logger.info("Create Session");
session = con.createSession(true, Session.SESSION_TRANSACTED);
+ _logger.info("Create Q1");
queue1 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
+ _logger.info("Create Q2");
queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false);
-
+ _logger.info("Create Consumer of Q1");
consumer1 = session.createConsumer(queue1);
- //Dummy just to create the queue.
+ //Dummy just to create the queue.
+ _logger.info("Create Consumer of Q2");
MessageConsumer consumer2 = session.createConsumer(queue2);
+ _logger.info("Close Consumer of Q2");
consumer2.close();
+
+ _logger.info("Create producer to Q2");
producer2 = session.createProducer(queue2);
+
+ _logger.info("Start Connection");
con.start();
+ _logger.info("Create prep connection");
prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "test");
+
+ _logger.info("Create prep session");
prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+ _logger.info("Create prep producer to Q1");
prepProducer1 = prepSession.createProducer(queue1);
+
+ _logger.info("Create prep connection start");
prepCon.start();
- //add some messages
- prepProducer1.send(prepSession.createTextMessage("A"));
- prepProducer1.send(prepSession.createTextMessage("B"));
- prepProducer1.send(prepSession.createTextMessage("C"));
+ _logger.info("Create test connection");
testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test");
+ _logger.info("Create test session");
testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ _logger.info("Create test consumer of q2");
testConsumer2 = testSession.createConsumer(queue2);
-
}
protected void tearDown() throws Exception
{
+ _logger.info("Close connection");
con.close();
+ _logger.info("Close test connection");
testCon.close();
+ _logger.info("Close prep connection");
prepCon.close();
+ _logger.info("Kill broker");
TransportConnection.killAllVMBrokers();
super.tearDown();
}
public void testCommit() throws Exception
{
+ //add some messages
+ _logger.info("Send prep A");
+ prepProducer1.send(prepSession.createTextMessage("A"));
+ _logger.info("Send prep B");
+ prepProducer1.send(prepSession.createTextMessage("B"));
+ _logger.info("Send prep C");
+ prepProducer1.send(prepSession.createTextMessage("C"));
+
//send and receive some messages
+ _logger.info("Send X to Q2");
producer2.send(session.createTextMessage("X"));
+ _logger.info("Send Y to Q2");
producer2.send(session.createTextMessage("Y"));
+ _logger.info("Send Z to Q2");
producer2.send(session.createTextMessage("Z"));
+
+
+ _logger.info("Read A from Q1");
expect("A", consumer1.receive(1000));
+ _logger.info("Read B from Q1");
expect("B", consumer1.receive(1000));
+ _logger.info("Read C from Q1");
expect("C", consumer1.receive(1000));
//commit
+ _logger.info("session commit");
session.commit();
+ _logger.info("Start test Connection");
testCon.start();
+
//ensure sent messages can be received and received messages are gone
+ _logger.info("Read X from Q2");
expect("X", testConsumer2.receive(1000));
+ _logger.info("Read Y from Q2");
expect("Y", testConsumer2.receive(1000));
+ _logger.info("Read Z from Q2");
expect("Z", testConsumer2.receive(1000));
+ _logger.info("create test session on Q1");
testConsumer1 = testSession.createConsumer(queue1);
+ _logger.info("Read null from Q1");
assertTrue(null == testConsumer1.receive(1000));
+ _logger.info("Read null from Q2");
assertTrue(null == testConsumer2.receive(1000));
}
public void testRollback() throws Exception
{
+ //add some messages
+ _logger.info("Send prep A");
+ prepProducer1.send(prepSession.createTextMessage("A"));
+ _logger.info("Send prep B");
+ prepProducer1.send(prepSession.createTextMessage("B"));
+ _logger.info("Send prep C");
+ prepProducer1.send(prepSession.createTextMessage("C"));
+
+ //Quick sleep to ensure all three get pre-fetched
+ Thread.sleep(500);
+
_logger.info("Sending X Y Z");
producer2.send(session.createTextMessage("X"));
producer2.send(session.createTextMessage("Y"));
@@ -140,9 +196,9 @@ public class TransactedTest extends TestCase
_logger.info("Receiving A B C");
//ensure sent messages are not visible and received messages are requeued
- expect("A", consumer1.receive(1000));
- expect("B", consumer1.receive(1000));
- expect("C", consumer1.receive(1000));
+ expect("A", consumer1.receive(1000), true);
+ expect("B", consumer1.receive(1000), true);
+ expect("C", consumer1.receive(1000), true);
_logger.info("Starting new connection");
testCon.start();
@@ -152,20 +208,22 @@ public class TransactedTest extends TestCase
assertTrue(null == testConsumer2.receive(1000));
session.commit();
+
+ _logger.info("Testing we have no messages left after commit");
+ assertTrue(null == testConsumer1.receive(1000));
+ assertTrue(null == testConsumer2.receive(1000));
}
public void testResendsMsgsAfterSessionClose() throws Exception
{
AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
- Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false);
MessageConsumer consumer = consumerSession.createConsumer(queue3);
- //force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
- Session producerSession = con2.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = producerSession.createProducer(queue3);
_logger.info("Sending four messages");
@@ -176,65 +234,77 @@ public class TransactedTest extends TestCase
producerSession.commit();
-
_logger.info("Starting connection");
con.start();
TextMessage tm = (TextMessage) consumer.receive();
+ assertNotNull(tm);
+ assertEquals("msg1", tm.getText());
- tm.acknowledge();
consumerSession.commit();
- _logger.info("Received and acknowledged first message");
+ _logger.info("Received and committed first message");
tm = (TextMessage) consumer.receive(1000);
assertNotNull(tm);
+ assertEquals("msg2", tm.getText());
+
tm = (TextMessage) consumer.receive(1000);
assertNotNull(tm);
+ assertEquals("msg3", tm.getText());
+
tm = (TextMessage) consumer.receive(1000);
assertNotNull(tm);
+ assertEquals("msg4", tm.getText());
+
_logger.info("Received all four messages. Closing connection with three outstanding messages");
consumerSession.close();
- consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
consumer = consumerSession.createConsumer(queue3);
// no ack for last three messages so when I call recover I expect to get three messages back
-
tm = (TextMessage) consumer.receive(3000);
assertNotNull(tm);
assertEquals("msg2", tm.getText());
+ assertTrue("Message is not redelivered", tm.getJMSRedelivered());
tm = (TextMessage) consumer.receive(3000);
assertNotNull(tm);
assertEquals("msg3", tm.getText());
+ assertTrue("Message is not redelivered", tm.getJMSRedelivered());
tm = (TextMessage) consumer.receive(3000);
assertNotNull(tm);
assertEquals("msg4", tm.getText());
+ assertTrue("Message is not redelivered", tm.getJMSRedelivered());
+
+ _logger.info("Received redelivery of three messages. Committing");
- _logger.info("Received redelivery of three messages. Acknowledging last message");
- tm.acknowledge();
consumerSession.commit();
- _logger.info("Calling acknowledge with no outstanding messages");
- // all acked so no messages to be delivered
+ _logger.info("Called commit");
- tm = (TextMessage) consumer.receiveNoWait();
+ tm = (TextMessage) consumer.receive(1000);
assertNull(tm);
+
_logger.info("No messages redelivered as is expected");
con.close();
con2.close();
-
}
-
private void expect(String text, Message msg) throws JMSException
{
+ expect(text, msg, false);
+ }
+
+ private void expect(String text, Message msg, boolean requeued) throws JMSException
+ {
assertNotNull("Message should not be null", msg);
assertTrue("Message should be a text message", msg instanceof TextMessage);
assertEquals("Message content does not match expected", text, ((TextMessage) msg).getText());
+ assertEquals("Message should " + (requeued ? "" : "not") + " be requeued", requeued, msg.getJMSRedelivered());
}
public static junit.framework.Test suite()