diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2008-05-27 12:43:04 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2008-05-27 12:43:04 +0000 |
| commit | 980643b9364f2ec7e75f9e4a391754f5db4bc24a (patch) | |
| tree | a7f43779191f41bc9d8413460c302ff25a67ab76 /java/broker | |
| parent | 4b1cc6b00ded3584ed2f11431845de09f195ed14 (diff) | |
| download | qpid-python-980643b9364f2ec7e75f9e4a391754f5db4bc24a.tar.gz | |
Refactoring updates (job queue changes, enqueue collections..)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@660490 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
12 files changed, 384 insertions, 156 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 441f88b9b6..d1bea3410b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -50,6 +50,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Collection; import java.util.concurrent.CopyOnWriteArrayList; /** @@ -248,8 +249,10 @@ public class HeadersExchange extends AbstractExchange _logger.debug("Exchange " + getName() + ": routing message with headers " + headers); } boolean routed = false; + Collection<AMQQueue> queues = new ArrayList<AMQQueue>(); for (Registration e : _bindings) { + if (e.binding.matches(headers)) { if (_logger.isDebugEnabled()) @@ -257,10 +260,12 @@ public class HeadersExchange extends AbstractExchange _logger.debug("Exchange " + getName() + ": delivering message with headers " + headers + " to " + e.queue.getName()); } - payload.enqueue(e.queue); + queues.add(e.queue); + routed = true; } } + payload.enqueue(queues); } public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index 8d3110ef18..d07501a188 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -262,8 +262,24 @@ public class TopicExchange extends AbstractExchange _filteredQueues.put(queue,newFilters); } - public Set<AMQQueue> processMessage(IncomingMessage msg, Set<AMQQueue> queues) + public Collection<AMQQueue> processMessage(IncomingMessage msg, Collection<AMQQueue> queues) { + if(queues == null) + { + if(_filteredQueues.isEmpty()) + { + return new ArrayList<AMQQueue>(_unfilteredQueues.keySet()); + } + else + { + queues = new HashSet<AMQQueue>(); + } + } + else if(!(queues instanceof Set)) + { + queues = new HashSet<AMQQueue>(queues); + } + queues.addAll(_unfilteredQueues.keySet()); if(!_filteredQueues.isEmpty()) { @@ -621,11 +637,11 @@ public class TopicExchange extends AbstractExchange } else { - Set<AMQQueue> queues = new HashSet<AMQQueue>(); + Collection<AMQQueue> queues = results.size() == 1 ? null : new HashSet<AMQQueue>(); for(TopicMatcherResult result : results) { - ((TopicExchangeResult)result).processMessage(message, queues); + queues = ((TopicExchangeResult)result).processMessage(message, queues); } return queues; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 0a6bfb15e6..bdb16d0fcb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -444,7 +444,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable public boolean channelAwaitingClosure(int channelId) { - return _closingChannelsList.contains(channelId); + return !_closingChannelsList.isEmpty() && _closingChannelsList.contains(channelId); } public void addChannel(AMQChannel channel) throws AMQException diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 05533e0d2d..0e5e7aa68c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -136,6 +136,11 @@ public class AMQMessage implements Filterable<AMQException> } } + public void clearStoreContext() + { + _storeContext = new StoreContext(); + } + public StoreContext getStoreContext() { return _storeContext; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 780cd49834..570bd97a28 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -58,15 +58,13 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> void unregisterSubscription(final Subscription subscription) throws AMQException; + int getConsumerCount(); int getActiveConsumerCount(); boolean isUnused(); - - - boolean isEmpty(); int getMessageCount(); @@ -80,10 +78,27 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> long getOldestMessageArrivalTime(); - boolean isDeleted(); + int delete() throws AMQException; + + + QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException; + + void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException; + + void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException; + + + + boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException; + + + + void addQueueDeleteTask(final Task task); + + List<QueueEntry> getMessagesOnTheQueue(); List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId); @@ -91,7 +106,6 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> QueueEntry getMessageOnTheQueue(long messageId); - void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext); @@ -99,9 +113,7 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext); - void quiesce(); - void start(); long getMaximumMessageSize(); @@ -132,27 +144,14 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> - int delete() throws AMQException; - - - QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException; - - void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException; - - void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException; - - void deliverAsync(); - - void addQueueDeleteTask(final Task task); - - boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException; - void removeExpiredIfNoSubscribers() throws AMQException; Set<NotificationCheck> getNotificationChecks(); void flushSubscription(final Subscription sub) throws AMQException; + void deliverAsync(final Subscription sub); + /** * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 68b429efc6..81c8c04d6d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -32,10 +32,8 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.AMQException; -import org.apache.qpid.common.ClientProperties; import org.apache.log4j.Logger; -import java.util.ArrayList; import java.util.Collection; public class IncomingMessage implements Filterable<RuntimeException> @@ -198,19 +196,53 @@ public class IncomingMessage implements Filterable<RuntimeException> } else { + int offset; + final int queueCount = destinationQueues.size(); + if(queueCount == 1) + { + offset = 0; + } + else + { + offset = ((int)(message.getMessageId().longValue())) % queueCount; + if(offset < 0) + { + offset = -offset; + } + } + + int i = 0; for (AMQQueue q : destinationQueues) { - // Increment the references to this message for each queue delivery. - message.incrementReference(); - // normal deliver so add this message at the end. - _txnContext.deliver(q, message); + if(++i > offset) + { + // Increment the references to this message for each queue delivery. + message.incrementReference(); + // normal deliver so add this message at the end. + _txnContext.deliver(q, message); + } } + i = 0; + if(offset != 0) + { + for (AMQQueue q : destinationQueues) + { + if(i++ < offset) + { + // Increment the references to this message for each queue delivery. + message.incrementReference(); + // normal deliver so add this message at the end. + _txnContext.deliver(q, message); + } + } + } + } // we then allow the transactional context to do something with the message content // now that it has all been received, before we attempt delivery _txnContext.messageFullyReceived(isPersistent()); - + message.clearStoreContext(); return message; } finally @@ -257,16 +289,6 @@ public class IncomingMessage implements Filterable<RuntimeException> return _messagePublishInfo.isImmediate(); } - - public void enqueue(final AMQQueue q) throws AMQException - { - if(_destinationQueues == null) - { - _destinationQueues = new ArrayList<AMQQueue>(); - } - _destinationQueues.add(q); - } - public ContentHeaderBody getContentHeaderBody() { return _contentHeaderBody; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index fea1db97b3..d26d6af7b2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -42,7 +42,7 @@ public class QueueEntryImpl implements QueueEntry private final SimpleQueueEntryList _queueEntryList; - private final AMQMessage _message; + private AMQMessage _message; private Set<Subscription> _rejectedBy = null; @@ -376,7 +376,7 @@ public class QueueEntryImpl implements QueueEntry if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE)) { - _queueEntryList.advanceHead(); + _queueEntryList.advanceHead(); return true; } else diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 16d24e74ee..5baf48245c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -11,6 +11,8 @@ import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.AMQException; +import org.apache.qpid.pool.ReadWriteRunnable; +import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.configuration.Configured; import org.apache.log4j.Logger; @@ -21,8 +23,6 @@ import java.util.ArrayList; import java.util.EnumSet; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -84,7 +84,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener protected final SubscriptionList _subscriptionList = new SubscriptionList(this); private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead()); - private boolean _exclusiveSubscriber; + private volatile Subscription _exclusiveSubscriber; private final QueueEntryList _entries; @@ -116,9 +116,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener @Configured(path = "minimumAlertRepeatGap", defaultValue = "0") public long _minimumAlertRepeatGap; + + + private static final int MAX_ASYNC_DELIVERIES = 10; + + private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class); + private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE); private AtomicReference _asynchronousRunner = new AtomicReference(null); private AtomicInteger _deliveredMessages = new AtomicInteger(); @@ -155,7 +161,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _virtualHost = virtualHost; _entries = entryListFactory.createQueueEntryList(this); - _asyncDelivery = AsyncDeliveryConfig.getAsyncDeliveryExecutor(); + _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); + + AsyncDeliveryConfig.getAsyncDeliveryExecutor(); try { @@ -235,11 +243,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // ------ Manage Subscriptions - public void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException + public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException { - if(_exclusiveSubscriber) + if(isExclusiveSubscriber()) { throw new ExistingExclusiveSubscription(); } @@ -249,7 +257,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throw new ExistingSubscriptionPreventsExclusive(); } - _exclusiveSubscriber = exclusive; + setExclusiveSubscriber(subscription); _activeSubscriberCount.incrementAndGet(); subscription.setStateListener(this); @@ -274,7 +282,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - public void unregisterSubscription(final Subscription subscription) throws AMQException + public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException { if(subscription == null) { @@ -289,9 +297,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { subscription.close(); // No longer can the queue have an exclusive consumer - _exclusiveSubscriber = false; + setExclusiveSubscriber(null); + QueueEntry lastSeen; + + while((lastSeen = subscription.getLastSeenEntry()) != null) + { + subscription.setLastSeenEntry(lastSeen, null); + } + @@ -329,83 +344,84 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _totalMessagesReceived.incrementAndGet(); - QueueEntry entry = _entries.add(message); + QueueEntry entry; + Subscription exclusiveSub = _exclusiveSubscriber; + if(exclusiveSub != null) + { + exclusiveSub.getSendLock(); - /* + try + { + entry = _entries.add(message); + deliverToSubscription(exclusiveSub, entry); - iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message - */ - SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get(); - SubscriptionList.SubscriptionNode nextNode = node.getNext(); - if(nextNode == null) - { - nextNode = _subscriptionList.getHead().getNext(); - } - while(nextNode != null) - { - if(_lastSubscriptionNode.compareAndSet(node, nextNode)) - { - break; - } - else - { - node = _lastSubscriptionNode.get(); - nextNode = node.getNext(); - if(nextNode == null) + // where there is more than one producer there's a reasonable chance that even though there is + // no "queueing" we do not deliver because we get an interleving of _entries.add and + // deliverToSubscription between threads. Therefore have one more try. + if(!(entry.isAcquired() || entry.isDeleted())) { - nextNode = _subscriptionList.getHead().getNext(); + deliverToSubscription(exclusiveSub, entry); } } + finally + { + exclusiveSub.releaseSendLock(); + } } + else + { + entry = _entries.add(message); + /* + iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message - // always do one extra loop after we believe we've finished - // this catches the case where we *just* miss an update - int loops = 2; - - while(!entry.isAcquired() && loops != 0) - { + */ + SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get(); + SubscriptionList.SubscriptionNode nextNode = node.getNext(); if(nextNode == null) { - loops--; - nextNode = _subscriptionList.getHead(); + nextNode = _subscriptionList.getHead().getNext(); } - else + while(nextNode != null) { - // if subscription at end, and active, offer - Subscription sub = nextNode.getSubscription(); - synchronized(sub.getSendLock()) + if(_lastSubscriptionNode.compareAndSet(node, nextNode)) { - if(subscriptionReadyAndHasInterest(sub, entry) - && !sub.isSuspended() - && sub.isActive()) + break; + } + else + { + node = _lastSubscriptionNode.get(); + nextNode = node.getNext(); + if(nextNode == null) { - if( !sub.wouldSuspend(entry)) - { - if(!sub.isBrowser() && !entry.acquire(sub)) - { - sub.restoreCredit(entry); - } - else - { - QueueEntry queueEntryNode = sub.getLastSeenEntry(); - if(_entries.next(queueEntryNode) == entry) - { - sub.setLastSeenEntry(queueEntryNode,entry); - } - - deliverMessage(sub, entry); - - } - } + nextNode = _subscriptionList.getHead().getNext(); } } } - nextNode = nextNode.getNext(); - } + // always do one extra loop after we believe we've finished + // this catches the case where we *just* miss an update + int loops = 2; + + while(!entry.isAcquired() && loops != 0) + { + if(nextNode == null) + { + loops--; + nextNode = _subscriptionList.getHead(); + } + else + { + // if subscription at end, and active, offer + Subscription sub = nextNode.getSubscription(); + deliverToSubscription(sub, entry); + } + nextNode = nextNode.getNext(); + + } + } if(entry.immediateAndNotDelivered()) @@ -413,7 +429,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener dequeue(storeContext, entry); entry.dispose(storeContext); } - else if(!entry.isAcquired()) + else if(!(entry.isAcquired() || entry.isDeleted())) { checkSubscriptionsNotAheadOfDelivery(entry); @@ -435,6 +451,42 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + private void deliverToSubscription(final Subscription sub, final QueueEntry entry) + throws AMQException + { + sub.getSendLock(); + try + { + if(subscriptionReadyAndHasInterest(sub, entry) + && !sub.isSuspended() + && sub.isActive()) + { + if( !sub.wouldSuspend(entry)) + { + if(!sub.isBrowser() && !entry.acquire(sub)) + { + sub.restoreCredit(entry); + } + else + { + QueueEntry queueEntryNode = sub.getLastSeenEntry(); + if(_entries.next(queueEntryNode) == entry) + { + sub.setLastSeenEntry(queueEntryNode,entry); + } + + deliverMessage(sub, entry); + + } + } + } + } + finally + { + sub.releaseSendLock(); + } + } + protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) { // This method is only required for queues which mess with ordering @@ -588,7 +640,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message entry to resend and move back the subscription pointer. */ - synchronized(subscription.getSendLock()) + subscription.getSendLock(); + try { if(!subscription.isClosed()) { @@ -600,6 +653,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return false; } } + finally + { + subscription.releaseSendLock(); + } } @@ -703,7 +760,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _activeSubscriberCount.incrementAndGet(); } - deliverAsync(); + deliverAsync(sub); } } @@ -722,6 +779,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return _atomicQueueSize; } + private boolean isExclusiveSubscriber() + { + return _exclusiveSubscriber != null; + } + + private void setExclusiveSubscriber(Subscription exclusiveSubscriber) + { + _exclusiveSubscriber = exclusiveSubscriber; + } + public static interface QueueEntryFilter { public boolean accept(QueueEntry entry); @@ -999,22 +1066,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - public void quiesce() - { - _quiesced.set(true); - } - - public void start() - { - if(_quiesced.compareAndSet(true,false)) - { - deliverAsync(); - } - } - - - - // ------ Management functions @@ -1088,6 +1139,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } _deleteTaskList.clear(); + ReferenceCountingExecutorService.getInstance().releaseExecutorService(); } return getMessageCount(); @@ -1098,13 +1150,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { _stateChangeCount.incrementAndGet(); - if(_asynchronousRunner.get() == null) - { - _asyncDelivery.execute(new Runner()); + Runner runner = new Runner(); + + if(_asynchronousRunner.compareAndSet(null,runner)) + { + _asyncDelivery.execute(runner); } } - private class Runner implements Runnable + public void deliverAsync(Subscription sub) + { + _asyncDelivery.execute(new SubFlushRunner(sub)); + } + + private class Runner implements ReadWriteRunnable { public void run() { @@ -1118,21 +1177,77 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + + public boolean isRead() + { + return false; + } + + public boolean isWrite() + { + return true; + } + } + + + private class SubFlushRunner implements ReadWriteRunnable + { + private final Subscription _sub; + + + public SubFlushRunner(Subscription sub) + { + _sub = sub; + } + + public void run() + { + boolean complete = false; + try + { + complete = flushSubscription(_sub, MAX_ASYNC_DELIVERIES); + + } + catch (AMQException e) + { + _logger.error(e); + } + if(!complete && !_sub.isSuspended()) + { + _asyncDelivery.execute(this); + } + + } + + public boolean isRead() + { + return false; + } + + public boolean isWrite() + { + return true; + } } public void flushSubscription(Subscription sub) throws AMQException { + flushSubscription(sub, Long.MAX_VALUE); + } + + public boolean flushSubscription(Subscription sub, long deliveries) throws AMQException + { boolean atTail = false; - while(sub.isActive() && !atTail) + + while(!sub.isSuspended() && !atTail && deliveries != 0) { - synchronized(sub.getSendLock()) + sub.getSendLock(); + try { if(sub.isActive()) { - QueueEntry node = moveSubscriptionToNextNode(sub); - if(!(node.isAcquired() || node.isDeleted())) { if(!sub.isSuspended()) @@ -1148,6 +1263,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } else { + deliveries--; deliverMessage(sub, node); if(sub.isBrowser()) @@ -1159,8 +1275,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener sub.setLastSeenEntry(node, newNode); node = sub.getLastSeenEntry(); } - - } } @@ -1180,13 +1294,36 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } } + } atTail = (_entries.next(node) == null); } } + finally + { + sub.releaseSendLock(); + } } + + if(!isExclusiveSubscriber()) + { + advanceAllSubscriptions(); + } + + return atTail; + } + + protected void advanceAllSubscriptions() throws AMQException + { + SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); + while(subscriberIter.advance()) + { + SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode(); + Subscription sub = subNode.getSubscription(); + moveSubscriptionToNextNode(sub); + } } private QueueEntry moveSubscriptionToNextNode(final Subscription sub) @@ -1227,8 +1364,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener boolean deliveryIncomplete = true; int extraLoops = 1; + int deliveries = MAX_ASYNC_DELIVERIES; + + _asynchronousRunner.compareAndSet(runner,null); - while(((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete ) && _asynchronousRunner.compareAndSet(null,runner)) + while(deliveries != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete ) && _asynchronousRunner.compareAndSet(null,runner)) { // we want to have one extra loop after the every subscription has reached the point where it cannot move // further, just in case the advance of one subscription in the last loop allows a different subscription to @@ -1251,17 +1391,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener Subscription sub = subscriptionIter.getNode().getSubscription(); if(sub != null) { - synchronized(sub.getSendLock()) + sub.getSendLock(); + try { + QueueEntry node = moveSubscriptionToNextNode(sub); + if(sub.isActive()) { boolean advanced = false; + boolean subActive = false; - QueueEntry node = moveSubscriptionToNextNode(sub); if(!(node.isAcquired() || node.isDeleted())) { if(!sub.isSuspended()) { + subActive = true; if(sub.hasInterest(node)) { if(!sub.wouldSuspend(node)) @@ -1274,6 +1418,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener else { deliverMessage(sub, node); + deliveries--; if(sub.isBrowser()) { @@ -1309,7 +1454,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } final boolean atTail = (_entries.next(node) == null); - done = done && atTail; + done = done && (!subActive || atTail); if(atTail && !advanced && sub.isAutoClose()) { @@ -1322,6 +1467,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + finally + { + sub.releaseSendLock(); + } } if(done) { @@ -1346,6 +1495,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + if(deliveries == 0 && _asynchronousRunner.compareAndSet(null,runner)) + { + _asyncDelivery.execute(runner); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index acbeae0f40..537966e3aa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -28,7 +28,6 @@ import org.apache.qpid.server.queue.QueueEntry; public interface Subscription { - boolean isActive(); public static enum State @@ -75,7 +74,7 @@ public interface Subscription boolean wouldSuspend(QueueEntry msg); Object getSendLock(); - + void releaseSendLock(); void resend(final QueueEntry entry) throws AMQException; @@ -87,4 +86,9 @@ public interface Subscription boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue); + + boolean isActive(); + + + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 653c7de514..8e124c8b0c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -22,6 +22,9 @@ package org.apache.qpid.server.subscription; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.Lock; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -62,6 +65,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private final RecordDeliveryMethod _recordMethod; private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); + private final Lock _stateChangeLock; + private final Lock _stateChangeExclusiveLock; + static final class BrowserSubscription extends SubscriptionImpl @@ -254,7 +260,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); private AMQQueue _queue; - private final AtomicBoolean _sendLock = new AtomicBoolean(false); + private final AtomicBoolean _deleted = new AtomicBoolean(false); + @@ -280,7 +287,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage _deliveryMethod = deliveryMethod; _recordMethod = recordMethod; - + ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + _stateChangeLock = readWriteLock.readLock(); + _stateChangeExclusiveLock = readWriteLock.writeLock(); if (arguments != null) { @@ -334,7 +343,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public boolean isSuspended() { - return !isActive() || _channel.isSuspended() || _sendLock.get(); + return !isActive() || _channel.isSuspended() || _deleted.get(); } /** @@ -344,7 +353,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage */ public void queueDeleted(AMQQueue queue) { - _sendLock.set(true); + _deleted.set(true); // _channel.queueDeleted(queue); } @@ -435,7 +444,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage { boolean closed = false; State state = getState(); - synchronized (_sendLock) + + _stateChangeExclusiveLock.lock(); + try { while(!closed && state != State.CLOSED) { @@ -451,6 +462,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } _creditManager.removeListener(this); } + finally + { + _stateChangeExclusiveLock.unlock(); + } + if (closed) { @@ -481,7 +497,13 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public Object getSendLock() { - return _sendLock; + _stateChangeLock.lock(); + return _deleted; + } + + public void releaseSendLock() + { + _stateChangeLock.unlock(); } public void resend(final QueueEntry entry) throws AMQException diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index 9924d1c770..ca614e053a 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -44,6 +44,7 @@ import org.apache.mina.common.ByteBuffer; import javax.management.Notification; import java.util.LinkedList; +import java.util.Collections; /** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */ public class AMQQueueAlertTest extends TestCase @@ -303,7 +304,7 @@ public class AMQQueueAlertTest extends TestCase for (int i = 0; i < messages.length; i++) { messages[i] = message(false, size); - messages[i].enqueue(_queue); + messages[i].enqueue(Collections.singleton(_queue)); messages[i].routingComplete(_messageStore, new MessageHandleFactory()); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 98f78e3d69..bf0a8a6d90 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -48,6 +48,7 @@ import org.apache.mina.common.ByteBuffer; import javax.management.JMException; import java.util.LinkedList; +import java.util.Collections; /** * Test class to test AMQQueueMBean attribtues and operations @@ -216,7 +217,7 @@ public class AMQQueueMBeanTest extends TestCase long id = msg.getMessageId(); _queue.clearQueue(_storeContext); - msg.enqueue(_queue); + msg.enqueue(Collections.singleton(_queue)); msg.routingComplete(_messageStore, new MessageHandleFactory()); msg.addContentBodyFrame(new ContentChunk() @@ -318,7 +319,7 @@ public class AMQQueueMBeanTest extends TestCase for (int i = 0; i < messageCount; i++) { IncomingMessage currentMessage = message(false, persistent); - currentMessage.enqueue(_queue); + currentMessage.enqueue(Collections.singleton(_queue)); // route header currentMessage.routingComplete(_messageStore, new MessageHandleFactory()); |
