diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-04-05 08:51:55 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-04-05 08:51:55 +0000 |
| commit | 2057518593b06f4bbb3d99e817332f784f786c6e (patch) | |
| tree | 2f3cdf9161afa90415d859a49dd13324c75204c1 /java | |
| parent | ed8b109e67b650005fee8b4bd9377993e9a39e59 (diff) | |
| download | qpid-python-2057518593b06f4bbb3d99e817332f784f786c6e.tar.gz | |
QPID-308 Added test case to demonstrate heap exhaustion of broker. Can't be run InVM as it kills the broker.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@525766 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
| -rw-r--r-- | java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java | 87 | ||||
| -rw-r--r-- | java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java | 267 |
2 files changed, 354 insertions, 0 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java b/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java new file mode 100644 index 0000000000..c3a0e0d47b --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java @@ -0,0 +1,87 @@ +package org.apache.qpid.server.failure; + +import junit.framework.TestCase; +import org.apache.qpid.testutil.QpidClientConnection; + + +/** Test Case provided by client Non-functional Test NF101: heap exhaustion behaviour */ +public class HeapExhaustion extends TestCase +{ + protected QpidClientConnection conn; + protected final String BROKER = "localhost"; + protected final String vhost = "/test"; + protected final String queue = "direct://amq.direct//queue"; + + protected String hundredK; + protected String megabyte; + + protected void log(String msg) + { + System.out.println(msg); + } + + protected String generatePayloadOfSize(Integer numBytes) + { + return new String(new byte[numBytes]); + } + + protected void setUp() throws Exception + { + super.setUp(); + conn = new QpidClientConnection(BROKER); + conn.setVirtualHost(vhost); + + conn.connect(); + // clear queue + log("setup: clearing test queue"); + conn.consume(queue, 2000); + + hundredK = generatePayloadOfSize(1024 * 100); + megabyte = generatePayloadOfSize(1024 * 1024); + } + + @Override + protected void tearDown() throws Exception + { + super.tearDown(); + conn.disconnect(); + } + + + /** PUT at maximum rate (although we commit after each PUT) until failure + * @throws Exception on error + */ + public void testUntilFailure() throws Exception + { + int copies = 0; + int total = 0; + String payload = hundredK; + int size = payload.getBytes().length; + while (true) + { + conn.put(queue, payload, 1); + copies++; + total += size; + log("put copy " + copies + " OK for total bytes: " + total); + } + } + + /** PUT at lower rate (5 per second) until failure + * @throws Exception on error + */ + public void testUntilFailureWithDelays() throws Exception + { + int copies = 0; + int total = 0; + String payload = hundredK; + int size = payload.getBytes().length; + while (true) + { + conn.put(queue, payload, 1); + copies++; + total += size; + log("put copy " + copies + " OK for total bytes: " + total); + Thread.sleep(200); + } + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java new file mode 100644 index 0000000000..53c4e7bb8f --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java @@ -0,0 +1,267 @@ +package org.apache.qpid.testutil; + +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.log4j.Logger; + +import javax.jms.ExceptionListener; +import javax.jms.Session; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.MessageProducer; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.TextMessage; + +public class QpidClientConnection implements ExceptionListener +{ + + private static final Logger _logger = Logger.getLogger(QpidClientConnection.class); + + private boolean transacted = true; + private int ackMode = Session.CLIENT_ACKNOWLEDGE; + private Connection connection; + + private String virtualHost; + private String brokerlist; + private int prefetch; + protected Session session; + protected boolean connected; + + public QpidClientConnection(String broker) + { + super(); + setVirtualHost("/test"); + setBrokerList(broker); + setPrefetch(5000); + } + + + public void connect() throws JMSException + { + if (!connected) + { + /* + * amqp://[user:pass@][clientid]/virtualhost? + * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' + * [&failover='method[?option='value'[&option='value']]'] + * [&option='value']" + */ + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; + try + { + AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl)); + _logger.info("connecting to Qpid :" + brokerUrl); + connection = factory.createConnection(); + + // register exception listener + connection.setExceptionListener(this); + + session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); + + + _logger.info("starting connection"); + connection.start(); + + connected = true; + } + catch (URLSyntaxException e) + { + throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage()); + } + } + } + + public void disconnect() throws JMSException + { + if (connected) + { + session.commit(); + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected"); + } + } + + public void disconnectWithoutCommit() throws JMSException + { + if (connected) + { + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected without commit"); + } + } + + public String getBrokerList() + { + return brokerlist; + } + + public void setBrokerList(String brokerlist) + { + this.brokerlist = brokerlist; + } + + public String getVirtualHost() + { + return virtualHost; + } + + public void setVirtualHost(String virtualHost) + { + this.virtualHost = virtualHost; + } + + public void setPrefetch(int prefetch) + { + this.prefetch = prefetch; + } + + + /** override as necessary */ + public void onException(JMSException exception) + { + _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); + } + + public boolean isConnected() + { + return connected; + } + + public Session getSession() + { + return session; + } + + /** + * Put a String as a text messages, repeat n times. A null payload will result in a null message. + * + * @param queueName The queue name to put to + * @param payload the content of the payload + * @param copies the number of messages to put + * + * @throws javax.jms.JMSException any exception that occurs + */ + public void put(String queueName, String payload, int copies) throws JMSException + { + if (!connected) + { + connect(); + } + + _logger.info("putting to queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageProducer sender = session.createProducer(queue); + + for (int i = 0; i < copies; i++) + { + Message m = session.createTextMessage(payload + i); + m.setIntProperty("index", i + 1); + sender.send(m); + } + + session.commit(); + sender.close(); + _logger.info("put " + copies + " copies"); + } + + /** + * GET the top message on a queue. Consumes the message. Accepts timeout value. + * + * @param queueName The quename to get from + * @param readTimeout The timeout to use + * + * @return the content of the text message if any + * + * @throws javax.jms.JMSException any exception that occured + */ + public Message getNextMessage(String queueName, long readTimeout) throws JMSException + { + if (!connected) + { + connect(); + } + + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + + Message message = consumer.receive(readTimeout); + session.commit(); + consumer.close(); + + Message result; + + // all messages we consume should be TextMessages + if (message instanceof TextMessage) + { + result = ((TextMessage) message); + } + else if (null == message) + { + result = null; + } + else + { + _logger.info("warning: received non-text message"); + result = message; + } + + return result; + } + + /** + * GET the top message on a queue. Consumes the message. + * + * @param queueName The Queuename to get from + * + * @return The string content of the text message, if any received + * + * @throws javax.jms.JMSException any exception that occurs + */ + public Message getNextMessage(String queueName) throws JMSException + { + return getNextMessage(queueName, 0); + } + + /** + * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. + * + * @param queueName The Queue name to consume from + * @param readTimeout The timeout for each consume + * + * @throws javax.jms.JMSException Any exception that occurs during the consume + * @throws InterruptedException If the consume thread was interrupted during a consume. + */ + public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException + { + if (!connected) + { + connect(); + } + + _logger.info("consuming queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + int messagesReceived = 0; + + _logger.info("consuming..."); + while ((consumer.receive(readTimeout)) != null) + { + messagesReceived++; + } + + session.commit(); + consumer.close(); + _logger.info("consumed: " + messagesReceived); + } +} |
