summaryrefslogtreecommitdiff
path: root/src/rgw/rgw_main.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rgw/rgw_main.cc')
-rw-r--r--src/rgw/rgw_main.cc138
1 files changed, 132 insertions, 6 deletions
diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc
index 944b59a5c8d..33181ec4ce0 100644
--- a/src/rgw/rgw_main.cc
+++ b/src/rgw/rgw_main.cc
@@ -43,6 +43,9 @@
#include "rgw_log.h"
#include "rgw_tools.h"
#include "rgw_resolve.h"
+#include "rgw_mongoose.h"
+
+#include "mongoose/mongoose.h"
#include <map>
#include <string>
@@ -125,6 +128,11 @@ struct RGWRequest
}
};
+struct RGWProcessEnv {
+ RGWRados *store;
+ RGWREST *rest;
+};
+
class RGWProcess {
RGWRados *store;
deque<RGWRequest *> m_req_queue;
@@ -132,6 +140,8 @@ class RGWProcess {
Throttle req_throttle;
RGWREST *rest;
+ RGWProcessEnv *process_env;
+
struct RGWWQ : public ThreadPool::WorkQueue<RGWRequest> {
RGWProcess *process;
RGWWQ(RGWProcess *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
@@ -185,10 +195,10 @@ class RGWProcess {
uint64_t max_req_id;
public:
- RGWProcess(CephContext *cct, RGWRados *rgwstore, int num_threads, RGWREST *_rest)
- : store(rgwstore), m_tp(cct, "RGWProcess::m_tp", num_threads),
+ RGWProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads)
+ : store(pe->store), m_tp(cct, "RGWProcess::m_tp", num_threads),
req_throttle(cct, "rgw_ops", num_threads * 2),
- rest(_rest),
+ rest(pe->rest),
req_wq(this, g_conf->rgw_op_thread_timeout,
g_conf->rgw_op_thread_suicide_timeout, &m_tp),
max_req_id(0) {}
@@ -258,15 +268,16 @@ void RGWProcess::handle_request(RGWRequest *req)
{
FCGX_Request *fcgx = &req->fcgx;
int ret;
- RGWEnv rgw_env;
RGWFCGX client_io(fcgx);
+ client_io.init(g_ceph_context);
+
req->log_init();
dout(1) << "====== starting new request req=" << hex << req << dec << " =====" << dendl;
perfcounter->inc(l_rgw_req);
- rgw_env.init(g_ceph_context, fcgx->envp);
+ RGWEnv& rgw_env = client_io.get_env();
struct req_state *s = req->init_state(g_ceph_context, &rgw_env);
s->obj_ctx = store->create_context(s);
@@ -356,6 +367,111 @@ public:
}
};
+static void *mongoose_callback(enum mg_event event,
+ struct mg_connection *conn) {
+ const struct mg_request_info *request_info = mg_get_request_info(conn);
+ RGWProcessEnv *pe = (RGWProcessEnv *)request_info->user_data;
+ RGWRados *store = pe->store;
+ RGWREST *rest = pe->rest;
+
+ if (event != MG_NEW_REQUEST)
+ return NULL;
+
+ RGWRequest *req = new RGWRequest;
+
+ int ret;
+ RGWMongoose client_io(conn);
+
+ client_io.init(g_ceph_context);
+
+ req->log_init();
+
+ dout(1) << "====== starting new request req=" << hex << req << dec << " =====" << dendl;
+ perfcounter->inc(l_rgw_req);
+
+ RGWEnv& rgw_env = client_io.get_env();
+
+ struct req_state *s = req->init_state(g_ceph_context, &rgw_env);
+ s->obj_ctx = store->create_context(s);
+ store->set_intent_cb(s->obj_ctx, call_log_intent);
+
+ req->log(s, "initializing");
+
+ RGWOp *op = NULL;
+ int init_error = 0;
+ RGWHandler *handler = rest->get_handler(store, s, &client_io, &init_error);
+ if (init_error != 0) {
+ abort_early(s, init_error);
+ goto done;
+ }
+
+ req->log(s, "getting op");
+ op = handler->get_op(store);
+ if (!op) {
+ abort_early(s, -ERR_METHOD_NOT_ALLOWED);
+ goto done;
+ }
+ req->op = op;
+
+ req->log(s, "authorizing");
+ ret = handler->authorize();
+ if (ret < 0) {
+ dout(10) << "failed to authorize request" << dendl;
+ abort_early(s, ret);
+ goto done;
+ }
+
+ if (s->user.suspended) {
+ dout(10) << "user is suspended, uid=" << s->user.user_id << dendl;
+ abort_early(s, -ERR_USER_SUSPENDED);
+ goto done;
+ }
+ req->log(s, "reading permissions");
+ ret = handler->read_permissions(op);
+ if (ret < 0) {
+ abort_early(s, ret);
+ goto done;
+ }
+
+ req->log(s, "verifying op permissions");
+ ret = op->verify_permission();
+ if (ret < 0) {
+ abort_early(s, ret);
+ goto done;
+ }
+
+ req->log(s, "verifying op params");
+ ret = op->verify_params();
+ if (ret < 0) {
+ abort_early(s, ret);
+ goto done;
+ }
+
+ if (s->expect_cont)
+ dump_continue(s);
+
+ req->log(s, "executing");
+ op->execute();
+ op->complete();
+done:
+ rgw_log_op(store, s, (op ? op->name() : "unknown"));
+
+ int http_ret = s->err.http_ret;
+
+ req->log_format(s, "http status=%d", http_ret);
+
+ if (handler)
+ handler->put_op(op);
+ rest->put_handler(handler);
+ store->destroy_context(s->obj_ctx);
+
+ dout(1) << "====== req done req=" << hex << req << dec << " http_status=" << http_ret << " ======" << dendl;
+ delete req;
+
+ // Mark as processed
+ return (void *)"";
+}
+
/*
* start up the RADOS connection and then handle HTTP messages as they come in
*/
@@ -485,9 +601,19 @@ int main(int argc, const char **argv)
rest.register_resource(g_conf->rgw_admin_entry, admin_resource);
}
- RGWProcess process(g_ceph_context, store, g_conf->rgw_thread_pool_size, &rest);
+ struct mg_context *ctx;
+ const char *options[] = {"listening_ports", "8080", NULL};
+
+ RGWProcessEnv pe = { store, &rest };
+
+ ctx = mg_start(&mongoose_callback, &pe, options);
+ assert(ctx);
+
+ RGWProcess process(g_ceph_context, &pe, g_conf->rgw_thread_pool_size);
process.run();
+ mg_stop(ctx);
+
if (do_swift) {
swift_finalize();
}