summaryrefslogtreecommitdiff
path: root/java/broker/test/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/test/src')
-rw-r--r--java/broker/test/src/org/apache/qpid/server/UnitTests.java39
-rw-r--r--java/broker/test/src/org/apache/qpid/server/configuration/TestPropertyUtils.java50
-rw-r--r--java/broker/test/src/org/apache/qpid/server/configuration/UnitTests.java32
-rw-r--r--java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java212
-rw-r--r--java/broker/test/src/org/apache/qpid/server/exchange/HeadersBindingTest.java200
-rw-r--r--java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java181
-rw-r--r--java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java81
-rw-r--r--java/broker/test/src/org/apache/qpid/server/exchange/UnitTests.java32
-rw-r--r--java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java288
-rw-r--r--java/broker/test/src/org/apache/qpid/server/protocol/TestProtocolInitiation.java212
-rw-r--r--java/broker/test/src/org/apache/qpid/server/protocol/UnitTests.java32
-rw-r--r--java/broker/test/src/org/apache/qpid/server/queue/AckTest.java243
-rw-r--r--java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java261
-rw-r--r--java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java159
-rw-r--r--java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java49
-rw-r--r--java/broker/test/src/org/apache/qpid/server/queue/MockProtocolSession.java121
-rw-r--r--java/broker/test/src/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java46
-rw-r--r--java/broker/test/src/org/apache/qpid/server/queue/QueuePerfTest.java255
-rw-r--r--java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java171
-rw-r--r--java/broker/test/src/org/apache/qpid/server/queue/SubscriptionManagerTest.java105
-rw-r--r--java/broker/test/src/org/apache/qpid/server/queue/SubscriptionSetTest.java149
-rw-r--r--java/broker/test/src/org/apache/qpid/server/queue/TestSubscription.java84
-rw-r--r--java/broker/test/src/org/apache/qpid/server/queue/UnitTests.java38
-rw-r--r--java/broker/test/src/org/apache/qpid/server/store/SkeletonMessageStore.java99
-rw-r--r--java/broker/test/src/org/apache/qpid/server/store/TestReferenceCounting.java68
-rw-r--r--java/broker/test/src/org/apache/qpid/server/store/TestableMemoryMessageStore.java39
-rw-r--r--java/broker/test/src/org/apache/qpid/server/store/UnitTests.java34
-rw-r--r--java/broker/test/src/org/apache/qpid/server/util/AveragedRun.java63
-rw-r--r--java/broker/test/src/org/apache/qpid/server/util/ConcurrentTest.java76
-rw-r--r--java/broker/test/src/org/apache/qpid/server/util/LoggingProxyTest.java89
-rw-r--r--java/broker/test/src/org/apache/qpid/server/util/NullApplicationRegistry.java103
-rw-r--r--java/broker/test/src/org/apache/qpid/server/util/NullAuthenticationManager.java82
-rw-r--r--java/broker/test/src/org/apache/qpid/server/util/RunStats.java54
-rw-r--r--java/broker/test/src/org/apache/qpid/server/util/TimedRun.java49
-rw-r--r--java/broker/test/src/org/apache/qpid/server/util/UnitTests.java32
35 files changed, 3828 insertions, 0 deletions
diff --git a/java/broker/test/src/org/apache/qpid/server/UnitTests.java b/java/broker/test/src/org/apache/qpid/server/UnitTests.java
new file mode 100644
index 0000000000..377b1fb64e
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/UnitTests.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ org.apache.qpid.server.configuration.UnitTests.class,
+ org.apache.qpid.server.exchange.UnitTests.class,
+ org.apache.qpid.server.protocol.UnitTests.class,
+ org.apache.qpid.server.queue.UnitTests.class,
+ org.apache.qpid.server.store.UnitTests.class,
+ org.apache.qpid.server.util.UnitTests.class
+ })
+public class UnitTests
+{
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(UnitTests.class);
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/configuration/TestPropertyUtils.java b/java/broker/test/src/org/apache/qpid/server/configuration/TestPropertyUtils.java
new file mode 100644
index 0000000000..bd78d1c786
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/configuration/TestPropertyUtils.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.configuration;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.configuration.PropertyException;
+import org.apache.qpid.configuration.PropertyUtils;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+// TODO: This belongs in the "common" module.
+public class TestPropertyUtils
+{
+ @Test
+ public void testSimpleExpansion() throws PropertyException
+ {
+ System.setProperty("banana", "fruity");
+ String expandedProperty = PropertyUtils.replaceProperties("${banana}");
+ assertEquals(expandedProperty, "fruity");
+ }
+
+ @Test
+ public void testDualExpansion() throws PropertyException
+ {
+ System.setProperty("banana", "fruity");
+ System.setProperty("concrete", "horrible");
+ String expandedProperty = PropertyUtils.replaceProperties("${banana}xyz${concrete}");
+ assertEquals(expandedProperty, "fruityxyzhorrible");
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(TestPropertyUtils.class);
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/configuration/UnitTests.java b/java/broker/test/src/org/apache/qpid/server/configuration/UnitTests.java
new file mode 100644
index 0000000000..4c70d7c4da
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/configuration/UnitTests.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.configuration;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({TestPropertyUtils.class})
+public class UnitTests
+{
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(UnitTests.class);
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java b/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java
new file mode 100644
index 0000000000..ac04c51e46
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java
@@ -0,0 +1,212 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.AMQException;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.HashSet;
+
+public class AbstractHeadersExchangeTest
+{
+ private final HeadersExchange exchange = new HeadersExchange();
+ protected final Set<TestQueue> queues = new HashSet<TestQueue>();
+ private int count;
+
+ protected TestQueue bindDefault(String... bindings) throws AMQException
+ {
+ return bind("Queue" + (++count), bindings);
+ }
+
+ protected TestQueue bind(String queueName, String... bindings) throws AMQException
+ {
+ return bind(queueName, getHeaders(bindings));
+ }
+
+ protected TestQueue bind(String queue, FieldTable bindings) throws AMQException
+ {
+ return bind(new TestQueue(queue), bindings);
+ }
+
+ protected TestQueue bind(TestQueue queue, String... bindings) throws AMQException
+ {
+ return bind(queue, getHeaders(bindings));
+ }
+
+ protected TestQueue bind(TestQueue queue, FieldTable bindings) throws AMQException
+ {
+ queues.add(queue);
+ exchange.registerQueue(null, queue, bindings);
+ return queue;
+ }
+
+
+ protected void route(Message m) throws AMQException
+ {
+ m.route(exchange);
+ }
+
+ protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
+ {
+ routeAndTest(m, Arrays.asList(expected));
+ }
+
+ protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException
+ {
+ route(m);
+ for (TestQueue q : queues)
+ {
+ if (expected.contains(q))
+ {
+ assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q));
+ //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
+ }
+ else
+ {
+ assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q));
+ //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
+ }
+ }
+ }
+
+ static FieldTable getHeaders(String... entries)
+ {
+ FieldTable headers = new FieldTable();
+ for (String s : entries)
+ {
+ String[] parts = s.split("=", 2);
+ headers.put(parts[0], parts.length > 1 ? parts[1] : "");
+ }
+ return headers;
+ }
+
+ static BasicPublishBody getPublishRequest(String id)
+ {
+ BasicPublishBody request = new BasicPublishBody();
+ request.routingKey = id;
+ return request;
+ }
+
+ static ContentHeaderBody getContentHeader(FieldTable headers)
+ {
+ ContentHeaderBody header = new ContentHeaderBody();
+ header.properties = getProperties(headers);
+ return header;
+ }
+
+ static BasicContentHeaderProperties getProperties(FieldTable headers)
+ {
+ BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+ properties.setHeaders(headers);
+ return properties;
+ }
+
+ static class TestQueue extends AMQQueue
+ {
+ final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
+
+ public TestQueue(String name) throws AMQException
+ {
+ super(name, false, "test", true, ApplicationRegistry.getInstance().getQueueRegistry());
+ }
+
+ public void deliver(AMQMessage msg) throws AMQException
+ {
+ messages.add(new HeadersExchangeTest.Message(msg));
+ }
+ }
+
+ /**
+ * Just add some extra utility methods to AMQMessage to aid testing.
+ */
+ static class Message extends AMQMessage
+ {
+ private static MessageStore _messageStore = new SkeletonMessageStore();
+
+ Message(String id, String... headers) throws AMQException
+ {
+ this(id, getHeaders(headers));
+ }
+
+ Message(String id, FieldTable headers) throws AMQException
+ {
+ this(getPublishRequest(id), getContentHeader(headers), null);
+ }
+
+ private Message(BasicPublishBody publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException
+ {
+ super(_messageStore, publish, header, bodies);
+ }
+
+ private Message(AMQMessage msg) throws AMQException
+ {
+ super(msg);
+ }
+
+ void route(Exchange exchange) throws AMQException
+ {
+ exchange.route(this);
+ }
+
+ boolean isInQueue(TestQueue queue)
+ {
+ return queue.messages.contains(this);
+ }
+
+ public int hashCode()
+ {
+ return getKey().hashCode();
+ }
+
+ public boolean equals(Object o)
+ {
+ return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o);
+ }
+
+ private boolean equals(HeadersExchangeTest.Message m)
+ {
+ return getKey().equals(m.getKey());
+ }
+
+ public String toString()
+ {
+ return getKey().toString();
+ }
+
+ private Object getKey()
+ {
+ return getPublishBody().routingKey;
+ }
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/exchange/HeadersBindingTest.java b/java/broker/test/src/org/apache/qpid/server/exchange/HeadersBindingTest.java
new file mode 100644
index 0000000000..7e33b1d711
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/exchange/HeadersBindingTest.java
@@ -0,0 +1,200 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+
+import java.util.Map;
+import java.util.HashMap;
+
+import junit.framework.JUnit4TestAdapter;
+
+/**
+ */
+public class HeadersBindingTest
+{
+ private Map<String, String> bindHeaders = new HashMap<String, String>();
+ private Map<String, String> matchHeaders = new HashMap<String, String>();
+
+ @Test public void default_1()
+ {
+ bindHeaders.put("A", "Value of A");
+
+ matchHeaders.put("A", "Value of A");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void default_2()
+ {
+ bindHeaders.put("A", "Value of A");
+
+ matchHeaders.put("A", "Value of A");
+ matchHeaders.put("B", "Value of B");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void default_3()
+ {
+ bindHeaders.put("A", "Value of A");
+
+ matchHeaders.put("A", "Altered value of A");
+
+ assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void all_1()
+ {
+ bindHeaders.put("X-match", "all");
+ bindHeaders.put("A", "Value of A");
+
+ matchHeaders.put("A", "Value of A");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void all_2()
+ {
+ bindHeaders.put("X-match", "all");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
+
+ matchHeaders.put("A", "Value of A");
+
+ assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void all_3()
+ {
+ bindHeaders.put("X-match", "all");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
+
+ matchHeaders.put("A", "Value of A");
+ matchHeaders.put("B", "Value of B");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void all_4()
+ {
+ bindHeaders.put("X-match", "all");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
+
+ matchHeaders.put("A", "Value of A");
+ matchHeaders.put("B", "Value of B");
+ matchHeaders.put("C", "Value of C");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void all_5()
+ {
+ bindHeaders.put("X-match", "all");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
+
+ matchHeaders.put("A", "Value of A");
+ matchHeaders.put("B", "Altered value of B");
+ matchHeaders.put("C", "Value of C");
+
+ assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void any_1()
+ {
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+
+ matchHeaders.put("A", "Value of A");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void any_2()
+ {
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
+
+ matchHeaders.put("A", "Value of A");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void any_3()
+ {
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
+
+ matchHeaders.put("A", "Value of A");
+ matchHeaders.put("B", "Value of B");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void any_4()
+ {
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
+
+ matchHeaders.put("A", "Value of A");
+ matchHeaders.put("B", "Value of B");
+ matchHeaders.put("C", "Value of C");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void any_5()
+ {
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
+
+ matchHeaders.put("A", "Value of A");
+ matchHeaders.put("B", "Altered value of B");
+ matchHeaders.put("C", "Value of C");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void any_6()
+ {
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
+
+ matchHeaders.put("A", "Altered value of A");
+ matchHeaders.put("B", "Altered value of B");
+ matchHeaders.put("C", "Value of C");
+
+ assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(HeadersBindingTest.class);
+ }
+
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java b/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java
new file mode 100644
index 0000000000..74cb082db7
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java
@@ -0,0 +1,181 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.util.TimedRun;
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentBody;
+
+import java.util.List;
+
+/**
+ * Want to vary the number of regsitrations, messages and matches and measure
+ * the corresponding variance in execution time.
+ * <p/>
+ * Each registration will contain the 'All' header, even registrations will
+ * contain the 'Even' header and odd headers will contain the 'Odd' header.
+ * In additions each regsitration will have a unique value for the 'Specific'
+ * header as well.
+ * <p/>
+ * Messages can then be routed to all registrations, to even- or odd- registrations
+ * or to a specific registration.
+ *
+ */
+public class HeadersExchangePerformanceTest extends AbstractHeadersExchangeTest
+{
+ private static enum Mode {ALL, ODD_OR_EVEN, SPECIFIC}
+
+ private final TestQueue[] queues;
+ private final Mode mode;
+
+ public HeadersExchangePerformanceTest(Mode mode, int registrations) throws AMQException
+ {
+ this.mode = mode;
+ queues = new TestQueue[registrations];
+ for (int i = 0; i < queues.length; i++)
+ {
+ switch(mode)
+ {
+ case ALL:
+ queues[i] = bind(new FastQueue("Queue" + i), "All");
+ break;
+ case ODD_OR_EVEN:
+ queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i));
+ break;
+ case SPECIFIC:
+ queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i), "Specific"+ i);
+ break;
+ }
+ }
+ }
+
+ void sendToAll(int count) throws AMQException
+ {
+ send(count, "All=True");
+ }
+
+ void sendToOdd(int count) throws AMQException
+ {
+ send(count, "All=True", "Odd=True");
+ }
+
+ void sendToEven(int count) throws AMQException
+ {
+ send(count, "All=True", "Even=True");
+ }
+
+ void sendToAllSpecifically(int count) throws AMQException
+ {
+ for (int i = 0; i < queues.length; i++)
+ {
+ sendToSpecific(count, i);
+ }
+ }
+
+ void sendToSpecific(int count, int index) throws AMQException
+ {
+ send(count, "All=True", oddOrEven(index) + "=True", "Specific=" + index);
+ }
+
+ private void send(int count, String... headers) throws AMQException
+ {
+ for (int i = 0; i < count; i++)
+ {
+ route(new Message("Message" + i, headers));
+ }
+ }
+
+ private static String oddOrEven(int i)
+ {
+ return (i % 2 == 0 ? "Even" : "Odd");
+ }
+
+ static class FastQueue extends TestQueue
+ {
+
+ public FastQueue(String name) throws AMQException
+ {
+ super(name);
+ }
+
+ public void deliver(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies) throws NoConsumersException
+ {
+ //just discard as we are not testing routing functionality here
+ }
+ }
+
+ static class Test extends TimedRun
+ {
+ private final Mode mode;
+ private final int registrations;
+ private final int count;
+ private HeadersExchangePerformanceTest test;
+
+ Test(Mode mode, int registrations, int count)
+ {
+ super(mode + ", registrations=" + registrations + ", count=" + count);
+ this.mode = mode;
+ this.registrations = registrations;
+ this.count = count;
+ }
+
+ protected void setup() throws Exception
+ {
+ test = new HeadersExchangePerformanceTest(mode, registrations);
+ run(100); //do a warm up run before times start
+ }
+
+ protected void teardown() throws Exception
+ {
+ test = null;
+ System.gc();
+ }
+
+ protected void run() throws Exception
+ {
+ run(count);
+ }
+
+ private void run(int count) throws Exception
+ {
+ switch(mode)
+ {
+ case ALL:
+ test.sendToAll(count);
+ break;
+ default:
+ System.out.println("Test for " + mode + " not yet implemented.");
+ }
+ }
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ int registrations = Integer.parseInt(argv[0]);
+ int messages = Integer.parseInt(argv[1]);
+ int iterations = Integer.parseInt(argv[2]);
+ TimedRun test = new Test(Mode.ALL, registrations, messages);
+ AveragedRun tests = new AveragedRun(test, iterations);
+ System.out.println(tests.call());
+ }
+}
+
diff --git a/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java
new file mode 100644
index 0000000000..86414ffae2
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.junit.Test;
+import org.junit.Before;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.NullApplicationRegistry;
+import junit.framework.JUnit4TestAdapter;
+
+public class HeadersExchangeTest extends AbstractHeadersExchangeTest
+{
+ @Before
+ public void init() throws Exception
+ {
+ ApplicationRegistry.initialise(new NullApplicationRegistry());
+ }
+
+ @Test
+ public void simple() throws AMQException
+ {
+ TestQueue q1 = bindDefault("F0000");
+ TestQueue q2 = bindDefault("F0000=Aardvark");
+ TestQueue q3 = bindDefault("F0001");
+ TestQueue q4 = bindDefault("F0001=Bear");
+ TestQueue q5 = bindDefault("F0000", "F0001");
+ TestQueue q6 = bindDefault("F0000=Aardvark", "F0001=Bear");
+ TestQueue q7 = bindDefault("F0000", "F0001=Bear");
+ TestQueue q8 = bindDefault("F0000=Aardvark", "F0001");
+ TestQueue q9 = bindDefault("F0000=Apple", "F0001=Banana");
+ TestQueue q10 = bindDefault("F0000=Apple", "F0001");
+
+ routeAndTest(new Message("Message1", "F0000"), q1);
+ routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2);
+ routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8);
+ routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7);
+ routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"),
+ q1, q2, q3, q4, q5, q6, q7, q8);
+ routeAndTest(new Message("Message6", "F0002"));
+ }
+
+ @Test
+ public void any() throws AMQException
+ {
+ TestQueue q1 = bindDefault("F0000", "F0001", "X-match=any");
+ TestQueue q2 = bindDefault("F0000=Aardvark", "F0001=Bear", "X-match=any");
+ TestQueue q3 = bindDefault("F0000", "F0001=Bear", "X-match=any");
+ TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any");
+ TestQueue q5 = bindDefault("F0000=Apple", "F0001=Banana", "X-match=any");
+ TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any");
+
+ routeAndTest(new Message("Message1", "F0000"), q1, q3);
+ routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2, q3, q4);
+ routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6);
+ routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6);
+ routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6);
+ routeAndTest(new Message("Message6", "F0002"));
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(HeadersExchangeTest.class);
+ }
+
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/exchange/UnitTests.java b/java/broker/test/src/org/apache/qpid/server/exchange/UnitTests.java
new file mode 100644
index 0000000000..a3c6439b67
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/exchange/UnitTests.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({HeadersBindingTest.class, HeadersExchangeTest.class})
+public class UnitTests
+{
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(UnitTests.class);
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java b/java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java
new file mode 100644
index 0000000000..101ba7dd36
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java
@@ -0,0 +1,288 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.mina.common.*;
+import org.apache.mina.common.support.DefaultCloseFuture;
+import org.apache.mina.common.support.DefaultWriteFuture;
+
+import java.net.SocketAddress;
+import java.util.Set;
+
+public class MockIoSession implements IoSession
+{
+ private AMQProtocolSession _protocolSession;
+
+ /**
+ * Stores the last response written
+ */
+ private Object _lastWrittenObject;
+
+ private boolean _closing;
+
+ public MockIoSession()
+ {
+ }
+
+ public Object getLastWrittenObject()
+ {
+ return _lastWrittenObject;
+ }
+
+ public IoService getService()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public IoHandler getHandler()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public IoSessionConfig getConfig()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public WriteFuture write(Object message)
+ {
+ WriteFuture wf = new DefaultWriteFuture(null);
+ _lastWrittenObject = message;
+ return wf;
+ }
+
+ public CloseFuture close()
+ {
+ _closing = true;
+ CloseFuture cf = new DefaultCloseFuture(null);
+ cf.setClosed();
+ return cf;
+ }
+
+ public Object getAttachment()
+ {
+ return _protocolSession;
+ }
+
+ public Object setAttachment(Object attachment)
+ {
+ Object current = _protocolSession;
+ _protocolSession = (AMQProtocolSession) attachment;
+ return current;
+ }
+
+ public Object getAttribute(String key)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Object setAttribute(String key, Object value)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Object setAttribute(String key)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Object removeAttribute(String key)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean containsAttribute(String key)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Set getAttributeKeys()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public TransportType getTransportType()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isConnected()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isClosing()
+ {
+ return _closing;
+ }
+
+ public CloseFuture getCloseFuture()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public SocketAddress getServiceAddress()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getIdleTime(IdleStatus status)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getIdleTimeInMillis(IdleStatus status)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setIdleTime(IdleStatus status, int idleTime)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getWriteTimeout()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getWriteTimeoutInMillis()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setWriteTimeout(int writeTimeout)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public TrafficMask getTrafficMask()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setTrafficMask(TrafficMask trafficMask)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void suspendRead()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void suspendWrite()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void resumeRead()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void resumeWrite()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getReadBytes()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getWrittenBytes()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getReadMessages()
+ {
+ return 0L;
+ }
+
+ public long getWrittenMessages()
+ {
+ return 0L;
+ }
+
+ public long getWrittenWriteRequests()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getScheduledWriteRequests()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getScheduledWriteBytes()
+ {
+ return 0; //TODO
+ }
+
+ public long getCreationTime()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getLastIoTime()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getLastReadTime()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getLastWriteTime()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isIdle(IdleStatus status)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getIdleCount(IdleStatus status)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getLastIdleTime(IdleStatus status)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/protocol/TestProtocolInitiation.java b/java/broker/test/src/org/apache/qpid/server/protocol/TestProtocolInitiation.java
new file mode 100644
index 0000000000..34e1709a2d
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/protocol/TestProtocolInitiation.java
@@ -0,0 +1,212 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import junit.framework.Assert;
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.codec.AMQDecoder;
+import org.apache.qpid.codec.AMQEncoder;
+import org.apache.qpid.framing.*;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.WriteFuture;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This test suite tests the handling of protocol initiation frames and related issues.
+ */
+public class TestProtocolInitiation implements ProtocolVersionList
+{
+ private AMQPFastProtocolHandler _protocolHandler;
+
+ private MockIoSession _mockIoSession;
+
+ /**
+ * We need to use the object encoder mechanism so to allow us to retrieve the
+ * output (a bytebuffer) we define our own encoder output class. The encoder
+ * writes the encoded data to this class, from where we can retrieve it during
+ * the test run.
+ */
+ private class TestProtocolEncoderOutput implements ProtocolEncoderOutput
+ {
+ public ByteBuffer result;
+
+ public void write(ByteBuffer buf)
+ {
+ result = buf;
+ }
+
+ public void mergeAll()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public WriteFuture flush()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private class TestProtocolDecoderOutput implements ProtocolDecoderOutput
+ {
+ public Object result;
+
+ public void write(Object buf)
+ {
+ result = buf;
+ }
+
+ public void flush()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ @Before
+ public void createCommonObjects()
+ {
+ _mockIoSession = new MockIoSession();
+ _protocolHandler = new AMQPFastProtocolHandler(null, null);
+ }
+
+
+ /**
+ * Tests that the AMQDecoder handles invalid protocol classes
+ * @throws Exception
+ */
+ @Test(expected = AMQProtocolClassException.class)
+ public void testDecoderValidateProtocolClass() throws Exception
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.protocolClass = 2;
+ decodePI(pi);
+ }
+
+ /**
+ * Tests that the AMQDecoder handles invalid protocol instance numbers
+ * @throws Exception
+ */
+ @Test(expected = AMQProtocolInstanceException.class)
+ public void testDecoderValidatesProtocolInstance() throws Exception
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.protocolInstance = 2;
+ decodePI(pi);
+ }
+
+ /**
+ * Tests that the AMQDecoder handles invalid protocol major
+ * @throws Exception
+ */
+ @Test(expected = AMQProtocolVersionException.class)
+ public void testDecoderValidatesProtocolMajor() throws Exception
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.protocolMajor = 2;
+ decodePI(pi);
+ }
+
+ /**
+ * Tests that the AMQDecoder handles invalid protocol minor
+ * @throws Exception
+ */
+ @Test(expected = AMQProtocolVersionException.class)
+ public void testDecoderValidatesProtocolMinor() throws Exception
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.protocolMinor = 99;
+ decodePI(pi);
+ }
+
+ /**
+ * Tests that the AMQDecoder accepts a valid PI
+ * @throws Exception
+ */
+ @Test(expected = AMQProtocolHeaderException.class)
+ public void testDecoderValidatesHeader() throws Exception
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.header = new char[] {'P', 'Q', 'M', 'A' };
+ decodePI(pi);
+ }
+
+ /**
+ * Test that a valid header is passed by the decoder.
+ * @throws Exception
+ */
+ @Test
+ public void testDecoderAcceptsValidHeader() throws Exception
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ decodePI(pi);
+ }
+
+ /**
+ * This test checks that an invalid protocol header results in the
+ * connection being closed.
+ */
+ @Test
+ public void testInvalidProtocolHeaderClosesConnection() throws Exception
+ {
+ AMQProtocolHeaderException pe = new AMQProtocolHeaderException("Test");
+ _protocolHandler.exceptionCaught(_mockIoSession, pe);
+ Assert.assertNotNull(_mockIoSession.getLastWrittenObject());
+ Object piResponse = _mockIoSession.getLastWrittenObject();
+ Assert.assertEquals(piResponse.getClass(), ProtocolInitiation.class);
+ ProtocolInitiation pi = (ProtocolInitiation) piResponse;
+ Assert.assertEquals("Protocol Initiation sent out was not the broker's expected header", pi,
+ createValidProtocolInitiation());
+ Assert.assertTrue("Session has not been closed", _mockIoSession.isClosing());
+ }
+
+ private ProtocolInitiation createValidProtocolInitiation()
+ {
+ /* Find last protocol version in protocol version list. Make sure last protocol version
+ listed in the build file (build-module.xml) is the latest version which will be used
+ here. */
+ int i = pv.length - 1;
+ return new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]);
+ }
+
+ /**
+ * Helper that encodes a protocol initiation and attempts to decode it
+ * @param pi
+ * @throws Exception
+ */
+ private void decodePI(ProtocolInitiation pi) throws Exception
+ {
+ // we need to do this test at the level of the decoder since we initially only expect PI frames
+ // so the protocol handler is not set up to know whether it should be expecting a PI frame or
+ // a different type of frame
+ AMQDecoder decoder = new AMQDecoder(true);
+ AMQEncoder encoder = new AMQEncoder();
+ TestProtocolEncoderOutput peo = new TestProtocolEncoderOutput();
+ encoder.encode(_mockIoSession, pi, peo);
+ TestProtocolDecoderOutput pdo = new TestProtocolDecoderOutput();
+ decoder.decode(_mockIoSession, peo.result, pdo);
+ ((ProtocolInitiation) pdo.result).checkVersion(this);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(TestProtocolInitiation.class);
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/protocol/UnitTests.java b/java/broker/test/src/org/apache/qpid/server/protocol/UnitTests.java
new file mode 100644
index 0000000000..09dc76d310
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/protocol/UnitTests.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({TestProtocolInitiation.class})
+public class UnitTests
+{
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(UnitTests.class);
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java b/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
new file mode 100644
index 0000000000..904665949c
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
@@ -0,0 +1,243 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.JUnit4TestAdapter;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.Ignore;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.util.NullApplicationRegistry;
+import org.apache.log4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Tests that acknowledgements are handled correctly.
+ */
+public class AckTest
+{
+ private static final Logger _log = Logger.getLogger(AckTest.class);
+
+ private SubscriptionImpl _subscription;
+
+ private MockProtocolSession _protocolSession;
+
+ private TestableMemoryMessageStore _messageStore;
+
+ private AMQChannel _channel;
+
+ private SubscriptionSet _subscriptionManager;
+
+ private AMQQueue _queue;
+
+ public AckTest() throws Exception
+ {
+ ApplicationRegistry.initialise(new NullApplicationRegistry());
+ }
+
+ @Before
+ public void setup() throws Exception
+ {
+ _messageStore = new TestableMemoryMessageStore();
+ _channel = new AMQChannel(5, _messageStore, null/*dont need exchange registry*/);
+ _protocolSession = new MockProtocolSession(_messageStore);
+ _protocolSession.addChannel(_channel);
+ _subscriptionManager = new SubscriptionSet();
+ _queue = new AMQQueue("myQ", false, "guest", true, new DefaultQueueRegistry(), _subscriptionManager);
+ }
+
+ private void publishMessages(int count) throws AMQException
+ {
+ for (int i = 1; i <= count; i++)
+ {
+ BasicPublishBody publishBody = new BasicPublishBody();
+ publishBody.routingKey = "rk";
+ publishBody.exchange = "someExchange";
+ AMQMessage msg = new AMQMessage(_messageStore, publishBody);
+ msg.setContentHeaderBody(new ContentHeaderBody());
+ _subscription.send(msg, _queue);
+ }
+ }
+
+ /**
+ * Tests that the acknowledgements are correctly associated with a channel and
+ * order is preserved when acks are enabled
+ */
+ @Test @Ignore /* FIXME: broken at the moment */
+ public void ackChannelAssociationTest() throws AMQException
+ {
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ final int msgCount = 10;
+ publishMessages(msgCount);
+
+ Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == msgCount);
+
+ Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+ for (int i = 1; i <= map.size(); i++)
+ {
+ Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+ assertTrue(entry.getKey() == i);
+ AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+ assertTrue(unackedMsg.queue == _queue);
+ }
+ assertTrue(_messageStore.getMessageMap().size() == msgCount);
+ }
+
+ /**
+ * Tests that in no-ack mode no messages are retained
+ */
+ @Test
+ public void testNoAckMode() throws AMQException
+ {
+ // false arg means no acks expected
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", false);
+ final int msgCount = 10;
+ publishMessages(msgCount);
+
+ Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == 0);
+ assertTrue(_messageStore.getMessageMap().size() == 0);
+ }
+
+ /**
+ * Tests that a single acknowledgement is handled correctly (i.e multiple flag not
+ * set case)
+ */
+ @Test
+ public void singleAckReceivedTest() throws AMQException
+ {
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ final int msgCount = 10;
+ publishMessages(msgCount);
+
+ _channel.acknowledgeMessage(5, false);
+ Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == msgCount - 1);
+
+ Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+ int i = 1;
+ while (i <= map.size())
+ {
+ Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+ assertTrue(entry.getKey() == i);
+ AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+ assertTrue(unackedMsg.queue == _queue);
+ // 5 is the delivery tag of the message that *should* be removed
+ if (++i == 5)
+ {
+ ++i;
+ }
+ }
+ }
+
+ /**
+ * Tests that a single acknowledgement is handled correctly (i.e multiple flag not
+ * set case)
+ */
+ @Test
+ public void multiAckReceivedTest() throws AMQException
+ {
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ final int msgCount = 10;
+ publishMessages(msgCount);
+
+ _channel.acknowledgeMessage(5, true);
+ Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == 5);
+
+ Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+ int i = 1;
+ while (i <= map.size())
+ {
+ Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+ assertTrue(entry.getKey() == i + 5);
+ AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+ assertTrue(unackedMsg.queue == _queue);
+ ++i;
+ }
+ }
+
+ /**
+ * Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs.
+ *
+ */
+ @Test
+ public void multiAckAllReceivedTest() throws AMQException
+ {
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ final int msgCount = 10;
+ publishMessages(msgCount);
+
+ _channel.acknowledgeMessage(0, true);
+ Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == 0);
+
+ Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+ int i = 1;
+ while (i <= map.size())
+ {
+ Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+ assertTrue(entry.getKey() == i + 5);
+ AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+ assertTrue(unackedMsg.queue == _queue);
+ ++i;
+ }
+ }
+
+
+
+ @Test
+ public void testPrefetch() throws AMQException
+ {
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ _channel.setPrefetchCount(5);
+ final int msgCount = 5;
+ publishMessages(msgCount);
+
+ // at this point we should have sent out only 5 messages with a further 5 queued
+ // up in the channel which should be suspended
+ assertTrue(_subscription.isSuspended());
+ Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == 5);
+ _channel.acknowledgeMessage(5, true);
+ assertTrue(!_subscription.isSuspended());
+ try
+ {
+ Thread.sleep(3000);
+ }
+ catch (InterruptedException e)
+ {
+ _log.error("Error: " + e, e);
+ }
+ assertTrue(map.size() == 0);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(AckTest.class);
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java
new file mode 100644
index 0000000000..1cf11933fa
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java
@@ -0,0 +1,261 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.JUnit4TestAdapter;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import org.junit.Test;
+import org.junit.Assert;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
+
+import java.util.*;
+import java.util.concurrent.Executor;
+
+/**
+ * Tests delivery in the face of concurrent incoming _messages, subscription alterations
+ * and attempts to asynchronously process queued _messages.
+ */
+public class ConcurrencyTest extends MessageTestHelper
+{
+ private final Random random = new Random();
+
+ private final int numMessages = 1000;
+
+ private final List<TestSubscription> _subscribers = new ArrayList<TestSubscription>();
+ private final Set<Subscription> _active = new HashSet<Subscription>();
+ private final List<AMQMessage> _messages = new ArrayList<AMQMessage>();
+ private int next = 0;//index to next message to send
+ private final List<AMQMessage> _received = Collections.synchronizedList(new ArrayList<AMQMessage>());
+ private final Executor _executor = new OnCurrentThreadExecutor();
+ private final List<Thread> _threads = new ArrayList<Thread>();
+
+ private final SubscriptionSet _subscriptionMgr = new SubscriptionSet();
+ private final DeliveryManager _deliveryMgr;
+
+ private boolean isComplete;
+ private boolean failed;
+
+ public ConcurrencyTest() throws Exception
+ {
+ _deliveryMgr = new DeliveryManager(_subscriptionMgr, new AMQQueue("myQ", false, "guest", false,
+ new DefaultQueueRegistry()));
+ }
+
+ @Test
+ public void concurrent1() throws InterruptedException, AMQException
+ {
+ initSubscriptions(10);
+ initMessages(numMessages);
+ initThreads(1, 4, 4, 4);
+ run();
+ check();
+ }
+
+ @Test
+ public void concurrent2() throws InterruptedException, AMQException
+ {
+ initSubscriptions(10);
+ initMessages(numMessages);
+ initThreads(4, 2, 2, 2);
+ run();
+ check();
+ }
+
+ void check()
+ {
+ assertFalse("Failed", failed);
+
+ _deliveryMgr.processAsync(_executor);
+
+ assertEquals("Did not recieve the correct number of messages", _messages.size(), _received.size());
+ for(int i = 0; i < _messages.size(); i++)
+ {
+ assertEquals("Wrong message at " + i, _messages.get(i), _received.get(i));
+ }
+ }
+
+ void initSubscriptions(int subscriptions)
+ {
+ for(int i = 0; i < subscriptions; i++)
+ {
+ _subscribers.add(new TestSubscription("Subscriber" + i, _received));
+ }
+ }
+
+ void initMessages(int messages) throws AMQException
+ {
+ for(int i = 0; i < messages; i++)
+ {
+ _messages.add(message());
+ }
+ }
+
+ void initThreads(int senders, int subscribers, int suspenders, int processors)
+ {
+ addThreads(senders, senders == 1 ? new Sender() : new OrderedSender());
+ addThreads(subscribers, new Subscriber());
+ addThreads(suspenders, new Suspender());
+ addThreads(processors, new Processor());
+ }
+
+ void addThreads(int count, Runnable runner)
+ {
+ for(int i = 0; i < count; i++)
+ {
+ _threads.add(new Thread(runner, runner.toString()));
+ }
+ }
+
+ void run() throws InterruptedException
+ {
+ for(Thread t : _threads)
+ {
+ t.start();
+ }
+
+ for(Thread t : _threads)
+ {
+ t.join();
+ }
+ }
+
+ private void toggle(Subscription s)
+ {
+ synchronized (_active)
+ {
+ if (_active.contains(s))
+ {
+ _active.remove(s);
+ Subscription result = _subscriptionMgr.removeSubscriber(s);
+ Assert.assertTrue("Removed subscription " + result + " but trying to remove subscription " + s,
+ result != null && result.equals(s));
+ }
+ else
+ {
+ _active.add(s);
+ _subscriptionMgr.addSubscriber(s);
+ }
+ }
+ }
+
+ private AMQMessage nextMessage()
+ {
+ synchronized (_messages)
+ {
+ if (next < _messages.size())
+ {
+ return _messages.get(next++);
+ }
+ else
+ {
+ if (_deliveryMgr.getQueueMessageCount() == 0) {
+ isComplete = true;
+ }
+ return null;
+ }
+ }
+ }
+
+ private boolean randomBoolean()
+ {
+ return random.nextBoolean();
+ }
+
+ private TestSubscription randomSubscriber()
+ {
+ return _subscribers.get(random.nextInt(_subscribers.size()));
+ }
+
+ private class Sender extends Runner
+ {
+ void doRun() throws Throwable
+ {
+ AMQMessage msg = nextMessage();
+ if (msg != null)
+ {
+ _deliveryMgr.deliver(toString(), msg);
+ }
+ }
+ }
+
+ private class OrderedSender extends Sender
+ {
+ synchronized void doRun() throws Throwable
+ {
+ super.doRun();
+ }
+ }
+
+ private class Suspender extends Runner
+ {
+ void doRun() throws Throwable
+ {
+ randomSubscriber().setSuspended(randomBoolean());
+ }
+ }
+
+ private class Subscriber extends Runner
+ {
+ void doRun() throws Throwable
+ {
+ toggle(randomSubscriber());
+ }
+ }
+
+ private class Processor extends Runner
+ {
+ void doRun() throws Throwable
+ {
+ _deliveryMgr.processAsync(_executor);
+ }
+ }
+
+ private abstract class Runner implements Runnable
+ {
+ public void run()
+ {
+ try
+ {
+ while (!stop())
+ {
+ doRun();
+ }
+ }
+ catch (Throwable t)
+ {
+ failed = true;
+ t.printStackTrace();
+ }
+ }
+
+ abstract void doRun() throws Throwable;
+
+ boolean stop()
+ {
+ return isComplete || failed;
+ }
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(ConcurrencyTest.class);
+ }
+
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java
new file mode 100644
index 0000000000..d00cd55fa1
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java
@@ -0,0 +1,159 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.NullApplicationRegistry;
+import org.apache.qpid.AMQException;
+import junit.framework.JUnit4TestAdapter;
+
+public class DeliveryManagerTest extends MessageTestHelper
+{
+ private final SubscriptionSet _subscriptions = new SubscriptionSet();
+ private final DeliveryManager _mgr;
+
+ public DeliveryManagerTest() throws Exception
+ {
+ try
+ {
+ _mgr = new DeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false,
+ new DefaultQueueRegistry()));
+ }
+ catch(Throwable t)
+ {
+ t.printStackTrace();
+ throw new AMQException("Could not initialise delivery manager", t);
+ }
+ }
+
+ @Test
+ public void startInQueueingMode() throws AMQException
+ {
+ AMQMessage[] messages = new AMQMessage[10];
+ for(int i = 0; i < messages.length; i++)
+ {
+ messages[i] = message();
+ }
+ int batch = messages.length / 2;
+
+ for(int i = 0; i < batch; i++)
+ {
+ _mgr.deliver("Me", messages[i]);
+ }
+
+ TestSubscription s1 = new TestSubscription("1");
+ TestSubscription s2 = new TestSubscription("2");
+ _subscriptions.addSubscriber(s1);
+ _subscriptions.addSubscriber(s2);
+
+ for(int i = batch; i < messages.length; i++)
+ {
+ _mgr.deliver("Me", messages[i]);
+ }
+
+ assertTrue(s1.getMessages().isEmpty());
+ assertTrue(s2.getMessages().isEmpty());
+
+ _mgr.processAsync(new OnCurrentThreadExecutor());
+
+ assertEquals(messages.length / 2, s1.getMessages().size());
+ assertEquals(messages.length / 2, s2.getMessages().size());
+
+ for(int i = 0; i < messages.length; i++)
+ {
+ if(i % 2 == 0)
+ {
+ assertTrue(s1.getMessages().get(i / 2) == messages[i]);
+ }
+ else
+ {
+ assertTrue(s2.getMessages().get(i / 2) == messages[i]);
+ }
+ }
+ }
+
+ @Test
+ public void startInDirectMode() throws AMQException
+ {
+ AMQMessage[] messages = new AMQMessage[10];
+ for(int i = 0; i < messages.length; i++)
+ {
+ messages[i] = message();
+ }
+ int batch = messages.length / 2;
+
+ TestSubscription s1 = new TestSubscription("1");
+ _subscriptions.addSubscriber(s1);
+
+ for(int i = 0; i < batch; i++)
+ {
+ _mgr.deliver("Me", messages[i]);
+ }
+
+ assertEquals(batch, s1.getMessages().size());
+ for(int i = 0; i < batch; i++)
+ {
+ assertTrue(messages[i] == s1.getMessages().get(i));
+ }
+ s1.getMessages().clear();
+ assertEquals(0, s1.getMessages().size());
+
+ s1.setSuspended(true);
+ for(int i = batch; i < messages.length; i++)
+ {
+ _mgr.deliver("Me", messages[i]);
+ }
+
+ _mgr.processAsync(new OnCurrentThreadExecutor());
+ assertEquals(0, s1.getMessages().size());
+ s1.setSuspended(false);
+
+ _mgr.processAsync(new OnCurrentThreadExecutor());
+ assertEquals(messages.length - batch, s1.getMessages().size());
+
+ for(int i = batch; i < messages.length; i++)
+ {
+ assertTrue(messages[i] == s1.getMessages().get(i - batch));
+ }
+
+ }
+
+ @Test (expected=NoConsumersException.class)
+ public void noConsumers() throws AMQException
+ {
+ _mgr.deliver("Me", message(true));
+ }
+
+ @Test (expected=NoConsumersException.class)
+ public void noActiveConsumers() throws AMQException
+ {
+ TestSubscription s = new TestSubscription("A");
+ _subscriptions.addSubscriber(s);
+ s.setSuspended(true);
+ _mgr.deliver("Me", message(true));
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(DeliveryManagerTest.class);
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java b/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java
new file mode 100644
index 0000000000..f9baa77b65
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.NullApplicationRegistry;
+import org.apache.qpid.AMQException;
+
+class MessageTestHelper
+{
+ private final MessageStore _messageStore = new SkeletonMessageStore();
+
+ MessageTestHelper() throws Exception
+ {
+ ApplicationRegistry.initialise(new NullApplicationRegistry());
+ }
+
+ AMQMessage message() throws AMQException
+ {
+ return message(false);
+ }
+
+ AMQMessage message(boolean immediate) throws AMQException
+ {
+ BasicPublishBody publish = new BasicPublishBody();
+ publish.immediate = immediate;
+ return new AMQMessage(_messageStore, publish, new ContentHeaderBody(), null);
+ }
+
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/MockProtocolSession.java b/java/broker/test/src/org/apache/qpid/server/queue/MockProtocolSession.java
new file mode 100644
index 0000000000..f26d6d64b3
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -0,0 +1,121 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.MessageStore;
+
+import javax.security.sasl.SaslServer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A protocol session that can be used for testing purposes.
+ */
+public class MockProtocolSession implements AMQProtocolSession
+{
+ private MessageStore _messageStore;
+
+ private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
+
+ public MockProtocolSession(MessageStore messageStore)
+ {
+ _messageStore = messageStore;
+ }
+
+ public void dataBlockReceived(AMQDataBlock message) throws Exception
+ {
+ }
+
+ public void writeFrame(AMQDataBlock frame)
+ {
+ }
+
+ public String getContextKey()
+ {
+ return null;
+ }
+
+ public void setContextKey(String contextKey)
+ {
+ }
+
+ public AMQChannel getChannel(int channelId)
+ {
+ AMQChannel channel = _channelMap.get(channelId);
+ if (channel == null)
+ {
+ throw new IllegalArgumentException("Invalid channel id: " + channelId);
+ }
+ else
+ {
+ return channel;
+ }
+ }
+
+ public void addChannel(AMQChannel channel)
+ {
+ if (channel == null)
+ {
+ throw new IllegalArgumentException("Channel must not be null");
+ }
+ else
+ {
+ _channelMap.put(channel.getChannelId(), channel);
+ }
+ }
+
+ public void closeChannel(int channelId) throws AMQException
+ {
+ }
+
+ public void removeChannel(int channelId)
+ {
+ _channelMap.remove(channelId);
+ }
+
+ public void initHeartbeats(int delay)
+ {
+ }
+
+ public void closeSession() throws AMQException
+ {
+ }
+
+ public Object getKey()
+ {
+ return null;
+ }
+
+ public String getLocalFQDN()
+ {
+ return null;
+ }
+
+ public SaslServer getSaslServer()
+ {
+ return null;
+ }
+
+ public void setSaslServer(SaslServer saslServer)
+ {
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java b/java/broker/test/src/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java
new file mode 100644
index 0000000000..8ae8ebae79
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.server.util.ConcurrentTest;
+
+public class QueueConcurrentPerfTest extends QueuePerfTest
+{
+ QueueConcurrentPerfTest(Factory factory, int queueCount, int messages)
+ {
+ super(factory, queueCount, messages);
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ Factory[] factories = new Factory[]{SYNCHRONIZED, CONCURRENT};
+ int iterations = 5;
+ String label = argv.length > 0 ? argv[0]: null;
+ System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time");
+ //vary number of queues:
+ for(Factory f : factories)
+ {
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 100, 10000), iterations), 5));
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 10000), iterations), 5));
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 10000, 10000), iterations), 5));
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 1000), iterations), 5));
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 100000), iterations), 5));
+ }
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/QueuePerfTest.java b/java/broker/test/src/org/apache/qpid/server/queue/QueuePerfTest.java
new file mode 100644
index 0000000000..36e4e90f35
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/queue/QueuePerfTest.java
@@ -0,0 +1,255 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.server.util.TimedRun;
+import org.apache.qpid.server.util.RunStats;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class QueuePerfTest extends TimedRun
+{
+ private final Factory _factory;
+ private final int _queueCount;
+ private final int _messages;
+ private final String _msg = "";
+ private List<Queue<String>> _queues;
+
+ QueuePerfTest(Factory factory, int queueCount, int messages)
+ {
+ super(factory + ", " + queueCount + ", " + messages);
+ _factory = factory;
+ _queueCount = queueCount;
+ _messages = messages;
+ }
+
+ protected void setup() throws Exception
+ {
+ //init
+ int count = Integer.getInteger("prepopulate", 0);
+// System.err.println("Prepopulating with " + count + " items");
+ _queues = new ArrayList<Queue<String>>(_queueCount);
+ for (int i = 0; i < _queueCount; i++)
+ {
+ Queue<String> q = _factory.create();
+ for(int j = 0; j < count; ++j)
+ {
+ q.add("Item"+ j);
+ }
+ _queues.add(q);
+ }
+ System.gc();
+ }
+
+ protected void teardown() throws Exception
+ {
+ System.gc();
+ }
+
+ protected void run() throws Exception
+ {
+ //dispatch
+ for (int i = 0; i < _messages; i++)
+ {
+ for (Queue<String> q : _queues)
+ {
+ q.offer(_msg);
+ q.poll();
+ }
+ }
+ }
+
+ static interface Factory
+ {
+ Queue<String> create();
+ }
+
+ static Factory CONCURRENT = new Factory()
+ {
+ public Queue<String> create()
+ {
+ return new ConcurrentLinkedQueue<String>();
+ }
+
+ public String toString()
+ {
+ return "ConcurrentLinkedQueue";
+ }
+
+ };
+
+ static Factory SYNCHRONIZED = new Factory()
+ {
+ public Queue<String> create()
+ {
+ return new SynchronizedQueue<String>(new LinkedList<String>());
+ }
+
+
+ public String toString()
+ {
+ return "Synchronized LinkedList";
+ }
+ };
+
+ static Factory PLAIN = new Factory()
+ {
+ public Queue<String> create()
+ {
+ return new LinkedList<String>();
+ }
+
+ public String toString()
+ {
+ return "Plain LinkedList";
+ }
+ };
+
+ static class SynchronizedQueue<E> implements Queue<E>
+ {
+ private final Queue<E> queue;
+
+ SynchronizedQueue(Queue<E> queue)
+ {
+ this.queue = queue;
+ }
+
+ public synchronized E element()
+ {
+ return queue.element();
+ }
+
+ public synchronized boolean offer(E o)
+ {
+ return queue.offer(o);
+ }
+
+ public synchronized E peek()
+ {
+ return queue.peek();
+ }
+
+ public synchronized E poll()
+ {
+ return queue.poll();
+ }
+
+ public synchronized E remove()
+ {
+ return queue.remove();
+ }
+
+ public synchronized int size()
+ {
+ return queue.size();
+ }
+
+ public synchronized boolean isEmpty()
+ {
+ return queue.isEmpty();
+ }
+
+ public synchronized boolean contains(Object o)
+ {
+ return queue.contains(o);
+ }
+
+ public synchronized Iterator<E> iterator()
+ {
+ return queue.iterator();
+ }
+
+ public synchronized Object[] toArray()
+ {
+ return queue.toArray();
+ }
+
+ public synchronized <T>T[] toArray(T[] a)
+ {
+ return queue.toArray(a);
+ }
+
+ public synchronized boolean add(E o)
+ {
+ return queue.add(o);
+ }
+
+ public synchronized boolean remove(Object o)
+ {
+ return queue.remove(o);
+ }
+
+ public synchronized boolean containsAll(Collection<?> c)
+ {
+ return queue.containsAll(c);
+ }
+
+ public synchronized boolean addAll(Collection<? extends E> c)
+ {
+ return queue.addAll(c);
+ }
+
+ public synchronized boolean removeAll(Collection<?> c)
+ {
+ return queue.removeAll(c);
+ }
+
+ public synchronized boolean retainAll(Collection<?> c)
+ {
+ return queue.retainAll(c);
+ }
+
+ public synchronized void clear()
+ {
+ queue.clear();
+ }
+ }
+
+ static void run(String label, AveragedRun test) throws Exception
+ {
+ RunStats stats = test.call();
+ System.out.println((label == null ? "" : label + ", ") + test
+ + ", " + stats.getAverage() + ", " + stats.getMax() + ", " + stats.getMin());
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ Factory[] factories = new Factory[]{PLAIN, SYNCHRONIZED, CONCURRENT};
+ int iterations = 5;
+ String label = argv.length > 0 ? argv[0]: null;
+ System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time");
+ //vary number of queues:
+
+ for(Factory f : factories)
+ {
+ run(label, new AveragedRun(new QueuePerfTest(f, 100, 10000), iterations));
+ run(label, new AveragedRun(new QueuePerfTest(f, 1000, 10000), iterations));
+ run(label, new AveragedRun(new QueuePerfTest(f, 10000, 10000), iterations));
+ run(label, new AveragedRun(new QueuePerfTest(f, 1000, 1000), iterations));
+ run(label, new AveragedRun(new QueuePerfTest(f, 1000, 100000), iterations));
+ }
+ }
+
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java b/java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java
new file mode 100644
index 0000000000..eff65a9350
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java
@@ -0,0 +1,171 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exchange.AbstractExchange;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.MockIoSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.server.util.NullApplicationRegistry;
+import org.apache.qpid.server.util.TimedRun;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SendPerfTest extends TimedRun
+{
+ private int _messages = 1000;
+ private int _clients = 10;
+ private List<AMQQueue> _queues;
+
+ public SendPerfTest(int clients, int messages)
+ {
+ super("SendPerfTest, msgs=" + messages + ", clients=" + clients);
+ _messages = messages;
+ _clients = clients;
+ }
+
+ protected void setup() throws Exception
+ {
+ _queues = initQueues(_clients);
+ System.gc();
+ }
+
+ protected void teardown() throws Exception
+ {
+ System.gc();
+ }
+
+ protected void run() throws Exception
+ {
+ deliver(_messages, _queues);
+ }
+
+ //have a dummy AMQProtocolSession that does nothing on the writeFrame()
+ //set up x number of queues
+ //create necessary bits and pieces to deliver a message
+ //deliver y messages to each queue
+
+ public static void main(String[] argv) throws Exception
+ {
+ ApplicationRegistry.initialise(new NullApplicationRegistry());
+ int clients = Integer.parseInt(argv[0]);
+ int messages = Integer.parseInt(argv[1]);
+ int iterations = Integer.parseInt(argv[2]);
+ AveragedRun test = new AveragedRun(new SendPerfTest(clients, messages), iterations);
+ test.run();
+ }
+
+ /**
+ * Delivers messages to a number of queues.
+ * @param count the number of messages to deliver
+ * @param queues the list of queues
+ * @throws NoConsumersException
+ */
+ static void deliver(int count, List<AMQQueue> queues) throws AMQException
+ {
+ BasicPublishBody publish = new BasicPublishBody();
+ publish.exchange = new NullExchange().getName();
+ ContentHeaderBody header = new ContentHeaderBody();
+ List<ContentBody> body = new ArrayList<ContentBody>();
+ MessageStore messageStore = new SkeletonMessageStore();
+ body.add(new ContentBody());
+ for (int i = 0; i < count; i++)
+ {
+ for (AMQQueue q : queues)
+ {
+ q.deliver(new AMQMessage(messageStore, i, publish, header, body));
+ }
+ }
+ }
+
+ static List<AMQQueue> initQueues(int number) throws AMQException
+ {
+ Exchange exchange = new NullExchange();
+ List<AMQQueue> queues = new ArrayList<AMQQueue>(number);
+ for (int i = 0; i < number; i++)
+ {
+ AMQQueue q = createQueue("Queue" + (i + 1));
+ q.bind("routingKey", exchange);
+ try
+ {
+ q.registerProtocolSession(createSession(), 1, "1", false);
+ }
+ catch (Exception e)
+ {
+ throw new AMQException("Error creating protocol session: " + e, e);
+ }
+ queues.add(q);
+ }
+ return queues;
+ }
+
+ static AMQQueue createQueue(String name) throws AMQException
+ {
+ return new AMQQueue(name, false, null, false, ApplicationRegistry.getInstance().getQueueRegistry(),
+ new OnCurrentThreadExecutor());
+ }
+
+ static AMQProtocolSession createSession() throws Exception
+ {
+ IApplicationRegistry reg = ApplicationRegistry.getInstance();
+ AMQCodecFactory codecFactory = new AMQCodecFactory(true);
+ AMQMinaProtocolSession result = new AMQMinaProtocolSession(new MockIoSession(), reg.getQueueRegistry(), reg.getExchangeRegistry(), codecFactory);
+ result.addChannel(new AMQChannel(1, null, null));
+ return result;
+ }
+
+ static class NullExchange extends AbstractExchange
+ {
+ public String getName()
+ {
+ return "NullExchange";
+ }
+
+ protected ExchangeMBean createMBean()
+ {
+ return null;
+ }
+
+ public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ }
+
+ public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
+ {
+ }
+
+ public void route(AMQMessage payload) throws AMQException
+ {
+ }
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/SubscriptionManagerTest.java b/java/broker/test/src/org/apache/qpid/server/queue/SubscriptionManagerTest.java
new file mode 100644
index 0000000000..7743db5078
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/queue/SubscriptionManagerTest.java
@@ -0,0 +1,105 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+import junit.framework.JUnit4TestAdapter;
+
+public class SubscriptionManagerTest
+{
+ private final SubscriptionSet mgr = new SubscriptionSet();
+
+ @Test
+ public void basicSubscriptionManagement()
+ {
+ assertTrue(mgr.isEmpty());
+ assertFalse(mgr.hasActiveSubscribers());
+ TestSubscription s1 = new TestSubscription("S1");
+ mgr.addSubscriber(s1);
+ assertFalse(mgr.isEmpty());
+ assertTrue(mgr.hasActiveSubscribers());
+
+ TestSubscription s2 = new TestSubscription("S2");
+ mgr.addSubscriber(s2);
+
+ s2.setSuspended(true);
+ assertFalse(mgr.isEmpty());
+ assertTrue(mgr.hasActiveSubscribers());
+ assertTrue(s2.isSuspended());
+ assertFalse(s1.isSuspended());
+
+ s1.setSuspended(true);
+ assertFalse(mgr.hasActiveSubscribers());
+
+ mgr.removeSubscriber(new TestSubscription("S1"));
+ assertFalse(mgr.isEmpty());
+ mgr.removeSubscriber(new TestSubscription("S2"));
+ assertTrue(mgr.isEmpty());
+ }
+
+ @Test
+ public void roundRobin()
+ {
+ TestSubscription a = new TestSubscription("A");
+ TestSubscription b = new TestSubscription("B");
+ TestSubscription c = new TestSubscription("C");
+ TestSubscription d = new TestSubscription("D");
+ mgr.addSubscriber(a);
+ mgr.addSubscriber(b);
+ mgr.addSubscriber(c);
+ mgr.addSubscriber(d);
+
+ for (int i = 0; i < 3; i++)
+ {
+ assertEquals(a, mgr.nextSubscriber(null));
+ assertEquals(b, mgr.nextSubscriber(null));
+ assertEquals(c, mgr.nextSubscriber(null));
+ assertEquals(d, mgr.nextSubscriber(null));
+ }
+
+ c.setSuspended(true);
+
+ for (int i = 0; i < 3; i++)
+ {
+ assertEquals(a, mgr.nextSubscriber(null));
+ assertEquals(b, mgr.nextSubscriber(null));
+ assertEquals(d, mgr.nextSubscriber(null));
+ }
+
+ mgr.removeSubscriber(a);
+ d.setSuspended(true);
+ c.setSuspended(false);
+ Subscription e = new TestSubscription("D");
+ mgr.addSubscriber(e);
+
+ for (int i = 0; i < 3; i++)
+ {
+ assertEquals(b, mgr.nextSubscriber(null));
+ assertEquals(c, mgr.nextSubscriber(null));
+ assertEquals(e, mgr.nextSubscriber(null));
+ }
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(SubscriptionManagerTest.class);
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/SubscriptionSetTest.java b/java/broker/test/src/org/apache/qpid/server/queue/SubscriptionSetTest.java
new file mode 100644
index 0000000000..b6e8f8b44d
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/queue/SubscriptionSetTest.java
@@ -0,0 +1,149 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.JUnit4TestAdapter;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import org.junit.Test;
+
+public class SubscriptionSetTest
+{
+ /**
+ * A SubscriptionSet that counts the number of items scanned.
+ */
+ static class TestSubscriptionSet extends SubscriptionSet
+ {
+ private int scanned = 0;
+
+ void resetScanned()
+ {
+ scanned = 0;
+ }
+
+ protected void subscriberScanned()
+ {
+ ++scanned;
+ }
+
+ int getScanned()
+ {
+ return scanned;
+ }
+ }
+
+ final TestSubscription sub1 = new TestSubscription("1");
+ final TestSubscription sub2 = new TestSubscription("2");
+ final TestSubscription sub3 = new TestSubscription("3");
+
+ final TestSubscription suspendedSub1 = new TestSubscription("sus1", true);
+ final TestSubscription suspendedSub2 = new TestSubscription("sus2", true);
+ final TestSubscription suspendedSub3 = new TestSubscription("sus3", true);
+
+ @Test
+ public void nextMessage()
+ {
+ SubscriptionSet ss = new SubscriptionSet();
+ assertNull(ss.nextSubscriber(null));
+ assertEquals(0, ss.getCurrentSubscriber());
+
+ ss.addSubscriber(sub1);
+ assertEquals(sub1, ss.nextSubscriber(null));
+ assertEquals(1, ss.getCurrentSubscriber());
+ assertEquals(sub1, ss.nextSubscriber(null));
+ assertEquals(1, ss.getCurrentSubscriber());
+
+ ss.addSubscriber(sub2);
+ ss.addSubscriber(sub3);
+
+ assertEquals(sub2, ss.nextSubscriber(null));
+ assertEquals(2, ss.getCurrentSubscriber());
+
+ assertEquals(sub3, ss.nextSubscriber(null));
+ assertEquals(3, ss.getCurrentSubscriber());
+ }
+
+ @Test
+ public void nextMessageWhenAllSuspended()
+ {
+ SubscriptionSet ss = createAllSuspendedSubscriptionSet();
+ assertNull(ss.nextSubscriber(null));
+ assertEquals(3, ss.getCurrentSubscriber());
+
+ assertNull(ss.nextSubscriber(null));
+ assertEquals(3, ss.getCurrentSubscriber());
+ }
+
+ private TestSubscriptionSet createAllSuspendedSubscriptionSet()
+ {
+ TestSubscriptionSet ss = new TestSubscriptionSet();
+ ss.addSubscriber(suspendedSub1);
+ ss.addSubscriber(suspendedSub2);
+ ss.addSubscriber(suspendedSub3);
+ return ss;
+ }
+
+ @Test
+ public void nextMessageAfterRemove()
+ {
+ SubscriptionSet ss = new SubscriptionSet();
+ ss.addSubscriber(suspendedSub1);
+ ss.addSubscriber(suspendedSub2);
+ ss.addSubscriber(sub3);
+ assertEquals(sub3, ss.nextSubscriber(null));
+ assertEquals(3, ss.getCurrentSubscriber());
+
+ assertEquals(suspendedSub1, ss.removeSubscriber(suspendedSub1));
+
+ assertEquals(sub3, ss.nextSubscriber(null)); // Current implementation handles OutOfBoundsException here.
+ assertEquals(2, ss.getCurrentSubscriber());
+ }
+
+ @Test
+ public void nextMessageOverScanning()
+ {
+ TestSubscriptionSet ss = new TestSubscriptionSet();
+ TestSubscription sub = new TestSubscription("test");
+ ss.addSubscriber(suspendedSub1);
+ ss.addSubscriber(sub);
+ ss.addSubscriber(suspendedSub3);
+ assertEquals(sub, ss.nextSubscriber(null));
+ assertEquals(2, ss.getCurrentSubscriber());
+ assertEquals(2, ss.getScanned());
+
+ ss.resetScanned();
+ sub.setSuspended(true);
+ assertNull(ss.nextSubscriber(null));
+ assertEquals(3, ss.getCurrentSubscriber());
+ // Current implementation overscans by one item here.
+ assertEquals(ss.size() + 1, ss.getScanned());
+ }
+
+ @Test
+ public void nextMessageOverscanWorstCase() {
+ TestSubscriptionSet ss = createAllSuspendedSubscriptionSet();
+ ss.nextSubscriber(null);
+ // Scans the subscriptions twice.
+ assertEquals(ss.size() * 2, ss.getScanned());
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(SubscriptionSetTest.class);
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/TestSubscription.java b/java/broker/test/src/org/apache/qpid/server/queue/TestSubscription.java
new file mode 100644
index 0000000000..093d7e60f4
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/queue/TestSubscription.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestSubscription implements Subscription
+{
+ private final List<AMQMessage> messages;
+ private final Object key;
+ private boolean isSuspended;
+
+ public TestSubscription(Object key)
+ {
+ this(key, new ArrayList<AMQMessage>());
+ }
+
+ public TestSubscription(final Object key, final boolean isSuspended)
+ {
+ this(key);
+ setSuspended(isSuspended);
+ }
+
+ TestSubscription(Object key, List<AMQMessage> messages)
+ {
+ this.key = key;
+ this.messages = messages;
+ }
+
+ List<AMQMessage> getMessages()
+ {
+ return messages;
+ }
+
+ public void send(AMQMessage msg, AMQQueue queue)
+ {
+ messages.add(msg);
+ }
+
+ public void setSuspended(boolean suspended)
+ {
+ isSuspended = suspended;
+ }
+
+ public boolean isSuspended()
+ {
+ return isSuspended;
+ }
+
+ public void queueDeleted(AMQQueue queue)
+ {
+ }
+
+ public int hashCode()
+ {
+ return key.hashCode();
+ }
+
+ public boolean equals(Object o)
+ {
+ return o instanceof TestSubscription && ((TestSubscription) o).key.equals(key);
+ }
+
+ public String toString()
+ {
+ return key.toString();
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/UnitTests.java b/java/broker/test/src/org/apache/qpid/server/queue/UnitTests.java
new file mode 100644
index 0000000000..3a86773a15
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/queue/UnitTests.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ AckTest.class,
+ DeliveryManagerTest.class,
+ SubscriptionManagerTest.class,
+ SubscriptionSetTest.class,
+ ConcurrencyTest.class}
+)
+public class UnitTests
+{
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(UnitTests.class);
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/broker/test/src/org/apache/qpid/server/store/SkeletonMessageStore.java
new file mode 100644
index 0000000000..82292d2503
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.AMQException;
+import org.apache.commons.configuration.Configuration;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A message store that does nothing. Designed to be used in tests that do not want to use any message store
+ * functionality.
+ */
+public class SkeletonMessageStore implements MessageStore
+{
+ private final AtomicLong _messageId = new AtomicLong(1);
+
+ public void configure(String base, Configuration config) throws Exception
+ {
+ }
+
+ public void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception
+ {
+ }
+
+ public void close() throws Exception
+ {
+ }
+
+ public void put(AMQMessage msg)
+ {
+ }
+
+ public void removeMessage(long messageId)
+ {
+ }
+
+ public void createQueue(AMQQueue queue) throws AMQException
+ {
+ }
+
+ public void removeQueue(String name) throws AMQException
+ {
+ }
+
+ public void enqueueMessage(String name, long messageId) throws AMQException
+ {
+ }
+
+ public void dequeueMessage(String name, long messageId) throws AMQException
+ {
+ }
+
+ public void beginTran() throws AMQException
+ {
+ }
+
+ public boolean inTran()
+ {
+ return false;
+ }
+
+ public void commitTran() throws AMQException
+ {
+ }
+
+ public void abortTran() throws AMQException
+ {
+ }
+
+ public List<AMQQueue> createQueues() throws AMQException
+ {
+ return null;
+ }
+
+ public long getNewMessageId()
+ {
+ return _messageId.getAndIncrement();
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/store/TestReferenceCounting.java b/java/broker/test/src/org/apache/qpid/server/store/TestReferenceCounting.java
new file mode 100644
index 0000000000..d2e3ca6478
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.Test;
+import org.junit.Assert;
+import org.junit.Before;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
+
+/**
+ * Tests that reference counting works correctly with AMQMessage and the message store
+ */
+public class TestReferenceCounting
+{
+ private TestableMemoryMessageStore _store;
+
+ @Before
+ public void createCommonObjects()
+ {
+ _store = new TestableMemoryMessageStore();
+ }
+
+ /**
+ * Check that when the reference count is decremented the message removes itself from the store
+ */
+ @Test
+ public void testMessageGetsRemoved() throws AMQException
+ {
+ AMQMessage message = new AMQMessage(_store, null);
+ _store.put(message);
+ Assert.assertTrue(_store.getMessageMap().size() == 1);
+ message.decrementReference();
+ Assert.assertTrue(_store.getMessageMap().size() == 0);
+ }
+
+ @Test
+ public void testMessageRemains() throws AMQException
+ {
+ AMQMessage message = new AMQMessage(_store, null);
+ _store.put(message);
+ Assert.assertTrue(_store.getMessageMap().size() == 1);
+ message.incrementReference();
+ message.decrementReference();
+ Assert.assertTrue(_store.getMessageMap().size() == 1);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(TestReferenceCounting.class);
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/broker/test/src/org/apache/qpid/server/store/TestableMemoryMessageStore.java
new file mode 100644
index 0000000000..1214cd2825
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.queue.AMQMessage;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Adds some extra methods to the memory message store for testing purposes.
+ */
+public class TestableMemoryMessageStore extends MemoryMessageStore
+{
+ public TestableMemoryMessageStore()
+ {
+ _messageMap = new ConcurrentHashMap<Long, AMQMessage>();
+ }
+
+ public ConcurrentMap<Long, AMQMessage> getMessageMap()
+ {
+ return _messageMap;
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/store/UnitTests.java b/java/broker/test/src/org/apache/qpid/server/store/UnitTests.java
new file mode 100644
index 0000000000..a917d736a2
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/store/UnitTests.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ TestReferenceCounting.class
+})
+public class UnitTests
+{
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(UnitTests.class);
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/util/AveragedRun.java b/java/broker/test/src/org/apache/qpid/server/util/AveragedRun.java
new file mode 100644
index 0000000000..3e4a1edac4
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/util/AveragedRun.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import org.apache.qpid.server.util.TimedRun;
+
+import java.util.concurrent.Callable;
+import java.util.Collection;
+
+public class AveragedRun implements Callable<RunStats>
+{
+ private final RunStats stats = new RunStats();
+ private final TimedRun test;
+ private final int iterations;
+
+ public AveragedRun(TimedRun test, int iterations)
+ {
+ this.test = test;
+ this.iterations = iterations;
+ }
+
+ public RunStats call() throws Exception
+ {
+ for (int i = 0; i < iterations; i++)
+ {
+ stats.record(test.call());
+ }
+ return stats;
+ }
+
+ public void run() throws Exception
+ {
+ System.out.println(test + ": " + call());
+ }
+
+ public String toString()
+ {
+ return test.toString();
+ }
+
+ static void run(Collection<AveragedRun> tests) throws Exception
+ {
+ for(AveragedRun test : tests)
+ {
+ test.run();
+ }
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/util/ConcurrentTest.java b/java/broker/test/src/org/apache/qpid/server/util/ConcurrentTest.java
new file mode 100644
index 0000000000..6c5185e254
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/util/ConcurrentTest.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+public class ConcurrentTest extends TimedRun
+{
+ private final TimedRun _test;
+ private final Thread[] _threads;
+
+ public ConcurrentTest(TimedRun test, int threads)
+ {
+ super(test.toString());
+ _test = test;
+ _threads = new Thread[threads];
+ }
+
+ protected void setup() throws Exception
+ {
+ _test.setup();
+ for(int i = 0; i < _threads.length; i++)
+ {
+ _threads[i] = new Thread(new Runner());
+ }
+ }
+
+ protected void teardown() throws Exception
+ {
+ _test.teardown();
+ }
+
+ protected void run() throws Exception
+ {
+ for(Thread t : _threads)
+ {
+ t.start();
+ }
+ for(Thread t : _threads)
+ {
+ t.join();
+ }
+ }
+
+ private class Runner implements Runnable
+ {
+ private Exception error;
+
+ public void run()
+ {
+ try
+ {
+ _test.run();
+ }
+ catch(Exception e)
+ {
+ error = e;
+ e.printStackTrace();
+ }
+ }
+ }
+
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/util/LoggingProxyTest.java b/java/broker/test/src/org/apache/qpid/server/util/LoggingProxyTest.java
new file mode 100644
index 0000000000..15c9e1a59a
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/util/LoggingProxyTest.java
@@ -0,0 +1,89 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+public class LoggingProxyTest
+{
+ static interface IFoo {
+ void foo();
+ void foo(int i, Collection c);
+ String bar();
+ String bar(String s, List l);
+ }
+
+ static class Foo implements IFoo {
+ public void foo()
+ {
+ }
+
+ public void foo(int i, Collection c)
+ {
+ }
+
+ public String bar()
+ {
+ return null;
+ }
+
+ public String bar(String s, List l)
+ {
+ return "ha";
+ }
+ }
+
+ @Test
+ public void simple() {
+ LoggingProxy proxy = new LoggingProxy(new Foo(), 20);
+ IFoo foo = (IFoo)proxy.getProxy(IFoo.class);
+ foo.foo();
+ assertEquals(2, proxy.getBufferSize());
+ Assert.assertTrue(proxy.getBuffer().get(0).toString().matches(".*: foo\\(\\) entered$"));
+ Assert.assertTrue(proxy.getBuffer().get(1).toString().matches(".*: foo\\(\\) returned$"));
+
+ foo.foo(3, Arrays.asList(0, 1, 2));
+ assertEquals(4, proxy.getBufferSize());
+ Assert.assertTrue(proxy.getBuffer().get(2).toString().matches(".*: foo\\(\\[3, \\[0, 1, 2\\]\\]\\) entered$"));
+ Assert.assertTrue(proxy.getBuffer().get(3).toString().matches(".*: foo\\(\\) returned$"));
+
+ foo.bar();
+ assertEquals(6, proxy.getBufferSize());
+ Assert.assertTrue(proxy.getBuffer().get(4).toString().matches(".*: bar\\(\\) entered$"));
+ Assert.assertTrue(proxy.getBuffer().get(5).toString().matches(".*: bar\\(\\) returned null$"));
+
+ foo.bar("hello", Arrays.asList(1, 2, 3));
+ assertEquals(8, proxy.getBufferSize());
+ Assert.assertTrue(proxy.getBuffer().get(6).toString().matches(".*: bar\\(\\[hello, \\[1, 2, 3\\]\\]\\) entered$"));
+ Assert.assertTrue(proxy.getBuffer().get(7).toString().matches(".*: bar\\(\\) returned ha$"));
+
+ proxy.dump();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(LoggingProxyTest.class);
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/util/NullApplicationRegistry.java b/java/broker/test/src/org/apache/qpid/server/util/NullApplicationRegistry.java
new file mode 100644
index 0000000000..3daf143561
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/util/NullApplicationRegistry.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.management.NoopManagedObjectRegistry;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.auth.AuthenticationManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.MapConfiguration;
+
+import java.util.HashMap;
+
+public class NullApplicationRegistry extends ApplicationRegistry
+{
+ private QueueRegistry _queueRegistry;
+
+ private ExchangeRegistry _exchangeRegistry;
+
+ private ExchangeFactory _exchangeFactory;
+
+ private ManagedObjectRegistry _managedObjectRegistry;
+
+ private AuthenticationManager _authenticationManager;
+
+ private MessageStore _messageStore;
+
+ public NullApplicationRegistry()
+ {
+ super(new MapConfiguration(new HashMap()));
+ }
+
+ public void initialise() throws Exception
+ {
+ _managedObjectRegistry = new NoopManagedObjectRegistry();
+ _queueRegistry = new DefaultQueueRegistry();
+ _exchangeFactory = new DefaultExchangeFactory();
+ _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+ _authenticationManager = new NullAuthenticationManager();
+ _messageStore = new TestableMemoryMessageStore();
+
+ _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes
+ }
+
+ public Configuration getConfiguration()
+ {
+ return _configuration;
+ }
+
+ public QueueRegistry getQueueRegistry()
+ {
+ return _queueRegistry;
+ }
+
+ public ExchangeRegistry getExchangeRegistry()
+ {
+ return _exchangeRegistry;
+ }
+
+ public ExchangeFactory getExchangeFactory()
+ {
+ return _exchangeFactory;
+ }
+
+ public ManagedObjectRegistry getManagedObjectRegistry()
+ {
+ return _managedObjectRegistry;
+ }
+
+ public AuthenticationManager getAuthenticationManager()
+ {
+ return _authenticationManager;
+ }
+
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+}
+
diff --git a/java/broker/test/src/org/apache/qpid/server/util/NullAuthenticationManager.java b/java/broker/test/src/org/apache/qpid/server/util/NullAuthenticationManager.java
new file mode 100644
index 0000000000..cdde505451
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/util/NullAuthenticationManager.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import org.apache.qpid.server.security.auth.AuthenticationManager;
+import org.apache.qpid.server.security.auth.AuthenticationResult;
+
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+public class NullAuthenticationManager implements AuthenticationManager
+{
+ public String getMechanisms()
+ {
+ return "PLAIN";
+ }
+
+ public SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException
+ {
+ return new SaslServer()
+ {
+ public String getMechanismName()
+ {
+ return "PLAIN";
+ }
+
+ public byte[] evaluateResponse(byte[] response) throws SaslException
+ {
+ return new byte[0];
+ }
+
+ public boolean isComplete()
+ {
+ return true;
+ }
+
+ public String getAuthorizationID()
+ {
+ return "guest";
+ }
+
+ public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException
+ {
+ return new byte[0];
+ }
+
+ public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException
+ {
+ return new byte[0];
+ }
+
+ public Object getNegotiatedProperty(String propName)
+ {
+ return null;
+ }
+
+ public void dispose() throws SaslException
+ {
+ }
+ };
+ }
+
+ public AuthenticationResult authenticate(SaslServer server, byte[] response)
+ {
+ return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.SUCCESS);
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/util/RunStats.java b/java/broker/test/src/org/apache/qpid/server/util/RunStats.java
new file mode 100644
index 0000000000..248622836d
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/util/RunStats.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+public class RunStats
+{
+ private long min = Long.MAX_VALUE;
+ private long max;
+ private long total;
+ private int count;
+
+ public void record(long time)
+ {
+ max = Math.max(time, max);
+ min = Math.min(time, min);
+ total += time;
+ count++;
+ }
+
+ public long getMin()
+ {
+ return min;
+ }
+
+ public long getMax()
+ {
+ return max;
+ }
+
+ public long getAverage()
+ {
+ return total / count;
+ }
+
+ public String toString()
+ {
+ return "avg=" + getAverage() + ", min=" + min + ", max=" + max;
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/util/TimedRun.java b/java/broker/test/src/org/apache/qpid/server/util/TimedRun.java
new file mode 100644
index 0000000000..f779b7fbb6
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/util/TimedRun.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import java.util.concurrent.Callable;
+
+public abstract class TimedRun implements Callable<Long>
+{
+ private final String description;
+
+ public TimedRun(String description)
+ {
+ this.description = description;
+ }
+
+ public Long call() throws Exception
+ {
+ setup();
+ long start = System.currentTimeMillis();
+ run();
+ long stop = System.currentTimeMillis();
+ teardown();
+ return stop - start;
+ }
+
+ public String toString()
+ {
+ return description;
+ }
+
+ protected void setup() throws Exception{}
+ protected void teardown() throws Exception{}
+ protected abstract void run() throws Exception;
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java b/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java
new file mode 100644
index 0000000000..d6cc471413
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({LoggingProxyTest.class})
+public class UnitTests
+{
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(UnitTests.class);
+ }
+}