summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp21
1 files changed, 14 insertions, 7 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 221f12990e..09e053dc28 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -48,6 +48,7 @@
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
#include "qpid/sys/Thread.h"
+#include "qpid/sys/LatencyTracker.h"
#include <boost/bind.hpp>
#include <boost/cast.hpp>
@@ -196,8 +197,8 @@ void Cluster::leave() {
leave(l);
}
-#define LEAVE_TRY(STMT) try { STMT; } \
- catch (const std::exception& e) { \
+#define LEAVE_TRY(STMT) try { STMT; } \
+ catch (const std::exception& e) { \
QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
} do {} while(0)
@@ -228,17 +229,20 @@ void Cluster::deliver(
}
void Cluster::deliverEvent(const Event& e) {
+ LATENCY_START(Event, "enqueue event", e.getData());
deliverEventQueue.push(e);
}
void Cluster::deliverFrame(const EventFrame& e) {
+ LATENCY_START(EventFrame, "enqueue frame", e.frame.getBody());
deliverFrameQueue.push(e);
}
// Handler for deliverEventQueue.
// This thread decodes frames from events.
void Cluster::deliveredEvent(const Event& e) {
- QPID_LOG(trace, *this << " DLVR: " << e);
+ LATENCY_STAGE(Event, "dequeue event", e.getData());
+ QPID_LOG(trace, *this << " DLVR: " << e);
if (e.isCluster()) {
EventFrame ef(e, e.getFrame());
// Stop the deliverEventQueue on update offers.
@@ -253,9 +257,10 @@ void Cluster::deliveredEvent(const Event& e) {
deliverFrame(EventFrame(e, e.getFrame()));
else
decoder.decode(e, e.getData());
-}
+ }
else // Discard connection events if discarding is set.
QPID_LOG(trace, *this << " DROP: " << e);
+ LATENCY_END(Event, "processed event", e.getData());
}
void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
@@ -266,11 +271,13 @@ void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
// Handler for deliverFrameQueue.
// This thread executes the main logic.
void Cluster::deliveredFrame(const EventFrame& e) {
+ LATENCY_STAGE(EventFrame, "dequeued frame", e.frame.getBody());
Mutex::ScopedLock l(lock);
// Process each frame through the error checker.
error.delivered(e);
while (error.canProcess()) // There is a frame ready to process.
processFrame(error.getNext(), l);
+ LATENCY_END(EventFrame, "processed frame", e.frame.getBody());
}
void Cluster::processFrame(const EventFrame& e, Lock& l) {
@@ -543,7 +550,7 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, s
Lock l(lock);
QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]");
switch (methodId) {
- case _qmf::Cluster::METHOD_STOPCLUSTERNODE :
+ case _qmf::Cluster::METHOD_STOPCLUSTERNODE :
{
_qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args;
stringstream stream;
@@ -552,10 +559,10 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, s
stopClusterNode(l);
}
break;
- case _qmf::Cluster::METHOD_STOPFULLCLUSTER :
+ case _qmf::Cluster::METHOD_STOPFULLCLUSTER :
stopFullCluster(l);
break;
- default:
+ default:
return Manageable::STATUS_UNKNOWN_METHOD;
}
return Manageable::STATUS_OK;