summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-02-22 14:23:45 -0800
committerSage Weil <sage@inktank.com>2013-02-22 14:23:45 -0800
commite4fd70fcec3a9abb7f19517326e46f58031c4196 (patch)
tree999606a3c247af90874bdad1b5bacac058fa69e2
parente03657e45226fc21dfb3257c0d2ee47cf67ee8b1 (diff)
parenta1ae8562877d1b902918e866a1699214090c40bd (diff)
downloadceph-e4fd70fcec3a9abb7f19517326e46f58031c4196.tar.gz
Merge remote-tracking branch 'gh/wip-rbd-flatten-deadlock'
Reviewed-by: Sage Weil <sage@inktank.com>
-rw-r--r--src/common/RWLock.h27
-rw-r--r--src/librbd/AioRequest.cc10
-rw-r--r--src/librbd/ImageCtx.cc36
-rw-r--r--src/librbd/ImageCtx.h7
-rw-r--r--src/librbd/LibrbdWriteback.cc16
-rw-r--r--src/librbd/internal.cc229
-rwxr-xr-xsrc/test/run-rbd-tests2
7 files changed, 167 insertions, 160 deletions
diff --git a/src/common/RWLock.h b/src/common/RWLock.h
index f8915ed288e..6fabc8c3083 100644
--- a/src/common/RWLock.h
+++ b/src/common/RWLock.h
@@ -48,7 +48,7 @@ public:
// read
void get_read() {
if (g_lockdep) id = lockdep_will_lock(name, id);
- pthread_rwlock_rdlock(&L);
+ pthread_rwlock_rdlock(&L);
if (g_lockdep) id = lockdep_locked(name, id);
}
bool try_get_read() {
@@ -78,6 +78,31 @@ public:
void put_write() {
unlock();
}
+
+public:
+ class RLocker {
+ RWLock &m_lock;
+
+ public:
+ RLocker(RWLock& lock) : m_lock(lock) {
+ m_lock.get_read();
+ }
+ ~RLocker() {
+ m_lock.put_read();
+ }
+ };
+
+ class WLocker {
+ RWLock &m_lock;
+
+ public:
+ WLocker(RWLock& lock) : m_lock(lock) {
+ m_lock.get_write();
+ }
+ ~WLocker() {
+ m_lock.put_write();
+ }
+ };
};
#endif // !_Mutex_Posix_
diff --git a/src/librbd/AioRequest.cc b/src/librbd/AioRequest.cc
index b9a76e48e0a..95519fb372d 100644
--- a/src/librbd/AioRequest.cc
+++ b/src/librbd/AioRequest.cc
@@ -4,6 +4,7 @@
#include "common/ceph_context.h"
#include "common/dout.h"
#include "common/Mutex.h"
+#include "common/RWLock.h"
#include "librbd/AioCompletion.h"
#include "librbd/ImageCtx.h"
@@ -50,7 +51,6 @@ namespace librbd {
void AioRequest::read_from_parent(vector<pair<uint64_t,uint64_t> >& image_extents)
{
assert(!m_parent_completion);
- assert(m_ictx->parent_lock.is_locked());
m_parent_completion = aio_create_completion_internal(this, rbd_req_cb);
ldout(m_ictx->cct, 20) << "read_from_parent this = " << this
<< " parent completion " << m_parent_completion
@@ -68,8 +68,8 @@ namespace librbd {
<< " r = " << r << dendl;
if (!m_tried_parent && r == -ENOENT) {
- Mutex::Locker l(m_ictx->snap_lock);
- Mutex::Locker l2(m_ictx->parent_lock);
+ RWLock::RLocker l(m_ictx->snap_lock);
+ RWLock::RLocker l2(m_ictx->parent_lock);
// calculate reverse mapping onto the image
vector<pair<uint64_t,uint64_t> > image_extents;
@@ -158,8 +158,8 @@ namespace librbd {
if (r == -ENOENT) {
- Mutex::Locker l(m_ictx->snap_lock);
- Mutex::Locker l2(m_ictx->parent_lock);
+ RWLock::RLocker l(m_ictx->snap_lock);
+ RWLock::RLocker l2(m_ictx->parent_lock);
/*
* Parent may have disappeared; if so, recover by using
diff --git a/src/librbd/ImageCtx.cc b/src/librbd/ImageCtx.cc
index 7909f8c620c..f2ffd79b662 100644
--- a/src/librbd/ImageCtx.cc
+++ b/src/librbd/ImageCtx.cc
@@ -224,7 +224,6 @@ namespace librbd {
int ImageCtx::snap_set(string in_snap_name)
{
- assert(snap_lock.is_locked());
map<string, SnapInfo>::iterator it = snaps_by_name.find(in_snap_name);
if (it != snaps_by_name.end()) {
snap_name = in_snap_name;
@@ -238,7 +237,6 @@ namespace librbd {
void ImageCtx::snap_unset()
{
- assert(snap_lock.is_locked());
snap_id = CEPH_NOSNAP;
snap_name = "";
snap_exists = true;
@@ -247,7 +245,6 @@ namespace librbd {
snap_t ImageCtx::get_snap_id(string in_snap_name) const
{
- assert(snap_lock.is_locked());
map<string, SnapInfo>::const_iterator it = snaps_by_name.find(in_snap_name);
if (it != snaps_by_name.end())
return it->second.id;
@@ -256,7 +253,6 @@ namespace librbd {
int ImageCtx::get_snap_name(snapid_t in_snap_id, string *out_snap_name) const
{
- assert(snap_lock.is_locked());
map<string, SnapInfo>::const_iterator it;
for (it = snaps_by_name.begin(); it != snaps_by_name.end(); it++) {
@@ -270,7 +266,6 @@ namespace librbd {
int ImageCtx::get_parent_spec(snapid_t in_snap_id, parent_spec *out_pspec)
{
- assert(snap_lock.is_locked());
map<string, SnapInfo>::iterator it;
for (it = snaps_by_name.begin(); it != snaps_by_name.end(); it++) {
@@ -322,7 +317,6 @@ namespace librbd {
int ImageCtx::is_snap_protected(string in_snap_name, bool *is_protected) const
{
- assert(snap_lock.is_locked());
map<string, SnapInfo>::const_iterator it = snaps_by_name.find(in_snap_name);
if (it != snaps_by_name.end()) {
*is_protected =
@@ -335,7 +329,6 @@ namespace librbd {
int ImageCtx::is_snap_unprotected(string in_snap_name,
bool *is_unprotected) const
{
- assert(snap_lock.is_locked());
map<string, SnapInfo>::const_iterator it = snaps_by_name.find(in_snap_name);
if (it != snaps_by_name.end()) {
*is_unprotected =
@@ -347,7 +340,6 @@ namespace librbd {
int ImageCtx::get_snap_size(string in_snap_name, uint64_t *out_size) const
{
- assert(snap_lock.is_locked());
map<string, SnapInfo>::const_iterator it = snaps_by_name.find(in_snap_name);
if (it != snaps_by_name.end()) {
*out_size = it->second.size;
@@ -361,7 +353,6 @@ namespace librbd {
parent_info parent,
uint8_t protection_status)
{
- assert(snap_lock.is_locked());
snaps.push_back(id);
SnapInfo info(id, in_size, features, parent, protection_status);
snaps_by_name.insert(pair<string, SnapInfo>(in_snap_name, info));
@@ -369,8 +360,6 @@ namespace librbd {
uint64_t ImageCtx::get_image_size(snap_t in_snap_id) const
{
- assert(md_lock.is_locked());
- assert(snap_lock.is_locked());
if (in_snap_id == CEPH_NOSNAP) {
return size;
}
@@ -386,8 +375,6 @@ namespace librbd {
int ImageCtx::get_features(snap_t in_snap_id, uint64_t *out_features) const
{
- assert(md_lock.is_locked());
- assert(snap_lock.is_locked());
if (in_snap_id == CEPH_NOSNAP) {
*out_features = features;
return 0;
@@ -405,8 +392,6 @@ namespace librbd {
int64_t ImageCtx::get_parent_pool_id(snap_t in_snap_id) const
{
- assert(snap_lock.is_locked());
- assert(parent_lock.is_locked());
if (in_snap_id == CEPH_NOSNAP) {
return parent_md.spec.pool_id;
}
@@ -422,8 +407,6 @@ namespace librbd {
string ImageCtx::get_parent_image_id(snap_t in_snap_id) const
{
- assert(snap_lock.is_locked());
- assert(parent_lock.is_locked());
if (in_snap_id == CEPH_NOSNAP) {
return parent_md.spec.image_id;
}
@@ -439,8 +422,6 @@ namespace librbd {
uint64_t ImageCtx::get_parent_snap_id(snap_t in_snap_id) const
{
- assert(snap_lock.is_locked());
- assert(parent_lock.is_locked());
if (in_snap_id == CEPH_NOSNAP) {
return parent_md.spec.snap_id;
}
@@ -456,8 +437,6 @@ namespace librbd {
int ImageCtx::get_parent_overlap(snap_t in_snap_id, uint64_t *overlap) const
{
- assert(snap_lock.is_locked());
- assert(parent_lock.is_locked());
if (in_snap_id == CEPH_NOSNAP) {
*overlap = parent_md.overlap;
return 0;
@@ -475,9 +454,9 @@ namespace librbd {
void ImageCtx::aio_read_from_cache(object_t o, bufferlist *bl, size_t len,
uint64_t off, Context *onfinish) {
- snap_lock.Lock();
+ snap_lock.get_read();
ObjectCacher::OSDRead *rd = object_cacher->prepare_read(snap_id, bl, 0);
- snap_lock.Unlock();
+ snap_lock.put_read();
ObjectExtent extent(o, 0 /* a lie */, off, len);
extent.oloc.pool = data_ctx.get_id();
extent.buffer_extents.push_back(make_pair(0, len));
@@ -491,10 +470,10 @@ namespace librbd {
void ImageCtx::write_to_cache(object_t o, bufferlist& bl, size_t len,
uint64_t off) {
- snap_lock.Lock();
+ snap_lock.get_read();
ObjectCacher::OSDWrite *wr = object_cacher->prepare_write(snapc, bl,
utime_t(), 0);
- snap_lock.Unlock();
+ snap_lock.put_read();
ObjectExtent extent(o, 0, off, len);
extent.oloc.pool = data_ctx.get_id();
extent.buffer_extents.push_back(make_pair(0, len));
@@ -542,14 +521,13 @@ namespace librbd {
}
void ImageCtx::shutdown_cache() {
- md_lock.Lock();
+ md_lock.get_write();
invalidate_cache();
- md_lock.Unlock();
+ md_lock.put_write();
object_cacher->stop();
}
void ImageCtx::invalidate_cache() {
- assert(md_lock.is_locked());
if (!object_cacher)
return;
cache_lock.Lock();
@@ -582,8 +560,6 @@ namespace librbd {
size_t ImageCtx::parent_io_len(uint64_t offset, size_t length,
snap_t in_snap_id)
{
- assert(snap_lock.is_locked());
- assert(parent_lock.is_locked());
uint64_t overlap = 0;
get_parent_overlap(in_snap_id, &overlap);
diff --git a/src/librbd/ImageCtx.h b/src/librbd/ImageCtx.h
index b24639906b2..99a07a6bf1c 100644
--- a/src/librbd/ImageCtx.h
+++ b/src/librbd/ImageCtx.h
@@ -11,6 +11,7 @@
#include <vector>
#include "common/Mutex.h"
+#include "common/RWLock.h"
#include "common/snap_types.h"
#include "include/buffer.h"
#include "include/rbd/librbd.hpp"
@@ -59,12 +60,12 @@ namespace librbd {
* Lock ordering:
* md_lock, cache_lock, snap_lock, parent_lock, refresh_lock
*/
- Mutex md_lock; // protects access to the mutable image metadata that
+ RWLock md_lock; // protects access to the mutable image metadata that
// isn't guarded by other locks below
// (size, features, image locks, etc)
Mutex cache_lock; // used as client_lock for the ObjectCacher
- Mutex snap_lock; // protects snapshot-related member variables:
- Mutex parent_lock; // protects parent_md and parent
+ RWLock snap_lock; // protects snapshot-related member variables:
+ RWLock parent_lock; // protects parent_md and parent
Mutex refresh_lock; // protects refresh_seq and last_refresh
bool old_format;
diff --git a/src/librbd/LibrbdWriteback.cc b/src/librbd/LibrbdWriteback.cc
index 6d69a8297e7..1689ad91860 100644
--- a/src/librbd/LibrbdWriteback.cc
+++ b/src/librbd/LibrbdWriteback.cc
@@ -87,13 +87,13 @@ namespace librbd {
bool LibrbdWriteback::may_copy_on_write(const object_t& oid, uint64_t read_off, uint64_t read_len, snapid_t snapid)
{
- m_ictx->snap_lock.Lock();
+ m_ictx->snap_lock.get_read();
librados::snap_t snap_id = m_ictx->snap_id;
- m_ictx->parent_lock.Lock();
+ m_ictx->parent_lock.get_read();
uint64_t overlap = 0;
m_ictx->get_parent_overlap(snap_id, &overlap);
- m_ictx->parent_lock.Unlock();
- m_ictx->snap_lock.Unlock();
+ m_ictx->parent_lock.put_read();
+ m_ictx->snap_lock.put_read();
uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);
@@ -116,13 +116,13 @@ namespace librbd {
uint64_t trunc_size, __u32 trunc_seq,
Context *oncommit)
{
- m_ictx->snap_lock.Lock();
+ m_ictx->snap_lock.get_read();
librados::snap_t snap_id = m_ictx->snap_id;
- m_ictx->parent_lock.Lock();
+ m_ictx->parent_lock.get_read();
uint64_t overlap = 0;
m_ictx->get_parent_overlap(snap_id, &overlap);
- m_ictx->parent_lock.Unlock();
- m_ictx->snap_lock.Unlock();
+ m_ictx->parent_lock.put_read();
+ m_ictx->snap_lock.put_read();
uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);
diff --git a/src/librbd/internal.cc b/src/librbd/internal.cc
index 33b948d2310..d913b314edd 100644
--- a/src/librbd/internal.cc
+++ b/src/librbd/internal.cc
@@ -109,11 +109,11 @@ namespace librbd {
void image_info(ImageCtx *ictx, image_info_t& info, size_t infosize)
{
int obj_order = ictx->order;
- ictx->md_lock.Lock();
- ictx->snap_lock.Lock();
+ ictx->md_lock.get_read();
+ ictx->snap_lock.get_read();
info.size = ictx->get_image_size(ictx->snap_id);
- ictx->snap_lock.Unlock();
- ictx->md_lock.Unlock();
+ ictx->snap_lock.put_read();
+ ictx->md_lock.put_read();
info.obj_size = 1ULL << obj_order;
info.num_objs = howmany(info.size, ictx->get_object_size());
info.order = obj_order;
@@ -143,7 +143,6 @@ namespace librbd {
void trim_image(ImageCtx *ictx, uint64_t newsize, ProgressContext& prog_ctx)
{
- assert(ictx->md_lock.is_locked());
CephContext *cct = (CephContext *)ictx->data_ctx.cct();
uint64_t size = ictx->get_current_size();
@@ -247,7 +246,6 @@ namespace librbd {
uint64_t ver;
if (ictx) {
- assert(ictx->md_lock.is_locked());
ictx->refresh_lock.Lock();
ldout(ictx->cct, 20) << "notify_change refresh_seq = " << ictx->refresh_seq
<< " last_refresh = " << ictx->last_refresh << dendl;
@@ -310,7 +308,6 @@ namespace librbd {
int rollback_image(ImageCtx *ictx, uint64_t snap_id,
ProgressContext& prog_ctx)
{
- assert(ictx->md_lock.is_locked());
uint64_t numseg = ictx->get_num_objects();
uint64_t bsize = ictx->get_object_size();
@@ -431,7 +428,7 @@ namespace librbd {
if (r < 0)
return r;
- Mutex::Locker l(ictx->md_lock);
+ RWLock::RLocker l(ictx->md_lock);
do {
r = add_snap(ictx, snap_name);
} while (r == -ESTALE);
@@ -475,18 +472,18 @@ namespace librbd {
if (r < 0)
return r;
- Mutex::Locker l(ictx->md_lock);
+ RWLock::RLocker l(ictx->md_lock);
snap_t snap_id;
{
// block for purposes of auto-destruction of l2 on early return
- Mutex::Locker l2(ictx->snap_lock);
+ RWLock::RLocker l2(ictx->snap_lock);
snap_id = ictx->get_snap_id(snap_name);
if (snap_id == CEPH_NOSNAP)
return -ENOENT;
parent_spec our_pspec;
- Mutex::Locker l3(ictx->parent_lock);
+ RWLock::RLocker l3(ictx->parent_lock);
r = ictx->get_parent_spec(snap_id, &our_pspec);
if (r < 0) {
lderr(ictx->cct) << "snap_remove: can't get parent spec" << dendl;
@@ -529,8 +526,8 @@ namespace librbd {
if (r < 0)
return r;
- Mutex::Locker l(ictx->md_lock);
- Mutex::Locker l2(ictx->snap_lock);
+ RWLock::RLocker l(ictx->md_lock);
+ RWLock::RLocker l2(ictx->snap_lock);
uint64_t features;
ictx->get_features(ictx->snap_id, &features);
if ((features & RBD_FEATURE_LAYERING) == 0) {
@@ -549,7 +546,7 @@ namespace librbd {
if (is_protected)
return -EBUSY;
-
+
r = cls_client::set_protection_status(&ictx->md_ctx,
ictx->header_oid,
snap_id,
@@ -572,8 +569,8 @@ namespace librbd {
if (r < 0)
return r;
- Mutex::Locker l(ictx->md_lock);
- Mutex::Locker l2(ictx->snap_lock);
+ RWLock::RLocker l(ictx->md_lock);
+ RWLock::RLocker l2(ictx->snap_lock);
uint64_t features;
ictx->get_features(ictx->snap_id, &features);
if ((features & RBD_FEATURE_LAYERING) == 0) {
@@ -661,7 +658,7 @@ reprotect_and_return_err:
if (r < 0)
return r;
- Mutex::Locker l(ictx->snap_lock);
+ RWLock::RLocker l(ictx->snap_lock);
bool is_unprotected;
r = ictx->is_snap_unprotected(snap_name, &is_unprotected);
// consider both PROTECTED or UNPROTECTING to be 'protected',
@@ -919,13 +916,13 @@ reprotect_and_return_err:
goto err_close_parent;
}
- p_imctx->md_lock.Lock();
- p_imctx->snap_lock.Lock();
+ p_imctx->md_lock.get_read();
+ p_imctx->snap_lock.get_read();
p_imctx->get_features(p_imctx->snap_id, &p_features);
size = p_imctx->get_image_size(p_imctx->snap_id);
p_imctx->is_snap_protected(p_imctx->snap_name, &snap_protected);
- p_imctx->snap_lock.Unlock();
- p_imctx->md_lock.Unlock();
+ p_imctx->snap_lock.put_read();
+ p_imctx->md_lock.put_read();
if ((p_features & RBD_FEATURE_LAYERING) != RBD_FEATURE_LAYERING) {
lderr(cct) << "parent image must support layering" << dendl;
@@ -968,14 +965,14 @@ reprotect_and_return_err:
goto err_close_child;
}
- p_imctx->md_lock.Lock();
+ p_imctx->md_lock.get_write();
r = ictx_refresh(p_imctx);
- p_imctx->md_lock.Unlock();
+ p_imctx->md_lock.put_write();
if (r == 0) {
- p_imctx->snap_lock.Lock();
+ p_imctx->snap_lock.get_read();
r = p_imctx->is_snap_protected(p_imctx->snap_name, &snap_protected);
- p_imctx->snap_lock.Unlock();
+ p_imctx->snap_lock.put_read();
}
if (r < 0 || !snap_protected) {
// we lost the race with unprotect
@@ -1145,8 +1142,8 @@ reprotect_and_return_err:
int r = ictx_check(ictx);
if (r < 0)
return r;
- Mutex::Locker l(ictx->md_lock);
- Mutex::Locker l2(ictx->snap_lock);
+ RWLock::RLocker l(ictx->md_lock);
+ RWLock::RLocker l2(ictx->snap_lock);
*size = ictx->get_image_size(ictx->snap_id);
return 0;
}
@@ -1156,8 +1153,8 @@ reprotect_and_return_err:
int r = ictx_check(ictx);
if (r < 0)
return r;
- Mutex::Locker l(ictx->md_lock);
- Mutex::Locker l2(ictx->snap_lock);
+ RWLock::RLocker l(ictx->md_lock);
+ RWLock::RLocker l2(ictx->snap_lock);
return ictx->get_features(ictx->snap_id, features);
}
@@ -1166,15 +1163,13 @@ reprotect_and_return_err:
int r = ictx_check(ictx);
if (r < 0)
return r;
- Mutex::Locker l(ictx->snap_lock);
- Mutex::Locker l2(ictx->parent_lock);
+ RWLock::RLocker l(ictx->snap_lock);
+ RWLock::RLocker l2(ictx->parent_lock);
return ictx->get_parent_overlap(ictx->snap_id, overlap);
}
int open_parent(ImageCtx *ictx)
{
- assert(ictx->snap_lock.is_locked());
- assert(ictx->parent_lock.is_locked());
string pool_name;
Rados rados(ictx->md_ctx);
@@ -1212,17 +1207,17 @@ reprotect_and_return_err:
return r;
}
- ictx->parent->snap_lock.Lock();
+ ictx->parent->snap_lock.get_write();
r = ictx->parent->get_snap_name(parent_snap_id, &ictx->parent->snap_name);
if (r < 0) {
lderr(ictx->cct) << "parent snapshot does not exist" << dendl;
- ictx->parent->snap_lock.Unlock();
+ ictx->parent->snap_lock.put_write();
close_image(ictx->parent);
ictx->parent = NULL;
return r;
}
ictx->parent->snap_set(ictx->parent->snap_name);
- ictx->parent->snap_lock.Unlock();
+ ictx->parent->snap_lock.put_write();
return 0;
}
@@ -1234,8 +1229,8 @@ reprotect_and_return_err:
if (r < 0)
return r;
- Mutex::Locker l(ictx->snap_lock);
- Mutex::Locker l2(ictx->parent_lock);
+ RWLock::RLocker l(ictx->snap_lock);
+ RWLock::RLocker l2(ictx->parent_lock);
parent_spec parent_spec;
@@ -1264,7 +1259,7 @@ reprotect_and_return_err:
}
if (parent_snap_name) {
- Mutex::Locker l(ictx->parent->snap_lock);
+ RWLock::RLocker l(ictx->parent->snap_lock);
r = ictx->parent->get_snap_name(parent_spec.snap_id,
parent_snap_name);
if (r < 0) {
@@ -1309,14 +1304,14 @@ reprotect_and_return_err:
old_format = ictx->old_format;
unknown_format = false;
id = ictx->id;
- ictx->md_lock.Lock();
+ ictx->md_lock.get_read();
trim_image(ictx, 0, prog_ctx);
- ictx->md_lock.Unlock();
+ ictx->md_lock.put_read();
- ictx->parent_lock.Lock();
+ ictx->parent_lock.get_read();
// struct assignment
parent_info parent_info = ictx->parent_md;
- ictx->parent_lock.Unlock();
+ ictx->parent_lock.put_read();
r = cls_client::remove_child(&ictx->md_ctx, RBD_CHILDREN,
parent_info.spec, id);
@@ -1373,7 +1368,6 @@ reprotect_and_return_err:
int resize_helper(ImageCtx *ictx, uint64_t size, ProgressContext& prog_ctx)
{
- assert(ictx->md_lock.is_locked());
CephContext *cct = ictx->cct;
if (size == ictx->size) {
@@ -1431,7 +1425,7 @@ reprotect_and_return_err:
if (r < 0)
return r;
- Mutex::Locker l(ictx->md_lock);
+ RWLock::WLocker l(ictx->md_lock);
if (size < ictx->size && ictx->object_cacher) {
// need to invalidate since we're deleting objects, and
// ObjectCacher doesn't track non-existent objects
@@ -1454,7 +1448,7 @@ reprotect_and_return_err:
return r;
bufferlist bl, bl2;
- Mutex::Locker l(ictx->snap_lock);
+ RWLock::RLocker l(ictx->snap_lock);
for (map<string, SnapInfo>::iterator it = ictx->snaps_by_name.begin();
it != ictx->snaps_by_name.end(); ++it) {
snap_info_t info;
@@ -1469,8 +1463,6 @@ reprotect_and_return_err:
int add_snap(ImageCtx *ictx, const char *snap_name)
{
- assert(ictx->md_lock.is_locked());
-
uint64_t snap_id;
int r = ictx->md_ctx.selfmanaged_snap_create(&snap_id);
@@ -1499,14 +1491,12 @@ reprotect_and_return_err:
int rm_snap(ImageCtx *ictx, const char *snap_name)
{
- assert(ictx->md_lock.is_locked());
-
int r;
if (ictx->old_format) {
r = cls_client::old_snapshot_remove(&ictx->md_ctx,
ictx->header_oid, snap_name);
} else {
- Mutex::Locker l(ictx->snap_lock);
+ RWLock::RLocker l(ictx->snap_lock);
r = cls_client::snapshot_remove(&ictx->md_ctx,
ictx->header_oid,
ictx->get_snap_id(snap_name));
@@ -1530,7 +1520,7 @@ reprotect_and_return_err:
ictx->refresh_lock.Unlock();
if (needs_refresh) {
- Mutex::Locker l(ictx->md_lock);
+ RWLock::WLocker l(ictx->md_lock);
int r = ictx_refresh(ictx);
if (r < 0) {
@@ -1543,8 +1533,6 @@ reprotect_and_return_err:
}
int refresh_parent(ImageCtx *ictx) {
- assert(ictx->snap_lock.is_locked());
- assert(ictx->parent_lock.is_locked());
// close the parent if it changed or this image no longer needs
// to read from it
int r;
@@ -1578,7 +1566,6 @@ reprotect_and_return_err:
int ictx_refresh(ImageCtx *ictx)
{
CephContext *cct = ictx->cct;
- assert(ictx->md_lock.is_locked());
bufferlist bl, bl2;
ldout(cct, 20) << "ictx_refresh " << ictx << dendl;
@@ -1596,9 +1583,9 @@ reprotect_and_return_err:
vector<parent_info> snap_parents;
vector<uint8_t> snap_protection;
{
- Mutex::Locker l(ictx->snap_lock);
+ RWLock::WLocker l(ictx->snap_lock);
{
- Mutex::Locker l2(ictx->parent_lock);
+ RWLock::WLocker l2(ictx->parent_lock);
ictx->lockers.clear();
if (ictx->old_format) {
r = read_header(ictx->md_ctx, ictx->header_oid, &ictx->header, NULL);
@@ -1748,8 +1735,8 @@ reprotect_and_return_err:
if (r < 0)
return r;
- Mutex::Locker l(ictx->md_lock);
- Mutex::Locker l2(ictx->snap_lock);
+ RWLock::WLocker l(ictx->md_lock);
+ RWLock::WLocker l2(ictx->snap_lock);
if (!ictx->snap_exists)
return -ENOENT;
@@ -1825,11 +1812,11 @@ reprotect_and_return_err:
<< " -> " << destname << dendl;
int order = src->order;
- src->md_lock.Lock();
- src->snap_lock.Lock();
+ src->md_lock.get_read();
+ src->snap_lock.get_read();
uint64_t src_size = src->get_image_size(src->snap_id);
- src->snap_lock.Unlock();
- src->md_lock.Unlock();
+ src->snap_lock.put_read();
+ src->md_lock.put_read();
int r = create(dest_md_ctx, destname, src_size, src->old_format,
src->features, &order, src->stripe_unit, src->stripe_count);
@@ -1855,17 +1842,17 @@ reprotect_and_return_err:
{
CopyProgressCtx cp(prog_ctx);
- src->md_lock.Lock();
- src->snap_lock.Lock();
+ src->md_lock.get_read();
+ src->snap_lock.get_read();
uint64_t src_size = src->get_image_size(src->snap_id);
- src->snap_lock.Unlock();
- src->md_lock.Unlock();
+ src->snap_lock.put_read();
+ src->md_lock.put_read();
- dest->md_lock.Lock();
- dest->snap_lock.Lock();
+ dest->md_lock.get_read();
+ dest->snap_lock.get_read();
uint64_t dest_size = dest->get_image_size(dest->snap_id);
- dest->snap_lock.Unlock();
- dest->md_lock.Unlock();
+ dest->snap_lock.put_read();
+ dest->md_lock.put_read();
if (dest_size < src_size) {
lderr(src->cct) << " src size " << src_size << " != dest size " << dest_size << dendl;
@@ -1889,8 +1876,8 @@ reprotect_and_return_err:
int _snap_set(ImageCtx *ictx, const char *snap_name)
{
- Mutex::Locker l1(ictx->snap_lock);
- Mutex::Locker l2(ictx->parent_lock);
+ RWLock::WLocker l1(ictx->snap_lock);
+ RWLock::WLocker l2(ictx->parent_lock);
int r;
if ((snap_name != NULL) && (strlen(snap_name) != 0)) {
r = ictx->snap_set(snap_name);
@@ -1935,9 +1922,9 @@ reprotect_and_return_err:
}
}
- ictx->md_lock.Lock();
+ ictx->md_lock.get_write();
r = ictx_refresh(ictx);
- ictx->md_lock.Unlock();
+ ictx->md_lock.put_write();
if (r < 0)
goto err_close;
@@ -1985,28 +1972,36 @@ reprotect_and_return_err:
return r;
}
- Mutex::Locker l(ictx->md_lock);
- Mutex::Locker l2(ictx->snap_lock);
- Mutex::Locker l3(ictx->parent_lock);
+ uint64_t object_size;
+ uint64_t period;
+ uint64_t overlap;
+ uint64_t overlap_periods;
+ uint64_t overlap_objects;
- // can't flatten a non-clone
- if (ictx->parent_md.spec.pool_id == -1) {
- lderr(ictx->cct) << "image has no parent" << dendl;
- return -EINVAL;
- }
- if (ictx->snap_id != CEPH_NOSNAP || ictx->read_only) {
- lderr(ictx->cct) << "snapshots cannot be flattened" << dendl;
- return -EROFS;
- }
+ {
+ RWLock::RLocker l(ictx->md_lock);
+ RWLock::RLocker l2(ictx->snap_lock);
+ RWLock::RLocker l3(ictx->parent_lock);
- assert(ictx->parent != NULL);
- assert(ictx->parent_md.overlap <= ictx->size);
+ // can't flatten a non-clone
+ if (ictx->parent_md.spec.pool_id == -1) {
+ lderr(ictx->cct) << "image has no parent" << dendl;
+ return -EINVAL;
+ }
+ if (ictx->snap_id != CEPH_NOSNAP || ictx->read_only) {
+ lderr(ictx->cct) << "snapshots cannot be flattened" << dendl;
+ return -EROFS;
+ }
- uint64_t object_size = ictx->get_object_size();
- uint64_t period = ictx->get_stripe_period();
- uint64_t overlap = ictx->parent_md.overlap;
- uint64_t overlap_periods = (overlap + period - 1) / period;
- uint64_t overlap_objects = overlap_periods * ictx->get_stripe_count();
+ assert(ictx->parent != NULL);
+ assert(ictx->parent_md.overlap <= ictx->size);
+
+ object_size = ictx->get_object_size();
+ period = ictx->get_stripe_period();
+ overlap = ictx->parent_md.overlap;
+ overlap_periods = (overlap + period - 1) / period;
+ overlap_objects = overlap_periods * ictx->get_stripe_count();
+ }
char *buf = new char[object_size];
@@ -2021,6 +2016,13 @@ reprotect_and_return_err:
uint64_t object_overlap = ictx->prune_parent_extents(objectx, overlap);
assert(object_overlap <= object_size);
+ RWLock::RLocker l(ictx->parent_lock);
+ // stop early if the parent went away - it just means
+ // another flatten finished first, so this one is useless.
+ if (!ictx->parent) {
+ r = 0;
+ goto err;
+ }
if ((r = read(ictx->parent, objectx, buf, NULL)) < 0) {
lderr(ictx->cct) << "reading from parent failed" << dendl;
goto err;
@@ -2049,15 +2051,18 @@ reprotect_and_return_err:
// and if there are no snaps, remove from the children object as well
// (if snapshots remain, they have their own parent info, and the child
// will be removed when the last snap goes away)
+ ictx->snap_lock.get_read();
if (ictx->snaps.empty()) {
ldout(ictx->cct, 2) << "removing child from children list..." << dendl;
int r = cls_client::remove_child(&ictx->md_ctx, RBD_CHILDREN,
ictx->parent_md.spec, ictx->id);
if (r < 0) {
lderr(ictx->cct) << "error removing child from children list" << dendl;
+ ictx->snap_lock.put_read();
return r;
}
}
+ ictx->snap_lock.put_read();
notify_change(ictx->md_ctx, ictx->header_oid, NULL, ictx);
ldout(ictx->cct, 20) << "finished flattening" << dendl;
@@ -2078,7 +2083,7 @@ reprotect_and_return_err:
if (r < 0)
return r;
- Mutex::Locker locker(ictx->md_lock);
+ RWLock::RLocker locker(ictx->md_lock);
if (exclusive)
*exclusive = ictx->exclusive_locked;
if (tag)
@@ -2115,7 +2120,7 @@ reprotect_and_return_err:
* checks that we think we will succeed. But for now, let's not
* duplicate that code.
*/
- Mutex::Locker locker(ictx->md_lock);
+ RWLock::RLocker locker(ictx->md_lock);
r = rados::cls::lock::lock(&ictx->md_ctx, ictx->header_oid, RBD_LOCK_NAME,
exclusive ? LOCK_EXCLUSIVE : LOCK_SHARED,
cookie, tag, "", utime_t(), 0);
@@ -2135,7 +2140,7 @@ reprotect_and_return_err:
if (r < 0)
return r;
- Mutex::Locker locker(ictx->md_lock);
+ RWLock::RLocker locker(ictx->md_lock);
r = rados::cls::lock::unlock(&ictx->md_ctx, ictx->header_oid,
RBD_LOCK_NAME, cookie);
if (r < 0)
@@ -2160,7 +2165,7 @@ reprotect_and_return_err:
<< "'" << dendl;
return -EINVAL;
}
- Mutex::Locker locker(ictx->md_lock);
+ RWLock::RLocker locker(ictx->md_lock);
r = rados::cls::lock::break_lock(&ictx->md_ctx, ictx->header_oid,
RBD_LOCK_NAME, cookie, lock_client);
if (r < 0)
@@ -2434,12 +2439,12 @@ reprotect_and_return_err:
// validate extent against image size; clip to image size if necessary
int clip_io(ImageCtx *ictx, uint64_t off, uint64_t *len)
{
- ictx->md_lock.Lock();
- ictx->snap_lock.Lock();
+ ictx->md_lock.get_read();
+ ictx->snap_lock.get_read();
uint64_t image_size = ictx->get_image_size(ictx->snap_id);
bool snap_exists = ictx->snap_exists;
- ictx->snap_lock.Unlock();
- ictx->md_lock.Unlock();
+ ictx->snap_lock.put_read();
+ ictx->md_lock.put_read();
if (!snap_exists)
return -ENOENT;
@@ -2507,14 +2512,14 @@ reprotect_and_return_err:
if (r < 0)
return r;
- ictx->snap_lock.Lock();
+ ictx->snap_lock.get_read();
snapid_t snap_id = ictx->snap_id;
::SnapContext snapc = ictx->snapc;
- ictx->parent_lock.Lock();
+ ictx->parent_lock.get_read();
uint64_t overlap = 0;
ictx->get_parent_overlap(ictx->snap_id, &overlap);
- ictx->parent_lock.Unlock();
- ictx->snap_lock.Unlock();
+ ictx->parent_lock.put_read();
+ ictx->snap_lock.put_read();
if (snap_id != CEPH_NOSNAP || ictx->read_only)
return -EROFS;
@@ -2592,14 +2597,14 @@ reprotect_and_return_err:
return r;
// TODO: check for snap
- ictx->snap_lock.Lock();
+ ictx->snap_lock.get_read();
snapid_t snap_id = ictx->snap_id;
::SnapContext snapc = ictx->snapc;
- ictx->parent_lock.Lock();
+ ictx->parent_lock.get_read();
uint64_t overlap = 0;
ictx->get_parent_overlap(ictx->snap_id, &overlap);
- ictx->parent_lock.Unlock();
- ictx->snap_lock.Unlock();
+ ictx->parent_lock.put_read();
+ ictx->snap_lock.put_read();
if (snap_id != CEPH_NOSNAP || ictx->read_only)
return -EROFS;
@@ -2685,9 +2690,9 @@ reprotect_and_return_err:
if (r < 0)
return r;
- ictx->snap_lock.Lock();
+ ictx->snap_lock.get_read();
snap_t snap_id = ictx->snap_id;
- ictx->snap_lock.Unlock();
+ ictx->snap_lock.put_read();
// map
map<object_t,vector<ObjectExtent> > object_extents;
diff --git a/src/test/run-rbd-tests b/src/test/run-rbd-tests
index d3c8b9e98ca..ddd1f50cdf6 100755
--- a/src/test/run-rbd-tests
+++ b/src/test/run-rbd-tests
@@ -10,7 +10,7 @@ PATH="$CEPH_SRC:$PATH"
recreate_pool() {
POOL_NAME=$1
PG_NUM=100
- ceph osd pool delete $POOL_NAME
+ ceph osd pool delete $POOL_NAME $POOL_NAME --yes-i-really-really-mean-it
ceph osd pool create $POOL_NAME $PG_NUM
}