From 5f303af110e3696845372b5cb657c8d26a688f8e Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 29 May 2008 12:54:06 +0000 Subject: Temp fix out of order issue with async(sub) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@661324 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/server/AMQChannel.java | 2 +- .../java/org/apache/qpid/server/queue/AMQQueue.java | 2 ++ .../org/apache/qpid/server/queue/SimpleAMQQueue.java | 19 +++++++++++++------ 3 files changed, 16 insertions(+), 7 deletions(-) (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 847c8b8459..a302a5b503 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -831,7 +831,7 @@ public class AMQChannel // may need to deliver queued messages for (Subscription s : _tag2SubscriptionMap.values()) { - s.getQueue().deliverAsync(s); + s.getQueue().deliverAsync(); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 570bd97a28..f3e4e7c28b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -152,6 +152,8 @@ public interface AMQQueue extends Managable, Comparable void deliverAsync(final Subscription sub); + void deliverAsync(); + /** * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 95cfa8d36c..efa9e9180d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -245,12 +245,19 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throw new ExistingExclusiveSubscription(); } - if(exclusive && getConsumerCount() != 0) + if(exclusive) { - throw new ExistingSubscriptionPreventsExclusive(); + if(getConsumerCount() != 0) + { + throw new ExistingSubscriptionPreventsExclusive(); + } + else + { + _exclusiveSubscriber = subscription; + + } } - setExclusiveSubscriber(subscription); _activeSubscriberCount.incrementAndGet(); subscription.setStateListener(this); @@ -271,7 +278,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - deliverAsync(subscription); + deliverAsync(); } @@ -765,7 +772,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _activeSubscriberCount.incrementAndGet(); } - deliverAsync(sub); + deliverAsync(); } } @@ -1655,7 +1662,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState) { entry.removeStateChangeListener(this); - deliverAsync(_sub); + deliverAsync(); } } } \ No newline at end of file -- cgit v1.2.1