summaryrefslogtreecommitdiff
path: root/bin
diff options
context:
space:
mode:
authorDarrell Bishop <darrell@swiftstack.com>2013-06-26 11:41:29 -0700
committerDarrell Bishop <darrell@swiftstack.com>2013-06-26 23:00:46 -0700
commitf022aac0cf460ca4d3208ba1c22fae5f32ae34af (patch)
treed500b27830c78299eedcc2d3b57ab09172e5be29 /bin
parent1c86d62fdebf41bb55ad878020f8c3a24ca5674a (diff)
downloadpython-swiftclient-f022aac0cf460ca4d3208ba1c22fae5f32ae34af.tar.gz
Add -p option to download command.
Allow the ability to download a subset of containers (--all with -p) or a subset of objects within a container (container name with -p). This patch also includes a drive-by fix for "download --all" which would not actually download any objects (for me, at least) because the object queue got filled with "stop" messages before the container workers had run long enough to put work in the object queue. Doh! I also closed up a few holes where an (unexpected, obviously) Exception could cause the process to hang because non-daemon threads still existed. Change-Id: I71c6935c60282b5353badc2dfce8a935d47e3bb7
Diffstat (limited to 'bin')
-rwxr-xr-xbin/swift281
1 files changed, 151 insertions, 130 deletions
diff --git a/bin/swift b/bin/swift
index 8cce1c3..9ca0745 100755
--- a/bin/swift
+++ b/bin/swift
@@ -25,7 +25,7 @@ from os.path import basename, dirname, getmtime, getsize, isdir, join
from Queue import Queue
from random import shuffle
from sys import argv, exc_info, exit, stderr, stdout
-from threading import enumerate as threading_enumerate, Thread
+from threading import Thread
from time import sleep, time, gmtime, strftime
from traceback import format_exception
from urllib import quote, unquote
@@ -84,16 +84,6 @@ class StopWorkerThreadSignal(object):
pass
-def shutdown_worker_threads(queue, thread_list):
- for thread in [t for t in thread_list if t.isAlive()]:
- queue.put(StopWorkerThreadSignal())
-
-
-def immediate_exit(signum, frame):
- stderr.write(" Aborted\n")
- os_exit(2)
-
-
class QueueFunctionThread(Thread):
def __init__(self, queue, func, *args, **kwargs):
@@ -128,6 +118,24 @@ class QueueFunctionThread(Thread):
self.exc_infos.append(exc_info())
+def shutdown_worker_threads(queue, thread_list):
+ """
+ Takes a job queue and a list of associated QueueFunctionThread objects,
+ puts a StopWorkerThreadSignal object into the queue, and waits for the
+ queue to flush.
+ """
+ for thread in [t for t in thread_list if t.isAlive()]:
+ queue.put(StopWorkerThreadSignal())
+
+ while any(map(QueueFunctionThread.is_alive, thread_list)):
+ sleep(0.05)
+
+
+def immediate_exit(signum, frame):
+ stderr.write(" Aborted\n")
+ os_exit(2)
+
+
st_delete_help = '''
delete [options] --all OR delete container [options] [object] [object] ...
Deletes everything in the account (with --all), or everything in a
@@ -261,47 +269,52 @@ def st_delete(parser, args, print_queue, error_queue):
for _junk in xrange(options.container_threads)]
for thread in container_threads:
thread.start()
- if not args:
- conn = create_connection()
- try:
- marker = ''
- while True:
- containers = \
- [c['name'] for c in conn.get_account(marker=marker)[1]]
- if not containers:
- break
- for container in containers:
- container_queue.put(container)
- marker = containers[-1]
- except ClientException as err:
- if err.http_status != 404:
- raise
- error_queue.put('Account not found')
- elif len(args) == 1:
- if '/' in args[0]:
- print >> stderr, 'WARNING: / in container name; you might have ' \
- 'meant %r instead of %r.' % \
- (args[0].replace('/', ' ', 1), args[0])
- conn = create_connection()
- _delete_container(args[0], conn)
- else:
- for obj in args[1:]:
- object_queue.put((args[0], obj))
- shutdown_worker_threads(container_queue, container_threads)
- put_errors_from_threads(container_threads, error_queue)
+ try:
+ if not args:
+ conn = create_connection()
+ try:
+ marker = ''
+ while True:
+ containers = [
+ c['name'] for c in conn.get_account(marker=marker)[1]]
+ if not containers:
+ break
+ for container in containers:
+ container_queue.put(container)
+ marker = containers[-1]
+ except ClientException as err:
+ if err.http_status != 404:
+ raise
+ error_queue.put('Account not found')
+ elif len(args) == 1:
+ if '/' in args[0]:
+ print >> stderr, 'WARNING: / in container name; you might ' \
+ 'have meant %r instead of %r.' % (
+ args[0].replace('/', ' ', 1), args[0])
+ conn = create_connection()
+ _delete_container(args[0], conn)
+ else:
+ for obj in args[1:]:
+ object_queue.put((args[0], obj))
+ finally:
+ shutdown_worker_threads(container_queue, container_threads)
+ put_errors_from_threads(container_threads, error_queue)
- shutdown_worker_threads(object_queue, object_threads)
- put_errors_from_threads(object_threads, error_queue)
+ shutdown_worker_threads(object_queue, object_threads)
+ put_errors_from_threads(object_threads, error_queue)
st_download_help = '''
-download --all OR download container [options] [object] [object] ...
- Downloads everything in the account (with --all), or everything in a
- container, or a list of objects depending on the args given. For a single
- object download, you may use the -o [--output] <filename> option to
- redirect the output to a specific file or if "-" then just redirect to
- stdout.'''.strip('\n')
+download --all [options] OR download container [options] [object] [object] ...
+ Downloads everything in the account (with --all), or everything in all
+ containers in the account matching a prefix (with --all and -p [--prefix]),
+ or everything in a container, or a subset of a container with -p
+ [--prefix], or a list of objects depending on the args given. -p or
+ --prefix is an option that will only download items beginning with that
+ prefix. For a single object download, you may use the -o [--output]
+ <filename> option to redirect the output to a specific file or if "-" then
+ just redirect to stdout.'''.strip('\n')
def st_download(parser, args, print_queue, error_queue):
@@ -314,6 +327,9 @@ def st_download(parser, args, print_queue, error_queue):
default='', help='Marker to use when starting a container or '
'account download')
parser.add_option(
+ '-p', '--prefix', dest='prefix',
+ help='Will only download items beginning with the prefix')
+ parser.add_option(
'-o', '--output', dest='out_file', help='For a single '
'file download, stream the output to an alternate location ')
parser.add_option(
@@ -426,12 +442,14 @@ def st_download(parser, args, print_queue, error_queue):
container_queue = Queue(10000)
- def _download_container(container, conn):
+ def _download_container(container, conn, prefix=None):
try:
marker = options.marker
while True:
- objects = [o['name'] for o in
- conn.get_container(container, marker=marker)[1]]
+ objects = [
+ o['name'] for o in
+ conn.get_container(container, marker=marker,
+ prefix=prefix)[1]]
if not objects:
break
marker = objects[-1]
@@ -455,42 +473,50 @@ def st_download(parser, args, print_queue, error_queue):
for _junk in xrange(options.container_threads)]
for thread in container_threads:
thread.start()
- if not args:
- conn = create_connection()
- try:
- marker = options.marker
- while True:
- containers = [c['name']
- for c in conn.get_account(marker=marker)[1]]
- if not containers:
- break
- marker = containers[-1]
- shuffle(containers)
- for container in containers:
- container_queue.put(container)
- except ClientException as err:
- if err.http_status != 404:
- raise
- error_queue.put('Account not found')
- elif len(args) == 1:
- if '/' in args[0]:
- print >> stderr, 'WARNING: / in container name; you might have ' \
- 'meant %r instead of %r.' % \
- (args[0].replace('/', ' ', 1), args[0])
- _download_container(args[0], create_connection())
- else:
- if len(args) == 2:
- obj = args[1]
- object_queue.put((args[0], obj, options.out_file))
- else:
- for obj in args[1:]:
- object_queue.put((args[0], obj))
- shutdown_worker_threads(container_queue, container_threads)
- put_errors_from_threads(container_threads, error_queue)
+ # We musn't let the main thread die with an exception while non-daemonic
+ # threads exist or the process with hang and ignore Ctrl-C. So we catch
+ # anything and tidy up the threads in a finally block.
+ try:
+ if not args:
+ # --all case
+ conn = create_connection()
+ try:
+ marker = options.marker
+ while True:
+ containers = [
+ c['name'] for c in conn.get_account(
+ marker=marker, prefix=options.prefix)[1]]
+ if not containers:
+ break
+ marker = containers[-1]
+ shuffle(containers)
+ for container in containers:
+ container_queue.put(container)
+ except ClientException as err:
+ if err.http_status != 404:
+ raise
+ error_queue.put('Account not found')
+ elif len(args) == 1:
+ if '/' in args[0]:
+ print >> stderr, ('WARNING: / in container name; you might '
+ 'have meant %r instead of %r.' % (
+ args[0].replace('/', ' ', 1), args[0]))
+ _download_container(args[0], create_connection(),
+ options.prefix)
+ else:
+ if len(args) == 2:
+ obj = args[1]
+ object_queue.put((args[0], obj, options.out_file))
+ else:
+ for obj in args[1:]:
+ object_queue.put((args[0], obj))
+ finally:
+ shutdown_worker_threads(container_queue, container_threads)
+ put_errors_from_threads(container_threads, error_queue)
- shutdown_worker_threads(object_queue, object_threads)
- put_errors_from_threads(object_threads, error_queue)
+ shutdown_worker_threads(object_queue, object_threads)
+ put_errors_from_threads(object_threads, error_queue)
def prt_bytes(bytes, human_flag):
@@ -546,7 +572,7 @@ def st_list(parser, args, print_queue, error_queue):
parser.add_option(
'-d', '--delimiter', dest='delimiter',
help='Will roll up items with the given delimiter'
- ' (see Cloud Files general documentation for what this means)')
+ ' (see OpenStack Swift API documentation for what this means)')
(options, args) = parse_args(parser, args)
args = args[1:]
if options.delimiter and not args:
@@ -971,34 +997,37 @@ def st_upload(parser, args, print_queue, error_queue):
for _junk in xrange(options.segment_threads)]
for thread in segment_threads:
thread.start()
- segment = 0
- segment_start = 0
- while segment_start < full_size:
- segment_size = int(options.segment_size)
- if segment_start + segment_size > full_size:
- segment_size = full_size - segment_start
- if options.use_slo:
- segment_name = '%s/slo/%s/%s/%s/%08d' % (
- obj, put_headers['x-object-meta-mtime'],
- full_size, options.segment_size, segment)
- else:
- segment_name = '%s/%s/%s/%s/%08d' % (
- obj, put_headers['x-object-meta-mtime'],
- full_size, options.segment_size, segment)
- segment_queue.put(
- {'path': path, 'obj': segment_name,
- 'segment_start': segment_start,
- 'segment_size': segment_size,
- 'segment_index': segment,
- 'log_line': '%s segment %s' % (obj, segment)})
- segment += 1
- segment_start += segment_size
- shutdown_worker_threads(segment_queue, segment_threads)
- if put_errors_from_threads(segment_threads, error_queue):
- raise ClientException(
- 'Aborting manifest creation '
- 'because not all segments could be uploaded. %s/%s'
- % (container, obj))
+ try:
+ segment = 0
+ segment_start = 0
+ while segment_start < full_size:
+ segment_size = int(options.segment_size)
+ if segment_start + segment_size > full_size:
+ segment_size = full_size - segment_start
+ if options.use_slo:
+ segment_name = '%s/slo/%s/%s/%s/%08d' % (
+ obj, put_headers['x-object-meta-mtime'],
+ full_size, options.segment_size, segment)
+ else:
+ segment_name = '%s/%s/%s/%s/%08d' % (
+ obj, put_headers['x-object-meta-mtime'],
+ full_size, options.segment_size, segment)
+ segment_queue.put(
+ {'path': path, 'obj': segment_name,
+ 'segment_start': segment_start,
+ 'segment_size': segment_size,
+ 'segment_index': segment,
+ 'log_line': '%s segment %s' % (obj, segment)})
+ segment += 1
+ segment_start += segment_size
+ finally:
+ shutdown_worker_threads(segment_queue, segment_threads)
+ if put_errors_from_threads(segment_threads,
+ error_queue):
+ raise ClientException(
+ 'Aborting manifest creation '
+ 'because not all segments could be uploaded. '
+ '%s/%s' % (container, obj))
if options.use_slo:
slo_segments = []
for thread in segment_threads:
@@ -1118,19 +1147,20 @@ def st_upload(parser, args, print_queue, error_queue):
except Exception as err:
error_queue.put(
'Error trying to create container %r: %s' % (args[0], err))
+
try:
for arg in args[1:]:
if isdir(arg):
_upload_dir(arg)
else:
object_queue.put({'path': arg})
-
- shutdown_worker_threads(object_queue, object_threads)
- put_errors_from_threads(object_threads, error_queue)
except ClientException as err:
if err.http_status != 404:
raise
error_queue.put('Account not found')
+ finally:
+ shutdown_worker_threads(object_queue, object_threads)
+ put_errors_from_threads(object_threads, error_queue)
def split_headers(options, prefix='', error_queue=None):
@@ -1364,7 +1394,7 @@ Examples:
print item
print_thread = QueueFunctionThread(print_queue, _print)
- print_thread.setDaemon(True)
+ print_thread.start()
error_count = 0
error_queue = Queue(10000)
@@ -1377,7 +1407,7 @@ Examples:
print >> stderr, item
error_thread = QueueFunctionThread(error_queue, _error)
- error_thread.setDaemon(True)
+ error_thread.start()
parser.usage = globals()['st_%s_help' % args[0]]
try:
@@ -1385,18 +1415,9 @@ Examples:
error_queue)
except (ClientException, HTTPException, socket.error) as err:
error_queue.put(str(err))
-
- # Let other threads start working, now start print and error thread,
- # this is to prevent the main thread shutdown two thread prematurely
- print_thread.start()
- error_thread.start()
-
- # If not all the worker threads have finished, then the main thread
- # has to wait. Only when there are main, error and print thread left
- # the main thread can proceed to finish up.
- while (len(threading_enumerate()) > 3 or not error_queue.empty() or
- not print_queue.empty()):
- sleep(0.5)
+ finally:
+ shutdown_worker_threads(print_queue, [print_thread])
+ shutdown_worker_threads(error_queue, [error_thread])
if error_count:
exit(1)