From 5a769bdde526fa83c317ad4feb7d9ac0351eeea9 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 4 May 2009 18:38:18 +0000 Subject: LatenchTracker: a tool for measuring latencies. Added measurement points to cluster code. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@771392 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/cluster/Cluster.cpp | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) (limited to 'cpp/src/qpid/cluster') diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 221f12990e..09e053dc28 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -48,6 +48,7 @@ #include "qpid/memory.h" #include "qpid/shared_ptr.h" #include "qpid/sys/Thread.h" +#include "qpid/sys/LatencyTracker.h" #include #include @@ -196,8 +197,8 @@ void Cluster::leave() { leave(l); } -#define LEAVE_TRY(STMT) try { STMT; } \ - catch (const std::exception& e) { \ +#define LEAVE_TRY(STMT) try { STMT; } \ + catch (const std::exception& e) { \ QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \ } do {} while(0) @@ -228,17 +229,20 @@ void Cluster::deliver( } void Cluster::deliverEvent(const Event& e) { + LATENCY_START(Event, "enqueue event", e.getData()); deliverEventQueue.push(e); } void Cluster::deliverFrame(const EventFrame& e) { + LATENCY_START(EventFrame, "enqueue frame", e.frame.getBody()); deliverFrameQueue.push(e); } // Handler for deliverEventQueue. // This thread decodes frames from events. void Cluster::deliveredEvent(const Event& e) { - QPID_LOG(trace, *this << " DLVR: " << e); + LATENCY_STAGE(Event, "dequeue event", e.getData()); + QPID_LOG(trace, *this << " DLVR: " << e); if (e.isCluster()) { EventFrame ef(e, e.getFrame()); // Stop the deliverEventQueue on update offers. @@ -253,9 +257,10 @@ void Cluster::deliveredEvent(const Event& e) { deliverFrame(EventFrame(e, e.getFrame())); else decoder.decode(e, e.getData()); -} + } else // Discard connection events if discarding is set. QPID_LOG(trace, *this << " DROP: " << e); + LATENCY_END(Event, "processed event", e.getData()); } void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) { @@ -266,11 +271,13 @@ void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) { // Handler for deliverFrameQueue. // This thread executes the main logic. void Cluster::deliveredFrame(const EventFrame& e) { + LATENCY_STAGE(EventFrame, "dequeued frame", e.frame.getBody()); Mutex::ScopedLock l(lock); // Process each frame through the error checker. error.delivered(e); while (error.canProcess()) // There is a frame ready to process. processFrame(error.getNext(), l); + LATENCY_END(EventFrame, "processed frame", e.frame.getBody()); } void Cluster::processFrame(const EventFrame& e, Lock& l) { @@ -543,7 +550,7 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, s Lock l(lock); QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]"); switch (methodId) { - case _qmf::Cluster::METHOD_STOPCLUSTERNODE : + case _qmf::Cluster::METHOD_STOPCLUSTERNODE : { _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args; stringstream stream; @@ -552,10 +559,10 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, s stopClusterNode(l); } break; - case _qmf::Cluster::METHOD_STOPFULLCLUSTER : + case _qmf::Cluster::METHOD_STOPFULLCLUSTER : stopFullCluster(l); break; - default: + default: return Manageable::STATUS_UNKNOWN_METHOD; } return Manageable::STATUS_OK; -- cgit v1.2.1