summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2012-06-13 14:13:27 +0000
committerTed Ross <tross@apache.org>2012-06-13 14:13:27 +0000
commit6669f46b2edc10980edc2d3d664262dcca9dd881 (patch)
tree9e40f10bea753856a229ddbc53fd2a8fc759fa6c /qpid/cpp/src
parente4a8c97e664c6293be8337d3bf54600b7e636aa1 (diff)
downloadqpid-python-6669f46b2edc10980edc2d3d664262dcca9dd881.tar.gz
QPID-4061 - Added statistic: Number of unacknowledged messages in a session
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1349865 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/SessionContext.h1
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h5
3 files changed, 13 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 64924bdd4c..5786370598 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -142,6 +142,7 @@ bool SemanticState::cancel(const string& tag)
DeliveryRecords::iterator removed =
remove_if(unacked.begin(), unacked.end(), bind(&DeliveryRecord::isRedundant, _1));
unacked.erase(removed, unacked.end());
+ getSession().setUnackedCount(unacked.size());
return true;
} else {
return false;
@@ -270,6 +271,7 @@ void SemanticState::checkDtxTimeout()
void SemanticState::record(const DeliveryRecord& delivery)
{
unacked.push_back(delivery);
+ getSession().setUnackedCount(unacked.size());
}
const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
@@ -555,6 +557,7 @@ void SemanticState::recover(bool requeue)
//w.r.t id is lost
sort(unacked.begin(), unacked.end());
}
+ getSession().setUnackedCount(unacked.size());
}
void SemanticState::deliver(DeliveryRecord& msg, bool sync)
@@ -712,6 +715,7 @@ void SemanticState::release(DeliveryId first, DeliveryId last, bool setRedeliver
DeliveryRecords::iterator removed =
remove_if(range.start, range.end, bind(&DeliveryRecord::isRedundant, _1));
unacked.erase(removed, range.end);
+ getSession().setUnackedCount(unacked.size());
}
void SemanticState::reject(DeliveryId first, DeliveryId last)
@@ -723,6 +727,7 @@ void SemanticState::reject(DeliveryId first, DeliveryId last)
if (i->isRedundant()) i = unacked.erase(i);
else i++;
}
+ getSession().setUnackedCount(unacked.size());
}
bool SemanticState::ConsumerImpl::doOutput()
@@ -810,6 +815,7 @@ void SemanticState::accepted(const SequenceSet& commands) {
(TransactionContext*) 0)));
unacked.erase(removed, unacked.end());
}
+ getSession().setUnackedCount(unacked.size());
}
void SemanticState::completed(const SequenceSet& commands) {
@@ -819,6 +825,7 @@ void SemanticState::completed(const SequenceSet& commands) {
bind(&SemanticState::complete, this, _1)));
unacked.erase(removed, unacked.end());
requestDispatch();
+ getSession().setUnackedCount(unacked.size());
}
void SemanticState::attached()
diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h
index 253ce8dcf2..ee98da1878 100644
--- a/qpid/cpp/src/qpid/broker/SessionContext.h
+++ b/qpid/cpp/src/qpid/broker/SessionContext.h
@@ -47,6 +47,7 @@ class SessionContext : public OwnershipToken, public sys::OutputControl
virtual uint16_t getChannel() const = 0;
virtual const SessionId& getSessionId() const = 0;
virtual void addPendingExecutionSync() = 0;
+ virtual void setUnackedCount(uint64_t) {}
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index 8db232a2d6..a8ff7feff9 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -126,6 +126,11 @@ class SessionState : public qpid::SessionState,
// the SessionState of a received Execution.Sync command.
void addPendingExecutionSync();
+ void setUnackedCount(uint64_t count) {
+ if (mgmtObject)
+ mgmtObject->set_unackedMessages(count);
+ }
+
// Used to delay creation of management object for sessions
// belonging to inter-broker bridges
void addManagementObject();