diff options
author | Yehuda Sadeh <yehuda.sadeh@dreamhost.com> | 2011-11-22 15:05:45 -0800 |
---|---|---|
committer | Yehuda Sadeh <yehuda.sadeh@dreamhost.com> | 2011-11-22 15:06:16 -0800 |
commit | ebe5fc60d20f92a0037c53c1e7bd7ae512be3da4 (patch) | |
tree | 2c9a3d46d666e89964d69d7572d05b84f0006b2f | |
parent | a859763b1cba844d0d56b861a372e5f63f87c607 (diff) | |
download | ceph-ebe5fc60d20f92a0037c53c1e7bd7ae512be3da4.tar.gz |
obsync: tear out rgw
-rwxr-xr-x | src/obsync/obsync | 262 |
1 files changed, 1 insertions, 261 deletions
diff --git a/src/obsync/obsync b/src/obsync/obsync index 2d5e2928827..89751494d75 100755 --- a/src/obsync/obsync +++ b/src/obsync/obsync @@ -28,8 +28,6 @@ import hashlib import mimetypes import os from StringIO import StringIO -import rados -import rgw import re import shutil import string @@ -45,13 +43,9 @@ global opts # Translation table mapping users in the source to users in the destination. global xuser -# Librgw instance -global lrgw -lrgw = None - ###### Usage ####### USAGE = """ -obsync synchronizes S3, Rados, and local objects. The source and destination +obsync synchronizes S3 and local objects. The source and destination can both be local or both remote. Examples: @@ -84,22 +78,6 @@ defaults. obsync (options) [source] [destination]""" -###### Constants ####### -ACL_XATTR = "rados.acl" -META_XATTR_PREFIX = "rados.meta." -CONTENT_TYPE_XATTR = "rados.content_type" - -RGW_META_BUCKET_NAME = ".rgw" -RGW_USERS_UID_BUCKET_NAME = ".users.uid" -RGW_META_ETAG = "user.rgw.etag" -RGW_META_PREFIX = "user.x-amz-meta-" -RGW_META_CONTENT_TYPE = "user.rgw.content_type" -RGW_META_ACL = "user.rgw.acl" - -def vvprint(s): - if (opts.more_verbose): - print s - ###### Exception classes ####### class ObsyncException(Exception): def __init__(self, ty, e): @@ -550,15 +528,6 @@ class Store(object): else: is_secure = os.environ.has_key("SRC_SECURE") return S3Store(s3_url, create, akey, skey, is_secure) - rados_url = strip_prefix("rgw:", url) - if (rados_url): - dst_owner = None - if (is_dst): - if not os.environ.has_key("DST_OWNER"): - raise ObsyncArgumentParsingException("You must set \ -DST_OWNER when uploading files to RgwStore.") - dst_owner = os.environ["DST_OWNER"] - return RgwStore(rados_url, create, akey, skey, dst_owner) file_url = strip_prefix("file://", url) if (file_url): return FileStore(file_url, create) @@ -865,235 +834,6 @@ class FileStore(Store): if (opts.more_verbose): print "FileStore: removed %s" % obj.name -###### Rgw store ####### -class RgwStoreIterator(object): - """RgwStore iterator""" - def __init__(self, it, rgw_store): - self.it = it # has type rados.ObjectIterator - self.rgw_store = rgw_store - self.prefix = self.rgw_store.key_prefix - self.prefix_len = len(self.rgw_store.key_prefix) - def __iter__(self): - return self - def next(self): - rados_obj = None - while True: - # This will raise StopIteration when there are no more objects to - # iterate on - rados_obj = self.it.next() - # do the prefixes match? - if rados_obj.key[:self.prefix_len] == self.prefix: - break - ret = self.rgw_store.obsync_obj_from_rgw(rados_obj.key) - if (ret == None): - raise ObsyncPermanentException("internal iterator error") - return ret - -class RgwStore(Store): - def __init__(self, url, create, akey, skey, owner): - global lrgw - if (lrgw == None): - lrgw = rgw.Rgw() - self.owner = owner - self.user_exists_cache = {} - self.users_uid_ioctx = None - # Parse the rados url - conf_end = string.find(url, ":") - if (conf_end == -1): - raise ObsyncPermanentException("RgwStore URLs are of the form \ -rgw:path/to/ceph/conf:bucket:key_prefix. Failed to find the path to the conf.") - self.conf_file_path = url[0:conf_end] - bucket_end = url.find(":", conf_end+1) - if (bucket_end == -1): - self.rgw_bucket_name = url[conf_end+1:] - self.key_prefix = "" - else: - self.rgw_bucket_name = url[conf_end+1:bucket_end] - self.key_prefix = url[bucket_end+1:] - if (self.rgw_bucket_name == ""): - raise ObsyncPermanentException("RgwStore URLs are of the form \ -rgw:/path/to/ceph/conf:pool:key_prefix. Failed to find the bucket.") - if (opts.more_verbose): - print "self.conf_file_path = '" + self.conf_file_path + "', ", - print "self.rgw_bucket_name = '" + self.rgw_bucket_name + "' ", - print "self.key_prefix = '" + self.key_prefix + "'" - self.rados = rados.Rados() - self.rados.conf_read_file(self.conf_file_path) - self.rados.connect() - if self.owner != None and not self.user_exists(ACL_TYPE_CANON_USER + self.owner): - raise ObsyncPermanentException("Unknown owner! DST_OWNER=%s" % self.owner) - if (not self.rados.pool_exists(self.rgw_bucket_name)): - if (create): - self.create_rgw_bucket(self.rgw_bucket_name) - else: - raise ObsyncPermanentException("NonexistentStore") - elif self.owner == None: - # Figure out what owner we should use when creating objects. - # We use the owner of the destination bucket - ioctx = self.rados.open_ioctx(RGW_META_BUCKET_NAME) - try: - bin_ = ioctx.get_xattr(self.rgw_bucket_name, RGW_META_ACL) - xml = lrgw.acl_bin2xml(bin_) - acl = AclPolicy.from_xml(xml) - self.owner = acl.owner_id - if (opts.more_verbose): - print "using owner \"%s\"" % self.owner - finally: - ioctx.close() - self.ioctx = self.rados.open_ioctx(self.rgw_bucket_name) - Store.__init__(self, "rgw:" + url) - def create_rgw_bucket(self, rgw_bucket_name): - global lrgw - """ Create an rgw bucket named 'rgw_bucket_name' """ - if (self.owner == None): - raise ObsyncArgumentParsingException("Can't create a bucket \ -without knowing who should own it. Please set DST_OWNER") - self.rados.create_pool(self.rgw_bucket_name) - ioctx = None - try: - ioctx = self.rados.open_ioctx(RGW_META_BUCKET_NAME) - ioctx.write(rgw_bucket_name, "", 0) - print "ioctx.set_xattr(rgw_bucket_name=" + rgw_bucket_name + ", " + \ - "user.rgw.acl=" + self.owner + ")" - new_bucket_acl = "\ -<AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\"> \ -<Owner><ID>%s</ID></Owner><AccessControlList>\ -<Grant><Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" \ -xsi:type=\"CanonicalUser\"><ID>%s</ID> \ -<DisplayName>display-name</DisplayName></Grantee> \ -<Permission>FULL_CONTROL</Permission></Grant>\ -</AccessControlList></AccessControlPolicy>" % (self.owner, self.owner) - new_bucket_acl_bin = lrgw.acl_xml2bin(new_bucket_acl) - ioctx.set_xattr(rgw_bucket_name, "user.rgw.acl", new_bucket_acl_bin) - finally: - if (ioctx): - ioctx.close() - def obsync_obj_from_rgw(self, obj_name): - """Create an obsync object from a Rados object""" - try: - size, tm = self.ioctx.stat(obj_name) - except rados.ObjectNotFound: - return None - md5 = None - meta = {} - for k,v in self.ioctx.get_xattrs(obj_name): - if k == RGW_META_ETAG: - md5 = v - elif k == RGW_META_CONTENT_TYPE: - meta[CONTENT_TYPE_XATTR] = v - elif k[:len(RGW_META_PREFIX)] == RGW_META_PREFIX: - meta["rados.meta." + k[len(RGW_META_PREFIX):]] = v - elif opts.more_verbose: - print "ignoring unknown xattr " + k - if (md5 == None): - raise ObsyncPermanentException("error on object %s: expected to find " + \ - "extended attribute %s" % (obj_name, RGW_META_ETAG)) - if (opts.more_verbose): - print "meta = " + str(meta) - return Object(obj_name, md5, size, meta) - def __str__(self): - return "rgw:" + self.conf_file_path + ":" + self.rgw_bucket_name - def get_acl(self, obj): - global lrgw - bin_ = None - try: - bin_ = self.ioctx.get_xattr(obj.name, RGW_META_ACL) - except rados.NoData: - return LocalAcl.get_empty(obj.name) - xml = lrgw.acl_bin2xml(bin_) - return LocalAcl.from_xml(obj.name, xml) - def make_local_copy(self, obj): - temp_file = None - temp_file_f = None - try: - # read the object from rgw in chunks - temp_file = tempfile.NamedTemporaryFile(mode='w+b', delete=False) - temp_file_f = open(temp_file.name, 'w') - off = 0 - while True: - buf = self.ioctx.read(obj.name, offset = off, length = 8192) - if (len(buf) == 0): - break - temp_file_f.write(buf) - if (len(buf) < 8192): - break - off += 8192 - temp_file_f.close() - except Exception, e: - if (temp_file_f): - temp_file_f.close() - if (temp_file): - os.unlink(temp_file.name) - raise ObsyncTemporaryException(e) - return LocalCopy(obj.name, temp_file.name, True) - def all_objects(self): - it = self.ioctx.list_objects() - return RgwStoreIterator(it, self) - def locate_object(self, obj): - return self.obsync_obj_from_rgw(obj.name) - def user_exists(self, user): - if (self.user_exists_cache.has_key(user)): - return self.user_exists_cache[user] - if user[:len(ACL_TYPE_CANON_USER)] == ACL_TYPE_CANON_USER: - if (self.users_uid_ioctx == None): - # will be closed in __del__ - self.users_uid_ioctx = self.rados.open_ioctx(RGW_USERS_UID_BUCKET_NAME) - try: - self.users_uid_ioctx.stat(user[len(ACL_TYPE_CANON_USER):]) - except rados.ObjectNotFound: - return False - self.user_exists_cache[user] = True - return True - elif user[:len(ACL_TYPE_EMAIL_USER)] == ACL_TYPE_EMAIL_USER: - raise ObsyncPermanentException("rgw target can't handle email users yet.") - elif user[:len(ACL_TYPE_GROUP)] == ACL_TYPE_GROUP: - raise ObsyncPermanentException("rgw target can't handle groups yet.") - else: - raise ObsyncPermanentException("can't understand user name %s" % user) - def upload(self, local_copy, src_acl, obj): - global lrgw - if (opts.more_verbose): - print "RgwStore.UPLOAD: local_copy.path='" + local_copy.path + "' " + \ - "obj='" + obj.name + "'" - if (opts.dry_run): - return - local_copy_f = open(local_copy.path, 'r') - off = 0 - while True: - buf = local_copy_f.read(8192) - if ((len(buf) == 0) and (off != 0)): - break - self.ioctx.write(obj.name, buf, off) - if (len(buf) < 8192): - break - off += 8192 - self.ioctx.set_xattr(obj.name, "user.rgw.etag", obj.md5) - if (src_acl.acl_policy == None): - ap = AclPolicy.create_default(self.owner) - else: - ap = src_acl.acl_policy - for user in ap.get_all_users(): - if not self.user_exists(user): - raise ObsyncPermanentException("You must provide an --xuser entry to translate \ -user %s into something valid for the rgw destination.") - xml = ap.to_xml() - bin_ = lrgw.acl_xml2bin(xml) - self.ioctx.set_xattr(obj.name, "user.rgw.acl", bin_) - content_type = "application/octet-stream" - for k,v in obj.meta.items(): - if k == CONTENT_TYPE_XATTR: - content_type = v - elif k[:len(META_XATTR_PREFIX)] == META_XATTR_PREFIX: - self.ioctx.set_xattr(obj.name, - RGW_META_PREFIX + k[len(META_XATTR_PREFIX):], v) - self.ioctx.set_xattr(obj.name, "user.rgw.content_type", content_type) - def remove(self, obj): - if (opts.dry_run): - return - self.ioctx.remove_object(obj.name) - if (opts.more_verbose): - print "RgwStore: removed %s" % obj.name - ###### Functions ####### def delete_unreferenced(src, dst): """ delete everything from dst that is not referenced in src """ |