diff options
| author | Jenkins <jenkins@review.openstack.org> | 2013-04-04 23:08:25 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2013-04-04 23:08:25 +0000 |
| commit | 2d97609a52bef2f437c15ff72dced29663c570ff (patch) | |
| tree | 76d77ecbd0d7da9bfa9fdedeba73f08f63f66e38 /bin | |
| parent | fab61c8275893c7ab7279336b9d64d1154264059 (diff) | |
| parent | 2b3d1719073fa58b651ca82f64a366e3f737d71a (diff) | |
| download | python-swiftclient-2d97609a52bef2f437c15ff72dced29663c570ff.tar.gz | |
Merge "Static large object support."1.4.0
Diffstat (limited to 'bin')
| -rwxr-xr-x | bin/swift | 139 |
1 files changed, 105 insertions, 34 deletions
@@ -30,6 +30,11 @@ from time import sleep, time from traceback import format_exception from urllib import quote, unquote +try: + import simplejson as json +except ImportError: + import json + from swiftclient import Connection, ClientException, HTTPException, utils from swiftclient.version import version_info @@ -111,6 +116,8 @@ class QueueFunctionThread(Thread): self.args = args self.kwargs = kwargs self.exc_infos = [] + self.results = [] + self.store_results = kwargs.pop('store_results', False) def run(self): while True: @@ -123,7 +130,9 @@ class QueueFunctionThread(Thread): else: try: if not self.abort: - self.func(item, *self.args, **self.kwargs) + res = self.func(item, *self.args, **self.kwargs) + if self.store_results: + self.results.append(res) except Exception: self.exc_infos.append(exc_info()) finally: @@ -171,19 +180,23 @@ def st_delete(parser, args, print_queue, error_queue): def _delete_object((container, obj), conn): try: old_manifest = None + query_string = None if not options.leave_segments: try: - old_manifest = conn.head_object(container, obj).get( - 'x-object-manifest') + headers = conn.head_object(container, obj) + old_manifest = headers.get('x-object-manifest') + if utils.config_true_value( + headers.get('x-static-large-object')): + query_string = 'multipart-manifest=delete' except ClientException, err: if err.http_status != 404: raise - conn.delete_object(container, obj) + conn.delete_object(container, obj, query_string=query_string) if old_manifest: segment_queue = Queue(10000) scontainer, sprefix = old_manifest.split('/', 1) scontainer = unquote(scontainer) - sprefix = unquote(sprefix) + sprefix = unquote(sprefix).rstrip('/') + '/' for delobj in conn.get_container(scontainer, prefix=sprefix)[1]: segment_queue.put((scontainer, delobj['name'])) @@ -793,7 +806,10 @@ def st_upload(parser, args, print_queue, error_queue): default=[], help='Set request headers with the syntax header:value. ' ' This option may be repeated. Example -H content-type:text/plain ' '-H "Content-Length: 4000"') - + parser.add_option('', '--use-slo', action='store_true', default=False, + help='When used in conjuction with --segment-size will ' + 'create a Static Large Object instead of the default ' + 'Dynamic Large Object.') (options, args) = parse_args(parser, args) args = args[1:] if len(args) < 2: @@ -811,14 +827,17 @@ def st_upload(parser, args, print_queue, error_queue): seg_container = args[0] +'_segments' if options.segment_container: seg_container = options.segment_container - conn.put_object(job.get('container', seg_container), + etag = conn.put_object(job.get('container', seg_container), job['obj'], fp, content_length=job['segment_size']) + job['segment_location'] = '/%s/%s' % (seg_container, job['obj']) + job['segment_etag'] = etag if options.verbose and 'log_line' in job: if conn.attempts > 1: print_queue.put('%s [after %d attempts]' % (job['log_line'], conn.attempts)) else: print_queue.put(job['log_line']) + return job def _object_job(job, conn): path = job['path'] @@ -855,6 +874,8 @@ def st_upload(parser, args, print_queue, error_queue): # manifest object and need to delete the old segments # ourselves. old_manifest = None + old_slo_manifest_paths = [] + new_slo_manifest_paths = set() if options.changed or not options.leave_segments: try: headers = conn.head_object(container, obj) @@ -865,6 +886,16 @@ def st_upload(parser, args, print_queue, error_queue): return if not options.leave_segments: old_manifest = headers.get('x-object-manifest') + if utils.config_true_value( + headers.get('x-static-large-object')): + headers, manifest_data = conn.get_object( + container, obj, + query_string='multipart-manifest=get') + for old_seg in json.loads(manifest_data): + seg_path = old_seg['name'].lstrip('/') + if isinstance(seg_path, unicode): + seg_path = seg_path.encode('utf-8') + old_slo_manifest_paths.append(seg_path) except ClientException, err: if err.http_status != 404: raise @@ -879,9 +910,10 @@ def st_upload(parser, args, print_queue, error_queue): seg_container = options.segment_container full_size = getsize(path) segment_queue = Queue(10000) - segment_threads = [QueueFunctionThread(segment_queue, - _segment_job, create_connection()) for _junk in - xrange(options.segment_threads)] + segment_threads = [ + QueueFunctionThread(segment_queue, + _segment_job, create_connection(), store_results=True) + for _junk in xrange(options.segment_threads)] for thread in segment_threads: thread.start() segment = 0 @@ -890,13 +922,20 @@ def st_upload(parser, args, print_queue, error_queue): segment_size = int(options.segment_size) if segment_start + segment_size > full_size: segment_size = full_size - segment_start - segment_queue.put({'path': path, - 'obj': '%s/%s/%s/%s/%08d' % (obj, - put_headers['x-object-meta-mtime'], full_size, - options.segment_size, segment), - 'segment_start': segment_start, - 'segment_size': segment_size, - 'log_line': '%s segment %s' % (obj, segment)}) + if options.use_slo: + segment_name = '%s/slo/%s/%s/%s/%08d' % ( + obj, put_headers['x-object-meta-mtime'], + full_size, options.segment_size, segment) + else: + segment_name = '%s/%s/%s/%s/%08d' % ( + obj, put_headers['x-object-meta-mtime'], + full_size, options.segment_size, segment) + segment_queue.put( + {'path': path, 'obj': segment_name, + 'segment_start': segment_start, + 'segment_size': segment_size, + 'segment_index': segment, + 'log_line': '%s segment %s' % (obj, segment)}) segment += 1 segment_start += segment_size while not segment_queue.empty(): @@ -909,27 +948,59 @@ def st_upload(parser, args, print_queue, error_queue): raise ClientException('Aborting manifest creation ' 'because not all segments could be uploaded. %s/%s' % (container, obj)) - new_object_manifest = '%s/%s/%s/%s/%s' % ( - quote(seg_container), quote(obj), - put_headers['x-object-meta-mtime'], full_size, - options.segment_size) - if old_manifest == new_object_manifest: - old_manifest = None - put_headers['x-object-manifest'] = new_object_manifest - conn.put_object(container, obj, '', content_length=0, - headers=put_headers) + if options.use_slo: + slo_segments = [] + for thread in segment_threads: + slo_segments += thread.results + slo_segments.sort(key=lambda d: d['segment_index']) + for seg in slo_segments: + seg_loc = seg['segment_location'].lstrip('/') + if isinstance(seg_loc, unicode): + seg_loc = seg_loc.encode('utf-8') + new_slo_manifest_paths.add(seg_loc) + + manifest_data = json.dumps([ + {'path': d['segment_location'], + 'etag': d['segment_etag'], + 'size_bytes': d['segment_size']} + for d in slo_segments]) + + put_headers['x-static-large-object'] = 'true' + conn.put_object(container, obj, manifest_data, + headers=put_headers, + query_string='multipart-manifest=put') + else: + new_object_manifest = '%s/%s/%s/%s/%s/' % ( + quote(seg_container), quote(obj), + put_headers['x-object-meta-mtime'], full_size, + options.segment_size) + if old_manifest and old_manifest.rstrip('/') == \ + new_object_manifest.rstrip('/'): + old_manifest = None + put_headers['x-object-manifest'] = new_object_manifest + conn.put_object(container, obj, '', content_length=0, + headers=put_headers) else: conn.put_object(container, obj, open(path, 'rb'), content_length=getsize(path), headers=put_headers) - if old_manifest: + if old_manifest or old_slo_manifest_paths: segment_queue = Queue(10000) - scontainer, sprefix = old_manifest.split('/', 1) - scontainer = unquote(scontainer) - sprefix = unquote(sprefix) - for delobj in conn.get_container(scontainer, - prefix=sprefix)[1]: - segment_queue.put({'delete': True, - 'container': scontainer, 'obj': delobj['name']}) + if old_manifest: + scontainer, sprefix = old_manifest.split('/', 1) + scontainer = unquote(scontainer) + sprefix = unquote(sprefix).rstrip('/') + '/' + for delobj in conn.get_container(scontainer, + prefix=sprefix)[1]: + segment_queue.put({'delete': True, + 'container': scontainer, 'obj': delobj['name']}) + if old_slo_manifest_paths: + for seg_to_delete in old_slo_manifest_paths: + if seg_to_delete in new_slo_manifest_paths: + continue + scont, sobj = \ + seg_to_delete.split('/', 1) + segment_queue.put({'delete': True, + 'container': scont, 'obj': sobj}) if not segment_queue.empty(): segment_threads = [QueueFunctionThread(segment_queue, _segment_job, create_connection()) for _junk in |
