diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 21 |
1 files changed, 14 insertions, 7 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 221f12990e..09e053dc28 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -48,6 +48,7 @@ #include "qpid/memory.h" #include "qpid/shared_ptr.h" #include "qpid/sys/Thread.h" +#include "qpid/sys/LatencyTracker.h" #include <boost/bind.hpp> #include <boost/cast.hpp> @@ -196,8 +197,8 @@ void Cluster::leave() { leave(l); } -#define LEAVE_TRY(STMT) try { STMT; } \ - catch (const std::exception& e) { \ +#define LEAVE_TRY(STMT) try { STMT; } \ + catch (const std::exception& e) { \ QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \ } do {} while(0) @@ -228,17 +229,20 @@ void Cluster::deliver( } void Cluster::deliverEvent(const Event& e) { + LATENCY_START(Event, "enqueue event", e.getData()); deliverEventQueue.push(e); } void Cluster::deliverFrame(const EventFrame& e) { + LATENCY_START(EventFrame, "enqueue frame", e.frame.getBody()); deliverFrameQueue.push(e); } // Handler for deliverEventQueue. // This thread decodes frames from events. void Cluster::deliveredEvent(const Event& e) { - QPID_LOG(trace, *this << " DLVR: " << e); + LATENCY_STAGE(Event, "dequeue event", e.getData()); + QPID_LOG(trace, *this << " DLVR: " << e); if (e.isCluster()) { EventFrame ef(e, e.getFrame()); // Stop the deliverEventQueue on update offers. @@ -253,9 +257,10 @@ void Cluster::deliveredEvent(const Event& e) { deliverFrame(EventFrame(e, e.getFrame())); else decoder.decode(e, e.getData()); -} + } else // Discard connection events if discarding is set. QPID_LOG(trace, *this << " DROP: " << e); + LATENCY_END(Event, "processed event", e.getData()); } void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) { @@ -266,11 +271,13 @@ void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) { // Handler for deliverFrameQueue. // This thread executes the main logic. void Cluster::deliveredFrame(const EventFrame& e) { + LATENCY_STAGE(EventFrame, "dequeued frame", e.frame.getBody()); Mutex::ScopedLock l(lock); // Process each frame through the error checker. error.delivered(e); while (error.canProcess()) // There is a frame ready to process. processFrame(error.getNext(), l); + LATENCY_END(EventFrame, "processed frame", e.frame.getBody()); } void Cluster::processFrame(const EventFrame& e, Lock& l) { @@ -543,7 +550,7 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, s Lock l(lock); QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]"); switch (methodId) { - case _qmf::Cluster::METHOD_STOPCLUSTERNODE : + case _qmf::Cluster::METHOD_STOPCLUSTERNODE : { _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args; stringstream stream; @@ -552,10 +559,10 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, s stopClusterNode(l); } break; - case _qmf::Cluster::METHOD_STOPFULLCLUSTER : + case _qmf::Cluster::METHOD_STOPFULLCLUSTER : stopFullCluster(l); break; - default: + default: return Manageable::STATUS_UNKNOWN_METHOD; } return Manageable::STATUS_OK; |
