diff options
| author | Alan Conway <aconway@apache.org> | 2009-05-19 21:18:52 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2009-05-19 21:18:52 +0000 |
| commit | 285ca60cf814ce4b96813e929ced910d53097aef (patch) | |
| tree | 347254ab8c4b2c27d6a095ccdac7ed444ed993b0 /cpp/src/qpid/cluster | |
| parent | fe0a36ba0edb47757a7bc7331764631ebd20205e (diff) | |
| download | qpid-python-285ca60cf814ce4b96813e929ced910d53097aef.tar.gz | |
Instrumentation for measuring latencies.
Compiled out of normal builds, enable with -DQPID_LATENCY_TRACKER.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@776463 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Multicaster.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Multicaster.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.cpp | 4 |
4 files changed, 26 insertions, 6 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 5f51bb9dad..58ac4b91b3 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -109,6 +109,7 @@ #include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterUpdateOfferBody.h" #include "qpid/framing/ClusterUpdateRequestBody.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Helpers.h" #include "qpid/log/Statement.h" #include "qpid/management/IdAllocator.h" @@ -294,23 +295,27 @@ void Cluster::deliver( MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); + LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish()); deliverEvent(e); } +LATENCY_TRACK(sys::LatencyTracker<const char*> eventQueueLatencyTracker("EventQueue");) +LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> frameQueueLatencyTracker("FrameQueue");) + void Cluster::deliverEvent(const Event& e) { - LATENCY_START(Event, "enqueue event", e.getData()); + LATENCY_TRACK(eventQueueLatencyTracker.start(e.getData());) deliverEventQueue.push(e); } void Cluster::deliverFrame(const EventFrame& e) { - LATENCY_START(EventFrame, "enqueue frame", e.frame.getBody()); + LATENCY_TRACK(frameQueueLatencyTracker.start(e.frame.getBody())); deliverFrameQueue.push(e); } // Handler for deliverEventQueue. // This thread decodes frames from events. void Cluster::deliveredEvent(const Event& e) { - LATENCY_STAGE(Event, "dequeue event", e.getData()); + LATENCY_TRACK(eventQueueLatencyTracker.finish(e.getData())); QPID_LOG(trace, *this << " DLVR: " << e); if (e.isCluster()) { EventFrame ef(e, e.getFrame()); @@ -329,7 +334,6 @@ void Cluster::deliveredEvent(const Event& e) { } else // Discard connection events if discarding is set. QPID_LOG(trace, *this << " DROP: " << e); - LATENCY_END(Event, "processed event", e.getData()); } void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) { @@ -337,18 +341,22 @@ void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) { error.error(connection, type, map.getFrameSeq(), map.getMembers()); } +LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");) + // Handler for deliverFrameQueue. // This thread executes the main logic. void Cluster::deliveredFrame(const EventFrame& e) { - LATENCY_STAGE(EventFrame, "dequeued frame", e.frame.getBody()); + LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody())); + LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody())); Mutex::ScopedLock l(lock); // Process each frame through the error checker. error.delivered(e); while (error.canProcess()) // There is a frame ready to process. processFrame(error.getNext(), l); - LATENCY_END(EventFrame, "processed frame", e.frame.getBody()); } +LATENCY_TRACK(sys::LatencyStatistic processLatency("Process");) + void Cluster::processFrame(const EventFrame& e, Lock& l) { if (e.isCluster()) { QPID_LOG(trace, *this << " DLVR: " << e); @@ -357,6 +365,7 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) { throw Exception(QPID_MSG("Invalid cluster control")); } else if (state >= CATCHUP) { + LATENCY_TRACK(LatencyScope ls(processLatency)); map.incrementFrameSeq(); QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e); ConnectionPtr connection = getConnection(e.connectionId, l); diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index 3b9d3ac990..f72867de4d 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -31,6 +31,9 @@ namespace cluster { Multicaster::Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>& poller, boost::function<void()> onError_) : +#if defined (QPID_LATENCY_TRACKER) + cpgLatency("CPG"), +#endif onError(onError_), cpg(cpg_), queue(boost::bind(&Multicaster::sendMcast, this, _1), poller), holding(true) @@ -58,6 +61,7 @@ void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& void Multicaster::mcast(const Event& e) { { sys::Mutex::ScopedLock l(lock); + LATENCY_TRACK(cpgLatency.start()); if (e.getType() == DATA && e.isConnection() && holding) { holdingQueue.push_back(e); return; diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h index baa5b87f38..e1014fa499 100644 --- a/cpp/src/qpid/cluster/Multicaster.h +++ b/cpp/src/qpid/cluster/Multicaster.h @@ -26,6 +26,7 @@ #include "Event.h" #include "qpid/sys/PollableQueue.h" #include "qpid/sys/Mutex.h" +#include "qpid/sys/LatencyTracker.h" #include <boost/shared_ptr.hpp> namespace qpid { @@ -56,6 +57,8 @@ class Multicaster /** End holding mode, held events are mcast */ void release(); + LATENCY_TRACK(sys::LatencyCounter cpgLatency;) + private: typedef sys::PollableQueue<Event> PollableEventQueue; typedef std::deque<Event> PlainEventQueue; diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index 9062edc846..a7ec82128b 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -24,6 +24,7 @@ #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/framing/AMQFrame.h" #include "qpid/log/Statement.h" +#include "qpid/sys/LatencyTracker.h" #include <boost/current_function.hpp> @@ -41,7 +42,10 @@ OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler moreOutput(), doingOutput() {} +LATENCY_TRACK(extern sys::LatencyTracker<const AMQBody*> doOutputTracker;) + void OutputInterceptor::send(framing::AMQFrame& f) { + LATENCY_TRACK(doOutputTracker.finish(f.getBody())); parent.getCluster().checkQuorum(); { // FIXME aconway 2009-04-28: locking around next-> may be redundant |
