summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-03-23 11:54:48 +0000
committerRobert Gemmell <robbie@apache.org>2010-03-23 11:54:48 +0000
commit166b7426164e35992d43d88b5c9e61c0482a41bd (patch)
tree21b625922ca3b49ab42203f8a0ae69239572dd5f /java
parentc97d5f6162578e161154aad8d2ca8d53c616ccbf (diff)
downloadqpid-python-166b7426164e35992d43d88b5c9e61c0482a41bd.tar.gz
QPID-2379: add the queue UnackedMessage counts
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@926530 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java30
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java33
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java15
6 files changed, 74 insertions, 15 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
index 54f1db845e..381c376f56 100644
--- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
+++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
@@ -1004,14 +1004,12 @@ public class QMFService implements ConfigStore.ConfigEventListener
public Long getUnackedMessages()
{
- // TODO
- return 0l;
+ return _obj.getUnackedMessageCount();
}
public Long getUnackedMessagesHigh()
{
- // TODO
- return 0l;
+ return _obj.getUnackedMessageCountHigh();
}
public Long getUnackedMessagesLow()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
index 95e2aa516b..4c9ec6619e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
@@ -75,6 +75,10 @@ public interface QueueConfig extends ConfiguredObject<QueueConfigType, QueueConf
long getPersistentMsgEnqueues();
long getPersistentMsgDequeues();
+
+ long getUnackedMessageCount();
+
+ long getUnackedMessageCountHigh();
void purge(long request);
} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index d3867d8140..45c84d7603 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -111,6 +111,7 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer
void dequeue(QueueEntry entry, Subscription sub);
+ void decrementUnackedMsgCount();
boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index bf4015eb7a..1ba4f4d89b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -226,22 +226,29 @@ public class QueueEntryImpl implements QueueEntry
public void release()
{
- _stateUpdater.set(this,AVAILABLE_STATE);
- if(!getQueue().isDeleted())
+ EntryState state = _state;
+
+ if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
{
- getQueue().requeue(this);
- if(_stateChangeListeners != null)
+ if(state instanceof SubscriptionAcquiredState)
{
- notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+ getQueue().decrementUnackedMsgCount();
}
+
+ if(!getQueue().isDeleted())
+ {
+ getQueue().requeue(this);
+ if(_stateChangeListeners != null)
+ {
+ notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+ }
+ }
+ else if(acquire())
+ {
+ routeToAlternate();
+ }
}
- else if(acquire())
- {
- routeToAlternate();
- }
-
-
}
public boolean releaseButRetain()
@@ -369,6 +376,7 @@ public class QueueEntryImpl implements QueueEntry
Subscription s = null;
if (state instanceof SubscriptionAcquiredState)
{
+ getQueue().decrementUnackedMsgCount();
s = ((SubscriptionAcquiredState) state).getSubscription();
s.onDequeue(this);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index b5d1290e98..cf2f637697 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -127,6 +127,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private final AtomicLong _byteTxnEnqueues = new AtomicLong(0);
private final AtomicLong _msgTxnDequeues = new AtomicLong(0);
private final AtomicLong _byteTxnDequeues = new AtomicLong(0);
+ private final AtomicLong _unackedMsgCount = new AtomicLong(0);
+ private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0);
private final AtomicInteger _bindingCountHigh = new AtomicInteger();
@@ -693,6 +695,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throws AMQException
{
_deliveredMessages.incrementAndGet();
+ incrementUnackedMsgCount();
+
sub.send(entry);
setLastSeenEntry(sub,entry);
@@ -2138,4 +2142,33 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
return String.valueOf(getNameShortString());
}
+
+ public long getUnackedMessageCountHigh()
+ {
+ return _unackedMsgCountHigh.get();
+ }
+
+ public long getUnackedMessageCount()
+ {
+ return _unackedMsgCount.get();
+ }
+
+ public void decrementUnackedMsgCount()
+ {
+ _unackedMsgCount.decrementAndGet();
+ }
+
+ private void incrementUnackedMsgCount()
+ {
+ long unackedMsgCount = _unackedMsgCount.incrementAndGet();
+
+ long unackedMsgCountHigh;
+ while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get()))
+ {
+ if(_unackedMsgCountHigh.compareAndSet(unackedMsgCountHigh, unackedMsgCount))
+ {
+ break;
+ }
+ }
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 7063beefca..dbd51af68c 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -572,4 +572,19 @@ public class MockAMQQueue implements AMQQueue
{
return 0;
}
+
+ public void decrementUnackedMsgCount()
+ {
+
+ }
+
+ public long getUnackedMessageCount()
+ {
+ return 0;
+ }
+
+ public long getUnackedMessageCountHigh()
+ {
+ return 0;
+ }
}