Diffstat (limited to 'swift')
28 files changed, 4145 insertions, 496 deletions
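Orientation note: much of this diff threads typed storage policies through the code base. The snippet below is a standalone sketch, written for this review, of the registration pattern that BaseStoragePolicy.register() and DiskFileRouter.register() use in the hunks that follow; the class and names here are illustrative only and are not part of Swift.

# Standalone sketch of the policy-type registration pattern introduced below:
# each policy type string maps to the class that implements it, and the
# decorator stamps the policy_type attribute onto the registered class.
class PolicyTypeRegistry(object):
    policy_type_to_policy_cls = {}

    @classmethod
    def register(cls, policy_type):
        def register_wrapper(policy_cls):
            if policy_type in cls.policy_type_to_policy_cls:
                raise ValueError('%r is already registered for %r' % (
                    cls.policy_type_to_policy_cls[policy_type], policy_type))
            cls.policy_type_to_policy_cls[policy_type] = policy_cls
            policy_cls.policy_type = policy_type
            return policy_cls
        return register_wrapper


@PolicyTypeRegistry.register('replication')
class ReplPolicySketch(PolicyTypeRegistry):
    pass


assert PolicyTypeRegistry.policy_type_to_policy_cls['replication'] is ReplPolicySketch
assert ReplPolicySketch.policy_type == 'replication'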
diff --git a/swift/account/reaper.py b/swift/account/reaper.py
index ce69fab92..06a008535 100644
--- a/swift/account/reaper.py
+++ b/swift/account/reaper.py
@@ -19,6 +19,7 @@ from swift import gettext_ as _
 from logging import DEBUG
 from math import sqrt
 from time import time
+import itertools
 
 from eventlet import GreenPool, sleep, Timeout
 
@@ -432,7 +433,7 @@ class AccountReaper(Daemon):
         * See also: :func:`swift.common.ring.Ring.get_nodes` for a description
           of the container node dicts.
         """
-        container_nodes = list(container_nodes)
+        cnodes = itertools.cycle(container_nodes)
         try:
             ring = self.get_object_ring(policy_index)
         except PolicyError:
@@ -443,7 +444,7 @@ class AccountReaper(Daemon):
         successes = 0
         failures = 0
         for node in nodes:
-            cnode = container_nodes.pop()
+            cnode = next(cnodes)
             try:
                 direct_delete_object(
                     node, part, account, container, obj,
diff --git a/swift/cli/info.py b/swift/cli/info.py
index 142b103f4..a8cfabd17 100644
--- a/swift/cli/info.py
+++ b/swift/cli/info.py
@@ -24,7 +24,7 @@ from swift.common.request_helpers import is_sys_meta, is_user_meta, \
 from swift.account.backend import AccountBroker, DATADIR as ABDATADIR
 from swift.container.backend import ContainerBroker, DATADIR as CBDATADIR
 from swift.obj.diskfile import get_data_dir, read_metadata, DATADIR_BASE, \
-    extract_policy_index
+    extract_policy
 from swift.common.storage_policy import POLICIES
 
 
@@ -341,10 +341,7 @@ def print_obj(datafile, check_etag=True, swift_dir='/etc/swift',
     datadir = DATADIR_BASE
 
     # try to extract policy index from datafile disk path
-    try:
-        policy_index = extract_policy_index(datafile)
-    except ValueError:
-        pass
+    policy_index = int(extract_policy(datafile) or POLICIES.legacy)
 
     try:
         if policy_index:
diff --git a/swift/common/constraints.py b/swift/common/constraints.py
index d4458ddf8..8e3ba53b0 100644
--- a/swift/common/constraints.py
+++ b/swift/common/constraints.py
@@ -204,6 +204,19 @@ def check_object_creation(req, object_name):
     return check_metadata(req, 'object')
 
 
+def check_dir(root, drive):
+    """
+    Verify that the path to the device is a directory and is a lesser
+    constraint that is enforced when a full mount_check isn't possible
+    with, for instance, a VM using loopback or partitions.
+
+    :param root: base path where the dir is
+    :param drive: drive name to be checked
+    :returns: True if it is a valid directoy, False otherwise
+    """
+    return os.path.isdir(os.path.join(root, drive))
+
+
 def check_mount(root, drive):
     """
     Verify that the path to the device is a mount point and mounted.
This diff --git a/swift/common/exceptions.py b/swift/common/exceptions.py index d7ea759d6..b4c926eb1 100644 --- a/swift/common/exceptions.py +++ b/swift/common/exceptions.py @@ -31,10 +31,32 @@ class SwiftException(Exception): pass +class PutterConnectError(Exception): + + def __init__(self, status=None): + self.status = status + + class InvalidTimestamp(SwiftException): pass +class InsufficientStorage(SwiftException): + pass + + +class FooterNotSupported(SwiftException): + pass + + +class MultiphasePUTNotSupported(SwiftException): + pass + + +class SuffixSyncError(SwiftException): + pass + + class DiskFileError(SwiftException): pass @@ -103,6 +125,10 @@ class ConnectionTimeout(Timeout): pass +class ResponseTimeout(Timeout): + pass + + class DriveNotMounted(SwiftException): pass diff --git a/swift/common/manager.py b/swift/common/manager.py index a4ef350da..260516d6a 100644 --- a/swift/common/manager.py +++ b/swift/common/manager.py @@ -33,7 +33,8 @@ ALL_SERVERS = ['account-auditor', 'account-server', 'container-auditor', 'container-replicator', 'container-reconciler', 'container-server', 'container-sync', 'container-updater', 'object-auditor', 'object-server', - 'object-expirer', 'object-replicator', 'object-updater', + 'object-expirer', 'object-replicator', + 'object-reconstructor', 'object-updater', 'proxy-server', 'account-replicator', 'account-reaper'] MAIN_SERVERS = ['proxy-server', 'account-server', 'container-server', 'object-server'] diff --git a/swift/common/middleware/formpost.py b/swift/common/middleware/formpost.py index 7132b342a..56a6d20f3 100644 --- a/swift/common/middleware/formpost.py +++ b/swift/common/middleware/formpost.py @@ -218,7 +218,14 @@ class FormPost(object): env, attrs['boundary']) start_response(status, headers) return [body] - except (FormInvalid, MimeInvalid, EOFError) as err: + except MimeInvalid: + body = 'FormPost: invalid starting boundary' + start_response( + '400 Bad Request', + (('Content-Type', 'text/plain'), + ('Content-Length', str(len(body))))) + return [body] + except (FormInvalid, EOFError) as err: body = 'FormPost: %s' % err start_response( '400 Bad Request', diff --git a/swift/common/request_helpers.py b/swift/common/request_helpers.py index 08e0ab5dc..14b9fd884 100644 --- a/swift/common/request_helpers.py +++ b/swift/common/request_helpers.py @@ -26,10 +26,12 @@ import time from contextlib import contextmanager from urllib import unquote from swift import gettext_ as _ +from swift.common.storage_policy import POLICIES from swift.common.constraints import FORMAT2CONTENT_TYPE from swift.common.exceptions import ListingIterError, SegmentError from swift.common.http import is_success -from swift.common.swob import HTTPBadRequest, HTTPNotAcceptable +from swift.common.swob import (HTTPBadRequest, HTTPNotAcceptable, + HTTPServiceUnavailable) from swift.common.utils import split_path, validate_device_partition from swift.common.wsgi import make_subrequest @@ -82,21 +84,27 @@ def get_listing_content_type(req): def get_name_and_placement(request, minsegs=1, maxsegs=None, rest_with_last=False): """ - Utility function to split and validate the request path and - storage_policy_index. The storage_policy_index is extracted from - the headers of the request and converted to an integer, and then the - args are passed through to :meth:`split_and_validate_path`. + Utility function to split and validate the request path and storage + policy. The storage policy index is extracted from the headers of + the request and converted to a StoragePolicy instance. 
The + remaining args are passed through to + :meth:`split_and_validate_path`. :returns: a list, result of :meth:`split_and_validate_path` with - storage_policy_index appended on the end - :raises: HTTPBadRequest + the BaseStoragePolicy instance appended on the end + :raises: HTTPServiceUnavailable if the path is invalid or no policy exists + with the extracted policy_index. """ - policy_idx = request.headers.get('X-Backend-Storage-Policy-Index', '0') - policy_idx = int(policy_idx) + policy_index = request.headers.get('X-Backend-Storage-Policy-Index') + policy = POLICIES.get_by_index(policy_index) + if not policy: + raise HTTPServiceUnavailable( + body=_("No policy with index %s") % policy_index, + request=request, content_type='text/plain') results = split_and_validate_path(request, minsegs=minsegs, maxsegs=maxsegs, rest_with_last=rest_with_last) - results.append(policy_idx) + results.append(policy) return results diff --git a/swift/common/ring/ring.py b/swift/common/ring/ring.py index daad23ff1..62e19951d 100644 --- a/swift/common/ring/ring.py +++ b/swift/common/ring/ring.py @@ -243,7 +243,7 @@ class Ring(object): if dev_id not in seen_ids: part_nodes.append(self.devs[dev_id]) seen_ids.add(dev_id) - return part_nodes + return [dict(node, index=i) for i, node in enumerate(part_nodes)] def get_part(self, account, container=None, obj=None): """ @@ -291,6 +291,7 @@ class Ring(object): ====== =============================================================== id unique integer identifier amongst devices + index offset into the primary node list for the partition weight a float of the relative weight of this device as compared to others; this indicates how many partitions the builder will try to assign to this device diff --git a/swift/common/storage_policy.py b/swift/common/storage_policy.py index f33eda539..e45ab018c 100644 --- a/swift/common/storage_policy.py +++ b/swift/common/storage_policy.py @@ -17,10 +17,18 @@ import string from swift.common.utils import config_true_value, SWIFT_CONF_FILE from swift.common.ring import Ring +from swift.common.utils import quorum_size +from swift.common.exceptions import RingValidationError +from pyeclib.ec_iface import ECDriver, ECDriverError, VALID_EC_TYPES LEGACY_POLICY_NAME = 'Policy-0' VALID_CHARS = '-' + string.letters + string.digits +DEFAULT_POLICY_TYPE = REPL_POLICY = 'replication' +EC_POLICY = 'erasure_coding' + +DEFAULT_EC_OBJECT_SEGMENT_SIZE = 1048576 + class PolicyError(ValueError): @@ -38,36 +46,73 @@ def _get_policy_string(base, policy_index): return return_string -def get_policy_string(base, policy_index): +def get_policy_string(base, policy_or_index): """ - Helper function to construct a string from a base and the policy - index. Used to encode the policy index into either a file name - or a directory name by various modules. + Helper function to construct a string from a base and the policy. + Used to encode the policy index into either a file name or a + directory name by various modules. :param base: the base string - :param policy_index: the storage policy index + :param policy_or_index: StoragePolicy instance, or an index + (string or int), if None the legacy + storage Policy-0 is assumed. 
:returns: base name with policy index added + :raises: PolicyError if no policy exists with the given policy_index """ - if POLICIES.get_by_index(policy_index) is None: - raise PolicyError("No policy with index %r" % policy_index) - return _get_policy_string(base, policy_index) + if isinstance(policy_or_index, BaseStoragePolicy): + policy = policy_or_index + else: + policy = POLICIES.get_by_index(policy_or_index) + if policy is None: + raise PolicyError("Unknown policy", index=policy_or_index) + return _get_policy_string(base, int(policy)) -class StoragePolicy(object): +def split_policy_string(policy_string): """ - Represents a storage policy. - Not meant to be instantiated directly; use - :func:`~swift.common.storage_policy.reload_storage_policies` to load - POLICIES from ``swift.conf``. + Helper function to convert a string representing a base and a + policy. Used to decode the policy from either a file name or + a directory name by various modules. + + :param policy_string: base name with policy index added + + :raises: PolicyError if given index does not map to a valid policy + :returns: a tuple, in the form (base, policy) where base is the base + string and policy is the StoragePolicy instance for the + index encoded in the policy_string. + """ + if '-' in policy_string: + base, policy_index = policy_string.rsplit('-', 1) + else: + base, policy_index = policy_string, None + policy = POLICIES.get_by_index(policy_index) + if get_policy_string(base, policy) != policy_string: + raise PolicyError("Unknown policy", index=policy_index) + return base, policy + + +class BaseStoragePolicy(object): + """ + Represents a storage policy. Not meant to be instantiated directly; + implement a derived subclasses (e.g. StoragePolicy, ECStoragePolicy, etc) + or use :func:`~swift.common.storage_policy.reload_storage_policies` to + load POLICIES from ``swift.conf``. The object_ring property is lazy loaded once the service's ``swift_dir`` is known via :meth:`~StoragePolicyCollection.get_object_ring`, but it may be over-ridden via object_ring kwarg at create time for testing or actively loaded with :meth:`~StoragePolicy.load_ring`. """ + + policy_type_to_policy_cls = {} + def __init__(self, idx, name='', is_default=False, is_deprecated=False, object_ring=None): + # do not allow BaseStoragePolicy class to be instantiated directly + if type(self) == BaseStoragePolicy: + raise TypeError("Can't instantiate BaseStoragePolicy directly") + # policy parameter validation try: self.idx = int(idx) except ValueError: @@ -88,6 +133,8 @@ class StoragePolicy(object): self.name = name self.is_deprecated = config_true_value(is_deprecated) self.is_default = config_true_value(is_default) + if self.policy_type not in BaseStoragePolicy.policy_type_to_policy_cls: + raise PolicyError('Invalid type', self.policy_type) if self.is_deprecated and self.is_default: raise PolicyError('Deprecated policy can not be default. ' 'Invalid config', self.idx) @@ -101,8 +148,80 @@ class StoragePolicy(object): return cmp(self.idx, int(other)) def __repr__(self): - return ("StoragePolicy(%d, %r, is_default=%s, is_deprecated=%s)") % ( - self.idx, self.name, self.is_default, self.is_deprecated) + return ("%s(%d, %r, is_default=%s, " + "is_deprecated=%s, policy_type=%r)") % \ + (self.__class__.__name__, self.idx, self.name, + self.is_default, self.is_deprecated, self.policy_type) + + @classmethod + def register(cls, policy_type): + """ + Decorator for Storage Policy implementations to register + their StoragePolicy class. 
This will also set the policy_type + attribute on the registered implementation. + """ + def register_wrapper(policy_cls): + if policy_type in cls.policy_type_to_policy_cls: + raise PolicyError( + '%r is already registered for the policy_type %r' % ( + cls.policy_type_to_policy_cls[policy_type], + policy_type)) + cls.policy_type_to_policy_cls[policy_type] = policy_cls + policy_cls.policy_type = policy_type + return policy_cls + return register_wrapper + + @classmethod + def _config_options_map(cls): + """ + Map config option name to StoragePolicy parameter name. + """ + return { + 'name': 'name', + 'policy_type': 'policy_type', + 'default': 'is_default', + 'deprecated': 'is_deprecated', + } + + @classmethod + def from_config(cls, policy_index, options): + config_to_policy_option_map = cls._config_options_map() + policy_options = {} + for config_option, value in options.items(): + try: + policy_option = config_to_policy_option_map[config_option] + except KeyError: + raise PolicyError('Invalid option %r in ' + 'storage-policy section' % config_option, + index=policy_index) + policy_options[policy_option] = value + return cls(policy_index, **policy_options) + + def get_info(self, config=False): + """ + Return the info dict and conf file options for this policy. + + :param config: boolean, if True all config options are returned + """ + info = {} + for config_option, policy_attribute in \ + self._config_options_map().items(): + info[config_option] = getattr(self, policy_attribute) + if not config: + # remove some options for public consumption + if not self.is_default: + info.pop('default') + if not self.is_deprecated: + info.pop('deprecated') + info.pop('policy_type') + return info + + def _validate_ring(self): + """ + Hook, called when the ring is loaded. Can be used to + validate the ring against the StoragePolicy configuration. + """ + pass def load_ring(self, swift_dir): """ @@ -114,11 +233,224 @@ class StoragePolicy(object): return self.object_ring = Ring(swift_dir, ring_name=self.ring_name) - def get_options(self): - """Return the valid conf file options for this policy.""" - return {'name': self.name, - 'default': self.is_default, - 'deprecated': self.is_deprecated} + # Validate ring to make sure it conforms to policy requirements + self._validate_ring() + + @property + def quorum(self): + """ + Number of successful backend requests needed for the proxy to + consider the client request successful. + """ + raise NotImplementedError() + + +@BaseStoragePolicy.register(REPL_POLICY) +class StoragePolicy(BaseStoragePolicy): + """ + Represents a storage policy of type 'replication'. Default storage policy + class unless otherwise overridden from swift.conf. + + Not meant to be instantiated directly; use + :func:`~swift.common.storage_policy.reload_storage_policies` to load + POLICIES from ``swift.conf``. + """ + + @property + def quorum(self): + """ + Quorum concept in the replication case: + floor(number of replica / 2) + 1 + """ + if not self.object_ring: + raise PolicyError('Ring is not loaded') + return quorum_size(self.object_ring.replica_count) + + +@BaseStoragePolicy.register(EC_POLICY) +class ECStoragePolicy(BaseStoragePolicy): + """ + Represents a storage policy of type 'erasure_coding'. + + Not meant to be instantiated directly; use + :func:`~swift.common.storage_policy.reload_storage_policies` to load + POLICIES from ``swift.conf``. 
+ """ + def __init__(self, idx, name='', is_default=False, + is_deprecated=False, object_ring=None, + ec_segment_size=DEFAULT_EC_OBJECT_SEGMENT_SIZE, + ec_type=None, ec_ndata=None, ec_nparity=None): + + super(ECStoragePolicy, self).__init__( + idx, name, is_default, is_deprecated, object_ring) + + # Validate erasure_coding policy specific members + # ec_type is one of the EC implementations supported by PyEClib + if ec_type is None: + raise PolicyError('Missing ec_type') + if ec_type not in VALID_EC_TYPES: + raise PolicyError('Wrong ec_type %s for policy %s, should be one' + ' of "%s"' % (ec_type, self.name, + ', '.join(VALID_EC_TYPES))) + self._ec_type = ec_type + + # Define _ec_ndata as the number of EC data fragments + # Accessible as the property "ec_ndata" + try: + value = int(ec_ndata) + if value <= 0: + raise ValueError + self._ec_ndata = value + except (TypeError, ValueError): + raise PolicyError('Invalid ec_num_data_fragments %r' % + ec_ndata, index=self.idx) + + # Define _ec_nparity as the number of EC parity fragments + # Accessible as the property "ec_nparity" + try: + value = int(ec_nparity) + if value <= 0: + raise ValueError + self._ec_nparity = value + except (TypeError, ValueError): + raise PolicyError('Invalid ec_num_parity_fragments %r' + % ec_nparity, index=self.idx) + + # Define _ec_segment_size as the encode segment unit size + # Accessible as the property "ec_segment_size" + try: + value = int(ec_segment_size) + if value <= 0: + raise ValueError + self._ec_segment_size = value + except (TypeError, ValueError): + raise PolicyError('Invalid ec_object_segment_size %r' % + ec_segment_size, index=self.idx) + + # Initialize PyECLib EC backend + try: + self.pyeclib_driver = \ + ECDriver(k=self._ec_ndata, m=self._ec_nparity, + ec_type=self._ec_type) + except ECDriverError as e: + raise PolicyError("Error creating EC policy (%s)" % e, + index=self.idx) + + # quorum size in the EC case depends on the choice of EC scheme. + self._ec_quorum_size = \ + self._ec_ndata + self.pyeclib_driver.min_parity_fragments_needed() + + @property + def ec_type(self): + return self._ec_type + + @property + def ec_ndata(self): + return self._ec_ndata + + @property + def ec_nparity(self): + return self._ec_nparity + + @property + def ec_segment_size(self): + return self._ec_segment_size + + @property + def fragment_size(self): + """ + Maximum length of a fragment, including header. + + NB: a fragment archive is a sequence of 0 or more max-length + fragments followed by one possibly-shorter fragment. + """ + # Technically pyeclib's get_segment_info signature calls for + # (data_len, segment_size) but on a ranged GET we don't know the + # ec-content-length header before we need to compute where in the + # object we should request to align with the fragment size. So we + # tell pyeclib a lie - from it's perspective, as long as data_len >= + # segment_size it'll give us the answer we want. From our + # perspective, because we only use this answer to calculate the + # *minimum* size we should read from an object body even if data_len < + # segment_size we'll still only read *the whole one and only last + # fragment* and pass than into pyeclib who will know what to do with + # it just as it always does when the last fragment is < fragment_size. 
+ return self.pyeclib_driver.get_segment_info( + self.ec_segment_size, self.ec_segment_size)['fragment_size'] + + @property + def ec_scheme_description(self): + """ + This short hand form of the important parts of the ec schema is stored + in Object System Metadata on the EC Fragment Archives for debugging. + """ + return "%s %d+%d" % (self._ec_type, self._ec_ndata, self._ec_nparity) + + def __repr__(self): + return ("%s, EC config(ec_type=%s, ec_segment_size=%d, " + "ec_ndata=%d, ec_nparity=%d)") % ( + super(ECStoragePolicy, self).__repr__(), self.ec_type, + self.ec_segment_size, self.ec_ndata, self.ec_nparity) + + @classmethod + def _config_options_map(cls): + options = super(ECStoragePolicy, cls)._config_options_map() + options.update({ + 'ec_type': 'ec_type', + 'ec_object_segment_size': 'ec_segment_size', + 'ec_num_data_fragments': 'ec_ndata', + 'ec_num_parity_fragments': 'ec_nparity', + }) + return options + + def get_info(self, config=False): + info = super(ECStoragePolicy, self).get_info(config=config) + if not config: + info.pop('ec_object_segment_size') + info.pop('ec_num_data_fragments') + info.pop('ec_num_parity_fragments') + info.pop('ec_type') + return info + + def _validate_ring(self): + """ + EC specific validation + + Replica count check - we need _at_least_ (#data + #parity) replicas + configured. Also if the replica count is larger than exactly that + number there's a non-zero risk of error for code that is considering + the number of nodes in the primary list from the ring. + """ + if not self.object_ring: + raise PolicyError('Ring is not loaded') + nodes_configured = self.object_ring.replica_count + if nodes_configured != (self.ec_ndata + self.ec_nparity): + raise RingValidationError( + 'EC ring for policy %s needs to be configured with ' + 'exactly %d nodes. Got %d.' % (self.name, + self.ec_ndata + self.ec_nparity, nodes_configured)) + + @property + def quorum(self): + """ + Number of successful backend requests needed for the proxy to consider + the client request successful. + + The quorum size for EC policies defines the minimum number + of data + parity elements required to be able to guarantee + the desired fault tolerance, which is the number of data + elements supplemented by the minimum number of parity + elements required by the chosen erasure coding scheme. + + For example, for Reed-Solomon, the minimum number parity + elements required is 1, and thus the quorum_size requirement + is ec_ndata + 1. + + Given the number of parity elements required is not the same + for every erasure coding scheme, consult PyECLib for + min_parity_fragments_needed() + """ + return self._ec_quorum_size class StoragePolicyCollection(object): @@ -236,9 +568,19 @@ class StoragePolicyCollection(object): :returns: storage policy, or None if no such policy """ # makes it easier for callers to just pass in a header value - index = int(index) if index else 0 + if index in ('', None): + index = 0 + else: + try: + index = int(index) + except ValueError: + return None return self.by_index.get(index) + @property + def legacy(self): + return self.get_by_index(None) + def get_object_ring(self, policy_idx, swift_dir): """ Get the ring object to use to handle a request based on its policy. 
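The EC quorum docstring above boils down to simple arithmetic; here is a minimal sketch of it, assuming a hypothetical 10+4 Reed-Solomon policy (an example, not a shipped configuration).

# Sketch of the EC quorum rule described above: quorum = number of data
# fragments plus the minimum number of parity fragments the scheme needs
# (1 for Reed-Solomon, per the docstring in this diff).
def ec_quorum(ec_ndata, min_parity_fragments_needed):
    return ec_ndata + min_parity_fragments_needed

ec_ndata, ec_nparity = 10, 4          # hypothetical 10+4 policy
assert ec_quorum(ec_ndata, 1) == 11   # proxy needs 11 successful responses
assert ec_ndata + ec_nparity == 14    # _validate_ring() requires exactly
                                      # 14 replicas in the object ring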
@@ -267,10 +609,7 @@ class StoragePolicyCollection(object): # delete from /info if deprecated if pol.is_deprecated: continue - policy_entry = {} - policy_entry['name'] = pol.name - if pol.is_default: - policy_entry['default'] = pol.is_default + policy_entry = pol.get_info() policy_info.append(policy_entry) return policy_info @@ -287,22 +626,10 @@ def parse_storage_policies(conf): if not section.startswith('storage-policy:'): continue policy_index = section.split(':', 1)[1] - # map config option name to StoragePolicy parameter name - config_to_policy_option_map = { - 'name': 'name', - 'default': 'is_default', - 'deprecated': 'is_deprecated', - } - policy_options = {} - for config_option, value in conf.items(section): - try: - policy_option = config_to_policy_option_map[config_option] - except KeyError: - raise PolicyError('Invalid option %r in ' - 'storage-policy section %r' % ( - config_option, section)) - policy_options[policy_option] = value - policy = StoragePolicy(policy_index, **policy_options) + config_options = dict(conf.items(section)) + policy_type = config_options.pop('policy_type', DEFAULT_POLICY_TYPE) + policy_cls = BaseStoragePolicy.policy_type_to_policy_cls[policy_type] + policy = policy_cls.from_config(policy_index, config_options) policies.append(policy) return StoragePolicyCollection(policies) diff --git a/swift/common/swob.py b/swift/common/swob.py index 729cdd96f..c2e3afb4e 100644 --- a/swift/common/swob.py +++ b/swift/common/swob.py @@ -36,7 +36,7 @@ needs to change. """ from collections import defaultdict -from cStringIO import StringIO +from StringIO import StringIO import UserDict import time from functools import partial @@ -128,6 +128,20 @@ class _UTC(tzinfo): UTC = _UTC() +class WsgiStringIO(StringIO): + """ + This class adds support for the additional wsgi.input methods defined on + eventlet.wsgi.Input to the StringIO class which would otherwise be a fine + stand-in for the file-like object in the WSGI environment. + """ + + def set_hundred_continue_response_headers(self, headers): + pass + + def send_hundred_continue_response(self): + pass + + def _datetime_property(header): """ Set and retrieve the datetime value of self.headers[header] @@ -743,16 +757,16 @@ def _req_environ_property(environ_field): def _req_body_property(): """ Set and retrieve the Request.body parameter. It consumes wsgi.input and - returns the results. On assignment, uses a StringIO to create a new + returns the results. On assignment, uses a WsgiStringIO to create a new wsgi.input. 
""" def getter(self): body = self.environ['wsgi.input'].read() - self.environ['wsgi.input'] = StringIO(body) + self.environ['wsgi.input'] = WsgiStringIO(body) return body def setter(self, value): - self.environ['wsgi.input'] = StringIO(value) + self.environ['wsgi.input'] = WsgiStringIO(value) self.environ['CONTENT_LENGTH'] = str(len(value)) return property(getter, setter, doc="Get and set the request body str") @@ -820,7 +834,7 @@ class Request(object): :param path: encoded, parsed, and unquoted into PATH_INFO :param environ: WSGI environ dictionary :param headers: HTTP headers - :param body: stuffed in a StringIO and hung on wsgi.input + :param body: stuffed in a WsgiStringIO and hung on wsgi.input :param kwargs: any environ key with an property setter """ headers = headers or {} @@ -855,10 +869,10 @@ class Request(object): } env.update(environ) if body is not None: - env['wsgi.input'] = StringIO(body) + env['wsgi.input'] = WsgiStringIO(body) env['CONTENT_LENGTH'] = str(len(body)) elif 'wsgi.input' not in env: - env['wsgi.input'] = StringIO('') + env['wsgi.input'] = WsgiStringIO('') req = Request(env) for key, val in headers.iteritems(): req.headers[key] = val @@ -965,7 +979,7 @@ class Request(object): env.update({ 'REQUEST_METHOD': 'GET', 'CONTENT_LENGTH': '0', - 'wsgi.input': StringIO(''), + 'wsgi.input': WsgiStringIO(''), }) return Request(env) @@ -1102,10 +1116,12 @@ class Response(object): app_iter = _resp_app_iter_property() def __init__(self, body=None, status=200, headers=None, app_iter=None, - request=None, conditional_response=False, **kw): + request=None, conditional_response=False, + conditional_etag=None, **kw): self.headers = HeaderKeyDict( [('Content-Type', 'text/html; charset=UTF-8')]) self.conditional_response = conditional_response + self._conditional_etag = conditional_etag self.request = request self.body = body self.app_iter = app_iter @@ -1131,6 +1147,26 @@ class Response(object): if 'charset' in kw and 'content_type' in kw: self.charset = kw['charset'] + @property + def conditional_etag(self): + """ + The conditional_etag keyword argument for Response will allow the + conditional match value of a If-Match request to be compared to a + non-standard value. + + This is available for Storage Policies that do not store the client + object data verbatim on the storage nodes, but still need support + conditional requests. + + It's most effectively used with X-Backend-Etag-Is-At which would + define the additional Metadata key where the original ETag of the + clear-form client request data. + """ + if self._conditional_etag is not None: + return self._conditional_etag + else: + return self.etag + def _prepare_for_ranges(self, ranges): """ Prepare the Response for multiple ranges. 
@@ -1161,15 +1197,16 @@ class Response(object): return content_size, content_type def _response_iter(self, app_iter, body): + etag = self.conditional_etag if self.conditional_response and self.request: - if self.etag and self.request.if_none_match and \ - self.etag in self.request.if_none_match: + if etag and self.request.if_none_match and \ + etag in self.request.if_none_match: self.status = 304 self.content_length = 0 return [''] - if self.etag and self.request.if_match and \ - self.etag not in self.request.if_match: + if etag and self.request.if_match and \ + etag not in self.request.if_match: self.status = 412 self.content_length = 0 return [''] diff --git a/swift/common/utils.py b/swift/common/utils.py index cf7b7e7c5..19dcfd3d6 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -2236,11 +2236,16 @@ class GreenAsyncPile(object): Correlating results with jobs (if necessary) is left to the caller. """ - def __init__(self, size): + def __init__(self, size_or_pool): """ - :param size: size pool of green threads to use + :param size_or_pool: thread pool size or a pool to use """ - self._pool = GreenPool(size) + if isinstance(size_or_pool, GreenPool): + self._pool = size_or_pool + size = self._pool.size + else: + self._pool = GreenPool(size_or_pool) + size = size_or_pool self._responses = eventlet.queue.LightQueue(size) self._inflight = 0 @@ -2646,6 +2651,10 @@ def public(func): def quorum_size(n): """ + quorum size as it applies to services that use 'replication' for data + integrity (Account/Container services). Object quorum_size is defined + on a storage policy basis. + Number of successful backend requests needed for the proxy to consider the client request successful. """ @@ -3139,6 +3148,26 @@ _rfc_extension_pattern = re.compile( r'(?:\s*;\s*(' + _rfc_token + r")\s*(?:=\s*(" + _rfc_token + r'|"(?:[^"\\]|\\.)*"))?)') +_content_range_pattern = re.compile(r'^bytes (\d+)-(\d+)/(\d+)$') + + +def parse_content_range(content_range): + """ + Parse a content-range header into (first_byte, last_byte, total_size). + + See RFC 7233 section 4.2 for details on the header format, but it's + basically "Content-Range: bytes ${start}-${end}/${total}". + + :param content_range: Content-Range header value to parse, + e.g. 
"bytes 100-1249/49004" + :returns: 3-tuple (start, end, total) + :raises: ValueError if malformed + """ + found = re.search(_content_range_pattern, content_range) + if not found: + raise ValueError("malformed Content-Range %r" % (content_range,)) + return tuple(int(x) for x in found.groups()) + def parse_content_type(content_type): """ @@ -3293,8 +3322,11 @@ def iter_multipart_mime_documents(wsgi_input, boundary, read_chunk_size=4096): :raises: MimeInvalid if the document is malformed """ boundary = '--' + boundary - if wsgi_input.readline(len(boundary + '\r\n')).strip() != boundary: - raise swift.common.exceptions.MimeInvalid('invalid starting boundary') + blen = len(boundary) + 2 # \r\n + got = wsgi_input.readline(blen) + if got.strip() != boundary: + raise swift.common.exceptions.MimeInvalid( + 'invalid starting boundary: wanted %r, got %r', (boundary, got)) boundary = '\r\n' + boundary input_buffer = '' done = False diff --git a/swift/common/wsgi.py b/swift/common/wsgi.py index b1e1f5ea7..35df2077f 100644 --- a/swift/common/wsgi.py +++ b/swift/common/wsgi.py @@ -25,6 +25,7 @@ import time import mimetools from swift import gettext_ as _ from StringIO import StringIO +from textwrap import dedent import eventlet import eventlet.debug @@ -96,13 +97,34 @@ def _loadconfigdir(object_type, uri, path, name, relative_to, global_conf): loadwsgi._loaders['config_dir'] = _loadconfigdir +class ConfigString(NamedConfigLoader): + """ + Wrap a raw config string up for paste.deploy. + + If you give one of these to our loadcontext (e.g. give it to our + appconfig) we'll intercept it and get it routed to the right loader. + """ + + def __init__(self, config_string): + self.contents = StringIO(dedent(config_string)) + self.filename = "string" + defaults = { + 'here': "string", + '__file__': "string", + } + self.parser = loadwsgi.NicerConfigParser("string", defaults=defaults) + self.parser.optionxform = str # Don't lower-case keys + self.parser.readfp(self.contents) + + def wrap_conf_type(f): """ Wrap a function whos first argument is a paste.deploy style config uri, - such that you can pass it an un-adorned raw filesystem path and the config - directive (either config: or config_dir:) will be added automatically - based on the type of filesystem entity at the given path (either a file or - directory) before passing it through to the paste.deploy function. + such that you can pass it an un-adorned raw filesystem path (or config + string) and the config directive (either config:, config_dir:, or + config_str:) will be added automatically based on the type of entity + (either a file or directory, or if no such entity on the file system - + just a string) before passing it through to the paste.deploy function. 
""" def wrapper(conf_path, *args, **kwargs): if os.path.isdir(conf_path): @@ -332,6 +354,12 @@ class PipelineWrapper(object): def loadcontext(object_type, uri, name=None, relative_to=None, global_conf=None): + if isinstance(uri, loadwsgi.ConfigLoader): + # bypass loadcontext's uri parsing and loader routing and + # just directly return the context + if global_conf: + uri.update_defaults(global_conf, overwrite=False) + return uri.get_context(object_type, name, global_conf) add_conf_type = wrap_conf_type(lambda x: x) return loadwsgi.loadcontext(object_type, add_conf_type(uri), name=name, relative_to=relative_to, diff --git a/swift/container/sync.py b/swift/container/sync.py index 0f42de6e9..a409de4ac 100644 --- a/swift/container/sync.py +++ b/swift/container/sync.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import errno import os import uuid from swift import gettext_ as _ @@ -25,8 +26,8 @@ from eventlet import sleep, Timeout import swift.common.db from swift.container.backend import ContainerBroker, DATADIR from swift.common.container_sync_realms import ContainerSyncRealms -from swift.common.direct_client import direct_get_object -from swift.common.internal_client import delete_object, put_object +from swift.common.internal_client import ( + delete_object, put_object, InternalClient, UnexpectedResponse) from swift.common.exceptions import ClientException from swift.common.ring import Ring from swift.common.ring.utils import is_local_device @@ -37,6 +38,55 @@ from swift.common.utils import ( from swift.common.daemon import Daemon from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND from swift.common.storage_policy import POLICIES +from swift.common.wsgi import ConfigString + + +# The default internal client config body is to support upgrades without +# requiring deployment of the new /etc/swift/internal-client.conf +ic_conf_body = """ +[DEFAULT] +# swift_dir = /etc/swift +# user = swift +# You can specify default log routing here if you want: +# log_name = swift +# log_facility = LOG_LOCAL0 +# log_level = INFO +# log_address = /dev/log +# +# comma separated list of functions to call to setup custom log handlers. +# functions get passed: conf, name, log_to_console, log_route, fmt, logger, +# adapted_logger +# log_custom_handlers = +# +# If set, log_udp_host will override log_address +# log_udp_host = +# log_udp_port = 514 +# +# You can enable StatsD logging here: +# log_statsd_host = localhost +# log_statsd_port = 8125 +# log_statsd_default_sample_rate = 1.0 +# log_statsd_sample_rate_factor = 1.0 +# log_statsd_metric_prefix = + +[pipeline:main] +pipeline = catch_errors proxy-logging cache proxy-server + +[app:proxy-server] +use = egg:swift#proxy +# See proxy-server.conf-sample for options + +[filter:cache] +use = egg:swift#memcache +# See proxy-server.conf-sample for options + +[filter:proxy-logging] +use = egg:swift#proxy_logging + +[filter:catch_errors] +use = egg:swift#catch_errors +# See proxy-server.conf-sample for options +""".lstrip() class ContainerSync(Daemon): @@ -103,12 +153,12 @@ class ContainerSync(Daemon): loaded. This is overridden by unit tests. """ - def __init__(self, conf, container_ring=None): + def __init__(self, conf, container_ring=None, logger=None): #: The dict of configuration values from the [container-sync] section #: of the container-server.conf. self.conf = conf #: Logger to use for container-sync log lines. 
- self.logger = get_logger(conf, log_route='container-sync') + self.logger = logger or get_logger(conf, log_route='container-sync') #: Path to the local device mount points. self.devices = conf.get('devices', '/srv/node') #: Indicates whether mount points should be verified as actual mount @@ -159,6 +209,26 @@ class ContainerSync(Daemon): swift.common.db.DB_PREALLOCATION = \ config_true_value(conf.get('db_preallocation', 'f')) self.conn_timeout = float(conf.get('conn_timeout', 5)) + request_tries = int(conf.get('request_tries') or 3) + + internal_client_conf_path = conf.get('internal_client_conf_path') + if not internal_client_conf_path: + self.logger.warning( + _('Configuration option internal_client_conf_path not ' + 'defined. Using default configuration, See ' + 'internal-client.conf-sample for options')) + internal_client_conf = ConfigString(ic_conf_body) + else: + internal_client_conf = internal_client_conf_path + try: + self.swift = InternalClient( + internal_client_conf, 'Swift Container Sync', request_tries) + except IOError as err: + if err.errno != errno.ENOENT: + raise + raise SystemExit( + _('Unable to load internal client from config: %r (%s)') % + (internal_client_conf_path, err)) def get_object_ring(self, policy_idx): """ @@ -380,39 +450,32 @@ class ContainerSync(Daemon): looking_for_timestamp = Timestamp(row['created_at']) timestamp = -1 headers = body = None - headers_out = {'X-Backend-Storage-Policy-Index': + # look up for the newest one + headers_out = {'X-Newest': True, + 'X-Backend-Storage-Policy-Index': str(info['storage_policy_index'])} - for node in nodes: - try: - these_headers, this_body = direct_get_object( - node, part, info['account'], info['container'], - row['name'], headers=headers_out, - resp_chunk_size=65536) - this_timestamp = Timestamp( - these_headers['x-timestamp']) - if this_timestamp > timestamp: - timestamp = this_timestamp - headers = these_headers - body = this_body - except ClientException as err: - # If any errors are not 404, make sure we report the - # non-404 one. We don't want to mistakenly assume the - # object no longer exists just because one says so and - # the others errored for some other reason. 
- if not exc or getattr( - exc, 'http_status', HTTP_NOT_FOUND) == \ - HTTP_NOT_FOUND: - exc = err - except (Exception, Timeout) as err: - exc = err + try: + source_obj_status, source_obj_info, source_obj_iter = \ + self.swift.get_object(info['account'], + info['container'], row['name'], + headers=headers_out, + acceptable_statuses=(2, 4)) + + except (Exception, UnexpectedResponse, Timeout) as err: + source_obj_info = {} + source_obj_iter = None + exc = err + timestamp = Timestamp(source_obj_info.get( + 'x-timestamp', 0)) + headers = source_obj_info + body = source_obj_iter if timestamp < looking_for_timestamp: if exc: raise exc raise Exception( - _('Unknown exception trying to GET: %(node)r ' + _('Unknown exception trying to GET: ' '%(account)r %(container)r %(object)r'), - {'node': node, 'part': part, - 'account': info['account'], + {'account': info['account'], 'container': info['container'], 'object': row['name']}) for key in ('date', 'last-modified'): diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index a8d14dfa2..c49d557fe 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -40,7 +40,7 @@ import hashlib import logging import traceback import xattr -from os.path import basename, dirname, exists, getmtime, join +from os.path import basename, dirname, exists, getmtime, join, splitext from random import shuffle from tempfile import mkstemp from contextlib import contextmanager @@ -50,7 +50,7 @@ from eventlet import Timeout from eventlet.hubs import trampoline from swift import gettext_ as _ -from swift.common.constraints import check_mount +from swift.common.constraints import check_mount, check_dir from swift.common.request_helpers import is_sys_meta from swift.common.utils import mkdirs, Timestamp, \ storage_directory, hash_path, renamer, fallocate, fsync, \ @@ -63,7 +63,9 @@ from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \ DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir, \ ReplicationLockTimeout, DiskFileExpired, DiskFileXattrNotSupported from swift.common.swob import multi_range_iterator -from swift.common.storage_policy import get_policy_string, POLICIES +from swift.common.storage_policy import ( + get_policy_string, split_policy_string, PolicyError, POLICIES, + REPL_POLICY, EC_POLICY) from functools import partial @@ -154,10 +156,10 @@ def write_metadata(fd, metadata, xattr_size=65536): raise -def extract_policy_index(obj_path): +def extract_policy(obj_path): """ - Extracts the policy index for an object (based on the name of the objects - directory) given the device-relative path to the object. Returns 0 in + Extracts the policy for an object (based on the name of the objects + directory) given the device-relative path to the object. Returns None in the event that the path is malformed in some way. 
The device-relative path is everything after the mount point; for example: @@ -170,19 +172,18 @@ def extract_policy_index(obj_path): objects-5/179/485dc017205a81df3af616d917c90179/1401811134.873649.data :param obj_path: device-relative path of an object - :returns: storage policy index + :returns: a :class:`~swift.common.storage_policy.BaseStoragePolicy` or None """ - policy_idx = 0 try: obj_portion = obj_path[obj_path.index(DATADIR_BASE):] obj_dirname = obj_portion[:obj_portion.index('/')] except Exception: - return policy_idx - if '-' in obj_dirname: - base, policy_idx = obj_dirname.split('-', 1) - if POLICIES.get_by_index(policy_idx) is None: - policy_idx = 0 - return int(policy_idx) + return None + try: + base, policy = split_policy_string(obj_dirname) + except PolicyError: + return None + return policy def quarantine_renamer(device_path, corrupted_file_path): @@ -197,9 +198,13 @@ def quarantine_renamer(device_path, corrupted_file_path): :raises OSError: re-raises non errno.EEXIST / errno.ENOTEMPTY exceptions from rename """ + policy = extract_policy(corrupted_file_path) + if policy is None: + # TODO: support a quarantine-unknown location + policy = POLICIES.legacy from_dir = dirname(corrupted_file_path) to_dir = join(device_path, 'quarantined', - get_data_dir(extract_policy_index(corrupted_file_path)), + get_data_dir(policy), basename(from_dir)) invalidate_hash(dirname(from_dir)) try: @@ -429,8 +434,9 @@ class AuditLocation(object): stringify to a filesystem path so the auditor's logs look okay. """ - def __init__(self, path, device, partition): - self.path, self.device, self.partition = path, device, partition + def __init__(self, path, device, partition, policy): + self.path, self.device, self.partition, self.policy = ( + path, device, partition, policy) def __str__(self): return str(self.path) @@ -470,19 +476,17 @@ def object_audit_location_generator(devices, mount_check=True, logger=None, _('Skipping %s as it is not mounted'), device) continue # loop through object dirs for all policies - for dir in [dir for dir in os.listdir(os.path.join(devices, device)) - if dir.startswith(DATADIR_BASE)]: - datadir_path = os.path.join(devices, device, dir) - # warn if the object dir doesn't match with a policy - policy_idx = 0 - if '-' in dir: - base, policy_idx = dir.split('-', 1) + for dir_ in os.listdir(os.path.join(devices, device)): + if not dir_.startswith(DATADIR_BASE): + continue try: - get_data_dir(policy_idx) - except ValueError: + base, policy = split_policy_string(dir_) + except PolicyError as e: if logger: - logger.warn(_('Directory %s does not map to a ' - 'valid policy') % dir) + logger.warn(_('Directory %r does not map ' + 'to a valid policy (%s)') % (dir_, e)) + continue + datadir_path = os.path.join(devices, device, dir_) partitions = listdir(datadir_path) for partition in partitions: part_path = os.path.join(datadir_path, partition) @@ -502,9 +506,50 @@ def object_audit_location_generator(devices, mount_check=True, logger=None, continue for hsh in hashes: hsh_path = os.path.join(suff_path, hsh) - yield AuditLocation(hsh_path, device, partition) + yield AuditLocation(hsh_path, device, partition, + policy) + + +def strip_self(f): + """ + Wrapper to attach module level functions to base class. 
+ """ + def wrapper(self, *args, **kwargs): + return f(*args, **kwargs) + return wrapper + +class DiskFileRouter(object): + policy_type_to_manager_cls = {} + + @classmethod + def register(cls, policy_type): + """ + Decorator for Storage Policy implementations to register + their DiskFile implementation. + """ + def register_wrapper(diskfile_cls): + if policy_type in cls.policy_type_to_manager_cls: + raise PolicyError( + '%r is already registered for the policy_type %r' % ( + cls.policy_type_to_manager_cls[policy_type], + policy_type)) + cls.policy_type_to_manager_cls[policy_type] = diskfile_cls + return diskfile_cls + return register_wrapper + + def __init__(self, *args, **kwargs): + self.policy_to_manager = {} + for policy in POLICIES: + manager_cls = self.policy_type_to_manager_cls[policy.policy_type] + self.policy_to_manager[policy] = manager_cls(*args, **kwargs) + + def __getitem__(self, policy): + return self.policy_to_manager[policy] + + +@DiskFileRouter.register(REPL_POLICY) class DiskFileManager(object): """ Management class for devices, providing common place for shared parameters @@ -527,6 +572,16 @@ class DiskFileManager(object): :param conf: caller provided configuration object :param logger: caller provided logger """ + + diskfile_cls = None # DiskFile will be set after that class is defined + + # module level functions dropped to implementation specific + hash_cleanup_listdir = strip_self(hash_cleanup_listdir) + _get_hashes = strip_self(get_hashes) + invalidate_hash = strip_self(invalidate_hash) + get_ondisk_files = strip_self(get_ondisk_files) + quarantine_renamer = strip_self(quarantine_renamer) + def __init__(self, conf, logger): self.logger = logger self.devices = conf.get('devices', '/srv/node') @@ -583,21 +638,25 @@ class DiskFileManager(object): def get_dev_path(self, device, mount_check=None): """ - Return the path to a device, checking to see that it is a proper mount - point based on a configuration parameter. + Return the path to a device, first checking to see if either it + is a proper mount point, or at least a directory depending on + the mount_check configuration option. :param device: name of target device :param mount_check: whether or not to check mountedness of device. Defaults to bool(self.mount_check). :returns: full path to the device, None if the path to the device is - not a proper mount point. + not a proper mount point or directory. 
""" - should_check = self.mount_check if mount_check is None else mount_check - if should_check and not check_mount(self.devices, device): - dev_path = None - else: - dev_path = os.path.join(self.devices, device) - return dev_path + # we'll do some kind of check unless explicitly forbidden + if mount_check is not False: + if mount_check or self.mount_check: + check = check_mount + else: + check = check_dir + if not check(self.devices, device): + return None + return os.path.join(self.devices, device) @contextmanager def replication_lock(self, device): @@ -619,28 +678,27 @@ class DiskFileManager(object): yield True def pickle_async_update(self, device, account, container, obj, data, - timestamp, policy_idx): + timestamp, policy): device_path = self.construct_dev_path(device) - async_dir = os.path.join(device_path, get_async_dir(policy_idx)) + async_dir = os.path.join(device_path, get_async_dir(policy)) ohash = hash_path(account, container, obj) self.threadpools[device].run_in_thread( write_pickle, data, os.path.join(async_dir, ohash[-3:], ohash + '-' + Timestamp(timestamp).internal), - os.path.join(device_path, get_tmp_dir(policy_idx))) + os.path.join(device_path, get_tmp_dir(policy))) self.logger.increment('async_pendings') def get_diskfile(self, device, partition, account, container, obj, - policy_idx=0, **kwargs): + policy, **kwargs): dev_path = self.get_dev_path(device) if not dev_path: raise DiskFileDeviceUnavailable() - return DiskFile(self, dev_path, self.threadpools[device], - partition, account, container, obj, - policy_idx=policy_idx, - use_splice=self.use_splice, pipe_size=self.pipe_size, - **kwargs) + return self.diskfile_cls(self, dev_path, self.threadpools[device], + partition, account, container, obj, + policy=policy, use_splice=self.use_splice, + pipe_size=self.pipe_size, **kwargs) def object_audit_location_generator(self, device_dirs=None): return object_audit_location_generator(self.devices, self.mount_check, @@ -648,12 +706,12 @@ class DiskFileManager(object): def get_diskfile_from_audit_location(self, audit_location): dev_path = self.get_dev_path(audit_location.device, mount_check=False) - return DiskFile.from_hash_dir( + return self.diskfile_cls.from_hash_dir( self, audit_location.path, dev_path, - audit_location.partition) + audit_location.partition, policy=audit_location.policy) def get_diskfile_from_hash(self, device, partition, object_hash, - policy_idx, **kwargs): + policy, **kwargs): """ Returns a DiskFile instance for an object at the given object_hash. 
Just in case someone thinks of refactoring, be @@ -667,13 +725,14 @@ class DiskFileManager(object): if not dev_path: raise DiskFileDeviceUnavailable() object_path = os.path.join( - dev_path, get_data_dir(policy_idx), partition, object_hash[-3:], + dev_path, get_data_dir(policy), str(partition), object_hash[-3:], object_hash) try: - filenames = hash_cleanup_listdir(object_path, self.reclaim_age) + filenames = self.hash_cleanup_listdir(object_path, + self.reclaim_age) except OSError as err: if err.errno == errno.ENOTDIR: - quar_path = quarantine_renamer(dev_path, object_path) + quar_path = self.quarantine_renamer(dev_path, object_path) logging.exception( _('Quarantined %(object_path)s to %(quar_path)s because ' 'it is not a directory'), {'object_path': object_path, @@ -693,21 +752,20 @@ class DiskFileManager(object): metadata.get('name', ''), 3, 3, True) except ValueError: raise DiskFileNotExist() - return DiskFile(self, dev_path, self.threadpools[device], - partition, account, container, obj, - policy_idx=policy_idx, **kwargs) + return self.diskfile_cls(self, dev_path, self.threadpools[device], + partition, account, container, obj, + policy=policy, **kwargs) - def get_hashes(self, device, partition, suffix, policy_idx): + def get_hashes(self, device, partition, suffixes, policy): dev_path = self.get_dev_path(device) if not dev_path: raise DiskFileDeviceUnavailable() - partition_path = os.path.join(dev_path, get_data_dir(policy_idx), + partition_path = os.path.join(dev_path, get_data_dir(policy), partition) if not os.path.exists(partition_path): mkdirs(partition_path) - suffixes = suffix.split('-') if suffix else [] _junk, hashes = self.threadpools[device].force_run_in_thread( - get_hashes, partition_path, recalculate=suffixes) + self._get_hashes, partition_path, recalculate=suffixes) return hashes def _listdir(self, path): @@ -720,7 +778,7 @@ class DiskFileManager(object): path, err) return [] - def yield_suffixes(self, device, partition, policy_idx): + def yield_suffixes(self, device, partition, policy): """ Yields tuples of (full_path, suffix_only) for suffixes stored on the given device and partition. 
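Putting the pieces together, a hedged usage sketch of the per-policy DiskFile plumbing added in this diff: DiskFileRouter builds one manager per configured policy, and get_diskfile() now takes the policy itself rather than a bare policy index. The conf values, device, and object names below are placeholders, not a working setup; on a machine without the device directory this raises DiskFileDeviceUnavailable.

# Hedged usage sketch only; assumes an installed Swift with this change.
from swift.common.storage_policy import POLICIES
from swift.obj.diskfile import DiskFileRouter

conf = {'devices': '/srv/node', 'mount_check': 'false'}
router = DiskFileRouter(conf, logger=None)   # one manager per policy type
policy = POLICIES.default
df_mgr = router[policy]                      # DiskFileManager for this policy
df = df_mgr.get_diskfile('sda1', '1234', 'AUTH_test', 'cont', 'obj',
                         policy=policy)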
@@ -728,7 +786,7 @@ class DiskFileManager(object): dev_path = self.get_dev_path(device) if not dev_path: raise DiskFileDeviceUnavailable() - partition_path = os.path.join(dev_path, get_data_dir(policy_idx), + partition_path = os.path.join(dev_path, get_data_dir(policy), partition) for suffix in self._listdir(partition_path): if len(suffix) != 3: @@ -739,7 +797,7 @@ class DiskFileManager(object): continue yield (os.path.join(partition_path, suffix), suffix) - def yield_hashes(self, device, partition, policy_idx, suffixes=None): + def yield_hashes(self, device, partition, policy, suffixes=None, **kwargs): """ Yields tuples of (full_path, hash_only, timestamp) for object information stored for the given device, partition, and @@ -752,17 +810,18 @@ class DiskFileManager(object): if not dev_path: raise DiskFileDeviceUnavailable() if suffixes is None: - suffixes = self.yield_suffixes(device, partition, policy_idx) + suffixes = self.yield_suffixes(device, partition, policy) else: - partition_path = os.path.join(dev_path, get_data_dir(policy_idx), - partition) + partition_path = os.path.join(dev_path, + get_data_dir(policy), + str(partition)) suffixes = ( (os.path.join(partition_path, suffix), suffix) for suffix in suffixes) for suffix_path, suffix in suffixes: for object_hash in self._listdir(suffix_path): object_path = os.path.join(suffix_path, object_hash) - for name in hash_cleanup_listdir( + for name in self.hash_cleanup_listdir( object_path, self.reclaim_age): ts, ext = name.rsplit('.', 1) yield (object_path, object_hash, ts) @@ -794,8 +853,11 @@ class DiskFileWriter(object): :param tmppath: full path name of the opened file descriptor :param bytes_per_sync: number bytes written between sync calls :param threadpool: internal thread pool to use for disk operations + :param diskfile: the diskfile creating this DiskFileWriter instance """ - def __init__(self, name, datadir, fd, tmppath, bytes_per_sync, threadpool): + + def __init__(self, name, datadir, fd, tmppath, bytes_per_sync, threadpool, + diskfile): # Parameter tracking self._name = name self._datadir = datadir @@ -803,6 +865,7 @@ class DiskFileWriter(object): self._tmppath = tmppath self._bytes_per_sync = bytes_per_sync self._threadpool = threadpool + self._diskfile = diskfile # Internal attributes self._upload_size = 0 @@ -811,6 +874,10 @@ class DiskFileWriter(object): self._put_succeeded = False @property + def manager(self): + return self._diskfile.manager + + @property def put_succeeded(self): return self._put_succeeded @@ -855,7 +922,7 @@ class DiskFileWriter(object): # drop_cache() after fsync() to avoid redundant work (pages all # clean). drop_buffer_cache(self._fd, 0, self._upload_size) - invalidate_hash(dirname(self._datadir)) + self.manager.invalidate_hash(dirname(self._datadir)) # After the rename completes, this object will be available for other # requests to reference. renamer(self._tmppath, target_path) @@ -864,7 +931,7 @@ class DiskFileWriter(object): # succeeded, the tempfile would no longer exist at its original path. self._put_succeeded = True try: - hash_cleanup_listdir(self._datadir) + self.manager.hash_cleanup_listdir(self._datadir) except OSError: logging.exception(_('Problem cleaning up %s'), self._datadir) @@ -887,6 +954,16 @@ class DiskFileWriter(object): self._threadpool.force_run_in_thread( self._finalize_put, metadata, target_path) + def commit(self, timestamp): + """ + Perform any operations necessary to mark the object as durable. For + replication policy type this is a no-op. 
+ + :param timestamp: object put timestamp, an instance of + :class:`~swift.common.utils.Timestamp` + """ + pass + class DiskFileReader(object): """ @@ -917,17 +994,20 @@ class DiskFileReader(object): :param quarantine_hook: 1-arg callable called w/reason when quarantined :param use_splice: if true, use zero-copy splice() to send data :param pipe_size: size of pipe buffer used in zero-copy operations + :param diskfile: the diskfile creating this DiskFileReader instance :param keep_cache: should resulting reads be kept in the buffer cache """ def __init__(self, fp, data_file, obj_size, etag, threadpool, disk_chunk_size, keep_cache_size, device_path, logger, - quarantine_hook, use_splice, pipe_size, keep_cache=False): + quarantine_hook, use_splice, pipe_size, diskfile, + keep_cache=False): # Parameter tracking self._fp = fp self._data_file = data_file self._obj_size = obj_size self._etag = etag self._threadpool = threadpool + self._diskfile = diskfile self._disk_chunk_size = disk_chunk_size self._device_path = device_path self._logger = logger @@ -950,6 +1030,10 @@ class DiskFileReader(object): self._suppress_file_closing = False self._quarantined_dir = None + @property + def manager(self): + return self._diskfile.manager + def __iter__(self): """Returns an iterator over the data file.""" try: @@ -1130,7 +1214,8 @@ class DiskFileReader(object): def _quarantine(self, msg): self._quarantined_dir = self._threadpool.run_in_thread( - quarantine_renamer, self._device_path, self._data_file) + self.manager.quarantine_renamer, self._device_path, + self._data_file) self._logger.warn("Quarantined object %s: %s" % ( self._data_file, msg)) self._logger.increment('quarantines') @@ -1196,15 +1281,18 @@ class DiskFile(object): :param container: container name for the object :param obj: object name for the object :param _datadir: override the full datadir otherwise constructed here - :param policy_idx: used to get the data dir when constructing it here + :param policy: the StoragePolicy instance :param use_splice: if true, use zero-copy splice() to send data :param pipe_size: size of pipe buffer used in zero-copy operations """ + reader_cls = DiskFileReader + writer_cls = DiskFileWriter + def __init__(self, mgr, device_path, threadpool, partition, account=None, container=None, obj=None, _datadir=None, - policy_idx=0, use_splice=False, pipe_size=None): - self._mgr = mgr + policy=None, use_splice=False, pipe_size=None, **kwargs): + self._manager = mgr self._device_path = device_path self._threadpool = threadpool or ThreadPool(nthreads=0) self._logger = mgr.logger @@ -1212,6 +1300,7 @@ class DiskFile(object): self._bytes_per_sync = mgr.bytes_per_sync self._use_splice = use_splice self._pipe_size = pipe_size + self.policy = policy if account and container and obj: self._name = '/' + '/'.join((account, container, obj)) self._account = account @@ -1219,7 +1308,7 @@ class DiskFile(object): self._obj = obj name_hash = hash_path(account, container, obj) self._datadir = join( - device_path, storage_directory(get_data_dir(policy_idx), + device_path, storage_directory(get_data_dir(policy), partition, name_hash)) else: # gets populated when we read the metadata @@ -1228,7 +1317,7 @@ class DiskFile(object): self._container = None self._obj = None self._datadir = None - self._tmpdir = join(device_path, get_tmp_dir(policy_idx)) + self._tmpdir = join(device_path, get_tmp_dir(policy)) self._metadata = None self._data_file = None self._fp = None @@ -1239,10 +1328,14 @@ class DiskFile(object): else: name_hash = 
hash_path(account, container, obj) self._datadir = join( - device_path, storage_directory(get_data_dir(policy_idx), + device_path, storage_directory(get_data_dir(policy), partition, name_hash)) @property + def manager(self): + return self._manager + + @property def account(self): return self._account @@ -1267,8 +1360,9 @@ class DiskFile(object): return Timestamp(self._metadata.get('X-Timestamp')) @classmethod - def from_hash_dir(cls, mgr, hash_dir_path, device_path, partition): - return cls(mgr, device_path, None, partition, _datadir=hash_dir_path) + def from_hash_dir(cls, mgr, hash_dir_path, device_path, partition, policy): + return cls(mgr, device_path, None, partition, _datadir=hash_dir_path, + policy=policy) def open(self): """ @@ -1307,7 +1401,7 @@ class DiskFile(object): .. note:: - An implemenation shall raise `DiskFileNotOpen` when has not + An implementation shall raise `DiskFileNotOpen` when has not previously invoked the :func:`swift.obj.diskfile.DiskFile.open` method. """ @@ -1339,7 +1433,7 @@ class DiskFile(object): :returns: DiskFileQuarantined exception object """ self._quarantined_dir = self._threadpool.run_in_thread( - quarantine_renamer, self._device_path, data_file) + self.manager.quarantine_renamer, self._device_path, data_file) self._logger.warn("Quarantined object %s: %s" % ( data_file, msg)) self._logger.increment('quarantines') @@ -1384,7 +1478,7 @@ class DiskFile(object): # The data directory does not exist, so the object cannot exist. fileset = (None, None, None) else: - fileset = get_ondisk_files(files, self._datadir) + fileset = self.manager.get_ondisk_files(files, self._datadir) return fileset def _construct_exception_from_ts_file(self, ts_file): @@ -1576,12 +1670,12 @@ class DiskFile(object): Not needed by the REST layer. :returns: a :class:`swift.obj.diskfile.DiskFileReader` object """ - dr = DiskFileReader( + dr = self.reader_cls( self._fp, self._data_file, int(self._metadata['Content-Length']), self._metadata['ETag'], self._threadpool, self._disk_chunk_size, - self._mgr.keep_cache_size, self._device_path, self._logger, + self._manager.keep_cache_size, self._device_path, self._logger, use_splice=self._use_splice, quarantine_hook=_quarantine_hook, - pipe_size=self._pipe_size, keep_cache=keep_cache) + pipe_size=self._pipe_size, diskfile=self, keep_cache=keep_cache) # At this point the reader object is now responsible for closing # the file pointer. self._fp = None @@ -1621,8 +1715,10 @@ class DiskFile(object): if err.errno in (errno.ENOSPC, errno.EDQUOT): raise DiskFileNoSpace() raise - dfw = DiskFileWriter(self._name, self._datadir, fd, tmppath, - self._bytes_per_sync, self._threadpool) + dfw = self.writer_cls(self._name, self._datadir, fd, tmppath, + bytes_per_sync=self._bytes_per_sync, + threadpool=self._threadpool, + diskfile=self) yield dfw finally: try: @@ -1671,8 +1767,619 @@ class DiskFile(object): :raises DiskFileError: this implementation will raise the same errors as the `create()` method. 
""" - timestamp = Timestamp(timestamp).internal - + # this is dumb, only tests send in strings + timestamp = Timestamp(timestamp) with self.create() as deleter: deleter._extension = '.ts' - deleter.put({'X-Timestamp': timestamp}) + deleter.put({'X-Timestamp': timestamp.internal}) + +# TODO: move DiskFileManager definition down here +DiskFileManager.diskfile_cls = DiskFile + + +class ECDiskFileReader(DiskFileReader): + pass + + +class ECDiskFileWriter(DiskFileWriter): + + def _finalize_durable(self, durable_file_path): + exc = msg = None + try: + with open(durable_file_path, 'w') as _fd: + fsync(_fd) + try: + self.manager.hash_cleanup_listdir(self._datadir) + except OSError: + self.manager.logger.exception( + _('Problem cleaning up %s'), self._datadir) + except OSError: + msg = (_('Problem fsyncing durable state file: %s'), + durable_file_path) + exc = DiskFileError(msg) + except IOError as io_err: + if io_err.errno in (errno.ENOSPC, errno.EDQUOT): + msg = (_("No space left on device for %s"), + durable_file_path) + exc = DiskFileNoSpace() + else: + msg = (_('Problem writing durable state file: %s'), + durable_file_path) + exc = DiskFileError(msg) + if exc: + self.manager.logger.exception(msg) + raise exc + + def commit(self, timestamp): + """ + Finalize put by writing a timestamp.durable file for the object. We + do this for EC policy because it requires a 2-phase put commit + confirmation. + + :param timestamp: object put timestamp, an instance of + :class:`~swift.common.utils.Timestamp` + """ + durable_file_path = os.path.join( + self._datadir, timestamp.internal + '.durable') + self._threadpool.force_run_in_thread( + self._finalize_durable, durable_file_path) + + def put(self, metadata): + """ + The only difference between this method and the replication policy + DiskFileWriter method is the call into manager.make_on_disk_filename + to construct the data file name. + """ + timestamp = Timestamp(metadata['X-Timestamp']) + fi = None + if self._extension == '.data': + # generally we treat the fragment index provided in metadata as + # canon, but if it's unavailable (e.g. tests) it's reasonable to + # use the frag_index provided at instantiation. Either way make + # sure that the fragment index is included in object sysmeta. + fi = metadata.setdefault('X-Object-Sysmeta-Ec-Frag-Index', + self._diskfile._frag_index) + filename = self.manager.make_on_disk_filename( + timestamp, self._extension, frag_index=fi) + metadata['name'] = self._name + target_path = join(self._datadir, filename) + + self._threadpool.force_run_in_thread( + self._finalize_put, metadata, target_path) + + +class ECDiskFile(DiskFile): + + reader_cls = ECDiskFileReader + writer_cls = ECDiskFileWriter + + def __init__(self, *args, **kwargs): + super(ECDiskFile, self).__init__(*args, **kwargs) + frag_index = kwargs.get('frag_index') + self._frag_index = None + if frag_index is not None: + self._frag_index = self.manager.validate_fragment_index(frag_index) + + def _get_ondisk_file(self): + """ + The only difference between this method and the replication policy + DiskFile method is passing in the frag_index kwarg to our manager's + get_ondisk_files method. + """ + try: + files = os.listdir(self._datadir) + except OSError as err: + if err.errno == errno.ENOTDIR: + # If there's a file here instead of a directory, quarantine + # it; something's gone wrong somewhere. 
+ raise self._quarantine( + # hack: quarantine_renamer actually renames the directory + # enclosing the filename you give it, but here we just + # want this one file and not its parent. + os.path.join(self._datadir, "made-up-filename"), + "Expected directory, found file at %s" % self._datadir) + elif err.errno != errno.ENOENT: + raise DiskFileError( + "Error listing directory %s: %s" % (self._datadir, err)) + # The data directory does not exist, so the object cannot exist. + fileset = (None, None, None) + else: + fileset = self.manager.get_ondisk_files( + files, self._datadir, frag_index=self._frag_index) + return fileset + + def purge(self, timestamp, frag_index): + """ + Remove a tombstone file matching the specified timestamp or + datafile matching the specified timestamp and fragment index + from the object directory. + + This provides the EC reconstructor/ssync process with a way to + remove a tombstone or fragment from a handoff node after + reverting it to its primary node. + + The hash will be invalidated, and if empty or invalid the + hsh_path will be removed on next hash_cleanup_listdir. + + :param timestamp: the object timestamp, an instance of + :class:`~swift.common.utils.Timestamp` + :param frag_index: a fragment archive index, must be a whole number. + """ + for ext in ('.data', '.ts'): + purge_file = self.manager.make_on_disk_filename( + timestamp, ext=ext, frag_index=frag_index) + remove_file(os.path.join(self._datadir, purge_file)) + self.manager.invalidate_hash(dirname(self._datadir)) + + +@DiskFileRouter.register(EC_POLICY) +class ECDiskFileManager(DiskFileManager): + diskfile_cls = ECDiskFile + + def validate_fragment_index(self, frag_index): + """ + Return int representation of frag_index, or raise a DiskFileError if + frag_index is not a whole number. + """ + try: + frag_index = int(str(frag_index)) + except (ValueError, TypeError) as e: + raise DiskFileError( + 'Bad fragment index: %s: %s' % (frag_index, e)) + if frag_index < 0: + raise DiskFileError( + 'Fragment index must not be negative: %s' % frag_index) + return frag_index + + def make_on_disk_filename(self, timestamp, ext=None, frag_index=None, + *a, **kw): + """ + Returns the EC specific filename for given timestamp. + + :param timestamp: the object timestamp, an instance of + :class:`~swift.common.utils.Timestamp` + :param ext: an optional string representing a file extension to be + appended to the returned file name + :param frag_index: a fragment archive index, used with .data extension + only, must be a whole number. + :returns: a file name + :raises DiskFileError: if ext=='.data' and the kwarg frag_index is not + a whole number + """ + rv = timestamp.internal + if ext == '.data': + # for datafiles only we encode the fragment index in the filename + # to allow archives of different indexes to temporarily be stored + # on the same node in certain situations + frag_index = self.validate_fragment_index(frag_index) + rv += '#' + str(frag_index) + if ext: + rv = '%s%s' % (rv, ext) + return rv + + def parse_on_disk_filename(self, filename): + """ + Returns the timestamp extracted from a policy specific .data file name. + For EC policy the data file name includes a fragment index which must + be stripped off to retrieve the timestamp. 
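A standalone sketch of the naming scheme described here may help; this is a simplified re-implementation for illustration, not the module's own make_on_disk_filename/parse_on_disk_filename:

    from os.path import splitext


    def make_name(timestamp, ext, frag_index=None):
        # EC data files embed the fragment index after a '#'
        name = timestamp
        if ext == '.data':
            name += '#%d' % frag_index
        return name + ext


    def parse_name(filename):
        base, ext = splitext(filename)
        parts = base.split('#', 1)
        frag_index = int(parts[1]) if ext == '.data' and len(parts) > 1 else None
        return {'timestamp': parts[0], 'frag_index': frag_index, 'ext': ext}


    name = make_name('1425500000.00000', '.data', frag_index=7)
    print(name)              # 1425500000.00000#7.data
    print(parse_name(name))  # timestamp '1425500000.00000', frag_index 7, ext '.data'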
+ + :param filename: the data file name including extension + :returns: a dict, with keys for timestamp, frag_index, and ext:: + + * timestamp is a :class:`~swift.common.utils.Timestamp` + * frag_index is an int or None + * ext is a string, the file extension including the leading dot or + the empty string if the filename has no extenstion. + + :raises DiskFileError: if any part of the filename is not able to be + validated. + """ + frag_index = None + filename, ext = splitext(filename) + parts = filename.split('#', 1) + timestamp = parts[0] + if ext == '.data': + # it is an error for an EC data file to not have a valid + # fragment index + try: + frag_index = parts[1] + except IndexError: + frag_index = None + frag_index = self.validate_fragment_index(frag_index) + return { + 'timestamp': Timestamp(timestamp), + 'frag_index': frag_index, + 'ext': ext, + } + + def is_obsolete(self, filename, other_filename): + """ + Test if a given file is considered to be obsolete with respect to + another file in an object storage dir. + + Implements EC policy specific behavior when comparing files against a + .durable file. + + A simple string comparison would consider t2#1.data to be older than + t2.durable (since t2#1.data < t2.durable). By stripping off the file + extensions we get the desired behavior: t2#1 > t2 without compromising + the detection of t1#1 < t2. + + :param filename: a string representing an absolute filename + :param other_filename: a string representing an absolute filename + :returns: True if filename is considered obsolete, False otherwise. + """ + if other_filename.endswith('.durable'): + return splitext(filename)[0] < splitext(other_filename)[0] + return filename < other_filename + + def _gather_on_disk_file(self, filename, ext, context, frag_index=None, + **kwargs): + """ + Called by gather_ondisk_files() for each file in an object + datadir in reverse sorted order. If a file is considered part of a + valid on-disk file set it will be added to the context dict, keyed by + its extension. If a file is considered to be obsolete it will be added + to a list stored under the key 'obsolete' in the context dict. + + :param filename: name of file to be accepted or not + :param ext: extension part of filename + :param context: a context dict that may have been populated by previous + calls to this method + :param frag_index: if set, search for a specific fragment index .data + file, otherwise accept the first valid .data file. 
+ :returns: True if a valid file set has been found, False otherwise + """ + + # if first file with given extension then add filename to context + # dict and return True + accept_first = lambda: context.setdefault(ext, filename) == filename + # add the filename to the list of obsolete files in context dict + discard = lambda: context.setdefault('obsolete', []).append(filename) + # set a flag in the context dict indicating that a valid fileset has + # been found + set_valid_fileset = lambda: context.setdefault('found_valid', True) + # return True if the valid fileset flag is set in the context dict + have_valid_fileset = lambda: context.get('found_valid') + + if context.get('.durable'): + # a .durable file has been found + if ext == '.data': + if self.is_obsolete(filename, context.get('.durable')): + # this and remaining data files are older than durable + discard() + set_valid_fileset() + else: + # accept the first .data file if it matches requested + # frag_index, or if no specific frag_index is requested + fi = self.parse_on_disk_filename(filename)['frag_index'] + if frag_index is None or frag_index == int(fi): + accept_first() + set_valid_fileset() + # else: keep searching for a .data file to match frag_index + context.setdefault('fragments', []).append(filename) + else: + # there can no longer be a matching .data file so mark what has + # been found so far as the valid fileset + discard() + set_valid_fileset() + elif ext == '.data': + # not yet found a .durable + if have_valid_fileset(): + # valid fileset means we must have a newer + # .ts, so discard the older .data file + discard() + else: + # .data newer than a .durable or .ts, don't discard yet + context.setdefault('fragments_without_durable', []).append( + filename) + elif ext == '.ts': + if have_valid_fileset() or not accept_first(): + # newer .data, .durable or .ts already found so discard this + discard() + if not have_valid_fileset(): + # remove any .meta that may have been previously found + context['.meta'] = None + set_valid_fileset() + elif ext in ('.meta', '.durable'): + if have_valid_fileset() or not accept_first(): + # newer .data, .durable or .ts already found so discard this + discard() + else: + # ignore unexpected files + pass + return have_valid_fileset() + + def _verify_on_disk_files(self, accepted_files, frag_index=None, **kwargs): + """ + Verify that the final combination of on disk files complies with the + diskfile contract. + + :param accepted_files: files that have been found and accepted + :param frag_index: specifies a specific fragment index .data file + :returns: True if the file combination is compliant, False otherwise + """ + if not accepted_files.get('.data'): + # We may find only a .meta, which doesn't mean the on disk + # contract is broken. So we clear it to comply with + # superclass assertions. 
+ accepted_files['.meta'] = None + + data_file, meta_file, ts_file, durable_file = tuple( + [accepted_files.get(ext) + for ext in ('.data', '.meta', '.ts', '.durable')]) + + return ((data_file is None or durable_file is not None) + and (data_file is None and meta_file is None + and ts_file is None and durable_file is None) + or (ts_file is not None and data_file is None + and meta_file is None and durable_file is None) + or (data_file is not None and durable_file is not None + and ts_file is None) + or (durable_file is not None and meta_file is None + and ts_file is None)) + + def gather_ondisk_files(self, files, include_obsolete=False, + frag_index=None, verify=False, **kwargs): + """ + Given a simple list of files names, iterate over them to determine the + files that constitute a valid object, and optionally determine the + files that are obsolete and could be deleted. Note that some files may + fall into neither category. + + :param files: a list of file names. + :param include_obsolete: By default the iteration will stop when a + valid file set has been found. Setting this + argument to True will cause the iteration to + continue in order to find all obsolete files. + :param frag_index: if set, search for a specific fragment index .data + file, otherwise accept the first valid .data file. + :returns: a dict that may contain: valid on disk files keyed by their + filename extension; a list of obsolete files stored under the + key 'obsolete'. + """ + # This visitor pattern enables future refactoring of other disk + # manager implementations to re-use this method and override + # _gather_ondisk_file and _verify_ondisk_files to apply implementation + # specific selection and verification of on-disk files. + files.sort(reverse=True) + results = {} + for afile in files: + ts_file = results.get('.ts') + data_file = results.get('.data') + if not include_obsolete: + assert ts_file is None, "On-disk file search loop" \ + " continuing after tombstone, %s, encountered" % ts_file + assert data_file is None, "On-disk file search loop" \ + " continuing after data file, %s, encountered" % data_file + + ext = splitext(afile)[1] + if self._gather_on_disk_file( + afile, ext, results, frag_index=frag_index, **kwargs): + if not include_obsolete: + break + + if verify: + assert self._verify_on_disk_files( + results, frag_index=frag_index, **kwargs), \ + "On-disk file search algorithm contract is broken: %s" \ + % results.values() + return results + + def get_ondisk_files(self, files, datadir, **kwargs): + """ + Given a simple list of files names, determine the files to use. + + :param files: simple set of files as a python list + :param datadir: directory name files are from for convenience + :returns: a tuple of data, meta, and tombstone + """ + # maintain compatibility with 'legacy' get_ondisk_files return value + accepted_files = self.gather_ondisk_files(files, verify=True, **kwargs) + result = [(join(datadir, accepted_files.get(ext)) + if accepted_files.get(ext) else None) + for ext in ('.data', '.meta', '.ts')] + return tuple(result) + + def cleanup_ondisk_files(self, hsh_path, reclaim_age=ONE_WEEK, + frag_index=None): + """ + Clean up on-disk files that are obsolete and gather the set of valid + on-disk files for an object. 
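To make the selection rules concrete, the following sketch shows the kind of result gather_ondisk_files() is described as producing for one hash directory when obsolete files are also collected; filenames and timestamps are invented, and the dict is written out as data rather than produced by running the real code:

    files = ['1425500002.00000.durable',
             '1425500002.00000#1.data',
             '1425500001.00000.ts']

    # roughly what gather_ondisk_files(files, include_obsolete=True) should report
    expected = {
        '.durable': '1425500002.00000.durable',
        '.data': '1425500002.00000#1.data',   # accepted because a matching .durable exists
        'fragments': ['1425500002.00000#1.data'],
        'obsolete': ['1425500001.00000.ts'],  # older tombstone, safe to remove
    }

get_ondisk_files() then collapses such a result back into the legacy (data_file, meta_file, ts_file) tuple.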
+ + :param hsh_path: object hash path + :param reclaim_age: age in seconds at which to remove tombstones + :param frag_index: if set, search for a specific fragment index .data + file, otherwise accept the first valid .data file + :returns: a dict that may contain: valid on disk files keyed by their + filename extension; a list of obsolete files stored under the + key 'obsolete'; a list of files remaining in the directory, + reverse sorted, stored under the key 'files'. + """ + def is_reclaimable(filename): + timestamp = self.parse_on_disk_filename(filename)['timestamp'] + return (time.time() - float(timestamp)) > reclaim_age + + files = listdir(hsh_path) + files.sort(reverse=True) + results = self.gather_ondisk_files(files, include_obsolete=True, + frag_index=frag_index) + if '.durable' in results and not results.get('fragments'): + # a .durable with no .data is deleted as soon as it is found + results.setdefault('obsolete', []).append(results.pop('.durable')) + if '.ts' in results and is_reclaimable(results['.ts']): + results.setdefault('obsolete', []).append(results.pop('.ts')) + for filename in results.get('fragments_without_durable', []): + # stray fragments are not deleted until reclaim-age + if is_reclaimable(filename): + results.setdefault('obsolete', []).append(filename) + for filename in results.get('obsolete', []): + remove_file(join(hsh_path, filename)) + files.remove(filename) + results['files'] = files + return results + + def hash_cleanup_listdir(self, hsh_path, reclaim_age=ONE_WEEK): + """ + List contents of a hash directory and clean up any old files. + For EC policy, delete files older than a .durable or .ts file. + + :param hsh_path: object hash path + :param reclaim_age: age in seconds at which to remove tombstones + :returns: list of files remaining in the directory, reverse sorted + """ + # maintain compatibility with 'legacy' hash_cleanup_listdir + # return value + return self.cleanup_ondisk_files( + hsh_path, reclaim_age=reclaim_age)['files'] + + def yield_hashes(self, device, partition, policy, + suffixes=None, frag_index=None): + """ + This is the same as the replicated yield_hashes except when frag_index + is provided data files for fragment indexes not matching the given + frag_index are skipped. 
+ """ + dev_path = self.get_dev_path(device) + if not dev_path: + raise DiskFileDeviceUnavailable() + if suffixes is None: + suffixes = self.yield_suffixes(device, partition, policy) + else: + partition_path = os.path.join(dev_path, + get_data_dir(policy), + str(partition)) + suffixes = ( + (os.path.join(partition_path, suffix), suffix) + for suffix in suffixes) + for suffix_path, suffix in suffixes: + for object_hash in self._listdir(suffix_path): + object_path = os.path.join(suffix_path, object_hash) + newest_valid_file = None + try: + results = self.cleanup_ondisk_files( + object_path, self.reclaim_age, frag_index=frag_index) + newest_valid_file = (results.get('.meta') + or results.get('.data') + or results.get('.ts')) + if newest_valid_file: + timestamp = self.parse_on_disk_filename( + newest_valid_file)['timestamp'] + yield (object_path, object_hash, timestamp.internal) + except AssertionError as err: + self.logger.debug('Invalid file set in %s (%s)' % ( + object_path, err)) + except DiskFileError as err: + self.logger.debug( + 'Invalid diskfile filename %r in %r (%s)' % ( + newest_valid_file, object_path, err)) + + def _hash_suffix(self, path, reclaim_age): + """ + The only difference between this method and the module level function + hash_suffix is the way that files are updated on the returned hash. + + Instead of all filenames hashed into a single hasher, each file name + will fall into a bucket either by fragment index for datafiles, or + None (indicating a durable, metadata or tombstone). + """ + # hash_per_fi instead of single hash for whole suffix + hash_per_fi = defaultdict(hashlib.md5) + try: + path_contents = sorted(os.listdir(path)) + except OSError as err: + if err.errno in (errno.ENOTDIR, errno.ENOENT): + raise PathNotDir() + raise + for hsh in path_contents: + hsh_path = join(path, hsh) + try: + files = self.hash_cleanup_listdir(hsh_path, reclaim_age) + except OSError as err: + if err.errno == errno.ENOTDIR: + partition_path = dirname(path) + objects_path = dirname(partition_path) + device_path = dirname(objects_path) + quar_path = quarantine_renamer(device_path, hsh_path) + logging.exception( + _('Quarantined %(hsh_path)s to %(quar_path)s because ' + 'it is not a directory'), {'hsh_path': hsh_path, + 'quar_path': quar_path}) + continue + raise + if not files: + try: + os.rmdir(hsh_path) + except OSError: + pass + # we just deleted this hsh_path, why are we waiting + # until the next suffix hash to raise PathNotDir so that + # this suffix will get del'd from the suffix hashes? + for filename in files: + info = self.parse_on_disk_filename(filename) + fi = info['frag_index'] + if fi is None: + hash_per_fi[fi].update(filename) + else: + hash_per_fi[fi].update(info['timestamp'].internal) + try: + os.rmdir(path) + except OSError: + pass + # here we flatten out the hashers hexdigest into a dictionary instead + # of just returning the one hexdigest for the whole suffix + return dict((fi, md5.hexdigest()) for fi, md5 in hash_per_fi.items()) + + def _get_hashes(self, partition_path, recalculate=None, do_listdir=False, + reclaim_age=None): + """ + The only difference with this method and the module level function + get_hashes is the call to hash_suffix routes to a method _hash_suffix + on this instance. 
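The per-fragment-index bucketing can be sketched standalone as below (made-up filenames; the real code derives the fragment index and timestamp via parse_on_disk_filename, and the encode() calls are only for Python 3 compatibility):

    import hashlib
    from collections import defaultdict

    files = ['1425500002.00000#1.data',     # fragment index 1
             '1425500002.00000#4.data',     # fragment index 4
             '1425500002.00000.durable']    # no fragment index, goes in bucket None

    hash_per_fi = defaultdict(hashlib.md5)
    for filename in files:
        if filename.endswith('.data'):
            fi = int(filename.rsplit('#', 1)[1].split('.')[0])
            # data files are hashed by timestamp within their fragment's bucket
            hash_per_fi[fi].update(filename.split('#')[0].encode('ascii'))
        else:
            # durable/meta/tombstone files are hashed by name under bucket None
            hash_per_fi[None].update(filename.encode('ascii'))

    print(dict((fi, md5.hexdigest()) for fi, md5 in hash_per_fi.items()))
    # {1: '...', 4: '...', None: '...'}  one digest per fragment index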
+ """ + reclaim_age = reclaim_age or self.reclaim_age + hashed = 0 + hashes_file = join(partition_path, HASH_FILE) + modified = False + force_rewrite = False + hashes = {} + mtime = -1 + + if recalculate is None: + recalculate = [] + + try: + with open(hashes_file, 'rb') as fp: + hashes = pickle.load(fp) + mtime = getmtime(hashes_file) + except Exception: + do_listdir = True + force_rewrite = True + if do_listdir: + for suff in os.listdir(partition_path): + if len(suff) == 3: + hashes.setdefault(suff, None) + modified = True + hashes.update((suffix, None) for suffix in recalculate) + for suffix, hash_ in hashes.items(): + if not hash_: + suffix_dir = join(partition_path, suffix) + try: + hashes[suffix] = self._hash_suffix(suffix_dir, reclaim_age) + hashed += 1 + except PathNotDir: + del hashes[suffix] + except OSError: + logging.exception(_('Error hashing suffix')) + modified = True + if modified: + with lock_path(partition_path): + if force_rewrite or not exists(hashes_file) or \ + getmtime(hashes_file) == mtime: + write_pickle( + hashes, hashes_file, partition_path, PICKLE_PROTOCOL) + return hashed, hashes + return self._get_hashes(partition_path, recalculate, do_listdir, + reclaim_age) + else: + return hashed, hashes diff --git a/swift/obj/mem_diskfile.py b/swift/obj/mem_diskfile.py index efb8c6c8c..be5fbf134 100644 --- a/swift/obj/mem_diskfile.py +++ b/swift/obj/mem_diskfile.py @@ -57,6 +57,12 @@ class InMemoryFileSystem(object): def get_diskfile(self, account, container, obj, **kwargs): return DiskFile(self, account, container, obj) + def pickle_async_update(self, *args, **kwargs): + """ + For now don't handle async updates. + """ + pass + class DiskFileWriter(object): """ @@ -98,6 +104,16 @@ class DiskFileWriter(object): metadata['name'] = self._name self._filesystem.put_object(self._name, self._fp, metadata) + def commit(self, timestamp): + """ + Perform any operations necessary to mark the object as durable. For + mem_diskfile type this is a no-op. + + :param timestamp: object put timestamp, an instance of + :class:`~swift.common.utils.Timestamp` + """ + pass + class DiskFileReader(object): """ diff --git a/swift/obj/mem_server.py b/swift/obj/mem_server.py index 83647661a..764a92a92 100644 --- a/swift/obj/mem_server.py +++ b/swift/obj/mem_server.py @@ -15,15 +15,7 @@ """ In-Memory Object Server for Swift """ -import os -from swift import gettext_ as _ -from eventlet import Timeout - -from swift.common.bufferedhttp import http_connect -from swift.common.exceptions import ConnectionTimeout - -from swift.common.http import is_success from swift.obj.mem_diskfile import InMemoryFileSystem from swift.obj import server @@ -53,49 +45,6 @@ class ObjectController(server.ObjectController): """ return self._filesystem.get_diskfile(account, container, obj, **kwargs) - def async_update(self, op, account, container, obj, host, partition, - contdevice, headers_out, objdevice, policy_idx): - """ - Sends or saves an async update. 
- - :param op: operation performed (ex: 'PUT', or 'DELETE') - :param account: account name for the object - :param container: container name for the object - :param obj: object name - :param host: host that the container is on - :param partition: partition that the container is on - :param contdevice: device name that the container is on - :param headers_out: dictionary of headers to send in the container - request - :param objdevice: device name that the object is in - :param policy_idx: the associated storage policy index - """ - headers_out['user-agent'] = 'object-server %s' % os.getpid() - full_path = '/%s/%s/%s' % (account, container, obj) - if all([host, partition, contdevice]): - try: - with ConnectionTimeout(self.conn_timeout): - ip, port = host.rsplit(':', 1) - conn = http_connect(ip, port, contdevice, partition, op, - full_path, headers_out) - with Timeout(self.node_timeout): - response = conn.getresponse() - response.read() - if is_success(response.status): - return - else: - self.logger.error(_( - 'ERROR Container update failed: %(status)d ' - 'response from %(ip)s:%(port)s/%(dev)s'), - {'status': response.status, 'ip': ip, 'port': port, - 'dev': contdevice}) - except (Exception, Timeout): - self.logger.exception(_( - 'ERROR container update failed with ' - '%(ip)s:%(port)s/%(dev)s'), - {'ip': ip, 'port': port, 'dev': contdevice}) - # FIXME: For now don't handle async updates - def REPLICATE(self, request): """ Handle REPLICATE requests for the Swift Object Server. This is used diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py new file mode 100644 index 000000000..0ee2afbf6 --- /dev/null +++ b/swift/obj/reconstructor.py @@ -0,0 +1,925 @@ +# Copyright (c) 2010-2015 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +from os.path import join +import random +import time +import itertools +from collections import defaultdict +import cPickle as pickle +import shutil + +from eventlet import (GreenPile, GreenPool, Timeout, sleep, hubs, tpool, + spawn) +from eventlet.support.greenlets import GreenletExit + +from swift import gettext_ as _ +from swift.common.utils import ( + whataremyips, unlink_older_than, compute_eta, get_logger, + dump_recon_cache, ismount, mkdirs, config_true_value, list_from_csv, + get_hub, tpool_reraise, GreenAsyncPile, Timestamp, remove_file) +from swift.common.swob import HeaderKeyDict +from swift.common.bufferedhttp import http_connect +from swift.common.daemon import Daemon +from swift.common.ring.utils import is_local_device +from swift.obj.ssync_sender import Sender as ssync_sender +from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE +from swift.obj.diskfile import DiskFileRouter, get_data_dir, \ + get_tmp_dir +from swift.common.storage_policy import POLICIES, EC_POLICY +from swift.common.exceptions import ConnectionTimeout, DiskFileError, \ + SuffixSyncError + +SYNC, REVERT = ('sync_only', 'sync_revert') + + +hubs.use_hub(get_hub()) + + +class RebuildingECDiskFileStream(object): + """ + This class wraps the the reconstructed fragment archive data and + metadata in the DiskFile interface for ssync. + """ + + def __init__(self, metadata, frag_index, rebuilt_fragment_iter): + # start with metadata from a participating FA + self.metadata = metadata + + # the new FA is going to have the same length as others in the set + self._content_length = self.metadata['Content-Length'] + + # update the FI and delete the ETag, the obj server will + # recalc on the other side... + self.metadata['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index + del self.metadata['ETag'] + + self.frag_index = frag_index + self.rebuilt_fragment_iter = rebuilt_fragment_iter + + def get_metadata(self): + return self.metadata + + @property + def content_length(self): + return self._content_length + + def reader(self): + for chunk in self.rebuilt_fragment_iter: + yield chunk + + +class ObjectReconstructor(Daemon): + """ + Reconstruct objects using erasure code. And also rebalance EC Fragment + Archive objects off handoff nodes. + + Encapsulates most logic and data needed by the object reconstruction + process. Each call to .reconstruct() performs one pass. It's up to the + caller to do this in a loop. 
+ """ + + def __init__(self, conf, logger=None): + """ + :param conf: configuration object obtained from ConfigParser + :param logger: logging object + """ + self.conf = conf + self.logger = logger or get_logger( + conf, log_route='object-reconstructor') + self.devices_dir = conf.get('devices', '/srv/node') + self.mount_check = config_true_value(conf.get('mount_check', 'true')) + self.swift_dir = conf.get('swift_dir', '/etc/swift') + self.port = int(conf.get('bind_port', 6000)) + self.concurrency = int(conf.get('concurrency', 1)) + self.stats_interval = int(conf.get('stats_interval', '300')) + self.ring_check_interval = int(conf.get('ring_check_interval', 15)) + self.next_check = time.time() + self.ring_check_interval + self.reclaim_age = int(conf.get('reclaim_age', 86400 * 7)) + self.partition_times = [] + self.run_pause = int(conf.get('run_pause', 30)) + self.http_timeout = int(conf.get('http_timeout', 60)) + self.lockup_timeout = int(conf.get('lockup_timeout', 1800)) + self.recon_cache_path = conf.get('recon_cache_path', + '/var/cache/swift') + self.rcache = os.path.join(self.recon_cache_path, "object.recon") + # defaults subject to change after beta + self.conn_timeout = float(conf.get('conn_timeout', 0.5)) + self.node_timeout = float(conf.get('node_timeout', 10)) + self.network_chunk_size = int(conf.get('network_chunk_size', 65536)) + self.disk_chunk_size = int(conf.get('disk_chunk_size', 65536)) + self.headers = { + 'Content-Length': '0', + 'user-agent': 'obj-reconstructor %s' % os.getpid()} + self.handoffs_first = config_true_value(conf.get('handoffs_first', + False)) + self._df_router = DiskFileRouter(conf, self.logger) + + def load_object_ring(self, policy): + """ + Make sure the policy's rings are loaded. + + :param policy: the StoragePolicy instance + :returns: appropriate ring object + """ + policy.load_ring(self.swift_dir) + return policy.object_ring + + def check_ring(self, object_ring): + """ + Check to see if the ring has been updated + + :param object_ring: the ring to check + :returns: boolean indicating whether or not the ring has changed + """ + if time.time() > self.next_check: + self.next_check = time.time() + self.ring_check_interval + if object_ring.has_changed(): + return False + return True + + def _full_path(self, node, part, path, policy): + return '%(replication_ip)s:%(replication_port)s' \ + '/%(device)s/%(part)s%(path)s ' \ + 'policy#%(policy)d frag#%(frag_index)s' % { + 'replication_ip': node['replication_ip'], + 'replication_port': node['replication_port'], + 'device': node['device'], + 'part': part, 'path': path, + 'policy': policy, + 'frag_index': node.get('index', 'handoff'), + } + + def _get_response(self, node, part, path, headers, policy): + """ + Helper method for reconstruction that GETs a single EC fragment + archive + + :param node: the node to GET from + :param part: the partition + :param path: full path of the desired EC archive + :param headers: the headers to send + :param policy: an instance of + :class:`~swift.common.storage_policy.BaseStoragePolicy` + :returns: response + """ + resp = None + headers['X-Backend-Node-Index'] = node['index'] + try: + with ConnectionTimeout(self.conn_timeout): + conn = http_connect(node['ip'], node['port'], node['device'], + part, 'GET', path, headers=headers) + with Timeout(self.node_timeout): + resp = conn.getresponse() + if resp.status != HTTP_OK: + self.logger.warning( + _("Invalid response %(resp)s from %(full_path)s"), + {'resp': resp.status, + 'full_path': self._full_path(node, part, path, policy)}) + 
resp = None + except (Exception, Timeout): + self.logger.exception( + _("Trying to GET %(full_path)s"), { + 'full_path': self._full_path(node, part, path, policy)}) + return resp + + def reconstruct_fa(self, job, node, metadata): + """ + Reconstructs a fragment archive - this method is called from ssync + after a remote node responds that is missing this object - the local + diskfile is opened to provide metadata - but to reconstruct the + missing fragment archive we must connect to multiple object servers. + + :param job: job from ssync_sender + :param node: node that we're rebuilding to + :param metadata: the metadata to attach to the rebuilt archive + :returns: a DiskFile like class for use by ssync + :raises DiskFileError: if the fragment archive cannot be reconstructed + """ + + part_nodes = job['policy'].object_ring.get_part_nodes( + job['partition']) + part_nodes.remove(node) + + # the fragment index we need to reconstruct is the position index + # of the node we're rebuilding to within the primary part list + fi_to_rebuild = node['index'] + + # KISS send out connection requests to all nodes, see what sticks + headers = { + 'X-Backend-Storage-Policy-Index': int(job['policy']), + } + pile = GreenAsyncPile(len(part_nodes)) + path = metadata['name'] + for node in part_nodes: + pile.spawn(self._get_response, node, job['partition'], + path, headers, job['policy']) + responses = [] + etag = None + for resp in pile: + if not resp: + continue + resp.headers = HeaderKeyDict(resp.getheaders()) + responses.append(resp) + etag = sorted(responses, reverse=True, + key=lambda r: Timestamp( + r.headers.get('X-Backend-Timestamp') + ))[0].headers.get('X-Object-Sysmeta-Ec-Etag') + responses = [r for r in responses if + r.headers.get('X-Object-Sysmeta-Ec-Etag') == etag] + + if len(responses) >= job['policy'].ec_ndata: + break + else: + self.logger.error( + 'Unable to get enough responses (%s/%s) ' + 'to reconstruct %s with ETag %s' % ( + len(responses), job['policy'].ec_ndata, + self._full_path(node, job['partition'], + metadata['name'], job['policy']), + etag)) + raise DiskFileError('Unable to reconstruct EC archive') + + rebuilt_fragment_iter = self.make_rebuilt_fragment_iter( + responses[:job['policy'].ec_ndata], path, job['policy'], + fi_to_rebuild) + return RebuildingECDiskFileStream(metadata, fi_to_rebuild, + rebuilt_fragment_iter) + + def _reconstruct(self, policy, fragment_payload, frag_index): + # XXX with jerasure this doesn't work if we need to rebuild a + # parity fragment, and not all data fragments are available + # segment = policy.pyeclib_driver.reconstruct( + # fragment_payload, [frag_index])[0] + + # for safety until pyeclib 1.0.7 we'll just use decode and encode + segment = policy.pyeclib_driver.decode(fragment_payload) + return policy.pyeclib_driver.encode(segment)[frag_index] + + def make_rebuilt_fragment_iter(self, responses, path, policy, frag_index): + """ + Turn a set of connections from backend object servers into a generator + that yields up the rebuilt fragment archive for frag_index. 
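A minimal sketch of the decode-then-encode rebuild used by _reconstruct(), assuming PyEClib is installed; the k/m values and ec_type are arbitrary choices for the example and are not taken from this patch:

    from pyeclib.ec_iface import ECDriver

    driver = ECDriver(k=4, m=2, ec_type='liberasurecode_rs_vand')

    data = b'some object body ' * 100
    fragments = driver.encode(data)   # k + m fragment archives

    # pretend fragment 3 was lost: decode the body from k survivors, re-encode,
    # and keep only the fragment at the index being rebuilt
    survivors = [frag for i, frag in enumerate(fragments) if i != 3][:4]
    segment = driver.decode(survivors)
    rebuilt = driver.encode(segment)[3]

    assert rebuilt == fragments[3]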
+ """ + + def _get_one_fragment(resp): + buff = '' + remaining_bytes = policy.fragment_size + while remaining_bytes: + chunk = resp.read(remaining_bytes) + if not chunk: + break + remaining_bytes -= len(chunk) + buff += chunk + return buff + + def fragment_payload_iter(): + # We need a fragment from each connections, so best to + # use a GreenPile to keep them ordered and in sync + pile = GreenPile(len(responses)) + while True: + for resp in responses: + pile.spawn(_get_one_fragment, resp) + try: + with Timeout(self.node_timeout): + fragment_payload = [fragment for fragment in pile] + except (Exception, Timeout): + self.logger.exception( + _("Error trying to rebuild %(path)s " + "policy#%(policy)d frag#%(frag_index)s"), { + 'path': path, + 'policy': policy, + 'frag_index': frag_index, + }) + break + if not all(fragment_payload): + break + rebuilt_fragment = self._reconstruct( + policy, fragment_payload, frag_index) + yield rebuilt_fragment + + return fragment_payload_iter() + + def stats_line(self): + """ + Logs various stats for the currently running reconstruction pass. + """ + if self.reconstruction_count: + elapsed = (time.time() - self.start) or 0.000001 + rate = self.reconstruction_count / elapsed + self.logger.info( + _("%(reconstructed)d/%(total)d (%(percentage).2f%%)" + " partitions reconstructed in %(time).2fs (%(rate).2f/sec, " + "%(remaining)s remaining)"), + {'reconstructed': self.reconstruction_count, + 'total': self.job_count, + 'percentage': + self.reconstruction_count * 100.0 / self.job_count, + 'time': time.time() - self.start, 'rate': rate, + 'remaining': '%d%s' % compute_eta(self.start, + self.reconstruction_count, + self.job_count)}) + if self.suffix_count: + self.logger.info( + _("%(checked)d suffixes checked - " + "%(hashed).2f%% hashed, %(synced).2f%% synced"), + {'checked': self.suffix_count, + 'hashed': (self.suffix_hash * 100.0) / self.suffix_count, + 'synced': (self.suffix_sync * 100.0) / self.suffix_count}) + self.partition_times.sort() + self.logger.info( + _("Partition times: max %(max).4fs, " + "min %(min).4fs, med %(med).4fs"), + {'max': self.partition_times[-1], + 'min': self.partition_times[0], + 'med': self.partition_times[ + len(self.partition_times) // 2]}) + else: + self.logger.info( + _("Nothing reconstructed for %s seconds."), + (time.time() - self.start)) + + def kill_coros(self): + """Utility function that kills all coroutines currently running.""" + for coro in list(self.run_pool.coroutines_running): + try: + coro.kill(GreenletExit) + except GreenletExit: + pass + + def heartbeat(self): + """ + Loop that runs in the background during reconstruction. It + periodically logs progress. + """ + while True: + sleep(self.stats_interval) + self.stats_line() + + def detect_lockups(self): + """ + In testing, the pool.waitall() call very occasionally failed to return. + This is an attempt to make sure the reconstructor finishes its + reconstruction pass in some eventuality. + """ + while True: + sleep(self.lockup_timeout) + if self.reconstruction_count == self.last_reconstruction_count: + self.logger.error(_("Lockup detected.. killing live coros.")) + self.kill_coros() + self.last_reconstruction_count = self.reconstruction_count + + def _get_partners(self, frag_index, part_nodes): + """ + Returns the left and right partners of the node whose index is + equal to the given frag_index. 
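A quick illustration of the wrap-around partner selection, with node dicts reduced to the fields that matter here:

    part_nodes = [{'index': i, 'device': 'sd%s1' % chr(ord('a') + i)}
                  for i in range(5)]


    def get_partners(frag_index, part_nodes):
        # same modular arithmetic as _get_partners()
        return [part_nodes[(frag_index - 1) % len(part_nodes)],
                part_nodes[(frag_index + 1) % len(part_nodes)]]


    print(get_partners(0, part_nodes))  # indexes 4 and 1: the ends wrap around
    print(get_partners(2, part_nodes))  # indexes 1 and 3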
+ + :param frag_index: a fragment index + :param part_nodes: a list of primary nodes + :returns: [<node-to-left>, <node-to-right>] + """ + return [ + part_nodes[(frag_index - 1) % len(part_nodes)], + part_nodes[(frag_index + 1) % len(part_nodes)], + ] + + def _get_hashes(self, policy, path, recalculate=None, do_listdir=False): + df_mgr = self._df_router[policy] + hashed, suffix_hashes = tpool_reraise( + df_mgr._get_hashes, path, recalculate=recalculate, + do_listdir=do_listdir, reclaim_age=self.reclaim_age) + self.logger.update_stats('suffix.hashes', hashed) + return suffix_hashes + + def get_suffix_delta(self, local_suff, local_index, + remote_suff, remote_index): + """ + Compare the local suffix hashes with the remote suffix hashes + for the given local and remote fragment indexes. Return those + suffixes which should be synced. + + :param local_suff: the local suffix hashes (from _get_hashes) + :param local_index: the local fragment index for the job + :param remote_suff: the remote suffix hashes (from remote + REPLICATE request) + :param remote_index: the remote fragment index for the job + + :returns: a list of strings, the suffix dirs to sync + """ + suffixes = [] + for suffix, sub_dict_local in local_suff.iteritems(): + sub_dict_remote = remote_suff.get(suffix, {}) + if (sub_dict_local.get(None) != sub_dict_remote.get(None) or + sub_dict_local.get(local_index) != + sub_dict_remote.get(remote_index)): + suffixes.append(suffix) + return suffixes + + def rehash_remote(self, node, job, suffixes): + try: + with Timeout(self.http_timeout): + conn = http_connect( + node['replication_ip'], node['replication_port'], + node['device'], job['partition'], 'REPLICATE', + '/' + '-'.join(sorted(suffixes)), + headers=self.headers) + conn.getresponse().read() + except (Exception, Timeout): + self.logger.exception( + _("Trying to sync suffixes with %s") % self._full_path( + node, job['partition'], '', job['policy'])) + + def _get_suffixes_to_sync(self, job, node): + """ + For SYNC jobs we need to make a remote REPLICATE request to get + the remote node's current suffix's hashes and then compare to our + local suffix's hashes to decide which suffixes (if any) are out + of sync. + + :param: the job dict, with the keys defined in ``_get_part_jobs`` + :param node: the remote node dict + :returns: a (possibly empty) list of strings, the suffixes to be + synced with the remote node. 
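A data sketch of what get_suffix_delta() compares (suffix names and hashes invented); the inner dicts are the per-fragment-index suffix hashes, with the None key covering non-data files:

    local_suff = {
        '07a': {None: 'aaa111', 2: 'bbb222'},
        'f3c': {None: 'aaa111', 2: 'ccc333'},
    }
    remote_suff = {
        '07a': {None: 'aaa111', 4: 'bbb222'},   # same content, remote holds fragment 4
        'f3c': {None: 'aaa111', 4: 'ddd444'},   # fragment hash differs
    }

    # same comparison as get_suffix_delta() with local_index=2, remote_index=4
    suffixes = []
    for suffix, local in local_suff.items():
        remote = remote_suff.get(suffix, {})
        if (local.get(None) != remote.get(None) or
                local.get(2) != remote.get(4)):
            suffixes.append(suffix)
    print(suffixes)  # ['f3c'] is the only suffix that needs to be synced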
+ """ + # get hashes from the remote node + remote_suffixes = None + try: + with Timeout(self.http_timeout): + resp = http_connect( + node['replication_ip'], node['replication_port'], + node['device'], job['partition'], 'REPLICATE', + '', headers=self.headers).getresponse() + if resp.status == HTTP_INSUFFICIENT_STORAGE: + self.logger.error( + _('%s responded as unmounted'), + self._full_path(node, job['partition'], '', + job['policy'])) + elif resp.status != HTTP_OK: + self.logger.error( + _("Invalid response %(resp)s " + "from %(full_path)s"), { + 'resp': resp.status, + 'full_path': self._full_path( + node, job['partition'], '', + job['policy']) + }) + else: + remote_suffixes = pickle.loads(resp.read()) + except (Exception, Timeout): + # all exceptions are logged here so that our caller can + # safely catch our exception and continue to the next node + # without logging + self.logger.exception('Unable to get remote suffix hashes ' + 'from %r' % self._full_path( + node, job['partition'], '', + job['policy'])) + + if remote_suffixes is None: + raise SuffixSyncError('Unable to get remote suffix hashes') + + suffixes = self.get_suffix_delta(job['hashes'], + job['frag_index'], + remote_suffixes, + node['index']) + # now recalculate local hashes for suffixes that don't + # match so we're comparing the latest + local_suff = self._get_hashes(job['policy'], job['path'], + recalculate=suffixes) + + suffixes = self.get_suffix_delta(local_suff, + job['frag_index'], + remote_suffixes, + node['index']) + + self.suffix_count += len(suffixes) + return suffixes + + def delete_reverted_objs(self, job, objects, frag_index): + """ + For EC we can potentially revert only some of a partition + so we'll delete reverted objects here. Note that we delete + the fragment index of the file we sent to the remote node. + + :param job: the job being processed + :param objects: a dict of objects to be deleted, each entry maps + hash=>timestamp + :param frag_index: (int) the fragment index of data files to be deleted + """ + df_mgr = self._df_router[job['policy']] + for object_hash, timestamp in objects.items(): + try: + df = df_mgr.get_diskfile_from_hash( + job['local_dev']['device'], job['partition'], + object_hash, job['policy'], + frag_index=frag_index) + df.purge(Timestamp(timestamp), frag_index) + except DiskFileError: + continue + + def process_job(self, job): + """ + Sync the local partition with the remote node(s) according to + the parameters of the job. For primary nodes, the SYNC job type + will define both left and right hand sync_to nodes to ssync with + as defined by this primary nodes index in the node list based on + the fragment index found in the partition. For non-primary + nodes (either handoff revert, or rebalance) the REVERT job will + define a single node in sync_to which is the proper/new home for + the fragment index. + + N.B. ring rebalancing can be time consuming and handoff nodes' + fragment indexes do not have a stable order, it's possible to + have more than one REVERT job for a partition, and in some rare + failure conditions there may even also be a SYNC job for the + same partition - but each one will be processed separately + because each job will define a separate list of node(s) to + 'sync_to'. 
+ + :param: the job dict, with the keys defined in ``_get_job_info`` + """ + self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy']) + begin = time.time() + if job['job_type'] == REVERT: + self._revert(job, begin) + else: + self._sync(job, begin) + self.partition_times.append(time.time() - begin) + self.reconstruction_count += 1 + + def _sync(self, job, begin): + """ + Process a SYNC job. + """ + self.logger.increment( + 'partition.update.count.%s' % (job['local_dev']['device'],)) + # after our left and right partners, if there's some sort of + # failure we'll continue onto the remaining primary nodes and + # make sure they're in sync - or potentially rebuild missing + # fragments we find + dest_nodes = itertools.chain( + job['sync_to'], + # I think we could order these based on our index to better + # protect against a broken chain + itertools.ifilter( + lambda n: n['id'] not in (n['id'] for n in job['sync_to']), + job['policy'].object_ring.get_part_nodes(job['partition'])), + ) + syncd_with = 0 + for node in dest_nodes: + if syncd_with >= len(job['sync_to']): + # success! + break + + try: + suffixes = self._get_suffixes_to_sync(job, node) + except SuffixSyncError: + continue + + if not suffixes: + syncd_with += 1 + continue + + # ssync any out-of-sync suffixes with the remote node + success, _ = ssync_sender( + self, node, job, suffixes)() + # let remote end know to rehash it's suffixes + self.rehash_remote(node, job, suffixes) + # update stats for this attempt + self.suffix_sync += len(suffixes) + self.logger.update_stats('suffix.syncs', len(suffixes)) + if success: + syncd_with += 1 + self.logger.timing_since('partition.update.timing', begin) + + def _revert(self, job, begin): + """ + Process a REVERT job. + """ + self.logger.increment( + 'partition.delete.count.%s' % (job['local_dev']['device'],)) + # we'd desperately like to push this partition back to it's + # primary location, but if that node is down, the next best thing + # is one of the handoff locations - which *might* be us already! + dest_nodes = itertools.chain( + job['sync_to'], + job['policy'].object_ring.get_more_nodes(job['partition']), + ) + syncd_with = 0 + reverted_objs = {} + for node in dest_nodes: + if syncd_with >= len(job['sync_to']): + break + if node['id'] == job['local_dev']['id']: + # this is as good a place as any for this data for now + break + success, in_sync_objs = ssync_sender( + self, node, job, job['suffixes'])() + self.rehash_remote(node, job, job['suffixes']) + if success: + syncd_with += 1 + reverted_objs.update(in_sync_objs) + if syncd_with >= len(job['sync_to']): + self.delete_reverted_objs( + job, reverted_objs, job['frag_index']) + self.logger.timing_since('partition.delete.timing', begin) + + def _get_part_jobs(self, local_dev, part_path, partition, policy): + """ + Helper function to build jobs for a partition, this method will + read the suffix hashes and create job dictionaries to describe + the needed work. There will be one job for each fragment index + discovered in the partition. + + For a fragment index which corresponds to this node's ring + index, a job with job_type SYNC will be created to ensure that + the left and right hand primary ring nodes for the part have the + corresponding left and right hand fragment archives. + + A fragment index (or entire partition) for which this node is + not the primary corresponding node, will create job(s) with + job_type REVERT to ensure that fragment archives are pushed to + the correct node and removed from this one. 
+ + A partition may result in multiple jobs. Potentially many + REVERT jobs, and zero or one SYNC job. + + :param local_dev: the local device + :param part_path: full path to partition + :param partition: partition number + :param policy: the policy + + :returns: a list of dicts of job info + """ + # find all the fi's in the part, and which suffixes have them + hashes = self._get_hashes(policy, part_path, do_listdir=True) + non_data_fragment_suffixes = [] + data_fi_to_suffixes = defaultdict(list) + for suffix, fi_hash in hashes.items(): + if not fi_hash: + # this is for sanity and clarity, normally an empty + # suffix would get del'd from the hashes dict, but an + # OSError trying to re-hash the suffix could leave the + # value empty - it will log the exception; but there's + # no way to properly address this suffix at this time. + continue + data_frag_indexes = [f for f in fi_hash if f is not None] + if not data_frag_indexes: + non_data_fragment_suffixes.append(suffix) + else: + for fi in data_frag_indexes: + data_fi_to_suffixes[fi].append(suffix) + + # helper to ensure consistent structure of jobs + def build_job(job_type, frag_index, suffixes, sync_to): + return { + 'job_type': job_type, + 'frag_index': frag_index, + 'suffixes': suffixes, + 'sync_to': sync_to, + 'partition': partition, + 'path': part_path, + 'hashes': hashes, + 'policy': policy, + 'local_dev': local_dev, + # ssync likes to have it handy + 'device': local_dev['device'], + } + + # aggregate jobs for all the fragment index in this part + jobs = [] + + # check the primary nodes - to see if the part belongs here + part_nodes = policy.object_ring.get_part_nodes(partition) + for node in part_nodes: + if node['id'] == local_dev['id']: + # this partition belongs here, we'll need a sync job + frag_index = node['index'] + try: + suffixes = data_fi_to_suffixes.pop(frag_index) + except KeyError: + suffixes = [] + sync_job = build_job( + job_type=SYNC, + frag_index=frag_index, + suffixes=suffixes, + sync_to=self._get_partners(frag_index, part_nodes), + ) + # ssync callback to rebuild missing fragment_archives + sync_job['sync_diskfile_builder'] = self.reconstruct_fa + jobs.append(sync_job) + break + + # assign remaining data fragment suffixes to revert jobs + ordered_fis = sorted((len(suffixes), fi) for fi, suffixes + in data_fi_to_suffixes.items()) + for count, fi in ordered_fis: + revert_job = build_job( + job_type=REVERT, + frag_index=fi, + suffixes=data_fi_to_suffixes[fi], + sync_to=[part_nodes[fi]], + ) + jobs.append(revert_job) + + # now we need to assign suffixes that have no data fragments + if non_data_fragment_suffixes: + if jobs: + # the first job will be either the sync_job, or the + # revert_job for the fragment index that is most common + # among the suffixes + jobs[0]['suffixes'].extend(non_data_fragment_suffixes) + else: + # this is an unfortunate situation, we need a revert job to + # push partitions off this node, but none of the suffixes + # have any data fragments to hint at which node would be a + # good candidate to receive the tombstones. 
+ jobs.append(build_job( + job_type=REVERT, + frag_index=None, + suffixes=non_data_fragment_suffixes, + # this is super safe + sync_to=part_nodes, + # something like this would be probably be better + # sync_to=random.sample(part_nodes, 3), + )) + # return a list of jobs for this part + return jobs + + def collect_parts(self, override_devices=None, + override_partitions=None): + """ + Helper for yielding partitions in the top level reconstructor + """ + override_devices = override_devices or [] + override_partitions = override_partitions or [] + ips = whataremyips() + for policy in POLICIES: + if policy.policy_type != EC_POLICY: + continue + self._diskfile_mgr = self._df_router[policy] + self.load_object_ring(policy) + data_dir = get_data_dir(policy) + local_devices = itertools.ifilter( + lambda dev: dev and is_local_device( + ips, self.port, + dev['replication_ip'], dev['replication_port']), + policy.object_ring.devs) + for local_dev in local_devices: + if override_devices and (local_dev['device'] not in + override_devices): + continue + dev_path = join(self.devices_dir, local_dev['device']) + obj_path = join(dev_path, data_dir) + tmp_path = join(dev_path, get_tmp_dir(int(policy))) + if self.mount_check and not ismount(dev_path): + self.logger.warn(_('%s is not mounted'), + local_dev['device']) + continue + unlink_older_than(tmp_path, time.time() - + self.reclaim_age) + if not os.path.exists(obj_path): + try: + mkdirs(obj_path) + except Exception: + self.logger.exception( + 'Unable to create %s' % obj_path) + continue + try: + partitions = os.listdir(obj_path) + except OSError: + self.logger.exception( + 'Unable to list partitions in %r' % obj_path) + continue + for partition in partitions: + part_path = join(obj_path, partition) + if not (partition.isdigit() and + os.path.isdir(part_path)): + self.logger.warning( + 'Unexpected entity in data dir: %r' % part_path) + remove_file(part_path) + continue + partition = int(partition) + if override_partitions and (partition not in + override_partitions): + continue + part_info = { + 'local_dev': local_dev, + 'policy': policy, + 'partition': partition, + 'part_path': part_path, + } + yield part_info + + def build_reconstruction_jobs(self, part_info): + """ + Helper function for collect_jobs to build jobs for reconstruction + using EC style storage policy + """ + jobs = self._get_part_jobs(**part_info) + random.shuffle(jobs) + if self.handoffs_first: + # Move the handoff revert jobs to the front of the list + jobs.sort(key=lambda job: job['job_type'], reverse=True) + self.job_count += len(jobs) + return jobs + + def _reset_stats(self): + self.start = time.time() + self.job_count = 0 + self.suffix_count = 0 + self.suffix_sync = 0 + self.suffix_hash = 0 + self.reconstruction_count = 0 + self.last_reconstruction_count = -1 + + def delete_partition(self, path): + self.logger.info(_("Removing partition: %s"), path) + tpool.execute(shutil.rmtree, path, ignore_errors=True) + + def reconstruct(self, **kwargs): + """Run a reconstruction pass""" + self._reset_stats() + self.partition_times = [] + + stats = spawn(self.heartbeat) + lockup_detector = spawn(self.detect_lockups) + sleep() # Give spawns a cycle + + try: + self.run_pool = GreenPool(size=self.concurrency) + for part_info in self.collect_parts(**kwargs): + if not self.check_ring(part_info['policy'].object_ring): + self.logger.info(_("Ring change detected. 
Aborting " + "current reconstruction pass.")) + return + jobs = self.build_reconstruction_jobs(part_info) + if not jobs: + # If this part belongs on this node, _get_part_jobs + # will *always* build a sync_job - even if there's + # no suffixes in the partition that needs to sync. + # If there's any suffixes in the partition then our + # job list would have *at least* one revert job. + # Therefore we know this part a) doesn't belong on + # this node and b) doesn't have any suffixes in it. + self.run_pool.spawn(self.delete_partition, + part_info['part_path']) + for job in jobs: + self.run_pool.spawn(self.process_job, job) + with Timeout(self.lockup_timeout): + self.run_pool.waitall() + except (Exception, Timeout): + self.logger.exception(_("Exception in top-level" + "reconstruction loop")) + self.kill_coros() + finally: + stats.kill() + lockup_detector.kill() + self.stats_line() + + def run_once(self, *args, **kwargs): + start = time.time() + self.logger.info(_("Running object reconstructor in script mode.")) + override_devices = list_from_csv(kwargs.get('devices')) + override_partitions = [int(p) for p in + list_from_csv(kwargs.get('partitions'))] + self.reconstruct( + override_devices=override_devices, + override_partitions=override_partitions) + total = (time.time() - start) / 60 + self.logger.info( + _("Object reconstruction complete (once). (%.02f minutes)"), total) + if not (override_partitions or override_devices): + dump_recon_cache({'object_reconstruction_time': total, + 'object_reconstruction_last': time.time()}, + self.rcache, self.logger) + + def run_forever(self, *args, **kwargs): + self.logger.info(_("Starting object reconstructor in daemon mode.")) + # Run the reconstructor continually + while True: + start = time.time() + self.logger.info(_("Starting object reconstruction pass.")) + # Run the reconstructor + self.reconstruct() + total = (time.time() - start) / 60 + self.logger.info( + _("Object reconstruction complete. (%.02f minutes)"), total) + dump_recon_cache({'object_reconstruction_time': total, + 'object_reconstruction_last': time.time()}, + self.rcache, self.logger) + self.logger.debug('reconstruction sleeping for %s seconds.', + self.run_pause) + sleep(self.run_pause) diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index 5ee32884c..580d1827e 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -39,7 +39,7 @@ from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE from swift.obj import ssync_sender from swift.obj.diskfile import (DiskFileManager, get_hashes, get_data_dir, get_tmp_dir) -from swift.common.storage_policy import POLICIES +from swift.common.storage_policy import POLICIES, REPL_POLICY hubs.use_hub(get_hub()) @@ -110,14 +110,15 @@ class ObjectReplicator(Daemon): """ return self.sync_method(node, job, suffixes, *args, **kwargs) - def get_object_ring(self, policy_idx): + def load_object_ring(self, policy): """ - Get the ring object to use to handle a request based on its policy. + Make sure the policy's rings are loaded. - :policy_idx: policy index as defined in swift.conf + :param policy: the StoragePolicy instance :returns: appropriate ring object """ - return POLICIES.get_object_ring(policy_idx, self.swift_dir) + policy.load_ring(self.swift_dir) + return policy.object_ring def _rsync(self, args): """ @@ -170,7 +171,7 @@ class ObjectReplicator(Daemon): sync method in Swift. 
""" if not os.path.exists(job['path']): - return False, set() + return False, {} args = [ 'rsync', '--recursive', @@ -195,11 +196,11 @@ class ObjectReplicator(Daemon): args.append(spath) had_any = True if not had_any: - return False, set() - data_dir = get_data_dir(job['policy_idx']) + return False, {} + data_dir = get_data_dir(job['policy']) args.append(join(rsync_module, node['device'], data_dir, job['partition'])) - return self._rsync(args) == 0, set() + return self._rsync(args) == 0, {} def ssync(self, node, job, suffixes, remote_check_objs=None): return ssync_sender.Sender( @@ -231,7 +232,7 @@ class ObjectReplicator(Daemon): if len(suff) == 3 and isdir(join(path, suff))] self.replication_count += 1 self.logger.increment('partition.delete.count.%s' % (job['device'],)) - self.headers['X-Backend-Storage-Policy-Index'] = job['policy_idx'] + self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy']) begin = time.time() try: responses = [] @@ -245,8 +246,9 @@ class ObjectReplicator(Daemon): self.conf.get('sync_method', 'rsync') == 'ssync': kwargs['remote_check_objs'] = \ synced_remote_regions[node['region']] - # cand_objs is a list of objects for deletion - success, cand_objs = self.sync( + # candidates is a dict(hash=>timestamp) of objects + # for deletion + success, candidates = self.sync( node, job, suffixes, **kwargs) if success: with Timeout(self.http_timeout): @@ -257,7 +259,8 @@ class ObjectReplicator(Daemon): '/' + '-'.join(suffixes), headers=self.headers) conn.getresponse().read() if node['region'] != job['region']: - synced_remote_regions[node['region']] = cand_objs + synced_remote_regions[node['region']] = \ + candidates.keys() responses.append(success) for region, cand_objs in synced_remote_regions.iteritems(): if delete_objs is None: @@ -314,7 +317,7 @@ class ObjectReplicator(Daemon): """ self.replication_count += 1 self.logger.increment('partition.update.count.%s' % (job['device'],)) - self.headers['X-Backend-Storage-Policy-Index'] = job['policy_idx'] + self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy']) begin = time.time() try: hashed, local_hash = tpool_reraise( @@ -328,7 +331,8 @@ class ObjectReplicator(Daemon): random.shuffle(job['nodes']) nodes = itertools.chain( job['nodes'], - job['object_ring'].get_more_nodes(int(job['partition']))) + job['policy'].object_ring.get_more_nodes( + int(job['partition']))) while attempts_left > 0: # If this throws StopIteration it will be caught way below node = next(nodes) @@ -460,16 +464,15 @@ class ObjectReplicator(Daemon): self.kill_coros() self.last_replication_count = self.replication_count - def process_repl(self, policy, ips, override_devices=None, - override_partitions=None): + def build_replication_jobs(self, policy, ips, override_devices=None, + override_partitions=None): """ Helper function for collect_jobs to build jobs for replication using replication style storage policy """ jobs = [] - obj_ring = self.get_object_ring(policy.idx) - data_dir = get_data_dir(policy.idx) - for local_dev in [dev for dev in obj_ring.devs + data_dir = get_data_dir(policy) + for local_dev in [dev for dev in policy.object_ring.devs if (dev and is_local_device(ips, self.port, @@ -479,7 +482,7 @@ class ObjectReplicator(Daemon): or dev['device'] in override_devices))]: dev_path = join(self.devices_dir, local_dev['device']) obj_path = join(dev_path, data_dir) - tmp_path = join(dev_path, get_tmp_dir(int(policy))) + tmp_path = join(dev_path, get_tmp_dir(policy)) if self.mount_check and not ismount(dev_path): self.logger.warn(_('%s 
is not mounted'), local_dev['device']) continue @@ -497,7 +500,8 @@ class ObjectReplicator(Daemon): try: job_path = join(obj_path, partition) - part_nodes = obj_ring.get_part_nodes(int(partition)) + part_nodes = policy.object_ring.get_part_nodes( + int(partition)) nodes = [node for node in part_nodes if node['id'] != local_dev['id']] jobs.append( @@ -506,9 +510,8 @@ class ObjectReplicator(Daemon): obj_path=obj_path, nodes=nodes, delete=len(nodes) > len(part_nodes) - 1, - policy_idx=policy.idx, + policy=policy, partition=partition, - object_ring=obj_ring, region=local_dev['region'])) except ValueError: continue @@ -530,13 +533,15 @@ class ObjectReplicator(Daemon): jobs = [] ips = whataremyips() for policy in POLICIES: - if (override_policies is not None - and str(policy.idx) not in override_policies): - continue - # may need to branch here for future policy types - jobs += self.process_repl(policy, ips, - override_devices=override_devices, - override_partitions=override_partitions) + if policy.policy_type == REPL_POLICY: + if (override_policies is not None and + str(policy.idx) not in override_policies): + continue + # ensure rings are loaded for policy + self.load_object_ring(policy) + jobs += self.build_replication_jobs( + policy, ips, override_devices=override_devices, + override_partitions=override_partitions) random.shuffle(jobs) if self.handoffs_first: # Move the handoff parts to the front of the list @@ -569,7 +574,7 @@ class ObjectReplicator(Daemon): if self.mount_check and not ismount(dev_path): self.logger.warn(_('%s is not mounted'), job['device']) continue - if not self.check_ring(job['object_ring']): + if not self.check_ring(job['policy'].object_ring): self.logger.info(_("Ring change detected. Aborting " "current replication pass.")) return diff --git a/swift/obj/server.py b/swift/obj/server.py index ad0f9faeb..658f207a8 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -16,10 +16,12 @@ """ Object Server for Swift """ import cPickle as pickle +import json import os import multiprocessing import time import traceback +import rfc822 import socket import math from swift import gettext_ as _ @@ -30,7 +32,7 @@ from eventlet import sleep, wsgi, Timeout from swift.common.utils import public, get_logger, \ config_true_value, timing_stats, replication, \ normalize_delete_at_timestamp, get_log_line, Timestamp, \ - get_expirer_container + get_expirer_container, iter_multipart_mime_documents from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_object_creation, \ valid_timestamp, check_utf8 @@ -48,8 +50,35 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \ HTTPPreconditionFailed, HTTPRequestTimeout, HTTPUnprocessableEntity, \ HTTPClientDisconnect, HTTPMethodNotAllowed, Request, Response, \ HTTPInsufficientStorage, HTTPForbidden, HTTPException, HeaderKeyDict, \ - HTTPConflict -from swift.obj.diskfile import DATAFILE_SYSTEM_META, DiskFileManager + HTTPConflict, HTTPServerError +from swift.obj.diskfile import DATAFILE_SYSTEM_META, DiskFileRouter + + +def iter_mime_headers_and_bodies(wsgi_input, mime_boundary, read_chunk_size): + mime_documents_iter = iter_multipart_mime_documents( + wsgi_input, mime_boundary, read_chunk_size) + + for file_like in mime_documents_iter: + hdrs = HeaderKeyDict(rfc822.Message(file_like, 0)) + yield (hdrs, file_like) + + +def drain(file_like, read_size, timeout): + """ + Read and discard any bytes from file_like. 
+ + :param file_like: file-like object to read from + :param read_size: how big a chunk to read at a time + :param timeout: how long to wait for a read (use None for no timeout) + + :raises ChunkReadTimeout: if no chunk was read in time + """ + + while True: + with ChunkReadTimeout(timeout): + chunk = file_like.read(read_size) + if not chunk: + break class EventletPlungerString(str): @@ -142,7 +171,7 @@ class ObjectController(BaseStorageServer): # Common on-disk hierarchy shared across account, container and object # servers. - self._diskfile_mgr = DiskFileManager(conf, self.logger) + self._diskfile_router = DiskFileRouter(conf, self.logger) # This is populated by global_conf_callback way below as the semaphore # is shared by all workers. if 'replication_semaphore' in conf: @@ -156,7 +185,7 @@ class ObjectController(BaseStorageServer): conf.get('replication_failure_ratio') or 1.0) def get_diskfile(self, device, partition, account, container, obj, - policy_idx, **kwargs): + policy, **kwargs): """ Utility method for instantiating a DiskFile object supporting a given REST API. @@ -165,11 +194,11 @@ class ObjectController(BaseStorageServer): DiskFile class would simply over-ride this method to provide that behavior. """ - return self._diskfile_mgr.get_diskfile( - device, partition, account, container, obj, policy_idx, **kwargs) + return self._diskfile_router[policy].get_diskfile( + device, partition, account, container, obj, policy, **kwargs) def async_update(self, op, account, container, obj, host, partition, - contdevice, headers_out, objdevice, policy_index): + contdevice, headers_out, objdevice, policy): """ Sends or saves an async update. @@ -183,7 +212,7 @@ class ObjectController(BaseStorageServer): :param headers_out: dictionary of headers to send in the container request :param objdevice: device name that the object is in - :param policy_index: the associated storage policy index + :param policy: the associated BaseStoragePolicy instance """ headers_out['user-agent'] = 'object-server %s' % os.getpid() full_path = '/%s/%s/%s' % (account, container, obj) @@ -213,12 +242,11 @@ class ObjectController(BaseStorageServer): data = {'op': op, 'account': account, 'container': container, 'obj': obj, 'headers': headers_out} timestamp = headers_out['x-timestamp'] - self._diskfile_mgr.pickle_async_update(objdevice, account, container, - obj, data, timestamp, - policy_index) + self._diskfile_router[policy].pickle_async_update( + objdevice, account, container, obj, data, timestamp, policy) def container_update(self, op, account, container, obj, request, - headers_out, objdevice, policy_idx): + headers_out, objdevice, policy): """ Update the container when objects are updated. 
@@ -230,6 +258,7 @@ class ObjectController(BaseStorageServer): :param headers_out: dictionary of headers to send in the container request(s) :param objdevice: device name that the object is in + :param policy: the BaseStoragePolicy instance """ headers_in = request.headers conthosts = [h.strip() for h in @@ -255,14 +284,14 @@ class ObjectController(BaseStorageServer): headers_out['x-trans-id'] = headers_in.get('x-trans-id', '-') headers_out['referer'] = request.as_referer() - headers_out['X-Backend-Storage-Policy-Index'] = policy_idx + headers_out['X-Backend-Storage-Policy-Index'] = int(policy) for conthost, contdevice in updates: self.async_update(op, account, container, obj, conthost, contpartition, contdevice, headers_out, - objdevice, policy_idx) + objdevice, policy) def delete_at_update(self, op, delete_at, account, container, obj, - request, objdevice, policy_index): + request, objdevice, policy): """ Update the expiring objects container when objects are updated. @@ -273,7 +302,7 @@ class ObjectController(BaseStorageServer): :param obj: object name :param request: the original request driving the update :param objdevice: device name that the object is in - :param policy_index: the policy index to be used for tmp dir + :param policy: the BaseStoragePolicy instance (used for tmp dir) """ if config_true_value( request.headers.get('x-backend-replication', 'f')): @@ -333,13 +362,66 @@ class ObjectController(BaseStorageServer): op, self.expiring_objects_account, delete_at_container, '%s-%s/%s/%s' % (delete_at, account, container, obj), host, partition, contdevice, headers_out, objdevice, - policy_index) + policy) + + def _make_timeout_reader(self, file_like): + def timeout_reader(): + with ChunkReadTimeout(self.client_timeout): + return file_like.read(self.network_chunk_size) + return timeout_reader + + def _read_put_commit_message(self, mime_documents_iter): + rcvd_commit = False + try: + with ChunkReadTimeout(self.client_timeout): + commit_hdrs, commit_iter = next(mime_documents_iter) + if commit_hdrs.get('X-Document', None) == "put commit": + rcvd_commit = True + drain(commit_iter, self.network_chunk_size, self.client_timeout) + except ChunkReadTimeout: + raise HTTPClientDisconnect() + except StopIteration: + raise HTTPBadRequest(body="couldn't find PUT commit MIME doc") + return rcvd_commit + + def _read_metadata_footer(self, mime_documents_iter): + try: + with ChunkReadTimeout(self.client_timeout): + footer_hdrs, footer_iter = next(mime_documents_iter) + except ChunkReadTimeout: + raise HTTPClientDisconnect() + except StopIteration: + raise HTTPBadRequest(body="couldn't find footer MIME doc") + + timeout_reader = self._make_timeout_reader(footer_iter) + try: + footer_body = ''.join(iter(timeout_reader, '')) + except ChunkReadTimeout: + raise HTTPClientDisconnect() + + footer_md5 = footer_hdrs.get('Content-MD5') + if not footer_md5: + raise HTTPBadRequest(body="no Content-MD5 in footer") + if footer_md5 != md5(footer_body).hexdigest(): + raise HTTPUnprocessableEntity(body="footer MD5 mismatch") + + try: + return HeaderKeyDict(json.loads(footer_body)) + except ValueError: + raise HTTPBadRequest("invalid JSON for footer doc") + + def _check_container_override(self, update_headers, metadata): + for key, val in metadata.iteritems(): + override_prefix = 'x-backend-container-update-override-' + if key.lower().startswith(override_prefix): + override = key.lower().replace(override_prefix, 'x-') + update_headers[override] = val @public @timing_stats() def POST(self, request): """Handle HTTP 
POST requests for the Swift Object Server.""" - device, partition, account, container, obj, policy_idx = \ + device, partition, account, container, obj, policy = \ get_name_and_placement(request, 5, 5, True) req_timestamp = valid_timestamp(request) new_delete_at = int(request.headers.get('X-Delete-At') or 0) @@ -349,7 +431,7 @@ class ObjectController(BaseStorageServer): try: disk_file = self.get_diskfile( device, partition, account, container, obj, - policy_idx=policy_idx) + policy=policy) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: @@ -374,11 +456,11 @@ class ObjectController(BaseStorageServer): if orig_delete_at != new_delete_at: if new_delete_at: self.delete_at_update('PUT', new_delete_at, account, container, - obj, request, device, policy_idx) + obj, request, device, policy) if orig_delete_at: self.delete_at_update('DELETE', orig_delete_at, account, container, obj, request, device, - policy_idx) + policy) try: disk_file.write_metadata(metadata) except (DiskFileXattrNotSupported, DiskFileNoSpace): @@ -389,7 +471,7 @@ class ObjectController(BaseStorageServer): @timing_stats() def PUT(self, request): """Handle HTTP PUT requests for the Swift Object Server.""" - device, partition, account, container, obj, policy_idx = \ + device, partition, account, container, obj, policy = \ get_name_and_placement(request, 5, 5, True) req_timestamp = valid_timestamp(request) error_response = check_object_creation(request, obj) @@ -404,10 +486,22 @@ class ObjectController(BaseStorageServer): except ValueError as e: return HTTPBadRequest(body=str(e), request=request, content_type='text/plain') + + # In case of multipart-MIME put, the proxy sends a chunked request, + # but may let us know the real content length so we can verify that + # we have enough disk space to hold the object. + if fsize is None: + fsize = request.headers.get('X-Backend-Obj-Content-Length') + if fsize is not None: + try: + fsize = int(fsize) + except ValueError as e: + return HTTPBadRequest(body=str(e), request=request, + content_type='text/plain') try: disk_file = self.get_diskfile( device, partition, account, container, obj, - policy_idx=policy_idx) + policy=policy) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: @@ -439,13 +533,51 @@ class ObjectController(BaseStorageServer): with disk_file.create(size=fsize) as writer: upload_size = 0 - def timeout_reader(): - with ChunkReadTimeout(self.client_timeout): - return request.environ['wsgi.input'].read( - self.network_chunk_size) + # If the proxy wants to send us object metadata after the + # object body, it sets some headers. We have to tell the + # proxy, in the 100 Continue response, that we're able to + # parse a multipart MIME document and extract the object and + # metadata from it. If we don't, then the proxy won't + # actually send the footer metadata. 
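# A hedged sketch of the negotiation described above (the header names are
# the ones tested just below; the boundary value is invented):
#
#   proxy -> object-server, on the PUT request:
#       X-Backend-Obj-Metadata-Footer: yes
#       X-Backend-Obj-Multiphase-Commit: yes
#       X-Backend-Obj-Multipart-Mime-Boundary: 7b47bc21f9a0...
#
#   object-server -> proxy, as "100 Continue" headers:
#       X-Obj-Metadata-Footer: yes
#       X-Obj-Multiphase-Commit: yes
#
# Only after seeing those 100 Continue headers will the proxy actually
# frame the request body as a multipart MIME document with footer and
# commit parts.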
+ have_metadata_footer = False + use_multiphase_commit = False + mime_documents_iter = iter([]) + obj_input = request.environ['wsgi.input'] + + hundred_continue_headers = [] + if config_true_value( + request.headers.get( + 'X-Backend-Obj-Multiphase-Commit')): + use_multiphase_commit = True + hundred_continue_headers.append( + ('X-Obj-Multiphase-Commit', 'yes')) + + if config_true_value( + request.headers.get('X-Backend-Obj-Metadata-Footer')): + have_metadata_footer = True + hundred_continue_headers.append( + ('X-Obj-Metadata-Footer', 'yes')) + + if have_metadata_footer or use_multiphase_commit: + obj_input.set_hundred_continue_response_headers( + hundred_continue_headers) + mime_boundary = request.headers.get( + 'X-Backend-Obj-Multipart-Mime-Boundary') + if not mime_boundary: + return HTTPBadRequest("no MIME boundary") + try: + with ChunkReadTimeout(self.client_timeout): + mime_documents_iter = iter_mime_headers_and_bodies( + request.environ['wsgi.input'], + mime_boundary, self.network_chunk_size) + _junk_hdrs, obj_input = next(mime_documents_iter) + except ChunkReadTimeout: + return HTTPRequestTimeout(request=request) + + timeout_reader = self._make_timeout_reader(obj_input) try: - for chunk in iter(lambda: timeout_reader(), ''): + for chunk in iter(timeout_reader, ''): start_time = time.time() if start_time > upload_expiration: self.logger.increment('PUT.timeouts') @@ -461,9 +593,16 @@ class ObjectController(BaseStorageServer): upload_size) if fsize is not None and fsize != upload_size: return HTTPClientDisconnect(request=request) + + footer_meta = {} + if have_metadata_footer: + footer_meta = self._read_metadata_footer( + mime_documents_iter) + + request_etag = (footer_meta.get('etag') or + request.headers.get('etag', '')).lower() etag = etag.hexdigest() - if 'etag' in request.headers and \ - request.headers['etag'].lower() != etag: + if request_etag and request_etag != etag: return HTTPUnprocessableEntity(request=request) metadata = { 'X-Timestamp': request.timestamp.internal, @@ -473,6 +612,8 @@ class ObjectController(BaseStorageServer): } metadata.update(val for val in request.headers.iteritems() if is_sys_or_user_meta('object', val[0])) + metadata.update(val for val in footer_meta.iteritems() + if is_sys_or_user_meta('object', val[0])) headers_to_copy = ( request.headers.get( 'X-Backend-Replication-Headers', '').split() + @@ -482,39 +623,63 @@ class ObjectController(BaseStorageServer): header_caps = header_key.title() metadata[header_caps] = request.headers[header_key] writer.put(metadata) + + # if the PUT requires a two-phase commit (a data and a commit + # phase) send the proxy server another 100-continue response + # to indicate that we are finished writing object data + if use_multiphase_commit: + request.environ['wsgi.input'].\ + send_hundred_continue_response() + if not self._read_put_commit_message(mime_documents_iter): + return HTTPServerError(request=request) + # got 2nd phase confirmation, write a timestamp.durable + # state file to indicate a successful PUT + + writer.commit(request.timestamp) + + # Drain any remaining MIME docs from the socket. There + # shouldn't be any, but we must read the whole request body. 
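# A hedged recap of the body framing this handler has just worked through
# (the footer and commit headers match the parsing code above; the
# boundary and the metadata values are invented):
#
#   --<mime_boundary>
#   <first MIME document: the object data, streamed into the diskfile>
#   --<mime_boundary>
#   Content-MD5: <md5 of the JSON body below>
#
#   {"Etag": "d41d8cd9...", "X-Object-Sysmeta-Ec-Etag": "..."}
#   --<mime_boundary>
#   X-Document: put commit
#
#   <body ignored; drained by _read_put_commit_message above>
#   --<mime_boundary>--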
+ try: + while True: + with ChunkReadTimeout(self.client_timeout): + _junk_hdrs, _junk_body = next(mime_documents_iter) + drain(_junk_body, self.network_chunk_size, + self.client_timeout) + except ChunkReadTimeout: + raise HTTPClientDisconnect() + except StopIteration: + pass + except (DiskFileXattrNotSupported, DiskFileNoSpace): return HTTPInsufficientStorage(drive=device, request=request) if orig_delete_at != new_delete_at: if new_delete_at: self.delete_at_update( 'PUT', new_delete_at, account, container, obj, request, - device, policy_idx) + device, policy) if orig_delete_at: self.delete_at_update( 'DELETE', orig_delete_at, account, container, obj, - request, device, policy_idx) + request, device, policy) update_headers = HeaderKeyDict({ 'x-size': metadata['Content-Length'], 'x-content-type': metadata['Content-Type'], 'x-timestamp': metadata['X-Timestamp'], 'x-etag': metadata['ETag']}) # apply any container update header overrides sent with request - for key, val in request.headers.iteritems(): - override_prefix = 'x-backend-container-update-override-' - if key.lower().startswith(override_prefix): - override = key.lower().replace(override_prefix, 'x-') - update_headers[override] = val + self._check_container_override(update_headers, request.headers) + self._check_container_override(update_headers, footer_meta) self.container_update( 'PUT', account, container, obj, request, update_headers, - device, policy_idx) + device, policy) return HTTPCreated(request=request, etag=etag) @public @timing_stats() def GET(self, request): """Handle HTTP GET requests for the Swift Object Server.""" - device, partition, account, container, obj, policy_idx = \ + device, partition, account, container, obj, policy = \ get_name_and_placement(request, 5, 5, True) keep_cache = self.keep_cache_private or ( 'X-Auth-Token' not in request.headers and @@ -522,7 +687,7 @@ class ObjectController(BaseStorageServer): try: disk_file = self.get_diskfile( device, partition, account, container, obj, - policy_idx=policy_idx) + policy=policy) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: @@ -533,9 +698,14 @@ class ObjectController(BaseStorageServer): keep_cache = (self.keep_cache_private or ('X-Auth-Token' not in request.headers and 'X-Storage-Token' not in request.headers)) + conditional_etag = None + if 'X-Backend-Etag-Is-At' in request.headers: + conditional_etag = metadata.get( + request.headers['X-Backend-Etag-Is-At']) response = Response( app_iter=disk_file.reader(keep_cache=keep_cache), - request=request, conditional_response=True) + request=request, conditional_response=True, + conditional_etag=conditional_etag) response.headers['Content-Type'] = metadata.get( 'Content-Type', 'application/octet-stream') for key, value in metadata.iteritems(): @@ -567,12 +737,12 @@ class ObjectController(BaseStorageServer): @timing_stats(sample_rate=0.8) def HEAD(self, request): """Handle HTTP HEAD requests for the Swift Object Server.""" - device, partition, account, container, obj, policy_idx = \ + device, partition, account, container, obj, policy = \ get_name_and_placement(request, 5, 5, True) try: disk_file = self.get_diskfile( device, partition, account, container, obj, - policy_idx=policy_idx) + policy=policy) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: @@ -585,7 +755,12 @@ class ObjectController(BaseStorageServer): headers['X-Backend-Timestamp'] = e.timestamp.internal return HTTPNotFound(request=request, 
headers=headers, conditional_response=True) - response = Response(request=request, conditional_response=True) + conditional_etag = None + if 'X-Backend-Etag-Is-At' in request.headers: + conditional_etag = metadata.get( + request.headers['X-Backend-Etag-Is-At']) + response = Response(request=request, conditional_response=True, + conditional_etag=conditional_etag) response.headers['Content-Type'] = metadata.get( 'Content-Type', 'application/octet-stream') for key, value in metadata.iteritems(): @@ -609,13 +784,13 @@ class ObjectController(BaseStorageServer): @timing_stats() def DELETE(self, request): """Handle HTTP DELETE requests for the Swift Object Server.""" - device, partition, account, container, obj, policy_idx = \ + device, partition, account, container, obj, policy = \ get_name_and_placement(request, 5, 5, True) req_timestamp = valid_timestamp(request) try: disk_file = self.get_diskfile( device, partition, account, container, obj, - policy_idx=policy_idx) + policy=policy) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: @@ -667,13 +842,13 @@ class ObjectController(BaseStorageServer): if orig_delete_at: self.delete_at_update('DELETE', orig_delete_at, account, container, obj, request, device, - policy_idx) + policy) if orig_timestamp < req_timestamp: disk_file.delete(req_timestamp) self.container_update( 'DELETE', account, container, obj, request, HeaderKeyDict({'x-timestamp': req_timestamp.internal}), - device, policy_idx) + device, policy) return response_class( request=request, headers={'X-Backend-Timestamp': response_timestamp.internal}) @@ -685,12 +860,17 @@ class ObjectController(BaseStorageServer): """ Handle REPLICATE requests for the Swift Object Server. This is used by the object replicator to get hashes for directories. + + Note that the name REPLICATE is preserved for historical reasons as + this verb really just returns the hashes information for the specified + parameters and is used, for example, by both replication and EC. 
""" - device, partition, suffix, policy_idx = \ + device, partition, suffix_parts, policy = \ get_name_and_placement(request, 2, 3, True) + suffixes = suffix_parts.split('-') if suffix_parts else [] try: - hashes = self._diskfile_mgr.get_hashes(device, partition, suffix, - policy_idx) + hashes = self._diskfile_router[policy].get_hashes( + device, partition, suffixes, policy) except DiskFileDeviceUnavailable: resp = HTTPInsufficientStorage(drive=device, request=request) else: @@ -700,7 +880,7 @@ class ObjectController(BaseStorageServer): @public @replication @timing_stats(sample_rate=0.1) - def REPLICATION(self, request): + def SSYNC(self, request): return Response(app_iter=ssync_receiver.Receiver(self, request)()) def __call__(self, env, start_response): @@ -734,7 +914,7 @@ class ObjectController(BaseStorageServer): trans_time = time.time() - start_time if self.log_requests: log_line = get_log_line(req, res, trans_time, '') - if req.method in ('REPLICATE', 'REPLICATION') or \ + if req.method in ('REPLICATE', 'SSYNC') or \ 'X-Backend-Replication' in req.headers: self.logger.debug(log_line) else: diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py index 248715d00..b636a1624 100644 --- a/swift/obj/ssync_receiver.py +++ b/swift/obj/ssync_receiver.py @@ -24,27 +24,28 @@ from swift.common import exceptions from swift.common import http from swift.common import swob from swift.common import utils +from swift.common import request_helpers class Receiver(object): """ - Handles incoming REPLICATION requests to the object server. + Handles incoming SSYNC requests to the object server. These requests come from the object-replicator daemon that uses :py:mod:`.ssync_sender`. - The number of concurrent REPLICATION requests is restricted by + The number of concurrent SSYNC requests is restricted by use of a replication_semaphore and can be configured with the object-server.conf [object-server] replication_concurrency setting. - A REPLICATION request is really just an HTTP conduit for + An SSYNC request is really just an HTTP conduit for sender/receiver replication communication. The overall - REPLICATION request should always succeed, but it will contain + SSYNC request should always succeed, but it will contain multiple requests within its request and response bodies. This "hack" is done so that replication concurrency can be managed. - The general process inside a REPLICATION request is: + The general process inside an SSYNC request is: 1. Initialize the request: Basic request validation, mount check, acquire semaphore lock, etc.. @@ -72,10 +73,10 @@ class Receiver(object): def __call__(self): """ - Processes a REPLICATION request. + Processes an SSYNC request. Acquires a semaphore lock and then proceeds through the steps - of the REPLICATION process. + of the SSYNC process. """ # The general theme for functions __call__ calls is that they should # raise exceptions.MessageTimeout for client timeouts (logged locally), @@ -88,7 +89,7 @@ class Receiver(object): try: # Double try blocks in case our main error handlers fail. try: - # intialize_request is for preamble items that can be done + # initialize_request is for preamble items that can be done # outside a replication semaphore lock. 
for data in self.initialize_request(): yield data @@ -98,7 +99,7 @@ class Receiver(object): if not self.app.replication_semaphore.acquire(False): raise swob.HTTPServiceUnavailable() try: - with self.app._diskfile_mgr.replication_lock(self.device): + with self.diskfile_mgr.replication_lock(self.device): for data in self.missing_check(): yield data for data in self.updates(): @@ -111,7 +112,7 @@ class Receiver(object): self.app.replication_semaphore.release() except exceptions.ReplicationLockTimeout as err: self.app.logger.debug( - '%s/%s/%s REPLICATION LOCK TIMEOUT: %s' % ( + '%s/%s/%s SSYNC LOCK TIMEOUT: %s' % ( self.request.remote_addr, self.device, self.partition, err)) yield ':ERROR: %d %r\n' % (0, str(err)) @@ -166,14 +167,17 @@ class Receiver(object): """ # The following is the setting we talk about above in _ensure_flush. self.request.environ['eventlet.minimum_write_chunk_size'] = 0 - self.device, self.partition = utils.split_path( - urllib.unquote(self.request.path), 2, 2, False) - self.policy_idx = \ - int(self.request.headers.get('X-Backend-Storage-Policy-Index', 0)) + self.device, self.partition, self.policy = \ + request_helpers.get_name_and_placement(self.request, 2, 2, False) + if 'X-Backend-Ssync-Frag-Index' in self.request.headers: + self.frag_index = int( + self.request.headers['X-Backend-Ssync-Frag-Index']) + else: + self.frag_index = None utils.validate_device_partition(self.device, self.partition) - if self.app._diskfile_mgr.mount_check and \ - not constraints.check_mount( - self.app._diskfile_mgr.devices, self.device): + self.diskfile_mgr = self.app._diskfile_router[self.policy] + if self.diskfile_mgr.mount_check and not constraints.check_mount( + self.diskfile_mgr.devices, self.device): raise swob.HTTPInsufficientStorage(drive=self.device) self.fp = self.request.environ['wsgi.input'] for data in self._ensure_flush(): @@ -182,7 +186,7 @@ class Receiver(object): def missing_check(self): """ Handles the receiver-side of the MISSING_CHECK step of a - REPLICATION request. + SSYNC request. Receives a list of hashes and timestamps of object information the sender can provide and responds with a list @@ -226,11 +230,13 @@ class Receiver(object): line = self.fp.readline(self.app.network_chunk_size) if not line or line.strip() == ':MISSING_CHECK: END': break - object_hash, timestamp = [urllib.unquote(v) for v in line.split()] + parts = line.split() + object_hash, timestamp = [urllib.unquote(v) for v in parts[:2]] want = False try: - df = self.app._diskfile_mgr.get_diskfile_from_hash( - self.device, self.partition, object_hash, self.policy_idx) + df = self.diskfile_mgr.get_diskfile_from_hash( + self.device, self.partition, object_hash, self.policy, + frag_index=self.frag_index) except exceptions.DiskFileNotExist: want = True else: @@ -253,7 +259,7 @@ class Receiver(object): def updates(self): """ - Handles the UPDATES step of a REPLICATION request. + Handles the UPDATES step of an SSYNC request. Receives a set of PUT and DELETE subrequests that will be routed to the object server itself for processing. 
These @@ -353,7 +359,7 @@ class Receiver(object): subreq_iter()) else: raise Exception('Invalid subrequest method %s' % method) - subreq.headers['X-Backend-Storage-Policy-Index'] = self.policy_idx + subreq.headers['X-Backend-Storage-Policy-Index'] = int(self.policy) subreq.headers['X-Backend-Replication'] = 'True' if replication_headers: subreq.headers['X-Backend-Replication-Headers'] = \ diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py index 1058ab262..8e9202c00 100644 --- a/swift/obj/ssync_sender.py +++ b/swift/obj/ssync_sender.py @@ -22,7 +22,7 @@ from swift.common import http class Sender(object): """ - Sends REPLICATION requests to the object server. + Sends SSYNC requests to the object server. These requests are eventually handled by :py:mod:`.ssync_receiver` and full documentation about the @@ -31,6 +31,7 @@ class Sender(object): def __init__(self, daemon, node, job, suffixes, remote_check_objs=None): self.daemon = daemon + self.df_mgr = self.daemon._diskfile_mgr self.node = node self.job = job self.suffixes = suffixes @@ -38,28 +39,28 @@ class Sender(object): self.response = None self.response_buffer = '' self.response_chunk_left = 0 - self.available_set = set() + # available_map has an entry for each object in given suffixes that + # is available to be sync'd; each entry is a hash => timestamp + self.available_map = {} # When remote_check_objs is given in job, ssync_sender trys only to # make sure those objects exist or not in remote. self.remote_check_objs = remote_check_objs + # send_list has an entry for each object that the receiver wants to + # be sync'ed; each entry is an object hash self.send_list = [] self.failures = 0 - @property - def policy_idx(self): - return int(self.job.get('policy_idx', 0)) - def __call__(self): """ Perform ssync with remote node. - :returns: a 2-tuple, in the form (success, can_delete_objs). - - Success is a boolean, and can_delete_objs is an iterable of strings - representing the hashes which are in sync with the remote node. + :returns: a 2-tuple, in the form (success, can_delete_objs) where + success is a boolean and can_delete_objs is the map of + objects that are in sync with the receiver. Each entry in + can_delete_objs maps a hash => timestamp """ if not self.suffixes: - return True, set() + return True, {} try: # Double try blocks in case our main error handler fails. try: @@ -72,18 +73,20 @@ class Sender(object): self.missing_check() if self.remote_check_objs is None: self.updates() - can_delete_obj = self.available_set + can_delete_obj = self.available_map else: # when we are initialized with remote_check_objs we don't # *send* any requested updates; instead we only collect # what's already in sync and safe for deletion - can_delete_obj = self.available_set.difference( - self.send_list) + in_sync_hashes = (set(self.available_map.keys()) - + set(self.send_list)) + can_delete_obj = dict((hash_, self.available_map[hash_]) + for hash_ in in_sync_hashes) self.disconnect() if not self.failures: return True, can_delete_obj else: - return False, set() + return False, {} except (exceptions.MessageTimeout, exceptions.ReplicationException) as err: self.daemon.logger.error( @@ -109,11 +112,11 @@ class Sender(object): # would only get called if the above except Exception handler # failed (bad node or job data). 
self.daemon.logger.exception('EXCEPTION in replication.Sender') - return False, set() + return False, {} def connect(self): """ - Establishes a connection and starts a REPLICATION request + Establishes a connection and starts an SSYNC request with the object server. """ with exceptions.MessageTimeout( @@ -121,11 +124,13 @@ class Sender(object): self.connection = bufferedhttp.BufferedHTTPConnection( '%s:%s' % (self.node['replication_ip'], self.node['replication_port'])) - self.connection.putrequest('REPLICATION', '/%s/%s' % ( + self.connection.putrequest('SSYNC', '/%s/%s' % ( self.node['device'], self.job['partition'])) self.connection.putheader('Transfer-Encoding', 'chunked') self.connection.putheader('X-Backend-Storage-Policy-Index', - self.policy_idx) + int(self.job['policy'])) + self.connection.putheader('X-Backend-Ssync-Frag-Index', + self.node['index']) self.connection.endheaders() with exceptions.MessageTimeout( self.daemon.node_timeout, 'connect receive'): @@ -137,7 +142,7 @@ class Sender(object): def readline(self): """ - Reads a line from the REPLICATION response body. + Reads a line from the SSYNC response body. httplib has no readline and will block on read(x) until x is read, so we have to do the work ourselves. A bit of this is @@ -183,7 +188,7 @@ class Sender(object): def missing_check(self): """ Handles the sender-side of the MISSING_CHECK step of a - REPLICATION request. + SSYNC request. Full documentation of this can be found at :py:meth:`.Receiver.missing_check`. @@ -193,14 +198,15 @@ class Sender(object): self.daemon.node_timeout, 'missing_check start'): msg = ':MISSING_CHECK: START\r\n' self.connection.send('%x\r\n%s\r\n' % (len(msg), msg)) - hash_gen = self.daemon._diskfile_mgr.yield_hashes( + hash_gen = self.df_mgr.yield_hashes( self.job['device'], self.job['partition'], - self.policy_idx, self.suffixes) + self.job['policy'], self.suffixes, + frag_index=self.job.get('frag_index')) if self.remote_check_objs is not None: hash_gen = ifilter(lambda (path, object_hash, timestamp): object_hash in self.remote_check_objs, hash_gen) for path, object_hash, timestamp in hash_gen: - self.available_set.add(object_hash) + self.available_map[object_hash] = timestamp with exceptions.MessageTimeout( self.daemon.node_timeout, 'missing_check send line'): @@ -234,12 +240,13 @@ class Sender(object): line = line.strip() if line == ':MISSING_CHECK: END': break - if line: - self.send_list.append(line) + parts = line.split() + if parts: + self.send_list.append(parts[0]) def updates(self): """ - Handles the sender-side of the UPDATES step of a REPLICATION + Handles the sender-side of the UPDATES step of an SSYNC request. Full documentation of this can be found at @@ -252,15 +259,19 @@ class Sender(object): self.connection.send('%x\r\n%s\r\n' % (len(msg), msg)) for object_hash in self.send_list: try: - df = self.daemon._diskfile_mgr.get_diskfile_from_hash( + df = self.df_mgr.get_diskfile_from_hash( self.job['device'], self.job['partition'], object_hash, - self.policy_idx) + self.job['policy'], frag_index=self.job.get('frag_index')) except exceptions.DiskFileNotExist: continue url_path = urllib.quote( '/%s/%s/%s' % (df.account, df.container, df.obj)) try: df.open() + # EC reconstructor may have passed a callback to build + # an alternative diskfile... 
+ df = self.job.get('sync_diskfile_builder', lambda *args: df)( + self.job, self.node, df.get_metadata()) except exceptions.DiskFileDeleted as err: self.send_delete(url_path, err.timestamp) except exceptions.DiskFileError: @@ -328,7 +339,7 @@ class Sender(object): def disconnect(self): """ Closes down the connection to the object server once done - with the REPLICATION request. + with the SSYNC request. """ try: with exceptions.MessageTimeout( diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 6c40c456a..f5d1f37fa 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -29,7 +29,8 @@ from swift.common.ring import Ring from swift.common.utils import get_logger, renamer, write_pickle, \ dump_recon_cache, config_true_value, ismount from swift.common.daemon import Daemon -from swift.obj.diskfile import get_tmp_dir, get_async_dir, ASYNCDIR_BASE +from swift.common.storage_policy import split_policy_string, PolicyError +from swift.obj.diskfile import get_tmp_dir, ASYNCDIR_BASE from swift.common.http import is_success, HTTP_NOT_FOUND, \ HTTP_INTERNAL_SERVER_ERROR @@ -148,28 +149,19 @@ class ObjectUpdater(Daemon): start_time = time.time() # loop through async pending dirs for all policies for asyncdir in self._listdir(device): - # skip stuff like "accounts", "containers", etc. - if not (asyncdir == ASYNCDIR_BASE or - asyncdir.startswith(ASYNCDIR_BASE + '-')): - continue - # we only care about directories async_pending = os.path.join(device, asyncdir) if not os.path.isdir(async_pending): continue - - if asyncdir == ASYNCDIR_BASE: - policy_idx = 0 - else: - _junk, policy_idx = asyncdir.split('-', 1) - try: - policy_idx = int(policy_idx) - get_async_dir(policy_idx) - except ValueError: - self.logger.warn(_('Directory %s does not map to a ' - 'valid policy') % asyncdir) - continue - + if not asyncdir.startswith(ASYNCDIR_BASE): + # skip stuff like "accounts", "containers", etc. + continue + try: + base, policy = split_policy_string(asyncdir) + except PolicyError as e: + self.logger.warn(_('Directory %r does not map ' + 'to a valid policy (%s)') % (asyncdir, e)) + continue for prefix in self._listdir(async_pending): prefix_path = os.path.join(async_pending, prefix) if not os.path.isdir(prefix_path): @@ -193,7 +185,7 @@ class ObjectUpdater(Daemon): os.unlink(update_path) else: self.process_object_update(update_path, device, - policy_idx) + policy) last_obj_hash = obj_hash time.sleep(self.slowdown) try: @@ -202,13 +194,13 @@ class ObjectUpdater(Daemon): pass self.logger.timing_since('timing', start_time) - def process_object_update(self, update_path, device, policy_idx): + def process_object_update(self, update_path, device, policy): """ Process the object information to be updated and update. 
:param update_path: path to pickled object update file :param device: path to device - :param policy_idx: storage policy index of object update + :param policy: storage policy of object update """ try: update = pickle.load(open(update_path, 'rb')) @@ -228,7 +220,7 @@ class ObjectUpdater(Daemon): headers_out = update['headers'].copy() headers_out['user-agent'] = 'object-updater %s' % os.getpid() headers_out.setdefault('X-Backend-Storage-Policy-Index', - str(policy_idx)) + str(int(policy))) events = [spawn(self.object_update, node, part, update['op'], obj, headers_out) for node in nodes if node['id'] not in successes] @@ -256,7 +248,7 @@ class ObjectUpdater(Daemon): if new_successes: update['successes'] = successes write_pickle(update, update_path, os.path.join( - device, get_tmp_dir(policy_idx))) + device, get_tmp_dir(policy))) def object_update(self, node, part, op, obj, headers_out): """ diff --git a/swift/proxy/controllers/__init__.py b/swift/proxy/controllers/__init__.py index de4c0145b..706fd9165 100644 --- a/swift/proxy/controllers/__init__.py +++ b/swift/proxy/controllers/__init__.py @@ -13,7 +13,7 @@ from swift.proxy.controllers.base import Controller from swift.proxy.controllers.info import InfoController -from swift.proxy.controllers.obj import ObjectController +from swift.proxy.controllers.obj import ObjectControllerRouter from swift.proxy.controllers.account import AccountController from swift.proxy.controllers.container import ContainerController @@ -22,5 +22,5 @@ __all__ = [ 'ContainerController', 'Controller', 'InfoController', - 'ObjectController', + 'ObjectControllerRouter', ] diff --git a/swift/proxy/controllers/account.py b/swift/proxy/controllers/account.py index ea2f8ae33..915e1c481 100644 --- a/swift/proxy/controllers/account.py +++ b/swift/proxy/controllers/account.py @@ -58,9 +58,10 @@ class AccountController(Controller): constraints.MAX_ACCOUNT_NAME_LENGTH) return resp - partition, nodes = self.app.account_ring.get_nodes(self.account_name) + partition = self.app.account_ring.get_part(self.account_name) + node_iter = self.app.iter_nodes(self.app.account_ring, partition) resp = self.GETorHEAD_base( - req, _('Account'), self.app.account_ring, partition, + req, _('Account'), node_iter, partition, req.swift_entity_path.rstrip('/')) if resp.status_int == HTTP_NOT_FOUND: if resp.headers.get('X-Account-Status', '').lower() == 'deleted': diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 0aeb803f1..ca12d343e 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -28,6 +28,7 @@ import os import time import functools import inspect +import logging import operator from sys import exc_info from swift import gettext_ as _ @@ -39,14 +40,14 @@ from eventlet.timeout import Timeout from swift.common.wsgi import make_pre_authed_env from swift.common.utils import Timestamp, config_true_value, \ public, split_path, list_from_csv, GreenthreadSafeIterator, \ - quorum_size, GreenAsyncPile + GreenAsyncPile, quorum_size, parse_content_range from swift.common.bufferedhttp import http_connect from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \ ConnectionTimeout from swift.common.http import is_informational, is_success, is_redirection, \ is_server_error, HTTP_OK, HTTP_PARTIAL_CONTENT, HTTP_MULTIPLE_CHOICES, \ HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \ - HTTP_INSUFFICIENT_STORAGE, HTTP_UNAUTHORIZED + HTTP_INSUFFICIENT_STORAGE, HTTP_UNAUTHORIZED, HTTP_CONTINUE from swift.common.swob import 
Request, Response, HeaderKeyDict, Range, \ HTTPException, HTTPRequestedRangeNotSatisfiable from swift.common.request_helpers import strip_sys_meta_prefix, \ @@ -593,16 +594,37 @@ def close_swift_conn(src): pass +def bytes_to_skip(record_size, range_start): + """ + Assume an object is composed of N records, where the first N-1 are all + the same size and the last is at most that large, but may be smaller. + + When a range request is made, it might start with a partial record. This + must be discarded, lest the consumer get bad data. This is particularly + true of suffix-byte-range requests, e.g. "Range: bytes=-12345" where the + size of the object is unknown at the time the request is made. + + This function computes the number of bytes that must be discarded to + ensure only whole records are yielded. Erasure-code decoding needs this. + + This function could have been inlined, but it took enough tries to get + right that some targeted unit tests were desirable, hence its extraction. + """ + return (record_size - (range_start % record_size)) % record_size + + class GetOrHeadHandler(object): - def __init__(self, app, req, server_type, ring, partition, path, - backend_headers): + def __init__(self, app, req, server_type, node_iter, partition, path, + backend_headers, client_chunk_size=None): self.app = app - self.ring = ring + self.node_iter = node_iter self.server_type = server_type self.partition = partition self.path = path self.backend_headers = backend_headers + self.client_chunk_size = client_chunk_size + self.skip_bytes = 0 self.used_nodes = [] self.used_source_etag = '' @@ -649,6 +671,35 @@ class GetOrHeadHandler(object): else: self.backend_headers['Range'] = 'bytes=%d-' % num_bytes + def learn_size_from_content_range(self, start, end): + """ + If client_chunk_size is set, makes sure we yield things starting on + chunk boundaries based on the Content-Range header in the response. + + Sets our first Range header to the value learned from the + Content-Range header in the response; if we were given a + fully-specified range (e.g. "bytes=123-456"), this is a no-op. + + If we were given a half-specified range (e.g. "bytes=123-" or + "bytes=-456"), then this changes the Range header to a + semantically-equivalent one *and* it lets us resume on a proper + boundary instead of just in the middle of a piece somewhere. + + If the original request is for more than one range, this does not + affect our backend Range header, since we don't support resuming one + of those anyway. 
+ """ + if self.client_chunk_size: + self.skip_bytes = bytes_to_skip(self.client_chunk_size, start) + + if 'Range' in self.backend_headers: + req_range = Range(self.backend_headers['Range']) + + if len(req_range.ranges) > 1: + return + + self.backend_headers['Range'] = "bytes=%d-%d" % (start, end) + def is_good_source(self, src): """ Indicates whether or not the request made to the backend found @@ -674,42 +725,74 @@ class GetOrHeadHandler(object): """ try: nchunks = 0 - bytes_read_from_source = 0 + client_chunk_size = self.client_chunk_size + bytes_consumed_from_backend = 0 node_timeout = self.app.node_timeout if self.server_type == 'Object': node_timeout = self.app.recoverable_node_timeout + buf = '' while True: try: with ChunkReadTimeout(node_timeout): chunk = source.read(self.app.object_chunk_size) nchunks += 1 - bytes_read_from_source += len(chunk) + buf += chunk except ChunkReadTimeout: exc_type, exc_value, exc_traceback = exc_info() if self.newest or self.server_type != 'Object': raise exc_type, exc_value, exc_traceback try: - self.fast_forward(bytes_read_from_source) + self.fast_forward(bytes_consumed_from_backend) except (NotImplementedError, HTTPException, ValueError): raise exc_type, exc_value, exc_traceback + buf = '' new_source, new_node = self._get_source_and_node() if new_source: self.app.exception_occurred( node, _('Object'), - _('Trying to read during GET (retrying)')) + _('Trying to read during GET (retrying)'), + level=logging.ERROR, exc_info=( + exc_type, exc_value, exc_traceback)) # Close-out the connection as best as possible. if getattr(source, 'swift_conn', None): close_swift_conn(source) source = new_source node = new_node - bytes_read_from_source = 0 continue else: raise exc_type, exc_value, exc_traceback + + if buf and self.skip_bytes: + if self.skip_bytes < len(buf): + buf = buf[self.skip_bytes:] + bytes_consumed_from_backend += self.skip_bytes + self.skip_bytes = 0 + else: + self.skip_bytes -= len(buf) + bytes_consumed_from_backend += len(buf) + buf = '' + if not chunk: + if buf: + with ChunkWriteTimeout(self.app.client_timeout): + bytes_consumed_from_backend += len(buf) + yield buf + buf = '' break - with ChunkWriteTimeout(self.app.client_timeout): - yield chunk + + if client_chunk_size is not None: + while len(buf) >= client_chunk_size: + client_chunk = buf[:client_chunk_size] + buf = buf[client_chunk_size:] + with ChunkWriteTimeout(self.app.client_timeout): + yield client_chunk + bytes_consumed_from_backend += len(client_chunk) + else: + with ChunkWriteTimeout(self.app.client_timeout): + yield buf + bytes_consumed_from_backend += len(buf) + buf = '' + # This is for fairness; if the network is outpacing the CPU, # we'll always be able to read and write data without # encountering an EWOULDBLOCK, and so eventlet will not switch @@ -757,7 +840,7 @@ class GetOrHeadHandler(object): node_timeout = self.app.node_timeout if self.server_type == 'Object' and not self.newest: node_timeout = self.app.recoverable_node_timeout - for node in self.app.iter_nodes(self.ring, self.partition): + for node in self.node_iter: if node in self.used_nodes: continue start_node_timing = time.time() @@ -793,8 +876,10 @@ class GetOrHeadHandler(object): src_headers = dict( (k.lower(), v) for k, v in possible_source.getheaders()) - if src_headers.get('etag', '').strip('"') != \ - self.used_source_etag: + + if self.used_source_etag != src_headers.get( + 'x-object-sysmeta-ec-etag', + src_headers.get('etag', '')).strip('"'): self.statuses.append(HTTP_NOT_FOUND) self.reasons.append('') 
self.bodies.append('') @@ -832,7 +917,9 @@ class GetOrHeadHandler(object): src_headers = dict( (k.lower(), v) for k, v in possible_source.getheaders()) - self.used_source_etag = src_headers.get('etag', '').strip('"') + self.used_source_etag = src_headers.get( + 'x-object-sysmeta-ec-etag', + src_headers.get('etag', '')).strip('"') return source, node return None, None @@ -841,13 +928,17 @@ class GetOrHeadHandler(object): res = None if source: res = Response(request=req) + res.status = source.status + update_headers(res, source.getheaders()) if req.method == 'GET' and \ source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT): + cr = res.headers.get('Content-Range') + if cr: + start, end, total = parse_content_range(cr) + self.learn_size_from_content_range(start, end) res.app_iter = self._make_app_iter(req, node, source) # See NOTE: swift_conn at top of file about this. res.swift_conn = source.swift_conn - res.status = source.status - update_headers(res, source.getheaders()) if not res.environ: res.environ = {} res.environ['swift_x_timestamp'] = \ @@ -993,7 +1084,8 @@ class Controller(object): else: info['partition'] = part info['nodes'] = nodes - info.setdefault('storage_policy', '0') + if info.get('storage_policy') is None: + info['storage_policy'] = 0 return info def _make_request(self, nodes, part, method, path, headers, query, @@ -1098,6 +1190,13 @@ class Controller(object): '%s %s' % (self.server_type, req.method), overrides=overrides, headers=resp_headers) + def _quorum_size(self, n): + """ + Number of successful backend responses needed for the proxy to + consider the client request successful. + """ + return quorum_size(n) + def have_quorum(self, statuses, node_count): """ Given a list of statuses from several requests, determine if @@ -1107,16 +1206,18 @@ class Controller(object): :param node_count: number of nodes being queried (basically ring count) :returns: True or False, depending on if quorum is established """ - quorum = quorum_size(node_count) + quorum = self._quorum_size(node_count) if len(statuses) >= quorum: - for hundred in (HTTP_OK, HTTP_MULTIPLE_CHOICES, HTTP_BAD_REQUEST): + for hundred in (HTTP_CONTINUE, HTTP_OK, HTTP_MULTIPLE_CHOICES, + HTTP_BAD_REQUEST): if sum(1 for s in statuses if hundred <= s < hundred + 100) >= quorum: return True return False def best_response(self, req, statuses, reasons, bodies, server_type, - etag=None, headers=None, overrides=None): + etag=None, headers=None, overrides=None, + quorum_size=None): """ Given a list of responses from several servers, choose the best to return to the API. @@ -1128,10 +1229,16 @@ class Controller(object): :param server_type: type of server the responses came from :param etag: etag :param headers: headers of each response + :param overrides: overrides to apply when lacking quorum + :param quorum_size: quorum size to use :returns: swob.Response object with the correct status, body, etc. 
set """ + if quorum_size is None: + quorum_size = self._quorum_size(len(statuses)) + resp = self._compute_quorum_response( - req, statuses, reasons, bodies, etag, headers) + req, statuses, reasons, bodies, etag, headers, + quorum_size=quorum_size) if overrides and not resp: faked_up_status_indices = set() transformed = [] @@ -1145,7 +1252,8 @@ class Controller(object): statuses, reasons, headers, bodies = zip(*transformed) resp = self._compute_quorum_response( req, statuses, reasons, bodies, etag, headers, - indices_to_avoid=faked_up_status_indices) + indices_to_avoid=faked_up_status_indices, + quorum_size=quorum_size) if not resp: resp = Response(request=req) @@ -1156,14 +1264,14 @@ class Controller(object): return resp def _compute_quorum_response(self, req, statuses, reasons, bodies, etag, - headers, indices_to_avoid=()): + headers, quorum_size, indices_to_avoid=()): if not statuses: return None for hundred in (HTTP_OK, HTTP_MULTIPLE_CHOICES, HTTP_BAD_REQUEST): hstatuses = \ [(i, s) for i, s in enumerate(statuses) if hundred <= s < hundred + 100] - if len(hstatuses) >= quorum_size(len(statuses)): + if len(hstatuses) >= quorum_size: resp = Response(request=req) try: status_index, status = max( @@ -1228,22 +1336,25 @@ class Controller(object): else: self.app.logger.warning('Could not autocreate account %r' % path) - def GETorHEAD_base(self, req, server_type, ring, partition, path): + def GETorHEAD_base(self, req, server_type, node_iter, partition, path, + client_chunk_size=None): """ Base handler for HTTP GET or HEAD requests. :param req: swob.Request object :param server_type: server type used in logging - :param ring: the ring to obtain nodes from + :param node_iter: an iterator to obtain nodes from :param partition: partition :param path: path for the request + :param client_chunk_size: chunk size for response body iterator :returns: swob.Response object """ backend_headers = self.generate_request_headers( req, additional=req.headers) - handler = GetOrHeadHandler(self.app, req, self.server_type, ring, - partition, path, backend_headers) + handler = GetOrHeadHandler(self.app, req, self.server_type, node_iter, + partition, path, backend_headers, + client_chunk_size=client_chunk_size) res = handler.get_working_response(req) if not res: diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py index fb422e68d..3e4a2bb03 100644 --- a/swift/proxy/controllers/container.py +++ b/swift/proxy/controllers/container.py @@ -93,8 +93,9 @@ class ContainerController(Controller): return HTTPNotFound(request=req) part = self.app.container_ring.get_part( self.account_name, self.container_name) + node_iter = self.app.iter_nodes(self.app.container_ring, part) resp = self.GETorHEAD_base( - req, _('Container'), self.app.container_ring, part, + req, _('Container'), node_iter, part, req.swift_entity_path) if 'swift.authorize' in req.environ: req.acl = resp.headers.get('x-container-read') diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 2b53ba7a8..a83242b5f 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -24,13 +24,17 @@ # These shenanigans are to ensure all related objects can be garbage # collected. We've seen objects hang around forever otherwise. 
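# A hedged aside on the _quorum_size()/have_quorum() change above: making
# quorum a method presumably lets a policy-specific controller override it,
# while the replicated default just defers to swift.common.utils.quorum_size
# (assumed here to be a simple majority, (n // 2) + 1). For example, with
# three replicas:
#
#   quorum = (3 // 2) + 1                  # -> 2
#   have_quorum([201, 201, 503], 3)        # True: two responses in the 2xx class
#   have_quorum([201, 503, 503], 3)        # False: no status class reaches quorum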
+import collections import itertools import mimetypes import time import math +import random +from hashlib import md5 from swift import gettext_ as _ from urllib import unquote, quote +from greenlet import GreenletExit from eventlet import GreenPile from eventlet.queue import Queue from eventlet.timeout import Timeout @@ -38,7 +42,8 @@ from eventlet.timeout import Timeout from swift.common.utils import ( clean_content_type, config_true_value, ContextPool, csv_append, GreenAsyncPile, GreenthreadSafeIterator, json, Timestamp, - normalize_delete_at_timestamp, public, quorum_size, get_expirer_container) + normalize_delete_at_timestamp, public, get_expirer_container, + quorum_size) from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_metadata, check_object_creation, \ check_copy_from_header, check_destination_header, \ @@ -46,21 +51,24 @@ from swift.common.constraints import check_metadata, check_object_creation, \ from swift.common import constraints from swift.common.exceptions import ChunkReadTimeout, \ ChunkWriteTimeout, ConnectionTimeout, ListingIterNotFound, \ - ListingIterNotAuthorized, ListingIterError + ListingIterNotAuthorized, ListingIterError, ResponseTimeout, \ + InsufficientStorage, FooterNotSupported, MultiphasePUTNotSupported, \ + PutterConnectError from swift.common.http import ( is_success, is_client_error, is_server_error, HTTP_CONTINUE, HTTP_CREATED, HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, HTTP_INTERNAL_SERVER_ERROR, HTTP_SERVICE_UNAVAILABLE, HTTP_INSUFFICIENT_STORAGE, - HTTP_PRECONDITION_FAILED, HTTP_CONFLICT) -from swift.common.storage_policy import POLICIES + HTTP_PRECONDITION_FAILED, HTTP_CONFLICT, is_informational) +from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY, + ECDriverError, PolicyError) from swift.proxy.controllers.base import Controller, delay_denial, \ cors_validation from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \ - HTTPServerError, HTTPServiceUnavailable, Request, \ - HTTPClientDisconnect, HeaderKeyDict, HTTPException + HTTPServerError, HTTPServiceUnavailable, Request, HeaderKeyDict, \ + HTTPClientDisconnect, HTTPUnprocessableEntity, Response, HTTPException from swift.common.request_helpers import is_sys_or_user_meta, is_sys_meta, \ - remove_items, copy_header_subset + remove_items, copy_header_subset, close_if_possible def copy_headers_into(from_r, to_r): @@ -85,8 +93,41 @@ def check_content_type(req): return None -class ObjectController(Controller): - """WSGI controller for object requests.""" +class ObjectControllerRouter(object): + + policy_type_to_controller_map = {} + + @classmethod + def register(cls, policy_type): + """ + Decorator for Storage Policy implemenations to register + their ObjectController implementations. + + This also fills in a policy_type attribute on the class. 
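        A compact, self-contained sketch of the same registration
        pattern (the policy type string and class names below are made
        up for illustration and are not Swift's):

            class Router(object):
                policy_type_to_controller_map = {}

                @classmethod
                def register(cls, policy_type):
                    def wrapper(controller_cls):
                        cls.policy_type_to_controller_map[policy_type] = \
                            controller_cls
                        controller_cls.policy_type = policy_type
                        return controller_cls
                    return wrapper

            @Router.register('demo-policy-type')
            class DemoController(object):
                pass

            assert Router.policy_type_to_controller_map[
                'demo-policy-type'] is DemoController
            assert DemoController.policy_type == 'demo-policy-type'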
+ """ + def register_wrapper(controller_cls): + if policy_type in cls.policy_type_to_controller_map: + raise PolicyError( + '%r is already registered for the policy_type %r' % ( + cls.policy_type_to_controller_map[policy_type], + policy_type)) + cls.policy_type_to_controller_map[policy_type] = controller_cls + controller_cls.policy_type = policy_type + return controller_cls + return register_wrapper + + def __init__(self): + self.policy_to_controller_cls = {} + for policy in POLICIES: + self.policy_to_controller_cls[policy] = \ + self.policy_type_to_controller_map[policy.policy_type] + + def __getitem__(self, policy): + return self.policy_to_controller_cls[policy] + + +class BaseObjectController(Controller): + """Base WSGI controller for object requests.""" server_type = 'Object' def __init__(self, app, account_name, container_name, object_name, @@ -114,8 +155,10 @@ class ObjectController(Controller): lreq.environ['QUERY_STRING'] = \ 'format=json&prefix=%s&marker=%s' % (quote(lprefix), quote(marker)) + container_node_iter = self.app.iter_nodes(self.app.container_ring, + lpartition) lresp = self.GETorHEAD_base( - lreq, _('Container'), self.app.container_ring, lpartition, + lreq, _('Container'), container_node_iter, lpartition, lreq.swift_entity_path) if 'swift.authorize' in env: lreq.acl = lresp.headers.get('x-container-read') @@ -180,6 +223,7 @@ class ObjectController(Controller): # pass the policy index to storage nodes via req header policy_index = req.headers.get('X-Backend-Storage-Policy-Index', container_info['storage_policy']) + policy = POLICIES.get_by_index(policy_index) obj_ring = self.app.get_object_ring(policy_index) req.headers['X-Backend-Storage-Policy-Index'] = policy_index if 'swift.authorize' in req.environ: @@ -188,9 +232,10 @@ class ObjectController(Controller): return aresp partition = obj_ring.get_part( self.account_name, self.container_name, self.object_name) - resp = self.GETorHEAD_base( - req, _('Object'), obj_ring, partition, - req.swift_entity_path) + node_iter = self.app.iter_nodes(obj_ring, partition) + + resp = self._reroute(policy)._get_or_head_response( + req, node_iter, partition, policy) if ';' in resp.headers.get('content-type', ''): resp.content_type = clean_content_type( @@ -383,7 +428,10 @@ class ObjectController(Controller): _('Trying to get final status of PUT to %s') % req.path) return (None, None) - def _get_put_responses(self, req, conns, nodes): + def _get_put_responses(self, req, conns, nodes, **kwargs): + """ + Collect replicated object responses. 
+ """ statuses = [] reasons = [] bodies = [] @@ -488,6 +536,7 @@ class ObjectController(Controller): self.object_name = src_obj_name self.container_name = src_container_name self.account_name = src_account_name + source_resp = self.GET(source_req) # This gives middlewares a way to change the source; for example, @@ -589,8 +638,9 @@ class ObjectController(Controller): 'X-Newest': 'True'} hreq = Request.blank(req.path_info, headers=_headers, environ={'REQUEST_METHOD': 'HEAD'}) + hnode_iter = self.app.iter_nodes(obj_ring, partition) hresp = self.GETorHEAD_base( - hreq, _('Object'), obj_ring, partition, + hreq, _('Object'), hnode_iter, partition, hreq.swift_entity_path) is_manifest = 'X-Object-Manifest' in req.headers or \ @@ -654,7 +704,10 @@ class ObjectController(Controller): req.headers['X-Timestamp'] = Timestamp(time.time()).internal return None - def _check_failure_put_connections(self, conns, req, nodes): + def _check_failure_put_connections(self, conns, req, nodes, min_conns): + """ + Identify any failed connections and check minimum connection count. + """ if req.if_none_match is not None and '*' in req.if_none_match: statuses = [conn.resp.status for conn in conns if conn.resp] if HTTP_PRECONDITION_FAILED in statuses: @@ -675,7 +728,6 @@ class ObjectController(Controller): 'timestamps': ', '.join(timestamps)}) raise HTTPAccepted(request=req) - min_conns = quorum_size(len(nodes)) self._check_min_conn(req, conns, min_conns) def _get_put_connections(self, req, nodes, partition, outgoing_headers, @@ -709,8 +761,12 @@ class ObjectController(Controller): raise HTTPServiceUnavailable(request=req) def _transfer_data(self, req, data_source, conns, nodes): - min_conns = quorum_size(len(nodes)) + """ + Transfer data for a replicated object. + This method was added in the PUT method extraction change + """ + min_conns = quorum_size(len(nodes)) bytes_transferred = 0 try: with ContextPool(len(nodes)) as pool: @@ -775,11 +831,11 @@ class ObjectController(Controller): This method is responsible for establishing connection with storage nodes and sending object to each one of those - nodes. After sending the data, the "best" reponse will be + nodes. After sending the data, the "best" response will be returned based on statuses from all connections """ - policy_idx = req.headers.get('X-Backend-Storage-Policy-Index') - policy = POLICIES.get_by_index(policy_idx) + policy_index = req.headers.get('X-Backend-Storage-Policy-Index') + policy = POLICIES.get_by_index(policy_index) if not nodes: return HTTPNotFound() @@ -790,11 +846,11 @@ class ObjectController(Controller): expect = False conns = self._get_put_connections(req, nodes, partition, outgoing_headers, policy, expect) - + min_conns = quorum_size(len(nodes)) try: # check that a minimum number of connections were established and # meet all the correct conditions set in the request - self._check_failure_put_connections(conns, req, nodes) + self._check_failure_put_connections(conns, req, nodes, min_conns) # transfer data self._transfer_data(req, data_source, conns, nodes) @@ -1015,6 +1071,21 @@ class ObjectController(Controller): headers, overrides=status_overrides) return resp + def _reroute(self, policy): + """ + For COPY requests we need to make sure the controller instance the + request is routed through is the correct type for the policy. 
+ """ + if not policy: + raise HTTPServiceUnavailable('Unknown Storage Policy') + if policy.policy_type != self.policy_type: + controller = self.app.obj_controller_router[policy]( + self.app, self.account_name, self.container_name, + self.object_name) + else: + controller = self + return controller + @public @cors_validation @delay_denial @@ -1031,6 +1102,7 @@ class ObjectController(Controller): self.account_name = dest_account del req.headers['Destination-Account'] dest_container, dest_object = check_destination_header(req) + source = '/%s/%s' % (self.container_name, self.object_name) self.container_name = dest_container self.object_name = dest_object @@ -1042,4 +1114,1109 @@ class ObjectController(Controller): req.headers['Content-Length'] = 0 req.headers['X-Copy-From'] = quote(source) del req.headers['Destination'] - return self.PUT(req) + + container_info = self.container_info( + dest_account, dest_container, req) + dest_policy = POLICIES.get_by_index(container_info['storage_policy']) + + return self._reroute(dest_policy).PUT(req) + + +@ObjectControllerRouter.register(REPL_POLICY) +class ReplicatedObjectController(BaseObjectController): + + def _get_or_head_response(self, req, node_iter, partition, policy): + resp = self.GETorHEAD_base( + req, _('Object'), node_iter, partition, + req.swift_entity_path) + return resp + + +class ECAppIter(object): + """ + WSGI iterable that decodes EC fragment archives (or portions thereof) + into the original object (or portions thereof). + + :param path: path for the request + + :param policy: storage policy for this object + + :param internal_app_iters: list of the WSGI iterables from object server + GET responses for fragment archives. For an M+K erasure code, the + caller must supply M such iterables. + + :param range_specs: list of dictionaries describing the ranges requested + by the client. Each dictionary contains the start and end of the + client's requested byte range as well as the start and end of the EC + segments containing that byte range. + + :param obj_length: length of the object, in bytes. Learned from the + headers in the GET response from the object server. + + :param logger: a logger + """ + def __init__(self, path, policy, internal_app_iters, range_specs, + obj_length, logger): + self.path = path + self.policy = policy + self.internal_app_iters = internal_app_iters + self.range_specs = range_specs + self.obj_length = obj_length + self.boundary = '' + self.logger = logger + + def close(self): + for it in self.internal_app_iters: + close_if_possible(it) + + def __iter__(self): + segments_iter = self.decode_segments_from_fragments() + + if len(self.range_specs) == 0: + # plain GET; just yield up segments + for seg in segments_iter: + yield seg + return + + if len(self.range_specs) > 1: + raise NotImplementedError("multi-range GETs not done yet") + + for range_spec in self.range_specs: + client_start = range_spec['client_start'] + client_end = range_spec['client_end'] + segment_start = range_spec['segment_start'] + segment_end = range_spec['segment_end'] + + seg_size = self.policy.ec_segment_size + is_suffix = client_start is None + + if is_suffix: + # Suffix byte ranges (i.e. requests for the last N bytes of + # an object) are likely to end up not on a segment boundary. 
+ client_range_len = client_end + client_start = max(self.obj_length - client_range_len, 0) + client_end = self.obj_length - 1 + + # may be mid-segment; if it is, then everything up to the + # first segment boundary is garbage, and is discarded before + # ever getting into this function. + unaligned_segment_start = max(self.obj_length - segment_end, 0) + alignment_offset = ( + (seg_size - (unaligned_segment_start % seg_size)) + % seg_size) + segment_start = unaligned_segment_start + alignment_offset + segment_end = self.obj_length - 1 + else: + # It's entirely possible that the client asked for a range that + # includes some bytes we have and some we don't; for example, a + # range of bytes 1000-20000000 on a 1500-byte object. + segment_end = (min(segment_end, self.obj_length - 1) + if segment_end is not None + else self.obj_length - 1) + client_end = (min(client_end, self.obj_length - 1) + if client_end is not None + else self.obj_length - 1) + + num_segments = int( + math.ceil(float(segment_end + 1 - segment_start) + / self.policy.ec_segment_size)) + # We get full segments here, but the client may have requested a + # byte range that begins or ends in the middle of a segment. + # Thus, we have some amount of overrun (extra decoded bytes) + # that we trim off so the client gets exactly what they + # requested. + start_overrun = client_start - segment_start + end_overrun = segment_end - client_end + + for i, next_seg in enumerate(segments_iter): + # We may have a start_overrun of more than one segment in + # the case of suffix-byte-range requests. However, we never + # have an end_overrun of more than one segment. + if start_overrun > 0: + seglen = len(next_seg) + if seglen <= start_overrun: + start_overrun -= seglen + continue + else: + next_seg = next_seg[start_overrun:] + start_overrun = 0 + + if i == (num_segments - 1) and end_overrun: + next_seg = next_seg[:-end_overrun] + + yield next_seg + + def decode_segments_from_fragments(self): + # Decodes the fragments from the object servers and yields one + # segment at a time. + queues = [Queue(1) for _junk in range(len(self.internal_app_iters))] + + def put_fragments_in_queue(frag_iter, queue): + try: + for fragment in frag_iter: + if fragment[0] == ' ': + raise Exception('Leading whitespace on fragment.') + queue.put(fragment) + except GreenletExit: + # killed by contextpool + pass + except ChunkReadTimeout: + # unable to resume in GetOrHeadHandler + pass + except: # noqa + self.logger.exception("Exception fetching fragments for %r" % + self.path) + finally: + queue.resize(2) # ensure there's room + queue.put(None) + + with ContextPool(len(self.internal_app_iters)) as pool: + for app_iter, queue in zip( + self.internal_app_iters, queues): + pool.spawn(put_fragments_in_queue, app_iter, queue) + + while True: + fragments = [] + for qi, queue in enumerate(queues): + fragment = queue.get() + queue.task_done() + fragments.append(fragment) + + # If any object server connection yields out a None; we're + # done. Either they are all None, and we've finished + # successfully; or some un-recoverable failure has left us + # with an un-reconstructible list of fragments - so we'll + # break out of the iter so WSGI can tear down the broken + # connection. 
+ if not all(fragments): + break + try: + segment = self.policy.pyeclib_driver.decode(fragments) + except ECDriverError: + self.logger.exception("Error decoding fragments for %r" % + self.path) + raise + + yield segment + + def app_iter_range(self, start, end): + return self + + def app_iter_ranges(self, content_type, boundary, content_size): + self.boundary = boundary + + +def client_range_to_segment_range(client_start, client_end, segment_size): + """ + Takes a byterange from the client and converts it into a byterange + spanning the necessary segments. + + Handles prefix, suffix, and fully-specified byte ranges. + + Examples: + client_range_to_segment_range(100, 700, 512) = (0, 1023) + client_range_to_segment_range(100, 700, 256) = (0, 767) + client_range_to_segment_range(300, None, 256) = (256, None) + + :param client_start: first byte of the range requested by the client + :param client_end: last byte of the range requested by the client + :param segment_size: size of an EC segment, in bytes + + :returns: a 2-tuple (seg_start, seg_end) where + + * seg_start is the first byte of the first segment, or None if this is + a suffix byte range + + * seg_end is the last byte of the last segment, or None if this is a + prefix byte range + """ + # the index of the first byte of the first segment + segment_start = ( + int(client_start // segment_size) + * segment_size) if client_start is not None else None + # the index of the last byte of the last segment + segment_end = ( + # bytes M- + None if client_end is None else + # bytes M-N + (((int(client_end // segment_size) + 1) + * segment_size) - 1) if client_start is not None else + # bytes -N: we get some extra bytes to make sure we + # have all we need. + # + # To see why, imagine a 100-byte segment size, a + # 340-byte object, and a request for the last 50 + # bytes. Naively requesting the last 100 bytes would + # result in a truncated first segment and hence a + # truncated download. (Of course, the actual + # obj-server requests are for fragments, not + # segments, but that doesn't change the + # calculation.) + # + # This does mean that we fetch an extra segment if + # the object size is an exact multiple of the + # segment size. It's a little wasteful, but it's + # better to be a little wasteful than to get some + # range requests completely wrong. + (int(math.ceil(( + float(client_end) / segment_size) + 1)) # nsegs + * segment_size)) + return (segment_start, segment_end) + + +def segment_range_to_fragment_range(segment_start, segment_end, segment_size, + fragment_size): + """ + Takes a byterange spanning some segments and converts that into a + byterange spanning the corresponding fragments within their fragment + archives. + + Handles prefix, suffix, and fully-specified byte ranges. + + :param segment_start: first byte of the first segment + :param segment_end: last byte of the last segment + :param segment_size: size of an EC segment, in bytes + :param fragment_size: size of an EC fragment, in bytes + + :returns: a 2-tuple (frag_start, frag_end) where + + * frag_start is the first byte of the first fragment, or None if this + is a suffix byte range + + * frag_end is the last byte of the last fragment, or None if this is a + prefix byte range + """ + # Note: segment_start and (segment_end + 1) are + # multiples of segment_size, so we don't have to worry + # about integer math giving us rounding troubles. 
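The two range helpers above compose: a client byterange is first widened to whole EC segments, and that segment range is then mapped onto the matching slice of each fragment archive. A quick standalone check of that arithmetic, covering only the prefix and fully-specified cases (suffix ranges omitted; the numbers are illustrative):

def client_range_to_segment_range(client_start, client_end, segment_size):
    seg_start = (client_start // segment_size) * segment_size
    seg_end = (None if client_end is None
               else ((client_end // segment_size) + 1) * segment_size - 1)
    return seg_start, seg_end

def segment_range_to_fragment_range(seg_start, seg_end,
                                    segment_size, fragment_size):
    frag_start = seg_start // segment_size * fragment_size
    frag_end = (None if seg_end is None
                else (seg_end + 1) // segment_size * fragment_size - 1)
    return frag_start, frag_end

assert client_range_to_segment_range(100, 700, 512) == (0, 1023)
assert client_range_to_segment_range(300, None, 256) == (256, None)
# a 1024-byte segment range maps onto 256 bytes of each fragment archive
# when 512-byte segments encode to 128-byte fragments
assert segment_range_to_fragment_range(0, 1023, 512, 128) == (0, 255)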
+ # + # There's a whole bunch of +1 and -1 in here; that's because HTTP wants + # byteranges to be inclusive of the start and end, so e.g. bytes 200-300 + # is a range containing 101 bytes. Python has half-inclusive ranges, of + # course, so we have to convert back and forth. We try to keep things in + # HTTP-style byteranges for consistency. + + # the index of the first byte of the first fragment + fragment_start = (( + segment_start / segment_size * fragment_size) + if segment_start is not None else None) + # the index of the last byte of the last fragment + fragment_end = ( + # range unbounded on the right + None if segment_end is None else + # range unbounded on the left; no -1 since we're + # asking for the last N bytes, not to have a + # particular byte be the last one + ((segment_end + 1) / segment_size + * fragment_size) if segment_start is None else + # range bounded on both sides; the -1 is because the + # rest of the expression computes the length of the + # fragment, and a range of N bytes starts at index M + # and ends at M + N - 1. + ((segment_end + 1) / segment_size * fragment_size) - 1) + return (fragment_start, fragment_end) + + +NO_DATA_SENT = 1 +SENDING_DATA = 2 +DATA_SENT = 3 +DATA_ACKED = 4 +COMMIT_SENT = 5 + + +class ECPutter(object): + """ + This is here mostly to wrap up the fact that all EC PUTs are + chunked because of the mime boundary footer trick and the first + half of the two-phase PUT conversation handling. + + An HTTP PUT request that supports streaming. + + Probably deserves more docs than this, but meh. + """ + def __init__(self, conn, node, resp, path, connect_duration, + mime_boundary): + # Note: you probably want to call Putter.connect() instead of + # instantiating one of these directly. + self.conn = conn + self.node = node + self.resp = resp + self.path = path + self.connect_duration = connect_duration + # for handoff nodes node_index is None + self.node_index = node.get('index') + self.mime_boundary = mime_boundary + self.chunk_hasher = md5() + + self.failed = False + self.queue = None + self.state = NO_DATA_SENT + + def current_status(self): + """ + Returns the current status of the response. + + A response starts off with no current status, then may or may not have + a status of 100 for some time, and then ultimately has a final status + like 200, 404, et cetera. + """ + return self.resp.status + + def await_response(self, timeout, informational=False): + """ + Get 100-continue response indicating the end of 1st phase of a 2-phase + commit or the final response, i.e. the one with status >= 200. + + Might or might not actually wait for anything. If we said Expect: + 100-continue but got back a non-100 response, that'll be the thing + returned, and we won't do any network IO to get it. OTOH, if we got + a 100 Continue response and sent up the PUT request's body, then + we'll actually read the 2xx-5xx response off the network here. 
+ + :returns: HTTPResponse + :raises: Timeout if the response took too long + """ + conn = self.conn + with Timeout(timeout): + if not conn.resp: + if informational: + self.resp = conn.getexpect() + else: + self.resp = conn.getresponse() + return self.resp + + def spawn_sender_greenthread(self, pool, queue_depth, write_timeout, + exception_handler): + """Call before sending the first chunk of request body""" + self.queue = Queue(queue_depth) + pool.spawn(self._send_file, write_timeout, exception_handler) + + def wait(self): + if self.queue.unfinished_tasks: + self.queue.join() + + def _start_mime_doc_object_body(self): + self.queue.put("--%s\r\nX-Document: object body\r\n\r\n" % + (self.mime_boundary,)) + + def send_chunk(self, chunk): + if not chunk: + # If we're not using chunked transfer-encoding, sending a 0-byte + # chunk is just wasteful. If we *are* using chunked + # transfer-encoding, sending a 0-byte chunk terminates the + # request body. Neither one of these is good. + return + elif self.state == DATA_SENT: + raise ValueError("called send_chunk after end_of_object_data") + + if self.state == NO_DATA_SENT and self.mime_boundary: + # We're sending the object plus other stuff in the same request + # body, all wrapped up in multipart MIME, so we'd better start + # off the MIME document before sending any object data. + self._start_mime_doc_object_body() + self.state = SENDING_DATA + + self.queue.put(chunk) + + def end_of_object_data(self, footer_metadata): + """ + Call when there is no more data to send. + + :param footer_metadata: dictionary of metadata items + """ + if self.state == DATA_SENT: + raise ValueError("called end_of_object_data twice") + elif self.state == NO_DATA_SENT and self.mime_boundary: + self._start_mime_doc_object_body() + + footer_body = json.dumps(footer_metadata) + footer_md5 = md5(footer_body).hexdigest() + + tail_boundary = ("--%s" % (self.mime_boundary,)) + + message_parts = [ + ("\r\n--%s\r\n" % self.mime_boundary), + "X-Document: object metadata\r\n", + "Content-MD5: %s\r\n" % footer_md5, + "\r\n", + footer_body, "\r\n", + tail_boundary, "\r\n", + ] + self.queue.put("".join(message_parts)) + + self.queue.put('') + self.state = DATA_SENT + + def send_commit_confirmation(self): + """ + Call when there are > quorum 2XX responses received. Send commit + confirmations to all object nodes to finalize the PUT. + """ + if self.state == COMMIT_SENT: + raise ValueError("called send_commit_confirmation twice") + + self.state = DATA_ACKED + + if self.mime_boundary: + body = "put_commit_confirmation" + tail_boundary = ("--%s--" % (self.mime_boundary,)) + message_parts = [ + "X-Document: put commit\r\n", + "\r\n", + body, "\r\n", + tail_boundary, + ] + self.queue.put("".join(message_parts)) + + self.queue.put('') + self.state = COMMIT_SENT + + def _send_file(self, write_timeout, exception_handler): + """ + Method for a file PUT coro. Takes chunks from a queue and sends them + down a socket. + + If something goes wrong, the "failed" attribute will be set to true + and the exception handler will be called. 
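        Each queued chunk goes onto the wire with chunked
        transfer-encoding framing, which is easy to sanity-check on its
        own (illustrative helper, not part of this class):

            def frame_chunk(chunk):
                # hex length, CRLF, payload, CRLF
                return "%x\r\n%s\r\n" % (len(chunk), chunk)

            assert frame_chunk("hello") == "5\r\nhello\r\n"
            # the empty string queued by end_of_object_data() /
            # send_commit_confirmation() becomes the terminating frame
            assert frame_chunk("") == "0\r\n\r\n"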
+ """ + while True: + chunk = self.queue.get() + if not self.failed: + to_send = "%x\r\n%s\r\n" % (len(chunk), chunk) + try: + with ChunkWriteTimeout(write_timeout): + self.conn.send(to_send) + except (Exception, ChunkWriteTimeout): + self.failed = True + exception_handler(self.conn.node, _('Object'), + _('Trying to write to %s') % self.path) + self.queue.task_done() + + @classmethod + def connect(cls, node, part, path, headers, conn_timeout, node_timeout, + chunked=False): + """ + Connect to a backend node and send the headers. + + :returns: Putter instance + + :raises: ConnectionTimeout if initial connection timed out + :raises: ResponseTimeout if header retrieval timed out + :raises: InsufficientStorage on 507 response from node + :raises: PutterConnectError on non-507 server error response from node + :raises: FooterNotSupported if need_metadata_footer is set but + backend node can't process footers + :raises: MultiphasePUTNotSupported if need_multiphase_support is + set but backend node can't handle multiphase PUT + """ + mime_boundary = "%.64x" % random.randint(0, 16 ** 64) + headers = HeaderKeyDict(headers) + # We're going to be adding some unknown amount of data to the + # request, so we can't use an explicit content length, and thus + # we must use chunked encoding. + headers['Transfer-Encoding'] = 'chunked' + headers['Expect'] = '100-continue' + if 'Content-Length' in headers: + headers['X-Backend-Obj-Content-Length'] = \ + headers.pop('Content-Length') + + headers['X-Backend-Obj-Multipart-Mime-Boundary'] = mime_boundary + + headers['X-Backend-Obj-Metadata-Footer'] = 'yes' + + headers['X-Backend-Obj-Multiphase-Commit'] = 'yes' + + start_time = time.time() + with ConnectionTimeout(conn_timeout): + conn = http_connect(node['ip'], node['port'], node['device'], + part, 'PUT', path, headers) + connect_duration = time.time() - start_time + + with ResponseTimeout(node_timeout): + resp = conn.getexpect() + + if resp.status == HTTP_INSUFFICIENT_STORAGE: + raise InsufficientStorage + + if is_server_error(resp.status): + raise PutterConnectError(resp.status) + + if is_informational(resp.status): + continue_headers = HeaderKeyDict(resp.getheaders()) + can_send_metadata_footer = config_true_value( + continue_headers.get('X-Obj-Metadata-Footer', 'no')) + can_handle_multiphase_put = config_true_value( + continue_headers.get('X-Obj-Multiphase-Commit', 'no')) + + if not can_send_metadata_footer: + raise FooterNotSupported() + + if not can_handle_multiphase_put: + raise MultiphasePUTNotSupported() + + conn.node = node + conn.resp = None + if is_success(resp.status) or resp.status == HTTP_CONFLICT: + conn.resp = resp + elif (headers.get('If-None-Match', None) is not None and + resp.status == HTTP_PRECONDITION_FAILED): + conn.resp = resp + + return cls(conn, node, resp, path, connect_duration, mime_boundary) + + +def chunk_transformer(policy, nstreams): + segment_size = policy.ec_segment_size + + buf = collections.deque() + total_buf_len = 0 + + chunk = yield + while chunk: + buf.append(chunk) + total_buf_len += len(chunk) + if total_buf_len >= segment_size: + chunks_to_encode = [] + # extract as many chunks as we can from the input buffer + while total_buf_len >= segment_size: + to_take = segment_size + pieces = [] + while to_take > 0: + piece = buf.popleft() + if len(piece) > to_take: + buf.appendleft(piece[to_take:]) + piece = piece[:to_take] + pieces.append(piece) + to_take -= len(piece) + total_buf_len -= len(piece) + chunks_to_encode.append(''.join(pieces)) + + frags_by_byte_order = [] + for 
chunk_to_encode in chunks_to_encode: + frags_by_byte_order.append( + policy.pyeclib_driver.encode(chunk_to_encode)) + # Sequential calls to encode() have given us a list that + # looks like this: + # + # [[frag_A1, frag_B1, frag_C1, ...], + # [frag_A2, frag_B2, frag_C2, ...], ...] + # + # What we need is a list like this: + # + # [(frag_A1 + frag_A2 + ...), # destined for node A + # (frag_B1 + frag_B2 + ...), # destined for node B + # (frag_C1 + frag_C2 + ...), # destined for node C + # ...] + obj_data = [''.join(frags) + for frags in zip(*frags_by_byte_order)] + chunk = yield obj_data + else: + # didn't have enough data to encode + chunk = yield None + + # Now we've gotten an empty chunk, which indicates end-of-input. + # Take any leftover bytes and encode them. + last_bytes = ''.join(buf) + if last_bytes: + last_frags = policy.pyeclib_driver.encode(last_bytes) + yield last_frags + else: + yield [''] * nstreams + + +def trailing_metadata(policy, client_obj_hasher, + bytes_transferred_from_client, + fragment_archive_index): + return { + # etag and size values are being added twice here. + # The container override header is used to update the container db + # with these values as they represent the correct etag and size for + # the whole object and not just the FA. + # The object sysmeta headers will be saved on each FA of the object. + 'X-Object-Sysmeta-EC-Etag': client_obj_hasher.hexdigest(), + 'X-Object-Sysmeta-EC-Content-Length': + str(bytes_transferred_from_client), + 'X-Backend-Container-Update-Override-Etag': + client_obj_hasher.hexdigest(), + 'X-Backend-Container-Update-Override-Size': + str(bytes_transferred_from_client), + 'X-Object-Sysmeta-Ec-Frag-Index': str(fragment_archive_index), + # These fields are for debuggability, + # AKA "what is this thing?" + 'X-Object-Sysmeta-EC-Scheme': policy.ec_scheme_description, + 'X-Object-Sysmeta-EC-Segment-Size': str(policy.ec_segment_size), + } + + +@ObjectControllerRouter.register(EC_POLICY) +class ECObjectController(BaseObjectController): + + def _get_or_head_response(self, req, node_iter, partition, policy): + req.headers.setdefault("X-Backend-Etag-Is-At", + "X-Object-Sysmeta-Ec-Etag") + + if req.method == 'HEAD': + # no fancy EC decoding here, just one plain old HEAD request to + # one object server because all fragments hold all metadata + # information about the object. + resp = self.GETorHEAD_base( + req, _('Object'), node_iter, partition, + req.swift_entity_path) + else: # GET request + orig_range = None + range_specs = [] + if req.range: + orig_range = req.range + # Since segments and fragments have different sizes, we need + # to modify the Range header sent to the object servers to + # make sure we get the right fragments out of the fragment + # archives. 
+ segment_size = policy.ec_segment_size + fragment_size = policy.fragment_size + + range_specs = [] + new_ranges = [] + for client_start, client_end in req.range.ranges: + + segment_start, segment_end = client_range_to_segment_range( + client_start, client_end, segment_size) + + fragment_start, fragment_end = \ + segment_range_to_fragment_range( + segment_start, segment_end, + segment_size, fragment_size) + + new_ranges.append((fragment_start, fragment_end)) + range_specs.append({'client_start': client_start, + 'client_end': client_end, + 'segment_start': segment_start, + 'segment_end': segment_end}) + + req.range = "bytes=" + ",".join( + "%s-%s" % (s if s is not None else "", + e if e is not None else "") + for s, e in new_ranges) + + node_iter = GreenthreadSafeIterator(node_iter) + num_gets = policy.ec_ndata + with ContextPool(num_gets) as pool: + pile = GreenAsyncPile(pool) + for _junk in range(num_gets): + pile.spawn(self.GETorHEAD_base, + req, 'Object', node_iter, partition, + req.swift_entity_path, + client_chunk_size=policy.fragment_size) + + responses = list(pile) + good_responses = [] + bad_responses = [] + for response in responses: + if is_success(response.status_int): + good_responses.append(response) + else: + bad_responses.append(response) + + req.range = orig_range + if len(good_responses) == num_gets: + # If these aren't all for the same object, then error out so + # at least the client doesn't get garbage. We can do a lot + # better here with more work, but this'll work for now. + found_obj_etags = set( + resp.headers['X-Object-Sysmeta-Ec-Etag'] + for resp in good_responses) + if len(found_obj_etags) > 1: + self.app.logger.debug( + "Returning 503 for %s; found too many etags (%s)", + req.path, + ", ".join(found_obj_etags)) + return HTTPServiceUnavailable(request=req) + + # we found enough pieces to decode the object, so now let's + # decode the object + resp_headers = HeaderKeyDict(good_responses[0].headers.items()) + resp_headers.pop('Content-Range', None) + eccl = resp_headers.get('X-Object-Sysmeta-Ec-Content-Length') + obj_length = int(eccl) if eccl is not None else None + + resp = Response( + request=req, + headers=resp_headers, + conditional_response=True, + app_iter=ECAppIter( + req.swift_entity_path, + policy, + [r.app_iter for r in good_responses], + range_specs, + obj_length, + logger=self.app.logger)) + else: + resp = self.best_response( + req, + [r.status_int for r in bad_responses], + [r.status.split(' ', 1)[1] for r in bad_responses], + [r.body for r in bad_responses], + 'Object', + headers=[r.headers for r in bad_responses]) + + self._fix_response_headers(resp) + return resp + + def _fix_response_headers(self, resp): + # EC fragment archives each have different bytes, hence different + # etags. However, they all have the original object's etag stored in + # sysmeta, so we copy that here so the client gets it. + resp.headers['Etag'] = resp.headers.get( + 'X-Object-Sysmeta-Ec-Etag') + resp.headers['Content-Length'] = resp.headers.get( + 'X-Object-Sysmeta-Ec-Content-Length') + + return resp + + def _connect_put_node(self, node_iter, part, path, headers, + logger_thread_locals): + """ + Make a connection for a erasure encoded object. + + Connects to the first working node that it finds in node_iter and sends + over the request headers. Returns a Putter to handle the rest of the + streaming, or None if no working nodes were found. 
+ """ + # the object server will get different bytes, so these + # values do not apply (Content-Length might, in general, but + # in the specific case of replication vs. EC, it doesn't). + headers.pop('Content-Length', None) + headers.pop('Etag', None) + + self.app.logger.thread_locals = logger_thread_locals + for node in node_iter: + try: + putter = ECPutter.connect( + node, part, path, headers, + conn_timeout=self.app.conn_timeout, + node_timeout=self.app.node_timeout) + self.app.set_node_timing(node, putter.connect_duration) + return putter + except InsufficientStorage: + self.app.error_limit(node, _('ERROR Insufficient Storage')) + except PutterConnectError as e: + self.app.error_occurred( + node, _('ERROR %(status)d Expect: 100-continue ' + 'From Object Server') % { + 'status': e.status}) + except (Exception, Timeout): + self.app.exception_occurred( + node, _('Object'), + _('Expect: 100-continue on %s') % path) + + def _determine_chunk_destinations(self, putters): + """ + Given a list of putters, return a dict where the key is the putter + and the value is the node index to use. + + This is done so that we line up handoffs using the same node index + (in the primary part list) as the primary that the handoff is standing + in for. This lets erasure-code fragment archives wind up on the + preferred local primary nodes when possible. + """ + # Give each putter a "chunk index": the index of the + # transformed chunk that we'll send to it. + # + # For primary nodes, that's just its index (primary 0 gets + # chunk 0, primary 1 gets chunk 1, and so on). For handoffs, + # we assign the chunk index of a missing primary. + handoff_conns = [] + chunk_index = {} + for p in putters: + if p.node_index is not None: + chunk_index[p] = p.node_index + else: + handoff_conns.append(p) + + # Note: we may have more holes than handoffs. This is okay; it + # just means that we failed to connect to one or more storage + # nodes. Holes occur when a storage node is down, in which + # case the connection is not replaced, and when a storage node + # returns 507, in which case a handoff is used to replace it. + holes = [x for x in range(len(putters)) + if x not in chunk_index.values()] + + for hole, p in zip(holes, handoff_conns): + chunk_index[p] = hole + return chunk_index + + def _transfer_data(self, req, policy, data_source, putters, nodes, + min_conns, etag_hasher): + """ + Transfer data for an erasure coded object. + + This method was added in the PUT method extraction change + """ + bytes_transferred = 0 + chunk_transform = chunk_transformer(policy, len(nodes)) + chunk_transform.send(None) + + def send_chunk(chunk): + if etag_hasher: + etag_hasher.update(chunk) + backend_chunks = chunk_transform.send(chunk) + if backend_chunks is None: + # If there's not enough bytes buffered for erasure-encoding + # or whatever we're doing, the transform will give us None. 
+ return + + for putter in list(putters): + backend_chunk = backend_chunks[chunk_index[putter]] + if not putter.failed: + putter.chunk_hasher.update(backend_chunk) + putter.send_chunk(backend_chunk) + else: + putters.remove(putter) + self._check_min_conn( + req, putters, min_conns, msg='Object PUT exceptions during' + ' send, %(conns)s/%(nodes)s required connections') + + try: + with ContextPool(len(putters)) as pool: + + # build our chunk index dict to place handoffs in the + # same part nodes index as the primaries they are covering + chunk_index = self._determine_chunk_destinations(putters) + + for putter in putters: + putter.spawn_sender_greenthread( + pool, self.app.put_queue_depth, self.app.node_timeout, + self.app.exception_occurred) + while True: + with ChunkReadTimeout(self.app.client_timeout): + try: + chunk = next(data_source) + except StopIteration: + computed_etag = (etag_hasher.hexdigest() + if etag_hasher else None) + received_etag = req.headers.get( + 'etag', '').strip('"') + if (computed_etag and received_etag and + computed_etag != received_etag): + raise HTTPUnprocessableEntity(request=req) + + send_chunk('') # flush out any buffered data + + for putter in putters: + trail_md = trailing_metadata( + policy, etag_hasher, + bytes_transferred, + chunk_index[putter]) + trail_md['Etag'] = \ + putter.chunk_hasher.hexdigest() + putter.end_of_object_data(trail_md) + break + bytes_transferred += len(chunk) + if bytes_transferred > constraints.MAX_FILE_SIZE: + raise HTTPRequestEntityTooLarge(request=req) + + send_chunk(chunk) + + for putter in putters: + putter.wait() + + # for storage policies requiring 2-phase commit (e.g. + # erasure coding), enforce >= 'quorum' number of + # 100-continue responses - this indicates successful + # object data and metadata commit and is a necessary + # condition to be met before starting 2nd PUT phase + final_phase = False + need_quorum = True + statuses, reasons, bodies, _junk, quorum = \ + self._get_put_responses( + req, putters, len(nodes), final_phase, + min_conns, need_quorum=need_quorum) + if not quorum: + self.app.logger.error( + _('Not enough object servers ack\'ed (got %d)'), + statuses.count(HTTP_CONTINUE)) + raise HTTPServiceUnavailable(request=req) + # quorum achieved, start 2nd phase - send commit + # confirmation to participating object servers + # so they write a .durable state file indicating + # a successful PUT + for putter in putters: + putter.send_commit_confirmation() + for putter in putters: + putter.wait() + except ChunkReadTimeout as err: + self.app.logger.warn( + _('ERROR Client read timeout (%ss)'), err.seconds) + self.app.logger.increment('client_timeouts') + raise HTTPRequestTimeout(request=req) + except HTTPException: + raise + except (Exception, Timeout): + self.app.logger.exception( + _('ERROR Exception causing client disconnect')) + raise HTTPClientDisconnect(request=req) + if req.content_length and bytes_transferred < req.content_length: + req.client_disconnect = True + self.app.logger.warn( + _('Client disconnected without sending enough data')) + self.app.logger.increment('client_disconnects') + raise HTTPClientDisconnect(request=req) + + def _have_adequate_successes(self, statuses, min_responses): + """ + Given a list of statuses from several requests, determine if a + satisfactory number of nodes have responded with 2xx statuses to + deem the transaction for a succssful response to the client. 
+ + :param statuses: list of statuses returned so far + :param min_responses: minimal pass criterion for number of successes + :returns: True or False, depending on current number of successes + """ + if sum(1 for s in statuses if is_success(s)) >= min_responses: + return True + return False + + def _await_response(self, conn, final_phase): + return conn.await_response( + self.app.node_timeout, not final_phase) + + def _get_conn_response(self, conn, req, final_phase, **kwargs): + try: + resp = self._await_response(conn, final_phase=final_phase, + **kwargs) + except (Exception, Timeout): + resp = None + if final_phase: + status_type = 'final' + else: + status_type = 'commit' + self.app.exception_occurred( + conn.node, _('Object'), + _('Trying to get %s status of PUT to %s') % ( + status_type, req.path)) + return (conn, resp) + + def _get_put_responses(self, req, putters, num_nodes, final_phase, + min_responses, need_quorum=True): + """ + Collect erasure coded object responses. + + Collect object responses to a PUT request and determine if + satisfactory number of nodes have returned success. Return + statuses, quorum result if indicated by 'need_quorum' and + etags if this is a final phase or a multiphase PUT transaction. + + :param req: the request + :param putters: list of putters for the request + :param num_nodes: number of nodes involved + :param final_phase: boolean indicating if this is the last phase + :param min_responses: minimum needed when not requiring quorum + :param need_quorum: boolean indicating if quorum is required + """ + statuses = [] + reasons = [] + bodies = [] + etags = set() + + pile = GreenAsyncPile(len(putters)) + for putter in putters: + if putter.failed: + continue + pile.spawn(self._get_conn_response, putter, req, + final_phase=final_phase) + + def _handle_response(putter, response): + statuses.append(response.status) + reasons.append(response.reason) + if final_phase: + body = response.read() + bodies.append(body) + else: + body = '' + if response.status == HTTP_INSUFFICIENT_STORAGE: + putter.failed = True + self.app.error_limit(putter.node, + _('ERROR Insufficient Storage')) + elif response.status >= HTTP_INTERNAL_SERVER_ERROR: + putter.failed = True + self.app.error_occurred( + putter.node, + _('ERROR %(status)d %(body)s From Object Server ' + 're: %(path)s') % + {'status': response.status, + 'body': body[:1024], 'path': req.path}) + elif is_success(response.status): + etags.add(response.getheader('etag').strip('"')) + + quorum = False + for (putter, response) in pile: + if response: + _handle_response(putter, response) + if self._have_adequate_successes(statuses, min_responses): + break + else: + putter.failed = True + + # give any pending requests *some* chance to finish + finished_quickly = pile.waitall(self.app.post_quorum_timeout) + for (putter, response) in finished_quickly: + if response: + _handle_response(putter, response) + + if need_quorum: + if final_phase: + while len(statuses) < num_nodes: + statuses.append(HTTP_SERVICE_UNAVAILABLE) + reasons.append('') + bodies.append('') + else: + # intermediate response phase - set return value to true only + # if there are enough 100-continue acknowledgements + if self.have_quorum(statuses, num_nodes): + quorum = True + + return statuses, reasons, bodies, etags, quorum + + def _store_object(self, req, data_source, nodes, partition, + outgoing_headers): + """ + Store an erasure coded object. 
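        At the end of the first phase each putter sends a metadata footer
        (built by trailing_metadata() above) to its object server; it
        looks roughly like the dict below (the values come from an
        illustrative empty-body hash and a made-up fragment index, not
        real traffic):

            from hashlib import md5

            hasher = md5()   # fed the client body during the transfer
            footers = {
                'X-Object-Sysmeta-EC-Etag': hasher.hexdigest(),
                'X-Object-Sysmeta-EC-Content-Length': '0',
                'X-Object-Sysmeta-Ec-Frag-Index': '3',
                'X-Backend-Container-Update-Override-Etag':
                    hasher.hexdigest(),
                'X-Backend-Container-Update-Override-Size': '0',
            }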
+ """ + policy_index = int(req.headers.get('X-Backend-Storage-Policy-Index')) + policy = POLICIES.get_by_index(policy_index) + # Since the request body sent from client -> proxy is not + # the same as the request body sent proxy -> object, we + # can't rely on the object-server to do the etag checking - + # so we have to do it here. + etag_hasher = md5() + + min_conns = policy.quorum + putters = self._get_put_connections( + req, nodes, partition, outgoing_headers, + policy, expect=True) + + try: + # check that a minimum number of connections were established and + # meet all the correct conditions set in the request + self._check_failure_put_connections(putters, req, nodes, min_conns) + + self._transfer_data(req, policy, data_source, putters, + nodes, min_conns, etag_hasher) + final_phase = True + need_quorum = False + min_resp = 2 + putters = [p for p in putters if not p.failed] + # ignore response etags, and quorum boolean + statuses, reasons, bodies, _etags, _quorum = \ + self._get_put_responses(req, putters, len(nodes), + final_phase, min_resp, + need_quorum=need_quorum) + except HTTPException as resp: + return resp + + etag = etag_hasher.hexdigest() + resp = self.best_response(req, statuses, reasons, bodies, + _('Object PUT'), etag=etag, + quorum_size=min_conns) + resp.last_modified = math.ceil( + float(Timestamp(req.headers['X-Timestamp']))) + return resp diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 28d41df55..8c9e22372 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -20,6 +20,8 @@ from swift import gettext_ as _ from random import shuffle from time import time import itertools +import functools +import sys from eventlet import Timeout @@ -32,11 +34,12 @@ from swift.common.utils import cache_from_env, get_logger, \ affinity_key_function, affinity_locality_predicate, list_from_csv, \ register_swift_info from swift.common.constraints import check_utf8 -from swift.proxy.controllers import AccountController, ObjectController, \ - ContainerController, InfoController +from swift.proxy.controllers import AccountController, ContainerController, \ + ObjectControllerRouter, InfoController +from swift.proxy.controllers.base import get_container_info from swift.common.swob import HTTPBadRequest, HTTPForbidden, \ HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \ - HTTPServerError, HTTPException, Request + HTTPServerError, HTTPException, Request, HTTPServiceUnavailable # List of entry points for mandatory middlewares. @@ -109,6 +112,7 @@ class Application(object): # ensure rings are loaded for all configured storage policies for policy in POLICIES: policy.load_ring(swift_dir) + self.obj_controller_router = ObjectControllerRouter() self.memcache = memcache mimetypes.init(mimetypes.knownfiles + [os.path.join(swift_dir, 'mime.types')]) @@ -235,29 +239,44 @@ class Application(object): """ return POLICIES.get_object_ring(policy_idx, self.swift_dir) - def get_controller(self, path): + def get_controller(self, req): """ Get the controller to handle a request. 
- :param path: path from request + :param req: the request :returns: tuple of (controller class, path dictionary) :raises: ValueError (thrown by split_path) if given invalid path """ - if path == '/info': + if req.path == '/info': d = dict(version=None, expose_info=self.expose_info, disallowed_sections=self.disallowed_sections, admin_key=self.admin_key) return InfoController, d - version, account, container, obj = split_path(path, 1, 4, True) + version, account, container, obj = split_path(req.path, 1, 4, True) d = dict(version=version, account_name=account, container_name=container, object_name=obj) if obj and container and account: - return ObjectController, d + info = get_container_info(req.environ, self) + policy_index = req.headers.get('X-Backend-Storage-Policy-Index', + info['storage_policy']) + policy = POLICIES.get_by_index(policy_index) + if not policy: + # This indicates that a new policy has been created, + # with rings, deployed, released (i.e. deprecated = + # False), used by a client to create a container via + # another proxy that was restarted after the policy + # was released, and is now cached - all before this + # worker was HUPed to stop accepting new + # connections. There should never be an "unknown" + # index - but when there is - it's probably operator + # error and hopefully temporary. + raise HTTPServiceUnavailable('Unknown Storage Policy') + return self.obj_controller_router[policy], d elif container and account: return ContainerController, d elif account and not container and not obj: @@ -317,7 +336,7 @@ class Application(object): request=req, body='Invalid UTF8 or contains NULL') try: - controller, path_parts = self.get_controller(req.path) + controller, path_parts = self.get_controller(req) p = req.path_info if isinstance(p, unicode): p = p.encode('utf-8') @@ -474,9 +493,9 @@ class Application(object): def iter_nodes(self, ring, partition, node_iter=None): """ Yields nodes for a ring partition, skipping over error - limited nodes and stopping at the configurable number of - nodes. If a node yielded subsequently gets error limited, an - extra node will be yielded to take its place. + limited nodes and stopping at the configurable number of nodes. If a + node yielded subsequently gets error limited, an extra node will be + yielded to take its place. Note that if you're going to iterate over this concurrently from multiple greenthreads, you'll want to use a @@ -527,7 +546,8 @@ class Application(object): if nodes_left <= 0: return - def exception_occurred(self, node, typ, additional_info): + def exception_occurred(self, node, typ, additional_info, + **kwargs): """ Handle logging of generic exceptions. @@ -536,11 +556,18 @@ class Application(object): :param additional_info: additional information to log """ self._incr_node_errors(node) - self.logger.exception( - _('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: ' - '%(info)s'), - {'type': typ, 'ip': node['ip'], 'port': node['port'], - 'device': node['device'], 'info': additional_info}) + if 'level' in kwargs: + log = functools.partial(self.logger.log, kwargs.pop('level')) + if 'exc_info' not in kwargs: + kwargs['exc_info'] = sys.exc_info() + else: + log = self.logger.exception + log(_('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s' + ' re: %(info)s'), { + 'type': typ, 'ip': node['ip'], 'port': + node['port'], 'device': node['device'], + 'info': additional_info + }, **kwargs) def modify_wsgi_pipeline(self, pipe): """ |
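The exception_occurred() change above lets callers downgrade the log severity: when a level keyword is supplied, the call is routed through functools.partial(logger.log, level) with exc_info filled in, otherwise it falls back to logger.exception(). A minimal sketch of that dispatch against a plain stdlib logger (the helper name is illustrative):

import functools
import logging
import sys

logger = logging.getLogger('proxy-server')

def log_node_error(msg, **kwargs):
    if 'level' in kwargs:
        log = functools.partial(logger.log, kwargs.pop('level'))
        kwargs.setdefault('exc_info', sys.exc_info())
    else:
        log = logger.exception
    log(msg, **kwargs)

try:
    raise ValueError('boom')
except ValueError:
    log_node_error('ERROR with Object server re: handoff')  # full traceback
    log_node_error('ERROR with Object server re: handoff',
                   level=logging.WARNING)                    # downgraded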