diff options
author | Sage Weil <sage@inktank.com> | 2013-03-22 10:06:13 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-03-22 10:06:13 -0700 |
commit | c9157a19467a6690299672c536d767e81a79ded3 (patch) | |
tree | 3e139fc6d9790bea43666e5b5bb334c5b29d9eee | |
parent | 0981e4666bd70f191302cd2e6f10c5a7c2d503bd (diff) | |
parent | 9f37b49c5cc92526a7ae9839929e567ff71a760a (diff) | |
download | ceph-c9157a19467a6690299672c536d767e81a79ded3.tar.gz |
Merge pull request #117 from ceph/wip-ceph-disk
ceph-disk-* refactor
-rw-r--r-- | ceph.spec.in | 1 | ||||
-rw-r--r-- | debian/ceph.install | 1 | ||||
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rwxr-xr-x | src/ceph-disk | 1662 | ||||
-rwxr-xr-x | src/ceph-disk-activate | 766 | ||||
-rwxr-xr-x | src/ceph-disk-prepare | 1004 |
6 files changed, 1672 insertions, 1764 deletions
diff --git a/ceph.spec.in b/ceph.spec.in index afee3314289..d72c207f9a0 100644 --- a/ceph.spec.in +++ b/ceph.spec.in @@ -434,6 +434,7 @@ fi %{_libdir}/rados-classes/libcls_lock.so* %{_libdir}/rados-classes/libcls_kvs.so* %{_libdir}/rados-classes/libcls_refcount.so* +/sbin/ceph-disk /sbin/ceph-disk-activate /sbin/ceph-disk-prepare /sbin/ceph-create-keys diff --git a/debian/ceph.install b/debian/ceph.install index b942679fd73..f174f55691f 100644 --- a/debian/ceph.install +++ b/debian/ceph.install @@ -7,6 +7,7 @@ usr/bin/ceph-mon usr/bin/ceph-osd usr/bin/ceph-debugpack usr/bin/ceph_mon_store_converter +sbin/ceph-disk usr/sbin/ sbin/ceph-disk-prepare usr/sbin/ sbin/ceph-disk-activate usr/sbin/ sbin/ceph-create-keys usr/sbin/ diff --git a/src/Makefile.am b/src/Makefile.am index 72037726c3d..fd54df867e2 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -28,6 +28,7 @@ bin_PROGRAMS = bin_DEBUGPROGRAMS = sbin_PROGRAMS = sbin_SCRIPTS = \ + ceph-disk \ ceph-disk-prepare \ ceph-disk-activate \ ceph-create-keys \ @@ -1145,6 +1146,7 @@ EXTRA_DIST += \ $(srcdir)/upstart/radosgw.conf \ $(srcdir)/upstart/radosgw-all.conf \ $(srcdir)/upstart/radosgw-all-starter.conf \ + ceph-disk \ ceph-disk-prepare \ ceph-disk-activate \ ceph-create-keys \ diff --git a/src/ceph-disk b/src/ceph-disk new file mode 100755 index 00000000000..1cf8ba96596 --- /dev/null +++ b/src/ceph-disk @@ -0,0 +1,1662 @@ +#!/usr/bin/python + +import argparse +import errno +import logging +import os +import os.path +import platform +import re +import subprocess +import stat +import sys +import tempfile +import uuid + +CEPH_OSD_ONDISK_MAGIC = 'ceph osd volume v026' + +JOURNAL_UUID = '45b0969e-9b03-4f30-b4c6-b4b80ceff106' +DMCRYPT_JOURNAL_UUID = '45b0969e-9b03-4f30-b4c6-5ec00ceff106' +OSD_UUID = '4fbd7e29-9d25-41b8-afd0-062c0ceff05d' +DMCRYPT_OSD_UUID = '4fbd7e29-9d25-41b8-afd0-5ec00ceff05d' +TOBE_UUID = '89c57f98-2fe5-4dc0-89c1-f3ad0ceff2be' +DMCRYPT_TOBE_UUID = '89c57f98-2fe5-4dc0-89c1-5ec00ceff2be' + +DEFAULT_FS_TYPE = 'xfs' + +MOUNT_OPTIONS = dict( + btrfs='noatime,user_subvol_rm_allowed', + # user_xattr is default ever since linux 2.6.39 / 3.0, but we'll + # delay a moment before removing it fully because we did have some + # issues with ext4 before the xatts-in-leveldb work, and it seemed + # that user_xattr helped + ext4='noatime,user_xattr', + xfs='noatime', + ) + +MKFS_ARGS = dict( + btrfs=[ + '-m', 'single', + '-l', '32768', + '-n', '32768', + ], + xfs=[ + # xfs insists on not overwriting previous fs; even if we wipe + # partition table, we often recreate it exactly the same way, + # so we'll see ghosts of filesystems past + '-f', + '-i', 'size=2048', + ], + ) + +INIT_SYSTEMS = [ + 'upstart', + 'sysvinit', + 'systemd', + 'auto', + ] + + +log_name = __name__ +if log_name == '__main__': + log_name = os.path.basename(sys.argv[0]) +log = logging.getLogger(log_name) + + +###### exceptions ######## + +class Error(Exception): + """ + Error + """ + + def __str__(self): + doc = self.__doc__.strip() + return ': '.join([doc] + [str(a) for a in self.args]) + +class MountError(Error): + """ + Mounting filesystem failed + """ + +class UnmountError(Error): + """ + Unmounting filesystem failed + """ + +class BadMagicError(Error): + """ + Does not look like a Ceph OSD, or incompatible version + """ + +class TruncatedLineError(Error): + """ + Line is truncated + """ + + +class TooManyLinesError(Error): + """ + Too many lines + """ + +class FilesystemTypeError(Error): + """ + Cannot discover filesystem type + """ + + +####### utils + + +def maybe_mkdir(*a, **kw): + # remove any symlink, if it is there.. + if os.path.exists(*a) and stat.S_ISLNK(os.lstat(*a).st_mode): + log.debug('Removing old symlink at %s', *a) + os.unlink(*a) + try: + os.mkdir(*a, **kw) + except OSError, e: + if e.errno == errno.EEXIST: + pass + else: + raise + + +def list_all_partitions(): + """ + Return a list of devices and partitions + """ + ls = {} + with file('/proc/partitions', 'rb') as f: + for line in f.read().split('\n')[2:]: + fields = re.split('\s+', line) + if len(fields) < 5: + continue + name = fields[4] + name = '/dev/' + name + if "dm-" in name: + if "/dev/dm" not in ls: + ls["/dev/dm"] = [] + ls["/dev/dm"].append(name) + if name[-1].isdigit(): + base = name + while base[-1].isdigit(): + base = base[:-1] + ls[base].append(name) + else: + ls[name] = [] + return ls + + +def list_partitions(disk): + """ + Return a list of partitions on the given device + """ + disk = os.path.realpath(disk) + assert not is_partition(disk) + assert disk.startswith('/dev/') + base = disk[5:] + ls = [] + with file('/proc/partitions', 'rb') as f: + for line in f.read().split('\n')[2:]: + fields = re.split('\s+', line) + if len(fields) < 5: + continue + name = fields [4] + if name != base and name.startswith(base): + ls.append('/dev/' + name) + return ls + + +def is_partition(dev): + """ + Check whether a given device is a partition or a full disk. + """ + dev = os.path.realpath(dev) + if not stat.S_ISBLK(os.lstat(dev).st_mode): + raise Error('not a block device', dev) + + # if the device ends in a number, it is a partition (e.g., /dev/sda3) + if dev[-1].isdigit(): + return True + return False + + +def is_mounted(dev): + """ + Check if the given device is mounted. + """ + dev = os.path.realpath(dev) + with file('/proc/mounts') as f: + for line in f.read().split('\n'): + d = line.split(' ')[0] + if os.path.exists(d): + d = os.path.realpath(d) + if dev == d: + return True + return False + + +def is_held(dev): + """ + Check if a device is held by another device (e.g., a dm-crypt mapping) + """ + assert os.path.exists(dev) + dev = os.path.realpath(dev) + base = dev[5:] + disk = base + while disk[-1].isdigit(): + disk = disk[:-1] + directory = '/sys/block/{disk}/{base}/holders'.format(disk=disk, base=base) + if not os.path.exists(directory): + return [] + return os.listdir(directory) + + +def verify_not_in_use(dev): + assert os.path.exists(dev) + if is_partition(dev): + if is_mounted(dev): + raise Error('Device is mounted', dev) + holders = is_held(dev) + if holders: + raise Error('Device is in use by a device-mapper mapping (dm-crypt?)' % dev, ','.join(holders)) + else: + for p in list_partitions(dev): + if is_mounted(p): + raise Error('Device is mounted', p) + holders = is_held(p) + if holders: + raise Error('Device %s is in use by a device-mapper mapping (dm-crypt?)' % p, ','.join(holders)) + + +def must_be_one_line(line): + if line[-1:] != '\n': + raise TruncatedLineError(line) + line = line[:-1] + if '\n' in line: + raise TooManyLinesError(line) + return line + + +def read_one_line(parent, name): + """ + Read a file whose sole contents are a single line. + + Strips the newline. + + :return: Contents of the line, or None if file did not exist. + """ + path = os.path.join(parent, name) + try: + line = file(path, 'rb').read() + except IOError as e: + if e.errno == errno.ENOENT: + return None + else: + raise + + try: + line = must_be_one_line(line) + except (TruncatedLineError, TooManyLinesError) as e: + raise Error('File is corrupt: {path}: {msg}'.format( + path=path, + msg=e, + )) + return line + + +def write_one_line(parent, name, text): + """ + Write a file whose sole contents are a single line. + + Adds a newline. + """ + path = os.path.join(parent, name) + tmp = '{path}.{pid}.tmp'.format(path=path, pid=os.getpid()) + with file(tmp, 'wb') as f: + f.write(text + '\n') + os.fsync(f.fileno()) + os.rename(tmp, path) + + +def check_osd_magic(path): + """ + Check that this path has the Ceph OSD magic. + + :raises: BadMagicError if this does not look like a Ceph OSD data + dir. + """ + magic = read_one_line(path, 'magic') + if magic is None: + # probably not mkfs'ed yet + raise BadMagicError(path) + if magic != CEPH_OSD_ONDISK_MAGIC: + raise BadMagicError(path) + + +def check_osd_id(osd_id): + """ + Ensures osd id is numeric. + """ + if not re.match(r'^[0-9]+$', osd_id): + raise Error('osd id is not numeric') + + +def allocate_osd_id( + cluster, + fsid, + keyring, + ): + log.debug('Allocating OSD id...') + try: + osd_id = _check_output( + args=[ + '/usr/bin/ceph', + '--cluster', cluster, + '--name', 'client.bootstrap-osd', + '--keyring', keyring, + 'osd', 'create', '--concise', + fsid, + ], + ) + except subprocess.CalledProcessError as e: + raise Error('ceph osd create failed', e) + osd_id = must_be_one_line(osd_id) + check_osd_id(osd_id) + return osd_id + + +def get_osd_id(path): + osd_id = read_one_line(path, 'whoami') + if osd_id is not None: + check_osd_id(osd_id) + return osd_id + + +def _check_output(*args, **kwargs): + process = subprocess.Popen( + stdout=subprocess.PIPE, + *args, **kwargs) + out, _ = process.communicate() + ret = process.wait() + if ret: + cmd = kwargs.get("args") + if cmd is None: + cmd = args[0] + raise subprocess.CalledProcessError(ret, cmd, output=out) + return out + + +def get_conf(cluster, variable): + try: + p = subprocess.Popen( + args=[ + '/usr/bin/ceph-conf', + '--cluster={cluster}'.format( + cluster=cluster, + ), + '--name=osd.', + '--lookup', + variable, + ], + stdout=subprocess.PIPE, + close_fds=True, + ) + except OSError as e: + raise Error('error executing ceph-conf', e) + (out, _err) = p.communicate() + ret = p.wait() + if ret == 1: + # config entry not found + return None + elif ret != 0: + raise Error('getting variable from configuration failed') + value = str(out).split('\n', 1)[0] + # don't differentiate between "var=" and no var set + if not value: + return None + return value + + +def get_conf_with_default(cluster, variable): + """ + Get a config value that is known to the C++ code. + + This will fail if called on variables that are not defined in + common config options. + """ + try: + out = _check_output( + args=[ + 'ceph-osd', + '--cluster={cluster}'.format( + cluster=cluster, + ), + '--show-config-value={variable}'.format( + variable=variable, + ), + ], + close_fds=True, + ) + except subprocess.CalledProcessError as e: + raise Error( + 'getting variable from configuration failed', + e, + ) + + value = str(out).split('\n', 1)[0] + return value + + +def get_fsid(cluster): + fsid = get_conf(cluster=cluster, variable='fsid') + if fsid is None: + raise Error('getting cluster uuid from configuration failed') + return fsid + + +def get_or_create_dmcrypt_key( + _uuid, + key_dir, + ): + path = os.path.join(key_dir, _uuid) + + # already have it? + if os.path.exists(path): + return path + + # make a new key + try: + if not os.path.exists(key_dir): + os.makedirs(key_dir) + with file('/dev/urandom', 'rb') as i: + key = i.read(256) + with file(path, 'wb') as f: + f.write(key) + return path + except: + raise Error('unable to read or create dm-crypt key', path) + + +def dmcrypt_map( + rawdev, + keypath, + _uuid, + ): + dev = '/dev/mapper/'+ _uuid + args = [ + 'cryptsetup', + '--key-file', + keypath, + '--key-size', '256', + 'create', + _uuid, + rawdev, + ] + try: + subprocess.check_call(args) + return dev + + except subprocess.CalledProcessError as e: + raise Error('unable to map device', rawdev) + + +def dmcrypt_unmap( + _uuid + ): + args = [ + 'cryptsetup', + 'remove', + _uuid + ] + + try: + subprocess.check_call(args) + + except subprocess.CalledProcessError as e: + raise Error('unable to unmap device', _uuid) + + +def mount( + dev, + fstype, + options, + ): + # pick best-of-breed mount options based on fs type + if options is None: + options = MOUNT_OPTIONS.get(fstype, '') + + # mount + path = tempfile.mkdtemp( + prefix='mnt.', + dir='/var/lib/ceph/tmp', + ) + try: + log.debug('Mounting %s on %s with options %s', dev, path, options) + subprocess.check_call( + args=[ + 'mount', + '-o', options, + '--', + dev, + path, + ], + ) + except subprocess.CalledProcessError as e: + try: + os.rmdir(path) + except (OSError, IOError): + pass + raise MountError(e) + + return path + + +def unmount( + path, + ): + try: + log.debug('Unmounting %s', path) + subprocess.check_call( + args=[ + '/bin/umount', + '--', + path, + ], + ) + except subprocess.CalledProcessError as e: + raise UnmountError(e) + + os.rmdir(path) + + +########################################### + + +def get_free_partition_index(dev): + try: + lines = _check_output( + args=[ + 'parted', + '--machine', + '--', + dev, + 'print', + ], + ) + except subprocess.CalledProcessError as e: + print 'cannot read partition index; assume it isn\'t present\n' + return 1 + + if not lines: + raise Error('parted failed to output anything') + lines = str(lines).splitlines(True) + + if lines[0] not in ['CHS;\n', 'CYL;\n', 'BYT;\n']: + raise Error('weird parted units', lines[0]) + del lines[0] + + if not lines[0].startswith('/dev/'): + raise Error('weird parted disk entry', lines[0]) + del lines[0] + + seen = set() + for line in lines: + idx, _ = line.split(':', 1) + idx = int(idx) + seen.add(idx) + + num = 1 + while num in seen: + num += 1 + return num + + +def zap(dev): + """ + Destroy the partition table and content of a given disk. + """ + try: + log.debug('Zapping partition table on %s', dev) + + # try to wipe out any GPT partition table backups. sgdisk + # isn't too thorough. + lba_size = 4096 + size = 33 * lba_size + with file(dev, 'wb') as f: + f.seek(-size, os.SEEK_END) + f.write(size*'\0') + + subprocess.check_call( + args=[ + 'sgdisk', + '--zap-all', + '--clear', + '--mbrtogpt', + '--', + dev, + ], + ) + except subprocess.CalledProcessError as e: + raise Error(e) + + +def prepare_journal_dev( + data, + journal, + journal_size, + journal_uuid, + journal_dm_keypath, + ): + + if is_partition(journal): + log.debug('Journal %s is a partition', journal) + log.warning('OSD will not be hot-swappable if journal is not the same device as the osd data') + return (journal, None, None) + + ptype = JOURNAL_UUID + if journal_dm_keypath: + ptype = DMCRYPT_JOURNAL_UUID + + # it is a whole disk. create a partition! + num = None + if journal == data: + # we're sharing the disk between osd data and journal; + # make journal be partition number 2, so it's pretty; put + # journal at end of free space so partitioning tools don't + # reorder them suddenly + num = 2 + journal_part = '{num}:-{size}M:0'.format( + num=num, + size=journal_size, + ) + else: + # sgdisk has no way for me to say "whatever is the next + # free index number" when setting type guids etc, so we + # need to awkwardly look up the next free number, and then + # fix that in the call -- and hope nobody races with us; + # then again nothing guards the partition table from races + # anyway + num = get_free_partition_index(dev=journal) + journal_part = '{num}:0:+{size}M'.format( + num=num, + size=journal_size, + ) + log.warning('OSD will not be hot-swappable if journal is not the same device as the osd data') + + try: + log.debug('Creating journal partition num %d size %d on %s', num, journal_size, journal) + subprocess.check_call( + args=[ + 'sgdisk', + '--new={part}'.format(part=journal_part), + '--change-name={num}:ceph journal'.format(num=num), + '--partition-guid={num}:{journal_uuid}'.format( + num=num, + journal_uuid=journal_uuid, + ), + '--typecode={num}:{uuid}'.format( + num=num, + uuid=ptype, + ), + '--', + journal, + ], + ) + subprocess.check_call( + args=[ + # also make sure the kernel refreshes the new table + 'partprobe', + journal, + ], + ) + + journal_symlink = '/dev/disk/by-partuuid/{journal_uuid}'.format( + journal_uuid=journal_uuid, + ) + + journal_dmcrypt = None + if journal_dm_keypath: + journal_dmcrypt = journal_symlink + journal_symlink = '/dev/mapper/{uuid}'.format(uuid=journal_uuid) + + log.debug('Journal is GPT partition %s', journal_symlink) + return (journal_symlink, journal_dmcrypt, journal_uuid) + + except subprocess.CalledProcessError as e: + raise Error(e) + + +def prepare_journal_file( + journal, + journal_size): + + if not os.path.exists(journal): + log.debug('Creating journal file %s with size %dM', journal, journal_size) + with file(journal, 'wb') as f: + f.truncate(journal_size * 1048576) + + # FIXME: should we resize an existing journal file? + + log.debug('Journal is file %s', journal) + log.warning('OSD will not be hot-swappable if journal is not the same device as the osd data') + return (journal, None, None) + + +def prepare_journal( + data, + journal, + journal_size, + journal_uuid, + force_file, + force_dev, + journal_dm_keypath, + ): + + if journal is None: + if force_dev: + raise Error('Journal is unspecified; not a block device') + return (None, None, None) + + if not os.path.exists(journal): + if force_dev: + raise Error('Journal does not exist; not a block device', journal) + return prepare_journal_file(journal, journal_size) + + jmode = os.stat(journal).st_mode + if stat.S_ISREG(jmode): + if force_dev: + raise Error('Journal is not a block device', journal) + return prepare_journal_file(journal, journal_size) + + if stat.S_ISBLK(jmode): + if force_file: + raise Error('Journal is not a regular file', journal) + return prepare_journal_dev(data, journal, journal_size, journal_uuid, journal_dm_keypath) + + raise Error('Journal %s is neither a block device nor regular file', journal) + + +def adjust_symlink(target, path): + create = True + if os.path.lexists(path): + try: + mode = os.lstat(path).st_mode + if stat.S_ISREG(mode): + log.debug('Removing old file %s', path) + os.unlink(path) + elif stat.S_ISLNK(mode): + old = os.readlink(path) + if old != target: + log.debug('Removing old symlink %s -> %s', path, old) + os.unlink(path) + else: + create = False + except: + raise Error('unable to remove (or adjust) old file (symlink)', path) + if create: + log.debug('Creating symlink %s -> %s', path, target) + try: + os.symlink(target, path) + except: + raise Error('unable to create symlink %s -> %s' % (path, target)) + +def prepare_dir( + path, + journal, + cluster_uuid, + osd_uuid, + journal_uuid, + journal_dmcrypt = None, + ): + log.debug('Preparing osd data dir %s', path) + + if osd_uuid is None: + osd_uuid = str(uuid.uuid4()) + + if journal is not None: + # we're using an external journal; point to it here + adjust_symlink(journal, os.path.join(path, 'journal')) + + if journal_dmcrypt is not None: + adjust_symlink(journal_dmcrypt, os.path.join(path, 'journal_dmcrypt')) + else: + try: + os.unlink(os.path.join(path, 'journal_dmcrypt')) + except OSError: + pass + + write_one_line(path, 'ceph_fsid', cluster_uuid) + write_one_line(path, 'fsid', osd_uuid) + write_one_line(path, 'magic', CEPH_OSD_ONDISK_MAGIC) + + if journal_uuid is not None: + # i.e., journal is a tagged partition + write_one_line(path, 'journal_uuid', journal_uuid) + +def prepare_dev( + data, + journal, + fstype, + mkfs_args, + mount_options, + cluster_uuid, + osd_uuid, + journal_uuid, + journal_dmcrypt, + osd_dm_keypath, + ): + """ + Prepare a data/journal combination to be used for an OSD. + + The ``magic`` file is written last, so it's presence is a reliable + indicator of the whole sequence having completed. + + WARNING: This will unconditionally overwrite anything given to + it. + """ + + ptype_tobe = TOBE_UUID + ptype_osd = OSD_UUID + if osd_dm_keypath: + ptype_tobe = DMCRYPT_TOBE_UUID + ptype_osd = DMCRYPT_OSD_UUID + + rawdev = None + if is_partition(data): + log.debug('OSD data device %s is a partition', data) + rawdev = data + else: + log.debug('Creating osd partition on %s', data) + try: + subprocess.check_call( + args=[ + 'sgdisk', + '--largest-new=1', + '--change-name=1:ceph data', + '--partition-guid=1:{osd_uuid}'.format( + osd_uuid=osd_uuid, + ), + '--typecode=1:%s' % ptype_tobe, + '--', + data, + ], + ) + subprocess.check_call( + args=[ + # also make sure the kernel refreshes the new table + 'partprobe', + data, + ], + ) + except subprocess.CalledProcessError as e: + raise Error(e) + + rawdev = '{data}1'.format(data=data) + + dev = None + if osd_dm_keypath: + dev = dmcrypt_map(rawdev, osd_dm_keypath, osd_uuid) + else: + dev = rawdev + + try: + args = [ + 'mkfs', + '-t', + fstype, + ] + if mkfs_args is not None: + args.extend(mkfs_args.split()) + if fstype == 'xfs': + args.extend(['-f']) # always force + else: + args.extend(MKFS_ARGS.get(fstype, [])) + args.extend([ + '--', + dev, + ]) + try: + log.debug('Creating %s fs on %s', fstype, dev) + subprocess.check_call(args=args) + except subprocess.CalledProcessError as e: + raise Error(e) + + #remove whitespaces from mount_options + if mount_options is not None: + mount_options = "".join(mount_options.split()) + + path = mount(dev=dev, fstype=fstype, options=mount_options) + + try: + prepare_dir( + path=path, + journal=journal, + cluster_uuid=cluster_uuid, + osd_uuid=osd_uuid, + journal_uuid=journal_uuid, + journal_dmcrypt=journal_dmcrypt, + ) + finally: + unmount(path) + finally: + if rawdev != dev: + dmcrypt_unmap(osd_uuid) + + if not is_partition(data): + try: + subprocess.check_call( + args=[ + 'sgdisk', + '--typecode=1:%s' % ptype_osd, + '--', + data, + ], + ) + subprocess.check_call( + args=[ + # also make sure the kernel refreshes the new table + 'partprobe', + data, + ], + ) + except subprocess.CalledProcessError as e: + raise Error(e) + + +def main_prepare(args): + journal_dm_keypath = None + osd_dm_keypath = None + + try: + if not os.path.exists(args.data): + raise Error('data path does not exist', args.data) + + # in use? + dmode = os.stat(args.data).st_mode + if stat.S_ISBLK(dmode): + verify_not_in_use(args.data) + + if args.journal and os.path.exists(args.journal): + jmode = os.stat(args.journal).st_mode + if stat.S_ISBLK(jmode): + verify_not_in_use(args.journal) + + if args.zap_disk is not None: + if stat.S_ISBLK(dmode) and not is_partition(args.data): + zap(args.data) + else: + raise Error('not full block device; cannot zap', args.data) + + if args.cluster_uuid is None: + args.cluster_uuid = get_fsid(cluster=args.cluster) + if args.cluster_uuid is None: + raise Error( + 'must have fsid in config or pass --cluster--uuid=', + ) + + if args.fs_type is None: + args.fs_type = get_conf( + cluster=args.cluster, + variable='osd_mkfs_type', + ) + if args.fs_type is None: + args.fs_type = get_conf( + cluster=args.cluster, + variable='osd_fs_type', + ) + if args.fs_type is None: + args.fs_type = DEFAULT_FS_TYPE + + mkfs_args = get_conf( + cluster=args.cluster, + variable='osd_mkfs_options_{fstype}'.format( + fstype=args.fs_type, + ), + ) + if mkfs_args is None: + mkfs_args = get_conf( + cluster=args.cluster, + variable='osd_fs_mkfs_options_{fstype}'.format( + fstype=args.fs_type, + ), + ) + + mount_options = get_conf( + cluster=args.cluster, + variable='osd_mount_options_{fstype}'.format( + fstype=args.fs_type, + ), + ) + if mount_options is None: + mount_options = get_conf( + cluster=args.cluster, + variable='osd_fs_mount_options_{fstype}'.format( + fstype=args.fs_type, + ), + ) + + journal_size = get_conf_with_default( + cluster=args.cluster, + variable='osd_journal_size', + ) + journal_size = int(journal_size) + + # colocate journal with data? + if stat.S_ISBLK(dmode) and not is_partition(args.data) and args.journal is None and args.journal_file is None: + log.info('Will colocate journal with data on %s', args.data) + args.journal = args.data + + if args.journal_uuid is None: + args.journal_uuid = str(uuid.uuid4()) + if args.osd_uuid is None: + args.osd_uuid = str(uuid.uuid4()) + + # dm-crypt keys? + if args.dmcrypt: + journal_dm_keypath = get_or_create_dmcrypt_key(args.journal_uuid, args.dmcrypt_key_dir) + osd_dm_keypath = get_or_create_dmcrypt_key(args.osd_uuid, args.dmcrypt_key_dir) + + # prepare journal + (journal_symlink, journal_dmcrypt, journal_uuid) = prepare_journal( + data=args.data, + journal=args.journal, + journal_size=journal_size, + journal_uuid=args.journal_uuid, + force_file=args.journal_file, + force_dev=args.journal_dev, + journal_dm_keypath=journal_dm_keypath, + ) + + # prepare data + if stat.S_ISDIR(dmode): + if args.data_dev: + raise Error('data path is not a block device', args.data) + prepare_dir( + path=args.data, + journal=journal_symlink, + cluster_uuid=args.cluster_uuid, + osd_uuid=args.osd_uuid, + journal_uuid=journal_uuid, + journal_dmcrypt=journal_dmcrypt, + ) + elif stat.S_ISBLK(dmode): + if args.data_dir: + raise Error('data path is not a directory', args.data) + prepare_dev( + data=args.data, + journal=journal_symlink, + fstype=args.fs_type, + mkfs_args=mkfs_args, + mount_options=mount_options, + cluster_uuid=args.cluster_uuid, + osd_uuid=args.osd_uuid, + journal_uuid=journal_uuid, + journal_dmcrypt=journal_dmcrypt, + osd_dm_keypath=osd_dm_keypath, + ) + else: + raise Error('not a dir or block device', args.data) + + except Error as e: + if journal_dm_keypath: + os.unlink(journal_dm_keypath) + if osd_dm_keypath: + os.unlink(osd_dm_keypath) + raise e + + +########################### + + +def mkfs( + path, + cluster, + osd_id, + fsid, + keyring, + ): + monmap = os.path.join(path, 'activate.monmap') + subprocess.check_call( + args=[ + '/usr/bin/ceph', + '--cluster', cluster, + '--name', 'client.bootstrap-osd', + '--keyring', keyring, + 'mon', 'getmap', '-o', monmap, + ], + ) + + subprocess.check_call( + args=[ + '/usr/bin/ceph-osd', + '--cluster', cluster, + '--mkfs', + '--mkkey', + '-i', osd_id, + '--monmap', monmap, + '--osd-data', path, + '--osd-journal', os.path.join(path, 'journal'), + '--osd-uuid', fsid, + '--keyring', os.path.join(path, 'keyring'), + ], + ) + # TODO ceph-osd --mkfs removes the monmap file? + # os.unlink(monmap) + + +def auth_key( + path, + cluster, + osd_id, + keyring, + ): + subprocess.check_call( + args=[ + '/usr/bin/ceph', + '--cluster', cluster, + '--name', 'client.bootstrap-osd', + '--keyring', keyring, + 'auth', 'add', 'osd.{osd_id}'.format(osd_id=osd_id), + '-i', os.path.join(path, 'keyring'), + 'osd', 'allow *', + 'mon', 'allow rwx', + ], + ) + + +def move_mount( + path, + cluster, + osd_id, + ): + log.debug('Moving mount to final location...') + parent = '/var/lib/ceph/osd' + osd_data = os.path.join( + parent, + '{cluster}-{osd_id}'.format(cluster=cluster, osd_id=osd_id), + ) + maybe_mkdir(osd_data) + subprocess.check_call( + args=[ + '/bin/mount', + '--move', + '--', + path, + osd_data, + ], + ) + + +def start_daemon( + cluster, + osd_id, + ): + log.debug('Starting %s osd.%s...', cluster, osd_id) + + path = '/var/lib/ceph/osd/{cluster}-{osd_id}'.format( + cluster=cluster, osd_id=osd_id) + + # upstart? + try: + if os.path.exists(os.path.join(path,'upstart')): + subprocess.check_call( + args=[ + '/sbin/initctl', + # use emit, not start, because start would fail if the + # instance was already running + 'emit', + # since the daemon starting doesn't guarantee much about + # the service being operational anyway, don't bother + # waiting for it + '--no-wait', + '--', + 'ceph-osd', + 'cluster={cluster}'.format(cluster=cluster), + 'id={osd_id}'.format(osd_id=osd_id), + ], + ) + elif os.path.exists(os.path.join(path, 'sysvinit')): + subprocess.check_call( + args=[ + '/usr/sbin/service', + 'ceph', + 'start', + 'osd.{osd_id}'.format(osd_id=osd_id), + ], + ) + else: + raise Error('{cluster} osd.{osd_id} is not tagged with an init system'.format( + cluster=cluster, + osd_id=osd_id, + )) + except subprocess.CalledProcessError as e: + raise Error('ceph osd start failed', e) + +def detect_fstype( + dev, + ): + fstype = _check_output( + args=[ + '/sbin/blkid', + # we don't want stale cached results + '-p', + '-s', 'TYPE', + '-o' 'value', + '--', + dev, + ], + ) + fstype = must_be_one_line(fstype) + return fstype + + +def mount_activate( + dev, + activate_key_template, + init, + ): + + try: + fstype = detect_fstype(dev=dev) + except (subprocess.CalledProcessError, + TruncatedLineError, + TooManyLinesError) as e: + raise FilesystemTypeError( + 'device {dev}'.format(dev=dev), + e, + ) + + # TODO always using mount options from cluster=ceph for + # now; see http://tracker.newdream.net/issues/3253 + mount_options = get_conf( + cluster='ceph', + variable='osd_mount_options_{fstype}'.format( + fstype=fstype, + ), + ) + + if mount_options is None: + mount_options = get_conf( + cluster='ceph', + variable='osd_fs_mount_options_{fstype}'.format( + fstype=fstype, + ), + ) + + #remove whitespaces from mount_options + if mount_options is not None: + mount_options = "".join(mount_options.split()) + + path = mount(dev=dev, fstype=fstype, options=mount_options) + + osd_id = None + cluster = None + try: + (osd_id, cluster) = activate(path, activate_key_template, init) + + # check if the disk is already active, or if something else is already + # mounted there + active = False + other = False + src_dev = os.stat(path).st_dev + try: + dst_dev = os.stat('/var/lib/ceph/osd/{cluster}-{osd_id}'.format( + cluster=cluster, + osd_id=osd_id)).st_dev + if src_dev == dst_dev: + active = True + else: + parent_dev = os.stat('/var/lib/ceph/osd').st_dev + if dst_dev != parent_dev: + other = True + except OSError: + pass + if active: + log.info('%s osd.%s already mounted in position; unmounting ours.' % (cluster, osd_id)) + unmount(path) + elif other: + raise Error('another %s osd.%s already mounted in position (old/different cluster instance?); unmounting ours.' % (cluster, osd_id)) + else: + move_mount( + path=path, + cluster=cluster, + osd_id=osd_id, + ) + return (cluster, osd_id) + + except: + log.error('Failed to activate') + unmount(path) + raise + finally: + # remove out temp dir + os.rmdir(path) + + +def activate_dir( + path, + activate_key_template, + init, + ): + + if not os.path.exists(path): + raise Error( + 'directory %s does not exist' % path + ) + + (osd_id, cluster) = activate(path, activate_key_template, init) + canonical = '/var/lib/ceph/osd/{cluster}-{osd_id}'.format( + cluster=cluster, + osd_id=osd_id) + if path != canonical: + # symlink it from the proper location + create = True + if os.path.lexists(canonical): + old = os.readlink(canonical) + if old != path: + log.debug('Removing old symlink %s -> %s', canonical, old) + try: + os.unlink(canonical) + except: + raise Error('unable to remove old symlink %s', canonical) + else: + create = False + if create: + log.debug('Creating symlink %s -> %s', canonical, path) + try: + os.symlink(path, canonical) + except: + raise Error('unable to create symlink %s -> %s', canonical, path) + + return (cluster, osd_id) + + +def find_cluster_by_uuid(_uuid): + """ + Find a cluster name by searching /etc/ceph/*.conf for a conf file + with the right uuid. + """ + no_fsid = [] + if not os.path.exists('/etc/ceph'): + return None + for conf_file in os.listdir('/etc/ceph'): + if not conf_file.endswith('.conf'): + continue + cluster = conf_file[:-5] + u = get_conf(cluster, 'fsid') + if u is None: + no_fsid.append(cluster) + elif u == _uuid: + return cluster + # be tolerant of /etc/ceph/ceph.conf without an fsid defined. + if len(no_fsid) == 1 and no_fsid[0] == 'ceph': + log.warning('No fsid defined in /etc/ceph/ceph.conf; using anyway') + return 'ceph' + return None + +def activate( + path, + activate_key_template, + init, + ): + + try: + check_osd_magic(path) + + ceph_fsid = read_one_line(path, 'ceph_fsid') + if ceph_fsid is None: + raise Error('No cluster uuid assigned.') + log.debug('Cluster uuid is %s', ceph_fsid) + + cluster = find_cluster_by_uuid(ceph_fsid) + if cluster is None: + raise Error('No cluster conf found in /etc/ceph with fsid %s' % ceph_fsid) + log.debug('Cluster name is %s', cluster) + + fsid = read_one_line(path, 'fsid') + if fsid is None: + raise Error('No OSD uuid assigned.') + log.debug('OSD uuid is %s', fsid) + + keyring = activate_key_template.format(cluster=cluster) + + osd_id = get_osd_id(path) + if osd_id is None: + osd_id = allocate_osd_id( + cluster=cluster, + fsid=fsid, + keyring=keyring, + ) + write_one_line(path, 'whoami', osd_id) + log.debug('OSD id is %s', osd_id) + + if not os.path.exists(os.path.join(path, 'ready')): + log.debug('Initializing OSD...') + # re-running mkfs is safe, so just run until it completes + mkfs( + path=path, + cluster=cluster, + osd_id=osd_id, + fsid=fsid, + keyring=keyring, + ) + + if init is not None: + if init == 'auto': + c = get_conf( + cluster=cluster, + variable='init' + ) + if c is not None: + init = c + else: + (distro, release, codename) = platform.dist() + if distro == 'Ubuntu': + init = 'upstart' + else: + init = 'sysvinit' + + log.debug('Marking with init system %s', init) + with file(os.path.join(path, init), 'w'): + pass + + # remove markers for others, just in case. + for other in INIT_SYSTEMS: + if other != init: + try: + os.unlink(os.path.join(path, other)) + except OSError: + pass + + if not os.path.exists(os.path.join(path, 'active')): + log.debug('Authorizing OSD key...') + auth_key( + path=path, + cluster=cluster, + osd_id=osd_id, + keyring=keyring, + ) + write_one_line(path, 'active', 'ok') + log.debug('%s osd.%s data dir is ready at %s', cluster, osd_id, path) + return (osd_id, cluster) + except: + raise + + + +def main_activate(args): + cluster = None + osd_id = None + + if not os.path.exists(args.path): + raise Error('%s does not exist', args.path) + + mode = os.stat(args.path).st_mode + if stat.S_ISBLK(mode): + (cluster, osd_id) = mount_activate( + dev=args.path, + activate_key_template=args.activate_key_template, + init=args.mark_init, + ) + elif stat.S_ISDIR(mode): + (cluster, osd_id) = activate_dir( + path=args.path, + activate_key_template=args.activate_key_template, + init=args.mark_init, + ) + else: + raise Error('%s is not a directory or block device', args.path) + + start_daemon( + cluster=cluster, + osd_id=osd_id, + ) + + + +########################### + + +def list_dev(dev): + print '%s' % dev + +def main_list(args): + ls = list_all_partitions() + log.debug('partitions are %s' % ls) + + for base, parts in ls.iteritems(): + if parts: + print '%s :' % base + for part in parts: + list_dev(part) + else: + list_dev(base) + + +########################### + + +def parse_args(): + parser = argparse.ArgumentParser( + 'ceph-disk', + ) + parser.add_argument( + '-v', '--verbose', + action='store_true', default=None, + help='be more verbose', + ) + parser.set_defaults( + # we want to hold on to this, for later + prog=parser.prog, + cluster='ceph', + ) + + subparsers = parser.add_subparsers( + title='subcommands', + description='valid subcommands', + help='sub-command help', + ) + + prepare_parser = subparsers.add_parser('prepare', help='Prepare a directory or disk for a Ceph OSD') + prepare_parser.add_argument( + '--cluster', + metavar='NAME', + help='cluster name to assign this disk to', + ) + prepare_parser.add_argument( + '--cluster-uuid', + metavar='UUID', + help='cluster uuid to assign this disk to', + ) + prepare_parser.add_argument( + '--osd-uuid', + metavar='UUID', + help='unique OSD uuid to assign this disk to', + ) + prepare_parser.add_argument( + '--journal-uuid', + metavar='UUID', + help='unique uuid to assign to the journal', + ) + prepare_parser.add_argument( + '--fs-type', + help='file system type to use (e.g. "ext4")', + ) + prepare_parser.add_argument( + '--zap-disk', + action='store_true', default=None, + help='destroy the partition table (and content) of a disk', + ) + prepare_parser.add_argument( + '--data-dir', + action='store_true', default=None, + help='verify that DATA is a dir', + ) + prepare_parser.add_argument( + '--data-dev', + action='store_true', default=None, + help='verify that DATA is a block device', + ) + prepare_parser.add_argument( + '--journal-file', + action='store_true', default=None, + help='verify that JOURNAL is a file', + ) + prepare_parser.add_argument( + '--journal-dev', + action='store_true', default=None, + help='verify that JOURNAL is a block device', + ) + prepare_parser.add_argument( + '--dmcrypt', + action='store_true', default=None, + help='encrypt DATA and/or JOURNAL devices with dm-crypt', + ) + prepare_parser.add_argument( + '--dmcrypt-key-dir', + metavar='KEYDIR', + default='/etc/ceph/dmcrypt-keys', + help='directory where dm-crypt keys are stored', + ) + prepare_parser.add_argument( + 'data', + metavar='DATA', + help='path to OSD data (a disk block device or directory)', + ) + prepare_parser.add_argument( + 'journal', + metavar='JOURNAL', + nargs='?', + help=('path to OSD journal disk block device;' + + ' leave out to store journal in file'), + ) + prepare_parser.set_defaults( + func=main_prepare, + ) + + activate_parser = subparsers.add_parser('activate', help='Activate a Ceph OSD') + activate_parser.add_argument( + '--mount', + action='store_true', default=None, + help='mount a block device [deprecated, ignored]', + ) + activate_parser.add_argument( + '--activate-key', + metavar='PATH', + help='bootstrap-osd keyring path template (%(default)s)', + dest='activate_key_template', + ) + activate_parser.add_argument( + '--mark-init', + metavar='INITSYSTEM', + help='init system to manage this dir', + default='auto', + choices=INIT_SYSTEMS, + ) + activate_parser.add_argument( + 'path', + metavar='PATH', + nargs='?', + help='path to block device or directory', + ) + activate_parser.set_defaults( + activate_key_template='/var/lib/ceph/bootstrap-osd/{cluster}.keyring', + func=main_activate, + ) + + list_parser = subparsers.add_parser('list', help='List disks, partitions, and Ceph OSDs') + list_parser.set_defaults( + func=main_list, + ) + + args = parser.parse_args() + return args + + +def main(): + args = parse_args() + + loglevel = logging.INFO + if args.verbose: + loglevel = logging.DEBUG + + logging.basicConfig( + level=loglevel, + ) + + try: + args.func(args) + + except Error as e: + print >> sys.stderr, '{prog}: {msg}'.format( + prog=args.prog, + msg=e, + ) + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/src/ceph-disk-activate b/src/ceph-disk-activate index 9b5370f5d22..72e89f9af30 100755 --- a/src/ceph-disk-activate +++ b/src/ceph-disk-activate @@ -1,763 +1,3 @@ -#!/usr/bin/python - -import argparse -import errno -import logging -import os -import os.path -import platform -import re -import subprocess -import stat -import sys -import tempfile - -init_systems = [ - 'upstart', - 'sysvinit', - 'systemd', - 'auto', - ] - -log_name = __name__ -if log_name == '__main__': - log_name = os.path.basename(sys.argv[0]) -log = logging.getLogger(log_name) - - -class ActivateError(Exception): - """ - OSD activation error - """ - - def __str__(self): - doc = self.__doc__.strip() - return ': '.join([doc] + [str(a) for a in self.args]) - - -class BadMagicError(ActivateError): - """ - Does not look like a Ceph OSD, or incompatible version - """ - - -class TruncatedLineError(ActivateError): - """ - Line is truncated - """ - - -class TooManyLinesError(ActivateError): - """ - Too many lines - """ - - -class FilesystemTypeError(ActivateError): - """ - Cannot discover filesystem type - """ - - -class MountError(ActivateError): - """ - Mounting filesystem failed - """ - - -class UnmountError(ActivateError): - """ - Unmounting filesystem failed - """ - - -def maybe_mkdir(*a, **kw): - # remove any symlink, if it is there.. - if os.path.exists(*a) and stat.S_ISLNK(os.lstat(*a).st_mode): - log.debug('Removing old symlink at %s', *a) - os.unlink(*a) - try: - os.mkdir(*a, **kw) - except OSError, e: - if e.errno == errno.EEXIST: - pass - else: - raise - - -def must_be_one_line(line): - if line[-1:] != '\n': - raise TruncatedLineError(line) - line = line[:-1] - if '\n' in line: - raise TooManyLinesError(line) - return line - - -def read_one_line(parent, name): - """ - Read a file whose sole contents are a single line. - - Strips the newline. - - :return: Contents of the line, or None if file did not exist. - """ - path = os.path.join(parent, name) - try: - line = file(path, 'rb').read() - except IOError as e: - if e.errno == errno.ENOENT: - return None - else: - raise - - try: - line = must_be_one_line(line) - except (TruncatedLineError, TooManyLinesError) as e: - raise ActivateError('File is corrupt: {path}: {msg}'.format( - path=path, - msg=e, - )) - return line - - -def write_one_line(parent, name, text): - """ - Write a file whose sole contents are a single line. - - Adds a newline. - """ - path = os.path.join(parent, name) - tmp = '{path}.{pid}.tmp'.format(path=path, pid=os.getpid()) - with file(tmp, 'wb') as f: - f.write(text + '\n') - os.fsync(f.fileno()) - os.rename(tmp, path) - - -CEPH_OSD_ONDISK_MAGIC = 'ceph osd volume v026' - - -def check_osd_magic(path): - """ - Check that this path has the Ceph OSD magic. - - :raises: BadMagicError if this does not look like a Ceph OSD data - dir. - """ - magic = read_one_line(path, 'magic') - if magic is None: - # probably not mkfs'ed yet - raise BadMagicError(path) - if magic != CEPH_OSD_ONDISK_MAGIC: - raise BadMagicError(path) - - -def check_osd_id(osd_id): - """ - Ensures osd id is numeric. - """ - if not re.match(r'^[0-9]+$', osd_id): - raise ActivateError('osd id is not numeric') - - -def get_osd_id(path): - osd_id = read_one_line(path, 'whoami') - if osd_id is not None: - check_osd_id(osd_id) - return osd_id - - -# TODO depend on python2.7 -def _check_output(*args, **kwargs): - process = subprocess.Popen( - stdout=subprocess.PIPE, - *args, **kwargs) - out, _ = process.communicate() - ret = process.wait() - if ret: - cmd = kwargs.get("args") - if cmd is None: - cmd = args[0] - raise subprocess.CalledProcessError(ret, cmd, output=out) - return out - - -def allocate_osd_id( - cluster, - fsid, - keyring, - ): - log.debug('Allocating OSD id...') - try: - osd_id = _check_output( - args=[ - '/usr/bin/ceph', - '--cluster', cluster, - '--name', 'client.bootstrap-osd', - '--keyring', keyring, - 'osd', 'create', '--concise', - fsid, - ], - ) - except subprocess.CalledProcessError as e: - raise ActivateError('ceph osd create failed', e) - osd_id = must_be_one_line(osd_id) - check_osd_id(osd_id) - return osd_id - - -def mkfs( - path, - cluster, - osd_id, - fsid, - keyring, - ): - monmap = os.path.join(path, 'activate.monmap') - subprocess.check_call( - args=[ - '/usr/bin/ceph', - '--cluster', cluster, - '--name', 'client.bootstrap-osd', - '--keyring', keyring, - 'mon', 'getmap', '-o', monmap, - ], - ) - - subprocess.check_call( - args=[ - '/usr/bin/ceph-osd', - '--cluster', cluster, - '--mkfs', - '--mkkey', - '-i', osd_id, - '--monmap', monmap, - '--osd-data', path, - '--osd-journal', os.path.join(path, 'journal'), - '--osd-uuid', fsid, - '--keyring', os.path.join(path, 'keyring'), - ], - ) - # TODO ceph-osd --mkfs removes the monmap file? - # os.unlink(monmap) - - -def auth_key( - path, - cluster, - osd_id, - keyring, - ): - subprocess.check_call( - args=[ - '/usr/bin/ceph', - '--cluster', cluster, - '--name', 'client.bootstrap-osd', - '--keyring', keyring, - 'auth', 'add', 'osd.{osd_id}'.format(osd_id=osd_id), - '-i', os.path.join(path, 'keyring'), - 'osd', 'allow *', - 'mon', 'allow rwx', - ], - ) - - -def move_mount( - path, - cluster, - osd_id, - ): - log.debug('Moving mount to final location...') - parent = '/var/lib/ceph/osd' - osd_data = os.path.join( - parent, - '{cluster}-{osd_id}'.format(cluster=cluster, osd_id=osd_id), - ) - maybe_mkdir(osd_data) - subprocess.check_call( - args=[ - '/bin/mount', - '--move', - '--', - path, - osd_data, - ], - ) - - -def start_daemon( - cluster, - osd_id, - ): - log.debug('Starting %s osd.%s...', cluster, osd_id) - - path = '/var/lib/ceph/osd/{cluster}-{osd_id}'.format( - cluster=cluster, osd_id=osd_id) - - # upstart? - try: - if os.path.exists(os.path.join(path,'upstart')): - subprocess.check_call( - args=[ - '/sbin/initctl', - # use emit, not start, because start would fail if the - # instance was already running - 'emit', - # since the daemon starting doesn't guarantee much about - # the service being operational anyway, don't bother - # waiting for it - '--no-wait', - '--', - 'ceph-osd', - 'cluster={cluster}'.format(cluster=cluster), - 'id={osd_id}'.format(osd_id=osd_id), - ], - ) - elif os.path.exists(os.path.join(path, 'sysvinit')): - subprocess.check_call( - args=[ - '/usr/sbin/service', - 'ceph', - 'start', - 'osd.{osd_id}'.format(osd_id=osd_id), - ], - ) - else: - raise ActivateError('{cluster} osd.{osd_id} is not tagged with an init system'.format( - cluster=cluster, - osd_id=osd_id, - )) - except subprocess.CalledProcessError as e: - raise ActivateError('ceph osd start failed', e) - -def detect_fstype( - dev, - ): - fstype = _check_output( - args=[ - '/sbin/blkid', - # we don't want stale cached results - '-p', - '-s', 'TYPE', - '-o' 'value', - '--', - dev, - ], - ) - fstype = must_be_one_line(fstype) - return fstype - - -def get_conf(cluster, variable): - try: - p = subprocess.Popen( - args=[ - '/usr/bin/ceph-conf', - '--cluster={cluster}'.format( - cluster=cluster, - ), - '--name=osd.', - '--lookup', - variable, - ], - stdout=subprocess.PIPE, - close_fds=True, - ) - except OSError as e: - raise ActivateError('error executing ceph-conf', e) - (out, _err) = p.communicate() - ret = p.wait() - if ret == 1: - # config entry not found - return None - elif ret != 0: - raise ActivateError('getting variable from configuration failed') - value = out.split('\n', 1)[0] - # don't differentiate between "var=" and no var set - if not value: - return None - return value - - -MOUNT_OPTIONS = dict( - btrfs='noatime,user_subvol_rm_allowed', - # user_xattr is default ever since linux 2.6.39 / 3.0, but we'll - # delay a moment before removing it fully because we did have some - # issues with ext4 before the xatts-in-leveldb work, and it seemed - # that user_xattr helped - ext4='noatime,user_xattr', - xfs='noatime', - ) - - -def mount( - dev, - fstype, - options, - ): - # pick best-of-breed mount options based on fs type - if options is None: - options = MOUNT_OPTIONS.get(fstype, '') - - # mount - path = tempfile.mkdtemp( - prefix='mnt.', - dir='/var/lib/ceph/tmp', - ) - try: - subprocess.check_call( - args=[ - '/bin/mount', - '-o', options, - '--', - dev, - path, - ], - ) - except subprocess.CalledProcessError as e: - try: - os.rmdir(path) - except (OSError, IOError): - pass - raise MountError(e) - - return path - - -def unmount( - path, - ): - try: - subprocess.check_call( - args=[ - '/bin/umount', - '--', - path, - ], - ) - except subprocess.CalledProcessError as e: - raise UnmountError(e) - -def mount_activate( - dev, - activate_key_template, - init, - ): - - try: - fstype = detect_fstype(dev=dev) - except (subprocess.CalledProcessError, - TruncatedLineError, - TooManyLinesError) as e: - raise FilesystemTypeError( - 'device {dev}'.format(dev=dev), - e, - ) - - # TODO always using mount options from cluster=ceph for - # now; see http://tracker.newdream.net/issues/3253 - mount_options = get_conf( - cluster='ceph', - variable='osd_mount_options_{fstype}'.format( - fstype=fstype, - ), - ) - - if mount_options is None: - mount_options = get_conf( - cluster='ceph', - variable='osd_fs_mount_options_{fstype}'.format( - fstype=fstype, - ), - ) - - #remove whitespaces from mount_options - if mount_options is not None: - mount_options = "".join(mount_options.split()) - - path = mount(dev=dev, fstype=fstype, options=mount_options) - - osd_id = None - cluster = None - try: - (osd_id, cluster) = activate(path, activate_key_template, init) - - # check if the disk is already active, or if something else is already - # mounted there - active = False - other = False - src_dev = os.stat(path).st_dev - try: - dst_dev = os.stat('/var/lib/ceph/osd/{cluster}-{osd_id}'.format( - cluster=cluster, - osd_id=osd_id)).st_dev - if src_dev == dst_dev: - active = True - else: - parent_dev = os.stat('/var/lib/ceph/osd').st_dev - if dst_dev != parent_dev: - other = True - except: - pass - if active: - log.info('%s osd.%s already mounted in position; unmounting ours.' % (cluster, osd_id)) - unmount(path) - elif other: - raise ActivateError('another %s osd.%s already mounted in position (old/different cluster instance?); unmounting ours.' % (cluster, osd_id)) - else: - move_mount( - path=path, - cluster=cluster, - osd_id=osd_id, - ) - return (cluster, osd_id) - - except: - log.error('Failed to activate') - unmount(path) - raise - finally: - # remove out temp dir - os.rmdir(path) - - -def activate_dir( - path, - activate_key_template, - init, - ): - - if not os.path.exists(path): - raise ActivateError( - 'directory %s does not exist' % path - ) - - (osd_id, cluster) = activate(path, activate_key_template, init) - canonical = '/var/lib/ceph/osd/{cluster}-{osd_id}'.format( - cluster=cluster, - osd_id=osd_id) - if path != canonical: - # symlink it from the proper location - create = True - if os.path.lexists(canonical): - old = os.readlink(canonical) - if old != path: - log.debug('Removing old symlink %s -> %s', canonical, old) - try: - os.unlink(canonical) - except: - raise ActivateError('unable to remove old symlink %s', canonical) - else: - create = False - if create: - log.debug('Creating symlink %s -> %s', canonical, path) - try: - os.symlink(path, canonical) - except: - raise ActivateError('unable to create symlink %s -> %s', canonical, path) - - return (cluster, osd_id) - - -def find_cluster_by_uuid(uuid): - """ - Find a cluster name by searching /etc/ceph/*.conf for a conf file - with the right uuid. - """ - no_fsid = [] - if not os.path.exists('/etc/ceph'): - return None - for file in os.listdir('/etc/ceph'): - if not file.endswith('.conf'): - continue - cluster = file[:-5] - u = get_conf(cluster, 'fsid') - if u is None: - no_fsid.append(cluster) - elif u == uuid: - return cluster - # be tolerant of /etc/ceph/ceph.conf without an fsid defined. - if len(no_fsid) == 1 and no_fsid[0] == 'ceph': - log.warning('No fsid defined in /etc/ceph/ceph.conf; using anyway') - return 'ceph' - return None - -def activate( - path, - activate_key_template, - init, - ): - - try: - check_osd_magic(path) - - ceph_fsid = read_one_line(path, 'ceph_fsid') - if ceph_fsid is None: - raise ActivateError('No cluster uuid assigned.') - log.debug('Cluster uuid is %s', ceph_fsid) - - cluster = find_cluster_by_uuid(ceph_fsid) - if cluster is None: - raise ActivateError('No cluster conf found in /etc/ceph with fsid %s' % ceph_fsid) - log.debug('Cluster name is %s', cluster) - - fsid = read_one_line(path, 'fsid') - if fsid is None: - raise ActivateError('No OSD uuid assigned.') - log.debug('OSD uuid is %s', fsid) - - keyring = activate_key_template.format(cluster=cluster) - - osd_id = get_osd_id(path) - if osd_id is None: - osd_id = allocate_osd_id( - cluster=cluster, - fsid=fsid, - keyring=keyring, - ) - write_one_line(path, 'whoami', osd_id) - log.debug('OSD id is %s', osd_id) - - if not os.path.exists(os.path.join(path, 'ready')): - log.debug('Initializing OSD...') - # re-running mkfs is safe, so just run until it completes - mkfs( - path=path, - cluster=cluster, - osd_id=osd_id, - fsid=fsid, - keyring=keyring, - ) - - if init is not None: - if init == 'auto': - c = get_conf( - cluster=cluster, - variable='init' - ) - if c is not None: - init = c - else: - (distro, release, codename) = platform.dist() - if distro == 'Ubuntu': - init = 'upstart' - else: - init = 'sysvinit' - - log.debug('Marking with init system %s', init) - with file(os.path.join(path, init), 'w'): - pass - - # remove markers for others, just in case. - for other in init_systems: - if other != init: - try: - os.unlink(os.path.join(path, other)) - except: - pass - - if not os.path.exists(os.path.join(path, 'active')): - log.debug('Authorizing OSD key...') - auth_key( - path=path, - cluster=cluster, - osd_id=osd_id, - keyring=keyring, - ) - write_one_line(path, 'active', 'ok') - log.debug('%s osd.%s data dir is ready at %s', cluster, osd_id, path) - return (osd_id, cluster) - except: - raise - -def parse_args(): - parser = argparse.ArgumentParser( - description='Activate a Ceph OSD', - ) - parser.add_argument( - '-v', '--verbose', - action='store_true', default=None, - help='be more verbose', - ) - parser.add_argument( - '--mount', - action='store_true', default=None, - help='mount a block device; path must follow', - ) - parser.add_argument( - '--activate-key', - metavar='PATH', - help='bootstrap-osd keyring path template (%(default)s)', - dest='activate_key_template', - ) - parser.add_argument( - 'path', - metavar='PATH', - nargs='?', - help='path to block device or directory', - ) - parser.add_argument( - '--mark-init', - metavar='INITSYSTEM', - help='init system to manage this dir', - default='auto', - choices=init_systems, - ) - parser.set_defaults( - activate_key_template='/var/lib/ceph/bootstrap-osd/{cluster}.keyring', - # we want to hold on to this, for later - prog=parser.prog, - ) - args = parser.parse_args() - return args - - -def main(): - args = parse_args() - - loglevel = logging.INFO - if args.verbose: - loglevel = logging.DEBUG - - logging.basicConfig( - level=loglevel, - ) - - try: - cluster = None - osd_id = None - - if not os.path.exists(args.path): - raise ActivateError('%s does not exist', args.path) - - mode = os.stat(args.path).st_mode - if stat.S_ISBLK(mode): - (cluster, osd_id) = mount_activate( - dev=args.path, - activate_key_template=args.activate_key_template, - init=args.mark_init, - ) - elif stat.S_ISDIR(mode): - (cluster, osd_id) = activate_dir( - path=args.path, - activate_key_template=args.activate_key_template, - init=args.mark_init, - ) - else: - raise ActivateError('%s is not a directory or block device', args.path) - - start_daemon( - cluster=cluster, - osd_id=osd_id, - ) - - except ActivateError as e: - print >>sys.stderr, '{prog}: {msg}'.format( - prog=args.prog, - msg=e, - ) - sys.exit(1) - -if __name__ == '__main__': - main() +#!/bin/sh +dir=`dirname $0` +$dir/ceph-disk activate $* diff --git a/src/ceph-disk-prepare b/src/ceph-disk-prepare index 3d5cbf6fe1f..f9255eb8831 100755 --- a/src/ceph-disk-prepare +++ b/src/ceph-disk-prepare @@ -1,1001 +1,3 @@ -#!/usr/bin/python - -import argparse -import logging -import os -import os.path -import re -import subprocess -import stat -import sys -import tempfile -import uuid - -CEPH_OSD_ONDISK_MAGIC = 'ceph osd volume v026' - -JOURNAL_UUID = '45b0969e-9b03-4f30-b4c6-b4b80ceff106' -DMCRYPT_JOURNAL_UUID = '45b0969e-9b03-4f30-b4c6-5ec00ceff106' -OSD_UUID = '4fbd7e29-9d25-41b8-afd0-062c0ceff05d' -DMCRYPT_OSD_UUID = '4fbd7e29-9d25-41b8-afd0-5ec00ceff05d' -TOBE_UUID = '89c57f98-2fe5-4dc0-89c1-f3ad0ceff2be' -DMCRYPT_TOBE_UUID = '89c57f98-2fe5-4dc0-89c1-5ec00ceff2be' - -DEFAULT_FS_TYPE = 'xfs' - -MOUNT_OPTIONS = dict( - btrfs='noatime,user_subvol_rm_allowed', - ext4='noatime,user_xattr', - xfs='noatime', - ) - -MKFS_ARGS = dict( - btrfs=[ - '-m', 'single', - '-l', '32768', - '-n', '32768', - ], - xfs=[ - # xfs insists on not overwriting previous fs; even if we wipe - # partition table, we often recreate it exactly the same way, - # so we'll see ghosts of filesystems past - '-f', - '-i', 'size=2048', - ], - ) - - -log_name = __name__ -if log_name == '__main__': - log_name = os.path.basename(sys.argv[0]) -log = logging.getLogger(log_name) - - -class PrepareError(Exception): - """ - OSD preparation error - """ - - def __str__(self): - doc = self.__doc__.strip() - return ': '.join([doc] + [str(a) for a in self.args]) - - -class MountError(PrepareError): - """ - Mounting filesystem failed - """ - - -class UnmountError(PrepareError): - """ - Unmounting filesystem failed - """ - -def list_partitions(disk): - """ - Return a list of partitions on the given device - """ - disk = os.path.realpath(disk) - assert not is_partition(disk) - assert disk.startswith('/dev/') - base = disk[5:] - ls = [] - with file('/proc/partitions', 'rb') as f: - for line in f.read().split('\n')[2:]: - fields = re.split('\s+', line) - if len(fields) < 5: - continue - (_, major, minor, blocks, name) = fields - if name != base and name.startswith(base): - ls.append('/dev/' + name) - return ls - -def is_partition(dev): - """ - Check whether a given device is a partition or a full disk. - """ - dev = os.path.realpath(dev) - if not stat.S_ISBLK(os.lstat(dev).st_mode): - raise PrepareError('not a block device', dev) - - # if the device ends in a number, it is a partition (e.g., /dev/sda3) - if dev[-1].isdigit(): - return True - return False - -def is_mounted(dev): - """ - Check if the given device is mounted. - """ - dev = os.path.realpath(dev) - with file('/proc/mounts') as f: - for line in f.read().split('\n'): - d = line.split(' ')[0] - if os.path.exists(d): - d = os.path.realpath(d) - if dev == d: - return True - return False - -def is_held(dev): - """ - Check if a device is held by another device (e.g., a dm-crypt mapping) - """ - assert os.path.exists(dev) - dev = os.path.realpath(dev) - base = dev[5:] - disk = base - while disk[-1].isdigit(): - disk = disk[:-1] - dir = '/sys/block/{disk}/{base}/holders'.format(disk=disk, base=base) - if not os.path.exists(dir): - return [] - return os.listdir(dir) - -def verify_not_in_use(dev): - assert os.path.exists(dev) - if is_partition(dev): - if is_mounted(dev): - raise PrepareError('Device is mounted', dev) - holders = is_held(dev) - if holders: - raise PrepareError('Device is in use by a device-mapper mapping (dm-crypt?)' % dev, ','.join(holders)) - else: - for p in list_partitions(dev): - if is_mounted(p): - raise PrepareError('Device is mounted', p) - holders = is_held(p) - if holders: - raise PrepareError('Device %s is in use by a device-mapper mapping (dm-crypt?)' % p, ','.join(holders)) - -def write_one_line(parent, name, text): - """ - Write a file whose sole contents are a single line. - - Adds a newline. - """ - path = os.path.join(parent, name) - tmp = '{path}.{pid}.tmp'.format(path=path, pid=os.getpid()) - with file(tmp, 'wb') as f: - f.write(text + '\n') - os.fsync(f.fileno()) - os.rename(tmp, path) - - -# TODO depend on python2.7 -def _check_output(*args, **kwargs): - process = subprocess.Popen( - stdout=subprocess.PIPE, - *args, **kwargs) - out, _ = process.communicate() - ret = process.wait() - if ret: - cmd = kwargs.get("args") - if cmd is None: - cmd = args[0] - raise subprocess.CalledProcessError(ret, cmd, output=out) - return out - - -def get_conf(cluster, variable): - try: - p = subprocess.Popen( - args=[ - 'ceph-conf', - '--cluster={cluster}'.format( - cluster=cluster, - ), - '--name=osd.', - '--lookup', - variable, - ], - stdout=subprocess.PIPE, - close_fds=True, - ) - except OSError as e: - raise PrepareError('error executing ceph-conf', e) - (out, _err) = p.communicate() - ret = p.wait() - if ret == 1: - # config entry not found - return None - elif ret != 0: - raise PrepareError('getting variable from configuration failed') - value = out.split('\n', 1)[0] - # don't differentiate between "var=" and no var set - if not value: - return None - return value - - -def get_conf_with_default(cluster, variable): - """ - Get a config value that is known to the C++ code. - - This will fail if called on variables that are not defined in - common config options. - """ - try: - out = _check_output( - args=[ - 'ceph-osd', - '--cluster={cluster}'.format( - cluster=cluster, - ), - '--show-config-value={variable}'.format( - variable=variable, - ), - ], - close_fds=True, - ) - except subprocess.CalledProcessError as e: - raise PrepareError( - 'getting variable from configuration failed', - e, - ) - - value = out.split('\n', 1)[0] - return value - - -def get_fsid(cluster): - fsid = get_conf(cluster=cluster, variable='fsid') - if fsid is None: - raise PrepareError('getting cluster uuid from configuration failed') - return fsid - - -def get_or_create_dmcrypt_key( - uuid, - key_dir, - ): - path = os.path.join(key_dir, uuid) - - # already have it? - if os.path.exists(path): - return path - - # make a new key - try: - if not os.path.exists(key_dir): - os.makedirs(key_dir) - with file('/dev/urandom', 'rb') as i: - key = i.read(256) - with file(path, 'wb') as f: - f.write(key) - return path - except: - raise PrepareError('unable to read or create dm-crypt key', path) - - -def dmcrypt_map( - rawdev, - keypath, - uuid, - ): - dev = '/dev/mapper/'+ uuid - args = [ - 'cryptsetup', - '--key-file', - keypath, - '--key-size', '256', - 'create', - uuid, - rawdev, - ] - try: - subprocess.check_call(args) - return dev - - except subprocess.CalledProcessError as e: - raise PrepareError('unable to map device', rawdev) - - -def dmcrypt_unmap( - uuid - ): - args = [ - 'cryptsetup', - 'remove', - uuid - ] - - try: - subprocess.check_call(args) - - except subprocess.CalledProcessError as e: - raise PrepareError('unable to unmap device', uuid) - - -def mount( - dev, - fstype, - options, - ): - # pick best-of-breed mount options based on fs type - if options is None: - options = MOUNT_OPTIONS.get(fstype, '') - - # mount - path = tempfile.mkdtemp( - prefix='mnt.', - dir='/var/lib/ceph/tmp', - ) - try: - log.debug('Mounting %s on %s with options %s', dev, path, options) - subprocess.check_call( - args=[ - 'mount', - '-o', options, - '--', - dev, - path, - ], - ) - except subprocess.CalledProcessError as e: - try: - os.rmdir(path) - except (OSError, IOError): - pass - raise MountError(e) - - return path - - -def unmount( - path, - ): - try: - log.debug('Unmounting %s', path) - subprocess.check_call( - args=[ - 'umount', - '--', - path, - ], - ) - except subprocess.CalledProcessError as e: - raise UnmountError(e) - - os.rmdir(path) - - -def get_free_partition_index(dev): - try: - lines = _check_output( - args=[ - 'parted', - '--machine', - '--', - dev, - 'print', - ], - ) - except subprocess.CalledProcessError as e: - print 'cannot read partition index; assume it isn\'t present\n' - return 1 - - if not lines: - raise PrepareError('parted failed to output anything') - lines = lines.splitlines(True) - - if lines[0] not in ['CHS;\n', 'CYL;\n', 'BYT;\n']: - raise PrepareError('weird parted units', lines[0]) - del lines[0] - - if not lines[0].startswith('/dev/'): - raise PrepareError('weird parted disk entry', lines[0]) - del lines[0] - - seen = set() - for line in lines: - idx, _ = line.split(':', 1) - idx = int(idx) - seen.add(idx) - - num = 1 - while num in seen: - num += 1 - return num - - -def zap(dev): - """ - Destroy the partition table and content of a given disk. - """ - try: - log.debug('Zapping partition table on %s', dev) - - # try to wipe out any GPT partition table backups. sgdisk - # isn't too thorough. - lba_size = 4096 - size = 33 * lba_size - with file(dev, 'wb') as f: - f.seek(-size, os.SEEK_END) - f.write(size*'\0') - - subprocess.check_call( - args=[ - 'sgdisk', - '--zap-all', - '--clear', - '--mbrtogpt', - '--', - dev, - ], - ) - except subprocess.CalledProcessError as e: - raise PrepareError(e) - - -def prepare_journal_dev( - data, - journal, - journal_size, - journal_uuid, - journal_dm_keypath, - ): - - if is_partition(journal): - log.debug('Journal %s is a partition', journal) - log.warning('OSD will not be hot-swappable if journal is not the same device as the osd data') - return (journal, None, None) - - key = None - ptype = JOURNAL_UUID - if journal_dm_keypath: - ptype = DMCRYPT_JOURNAL_UUID - - # it is a whole disk. create a partition! - num = None - if journal == data: - # we're sharing the disk between osd data and journal; - # make journal be partition number 2, so it's pretty; put - # journal at end of free space so partitioning tools don't - # reorder them suddenly - num = 2 - journal_part = '{num}:-{size}M:0'.format( - num=num, - size=journal_size, - ) - else: - # sgdisk has no way for me to say "whatever is the next - # free index number" when setting type guids etc, so we - # need to awkwardly look up the next free number, and then - # fix that in the call -- and hope nobody races with us; - # then again nothing guards the partition table from races - # anyway - num = get_free_partition_index(dev=journal) - journal_part = '{num}:0:+{size}M'.format( - num=num, - size=journal_size, - ) - log.warning('OSD will not be hot-swappable if journal is not the same device as the osd data') - - try: - log.debug('Creating journal partition num %d size %d on %s', num, journal_size, journal) - subprocess.check_call( - args=[ - 'sgdisk', - '--new={part}'.format(part=journal_part), - '--change-name={num}:ceph journal'.format(num=num), - '--partition-guid={num}:{journal_uuid}'.format( - num=num, - journal_uuid=journal_uuid, - ), - '--typecode={num}:{uuid}'.format( - num=num, - uuid=ptype, - ), - '--', - journal, - ], - ) - subprocess.check_call( - args=[ - # also make sure the kernel refreshes the new table - 'partprobe', - journal, - ], - ) - - journal_symlink = '/dev/disk/by-partuuid/{journal_uuid}'.format( - journal_uuid=journal_uuid, - ) - - journal_dmcrypt = None - if journal_dm_keypath: - journal_dmcrypt = journal_symlink - journal_symlink = '/dev/mapper/{uuid}'.format(uuid=journal_uuid) - - log.debug('Journal is GPT partition %s', journal_symlink) - return (journal_symlink, journal_dmcrypt, journal_uuid) - - except subprocess.CalledProcessError as e: - raise PrepareError(e) - - -def prepare_journal_file( - journal, - journal_size): - - if not os.path.exists(journal): - log.debug('Creating journal file %s with size %dM', journal, journal_size) - with file(journal, 'wb') as f: - f.truncate(journal_size * 1048576) - - # FIXME: should we resize an existing journal file? - - log.debug('Journal is file %s', journal) - log.warning('OSD will not be hot-swappable if journal is not the same device as the osd data') - return (journal, None, None) - - -def prepare_journal( - data, - journal, - journal_size, - journal_uuid, - force_file, - force_dev, - journal_dm_keypath, - ): - - if journal is None: - if force_dev: - raise PrepareError('Journal is unspecified; not a block device') - return (None, None, None) - - if not os.path.exists(journal): - if force_dev: - raise PrepareError('Journal does not exist; not a block device', journal) - return prepare_journal_file(journal, journal_size) - - jmode = os.stat(journal).st_mode - if stat.S_ISREG(jmode): - if force_dev: - raise PrepareError('Journal is not a block device', journal) - return prepare_journal_file(journal, journal_size) - - if stat.S_ISBLK(jmode): - if force_file: - raise PrepareError('Journal is not a regular file', journal) - return prepare_journal_dev(data, journal, journal_size, journal_uuid, journal_dm_keypath) - - raise PrepareError('Journal %s is neither a block device nor regular file', journal) - - -def adjust_symlink(target, path): - create = True - if os.path.lexists(path): - try: - mode = os.path.lstat(canonical).st_mode - if stat.S_ISREG(mode): - log.debug('Removing old file %s', canonical) - os.unlink(canonical) - elif stat.S_ISLNK(mode): - old = os.readlink(canonical) - if old != journal: - log.debug('Removing old symlink %s -> %s', canonical, old) - os.unlink(canonical) - else: - create = False - except: - raise PrepareError('unable to remove (or adjust) old file (symlink)', canonical) - if create: - log.debug('Creating symlink %s -> %s', path, target) - try: - os.symlink(target, path) - except: - raise PrepareError('unable to create symlink %s -> %s' % (path, target)) - -def prepare_dir( - path, - journal, - cluster_uuid, - osd_uuid, - journal_uuid, - journal_dmcrypt = None, - ): - log.debug('Preparing osd data dir %s', path) - - if osd_uuid is None: - osd_uuid = str(uuid.uuid4()) - - if journal is not None: - # we're using an external journal; point to it here - adjust_symlink(journal, os.path.join(path, 'journal')) - - if journal_dmcrypt is not None: - adjust_symlink(journal_dmcrypt, os.path.join(path, 'journal_dmcrypt')) - else: - try: - os.unlink(os.path.join(path, 'journal_dmcrypt')) - except: - pass - - write_one_line(path, 'ceph_fsid', cluster_uuid) - write_one_line(path, 'fsid', osd_uuid) - write_one_line(path, 'magic', CEPH_OSD_ONDISK_MAGIC) - - if journal_uuid is not None: - # i.e., journal is a tagged partition - write_one_line(path, 'journal_uuid', journal_uuid) - -def prepare_dev( - data, - journal, - fstype, - mkfs_args, - mount_options, - cluster_uuid, - osd_uuid, - journal_uuid, - journal_dmcrypt, - osd_dm_keypath, - ): - """ - Prepare a data/journal combination to be used for an OSD. - - The ``magic`` file is written last, so it's presence is a reliable - indicator of the whole sequence having completed. - - WARNING: This will unconditionally overwrite anything given to - it. - """ - - ptype_tobe = TOBE_UUID - ptype_osd = OSD_UUID - if osd_dm_keypath: - ptype_tobe = DMCRYPT_TOBE_UUID - ptype_osd = DMCRYPT_OSD_UUID - - rawdev = None - if is_partition(data): - log.debug('OSD data device %s is a partition', data) - rawdev = data - else: - log.debug('Creating osd partition on %s', data) - try: - subprocess.check_call( - args=[ - 'sgdisk', - '--largest-new=1', - '--change-name=1:ceph data', - '--partition-guid=1:{osd_uuid}'.format( - osd_uuid=osd_uuid, - ), - '--typecode=1:%s' % ptype_tobe, - '--', - data, - ], - ) - subprocess.check_call( - args=[ - # also make sure the kernel refreshes the new table - 'partprobe', - data, - ], - ) - except subprocess.CalledProcessError as e: - raise PrepareError(e) - - rawdev = '{data}1'.format(data=data) - - dev = None - if osd_dm_keypath: - dev = dmcrypt_map(rawdev, osd_dm_keypath, osd_uuid) - else: - dev = rawdev - - try: - args = [ - 'mkfs', - '-t', - fstype, - ] - if mkfs_args is not None: - args.extend(mkfs_args.split()) - if fstype == 'xfs': - args.extend(['-f']) # always force - else: - args.extend(MKFS_ARGS.get(fstype, [])) - args.extend([ - '--', - dev, - ]) - try: - log.debug('Creating %s fs on %s', fstype, dev) - subprocess.check_call(args=args) - except subprocess.CalledProcessError as e: - raise PrepareError(e) - - #remove whitespaces from mount_options - if mount_options is not None: - mount_options = "".join(mount_options.split()) - - path = mount(dev=dev, fstype=fstype, options=mount_options) - - try: - prepare_dir( - path=path, - journal=journal, - cluster_uuid=cluster_uuid, - osd_uuid=osd_uuid, - journal_uuid=journal_uuid, - journal_dmcrypt=journal_dmcrypt, - ) - finally: - unmount(path) - finally: - if rawdev != dev: - dmcrypt_unmap(osd_uuid) - - if not is_partition(data): - try: - subprocess.check_call( - args=[ - 'sgdisk', - '--typecode=1:%s' % ptype_osd, - '--', - data, - ], - ) - subprocess.check_call( - args=[ - # also make sure the kernel refreshes the new table - 'partprobe', - data, - ], - ) - except subprocess.CalledProcessError as e: - raise PrepareError(e) - - -def parse_args(): - parser = argparse.ArgumentParser( - description='Prepare a directory for a Ceph OSD', - ) - parser.add_argument( - '-v', '--verbose', - action='store_true', default=None, - help='be more verbose', - ) - parser.add_argument( - '--cluster', - metavar='NAME', - help='cluster name to assign this disk to', - ) - parser.add_argument( - '--cluster-uuid', - metavar='UUID', - help='cluster uuid to assign this disk to', - ) - parser.add_argument( - '--osd-uuid', - metavar='UUID', - help='unique OSD uuid to assign this disk to', - ) - parser.add_argument( - '--journal-uuid', - metavar='UUID', - help='unique uuid to assign to the journal', - ) - parser.add_argument( - '--fs-type', - help='file system type to use (e.g. "ext4")', - ) - parser.add_argument( - '--zap-disk', - action='store_true', default=None, - help='destroy the partition table (and content) of a disk', - ) - parser.add_argument( - '--data-dir', - action='store_true', default=None, - help='verify that DATA is a dir', - ) - parser.add_argument( - '--data-dev', - action='store_true', default=None, - help='verify that DATA is a block device', - ) - parser.add_argument( - '--journal-file', - action='store_true', default=None, - help='verify that JOURNAL is a file', - ) - parser.add_argument( - '--journal-dev', - action='store_true', default=None, - help='verify that JOURNAL is a block device', - ) - parser.add_argument( - '--dmcrypt', - action='store_true', default=None, - help='encrypt DATA and/or JOURNAL devices with dm-crypt', - ) - parser.add_argument( - '--dmcrypt-key-dir', - metavar='KEYDIR', - default='/etc/ceph/dmcrypt-keys', - help='directory where dm-crypt keys are stored', - ) - parser.add_argument( - 'data', - metavar='DATA', - help='path to OSD data (a disk block device or directory)', - ) - parser.add_argument( - 'journal', - metavar='JOURNAL', - nargs='?', - help=('path to OSD journal disk block device;' - + ' leave out to store journal in file'), - ) - parser.set_defaults( - # we want to hold on to this, for later - prog=parser.prog, - cluster='ceph', - ) - args = parser.parse_args() - return args - - -def main(): - args = parse_args() - - loglevel = logging.INFO - if args.verbose: - loglevel = logging.DEBUG - - logging.basicConfig( - level=loglevel, - ) - - journal_dm_keypath = None - osd_dm_keypath = None - - try: - if not os.path.exists(args.data): - raise PrepareError('data path does not exist', args.data) - - # in use? - dmode = os.stat(args.data).st_mode - if stat.S_ISBLK(dmode): - verify_not_in_use(args.data) - - if args.journal and os.path.exists(args.journal): - jmode = os.stat(args.journal).st_mode - if stat.S_ISBLK(jmode): - verify_not_in_use(args.journal) - - if args.zap_disk is not None: - if stat.S_ISBLK(dmode) and not is_partition(args.data): - zap(args.data) - else: - raise PrepareError('not full block device; cannot zap', args.data) - - if args.cluster_uuid is None: - args.cluster_uuid = get_fsid(cluster=args.cluster) - if args.cluster_uuid is None: - raise PrepareError( - 'must have fsid in config or pass --cluster--uuid=', - ) - - if args.fs_type is None: - args.fs_type = get_conf( - cluster=args.cluster, - variable='osd_mkfs_type', - ) - if args.fs_type is None: - args.fs_type = get_conf( - cluster=args.cluster, - variable='osd_fs_type', - ) - if args.fs_type is None: - args.fs_type = DEFAULT_FS_TYPE - - mkfs_args = get_conf( - cluster=args.cluster, - variable='osd_mkfs_options_{fstype}'.format( - fstype=args.fs_type, - ), - ) - if mkfs_args is None: - mkfs_args = get_conf( - cluster=args.cluster, - variable='osd_fs_mkfs_options_{fstype}'.format( - fstype=args.fs_type, - ), - ) - - mount_options = get_conf( - cluster=args.cluster, - variable='osd_mount_options_{fstype}'.format( - fstype=args.fs_type, - ), - ) - if mount_options is None: - mount_options = get_conf( - cluster=args.cluster, - variable='osd_fs_mount_options_{fstype}'.format( - fstype=args.fs_type, - ), - ) - - journal_size = get_conf_with_default( - cluster=args.cluster, - variable='osd_journal_size', - ) - journal_size = int(journal_size) - - # colocate journal with data? - if stat.S_ISBLK(dmode) and not is_partition(args.data) and args.journal is None and args.journal_file is None: - log.info('Will colocate journal with data on %s', args.data) - args.journal = args.data - - if args.journal_uuid is None: - args.journal_uuid = str(uuid.uuid4()) - if args.osd_uuid is None: - args.osd_uuid = str(uuid.uuid4()) - - # dm-crypt keys? - if args.dmcrypt: - journal_dm_keypath = get_or_create_dmcrypt_key(args.journal_uuid, args.dmcrypt_key_dir) - osd_dm_keypath = get_or_create_dmcrypt_key(args.osd_uuid, args.dmcrypt_key_dir) - - # prepare journal - (journal_symlink, journal_dmcrypt, journal_uuid) = prepare_journal( - data=args.data, - journal=args.journal, - journal_size=journal_size, - journal_uuid=args.journal_uuid, - force_file=args.journal_file, - force_dev=args.journal_dev, - journal_dm_keypath=journal_dm_keypath, - ) - - # prepare data - if stat.S_ISDIR(dmode): - if args.data_dev: - raise PrepareError('data path is not a block device', args.data) - prepare_dir( - path=args.data, - journal=journal_symlink, - cluster_uuid=args.cluster_uuid, - osd_uuid=args.osd_uuid, - journal_uuid=journal_uuid, - journal_dmcrypt=journal_dmcrypt, - ) - elif stat.S_ISBLK(dmode): - if args.data_dir: - raise PrepareError('data path is not a directory', args.data) - prepare_dev( - data=args.data, - journal=journal_symlink, - fstype=args.fs_type, - mkfs_args=mkfs_args, - mount_options=mount_options, - cluster_uuid=args.cluster_uuid, - osd_uuid=args.osd_uuid, - journal_uuid=journal_uuid, - journal_dmcrypt=journal_dmcrypt, - osd_dm_keypath=osd_dm_keypath, - ) - else: - raise PrepareError('not a dir or block device', args.data) - - except PrepareError as e: - if journal_dm_keypath: - os.unlink(journal_dm_keypath) - if osd_dm_keypath: - os.unlink(osd_dm_keypath) - print >>sys.stderr, '{prog}: {msg}'.format( - prog=args.prog, - msg=e, - ) - sys.exit(1) - -if __name__ == '__main__': - main() +#!/bin/sh +dir=`dirname $0` +$dir/ceph-disk prepare $* |