From 9b45e0ab5d2a719d8ab4e7ec3f23d63ba4966f80 Mon Sep 17 00:00:00 2001 From: Stephen Vinoski Date: Tue, 12 Dec 2006 04:57:43 +0000 Subject: systests test reorg * move unused tests to src/old_test * modify pom.xml to remove surefire inclusions and exclusions git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@486021 13f79535-47bb-0310-9956-ffa450edef68 --- java/systests/pom.xml | 10 - .../exchange/HeadersExchangePerformanceTest.java | 184 ++++++++++++++ .../server/protocol/TestProtocolInitiation.java | 266 +++++++++++++++++++++ .../qpid/server/queue/QueueConcurrentPerfTest.java | 49 ++++ .../apache/qpid/server/queue/QueuePerfTest.java | 258 ++++++++++++++++++++ .../org/apache/qpid/server/queue/SendPerfTest.java | 174 ++++++++++++++ .../apache/qpid/server/util/ConcurrentTest.java | 79 ++++++ .../test/unit/ack/DisconnectAndRedeliverTest.java | 216 +++++++++++++++++ .../exchange/AbstractHeadersExchangeTest.java | 214 ----------------- .../exchange/AbstractHeadersExchangeTestBase.java | 214 +++++++++++++++++ .../exchange/HeadersExchangePerformanceTest.java | 184 -------------- .../qpid/server/exchange/HeadersExchangeTest.java | 2 +- .../server/protocol/TestProtocolInitiation.java | 266 --------------------- .../apache/qpid/server/queue/ConcurrencyTest.java | 6 +- .../qpid/server/queue/DeliveryManagerTest.java | 8 +- .../qpid/server/queue/QueueConcurrentPerfTest.java | 49 ---- .../apache/qpid/server/queue/QueuePerfTest.java | 258 -------------------- .../org/apache/qpid/server/queue/SendPerfTest.java | 174 -------------- .../qpid/server/queue/SubscriptionManagerTest.java | 18 +- .../qpid/server/queue/SubscriptionSetTest.java | 14 +- .../qpid/server/queue/SubscriptionTestHelper.java | 87 +++++++ .../apache/qpid/server/queue/TestSubscription.java | 87 ------- .../apache/qpid/server/util/ConcurrentTest.java | 79 ------ .../test/unit/ack/DisconnectAndRedeliverTest.java | 216 ----------------- 24 files changed, 1551 insertions(+), 1561 deletions(-) create mode 100644 java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java create mode 100644 java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java create mode 100644 java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java create mode 100644 java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java create mode 100644 java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java create mode 100644 java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java create mode 100644 java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java delete mode 100644 java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java delete mode 100644 java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java delete mode 100644 java/systests/src/test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java delete mode 100644 java/systests/src/test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java delete mode 100644 java/systests/src/test/java/org/apache/qpid/server/queue/QueuePerfTest.java delete mode 100644 java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java delete mode 100644 java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java delete mode 100644 java/systests/src/test/java/org/apache/qpid/server/util/ConcurrentTest.java delete mode 100644 java/systests/src/test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java (limited to 'java') diff --git a/java/systests/pom.xml b/java/systests/pom.xml index bf0afa7eef..93c8a2333b 100644 --- a/java/systests/pom.xml +++ b/java/systests/pom.xml @@ -84,16 +84,6 @@ file:///${basedir}/src/test/java/log4j.properties - - **/server/**/*Test.java - **/test/unit/ack/DisconnectAndRedeliver.java - - - **/Abstract*Test* - **/*PerfTest* - **/*PerformanceTest* - **/server/util/ConcurrentTest.java - diff --git a/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java b/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java new file mode 100644 index 0000000000..ff0d58ad69 --- /dev/null +++ b/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java @@ -0,0 +1,184 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.NoConsumersException; +import org.apache.qpid.server.util.TimedRun; +import org.apache.qpid.server.util.AveragedRun; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.ContentBody; + +import java.util.List; + +/** + * Want to vary the number of regsitrations, messages and matches and measure + * the corresponding variance in execution time. + *

+ * Each registration will contain the 'All' header, even registrations will + * contain the 'Even' header and odd headers will contain the 'Odd' header. + * In additions each regsitration will have a unique value for the 'Specific' + * header as well. + *

