diff options
Diffstat (limited to 'java/client/src/test')
3 files changed, 363 insertions, 290 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java index fe15fa5155..1e50a62fee 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java @@ -39,8 +39,8 @@ import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.client.message.JMSTextMessage; -import org.apache.qpid.testutil.VMBrokerSetup; public class PropertyValueTest extends TestCase implements MessageListener { @@ -59,19 +59,13 @@ public class PropertyValueTest extends TestCase implements MessageListener protected void setUp() throws Exception { super.setUp(); - try - { - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); - } - catch (Exception e) - { - fail("Unable to initialilse connection: " + e); - } + TransportConnection.createVMBroker(1); } protected void tearDown() throws Exception { super.tearDown(); + TransportConnection.killVMBroker(1); } private void init(AMQConnection connection) throws Exception @@ -91,14 +85,48 @@ public class PropertyValueTest extends TestCase implements MessageListener connection.start(); } - public void test() throws Exception + public void testOnce() { - int count = _count; - send(count); - waitFor(count); - check(); - _logger.info("Completed without failure"); - _connection.close(); + runBatch(1); + } + + public void test50() + { + runBatch(50); + } + + private void runBatch(int runSize) + { + try + { + int run = 0; + while (run < runSize) + { + _logger.error("Run Number:" + run++); + try + { + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); + } + catch (Exception e) + { + fail("Unable to initialilse connection: " + e); + } + + int count = _count; + send(count); + waitFor(count); + check(); + _logger.info("Completed without failure"); + _connection.close(); + + _logger.error("End Run Number:" + (run - 1)); + } + } + catch (Exception e) + { + _logger.error(e.getMessage(), e); + e.printStackTrace(); + } } void send(int count) throws JMSException @@ -138,7 +166,7 @@ public class PropertyValueTest extends TestCase implements MessageListener m.setJMSReplyTo(q); m.setStringProperty("TempQueue", q.toString()); - _logger.info("Message:" + m); + _logger.trace("Message:" + m); Assert.assertEquals("Check temp queue has been set correctly", m.getJMSReplyTo().toString(), m.getStringProperty("TempQueue")); @@ -150,7 +178,7 @@ public class PropertyValueTest extends TestCase implements MessageListener m.setShortProperty("Short", (short) Short.MAX_VALUE); m.setStringProperty("String", "Test"); - _logger.info("Sending Msg:" + m); + _logger.debug("Sending Msg:" + m); producer.send(m); } } @@ -206,8 +234,11 @@ public class PropertyValueTest extends TestCase implements MessageListener Assert.assertEquals("Check String properties are correctly transported", "Test", m.getStringProperty("String")); } + received.clear(); assertEqual(messages.iterator(), actual.iterator()); + + messages.clear(); } private static void assertEqual(Iterator expected, Iterator actual) @@ -269,11 +300,11 @@ public class PropertyValueTest extends TestCase implements MessageListener { test._count = Integer.parseInt(argv[1]); } - test.test(); + test.testOnce(); } public static junit.framework.Test suite() { - return new VMBrokerSetup(new junit.framework.TestSuite(PropertyValueTest.class)); + return new junit.framework.TestSuite(PropertyValueTest.class); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java index a56bae3d70..7762cb3fe9 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -42,6 +42,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.AMQException; +import org.apache.qpid.testutil.QpidClientConnection; import org.apache.log4j.Logger; import org.apache.log4j.Level; @@ -62,14 +63,14 @@ public class MessageRequeueTest extends TestCase private boolean testReception = true; private long[] receieved = new long[numTestMessages + 1]; - private boolean passed=false; + private boolean passed = false; protected void setUp() throws Exception { super.setUp(); TransportConnection.createVMBroker(1); - QpidClientConnection conn = new QpidClientConnection(); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); // clear queue @@ -85,21 +86,28 @@ public class MessageRequeueTest extends TestCase { super.tearDown(); - if (!passed) + if (!passed) // clean up { - QpidClientConnection conn = new QpidClientConnection(); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); // clear queue conn.consume(queue, consumeTimeout); + + conn.disconnect(); } TransportConnection.killVMBroker(1); } - /** multiple consumers */ + /** + * multiple consumers + * + * @throws javax.jms.JMSException if a JMS problem occurs + * @throws InterruptedException on timeout + */ public void testDrain() throws JMSException, InterruptedException { - QpidClientConnection conn = new QpidClientConnection(); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); @@ -170,6 +178,7 @@ public class MessageRequeueTest extends TestCase assertEquals(list.toString(), 0, failed); _logger.info("consumed: " + messagesReceived); conn.disconnect(); + passed = true; } /** multiple consumers */ @@ -186,8 +195,8 @@ public class MessageRequeueTest extends TestCase Thread t4 = new Thread(c4); t1.start(); -// t2.start(); -// t3.start(); + t2.start(); + t3.start(); // t4.start(); try @@ -230,7 +239,7 @@ public class MessageRequeueTest extends TestCase } assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed); assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed); - passed=true; + passed = true; } class Consumer implements Runnable @@ -248,7 +257,7 @@ public class MessageRequeueTest extends TestCase try { _logger.info("consumer-" + id + ": starting"); - QpidClientConnection conn = new QpidClientConnection(); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); @@ -318,286 +327,51 @@ public class MessageRequeueTest extends TestCase } - public class QpidClientConnection implements ExceptionListener + public void testRequeue() throws JMSException, AMQException, URLSyntaxException { - private boolean transacted = true; - private int ackMode = Session.CLIENT_ACKNOWLEDGE; - private Connection connection; - - private String virtualHost; - private String brokerlist; - private int prefetch; - protected Session session; - protected boolean connected; - - public QpidClientConnection() - { - super(); - setVirtualHost("/test"); - setBrokerList(BROKER); - setPrefetch(5000); - } - - - public void connect() throws JMSException - { - if (!connected) - { - /* - * amqp://[user:pass@][clientid]/virtualhost? - * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' - * [&failover='method[?option='value'[&option='value']]'] - * [&option='value']" - */ - String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; - try - { - AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl)); - _logger.info("connecting to Qpid :" + brokerUrl); - connection = factory.createConnection(); - - // register exception listener - connection.setExceptionListener(this); - - session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); - - - _logger.info("starting connection"); - connection.start(); - - connected = true; - } - catch (URLSyntaxException e) - { - throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage()); - } - } - } - - public void disconnect() throws JMSException - { - if (connected) - { - session.commit(); - session.close(); - connection.close(); - connected = false; - _logger.info("disconnected"); - } - } - - public void disconnectWithoutCommit() throws JMSException - { - if (connected) - { - session.close(); - connection.close(); - connected = false; - _logger.info("disconnected without commit"); - } - } - - public String getBrokerList() - { - return brokerlist; - } - - public void setBrokerList(String brokerlist) - { - this.brokerlist = brokerlist; - } - - public String getVirtualHost() + int run = 0; + while (run < 10) { - return virtualHost; - } - - public void setVirtualHost(String virtualHost) - { - this.virtualHost = virtualHost; - } - - public void setPrefetch(int prefetch) - { - this.prefetch = prefetch; - } + run++; - - /** override as necessary */ - public void onException(JMSException exception) - { - _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); - } - - public boolean isConnected() - { - return connected; - } - - public Session getSession() - { - return session; - } - - /** - * Put a String as a text messages, repeat n times. A null payload will result in a null message. - * - * @param queueName The queue name to put to - * @param payload the content of the payload - * @param copies the number of messages to put - * - * @throws javax.jms.JMSException any exception that occurs - */ - public void put(String queueName, String payload, int copies) throws JMSException - { - if (!connected) - { - connect(); - } - - _logger.info("putting to queue " + queueName); - Queue queue = session.createQueue(queueName); - - final MessageProducer sender = session.createProducer(queue); - - for (int i = 0; i < copies; i++) - { - Message m = session.createTextMessage(payload + i); - m.setIntProperty("index", i + 1); - sender.send(m); - } - - session.commit(); - sender.close(); - _logger.info("put " + copies + " copies"); - } - - /** - * GET the top message on a queue. Consumes the message. Accepts timeout value. - * - * @param queueName The quename to get from - * @param readTimeout The timeout to use - * - * @return the content of the text message if any - * - * @throws javax.jms.JMSException any exception that occured - */ - public Message getNextMessage(String queueName, long readTimeout) throws JMSException - { - if (!connected) + if (_logger.isInfoEnabled()) { - connect(); + _logger.info("testRequeue run " + run); } - Queue queue = session.createQueue(queueName); + String virtualHost = "/test"; + String brokerlist = BROKER; + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; - final MessageConsumer consumer = session.createConsumer(queue); + Connection conn = new AMQConnection(brokerUrl); + Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue q = session.createQueue(queue); - Message message = consumer.receive(readTimeout); - session.commit(); - consumer.close(); - - Message result; + _logger.debug("Create Consumer"); + MessageConsumer consumer = session.createConsumer(q); - // all messages we consume should be TextMessages - if (message instanceof TextMessage) - { - result = ((TextMessage) message); - } - else if (null == message) + try { - result = null; + Thread.sleep(2000); } - else + catch (InterruptedException e) { - _logger.info("warning: received non-text message"); - result = message; + // } - return result; - } + _logger.debug("Receiving msg"); + Message msg = consumer.receive(1000); - /** - * GET the top message on a queue. Consumes the message. - * - * @param queueName The Queuename to get from - * - * @return The string content of the text message, if any received - * - * @throws javax.jms.JMSException any exception that occurs - */ - public Message getNextMessage(String queueName) throws JMSException - { - return getNextMessage(queueName, 0); - } + assertNotNull("Message should not be null", msg); - /** - * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. - * - * @param queueName The Queue name to consume from - * @param readTimeout The timeout for each consume - * - * @throws javax.jms.JMSException Any exception that occurs during the consume - * @throws InterruptedException If the consume thread was interrupted during a consume. - */ - public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException - { - if (!connected) - { - connect(); - } - - _logger.info("consuming queue " + queueName); - Queue queue = session.createQueue(queueName); - - final MessageConsumer consumer = session.createConsumer(queue); - int messagesReceived = 0; - - _logger.info("consuming..."); - while ((consumer.receive(readTimeout)) != null) - { - messagesReceived++; - } - session.commit(); + // As we have not ack'd message will be requeued. + _logger.debug("Close Consumer"); consumer.close(); - _logger.info("consumed: " + messagesReceived); - } - } - - - public void testRequeue() throws JMSException, AMQException, URLSyntaxException - { - String virtualHost = "/test"; - String brokerlist = "vm://:1"; - String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; - Connection conn = new AMQConnection(brokerUrl); - Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue q = session.createQueue(queue); - - _logger.info("Create Consumer"); - MessageConsumer consumer = session.createConsumer(q); - - try - { - Thread.sleep(2000); - } - catch (InterruptedException e) - { - // + _logger.debug("Close Connection"); + conn.close(); } - - _logger.info("Receiving msg"); - Message msg = consumer.receive(); - - assertNotNull("Message should not be null", msg); - - _logger.info("Close Consumer"); - consumer.close(); - - _logger.info("Close Connection"); - conn.close(); } }
\ No newline at end of file diff --git a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java new file mode 100644 index 0000000000..f2afa472ab --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java @@ -0,0 +1,268 @@ +package org.apache.qpid.testutil; + +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.log4j.Logger; + +import javax.jms.ExceptionListener; +import javax.jms.Session; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.MessageProducer; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.TextMessage; + +public class QpidClientConnection implements ExceptionListener +{ + + private static final Logger _logger = Logger.getLogger(QpidClientConnection.class); + + private boolean transacted = true; + private int ackMode = Session.CLIENT_ACKNOWLEDGE; + private Connection connection; + + private String virtualHost; + private String brokerlist; + private int prefetch; + protected Session session; + protected boolean connected; + + public QpidClientConnection(String broker) + { + super(); + setVirtualHost("/test"); + setBrokerList(broker); + setPrefetch(5000); + } + + + public void connect() throws JMSException + { + if (!connected) + { + /* + * amqp://[user:pass@][clientid]/virtualhost? + * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' + * [&failover='method[?option='value'[&option='value']]'] + * [&option='value']" + */ + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; + try + { + AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl)); + _logger.info("connecting to Qpid :" + brokerUrl); + connection = factory.createConnection(); + + // register exception listener + connection.setExceptionListener(this); + + session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); + + + _logger.info("starting connection"); + connection.start(); + + connected = true; + } + catch (URLSyntaxException e) + { + throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage()); + } + } + } + + public void disconnect() throws JMSException + { + if (connected) + { + session.commit(); + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected"); + } + } + + public void disconnectWithoutCommit() throws JMSException + { + if (connected) + { + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected without commit"); + } + } + + public String getBrokerList() + { + return brokerlist; + } + + public void setBrokerList(String brokerlist) + { + this.brokerlist = brokerlist; + } + + public String getVirtualHost() + { + return virtualHost; + } + + public void setVirtualHost(String virtualHost) + { + this.virtualHost = virtualHost; + } + + public void setPrefetch(int prefetch) + { + this.prefetch = prefetch; + } + + + /** override as necessary */ + public void onException(JMSException exception) + { + _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); + } + + public boolean isConnected() + { + return connected; + } + + public Session getSession() + { + return session; + } + + /** + * Put a String as a text messages, repeat n times. A null payload will result in a null message. + * + * @param queueName The queue name to put to + * @param payload the content of the payload + * @param copies the number of messages to put + * + * @throws javax.jms.JMSException any exception that occurs + */ + public void put(String queueName, String payload, int copies) throws JMSException + { + if (!connected) + { + connect(); + } + + _logger.info("putting to queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageProducer sender = session.createProducer(queue); + + for (int i = 0; i < copies; i++) + { + Message m = session.createTextMessage(payload + i); + m.setIntProperty("index", i + 1); + sender.send(m); + } + + session.commit(); + sender.close(); + _logger.info("put " + copies + " copies"); + } + + /** + * GET the top message on a queue. Consumes the message. Accepts timeout value. + * + * @param queueName The quename to get from + * @param readTimeout The timeout to use + * + * @return the content of the text message if any + * + * @throws javax.jms.JMSException any exception that occured + */ + public Message getNextMessage(String queueName, long readTimeout) throws JMSException + { + if (!connected) + { + connect(); + } + + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + + Message message = consumer.receive(readTimeout); + session.commit(); + consumer.close(); + + Message result; + + // all messages we consume should be TextMessages + if (message instanceof TextMessage) + { + result = ((TextMessage) message); + } + else if (null == message) + { + result = null; + } + else + { + _logger.info("warning: received non-text message"); + result = message; + } + + return result; + } + + /** + * GET the top message on a queue. Consumes the message. + * + * @param queueName The Queuename to get from + * + * @return The string content of the text message, if any received + * + * @throws javax.jms.JMSException any exception that occurs + */ + public Message getNextMessage(String queueName) throws JMSException + { + return getNextMessage(queueName, 0); + } + + /** + * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. + * + * @param queueName The Queue name to consume from + * @param readTimeout The timeout for each consume + * + * @throws javax.jms.JMSException Any exception that occurs during the consume + * @throws InterruptedException If the consume thread was interrupted during a consume. + */ + public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException + { + if (!connected) + { + connect(); + } + + _logger.info("consuming queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + int messagesReceived = 0; + + _logger.info("consuming..."); + while ((consumer.receive(readTimeout)) != null) + { + messagesReceived++; + } + + session.commit(); + consumer.close(); + _logger.info("consumed: " + messagesReceived); + } +} + |
