Diffstat (limited to 'swift')
-rw-r--r--  swift/account/reaper.py                 5
-rw-r--r--  swift/cli/info.py                       7
-rw-r--r--  swift/common/constraints.py            13
-rw-r--r--  swift/common/exceptions.py             26
-rw-r--r--  swift/common/manager.py                 3
-rw-r--r--  swift/common/middleware/formpost.py     9
-rw-r--r--  swift/common/request_helpers.py        28
-rw-r--r--  swift/common/ring/ring.py               3
-rw-r--r--  swift/common/storage_policy.py        409
-rw-r--r--  swift/common/swob.py                   63
-rw-r--r--  swift/common/utils.py                  42
-rw-r--r--  swift/common/wsgi.py                   36
-rw-r--r--  swift/container/sync.py               125
-rw-r--r--  swift/obj/diskfile.py                 889
-rw-r--r--  swift/obj/mem_diskfile.py              16
-rw-r--r--  swift/obj/mem_server.py                51
-rw-r--r--  swift/obj/reconstructor.py            925
-rw-r--r--  swift/obj/replicator.py                69
-rw-r--r--  swift/obj/server.py                   288
-rw-r--r--  swift/obj/ssync_receiver.py            52
-rw-r--r--  swift/obj/ssync_sender.py              71
-rw-r--r--  swift/obj/updater.py                   40
-rw-r--r--  swift/proxy/controllers/__init__.py     4
-rw-r--r--  swift/proxy/controllers/account.py      5
-rw-r--r--  swift/proxy/controllers/base.py       171
-rw-r--r--  swift/proxy/controllers/container.py    3
-rw-r--r--  swift/proxy/controllers/obj.py       1225
-rw-r--r--  swift/proxy/server.py                  63
28 files changed, 4145 insertions(+), 496 deletions(-)
diff --git a/swift/account/reaper.py b/swift/account/reaper.py
index ce69fab92..06a008535 100644
--- a/swift/account/reaper.py
+++ b/swift/account/reaper.py
@@ -19,6 +19,7 @@ from swift import gettext_ as _
from logging import DEBUG
from math import sqrt
from time import time
+import itertools
from eventlet import GreenPool, sleep, Timeout
@@ -432,7 +433,7 @@ class AccountReaper(Daemon):
* See also: :func:`swift.common.ring.Ring.get_nodes` for a description
of the container node dicts.
"""
- container_nodes = list(container_nodes)
+ cnodes = itertools.cycle(container_nodes)
try:
ring = self.get_object_ring(policy_index)
except PolicyError:
@@ -443,7 +444,7 @@ class AccountReaper(Daemon):
successes = 0
failures = 0
for node in nodes:
- cnode = container_nodes.pop()
+ cnode = next(cnodes)
try:
direct_delete_object(
node, part, account, container, obj,
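
The switch to itertools.cycle matters when the object ring has more nodes per partition than the container ring has replicas: popping from the finite container_nodes list would eventually raise IndexError, while cycling reuses container replicas round-robin. A minimal sketch of that behaviour, with placeholder names standing in for the real ring node dicts:

import itertools

container_nodes = ['c1', 'c2', 'c3']            # e.g. 3 container replicas
object_nodes = ['o1', 'o2', 'o3', 'o4', 'o5']   # e.g. 5 object nodes (EC ring)

cnodes = itertools.cycle(container_nodes)
for node in object_nodes:
    cnode = next(cnodes)    # never exhausts, unlike container_nodes.pop()
    print('%s -> %s' % (node, cnode))
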
diff --git a/swift/cli/info.py b/swift/cli/info.py
index 142b103f4..a8cfabd17 100644
--- a/swift/cli/info.py
+++ b/swift/cli/info.py
@@ -24,7 +24,7 @@ from swift.common.request_helpers import is_sys_meta, is_user_meta, \
from swift.account.backend import AccountBroker, DATADIR as ABDATADIR
from swift.container.backend import ContainerBroker, DATADIR as CBDATADIR
from swift.obj.diskfile import get_data_dir, read_metadata, DATADIR_BASE, \
- extract_policy_index
+ extract_policy
from swift.common.storage_policy import POLICIES
@@ -341,10 +341,7 @@ def print_obj(datafile, check_etag=True, swift_dir='/etc/swift',
datadir = DATADIR_BASE
# try to extract policy index from datafile disk path
- try:
- policy_index = extract_policy_index(datafile)
- except ValueError:
- pass
+ policy_index = int(extract_policy(datafile) or POLICIES.legacy)
try:
if policy_index:
diff --git a/swift/common/constraints.py b/swift/common/constraints.py
index d4458ddf8..8e3ba53b0 100644
--- a/swift/common/constraints.py
+++ b/swift/common/constraints.py
@@ -204,6 +204,19 @@ def check_object_creation(req, object_name):
return check_metadata(req, 'object')
+def check_dir(root, drive):
+ """
+ Verify that the path to the device is a directory. This is a lesser
+ constraint than a full mount_check, enforced when a mount check isn't
+ possible, for instance with a VM using loopback or partitions.
+
+ :param root: base path where the dir is
+ :param drive: drive name to be checked
+ :returns: True if it is a valid directory, False otherwise
+ """
+ return os.path.isdir(os.path.join(root, drive))
+
+
def check_mount(root, drive):
"""
Verify that the path to the device is a mount point and mounted. This
diff --git a/swift/common/exceptions.py b/swift/common/exceptions.py
index d7ea759d6..b4c926eb1 100644
--- a/swift/common/exceptions.py
+++ b/swift/common/exceptions.py
@@ -31,10 +31,32 @@ class SwiftException(Exception):
pass
+class PutterConnectError(Exception):
+
+ def __init__(self, status=None):
+ self.status = status
+
+
class InvalidTimestamp(SwiftException):
pass
+class InsufficientStorage(SwiftException):
+ pass
+
+
+class FooterNotSupported(SwiftException):
+ pass
+
+
+class MultiphasePUTNotSupported(SwiftException):
+ pass
+
+
+class SuffixSyncError(SwiftException):
+ pass
+
+
class DiskFileError(SwiftException):
pass
@@ -103,6 +125,10 @@ class ConnectionTimeout(Timeout):
pass
+class ResponseTimeout(Timeout):
+ pass
+
+
class DriveNotMounted(SwiftException):
pass
diff --git a/swift/common/manager.py b/swift/common/manager.py
index a4ef350da..260516d6a 100644
--- a/swift/common/manager.py
+++ b/swift/common/manager.py
@@ -33,7 +33,8 @@ ALL_SERVERS = ['account-auditor', 'account-server', 'container-auditor',
'container-replicator', 'container-reconciler',
'container-server', 'container-sync',
'container-updater', 'object-auditor', 'object-server',
- 'object-expirer', 'object-replicator', 'object-updater',
+ 'object-expirer', 'object-replicator',
+ 'object-reconstructor', 'object-updater',
'proxy-server', 'account-replicator', 'account-reaper']
MAIN_SERVERS = ['proxy-server', 'account-server', 'container-server',
'object-server']
diff --git a/swift/common/middleware/formpost.py b/swift/common/middleware/formpost.py
index 7132b342a..56a6d20f3 100644
--- a/swift/common/middleware/formpost.py
+++ b/swift/common/middleware/formpost.py
@@ -218,7 +218,14 @@ class FormPost(object):
env, attrs['boundary'])
start_response(status, headers)
return [body]
- except (FormInvalid, MimeInvalid, EOFError) as err:
+ except MimeInvalid:
+ body = 'FormPost: invalid starting boundary'
+ start_response(
+ '400 Bad Request',
+ (('Content-Type', 'text/plain'),
+ ('Content-Length', str(len(body)))))
+ return [body]
+ except (FormInvalid, EOFError) as err:
body = 'FormPost: %s' % err
start_response(
'400 Bad Request',
diff --git a/swift/common/request_helpers.py b/swift/common/request_helpers.py
index 08e0ab5dc..14b9fd884 100644
--- a/swift/common/request_helpers.py
+++ b/swift/common/request_helpers.py
@@ -26,10 +26,12 @@ import time
from contextlib import contextmanager
from urllib import unquote
from swift import gettext_ as _
+from swift.common.storage_policy import POLICIES
from swift.common.constraints import FORMAT2CONTENT_TYPE
from swift.common.exceptions import ListingIterError, SegmentError
from swift.common.http import is_success
-from swift.common.swob import HTTPBadRequest, HTTPNotAcceptable
+from swift.common.swob import (HTTPBadRequest, HTTPNotAcceptable,
+ HTTPServiceUnavailable)
from swift.common.utils import split_path, validate_device_partition
from swift.common.wsgi import make_subrequest
@@ -82,21 +84,27 @@ def get_listing_content_type(req):
def get_name_and_placement(request, minsegs=1, maxsegs=None,
rest_with_last=False):
"""
- Utility function to split and validate the request path and
- storage_policy_index. The storage_policy_index is extracted from
- the headers of the request and converted to an integer, and then the
- args are passed through to :meth:`split_and_validate_path`.
+ Utility function to split and validate the request path and storage
+ policy. The storage policy index is extracted from the headers of
+ the request and converted to a StoragePolicy instance. The
+ remaining args are passed through to
+ :meth:`split_and_validate_path`.
:returns: a list, result of :meth:`split_and_validate_path` with
- storage_policy_index appended on the end
- :raises: HTTPBadRequest
+ the BaseStoragePolicy instance appended on the end
+ :raises: HTTPServiceUnavailable if the path is invalid or no policy exists
+ with the extracted policy_index.
"""
- policy_idx = request.headers.get('X-Backend-Storage-Policy-Index', '0')
- policy_idx = int(policy_idx)
+ policy_index = request.headers.get('X-Backend-Storage-Policy-Index')
+ policy = POLICIES.get_by_index(policy_index)
+ if not policy:
+ raise HTTPServiceUnavailable(
+ body=_("No policy with index %s") % policy_index,
+ request=request, content_type='text/plain')
results = split_and_validate_path(request, minsegs=minsegs,
maxsegs=maxsegs,
rest_with_last=rest_with_last)
- results.append(policy_idx)
+ results.append(policy)
return results
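
With this change a backend server unpacks a full policy object from the request instead of a bare integer index. A hedged sketch of a caller, assuming a default-configured POLICIES; the path and header values are illustrative:

from swift.common.swob import Request
from swift.common.request_helpers import get_name_and_placement

req = Request.blank(
    '/sda1/0/AUTH_test/cont/obj',
    headers={'X-Backend-Storage-Policy-Index': '0'})
device, partition, account, container, obj, policy = \
    get_name_and_placement(req, 5, 5, True)
# 'policy' is a BaseStoragePolicy instance, so policy-aware helpers such
# as get_data_dir(policy) or get_async_dir(policy) can be used directly.
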
diff --git a/swift/common/ring/ring.py b/swift/common/ring/ring.py
index daad23ff1..62e19951d 100644
--- a/swift/common/ring/ring.py
+++ b/swift/common/ring/ring.py
@@ -243,7 +243,7 @@ class Ring(object):
if dev_id not in seen_ids:
part_nodes.append(self.devs[dev_id])
seen_ids.add(dev_id)
- return part_nodes
+ return [dict(node, index=i) for i, node in enumerate(part_nodes)]
def get_part(self, account, container=None, obj=None):
"""
@@ -291,6 +291,7 @@ class Ring(object):
====== ===============================================================
id unique integer identifier amongst devices
+ index offset into the primary node list for the partition
weight a float of the relative weight of this device as compared to
others; this indicates how many partitions the builder will try
to assign to this device
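
The new 'index' key records each primary node's offset in the partition's node list, so callers can recover a node's position without searching the list again. An illustrative sketch, assuming a built object ring under /etc/swift:

from swift.common.ring import Ring

ring = Ring('/etc/swift', ring_name='object')
part, nodes = ring.get_nodes('AUTH_test', 'cont', 'obj')
for node in nodes:
    # primary nodes carry 'index'; handoffs from get_more_nodes() do not
    print('%d %s' % (node['index'], node['device']))
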
diff --git a/swift/common/storage_policy.py b/swift/common/storage_policy.py
index f33eda539..e45ab018c 100644
--- a/swift/common/storage_policy.py
+++ b/swift/common/storage_policy.py
@@ -17,10 +17,18 @@ import string
from swift.common.utils import config_true_value, SWIFT_CONF_FILE
from swift.common.ring import Ring
+from swift.common.utils import quorum_size
+from swift.common.exceptions import RingValidationError
+from pyeclib.ec_iface import ECDriver, ECDriverError, VALID_EC_TYPES
LEGACY_POLICY_NAME = 'Policy-0'
VALID_CHARS = '-' + string.letters + string.digits
+DEFAULT_POLICY_TYPE = REPL_POLICY = 'replication'
+EC_POLICY = 'erasure_coding'
+
+DEFAULT_EC_OBJECT_SEGMENT_SIZE = 1048576
+
class PolicyError(ValueError):
@@ -38,36 +46,73 @@ def _get_policy_string(base, policy_index):
return return_string
-def get_policy_string(base, policy_index):
+def get_policy_string(base, policy_or_index):
"""
- Helper function to construct a string from a base and the policy
- index. Used to encode the policy index into either a file name
- or a directory name by various modules.
+ Helper function to construct a string from a base and the policy.
+ Used to encode the policy index into either a file name or a
+ directory name by various modules.
:param base: the base string
- :param policy_index: the storage policy index
+ :param policy_or_index: StoragePolicy instance, or an index
+ (string or int); if None, the legacy
+ storage policy (Policy-0) is assumed.
:returns: base name with policy index added
+ :raises: PolicyError if no policy exists with the given policy_index
"""
- if POLICIES.get_by_index(policy_index) is None:
- raise PolicyError("No policy with index %r" % policy_index)
- return _get_policy_string(base, policy_index)
+ if isinstance(policy_or_index, BaseStoragePolicy):
+ policy = policy_or_index
+ else:
+ policy = POLICIES.get_by_index(policy_or_index)
+ if policy is None:
+ raise PolicyError("Unknown policy", index=policy_or_index)
+ return _get_policy_string(base, int(policy))
-class StoragePolicy(object):
+def split_policy_string(policy_string):
"""
- Represents a storage policy.
- Not meant to be instantiated directly; use
- :func:`~swift.common.storage_policy.reload_storage_policies` to load
- POLICIES from ``swift.conf``.
+ Helper function to convert a string representing a base and a
+ policy. Used to decode the policy from either a file name or
+ a directory name by various modules.
+
+ :param policy_string: base name with policy index added
+
+ :raises: PolicyError if given index does not map to a valid policy
+ :returns: a tuple, in the form (base, policy) where base is the base
+ string and policy is the StoragePolicy instance for the
+ index encoded in the policy_string.
+ """
+ if '-' in policy_string:
+ base, policy_index = policy_string.rsplit('-', 1)
+ else:
+ base, policy_index = policy_string, None
+ policy = POLICIES.get_by_index(policy_index)
+ if get_policy_string(base, policy) != policy_string:
+ raise PolicyError("Unknown policy", index=policy_index)
+ return base, policy
+
+
+class BaseStoragePolicy(object):
+ """
+ Represents a storage policy. Not meant to be instantiated directly;
+ implement a derived subclass (e.g. StoragePolicy, ECStoragePolicy, etc.)
+ or use :func:`~swift.common.storage_policy.reload_storage_policies` to
+ load POLICIES from ``swift.conf``.
The object_ring property is lazy loaded once the service's ``swift_dir``
is known via :meth:`~StoragePolicyCollection.get_object_ring`, but it may
be over-ridden via object_ring kwarg at create time for testing or
actively loaded with :meth:`~StoragePolicy.load_ring`.
"""
+
+ policy_type_to_policy_cls = {}
+
def __init__(self, idx, name='', is_default=False, is_deprecated=False,
object_ring=None):
+ # do not allow BaseStoragePolicy class to be instantiated directly
+ if type(self) == BaseStoragePolicy:
+ raise TypeError("Can't instantiate BaseStoragePolicy directly")
+ # policy parameter validation
try:
self.idx = int(idx)
except ValueError:
@@ -88,6 +133,8 @@ class StoragePolicy(object):
self.name = name
self.is_deprecated = config_true_value(is_deprecated)
self.is_default = config_true_value(is_default)
+ if self.policy_type not in BaseStoragePolicy.policy_type_to_policy_cls:
+ raise PolicyError('Invalid type', self.policy_type)
if self.is_deprecated and self.is_default:
raise PolicyError('Deprecated policy can not be default. '
'Invalid config', self.idx)
@@ -101,8 +148,80 @@ class StoragePolicy(object):
return cmp(self.idx, int(other))
def __repr__(self):
- return ("StoragePolicy(%d, %r, is_default=%s, is_deprecated=%s)") % (
- self.idx, self.name, self.is_default, self.is_deprecated)
+ return ("%s(%d, %r, is_default=%s, "
+ "is_deprecated=%s, policy_type=%r)") % \
+ (self.__class__.__name__, self.idx, self.name,
+ self.is_default, self.is_deprecated, self.policy_type)
+
+ @classmethod
+ def register(cls, policy_type):
+ """
+ Decorator for Storage Policy implementations to register
+ their StoragePolicy class. This will also set the policy_type
+ attribute on the registered implementation.
+ """
+ def register_wrapper(policy_cls):
+ if policy_type in cls.policy_type_to_policy_cls:
+ raise PolicyError(
+ '%r is already registered for the policy_type %r' % (
+ cls.policy_type_to_policy_cls[policy_type],
+ policy_type))
+ cls.policy_type_to_policy_cls[policy_type] = policy_cls
+ policy_cls.policy_type = policy_type
+ return policy_cls
+ return register_wrapper
+
+ @classmethod
+ def _config_options_map(cls):
+ """
+ Map config option name to StoragePolicy parameter name.
+ """
+ return {
+ 'name': 'name',
+ 'policy_type': 'policy_type',
+ 'default': 'is_default',
+ 'deprecated': 'is_deprecated',
+ }
+
+ @classmethod
+ def from_config(cls, policy_index, options):
+ config_to_policy_option_map = cls._config_options_map()
+ policy_options = {}
+ for config_option, value in options.items():
+ try:
+ policy_option = config_to_policy_option_map[config_option]
+ except KeyError:
+ raise PolicyError('Invalid option %r in '
+ 'storage-policy section' % config_option,
+ index=policy_index)
+ policy_options[policy_option] = value
+ return cls(policy_index, **policy_options)
+
+ def get_info(self, config=False):
+ """
+ Return the info dict and conf file options for this policy.
+
+ :param config: boolean, if True all config options are returned
+ """
+ info = {}
+ for config_option, policy_attribute in \
+ self._config_options_map().items():
+ info[config_option] = getattr(self, policy_attribute)
+ if not config:
+ # remove some options for public consumption
+ if not self.is_default:
+ info.pop('default')
+ if not self.is_deprecated:
+ info.pop('deprecated')
+ info.pop('policy_type')
+ return info
+
+ def _validate_ring(self):
+ """
+ Hook, called when the ring is loaded. Can be used to
+ validate the ring against the StoragePolicy configuration.
+ """
+ pass
def load_ring(self, swift_dir):
"""
@@ -114,11 +233,224 @@ class StoragePolicy(object):
return
self.object_ring = Ring(swift_dir, ring_name=self.ring_name)
- def get_options(self):
- """Return the valid conf file options for this policy."""
- return {'name': self.name,
- 'default': self.is_default,
- 'deprecated': self.is_deprecated}
+ # Validate ring to make sure it conforms to policy requirements
+ self._validate_ring()
+
+ @property
+ def quorum(self):
+ """
+ Number of successful backend requests needed for the proxy to
+ consider the client request successful.
+ """
+ raise NotImplementedError()
+
+
+@BaseStoragePolicy.register(REPL_POLICY)
+class StoragePolicy(BaseStoragePolicy):
+ """
+ Represents a storage policy of type 'replication'. Default storage policy
+ class unless otherwise overridden from swift.conf.
+
+ Not meant to be instantiated directly; use
+ :func:`~swift.common.storage_policy.reload_storage_policies` to load
+ POLICIES from ``swift.conf``.
+ """
+
+ @property
+ def quorum(self):
+ """
+ Quorum concept in the replication case:
+ floor(number of replicas / 2) + 1
+ """
+ if not self.object_ring:
+ raise PolicyError('Ring is not loaded')
+ return quorum_size(self.object_ring.replica_count)
+
+
+@BaseStoragePolicy.register(EC_POLICY)
+class ECStoragePolicy(BaseStoragePolicy):
+ """
+ Represents a storage policy of type 'erasure_coding'.
+
+ Not meant to be instantiated directly; use
+ :func:`~swift.common.storage_policy.reload_storage_policies` to load
+ POLICIES from ``swift.conf``.
+ """
+ def __init__(self, idx, name='', is_default=False,
+ is_deprecated=False, object_ring=None,
+ ec_segment_size=DEFAULT_EC_OBJECT_SEGMENT_SIZE,
+ ec_type=None, ec_ndata=None, ec_nparity=None):
+
+ super(ECStoragePolicy, self).__init__(
+ idx, name, is_default, is_deprecated, object_ring)
+
+ # Validate erasure_coding policy specific members
+ # ec_type is one of the EC implementations supported by PyEClib
+ if ec_type is None:
+ raise PolicyError('Missing ec_type')
+ if ec_type not in VALID_EC_TYPES:
+ raise PolicyError('Wrong ec_type %s for policy %s, should be one'
+ ' of "%s"' % (ec_type, self.name,
+ ', '.join(VALID_EC_TYPES)))
+ self._ec_type = ec_type
+
+ # Define _ec_ndata as the number of EC data fragments
+ # Accessible as the property "ec_ndata"
+ try:
+ value = int(ec_ndata)
+ if value <= 0:
+ raise ValueError
+ self._ec_ndata = value
+ except (TypeError, ValueError):
+ raise PolicyError('Invalid ec_num_data_fragments %r' %
+ ec_ndata, index=self.idx)
+
+ # Define _ec_nparity as the number of EC parity fragments
+ # Accessible as the property "ec_nparity"
+ try:
+ value = int(ec_nparity)
+ if value <= 0:
+ raise ValueError
+ self._ec_nparity = value
+ except (TypeError, ValueError):
+ raise PolicyError('Invalid ec_num_parity_fragments %r'
+ % ec_nparity, index=self.idx)
+
+ # Define _ec_segment_size as the encode segment unit size
+ # Accessible as the property "ec_segment_size"
+ try:
+ value = int(ec_segment_size)
+ if value <= 0:
+ raise ValueError
+ self._ec_segment_size = value
+ except (TypeError, ValueError):
+ raise PolicyError('Invalid ec_object_segment_size %r' %
+ ec_segment_size, index=self.idx)
+
+ # Initialize PyECLib EC backend
+ try:
+ self.pyeclib_driver = \
+ ECDriver(k=self._ec_ndata, m=self._ec_nparity,
+ ec_type=self._ec_type)
+ except ECDriverError as e:
+ raise PolicyError("Error creating EC policy (%s)" % e,
+ index=self.idx)
+
+ # quorum size in the EC case depends on the choice of EC scheme.
+ self._ec_quorum_size = \
+ self._ec_ndata + self.pyeclib_driver.min_parity_fragments_needed()
+
+ @property
+ def ec_type(self):
+ return self._ec_type
+
+ @property
+ def ec_ndata(self):
+ return self._ec_ndata
+
+ @property
+ def ec_nparity(self):
+ return self._ec_nparity
+
+ @property
+ def ec_segment_size(self):
+ return self._ec_segment_size
+
+ @property
+ def fragment_size(self):
+ """
+ Maximum length of a fragment, including header.
+
+ NB: a fragment archive is a sequence of 0 or more max-length
+ fragments followed by one possibly-shorter fragment.
+ """
+ # Technically pyeclib's get_segment_info signature calls for
+ # (data_len, segment_size) but on a ranged GET we don't know the
+ # ec-content-length header before we need to compute where in the
+ # object we should request to align with the fragment size. So we
+ # tell pyeclib a lie - from it's perspective, as long as data_len >=
+ # segment_size it'll give us the answer we want. From our
+ # perspective, because we only use this answer to calculate the
+ # *minimum* size we should read from an object body even if data_len <
+ # segment_size we'll still only read *the whole one and only last
+ fragment* and pass that into pyeclib who will know what to do with
+ # it just as it always does when the last fragment is < fragment_size.
+ return self.pyeclib_driver.get_segment_info(
+ self.ec_segment_size, self.ec_segment_size)['fragment_size']
+
+ @property
+ def ec_scheme_description(self):
+ """
+ This shorthand form of the important parts of the EC scheme is stored
+ in Object System Metadata on the EC Fragment Archives for debugging.
+ """
+ return "%s %d+%d" % (self._ec_type, self._ec_ndata, self._ec_nparity)
+
+ def __repr__(self):
+ return ("%s, EC config(ec_type=%s, ec_segment_size=%d, "
+ "ec_ndata=%d, ec_nparity=%d)") % (
+ super(ECStoragePolicy, self).__repr__(), self.ec_type,
+ self.ec_segment_size, self.ec_ndata, self.ec_nparity)
+
+ @classmethod
+ def _config_options_map(cls):
+ options = super(ECStoragePolicy, cls)._config_options_map()
+ options.update({
+ 'ec_type': 'ec_type',
+ 'ec_object_segment_size': 'ec_segment_size',
+ 'ec_num_data_fragments': 'ec_ndata',
+ 'ec_num_parity_fragments': 'ec_nparity',
+ })
+ return options
+
+ def get_info(self, config=False):
+ info = super(ECStoragePolicy, self).get_info(config=config)
+ if not config:
+ info.pop('ec_object_segment_size')
+ info.pop('ec_num_data_fragments')
+ info.pop('ec_num_parity_fragments')
+ info.pop('ec_type')
+ return info
+
+ def _validate_ring(self):
+ """
+ EC specific validation
+
+ Replica count check - we need _at_least_ (#data + #parity) replicas
+ configured. Also if the replica count is larger than exactly that
+ number there's a non-zero risk of error for code that is considering
+ the number of nodes in the primary list from the ring.
+ """
+ if not self.object_ring:
+ raise PolicyError('Ring is not loaded')
+ nodes_configured = self.object_ring.replica_count
+ if nodes_configured != (self.ec_ndata + self.ec_nparity):
+ raise RingValidationError(
+ 'EC ring for policy %s needs to be configured with '
+ 'exactly %d nodes. Got %d.' % (self.name,
+ self.ec_ndata + self.ec_nparity, nodes_configured))
+
+ @property
+ def quorum(self):
+ """
+ Number of successful backend requests needed for the proxy to consider
+ the client request successful.
+
+ The quorum size for EC policies defines the minimum number
+ of data + parity elements required to be able to guarantee
+ the desired fault tolerance, which is the number of data
+ elements supplemented by the minimum number of parity
+ elements required by the chosen erasure coding scheme.
+
+ For example, for Reed-Solomon, the minimum number of parity
+ elements required is 1, and thus the quorum_size requirement
+ is ec_ndata + 1.
+
+ Given that the number of parity elements required is not the same
+ for every erasure coding scheme, consult PyECLib's
+ min_parity_fragments_needed()
+ """
+ return self._ec_quorum_size
class StoragePolicyCollection(object):
@@ -236,9 +568,19 @@ class StoragePolicyCollection(object):
:returns: storage policy, or None if no such policy
"""
# makes it easier for callers to just pass in a header value
- index = int(index) if index else 0
+ if index in ('', None):
+ index = 0
+ else:
+ try:
+ index = int(index)
+ except ValueError:
+ return None
return self.by_index.get(index)
+ @property
+ def legacy(self):
+ return self.get_by_index(None)
+
def get_object_ring(self, policy_idx, swift_dir):
"""
Get the ring object to use to handle a request based on its policy.
@@ -267,10 +609,7 @@ class StoragePolicyCollection(object):
# delete from /info if deprecated
if pol.is_deprecated:
continue
- policy_entry = {}
- policy_entry['name'] = pol.name
- if pol.is_default:
- policy_entry['default'] = pol.is_default
+ policy_entry = pol.get_info()
policy_info.append(policy_entry)
return policy_info
@@ -287,22 +626,10 @@ def parse_storage_policies(conf):
if not section.startswith('storage-policy:'):
continue
policy_index = section.split(':', 1)[1]
- # map config option name to StoragePolicy parameter name
- config_to_policy_option_map = {
- 'name': 'name',
- 'default': 'is_default',
- 'deprecated': 'is_deprecated',
- }
- policy_options = {}
- for config_option, value in conf.items(section):
- try:
- policy_option = config_to_policy_option_map[config_option]
- except KeyError:
- raise PolicyError('Invalid option %r in '
- 'storage-policy section %r' % (
- config_option, section))
- policy_options[policy_option] = value
- policy = StoragePolicy(policy_index, **policy_options)
+ config_options = dict(conf.items(section))
+ policy_type = config_options.pop('policy_type', DEFAULT_POLICY_TYPE)
+ policy_cls = BaseStoragePolicy.policy_type_to_policy_cls[policy_type]
+ policy = policy_cls.from_config(policy_index, config_options)
policies.append(policy)
return StoragePolicyCollection(policies)
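
Taken together, the policy_type registry, _config_options_map() and from_config() let parse_storage_policies() build the right policy class for each swift.conf section. A hedged sketch with illustrative values; the ec_type must be one of the schemes supported by the local PyECLib build:

from ConfigParser import ConfigParser
from StringIO import StringIO

from swift.common.storage_policy import parse_storage_policies

conf = ConfigParser()
conf.readfp(StringIO("""
[storage-policy:0]
name = gold
default = yes

[storage-policy:1]
name = ec104
policy_type = erasure_coding
ec_type = jerasure_rs_vand
ec_num_data_fragments = 10
ec_num_parity_fragments = 4
"""))
policies = parse_storage_policies(conf)
print(policies.get_by_index(0))   # StoragePolicy(0, 'gold', ...)
print(policies.get_by_index(1))   # ECStoragePolicy(1, 'ec104', ... ec_ndata=10, ec_nparity=4)

With these values the EC policy's quorum is ec_ndata plus the minimum number of parity fragments PyECLib reports for the scheme (10 + 1 for Reed-Solomon), while the replication policy's quorum is quorum_size(replica_count) once its ring is loaded, e.g. floor(3 / 2) + 1 = 2 for a 3-replica ring.
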
diff --git a/swift/common/swob.py b/swift/common/swob.py
index 729cdd96f..c2e3afb4e 100644
--- a/swift/common/swob.py
+++ b/swift/common/swob.py
@@ -36,7 +36,7 @@ needs to change.
"""
from collections import defaultdict
-from cStringIO import StringIO
+from StringIO import StringIO
import UserDict
import time
from functools import partial
@@ -128,6 +128,20 @@ class _UTC(tzinfo):
UTC = _UTC()
+class WsgiStringIO(StringIO):
+ """
+ This class adds support for the additional wsgi.input methods defined on
+ eventlet.wsgi.Input to the StringIO class which would otherwise be a fine
+ stand-in for the file-like object in the WSGI environment.
+ """
+
+ def set_hundred_continue_response_headers(self, headers):
+ pass
+
+ def send_hundred_continue_response(self):
+ pass
+
+
def _datetime_property(header):
"""
Set and retrieve the datetime value of self.headers[header]
@@ -743,16 +757,16 @@ def _req_environ_property(environ_field):
def _req_body_property():
"""
Set and retrieve the Request.body parameter. It consumes wsgi.input and
- returns the results. On assignment, uses a StringIO to create a new
+ returns the results. On assignment, uses a WsgiStringIO to create a new
wsgi.input.
"""
def getter(self):
body = self.environ['wsgi.input'].read()
- self.environ['wsgi.input'] = StringIO(body)
+ self.environ['wsgi.input'] = WsgiStringIO(body)
return body
def setter(self, value):
- self.environ['wsgi.input'] = StringIO(value)
+ self.environ['wsgi.input'] = WsgiStringIO(value)
self.environ['CONTENT_LENGTH'] = str(len(value))
return property(getter, setter, doc="Get and set the request body str")
@@ -820,7 +834,7 @@ class Request(object):
:param path: encoded, parsed, and unquoted into PATH_INFO
:param environ: WSGI environ dictionary
:param headers: HTTP headers
- :param body: stuffed in a StringIO and hung on wsgi.input
+ :param body: stuffed in a WsgiStringIO and hung on wsgi.input
:param kwargs: any environ key with an property setter
"""
headers = headers or {}
@@ -855,10 +869,10 @@ class Request(object):
}
env.update(environ)
if body is not None:
- env['wsgi.input'] = StringIO(body)
+ env['wsgi.input'] = WsgiStringIO(body)
env['CONTENT_LENGTH'] = str(len(body))
elif 'wsgi.input' not in env:
- env['wsgi.input'] = StringIO('')
+ env['wsgi.input'] = WsgiStringIO('')
req = Request(env)
for key, val in headers.iteritems():
req.headers[key] = val
@@ -965,7 +979,7 @@ class Request(object):
env.update({
'REQUEST_METHOD': 'GET',
'CONTENT_LENGTH': '0',
- 'wsgi.input': StringIO(''),
+ 'wsgi.input': WsgiStringIO(''),
})
return Request(env)
@@ -1102,10 +1116,12 @@ class Response(object):
app_iter = _resp_app_iter_property()
def __init__(self, body=None, status=200, headers=None, app_iter=None,
- request=None, conditional_response=False, **kw):
+ request=None, conditional_response=False,
+ conditional_etag=None, **kw):
self.headers = HeaderKeyDict(
[('Content-Type', 'text/html; charset=UTF-8')])
self.conditional_response = conditional_response
+ self._conditional_etag = conditional_etag
self.request = request
self.body = body
self.app_iter = app_iter
@@ -1131,6 +1147,26 @@ class Response(object):
if 'charset' in kw and 'content_type' in kw:
self.charset = kw['charset']
+ @property
+ def conditional_etag(self):
+ """
+ The conditional_etag keyword argument for Response will allow the
+ conditional match value of an If-Match request to be compared to a
+ non-standard value.
+
+ This is available for Storage Policies that do not store the client
+ object data verbatim on the storage nodes, but still need to support
+ conditional requests.
+
+ It's most effectively used with X-Backend-Etag-Is-At, which defines
+ the additional metadata key where the original ETag of the
+ clear-form client request data is stored.
+ """
+ if self._conditional_etag is not None:
+ return self._conditional_etag
+ else:
+ return self.etag
+
def _prepare_for_ranges(self, ranges):
"""
Prepare the Response for multiple ranges.
@@ -1161,15 +1197,16 @@ class Response(object):
return content_size, content_type
def _response_iter(self, app_iter, body):
+ etag = self.conditional_etag
if self.conditional_response and self.request:
- if self.etag and self.request.if_none_match and \
- self.etag in self.request.if_none_match:
+ if etag and self.request.if_none_match and \
+ etag in self.request.if_none_match:
self.status = 304
self.content_length = 0
return ['']
- if self.etag and self.request.if_match and \
- self.etag not in self.request.if_match:
+ if etag and self.request.if_match and \
+ etag not in self.request.if_match:
self.status = 412
self.content_length = 0
return ['']
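
A hedged sketch of what conditional_etag enables: comparing If-Match against a value other than the response's own ETag. The header and body values here are illustrative:

from swift.common.swob import Request, Response

req = Request.blank('/v1/a/c/o', headers={'If-Match': 'client-etag'})
resp = Response(request=req, body='fragment archive bytes',
                conditional_response=True,
                conditional_etag='client-etag')
resp.etag = 'fragment-archive-etag'   # what a plain etag check would compare
status, headers, body_iter = req.call_application(resp)
print(status)   # 200, because If-Match matched the conditional_etag
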
diff --git a/swift/common/utils.py b/swift/common/utils.py
index cf7b7e7c5..19dcfd3d6 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -2236,11 +2236,16 @@ class GreenAsyncPile(object):
Correlating results with jobs (if necessary) is left to the caller.
"""
- def __init__(self, size):
+ def __init__(self, size_or_pool):
"""
- :param size: size pool of green threads to use
+ :param size_or_pool: thread pool size or a pool to use
"""
- self._pool = GreenPool(size)
+ if isinstance(size_or_pool, GreenPool):
+ self._pool = size_or_pool
+ size = self._pool.size
+ else:
+ self._pool = GreenPool(size_or_pool)
+ size = size_or_pool
self._responses = eventlet.queue.LightQueue(size)
self._inflight = 0
@@ -2646,6 +2651,10 @@ def public(func):
def quorum_size(n):
"""
+ quorum size as it applies to services that use 'replication' for data
+ integrity (Account/Container services). Object quorum_size is defined
+ on a storage policy basis.
+
Number of successful backend requests needed for the proxy to consider
the client request successful.
"""
@@ -3139,6 +3148,26 @@ _rfc_extension_pattern = re.compile(
r'(?:\s*;\s*(' + _rfc_token + r")\s*(?:=\s*(" + _rfc_token +
r'|"(?:[^"\\]|\\.)*"))?)')
+_content_range_pattern = re.compile(r'^bytes (\d+)-(\d+)/(\d+)$')
+
+
+def parse_content_range(content_range):
+ """
+ Parse a content-range header into (first_byte, last_byte, total_size).
+
+ See RFC 7233 section 4.2 for details on the header format, but it's
+ basically "Content-Range: bytes ${start}-${end}/${total}".
+
+ :param content_range: Content-Range header value to parse,
+ e.g. "bytes 100-1249/49004"
+ :returns: 3-tuple (start, end, total)
+ :raises: ValueError if malformed
+ """
+ found = re.search(_content_range_pattern, content_range)
+ if not found:
+ raise ValueError("malformed Content-Range %r" % (content_range,))
+ return tuple(int(x) for x in found.groups())
+
def parse_content_type(content_type):
"""
@@ -3293,8 +3322,11 @@ def iter_multipart_mime_documents(wsgi_input, boundary, read_chunk_size=4096):
:raises: MimeInvalid if the document is malformed
"""
boundary = '--' + boundary
- if wsgi_input.readline(len(boundary + '\r\n')).strip() != boundary:
- raise swift.common.exceptions.MimeInvalid('invalid starting boundary')
+ blen = len(boundary) + 2 # \r\n
+ got = wsgi_input.readline(blen)
+ if got.strip() != boundary:
+ raise swift.common.exceptions.MimeInvalid(
+ 'invalid starting boundary: wanted %r, got %r' % (boundary, got))
boundary = '\r\n' + boundary
input_buffer = ''
done = False
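
Usage sketch for the new parse_content_range() helper; the header values are illustrative:

from swift.common.utils import parse_content_range

first_byte, last_byte, total = parse_content_range('bytes 100-1249/49004')
assert (first_byte, last_byte, total) == (100, 1249, 49004)

try:
    parse_content_range('bytes */49004')   # unsatisfied-range form
except ValueError as err:
    print(err)   # malformed Content-Range 'bytes */49004'
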
diff --git a/swift/common/wsgi.py b/swift/common/wsgi.py
index b1e1f5ea7..35df2077f 100644
--- a/swift/common/wsgi.py
+++ b/swift/common/wsgi.py
@@ -25,6 +25,7 @@ import time
import mimetools
from swift import gettext_ as _
from StringIO import StringIO
+from textwrap import dedent
import eventlet
import eventlet.debug
@@ -96,13 +97,34 @@ def _loadconfigdir(object_type, uri, path, name, relative_to, global_conf):
loadwsgi._loaders['config_dir'] = _loadconfigdir
+class ConfigString(NamedConfigLoader):
+ """
+ Wrap a raw config string up for paste.deploy.
+
+ If you give one of these to our loadcontext (e.g. give it to our
+ appconfig) we'll intercept it and get it routed to the right loader.
+ """
+
+ def __init__(self, config_string):
+ self.contents = StringIO(dedent(config_string))
+ self.filename = "string"
+ defaults = {
+ 'here': "string",
+ '__file__': "string",
+ }
+ self.parser = loadwsgi.NicerConfigParser("string", defaults=defaults)
+ self.parser.optionxform = str # Don't lower-case keys
+ self.parser.readfp(self.contents)
+
+
def wrap_conf_type(f):
"""
Wrap a function whose first argument is a paste.deploy style config uri,
- such that you can pass it an un-adorned raw filesystem path and the config
- directive (either config: or config_dir:) will be added automatically
- based on the type of filesystem entity at the given path (either a file or
- directory) before passing it through to the paste.deploy function.
+ such that you can pass it an un-adorned raw filesystem path (or config
+ string) and the config directive (either config:, config_dir:, or
+ config_str:) will be added automatically based on the type of entity
+ (either a file or directory, or if no such entity on the file system -
+ just a string) before passing it through to the paste.deploy function.
"""
def wrapper(conf_path, *args, **kwargs):
if os.path.isdir(conf_path):
@@ -332,6 +354,12 @@ class PipelineWrapper(object):
def loadcontext(object_type, uri, name=None, relative_to=None,
global_conf=None):
+ if isinstance(uri, loadwsgi.ConfigLoader):
+ # bypass loadcontext's uri parsing and loader routing and
+ # just directly return the context
+ if global_conf:
+ uri.update_defaults(global_conf, overwrite=False)
+ return uri.get_context(object_type, name, global_conf)
add_conf_type = wrap_conf_type(lambda x: x)
return loadwsgi.loadcontext(object_type, add_conf_type(uri), name=name,
relative_to=relative_to,
diff --git a/swift/container/sync.py b/swift/container/sync.py
index 0f42de6e9..a409de4ac 100644
--- a/swift/container/sync.py
+++ b/swift/container/sync.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import errno
import os
import uuid
from swift import gettext_ as _
@@ -25,8 +26,8 @@ from eventlet import sleep, Timeout
import swift.common.db
from swift.container.backend import ContainerBroker, DATADIR
from swift.common.container_sync_realms import ContainerSyncRealms
-from swift.common.direct_client import direct_get_object
-from swift.common.internal_client import delete_object, put_object
+from swift.common.internal_client import (
+ delete_object, put_object, InternalClient, UnexpectedResponse)
from swift.common.exceptions import ClientException
from swift.common.ring import Ring
from swift.common.ring.utils import is_local_device
@@ -37,6 +38,55 @@ from swift.common.utils import (
from swift.common.daemon import Daemon
from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND
from swift.common.storage_policy import POLICIES
+from swift.common.wsgi import ConfigString
+
+
+# The default internal client config body is to support upgrades without
+# requiring deployment of the new /etc/swift/internal-client.conf
+ic_conf_body = """
+[DEFAULT]
+# swift_dir = /etc/swift
+# user = swift
+# You can specify default log routing here if you want:
+# log_name = swift
+# log_facility = LOG_LOCAL0
+# log_level = INFO
+# log_address = /dev/log
+#
+# comma separated list of functions to call to setup custom log handlers.
+# functions get passed: conf, name, log_to_console, log_route, fmt, logger,
+# adapted_logger
+# log_custom_handlers =
+#
+# If set, log_udp_host will override log_address
+# log_udp_host =
+# log_udp_port = 514
+#
+# You can enable StatsD logging here:
+# log_statsd_host = localhost
+# log_statsd_port = 8125
+# log_statsd_default_sample_rate = 1.0
+# log_statsd_sample_rate_factor = 1.0
+# log_statsd_metric_prefix =
+
+[pipeline:main]
+pipeline = catch_errors proxy-logging cache proxy-server
+
+[app:proxy-server]
+use = egg:swift#proxy
+# See proxy-server.conf-sample for options
+
+[filter:cache]
+use = egg:swift#memcache
+# See proxy-server.conf-sample for options
+
+[filter:proxy-logging]
+use = egg:swift#proxy_logging
+
+[filter:catch_errors]
+use = egg:swift#catch_errors
+# See proxy-server.conf-sample for options
+""".lstrip()
class ContainerSync(Daemon):
@@ -103,12 +153,12 @@ class ContainerSync(Daemon):
loaded. This is overridden by unit tests.
"""
- def __init__(self, conf, container_ring=None):
+ def __init__(self, conf, container_ring=None, logger=None):
#: The dict of configuration values from the [container-sync] section
#: of the container-server.conf.
self.conf = conf
#: Logger to use for container-sync log lines.
- self.logger = get_logger(conf, log_route='container-sync')
+ self.logger = logger or get_logger(conf, log_route='container-sync')
#: Path to the local device mount points.
self.devices = conf.get('devices', '/srv/node')
#: Indicates whether mount points should be verified as actual mount
@@ -159,6 +209,26 @@ class ContainerSync(Daemon):
swift.common.db.DB_PREALLOCATION = \
config_true_value(conf.get('db_preallocation', 'f'))
self.conn_timeout = float(conf.get('conn_timeout', 5))
+ request_tries = int(conf.get('request_tries') or 3)
+
+ internal_client_conf_path = conf.get('internal_client_conf_path')
+ if not internal_client_conf_path:
+ self.logger.warning(
+ _('Configuration option internal_client_conf_path not '
+ 'defined. Using default configuration, See '
+ 'internal-client.conf-sample for options'))
+ internal_client_conf = ConfigString(ic_conf_body)
+ else:
+ internal_client_conf = internal_client_conf_path
+ try:
+ self.swift = InternalClient(
+ internal_client_conf, 'Swift Container Sync', request_tries)
+ except IOError as err:
+ if err.errno != errno.ENOENT:
+ raise
+ raise SystemExit(
+ _('Unable to load internal client from config: %r (%s)') %
+ (internal_client_conf_path, err))
def get_object_ring(self, policy_idx):
"""
@@ -380,39 +450,32 @@ class ContainerSync(Daemon):
looking_for_timestamp = Timestamp(row['created_at'])
timestamp = -1
headers = body = None
- headers_out = {'X-Backend-Storage-Policy-Index':
+ # ask for the newest copy of the object
+ headers_out = {'X-Newest': True,
+ 'X-Backend-Storage-Policy-Index':
str(info['storage_policy_index'])}
- for node in nodes:
- try:
- these_headers, this_body = direct_get_object(
- node, part, info['account'], info['container'],
- row['name'], headers=headers_out,
- resp_chunk_size=65536)
- this_timestamp = Timestamp(
- these_headers['x-timestamp'])
- if this_timestamp > timestamp:
- timestamp = this_timestamp
- headers = these_headers
- body = this_body
- except ClientException as err:
- # If any errors are not 404, make sure we report the
- # non-404 one. We don't want to mistakenly assume the
- # object no longer exists just because one says so and
- # the others errored for some other reason.
- if not exc or getattr(
- exc, 'http_status', HTTP_NOT_FOUND) == \
- HTTP_NOT_FOUND:
- exc = err
- except (Exception, Timeout) as err:
- exc = err
+ try:
+ source_obj_status, source_obj_info, source_obj_iter = \
+ self.swift.get_object(info['account'],
+ info['container'], row['name'],
+ headers=headers_out,
+ acceptable_statuses=(2, 4))
+
+ except (Exception, UnexpectedResponse, Timeout) as err:
+ source_obj_info = {}
+ source_obj_iter = None
+ exc = err
+ timestamp = Timestamp(source_obj_info.get(
+ 'x-timestamp', 0))
+ headers = source_obj_info
+ body = source_obj_iter
if timestamp < looking_for_timestamp:
if exc:
raise exc
raise Exception(
- _('Unknown exception trying to GET: %(node)r '
+ _('Unknown exception trying to GET: '
'%(account)r %(container)r %(object)r'),
- {'node': node, 'part': part,
- 'account': info['account'],
+ {'account': info['account'],
'container': info['container'],
'object': row['name']})
for key in ('date', 'last-modified'):
diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py
index a8d14dfa2..c49d557fe 100644
--- a/swift/obj/diskfile.py
+++ b/swift/obj/diskfile.py
@@ -40,7 +40,7 @@ import hashlib
import logging
import traceback
import xattr
-from os.path import basename, dirname, exists, getmtime, join
+from os.path import basename, dirname, exists, getmtime, join, splitext
from random import shuffle
from tempfile import mkstemp
from contextlib import contextmanager
@@ -50,7 +50,7 @@ from eventlet import Timeout
from eventlet.hubs import trampoline
from swift import gettext_ as _
-from swift.common.constraints import check_mount
+from swift.common.constraints import check_mount, check_dir
from swift.common.request_helpers import is_sys_meta
from swift.common.utils import mkdirs, Timestamp, \
storage_directory, hash_path, renamer, fallocate, fsync, \
@@ -63,7 +63,9 @@ from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir, \
ReplicationLockTimeout, DiskFileExpired, DiskFileXattrNotSupported
from swift.common.swob import multi_range_iterator
-from swift.common.storage_policy import get_policy_string, POLICIES
+from swift.common.storage_policy import (
+ get_policy_string, split_policy_string, PolicyError, POLICIES,
+ REPL_POLICY, EC_POLICY)
from functools import partial
@@ -154,10 +156,10 @@ def write_metadata(fd, metadata, xattr_size=65536):
raise
-def extract_policy_index(obj_path):
+def extract_policy(obj_path):
"""
- Extracts the policy index for an object (based on the name of the objects
- directory) given the device-relative path to the object. Returns 0 in
+ Extracts the policy for an object (based on the name of the objects
+ directory) given the device-relative path to the object. Returns None in
the event that the path is malformed in some way.
The device-relative path is everything after the mount point; for example:
@@ -170,19 +172,18 @@ def extract_policy_index(obj_path):
objects-5/179/485dc017205a81df3af616d917c90179/1401811134.873649.data
:param obj_path: device-relative path of an object
- :returns: storage policy index
+ :returns: a :class:`~swift.common.storage_policy.BaseStoragePolicy` or None
"""
- policy_idx = 0
try:
obj_portion = obj_path[obj_path.index(DATADIR_BASE):]
obj_dirname = obj_portion[:obj_portion.index('/')]
except Exception:
- return policy_idx
- if '-' in obj_dirname:
- base, policy_idx = obj_dirname.split('-', 1)
- if POLICIES.get_by_index(policy_idx) is None:
- policy_idx = 0
- return int(policy_idx)
+ return None
+ try:
+ base, policy = split_policy_string(obj_dirname)
+ except PolicyError:
+ return None
+ return policy
def quarantine_renamer(device_path, corrupted_file_path):
@@ -197,9 +198,13 @@ def quarantine_renamer(device_path, corrupted_file_path):
:raises OSError: re-raises non errno.EEXIST / errno.ENOTEMPTY
exceptions from rename
"""
+ policy = extract_policy(corrupted_file_path)
+ if policy is None:
+ # TODO: support a quarantine-unknown location
+ policy = POLICIES.legacy
from_dir = dirname(corrupted_file_path)
to_dir = join(device_path, 'quarantined',
- get_data_dir(extract_policy_index(corrupted_file_path)),
+ get_data_dir(policy),
basename(from_dir))
invalidate_hash(dirname(from_dir))
try:
@@ -429,8 +434,9 @@ class AuditLocation(object):
stringify to a filesystem path so the auditor's logs look okay.
"""
- def __init__(self, path, device, partition):
- self.path, self.device, self.partition = path, device, partition
+ def __init__(self, path, device, partition, policy):
+ self.path, self.device, self.partition, self.policy = (
+ path, device, partition, policy)
def __str__(self):
return str(self.path)
@@ -470,19 +476,17 @@ def object_audit_location_generator(devices, mount_check=True, logger=None,
_('Skipping %s as it is not mounted'), device)
continue
# loop through object dirs for all policies
- for dir in [dir for dir in os.listdir(os.path.join(devices, device))
- if dir.startswith(DATADIR_BASE)]:
- datadir_path = os.path.join(devices, device, dir)
- # warn if the object dir doesn't match with a policy
- policy_idx = 0
- if '-' in dir:
- base, policy_idx = dir.split('-', 1)
+ for dir_ in os.listdir(os.path.join(devices, device)):
+ if not dir_.startswith(DATADIR_BASE):
+ continue
try:
- get_data_dir(policy_idx)
- except ValueError:
+ base, policy = split_policy_string(dir_)
+ except PolicyError as e:
if logger:
- logger.warn(_('Directory %s does not map to a '
- 'valid policy') % dir)
+ logger.warn(_('Directory %r does not map '
+ 'to a valid policy (%s)') % (dir_, e))
+ continue
+ datadir_path = os.path.join(devices, device, dir_)
partitions = listdir(datadir_path)
for partition in partitions:
part_path = os.path.join(datadir_path, partition)
@@ -502,9 +506,50 @@ def object_audit_location_generator(devices, mount_check=True, logger=None,
continue
for hsh in hashes:
hsh_path = os.path.join(suff_path, hsh)
- yield AuditLocation(hsh_path, device, partition)
+ yield AuditLocation(hsh_path, device, partition,
+ policy)
+
+
+def strip_self(f):
+ """
+ Wrapper to attach module level functions to base class.
+ """
+ def wrapper(self, *args, **kwargs):
+ return f(*args, **kwargs)
+ return wrapper
+
+class DiskFileRouter(object):
+ policy_type_to_manager_cls = {}
+
+ @classmethod
+ def register(cls, policy_type):
+ """
+ Decorator for Storage Policy implementations to register
+ their DiskFile implementation.
+ """
+ def register_wrapper(diskfile_cls):
+ if policy_type in cls.policy_type_to_manager_cls:
+ raise PolicyError(
+ '%r is already registered for the policy_type %r' % (
+ cls.policy_type_to_manager_cls[policy_type],
+ policy_type))
+ cls.policy_type_to_manager_cls[policy_type] = diskfile_cls
+ return diskfile_cls
+ return register_wrapper
+
+ def __init__(self, *args, **kwargs):
+ self.policy_to_manager = {}
+ for policy in POLICIES:
+ manager_cls = self.policy_type_to_manager_cls[policy.policy_type]
+ self.policy_to_manager[policy] = manager_cls(*args, **kwargs)
+
+ def __getitem__(self, policy):
+ return self.policy_to_manager[policy]
+
+
+@DiskFileRouter.register(REPL_POLICY)
class DiskFileManager(object):
"""
Management class for devices, providing common place for shared parameters
@@ -527,6 +572,16 @@ class DiskFileManager(object):
:param conf: caller provided configuration object
:param logger: caller provided logger
"""
+
+ diskfile_cls = None # DiskFile will be set after that class is defined
+
+ # module level functions dropped to implementation specific
+ hash_cleanup_listdir = strip_self(hash_cleanup_listdir)
+ _get_hashes = strip_self(get_hashes)
+ invalidate_hash = strip_self(invalidate_hash)
+ get_ondisk_files = strip_self(get_ondisk_files)
+ quarantine_renamer = strip_self(quarantine_renamer)
+
def __init__(self, conf, logger):
self.logger = logger
self.devices = conf.get('devices', '/srv/node')
@@ -583,21 +638,25 @@ class DiskFileManager(object):
def get_dev_path(self, device, mount_check=None):
"""
- Return the path to a device, checking to see that it is a proper mount
- point based on a configuration parameter.
+ Return the path to a device, first checking to see if either it
+ is a proper mount point, or at least a directory depending on
+ the mount_check configuration option.
:param device: name of target device
:param mount_check: whether or not to check mountedness of device.
Defaults to bool(self.mount_check).
:returns: full path to the device, None if the path to the device is
- not a proper mount point.
+ not a proper mount point or directory.
"""
- should_check = self.mount_check if mount_check is None else mount_check
- if should_check and not check_mount(self.devices, device):
- dev_path = None
- else:
- dev_path = os.path.join(self.devices, device)
- return dev_path
+ # we'll do some kind of check unless explicitly forbidden
+ if mount_check is not False:
+ if mount_check or self.mount_check:
+ check = check_mount
+ else:
+ check = check_dir
+ if not check(self.devices, device):
+ return None
+ return os.path.join(self.devices, device)
@contextmanager
def replication_lock(self, device):
@@ -619,28 +678,27 @@ class DiskFileManager(object):
yield True
def pickle_async_update(self, device, account, container, obj, data,
- timestamp, policy_idx):
+ timestamp, policy):
device_path = self.construct_dev_path(device)
- async_dir = os.path.join(device_path, get_async_dir(policy_idx))
+ async_dir = os.path.join(device_path, get_async_dir(policy))
ohash = hash_path(account, container, obj)
self.threadpools[device].run_in_thread(
write_pickle,
data,
os.path.join(async_dir, ohash[-3:], ohash + '-' +
Timestamp(timestamp).internal),
- os.path.join(device_path, get_tmp_dir(policy_idx)))
+ os.path.join(device_path, get_tmp_dir(policy)))
self.logger.increment('async_pendings')
def get_diskfile(self, device, partition, account, container, obj,
- policy_idx=0, **kwargs):
+ policy, **kwargs):
dev_path = self.get_dev_path(device)
if not dev_path:
raise DiskFileDeviceUnavailable()
- return DiskFile(self, dev_path, self.threadpools[device],
- partition, account, container, obj,
- policy_idx=policy_idx,
- use_splice=self.use_splice, pipe_size=self.pipe_size,
- **kwargs)
+ return self.diskfile_cls(self, dev_path, self.threadpools[device],
+ partition, account, container, obj,
+ policy=policy, use_splice=self.use_splice,
+ pipe_size=self.pipe_size, **kwargs)
def object_audit_location_generator(self, device_dirs=None):
return object_audit_location_generator(self.devices, self.mount_check,
@@ -648,12 +706,12 @@ class DiskFileManager(object):
def get_diskfile_from_audit_location(self, audit_location):
dev_path = self.get_dev_path(audit_location.device, mount_check=False)
- return DiskFile.from_hash_dir(
+ return self.diskfile_cls.from_hash_dir(
self, audit_location.path, dev_path,
- audit_location.partition)
+ audit_location.partition, policy=audit_location.policy)
def get_diskfile_from_hash(self, device, partition, object_hash,
- policy_idx, **kwargs):
+ policy, **kwargs):
"""
Returns a DiskFile instance for an object at the given
object_hash. Just in case someone thinks of refactoring, be
@@ -667,13 +725,14 @@ class DiskFileManager(object):
if not dev_path:
raise DiskFileDeviceUnavailable()
object_path = os.path.join(
- dev_path, get_data_dir(policy_idx), partition, object_hash[-3:],
+ dev_path, get_data_dir(policy), str(partition), object_hash[-3:],
object_hash)
try:
- filenames = hash_cleanup_listdir(object_path, self.reclaim_age)
+ filenames = self.hash_cleanup_listdir(object_path,
+ self.reclaim_age)
except OSError as err:
if err.errno == errno.ENOTDIR:
- quar_path = quarantine_renamer(dev_path, object_path)
+ quar_path = self.quarantine_renamer(dev_path, object_path)
logging.exception(
_('Quarantined %(object_path)s to %(quar_path)s because '
'it is not a directory'), {'object_path': object_path,
@@ -693,21 +752,20 @@ class DiskFileManager(object):
metadata.get('name', ''), 3, 3, True)
except ValueError:
raise DiskFileNotExist()
- return DiskFile(self, dev_path, self.threadpools[device],
- partition, account, container, obj,
- policy_idx=policy_idx, **kwargs)
+ return self.diskfile_cls(self, dev_path, self.threadpools[device],
+ partition, account, container, obj,
+ policy=policy, **kwargs)
- def get_hashes(self, device, partition, suffix, policy_idx):
+ def get_hashes(self, device, partition, suffixes, policy):
dev_path = self.get_dev_path(device)
if not dev_path:
raise DiskFileDeviceUnavailable()
- partition_path = os.path.join(dev_path, get_data_dir(policy_idx),
+ partition_path = os.path.join(dev_path, get_data_dir(policy),
partition)
if not os.path.exists(partition_path):
mkdirs(partition_path)
- suffixes = suffix.split('-') if suffix else []
_junk, hashes = self.threadpools[device].force_run_in_thread(
- get_hashes, partition_path, recalculate=suffixes)
+ self._get_hashes, partition_path, recalculate=suffixes)
return hashes
def _listdir(self, path):
@@ -720,7 +778,7 @@ class DiskFileManager(object):
path, err)
return []
- def yield_suffixes(self, device, partition, policy_idx):
+ def yield_suffixes(self, device, partition, policy):
"""
Yields tuples of (full_path, suffix_only) for suffixes stored
on the given device and partition.
@@ -728,7 +786,7 @@ class DiskFileManager(object):
dev_path = self.get_dev_path(device)
if not dev_path:
raise DiskFileDeviceUnavailable()
- partition_path = os.path.join(dev_path, get_data_dir(policy_idx),
+ partition_path = os.path.join(dev_path, get_data_dir(policy),
partition)
for suffix in self._listdir(partition_path):
if len(suffix) != 3:
@@ -739,7 +797,7 @@ class DiskFileManager(object):
continue
yield (os.path.join(partition_path, suffix), suffix)
- def yield_hashes(self, device, partition, policy_idx, suffixes=None):
+ def yield_hashes(self, device, partition, policy, suffixes=None, **kwargs):
"""
Yields tuples of (full_path, hash_only, timestamp) for object
information stored for the given device, partition, and
@@ -752,17 +810,18 @@ class DiskFileManager(object):
if not dev_path:
raise DiskFileDeviceUnavailable()
if suffixes is None:
- suffixes = self.yield_suffixes(device, partition, policy_idx)
+ suffixes = self.yield_suffixes(device, partition, policy)
else:
- partition_path = os.path.join(dev_path, get_data_dir(policy_idx),
- partition)
+ partition_path = os.path.join(dev_path,
+ get_data_dir(policy),
+ str(partition))
suffixes = (
(os.path.join(partition_path, suffix), suffix)
for suffix in suffixes)
for suffix_path, suffix in suffixes:
for object_hash in self._listdir(suffix_path):
object_path = os.path.join(suffix_path, object_hash)
- for name in hash_cleanup_listdir(
+ for name in self.hash_cleanup_listdir(
object_path, self.reclaim_age):
ts, ext = name.rsplit('.', 1)
yield (object_path, object_hash, ts)
@@ -794,8 +853,11 @@ class DiskFileWriter(object):
:param tmppath: full path name of the opened file descriptor
:param bytes_per_sync: number bytes written between sync calls
:param threadpool: internal thread pool to use for disk operations
+ :param diskfile: the diskfile creating this DiskFileWriter instance
"""
- def __init__(self, name, datadir, fd, tmppath, bytes_per_sync, threadpool):
+
+ def __init__(self, name, datadir, fd, tmppath, bytes_per_sync, threadpool,
+ diskfile):
# Parameter tracking
self._name = name
self._datadir = datadir
@@ -803,6 +865,7 @@ class DiskFileWriter(object):
self._tmppath = tmppath
self._bytes_per_sync = bytes_per_sync
self._threadpool = threadpool
+ self._diskfile = diskfile
# Internal attributes
self._upload_size = 0
@@ -811,6 +874,10 @@ class DiskFileWriter(object):
self._put_succeeded = False
@property
+ def manager(self):
+ return self._diskfile.manager
+
+ @property
def put_succeeded(self):
return self._put_succeeded
@@ -855,7 +922,7 @@ class DiskFileWriter(object):
# drop_cache() after fsync() to avoid redundant work (pages all
# clean).
drop_buffer_cache(self._fd, 0, self._upload_size)
- invalidate_hash(dirname(self._datadir))
+ self.manager.invalidate_hash(dirname(self._datadir))
# After the rename completes, this object will be available for other
# requests to reference.
renamer(self._tmppath, target_path)
@@ -864,7 +931,7 @@ class DiskFileWriter(object):
# succeeded, the tempfile would no longer exist at its original path.
self._put_succeeded = True
try:
- hash_cleanup_listdir(self._datadir)
+ self.manager.hash_cleanup_listdir(self._datadir)
except OSError:
logging.exception(_('Problem cleaning up %s'), self._datadir)
@@ -887,6 +954,16 @@ class DiskFileWriter(object):
self._threadpool.force_run_in_thread(
self._finalize_put, metadata, target_path)
+ def commit(self, timestamp):
+ """
+ Perform any operations necessary to mark the object as durable. For
+ replication policy type this is a no-op.
+
+ :param timestamp: object put timestamp, an instance of
+ :class:`~swift.common.utils.Timestamp`
+ """
+ pass
+
class DiskFileReader(object):
"""
@@ -917,17 +994,20 @@ class DiskFileReader(object):
:param quarantine_hook: 1-arg callable called w/reason when quarantined
:param use_splice: if true, use zero-copy splice() to send data
:param pipe_size: size of pipe buffer used in zero-copy operations
+ :param diskfile: the diskfile creating this DiskFileReader instance
:param keep_cache: should resulting reads be kept in the buffer cache
"""
def __init__(self, fp, data_file, obj_size, etag, threadpool,
disk_chunk_size, keep_cache_size, device_path, logger,
- quarantine_hook, use_splice, pipe_size, keep_cache=False):
+ quarantine_hook, use_splice, pipe_size, diskfile,
+ keep_cache=False):
# Parameter tracking
self._fp = fp
self._data_file = data_file
self._obj_size = obj_size
self._etag = etag
self._threadpool = threadpool
+ self._diskfile = diskfile
self._disk_chunk_size = disk_chunk_size
self._device_path = device_path
self._logger = logger
@@ -950,6 +1030,10 @@ class DiskFileReader(object):
self._suppress_file_closing = False
self._quarantined_dir = None
+ @property
+ def manager(self):
+ return self._diskfile.manager
+
def __iter__(self):
"""Returns an iterator over the data file."""
try:
@@ -1130,7 +1214,8 @@ class DiskFileReader(object):
def _quarantine(self, msg):
self._quarantined_dir = self._threadpool.run_in_thread(
- quarantine_renamer, self._device_path, self._data_file)
+ self.manager.quarantine_renamer, self._device_path,
+ self._data_file)
self._logger.warn("Quarantined object %s: %s" % (
self._data_file, msg))
self._logger.increment('quarantines')
@@ -1196,15 +1281,18 @@ class DiskFile(object):
:param container: container name for the object
:param obj: object name for the object
:param _datadir: override the full datadir otherwise constructed here
- :param policy_idx: used to get the data dir when constructing it here
+ :param policy: the StoragePolicy instance
:param use_splice: if true, use zero-copy splice() to send data
:param pipe_size: size of pipe buffer used in zero-copy operations
"""
+ reader_cls = DiskFileReader
+ writer_cls = DiskFileWriter
+
def __init__(self, mgr, device_path, threadpool, partition,
account=None, container=None, obj=None, _datadir=None,
- policy_idx=0, use_splice=False, pipe_size=None):
- self._mgr = mgr
+ policy=None, use_splice=False, pipe_size=None, **kwargs):
+ self._manager = mgr
self._device_path = device_path
self._threadpool = threadpool or ThreadPool(nthreads=0)
self._logger = mgr.logger
@@ -1212,6 +1300,7 @@ class DiskFile(object):
self._bytes_per_sync = mgr.bytes_per_sync
self._use_splice = use_splice
self._pipe_size = pipe_size
+ self.policy = policy
if account and container and obj:
self._name = '/' + '/'.join((account, container, obj))
self._account = account
@@ -1219,7 +1308,7 @@ class DiskFile(object):
self._obj = obj
name_hash = hash_path(account, container, obj)
self._datadir = join(
- device_path, storage_directory(get_data_dir(policy_idx),
+ device_path, storage_directory(get_data_dir(policy),
partition, name_hash))
else:
# gets populated when we read the metadata
@@ -1228,7 +1317,7 @@ class DiskFile(object):
self._container = None
self._obj = None
self._datadir = None
- self._tmpdir = join(device_path, get_tmp_dir(policy_idx))
+ self._tmpdir = join(device_path, get_tmp_dir(policy))
self._metadata = None
self._data_file = None
self._fp = None
@@ -1239,10 +1328,14 @@ class DiskFile(object):
else:
name_hash = hash_path(account, container, obj)
self._datadir = join(
- device_path, storage_directory(get_data_dir(policy_idx),
+ device_path, storage_directory(get_data_dir(policy),
partition, name_hash))
@property
+ def manager(self):
+ return self._manager
+
+ @property
def account(self):
return self._account
@@ -1267,8 +1360,9 @@ class DiskFile(object):
return Timestamp(self._metadata.get('X-Timestamp'))
@classmethod
- def from_hash_dir(cls, mgr, hash_dir_path, device_path, partition):
- return cls(mgr, device_path, None, partition, _datadir=hash_dir_path)
+ def from_hash_dir(cls, mgr, hash_dir_path, device_path, partition, policy):
+ return cls(mgr, device_path, None, partition, _datadir=hash_dir_path,
+ policy=policy)
def open(self):
"""
@@ -1307,7 +1401,7 @@ class DiskFile(object):
.. note::
- An implemenation shall raise `DiskFileNotOpen` when has not
+ An implementation shall raise `DiskFileNotOpen` when it has not
previously invoked the :func:`swift.obj.diskfile.DiskFile.open`
method.
"""
@@ -1339,7 +1433,7 @@ class DiskFile(object):
:returns: DiskFileQuarantined exception object
"""
self._quarantined_dir = self._threadpool.run_in_thread(
- quarantine_renamer, self._device_path, data_file)
+ self.manager.quarantine_renamer, self._device_path, data_file)
self._logger.warn("Quarantined object %s: %s" % (
data_file, msg))
self._logger.increment('quarantines')
@@ -1384,7 +1478,7 @@ class DiskFile(object):
# The data directory does not exist, so the object cannot exist.
fileset = (None, None, None)
else:
- fileset = get_ondisk_files(files, self._datadir)
+ fileset = self.manager.get_ondisk_files(files, self._datadir)
return fileset
def _construct_exception_from_ts_file(self, ts_file):
@@ -1576,12 +1670,12 @@ class DiskFile(object):
Not needed by the REST layer.
:returns: a :class:`swift.obj.diskfile.DiskFileReader` object
"""
- dr = DiskFileReader(
+ dr = self.reader_cls(
self._fp, self._data_file, int(self._metadata['Content-Length']),
self._metadata['ETag'], self._threadpool, self._disk_chunk_size,
- self._mgr.keep_cache_size, self._device_path, self._logger,
+ self._manager.keep_cache_size, self._device_path, self._logger,
use_splice=self._use_splice, quarantine_hook=_quarantine_hook,
- pipe_size=self._pipe_size, keep_cache=keep_cache)
+ pipe_size=self._pipe_size, diskfile=self, keep_cache=keep_cache)
# At this point the reader object is now responsible for closing
# the file pointer.
self._fp = None
@@ -1621,8 +1715,10 @@ class DiskFile(object):
if err.errno in (errno.ENOSPC, errno.EDQUOT):
raise DiskFileNoSpace()
raise
- dfw = DiskFileWriter(self._name, self._datadir, fd, tmppath,
- self._bytes_per_sync, self._threadpool)
+ dfw = self.writer_cls(self._name, self._datadir, fd, tmppath,
+ bytes_per_sync=self._bytes_per_sync,
+ threadpool=self._threadpool,
+ diskfile=self)
yield dfw
finally:
try:
@@ -1671,8 +1767,619 @@ class DiskFile(object):
:raises DiskFileError: this implementation will raise the same
errors as the `create()` method.
"""
- timestamp = Timestamp(timestamp).internal
-
+ # this is dumb, only tests send in strings
+ timestamp = Timestamp(timestamp)
with self.create() as deleter:
deleter._extension = '.ts'
- deleter.put({'X-Timestamp': timestamp})
+ deleter.put({'X-Timestamp': timestamp.internal})
+
+# TODO: move DiskFileManager definition down here
+DiskFileManager.diskfile_cls = DiskFile
+
+
+class ECDiskFileReader(DiskFileReader):
+ pass
+
+
+class ECDiskFileWriter(DiskFileWriter):
+
+ def _finalize_durable(self, durable_file_path):
+ exc = msg = None
+ try:
+ with open(durable_file_path, 'w') as _fd:
+ fsync(_fd)
+ try:
+ self.manager.hash_cleanup_listdir(self._datadir)
+ except OSError:
+ self.manager.logger.exception(
+ _('Problem cleaning up %s'), self._datadir)
+ except OSError:
+ msg = (_('Problem fsyncing durable state file: %s'),
+ durable_file_path)
+ exc = DiskFileError(msg)
+ except IOError as io_err:
+ if io_err.errno in (errno.ENOSPC, errno.EDQUOT):
+ msg = (_("No space left on device for %s"),
+ durable_file_path)
+ exc = DiskFileNoSpace()
+ else:
+ msg = (_('Problem writing durable state file: %s'),
+ durable_file_path)
+ exc = DiskFileError(msg)
+ if exc:
+ self.manager.logger.exception(msg)
+ raise exc
+
+ def commit(self, timestamp):
+ """
+ Finalize put by writing a timestamp.durable file for the object. We
+ do this for EC policy because it requires a 2-phase put commit
+ confirmation.
+
+ :param timestamp: object put timestamp, an instance of
+ :class:`~swift.common.utils.Timestamp`
+ """
+ durable_file_path = os.path.join(
+ self._datadir, timestamp.internal + '.durable')
+ self._threadpool.force_run_in_thread(
+ self._finalize_durable, durable_file_path)
+
+ def put(self, metadata):
+ """
+ The only difference between this method and the replication policy
+ DiskFileWriter method is the call into manager.make_on_disk_filename
+ to construct the data file name.
+ """
+ timestamp = Timestamp(metadata['X-Timestamp'])
+ fi = None
+ if self._extension == '.data':
+ # generally we treat the fragment index provided in metadata as
+ # canonical, but if it's unavailable (e.g. tests) it's reasonable to
+ # use the frag_index provided at instantiation. Either way make
+ # sure that the fragment index is included in object sysmeta.
+ fi = metadata.setdefault('X-Object-Sysmeta-Ec-Frag-Index',
+ self._diskfile._frag_index)
+ filename = self.manager.make_on_disk_filename(
+ timestamp, self._extension, frag_index=fi)
+ metadata['name'] = self._name
+ target_path = join(self._datadir, filename)
+
+ self._threadpool.force_run_in_thread(
+ self._finalize_put, metadata, target_path)
+
+
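Taken together, ECDiskFileWriter.put() and commit() leave both a fragment archive and a zero-byte durable marker in the object's hash directory; the fragment index is encoded only in the .data name, while the marker carries just the timestamp. An illustrative listing (timestamp and index invented):

# Contents of an object hash dir after a committed EC PUT (illustrative):
['1425943708.33523#2.data',     # fragment archive for frag index 2 (put)
 '1425943708.33523.durable']    # empty marker fsync'd by commit()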
+class ECDiskFile(DiskFile):
+
+ reader_cls = ECDiskFileReader
+ writer_cls = ECDiskFileWriter
+
+ def __init__(self, *args, **kwargs):
+ super(ECDiskFile, self).__init__(*args, **kwargs)
+ frag_index = kwargs.get('frag_index')
+ self._frag_index = None
+ if frag_index is not None:
+ self._frag_index = self.manager.validate_fragment_index(frag_index)
+
+ def _get_ondisk_file(self):
+ """
+ The only difference between this method and the replication policy
+ DiskFile method is passing in the frag_index kwarg to our manager's
+ get_ondisk_files method.
+ """
+ try:
+ files = os.listdir(self._datadir)
+ except OSError as err:
+ if err.errno == errno.ENOTDIR:
+ # If there's a file here instead of a directory, quarantine
+ # it; something's gone wrong somewhere.
+ raise self._quarantine(
+ # hack: quarantine_renamer actually renames the directory
+ # enclosing the filename you give it, but here we just
+ # want this one file and not its parent.
+ os.path.join(self._datadir, "made-up-filename"),
+ "Expected directory, found file at %s" % self._datadir)
+ elif err.errno != errno.ENOENT:
+ raise DiskFileError(
+ "Error listing directory %s: %s" % (self._datadir, err))
+ # The data directory does not exist, so the object cannot exist.
+ fileset = (None, None, None)
+ else:
+ fileset = self.manager.get_ondisk_files(
+ files, self._datadir, frag_index=self._frag_index)
+ return fileset
+
+ def purge(self, timestamp, frag_index):
+ """
+ Remove a tombstone file matching the specified timestamp or
+ datafile matching the specified timestamp and fragment index
+ from the object directory.
+
+ This provides the EC reconstructor/ssync process with a way to
+ remove a tombstone or fragment from a handoff node after
+ reverting it to its primary node.
+
+ The hash will be invalidated, and if empty or invalid the
+ hsh_path will be removed on next hash_cleanup_listdir.
+
+ :param timestamp: the object timestamp, an instance of
+ :class:`~swift.common.utils.Timestamp`
+ :param frag_index: a fragment archive index, must be a whole number.
+ """
+ for ext in ('.data', '.ts'):
+ purge_file = self.manager.make_on_disk_filename(
+ timestamp, ext=ext, frag_index=frag_index)
+ remove_file(os.path.join(self._datadir, purge_file))
+ self.manager.invalidate_hash(dirname(self._datadir))
+
+
+@DiskFileRouter.register(EC_POLICY)
+class ECDiskFileManager(DiskFileManager):
+ diskfile_cls = ECDiskFile
+
+ def validate_fragment_index(self, frag_index):
+ """
+ Return int representation of frag_index, or raise a DiskFileError if
+ frag_index is not a whole number.
+ """
+ try:
+ frag_index = int(str(frag_index))
+ except (ValueError, TypeError) as e:
+ raise DiskFileError(
+ 'Bad fragment index: %s: %s' % (frag_index, e))
+ if frag_index < 0:
+ raise DiskFileError(
+ 'Fragment index must not be negative: %s' % frag_index)
+ return frag_index
+
+ def make_on_disk_filename(self, timestamp, ext=None, frag_index=None,
+ *a, **kw):
+ """
+ Returns the EC specific filename for given timestamp.
+
+ :param timestamp: the object timestamp, an instance of
+ :class:`~swift.common.utils.Timestamp`
+ :param ext: an optional string representing a file extension to be
+ appended to the returned file name
+ :param frag_index: a fragment archive index, used with .data extension
+ only, must be a whole number.
+ :returns: a file name
+ :raises DiskFileError: if ext=='.data' and the kwarg frag_index is not
+ a whole number
+ """
+ rv = timestamp.internal
+ if ext == '.data':
+ # for datafiles only we encode the fragment index in the filename
+ # to allow archives of different indexes to temporarily be stored
+ # on the same node in certain situations
+ frag_index = self.validate_fragment_index(frag_index)
+ rv += '#' + str(frag_index)
+ if ext:
+ rv = '%s%s' % (rv, ext)
+ return rv
+
+ def parse_on_disk_filename(self, filename):
+ """
+ Parses a policy specific .data file name into its component parts.
+ For EC policy the data file name includes a fragment index which must
+ be stripped off to retrieve the timestamp.
+
+ :param filename: the data file name including extension
+ :returns: a dict, with keys for timestamp, frag_index, and ext::
+
+ * timestamp is a :class:`~swift.common.utils.Timestamp`
+ * frag_index is an int or None
+ * ext is a string, the file extension including the leading dot or
+ the empty string if the filename has no extension.
+
+ :raises DiskFileError: if any part of the filename is not able to be
+ validated.
+ """
+ frag_index = None
+ filename, ext = splitext(filename)
+ parts = filename.split('#', 1)
+ timestamp = parts[0]
+ if ext == '.data':
+ # it is an error for an EC data file to not have a valid
+ # fragment index
+ try:
+ frag_index = parts[1]
+ except IndexError:
+ frag_index = None
+ frag_index = self.validate_fragment_index(frag_index)
+ return {
+ 'timestamp': Timestamp(timestamp),
+ 'frag_index': frag_index,
+ 'ext': ext,
+ }
+
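make_on_disk_filename() and parse_on_disk_filename() are inverses of each other for the EC layout. A quick round trip as a sketch, assuming conf and logger are available and using an invented timestamp:

from swift.common.utils import Timestamp

mgr = ECDiskFileManager(conf, logger)    # conf/logger assumed available
ts = Timestamp('1425943708.33523')
name = mgr.make_on_disk_filename(ts, '.data', frag_index=7)
# name == '1425943708.33523#7.data'
info = mgr.parse_on_disk_filename(name)
# info == {'timestamp': Timestamp('1425943708.33523'),
#          'frag_index': 7, 'ext': '.data'}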
+ def is_obsolete(self, filename, other_filename):
+ """
+ Test if a given file is considered to be obsolete with respect to
+ another file in an object storage dir.
+
+ Implements EC policy specific behavior when comparing files against a
+ .durable file.
+
+ A simple string comparison would consider t2#1.data to be older than
+ t2.durable (since t2#1.data < t2.durable). By stripping off the file
+ extensions we get the desired behavior: t2#1 > t2 without compromising
+ the detection of t1#1 < t2.
+
+ :param filename: a string representing an absolute filename
+ :param other_filename: a string representing an absolute filename
+ :returns: True if filename is considered obsolete, False otherwise.
+ """
+ if other_filename.endswith('.durable'):
+ return splitext(filename)[0] < splitext(other_filename)[0]
+ return filename < other_filename
+
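A concrete illustration of why the extensions must be stripped before comparing, using invented names and the mgr instance from the previous sketch:

# Raw string order is misleading because '#' sorts before '.':
'1425943708.33523#1.data' < '1425943708.33523.durable'                  # True
# With extensions stripped the same-timestamp fragment is kept...
mgr.is_obsolete('1425943708.33523#1.data', '1425943708.33523.durable')  # False
# ...while a genuinely older fragment is still obsolete:
mgr.is_obsolete('1425943707.00000#1.data', '1425943708.33523.durable')  # True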
+ def _gather_on_disk_file(self, filename, ext, context, frag_index=None,
+ **kwargs):
+ """
+ Called by gather_ondisk_files() for each file in an object
+ datadir in reverse sorted order. If a file is considered part of a
+ valid on-disk file set it will be added to the context dict, keyed by
+ its extension. If a file is considered to be obsolete it will be added
+ to a list stored under the key 'obsolete' in the context dict.
+
+ :param filename: name of file to be accepted or not
+ :param ext: extension part of filename
+ :param context: a context dict that may have been populated by previous
+ calls to this method
+ :param frag_index: if set, search for a specific fragment index .data
+ file, otherwise accept the first valid .data file.
+ :returns: True if a valid file set has been found, False otherwise
+ """
+
+ # if first file with given extension then add filename to context
+ # dict and return True
+ accept_first = lambda: context.setdefault(ext, filename) == filename
+ # add the filename to the list of obsolete files in context dict
+ discard = lambda: context.setdefault('obsolete', []).append(filename)
+ # set a flag in the context dict indicating that a valid fileset has
+ # been found
+ set_valid_fileset = lambda: context.setdefault('found_valid', True)
+ # return True if the valid fileset flag is set in the context dict
+ have_valid_fileset = lambda: context.get('found_valid')
+
+ if context.get('.durable'):
+ # a .durable file has been found
+ if ext == '.data':
+ if self.is_obsolete(filename, context.get('.durable')):
+ # this and remaining data files are older than durable
+ discard()
+ set_valid_fileset()
+ else:
+ # accept the first .data file if it matches requested
+ # frag_index, or if no specific frag_index is requested
+ fi = self.parse_on_disk_filename(filename)['frag_index']
+ if frag_index is None or frag_index == int(fi):
+ accept_first()
+ set_valid_fileset()
+ # else: keep searching for a .data file to match frag_index
+ context.setdefault('fragments', []).append(filename)
+ else:
+ # there can no longer be a matching .data file so mark what has
+ # been found so far as the valid fileset
+ discard()
+ set_valid_fileset()
+ elif ext == '.data':
+ # not yet found a .durable
+ if have_valid_fileset():
+ # valid fileset means we must have a newer
+ # .ts, so discard the older .data file
+ discard()
+ else:
+ # .data newer than a .durable or .ts, don't discard yet
+ context.setdefault('fragments_without_durable', []).append(
+ filename)
+ elif ext == '.ts':
+ if have_valid_fileset() or not accept_first():
+ # newer .data, .durable or .ts already found so discard this
+ discard()
+ if not have_valid_fileset():
+ # remove any .meta that may have been previously found
+ context['.meta'] = None
+ set_valid_fileset()
+ elif ext in ('.meta', '.durable'):
+ if have_valid_fileset() or not accept_first():
+ # newer .data, .durable or .ts already found so discard this
+ discard()
+ else:
+ # ignore unexpected files
+ pass
+ return have_valid_fileset()
+
+ def _verify_on_disk_files(self, accepted_files, frag_index=None, **kwargs):
+ """
+ Verify that the final combination of on disk files complies with the
+ diskfile contract.
+
+ :param accepted_files: files that have been found and accepted
+ :param frag_index: specifies a specific fragment index .data file
+ :returns: True if the file combination is compliant, False otherwise
+ """
+ if not accepted_files.get('.data'):
+ # We may find only a .meta, which doesn't mean the on disk
+ # contract is broken. So we clear it to comply with
+ # superclass assertions.
+ accepted_files['.meta'] = None
+
+ data_file, meta_file, ts_file, durable_file = tuple(
+ [accepted_files.get(ext)
+ for ext in ('.data', '.meta', '.ts', '.durable')])
+
+ return ((data_file is None or durable_file is not None)
+ and (data_file is None and meta_file is None
+ and ts_file is None and durable_file is None)
+ or (ts_file is not None and data_file is None
+ and meta_file is None and durable_file is None)
+ or (data_file is not None and durable_file is not None
+ and ts_file is None)
+ or (durable_file is not None and meta_file is None
+ and ts_file is None))
+
+ def gather_ondisk_files(self, files, include_obsolete=False,
+ frag_index=None, verify=False, **kwargs):
+ """
+ Given a simple list of file names, iterate over them to determine the
+ files that constitute a valid object, and optionally determine the
+ files that are obsolete and could be deleted. Note that some files may
+ fall into neither category.
+
+ :param files: a list of file names.
+ :param include_obsolete: By default the iteration will stop when a
+ valid file set has been found. Setting this
+ argument to True will cause the iteration to
+ continue in order to find all obsolete files.
+ :param frag_index: if set, search for a specific fragment index .data
+ file, otherwise accept the first valid .data file.
+ :returns: a dict that may contain: valid on disk files keyed by their
+ filename extension; a list of obsolete files stored under the
+ key 'obsolete'.
+ """
+ # This visitor pattern enables future refactoring of other disk
+ # manager implementations to re-use this method and override
+ # _gather_on_disk_file and _verify_on_disk_files to apply implementation
+ # specific selection and verification of on-disk files.
+ files.sort(reverse=True)
+ results = {}
+ for afile in files:
+ ts_file = results.get('.ts')
+ data_file = results.get('.data')
+ if not include_obsolete:
+ assert ts_file is None, "On-disk file search loop" \
+ " continuing after tombstone, %s, encountered" % ts_file
+ assert data_file is None, "On-disk file search loop" \
+ " continuing after data file, %s, encountered" % data_file
+
+ ext = splitext(afile)[1]
+ if self._gather_on_disk_file(
+ afile, ext, results, frag_index=frag_index, **kwargs):
+ if not include_obsolete:
+ break
+
+ if verify:
+ assert self._verify_on_disk_files(
+ results, frag_index=frag_index, **kwargs), \
+ "On-disk file search algorithm contract is broken: %s" \
+ % results.values()
+ return results
+
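For example, given a hash directory holding a fragment newer than any .durable, a durable fragment, and a superseded tombstone, the gathered context keeps all three cases apart (file names invented, mgr as in the earlier sketch):

files = ['1425943710.00000#3.data',   # fragment not yet made durable
         '1425943709.00000#3.data',   # durable fragment
         '1425943709.00000.durable',
         '1425943708.00000.ts']       # tombstone older than the data
results = mgr.gather_ondisk_files(files, include_obsolete=True)
# results['.data']    == '1425943709.00000#3.data'    (covered by .durable)
# results['.durable'] == '1425943709.00000.durable'
# results['obsolete'] == ['1425943708.00000.ts']      (superseded, removable)
# results['fragments_without_durable'] == ['1425943710.00000#3.data']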
+ def get_ondisk_files(self, files, datadir, **kwargs):
+ """
+ Given a simple list of file names, determine the files to use.
+
+ :param files: simple set of files as a python list
+ :param datadir: directory name files are from for convenience
+ :returns: a tuple of data, meta, and tombstone
+ """
+ # maintain compatibility with 'legacy' get_ondisk_files return value
+ accepted_files = self.gather_ondisk_files(files, verify=True, **kwargs)
+ result = [(join(datadir, accepted_files.get(ext))
+ if accepted_files.get(ext) else None)
+ for ext in ('.data', '.meta', '.ts')]
+ return tuple(result)
+
+ def cleanup_ondisk_files(self, hsh_path, reclaim_age=ONE_WEEK,
+ frag_index=None):
+ """
+ Clean up on-disk files that are obsolete and gather the set of valid
+ on-disk files for an object.
+
+ :param hsh_path: object hash path
+ :param reclaim_age: age in seconds at which to remove tombstones
+ :param frag_index: if set, search for a specific fragment index .data
+ file, otherwise accept the first valid .data file
+ :returns: a dict that may contain: valid on disk files keyed by their
+ filename extension; a list of obsolete files stored under the
+ key 'obsolete'; a list of files remaining in the directory,
+ reverse sorted, stored under the key 'files'.
+ """
+ def is_reclaimable(filename):
+ timestamp = self.parse_on_disk_filename(filename)['timestamp']
+ return (time.time() - float(timestamp)) > reclaim_age
+
+ files = listdir(hsh_path)
+ files.sort(reverse=True)
+ results = self.gather_ondisk_files(files, include_obsolete=True,
+ frag_index=frag_index)
+ if '.durable' in results and not results.get('fragments'):
+ # a .durable with no .data is deleted as soon as it is found
+ results.setdefault('obsolete', []).append(results.pop('.durable'))
+ if '.ts' in results and is_reclaimable(results['.ts']):
+ results.setdefault('obsolete', []).append(results.pop('.ts'))
+ for filename in results.get('fragments_without_durable', []):
+ # stray fragments are not deleted until reclaim-age
+ if is_reclaimable(filename):
+ results.setdefault('obsolete', []).append(filename)
+ for filename in results.get('obsolete', []):
+ remove_file(join(hsh_path, filename))
+ files.remove(filename)
+ results['files'] = files
+ return results
+
+ def hash_cleanup_listdir(self, hsh_path, reclaim_age=ONE_WEEK):
+ """
+ List contents of a hash directory and clean up any old files.
+ For EC policy, delete files older than a .durable or .ts file.
+
+ :param hsh_path: object hash path
+ :param reclaim_age: age in seconds at which to remove tombstones
+ :returns: list of files remaining in the directory, reverse sorted
+ """
+ # maintain compatibility with 'legacy' hash_cleanup_listdir
+ # return value
+ return self.cleanup_ondisk_files(
+ hsh_path, reclaim_age=reclaim_age)['files']
+
+ def yield_hashes(self, device, partition, policy,
+ suffixes=None, frag_index=None):
+ """
+ This is the same as the replicated yield_hashes except that, when
+ frag_index is provided, data files for fragment indexes not matching
+ the given frag_index are skipped.
+ """
+ dev_path = self.get_dev_path(device)
+ if not dev_path:
+ raise DiskFileDeviceUnavailable()
+ if suffixes is None:
+ suffixes = self.yield_suffixes(device, partition, policy)
+ else:
+ partition_path = os.path.join(dev_path,
+ get_data_dir(policy),
+ str(partition))
+ suffixes = (
+ (os.path.join(partition_path, suffix), suffix)
+ for suffix in suffixes)
+ for suffix_path, suffix in suffixes:
+ for object_hash in self._listdir(suffix_path):
+ object_path = os.path.join(suffix_path, object_hash)
+ newest_valid_file = None
+ try:
+ results = self.cleanup_ondisk_files(
+ object_path, self.reclaim_age, frag_index=frag_index)
+ newest_valid_file = (results.get('.meta')
+ or results.get('.data')
+ or results.get('.ts'))
+ if newest_valid_file:
+ timestamp = self.parse_on_disk_filename(
+ newest_valid_file)['timestamp']
+ yield (object_path, object_hash, timestamp.internal)
+ except AssertionError as err:
+ self.logger.debug('Invalid file set in %s (%s)' % (
+ object_path, err))
+ except DiskFileError as err:
+ self.logger.debug(
+ 'Invalid diskfile filename %r in %r (%s)' % (
+ newest_valid_file, object_path, err))
+
+ def _hash_suffix(self, path, reclaim_age):
+ """
+ The only difference between this method and the module level function
+ hash_suffix is the way that files are updated on the returned hash.
+
+ Instead of hashing all filenames into a single hasher, each file name
+ falls into a bucket keyed either by fragment index for datafiles, or
+ None (indicating a durable, metadata or tombstone).
+ """
+ # hash_per_fi instead of single hash for whole suffix
+ hash_per_fi = defaultdict(hashlib.md5)
+ try:
+ path_contents = sorted(os.listdir(path))
+ except OSError as err:
+ if err.errno in (errno.ENOTDIR, errno.ENOENT):
+ raise PathNotDir()
+ raise
+ for hsh in path_contents:
+ hsh_path = join(path, hsh)
+ try:
+ files = self.hash_cleanup_listdir(hsh_path, reclaim_age)
+ except OSError as err:
+ if err.errno == errno.ENOTDIR:
+ partition_path = dirname(path)
+ objects_path = dirname(partition_path)
+ device_path = dirname(objects_path)
+ quar_path = quarantine_renamer(device_path, hsh_path)
+ logging.exception(
+ _('Quarantined %(hsh_path)s to %(quar_path)s because '
+ 'it is not a directory'), {'hsh_path': hsh_path,
+ 'quar_path': quar_path})
+ continue
+ raise
+ if not files:
+ try:
+ os.rmdir(hsh_path)
+ except OSError:
+ pass
+ # we just deleted this hsh_path, why are we waiting
+ # until the next suffix hash to raise PathNotDir so that
+ # this suffix will get del'd from the suffix hashes?
+ for filename in files:
+ info = self.parse_on_disk_filename(filename)
+ fi = info['frag_index']
+ if fi is None:
+ hash_per_fi[fi].update(filename)
+ else:
+ hash_per_fi[fi].update(info['timestamp'].internal)
+ try:
+ os.rmdir(path)
+ except OSError:
+ pass
+ # here we flatten out the hashers hexdigest into a dictionary instead
+ # of just returning the one hexdigest for the whole suffix
+ return dict((fi, md5.hexdigest()) for fi, md5 in hash_per_fi.items())
+
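Where the replicated _hash_suffix returns one hexdigest per suffix, this variant returns a small dict per suffix: the None bucket hashes the full names of .durable/.ts/.meta files, while each fragment-index bucket hashes only the timestamps of its .data files. Shape only, with invented values:

# Hypothetical value returned for one suffix directory:
{None: '9a7d1d7cbe04a0f3dbbf6ef36be35e07',   # .durable/.ts/.meta file names
 2: 'a51e47a7a31f685e4a47360271d3f5ba'}      # timestamps of #2 .data files
# _get_hashes() below then maps each suffix to such a dict:
# {'a83': {None: '...', 2: '...'}, '123': {...}, ...}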
+ def _get_hashes(self, partition_path, recalculate=None, do_listdir=False,
+ reclaim_age=None):
+ """
+ The only difference between this method and the module level function
+ get_hashes is that the call to hash_suffix is routed to the
+ _hash_suffix method on this instance.
+ """
+ reclaim_age = reclaim_age or self.reclaim_age
+ hashed = 0
+ hashes_file = join(partition_path, HASH_FILE)
+ modified = False
+ force_rewrite = False
+ hashes = {}
+ mtime = -1
+
+ if recalculate is None:
+ recalculate = []
+
+ try:
+ with open(hashes_file, 'rb') as fp:
+ hashes = pickle.load(fp)
+ mtime = getmtime(hashes_file)
+ except Exception:
+ do_listdir = True
+ force_rewrite = True
+ if do_listdir:
+ for suff in os.listdir(partition_path):
+ if len(suff) == 3:
+ hashes.setdefault(suff, None)
+ modified = True
+ hashes.update((suffix, None) for suffix in recalculate)
+ for suffix, hash_ in hashes.items():
+ if not hash_:
+ suffix_dir = join(partition_path, suffix)
+ try:
+ hashes[suffix] = self._hash_suffix(suffix_dir, reclaim_age)
+ hashed += 1
+ except PathNotDir:
+ del hashes[suffix]
+ except OSError:
+ logging.exception(_('Error hashing suffix'))
+ modified = True
+ if modified:
+ with lock_path(partition_path):
+ if force_rewrite or not exists(hashes_file) or \
+ getmtime(hashes_file) == mtime:
+ write_pickle(
+ hashes, hashes_file, partition_path, PICKLE_PROTOCOL)
+ return hashed, hashes
+ return self._get_hashes(partition_path, recalculate, do_listdir,
+ reclaim_age)
+ else:
+ return hashed, hashes
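With ECDiskFileManager registered for EC_POLICY above, policy-aware daemons do not pick a manager class themselves; they index a DiskFileRouter with the policy, exactly as the reconstructor below does via self._df_router[policy]. A minimal sketch, assuming conf, logger and the usual account/container/object parameters are available; device, partition and frag_index values are invented:

from swift.common.storage_policy import POLICIES
from swift.obj.diskfile import DiskFileRouter

router = DiskFileRouter(conf, logger)
policy = POLICIES[1]                  # assumed here to be an EC policy
df_mgr = router[policy]               # -> ECDiskFileManager for EC_POLICY
df = df_mgr.get_diskfile('sdb1', '9', account, container, obj,
                         policy=policy, frag_index=2)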
diff --git a/swift/obj/mem_diskfile.py b/swift/obj/mem_diskfile.py
index efb8c6c8c..be5fbf134 100644
--- a/swift/obj/mem_diskfile.py
+++ b/swift/obj/mem_diskfile.py
@@ -57,6 +57,12 @@ class InMemoryFileSystem(object):
def get_diskfile(self, account, container, obj, **kwargs):
return DiskFile(self, account, container, obj)
+ def pickle_async_update(self, *args, **kwargs):
+ """
+ For now don't handle async updates.
+ """
+ pass
+
class DiskFileWriter(object):
"""
@@ -98,6 +104,16 @@ class DiskFileWriter(object):
metadata['name'] = self._name
self._filesystem.put_object(self._name, self._fp, metadata)
+ def commit(self, timestamp):
+ """
+ Perform any operations necessary to mark the object as durable. For
+ mem_diskfile type this is a no-op.
+
+ :param timestamp: object put timestamp, an instance of
+ :class:`~swift.common.utils.Timestamp`
+ """
+ pass
+
class DiskFileReader(object):
"""
diff --git a/swift/obj/mem_server.py b/swift/obj/mem_server.py
index 83647661a..764a92a92 100644
--- a/swift/obj/mem_server.py
+++ b/swift/obj/mem_server.py
@@ -15,15 +15,7 @@
""" In-Memory Object Server for Swift """
-import os
-from swift import gettext_ as _
-from eventlet import Timeout
-
-from swift.common.bufferedhttp import http_connect
-from swift.common.exceptions import ConnectionTimeout
-
-from swift.common.http import is_success
from swift.obj.mem_diskfile import InMemoryFileSystem
from swift.obj import server
@@ -53,49 +45,6 @@ class ObjectController(server.ObjectController):
"""
return self._filesystem.get_diskfile(account, container, obj, **kwargs)
- def async_update(self, op, account, container, obj, host, partition,
- contdevice, headers_out, objdevice, policy_idx):
- """
- Sends or saves an async update.
-
- :param op: operation performed (ex: 'PUT', or 'DELETE')
- :param account: account name for the object
- :param container: container name for the object
- :param obj: object name
- :param host: host that the container is on
- :param partition: partition that the container is on
- :param contdevice: device name that the container is on
- :param headers_out: dictionary of headers to send in the container
- request
- :param objdevice: device name that the object is in
- :param policy_idx: the associated storage policy index
- """
- headers_out['user-agent'] = 'object-server %s' % os.getpid()
- full_path = '/%s/%s/%s' % (account, container, obj)
- if all([host, partition, contdevice]):
- try:
- with ConnectionTimeout(self.conn_timeout):
- ip, port = host.rsplit(':', 1)
- conn = http_connect(ip, port, contdevice, partition, op,
- full_path, headers_out)
- with Timeout(self.node_timeout):
- response = conn.getresponse()
- response.read()
- if is_success(response.status):
- return
- else:
- self.logger.error(_(
- 'ERROR Container update failed: %(status)d '
- 'response from %(ip)s:%(port)s/%(dev)s'),
- {'status': response.status, 'ip': ip, 'port': port,
- 'dev': contdevice})
- except (Exception, Timeout):
- self.logger.exception(_(
- 'ERROR container update failed with '
- '%(ip)s:%(port)s/%(dev)s'),
- {'ip': ip, 'port': port, 'dev': contdevice})
- # FIXME: For now don't handle async updates
-
def REPLICATE(self, request):
"""
Handle REPLICATE requests for the Swift Object Server. This is used
diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py
new file mode 100644
index 000000000..0ee2afbf6
--- /dev/null
+++ b/swift/obj/reconstructor.py
@@ -0,0 +1,925 @@
+# Copyright (c) 2010-2015 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+from os.path import join
+import random
+import time
+import itertools
+from collections import defaultdict
+import cPickle as pickle
+import shutil
+
+from eventlet import (GreenPile, GreenPool, Timeout, sleep, hubs, tpool,
+ spawn)
+from eventlet.support.greenlets import GreenletExit
+
+from swift import gettext_ as _
+from swift.common.utils import (
+ whataremyips, unlink_older_than, compute_eta, get_logger,
+ dump_recon_cache, ismount, mkdirs, config_true_value, list_from_csv,
+ get_hub, tpool_reraise, GreenAsyncPile, Timestamp, remove_file)
+from swift.common.swob import HeaderKeyDict
+from swift.common.bufferedhttp import http_connect
+from swift.common.daemon import Daemon
+from swift.common.ring.utils import is_local_device
+from swift.obj.ssync_sender import Sender as ssync_sender
+from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
+from swift.obj.diskfile import DiskFileRouter, get_data_dir, \
+ get_tmp_dir
+from swift.common.storage_policy import POLICIES, EC_POLICY
+from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
+ SuffixSyncError
+
+SYNC, REVERT = ('sync_only', 'sync_revert')
+
+
+hubs.use_hub(get_hub())
+
+
+class RebuildingECDiskFileStream(object):
+ """
+ This class wraps the reconstructed fragment archive data and
+ metadata in the DiskFile interface for ssync.
+ """
+
+ def __init__(self, metadata, frag_index, rebuilt_fragment_iter):
+ # start with metadata from a participating FA
+ self.metadata = metadata
+
+ # the new FA is going to have the same length as others in the set
+ self._content_length = self.metadata['Content-Length']
+
+ # update the FI and delete the ETag, the obj server will
+ # recalc on the other side...
+ self.metadata['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index
+ del self.metadata['ETag']
+
+ self.frag_index = frag_index
+ self.rebuilt_fragment_iter = rebuilt_fragment_iter
+
+ def get_metadata(self):
+ return self.metadata
+
+ @property
+ def content_length(self):
+ return self._content_length
+
+ def reader(self):
+ for chunk in self.rebuilt_fragment_iter:
+ yield chunk
+
+
+class ObjectReconstructor(Daemon):
+ """
+ Reconstruct objects using erasure code, and also rebalance EC Fragment
+ Archive objects off handoff nodes.
+
+ Encapsulates most logic and data needed by the object reconstruction
+ process. Each call to .reconstruct() performs one pass. It's up to the
+ caller to do this in a loop.
+ """
+
+ def __init__(self, conf, logger=None):
+ """
+ :param conf: configuration object obtained from ConfigParser
+ :param logger: logging object
+ """
+ self.conf = conf
+ self.logger = logger or get_logger(
+ conf, log_route='object-reconstructor')
+ self.devices_dir = conf.get('devices', '/srv/node')
+ self.mount_check = config_true_value(conf.get('mount_check', 'true'))
+ self.swift_dir = conf.get('swift_dir', '/etc/swift')
+ self.port = int(conf.get('bind_port', 6000))
+ self.concurrency = int(conf.get('concurrency', 1))
+ self.stats_interval = int(conf.get('stats_interval', '300'))
+ self.ring_check_interval = int(conf.get('ring_check_interval', 15))
+ self.next_check = time.time() + self.ring_check_interval
+ self.reclaim_age = int(conf.get('reclaim_age', 86400 * 7))
+ self.partition_times = []
+ self.run_pause = int(conf.get('run_pause', 30))
+ self.http_timeout = int(conf.get('http_timeout', 60))
+ self.lockup_timeout = int(conf.get('lockup_timeout', 1800))
+ self.recon_cache_path = conf.get('recon_cache_path',
+ '/var/cache/swift')
+ self.rcache = os.path.join(self.recon_cache_path, "object.recon")
+ # defaults subject to change after beta
+ self.conn_timeout = float(conf.get('conn_timeout', 0.5))
+ self.node_timeout = float(conf.get('node_timeout', 10))
+ self.network_chunk_size = int(conf.get('network_chunk_size', 65536))
+ self.disk_chunk_size = int(conf.get('disk_chunk_size', 65536))
+ self.headers = {
+ 'Content-Length': '0',
+ 'user-agent': 'obj-reconstructor %s' % os.getpid()}
+ self.handoffs_first = config_true_value(conf.get('handoffs_first',
+ False))
+ self._df_router = DiskFileRouter(conf, self.logger)
+
+ def load_object_ring(self, policy):
+ """
+ Make sure the policy's rings are loaded.
+
+ :param policy: the StoragePolicy instance
+ :returns: appropriate ring object
+ """
+ policy.load_ring(self.swift_dir)
+ return policy.object_ring
+
+ def check_ring(self, object_ring):
+ """
+ Check to see if the ring has been updated
+
+ :param object_ring: the ring to check
+ :returns: False if the ring has changed, True otherwise
+ """
+ if time.time() > self.next_check:
+ self.next_check = time.time() + self.ring_check_interval
+ if object_ring.has_changed():
+ return False
+ return True
+
+ def _full_path(self, node, part, path, policy):
+ return '%(replication_ip)s:%(replication_port)s' \
+ '/%(device)s/%(part)s%(path)s ' \
+ 'policy#%(policy)d frag#%(frag_index)s' % {
+ 'replication_ip': node['replication_ip'],
+ 'replication_port': node['replication_port'],
+ 'device': node['device'],
+ 'part': part, 'path': path,
+ 'policy': policy,
+ 'frag_index': node.get('index', 'handoff'),
+ }
+
+ def _get_response(self, node, part, path, headers, policy):
+ """
+ Helper method for reconstruction that GETs a single EC fragment
+ archive
+
+ :param node: the node to GET from
+ :param part: the partition
+ :param path: full path of the desired EC archive
+ :param headers: the headers to send
+ :param policy: an instance of
+ :class:`~swift.common.storage_policy.BaseStoragePolicy`
+ :returns: response
+ """
+ resp = None
+ headers['X-Backend-Node-Index'] = node['index']
+ try:
+ with ConnectionTimeout(self.conn_timeout):
+ conn = http_connect(node['ip'], node['port'], node['device'],
+ part, 'GET', path, headers=headers)
+ with Timeout(self.node_timeout):
+ resp = conn.getresponse()
+ if resp.status != HTTP_OK:
+ self.logger.warning(
+ _("Invalid response %(resp)s from %(full_path)s"),
+ {'resp': resp.status,
+ 'full_path': self._full_path(node, part, path, policy)})
+ resp = None
+ except (Exception, Timeout):
+ self.logger.exception(
+ _("Trying to GET %(full_path)s"), {
+ 'full_path': self._full_path(node, part, path, policy)})
+ return resp
+
+ def reconstruct_fa(self, job, node, metadata):
+ """
+ Reconstructs a fragment archive. This method is called from ssync
+ after a remote node responds that it is missing this object; the local
+ diskfile is opened to provide metadata, but to reconstruct the
+ missing fragment archive we must connect to multiple object servers.
+
+ :param job: job from ssync_sender
+ :param node: node that we're rebuilding to
+ :param metadata: the metadata to attach to the rebuilt archive
+ :returns: a DiskFile like class for use by ssync
+ :raises DiskFileError: if the fragment archive cannot be reconstructed
+ """
+
+ part_nodes = job['policy'].object_ring.get_part_nodes(
+ job['partition'])
+ part_nodes.remove(node)
+
+ # the fragment index we need to reconstruct is the position index
+ # of the node we're rebuilding to within the primary part list
+ fi_to_rebuild = node['index']
+
+ # KISS send out connection requests to all nodes, see what sticks
+ headers = {
+ 'X-Backend-Storage-Policy-Index': int(job['policy']),
+ }
+ pile = GreenAsyncPile(len(part_nodes))
+ path = metadata['name']
+ for node in part_nodes:
+ pile.spawn(self._get_response, node, job['partition'],
+ path, headers, job['policy'])
+ responses = []
+ etag = None
+ for resp in pile:
+ if not resp:
+ continue
+ resp.headers = HeaderKeyDict(resp.getheaders())
+ responses.append(resp)
+ etag = sorted(responses, reverse=True,
+ key=lambda r: Timestamp(
+ r.headers.get('X-Backend-Timestamp')
+ ))[0].headers.get('X-Object-Sysmeta-Ec-Etag')
+ responses = [r for r in responses if
+ r.headers.get('X-Object-Sysmeta-Ec-Etag') == etag]
+
+ if len(responses) >= job['policy'].ec_ndata:
+ break
+ else:
+ self.logger.error(
+ 'Unable to get enough responses (%s/%s) '
+ 'to reconstruct %s with ETag %s' % (
+ len(responses), job['policy'].ec_ndata,
+ self._full_path(node, job['partition'],
+ metadata['name'], job['policy']),
+ etag))
+ raise DiskFileError('Unable to reconstruct EC archive')
+
+ rebuilt_fragment_iter = self.make_rebuilt_fragment_iter(
+ responses[:job['policy'].ec_ndata], path, job['policy'],
+ fi_to_rebuild)
+ return RebuildingECDiskFileStream(metadata, fi_to_rebuild,
+ rebuilt_fragment_iter)
+
+ def _reconstruct(self, policy, fragment_payload, frag_index):
+ # XXX with jerasure this doesn't work if we need to rebuild a
+ # parity fragment, and not all data fragments are available
+ # segment = policy.pyeclib_driver.reconstruct(
+ # fragment_payload, [frag_index])[0]
+
+ # for safety until pyeclib 1.0.7 we'll just use decode and encode
+ segment = policy.pyeclib_driver.decode(fragment_payload)
+ return policy.pyeclib_driver.encode(segment)[frag_index]
+
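The decode-then-encode fallback relies on encoding being deterministic for the systematic codes in use: re-encoding the recovered segment reproduces every fragment, including the missing one. A standalone sketch using pyeclib's ECDriver directly; the parameters and ec_type are invented for illustration:

from pyeclib.ec_iface import ECDriver

driver = ECDriver(k=4, m=2, ec_type='jerasure_rs_vand')
fragments = driver.encode('object segment payload')   # k + m fragments
survivors = [f for i, f in enumerate(fragments) if i != 5]
segment = driver.decode(survivors[:4])   # any k fragments recover the segment
rebuilt = driver.encode(segment)[5]      # same bytes as the lost fragment #5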
+ def make_rebuilt_fragment_iter(self, responses, path, policy, frag_index):
+ """
+ Turn a set of connections from backend object servers into a generator
+ that yields up the rebuilt fragment archive for frag_index.
+ """
+
+ def _get_one_fragment(resp):
+ buff = ''
+ remaining_bytes = policy.fragment_size
+ while remaining_bytes:
+ chunk = resp.read(remaining_bytes)
+ if not chunk:
+ break
+ remaining_bytes -= len(chunk)
+ buff += chunk
+ return buff
+
+ def fragment_payload_iter():
+ # We need a fragment from each connection, so best to
+ # use a GreenPile to keep them ordered and in sync
+ pile = GreenPile(len(responses))
+ while True:
+ for resp in responses:
+ pile.spawn(_get_one_fragment, resp)
+ try:
+ with Timeout(self.node_timeout):
+ fragment_payload = [fragment for fragment in pile]
+ except (Exception, Timeout):
+ self.logger.exception(
+ _("Error trying to rebuild %(path)s "
+ "policy#%(policy)d frag#%(frag_index)s"), {
+ 'path': path,
+ 'policy': policy,
+ 'frag_index': frag_index,
+ })
+ break
+ if not all(fragment_payload):
+ break
+ rebuilt_fragment = self._reconstruct(
+ policy, fragment_payload, frag_index)
+ yield rebuilt_fragment
+
+ return fragment_payload_iter()
+
+ def stats_line(self):
+ """
+ Logs various stats for the currently running reconstruction pass.
+ """
+ if self.reconstruction_count:
+ elapsed = (time.time() - self.start) or 0.000001
+ rate = self.reconstruction_count / elapsed
+ self.logger.info(
+ _("%(reconstructed)d/%(total)d (%(percentage).2f%%)"
+ " partitions reconstructed in %(time).2fs (%(rate).2f/sec, "
+ "%(remaining)s remaining)"),
+ {'reconstructed': self.reconstruction_count,
+ 'total': self.job_count,
+ 'percentage':
+ self.reconstruction_count * 100.0 / self.job_count,
+ 'time': time.time() - self.start, 'rate': rate,
+ 'remaining': '%d%s' % compute_eta(self.start,
+ self.reconstruction_count,
+ self.job_count)})
+ if self.suffix_count:
+ self.logger.info(
+ _("%(checked)d suffixes checked - "
+ "%(hashed).2f%% hashed, %(synced).2f%% synced"),
+ {'checked': self.suffix_count,
+ 'hashed': (self.suffix_hash * 100.0) / self.suffix_count,
+ 'synced': (self.suffix_sync * 100.0) / self.suffix_count})
+ self.partition_times.sort()
+ self.logger.info(
+ _("Partition times: max %(max).4fs, "
+ "min %(min).4fs, med %(med).4fs"),
+ {'max': self.partition_times[-1],
+ 'min': self.partition_times[0],
+ 'med': self.partition_times[
+ len(self.partition_times) // 2]})
+ else:
+ self.logger.info(
+ _("Nothing reconstructed for %s seconds."),
+ (time.time() - self.start))
+
+ def kill_coros(self):
+ """Utility function that kills all coroutines currently running."""
+ for coro in list(self.run_pool.coroutines_running):
+ try:
+ coro.kill(GreenletExit)
+ except GreenletExit:
+ pass
+
+ def heartbeat(self):
+ """
+ Loop that runs in the background during reconstruction. It
+ periodically logs progress.
+ """
+ while True:
+ sleep(self.stats_interval)
+ self.stats_line()
+
+ def detect_lockups(self):
+ """
+ In testing, the pool.waitall() call very occasionally failed to return.
+ This is an attempt to make sure the reconstructor eventually finishes
+ its reconstruction pass.
+ """
+ while True:
+ sleep(self.lockup_timeout)
+ if self.reconstruction_count == self.last_reconstruction_count:
+ self.logger.error(_("Lockup detected.. killing live coros."))
+ self.kill_coros()
+ self.last_reconstruction_count = self.reconstruction_count
+
+ def _get_partners(self, frag_index, part_nodes):
+ """
+ Returns the left and right partners of the node whose index is
+ equal to the given frag_index.
+
+ :param frag_index: a fragment index
+ :param part_nodes: a list of primary nodes
+ :returns: [<node-to-left>, <node-to-right>]
+ """
+ return [
+ part_nodes[(frag_index - 1) % len(part_nodes)],
+ part_nodes[(frag_index + 1) % len(part_nodes)],
+ ]
+
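Partner selection is plain modular arithmetic over the primary node list, so the ends wrap around. For example with six primaries (node dicts trimmed to id/index for brevity; reconstructor assumed to be an ObjectReconstructor instance):

part_nodes = [{'id': i, 'index': i} for i in range(6)]
reconstructor._get_partners(0, part_nodes)
# -> [part_nodes[5], part_nodes[1]]   (left partner wraps to the last node)
reconstructor._get_partners(5, part_nodes)
# -> [part_nodes[4], part_nodes[0]]   (right partner wraps to the first node)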
+ def _get_hashes(self, policy, path, recalculate=None, do_listdir=False):
+ df_mgr = self._df_router[policy]
+ hashed, suffix_hashes = tpool_reraise(
+ df_mgr._get_hashes, path, recalculate=recalculate,
+ do_listdir=do_listdir, reclaim_age=self.reclaim_age)
+ self.logger.update_stats('suffix.hashes', hashed)
+ return suffix_hashes
+
+ def get_suffix_delta(self, local_suff, local_index,
+ remote_suff, remote_index):
+ """
+ Compare the local suffix hashes with the remote suffix hashes
+ for the given local and remote fragment indexes. Return those
+ suffixes which should be synced.
+
+ :param local_suff: the local suffix hashes (from _get_hashes)
+ :param local_index: the local fragment index for the job
+ :param remote_suff: the remote suffix hashes (from remote
+ REPLICATE request)
+ :param remote_index: the remote fragment index for the job
+
+ :returns: a list of strings, the suffix dirs to sync
+ """
+ suffixes = []
+ for suffix, sub_dict_local in local_suff.iteritems():
+ sub_dict_remote = remote_suff.get(suffix, {})
+ if (sub_dict_local.get(None) != sub_dict_remote.get(None) or
+ sub_dict_local.get(local_index) !=
+ sub_dict_remote.get(remote_index)):
+ suffixes.append(suffix)
+ return suffixes
+
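Only two entries per suffix matter in the comparison: the None bucket (tombstones, durables, metadata) and each side's own fragment-index bucket. A toy example with invented hashes, local fragment index 0 and remote index 1 (reconstructor as above):

local_suff  = {'abc': {None: 'ts-hash-1', 0: 'frag-hash-A'},
               '123': {None: 'ts-hash-2', 0: 'frag-hash-B'}}
remote_suff = {'abc': {None: 'ts-hash-1', 1: 'frag-hash-A'},   # in sync
               '123': {None: 'ts-hash-2', 1: 'frag-hash-X'}}   # differs
reconstructor.get_suffix_delta(local_suff, 0, remote_suff, 1)
# -> ['123']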
+ def rehash_remote(self, node, job, suffixes):
+ try:
+ with Timeout(self.http_timeout):
+ conn = http_connect(
+ node['replication_ip'], node['replication_port'],
+ node['device'], job['partition'], 'REPLICATE',
+ '/' + '-'.join(sorted(suffixes)),
+ headers=self.headers)
+ conn.getresponse().read()
+ except (Exception, Timeout):
+ self.logger.exception(
+ _("Trying to sync suffixes with %s") % self._full_path(
+ node, job['partition'], '', job['policy']))
+
+ def _get_suffixes_to_sync(self, job, node):
+ """
+ For SYNC jobs we need to make a remote REPLICATE request to get
+ the remote node's current suffix hashes and then compare them to our
+ local suffix hashes to decide which suffixes (if any) are out
+ of sync.
+
+ :param job: the job dict, with the keys defined in ``_get_part_jobs``
+ :param node: the remote node dict
+ :returns: a (possibly empty) list of strings, the suffixes to be
+ synced with the remote node.
+ """
+ # get hashes from the remote node
+ remote_suffixes = None
+ try:
+ with Timeout(self.http_timeout):
+ resp = http_connect(
+ node['replication_ip'], node['replication_port'],
+ node['device'], job['partition'], 'REPLICATE',
+ '', headers=self.headers).getresponse()
+ if resp.status == HTTP_INSUFFICIENT_STORAGE:
+ self.logger.error(
+ _('%s responded as unmounted'),
+ self._full_path(node, job['partition'], '',
+ job['policy']))
+ elif resp.status != HTTP_OK:
+ self.logger.error(
+ _("Invalid response %(resp)s "
+ "from %(full_path)s"), {
+ 'resp': resp.status,
+ 'full_path': self._full_path(
+ node, job['partition'], '',
+ job['policy'])
+ })
+ else:
+ remote_suffixes = pickle.loads(resp.read())
+ except (Exception, Timeout):
+ # all exceptions are logged here so that our caller can
+ # safely catch our exception and continue to the next node
+ # without logging
+ self.logger.exception('Unable to get remote suffix hashes '
+ 'from %r' % self._full_path(
+ node, job['partition'], '',
+ job['policy']))
+
+ if remote_suffixes is None:
+ raise SuffixSyncError('Unable to get remote suffix hashes')
+
+ suffixes = self.get_suffix_delta(job['hashes'],
+ job['frag_index'],
+ remote_suffixes,
+ node['index'])
+ # now recalculate local hashes for suffixes that don't
+ # match so we're comparing the latest
+ local_suff = self._get_hashes(job['policy'], job['path'],
+ recalculate=suffixes)
+
+ suffixes = self.get_suffix_delta(local_suff,
+ job['frag_index'],
+ remote_suffixes,
+ node['index'])
+
+ self.suffix_count += len(suffixes)
+ return suffixes
+
+ def delete_reverted_objs(self, job, objects, frag_index):
+ """
+ For EC we can potentially revert only some of a partition
+ so we'll delete reverted objects here. Note that we delete
+ the fragment index of the file we sent to the remote node.
+
+ :param job: the job being processed
+ :param objects: a dict of objects to be deleted, each entry maps
+ hash=>timestamp
+ :param frag_index: (int) the fragment index of data files to be deleted
+ """
+ df_mgr = self._df_router[job['policy']]
+ for object_hash, timestamp in objects.items():
+ try:
+ df = df_mgr.get_diskfile_from_hash(
+ job['local_dev']['device'], job['partition'],
+ object_hash, job['policy'],
+ frag_index=frag_index)
+ df.purge(Timestamp(timestamp), frag_index)
+ except DiskFileError:
+ continue
+
+ def process_job(self, job):
+ """
+ Sync the local partition with the remote node(s) according to
+ the parameters of the job. For primary nodes, the SYNC job type
+ will define both left and right hand sync_to nodes to ssync with
+ as defined by this primary nodes index in the node list based on
+ the fragment index found in the partition. For non-primary
+ nodes (either handoff revert, or rebalance) the REVERT job will
+ define a single node in sync_to which is the proper/new home for
+ the fragment index.
+
+ N.B. Because ring rebalancing can be time consuming and handoff
+ nodes' fragment indexes do not have a stable order, it's possible
+ to have more than one REVERT job for a partition, and in some rare
+ failure conditions there may even also be a SYNC job for the
+ same partition - but each one will be processed separately
+ because each job will define a separate list of node(s) to
+ 'sync_to'.
+
+ :param job: the job dict, with the keys defined in ``_get_part_jobs``
+ """
+ self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
+ begin = time.time()
+ if job['job_type'] == REVERT:
+ self._revert(job, begin)
+ else:
+ self._sync(job, begin)
+ self.partition_times.append(time.time() - begin)
+ self.reconstruction_count += 1
+
+ def _sync(self, job, begin):
+ """
+ Process a SYNC job.
+ """
+ self.logger.increment(
+ 'partition.update.count.%s' % (job['local_dev']['device'],))
+ # after our left and right partners, if there's some sort of
+ # failure we'll continue onto the remaining primary nodes and
+ # make sure they're in sync - or potentially rebuild missing
+ # fragments we find
+ dest_nodes = itertools.chain(
+ job['sync_to'],
+ # I think we could order these based on our index to better
+ # protect against a broken chain
+ itertools.ifilter(
+ lambda n: n['id'] not in (n['id'] for n in job['sync_to']),
+ job['policy'].object_ring.get_part_nodes(job['partition'])),
+ )
+ syncd_with = 0
+ for node in dest_nodes:
+ if syncd_with >= len(job['sync_to']):
+ # success!
+ break
+
+ try:
+ suffixes = self._get_suffixes_to_sync(job, node)
+ except SuffixSyncError:
+ continue
+
+ if not suffixes:
+ syncd_with += 1
+ continue
+
+ # ssync any out-of-sync suffixes with the remote node
+ success, _ = ssync_sender(
+ self, node, job, suffixes)()
+ # let the remote end know to rehash its suffixes
+ self.rehash_remote(node, job, suffixes)
+ # update stats for this attempt
+ self.suffix_sync += len(suffixes)
+ self.logger.update_stats('suffix.syncs', len(suffixes))
+ if success:
+ syncd_with += 1
+ self.logger.timing_since('partition.update.timing', begin)
+
+ def _revert(self, job, begin):
+ """
+ Process a REVERT job.
+ """
+ self.logger.increment(
+ 'partition.delete.count.%s' % (job['local_dev']['device'],))
+ # we'd desperately like to push this partition back to its
+ # primary location, but if that node is down, the next best thing
+ # is one of the handoff locations - which *might* be us already!
+ dest_nodes = itertools.chain(
+ job['sync_to'],
+ job['policy'].object_ring.get_more_nodes(job['partition']),
+ )
+ syncd_with = 0
+ reverted_objs = {}
+ for node in dest_nodes:
+ if syncd_with >= len(job['sync_to']):
+ break
+ if node['id'] == job['local_dev']['id']:
+ # this is as good a place as any for this data for now
+ break
+ success, in_sync_objs = ssync_sender(
+ self, node, job, job['suffixes'])()
+ self.rehash_remote(node, job, job['suffixes'])
+ if success:
+ syncd_with += 1
+ reverted_objs.update(in_sync_objs)
+ if syncd_with >= len(job['sync_to']):
+ self.delete_reverted_objs(
+ job, reverted_objs, job['frag_index'])
+ self.logger.timing_since('partition.delete.timing', begin)
+
+ def _get_part_jobs(self, local_dev, part_path, partition, policy):
+ """
+ Helper function to build jobs for a partition; this method will
+ read the suffix hashes and create job dictionaries to describe
+ the needed work. There will be one job for each fragment index
+ discovered in the partition.
+
+ For a fragment index which corresponds to this node's ring
+ index, a job with job_type SYNC will be created to ensure that
+ the left and right hand primary ring nodes for the part have the
+ corresponding left and right hand fragment archives.
+
+ A fragment index (or entire partition) for which this node is
+ not the corresponding primary node will create job(s) with
+ job_type REVERT to ensure that fragment archives are pushed to
+ the correct node and removed from this one.
+
+ A partition may result in multiple jobs. Potentially many
+ REVERT jobs, and zero or one SYNC job.
+
+ :param local_dev: the local device
+ :param part_path: full path to partition
+ :param partition: partition number
+ :param policy: the policy
+
+ :returns: a list of dicts of job info
+ """
+ # find all the fi's in the part, and which suffixes have them
+ hashes = self._get_hashes(policy, part_path, do_listdir=True)
+ non_data_fragment_suffixes = []
+ data_fi_to_suffixes = defaultdict(list)
+ for suffix, fi_hash in hashes.items():
+ if not fi_hash:
+ # this is for sanity and clarity, normally an empty
+ # suffix would get del'd from the hashes dict, but an
+ # OSError trying to re-hash the suffix could leave the
+ # value empty - it will log the exception; but there's
+ # no way to properly address this suffix at this time.
+ continue
+ data_frag_indexes = [f for f in fi_hash if f is not None]
+ if not data_frag_indexes:
+ non_data_fragment_suffixes.append(suffix)
+ else:
+ for fi in data_frag_indexes:
+ data_fi_to_suffixes[fi].append(suffix)
+
+ # helper to ensure consistent structure of jobs
+ def build_job(job_type, frag_index, suffixes, sync_to):
+ return {
+ 'job_type': job_type,
+ 'frag_index': frag_index,
+ 'suffixes': suffixes,
+ 'sync_to': sync_to,
+ 'partition': partition,
+ 'path': part_path,
+ 'hashes': hashes,
+ 'policy': policy,
+ 'local_dev': local_dev,
+ # ssync likes to have it handy
+ 'device': local_dev['device'],
+ }
+
+ # aggregate jobs for all the fragment index in this part
+ jobs = []
+
+ # check the primary nodes - to see if the part belongs here
+ part_nodes = policy.object_ring.get_part_nodes(partition)
+ for node in part_nodes:
+ if node['id'] == local_dev['id']:
+ # this partition belongs here, we'll need a sync job
+ frag_index = node['index']
+ try:
+ suffixes = data_fi_to_suffixes.pop(frag_index)
+ except KeyError:
+ suffixes = []
+ sync_job = build_job(
+ job_type=SYNC,
+ frag_index=frag_index,
+ suffixes=suffixes,
+ sync_to=self._get_partners(frag_index, part_nodes),
+ )
+ # ssync callback to rebuild missing fragment_archives
+ sync_job['sync_diskfile_builder'] = self.reconstruct_fa
+ jobs.append(sync_job)
+ break
+
+ # assign remaining data fragment suffixes to revert jobs
+ ordered_fis = sorted((len(suffixes), fi) for fi, suffixes
+ in data_fi_to_suffixes.items())
+ for count, fi in ordered_fis:
+ revert_job = build_job(
+ job_type=REVERT,
+ frag_index=fi,
+ suffixes=data_fi_to_suffixes[fi],
+ sync_to=[part_nodes[fi]],
+ )
+ jobs.append(revert_job)
+
+ # now we need to assign suffixes that have no data fragments
+ if non_data_fragment_suffixes:
+ if jobs:
+ # the first job will be either the sync_job, or the
+ # revert_job for the fragment index that is most common
+ # among the suffixes
+ jobs[0]['suffixes'].extend(non_data_fragment_suffixes)
+ else:
+ # this is an unfortunate situation, we need a revert job to
+ # push partitions off this node, but none of the suffixes
+ # have any data fragments to hint at which node would be a
+ # good candidate to receive the tombstones.
+ jobs.append(build_job(
+ job_type=REVERT,
+ frag_index=None,
+ suffixes=non_data_fragment_suffixes,
+ # this is super safe
+ sync_to=part_nodes,
+                    # something like this would probably be better
+ # sync_to=random.sample(part_nodes, 3),
+ ))
+ # return a list of jobs for this part
+ return jobs
+
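
To make the SYNC/REVERT split described above concrete, here is a small,
self-contained sketch; the classify() helper and the toy fragment-index map
are invented for illustration and are not part of the reconstructor's API:

    # Toy model of the split: this node SYNCs the suffixes for its own ring
    # index and REVERTs every other fragment index back to its primary.
    def classify(my_frag_index, data_fi_to_suffixes):
        remaining = dict(data_fi_to_suffixes)
        sync_suffixes = remaining.pop(my_frag_index, [])
        return sync_suffixes, remaining

    sync, revert = classify(1, {1: ['07a', 'b3f'], 4: ['c2d']})
    assert sync == ['07a', 'b3f']
    assert revert == {4: ['c2d']}   # would become one REVERT job per index
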
+ def collect_parts(self, override_devices=None,
+ override_partitions=None):
+ """
+ Helper for yielding partitions in the top level reconstructor
+ """
+ override_devices = override_devices or []
+ override_partitions = override_partitions or []
+ ips = whataremyips()
+ for policy in POLICIES:
+ if policy.policy_type != EC_POLICY:
+ continue
+ self._diskfile_mgr = self._df_router[policy]
+ self.load_object_ring(policy)
+ data_dir = get_data_dir(policy)
+ local_devices = itertools.ifilter(
+ lambda dev: dev and is_local_device(
+ ips, self.port,
+ dev['replication_ip'], dev['replication_port']),
+ policy.object_ring.devs)
+ for local_dev in local_devices:
+ if override_devices and (local_dev['device'] not in
+ override_devices):
+ continue
+ dev_path = join(self.devices_dir, local_dev['device'])
+ obj_path = join(dev_path, data_dir)
+ tmp_path = join(dev_path, get_tmp_dir(int(policy)))
+ if self.mount_check and not ismount(dev_path):
+ self.logger.warn(_('%s is not mounted'),
+ local_dev['device'])
+ continue
+ unlink_older_than(tmp_path, time.time() -
+ self.reclaim_age)
+ if not os.path.exists(obj_path):
+ try:
+ mkdirs(obj_path)
+ except Exception:
+ self.logger.exception(
+ 'Unable to create %s' % obj_path)
+ continue
+ try:
+ partitions = os.listdir(obj_path)
+ except OSError:
+ self.logger.exception(
+ 'Unable to list partitions in %r' % obj_path)
+ continue
+ for partition in partitions:
+ part_path = join(obj_path, partition)
+ if not (partition.isdigit() and
+ os.path.isdir(part_path)):
+ self.logger.warning(
+ 'Unexpected entity in data dir: %r' % part_path)
+ remove_file(part_path)
+ continue
+ partition = int(partition)
+ if override_partitions and (partition not in
+ override_partitions):
+ continue
+ part_info = {
+ 'local_dev': local_dev,
+ 'policy': policy,
+ 'partition': partition,
+ 'part_path': part_path,
+ }
+ yield part_info
+
+ def build_reconstruction_jobs(self, part_info):
+ """
+        Helper function to build jobs for the reconstruction of a single
+        partition using an EC style storage policy
+ """
+ jobs = self._get_part_jobs(**part_info)
+ random.shuffle(jobs)
+ if self.handoffs_first:
+ # Move the handoff revert jobs to the front of the list
+ jobs.sort(key=lambda job: job['job_type'], reverse=True)
+ self.job_count += len(jobs)
+ return jobs
+
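
The handoffs_first sort above works because the job_type values compare such
that REVERT sorts after SYNC; assuming the string constants used by this patch
('sync_revert' and 'sync_only'), a reverse sort floats the handoff revert work
to the front after the shuffle:

    # Hedged sketch: the constant values are assumed, not shown in this hunk.
    SYNC, REVERT = 'sync_only', 'sync_revert'
    jobs = [{'job_type': SYNC}, {'job_type': REVERT}, {'job_type': SYNC}]
    jobs.sort(key=lambda job: job['job_type'], reverse=True)
    assert jobs[0]['job_type'] == REVERT
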
+ def _reset_stats(self):
+ self.start = time.time()
+ self.job_count = 0
+ self.suffix_count = 0
+ self.suffix_sync = 0
+ self.suffix_hash = 0
+ self.reconstruction_count = 0
+ self.last_reconstruction_count = -1
+
+ def delete_partition(self, path):
+ self.logger.info(_("Removing partition: %s"), path)
+ tpool.execute(shutil.rmtree, path, ignore_errors=True)
+
+ def reconstruct(self, **kwargs):
+ """Run a reconstruction pass"""
+ self._reset_stats()
+ self.partition_times = []
+
+ stats = spawn(self.heartbeat)
+ lockup_detector = spawn(self.detect_lockups)
+ sleep() # Give spawns a cycle
+
+ try:
+ self.run_pool = GreenPool(size=self.concurrency)
+ for part_info in self.collect_parts(**kwargs):
+ if not self.check_ring(part_info['policy'].object_ring):
+ self.logger.info(_("Ring change detected. Aborting "
+ "current reconstruction pass."))
+ return
+ jobs = self.build_reconstruction_jobs(part_info)
+ if not jobs:
+ # If this part belongs on this node, _get_part_jobs
+                    # will *always* build a sync_job - even if there are
+                    # no suffixes in the partition that need to sync.
+                    # If there were any suffixes in the partition then our
+                    # job list would have *at least* one revert job.
+ # Therefore we know this part a) doesn't belong on
+ # this node and b) doesn't have any suffixes in it.
+ self.run_pool.spawn(self.delete_partition,
+ part_info['part_path'])
+ for job in jobs:
+ self.run_pool.spawn(self.process_job, job)
+ with Timeout(self.lockup_timeout):
+ self.run_pool.waitall()
+ except (Exception, Timeout):
+            self.logger.exception(_("Exception in top-level "
+                                    "reconstruction loop"))
+ self.kill_coros()
+ finally:
+ stats.kill()
+ lockup_detector.kill()
+ self.stats_line()
+
+ def run_once(self, *args, **kwargs):
+ start = time.time()
+ self.logger.info(_("Running object reconstructor in script mode."))
+ override_devices = list_from_csv(kwargs.get('devices'))
+ override_partitions = [int(p) for p in
+ list_from_csv(kwargs.get('partitions'))]
+ self.reconstruct(
+ override_devices=override_devices,
+ override_partitions=override_partitions)
+ total = (time.time() - start) / 60
+ self.logger.info(
+ _("Object reconstruction complete (once). (%.02f minutes)"), total)
+ if not (override_partitions or override_devices):
+ dump_recon_cache({'object_reconstruction_time': total,
+ 'object_reconstruction_last': time.time()},
+ self.rcache, self.logger)
+
+ def run_forever(self, *args, **kwargs):
+ self.logger.info(_("Starting object reconstructor in daemon mode."))
+ # Run the reconstructor continually
+ while True:
+ start = time.time()
+ self.logger.info(_("Starting object reconstruction pass."))
+ # Run the reconstructor
+ self.reconstruct()
+ total = (time.time() - start) / 60
+ self.logger.info(
+ _("Object reconstruction complete. (%.02f minutes)"), total)
+ dump_recon_cache({'object_reconstruction_time': total,
+ 'object_reconstruction_last': time.time()},
+ self.rcache, self.logger)
+ self.logger.debug('reconstruction sleeping for %s seconds.',
+ self.run_pause)
+ sleep(self.run_pause)
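
For reference, a minimal sketch of how the run-once overrides are parsed;
list_from_csv is the existing swift.common.utils helper used above, and the
device/partition values are made up:

    from swift.common.utils import list_from_csv

    # example raw option values (e.g. from --devices/--partitions style options)
    override_devices = list_from_csv('sdb1,sdb2')                     # ['sdb1', 'sdb2']
    override_partitions = [int(p) for p in list_from_csv('100,101')]  # [100, 101]
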
diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py
index 5ee32884c..580d1827e 100644
--- a/swift/obj/replicator.py
+++ b/swift/obj/replicator.py
@@ -39,7 +39,7 @@ from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
from swift.obj import ssync_sender
from swift.obj.diskfile import (DiskFileManager, get_hashes, get_data_dir,
get_tmp_dir)
-from swift.common.storage_policy import POLICIES
+from swift.common.storage_policy import POLICIES, REPL_POLICY
hubs.use_hub(get_hub())
@@ -110,14 +110,15 @@ class ObjectReplicator(Daemon):
"""
return self.sync_method(node, job, suffixes, *args, **kwargs)
- def get_object_ring(self, policy_idx):
+ def load_object_ring(self, policy):
"""
- Get the ring object to use to handle a request based on its policy.
+ Make sure the policy's rings are loaded.
- :policy_idx: policy index as defined in swift.conf
+ :param policy: the StoragePolicy instance
:returns: appropriate ring object
"""
- return POLICIES.get_object_ring(policy_idx, self.swift_dir)
+ policy.load_ring(self.swift_dir)
+ return policy.object_ring
def _rsync(self, args):
"""
@@ -170,7 +171,7 @@ class ObjectReplicator(Daemon):
sync method in Swift.
"""
if not os.path.exists(job['path']):
- return False, set()
+ return False, {}
args = [
'rsync',
'--recursive',
@@ -195,11 +196,11 @@ class ObjectReplicator(Daemon):
args.append(spath)
had_any = True
if not had_any:
- return False, set()
- data_dir = get_data_dir(job['policy_idx'])
+ return False, {}
+ data_dir = get_data_dir(job['policy'])
args.append(join(rsync_module, node['device'],
data_dir, job['partition']))
- return self._rsync(args) == 0, set()
+ return self._rsync(args) == 0, {}
def ssync(self, node, job, suffixes, remote_check_objs=None):
return ssync_sender.Sender(
@@ -231,7 +232,7 @@ class ObjectReplicator(Daemon):
if len(suff) == 3 and isdir(join(path, suff))]
self.replication_count += 1
self.logger.increment('partition.delete.count.%s' % (job['device'],))
- self.headers['X-Backend-Storage-Policy-Index'] = job['policy_idx']
+ self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
begin = time.time()
try:
responses = []
@@ -245,8 +246,9 @@ class ObjectReplicator(Daemon):
self.conf.get('sync_method', 'rsync') == 'ssync':
kwargs['remote_check_objs'] = \
synced_remote_regions[node['region']]
- # cand_objs is a list of objects for deletion
- success, cand_objs = self.sync(
+ # candidates is a dict(hash=>timestamp) of objects
+ # for deletion
+ success, candidates = self.sync(
node, job, suffixes, **kwargs)
if success:
with Timeout(self.http_timeout):
@@ -257,7 +259,8 @@ class ObjectReplicator(Daemon):
'/' + '-'.join(suffixes), headers=self.headers)
conn.getresponse().read()
if node['region'] != job['region']:
- synced_remote_regions[node['region']] = cand_objs
+ synced_remote_regions[node['region']] = \
+ candidates.keys()
responses.append(success)
for region, cand_objs in synced_remote_regions.iteritems():
if delete_objs is None:
@@ -314,7 +317,7 @@ class ObjectReplicator(Daemon):
"""
self.replication_count += 1
self.logger.increment('partition.update.count.%s' % (job['device'],))
- self.headers['X-Backend-Storage-Policy-Index'] = job['policy_idx']
+ self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
begin = time.time()
try:
hashed, local_hash = tpool_reraise(
@@ -328,7 +331,8 @@ class ObjectReplicator(Daemon):
random.shuffle(job['nodes'])
nodes = itertools.chain(
job['nodes'],
- job['object_ring'].get_more_nodes(int(job['partition'])))
+ job['policy'].object_ring.get_more_nodes(
+ int(job['partition'])))
while attempts_left > 0:
# If this throws StopIteration it will be caught way below
node = next(nodes)
@@ -460,16 +464,15 @@ class ObjectReplicator(Daemon):
self.kill_coros()
self.last_replication_count = self.replication_count
- def process_repl(self, policy, ips, override_devices=None,
- override_partitions=None):
+ def build_replication_jobs(self, policy, ips, override_devices=None,
+ override_partitions=None):
"""
Helper function for collect_jobs to build jobs for replication
using replication style storage policy
"""
jobs = []
- obj_ring = self.get_object_ring(policy.idx)
- data_dir = get_data_dir(policy.idx)
- for local_dev in [dev for dev in obj_ring.devs
+ data_dir = get_data_dir(policy)
+ for local_dev in [dev for dev in policy.object_ring.devs
if (dev
and is_local_device(ips,
self.port,
@@ -479,7 +482,7 @@ class ObjectReplicator(Daemon):
or dev['device'] in override_devices))]:
dev_path = join(self.devices_dir, local_dev['device'])
obj_path = join(dev_path, data_dir)
- tmp_path = join(dev_path, get_tmp_dir(int(policy)))
+ tmp_path = join(dev_path, get_tmp_dir(policy))
if self.mount_check and not ismount(dev_path):
self.logger.warn(_('%s is not mounted'), local_dev['device'])
continue
@@ -497,7 +500,8 @@ class ObjectReplicator(Daemon):
try:
job_path = join(obj_path, partition)
- part_nodes = obj_ring.get_part_nodes(int(partition))
+ part_nodes = policy.object_ring.get_part_nodes(
+ int(partition))
nodes = [node for node in part_nodes
if node['id'] != local_dev['id']]
jobs.append(
@@ -506,9 +510,8 @@ class ObjectReplicator(Daemon):
obj_path=obj_path,
nodes=nodes,
delete=len(nodes) > len(part_nodes) - 1,
- policy_idx=policy.idx,
+ policy=policy,
partition=partition,
- object_ring=obj_ring,
region=local_dev['region']))
except ValueError:
continue
@@ -530,13 +533,15 @@ class ObjectReplicator(Daemon):
jobs = []
ips = whataremyips()
for policy in POLICIES:
- if (override_policies is not None
- and str(policy.idx) not in override_policies):
- continue
- # may need to branch here for future policy types
- jobs += self.process_repl(policy, ips,
- override_devices=override_devices,
- override_partitions=override_partitions)
+ if policy.policy_type == REPL_POLICY:
+ if (override_policies is not None and
+ str(policy.idx) not in override_policies):
+ continue
+ # ensure rings are loaded for policy
+ self.load_object_ring(policy)
+ jobs += self.build_replication_jobs(
+ policy, ips, override_devices=override_devices,
+ override_partitions=override_partitions)
random.shuffle(jobs)
if self.handoffs_first:
# Move the handoff parts to the front of the list
@@ -569,7 +574,7 @@ class ObjectReplicator(Daemon):
if self.mount_check and not ismount(dev_path):
self.logger.warn(_('%s is not mounted'), job['device'])
continue
- if not self.check_ring(job['object_ring']):
+ if not self.check_ring(job['policy'].object_ring):
self.logger.info(_("Ring change detected. Aborting "
"current replication pass."))
return
diff --git a/swift/obj/server.py b/swift/obj/server.py
index ad0f9faeb..658f207a8 100644
--- a/swift/obj/server.py
+++ b/swift/obj/server.py
@@ -16,10 +16,12 @@
""" Object Server for Swift """
import cPickle as pickle
+import json
import os
import multiprocessing
import time
import traceback
+import rfc822
import socket
import math
from swift import gettext_ as _
@@ -30,7 +32,7 @@ from eventlet import sleep, wsgi, Timeout
from swift.common.utils import public, get_logger, \
config_true_value, timing_stats, replication, \
normalize_delete_at_timestamp, get_log_line, Timestamp, \
- get_expirer_container
+ get_expirer_container, iter_multipart_mime_documents
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_object_creation, \
valid_timestamp, check_utf8
@@ -48,8 +50,35 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
HTTPPreconditionFailed, HTTPRequestTimeout, HTTPUnprocessableEntity, \
HTTPClientDisconnect, HTTPMethodNotAllowed, Request, Response, \
HTTPInsufficientStorage, HTTPForbidden, HTTPException, HeaderKeyDict, \
- HTTPConflict
-from swift.obj.diskfile import DATAFILE_SYSTEM_META, DiskFileManager
+ HTTPConflict, HTTPServerError
+from swift.obj.diskfile import DATAFILE_SYSTEM_META, DiskFileRouter
+
+
+def iter_mime_headers_and_bodies(wsgi_input, mime_boundary, read_chunk_size):
+ mime_documents_iter = iter_multipart_mime_documents(
+ wsgi_input, mime_boundary, read_chunk_size)
+
+ for file_like in mime_documents_iter:
+ hdrs = HeaderKeyDict(rfc822.Message(file_like, 0))
+ yield (hdrs, file_like)
+
+
+def drain(file_like, read_size, timeout):
+ """
+ Read and discard any bytes from file_like.
+
+ :param file_like: file-like object to read from
+ :param read_size: how big a chunk to read at a time
+ :param timeout: how long to wait for a read (use None for no timeout)
+
+ :raises ChunkReadTimeout: if no chunk was read in time
+ """
+
+ while True:
+ with ChunkReadTimeout(timeout):
+ chunk = file_like.read(read_size)
+ if not chunk:
+ break
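
A minimal usage sketch of drain(); StringIO stands in for the wsgi input, and
a None timeout disables the ChunkReadTimeout as the docstring notes:

    from StringIO import StringIO

    leftover = StringIO('x' * 100)            # pretend trailing MIME bytes
    drain(leftover, read_size=16, timeout=None)
    assert leftover.read() == ''              # everything was read and discarded
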
class EventletPlungerString(str):
@@ -142,7 +171,7 @@ class ObjectController(BaseStorageServer):
# Common on-disk hierarchy shared across account, container and object
# servers.
- self._diskfile_mgr = DiskFileManager(conf, self.logger)
+ self._diskfile_router = DiskFileRouter(conf, self.logger)
# This is populated by global_conf_callback way below as the semaphore
# is shared by all workers.
if 'replication_semaphore' in conf:
@@ -156,7 +185,7 @@ class ObjectController(BaseStorageServer):
conf.get('replication_failure_ratio') or 1.0)
def get_diskfile(self, device, partition, account, container, obj,
- policy_idx, **kwargs):
+ policy, **kwargs):
"""
Utility method for instantiating a DiskFile object supporting a given
REST API.
@@ -165,11 +194,11 @@ class ObjectController(BaseStorageServer):
DiskFile class would simply over-ride this method to provide that
behavior.
"""
- return self._diskfile_mgr.get_diskfile(
- device, partition, account, container, obj, policy_idx, **kwargs)
+ return self._diskfile_router[policy].get_diskfile(
+ device, partition, account, container, obj, policy, **kwargs)
def async_update(self, op, account, container, obj, host, partition,
- contdevice, headers_out, objdevice, policy_index):
+ contdevice, headers_out, objdevice, policy):
"""
Sends or saves an async update.
@@ -183,7 +212,7 @@ class ObjectController(BaseStorageServer):
:param headers_out: dictionary of headers to send in the container
request
:param objdevice: device name that the object is in
- :param policy_index: the associated storage policy index
+ :param policy: the associated BaseStoragePolicy instance
"""
headers_out['user-agent'] = 'object-server %s' % os.getpid()
full_path = '/%s/%s/%s' % (account, container, obj)
@@ -213,12 +242,11 @@ class ObjectController(BaseStorageServer):
data = {'op': op, 'account': account, 'container': container,
'obj': obj, 'headers': headers_out}
timestamp = headers_out['x-timestamp']
- self._diskfile_mgr.pickle_async_update(objdevice, account, container,
- obj, data, timestamp,
- policy_index)
+ self._diskfile_router[policy].pickle_async_update(
+ objdevice, account, container, obj, data, timestamp, policy)
def container_update(self, op, account, container, obj, request,
- headers_out, objdevice, policy_idx):
+ headers_out, objdevice, policy):
"""
Update the container when objects are updated.
@@ -230,6 +258,7 @@ class ObjectController(BaseStorageServer):
:param headers_out: dictionary of headers to send in the container
request(s)
:param objdevice: device name that the object is in
+ :param policy: the BaseStoragePolicy instance
"""
headers_in = request.headers
conthosts = [h.strip() for h in
@@ -255,14 +284,14 @@ class ObjectController(BaseStorageServer):
headers_out['x-trans-id'] = headers_in.get('x-trans-id', '-')
headers_out['referer'] = request.as_referer()
- headers_out['X-Backend-Storage-Policy-Index'] = policy_idx
+ headers_out['X-Backend-Storage-Policy-Index'] = int(policy)
for conthost, contdevice in updates:
self.async_update(op, account, container, obj, conthost,
contpartition, contdevice, headers_out,
- objdevice, policy_idx)
+ objdevice, policy)
def delete_at_update(self, op, delete_at, account, container, obj,
- request, objdevice, policy_index):
+ request, objdevice, policy):
"""
Update the expiring objects container when objects are updated.
@@ -273,7 +302,7 @@ class ObjectController(BaseStorageServer):
:param obj: object name
:param request: the original request driving the update
:param objdevice: device name that the object is in
- :param policy_index: the policy index to be used for tmp dir
+ :param policy: the BaseStoragePolicy instance (used for tmp dir)
"""
if config_true_value(
request.headers.get('x-backend-replication', 'f')):
@@ -333,13 +362,66 @@ class ObjectController(BaseStorageServer):
op, self.expiring_objects_account, delete_at_container,
'%s-%s/%s/%s' % (delete_at, account, container, obj),
host, partition, contdevice, headers_out, objdevice,
- policy_index)
+ policy)
+
+ def _make_timeout_reader(self, file_like):
+ def timeout_reader():
+ with ChunkReadTimeout(self.client_timeout):
+ return file_like.read(self.network_chunk_size)
+ return timeout_reader
+
+ def _read_put_commit_message(self, mime_documents_iter):
+ rcvd_commit = False
+ try:
+ with ChunkReadTimeout(self.client_timeout):
+ commit_hdrs, commit_iter = next(mime_documents_iter)
+ if commit_hdrs.get('X-Document', None) == "put commit":
+ rcvd_commit = True
+ drain(commit_iter, self.network_chunk_size, self.client_timeout)
+ except ChunkReadTimeout:
+ raise HTTPClientDisconnect()
+ except StopIteration:
+ raise HTTPBadRequest(body="couldn't find PUT commit MIME doc")
+ return rcvd_commit
+
+ def _read_metadata_footer(self, mime_documents_iter):
+ try:
+ with ChunkReadTimeout(self.client_timeout):
+ footer_hdrs, footer_iter = next(mime_documents_iter)
+ except ChunkReadTimeout:
+ raise HTTPClientDisconnect()
+ except StopIteration:
+ raise HTTPBadRequest(body="couldn't find footer MIME doc")
+
+ timeout_reader = self._make_timeout_reader(footer_iter)
+ try:
+ footer_body = ''.join(iter(timeout_reader, ''))
+ except ChunkReadTimeout:
+ raise HTTPClientDisconnect()
+
+ footer_md5 = footer_hdrs.get('Content-MD5')
+ if not footer_md5:
+ raise HTTPBadRequest(body="no Content-MD5 in footer")
+ if footer_md5 != md5(footer_body).hexdigest():
+ raise HTTPUnprocessableEntity(body="footer MD5 mismatch")
+
+ try:
+ return HeaderKeyDict(json.loads(footer_body))
+ except ValueError:
+ raise HTTPBadRequest("invalid JSON for footer doc")
+
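
The footer document handled above is a JSON object whose MD5 must match its
Content-MD5 header; a sketch of what a conforming sender would produce (the
ETag value is invented):

    import json
    from hashlib import md5

    footer_body = json.dumps({'Etag': 'd41d8cd98f00b204e9800998ecf8427e'})
    footer_md5 = md5(footer_body).hexdigest()   # sent as the Content-MD5 header
    # the receiver recomputes md5(footer_body) and compares before json.loads()
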
+ def _check_container_override(self, update_headers, metadata):
+ for key, val in metadata.iteritems():
+ override_prefix = 'x-backend-container-update-override-'
+ if key.lower().startswith(override_prefix):
+ override = key.lower().replace(override_prefix, 'x-')
+ update_headers[override] = val
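
The override prefix rewrite above maps backend override headers onto the
corresponding container-update headers, for example:

    prefix = 'x-backend-container-update-override-'
    key = 'x-backend-container-update-override-etag'
    assert key.replace(prefix, 'x-') == 'x-etag'   # value is forwarded as x-etag
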
@public
@timing_stats()
def POST(self, request):
"""Handle HTTP POST requests for the Swift Object Server."""
- device, partition, account, container, obj, policy_idx = \
+ device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True)
req_timestamp = valid_timestamp(request)
new_delete_at = int(request.headers.get('X-Delete-At') or 0)
@@ -349,7 +431,7 @@ class ObjectController(BaseStorageServer):
try:
disk_file = self.get_diskfile(
device, partition, account, container, obj,
- policy_idx=policy_idx)
+ policy=policy)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
try:
@@ -374,11 +456,11 @@ class ObjectController(BaseStorageServer):
if orig_delete_at != new_delete_at:
if new_delete_at:
self.delete_at_update('PUT', new_delete_at, account, container,
- obj, request, device, policy_idx)
+ obj, request, device, policy)
if orig_delete_at:
self.delete_at_update('DELETE', orig_delete_at, account,
container, obj, request, device,
- policy_idx)
+ policy)
try:
disk_file.write_metadata(metadata)
except (DiskFileXattrNotSupported, DiskFileNoSpace):
@@ -389,7 +471,7 @@ class ObjectController(BaseStorageServer):
@timing_stats()
def PUT(self, request):
"""Handle HTTP PUT requests for the Swift Object Server."""
- device, partition, account, container, obj, policy_idx = \
+ device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True)
req_timestamp = valid_timestamp(request)
error_response = check_object_creation(request, obj)
@@ -404,10 +486,22 @@ class ObjectController(BaseStorageServer):
except ValueError as e:
return HTTPBadRequest(body=str(e), request=request,
content_type='text/plain')
+
+ # In case of multipart-MIME put, the proxy sends a chunked request,
+ # but may let us know the real content length so we can verify that
+ # we have enough disk space to hold the object.
+ if fsize is None:
+ fsize = request.headers.get('X-Backend-Obj-Content-Length')
+ if fsize is not None:
+ try:
+ fsize = int(fsize)
+ except ValueError as e:
+ return HTTPBadRequest(body=str(e), request=request,
+ content_type='text/plain')
try:
disk_file = self.get_diskfile(
device, partition, account, container, obj,
- policy_idx=policy_idx)
+ policy=policy)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
try:
@@ -439,13 +533,51 @@ class ObjectController(BaseStorageServer):
with disk_file.create(size=fsize) as writer:
upload_size = 0
- def timeout_reader():
- with ChunkReadTimeout(self.client_timeout):
- return request.environ['wsgi.input'].read(
- self.network_chunk_size)
+ # If the proxy wants to send us object metadata after the
+ # object body, it sets some headers. We have to tell the
+ # proxy, in the 100 Continue response, that we're able to
+ # parse a multipart MIME document and extract the object and
+ # metadata from it. If we don't, then the proxy won't
+ # actually send the footer metadata.
+ have_metadata_footer = False
+ use_multiphase_commit = False
+ mime_documents_iter = iter([])
+ obj_input = request.environ['wsgi.input']
+
+ hundred_continue_headers = []
+ if config_true_value(
+ request.headers.get(
+ 'X-Backend-Obj-Multiphase-Commit')):
+ use_multiphase_commit = True
+ hundred_continue_headers.append(
+ ('X-Obj-Multiphase-Commit', 'yes'))
+
+ if config_true_value(
+ request.headers.get('X-Backend-Obj-Metadata-Footer')):
+ have_metadata_footer = True
+ hundred_continue_headers.append(
+ ('X-Obj-Metadata-Footer', 'yes'))
+
+ if have_metadata_footer or use_multiphase_commit:
+ obj_input.set_hundred_continue_response_headers(
+ hundred_continue_headers)
+ mime_boundary = request.headers.get(
+ 'X-Backend-Obj-Multipart-Mime-Boundary')
+ if not mime_boundary:
+ return HTTPBadRequest("no MIME boundary")
+ try:
+ with ChunkReadTimeout(self.client_timeout):
+ mime_documents_iter = iter_mime_headers_and_bodies(
+ request.environ['wsgi.input'],
+ mime_boundary, self.network_chunk_size)
+ _junk_hdrs, obj_input = next(mime_documents_iter)
+ except ChunkReadTimeout:
+ return HTTPRequestTimeout(request=request)
+
+ timeout_reader = self._make_timeout_reader(obj_input)
try:
- for chunk in iter(lambda: timeout_reader(), ''):
+ for chunk in iter(timeout_reader, ''):
start_time = time.time()
if start_time > upload_expiration:
self.logger.increment('PUT.timeouts')
@@ -461,9 +593,16 @@ class ObjectController(BaseStorageServer):
upload_size)
if fsize is not None and fsize != upload_size:
return HTTPClientDisconnect(request=request)
+
+ footer_meta = {}
+ if have_metadata_footer:
+ footer_meta = self._read_metadata_footer(
+ mime_documents_iter)
+
+ request_etag = (footer_meta.get('etag') or
+ request.headers.get('etag', '')).lower()
etag = etag.hexdigest()
- if 'etag' in request.headers and \
- request.headers['etag'].lower() != etag:
+ if request_etag and request_etag != etag:
return HTTPUnprocessableEntity(request=request)
metadata = {
'X-Timestamp': request.timestamp.internal,
@@ -473,6 +612,8 @@ class ObjectController(BaseStorageServer):
}
metadata.update(val for val in request.headers.iteritems()
if is_sys_or_user_meta('object', val[0]))
+ metadata.update(val for val in footer_meta.iteritems()
+ if is_sys_or_user_meta('object', val[0]))
headers_to_copy = (
request.headers.get(
'X-Backend-Replication-Headers', '').split() +
@@ -482,39 +623,63 @@ class ObjectController(BaseStorageServer):
header_caps = header_key.title()
metadata[header_caps] = request.headers[header_key]
writer.put(metadata)
+
+ # if the PUT requires a two-phase commit (a data and a commit
+ # phase) send the proxy server another 100-continue response
+ # to indicate that we are finished writing object data
+ if use_multiphase_commit:
+ request.environ['wsgi.input'].\
+ send_hundred_continue_response()
+ if not self._read_put_commit_message(mime_documents_iter):
+ return HTTPServerError(request=request)
+ # got 2nd phase confirmation, write a timestamp.durable
+ # state file to indicate a successful PUT
+
+ writer.commit(request.timestamp)
+
+ # Drain any remaining MIME docs from the socket. There
+ # shouldn't be any, but we must read the whole request body.
+ try:
+ while True:
+ with ChunkReadTimeout(self.client_timeout):
+ _junk_hdrs, _junk_body = next(mime_documents_iter)
+ drain(_junk_body, self.network_chunk_size,
+ self.client_timeout)
+ except ChunkReadTimeout:
+ raise HTTPClientDisconnect()
+ except StopIteration:
+ pass
+
except (DiskFileXattrNotSupported, DiskFileNoSpace):
return HTTPInsufficientStorage(drive=device, request=request)
if orig_delete_at != new_delete_at:
if new_delete_at:
self.delete_at_update(
'PUT', new_delete_at, account, container, obj, request,
- device, policy_idx)
+ device, policy)
if orig_delete_at:
self.delete_at_update(
'DELETE', orig_delete_at, account, container, obj,
- request, device, policy_idx)
+ request, device, policy)
update_headers = HeaderKeyDict({
'x-size': metadata['Content-Length'],
'x-content-type': metadata['Content-Type'],
'x-timestamp': metadata['X-Timestamp'],
'x-etag': metadata['ETag']})
# apply any container update header overrides sent with request
- for key, val in request.headers.iteritems():
- override_prefix = 'x-backend-container-update-override-'
- if key.lower().startswith(override_prefix):
- override = key.lower().replace(override_prefix, 'x-')
- update_headers[override] = val
+ self._check_container_override(update_headers, request.headers)
+ self._check_container_override(update_headers, footer_meta)
self.container_update(
'PUT', account, container, obj, request,
update_headers,
- device, policy_idx)
+ device, policy)
return HTTPCreated(request=request, etag=etag)
@public
@timing_stats()
def GET(self, request):
"""Handle HTTP GET requests for the Swift Object Server."""
- device, partition, account, container, obj, policy_idx = \
+ device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True)
keep_cache = self.keep_cache_private or (
'X-Auth-Token' not in request.headers and
@@ -522,7 +687,7 @@ class ObjectController(BaseStorageServer):
try:
disk_file = self.get_diskfile(
device, partition, account, container, obj,
- policy_idx=policy_idx)
+ policy=policy)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
try:
@@ -533,9 +698,14 @@ class ObjectController(BaseStorageServer):
keep_cache = (self.keep_cache_private or
('X-Auth-Token' not in request.headers and
'X-Storage-Token' not in request.headers))
+ conditional_etag = None
+ if 'X-Backend-Etag-Is-At' in request.headers:
+ conditional_etag = metadata.get(
+ request.headers['X-Backend-Etag-Is-At'])
response = Response(
app_iter=disk_file.reader(keep_cache=keep_cache),
- request=request, conditional_response=True)
+ request=request, conditional_response=True,
+ conditional_etag=conditional_etag)
response.headers['Content-Type'] = metadata.get(
'Content-Type', 'application/octet-stream')
for key, value in metadata.iteritems():
@@ -567,12 +737,12 @@ class ObjectController(BaseStorageServer):
@timing_stats(sample_rate=0.8)
def HEAD(self, request):
"""Handle HTTP HEAD requests for the Swift Object Server."""
- device, partition, account, container, obj, policy_idx = \
+ device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True)
try:
disk_file = self.get_diskfile(
device, partition, account, container, obj,
- policy_idx=policy_idx)
+ policy=policy)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
try:
@@ -585,7 +755,12 @@ class ObjectController(BaseStorageServer):
headers['X-Backend-Timestamp'] = e.timestamp.internal
return HTTPNotFound(request=request, headers=headers,
conditional_response=True)
- response = Response(request=request, conditional_response=True)
+ conditional_etag = None
+ if 'X-Backend-Etag-Is-At' in request.headers:
+ conditional_etag = metadata.get(
+ request.headers['X-Backend-Etag-Is-At'])
+ response = Response(request=request, conditional_response=True,
+ conditional_etag=conditional_etag)
response.headers['Content-Type'] = metadata.get(
'Content-Type', 'application/octet-stream')
for key, value in metadata.iteritems():
@@ -609,13 +784,13 @@ class ObjectController(BaseStorageServer):
@timing_stats()
def DELETE(self, request):
"""Handle HTTP DELETE requests for the Swift Object Server."""
- device, partition, account, container, obj, policy_idx = \
+ device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True)
req_timestamp = valid_timestamp(request)
try:
disk_file = self.get_diskfile(
device, partition, account, container, obj,
- policy_idx=policy_idx)
+ policy=policy)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
try:
@@ -667,13 +842,13 @@ class ObjectController(BaseStorageServer):
if orig_delete_at:
self.delete_at_update('DELETE', orig_delete_at, account,
container, obj, request, device,
- policy_idx)
+ policy)
if orig_timestamp < req_timestamp:
disk_file.delete(req_timestamp)
self.container_update(
'DELETE', account, container, obj, request,
HeaderKeyDict({'x-timestamp': req_timestamp.internal}),
- device, policy_idx)
+ device, policy)
return response_class(
request=request,
headers={'X-Backend-Timestamp': response_timestamp.internal})
@@ -685,12 +860,17 @@ class ObjectController(BaseStorageServer):
"""
Handle REPLICATE requests for the Swift Object Server. This is used
by the object replicator to get hashes for directories.
+
+        Note that the name REPLICATE is preserved for historical reasons;
+        this verb really just returns the hashes for the specified
+        parameters and is used, for example, by both replication and EC.
"""
- device, partition, suffix, policy_idx = \
+ device, partition, suffix_parts, policy = \
get_name_and_placement(request, 2, 3, True)
+ suffixes = suffix_parts.split('-') if suffix_parts else []
try:
- hashes = self._diskfile_mgr.get_hashes(device, partition, suffix,
- policy_idx)
+ hashes = self._diskfile_router[policy].get_hashes(
+ device, partition, suffixes, policy)
except DiskFileDeviceUnavailable:
resp = HTTPInsufficientStorage(drive=device, request=request)
else:
@@ -700,7 +880,7 @@ class ObjectController(BaseStorageServer):
@public
@replication
@timing_stats(sample_rate=0.1)
- def REPLICATION(self, request):
+ def SSYNC(self, request):
return Response(app_iter=ssync_receiver.Receiver(self, request)())
def __call__(self, env, start_response):
@@ -734,7 +914,7 @@ class ObjectController(BaseStorageServer):
trans_time = time.time() - start_time
if self.log_requests:
log_line = get_log_line(req, res, trans_time, '')
- if req.method in ('REPLICATE', 'REPLICATION') or \
+ if req.method in ('REPLICATE', 'SSYNC') or \
'X-Backend-Replication' in req.headers:
self.logger.debug(log_line)
else:
diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py
index 248715d00..b636a1624 100644
--- a/swift/obj/ssync_receiver.py
+++ b/swift/obj/ssync_receiver.py
@@ -24,27 +24,28 @@ from swift.common import exceptions
from swift.common import http
from swift.common import swob
from swift.common import utils
+from swift.common import request_helpers
class Receiver(object):
"""
- Handles incoming REPLICATION requests to the object server.
+ Handles incoming SSYNC requests to the object server.
These requests come from the object-replicator daemon that uses
:py:mod:`.ssync_sender`.
- The number of concurrent REPLICATION requests is restricted by
+ The number of concurrent SSYNC requests is restricted by
use of a replication_semaphore and can be configured with the
object-server.conf [object-server] replication_concurrency
setting.
- A REPLICATION request is really just an HTTP conduit for
+ An SSYNC request is really just an HTTP conduit for
sender/receiver replication communication. The overall
- REPLICATION request should always succeed, but it will contain
+ SSYNC request should always succeed, but it will contain
multiple requests within its request and response bodies. This
"hack" is done so that replication concurrency can be managed.
- The general process inside a REPLICATION request is:
+ The general process inside an SSYNC request is:
1. Initialize the request: Basic request validation, mount check,
acquire semaphore lock, etc..
@@ -72,10 +73,10 @@ class Receiver(object):
def __call__(self):
"""
- Processes a REPLICATION request.
+ Processes an SSYNC request.
Acquires a semaphore lock and then proceeds through the steps
- of the REPLICATION process.
+ of the SSYNC process.
"""
# The general theme for functions __call__ calls is that they should
# raise exceptions.MessageTimeout for client timeouts (logged locally),
@@ -88,7 +89,7 @@ class Receiver(object):
try:
# Double try blocks in case our main error handlers fail.
try:
- # intialize_request is for preamble items that can be done
+ # initialize_request is for preamble items that can be done
# outside a replication semaphore lock.
for data in self.initialize_request():
yield data
@@ -98,7 +99,7 @@ class Receiver(object):
if not self.app.replication_semaphore.acquire(False):
raise swob.HTTPServiceUnavailable()
try:
- with self.app._diskfile_mgr.replication_lock(self.device):
+ with self.diskfile_mgr.replication_lock(self.device):
for data in self.missing_check():
yield data
for data in self.updates():
@@ -111,7 +112,7 @@ class Receiver(object):
self.app.replication_semaphore.release()
except exceptions.ReplicationLockTimeout as err:
self.app.logger.debug(
- '%s/%s/%s REPLICATION LOCK TIMEOUT: %s' % (
+ '%s/%s/%s SSYNC LOCK TIMEOUT: %s' % (
self.request.remote_addr, self.device, self.partition,
err))
yield ':ERROR: %d %r\n' % (0, str(err))
@@ -166,14 +167,17 @@ class Receiver(object):
"""
# The following is the setting we talk about above in _ensure_flush.
self.request.environ['eventlet.minimum_write_chunk_size'] = 0
- self.device, self.partition = utils.split_path(
- urllib.unquote(self.request.path), 2, 2, False)
- self.policy_idx = \
- int(self.request.headers.get('X-Backend-Storage-Policy-Index', 0))
+ self.device, self.partition, self.policy = \
+ request_helpers.get_name_and_placement(self.request, 2, 2, False)
+ if 'X-Backend-Ssync-Frag-Index' in self.request.headers:
+ self.frag_index = int(
+ self.request.headers['X-Backend-Ssync-Frag-Index'])
+ else:
+ self.frag_index = None
utils.validate_device_partition(self.device, self.partition)
- if self.app._diskfile_mgr.mount_check and \
- not constraints.check_mount(
- self.app._diskfile_mgr.devices, self.device):
+ self.diskfile_mgr = self.app._diskfile_router[self.policy]
+ if self.diskfile_mgr.mount_check and not constraints.check_mount(
+ self.diskfile_mgr.devices, self.device):
raise swob.HTTPInsufficientStorage(drive=self.device)
self.fp = self.request.environ['wsgi.input']
for data in self._ensure_flush():
@@ -182,7 +186,7 @@ class Receiver(object):
def missing_check(self):
"""
Handles the receiver-side of the MISSING_CHECK step of a
- REPLICATION request.
+ SSYNC request.
Receives a list of hashes and timestamps of object
information the sender can provide and responds with a list
@@ -226,11 +230,13 @@ class Receiver(object):
line = self.fp.readline(self.app.network_chunk_size)
if not line or line.strip() == ':MISSING_CHECK: END':
break
- object_hash, timestamp = [urllib.unquote(v) for v in line.split()]
+ parts = line.split()
+ object_hash, timestamp = [urllib.unquote(v) for v in parts[:2]]
want = False
try:
- df = self.app._diskfile_mgr.get_diskfile_from_hash(
- self.device, self.partition, object_hash, self.policy_idx)
+ df = self.diskfile_mgr.get_diskfile_from_hash(
+ self.device, self.partition, object_hash, self.policy,
+ frag_index=self.frag_index)
except exceptions.DiskFileNotExist:
want = True
else:
@@ -253,7 +259,7 @@ class Receiver(object):
def updates(self):
"""
- Handles the UPDATES step of a REPLICATION request.
+ Handles the UPDATES step of an SSYNC request.
Receives a set of PUT and DELETE subrequests that will be
routed to the object server itself for processing. These
@@ -353,7 +359,7 @@ class Receiver(object):
subreq_iter())
else:
raise Exception('Invalid subrequest method %s' % method)
- subreq.headers['X-Backend-Storage-Policy-Index'] = self.policy_idx
+ subreq.headers['X-Backend-Storage-Policy-Index'] = int(self.policy)
subreq.headers['X-Backend-Replication'] = 'True'
if replication_headers:
subreq.headers['X-Backend-Replication-Headers'] = \
diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py
index 1058ab262..8e9202c00 100644
--- a/swift/obj/ssync_sender.py
+++ b/swift/obj/ssync_sender.py
@@ -22,7 +22,7 @@ from swift.common import http
class Sender(object):
"""
- Sends REPLICATION requests to the object server.
+ Sends SSYNC requests to the object server.
These requests are eventually handled by
:py:mod:`.ssync_receiver` and full documentation about the
@@ -31,6 +31,7 @@ class Sender(object):
def __init__(self, daemon, node, job, suffixes, remote_check_objs=None):
self.daemon = daemon
+ self.df_mgr = self.daemon._diskfile_mgr
self.node = node
self.job = job
self.suffixes = suffixes
@@ -38,28 +39,28 @@ class Sender(object):
self.response = None
self.response_buffer = ''
self.response_chunk_left = 0
- self.available_set = set()
+ # available_map has an entry for each object in given suffixes that
+ # is available to be sync'd; each entry is a hash => timestamp
+ self.available_map = {}
# When remote_check_objs is given in job, ssync_sender trys only to
# make sure those objects exist or not in remote.
self.remote_check_objs = remote_check_objs
+ # send_list has an entry for each object that the receiver wants to
+ # be sync'ed; each entry is an object hash
self.send_list = []
self.failures = 0
- @property
- def policy_idx(self):
- return int(self.job.get('policy_idx', 0))
-
def __call__(self):
"""
Perform ssync with remote node.
- :returns: a 2-tuple, in the form (success, can_delete_objs).
-
- Success is a boolean, and can_delete_objs is an iterable of strings
- representing the hashes which are in sync with the remote node.
+ :returns: a 2-tuple, in the form (success, can_delete_objs) where
+ success is a boolean and can_delete_objs is the map of
+ objects that are in sync with the receiver. Each entry in
+ can_delete_objs maps a hash => timestamp
"""
if not self.suffixes:
- return True, set()
+ return True, {}
try:
# Double try blocks in case our main error handler fails.
try:
@@ -72,18 +73,20 @@ class Sender(object):
self.missing_check()
if self.remote_check_objs is None:
self.updates()
- can_delete_obj = self.available_set
+ can_delete_obj = self.available_map
else:
# when we are initialized with remote_check_objs we don't
# *send* any requested updates; instead we only collect
# what's already in sync and safe for deletion
- can_delete_obj = self.available_set.difference(
- self.send_list)
+ in_sync_hashes = (set(self.available_map.keys()) -
+ set(self.send_list))
+ can_delete_obj = dict((hash_, self.available_map[hash_])
+ for hash_ in in_sync_hashes)
self.disconnect()
if not self.failures:
return True, can_delete_obj
else:
- return False, set()
+ return False, {}
except (exceptions.MessageTimeout,
exceptions.ReplicationException) as err:
self.daemon.logger.error(
@@ -109,11 +112,11 @@ class Sender(object):
# would only get called if the above except Exception handler
# failed (bad node or job data).
self.daemon.logger.exception('EXCEPTION in replication.Sender')
- return False, set()
+ return False, {}
def connect(self):
"""
- Establishes a connection and starts a REPLICATION request
+ Establishes a connection and starts an SSYNC request
with the object server.
"""
with exceptions.MessageTimeout(
@@ -121,11 +124,13 @@ class Sender(object):
self.connection = bufferedhttp.BufferedHTTPConnection(
'%s:%s' % (self.node['replication_ip'],
self.node['replication_port']))
- self.connection.putrequest('REPLICATION', '/%s/%s' % (
+ self.connection.putrequest('SSYNC', '/%s/%s' % (
self.node['device'], self.job['partition']))
self.connection.putheader('Transfer-Encoding', 'chunked')
self.connection.putheader('X-Backend-Storage-Policy-Index',
- self.policy_idx)
+ int(self.job['policy']))
+ self.connection.putheader('X-Backend-Ssync-Frag-Index',
+ self.node['index'])
self.connection.endheaders()
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'connect receive'):
@@ -137,7 +142,7 @@ class Sender(object):
def readline(self):
"""
- Reads a line from the REPLICATION response body.
+ Reads a line from the SSYNC response body.
httplib has no readline and will block on read(x) until x is
read, so we have to do the work ourselves. A bit of this is
@@ -183,7 +188,7 @@ class Sender(object):
def missing_check(self):
"""
Handles the sender-side of the MISSING_CHECK step of a
- REPLICATION request.
+ SSYNC request.
Full documentation of this can be found at
:py:meth:`.Receiver.missing_check`.
@@ -193,14 +198,15 @@ class Sender(object):
self.daemon.node_timeout, 'missing_check start'):
msg = ':MISSING_CHECK: START\r\n'
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
- hash_gen = self.daemon._diskfile_mgr.yield_hashes(
+ hash_gen = self.df_mgr.yield_hashes(
self.job['device'], self.job['partition'],
- self.policy_idx, self.suffixes)
+ self.job['policy'], self.suffixes,
+ frag_index=self.job.get('frag_index'))
if self.remote_check_objs is not None:
hash_gen = ifilter(lambda (path, object_hash, timestamp):
object_hash in self.remote_check_objs, hash_gen)
for path, object_hash, timestamp in hash_gen:
- self.available_set.add(object_hash)
+ self.available_map[object_hash] = timestamp
with exceptions.MessageTimeout(
self.daemon.node_timeout,
'missing_check send line'):
@@ -234,12 +240,13 @@ class Sender(object):
line = line.strip()
if line == ':MISSING_CHECK: END':
break
- if line:
- self.send_list.append(line)
+ parts = line.split()
+ if parts:
+ self.send_list.append(parts[0])
def updates(self):
"""
- Handles the sender-side of the UPDATES step of a REPLICATION
+ Handles the sender-side of the UPDATES step of an SSYNC
request.
Full documentation of this can be found at
@@ -252,15 +259,19 @@ class Sender(object):
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
for object_hash in self.send_list:
try:
- df = self.daemon._diskfile_mgr.get_diskfile_from_hash(
+ df = self.df_mgr.get_diskfile_from_hash(
self.job['device'], self.job['partition'], object_hash,
- self.policy_idx)
+ self.job['policy'], frag_index=self.job.get('frag_index'))
except exceptions.DiskFileNotExist:
continue
url_path = urllib.quote(
'/%s/%s/%s' % (df.account, df.container, df.obj))
try:
df.open()
+ # EC reconstructor may have passed a callback to build
+ # an alternative diskfile...
+ df = self.job.get('sync_diskfile_builder', lambda *args: df)(
+ self.job, self.node, df.get_metadata())
except exceptions.DiskFileDeleted as err:
self.send_delete(url_path, err.timestamp)
except exceptions.DiskFileError:
@@ -328,7 +339,7 @@ class Sender(object):
def disconnect(self):
"""
Closes down the connection to the object server once done
- with the REPLICATION request.
+ with the SSYNC request.
"""
try:
with exceptions.MessageTimeout(
diff --git a/swift/obj/updater.py b/swift/obj/updater.py
index 6c40c456a..f5d1f37fa 100644
--- a/swift/obj/updater.py
+++ b/swift/obj/updater.py
@@ -29,7 +29,8 @@ from swift.common.ring import Ring
from swift.common.utils import get_logger, renamer, write_pickle, \
dump_recon_cache, config_true_value, ismount
from swift.common.daemon import Daemon
-from swift.obj.diskfile import get_tmp_dir, get_async_dir, ASYNCDIR_BASE
+from swift.common.storage_policy import split_policy_string, PolicyError
+from swift.obj.diskfile import get_tmp_dir, ASYNCDIR_BASE
from swift.common.http import is_success, HTTP_NOT_FOUND, \
HTTP_INTERNAL_SERVER_ERROR
@@ -148,28 +149,19 @@ class ObjectUpdater(Daemon):
start_time = time.time()
# loop through async pending dirs for all policies
for asyncdir in self._listdir(device):
- # skip stuff like "accounts", "containers", etc.
- if not (asyncdir == ASYNCDIR_BASE or
- asyncdir.startswith(ASYNCDIR_BASE + '-')):
- continue
-
# we only care about directories
async_pending = os.path.join(device, asyncdir)
if not os.path.isdir(async_pending):
continue
-
- if asyncdir == ASYNCDIR_BASE:
- policy_idx = 0
- else:
- _junk, policy_idx = asyncdir.split('-', 1)
- try:
- policy_idx = int(policy_idx)
- get_async_dir(policy_idx)
- except ValueError:
- self.logger.warn(_('Directory %s does not map to a '
- 'valid policy') % asyncdir)
- continue
-
+ if not asyncdir.startswith(ASYNCDIR_BASE):
+ # skip stuff like "accounts", "containers", etc.
+ continue
+ try:
+ base, policy = split_policy_string(asyncdir)
+ except PolicyError as e:
+ self.logger.warn(_('Directory %r does not map '
+ 'to a valid policy (%s)') % (asyncdir, e))
+ continue
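
For illustration, split_policy_string (added in this patch's storage_policy
changes) maps an async-pending directory name back to its policy; a hedged
sketch assuming a policy with index 1 is configured:

    from swift.common.storage_policy import split_policy_string

    base, policy = split_policy_string('async_pending-1')
    # base == 'async_pending' and int(policy) == 1; a bare 'async_pending'
    # maps to the legacy policy 0, and an unknown suffix raises PolicyError
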
for prefix in self._listdir(async_pending):
prefix_path = os.path.join(async_pending, prefix)
if not os.path.isdir(prefix_path):
@@ -193,7 +185,7 @@ class ObjectUpdater(Daemon):
os.unlink(update_path)
else:
self.process_object_update(update_path, device,
- policy_idx)
+ policy)
last_obj_hash = obj_hash
time.sleep(self.slowdown)
try:
@@ -202,13 +194,13 @@ class ObjectUpdater(Daemon):
pass
self.logger.timing_since('timing', start_time)
- def process_object_update(self, update_path, device, policy_idx):
+ def process_object_update(self, update_path, device, policy):
"""
Process the object information to be updated and update.
:param update_path: path to pickled object update file
:param device: path to device
- :param policy_idx: storage policy index of object update
+ :param policy: storage policy of object update
"""
try:
update = pickle.load(open(update_path, 'rb'))
@@ -228,7 +220,7 @@ class ObjectUpdater(Daemon):
headers_out = update['headers'].copy()
headers_out['user-agent'] = 'object-updater %s' % os.getpid()
headers_out.setdefault('X-Backend-Storage-Policy-Index',
- str(policy_idx))
+ str(int(policy)))
events = [spawn(self.object_update,
node, part, update['op'], obj, headers_out)
for node in nodes if node['id'] not in successes]
@@ -256,7 +248,7 @@ class ObjectUpdater(Daemon):
if new_successes:
update['successes'] = successes
write_pickle(update, update_path, os.path.join(
- device, get_tmp_dir(policy_idx)))
+ device, get_tmp_dir(policy)))
def object_update(self, node, part, op, obj, headers_out):
"""
diff --git a/swift/proxy/controllers/__init__.py b/swift/proxy/controllers/__init__.py
index de4c0145b..706fd9165 100644
--- a/swift/proxy/controllers/__init__.py
+++ b/swift/proxy/controllers/__init__.py
@@ -13,7 +13,7 @@
from swift.proxy.controllers.base import Controller
from swift.proxy.controllers.info import InfoController
-from swift.proxy.controllers.obj import ObjectController
+from swift.proxy.controllers.obj import ObjectControllerRouter
from swift.proxy.controllers.account import AccountController
from swift.proxy.controllers.container import ContainerController
@@ -22,5 +22,5 @@ __all__ = [
'ContainerController',
'Controller',
'InfoController',
- 'ObjectController',
+ 'ObjectControllerRouter',
]
diff --git a/swift/proxy/controllers/account.py b/swift/proxy/controllers/account.py
index ea2f8ae33..915e1c481 100644
--- a/swift/proxy/controllers/account.py
+++ b/swift/proxy/controllers/account.py
@@ -58,9 +58,10 @@ class AccountController(Controller):
constraints.MAX_ACCOUNT_NAME_LENGTH)
return resp
- partition, nodes = self.app.account_ring.get_nodes(self.account_name)
+ partition = self.app.account_ring.get_part(self.account_name)
+ node_iter = self.app.iter_nodes(self.app.account_ring, partition)
resp = self.GETorHEAD_base(
- req, _('Account'), self.app.account_ring, partition,
+ req, _('Account'), node_iter, partition,
req.swift_entity_path.rstrip('/'))
if resp.status_int == HTTP_NOT_FOUND:
if resp.headers.get('X-Account-Status', '').lower() == 'deleted':
diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py
index 0aeb803f1..ca12d343e 100644
--- a/swift/proxy/controllers/base.py
+++ b/swift/proxy/controllers/base.py
@@ -28,6 +28,7 @@ import os
import time
import functools
import inspect
+import logging
import operator
from sys import exc_info
from swift import gettext_ as _
@@ -39,14 +40,14 @@ from eventlet.timeout import Timeout
from swift.common.wsgi import make_pre_authed_env
from swift.common.utils import Timestamp, config_true_value, \
public, split_path, list_from_csv, GreenthreadSafeIterator, \
- quorum_size, GreenAsyncPile
+ GreenAsyncPile, quorum_size, parse_content_range
from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \
ConnectionTimeout
from swift.common.http import is_informational, is_success, is_redirection, \
is_server_error, HTTP_OK, HTTP_PARTIAL_CONTENT, HTTP_MULTIPLE_CHOICES, \
HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \
- HTTP_INSUFFICIENT_STORAGE, HTTP_UNAUTHORIZED
+ HTTP_INSUFFICIENT_STORAGE, HTTP_UNAUTHORIZED, HTTP_CONTINUE
from swift.common.swob import Request, Response, HeaderKeyDict, Range, \
HTTPException, HTTPRequestedRangeNotSatisfiable
from swift.common.request_helpers import strip_sys_meta_prefix, \
@@ -593,16 +594,37 @@ def close_swift_conn(src):
pass
+def bytes_to_skip(record_size, range_start):
+ """
+ Assume an object is composed of N records, where the first N-1 are all
+ the same size and the last is at most that large, but may be smaller.
+
+ When a range request is made, it might start with a partial record. This
+ must be discarded, lest the consumer get bad data. This is particularly
+ true of suffix-byte-range requests, e.g. "Range: bytes=-12345" where the
+ size of the object is unknown at the time the request is made.
+
+ This function computes the number of bytes that must be discarded to
+ ensure only whole records are yielded. Erasure-code decoding needs this.
+
+ This function could have been inlined, but it took enough tries to get
+ right that some targeted unit tests were desirable, hence its extraction.
+ """
+ return (record_size - (range_start % record_size)) % record_size
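
A worked example of the computation: with 1,048,576-byte records, a range
starting at byte 2,500,000 lands 402,848 bytes into the third record, so
645,728 bytes must be discarded to resume on the next record boundary:

    record_size, range_start = 1048576, 2500000
    assert range_start % record_size == 402848
    assert (record_size - (range_start % record_size)) % record_size == 645728
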
+
+
class GetOrHeadHandler(object):
- def __init__(self, app, req, server_type, ring, partition, path,
- backend_headers):
+ def __init__(self, app, req, server_type, node_iter, partition, path,
+ backend_headers, client_chunk_size=None):
self.app = app
- self.ring = ring
+ self.node_iter = node_iter
self.server_type = server_type
self.partition = partition
self.path = path
self.backend_headers = backend_headers
+ self.client_chunk_size = client_chunk_size
+ self.skip_bytes = 0
self.used_nodes = []
self.used_source_etag = ''
@@ -649,6 +671,35 @@ class GetOrHeadHandler(object):
else:
self.backend_headers['Range'] = 'bytes=%d-' % num_bytes
+ def learn_size_from_content_range(self, start, end):
+ """
+ If client_chunk_size is set, makes sure we yield things starting on
+ chunk boundaries based on the Content-Range header in the response.
+
+ Sets our first Range header to the value learned from the
+ Content-Range header in the response; if we were given a
+ fully-specified range (e.g. "bytes=123-456"), this is a no-op.
+
+ If we were given a half-specified range (e.g. "bytes=123-" or
+ "bytes=-456"), then this changes the Range header to a
+ semantically-equivalent one *and* it lets us resume on a proper
+ boundary instead of just in the middle of a piece somewhere.
+
+ If the original request is for more than one range, this does not
+ affect our backend Range header, since we don't support resuming one
+ of those anyway.
+ """
+ if self.client_chunk_size:
+ self.skip_bytes = bytes_to_skip(self.client_chunk_size, start)
+
+ if 'Range' in self.backend_headers:
+ req_range = Range(self.backend_headers['Range'])
+
+ if len(req_range.ranges) > 1:
+ return
+
+ self.backend_headers['Range'] = "bytes=%d-%d" % (start, end)
+
def is_good_source(self, src):
"""
Indicates whether or not the request made to the backend found
@@ -674,42 +725,74 @@ class GetOrHeadHandler(object):
"""
try:
nchunks = 0
- bytes_read_from_source = 0
+ client_chunk_size = self.client_chunk_size
+ bytes_consumed_from_backend = 0
node_timeout = self.app.node_timeout
if self.server_type == 'Object':
node_timeout = self.app.recoverable_node_timeout
+ buf = ''
while True:
try:
with ChunkReadTimeout(node_timeout):
chunk = source.read(self.app.object_chunk_size)
nchunks += 1
- bytes_read_from_source += len(chunk)
+ buf += chunk
except ChunkReadTimeout:
exc_type, exc_value, exc_traceback = exc_info()
if self.newest or self.server_type != 'Object':
raise exc_type, exc_value, exc_traceback
try:
- self.fast_forward(bytes_read_from_source)
+ self.fast_forward(bytes_consumed_from_backend)
except (NotImplementedError, HTTPException, ValueError):
raise exc_type, exc_value, exc_traceback
+ buf = ''
new_source, new_node = self._get_source_and_node()
if new_source:
self.app.exception_occurred(
node, _('Object'),
- _('Trying to read during GET (retrying)'))
+ _('Trying to read during GET (retrying)'),
+ level=logging.ERROR, exc_info=(
+ exc_type, exc_value, exc_traceback))
# Close-out the connection as best as possible.
if getattr(source, 'swift_conn', None):
close_swift_conn(source)
source = new_source
node = new_node
- bytes_read_from_source = 0
continue
else:
raise exc_type, exc_value, exc_traceback
+
+ if buf and self.skip_bytes:
+ if self.skip_bytes < len(buf):
+ buf = buf[self.skip_bytes:]
+ bytes_consumed_from_backend += self.skip_bytes
+ self.skip_bytes = 0
+ else:
+ self.skip_bytes -= len(buf)
+ bytes_consumed_from_backend += len(buf)
+ buf = ''
+
if not chunk:
+ if buf:
+ with ChunkWriteTimeout(self.app.client_timeout):
+ bytes_consumed_from_backend += len(buf)
+ yield buf
+ buf = ''
break
- with ChunkWriteTimeout(self.app.client_timeout):
- yield chunk
+
+ if client_chunk_size is not None:
+ while len(buf) >= client_chunk_size:
+ client_chunk = buf[:client_chunk_size]
+ buf = buf[client_chunk_size:]
+ with ChunkWriteTimeout(self.app.client_timeout):
+ yield client_chunk
+ bytes_consumed_from_backend += len(client_chunk)
+ else:
+ with ChunkWriteTimeout(self.app.client_timeout):
+ yield buf
+ bytes_consumed_from_backend += len(buf)
+ buf = ''
+
# This is for fairness; if the network is outpacing the CPU,
# we'll always be able to read and write data without
# encountering an EWOULDBLOCK, and so eventlet will not switch
@@ -757,7 +840,7 @@ class GetOrHeadHandler(object):
node_timeout = self.app.node_timeout
if self.server_type == 'Object' and not self.newest:
node_timeout = self.app.recoverable_node_timeout
- for node in self.app.iter_nodes(self.ring, self.partition):
+ for node in self.node_iter:
if node in self.used_nodes:
continue
start_node_timing = time.time()
@@ -793,8 +876,10 @@ class GetOrHeadHandler(object):
src_headers = dict(
(k.lower(), v) for k, v in
possible_source.getheaders())
- if src_headers.get('etag', '').strip('"') != \
- self.used_source_etag:
+
+ if self.used_source_etag != src_headers.get(
+ 'x-object-sysmeta-ec-etag',
+ src_headers.get('etag', '')).strip('"'):
self.statuses.append(HTTP_NOT_FOUND)
self.reasons.append('')
self.bodies.append('')
@@ -832,7 +917,9 @@ class GetOrHeadHandler(object):
src_headers = dict(
(k.lower(), v) for k, v in
possible_source.getheaders())
- self.used_source_etag = src_headers.get('etag', '').strip('"')
+ self.used_source_etag = src_headers.get(
+ 'x-object-sysmeta-ec-etag',
+ src_headers.get('etag', '')).strip('"')
return source, node
return None, None
@@ -841,13 +928,17 @@ class GetOrHeadHandler(object):
res = None
if source:
res = Response(request=req)
+ res.status = source.status
+ update_headers(res, source.getheaders())
if req.method == 'GET' and \
source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT):
+ cr = res.headers.get('Content-Range')
+ if cr:
+ start, end, total = parse_content_range(cr)
+ self.learn_size_from_content_range(start, end)
res.app_iter = self._make_app_iter(req, node, source)
# See NOTE: swift_conn at top of file about this.
res.swift_conn = source.swift_conn
- res.status = source.status
- update_headers(res, source.getheaders())
if not res.environ:
res.environ = {}
res.environ['swift_x_timestamp'] = \
@@ -993,7 +1084,8 @@ class Controller(object):
else:
info['partition'] = part
info['nodes'] = nodes
- info.setdefault('storage_policy', '0')
+ if info.get('storage_policy') is None:
+ info['storage_policy'] = 0
return info
def _make_request(self, nodes, part, method, path, headers, query,
@@ -1098,6 +1190,13 @@ class Controller(object):
'%s %s' % (self.server_type, req.method),
overrides=overrides, headers=resp_headers)
+ def _quorum_size(self, n):
+ """
+ Number of successful backend responses needed for the proxy to
+ consider the client request successful.
+ """
+ return quorum_size(n)
+
def have_quorum(self, statuses, node_count):
"""
Given a list of statuses from several requests, determine if
@@ -1107,16 +1206,18 @@ class Controller(object):
:param node_count: number of nodes being queried (basically ring count)
:returns: True or False, depending on if quorum is established
"""
- quorum = quorum_size(node_count)
+ quorum = self._quorum_size(node_count)
if len(statuses) >= quorum:
- for hundred in (HTTP_OK, HTTP_MULTIPLE_CHOICES, HTTP_BAD_REQUEST):
+ for hundred in (HTTP_CONTINUE, HTTP_OK, HTTP_MULTIPLE_CHOICES,
+ HTTP_BAD_REQUEST):
if sum(1 for s in statuses
if hundred <= s < hundred + 100) >= quorum:
return True
return False
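
For replicated policies _quorum_size falls back to the helper in swift.common.utils; assuming its usual simple-majority definition, the grouping by status class above works out like this (a sketch, not proxy code):

    def quorum_size(n):
        # simple majority, as assumed for replicated policies
        return (n // 2) + 1

    # three replicas: two 2xx responses are enough to establish quorum
    statuses = [201, 201, 503]
    quorum = quorum_size(3)          # == 2
    assert sum(1 for s in statuses if 200 <= s < 300) >= quorum
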
def best_response(self, req, statuses, reasons, bodies, server_type,
- etag=None, headers=None, overrides=None):
+ etag=None, headers=None, overrides=None,
+ quorum_size=None):
"""
Given a list of responses from several servers, choose the best to
return to the API.
@@ -1128,10 +1229,16 @@ class Controller(object):
:param server_type: type of server the responses came from
:param etag: etag
:param headers: headers of each response
+ :param overrides: overrides to apply when lacking quorum
+ :param quorum_size: quorum size to use
:returns: swob.Response object with the correct status, body, etc. set
"""
+ if quorum_size is None:
+ quorum_size = self._quorum_size(len(statuses))
+
resp = self._compute_quorum_response(
- req, statuses, reasons, bodies, etag, headers)
+ req, statuses, reasons, bodies, etag, headers,
+ quorum_size=quorum_size)
if overrides and not resp:
faked_up_status_indices = set()
transformed = []
@@ -1145,7 +1252,8 @@ class Controller(object):
statuses, reasons, headers, bodies = zip(*transformed)
resp = self._compute_quorum_response(
req, statuses, reasons, bodies, etag, headers,
- indices_to_avoid=faked_up_status_indices)
+ indices_to_avoid=faked_up_status_indices,
+ quorum_size=quorum_size)
if not resp:
resp = Response(request=req)
@@ -1156,14 +1264,14 @@ class Controller(object):
return resp
def _compute_quorum_response(self, req, statuses, reasons, bodies, etag,
- headers, indices_to_avoid=()):
+ headers, quorum_size, indices_to_avoid=()):
if not statuses:
return None
for hundred in (HTTP_OK, HTTP_MULTIPLE_CHOICES, HTTP_BAD_REQUEST):
hstatuses = \
[(i, s) for i, s in enumerate(statuses)
if hundred <= s < hundred + 100]
- if len(hstatuses) >= quorum_size(len(statuses)):
+ if len(hstatuses) >= quorum_size:
resp = Response(request=req)
try:
status_index, status = max(
@@ -1228,22 +1336,25 @@ class Controller(object):
else:
self.app.logger.warning('Could not autocreate account %r' % path)
- def GETorHEAD_base(self, req, server_type, ring, partition, path):
+ def GETorHEAD_base(self, req, server_type, node_iter, partition, path,
+ client_chunk_size=None):
"""
Base handler for HTTP GET or HEAD requests.
:param req: swob.Request object
:param server_type: server type used in logging
- :param ring: the ring to obtain nodes from
+ :param node_iter: an iterator to obtain nodes from
:param partition: partition
:param path: path for the request
+ :param client_chunk_size: chunk size for response body iterator
:returns: swob.Response object
"""
backend_headers = self.generate_request_headers(
req, additional=req.headers)
- handler = GetOrHeadHandler(self.app, req, self.server_type, ring,
- partition, path, backend_headers)
+ handler = GetOrHeadHandler(self.app, req, self.server_type, node_iter,
+ partition, path, backend_headers,
+ client_chunk_size=client_chunk_size)
res = handler.get_working_response(req)
if not res:
diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py
index fb422e68d..3e4a2bb03 100644
--- a/swift/proxy/controllers/container.py
+++ b/swift/proxy/controllers/container.py
@@ -93,8 +93,9 @@ class ContainerController(Controller):
return HTTPNotFound(request=req)
part = self.app.container_ring.get_part(
self.account_name, self.container_name)
+ node_iter = self.app.iter_nodes(self.app.container_ring, part)
resp = self.GETorHEAD_base(
- req, _('Container'), self.app.container_ring, part,
+ req, _('Container'), node_iter, part,
req.swift_entity_path)
if 'swift.authorize' in req.environ:
req.acl = resp.headers.get('x-container-read')
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index 2b53ba7a8..a83242b5f 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -24,13 +24,17 @@
# These shenanigans are to ensure all related objects can be garbage
# collected. We've seen objects hang around forever otherwise.
+import collections
import itertools
import mimetypes
import time
import math
+import random
+from hashlib import md5
from swift import gettext_ as _
from urllib import unquote, quote
+from greenlet import GreenletExit
from eventlet import GreenPile
from eventlet.queue import Queue
from eventlet.timeout import Timeout
@@ -38,7 +42,8 @@ from eventlet.timeout import Timeout
from swift.common.utils import (
clean_content_type, config_true_value, ContextPool, csv_append,
GreenAsyncPile, GreenthreadSafeIterator, json, Timestamp,
- normalize_delete_at_timestamp, public, quorum_size, get_expirer_container)
+ normalize_delete_at_timestamp, public, get_expirer_container,
+ quorum_size)
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation, \
check_copy_from_header, check_destination_header, \
@@ -46,21 +51,24 @@ from swift.common.constraints import check_metadata, check_object_creation, \
from swift.common import constraints
from swift.common.exceptions import ChunkReadTimeout, \
ChunkWriteTimeout, ConnectionTimeout, ListingIterNotFound, \
- ListingIterNotAuthorized, ListingIterError
+ ListingIterNotAuthorized, ListingIterError, ResponseTimeout, \
+ InsufficientStorage, FooterNotSupported, MultiphasePUTNotSupported, \
+ PutterConnectError
from swift.common.http import (
is_success, is_client_error, is_server_error, HTTP_CONTINUE, HTTP_CREATED,
HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, HTTP_INTERNAL_SERVER_ERROR,
HTTP_SERVICE_UNAVAILABLE, HTTP_INSUFFICIENT_STORAGE,
- HTTP_PRECONDITION_FAILED, HTTP_CONFLICT)
-from swift.common.storage_policy import POLICIES
+ HTTP_PRECONDITION_FAILED, HTTP_CONFLICT, is_informational)
+from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY,
+ ECDriverError, PolicyError)
from swift.proxy.controllers.base import Controller, delay_denial, \
cors_validation
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \
- HTTPServerError, HTTPServiceUnavailable, Request, \
- HTTPClientDisconnect, HeaderKeyDict, HTTPException
+ HTTPServerError, HTTPServiceUnavailable, Request, HeaderKeyDict, \
+ HTTPClientDisconnect, HTTPUnprocessableEntity, Response, HTTPException
from swift.common.request_helpers import is_sys_or_user_meta, is_sys_meta, \
- remove_items, copy_header_subset
+ remove_items, copy_header_subset, close_if_possible
def copy_headers_into(from_r, to_r):
@@ -85,8 +93,41 @@ def check_content_type(req):
return None
-class ObjectController(Controller):
- """WSGI controller for object requests."""
+class ObjectControllerRouter(object):
+
+ policy_type_to_controller_map = {}
+
+ @classmethod
+ def register(cls, policy_type):
+ """
+ Decorator for Storage Policy implemenations to register
+ their ObjectController implementations.
+
+ This also fills in a policy_type attribute on the class.
+ """
+ def register_wrapper(controller_cls):
+ if policy_type in cls.policy_type_to_controller_map:
+ raise PolicyError(
+ '%r is already registered for the policy_type %r' % (
+ cls.policy_type_to_controller_map[policy_type],
+ policy_type))
+ cls.policy_type_to_controller_map[policy_type] = controller_cls
+ controller_cls.policy_type = policy_type
+ return controller_cls
+ return register_wrapper
+
+ def __init__(self):
+ self.policy_to_controller_cls = {}
+ for policy in POLICIES:
+ self.policy_to_controller_cls[policy] = \
+ self.policy_type_to_controller_map[policy.policy_type]
+
+ def __getitem__(self, policy):
+ return self.policy_to_controller_cls[policy]
+
+
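
Stripped of the Swift specifics (POLICIES, PolicyError), the router is just a class-level registry populated by a decorator; a minimal self-contained illustration of the pattern:

    class Router(object):
        _map = {}

        @classmethod
        def register(cls, policy_type):
            def wrapper(controller_cls):
                cls._map[policy_type] = controller_cls
                controller_cls.policy_type = policy_type
                return controller_cls
            return wrapper

    @Router.register('replication')
    class ReplController(object):
        pass

    assert Router._map['replication'] is ReplController
    assert ReplController.policy_type == 'replication'

The real ObjectControllerRouter is exercised further down in this patch, where ReplicatedObjectController and ECObjectController register themselves for REPL_POLICY and EC_POLICY.
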
+class BaseObjectController(Controller):
+ """Base WSGI controller for object requests."""
server_type = 'Object'
def __init__(self, app, account_name, container_name, object_name,
@@ -114,8 +155,10 @@ class ObjectController(Controller):
lreq.environ['QUERY_STRING'] = \
'format=json&prefix=%s&marker=%s' % (quote(lprefix),
quote(marker))
+ container_node_iter = self.app.iter_nodes(self.app.container_ring,
+ lpartition)
lresp = self.GETorHEAD_base(
- lreq, _('Container'), self.app.container_ring, lpartition,
+ lreq, _('Container'), container_node_iter, lpartition,
lreq.swift_entity_path)
if 'swift.authorize' in env:
lreq.acl = lresp.headers.get('x-container-read')
@@ -180,6 +223,7 @@ class ObjectController(Controller):
# pass the policy index to storage nodes via req header
policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
container_info['storage_policy'])
+ policy = POLICIES.get_by_index(policy_index)
obj_ring = self.app.get_object_ring(policy_index)
req.headers['X-Backend-Storage-Policy-Index'] = policy_index
if 'swift.authorize' in req.environ:
@@ -188,9 +232,10 @@ class ObjectController(Controller):
return aresp
partition = obj_ring.get_part(
self.account_name, self.container_name, self.object_name)
- resp = self.GETorHEAD_base(
- req, _('Object'), obj_ring, partition,
- req.swift_entity_path)
+ node_iter = self.app.iter_nodes(obj_ring, partition)
+
+ resp = self._reroute(policy)._get_or_head_response(
+ req, node_iter, partition, policy)
if ';' in resp.headers.get('content-type', ''):
resp.content_type = clean_content_type(
@@ -383,7 +428,10 @@ class ObjectController(Controller):
_('Trying to get final status of PUT to %s') % req.path)
return (None, None)
- def _get_put_responses(self, req, conns, nodes):
+ def _get_put_responses(self, req, conns, nodes, **kwargs):
+ """
+ Collect replicated object responses.
+ """
statuses = []
reasons = []
bodies = []
@@ -488,6 +536,7 @@ class ObjectController(Controller):
self.object_name = src_obj_name
self.container_name = src_container_name
self.account_name = src_account_name
+
source_resp = self.GET(source_req)
# This gives middlewares a way to change the source; for example,
@@ -589,8 +638,9 @@ class ObjectController(Controller):
'X-Newest': 'True'}
hreq = Request.blank(req.path_info, headers=_headers,
environ={'REQUEST_METHOD': 'HEAD'})
+ hnode_iter = self.app.iter_nodes(obj_ring, partition)
hresp = self.GETorHEAD_base(
- hreq, _('Object'), obj_ring, partition,
+ hreq, _('Object'), hnode_iter, partition,
hreq.swift_entity_path)
is_manifest = 'X-Object-Manifest' in req.headers or \
@@ -654,7 +704,10 @@ class ObjectController(Controller):
req.headers['X-Timestamp'] = Timestamp(time.time()).internal
return None
- def _check_failure_put_connections(self, conns, req, nodes):
+ def _check_failure_put_connections(self, conns, req, nodes, min_conns):
+ """
+ Identify any failed connections and check minimum connection count.
+ """
if req.if_none_match is not None and '*' in req.if_none_match:
statuses = [conn.resp.status for conn in conns if conn.resp]
if HTTP_PRECONDITION_FAILED in statuses:
@@ -675,7 +728,6 @@ class ObjectController(Controller):
'timestamps': ', '.join(timestamps)})
raise HTTPAccepted(request=req)
- min_conns = quorum_size(len(nodes))
self._check_min_conn(req, conns, min_conns)
def _get_put_connections(self, req, nodes, partition, outgoing_headers,
@@ -709,8 +761,12 @@ class ObjectController(Controller):
raise HTTPServiceUnavailable(request=req)
def _transfer_data(self, req, data_source, conns, nodes):
- min_conns = quorum_size(len(nodes))
+ """
+ Transfer data for a replicated object.
+ This method was added in the PUT method extraction change
+ """
+ min_conns = quorum_size(len(nodes))
bytes_transferred = 0
try:
with ContextPool(len(nodes)) as pool:
@@ -775,11 +831,11 @@ class ObjectController(Controller):
This method is responsible for establishing connection
with storage nodes and sending object to each one of those
- nodes. After sending the data, the "best" reponse will be
+ nodes. After sending the data, the "best" response will be
returned based on statuses from all connections
"""
- policy_idx = req.headers.get('X-Backend-Storage-Policy-Index')
- policy = POLICIES.get_by_index(policy_idx)
+ policy_index = req.headers.get('X-Backend-Storage-Policy-Index')
+ policy = POLICIES.get_by_index(policy_index)
if not nodes:
return HTTPNotFound()
@@ -790,11 +846,11 @@ class ObjectController(Controller):
expect = False
conns = self._get_put_connections(req, nodes, partition,
outgoing_headers, policy, expect)
-
+ min_conns = quorum_size(len(nodes))
try:
# check that a minimum number of connections were established and
# meet all the correct conditions set in the request
- self._check_failure_put_connections(conns, req, nodes)
+ self._check_failure_put_connections(conns, req, nodes, min_conns)
# transfer data
self._transfer_data(req, data_source, conns, nodes)
@@ -1015,6 +1071,21 @@ class ObjectController(Controller):
headers, overrides=status_overrides)
return resp
+ def _reroute(self, policy):
+ """
+ For COPY requests we need to make sure the controller instance the
+ request is routed through is the correct type for the policy.
+ """
+ if not policy:
+ raise HTTPServiceUnavailable('Unknown Storage Policy')
+ if policy.policy_type != self.policy_type:
+ controller = self.app.obj_controller_router[policy](
+ self.app, self.account_name, self.container_name,
+ self.object_name)
+ else:
+ controller = self
+ return controller
+
@public
@cors_validation
@delay_denial
@@ -1031,6 +1102,7 @@ class ObjectController(Controller):
self.account_name = dest_account
del req.headers['Destination-Account']
dest_container, dest_object = check_destination_header(req)
+
source = '/%s/%s' % (self.container_name, self.object_name)
self.container_name = dest_container
self.object_name = dest_object
@@ -1042,4 +1114,1109 @@ class ObjectController(Controller):
req.headers['Content-Length'] = 0
req.headers['X-Copy-From'] = quote(source)
del req.headers['Destination']
- return self.PUT(req)
+
+ container_info = self.container_info(
+ dest_account, dest_container, req)
+ dest_policy = POLICIES.get_by_index(container_info['storage_policy'])
+
+ return self._reroute(dest_policy).PUT(req)
+
+
+@ObjectControllerRouter.register(REPL_POLICY)
+class ReplicatedObjectController(BaseObjectController):
+
+ def _get_or_head_response(self, req, node_iter, partition, policy):
+ resp = self.GETorHEAD_base(
+ req, _('Object'), node_iter, partition,
+ req.swift_entity_path)
+ return resp
+
+
+class ECAppIter(object):
+ """
+ WSGI iterable that decodes EC fragment archives (or portions thereof)
+ into the original object (or portions thereof).
+
+ :param path: path for the request
+
+ :param policy: storage policy for this object
+
+ :param internal_app_iters: list of the WSGI iterables from object server
+ GET responses for fragment archives. For an M+K erasure code, the
+ caller must supply M such iterables.
+
+ :param range_specs: list of dictionaries describing the ranges requested
+ by the client. Each dictionary contains the start and end of the
+ client's requested byte range as well as the start and end of the EC
+ segments containing that byte range.
+
+ :param obj_length: length of the object, in bytes. Learned from the
+ headers in the GET response from the object server.
+
+ :param logger: a logger
+ """
+ def __init__(self, path, policy, internal_app_iters, range_specs,
+ obj_length, logger):
+ self.path = path
+ self.policy = policy
+ self.internal_app_iters = internal_app_iters
+ self.range_specs = range_specs
+ self.obj_length = obj_length
+ self.boundary = ''
+ self.logger = logger
+
+ def close(self):
+ for it in self.internal_app_iters:
+ close_if_possible(it)
+
+ def __iter__(self):
+ segments_iter = self.decode_segments_from_fragments()
+
+ if len(self.range_specs) == 0:
+ # plain GET; just yield up segments
+ for seg in segments_iter:
+ yield seg
+ return
+
+ if len(self.range_specs) > 1:
+ raise NotImplementedError("multi-range GETs not done yet")
+
+ for range_spec in self.range_specs:
+ client_start = range_spec['client_start']
+ client_end = range_spec['client_end']
+ segment_start = range_spec['segment_start']
+ segment_end = range_spec['segment_end']
+
+ seg_size = self.policy.ec_segment_size
+ is_suffix = client_start is None
+
+ if is_suffix:
+ # Suffix byte ranges (i.e. requests for the last N bytes of
+ # an object) are likely to end up not on a segment boundary.
+ client_range_len = client_end
+ client_start = max(self.obj_length - client_range_len, 0)
+ client_end = self.obj_length - 1
+
+ # may be mid-segment; if it is, then everything up to the
+ # first segment boundary is garbage, and is discarded before
+ # ever getting into this function.
+ unaligned_segment_start = max(self.obj_length - segment_end, 0)
+ alignment_offset = (
+ (seg_size - (unaligned_segment_start % seg_size))
+ % seg_size)
+ segment_start = unaligned_segment_start + alignment_offset
+ segment_end = self.obj_length - 1
+ else:
+ # It's entirely possible that the client asked for a range that
+ # includes some bytes we have and some we don't; for example, a
+ # range of bytes 1000-20000000 on a 1500-byte object.
+ segment_end = (min(segment_end, self.obj_length - 1)
+ if segment_end is not None
+ else self.obj_length - 1)
+ client_end = (min(client_end, self.obj_length - 1)
+ if client_end is not None
+ else self.obj_length - 1)
+
+ num_segments = int(
+ math.ceil(float(segment_end + 1 - segment_start)
+ / self.policy.ec_segment_size))
+ # We get full segments here, but the client may have requested a
+ # byte range that begins or ends in the middle of a segment.
+ # Thus, we have some amount of overrun (extra decoded bytes)
+ # that we trim off so the client gets exactly what they
+ # requested.
+ start_overrun = client_start - segment_start
+ end_overrun = segment_end - client_end
+
+ for i, next_seg in enumerate(segments_iter):
+ # We may have a start_overrun of more than one segment in
+ # the case of suffix-byte-range requests. However, we never
+ # have an end_overrun of more than one segment.
+ if start_overrun > 0:
+ seglen = len(next_seg)
+ if seglen <= start_overrun:
+ start_overrun -= seglen
+ continue
+ else:
+ next_seg = next_seg[start_overrun:]
+ start_overrun = 0
+
+ if i == (num_segments - 1) and end_overrun:
+ next_seg = next_seg[:-end_overrun]
+
+ yield next_seg
+
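
The overrun trimming in __iter__ is plain arithmetic. A worked example with assumed numbers (100-byte segments, a client range of bytes 130-340 within a large object):

    segment_size = 100
    client_start, client_end = 130, 340
    segment_start = client_start // segment_size * segment_size        # 100
    segment_end = (client_end // segment_size + 1) * segment_size - 1  # 399
    start_overrun = client_start - segment_start                       # 30
    end_overrun = segment_end - client_end                             # 59
    decoded_bytes = segment_end + 1 - segment_start                    # 300
    assert decoded_bytes - start_overrun - end_overrun == \
        client_end - client_start + 1                                  # 211
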
+ def decode_segments_from_fragments(self):
+ # Decodes the fragments from the object servers and yields one
+ # segment at a time.
+ queues = [Queue(1) for _junk in range(len(self.internal_app_iters))]
+
+ def put_fragments_in_queue(frag_iter, queue):
+ try:
+ for fragment in frag_iter:
+ if fragment[0] == ' ':
+ raise Exception('Leading whitespace on fragment.')
+ queue.put(fragment)
+ except GreenletExit:
+ # killed by contextpool
+ pass
+ except ChunkReadTimeout:
+ # unable to resume in GetOrHeadHandler
+ pass
+ except: # noqa
+ self.logger.exception("Exception fetching fragments for %r" %
+ self.path)
+ finally:
+ queue.resize(2) # ensure there's room
+ queue.put(None)
+
+ with ContextPool(len(self.internal_app_iters)) as pool:
+ for app_iter, queue in zip(
+ self.internal_app_iters, queues):
+ pool.spawn(put_fragments_in_queue, app_iter, queue)
+
+ while True:
+ fragments = []
+ for qi, queue in enumerate(queues):
+ fragment = queue.get()
+ queue.task_done()
+ fragments.append(fragment)
+
+                # If any object server connection yields out a None, we're
+ # done. Either they are all None, and we've finished
+ # successfully; or some un-recoverable failure has left us
+ # with an un-reconstructible list of fragments - so we'll
+ # break out of the iter so WSGI can tear down the broken
+ # connection.
+ if not all(fragments):
+ break
+ try:
+ segment = self.policy.pyeclib_driver.decode(fragments)
+ except ECDriverError:
+ self.logger.exception("Error decoding fragments for %r" %
+ self.path)
+ raise
+
+ yield segment
+
+ def app_iter_range(self, start, end):
+ return self
+
+ def app_iter_ranges(self, content_type, boundary, content_size):
+ self.boundary = boundary
+
+
+def client_range_to_segment_range(client_start, client_end, segment_size):
+ """
+ Takes a byterange from the client and converts it into a byterange
+ spanning the necessary segments.
+
+ Handles prefix, suffix, and fully-specified byte ranges.
+
+ Examples:
+ client_range_to_segment_range(100, 700, 512) = (0, 1023)
+ client_range_to_segment_range(100, 700, 256) = (0, 767)
+ client_range_to_segment_range(300, None, 256) = (256, None)
+
+ :param client_start: first byte of the range requested by the client
+ :param client_end: last byte of the range requested by the client
+ :param segment_size: size of an EC segment, in bytes
+
+ :returns: a 2-tuple (seg_start, seg_end) where
+
+ * seg_start is the first byte of the first segment, or None if this is
+ a suffix byte range
+
+ * seg_end is the last byte of the last segment, or None if this is a
+ prefix byte range
+ """
+ # the index of the first byte of the first segment
+ segment_start = (
+ int(client_start // segment_size)
+ * segment_size) if client_start is not None else None
+ # the index of the last byte of the last segment
+ segment_end = (
+ # bytes M-
+ None if client_end is None else
+ # bytes M-N
+ (((int(client_end // segment_size) + 1)
+ * segment_size) - 1) if client_start is not None else
+ # bytes -N: we get some extra bytes to make sure we
+ # have all we need.
+ #
+ # To see why, imagine a 100-byte segment size, a
+ # 340-byte object, and a request for the last 50
+ # bytes. Naively requesting the last 100 bytes would
+ # result in a truncated first segment and hence a
+ # truncated download. (Of course, the actual
+ # obj-server requests are for fragments, not
+ # segments, but that doesn't change the
+ # calculation.)
+ #
+ # This does mean that we fetch an extra segment if
+ # the object size is an exact multiple of the
+ # segment size. It's a little wasteful, but it's
+ # better to be a little wasteful than to get some
+ # range requests completely wrong.
+ (int(math.ceil((
+ float(client_end) / segment_size) + 1)) # nsegs
+ * segment_size))
+ return (segment_start, segment_end)
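
The docstring's examples cover prefix and fully-specified ranges; the suffix case, which the long comment above is about, works out like this with assumed numbers (100-byte segments, a request for the last 50 bytes):

    # a suffix request for 50 bytes is widened to the last 200 bytes, i.e.
    # an extra segment of padding so no needed segment arrives truncated
    assert client_range_to_segment_range(None, 50, 100) == (None, 200)
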
+
+
+def segment_range_to_fragment_range(segment_start, segment_end, segment_size,
+ fragment_size):
+ """
+ Takes a byterange spanning some segments and converts that into a
+ byterange spanning the corresponding fragments within their fragment
+ archives.
+
+ Handles prefix, suffix, and fully-specified byte ranges.
+
+ :param segment_start: first byte of the first segment
+ :param segment_end: last byte of the last segment
+ :param segment_size: size of an EC segment, in bytes
+ :param fragment_size: size of an EC fragment, in bytes
+
+ :returns: a 2-tuple (frag_start, frag_end) where
+
+ * frag_start is the first byte of the first fragment, or None if this
+ is a suffix byte range
+
+ * frag_end is the last byte of the last fragment, or None if this is a
+ prefix byte range
+ """
+ # Note: segment_start and (segment_end + 1) are
+ # multiples of segment_size, so we don't have to worry
+ # about integer math giving us rounding troubles.
+ #
+ # There's a whole bunch of +1 and -1 in here; that's because HTTP wants
+ # byteranges to be inclusive of the start and end, so e.g. bytes 200-300
+ # is a range containing 101 bytes. Python has half-inclusive ranges, of
+ # course, so we have to convert back and forth. We try to keep things in
+ # HTTP-style byteranges for consistency.
+
+ # the index of the first byte of the first fragment
+ fragment_start = ((
+ segment_start / segment_size * fragment_size)
+ if segment_start is not None else None)
+ # the index of the last byte of the last fragment
+ fragment_end = (
+ # range unbounded on the right
+ None if segment_end is None else
+ # range unbounded on the left; no -1 since we're
+ # asking for the last N bytes, not to have a
+ # particular byte be the last one
+ ((segment_end + 1) / segment_size
+ * fragment_size) if segment_start is None else
+ # range bounded on both sides; the -1 is because the
+ # rest of the expression computes the length of the
+ # fragment, and a range of N bytes starts at index M
+ # and ends at M + N - 1.
+ ((segment_end + 1) / segment_size * fragment_size) - 1)
+ return (fragment_start, fragment_end)
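
A worked example with assumed sizes (100-byte segments that each encode to a 30-byte fragment; the real fragment_size comes from the policy):

    # the first two whole segments live in bytes 0-59 of each fragment archive
    assert segment_range_to_fragment_range(0, 199, 100, 30) == (0, 59)
    # a suffix range covering the last 200 segment bytes maps to the last
    # 60 bytes of each fragment archive (integer division, Python 2)
    assert segment_range_to_fragment_range(None, 200, 100, 30) == (None, 60)
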
+
+
+NO_DATA_SENT = 1
+SENDING_DATA = 2
+DATA_SENT = 3
+DATA_ACKED = 4
+COMMIT_SENT = 5
+
+
+class ECPutter(object):
+ """
+    Wraps up the fact that all EC PUTs are chunked (because of the MIME
+    boundary footer trick) and handles the first half of the two-phase
+    PUT conversation.
+
+ An HTTP PUT request that supports streaming.
+
+ Probably deserves more docs than this, but meh.
+ """
+ def __init__(self, conn, node, resp, path, connect_duration,
+ mime_boundary):
+        # Note: you probably want to call ECPutter.connect() instead of
+ # instantiating one of these directly.
+ self.conn = conn
+ self.node = node
+ self.resp = resp
+ self.path = path
+ self.connect_duration = connect_duration
+ # for handoff nodes node_index is None
+ self.node_index = node.get('index')
+ self.mime_boundary = mime_boundary
+ self.chunk_hasher = md5()
+
+ self.failed = False
+ self.queue = None
+ self.state = NO_DATA_SENT
+
+ def current_status(self):
+ """
+ Returns the current status of the response.
+
+ A response starts off with no current status, then may or may not have
+ a status of 100 for some time, and then ultimately has a final status
+ like 200, 404, et cetera.
+ """
+ return self.resp.status
+
+ def await_response(self, timeout, informational=False):
+ """
+ Get 100-continue response indicating the end of 1st phase of a 2-phase
+ commit or the final response, i.e. the one with status >= 200.
+
+ Might or might not actually wait for anything. If we said Expect:
+ 100-continue but got back a non-100 response, that'll be the thing
+ returned, and we won't do any network IO to get it. OTOH, if we got
+ a 100 Continue response and sent up the PUT request's body, then
+ we'll actually read the 2xx-5xx response off the network here.
+
+ :returns: HTTPResponse
+ :raises: Timeout if the response took too long
+ """
+ conn = self.conn
+ with Timeout(timeout):
+ if not conn.resp:
+ if informational:
+ self.resp = conn.getexpect()
+ else:
+ self.resp = conn.getresponse()
+ return self.resp
+
+ def spawn_sender_greenthread(self, pool, queue_depth, write_timeout,
+ exception_handler):
+ """Call before sending the first chunk of request body"""
+ self.queue = Queue(queue_depth)
+ pool.spawn(self._send_file, write_timeout, exception_handler)
+
+ def wait(self):
+ if self.queue.unfinished_tasks:
+ self.queue.join()
+
+ def _start_mime_doc_object_body(self):
+ self.queue.put("--%s\r\nX-Document: object body\r\n\r\n" %
+ (self.mime_boundary,))
+
+ def send_chunk(self, chunk):
+ if not chunk:
+ # If we're not using chunked transfer-encoding, sending a 0-byte
+ # chunk is just wasteful. If we *are* using chunked
+ # transfer-encoding, sending a 0-byte chunk terminates the
+ # request body. Neither one of these is good.
+ return
+ elif self.state == DATA_SENT:
+ raise ValueError("called send_chunk after end_of_object_data")
+
+ if self.state == NO_DATA_SENT and self.mime_boundary:
+ # We're sending the object plus other stuff in the same request
+ # body, all wrapped up in multipart MIME, so we'd better start
+ # off the MIME document before sending any object data.
+ self._start_mime_doc_object_body()
+ self.state = SENDING_DATA
+
+ self.queue.put(chunk)
+
+ def end_of_object_data(self, footer_metadata):
+ """
+ Call when there is no more data to send.
+
+ :param footer_metadata: dictionary of metadata items
+ """
+ if self.state == DATA_SENT:
+ raise ValueError("called end_of_object_data twice")
+ elif self.state == NO_DATA_SENT and self.mime_boundary:
+ self._start_mime_doc_object_body()
+
+ footer_body = json.dumps(footer_metadata)
+ footer_md5 = md5(footer_body).hexdigest()
+
+ tail_boundary = ("--%s" % (self.mime_boundary,))
+
+ message_parts = [
+ ("\r\n--%s\r\n" % self.mime_boundary),
+ "X-Document: object metadata\r\n",
+ "Content-MD5: %s\r\n" % footer_md5,
+ "\r\n",
+ footer_body, "\r\n",
+ tail_boundary, "\r\n",
+ ]
+ self.queue.put("".join(message_parts))
+
+ self.queue.put('')
+ self.state = DATA_SENT
+
+ def send_commit_confirmation(self):
+ """
+        Call once at least a quorum of 2xx responses has been received.
+        Send commit confirmations to all object nodes to finalize the PUT.
+ """
+ if self.state == COMMIT_SENT:
+ raise ValueError("called send_commit_confirmation twice")
+
+ self.state = DATA_ACKED
+
+ if self.mime_boundary:
+ body = "put_commit_confirmation"
+ tail_boundary = ("--%s--" % (self.mime_boundary,))
+ message_parts = [
+ "X-Document: put commit\r\n",
+ "\r\n",
+ body, "\r\n",
+ tail_boundary,
+ ]
+ self.queue.put("".join(message_parts))
+
+ self.queue.put('')
+ self.state = COMMIT_SENT
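
Taken together, _start_mime_doc_object_body, end_of_object_data and send_commit_confirmation produce a request body that the object server sees as a small MIME document along these lines (chunked transfer-encoding and CRLFs omitted; boundary and values invented for illustration):

    --<boundary>
    X-Document: object body

    <fragment archive bytes>
    --<boundary>
    X-Document: object metadata
    Content-MD5: <md5 of the JSON footer below>

    {"Etag": "<fragment md5>", "X-Object-Sysmeta-Ec-Etag": "<object md5>", ...}
    --<boundary>
    X-Document: put commit

    put_commit_confirmation
    --<boundary>--
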
+
+ def _send_file(self, write_timeout, exception_handler):
+ """
+ Method for a file PUT coro. Takes chunks from a queue and sends them
+ down a socket.
+
+ If something goes wrong, the "failed" attribute will be set to true
+ and the exception handler will be called.
+ """
+ while True:
+ chunk = self.queue.get()
+ if not self.failed:
+ to_send = "%x\r\n%s\r\n" % (len(chunk), chunk)
+ try:
+ with ChunkWriteTimeout(write_timeout):
+ self.conn.send(to_send)
+ except (Exception, ChunkWriteTimeout):
+ self.failed = True
+ exception_handler(self.conn.node, _('Object'),
+ _('Trying to write to %s') % self.path)
+ self.queue.task_done()
+
+ @classmethod
+ def connect(cls, node, part, path, headers, conn_timeout, node_timeout,
+ chunked=False):
+ """
+ Connect to a backend node and send the headers.
+
+ :returns: Putter instance
+
+ :raises: ConnectionTimeout if initial connection timed out
+ :raises: ResponseTimeout if header retrieval timed out
+ :raises: InsufficientStorage on 507 response from node
+ :raises: PutterConnectError on non-507 server error response from node
+ :raises: FooterNotSupported if need_metadata_footer is set but
+ backend node can't process footers
+ :raises: MultiphasePUTNotSupported if need_multiphase_support is
+ set but backend node can't handle multiphase PUT
+ """
+ mime_boundary = "%.64x" % random.randint(0, 16 ** 64)
+ headers = HeaderKeyDict(headers)
+ # We're going to be adding some unknown amount of data to the
+ # request, so we can't use an explicit content length, and thus
+ # we must use chunked encoding.
+ headers['Transfer-Encoding'] = 'chunked'
+ headers['Expect'] = '100-continue'
+ if 'Content-Length' in headers:
+ headers['X-Backend-Obj-Content-Length'] = \
+ headers.pop('Content-Length')
+
+ headers['X-Backend-Obj-Multipart-Mime-Boundary'] = mime_boundary
+
+ headers['X-Backend-Obj-Metadata-Footer'] = 'yes'
+
+ headers['X-Backend-Obj-Multiphase-Commit'] = 'yes'
+
+ start_time = time.time()
+ with ConnectionTimeout(conn_timeout):
+ conn = http_connect(node['ip'], node['port'], node['device'],
+ part, 'PUT', path, headers)
+ connect_duration = time.time() - start_time
+
+ with ResponseTimeout(node_timeout):
+ resp = conn.getexpect()
+
+ if resp.status == HTTP_INSUFFICIENT_STORAGE:
+ raise InsufficientStorage
+
+ if is_server_error(resp.status):
+ raise PutterConnectError(resp.status)
+
+ if is_informational(resp.status):
+ continue_headers = HeaderKeyDict(resp.getheaders())
+ can_send_metadata_footer = config_true_value(
+ continue_headers.get('X-Obj-Metadata-Footer', 'no'))
+ can_handle_multiphase_put = config_true_value(
+ continue_headers.get('X-Obj-Multiphase-Commit', 'no'))
+
+ if not can_send_metadata_footer:
+ raise FooterNotSupported()
+
+ if not can_handle_multiphase_put:
+ raise MultiphasePUTNotSupported()
+
+ conn.node = node
+ conn.resp = None
+ if is_success(resp.status) or resp.status == HTTP_CONFLICT:
+ conn.resp = resp
+ elif (headers.get('If-None-Match', None) is not None and
+ resp.status == HTTP_PRECONDITION_FAILED):
+ conn.resp = resp
+
+ return cls(conn, node, resp, path, connect_duration, mime_boundary)
+
+
+def chunk_transformer(policy, nstreams):
+ segment_size = policy.ec_segment_size
+
+ buf = collections.deque()
+ total_buf_len = 0
+
+ chunk = yield
+ while chunk:
+ buf.append(chunk)
+ total_buf_len += len(chunk)
+ if total_buf_len >= segment_size:
+ chunks_to_encode = []
+ # extract as many chunks as we can from the input buffer
+ while total_buf_len >= segment_size:
+ to_take = segment_size
+ pieces = []
+ while to_take > 0:
+ piece = buf.popleft()
+ if len(piece) > to_take:
+ buf.appendleft(piece[to_take:])
+ piece = piece[:to_take]
+ pieces.append(piece)
+ to_take -= len(piece)
+ total_buf_len -= len(piece)
+ chunks_to_encode.append(''.join(pieces))
+
+ frags_by_byte_order = []
+ for chunk_to_encode in chunks_to_encode:
+ frags_by_byte_order.append(
+ policy.pyeclib_driver.encode(chunk_to_encode))
+ # Sequential calls to encode() have given us a list that
+ # looks like this:
+ #
+ # [[frag_A1, frag_B1, frag_C1, ...],
+ # [frag_A2, frag_B2, frag_C2, ...], ...]
+ #
+ # What we need is a list like this:
+ #
+ # [(frag_A1 + frag_A2 + ...), # destined for node A
+ # (frag_B1 + frag_B2 + ...), # destined for node B
+ # (frag_C1 + frag_C2 + ...), # destined for node C
+ # ...]
+ obj_data = [''.join(frags)
+ for frags in zip(*frags_by_byte_order)]
+ chunk = yield obj_data
+ else:
+ # didn't have enough data to encode
+ chunk = yield None
+
+ # Now we've gotten an empty chunk, which indicates end-of-input.
+ # Take any leftover bytes and encode them.
+ last_bytes = ''.join(buf)
+ if last_bytes:
+ last_frags = policy.pyeclib_driver.encode(last_bytes)
+ yield last_frags
+ else:
+ yield [''] * nstreams
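
To make the coroutine protocol concrete, here is a small driving sketch with a fake three-fragment "driver" standing in for pyeclib; everything prefixed _Fake is invented for the example:

    class _FakeDriver(object):
        def encode(self, data):
            # pretend-EC: split a segment into three roughly equal slices
            third = len(data) // 3
            return [data[:third], data[third:2 * third], data[2 * third:]]

    class _FakePolicy(object):
        ec_segment_size = 6
        pyeclib_driver = _FakeDriver()

    transform = chunk_transformer(_FakePolicy(), nstreams=3)
    transform.send(None)                    # prime the coroutine
    assert transform.send('abc') is None    # not a whole segment buffered yet
    assert transform.send('defg') == ['ab', 'cd', 'ef']   # one segment encoded
    assert transform.send('') == ['', '', 'g']            # flush the leftovers

The real PUT path does the same dance in _transfer_data below: send(None) to prime, send() each client chunk, then send('') once the client is done.
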
+
+
+def trailing_metadata(policy, client_obj_hasher,
+ bytes_transferred_from_client,
+ fragment_archive_index):
+ return {
+ # etag and size values are being added twice here.
+ # The container override header is used to update the container db
+ # with these values as they represent the correct etag and size for
+ # the whole object and not just the FA.
+ # The object sysmeta headers will be saved on each FA of the object.
+ 'X-Object-Sysmeta-EC-Etag': client_obj_hasher.hexdigest(),
+ 'X-Object-Sysmeta-EC-Content-Length':
+ str(bytes_transferred_from_client),
+ 'X-Backend-Container-Update-Override-Etag':
+ client_obj_hasher.hexdigest(),
+ 'X-Backend-Container-Update-Override-Size':
+ str(bytes_transferred_from_client),
+ 'X-Object-Sysmeta-Ec-Frag-Index': str(fragment_archive_index),
+ # These fields are for debuggability,
+ # AKA "what is this thing?"
+ 'X-Object-Sysmeta-EC-Scheme': policy.ec_scheme_description,
+ 'X-Object-Sysmeta-EC-Segment-Size': str(policy.ec_segment_size),
+ }
+
+
+@ObjectControllerRouter.register(EC_POLICY)
+class ECObjectController(BaseObjectController):
+
+ def _get_or_head_response(self, req, node_iter, partition, policy):
+ req.headers.setdefault("X-Backend-Etag-Is-At",
+ "X-Object-Sysmeta-Ec-Etag")
+
+ if req.method == 'HEAD':
+ # no fancy EC decoding here, just one plain old HEAD request to
+ # one object server because all fragments hold all metadata
+ # information about the object.
+ resp = self.GETorHEAD_base(
+ req, _('Object'), node_iter, partition,
+ req.swift_entity_path)
+ else: # GET request
+ orig_range = None
+ range_specs = []
+ if req.range:
+ orig_range = req.range
+ # Since segments and fragments have different sizes, we need
+ # to modify the Range header sent to the object servers to
+ # make sure we get the right fragments out of the fragment
+ # archives.
+ segment_size = policy.ec_segment_size
+ fragment_size = policy.fragment_size
+
+ range_specs = []
+ new_ranges = []
+ for client_start, client_end in req.range.ranges:
+
+ segment_start, segment_end = client_range_to_segment_range(
+ client_start, client_end, segment_size)
+
+ fragment_start, fragment_end = \
+ segment_range_to_fragment_range(
+ segment_start, segment_end,
+ segment_size, fragment_size)
+
+ new_ranges.append((fragment_start, fragment_end))
+ range_specs.append({'client_start': client_start,
+ 'client_end': client_end,
+ 'segment_start': segment_start,
+ 'segment_end': segment_end})
+
+ req.range = "bytes=" + ",".join(
+ "%s-%s" % (s if s is not None else "",
+ e if e is not None else "")
+ for s, e in new_ranges)
+
+ node_iter = GreenthreadSafeIterator(node_iter)
+ num_gets = policy.ec_ndata
+ with ContextPool(num_gets) as pool:
+ pile = GreenAsyncPile(pool)
+ for _junk in range(num_gets):
+ pile.spawn(self.GETorHEAD_base,
+ req, 'Object', node_iter, partition,
+ req.swift_entity_path,
+ client_chunk_size=policy.fragment_size)
+
+ responses = list(pile)
+ good_responses = []
+ bad_responses = []
+ for response in responses:
+ if is_success(response.status_int):
+ good_responses.append(response)
+ else:
+ bad_responses.append(response)
+
+ req.range = orig_range
+ if len(good_responses) == num_gets:
+ # If these aren't all for the same object, then error out so
+ # at least the client doesn't get garbage. We can do a lot
+ # better here with more work, but this'll work for now.
+ found_obj_etags = set(
+ resp.headers['X-Object-Sysmeta-Ec-Etag']
+ for resp in good_responses)
+ if len(found_obj_etags) > 1:
+ self.app.logger.debug(
+ "Returning 503 for %s; found too many etags (%s)",
+ req.path,
+ ", ".join(found_obj_etags))
+ return HTTPServiceUnavailable(request=req)
+
+ # we found enough pieces to decode the object, so now let's
+ # decode the object
+ resp_headers = HeaderKeyDict(good_responses[0].headers.items())
+ resp_headers.pop('Content-Range', None)
+ eccl = resp_headers.get('X-Object-Sysmeta-Ec-Content-Length')
+ obj_length = int(eccl) if eccl is not None else None
+
+ resp = Response(
+ request=req,
+ headers=resp_headers,
+ conditional_response=True,
+ app_iter=ECAppIter(
+ req.swift_entity_path,
+ policy,
+ [r.app_iter for r in good_responses],
+ range_specs,
+ obj_length,
+ logger=self.app.logger))
+ else:
+ resp = self.best_response(
+ req,
+ [r.status_int for r in bad_responses],
+ [r.status.split(' ', 1)[1] for r in bad_responses],
+ [r.body for r in bad_responses],
+ 'Object',
+ headers=[r.headers for r in bad_responses])
+
+ self._fix_response_headers(resp)
+ return resp
+
+ def _fix_response_headers(self, resp):
+ # EC fragment archives each have different bytes, hence different
+ # etags. However, they all have the original object's etag stored in
+ # sysmeta, so we copy that here so the client gets it.
+ resp.headers['Etag'] = resp.headers.get(
+ 'X-Object-Sysmeta-Ec-Etag')
+ resp.headers['Content-Length'] = resp.headers.get(
+ 'X-Object-Sysmeta-Ec-Content-Length')
+
+ return resp
+
+ def _connect_put_node(self, node_iter, part, path, headers,
+ logger_thread_locals):
+ """
+        Make a connection for an erasure-coded object.
+
+ Connects to the first working node that it finds in node_iter and sends
+ over the request headers. Returns a Putter to handle the rest of the
+ streaming, or None if no working nodes were found.
+ """
+ # the object server will get different bytes, so these
+ # values do not apply (Content-Length might, in general, but
+ # in the specific case of replication vs. EC, it doesn't).
+ headers.pop('Content-Length', None)
+ headers.pop('Etag', None)
+
+ self.app.logger.thread_locals = logger_thread_locals
+ for node in node_iter:
+ try:
+ putter = ECPutter.connect(
+ node, part, path, headers,
+ conn_timeout=self.app.conn_timeout,
+ node_timeout=self.app.node_timeout)
+ self.app.set_node_timing(node, putter.connect_duration)
+ return putter
+ except InsufficientStorage:
+ self.app.error_limit(node, _('ERROR Insufficient Storage'))
+ except PutterConnectError as e:
+ self.app.error_occurred(
+ node, _('ERROR %(status)d Expect: 100-continue '
+ 'From Object Server') % {
+ 'status': e.status})
+ except (Exception, Timeout):
+ self.app.exception_occurred(
+ node, _('Object'),
+ _('Expect: 100-continue on %s') % path)
+
+ def _determine_chunk_destinations(self, putters):
+ """
+ Given a list of putters, return a dict where the key is the putter
+        and the value is the index of the chunk to send to that putter.
+
+ This is done so that we line up handoffs using the same node index
+ (in the primary part list) as the primary that the handoff is standing
+ in for. This lets erasure-code fragment archives wind up on the
+ preferred local primary nodes when possible.
+ """
+ # Give each putter a "chunk index": the index of the
+ # transformed chunk that we'll send to it.
+ #
+ # For primary nodes, that's just its index (primary 0 gets
+ # chunk 0, primary 1 gets chunk 1, and so on). For handoffs,
+ # we assign the chunk index of a missing primary.
+ handoff_conns = []
+ chunk_index = {}
+ for p in putters:
+ if p.node_index is not None:
+ chunk_index[p] = p.node_index
+ else:
+ handoff_conns.append(p)
+
+ # Note: we may have more holes than handoffs. This is okay; it
+ # just means that we failed to connect to one or more storage
+ # nodes. Holes occur when a storage node is down, in which
+ # case the connection is not replaced, and when a storage node
+ # returns 507, in which case a handoff is used to replace it.
+ holes = [x for x in range(len(putters))
+ if x not in chunk_index.values()]
+
+ for hole, p in zip(holes, handoff_conns):
+ chunk_index[p] = hole
+ return chunk_index
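
With assumed inputs (connected primaries carrying node indexes 0, 1 and 3, plus one handoff), the handoff inherits the missing index 2. The same assignment, reduced to a standalone snippet:

    primary_indexes = [0, 1, 3]     # node_index of the connected primaries
    handoffs = ['handoff-putter']   # putters whose node_index is None
    total = len(primary_indexes) + len(handoffs)
    holes = [i for i in range(total) if i not in primary_indexes]   # [2]
    assert dict(zip(handoffs, holes)) == {'handoff-putter': 2}
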
+
+ def _transfer_data(self, req, policy, data_source, putters, nodes,
+ min_conns, etag_hasher):
+ """
+ Transfer data for an erasure coded object.
+
+        This method was added in the PUT method extraction change.
+ """
+ bytes_transferred = 0
+ chunk_transform = chunk_transformer(policy, len(nodes))
+ chunk_transform.send(None)
+
+ def send_chunk(chunk):
+ if etag_hasher:
+ etag_hasher.update(chunk)
+ backend_chunks = chunk_transform.send(chunk)
+ if backend_chunks is None:
+                # If there aren't enough bytes buffered for erasure-encoding
+ # or whatever we're doing, the transform will give us None.
+ return
+
+ for putter in list(putters):
+ backend_chunk = backend_chunks[chunk_index[putter]]
+ if not putter.failed:
+ putter.chunk_hasher.update(backend_chunk)
+ putter.send_chunk(backend_chunk)
+ else:
+ putters.remove(putter)
+ self._check_min_conn(
+ req, putters, min_conns, msg='Object PUT exceptions during'
+ ' send, %(conns)s/%(nodes)s required connections')
+
+ try:
+ with ContextPool(len(putters)) as pool:
+
+ # build our chunk index dict to place handoffs in the
+ # same part nodes index as the primaries they are covering
+ chunk_index = self._determine_chunk_destinations(putters)
+
+ for putter in putters:
+ putter.spawn_sender_greenthread(
+ pool, self.app.put_queue_depth, self.app.node_timeout,
+ self.app.exception_occurred)
+ while True:
+ with ChunkReadTimeout(self.app.client_timeout):
+ try:
+ chunk = next(data_source)
+ except StopIteration:
+ computed_etag = (etag_hasher.hexdigest()
+ if etag_hasher else None)
+ received_etag = req.headers.get(
+ 'etag', '').strip('"')
+ if (computed_etag and received_etag and
+ computed_etag != received_etag):
+ raise HTTPUnprocessableEntity(request=req)
+
+ send_chunk('') # flush out any buffered data
+
+ for putter in putters:
+ trail_md = trailing_metadata(
+ policy, etag_hasher,
+ bytes_transferred,
+ chunk_index[putter])
+ trail_md['Etag'] = \
+ putter.chunk_hasher.hexdigest()
+ putter.end_of_object_data(trail_md)
+ break
+ bytes_transferred += len(chunk)
+ if bytes_transferred > constraints.MAX_FILE_SIZE:
+ raise HTTPRequestEntityTooLarge(request=req)
+
+ send_chunk(chunk)
+
+ for putter in putters:
+ putter.wait()
+
+ # for storage policies requiring 2-phase commit (e.g.
+ # erasure coding), enforce >= 'quorum' number of
+ # 100-continue responses - this indicates successful
+ # object data and metadata commit and is a necessary
+ # condition to be met before starting 2nd PUT phase
+ final_phase = False
+ need_quorum = True
+ statuses, reasons, bodies, _junk, quorum = \
+ self._get_put_responses(
+ req, putters, len(nodes), final_phase,
+ min_conns, need_quorum=need_quorum)
+ if not quorum:
+ self.app.logger.error(
+ _('Not enough object servers ack\'ed (got %d)'),
+ statuses.count(HTTP_CONTINUE))
+ raise HTTPServiceUnavailable(request=req)
+ # quorum achieved, start 2nd phase - send commit
+ # confirmation to participating object servers
+ # so they write a .durable state file indicating
+ # a successful PUT
+ for putter in putters:
+ putter.send_commit_confirmation()
+ for putter in putters:
+ putter.wait()
+ except ChunkReadTimeout as err:
+ self.app.logger.warn(
+ _('ERROR Client read timeout (%ss)'), err.seconds)
+ self.app.logger.increment('client_timeouts')
+ raise HTTPRequestTimeout(request=req)
+ except HTTPException:
+ raise
+ except (Exception, Timeout):
+ self.app.logger.exception(
+ _('ERROR Exception causing client disconnect'))
+ raise HTTPClientDisconnect(request=req)
+ if req.content_length and bytes_transferred < req.content_length:
+ req.client_disconnect = True
+ self.app.logger.warn(
+ _('Client disconnected without sending enough data'))
+ self.app.logger.increment('client_disconnects')
+ raise HTTPClientDisconnect(request=req)
+
+ def _have_adequate_successes(self, statuses, min_responses):
+ """
+ Given a list of statuses from several requests, determine if a
+ satisfactory number of nodes have responded with 2xx statuses to
+        deem the transaction successful when responding to the client.
+
+ :param statuses: list of statuses returned so far
+ :param min_responses: minimal pass criterion for number of successes
+ :returns: True or False, depending on current number of successes
+ """
+ if sum(1 for s in statuses if is_success(s)) >= min_responses:
+ return True
+ return False
+
+ def _await_response(self, conn, final_phase):
+ return conn.await_response(
+ self.app.node_timeout, not final_phase)
+
+ def _get_conn_response(self, conn, req, final_phase, **kwargs):
+ try:
+ resp = self._await_response(conn, final_phase=final_phase,
+ **kwargs)
+ except (Exception, Timeout):
+ resp = None
+ if final_phase:
+ status_type = 'final'
+ else:
+ status_type = 'commit'
+ self.app.exception_occurred(
+ conn.node, _('Object'),
+ _('Trying to get %s status of PUT to %s') % (
+ status_type, req.path))
+ return (conn, resp)
+
+ def _get_put_responses(self, req, putters, num_nodes, final_phase,
+ min_responses, need_quorum=True):
+ """
+ Collect erasure coded object responses.
+
+ Collect object responses to a PUT request and determine if
+        a satisfactory number of nodes have returned success. Return
+        statuses, the quorum result if indicated by 'need_quorum', and
+        etags if this is the final phase of a multiphase PUT transaction.
+
+ :param req: the request
+ :param putters: list of putters for the request
+ :param num_nodes: number of nodes involved
+ :param final_phase: boolean indicating if this is the last phase
+ :param min_responses: minimum needed when not requiring quorum
+ :param need_quorum: boolean indicating if quorum is required
+ """
+ statuses = []
+ reasons = []
+ bodies = []
+ etags = set()
+
+ pile = GreenAsyncPile(len(putters))
+ for putter in putters:
+ if putter.failed:
+ continue
+ pile.spawn(self._get_conn_response, putter, req,
+ final_phase=final_phase)
+
+ def _handle_response(putter, response):
+ statuses.append(response.status)
+ reasons.append(response.reason)
+ if final_phase:
+ body = response.read()
+ bodies.append(body)
+ else:
+ body = ''
+ if response.status == HTTP_INSUFFICIENT_STORAGE:
+ putter.failed = True
+ self.app.error_limit(putter.node,
+ _('ERROR Insufficient Storage'))
+ elif response.status >= HTTP_INTERNAL_SERVER_ERROR:
+ putter.failed = True
+ self.app.error_occurred(
+ putter.node,
+ _('ERROR %(status)d %(body)s From Object Server '
+ 're: %(path)s') %
+ {'status': response.status,
+ 'body': body[:1024], 'path': req.path})
+ elif is_success(response.status):
+ etags.add(response.getheader('etag').strip('"'))
+
+ quorum = False
+ for (putter, response) in pile:
+ if response:
+ _handle_response(putter, response)
+ if self._have_adequate_successes(statuses, min_responses):
+ break
+ else:
+ putter.failed = True
+
+ # give any pending requests *some* chance to finish
+ finished_quickly = pile.waitall(self.app.post_quorum_timeout)
+ for (putter, response) in finished_quickly:
+ if response:
+ _handle_response(putter, response)
+
+ if need_quorum:
+ if final_phase:
+ while len(statuses) < num_nodes:
+ statuses.append(HTTP_SERVICE_UNAVAILABLE)
+ reasons.append('')
+ bodies.append('')
+ else:
+ # intermediate response phase - set return value to true only
+ # if there are enough 100-continue acknowledgements
+ if self.have_quorum(statuses, num_nodes):
+ quorum = True
+
+ return statuses, reasons, bodies, etags, quorum
+
+ def _store_object(self, req, data_source, nodes, partition,
+ outgoing_headers):
+ """
+ Store an erasure coded object.
+ """
+ policy_index = int(req.headers.get('X-Backend-Storage-Policy-Index'))
+ policy = POLICIES.get_by_index(policy_index)
+ # Since the request body sent from client -> proxy is not
+ # the same as the request body sent proxy -> object, we
+ # can't rely on the object-server to do the etag checking -
+ # so we have to do it here.
+ etag_hasher = md5()
+
+ min_conns = policy.quorum
+ putters = self._get_put_connections(
+ req, nodes, partition, outgoing_headers,
+ policy, expect=True)
+
+ try:
+ # check that a minimum number of connections were established and
+ # meet all the correct conditions set in the request
+ self._check_failure_put_connections(putters, req, nodes, min_conns)
+
+ self._transfer_data(req, policy, data_source, putters,
+ nodes, min_conns, etag_hasher)
+ final_phase = True
+ need_quorum = False
+ min_resp = 2
+ putters = [p for p in putters if not p.failed]
+ # ignore response etags, and quorum boolean
+ statuses, reasons, bodies, _etags, _quorum = \
+ self._get_put_responses(req, putters, len(nodes),
+ final_phase, min_resp,
+ need_quorum=need_quorum)
+ except HTTPException as resp:
+ return resp
+
+ etag = etag_hasher.hexdigest()
+ resp = self.best_response(req, statuses, reasons, bodies,
+ _('Object PUT'), etag=etag,
+ quorum_size=min_conns)
+ resp.last_modified = math.ceil(
+ float(Timestamp(req.headers['X-Timestamp'])))
+ return resp
diff --git a/swift/proxy/server.py b/swift/proxy/server.py
index 28d41df55..8c9e22372 100644
--- a/swift/proxy/server.py
+++ b/swift/proxy/server.py
@@ -20,6 +20,8 @@ from swift import gettext_ as _
from random import shuffle
from time import time
import itertools
+import functools
+import sys
from eventlet import Timeout
@@ -32,11 +34,12 @@ from swift.common.utils import cache_from_env, get_logger, \
affinity_key_function, affinity_locality_predicate, list_from_csv, \
register_swift_info
from swift.common.constraints import check_utf8
-from swift.proxy.controllers import AccountController, ObjectController, \
- ContainerController, InfoController
+from swift.proxy.controllers import AccountController, ContainerController, \
+ ObjectControllerRouter, InfoController
+from swift.proxy.controllers.base import get_container_info
from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \
- HTTPServerError, HTTPException, Request
+ HTTPServerError, HTTPException, Request, HTTPServiceUnavailable
# List of entry points for mandatory middlewares.
@@ -109,6 +112,7 @@ class Application(object):
# ensure rings are loaded for all configured storage policies
for policy in POLICIES:
policy.load_ring(swift_dir)
+ self.obj_controller_router = ObjectControllerRouter()
self.memcache = memcache
mimetypes.init(mimetypes.knownfiles +
[os.path.join(swift_dir, 'mime.types')])
@@ -235,29 +239,44 @@ class Application(object):
"""
return POLICIES.get_object_ring(policy_idx, self.swift_dir)
- def get_controller(self, path):
+ def get_controller(self, req):
"""
Get the controller to handle a request.
- :param path: path from request
+ :param req: the request
:returns: tuple of (controller class, path dictionary)
:raises: ValueError (thrown by split_path) if given invalid path
"""
- if path == '/info':
+ if req.path == '/info':
d = dict(version=None,
expose_info=self.expose_info,
disallowed_sections=self.disallowed_sections,
admin_key=self.admin_key)
return InfoController, d
- version, account, container, obj = split_path(path, 1, 4, True)
+ version, account, container, obj = split_path(req.path, 1, 4, True)
d = dict(version=version,
account_name=account,
container_name=container,
object_name=obj)
if obj and container and account:
- return ObjectController, d
+ info = get_container_info(req.environ, self)
+ policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
+ info['storage_policy'])
+ policy = POLICIES.get_by_index(policy_index)
+ if not policy:
+ # This indicates that a new policy has been created,
+ # with rings, deployed, released (i.e. deprecated =
+ # False), used by a client to create a container via
+ # another proxy that was restarted after the policy
+ # was released, and is now cached - all before this
+ # worker was HUPed to stop accepting new
+ # connections. There should never be an "unknown"
+ # index - but when there is - it's probably operator
+ # error and hopefully temporary.
+ raise HTTPServiceUnavailable('Unknown Storage Policy')
+ return self.obj_controller_router[policy], d
elif container and account:
return ContainerController, d
elif account and not container and not obj:
@@ -317,7 +336,7 @@ class Application(object):
request=req, body='Invalid UTF8 or contains NULL')
try:
- controller, path_parts = self.get_controller(req.path)
+ controller, path_parts = self.get_controller(req)
p = req.path_info
if isinstance(p, unicode):
p = p.encode('utf-8')
@@ -474,9 +493,9 @@ class Application(object):
def iter_nodes(self, ring, partition, node_iter=None):
"""
Yields nodes for a ring partition, skipping over error
- limited nodes and stopping at the configurable number of
- nodes. If a node yielded subsequently gets error limited, an
- extra node will be yielded to take its place.
+ limited nodes and stopping at the configurable number of nodes. If a
+ node yielded subsequently gets error limited, an extra node will be
+ yielded to take its place.
Note that if you're going to iterate over this concurrently from
multiple greenthreads, you'll want to use a
@@ -527,7 +546,8 @@ class Application(object):
if nodes_left <= 0:
return
- def exception_occurred(self, node, typ, additional_info):
+ def exception_occurred(self, node, typ, additional_info,
+ **kwargs):
"""
Handle logging of generic exceptions.
@@ -536,11 +556,18 @@ class Application(object):
:param additional_info: additional information to log
"""
self._incr_node_errors(node)
- self.logger.exception(
- _('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: '
- '%(info)s'),
- {'type': typ, 'ip': node['ip'], 'port': node['port'],
- 'device': node['device'], 'info': additional_info})
+ if 'level' in kwargs:
+ log = functools.partial(self.logger.log, kwargs.pop('level'))
+ if 'exc_info' not in kwargs:
+ kwargs['exc_info'] = sys.exc_info()
+ else:
+ log = self.logger.exception
+ log(_('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s'
+ ' re: %(info)s'), {
+ 'type': typ, 'ip': node['ip'], 'port':
+ node['port'], 'device': node['device'],
+ 'info': additional_info
+ }, **kwargs)
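
Callers can now control the log level and supply a saved exc_info tuple instead of relying on logger.exception; the GET retry path earlier in this patch does exactly that:

    self.app.exception_occurred(
        node, _('Object'),
        _('Trying to read during GET (retrying)'),
        level=logging.ERROR,
        exc_info=(exc_type, exc_value, exc_traceback))
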
def modify_wsgi_pipeline(self, pipe):
"""