diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2008-07-15 11:42:55 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2008-07-15 11:42:55 +0000 |
| commit | 64d1a8e63e8a7f82837e8de095e5d23837835e65 (patch) | |
| tree | ac00371ac6952db725c45bff190897726c891515 /java | |
| parent | b5c255a61c9f10cffff11c3327b7561e91670800 (diff) | |
| download | qpid-python-64d1a8e63e8a7f82837e8de095e5d23837835e65.tar.gz | |
QPID-1172 : Moved unregistration out of the sendLock. Potential refactor possible between processQueue and flushSubscription
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@676887 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 414 |
1 files changed, 175 insertions, 239 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 247402e442..f06e3598a7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -82,16 +82,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private volatile Subscription _exclusiveSubscriber; - private final QueueEntryList _entries; - private final AMQQueueMBean _managedObject; private final Executor _asyncDelivery; private final AtomicLong _totalMessagesReceived = new AtomicLong(); - - /** max allowed size(KB) of a single message */ @Configured(path = "maximumMessageSize", defaultValue = "0") public long _maximumMessageSize; @@ -112,14 +108,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener @Configured(path = "minimumAlertRepeatGap", defaultValue = "0") public long _minimumAlertRepeatGap; - - private static final int MAX_ASYNC_DELIVERIES = 10; - private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class); - private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE); private AtomicReference _asynchronousRunner = new AtomicReference(null); private AtomicInteger _deliveredMessages = new AtomicInteger(); @@ -127,7 +119,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException { - this(name,durable,owner,autoDelete,virtualHost,new SimpleQueueEntryList.Factory()); + this(name, durable, owner, autoDelete, virtualHost, new SimpleQueueEntryList.Factory()); } protected SimpleAMQQueue(AMQShortString name, @@ -136,7 +128,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener boolean autoDelete, VirtualHost virtualHost, QueueEntryListFactory entryListFactory) - throws AMQException + throws AMQException { if (name == null) @@ -168,7 +160,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throw new AMQException("AMQQueue MBean creation has failed ", e); } - // This ensure that the notification checks for the configured alerts are created. setMaximumMessageAge(_maximumMessageAge); setMaximumMessageCount(_maximumMessageCount); @@ -204,7 +195,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return _virtualHost; } - // ------ bind and unbind public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException @@ -227,27 +217,25 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } boolean removed = _bindings.remove(routingKey, arguments, exchange); - if(!removed) + if (!removed) { _logger.error("Mismatch between queue bindings and exchange record of bindings"); } } - // ------ Manage Subscriptions public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException { - - if(isExclusiveSubscriber()) + if (isExclusiveSubscriber()) { throw new ExistingExclusiveSubscription(); } - if(exclusive) + if (exclusive) { - if(getConsumerCount() != 0) + if (getConsumerCount() != 0) { throw new ExistingSubscriptionPreventsExclusive(); } @@ -258,16 +246,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } - _activeSubscriberCount.incrementAndGet(); subscription.setStateListener(this); - subscription.setLastSeenEntry(null,_entries.getHead()); + subscription.setLastSeenEntry(null, _entries.getHead()); - if(!isDeleted()) + if (!isDeleted()) { subscription.setQueue(this); _subscriptionList.add(subscription); - if(isDeleted()) + if (isDeleted()) { subscription.queueDeleted(this); } @@ -277,39 +264,32 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // TODO } - deliverAsync(subscription); } public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException { - if(subscription == null) + if (subscription == null) { throw new NullPointerException("subscription argument is null"); } boolean removed = _subscriptionList.remove(subscription); - - - if(removed) + if (removed) { subscription.close(); // No longer can the queue have an exclusive consumer setExclusiveSubscriber(null); - QueueEntry lastSeen; - while((lastSeen = subscription.getLastSeenEntry()) != null) + while ((lastSeen = subscription.getLastSeenEntry()) != null) { subscription.setLastSeenEntry(lastSeen, null); } - - - // auto-delete queues must be deleted if there are no remaining subscribers if (_autoDelete && getConsumerCount() == 0) @@ -324,30 +304,25 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // we need to manually fire the event to the removed subscription (which was the last one left for this // queue. This is because the delete method uses the subscription set which has just been cleared subscription.queueDeleted(this); - } + } } - } - // ------ Enqueue / Dequeue public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException { - - incrementQueueCount(); incrementQueueSize(message); _totalMessagesReceived.incrementAndGet(); - - QueueEntry entry; + QueueEntry entry; Subscription exclusiveSub = _exclusiveSubscriber; - if(exclusiveSub != null) + if (exclusiveSub != null) { exclusiveSub.getSendLock(); @@ -357,11 +332,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener deliverToSubscription(exclusiveSub, entry); - // where there is more than one producer there's a reasonable chance that even though there is // no "queueing" we do not deliver because we get an interleving of _entries.add and // deliverToSubscription between threads. Therefore have one more try. - if(!(entry.isAcquired() || entry.isDeleted())) + if (!(entry.isAcquired() || entry.isDeleted())) { deliverToSubscription(exclusiveSub, entry); } @@ -381,13 +355,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener */ SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get(); SubscriptionList.SubscriptionNode nextNode = node.getNext(); - if(nextNode == null) + if (nextNode == null) { nextNode = _subscriptionList.getHead().getNext(); } - while(nextNode != null) + while (nextNode != null) { - if(_lastSubscriptionNode.compareAndSet(node, nextNode)) + if (_lastSubscriptionNode.compareAndSet(node, nextNode)) { break; } @@ -395,21 +369,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { node = _lastSubscriptionNode.get(); nextNode = node.getNext(); - if(nextNode == null) + if (nextNode == null) { nextNode = _subscriptionList.getHead().getNext(); } } } - // always do one extra loop after we believe we've finished // this catches the case where we *just* miss an update int loops = 2; - while(!(entry.isAcquired() || entry.isDeleted()) && loops != 0) + while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0) { - if(nextNode == null) + if (nextNode == null) { loops--; nextNode = _subscriptionList.getHead(); @@ -425,20 +398,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } - - if(entry.immediateAndNotDelivered()) + if (entry.immediateAndNotDelivered()) { dequeue(storeContext, entry); entry.dispose(storeContext); } - else if(!(entry.isAcquired() || entry.isDeleted())) + else if (!(entry.isAcquired() || entry.isDeleted())) { checkSubscriptionsNotAheadOfDelivery(entry); deliverAsync(); } - try { _managedObject.checkForNotification(entry.getMessage()); @@ -448,7 +419,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throw new AMQException("Unable to get notification from manage queue: " + e, e); } - return entry; } @@ -460,12 +430,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener sub.getSendLock(); try { - if(subscriptionReadyAndHasInterest(sub, entry) - && !sub.isSuspended()) + if (subscriptionReadyAndHasInterest(sub, entry) + && !sub.isSuspended()) { - if( !sub.wouldSuspend(entry)) + if (!sub.wouldSuspend(entry)) { - if(!sub.isBrowser() && !entry.acquire(sub)) + if (!sub.isBrowser() && !entry.acquire(sub)) { // restore credit here that would have been taken away by wouldSuspend since we didn't manage // to acquire the entry for this subscription @@ -516,11 +486,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // We need to move this subscription on, past entries which are already acquired, or deleted or ones it has no // interest in. QueueEntry node = sub.getLastSeenEntry(); - while(node != null && (node.isAcquired() || node.isDeleted() || !sub.hasInterest(node)) ) + while (node != null && (node.isAcquired() || node.isDeleted() || !sub.hasInterest(node))) { QueueEntry newNode = _entries.next(node); - if(newNode != null) + if (newNode != null) { sub.setLastSeenEntry(node, newNode); node = sub.getLastSeenEntry(); @@ -533,8 +503,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - - if(node == entry) + if (node == entry) { // If the first entry that subscription can process is the one we are trying to deliver to it, then we are // good @@ -555,11 +524,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { QueueEntry node = sub.getLastSeenEntry(); - if(node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry)) + if (node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry)) { do { - if(sub.setLastSeenEntry(node,entry)) + if (sub.setLastSeenEntry(node, entry)) { return; } @@ -567,7 +536,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { node = sub.getLastSeenEntry(); } - } while (node != null && entry.compareTo(node) < 0); + } + while (node != null && entry.compareTo(node) < 0); } } @@ -577,28 +547,26 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards - while(subscriberIter.advance()) + while (subscriberIter.advance()) { Subscription sub = subscriberIter.getNode().getSubscription(); // we don't make browsers send the same stuff twice - if(!sub.isBrowser()) + if (!sub.isBrowser()) { updateLastSeenEntry(sub, entry); } } - deliverAsync(); - } public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException { decrementQueueCount(); decrementQueueSize(entry); - if(entry.acquiredBySubscription()) + if (entry.acquiredBySubscription()) { _deliveredMessages.decrementAndGet(); } @@ -606,7 +574,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener try { AMQMessage msg = entry.getMessage(); - if(isDurable() && msg.isPersistent()) + if (isDurable() && msg.isPersistent()) { _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageId()); } @@ -626,7 +594,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throw new FailedDequeueException(_name.toString(), e); } - } private void decrementQueueSize(final QueueEntry entry) @@ -647,7 +614,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener subscription.getSendLock(); try { - if(!subscription.isClosed()) + if (!subscription.isClosed()) { deliverMessage(subscription, entry); return true; @@ -663,10 +630,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } - - - - public int getConsumerCount() { return _subscriptionList.size(); @@ -700,7 +663,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public int getUndeliveredMessageCount() { int count = getMessageCount() - _deliveredMessages.get(); - if(count < 0) + if (count < 0) { return 0; } @@ -710,7 +673,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } - public long getReceivedMessageCount() { return _totalMessagesReceived.get(); @@ -732,16 +694,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return _deleted.get(); } - - public List<QueueEntry> getMessagesOnTheQueue() { ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>(); QueueEntryIterator queueListIterator = _entries.iterator(); - while(queueListIterator.advance()) + while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - if(node != null && !node.isDeleted()) + if (node != null && !node.isDeleted()) { entryList.add(node); } @@ -752,14 +712,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState) { - if(oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE) + if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE) { _activeSubscriberCount.decrementAndGet(); } - else if(newState == Subscription.State.ACTIVE) - { - if(oldState != Subscription.State.ACTIVE) + else if (newState == Subscription.State.ACTIVE) + { + if (oldState != Subscription.State.ACTIVE) { _activeSubscriberCount.incrementAndGet(); @@ -800,55 +760,52 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public boolean filterComplete(); } - - public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId) { return getMessagesOnTheQueue(new QueueEntryFilter() - { + { - public boolean accept(QueueEntry entry) - { - final long messageId = entry.getMessage().getMessageId(); - return messageId >= fromMessageId && messageId <= toMessageId; - } + public boolean accept(QueueEntry entry) + { + final long messageId = entry.getMessage().getMessageId(); + return messageId >= fromMessageId && messageId <= toMessageId; + } - public boolean filterComplete() - { - return false; - } - }); + public boolean filterComplete() + { + return false; + } + }); } public QueueEntry getMessageOnTheQueue(final long messageId) { List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() - { - private boolean _complete; + { + private boolean _complete; - public boolean accept(QueueEntry entry) - { - _complete = entry.getMessage().getMessageId() == messageId; - return _complete; - } + public boolean accept(QueueEntry entry) + { + _complete = entry.getMessage().getMessageId() == messageId; + return _complete; + } - public boolean filterComplete() - { - return _complete; - } - }); + public boolean filterComplete() + { + return _complete; + } + }); return entries.isEmpty() ? null : entries.get(0); } - public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter) { ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>(); QueueEntryIterator queueListIterator = _entries.iterator(); - while(queueListIterator.advance() && !filter.filterComplete()) + while (queueListIterator.advance() && !filter.filterComplete()) { QueueEntry node = queueListIterator.getNode(); - if(!node.isDeleted() && filter.accept(node)) + if (!node.isDeleted() && filter.accept(node)) { entryList.add(node); } @@ -857,7 +814,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - public void moveMessagesToAnotherQueue(final long fromMessageId, final long toMessageId, String queueName, @@ -867,24 +823,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); MessageStore store = getVirtualHost().getMessageStore(); - List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() - { - - public boolean accept(QueueEntry entry) - { - final long messageId = entry.getMessage().getMessageId(); - return (messageId >= fromMessageId) - && (messageId <= toMessageId) - && entry.acquire(); - } + { - public boolean filterComplete() - { - return false; - } - }); + public boolean accept(QueueEntry entry) + { + final long messageId = entry.getMessage().getMessageId(); + return (messageId >= fromMessageId) + && (messageId <= toMessageId) + && entry.acquire(); + } + public boolean filterComplete() + { + return false; + } + }); try { @@ -895,7 +849,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { AMQMessage message = entry.getMessage(); - if(message.isPersistent() && toQueue.isDurable()) + if (message.isPersistent() && toQueue.isDurable()) { store.enqueueMessage(storeContext, toQueue, message.getMessageId()); } @@ -943,7 +897,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throw new RuntimeException(e); } - } public void copyMessagesToAnotherQueue(final long fromMessageId, @@ -954,30 +907,29 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); MessageStore store = getVirtualHost().getMessageStore(); - List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() - { + { - public boolean accept(QueueEntry entry) - { - final long messageId = entry.getMessage().getMessageId(); - if((messageId >= fromMessageId) - && (messageId <= toMessageId)) - { - if(!entry.isDeleted()) - { - return entry.getMessage().incrementReference(); - } - } + public boolean accept(QueueEntry entry) + { + final long messageId = entry.getMessage().getMessageId(); + if ((messageId >= fromMessageId) + && (messageId <= toMessageId)) + { + if (!entry.isDeleted()) + { + return entry.getMessage().incrementReference(); + } + } - return false; - } + return false; + } - public boolean filterComplete() - { - return false; - } - }); + public boolean filterComplete() + { + return false; + } + }); try { @@ -988,7 +940,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { AMQMessage message = entry.getMessage(); - if(message.isReferenced() && message.isPersistent() && toQueue.isDurable()) + if (message.isReferenced() && message.isPersistent() && toQueue.isDurable()) { store.enqueueMessage(storeContext, toQueue, message.getMessageId()); } @@ -1021,7 +973,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { for (QueueEntry entry : entries) { - if(entry.getMessage().isReferenced()) + if (entry.getMessage().isReferenced()) { toQueue.enqueue(storeContext, entry.getMessage()); } @@ -1036,7 +988,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throw new RuntimeException(e); } - } public void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext) @@ -1046,17 +997,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { QueueEntryIterator queueListIterator = _entries.iterator(); - - while(queueListIterator.advance()) + while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); final long messageId = node.getMessage().getMessageId(); - if((messageId >= fromMessageId) - && (messageId <= toMessageId) - && !node.isDeleted() - && node.acquire()) + if ((messageId >= fromMessageId) + && (messageId <= toMessageId) + && !node.isDeleted() + && node.acquire()) { node.discard(storeContext); } @@ -1072,16 +1022,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // ------ Management functions - public void deleteMessageFromTop(StoreContext storeContext) throws AMQException { QueueEntryIterator queueListIterator = _entries.iterator(); boolean noDeletes = true; - while(noDeletes && queueListIterator.advance() ) + while (noDeletes && queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - if(!node.isDeleted() && node.acquire()) + if (!node.isDeleted() && node.acquire()) { node.discard(storeContext); noDeletes = false; @@ -1096,10 +1045,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntryIterator queueListIterator = _entries.iterator(); long count = 0; - while(queueListIterator.advance()) + while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - if(!node.isDeleted() && node.acquire()) + if (!node.isDeleted() && node.acquire()) { node.discard(storeContext); count++; @@ -1110,7 +1059,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - public void addQueueDeleteTask(final Task task) { _deleteTaskList.add(task); @@ -1126,7 +1074,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (subscriptionIter.advance()) { Subscription s = subscriptionIter.getNode().getSubscription(); - if(s != null) + if (s != null) { s.queueDeleted(this); } @@ -1135,7 +1083,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _bindings.deregister(); _virtualHost.getQueueRegistry().unregisterQueue(_name); - _managedObject.unregister(); for (Task task : _deleteTaskList) { @@ -1149,15 +1096,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - public void deliverAsync() { _stateChangeCount.incrementAndGet(); Runner runner = new Runner(); - if(_asynchronousRunner.compareAndSet(null,runner)) - { + if (_asynchronousRunner.compareAndSet(null, runner)) + { _asyncDelivery.execute(runner); } } @@ -1193,12 +1139,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } - private class SubFlushRunner implements ReadWriteRunnable { private final Subscription _sub; - public SubFlushRunner(Subscription sub) { _sub = sub; @@ -1216,7 +1160,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { _logger.error(e); } - if(!complete && !_sub.isSuspended()) + if (!complete && !_sub.isSuspended()) { _asyncDelivery.execute(this); } @@ -1244,25 +1188,25 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener boolean atTail = false; boolean advanced; - while(!sub.isSuspended() && !atTail && deliveries != 0) + while (!sub.isSuspended() && !atTail && deliveries != 0) { advanced = false; sub.getSendLock(); try { - if(sub.isActive()) + if (sub.isActive()) { QueueEntry node = moveSubscriptionToNextNode(sub); - if(!(node.isAcquired() || node.isDeleted())) + if (!(node.isAcquired() || node.isDeleted())) { - if(!sub.isSuspended()) + if (!sub.isSuspended()) { - if(sub.hasInterest(node)) + if (sub.hasInterest(node)) { - if(!sub.wouldSuspend(node)) + if (!sub.wouldSuspend(node)) { - if(!sub.isBrowser() && !node.acquire(sub)) + if (!sub.isBrowser() && !node.acquire(sub)) { sub.restoreCredit(node); @@ -1272,11 +1216,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener deliveries--; deliverMessage(sub, node); - if(sub.isBrowser()) + if (sub.isBrowser()) { QueueEntry newNode = _entries.next(node); - if(newNode != null) + if (newNode != null) { advanced = true; sub.setLastSeenEntry(node, newNode); @@ -1295,7 +1239,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { // this subscription is not interested in this node so we can skip over it QueueEntry newNode = _entries.next(node); - if(newNode != null) + if (newNode != null) { sub.setLastSeenEntry(node, newNode); } @@ -1318,12 +1262,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // next entry they are interested in yet. This would lead to holding on to references to expired messages, etc // which would give us memory "leak". - if(!isExclusiveSubscriber()) + if (!isExclusiveSubscriber()) { advanceAllSubscriptions(); } - if(atTail && sub.isAutoClose()) + if (atTail && sub.isAutoClose()) { unregisterSubscription(sub); @@ -1337,7 +1281,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener protected void advanceAllSubscriptions() throws AMQException { SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); - while(subscriberIter.advance()) + while (subscriberIter.advance()) { SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode(); Subscription sub = subNode.getSubscription(); @@ -1349,19 +1293,19 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throws AMQException { QueueEntry node = sub.getLastSeenEntry(); - - while(node != null && (node.isAcquired() || node.isDeleted() || node.expired())) + + while (node != null && (node.isAcquired() || node.isDeleted() || node.expired())) { - if(!node.isAcquired() && !node.isDeleted() && node.expired()) + if (!node.isAcquired() && !node.isDeleted() && node.expired()) { - if(node.acquire()) + if (node.acquire()) { final StoreContext reapingStoreContext = new StoreContext(); node.discard(reapingStoreContext); } } QueueEntry newNode = _entries.next(node); - if(newNode != null) + if (newNode != null) { sub.setLastSeenEntry(node, newNode); node = sub.getLastSeenEntry(); @@ -1375,7 +1319,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return node; } - private void processQueue(Runnable runner) throws AMQException { long stateChangeCount; @@ -1385,51 +1328,51 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener int extraLoops = 1; int deliveries = MAX_ASYNC_DELIVERIES; - _asynchronousRunner.compareAndSet(runner,null); + _asynchronousRunner.compareAndSet(runner, null); - while(deliveries != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete ) && _asynchronousRunner.compareAndSet(null,runner)) + while (deliveries != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner)) { // we want to have one extra loop after every subscription has reached the point where it cannot move // further, just in case the advance of one subscription in the last loop allows a different subscription to // move forward in the next iteration - if(previousStateChangeCount != stateChangeCount) + if (previousStateChangeCount != stateChangeCount) { extraLoops = 1; } - + previousStateChangeCount = stateChangeCount; deliveryIncomplete = _subscriptionList.size() != 0; boolean done = true; - SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); //iterate over the subscribers and try to advance their pointer - while(subscriptionIter.advance()) + while (subscriptionIter.advance()) { + boolean closeConsumer = false; Subscription sub = subscriptionIter.getNode().getSubscription(); - if(sub != null) + if (sub != null) { sub.getSendLock(); try { QueueEntry node = moveSubscriptionToNextNode(sub); - if(node != null && sub.isActive()) + if (node != null && sub.isActive()) { boolean advanced = false; boolean subActive = false; - if(!(node.isAcquired() || node.isDeleted())) + if (!(node.isAcquired() || node.isDeleted())) { - if(!sub.isSuspended()) + if (!sub.isSuspended()) { subActive = true; - if(sub.hasInterest(node)) + if (sub.hasInterest(node)) { - if(!sub.wouldSuspend(node)) + if (!sub.wouldSuspend(node)) { - if(!sub.isBrowser() && !node.acquire(sub)) + if (!sub.isBrowser() && !node.acquire(sub)) { sub.restoreCredit(node); @@ -1439,32 +1382,31 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener deliverMessage(sub, node); deliveries--; - if(sub.isBrowser()) + if (sub.isBrowser()) { QueueEntry newNode = _entries.next(node); - if(newNode != null) + if (newNode != null) { sub.setLastSeenEntry(node, newNode); node = sub.getLastSeenEntry(); advanced = true; } - } } done = false; } else { - node.addStateChangeListener(new QueueEntryListener(sub,node)); + node.addStateChangeListener(new QueueEntryListener(sub, node)); } } else { // this subscription is not interested in this node so we can skip over it QueueEntry newNode = _entries.next(node); - if(newNode != null) + if (newNode != null) { sub.setLastSeenEntry(node, newNode); } @@ -1475,25 +1417,26 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener done = done && (!subActive || atTail); - if(atTail && !advanced && sub.isAutoClose()) - { - unregisterSubscription(sub); - - ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); - converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); - - } - + closeConsumer = (atTail && !advanced && sub.isAutoClose()); } } finally { sub.releaseSendLock(); } + + if (closeConsumer) + { + unregisterSubscription(sub); + + ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); + converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); + } + } - if(done) + if (done) { - if(extraLoops == 0) + if (extraLoops == 0) { deliveryIncomplete = false; } @@ -1508,20 +1451,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } - - _asynchronousRunner.set(null); } // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit // therefore we should schedule this runner again (unless someone beats us to it :-) ). - if(deliveries == 0 && _asynchronousRunner.compareAndSet(null,runner)) + if (deliveries == 0 && _asynchronousRunner.compareAndSet(null, runner)) { _asyncDelivery.execute(runner); } } - public void removeExpiredIfNoSubscribers() throws AMQException { @@ -1529,10 +1469,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntryIterator queueListIterator = _entries.iterator(); - while(queueListIterator.advance()) + while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - if(!node.isDeleted() && node.expired() && node.acquire()) + if (!node.isDeleted() && node.expired() && node.acquire()) { node.discard(storeContext); @@ -1542,7 +1482,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - public long getMinimumAlertRepeatGap() { return _minimumAlertRepeatGap; @@ -1561,7 +1500,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void setMaximumMessageAge(long maximumMessageAge) { _maximumMessageAge = maximumMessageAge; - if(maximumMessageAge == 0L) + if (maximumMessageAge == 0L) { _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT); } @@ -1579,7 +1518,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void setMaximumMessageCount(final long maximumMessageCount) { _maximumMessageCount = maximumMessageCount; - if(maximumMessageCount == 0L) + if (maximumMessageCount == 0L) { _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT); } @@ -1588,8 +1527,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT); } - - } public long getMaximumQueueDepth() @@ -1601,7 +1538,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void setMaximumQueueDepth(final long maximumQueueDepth) { _maximumQueueDepth = maximumQueueDepth; - if(maximumQueueDepth == 0L) + if (maximumQueueDepth == 0L) { _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT); } @@ -1620,7 +1557,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void setMaximumMessageSize(final long maximumMessageSize) { _maximumMessageSize = maximumMessageSize; - if(maximumMessageSize == 0L) + if (maximumMessageSize == 0L) { _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT); } @@ -1630,7 +1567,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } - public Set<NotificationCheck> getNotificationChecks() { return _notificationChecks; @@ -1654,7 +1590,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public boolean equals(Object o) { - return _entry == ((QueueEntryListener)o)._entry && _sub == ((QueueEntryListener)o)._sub; + return _entry == ((QueueEntryListener) o)._entry && _sub == ((QueueEntryListener) o)._sub; } public int hashCode() |
