diff options
Diffstat (limited to 'qpid/java/systests')
5 files changed, 759 insertions, 6 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java index 7c14de23a6..20de0d5df0 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java @@ -44,9 +44,9 @@ public class AMQBrokerManagerMBeanTest extends TestCase VirtualHost vHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean)vHost.getManagedObject()); - mbean.createNewExchange(exchange1,"direct",false, false); - mbean.createNewExchange(exchange2,"topic",false, false); - mbean.createNewExchange(exchange3,"headers",false, false); + mbean.createNewExchange(exchange1,"direct",false); + mbean.createNewExchange(exchange2,"topic",false); + mbean.createNewExchange(exchange3,"headers",false); assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange1)) != null); assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) != null); @@ -70,7 +70,7 @@ public class AMQBrokerManagerMBeanTest extends TestCase assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null); - mbean.createNewQueue(queueName, "test", false, true); + mbean.createNewQueue(queueName, "test", false); assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) != null); mbean.deleteQueue(queueName); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java new file mode 100644 index 0000000000..52eb5414ff --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java @@ -0,0 +1,208 @@ +package org.apache.qpid.server.failure; + +import junit.framework.TestCase; +import org.apache.qpid.testutil.QpidClientConnection; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.log4j.Logger; + +import javax.jms.JMSException; +import java.io.IOException; + + +/** Test Case provided by client Non-functional Test NF101: heap exhaustion behaviour */ +public class HeapExhaustion extends TestCase +{ + private static final Logger _logger = Logger.getLogger(HeapExhaustion.class); + + protected QpidClientConnection conn; + protected final String BROKER = "localhost"; + protected final String vhost = "/test"; + protected final String queue = "direct://amq.direct//queue"; + + protected String hundredK; + protected String megabyte; + + protected String generatePayloadOfSize(Integer numBytes) + { + return new String(new byte[numBytes]); + } + + protected void setUp() throws Exception + { + conn = new QpidClientConnection(BROKER); + conn.setVirtualHost(vhost); + + conn.connect(); + // clear queue + _logger.debug("setup: clearing test queue"); + conn.consume(queue, 2000); + + hundredK = generatePayloadOfSize(1024 * 100); + megabyte = generatePayloadOfSize(1024 * 1024); + } + + protected void tearDown() throws Exception + { + conn.disconnect(); + } + + + /** + * PUT at maximum rate (although we commit after each PUT) until failure + * + * @throws Exception on error + */ + public void testUntilFailure() throws Exception + { + int copies = 0; + int total = 0; + String payload = hundredK; + int size = payload.getBytes().length; + while (true) + { + conn.put(queue, payload, 1); + copies++; + total += size; + _logger.info("put copy " + copies + " OK for total bytes: " + total); + } + } + + /** + * PUT at lower rate (5 per second) until failure + * + * @throws Exception on error + */ + public void testUntilFailureWithDelays() throws Exception + { + int copies = 0; + int total = 0; + String payload = hundredK; + int size = payload.getBytes().length; + while (true) + { + conn.put(queue, payload, 1); + copies++; + total += size; + _logger.debug("put copy " + copies + " OK for total bytes: " + total); + Thread.sleep(200); + } + } + + public static void noDelay() + { + HeapExhaustion he = new HeapExhaustion(); + + try + { + he.setUp(); + } + catch (Exception e) + { + _logger.info("Unable to connect"); + System.exit(0); + } + + try + { + _logger.info("Running testUntilFailure"); + try + { + he.testUntilFailure(); + } + catch (FailoverException fe) + { + _logger.error("Caught failover:" + fe); + } + _logger.info("Finishing Connection "); + + try + { + he.tearDown(); + } + catch (JMSException jmse) + { + if (((AMQException) jmse.getLinkedException()).getErrorCode() == AMQConstant.REQUEST_TIMEOUT) + { + _logger.info("Successful test of testUntilFailure"); + } + else + { + _logger.error("Test Failed due to:" + jmse); + } + } + } + catch (Exception e) + { + _logger.error("Test Failed due to:" + e); + } + } + + public static void withDelay() + { + HeapExhaustion he = new HeapExhaustion(); + + try + { + he.setUp(); + } + catch (Exception e) + { + _logger.info("Unable to connect"); + System.exit(0); + } + + try + { + _logger.info("Running testUntilFailure"); + try + { + he.testUntilFailureWithDelays(); + } + catch (FailoverException fe) + { + _logger.error("Caught failover:" + fe); + } + _logger.info("Finishing Connection "); + + try + { + he.tearDown(); + } + catch (JMSException jmse) + { + if (((AMQException) jmse.getLinkedException()).getErrorCode() == AMQConstant.REQUEST_TIMEOUT) + { + _logger.info("Successful test of testUntilFailure"); + } + else + { + _logger.error("Test Failed due to:" + jmse); + } + } + } + catch (Exception e) + { + _logger.error("Test Failed due to:" + e); + } + } + + public static void main(String args[]) + { + noDelay(); + + + try + { + System.out.println("Restart failed broker now to retest broker with delays in send."); + System.in.read(); + } + catch (IOException e) + { + _logger.info("Continuing"); + } + + withDelay(); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java index 8795adbc55..0ad6502755 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.store.MessageStore; import javax.security.sasl.SaslServer; import java.util.HashMap; import java.util.Map; +import java.security.Principal; /** * A protocol session that can be used for testing purposes. @@ -177,12 +178,12 @@ public class MockProtocolSession implements AMQProtocolSession return ProtocolOutputConverterRegistry.getConverter(this); } - public void setAuthorizedID(String authorizedID) + public void setAuthorizedID(Principal authorizedID) { //To change body of implemented methods use File | Settings | File Templates. } - public String getAuthorizedID() + public Principal getAuthorizedID() { return null; //To change body of implemented methods use File | Settings | File Templates. } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTest.java new file mode 100644 index 0000000000..4ad10b68ff --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQChannelClosedException; +import org.apache.qpid.AMQConnectionClosedException; +import org.apache.qpid.util.CommandLineParser; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.log4j.Logger; + +import javax.jms.Session; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; +import java.io.IOException; +import java.util.Properties; + +public class PersistentTest +{ + private static final Logger _logger = Logger.getLogger(PersistentTest.class); + + + private static final String QUEUE = "direct://amq.direct//PersistentTest-Queue2?durable='true',exclusive='true'"; + + protected AMQConnection _connection; + + protected Session _session; + + protected Queue _queue; + private Properties properties; + + private String _brokerDetails; + private String _username; + private String _password; + private String _virtualpath; + + public PersistentTest(Properties overrides) + { + properties = new Properties(defaults); + properties.putAll(overrides); + + _brokerDetails = properties.getProperty(BROKER_PROPNAME); + _username = properties.getProperty(USERNAME_PROPNAME); + _password = properties.getProperty(PASSWORD_PROPNAME); + _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME); + + createConnection(); + } + + protected void createConnection() + { + try + { + _connection = new AMQConnection(_brokerDetails, _username, _password, "PersistentTest", _virtualpath); + + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _connection.start(); + } + catch (Exception e) + { + _logger.error("Unable to create test class due to:" + e.getMessage(), e); + System.exit(0); + } + } + + public void test() throws AMQException, URLSyntaxException + { + + //Create the Durable Queue + try + { + _session.createConsumer(_session.createQueue(QUEUE)).close(); + } + catch (JMSException e) + { + _logger.error("Unable to create Queue due to:" + e.getMessage(), e); + System.exit(0); + } + + try + { + if (testQueue()) + { + // close connection + _connection.close(); + // wait + System.out.println("Restart Broker Now"); + try + { + System.in.read(); + } + catch (IOException e) + { + // + } + finally + { + System.out.println("Continuing...."); + } + + //Test queue is still there. + AMQConnection connection = new AMQConnection(_brokerDetails, _username, _password, "DifferentClientID", _virtualpath); + + AMQSession session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + try + { + session.createConsumer(session.createQueue(QUEUE)); + _logger.error("Create consumer succeeded." + + " This shouldn't be allowed as this means the queue didn't exist when it should"); + + connection.close(); + + exit(); + } + catch (JMSException e) + { + try + { + connection.close(); + } + catch (JMSException cce) + { + if (cce.getLinkedException() instanceof AMQConnectionClosedException) + { + _logger.error("Channel Close Bug still present QPID-432, should see an 'Error closing session'"); + } + else + { + exit(cce); + } + } + + if (e.getLinkedException() instanceof AMQChannelClosedException) + { + _logger.info("AMQChannelClosedException received as expected"); + } + else + { + exit(e); + } + } + } + } + catch (JMSException e) + { + _logger.error("Unable to test Queue due to:" + e.getMessage(), e); + System.exit(0); + } + } + + private void exit(JMSException e) + { + _logger.error("JMSException received:" + e.getMessage()); + e.printStackTrace(); + exit(); + } + + private void exit() + { + try + { + _connection.close(); + } + catch (JMSException e) + { + // + } + System.exit(0); + } + + private boolean testQueue() throws JMSException + { + String TEST_TEXT = "init"; + + //Create a new session to send producer + Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue q = session.createQueue(QUEUE); + MessageProducer producer = session.createProducer(q); + + producer.send(session.createTextMessage(TEST_TEXT)); + + //create a new consumer on the original session + TextMessage m = (TextMessage) _session.createConsumer(q).receive(); + + + if ((m != null) && m.getText().equals(TEST_TEXT)) + { + return true; + } + else + { + _logger.error("Incorrect values returned from Queue Test:" + m); + System.exit(0); + return false; + } + } + + /** Holds the name of the property to get the test broker url from. */ + public static final String BROKER_PROPNAME = "broker"; + + /** Holds the default broker url for the test. */ + public static final String BROKER_DEFAULT = "tcp://localhost:5672"; + + /** Holds the name of the property to get the test broker virtual path. */ + public static final String VIRTUAL_HOST_PROPNAME = "virtualHost"; + + /** Holds the default virtual path for the test. */ + public static final String VIRTUAL_HOST_DEFAULT = ""; + + /** Holds the name of the property to get the broker access username from. */ + public static final String USERNAME_PROPNAME = "username"; + + /** Holds the default broker log on username. */ + public static final String USERNAME_DEFAULT = "guest"; + + /** Holds the name of the property to get the broker access password from. */ + public static final String PASSWORD_PROPNAME = "password"; + + /** Holds the default broker log on password. */ + public static final String PASSWORD_DEFAULT = "guest"; + + /** Holds the default configuration properties. */ + public static Properties defaults = new Properties(); + + static + { + defaults.setProperty(BROKER_PROPNAME, BROKER_DEFAULT); + defaults.setProperty(USERNAME_PROPNAME, USERNAME_DEFAULT); + defaults.setProperty(PASSWORD_PROPNAME, PASSWORD_DEFAULT); + defaults.setProperty(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT); + } + + public static void main(String[] args) + { + PersistentTest test; + + Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][]{})); + + + test = new PersistentTest(options); + try + { + test.test(); + System.out.println("Test was successfull."); + } + catch (Exception e) + { + _logger.error("Unable to test due to:" + e.getMessage(), e); + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java b/qpid/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java new file mode 100644 index 0000000000..80773c102d --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java @@ -0,0 +1,268 @@ +package org.apache.qpid.testutil; + +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.JMSAMQException; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.log4j.Logger; + +import javax.jms.ExceptionListener; +import javax.jms.Session; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.MessageProducer; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.TextMessage; + +public class QpidClientConnection implements ExceptionListener +{ + + private static final Logger _logger = Logger.getLogger(QpidClientConnection.class); + + private boolean transacted = true; + private int ackMode = Session.CLIENT_ACKNOWLEDGE; + private Connection connection; + + private String virtualHost; + private String brokerlist; + private int prefetch; + protected Session session; + protected boolean connected; + + public QpidClientConnection(String broker) + { + super(); + setVirtualHost("/test"); + setBrokerList(broker); + setPrefetch(5000); + } + + + public void connect() throws JMSException + { + if (!connected) + { + /* + * amqp://[user:pass@][clientid]/virtualhost? + * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' + * [&failover='method[?option='value'[&option='value']]'] + * [&option='value']" + */ + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; + try + { + AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl)); + _logger.info("connecting to Qpid :" + brokerUrl); + connection = factory.createConnection(); + + // register exception listener + connection.setExceptionListener(this); + + session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); + + + _logger.info("starting connection"); + connection.start(); + + connected = true; + } + catch (URLSyntaxException e) + { + throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e); + } + } + } + + public void disconnect() throws JMSException + { + if (connected) + { + session.commit(); + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected"); + } + } + + public void disconnectWithoutCommit() throws JMSException + { + if (connected) + { + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected without commit"); + } + } + + public String getBrokerList() + { + return brokerlist; + } + + public void setBrokerList(String brokerlist) + { + this.brokerlist = brokerlist; + } + + public String getVirtualHost() + { + return virtualHost; + } + + public void setVirtualHost(String virtualHost) + { + this.virtualHost = virtualHost; + } + + public void setPrefetch(int prefetch) + { + this.prefetch = prefetch; + } + + + /** override as necessary */ + public void onException(JMSException exception) + { + _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); + } + + public boolean isConnected() + { + return connected; + } + + public Session getSession() + { + return session; + } + + /** + * Put a String as a text messages, repeat n times. A null payload will result in a null message. + * + * @param queueName The queue name to put to + * @param payload the content of the payload + * @param copies the number of messages to put + * + * @throws javax.jms.JMSException any exception that occurs + */ + public void put(String queueName, String payload, int copies) throws JMSException + { + if (!connected) + { + connect(); + } + + _logger.info("putting to queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageProducer sender = session.createProducer(queue); + + for (int i = 0; i < copies; i++) + { + Message m = session.createTextMessage(payload + i); + m.setIntProperty("index", i + 1); + sender.send(m); + } + + session.commit(); + sender.close(); + _logger.info("put " + copies + " copies"); + } + + /** + * GET the top message on a queue. Consumes the message. Accepts timeout value. + * + * @param queueName The quename to get from + * @param readTimeout The timeout to use + * + * @return the content of the text message if any + * + * @throws javax.jms.JMSException any exception that occured + */ + public Message getNextMessage(String queueName, long readTimeout) throws JMSException + { + if (!connected) + { + connect(); + } + + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + + Message message = consumer.receive(readTimeout); + session.commit(); + consumer.close(); + + Message result; + + // all messages we consume should be TextMessages + if (message instanceof TextMessage) + { + result = ((TextMessage) message); + } + else if (null == message) + { + result = null; + } + else + { + _logger.info("warning: received non-text message"); + result = message; + } + + return result; + } + + /** + * GET the top message on a queue. Consumes the message. + * + * @param queueName The Queuename to get from + * + * @return The string content of the text message, if any received + * + * @throws javax.jms.JMSException any exception that occurs + */ + public Message getNextMessage(String queueName) throws JMSException + { + return getNextMessage(queueName, 0); + } + + /** + * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. + * + * @param queueName The Queue name to consume from + * @param readTimeout The timeout for each consume + * + * @throws javax.jms.JMSException Any exception that occurs during the consume + * @throws InterruptedException If the consume thread was interrupted during a consume. + */ + public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException + { + if (!connected) + { + connect(); + } + + _logger.info("consuming queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + int messagesReceived = 0; + + _logger.info("consuming..."); + while ((consumer.receive(readTimeout)) != null) + { + messagesReceived++; + } + + session.commit(); + consumer.close(); + _logger.info("consumed: " + messagesReceived); + } +} |
