author      Darrell Bishop <darrell@swiftstack.com>    2013-06-26 22:47:49 -0700
committer   Darrell Bishop <darrell@swiftstack.com>    2013-07-28 22:08:17 -0700
commit      9198e95468b3005730c931da1701f34b1a9ce2d9 (patch)
tree        d6034f1a489b67273c19dc43221b142bd0c7bb65
parent      5d9c6f845cc98da720fea7e2343fdbb0db9a42a5 (diff)
download    python-swiftclient-9198e95468b3005730c931da1701f34b1a9ce2d9.tar.gz
Move multi-threading code to a library.
This patch extracts the multi-threading code from bin/swift into
swiftclient/multithreading and adds tests. In particular, the new
context-manager-based approach prevents non-daemonic threads from
wedging the process when unexpected exceptions occur.
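A rough, hedged sketch of the new usage pattern (the worker function,
thread count, and queue items below are invented for illustration;
MultiThreadingManager, queue_manager(), error_counter, print_msg(), and
the with-block protocol come from this patch):

    from swiftclient.multithreading import MultiThreadingManager

    def shout(item):
        # Made-up worker: raises on one input to show exception handling.
        if item == 'bad':
            raise ValueError('unexpected failure')
        return item.upper()

    with MultiThreadingManager() as thread_manager:
        errors = [0]
        workers = thread_manager.queue_manager(shout, 4, error_counter=errors)
        with workers as queue:      # starts 4 QueueFunctionThread workers
            queue.put('hello')
            queue.put('bad')        # exception is recorded, not fatal
        # Leaving the inner context sends a StopWorkerThreadSignal to each
        # worker, waits for them all to finish, reports their exceptions
        # through thread_manager.error(), and bumps errors[0], instead of
        # leaving a non-daemonic thread to wedge the process.
        thread_manager.print_msg('worker errors: %d', errors[0])

Leaving the outer MultiThreadingManager context then flushes and joins
the print and error threads.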
I also enabled reporting of exactly which lines are not covered by unit
tests (by adding the -m option to "coverage report" in .unittests).
This patch includes a drive-by fix for uploading a segmented file with
--use-slo when that object already exists: the code looked up the key
"name" instead of "path", raising a KeyError.
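For reference, a self-contained sketch of the corrected lookup (the
sample manifest contents below are invented; the patch itself only
switches old_seg['name'] to old_seg['path'] in bin/swift):

    import json

    # A stored SLO manifest lists its segments under "path" (per this fix);
    # the segment names and sizes here are made up.
    manifest_data = json.dumps([
        {'path': '/my_segments/obj/000001', 'etag': 'abc', 'size_bytes': 1048576},
        {'path': '/my_segments/obj/000002', 'etag': 'def', 'size_bytes': 524288},
    ])

    old_slo_manifest_paths = []
    for old_seg in json.loads(manifest_data):
        seg_path = old_seg['path'].lstrip('/')   # old_seg['name'] raised KeyError
        old_slo_manifest_paths.append(seg_path)

    print(old_slo_manifest_paths)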
There is a second drive-by fix for uploading segmented objects with
--use-slo. Commit 874e0e4427b80e1b15b74a1557b73ba9d61443ca regressed
this by removing the capturing of thread-worker results in
QueueFunctionThread.run(). This patch restores that functionality and,
with it, the ability to upload SLO objects.
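A hedged sketch of the restored result capture (double() and its inputs
are made up; QueueFunctionThread, StopWorkerThreadSignal, and the
store_results kwarg are defined by this patch):

    from Queue import Queue

    from swiftclient.multithreading import (
        QueueFunctionThread, StopWorkerThreadSignal)

    def double(item):
        return item * 2

    queue = Queue()
    results = []
    worker = QueueFunctionThread(queue, double, store_results=results)
    worker.start()
    for n in (1, 2, 3):
        queue.put(n)
    queue.put(StopWorkerThreadSignal())   # tells the worker loop to exit
    worker.join()
    print(results)   # -> [2, 4, 6]

The SLO upload path relies on this same mechanism to collect per-segment
results before building the manifest.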
Change-Id: I0b4f677e4a734e83d1a25088d9a74f7d46384e53
-rwxr-xr-x   .unittests                        2
-rwxr-xr-x   bin/swift                       658
-rw-r--r--   doc/source/conf.py                7
-rw-r--r--   doc/source/swiftclient.rst       16
-rw-r--r--   swiftclient/client.py            60
-rw-r--r--   swiftclient/exceptions.py        72
-rw-r--r--   swiftclient/multithreading.py   241
-rw-r--r--   swiftclient/utils.py              1
-rw-r--r--   tests/test_multithreading.py    334

9 files changed, 928 insertions(+), 463 deletions(-)
@@ -3,6 +3,6 @@ set -e python setup.py testr --coverage RET=$? -coverage report +coverage report -m rm -f .coverage exit $RET @@ -22,12 +22,9 @@ from hashlib import md5 from optparse import OptionParser, SUPPRESS_HELP from os import environ, listdir, makedirs, utime, _exit as os_exit from os.path import basename, dirname, getmtime, getsize, isdir, join -from Queue import Queue from random import shuffle -from sys import argv, exc_info, exit, stderr, stdout -from threading import Thread +from sys import argv, exit, stderr, stdout from time import sleep, time, gmtime, strftime -from traceback import format_exception from urllib import quote, unquote try: @@ -35,7 +32,10 @@ try: except ImportError: import json -from swiftclient import Connection, ClientException, HTTPException, utils +from swiftclient import Connection, HTTPException +from swiftclient.utils import config_true_value +from swiftclient.multithreading import MultiThreadingManager +from swiftclient.exceptions import ClientException from swiftclient.version import version_info @@ -63,75 +63,6 @@ def mkdirs(path): raise -def put_errors_from_threads(threads, error_queue): - """ - Places any errors from the threads into error_queue. - :param threads: A list of QueueFunctionThread instances. - :param error_queue: A queue to put error strings into. - :returns: True if any errors were found. - """ - was_error = False - for thread in threads: - for info in thread.exc_infos: - was_error = True - if isinstance(info[1], ClientException): - error_queue.put(str(info[1])) - else: - error_queue.put(''.join(format_exception(*info))) - return was_error - - -class StopWorkerThreadSignal(object): - pass - - -class QueueFunctionThread(Thread): - - def __init__(self, queue, func, *args, **kwargs): - """ - Calls func for each item in queue; func is called with a queued - item as the first arg followed by *args and **kwargs. Use the - PriorityQueue for sending quit signal when Ctrl-C is pressed. - """ - Thread.__init__(self) - self.queue = queue - self.func = func - self.args = args - self.kwargs = kwargs - self.exc_infos = [] - self.results = [] - self.store_results = kwargs.pop('store_results', False) - - def run(self): - while True: - try: - item = self.queue.get() - if isinstance(item, StopWorkerThreadSignal): - break - except: - # This catch is important and it may occur when ctrl-C is - # pressed, in this case simply quit the thread - break - else: - try: - self.func(item, *self.args, **self.kwargs) - except Exception: - self.exc_infos.append(exc_info()) - - -def shutdown_worker_threads(queue, thread_list): - """ - Takes a job queue and a list of associated QueueFunctionThread objects, - puts a StopWorkerThreadSignal object into the queue, and waits for the - queue to flush. - """ - for thread in [t for t in thread_list if t.isAlive()]: - queue.put(StopWorkerThreadSignal()) - - while any(map(QueueFunctionThread.is_alive, thread_list)): - sleep(0.05) - - def immediate_exit(signum, frame): stderr.write(" Aborted\n") os_exit(2) @@ -145,7 +76,7 @@ delete [options] --all OR delete container [options] [object] [object] ... 
--leave-segments option.'''.strip('\n') -def st_delete(parser, args, print_queue, error_queue): +def st_delete(parser, args, thread_manager): parser.add_option( '-a', '--all', action='store_true', dest='yes_all', default=False, help='Indicates that you really want to delete ' @@ -164,20 +95,19 @@ def st_delete(parser, args, print_queue, error_queue): (options, args) = parse_args(parser, args) args = args[1:] if (not args and not options.yes_all) or (args and options.yes_all): - error_queue.put('Usage: %s [options] %s' % - (basename(argv[0]), st_delete_help)) + thread_manager.error('Usage: %s [options] %s', + basename(argv[0]), st_delete_help) return def _delete_segment((container, obj), conn): conn.delete_object(container, obj) if options.verbose: if conn.attempts > 2: - print_queue.put('%s/%s [after %d attempts]' % - (container, obj, conn.attempts)) + thread_manager.print_msg( + '%s/%s [after %d attempts]', container, + obj, conn.attempts) else: - print_queue.put('%s/%s' % (container, obj)) - - object_queue = Queue(10000) + thread_manager.print_msg('%s/%s', container, obj) def _delete_object((container, obj), conn): try: @@ -187,7 +117,7 @@ def st_delete(parser, args, print_queue, error_queue): try: headers = conn.head_object(container, obj) old_manifest = headers.get('x-object-manifest') - if utils.config_true_value( + if config_true_value( headers.get('x-static-large-object')): query_string = 'multipart-manifest=delete' except ClientException as err: @@ -195,7 +125,10 @@ def st_delete(parser, args, print_queue, error_queue): raise conn.delete_object(container, obj, query_string=query_string) if old_manifest: - segment_queue = Queue(10000) + segment_manager = thread_manager.queue_manager( + _delete_segment, options.object_threads, + connection_maker=create_connection) + segment_queue = segment_manager.queue scontainer, sprefix = old_manifest.split('/', 1) scontainer = unquote(scontainer) sprefix = unquote(sprefix).rstrip('/') + '/' @@ -203,32 +136,23 @@ def st_delete(parser, args, print_queue, error_queue): prefix=sprefix)[1]: segment_queue.put((scontainer, delobj['name'])) if not segment_queue.empty(): - segment_threads = [QueueFunctionThread( - segment_queue, - _delete_segment, create_connection()) for _junk in - xrange(options.object_threads)] - for thread in segment_threads: - thread.start() - shutdown_worker_threads(segment_queue, segment_threads) - put_errors_from_threads(segment_threads, error_queue) + with segment_manager: + pass if options.verbose: path = options.yes_all and join(container, obj) or obj if path[:1] in ('/', '\\'): path = path[1:] if conn.attempts > 1: - print_queue.put('%s [after %d attempts]' % - (path, conn.attempts)) + thread_manager.print_msg('%s [after %d attempts]', path, + conn.attempts) else: - print_queue.put(path) + thread_manager.print_msg(path) except ClientException as err: if err.http_status != 404: raise - error_queue.put('Object %s not found' % - repr('%s/%s' % (container, obj))) + thread_manager.error("Object '%s/%s' not found", container, obj) - container_queue = Queue(10000) - - def _delete_container(container, conn): + def _delete_container(container, conn, object_queue): try: marker = '' while True: @@ -256,54 +180,43 @@ def st_delete(parser, args, print_queue, error_queue): except ClientException as err: if err.http_status != 404: raise - error_queue.put('Container %s not found' % repr(container)) + thread_manager.error('Container %r not found', container) create_connection = lambda: get_conn(options) - object_threads = \ - 
[QueueFunctionThread(object_queue, _delete_object, create_connection()) - for _junk in xrange(options.object_threads)] - for thread in object_threads: - thread.start() - container_threads = \ - [QueueFunctionThread(container_queue, _delete_container, - create_connection()) - for _junk in xrange(options.container_threads)] - for thread in container_threads: - thread.start() - - try: - if not args: - conn = create_connection() - try: - marker = '' - while True: - containers = [ - c['name'] for c in conn.get_account(marker=marker)[1]] - if not containers: - break - for container in containers: - container_queue.put(container) - marker = containers[-1] - except ClientException as err: - if err.http_status != 404: - raise - error_queue.put('Account not found') - elif len(args) == 1: - if '/' in args[0]: - print >> stderr, 'WARNING: / in container name; you might ' \ - 'have meant %r instead of %r.' % ( + obj_manager = thread_manager.queue_manager( + _delete_object, options.object_threads, + connection_maker=create_connection) + with obj_manager as object_queue: + cont_manager = thread_manager.queue_manager( + _delete_container, options.container_threads, object_queue, + connection_maker=create_connection) + with cont_manager as container_queue: + if not args: + conn = create_connection() + try: + marker = '' + while True: + containers = [ + c['name'] + for c in conn.get_account(marker=marker)[1]] + if not containers: + break + for container in containers: + container_queue.put(container) + marker = containers[-1] + except ClientException as err: + if err.http_status != 404: + raise + thread_manager.error('Account not found') + elif len(args) == 1: + if '/' in args[0]: + print >> stderr, 'WARNING: / in container name; you ' + 'might have meant %r instead of %r.' % ( args[0].replace('/', ' ', 1), args[0]) - conn = create_connection() - _delete_container(args[0], conn) - else: - for obj in args[1:]: - object_queue.put((args[0], obj)) - finally: - shutdown_worker_threads(container_queue, container_threads) - put_errors_from_threads(container_threads, error_queue) - - shutdown_worker_threads(object_queue, object_threads) - put_errors_from_threads(object_threads, error_queue) + container_queue.put(args[0]) + else: + for obj in args[1:]: + object_queue.put((args[0], obj)) st_download_help = ''' @@ -318,7 +231,7 @@ download --all [options] OR download container [options] [object] [object] ... 
just redirect to stdout.'''.strip('\n') -def st_download(parser, args, print_queue, error_queue): +def st_download(parser, args, thread_manager): parser.add_option( '-a', '--all', action='store_true', dest='yes_all', default=False, help='Indicates that you really want to download ' @@ -350,12 +263,10 @@ def st_download(parser, args, print_queue, error_queue): if options.out_file and len(args) != 2: exit('-o option only allowed for single file downloads') if (not args and not options.yes_all) or (args and options.yes_all): - error_queue.put('Usage: %s [options] %s' % - (basename(argv[0]), st_download_help)) + thread_manager.error('Usage: %s [options] %s', basename(argv[0]), + st_download_help) return - object_queue = Queue(10000) - def _download_object(queue_arg, conn): if len(queue_arg) == 2: container, obj = queue_arg @@ -415,11 +326,12 @@ def st_download(parser, args, print_queue, error_queue): if not options.no_download: fp.close() if md5sum and md5sum.hexdigest() != etag: - error_queue.put('%s: md5sum != etag, %s != %s' % - (path, md5sum.hexdigest(), etag)) + thread_manager.error('%s: md5sum != etag, %s != %s', + path, md5sum.hexdigest(), etag) if content_length is not None and read_length != content_length: - error_queue.put('%s: read_length != content_length, %d != %d' % - (path, read_length, content_length)) + thread_manager.error( + '%s: read_length != content_length, %d != %d', + path, read_length, content_length) if 'x-object-meta-mtime' in headers and not options.out_file \ and not options.no_download: @@ -431,19 +343,23 @@ def st_download(parser, args, print_queue, error_queue): header_receipt - start_time, finish_time - start_time, float(read_length) / (finish_time - start_time) / 1000000) if conn.attempts > 1: - print_queue.put('%s [%s after %d attempts]' % - (path, time_str, conn.attempts)) + thread_manager.print_msg('%s [%s after %d attempts]', path, + time_str, conn.attempts) else: - print_queue.put('%s [%s]' % (path, time_str)) + thread_manager.print_msg('%s [%s]', path, time_str) except ClientException as err: if err.http_status != 404: raise - error_queue.put('Object %s not found' % - repr('%s/%s' % (container, obj))) + thread_manager.error("Object '%s/%s' not found", container, obj) - container_queue = Queue(10000) - - def _download_container(container, conn, prefix=None): + def _download_container(queue_arg, conn): + if len(queue_arg) == 2: + container, object_queue = queue_arg + prefix = None + elif len(queue_arg) == 3: + container, object_queue, prefix = queue_arg + else: + raise Exception("Invalid queue_arg length of %s" % len(queue_arg)) try: marker = options.marker while True: @@ -460,64 +376,49 @@ def st_download(parser, args, print_queue, error_queue): except ClientException as err: if err.http_status != 404: raise - error_queue.put('Container %s not found' % repr(container)) + thread_manager.error('Container %r not found', container) create_connection = lambda: get_conn(options) - object_threads = [QueueFunctionThread( - object_queue, _download_object, - create_connection()) for _junk in xrange(options.object_threads)] - for thread in object_threads: - thread.start() - container_threads = [QueueFunctionThread( - container_queue, - _download_container, create_connection()) - for _junk in xrange(options.container_threads)] - for thread in container_threads: - thread.start() - - # We musn't let the main thread die with an exception while non-daemonic - # threads exist or the process with hang and ignore Ctrl-C. 
So we catch - # anything and tidy up the threads in a finally block. - try: - if not args: - # --all case - conn = create_connection() - try: - marker = options.marker - while True: - containers = [ - c['name'] for c in conn.get_account( - marker=marker, prefix=options.prefix)[1]] - if not containers: - break - marker = containers[-1] - shuffle(containers) - for container in containers: - container_queue.put(container) - except ClientException as err: - if err.http_status != 404: - raise - error_queue.put('Account not found') - elif len(args) == 1: - if '/' in args[0]: - print >> stderr, ('WARNING: / in container name; you might ' - 'have meant %r instead of %r.' % ( + obj_manager = thread_manager.queue_manager( + _download_object, options.object_threads, + connection_maker=create_connection) + with obj_manager as object_queue: + cont_manager = thread_manager.queue_manager( + _download_container, options.container_threads, + connection_maker=create_connection) + with cont_manager as container_queue: + if not args: + # --all case + conn = create_connection() + try: + marker = options.marker + while True: + containers = [ + c['name'] for c in conn.get_account( + marker=marker, prefix=options.prefix)[1]] + if not containers: + break + marker = containers[-1] + shuffle(containers) + for container in containers: + container_queue.put((container, object_queue)) + except ClientException as err: + if err.http_status != 404: + raise + thread_manager.error('Account not found') + elif len(args) == 1: + if '/' in args[0]: + print >> stderr, ('WARNING: / in container name; you ' + 'might have meant %r instead of %r.' % ( args[0].replace('/', ' ', 1), args[0])) - _download_container(args[0], create_connection(), - options.prefix) - else: - if len(args) == 2: - obj = args[1] - object_queue.put((args[0], obj, options.out_file)) + container_queue.put((args[0], object_queue, options.prefix)) else: - for obj in args[1:]: - object_queue.put((args[0], obj)) - finally: - shutdown_worker_threads(container_queue, container_threads) - put_errors_from_threads(container_threads, error_queue) - - shutdown_worker_threads(object_queue, object_threads) - put_errors_from_threads(object_threads, error_queue) + if len(args) == 2: + obj = args[1] + object_queue.put((args[0], obj, options.out_file)) + else: + for obj in args[1:]: + object_queue.put((args[0], obj)) def prt_bytes(bytes, human_flag): @@ -560,7 +461,7 @@ list [options] [container] '''.strip('\n') -def st_list(parser, args, print_queue, error_queue): +def st_list(parser, args, thread_manager): parser.add_option( '-l', '--long', dest='long', help='Long listing ' 'similar to ls -l command', action='store_true', default=False) @@ -583,8 +484,8 @@ def st_list(parser, args, print_queue, error_queue): if options.delimiter and not args: exit('-d option only allowed for container listings') if len(args) > 1 or len(args) == 1 and args[0].find('/') >= 0: - error_queue.put('Usage: %s [options] %s' % - (basename(argv[0]), st_list_help)) + thread_manager.error('Usage: %s [options] %s', basename(argv[0]), + st_list_help) return conn = get_conn(options) @@ -605,12 +506,13 @@ def st_list(parser, args, print_queue, error_queue): item_name = item.get('name') if not options.long and not options.human: - print_queue.put(item.get('name', item.get('subdir'))) + thread_manager.print_msg( + item.get('name', item.get('subdir'))) else: item_bytes = item.get('bytes') total_bytes += item_bytes if len(args) == 0: # listing containers - bytes = prt_bytes(item_bytes, options.human) + byte_str 
= prt_bytes(item_bytes, options.human) count = item.get('count') total_count += count try: @@ -620,41 +522,42 @@ def st_list(parser, args, print_queue, error_queue): except ClientException: datestamp = '????-??-?? ??:??:??' if not options.totals: - print_queue.put("%5s %s %s %s" % - (count, bytes, datestamp, - item_name)) + thread_manager.print_msg("%5s %s %s %s", count, + byte_str, datestamp, + item_name) else: # list container contents subdir = item.get('subdir') if subdir is None: - bytes = prt_bytes(item_bytes, options.human) + byte_str = prt_bytes(item_bytes, options.human) date, xtime = item.get('last_modified').split('T') xtime = xtime.split('.')[0] else: - bytes = prt_bytes(0, options.human) + byte_str = prt_bytes(0, options.human) date = xtime = '' item_name = subdir if not options.totals: - print_queue.put("%s %10s %8s %s" % - (bytes, date, xtime, item_name)) + thread_manager.print_msg("%s %10s %8s %s", + byte_str, date, xtime, + item_name) marker = items[-1].get('name', items[-1].get('subdir')) # report totals if options.long or options.human: if len(args) == 0: - print_queue.put("%5s %s" % (prt_bytes(total_count, True), - prt_bytes(total_bytes, - options.human))) + thread_manager.print_msg( + "%5s %s", prt_bytes(total_count, True), + prt_bytes(total_bytes, options.human)) else: - print_queue.put("%s" % (prt_bytes(total_bytes, options.human))) + thread_manager.print_msg(prt_bytes(total_bytes, options.human)) except ClientException as err: if err.http_status != 404: raise if not args: - error_queue.put('Account not found') + thread_manager.error('Account not found') else: - error_queue.put('Container %s not found' % repr(args[0])) + thread_manager.error('Container %r not found', args[0]) st_stat_help = ''' stat [container] [object] @@ -663,7 +566,7 @@ stat [container] [object] like 'list --lh' noting number of objs a multiple of 1024'''.strip('\n') -def st_stat(parser, args, print_queue, error_queue): +def st_stat(parser, args, thread_manager): parser.add_option( '--lh', dest='human', help="report totals like 'list --lh'", action='store_true', default=False) @@ -674,36 +577,36 @@ def st_stat(parser, args, print_queue, error_queue): try: headers = conn.head_account() if options.verbose > 1: - print_queue.put(''' + thread_manager.print_msg(''' StorageURL: %s Auth Token: %s -'''.strip('\n') % (conn.url, conn.token)) +'''.strip('\n'), conn.url, conn.token) container_count = int(headers.get('x-account-container-count', 0)) object_count = prt_bytes(headers.get('x-account-object-count', 0), options.human).lstrip() bytes_used = prt_bytes(headers.get('x-account-bytes-used', 0), options.human).lstrip() - print_queue.put(''' + thread_manager.print_msg(''' Account: %s Containers: %d Objects: %s - Bytes: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], container_count, - object_count, bytes_used)) + Bytes: %s'''.strip('\n'), conn.url.rsplit('/', 1)[-1], container_count, + object_count, bytes_used) for key, value in headers.items(): if key.startswith('x-account-meta-'): - print_queue.put( - '%10s: %s' % ('Meta %s' % - key[len('x-account-meta-'):].title(), value)) + thread_manager.print_msg( + '%10s: %s', + 'Meta %s' % key[len('x-account-meta-'):].title(), + value) for key, value in headers.items(): if not key.startswith('x-account-meta-') and key not in ( 'content-length', 'date', 'x-account-container-count', 'x-account-object-count', 'x-account-bytes-used'): - print_queue.put( - '%10s: %s' % (key.title(), value)) + thread_manager.print_msg('%10s: %s', key.title(), value) except 
ClientException as err: if err.http_status != 404: raise - error_queue.put('Account not found') + thread_manager.error('Account not found') elif len(args) == 1: if '/' in args[0]: print >> stderr, 'WARNING: / in container name; you might have ' \ @@ -716,7 +619,7 @@ Containers: %d options.human).lstrip() bytes_used = prt_bytes(headers.get('x-container-bytes-used', 0), options.human).lstrip() - print_queue.put(''' + thread_manager.print_msg(''' Account: %s Container: %s Objects: %s @@ -724,69 +627,68 @@ Container: %s Read ACL: %s Write ACL: %s Sync To: %s - Sync Key: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0], - object_count, bytes_used, - headers.get('x-container-read', ''), - headers.get('x-container-write', ''), - headers.get('x-container-sync-to', ''), - headers.get('x-container-sync-key', ''))) + Sync Key: %s'''.strip('\n'), conn.url.rsplit('/', 1)[-1], args[0], + object_count, bytes_used, + headers.get('x-container-read', ''), + headers.get('x-container-write', ''), + headers.get('x-container-sync-to', ''), + headers.get('x-container-sync-key', '')) for key, value in headers.items(): if key.startswith('x-container-meta-'): - print_queue.put( - '%9s: %s' % ('Meta %s' % - key[len('x-container-meta-'):].title(), value)) + thread_manager.print_msg( + '%9s: %s', + 'Meta %s' % key[len('x-container-meta-'):].title(), + value) for key, value in headers.items(): if not key.startswith('x-container-meta-') and key not in ( 'content-length', 'date', 'x-container-object-count', 'x-container-bytes-used', 'x-container-read', 'x-container-write', 'x-container-sync-to', 'x-container-sync-key'): - print_queue.put( - '%9s: %s' % (key.title(), value)) + thread_manager.print_msg('%9s: %s', key.title(), value) except ClientException as err: if err.http_status != 404: raise - error_queue.put('Container %s not found' % repr(args[0])) + thread_manager.error('Container %r not found', args[0]) elif len(args) == 2: try: headers = conn.head_object(args[0], args[1]) - print_queue.put(''' + thread_manager.print_msg(''' Account: %s Container: %s Object: %s - Content Type: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0], - args[1], headers.get('content-type'))) + Content Type: %s'''.strip('\n'), conn.url.rsplit('/', 1)[-1], args[0], + args[1], headers.get('content-type')) if 'content-length' in headers: - print_queue.put('Content Length: %s' % - prt_bytes(headers['content-length'], - options.human).lstrip()) + thread_manager.print_msg('Content Length: %s', + prt_bytes(headers['content-length'], + options.human).lstrip()) if 'last-modified' in headers: - print_queue.put(' Last Modified: %s' % - headers['last-modified']) + thread_manager.print_msg(' Last Modified: %s', + headers['last-modified']) if 'etag' in headers: - print_queue.put(' ETag: %s' % headers['etag']) + thread_manager.print_msg(' ETag: %s', headers['etag']) if 'x-object-manifest' in headers: - print_queue.put(' Manifest: %s' % - headers['x-object-manifest']) + thread_manager.print_msg(' Manifest: %s', + headers['x-object-manifest']) for key, value in headers.items(): if key.startswith('x-object-meta-'): - print_queue.put( - '%14s: %s' % ('Meta %s' % - key[len('x-object-meta-'):].title(), value)) + thread_manager.print_msg( + '%14s: %s', + 'Meta %s' % key[len('x-object-meta-'):].title(), + value) for key, value in headers.items(): if not key.startswith('x-object-meta-') and key not in ( 'content-type', 'content-length', 'last-modified', 'etag', 'date', 'x-object-manifest'): - print_queue.put( - '%14s: %s' % (key.title(), value)) + 
thread_manager.print_msg('%14s: %s', key.title(), value) except ClientException as err: if err.http_status != 404: raise - error_queue.put('Object %s not found' % - repr('%s/%s' % (args[0], args[1]))) + thread_manager.error("Object %s/%s not found", args[0], args[1]) else: - error_queue.put('Usage: %s [options] %s' % - (basename(argv[0]), st_stat_help)) + thread_manager.error('Usage: %s [options] %s', basename(argv[0]), + st_stat_help) st_post_help = ''' @@ -800,7 +702,7 @@ post [options] [container] [object] post -m Color:Blue -m Size:Large'''.strip('\n') -def st_post(parser, args, print_queue, error_queue): +def st_post(parser, args, thread_manager): parser.add_option( '-r', '--read-acl', dest='read_acl', help='Sets the ' 'Read ACL for containers. Quick summary of ACL syntax: .r:*, ' @@ -831,19 +733,21 @@ def st_post(parser, args, print_queue, error_queue): exit('-r, -w, -t, and -k options only allowed for containers') conn = get_conn(options) if not args: - headers = split_headers(options.meta, 'X-Account-Meta-', error_queue) + headers = split_headers( + options.meta, 'X-Account-Meta-', thread_manager) try: conn.post_account(headers=headers) except ClientException as err: if err.http_status != 404: raise - error_queue.put('Account not found') + thread_manager.error('Account not found') elif len(args) == 1: if '/' in args[0]: print >> stderr, 'WARNING: / in container name; you might have ' \ 'meant %r instead of %r.' % \ (args[0].replace('/', ' ', 1), args[0]) - headers = split_headers(options.meta, 'X-Container-Meta-', error_queue) + headers = split_headers(options.meta, 'X-Container-Meta-', + thread_manager) if options.read_acl is not None: headers['X-Container-Read'] = options.read_acl if options.write_acl is not None: @@ -859,19 +763,18 @@ def st_post(parser, args, print_queue, error_queue): raise conn.put_container(args[0], headers=headers) elif len(args) == 2: - headers = split_headers(options.meta, 'X-Object-Meta-', error_queue) + headers = split_headers(options.meta, 'X-Object-Meta-', thread_manager) # add header options to the headers object for the request. - headers.update(split_headers(options.header, '', error_queue)) + headers.update(split_headers(options.header, '', thread_manager)) try: conn.post_object(args[0], args[1], headers=headers) except ClientException as err: if err.http_status != 404: raise - error_queue.put('Object %s not found' % - repr('%s/%s' % (args[0], args[1]))) + thread_manager.error("Object '%s/%s' not found", args[0], args[1]) else: - error_queue.put('Usage: %s [options] %s' % - (basename(argv[0]), st_post_help)) + thread_manager.error('Usage: %s [options] %s', basename(argv[0]), + st_post_help) st_upload_help = ''' @@ -885,7 +788,7 @@ upload [options] container file_or_directory [file_or_directory] [...] 
'''.strip('\n') -def st_upload(parser, args, print_queue, error_queue): +def st_upload(parser, args, thread_manager): parser.add_option( '-c', '--changed', action='store_true', dest='changed', default=False, help='Will only upload files that have changed since ' @@ -924,10 +827,9 @@ def st_upload(parser, args, print_queue, error_queue): (options, args) = parse_args(parser, args) args = args[1:] if len(args) < 2: - error_queue.put('Usage: %s [options] %s' % - (basename(argv[0]), st_upload_help)) + thread_manager.error( + 'Usage: %s [options] %s', basename(argv[0]), st_upload_help) return - object_queue = Queue(10000) def _segment_job(job, conn): if job.get('delete', False): @@ -945,10 +847,10 @@ def st_upload(parser, args, print_queue, error_queue): job['segment_etag'] = etag if options.verbose and 'log_line' in job: if conn.attempts > 1: - print_queue.put('%s [after %d attempts]' % - (job['log_line'], conn.attempts)) + thread_manager.print_msg('%s [after %d attempts]', + job['log_line'], conn.attempts) else: - print_queue.put(job['log_line']) + thread_manager.print_msg(job['log_line']) return job def _object_job(job, conn): @@ -998,13 +900,13 @@ def st_upload(parser, args, print_queue, error_queue): return if not options.leave_segments: old_manifest = headers.get('x-object-manifest') - if utils.config_true_value( + if config_true_value( headers.get('x-static-large-object')): headers, manifest_data = conn.get_object( container, obj, query_string='multipart-manifest=get') for old_seg in json.loads(manifest_data): - seg_path = old_seg['name'].lstrip('/') + seg_path = old_seg['path'].lstrip('/') if isinstance(seg_path, unicode): seg_path = seg_path.encode('utf-8') old_slo_manifest_paths.append(seg_path) @@ -1013,7 +915,7 @@ def st_upload(parser, args, print_queue, error_queue): raise # Merge the command line header options to the put_headers put_headers.update(split_headers(options.header, '', - error_queue)) + thread_manager)) # Don't do segment job if object is not big enough if options.segment_size and \ getsize(path) > int(options.segment_size): @@ -1021,15 +923,15 @@ def st_upload(parser, args, print_queue, error_queue): if options.segment_container: seg_container = options.segment_container full_size = getsize(path) - segment_queue = Queue(10000) - segment_threads = [ - QueueFunctionThread( - segment_queue, _segment_job, - create_connection(), store_results=True) - for _junk in xrange(options.segment_threads)] - for thread in segment_threads: - thread.start() - try: + + slo_segments = [] + error_counter = [0] + segment_manager = thread_manager.queue_manager( + _segment_job, options.segment_threads, + store_results=slo_segments, + error_counter=error_counter, + connection_maker=create_connection) + with segment_manager as segment_queue: segment = 0 segment_start = 0 while segment_start < full_size: @@ -1052,18 +954,12 @@ def st_upload(parser, args, print_queue, error_queue): 'log_line': '%s segment %s' % (obj, segment)}) segment += 1 segment_start += segment_size - finally: - shutdown_worker_threads(segment_queue, segment_threads) - if put_errors_from_threads(segment_threads, - error_queue): - raise ClientException( - 'Aborting manifest creation ' - 'because not all segments could be uploaded. ' - '%s/%s' % (container, obj)) + if error_counter[0]: + raise ClientException( + 'Aborting manifest creation ' + 'because not all segments could be uploaded. 
%s/%s' + % (container, obj)) if options.use_slo: - slo_segments = [] - for thread in segment_threads: - slo_segments += thread.results slo_segments.sort(key=lambda d: d['segment_index']) for seg in slo_segments: seg_loc = seg['segment_location'].lstrip('/') @@ -1097,7 +993,10 @@ def st_upload(parser, args, print_queue, error_queue): container, obj, open(path, 'rb'), content_length=getsize(path), headers=put_headers) if old_manifest or old_slo_manifest_paths: - segment_queue = Queue(10000) + segment_manager = thread_manager.queue_manager( + _segment_job, options.segment_threads, + connection_maker=create_connection) + segment_queue = segment_manager.queue if old_manifest: scontainer, sprefix = old_manifest.split('/', 1) scontainer = unquote(scontainer) @@ -1118,27 +1017,20 @@ def st_upload(parser, args, print_queue, error_queue): {'delete': True, 'container': scont, 'obj': sobj}) if not segment_queue.empty(): - segment_threads = [ - QueueFunctionThread( - segment_queue, - _segment_job, create_connection()) - for _junk in xrange(options.segment_threads)] - for thread in segment_threads: - thread.start() - shutdown_worker_threads(segment_queue, segment_threads) - put_errors_from_threads(segment_threads, error_queue) + with segment_manager: + pass if options.verbose: if conn.attempts > 1: - print_queue.put( - '%s [after %d attempts]' % (obj, conn.attempts)) + thread_manager.print_msg('%s [after %d attempts]', obj, + conn.attempts) else: - print_queue.put(obj) + thread_manager.print_msg(obj) except OSError as err: if err.errno != ENOENT: raise - error_queue.put('Local file %s not found' % repr(path)) + thread_manager.error('Local file %r not found', path) - def _upload_dir(path): + def _upload_dir(path, object_queue): names = listdir(path) if not names: object_queue.put({'path': path, 'dir_marker': True}) @@ -1146,17 +1038,13 @@ def st_upload(parser, args, print_queue, error_queue): for name in listdir(path): subpath = join(path, name) if isdir(subpath): - _upload_dir(subpath) + _upload_dir(subpath, object_queue) else: object_queue.put({'path': subpath}) create_connection = lambda: get_conn(options) - object_threads = [ - QueueFunctionThread(object_queue, _object_job, create_connection()) - for _junk in xrange(options.object_threads)] - for thread in object_threads: - thread.start() conn = create_connection() + # Try to create the container, just in case it doesn't exist. If this # fails, it might just be because the user doesn't have container PUT # permissions, so we'll ignore any error. 
If there's really a problem, @@ -1174,34 +1062,38 @@ def st_upload(parser, args, print_queue, error_queue): if msg: msg += ': ' msg += err.http_response_content[:60] - error_queue.put( - 'Error trying to create container %r: %s' % (args[0], msg)) + thread_manager.error( + 'Error trying to create container %r: %s', args[0], + msg) except Exception as err: - error_queue.put( - 'Error trying to create container %r: %s' % (args[0], err)) - - try: - for arg in args[1:]: - if isdir(arg): - _upload_dir(arg) - else: - object_queue.put({'path': arg}) - except ClientException as err: - if err.http_status != 404: - raise - error_queue.put('Account not found') - finally: - shutdown_worker_threads(object_queue, object_threads) - put_errors_from_threads(object_threads, error_queue) + thread_manager.error( + 'Error trying to create container %r: %s', args[0], + err) + + object_manager = thread_manager.queue_manager( + _object_job, options.object_threads, + connection_maker=create_connection) + with object_manager as object_queue: + try: + for arg in args[1:]: + if isdir(arg): + _upload_dir(arg, object_queue) + else: + object_queue.put({'path': arg}) + except ClientException as err: + if err.http_status != 404: + raise + thread_manager.error('Account not found') -def split_headers(options, prefix='', error_queue=None): +def split_headers(options, prefix='', thread_manager=None): """ Splits 'Key: Value' strings and returns them as a dictionary. :param options: An array of 'Key: Value' strings :param prefix: String to prepend to all of the keys in the dictionary. - :param error_queue: Queue for thread safe error reporting. + :param thread_manager: MultiThreadingManager for thread safe error + reporting. """ headers = {} for item in options: @@ -1211,8 +1103,8 @@ def split_headers(options, prefix='', error_queue=None): else: error_string = "Metadata parameter %s must contain a ':'.\n%s" \ % (item, st_post_help) - if error_queue: - error_queue.put(error_string) + if thread_manager: + thread_manager.error(error_string) else: exit(error_string) return headers @@ -1391,7 +1283,7 @@ Examples: help='Specify a CA bundle file to use in verifying a ' 'TLS (https) server certificate. 
' 'Defaults to env[OS_CACERT]') - default_val = utils.config_true_value(environ.get('SWIFTCLIENT_INSECURE')) + default_val = config_true_value(environ.get('SWIFTCLIENT_INSECURE')) parser.add_option('--insecure', action="store_true", dest="insecure", default=default_val, @@ -1422,38 +1314,16 @@ Examples: logger = logging.getLogger("swiftclient") logging.basicConfig(level=logging.DEBUG) - print_queue = Queue(10000) - - def _print(item): - if isinstance(item, unicode): - item = item.encode('utf8') - print item + had_error = False - print_thread = QueueFunctionThread(print_queue, _print) - print_thread.start() - - error_count = 0 - error_queue = Queue(10000) - - def _error(item): - global error_count - error_count += 1 - if isinstance(item, unicode): - item = item.encode('utf8') - print >> stderr, item + with MultiThreadingManager() as thread_manager: + parser.usage = globals()['st_%s_help' % args[0]] + try: + globals()['st_%s' % args[0]](parser, argv[1:], thread_manager) + except (ClientException, HTTPException, socket.error) as err: + thread_manager.error(str(err)) - error_thread = QueueFunctionThread(error_queue, _error) - error_thread.start() + had_error = thread_manager.error_count - parser.usage = globals()['st_%s_help' % args[0]] - try: - globals()['st_%s' % args[0]](parser, argv[1:], print_queue, - error_queue) - except (ClientException, HTTPException, socket.error) as err: - error_queue.put(str(err)) - finally: - shutdown_worker_threads(print_queue, [print_thread]) - shutdown_worker_threads(error_queue, [error_thread]) - - if error_count: + if had_error: exit(1) diff --git a/doc/source/conf.py b/doc/source/conf.py index b1baab4..356b89e 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -15,8 +15,6 @@ import sys import os -import swiftclient - # If extensions (or modules to document with autodoc) are in another directory, # add these directories to sys.path here. If the directory is relative to the @@ -36,6 +34,9 @@ sys.path.insert(0, ROOT) extensions = ['sphinx.ext.autodoc', 'sphinx.ext.doctest', 'sphinx.ext.todo', 'sphinx.ext.coverage'] +autoclass_content = 'both' +autodoc_default_flags = ['members', 'undoc-members', 'show-inheritance'] + # Add any paths that contain templates here, relative to this directory. templates_path = ['_templates'] @@ -50,7 +51,7 @@ master_doc = 'index' # General information about the project. project = u'Swiftclient' -copyright = u'2012 OpenStack, LLC.' +copyright = u'2013 OpenStack, LLC.' # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the diff --git a/doc/source/swiftclient.rst b/doc/source/swiftclient.rst index bc2bac8..8c5a020 100644 --- a/doc/source/swiftclient.rst +++ b/doc/source/swiftclient.rst @@ -4,14 +4,18 @@ swiftclient ============== .. automodule:: swiftclient - :members: - :undoc-members: - :show-inheritance: swiftclient.client ================== .. automodule:: swiftclient.client - :members: - :undoc-members: - :show-inheritance: + +swiftclient.exceptions +====================== + +.. automodule:: swiftclient.exceptions + +swiftclient.multithreading +========================== + +.. 
automodule:: swiftclient.multithreading diff --git a/swiftclient/client.py b/swiftclient/client.py index c9012be..e50e674 100644 --- a/swiftclient/client.py +++ b/swiftclient/client.py @@ -28,6 +28,8 @@ from urlparse import urlparse, urlunparse from httplib import HTTPException, HTTPConnection, HTTPSConnection from time import sleep +from swiftclient.exceptions import ClientException, InvalidHeadersException + try: from swiftclient.https_connection import HTTPSConnectionNoSSLComp except ImportError: @@ -102,64 +104,6 @@ except ImportError: from json import loads as json_loads -class InvalidHeadersException(Exception): - pass - - -class ClientException(Exception): - - def __init__(self, msg, http_scheme='', http_host='', http_port='', - http_path='', http_query='', http_status=0, http_reason='', - http_device='', http_response_content=''): - Exception.__init__(self, msg) - self.msg = msg - self.http_scheme = http_scheme - self.http_host = http_host - self.http_port = http_port - self.http_path = http_path - self.http_query = http_query - self.http_status = http_status - self.http_reason = http_reason - self.http_device = http_device - self.http_response_content = http_response_content - - def __str__(self): - a = self.msg - b = '' - if self.http_scheme: - b += '%s://' % self.http_scheme - if self.http_host: - b += self.http_host - if self.http_port: - b += ':%s' % self.http_port - if self.http_path: - b += self.http_path - if self.http_query: - b += '?%s' % self.http_query - if self.http_status: - if b: - b = '%s %s' % (b, self.http_status) - else: - b = str(self.http_status) - if self.http_reason: - if b: - b = '%s %s' % (b, self.http_reason) - else: - b = '- %s' % self.http_reason - if self.http_device: - if b: - b = '%s: device %s' % (b, self.http_device) - else: - b = 'device %s' % self.http_device - if self.http_response_content: - if len(self.http_response_content) <= 60: - b += ' %s' % self.http_response_content - else: - b += ' [first 60 chars of response] %s' \ - % self.http_response_content[:60] - return b and '%s: %s' % (a, b) or a - - def http_connection(url, proxy=None, ssl_compression=True): """ Make an HTTPConnection or HTTPSConnection diff --git a/swiftclient/exceptions.py b/swiftclient/exceptions.py new file mode 100644 index 0000000..fe730e5 --- /dev/null +++ b/swiftclient/exceptions.py @@ -0,0 +1,72 @@ +# Copyright (c) 2010-2013 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +class ClientException(Exception): + + def __init__(self, msg, http_scheme='', http_host='', http_port='', + http_path='', http_query='', http_status=0, http_reason='', + http_device='', http_response_content=''): + Exception.__init__(self, msg) + self.msg = msg + self.http_scheme = http_scheme + self.http_host = http_host + self.http_port = http_port + self.http_path = http_path + self.http_query = http_query + self.http_status = http_status + self.http_reason = http_reason + self.http_device = http_device + self.http_response_content = http_response_content + + def __str__(self): + a = self.msg + b = '' + if self.http_scheme: + b += '%s://' % self.http_scheme + if self.http_host: + b += self.http_host + if self.http_port: + b += ':%s' % self.http_port + if self.http_path: + b += self.http_path + if self.http_query: + b += '?%s' % self.http_query + if self.http_status: + if b: + b = '%s %s' % (b, self.http_status) + else: + b = str(self.http_status) + if self.http_reason: + if b: + b = '%s %s' % (b, self.http_reason) + else: + b = '- %s' % self.http_reason + if self.http_device: + if b: + b = '%s: device %s' % (b, self.http_device) + else: + b = 'device %s' % self.http_device + if self.http_response_content: + if len(self.http_response_content) <= 60: + b += ' %s' % self.http_response_content + else: + b += ' [first 60 chars of response] %s' \ + % self.http_response_content[:60] + return b and '%s: %s' % (a, b) or a + + +class InvalidHeadersException(Exception): + pass diff --git a/swiftclient/multithreading.py b/swiftclient/multithreading.py new file mode 100644 index 0000000..890a789 --- /dev/null +++ b/swiftclient/multithreading.py @@ -0,0 +1,241 @@ +# Copyright (c) 2010-2012 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import sys +from time import sleep +from Queue import Queue +from threading import Thread +from traceback import format_exception + +from swiftclient.exceptions import ClientException + + +class StopWorkerThreadSignal(object): + pass + + +class QueueFunctionThread(Thread): + """ + Calls `func`` for each item in ``queue``; ``func`` is called with a + de-queued item as the first arg followed by ``*args`` and ``**kwargs``. + + Any exceptions raised by ``func`` are stored in :attr:`self.exc_infos`. + + If the optional kwarg ``store_results`` is specified, it must be a list and + each result of invoking ``func`` will be appended to that list. + + Putting a :class:`StopWorkerThreadSignal` instance into queue will cause + this thread to exit. + """ + + def __init__(self, queue, func, *args, **kwargs): + """ + :param queue: A :class:`Queue` object from which work jobs will be + pulled. + :param func: A callable which will be invoked with a dequeued item + followed by ``*args`` and ``**kwargs``. + :param \*args: Optional positional arguments for ``func``. + :param \*\*kwargs: Optional kwargs for func. If the kwarg + ``store_results`` is specified, its value must be a + list, and every result from invoking ``func`` will + be appended to the supplied list. 
The kwarg + ``store_results`` will not be passed into ``func``. + """ + Thread.__init__(self) + self.queue = queue + self.func = func + self.args = args + self.kwargs = kwargs + self.exc_infos = [] + self.store_results = kwargs.pop('store_results', None) + + def run(self): + while True: + item = self.queue.get() + if isinstance(item, StopWorkerThreadSignal): + break + try: + result = self.func(item, *self.args, **self.kwargs) + if self.store_results is not None: + self.store_results.append(result) + except Exception: + self.exc_infos.append(sys.exc_info()) + + +class QueueFunctionManager(object): + """ + A context manager to handle the life-cycle of a single :class:`Queue` + and a list of associated :class:`QueueFunctionThread` instances. + + This class is not usually instantiated directly. Instead, call the + :meth:`MultiThreadingManager.queue_manager` object method, + which will return an instance of this class. + + When entering the context, ``thread_count`` :class:`QueueFunctionThread` + instances are created and started. The input queue is returned. Inside + the context, any work item put into the queue will get worked on by one of + the :class:`QueueFunctionThread` instances. + + When the context is exited, all threads are sent a + :class:`StopWorkerThreadSignal` instance and then all threads are waited + upon. Finally, any exceptions from any of the threads are reported on via + the supplied ``thread_manager``'s :meth:`error` method. If an + ``error_counter`` list was supplied on instantiation, its first element is + incremented once for every exception which occurred. + """ + + def __init__(self, func, thread_count, thread_manager, thread_args=None, + thread_kwargs=None, error_counter=None, + connection_maker=None): + """ + :param func: The worker function which will be passed into each + :class:`QueueFunctionThread`'s constructor. + :param thread_count: The number of worker threads to run. + :param thread_manager: An instance of :class:`MultiThreadingManager`. + :param thread_args: Optional positional arguments to be passed into + each invocation of ``func`` after the de-queued + work item. + :param thread_kwargs: Optional keyword arguments to be passed into each + invocation of ``func``. If a list is supplied as + the ``store_results`` keyword argument, it will + be filled with every result of invoking ``func`` + in all threads. + :param error_counter: Optional list containing one integer. If + supplied, the list's first element will be + incremented once for each exception in any + thread. This happens only when exiting the + context. + :param connection_maker: Optional callable. If supplied, this callable + will be invoked once per created thread, and + the result will be passed into func after the + de-queued work item but before ``thread_args`` + and ``thread_kwargs``. This is used to ensure + each thread has its own connection to Swift. 
+ """ + self.func = func + self.thread_count = thread_count + self.thread_manager = thread_manager + self.error_counter = error_counter + self.connection_maker = connection_maker + self.queue = Queue(10000) + self.thread_list = [] + self.thread_args = thread_args if thread_args else () + self.thread_kwargs = thread_kwargs if thread_kwargs else {} + + def __enter__(self): + for _junk in xrange(self.thread_count): + if self.connection_maker: + thread_args = (self.connection_maker(),) + self.thread_args + else: + thread_args = self.thread_args + qf_thread = QueueFunctionThread(self.queue, self.func, + *thread_args, **self.thread_kwargs) + qf_thread.start() + self.thread_list.append(qf_thread) + return self.queue + + def __exit__(self, exc_type, exc_value, traceback): + for thread in [t for t in self.thread_list if t.isAlive()]: + self.queue.put(StopWorkerThreadSignal()) + + while any(map(QueueFunctionThread.is_alive, self.thread_list)): + sleep(0.05) + + for thread in self.thread_list: + for info in thread.exc_infos: + if self.error_counter: + self.error_counter[0] += 1 + if isinstance(info[1], ClientException): + self.thread_manager.error(str(info[1])) + else: + self.thread_manager.error(''.join(format_exception(*info))) + + +class MultiThreadingManager(object): + """ + One object to manage context for multi-threading. This should make + bin/swift less error-prone and allow us to test this code. + + This object is a context manager and returns itself into the context. When + entering the context, two printing threads are created (see below) and they + are waited on and cleaned up when exiting the context. + + A convenience method, :meth:`queue_manager`, is provided to create a + :class:`QueueFunctionManager` context manager (a thread-pool with an + associated input queue for work items). + + Also, thread-safe printing to two streams is provided. The + :meth:`print_msg` method will print to the supplied ``print_stream`` + (defaults to ``sys.stdout``) and the :meth:`error` method will print to the + supplied ``error_stream`` (defaults to ``sys.stderr``). Both of these + printing methods will format the given string with any supplied ``*args`` + (a la printf) and encode the result to utf8 if necessary. + + The attribute :attr:`self.error_count` is incremented once per error + message printed, so an application can tell if any worker threads + encountered exceptions or otherwise called :meth:`error` on this instance. + The swift command-line tool uses this to exit non-zero if any error strings + were printed. + """ + + def __init__(self, print_stream=sys.stdout, error_stream=sys.stderr): + """ + :param print_stream: The stream to which :meth:`print_msg` sends + formatted messages, encoded to utf8 if necessary. + :param error_stream: The stream to which :meth:`error` sends formatted + messages, encoded to utf8 if necessary. 
+ """ + self.print_stream = print_stream + self.printer = QueueFunctionManager(self._print, 1, self) + self.error_stream = error_stream + self.error_printer = QueueFunctionManager(self._print_error, 1, self) + self.error_count = 0 + + def __enter__(self): + self.printer.__enter__() + self.error_printer.__enter__() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.error_printer.__exit__(exc_type, exc_value, traceback) + self.printer.__exit__(exc_type, exc_value, traceback) + + def queue_manager(self, func, thread_count, *args, **kwargs): + connection_maker = kwargs.pop('connection_maker', None) + error_counter = kwargs.pop('error_counter', None) + return QueueFunctionManager(func, thread_count, self, thread_args=args, + thread_kwargs=kwargs, + connection_maker=connection_maker, + error_counter=error_counter) + + def print_msg(self, msg, *fmt_args): + if fmt_args: + msg = msg % fmt_args + self.printer.queue.put(msg) + + def error(self, msg, *fmt_args): + if fmt_args: + msg = msg % fmt_args + self.error_printer.queue.put(msg) + + def _print(self, item, stream=None): + if stream is None: + stream = self.print_stream + if isinstance(item, unicode): + item = item.encode('utf8') + print >>stream, item + + def _print_error(self, item): + self.error_count += 1 + return self._print(item, stream=self.error_stream) diff --git a/swiftclient/utils.py b/swiftclient/utils.py index f309d29..33d89a5 100644 --- a/swiftclient/utils.py +++ b/swiftclient/utils.py @@ -12,7 +12,6 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. - """Miscellaneous utility functions for use with Swift.""" TRUE_VALUES = set(('true', '1', 'yes', 'on', 't', 'y')) diff --git a/tests/test_multithreading.py b/tests/test_multithreading.py new file mode 100644 index 0000000..5a28582 --- /dev/null +++ b/tests/test_multithreading.py @@ -0,0 +1,334 @@ +# Copyright (c) 2010-2013 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import time +import mock +import testtools +import threading +from cStringIO import StringIO +from Queue import Queue, Empty + +from swiftclient import multithreading as mt +from swiftclient.exceptions import ClientException + + +class ThreadTestCase(testtools.TestCase): + def setUp(self): + super(ThreadTestCase, self).setUp() + self.got_args_kwargs = Queue() + self.starting_thread_count = threading.active_count() + + def _func(self, q_item, *args, **kwargs): + self.got_items.put(q_item) + self.got_args_kwargs.put((args, kwargs)) + + if q_item == 'go boom': + raise Exception('I went boom!') + if q_item == 'c boom': + raise ClientException( + 'Client Boom', http_scheme='http', http_host='192.168.22.1', + http_port=80, http_path='/booze', http_status=404, + http_reason='to much', http_response_content='no sir!') + + return 'best result EVAR!' 
+ + def assertQueueContains(self, queue, expected_contents): + got_contents = [] + try: + while True: + got_contents.append(queue.get(timeout=0.1)) + except Empty: + pass + if isinstance(expected_contents, set): + got_contents = set(got_contents) + self.assertEqual(expected_contents, got_contents) + + +class TestQueueFunctionThread(ThreadTestCase): + def setUp(self): + super(TestQueueFunctionThread, self).setUp() + + self.input_queue = Queue() + self.got_items = Queue() + self.stored_results = [] + + self.qft = mt.QueueFunctionThread(self.input_queue, self._func, + 'one_arg', 'two_arg', + red_fish='blue_arg', + store_results=self.stored_results) + self.qft.start() + + def tearDown(self): + if self.qft.is_alive(): + self.finish_up_thread() + + super(TestQueueFunctionThread, self).tearDown() + + def finish_up_thread(self): + self.input_queue.put(mt.StopWorkerThreadSignal()) + while self.qft.is_alive(): + time.sleep(0.05) + + def test_plumbing_and_store_results(self): + self.input_queue.put('abc') + self.input_queue.put(123) + self.finish_up_thread() + + self.assertQueueContains(self.got_items, ['abc', 123]) + self.assertQueueContains(self.got_args_kwargs, [ + (('one_arg', 'two_arg'), {'red_fish': 'blue_arg'}), + (('one_arg', 'two_arg'), {'red_fish': 'blue_arg'})]) + self.assertEqual(self.stored_results, + ['best result EVAR!', 'best result EVAR!']) + + def test_exception_handling(self): + self.input_queue.put('go boom') + self.input_queue.put('ok') + self.input_queue.put('go boom') + self.finish_up_thread() + + self.assertQueueContains(self.got_items, + ['go boom', 'ok', 'go boom']) + self.assertEqual(len(self.qft.exc_infos), 2) + self.assertEqual(Exception, self.qft.exc_infos[0][0]) + self.assertEqual(Exception, self.qft.exc_infos[1][0]) + self.assertEqual(('I went boom!',), self.qft.exc_infos[0][1].args) + self.assertEqual(('I went boom!',), self.qft.exc_infos[1][1].args) + + +class TestQueueFunctionManager(ThreadTestCase): + def setUp(self): + super(TestQueueFunctionManager, self).setUp() + self.thread_manager = mock.create_autospec( + mt.MultiThreadingManager, spec_set=True, instance=True) + self.thread_count = 4 + self.error_counter = [0] + self.got_items = Queue() + self.stored_results = [] + self.qfq = mt.QueueFunctionManager( + self._func, self.thread_count, self.thread_manager, + thread_args=('1arg', '2arg'), + thread_kwargs={'a': 'b', 'store_results': self.stored_results}, + error_counter=self.error_counter, + connection_maker=self.connection_maker) + + def connection_maker(self): + return 'yup, I made a connection' + + def test_context_manager_without_error_counter(self): + self.qfq = mt.QueueFunctionManager( + self._func, self.thread_count, self.thread_manager, + thread_args=('1arg', '2arg'), + thread_kwargs={'a': 'b', 'store_results': self.stored_results}, + connection_maker=self.connection_maker) + + with self.qfq as input_queue: + self.assertEqual(self.starting_thread_count + self.thread_count, + threading.active_count()) + input_queue.put('go boom') + + self.assertEqual(self.starting_thread_count, threading.active_count()) + error_strs = map(str, self.thread_manager.error.call_args_list) + self.assertEqual(1, len(error_strs)) + self.assertTrue('Exception: I went boom!' 
in error_strs[0]) + + def test_context_manager_without_conn_maker_or_error_counter(self): + self.qfq = mt.QueueFunctionManager( + self._func, self.thread_count, self.thread_manager, + thread_args=('1arg', '2arg'), thread_kwargs={'a': 'b'}) + + with self.qfq as input_queue: + self.assertEqual(self.starting_thread_count + self.thread_count, + threading.active_count()) + for i in xrange(20): + input_queue.put('slap%d' % i) + + self.assertEqual(self.starting_thread_count, threading.active_count()) + self.assertEqual([], self.thread_manager.error.call_args_list) + self.assertEqual(0, self.error_counter[0]) + self.assertQueueContains(self.got_items, + set(['slap%d' % i for i in xrange(20)])) + self.assertQueueContains( + self.got_args_kwargs, + [(('1arg', '2arg'), {'a': 'b'})] * 20) + self.assertEqual(self.stored_results, []) + + def test_context_manager_with_exceptions(self): + with self.qfq as input_queue: + self.assertEqual(self.starting_thread_count + self.thread_count, + threading.active_count()) + for i in xrange(20): + input_queue.put('item%d' % i if i % 2 == 0 else 'go boom') + + self.assertEqual(self.starting_thread_count, threading.active_count()) + error_strs = map(str, self.thread_manager.error.call_args_list) + self.assertEqual(10, len(error_strs)) + self.assertTrue(all(['Exception: I went boom!' in s for s in + error_strs])) + self.assertEqual(10, self.error_counter[0]) + expected_items = set(['go boom'] + ['item%d' % i for i in xrange(20) + if i % 2 == 0]) + self.assertQueueContains(self.got_items, expected_items) + self.assertQueueContains( + self.got_args_kwargs, + [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20) + self.assertEqual(self.stored_results, ['best result EVAR!'] * 10) + + def test_context_manager_with_client_exceptions(self): + with self.qfq as input_queue: + self.assertEqual(self.starting_thread_count + self.thread_count, + threading.active_count()) + for i in xrange(20): + input_queue.put('item%d' % i if i % 2 == 0 else 'c boom') + + self.assertEqual(self.starting_thread_count, threading.active_count()) + error_strs = map(str, self.thread_manager.error.call_args_list) + self.assertEqual(10, len(error_strs)) + stringification = 'Client Boom: ' \ + 'http://192.168.22.1:80/booze 404 to much no sir!' 
+ self.assertTrue(all([stringification in s for s in error_strs])) + self.assertEqual(10, self.error_counter[0]) + expected_items = set(['c boom'] + ['item%d' % i for i in xrange(20) + if i % 2 == 0]) + self.assertQueueContains(self.got_items, expected_items) + self.assertQueueContains( + self.got_args_kwargs, + [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20) + self.assertEqual(self.stored_results, ['best result EVAR!'] * 10) + + def test_context_manager_with_connection_maker(self): + with self.qfq as input_queue: + self.assertEqual(self.starting_thread_count + self.thread_count, + threading.active_count()) + for i in xrange(20): + input_queue.put('item%d' % i) + + self.assertEqual(self.starting_thread_count, threading.active_count()) + self.assertEqual([], self.thread_manager.error.call_args_list) + self.assertEqual(0, self.error_counter[0]) + self.assertQueueContains(self.got_items, + set(['item%d' % i for i in xrange(20)])) + self.assertQueueContains( + self.got_args_kwargs, + [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20) + self.assertEqual(self.stored_results, ['best result EVAR!'] * 20) + + +class TestMultiThreadingManager(ThreadTestCase): + + @mock.patch('swiftclient.multithreading.QueueFunctionManager') + def test_instantiation(self, mock_qfq): + thread_manager = mt.MultiThreadingManager() + + self.assertEqual([ + mock.call(thread_manager._print, 1, thread_manager), + mock.call(thread_manager._print_error, 1, thread_manager), + ], mock_qfq.call_args_list) + + # These contexts don't get entered into until the + # MultiThreadingManager's context is entered. + self.assertEqual([], thread_manager.printer.__enter__.call_args_list) + self.assertEqual([], + thread_manager.error_printer.__enter__.call_args_list) + + # Test default values for the streams. + self.assertEqual(sys.stdout, thread_manager.print_stream) + self.assertEqual(sys.stderr, thread_manager.error_stream) + + @mock.patch('swiftclient.multithreading.QueueFunctionManager') + def test_queue_manager_no_args(self, mock_qfq): + thread_manager = mt.MultiThreadingManager() + + mock_qfq.reset_mock() + mock_qfq.return_value = 'slap happy!' 
+
+        self.assertEqual(
+            'slap happy!',
+            thread_manager.queue_manager(self._func, 88))
+
+        self.assertEqual([
+            mock.call(self._func, 88, thread_manager, thread_args=(),
+                      thread_kwargs={}, connection_maker=None,
+                      error_counter=None)
+        ], mock_qfq.call_args_list)
+
+    @mock.patch('swiftclient.multithreading.QueueFunctionManager')
+    def test_queue_manager_with_args(self, mock_qfq):
+        thread_manager = mt.MultiThreadingManager()
+
+        mock_qfq.reset_mock()
+        mock_qfq.return_value = 'do run run'
+
+        self.assertEqual(
+            'do run run',
+            thread_manager.queue_manager(self._func, 88, 'fun', times='are',
+                                         connection_maker='abc', to='be had',
+                                         error_counter='def'))
+
+        self.assertEqual([
+            mock.call(self._func, 88, thread_manager, thread_args=('fun',),
+                      thread_kwargs={'times': 'are', 'to': 'be had'},
+                      connection_maker='abc', error_counter='def')
+        ], mock_qfq.call_args_list)
+
+    def test_printers(self):
+        out_stream = StringIO()
+        err_stream = StringIO()
+
+        with mt.MultiThreadingManager(
+                print_stream=out_stream,
+                error_stream=err_stream) as thread_manager:
+
+            # Sanity-checking these gives power to the previous test which
+            # looked at the default values of thread_manager.print/error_stream
+            self.assertEqual(out_stream, thread_manager.print_stream)
+            self.assertEqual(err_stream, thread_manager.error_stream)
+
+            self.assertEqual(self.starting_thread_count + 2,
+                             threading.active_count())
+
+            thread_manager.print_msg('one-argument')
+            thread_manager.print_msg('one %s, %d fish', 'fish', 88)
+            thread_manager.error('I have %d problems, but a %s is not one',
+                                 99, u'\u062A\u062A')
+            thread_manager.print_msg('some\n%s\nover the %r', 'where',
+                                     u'\u062A\u062A')
+            thread_manager.error('one-error-argument')
+            thread_manager.error('Sometimes\n%.1f%% just\ndoes not\nwork!',
+                                 3.14159)
+
+        self.assertEqual(self.starting_thread_count, threading.active_count())
+
+        out_stream.seek(0)
+        self.assertEqual([
+            'one-argument\n',
+            'one fish, 88 fish\n',
+            'some\n', 'where\n', "over the u'\\u062a\\u062a'\n",
+        ], list(out_stream.readlines()))
+
+        err_stream.seek(0)
+        self.assertEqual([
+            u'I have 99 problems, but a \u062A\u062A is not one\n'.encode(
+                'utf8'),
+            'one-error-argument\n',
+            'Sometimes\n', '3.1% just\n', 'does not\n', 'work!\n',
+        ], list(err_stream.readlines()))
+
+        self.assertEqual(3, thread_manager.error_count)
+
+
+if __name__ == '__main__':
+    testtools.main()
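For readers skimming the diff, the following is a minimal usage sketch of the new API as it is exercised by the tests above. It is not part of the patch: make_connection and delete_object are hypothetical stand-ins for the callables bin/swift actually supplies, and the assumed behaviour (worker threads joined on context exit, a fresh connection passed per call when a connection_maker is given, failures routed through thread_manager.error() and the error_counter) is the behaviour the tests assert.

# Minimal sketch, assuming the MultiThreadingManager/QueueFunctionManager
# behaviour shown in the tests above; make_connection and delete_object are
# hypothetical stand-ins, not functions defined by this patch.
from swiftclient.multithreading import MultiThreadingManager


def make_connection():
    # bin/swift would build a swiftclient.Connection here; any object works
    # for the sketch because the manager only passes it to the worker.
    return object()


def delete_object(name, conn, container=None):
    # Called once per queued item; 'conn' is injected as the second argument
    # because a connection_maker is handed to queue_manager() below.
    pass


failures = [0]

with MultiThreadingManager() as thread_manager:      # printer threads start
    with thread_manager.queue_manager(
            delete_object, 10,                       # 10 worker threads
            container='backups',                     # becomes thread_kwargs
            connection_maker=make_connection,
            error_counter=failures) as object_queue:
        for name in ('a.dat', 'b.dat', 'c.dat'):
            object_queue.put(name)
    # Leaving the inner context drains the queue and joins the workers; any
    # worker exceptions were counted in failures[0] and sent to error().
    thread_manager.print_msg('Deleted %d objects, %d failed.',
                             3 - failures[0], failures[0])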