diff options
Diffstat (limited to 'src/rgw/rgw_main.cc')
-rw-r--r-- | src/rgw/rgw_main.cc | 138 |
1 files changed, 132 insertions, 6 deletions
diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index 944b59a5c8d..33181ec4ce0 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -43,6 +43,9 @@ #include "rgw_log.h" #include "rgw_tools.h" #include "rgw_resolve.h" +#include "rgw_mongoose.h" + +#include "mongoose/mongoose.h" #include <map> #include <string> @@ -125,6 +128,11 @@ struct RGWRequest } }; +struct RGWProcessEnv { + RGWRados *store; + RGWREST *rest; +}; + class RGWProcess { RGWRados *store; deque<RGWRequest *> m_req_queue; @@ -132,6 +140,8 @@ class RGWProcess { Throttle req_throttle; RGWREST *rest; + RGWProcessEnv *process_env; + struct RGWWQ : public ThreadPool::WorkQueue<RGWRequest> { RGWProcess *process; RGWWQ(RGWProcess *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp) @@ -185,10 +195,10 @@ class RGWProcess { uint64_t max_req_id; public: - RGWProcess(CephContext *cct, RGWRados *rgwstore, int num_threads, RGWREST *_rest) - : store(rgwstore), m_tp(cct, "RGWProcess::m_tp", num_threads), + RGWProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads) + : store(pe->store), m_tp(cct, "RGWProcess::m_tp", num_threads), req_throttle(cct, "rgw_ops", num_threads * 2), - rest(_rest), + rest(pe->rest), req_wq(this, g_conf->rgw_op_thread_timeout, g_conf->rgw_op_thread_suicide_timeout, &m_tp), max_req_id(0) {} @@ -258,15 +268,16 @@ void RGWProcess::handle_request(RGWRequest *req) { FCGX_Request *fcgx = &req->fcgx; int ret; - RGWEnv rgw_env; RGWFCGX client_io(fcgx); + client_io.init(g_ceph_context); + req->log_init(); dout(1) << "====== starting new request req=" << hex << req << dec << " =====" << dendl; perfcounter->inc(l_rgw_req); - rgw_env.init(g_ceph_context, fcgx->envp); + RGWEnv& rgw_env = client_io.get_env(); struct req_state *s = req->init_state(g_ceph_context, &rgw_env); s->obj_ctx = store->create_context(s); @@ -356,6 +367,111 @@ public: } }; +static void *mongoose_callback(enum mg_event event, + struct mg_connection *conn) { + const struct mg_request_info *request_info = mg_get_request_info(conn); + RGWProcessEnv *pe = (RGWProcessEnv *)request_info->user_data; + RGWRados *store = pe->store; + RGWREST *rest = pe->rest; + + if (event != MG_NEW_REQUEST) + return NULL; + + RGWRequest *req = new RGWRequest; + + int ret; + RGWMongoose client_io(conn); + + client_io.init(g_ceph_context); + + req->log_init(); + + dout(1) << "====== starting new request req=" << hex << req << dec << " =====" << dendl; + perfcounter->inc(l_rgw_req); + + RGWEnv& rgw_env = client_io.get_env(); + + struct req_state *s = req->init_state(g_ceph_context, &rgw_env); + s->obj_ctx = store->create_context(s); + store->set_intent_cb(s->obj_ctx, call_log_intent); + + req->log(s, "initializing"); + + RGWOp *op = NULL; + int init_error = 0; + RGWHandler *handler = rest->get_handler(store, s, &client_io, &init_error); + if (init_error != 0) { + abort_early(s, init_error); + goto done; + } + + req->log(s, "getting op"); + op = handler->get_op(store); + if (!op) { + abort_early(s, -ERR_METHOD_NOT_ALLOWED); + goto done; + } + req->op = op; + + req->log(s, "authorizing"); + ret = handler->authorize(); + if (ret < 0) { + dout(10) << "failed to authorize request" << dendl; + abort_early(s, ret); + goto done; + } + + if (s->user.suspended) { + dout(10) << "user is suspended, uid=" << s->user.user_id << dendl; + abort_early(s, -ERR_USER_SUSPENDED); + goto done; + } + req->log(s, "reading permissions"); + ret = handler->read_permissions(op); + if (ret < 0) { + abort_early(s, ret); + goto done; + } + + req->log(s, "verifying op permissions"); + ret = op->verify_permission(); + if (ret < 0) { + abort_early(s, ret); + goto done; + } + + req->log(s, "verifying op params"); + ret = op->verify_params(); + if (ret < 0) { + abort_early(s, ret); + goto done; + } + + if (s->expect_cont) + dump_continue(s); + + req->log(s, "executing"); + op->execute(); + op->complete(); +done: + rgw_log_op(store, s, (op ? op->name() : "unknown")); + + int http_ret = s->err.http_ret; + + req->log_format(s, "http status=%d", http_ret); + + if (handler) + handler->put_op(op); + rest->put_handler(handler); + store->destroy_context(s->obj_ctx); + + dout(1) << "====== req done req=" << hex << req << dec << " http_status=" << http_ret << " ======" << dendl; + delete req; + + // Mark as processed + return (void *)""; +} + /* * start up the RADOS connection and then handle HTTP messages as they come in */ @@ -485,9 +601,19 @@ int main(int argc, const char **argv) rest.register_resource(g_conf->rgw_admin_entry, admin_resource); } - RGWProcess process(g_ceph_context, store, g_conf->rgw_thread_pool_size, &rest); + struct mg_context *ctx; + const char *options[] = {"listening_ports", "8080", NULL}; + + RGWProcessEnv pe = { store, &rest }; + + ctx = mg_start(&mongoose_callback, &pe, options); + assert(ctx); + + RGWProcess process(g_ceph_context, &pe, g_conf->rgw_thread_pool_size); process.run(); + mg_stop(ctx); + if (do_swift) { swift_finalize(); } |