Diffstat (limited to 'src/rgw/rgw_process.cc')
-rw-r--r-- | src/rgw/rgw_process.cc | 635
1 file changed, 635 insertions, 0 deletions
diff --git a/src/rgw/rgw_process.cc b/src/rgw/rgw_process.cc
new file mode 100644
index 00000000000..9c82f89b163
--- /dev/null
+++ b/src/rgw/rgw_process.cc
@@ -0,0 +1,635 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <stdarg.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <signal.h>
+
+#include <curl/curl.h>
+
+#include "acconfig.h"
+
+#include "common/ceph_argparse.h"
+#include "global/global_init.h"
+#include "global/signal_handler.h"
+#include "common/config.h"
+#include "common/errno.h"
+#include "common/WorkQueue.h"
+#include "common/Timer.h"
+#include "common/Throttle.h"
+#include "include/str_list.h"
+#include "rgw_common.h"
+#include "rgw_rados.h"
+#include "rgw_acl.h"
+#include "rgw_user.h"
+#include "rgw_op.h"
+#include "rgw_rest.h"
+#include "rgw_rest_s3.h"
+#include "rgw_rest_swift.h"
+#include "rgw_rest_admin.h"
+#include "rgw_rest_usage.h"
+#include "rgw_rest_user.h"
+#include "rgw_rest_bucket.h"
+#include "rgw_rest_metadata.h"
+#include "rgw_rest_log.h"
+#include "rgw_rest_opstate.h"
+#include "rgw_replica_log.h"
+#include "rgw_rest_replica_log.h"
+#include "rgw_rest_config.h"
+#include "rgw_swift_auth.h"
+#include "rgw_swift.h"
+#include "rgw_log.h"
+#include "rgw_tools.h"
+#include "rgw_resolve.h"
+#include "rgw_client_io.h"
+
+#include <map>
+#include <string>
+#include <vector>
+#include <iostream>
+#include <sstream>
+
+#include "include/types.h"
+#include "common/BackTrace.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+using namespace std;
+
+static sighandler_t sighandler_alrm;
+
+class RGWProcess;
+
+static RGWProcess *pprocess = NULL;
+
+
+#define SOCKET_BACKLOG 1024
+
+struct RGWRequest
+{
+  // FCGX_Request fcgx;
+  uint64_t id;
+  struct req_state *s;
+  string req_str;
+  RGWOp *op;
+  utime_t ts;
+
+  RGWRequest() : id(0), s(NULL), op(NULL) {
+  }
+
+  ~RGWRequest() {
+    delete s;
+  }
+
+  req_state *init_state(CephContext *cct, RGWEnv *env) {
+    s = new req_state(cct, env);
+    return s;
+  }
+
+  void log_format(struct req_state *s, const char *fmt, ...)
+  {
+#define LARGE_SIZE 1024
+    char buf[LARGE_SIZE];
+    va_list ap;
+
+    va_start(ap, fmt);
+    vsnprintf(buf, sizeof(buf), fmt, ap);
+    va_end(ap);
+
+    log(s, buf);
+  }
+
+  void log_init() {
+    ts = ceph_clock_now(g_ceph_context);
+  }
+
+  void log(struct req_state *s, const char *msg) {
+    if (s->info.method && req_str.size() == 0) {
+      req_str = s->info.method;
+      req_str.append(" ");
+      req_str.append(s->info.request_uri);
+    }
+    utime_t t = ceph_clock_now(g_ceph_context) - ts;
+    dout(2) << "req " << id << ":" << t << ":" << s->dialect << ":" << req_str << ":" << (op ? op->name() : "") << ":" << msg << dendl;
+  }
+};
+
+class RGWProcess {
+  RGWRados *store;
+  OpsLogSocket *olog;
+  deque<RGWRequest *> m_req_queue;
+  ThreadPool m_tp;
+  Throttle req_throttle;
+  RGWREST *rest;
+  int sock_fd;
+
+  struct RGWWQ : public ThreadPool::WorkQueue<RGWRequest> {
+    RGWProcess *process;
+    RGWWQ(RGWProcess *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+      : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout, tp), process(p) {}
+
+    bool _enqueue(RGWRequest *req) {
+      process->m_req_queue.push_back(req);
+      perfcounter->inc(l_rgw_qlen);
+      dout(20) << "enqueued request req=" << hex << req << dec << dendl;
+      _dump_queue();
+      return true;
+    }
+    void _dequeue(RGWRequest *req) {
+      assert(0);
+    }
+    bool _empty() {
+      return process->m_req_queue.empty();
+    }
+    RGWRequest *_dequeue() {
+      if (process->m_req_queue.empty())
+        return NULL;
+      RGWRequest *req = process->m_req_queue.front();
+      process->m_req_queue.pop_front();
+      dout(20) << "dequeued request req=" << hex << req << dec << dendl;
+      _dump_queue();
+      perfcounter->inc(l_rgw_qlen, -1);
+      return req;
+    }
+    void _process(RGWRequest *req) {
+      perfcounter->inc(l_rgw_qactive);
+      process->handle_request(req);
+      process->req_throttle.put(1);
+      perfcounter->inc(l_rgw_qactive, -1);
+    }
+    void _dump_queue() {
+      deque<RGWRequest *>::iterator iter;
+      if (process->m_req_queue.empty()) {
+        dout(20) << "RGWWQ: empty" << dendl;
+        return;
+      }
+      dout(20) << "RGWWQ:" << dendl;
+      for (iter = process->m_req_queue.begin(); iter != process->m_req_queue.end(); ++iter) {
+        dout(20) << "req: " << hex << *iter << dec << dendl;
+      }
+    }
+    void _clear() {
+      assert(process->m_req_queue.empty());
+    }
+  } req_wq;
+
+  uint64_t max_req_id;
+
+public:
+  RGWProcess(CephContext *cct, RGWRados *rgwstore, OpsLogSocket *_olog, int num_threads, RGWREST *_rest)
+    : store(rgwstore), olog(_olog), m_tp(cct, "RGWProcess::m_tp", num_threads),
+      req_throttle(cct, "rgw_ops", num_threads * 2),
+      rest(_rest), sock_fd(-1),
+      req_wq(this, g_conf->rgw_op_thread_timeout,
+             g_conf->rgw_op_thread_suicide_timeout, &m_tp),
+      max_req_id(0) {}
+  void run();
+  void handle_request(RGWRequest *req);
+
+  void close_fd() {
+    if (sock_fd >= 0)
+      close(sock_fd);
+  }
+};
+
+void RGWProcess::run()
+{
+#if 0
+  sock_fd = 0;
+  if (!g_conf->rgw_socket_path.empty()) {
+    string path_str = g_conf->rgw_socket_path;
+
+    /* this is necessary, as FCGX_OpenSocket might not return an error, but rather ungracefully exit */
+    int fd = open(path_str.c_str(), O_CREAT, 0644);
+    if (fd < 0) {
+      int err = errno;
+      /* ENXIO is actually expected, we'll get that if we try to open a unix domain socket */
+      if (err != ENXIO) {
+        dout(0) << "ERROR: cannot create socket: path=" << path_str << " error=" << cpp_strerror(err) << dendl;
+        return;
+      }
+    } else {
+      close(fd);
+    }
+
+    const char *path = path_str.c_str();
+    sock_fd = FCGX_OpenSocket(path, SOCKET_BACKLOG);
+    if (sock_fd < 0) {
+      dout(0) << "ERROR: FCGX_OpenSocket (" << path << ") returned " << sock_fd << dendl;
+      return;
+    }
+    if (chmod(path, 0777) < 0) {
+      dout(0) << "WARNING: couldn't set permissions on unix domain socket" << dendl;
+    }
+  } else if (!g_conf->rgw_port.empty()) {
+    string bind = g_conf->rgw_host + ":" + g_conf->rgw_port;
+    sock_fd = FCGX_OpenSocket(bind.c_str(), SOCKET_BACKLOG);
+    if (sock_fd < 0) {
+      dout(0) << "ERROR: FCGX_OpenSocket (" << bind.c_str() << ") returned " << sock_fd << dendl;
+      return;
+    }
+  }
+
+  m_tp.start();
+
+  for (;;) {
+    RGWRequest *req = new RGWRequest;
+    req->id = ++max_req_id;
+    dout(10) << "allocated request req=" << hex << req << dec << dendl;
+    FCGX_InitRequest(&req->fcgx, sock_fd, 0);
+    req_throttle.get(1);
+    int ret = FCGX_Accept_r(&req->fcgx);
+    if (ret < 0) {
+      delete req;
+      dout(0) << "ERROR: FCGX_Accept_r returned " << ret << dendl;
+      req_throttle.put(1);
+      break;
+    }
+
+    req_wq.queue(req);
+  }
+
+  m_tp.drain();
+  m_tp.stop();
+#endif
+}
+
+static void handle_sigterm(int signum)
+{
+  dout(1) << __func__ << dendl;
+
+  // close the fd, so that accept can't start again.
+  pprocess->close_fd();
+
+  // send a signal to make fcgi's accept(2) wake up. unfortunately the
+  // initial signal often isn't sufficient because we race with accept's
+  // check of the flag wet by ShutdownPending() above.
+  if (signum != SIGUSR1) {
+    kill(getpid(), SIGUSR1);
+
+    // safety net in case we get stuck doing an orderly shutdown.
+    uint64_t secs = g_ceph_context->_conf->rgw_exit_timeout_secs;
+    if (secs)
+      alarm(secs);
+    dout(1) << __func__ << " set alarm for " << secs << dendl;
+  }
+
+}
+
+static void godown_alarm(int signum)
+{
+  _exit(0);
+}
+
+static int call_log_intent(RGWRados *store, void *ctx, rgw_obj& obj, RGWIntentEvent intent)
+{
+  struct req_state *s = (struct req_state *)ctx;
+  return rgw_log_intent(store, s, obj, intent);
+}
+
+class RGWIO : public RGWClientIO
+{
+protected:
+  int write_data(const char *buf, int len) { return 0; }
+  int read_data(char *buf, int len) { return 0; }
+
+public:
+  RGWIO() {}
+  void flush() {}
+  const char **envp() { return NULL; }
+};
+
+void RGWProcess::handle_request(RGWRequest *req)
+{
+#if 0
+  FCGX_Request *fcgx = &req->fcgx;
+  RGWFCGX client_io(fcgx);
+#endif
+  RGWIO client_io;
+  int ret;
+  RGWEnv rgw_env;
+
+  req->log_init();
+
+  dout(1) << "====== starting new request req=" << hex << req << dec << " =====" << dendl;
+  perfcounter->inc(l_rgw_req);
+
+#if 0
+  rgw_env.init(g_ceph_context, fcgx->envp);
+#endif
+
+  struct req_state *s = req->init_state(g_ceph_context, &rgw_env);
+  s->obj_ctx = store->create_context(s);
+  store->set_intent_cb(s->obj_ctx, call_log_intent);
+
+  s->req_id = store->unique_id(req->id);
+
+  req->log(s, "initializing");
+
+  RGWOp *op = NULL;
+  int init_error = 0;
+  bool should_log = false;
+  RGWRESTMgr *mgr;
+  RGWHandler *handler = rest->get_handler(store, s, &client_io, &mgr, &init_error);
+  if (init_error != 0) {
+    abort_early(s, NULL, init_error);
+    goto done;
+  }
+
+  should_log = mgr->get_logging();
+
+  req->log(s, "getting op");
+  op = handler->get_op(store);
+  if (!op) {
+    abort_early(s, NULL, -ERR_METHOD_NOT_ALLOWED);
+    goto done;
+  }
+  req->op = op;
+
+  req->log(s, "authorizing");
+  ret = handler->authorize();
+  if (ret < 0) {
+    dout(10) << "failed to authorize request" << dendl;
+    abort_early(s, op, ret);
+    goto done;
+  }
+
+  if (s->user.suspended) {
+    dout(10) << "user is suspended, uid=" << s->user.user_id << dendl;
+    abort_early(s, op, -ERR_USER_SUSPENDED);
+    goto done;
+  }
+  req->log(s, "reading permissions");
+  ret = handler->read_permissions(op);
+  if (ret < 0) {
+    abort_early(s, op, ret);
+    goto done;
+  }
+
+  req->log(s, "init op");
+  ret = op->init_processing();
+  if (ret < 0) {
+    abort_early(s, op, ret);
+    goto done;
+  }
+
+  req->log(s, "verifying op mask");
+  ret = op->verify_op_mask();
+  if (ret < 0) {
+    abort_early(s, op, ret);
+    goto done;
+  }
+
+  req->log(s, "verifying op permissions");
+  ret = op->verify_permission();
+  if (ret < 0) {
+    if (s->system_request) {
+      dout(2) << "overriding permissions due to system operation" << dendl;
+    } else {
+      abort_early(s, op, ret);
+      goto done;
+    }
+  }
+
+  req->log(s, "verifying op params");
+  ret = op->verify_params();
+  if (ret < 0) {
+    abort_early(s, op, ret);
+    goto done;
+  }
+
+  if (s->expect_cont)
+    dump_continue(s);
+
+  req->log(s, "executing");
+  op->execute();
+  op->complete();
+done:
+  if (should_log) {
+    rgw_log_op(store, s, (op ? op->name() : "unknown"), olog);
+  }
+
+  int http_ret = s->err.http_ret;
+
+  req->log_format(s, "http status=%d", http_ret);
+
+  if (handler)
+    handler->put_op(op);
+  rest->put_handler(handler);
+  store->destroy_context(s->obj_ctx);
+#if 0
+  FCGX_Finish_r(fcgx);
+#endif
+
+  dout(1) << "====== req done req=" << hex << req << dec << " http_status=" << http_ret << " ======" << dendl;
+  delete req;
+}
+
+#ifdef HAVE_CURL_MULTI_WAIT
+static void check_curl()
+{
+}
+#else
+static void check_curl()
+{
+  derr << "WARNING: libcurl doesn't support curl_multi_wait()" << dendl;
+  derr << "WARNING: cross zone / region transfer performance may be affected" << dendl;
+}
+#endif
+
+class C_InitTimeout : public Context {
+public:
+  C_InitTimeout() {}
+  void finish(int r) {
+    derr << "Initialization timeout, failed to initialize" << dendl;
+    exit(1);
+  }
+};
+
+int usage()
+{
+  cerr << "usage: radosgw [options...]" << std::endl;
+  cerr << "options:\n";
+  cerr << "  --rgw-region=<region>     region in which radosgw runs\n";
+  cerr << "  --rgw-zone=<zone>         zone in which radosgw runs\n";
+  generic_server_usage();
+  return 0;
+}
+
+static RGWRESTMgr *set_logging(RGWRESTMgr *mgr)
+{
+  mgr->set_logging(true);
+  return mgr;
+}
+
+/*
+ * start up the RADOS connection and then handle HTTP messages as they come in
+ */
+extern "C" int rgw_process_init(int argc, const char **argv)
+{
+  cerr << __FILE__ << ":" << __LINE__ << std::endl;
+  // dout() messages will be sent to stderr, but FCGX wants messages on stdout
+  // Redirect stderr to stdout.
+  TEMP_FAILURE_RETRY(close(STDERR_FILENO));
+  if (TEMP_FAILURE_RETRY(dup2(STDOUT_FILENO, STDERR_FILENO) < 0)) {
+    int err = errno;
+    cout << "failed to redirect stderr to stdout: " << cpp_strerror(err)
+         << std::endl;
+    return ENOSYS;
+  }
+
+  /* alternative default for module */
+  vector<const char *> def_args;
+  def_args.push_back("--debug-rgw=20");
+  def_args.push_back("--keyring=$rgw_data/keyring");
+  def_args.push_back("--log-file=/var/log/radosgw/$cluster-$name");
+
+  vector<const char*> args;
+  if (argv)
+    argv_to_vec(argc, argv, args);
+  env_to_vec(args);
+  global_init(&def_args, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_DAEMON,
+              CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
+
+  for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ++i) {
+    if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) {
+      usage();
+      return 0;
+    }
+  }
+
+  check_curl();
+
+  Mutex mutex("main");
+  SafeTimer init_timer(g_ceph_context, mutex);
+  init_timer.init();
+  mutex.Lock();
+  init_timer.add_event_after(g_conf->rgw_init_timeout, new C_InitTimeout);
+  mutex.Unlock();
+
+  common_init_finish(g_ceph_context);
+
+  rgw_tools_init(g_ceph_context);
+
+  rgw_init_resolver();
+  rgw_rest_init(g_ceph_context);
+
+  curl_global_init(CURL_GLOBAL_ALL);
+
+  int r = 0;
+  RGWRados *store = RGWStoreManager::get_storage(g_ceph_context, true);
+  if (!store) {
+    derr << "Couldn't init storage provider (RADOS)" << dendl;
+    r = EIO;
+  }
+  if (!r)
+    r = rgw_perf_start(g_ceph_context);
+
+  mutex.Lock();
+  init_timer.cancel_all_events();
+  init_timer.shutdown();
+  mutex.Unlock();
+
+  if (r)
+    return 1;
+
+  rgw_user_init(store->meta_mgr);
+  rgw_bucket_init(store->meta_mgr);
+  rgw_log_usage_init(g_ceph_context, store);
+
+  RGWREST rest;
+
+  list<string> apis;
+  bool do_swift = false;
+
+  get_str_list(g_conf->rgw_enable_apis, apis);
+
+  map<string, bool> apis_map;
+  for (list<string>::iterator li = apis.begin(); li != apis.end(); ++li) {
+    apis_map[*li] = true;
+  }
+
+  if (apis_map.count("s3") > 0)
+    rest.register_default_mgr(set_logging(new RGWRESTMgr_S3));
+
+  if (apis_map.count("swift") > 0) {
+    do_swift = true;
+    swift_init(g_ceph_context);
+    rest.register_resource(g_conf->rgw_swift_url_prefix, set_logging(new RGWRESTMgr_SWIFT));
+  }
+
+  if (apis_map.count("swift_auth") > 0)
+    rest.register_resource(g_conf->rgw_swift_auth_entry, set_logging(new RGWRESTMgr_SWIFT_Auth));
+
+  if (apis_map.count("admin") > 0) {
+    RGWRESTMgr_Admin *admin_resource = new RGWRESTMgr_Admin;
+    admin_resource->register_resource("usage", new RGWRESTMgr_Usage);
+    admin_resource->register_resource("user", new RGWRESTMgr_User);
+    admin_resource->register_resource("bucket", new RGWRESTMgr_Bucket);
+
+    /*Registering resource for /admin/metadata */
+    admin_resource->register_resource("metadata", new RGWRESTMgr_Metadata);
+    admin_resource->register_resource("log", new RGWRESTMgr_Log);
+    admin_resource->register_resource("opstate", new RGWRESTMgr_Opstate);
+    admin_resource->register_resource("replica_log", new RGWRESTMgr_ReplicaLog);
+    admin_resource->register_resource("config", new RGWRESTMgr_Config);
+    rest.register_resource(g_conf->rgw_admin_entry, admin_resource);
+  }
+
+  OpsLogSocket *olog = NULL;
+
+  if (!g_conf->rgw_ops_log_socket_path.empty()) {
+    olog = new OpsLogSocket(g_ceph_context, g_conf->rgw_ops_log_data_backlog);
+    olog->init(g_conf->rgw_ops_log_socket_path);
+  }
+
+  pprocess = new RGWProcess(g_ceph_context, store, olog, g_conf->rgw_thread_pool_size, &rest);
+
+#if 0
+  init_async_signal_handler();
+  register_async_signal_handler(SIGHUP, sighup_handler);
+  register_async_signal_handler(SIGTERM, handle_sigterm);
+  register_async_signal_handler(SIGINT, handle_sigterm);
+  register_async_signal_handler(SIGUSR1, handle_sigterm);
+
+  sighandler_alrm = signal(SIGALRM, godown_alarm);
+
+  pprocess->run();
+  derr << "shutting down" << dendl;
+
+  unregister_async_signal_handler(SIGHUP, sighup_handler);
+  unregister_async_signal_handler(SIGTERM, handle_sigterm);
+  unregister_async_signal_handler(SIGINT, handle_sigterm);
+  unregister_async_signal_handler(SIGUSR1, handle_sigterm);
+  shutdown_async_signal_handler();
+
+  delete pprocess;
+
+  if (do_swift) {
+    swift_finalize();
+  }
+
+  rgw_log_usage_finalize();
+
+  delete olog;
+
+  rgw_perf_stop(g_ceph_context);
+
+  RGWStoreManager::close_storage(store);
+
+  rgw_tools_cleanup();
+  rgw_shutdown_resolver();
+  curl_global_cleanup();
+
+  dout(1) << "final shutdown" << dendl;
+  g_ceph_context->put();
+
+  ceph::crypto::shutdown();
+
+#endif
+  return 0;
+}
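
Note that the new file exports a C-linkage entry point, rgw_process_init(), rather than a main(), so it is intended to be driven by a host binary or module loader. A minimal caller sketch follows, assuming only the signature added in this diff; the wrapper itself (file name, main()) is hypothetical and not part of the commit:

// host_main.cc -- hypothetical wrapper, not part of this commit.
// Only rgw_process_init() comes from rgw_process.cc above.
extern "C" int rgw_process_init(int argc, const char **argv);

int main(int argc, const char **argv)
{
  // Pass the command line straight through; rgw_process_init() runs
  // global_init(), sets up the RADOS store and REST managers, and will
  // drive the FastCGI request loop once the #if 0 sections are enabled.
  return rgw_process_init(argc, argv);
}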