summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@inktank.com>2013-06-11 10:43:08 -0700
committerYehuda Sadeh <yehuda@inktank.com>2013-06-11 10:43:08 -0700
commit00743d50e09f48c71ce47f6b61907fa48a42f114 (patch)
treecf688868729f87892f4f7305f7261cbc52f7f1ec
parentea3efca3fd161d9ed56df3c28f558ce593aae470 (diff)
downloadceph-00743d50e09f48c71ce47f6b61907fa48a42f114.tar.gz
rgw: propagate error from remote gateway when copying object
Also make sure that we don't continue iterating locally through the object. Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r--src/rgw/rgw_rados.cc45
-rw-r--r--src/rgw/rgw_rest_client.cc24
-rw-r--r--src/rgw/rgw_rest_client.h6
3 files changed, 52 insertions, 23 deletions
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index a9a4815036a..d115fc731a4 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -3566,7 +3566,11 @@ void RGWRados::get_obj_aio_completion_cb(completion_t c, void *arg)
for (iter = bl_list.begin(); iter != bl_list.end(); ++iter) {
bufferlist& bl = *iter;
- d->client_cb->handle_data(bl, 0, bl.length());
+ int r = d->client_cb->handle_data(bl, 0, bl.length());
+ if (r < 0) {
+ d->set_cancelled(r);
+ break;
+ }
}
done_unlock:
@@ -3585,10 +3589,16 @@ int RGWRados::get_obj_iterate_cb(void *ctx, RGWObjState *astate,
RGWRadosCtx *rctx = static_cast<RGWRadosCtx *>(ctx);
ObjectReadOperation op;
struct get_obj_data *d = (struct get_obj_data *)arg;
+ string oid, key;
+ rgw_bucket bucket;
+ bufferlist *pbl;
+ AioCompletion *c;
+
+ int r;
if (is_head_obj) {
/* only when reading from the head object do we need to do the atomic test */
- int r = append_atomic_test(rctx, obj, op, &astate);
+ r = append_atomic_test(rctx, obj, op, &astate);
if (r < 0)
return r;
@@ -3597,8 +3607,10 @@ int RGWRados::get_obj_iterate_cb(void *ctx, RGWObjState *astate,
unsigned chunk_len = min((uint64_t)astate->data.length() - obj_ofs, (uint64_t)len);
d->data_lock.Lock();
- d->client_cb->handle_data(astate->data, obj_ofs, chunk_len);
+ r = d->client_cb->handle_data(astate->data, obj_ofs, chunk_len);
d->data_lock.Unlock();
+ if (r < 0)
+ return r;
d->lock.Lock();
d->total_read += chunk_len;
@@ -3612,33 +3624,35 @@ int RGWRados::get_obj_iterate_cb(void *ctx, RGWObjState *astate,
}
}
- string oid, key;
- rgw_bucket bucket;
get_obj_bucket_and_oid_key(obj, bucket, oid, key);
- bufferlist *pbl;
- AioCompletion *c;
-
- d->add_io(obj_ofs, len, &pbl, &c);
-
d->throttle.get(len);
if (d->is_cancelled()) {
return d->get_err_code();
}
+ /* add io after we check that we're not cancelled, otherwise we're going to have trouble
+ * cleaning up
+ */
+ d->add_io(obj_ofs, len, &pbl, &c);
+
ldout(cct, 20) << "rados->get_obj_iterate_cb oid=" << oid << " obj-ofs=" << obj_ofs << " read_ofs=" << read_ofs << " len=" << len << dendl;
op.read(read_ofs, len, pbl, NULL);
librados::IoCtx io_ctx(d->io_ctx);
io_ctx.locator_set_key(key);
- int r = io_ctx.aio_operate(oid, c, &op, NULL);
+ r = io_ctx.aio_operate(oid, c, &op, NULL);
ldout(cct, 20) << "rados->aio_operate r=" << r << " bl.length=" << pbl->length() << dendl;
+ if (r < 0)
+ goto done_err;
- if (r < 0) {
- d->set_cancelled(r);
- d->cancel_io(obj_ofs);
- }
+ return 0;
+
+done_err:
+ ldout(cct, 20) << "cancelling io r=" << r << " obj_ofs=" << obj_ofs << dendl;
+ d->set_cancelled(r);
+ d->cancel_io(obj_ofs);
return r;
}
@@ -3659,6 +3673,7 @@ int RGWRados::get_obj_iterate(void *ctx, void **handle, rgw_obj& obj,
int r = iterate_obj(ctx, obj, ofs, end, cct->_conf->rgw_get_obj_max_req_size, _get_obj_iterate_cb, (void *)data);
if (r < 0) {
+ data->cancel_all_io();
goto done;
}
diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc
index e3e1c0ba7f8..7a78bb8f9cd 100644
--- a/src/rgw/rgw_rest_client.cc
+++ b/src/rgw/rgw_rest_client.cc
@@ -33,7 +33,10 @@ int RGWRESTSimpleRequest::receive_header(void *ptr, size_t len)
l++;
if (strcmp(tok, "HTTP") == 0 || strncmp(tok, "HTTP/", 5) == 0) {
- status = atoi(l);
+ http_status = atoi(l);
+ if (http_status == 100) /* 100-continue response */
+ continue;
+ status = rgw_http_error_to_errno(http_status);
} else {
/* convert header field name to upper case */
char *src = tok;
@@ -100,7 +103,7 @@ int RGWRESTSimpleRequest::execute(RGWAccessKey& key, const char *method, const c
if (r < 0)
return r;
- return rgw_http_error_to_errno(status);
+ return status;
}
int RGWRESTSimpleRequest::send_data(void *ptr, size_t len)
@@ -245,7 +248,7 @@ int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, siz
outbl->claim(response);
}
- return rgw_http_error_to_errno(status);
+ return status;
}
class RGWRESTStreamOutCB : public RGWGetDataCB {
@@ -277,6 +280,11 @@ RGWRESTStreamRequest::~RGWRESTStreamRequest()
int RGWRESTStreamRequest::add_output_data(bufferlist& bl)
{
lock.Lock();
+ if (status < 0) {
+ int ret = status;
+ lock.Unlock();
+ return ret;
+ }
pending_send.push_back(bl);
lock.Unlock();
@@ -442,9 +450,9 @@ int RGWRESTStreamRequest::send_data(void *ptr, size_t len)
dout(20) << "RGWRESTStreamRequest::send_data()" << dendl;
lock.Lock();
- if (pending_send.empty()) {
+ if (pending_send.empty() || status < 0) {
lock.Unlock();
- return 0;
+ return status;
}
list<bufferlist>::iterator iter = pending_send.begin();
@@ -483,5 +491,9 @@ int RGWRESTStreamRequest::send_data(void *ptr, size_t len)
int RGWRESTStreamRequest::complete()
{
- return complete_request(handle);
+ int ret = complete_request(handle);
+ if (ret < 0)
+ return ret;
+
+ return status;
}
diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h
index f3f9f7ff91c..cb744f4f695 100644
--- a/src/rgw/rgw_rest_client.h
+++ b/src/rgw/rgw_rest_client.h
@@ -11,6 +11,7 @@ class RGWRESTSimpleRequest : public RGWHTTPClient {
protected:
CephContext *cct;
+ int http_status;
int status;
string url;
@@ -29,8 +30,9 @@ protected:
int sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info);
public:
RGWRESTSimpleRequest(CephContext *_cct, string& _url, list<pair<string, string> > *_headers,
- list<pair<string, string> > *_params) : cct(_cct), status(0), url(_url), send_iter(NULL),
- max_response(0) {
+ list<pair<string, string> > *_params) : cct(_cct), http_status(0), status(0),
+ url(_url), send_iter(NULL),
+ max_response(0) {
if (_headers)
headers = *_headers;