diff options
author | Yehuda Sadeh <yehuda@inktank.com> | 2012-11-13 17:02:02 -0800 |
---|---|---|
committer | Yehuda Sadeh <yehuda@inktank.com> | 2012-11-16 15:17:20 -0800 |
commit | 29a96cf29c90f297de9d6e878fc0a8607c11bae9 (patch) | |
tree | 74f4cce097f516d63c1f78710c3399f7607159ee | |
parent | 8b187bd8ca0f81bf623de8405fdc611a130b219f (diff) | |
download | ceph-29a96cf29c90f297de9d6e878fc0a8607c11bae9.tar.gz |
rgw: ops log can also go to socket
Adding a new ops log output (into a unix domain socket).
Configuration:
rgw_enable_usage_log : master switch for ops log
rgw ops log socket path : set socket path
rgw ops log rados : whether ops should be logged in the rados
cluster
rgw ops log data backlog : max size in MB to be accumulated
without flushing
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/common/OutputDataSocket.cc | 418 | ||||
-rw-r--r-- | src/common/OutputDataSocket.h | 72 | ||||
-rw-r--r-- | src/common/config_opts.h | 3 | ||||
-rw-r--r-- | src/rgw/rgw_admin.cc | 21 | ||||
-rw-r--r-- | src/rgw/rgw_log.cc | 92 | ||||
-rw-r--r-- | src/rgw/rgw_log.h | 20 | ||||
-rw-r--r-- | src/rgw/rgw_main.cc | 18 |
8 files changed, 611 insertions, 35 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 9ea28ee2387..84d934e1c77 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1191,6 +1191,7 @@ libcommon_files = \ common/BackTrace.cc \ common/perf_counters.cc \ common/Mutex.cc \ + common/OutputDataSocket.cc \ common/admin_socket.cc \ common/admin_socket_client.cc \ common/escape.c \ @@ -1469,6 +1470,7 @@ noinst_HEADERS = \ common/Finisher.h\ common/Formatter.h\ common/perf_counters.h\ + common/OutputDataSocket.h \ common/admin_socket.h \ common/admin_socket_client.h \ common/shared_cache.hpp \ diff --git a/src/common/OutputDataSocket.cc b/src/common/OutputDataSocket.cc new file mode 100644 index 00000000000..54f6ab4a2a4 --- /dev/null +++ b/src/common/OutputDataSocket.cc @@ -0,0 +1,418 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2011 New Dream Network + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "common/Thread.h" +#include "common/OutputDataSocket.h" +#include "common/config.h" +#include "common/dout.h" +#include "common/errno.h" +#include "common/perf_counters.h" +#include "common/pipe.h" +#include "common/safe_io.h" +#include "common/version.h" +#include "common/Formatter.h" + +#include <errno.h> +#include <fcntl.h> +#include <inttypes.h> +#include <map> +#include <poll.h> +#include <set> +#include <sstream> +#include <stdint.h> +#include <string.h> +#include <string> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/un.h> +#include <unistd.h> + +#include "include/compat.h" + +#define dout_subsys ceph_subsys_asok +#undef dout_prefix +#define dout_prefix *_dout << "asok(" << (void*)m_cct << ") " + +using std::ostringstream; + +/* + * UNIX domain sockets created by an application persist even after that + * application closes, unless they're explicitly unlinked. This is because the + * directory containing the socket keeps a reference to the socket. + * + * This code makes things a little nicer by unlinking those dead sockets when + * the application exits normally. + */ +static pthread_mutex_t cleanup_lock = PTHREAD_MUTEX_INITIALIZER; +static std::vector <const char*> cleanup_files; +static bool cleanup_atexit = false; + +static void remove_cleanup_file(const char *file) +{ + pthread_mutex_lock(&cleanup_lock); + TEMP_FAILURE_RETRY(unlink(file)); + for (std::vector <const char*>::iterator i = cleanup_files.begin(); + i != cleanup_files.end(); ++i) { + if (strcmp(file, *i) == 0) { + free((void*)*i); + cleanup_files.erase(i); + break; + } + } + pthread_mutex_unlock(&cleanup_lock); +} + +static void remove_all_cleanup_files() +{ + pthread_mutex_lock(&cleanup_lock); + for (std::vector <const char*>::iterator i = cleanup_files.begin(); + i != cleanup_files.end(); ++i) { + TEMP_FAILURE_RETRY(unlink(*i)); + free((void*)*i); + } + cleanup_files.clear(); + pthread_mutex_unlock(&cleanup_lock); +} + +static void add_cleanup_file(const char *file) +{ + char *fname = strdup(file); + if (!fname) + return; + pthread_mutex_lock(&cleanup_lock); + cleanup_files.push_back(fname); + if (!cleanup_atexit) { + atexit(remove_all_cleanup_files); + cleanup_atexit = true; + } + pthread_mutex_unlock(&cleanup_lock); +} + + +OutputDataSocket::OutputDataSocket(CephContext *cct, uint64_t _backlog) + : m_cct(cct), + data_max_backlog(_backlog), + m_sock_fd(-1), + m_shutdown_rd_fd(-1), + m_shutdown_wr_fd(-1), + going_down(false), + m_lock("OutputDataSocket::m_lock") +{ +} + +OutputDataSocket::~OutputDataSocket() +{ + shutdown(); +} + +/* + * This thread listens on the UNIX domain socket for incoming connections. + * It only handles one connection at a time at the moment. All I/O is nonblocking, + * so that we can implement sensible timeouts. [TODO: make all I/O nonblocking] + * + * This thread also listens to m_shutdown_rd_fd. If there is any data sent to this + * pipe, the thread terminates itself gracefully, allowing the + * OutputDataSocketConfigObs class to join() it. + */ + +#define PFL_SUCCESS ((void*)(intptr_t)0) +#define PFL_FAIL ((void*)(intptr_t)1) + +std::string OutputDataSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr) +{ + int pipefd[2]; + int ret = pipe_cloexec(pipefd); + if (ret < 0) { + ostringstream oss; + oss << "OutputDataSocket::create_shutdown_pipe error: " << cpp_strerror(ret); + return oss.str(); + } + + *pipe_rd = pipefd[0]; + *pipe_wr = pipefd[1]; + return ""; +} + +std::string OutputDataSocket::bind_and_listen(const std::string &sock_path, int *fd) +{ + ldout(m_cct, 5) << "bind_and_listen " << sock_path << dendl; + + struct sockaddr_un address; + if (sock_path.size() > sizeof(address.sun_path) - 1) { + ostringstream oss; + oss << "OutputDataSocket::bind_and_listen: " + << "The UNIX domain socket path " << sock_path << " is too long! The " + << "maximum length on this system is " + << (sizeof(address.sun_path) - 1); + return oss.str(); + } + int sock_fd = socket(PF_UNIX, SOCK_STREAM, 0); + if (sock_fd < 0) { + int err = errno; + ostringstream oss; + oss << "OutputDataSocket::bind_and_listen: " + << "failed to create socket: " << cpp_strerror(err); + return oss.str(); + } + int r = fcntl(sock_fd, F_SETFD, FD_CLOEXEC); + if (r < 0) { + r = errno; + TEMP_FAILURE_RETRY(::close(sock_fd)); + ostringstream oss; + oss << "OutputDataSocket::bind_and_listen: failed to fcntl on socket: " << cpp_strerror(r); + return oss.str(); + } + memset(&address, 0, sizeof(struct sockaddr_un)); + address.sun_family = AF_UNIX; + snprintf(address.sun_path, sizeof(address.sun_path), + "%s", sock_path.c_str()); + if (bind(sock_fd, (struct sockaddr*)&address, + sizeof(struct sockaddr_un)) != 0) { + int err = errno; + if (err == EADDRINUSE) { + // The old UNIX domain socket must still be there. + // Let's unlink it and try again. + TEMP_FAILURE_RETRY(unlink(sock_path.c_str())); + if (bind(sock_fd, (struct sockaddr*)&address, + sizeof(struct sockaddr_un)) == 0) { + err = 0; + } + else { + err = errno; + } + } + if (err != 0) { + ostringstream oss; + oss << "OutputDataSocket::bind_and_listen: " + << "failed to bind the UNIX domain socket to '" << sock_path + << "': " << cpp_strerror(err); + close(sock_fd); + return oss.str(); + } + } + if (listen(sock_fd, 5) != 0) { + int err = errno; + ostringstream oss; + oss << "OutputDataSocket::bind_and_listen: " + << "failed to listen to socket: " << cpp_strerror(err); + close(sock_fd); + TEMP_FAILURE_RETRY(unlink(sock_path.c_str())); + return oss.str(); + } + *fd = sock_fd; + return ""; +} + +void* OutputDataSocket::entry() +{ + ldout(m_cct, 5) << "entry start" << dendl; + while (true) { + struct pollfd fds[2]; + memset(fds, 0, sizeof(fds)); + fds[0].fd = m_sock_fd; + fds[0].events = POLLIN | POLLRDBAND; + fds[1].fd = m_shutdown_rd_fd; + fds[1].events = POLLIN | POLLRDBAND; + + int ret = poll(fds, 2, -1); + if (ret < 0) { + int err = errno; + if (err == EINTR) { + continue; + } + lderr(m_cct) << "OutputDataSocket: poll(2) error: '" + << cpp_strerror(err) << dendl; + return PFL_FAIL; + } + + if (fds[0].revents & POLLIN) { + // Send out some data + do_accept(); + } + if (fds[1].revents & POLLIN) { + // Parent wants us to shut down + return PFL_SUCCESS; + } + } + ldout(m_cct, 5) << "entry exit" << dendl; + + return PFL_SUCCESS; // unreachable +} + + +bool OutputDataSocket::do_accept() +{ + struct sockaddr_un address; + socklen_t address_length = sizeof(address); + ldout(m_cct, 30) << "OutputDataSocket: calling accept" << dendl; + int connection_fd = accept(m_sock_fd, (struct sockaddr*) &address, + &address_length); + ldout(m_cct, 30) << "OutputDataSocket: finished accept" << dendl; + if (connection_fd < 0) { + int err = errno; + lderr(m_cct) << "OutputDataSocket: do_accept error: '" + << cpp_strerror(err) << dendl; + return false; + } + + handle_connection(connection_fd); + close_connection(connection_fd); + + return 0; +} + +void OutputDataSocket::handle_connection(int fd) +{ + bufferlist bl; + + m_lock.Lock(); + init_connection(bl); + m_lock.Unlock(); + + if (bl.length()) { + /* need to special case the connection init buffer output, as it needs + * to be dumped before any data, including older data that was sent + * before the connection was established, or before we identified + * older connection was broken + */ + int ret = safe_write(fd, bl.c_str(), bl.length()); + if (ret < 0) { + return; + } + } + + int ret = dump_data(fd); + if (ret < 0) + return; + + do { + m_lock.Lock(); + cond.Wait(m_lock); + + if (going_down) { + m_lock.Unlock(); + break; + } + m_lock.Unlock(); + + ret = dump_data(fd); + } while (ret >= 0); +} + +int OutputDataSocket::dump_data(int fd) +{ + m_lock.Lock(); + list<bufferlist> l; + l = data; + data.clear(); + data_size = 0; + m_lock.Unlock(); + + for (list<bufferlist>::iterator iter = l.begin(); iter != l.end(); ++iter) { + bufferlist& bl = *iter; + int ret = safe_write(fd, bl.c_str(), bl.length()); + if (ret >= 0) { + ret = safe_write(fd, delim.c_str(), delim.length()); + } + if (ret < 0) { + for (; iter != l.end(); ++iter) { + bufferlist& bl = *iter; + data.push_back(bl); + data_size += bl.length(); + } + return ret; + } + } + + return 0; +} + +void OutputDataSocket::close_connection(int fd) +{ + TEMP_FAILURE_RETRY(close(fd)); +} + +bool OutputDataSocket::init(const std::string &path) +{ + ldout(m_cct, 5) << "init " << path << dendl; + + /* Set up things for the new thread */ + std::string err; + int pipe_rd = -1, pipe_wr = -1; + err = create_shutdown_pipe(&pipe_rd, &pipe_wr); + if (!err.empty()) { + lderr(m_cct) << "OutputDataSocketConfigObs::init: error: " << err << dendl; + return false; + } + int sock_fd; + err = bind_and_listen(path, &sock_fd); + if (!err.empty()) { + lderr(m_cct) << "OutputDataSocketConfigObs::init: failed: " << err << dendl; + close(pipe_rd); + close(pipe_wr); + return false; + } + + /* Create new thread */ + m_sock_fd = sock_fd; + m_shutdown_rd_fd = pipe_rd; + m_shutdown_wr_fd = pipe_wr; + m_path = path; + create(); + add_cleanup_file(m_path.c_str()); + return true; +} + +void OutputDataSocket::shutdown() +{ + m_lock.Lock(); + going_down = true; + cond.Signal(); + m_lock.Unlock(); + + if (m_shutdown_wr_fd < 0) + return; + + ldout(m_cct, 5) << "shutdown" << dendl; + + // Send a byte to the shutdown pipe that the thread is listening to + char buf[1] = { 0x0 }; + int ret = safe_write(m_shutdown_wr_fd, buf, sizeof(buf)); + TEMP_FAILURE_RETRY(close(m_shutdown_wr_fd)); + m_shutdown_wr_fd = -1; + + if (ret == 0) { + join(); + } else { + lderr(m_cct) << "OutputDataSocket::shutdown: failed to write " + "to thread shutdown pipe: error " << ret << dendl; + } + + remove_cleanup_file(m_path.c_str()); + m_path.clear(); +} + +void OutputDataSocket::append_output(bufferlist& bl) +{ + Mutex::Locker l(m_lock); + + if (data_size + bl.length() > data_max_backlog) { + ldout(m_cct, 20) << "dropping data output, max backlog reached" << dendl; + } + data.push_back(bl); + + data_size += bl.length(); + + cond.Signal(); +} diff --git a/src/common/OutputDataSocket.h b/src/common/OutputDataSocket.h new file mode 100644 index 00000000000..f581a56bf03 --- /dev/null +++ b/src/common/OutputDataSocket.h @@ -0,0 +1,72 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2011 New Dream Network + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_COMMON_OUTPUTDATASOCKET_H +#define CEPH_COMMON_OUTPUTDATASOCKET_H + +#include "common/Thread.h" +#include "common/Mutex.h" +#include "common/Cond.h" + +#include <string> +#include <map> +#include <list> +#include "include/buffer.h" + +class CephContext; + +class OutputDataSocket : public Thread +{ +public: + OutputDataSocket(CephContext *cct, uint64_t _backlog); + virtual ~OutputDataSocket(); + + bool init(const std::string &path); + + void append_output(bufferlist& bl); + +protected: + virtual void init_connection(bufferlist& bl) {} + void shutdown(); + + std::string create_shutdown_pipe(int *pipe_rd, int *pipe_wr); + std::string bind_and_listen(const std::string &sock_path, int *fd); + + void *entry(); + bool do_accept(); + + void handle_connection(int fd); + void close_connection(int fd); + + int dump_data(int fd); + + CephContext *m_cct; + uint64_t data_max_backlog; + std::string m_path; + int m_sock_fd; + int m_shutdown_rd_fd; + int m_shutdown_wr_fd; + bool going_down; + + uint64_t data_size; + + std::list<bufferlist> data; + + Mutex m_lock; + Cond cond; + + bufferlist delim; +}; + +#endif diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 6fb6d943658..b1ae20590d6 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -470,6 +470,9 @@ OPTION(rgw_usage_max_shards, OPT_INT, 32) OPTION(rgw_usage_max_user_shards, OPT_INT, 1) OPTION(rgw_enable_ops_log, OPT_BOOL, true) // enable logging every rgw operation OPTION(rgw_enable_usage_log, OPT_BOOL, true) // enable logging bandwidth usage +OPTION(rgw_ops_log_rados, OPT_BOOL, true) // whether ops log should go to rados +OPTION(rgw_ops_log_socket_path, OPT_STR, "") // path to unix domain socket where ops log can go +OPTION(rgw_ops_log_data_backlog, OPT_INT, 5 << 20) // max data backlog for ops log OPTION(rgw_usage_log_flush_threshold, OPT_INT, 1024) // threshold to flush pending log data OPTION(rgw_usage_log_tick_interval, OPT_INT, 30) // flush pending log data every X seconds OPTION(rgw_intent_log_object_name, OPT_STR, "%Y-%m-%d-%i-%n") // man date to see codes (a subset are supported) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 5a5b7406aaf..665ac5516e8 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -1361,25 +1361,8 @@ int main(int argc, char **argv) goto next; if (show_log_entries) { - formatter->open_object_section("log_entry"); - formatter->dump_string("bucket", entry.bucket); - entry.time.gmtime(formatter->dump_stream("time")); // UTC - entry.time.localtime(formatter->dump_stream("time_local")); - formatter->dump_string("remote_addr", entry.remote_addr); - if (entry.object_owner.length()) - formatter->dump_string("object_owner", entry.object_owner); - formatter->dump_string("user", entry.user); - formatter->dump_string("operation", entry.op); - formatter->dump_string("uri", entry.uri); - formatter->dump_string("http_status", entry.http_status); - formatter->dump_string("error_code", entry.error_code); - formatter->dump_int("bytes_sent", entry.bytes_sent); - formatter->dump_int("bytes_received", entry.bytes_received); - formatter->dump_int("object_size", entry.obj_size); - formatter->dump_int("total_time", total_time); - formatter->dump_string("user_agent", entry.user_agent); - formatter->dump_string("referrer", entry.referrer); - formatter->close_section(); + + rgw_format_ops_log_entry(entry, formatter); formatter->flush(cout); } next: diff --git a/src/rgw/rgw_log.cc b/src/rgw/rgw_log.cc index 5406a2c924b..a55df8bdbbf 100644 --- a/src/rgw/rgw_log.cc +++ b/src/rgw/rgw_log.cc @@ -1,6 +1,8 @@ #include "common/Clock.h" #include "common/Timer.h" #include "common/utf8.h" +#include "common/OutputDataSocket.h" +#include "common/Formatter.h" #include "rgw_log.h" #include "rgw_acl.h" @@ -188,7 +190,67 @@ static void log_usage(struct req_state *s, const string& op_name) usage_logger->insert(ts, entry); } -int rgw_log_op(RGWRados *store, struct req_state *s, const string& op_name) +void rgw_format_ops_log_entry(struct rgw_log_entry& entry, Formatter *formatter) +{ + formatter->open_object_section("log_entry"); + formatter->dump_string("bucket", entry.bucket); + entry.time.gmtime(formatter->dump_stream("time")); // UTC + entry.time.localtime(formatter->dump_stream("time_local")); + formatter->dump_string("remote_addr", entry.remote_addr); + if (entry.object_owner.length()) + formatter->dump_string("object_owner", entry.object_owner); + formatter->dump_string("user", entry.user); + formatter->dump_string("operation", entry.op); + formatter->dump_string("uri", entry.uri); + formatter->dump_string("http_status", entry.http_status); + formatter->dump_string("error_code", entry.error_code); + formatter->dump_int("bytes_sent", entry.bytes_sent); + formatter->dump_int("bytes_received", entry.bytes_received); + formatter->dump_int("object_size", entry.obj_size); + uint64_t total_time = entry.total_time.sec() * 1000000LL * entry.total_time.usec(); + + formatter->dump_int("total_time", total_time); + formatter->dump_string("user_agent", entry.user_agent); + formatter->dump_string("referrer", entry.referrer); + formatter->close_section(); +} + +void OpsLogSocket::formatter_to_bl(bufferlist& bl) +{ + stringstream ss; + formatter->flush(ss); + const string& s = ss.str(); + + bl.append(s); +} + +void OpsLogSocket::init_connection(bufferlist& bl) +{ + bl.append("["); +} + +OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog) +{ + formatter = new JSONFormatter; + delim.append(",\n"); +} + +OpsLogSocket::~OpsLogSocket() +{ + delete formatter; +} + +void OpsLogSocket::log(struct rgw_log_entry& entry) +{ + bufferlist bl; + + rgw_format_ops_log_entry(entry, formatter); + formatter_to_bl(bl); + + append_output(bl); +} + +int rgw_log_op(RGWRados *store, struct req_state *s, const string& op_name, OpsLogSocket *olog) { struct rgw_log_entry entry; string bucket_id; @@ -263,19 +325,27 @@ int rgw_log_op(RGWRados *store, struct req_state *s, const string& op_name) gmtime_r(&t, &bdt); else localtime_r(&t, &bdt); - - string oid = render_log_object_name(s->cct->_conf->rgw_log_object_name, &bdt, - s->bucket.bucket_id, entry.bucket.c_str()); - rgw_obj obj(store->params.log_pool, oid); + int ret = 0; + + if (s->cct->_conf->rgw_ops_log_rados) { + string oid = render_log_object_name(s->cct->_conf->rgw_log_object_name, &bdt, + s->bucket.bucket_id, entry.bucket.c_str()); + + rgw_obj obj(store->params.log_pool, oid); - int ret = store->append_async(obj, bl.length(), bl); - if (ret == -ENOENT) { - ret = store->create_pool(store->params.log_pool); - if (ret < 0) - goto done; - // retry ret = store->append_async(obj, bl.length(), bl); + if (ret == -ENOENT) { + ret = store->create_pool(store->params.log_pool); + if (ret < 0) + goto done; + // retry + ret = store->append_async(obj, bl.length(), bl); + } + } + + if (olog) { + olog->log(entry); } done: if (ret < 0) diff --git a/src/rgw/rgw_log.h b/src/rgw/rgw_log.h index f832c1cadeb..823f0b1767f 100644 --- a/src/rgw/rgw_log.h +++ b/src/rgw/rgw_log.h @@ -3,6 +3,8 @@ #include "rgw_common.h" #include "include/utime.h" +#include "common/Formatter.h" +#include "common/OutputDataSocket.h" class RGWRados; @@ -115,11 +117,27 @@ struct rgw_intent_log_entry { }; WRITE_CLASS_ENCODER(rgw_intent_log_entry) -int rgw_log_op(RGWRados *store, struct req_state *s, const string& op_name); +class OpsLogSocket : public OutputDataSocket { + Formatter *formatter; + + void formatter_to_bl(bufferlist& bl); + +protected: + void init_connection(bufferlist& bl); + +public: + OpsLogSocket(CephContext *cct, uint64_t _backlog); + ~OpsLogSocket(); + + void log(struct rgw_log_entry& entry); +}; + +int rgw_log_op(RGWRados *store, struct req_state *s, const string& op_name, OpsLogSocket *olog); int rgw_log_intent(RGWRados *store, rgw_obj& obj, RGWIntentEvent intent, const utime_t& timestamp, bool utc); int rgw_log_intent(RGWRados *store, struct req_state *s, rgw_obj& obj, RGWIntentEvent intent); void rgw_log_usage_init(CephContext *cct, RGWRados *store); void rgw_log_usage_finalize(); +void rgw_format_ops_log_entry(struct rgw_log_entry& entry, Formatter *formatter); #endif diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index 944b59a5c8d..4e87d64a497 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -127,6 +127,7 @@ struct RGWRequest class RGWProcess { RGWRados *store; + OpsLogSocket *olog; deque<RGWRequest *> m_req_queue; ThreadPool m_tp; Throttle req_throttle; @@ -185,8 +186,8 @@ class RGWProcess { uint64_t max_req_id; public: - RGWProcess(CephContext *cct, RGWRados *rgwstore, int num_threads, RGWREST *_rest) - : store(rgwstore), m_tp(cct, "RGWProcess::m_tp", num_threads), + RGWProcess(CephContext *cct, RGWRados *rgwstore, OpsLogSocket *_olog, int num_threads, RGWREST *_rest) + : store(rgwstore), olog(_olog), m_tp(cct, "RGWProcess::m_tp", num_threads), req_throttle(cct, "rgw_ops", num_threads * 2), rest(_rest), req_wq(this, g_conf->rgw_op_thread_timeout, @@ -331,7 +332,7 @@ void RGWProcess::handle_request(RGWRequest *req) op->execute(); op->complete(); done: - rgw_log_op(store, s, (op ? op->name() : "unknown")); + rgw_log_op(store, s, (op ? op->name() : "unknown"), olog); int http_ret = s->err.http_ret; @@ -485,7 +486,14 @@ int main(int argc, const char **argv) rest.register_resource(g_conf->rgw_admin_entry, admin_resource); } - RGWProcess process(g_ceph_context, store, g_conf->rgw_thread_pool_size, &rest); + OpsLogSocket *olog = NULL; + + if (!g_conf->rgw_ops_log_socket_path.empty()) { + olog = new OpsLogSocket(g_ceph_context, g_conf->rgw_ops_log_data_backlog); + olog->init(g_conf->rgw_ops_log_socket_path); + } + + RGWProcess process(g_ceph_context, store, olog, g_conf->rgw_thread_pool_size, &rest); process.run(); if (do_swift) { @@ -494,6 +502,8 @@ int main(int argc, const char **argv) rgw_log_usage_finalize(); + delete olog; + rgw_perf_stop(g_ceph_context); unregister_async_signal_handler(SIGHUP, sighup_handler); |