summaryrefslogtreecommitdiff
path: root/java/systests/src/old_test
diff options
context:
space:
mode:
authorStephen Vinoski <vinoski@apache.org>2006-12-12 04:57:43 +0000
committerStephen Vinoski <vinoski@apache.org>2006-12-12 04:57:43 +0000
commit9b45e0ab5d2a719d8ab4e7ec3f23d63ba4966f80 (patch)
tree65fdd1a2eb41f382ad72d600d98ee2596ab7506a /java/systests/src/old_test
parent6a03e9214cf9e896bdc2b077975fdd0f1129ce28 (diff)
downloadqpid-python-9b45e0ab5d2a719d8ab4e7ec3f23d63ba4966f80.tar.gz
systests test reorg
* move unused tests to src/old_test * modify pom.xml to remove surefire inclusions and exclusions git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@486021 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src/old_test')
-rw-r--r--java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java184
-rw-r--r--java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java266
-rw-r--r--java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java49
-rw-r--r--java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java258
-rw-r--r--java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java174
-rw-r--r--java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java79
-rw-r--r--java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java216
7 files changed, 1226 insertions, 0 deletions
diff --git a/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java b/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java
new file mode 100644
index 0000000000..ff0d58ad69
--- /dev/null
+++ b/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java
@@ -0,0 +1,184 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.util.TimedRun;
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentBody;
+
+import java.util.List;
+
+/**
+ * Want to vary the number of regsitrations, messages and matches and measure
+ * the corresponding variance in execution time.
+ * <p/>
+ * Each registration will contain the 'All' header, even registrations will
+ * contain the 'Even' header and odd headers will contain the 'Odd' header.
+ * In additions each regsitration will have a unique value for the 'Specific'
+ * header as well.
+ * <p/>
+ * Messages can then be routed to all registrations, to even- or odd- registrations
+ * or to a specific registration.
+ *
+ */
+public class HeadersExchangePerformanceTest extends AbstractHeadersExchangeTest
+{
+ private static enum Mode {ALL, ODD_OR_EVEN, SPECIFIC}
+
+ private final TestQueue[] queues;
+ private final Mode mode;
+
+ public HeadersExchangePerformanceTest(Mode mode, int registrations) throws AMQException
+ {
+ this.mode = mode;
+ queues = new TestQueue[registrations];
+ for (int i = 0; i < queues.length; i++)
+ {
+ switch(mode)
+ {
+ case ALL:
+ queues[i] = bind(new FastQueue("Queue" + i), "All");
+ break;
+ case ODD_OR_EVEN:
+ queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i));
+ break;
+ case SPECIFIC:
+ queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i), "Specific"+ i);
+ break;
+ }
+ }
+ }
+
+ void sendToAll(int count) throws AMQException
+ {
+ send(count, "All=True");
+ }
+
+ void sendToOdd(int count) throws AMQException
+ {
+ send(count, "All=True", "Odd=True");
+ }
+
+ void sendToEven(int count) throws AMQException
+ {
+ send(count, "All=True", "Even=True");
+ }
+
+ void sendToAllSpecifically(int count) throws AMQException
+ {
+ for (int i = 0; i < queues.length; i++)
+ {
+ sendToSpecific(count, i);
+ }
+ }
+
+ void sendToSpecific(int count, int index) throws AMQException
+ {
+ send(count, "All=True", oddOrEven(index) + "=True", "Specific=" + index);
+ }
+
+ private void send(int count, String... headers) throws AMQException
+ {
+ for (int i = 0; i < count; i++)
+ {
+ route(new Message("Message" + i, headers));
+ }
+ }
+
+ private static String oddOrEven(int i)
+ {
+ return (i % 2 == 0 ? "Even" : "Odd");
+ }
+
+ static class FastQueue extends TestQueue
+ {
+
+ public FastQueue(String name) throws AMQException
+ {
+ super(name);
+ }
+
+ public void deliver(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies) throws NoConsumersException
+ {
+ //just discard as we are not testing routing functionality here
+ }
+ }
+
+ static class Test extends TimedRun
+ {
+ private final Mode mode;
+ private final int registrations;
+ private final int count;
+ private HeadersExchangePerformanceTest test;
+
+ Test(Mode mode, int registrations, int count)
+ {
+ super(mode + ", registrations=" + registrations + ", count=" + count);
+ this.mode = mode;
+ this.registrations = registrations;
+ this.count = count;
+ }
+
+ protected void setup() throws Exception
+ {
+ test = new HeadersExchangePerformanceTest(mode, registrations);
+ run(100); //do a warm up run before times start
+ }
+
+ protected void teardown() throws Exception
+ {
+ test = null;
+ System.gc();
+ }
+
+ protected void run() throws Exception
+ {
+ run(count);
+ }
+
+ private void run(int count) throws Exception
+ {
+ switch(mode)
+ {
+ case ALL:
+ test.sendToAll(count);
+ break;
+ default:
+ System.out.println("Test for " + mode + " not yet implemented.");
+ }
+ }
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ int registrations = Integer.parseInt(argv[0]);
+ int messages = Integer.parseInt(argv[1]);
+ int iterations = Integer.parseInt(argv[2]);
+ TimedRun test = new Test(Mode.ALL, registrations, messages);
+ AveragedRun tests = new AveragedRun(test, iterations);
+ System.out.println(tests.call());
+ }
+}
+
diff --git a/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java b/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java
new file mode 100644
index 0000000000..e76c164f64
--- /dev/null
+++ b/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java
@@ -0,0 +1,266 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.codec.AMQDecoder;
+import org.apache.qpid.codec.AMQEncoder;
+import org.apache.qpid.framing.*;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.WriteFuture;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
+
+import junit.framework.TestCase;
+
+/**
+ * This test suite tests the handling of protocol initiation frames and related issues.
+ */
+public class TestProtocolInitiation extends TestCase implements ProtocolVersionList
+{
+ private AMQPFastProtocolHandler _protocolHandler;
+
+ private MockIoSession _mockIoSession;
+
+ /**
+ * We need to use the object encoder mechanism so to allow us to retrieve the
+ * output (a bytebuffer) we define our own encoder output class. The encoder
+ * writes the encoded data to this class, from where we can retrieve it during
+ * the test run.
+ */
+ private class TestProtocolEncoderOutput implements ProtocolEncoderOutput
+ {
+ public ByteBuffer result;
+
+ public void write(ByteBuffer buf)
+ {
+ result = buf;
+ }
+
+ public void mergeAll()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public WriteFuture flush()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private class TestProtocolDecoderOutput implements ProtocolDecoderOutput
+ {
+ public Object result;
+
+ public void write(Object buf)
+ {
+ result = buf;
+ }
+
+ public void flush()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _mockIoSession = new MockIoSession();
+ _protocolHandler = new AMQPFastProtocolHandler(null, null);
+ }
+
+
+ /**
+ * Tests that the AMQDecoder handles invalid protocol classes
+ * @throws Exception
+ */
+ public void testDecoderValidateProtocolClass() throws Exception
+ {
+ try
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.protocolClass = 2;
+ decodePI(pi);
+ fail("expected exception did not occur");
+ }
+ catch (AMQProtocolClassException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected AMQProtocolClassException, got " + e);
+ }
+ }
+
+ /**
+ * Tests that the AMQDecoder handles invalid protocol instance numbers
+ * @throws Exception
+ */
+ public void testDecoderValidatesProtocolInstance() throws Exception
+ {
+ try
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.protocolInstance = 2;
+ decodePI(pi);
+ fail("expected exception did not occur");
+ }
+ catch (AMQProtocolInstanceException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected AMQProtocolInstanceException, got " + e);
+ }
+ }
+
+ /**
+ * Tests that the AMQDecoder handles invalid protocol major
+ * @throws Exception
+ */
+ public void testDecoderValidatesProtocolMajor() throws Exception
+ {
+ try
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.protocolMajor = 2;
+ decodePI(pi);
+ fail("expected exception did not occur");
+ }
+ catch (AMQProtocolVersionException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected AMQProtocolVersionException, got " + e);
+ }
+ }
+
+ /**
+ * Tests that the AMQDecoder handles invalid protocol minor
+ * @throws Exception
+ */
+ public void testDecoderValidatesProtocolMinor() throws Exception
+ {
+ try
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.protocolMinor = 99;
+ decodePI(pi);
+ fail("expected exception did not occur");
+ }
+ catch (AMQProtocolVersionException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected AMQProtocolVersionException, got " + e);
+ }
+ }
+
+ /**
+ * Tests that the AMQDecoder accepts a valid PI
+ * @throws Exception
+ */
+ public void testDecoderValidatesHeader() throws Exception
+ {
+ try
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.header = new char[] {'P', 'Q', 'M', 'A' };
+ decodePI(pi);
+ fail("expected exception did not occur");
+ }
+ catch (AMQProtocolHeaderException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected AMQProtocolHeaderException, got " + e);
+ }
+ }
+
+ /**
+ * Test that a valid header is passed by the decoder.
+ * @throws Exception
+ */
+ public void testDecoderAcceptsValidHeader() throws Exception
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ decodePI(pi);
+ }
+
+ /**
+ * This test checks that an invalid protocol header results in the
+ * connection being closed.
+ */
+ public void testInvalidProtocolHeaderClosesConnection() throws Exception
+ {
+ AMQProtocolHeaderException pe = new AMQProtocolHeaderException("Test");
+ _protocolHandler.exceptionCaught(_mockIoSession, pe);
+ assertNotNull(_mockIoSession.getLastWrittenObject());
+ Object piResponse = _mockIoSession.getLastWrittenObject();
+ assertEquals(piResponse.getClass(), ProtocolInitiation.class);
+ ProtocolInitiation pi = (ProtocolInitiation) piResponse;
+ assertEquals("Protocol Initiation sent out was not the broker's expected header", pi,
+ createValidProtocolInitiation());
+ assertTrue("Session has not been closed", _mockIoSession.isClosing());
+ }
+
+ private ProtocolInitiation createValidProtocolInitiation()
+ {
+ /* Find last protocol version in protocol version list. Make sure last protocol version
+ listed in the build file (build-module.xml) is the latest version which will be used
+ here. */
+ int i = pv.length - 1;
+ return new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]);
+ }
+
+ /**
+ * Helper that encodes a protocol initiation and attempts to decode it
+ * @param pi
+ * @throws Exception
+ */
+ private void decodePI(ProtocolInitiation pi) throws Exception
+ {
+ // we need to do this test at the level of the decoder since we initially only expect PI frames
+ // so the protocol handler is not set up to know whether it should be expecting a PI frame or
+ // a different type of frame
+ AMQDecoder decoder = new AMQDecoder(true);
+ AMQEncoder encoder = new AMQEncoder();
+ TestProtocolEncoderOutput peo = new TestProtocolEncoderOutput();
+ encoder.encode(_mockIoSession, pi, peo);
+ TestProtocolDecoderOutput pdo = new TestProtocolDecoderOutput();
+ decoder.decode(_mockIoSession, peo.result, pdo);
+ ((ProtocolInitiation) pdo.result).checkVersion(this);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TestProtocolInitiation.class);
+ }
+}
diff --git a/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java b/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java
new file mode 100644
index 0000000000..11c0026455
--- /dev/null
+++ b/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.server.util.ConcurrentTest;
+
+public class QueueConcurrentPerfTest extends QueuePerfTest
+{
+ QueueConcurrentPerfTest(Factory factory, int queueCount, int messages)
+ {
+ super(factory, queueCount, messages);
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ Factory[] factories = new Factory[]{SYNCHRONIZED, CONCURRENT};
+ int iterations = 5;
+ String label = argv.length > 0 ? argv[0]: null;
+ System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time");
+ //vary number of queues:
+ for(Factory f : factories)
+ {
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 100, 10000), iterations), 5));
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 10000), iterations), 5));
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 10000, 10000), iterations), 5));
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 1000), iterations), 5));
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 100000), iterations), 5));
+ }
+ }
+}
diff --git a/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java b/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java
new file mode 100644
index 0000000000..5b3857396d
--- /dev/null
+++ b/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java
@@ -0,0 +1,258 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.server.util.TimedRun;
+import org.apache.qpid.server.util.RunStats;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class QueuePerfTest extends TimedRun
+{
+ private final Factory _factory;
+ private final int _queueCount;
+ private final int _messages;
+ private final String _msg = "";
+ private List<Queue<String>> _queues;
+
+ QueuePerfTest(Factory factory, int queueCount, int messages)
+ {
+ super(factory + ", " + queueCount + ", " + messages);
+ _factory = factory;
+ _queueCount = queueCount;
+ _messages = messages;
+ }
+
+ protected void setup() throws Exception
+ {
+ //init
+ int count = Integer.getInteger("prepopulate", 0);
+// System.err.println("Prepopulating with " + count + " items");
+ _queues = new ArrayList<Queue<String>>(_queueCount);
+ for (int i = 0; i < _queueCount; i++)
+ {
+ Queue<String> q = _factory.create();
+ for(int j = 0; j < count; ++j)
+ {
+ q.add("Item"+ j);
+ }
+ _queues.add(q);
+ }
+ System.gc();
+ }
+
+ protected void teardown() throws Exception
+ {
+ System.gc();
+ }
+
+ protected void run() throws Exception
+ {
+ //dispatch
+ for (int i = 0; i < _messages; i++)
+ {
+ for (Queue<String> q : _queues)
+ {
+ q.offer(_msg);
+ q.poll();
+ }
+ }
+ }
+
+ static interface Factory
+ {
+ Queue<String> create();
+ }
+
+ static Factory CONCURRENT = new Factory()
+ {
+ public Queue<String> create()
+ {
+ return new ConcurrentLinkedQueue<String>();
+ }
+
+ public String toString()
+ {
+ return "ConcurrentLinkedQueue";
+ }
+
+ };
+
+ static Factory SYNCHRONIZED = new Factory()
+ {
+ public Queue<String> create()
+ {
+ return new SynchronizedQueue<String>(new LinkedList<String>());
+ }
+
+
+ public String toString()
+ {
+ return "Synchronized LinkedList";
+ }
+ };
+
+ static Factory PLAIN = new Factory()
+ {
+ public Queue<String> create()
+ {
+ return new LinkedList<String>();
+ }
+
+ public String toString()
+ {
+ return "Plain LinkedList";
+ }
+ };
+
+ static class SynchronizedQueue<E> implements Queue<E>
+ {
+ private final Queue<E> queue;
+
+ SynchronizedQueue(Queue<E> queue)
+ {
+ this.queue = queue;
+ }
+
+ public synchronized E element()
+ {
+ return queue.element();
+ }
+
+ public synchronized boolean offer(E o)
+ {
+ return queue.offer(o);
+ }
+
+ public synchronized E peek()
+ {
+ return queue.peek();
+ }
+
+ public synchronized E poll()
+ {
+ return queue.poll();
+ }
+
+ public synchronized E remove()
+ {
+ return queue.remove();
+ }
+
+ public synchronized int size()
+ {
+ return queue.size();
+ }
+
+ public synchronized boolean isEmpty()
+ {
+ return queue.isEmpty();
+ }
+
+ public synchronized boolean contains(Object o)
+ {
+ return queue.contains(o);
+ }
+
+ public synchronized Iterator<E> iterator()
+ {
+ return queue.iterator();
+ }
+
+ public synchronized Object[] toArray()
+ {
+ return queue.toArray();
+ }
+
+ public synchronized <T>T[] toArray(T[] a)
+ {
+ return queue.toArray(a);
+ }
+
+ public synchronized boolean add(E o)
+ {
+ return queue.add(o);
+ }
+
+ public synchronized boolean remove(Object o)
+ {
+ return queue.remove(o);
+ }
+
+ public synchronized boolean containsAll(Collection<?> c)
+ {
+ return queue.containsAll(c);
+ }
+
+ public synchronized boolean addAll(Collection<? extends E> c)
+ {
+ return queue.addAll(c);
+ }
+
+ public synchronized boolean removeAll(Collection<?> c)
+ {
+ return queue.removeAll(c);
+ }
+
+ public synchronized boolean retainAll(Collection<?> c)
+ {
+ return queue.retainAll(c);
+ }
+
+ public synchronized void clear()
+ {
+ queue.clear();
+ }
+ }
+
+ static void run(String label, AveragedRun test) throws Exception
+ {
+ RunStats stats = test.call();
+ System.out.println((label == null ? "" : label + ", ") + test
+ + ", " + stats.getAverage() + ", " + stats.getMax() + ", " + stats.getMin());
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ Factory[] factories = new Factory[]{PLAIN, SYNCHRONIZED, CONCURRENT};
+ int iterations = 5;
+ String label = argv.length > 0 ? argv[0]: null;
+ System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time");
+ //vary number of queues:
+
+ for(Factory f : factories)
+ {
+ run(label, new AveragedRun(new QueuePerfTest(f, 100, 10000), iterations));
+ run(label, new AveragedRun(new QueuePerfTest(f, 1000, 10000), iterations));
+ run(label, new AveragedRun(new QueuePerfTest(f, 10000, 10000), iterations));
+ run(label, new AveragedRun(new QueuePerfTest(f, 1000, 1000), iterations));
+ run(label, new AveragedRun(new QueuePerfTest(f, 1000, 100000), iterations));
+ }
+ }
+
+}
diff --git a/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java b/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java
new file mode 100644
index 0000000000..6490b9f270
--- /dev/null
+++ b/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java
@@ -0,0 +1,174 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exchange.AbstractExchange;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.MockIoSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.util.TimedRun;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SendPerfTest extends TimedRun
+{
+ private int _messages = 1000;
+ private int _clients = 10;
+ private List<AMQQueue> _queues;
+
+ public SendPerfTest(int clients, int messages)
+ {
+ super("SendPerfTest, msgs=" + messages + ", clients=" + clients);
+ _messages = messages;
+ _clients = clients;
+ }
+
+ protected void setup() throws Exception
+ {
+ _queues = initQueues(_clients);
+ System.gc();
+ }
+
+ protected void teardown() throws Exception
+ {
+ System.gc();
+ }
+
+ protected void run() throws Exception
+ {
+ deliver(_messages, _queues);
+ }
+
+ //have a dummy AMQProtocolSession that does nothing on the writeFrame()
+ //set up x number of queues
+ //create necessary bits and pieces to deliver a message
+ //deliver y messages to each queue
+
+ public static void main(String[] argv) throws Exception
+ {
+ ApplicationRegistry.initialise(new TestApplicationRegistry());
+ int clients = Integer.parseInt(argv[0]);
+ int messages = Integer.parseInt(argv[1]);
+ int iterations = Integer.parseInt(argv[2]);
+ AveragedRun test = new AveragedRun(new SendPerfTest(clients, messages), iterations);
+ test.run();
+ }
+
+ /**
+ * Delivers messages to a number of queues.
+ * @param count the number of messages to deliver
+ * @param queues the list of queues
+ * @throws NoConsumersException
+ */
+ static void deliver(int count, List<AMQQueue> queues) throws AMQException
+ {
+ BasicPublishBody publish = new BasicPublishBody();
+ publish.exchange = new NullExchange().getName();
+ ContentHeaderBody header = new ContentHeaderBody();
+ List<ContentBody> body = new ArrayList<ContentBody>();
+ MessageStore messageStore = new SkeletonMessageStore();
+ body.add(new ContentBody());
+ for (int i = 0; i < count; i++)
+ {
+ for (AMQQueue q : queues)
+ {
+ q.deliver(new AMQMessage(messageStore, i, publish, header, body));
+ }
+ }
+ }
+
+ static List<AMQQueue> initQueues(int number) throws AMQException
+ {
+ Exchange exchange = new NullExchange();
+ List<AMQQueue> queues = new ArrayList<AMQQueue>(number);
+ for (int i = 0; i < number; i++)
+ {
+ AMQQueue q = createQueue("Queue" + (i + 1));
+ q.bind("routingKey", exchange);
+ try
+ {
+ q.registerProtocolSession(createSession(), 1, "1", false);
+ }
+ catch (Exception e)
+ {
+ throw new AMQException("Error creating protocol session: " + e, e);
+ }
+ queues.add(q);
+ }
+ return queues;
+ }
+
+ static AMQQueue createQueue(String name) throws AMQException
+ {
+ return new AMQQueue(name, false, null, false, ApplicationRegistry.getInstance().getQueueRegistry(),
+ new OnCurrentThreadExecutor());
+ }
+
+ static AMQProtocolSession createSession() throws Exception
+ {
+ IApplicationRegistry reg = ApplicationRegistry.getInstance();
+ AMQCodecFactory codecFactory = new AMQCodecFactory(true);
+ AMQMinaProtocolSession result = new AMQMinaProtocolSession(new MockIoSession(), reg.getQueueRegistry(), reg.getExchangeRegistry(), codecFactory);
+ result.addChannel(new AMQChannel(1, null, null));
+ return result;
+ }
+
+ static class NullExchange extends AbstractExchange
+ {
+ public String getName()
+ {
+ return "NullExchange";
+ }
+
+ protected ExchangeMBean createMBean()
+ {
+ return null;
+ }
+
+ public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ }
+
+ public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
+ {
+ }
+
+ public void route(AMQMessage payload) throws AMQException
+ {
+ }
+ }
+}
diff --git a/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java b/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java
new file mode 100644
index 0000000000..1ae8d3205d
--- /dev/null
+++ b/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+public class ConcurrentTest extends TimedRun
+{
+ private final TimedRun _test;
+ private final Thread[] _threads;
+
+ public ConcurrentTest(TimedRun test, int threads)
+ {
+ super(test.toString());
+ _test = test;
+ _threads = new Thread[threads];
+ }
+
+ protected void setup() throws Exception
+ {
+ _test.setup();
+ for(int i = 0; i < _threads.length; i++)
+ {
+ _threads[i] = new Thread(new Runner());
+ }
+ }
+
+ protected void teardown() throws Exception
+ {
+ _test.teardown();
+ }
+
+ protected void run() throws Exception
+ {
+ for(Thread t : _threads)
+ {
+ t.start();
+ }
+ for(Thread t : _threads)
+ {
+ t.join();
+ }
+ }
+
+ private class Runner implements Runnable
+ {
+ private Exception error;
+
+ public void run()
+ {
+ try
+ {
+ _test.run();
+ }
+ catch(Exception e)
+ {
+ error = e;
+ e.printStackTrace();
+ }
+ }
+ }
+
+}
diff --git a/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java b/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java
new file mode 100644
index 0000000000..a3e555aac9
--- /dev/null
+++ b/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java
@@ -0,0 +1,216 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.test.VMBrokerSetup;
+
+import javax.jms.*;
+
+import junit.framework.TestCase;
+
+public class DisconnectAndRedeliverTest extends TestCase
+{
+ private static final Logger _logger = Logger.getLogger(DisconnectAndRedeliverTest.class);
+
+ static
+ {
+ String workdir = System.getProperty("QPID_WORK");
+ if (workdir == null || workdir.equals(""))
+ {
+ String tempdir = System.getProperty("java.io.tmpdir");
+ System.out.println("QPID_WORK not set using tmp directory: " + tempdir);
+ System.setProperty("QPID_WORK", tempdir);
+ }
+ DOMConfigurator.configure("../broker/etc/log4j.xml");
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ ApplicationRegistry.initialise(new TestApplicationRegistry(), 1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ /**
+ * This tests that when there are unacknowledged messages on a channel they are requeued for delivery when
+ * the channel is closed.
+ *
+ * @throws Exception
+ */
+ public void testDisconnectRedeliversMessages() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+ TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
+
+ Session consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ AMQQueue queue = new AMQQueue("someQ", "someQ", false, false);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+
+
+ Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ _logger.info("Sending four messages");
+ producer.send(producerSession.createTextMessage("msg1"));
+ producer.send(producerSession.createTextMessage("msg2"));
+ producer.send(producerSession.createTextMessage("msg3"));
+ producer.send(producerSession.createTextMessage("msg4"));
+
+ con2.close();
+
+ _logger.info("Starting connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive();
+ tm.acknowledge();
+ _logger.info("Received and acknowledged first message");
+ consumer.receive();
+ consumer.receive();
+ consumer.receive();
+ _logger.info("Received all four messages. About to disconnect and reconnect");
+
+ con.close();
+ con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ consumer = consumerSession.createConsumer(queue);
+
+ _logger.info("Starting second consumer connection");
+ con.start();
+
+ tm = (TextMessage) consumer.receive(3000);
+ assertEquals("msg2", tm.getText());
+
+
+ tm = (TextMessage) consumer.receive(3000);
+ assertEquals("msg3", tm.getText());
+
+
+ tm = (TextMessage) consumer.receive(3000);
+ assertEquals("msg4", tm.getText());
+
+ _logger.info("Received redelivery of three messages. Acknowledging last message");
+ tm.acknowledge();
+
+ con.close();
+
+ con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ consumer = consumerSession.createConsumer(queue);
+ _logger.info("Starting third consumer connection");
+ con.start();
+ tm = (TextMessage) consumer.receiveNoWait();
+ assertNull(tm);
+ _logger.info("No messages redelivered as is expected");
+ con.close();
+
+ con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ consumer = consumerSession.createConsumer(queue);
+ _logger.info("Starting fourth consumer connection");
+ con.start();
+ tm = (TextMessage) consumer.receive(3000);
+ assertNull(tm);
+ _logger.info("No messages redelivered as is expected");
+ con.close();
+
+ _logger.info("Actually:" + store.getMessageMap().size());
+ // assertTrue(store.getMessageMap().size() == 0);
+ }
+
+ /**
+ * Tests that unacknowledged messages are thrown away when the channel is closed and they cannot be
+ * requeued (due perhaps to the queue being deleted).
+ *
+ * @throws Exception
+ */
+ public void testDisconnectWithTransientQueueThrowsAwayMessages() throws Exception
+ {
+
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
+ Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = new AMQQueue("someQ", "someQ", false, true);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ _logger.info("Sending four messages");
+ producer.send(producerSession.createTextMessage("msg1"));
+ producer.send(producerSession.createTextMessage("msg2"));
+ producer.send(producerSession.createTextMessage("msg3"));
+ producer.send(producerSession.createTextMessage("msg4"));
+
+ con2.close();
+
+ _logger.info("Starting connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive();
+ tm.acknowledge();
+ _logger.info("Received and acknowledged first message");
+ consumer.receive();
+ consumer.receive();
+ consumer.receive();
+ _logger.info("Received all four messages. About to disconnect and reconnect");
+
+ con.close();
+ con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ consumer = consumerSession.createConsumer(queue);
+
+ _logger.info("Starting second consumer connection");
+ con.start();
+
+ tm = (TextMessage) consumer.receiveNoWait();
+ assertNull(tm);
+ _logger.info("No messages redelivered as is expected");
+
+ _logger.info("Actually:" + store.getMessageMap().size());
+ assertTrue(store.getMessageMap().size() == 0);
+ con.close();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new VMBrokerSetup(new junit.framework.TestSuite(DisconnectAndRedeliverTest.class));
+ }
+}