diff options
Diffstat (limited to 'java/client/src/test')
6 files changed, 782 insertions, 68 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 338404a431..4667a2b3fa 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -73,7 +73,8 @@ public class RecoverTest extends TestCase Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(),new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + // This is the default now AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -130,7 +131,8 @@ public class RecoverTest extends TestCase Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + // This is the default now AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java index 3431c56783..51bbe7d0e6 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java @@ -109,6 +109,10 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } catch (AMQException e) { + if (_logger.isInfoEnabled()) + { + _logger.info("Exception occured was:" + e.getErrorCode()); + } assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode()); _connection = newConnection(); @@ -315,15 +319,15 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } catch (JMSException e) { - fail("Creating new connection when:"+e.getMessage()); + fail("Creating new connection when:" + e.getMessage()); } catch (AMQException e) { - fail("Creating new connection when:"+e.getMessage()); + fail("Creating new connection when:" + e.getMessage()); } catch (URLSyntaxException e) { - fail("Creating new connection when:"+e.getMessage()); + fail("Creating new connection when:" + e.getMessage()); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java new file mode 100644 index 0000000000..a56bae3d70 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -0,0 +1,603 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.close; + +import junit.framework.TestCase; + +import java.util.concurrent.atomic.AtomicInteger; + + +import javax.jms.ExceptionListener; +import javax.jms.Session; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.MessageProducer; +import javax.jms.Message; +import javax.jms.TextMessage; +import javax.jms.MessageConsumer; + +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; +import org.apache.log4j.Level; + +public class MessageRequeueTest extends TestCase +{ + + private static final Logger _logger = Logger.getLogger(MessageRequeueTest.class); + + protected static AtomicInteger consumerIds = new AtomicInteger(0); + protected final Integer numTestMessages = 150; + + protected final int consumeTimeout = 3000; + + protected final String queue = "direct://amq.direct//queue"; + protected String payload = "Message:"; + + protected final String BROKER = "vm://:1"; + private boolean testReception = true; + + private long[] receieved = new long[numTestMessages + 1]; + private boolean passed=false; + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + + QpidClientConnection conn = new QpidClientConnection(); + + conn.connect(); + // clear queue + conn.consume(queue, consumeTimeout); + // load test data + _logger.info("creating test data, " + numTestMessages + " messages"); + conn.put(queue, payload, numTestMessages); + // close this connection + conn.disconnect(); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + + if (!passed) + { + QpidClientConnection conn = new QpidClientConnection(); + + conn.connect(); + // clear queue + conn.consume(queue, consumeTimeout); + } + TransportConnection.killVMBroker(1); + } + + /** multiple consumers */ + public void testDrain() throws JMSException, InterruptedException + { + QpidClientConnection conn = new QpidClientConnection(); + + conn.connect(); + + _logger.info("consuming queue " + queue); + Queue q = conn.getSession().createQueue(queue); + + final MessageConsumer consumer = conn.getSession().createConsumer(q); + int messagesReceived = 0; + + long messageLog[] = new long[numTestMessages + 1]; + + _logger.info("consuming..."); + Message msg = consumer.receive(1000); + while (msg != null) + { + messagesReceived++; + + long dt = ((AbstractJMSMessage) msg).getDeliveryTag(); + + int msgindex = msg.getIntProperty("index"); + if (messageLog[msgindex] != 0) + { + _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() + + ") more than once."); + } + + if (_logger.isInfoEnabled()) + { + _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + + "DT:" + dt + + "IN:" + msgindex); + } + + if (dt == 0) + { + _logger.error("DT is zero for msg:" + msgindex); + } + + messageLog[msgindex] = dt; + + //get Next message + msg = consumer.receive(1000); + } + + conn.getSession().commit(); + consumer.close(); + assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived); + + int index = 0; + StringBuilder list = new StringBuilder(); + list.append("Failed to receive:"); + int failed = 0; + + for (long b : messageLog) + { + if (b == 0 && index != 0) //delivery tag of zero shouldn't exist + { + _logger.error("Index: " + index + " was not received."); + list.append(" "); + list.append(index); + list.append(":"); + list.append(b); + failed++; + } + + index++; + } + assertEquals(list.toString(), 0, failed); + _logger.info("consumed: " + messagesReceived); + conn.disconnect(); + } + + /** multiple consumers */ + public void testTwoCompetingConsumers() + { + Consumer c1 = new Consumer(); + Consumer c2 = new Consumer(); + Consumer c3 = new Consumer(); + Consumer c4 = new Consumer(); + + Thread t1 = new Thread(c1); + Thread t2 = new Thread(c2); + Thread t3 = new Thread(c3); + Thread t4 = new Thread(c4); + + t1.start(); +// t2.start(); +// t3.start(); +// t4.start(); + + try + { + t1.join(); + t2.join(); + t3.join(); + t4.join(); + } + catch (InterruptedException e) + { + fail("Uanble to join to Consumer theads"); + } + + _logger.info("consumer 1 count is " + c1.getCount()); + _logger.info("consumer 2 count is " + c2.getCount()); + _logger.info("consumer 3 count is " + c3.getCount()); + _logger.info("consumer 4 count is " + c4.getCount()); + + Integer totalConsumed = c1.getCount() + c2.getCount() + c3.getCount() + c4.getCount(); + + // Check all messages were correctly delivered + int index = 0; + StringBuilder list = new StringBuilder(); + list.append("Failed to receive:"); + int failed = 0; + + for (long b : receieved) + { + if (b == 0 && index != 0) //delivery tag of zero shouldn't exist (and we don't have msg 0) + { + _logger.error("Index: " + index + " was not received."); + list.append(" "); + list.append(index); + list.append(":"); + list.append(b); + failed++; + } + index++; + } + assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed); + assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed); + passed=true; + } + + class Consumer implements Runnable + { + private Integer count = 0; + private Integer id; + + public Consumer() + { + id = consumerIds.addAndGet(1); + } + + public void run() + { + try + { + _logger.info("consumer-" + id + ": starting"); + QpidClientConnection conn = new QpidClientConnection(); + + conn.connect(); + + _logger.info("consumer-" + id + ": connected, consuming..."); + Message result; + do + { + result = conn.getNextMessage(queue, consumeTimeout); + if (result != null) + { + + long dt = ((AbstractJMSMessage) result).getDeliveryTag(); + + if (testReception) + { + int msgindex = result.getIntProperty("index"); + if (receieved[msgindex] != 0) + { + _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) result).getDeliveryTag() + + ") more than once."); + } + + if (_logger.isInfoEnabled()) + { + _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + + "DT:" + dt + + "IN:" + msgindex); + } + + if (dt == 0) + { + _logger.error("DT is zero for msg:" + msgindex); + } + + receieved[msgindex] = dt; + } + + + count++; + if (count % 100 == 0) + { + _logger.info("consumer-" + id + ": got " + result + ", new count is " + count); + } + } + } + while (result != null); + + _logger.info("consumer-" + id + ": complete"); + conn.disconnect(); + + } + catch (Exception e) + { + e.printStackTrace(); + } + } + + public Integer getCount() + { + return count; + } + + public Integer getId() + { + return id; + } + } + + + public class QpidClientConnection implements ExceptionListener + { + private boolean transacted = true; + private int ackMode = Session.CLIENT_ACKNOWLEDGE; + private Connection connection; + + private String virtualHost; + private String brokerlist; + private int prefetch; + protected Session session; + protected boolean connected; + + public QpidClientConnection() + { + super(); + setVirtualHost("/test"); + setBrokerList(BROKER); + setPrefetch(5000); + } + + + public void connect() throws JMSException + { + if (!connected) + { + /* + * amqp://[user:pass@][clientid]/virtualhost? + * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' + * [&failover='method[?option='value'[&option='value']]'] + * [&option='value']" + */ + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; + try + { + AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl)); + _logger.info("connecting to Qpid :" + brokerUrl); + connection = factory.createConnection(); + + // register exception listener + connection.setExceptionListener(this); + + session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); + + + _logger.info("starting connection"); + connection.start(); + + connected = true; + } + catch (URLSyntaxException e) + { + throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage()); + } + } + } + + public void disconnect() throws JMSException + { + if (connected) + { + session.commit(); + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected"); + } + } + + public void disconnectWithoutCommit() throws JMSException + { + if (connected) + { + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected without commit"); + } + } + + public String getBrokerList() + { + return brokerlist; + } + + public void setBrokerList(String brokerlist) + { + this.brokerlist = brokerlist; + } + + public String getVirtualHost() + { + return virtualHost; + } + + public void setVirtualHost(String virtualHost) + { + this.virtualHost = virtualHost; + } + + public void setPrefetch(int prefetch) + { + this.prefetch = prefetch; + } + + + /** override as necessary */ + public void onException(JMSException exception) + { + _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); + } + + public boolean isConnected() + { + return connected; + } + + public Session getSession() + { + return session; + } + + /** + * Put a String as a text messages, repeat n times. A null payload will result in a null message. + * + * @param queueName The queue name to put to + * @param payload the content of the payload + * @param copies the number of messages to put + * + * @throws javax.jms.JMSException any exception that occurs + */ + public void put(String queueName, String payload, int copies) throws JMSException + { + if (!connected) + { + connect(); + } + + _logger.info("putting to queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageProducer sender = session.createProducer(queue); + + for (int i = 0; i < copies; i++) + { + Message m = session.createTextMessage(payload + i); + m.setIntProperty("index", i + 1); + sender.send(m); + } + + session.commit(); + sender.close(); + _logger.info("put " + copies + " copies"); + } + + /** + * GET the top message on a queue. Consumes the message. Accepts timeout value. + * + * @param queueName The quename to get from + * @param readTimeout The timeout to use + * + * @return the content of the text message if any + * + * @throws javax.jms.JMSException any exception that occured + */ + public Message getNextMessage(String queueName, long readTimeout) throws JMSException + { + if (!connected) + { + connect(); + } + + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + + Message message = consumer.receive(readTimeout); + session.commit(); + consumer.close(); + + Message result; + + // all messages we consume should be TextMessages + if (message instanceof TextMessage) + { + result = ((TextMessage) message); + } + else if (null == message) + { + result = null; + } + else + { + _logger.info("warning: received non-text message"); + result = message; + } + + return result; + } + + /** + * GET the top message on a queue. Consumes the message. + * + * @param queueName The Queuename to get from + * + * @return The string content of the text message, if any received + * + * @throws javax.jms.JMSException any exception that occurs + */ + public Message getNextMessage(String queueName) throws JMSException + { + return getNextMessage(queueName, 0); + } + + /** + * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. + * + * @param queueName The Queue name to consume from + * @param readTimeout The timeout for each consume + * + * @throws javax.jms.JMSException Any exception that occurs during the consume + * @throws InterruptedException If the consume thread was interrupted during a consume. + */ + public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException + { + if (!connected) + { + connect(); + } + + _logger.info("consuming queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + int messagesReceived = 0; + + _logger.info("consuming..."); + while ((consumer.receive(readTimeout)) != null) + { + messagesReceived++; + } + + session.commit(); + consumer.close(); + _logger.info("consumed: " + messagesReceived); + } + } + + + public void testRequeue() throws JMSException, AMQException, URLSyntaxException + { + String virtualHost = "/test"; + String brokerlist = "vm://:1"; + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; + + Connection conn = new AMQConnection(brokerUrl); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue q = session.createQueue(queue); + + _logger.info("Create Consumer"); + MessageConsumer consumer = session.createConsumer(q); + + try + { + Thread.sleep(2000); + } + catch (InterruptedException e) + { + // + } + + _logger.info("Receiving msg"); + Message msg = consumer.receive(); + + assertNotNull("Message should not be null", msg); + + _logger.info("Close Consumer"); + consumer.close(); + + _logger.info("Close Connection"); + conn.close(); + } + +}
\ No newline at end of file diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java index 07ef5f04d4..fb5ea58174 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -80,7 +80,8 @@ public class StreamMessageTest extends TestCase //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); + //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); + // This is the default now Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 0d75a6b968..2abc139ced 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -43,7 +43,8 @@ import javax.jms.TextMessage; public class CommitRollbackTest extends TestCase { protected AMQConnection conn; - protected final String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue"; + protected String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue"; + protected static int testMethod = 0; protected String payload = "xyzzy"; private Session _session; private MessageProducer _publisher; @@ -57,6 +58,11 @@ public class CommitRollbackTest extends TestCase { super.setUp(); TransportConnection.createVMBroker(1); + + testMethod++; + queue += testMethod; + + newConnection(); } @@ -84,7 +90,11 @@ public class CommitRollbackTest extends TestCase TransportConnection.killVMBroker(1); } - /** PUT a text message, disconnect before commit, confirm it is gone. */ + /** + * PUT a text message, disconnect before commit, confirm it is gone. + * + * @throws Exception On error + */ public void testPutThenDisconnect() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); @@ -109,7 +119,11 @@ public class CommitRollbackTest extends TestCase assertNull("test message was put and disconnected before commit, but is still present", result); } - /** PUT a text message, disconnect before commit, confirm it is gone. */ + /** + * PUT a text message, disconnect before commit, confirm it is gone. + * + * @throws Exception On error + */ public void testPutThenCloseDisconnect() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); @@ -140,6 +154,8 @@ public class CommitRollbackTest extends TestCase /** * PUT a text message, rollback, confirm message is gone. The consumer is on the same connection but different * session as producer + * + * @throws Exception On error */ public void testPutThenRollback() throws Exception { @@ -160,7 +176,11 @@ public class CommitRollbackTest extends TestCase assertNull("test message was put and rolled back, but is still present", result); } - /** GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection */ + /** + * GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection + * + * @throws Exception On error + */ public void testGetThenDisconnect() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); @@ -194,6 +214,8 @@ public class CommitRollbackTest extends TestCase /** * GET a text message, close consumer, disconnect before commit, confirm it is still there. The consumer is on the * same connection but different session as producer + * + * @throws Exception On error */ public void testGetThenCloseDisconnect() throws Exception { @@ -230,6 +252,8 @@ public class CommitRollbackTest extends TestCase /** * GET a text message, rollback, confirm it is still there. The consumer is on the same connection but differnt * session to the producer + * + * @throws Exception On error */ public void testGetThenRollback() throws Exception { @@ -266,6 +290,8 @@ public class CommitRollbackTest extends TestCase /** * GET a text message, close message producer, rollback, confirm it is still there. The consumer is on the same * connection but different session as producer + * + * @throws Exception On error */ public void testGetThenCloseRollback() throws Exception { @@ -304,7 +330,11 @@ public class CommitRollbackTest extends TestCase } - /** Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order */ + /** + * Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order + * + * @throws Exception On error + */ public void testSend2ThenRollback() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); @@ -339,37 +369,41 @@ public class CommitRollbackTest extends TestCase public void testSend2ThenCloseAfter1andTryAgain() throws Exception { -// assertTrue("session is not transacted", _session.getTransacted()); -// assertTrue("session is not transacted", _pubSession.getTransacted()); -// -// _logger.info("sending two test messages"); -// _publisher.send(_pubSession.createTextMessage("1")); -// _publisher.send(_pubSession.createTextMessage("2")); -// _pubSession.commit(); -// -// _logger.info("getting test message"); -// assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText()); -// -// _consumer.close(); -// -// _consumer = _session.createConsumer(_jmsQueue); -// -// _logger.info("receiving result"); -// Message result = _consumer.receive(1000); -// _logger.error("1:" + result); -//// assertNotNull("test message was consumed and rolled back, but is gone", result); -//// assertEquals("1" , ((TextMessage) result).getText()); -//// assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); -// -// result = _consumer.receive(1000); -// _logger.error("2" + result); -//// assertNotNull("test message was consumed and rolled back, but is gone", result); -//// assertEquals("2", ((TextMessage) result).getText()); -//// assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered()); -// -// result = _consumer.receive(1000); -// _logger.error("3" + result); -// assertNull("test message should be null:" + result, result); + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending two test messages"); + _publisher.send(_pubSession.createTextMessage("1")); + _publisher.send(_pubSession.createTextMessage("2")); + _pubSession.commit(); + + _logger.info("getting test message"); + Message result = _consumer.receive(1000); + + assertNotNull("Message received should not be null", result); + assertEquals("1", ((TextMessage) result).getText()); + assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered()); + + + _logger.info("Closing Consumer"); + _consumer.close(); + + _logger.info("Creating New consumer"); + _consumer = _session.createConsumer(_jmsQueue); + + _logger.info("receiving result"); + result = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("1", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); + + result = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("2", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); + + result = _consumer.receive(1000); + assertNull("test message should be null:" + result, result); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java index 94cbb426e5..d994d4c141 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java @@ -62,69 +62,125 @@ public class TransactedTest extends TestCase { super.setUp(); TransportConnection.createVMBroker(1); + _logger.info("Create Connection"); con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test"); + + _logger.info("Create Session"); session = con.createSession(true, Session.SESSION_TRANSACTED); + _logger.info("Create Q1"); queue1 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), false, true); + _logger.info("Create Q2"); queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false); - + _logger.info("Create Consumer of Q1"); consumer1 = session.createConsumer(queue1); - //Dummy just to create the queue. + //Dummy just to create the queue. + _logger.info("Create Consumer of Q2"); MessageConsumer consumer2 = session.createConsumer(queue2); + _logger.info("Close Consumer of Q2"); consumer2.close(); + + _logger.info("Create producer to Q2"); producer2 = session.createProducer(queue2); + + _logger.info("Start Connection"); con.start(); + _logger.info("Create prep connection"); prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "test"); + + _logger.info("Create prep session"); prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); + + _logger.info("Create prep producer to Q1"); prepProducer1 = prepSession.createProducer(queue1); + + _logger.info("Create prep connection start"); prepCon.start(); - //add some messages - prepProducer1.send(prepSession.createTextMessage("A")); - prepProducer1.send(prepSession.createTextMessage("B")); - prepProducer1.send(prepSession.createTextMessage("C")); + _logger.info("Create test connection"); testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test"); + _logger.info("Create test session"); testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); + _logger.info("Create test consumer of q2"); testConsumer2 = testSession.createConsumer(queue2); - } protected void tearDown() throws Exception { + _logger.info("Close connection"); con.close(); + _logger.info("Close test connection"); testCon.close(); + _logger.info("Close prep connection"); prepCon.close(); + _logger.info("Kill broker"); TransportConnection.killAllVMBrokers(); super.tearDown(); } public void testCommit() throws Exception { + //add some messages + _logger.info("Send prep A"); + prepProducer1.send(prepSession.createTextMessage("A")); + _logger.info("Send prep B"); + prepProducer1.send(prepSession.createTextMessage("B")); + _logger.info("Send prep C"); + prepProducer1.send(prepSession.createTextMessage("C")); + //send and receive some messages + _logger.info("Send X to Q2"); producer2.send(session.createTextMessage("X")); + _logger.info("Send Y to Q2"); producer2.send(session.createTextMessage("Y")); + _logger.info("Send Z to Q2"); producer2.send(session.createTextMessage("Z")); + + + _logger.info("Read A from Q1"); expect("A", consumer1.receive(1000)); + _logger.info("Read B from Q1"); expect("B", consumer1.receive(1000)); + _logger.info("Read C from Q1"); expect("C", consumer1.receive(1000)); //commit + _logger.info("session commit"); session.commit(); + _logger.info("Start test Connection"); testCon.start(); + //ensure sent messages can be received and received messages are gone + _logger.info("Read X from Q2"); expect("X", testConsumer2.receive(1000)); + _logger.info("Read Y from Q2"); expect("Y", testConsumer2.receive(1000)); + _logger.info("Read Z from Q2"); expect("Z", testConsumer2.receive(1000)); + _logger.info("create test session on Q1"); testConsumer1 = testSession.createConsumer(queue1); + _logger.info("Read null from Q1"); assertTrue(null == testConsumer1.receive(1000)); + _logger.info("Read null from Q2"); assertTrue(null == testConsumer2.receive(1000)); } public void testRollback() throws Exception { + //add some messages + _logger.info("Send prep A"); + prepProducer1.send(prepSession.createTextMessage("A")); + _logger.info("Send prep B"); + prepProducer1.send(prepSession.createTextMessage("B")); + _logger.info("Send prep C"); + prepProducer1.send(prepSession.createTextMessage("C")); + + //Quick sleep to ensure all three get pre-fetched + Thread.sleep(500); + _logger.info("Sending X Y Z"); producer2.send(session.createTextMessage("X")); producer2.send(session.createTextMessage("Y")); @@ -140,9 +196,9 @@ public class TransactedTest extends TestCase _logger.info("Receiving A B C"); //ensure sent messages are not visible and received messages are requeued - expect("A", consumer1.receive(1000)); - expect("B", consumer1.receive(1000)); - expect("C", consumer1.receive(1000)); + expect("A", consumer1.receive(1000), true); + expect("B", consumer1.receive(1000), true); + expect("C", consumer1.receive(1000), true); _logger.info("Starting new connection"); testCon.start(); @@ -152,20 +208,22 @@ public class TransactedTest extends TestCase assertTrue(null == testConsumer2.receive(1000)); session.commit(); + + _logger.info("Testing we have no messages left after commit"); + assertTrue(null == testConsumer1.receive(1000)); + assertTrue(null == testConsumer2.receive(1000)); } public void testResendsMsgsAfterSessionClose() throws Exception { AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); - Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE); + Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED); AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false); MessageConsumer consumer = consumerSession.createConsumer(queue3); - //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); - Session producerSession = con2.createSession(true, Session.CLIENT_ACKNOWLEDGE); + Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED); MessageProducer producer = producerSession.createProducer(queue3); _logger.info("Sending four messages"); @@ -176,65 +234,77 @@ public class TransactedTest extends TestCase producerSession.commit(); - _logger.info("Starting connection"); con.start(); TextMessage tm = (TextMessage) consumer.receive(); + assertNotNull(tm); + assertEquals("msg1", tm.getText()); - tm.acknowledge(); consumerSession.commit(); - _logger.info("Received and acknowledged first message"); + _logger.info("Received and committed first message"); tm = (TextMessage) consumer.receive(1000); assertNotNull(tm); + assertEquals("msg2", tm.getText()); + tm = (TextMessage) consumer.receive(1000); assertNotNull(tm); + assertEquals("msg3", tm.getText()); + tm = (TextMessage) consumer.receive(1000); assertNotNull(tm); + assertEquals("msg4", tm.getText()); + _logger.info("Received all four messages. Closing connection with three outstanding messages"); consumerSession.close(); - consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE); + consumerSession = con.createSession(true, Session.SESSION_TRANSACTED); consumer = consumerSession.createConsumer(queue3); // no ack for last three messages so when I call recover I expect to get three messages back - tm = (TextMessage) consumer.receive(3000); assertNotNull(tm); assertEquals("msg2", tm.getText()); + assertTrue("Message is not redelivered", tm.getJMSRedelivered()); tm = (TextMessage) consumer.receive(3000); assertNotNull(tm); assertEquals("msg3", tm.getText()); + assertTrue("Message is not redelivered", tm.getJMSRedelivered()); tm = (TextMessage) consumer.receive(3000); assertNotNull(tm); assertEquals("msg4", tm.getText()); + assertTrue("Message is not redelivered", tm.getJMSRedelivered()); + + _logger.info("Received redelivery of three messages. Committing"); - _logger.info("Received redelivery of three messages. Acknowledging last message"); - tm.acknowledge(); consumerSession.commit(); - _logger.info("Calling acknowledge with no outstanding messages"); - // all acked so no messages to be delivered + _logger.info("Called commit"); - tm = (TextMessage) consumer.receiveNoWait(); + tm = (TextMessage) consumer.receive(1000); assertNull(tm); + _logger.info("No messages redelivered as is expected"); con.close(); con2.close(); - } - private void expect(String text, Message msg) throws JMSException { + expect(text, msg, false); + } + + private void expect(String text, Message msg, boolean requeued) throws JMSException + { assertNotNull("Message should not be null", msg); assertTrue("Message should be a text message", msg instanceof TextMessage); assertEquals("Message content does not match expected", text, ((TextMessage) msg).getText()); + assertEquals("Message should " + (requeued ? "" : "not") + " be requeued", requeued, msg.getJMSRedelivered()); } public static junit.framework.Test suite() |
