diff options
| author | Robert Gemmell <robbie@apache.org> | 2010-03-23 11:54:48 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2010-03-23 11:54:48 +0000 |
| commit | 166b7426164e35992d43d88b5c9e61c0482a41bd (patch) | |
| tree | 21b625922ca3b49ab42203f8a0ae69239572dd5f /java | |
| parent | c97d5f6162578e161154aad8d2ca8d53c616ccbf (diff) | |
| download | qpid-python-166b7426164e35992d43d88b5c9e61c0482a41bd.tar.gz | |
QPID-2379: add the queue UnackedMessage counts
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@926530 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
6 files changed, 74 insertions, 15 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index 54f1db845e..381c376f56 100644 --- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -1004,14 +1004,12 @@ public class QMFService implements ConfigStore.ConfigEventListener public Long getUnackedMessages() { - // TODO - return 0l; + return _obj.getUnackedMessageCount(); } public Long getUnackedMessagesHigh() { - // TODO - return 0l; + return _obj.getUnackedMessageCountHigh(); } public Long getUnackedMessagesLow() diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java index 95e2aa516b..4c9ec6619e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java @@ -75,6 +75,10 @@ public interface QueueConfig extends ConfiguredObject<QueueConfigType, QueueConf long getPersistentMsgEnqueues(); long getPersistentMsgDequeues(); + + long getUnackedMessageCount(); + + long getUnackedMessageCountHigh(); void purge(long request); }
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index d3867d8140..45c84d7603 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -111,6 +111,7 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer void dequeue(QueueEntry entry, Subscription sub); + void decrementUnackedMsgCount(); boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index bf4015eb7a..1ba4f4d89b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -226,22 +226,29 @@ public class QueueEntryImpl implements QueueEntry public void release() { - _stateUpdater.set(this,AVAILABLE_STATE); - if(!getQueue().isDeleted()) + EntryState state = _state; + + if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE)) { - getQueue().requeue(this); - if(_stateChangeListeners != null) + if(state instanceof SubscriptionAcquiredState) { - notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE); + getQueue().decrementUnackedMsgCount(); } + + if(!getQueue().isDeleted()) + { + getQueue().requeue(this); + if(_stateChangeListeners != null) + { + notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE); + } + } + else if(acquire()) + { + routeToAlternate(); + } } - else if(acquire()) - { - routeToAlternate(); - } - - } public boolean releaseButRetain() @@ -369,6 +376,7 @@ public class QueueEntryImpl implements QueueEntry Subscription s = null; if (state instanceof SubscriptionAcquiredState) { + getQueue().decrementUnackedMsgCount(); s = ((SubscriptionAcquiredState) state).getSubscription(); s.onDequeue(this); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index b5d1290e98..cf2f637697 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -127,6 +127,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private final AtomicLong _byteTxnEnqueues = new AtomicLong(0); private final AtomicLong _msgTxnDequeues = new AtomicLong(0); private final AtomicLong _byteTxnDequeues = new AtomicLong(0); + private final AtomicLong _unackedMsgCount = new AtomicLong(0); + private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0); private final AtomicInteger _bindingCountHigh = new AtomicInteger(); @@ -693,6 +695,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throws AMQException { _deliveredMessages.incrementAndGet(); + incrementUnackedMsgCount(); + sub.send(entry); setLastSeenEntry(sub,entry); @@ -2138,4 +2142,33 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return String.valueOf(getNameShortString()); } + + public long getUnackedMessageCountHigh() + { + return _unackedMsgCountHigh.get(); + } + + public long getUnackedMessageCount() + { + return _unackedMsgCount.get(); + } + + public void decrementUnackedMsgCount() + { + _unackedMsgCount.decrementAndGet(); + } + + private void incrementUnackedMsgCount() + { + long unackedMsgCount = _unackedMsgCount.incrementAndGet(); + + long unackedMsgCountHigh; + while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get())) + { + if(_unackedMsgCountHigh.compareAndSet(unackedMsgCountHigh, unackedMsgCount)) + { + break; + } + } + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 7063beefca..dbd51af68c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -572,4 +572,19 @@ public class MockAMQQueue implements AMQQueue { return 0; } + + public void decrementUnackedMsgCount() + { + + } + + public long getUnackedMessageCount() + { + return 0; + } + + public long getUnackedMessageCountHigh() + { + return 0; + } } |
