diff options
| author | Stephen Vinoski <vinoski@apache.org> | 2006-12-12 04:57:43 +0000 |
|---|---|---|
| committer | Stephen Vinoski <vinoski@apache.org> | 2006-12-12 04:57:43 +0000 |
| commit | 9b45e0ab5d2a719d8ab4e7ec3f23d63ba4966f80 (patch) | |
| tree | 65fdd1a2eb41f382ad72d600d98ee2596ab7506a /java/systests/src/test | |
| parent | 6a03e9214cf9e896bdc2b077975fdd0f1129ce28 (diff) | |
| download | qpid-python-9b45e0ab5d2a719d8ab4e7ec3f23d63ba4966f80.tar.gz | |
systests test reorg
* move unused tests to src/old_test
* modify pom.xml to remove surefire inclusions and exclusions
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@486021 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src/test')
14 files changed, 30 insertions, 1256 deletions
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index a7611df55d..93fbab682a 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -40,7 +40,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -public class AbstractHeadersExchangeTest extends TestCase +public class AbstractHeadersExchangeTestBase extends TestCase { private final HeadersExchange exchange = new HeadersExchange(); protected final Set<TestQueue> queues = new HashSet<TestQueue>(); diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java deleted file mode 100644 index ff0d58ad69..0000000000 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import org.apache.qpid.AMQException; -import org.apache.qpid.server.queue.NoConsumersException; -import org.apache.qpid.server.util.TimedRun; -import org.apache.qpid.server.util.AveragedRun; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ContentBody; - -import java.util.List; - -/** - * Want to vary the number of regsitrations, messages and matches and measure - * the corresponding variance in execution time. - * <p/> - * Each registration will contain the 'All' header, even registrations will - * contain the 'Even' header and odd headers will contain the 'Odd' header. - * In additions each regsitration will have a unique value for the 'Specific' - * header as well. - * <p/> - * Messages can then be routed to all registrations, to even- or odd- registrations - * or to a specific registration. - * - */ -public class HeadersExchangePerformanceTest extends AbstractHeadersExchangeTest -{ - private static enum Mode {ALL, ODD_OR_EVEN, SPECIFIC} - - private final TestQueue[] queues; - private final Mode mode; - - public HeadersExchangePerformanceTest(Mode mode, int registrations) throws AMQException - { - this.mode = mode; - queues = new TestQueue[registrations]; - for (int i = 0; i < queues.length; i++) - { - switch(mode) - { - case ALL: - queues[i] = bind(new FastQueue("Queue" + i), "All"); - break; - case ODD_OR_EVEN: - queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i)); - break; - case SPECIFIC: - queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i), "Specific"+ i); - break; - } - } - } - - void sendToAll(int count) throws AMQException - { - send(count, "All=True"); - } - - void sendToOdd(int count) throws AMQException - { - send(count, "All=True", "Odd=True"); - } - - void sendToEven(int count) throws AMQException - { - send(count, "All=True", "Even=True"); - } - - void sendToAllSpecifically(int count) throws AMQException - { - for (int i = 0; i < queues.length; i++) - { - sendToSpecific(count, i); - } - } - - void sendToSpecific(int count, int index) throws AMQException - { - send(count, "All=True", oddOrEven(index) + "=True", "Specific=" + index); - } - - private void send(int count, String... headers) throws AMQException - { - for (int i = 0; i < count; i++) - { - route(new Message("Message" + i, headers)); - } - } - - private static String oddOrEven(int i) - { - return (i % 2 == 0 ? "Even" : "Odd"); - } - - static class FastQueue extends TestQueue - { - - public FastQueue(String name) throws AMQException - { - super(name); - } - - public void deliver(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies) throws NoConsumersException - { - //just discard as we are not testing routing functionality here - } - } - - static class Test extends TimedRun - { - private final Mode mode; - private final int registrations; - private final int count; - private HeadersExchangePerformanceTest test; - - Test(Mode mode, int registrations, int count) - { - super(mode + ", registrations=" + registrations + ", count=" + count); - this.mode = mode; - this.registrations = registrations; - this.count = count; - } - - protected void setup() throws Exception - { - test = new HeadersExchangePerformanceTest(mode, registrations); - run(100); //do a warm up run before times start - } - - protected void teardown() throws Exception - { - test = null; - System.gc(); - } - - protected void run() throws Exception - { - run(count); - } - - private void run(int count) throws Exception - { - switch(mode) - { - case ALL: - test.sendToAll(count); - break; - default: - System.out.println("Test for " + mode + " not yet implemented."); - } - } - } - - public static void main(String[] argv) throws Exception - { - int registrations = Integer.parseInt(argv[0]); - int messages = Integer.parseInt(argv[1]); - int iterations = Integer.parseInt(argv[2]); - TimedRun test = new Test(Mode.ALL, registrations, messages); - AveragedRun tests = new AveragedRun(test, iterations); - System.out.println(tests.call()); - } -} - diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index 1c80e521ca..91520df3bf 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -24,7 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.util.TestApplicationRegistry; -public class HeadersExchangeTest extends AbstractHeadersExchangeTest +public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase { protected void setUp() throws Exception { diff --git a/java/systests/src/test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java b/java/systests/src/test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java deleted file mode 100644 index e76c164f64..0000000000 --- a/java/systests/src/test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import org.apache.qpid.codec.AMQDecoder; -import org.apache.qpid.codec.AMQEncoder; -import org.apache.qpid.framing.*; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.WriteFuture; -import org.apache.mina.filter.codec.ProtocolDecoderOutput; -import org.apache.mina.filter.codec.ProtocolEncoderOutput; -import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput; - -import junit.framework.TestCase; - -/** - * This test suite tests the handling of protocol initiation frames and related issues. - */ -public class TestProtocolInitiation extends TestCase implements ProtocolVersionList -{ - private AMQPFastProtocolHandler _protocolHandler; - - private MockIoSession _mockIoSession; - - /** - * We need to use the object encoder mechanism so to allow us to retrieve the - * output (a bytebuffer) we define our own encoder output class. The encoder - * writes the encoded data to this class, from where we can retrieve it during - * the test run. - */ - private class TestProtocolEncoderOutput implements ProtocolEncoderOutput - { - public ByteBuffer result; - - public void write(ByteBuffer buf) - { - result = buf; - } - - public void mergeAll() - { - throw new UnsupportedOperationException(); - } - - public WriteFuture flush() - { - throw new UnsupportedOperationException(); - } - } - - private class TestProtocolDecoderOutput implements ProtocolDecoderOutput - { - public Object result; - - public void write(Object buf) - { - result = buf; - } - - public void flush() - { - throw new UnsupportedOperationException(); - } - } - - protected void setUp() throws Exception - { - super.setUp(); - _mockIoSession = new MockIoSession(); - _protocolHandler = new AMQPFastProtocolHandler(null, null); - } - - - /** - * Tests that the AMQDecoder handles invalid protocol classes - * @throws Exception - */ - public void testDecoderValidateProtocolClass() throws Exception - { - try - { - ProtocolInitiation pi = createValidProtocolInitiation(); - pi.protocolClass = 2; - decodePI(pi); - fail("expected exception did not occur"); - } - catch (AMQProtocolClassException m) - { - // ok - } - catch (Exception e) - { - fail("expected AMQProtocolClassException, got " + e); - } - } - - /** - * Tests that the AMQDecoder handles invalid protocol instance numbers - * @throws Exception - */ - public void testDecoderValidatesProtocolInstance() throws Exception - { - try - { - ProtocolInitiation pi = createValidProtocolInitiation(); - pi.protocolInstance = 2; - decodePI(pi); - fail("expected exception did not occur"); - } - catch (AMQProtocolInstanceException m) - { - // ok - } - catch (Exception e) - { - fail("expected AMQProtocolInstanceException, got " + e); - } - } - - /** - * Tests that the AMQDecoder handles invalid protocol major - * @throws Exception - */ - public void testDecoderValidatesProtocolMajor() throws Exception - { - try - { - ProtocolInitiation pi = createValidProtocolInitiation(); - pi.protocolMajor = 2; - decodePI(pi); - fail("expected exception did not occur"); - } - catch (AMQProtocolVersionException m) - { - // ok - } - catch (Exception e) - { - fail("expected AMQProtocolVersionException, got " + e); - } - } - - /** - * Tests that the AMQDecoder handles invalid protocol minor - * @throws Exception - */ - public void testDecoderValidatesProtocolMinor() throws Exception - { - try - { - ProtocolInitiation pi = createValidProtocolInitiation(); - pi.protocolMinor = 99; - decodePI(pi); - fail("expected exception did not occur"); - } - catch (AMQProtocolVersionException m) - { - // ok - } - catch (Exception e) - { - fail("expected AMQProtocolVersionException, got " + e); - } - } - - /** - * Tests that the AMQDecoder accepts a valid PI - * @throws Exception - */ - public void testDecoderValidatesHeader() throws Exception - { - try - { - ProtocolInitiation pi = createValidProtocolInitiation(); - pi.header = new char[] {'P', 'Q', 'M', 'A' }; - decodePI(pi); - fail("expected exception did not occur"); - } - catch (AMQProtocolHeaderException m) - { - // ok - } - catch (Exception e) - { - fail("expected AMQProtocolHeaderException, got " + e); - } - } - - /** - * Test that a valid header is passed by the decoder. - * @throws Exception - */ - public void testDecoderAcceptsValidHeader() throws Exception - { - ProtocolInitiation pi = createValidProtocolInitiation(); - decodePI(pi); - } - - /** - * This test checks that an invalid protocol header results in the - * connection being closed. - */ - public void testInvalidProtocolHeaderClosesConnection() throws Exception - { - AMQProtocolHeaderException pe = new AMQProtocolHeaderException("Test"); - _protocolHandler.exceptionCaught(_mockIoSession, pe); - assertNotNull(_mockIoSession.getLastWrittenObject()); - Object piResponse = _mockIoSession.getLastWrittenObject(); - assertEquals(piResponse.getClass(), ProtocolInitiation.class); - ProtocolInitiation pi = (ProtocolInitiation) piResponse; - assertEquals("Protocol Initiation sent out was not the broker's expected header", pi, - createValidProtocolInitiation()); - assertTrue("Session has not been closed", _mockIoSession.isClosing()); - } - - private ProtocolInitiation createValidProtocolInitiation() - { - /* Find last protocol version in protocol version list. Make sure last protocol version - listed in the build file (build-module.xml) is the latest version which will be used - here. */ - int i = pv.length - 1; - return new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]); - } - - /** - * Helper that encodes a protocol initiation and attempts to decode it - * @param pi - * @throws Exception - */ - private void decodePI(ProtocolInitiation pi) throws Exception - { - // we need to do this test at the level of the decoder since we initially only expect PI frames - // so the protocol handler is not set up to know whether it should be expecting a PI frame or - // a different type of frame - AMQDecoder decoder = new AMQDecoder(true); - AMQEncoder encoder = new AMQEncoder(); - TestProtocolEncoderOutput peo = new TestProtocolEncoderOutput(); - encoder.encode(_mockIoSession, pi, peo); - TestProtocolDecoderOutput pdo = new TestProtocolDecoderOutput(); - decoder.decode(_mockIoSession, peo.result, pdo); - ((ProtocolInitiation) pdo.result).checkVersion(this); - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(TestProtocolInitiation.class); - } -} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java index a76db6a728..fe8960c872 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java @@ -36,7 +36,7 @@ public class ConcurrencyTest extends MessageTestHelper private final int numMessages = 1000; - private final List<TestSubscription> _subscribers = new ArrayList<TestSubscription>(); + private final List<SubscriptionTestHelper> _subscribers = new ArrayList<SubscriptionTestHelper>(); private final Set<Subscription> _active = new HashSet<Subscription>(); private final List<AMQMessage> _messages = new ArrayList<AMQMessage>(); private int next = 0;//index to next message to send @@ -91,7 +91,7 @@ public class ConcurrencyTest extends MessageTestHelper { for(int i = 0; i < subscriptions; i++) { - _subscribers.add(new TestSubscription("Subscriber" + i, _received)); + _subscribers.add(new SubscriptionTestHelper("Subscriber" + i, _received)); } } @@ -174,7 +174,7 @@ public class ConcurrencyTest extends MessageTestHelper return random.nextBoolean(); } - private TestSubscription randomSubscriber() + private SubscriptionTestHelper randomSubscriber() { return _subscribers.get(random.nextInt(_subscribers.size())); } diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java index 9cfd9458d1..3631264e5a 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java @@ -48,8 +48,8 @@ abstract public class DeliveryManagerTest extends MessageTestHelper _mgr.deliver("Me", messages[i]); } - TestSubscription s1 = new TestSubscription("1"); - TestSubscription s2 = new TestSubscription("2"); + SubscriptionTestHelper s1 = new SubscriptionTestHelper("1"); + SubscriptionTestHelper s2 = new SubscriptionTestHelper("2"); _subscriptions.addSubscriber(s1); _subscriptions.addSubscriber(s2); @@ -88,7 +88,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper } int batch = messages.length / 2; - TestSubscription s1 = new TestSubscription("1"); + SubscriptionTestHelper s1 = new SubscriptionTestHelper("1"); _subscriptions.addSubscriber(s1); for (int i = 0; i < batch; i++) @@ -147,7 +147,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper { try { - TestSubscription s = new TestSubscription("A"); + SubscriptionTestHelper s = new SubscriptionTestHelper("A"); _subscriptions.addSubscriber(s); s.setSuspended(true); AMQMessage msg = message(true); diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java deleted file mode 100644 index 11c0026455..0000000000 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.util.AveragedRun; -import org.apache.qpid.server.util.ConcurrentTest; - -public class QueueConcurrentPerfTest extends QueuePerfTest -{ - QueueConcurrentPerfTest(Factory factory, int queueCount, int messages) - { - super(factory, queueCount, messages); - } - - public static void main(String[] argv) throws Exception - { - Factory[] factories = new Factory[]{SYNCHRONIZED, CONCURRENT}; - int iterations = 5; - String label = argv.length > 0 ? argv[0]: null; - System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time"); - //vary number of queues: - for(Factory f : factories) - { - run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 100, 10000), iterations), 5)); - run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 10000), iterations), 5)); - run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 10000, 10000), iterations), 5)); - run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 1000), iterations), 5)); - run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 100000), iterations), 5)); - } - } -} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/QueuePerfTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/QueuePerfTest.java deleted file mode 100644 index 5b3857396d..0000000000 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/QueuePerfTest.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.util.AveragedRun; -import org.apache.qpid.server.util.TimedRun; -import org.apache.qpid.server.util.RunStats; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; - -public class QueuePerfTest extends TimedRun -{ - private final Factory _factory; - private final int _queueCount; - private final int _messages; - private final String _msg = ""; - private List<Queue<String>> _queues; - - QueuePerfTest(Factory factory, int queueCount, int messages) - { - super(factory + ", " + queueCount + ", " + messages); - _factory = factory; - _queueCount = queueCount; - _messages = messages; - } - - protected void setup() throws Exception - { - //init - int count = Integer.getInteger("prepopulate", 0); -// System.err.println("Prepopulating with " + count + " items"); - _queues = new ArrayList<Queue<String>>(_queueCount); - for (int i = 0; i < _queueCount; i++) - { - Queue<String> q = _factory.create(); - for(int j = 0; j < count; ++j) - { - q.add("Item"+ j); - } - _queues.add(q); - } - System.gc(); - } - - protected void teardown() throws Exception - { - System.gc(); - } - - protected void run() throws Exception - { - //dispatch - for (int i = 0; i < _messages; i++) - { - for (Queue<String> q : _queues) - { - q.offer(_msg); - q.poll(); - } - } - } - - static interface Factory - { - Queue<String> create(); - } - - static Factory CONCURRENT = new Factory() - { - public Queue<String> create() - { - return new ConcurrentLinkedQueue<String>(); - } - - public String toString() - { - return "ConcurrentLinkedQueue"; - } - - }; - - static Factory SYNCHRONIZED = new Factory() - { - public Queue<String> create() - { - return new SynchronizedQueue<String>(new LinkedList<String>()); - } - - - public String toString() - { - return "Synchronized LinkedList"; - } - }; - - static Factory PLAIN = new Factory() - { - public Queue<String> create() - { - return new LinkedList<String>(); - } - - public String toString() - { - return "Plain LinkedList"; - } - }; - - static class SynchronizedQueue<E> implements Queue<E> - { - private final Queue<E> queue; - - SynchronizedQueue(Queue<E> queue) - { - this.queue = queue; - } - - public synchronized E element() - { - return queue.element(); - } - - public synchronized boolean offer(E o) - { - return queue.offer(o); - } - - public synchronized E peek() - { - return queue.peek(); - } - - public synchronized E poll() - { - return queue.poll(); - } - - public synchronized E remove() - { - return queue.remove(); - } - - public synchronized int size() - { - return queue.size(); - } - - public synchronized boolean isEmpty() - { - return queue.isEmpty(); - } - - public synchronized boolean contains(Object o) - { - return queue.contains(o); - } - - public synchronized Iterator<E> iterator() - { - return queue.iterator(); - } - - public synchronized Object[] toArray() - { - return queue.toArray(); - } - - public synchronized <T>T[] toArray(T[] a) - { - return queue.toArray(a); - } - - public synchronized boolean add(E o) - { - return queue.add(o); - } - - public synchronized boolean remove(Object o) - { - return queue.remove(o); - } - - public synchronized boolean containsAll(Collection<?> c) - { - return queue.containsAll(c); - } - - public synchronized boolean addAll(Collection<? extends E> c) - { - return queue.addAll(c); - } - - public synchronized boolean removeAll(Collection<?> c) - { - return queue.removeAll(c); - } - - public synchronized boolean retainAll(Collection<?> c) - { - return queue.retainAll(c); - } - - public synchronized void clear() - { - queue.clear(); - } - } - - static void run(String label, AveragedRun test) throws Exception - { - RunStats stats = test.call(); - System.out.println((label == null ? "" : label + ", ") + test - + ", " + stats.getAverage() + ", " + stats.getMax() + ", " + stats.getMin()); - } - - public static void main(String[] argv) throws Exception - { - Factory[] factories = new Factory[]{PLAIN, SYNCHRONIZED, CONCURRENT}; - int iterations = 5; - String label = argv.length > 0 ? argv[0]: null; - System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time"); - //vary number of queues: - - for(Factory f : factories) - { - run(label, new AveragedRun(new QueuePerfTest(f, 100, 10000), iterations)); - run(label, new AveragedRun(new QueuePerfTest(f, 1000, 10000), iterations)); - run(label, new AveragedRun(new QueuePerfTest(f, 10000, 10000), iterations)); - run(label, new AveragedRun(new QueuePerfTest(f, 1000, 1000), iterations)); - run(label, new AveragedRun(new QueuePerfTest(f, 1000, 100000), iterations)); - } - } - -} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java deleted file mode 100644 index 6490b9f270..0000000000 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.exchange.AbstractExchange; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.handler.OnCurrentThreadExecutor; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.MockIoSession; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.SkeletonMessageStore; -import org.apache.qpid.server.util.AveragedRun; -import org.apache.qpid.server.util.TestApplicationRegistry; -import org.apache.qpid.server.util.TimedRun; - -import java.util.ArrayList; -import java.util.List; - -public class SendPerfTest extends TimedRun -{ - private int _messages = 1000; - private int _clients = 10; - private List<AMQQueue> _queues; - - public SendPerfTest(int clients, int messages) - { - super("SendPerfTest, msgs=" + messages + ", clients=" + clients); - _messages = messages; - _clients = clients; - } - - protected void setup() throws Exception - { - _queues = initQueues(_clients); - System.gc(); - } - - protected void teardown() throws Exception - { - System.gc(); - } - - protected void run() throws Exception - { - deliver(_messages, _queues); - } - - //have a dummy AMQProtocolSession that does nothing on the writeFrame() - //set up x number of queues - //create necessary bits and pieces to deliver a message - //deliver y messages to each queue - - public static void main(String[] argv) throws Exception - { - ApplicationRegistry.initialise(new TestApplicationRegistry()); - int clients = Integer.parseInt(argv[0]); - int messages = Integer.parseInt(argv[1]); - int iterations = Integer.parseInt(argv[2]); - AveragedRun test = new AveragedRun(new SendPerfTest(clients, messages), iterations); - test.run(); - } - - /** - * Delivers messages to a number of queues. - * @param count the number of messages to deliver - * @param queues the list of queues - * @throws NoConsumersException - */ - static void deliver(int count, List<AMQQueue> queues) throws AMQException - { - BasicPublishBody publish = new BasicPublishBody(); - publish.exchange = new NullExchange().getName(); - ContentHeaderBody header = new ContentHeaderBody(); - List<ContentBody> body = new ArrayList<ContentBody>(); - MessageStore messageStore = new SkeletonMessageStore(); - body.add(new ContentBody()); - for (int i = 0; i < count; i++) - { - for (AMQQueue q : queues) - { - q.deliver(new AMQMessage(messageStore, i, publish, header, body)); - } - } - } - - static List<AMQQueue> initQueues(int number) throws AMQException - { - Exchange exchange = new NullExchange(); - List<AMQQueue> queues = new ArrayList<AMQQueue>(number); - for (int i = 0; i < number; i++) - { - AMQQueue q = createQueue("Queue" + (i + 1)); - q.bind("routingKey", exchange); - try - { - q.registerProtocolSession(createSession(), 1, "1", false); - } - catch (Exception e) - { - throw new AMQException("Error creating protocol session: " + e, e); - } - queues.add(q); - } - return queues; - } - - static AMQQueue createQueue(String name) throws AMQException - { - return new AMQQueue(name, false, null, false, ApplicationRegistry.getInstance().getQueueRegistry(), - new OnCurrentThreadExecutor()); - } - - static AMQProtocolSession createSession() throws Exception - { - IApplicationRegistry reg = ApplicationRegistry.getInstance(); - AMQCodecFactory codecFactory = new AMQCodecFactory(true); - AMQMinaProtocolSession result = new AMQMinaProtocolSession(new MockIoSession(), reg.getQueueRegistry(), reg.getExchangeRegistry(), codecFactory); - result.addChannel(new AMQChannel(1, null, null)); - return result; - } - - static class NullExchange extends AbstractExchange - { - public String getName() - { - return "NullExchange"; - } - - protected ExchangeMBean createMBean() - { - return null; - } - - public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException - { - } - - public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException - { - } - - public void route(AMQMessage payload) throws AMQException - { - } - } -} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java index f8db90850f..d3ec3c11d4 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java @@ -30,12 +30,12 @@ public class SubscriptionManagerTest extends TestCase { assertTrue(mgr.isEmpty()); assertFalse(mgr.hasActiveSubscribers()); - TestSubscription s1 = new TestSubscription("S1"); + SubscriptionTestHelper s1 = new SubscriptionTestHelper("S1"); mgr.addSubscriber(s1); assertFalse(mgr.isEmpty()); assertTrue(mgr.hasActiveSubscribers()); - TestSubscription s2 = new TestSubscription("S2"); + SubscriptionTestHelper s2 = new SubscriptionTestHelper("S2"); mgr.addSubscriber(s2); s2.setSuspended(true); @@ -47,18 +47,18 @@ public class SubscriptionManagerTest extends TestCase s1.setSuspended(true); assertFalse(mgr.hasActiveSubscribers()); - mgr.removeSubscriber(new TestSubscription("S1")); + mgr.removeSubscriber(new SubscriptionTestHelper("S1")); assertFalse(mgr.isEmpty()); - mgr.removeSubscriber(new TestSubscription("S2")); + mgr.removeSubscriber(new SubscriptionTestHelper("S2")); assertTrue(mgr.isEmpty()); } public void testRoundRobin() { - TestSubscription a = new TestSubscription("A"); - TestSubscription b = new TestSubscription("B"); - TestSubscription c = new TestSubscription("C"); - TestSubscription d = new TestSubscription("D"); + SubscriptionTestHelper a = new SubscriptionTestHelper("A"); + SubscriptionTestHelper b = new SubscriptionTestHelper("B"); + SubscriptionTestHelper c = new SubscriptionTestHelper("C"); + SubscriptionTestHelper d = new SubscriptionTestHelper("D"); mgr.addSubscriber(a); mgr.addSubscriber(b); mgr.addSubscriber(c); @@ -84,7 +84,7 @@ public class SubscriptionManagerTest extends TestCase mgr.removeSubscriber(a); d.setSuspended(true); c.setSuspended(false); - Subscription e = new TestSubscription("D"); + Subscription e = new SubscriptionTestHelper("D"); mgr.addSubscriber(e); for (int i = 0; i < 3; i++) diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java index 4969b5589a..bcf54693d3 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java @@ -47,13 +47,13 @@ public class SubscriptionSetTest extends TestCase } } - final TestSubscription sub1 = new TestSubscription("1"); - final TestSubscription sub2 = new TestSubscription("2"); - final TestSubscription sub3 = new TestSubscription("3"); + final SubscriptionTestHelper sub1 = new SubscriptionTestHelper("1"); + final SubscriptionTestHelper sub2 = new SubscriptionTestHelper("2"); + final SubscriptionTestHelper sub3 = new SubscriptionTestHelper("3"); - final TestSubscription suspendedSub1 = new TestSubscription("sus1", true); - final TestSubscription suspendedSub2 = new TestSubscription("sus2", true); - final TestSubscription suspendedSub3 = new TestSubscription("sus3", true); + final SubscriptionTestHelper suspendedSub1 = new SubscriptionTestHelper("sus1", true); + final SubscriptionTestHelper suspendedSub2 = new SubscriptionTestHelper("sus2", true); + final SubscriptionTestHelper suspendedSub3 = new SubscriptionTestHelper("sus3", true); public void testNextMessage() { @@ -114,7 +114,7 @@ public class SubscriptionSetTest extends TestCase public void testNextMessageOverScanning() { TestSubscriptionSet ss = new TestSubscriptionSet(); - TestSubscription sub = new TestSubscription("test"); + SubscriptionTestHelper sub = new SubscriptionTestHelper("test"); ss.addSubscriber(suspendedSub1); ss.addSubscriber(sub); ss.addSubscriber(suspendedSub3); diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index de6df4bc03..2773c810d2 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -23,24 +23,24 @@ package org.apache.qpid.server.queue; import java.util.ArrayList; import java.util.List; -public class TestSubscription implements Subscription +public class SubscriptionTestHelper implements Subscription { private final List<AMQMessage> messages; private final Object key; private boolean isSuspended; - public TestSubscription(Object key) + public SubscriptionTestHelper(Object key) { this(key, new ArrayList<AMQMessage>()); } - public TestSubscription(final Object key, final boolean isSuspended) + public SubscriptionTestHelper(final Object key, final boolean isSuspended) { this(key); setSuspended(isSuspended); } - TestSubscription(Object key, List<AMQMessage> messages) + SubscriptionTestHelper(Object key, List<AMQMessage> messages) { this.key = key; this.messages = messages; @@ -77,7 +77,7 @@ public class TestSubscription implements Subscription public boolean equals(Object o) { - return o instanceof TestSubscription && ((TestSubscription) o).key.equals(key); + return o instanceof SubscriptionTestHelper && ((SubscriptionTestHelper) o).key.equals(key); } public String toString() diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/ConcurrentTest.java b/java/systests/src/test/java/org/apache/qpid/server/util/ConcurrentTest.java deleted file mode 100644 index 1ae8d3205d..0000000000 --- a/java/systests/src/test/java/org/apache/qpid/server/util/ConcurrentTest.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.util; - -public class ConcurrentTest extends TimedRun -{ - private final TimedRun _test; - private final Thread[] _threads; - - public ConcurrentTest(TimedRun test, int threads) - { - super(test.toString()); - _test = test; - _threads = new Thread[threads]; - } - - protected void setup() throws Exception - { - _test.setup(); - for(int i = 0; i < _threads.length; i++) - { - _threads[i] = new Thread(new Runner()); - } - } - - protected void teardown() throws Exception - { - _test.teardown(); - } - - protected void run() throws Exception - { - for(Thread t : _threads) - { - t.start(); - } - for(Thread t : _threads) - { - t.join(); - } - } - - private class Runner implements Runnable - { - private Exception error; - - public void run() - { - try - { - _test.run(); - } - catch(Exception e) - { - error = e; - e.printStackTrace(); - } - } - } - -} diff --git a/java/systests/src/test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java b/java/systests/src/test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java deleted file mode 100644 index a3e555aac9..0000000000 --- a/java/systests/src/test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.ack; - -import org.apache.log4j.Logger; -import org.apache.log4j.xml.DOMConfigurator; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.util.TestApplicationRegistry; -import org.apache.qpid.test.VMBrokerSetup; - -import javax.jms.*; - -import junit.framework.TestCase; - -public class DisconnectAndRedeliverTest extends TestCase -{ - private static final Logger _logger = Logger.getLogger(DisconnectAndRedeliverTest.class); - - static - { - String workdir = System.getProperty("QPID_WORK"); - if (workdir == null || workdir.equals("")) - { - String tempdir = System.getProperty("java.io.tmpdir"); - System.out.println("QPID_WORK not set using tmp directory: " + tempdir); - System.setProperty("QPID_WORK", tempdir); - } - DOMConfigurator.configure("../broker/etc/log4j.xml"); - } - - protected void setUp() throws Exception - { - super.setUp(); - ApplicationRegistry.initialise(new TestApplicationRegistry(), 1); - } - - protected void tearDown() throws Exception - { - super.tearDown(); - } - - /** - * This tests that when there are unacknowledged messages on a channel they are requeued for delivery when - * the channel is closed. - * - * @throws Exception - */ - public void testDisconnectRedeliversMessages() throws Exception - { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - - TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore(); - - Session consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - AMQQueue queue = new AMQQueue("someQ", "someQ", false, false); - MessageConsumer consumer = consumerSession.createConsumer(queue); - //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); - - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); - - - Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(queue); - - _logger.info("Sending four messages"); - producer.send(producerSession.createTextMessage("msg1")); - producer.send(producerSession.createTextMessage("msg2")); - producer.send(producerSession.createTextMessage("msg3")); - producer.send(producerSession.createTextMessage("msg4")); - - con2.close(); - - _logger.info("Starting connection"); - con.start(); - TextMessage tm = (TextMessage) consumer.receive(); - tm.acknowledge(); - _logger.info("Received and acknowledged first message"); - consumer.receive(); - consumer.receive(); - consumer.receive(); - _logger.info("Received all four messages. About to disconnect and reconnect"); - - con.close(); - con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - consumer = consumerSession.createConsumer(queue); - - _logger.info("Starting second consumer connection"); - con.start(); - - tm = (TextMessage) consumer.receive(3000); - assertEquals("msg2", tm.getText()); - - - tm = (TextMessage) consumer.receive(3000); - assertEquals("msg3", tm.getText()); - - - tm = (TextMessage) consumer.receive(3000); - assertEquals("msg4", tm.getText()); - - _logger.info("Received redelivery of three messages. Acknowledging last message"); - tm.acknowledge(); - - con.close(); - - con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - consumer = consumerSession.createConsumer(queue); - _logger.info("Starting third consumer connection"); - con.start(); - tm = (TextMessage) consumer.receiveNoWait(); - assertNull(tm); - _logger.info("No messages redelivered as is expected"); - con.close(); - - con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - consumer = consumerSession.createConsumer(queue); - _logger.info("Starting fourth consumer connection"); - con.start(); - tm = (TextMessage) consumer.receive(3000); - assertNull(tm); - _logger.info("No messages redelivered as is expected"); - con.close(); - - _logger.info("Actually:" + store.getMessageMap().size()); - // assertTrue(store.getMessageMap().size() == 0); - } - - /** - * Tests that unacknowledged messages are thrown away when the channel is closed and they cannot be - * requeued (due perhaps to the queue being deleted). - * - * @throws Exception - */ - public void testDisconnectWithTransientQueueThrowsAwayMessages() throws Exception - { - - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore(); - Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = new AMQQueue("someQ", "someQ", false, true); - MessageConsumer consumer = consumerSession.createConsumer(queue); - //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); - - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); - Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(queue); - - _logger.info("Sending four messages"); - producer.send(producerSession.createTextMessage("msg1")); - producer.send(producerSession.createTextMessage("msg2")); - producer.send(producerSession.createTextMessage("msg3")); - producer.send(producerSession.createTextMessage("msg4")); - - con2.close(); - - _logger.info("Starting connection"); - con.start(); - TextMessage tm = (TextMessage) consumer.receive(); - tm.acknowledge(); - _logger.info("Received and acknowledged first message"); - consumer.receive(); - consumer.receive(); - consumer.receive(); - _logger.info("Received all four messages. About to disconnect and reconnect"); - - con.close(); - con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - consumer = consumerSession.createConsumer(queue); - - _logger.info("Starting second consumer connection"); - con.start(); - - tm = (TextMessage) consumer.receiveNoWait(); - assertNull(tm); - _logger.info("No messages redelivered as is expected"); - - _logger.info("Actually:" + store.getMessageMap().size()); - assertTrue(store.getMessageMap().size() == 0); - con.close(); - } - - public static junit.framework.Test suite() - { - return new VMBrokerSetup(new junit.framework.TestSuite(DisconnectAndRedeliverTest.class)); - } -} |
