summaryrefslogtreecommitdiff
path: root/bin
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-04-04 23:08:25 +0000
committerGerrit Code Review <review@openstack.org>2013-04-04 23:08:25 +0000
commit2d97609a52bef2f437c15ff72dced29663c570ff (patch)
tree76d77ecbd0d7da9bfa9fdedeba73f08f63f66e38 /bin
parentfab61c8275893c7ab7279336b9d64d1154264059 (diff)
parent2b3d1719073fa58b651ca82f64a366e3f737d71a (diff)
downloadpython-swiftclient-2d97609a52bef2f437c15ff72dced29663c570ff.tar.gz
Merge "Static large object support."1.4.0
Diffstat (limited to 'bin')
-rwxr-xr-xbin/swift139
1 files changed, 105 insertions, 34 deletions
diff --git a/bin/swift b/bin/swift
index 6746c25..c94be8c 100755
--- a/bin/swift
+++ b/bin/swift
@@ -30,6 +30,11 @@ from time import sleep, time
from traceback import format_exception
from urllib import quote, unquote
+try:
+ import simplejson as json
+except ImportError:
+ import json
+
from swiftclient import Connection, ClientException, HTTPException, utils
from swiftclient.version import version_info
@@ -111,6 +116,8 @@ class QueueFunctionThread(Thread):
self.args = args
self.kwargs = kwargs
self.exc_infos = []
+ self.results = []
+ self.store_results = kwargs.pop('store_results', False)
def run(self):
while True:
@@ -123,7 +130,9 @@ class QueueFunctionThread(Thread):
else:
try:
if not self.abort:
- self.func(item, *self.args, **self.kwargs)
+ res = self.func(item, *self.args, **self.kwargs)
+ if self.store_results:
+ self.results.append(res)
except Exception:
self.exc_infos.append(exc_info())
finally:
@@ -171,19 +180,23 @@ def st_delete(parser, args, print_queue, error_queue):
def _delete_object((container, obj), conn):
try:
old_manifest = None
+ query_string = None
if not options.leave_segments:
try:
- old_manifest = conn.head_object(container, obj).get(
- 'x-object-manifest')
+ headers = conn.head_object(container, obj)
+ old_manifest = headers.get('x-object-manifest')
+ if utils.config_true_value(
+ headers.get('x-static-large-object')):
+ query_string = 'multipart-manifest=delete'
except ClientException, err:
if err.http_status != 404:
raise
- conn.delete_object(container, obj)
+ conn.delete_object(container, obj, query_string=query_string)
if old_manifest:
segment_queue = Queue(10000)
scontainer, sprefix = old_manifest.split('/', 1)
scontainer = unquote(scontainer)
- sprefix = unquote(sprefix)
+ sprefix = unquote(sprefix).rstrip('/') + '/'
for delobj in conn.get_container(scontainer,
prefix=sprefix)[1]:
segment_queue.put((scontainer, delobj['name']))
@@ -793,7 +806,10 @@ def st_upload(parser, args, print_queue, error_queue):
default=[], help='Set request headers with the syntax header:value. '
' This option may be repeated. Example -H content-type:text/plain '
'-H "Content-Length: 4000"')
-
+ parser.add_option('', '--use-slo', action='store_true', default=False,
+ help='When used in conjuction with --segment-size will '
+ 'create a Static Large Object instead of the default '
+ 'Dynamic Large Object.')
(options, args) = parse_args(parser, args)
args = args[1:]
if len(args) < 2:
@@ -811,14 +827,17 @@ def st_upload(parser, args, print_queue, error_queue):
seg_container = args[0] +'_segments'
if options.segment_container:
seg_container = options.segment_container
- conn.put_object(job.get('container', seg_container),
+ etag = conn.put_object(job.get('container', seg_container),
job['obj'], fp, content_length=job['segment_size'])
+ job['segment_location'] = '/%s/%s' % (seg_container, job['obj'])
+ job['segment_etag'] = etag
if options.verbose and 'log_line' in job:
if conn.attempts > 1:
print_queue.put('%s [after %d attempts]' %
(job['log_line'], conn.attempts))
else:
print_queue.put(job['log_line'])
+ return job
def _object_job(job, conn):
path = job['path']
@@ -855,6 +874,8 @@ def st_upload(parser, args, print_queue, error_queue):
# manifest object and need to delete the old segments
# ourselves.
old_manifest = None
+ old_slo_manifest_paths = []
+ new_slo_manifest_paths = set()
if options.changed or not options.leave_segments:
try:
headers = conn.head_object(container, obj)
@@ -865,6 +886,16 @@ def st_upload(parser, args, print_queue, error_queue):
return
if not options.leave_segments:
old_manifest = headers.get('x-object-manifest')
+ if utils.config_true_value(
+ headers.get('x-static-large-object')):
+ headers, manifest_data = conn.get_object(
+ container, obj,
+ query_string='multipart-manifest=get')
+ for old_seg in json.loads(manifest_data):
+ seg_path = old_seg['name'].lstrip('/')
+ if isinstance(seg_path, unicode):
+ seg_path = seg_path.encode('utf-8')
+ old_slo_manifest_paths.append(seg_path)
except ClientException, err:
if err.http_status != 404:
raise
@@ -879,9 +910,10 @@ def st_upload(parser, args, print_queue, error_queue):
seg_container = options.segment_container
full_size = getsize(path)
segment_queue = Queue(10000)
- segment_threads = [QueueFunctionThread(segment_queue,
- _segment_job, create_connection()) for _junk in
- xrange(options.segment_threads)]
+ segment_threads = [
+ QueueFunctionThread(segment_queue,
+ _segment_job, create_connection(), store_results=True)
+ for _junk in xrange(options.segment_threads)]
for thread in segment_threads:
thread.start()
segment = 0
@@ -890,13 +922,20 @@ def st_upload(parser, args, print_queue, error_queue):
segment_size = int(options.segment_size)
if segment_start + segment_size > full_size:
segment_size = full_size - segment_start
- segment_queue.put({'path': path,
- 'obj': '%s/%s/%s/%s/%08d' % (obj,
- put_headers['x-object-meta-mtime'], full_size,
- options.segment_size, segment),
- 'segment_start': segment_start,
- 'segment_size': segment_size,
- 'log_line': '%s segment %s' % (obj, segment)})
+ if options.use_slo:
+ segment_name = '%s/slo/%s/%s/%s/%08d' % (
+ obj, put_headers['x-object-meta-mtime'],
+ full_size, options.segment_size, segment)
+ else:
+ segment_name = '%s/%s/%s/%s/%08d' % (
+ obj, put_headers['x-object-meta-mtime'],
+ full_size, options.segment_size, segment)
+ segment_queue.put(
+ {'path': path, 'obj': segment_name,
+ 'segment_start': segment_start,
+ 'segment_size': segment_size,
+ 'segment_index': segment,
+ 'log_line': '%s segment %s' % (obj, segment)})
segment += 1
segment_start += segment_size
while not segment_queue.empty():
@@ -909,27 +948,59 @@ def st_upload(parser, args, print_queue, error_queue):
raise ClientException('Aborting manifest creation '
'because not all segments could be uploaded. %s/%s'
% (container, obj))
- new_object_manifest = '%s/%s/%s/%s/%s' % (
- quote(seg_container), quote(obj),
- put_headers['x-object-meta-mtime'], full_size,
- options.segment_size)
- if old_manifest == new_object_manifest:
- old_manifest = None
- put_headers['x-object-manifest'] = new_object_manifest
- conn.put_object(container, obj, '', content_length=0,
- headers=put_headers)
+ if options.use_slo:
+ slo_segments = []
+ for thread in segment_threads:
+ slo_segments += thread.results
+ slo_segments.sort(key=lambda d: d['segment_index'])
+ for seg in slo_segments:
+ seg_loc = seg['segment_location'].lstrip('/')
+ if isinstance(seg_loc, unicode):
+ seg_loc = seg_loc.encode('utf-8')
+ new_slo_manifest_paths.add(seg_loc)
+
+ manifest_data = json.dumps([
+ {'path': d['segment_location'],
+ 'etag': d['segment_etag'],
+ 'size_bytes': d['segment_size']}
+ for d in slo_segments])
+
+ put_headers['x-static-large-object'] = 'true'
+ conn.put_object(container, obj, manifest_data,
+ headers=put_headers,
+ query_string='multipart-manifest=put')
+ else:
+ new_object_manifest = '%s/%s/%s/%s/%s/' % (
+ quote(seg_container), quote(obj),
+ put_headers['x-object-meta-mtime'], full_size,
+ options.segment_size)
+ if old_manifest and old_manifest.rstrip('/') == \
+ new_object_manifest.rstrip('/'):
+ old_manifest = None
+ put_headers['x-object-manifest'] = new_object_manifest
+ conn.put_object(container, obj, '', content_length=0,
+ headers=put_headers)
else:
conn.put_object(container, obj, open(path, 'rb'),
content_length=getsize(path), headers=put_headers)
- if old_manifest:
+ if old_manifest or old_slo_manifest_paths:
segment_queue = Queue(10000)
- scontainer, sprefix = old_manifest.split('/', 1)
- scontainer = unquote(scontainer)
- sprefix = unquote(sprefix)
- for delobj in conn.get_container(scontainer,
- prefix=sprefix)[1]:
- segment_queue.put({'delete': True,
- 'container': scontainer, 'obj': delobj['name']})
+ if old_manifest:
+ scontainer, sprefix = old_manifest.split('/', 1)
+ scontainer = unquote(scontainer)
+ sprefix = unquote(sprefix).rstrip('/') + '/'
+ for delobj in conn.get_container(scontainer,
+ prefix=sprefix)[1]:
+ segment_queue.put({'delete': True,
+ 'container': scontainer, 'obj': delobj['name']})
+ if old_slo_manifest_paths:
+ for seg_to_delete in old_slo_manifest_paths:
+ if seg_to_delete in new_slo_manifest_paths:
+ continue
+ scont, sobj = \
+ seg_to_delete.split('/', 1)
+ segment_queue.put({'delete': True,
+ 'container': scont, 'obj': sobj})
if not segment_queue.empty():
segment_threads = [QueueFunctionThread(segment_queue,
_segment_job, create_connection()) for _junk in