summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/configure.ac18
-rw-r--r--qpid/cpp/src/CMakeLists.txt10
-rw-r--r--qpid/cpp/src/Makefile.am1
-rw-r--r--qpid/cpp/src/config.h.cmake1
-rw-r--r--qpid/cpp/src/qpid/sys/Probes.h65
-rw-r--r--qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp30
6 files changed, 123 insertions, 2 deletions
diff --git a/qpid/cpp/configure.ac b/qpid/cpp/configure.ac
index 8729ace169..a3318531bd 100644
--- a/qpid/cpp/configure.ac
+++ b/qpid/cpp/configure.ac
@@ -480,6 +480,24 @@ case "$host" in
esac
AM_CONDITIONAL([SUNOS], [test x$arch = xsolaris])
+# Check whether we've got the header for dtrace static probes
+AC_ARG_WITH([probes],
+ [AS_HELP_STRING([--with-probes], [Build with dtrace/systemtap static probes])],
+ [case ${withval} in
+ yes)
+ AC_CHECK_HEADERS([sys/sdt.h])
+ ;;
+ no)
+ ;;
+ *)
+ AC_MSG_ERROR([Bad value for --with-probes: ${withval}])
+ ;;
+ esac],
+ [
+ AC_CHECK_HEADERS([sys/sdt.h])
+ ]
+)
+
# Check for some syslog capabilities not present in all systems
AC_TRY_COMPILE([#include <sys/syslog.h>],
[int v = LOG_AUTHPRIV;],
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index e8e543b672..a258605a1e 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -477,6 +477,16 @@ else (NOT CLOCK_GETTIME_IN_RT)
set(QPID_HAS_CLOCK_GETTIME YES CACHE BOOL "Platform has clock_gettime")
endif (NOT CLOCK_GETTIME_IN_RT)
+# Check for header file for dtrace static probes
+check_include_files(sys/sdt.h HAVE_SYS_SDT_H)
+if (HAVE_SYS_SDT_H)
+ set(probes_default ON)
+endif (HAVE_SYS_SDT_H)
+option(BUILD_PROBES "Build with DTrace/systemtap static probes" ${probes_default})
+if (NOT BUILD_PROBES)
+ set (HAVE_SYS_SDT_H 0)
+endif (NOT BUILD_PROBES)
+
# If not windows ensure that we have uuid library
if (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
CHECK_LIBRARY_EXISTS (uuid uuid_compare "" HAVE_UUID)
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 4f733e985b..0d582aaa54 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -487,6 +487,7 @@ libqpidcommon_la_SOURCES += \
qpid/sys/PollableCondition.h \
qpid/sys/PollableQueue.h \
qpid/sys/Poller.h \
+ qpid/sys/Probes.h \
qpid/sys/ProtocolFactory.h \
qpid/sys/Runnable.cpp \
qpid/sys/ScopedIncrement.h \
diff --git a/qpid/cpp/src/config.h.cmake b/qpid/cpp/src/config.h.cmake
index 2bb84c6e47..f55f68afde 100644
--- a/qpid/cpp/src/config.h.cmake
+++ b/qpid/cpp/src/config.h.cmake
@@ -60,6 +60,7 @@
#cmakedefine HAVE_OPENAIS_CPG_H ${HAVE_OPENAIS_CPG_H}
#cmakedefine HAVE_COROSYNC_CPG_H ${HAVE_COROSYNC_CPG_H}
#cmakedefine HAVE_LIBCMAN_H ${HAVE_LIBCMAN_H}
+#cmakedefine HAVE_SYS_SDT_H ${HAVE_SYS_SDT_H}
#cmakedefine HAVE_LOG_AUTHPRIV
#cmakedefine HAVE_LOG_FTP
diff --git a/qpid/cpp/src/qpid/sys/Probes.h b/qpid/cpp/src/qpid/sys/Probes.h
new file mode 100644
index 0000000000..d30181c357
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/Probes.h
@@ -0,0 +1,65 @@
+#ifndef _sys_Probes
+#define _sys_Probes
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "config.h"
+
+#ifdef HAVE_SYS_SDT_H
+#include <sys/sdt.h>
+#endif
+
+// Pragmatically it seems that Linux and Solaris versions of sdt.h which support
+// user static probes define up to DTRACE_PROBE8, but FreeBSD 8 which doesn't
+// support usdt only defines up to DTRACE_PROBE4 - FreeBSD 9 which does support usdt
+// defines up to DTRACE_PROBE5.
+
+#ifdef DTRACE_PROBE5
+// Versions for Linux Systemtap/Solaris/FreeBSD 9
+#define QPID_PROBE(probe) DTRACE_PROBE(qpid, probe)
+#define QPID_PROBE1(probe, p1) DTRACE_PROBE1(qpid, probe, p1)
+#define QPID_PROBE2(probe, p1, p2) DTRACE_PROBE2(qpid, probe, p1, p2)
+#define QPID_PROBE3(probe, p1, p2, p3) DTRACE_PROBE3(qpid, probe, p1, p2, p3)
+#define QPID_PROBE4(probe, p1, p2, p3, p4) DTRACE_PROBE4(qpid, probe, p1, p2, p3, p4)
+#define QPID_PROBE5(probe, p1, p2, p3, p4, p5) DTRACE_PROBE5(qpid, probe, p1, p2, p3, p4, p5)
+#else
+// FreeBSD 8
+#define QPID_PROBE(probe)
+#define QPID_PROBE1(probe, p1)
+#define QPID_PROBE2(probe, p1, p2)
+#define QPID_PROBE3(probe, p1, p2, p3)
+#define QPID_PROBE4(probe, p1, p2, p3, p4)
+#define QPID_PROBE5(probe, p1, p2, p3, p4, p5)
+#endif
+
+#ifdef DTRACE_PROBE8
+// Versions for Linux Systemtap
+#define QPID_PROBE6(probe, p1, p2, p3, p4, p5, p6) DTRACE_PROBE6(qpid, probe, p1, p2, p3, p4, p5, p6)
+#define QPID_PROBE7(probe, p1, p2, p3, p4, p5, p6, p7) DTRACE_PROBE7(qpid, probe, p1, p2, p3, p4, p5, p6, p7)
+#define QPID_PROBE8(probe, p1, p2, p3, p4, p5, p6, p7, p8) DTRACE_PROBE8(qpid, probe, p1, p2, p3, p4, p5, p6, p7, p8)
+#else
+// Versions for Solaris/FreeBSD
+#define QPID_PROBE6(probe, p1, p2, p3, p4, p5, p6)
+#define QPID_PROBE7(probe, p1, p2, p3, p4, p5, p6, p7)
+#define QPID_PROBE8(probe, p1, p2, p3, p4, p5, p6, p7, p8)
+#endif
+
+#endif // _sys_Probes
diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
index a1c161b596..c25159985e 100644
--- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -23,6 +23,7 @@
#include "qpid/sys/Socket.h"
#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/Poller.h"
+#include "qpid/sys/Probes.h"
#include "qpid/sys/DispatchHandle.h"
#include "qpid/sys/Time.h"
#include "qpid/log/Statement.h"
@@ -423,9 +424,12 @@ AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
void AsynchIO::readable(DispatchHandle& h) {
if (readingStopped) {
// We have been flow controlled.
+ QPID_PROBE1(asynchio_read_flowcontrolled, &h);
return;
}
AbsTime readStartTime = AbsTime::now();
+ size_t total = 0;
+ int readCalls = 0;
do {
// (Try to) get a buffer
if (!bufferQueue.empty()) {
@@ -436,23 +440,29 @@ void AsynchIO::readable(DispatchHandle& h) {
errno = 0;
int readCount = buff->byteCount-buff->dataCount;
int rc = socket.read(buff->bytes + buff->dataCount, readCount);
+ int64_t duration = Duration(readStartTime, AbsTime::now());
+ ++readCalls;
if (rc > 0) {
buff->dataCount += rc;
threadReadTotal += rc;
+ total += rc;
readCallback(*this, buff);
if (readingStopped) {
// We have been flow controlled.
+ QPID_PROBE4(asynchio_read_finished_flowcontrolled, &h, duration, total, readCalls);
break;
}
if (rc != readCount) {
// If we didn't fill the read buffer then time to stop reading
+ QPID_PROBE4(asynchio_read_finished_done, &h, duration, total, readCalls);
break;
}
// Stop reading if we've overrun our timeslot
- if (Duration(readStartTime, AbsTime::now()) > threadMaxIoTimeNs) {
+ if ( duration > threadMaxIoTimeNs) {
+ QPID_PROBE4(asynchio_read_finished_maxtime, &h, duration, total, readCalls);
break;
}
@@ -461,6 +471,7 @@ void AsynchIO::readable(DispatchHandle& h) {
bufferQueue.push_front(buff);
assert(buff);
+ QPID_PROBE5(asynchio_read_finished_error, &h, duration, total, readCalls, errno);
// Eof or other side has gone away
if (rc == 0 || errno == ECONNRESET) {
eofCallback(*this);
@@ -486,6 +497,7 @@ void AsynchIO::readable(DispatchHandle& h) {
// If we still have no buffers we can't do anything more
if (bufferQueue.empty()) {
h.unwatchRead();
+ QPID_PROBE4(asynchio_read_finished_nobuffers, &h, Duration(readStartTime, AbsTime::now()), total, readCalls);
break;
}
@@ -501,6 +513,8 @@ void AsynchIO::readable(DispatchHandle& h) {
*/
void AsynchIO::writeable(DispatchHandle& h) {
AbsTime writeStartTime = AbsTime::now();
+ size_t total = 0;
+ int writeCalls = 0;
do {
// See if we've got something to write
if (!writeQueue.empty()) {
@@ -510,14 +524,18 @@ void AsynchIO::writeable(DispatchHandle& h) {
errno = 0;
assert(buff->dataStart+buff->dataCount <= buff->byteCount);
int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount);
+ int64_t duration = Duration(writeStartTime, AbsTime::now());
+ ++writeCalls;
if (rc >= 0) {
threadWriteTotal += rc;
+ total += rc;
// If we didn't write full buffer put rest back
if (rc != buff->dataCount) {
buff->dataStart += rc;
buff->dataCount -= rc;
writeQueue.push_back(buff);
+ QPID_PROBE4(asynchio_write_finished_done, &h, duration, total, writeCalls);
break;
}
@@ -525,12 +543,15 @@ void AsynchIO::writeable(DispatchHandle& h) {
queueReadBuffer(buff);
// Stop writing if we've overrun our timeslot
- if (Duration(writeStartTime, AbsTime::now()) > threadMaxIoTimeNs) {
+ if (duration > threadMaxIoTimeNs) {
+ QPID_PROBE4(asynchio_write_finished_maxtime, &h, duration, total, writeCalls);
break;
}
} else {
// Put buffer back
writeQueue.push_back(buff);
+ QPID_PROBE5(asynchio_write_finished_error, &h, duration, total, writeCalls, errno);
+
if (errno == ECONNRESET || errno == EPIPE) {
// Just stop watching for write here - we'll get a
// disconnect callback soon enough
@@ -548,9 +569,13 @@ void AsynchIO::writeable(DispatchHandle& h) {
}
}
} else {
+ int64_t duration = Duration(writeStartTime, AbsTime::now());
+ (void) duration; // force duration to be used if no probes are compiled
+
// If we're waiting to close the socket then can do it now as there is nothing to write
if (queuedClose) {
close(h);
+ QPID_PROBE4(asynchio_write_finished_closed, &h, duration, total, writeCalls);
break;
}
// Fd is writable, but nothing to write
@@ -567,6 +592,7 @@ void AsynchIO::writeable(DispatchHandle& h) {
// desired rewatchWrite so we correct that here
if (writePending)
h.rewatchWrite();
+ QPID_PROBE4(asynchio_write_finished_nodata, &h, duration, total, writeCalls);
break;
}
}