summaryrefslogtreecommitdiff
path: root/bin
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-07-05 17:16:34 +0000
committerGerrit Code Review <review@openstack.org>2013-07-05 17:16:34 +0000
commit6f7458a2903f6be2bf3271825e837d30b4e9f737 (patch)
tree6005995e9a513f6e8211670e673b359ceb70dcc8 /bin
parentb03843f3ac8bef30c9b7e0d436d8984d6d4146c2 (diff)
parentf022aac0cf460ca4d3208ba1c22fae5f32ae34af (diff)
downloadpython-swiftclient-6f7458a2903f6be2bf3271825e837d30b4e9f737.tar.gz
Merge "Add -p option to download command."
Diffstat (limited to 'bin')
-rwxr-xr-xbin/swift281
1 files changed, 151 insertions, 130 deletions
diff --git a/bin/swift b/bin/swift
index b444013..416b183 100755
--- a/bin/swift
+++ b/bin/swift
@@ -25,7 +25,7 @@ from os.path import basename, dirname, getmtime, getsize, isdir, join
from Queue import Queue
from random import shuffle
from sys import argv, exc_info, exit, stderr, stdout
-from threading import enumerate as threading_enumerate, Thread
+from threading import Thread
from time import sleep, time, gmtime, strftime
from traceback import format_exception
from urllib import quote, unquote
@@ -84,16 +84,6 @@ class StopWorkerThreadSignal(object):
pass
-def shutdown_worker_threads(queue, thread_list):
- for thread in [t for t in thread_list if t.isAlive()]:
- queue.put(StopWorkerThreadSignal())
-
-
-def immediate_exit(signum, frame):
- stderr.write(" Aborted\n")
- os_exit(2)
-
-
class QueueFunctionThread(Thread):
def __init__(self, queue, func, *args, **kwargs):
@@ -128,6 +118,24 @@ class QueueFunctionThread(Thread):
self.exc_infos.append(exc_info())
+def shutdown_worker_threads(queue, thread_list):
+ """
+ Takes a job queue and a list of associated QueueFunctionThread objects,
+ puts a StopWorkerThreadSignal object into the queue, and waits for the
+ queue to flush.
+ """
+ for thread in [t for t in thread_list if t.isAlive()]:
+ queue.put(StopWorkerThreadSignal())
+
+ while any(map(QueueFunctionThread.is_alive, thread_list)):
+ sleep(0.05)
+
+
+def immediate_exit(signum, frame):
+ stderr.write(" Aborted\n")
+ os_exit(2)
+
+
st_delete_help = '''
delete [options] --all OR delete container [options] [object] [object] ...
Deletes everything in the account (with --all), or everything in a
@@ -261,47 +269,52 @@ def st_delete(parser, args, print_queue, error_queue):
for _junk in xrange(options.container_threads)]
for thread in container_threads:
thread.start()
- if not args:
- conn = create_connection()
- try:
- marker = ''
- while True:
- containers = \
- [c['name'] for c in conn.get_account(marker=marker)[1]]
- if not containers:
- break
- for container in containers:
- container_queue.put(container)
- marker = containers[-1]
- except ClientException as err:
- if err.http_status != 404:
- raise
- error_queue.put('Account not found')
- elif len(args) == 1:
- if '/' in args[0]:
- print >> stderr, 'WARNING: / in container name; you might have ' \
- 'meant %r instead of %r.' % \
- (args[0].replace('/', ' ', 1), args[0])
- conn = create_connection()
- _delete_container(args[0], conn)
- else:
- for obj in args[1:]:
- object_queue.put((args[0], obj))
- shutdown_worker_threads(container_queue, container_threads)
- put_errors_from_threads(container_threads, error_queue)
+ try:
+ if not args:
+ conn = create_connection()
+ try:
+ marker = ''
+ while True:
+ containers = [
+ c['name'] for c in conn.get_account(marker=marker)[1]]
+ if not containers:
+ break
+ for container in containers:
+ container_queue.put(container)
+ marker = containers[-1]
+ except ClientException as err:
+ if err.http_status != 404:
+ raise
+ error_queue.put('Account not found')
+ elif len(args) == 1:
+ if '/' in args[0]:
+ print >> stderr, 'WARNING: / in container name; you might ' \
+ 'have meant %r instead of %r.' % (
+ args[0].replace('/', ' ', 1), args[0])
+ conn = create_connection()
+ _delete_container(args[0], conn)
+ else:
+ for obj in args[1:]:
+ object_queue.put((args[0], obj))
+ finally:
+ shutdown_worker_threads(container_queue, container_threads)
+ put_errors_from_threads(container_threads, error_queue)
- shutdown_worker_threads(object_queue, object_threads)
- put_errors_from_threads(object_threads, error_queue)
+ shutdown_worker_threads(object_queue, object_threads)
+ put_errors_from_threads(object_threads, error_queue)
st_download_help = '''
-download --all OR download container [options] [object] [object] ...
- Downloads everything in the account (with --all), or everything in a
- container, or a list of objects depending on the args given. For a single
- object download, you may use the -o [--output] <filename> option to
- redirect the output to a specific file or if "-" then just redirect to
- stdout.'''.strip('\n')
+download --all [options] OR download container [options] [object] [object] ...
+ Downloads everything in the account (with --all), or everything in all
+ containers in the account matching a prefix (with --all and -p [--prefix]),
+ or everything in a container, or a subset of a container with -p
+ [--prefix], or a list of objects depending on the args given. -p or
+ --prefix is an option that will only download items beginning with that
+ prefix. For a single object download, you may use the -o [--output]
+ <filename> option to redirect the output to a specific file or if "-" then
+ just redirect to stdout.'''.strip('\n')
def st_download(parser, args, print_queue, error_queue):
@@ -314,6 +327,9 @@ def st_download(parser, args, print_queue, error_queue):
default='', help='Marker to use when starting a container or '
'account download')
parser.add_option(
+ '-p', '--prefix', dest='prefix',
+ help='Will only download items beginning with the prefix')
+ parser.add_option(
'-o', '--output', dest='out_file', help='For a single '
'file download, stream the output to an alternate location ')
parser.add_option(
@@ -426,12 +442,14 @@ def st_download(parser, args, print_queue, error_queue):
container_queue = Queue(10000)
- def _download_container(container, conn):
+ def _download_container(container, conn, prefix=None):
try:
marker = options.marker
while True:
- objects = [o['name'] for o in
- conn.get_container(container, marker=marker)[1]]
+ objects = [
+ o['name'] for o in
+ conn.get_container(container, marker=marker,
+ prefix=prefix)[1]]
if not objects:
break
marker = objects[-1]
@@ -455,42 +473,50 @@ def st_download(parser, args, print_queue, error_queue):
for _junk in xrange(options.container_threads)]
for thread in container_threads:
thread.start()
- if not args:
- conn = create_connection()
- try:
- marker = options.marker
- while True:
- containers = [c['name']
- for c in conn.get_account(marker=marker)[1]]
- if not containers:
- break
- marker = containers[-1]
- shuffle(containers)
- for container in containers:
- container_queue.put(container)
- except ClientException as err:
- if err.http_status != 404:
- raise
- error_queue.put('Account not found')
- elif len(args) == 1:
- if '/' in args[0]:
- print >> stderr, 'WARNING: / in container name; you might have ' \
- 'meant %r instead of %r.' % \
- (args[0].replace('/', ' ', 1), args[0])
- _download_container(args[0], create_connection())
- else:
- if len(args) == 2:
- obj = args[1]
- object_queue.put((args[0], obj, options.out_file))
- else:
- for obj in args[1:]:
- object_queue.put((args[0], obj))
- shutdown_worker_threads(container_queue, container_threads)
- put_errors_from_threads(container_threads, error_queue)
+ # We musn't let the main thread die with an exception while non-daemonic
+ # threads exist or the process with hang and ignore Ctrl-C. So we catch
+ # anything and tidy up the threads in a finally block.
+ try:
+ if not args:
+ # --all case
+ conn = create_connection()
+ try:
+ marker = options.marker
+ while True:
+ containers = [
+ c['name'] for c in conn.get_account(
+ marker=marker, prefix=options.prefix)[1]]
+ if not containers:
+ break
+ marker = containers[-1]
+ shuffle(containers)
+ for container in containers:
+ container_queue.put(container)
+ except ClientException as err:
+ if err.http_status != 404:
+ raise
+ error_queue.put('Account not found')
+ elif len(args) == 1:
+ if '/' in args[0]:
+ print >> stderr, ('WARNING: / in container name; you might '
+ 'have meant %r instead of %r.' % (
+ args[0].replace('/', ' ', 1), args[0]))
+ _download_container(args[0], create_connection(),
+ options.prefix)
+ else:
+ if len(args) == 2:
+ obj = args[1]
+ object_queue.put((args[0], obj, options.out_file))
+ else:
+ for obj in args[1:]:
+ object_queue.put((args[0], obj))
+ finally:
+ shutdown_worker_threads(container_queue, container_threads)
+ put_errors_from_threads(container_threads, error_queue)
- shutdown_worker_threads(object_queue, object_threads)
- put_errors_from_threads(object_threads, error_queue)
+ shutdown_worker_threads(object_queue, object_threads)
+ put_errors_from_threads(object_threads, error_queue)
def prt_bytes(bytes, human_flag):
@@ -550,7 +576,7 @@ def st_list(parser, args, print_queue, error_queue):
parser.add_option(
'-d', '--delimiter', dest='delimiter',
help='Will roll up items with the given delimiter'
- ' (see Cloud Files general documentation for what this means)')
+ ' (see OpenStack Swift API documentation for what this means)')
(options, args) = parse_args(parser, args)
args = args[1:]
if options.delimiter and not args:
@@ -1002,34 +1028,37 @@ def st_upload(parser, args, print_queue, error_queue):
for _junk in xrange(options.segment_threads)]
for thread in segment_threads:
thread.start()
- segment = 0
- segment_start = 0
- while segment_start < full_size:
- segment_size = int(options.segment_size)
- if segment_start + segment_size > full_size:
- segment_size = full_size - segment_start
- if options.use_slo:
- segment_name = '%s/slo/%s/%s/%s/%08d' % (
- obj, put_headers['x-object-meta-mtime'],
- full_size, options.segment_size, segment)
- else:
- segment_name = '%s/%s/%s/%s/%08d' % (
- obj, put_headers['x-object-meta-mtime'],
- full_size, options.segment_size, segment)
- segment_queue.put(
- {'path': path, 'obj': segment_name,
- 'segment_start': segment_start,
- 'segment_size': segment_size,
- 'segment_index': segment,
- 'log_line': '%s segment %s' % (obj, segment)})
- segment += 1
- segment_start += segment_size
- shutdown_worker_threads(segment_queue, segment_threads)
- if put_errors_from_threads(segment_threads, error_queue):
- raise ClientException(
- 'Aborting manifest creation '
- 'because not all segments could be uploaded. %s/%s'
- % (container, obj))
+ try:
+ segment = 0
+ segment_start = 0
+ while segment_start < full_size:
+ segment_size = int(options.segment_size)
+ if segment_start + segment_size > full_size:
+ segment_size = full_size - segment_start
+ if options.use_slo:
+ segment_name = '%s/slo/%s/%s/%s/%08d' % (
+ obj, put_headers['x-object-meta-mtime'],
+ full_size, options.segment_size, segment)
+ else:
+ segment_name = '%s/%s/%s/%s/%08d' % (
+ obj, put_headers['x-object-meta-mtime'],
+ full_size, options.segment_size, segment)
+ segment_queue.put(
+ {'path': path, 'obj': segment_name,
+ 'segment_start': segment_start,
+ 'segment_size': segment_size,
+ 'segment_index': segment,
+ 'log_line': '%s segment %s' % (obj, segment)})
+ segment += 1
+ segment_start += segment_size
+ finally:
+ shutdown_worker_threads(segment_queue, segment_threads)
+ if put_errors_from_threads(segment_threads,
+ error_queue):
+ raise ClientException(
+ 'Aborting manifest creation '
+ 'because not all segments could be uploaded. '
+ '%s/%s' % (container, obj))
if options.use_slo:
slo_segments = []
for thread in segment_threads:
@@ -1149,19 +1178,20 @@ def st_upload(parser, args, print_queue, error_queue):
except Exception as err:
error_queue.put(
'Error trying to create container %r: %s' % (args[0], err))
+
try:
for arg in args[1:]:
if isdir(arg):
_upload_dir(arg)
else:
object_queue.put({'path': arg})
-
- shutdown_worker_threads(object_queue, object_threads)
- put_errors_from_threads(object_threads, error_queue)
except ClientException as err:
if err.http_status != 404:
raise
error_queue.put('Account not found')
+ finally:
+ shutdown_worker_threads(object_queue, object_threads)
+ put_errors_from_threads(object_threads, error_queue)
def split_headers(options, prefix='', error_queue=None):
@@ -1395,7 +1425,7 @@ Examples:
print item
print_thread = QueueFunctionThread(print_queue, _print)
- print_thread.setDaemon(True)
+ print_thread.start()
error_count = 0
error_queue = Queue(10000)
@@ -1408,7 +1438,7 @@ Examples:
print >> stderr, item
error_thread = QueueFunctionThread(error_queue, _error)
- error_thread.setDaemon(True)
+ error_thread.start()
parser.usage = globals()['st_%s_help' % args[0]]
try:
@@ -1416,18 +1446,9 @@ Examples:
error_queue)
except (ClientException, HTTPException, socket.error) as err:
error_queue.put(str(err))
-
- # Let other threads start working, now start print and error thread,
- # this is to prevent the main thread shutdown two thread prematurely
- print_thread.start()
- error_thread.start()
-
- # If not all the worker threads have finished, then the main thread
- # has to wait. Only when there are main, error and print thread left
- # the main thread can proceed to finish up.
- while (len(threading_enumerate()) > 3 or not error_queue.empty() or
- not print_queue.empty()):
- sleep(0.5)
+ finally:
+ shutdown_worker_threads(print_queue, [print_thread])
+ shutdown_worker_threads(error_queue, [error_thread])
if error_count:
exit(1)