+ * Messages can then be routed to all registrations, to even- or odd- registrations + * or to a specific registration. + * + */ +public class HeadersExchangePerformanceTest extends AbstractHeadersExchangeTest +{ + private static enum Mode {ALL, ODD_OR_EVEN, SPECIFIC} + + private final TestQueue[] queues; + private final Mode mode; + + public HeadersExchangePerformanceTest(Mode mode, int registrations) throws AMQException + { + this.mode = mode; + queues = new TestQueue[registrations]; + for (int i = 0; i < queues.length; i++) + { + switch(mode) + { + case ALL: + queues[i] = bind(new FastQueue("Queue" + i), "All"); + break; + case ODD_OR_EVEN: + queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i)); + break; + case SPECIFIC: + queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i), "Specific"+ i); + break; + } + } + } + + void sendToAll(int count) throws AMQException + { + send(count, "All=True"); + } + + void sendToOdd(int count) throws AMQException + { + send(count, "All=True", "Odd=True"); + } + + void sendToEven(int count) throws AMQException + { + send(count, "All=True", "Even=True"); + } + + void sendToAllSpecifically(int count) throws AMQException + { + for (int i = 0; i < queues.length; i++) + { + sendToSpecific(count, i); + } + } + + void sendToSpecific(int count, int index) throws AMQException + { + send(count, "All=True", oddOrEven(index) + "=True", "Specific=" + index); + } + + private void send(int count, String... headers) throws AMQException + { + for (int i = 0; i < count; i++) + { + route(new Message("Message" + i, headers)); + } + } + + private static String oddOrEven(int i) + { + return (i % 2 == 0 ? "Even" : "Odd"); + } + + static class FastQueue extends TestQueue + { + + public FastQueue(String name) throws AMQException + { + super(name); + } + + public void deliver(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, List contentBodies) throws NoConsumersException + { + //just discard as we are not testing routing functionality here + } + } + + static class Test extends TimedRun + { + private final Mode mode; + private final int registrations; + private final int count; + private HeadersExchangePerformanceTest test; + + Test(Mode mode, int registrations, int count) + { + super(mode + ", registrations=" + registrations + ", count=" + count); + this.mode = mode; + this.registrations = registrations; + this.count = count; + } + + protected void setup() throws Exception + { + test = new HeadersExchangePerformanceTest(mode, registrations); + run(100); //do a warm up run before times start + } + + protected void teardown() throws Exception + { + test = null; + System.gc(); + } + + protected void run() throws Exception + { + run(count); + } + + private void run(int count) throws Exception + { + switch(mode) + { + case ALL: + test.sendToAll(count); + break; + default: + System.out.println("Test for " + mode + " not yet implemented."); + } + } + } + + public static void main(String[] argv) throws Exception + { + int registrations = Integer.parseInt(argv[0]); + int messages = Integer.parseInt(argv[1]); + int iterations = Integer.parseInt(argv[2]); + TimedRun test = new Test(Mode.ALL, registrations, messages); + AveragedRun tests = new AveragedRun(test, iterations); + System.out.println(tests.call()); + } +} + diff --git a/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java b/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java new file mode 100644 index 0000000000..e76c164f64 --- /dev/null +++ b/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java @@ -0,0 +1,266 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.codec.AMQDecoder; +import org.apache.qpid.codec.AMQEncoder; +import org.apache.qpid.framing.*; +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.WriteFuture; +import org.apache.mina.filter.codec.ProtocolDecoderOutput; +import org.apache.mina.filter.codec.ProtocolEncoderOutput; +import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput; + +import junit.framework.TestCase; + +/** + * This test suite tests the handling of protocol initiation frames and related issues. + */ +public class TestProtocolInitiation extends TestCase implements ProtocolVersionList +{ + private AMQPFastProtocolHandler _protocolHandler; + + private MockIoSession _mockIoSession; + + /** + * We need to use the object encoder mechanism so to allow us to retrieve the + * output (a bytebuffer) we define our own encoder output class. The encoder + * writes the encoded data to this class, from where we can retrieve it during + * the test run. + */ + private class TestProtocolEncoderOutput implements ProtocolEncoderOutput + { + public ByteBuffer result; + + public void write(ByteBuffer buf) + { + result = buf; + } + + public void mergeAll() + { + throw new UnsupportedOperationException(); + } + + public WriteFuture flush() + { + throw new UnsupportedOperationException(); + } + } + + private class TestProtocolDecoderOutput implements ProtocolDecoderOutput + { + public Object result; + + public void write(Object buf) + { + result = buf; + } + + public void flush() + { + throw new UnsupportedOperationException(); + } + } + + protected void setUp() throws Exception + { + super.setUp(); + _mockIoSession = new MockIoSession(); + _protocolHandler = new AMQPFastProtocolHandler(null, null); + } + + + /** + * Tests that the AMQDecoder handles invalid protocol classes + * @throws Exception + */ + public void testDecoderValidateProtocolClass() throws Exception + { + try + { + ProtocolInitiation pi = createValidProtocolInitiation(); + pi.protocolClass = 2; + decodePI(pi); + fail("expected exception did not occur"); + } + catch (AMQProtocolClassException m) + { + // ok + } + catch (Exception e) + { + fail("expected AMQProtocolClassException, got " + e); + } + } + + /** + * Tests that the AMQDecoder handles invalid protocol instance numbers + * @throws Exception + */ + public void testDecoderValidatesProtocolInstance() throws Exception + { + try + { + ProtocolInitiation pi = createValidProtocolInitiation(); + pi.protocolInstance = 2; + decodePI(pi); + fail("expected exception did not occur"); + } + catch (AMQProtocolInstanceException m) + { + // ok + } + catch (Exception e) + { + fail("expected AMQProtocolInstanceException, got " + e); + } + } + + /** + * Tests that the AMQDecoder handles invalid protocol major + * @throws Exception + */ + public void testDecoderValidatesProtocolMajor() throws Exception + { + try + { + ProtocolInitiation pi = createValidProtocolInitiation(); + pi.protocolMajor = 2; + decodePI(pi); + fail("expected exception did not occur"); + } + catch (AMQProtocolVersionException m) + { + // ok + } + catch (Exception e) + { + fail("expected AMQProtocolVersionException, got " + e); + } + } + + /** + * Tests that the AMQDecoder handles invalid protocol minor + * @throws Exception + */ + public void testDecoderValidatesProtocolMinor() throws Exception + { + try + { + ProtocolInitiation pi = createValidProtocolInitiation(); + pi.protocolMinor = 99; + decodePI(pi); + fail("expected exception did not occur"); + } + catch (AMQProtocolVersionException m) + { + // ok + } + catch (Exception e) + { + fail("expected AMQProtocolVersionException, got " + e); + } + } + + /** + * Tests that the AMQDecoder accepts a valid PI + * @throws Exception + */ + public void testDecoderValidatesHeader() throws Exception + { + try + { + ProtocolInitiation pi = createValidProtocolInitiation(); + pi.header = new char[] {'P', 'Q', 'M', 'A' }; + decodePI(pi); + fail("expected exception did not occur"); + } + catch (AMQProtocolHeaderException m) + { + // ok + } + catch (Exception e) + { + fail("expected AMQProtocolHeaderException, got " + e); + } + } + + /** + * Test that a valid header is passed by the decoder. + * @throws Exception + */ + public void testDecoderAcceptsValidHeader() throws Exception + { + ProtocolInitiation pi = createValidProtocolInitiation(); + decodePI(pi); + } + + /** + * This test checks that an invalid protocol header results in the + * connection being closed. + */ + public void testInvalidProtocolHeaderClosesConnection() throws Exception + { + AMQProtocolHeaderException pe = new AMQProtocolHeaderException("Test"); + _protocolHandler.exceptionCaught(_mockIoSession, pe); + assertNotNull(_mockIoSession.getLastWrittenObject()); + Object piResponse = _mockIoSession.getLastWrittenObject(); + assertEquals(piResponse.getClass(), ProtocolInitiation.class); + ProtocolInitiation pi = (ProtocolInitiation) piResponse; + assertEquals("Protocol Initiation sent out was not the broker's expected header", pi, + createValidProtocolInitiation()); + assertTrue("Session has not been closed", _mockIoSession.isClosing()); + } + + private ProtocolInitiation createValidProtocolInitiation() + { + /* Find last protocol version in protocol version list. Make sure last protocol version + listed in the build file (build-module.xml) is the latest version which will be used + here. */ + int i = pv.length - 1; + return new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]); + } + + /** + * Helper that encodes a protocol initiation and attempts to decode it + * @param pi + * @throws Exception + */ + private void decodePI(ProtocolInitiation pi) throws Exception + { + // we need to do this test at the level of the decoder since we initially only expect PI frames + // so the protocol handler is not set up to know whether it should be expecting a PI frame or + // a different type of frame + AMQDecoder decoder = new AMQDecoder(true); + AMQEncoder encoder = new AMQEncoder(); + TestProtocolEncoderOutput peo = new TestProtocolEncoderOutput(); + encoder.encode(_mockIoSession, pi, peo); + TestProtocolDecoderOutput pdo = new TestProtocolDecoderOutput(); + decoder.decode(_mockIoSession, peo.result, pdo); + ((ProtocolInitiation) pdo.result).checkVersion(this); + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(TestProtocolInitiation.class); + } +} diff --git a/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java b/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java new file mode 100644 index 0000000000..11c0026455 --- /dev/null +++ b/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.util.AveragedRun; +import org.apache.qpid.server.util.ConcurrentTest; + +public class QueueConcurrentPerfTest extends QueuePerfTest +{ + QueueConcurrentPerfTest(Factory factory, int queueCount, int messages) + { + super(factory, queueCount, messages); + } + + public static void main(String[] argv) throws Exception + { + Factory[] factories = new Factory[]{SYNCHRONIZED, CONCURRENT}; + int iterations = 5; + String label = argv.length > 0 ? argv[0]: null; + System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time"); + //vary number of queues: + for(Factory f : factories) + { + run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 100, 10000), iterations), 5)); + run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 10000), iterations), 5)); + run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 10000, 10000), iterations), 5)); + run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 1000), iterations), 5)); + run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 100000), iterations), 5)); + } + } +} diff --git a/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java b/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java new file mode 100644 index 0000000000..5b3857396d --- /dev/null +++ b/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java @@ -0,0 +1,258 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.util.AveragedRun; +import org.apache.qpid.server.util.TimedRun; +import org.apache.qpid.server.util.RunStats; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class QueuePerfTest extends TimedRun +{ + private final Factory _factory; + private final int _queueCount; + private final int _messages; + private final String _msg = ""; + private List> _queues; + + QueuePerfTest(Factory factory, int queueCount, int messages) + { + super(factory + ", " + queueCount + ", " + messages); + _factory = factory; + _queueCount = queueCount; + _messages = messages; + } + + protected void setup() throws Exception + { + //init + int count = Integer.getInteger("prepopulate", 0); +// System.err.println("Prepopulating with " + count + " items"); + _queues = new ArrayList>(_queueCount); + for (int i = 0; i < _queueCount; i++) + { + Queue q = _factory.create(); + for(int j = 0; j < count; ++j) + { + q.add("Item"+ j); + } + _queues.add(q); + } + System.gc(); + } + + protected void teardown() throws Exception + { + System.gc(); + } + + protected void run() throws Exception + { + //dispatch + for (int i = 0; i < _messages; i++) + { + for (Queue q : _queues) + { + q.offer(_msg); + q.poll(); + } + } + } + + static interface Factory + { + Queue create(); + } + + static Factory CONCURRENT = new Factory() + { + public Queue create() + { + return new ConcurrentLinkedQueue(); + } + + public String toString() + { + return "ConcurrentLinkedQueue"; + } + + }; + + static Factory SYNCHRONIZED = new Factory() + { + public Queue create() + { + return new SynchronizedQueue(new LinkedList()); + } + + + public String toString() + { + return "Synchronized LinkedList"; + } + }; + + static Factory PLAIN = new Factory() + { + public Queue create() + { + return new LinkedList(); + } + + public String toString() + { + return "Plain LinkedList"; + } + }; + + static class SynchronizedQueue implements Queue + { + private final Queue queue; + + SynchronizedQueue(Queue queue) + { + this.queue = queue; + } + + public synchronized E element() + { + return queue.element(); + } + + public synchronized boolean offer(E o) + { + return queue.offer(o); + } + + public synchronized E peek() + { + return queue.peek(); + } + + public synchronized E poll() + { + return queue.poll(); + } + + public synchronized E remove() + { + return queue.remove(); + } + + public synchronized int size() + { + return queue.size(); + } + + public synchronized boolean isEmpty() + { + return queue.isEmpty(); + } + + public synchronized boolean contains(Object o) + { + return queue.contains(o); + } + + public synchronized Iterator iterator() + { + return queue.iterator(); + } + + public synchronized Object[] toArray() + { + return queue.toArray(); + } + + public synchronized T[] toArray(T[] a) + { + return queue.toArray(a); + } + + public synchronized boolean add(E o) + { + return queue.add(o); + } + + public synchronized boolean remove(Object o) + { + return queue.remove(o); + } + + public synchronized boolean containsAll(Collection c) + { + return queue.containsAll(c); + } + + public synchronized boolean addAll(Collection c) + { + return queue.addAll(c); + } + + public synchronized boolean removeAll(Collection c) + { + return queue.removeAll(c); + } + + public synchronized boolean retainAll(Collection c) + { + return queue.retainAll(c); + } + + public synchronized void clear() + { + queue.clear(); + } + } + + static void run(String label, AveragedRun test) throws Exception + { + RunStats stats = test.call(); + System.out.println((label == null ? "" : label + ", ") + test + + ", " + stats.getAverage() + ", " + stats.getMax() + ", " + stats.getMin()); + } + + public static void main(String[] argv) throws Exception + { + Factory[] factories = new Factory[]{PLAIN, SYNCHRONIZED, CONCURRENT}; + int iterations = 5; + String label = argv.length > 0 ? argv[0]: null; + System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time"); + //vary number of queues: + + for(Factory f : factories) + { + run(label, new AveragedRun(new QueuePerfTest(f, 100, 10000), iterations)); + run(label, new AveragedRun(new QueuePerfTest(f, 1000, 10000), iterations)); + run(label, new AveragedRun(new QueuePerfTest(f, 10000, 10000), iterations)); + run(label, new AveragedRun(new QueuePerfTest(f, 1000, 1000), iterations)); + run(label, new AveragedRun(new QueuePerfTest(f, 1000, 100000), iterations)); + } + } + +} diff --git a/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java b/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java new file mode 100644 index 0000000000..6490b9f270 --- /dev/null +++ b/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java @@ -0,0 +1,174 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.exchange.AbstractExchange; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.handler.OnCurrentThreadExecutor; +import org.apache.qpid.server.protocol.AMQMinaProtocolSession; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.MockIoSession; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.SkeletonMessageStore; +import org.apache.qpid.server.util.AveragedRun; +import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.server.util.TimedRun; + +import java.util.ArrayList; +import java.util.List; + +public class SendPerfTest extends TimedRun +{ + private int _messages = 1000; + private int _clients = 10; + private List _queues; + + public SendPerfTest(int clients, int messages) + { + super("SendPerfTest, msgs=" + messages + ", clients=" + clients); + _messages = messages; + _clients = clients; + } + + protected void setup() throws Exception + { + _queues = initQueues(_clients); + System.gc(); + } + + protected void teardown() throws Exception + { + System.gc(); + } + + protected void run() throws Exception + { + deliver(_messages, _queues); + } + + //have a dummy AMQProtocolSession that does nothing on the writeFrame() + //set up x number of queues + //create necessary bits and pieces to deliver a message + //deliver y messages to each queue + + public static void main(String[] argv) throws Exception + { + ApplicationRegistry.initialise(new TestApplicationRegistry()); + int clients = Integer.parseInt(argv[0]); + int messages = Integer.parseInt(argv[1]); + int iterations = Integer.parseInt(argv[2]); + AveragedRun test = new AveragedRun(new SendPerfTest(clients, messages), iterations); + test.run(); + } + + /** + * Delivers messages to a number of queues. + * @param count the number of messages to deliver + * @param queues the list of queues + * @throws NoConsumersException + */ + static void deliver(int count, List queues) throws AMQException + { + BasicPublishBody publish = new BasicPublishBody(); + publish.exchange = new NullExchange().getName(); + ContentHeaderBody header = new ContentHeaderBody(); + List body = new ArrayList(); + MessageStore messageStore = new SkeletonMessageStore(); + body.add(new ContentBody()); + for (int i = 0; i < count; i++) + { + for (AMQQueue q : queues) + { + q.deliver(new AMQMessage(messageStore, i, publish, header, body)); + } + } + } + + static List initQueues(int number) throws AMQException + { + Exchange exchange = new NullExchange(); + List queues = new ArrayList(number); + for (int i = 0; i < number; i++) + { + AMQQueue q = createQueue("Queue" + (i + 1)); + q.bind("routingKey", exchange); + try + { + q.registerProtocolSession(createSession(), 1, "1", false); + } + catch (Exception e) + { + throw new AMQException("Error creating protocol session: " + e, e); + } + queues.add(q); + } + return queues; + } + + static AMQQueue createQueue(String name) throws AMQException + { + return new AMQQueue(name, false, null, false, ApplicationRegistry.getInstance().getQueueRegistry(), + new OnCurrentThreadExecutor()); + } + + static AMQProtocolSession createSession() throws Exception + { + IApplicationRegistry reg = ApplicationRegistry.getInstance(); + AMQCodecFactory codecFactory = new AMQCodecFactory(true); + AMQMinaProtocolSession result = new AMQMinaProtocolSession(new MockIoSession(), reg.getQueueRegistry(), reg.getExchangeRegistry(), codecFactory); + result.addChannel(new AMQChannel(1, null, null)); + return result; + } + + static class NullExchange extends AbstractExchange + { + public String getName() + { + return "NullExchange"; + } + + protected ExchangeMBean createMBean() + { + return null; + } + + public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + } + + public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException + { + } + + public void route(AMQMessage payload) throws AMQException + { + } + } +} diff --git a/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java b/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java new file mode 100644 index 0000000000..1ae8d3205d --- /dev/null +++ b/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +public class ConcurrentTest extends TimedRun +{ + private final TimedRun _test; + private final Thread[] _threads; + + public ConcurrentTest(TimedRun test, int threads) + { + super(test.toString()); + _test = test; + _threads = new Thread[threads]; + } + + protected void setup() throws Exception + { + _test.setup(); + for(int i = 0; i < _threads.length; i++) + { + _threads[i] = new Thread(new Runner()); + } + } + + protected void teardown() throws Exception + { + _test.teardown(); + } + + protected void run() throws Exception + { + for(Thread t : _threads) + { + t.start(); + } + for(Thread t : _threads) + { + t.join(); + } + } + + private class Runner implements Runnable + { + private Exception error; + + public void run() + { + try + { + _test.run(); + } + catch(Exception e) + { + error = e; + e.printStackTrace(); + } + } + } + +} diff --git a/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java b/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java new file mode 100644 index 0000000000..a3e555aac9 --- /dev/null +++ b/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java @@ -0,0 +1,216 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.ack; + +import org.apache.log4j.Logger; +import org.apache.log4j.xml.DOMConfigurator; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.test.VMBrokerSetup; + +import javax.jms.*; + +import junit.framework.TestCase; + +public class DisconnectAndRedeliverTest extends TestCase +{ + private static final Logger _logger = Logger.getLogger(DisconnectAndRedeliverTest.class); + + static + { + String workdir = System.getProperty("QPID_WORK"); + if (workdir == null || workdir.equals("")) + { + String tempdir = System.getProperty("java.io.tmpdir"); + System.out.println("QPID_WORK not set using tmp directory: " + tempdir); + System.setProperty("QPID_WORK", tempdir); + } + DOMConfigurator.configure("../broker/etc/log4j.xml"); + } + + protected void setUp() throws Exception + { + super.setUp(); + ApplicationRegistry.initialise(new TestApplicationRegistry(), 1); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + } + + /** + * This tests that when there are unacknowledged messages on a channel they are requeued for delivery when + * the channel is closed. + * + * @throws Exception + */ + public void testDisconnectRedeliversMessages() throws Exception + { + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + + TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore(); + + Session consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + AMQQueue queue = new AMQQueue("someQ", "someQ", false, false); + MessageConsumer consumer = consumerSession.createConsumer(queue); + //force synch to ensure the consumer has resulted in a bound queue + ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); + + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + + + Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); + + _logger.info("Sending four messages"); + producer.send(producerSession.createTextMessage("msg1")); + producer.send(producerSession.createTextMessage("msg2")); + producer.send(producerSession.createTextMessage("msg3")); + producer.send(producerSession.createTextMessage("msg4")); + + con2.close(); + + _logger.info("Starting connection"); + con.start(); + TextMessage tm = (TextMessage) consumer.receive(); + tm.acknowledge(); + _logger.info("Received and acknowledged first message"); + consumer.receive(); + consumer.receive(); + consumer.receive(); + _logger.info("Received all four messages. About to disconnect and reconnect"); + + con.close(); + con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + consumer = consumerSession.createConsumer(queue); + + _logger.info("Starting second consumer connection"); + con.start(); + + tm = (TextMessage) consumer.receive(3000); + assertEquals("msg2", tm.getText()); + + + tm = (TextMessage) consumer.receive(3000); + assertEquals("msg3", tm.getText()); + + + tm = (TextMessage) consumer.receive(3000); + assertEquals("msg4", tm.getText()); + + _logger.info("Received redelivery of three messages. Acknowledging last message"); + tm.acknowledge(); + + con.close(); + + con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + consumer = consumerSession.createConsumer(queue); + _logger.info("Starting third consumer connection"); + con.start(); + tm = (TextMessage) consumer.receiveNoWait(); + assertNull(tm); + _logger.info("No messages redelivered as is expected"); + con.close(); + + con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + consumer = consumerSession.createConsumer(queue); + _logger.info("Starting fourth consumer connection"); + con.start(); + tm = (TextMessage) consumer.receive(3000); + assertNull(tm); + _logger.info("No messages redelivered as is expected"); + con.close(); + + _logger.info("Actually:" + store.getMessageMap().size()); + // assertTrue(store.getMessageMap().size() == 0); + } + + /** + * Tests that unacknowledged messages are thrown away when the channel is closed and they cannot be + * requeued (due perhaps to the queue being deleted). + * + * @throws Exception + */ + public void testDisconnectWithTransientQueueThrowsAwayMessages() throws Exception + { + + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore(); + Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = new AMQQueue("someQ", "someQ", false, true); + MessageConsumer consumer = consumerSession.createConsumer(queue); + //force synch to ensure the consumer has resulted in a bound queue + ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); + + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); + + _logger.info("Sending four messages"); + producer.send(producerSession.createTextMessage("msg1")); + producer.send(producerSession.createTextMessage("msg2")); + producer.send(producerSession.createTextMessage("msg3")); + producer.send(producerSession.createTextMessage("msg4")); + + con2.close(); + + _logger.info("Starting connection"); + con.start(); + TextMessage tm = (TextMessage) consumer.receive(); + tm.acknowledge(); + _logger.info("Received and acknowledged first message"); + consumer.receive(); + consumer.receive(); + consumer.receive(); + _logger.info("Received all four messages. About to disconnect and reconnect"); + + con.close(); + con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + consumer = consumerSession.createConsumer(queue); + + _logger.info("Starting second consumer connection"); + con.start(); + + tm = (TextMessage) consumer.receiveNoWait(); + assertNull(tm); + _logger.info("No messages redelivered as is expected"); + + _logger.info("Actually:" + store.getMessageMap().size()); + assertTrue(store.getMessageMap().size() == 0); + con.close(); + } + + public static junit.framework.Test suite() + { + return new VMBrokerSetup(new junit.framework.TestSuite(DisconnectAndRedeliverTest.class)); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java deleted file mode 100644 index a7611df55d..0000000000 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import junit.framework.TestCase; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.SkeletonMessageStore; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -public class AbstractHeadersExchangeTest extends TestCase -{ - private final HeadersExchange exchange = new HeadersExchange(); - protected final Set queues = new HashSet(); - private int count; - - protected TestQueue bindDefault(String... bindings) throws AMQException - { - return bind("Queue" + (++count), bindings); - } - - protected TestQueue bind(String queueName, String... bindings) throws AMQException - { - return bind(queueName, getHeaders(bindings)); - } - - protected TestQueue bind(String queue, FieldTable bindings) throws AMQException - { - return bind(new TestQueue(queue), bindings); - } - - protected TestQueue bind(TestQueue queue, String... bindings) throws AMQException - { - return bind(queue, getHeaders(bindings)); - } - - protected TestQueue bind(TestQueue queue, FieldTable bindings) throws AMQException - { - queues.add(queue); - exchange.registerQueue(null, queue, bindings); - return queue; - } - - - protected void route(Message m) throws AMQException - { - m.route(exchange); - } - - protected void routeAndTest(Message m, TestQueue... expected) throws AMQException - { - routeAndTest(m, Arrays.asList(expected)); - } - - protected void routeAndTest(Message m, List expected) throws AMQException - { - route(m); - for (TestQueue q : queues) - { - if (expected.contains(q)) - { - assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q)); - //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; - } - else - { - assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q)); - //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; - } - } - } - - static FieldTable getHeaders(String... entries) - { - FieldTable headers = FieldTableFactory.newFieldTable(); - for (String s : entries) - { - String[] parts = s.split("=", 2); - headers.put(parts[0], parts.length > 1 ? parts[1] : ""); - } - return headers; - } - - static BasicPublishBody getPublishRequest(String id) - { - BasicPublishBody request = new BasicPublishBody(); - request.routingKey = id; - return request; - } - - static ContentHeaderBody getContentHeader(FieldTable headers) - { - ContentHeaderBody header = new ContentHeaderBody(); - header.properties = getProperties(headers); - return header; - } - - static BasicContentHeaderProperties getProperties(FieldTable headers) - { - BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); - properties.setHeaders(headers); - return properties; - } - - static class TestQueue extends AMQQueue - { - final List messages = new ArrayList(); - - public TestQueue(String name) throws AMQException - { - super(name, false, "test", true, ApplicationRegistry.getInstance().getQueueRegistry()); - } - - public void deliver(AMQMessage msg) throws AMQException - { - messages.add(new HeadersExchangeTest.Message(msg)); - } - } - - /** - * Just add some extra utility methods to AMQMessage to aid testing. - */ - static class Message extends AMQMessage - { - private static MessageStore _messageStore = new SkeletonMessageStore(); - - Message(String id, String... headers) throws AMQException - { - this(id, getHeaders(headers)); - } - - Message(String id, FieldTable headers) throws AMQException - { - this(getPublishRequest(id), getContentHeader(headers), null); - } - - private Message(BasicPublishBody publish, ContentHeaderBody header, List bodies) throws AMQException - { - super(_messageStore, publish, header, bodies); - } - - private Message(AMQMessage msg) throws AMQException - { - super(msg); - } - - void route(Exchange exchange) throws AMQException - { - exchange.route(this); - } - - boolean isInQueue(TestQueue queue) - { - return queue.messages.contains(this); - } - - public int hashCode() - { - return getKey().hashCode(); - } - - public boolean equals(Object o) - { - return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o); - } - - private boolean equals(HeadersExchangeTest.Message m) - { - return getKey().equals(m.getKey()); - } - - public String toString() - { - return getKey().toString(); - } - - private Object getKey() - { - return getPublishBody().routingKey; - } - } -} diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java new file mode 100644 index 0000000000..93fbab682a --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -0,0 +1,214 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.SkeletonMessageStore; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class AbstractHeadersExchangeTestBase extends TestCase +{ + private final HeadersExchange exchange = new HeadersExchange(); + protected final Set queues = new HashSet(); + private int count; + + protected TestQueue bindDefault(String... bindings) throws AMQException + { + return bind("Queue" + (++count), bindings); + } + + protected TestQueue bind(String queueName, String... bindings) throws AMQException + { + return bind(queueName, getHeaders(bindings)); + } + + protected TestQueue bind(String queue, FieldTable bindings) throws AMQException + { + return bind(new TestQueue(queue), bindings); + } + + protected TestQueue bind(TestQueue queue, String... bindings) throws AMQException + { + return bind(queue, getHeaders(bindings)); + } + + protected TestQueue bind(TestQueue queue, FieldTable bindings) throws AMQException + { + queues.add(queue); + exchange.registerQueue(null, queue, bindings); + return queue; + } + + + protected void route(Message m) throws AMQException + { + m.route(exchange); + } + + protected void routeAndTest(Message m, TestQueue... expected) throws AMQException + { + routeAndTest(m, Arrays.asList(expected)); + } + + protected void routeAndTest(Message m, List expected) throws AMQException + { + route(m); + for (TestQueue q : queues) + { + if (expected.contains(q)) + { + assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q)); + //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; + } + else + { + assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q)); + //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; + } + } + } + + static FieldTable getHeaders(String... entries) + { + FieldTable headers = FieldTableFactory.newFieldTable(); + for (String s : entries) + { + String[] parts = s.split("=", 2); + headers.put(parts[0], parts.length > 1 ? parts[1] : ""); + } + return headers; + } + + static BasicPublishBody getPublishRequest(String id) + { + BasicPublishBody request = new BasicPublishBody(); + request.routingKey = id; + return request; + } + + static ContentHeaderBody getContentHeader(FieldTable headers) + { + ContentHeaderBody header = new ContentHeaderBody(); + header.properties = getProperties(headers); + return header; + } + + static BasicContentHeaderProperties getProperties(FieldTable headers) + { + BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); + properties.setHeaders(headers); + return properties; + } + + static class TestQueue extends AMQQueue + { + final List messages = new ArrayList(); + + public TestQueue(String name) throws AMQException + { + super(name, false, "test", true, ApplicationRegistry.getInstance().getQueueRegistry()); + } + + public void deliver(AMQMessage msg) throws AMQException + { + messages.add(new HeadersExchangeTest.Message(msg)); + } + } + + /** + * Just add some extra utility methods to AMQMessage to aid testing. + */ + static class Message extends AMQMessage + { + private static MessageStore _messageStore = new SkeletonMessageStore(); + + Message(String id, String... headers) throws AMQException + { + this(id, getHeaders(headers)); + } + + Message(String id, FieldTable headers) throws AMQException + { + this(getPublishRequest(id), getContentHeader(headers), null); + } + + private Message(BasicPublishBody publish, ContentHeaderBody header, List bodies) throws AMQException + { + super(_messageStore, publish, header, bodies); + } + + private Message(AMQMessage msg) throws AMQException + { + super(msg); + } + + void route(Exchange exchange) throws AMQException + { + exchange.route(this); + } + + boolean isInQueue(TestQueue queue) + { + return queue.messages.contains(this); + } + + public int hashCode() + { + return getKey().hashCode(); + } + + public boolean equals(Object o) + { + return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o); + } + + private boolean equals(HeadersExchangeTest.Message m) + { + return getKey().equals(m.getKey()); + } + + public String toString() + { + return getKey().toString(); + } + + private Object getKey() + { + return getPublishBody().routingKey; + } + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java deleted file mode 100644 index ff0d58ad69..0000000000 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import org.apache.qpid.AMQException; -import org.apache.qpid.server.queue.NoConsumersException; -import org.apache.qpid.server.util.TimedRun; -import org.apache.qpid.server.util.AveragedRun; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ContentBody; - -import java.util.List; - -/** - * Want to vary the number of regsitrations, messages and matches and measure - * the corresponding variance in execution time. - *

- * Each registration will contain the 'All' header, even registrations will - * contain the 'Even' header and odd headers will contain the 'Odd' header. - * In additions each regsitration will have a unique value for the 'Specific' - * header as well. - *

- * Messages can then be routed to all registrations, to even- or odd- registrations - * or to a specific registration. - * - */ -public class HeadersExchangePerformanceTest extends AbstractHeadersExchangeTest -{ - private static enum Mode {ALL, ODD_OR_EVEN, SPECIFIC} - - private final TestQueue[] queues; - private final Mode mode; - - public HeadersExchangePerformanceTest(Mode mode, int registrations) throws AMQException - { - this.mode = mode; - queues = new TestQueue[registrations]; - for (int i = 0; i < queues.length; i++) - { - switch(mode) - { - case ALL: - queues[i] = bind(new FastQueue("Queue" + i), "All"); - break; - case ODD_OR_EVEN: - queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i)); - break; - case SPECIFIC: - queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i), "Specific"+ i); - break; - } - } - } - - void sendToAll(int count) throws AMQException - { - send(count, "All=True"); - } - - void sendToOdd(int count) throws AMQException - { - send(count, "All=True", "Odd=True"); - } - - void sendToEven(int count) throws AMQException - { - send(count, "All=True", "Even=True"); - } - - void sendToAllSpecifically(int count) throws AMQException - { - for (int i = 0; i < queues.length; i++) - { - sendToSpecific(count, i); - } - } - - void sendToSpecific(int count, int index) throws AMQException - { - send(count, "All=True", oddOrEven(index) + "=True", "Specific=" + index); - } - - private void send(int count, String... headers) throws AMQException - { - for (int i = 0; i < count; i++) - { - route(new Message("Message" + i, headers)); - } - } - - private static String oddOrEven(int i) - { - return (i % 2 == 0 ? "Even" : "Odd"); - } - - static class FastQueue extends TestQueue - { - - public FastQueue(String name) throws AMQException - { - super(name); - } - - public void deliver(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, List contentBodies) throws NoConsumersException - { - //just discard as we are not testing routing functionality here - } - } - - static class Test extends TimedRun - { - private final Mode mode; - private final int registrations; - private final int count; - private HeadersExchangePerformanceTest test; - - Test(Mode mode, int registrations, int count) - { - super(mode + ", registrations=" + registrations + ", count=" + count); - this.mode = mode; - this.registrations = registrations; - this.count = count; - } - - protected void setup() throws Exception - { - test = new HeadersExchangePerformanceTest(mode, registrations); - run(100); //do a warm up run before times start - } - - protected void teardown() throws Exception - { - test = null; - System.gc(); - } - - protected void run() throws Exception - { - run(count); - } - - private void run(int count) throws Exception - { - switch(mode) - { - case ALL: - test.sendToAll(count); - break; - default: - System.out.println("Test for " + mode + " not yet implemented."); - } - } - } - - public static void main(String[] argv) throws Exception - { - int registrations = Integer.parseInt(argv[0]); - int messages = Integer.parseInt(argv[1]); - int iterations = Integer.parseInt(argv[2]); - TimedRun test = new Test(Mode.ALL, registrations, messages); - AveragedRun tests = new AveragedRun(test, iterations); - System.out.println(tests.call()); - } -} - diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index 1c80e521ca..91520df3bf 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -24,7 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.util.TestApplicationRegistry; -public class HeadersExchangeTest extends AbstractHeadersExchangeTest +public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase { protected void setUp() throws Exception { diff --git a/java/systests/src/test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java b/java/systests/src/test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java deleted file mode 100644 index e76c164f64..0000000000 --- a/java/systests/src/test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import org.apache.qpid.codec.AMQDecoder; -import org.apache.qpid.codec.AMQEncoder; -import org.apache.qpid.framing.*; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.WriteFuture; -import org.apache.mina.filter.codec.ProtocolDecoderOutput; -import org.apache.mina.filter.codec.ProtocolEncoderOutput; -import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput; - -import junit.framework.TestCase; - -/** - * This test suite tests the handling of protocol initiation frames and related issues. - */ -public class TestProtocolInitiation extends TestCase implements ProtocolVersionList -{ - private AMQPFastProtocolHandler _protocolHandler; - - private MockIoSession _mockIoSession; - - /** - * We need to use the object encoder mechanism so to allow us to retrieve the - * output (a bytebuffer) we define our own encoder output class. The encoder - * writes the encoded data to this class, from where we can retrieve it during - * the test run. - */ - private class TestProtocolEncoderOutput implements ProtocolEncoderOutput - { - public ByteBuffer result; - - public void write(ByteBuffer buf) - { - result = buf; - } - - public void mergeAll() - { - throw new UnsupportedOperationException(); - } - - public WriteFuture flush() - { - throw new UnsupportedOperationException(); - } - } - - private class TestProtocolDecoderOutput implements ProtocolDecoderOutput - { - public Object result; - - public void write(Object buf) - { - result = buf; - } - - public void flush() - { - throw new UnsupportedOperationException(); - } - } - - protected void setUp() throws Exception - { - super.setUp(); - _mockIoSession = new MockIoSession(); - _protocolHandler = new AMQPFastProtocolHandler(null, null); - } - - - /** - * Tests that the AMQDecoder handles invalid protocol classes - * @throws Exception - */ - public void testDecoderValidateProtocolClass() throws Exception - { - try - { - ProtocolInitiation pi = createValidProtocolInitiation(); - pi.protocolClass = 2; - decodePI(pi); - fail("expected exception did not occur"); - } - catch (AMQProtocolClassException m) - { - // ok - } - catch (Exception e) - { - fail("expected AMQProtocolClassException, got " + e); - } - } - - /** - * Tests that the AMQDecoder handles invalid protocol instance numbers - * @throws Exception - */ - public void testDecoderValidatesProtocolInstance() throws Exception - { - try - { - ProtocolInitiation pi = createValidProtocolInitiation(); - pi.protocolInstance = 2; - decodePI(pi); - fail("expected exception did not occur"); - } - catch (AMQProtocolInstanceException m) - { - // ok - } - catch (Exception e) - { - fail("expected AMQProtocolInstanceException, got " + e); - } - } - - /** - * Tests that the AMQDecoder handles invalid protocol major - * @throws Exception - */ - public void testDecoderValidatesProtocolMajor() throws Exception - { - try - { - ProtocolInitiation pi = createValidProtocolInitiation(); - pi.protocolMajor = 2; - decodePI(pi); - fail("expected exception did not occur"); - } - catch (AMQProtocolVersionException m) - { - // ok - } - catch (Exception e) - { - fail("expected AMQProtocolVersionException, got " + e); - } - } - - /** - * Tests that the AMQDecoder handles invalid protocol minor - * @throws Exception - */ - public void testDecoderValidatesProtocolMinor() throws Exception - { - try - { - ProtocolInitiation pi = createValidProtocolInitiation(); - pi.protocolMinor = 99; - decodePI(pi); - fail("expected exception did not occur"); - } - catch (AMQProtocolVersionException m) - { - // ok - } - catch (Exception e) - { - fail("expected AMQProtocolVersionException, got " + e); - } - } - - /** - * Tests that the AMQDecoder accepts a valid PI - * @throws Exception - */ - public void testDecoderValidatesHeader() throws Exception - { - try - { - ProtocolInitiation pi = createValidProtocolInitiation(); - pi.header = new char[] {'P', 'Q', 'M', 'A' }; - decodePI(pi); - fail("expected exception did not occur"); - } - catch (AMQProtocolHeaderException m) - { - // ok - } - catch (Exception e) - { - fail("expected AMQProtocolHeaderException, got " + e); - } - } - - /** - * Test that a valid header is passed by the decoder. - * @throws Exception - */ - public void testDecoderAcceptsValidHeader() throws Exception - { - ProtocolInitiation pi = createValidProtocolInitiation(); - decodePI(pi); - } - - /** - * This test checks that an invalid protocol header results in the - * connection being closed. - */ - public void testInvalidProtocolHeaderClosesConnection() throws Exception - { - AMQProtocolHeaderException pe = new AMQProtocolHeaderException("Test"); - _protocolHandler.exceptionCaught(_mockIoSession, pe); - assertNotNull(_mockIoSession.getLastWrittenObject()); - Object piResponse = _mockIoSession.getLastWrittenObject(); - assertEquals(piResponse.getClass(), ProtocolInitiation.class); - ProtocolInitiation pi = (ProtocolInitiation) piResponse; - assertEquals("Protocol Initiation sent out was not the broker's expected header", pi, - createValidProtocolInitiation()); - assertTrue("Session has not been closed", _mockIoSession.isClosing()); - } - - private ProtocolInitiation createValidProtocolInitiation() - { - /* Find last protocol version in protocol version list. Make sure last protocol version - listed in the build file (build-module.xml) is the latest version which will be used - here. */ - int i = pv.length - 1; - return new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]); - } - - /** - * Helper that encodes a protocol initiation and attempts to decode it - * @param pi - * @throws Exception - */ - private void decodePI(ProtocolInitiation pi) throws Exception - { - // we need to do this test at the level of the decoder since we initially only expect PI frames - // so the protocol handler is not set up to know whether it should be expecting a PI frame or - // a different type of frame - AMQDecoder decoder = new AMQDecoder(true); - AMQEncoder encoder = new AMQEncoder(); - TestProtocolEncoderOutput peo = new TestProtocolEncoderOutput(); - encoder.encode(_mockIoSession, pi, peo); - TestProtocolDecoderOutput pdo = new TestProtocolDecoderOutput(); - decoder.decode(_mockIoSession, peo.result, pdo); - ((ProtocolInitiation) pdo.result).checkVersion(this); - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(TestProtocolInitiation.class); - } -} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java index a76db6a728..fe8960c872 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java @@ -36,7 +36,7 @@ public class ConcurrencyTest extends MessageTestHelper private final int numMessages = 1000; - private final List _subscribers = new ArrayList(); + private final List _subscribers = new ArrayList(); private final Set _active = new HashSet(); private final List _messages = new ArrayList(); private int next = 0;//index to next message to send @@ -91,7 +91,7 @@ public class ConcurrencyTest extends MessageTestHelper { for(int i = 0; i < subscriptions; i++) { - _subscribers.add(new TestSubscription("Subscriber" + i, _received)); + _subscribers.add(new SubscriptionTestHelper("Subscriber" + i, _received)); } } @@ -174,7 +174,7 @@ public class ConcurrencyTest extends MessageTestHelper return random.nextBoolean(); } - private TestSubscription randomSubscriber() + private SubscriptionTestHelper randomSubscriber() { return _subscribers.get(random.nextInt(_subscribers.size())); } diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java index 9cfd9458d1..3631264e5a 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java @@ -48,8 +48,8 @@ abstract public class DeliveryManagerTest extends MessageTestHelper _mgr.deliver("Me", messages[i]); } - TestSubscription s1 = new TestSubscription("1"); - TestSubscription s2 = new TestSubscription("2"); + SubscriptionTestHelper s1 = new SubscriptionTestHelper("1"); + SubscriptionTestHelper s2 = new SubscriptionTestHelper("2"); _subscriptions.addSubscriber(s1); _subscriptions.addSubscriber(s2); @@ -88,7 +88,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper } int batch = messages.length / 2; - TestSubscription s1 = new TestSubscription("1"); + SubscriptionTestHelper s1 = new SubscriptionTestHelper("1"); _subscriptions.addSubscriber(s1); for (int i = 0; i < batch; i++) @@ -147,7 +147,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper { try { - TestSubscription s = new TestSubscription("A"); + SubscriptionTestHelper s = new SubscriptionTestHelper("A"); _subscriptions.addSubscriber(s); s.setSuspended(true); AMQMessage msg = message(true); diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java deleted file mode 100644 index 11c0026455..0000000000 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.util.AveragedRun; -import org.apache.qpid.server.util.ConcurrentTest; - -public class QueueConcurrentPerfTest extends QueuePerfTest -{ - QueueConcurrentPerfTest(Factory factory, int queueCount, int messages) - { - super(factory, queueCount, messages); - } - - public static void main(String[] argv) throws Exception - { - Factory[] factories = new Factory[]{SYNCHRONIZED, CONCURRENT}; - int iterations = 5; - String label = argv.length > 0 ? argv[0]: null; - System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time"); - //vary number of queues: - for(Factory f : factories) - { - run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 100, 10000), iterations), 5)); - run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 10000), iterations), 5)); - run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 10000, 10000), iterations), 5)); - run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 1000), iterations), 5)); - run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 100000), iterations), 5)); - } - } -} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/QueuePerfTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/QueuePerfTest.java deleted file mode 100644 index 5b3857396d..0000000000 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/QueuePerfTest.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.util.AveragedRun; -import org.apache.qpid.server.util.TimedRun; -import org.apache.qpid.server.util.RunStats; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; - -public class QueuePerfTest extends TimedRun -{ - private final Factory _factory; - private final int _queueCount; - private final int _messages; - private final String _msg = ""; - private List> _queues; - - QueuePerfTest(Factory factory, int queueCount, int messages) - { - super(factory + ", " + queueCount + ", " + messages); - _factory = factory; - _queueCount = queueCount; - _messages = messages; - } - - protected void setup() throws Exception - { - //init - int count = Integer.getInteger("prepopulate", 0); -// System.err.println("Prepopulating with " + count + " items"); - _queues = new ArrayList>(_queueCount); - for (int i = 0; i < _queueCount; i++) - { - Queue q = _factory.create(); - for(int j = 0; j < count; ++j) - { - q.add("Item"+ j); - } - _queues.add(q); - } - System.gc(); - } - - protected void teardown() throws Exception - { - System.gc(); - } - - protected void run() throws Exception - { - //dispatch - for (int i = 0; i < _messages; i++) - { - for (Queue q : _queues) - { - q.offer(_msg); - q.poll(); - } - } - } - - static interface Factory - { - Queue create(); - } - - static Factory CONCURRENT = new Factory() - { - public Queue create() - { - return new ConcurrentLinkedQueue(); - } - - public String toString() - { - return "ConcurrentLinkedQueue"; - } - - }; - - static Factory SYNCHRONIZED = new Factory() - { - public Queue create() - { - return new SynchronizedQueue(new LinkedList()); - } - - - public String toString() - { - return "Synchronized LinkedList"; - } - }; - - static Factory PLAIN = new Factory() - { - public Queue create() - { - return new LinkedList(); - } - - public String toString() - { - return "Plain LinkedList"; - } - }; - - static class SynchronizedQueue implements Queue - { - private final Queue queue; - - SynchronizedQueue(Queue queue) - { - this.queue = queue; - } - - public synchronized E element() - { - return queue.element(); - } - - public synchronized boolean offer(E o) - { - return queue.offer(o); - } - - public synchronized E peek() - { - return queue.peek(); - } - - public synchronized E poll() - { - return queue.poll(); - } - - public synchronized E remove() - { - return queue.remove(); - } - - public synchronized int size() - { - return queue.size(); - } - - public synchronized boolean isEmpty() - { - return queue.isEmpty(); - } - - public synchronized boolean contains(Object o) - { - return queue.contains(o); - } - - public synchronized Iterator iterator() - { - return queue.iterator(); - } - - public synchronized Object[] toArray() - { - return queue.toArray(); - } - - public synchronized T[] toArray(T[] a) - { - return queue.toArray(a); - } - - public synchronized boolean add(E o) - { - return queue.add(o); - } - - public synchronized boolean remove(Object o) - { - return queue.remove(o); - } - - public synchronized boolean containsAll(Collection c) - { - return queue.containsAll(c); - } - - public synchronized boolean addAll(Collection c) - { - return queue.addAll(c); - } - - public synchronized boolean removeAll(Collection c) - { - return queue.removeAll(c); - } - - public synchronized boolean retainAll(Collection c) - { - return queue.retainAll(c); - } - - public synchronized void clear() - { - queue.clear(); - } - } - - static void run(String label, AveragedRun test) throws Exception - { - RunStats stats = test.call(); - System.out.println((label == null ? "" : label + ", ") + test - + ", " + stats.getAverage() + ", " + stats.getMax() + ", " + stats.getMin()); - } - - public static void main(String[] argv) throws Exception - { - Factory[] factories = new Factory[]{PLAIN, SYNCHRONIZED, CONCURRENT}; - int iterations = 5; - String label = argv.length > 0 ? argv[0]: null; - System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time"); - //vary number of queues: - - for(Factory f : factories) - { - run(label, new AveragedRun(new QueuePerfTest(f, 100, 10000), iterations)); - run(label, new AveragedRun(new QueuePerfTest(f, 1000, 10000), iterations)); - run(label, new AveragedRun(new QueuePerfTest(f, 10000, 10000), iterations)); - run(label, new AveragedRun(new QueuePerfTest(f, 1000, 1000), iterations)); - run(label, new AveragedRun(new QueuePerfTest(f, 1000, 100000), iterations)); - } - } - -} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java deleted file mode 100644 index 6490b9f270..0000000000 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.exchange.AbstractExchange; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.handler.OnCurrentThreadExecutor; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.MockIoSession; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.SkeletonMessageStore; -import org.apache.qpid.server.util.AveragedRun; -import org.apache.qpid.server.util.TestApplicationRegistry; -import org.apache.qpid.server.util.TimedRun; - -import java.util.ArrayList; -import java.util.List; - -public class SendPerfTest extends TimedRun -{ - private int _messages = 1000; - private int _clients = 10; - private List _queues; - - public SendPerfTest(int clients, int messages) - { - super("SendPerfTest, msgs=" + messages + ", clients=" + clients); - _messages = messages; - _clients = clients; - } - - protected void setup() throws Exception - { - _queues = initQueues(_clients); - System.gc(); - } - - protected void teardown() throws Exception - { - System.gc(); - } - - protected void run() throws Exception - { - deliver(_messages, _queues); - } - - //have a dummy AMQProtocolSession that does nothing on the writeFrame() - //set up x number of queues - //create necessary bits and pieces to deliver a message - //deliver y messages to each queue - - public static void main(String[] argv) throws Exception - { - ApplicationRegistry.initialise(new TestApplicationRegistry()); - int clients = Integer.parseInt(argv[0]); - int messages = Integer.parseInt(argv[1]); - int iterations = Integer.parseInt(argv[2]); - AveragedRun test = new AveragedRun(new SendPerfTest(clients, messages), iterations); - test.run(); - } - - /** - * Delivers messages to a number of queues. - * @param count the number of messages to deliver - * @param queues the list of queues - * @throws NoConsumersException - */ - static void deliver(int count, List queues) throws AMQException - { - BasicPublishBody publish = new BasicPublishBody(); - publish.exchange = new NullExchange().getName(); - ContentHeaderBody header = new ContentHeaderBody(); - List body = new ArrayList(); - MessageStore messageStore = new SkeletonMessageStore(); - body.add(new ContentBody()); - for (int i = 0; i < count; i++) - { - for (AMQQueue q : queues) - { - q.deliver(new AMQMessage(messageStore, i, publish, header, body)); - } - } - } - - static List initQueues(int number) throws AMQException - { - Exchange exchange = new NullExchange(); - List queues = new ArrayList(number); - for (int i = 0; i < number; i++) - { - AMQQueue q = createQueue("Queue" + (i + 1)); - q.bind("routingKey", exchange); - try - { - q.registerProtocolSession(createSession(), 1, "1", false); - } - catch (Exception e) - { - throw new AMQException("Error creating protocol session: " + e, e); - } - queues.add(q); - } - return queues; - } - - static AMQQueue createQueue(String name) throws AMQException - { - return new AMQQueue(name, false, null, false, ApplicationRegistry.getInstance().getQueueRegistry(), - new OnCurrentThreadExecutor()); - } - - static AMQProtocolSession createSession() throws Exception - { - IApplicationRegistry reg = ApplicationRegistry.getInstance(); - AMQCodecFactory codecFactory = new AMQCodecFactory(true); - AMQMinaProtocolSession result = new AMQMinaProtocolSession(new MockIoSession(), reg.getQueueRegistry(), reg.getExchangeRegistry(), codecFactory); - result.addChannel(new AMQChannel(1, null, null)); - return result; - } - - static class NullExchange extends AbstractExchange - { - public String getName() - { - return "NullExchange"; - } - - protected ExchangeMBean createMBean() - { - return null; - } - - public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException - { - } - - public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException - { - } - - public void route(AMQMessage payload) throws AMQException - { - } - } -} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java index f8db90850f..d3ec3c11d4 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java @@ -30,12 +30,12 @@ public class SubscriptionManagerTest extends TestCase { assertTrue(mgr.isEmpty()); assertFalse(mgr.hasActiveSubscribers()); - TestSubscription s1 = new TestSubscription("S1"); + SubscriptionTestHelper s1 = new SubscriptionTestHelper("S1"); mgr.addSubscriber(s1); assertFalse(mgr.isEmpty()); assertTrue(mgr.hasActiveSubscribers()); - TestSubscription s2 = new TestSubscription("S2"); + SubscriptionTestHelper s2 = new SubscriptionTestHelper("S2"); mgr.addSubscriber(s2); s2.setSuspended(true); @@ -47,18 +47,18 @@ public class SubscriptionManagerTest extends TestCase s1.setSuspended(true); assertFalse(mgr.hasActiveSubscribers()); - mgr.removeSubscriber(new TestSubscription("S1")); + mgr.removeSubscriber(new SubscriptionTestHelper("S1")); assertFalse(mgr.isEmpty()); - mgr.removeSubscriber(new TestSubscription("S2")); + mgr.removeSubscriber(new SubscriptionTestHelper("S2")); assertTrue(mgr.isEmpty()); } public void testRoundRobin() { - TestSubscription a = new TestSubscription("A"); - TestSubscription b = new TestSubscription("B"); - TestSubscription c = new TestSubscription("C"); - TestSubscription d = new TestSubscription("D"); + SubscriptionTestHelper a = new SubscriptionTestHelper("A"); + SubscriptionTestHelper b = new SubscriptionTestHelper("B"); + SubscriptionTestHelper c = new SubscriptionTestHelper("C"); + SubscriptionTestHelper d = new SubscriptionTestHelper("D"); mgr.addSubscriber(a); mgr.addSubscriber(b); mgr.addSubscriber(c); @@ -84,7 +84,7 @@ public class SubscriptionManagerTest extends TestCase mgr.removeSubscriber(a); d.setSuspended(true); c.setSuspended(false); - Subscription e = new TestSubscription("D"); + Subscription e = new SubscriptionTestHelper("D"); mgr.addSubscriber(e); for (int i = 0; i < 3; i++) diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java index 4969b5589a..bcf54693d3 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java @@ -47,13 +47,13 @@ public class SubscriptionSetTest extends TestCase } } - final TestSubscription sub1 = new TestSubscription("1"); - final TestSubscription sub2 = new TestSubscription("2"); - final TestSubscription sub3 = new TestSubscription("3"); + final SubscriptionTestHelper sub1 = new SubscriptionTestHelper("1"); + final SubscriptionTestHelper sub2 = new SubscriptionTestHelper("2"); + final SubscriptionTestHelper sub3 = new SubscriptionTestHelper("3"); - final TestSubscription suspendedSub1 = new TestSubscription("sus1", true); - final TestSubscription suspendedSub2 = new TestSubscription("sus2", true); - final TestSubscription suspendedSub3 = new TestSubscription("sus3", true); + final SubscriptionTestHelper suspendedSub1 = new SubscriptionTestHelper("sus1", true); + final SubscriptionTestHelper suspendedSub2 = new SubscriptionTestHelper("sus2", true); + final SubscriptionTestHelper suspendedSub3 = new SubscriptionTestHelper("sus3", true); public void testNextMessage() { @@ -114,7 +114,7 @@ public class SubscriptionSetTest extends TestCase public void testNextMessageOverScanning() { TestSubscriptionSet ss = new TestSubscriptionSet(); - TestSubscription sub = new TestSubscription("test"); + SubscriptionTestHelper sub = new SubscriptionTestHelper("test"); ss.addSubscriber(suspendedSub1); ss.addSubscriber(sub); ss.addSubscriber(suspendedSub3); diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java new file mode 100644 index 0000000000..2773c810d2 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.ArrayList; +import java.util.List; + +public class SubscriptionTestHelper implements Subscription +{ + private final List messages; + private final Object key; + private boolean isSuspended; + + public SubscriptionTestHelper(Object key) + { + this(key, new ArrayList()); + } + + public SubscriptionTestHelper(final Object key, final boolean isSuspended) + { + this(key); + setSuspended(isSuspended); + } + + SubscriptionTestHelper(Object key, List messages) + { + this.key = key; + this.messages = messages; + } + + List getMessages() + { + return messages; + } + + public void send(AMQMessage msg, AMQQueue queue) + { + messages.add(msg); + } + + public void setSuspended(boolean suspended) + { + isSuspended = suspended; + } + + public boolean isSuspended() + { + return isSuspended; + } + + public void queueDeleted(AMQQueue queue) + { + } + + public int hashCode() + { + return key.hashCode(); + } + + public boolean equals(Object o) + { + return o instanceof SubscriptionTestHelper && ((SubscriptionTestHelper) o).key.equals(key); + } + + public String toString() + { + return key.toString(); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java b/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java deleted file mode 100644 index de6df4bc03..0000000000 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import java.util.ArrayList; -import java.util.List; - -public class TestSubscription implements Subscription -{ - private final List messages; - private final Object key; - private boolean isSuspended; - - public TestSubscription(Object key) - { - this(key, new ArrayList()); - } - - public TestSubscription(final Object key, final boolean isSuspended) - { - this(key); - setSuspended(isSuspended); - } - - TestSubscription(Object key, List messages) - { - this.key = key; - this.messages = messages; - } - - List getMessages() - { - return messages; - } - - public void send(AMQMessage msg, AMQQueue queue) - { - messages.add(msg); - } - - public void setSuspended(boolean suspended) - { - isSuspended = suspended; - } - - public boolean isSuspended() - { - return isSuspended; - } - - public void queueDeleted(AMQQueue queue) - { - } - - public int hashCode() - { - return key.hashCode(); - } - - public boolean equals(Object o) - { - return o instanceof TestSubscription && ((TestSubscription) o).key.equals(key); - } - - public String toString() - { - return key.toString(); - } -} diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/ConcurrentTest.java b/java/systests/src/test/java/org/apache/qpid/server/util/ConcurrentTest.java deleted file mode 100644 index 1ae8d3205d..0000000000 --- a/java/systests/src/test/java/org/apache/qpid/server/util/ConcurrentTest.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.util; - -public class ConcurrentTest extends TimedRun -{ - private final TimedRun _test; - private final Thread[] _threads; - - public ConcurrentTest(TimedRun test, int threads) - { - super(test.toString()); - _test = test; - _threads = new Thread[threads]; - } - - protected void setup() throws Exception - { - _test.setup(); - for(int i = 0; i < _threads.length; i++) - { - _threads[i] = new Thread(new Runner()); - } - } - - protected void teardown() throws Exception - { - _test.teardown(); - } - - protected void run() throws Exception - { - for(Thread t : _threads) - { - t.start(); - } - for(Thread t : _threads) - { - t.join(); - } - } - - private class Runner implements Runnable - { - private Exception error; - - public void run() - { - try - { - _test.run(); - } - catch(Exception e) - { - error = e; - e.printStackTrace(); - } - } - } - -} diff --git a/java/systests/src/test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java b/java/systests/src/test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java deleted file mode 100644 index a3e555aac9..0000000000 --- a/java/systests/src/test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.ack; - -import org.apache.log4j.Logger; -import org.apache.log4j.xml.DOMConfigurator; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.util.TestApplicationRegistry; -import org.apache.qpid.test.VMBrokerSetup; - -import javax.jms.*; - -import junit.framework.TestCase; - -public class DisconnectAndRedeliverTest extends TestCase -{ - private static final Logger _logger = Logger.getLogger(DisconnectAndRedeliverTest.class); - - static - { - String workdir = System.getProperty("QPID_WORK"); - if (workdir == null || workdir.equals("")) - { - String tempdir = System.getProperty("java.io.tmpdir"); - System.out.println("QPID_WORK not set using tmp directory: " + tempdir); - System.setProperty("QPID_WORK", tempdir); - } - DOMConfigurator.configure("../broker/etc/log4j.xml"); - } - - protected void setUp() throws Exception - { - super.setUp(); - ApplicationRegistry.initialise(new TestApplicationRegistry(), 1); - } - - protected void tearDown() throws Exception - { - super.tearDown(); - } - - /** - * This tests that when there are unacknowledged messages on a channel they are requeued for delivery when - * the channel is closed. - * - * @throws Exception - */ - public void testDisconnectRedeliversMessages() throws Exception - { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - - TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore(); - - Session consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - AMQQueue queue = new AMQQueue("someQ", "someQ", false, false); - MessageConsumer consumer = consumerSession.createConsumer(queue); - //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); - - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); - - - Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(queue); - - _logger.info("Sending four messages"); - producer.send(producerSession.createTextMessage("msg1")); - producer.send(producerSession.createTextMessage("msg2")); - producer.send(producerSession.createTextMessage("msg3")); - producer.send(producerSession.createTextMessage("msg4")); - - con2.close(); - - _logger.info("Starting connection"); - con.start(); - TextMessage tm = (TextMessage) consumer.receive(); - tm.acknowledge(); - _logger.info("Received and acknowledged first message"); - consumer.receive(); - consumer.receive(); - consumer.receive(); - _logger.info("Received all four messages. About to disconnect and reconnect"); - - con.close(); - con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - consumer = consumerSession.createConsumer(queue); - - _logger.info("Starting second consumer connection"); - con.start(); - - tm = (TextMessage) consumer.receive(3000); - assertEquals("msg2", tm.getText()); - - - tm = (TextMessage) consumer.receive(3000); - assertEquals("msg3", tm.getText()); - - - tm = (TextMessage) consumer.receive(3000); - assertEquals("msg4", tm.getText()); - - _logger.info("Received redelivery of three messages. Acknowledging last message"); - tm.acknowledge(); - - con.close(); - - con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - consumer = consumerSession.createConsumer(queue); - _logger.info("Starting third consumer connection"); - con.start(); - tm = (TextMessage) consumer.receiveNoWait(); - assertNull(tm); - _logger.info("No messages redelivered as is expected"); - con.close(); - - con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - consumer = consumerSession.createConsumer(queue); - _logger.info("Starting fourth consumer connection"); - con.start(); - tm = (TextMessage) consumer.receive(3000); - assertNull(tm); - _logger.info("No messages redelivered as is expected"); - con.close(); - - _logger.info("Actually:" + store.getMessageMap().size()); - // assertTrue(store.getMessageMap().size() == 0); - } - - /** - * Tests that unacknowledged messages are thrown away when the channel is closed and they cannot be - * requeued (due perhaps to the queue being deleted). - * - * @throws Exception - */ - public void testDisconnectWithTransientQueueThrowsAwayMessages() throws Exception - { - - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore(); - Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = new AMQQueue("someQ", "someQ", false, true); - MessageConsumer consumer = consumerSession.createConsumer(queue); - //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); - - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); - Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(queue); - - _logger.info("Sending four messages"); - producer.send(producerSession.createTextMessage("msg1")); - producer.send(producerSession.createTextMessage("msg2")); - producer.send(producerSession.createTextMessage("msg3")); - producer.send(producerSession.createTextMessage("msg4")); - - con2.close(); - - _logger.info("Starting connection"); - con.start(); - TextMessage tm = (TextMessage) consumer.receive(); - tm.acknowledge(); - _logger.info("Received and acknowledged first message"); - consumer.receive(); - consumer.receive(); - consumer.receive(); - _logger.info("Received all four messages. About to disconnect and reconnect"); - - con.close(); - con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - consumer = consumerSession.createConsumer(queue); - - _logger.info("Starting second consumer connection"); - con.start(); - - tm = (TextMessage) consumer.receiveNoWait(); - assertNull(tm); - _logger.info("No messages redelivered as is expected"); - - _logger.info("Actually:" + store.getMessageMap().size()); - assertTrue(store.getMessageMap().size() == 0); - con.close(); - } - - public static junit.framework.Test suite() - { - return new VMBrokerSetup(new junit.framework.TestSuite(DisconnectAndRedeliverTest.class)); - } -} -- cgit v1.2.1