diff options
author | Josh Durgin <josh.durgin@inktank.com> | 2013-02-21 11:17:18 -0800 |
---|---|---|
committer | Josh Durgin <josh.durgin@inktank.com> | 2013-02-21 11:19:40 -0800 |
commit | 995ff0e3eaa560b242da8c019a2e11e735e854f7 (patch) | |
tree | ef16be6547f4f461a1658bdd85618320ea442ec3 | |
parent | e0f8e5a80d6d22bd4dee79a4996ea7265d11b0c1 (diff) | |
download | ceph-995ff0e3eaa560b242da8c019a2e11e735e854f7.tar.gz |
librbd: use rwlocks instead of mutexes for several fields
Image metadata like snapshots, size, and parent is frequently read,
but rarely updated. During flatten, we were depending on the parent
lock to prevent the parent ImageCtx from disappearing out from under
us while we read from it. The copy-up path also needed the parent lock
to be able to read from the parent image, which lead to a deadlock.
Convert parent_lock, snap_lock, and md_lock to RWLocks, and change
their use to read instead of exclusive locks where appropriate. The
main place exclusive locks are needed is in ictx_refresh, so this is
pretty simple. This fixes the deadlock, since parent_lock is only
needed for read access in both flatten and the copy-up operation.
cache_lock and refresh_lock are only really used for exclusive access,
so leave them as regular mutexes.
One downside to this is that there's no way to assert is_locked()
for RWLocks, so we'll have to be very careful about changing code
in the future.
Fixes: #3665
Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
-rw-r--r-- | src/librbd/AioRequest.cc | 10 | ||||
-rw-r--r-- | src/librbd/ImageCtx.cc | 36 | ||||
-rw-r--r-- | src/librbd/ImageCtx.h | 7 | ||||
-rw-r--r-- | src/librbd/LibrbdWriteback.cc | 16 | ||||
-rw-r--r-- | src/librbd/internal.cc | 223 |
5 files changed, 134 insertions, 158 deletions
diff --git a/src/librbd/AioRequest.cc b/src/librbd/AioRequest.cc index b9a76e48e0a..95519fb372d 100644 --- a/src/librbd/AioRequest.cc +++ b/src/librbd/AioRequest.cc @@ -4,6 +4,7 @@ #include "common/ceph_context.h" #include "common/dout.h" #include "common/Mutex.h" +#include "common/RWLock.h" #include "librbd/AioCompletion.h" #include "librbd/ImageCtx.h" @@ -50,7 +51,6 @@ namespace librbd { void AioRequest::read_from_parent(vector<pair<uint64_t,uint64_t> >& image_extents) { assert(!m_parent_completion); - assert(m_ictx->parent_lock.is_locked()); m_parent_completion = aio_create_completion_internal(this, rbd_req_cb); ldout(m_ictx->cct, 20) << "read_from_parent this = " << this << " parent completion " << m_parent_completion @@ -68,8 +68,8 @@ namespace librbd { << " r = " << r << dendl; if (!m_tried_parent && r == -ENOENT) { - Mutex::Locker l(m_ictx->snap_lock); - Mutex::Locker l2(m_ictx->parent_lock); + RWLock::RLocker l(m_ictx->snap_lock); + RWLock::RLocker l2(m_ictx->parent_lock); // calculate reverse mapping onto the image vector<pair<uint64_t,uint64_t> > image_extents; @@ -158,8 +158,8 @@ namespace librbd { if (r == -ENOENT) { - Mutex::Locker l(m_ictx->snap_lock); - Mutex::Locker l2(m_ictx->parent_lock); + RWLock::RLocker l(m_ictx->snap_lock); + RWLock::RLocker l2(m_ictx->parent_lock); /* * Parent may have disappeared; if so, recover by using diff --git a/src/librbd/ImageCtx.cc b/src/librbd/ImageCtx.cc index 7909f8c620c..f2ffd79b662 100644 --- a/src/librbd/ImageCtx.cc +++ b/src/librbd/ImageCtx.cc @@ -224,7 +224,6 @@ namespace librbd { int ImageCtx::snap_set(string in_snap_name) { - assert(snap_lock.is_locked()); map<string, SnapInfo>::iterator it = snaps_by_name.find(in_snap_name); if (it != snaps_by_name.end()) { snap_name = in_snap_name; @@ -238,7 +237,6 @@ namespace librbd { void ImageCtx::snap_unset() { - assert(snap_lock.is_locked()); snap_id = CEPH_NOSNAP; snap_name = ""; snap_exists = true; @@ -247,7 +245,6 @@ namespace librbd { snap_t ImageCtx::get_snap_id(string in_snap_name) const { - assert(snap_lock.is_locked()); map<string, SnapInfo>::const_iterator it = snaps_by_name.find(in_snap_name); if (it != snaps_by_name.end()) return it->second.id; @@ -256,7 +253,6 @@ namespace librbd { int ImageCtx::get_snap_name(snapid_t in_snap_id, string *out_snap_name) const { - assert(snap_lock.is_locked()); map<string, SnapInfo>::const_iterator it; for (it = snaps_by_name.begin(); it != snaps_by_name.end(); it++) { @@ -270,7 +266,6 @@ namespace librbd { int ImageCtx::get_parent_spec(snapid_t in_snap_id, parent_spec *out_pspec) { - assert(snap_lock.is_locked()); map<string, SnapInfo>::iterator it; for (it = snaps_by_name.begin(); it != snaps_by_name.end(); it++) { @@ -322,7 +317,6 @@ namespace librbd { int ImageCtx::is_snap_protected(string in_snap_name, bool *is_protected) const { - assert(snap_lock.is_locked()); map<string, SnapInfo>::const_iterator it = snaps_by_name.find(in_snap_name); if (it != snaps_by_name.end()) { *is_protected = @@ -335,7 +329,6 @@ namespace librbd { int ImageCtx::is_snap_unprotected(string in_snap_name, bool *is_unprotected) const { - assert(snap_lock.is_locked()); map<string, SnapInfo>::const_iterator it = snaps_by_name.find(in_snap_name); if (it != snaps_by_name.end()) { *is_unprotected = @@ -347,7 +340,6 @@ namespace librbd { int ImageCtx::get_snap_size(string in_snap_name, uint64_t *out_size) const { - assert(snap_lock.is_locked()); map<string, SnapInfo>::const_iterator it = snaps_by_name.find(in_snap_name); if (it != snaps_by_name.end()) { *out_size = it->second.size; @@ -361,7 +353,6 @@ namespace librbd { parent_info parent, uint8_t protection_status) { - assert(snap_lock.is_locked()); snaps.push_back(id); SnapInfo info(id, in_size, features, parent, protection_status); snaps_by_name.insert(pair<string, SnapInfo>(in_snap_name, info)); @@ -369,8 +360,6 @@ namespace librbd { uint64_t ImageCtx::get_image_size(snap_t in_snap_id) const { - assert(md_lock.is_locked()); - assert(snap_lock.is_locked()); if (in_snap_id == CEPH_NOSNAP) { return size; } @@ -386,8 +375,6 @@ namespace librbd { int ImageCtx::get_features(snap_t in_snap_id, uint64_t *out_features) const { - assert(md_lock.is_locked()); - assert(snap_lock.is_locked()); if (in_snap_id == CEPH_NOSNAP) { *out_features = features; return 0; @@ -405,8 +392,6 @@ namespace librbd { int64_t ImageCtx::get_parent_pool_id(snap_t in_snap_id) const { - assert(snap_lock.is_locked()); - assert(parent_lock.is_locked()); if (in_snap_id == CEPH_NOSNAP) { return parent_md.spec.pool_id; } @@ -422,8 +407,6 @@ namespace librbd { string ImageCtx::get_parent_image_id(snap_t in_snap_id) const { - assert(snap_lock.is_locked()); - assert(parent_lock.is_locked()); if (in_snap_id == CEPH_NOSNAP) { return parent_md.spec.image_id; } @@ -439,8 +422,6 @@ namespace librbd { uint64_t ImageCtx::get_parent_snap_id(snap_t in_snap_id) const { - assert(snap_lock.is_locked()); - assert(parent_lock.is_locked()); if (in_snap_id == CEPH_NOSNAP) { return parent_md.spec.snap_id; } @@ -456,8 +437,6 @@ namespace librbd { int ImageCtx::get_parent_overlap(snap_t in_snap_id, uint64_t *overlap) const { - assert(snap_lock.is_locked()); - assert(parent_lock.is_locked()); if (in_snap_id == CEPH_NOSNAP) { *overlap = parent_md.overlap; return 0; @@ -475,9 +454,9 @@ namespace librbd { void ImageCtx::aio_read_from_cache(object_t o, bufferlist *bl, size_t len, uint64_t off, Context *onfinish) { - snap_lock.Lock(); + snap_lock.get_read(); ObjectCacher::OSDRead *rd = object_cacher->prepare_read(snap_id, bl, 0); - snap_lock.Unlock(); + snap_lock.put_read(); ObjectExtent extent(o, 0 /* a lie */, off, len); extent.oloc.pool = data_ctx.get_id(); extent.buffer_extents.push_back(make_pair(0, len)); @@ -491,10 +470,10 @@ namespace librbd { void ImageCtx::write_to_cache(object_t o, bufferlist& bl, size_t len, uint64_t off) { - snap_lock.Lock(); + snap_lock.get_read(); ObjectCacher::OSDWrite *wr = object_cacher->prepare_write(snapc, bl, utime_t(), 0); - snap_lock.Unlock(); + snap_lock.put_read(); ObjectExtent extent(o, 0, off, len); extent.oloc.pool = data_ctx.get_id(); extent.buffer_extents.push_back(make_pair(0, len)); @@ -542,14 +521,13 @@ namespace librbd { } void ImageCtx::shutdown_cache() { - md_lock.Lock(); + md_lock.get_write(); invalidate_cache(); - md_lock.Unlock(); + md_lock.put_write(); object_cacher->stop(); } void ImageCtx::invalidate_cache() { - assert(md_lock.is_locked()); if (!object_cacher) return; cache_lock.Lock(); @@ -582,8 +560,6 @@ namespace librbd { size_t ImageCtx::parent_io_len(uint64_t offset, size_t length, snap_t in_snap_id) { - assert(snap_lock.is_locked()); - assert(parent_lock.is_locked()); uint64_t overlap = 0; get_parent_overlap(in_snap_id, &overlap); diff --git a/src/librbd/ImageCtx.h b/src/librbd/ImageCtx.h index b24639906b2..99a07a6bf1c 100644 --- a/src/librbd/ImageCtx.h +++ b/src/librbd/ImageCtx.h @@ -11,6 +11,7 @@ #include <vector> #include "common/Mutex.h" +#include "common/RWLock.h" #include "common/snap_types.h" #include "include/buffer.h" #include "include/rbd/librbd.hpp" @@ -59,12 +60,12 @@ namespace librbd { * Lock ordering: * md_lock, cache_lock, snap_lock, parent_lock, refresh_lock */ - Mutex md_lock; // protects access to the mutable image metadata that + RWLock md_lock; // protects access to the mutable image metadata that // isn't guarded by other locks below // (size, features, image locks, etc) Mutex cache_lock; // used as client_lock for the ObjectCacher - Mutex snap_lock; // protects snapshot-related member variables: - Mutex parent_lock; // protects parent_md and parent + RWLock snap_lock; // protects snapshot-related member variables: + RWLock parent_lock; // protects parent_md and parent Mutex refresh_lock; // protects refresh_seq and last_refresh bool old_format; diff --git a/src/librbd/LibrbdWriteback.cc b/src/librbd/LibrbdWriteback.cc index 6d69a8297e7..1689ad91860 100644 --- a/src/librbd/LibrbdWriteback.cc +++ b/src/librbd/LibrbdWriteback.cc @@ -87,13 +87,13 @@ namespace librbd { bool LibrbdWriteback::may_copy_on_write(const object_t& oid, uint64_t read_off, uint64_t read_len, snapid_t snapid) { - m_ictx->snap_lock.Lock(); + m_ictx->snap_lock.get_read(); librados::snap_t snap_id = m_ictx->snap_id; - m_ictx->parent_lock.Lock(); + m_ictx->parent_lock.get_read(); uint64_t overlap = 0; m_ictx->get_parent_overlap(snap_id, &overlap); - m_ictx->parent_lock.Unlock(); - m_ictx->snap_lock.Unlock(); + m_ictx->parent_lock.put_read(); + m_ictx->snap_lock.put_read(); uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix); @@ -116,13 +116,13 @@ namespace librbd { uint64_t trunc_size, __u32 trunc_seq, Context *oncommit) { - m_ictx->snap_lock.Lock(); + m_ictx->snap_lock.get_read(); librados::snap_t snap_id = m_ictx->snap_id; - m_ictx->parent_lock.Lock(); + m_ictx->parent_lock.get_read(); uint64_t overlap = 0; m_ictx->get_parent_overlap(snap_id, &overlap); - m_ictx->parent_lock.Unlock(); - m_ictx->snap_lock.Unlock(); + m_ictx->parent_lock.put_read(); + m_ictx->snap_lock.put_read(); uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix); diff --git a/src/librbd/internal.cc b/src/librbd/internal.cc index 33b948d2310..e1c65f0df40 100644 --- a/src/librbd/internal.cc +++ b/src/librbd/internal.cc @@ -109,11 +109,11 @@ namespace librbd { void image_info(ImageCtx *ictx, image_info_t& info, size_t infosize) { int obj_order = ictx->order; - ictx->md_lock.Lock(); - ictx->snap_lock.Lock(); + ictx->md_lock.get_read(); + ictx->snap_lock.get_read(); info.size = ictx->get_image_size(ictx->snap_id); - ictx->snap_lock.Unlock(); - ictx->md_lock.Unlock(); + ictx->snap_lock.put_read(); + ictx->md_lock.put_read(); info.obj_size = 1ULL << obj_order; info.num_objs = howmany(info.size, ictx->get_object_size()); info.order = obj_order; @@ -143,7 +143,6 @@ namespace librbd { void trim_image(ImageCtx *ictx, uint64_t newsize, ProgressContext& prog_ctx) { - assert(ictx->md_lock.is_locked()); CephContext *cct = (CephContext *)ictx->data_ctx.cct(); uint64_t size = ictx->get_current_size(); @@ -247,7 +246,6 @@ namespace librbd { uint64_t ver; if (ictx) { - assert(ictx->md_lock.is_locked()); ictx->refresh_lock.Lock(); ldout(ictx->cct, 20) << "notify_change refresh_seq = " << ictx->refresh_seq << " last_refresh = " << ictx->last_refresh << dendl; @@ -310,7 +308,6 @@ namespace librbd { int rollback_image(ImageCtx *ictx, uint64_t snap_id, ProgressContext& prog_ctx) { - assert(ictx->md_lock.is_locked()); uint64_t numseg = ictx->get_num_objects(); uint64_t bsize = ictx->get_object_size(); @@ -431,7 +428,7 @@ namespace librbd { if (r < 0) return r; - Mutex::Locker l(ictx->md_lock); + RWLock::RLocker l(ictx->md_lock); do { r = add_snap(ictx, snap_name); } while (r == -ESTALE); @@ -475,18 +472,18 @@ namespace librbd { if (r < 0) return r; - Mutex::Locker l(ictx->md_lock); + RWLock::RLocker l(ictx->md_lock); snap_t snap_id; { // block for purposes of auto-destruction of l2 on early return - Mutex::Locker l2(ictx->snap_lock); + RWLock::RLocker l2(ictx->snap_lock); snap_id = ictx->get_snap_id(snap_name); if (snap_id == CEPH_NOSNAP) return -ENOENT; parent_spec our_pspec; - Mutex::Locker l3(ictx->parent_lock); + RWLock::RLocker l3(ictx->parent_lock); r = ictx->get_parent_spec(snap_id, &our_pspec); if (r < 0) { lderr(ictx->cct) << "snap_remove: can't get parent spec" << dendl; @@ -529,8 +526,8 @@ namespace librbd { if (r < 0) return r; - Mutex::Locker l(ictx->md_lock); - Mutex::Locker l2(ictx->snap_lock); + RWLock::RLocker l(ictx->md_lock); + RWLock::RLocker l2(ictx->snap_lock); uint64_t features; ictx->get_features(ictx->snap_id, &features); if ((features & RBD_FEATURE_LAYERING) == 0) { @@ -549,7 +546,7 @@ namespace librbd { if (is_protected) return -EBUSY; - + r = cls_client::set_protection_status(&ictx->md_ctx, ictx->header_oid, snap_id, @@ -572,8 +569,8 @@ namespace librbd { if (r < 0) return r; - Mutex::Locker l(ictx->md_lock); - Mutex::Locker l2(ictx->snap_lock); + RWLock::RLocker l(ictx->md_lock); + RWLock::RLocker l2(ictx->snap_lock); uint64_t features; ictx->get_features(ictx->snap_id, &features); if ((features & RBD_FEATURE_LAYERING) == 0) { @@ -661,7 +658,7 @@ reprotect_and_return_err: if (r < 0) return r; - Mutex::Locker l(ictx->snap_lock); + RWLock::RLocker l(ictx->snap_lock); bool is_unprotected; r = ictx->is_snap_unprotected(snap_name, &is_unprotected); // consider both PROTECTED or UNPROTECTING to be 'protected', @@ -919,13 +916,13 @@ reprotect_and_return_err: goto err_close_parent; } - p_imctx->md_lock.Lock(); - p_imctx->snap_lock.Lock(); + p_imctx->md_lock.get_read(); + p_imctx->snap_lock.get_read(); p_imctx->get_features(p_imctx->snap_id, &p_features); size = p_imctx->get_image_size(p_imctx->snap_id); p_imctx->is_snap_protected(p_imctx->snap_name, &snap_protected); - p_imctx->snap_lock.Unlock(); - p_imctx->md_lock.Unlock(); + p_imctx->snap_lock.put_read(); + p_imctx->md_lock.put_read(); if ((p_features & RBD_FEATURE_LAYERING) != RBD_FEATURE_LAYERING) { lderr(cct) << "parent image must support layering" << dendl; @@ -968,14 +965,14 @@ reprotect_and_return_err: goto err_close_child; } - p_imctx->md_lock.Lock(); + p_imctx->md_lock.get_write(); r = ictx_refresh(p_imctx); - p_imctx->md_lock.Unlock(); + p_imctx->md_lock.put_write(); if (r == 0) { - p_imctx->snap_lock.Lock(); + p_imctx->snap_lock.get_read(); r = p_imctx->is_snap_protected(p_imctx->snap_name, &snap_protected); - p_imctx->snap_lock.Unlock(); + p_imctx->snap_lock.put_read(); } if (r < 0 || !snap_protected) { // we lost the race with unprotect @@ -1145,8 +1142,8 @@ reprotect_and_return_err: int r = ictx_check(ictx); if (r < 0) return r; - Mutex::Locker l(ictx->md_lock); - Mutex::Locker l2(ictx->snap_lock); + RWLock::RLocker l(ictx->md_lock); + RWLock::RLocker l2(ictx->snap_lock); *size = ictx->get_image_size(ictx->snap_id); return 0; } @@ -1156,8 +1153,8 @@ reprotect_and_return_err: int r = ictx_check(ictx); if (r < 0) return r; - Mutex::Locker l(ictx->md_lock); - Mutex::Locker l2(ictx->snap_lock); + RWLock::RLocker l(ictx->md_lock); + RWLock::RLocker l2(ictx->snap_lock); return ictx->get_features(ictx->snap_id, features); } @@ -1166,15 +1163,13 @@ reprotect_and_return_err: int r = ictx_check(ictx); if (r < 0) return r; - Mutex::Locker l(ictx->snap_lock); - Mutex::Locker l2(ictx->parent_lock); + RWLock::RLocker l(ictx->snap_lock); + RWLock::RLocker l2(ictx->parent_lock); return ictx->get_parent_overlap(ictx->snap_id, overlap); } int open_parent(ImageCtx *ictx) { - assert(ictx->snap_lock.is_locked()); - assert(ictx->parent_lock.is_locked()); string pool_name; Rados rados(ictx->md_ctx); @@ -1212,17 +1207,17 @@ reprotect_and_return_err: return r; } - ictx->parent->snap_lock.Lock(); + ictx->parent->snap_lock.get_write(); r = ictx->parent->get_snap_name(parent_snap_id, &ictx->parent->snap_name); if (r < 0) { lderr(ictx->cct) << "parent snapshot does not exist" << dendl; - ictx->parent->snap_lock.Unlock(); + ictx->parent->snap_lock.put_write(); close_image(ictx->parent); ictx->parent = NULL; return r; } ictx->parent->snap_set(ictx->parent->snap_name); - ictx->parent->snap_lock.Unlock(); + ictx->parent->snap_lock.put_write(); return 0; } @@ -1234,8 +1229,8 @@ reprotect_and_return_err: if (r < 0) return r; - Mutex::Locker l(ictx->snap_lock); - Mutex::Locker l2(ictx->parent_lock); + RWLock::RLocker l(ictx->snap_lock); + RWLock::RLocker l2(ictx->parent_lock); parent_spec parent_spec; @@ -1264,7 +1259,7 @@ reprotect_and_return_err: } if (parent_snap_name) { - Mutex::Locker l(ictx->parent->snap_lock); + RWLock::RLocker l(ictx->parent->snap_lock); r = ictx->parent->get_snap_name(parent_spec.snap_id, parent_snap_name); if (r < 0) { @@ -1309,14 +1304,14 @@ reprotect_and_return_err: old_format = ictx->old_format; unknown_format = false; id = ictx->id; - ictx->md_lock.Lock(); + ictx->md_lock.get_read(); trim_image(ictx, 0, prog_ctx); - ictx->md_lock.Unlock(); + ictx->md_lock.put_read(); - ictx->parent_lock.Lock(); + ictx->parent_lock.get_read(); // struct assignment parent_info parent_info = ictx->parent_md; - ictx->parent_lock.Unlock(); + ictx->parent_lock.put_read(); r = cls_client::remove_child(&ictx->md_ctx, RBD_CHILDREN, parent_info.spec, id); @@ -1373,7 +1368,6 @@ reprotect_and_return_err: int resize_helper(ImageCtx *ictx, uint64_t size, ProgressContext& prog_ctx) { - assert(ictx->md_lock.is_locked()); CephContext *cct = ictx->cct; if (size == ictx->size) { @@ -1431,7 +1425,7 @@ reprotect_and_return_err: if (r < 0) return r; - Mutex::Locker l(ictx->md_lock); + RWLock::WLocker l(ictx->md_lock); if (size < ictx->size && ictx->object_cacher) { // need to invalidate since we're deleting objects, and // ObjectCacher doesn't track non-existent objects @@ -1454,7 +1448,7 @@ reprotect_and_return_err: return r; bufferlist bl, bl2; - Mutex::Locker l(ictx->snap_lock); + RWLock::RLocker l(ictx->snap_lock); for (map<string, SnapInfo>::iterator it = ictx->snaps_by_name.begin(); it != ictx->snaps_by_name.end(); ++it) { snap_info_t info; @@ -1469,8 +1463,6 @@ reprotect_and_return_err: int add_snap(ImageCtx *ictx, const char *snap_name) { - assert(ictx->md_lock.is_locked()); - uint64_t snap_id; int r = ictx->md_ctx.selfmanaged_snap_create(&snap_id); @@ -1499,14 +1491,12 @@ reprotect_and_return_err: int rm_snap(ImageCtx *ictx, const char *snap_name) { - assert(ictx->md_lock.is_locked()); - int r; if (ictx->old_format) { r = cls_client::old_snapshot_remove(&ictx->md_ctx, ictx->header_oid, snap_name); } else { - Mutex::Locker l(ictx->snap_lock); + RWLock::RLocker l(ictx->snap_lock); r = cls_client::snapshot_remove(&ictx->md_ctx, ictx->header_oid, ictx->get_snap_id(snap_name)); @@ -1530,7 +1520,7 @@ reprotect_and_return_err: ictx->refresh_lock.Unlock(); if (needs_refresh) { - Mutex::Locker l(ictx->md_lock); + RWLock::WLocker l(ictx->md_lock); int r = ictx_refresh(ictx); if (r < 0) { @@ -1543,8 +1533,6 @@ reprotect_and_return_err: } int refresh_parent(ImageCtx *ictx) { - assert(ictx->snap_lock.is_locked()); - assert(ictx->parent_lock.is_locked()); // close the parent if it changed or this image no longer needs // to read from it int r; @@ -1578,7 +1566,6 @@ reprotect_and_return_err: int ictx_refresh(ImageCtx *ictx) { CephContext *cct = ictx->cct; - assert(ictx->md_lock.is_locked()); bufferlist bl, bl2; ldout(cct, 20) << "ictx_refresh " << ictx << dendl; @@ -1596,9 +1583,9 @@ reprotect_and_return_err: vector<parent_info> snap_parents; vector<uint8_t> snap_protection; { - Mutex::Locker l(ictx->snap_lock); + RWLock::WLocker l(ictx->snap_lock); { - Mutex::Locker l2(ictx->parent_lock); + RWLock::WLocker l2(ictx->parent_lock); ictx->lockers.clear(); if (ictx->old_format) { r = read_header(ictx->md_ctx, ictx->header_oid, &ictx->header, NULL); @@ -1748,8 +1735,8 @@ reprotect_and_return_err: if (r < 0) return r; - Mutex::Locker l(ictx->md_lock); - Mutex::Locker l2(ictx->snap_lock); + RWLock::WLocker l(ictx->md_lock); + RWLock::WLocker l2(ictx->snap_lock); if (!ictx->snap_exists) return -ENOENT; @@ -1825,11 +1812,11 @@ reprotect_and_return_err: << " -> " << destname << dendl; int order = src->order; - src->md_lock.Lock(); - src->snap_lock.Lock(); + src->md_lock.get_read(); + src->snap_lock.get_read(); uint64_t src_size = src->get_image_size(src->snap_id); - src->snap_lock.Unlock(); - src->md_lock.Unlock(); + src->snap_lock.put_read(); + src->md_lock.put_read(); int r = create(dest_md_ctx, destname, src_size, src->old_format, src->features, &order, src->stripe_unit, src->stripe_count); @@ -1855,17 +1842,17 @@ reprotect_and_return_err: { CopyProgressCtx cp(prog_ctx); - src->md_lock.Lock(); - src->snap_lock.Lock(); + src->md_lock.get_read(); + src->snap_lock.get_read(); uint64_t src_size = src->get_image_size(src->snap_id); - src->snap_lock.Unlock(); - src->md_lock.Unlock(); + src->snap_lock.put_read(); + src->md_lock.put_read(); - dest->md_lock.Lock(); - dest->snap_lock.Lock(); + dest->md_lock.get_read(); + dest->snap_lock.get_read(); uint64_t dest_size = dest->get_image_size(dest->snap_id); - dest->snap_lock.Unlock(); - dest->md_lock.Unlock(); + dest->snap_lock.put_read(); + dest->md_lock.put_read(); if (dest_size < src_size) { lderr(src->cct) << " src size " << src_size << " != dest size " << dest_size << dendl; @@ -1889,8 +1876,8 @@ reprotect_and_return_err: int _snap_set(ImageCtx *ictx, const char *snap_name) { - Mutex::Locker l1(ictx->snap_lock); - Mutex::Locker l2(ictx->parent_lock); + RWLock::WLocker l1(ictx->snap_lock); + RWLock::WLocker l2(ictx->parent_lock); int r; if ((snap_name != NULL) && (strlen(snap_name) != 0)) { r = ictx->snap_set(snap_name); @@ -1935,9 +1922,9 @@ reprotect_and_return_err: } } - ictx->md_lock.Lock(); + ictx->md_lock.get_write(); r = ictx_refresh(ictx); - ictx->md_lock.Unlock(); + ictx->md_lock.put_write(); if (r < 0) goto err_close; @@ -1985,28 +1972,36 @@ reprotect_and_return_err: return r; } - Mutex::Locker l(ictx->md_lock); - Mutex::Locker l2(ictx->snap_lock); - Mutex::Locker l3(ictx->parent_lock); + uint64_t object_size; + uint64_t period; + uint64_t overlap; + uint64_t overlap_periods; + uint64_t overlap_objects; - // can't flatten a non-clone - if (ictx->parent_md.spec.pool_id == -1) { - lderr(ictx->cct) << "image has no parent" << dendl; - return -EINVAL; - } - if (ictx->snap_id != CEPH_NOSNAP || ictx->read_only) { - lderr(ictx->cct) << "snapshots cannot be flattened" << dendl; - return -EROFS; - } + { + RWLock::RLocker l(ictx->md_lock); + RWLock::RLocker l2(ictx->snap_lock); + RWLock::RLocker l3(ictx->parent_lock); - assert(ictx->parent != NULL); - assert(ictx->parent_md.overlap <= ictx->size); + // can't flatten a non-clone + if (ictx->parent_md.spec.pool_id == -1) { + lderr(ictx->cct) << "image has no parent" << dendl; + return -EINVAL; + } + if (ictx->snap_id != CEPH_NOSNAP || ictx->read_only) { + lderr(ictx->cct) << "snapshots cannot be flattened" << dendl; + return -EROFS; + } - uint64_t object_size = ictx->get_object_size(); - uint64_t period = ictx->get_stripe_period(); - uint64_t overlap = ictx->parent_md.overlap; - uint64_t overlap_periods = (overlap + period - 1) / period; - uint64_t overlap_objects = overlap_periods * ictx->get_stripe_count(); + assert(ictx->parent != NULL); + assert(ictx->parent_md.overlap <= ictx->size); + + object_size = ictx->get_object_size(); + period = ictx->get_stripe_period(); + overlap = ictx->parent_md.overlap; + overlap_periods = (overlap + period - 1) / period; + overlap_objects = overlap_periods * ictx->get_stripe_count(); + } char *buf = new char[object_size]; @@ -2021,6 +2016,7 @@ reprotect_and_return_err: uint64_t object_overlap = ictx->prune_parent_extents(objectx, overlap); assert(object_overlap <= object_size); + RWLock::RLocker l(ictx->parent_lock); if ((r = read(ictx->parent, objectx, buf, NULL)) < 0) { lderr(ictx->cct) << "reading from parent failed" << dendl; goto err; @@ -2049,15 +2045,18 @@ reprotect_and_return_err: // and if there are no snaps, remove from the children object as well // (if snapshots remain, they have their own parent info, and the child // will be removed when the last snap goes away) + ictx->snap_lock.get_read(); if (ictx->snaps.empty()) { ldout(ictx->cct, 2) << "removing child from children list..." << dendl; int r = cls_client::remove_child(&ictx->md_ctx, RBD_CHILDREN, ictx->parent_md.spec, ictx->id); if (r < 0) { lderr(ictx->cct) << "error removing child from children list" << dendl; + ictx->snap_lock.put_read(); return r; } } + ictx->snap_lock.put_read(); notify_change(ictx->md_ctx, ictx->header_oid, NULL, ictx); ldout(ictx->cct, 20) << "finished flattening" << dendl; @@ -2078,7 +2077,7 @@ reprotect_and_return_err: if (r < 0) return r; - Mutex::Locker locker(ictx->md_lock); + RWLock::RLocker locker(ictx->md_lock); if (exclusive) *exclusive = ictx->exclusive_locked; if (tag) @@ -2115,7 +2114,7 @@ reprotect_and_return_err: * checks that we think we will succeed. But for now, let's not * duplicate that code. */ - Mutex::Locker locker(ictx->md_lock); + RWLock::RLocker locker(ictx->md_lock); r = rados::cls::lock::lock(&ictx->md_ctx, ictx->header_oid, RBD_LOCK_NAME, exclusive ? LOCK_EXCLUSIVE : LOCK_SHARED, cookie, tag, "", utime_t(), 0); @@ -2135,7 +2134,7 @@ reprotect_and_return_err: if (r < 0) return r; - Mutex::Locker locker(ictx->md_lock); + RWLock::RLocker locker(ictx->md_lock); r = rados::cls::lock::unlock(&ictx->md_ctx, ictx->header_oid, RBD_LOCK_NAME, cookie); if (r < 0) @@ -2160,7 +2159,7 @@ reprotect_and_return_err: << "'" << dendl; return -EINVAL; } - Mutex::Locker locker(ictx->md_lock); + RWLock::RLocker locker(ictx->md_lock); r = rados::cls::lock::break_lock(&ictx->md_ctx, ictx->header_oid, RBD_LOCK_NAME, cookie, lock_client); if (r < 0) @@ -2434,12 +2433,12 @@ reprotect_and_return_err: // validate extent against image size; clip to image size if necessary int clip_io(ImageCtx *ictx, uint64_t off, uint64_t *len) { - ictx->md_lock.Lock(); - ictx->snap_lock.Lock(); + ictx->md_lock.get_read(); + ictx->snap_lock.get_read(); uint64_t image_size = ictx->get_image_size(ictx->snap_id); bool snap_exists = ictx->snap_exists; - ictx->snap_lock.Unlock(); - ictx->md_lock.Unlock(); + ictx->snap_lock.put_read(); + ictx->md_lock.put_read(); if (!snap_exists) return -ENOENT; @@ -2507,14 +2506,14 @@ reprotect_and_return_err: if (r < 0) return r; - ictx->snap_lock.Lock(); + ictx->snap_lock.get_read(); snapid_t snap_id = ictx->snap_id; ::SnapContext snapc = ictx->snapc; - ictx->parent_lock.Lock(); + ictx->parent_lock.get_read(); uint64_t overlap = 0; ictx->get_parent_overlap(ictx->snap_id, &overlap); - ictx->parent_lock.Unlock(); - ictx->snap_lock.Unlock(); + ictx->parent_lock.put_read(); + ictx->snap_lock.put_read(); if (snap_id != CEPH_NOSNAP || ictx->read_only) return -EROFS; @@ -2592,14 +2591,14 @@ reprotect_and_return_err: return r; // TODO: check for snap - ictx->snap_lock.Lock(); + ictx->snap_lock.get_read(); snapid_t snap_id = ictx->snap_id; ::SnapContext snapc = ictx->snapc; - ictx->parent_lock.Lock(); + ictx->parent_lock.get_read(); uint64_t overlap = 0; ictx->get_parent_overlap(ictx->snap_id, &overlap); - ictx->parent_lock.Unlock(); - ictx->snap_lock.Unlock(); + ictx->parent_lock.put_read(); + ictx->snap_lock.put_read(); if (snap_id != CEPH_NOSNAP || ictx->read_only) return -EROFS; @@ -2685,9 +2684,9 @@ reprotect_and_return_err: if (r < 0) return r; - ictx->snap_lock.Lock(); + ictx->snap_lock.get_read(); snap_t snap_id = ictx->snap_id; - ictx->snap_lock.Unlock(); + ictx->snap_lock.put_read(); // map map<object_t,vector<ObjectExtent> > object_extents; |