diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-09-28 15:40:21 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-09-28 15:40:21 +0000 |
| commit | af097b2fa03725820f0be434ce3e381604ad5bd2 (patch) | |
| tree | 1f20a9518df9eaec63015e30c27829ddffc6f5e2 /java/client/src/test | |
| parent | b46c7467a0422e16f63f6b7d7fabb8d1ca9a2cff (diff) | |
| download | qpid-python-af097b2fa03725820f0be434ce3e381604ad5bd2.tar.gz | |
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@580389 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/test')
5 files changed, 276 insertions, 237 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java index 000fb9ab88..5247f45d21 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java @@ -21,7 +21,6 @@ package org.apache.qpid.test.unit.basic; import junit.framework.Assert; -import junit.framework.TestCase; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; @@ -29,6 +28,7 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.testutil.VMBrokerSetup; +import org.apache.qpid.testutil.QpidTestCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +45,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -public class TextMessageTest extends TestCase implements MessageListener +public class TextMessageTest extends QpidTestCase implements MessageListener { private static final Logger _logger = LoggerFactory.getLogger(TextMessageTest.class); @@ -62,7 +62,7 @@ public class TextMessageTest extends TestCase implements MessageListener super.setUp(); try { - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); + init((AMQConnection) getConnection("guest", "guest")); } catch (Exception e) { @@ -89,7 +89,15 @@ public class TextMessageTest extends TestCase implements MessageListener _session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // set up a slow consumer - _session.createConsumer(destination).setMessageListener(this); + try + { + _session.createConsumer(destination).setMessageListener(this); + } + catch (Throwable e) + { +// TODO + e.printStackTrace(); + } connection.start(); } @@ -117,6 +125,7 @@ public class TextMessageTest extends TestCase implements MessageListener _logger.info("Sending Msg:" + m); producer.send(m); } + _logger.info("sent " + count + " mesages"); } void waitFor(int count) throws InterruptedException @@ -227,6 +236,7 @@ public class TextMessageTest extends TestCase implements MessageListener { synchronized (received) { + _logger.info("===== received one message"); received.add((JMSTextMessage) message); received.notify(); } @@ -237,21 +247,10 @@ public class TextMessageTest extends TestCase implements MessageListener return in + System.currentTimeMillis(); } - public static void main(String[] argv) throws Exception - { - TextMessageTest test = new TextMessageTest(); - test._connectionString = (argv.length == 0) ? "vm://:1" : argv[0]; - test.setUp(); - if (argv.length > 1) - { - test._count = Integer.parseInt(argv[1]); - } - test.test(); - } public static junit.framework.Test suite() { - return new VMBrokerSetup(new junit.framework.TestSuite(TextMessageTest.class)); + return new junit.framework.TestSuite(TextMessageTest.class); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index a19687b07c..58e3f19eed 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -36,30 +36,29 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.testutil.QpidTestCase; /** @author Apache Software Foundation */ -public class TopicSessionTest extends TestCase +public class TopicSessionTest extends QpidTestCase { private static final String BROKER = "vm://:1"; protected void setUp() throws Exception { super.setUp(); - TransportConnection.createVMBroker(1); } protected void tearDown() throws Exception { super.tearDown(); - TransportConnection.killAllVMBrokers(); } public void testTopicSubscriptionUnsubscription() throws Exception { - AMQConnection con = new AMQConnection(BROKER+"?retries='0'", "guest", "guest", "test", "test"); + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(), "MyTopic"); TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0"); @@ -104,7 +103,7 @@ public class TopicSessionTest extends TestCase private void subscriptionNameReuseForDifferentTopic(boolean shutdown) throws Exception { - AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); AMQTopic topic = new AMQTopic(con, "MyTopic1" + String.valueOf(shutdown)); AMQTopic topic2 = new AMQTopic(con, "MyOtherTopic1" + String.valueOf(shutdown)); @@ -143,13 +142,13 @@ public class TopicSessionTest extends TestCase public void testUnsubscriptionAfterConnectionClose() throws Exception { - AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); + AMQConnection con1 = (AMQConnection) getConnection("guest", "guest"); AMQTopic topic = new AMQTopic(con1, "MyTopic3"); TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicPublisher publisher = session1.createPublisher(topic); - AMQConnection con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "test"); + AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0"); @@ -174,7 +173,7 @@ public class TopicSessionTest extends TestCase public void testTextMessageCreation() throws Exception { - AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); AMQTopic topic = new AMQTopic(con, "MyTopic4"); TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicPublisher publisher = session1.createPublisher(topic); @@ -214,7 +213,7 @@ public class TopicSessionTest extends TestCase public void testSendingSameMessage() throws Exception { - AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); + AMQConnection conn = (AMQConnection) getConnection("guest", "guest"); TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TemporaryTopic topic = session.createTemporaryTopic(); assertNotNull(topic); @@ -237,7 +236,7 @@ public class TopicSessionTest extends TestCase public void testTemporaryTopic() throws Exception { - AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); + AMQConnection conn = (AMQConnection) getConnection("guest", "guest"); TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TemporaryTopic topic = session.createTemporaryTopic(); assertNotNull(topic); @@ -289,7 +288,7 @@ public class TopicSessionTest extends TestCase public void testNoLocal() throws Exception { - AMQConnection con = new AMQConnection(BROKER + "?retries='0'", "guest", "guest", "test", "test"); + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); AMQTopic topic = new AMQTopic(con, "testNoLocal"); @@ -341,7 +340,7 @@ public class TopicSessionTest extends TestCase m = (TextMessage) noLocal.receive(100); assertNull(m); - AMQConnection con2 = new AMQConnection(BROKER + "?retries='0'", "guest", "guest", "test2", "test"); + AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicPublisher publisher2 = session2.createPublisher(topic); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 1a45773907..150ff6e798 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -22,6 +22,7 @@ package org.apache.qpid.test.unit.transacted; import junit.framework.TestCase; import org.apache.qpid.AMQException; +import org.apache.qpid.testutil.QpidTestCase; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.url.URLSyntaxException; @@ -41,7 +42,7 @@ import javax.jms.TextMessage; * * Assumptions; - Assumes empty Queue */ -public class CommitRollbackTest extends TestCase +public class CommitRollbackTest extends QpidTestCase { protected AMQConnection conn; protected String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue"; @@ -54,7 +55,6 @@ public class CommitRollbackTest extends TestCase Queue _jmsQueue; private static final Logger _logger = LoggerFactory.getLogger(CommitRollbackTest.class); - private static final String BROKER = "vm://:1"; private boolean _gotone = false; private boolean _gottwo = false; private boolean _gottwoRedelivered = false; @@ -62,20 +62,14 @@ public class CommitRollbackTest extends TestCase protected void setUp() throws Exception { super.setUp(); - if (BROKER.startsWith("vm")) - { - TransportConnection.createVMBroker(1); - } - testMethod++; queue += testMethod; - newConnection(); } - private void newConnection() throws AMQException, URLSyntaxException, JMSException + private void newConnection() throws Exception { - conn = new AMQConnection("amqp://guest:guest@client/test?brokerlist='" + BROKER + "'"); + conn = (AMQConnection) getConnection("guest", "guest"); _session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE); @@ -92,12 +86,7 @@ public class CommitRollbackTest extends TestCase protected void tearDown() throws Exception { super.tearDown(); - conn.close(); - if (BROKER.startsWith("vm")) - { - TransportConnection.killVMBroker(1); - } } /** diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java index 9e9815bcc0..1a86975b96 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java @@ -27,6 +27,8 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.Session; import org.apache.qpid.testutil.QpidTestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,233 +60,275 @@ public class TransactedTest extends QpidTestCase protected void setUp() throws Exception { - super.setUp(); - _logger.info("Create Connection"); - con = (AMQConnection) getConnection("guest", "guest"); - _logger.info("Create Session"); - session = con.createSession(true, Session.SESSION_TRANSACTED); - _logger.info("Create Q1"); - queue1 = - new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), false, - true); - _logger.info("Create Q2"); - AMQQueue queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false); - - _logger.info("Create Consumer of Q1"); - consumer1 = session.createConsumer(queue1); - // Dummy just to create the queue. - _logger.info("Create Consumer of Q2"); - MessageConsumer consumer2 = session.createConsumer(queue2); - _logger.info("Close Consumer of Q2"); - consumer2.close(); - - _logger.info("Create producer to Q2"); - producer2 = session.createProducer(queue2); - - _logger.info("Start Connection"); - con.start(); - - _logger.info("Create prep connection"); - prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "test"); - - _logger.info("Create prep session"); - prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); - - _logger.info("Create prep producer to Q1"); - prepProducer1 = prepSession.createProducer(queue1); - - _logger.info("Create prep connection start"); - prepCon.start(); - - _logger.info("Create test connection"); - testCon = (AMQConnection) getConnection("guest", "guest"); - _logger.info("Create test session"); - testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); - _logger.info("Create test consumer of q2"); - testConsumer2 = testSession.createConsumer(queue2); + try + { + super.setUp(); + _logger.info("Create Connection"); + con = (AMQConnection) getConnection("guest", "guest"); + _logger.info("Create Session"); + session = con.createSession(true, Session.SESSION_TRANSACTED); + _logger.info("Create Q1"); + queue1 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"), + new AMQShortString("Q1"), false, true); + _logger.info("Create Q2"); + AMQQueue queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false); + + _logger.info("Create Consumer of Q1"); + consumer1 = session.createConsumer(queue1); + // Dummy just to create the queue. + _logger.info("Create Consumer of Q2"); + MessageConsumer consumer2 = session.createConsumer(queue2); + _logger.info("Close Consumer of Q2"); + consumer2.close(); + + _logger.info("Create producer to Q2"); + producer2 = session.createProducer(queue2); + + _logger.info("Start Connection"); + con.start(); + + _logger.info("Create prep connection"); + prepCon = (AMQConnection) getConnection("guest", "guest"); + + _logger.info("Create prep session"); + prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); + + _logger.info("Create prep producer to Q1"); + prepProducer1 = prepSession.createProducer(queue1); + + _logger.info("Create prep connection start"); + prepCon.start(); + + _logger.info("Create test connection"); + testCon = (AMQConnection) getConnection("guest", "guest"); + _logger.info("Create test session"); + testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); + _logger.info("Create test consumer of q2"); + testConsumer2 = testSession.createConsumer(queue2); + } + catch (Exception e) + { + e.printStackTrace(); + stopBroker(); + throw e; + } } protected void tearDown() throws Exception { - _logger.info("Close connection"); - con.close(); - _logger.info("Close test connection"); - testCon.close(); - _logger.info("Close prep connection"); - prepCon.close(); - super.tearDown(); + try + { + _logger.info("Close connection"); + con.close(); + _logger.info("Close test connection"); + testCon.close(); + _logger.info("Close prep connection"); + prepCon.close(); + } + catch (Exception e) + { + e.printStackTrace(); + } + finally + { + super.tearDown(); + } } public void testCommit() throws Exception { - // add some messages - _logger.info("Send prep A"); - prepProducer1.send(prepSession.createTextMessage("A")); - _logger.info("Send prep B"); - prepProducer1.send(prepSession.createTextMessage("B")); - _logger.info("Send prep C"); - prepProducer1.send(prepSession.createTextMessage("C")); - - // send and receive some messages - _logger.info("Send X to Q2"); - producer2.send(session.createTextMessage("X")); - _logger.info("Send Y to Q2"); - producer2.send(session.createTextMessage("Y")); - _logger.info("Send Z to Q2"); - producer2.send(session.createTextMessage("Z")); - - _logger.info("Read A from Q1"); - expect("A", consumer1.receive(1000)); - _logger.info("Read B from Q1"); - expect("B", consumer1.receive(1000)); - _logger.info("Read C from Q1"); - expect("C", consumer1.receive(1000)); - - // commit - _logger.info("session commit"); - session.commit(); - _logger.info("Start test Connection"); - testCon.start(); - - // ensure sent messages can be received and received messages are gone - _logger.info("Read X from Q2"); - expect("X", testConsumer2.receive(1000)); - _logger.info("Read Y from Q2"); - expect("Y", testConsumer2.receive(1000)); - _logger.info("Read Z from Q2"); - expect("Z", testConsumer2.receive(1000)); - - _logger.info("create test session on Q1"); - testConsumer1 = testSession.createConsumer(queue1); - _logger.info("Read null from Q1"); - assertTrue(null == testConsumer1.receive(1000)); - _logger.info("Read null from Q2"); - assertTrue(null == testConsumer2.receive(1000)); + try + { +// add some messages + _logger.info("Send prep A"); + prepProducer1.send(prepSession.createTextMessage("A")); + _logger.info("Send prep B"); + prepProducer1.send(prepSession.createTextMessage("B")); + _logger.info("Send prep C"); + prepProducer1.send(prepSession.createTextMessage("C")); + + // send and receive some messages + _logger.info("Send X to Q2"); + producer2.send(session.createTextMessage("X")); + _logger.info("Send Y to Q2"); + producer2.send(session.createTextMessage("Y")); + _logger.info("Send Z to Q2"); + producer2.send(session.createTextMessage("Z")); + + _logger.info("Read A from Q1"); + expect("A", consumer1.receive(1000)); + _logger.info("Read B from Q1"); + expect("B", consumer1.receive(1000)); + _logger.info("Read C from Q1"); + expect("C", consumer1.receive(1000)); + + // commit + _logger.info("session commit"); + session.commit(); + _logger.info("Start test Connection"); + testCon.start(); + + // ensure sent messages can be received and received messages are gone + _logger.info("Read X from Q2"); + expect("X", testConsumer2.receive(1000)); + _logger.info("Read Y from Q2"); + expect("Y", testConsumer2.receive(1000)); + _logger.info("Read Z from Q2"); + expect("Z", testConsumer2.receive(1000)); + + _logger.info("create test session on Q1"); + testConsumer1 = testSession.createConsumer(queue1); + _logger.info("Read null from Q1"); + assertTrue(null == testConsumer1.receive(1000)); + _logger.info("Read null from Q2"); + assertTrue(null == testConsumer2.receive(1000)); + } + catch (Throwable e) + { + e.printStackTrace(); + fail(e.getMessage()); + } } public void testRollback() throws Exception { - // add some messages - _logger.info("Send prep RB_A"); - prepProducer1.send(prepSession.createTextMessage("RB_A")); - _logger.info("Send prep RB_B"); - prepProducer1.send(prepSession.createTextMessage("RB_B")); - _logger.info("Send prep RB_C"); - prepProducer1.send(prepSession.createTextMessage("RB_C")); - - _logger.info("Sending RB_X RB_Y RB_Z"); - producer2.send(session.createTextMessage("RB_X")); - producer2.send(session.createTextMessage("RB_Y")); - producer2.send(session.createTextMessage("RB_Z")); - _logger.info("Receiving RB_A RB_B"); - expect("RB_A", consumer1.receive(1000)); - expect("RB_B", consumer1.receive(1000)); - // Don't consume 'RB_C' leave it in the prefetch cache to ensure rollback removes it. - // Quick sleep to ensure 'RB_C' gets pre-fetched - Thread.sleep(500); - - // rollback - _logger.info("rollback"); - session.rollback(); - - _logger.info("Receiving RB_A RB_B RB_C"); - // ensure sent messages are not visible and received messages are requeued - expect("RB_A", consumer1.receive(1000), true); - expect("RB_B", consumer1.receive(1000), true); - expect("RB_C", consumer1.receive(1000), true); - - _logger.info("Starting new connection"); - testCon.start(); - testConsumer1 = testSession.createConsumer(queue1); - _logger.info("Testing we have no messages left"); - assertTrue(null == testConsumer1.receive(1000)); - assertTrue(null == testConsumer2.receive(1000)); - - session.commit(); - - _logger.info("Testing we have no messages left after commit"); - assertTrue(null == testConsumer1.receive(1000)); - assertTrue(null == testConsumer2.receive(1000)); + try + { +// add some messages + _logger.info("Send prep RB_A"); + prepProducer1.send(prepSession.createTextMessage("RB_A")); + _logger.info("Send prep RB_B"); + prepProducer1.send(prepSession.createTextMessage("RB_B")); + _logger.info("Send prep RB_C"); + prepProducer1.send(prepSession.createTextMessage("RB_C")); + + _logger.info("Sending RB_X RB_Y RB_Z"); + producer2.send(session.createTextMessage("RB_X")); + producer2.send(session.createTextMessage("RB_Y")); + producer2.send(session.createTextMessage("RB_Z")); + _logger.info("Receiving RB_A RB_B"); + expect("RB_A", consumer1.receive(1000)); + expect("RB_B", consumer1.receive(1000)); + // Don't consume 'RB_C' leave it in the prefetch cache to ensure rollback removes it. + // Quick sleep to ensure 'RB_C' gets pre-fetched + Thread.sleep(500); + + // rollback + _logger.info("rollback"); + session.rollback(); + + _logger.info("Receiving RB_A RB_B RB_C"); + // ensure sent messages are not visible and received messages are requeued + expect("RB_A", consumer1.receive(1000), true); + expect("RB_B", consumer1.receive(1000), true); + expect("RB_C", consumer1.receive(1000), true); + + _logger.info("Starting new connection"); + testCon.start(); + testConsumer1 = testSession.createConsumer(queue1); + _logger.info("Testing we have no messages left"); + assertTrue(null == testConsumer1.receive(1000)); + assertTrue(null == testConsumer2.receive(1000)); + + session.commit(); + + _logger.info("Testing we have no messages left after commit"); + assertTrue(null == testConsumer1.receive(1000)); + assertTrue(null == testConsumer2.receive(1000)); + } + catch (Throwable e) + { + e.printStackTrace(); + fail(e.getMessage()); + } } public void testResendsMsgsAfterSessionClose() throws Exception { - AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); + try + { + AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); - Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED); - AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false); - MessageConsumer consumer = consumerSession.createConsumer(queue3); + Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED); + AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false); + MessageConsumer consumer = consumerSession.createConsumer(queue3); - AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); - Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED); - MessageProducer producer = producerSession.createProducer(queue3); + AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); + Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = producerSession.createProducer(queue3); - _logger.info("Sending four messages"); - producer.send(producerSession.createTextMessage("msg1")); - producer.send(producerSession.createTextMessage("msg2")); - producer.send(producerSession.createTextMessage("msg3")); - producer.send(producerSession.createTextMessage("msg4")); + _logger.info("Sending four messages"); + producer.send(producerSession.createTextMessage("msg1")); + producer.send(producerSession.createTextMessage("msg2")); + producer.send(producerSession.createTextMessage("msg3")); + producer.send(producerSession.createTextMessage("msg4")); - producerSession.commit(); + producerSession.commit(); - _logger.info("Starting connection"); - con.start(); - TextMessage tm = (TextMessage) consumer.receive(); - assertNotNull(tm); - assertEquals("msg1", tm.getText()); + _logger.info("Starting connection"); + con.start(); + TextMessage tm = (TextMessage) consumer.receive(); + assertNotNull(tm); + assertEquals("msg1", tm.getText()); - consumerSession.commit(); + consumerSession.commit(); - _logger.info("Received and committed first message"); - tm = (TextMessage) consumer.receive(1000); - assertNotNull(tm); - assertEquals("msg2", tm.getText()); + _logger.info("Received and committed first message"); + tm = (TextMessage) consumer.receive(1000); + assertNotNull(tm); + assertEquals("msg2", tm.getText()); - tm = (TextMessage) consumer.receive(1000); - assertNotNull(tm); - assertEquals("msg3", tm.getText()); + tm = (TextMessage) consumer.receive(1000); + assertNotNull(tm); + assertEquals("msg3", tm.getText()); - tm = (TextMessage) consumer.receive(1000); - assertNotNull(tm); - assertEquals("msg4", tm.getText()); + tm = (TextMessage) consumer.receive(1000); + assertNotNull(tm); + assertEquals("msg4", tm.getText()); - _logger.info("Received all four messages. Closing connection with three outstanding messages"); + _logger.info("Received all four messages. Closing connection with three outstanding messages"); - consumerSession.close(); + consumerSession.close(); - consumerSession = con.createSession(true, Session.SESSION_TRANSACTED); + consumerSession = con.createSession(true, Session.SESSION_TRANSACTED); - consumer = consumerSession.createConsumer(queue3); + consumer = consumerSession.createConsumer(queue3); - // no ack for last three messages so when I call recover I expect to get three messages back - tm = (TextMessage) consumer.receive(3000); - assertNotNull(tm); - assertEquals("msg2", tm.getText()); - assertTrue("Message is not redelivered", tm.getJMSRedelivered()); + // no ack for last three messages so when I call recover I expect to get three messages back + tm = (TextMessage) consumer.receive(3000); + assertNotNull(tm); + assertEquals("msg2", tm.getText()); + assertTrue("Message is not redelivered", tm.getJMSRedelivered()); - tm = (TextMessage) consumer.receive(3000); - assertNotNull(tm); - assertEquals("msg3", tm.getText()); - assertTrue("Message is not redelivered", tm.getJMSRedelivered()); + tm = (TextMessage) consumer.receive(3000); + assertNotNull(tm); + assertEquals("msg3", tm.getText()); + assertTrue("Message is not redelivered", tm.getJMSRedelivered()); - tm = (TextMessage) consumer.receive(3000); - assertNotNull(tm); - assertEquals("msg4", tm.getText()); - assertTrue("Message is not redelivered", tm.getJMSRedelivered()); + tm = (TextMessage) consumer.receive(3000); + assertNotNull(tm); + assertEquals("msg4", tm.getText()); + assertTrue("Message is not redelivered", tm.getJMSRedelivered()); - _logger.info("Received redelivery of three messages. Committing"); + _logger.info("Received redelivery of three messages. Committing"); - consumerSession.commit(); + consumerSession.commit(); - _logger.info("Called commit"); + _logger.info("Called commit"); - tm = (TextMessage) consumer.receive(1000); - assertNull(tm); + tm = (TextMessage) consumer.receive(1000); + assertNull(tm); - _logger.info("No messages redelivered as is expected"); + _logger.info("No messages redelivered as is expected"); - con.close(); - con2.close(); + con.close(); + con2.close(); + } + catch (Throwable e) + { + e.printStackTrace(); + fail(e.getMessage()); + } } private void expect(String text, Message msg) throws JMSException diff --git a/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java b/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java index 598211c393..2289d81fd1 100644 --- a/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java +++ b/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; */ public class QpidTestCase extends TestCase { + /* this clas logger */ private static final Logger _logger = LoggerFactory.getLogger(QpidTestCase.class); @@ -46,6 +47,7 @@ public class QpidTestCase extends TestCase private static final String BROKER_PATH = "broker_path"; private static final String BROKER_PARAM = "broker_param"; private static final String BROKER_VM = "vm"; + private static final String EXT_BROKER = "ext" ; /** * The process where the remote broker is running. */ @@ -79,18 +81,18 @@ public class QpidTestCase extends TestCase { _brokerParams = System.getProperties().getProperty(BROKER_PARAM); } - if (!_shel.equals(BROKER_VM)) + if (!_shel.equals(BROKER_VM) && ! _shel.equals(EXT_BROKER) ) { // start a new broker startBroker(); } - else + else if ( ! _shel.equals(EXT_BROKER) ) { // create an in_VM broker TransportConnection.createVMBroker(1); } - System.out.println("========================================="); - System.out.println("= " + _shel + " " + _brokerPath + " " + _brokerParams); + _logger.info("========================================="); + _logger.info("= " + _shel + " " + _brokerPath + " " + _brokerParams); } /** @@ -100,17 +102,18 @@ public class QpidTestCase extends TestCase */ protected void tearDown() throws Exception { - super.tearDown(); _logger.info("Kill broker"); if (_brokerProcess != null) { // destroy the currently running broker _brokerProcess.destroy(); + _brokerProcess = null; } else { TransportConnection.killAllVMBrokers(); } + super.tearDown(); } //--------- Util method @@ -131,6 +134,11 @@ public class QpidTestCase extends TestCase //bad, we had an error starting the broker throw new Exception("Problem when starting the broker: " + reader.readLine()); } + // We need to wait for th ebroker to start ideally we would need to ping it + synchronized(this) + { + this.wait(1000); + } } /** |
