summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoel Wright <joel.wright@sohonet.com>2014-04-04 21:13:01 +0200
committerJoel Wright <joel.wright@sohonet.com>2014-08-26 14:14:21 +0200
commit24673f8d19fe2f48964f528369081c37e880ec47 (patch)
tree8cd0cabfc9b8d858339da556fe561674dd8bc83a
parentd97ec374cb1ef91c34e49302842e5a151ee3e476 (diff)
downloadpython-swiftclient-24673f8d19fe2f48964f528369081c37e880ec47.tar.gz
Add importable SwiftService incorporating shell.py logic
This patch adds a SwiftService class that incorporates the high level logic from swiftclient/shell.py. It also ports shell.py to use the new class, and updates the code in swiftclient/multithreading.py to allow the SwiftService to be used for multiple operations whilst using only one thread pool. Currently, code that imports swiftclient has to have its own logic for things like creating large objects, parallel uploads, and parallel downloads. This patch adds a SwiftService class that makes that functionality available in Python code as well as through the shell. Change-Id: I08c5796b4c01001d79fd571651c3017c16462ffd Implements: blueprint bin-swift-logic-as-importable-library
-rw-r--r--doc/source/swiftclient.rst5
-rw-r--r--requirements.txt1
-rw-r--r--swiftclient/command_helpers.py181
-rw-r--r--swiftclient/multithreading.py286
-rw-r--r--swiftclient/service.py2062
-rwxr-xr-xswiftclient/shell.py1337
-rw-r--r--tests/unit/test_command_helpers.py117
-rw-r--r--tests/unit/test_multithreading.py377
-rw-r--r--tests/unit/test_shell.py115
9 files changed, 2985 insertions, 1496 deletions
diff --git a/doc/source/swiftclient.rst b/doc/source/swiftclient.rst
index 8c5a020..0a07471 100644
--- a/doc/source/swiftclient.rst
+++ b/doc/source/swiftclient.rst
@@ -10,6 +10,11 @@ swiftclient.client
.. automodule:: swiftclient.client
+swiftclient.service
+===================
+
+.. automodule:: swiftclient.service
+
swiftclient.exceptions
======================
diff --git a/requirements.txt b/requirements.txt
index 3cb4ea2..e7c0d41 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
+futures>=2.1.3
requests>=1.1
simplejson>=2.0.9
six>=1.5.2
diff --git a/swiftclient/command_helpers.py b/swiftclient/command_helpers.py
index ef18636..9a78b9b 100644
--- a/swiftclient/command_helpers.py
+++ b/swiftclient/command_helpers.py
@@ -14,114 +14,167 @@
from swiftclient.utils import prt_bytes
-def stat_account(conn, options, thread_manager):
- items_to_print = []
+POLICY_HEADER_PREFIX = 'x-account-storage-policy-'
+
+
+def stat_account(conn, options):
+ items = []
headers = conn.head_account()
- if options.verbose > 1:
- items_to_print.extend((
+ if options['verbose'] > 1:
+ items.extend([
('StorageURL', conn.url),
('Auth Token', conn.token),
- ))
+ ])
container_count = int(headers.get('x-account-container-count', 0))
object_count = prt_bytes(headers.get('x-account-object-count', 0),
- options.human).lstrip()
+ options['human']).lstrip()
bytes_used = prt_bytes(headers.get('x-account-bytes-used', 0),
- options.human).lstrip()
- items_to_print.extend((
+ options['human']).lstrip()
+ items.extend([
('Account', conn.url.rsplit('/', 1)[-1]),
('Containers', container_count),
('Objects', object_count),
('Bytes', bytes_used),
- ))
+ ])
+
policies = set()
- exclude_policy_headers = []
- ps_header_prefix = 'x-account-storage-policy-'
for header_key, header_value in headers.items():
- if header_key.lower().startswith(ps_header_prefix):
+ if header_key.lower().startswith(POLICY_HEADER_PREFIX):
policy_name = header_key.rsplit('-', 2)[0].split('-', 4)[-1]
policies.add(policy_name)
- exclude_policy_headers.append(header_key)
+
for policy in policies:
- items_to_print.extend((
+ items.extend((
('Objects in policy "' + policy + '"',
- prt_bytes(headers.get(ps_header_prefix + policy + '-object-count',
- 0), options.human).lstrip()),
+ prt_bytes(
+ headers.get(
+ POLICY_HEADER_PREFIX + policy + '-object-count', 0),
+ options['human']
+ ).lstrip()),
('Bytes in policy "' + policy + '"',
- prt_bytes(headers.get(ps_header_prefix + policy + '-bytes-used',
- 0), options.human).lstrip()),
+ prt_bytes(
+ headers.get(
+ POLICY_HEADER_PREFIX + policy + '-bytes-used', 0),
+ options['human']
+ ).lstrip()),
))
- items_to_print.extend(thread_manager.headers_to_items(
+ return items, headers
+
+
+def print_account_stats(items, headers, output_manager):
+ exclude_policy_headers = []
+ for header_key, header_value in headers.items():
+ if header_key.lower().startswith(POLICY_HEADER_PREFIX):
+ exclude_policy_headers.append(header_key)
+
+ items.extend(headers_to_items(
headers, meta_prefix='x-account-meta-',
exclude_headers=([
'content-length', 'date',
'x-account-container-count',
'x-account-object-count',
'x-account-bytes-used'] + exclude_policy_headers)))
+
# line up the items nicely
- offset = max(len(item) for item, value in items_to_print)
- thread_manager.print_items(items_to_print, offset=offset)
+ offset = max(len(item) for item, value in items)
+ output_manager.print_items(items, offset=offset)
-def stat_container(conn, options, args, thread_manager):
- headers = conn.head_container(args[0])
- if options.verbose > 1:
- path = '%s/%s' % (conn.url, args[0])
- thread_manager.print_items((
+def stat_container(conn, options, container):
+ headers = conn.head_container(container)
+ items = []
+ if options['verbose'] > 1:
+ path = '%s/%s' % (conn.url, container)
+ items.extend([
('URL', path),
- ('Auth Token', conn.token),
- ))
+ ('Auth Token', conn.token)
+ ])
object_count = prt_bytes(
headers.get('x-container-object-count', 0),
- options.human).lstrip()
+ options['human']).lstrip()
bytes_used = prt_bytes(headers.get('x-container-bytes-used', 0),
- options.human).lstrip()
- thread_manager.print_items((
+ options['human']).lstrip()
+ items.extend([
('Account', conn.url.rsplit('/', 1)[-1]),
- ('Container', args[0]),
+ ('Container', container),
('Objects', object_count),
('Bytes', bytes_used),
('Read ACL', headers.get('x-container-read', '')),
('Write ACL', headers.get('x-container-write', '')),
('Sync To', headers.get('x-container-sync-to', '')),
- ('Sync Key', headers.get('x-container-sync-key', '')),
+ ('Sync Key', headers.get('x-container-sync-key', ''))
+ ])
+ return items, headers
+
+
+def print_container_stats(items, headers, output_manager):
+ items.extend(headers_to_items(
+ headers,
+ meta_prefix='x-container-meta-',
+ exclude_headers=(
+ 'content-length', 'date',
+ 'x-container-object-count',
+ 'x-container-bytes-used',
+ 'x-container-read',
+ 'x-container-write',
+ 'x-container-sync-to',
+ 'x-container-sync-key'
+ )
))
- thread_manager.print_headers(headers,
- meta_prefix='x-container-meta-',
- exclude_headers=(
- 'content-length', 'date',
- 'x-container-object-count',
- 'x-container-bytes-used',
- 'x-container-read',
- 'x-container-write',
- 'x-container-sync-to',
- 'x-container-sync-key'))
-
-
-def stat_object(conn, options, args, thread_manager):
- headers = conn.head_object(args[0], args[1])
- if options.verbose > 1:
- path = '%s/%s/%s' % (conn.url, args[0], args[1])
- thread_manager.print_items((
+ # line up the items nicely
+ offset = max(len(item) for item, value in items)
+ output_manager.print_items(items, offset=offset)
+
+
+def stat_object(conn, options, container, obj):
+ headers = conn.head_object(container, obj)
+ items = []
+ if options['verbose'] > 1:
+ path = '%s/%s/%s' % (conn.url, container, obj)
+ items.extend([
('URL', path),
- ('Auth Token', conn.token),
- ))
+ ('Auth Token', conn.token)
+ ])
content_length = prt_bytes(headers.get('content-length', 0),
- options.human).lstrip()
- thread_manager.print_items((
+ options['human']).lstrip()
+ items.extend([
('Account', conn.url.rsplit('/', 1)[-1]),
- ('Container', args[0]),
- ('Object', args[1]),
+ ('Container', container),
+ ('Object', obj),
('Content Type', headers.get('content-type')),
('Content Length', content_length),
('Last Modified', headers.get('last-modified')),
('ETag', headers.get('etag')),
- ('Manifest', headers.get('x-object-manifest')),
- ), skip_missing=True)
- thread_manager.print_headers(headers,
- meta_prefix='x-object-meta-',
- exclude_headers=(
- 'content-type', 'content-length',
- 'last-modified', 'etag', 'date',
- 'x-object-manifest'))
+ ('Manifest', headers.get('x-object-manifest'))
+ ])
+ return items, headers
+
+
+def print_object_stats(items, headers, output_manager):
+ items.extend(headers_to_items(
+ headers,
+ meta_prefix='x-object-meta-',
+ exclude_headers=(
+ 'content-type', 'content-length',
+ 'last-modified', 'etag', 'date',
+ 'x-object-manifest')
+ ))
+ # line up the items nicely
+ offset = max(len(item) for item, value in items)
+ output_manager.print_items(items, offset=offset, skip_missing=True)
+
+
+def headers_to_items(headers, meta_prefix='', exclude_headers=None):
+ exclude_headers = exclude_headers or []
+ other_items = []
+ meta_items = []
+ for key, value in headers.items():
+ if key not in exclude_headers:
+ if key.startswith(meta_prefix):
+ meta_key = 'Meta %s' % key[len(meta_prefix):].title()
+ meta_items.append((meta_key, value))
+ else:
+ other_items.append((key.title(), value))
+ return meta_items + other_items
diff --git a/swiftclient/multithreading.py b/swiftclient/multithreading.py
index d187091..a2dcd71 100644
--- a/swiftclient/multithreading.py
+++ b/swiftclient/multithreading.py
@@ -15,171 +15,21 @@
from __future__ import print_function
-from itertools import chain
import six
import sys
-from time import sleep
-from six.moves.queue import Queue
-from threading import Thread
-from traceback import format_exception
-from swiftclient.exceptions import ClientException
+from concurrent.futures import ThreadPoolExecutor
+from six.moves.queue import PriorityQueue
-class StopWorkerThreadSignal(object):
- pass
-
-
-class QueueFunctionThread(Thread):
- """
- Calls `func`` for each item in ``queue``; ``func`` is called with a
- de-queued item as the first arg followed by ``*args`` and ``**kwargs``.
-
- Any exceptions raised by ``func`` are stored in :attr:`self.exc_infos`.
-
- If the optional kwarg ``store_results`` is specified, it must be a list and
- each result of invoking ``func`` will be appended to that list.
-
- Putting a :class:`StopWorkerThreadSignal` instance into queue will cause
- this thread to exit.
- """
-
- def __init__(self, queue, func, *args, **kwargs):
- """
- :param queue: A :class:`Queue` object from which work jobs will be
- pulled.
- :param func: A callable which will be invoked with a dequeued item
- followed by ``*args`` and ``**kwargs``.
- :param \*args: Optional positional arguments for ``func``.
- :param \*\*kwargs: Optional kwargs for func. If the kwarg
- ``store_results`` is specified, its value must be a
- list, and every result from invoking ``func`` will
- be appended to the supplied list. The kwarg
- ``store_results`` will not be passed into ``func``.
- """
- Thread.__init__(self)
- self.queue = queue
- self.func = func
- self.args = args
- self.kwargs = kwargs
- self.exc_infos = []
- self.store_results = kwargs.pop('store_results', None)
-
- def run(self):
- while True:
- item = self.queue.get()
- if isinstance(item, StopWorkerThreadSignal):
- break
- try:
- result = self.func(item, *self.args, **self.kwargs)
- if self.store_results is not None:
- self.store_results.append(result)
- except Exception:
- self.exc_infos.append(sys.exc_info())
-
-
-class QueueFunctionManager(object):
+class OutputManager(object):
"""
- A context manager to handle the life-cycle of a single :class:`Queue`
- and a list of associated :class:`QueueFunctionThread` instances.
-
- This class is not usually instantiated directly. Instead, call the
- :meth:`MultiThreadingManager.queue_manager` object method,
- which will return an instance of this class.
-
- When entering the context, ``thread_count`` :class:`QueueFunctionThread`
- instances are created and started. The input queue is returned. Inside
- the context, any work item put into the queue will get worked on by one of
- the :class:`QueueFunctionThread` instances.
-
- When the context is exited, all threads are sent a
- :class:`StopWorkerThreadSignal` instance and then all threads are waited
- upon. Finally, any exceptions from any of the threads are reported on via
- the supplied ``thread_manager``'s :meth:`error` method. If an
- ``error_counter`` list was supplied on instantiation, its first element is
- incremented once for every exception which occurred.
- """
-
- def __init__(self, func, thread_count, thread_manager, thread_args=None,
- thread_kwargs=None, error_counter=None,
- connection_maker=None):
- """
- :param func: The worker function which will be passed into each
- :class:`QueueFunctionThread`'s constructor.
- :param thread_count: The number of worker threads to run.
- :param thread_manager: An instance of :class:`MultiThreadingManager`.
- :param thread_args: Optional positional arguments to be passed into
- each invocation of ``func`` after the de-queued
- work item.
- :param thread_kwargs: Optional keyword arguments to be passed into each
- invocation of ``func``. If a list is supplied as
- the ``store_results`` keyword argument, it will
- be filled with every result of invoking ``func``
- in all threads.
- :param error_counter: Optional list containing one integer. If
- supplied, the list's first element will be
- incremented once for each exception in any
- thread. This happens only when exiting the
- context.
- :param connection_maker: Optional callable. If supplied, this callable
- will be invoked once per created thread, and
- the result will be passed into func after the
- de-queued work item but before ``thread_args``
- and ``thread_kwargs``. This is used to ensure
- each thread has its own connection to Swift.
- """
- self.func = func
- self.thread_count = thread_count
- self.thread_manager = thread_manager
- self.error_counter = error_counter
- self.connection_maker = connection_maker
- self.queue = Queue(10000)
- self.thread_list = []
- self.thread_args = thread_args if thread_args else ()
- self.thread_kwargs = thread_kwargs if thread_kwargs else {}
-
- def __enter__(self):
- for _junk in range(self.thread_count):
- if self.connection_maker:
- thread_args = (self.connection_maker(),) + self.thread_args
- else:
- thread_args = self.thread_args
- qf_thread = QueueFunctionThread(self.queue, self.func,
- *thread_args, **self.thread_kwargs)
- qf_thread.start()
- self.thread_list.append(qf_thread)
- return self.queue
-
- def __exit__(self, exc_type, exc_value, traceback):
- for thread in [t for t in self.thread_list if t.isAlive()]:
- self.queue.put(StopWorkerThreadSignal())
-
- while any(map(QueueFunctionThread.is_alive, self.thread_list)):
- sleep(0.05)
-
- for thread in self.thread_list:
- for info in thread.exc_infos:
- if self.error_counter:
- self.error_counter[0] += 1
- if isinstance(info[1], ClientException):
- self.thread_manager.error(str(info[1]))
- else:
- self.thread_manager.error(''.join(format_exception(*info)))
-
-
-class MultiThreadingManager(object):
- """
- One object to manage context for multi-threading. This should make
- bin/swift less error-prone and allow us to test this code.
+ One object to manage and provide helper functions for output.
This object is a context manager and returns itself into the context. When
entering the context, two printing threads are created (see below) and they
are waited on and cleaned up when exiting the context.
- A convenience method, :meth:`queue_manager`, is provided to create a
- :class:`QueueFunctionManager` context manager (a thread-pool with an
- associated input queue for work items).
-
Also, thread-safe printing to two streams is provided. The
:meth:`print_msg` method will print to the supplied ``print_stream``
(defaults to ``sys.stdout``) and the :meth:`error` method will print to the
@@ -198,39 +48,29 @@ class MultiThreadingManager(object):
def __init__(self, print_stream=sys.stdout, error_stream=sys.stderr):
"""
:param print_stream: The stream to which :meth:`print_msg` sends
- formatted messages
+ formatted messages.
:param error_stream: The stream to which :meth:`error` sends formatted
- messages
+ messages.
On Python 2, Unicode messages are encoded to utf8.
"""
self.print_stream = print_stream
- self.printer = QueueFunctionManager(self._print, 1, self)
+ self.print_pool = ThreadPoolExecutor(max_workers=1)
self.error_stream = error_stream
- self.error_printer = QueueFunctionManager(self._print_error, 1, self)
+ self.error_print_pool = ThreadPoolExecutor(max_workers=1)
self.error_count = 0
def __enter__(self):
- self.printer.__enter__()
- self.error_printer.__enter__()
return self
def __exit__(self, exc_type, exc_value, traceback):
- self.error_printer.__exit__(exc_type, exc_value, traceback)
- self.printer.__exit__(exc_type, exc_value, traceback)
-
- def queue_manager(self, func, thread_count, *args, **kwargs):
- connection_maker = kwargs.pop('connection_maker', None)
- error_counter = kwargs.pop('error_counter', None)
- return QueueFunctionManager(func, thread_count, self, thread_args=args,
- thread_kwargs=kwargs,
- connection_maker=connection_maker,
- error_counter=error_counter)
+ self.error_print_pool.__exit__(exc_type, exc_value, traceback)
+ self.print_pool.__exit__(exc_type, exc_value, traceback)
def print_msg(self, msg, *fmt_args):
if fmt_args:
msg = msg % fmt_args
- self.printer.queue.put(msg)
+ self.print_pool.submit(self._print, msg)
def print_items(self, items, offset=DEFAULT_OFFSET, skip_missing=False):
lines = []
@@ -241,36 +81,10 @@ class MultiThreadingManager(object):
lines.append((template % (k, v)).rstrip())
self.print_msg('\n'.join(lines))
- def print_headers(self, headers, meta_prefix='', exclude_headers=None,
- offset=DEFAULT_OFFSET):
- exclude_headers = exclude_headers or []
- meta_headers = []
- other_headers = []
- template = '%%%ds: %%s' % offset
- for key, value in headers.items():
- if key.startswith(meta_prefix):
- meta_key = 'Meta %s' % key[len(meta_prefix):].title()
- meta_headers.append(template % (meta_key, value))
- elif key not in exclude_headers:
- other_headers.append(template % (key.title(), value))
- self.print_msg('\n'.join(chain(meta_headers, other_headers)))
-
- def headers_to_items(self, headers, meta_prefix='', exclude_headers=None):
- exclude_headers = exclude_headers or []
- meta_items = []
- other_items = []
- for key, value in headers.items():
- if key.startswith(meta_prefix):
- meta_key = 'Meta %s' % key[len(meta_prefix):].title()
- meta_items.append((meta_key, value))
- elif key not in exclude_headers:
- other_items.append((key.title(), value))
- return meta_items + other_items
-
def error(self, msg, *fmt_args):
if fmt_args:
msg = msg % fmt_args
- self.error_printer.queue.put(msg)
+ self.error_print_pool.submit(self._print_error, msg)
def _print(self, item, stream=None):
if stream is None:
@@ -282,3 +96,79 @@ class MultiThreadingManager(object):
def _print_error(self, item):
self.error_count += 1
return self._print(item, stream=self.error_stream)
+
+
+class MultiThreadingManager(object):
+ """
+ One object to manage context for multi-threading. This should make
+ bin/swift less error-prone and allow us to test this code.
+ """
+
+ def __init__(self, create_connection, segment_threads=10,
+ object_dd_threads=10, object_uu_threads=10,
+ container_threads=10):
+
+ """
+ :param segment_threads: The number of threads allocated to segment
+ uploads
+ :param object_dd_threads: The number of threads allocated to object
+ download/delete jobs
+ :param object_uu_threads: The number of threads allocated to object
+ upload/update based jobs
+ :param container_threads: The number of threads allocated to
+ container/account level jobs
+ """
+ self.segment_pool = ConnectionThreadPoolExecutor(
+ create_connection, max_workers=segment_threads)
+ self.object_dd_pool = ConnectionThreadPoolExecutor(
+ create_connection, max_workers=object_dd_threads)
+ self.object_uu_pool = ConnectionThreadPoolExecutor(
+ create_connection, max_workers=object_uu_threads)
+ self.container_pool = ConnectionThreadPoolExecutor(
+ create_connection, max_workers=container_threads)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.segment_pool.__exit__(exc_type, exc_value, traceback)
+ self.object_dd_pool.__exit__(exc_type, exc_value, traceback)
+ self.object_uu_pool.__exit__(exc_type, exc_value, traceback)
+ self.container_pool.__exit__(exc_type, exc_value, traceback)
+
+
+class ConnectionThreadPoolExecutor(ThreadPoolExecutor):
+ """
+ A wrapper class to maintain a pool of connections alongside the thread
+ pool. We start by creating a priority queue of connections, and each job
+ submitted takes one of those connections (initialising if necessary) and
+ passes it as the first arg to the executed function.
+
+ At the end of execution that connection is returned to the queue.
+
+ By using a PriorityQueue we avoid creating more connections than required.
+ We will only create as many connections as are required concurrently.
+ """
+ def __init__(self, create_connection, max_workers):
+ self._connections = PriorityQueue()
+ self._create_connection = create_connection
+ for p in range(0, max_workers):
+ self._connections.put((p, None))
+ super(ConnectionThreadPoolExecutor, self).__init__(max_workers)
+
+ def submit(self, fn, *args, **kwargs):
+ def conn_fn():
+ priority = None
+ conn = None
+ try:
+ # If we get a connection we must put it back later
+ (priority, conn) = self._connections.get()
+ if conn is None:
+ conn = self._create_connection()
+ conn_args = (conn,) + args
+ return fn(*conn_args, **kwargs)
+ finally:
+ if priority is not None:
+ self._connections.put((priority, conn))
+
+ return super(ConnectionThreadPoolExecutor, self).submit(conn_fn)
diff --git a/swiftclient/service.py b/swiftclient/service.py
new file mode 100644
index 0000000..3cedb1c
--- /dev/null
+++ b/swiftclient/service.py
@@ -0,0 +1,2062 @@
+# Copyright (c) 2010-2013 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from concurrent.futures import as_completed, CancelledError, TimeoutError
+from copy import deepcopy
+from errno import EEXIST, ENOENT
+from hashlib import md5
+from os import environ, makedirs, stat, utime
+from os.path import (
+ basename, dirname, getmtime, getsize, isdir, join, sep as os_path_sep
+)
+from random import shuffle
+from time import time
+from threading import Thread
+from six import StringIO, text_type
+from six.moves.queue import Queue
+from six.moves.queue import Empty as QueueEmpty
+from six.moves.urllib.parse import quote, unquote
+from six import Iterator, string_types
+
+try:
+ import simplejson as json
+except ImportError:
+ import json
+
+
+from swiftclient import Connection
+from swiftclient.command_helpers import (
+ stat_account, stat_container, stat_object
+)
+from swiftclient.utils import config_true_value
+from swiftclient.exceptions import ClientException
+from swiftclient.multithreading import MultiThreadingManager
+
+
+class ResultsIterator(Iterator):
+ def __init__(self, futures):
+ self.futures = interruptable_as_completed(futures)
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ next_completed_future = next(self.futures)
+ return next_completed_future.result()
+
+
+class SwiftError(Exception):
+ def __init__(self, value, container=None, obj=None,
+ segment=None, exc=None):
+ self.value = value
+ self.container = container
+ self.obj = obj
+ self.segment = segment
+ self.exception = exc
+
+ def __str__(self):
+ value = repr(self.value)
+ if self.container is not None:
+ value += " container:%s" % self.container
+ if self.obj is not None:
+ value += " object:%s" % self.obj
+ if self.segment is not None:
+ value += " segment:%s" % self.segment
+ return value
+
+
+def process_options(options):
+ if not (options['auth'] and options['user'] and options['key']):
+ # Use 2.0 auth if none of the old args are present
+ options['auth_version'] = '2.0'
+
+ # Use new-style args if old ones not present
+ if not options['auth'] and options['os_auth_url']:
+ options['auth'] = options['os_auth_url']
+ if not options['user']and options['os_username']:
+ options['user'] = options['os_username']
+ if not options['key'] and options['os_password']:
+ options['key'] = options['os_password']
+
+ # Specific OpenStack options
+ options['os_options'] = {
+ 'tenant_id': options['os_tenant_id'],
+ 'tenant_name': options['os_tenant_name'],
+ 'service_type': options['os_service_type'],
+ 'endpoint_type': options['os_endpoint_type'],
+ 'auth_token': options['os_auth_token'],
+ 'object_storage_url': options['os_storage_url'],
+ 'region_name': options['os_region_name'],
+ }
+
+_default_global_options = {
+ "snet": False,
+ "verbose": 1,
+ "debug": False,
+ "info": False,
+ "auth": environ.get('ST_AUTH'),
+ "auth_version": environ.get('ST_AUTH_VERSION', '1.0'),
+ "user": environ.get('ST_USER'),
+ "key": environ.get('ST_KEY'),
+ "retries": 5,
+ "os_username": environ.get('OS_USERNAME'),
+ "os_password": environ.get('OS_PASSWORD'),
+ "os_tenant_id": environ.get('OS_TENANT_ID'),
+ "os_tenant_name": environ.get('OS_TENANT_NAME'),
+ "os_auth_url": environ.get('OS_AUTH_URL'),
+ "os_auth_token": environ.get('OS_AUTH_TOKEN'),
+ "os_storage_url": environ.get('OS_STORAGE_URL'),
+ "os_region_name": environ.get('OS_REGION_NAME'),
+ "os_service_type": environ.get('OS_SERVICE_TYPE'),
+ "os_endpoint_type": environ.get('OS_ENDPOINT_TYPE'),
+ "os_cacert": environ.get('OS_CACERT'),
+ "insecure": config_true_value(environ.get('SWIFTCLIENT_INSECURE')),
+ "ssl_compression": False,
+ 'segment_threads': 10,
+ 'object_dd_threads': 10,
+ 'object_uu_threads': 10,
+ 'container_threads': 10
+}
+
+_default_local_options = {
+ 'sync_to': None,
+ 'sync_key': None,
+ 'use_slo': False,
+ 'segment_size': None,
+ 'segment_container': None,
+ 'leave_segments': False,
+ 'changed': None,
+ 'skip_identical': False,
+ 'yes_all': False,
+ 'read_acl': None,
+ 'write_acl': None,
+ 'out_file': None,
+ 'no_download': False,
+ 'long': False,
+ 'totals': False,
+ 'marker': '',
+ 'header': [],
+ 'meta': [],
+ 'prefix': None,
+ 'delimiter': None,
+ 'fail_fast': False,
+ 'human': False,
+ 'dir_marker': False
+}
+
+POLICY = 'X-Storage-Policy'
+
+
+def get_from_queue(q, timeout=864000):
+ while True:
+ try:
+ item = q.get(timeout=timeout)
+ return item
+ except QueueEmpty:
+ # Do nothing here, we only have a timeout to allow interruption
+ pass
+
+
+def get_future_result(f, timeout=86400):
+ while True:
+ try:
+ res = f.result(timeout=timeout)
+ return res
+ except TimeoutError:
+ # Do nothing here, we only have a timeout to allow interruption
+ pass
+
+
+def interruptable_as_completed(fs, timeout=86400):
+ while True:
+ try:
+ for f in as_completed(fs, timeout=timeout):
+ fs.remove(f)
+ yield f
+ return
+ except TimeoutError:
+ # Do nothing here, we only have a timeout to allow interruption
+ pass
+
+
+def get_conn(options):
+ """
+ Return a connection building it from the options.
+ """
+ return Connection(options['auth'],
+ options['user'],
+ options['key'],
+ options['retries'],
+ auth_version=options['auth_version'],
+ os_options=options['os_options'],
+ snet=options['snet'],
+ cacert=options['os_cacert'],
+ insecure=options['insecure'],
+ ssl_compression=options['ssl_compression'])
+
+
+def mkdirs(path):
+ try:
+ makedirs(path)
+ except OSError as err:
+ if err.errno != EEXIST:
+ raise
+
+
+def split_headers(options, prefix=''):
+ """
+ Splits 'Key: Value' strings and returns them as a dictionary.
+
+ :param options: An array of 'Key: Value' strings
+ :param prefix: String to prepend to all of the keys in the dictionary.
+ reporting.
+ """
+ headers = {}
+ for item in options:
+ split_item = item.split(':', 1)
+ if len(split_item) == 2:
+ headers[(prefix + split_item[0]).title()] = split_item[1]
+ else:
+ raise SwiftError(
+ "Metadata parameter %s must contain a ':'.\n%s"
+ % (item, "Example: 'Color:Blue' or 'Size:Large'")
+ )
+ return headers
+
+
+class SwiftUploadObject(object):
+ """
+ Class for specifying an object upload, allowing the object source, name and
+ options to be specified separately for each individual object.
+ """
+ def __init__(self, source, object_name=None, options=None):
+ if isinstance(source, string_types):
+ self.source = source
+ self.object_name = object_name or source
+ self.options = options
+ else:
+ if source.hasattr('read') or source is None:
+ if object_name is None or \
+ not isinstance(object_name, string_types):
+ raise SwiftError(
+ "Object names must be specified as strings for uploads"
+ " from None or file like objects."
+ )
+ else:
+ self.source = source
+ self.object_name = object_name
+ self.options = options
+ else:
+ raise SwiftError(
+ "Unexpected source type for SwiftUploadObject: "
+ "%s" % type(source)
+ )
+ if not self.object_name:
+ raise SwiftError(
+ "Object names must be specified as non-empty strings"
+ )
+
+
+class SwiftPostObject(object):
+ """
+ Class for specifying an object post, allowing the headers/metadata to be
+ specified separately for each individual object.
+ """
+ def __init__(self, object_name, options=None):
+ if not isinstance(object_name, string_types) or not object_name:
+ raise SwiftError(
+ "Object names must be specified as non-empty strings"
+ )
+ else:
+ self.object_name = object_name
+ self.options = options
+
+
+class _SwiftReader(object):
+ """
+ Class for downloading objects from swift and raising appropriate
+ errors on failures caused by either invalid md5sum or size of the
+ data read.
+ """
+ def __init__(self, path, body, headers):
+ self._path = path
+ self._body = body
+ self._actual_read = 0
+ self._content_length = None
+ self._actual_md5 = None
+ self._expected_etag = headers.get('etag')
+
+ if 'x-object-manifest' not in headers and \
+ 'x-static-large-object' not in headers:
+ self.actual_md5 = md5()
+
+ if 'content-length' in headers:
+ self._content_length = int(headers.get('content-length'))
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if self._actual_md5 is not None:
+ etag = self._actual_md5.hexdigest()
+ if etag != self._expected_etag:
+ raise SwiftError(
+ 'Error downloading %s: md5sum != etag, %s != %s' %
+ (self._path, etag, self._expected_etag)
+ )
+
+ if self._content_length is not None and \
+ self._actual_read != self._content_length:
+ raise SwiftError(
+ 'Error downloading %s: read_length != content_length, '
+ '%d != %d' % (self._path, self._actual_read,
+ self._content_length)
+ )
+
+ def buffer(self):
+ for chunk in self._body:
+ if self._actual_md5 is not None:
+ self._actual_md5.update(chunk)
+ self._actual_read += len(chunk)
+ yield chunk
+
+ def bytes_read(self):
+ return self._actual_read
+
+
+class SwiftService(object):
+ """
+ Service for performing swift operations
+ """
+ def __init__(self, options=None):
+ if options is not None:
+ self._options = dict(
+ _default_global_options,
+ **dict(_default_local_options, **options)
+ )
+ else:
+ self._options = dict(
+ _default_global_options,
+ **_default_local_options
+ )
+ process_options(self._options)
+ create_connection = lambda: get_conn(self._options)
+ self.thread_manager = MultiThreadingManager(
+ create_connection,
+ segment_threads=self._options['segment_threads'],
+ object_dd_threads=self._options['object_dd_threads'],
+ object_uu_threads=self._options['object_uu_threads'],
+ container_threads=self._options['container_threads']
+ )
+
+ def __enter__(self):
+ self.thread_manager.__enter__()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.thread_manager.__exit__(exc_type, exc_val, exc_tb)
+
+ # Stat related methods
+ #
+ def stat(self, container=None, objects=None, options=None):
+ """
+ Get account stats, container stats or information about a list of
+ objects in a container.
+
+ :param container: The container to query.
+ :param objects: A list of object paths about which to return
+ information (a list of strings).
+ :param options: A dictionary containing options to override the global
+ options specified during the service object creation.
+ These options are applied to all stat operations
+ performed by this call::
+
+ {
+ 'human': False
+ }
+
+ :returns: Either a single dictionary containing stats about an account
+ or container, or an iterator for returning the results of the
+ stat operations on a list of objects.
+
+ :raises: SwiftError
+ """
+ if options is not None:
+ options = dict(self._options, **options)
+ else:
+ options = self._options
+
+ if not container:
+ if objects:
+ raise SwiftError('Objects specified without container')
+ else:
+ res = {
+ 'action': 'stat_account',
+ 'success': True,
+ 'container': container,
+ 'object': None,
+ }
+ try:
+ stats_future = self.thread_manager.container_pool.submit(
+ stat_account, options
+ )
+ items, headers = get_future_result(stats_future)
+ res.update({
+ 'items': items,
+ 'headers': headers
+ })
+ return res
+ except ClientException as err:
+ if err.http_status == 404:
+ res.update({
+ 'success': False,
+ 'error': err
+ })
+ return res
+ raise SwiftError('Account not found', exc=err)
+ except Exception as err:
+ res.update({
+ 'success': False,
+ 'error': err
+ })
+ return res
+ else:
+ if not objects:
+ res = {
+ 'action': 'stat_container',
+ 'container': container,
+ 'object': None,
+ 'success': True,
+ }
+ try:
+ stats_future = self.thread_manager.container_pool.submit(
+ stat_container, options, container
+ )
+ items, headers = get_future_result(stats_future)
+ res.update({
+ 'items': items,
+ 'headers': headers
+ })
+ return res
+ except ClientException as err:
+ if err.http_status != 404:
+ res.update({
+ 'success': False,
+ 'error': err
+ })
+ return res
+ raise SwiftError('Container %r not found' % container,
+ container=container, exc=err)
+ except Exception as err:
+ res.update({
+ 'success': False,
+ 'error': err
+ })
+ return res
+ else:
+ stat_futures = []
+ for stat_o in objects:
+ stat_future = self.thread_manager.object_dd_pool.submit(
+ self._stat_object, container, stat_o, options
+ )
+ stat_futures.append(stat_future)
+
+ return ResultsIterator(stat_futures)
+
+ @staticmethod
+ def _stat_object(conn, container, obj, options):
+ res = {
+ 'action': 'stat_object',
+ 'object': obj,
+ 'container': container,
+ 'success': True,
+ }
+ try:
+ items, headers = stat_object(conn, options, container, obj)
+ res.update({
+ 'items': items,
+ 'headers': headers
+ })
+ return res
+ except Exception as err:
+ res.update({
+ 'success': False,
+ 'error': err
+ })
+ return res
+
+ # Post related methods
+ #
+ def post(self, container=None, objects=None, options=None):
+ """
+ Post operations on an account, container or list of objects
+
+ :param container: The container to make the post operation against.
+ :param objects: A list of object names (strings) or SwiftPostObject
+ instances containing an object name, and an
+ options dict (can be None) to override the options for
+ that individual post operation::
+
+ [
+ 'object_name',
+ SwiftPostObject('object_name', options={...}),
+ ...
+ ]
+
+ The options dict is described below.
+ :param options: A dictionary containing options to override the global
+ options specified during the service object creation.
+ These options are applied to all post operations
+ performed by this call, unless overridden on a per
+ object basis. Possible options are given below::
+
+ {
+ 'meta': [],
+ 'headers': [],
+ 'read_acl': None, # For containers only
+ 'write_acl': None, # For containers only
+ 'sync_to': None, # For containers only
+ 'sync_key': None # For containers only
+ }
+
+ :returns: Either a single result dictionary in the case of a post to a
+ container/account, or an iterator for returning the results
+ of posts to a list of objects.
+
+ :raises: SwiftError
+ """
+ if options is not None:
+ options = dict(self._options, **options)
+ else:
+ options = self._options
+
+ res = {
+ 'success': True,
+ 'container': container,
+ 'object': None,
+ 'headers': {},
+ }
+ if not container:
+ res["action"] = "post_account"
+ if objects:
+ raise SwiftError('Objects specified without container')
+ else:
+ response_dict = {}
+ headers = split_headers(
+ options['meta'], 'X-Account-Meta-')
+ headers.update(
+ split_headers(options['header'], ''))
+ res['headers'] = headers
+ try:
+ post = self.thread_manager.container_pool.submit(
+ self._post_account_job, headers, response_dict
+ )
+ get_future_result(post)
+ except ClientException as err:
+ if err.http_status != 404:
+ res.update({
+ 'success': False,
+ 'error': err,
+ 'response_dict': response_dict
+ })
+ return res
+ raise SwiftError('Account not found')
+ except Exception as err:
+ res.update({
+ 'success': False,
+ 'error': err,
+ 'response_dict': response_dict
+ })
+ return res
+ else:
+ if not objects:
+ res["action"] = "post_container"
+ response_dict = {}
+ headers = split_headers(
+ options['meta'], 'X-Container-Meta-')
+ headers.update(
+ split_headers(options['header'], ''))
+ if options['read_acl'] is not None:
+ headers['X-Container-Read'] = options['read_acl']
+ if options['write_acl'] is not None:
+ headers['X-Container-Write'] = options['write_acl']
+ if options['sync_to'] is not None:
+ headers['X-Container-Sync-To'] = options['sync_to']
+ if options['sync_key'] is not None:
+ headers['X-Container-Sync-Key'] = options['sync_key']
+ res['headers'] = headers
+ try:
+ post = self.thread_manager.container_pool.submit(
+ self._post_container_job, container,
+ headers, response_dict
+ )
+ get_future_result(post)
+ except ClientException as err:
+ if err.http_status != 404:
+ res.update({
+ 'action': 'post_container',
+ 'success': False,
+ 'error': err,
+ 'response_dict': response_dict
+ })
+ return res
+ raise SwiftError(
+ "Container '%s' not found" % container,
+ container=container
+ )
+ except Exception as err:
+ res.update({
+ 'action': 'post_container',
+ 'success': False,
+ 'error': err,
+ 'response_dict': response_dict
+ })
+ return res
+ else:
+ post_futures = []
+ post_objects = self._make_post_objects(objects)
+ for post_object in post_objects:
+ obj = post_object.object_name
+ obj_options = post_object.options
+ response_dict = {}
+ headers = split_headers(
+ options['meta'], 'X-Object-Meta-')
+ # add header options to the headers object for the request.
+ headers.update(
+ split_headers(options['header'], ''))
+ if obj_options is not None:
+ if 'meta' in obj_options:
+ headers.update(
+ split_headers(
+ obj_options['meta'], 'X-Object-Meta'
+ )
+ )
+ if 'headers' in obj_options:
+ headers.update(
+ split_headers(obj_options['header'], '')
+ )
+
+ post = self.thread_manager.object_uu_pool.submit(
+ self._post_object_job, container, obj,
+ headers, response_dict
+ )
+ post_futures.append(post)
+
+ return ResultsIterator(post_futures)
+
+ @staticmethod
+ def _make_post_objects(objects):
+ post_objects = []
+
+ for o in objects:
+ if isinstance(o, string_types):
+ obj = SwiftPostObject(o)
+ post_objects.append(obj)
+ elif isinstance(o, SwiftPostObject):
+ post_objects.append(o)
+ else:
+ raise SwiftError(
+ "The post operation takes only strings or "
+ "SwiftPostObjects as input",
+ obj=o)
+
+ return post_objects
+
+ @staticmethod
+ def _post_account_job(conn, headers, result):
+ return conn.post_account(headers=headers, response_dict=result)
+
+ @staticmethod
+ def _post_container_job(conn, container, headers, result):
+ try:
+ res = conn.post_container(
+ container, headers=headers, response_dict=result)
+ except ClientException as err:
+ if err.http_status != 404:
+ raise
+ _response_dict = {}
+ res = conn.put_container(
+ container, headers=headers, response_dict=_response_dict)
+ result['post_put'] = _response_dict
+ return res
+
+ @staticmethod
+ def _post_object_job(conn, container, obj, headers, result):
+ res = {
+ 'success': True,
+ 'action': 'post_object',
+ 'container': container,
+ 'object': obj,
+ 'headers': headers,
+ 'response_dict': result
+ }
+ try:
+ conn.post_object(
+ container, obj, headers=headers, response_dict=result)
+ except Exception as err:
+ res.update({
+ 'success': False,
+ 'error': err
+ })
+
+ return res
+
+ # List related methods
+ #
+ def list(self, container=None, options=None):
+ """
+ List operations on an account, container.
+
+ :param container: The container to make the list operation against.
+ :param options: A dictionary containing options to override the global
+ options specified during the service object creation::
+
+ {
+ 'long': False,
+ 'prefix': None,
+ 'delimiter': None,
+ }
+
+ :returns: A generator for returning the results of the list operation
+ on an account or container. Each result yielded from the
+ generator is either a 'list_account_part' or
+ 'list_container_part', containing part of the listing.
+ """
+ if options is not None:
+ options = dict(self._options, **options)
+ else:
+ options = self._options
+
+ rq = Queue()
+
+ if container is None:
+ listing_future = self.thread_manager.container_pool.submit(
+ self._list_account_job, options, rq
+ )
+ else:
+ listing_future = self.thread_manager.container_pool.submit(
+ self._list_container_job, container, options, rq
+ )
+
+ res = get_from_queue(rq)
+ while res is not None:
+ yield res
+ res = get_from_queue(rq)
+
+ # Make sure the future has completed
+ get_future_result(listing_future)
+
+ @staticmethod
+ def _list_account_job(conn, options, result_queue):
+ marker = ''
+ success = True
+ error = None
+ try:
+ while True:
+ _, items = conn.get_account(
+ marker=marker, prefix=options['prefix']
+ )
+
+ if not items:
+ result_queue.put(None)
+ return
+
+ if options['long']:
+ for i in items:
+ name = i['name']
+ i['meta'] = conn.head_container(name)
+
+ res = {
+ 'action': 'list_account_part',
+ 'container': None,
+ 'prefix': options['prefix'],
+ 'success': True,
+ 'listing': items,
+ 'marker': marker,
+ }
+ result_queue.put(res)
+
+ marker = items[-1].get('name', items[-1].get('subdir'))
+ except ClientException as err:
+ success = False
+ if err.http_status != 404:
+ error = err
+ else:
+ error = SwiftError('Account not found')
+
+ except Exception as err:
+ success = False
+ error = err
+
+ res = {
+ 'action': 'list_account_part',
+ 'container': None,
+ 'prefix': options['prefix'],
+ 'success': success,
+ 'marker': marker,
+ 'error': error,
+ }
+ result_queue.put(res)
+ result_queue.put(None)
+
+ @staticmethod
+ def _list_container_job(conn, container, options, result_queue):
+ marker = ''
+ success = True
+ error = None
+ try:
+ while True:
+ _, items = conn.get_container(
+ container, marker=marker, prefix=options['prefix'],
+ delimiter=options['delimiter']
+ )
+
+ if not items:
+ result_queue.put(None)
+ return
+
+ res = {
+ 'action': 'list_container_part',
+ 'container': container,
+ 'prefix': options['prefix'],
+ 'success': True,
+ 'marker': marker,
+ 'listing': items,
+ }
+ result_queue.put(res)
+
+ marker = items[-1].get('name', items[-1].get('subdir'))
+ except ClientException as err:
+ success = False
+ if err.http_status != 404:
+ error = err
+ else:
+ error = SwiftError('Container %r not found' % container,
+ container=container)
+ except Exception as err:
+ success = False
+ error = err
+
+ res = {
+ 'action': 'list_container_part',
+ 'container': container,
+ 'prefix': options['prefix'],
+ 'success': success,
+ 'marker': marker,
+ 'error': error,
+ }
+ result_queue.put(res)
+ result_queue.put(None)
+
+ # Download related methods
+ #
+ def download(self, container=None, objects=None, options=None):
+ """
+ Download operations on an account, optional container and optional list
+ of objects.
+
+ :param container: The container to download from.
+ :param objects: A list of object names to download (a list of strings).
+ :param options: A dictionary containing options to override the global
+ options specified during the service object creation::
+
+ {
+ 'yes_all': False,
+ 'marker': '',
+ 'prefix': None,
+ 'no_download': False,
+ 'header': [],
+ 'skip_identical': False,
+ 'out_file': None
+ }
+
+ :returns: A generator for returning the results of the download
+ operations. Each result yielded from the generator is a
+ 'download_object' dictionary containing the results of an
+ individual file download.
+
+ :raises: ClientException
+ :raises: SwiftError
+ """
+ if options is not None:
+ options = dict(self._options, **options)
+ else:
+ options = self._options
+
+ if not container:
+ # Download everything if options['yes_all'] is set
+ if options['yes_all']:
+ try:
+ options_copy = deepcopy(options)
+ options_copy["long"] = False
+ containers = []
+ for part in self.list(options=options_copy):
+ if part["success"]:
+ containers.extend([
+ i['name'] for i in part["listing"]
+ ])
+ else:
+ raise part["error"]
+
+ shuffle(containers)
+
+ o_downs = []
+ for con in containers:
+ objs = []
+ for part in self.list(
+ container=con, options=options_copy):
+ if part["success"]:
+ objs.extend([
+ i['name'] for i in part["listing"]
+ ])
+ else:
+ raise part["error"]
+ shuffle(objs)
+
+ o_downs.extend(
+ self.thread_manager.object_dd_pool.submit(
+ self._download_object_job, con, obj,
+ options_copy
+ ) for obj in objs
+ )
+
+ for o_down in interruptable_as_completed(o_downs):
+ yield o_down.result()
+
+ # If we see a 404 here, the listing of the account failed
+ except ClientException as err:
+ if err.http_status != 404:
+ raise
+ raise SwiftError('Account not found')
+
+ elif not objects:
+ if '/' in container:
+ raise SwiftError('\'/\' in container name',
+ container=container)
+ for res in self._download_container(container, options):
+ yield res
+
+ else:
+ if '/' in container:
+ raise SwiftError('\'/\' in container name',
+ container=container)
+ if options['out_file'] and len(objects) > 1:
+ options['out_file'] = None
+
+ o_downs = [
+ self.thread_manager.object_dd_pool.submit(
+ self._download_object_job, container, obj, options
+ ) for obj in objects
+ ]
+
+ for o_down in interruptable_as_completed(o_downs):
+ yield o_down.result()
+
+ @staticmethod
+ def _download_object_job(conn, container, obj, options):
+ out_file = options['out_file']
+ results_dict = {}
+
+ req_headers = split_headers(options['header'], '')
+
+ pseudodir = False
+ path = join(container, obj) if options['yes_all'] else obj
+ path = path.lstrip(os_path_sep)
+ if options['skip_identical'] and out_file != '-':
+ filename = out_file if out_file else path
+ try:
+ fp = open(filename, 'rb')
+ except IOError:
+ pass
+ else:
+ with fp:
+ md5sum = md5()
+ while True:
+ data = fp.read(65536)
+ if not data:
+ break
+ md5sum.update(data)
+ req_headers['If-None-Match'] = md5sum.hexdigest()
+
+ try:
+ start_time = time()
+
+ headers, body = \
+ conn.get_object(container, obj, resp_chunk_size=65536,
+ headers=req_headers,
+ response_dict=results_dict)
+ headers_receipt = time()
+
+ reader = _SwiftReader(path, body, headers)
+ with reader as obj_body:
+ fp = None
+ try:
+ no_file = options['no_download']
+ make_dir = not no_file and out_file != "-"
+ content_type = headers.get('content-type')
+ if content_type.split(';', 1)[0] == 'text/directory':
+ if make_dir and not isdir(path):
+ mkdirs(path)
+
+ for _ in obj_body.buffer():
+ continue
+ else:
+ dirpath = dirname(path)
+ if make_dir and dirpath and not isdir(dirpath):
+ mkdirs(dirpath)
+
+ if not no_file:
+ if out_file == "-":
+ res = {
+ 'path': path,
+ 'contents': obj_body
+ }
+ return res
+ elif out_file:
+ fp = open(out_file, 'wb')
+ else:
+ if basename(path):
+ fp = open(path, 'wb')
+ else:
+ pseudodir = True
+ no_file = True
+
+ for chunk in obj_body.buffer():
+ if not no_file:
+ fp.write(chunk)
+
+ else:
+ for _ in obj_body.buffer():
+ continue
+
+ finish_time = time()
+ finally:
+ bytes_read = obj_body.bytes_read()
+ if fp is not None:
+ fp.close()
+ if 'x-object-meta-mtime' in headers \
+ and not options['no_download']:
+ mtime = float(headers['x-object-meta-mtime'])
+ if options['out_file'] \
+ and not options['out_file'] == "-":
+ utime(options['out_file'], (mtime, mtime))
+ else:
+ utime(path, (mtime, mtime))
+
+ res = {
+ 'action': 'download_object',
+ 'success': True,
+ 'container': container,
+ 'object': obj,
+ 'path': path,
+ 'pseudodir': pseudodir,
+ 'start_time': start_time,
+ 'finish_time': finish_time,
+ 'headers_receipt': headers_receipt,
+ 'auth_end_time': conn.auth_end_time,
+ 'read_length': bytes_read,
+ 'attempts': conn.attempts,
+ 'response_dict': results_dict
+ }
+ return res
+
+ except Exception as err:
+ res = {
+ 'action': 'download_object',
+ 'container': container,
+ 'object': obj,
+ 'success': False,
+ 'error': err,
+ 'response_dict': results_dict,
+ 'path': path,
+ 'pseudodir': pseudodir,
+ 'attempts': conn.attempts
+ }
+ return res
+
+ def _download_container(self, container, options):
+ try:
+ objects = []
+ for part in self.list(container=container, options=options):
+ if part["success"]:
+ objects.extend([o["name"] for o in part["listing"]])
+ else:
+ raise part["error"]
+
+ o_downs = [
+ self.thread_manager.object_dd_pool.submit(
+ self._download_object_job, container, obj, options
+ ) for obj in objects
+ ]
+
+ for o_down in interruptable_as_completed(o_downs):
+ yield o_down.result()
+
+ except ClientException as err:
+ if err.http_status != 404:
+ raise
+ raise SwiftError('Container %r not found' % container,
+ container=container)
+
+ # Upload related methods
+ #
+ def upload(self, container, objects, options=None):
+ """
+ Upload a list of objects to a given container.
+
+ :param container: The container to put the uploads into.
+ :param objects: A list of file/directory names (strings) or
+ SwiftUploadObject instances containing a source for the
+ created object, an object name, and an options dict
+ (can be None) to override the options for that
+ individual upload operation::
+
+ [
+ '/path/to/file',
+ SwiftUploadObject('/path', object_name='obj1'),
+ ...
+ ]
+
+ The options dict is as described below.
+
+ The SwiftUploadObject source may be one of:
+
+ file - A file like object (with a read method)
+ path - A string containing the path to a local file
+ or directory
+ None - Indicates that we want an empty object
+
+ :param options: A dictionary containing options to override the global
+ options specified during the service object creation.
+ These options are applied to all upload operations
+ performed by this call, unless overridden on a per
+ object basis. Possible options are given below::
+
+ {
+ 'meta': [],
+ 'headers': [],
+ 'segment_size': None,
+ 'use_slo': False,
+ 'segment_container: None,
+ 'leave_segments': False,
+ 'changed': None,
+ 'skip_identical': False,
+ 'fail_fast': False,
+ 'dir_marker': False # Only for None sources
+ }
+
+ :returns: A generator for returning the results of the uploads.
+
+ :raises: SwiftError
+ :raises: ClientException
+ """
+ if options is not None:
+ options = dict(self._options, **options)
+ else:
+ options = self._options
+
+ # Does the account exist?
+ account_stat = self.stat(options=options)
+ if not account_stat["success"]:
+ raise account_stat["error"]
+
+ # Try to create the container, just in case it doesn't exist. If this
+ # fails, it might just be because the user doesn't have container PUT
+ # permissions, so we'll ignore any error. If there's really a problem,
+ # it'll surface on the first object PUT.
+ policy_header = {}
+ _header = split_headers(options["header"])
+ if POLICY in _header:
+ policy_header[POLICY] = \
+ _header[POLICY]
+ create_containers = [
+ self.thread_manager.container_pool.submit(
+ self._create_container_job, container, headers=policy_header
+ )
+ ]
+
+ if options['segment_size'] is not None:
+ seg_container = container + '_segments'
+ if options['segment_container']:
+ seg_container = options['segment_container']
+ if not policy_header:
+ # Since no storage policy was specified on the command line,
+ # rather than just letting swift pick the default storage
+ # policy, we'll try to create the segments container with the
+ # same as the upload container
+ create_containers.append(
+ self.thread_manager.object_uu_pool.submit(
+ self._create_container_job, seg_container,
+ policy_source=container
+ )
+ )
+ else:
+ create_containers.append(
+ self.thread_manager.object_uu_pool.submit(
+ self._create_container_job, seg_container,
+ headers=policy_header
+ )
+ )
+
+ for r in interruptable_as_completed(create_containers):
+ res = r.result()
+ yield res
+
+ # We maintain a results queue here and a separate thread to monitor
+ # the futures because we want to get results back from potential
+ # segment uploads too
+ rq = Queue()
+ file_jobs = {}
+
+ upload_objects = self._make_upload_objects(objects)
+ for upload_object in upload_objects:
+ s = upload_object.source
+ o = upload_object.object_name
+ o_opts = upload_object.options
+ details = {'action': 'upload', 'container': container}
+ if o_opts is not None:
+ object_options = deepcopy(options)
+ object_options.update(o_opts)
+ else:
+ object_options = options
+ if hasattr(s, 'read'):
+ # We've got a file like object to upload to o
+ file_future = self.thread_manager.object_uu_pool.submit(
+ self._upload_object_job, container, s, o, object_options
+ )
+ details['file'] = s
+ details['object'] = o
+ file_jobs[file_future] = details
+ elif s is not None:
+ # We've got a path to upload to o
+ details['path'] = s
+ details['object'] = o
+ if isdir(s):
+ dir_future = self.thread_manager.object_uu_pool.submit(
+ self._create_dir_marker_job, container, o,
+ object_options, path=s
+ )
+ file_jobs[dir_future] = details
+ else:
+ try:
+ stat(s)
+ file_future = \
+ self.thread_manager.object_uu_pool.submit(
+ self._upload_object_job, container, s, o,
+ object_options, results_queue=rq
+ )
+ file_jobs[file_future] = details
+ except OSError as err:
+ # Avoid tying up threads with jobs that will fail
+ res = {
+ 'action': 'upload_object',
+ 'container': container,
+ 'object': o,
+ 'success': False,
+ 'error': err,
+ 'path': s
+ }
+ rq.put(res)
+ else:
+ # Create an empty object (as a dir marker if is_dir)
+ details['file'] = None
+ details['object'] = o
+ if object_options['dir_marker']:
+ dir_future = self.thread_manager.object_uu_pool.submit(
+ self._create_dir_marker_job, container, o,
+ object_options
+ )
+ file_jobs[dir_future] = details
+ else:
+ file_future = self.thread_manager.object_uu_pool.submit(
+ self._upload_object_job, container, StringIO(),
+ o, object_options
+ )
+ file_jobs[file_future] = details
+
+ # Start a thread to watch for upload results
+ Thread(
+ target=self._watch_futures, args=(file_jobs, rq)
+ ).start()
+
+ # yield results as they become available, including those from
+ # segment uploads.
+ res = get_from_queue(rq)
+ cancelled = False
+ while res is not None:
+ yield res
+
+ if not res['success']:
+ if not cancelled and options['fail_fast']:
+ cancelled = True
+ for f in file_jobs:
+ f.cancel()
+
+ res = get_from_queue(rq)
+
+ @staticmethod
+ def _make_upload_objects(objects):
+ upload_objects = []
+
+ for o in objects:
+ if isinstance(o, string_types):
+ obj = SwiftUploadObject(o)
+ upload_objects.append(obj)
+ elif isinstance(o, SwiftUploadObject):
+ upload_objects.append(o)
+ else:
+ raise SwiftError(
+ "The upload operation takes only strings or "
+ "SwiftUploadObjects as input",
+ obj=o)
+
+ return upload_objects
+
+ @staticmethod
+ def _create_container_job(
+ conn, container, headers=None, policy_source=None):
+ """
+ Create a container using the given connection
+
+ :param conn: The swift connection used for requests.
+ :param container: The container name to create.
+ :param headers: An optional dict of headers for the
+ put_container request.
+ :param policy_source: An optional name of a container whose policy we
+ should duplicate.
+ :return: A dict containing the results of the operation.
+ """
+ res = {
+ 'action': 'create_container',
+ 'container': container,
+ 'headers': headers
+ }
+ create_response = {}
+ try:
+ if policy_source is not None:
+ _meta = conn.head_container(policy_source)
+ if 'x-storage-policy' in _meta:
+ policy_header = {
+ POLICY: _meta.get('x-storage-policy')
+ }
+ if headers is None:
+ headers = policy_header
+ else:
+ headers.update(policy_header)
+
+ conn.put_container(
+ container, headers, response_dict=create_response
+ )
+ res.update({
+ 'success': True,
+ 'response_dict': create_response
+ })
+ except Exception as err:
+ res.update({
+ 'success': False,
+ 'error': err,
+ 'response_dict': create_response
+ })
+ return res
+
+ @staticmethod
+ def _create_dir_marker_job(conn, container, obj, options, path=None):
+ res = {
+ 'action': 'create_dir_marker',
+ 'container': container,
+ 'object': obj,
+ 'path': path
+ }
+ results_dict = {}
+ if obj.startswith('./') or obj.startswith('.\\'):
+ obj = obj[2:]
+ if obj.startswith('/'):
+ obj = obj[1:]
+ if path is not None:
+ put_headers = {'x-object-meta-mtime': "%f" % getmtime(path)}
+ else:
+ put_headers = {'x-object-meta-mtime': "%f" % round(time())}
+ res['headers'] = put_headers
+ if options['changed']:
+ try:
+ _empty_string_etag = 'd41d8cd98f00b204e9800998ecf8427e'
+ headers = conn.head_object(container, obj)
+ ct = headers.get('content-type')
+ cl = int(headers.get('content-length'))
+ et = headers.get('etag')
+ mt = headers.get('x-object-meta-mtime')
+ if ct.split(';', 1)[0] == 'text/directory' and \
+ cl == 0 and \
+ et == _empty_string_etag and \
+ mt == put_headers['x-object-meta-mtime']:
+ res['success'] = True
+ return res
+ except ClientException as err:
+ if err.http_status != 404:
+ res.update({
+ 'success': False,
+ 'error': err})
+ return res
+ try:
+ conn.put_object(container, obj, '', content_length=0,
+ content_type='text/directory',
+ headers=put_headers,
+ response_dict=results_dict)
+ res.update({
+ 'success': True,
+ 'response_dict': results_dict})
+ return res
+ except Exception as err:
+ res.update({
+ 'success': False,
+ 'error': err,
+ 'response_dict': results_dict})
+ return res
+
+ @staticmethod
+ def _upload_segment_job(conn, path, container, segment_name, segment_start,
+ segment_size, segment_index, obj_name, options,
+ results_queue=None):
+ results_dict = {}
+ if options['segment_container']:
+ segment_container = options['segment_container']
+ else:
+ segment_container = container + '_segments'
+
+ res = {
+ 'action': 'upload_segment',
+ 'for_object': obj_name,
+ 'segment_index': segment_index,
+ 'segment_size': segment_size,
+ 'segment_location': '/%s/%s' % (segment_container,
+ segment_name),
+ 'log_line': '%s segment %s' % (obj_name, segment_index),
+ }
+ try:
+ fp = open(path, 'rb')
+ fp.seek(segment_start)
+
+ etag = conn.put_object(segment_container,
+ segment_name, fp,
+ content_length=segment_size,
+ response_dict=results_dict)
+
+ res.update({
+ 'success': True,
+ 'response_dict': results_dict,
+ 'segment_etag': etag,
+ 'attempts': conn.attempts
+ })
+
+ if results_queue is not None:
+ results_queue.put(res)
+ return res
+
+ except Exception as err:
+ res.update({
+ 'success': False,
+ 'error': err,
+ 'response_dict': results_dict,
+ 'attempts': conn.attempts
+ })
+
+ if results_queue is not None:
+ results_queue.put(res)
+ return res
+
+ def _upload_object_job(self, conn, container, source, obj, options,
+ results_queue=None):
+ res = {
+ 'action': 'upload_object',
+ 'container': container,
+ 'object': obj
+ }
+ if hasattr(source, 'read'):
+ stream = source
+ path = None
+ else:
+ path = source
+ res['path'] = path
+ try:
+ if obj.startswith('./') or obj.startswith('.\\'):
+ obj = obj[2:]
+ if obj.startswith('/'):
+ obj = obj[1:]
+ if path is not None:
+ put_headers = {'x-object-meta-mtime': "%f" % getmtime(path)}
+ else:
+ put_headers = {'x-object-meta-mtime': "%f" % round(time())}
+
+ res['headers'] = put_headers
+
+ # We need to HEAD all objects now in case we're overwriting a
+ # manifest object and need to delete the old segments
+ # ourselves.
+ old_manifest = None
+ old_slo_manifest_paths = []
+ new_slo_manifest_paths = set()
+ if options['changed'] or options['skip_identical'] \
+ or not options['leave_segments']:
+ checksum = None
+ if options['skip_identical']:
+ try:
+ fp = open(path, 'rb')
+ except IOError:
+ pass
+ else:
+ with fp:
+ md5sum = md5()
+ while True:
+ data = fp.read(65536)
+ if not data:
+ break
+ md5sum.update(data)
+ checksum = md5sum.hexdigest()
+ try:
+ headers = conn.head_object(container, obj)
+ if options['skip_identical'] and checksum is not None:
+ if checksum == headers.get('etag'):
+ res.update({
+ 'success': True,
+ 'status': 'skipped-identical'
+ })
+ return res
+ cl = int(headers.get('content-length'))
+ mt = headers.get('x-object-meta-mtime')
+ if path is not None and options['changed']\
+ and cl == getsize(path) and \
+ mt == put_headers['x-object-meta-mtime']:
+ res.update({
+ 'success': True,
+ 'status': 'skipped-changed'
+ })
+ return res
+ if not options['leave_segments']:
+ old_manifest = headers.get('x-object-manifest')
+ if config_true_value(
+ headers.get('x-static-large-object')):
+ headers, manifest_data = conn.get_object(
+ container, obj,
+ query_string='multipart-manifest=get'
+ )
+ for old_seg in json.loads(manifest_data):
+ seg_path = old_seg['name'].lstrip('/')
+ if isinstance(seg_path, text_type):
+ seg_path = seg_path.encode('utf-8')
+ old_slo_manifest_paths.append(seg_path)
+ except ClientException as err:
+ if err.http_status != 404:
+ res.update({
+ 'success': False,
+ 'error': err
+ })
+ return res
+
+ # Merge the command line header options to the put_headers
+ put_headers.update(split_headers(options['header'], ''))
+
+ # Don't do segment job if object is not big enough, and never do
+ # a segment job if we're reading from a stream - we may fail if we
+ # go over the single object limit, but this gives us a nice way
+ # to create objects from memory
+ if path is not None and options['segment_size'] and \
+ getsize(path) > int(options['segment_size']):
+ res['large_object'] = True
+ seg_container = container + '_segments'
+ if options['segment_container']:
+ seg_container = options['segment_container']
+ full_size = getsize(path)
+
+ segment_futures = []
+ segment_pool = self.thread_manager.segment_pool
+ segment = 0
+ segment_start = 0
+
+ while segment_start < full_size:
+ segment_size = int(options['segment_size'])
+ if segment_start + segment_size > full_size:
+ segment_size = full_size - segment_start
+ if options['use_slo']:
+ segment_name = '%s/slo/%s/%s/%s/%08d' % (
+ obj, put_headers['x-object-meta-mtime'],
+ full_size, options['segment_size'], segment
+ )
+ else:
+ segment_name = '%s/%s/%s/%s/%08d' % (
+ obj, put_headers['x-object-meta-mtime'],
+ full_size, options['segment_size'], segment
+ )
+ seg = segment_pool.submit(
+ self._upload_segment_job, path, container,
+ segment_name, segment_start, segment_size, segment,
+ obj, options, results_queue=results_queue
+ )
+ segment_futures.append(seg)
+ segment += 1
+ segment_start += segment_size
+
+ segment_results = []
+ errors = False
+ exceptions = []
+ for f in interruptable_as_completed(segment_futures):
+ try:
+ r = f.result()
+ if not r['success']:
+ errors = True
+ segment_results.append(r)
+ except Exception as e:
+ errors = True
+ exceptions.append(e)
+ if errors:
+ err = ClientException(
+ 'Aborting manifest creation '
+ 'because not all segments could be uploaded. %s/%s'
+ % (container, obj))
+ res.update({
+ 'success': False,
+ 'error': err,
+ 'exceptions': exceptions,
+ 'segment_results': segment_results
+ })
+ return res
+
+ res['segment_results'] = segment_results
+
+ if options['use_slo']:
+ segment_results.sort(key=lambda di: di['segment_index'])
+ for seg in segment_results:
+ seg_loc = seg['segment_location'].lstrip('/')
+ if isinstance(seg_loc, text_type):
+ seg_loc = seg_loc.encode('utf-8')
+ new_slo_manifest_paths.add(seg_loc)
+
+ manifest_data = json.dumps([
+ {
+ 'path': d['segment_location'],
+ 'etag': d['segment_etag'],
+ 'size_bytes': d['segment_size']
+ } for d in segment_results
+ ])
+
+ put_headers['x-static-large-object'] = 'true'
+ mr = {}
+ conn.put_object(
+ container, obj, manifest_data,
+ headers=put_headers,
+ query_string='multipart-manifest=put',
+ response_dict=mr
+ )
+ res['manifest_response_dict'] = mr
+ else:
+ new_object_manifest = '%s/%s/%s/%s/%s/' % (
+ quote(seg_container), quote(obj),
+ put_headers['x-object-meta-mtime'], full_size,
+ options['segment_size'])
+ if old_manifest and old_manifest.rstrip('/') == \
+ new_object_manifest.rstrip('/'):
+ old_manifest = None
+ put_headers['x-object-manifest'] = new_object_manifest
+ mr = {}
+ conn.put_object(
+ container, obj, '', content_length=0,
+ headers=put_headers,
+ response_dict=mr
+ )
+ res['manifest_response_dict'] = mr
+ else:
+ res['large_object'] = False
+ if path is not None:
+ obr = {}
+ conn.put_object(
+ container, obj, open(path, 'rb'),
+ content_length=getsize(path), headers=put_headers,
+ response_dict=obr
+ )
+ res['response_dict'] = obr
+ else:
+ obr = {}
+ conn.put_object(
+ container, obj, stream, headers=put_headers,
+ response_dict=obr
+ )
+ res['response_dict'] = obr
+ if old_manifest or old_slo_manifest_paths:
+ if old_manifest:
+ scontainer, sprefix = old_manifest.split('/', 1)
+ scontainer = unquote(scontainer)
+ sprefix = unquote(sprefix).rstrip('/') + '/'
+ delobjs = []
+ for delobj in conn.get_container(scontainer,
+ prefix=sprefix)[1]:
+ delobjs.append(delobj['name'])
+ drs = []
+ for dr in self.delete(container=scontainer,
+ objects=delobjs):
+ drs.append(dr)
+ res['segment_delete_results'] = drs
+ if old_slo_manifest_paths:
+ delobjsmap = {}
+ for seg_to_delete in old_slo_manifest_paths:
+ if seg_to_delete in new_slo_manifest_paths:
+ continue
+ scont, sobj = \
+ seg_to_delete.split('/', 1)
+ delobjs_cont = delobjsmap.get(scont, [])
+ delobjs_cont.append(sobj)
+ drs = []
+ for (dscont, dsobjs) in delobjsmap.items():
+ for dr in self.delete(container=dscont,
+ objects=dsobjs):
+ drs.append(dr)
+ res['segment_delete_results'] = drs
+
+ # return dict for printing
+ res.update({
+ 'success': True,
+ 'status': 'uploaded',
+ 'attempts': conn.attempts})
+ return res
+
+ except OSError as err:
+ if err.errno == ENOENT:
+ err = SwiftError('Local file %r not found' % path)
+ res.update({
+ 'success': False,
+ 'error': err
+ })
+ except Exception as err:
+ res.update({
+ 'success': False,
+ 'error': err
+ })
+ return res
+
+ # Delete related methods
+ #
+ def delete(self, container=None, objects=None, options=None):
+ """
+ Delete operations on an account, optional container and optional list
+ of objects.
+
+ :param container: The container to delete or delete from.
+ :param objects: The list of objects to delete.
+ :param options: A dictionary containing options to override the global
+ options specified during the service object creation::
+
+ {
+ 'yes_all': False,
+ 'leave_segments': False,
+ }
+
+ :returns: A generator for returning the results of the delete
+ operations. Each result yielded from the generator is either
+ a 'delete_container', 'delete_object' or 'delete_segment'
+ dictionary containing the results of an individual delete
+ operation.
+
+ :raises: ClientException
+ :raises: SwiftError
+ """
+ if options is not None:
+ options = dict(self._options, **options)
+ else:
+ options = self._options
+
+ rq = Queue()
+ if container is not None:
+ if objects is not None:
+ obj_dels = {}
+ for obj in objects:
+ obj_del = self.thread_manager.object_dd_pool.submit(
+ self._delete_object, container, obj, options,
+ results_queue=rq
+ )
+ obj_details = {'container': container, 'object': obj}
+ obj_dels[obj_del] = obj_details
+
+ # Start a thread to watch for upload results
+ Thread(
+ target=self._watch_futures, args=(obj_dels, rq)
+ ).start()
+
+ # yield results as they become available, raising the first
+ # encountered exception
+ res = get_from_queue(rq)
+ while res is not None:
+ yield res
+
+ # Cancel the remaining jobs if necessary
+ if options['fail_fast'] and not res['success']:
+ for d in obj_dels.keys():
+ d.cancel()
+
+ res = get_from_queue(rq)
+ else:
+ for res in self._delete_container(container, options):
+ yield res
+ else:
+ if objects:
+ raise SwiftError('Objects specified without container')
+ if options['yes_all']:
+ cancelled = False
+ containers = []
+ for part in self.list():
+ if part["success"]:
+ containers.extend(c['name'] for c in part['listing'])
+ else:
+ raise part["error"]
+
+ for con in containers:
+ if cancelled:
+ break
+ else:
+ for res in self._delete_container(
+ con, options=options):
+ yield res
+
+ # Cancel the remaining container deletes, but yield
+ # any pending results
+ if not cancelled and \
+ options['fail_fast'] and \
+ not res['success']:
+ cancelled = True
+
+ @staticmethod
+ def _delete_segment(conn, container, obj, results_queue=None):
+ results_dict = {}
+ try:
+ conn.delete_object(container, obj, response_dict=results_dict)
+ res = {
+ 'action': 'delete_segment',
+ 'container': container,
+ 'object': obj,
+ 'success': True,
+ 'attempts': conn.attempts,
+ 'response_dict': results_dict
+ }
+ except Exception as e:
+ res = {
+ 'action': 'delete_segment',
+ 'container': container,
+ 'object': obj,
+ 'success': False,
+ 'attempts': conn.attempts,
+ 'response_dict': results_dict,
+ 'exception': e
+ }
+
+ if results_queue is not None:
+ results_queue.put(res)
+ return res
+
+ def _delete_object(self, conn, container, obj, options,
+ results_queue=None):
+ try:
+ res = {
+ 'action': 'delete_object',
+ 'container': container,
+ 'object': obj
+ }
+ old_manifest = None
+ query_string = None
+
+ if not options['leave_segments']:
+ try:
+ headers = conn.head_object(container, obj)
+ old_manifest = headers.get('x-object-manifest')
+ if config_true_value(
+ headers.get('x-static-large-object')):
+ query_string = 'multipart-manifest=delete'
+ except ClientException as err:
+ if err.http_status != 404:
+ raise
+
+ results_dict = {}
+ conn.delete_object(container, obj, query_string=query_string,
+ response_dict=results_dict)
+
+ if old_manifest:
+
+ dlo_segments_deleted = True
+ segment_pool = self.thread_manager.segment_pool
+ s_container, s_prefix = old_manifest.split('/', 1)
+ s_container = unquote(s_container)
+ s_prefix = unquote(s_prefix).rstrip('/') + '/'
+
+ del_segs = []
+ for part in self.list(
+ container=s_container, options={'prefix': s_prefix}):
+ if part["success"]:
+ seg_list = [o["name"] for o in part["listing"]]
+ else:
+ raise part["error"]
+
+ for seg in seg_list:
+ print(seg)
+ del_seg = segment_pool.submit(
+ self._delete_segment, s_container,
+ seg, results_queue=results_queue
+ )
+ del_segs.append(del_seg)
+
+ for del_seg in interruptable_as_completed(del_segs):
+ del_res = del_seg.result()
+ if not del_res["success"]:
+ dlo_segments_deleted = False
+
+ res['dlo_segments_deleted'] = dlo_segments_deleted
+
+ res.update({
+ 'success': True,
+ 'response_dict': results_dict,
+ 'attempts': conn.attempts,
+ })
+
+ except Exception as err:
+ res['success'] = False
+ res['error'] = err
+ return res
+
+ return res
+
+ @staticmethod
+ def _delete_empty_container(conn, container):
+ results_dict = {}
+ try:
+ conn.delete_container(container, response_dict=results_dict)
+ res = {
+ 'action': 'delete_container',
+ 'container': container,
+ 'object': None,
+ 'success': True,
+ 'attempts': conn.attempts,
+ 'response_dict': results_dict
+ }
+ except Exception as e:
+ res = {
+ 'action': 'delete_container',
+ 'container': container,
+ 'object': None,
+ 'success': False,
+ 'response_dict': results_dict,
+ 'error': e
+ }
+ return res
+
+ def _delete_container(self, container, options):
+ try:
+ objs = []
+ for part in self.list(container=container):
+ if part["success"]:
+ objs.extend([
+ o['name'] for o in part['listing']
+ ])
+ else:
+ raise part["error"]
+
+ for res in self.delete(
+ container=container, objects=objs, options=options):
+ yield res
+
+ con_del = self.thread_manager.container_pool.submit(
+ self._delete_empty_container, container
+ )
+ con_del_res = get_future_result(con_del)
+
+ except Exception as err:
+ con_del_res = {
+ 'action': 'delete_container',
+ 'container': container,
+ 'object': None,
+ 'success': False,
+ 'error': err
+ }
+
+ yield con_del_res
+
+ # Capabilities related methods
+ #
+ def capabilities(self, url=None):
+ """
+ List the cluster capabilities.
+
+ :param url: Proxy URL of the cluster to retrieve capabilities.
+
+ :returns: A dictionary containing the capabilities of the cluster.
+
+ :raises: ClientException
+ :raises: SwiftError
+ """
+ res = {
+ 'action': 'capabilities'
+ }
+
+ try:
+ cap = self.thread_manager.container_pool.submit(
+ self._get_capabilities, url
+ )
+ capabilities = get_future_result(cap)
+ res.update({
+ 'success': True,
+ 'capabilities': capabilities
+ })
+ if url is not None:
+ res.update({
+ 'url': url
+ })
+ except ClientException as err:
+ if err.http_status != 404:
+ raise err
+ raise SwiftError('Account not found')
+
+ return res
+
+ @staticmethod
+ def _get_capabilities(conn, url):
+ return conn.get_capabilities(url)
+
+ # Helper methods
+ #
+ @staticmethod
+ def _watch_futures(futures, result_queue):
+ """
+ Watches a dict of futures and pushes their results onto the given
+ queue. We use this to wait for a set of futures which may create
+ futures of their own to wait for, whilst also allowing us to
+ immediately return the results of those sub-jobs.
+
+ When all futures have completed, None is pushed to the queue
+
+ If the future is cancelled, we use the dict to return details about
+ the cancellation.
+ """
+ futures_only = list(futures.keys())
+ for f in interruptable_as_completed(futures_only):
+ try:
+ r = f.result()
+ if r is not None:
+ result_queue.put(r)
+ except CancelledError:
+ details = futures[f]
+ res = details
+ res['status'] = 'cancelled'
+ result_queue.put(res)
+ except Exception as err:
+ details = futures[f]
+ res = details
+ res['success'] = False
+ res['error'] = err
+ result_queue.put(res)
+
+ result_queue.put(None)
diff --git a/swiftclient/shell.py b/swiftclient/shell.py
index fd876d9..d0ef8fa 100755
--- a/swiftclient/shell.py
+++ b/swiftclient/shell.py
@@ -20,60 +20,27 @@ import signal
import socket
import logging
-from errno import EEXIST, ENOENT
-from hashlib import md5
from optparse import OptionParser, OptionGroup, SUPPRESS_HELP
-from os import environ, listdir, makedirs, utime, _exit as os_exit
-from os.path import basename, dirname, getmtime, getsize, isdir, join, \
- sep as os_path_sep
-from random import shuffle
-from sys import argv as sys_argv, exit, stderr, stdout
-from time import sleep, time, gmtime, strftime
-from six.moves.urllib.parse import quote, unquote
-
-try:
- import simplejson as json
-except ImportError:
- import json
-
-from swiftclient import Connection, RequestException
-from swiftclient import command_helpers
-from swiftclient.utils import config_true_value, prt_bytes, generate_temp_url
-from swiftclient.multithreading import MultiThreadingManager
+from os import environ, walk, _exit as os_exit
+from os.path import isfile, isdir, join
+from sys import argv as sys_argv, exit, stderr
+from time import gmtime, strftime
+
+from swiftclient import RequestException
+from swiftclient.utils import config_true_value, generate_temp_url, prt_bytes
+from swiftclient.multithreading import OutputManager
from swiftclient.exceptions import ClientException
from swiftclient import __version__ as client_version
+from swiftclient.service import SwiftService, SwiftError, SwiftUploadObject
+from swiftclient.command_helpers import print_account_stats, \
+ print_container_stats, print_object_stats
BASENAME = 'swift'
-POLICY = 'X-Storage-Policy'
commands = ('delete', 'download', 'list', 'post',
'stat', 'upload', 'capabilities', 'info', 'tempurl')
-def get_conn(options):
- """
- Return a connection building it from the options.
- """
- return Connection(options.auth,
- options.user,
- options.key,
- options.retries,
- auth_version=options.auth_version,
- os_options=options.os_options,
- snet=options.snet,
- cacert=options.os_cacert,
- insecure=options.insecure,
- ssl_compression=options.ssl_compression)
-
-
-def mkdirs(path):
- try:
- makedirs(path)
- except OSError as err:
- if err.errno != EEXIST:
- raise
-
-
def immediate_exit(signum, frame):
stderr.write(" Aborted\n")
os_exit(2)
@@ -104,7 +71,7 @@ Optional arguments:
'''.strip("\n")
-def st_delete(parser, args, thread_manager):
+def st_delete(parser, args, output_manager):
parser.add_option(
'-a', '--all', action='store_true', dest='yes_all',
default=False, help='Delete all containers and objects.')
@@ -115,7 +82,7 @@ def st_delete(parser, args, thread_manager):
parser.add_option(
'', '--object-threads', type=int,
default=10, help='Number of threads to use for deleting objects. '
- 'Default is 10')
+ 'Default is 10.')
parser.add_option('', '--container-threads', type=int,
default=10, help='Number of threads to use for '
'deleting containers. '
@@ -123,138 +90,70 @@ def st_delete(parser, args, thread_manager):
(options, args) = parse_args(parser, args)
args = args[1:]
if (not args and not options.yes_all) or (args and options.yes_all):
- thread_manager.error('Usage: %s delete %s\n%s',
+ output_manager.error('Usage: %s delete %s\n%s',
BASENAME, st_delete_options,
st_delete_help)
return
- def _delete_segment(item, conn):
- (container, obj) = item
- conn.delete_object(container, obj)
- if options.verbose:
- if conn.attempts > 2:
- thread_manager.print_msg(
- '%s/%s [after %d attempts]', container,
- obj, conn.attempts)
+ _opts = vars(options)
+ _opts['object_dd_threads'] = options.object_threads
+ with SwiftService(options=_opts) as swift:
+ try:
+ if not args:
+ del_iter = swift.delete()
else:
- thread_manager.print_msg('%s/%s', container, obj)
+ container = args[0]
+ if '/' in container:
+ output_manager.error(
+ 'WARNING: / in container name; you '
+ 'might have meant %r instead of %r.' % (
+ container.replace('/', ' ', 1), container)
+ )
+ return
+ objects = args[1:]
+ if objects:
+ del_iter = swift.delete(container=container,
+ objects=objects)
+ else:
+ del_iter = swift.delete(container=container)
+
+ for r in del_iter:
+ if r['success']:
+ if options.verbose:
+ if r['action'] == 'delete_object':
+ c = r['container']
+ o = r['object']
+ p = '%s/%s' % (c, o) if options.yes_all else o
+ a = r['attempts']
+ if a > 1:
+ output_manager.print_msg(
+ '%s [after %d attempts]', p, a)
+ else:
+ output_manager.print_msg(p)
+
+ elif r['action'] == 'delete_segment':
+ c = r['container']
+ o = r['object']
+ p = '%s/%s' % (c, o)
+ a = r['attempts']
+ if a > 1:
+ output_manager.print_msg(
+ '%s [after %d attempts]', p, a)
+ else:
+ output_manager.print_msg(p)
- def _delete_object(item, conn):
- (container, obj) = item
- try:
- old_manifest = None
- query_string = None
- if not options.leave_segments:
- try:
- headers = conn.head_object(container, obj)
- old_manifest = headers.get('x-object-manifest')
- if config_true_value(
- headers.get('x-static-large-object')):
- query_string = 'multipart-manifest=delete'
- except ClientException as err:
- if err.http_status != 404:
- raise
- conn.delete_object(container, obj, query_string=query_string)
- if old_manifest:
- segment_manager = thread_manager.queue_manager(
- _delete_segment, options.object_threads,
- connection_maker=create_connection)
- segment_queue = segment_manager.queue
- scontainer, sprefix = old_manifest.split('/', 1)
- scontainer = unquote(scontainer)
- sprefix = unquote(sprefix).rstrip('/') + '/'
- for delobj in conn.get_container(scontainer,
- prefix=sprefix)[1]:
- segment_queue.put((scontainer, delobj['name']))
- if not segment_queue.empty():
- with segment_manager:
- pass
- if options.verbose:
- path = options.yes_all and join(container, obj) or obj
- if path[:1] in ('/', '\\'):
- path = path[1:]
- if conn.attempts > 1:
- thread_manager.print_msg('%s [after %d attempts]', path,
- conn.attempts)
else:
- thread_manager.print_msg(path)
- except ClientException as err:
- if err.http_status != 404:
- raise
- thread_manager.error("Object '%s/%s' not found", container, obj)
+ # Special case error prints
+ output_manager.error("An unexpected error occurred whilst "
+ "deleting: %s" % r['error'])
+ except SwiftError as err:
+ output_manager.error(err.value)
- def _delete_container(container, conn, object_queue):
- try:
- marker = ''
- while True:
- objects = [o['name'] for o in
- conn.get_container(container, marker=marker)[1]]
- if not objects:
- break
- for obj in objects:
- object_queue.put((container, obj))
- marker = objects[-1]
- while not object_queue.empty():
- sleep(0.05)
- attempts = 1
- while True:
- try:
- conn.delete_container(container)
- break
- except ClientException as err:
- if err.http_status != 409:
- raise
- if attempts > 10:
- raise
- attempts += 1
- sleep(1)
- except ClientException as err:
- if err.http_status != 404:
- raise
- thread_manager.error('Container %r not found', container)
-
- create_connection = lambda: get_conn(options)
- obj_manager = thread_manager.queue_manager(
- _delete_object, options.object_threads,
- connection_maker=create_connection)
- with obj_manager as object_queue:
- cont_manager = thread_manager.queue_manager(
- _delete_container, options.container_threads, object_queue,
- connection_maker=create_connection)
- with cont_manager as container_queue:
- if not args:
- conn = create_connection()
- try:
- marker = ''
- while True:
- containers = [
- c['name']
- for c in conn.get_account(marker=marker)[1]]
- if not containers:
- break
- for container in containers:
- container_queue.put(container)
- marker = containers[-1]
- except ClientException as err:
- if err.http_status != 404:
- raise
- thread_manager.error('Account not found')
- elif len(args) == 1:
- if '/' in args[0]:
- print(
- 'WARNING: / in container name; you might have meant '
- '%r instead of %r.' % (
- args[0].replace('/', ' ', 1), args[0]),
- file=stderr)
- container_queue.put(args[0])
- else:
- for obj in args[1:]:
- object_queue.put((args[0], obj))
st_download_options = '''[--all] [--marker] [--prefix <prefix>]
[--output <out_file>] [--object-threads <threads>]
[--container-threads <threads>] [--no-download]
- <container> [object]
+ [--skip-identical] <container> <object>
'''
st_download_help = '''
@@ -263,25 +162,25 @@ Download objects from containers.
Positional arguments:
<container> Name of container to download from. To download a
whole account, omit this and specify --all.
- [object] Name of object to download. Specify multiple times
+ <object> Name of object to download. Specify multiple times
for multiple objects. Omit this to download all
objects from the container.
Optional arguments:
- --all Indicates that you really want to download everything
- in the account.
+ --all Indicates that you really want to download
+ everything in the account.
--marker Marker to use when starting a container or account
download.
- --prefix <prefix> Only download items beginning with <prefix>.
+ --prefix <prefix> Only download items beginning with <prefix>
--output <out_file> For a single file download, stream the output to
<out_file>. Specifying "-" as <out_file> will
redirect to stdout.
--object-threads <threads>
Number of threads to use for downloading objects.
- Default is 10
+ Default is 10.
--container-threads <threads>
Number of threads to use for downloading containers.
- Default is 10
+ Default is 10.
--no-download Perform download(s), but don't actually write anything
to disk.
--header <header_name:header_value>
@@ -293,7 +192,7 @@ Optional arguments:
'''.strip("\n")
-def st_download(parser, args, thread_manager):
+def st_download(parser, args, output_manager):
parser.add_option(
'-a', '--all', action='store_true', dest='yes_all',
default=False, help='Indicates that you really want to download '
@@ -335,205 +234,107 @@ def st_download(parser, args, thread_manager):
args = args[1:]
if options.out_file == '-':
options.verbose = 0
+
if options.out_file and len(args) != 2:
exit('-o option only allowed for single file downloads')
+
if (not args and not options.yes_all) or (args and options.yes_all):
- thread_manager.error('Usage: %s download %s\n%s', BASENAME,
+ output_manager.error('Usage: %s download %s\n%s', BASENAME,
st_download_options, st_download_help)
return
- req_headers = split_headers(options.header, '', thread_manager)
-
- def _download_object(queue_arg, conn):
- if len(queue_arg) == 2:
- container, obj = queue_arg
- out_file = None
- elif len(queue_arg) == 3:
- container, obj, out_file = queue_arg
- else:
- raise Exception("Invalid queue_arg length of %s" % len(queue_arg))
- path = options.yes_all and join(container, obj) or obj
- path = path.lstrip(os_path_sep)
- if options.skip_identical and out_file != '-':
- filename = out_file if out_file else path
- try:
- fp = open(filename, 'rb')
- except IOError:
- pass
- else:
- with fp:
- md5sum = md5()
- while True:
- data = fp.read(65536)
- if not data:
- break
- md5sum.update(data)
- req_headers['If-None-Match'] = md5sum.hexdigest()
- try:
- start_time = time()
- headers, body = \
- conn.get_object(container, obj, resp_chunk_size=65536,
- headers=req_headers)
- headers_receipt = time()
- content_type = headers.get('content-type')
- if 'content-length' in headers:
- content_length = int(headers.get('content-length'))
- else:
- content_length = None
- etag = headers.get('etag')
- md5sum = None
- pseudodir = False
- no_file = options.no_download
- make_dir = not no_file and out_file != "-"
- if content_type.split(';', 1)[0] == 'text/directory':
- if make_dir and not isdir(path):
- mkdirs(path)
- read_length = 0
- if 'x-object-manifest' not in headers and \
- 'x-static-large-object' not in headers:
- md5sum = md5()
- for chunk in body:
- read_length += len(chunk)
- if md5sum:
- md5sum.update(chunk)
- else:
- dirpath = dirname(path)
- if make_dir and dirpath and not isdir(dirpath):
- mkdirs(dirpath)
- if not no_file:
- if out_file == "-":
- fp = stdout
- elif out_file:
- fp = open(out_file, 'wb')
- else:
- if basename(path):
- fp = open(path, 'wb')
- else:
- pseudodir = True
- no_file = True
- read_length = 0
- if 'x-object-manifest' not in headers and \
- 'x-static-large-object' not in headers:
- md5sum = md5()
- for chunk in body:
- if not no_file:
- fp.write(chunk)
- read_length += len(chunk)
- if md5sum:
- md5sum.update(chunk)
- if not no_file:
- fp.close()
- if md5sum and md5sum.hexdigest() != etag:
- thread_manager.error('%s: md5sum != etag, %s != %s',
- path, md5sum.hexdigest(), etag)
- if content_length is not None and read_length != content_length:
- thread_manager.error(
- '%s: read_length != content_length, %d != %d',
- path, read_length, content_length)
- if 'x-object-meta-mtime' in headers and not options.out_file \
- and not no_file:
- mtime = float(headers['x-object-meta-mtime'])
- utime(path, (mtime, mtime))
- if options.verbose:
- finish_time = time()
- auth_time = conn.auth_end_time - start_time
- headers_receipt = headers_receipt - start_time
- total_time = finish_time - start_time
- download_time = total_time - auth_time
- if pseudodir:
- time_str = (
- 'auth %.3fs, headers %.3fs, total %.3fs, pseudo' % (
- auth_time, headers_receipt, total_time))
- else:
- time_str = (
- 'auth %.3fs, headers %.3fs, total %.3fs, %.3f MB/s' % (
- auth_time, headers_receipt, total_time,
- float(read_length) / download_time / 1000000))
- if conn.attempts > 1:
- thread_manager.print_msg('%s [%s after %d attempts]', path,
- time_str, conn.attempts)
- else:
- thread_manager.print_msg('%s [%s]', path, time_str)
- except ClientException as err:
- if err.http_status == 304 and options.skip_identical:
- thread_manager.print_msg("Skipped identical file '%s'", path)
- return
- if err.http_status != 404:
- raise
- thread_manager.error("Object '%s/%s' not found", container, obj)
-
- def _download_container(queue_arg, conn):
- if len(queue_arg) == 2:
- container, object_queue = queue_arg
- prefix = None
- elif len(queue_arg) == 3:
- container, object_queue, prefix = queue_arg
- else:
- raise Exception("Invalid queue_arg length of %s" % len(queue_arg))
+
+ _opts = vars(options)
+ _opts['object_dd_threads'] = options.object_threads
+ with SwiftService(options=_opts) as swift:
try:
- marker = options.marker
- while True:
- objects = [
- o['name'] for o in
- conn.get_container(container, marker=marker,
- prefix=prefix)[1]]
- if not objects:
- break
- marker = objects[-1]
- shuffle(objects)
- for obj in objects:
- object_queue.put((container, obj))
- except ClientException as err:
- if err.http_status != 404:
- raise
- thread_manager.error('Container %r not found', container)
-
- create_connection = lambda: get_conn(options)
- obj_manager = thread_manager.queue_manager(
- _download_object, options.object_threads,
- connection_maker=create_connection)
- with obj_manager as object_queue:
- cont_manager = thread_manager.queue_manager(
- _download_container, options.container_threads,
- connection_maker=create_connection)
- with cont_manager as container_queue:
if not args:
- # --all case
- conn = create_connection()
- try:
- marker = options.marker
- while True:
- containers = [
- c['name'] for c in conn.get_account(
- marker=marker, prefix=options.prefix)[1]]
- if not containers:
- break
- marker = containers[-1]
- shuffle(containers)
- for container in containers:
- container_queue.put((container, object_queue))
- except ClientException as err:
- if err.http_status != 404:
- raise
- thread_manager.error('Account not found')
- elif len(args) == 1:
- if '/' in args[0]:
- print(
- 'WARNING: / in container name; you might have meant '
- '%r instead of %r.' % (
- args[0].replace('/', ' ', 1), args[0]),
- file=stderr)
- container_queue.put((args[0], object_queue, options.prefix))
+ down_iter = swift.download()
else:
- if len(args) == 2:
- obj = args[1]
- object_queue.put((args[0], obj, options.out_file))
+ container = args[0]
+ if '/' in container:
+ output_manager.error(
+ 'WARNING: / in container name; you '
+ 'might have meant %r instead of %r.' % (
+ container.replace('/', ' ', 1), container)
+ )
+ return
+ objects = args[1:]
+ if not objects:
+ down_iter = swift.download(container)
else:
- for obj in args[1:]:
- object_queue.put((args[0], obj))
+ down_iter = swift.download(container, objects)
+
+ for down in down_iter:
+ if options.out_file == '-' and 'contents' in down:
+ for chunk in down['contents']:
+ output_manager.print_msg(chunk)
+ else:
+ if down['success']:
+ if options.verbose:
+ start_time = down['start_time']
+ headers_receipt = \
+ down['headers_receipt'] - start_time
+ auth_time = down['auth_end_time'] - start_time
+ finish_time = down['finish_time']
+ read_length = down['read_length']
+ attempts = down['attempts']
+ total_time = finish_time - start_time
+ down_time = total_time - auth_time
+ _mega = 1000000
+ if down['pseudodir']:
+ time_str = (
+ 'auth %.3fs, headers %.3fs, total %.3fs, '
+ 'pseudo' % (
+ auth_time, headers_receipt,
+ total_time
+ )
+ )
+ else:
+ speed = float(read_length) / down_time / _mega
+ time_str = (
+ 'auth %.3fs, headers %.3fs, total %.3fs, '
+ '%.3f MB/s' % (
+ auth_time, headers_receipt,
+ total_time, speed
+ )
+ )
+ path = down['path']
+ if attempts > 1:
+ output_manager.print_msg(
+ '%s [%s after %d attempts]',
+ path, time_str, attempts
+ )
+ else:
+ output_manager.print_msg(
+ '%s [%s]', path, time_str
+ )
+ else:
+ error = down['error']
+ path = down['path']
+ container = down['container']
+ obj = down['object']
+ if isinstance(error, ClientException):
+ if error.http_status == 304 and \
+ options.skip_identical:
+ output_manager.print_msg(
+ "Skipped identical file '%s'", path)
+ continue
+ if error.http_status == 404:
+ output_manager.error(
+ "Object '%s/%s' not found", container, obj)
+ continue
+ output_manager.error(
+ "Error downloading object '%s/%s': %s",
+ container, obj, error)
+
+ except SwiftError as e:
+ output_manager.error(e.value)
+
st_list_options = '''[--long] [--lh] [--totals] [--prefix <prefix>]
[--delimiter <delimiter>]
'''
+
st_list_help = '''
Lists the containers for the account or the objects for a container.
@@ -552,7 +353,7 @@ Optional arguments:
'''.strip('\n')
-def st_list(parser, args, thread_manager):
+def st_list(parser, args, output_manager):
parser.add_option(
'-l', '--long', dest='long', action='store_true', default=False,
help='Long listing format, similar to ls -l.')
@@ -561,95 +362,104 @@ def st_list(parser, args, thread_manager):
default=False, help='Report sizes in human readable format, '
"similar to ls -lh.")
parser.add_option(
- '-t', '--totals', dest='totals', action='store_true', default=False,
- help='Used with -l or --lh, only report totals.')
+ '-t', '--totals', dest='totals',
+ help='used with -l or --lh, only report totals.',
+ action='store_true', default=False)
parser.add_option(
'-p', '--prefix', dest='prefix',
help='Only list items beginning with the prefix.')
parser.add_option(
'-d', '--delimiter', dest='delimiter',
- help='Roll up items with the given delimiter. '
- 'For containers only. See OpenStack Swift API documentation for '
- 'what this means.')
+ help='Roll up items with the given delimiter. For containers '
+ 'only. See OpenStack Swift API documentation for '
+ 'what this means.')
(options, args) = parse_args(parser, args)
args = args[1:]
if options.delimiter and not args:
exit('-d option only allowed for container listings')
- if len(args) > 1 or len(args) == 1 and args[0].find('/') >= 0:
- thread_manager.error('Usage: %s list %s\n%s', BASENAME,
- st_list_options, st_list_help)
- return
- conn = get_conn(options)
- try:
- marker = ''
- total_count = total_bytes = 0
- while True:
+ _opts = vars(options).copy()
+ if _opts['human']:
+ _opts.pop('human')
+ _opts['long'] = True
+
+ with SwiftService(options=_opts) as swift:
+ try:
if not args:
- items = \
- conn.get_account(marker=marker, prefix=options.prefix)[1]
+ stats_parts_gen = swift.list()
else:
- items = conn.get_container(
- args[0], marker=marker,
- prefix=options.prefix, delimiter=options.delimiter)[1]
- if not items:
- break
- for item in items:
- item_name = item.get('name')
-
- if not options.long and not options.human:
- thread_manager.print_msg(
- item.get('name', item.get('subdir')))
+ container = args[0]
+ args = args[1:]
+ if "/" in container or args:
+ output_manager.error(
+ 'Usage: %s list %s\n%s', BASENAME,
+ st_list_options, st_list_help)
+ return
else:
- item_bytes = item.get('bytes')
- total_bytes += item_bytes
- if len(args) == 0: # listing containers
- byte_str = prt_bytes(item_bytes, options.human)
- count = item.get('count')
- total_count += count
- try:
- meta = conn.head_container(item_name)
- utc = gmtime(float(meta.get('x-timestamp')))
- datestamp = strftime('%Y-%m-%d %H:%M:%S', utc)
- except ClientException:
- datestamp = '????-??-?? ??:??:??'
- if not options.totals:
- thread_manager.print_msg("%5s %s %s %s", count,
- byte_str, datestamp,
- item_name)
- else: # list container contents
- subdir = item.get('subdir')
- if subdir is None:
- byte_str = prt_bytes(item_bytes, options.human)
- date, xtime = item.get('last_modified').split('T')
- xtime = xtime.split('.')[0]
+ stats_parts_gen = swift.list(container=container)
+
+ for stats in stats_parts_gen:
+ total_count = total_bytes = 0
+ container = stats.get("container", None)
+ if stats["success"]:
+ for item in stats["listing"]:
+ item_name = item.get('name')
+
+ if not options.long and not options.human:
+ output_manager.print_msg(
+ item.get('name', item.get('subdir')))
else:
- byte_str = prt_bytes(0, options.human)
- date = xtime = ''
- item_name = subdir
- if not options.totals:
- thread_manager.print_msg("%s %10s %8s %s",
- byte_str, date, xtime,
- item_name)
-
- marker = items[-1].get('name', items[-1].get('subdir'))
-
- # report totals
- if options.long or options.human:
- if len(args) == 0:
- thread_manager.print_msg(
- "%5s %s", prt_bytes(total_count, True),
- prt_bytes(total_bytes, options.human))
- else:
- thread_manager.print_msg(prt_bytes(total_bytes, options.human))
+ item_bytes = item.get('bytes')
+ total_bytes += item_bytes
+ if not container: # listing containers
+ byte_str = prt_bytes(item_bytes, options.human)
+ count = item.get('count')
+ total_count += count
+ try:
+ meta = item.get('meta')
+ utc = gmtime(
+ float(meta.get('x-timestamp')))
+ datestamp = strftime(
+ '%Y-%m-%d %H:%M:%S', utc)
+ except ClientException:
+ datestamp = '????-??-?? ??:??:??'
+ if not options.totals:
+ output_manager.print_msg(
+ "%5s %s %s %s", count, byte_str,
+ datestamp, item_name)
+ else: # list container contents
+ subdir = item.get('subdir')
+ if subdir is None:
+ byte_str = prt_bytes(
+ item_bytes, options.human)
+ date, xtime = item.get(
+ 'last_modified').split('T')
+ xtime = xtime.split('.')[0]
+ else:
+ byte_str = prt_bytes(0, options.human)
+ date = xtime = ''
+ item_name = subdir
+ if not options.totals:
+ output_manager.print_msg(
+ "%s %10s %8s %s", byte_str, date,
+ xtime, item_name)
+
+ # report totals
+ if options.long or options.human:
+ if not container:
+ output_manager.print_msg(
+ "%5s %s", prt_bytes(total_count, True),
+ prt_bytes(total_bytes, options.human))
+ else:
+ output_manager.print_msg(
+ prt_bytes(total_bytes, options.human))
+
+ else:
+ raise stats["error"]
+
+ except SwiftError as e:
+ output_manager.error(e.value)
- except ClientException as err:
- if err.http_status != 404:
- raise
- if not args:
- thread_manager.error('Account not found')
- else:
- thread_manager.error('Container %r not found', args[0])
st_stat_options = '''[--lh]
[container] [object]
@@ -660,8 +470,7 @@ Displays information for the account, container, or object.
Positional arguments:
[container] Name of container to stat from.
- [object] Name of object to stat. Specify multiple times
- for multiple objects.
+ [object] Name of object to stat.
Optional arguments:
--lh Report sizes in human readable format similar to
@@ -669,44 +478,57 @@ Optional arguments:
'''.strip('\n')
-def st_stat(parser, args, thread_manager):
+def st_stat(parser, args, output_manager):
parser.add_option(
'--lh', dest='human', action='store_true', default=False,
help='Report sizes in human readable format similar to ls -lh.')
(options, args) = parse_args(parser, args)
args = args[1:]
- conn = get_conn(options)
- if not args:
- try:
- command_helpers.stat_account(conn, options, thread_manager)
- except ClientException as err:
- if err.http_status != 404:
- raise
- thread_manager.error('Account not found')
- elif len(args) == 1:
- if '/' in args[0]:
- print(
- 'WARNING: / in container name; you might have meant %r instead'
- ' of %r.' % (
- args[0].replace('/', ' ', 1), args[0]),
- file=stderr)
- try:
- command_helpers.stat_container(conn, options, args,
- thread_manager)
- except ClientException as err:
- if err.http_status != 404:
- raise
- thread_manager.error('Container %r not found', args[0])
- elif len(args) == 2:
- try:
- command_helpers.stat_object(conn, options, args, thread_manager)
- except ClientException as err:
- if err.http_status != 404:
- raise
- thread_manager.error("Object %s/%s not found", args[0], args[1])
- else:
- thread_manager.error('Usage: %s stat %s\n%s', BASENAME,
- st_stat_options, st_stat_help)
+
+ _opts = vars(options)
+
+ with SwiftService(options=_opts) as swift:
+ if not args:
+ stat_result = swift.stat()
+ items = stat_result['items']
+ headers = stat_result['headers']
+ print_account_stats(items, headers, output_manager)
+ else:
+ try:
+ container = args[0]
+ if '/' in container:
+ output_manager.error(
+ 'WARNING: / in container name; you might have '
+ 'meant %r instead of %r.' %
+ (container.replace('/', ' ', 1), container))
+ return
+ args = args[1:]
+ if not args:
+ stat_result = swift.stat(container=container)
+ items = stat_result['items']
+ headers = stat_result['headers']
+ print_container_stats(items, headers, output_manager)
+ else:
+ if len(args) == 1:
+ objects = [args[0]]
+ stat_results = swift.stat(
+ container=container, objects=objects)
+ for stat_result in stat_results: # only 1 result
+ if stat_result["success"]:
+ items = stat_result['items']
+ headers = stat_result['headers']
+ print_object_stats(
+ items, headers, output_manager
+ )
+ else:
+ raise(stat_result["error"])
+ else:
+ output_manager.error(
+ 'Usage: %s stat %s\n%s', BASENAME,
+ st_stat_options, st_stat_help)
+
+ except SwiftError as e:
+ output_manager.error(e.value)
st_post_options = '''[--read-acl <acl>] [--write-acl <acl>] [--sync-to]
@@ -739,7 +561,7 @@ Optional arguments:
'''.strip('\n')
-def st_post(parser, args, thread_manager):
+def st_post(parser, args, output_manager):
parser.add_option(
'-r', '--read-acl', dest='read_acl', help='Read ACL for containers. '
'Quick summary of ACL syntax: .r:*, .r:-.example.com, '
@@ -768,54 +590,41 @@ def st_post(parser, args, thread_manager):
if (options.read_acl or options.write_acl or options.sync_to or
options.sync_key) and not args:
exit('-r, -w, -t, and -k options only allowed for containers')
- conn = get_conn(options)
- if not args:
- headers = split_headers(
- options.meta, 'X-Account-Meta-', thread_manager)
- headers.update(split_headers(options.header, '', thread_manager))
- try:
- conn.post_account(headers=headers)
- except ClientException as err:
- if err.http_status != 404:
- raise
- thread_manager.error('Account not found')
- elif len(args) == 1:
- if '/' in args[0]:
- print(
- 'WARNING: / in container name; you might have meant %r instead'
- ' of %r.' % (
- args[0].replace('/', ' ', 1), args[0]),
- file=stderr)
- headers = split_headers(options.meta, 'X-Container-Meta-',
- thread_manager)
- headers.update(split_headers(options.header, '', thread_manager))
- if options.read_acl is not None:
- headers['X-Container-Read'] = options.read_acl
- if options.write_acl is not None:
- headers['X-Container-Write'] = options.write_acl
- if options.sync_to is not None:
- headers['X-Container-Sync-To'] = options.sync_to
- if options.sync_key is not None:
- headers['X-Container-Sync-Key'] = options.sync_key
- try:
- conn.post_container(args[0], headers=headers)
- except ClientException as err:
- if err.http_status != 404:
- raise
- conn.put_container(args[0], headers=headers)
- elif len(args) == 2:
- headers = split_headers(options.meta, 'X-Object-Meta-', thread_manager)
- # add header options to the headers object for the request.
- headers.update(split_headers(options.header, '', thread_manager))
+
+ _opts = vars(options)
+
+ with SwiftService(options=_opts) as swift:
try:
- conn.post_object(args[0], args[1], headers=headers)
- except ClientException as err:
- if err.http_status != 404:
- raise
- thread_manager.error("Object '%s/%s' not found", args[0], args[1])
- else:
- thread_manager.error('Usage: %s post %s\n%s', BASENAME,
- st_post_options, st_post_help)
+ if not args:
+ swift.post()
+ else:
+ container = args[0]
+ if '/' in container:
+ output_manager.error(
+ 'WARNING: / in container name; you might have '
+ 'meant %r instead of %r.' %
+ (args[0].replace('/', ' ', 1), args[0]))
+ return
+ args = args[1:]
+ if args:
+ if len(args) == 1:
+ objects = [args[0]]
+ results_iterator = swift.post(
+ container=container, objects=objects
+ )
+ for result in results_iterator: # only 1 result
+ if not result["success"]:
+ raise(result["error"])
+ else:
+ output_manager.error(
+ 'Usage: %s post %s\n%s', BASENAME,
+ st_post_options, st_post_help)
+ else:
+ swift.post(container=container)
+
+ except SwiftError as e:
+ output_manager.error(e.value)
+
st_upload_options = '''[--changed] [--skip-identical] [--segment-size <size>]
[--segment-container <container>] [--leave-segments]
@@ -867,7 +676,7 @@ Optional arguments:
'''.strip('\n')
-def st_upload(parser, args, thread_manager):
+def st_upload(parser, args, output_manager):
parser.add_option(
'-c', '--changed', action='store_true', dest='changed',
default=False, help='Only upload files that have changed since '
@@ -917,336 +726,112 @@ def st_upload(parser, args, thread_manager):
(options, args) = parse_args(parser, args)
args = args[1:]
if len(args) < 2:
- thread_manager.error(
+ output_manager.error(
'Usage: %s upload %s\n%s', BASENAME, st_upload_options,
st_upload_help)
return
+ else:
+ container = args[0]
+ files = args[1:]
- def _segment_job(job, conn):
- if job.get('delete', False):
- conn.delete_object(job['container'], job['obj'])
+ if options.object_name is not None:
+ if len(files) > 1:
+ output_manager.error('object-name only be used with 1 file or dir')
+ return
else:
- fp = open(job['path'], 'rb')
- fp.seek(job['segment_start'])
- seg_container = args[0] + '_segments'
- if options.segment_container:
- seg_container = options.segment_container
- etag = conn.put_object(job.get('container', seg_container),
- job['obj'], fp,
- content_length=job['segment_size'])
- job['segment_location'] = '/%s/%s' % (seg_container, job['obj'])
- job['segment_etag'] = etag
- if options.verbose and 'log_line' in job:
- if conn.attempts > 1:
- thread_manager.print_msg('%s [after %d attempts]',
- job['log_line'], conn.attempts)
- else:
- thread_manager.print_msg(job['log_line'])
- return job
-
- def _object_job(job, conn):
- path = job['path']
- container = job.get('container', args[0])
- dir_marker = job.get('dir_marker', False)
- object_name = job['object_name']
+ orig_path = files[0]
+
+ _opts = vars(options)
+ _opts['object_uu_threads'] = options.object_threads
+ with SwiftService(options=_opts) as swift:
try:
- if object_name is not None:
- object_name.replace("\\", "/")
- obj = object_name
- else:
- obj = path
- if obj.startswith('./') or obj.startswith('.\\'):
- obj = obj[2:]
- if obj.startswith('/'):
- obj = obj[1:]
- put_headers = {'x-object-meta-mtime': "%f" % getmtime(path)}
- if dir_marker:
- if options.changed:
- try:
- headers = conn.head_object(container, obj)
- ct = headers.get('content-type')
- cl = int(headers.get('content-length'))
- et = headers.get('etag')
- mt = headers.get('x-object-meta-mtime')
- if ct.split(';', 1)[0] == 'text/directory' and \
- cl == 0 and \
- et == 'd41d8cd98f00b204e9800998ecf8427e' and \
- mt == put_headers['x-object-meta-mtime']:
- return
- except ClientException as err:
- if err.http_status != 404:
- raise
- conn.put_object(container, obj, '', content_length=0,
- content_type='text/directory',
- headers=put_headers)
- else:
- # We need to HEAD all objects now in case we're overwriting a
- # manifest object and need to delete the old segments
- # ourselves.
- old_manifest = None
- old_slo_manifest_paths = []
- new_slo_manifest_paths = set()
- if options.changed or options.skip_identical \
- or not options.leave_segments:
- if options.skip_identical:
- checksum = None
- try:
- fp = open(path, 'rb')
- except IOError:
- pass
+ objs = []
+ dir_markers = []
+ for f in files:
+ if isfile(f):
+ objs.append(f)
+ elif isdir(f):
+ for (_dir, _ds, _fs) in walk(f):
+ if not (_ds + _fs):
+ dir_markers.append(_dir)
else:
- with fp:
- md5sum = md5()
- while True:
- data = fp.read(65536)
- if not data:
- break
- md5sum.update(data)
- checksum = md5sum.hexdigest()
- try:
- headers = conn.head_object(container, obj)
- cl = int(headers.get('content-length'))
- mt = headers.get('x-object-meta-mtime')
- if (options.skip_identical and
- checksum == headers.get('etag')):
- thread_manager.print_msg(
- "Skipped identical file '%s'", path)
- return
- if options.changed and cl == getsize(path) and \
- mt == put_headers['x-object-meta-mtime']:
- return
- if not options.leave_segments:
- old_manifest = headers.get('x-object-manifest')
- if config_true_value(
- headers.get('x-static-large-object')):
- headers, manifest_data = conn.get_object(
- container, obj,
- query_string='multipart-manifest=get')
- for old_seg in json.loads(manifest_data):
- seg_path = old_seg['name'].lstrip('/')
- if isinstance(seg_path, unicode):
- seg_path = seg_path.encode('utf-8')
- old_slo_manifest_paths.append(seg_path)
- except ClientException as err:
- if err.http_status != 404:
- raise
- # Merge the command line header options to the put_headers
- put_headers.update(split_headers(options.header, '',
- thread_manager))
- # Don't do segment job if object is not big enough
- if options.segment_size and \
- getsize(path) > int(options.segment_size):
- seg_container = container + '_segments'
- if options.segment_container:
- seg_container = options.segment_container
- full_size = getsize(path)
-
- slo_segments = []
- error_counter = [0]
- segment_manager = thread_manager.queue_manager(
- _segment_job, options.segment_threads,
- store_results=slo_segments,
- error_counter=error_counter,
- connection_maker=create_connection)
- with segment_manager as segment_queue:
- segment = 0
- segment_start = 0
- while segment_start < full_size:
- segment_size = int(options.segment_size)
- if segment_start + segment_size > full_size:
- segment_size = full_size - segment_start
- if options.use_slo:
- segment_name = '%s/slo/%s/%s/%s/%08d' % (
- obj, put_headers['x-object-meta-mtime'],
- full_size, options.segment_size, segment)
- else:
- segment_name = '%s/%s/%s/%s/%08d' % (
- obj, put_headers['x-object-meta-mtime'],
- full_size, options.segment_size, segment)
- segment_queue.put(
- {'path': path, 'obj': segment_name,
- 'segment_start': segment_start,
- 'segment_size': segment_size,
- 'segment_index': segment,
- 'log_line': '%s segment %s' % (obj, segment)})
- segment += 1
- segment_start += segment_size
- if error_counter[0]:
- raise ClientException(
- 'Aborting manifest creation '
- 'because not all segments could be uploaded. %s/%s'
- % (container, obj))
- if options.use_slo:
- slo_segments.sort(key=lambda d: d['segment_index'])
- for seg in slo_segments:
- seg_loc = seg['segment_location'].lstrip('/')
- if isinstance(seg_loc, unicode):
- seg_loc = seg_loc.encode('utf-8')
- new_slo_manifest_paths.add(seg_loc)
-
- manifest_data = json.dumps([
- {'path': d['segment_location'],
- 'etag': d['segment_etag'],
- 'size_bytes': d['segment_size']}
- for d in slo_segments])
-
- put_headers['x-static-large-object'] = 'true'
- conn.put_object(container, obj, manifest_data,
- headers=put_headers,
- query_string='multipart-manifest=put')
- else:
- new_object_manifest = '%s/%s/%s/%s/%s/' % (
- quote(seg_container), quote(obj),
- put_headers['x-object-meta-mtime'], full_size,
- options.segment_size)
- if old_manifest and old_manifest.rstrip('/') == \
- new_object_manifest.rstrip('/'):
- old_manifest = None
- put_headers['x-object-manifest'] = new_object_manifest
- conn.put_object(container, obj, '', content_length=0,
- headers=put_headers)
+ objs.extend([join(_dir, _f) for _f in _fs])
else:
- conn.put_object(
- container, obj, open(path, 'rb'),
- content_length=getsize(path), headers=put_headers)
- if old_manifest or old_slo_manifest_paths:
- segment_manager = thread_manager.queue_manager(
- _segment_job, options.segment_threads,
- connection_maker=create_connection)
- segment_queue = segment_manager.queue
- if old_manifest:
- scontainer, sprefix = old_manifest.split('/', 1)
- scontainer = unquote(scontainer)
- sprefix = unquote(sprefix).rstrip('/') + '/'
- for delobj in conn.get_container(scontainer,
- prefix=sprefix)[1]:
- segment_queue.put(
- {'delete': True,
- 'container': scontainer,
- 'obj': delobj['name']})
- if old_slo_manifest_paths:
- for seg_to_delete in old_slo_manifest_paths:
- if seg_to_delete in new_slo_manifest_paths:
- continue
- scont, sobj = \
- seg_to_delete.split('/', 1)
- segment_queue.put(
- {'delete': True,
- 'container': scont, 'obj': sobj})
- if not segment_queue.empty():
- with segment_manager:
- pass
- if options.verbose:
- if conn.attempts > 1:
- thread_manager.print_msg('%s [after %d attempts]', obj,
- conn.attempts)
- else:
- thread_manager.print_msg(obj)
- except OSError as err:
- if err.errno != ENOENT:
- raise
- thread_manager.error('Local file %r not found', path)
-
- def _upload_dir(path, object_queue, object_name):
- names = listdir(path)
- if not names:
- object_queue.put({'path': path, 'object_name': object_name,
- 'dir_marker': True})
- else:
- for name in listdir(path):
- subpath = join(path, name)
- subobjname = None
- if object_name is not None:
- subobjname = join(object_name, name)
- if isdir(subpath):
- _upload_dir(subpath, object_queue, subobjname)
+ output_manager.error("Local file '%s' not found" % f)
+
+ # Now that we've collected all the required files and dir markers
+ # build the tuples for the call to upload
+ if options.object_name is not None:
+ objs = [
+ SwiftUploadObject(
+ o, object_name=o.replace(
+ orig_path, options.object_name, 1
+ )
+ ) for o in objs
+ ]
+ dir_markers = [
+ SwiftUploadObject(
+ None, object_name=d.replace(
+ orig_path, options.object_name, 1
+ ), options={'dir_marker': True}
+ ) for d in dir_markers
+ ]
+
+ for r in swift.upload(container, objs + dir_markers):
+ if r['success']:
+ if options.verbose:
+ if 'attempts' in r and r['attempts'] > 1:
+ if 'object' in r:
+ output_manager.print_msg(
+ '%s [after %d attempts]' %
+ (r['object'],
+ r['attempts'])
+ )
+ else:
+ if 'object' in r:
+ output_manager.print_msg(r['object'])
+ elif 'for_object' in r:
+ output_manager.print_msg(
+ '%s segment %s' % (r['for_object'],
+ r['segment_index'])
+ )
else:
- object_queue.put({'path': subpath,
- 'object_name': subobjname})
-
- create_connection = lambda: get_conn(options)
- conn = create_connection()
-
- # Try to create the container, just in case it doesn't exist. If this
- # fails, it might just be because the user doesn't have container PUT
- # permissions, so we'll ignore any error. If there's really a problem,
- # it'll surface on the first object PUT.
- container_name = args[0]
- try:
- policy_header = {}
- _header = split_headers(options.header)
- if POLICY in _header:
- policy_header[POLICY] = \
- _header[POLICY]
- try:
- conn.put_container(args[0], policy_header)
- except ClientException as err:
- if err.http_status != 409:
- raise
- if POLICY in _header:
- thread_manager.error('Error trying to create %s with '
- 'Storage Policy %s', args[0],
- _header[POLICY].strip())
- if options.segment_size is not None:
- container_name = seg_container = args[0] + '_segments'
- if options.segment_container:
- container_name = seg_container = options.segment_container
- seg_headers = {}
- if POLICY in _header:
- seg_headers[POLICY] = \
- _header[POLICY]
- else:
- # Since no storage policy was specified on the command line,
- # rather than just letting swift pick the default storage
- # policy, we'll try to create the segments container with the
- # same as the upload container
- _meta = conn.head_container(args[0])
- if 'x-storage-policy' in _meta:
- seg_headers[POLICY] = \
- _meta.get('x-storage-policy')
- try:
- conn.put_container(seg_container, seg_headers)
- except ClientException as err:
- if err.http_status != 409:
- raise
- if POLICY in seg_headers:
- thread_manager.error('Error trying to create %s with '
- 'Storage Policy %s', seg_container,
- seg_headers[POLICY].strip())
- except ClientException as err:
- msg = ' '.join(str(x) for x in (err.http_status, err.http_reason))
- if err.http_response_content:
- if msg:
- msg += ': '
- msg += err.http_response_content[:60]
- thread_manager.error(
- 'Error trying to create container %r: %s', container_name,
- msg)
- except Exception as err:
- thread_manager.error(
- 'Error trying to create container %r: %s', container_name,
- err)
-
- if options.object_name is not None:
- if len(args[1:]) > 1:
- thread_manager.error('object-name only be used with 1 file or dir')
- return
- object_name = options.object_name
+ error = r['error']
+ if isinstance(error, SwiftError):
+ output_manager.error("%s" % error)
+ elif isinstance(error, ClientException):
+ if r['action'] == "create_container":
+ if 'X-Storage-Policy' in r['headers']:
+ output_manager.error(
+ 'Error trying to create container %s with '
+ 'Storage Policy %s', container,
+ r['headers']['X-Storage-Policy'].strip()
+ )
+ else:
+ msg = ' '.join(str(x) for x in (
+ error.http_status, error.http_reason)
+ )
+ if error.http_response_content:
+ if msg:
+ msg += ': '
+ msg += error.http_response_content[:60]
+ output_manager.error(
+ 'Error trying to create container %r: %s',
+ container, msg
+ )
+ else:
+ output_manager.error("%s" % error)
+ else:
+ if r['action'] == "create_container":
+ output_manager.error(
+ 'Error trying to create container %r: %s',
+ container, error
+ )
+ else:
+ output_manager.error("%s" % error)
- object_manager = thread_manager.queue_manager(
- _object_job, options.object_threads,
- connection_maker=create_connection)
- with object_manager as object_queue:
- try:
- for arg in args[1:]:
- if isdir(arg):
- _upload_dir(arg, object_queue, object_name)
- else:
- object_queue.put({'path': arg, 'object_name': object_name})
- except ClientException as err:
- if err.http_status != 404:
- raise
- thread_manager.error('Account not found')
+ except SwiftError as e:
+ output_manager.error("%s" % e)
st_capabilities_options = "[<proxy_url>]"
@@ -1260,36 +845,48 @@ Optional positional arguments:
st_info_help = st_capabilities_help
-def st_capabilities(parser, args, thread_manager):
+def st_capabilities(parser, args, output_manager):
def _print_compo_cap(name, capabilities):
for feature, options in sorted(capabilities.items(),
key=lambda x: x[0]):
- thread_manager.print_msg("%s: %s" % (name, feature))
+ output_manager.print_msg("%s: %s" % (name, feature))
if options:
- thread_manager.print_msg(" Options:")
+ output_manager.print_msg(" Options:")
for key, value in sorted(options.items(),
key=lambda x: x[0]):
- thread_manager.print_msg(" %s: %s" % (key, value))
+ output_manager.print_msg(" %s: %s" % (key, value))
+
(options, args) = parse_args(parser, args)
- if (args and len(args) > 2):
- thread_manager.error('Usage: %s capabilities %s\n%s',
+ if args and len(args) > 2:
+ output_manager.error('Usage: %s capabilities %s\n%s',
BASENAME,
st_capabilities_options, st_capabilities_help)
return
- conn = get_conn(options)
- url = None
- if len(args) == 2:
- url = args[1]
- capabilities = conn.get_capabilities(url)
- _print_compo_cap('Core', {'swift': capabilities['swift']})
- del capabilities['swift']
- _print_compo_cap('Additional middleware', capabilities)
+
+ _opts = vars(options)
+ with SwiftService(options=_opts) as swift:
+ try:
+ if len(args) == 2:
+ url = args[1]
+ capabilities_result = swift.capabilities(url)
+ capabilities = capabilities_result['capabilities']
+ else:
+ capabilities_result = swift.capabilities()
+ capabilities = capabilities_result['capabilities']
+
+ _print_compo_cap('Core', {'swift': capabilities['swift']})
+ del capabilities['swift']
+ _print_compo_cap('Additional middleware', capabilities)
+ except SwiftError as e:
+ output_manager.error(e.value)
+
st_info = st_capabilities
st_tempurl_options = '<method> <seconds> <path> <key>'
+
st_tempurl_help = '''
Generates a temporary URL for a Swift object.
@@ -1327,30 +924,6 @@ def st_tempurl(parser, args, thread_manager):
thread_manager.print_msg(url)
-def split_headers(options, prefix='', thread_manager=None):
- """
- Splits 'Key: Value' strings and returns them as a dictionary.
-
- :param options: An array of 'Key: Value' strings
- :param prefix: String to prepend to all of the keys in the dictionary.
- :param thread_manager: MultiThreadingManager for thread safe error
- reporting.
- """
- headers = {}
- for item in options:
- split_item = item.split(':', 1)
- if len(split_item) == 2:
- headers[(prefix + split_item[0]).title()] = split_item[1]
- else:
- error_string = "Metadata parameter %s must contain a ':'.\n%s" \
- % (item, st_post_help)
- if thread_manager:
- thread_manager.error(error_string)
- else:
- exit(error_string)
- return headers
-
-
def parse_args(parser, args, enforce_requires=True):
if not args:
args = ['-h']
@@ -1480,7 +1053,6 @@ Positional arguments:
capabilities List cluster capabilities.
tempurl Create a temporary URL
-
Examples:
%%prog download --help
@@ -1517,7 +1089,7 @@ Examples:
'of all http queries regardless of result status.')
parser.add_option('--info', action='store_true', dest='info',
default=False, help='Show the curl commands and results '
- ' of all http queries which return an error.')
+ 'of all http queries which return an error.')
parser.add_option('-q', '--quiet', action='store_const', dest='verbose',
const=0, default=1, help='Suppress status output.')
parser.add_option('-A', '--auth', dest='auth',
@@ -1708,16 +1280,15 @@ Examples:
elif options.info:
logging.basicConfig(level=logging.INFO)
- had_error = False
+ with OutputManager() as output:
- with MultiThreadingManager() as thread_manager:
parser.usage = globals()['st_%s_help' % args[0]]
try:
- globals()['st_%s' % args[0]](parser, argv[1:], thread_manager)
+ globals()['st_%s' % args[0]](parser, argv[1:], output)
except (ClientException, RequestException, socket.error) as err:
- thread_manager.error(str(err))
+ output.error(str(err))
- had_error = thread_manager.error_count
+ had_error = output.error_count > 0
if had_error:
exit(1)
diff --git a/tests/unit/test_command_helpers.py b/tests/unit/test_command_helpers.py
index 8bb9dc1..ad8012e 100644
--- a/tests/unit/test_command_helpers.py
+++ b/tests/unit/test_command_helpers.py
@@ -22,7 +22,7 @@ from six import StringIO
import testtools
from swiftclient import command_helpers as h
-from swiftclient.multithreading import MultiThreadingManager
+from swiftclient.multithreading import OutputManager
class TestStatHelpers(testtools.TestCase):
@@ -34,10 +34,10 @@ class TestStatHelpers(testtools.TestCase):
'token': 'tk12345',
}
self.conn = mock.MagicMock(**conn_attrs)
- self.options = mock.MagicMock(human=False, verbose=1)
+ self.options = {'human': False, 'verbose': 1}
self.stdout = StringIO()
self.stderr = StringIO()
- self.thread_manager = MultiThreadingManager(self.stdout, self.stderr)
+ self.output_manager = OutputManager(self.stdout, self.stderr)
def assertOut(self, expected):
real = self.stdout.getvalue()
@@ -57,7 +57,7 @@ class TestStatHelpers(testtools.TestCase):
raise
def test_stat_account_human(self):
- self.options.human = True
+ self.options['human'] = True
# stub head_account
stub_headers = {
'x-account-container-count': 42,
@@ -66,8 +66,9 @@ class TestStatHelpers(testtools.TestCase):
}
self.conn.head_account.return_value = stub_headers
- with self.thread_manager as thread_manager:
- h.stat_account(self.conn, self.options, thread_manager)
+ with self.output_manager as output_manager:
+ items, headers = h.stat_account(self.conn, self.options)
+ h.print_account_stats(items, headers, output_manager)
expected = """
Account: a
Containers: 42
@@ -77,7 +78,7 @@ Containers: 42
self.assertOut(expected)
def test_stat_account_verbose(self):
- self.options.verbose += 1
+ self.options['verbose'] += 1
# stub head_account
stub_headers = {
'x-account-container-count': 42,
@@ -86,8 +87,9 @@ Containers: 42
}
self.conn.head_account.return_value = stub_headers
- with self.thread_manager as thread_manager:
- h.stat_account(self.conn, self.options, thread_manager)
+ with self.output_manager as output_manager:
+ items, headers = h.stat_account(self.conn, self.options)
+ h.print_account_stats(items, headers, output_manager)
expected = """
StorageURL: http://storage/v1/a
Auth Token: tk12345
@@ -109,8 +111,9 @@ Containers: 42
}
self.conn.head_account.return_value = stub_headers
- with self.thread_manager as thread_manager:
- h.stat_account(self.conn, self.options, thread_manager)
+ with self.output_manager as output_manager:
+ items, headers = h.stat_account(self.conn, self.options)
+ h.print_account_stats(items, headers, output_manager)
expected = """
Account: a
Containers: 42
@@ -122,7 +125,7 @@ Objects in policy "nada": 1000000
self.assertOut(expected)
def test_stat_container_human(self):
- self.options.human = True
+ self.options['human'] = True
# stub head container request
stub_headers = {
'x-container-object-count': 10 ** 6,
@@ -130,22 +133,23 @@ Objects in policy "nada": 1000000
}
self.conn.head_container.return_value = stub_headers
args = ('c',)
- with self.thread_manager as thread_manager:
- h.stat_container(self.conn, self.options, args, thread_manager)
+ with self.output_manager as output_manager:
+ items, headers = h.stat_container(self.conn, self.options, *args)
+ h.print_container_stats(items, headers, output_manager)
expected = """
- Account: a
- Container: c
- Objects: 976K
- Bytes: 1.0G
- Read ACL:
- Write ACL:
- Sync To:
- Sync Key:
+ Account: a
+Container: c
+ Objects: 976K
+ Bytes: 1.0G
+ Read ACL:
+Write ACL:
+ Sync To:
+ Sync Key:
"""
self.assertOut(expected)
def test_stat_container_verbose(self):
- self.options.verbose += 1
+ self.options['verbose'] += 1
# stub head container request
stub_headers = {
'x-container-object-count': 10 ** 6,
@@ -153,24 +157,25 @@ Objects in policy "nada": 1000000
}
self.conn.head_container.return_value = stub_headers
args = ('c',)
- with self.thread_manager as thread_manager:
- h.stat_container(self.conn, self.options, args, thread_manager)
+ with self.output_manager as output_manager:
+ items, headers = h.stat_container(self.conn, self.options, *args)
+ h.print_container_stats(items, headers, output_manager)
expected = """
- URL: http://storage/v1/a/c
- Auth Token: tk12345
- Account: a
- Container: c
- Objects: 1000000
- Bytes: 1073741824
- Read ACL:
- Write ACL:
- Sync To:
- Sync Key:
+ URL: http://storage/v1/a/c
+Auth Token: tk12345
+ Account: a
+ Container: c
+ Objects: 1000000
+ Bytes: 1073741824
+ Read ACL:
+ Write ACL:
+ Sync To:
+ Sync Key:
"""
self.assertOut(expected)
def test_stat_object_human(self):
- self.options.human = True
+ self.options['human'] = True
# stub head object request
stub_headers = {
'content-length': 2 ** 20,
@@ -180,21 +185,22 @@ Objects in policy "nada": 1000000
}
self.conn.head_object.return_value = stub_headers
args = ('c', 'o')
- with self.thread_manager as thread_manager:
- h.stat_object(self.conn, self.options, args, thread_manager)
+ with self.output_manager as output_manager:
+ items, headers = h.stat_object(self.conn, self.options, *args)
+ h.print_object_stats(items, headers, output_manager)
expected = """
- Account: a
- Container: c
- Object: o
-Content Length: 1.0M
- ETag: 68b329da9893e34099c7d8ad5cb9c940
- Meta Color: blue
+ Account: a
+ Container: c
+ Object: o
+ Content Length: 1.0M
+ ETag: 68b329da9893e34099c7d8ad5cb9c940
+ Meta Color: blue
Content-Encoding: gzip
"""
self.assertOut(expected)
def test_stat_object_verbose(self):
- self.options.verbose += 1
+ self.options['verbose'] += 1
# stub head object request
stub_headers = {
'content-length': 2 ** 20,
@@ -204,17 +210,18 @@ Content-Encoding: gzip
}
self.conn.head_object.return_value = stub_headers
args = ('c', 'o')
- with self.thread_manager as thread_manager:
- h.stat_object(self.conn, self.options, args, thread_manager)
+ with self.output_manager as output_manager:
+ items, headers = h.stat_object(self.conn, self.options, *args)
+ h.print_object_stats(items, headers, output_manager)
expected = """
- URL: http://storage/v1/a/c/o
- Auth Token: tk12345
- Account: a
- Container: c
- Object: o
-Content Length: 1048576
- ETag: 68b329da9893e34099c7d8ad5cb9c940
- Meta Color: blue
+ URL: http://storage/v1/a/c/o
+ Auth Token: tk12345
+ Account: a
+ Container: c
+ Object: o
+ Content Length: 1048576
+ ETag: 68b329da9893e34099c7d8ad5cb9c940
+ Meta Color: blue
Content-Encoding: gzip
"""
self.assertOut(expected)
diff --git a/tests/unit/test_multithreading.py b/tests/unit/test_multithreading.py
index 1df0d4f..cd2e9a6 100644
--- a/tests/unit/test_multithreading.py
+++ b/tests/unit/test_multithreading.py
@@ -14,41 +14,40 @@
# limitations under the License.
import sys
-import time
-
-try:
- from unittest import mock
-except ImportError:
- import mock
-
import testtools
import threading
import six
+
+from concurrent.futures import as_completed
from six.moves.queue import Queue, Empty
+from time import sleep
from swiftclient import multithreading as mt
-from swiftclient.exceptions import ClientException
class ThreadTestCase(testtools.TestCase):
def setUp(self):
super(ThreadTestCase, self).setUp()
+ self.got_items = Queue()
self.got_args_kwargs = Queue()
self.starting_thread_count = threading.active_count()
- def _func(self, q_item, *args, **kwargs):
- self.got_items.put(q_item)
+ def _func(self, conn, item, *args, **kwargs):
+ self.got_items.put((conn, item))
self.got_args_kwargs.put((args, kwargs))
- if q_item == 'go boom':
+ if item == 'sleep':
+ sleep(1)
+ if item == 'go boom':
raise Exception('I went boom!')
- if q_item == 'c boom':
- raise ClientException(
- 'Client Boom', http_scheme='http', http_host='192.168.22.1',
- http_port=80, http_path='/booze', http_status=404,
- http_reason='to much', http_response_content='no sir!')
- return 'best result EVAR!'
+ return 'success'
+
+ def _create_conn(self):
+ return "This is a connection"
+
+ def _create_conn_fail(self):
+ raise Exception("This is a failed connection")
def assertQueueContains(self, queue, expected_contents):
got_contents = []
@@ -62,240 +61,125 @@ class ThreadTestCase(testtools.TestCase):
self.assertEqual(expected_contents, got_contents)
-class TestQueueFunctionThread(ThreadTestCase):
+class TestConnectionThreadPoolExecutor(ThreadTestCase):
def setUp(self):
- super(TestQueueFunctionThread, self).setUp()
-
+ super(TestConnectionThreadPoolExecutor, self).setUp()
self.input_queue = Queue()
- self.got_items = Queue()
self.stored_results = []
- self.qft = mt.QueueFunctionThread(self.input_queue, self._func,
- 'one_arg', 'two_arg',
- red_fish='blue_arg',
- store_results=self.stored_results)
- self.qft.start()
-
def tearDown(self):
- if self.qft.is_alive():
- self.finish_up_thread()
-
- super(TestQueueFunctionThread, self).tearDown()
-
- def finish_up_thread(self):
- self.input_queue.put(mt.StopWorkerThreadSignal())
- while self.qft.is_alive():
- time.sleep(0.05)
-
- def test_plumbing_and_store_results(self):
- self.input_queue.put('abc')
- self.input_queue.put(123)
- self.finish_up_thread()
-
- self.assertQueueContains(self.got_items, ['abc', 123])
- self.assertQueueContains(self.got_args_kwargs, [
- (('one_arg', 'two_arg'), {'red_fish': 'blue_arg'}),
- (('one_arg', 'two_arg'), {'red_fish': 'blue_arg'})])
- self.assertEqual(self.stored_results,
- ['best result EVAR!', 'best result EVAR!'])
-
- def test_exception_handling(self):
- self.input_queue.put('go boom')
- self.input_queue.put('ok')
- self.input_queue.put('go boom')
- self.finish_up_thread()
-
- self.assertQueueContains(self.got_items,
- ['go boom', 'ok', 'go boom'])
- self.assertEqual(len(self.qft.exc_infos), 2)
- self.assertEqual(Exception, self.qft.exc_infos[0][0])
- self.assertEqual(Exception, self.qft.exc_infos[1][0])
- self.assertEqual(('I went boom!',), self.qft.exc_infos[0][1].args)
- self.assertEqual(('I went boom!',), self.qft.exc_infos[1][1].args)
-
-
-class TestQueueFunctionManager(ThreadTestCase):
- def setUp(self):
- super(TestQueueFunctionManager, self).setUp()
- self.thread_manager = mock.create_autospec(
- mt.MultiThreadingManager, spec_set=True, instance=True)
- self.thread_count = 4
- self.error_counter = [0]
- self.got_items = Queue()
- self.stored_results = []
- self.qfq = mt.QueueFunctionManager(
- self._func, self.thread_count, self.thread_manager,
- thread_args=('1arg', '2arg'),
- thread_kwargs={'a': 'b', 'store_results': self.stored_results},
- error_counter=self.error_counter,
- connection_maker=self.connection_maker)
-
- def connection_maker(self):
- return 'yup, I made a connection'
-
- def test_context_manager_without_error_counter(self):
- self.qfq = mt.QueueFunctionManager(
- self._func, self.thread_count, self.thread_manager,
- thread_args=('1arg', '2arg'),
- thread_kwargs={'a': 'b', 'store_results': self.stored_results},
- connection_maker=self.connection_maker)
-
- with self.qfq as input_queue:
- self.assertEqual(self.starting_thread_count + self.thread_count,
- threading.active_count())
- input_queue.put('go boom')
-
- self.assertEqual(self.starting_thread_count, threading.active_count())
- error_strs = list(map(str, self.thread_manager.error.call_args_list))
- self.assertEqual(1, len(error_strs))
- self.assertTrue('Exception: I went boom!' in error_strs[0])
-
- def test_context_manager_without_conn_maker_or_error_counter(self):
- self.qfq = mt.QueueFunctionManager(
- self._func, self.thread_count, self.thread_manager,
- thread_args=('1arg', '2arg'), thread_kwargs={'a': 'b'})
-
- with self.qfq as input_queue:
- self.assertEqual(self.starting_thread_count + self.thread_count,
- threading.active_count())
- for i in range(20):
- input_queue.put('slap%d' % i)
-
- self.assertEqual(self.starting_thread_count, threading.active_count())
- self.assertEqual([], self.thread_manager.error.call_args_list)
- self.assertEqual(0, self.error_counter[0])
- self.assertQueueContains(self.got_items,
- set(['slap%d' % i for i in range(20)]))
- self.assertQueueContains(
- self.got_args_kwargs,
- [(('1arg', '2arg'), {'a': 'b'})] * 20)
- self.assertEqual(self.stored_results, [])
-
- def test_context_manager_with_exceptions(self):
- with self.qfq as input_queue:
- self.assertEqual(self.starting_thread_count + self.thread_count,
- threading.active_count())
- for i in range(20):
- input_queue.put('item%d' % i if i % 2 == 0 else 'go boom')
-
- self.assertEqual(self.starting_thread_count, threading.active_count())
- error_strs = list(map(str, self.thread_manager.error.call_args_list))
- self.assertEqual(10, len(error_strs))
- self.assertTrue(all(['Exception: I went boom!' in s for s in
- error_strs]))
- self.assertEqual(10, self.error_counter[0])
- expected_items = set(['go boom'] +
- ['item%d' % i for i in range(20)
- if i % 2 == 0])
- self.assertQueueContains(self.got_items, expected_items)
- self.assertQueueContains(
- self.got_args_kwargs,
- [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20)
- self.assertEqual(self.stored_results, ['best result EVAR!'] * 10)
-
- def test_context_manager_with_client_exceptions(self):
- with self.qfq as input_queue:
- self.assertEqual(self.starting_thread_count + self.thread_count,
- threading.active_count())
- for i in range(20):
- input_queue.put('item%d' % i if i % 2 == 0 else 'c boom')
-
- self.assertEqual(self.starting_thread_count, threading.active_count())
- error_strs = list(map(str, self.thread_manager.error.call_args_list))
- self.assertEqual(10, len(error_strs))
- stringification = 'Client Boom: ' \
- 'http://192.168.22.1:80/booze 404 to much no sir!'
- self.assertTrue(all([stringification in s for s in error_strs]))
- self.assertEqual(10, self.error_counter[0])
- expected_items = set(['c boom'] +
- ['item%d' % i for i in range(20)
- if i % 2 == 0])
- self.assertQueueContains(self.got_items, expected_items)
- self.assertQueueContains(
- self.got_args_kwargs,
- [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20)
- self.assertEqual(self.stored_results, ['best result EVAR!'] * 10)
-
- def test_context_manager_with_connection_maker(self):
- with self.qfq as input_queue:
- self.assertEqual(self.starting_thread_count + self.thread_count,
- threading.active_count())
- for i in range(20):
- input_queue.put('item%d' % i)
-
- self.assertEqual(self.starting_thread_count, threading.active_count())
- self.assertEqual([], self.thread_manager.error.call_args_list)
- self.assertEqual(0, self.error_counter[0])
- self.assertQueueContains(self.got_items,
- set(['item%d' % i for i in range(20)]))
- self.assertQueueContains(
- self.got_args_kwargs,
- [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20)
- self.assertEqual(self.stored_results, ['best result EVAR!'] * 20)
-
-
-class TestMultiThreadingManager(ThreadTestCase):
-
- @mock.patch('swiftclient.multithreading.QueueFunctionManager')
- def test_instantiation(self, mock_qfq):
- thread_manager = mt.MultiThreadingManager()
-
- self.assertEqual([
- mock.call(thread_manager._print, 1, thread_manager),
- mock.call(thread_manager._print_error, 1, thread_manager),
- ], mock_qfq.call_args_list)
-
- # These contexts don't get entered into until the
- # MultiThreadingManager's context is entered.
- self.assertEqual([], thread_manager.printer.__enter__.call_args_list)
- self.assertEqual([],
- thread_manager.error_printer.__enter__.call_args_list)
-
- # Test default values for the streams.
- self.assertEqual(sys.stdout, thread_manager.print_stream)
- self.assertEqual(sys.stderr, thread_manager.error_stream)
-
- @mock.patch('swiftclient.multithreading.QueueFunctionManager')
- def test_queue_manager_no_args(self, mock_qfq):
- thread_manager = mt.MultiThreadingManager()
-
- mock_qfq.reset_mock()
- mock_qfq.return_value = 'slap happy!'
-
- self.assertEqual(
- 'slap happy!',
- thread_manager.queue_manager(self._func, 88))
-
- self.assertEqual([
- mock.call(self._func, 88, thread_manager, thread_args=(),
- thread_kwargs={}, connection_maker=None,
- error_counter=None)
- ], mock_qfq.call_args_list)
-
- @mock.patch('swiftclient.multithreading.QueueFunctionManager')
- def test_queue_manager_with_args(self, mock_qfq):
- thread_manager = mt.MultiThreadingManager()
-
- mock_qfq.reset_mock()
- mock_qfq.return_value = 'do run run'
-
- self.assertEqual(
- 'do run run',
- thread_manager.queue_manager(self._func, 88, 'fun', times='are',
- connection_maker='abc', to='be had',
- error_counter='def'))
-
- self.assertEqual([
- mock.call(self._func, 88, thread_manager, thread_args=('fun',),
- thread_kwargs={'times': 'are', 'to': 'be had'},
- connection_maker='abc', error_counter='def')
- ], mock_qfq.call_args_list)
+ super(TestConnectionThreadPoolExecutor, self).tearDown()
+
+ def test_submit_good_connection(self):
+ ctpe = mt.ConnectionThreadPoolExecutor(self._create_conn, 1)
+ with ctpe as pool:
+ # Try submitting a job that should succeed
+ f = pool.submit(self._func, "succeed")
+ f.result()
+ self.assertQueueContains(
+ self.got_items,
+ [("This is a connection", "succeed")]
+ )
+
+ # Now a job that fails
+ went_boom = False
+ try:
+ f = pool.submit(self._func, "go boom")
+ f.result()
+ except Exception as e:
+ went_boom = True
+ self.assertEquals('I went boom!', str(e))
+ self.assertTrue(went_boom)
+
+ # Has the connection been returned to the pool?
+ f = pool.submit(self._func, "succeed")
+ f.result()
+ self.assertQueueContains(
+ self.got_items,
+ [
+ ("This is a connection", "go boom"),
+ ("This is a connection", "succeed")
+ ]
+ )
+
+ def test_submit_bad_connection(self):
+ ctpe = mt.ConnectionThreadPoolExecutor(self._create_conn_fail, 1)
+ with ctpe as pool:
+ # Now a connection that fails
+ connection_failed = False
+ try:
+ f = pool.submit(self._func, "succeed")
+ f.result()
+ except Exception as e:
+ connection_failed = True
+ self.assertEquals('This is a failed connection', str(e))
+ self.assertTrue(connection_failed)
+
+ # Make sure we don't lock up on failed connections
+ connection_failed = False
+ try:
+ f = pool.submit(self._func, "go boom")
+ f.result()
+ except Exception as e:
+ connection_failed = True
+ self.assertEquals('This is a failed connection', str(e))
+ self.assertTrue(connection_failed)
+
+ def test_lazy_connections(self):
+ ctpe = mt.ConnectionThreadPoolExecutor(self._create_conn, 10)
+ with ctpe as pool:
+ # Submit multiple jobs sequentially - should only use 1 conn
+ f = pool.submit(self._func, "succeed")
+ f.result()
+ f = pool.submit(self._func, "succeed")
+ f.result()
+ f = pool.submit(self._func, "succeed")
+ f.result()
+
+ expected_connections = [(0, "This is a connection")]
+ expected_connections.extend([(x, None) for x in range(1, 10)])
+
+ self.assertQueueContains(
+ pool._connections, expected_connections
+ )
+
+ ctpe = mt.ConnectionThreadPoolExecutor(self._create_conn, 10)
+ with ctpe as pool:
+ fs = []
+ f1 = pool.submit(self._func, "sleep")
+ f2 = pool.submit(self._func, "sleep")
+ f3 = pool.submit(self._func, "sleep")
+ fs.extend([f1, f2, f3])
+
+ expected_connections = [
+ (0, "This is a connection"),
+ (1, "This is a connection"),
+ (2, "This is a connection")
+ ]
+ expected_connections.extend([(x, None) for x in range(3, 10)])
+
+ for f in as_completed(fs):
+ f.result()
+
+ self.assertQueueContains(
+ pool._connections, expected_connections
+ )
+
+
+class TestOutputManager(testtools.TestCase):
+
+ def test_instantiation(self):
+ output_manager = mt.OutputManager()
+
+ self.assertEqual(sys.stdout, output_manager.print_stream)
+ self.assertEqual(sys.stderr, output_manager.error_stream)
def test_printers(self):
out_stream = six.StringIO()
err_stream = six.StringIO()
+ starting_thread_count = threading.active_count()
- with mt.MultiThreadingManager(
+ with mt.OutputManager(
print_stream=out_stream,
error_stream=err_stream) as thread_manager:
@@ -304,7 +188,8 @@ class TestMultiThreadingManager(ThreadTestCase):
self.assertEqual(out_stream, thread_manager.print_stream)
self.assertEqual(err_stream, thread_manager.error_stream)
- self.assertEqual(self.starting_thread_count + 2,
+ # No printing has happened yet, so no new threads
+ self.assertEqual(starting_thread_count,
threading.active_count())
thread_manager.print_msg('one-argument')
@@ -317,7 +202,13 @@ class TestMultiThreadingManager(ThreadTestCase):
thread_manager.error('Sometimes\n%.1f%% just\ndoes not\nwork!',
3.14159)
- self.assertEqual(self.starting_thread_count, threading.active_count())
+ # Now we have a thread for error printing and a thread for
+ # normal print messages
+ self.assertEqual(starting_thread_count + 2,
+ threading.active_count())
+
+ # The threads should have been cleaned up
+ self.assertEqual(starting_thread_count, threading.active_count())
out_stream.seek(0)
if six.PY3:
diff --git a/tests/unit/test_shell.py b/tests/unit/test_shell.py
index 96f6d1d..80d63d9 100644
--- a/tests/unit/test_shell.py
+++ b/tests/unit/test_shell.py
@@ -24,6 +24,7 @@ import swiftclient
import swiftclient.shell
import swiftclient.utils
+from os.path import basename, dirname
if six.PY2:
BUILTIN_OPEN = '__builtin__.open'
@@ -50,8 +51,8 @@ class TestShell(unittest.TestCase):
except OSError:
pass
- @mock.patch('swiftclient.shell.MultiThreadingManager._print')
- @mock.patch('swiftclient.shell.Connection')
+ @mock.patch('swiftclient.shell.OutputManager._print')
+ @mock.patch('swiftclient.service.Connection')
def test_stat_account(self, connection, mock_print):
argv = ["", "stat"]
return_headers = {
@@ -66,12 +67,11 @@ class TestShell(unittest.TestCase):
calls = [mock.call(' Account: AUTH_account\n' +
'Containers: 1\n' +
' Objects: 2\n' +
- ' Bytes: 3'),
- ]
+ ' Bytes: 3')]
mock_print.assert_has_calls(calls)
- @mock.patch('swiftclient.shell.MultiThreadingManager._print')
- @mock.patch('swiftclient.shell.Connection')
+ @mock.patch('swiftclient.shell.OutputManager._print')
+ @mock.patch('swiftclient.service.Connection')
def test_stat_container(self, connection, mock_print):
return_headers = {
'x-container-object-count': '1',
@@ -85,19 +85,18 @@ class TestShell(unittest.TestCase):
connection.return_value.head_container.return_value = return_headers
connection.return_value.url = 'http://127.0.0.1/v1/AUTH_account'
swiftclient.shell.main(argv)
- calls = [mock.call(' Account: AUTH_account\n' +
- ' Container: container\n' +
- ' Objects: 1\n' +
- ' Bytes: 2\n' +
- ' Read ACL: test2:tester2\n' +
- ' Write ACL: test3:tester3\n' +
- ' Sync To: other\n' +
- ' Sync Key: secret'),
- mock.call('')]
+ calls = [mock.call(' Account: AUTH_account\n' +
+ 'Container: container\n' +
+ ' Objects: 1\n' +
+ ' Bytes: 2\n' +
+ ' Read ACL: test2:tester2\n' +
+ 'Write ACL: test3:tester3\n' +
+ ' Sync To: other\n' +
+ ' Sync Key: secret')]
mock_print.assert_has_calls(calls)
- @mock.patch('swiftclient.shell.MultiThreadingManager._print')
- @mock.patch('swiftclient.shell.Connection')
+ @mock.patch('swiftclient.shell.OutputManager._print')
+ @mock.patch('swiftclient.service.Connection')
def test_stat_object(self, connection, mock_print):
return_headers = {
'x-object-manifest': 'manifest',
@@ -117,12 +116,11 @@ class TestShell(unittest.TestCase):
'Content Length: 42\n' +
' Last Modified: yesterday\n' +
' ETag: md5\n' +
- ' Manifest: manifest'),
- mock.call('')]
+ ' Manifest: manifest')]
mock_print.assert_has_calls(calls)
- @mock.patch('swiftclient.shell.MultiThreadingManager._print')
- @mock.patch('swiftclient.shell.Connection')
+ @mock.patch('swiftclient.shell.OutputManager._print')
+ @mock.patch('swiftclient.service.Connection')
def test_list_account(self, connection, mock_print):
# Test account listing
connection.return_value.get_account.side_effect = [
@@ -138,8 +136,8 @@ class TestShell(unittest.TestCase):
calls = [mock.call('container')]
mock_print.assert_has_calls(calls)
- @mock.patch('swiftclient.shell.MultiThreadingManager._print')
- @mock.patch('swiftclient.shell.Connection')
+ @mock.patch('swiftclient.shell.OutputManager._print')
+ @mock.patch('swiftclient.service.Connection')
def test_list_container(self, connection, mock_print):
connection.return_value.get_container.side_effect = [
[None, [{'name': 'object_a'}]],
@@ -173,8 +171,8 @@ class TestShell(unittest.TestCase):
mock.call(' 0')]
mock_print.assert_has_calls(calls)
- @mock.patch('swiftclient.shell.makedirs')
- @mock.patch('swiftclient.shell.Connection')
+ @mock.patch('swiftclient.service.makedirs')
+ @mock.patch('swiftclient.service.Connection')
def test_download(self, connection, makedirs):
connection.return_value.get_object.return_value = [
{'content-type': 'text/plain',
@@ -194,9 +192,11 @@ class TestShell(unittest.TestCase):
argv = ["", "download", "container"]
swiftclient.shell.main(argv)
calls = [mock.call('container', 'object',
- headers={}, resp_chunk_size=65536),
+ headers={}, resp_chunk_size=65536,
+ response_dict={}),
mock.call('container', 'pseudo/',
- headers={}, resp_chunk_size=65536)]
+ headers={}, resp_chunk_size=65536,
+ response_dict={})]
connection.return_value.get_object.assert_has_calls(
calls, any_order=True)
mock_open.assert_called_once_with('object', 'wb')
@@ -206,12 +206,13 @@ class TestShell(unittest.TestCase):
argv = ["", "download", "container", "object"]
swiftclient.shell.main(argv)
connection.return_value.get_object.assert_called_with(
- 'container', 'object', headers={}, resp_chunk_size=65536)
+ 'container', 'object', headers={}, resp_chunk_size=65536,
+ response_dict={})
mock_open.assert_called_with('object', 'wb')
- @mock.patch('swiftclient.shell.listdir')
- @mock.patch('swiftclient.shell.Connection')
- def test_upload(self, connection, listdir):
+ @mock.patch('swiftclient.shell.walk')
+ @mock.patch('swiftclient.service.Connection')
+ def test_upload(self, connection, walk):
connection.return_value.head_object.return_value = {
'content-length': '0'}
connection.return_value.attempts = 0
@@ -220,7 +221,8 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv)
connection.return_value.put_container.assert_called_with(
'container',
- {'X-Storage-Policy': mock.ANY})
+ {'X-Storage-Policy': mock.ANY},
+ response_dict={})
connection.return_value.put_object.assert_called_with(
'container',
@@ -228,18 +230,23 @@ class TestShell(unittest.TestCase):
mock.ANY,
content_length=0,
headers={'x-object-meta-mtime': mock.ANY,
- 'X-Storage-Policy': 'one'})
+ 'X-Storage-Policy': 'one'},
+ response_dict={})
# Upload whole directory
argv = ["", "upload", "container", "/tmp"]
- listdir.return_value = [self.tmpfile]
+ _tmpfile = self.tmpfile
+ _tmpfile_dir = dirname(_tmpfile)
+ _tmpfile_base = basename(_tmpfile)
+ walk.return_value = [(_tmpfile_dir, [], [_tmpfile_base])]
swiftclient.shell.main(argv)
connection.return_value.put_object.assert_called_with(
'container',
self.tmpfile.lstrip('/'),
mock.ANY,
content_length=0,
- headers={'x-object-meta-mtime': mock.ANY})
+ headers={'x-object-meta-mtime': mock.ANY},
+ response_dict={})
# Upload in segments
connection.return_value.head_container.return_value = {
@@ -250,16 +257,18 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv)
connection.return_value.put_container.assert_called_with(
'container_segments',
- {'X-Storage-Policy': mock.ANY})
+ {'X-Storage-Policy': mock.ANY},
+ response_dict={})
connection.return_value.put_object.assert_called_with(
'container',
self.tmpfile.lstrip('/'),
'',
content_length=0,
headers={'x-object-manifest': mock.ANY,
- 'x-object-meta-mtime': mock.ANY})
+ 'x-object-meta-mtime': mock.ANY},
+ response_dict={})
- @mock.patch('swiftclient.shell.Connection')
+ @mock.patch('swiftclient.service.Connection')
def test_delete_account(self, connection):
connection.return_value.get_account.side_effect = [
[None, [{'name': 'container'}]],
@@ -274,11 +283,11 @@ class TestShell(unittest.TestCase):
connection.return_value.head_object.return_value = {}
swiftclient.shell.main(argv)
connection.return_value.delete_container.assert_called_with(
- 'container')
+ 'container', response_dict={})
connection.return_value.delete_object.assert_called_with(
- 'container', 'object', query_string=None)
+ 'container', 'object', query_string=None, response_dict={})
- @mock.patch('swiftclient.shell.Connection')
+ @mock.patch('swiftclient.service.Connection')
def test_delete_container(self, connection):
connection.return_value.get_container.side_effect = [
[None, [{'name': 'object'}]],
@@ -289,34 +298,34 @@ class TestShell(unittest.TestCase):
connection.return_value.head_object.return_value = {}
swiftclient.shell.main(argv)
connection.return_value.delete_container.assert_called_with(
- 'container')
+ 'container', response_dict={})
connection.return_value.delete_object.assert_called_with(
- 'container', 'object', query_string=None)
+ 'container', 'object', query_string=None, response_dict={})
- @mock.patch('swiftclient.shell.Connection')
+ @mock.patch('swiftclient.service.Connection')
def test_delete_object(self, connection):
argv = ["", "delete", "container", "object"]
connection.return_value.head_object.return_value = {}
connection.return_value.attempts = 0
swiftclient.shell.main(argv)
connection.return_value.delete_object.assert_called_with(
- 'container', 'object', query_string=None)
+ 'container', 'object', query_string=None, response_dict={})
- @mock.patch('swiftclient.shell.Connection')
+ @mock.patch('swiftclient.service.Connection')
def test_post_account(self, connection):
argv = ["", "post"]
connection.return_value.head_object.return_value = {}
swiftclient.shell.main(argv)
connection.return_value.post_account.assert_called_with(
- headers={})
+ headers={}, response_dict={})
argv = ["", "post", "container"]
connection.return_value.head_object.return_value = {}
swiftclient.shell.main(argv)
connection.return_value.post_container.assert_called_with(
- 'container', headers={})
+ 'container', headers={}, response_dict={})
- @mock.patch('swiftclient.shell.Connection')
+ @mock.patch('swiftclient.service.Connection')
def test_post_container(self, connection):
argv = ["", "post", "container",
"--read-acl", "test2:tester2",
@@ -331,9 +340,9 @@ class TestShell(unittest.TestCase):
'X-Container-Write': 'test3:tester3 test4',
'X-Container-Read': 'test2:tester2',
'X-Container-Sync-Key': 'secret',
- 'X-Container-Sync-To': 'othersite'})
+ 'X-Container-Sync-To': 'othersite'}, response_dict={})
- @mock.patch('swiftclient.shell.Connection')
+ @mock.patch('swiftclient.service.Connection')
def test_post_object(self, connection):
argv = ["", "post", "container", "object",
"--meta", "Color:Blue",
@@ -344,7 +353,7 @@ class TestShell(unittest.TestCase):
connection.return_value.post_object.assert_called_with(
'container', 'object', headers={
'Content-Type': 'text/plain',
- 'X-Object-Meta-Color': 'Blue'})
+ 'X-Object-Meta-Color': 'Blue'}, response_dict={})
@mock.patch('swiftclient.shell.generate_temp_url')
def test_temp_url(self, temp_url):
@@ -356,7 +365,7 @@ class TestShell(unittest.TestCase):
temp_url.assert_called_with(
'/v1/AUTH_account/c/o', 60, 'secret_key', 'GET')
- @mock.patch('swiftclient.shell.Connection')
+ @mock.patch('swiftclient.service.Connection')
def test_capabilities(self, connection):
argv = ["", "capabilities"]
connection.return_value.get_capabilities.return_value = {'swift': None}