summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@inktank.com>2012-11-13 17:02:02 -0800
committerYehuda Sadeh <yehuda@inktank.com>2012-11-16 15:17:20 -0800
commit29a96cf29c90f297de9d6e878fc0a8607c11bae9 (patch)
tree74f4cce097f516d63c1f78710c3399f7607159ee
parent8b187bd8ca0f81bf623de8405fdc611a130b219f (diff)
downloadceph-29a96cf29c90f297de9d6e878fc0a8607c11bae9.tar.gz
rgw: ops log can also go to socket
Adding a new ops log output (into a unix domain socket). Configuration: rgw_enable_usage_log : master switch for ops log rgw ops log socket path : set socket path rgw ops log rados : whether ops should be logged in the rados cluster rgw ops log data backlog : max size in MB to be accumulated without flushing Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r--src/Makefile.am2
-rw-r--r--src/common/OutputDataSocket.cc418
-rw-r--r--src/common/OutputDataSocket.h72
-rw-r--r--src/common/config_opts.h3
-rw-r--r--src/rgw/rgw_admin.cc21
-rw-r--r--src/rgw/rgw_log.cc92
-rw-r--r--src/rgw/rgw_log.h20
-rw-r--r--src/rgw/rgw_main.cc18
8 files changed, 611 insertions, 35 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 9ea28ee2387..84d934e1c77 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1191,6 +1191,7 @@ libcommon_files = \
common/BackTrace.cc \
common/perf_counters.cc \
common/Mutex.cc \
+ common/OutputDataSocket.cc \
common/admin_socket.cc \
common/admin_socket_client.cc \
common/escape.c \
@@ -1469,6 +1470,7 @@ noinst_HEADERS = \
common/Finisher.h\
common/Formatter.h\
common/perf_counters.h\
+ common/OutputDataSocket.h \
common/admin_socket.h \
common/admin_socket_client.h \
common/shared_cache.hpp \
diff --git a/src/common/OutputDataSocket.cc b/src/common/OutputDataSocket.cc
new file mode 100644
index 00000000000..54f6ab4a2a4
--- /dev/null
+++ b/src/common/OutputDataSocket.cc
@@ -0,0 +1,418 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2011 New Dream Network
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "common/Thread.h"
+#include "common/OutputDataSocket.h"
+#include "common/config.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "common/perf_counters.h"
+#include "common/pipe.h"
+#include "common/safe_io.h"
+#include "common/version.h"
+#include "common/Formatter.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <inttypes.h>
+#include <map>
+#include <poll.h>
+#include <set>
+#include <sstream>
+#include <stdint.h>
+#include <string.h>
+#include <string>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#include "include/compat.h"
+
+#define dout_subsys ceph_subsys_asok
+#undef dout_prefix
+#define dout_prefix *_dout << "asok(" << (void*)m_cct << ") "
+
+using std::ostringstream;
+
+/*
+ * UNIX domain sockets created by an application persist even after that
+ * application closes, unless they're explicitly unlinked. This is because the
+ * directory containing the socket keeps a reference to the socket.
+ *
+ * This code makes things a little nicer by unlinking those dead sockets when
+ * the application exits normally.
+ */
+static pthread_mutex_t cleanup_lock = PTHREAD_MUTEX_INITIALIZER;
+static std::vector <const char*> cleanup_files;
+static bool cleanup_atexit = false;
+
+static void remove_cleanup_file(const char *file)
+{
+ pthread_mutex_lock(&cleanup_lock);
+ TEMP_FAILURE_RETRY(unlink(file));
+ for (std::vector <const char*>::iterator i = cleanup_files.begin();
+ i != cleanup_files.end(); ++i) {
+ if (strcmp(file, *i) == 0) {
+ free((void*)*i);
+ cleanup_files.erase(i);
+ break;
+ }
+ }
+ pthread_mutex_unlock(&cleanup_lock);
+}
+
+static void remove_all_cleanup_files()
+{
+ pthread_mutex_lock(&cleanup_lock);
+ for (std::vector <const char*>::iterator i = cleanup_files.begin();
+ i != cleanup_files.end(); ++i) {
+ TEMP_FAILURE_RETRY(unlink(*i));
+ free((void*)*i);
+ }
+ cleanup_files.clear();
+ pthread_mutex_unlock(&cleanup_lock);
+}
+
+static void add_cleanup_file(const char *file)
+{
+ char *fname = strdup(file);
+ if (!fname)
+ return;
+ pthread_mutex_lock(&cleanup_lock);
+ cleanup_files.push_back(fname);
+ if (!cleanup_atexit) {
+ atexit(remove_all_cleanup_files);
+ cleanup_atexit = true;
+ }
+ pthread_mutex_unlock(&cleanup_lock);
+}
+
+
+OutputDataSocket::OutputDataSocket(CephContext *cct, uint64_t _backlog)
+ : m_cct(cct),
+ data_max_backlog(_backlog),
+ m_sock_fd(-1),
+ m_shutdown_rd_fd(-1),
+ m_shutdown_wr_fd(-1),
+ going_down(false),
+ m_lock("OutputDataSocket::m_lock")
+{
+}
+
+OutputDataSocket::~OutputDataSocket()
+{
+ shutdown();
+}
+
+/*
+ * This thread listens on the UNIX domain socket for incoming connections.
+ * It only handles one connection at a time at the moment. All I/O is nonblocking,
+ * so that we can implement sensible timeouts. [TODO: make all I/O nonblocking]
+ *
+ * This thread also listens to m_shutdown_rd_fd. If there is any data sent to this
+ * pipe, the thread terminates itself gracefully, allowing the
+ * OutputDataSocketConfigObs class to join() it.
+ */
+
+#define PFL_SUCCESS ((void*)(intptr_t)0)
+#define PFL_FAIL ((void*)(intptr_t)1)
+
+std::string OutputDataSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr)
+{
+ int pipefd[2];
+ int ret = pipe_cloexec(pipefd);
+ if (ret < 0) {
+ ostringstream oss;
+ oss << "OutputDataSocket::create_shutdown_pipe error: " << cpp_strerror(ret);
+ return oss.str();
+ }
+
+ *pipe_rd = pipefd[0];
+ *pipe_wr = pipefd[1];
+ return "";
+}
+
+std::string OutputDataSocket::bind_and_listen(const std::string &sock_path, int *fd)
+{
+ ldout(m_cct, 5) << "bind_and_listen " << sock_path << dendl;
+
+ struct sockaddr_un address;
+ if (sock_path.size() > sizeof(address.sun_path) - 1) {
+ ostringstream oss;
+ oss << "OutputDataSocket::bind_and_listen: "
+ << "The UNIX domain socket path " << sock_path << " is too long! The "
+ << "maximum length on this system is "
+ << (sizeof(address.sun_path) - 1);
+ return oss.str();
+ }
+ int sock_fd = socket(PF_UNIX, SOCK_STREAM, 0);
+ if (sock_fd < 0) {
+ int err = errno;
+ ostringstream oss;
+ oss << "OutputDataSocket::bind_and_listen: "
+ << "failed to create socket: " << cpp_strerror(err);
+ return oss.str();
+ }
+ int r = fcntl(sock_fd, F_SETFD, FD_CLOEXEC);
+ if (r < 0) {
+ r = errno;
+ TEMP_FAILURE_RETRY(::close(sock_fd));
+ ostringstream oss;
+ oss << "OutputDataSocket::bind_and_listen: failed to fcntl on socket: " << cpp_strerror(r);
+ return oss.str();
+ }
+ memset(&address, 0, sizeof(struct sockaddr_un));
+ address.sun_family = AF_UNIX;
+ snprintf(address.sun_path, sizeof(address.sun_path),
+ "%s", sock_path.c_str());
+ if (bind(sock_fd, (struct sockaddr*)&address,
+ sizeof(struct sockaddr_un)) != 0) {
+ int err = errno;
+ if (err == EADDRINUSE) {
+ // The old UNIX domain socket must still be there.
+ // Let's unlink it and try again.
+ TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
+ if (bind(sock_fd, (struct sockaddr*)&address,
+ sizeof(struct sockaddr_un)) == 0) {
+ err = 0;
+ }
+ else {
+ err = errno;
+ }
+ }
+ if (err != 0) {
+ ostringstream oss;
+ oss << "OutputDataSocket::bind_and_listen: "
+ << "failed to bind the UNIX domain socket to '" << sock_path
+ << "': " << cpp_strerror(err);
+ close(sock_fd);
+ return oss.str();
+ }
+ }
+ if (listen(sock_fd, 5) != 0) {
+ int err = errno;
+ ostringstream oss;
+ oss << "OutputDataSocket::bind_and_listen: "
+ << "failed to listen to socket: " << cpp_strerror(err);
+ close(sock_fd);
+ TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
+ return oss.str();
+ }
+ *fd = sock_fd;
+ return "";
+}
+
+void* OutputDataSocket::entry()
+{
+ ldout(m_cct, 5) << "entry start" << dendl;
+ while (true) {
+ struct pollfd fds[2];
+ memset(fds, 0, sizeof(fds));
+ fds[0].fd = m_sock_fd;
+ fds[0].events = POLLIN | POLLRDBAND;
+ fds[1].fd = m_shutdown_rd_fd;
+ fds[1].events = POLLIN | POLLRDBAND;
+
+ int ret = poll(fds, 2, -1);
+ if (ret < 0) {
+ int err = errno;
+ if (err == EINTR) {
+ continue;
+ }
+ lderr(m_cct) << "OutputDataSocket: poll(2) error: '"
+ << cpp_strerror(err) << dendl;
+ return PFL_FAIL;
+ }
+
+ if (fds[0].revents & POLLIN) {
+ // Send out some data
+ do_accept();
+ }
+ if (fds[1].revents & POLLIN) {
+ // Parent wants us to shut down
+ return PFL_SUCCESS;
+ }
+ }
+ ldout(m_cct, 5) << "entry exit" << dendl;
+
+ return PFL_SUCCESS; // unreachable
+}
+
+
+bool OutputDataSocket::do_accept()
+{
+ struct sockaddr_un address;
+ socklen_t address_length = sizeof(address);
+ ldout(m_cct, 30) << "OutputDataSocket: calling accept" << dendl;
+ int connection_fd = accept(m_sock_fd, (struct sockaddr*) &address,
+ &address_length);
+ ldout(m_cct, 30) << "OutputDataSocket: finished accept" << dendl;
+ if (connection_fd < 0) {
+ int err = errno;
+ lderr(m_cct) << "OutputDataSocket: do_accept error: '"
+ << cpp_strerror(err) << dendl;
+ return false;
+ }
+
+ handle_connection(connection_fd);
+ close_connection(connection_fd);
+
+ return 0;
+}
+
+void OutputDataSocket::handle_connection(int fd)
+{
+ bufferlist bl;
+
+ m_lock.Lock();
+ init_connection(bl);
+ m_lock.Unlock();
+
+ if (bl.length()) {
+ /* need to special case the connection init buffer output, as it needs
+ * to be dumped before any data, including older data that was sent
+ * before the connection was established, or before we identified
+ * older connection was broken
+ */
+ int ret = safe_write(fd, bl.c_str(), bl.length());
+ if (ret < 0) {
+ return;
+ }
+ }
+
+ int ret = dump_data(fd);
+ if (ret < 0)
+ return;
+
+ do {
+ m_lock.Lock();
+ cond.Wait(m_lock);
+
+ if (going_down) {
+ m_lock.Unlock();
+ break;
+ }
+ m_lock.Unlock();
+
+ ret = dump_data(fd);
+ } while (ret >= 0);
+}
+
+int OutputDataSocket::dump_data(int fd)
+{
+ m_lock.Lock();
+ list<bufferlist> l;
+ l = data;
+ data.clear();
+ data_size = 0;
+ m_lock.Unlock();
+
+ for (list<bufferlist>::iterator iter = l.begin(); iter != l.end(); ++iter) {
+ bufferlist& bl = *iter;
+ int ret = safe_write(fd, bl.c_str(), bl.length());
+ if (ret >= 0) {
+ ret = safe_write(fd, delim.c_str(), delim.length());
+ }
+ if (ret < 0) {
+ for (; iter != l.end(); ++iter) {
+ bufferlist& bl = *iter;
+ data.push_back(bl);
+ data_size += bl.length();
+ }
+ return ret;
+ }
+ }
+
+ return 0;
+}
+
+void OutputDataSocket::close_connection(int fd)
+{
+ TEMP_FAILURE_RETRY(close(fd));
+}
+
+bool OutputDataSocket::init(const std::string &path)
+{
+ ldout(m_cct, 5) << "init " << path << dendl;
+
+ /* Set up things for the new thread */
+ std::string err;
+ int pipe_rd = -1, pipe_wr = -1;
+ err = create_shutdown_pipe(&pipe_rd, &pipe_wr);
+ if (!err.empty()) {
+ lderr(m_cct) << "OutputDataSocketConfigObs::init: error: " << err << dendl;
+ return false;
+ }
+ int sock_fd;
+ err = bind_and_listen(path, &sock_fd);
+ if (!err.empty()) {
+ lderr(m_cct) << "OutputDataSocketConfigObs::init: failed: " << err << dendl;
+ close(pipe_rd);
+ close(pipe_wr);
+ return false;
+ }
+
+ /* Create new thread */
+ m_sock_fd = sock_fd;
+ m_shutdown_rd_fd = pipe_rd;
+ m_shutdown_wr_fd = pipe_wr;
+ m_path = path;
+ create();
+ add_cleanup_file(m_path.c_str());
+ return true;
+}
+
+void OutputDataSocket::shutdown()
+{
+ m_lock.Lock();
+ going_down = true;
+ cond.Signal();
+ m_lock.Unlock();
+
+ if (m_shutdown_wr_fd < 0)
+ return;
+
+ ldout(m_cct, 5) << "shutdown" << dendl;
+
+ // Send a byte to the shutdown pipe that the thread is listening to
+ char buf[1] = { 0x0 };
+ int ret = safe_write(m_shutdown_wr_fd, buf, sizeof(buf));
+ TEMP_FAILURE_RETRY(close(m_shutdown_wr_fd));
+ m_shutdown_wr_fd = -1;
+
+ if (ret == 0) {
+ join();
+ } else {
+ lderr(m_cct) << "OutputDataSocket::shutdown: failed to write "
+ "to thread shutdown pipe: error " << ret << dendl;
+ }
+
+ remove_cleanup_file(m_path.c_str());
+ m_path.clear();
+}
+
+void OutputDataSocket::append_output(bufferlist& bl)
+{
+ Mutex::Locker l(m_lock);
+
+ if (data_size + bl.length() > data_max_backlog) {
+ ldout(m_cct, 20) << "dropping data output, max backlog reached" << dendl;
+ }
+ data.push_back(bl);
+
+ data_size += bl.length();
+
+ cond.Signal();
+}
diff --git a/src/common/OutputDataSocket.h b/src/common/OutputDataSocket.h
new file mode 100644
index 00000000000..f581a56bf03
--- /dev/null
+++ b/src/common/OutputDataSocket.h
@@ -0,0 +1,72 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2011 New Dream Network
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_COMMON_OUTPUTDATASOCKET_H
+#define CEPH_COMMON_OUTPUTDATASOCKET_H
+
+#include "common/Thread.h"
+#include "common/Mutex.h"
+#include "common/Cond.h"
+
+#include <string>
+#include <map>
+#include <list>
+#include "include/buffer.h"
+
+class CephContext;
+
+class OutputDataSocket : public Thread
+{
+public:
+ OutputDataSocket(CephContext *cct, uint64_t _backlog);
+ virtual ~OutputDataSocket();
+
+ bool init(const std::string &path);
+
+ void append_output(bufferlist& bl);
+
+protected:
+ virtual void init_connection(bufferlist& bl) {}
+ void shutdown();
+
+ std::string create_shutdown_pipe(int *pipe_rd, int *pipe_wr);
+ std::string bind_and_listen(const std::string &sock_path, int *fd);
+
+ void *entry();
+ bool do_accept();
+
+ void handle_connection(int fd);
+ void close_connection(int fd);
+
+ int dump_data(int fd);
+
+ CephContext *m_cct;
+ uint64_t data_max_backlog;
+ std::string m_path;
+ int m_sock_fd;
+ int m_shutdown_rd_fd;
+ int m_shutdown_wr_fd;
+ bool going_down;
+
+ uint64_t data_size;
+
+ std::list<bufferlist> data;
+
+ Mutex m_lock;
+ Cond cond;
+
+ bufferlist delim;
+};
+
+#endif
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 6fb6d943658..b1ae20590d6 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -470,6 +470,9 @@ OPTION(rgw_usage_max_shards, OPT_INT, 32)
OPTION(rgw_usage_max_user_shards, OPT_INT, 1)
OPTION(rgw_enable_ops_log, OPT_BOOL, true) // enable logging every rgw operation
OPTION(rgw_enable_usage_log, OPT_BOOL, true) // enable logging bandwidth usage
+OPTION(rgw_ops_log_rados, OPT_BOOL, true) // whether ops log should go to rados
+OPTION(rgw_ops_log_socket_path, OPT_STR, "") // path to unix domain socket where ops log can go
+OPTION(rgw_ops_log_data_backlog, OPT_INT, 5 << 20) // max data backlog for ops log
OPTION(rgw_usage_log_flush_threshold, OPT_INT, 1024) // threshold to flush pending log data
OPTION(rgw_usage_log_tick_interval, OPT_INT, 30) // flush pending log data every X seconds
OPTION(rgw_intent_log_object_name, OPT_STR, "%Y-%m-%d-%i-%n") // man date to see codes (a subset are supported)
diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc
index 5a5b7406aaf..665ac5516e8 100644
--- a/src/rgw/rgw_admin.cc
+++ b/src/rgw/rgw_admin.cc
@@ -1361,25 +1361,8 @@ int main(int argc, char **argv)
goto next;
if (show_log_entries) {
- formatter->open_object_section("log_entry");
- formatter->dump_string("bucket", entry.bucket);
- entry.time.gmtime(formatter->dump_stream("time")); // UTC
- entry.time.localtime(formatter->dump_stream("time_local"));
- formatter->dump_string("remote_addr", entry.remote_addr);
- if (entry.object_owner.length())
- formatter->dump_string("object_owner", entry.object_owner);
- formatter->dump_string("user", entry.user);
- formatter->dump_string("operation", entry.op);
- formatter->dump_string("uri", entry.uri);
- formatter->dump_string("http_status", entry.http_status);
- formatter->dump_string("error_code", entry.error_code);
- formatter->dump_int("bytes_sent", entry.bytes_sent);
- formatter->dump_int("bytes_received", entry.bytes_received);
- formatter->dump_int("object_size", entry.obj_size);
- formatter->dump_int("total_time", total_time);
- formatter->dump_string("user_agent", entry.user_agent);
- formatter->dump_string("referrer", entry.referrer);
- formatter->close_section();
+
+ rgw_format_ops_log_entry(entry, formatter);
formatter->flush(cout);
}
next:
diff --git a/src/rgw/rgw_log.cc b/src/rgw/rgw_log.cc
index 5406a2c924b..a55df8bdbbf 100644
--- a/src/rgw/rgw_log.cc
+++ b/src/rgw/rgw_log.cc
@@ -1,6 +1,8 @@
#include "common/Clock.h"
#include "common/Timer.h"
#include "common/utf8.h"
+#include "common/OutputDataSocket.h"
+#include "common/Formatter.h"
#include "rgw_log.h"
#include "rgw_acl.h"
@@ -188,7 +190,67 @@ static void log_usage(struct req_state *s, const string& op_name)
usage_logger->insert(ts, entry);
}
-int rgw_log_op(RGWRados *store, struct req_state *s, const string& op_name)
+void rgw_format_ops_log_entry(struct rgw_log_entry& entry, Formatter *formatter)
+{
+ formatter->open_object_section("log_entry");
+ formatter->dump_string("bucket", entry.bucket);
+ entry.time.gmtime(formatter->dump_stream("time")); // UTC
+ entry.time.localtime(formatter->dump_stream("time_local"));
+ formatter->dump_string("remote_addr", entry.remote_addr);
+ if (entry.object_owner.length())
+ formatter->dump_string("object_owner", entry.object_owner);
+ formatter->dump_string("user", entry.user);
+ formatter->dump_string("operation", entry.op);
+ formatter->dump_string("uri", entry.uri);
+ formatter->dump_string("http_status", entry.http_status);
+ formatter->dump_string("error_code", entry.error_code);
+ formatter->dump_int("bytes_sent", entry.bytes_sent);
+ formatter->dump_int("bytes_received", entry.bytes_received);
+ formatter->dump_int("object_size", entry.obj_size);
+ uint64_t total_time = entry.total_time.sec() * 1000000LL * entry.total_time.usec();
+
+ formatter->dump_int("total_time", total_time);
+ formatter->dump_string("user_agent", entry.user_agent);
+ formatter->dump_string("referrer", entry.referrer);
+ formatter->close_section();
+}
+
+void OpsLogSocket::formatter_to_bl(bufferlist& bl)
+{
+ stringstream ss;
+ formatter->flush(ss);
+ const string& s = ss.str();
+
+ bl.append(s);
+}
+
+void OpsLogSocket::init_connection(bufferlist& bl)
+{
+ bl.append("[");
+}
+
+OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog)
+{
+ formatter = new JSONFormatter;
+ delim.append(",\n");
+}
+
+OpsLogSocket::~OpsLogSocket()
+{
+ delete formatter;
+}
+
+void OpsLogSocket::log(struct rgw_log_entry& entry)
+{
+ bufferlist bl;
+
+ rgw_format_ops_log_entry(entry, formatter);
+ formatter_to_bl(bl);
+
+ append_output(bl);
+}
+
+int rgw_log_op(RGWRados *store, struct req_state *s, const string& op_name, OpsLogSocket *olog)
{
struct rgw_log_entry entry;
string bucket_id;
@@ -263,19 +325,27 @@ int rgw_log_op(RGWRados *store, struct req_state *s, const string& op_name)
gmtime_r(&t, &bdt);
else
localtime_r(&t, &bdt);
-
- string oid = render_log_object_name(s->cct->_conf->rgw_log_object_name, &bdt,
- s->bucket.bucket_id, entry.bucket.c_str());
- rgw_obj obj(store->params.log_pool, oid);
+ int ret = 0;
+
+ if (s->cct->_conf->rgw_ops_log_rados) {
+ string oid = render_log_object_name(s->cct->_conf->rgw_log_object_name, &bdt,
+ s->bucket.bucket_id, entry.bucket.c_str());
+
+ rgw_obj obj(store->params.log_pool, oid);
- int ret = store->append_async(obj, bl.length(), bl);
- if (ret == -ENOENT) {
- ret = store->create_pool(store->params.log_pool);
- if (ret < 0)
- goto done;
- // retry
ret = store->append_async(obj, bl.length(), bl);
+ if (ret == -ENOENT) {
+ ret = store->create_pool(store->params.log_pool);
+ if (ret < 0)
+ goto done;
+ // retry
+ ret = store->append_async(obj, bl.length(), bl);
+ }
+ }
+
+ if (olog) {
+ olog->log(entry);
}
done:
if (ret < 0)
diff --git a/src/rgw/rgw_log.h b/src/rgw/rgw_log.h
index f832c1cadeb..823f0b1767f 100644
--- a/src/rgw/rgw_log.h
+++ b/src/rgw/rgw_log.h
@@ -3,6 +3,8 @@
#include "rgw_common.h"
#include "include/utime.h"
+#include "common/Formatter.h"
+#include "common/OutputDataSocket.h"
class RGWRados;
@@ -115,11 +117,27 @@ struct rgw_intent_log_entry {
};
WRITE_CLASS_ENCODER(rgw_intent_log_entry)
-int rgw_log_op(RGWRados *store, struct req_state *s, const string& op_name);
+class OpsLogSocket : public OutputDataSocket {
+ Formatter *formatter;
+
+ void formatter_to_bl(bufferlist& bl);
+
+protected:
+ void init_connection(bufferlist& bl);
+
+public:
+ OpsLogSocket(CephContext *cct, uint64_t _backlog);
+ ~OpsLogSocket();
+
+ void log(struct rgw_log_entry& entry);
+};
+
+int rgw_log_op(RGWRados *store, struct req_state *s, const string& op_name, OpsLogSocket *olog);
int rgw_log_intent(RGWRados *store, rgw_obj& obj, RGWIntentEvent intent, const utime_t& timestamp, bool utc);
int rgw_log_intent(RGWRados *store, struct req_state *s, rgw_obj& obj, RGWIntentEvent intent);
void rgw_log_usage_init(CephContext *cct, RGWRados *store);
void rgw_log_usage_finalize();
+void rgw_format_ops_log_entry(struct rgw_log_entry& entry, Formatter *formatter);
#endif
diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc
index 944b59a5c8d..4e87d64a497 100644
--- a/src/rgw/rgw_main.cc
+++ b/src/rgw/rgw_main.cc
@@ -127,6 +127,7 @@ struct RGWRequest
class RGWProcess {
RGWRados *store;
+ OpsLogSocket *olog;
deque<RGWRequest *> m_req_queue;
ThreadPool m_tp;
Throttle req_throttle;
@@ -185,8 +186,8 @@ class RGWProcess {
uint64_t max_req_id;
public:
- RGWProcess(CephContext *cct, RGWRados *rgwstore, int num_threads, RGWREST *_rest)
- : store(rgwstore), m_tp(cct, "RGWProcess::m_tp", num_threads),
+ RGWProcess(CephContext *cct, RGWRados *rgwstore, OpsLogSocket *_olog, int num_threads, RGWREST *_rest)
+ : store(rgwstore), olog(_olog), m_tp(cct, "RGWProcess::m_tp", num_threads),
req_throttle(cct, "rgw_ops", num_threads * 2),
rest(_rest),
req_wq(this, g_conf->rgw_op_thread_timeout,
@@ -331,7 +332,7 @@ void RGWProcess::handle_request(RGWRequest *req)
op->execute();
op->complete();
done:
- rgw_log_op(store, s, (op ? op->name() : "unknown"));
+ rgw_log_op(store, s, (op ? op->name() : "unknown"), olog);
int http_ret = s->err.http_ret;
@@ -485,7 +486,14 @@ int main(int argc, const char **argv)
rest.register_resource(g_conf->rgw_admin_entry, admin_resource);
}
- RGWProcess process(g_ceph_context, store, g_conf->rgw_thread_pool_size, &rest);
+ OpsLogSocket *olog = NULL;
+
+ if (!g_conf->rgw_ops_log_socket_path.empty()) {
+ olog = new OpsLogSocket(g_ceph_context, g_conf->rgw_ops_log_data_backlog);
+ olog->init(g_conf->rgw_ops_log_socket_path);
+ }
+
+ RGWProcess process(g_ceph_context, store, olog, g_conf->rgw_thread_pool_size, &rest);
process.run();
if (do_swift) {
@@ -494,6 +502,8 @@ int main(int argc, const char **argv)
rgw_log_usage_finalize();
+ delete olog;
+
rgw_perf_stop(g_ceph_context);
unregister_async_signal_handler(SIGHUP, sighup_handler);