summaryrefslogtreecommitdiff
path: root/qpid/java/systests
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java8
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java208
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTest.java276
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java268
5 files changed, 759 insertions, 6 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
index 7c14de23a6..20de0d5df0 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
@@ -44,9 +44,9 @@ public class AMQBrokerManagerMBeanTest extends TestCase
VirtualHost vHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean)vHost.getManagedObject());
- mbean.createNewExchange(exchange1,"direct",false, false);
- mbean.createNewExchange(exchange2,"topic",false, false);
- mbean.createNewExchange(exchange3,"headers",false, false);
+ mbean.createNewExchange(exchange1,"direct",false);
+ mbean.createNewExchange(exchange2,"topic",false);
+ mbean.createNewExchange(exchange3,"headers",false);
assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange1)) != null);
assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) != null);
@@ -70,7 +70,7 @@ public class AMQBrokerManagerMBeanTest extends TestCase
assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null);
- mbean.createNewQueue(queueName, "test", false, true);
+ mbean.createNewQueue(queueName, "test", false);
assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) != null);
mbean.deleteQueue(queueName);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java
new file mode 100644
index 0000000000..52eb5414ff
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java
@@ -0,0 +1,208 @@
+package org.apache.qpid.server.failure;
+
+import junit.framework.TestCase;
+import org.apache.qpid.testutil.QpidClientConnection;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.log4j.Logger;
+
+import javax.jms.JMSException;
+import java.io.IOException;
+
+
+/** Test Case provided by client Non-functional Test NF101: heap exhaustion behaviour */
+public class HeapExhaustion extends TestCase
+{
+ private static final Logger _logger = Logger.getLogger(HeapExhaustion.class);
+
+ protected QpidClientConnection conn;
+ protected final String BROKER = "localhost";
+ protected final String vhost = "/test";
+ protected final String queue = "direct://amq.direct//queue";
+
+ protected String hundredK;
+ protected String megabyte;
+
+ protected String generatePayloadOfSize(Integer numBytes)
+ {
+ return new String(new byte[numBytes]);
+ }
+
+ protected void setUp() throws Exception
+ {
+ conn = new QpidClientConnection(BROKER);
+ conn.setVirtualHost(vhost);
+
+ conn.connect();
+ // clear queue
+ _logger.debug("setup: clearing test queue");
+ conn.consume(queue, 2000);
+
+ hundredK = generatePayloadOfSize(1024 * 100);
+ megabyte = generatePayloadOfSize(1024 * 1024);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ conn.disconnect();
+ }
+
+
+ /**
+ * PUT at maximum rate (although we commit after each PUT) until failure
+ *
+ * @throws Exception on error
+ */
+ public void testUntilFailure() throws Exception
+ {
+ int copies = 0;
+ int total = 0;
+ String payload = hundredK;
+ int size = payload.getBytes().length;
+ while (true)
+ {
+ conn.put(queue, payload, 1);
+ copies++;
+ total += size;
+ _logger.info("put copy " + copies + " OK for total bytes: " + total);
+ }
+ }
+
+ /**
+ * PUT at lower rate (5 per second) until failure
+ *
+ * @throws Exception on error
+ */
+ public void testUntilFailureWithDelays() throws Exception
+ {
+ int copies = 0;
+ int total = 0;
+ String payload = hundredK;
+ int size = payload.getBytes().length;
+ while (true)
+ {
+ conn.put(queue, payload, 1);
+ copies++;
+ total += size;
+ _logger.debug("put copy " + copies + " OK for total bytes: " + total);
+ Thread.sleep(200);
+ }
+ }
+
+ public static void noDelay()
+ {
+ HeapExhaustion he = new HeapExhaustion();
+
+ try
+ {
+ he.setUp();
+ }
+ catch (Exception e)
+ {
+ _logger.info("Unable to connect");
+ System.exit(0);
+ }
+
+ try
+ {
+ _logger.info("Running testUntilFailure");
+ try
+ {
+ he.testUntilFailure();
+ }
+ catch (FailoverException fe)
+ {
+ _logger.error("Caught failover:" + fe);
+ }
+ _logger.info("Finishing Connection ");
+
+ try
+ {
+ he.tearDown();
+ }
+ catch (JMSException jmse)
+ {
+ if (((AMQException) jmse.getLinkedException()).getErrorCode() == AMQConstant.REQUEST_TIMEOUT)
+ {
+ _logger.info("Successful test of testUntilFailure");
+ }
+ else
+ {
+ _logger.error("Test Failed due to:" + jmse);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("Test Failed due to:" + e);
+ }
+ }
+
+ public static void withDelay()
+ {
+ HeapExhaustion he = new HeapExhaustion();
+
+ try
+ {
+ he.setUp();
+ }
+ catch (Exception e)
+ {
+ _logger.info("Unable to connect");
+ System.exit(0);
+ }
+
+ try
+ {
+ _logger.info("Running testUntilFailure");
+ try
+ {
+ he.testUntilFailureWithDelays();
+ }
+ catch (FailoverException fe)
+ {
+ _logger.error("Caught failover:" + fe);
+ }
+ _logger.info("Finishing Connection ");
+
+ try
+ {
+ he.tearDown();
+ }
+ catch (JMSException jmse)
+ {
+ if (((AMQException) jmse.getLinkedException()).getErrorCode() == AMQConstant.REQUEST_TIMEOUT)
+ {
+ _logger.info("Successful test of testUntilFailure");
+ }
+ else
+ {
+ _logger.error("Test Failed due to:" + jmse);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("Test Failed due to:" + e);
+ }
+ }
+
+ public static void main(String args[])
+ {
+ noDelay();
+
+
+ try
+ {
+ System.out.println("Restart failed broker now to retest broker with delays in send.");
+ System.in.read();
+ }
+ catch (IOException e)
+ {
+ _logger.info("Continuing");
+ }
+
+ withDelay();
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
index 8795adbc55..0ad6502755 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.store.MessageStore;
import javax.security.sasl.SaslServer;
import java.util.HashMap;
import java.util.Map;
+import java.security.Principal;
/**
* A protocol session that can be used for testing purposes.
@@ -177,12 +178,12 @@ public class MockProtocolSession implements AMQProtocolSession
return ProtocolOutputConverterRegistry.getConverter(this);
}
- public void setAuthorizedID(String authorizedID)
+ public void setAuthorizedID(Principal authorizedID)
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public String getAuthorizedID()
+ public Principal getAuthorizedID()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTest.java
new file mode 100644
index 0000000000..4ad10b68ff
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQChannelClosedException;
+import org.apache.qpid.AMQConnectionClosedException;
+import org.apache.qpid.util.CommandLineParser;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.log4j.Logger;
+
+import javax.jms.Session;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+import java.io.IOException;
+import java.util.Properties;
+
+public class PersistentTest
+{
+ private static final Logger _logger = Logger.getLogger(PersistentTest.class);
+
+
+ private static final String QUEUE = "direct://amq.direct//PersistentTest-Queue2?durable='true',exclusive='true'";
+
+ protected AMQConnection _connection;
+
+ protected Session _session;
+
+ protected Queue _queue;
+ private Properties properties;
+
+ private String _brokerDetails;
+ private String _username;
+ private String _password;
+ private String _virtualpath;
+
+ public PersistentTest(Properties overrides)
+ {
+ properties = new Properties(defaults);
+ properties.putAll(overrides);
+
+ _brokerDetails = properties.getProperty(BROKER_PROPNAME);
+ _username = properties.getProperty(USERNAME_PROPNAME);
+ _password = properties.getProperty(PASSWORD_PROPNAME);
+ _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME);
+
+ createConnection();
+ }
+
+ protected void createConnection()
+ {
+ try
+ {
+ _connection = new AMQConnection(_brokerDetails, _username, _password, "PersistentTest", _virtualpath);
+
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _connection.start();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unable to create test class due to:" + e.getMessage(), e);
+ System.exit(0);
+ }
+ }
+
+ public void test() throws AMQException, URLSyntaxException
+ {
+
+ //Create the Durable Queue
+ try
+ {
+ _session.createConsumer(_session.createQueue(QUEUE)).close();
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Unable to create Queue due to:" + e.getMessage(), e);
+ System.exit(0);
+ }
+
+ try
+ {
+ if (testQueue())
+ {
+ // close connection
+ _connection.close();
+ // wait
+ System.out.println("Restart Broker Now");
+ try
+ {
+ System.in.read();
+ }
+ catch (IOException e)
+ {
+ //
+ }
+ finally
+ {
+ System.out.println("Continuing....");
+ }
+
+ //Test queue is still there.
+ AMQConnection connection = new AMQConnection(_brokerDetails, _username, _password, "DifferentClientID", _virtualpath);
+
+ AMQSession session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ try
+ {
+ session.createConsumer(session.createQueue(QUEUE));
+ _logger.error("Create consumer succeeded." +
+ " This shouldn't be allowed as this means the queue didn't exist when it should");
+
+ connection.close();
+
+ exit();
+ }
+ catch (JMSException e)
+ {
+ try
+ {
+ connection.close();
+ }
+ catch (JMSException cce)
+ {
+ if (cce.getLinkedException() instanceof AMQConnectionClosedException)
+ {
+ _logger.error("Channel Close Bug still present QPID-432, should see an 'Error closing session'");
+ }
+ else
+ {
+ exit(cce);
+ }
+ }
+
+ if (e.getLinkedException() instanceof AMQChannelClosedException)
+ {
+ _logger.info("AMQChannelClosedException received as expected");
+ }
+ else
+ {
+ exit(e);
+ }
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Unable to test Queue due to:" + e.getMessage(), e);
+ System.exit(0);
+ }
+ }
+
+ private void exit(JMSException e)
+ {
+ _logger.error("JMSException received:" + e.getMessage());
+ e.printStackTrace();
+ exit();
+ }
+
+ private void exit()
+ {
+ try
+ {
+ _connection.close();
+ }
+ catch (JMSException e)
+ {
+ //
+ }
+ System.exit(0);
+ }
+
+ private boolean testQueue() throws JMSException
+ {
+ String TEST_TEXT = "init";
+
+ //Create a new session to send producer
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue q = session.createQueue(QUEUE);
+ MessageProducer producer = session.createProducer(q);
+
+ producer.send(session.createTextMessage(TEST_TEXT));
+
+ //create a new consumer on the original session
+ TextMessage m = (TextMessage) _session.createConsumer(q).receive();
+
+
+ if ((m != null) && m.getText().equals(TEST_TEXT))
+ {
+ return true;
+ }
+ else
+ {
+ _logger.error("Incorrect values returned from Queue Test:" + m);
+ System.exit(0);
+ return false;
+ }
+ }
+
+ /** Holds the name of the property to get the test broker url from. */
+ public static final String BROKER_PROPNAME = "broker";
+
+ /** Holds the default broker url for the test. */
+ public static final String BROKER_DEFAULT = "tcp://localhost:5672";
+
+ /** Holds the name of the property to get the test broker virtual path. */
+ public static final String VIRTUAL_HOST_PROPNAME = "virtualHost";
+
+ /** Holds the default virtual path for the test. */
+ public static final String VIRTUAL_HOST_DEFAULT = "";
+
+ /** Holds the name of the property to get the broker access username from. */
+ public static final String USERNAME_PROPNAME = "username";
+
+ /** Holds the default broker log on username. */
+ public static final String USERNAME_DEFAULT = "guest";
+
+ /** Holds the name of the property to get the broker access password from. */
+ public static final String PASSWORD_PROPNAME = "password";
+
+ /** Holds the default broker log on password. */
+ public static final String PASSWORD_DEFAULT = "guest";
+
+ /** Holds the default configuration properties. */
+ public static Properties defaults = new Properties();
+
+ static
+ {
+ defaults.setProperty(BROKER_PROPNAME, BROKER_DEFAULT);
+ defaults.setProperty(USERNAME_PROPNAME, USERNAME_DEFAULT);
+ defaults.setProperty(PASSWORD_PROPNAME, PASSWORD_DEFAULT);
+ defaults.setProperty(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT);
+ }
+
+ public static void main(String[] args)
+ {
+ PersistentTest test;
+
+ Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][]{}));
+
+
+ test = new PersistentTest(options);
+ try
+ {
+ test.test();
+ System.out.println("Test was successfull.");
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unable to test due to:" + e.getMessage(), e);
+ }
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java b/qpid/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java
new file mode 100644
index 0000000000..80773c102d
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java
@@ -0,0 +1,268 @@
+package org.apache.qpid.testutil;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.JMSAMQException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.log4j.Logger;
+
+import javax.jms.ExceptionListener;
+import javax.jms.Session;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.MessageProducer;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+public class QpidClientConnection implements ExceptionListener
+{
+
+ private static final Logger _logger = Logger.getLogger(QpidClientConnection.class);
+
+ private boolean transacted = true;
+ private int ackMode = Session.CLIENT_ACKNOWLEDGE;
+ private Connection connection;
+
+ private String virtualHost;
+ private String brokerlist;
+ private int prefetch;
+ protected Session session;
+ protected boolean connected;
+
+ public QpidClientConnection(String broker)
+ {
+ super();
+ setVirtualHost("/test");
+ setBrokerList(broker);
+ setPrefetch(5000);
+ }
+
+
+ public void connect() throws JMSException
+ {
+ if (!connected)
+ {
+ /*
+ * amqp://[user:pass@][clientid]/virtualhost?
+ * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
+ * [&failover='method[?option='value'[&option='value']]']
+ * [&option='value']"
+ */
+ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
+ try
+ {
+ AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
+ _logger.info("connecting to Qpid :" + brokerUrl);
+ connection = factory.createConnection();
+
+ // register exception listener
+ connection.setExceptionListener(this);
+
+ session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
+
+
+ _logger.info("starting connection");
+ connection.start();
+
+ connected = true;
+ }
+ catch (URLSyntaxException e)
+ {
+ throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ public void disconnect() throws JMSException
+ {
+ if (connected)
+ {
+ session.commit();
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected");
+ }
+ }
+
+ public void disconnectWithoutCommit() throws JMSException
+ {
+ if (connected)
+ {
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected without commit");
+ }
+ }
+
+ public String getBrokerList()
+ {
+ return brokerlist;
+ }
+
+ public void setBrokerList(String brokerlist)
+ {
+ this.brokerlist = brokerlist;
+ }
+
+ public String getVirtualHost()
+ {
+ return virtualHost;
+ }
+
+ public void setVirtualHost(String virtualHost)
+ {
+ this.virtualHost = virtualHost;
+ }
+
+ public void setPrefetch(int prefetch)
+ {
+ this.prefetch = prefetch;
+ }
+
+
+ /** override as necessary */
+ public void onException(JMSException exception)
+ {
+ _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
+ }
+
+ public boolean isConnected()
+ {
+ return connected;
+ }
+
+ public Session getSession()
+ {
+ return session;
+ }
+
+ /**
+ * Put a String as a text messages, repeat n times. A null payload will result in a null message.
+ *
+ * @param queueName The queue name to put to
+ * @param payload the content of the payload
+ * @param copies the number of messages to put
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public void put(String queueName, String payload, int copies) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("putting to queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageProducer sender = session.createProducer(queue);
+
+ for (int i = 0; i < copies; i++)
+ {
+ Message m = session.createTextMessage(payload + i);
+ m.setIntProperty("index", i + 1);
+ sender.send(m);
+ }
+
+ session.commit();
+ sender.close();
+ _logger.info("put " + copies + " copies");
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message. Accepts timeout value.
+ *
+ * @param queueName The quename to get from
+ * @param readTimeout The timeout to use
+ *
+ * @return the content of the text message if any
+ *
+ * @throws javax.jms.JMSException any exception that occured
+ */
+ public Message getNextMessage(String queueName, long readTimeout) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ Message message = consumer.receive(readTimeout);
+ session.commit();
+ consumer.close();
+
+ Message result;
+
+ // all messages we consume should be TextMessages
+ if (message instanceof TextMessage)
+ {
+ result = ((TextMessage) message);
+ }
+ else if (null == message)
+ {
+ result = null;
+ }
+ else
+ {
+ _logger.info("warning: received non-text message");
+ result = message;
+ }
+
+ return result;
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message.
+ *
+ * @param queueName The Queuename to get from
+ *
+ * @return The string content of the text message, if any received
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public Message getNextMessage(String queueName) throws JMSException
+ {
+ return getNextMessage(queueName, 0);
+ }
+
+ /**
+ * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
+ *
+ * @param queueName The Queue name to consume from
+ * @param readTimeout The timeout for each consume
+ *
+ * @throws javax.jms.JMSException Any exception that occurs during the consume
+ * @throws InterruptedException If the consume thread was interrupted during a consume.
+ */
+ public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("consuming queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+ int messagesReceived = 0;
+
+ _logger.info("consuming...");
+ while ((consumer.receive(readTimeout)) != null)
+ {
+ messagesReceived++;
+ }
+
+ session.commit();
+ consumer.close();
+ _logger.info("consumed: " + messagesReceived);
+ }
+}