diff options
author | Sage Weil <sage@inktank.com> | 2013-02-22 14:23:45 -0800 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-02-22 14:23:45 -0800 |
commit | e4fd70fcec3a9abb7f19517326e46f58031c4196 (patch) | |
tree | 999606a3c247af90874bdad1b5bacac058fa69e2 | |
parent | e03657e45226fc21dfb3257c0d2ee47cf67ee8b1 (diff) | |
parent | a1ae8562877d1b902918e866a1699214090c40bd (diff) | |
download | ceph-e4fd70fcec3a9abb7f19517326e46f58031c4196.tar.gz |
Merge remote-tracking branch 'gh/wip-rbd-flatten-deadlock'
Reviewed-by: Sage Weil <sage@inktank.com>
-rw-r--r-- | src/common/RWLock.h | 27 | ||||
-rw-r--r-- | src/librbd/AioRequest.cc | 10 | ||||
-rw-r--r-- | src/librbd/ImageCtx.cc | 36 | ||||
-rw-r--r-- | src/librbd/ImageCtx.h | 7 | ||||
-rw-r--r-- | src/librbd/LibrbdWriteback.cc | 16 | ||||
-rw-r--r-- | src/librbd/internal.cc | 229 | ||||
-rwxr-xr-x | src/test/run-rbd-tests | 2 |
7 files changed, 167 insertions, 160 deletions
diff --git a/src/common/RWLock.h b/src/common/RWLock.h index f8915ed288e..6fabc8c3083 100644 --- a/src/common/RWLock.h +++ b/src/common/RWLock.h @@ -48,7 +48,7 @@ public: // read void get_read() { if (g_lockdep) id = lockdep_will_lock(name, id); - pthread_rwlock_rdlock(&L); + pthread_rwlock_rdlock(&L); if (g_lockdep) id = lockdep_locked(name, id); } bool try_get_read() { @@ -78,6 +78,31 @@ public: void put_write() { unlock(); } + +public: + class RLocker { + RWLock &m_lock; + + public: + RLocker(RWLock& lock) : m_lock(lock) { + m_lock.get_read(); + } + ~RLocker() { + m_lock.put_read(); + } + }; + + class WLocker { + RWLock &m_lock; + + public: + WLocker(RWLock& lock) : m_lock(lock) { + m_lock.get_write(); + } + ~WLocker() { + m_lock.put_write(); + } + }; }; #endif // !_Mutex_Posix_ diff --git a/src/librbd/AioRequest.cc b/src/librbd/AioRequest.cc index b9a76e48e0a..95519fb372d 100644 --- a/src/librbd/AioRequest.cc +++ b/src/librbd/AioRequest.cc @@ -4,6 +4,7 @@ #include "common/ceph_context.h" #include "common/dout.h" #include "common/Mutex.h" +#include "common/RWLock.h" #include "librbd/AioCompletion.h" #include "librbd/ImageCtx.h" @@ -50,7 +51,6 @@ namespace librbd { void AioRequest::read_from_parent(vector<pair<uint64_t,uint64_t> >& image_extents) { assert(!m_parent_completion); - assert(m_ictx->parent_lock.is_locked()); m_parent_completion = aio_create_completion_internal(this, rbd_req_cb); ldout(m_ictx->cct, 20) << "read_from_parent this = " << this << " parent completion " << m_parent_completion @@ -68,8 +68,8 @@ namespace librbd { << " r = " << r << dendl; if (!m_tried_parent && r == -ENOENT) { - Mutex::Locker l(m_ictx->snap_lock); - Mutex::Locker l2(m_ictx->parent_lock); + RWLock::RLocker l(m_ictx->snap_lock); + RWLock::RLocker l2(m_ictx->parent_lock); // calculate reverse mapping onto the image vector<pair<uint64_t,uint64_t> > image_extents; @@ -158,8 +158,8 @@ namespace librbd { if (r == -ENOENT) { - Mutex::Locker l(m_ictx->snap_lock); - Mutex::Locker l2(m_ictx->parent_lock); + RWLock::RLocker l(m_ictx->snap_lock); + RWLock::RLocker l2(m_ictx->parent_lock); /* * Parent may have disappeared; if so, recover by using diff --git a/src/librbd/ImageCtx.cc b/src/librbd/ImageCtx.cc index 7909f8c620c..f2ffd79b662 100644 --- a/src/librbd/ImageCtx.cc +++ b/src/librbd/ImageCtx.cc @@ -224,7 +224,6 @@ namespace librbd { int ImageCtx::snap_set(string in_snap_name) { - assert(snap_lock.is_locked()); map<string, SnapInfo>::iterator it = snaps_by_name.find(in_snap_name); if (it != snaps_by_name.end()) { snap_name = in_snap_name; @@ -238,7 +237,6 @@ namespace librbd { void ImageCtx::snap_unset() { - assert(snap_lock.is_locked()); snap_id = CEPH_NOSNAP; snap_name = ""; snap_exists = true; @@ -247,7 +245,6 @@ namespace librbd { snap_t ImageCtx::get_snap_id(string in_snap_name) const { - assert(snap_lock.is_locked()); map<string, SnapInfo>::const_iterator it = snaps_by_name.find(in_snap_name); if (it != snaps_by_name.end()) return it->second.id; @@ -256,7 +253,6 @@ namespace librbd { int ImageCtx::get_snap_name(snapid_t in_snap_id, string *out_snap_name) const { - assert(snap_lock.is_locked()); map<string, SnapInfo>::const_iterator it; for (it = snaps_by_name.begin(); it != snaps_by_name.end(); it++) { @@ -270,7 +266,6 @@ namespace librbd { int ImageCtx::get_parent_spec(snapid_t in_snap_id, parent_spec *out_pspec) { - assert(snap_lock.is_locked()); map<string, SnapInfo>::iterator it; for (it = snaps_by_name.begin(); it != snaps_by_name.end(); it++) { @@ -322,7 +317,6 @@ namespace librbd { int ImageCtx::is_snap_protected(string in_snap_name, bool *is_protected) const { - assert(snap_lock.is_locked()); map<string, SnapInfo>::const_iterator it = snaps_by_name.find(in_snap_name); if (it != snaps_by_name.end()) { *is_protected = @@ -335,7 +329,6 @@ namespace librbd { int ImageCtx::is_snap_unprotected(string in_snap_name, bool *is_unprotected) const { - assert(snap_lock.is_locked()); map<string, SnapInfo>::const_iterator it = snaps_by_name.find(in_snap_name); if (it != snaps_by_name.end()) { *is_unprotected = @@ -347,7 +340,6 @@ namespace librbd { int ImageCtx::get_snap_size(string in_snap_name, uint64_t *out_size) const { - assert(snap_lock.is_locked()); map<string, SnapInfo>::const_iterator it = snaps_by_name.find(in_snap_name); if (it != snaps_by_name.end()) { *out_size = it->second.size; @@ -361,7 +353,6 @@ namespace librbd { parent_info parent, uint8_t protection_status) { - assert(snap_lock.is_locked()); snaps.push_back(id); SnapInfo info(id, in_size, features, parent, protection_status); snaps_by_name.insert(pair<string, SnapInfo>(in_snap_name, info)); @@ -369,8 +360,6 @@ namespace librbd { uint64_t ImageCtx::get_image_size(snap_t in_snap_id) const { - assert(md_lock.is_locked()); - assert(snap_lock.is_locked()); if (in_snap_id == CEPH_NOSNAP) { return size; } @@ -386,8 +375,6 @@ namespace librbd { int ImageCtx::get_features(snap_t in_snap_id, uint64_t *out_features) const { - assert(md_lock.is_locked()); - assert(snap_lock.is_locked()); if (in_snap_id == CEPH_NOSNAP) { *out_features = features; return 0; @@ -405,8 +392,6 @@ namespace librbd { int64_t ImageCtx::get_parent_pool_id(snap_t in_snap_id) const { - assert(snap_lock.is_locked()); - assert(parent_lock.is_locked()); if (in_snap_id == CEPH_NOSNAP) { return parent_md.spec.pool_id; } @@ -422,8 +407,6 @@ namespace librbd { string ImageCtx::get_parent_image_id(snap_t in_snap_id) const { - assert(snap_lock.is_locked()); - assert(parent_lock.is_locked()); if (in_snap_id == CEPH_NOSNAP) { return parent_md.spec.image_id; } @@ -439,8 +422,6 @@ namespace librbd { uint64_t ImageCtx::get_parent_snap_id(snap_t in_snap_id) const { - assert(snap_lock.is_locked()); - assert(parent_lock.is_locked()); if (in_snap_id == CEPH_NOSNAP) { return parent_md.spec.snap_id; } @@ -456,8 +437,6 @@ namespace librbd { int ImageCtx::get_parent_overlap(snap_t in_snap_id, uint64_t *overlap) const { - assert(snap_lock.is_locked()); - assert(parent_lock.is_locked()); if (in_snap_id == CEPH_NOSNAP) { *overlap = parent_md.overlap; return 0; @@ -475,9 +454,9 @@ namespace librbd { void ImageCtx::aio_read_from_cache(object_t o, bufferlist *bl, size_t len, uint64_t off, Context *onfinish) { - snap_lock.Lock(); + snap_lock.get_read(); ObjectCacher::OSDRead *rd = object_cacher->prepare_read(snap_id, bl, 0); - snap_lock.Unlock(); + snap_lock.put_read(); ObjectExtent extent(o, 0 /* a lie */, off, len); extent.oloc.pool = data_ctx.get_id(); extent.buffer_extents.push_back(make_pair(0, len)); @@ -491,10 +470,10 @@ namespace librbd { void ImageCtx::write_to_cache(object_t o, bufferlist& bl, size_t len, uint64_t off) { - snap_lock.Lock(); + snap_lock.get_read(); ObjectCacher::OSDWrite *wr = object_cacher->prepare_write(snapc, bl, utime_t(), 0); - snap_lock.Unlock(); + snap_lock.put_read(); ObjectExtent extent(o, 0, off, len); extent.oloc.pool = data_ctx.get_id(); extent.buffer_extents.push_back(make_pair(0, len)); @@ -542,14 +521,13 @@ namespace librbd { } void ImageCtx::shutdown_cache() { - md_lock.Lock(); + md_lock.get_write(); invalidate_cache(); - md_lock.Unlock(); + md_lock.put_write(); object_cacher->stop(); } void ImageCtx::invalidate_cache() { - assert(md_lock.is_locked()); if (!object_cacher) return; cache_lock.Lock(); @@ -582,8 +560,6 @@ namespace librbd { size_t ImageCtx::parent_io_len(uint64_t offset, size_t length, snap_t in_snap_id) { - assert(snap_lock.is_locked()); - assert(parent_lock.is_locked()); uint64_t overlap = 0; get_parent_overlap(in_snap_id, &overlap); diff --git a/src/librbd/ImageCtx.h b/src/librbd/ImageCtx.h index b24639906b2..99a07a6bf1c 100644 --- a/src/librbd/ImageCtx.h +++ b/src/librbd/ImageCtx.h @@ -11,6 +11,7 @@ #include <vector> #include "common/Mutex.h" +#include "common/RWLock.h" #include "common/snap_types.h" #include "include/buffer.h" #include "include/rbd/librbd.hpp" @@ -59,12 +60,12 @@ namespace librbd { * Lock ordering: * md_lock, cache_lock, snap_lock, parent_lock, refresh_lock */ - Mutex md_lock; // protects access to the mutable image metadata that + RWLock md_lock; // protects access to the mutable image metadata that // isn't guarded by other locks below // (size, features, image locks, etc) Mutex cache_lock; // used as client_lock for the ObjectCacher - Mutex snap_lock; // protects snapshot-related member variables: - Mutex parent_lock; // protects parent_md and parent + RWLock snap_lock; // protects snapshot-related member variables: + RWLock parent_lock; // protects parent_md and parent Mutex refresh_lock; // protects refresh_seq and last_refresh bool old_format; diff --git a/src/librbd/LibrbdWriteback.cc b/src/librbd/LibrbdWriteback.cc index 6d69a8297e7..1689ad91860 100644 --- a/src/librbd/LibrbdWriteback.cc +++ b/src/librbd/LibrbdWriteback.cc @@ -87,13 +87,13 @@ namespace librbd { bool LibrbdWriteback::may_copy_on_write(const object_t& oid, uint64_t read_off, uint64_t read_len, snapid_t snapid) { - m_ictx->snap_lock.Lock(); + m_ictx->snap_lock.get_read(); librados::snap_t snap_id = m_ictx->snap_id; - m_ictx->parent_lock.Lock(); + m_ictx->parent_lock.get_read(); uint64_t overlap = 0; m_ictx->get_parent_overlap(snap_id, &overlap); - m_ictx->parent_lock.Unlock(); - m_ictx->snap_lock.Unlock(); + m_ictx->parent_lock.put_read(); + m_ictx->snap_lock.put_read(); uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix); @@ -116,13 +116,13 @@ namespace librbd { uint64_t trunc_size, __u32 trunc_seq, Context *oncommit) { - m_ictx->snap_lock.Lock(); + m_ictx->snap_lock.get_read(); librados::snap_t snap_id = m_ictx->snap_id; - m_ictx->parent_lock.Lock(); + m_ictx->parent_lock.get_read(); uint64_t overlap = 0; m_ictx->get_parent_overlap(snap_id, &overlap); - m_ictx->parent_lock.Unlock(); - m_ictx->snap_lock.Unlock(); + m_ictx->parent_lock.put_read(); + m_ictx->snap_lock.put_read(); uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix); diff --git a/src/librbd/internal.cc b/src/librbd/internal.cc index 33b948d2310..d913b314edd 100644 --- a/src/librbd/internal.cc +++ b/src/librbd/internal.cc @@ -109,11 +109,11 @@ namespace librbd { void image_info(ImageCtx *ictx, image_info_t& info, size_t infosize) { int obj_order = ictx->order; - ictx->md_lock.Lock(); - ictx->snap_lock.Lock(); + ictx->md_lock.get_read(); + ictx->snap_lock.get_read(); info.size = ictx->get_image_size(ictx->snap_id); - ictx->snap_lock.Unlock(); - ictx->md_lock.Unlock(); + ictx->snap_lock.put_read(); + ictx->md_lock.put_read(); info.obj_size = 1ULL << obj_order; info.num_objs = howmany(info.size, ictx->get_object_size()); info.order = obj_order; @@ -143,7 +143,6 @@ namespace librbd { void trim_image(ImageCtx *ictx, uint64_t newsize, ProgressContext& prog_ctx) { - assert(ictx->md_lock.is_locked()); CephContext *cct = (CephContext *)ictx->data_ctx.cct(); uint64_t size = ictx->get_current_size(); @@ -247,7 +246,6 @@ namespace librbd { uint64_t ver; if (ictx) { - assert(ictx->md_lock.is_locked()); ictx->refresh_lock.Lock(); ldout(ictx->cct, 20) << "notify_change refresh_seq = " << ictx->refresh_seq << " last_refresh = " << ictx->last_refresh << dendl; @@ -310,7 +308,6 @@ namespace librbd { int rollback_image(ImageCtx *ictx, uint64_t snap_id, ProgressContext& prog_ctx) { - assert(ictx->md_lock.is_locked()); uint64_t numseg = ictx->get_num_objects(); uint64_t bsize = ictx->get_object_size(); @@ -431,7 +428,7 @@ namespace librbd { if (r < 0) return r; - Mutex::Locker l(ictx->md_lock); + RWLock::RLocker l(ictx->md_lock); do { r = add_snap(ictx, snap_name); } while (r == -ESTALE); @@ -475,18 +472,18 @@ namespace librbd { if (r < 0) return r; - Mutex::Locker l(ictx->md_lock); + RWLock::RLocker l(ictx->md_lock); snap_t snap_id; { // block for purposes of auto-destruction of l2 on early return - Mutex::Locker l2(ictx->snap_lock); + RWLock::RLocker l2(ictx->snap_lock); snap_id = ictx->get_snap_id(snap_name); if (snap_id == CEPH_NOSNAP) return -ENOENT; parent_spec our_pspec; - Mutex::Locker l3(ictx->parent_lock); + RWLock::RLocker l3(ictx->parent_lock); r = ictx->get_parent_spec(snap_id, &our_pspec); if (r < 0) { lderr(ictx->cct) << "snap_remove: can't get parent spec" << dendl; @@ -529,8 +526,8 @@ namespace librbd { if (r < 0) return r; - Mutex::Locker l(ictx->md_lock); - Mutex::Locker l2(ictx->snap_lock); + RWLock::RLocker l(ictx->md_lock); + RWLock::RLocker l2(ictx->snap_lock); uint64_t features; ictx->get_features(ictx->snap_id, &features); if ((features & RBD_FEATURE_LAYERING) == 0) { @@ -549,7 +546,7 @@ namespace librbd { if (is_protected) return -EBUSY; - + r = cls_client::set_protection_status(&ictx->md_ctx, ictx->header_oid, snap_id, @@ -572,8 +569,8 @@ namespace librbd { if (r < 0) return r; - Mutex::Locker l(ictx->md_lock); - Mutex::Locker l2(ictx->snap_lock); + RWLock::RLocker l(ictx->md_lock); + RWLock::RLocker l2(ictx->snap_lock); uint64_t features; ictx->get_features(ictx->snap_id, &features); if ((features & RBD_FEATURE_LAYERING) == 0) { @@ -661,7 +658,7 @@ reprotect_and_return_err: if (r < 0) return r; - Mutex::Locker l(ictx->snap_lock); + RWLock::RLocker l(ictx->snap_lock); bool is_unprotected; r = ictx->is_snap_unprotected(snap_name, &is_unprotected); // consider both PROTECTED or UNPROTECTING to be 'protected', @@ -919,13 +916,13 @@ reprotect_and_return_err: goto err_close_parent; } - p_imctx->md_lock.Lock(); - p_imctx->snap_lock.Lock(); + p_imctx->md_lock.get_read(); + p_imctx->snap_lock.get_read(); p_imctx->get_features(p_imctx->snap_id, &p_features); size = p_imctx->get_image_size(p_imctx->snap_id); p_imctx->is_snap_protected(p_imctx->snap_name, &snap_protected); - p_imctx->snap_lock.Unlock(); - p_imctx->md_lock.Unlock(); + p_imctx->snap_lock.put_read(); + p_imctx->md_lock.put_read(); if ((p_features & RBD_FEATURE_LAYERING) != RBD_FEATURE_LAYERING) { lderr(cct) << "parent image must support layering" << dendl; @@ -968,14 +965,14 @@ reprotect_and_return_err: goto err_close_child; } - p_imctx->md_lock.Lock(); + p_imctx->md_lock.get_write(); r = ictx_refresh(p_imctx); - p_imctx->md_lock.Unlock(); + p_imctx->md_lock.put_write(); if (r == 0) { - p_imctx->snap_lock.Lock(); + p_imctx->snap_lock.get_read(); r = p_imctx->is_snap_protected(p_imctx->snap_name, &snap_protected); - p_imctx->snap_lock.Unlock(); + p_imctx->snap_lock.put_read(); } if (r < 0 || !snap_protected) { // we lost the race with unprotect @@ -1145,8 +1142,8 @@ reprotect_and_return_err: int r = ictx_check(ictx); if (r < 0) return r; - Mutex::Locker l(ictx->md_lock); - Mutex::Locker l2(ictx->snap_lock); + RWLock::RLocker l(ictx->md_lock); + RWLock::RLocker l2(ictx->snap_lock); *size = ictx->get_image_size(ictx->snap_id); return 0; } @@ -1156,8 +1153,8 @@ reprotect_and_return_err: int r = ictx_check(ictx); if (r < 0) return r; - Mutex::Locker l(ictx->md_lock); - Mutex::Locker l2(ictx->snap_lock); + RWLock::RLocker l(ictx->md_lock); + RWLock::RLocker l2(ictx->snap_lock); return ictx->get_features(ictx->snap_id, features); } @@ -1166,15 +1163,13 @@ reprotect_and_return_err: int r = ictx_check(ictx); if (r < 0) return r; - Mutex::Locker l(ictx->snap_lock); - Mutex::Locker l2(ictx->parent_lock); + RWLock::RLocker l(ictx->snap_lock); + RWLock::RLocker l2(ictx->parent_lock); return ictx->get_parent_overlap(ictx->snap_id, overlap); } int open_parent(ImageCtx *ictx) { - assert(ictx->snap_lock.is_locked()); - assert(ictx->parent_lock.is_locked()); string pool_name; Rados rados(ictx->md_ctx); @@ -1212,17 +1207,17 @@ reprotect_and_return_err: return r; } - ictx->parent->snap_lock.Lock(); + ictx->parent->snap_lock.get_write(); r = ictx->parent->get_snap_name(parent_snap_id, &ictx->parent->snap_name); if (r < 0) { lderr(ictx->cct) << "parent snapshot does not exist" << dendl; - ictx->parent->snap_lock.Unlock(); + ictx->parent->snap_lock.put_write(); close_image(ictx->parent); ictx->parent = NULL; return r; } ictx->parent->snap_set(ictx->parent->snap_name); - ictx->parent->snap_lock.Unlock(); + ictx->parent->snap_lock.put_write(); return 0; } @@ -1234,8 +1229,8 @@ reprotect_and_return_err: if (r < 0) return r; - Mutex::Locker l(ictx->snap_lock); - Mutex::Locker l2(ictx->parent_lock); + RWLock::RLocker l(ictx->snap_lock); + RWLock::RLocker l2(ictx->parent_lock); parent_spec parent_spec; @@ -1264,7 +1259,7 @@ reprotect_and_return_err: } if (parent_snap_name) { - Mutex::Locker l(ictx->parent->snap_lock); + RWLock::RLocker l(ictx->parent->snap_lock); r = ictx->parent->get_snap_name(parent_spec.snap_id, parent_snap_name); if (r < 0) { @@ -1309,14 +1304,14 @@ reprotect_and_return_err: old_format = ictx->old_format; unknown_format = false; id = ictx->id; - ictx->md_lock.Lock(); + ictx->md_lock.get_read(); trim_image(ictx, 0, prog_ctx); - ictx->md_lock.Unlock(); + ictx->md_lock.put_read(); - ictx->parent_lock.Lock(); + ictx->parent_lock.get_read(); // struct assignment parent_info parent_info = ictx->parent_md; - ictx->parent_lock.Unlock(); + ictx->parent_lock.put_read(); r = cls_client::remove_child(&ictx->md_ctx, RBD_CHILDREN, parent_info.spec, id); @@ -1373,7 +1368,6 @@ reprotect_and_return_err: int resize_helper(ImageCtx *ictx, uint64_t size, ProgressContext& prog_ctx) { - assert(ictx->md_lock.is_locked()); CephContext *cct = ictx->cct; if (size == ictx->size) { @@ -1431,7 +1425,7 @@ reprotect_and_return_err: if (r < 0) return r; - Mutex::Locker l(ictx->md_lock); + RWLock::WLocker l(ictx->md_lock); if (size < ictx->size && ictx->object_cacher) { // need to invalidate since we're deleting objects, and // ObjectCacher doesn't track non-existent objects @@ -1454,7 +1448,7 @@ reprotect_and_return_err: return r; bufferlist bl, bl2; - Mutex::Locker l(ictx->snap_lock); + RWLock::RLocker l(ictx->snap_lock); for (map<string, SnapInfo>::iterator it = ictx->snaps_by_name.begin(); it != ictx->snaps_by_name.end(); ++it) { snap_info_t info; @@ -1469,8 +1463,6 @@ reprotect_and_return_err: int add_snap(ImageCtx *ictx, const char *snap_name) { - assert(ictx->md_lock.is_locked()); - uint64_t snap_id; int r = ictx->md_ctx.selfmanaged_snap_create(&snap_id); @@ -1499,14 +1491,12 @@ reprotect_and_return_err: int rm_snap(ImageCtx *ictx, const char *snap_name) { - assert(ictx->md_lock.is_locked()); - int r; if (ictx->old_format) { r = cls_client::old_snapshot_remove(&ictx->md_ctx, ictx->header_oid, snap_name); } else { - Mutex::Locker l(ictx->snap_lock); + RWLock::RLocker l(ictx->snap_lock); r = cls_client::snapshot_remove(&ictx->md_ctx, ictx->header_oid, ictx->get_snap_id(snap_name)); @@ -1530,7 +1520,7 @@ reprotect_and_return_err: ictx->refresh_lock.Unlock(); if (needs_refresh) { - Mutex::Locker l(ictx->md_lock); + RWLock::WLocker l(ictx->md_lock); int r = ictx_refresh(ictx); if (r < 0) { @@ -1543,8 +1533,6 @@ reprotect_and_return_err: } int refresh_parent(ImageCtx *ictx) { - assert(ictx->snap_lock.is_locked()); - assert(ictx->parent_lock.is_locked()); // close the parent if it changed or this image no longer needs // to read from it int r; @@ -1578,7 +1566,6 @@ reprotect_and_return_err: int ictx_refresh(ImageCtx *ictx) { CephContext *cct = ictx->cct; - assert(ictx->md_lock.is_locked()); bufferlist bl, bl2; ldout(cct, 20) << "ictx_refresh " << ictx << dendl; @@ -1596,9 +1583,9 @@ reprotect_and_return_err: vector<parent_info> snap_parents; vector<uint8_t> snap_protection; { - Mutex::Locker l(ictx->snap_lock); + RWLock::WLocker l(ictx->snap_lock); { - Mutex::Locker l2(ictx->parent_lock); + RWLock::WLocker l2(ictx->parent_lock); ictx->lockers.clear(); if (ictx->old_format) { r = read_header(ictx->md_ctx, ictx->header_oid, &ictx->header, NULL); @@ -1748,8 +1735,8 @@ reprotect_and_return_err: if (r < 0) return r; - Mutex::Locker l(ictx->md_lock); - Mutex::Locker l2(ictx->snap_lock); + RWLock::WLocker l(ictx->md_lock); + RWLock::WLocker l2(ictx->snap_lock); if (!ictx->snap_exists) return -ENOENT; @@ -1825,11 +1812,11 @@ reprotect_and_return_err: << " -> " << destname << dendl; int order = src->order; - src->md_lock.Lock(); - src->snap_lock.Lock(); + src->md_lock.get_read(); + src->snap_lock.get_read(); uint64_t src_size = src->get_image_size(src->snap_id); - src->snap_lock.Unlock(); - src->md_lock.Unlock(); + src->snap_lock.put_read(); + src->md_lock.put_read(); int r = create(dest_md_ctx, destname, src_size, src->old_format, src->features, &order, src->stripe_unit, src->stripe_count); @@ -1855,17 +1842,17 @@ reprotect_and_return_err: { CopyProgressCtx cp(prog_ctx); - src->md_lock.Lock(); - src->snap_lock.Lock(); + src->md_lock.get_read(); + src->snap_lock.get_read(); uint64_t src_size = src->get_image_size(src->snap_id); - src->snap_lock.Unlock(); - src->md_lock.Unlock(); + src->snap_lock.put_read(); + src->md_lock.put_read(); - dest->md_lock.Lock(); - dest->snap_lock.Lock(); + dest->md_lock.get_read(); + dest->snap_lock.get_read(); uint64_t dest_size = dest->get_image_size(dest->snap_id); - dest->snap_lock.Unlock(); - dest->md_lock.Unlock(); + dest->snap_lock.put_read(); + dest->md_lock.put_read(); if (dest_size < src_size) { lderr(src->cct) << " src size " << src_size << " != dest size " << dest_size << dendl; @@ -1889,8 +1876,8 @@ reprotect_and_return_err: int _snap_set(ImageCtx *ictx, const char *snap_name) { - Mutex::Locker l1(ictx->snap_lock); - Mutex::Locker l2(ictx->parent_lock); + RWLock::WLocker l1(ictx->snap_lock); + RWLock::WLocker l2(ictx->parent_lock); int r; if ((snap_name != NULL) && (strlen(snap_name) != 0)) { r = ictx->snap_set(snap_name); @@ -1935,9 +1922,9 @@ reprotect_and_return_err: } } - ictx->md_lock.Lock(); + ictx->md_lock.get_write(); r = ictx_refresh(ictx); - ictx->md_lock.Unlock(); + ictx->md_lock.put_write(); if (r < 0) goto err_close; @@ -1985,28 +1972,36 @@ reprotect_and_return_err: return r; } - Mutex::Locker l(ictx->md_lock); - Mutex::Locker l2(ictx->snap_lock); - Mutex::Locker l3(ictx->parent_lock); + uint64_t object_size; + uint64_t period; + uint64_t overlap; + uint64_t overlap_periods; + uint64_t overlap_objects; - // can't flatten a non-clone - if (ictx->parent_md.spec.pool_id == -1) { - lderr(ictx->cct) << "image has no parent" << dendl; - return -EINVAL; - } - if (ictx->snap_id != CEPH_NOSNAP || ictx->read_only) { - lderr(ictx->cct) << "snapshots cannot be flattened" << dendl; - return -EROFS; - } + { + RWLock::RLocker l(ictx->md_lock); + RWLock::RLocker l2(ictx->snap_lock); + RWLock::RLocker l3(ictx->parent_lock); - assert(ictx->parent != NULL); - assert(ictx->parent_md.overlap <= ictx->size); + // can't flatten a non-clone + if (ictx->parent_md.spec.pool_id == -1) { + lderr(ictx->cct) << "image has no parent" << dendl; + return -EINVAL; + } + if (ictx->snap_id != CEPH_NOSNAP || ictx->read_only) { + lderr(ictx->cct) << "snapshots cannot be flattened" << dendl; + return -EROFS; + } - uint64_t object_size = ictx->get_object_size(); - uint64_t period = ictx->get_stripe_period(); - uint64_t overlap = ictx->parent_md.overlap; - uint64_t overlap_periods = (overlap + period - 1) / period; - uint64_t overlap_objects = overlap_periods * ictx->get_stripe_count(); + assert(ictx->parent != NULL); + assert(ictx->parent_md.overlap <= ictx->size); + + object_size = ictx->get_object_size(); + period = ictx->get_stripe_period(); + overlap = ictx->parent_md.overlap; + overlap_periods = (overlap + period - 1) / period; + overlap_objects = overlap_periods * ictx->get_stripe_count(); + } char *buf = new char[object_size]; @@ -2021,6 +2016,13 @@ reprotect_and_return_err: uint64_t object_overlap = ictx->prune_parent_extents(objectx, overlap); assert(object_overlap <= object_size); + RWLock::RLocker l(ictx->parent_lock); + // stop early if the parent went away - it just means + // another flatten finished first, so this one is useless. + if (!ictx->parent) { + r = 0; + goto err; + } if ((r = read(ictx->parent, objectx, buf, NULL)) < 0) { lderr(ictx->cct) << "reading from parent failed" << dendl; goto err; @@ -2049,15 +2051,18 @@ reprotect_and_return_err: // and if there are no snaps, remove from the children object as well // (if snapshots remain, they have their own parent info, and the child // will be removed when the last snap goes away) + ictx->snap_lock.get_read(); if (ictx->snaps.empty()) { ldout(ictx->cct, 2) << "removing child from children list..." << dendl; int r = cls_client::remove_child(&ictx->md_ctx, RBD_CHILDREN, ictx->parent_md.spec, ictx->id); if (r < 0) { lderr(ictx->cct) << "error removing child from children list" << dendl; + ictx->snap_lock.put_read(); return r; } } + ictx->snap_lock.put_read(); notify_change(ictx->md_ctx, ictx->header_oid, NULL, ictx); ldout(ictx->cct, 20) << "finished flattening" << dendl; @@ -2078,7 +2083,7 @@ reprotect_and_return_err: if (r < 0) return r; - Mutex::Locker locker(ictx->md_lock); + RWLock::RLocker locker(ictx->md_lock); if (exclusive) *exclusive = ictx->exclusive_locked; if (tag) @@ -2115,7 +2120,7 @@ reprotect_and_return_err: * checks that we think we will succeed. But for now, let's not * duplicate that code. */ - Mutex::Locker locker(ictx->md_lock); + RWLock::RLocker locker(ictx->md_lock); r = rados::cls::lock::lock(&ictx->md_ctx, ictx->header_oid, RBD_LOCK_NAME, exclusive ? LOCK_EXCLUSIVE : LOCK_SHARED, cookie, tag, "", utime_t(), 0); @@ -2135,7 +2140,7 @@ reprotect_and_return_err: if (r < 0) return r; - Mutex::Locker locker(ictx->md_lock); + RWLock::RLocker locker(ictx->md_lock); r = rados::cls::lock::unlock(&ictx->md_ctx, ictx->header_oid, RBD_LOCK_NAME, cookie); if (r < 0) @@ -2160,7 +2165,7 @@ reprotect_and_return_err: << "'" << dendl; return -EINVAL; } - Mutex::Locker locker(ictx->md_lock); + RWLock::RLocker locker(ictx->md_lock); r = rados::cls::lock::break_lock(&ictx->md_ctx, ictx->header_oid, RBD_LOCK_NAME, cookie, lock_client); if (r < 0) @@ -2434,12 +2439,12 @@ reprotect_and_return_err: // validate extent against image size; clip to image size if necessary int clip_io(ImageCtx *ictx, uint64_t off, uint64_t *len) { - ictx->md_lock.Lock(); - ictx->snap_lock.Lock(); + ictx->md_lock.get_read(); + ictx->snap_lock.get_read(); uint64_t image_size = ictx->get_image_size(ictx->snap_id); bool snap_exists = ictx->snap_exists; - ictx->snap_lock.Unlock(); - ictx->md_lock.Unlock(); + ictx->snap_lock.put_read(); + ictx->md_lock.put_read(); if (!snap_exists) return -ENOENT; @@ -2507,14 +2512,14 @@ reprotect_and_return_err: if (r < 0) return r; - ictx->snap_lock.Lock(); + ictx->snap_lock.get_read(); snapid_t snap_id = ictx->snap_id; ::SnapContext snapc = ictx->snapc; - ictx->parent_lock.Lock(); + ictx->parent_lock.get_read(); uint64_t overlap = 0; ictx->get_parent_overlap(ictx->snap_id, &overlap); - ictx->parent_lock.Unlock(); - ictx->snap_lock.Unlock(); + ictx->parent_lock.put_read(); + ictx->snap_lock.put_read(); if (snap_id != CEPH_NOSNAP || ictx->read_only) return -EROFS; @@ -2592,14 +2597,14 @@ reprotect_and_return_err: return r; // TODO: check for snap - ictx->snap_lock.Lock(); + ictx->snap_lock.get_read(); snapid_t snap_id = ictx->snap_id; ::SnapContext snapc = ictx->snapc; - ictx->parent_lock.Lock(); + ictx->parent_lock.get_read(); uint64_t overlap = 0; ictx->get_parent_overlap(ictx->snap_id, &overlap); - ictx->parent_lock.Unlock(); - ictx->snap_lock.Unlock(); + ictx->parent_lock.put_read(); + ictx->snap_lock.put_read(); if (snap_id != CEPH_NOSNAP || ictx->read_only) return -EROFS; @@ -2685,9 +2690,9 @@ reprotect_and_return_err: if (r < 0) return r; - ictx->snap_lock.Lock(); + ictx->snap_lock.get_read(); snap_t snap_id = ictx->snap_id; - ictx->snap_lock.Unlock(); + ictx->snap_lock.put_read(); // map map<object_t,vector<ObjectExtent> > object_extents; diff --git a/src/test/run-rbd-tests b/src/test/run-rbd-tests index d3c8b9e98ca..ddd1f50cdf6 100755 --- a/src/test/run-rbd-tests +++ b/src/test/run-rbd-tests @@ -10,7 +10,7 @@ PATH="$CEPH_SRC:$PATH" recreate_pool() { POOL_NAME=$1 PG_NUM=100 - ceph osd pool delete $POOL_NAME + ceph osd pool delete $POOL_NAME $POOL_NAME --yes-i-really-really-mean-it ceph osd pool create $POOL_NAME $PG_NUM } |