summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2012-06-13 14:13:27 +0000
committerTed Ross <tross@apache.org>2012-06-13 14:13:27 +0000
commit6669f46b2edc10980edc2d3d664262dcca9dd881 (patch)
tree9e40f10bea753856a229ddbc53fd2a8fc759fa6c
parente4a8c97e664c6293be8337d3bf54600b7e636aa1 (diff)
downloadqpid-python-6669f46b2edc10980edc2d3d664262dcca9dd881.tar.gz
QPID-4061 - Added statistic: Number of unacknowledged messages in a session
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1349865 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/SessionContext.h1
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java6
-rw-r--r--qpid/specs/management-schema.xml2
-rwxr-xr-xqpid/tools/src/py/qpid-stat4
6 files changed, 23 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 64924bdd4c..5786370598 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -142,6 +142,7 @@ bool SemanticState::cancel(const string& tag)
DeliveryRecords::iterator removed =
remove_if(unacked.begin(), unacked.end(), bind(&DeliveryRecord::isRedundant, _1));
unacked.erase(removed, unacked.end());
+ getSession().setUnackedCount(unacked.size());
return true;
} else {
return false;
@@ -270,6 +271,7 @@ void SemanticState::checkDtxTimeout()
void SemanticState::record(const DeliveryRecord& delivery)
{
unacked.push_back(delivery);
+ getSession().setUnackedCount(unacked.size());
}
const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
@@ -555,6 +557,7 @@ void SemanticState::recover(bool requeue)
//w.r.t id is lost
sort(unacked.begin(), unacked.end());
}
+ getSession().setUnackedCount(unacked.size());
}
void SemanticState::deliver(DeliveryRecord& msg, bool sync)
@@ -712,6 +715,7 @@ void SemanticState::release(DeliveryId first, DeliveryId last, bool setRedeliver
DeliveryRecords::iterator removed =
remove_if(range.start, range.end, bind(&DeliveryRecord::isRedundant, _1));
unacked.erase(removed, range.end);
+ getSession().setUnackedCount(unacked.size());
}
void SemanticState::reject(DeliveryId first, DeliveryId last)
@@ -723,6 +727,7 @@ void SemanticState::reject(DeliveryId first, DeliveryId last)
if (i->isRedundant()) i = unacked.erase(i);
else i++;
}
+ getSession().setUnackedCount(unacked.size());
}
bool SemanticState::ConsumerImpl::doOutput()
@@ -810,6 +815,7 @@ void SemanticState::accepted(const SequenceSet& commands) {
(TransactionContext*) 0)));
unacked.erase(removed, unacked.end());
}
+ getSession().setUnackedCount(unacked.size());
}
void SemanticState::completed(const SequenceSet& commands) {
@@ -819,6 +825,7 @@ void SemanticState::completed(const SequenceSet& commands) {
bind(&SemanticState::complete, this, _1)));
unacked.erase(removed, unacked.end());
requestDispatch();
+ getSession().setUnackedCount(unacked.size());
}
void SemanticState::attached()
diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h
index 253ce8dcf2..ee98da1878 100644
--- a/qpid/cpp/src/qpid/broker/SessionContext.h
+++ b/qpid/cpp/src/qpid/broker/SessionContext.h
@@ -47,6 +47,7 @@ class SessionContext : public OwnershipToken, public sys::OutputControl
virtual uint16_t getChannel() const = 0;
virtual const SessionId& getSessionId() const = 0;
virtual void addPendingExecutionSync() = 0;
+ virtual void setUnackedCount(uint64_t) {}
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index 8db232a2d6..a8ff7feff9 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -126,6 +126,11 @@ class SessionState : public qpid::SessionState,
// the SessionState of a received Execution.Sync command.
void addPendingExecutionSync();
+ void setUnackedCount(uint64_t count) {
+ if (mgmtObject)
+ mgmtObject->set_unackedMessages(count);
+ }
+
// Used to delay creation of management object for sessions
// belonging to inter-broker bridges
void addManagementObject();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
index fbaba7afed..84487d314f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
@@ -1741,6 +1741,12 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
return 0l;
}
+ public Long getUnackedMessages()
+ {
+ // TODO
+ return 0l;
+ }
+
public Long getTxnStarts()
{
return _obj.getTxnStarts();
diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml
index 69ca56f39c..ed0902bfc1 100644
--- a/qpid/specs/management-schema.xml
+++ b/qpid/specs/management-schema.xml
@@ -443,7 +443,7 @@
<property name="expireTime" type="absTime" access="RO" optional="y"/>
<property name="maxClientRate" type="uint32" access="RO" unit="msgs/sec" optional="y"/>
- <statistic name="framesOutstanding" type="count32"/>
+ <statistic name="unackedMessages" type="uint64" unit="message" desc="Unacknowledged messages in the session"/>
<statistic name="TxnStarts" type="count64" unit="transaction" desc="Total transactions started "/>
<statistic name="TxnCommits" type="count64" unit="transaction" desc="Total transactions committed"/>
diff --git a/qpid/tools/src/py/qpid-stat b/qpid/tools/src/py/qpid-stat
index 835f9da2f1..f92a70e664 100755
--- a/qpid/tools/src/py/qpid-stat
+++ b/qpid/tools/src/py/qpid-stat
@@ -427,7 +427,8 @@ class BrokerManager:
heads.append(Header("acked", Header.Y))
heads.append(Header("excl", Header.Y))
heads.append(Header("creditMode"))
- heads.append(Header("delivered", Header.KMG))
+ heads.append(Header("delivered", Header.COMMAS))
+ heads.append(Header("sessUnacked", Header.COMMAS))
rows = []
subscriptions = self.broker.getAllSubscriptions()
sessions = self.getSessionMap()
@@ -447,6 +448,7 @@ class BrokerManager:
row.append(s.exclusive)
row.append(s.creditMode)
row.append(s.delivered)
+ row.append(session.unackedMessages)
rows.append(row)
except:
pass