diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2006-11-07 11:05:25 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2006-11-07 11:05:25 +0000 |
| commit | 796c146fdd324d8a0c0fe1087a13e2a377f1a88e (patch) | |
| tree | 851b97a06b42bc22aa9c79641656910ad2599be7 /java/broker/test/src | |
| parent | d14a9ea19cc719dc4669b8649f279aed5fa5271e (diff) | |
| download | qpid-python-796c146fdd324d8a0c0fe1087a13e2a377f1a88e.tar.gz | |
QPID-69
Made an interface from the current DeliveryManager.java. The original DeliveryManager is now the SynchronizedDeliveryManager.java where the deliver() method now has synchronization to solve the race condition.
An alternative DeliveryManager - ConcurrentDeliveryManager.java uses a modified ConcurrentLinkedQueue (Modified to maintain the current queue size) this uses a compare and swap methods to allow concurrent access to each end of the queue. Additional locking is required once the queue has been depleted to ensure that a thread is not in the process of appending to the queue.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@472060 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/test/src')
4 files changed, 123 insertions, 28 deletions
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java index 1cf11933fa..caf8ba0d8a 100644 --- a/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java +++ b/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java @@ -54,7 +54,7 @@ public class ConcurrencyTest extends MessageTestHelper public ConcurrencyTest() throws Exception { - _deliveryMgr = new DeliveryManager(_subscriptionMgr, new AMQQueue("myQ", false, "guest", false, + _deliveryMgr = new ConcurrentDeliveryManager(_subscriptionMgr, new AMQQueue("myQ", false, "guest", false, new DefaultQueueRegistry())); } @@ -165,7 +165,7 @@ public class ConcurrencyTest extends MessageTestHelper } else { - if (_deliveryMgr.getQueueMessageCount() == 0) { + if (!_deliveryMgr.hasQueuedMessages()) { isComplete = true; } return null; diff --git a/java/broker/test/src/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java b/java/broker/test/src/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java new file mode 100644 index 0000000000..b2dcd31222 --- /dev/null +++ b/java/broker/test/src/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java @@ -0,0 +1,48 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.queue.ConcurrentDeliveryManager; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.DefaultQueueRegistry; +import org.apache.qpid.server.queue.DeliveryManagerTest; +import org.apache.qpid.AMQException; +import junit.framework.JUnit4TestAdapter; + +public class ConcurrentDeliveryManagerTest extends DeliveryManagerTest +{ + public ConcurrentDeliveryManagerTest() throws Exception + { + try + { + System.setProperty("concurrentdeliverymanager","true"); + _mgr = new ConcurrentDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false, + new DefaultQueueRegistry())); + } + catch (Throwable t) + { + t.printStackTrace(); + throw new AMQException("Could not initialise delivery manager", t); + } + } + + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(ConcurrentDeliveryManagerTest.class); + } +} diff --git a/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java index ef287e079b..cc0f156dba 100644 --- a/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java +++ b/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java @@ -20,40 +20,38 @@ package org.apache.qpid.server.queue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import org.junit.Test; +import org.junit.runners.Suite; +import org.junit.runner.RunWith; import org.apache.qpid.server.handler.OnCurrentThreadExecutor; import org.apache.qpid.AMQException; import junit.framework.JUnit4TestAdapter; -public class DeliveryManagerTest extends MessageTestHelper +@RunWith(Suite.class) +@Suite.SuiteClasses({ + ConcurrentDeliveryManagerTest.class, + SynchronizedDeliveryManagerTest.class}) + +abstract public class DeliveryManagerTest extends MessageTestHelper { - private final SubscriptionSet _subscriptions = new SubscriptionSet(); - private final DeliveryManager _mgr; + protected final SubscriptionSet _subscriptions = new SubscriptionSet(); + protected DeliveryManager _mgr; public DeliveryManagerTest() throws Exception { - try - { - _mgr = new DeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false, - new DefaultQueueRegistry())); - } - catch(Throwable t) - { - t.printStackTrace(); - throw new AMQException("Could not initialise delivery manager", t); - } } + @Test public void startInQueueingMode() throws AMQException { AMQMessage[] messages = new AMQMessage[10]; - for(int i = 0; i < messages.length; i++) + for (int i = 0; i < messages.length; i++) { messages[i] = message(); } int batch = messages.length / 2; - for(int i = 0; i < batch; i++) + for (int i = 0; i < batch; i++) { _mgr.deliver("Me", messages[i]); } @@ -63,7 +61,7 @@ public class DeliveryManagerTest extends MessageTestHelper _subscriptions.addSubscriber(s1); _subscriptions.addSubscriber(s2); - for(int i = batch; i < messages.length; i++) + for (int i = batch; i < messages.length; i++) { _mgr.deliver("Me", messages[i]); } @@ -76,9 +74,9 @@ public class DeliveryManagerTest extends MessageTestHelper assertEquals(messages.length / 2, s1.getMessages().size()); assertEquals(messages.length / 2, s2.getMessages().size()); - for(int i = 0; i < messages.length; i++) + for (int i = 0; i < messages.length; i++) { - if(i % 2 == 0) + if (i % 2 == 0) { assertTrue(s1.getMessages().get(i / 2) == messages[i]); } @@ -93,7 +91,7 @@ public class DeliveryManagerTest extends MessageTestHelper public void startInDirectMode() throws AMQException { AMQMessage[] messages = new AMQMessage[10]; - for(int i = 0; i < messages.length; i++) + for (int i = 0; i < messages.length; i++) { messages[i] = message(); } @@ -102,13 +100,13 @@ public class DeliveryManagerTest extends MessageTestHelper TestSubscription s1 = new TestSubscription("1"); _subscriptions.addSubscriber(s1); - for(int i = 0; i < batch; i++) + for (int i = 0; i < batch; i++) { _mgr.deliver("Me", messages[i]); } assertEquals(batch, s1.getMessages().size()); - for(int i = 0; i < batch; i++) + for (int i = 0; i < batch; i++) { assertTrue(messages[i] == s1.getMessages().get(i)); } @@ -116,7 +114,7 @@ public class DeliveryManagerTest extends MessageTestHelper assertEquals(0, s1.getMessages().size()); s1.setSuspended(true); - for(int i = batch; i < messages.length; i++) + for (int i = batch; i < messages.length; i++) { _mgr.deliver("Me", messages[i]); } @@ -128,22 +126,22 @@ public class DeliveryManagerTest extends MessageTestHelper _mgr.processAsync(new OnCurrentThreadExecutor()); assertEquals(messages.length - batch, s1.getMessages().size()); - for(int i = batch; i < messages.length; i++) + for (int i = batch; i < messages.length; i++) { assertTrue(messages[i] == s1.getMessages().get(i - batch)); } } - @Test (expected=NoConsumersException.class) + @Test(expected = NoConsumersException.class) public void noConsumers() throws AMQException { AMQMessage msg = message(true); _mgr.deliver("Me", msg); - msg.checkDeliveredToConsumer(); + msg.checkDeliveredToConsumer(); } - @Test (expected=NoConsumersException.class) + @Test(expected = NoConsumersException.class) public void noActiveConsumers() throws AMQException { TestSubscription s = new TestSubscription("A"); diff --git a/java/broker/test/src/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java b/java/broker/test/src/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java new file mode 100644 index 0000000000..7250f9f739 --- /dev/null +++ b/java/broker/test/src/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java @@ -0,0 +1,49 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.queue.SynchronizedDeliveryManager; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.DefaultQueueRegistry; +import org.apache.qpid.server.queue.ConcurrentDeliveryManager; +import org.apache.qpid.server.queue.DeliveryManagerTest; +import org.apache.qpid.AMQException; +import junit.framework.JUnit4TestAdapter; + +public class SynchronizedDeliveryManagerTest extends DeliveryManagerTest +{ + public SynchronizedDeliveryManagerTest() throws Exception + { + try + { + System.setProperty("concurrentdeliverymanager","false"); + _mgr = new SynchronizedDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false, + new DefaultQueueRegistry())); + } + catch (Throwable t) + { + t.printStackTrace(); + throw new AMQException("Could not initialise delivery manager", t); + } + } + + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(SynchronizedDeliveryManagerTest.class); + } +} |
