summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-04-14 07:41:34 +0000
committerGerrit Code Review <review@openstack.org>2015-04-14 07:41:34 +0000
commit4e6c09513514774444bfe9679a55714331b66c1f (patch)
treed8870cdbad1b89671cff416538299b2ee805c3b3
parent376ecc74fccef390e4e4de66cae3ef20bcbd021f (diff)
parented5406628884432e23bbabb02d2855d4b51a332d (diff)
downloadswift-4e6c09513514774444bfe9679a55714331b66c1f.tar.gz
Merge "Add support for policy types, 'erasure_coding' policy" into feature/ec_review
-rw-r--r--doc/source/admin_guide.rst10
-rw-r--r--etc/container-server.conf-sample3
-rw-r--r--etc/swift.conf-sample42
-rwxr-xr-xswift/cli/ringbuilder.py25
-rw-r--r--swift/common/ring/builder.py39
-rw-r--r--swift/common/storage_policy.py379
-rw-r--r--swift/container/sync.py7
-rw-r--r--swift/obj/diskfile.py13
-rw-r--r--test/functional/__init__.py2
-rw-r--r--test/unit/cli/test_ringbuilder.py27
-rw-r--r--test/unit/common/middleware/test_recon.py14
-rw-r--r--test/unit/common/test_internal_client.py45
-rw-r--r--test/unit/common/test_storage_policy.py414
-rw-r--r--test/unit/container/test_sync.py7
-rw-r--r--test/unit/obj/test_diskfile.py5
-rwxr-xr-xtest/unit/obj/test_server.py1
16 files changed, 917 insertions, 116 deletions
diff --git a/doc/source/admin_guide.rst b/doc/source/admin_guide.rst
index d96353c84..5b7a02850 100644
--- a/doc/source/admin_guide.rst
+++ b/doc/source/admin_guide.rst
@@ -88,6 +88,16 @@ attempting to write to or read the builder/ring files while operations are in
progress. This can be useful in environments where ring management has been
automated but the operator still needs to interact with the rings manually.
+If the ring builder is not producing the balances that you are
+expecting, you can gain visibility into what it's doing with the
+``--debug`` flag.::
+
+ swift-ring-builder <builder-file> rebalance --debug
+
+This produces a great deal of output that is mostly useful if you are
+either (a) attempting to fix the ring builder, or (b) filing a bug
+against the ring builder.
+
-----------------------
Scripting Ring Creation
-----------------------
diff --git a/etc/container-server.conf-sample b/etc/container-server.conf-sample
index de511368a..6e881d9e0 100644
--- a/etc/container-server.conf-sample
+++ b/etc/container-server.conf-sample
@@ -167,6 +167,9 @@ use = egg:swift#recon
#
# Maximum amount of time to spend syncing each container per pass
# container_time = 60
+#
+# Maximum amount of time in seconds for the connection attempt
+# conn_timeout = 5
# Note: Put it at the beginning of the pipeline to profile all middleware. But
# it is safer to put this after healthcheck.
diff --git a/etc/swift.conf-sample b/etc/swift.conf-sample
index fac17676c..872681401 100644
--- a/etc/swift.conf-sample
+++ b/etc/swift.conf-sample
@@ -22,9 +22,13 @@ swift_hash_path_prefix = changeme
# defined you must define a policy with index 0 and you must specify a
# default. It is recommended you always define a section for
# storage-policy:0.
+#
+# A 'policy_type' argument is also supported but is not mandatory. Default
+# policy type 'replication' is used when 'policy_type' is unspecified.
[storage-policy:0]
name = Policy-0
default = yes
+#policy_type = replication
# the following section would declare a policy called 'silver', the number of
# replicas will be determined by how the ring is built. In this example the
@@ -39,9 +43,45 @@ default = yes
# current default.
#[storage-policy:1]
#name = silver
+#policy_type = replication
+
+# The following declares a storage policy of type 'erasure_coding' which uses
+# Erasure Coding for data reliability. The 'erasure_coding' storage policy in
+# Swift is available as a "beta". Please refer to Swift documentation for
+# details on how the 'erasure_coding' storage policy is implemented.
+#
+# Swift uses PyECLib, a Python Erasure coding API library, for encode/decode
+# operations. Please refer to Swift documentation for details on how to
+# install PyECLib.
+#
+# When defining an EC policy, 'policy_type' needs to be 'erasure_coding' and
+# EC configuration parameters 'ec_type', 'ec_num_data_fragments' and
+# 'ec_num_parity_fragments' must be specified. 'ec_type' is chosen from the
+# list of EC backends supported by PyECLib. The ring configured for the
+# storage policy must have its "replica" count configured to
+# 'ec_num_data_fragments' + 'ec_num_parity_fragments' - this requirement is
+# validated when services start. 'ec_object_segment_size' is the amount of
+# data that will be buffered up before feeding a segment into the
+# encoder/decoder. More information about these configuration options and
+# supported `ec_type` schemes is available in the Swift documentation. Please
+# refer to Swift documentation for details on how to configure EC policies.
+#
+# The example 'deepfreeze10-4' policy defined below is a _sample_
+# configuration with 10 'data' and 4 'parity' fragments. 'ec_type'
+# defines the Erasure Coding scheme. 'jerasure_rs_vand' (Reed-Solomon
+# Vandermonde) is used as an example below.
+#
+#[storage-policy:2]
+#name = deepfreeze10-4
+#policy_type = erasure_coding
+#ec_type = jerasure_rs_vand
+#ec_num_data_fragments = 10
+#ec_num_parity_fragments = 4
+#ec_object_segment_size = 1048576
+
# The swift-constraints section sets the basic constraints on data
-# saved in the swift cluster. These constraints are automatically
+# saved in the swift cluster. These constraints are automatically
# published by the proxy server in responses to /info requests.
[swift-constraints]
diff --git a/swift/cli/ringbuilder.py b/swift/cli/ringbuilder.py
index 0a7dab533..16315627d 100755
--- a/swift/cli/ringbuilder.py
+++ b/swift/cli/ringbuilder.py
@@ -14,12 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+
from errno import EEXIST
from itertools import islice, izip
from operator import itemgetter
from os import mkdir
from os.path import basename, abspath, dirname, exists, join as pathjoin
-from sys import argv as sys_argv, exit, stderr
+from sys import argv as sys_argv, exit, stderr, stdout
from textwrap import wrap
from time import time
import optparse
@@ -831,6 +833,8 @@ swift-ring-builder <builder_file> rebalance [options]
help='Force a rebalanced ring to save even '
'if < 1% of parts changed')
parser.add_option('-s', '--seed', help="seed to use for rebalance")
+ parser.add_option('-d', '--debug', action='store_true',
+ help="print debug information")
options, args = parser.parse_args(argv)
def get_seed(index):
@@ -841,6 +845,14 @@ swift-ring-builder <builder_file> rebalance [options]
except IndexError:
pass
+ if options.debug:
+ logger = logging.getLogger("swift.ring.builder")
+ logger.setLevel(logging.DEBUG)
+ handler = logging.StreamHandler(stdout)
+ formatter = logging.Formatter("%(levelname)s: %(message)s")
+ handler.setFormatter(formatter)
+ logger.addHandler(handler)
+
devs_changed = builder.devs_changed
try:
last_balance = builder.get_balance()
@@ -889,11 +901,12 @@ swift-ring-builder <builder_file> rebalance [options]
status = EXIT_SUCCESS
if builder.dispersion > 0:
print '-' * 79
- print('NOTE: Dispersion of %.06f indicates some parts are not\n'
- ' optimally dispersed.\n\n'
- ' You may want adjust some device weights, increase\n'
- ' the overload or review the dispersion report.' %
- builder.dispersion)
+ print(
+ 'NOTE: Dispersion of %.06f indicates some parts are not\n'
+ ' optimally dispersed.\n\n'
+ ' You may want to adjust some device weights, increase\n'
+ ' the overload or review the dispersion report.' %
+ builder.dispersion)
status = EXIT_WARNING
print '-' * 79
elif balance > 5 and balance / 100.0 > builder.overload:
diff --git a/swift/common/ring/builder.py b/swift/common/ring/builder.py
index b464bd76b..6672fdbec 100644
--- a/swift/common/ring/builder.py
+++ b/swift/common/ring/builder.py
@@ -17,6 +17,7 @@ import bisect
import copy
import errno
import itertools
+import logging
import math
import random
import cPickle as pickle
@@ -33,6 +34,16 @@ from swift.common.ring.utils import tiers_for_dev, build_tier_tree, \
MAX_BALANCE = 999.99
+try:
+ # python 2.7+
+ from logging import NullHandler
+except ImportError:
+ # python 2.6
+ class NullHandler(logging.Handler):
+ def emit(self, *a, **kw):
+ pass
+
+
class RingBuilder(object):
"""
Used to build swift.common.ring.RingData instances to be written to disk
@@ -96,6 +107,11 @@ class RingBuilder(object):
self._remove_devs = []
self._ring = None
+ self.logger = logging.getLogger("swift.ring.builder")
+ if not self.logger.handlers:
+ # silence "no handler for X" error messages
+ self.logger.addHandler(NullHandler())
+
def weight_of_one_part(self):
"""
Returns the weight of each partition as calculated from the
@@ -355,6 +371,7 @@ class RingBuilder(object):
self._ring = None
if self._last_part_moves_epoch is None:
+ self.logger.debug("New builder; performing initial balance")
self._initial_balance()
self.devs_changed = False
self._build_dispersion_graph()
@@ -363,16 +380,23 @@ class RingBuilder(object):
self._update_last_part_moves()
last_balance = 0
new_parts, removed_part_count = self._adjust_replica2part2dev_size()
+ self.logger.debug(
+ "%d new parts and %d removed parts from replica-count change",
+ len(new_parts), removed_part_count)
changed_parts += removed_part_count
self._set_parts_wanted()
self._reassign_parts(new_parts)
changed_parts += len(new_parts)
while True:
reassign_parts = self._gather_reassign_parts()
- self._reassign_parts(reassign_parts)
changed_parts += len(reassign_parts)
+ self.logger.debug("Gathered %d parts", changed_parts)
+ self._reassign_parts(reassign_parts)
+ self.logger.debug("Assigned %d parts", changed_parts)
while self._remove_devs:
- self.devs[self._remove_devs.pop()['id']] = None
+ remove_dev_id = self._remove_devs.pop()['id']
+ self.logger.debug("Removing dev %d", remove_dev_id)
+ self.devs[remove_dev_id] = None
balance = self.get_balance()
if balance < 1 or abs(last_balance - balance) < 1 or \
changed_parts == self.parts:
@@ -786,6 +810,9 @@ class RingBuilder(object):
if dev_id in dev_ids:
self._last_part_moves[part] = 0
removed_dev_parts[part].append(replica)
+ self.logger.debug(
+ "Gathered %d/%d from dev %d [dev removed]",
+ part, replica, dev_id)
# Now we gather partitions that are "at risk" because they aren't
# currently sufficient spread out across the cluster.
@@ -859,6 +886,9 @@ class RingBuilder(object):
dev['parts'] -= 1
removed_replica = True
moved_parts += 1
+ self.logger.debug(
+ "Gathered %d/%d from dev %d [dispersion]",
+ part, replica, dev['id'])
break
if removed_replica:
for tier in tfd[dev['id']]:
@@ -894,6 +924,9 @@ class RingBuilder(object):
dev['parts_wanted'] += 1
dev['parts'] -= 1
reassign_parts[part].append(replica)
+ self.logger.debug(
+ "Gathered %d/%d from dev %d [weight]",
+ part, replica, dev['id'])
reassign_parts.update(spread_out_parts)
reassign_parts.update(removed_dev_parts)
@@ -1121,6 +1154,8 @@ class RingBuilder(object):
new_index, new_last_sort_key)
self._replica2part2dev[replica][part] = dev['id']
+ self.logger.debug(
+ "Placed %d/%d onto dev %d", part, replica, dev['id'])
# Just to save memory and keep from accidental reuse.
for dev in self._iter_devs():
diff --git a/swift/common/storage_policy.py b/swift/common/storage_policy.py
index f33eda539..23e52fc56 100644
--- a/swift/common/storage_policy.py
+++ b/swift/common/storage_policy.py
@@ -17,10 +17,18 @@ import string
from swift.common.utils import config_true_value, SWIFT_CONF_FILE
from swift.common.ring import Ring
+from swift.common.utils import quorum_size
+from swift.common.exceptions import RingValidationError
+from pyeclib.ec_iface import ECDriver, ECDriverError, VALID_EC_TYPES
LEGACY_POLICY_NAME = 'Policy-0'
VALID_CHARS = '-' + string.letters + string.digits
+DEFAULT_POLICY_TYPE = REPL_POLICY = 'replication'
+EC_POLICY = 'erasure_coding'
+
+DEFAULT_EC_OBJECT_SEGMENT_SIZE = 1048576
+
class PolicyError(ValueError):
@@ -38,36 +46,73 @@ def _get_policy_string(base, policy_index):
return return_string
-def get_policy_string(base, policy_index):
+def get_policy_string(base, policy_or_index):
"""
- Helper function to construct a string from a base and the policy
- index. Used to encode the policy index into either a file name
- or a directory name by various modules.
+ Helper function to construct a string from a base and the policy.
+ Used to encode the policy index into either a file name or a
+ directory name by various modules.
:param base: the base string
- :param policy_index: the storage policy index
+ :param policy_or_index: StoragePolicy instance, or an index
+ (string or int), if None the legacy
+ storage Policy-0 is assumed.
:returns: base name with policy index added
+ :raises: PolicyError if no policy exists with the given policy_index
"""
- if POLICIES.get_by_index(policy_index) is None:
- raise PolicyError("No policy with index %r" % policy_index)
- return _get_policy_string(base, policy_index)
+ if isinstance(policy_or_index, BaseStoragePolicy):
+ policy = policy_or_index
+ else:
+ policy = POLICIES.get_by_index(policy_or_index)
+ if policy is None:
+ raise PolicyError("Unknown policy", index=policy_or_index)
+ return _get_policy_string(base, int(policy))
-class StoragePolicy(object):
+def split_policy_string(policy_string):
"""
- Represents a storage policy.
- Not meant to be instantiated directly; use
- :func:`~swift.common.storage_policy.reload_storage_policies` to load
- POLICIES from ``swift.conf``.
+ Helper function to convert a string representing a base and a
+ policy. Used to decode the policy from either a file name or
+ a directory name by various modules.
+
+ :param policy_string: base name with policy index added
+
+ :raises: PolicyError if given index does not map to a valid policy
+ :returns: a tuple, in the form (base, policy) where base is the base
+ string and policy is the StoragePolicy instance for the
+ index encoded in the policy_string.
+ """
+ if '-' in policy_string:
+ base, policy_index = policy_string.rsplit('-', 1)
+ else:
+ base, policy_index = policy_string, None
+ policy = POLICIES.get_by_index(policy_index)
+ if get_policy_string(base, policy) != policy_string:
+ raise PolicyError("Unknown policy", index=policy_index)
+ return base, policy
+
+
+class BaseStoragePolicy(object):
+ """
+ Represents a storage policy. Not meant to be instantiated directly;
+    implement a derived subclass (e.g. StoragePolicy, ECStoragePolicy, etc)
+ or use :func:`~swift.common.storage_policy.reload_storage_policies` to
+ load POLICIES from ``swift.conf``.
The object_ring property is lazy loaded once the service's ``swift_dir``
is known via :meth:`~StoragePolicyCollection.get_object_ring`, but it may
be over-ridden via object_ring kwarg at create time for testing or
actively loaded with :meth:`~StoragePolicy.load_ring`.
"""
+
+ policy_type_to_policy_cls = {}
+
def __init__(self, idx, name='', is_default=False, is_deprecated=False,
object_ring=None):
+ # do not allow BaseStoragePolicy class to be instantiated directly
+ if type(self) == BaseStoragePolicy:
+ raise TypeError("Can't instantiate BaseStoragePolicy directly")
+ # policy parameter validation
try:
self.idx = int(idx)
except ValueError:
@@ -88,6 +133,8 @@ class StoragePolicy(object):
self.name = name
self.is_deprecated = config_true_value(is_deprecated)
self.is_default = config_true_value(is_default)
+ if self.policy_type not in BaseStoragePolicy.policy_type_to_policy_cls:
+ raise PolicyError('Invalid type', self.policy_type)
if self.is_deprecated and self.is_default:
raise PolicyError('Deprecated policy can not be default. '
'Invalid config', self.idx)
@@ -101,8 +148,80 @@ class StoragePolicy(object):
return cmp(self.idx, int(other))
def __repr__(self):
- return ("StoragePolicy(%d, %r, is_default=%s, is_deprecated=%s)") % (
- self.idx, self.name, self.is_default, self.is_deprecated)
+ return ("%s(%d, %r, is_default=%s, "
+ "is_deprecated=%s, policy_type=%r)") % \
+ (self.__class__.__name__, self.idx, self.name,
+ self.is_default, self.is_deprecated, self.policy_type)
+
+ @classmethod
+ def register(cls, policy_type):
+ """
+ Decorator for Storage Policy implementations to register
+ their StoragePolicy class. This will also set the policy_type
+ attribute on the registered implementation.
+ """
+ def register_wrapper(policy_cls):
+ if policy_type in cls.policy_type_to_policy_cls:
+ raise PolicyError(
+ '%r is already registered for the policy_type %r' % (
+ cls.policy_type_to_policy_cls[policy_type],
+ policy_type))
+ cls.policy_type_to_policy_cls[policy_type] = policy_cls
+ policy_cls.policy_type = policy_type
+ return policy_cls
+ return register_wrapper
+
+ @classmethod
+ def _config_options_map(cls):
+ """
+ Map config option name to StoragePolicy parameter name.
+ """
+ return {
+ 'name': 'name',
+ 'policy_type': 'policy_type',
+ 'default': 'is_default',
+ 'deprecated': 'is_deprecated',
+ }
+
+ @classmethod
+ def from_config(cls, policy_index, options):
+ config_to_policy_option_map = cls._config_options_map()
+ policy_options = {}
+ for config_option, value in options.items():
+ try:
+ policy_option = config_to_policy_option_map[config_option]
+ except KeyError:
+ raise PolicyError('Invalid option %r in '
+ 'storage-policy section' % config_option,
+ index=policy_index)
+ policy_options[policy_option] = value
+ return cls(policy_index, **policy_options)
+
+ def get_info(self, config=False):
+ """
+ Return the info dict and conf file options for this policy.
+
+ :param config: boolean, if True all config options are returned
+ """
+ info = {}
+ for config_option, policy_attribute in \
+ self._config_options_map().items():
+ info[config_option] = getattr(self, policy_attribute)
+ if not config:
+ # remove some options for public consumption
+ if not self.is_default:
+ info.pop('default')
+ if not self.is_deprecated:
+ info.pop('deprecated')
+ info.pop('policy_type')
+ return info
+
+ def _validate_ring(self):
+ """
+ Hook, called when the ring is loaded. Can be used to
+ validate the ring against the StoragePolicy configuration.
+ """
+ pass
def load_ring(self, swift_dir):
"""
@@ -114,11 +233,194 @@ class StoragePolicy(object):
return
self.object_ring = Ring(swift_dir, ring_name=self.ring_name)
- def get_options(self):
- """Return the valid conf file options for this policy."""
- return {'name': self.name,
- 'default': self.is_default,
- 'deprecated': self.is_deprecated}
+ # Validate ring to make sure it conforms to policy requirements
+ self._validate_ring()
+
+ @property
+ def quorum(self):
+ """
+ Number of successful backend requests needed for the proxy to
+ consider the client request successful.
+ """
+ raise NotImplementedError()
+
+
+@BaseStoragePolicy.register(REPL_POLICY)
+class StoragePolicy(BaseStoragePolicy):
+ """
+ Represents a storage policy of type 'replication'. Default storage policy
+ class unless otherwise overridden from swift.conf.
+
+ Not meant to be instantiated directly; use
+ :func:`~swift.common.storage_policy.reload_storage_policies` to load
+ POLICIES from ``swift.conf``.
+ """
+
+ @property
+ def quorum(self):
+ """
+ Quorum concept in the replication case:
+ floor(number of replica / 2) + 1
+ """
+ if not self.object_ring:
+ raise PolicyError('Ring is not loaded')
+ return quorum_size(self.object_ring.replica_count)
+
+
+@BaseStoragePolicy.register(EC_POLICY)
+class ECStoragePolicy(BaseStoragePolicy):
+ """
+ Represents a storage policy of type 'erasure_coding'.
+
+ Not meant to be instantiated directly; use
+ :func:`~swift.common.storage_policy.reload_storage_policies` to load
+ POLICIES from ``swift.conf``.
+ """
+ def __init__(self, idx, name='', is_default=False,
+ is_deprecated=False, object_ring=None,
+ ec_segment_size=DEFAULT_EC_OBJECT_SEGMENT_SIZE,
+ ec_type=None, ec_ndata=None, ec_nparity=None):
+
+ super(ECStoragePolicy, self).__init__(
+ idx, name, is_default, is_deprecated, object_ring)
+
+ # Validate erasure_coding policy specific members
+ # ec_type is one of the EC implementations supported by PyEClib
+ if ec_type is None:
+ raise PolicyError('Missing ec_type')
+ if ec_type not in VALID_EC_TYPES:
+ raise PolicyError('Wrong ec_type %s for policy %s, should be one'
+ ' of "%s"' % (ec_type, self.name,
+ ', '.join(VALID_EC_TYPES)))
+ self._ec_type = ec_type
+
+ # Define _ec_ndata as the number of EC data fragments
+ # Accessible as the property "ec_ndata"
+ try:
+ value = int(ec_ndata)
+ if value <= 0:
+ raise ValueError
+ self._ec_ndata = value
+ except (TypeError, ValueError):
+ raise PolicyError('Invalid ec_num_data_fragments %r' %
+ ec_ndata, index=self.idx)
+
+ # Define _ec_nparity as the number of EC parity fragments
+ # Accessible as the property "ec_nparity"
+ try:
+ value = int(ec_nparity)
+ if value <= 0:
+ raise ValueError
+ self._ec_nparity = value
+ except (TypeError, ValueError):
+ raise PolicyError('Invalid ec_num_parity_fragments %r'
+ % ec_nparity, index=self.idx)
+
+ # Define _ec_segment_size as the encode segment unit size
+ # Accessible as the property "ec_segment_size"
+ try:
+ value = int(ec_segment_size)
+ if value <= 0:
+ raise ValueError
+ self._ec_segment_size = value
+ except (TypeError, ValueError):
+ raise PolicyError('Invalid ec_object_segment_size %r' %
+ ec_segment_size, index=self.idx)
+
+ # Initialize PyECLib EC backend
+ try:
+ self.pyeclib_driver = \
+ ECDriver(k=self._ec_ndata, m=self._ec_nparity,
+ ec_type=self._ec_type)
+ except ECDriverError as e:
+ raise PolicyError("Error creating EC policy (%s)" % e,
+ index=self.idx)
+
+ # quorum size in the EC case depends on the choice of EC scheme.
+ self._ec_quorum_size = \
+ self._ec_ndata + self.pyeclib_driver.min_parity_fragments_needed()
+
+ @property
+ def ec_type(self):
+ return self._ec_type
+
+ @property
+ def ec_ndata(self):
+ return self._ec_ndata
+
+ @property
+ def ec_nparity(self):
+ return self._ec_nparity
+
+ @property
+ def ec_segment_size(self):
+ return self._ec_segment_size
+
+ def __repr__(self):
+ return ("%s, EC config(ec_type=%s, ec_segment_size=%d, "
+ "ec_ndata=%d, ec_nparity=%d)") % (
+ super(ECStoragePolicy, self).__repr__(), self.ec_type,
+ self.ec_segment_size, self.ec_ndata, self.ec_nparity)
+
+ @classmethod
+ def _config_options_map(cls):
+ options = super(ECStoragePolicy, cls)._config_options_map()
+ options.update({
+ 'ec_type': 'ec_type',
+ 'ec_object_segment_size': 'ec_segment_size',
+ 'ec_num_data_fragments': 'ec_ndata',
+ 'ec_num_parity_fragments': 'ec_nparity',
+ })
+ return options
+
+ def get_info(self, config=False):
+ info = super(ECStoragePolicy, self).get_info(config=config)
+ if not config:
+ info.pop('ec_object_segment_size')
+ info.pop('ec_num_data_fragments')
+ info.pop('ec_num_parity_fragments')
+ info.pop('ec_type')
+ return info
+
+ def _validate_ring(self):
+ """
+ EC specific validation
+
+ Replica count check - we need _at_least_ (#data + #parity) replicas
+ configured. Also if the replica count is larger than exactly that
+ number there's a non-zero risk of error for code that is considering
+ the number of nodes in the primary list from the ring.
+ """
+ if not self.object_ring:
+ raise PolicyError('Ring is not loaded')
+ nodes_configured = self.object_ring.replica_count
+ if nodes_configured != (self.ec_ndata + self.ec_nparity):
+ raise RingValidationError(
+ 'EC ring for policy %s needs to be configured with '
+ 'exactly %d nodes. Got %d.' % (self.name,
+ self.ec_ndata + self.ec_nparity, nodes_configured))
+
+ @property
+ def quorum(self):
+ """
+ Number of successful backend requests needed for the proxy to consider
+ the client request successful.
+
+ The quorum size for EC policies defines the minimum number
+ of data + parity elements required to be able to guarantee
+ the desired fault tolerance, which is the number of data
+ elements supplemented by the minimum number of parity
+ elements required by the chosen erasure coding scheme.
+
+    For example, for Reed-Solomon, the minimum number of parity
+ elements required is 1, and thus the quorum_size requirement
+ is ec_ndata + 1.
+
+ Given the number of parity elements required is not the same
+ for every erasure coding scheme, consult PyECLib for
+ min_parity_fragments_needed()
+ """
+ return self._ec_quorum_size
class StoragePolicyCollection(object):
@@ -236,9 +538,19 @@ class StoragePolicyCollection(object):
:returns: storage policy, or None if no such policy
"""
# makes it easier for callers to just pass in a header value
- index = int(index) if index else 0
+ if index in ('', None):
+ index = 0
+ else:
+ try:
+ index = int(index)
+ except ValueError:
+ return None
return self.by_index.get(index)
+ @property
+ def legacy(self):
+ return self.get_by_index(None)
+
def get_object_ring(self, policy_idx, swift_dir):
"""
Get the ring object to use to handle a request based on its policy.
@@ -267,10 +579,7 @@ class StoragePolicyCollection(object):
# delete from /info if deprecated
if pol.is_deprecated:
continue
- policy_entry = {}
- policy_entry['name'] = pol.name
- if pol.is_default:
- policy_entry['default'] = pol.is_default
+ policy_entry = pol.get_info()
policy_info.append(policy_entry)
return policy_info
@@ -287,22 +596,10 @@ def parse_storage_policies(conf):
if not section.startswith('storage-policy:'):
continue
policy_index = section.split(':', 1)[1]
- # map config option name to StoragePolicy parameter name
- config_to_policy_option_map = {
- 'name': 'name',
- 'default': 'is_default',
- 'deprecated': 'is_deprecated',
- }
- policy_options = {}
- for config_option, value in conf.items(section):
- try:
- policy_option = config_to_policy_option_map[config_option]
- except KeyError:
- raise PolicyError('Invalid option %r in '
- 'storage-policy section %r' % (
- config_option, section))
- policy_options[policy_option] = value
- policy = StoragePolicy(policy_index, **policy_options)
+ config_options = dict(conf.items(section))
+ policy_type = config_options.pop('policy_type', DEFAULT_POLICY_TYPE)
+ policy_cls = BaseStoragePolicy.policy_type_to_policy_cls[policy_type]
+ policy = policy_cls.from_config(policy_index, config_options)
policies.append(policy)
return StoragePolicyCollection(policies)
diff --git a/swift/container/sync.py b/swift/container/sync.py
index 4bf5fc5c3..0f42de6e9 100644
--- a/swift/container/sync.py
+++ b/swift/container/sync.py
@@ -158,6 +158,7 @@ class ContainerSync(Daemon):
self._myport = int(conf.get('bind_port', 6001))
swift.common.db.DB_PREALLOCATION = \
config_true_value(conf.get('db_preallocation', 'f'))
+ self.conn_timeout = float(conf.get('conn_timeout', 5))
def get_object_ring(self, policy_idx):
"""
@@ -361,7 +362,8 @@ class ContainerSync(Daemon):
headers['x-container-sync-key'] = user_key
delete_object(sync_to, name=row['name'], headers=headers,
proxy=self.select_http_proxy(),
- logger=self.logger)
+ logger=self.logger,
+ timeout=self.conn_timeout)
except ClientException as err:
if err.http_status != HTTP_NOT_FOUND:
raise
@@ -434,7 +436,8 @@ class ContainerSync(Daemon):
headers['x-container-sync-key'] = user_key
put_object(sync_to, name=row['name'], headers=headers,
contents=FileLikeIter(body),
- proxy=self.select_http_proxy(), logger=self.logger)
+ proxy=self.select_http_proxy(), logger=self.logger,
+ timeout=self.conn_timeout)
self.container_puts += 1
self.logger.increment('puts')
self.logger.timing_since('puts.timing', start_time)
diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py
index a8d14dfa2..06073ef91 100644
--- a/swift/obj/diskfile.py
+++ b/swift/obj/diskfile.py
@@ -63,7 +63,7 @@ from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir, \
ReplicationLockTimeout, DiskFileExpired, DiskFileXattrNotSupported
from swift.common.swob import multi_range_iterator
-from swift.common.storage_policy import get_policy_string, POLICIES
+from swift.common.storage_policy import get_policy_string, split_policy_string
from functools import partial
@@ -178,11 +178,7 @@ def extract_policy_index(obj_path):
obj_dirname = obj_portion[:obj_portion.index('/')]
except Exception:
return policy_idx
- if '-' in obj_dirname:
- base, policy_idx = obj_dirname.split('-', 1)
- if POLICIES.get_by_index(policy_idx) is None:
- policy_idx = 0
- return int(policy_idx)
+ return int(split_policy_string(obj_dirname)[1])
def quarantine_renamer(device_path, corrupted_file_path):
@@ -474,11 +470,8 @@ def object_audit_location_generator(devices, mount_check=True, logger=None,
if dir.startswith(DATADIR_BASE)]:
datadir_path = os.path.join(devices, device, dir)
# warn if the object dir doesn't match with a policy
- policy_idx = 0
- if '-' in dir:
- base, policy_idx = dir.split('-', 1)
try:
- get_data_dir(policy_idx)
+ base, policy = split_policy_string(dir)
except ValueError:
if logger:
logger.warn(_('Directory %s does not map to a '
diff --git a/test/functional/__init__.py b/test/functional/__init__.py
index 4a8cb80bd..73e500663 100644
--- a/test/functional/__init__.py
+++ b/test/functional/__init__.py
@@ -223,7 +223,7 @@ def _in_process_setup_ring(swift_conf, conf_src_dir, testdir):
# make policy_to_test be policy index 0 and default for the test config
sp_zero_section = sp_prefix + '0'
conf.add_section(sp_zero_section)
- for (k, v) in policy_to_test.get_options().items():
+ for (k, v) in policy_to_test.get_info(config=True).items():
conf.set(sp_zero_section, k, v)
conf.set(sp_zero_section, 'default', True)
diff --git a/test/unit/cli/test_ringbuilder.py b/test/unit/cli/test_ringbuilder.py
index 38991bfe4..8b96f2516 100644
--- a/test/unit/cli/test_ringbuilder.py
+++ b/test/unit/cli/test_ringbuilder.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
import mock
import os
import StringIO
@@ -1744,6 +1745,32 @@ class TestRebalanceCommand(unittest.TestCase, RunSwiftRingBuilderMixin):
raise
return (mock_stdout.getvalue(), mock_stderr.getvalue())
+ def test_debug(self):
+ # NB: getLogger(name) always returns the same object
+ rb_logger = logging.getLogger("swift.ring.builder")
+ try:
+ self.assertNotEqual(rb_logger.getEffectiveLevel(), logging.DEBUG)
+
+ self.run_srb("create", 8, 3, 1)
+ self.run_srb("add",
+ "r1z1-10.1.1.1:2345/sda", 100.0,
+ "r1z1-10.1.1.1:2345/sdb", 100.0,
+ "r1z1-10.1.1.1:2345/sdc", 100.0,
+ "r1z1-10.1.1.1:2345/sdd", 100.0)
+ self.run_srb("rebalance", "--debug")
+ self.assertEqual(rb_logger.getEffectiveLevel(), logging.DEBUG)
+
+ rb_logger.setLevel(logging.INFO)
+ self.run_srb("rebalance", "--debug", "123")
+ self.assertEqual(rb_logger.getEffectiveLevel(), logging.DEBUG)
+
+ rb_logger.setLevel(logging.INFO)
+ self.run_srb("rebalance", "123", "--debug")
+ self.assertEqual(rb_logger.getEffectiveLevel(), logging.DEBUG)
+
+ finally:
+ rb_logger.setLevel(logging.INFO) # silence other test cases
+
def test_rebalance_warning_appears(self):
self.run_srb("create", 8, 3, 24)
# all in one machine: totally balanceable
diff --git a/test/unit/common/middleware/test_recon.py b/test/unit/common/middleware/test_recon.py
index 2032c62d1..a46c4ae6c 100644
--- a/test/unit/common/middleware/test_recon.py
+++ b/test/unit/common/middleware/test_recon.py
@@ -492,6 +492,9 @@ class TestReconSuccess(TestCase):
from_cache_response = {'async_pending': 5}
self.fakecache.fakeout = from_cache_response
rv = self.app.get_async_info()
+ self.assertEquals(self.fakecache.fakeout_calls,
+ [((['async_pending'],
+ '/var/cache/swift/object.recon'), {})])
self.assertEquals(rv, {'async_pending': 5})
def test_get_replication_info_account(self):
@@ -588,6 +591,17 @@ class TestReconSuccess(TestCase):
'/var/cache/swift/object.recon'), {})])
self.assertEquals(rv, {"object_updater_sweep": 0.79848217964172363})
+ def test_get_expirer_info_object(self):
+ from_cache_response = {'object_expiration_pass': 0.79848217964172363,
+ 'expired_last_pass': 99}
+ self.fakecache.fakeout_calls = []
+ self.fakecache.fakeout = from_cache_response
+ rv = self.app.get_expirer_info('object')
+ self.assertEquals(self.fakecache.fakeout_calls,
+ [((['object_expiration_pass', 'expired_last_pass'],
+ '/var/cache/swift/object.recon'), {})])
+ self.assertEquals(rv, from_cache_response)
+
def test_get_auditor_info_account(self):
from_cache_response = {"account_auditor_pass_completed": 0.24,
"account_audits_failed": 0,
diff --git a/test/unit/common/test_internal_client.py b/test/unit/common/test_internal_client.py
index df140ebdd..b7d680688 100644
--- a/test/unit/common/test_internal_client.py
+++ b/test/unit/common/test_internal_client.py
@@ -235,19 +235,20 @@ class TestInternalClient(unittest.TestCase):
write_fake_ring(object_ring_path)
with patch_policies([StoragePolicy(0, 'legacy', True)]):
client = internal_client.InternalClient(conf_path, 'test', 1)
- self.assertEqual(client.account_ring, client.app.app.app.account_ring)
- self.assertEqual(client.account_ring.serialized_path,
- account_ring_path)
- self.assertEqual(client.container_ring,
- client.app.app.app.container_ring)
- self.assertEqual(client.container_ring.serialized_path,
- container_ring_path)
- object_ring = client.app.app.app.get_object_ring(0)
- self.assertEqual(client.get_object_ring(0),
- object_ring)
- self.assertEqual(object_ring.serialized_path,
- object_ring_path)
- self.assertEquals(client.auto_create_account_prefix, '-')
+ self.assertEqual(client.account_ring,
+ client.app.app.app.account_ring)
+ self.assertEqual(client.account_ring.serialized_path,
+ account_ring_path)
+ self.assertEqual(client.container_ring,
+ client.app.app.app.container_ring)
+ self.assertEqual(client.container_ring.serialized_path,
+ container_ring_path)
+ object_ring = client.app.app.app.get_object_ring(0)
+ self.assertEqual(client.get_object_ring(0),
+ object_ring)
+ self.assertEqual(object_ring.serialized_path,
+ object_ring_path)
+ self.assertEquals(client.auto_create_account_prefix, '-')
def test_init(self):
class App(object):
@@ -333,6 +334,24 @@ class TestInternalClient(unittest.TestCase):
self.assertEquals(3, client.sleep_called)
self.assertEquals(4, client.tries)
+ def test_base_request_timeout(self):
+ # verify that base_request passes timeout arg on to urlopen
+ body = {"some": "content"}
+
+ class FakeConn(object):
+ def read(self):
+ return json.dumps(body)
+
+ for timeout in (0.0, 42.0, None):
+ mocked_func = 'swift.common.internal_client.urllib2.urlopen'
+ with mock.patch(mocked_func) as mock_urlopen:
+ mock_urlopen.side_effect = [FakeConn()]
+ sc = internal_client.SimpleClient('http://0.0.0.0/')
+ _, resp_body = sc.base_request('GET', timeout=timeout)
+ mock_urlopen.assert_called_once_with(mock.ANY, timeout=timeout)
+ # sanity check
+ self.assertEquals(body, resp_body)
+
def test_make_request_method_path_headers(self):
class InternalClient(internal_client.InternalClient):
def __init__(self):
diff --git a/test/unit/common/test_storage_policy.py b/test/unit/common/test_storage_policy.py
index 21fed77ee..6406dc192 100644
--- a/test/unit/common/test_storage_policy.py
+++ b/test/unit/common/test_storage_policy.py
@@ -19,8 +19,23 @@ import mock
from tempfile import NamedTemporaryFile
from test.unit import patch_policies, FakeRing
from swift.common.storage_policy import (
- StoragePolicy, StoragePolicyCollection, POLICIES, PolicyError,
- parse_storage_policies, reload_storage_policies, get_policy_string)
+ StoragePolicyCollection, POLICIES, PolicyError, parse_storage_policies,
+ reload_storage_policies, get_policy_string, split_policy_string,
+ BaseStoragePolicy, StoragePolicy, ECStoragePolicy, REPL_POLICY, EC_POLICY,
+ VALID_EC_TYPES, DEFAULT_EC_OBJECT_SEGMENT_SIZE)
+from swift.common.exceptions import RingValidationError
+
+
+@BaseStoragePolicy.register('fake')
+class FakeStoragePolicy(BaseStoragePolicy):
+ """
+ Test StoragePolicy class - the only user at the moment is
+ test_validate_policies_type_invalid()
+ """
+ def __init__(self, idx, name='', is_default=False, is_deprecated=False,
+ object_ring=None):
+ super(FakeStoragePolicy, self).__init__(
+ idx, name, is_default, is_deprecated, object_ring)
class TestStoragePolicies(unittest.TestCase):
@@ -31,15 +46,35 @@ class TestStoragePolicies(unittest.TestCase):
conf.readfp(StringIO.StringIO(conf_str))
return conf
- @patch_policies([StoragePolicy(0, 'zero', True),
- StoragePolicy(1, 'one', False),
- StoragePolicy(2, 'two', False),
- StoragePolicy(3, 'three', False, is_deprecated=True)])
+ def assertRaisesWithMessage(self, exc_class, message, f, *args, **kwargs):
+ try:
+ f(*args, **kwargs)
+ except exc_class as err:
+ err_msg = str(err)
+ self.assert_(message in err_msg, 'Error message %r did not '
+ 'have expected substring %r' % (err_msg, message))
+ else:
+ self.fail('%r did not raise %s' % (message, exc_class.__name__))
+
+ def test_policy_baseclass_instantiate(self):
+ self.assertRaisesWithMessage(TypeError,
+ "Can't instantiate BaseStoragePolicy",
+ BaseStoragePolicy, 1, 'one')
+
+ @patch_policies([
+ StoragePolicy(0, 'zero', is_default=True),
+ StoragePolicy(1, 'one'),
+ StoragePolicy(2, 'two'),
+ StoragePolicy(3, 'three', is_deprecated=True),
+ ECStoragePolicy(10, 'ten', ec_type='jerasure_rs_vand',
+ ec_ndata=10, ec_nparity=4),
+ ])
def test_swift_info(self):
# the deprecated 'three' should not exist in expect
expect = [{'default': True, 'name': 'zero'},
{'name': 'two'},
- {'name': 'one'}]
+ {'name': 'one'},
+ {'name': 'ten'}]
swift_info = POLICIES.get_policy_info()
self.assertEquals(sorted(expect, key=lambda k: k['name']),
sorted(swift_info, key=lambda k: k['name']))
@@ -48,10 +83,48 @@ class TestStoragePolicies(unittest.TestCase):
def test_get_policy_string(self):
self.assertEquals(get_policy_string('something', 0), 'something')
self.assertEquals(get_policy_string('something', None), 'something')
+ self.assertEquals(get_policy_string('something', ''), 'something')
self.assertEquals(get_policy_string('something', 1),
'something' + '-1')
self.assertRaises(PolicyError, get_policy_string, 'something', 99)
+ @patch_policies
+ def test_split_policy_string(self):
+ expectations = {
+ 'something': ('something', POLICIES[0]),
+ 'something-1': ('something', POLICIES[1]),
+ 'tmp': ('tmp', POLICIES[0]),
+ 'objects': ('objects', POLICIES[0]),
+ 'tmp-1': ('tmp', POLICIES[1]),
+ 'objects-1': ('objects', POLICIES[1]),
+ 'objects-': PolicyError,
+ 'objects-0': PolicyError,
+ 'objects--1': ('objects-', POLICIES[1]),
+ 'objects-+1': PolicyError,
+ 'objects--': PolicyError,
+ 'objects-foo': PolicyError,
+ 'objects--bar': PolicyError,
+ 'objects-+bar': PolicyError,
+ # questionable, demonstrated as inverse of get_policy_string
+ 'objects+0': ('objects+0', POLICIES[0]),
+ '': ('', POLICIES[0]),
+ '0': ('0', POLICIES[0]),
+ '-1': ('', POLICIES[1]),
+ }
+ for policy_string, expected in expectations.items():
+ if expected == PolicyError:
+ try:
+ invalid = split_policy_string(policy_string)
+ except PolicyError:
+ continue # good
+ else:
+ self.fail('The string %r returned %r '
+ 'instead of raising a PolicyError' %
+ (policy_string, invalid))
+ self.assertEqual(expected, split_policy_string(policy_string))
+ # should be inverse of get_policy_string
+ self.assertEqual(policy_string, get_policy_string(*expected))
+
def test_defaults(self):
self.assertTrue(len(POLICIES) > 0)
@@ -66,7 +139,9 @@ class TestStoragePolicies(unittest.TestCase):
def test_storage_policy_repr(self):
test_policies = [StoragePolicy(0, 'aay', True),
StoragePolicy(1, 'bee', False),
- StoragePolicy(2, 'cee', False)]
+ StoragePolicy(2, 'cee', False),
+ ECStoragePolicy(10, 'ten', ec_type='jerasure_rs_vand',
+ ec_ndata=10, ec_nparity=3)]
policies = StoragePolicyCollection(test_policies)
for policy in policies:
policy_repr = repr(policy)
@@ -75,6 +150,13 @@ class TestStoragePolicies(unittest.TestCase):
self.assert_('is_deprecated=%s' % policy.is_deprecated in
policy_repr)
self.assert_(policy.name in policy_repr)
+ if policy.policy_type == EC_POLICY:
+ self.assert_('ec_type=%s' % policy.ec_type in policy_repr)
+ self.assert_('ec_ndata=%s' % policy.ec_ndata in policy_repr)
+ self.assert_('ec_nparity=%s' %
+ policy.ec_nparity in policy_repr)
+ self.assert_('ec_segment_size=%s' %
+ policy.ec_segment_size in policy_repr)
collection_repr = repr(policies)
collection_repr_lines = collection_repr.splitlines()
self.assert_(policies.__class__.__name__ in collection_repr_lines[0])
@@ -157,15 +239,16 @@ class TestStoragePolicies(unittest.TestCase):
def test_validate_policy_params(self):
StoragePolicy(0, 'name') # sanity
# bogus indexes
- self.assertRaises(PolicyError, StoragePolicy, 'x', 'name')
- self.assertRaises(PolicyError, StoragePolicy, -1, 'name')
+ self.assertRaises(PolicyError, FakeStoragePolicy, 'x', 'name')
+ self.assertRaises(PolicyError, FakeStoragePolicy, -1, 'name')
+
# non-zero Policy-0
- self.assertRaisesWithMessage(PolicyError, 'reserved', StoragePolicy,
- 1, 'policy-0')
+ self.assertRaisesWithMessage(PolicyError, 'reserved',
+ FakeStoragePolicy, 1, 'policy-0')
# deprecate default
self.assertRaisesWithMessage(
PolicyError, 'Deprecated policy can not be default',
- StoragePolicy, 1, 'Policy-1', is_default=True,
+ FakeStoragePolicy, 1, 'Policy-1', is_default=True,
is_deprecated=True)
# weird names
names = (
@@ -178,7 +261,7 @@ class TestStoragePolicies(unittest.TestCase):
)
for name in names:
self.assertRaisesWithMessage(PolicyError, 'Invalid name',
- StoragePolicy, 1, name)
+ FakeStoragePolicy, 1, name)
def test_validate_policies_names(self):
# duplicate names
@@ -188,6 +271,40 @@ class TestStoragePolicies(unittest.TestCase):
self.assertRaises(PolicyError, StoragePolicyCollection,
test_policies)
+ def test_validate_policies_type_default(self):
+ # no type specified - make sure the policy is initialized to
+ # DEFAULT_POLICY_TYPE
+ test_policy = FakeStoragePolicy(0, 'zero', True)
+ self.assertEquals(test_policy.policy_type, 'fake')
+
+ def test_validate_policies_type_invalid(self):
+ class BogusStoragePolicy(FakeStoragePolicy):
+ policy_type = 'bogus'
+ # unsupported policy type - initialization with FakeStoragePolicy
+ self.assertRaisesWithMessage(PolicyError, 'Invalid type',
+ BogusStoragePolicy, 1, 'one')
+
+ def test_policies_type_attribute(self):
+ test_policies = [
+ StoragePolicy(0, 'zero', is_default=True),
+ StoragePolicy(1, 'one'),
+ StoragePolicy(2, 'two'),
+ StoragePolicy(3, 'three', is_deprecated=True),
+ ECStoragePolicy(10, 'ten', ec_type='jerasure_rs_vand',
+ ec_ndata=10, ec_nparity=3),
+ ]
+ policies = StoragePolicyCollection(test_policies)
+ self.assertEquals(policies.get_by_index(0).policy_type,
+ REPL_POLICY)
+ self.assertEquals(policies.get_by_index(1).policy_type,
+ REPL_POLICY)
+ self.assertEquals(policies.get_by_index(2).policy_type,
+ REPL_POLICY)
+ self.assertEquals(policies.get_by_index(3).policy_type,
+ REPL_POLICY)
+ self.assertEquals(policies.get_by_index(10).policy_type,
+ EC_POLICY)
+
def test_names_are_normalized(self):
test_policies = [StoragePolicy(0, 'zero', True),
StoragePolicy(1, 'ZERO', False)]
@@ -207,16 +324,6 @@ class TestStoragePolicies(unittest.TestCase):
self.assertEqual(pol1, policies.get_by_name(name))
self.assertEqual(policies.get_by_name(name).name, 'One')
- def assertRaisesWithMessage(self, exc_class, message, f, *args, **kwargs):
- try:
- f(*args, **kwargs)
- except exc_class as err:
- err_msg = str(err)
- self.assert_(message in err_msg, 'Error message %r did not '
- 'have expected substring %r' % (err_msg, message))
- else:
- self.fail('%r did not raise %s' % (message, exc_class.__name__))
-
def test_deprecated_default(self):
bad_conf = self._conf("""
[storage-policy:1]
@@ -395,6 +502,133 @@ class TestStoragePolicies(unittest.TestCase):
self.assertRaisesWithMessage(PolicyError, 'Invalid name',
parse_storage_policies, bad_conf)
+ # policy_type = erasure_coding
+
+ # missing ec_type, ec_num_data_fragments and ec_num_parity_fragments
+ bad_conf = self._conf("""
+ [storage-policy:0]
+ name = zero
+ [storage-policy:1]
+ name = ec10-4
+ policy_type = erasure_coding
+ """)
+
+ self.assertRaisesWithMessage(PolicyError, 'Missing ec_type',
+ parse_storage_policies, bad_conf)
+
+ # missing ec_type, but other options valid...
+ bad_conf = self._conf("""
+ [storage-policy:0]
+ name = zero
+ [storage-policy:1]
+ name = ec10-4
+ policy_type = erasure_coding
+ ec_num_data_fragments = 10
+ ec_num_parity_fragments = 4
+ """)
+
+ self.assertRaisesWithMessage(PolicyError, 'Missing ec_type',
+ parse_storage_policies, bad_conf)
+
+ # ec_type specified, but invalid...
+ bad_conf = self._conf("""
+ [storage-policy:0]
+ name = zero
+ default = yes
+ [storage-policy:1]
+ name = ec10-4
+ policy_type = erasure_coding
+ ec_type = garbage_alg
+ ec_num_data_fragments = 10
+ ec_num_parity_fragments = 4
+ """)
+
+ self.assertRaisesWithMessage(PolicyError,
+ 'Wrong ec_type garbage_alg for policy '
+ 'ec10-4, should be one of "%s"' %
+ (', '.join(VALID_EC_TYPES)),
+ parse_storage_policies, bad_conf)
+
+ # missing and invalid ec_num_parity_fragments
+ bad_conf = self._conf("""
+ [storage-policy:0]
+ name = zero
+ [storage-policy:1]
+ name = ec10-4
+ policy_type = erasure_coding
+ ec_type = jerasure_rs_vand
+ ec_num_data_fragments = 10
+ """)
+
+ self.assertRaisesWithMessage(PolicyError,
+ 'Invalid ec_num_parity_fragments',
+ parse_storage_policies, bad_conf)
+
+ for num_parity in ('-4', '0', 'x'):
+ bad_conf = self._conf("""
+ [storage-policy:0]
+ name = zero
+ [storage-policy:1]
+ name = ec10-4
+ policy_type = erasure_coding
+ ec_type = jerasure_rs_vand
+ ec_num_data_fragments = 10
+ ec_num_parity_fragments = %s
+ """ % num_parity)
+
+ self.assertRaisesWithMessage(PolicyError,
+ 'Invalid ec_num_parity_fragments',
+ parse_storage_policies, bad_conf)
+
+ # missing and invalid ec_num_data_fragments
+ bad_conf = self._conf("""
+ [storage-policy:0]
+ name = zero
+ [storage-policy:1]
+ name = ec10-4
+ policy_type = erasure_coding
+ ec_type = jerasure_rs_vand
+ ec_num_parity_fragments = 4
+ """)
+
+ self.assertRaisesWithMessage(PolicyError,
+ 'Invalid ec_num_data_fragments',
+ parse_storage_policies, bad_conf)
+
+ for num_data in ('-10', '0', 'x'):
+ bad_conf = self._conf("""
+ [storage-policy:0]
+ name = zero
+ [storage-policy:1]
+ name = ec10-4
+ policy_type = erasure_coding
+ ec_type = jerasure_rs_vand
+ ec_num_data_fragments = %s
+ ec_num_parity_fragments = 4
+ """ % num_data)
+
+ self.assertRaisesWithMessage(PolicyError,
+ 'Invalid ec_num_data_fragments',
+ parse_storage_policies, bad_conf)
+
+ # invalid ec_object_segment_size
+ for segment_size in ('-4', '0', 'x'):
+ bad_conf = self._conf("""
+ [storage-policy:0]
+ name = zero
+ [storage-policy:1]
+ name = ec10-4
+ policy_type = erasure_coding
+ ec_object_segment_size = %s
+ ec_type = jerasure_rs_vand
+ ec_num_data_fragments = 10
+ ec_num_parity_fragments = 4
+ """ % segment_size)
+
+ self.assertRaisesWithMessage(PolicyError,
+ 'Invalid ec_object_segment_size',
+ parse_storage_policies, bad_conf)
+
# Additional section added to ensure parser ignores other sections
conf = self._conf("""
[some-other-section]
@@ -430,6 +664,8 @@ class TestStoragePolicies(unittest.TestCase):
self.assertEquals("zero", policies.get_by_index(None).name)
self.assertEquals("zero", policies.get_by_index('').name)
+ self.assertEqual(policies.get_by_index(0), policies.legacy)
+
def test_reload_invalid_storage_policies(self):
conf = self._conf("""
[storage-policy:0]
@@ -512,18 +748,124 @@ class TestStoragePolicies(unittest.TestCase):
for policy in POLICIES:
self.assertEqual(POLICIES[int(policy)], policy)
- def test_storage_policy_get_options(self):
- policy = StoragePolicy(1, 'gold', True, False)
- self.assertEqual({'name': 'gold',
- 'default': True,
- 'deprecated': False},
- policy.get_options())
-
- policy = StoragePolicy(1, 'gold', False, True)
- self.assertEqual({'name': 'gold',
- 'default': False,
- 'deprecated': True},
- policy.get_options())
+ def test_quorum_size_replication(self):
+ expected_sizes = {1: 1,
+ 2: 2,
+ 3: 2,
+ 4: 3,
+ 5: 3}
+ for n, expected in expected_sizes.items():
+ policy = StoragePolicy(0, 'zero',
+ object_ring=FakeRing(replicas=n))
+ self.assertEqual(policy.quorum, expected)
+
+ def test_quorum_size_erasure_coding(self):
+ test_ec_policies = [
+ ECStoragePolicy(10, 'ec8-2', ec_type='jerasure_rs_vand',
+ ec_ndata=8, ec_nparity=2),
+ ECStoragePolicy(11, 'df10-6', ec_type='flat_xor_hd_4',
+ ec_ndata=10, ec_nparity=6),
+ ]
+ for ec_policy in test_ec_policies:
+ k = ec_policy.ec_ndata
+ expected_size = \
+ k + ec_policy.pyeclib_driver.min_parity_fragments_needed()
+ self.assertEqual(expected_size, ec_policy.quorum)
+
+ def test_validate_ring(self):
+ test_policies = [
+ ECStoragePolicy(0, 'ec8-2', ec_type='jerasure_rs_vand',
+ ec_ndata=8, ec_nparity=2,
+ object_ring=FakeRing(replicas=8),
+ is_default=True),
+ ECStoragePolicy(1, 'ec10-4', ec_type='jerasure_rs_vand',
+ ec_ndata=10, ec_nparity=4,
+ object_ring=FakeRing(replicas=10)),
+ ECStoragePolicy(2, 'ec4-2', ec_type='jerasure_rs_vand',
+ ec_ndata=4, ec_nparity=2,
+ object_ring=FakeRing(replicas=7)),
+ ]
+ policies = StoragePolicyCollection(test_policies)
+
+ for policy in policies:
+ msg = 'EC ring for policy %s needs to be configured with ' \
+ 'exactly %d nodes.' % \
+ (policy.name, policy.ec_ndata + policy.ec_nparity)
+ self.assertRaisesWithMessage(
+ RingValidationError, msg,
+ policy._validate_ring)
+
+ def test_storage_policy_get_info(self):
+ test_policies = [
+ StoragePolicy(0, 'zero', is_default=True),
+ StoragePolicy(1, 'one', is_deprecated=True),
+ ECStoragePolicy(10, 'ten',
+ ec_type='jerasure_rs_vand',
+ ec_ndata=10, ec_nparity=3),
+ ECStoragePolicy(11, 'done', is_deprecated=True,
+ ec_type='jerasure_rs_vand',
+ ec_ndata=10, ec_nparity=3),
+ ]
+ policies = StoragePolicyCollection(test_policies)
+ expected = {
+ # default replication
+ (0, True): {
+ 'name': 'zero',
+ 'default': True,
+ 'deprecated': False,
+ 'policy_type': REPL_POLICY
+ },
+ (0, False): {
+ 'name': 'zero',
+ 'default': True,
+ },
+ # deprecated replication
+ (1, True): {
+ 'name': 'one',
+ 'default': False,
+ 'deprecated': True,
+ 'policy_type': REPL_POLICY
+ },
+ (1, False): {
+ 'name': 'one',
+ 'deprecated': True,
+ },
+ # enabled ec
+ (10, True): {
+ 'name': 'ten',
+ 'default': False,
+ 'deprecated': False,
+ 'policy_type': EC_POLICY,
+ 'ec_type': 'jerasure_rs_vand',
+ 'ec_num_data_fragments': 10,
+ 'ec_num_parity_fragments': 3,
+ 'ec_object_segment_size': DEFAULT_EC_OBJECT_SEGMENT_SIZE,
+ },
+ (10, False): {
+ 'name': 'ten',
+ },
+ # deprecated ec
+ (11, True): {
+ 'name': 'done',
+ 'default': False,
+ 'deprecated': True,
+ 'policy_type': EC_POLICY,
+ 'ec_type': 'jerasure_rs_vand',
+ 'ec_num_data_fragments': 10,
+ 'ec_num_parity_fragments': 3,
+ 'ec_object_segment_size': DEFAULT_EC_OBJECT_SEGMENT_SIZE,
+ },
+ (11, False): {
+ 'name': 'done',
+ 'deprecated': True,
+ },
+ }
+ self.maxDiff = None
+ for policy in policies:
+ expected_info = expected[(int(policy), True)]
+ self.assertEqual(policy.get_info(config=True), expected_info)
+ expected_info = expected[(int(policy), False)]
+ self.assertEqual(policy.get_info(config=False), expected_info)
if __name__ == '__main__':
diff --git a/test/unit/container/test_sync.py b/test/unit/container/test_sync.py
index 645c7935c..aa5cebc28 100644
--- a/test/unit/container/test_sync.py
+++ b/test/unit/container/test_sync.py
@@ -652,7 +652,7 @@ class TestContainerSync(unittest.TestCase):
fake_logger = FakeLogger()
def fake_delete_object(path, name=None, headers=None, proxy=None,
- logger=None):
+ logger=None, timeout=None):
self.assertEquals(path, 'http://sync/to/path')
self.assertEquals(name, 'object')
if realm:
@@ -666,6 +666,7 @@ class TestContainerSync(unittest.TestCase):
{'x-container-sync-key': 'key', 'x-timestamp': '1.2'})
self.assertEquals(proxy, 'http://proxy')
self.assertEqual(logger, fake_logger)
+ self.assertEqual(timeout, 5.0)
sync.delete_object = fake_delete_object
cs = sync.ContainerSync({}, container_ring=FakeRing())
@@ -759,7 +760,8 @@ class TestContainerSync(unittest.TestCase):
fake_logger = FakeLogger()
def fake_put_object(sync_to, name=None, headers=None,
- contents=None, proxy=None, logger=None):
+ contents=None, proxy=None, logger=None,
+ timeout=None):
self.assertEquals(sync_to, 'http://sync/to/path')
self.assertEquals(name, 'object')
if realm:
@@ -780,6 +782,7 @@ class TestContainerSync(unittest.TestCase):
self.assertEquals(contents.read(), 'contents')
self.assertEquals(proxy, 'http://proxy')
self.assertEqual(logger, fake_logger)
+ self.assertEqual(timeout, 5.0)
sync.put_object = fake_put_object
diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py
index cc6747555..8ccb2618f 100644
--- a/test/unit/obj/test_diskfile.py
+++ b/test/unit/obj/test_diskfile.py
@@ -140,9 +140,10 @@ class TestDiskFileModuleMethods(unittest.TestCase):
pn = '/objects-1/0/606/198452b6ef6247c78606/1401379842.14643.data'
self.assertEqual(diskfile.extract_policy_index(pn), 1)
- # bad policy index
+        # well formatted, but unknown policy index
pn = 'objects-2/0/606/198427efcff042c78606/1401379842.14643.data'
- self.assertEqual(diskfile.extract_policy_index(pn), 0)
+ self.assertRaises(ValueError,
+ diskfile.extract_policy_index, pn)
bad_path = '/srv/node/sda1/objects-t/1/abc/def/1234.data'
self.assertRaises(ValueError,
diskfile.extract_policy_index, bad_path)
diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py
index 1823a9014..583c9a9e3 100755
--- a/test/unit/obj/test_server.py
+++ b/test/unit/obj/test_server.py
@@ -4510,6 +4510,7 @@ class TestObjectServer(unittest.TestCase):
resp.close()
+@patch_policies
class TestZeroCopy(unittest.TestCase):
"""Test the object server's zero-copy functionality"""