summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java184
-rw-r--r--java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java266
-rw-r--r--java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java49
-rw-r--r--java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java258
-rw-r--r--java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java183
-rw-r--r--java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java79
-rw-r--r--java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java216
7 files changed, 0 insertions, 1235 deletions
diff --git a/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java b/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java
deleted file mode 100644
index ff0d58ad69..0000000000
--- a/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.exchange;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.NoConsumersException;
-import org.apache.qpid.server.util.TimedRun;
-import org.apache.qpid.server.util.AveragedRun;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentBody;
-
-import java.util.List;
-
-/**
- * Want to vary the number of regsitrations, messages and matches and measure
- * the corresponding variance in execution time.
- * <p/>
- * Each registration will contain the 'All' header, even registrations will
- * contain the 'Even' header and odd headers will contain the 'Odd' header.
- * In additions each regsitration will have a unique value for the 'Specific'
- * header as well.
- * <p/>
- * Messages can then be routed to all registrations, to even- or odd- registrations
- * or to a specific registration.
- *
- */
-public class HeadersExchangePerformanceTest extends AbstractHeadersExchangeTest
-{
- private static enum Mode {ALL, ODD_OR_EVEN, SPECIFIC}
-
- private final TestQueue[] queues;
- private final Mode mode;
-
- public HeadersExchangePerformanceTest(Mode mode, int registrations) throws AMQException
- {
- this.mode = mode;
- queues = new TestQueue[registrations];
- for (int i = 0; i < queues.length; i++)
- {
- switch(mode)
- {
- case ALL:
- queues[i] = bind(new FastQueue("Queue" + i), "All");
- break;
- case ODD_OR_EVEN:
- queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i));
- break;
- case SPECIFIC:
- queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i), "Specific"+ i);
- break;
- }
- }
- }
-
- void sendToAll(int count) throws AMQException
- {
- send(count, "All=True");
- }
-
- void sendToOdd(int count) throws AMQException
- {
- send(count, "All=True", "Odd=True");
- }
-
- void sendToEven(int count) throws AMQException
- {
- send(count, "All=True", "Even=True");
- }
-
- void sendToAllSpecifically(int count) throws AMQException
- {
- for (int i = 0; i < queues.length; i++)
- {
- sendToSpecific(count, i);
- }
- }
-
- void sendToSpecific(int count, int index) throws AMQException
- {
- send(count, "All=True", oddOrEven(index) + "=True", "Specific=" + index);
- }
-
- private void send(int count, String... headers) throws AMQException
- {
- for (int i = 0; i < count; i++)
- {
- route(new Message("Message" + i, headers));
- }
- }
-
- private static String oddOrEven(int i)
- {
- return (i % 2 == 0 ? "Even" : "Odd");
- }
-
- static class FastQueue extends TestQueue
- {
-
- public FastQueue(String name) throws AMQException
- {
- super(name);
- }
-
- public void deliver(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies) throws NoConsumersException
- {
- //just discard as we are not testing routing functionality here
- }
- }
-
- static class Test extends TimedRun
- {
- private final Mode mode;
- private final int registrations;
- private final int count;
- private HeadersExchangePerformanceTest test;
-
- Test(Mode mode, int registrations, int count)
- {
- super(mode + ", registrations=" + registrations + ", count=" + count);
- this.mode = mode;
- this.registrations = registrations;
- this.count = count;
- }
-
- protected void setup() throws Exception
- {
- test = new HeadersExchangePerformanceTest(mode, registrations);
- run(100); //do a warm up run before times start
- }
-
- protected void teardown() throws Exception
- {
- test = null;
- System.gc();
- }
-
- protected void run() throws Exception
- {
- run(count);
- }
-
- private void run(int count) throws Exception
- {
- switch(mode)
- {
- case ALL:
- test.sendToAll(count);
- break;
- default:
- System.out.println("Test for " + mode + " not yet implemented.");
- }
- }
- }
-
- public static void main(String[] argv) throws Exception
- {
- int registrations = Integer.parseInt(argv[0]);
- int messages = Integer.parseInt(argv[1]);
- int iterations = Integer.parseInt(argv[2]);
- TimedRun test = new Test(Mode.ALL, registrations, messages);
- AveragedRun tests = new AveragedRun(test, iterations);
- System.out.println(tests.call());
- }
-}
-
diff --git a/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java b/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java
deleted file mode 100644
index e76c164f64..0000000000
--- a/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import org.apache.qpid.codec.AMQDecoder;
-import org.apache.qpid.codec.AMQEncoder;
-import org.apache.qpid.framing.*;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.WriteFuture;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
-import org.apache.mina.filter.codec.ProtocolEncoderOutput;
-import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
-
-import junit.framework.TestCase;
-
-/**
- * This test suite tests the handling of protocol initiation frames and related issues.
- */
-public class TestProtocolInitiation extends TestCase implements ProtocolVersionList
-{
- private AMQPFastProtocolHandler _protocolHandler;
-
- private MockIoSession _mockIoSession;
-
- /**
- * We need to use the object encoder mechanism so to allow us to retrieve the
- * output (a bytebuffer) we define our own encoder output class. The encoder
- * writes the encoded data to this class, from where we can retrieve it during
- * the test run.
- */
- private class TestProtocolEncoderOutput implements ProtocolEncoderOutput
- {
- public ByteBuffer result;
-
- public void write(ByteBuffer buf)
- {
- result = buf;
- }
-
- public void mergeAll()
- {
- throw new UnsupportedOperationException();
- }
-
- public WriteFuture flush()
- {
- throw new UnsupportedOperationException();
- }
- }
-
- private class TestProtocolDecoderOutput implements ProtocolDecoderOutput
- {
- public Object result;
-
- public void write(Object buf)
- {
- result = buf;
- }
-
- public void flush()
- {
- throw new UnsupportedOperationException();
- }
- }
-
- protected void setUp() throws Exception
- {
- super.setUp();
- _mockIoSession = new MockIoSession();
- _protocolHandler = new AMQPFastProtocolHandler(null, null);
- }
-
-
- /**
- * Tests that the AMQDecoder handles invalid protocol classes
- * @throws Exception
- */
- public void testDecoderValidateProtocolClass() throws Exception
- {
- try
- {
- ProtocolInitiation pi = createValidProtocolInitiation();
- pi.protocolClass = 2;
- decodePI(pi);
- fail("expected exception did not occur");
- }
- catch (AMQProtocolClassException m)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected AMQProtocolClassException, got " + e);
- }
- }
-
- /**
- * Tests that the AMQDecoder handles invalid protocol instance numbers
- * @throws Exception
- */
- public void testDecoderValidatesProtocolInstance() throws Exception
- {
- try
- {
- ProtocolInitiation pi = createValidProtocolInitiation();
- pi.protocolInstance = 2;
- decodePI(pi);
- fail("expected exception did not occur");
- }
- catch (AMQProtocolInstanceException m)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected AMQProtocolInstanceException, got " + e);
- }
- }
-
- /**
- * Tests that the AMQDecoder handles invalid protocol major
- * @throws Exception
- */
- public void testDecoderValidatesProtocolMajor() throws Exception
- {
- try
- {
- ProtocolInitiation pi = createValidProtocolInitiation();
- pi.protocolMajor = 2;
- decodePI(pi);
- fail("expected exception did not occur");
- }
- catch (AMQProtocolVersionException m)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected AMQProtocolVersionException, got " + e);
- }
- }
-
- /**
- * Tests that the AMQDecoder handles invalid protocol minor
- * @throws Exception
- */
- public void testDecoderValidatesProtocolMinor() throws Exception
- {
- try
- {
- ProtocolInitiation pi = createValidProtocolInitiation();
- pi.protocolMinor = 99;
- decodePI(pi);
- fail("expected exception did not occur");
- }
- catch (AMQProtocolVersionException m)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected AMQProtocolVersionException, got " + e);
- }
- }
-
- /**
- * Tests that the AMQDecoder accepts a valid PI
- * @throws Exception
- */
- public void testDecoderValidatesHeader() throws Exception
- {
- try
- {
- ProtocolInitiation pi = createValidProtocolInitiation();
- pi.header = new char[] {'P', 'Q', 'M', 'A' };
- decodePI(pi);
- fail("expected exception did not occur");
- }
- catch (AMQProtocolHeaderException m)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected AMQProtocolHeaderException, got " + e);
- }
- }
-
- /**
- * Test that a valid header is passed by the decoder.
- * @throws Exception
- */
- public void testDecoderAcceptsValidHeader() throws Exception
- {
- ProtocolInitiation pi = createValidProtocolInitiation();
- decodePI(pi);
- }
-
- /**
- * This test checks that an invalid protocol header results in the
- * connection being closed.
- */
- public void testInvalidProtocolHeaderClosesConnection() throws Exception
- {
- AMQProtocolHeaderException pe = new AMQProtocolHeaderException("Test");
- _protocolHandler.exceptionCaught(_mockIoSession, pe);
- assertNotNull(_mockIoSession.getLastWrittenObject());
- Object piResponse = _mockIoSession.getLastWrittenObject();
- assertEquals(piResponse.getClass(), ProtocolInitiation.class);
- ProtocolInitiation pi = (ProtocolInitiation) piResponse;
- assertEquals("Protocol Initiation sent out was not the broker's expected header", pi,
- createValidProtocolInitiation());
- assertTrue("Session has not been closed", _mockIoSession.isClosing());
- }
-
- private ProtocolInitiation createValidProtocolInitiation()
- {
- /* Find last protocol version in protocol version list. Make sure last protocol version
- listed in the build file (build-module.xml) is the latest version which will be used
- here. */
- int i = pv.length - 1;
- return new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]);
- }
-
- /**
- * Helper that encodes a protocol initiation and attempts to decode it
- * @param pi
- * @throws Exception
- */
- private void decodePI(ProtocolInitiation pi) throws Exception
- {
- // we need to do this test at the level of the decoder since we initially only expect PI frames
- // so the protocol handler is not set up to know whether it should be expecting a PI frame or
- // a different type of frame
- AMQDecoder decoder = new AMQDecoder(true);
- AMQEncoder encoder = new AMQEncoder();
- TestProtocolEncoderOutput peo = new TestProtocolEncoderOutput();
- encoder.encode(_mockIoSession, pi, peo);
- TestProtocolDecoderOutput pdo = new TestProtocolDecoderOutput();
- decoder.decode(_mockIoSession, peo.result, pdo);
- ((ProtocolInitiation) pdo.result).checkVersion(this);
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(TestProtocolInitiation.class);
- }
-}
diff --git a/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java b/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java
deleted file mode 100644
index 11c0026455..0000000000
--- a/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.util.AveragedRun;
-import org.apache.qpid.server.util.ConcurrentTest;
-
-public class QueueConcurrentPerfTest extends QueuePerfTest
-{
- QueueConcurrentPerfTest(Factory factory, int queueCount, int messages)
- {
- super(factory, queueCount, messages);
- }
-
- public static void main(String[] argv) throws Exception
- {
- Factory[] factories = new Factory[]{SYNCHRONIZED, CONCURRENT};
- int iterations = 5;
- String label = argv.length > 0 ? argv[0]: null;
- System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time");
- //vary number of queues:
- for(Factory f : factories)
- {
- run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 100, 10000), iterations), 5));
- run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 10000), iterations), 5));
- run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 10000, 10000), iterations), 5));
- run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 1000), iterations), 5));
- run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 100000), iterations), 5));
- }
- }
-}
diff --git a/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java b/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java
deleted file mode 100644
index 5b3857396d..0000000000
--- a/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.util.AveragedRun;
-import org.apache.qpid.server.util.TimedRun;
-import org.apache.qpid.server.util.RunStats;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-public class QueuePerfTest extends TimedRun
-{
- private final Factory _factory;
- private final int _queueCount;
- private final int _messages;
- private final String _msg = "";
- private List<Queue<String>> _queues;
-
- QueuePerfTest(Factory factory, int queueCount, int messages)
- {
- super(factory + ", " + queueCount + ", " + messages);
- _factory = factory;
- _queueCount = queueCount;
- _messages = messages;
- }
-
- protected void setup() throws Exception
- {
- //init
- int count = Integer.getInteger("prepopulate", 0);
-// System.err.println("Prepopulating with " + count + " items");
- _queues = new ArrayList<Queue<String>>(_queueCount);
- for (int i = 0; i < _queueCount; i++)
- {
- Queue<String> q = _factory.create();
- for(int j = 0; j < count; ++j)
- {
- q.add("Item"+ j);
- }
- _queues.add(q);
- }
- System.gc();
- }
-
- protected void teardown() throws Exception
- {
- System.gc();
- }
-
- protected void run() throws Exception
- {
- //dispatch
- for (int i = 0; i < _messages; i++)
- {
- for (Queue<String> q : _queues)
- {
- q.offer(_msg);
- q.poll();
- }
- }
- }
-
- static interface Factory
- {
- Queue<String> create();
- }
-
- static Factory CONCURRENT = new Factory()
- {
- public Queue<String> create()
- {
- return new ConcurrentLinkedQueue<String>();
- }
-
- public String toString()
- {
- return "ConcurrentLinkedQueue";
- }
-
- };
-
- static Factory SYNCHRONIZED = new Factory()
- {
- public Queue<String> create()
- {
- return new SynchronizedQueue<String>(new LinkedList<String>());
- }
-
-
- public String toString()
- {
- return "Synchronized LinkedList";
- }
- };
-
- static Factory PLAIN = new Factory()
- {
- public Queue<String> create()
- {
- return new LinkedList<String>();
- }
-
- public String toString()
- {
- return "Plain LinkedList";
- }
- };
-
- static class SynchronizedQueue<E> implements Queue<E>
- {
- private final Queue<E> queue;
-
- SynchronizedQueue(Queue<E> queue)
- {
- this.queue = queue;
- }
-
- public synchronized E element()
- {
- return queue.element();
- }
-
- public synchronized boolean offer(E o)
- {
- return queue.offer(o);
- }
-
- public synchronized E peek()
- {
- return queue.peek();
- }
-
- public synchronized E poll()
- {
- return queue.poll();
- }
-
- public synchronized E remove()
- {
- return queue.remove();
- }
-
- public synchronized int size()
- {
- return queue.size();
- }
-
- public synchronized boolean isEmpty()
- {
- return queue.isEmpty();
- }
-
- public synchronized boolean contains(Object o)
- {
- return queue.contains(o);
- }
-
- public synchronized Iterator<E> iterator()
- {
- return queue.iterator();
- }
-
- public synchronized Object[] toArray()
- {
- return queue.toArray();
- }
-
- public synchronized <T>T[] toArray(T[] a)
- {
- return queue.toArray(a);
- }
-
- public synchronized boolean add(E o)
- {
- return queue.add(o);
- }
-
- public synchronized boolean remove(Object o)
- {
- return queue.remove(o);
- }
-
- public synchronized boolean containsAll(Collection<?> c)
- {
- return queue.containsAll(c);
- }
-
- public synchronized boolean addAll(Collection<? extends E> c)
- {
- return queue.addAll(c);
- }
-
- public synchronized boolean removeAll(Collection<?> c)
- {
- return queue.removeAll(c);
- }
-
- public synchronized boolean retainAll(Collection<?> c)
- {
- return queue.retainAll(c);
- }
-
- public synchronized void clear()
- {
- queue.clear();
- }
- }
-
- static void run(String label, AveragedRun test) throws Exception
- {
- RunStats stats = test.call();
- System.out.println((label == null ? "" : label + ", ") + test
- + ", " + stats.getAverage() + ", " + stats.getMax() + ", " + stats.getMin());
- }
-
- public static void main(String[] argv) throws Exception
- {
- Factory[] factories = new Factory[]{PLAIN, SYNCHRONIZED, CONCURRENT};
- int iterations = 5;
- String label = argv.length > 0 ? argv[0]: null;
- System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time");
- //vary number of queues:
-
- for(Factory f : factories)
- {
- run(label, new AveragedRun(new QueuePerfTest(f, 100, 10000), iterations));
- run(label, new AveragedRun(new QueuePerfTest(f, 1000, 10000), iterations));
- run(label, new AveragedRun(new QueuePerfTest(f, 10000, 10000), iterations));
- run(label, new AveragedRun(new QueuePerfTest(f, 1000, 1000), iterations));
- run(label, new AveragedRun(new QueuePerfTest(f, 1000, 100000), iterations));
- }
- }
-
-}
diff --git a/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java b/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java
deleted file mode 100644
index 9784b2f671..0000000000
--- a/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.exchange.AbstractExchange;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.MockIoSession;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
-import org.apache.qpid.server.util.AveragedRun;
-import org.apache.qpid.server.util.TestApplicationRegistry;
-import org.apache.qpid.server.util.TimedRun;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.LinkedList;
-
-public class SendPerfTest extends TimedRun
-{
- private int _messages = 1000;
- private int _clients = 10;
- private List<AMQQueue> _queues;
-
- public SendPerfTest(int clients, int messages)
- {
- super("SendPerfTest, msgs=" + messages + ", clients=" + clients);
- _messages = messages;
- _clients = clients;
- }
-
- protected void setup() throws Exception
- {
- _queues = initQueues(_clients);
- System.gc();
- }
-
- protected void teardown() throws Exception
- {
- System.gc();
- }
-
- protected void run() throws Exception
- {
- deliver(_messages, _queues);
- }
-
- //have a dummy AMQProtocolSession that does nothing on the writeFrame()
- //set up x number of queues
- //create necessary bits and pieces to deliver a message
- //deliver y messages to each queue
-
- public static void main(String[] argv) throws Exception
- {
- ApplicationRegistry.initialise(new TestApplicationRegistry(new ServerConfiguration(new PropertiesConfiguration())));
- int clients = Integer.parseInt(argv[0]);
- int messages = Integer.parseInt(argv[1]);
- int iterations = Integer.parseInt(argv[2]);
- AveragedRun test = new AveragedRun(new SendPerfTest(clients, messages), iterations);
- test.run();
- }
-
- /**
- * Delivers messages to a number of queues.
- * @param count the number of messages to deliver
- * @param queues the list of queues
- * @throws NoConsumersException
- */
- static void deliver(int count, List<AMQQueue> queues) throws AMQException
- {
- BasicPublishBody publish = new BasicPublishBody();
- publish.exchange = new NullExchange().getName();
- ContentHeaderBody header = new ContentHeaderBody();
- List<ContentBody> body = new ArrayList<ContentBody>();
- MessageStore messageStore = new SkeletonMessageStore();
- // channel can be null since it is only used in ack processing which does not apply to this test
- TransactionalContext txContext = new NonTransactionalContext(messageStore, null,
- new LinkedList<RequiredDeliveryException>());
- body.add(new ContentBody());
- MessageHandleFactory factory = new MessageHandleFactory();
- for (int i = 0; i < count; i++)
- {
- // this routes and delivers the message
- AMQMessage msg = new AMQMessage(i, publish, txContext, header, queues, body, messageStore,
- factory);
- }
- }
-
- static List<AMQQueue> initQueues(int number) throws AMQException
- {
- Exchange exchange = new NullExchange();
- List<AMQQueue> queues = new ArrayList<AMQQueue>(number);
- for (int i = 0; i < number; i++)
- {
- AMQQueue q = createQueue("Queue" + (i + 1));
- q.bind("routingKey", exchange);
- try
- {
- q.registerProtocolSession(createSession(), 1, "1", false);
- }
- catch (Exception e)
- {
- throw new AMQException("Error creating protocol session: " + e, e);
- }
- queues.add(q);
- }
- return queues;
- }
-
- static AMQQueue createQueue(String name) throws AMQException
- {
- return new AMQQueue(name, false, null, false, ApplicationRegistry.getInstance().getQueueRegistry(),
- new OnCurrentThreadExecutor());
- }
-
- static AMQProtocolSession createSession() throws Exception
- {
- IApplicationRegistry reg = ApplicationRegistry.getInstance();
- AMQCodecFactory codecFactory = new AMQCodecFactory(true);
- AMQMinaProtocolSession result = new AMQMinaProtocolSession(new MockIoSession(), reg.getQueueRegistry(), reg.getExchangeRegistry(), codecFactory);
- result.addChannel(new AMQChannel(1, null, null));
- return result;
- }
-
- static class NullExchange extends AbstractExchange
- {
- public String getName()
- {
- return "NullExchange";
- }
-
- protected ExchangeMBean createMBean()
- {
- return null;
- }
-
- public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
- {
- }
-
- public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
- {
- }
-
- public void route(AMQMessage payload) throws AMQException
- {
- }
- }
-}
diff --git a/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java b/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java
deleted file mode 100644
index 1ae8d3205d..0000000000
--- a/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.util;
-
-public class ConcurrentTest extends TimedRun
-{
- private final TimedRun _test;
- private final Thread[] _threads;
-
- public ConcurrentTest(TimedRun test, int threads)
- {
- super(test.toString());
- _test = test;
- _threads = new Thread[threads];
- }
-
- protected void setup() throws Exception
- {
- _test.setup();
- for(int i = 0; i < _threads.length; i++)
- {
- _threads[i] = new Thread(new Runner());
- }
- }
-
- protected void teardown() throws Exception
- {
- _test.teardown();
- }
-
- protected void run() throws Exception
- {
- for(Thread t : _threads)
- {
- t.start();
- }
- for(Thread t : _threads)
- {
- t.join();
- }
- }
-
- private class Runner implements Runnable
- {
- private Exception error;
-
- public void run()
- {
- try
- {
- _test.run();
- }
- catch(Exception e)
- {
- error = e;
- e.printStackTrace();
- }
- }
- }
-
-}
diff --git a/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java b/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java
deleted file mode 100644
index 04b6bceb4f..0000000000
--- a/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.ack;
-
-import junit.framework.TestCase;
-import org.apache.log4j.Logger;
-import org.apache.log4j.xml.DOMConfigurator;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.util.TestApplicationRegistry;
-
-import javax.jms.*;
-
-public class DisconnectAndRedeliverTest extends InternalBrokerBaseCase
-{
- private static final Logger _logger = Logger.getLogger(DisconnectAndRedeliverTest.class);
-
- static
- {
- String workdir = System.getProperty("QPID_WORK");
- if (workdir == null || workdir.equals(""))
- {
- String tempdir = System.getProperty("java.io.tmpdir");
- System.out.println("QPID_WORK not set using tmp directory: " + tempdir);
- System.setProperty("QPID_WORK", tempdir);
- }
- DOMConfigurator.configure("../broker/etc/log4j.xml");
- }
-
- @Override
- public void createBroker() throws Exception
- {
- super.createBroker();
- TransportConnection.createVMBroker(ApplicationRegistry.DEFAULT_INSTANCE);
- }
-
- @Override
- public void stopBroker()
- {
- TransportConnection.killVMBroker(ApplicationRegistry.DEFAULT_INSTANCE);
- super.stopBroker();
- }
-
- /**
- * This tests that when there are unacknowledged messages on a channel they are requeued for delivery when
- * the channel is closed.
- *
- * @throws Exception
- */
- public void testDisconnectRedeliversMessages() throws Exception
- {
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
-
- TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
-
- Session consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- AMQQueue queue = new AMQQueue("someQ", "someQ", false, false);
- MessageConsumer consumer = consumerSession.createConsumer(queue);
- //force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
-
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
-
-
- Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
-
- _logger.info("Sending four messages");
- producer.send(producerSession.createTextMessage("msg1"));
- producer.send(producerSession.createTextMessage("msg2"));
- producer.send(producerSession.createTextMessage("msg3"));
- producer.send(producerSession.createTextMessage("msg4"));
-
- con2.close();
-
- _logger.info("Starting connection");
- con.start();
- TextMessage tm = (TextMessage) consumer.receive();
- tm.acknowledge();
- _logger.info("Received and acknowledged first message");
- consumer.receive();
- consumer.receive();
- consumer.receive();
- _logger.info("Received all four messages. About to disconnect and reconnect");
-
- con.close();
- con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
- consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- consumer = consumerSession.createConsumer(queue);
-
- _logger.info("Starting second consumer connection");
- con.start();
-
- tm = (TextMessage) consumer.receive(3000);
- assertEquals("msg2", tm.getText());
-
-
- tm = (TextMessage) consumer.receive(3000);
- assertEquals("msg3", tm.getText());
-
-
- tm = (TextMessage) consumer.receive(3000);
- assertEquals("msg4", tm.getText());
-
- _logger.info("Received redelivery of three messages. Acknowledging last message");
- tm.acknowledge();
-
- con.close();
-
- con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
- consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- consumer = consumerSession.createConsumer(queue);
- _logger.info("Starting third consumer connection");
- con.start();
- tm = (TextMessage) consumer.receiveNoWait();
- assertNull(tm);
- _logger.info("No messages redelivered as is expected");
- con.close();
-
- con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
- consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- consumer = consumerSession.createConsumer(queue);
- _logger.info("Starting fourth consumer connection");
- con.start();
- tm = (TextMessage) consumer.receive(3000);
- assertNull(tm);
- _logger.info("No messages redelivered as is expected");
- con.close();
-
- _logger.info("Actually:" + store.getMessageMetaDataMap().size());
- // assertTrue(store.getMessageMap().size() == 0);
- }
-
- /**
- * Tests that unacknowledged messages are thrown away when the channel is closed and they cannot be
- * requeued (due perhaps to the queue being deleted).
- *
- * @throws Exception
- */
- public void testDisconnectWithTransientQueueThrowsAwayMessages() throws Exception
- {
-
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
- TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
- Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = new AMQQueue("someQ", "someQ", false, true);
- MessageConsumer consumer = consumerSession.createConsumer(queue);
- //force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
-
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
- Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
-
- _logger.info("Sending four messages");
- producer.send(producerSession.createTextMessage("msg1"));
- producer.send(producerSession.createTextMessage("msg2"));
- producer.send(producerSession.createTextMessage("msg3"));
- producer.send(producerSession.createTextMessage("msg4"));
-
- con2.close();
-
- _logger.info("Starting connection");
- con.start();
- TextMessage tm = (TextMessage) consumer.receive();
- tm.acknowledge();
- _logger.info("Received and acknowledged first message");
- consumer.receive();
- consumer.receive();
- consumer.receive();
- _logger.info("Received all four messages. About to disconnect and reconnect");
-
- con.close();
- con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
- consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- consumer = consumerSession.createConsumer(queue);
-
- _logger.info("Starting second consumer connection");
- con.start();
-
- tm = (TextMessage) consumer.receiveNoWait();
- assertNull(tm);
- _logger.info("No messages redelivered as is expected");
-
- _logger.info("Actually:" + store.getMessageMetaDataMap().size());
- assertTrue(store.getMessageMetaDataMap().size() == 0);
- con.close();
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(DisconnectAndRedeliverTest.class);
- }
-}