diff options
author | Yehuda Sadeh <yehuda@inktank.com> | 2013-06-11 10:43:08 -0700 |
---|---|---|
committer | Yehuda Sadeh <yehuda@inktank.com> | 2013-06-11 10:43:08 -0700 |
commit | 00743d50e09f48c71ce47f6b61907fa48a42f114 (patch) | |
tree | cf688868729f87892f4f7305f7261cbc52f7f1ec | |
parent | ea3efca3fd161d9ed56df3c28f558ce593aae470 (diff) | |
download | ceph-00743d50e09f48c71ce47f6b61907fa48a42f114.tar.gz |
rgw: propagate error from remote gateway when copying object
Also make sure that we don't continue iterating locally through
the object.
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r-- | src/rgw/rgw_rados.cc | 45 | ||||
-rw-r--r-- | src/rgw/rgw_rest_client.cc | 24 | ||||
-rw-r--r-- | src/rgw/rgw_rest_client.h | 6 |
3 files changed, 52 insertions, 23 deletions
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index a9a4815036a..d115fc731a4 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -3566,7 +3566,11 @@ void RGWRados::get_obj_aio_completion_cb(completion_t c, void *arg) for (iter = bl_list.begin(); iter != bl_list.end(); ++iter) { bufferlist& bl = *iter; - d->client_cb->handle_data(bl, 0, bl.length()); + int r = d->client_cb->handle_data(bl, 0, bl.length()); + if (r < 0) { + d->set_cancelled(r); + break; + } } done_unlock: @@ -3585,10 +3589,16 @@ int RGWRados::get_obj_iterate_cb(void *ctx, RGWObjState *astate, RGWRadosCtx *rctx = static_cast<RGWRadosCtx *>(ctx); ObjectReadOperation op; struct get_obj_data *d = (struct get_obj_data *)arg; + string oid, key; + rgw_bucket bucket; + bufferlist *pbl; + AioCompletion *c; + + int r; if (is_head_obj) { /* only when reading from the head object do we need to do the atomic test */ - int r = append_atomic_test(rctx, obj, op, &astate); + r = append_atomic_test(rctx, obj, op, &astate); if (r < 0) return r; @@ -3597,8 +3607,10 @@ int RGWRados::get_obj_iterate_cb(void *ctx, RGWObjState *astate, unsigned chunk_len = min((uint64_t)astate->data.length() - obj_ofs, (uint64_t)len); d->data_lock.Lock(); - d->client_cb->handle_data(astate->data, obj_ofs, chunk_len); + r = d->client_cb->handle_data(astate->data, obj_ofs, chunk_len); d->data_lock.Unlock(); + if (r < 0) + return r; d->lock.Lock(); d->total_read += chunk_len; @@ -3612,33 +3624,35 @@ int RGWRados::get_obj_iterate_cb(void *ctx, RGWObjState *astate, } } - string oid, key; - rgw_bucket bucket; get_obj_bucket_and_oid_key(obj, bucket, oid, key); - bufferlist *pbl; - AioCompletion *c; - - d->add_io(obj_ofs, len, &pbl, &c); - d->throttle.get(len); if (d->is_cancelled()) { return d->get_err_code(); } + /* add io after we check that we're not cancelled, otherwise we're going to have trouble + * cleaning up + */ + d->add_io(obj_ofs, len, &pbl, &c); + ldout(cct, 20) << "rados->get_obj_iterate_cb oid=" << oid << " obj-ofs=" << obj_ofs << " read_ofs=" << read_ofs << " len=" << len << dendl; op.read(read_ofs, len, pbl, NULL); librados::IoCtx io_ctx(d->io_ctx); io_ctx.locator_set_key(key); - int r = io_ctx.aio_operate(oid, c, &op, NULL); + r = io_ctx.aio_operate(oid, c, &op, NULL); ldout(cct, 20) << "rados->aio_operate r=" << r << " bl.length=" << pbl->length() << dendl; + if (r < 0) + goto done_err; - if (r < 0) { - d->set_cancelled(r); - d->cancel_io(obj_ofs); - } + return 0; + +done_err: + ldout(cct, 20) << "cancelling io r=" << r << " obj_ofs=" << obj_ofs << dendl; + d->set_cancelled(r); + d->cancel_io(obj_ofs); return r; } @@ -3659,6 +3673,7 @@ int RGWRados::get_obj_iterate(void *ctx, void **handle, rgw_obj& obj, int r = iterate_obj(ctx, obj, ofs, end, cct->_conf->rgw_get_obj_max_req_size, _get_obj_iterate_cb, (void *)data); if (r < 0) { + data->cancel_all_io(); goto done; } diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc index e3e1c0ba7f8..7a78bb8f9cd 100644 --- a/src/rgw/rgw_rest_client.cc +++ b/src/rgw/rgw_rest_client.cc @@ -33,7 +33,10 @@ int RGWRESTSimpleRequest::receive_header(void *ptr, size_t len) l++; if (strcmp(tok, "HTTP") == 0 || strncmp(tok, "HTTP/", 5) == 0) { - status = atoi(l); + http_status = atoi(l); + if (http_status == 100) /* 100-continue response */ + continue; + status = rgw_http_error_to_errno(http_status); } else { /* convert header field name to upper case */ char *src = tok; @@ -100,7 +103,7 @@ int RGWRESTSimpleRequest::execute(RGWAccessKey& key, const char *method, const c if (r < 0) return r; - return rgw_http_error_to_errno(status); + return status; } int RGWRESTSimpleRequest::send_data(void *ptr, size_t len) @@ -245,7 +248,7 @@ int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, siz outbl->claim(response); } - return rgw_http_error_to_errno(status); + return status; } class RGWRESTStreamOutCB : public RGWGetDataCB { @@ -277,6 +280,11 @@ RGWRESTStreamRequest::~RGWRESTStreamRequest() int RGWRESTStreamRequest::add_output_data(bufferlist& bl) { lock.Lock(); + if (status < 0) { + int ret = status; + lock.Unlock(); + return ret; + } pending_send.push_back(bl); lock.Unlock(); @@ -442,9 +450,9 @@ int RGWRESTStreamRequest::send_data(void *ptr, size_t len) dout(20) << "RGWRESTStreamRequest::send_data()" << dendl; lock.Lock(); - if (pending_send.empty()) { + if (pending_send.empty() || status < 0) { lock.Unlock(); - return 0; + return status; } list<bufferlist>::iterator iter = pending_send.begin(); @@ -483,5 +491,9 @@ int RGWRESTStreamRequest::send_data(void *ptr, size_t len) int RGWRESTStreamRequest::complete() { - return complete_request(handle); + int ret = complete_request(handle); + if (ret < 0) + return ret; + + return status; } diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h index f3f9f7ff91c..cb744f4f695 100644 --- a/src/rgw/rgw_rest_client.h +++ b/src/rgw/rgw_rest_client.h @@ -11,6 +11,7 @@ class RGWRESTSimpleRequest : public RGWHTTPClient { protected: CephContext *cct; + int http_status; int status; string url; @@ -29,8 +30,9 @@ protected: int sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info); public: RGWRESTSimpleRequest(CephContext *_cct, string& _url, list<pair<string, string> > *_headers, - list<pair<string, string> > *_params) : cct(_cct), status(0), url(_url), send_iter(NULL), - max_response(0) { + list<pair<string, string> > *_params) : cct(_cct), http_status(0), status(0), + url(_url), send_iter(NULL), + max_response(0) { if (_headers) headers = *_headers; |