summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Merritt <sam@swiftstack.com>2014-10-22 13:18:34 -0700
committerClay Gerrard <clay.gerrard@gmail.com>2015-04-14 00:52:17 -0700
commitdecbcd24d41d6367901db16aaa2578f74870b6b5 (patch)
tree15eaa73f3936610fe14fdff8429ff2cfa8356376
parentb1eda4aef8a228961d5aafe7e4fbd4e812d233ad (diff)
downloadswift-decbcd24d41d6367901db16aaa2578f74870b6b5.tar.gz
Foundational support for PUT and GET of erasure-coded objects
This commit makes it possible to PUT an object into Swift and have it stored using erasure coding instead of replication, and also to GET the object back from Swift at a later time. This works by splitting the incoming object into a number of segments, erasure-coding each segment in turn to get fragments, then concatenating the fragments into fragment archives. Segments are 1 MiB in size, except the last, which is between 1 B and 1 MiB. +====================================================================+ | object data | +====================================================================+ | +------------------------+----------------------+ | | | v v v +===================+ +===================+ +==============+ | segment 1 | | segment 2 | ... | segment N | +===================+ +===================+ +==============+ | | | | v v /=========\ /=========\ | pyeclib | | pyeclib | ... \=========/ \=========/ | | | | +--> fragment A-1 +--> fragment A-2 | | | | | | | | | | +--> fragment B-1 +--> fragment B-2 | | | | ... ... Then, object server A gets the concatenation of fragment A-1, A-2, ..., A-N, so its .data file looks like this (called a "fragment archive"): +=====================================================================+ | fragment A-1 | fragment A-2 | ... | fragment A-N | +=====================================================================+ Since this means that the object server never sees the object data as the client sent it, we have to do a few things to ensure data integrity. First, the proxy has to check the Etag if the client provided it; the object server can't do it since the object server doesn't see the raw data. Second, if the client does not provide an Etag, the proxy computes it and uses the MIME-PUT mechanism to provide it to the object servers after the object body. Otherwise, the object would not have an Etag at all. Third, the proxy computes the MD5 of each fragment archive and sends it to the object server using the MIME-PUT mechanism. 
With replicated objects, the proxy checks that the Etags from all the object servers match, and if they don't, returns a 500 to the client. This mitigates the risk of data corruption in one of the proxy --> object connections, and signals to the client when it happens. With EC objects, we can't use that same mechanism, so we must send the checksum with each fragment archive to get comparable protection. On the GET path, the inverse happens: the proxy connects to a bunch of object servers (M of them, for an M+K scheme), reads one fragment at a time from each fragment archive, decodes those fragments into a segment, and serves the segment to the client. When an object server dies partway through a GET response, any partially-fetched fragment is discarded, the resumption point is wound back to the nearest fragment boundary, and the GET is retried with the next object server. GET requests for a single byterange work; GET requests for multiple byteranges do not. There are a number of things _not_ included in this commit. Some of them are listed here: * multi-range GET * deferred cleanup of old .data files * durability (daemon to reconstruct missing archives) Co-Authored-By: Alistair Coles <alistair.coles@hp.com> Co-Authored-By: Thiago da Silva <thiago@redhat.com> Co-Authored-By: John Dickinson <me@not.mn> Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com> Co-Authored-By: Tushar Gohad <tushar.gohad@intel.com> Co-Authored-By: Paul Luse <paul.e.luse@intel.com> Co-Authored-By: Christian Schwede <christian.schwede@enovance.com> Co-Authored-By: Yuan Zhou <yuan.zhou@intel.com> Change-Id: I9c13c03616489f8eab7dcd7c5f21237ed4cb6fd2
-rw-r--r--swift/common/exceptions.py22
-rw-r--r--swift/common/middleware/formpost.py9
-rw-r--r--swift/common/ring/ring.py3
-rw-r--r--swift/common/storage_policy.py30
-rw-r--r--swift/common/utils.py42
-rw-r--r--swift/obj/diskfile.py5
-rw-r--r--swift/proxy/controllers/__init__.py4
-rw-r--r--swift/proxy/controllers/account.py5
-rw-r--r--swift/proxy/controllers/base.py171
-rw-r--r--swift/proxy/controllers/container.py3
-rw-r--r--swift/proxy/controllers/obj.py1228
-rw-r--r--swift/proxy/server.py63
-rw-r--r--test/unit/__init__.py8
-rw-r--r--test/unit/account/test_reaper.py3
-rw-r--r--test/unit/common/ring/test_ring.py61
-rw-r--r--test/unit/common/test_utils.py24
-rw-r--r--test/unit/proxy/controllers/test_base.py101
-rwxr-xr-xtest/unit/proxy/controllers/test_obj.py1266
-rw-r--r--test/unit/proxy/test_server.py1377
19 files changed, 3875 insertions, 550 deletions
diff --git a/swift/common/exceptions.py b/swift/common/exceptions.py
index d7ea759d6..064925431 100644
--- a/swift/common/exceptions.py
+++ b/swift/common/exceptions.py
@@ -31,10 +31,28 @@ class SwiftException(Exception):
pass
+class PutterConnectError(Exception):
+
+ def __init__(self, status=None):
+ self.status = status
+
+
class InvalidTimestamp(SwiftException):
pass
+class InsufficientStorage(SwiftException):
+ pass
+
+
+class FooterNotSupported(SwiftException):
+ pass
+
+
+class MultiphasePUTNotSupported(SwiftException):
+ pass
+
+
class DiskFileError(SwiftException):
pass
@@ -103,6 +121,10 @@ class ConnectionTimeout(Timeout):
pass
+class ResponseTimeout(Timeout):
+ pass
+
+
class DriveNotMounted(SwiftException):
pass
diff --git a/swift/common/middleware/formpost.py b/swift/common/middleware/formpost.py
index 7132b342a..56a6d20f3 100644
--- a/swift/common/middleware/formpost.py
+++ b/swift/common/middleware/formpost.py
@@ -218,7 +218,14 @@ class FormPost(object):
env, attrs['boundary'])
start_response(status, headers)
return [body]
- except (FormInvalid, MimeInvalid, EOFError) as err:
+ except MimeInvalid:
+ body = 'FormPost: invalid starting boundary'
+ start_response(
+ '400 Bad Request',
+ (('Content-Type', 'text/plain'),
+ ('Content-Length', str(len(body)))))
+ return [body]
+ except (FormInvalid, EOFError) as err:
body = 'FormPost: %s' % err
start_response(
'400 Bad Request',
diff --git a/swift/common/ring/ring.py b/swift/common/ring/ring.py
index daad23ff1..62e19951d 100644
--- a/swift/common/ring/ring.py
+++ b/swift/common/ring/ring.py
@@ -243,7 +243,7 @@ class Ring(object):
if dev_id not in seen_ids:
part_nodes.append(self.devs[dev_id])
seen_ids.add(dev_id)
- return part_nodes
+ return [dict(node, index=i) for i, node in enumerate(part_nodes)]
def get_part(self, account, container=None, obj=None):
"""
@@ -291,6 +291,7 @@ class Ring(object):
====== ===============================================================
id unique integer identifier amongst devices
+ index offset into the primary node list for the partition
weight a float of the relative weight of this device as compared to
others; this indicates how many partitions the builder will try
to assign to this device
diff --git a/swift/common/storage_policy.py b/swift/common/storage_policy.py
index 23e52fc56..e45ab018c 100644
--- a/swift/common/storage_policy.py
+++ b/swift/common/storage_policy.py
@@ -356,6 +356,36 @@ class ECStoragePolicy(BaseStoragePolicy):
def ec_segment_size(self):
return self._ec_segment_size
+ @property
+ def fragment_size(self):
+ """
+ Maximum length of a fragment, including header.
+
+ NB: a fragment archive is a sequence of 0 or more max-length
+ fragments followed by one possibly-shorter fragment.
+ """
+ # Technically pyeclib's get_segment_info signature calls for
+ # (data_len, segment_size) but on a ranged GET we don't know the
+ # ec-content-length header before we need to compute where in the
+ # object we should request to align with the fragment size. So we
+ # tell pyeclib a lie - from its perspective, as long as data_len >=
+ # segment_size it'll give us the answer we want. From our
+ # perspective, because we only use this answer to calculate the
+ # *minimum* size we should read from an object body even if data_len <
+ # segment_size we'll still only read *the whole one and only last
+ # fragment* and pass that into pyeclib, which will know what to do with
+ # it just as it always does when the last fragment is < fragment_size.
+ return self.pyeclib_driver.get_segment_info(
+ self.ec_segment_size, self.ec_segment_size)['fragment_size']
+
+ @property
+ def ec_scheme_description(self):
+ """
+ This shorthand form of the important parts of the EC schema is stored
+ in Object System Metadata on the EC Fragment Archives for debugging.
+ """
+ return "%s %d+%d" % (self._ec_type, self._ec_ndata, self._ec_nparity)
+
def __repr__(self):
return ("%s, EC config(ec_type=%s, ec_segment_size=%d, "
"ec_ndata=%d, ec_nparity=%d)") % (
diff --git a/swift/common/utils.py b/swift/common/utils.py
index cf7b7e7c5..19dcfd3d6 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -2236,11 +2236,16 @@ class GreenAsyncPile(object):
Correlating results with jobs (if necessary) is left to the caller.
"""
- def __init__(self, size):
+ def __init__(self, size_or_pool):
"""
- :param size: size pool of green threads to use
+ :param size_or_pool: thread pool size or a pool to use
"""
- self._pool = GreenPool(size)
+ if isinstance(size_or_pool, GreenPool):
+ self._pool = size_or_pool
+ size = self._pool.size
+ else:
+ self._pool = GreenPool(size_or_pool)
+ size = size_or_pool
self._responses = eventlet.queue.LightQueue(size)
self._inflight = 0
@@ -2646,6 +2651,10 @@ def public(func):
def quorum_size(n):
"""
+ quorum size as it applies to services that use 'replication' for data
+ integrity (Account/Container services). Object quorum_size is defined
+ on a storage policy basis.
+
Number of successful backend requests needed for the proxy to consider
the client request successful.
"""
@@ -3139,6 +3148,26 @@ _rfc_extension_pattern = re.compile(
r'(?:\s*;\s*(' + _rfc_token + r")\s*(?:=\s*(" + _rfc_token +
r'|"(?:[^"\\]|\\.)*"))?)')
+_content_range_pattern = re.compile(r'^bytes (\d+)-(\d+)/(\d+)$')
+
+
+def parse_content_range(content_range):
+ """
+ Parse a content-range header into (first_byte, last_byte, total_size).
+
+ See RFC 7233 section 4.2 for details on the header format, but it's
+ basically "Content-Range: bytes ${start}-${end}/${total}".
+
+ :param content_range: Content-Range header value to parse,
+ e.g. "bytes 100-1249/49004"
+ :returns: 3-tuple (start, end, total)
+ :raises: ValueError if malformed
+ """
+ found = re.search(_content_range_pattern, content_range)
+ if not found:
+ raise ValueError("malformed Content-Range %r" % (content_range,))
+ return tuple(int(x) for x in found.groups())
+
def parse_content_type(content_type):
"""
@@ -3293,8 +3322,11 @@ def iter_multipart_mime_documents(wsgi_input, boundary, read_chunk_size=4096):
:raises: MimeInvalid if the document is malformed
"""
boundary = '--' + boundary
- if wsgi_input.readline(len(boundary + '\r\n')).strip() != boundary:
- raise swift.common.exceptions.MimeInvalid('invalid starting boundary')
+ blen = len(boundary) + 2 # \r\n
+ got = wsgi_input.readline(blen)
+ if got.strip() != boundary:
+ raise swift.common.exceptions.MimeInvalid(
+ 'invalid starting boundary: wanted %r, got %r', (boundary, got))
boundary = '\r\n' + boundary
input_buffer = ''
done = False
diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py
index 654465ee7..c49d557fe 100644
--- a/swift/obj/diskfile.py
+++ b/swift/obj/diskfile.py
@@ -530,6 +530,11 @@ class DiskFileRouter(object):
their DiskFile implementation.
"""
def register_wrapper(diskfile_cls):
+ if policy_type in cls.policy_type_to_manager_cls:
+ raise PolicyError(
+ '%r is already registered for the policy_type %r' % (
+ cls.policy_type_to_manager_cls[policy_type],
+ policy_type))
cls.policy_type_to_manager_cls[policy_type] = diskfile_cls
return diskfile_cls
return register_wrapper
diff --git a/swift/proxy/controllers/__init__.py b/swift/proxy/controllers/__init__.py
index de4c0145b..706fd9165 100644
--- a/swift/proxy/controllers/__init__.py
+++ b/swift/proxy/controllers/__init__.py
@@ -13,7 +13,7 @@
from swift.proxy.controllers.base import Controller
from swift.proxy.controllers.info import InfoController
-from swift.proxy.controllers.obj import ObjectController
+from swift.proxy.controllers.obj import ObjectControllerRouter
from swift.proxy.controllers.account import AccountController
from swift.proxy.controllers.container import ContainerController
@@ -22,5 +22,5 @@ __all__ = [
'ContainerController',
'Controller',
'InfoController',
- 'ObjectController',
+ 'ObjectControllerRouter',
]
diff --git a/swift/proxy/controllers/account.py b/swift/proxy/controllers/account.py
index ea2f8ae33..915e1c481 100644
--- a/swift/proxy/controllers/account.py
+++ b/swift/proxy/controllers/account.py
@@ -58,9 +58,10 @@ class AccountController(Controller):
constraints.MAX_ACCOUNT_NAME_LENGTH)
return resp
- partition, nodes = self.app.account_ring.get_nodes(self.account_name)
+ partition = self.app.account_ring.get_part(self.account_name)
+ node_iter = self.app.iter_nodes(self.app.account_ring, partition)
resp = self.GETorHEAD_base(
- req, _('Account'), self.app.account_ring, partition,
+ req, _('Account'), node_iter, partition,
req.swift_entity_path.rstrip('/'))
if resp.status_int == HTTP_NOT_FOUND:
if resp.headers.get('X-Account-Status', '').lower() == 'deleted':
diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py
index 0aeb803f1..ca12d343e 100644
--- a/swift/proxy/controllers/base.py
+++ b/swift/proxy/controllers/base.py
@@ -28,6 +28,7 @@ import os
import time
import functools
import inspect
+import logging
import operator
from sys import exc_info
from swift import gettext_ as _
@@ -39,14 +40,14 @@ from eventlet.timeout import Timeout
from swift.common.wsgi import make_pre_authed_env
from swift.common.utils import Timestamp, config_true_value, \
public, split_path, list_from_csv, GreenthreadSafeIterator, \
- quorum_size, GreenAsyncPile
+ GreenAsyncPile, quorum_size, parse_content_range
from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \
ConnectionTimeout
from swift.common.http import is_informational, is_success, is_redirection, \
is_server_error, HTTP_OK, HTTP_PARTIAL_CONTENT, HTTP_MULTIPLE_CHOICES, \
HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \
- HTTP_INSUFFICIENT_STORAGE, HTTP_UNAUTHORIZED
+ HTTP_INSUFFICIENT_STORAGE, HTTP_UNAUTHORIZED, HTTP_CONTINUE
from swift.common.swob import Request, Response, HeaderKeyDict, Range, \
HTTPException, HTTPRequestedRangeNotSatisfiable
from swift.common.request_helpers import strip_sys_meta_prefix, \
@@ -593,16 +594,37 @@ def close_swift_conn(src):
pass
+def bytes_to_skip(record_size, range_start):
+ """
+ Assume an object is composed of N records, where the first N-1 are all
+ the same size and the last is at most that large, but may be smaller.
+
+ When a range request is made, it might start with a partial record. This
+ must be discarded, lest the consumer get bad data. This is particularly
+ true of suffix-byte-range requests, e.g. "Range: bytes=-12345" where the
+ size of the object is unknown at the time the request is made.
+
+ This function computes the number of bytes that must be discarded to
+ ensure only whole records are yielded. Erasure-code decoding needs this.
+
+ This function could have been inlined, but it took enough tries to get
+ right that some targeted unit tests were desirable, hence its extraction.
+ """
+ return (record_size - (range_start % record_size)) % record_size
+
+
class GetOrHeadHandler(object):
- def __init__(self, app, req, server_type, ring, partition, path,
- backend_headers):
+ def __init__(self, app, req, server_type, node_iter, partition, path,
+ backend_headers, client_chunk_size=None):
self.app = app
- self.ring = ring
+ self.node_iter = node_iter
self.server_type = server_type
self.partition = partition
self.path = path
self.backend_headers = backend_headers
+ self.client_chunk_size = client_chunk_size
+ self.skip_bytes = 0
self.used_nodes = []
self.used_source_etag = ''
@@ -649,6 +671,35 @@ class GetOrHeadHandler(object):
else:
self.backend_headers['Range'] = 'bytes=%d-' % num_bytes
+ def learn_size_from_content_range(self, start, end):
+ """
+ If client_chunk_size is set, makes sure we yield things starting on
+ chunk boundaries based on the Content-Range header in the response.
+
+ Sets our first Range header to the value learned from the
+ Content-Range header in the response; if we were given a
+ fully-specified range (e.g. "bytes=123-456"), this is a no-op.
+
+ If we were given a half-specified range (e.g. "bytes=123-" or
+ "bytes=-456"), then this changes the Range header to a
+ semantically-equivalent one *and* it lets us resume on a proper
+ boundary instead of just in the middle of a piece somewhere.
+
+ If the original request is for more than one range, this does not
+ affect our backend Range header, since we don't support resuming one
+ of those anyway.
+ """
+ if self.client_chunk_size:
+ self.skip_bytes = bytes_to_skip(self.client_chunk_size, start)
+
+ if 'Range' in self.backend_headers:
+ req_range = Range(self.backend_headers['Range'])
+
+ if len(req_range.ranges) > 1:
+ return
+
+ self.backend_headers['Range'] = "bytes=%d-%d" % (start, end)
+
def is_good_source(self, src):
"""
Indicates whether or not the request made to the backend found
@@ -674,42 +725,74 @@ class GetOrHeadHandler(object):
"""
try:
nchunks = 0
- bytes_read_from_source = 0
+ client_chunk_size = self.client_chunk_size
+ bytes_consumed_from_backend = 0
node_timeout = self.app.node_timeout
if self.server_type == 'Object':
node_timeout = self.app.recoverable_node_timeout
+ buf = ''
while True:
try:
with ChunkReadTimeout(node_timeout):
chunk = source.read(self.app.object_chunk_size)
nchunks += 1
- bytes_read_from_source += len(chunk)
+ buf += chunk
except ChunkReadTimeout:
exc_type, exc_value, exc_traceback = exc_info()
if self.newest or self.server_type != 'Object':
raise exc_type, exc_value, exc_traceback
try:
- self.fast_forward(bytes_read_from_source)
+ self.fast_forward(bytes_consumed_from_backend)
except (NotImplementedError, HTTPException, ValueError):
raise exc_type, exc_value, exc_traceback
+ buf = ''
new_source, new_node = self._get_source_and_node()
if new_source:
self.app.exception_occurred(
node, _('Object'),
- _('Trying to read during GET (retrying)'))
+ _('Trying to read during GET (retrying)'),
+ level=logging.ERROR, exc_info=(
+ exc_type, exc_value, exc_traceback))
# Close-out the connection as best as possible.
if getattr(source, 'swift_conn', None):
close_swift_conn(source)
source = new_source
node = new_node
- bytes_read_from_source = 0
continue
else:
raise exc_type, exc_value, exc_traceback
+
+ if buf and self.skip_bytes:
+ if self.skip_bytes < len(buf):
+ buf = buf[self.skip_bytes:]
+ bytes_consumed_from_backend += self.skip_bytes
+ self.skip_bytes = 0
+ else:
+ self.skip_bytes -= len(buf)
+ bytes_consumed_from_backend += len(buf)
+ buf = ''
+
if not chunk:
+ if buf:
+ with ChunkWriteTimeout(self.app.client_timeout):
+ bytes_consumed_from_backend += len(buf)
+ yield buf
+ buf = ''
break
- with ChunkWriteTimeout(self.app.client_timeout):
- yield chunk
+
+ if client_chunk_size is not None:
+ while len(buf) >= client_chunk_size:
+ client_chunk = buf[:client_chunk_size]
+ buf = buf[client_chunk_size:]
+ with ChunkWriteTimeout(self.app.client_timeout):
+ yield client_chunk
+ bytes_consumed_from_backend += len(client_chunk)
+ else:
+ with ChunkWriteTimeout(self.app.client_timeout):
+ yield buf
+ bytes_consumed_from_backend += len(buf)
+ buf = ''
+
# This is for fairness; if the network is outpacing the CPU,
# we'll always be able to read and write data without
# encountering an EWOULDBLOCK, and so eventlet will not switch
@@ -757,7 +840,7 @@ class GetOrHeadHandler(object):
node_timeout = self.app.node_timeout
if self.server_type == 'Object' and not self.newest:
node_timeout = self.app.recoverable_node_timeout
- for node in self.app.iter_nodes(self.ring, self.partition):
+ for node in self.node_iter:
if node in self.used_nodes:
continue
start_node_timing = time.time()
@@ -793,8 +876,10 @@ class GetOrHeadHandler(object):
src_headers = dict(
(k.lower(), v) for k, v in
possible_source.getheaders())
- if src_headers.get('etag', '').strip('"') != \
- self.used_source_etag:
+
+ if self.used_source_etag != src_headers.get(
+ 'x-object-sysmeta-ec-etag',
+ src_headers.get('etag', '')).strip('"'):
self.statuses.append(HTTP_NOT_FOUND)
self.reasons.append('')
self.bodies.append('')
@@ -832,7 +917,9 @@ class GetOrHeadHandler(object):
src_headers = dict(
(k.lower(), v) for k, v in
possible_source.getheaders())
- self.used_source_etag = src_headers.get('etag', '').strip('"')
+ self.used_source_etag = src_headers.get(
+ 'x-object-sysmeta-ec-etag',
+ src_headers.get('etag', '')).strip('"')
return source, node
return None, None
@@ -841,13 +928,17 @@ class GetOrHeadHandler(object):
res = None
if source:
res = Response(request=req)
+ res.status = source.status
+ update_headers(res, source.getheaders())
if req.method == 'GET' and \
source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT):
+ cr = res.headers.get('Content-Range')
+ if cr:
+ start, end, total = parse_content_range(cr)
+ self.learn_size_from_content_range(start, end)
res.app_iter = self._make_app_iter(req, node, source)
# See NOTE: swift_conn at top of file about this.
res.swift_conn = source.swift_conn
- res.status = source.status
- update_headers(res, source.getheaders())
if not res.environ:
res.environ = {}
res.environ['swift_x_timestamp'] = \
@@ -993,7 +1084,8 @@ class Controller(object):
else:
info['partition'] = part
info['nodes'] = nodes
- info.setdefault('storage_policy', '0')
+ if info.get('storage_policy') is None:
+ info['storage_policy'] = 0
return info
def _make_request(self, nodes, part, method, path, headers, query,
@@ -1098,6 +1190,13 @@ class Controller(object):
'%s %s' % (self.server_type, req.method),
overrides=overrides, headers=resp_headers)
+ def _quorum_size(self, n):
+ """
+ Number of successful backend responses needed for the proxy to
+ consider the client request successful.
+ """
+ return quorum_size(n)
+
def have_quorum(self, statuses, node_count):
"""
Given a list of statuses from several requests, determine if
@@ -1107,16 +1206,18 @@ class Controller(object):
:param node_count: number of nodes being queried (basically ring count)
:returns: True or False, depending on if quorum is established
"""
- quorum = quorum_size(node_count)
+ quorum = self._quorum_size(node_count)
if len(statuses) >= quorum:
- for hundred in (HTTP_OK, HTTP_MULTIPLE_CHOICES, HTTP_BAD_REQUEST):
+ for hundred in (HTTP_CONTINUE, HTTP_OK, HTTP_MULTIPLE_CHOICES,
+ HTTP_BAD_REQUEST):
if sum(1 for s in statuses
if hundred <= s < hundred + 100) >= quorum:
return True
return False
def best_response(self, req, statuses, reasons, bodies, server_type,
- etag=None, headers=None, overrides=None):
+ etag=None, headers=None, overrides=None,
+ quorum_size=None):
"""
Given a list of responses from several servers, choose the best to
return to the API.
@@ -1128,10 +1229,16 @@ class Controller(object):
:param server_type: type of server the responses came from
:param etag: etag
:param headers: headers of each response
+ :param overrides: overrides to apply when lacking quorum
+ :param quorum_size: quorum size to use
:returns: swob.Response object with the correct status, body, etc. set
"""
+ if quorum_size is None:
+ quorum_size = self._quorum_size(len(statuses))
+
resp = self._compute_quorum_response(
- req, statuses, reasons, bodies, etag, headers)
+ req, statuses, reasons, bodies, etag, headers,
+ quorum_size=quorum_size)
if overrides and not resp:
faked_up_status_indices = set()
transformed = []
@@ -1145,7 +1252,8 @@ class Controller(object):
statuses, reasons, headers, bodies = zip(*transformed)
resp = self._compute_quorum_response(
req, statuses, reasons, bodies, etag, headers,
- indices_to_avoid=faked_up_status_indices)
+ indices_to_avoid=faked_up_status_indices,
+ quorum_size=quorum_size)
if not resp:
resp = Response(request=req)
@@ -1156,14 +1264,14 @@ class Controller(object):
return resp
def _compute_quorum_response(self, req, statuses, reasons, bodies, etag,
- headers, indices_to_avoid=()):
+ headers, quorum_size, indices_to_avoid=()):
if not statuses:
return None
for hundred in (HTTP_OK, HTTP_MULTIPLE_CHOICES, HTTP_BAD_REQUEST):
hstatuses = \
[(i, s) for i, s in enumerate(statuses)
if hundred <= s < hundred + 100]
- if len(hstatuses) >= quorum_size(len(statuses)):
+ if len(hstatuses) >= quorum_size:
resp = Response(request=req)
try:
status_index, status = max(
@@ -1228,22 +1336,25 @@ class Controller(object):
else:
self.app.logger.warning('Could not autocreate account %r' % path)
- def GETorHEAD_base(self, req, server_type, ring, partition, path):
+ def GETorHEAD_base(self, req, server_type, node_iter, partition, path,
+ client_chunk_size=None):
"""
Base handler for HTTP GET or HEAD requests.
:param req: swob.Request object
:param server_type: server type used in logging
- :param ring: the ring to obtain nodes from
+ :param node_iter: an iterator to obtain nodes from
:param partition: partition
:param path: path for the request
+ :param client_chunk_size: chunk size for response body iterator
:returns: swob.Response object
"""
backend_headers = self.generate_request_headers(
req, additional=req.headers)
- handler = GetOrHeadHandler(self.app, req, self.server_type, ring,
- partition, path, backend_headers)
+ handler = GetOrHeadHandler(self.app, req, self.server_type, node_iter,
+ partition, path, backend_headers,
+ client_chunk_size=client_chunk_size)
res = handler.get_working_response(req)
if not res:
diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py
index fb422e68d..3e4a2bb03 100644
--- a/swift/proxy/controllers/container.py
+++ b/swift/proxy/controllers/container.py
@@ -93,8 +93,9 @@ class ContainerController(Controller):
return HTTPNotFound(request=req)
part = self.app.container_ring.get_part(
self.account_name, self.container_name)
+ node_iter = self.app.iter_nodes(self.app.container_ring, part)
resp = self.GETorHEAD_base(
- req, _('Container'), self.app.container_ring, part,
+ req, _('Container'), node_iter, part,
req.swift_entity_path)
if 'swift.authorize' in req.environ:
req.acl = resp.headers.get('x-container-read')
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index 5407c0e73..40e15e48e 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -24,13 +24,17 @@
# These shenanigans are to ensure all related objects can be garbage
# collected. We've seen objects hang around forever otherwise.
+import collections
import itertools
import mimetypes
import time
import math
+import random
+from hashlib import md5
from swift import gettext_ as _
from urllib import unquote, quote
+from greenlet import GreenletExit
from eventlet import GreenPile
from eventlet.queue import Queue
from eventlet.timeout import Timeout
@@ -38,7 +42,8 @@ from eventlet.timeout import Timeout
from swift.common.utils import (
clean_content_type, config_true_value, ContextPool, csv_append,
GreenAsyncPile, GreenthreadSafeIterator, json, Timestamp,
- normalize_delete_at_timestamp, public, quorum_size, get_expirer_container)
+ normalize_delete_at_timestamp, public, get_expirer_container,
+ quorum_size)
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation, \
check_copy_from_header, check_destination_header, \
@@ -46,21 +51,24 @@ from swift.common.constraints import check_metadata, check_object_creation, \
from swift.common import constraints
from swift.common.exceptions import ChunkReadTimeout, \
ChunkWriteTimeout, ConnectionTimeout, ListingIterNotFound, \
- ListingIterNotAuthorized, ListingIterError
+ ListingIterNotAuthorized, ListingIterError, ResponseTimeout, \
+ InsufficientStorage, FooterNotSupported, MultiphasePUTNotSupported, \
+ PutterConnectError
from swift.common.http import (
is_success, is_client_error, is_server_error, HTTP_CONTINUE, HTTP_CREATED,
HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, HTTP_INTERNAL_SERVER_ERROR,
HTTP_SERVICE_UNAVAILABLE, HTTP_INSUFFICIENT_STORAGE,
- HTTP_PRECONDITION_FAILED, HTTP_CONFLICT)
-from swift.common.storage_policy import POLICIES
+ HTTP_PRECONDITION_FAILED, HTTP_CONFLICT, is_informational)
+from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY,
+ ECDriverError, PolicyError)
from swift.proxy.controllers.base import Controller, delay_denial, \
cors_validation
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \
- HTTPServerError, HTTPServiceUnavailable, Request, \
- HTTPClientDisconnect, HeaderKeyDict, HTTPException
+ HTTPServerError, HTTPServiceUnavailable, Request, HeaderKeyDict, \
+ HTTPClientDisconnect, HTTPUnprocessableEntity, Response, HTTPException
from swift.common.request_helpers import is_sys_or_user_meta, is_sys_meta, \
- remove_items, copy_header_subset
+ remove_items, copy_header_subset, close_if_possible
def copy_headers_into(from_r, to_r):
@@ -85,8 +93,41 @@ def check_content_type(req):
return None
-class ObjectController(Controller):
- """WSGI controller for object requests."""
+class ObjectControllerRouter(object):
+
+ policy_type_to_controller_map = {}
+
+ @classmethod
+ def register(cls, policy_type):
+ """
+ Decorator for Storage Policy implementations to register
+ their ObjectController implementations.
+
+ This also fills in a policy_type attribute on the class.
+ """
+ def register_wrapper(controller_cls):
+ if policy_type in cls.policy_type_to_controller_map:
+ raise PolicyError(
+ '%r is already registered for the policy_type %r' % (
+ cls.policy_type_to_controller_map[policy_type],
+ policy_type))
+ cls.policy_type_to_controller_map[policy_type] = controller_cls
+ controller_cls.policy_type = policy_type
+ return controller_cls
+ return register_wrapper
+
+ def __init__(self):
+ self.policy_to_controller_cls = {}
+ for policy in POLICIES:
+ self.policy_to_controller_cls[policy] = \
+ self.policy_type_to_controller_map[policy.policy_type]
+
+ def __getitem__(self, policy):
+ return self.policy_to_controller_cls[policy]
+
+
+class BaseObjectController(Controller):
+ """Base WSGI controller for object requests."""
server_type = 'Object'
def __init__(self, app, account_name, container_name, object_name,
@@ -114,8 +155,10 @@ class ObjectController(Controller):
lreq.environ['QUERY_STRING'] = \
'format=json&prefix=%s&marker=%s' % (quote(lprefix),
quote(marker))
+ container_node_iter = self.app.iter_nodes(self.app.container_ring,
+ lpartition)
lresp = self.GETorHEAD_base(
- lreq, _('Container'), self.app.container_ring, lpartition,
+ lreq, _('Container'), container_node_iter, lpartition,
lreq.swift_entity_path)
if 'swift.authorize' in env:
lreq.acl = lresp.headers.get('x-container-read')
@@ -180,6 +223,7 @@ class ObjectController(Controller):
# pass the policy index to storage nodes via req header
policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
container_info['storage_policy'])
+ policy = POLICIES.get_by_index(policy_index)
obj_ring = self.app.get_object_ring(policy_index)
req.headers['X-Backend-Storage-Policy-Index'] = policy_index
if 'swift.authorize' in req.environ:
@@ -188,9 +232,10 @@ class ObjectController(Controller):
return aresp
partition = obj_ring.get_part(
self.account_name, self.container_name, self.object_name)
- resp = self.GETorHEAD_base(
- req, _('Object'), obj_ring, partition,
- req.swift_entity_path)
+ node_iter = self.app.iter_nodes(obj_ring, partition)
+
+ resp = self._reroute(policy)._get_or_head_response(
+ req, node_iter, partition, policy)
if ';' in resp.headers.get('content-type', ''):
resp.content_type = clean_content_type(
@@ -383,7 +428,10 @@ class ObjectController(Controller):
_('Trying to get final status of PUT to %s') % req.path)
return (None, None)
- def _get_put_responses(self, req, conns, nodes):
+ def _get_put_responses(self, req, conns, nodes, **kwargs):
+ """
+ Collect replicated object responses.
+ """
statuses = []
reasons = []
bodies = []
@@ -488,6 +536,7 @@ class ObjectController(Controller):
self.object_name = src_obj_name
self.container_name = src_container_name
self.account_name = src_account_name
+
source_resp = self.GET(source_req)
# This gives middlewares a way to change the source; for example,
@@ -589,8 +638,9 @@ class ObjectController(Controller):
'X-Newest': 'True'}
hreq = Request.blank(req.path_info, headers=_headers,
environ={'REQUEST_METHOD': 'HEAD'})
+ hnode_iter = self.app.iter_nodes(obj_ring, partition)
hresp = self.GETorHEAD_base(
- hreq, _('Object'), obj_ring, partition,
+ hreq, _('Object'), hnode_iter, partition,
hreq.swift_entity_path)
is_manifest = 'X-Object-Manifest' in req.headers or \
@@ -654,7 +704,10 @@ class ObjectController(Controller):
req.headers['X-Timestamp'] = Timestamp(time.time()).internal
return None
- def _check_failure_put_connections(self, conns, req, nodes):
+ def _check_failure_put_connections(self, conns, req, nodes, min_conns):
+ """
+ Identify any failed connections and check minimum connection count.
+ """
if req.if_none_match is not None and '*' in req.if_none_match:
statuses = [conn.resp.status for conn in conns if conn.resp]
if HTTP_PRECONDITION_FAILED in statuses:
@@ -675,7 +728,6 @@ class ObjectController(Controller):
'timestamps': ', '.join(timestamps)})
raise HTTPAccepted(request=req)
- min_conns = quorum_size(len(nodes))
self._check_min_conn(req, conns, min_conns)
def _get_put_connections(self, req, nodes, partition, outgoing_headers,
@@ -709,8 +761,12 @@ class ObjectController(Controller):
raise HTTPServiceUnavailable(request=req)
def _transfer_data(self, req, data_source, conns, nodes):
- min_conns = quorum_size(len(nodes))
+ """
+ Transfer data for a replicated object.
+        This method was added in the PUT method extraction change.
+ """
+ min_conns = quorum_size(len(nodes))
bytes_transferred = 0
try:
with ContextPool(len(nodes)) as pool:
@@ -775,11 +831,11 @@ class ObjectController(Controller):
This method is responsible for establishing connection
with storage nodes and sending object to each one of those
- nodes. After sending the data, the "best" reponse will be
+ nodes. After sending the data, the "best" response will be
returned based on statuses from all connections
"""
- policy_idx = req.headers.get('X-Backend-Storage-Policy-Index')
- policy = POLICIES.get_by_index(policy_idx)
+ policy_index = req.headers.get('X-Backend-Storage-Policy-Index')
+ policy = POLICIES.get_by_index(policy_index)
if not nodes:
return HTTPNotFound()
@@ -790,11 +846,11 @@ class ObjectController(Controller):
expect = False
conns = self._get_put_connections(req, nodes, partition,
outgoing_headers, policy, expect)
-
+ min_conns = quorum_size(len(nodes))
try:
# check that a minimum number of connections were established and
# meet all the correct conditions set in the request
- self._check_failure_put_connections(conns, req, nodes)
+ self._check_failure_put_connections(conns, req, nodes, min_conns)
# transfer data
self._transfer_data(req, data_source, conns, nodes)
@@ -882,9 +938,6 @@ class ObjectController(Controller):
req, delete_at_container, delete_at_part, \
delete_at_nodes = self._config_obj_expiration(req)
- # XXX hack for PUT to EC until the proxy learns how to encode
- req.headers['X-Object-Sysmeta-Ec-Archive-Index'] = 0
-
# add special headers to be handled by storage nodes
outgoing_headers = self._backend_requests(
req, len(nodes), container_partition, container_nodes,
@@ -1014,6 +1067,21 @@ class ObjectController(Controller):
headers, overrides=status_overrides)
return resp
+ def _reroute(self, policy):
+ """
+        Ensure the request is handled by a controller instance of the
+        correct type for the given policy, instantiating one if needed.
+ """
+ if not policy:
+ raise HTTPServiceUnavailable('Unknown Storage Policy')
+ if policy.policy_type != self.policy_type:
+ controller = self.app.obj_controller_router[policy](
+ self.app, self.account_name, self.container_name,
+ self.object_name)
+ else:
+ controller = self
+ return controller
+
@public
@cors_validation
@delay_denial
@@ -1030,6 +1098,7 @@ class ObjectController(Controller):
self.account_name = dest_account
del req.headers['Destination-Account']
dest_container, dest_object = check_destination_header(req)
+
source = '/%s/%s' % (self.container_name, self.object_name)
self.container_name = dest_container
self.object_name = dest_object
@@ -1041,4 +1110,1109 @@ class ObjectController(Controller):
req.headers['Content-Length'] = 0
req.headers['X-Copy-From'] = quote(source)
del req.headers['Destination']
- return self.PUT(req)
+
+ container_info = self.container_info(
+ dest_account, dest_container, req)
+ dest_policy = POLICIES.get_by_index(container_info['storage_policy'])
+
+ return self._reroute(dest_policy).PUT(req)
+
+
+@ObjectControllerRouter.register(REPL_POLICY)
+class ReplicatedObjectController(BaseObjectController):
+
+ def _get_or_head_response(self, req, node_iter, partition, policy):
+ resp = self.GETorHEAD_base(
+ req, _('Object'), node_iter, partition,
+ req.swift_entity_path)
+ return resp
+
+
+class ECAppIter(object):
+ """
+ WSGI iterable that decodes EC fragment archives (or portions thereof)
+ into the original object (or portions thereof).
+
+ :param path: path for the request
+
+ :param policy: storage policy for this object
+
+ :param internal_app_iters: list of the WSGI iterables from object server
+ GET responses for fragment archives. For an M+K erasure code, the
+ caller must supply M such iterables.
+
+ :param range_specs: list of dictionaries describing the ranges requested
+ by the client. Each dictionary contains the start and end of the
+ client's requested byte range as well as the start and end of the EC
+ segments containing that byte range.
+
+ :param obj_length: length of the object, in bytes. Learned from the
+ headers in the GET response from the object server.
+
+ :param logger: a logger
+ """
+ def __init__(self, path, policy, internal_app_iters, range_specs,
+ obj_length, logger):
+ self.path = path
+ self.policy = policy
+ self.internal_app_iters = internal_app_iters
+ self.range_specs = range_specs
+ self.obj_length = obj_length
+ self.boundary = ''
+ self.logger = logger
+
+ def close(self):
+ for it in self.internal_app_iters:
+ close_if_possible(it)
+
+ def __iter__(self):
+ segments_iter = self.decode_segments_from_fragments()
+
+ if len(self.range_specs) == 0:
+ # plain GET; just yield up segments
+ for seg in segments_iter:
+ yield seg
+ return
+
+ if len(self.range_specs) > 1:
+ raise NotImplementedError("multi-range GETs not done yet")
+
+ for range_spec in self.range_specs:
+ client_start = range_spec['client_start']
+ client_end = range_spec['client_end']
+ segment_start = range_spec['segment_start']
+ segment_end = range_spec['segment_end']
+
+ seg_size = self.policy.ec_segment_size
+ is_suffix = client_start is None
+
+ if is_suffix:
+ # Suffix byte ranges (i.e. requests for the last N bytes of
+ # an object) are likely to end up not on a segment boundary.
+ client_range_len = client_end
+ client_start = max(self.obj_length - client_range_len, 0)
+ client_end = self.obj_length - 1
+
+ # may be mid-segment; if it is, then everything up to the
+ # first segment boundary is garbage, and is discarded before
+ # ever getting into this function.
+ unaligned_segment_start = max(self.obj_length - segment_end, 0)
+ alignment_offset = (
+ (seg_size - (unaligned_segment_start % seg_size))
+ % seg_size)
+ segment_start = unaligned_segment_start + alignment_offset
+ segment_end = self.obj_length - 1
+ else:
+ # It's entirely possible that the client asked for a range that
+ # includes some bytes we have and some we don't; for example, a
+ # range of bytes 1000-20000000 on a 1500-byte object.
+ segment_end = (min(segment_end, self.obj_length - 1)
+ if segment_end is not None
+ else self.obj_length - 1)
+ client_end = (min(client_end, self.obj_length - 1)
+ if client_end is not None
+ else self.obj_length - 1)
+
+ num_segments = int(
+ math.ceil(float(segment_end + 1 - segment_start)
+ / self.policy.ec_segment_size))
+ # We get full segments here, but the client may have requested a
+ # byte range that begins or ends in the middle of a segment.
+ # Thus, we have some amount of overrun (extra decoded bytes)
+ # that we trim off so the client gets exactly what they
+ # requested.
+ start_overrun = client_start - segment_start
+ end_overrun = segment_end - client_end
+
+ for i, next_seg in enumerate(segments_iter):
+ # We may have a start_overrun of more than one segment in
+ # the case of suffix-byte-range requests. However, we never
+ # have an end_overrun of more than one segment.
+ if start_overrun > 0:
+ seglen = len(next_seg)
+ if seglen <= start_overrun:
+ start_overrun -= seglen
+ continue
+ else:
+ next_seg = next_seg[start_overrun:]
+ start_overrun = 0
+
+ if i == (num_segments - 1) and end_overrun:
+ next_seg = next_seg[:-end_overrun]
+
+ yield next_seg
+
+ def decode_segments_from_fragments(self):
+ # Decodes the fragments from the object servers and yields one
+ # segment at a time.
+ queues = [Queue(1) for _junk in range(len(self.internal_app_iters))]
+
+ def put_fragments_in_queue(frag_iter, queue):
+ try:
+ for fragment in frag_iter:
+ if fragment[0] == ' ':
+ raise Exception('Leading whitespace on fragment.')
+ queue.put(fragment)
+ except GreenletExit:
+ # killed by contextpool
+ pass
+ except ChunkReadTimeout:
+ # unable to resume in GetOrHeadHandler
+ pass
+ except: # noqa
+ self.logger.exception("Exception fetching fragments for %r" %
+ self.path)
+ finally:
+ queue.resize(2) # ensure there's room
+ queue.put(None)
+
+ with ContextPool(len(self.internal_app_iters)) as pool:
+ for app_iter, queue in zip(
+ self.internal_app_iters, queues):
+ pool.spawn(put_fragments_in_queue, app_iter, queue)
+
+ while True:
+ fragments = []
+ for qi, queue in enumerate(queues):
+ fragment = queue.get()
+ queue.task_done()
+ fragments.append(fragment)
+
+ # If any object server connection yields out a None; we're
+ # done. Either they are all None, and we've finished
+ # successfully; or some un-recoverable failure has left us
+ # with an un-reconstructible list of fragments - so we'll
+ # break out of the iter so WSGI can tear down the broken
+ # connection.
+ if not all(fragments):
+ break
+ try:
+ segment = self.policy.pyeclib_driver.decode(fragments)
+ except ECDriverError:
+ self.logger.exception("Error decoding fragments for %r" %
+ self.path)
+ raise
+
+ yield segment
+
+ def app_iter_range(self, start, end):
+ return self
+
+ def app_iter_ranges(self, content_type, boundary, content_size):
+ self.boundary = boundary
+
+
+def client_range_to_segment_range(client_start, client_end, segment_size):
+ """
+ Takes a byterange from the client and converts it into a byterange
+ spanning the necessary segments.
+
+ Handles prefix, suffix, and fully-specified byte ranges.
+
+ Examples:
+ client_range_to_segment_range(100, 700, 512) = (0, 1023)
+ client_range_to_segment_range(100, 700, 256) = (0, 767)
+ client_range_to_segment_range(300, None, 256) = (256, None)
+
+ :param client_start: first byte of the range requested by the client
+ :param client_end: last byte of the range requested by the client
+ :param segment_size: size of an EC segment, in bytes
+
+ :returns: a 2-tuple (seg_start, seg_end) where
+
+ * seg_start is the first byte of the first segment, or None if this is
+ a suffix byte range
+
+ * seg_end is the last byte of the last segment, or None if this is a
+ prefix byte range
+ """
+ # the index of the first byte of the first segment
+ segment_start = (
+ int(client_start // segment_size)
+ * segment_size) if client_start is not None else None
+ # the index of the last byte of the last segment
+ segment_end = (
+ # bytes M-
+ None if client_end is None else
+ # bytes M-N
+ (((int(client_end // segment_size) + 1)
+ * segment_size) - 1) if client_start is not None else
+ # bytes -N: we get some extra bytes to make sure we
+ # have all we need.
+ #
+ # To see why, imagine a 100-byte segment size, a
+ # 340-byte object, and a request for the last 50
+ # bytes. Naively requesting the last 100 bytes would
+ # result in a truncated first segment and hence a
+ # truncated download. (Of course, the actual
+ # obj-server requests are for fragments, not
+ # segments, but that doesn't change the
+ # calculation.)
+ #
+ # This does mean that we fetch an extra segment if
+ # the object size is an exact multiple of the
+ # segment size. It's a little wasteful, but it's
+ # better to be a little wasteful than to get some
+ # range requests completely wrong.
+ (int(math.ceil((
+ float(client_end) / segment_size) + 1)) # nsegs
+ * segment_size))
+ return (segment_start, segment_end)
+
+
+def segment_range_to_fragment_range(segment_start, segment_end, segment_size,
+ fragment_size):
+ """
+ Takes a byterange spanning some segments and converts that into a
+ byterange spanning the corresponding fragments within their fragment
+ archives.
+
+ Handles prefix, suffix, and fully-specified byte ranges.
+
+ :param segment_start: first byte of the first segment
+ :param segment_end: last byte of the last segment
+ :param segment_size: size of an EC segment, in bytes
+ :param fragment_size: size of an EC fragment, in bytes
+
+ :returns: a 2-tuple (frag_start, frag_end) where
+
+ * frag_start is the first byte of the first fragment, or None if this
+ is a suffix byte range
+
+ * frag_end is the last byte of the last fragment, or None if this is a
+ prefix byte range
+ """
+ # Note: segment_start and (segment_end + 1) are
+ # multiples of segment_size, so we don't have to worry
+ # about integer math giving us rounding troubles.
+ #
+ # There's a whole bunch of +1 and -1 in here; that's because HTTP wants
+ # byteranges to be inclusive of the start and end, so e.g. bytes 200-300
+ # is a range containing 101 bytes. Python has half-inclusive ranges, of
+ # course, so we have to convert back and forth. We try to keep things in
+ # HTTP-style byteranges for consistency.
+
+ # the index of the first byte of the first fragment
+ fragment_start = ((
+ segment_start / segment_size * fragment_size)
+ if segment_start is not None else None)
+ # the index of the last byte of the last fragment
+ fragment_end = (
+ # range unbounded on the right
+ None if segment_end is None else
+ # range unbounded on the left; no -1 since we're
+ # asking for the last N bytes, not to have a
+ # particular byte be the last one
+ ((segment_end + 1) / segment_size
+ * fragment_size) if segment_start is None else
+ # range bounded on both sides; the -1 is because the
+ # rest of the expression computes the length of the
+ # fragment, and a range of N bytes starts at index M
+ # and ends at M + N - 1.
+ ((segment_end + 1) / segment_size * fragment_size) - 1)
+ return (fragment_start, fragment_end)
+
+
+NO_DATA_SENT = 1
+SENDING_DATA = 2
+DATA_SENT = 3
+DATA_ACKED = 4
+COMMIT_SENT = 5
+
+
+class ECPutter(object):
+ """
+    Wraps up the facts that all EC PUTs are chunked (because of the
+    MIME-boundary footer trick) and that this class handles the first
+    half of the two-phase PUT conversation.
+
+ An HTTP PUT request that supports streaming.
+
+    See send_chunk(), end_of_object_data() and send_commit_confirmation().
+ """
+ def __init__(self, conn, node, resp, path, connect_duration,
+ mime_boundary):
+        # Note: you probably want to call ECPutter.connect() instead of
+        # instantiating one of these directly.
+ self.conn = conn
+ self.node = node
+ self.resp = resp
+ self.path = path
+ self.connect_duration = connect_duration
+ # for handoff nodes node_index is None
+ self.node_index = node.get('index')
+ self.mime_boundary = mime_boundary
+ self.chunk_hasher = md5()
+
+ self.failed = False
+ self.queue = None
+ self.state = NO_DATA_SENT
+
+ def current_status(self):
+ """
+ Returns the current status of the response.
+
+ A response starts off with no current status, then may or may not have
+ a status of 100 for some time, and then ultimately has a final status
+ like 200, 404, et cetera.
+ """
+ return self.resp.status
+
+ def await_response(self, timeout, informational=False):
+ """
+ Get 100-continue response indicating the end of 1st phase of a 2-phase
+ commit or the final response, i.e. the one with status >= 200.
+
+ Might or might not actually wait for anything. If we said Expect:
+ 100-continue but got back a non-100 response, that'll be the thing
+ returned, and we won't do any network IO to get it. OTOH, if we got
+ a 100 Continue response and sent up the PUT request's body, then
+ we'll actually read the 2xx-5xx response off the network here.
+
+ :returns: HTTPResponse
+ :raises: Timeout if the response took too long
+ """
+ conn = self.conn
+ with Timeout(timeout):
+ if not conn.resp:
+ if informational:
+ self.resp = conn.getexpect()
+ else:
+ self.resp = conn.getresponse()
+ return self.resp
+
+ def spawn_sender_greenthread(self, pool, queue_depth, write_timeout,
+ exception_handler):
+ """Call before sending the first chunk of request body"""
+ self.queue = Queue(queue_depth)
+ pool.spawn(self._send_file, write_timeout, exception_handler)
+
+ def wait(self):
+ if self.queue.unfinished_tasks:
+ self.queue.join()
+
+ def _start_mime_doc_object_body(self):
+ self.queue.put("--%s\r\nX-Document: object body\r\n\r\n" %
+ (self.mime_boundary,))
+
+ def send_chunk(self, chunk):
+ if not chunk:
+ # If we're not using chunked transfer-encoding, sending a 0-byte
+ # chunk is just wasteful. If we *are* using chunked
+ # transfer-encoding, sending a 0-byte chunk terminates the
+ # request body. Neither one of these is good.
+ return
+ elif self.state == DATA_SENT:
+ raise ValueError("called send_chunk after end_of_object_data")
+
+ if self.state == NO_DATA_SENT and self.mime_boundary:
+ # We're sending the object plus other stuff in the same request
+ # body, all wrapped up in multipart MIME, so we'd better start
+ # off the MIME document before sending any object data.
+ self._start_mime_doc_object_body()
+ self.state = SENDING_DATA
+
+ self.queue.put(chunk)
+
+ def end_of_object_data(self, footer_metadata):
+ """
+ Call when there is no more data to send.
+
+ :param footer_metadata: dictionary of metadata items
+ """
+ if self.state == DATA_SENT:
+ raise ValueError("called end_of_object_data twice")
+ elif self.state == NO_DATA_SENT and self.mime_boundary:
+ self._start_mime_doc_object_body()
+
+ footer_body = json.dumps(footer_metadata)
+ footer_md5 = md5(footer_body).hexdigest()
+
+ tail_boundary = ("--%s" % (self.mime_boundary,))
+
+ message_parts = [
+ ("\r\n--%s\r\n" % self.mime_boundary),
+ "X-Document: object metadata\r\n",
+ "Content-MD5: %s\r\n" % footer_md5,
+ "\r\n",
+ footer_body, "\r\n",
+ tail_boundary, "\r\n",
+ ]
+ self.queue.put("".join(message_parts))
+
+ self.queue.put('')
+ self.state = DATA_SENT
+
+ def send_commit_confirmation(self):
+ """
+ Call when there are > quorum 2XX responses received. Send commit
+ confirmations to all object nodes to finalize the PUT.
+ """
+ if self.state == COMMIT_SENT:
+ raise ValueError("called send_commit_confirmation twice")
+
+ self.state = DATA_ACKED
+
+ if self.mime_boundary:
+ body = "put_commit_confirmation"
+ tail_boundary = ("--%s--" % (self.mime_boundary,))
+ message_parts = [
+ "X-Document: put commit\r\n",
+ "\r\n",
+ body, "\r\n",
+ tail_boundary,
+ ]
+ self.queue.put("".join(message_parts))
+
+ self.queue.put('')
+ self.state = COMMIT_SENT
+
+ def _send_file(self, write_timeout, exception_handler):
+ """
+ Method for a file PUT coro. Takes chunks from a queue and sends them
+ down a socket.
+
+ If something goes wrong, the "failed" attribute will be set to true
+ and the exception handler will be called.
+ """
+ while True:
+ chunk = self.queue.get()
+ if not self.failed:
+ to_send = "%x\r\n%s\r\n" % (len(chunk), chunk)
+ try:
+ with ChunkWriteTimeout(write_timeout):
+ self.conn.send(to_send)
+ except (Exception, ChunkWriteTimeout):
+ self.failed = True
+ exception_handler(self.conn.node, _('Object'),
+ _('Trying to write to %s') % self.path)
+ self.queue.task_done()
+
+ @classmethod
+ def connect(cls, node, part, path, headers, conn_timeout, node_timeout,
+ chunked=False):
+ """
+ Connect to a backend node and send the headers.
+
+        :returns: ECPutter instance
+
+ :raises: ConnectionTimeout if initial connection timed out
+ :raises: ResponseTimeout if header retrieval timed out
+ :raises: InsufficientStorage on 507 response from node
+ :raises: PutterConnectError on non-507 server error response from node
+ :raises: FooterNotSupported if need_metadata_footer is set but
+ backend node can't process footers
+ :raises: MultiphasePUTNotSupported if need_multiphase_support is
+ set but backend node can't handle multiphase PUT
+ """
+ mime_boundary = "%.64x" % random.randint(0, 16 ** 64)
+ headers = HeaderKeyDict(headers)
+ # We're going to be adding some unknown amount of data to the
+ # request, so we can't use an explicit content length, and thus
+ # we must use chunked encoding.
+ headers['Transfer-Encoding'] = 'chunked'
+ headers['Expect'] = '100-continue'
+ if 'Content-Length' in headers:
+ headers['X-Backend-Obj-Content-Length'] = \
+ headers.pop('Content-Length')
+
+ headers['X-Backend-Obj-Multipart-Mime-Boundary'] = mime_boundary
+
+ headers['X-Backend-Obj-Metadata-Footer'] = 'yes'
+
+ headers['X-Backend-Obj-Multiphase-Commit'] = 'yes'
+
+ start_time = time.time()
+ with ConnectionTimeout(conn_timeout):
+ conn = http_connect(node['ip'], node['port'], node['device'],
+ part, 'PUT', path, headers)
+ connect_duration = time.time() - start_time
+
+ with ResponseTimeout(node_timeout):
+ resp = conn.getexpect()
+
+ if resp.status == HTTP_INSUFFICIENT_STORAGE:
+ raise InsufficientStorage
+
+ if is_server_error(resp.status):
+ raise PutterConnectError(resp.status)
+
+ if is_informational(resp.status):
+ continue_headers = HeaderKeyDict(resp.getheaders())
+ can_send_metadata_footer = config_true_value(
+ continue_headers.get('X-Obj-Metadata-Footer', 'no'))
+ can_handle_multiphase_put = config_true_value(
+ continue_headers.get('X-Obj-Multiphase-Commit', 'no'))
+
+ if not can_send_metadata_footer:
+ raise FooterNotSupported()
+
+ if not can_handle_multiphase_put:
+ raise MultiphasePUTNotSupported()
+
+ conn.node = node
+ conn.resp = None
+ if is_success(resp.status) or resp.status == HTTP_CONFLICT:
+ conn.resp = resp
+ elif (headers.get('If-None-Match', None) is not None and
+ resp.status == HTTP_PRECONDITION_FAILED):
+ conn.resp = resp
+
+ return cls(conn, node, resp, path, connect_duration, mime_boundary)
+
+
+def chunk_transformer(policy, nstreams):
+ segment_size = policy.ec_segment_size
+
+ buf = collections.deque()
+ total_buf_len = 0
+
+ chunk = yield
+ while chunk:
+ buf.append(chunk)
+ total_buf_len += len(chunk)
+ if total_buf_len >= segment_size:
+ chunks_to_encode = []
+ # extract as many chunks as we can from the input buffer
+ while total_buf_len >= segment_size:
+ to_take = segment_size
+ pieces = []
+ while to_take > 0:
+ piece = buf.popleft()
+ if len(piece) > to_take:
+ buf.appendleft(piece[to_take:])
+ piece = piece[:to_take]
+ pieces.append(piece)
+ to_take -= len(piece)
+ total_buf_len -= len(piece)
+ chunks_to_encode.append(''.join(pieces))
+
+ frags_by_byte_order = []
+ for chunk_to_encode in chunks_to_encode:
+ frags_by_byte_order.append(
+ policy.pyeclib_driver.encode(chunk_to_encode))
+ # Sequential calls to encode() have given us a list that
+ # looks like this:
+ #
+ # [[frag_A1, frag_B1, frag_C1, ...],
+ # [frag_A2, frag_B2, frag_C2, ...], ...]
+ #
+ # What we need is a list like this:
+ #
+ # [(frag_A1 + frag_A2 + ...), # destined for node A
+ # (frag_B1 + frag_B2 + ...), # destined for node B
+ # (frag_C1 + frag_C2 + ...), # destined for node C
+ # ...]
+ obj_data = [''.join(frags)
+ for frags in zip(*frags_by_byte_order)]
+ chunk = yield obj_data
+ else:
+ # didn't have enough data to encode
+ chunk = yield None
+
+ # Now we've gotten an empty chunk, which indicates end-of-input.
+ # Take any leftover bytes and encode them.
+ last_bytes = ''.join(buf)
+ if last_bytes:
+ last_frags = policy.pyeclib_driver.encode(last_bytes)
+ yield last_frags
+ else:
+ yield [''] * nstreams
+
+
+def trailing_metadata(policy, client_obj_hasher,
+ bytes_transferred_from_client,
+ fragment_archive_index):
+ return {
+ # etag and size values are being added twice here.
+ # The container override header is used to update the container db
+ # with these values as they represent the correct etag and size for
+ # the whole object and not just the FA.
+ # The object sysmeta headers will be saved on each FA of the object.
+ 'X-Object-Sysmeta-EC-Etag': client_obj_hasher.hexdigest(),
+ 'X-Object-Sysmeta-EC-Content-Length':
+ str(bytes_transferred_from_client),
+ 'X-Backend-Container-Update-Override-Etag':
+ client_obj_hasher.hexdigest(),
+ 'X-Backend-Container-Update-Override-Size':
+ str(bytes_transferred_from_client),
+ 'X-Object-Sysmeta-Ec-Frag-Index': str(fragment_archive_index),
+ # These fields are for debuggability,
+ # AKA "what is this thing?"
+ 'X-Object-Sysmeta-EC-Scheme': policy.ec_scheme_description,
+ 'X-Object-Sysmeta-EC-Segment-Size': str(policy.ec_segment_size),
+ }
+
+
+@ObjectControllerRouter.register(EC_POLICY)
+class ECObjectController(BaseObjectController):
+
+ def _get_or_head_response(self, req, node_iter, partition, policy):
+ req.headers.setdefault("X-Backend-Etag-Is-At",
+ "X-Object-Sysmeta-Ec-Etag")
+
+ if req.method == 'HEAD':
+ # no fancy EC decoding here, just one plain old HEAD request to
+ # one object server because all fragments hold all metadata
+ # information about the object.
+ resp = self.GETorHEAD_base(
+ req, _('Object'), node_iter, partition,
+ req.swift_entity_path)
+ else: # GET request
+ orig_range = None
+ range_specs = []
+ if req.range:
+ orig_range = req.range
+ # Since segments and fragments have different sizes, we need
+ # to modify the Range header sent to the object servers to
+ # make sure we get the right fragments out of the fragment
+ # archives.
+ segment_size = policy.ec_segment_size
+ fragment_size = policy.fragment_size
+
+ range_specs = []
+ new_ranges = []
+ for client_start, client_end in req.range.ranges:
+
+ segment_start, segment_end = client_range_to_segment_range(
+ client_start, client_end, segment_size)
+
+ fragment_start, fragment_end = \
+ segment_range_to_fragment_range(
+ segment_start, segment_end,
+ segment_size, fragment_size)
+
+ new_ranges.append((fragment_start, fragment_end))
+ range_specs.append({'client_start': client_start,
+ 'client_end': client_end,
+ 'segment_start': segment_start,
+ 'segment_end': segment_end})
+
+ req.range = "bytes=" + ",".join(
+ "%s-%s" % (s if s is not None else "",
+ e if e is not None else "")
+ for s, e in new_ranges)
+
+ node_iter = GreenthreadSafeIterator(node_iter)
+ num_gets = policy.ec_ndata
+ with ContextPool(num_gets) as pool:
+ pile = GreenAsyncPile(pool)
+ for _junk in range(num_gets):
+ pile.spawn(self.GETorHEAD_base,
+ req, 'Object', node_iter, partition,
+ req.swift_entity_path,
+ client_chunk_size=policy.fragment_size)
+
+ responses = list(pile)
+ good_responses = []
+ bad_responses = []
+ for response in responses:
+ if is_success(response.status_int):
+ good_responses.append(response)
+ else:
+ bad_responses.append(response)
+
+ req.range = orig_range
+ if len(good_responses) == num_gets:
+ # If these aren't all for the same object, then error out so
+ # at least the client doesn't get garbage. We can do a lot
+ # better here with more work, but this'll work for now.
+ found_obj_etags = set(
+ resp.headers['X-Object-Sysmeta-Ec-Etag']
+ for resp in good_responses)
+ if len(found_obj_etags) > 1:
+ self.app.logger.debug(
+ "Returning 503 for %s; found too many etags (%s)",
+ req.path,
+ ", ".join(found_obj_etags))
+ return HTTPServiceUnavailable(request=req)
+
+ # we found enough pieces to decode the object, so now let's
+ # decode the object
+ resp_headers = HeaderKeyDict(good_responses[0].headers.items())
+ resp_headers.pop('Content-Range', None)
+ eccl = resp_headers.get('X-Object-Sysmeta-Ec-Content-Length')
+ obj_length = int(eccl) if eccl is not None else None
+
+ resp = Response(
+ request=req,
+ headers=resp_headers,
+ conditional_response=True,
+ app_iter=ECAppIter(
+ req.swift_entity_path,
+ policy,
+ [r.app_iter for r in good_responses],
+ range_specs,
+ obj_length,
+ logger=self.app.logger))
+ else:
+ resp = self.best_response(
+ req,
+ [r.status_int for r in bad_responses],
+ [r.status.split(' ', 1)[1] for r in bad_responses],
+ [r.body for r in bad_responses],
+ 'Object',
+ headers=[r.headers for r in bad_responses])
+
+ self._fix_response_headers(resp)
+ return resp
+
+ def _fix_response_headers(self, resp):
+ # EC fragment archives each have different bytes, hence different
+ # etags. However, they all have the original object's etag stored in
+ # sysmeta, so we copy that here so the client gets it.
+ resp.headers['Etag'] = resp.headers.get(
+ 'X-Object-Sysmeta-Ec-Etag')
+ resp.headers['Content-Length'] = resp.headers.get(
+ 'X-Object-Sysmeta-Ec-Content-Length')
+
+ return resp
+
+ def _connect_put_node(self, node_iter, part, path, headers,
+ logger_thread_locals):
+ """
+        Make a connection for an erasure-coded object.
+
+ Connects to the first working node that it finds in node_iter and sends
+ over the request headers. Returns a Putter to handle the rest of the
+ streaming, or None if no working nodes were found.
+ """
+ # the object server will get different bytes, so these
+ # values do not apply (Content-Length might, in general, but
+ # in the specific case of replication vs. EC, it doesn't).
+ headers.pop('Content-Length', None)
+ headers.pop('Etag', None)
+
+ self.app.logger.thread_locals = logger_thread_locals
+ for node in node_iter:
+ try:
+ putter = ECPutter.connect(
+ node, part, path, headers,
+ conn_timeout=self.app.conn_timeout,
+ node_timeout=self.app.node_timeout)
+ self.app.set_node_timing(node, putter.connect_duration)
+ return putter
+ except InsufficientStorage:
+ self.app.error_limit(node, _('ERROR Insufficient Storage'))
+ except PutterConnectError as e:
+ self.app.error_occurred(
+ node, _('ERROR %(status)d Expect: 100-continue '
+ 'From Object Server') % {
+ 'status': e.status})
+ except (Exception, Timeout):
+ self.app.exception_occurred(
+ node, _('Object'),
+ _('Expect: 100-continue on %s') % path)
+
+ def _determine_chunk_destinations(self, putters):
+ """
+ Given a list of putters, return a dict where the key is the putter
+ and the value is the node index to use.
+
+ This is done so that we line up handoffs using the same node index
+ (in the primary part list) as the primary that the handoff is standing
+ in for. This lets erasure-code fragment archives wind up on the
+ preferred local primary nodes when possible.
+ """
+ # Give each putter a "chunk index": the index of the
+ # transformed chunk that we'll send to it.
+ #
+ # For primary nodes, that's just its index (primary 0 gets
+ # chunk 0, primary 1 gets chunk 1, and so on). For handoffs,
+ # we assign the chunk index of a missing primary.
+ handoff_conns = []
+ chunk_index = {}
+ for p in putters:
+ if p.node_index is not None:
+ chunk_index[p] = p.node_index
+ else:
+ handoff_conns.append(p)
+
+ # Note: we may have more holes than handoffs. This is okay; it
+ # just means that we failed to connect to one or more storage
+ # nodes. Holes occur when a storage node is down, in which
+ # case the connection is not replaced, and when a storage node
+ # returns 507, in which case a handoff is used to replace it.
+ holes = [x for x in range(len(putters))
+ if x not in chunk_index.values()]
+
+ for hole, p in zip(holes, handoff_conns):
+ chunk_index[p] = hole
+ return chunk_index
+
+ def _transfer_data(self, req, policy, data_source, putters, nodes,
+ min_conns, etag_hasher):
+ """
+ Transfer data for an erasure coded object.
+
+ This method was added in the PUT method extraction change
+ """
+ bytes_transferred = 0
+ chunk_transform = chunk_transformer(policy, len(nodes))
+ chunk_transform.send(None)
+
+ def send_chunk(chunk):
+ if etag_hasher:
+ etag_hasher.update(chunk)
+ backend_chunks = chunk_transform.send(chunk)
+ if backend_chunks is None:
+ # If there's not enough bytes buffered for erasure-encoding
+ # or whatever we're doing, the transform will give us None.
+ return
+
+ for putter in list(putters):
+ backend_chunk = backend_chunks[chunk_index[putter]]
+ if not putter.failed:
+ putter.chunk_hasher.update(backend_chunk)
+ putter.send_chunk(backend_chunk)
+ else:
+ putters.remove(putter)
+ self._check_min_conn(
+ req, putters, min_conns, msg='Object PUT exceptions during'
+ ' send, %(conns)s/%(nodes)s required connections')
+
+ try:
+ with ContextPool(len(putters)) as pool:
+
+ # build our chunk index dict to place handoffs in the
+ # same part nodes index as the primaries they are covering
+ chunk_index = self._determine_chunk_destinations(putters)
+
+ for putter in putters:
+ putter.spawn_sender_greenthread(
+ pool, self.app.put_queue_depth, self.app.node_timeout,
+ self.app.exception_occurred)
+ while True:
+ with ChunkReadTimeout(self.app.client_timeout):
+ try:
+ chunk = next(data_source)
+ except StopIteration:
+ computed_etag = (etag_hasher.hexdigest()
+ if etag_hasher else None)
+ received_etag = req.headers.get(
+ 'etag', '').strip('"')
+ if (computed_etag and received_etag and
+ computed_etag != received_etag):
+ raise HTTPUnprocessableEntity(request=req)
+
+ send_chunk('') # flush out any buffered data
+
+ for putter in putters:
+ trail_md = trailing_metadata(
+ policy, etag_hasher,
+ bytes_transferred,
+ chunk_index[putter])
+ trail_md['Etag'] = \
+ putter.chunk_hasher.hexdigest()
+ putter.end_of_object_data(trail_md)
+ break
+ bytes_transferred += len(chunk)
+ if bytes_transferred > constraints.MAX_FILE_SIZE:
+ raise HTTPRequestEntityTooLarge(request=req)
+
+ send_chunk(chunk)
+
+ for putter in putters:
+ putter.wait()
+
+ # for storage policies requiring 2-phase commit (e.g.
+ # erasure coding), enforce >= 'quorum' number of
+ # 100-continue responses - this indicates successful
+ # object data and metadata commit and is a necessary
+ # condition to be met before starting 2nd PUT phase
+ final_phase = False
+ need_quorum = True
+ statuses, reasons, bodies, _junk, quorum = \
+ self._get_put_responses(
+ req, putters, len(nodes), final_phase,
+ min_conns, need_quorum=need_quorum)
+ if not quorum:
+ self.app.logger.error(
+ _('Not enough object servers ack\'ed (got %d)'),
+ statuses.count(HTTP_CONTINUE))
+ raise HTTPServiceUnavailable(request=req)
+ # quorum achieved, start 2nd phase - send commit
+ # confirmation to participating object servers
+ # so they write a .durable state file indicating
+ # a successful PUT
+ for putter in putters:
+ putter.send_commit_confirmation()
+ for putter in putters:
+ putter.wait()
+ except ChunkReadTimeout as err:
+ self.app.logger.warn(
+ _('ERROR Client read timeout (%ss)'), err.seconds)
+ self.app.logger.increment('client_timeouts')
+ raise HTTPRequestTimeout(request=req)
+ except HTTPException:
+ raise
+ except (Exception, Timeout):
+ self.app.logger.exception(
+ _('ERROR Exception causing client disconnect'))
+ raise HTTPClientDisconnect(request=req)
+ if req.content_length and bytes_transferred < req.content_length:
+ req.client_disconnect = True
+ self.app.logger.warn(
+ _('Client disconnected without sending enough data'))
+ self.app.logger.increment('client_disconnects')
+ raise HTTPClientDisconnect(request=req)
+
+ def _have_adequate_successes(self, statuses, min_responses):
+ """
+ Given a list of statuses from several requests, determine if a
+ satisfactory number of nodes have responded with 2xx statuses to
+ deem the transaction for a successful response to the client.
+
+ :param statuses: list of statuses returned so far
+ :param min_responses: minimal pass criterion for number of successes
+ :returns: True or False, depending on current number of successes
+ """
+ if sum(1 for s in statuses if is_success(s)) >= min_responses:
+ return True
+ return False
+
+ def _await_response(self, conn, final_phase):
+ return conn.await_response(
+ self.app.node_timeout, not final_phase)
+
+ def _get_conn_response(self, conn, req, final_phase, **kwargs):
+ try:
+ resp = self._await_response(conn, final_phase=final_phase,
+ **kwargs)
+ except (Exception, Timeout):
+ resp = None
+ if final_phase:
+ status_type = 'final'
+ else:
+ status_type = 'commit'
+ self.app.exception_occurred(
+ conn.node, _('Object'),
+ _('Trying to get %s status of PUT to %s') % (
+ status_type, req.path))
+ return (conn, resp)
+
+ def _get_put_responses(self, req, putters, num_nodes, final_phase,
+ min_responses, need_quorum=True):
+ """
+ Collect erasure coded object responses.
+
+ Collect object responses to a PUT request and determine if
+ satisfactory number of nodes have returned success. Return
+ statuses, quorum result if indicated by 'need_quorum' and
+ etags if this is a final phase of a multiphase PUT transaction.
+
+ :param req: the request
+ :param putters: list of putters for the request
+ :param num_nodes: number of nodes involved
+ :param final_phase: boolean indicating if this is the last phase
+ :param min_responses: minimum needed when not requiring quorum
+ :param need_quorum: boolean indicating if quorum is required
+ """
+ statuses = []
+ reasons = []
+ bodies = []
+ etags = set()
+
+ pile = GreenAsyncPile(len(putters))
+ for putter in putters:
+ if putter.failed:
+ continue
+ pile.spawn(self._get_conn_response, putter, req,
+ final_phase=final_phase)
+
+ def _handle_response(putter, response):
+ statuses.append(response.status)
+ reasons.append(response.reason)
+ if final_phase:
+ body = response.read()
+ bodies.append(body)
+ else:
+ body = ''
+ if response.status == HTTP_INSUFFICIENT_STORAGE:
+ putter.failed = True
+ self.app.error_limit(putter.node,
+ _('ERROR Insufficient Storage'))
+ elif response.status >= HTTP_INTERNAL_SERVER_ERROR:
+ putter.failed = True
+ self.app.error_occurred(
+ putter.node,
+ _('ERROR %(status)d %(body)s From Object Server '
+ 're: %(path)s') %
+ {'status': response.status,
+ 'body': body[:1024], 'path': req.path})
+ elif is_success(response.status):
+ etags.add(response.getheader('etag').strip('"'))
+
+ quorum = False
+ for (putter, response) in pile:
+ if response:
+ _handle_response(putter, response)
+ if self._have_adequate_successes(statuses, min_responses):
+ break
+ else:
+ putter.failed = True
+
+ # give any pending requests *some* chance to finish
+ finished_quickly = pile.waitall(self.app.post_quorum_timeout)
+ for (putter, response) in finished_quickly:
+ if response:
+ _handle_response(putter, response)
+
+ if need_quorum:
+ if final_phase:
+ while len(statuses) < num_nodes:
+ statuses.append(HTTP_SERVICE_UNAVAILABLE)
+ reasons.append('')
+ bodies.append('')
+ else:
+ # intermediate response phase - set return value to true only
+ # if there are enough 100-continue acknowledgements
+ if self.have_quorum(statuses, num_nodes):
+ quorum = True
+
+ return statuses, reasons, bodies, etags, quorum
+
+ def _store_object(self, req, data_source, nodes, partition,
+ outgoing_headers):
+ """
+ Store an erasure coded object.
+ """
+ policy_index = int(req.headers.get('X-Backend-Storage-Policy-Index'))
+ policy = POLICIES.get_by_index(policy_index)
+ # Since the request body sent from client -> proxy is not
+ # the same as the request body sent proxy -> object, we
+ # can't rely on the object-server to do the etag checking -
+ # so we have to do it here.
+ etag_hasher = md5()
+
+ min_conns = policy.quorum
+ putters = self._get_put_connections(
+ req, nodes, partition, outgoing_headers,
+ policy, expect=True)
+
+ try:
+ # check that a minimum number of connections were established and
+ # meet all the correct conditions set in the request
+ self._check_failure_put_connections(putters, req, nodes, min_conns)
+
+ self._transfer_data(req, policy, data_source, putters,
+ nodes, min_conns, etag_hasher)
+ final_phase = True
+ need_quorum = False
+ min_resp = 2
+ putters = [p for p in putters if not p.failed]
+ # ignore response etags, and quorum boolean
+ statuses, reasons, bodies, _etags, _quorum = \
+ self._get_put_responses(req, putters, len(nodes),
+ final_phase, min_resp,
+ need_quorum=need_quorum)
+ except HTTPException as resp:
+ return resp
+
+ etag = etag_hasher.hexdigest()
+ resp = self.best_response(req, statuses, reasons, bodies,
+ _('Object PUT'), etag=etag,
+ quorum_size=min_conns)
+ resp.last_modified = math.ceil(
+ float(Timestamp(req.headers['X-Timestamp'])))
+ return resp
diff --git a/swift/proxy/server.py b/swift/proxy/server.py
index 28d41df55..8c9e22372 100644
--- a/swift/proxy/server.py
+++ b/swift/proxy/server.py
@@ -20,6 +20,8 @@ from swift import gettext_ as _
from random import shuffle
from time import time
import itertools
+import functools
+import sys
from eventlet import Timeout
@@ -32,11 +34,12 @@ from swift.common.utils import cache_from_env, get_logger, \
affinity_key_function, affinity_locality_predicate, list_from_csv, \
register_swift_info
from swift.common.constraints import check_utf8
-from swift.proxy.controllers import AccountController, ObjectController, \
- ContainerController, InfoController
+from swift.proxy.controllers import AccountController, ContainerController, \
+ ObjectControllerRouter, InfoController
+from swift.proxy.controllers.base import get_container_info
from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \
- HTTPServerError, HTTPException, Request
+ HTTPServerError, HTTPException, Request, HTTPServiceUnavailable
# List of entry points for mandatory middlewares.
@@ -109,6 +112,7 @@ class Application(object):
# ensure rings are loaded for all configured storage policies
for policy in POLICIES:
policy.load_ring(swift_dir)
+ self.obj_controller_router = ObjectControllerRouter()
self.memcache = memcache
mimetypes.init(mimetypes.knownfiles +
[os.path.join(swift_dir, 'mime.types')])
@@ -235,29 +239,44 @@ class Application(object):
"""
return POLICIES.get_object_ring(policy_idx, self.swift_dir)
- def get_controller(self, path):
+ def get_controller(self, req):
"""
Get the controller to handle a request.
- :param path: path from request
+ :param req: the request
:returns: tuple of (controller class, path dictionary)
:raises: ValueError (thrown by split_path) if given invalid path
"""
- if path == '/info':
+ if req.path == '/info':
d = dict(version=None,
expose_info=self.expose_info,
disallowed_sections=self.disallowed_sections,
admin_key=self.admin_key)
return InfoController, d
- version, account, container, obj = split_path(path, 1, 4, True)
+ version, account, container, obj = split_path(req.path, 1, 4, True)
d = dict(version=version,
account_name=account,
container_name=container,
object_name=obj)
if obj and container and account:
- return ObjectController, d
+ info = get_container_info(req.environ, self)
+ policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
+ info['storage_policy'])
+ policy = POLICIES.get_by_index(policy_index)
+ if not policy:
+ # This indicates that a new policy has been created,
+ # with rings, deployed, released (i.e. deprecated =
+ # False), used by a client to create a container via
+ # another proxy that was restarted after the policy
+ # was released, and is now cached - all before this
+ # worker was HUPed to stop accepting new
+ # connections. There should never be an "unknown"
+ # index - but when there is - it's probably operator
+ # error and hopefully temporary.
+ raise HTTPServiceUnavailable('Unknown Storage Policy')
+ return self.obj_controller_router[policy], d
elif container and account:
return ContainerController, d
elif account and not container and not obj:
@@ -317,7 +336,7 @@ class Application(object):
request=req, body='Invalid UTF8 or contains NULL')
try:
- controller, path_parts = self.get_controller(req.path)
+ controller, path_parts = self.get_controller(req)
p = req.path_info
if isinstance(p, unicode):
p = p.encode('utf-8')
@@ -474,9 +493,9 @@ class Application(object):
def iter_nodes(self, ring, partition, node_iter=None):
"""
Yields nodes for a ring partition, skipping over error
- limited nodes and stopping at the configurable number of
- nodes. If a node yielded subsequently gets error limited, an
- extra node will be yielded to take its place.
+ limited nodes and stopping at the configurable number of nodes. If a
+ node yielded subsequently gets error limited, an extra node will be
+ yielded to take its place.
Note that if you're going to iterate over this concurrently from
multiple greenthreads, you'll want to use a
@@ -527,7 +546,8 @@ class Application(object):
if nodes_left <= 0:
return
- def exception_occurred(self, node, typ, additional_info):
+ def exception_occurred(self, node, typ, additional_info,
+ **kwargs):
"""
Handle logging of generic exceptions.
@@ -536,11 +556,18 @@ class Application(object):
:param additional_info: additional information to log
"""
self._incr_node_errors(node)
- self.logger.exception(
- _('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: '
- '%(info)s'),
- {'type': typ, 'ip': node['ip'], 'port': node['port'],
- 'device': node['device'], 'info': additional_info})
+ if 'level' in kwargs:
+ log = functools.partial(self.logger.log, kwargs.pop('level'))
+ if 'exc_info' not in kwargs:
+ kwargs['exc_info'] = sys.exc_info()
+ else:
+ log = self.logger.exception
+ log(_('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s'
+ ' re: %(info)s'), {
+ 'type': typ, 'ip': node['ip'], 'port':
+ node['port'], 'device': node['device'],
+ 'info': additional_info
+ }, **kwargs)
def modify_wsgi_pipeline(self, pipe):
"""
diff --git a/test/unit/__init__.py b/test/unit/__init__.py
index e87b99fbf..372fb58bb 100644
--- a/test/unit/__init__.py
+++ b/test/unit/__init__.py
@@ -67,11 +67,11 @@ def patch_policies(thing_or_policies=None, legacy_only=False,
elif with_ec_default:
default_policies = [
ECStoragePolicy(0, name='ec', is_default=True,
- ec_type='jerasure_rs_vand', ec_ndata=4,
- ec_nparity=2, ec_segment_size=4096),
+ ec_type='jerasure_rs_vand', ec_ndata=10,
+ ec_nparity=4, ec_segment_size=4096),
StoragePolicy(1, name='unu'),
]
- default_ring_args = [{'replicas': 6}, {}]
+ default_ring_args = [{'replicas': 14}, {}]
else:
default_policies = [
StoragePolicy(0, name='nulo', is_default=True),
@@ -223,7 +223,7 @@ class FakeRing(Ring):
return self.replicas
def _get_part_nodes(self, part):
- return list(self._devs)
+ return [dict(node, index=i) for i, node in enumerate(list(self._devs))]
def get_more_nodes(self, part):
# replicas^2 is the true cap
diff --git a/test/unit/account/test_reaper.py b/test/unit/account/test_reaper.py
index 6c1c102b8..d42bb4dbb 100644
--- a/test/unit/account/test_reaper.py
+++ b/test/unit/account/test_reaper.py
@@ -297,7 +297,8 @@ class TestReaper(unittest.TestCase):
'X-Backend-Storage-Policy-Index': policy.idx
}
ring = r.get_object_ring(policy.idx)
- expected = call(ring.devs[i], 0, 'a', 'c', 'o',
+ expected = call(dict(ring.devs[i], index=i), 0,
+ 'a', 'c', 'o',
headers=headers, conn_timeout=0.5,
response_timeout=10)
self.assertEqual(call_args, expected)
diff --git a/test/unit/common/ring/test_ring.py b/test/unit/common/ring/test_ring.py
index fff715785..b97b60eee 100644
--- a/test/unit/common/ring/test_ring.py
+++ b/test/unit/common/ring/test_ring.py
@@ -363,63 +363,74 @@ class TestRing(TestRingBase):
self.assertRaises(TypeError, self.ring.get_nodes)
part, nodes = self.ring.get_nodes('a')
self.assertEquals(part, 0)
- self.assertEquals(nodes, [self.intended_devs[0],
- self.intended_devs[3]])
+ self.assertEquals(nodes, [dict(node, index=i) for i, node in
+ enumerate([self.intended_devs[0],
+ self.intended_devs[3]])])
part, nodes = self.ring.get_nodes('a1')
self.assertEquals(part, 0)
- self.assertEquals(nodes, [self.intended_devs[0],
- self.intended_devs[3]])
+ self.assertEquals(nodes, [dict(node, index=i) for i, node in
+ enumerate([self.intended_devs[0],
+ self.intended_devs[3]])])
part, nodes = self.ring.get_nodes('a4')
self.assertEquals(part, 1)
- self.assertEquals(nodes, [self.intended_devs[1],
- self.intended_devs[4]])
+ self.assertEquals(nodes, [dict(node, index=i) for i, node in
+ enumerate([self.intended_devs[1],
+ self.intended_devs[4]])])
part, nodes = self.ring.get_nodes('aa')
self.assertEquals(part, 1)
- self.assertEquals(nodes, [self.intended_devs[1],
- self.intended_devs[4]])
+ self.assertEquals(nodes, [dict(node, index=i) for i, node in
+ enumerate([self.intended_devs[1],
+ self.intended_devs[4]])])
part, nodes = self.ring.get_nodes('a', 'c1')
self.assertEquals(part, 0)
- self.assertEquals(nodes, [self.intended_devs[0],
- self.intended_devs[3]])
+ self.assertEquals(nodes, [dict(node, index=i) for i, node in
+ enumerate([self.intended_devs[0],
+ self.intended_devs[3]])])
part, nodes = self.ring.get_nodes('a', 'c0')
self.assertEquals(part, 3)
- self.assertEquals(nodes, [self.intended_devs[1],
- self.intended_devs[4]])
+ self.assertEquals(nodes, [dict(node, index=i) for i, node in
+ enumerate([self.intended_devs[1],
+ self.intended_devs[4]])])
part, nodes = self.ring.get_nodes('a', 'c3')
self.assertEquals(part, 2)
- self.assertEquals(nodes, [self.intended_devs[0],
- self.intended_devs[3]])
+ self.assertEquals(nodes, [dict(node, index=i) for i, node in
+ enumerate([self.intended_devs[0],
+ self.intended_devs[3]])])
part, nodes = self.ring.get_nodes('a', 'c2')
- self.assertEquals(part, 2)
- self.assertEquals(nodes, [self.intended_devs[0],
- self.intended_devs[3]])
+ self.assertEquals(nodes, [dict(node, index=i) for i, node in
+ enumerate([self.intended_devs[0],
+ self.intended_devs[3]])])
part, nodes = self.ring.get_nodes('a', 'c', 'o1')
self.assertEquals(part, 1)
- self.assertEquals(nodes, [self.intended_devs[1],
- self.intended_devs[4]])
+ self.assertEquals(nodes, [dict(node, index=i) for i, node in
+ enumerate([self.intended_devs[1],
+ self.intended_devs[4]])])
part, nodes = self.ring.get_nodes('a', 'c', 'o5')
self.assertEquals(part, 0)
- self.assertEquals(nodes, [self.intended_devs[0],
- self.intended_devs[3]])
+ self.assertEquals(nodes, [dict(node, index=i) for i, node in
+ enumerate([self.intended_devs[0],
+ self.intended_devs[3]])])
part, nodes = self.ring.get_nodes('a', 'c', 'o0')
self.assertEquals(part, 0)
- self.assertEquals(nodes, [self.intended_devs[0],
- self.intended_devs[3]])
+ self.assertEquals(nodes, [dict(node, index=i) for i, node in
+ enumerate([self.intended_devs[0],
+ self.intended_devs[3]])])
part, nodes = self.ring.get_nodes('a', 'c', 'o2')
self.assertEquals(part, 2)
- self.assertEquals(nodes, [self.intended_devs[0],
- self.intended_devs[3]])
+ self.assertEquals(nodes, [dict(node, index=i) for i, node in
+ enumerate([self.intended_devs[0],
+ self.intended_devs[3]])])
def add_dev_to_ring(self, new_dev):
self.ring.devs.append(new_dev)
diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index 1489501e5..22aa3db5e 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -2190,13 +2190,14 @@ cluster_dfw1 = http://dfw1.host/v1/
self.assertFalse(utils.streq_const_time('a', 'aaaaa'))
self.assertFalse(utils.streq_const_time('ABC123', 'abc123'))
- def test_quorum_size(self):
+ def test_replication_quorum_size(self):
expected_sizes = {1: 1,
2: 2,
3: 2,
4: 3,
5: 3}
- got_sizes = dict([(n, utils.quorum_size(n)) for n in expected_sizes])
+ got_sizes = dict([(n, utils.quorum_size(n))
+ for n in expected_sizes])
self.assertEqual(expected_sizes, got_sizes)
def test_rsync_ip_ipv4_localhost(self):
@@ -4593,6 +4594,22 @@ class TestLRUCache(unittest.TestCase):
self.assertEqual(f.size(), 4)
+class TestParseContentRange(unittest.TestCase):
+ def test_good(self):
+ start, end, total = utils.parse_content_range("bytes 100-200/300")
+ self.assertEqual(start, 100)
+ self.assertEqual(end, 200)
+ self.assertEqual(total, 300)
+
+ def test_bad(self):
+ self.assertRaises(ValueError, utils.parse_content_range,
+ "100-300/500")
+ self.assertRaises(ValueError, utils.parse_content_range,
+ "bytes 100-200/aardvark")
+ self.assertRaises(ValueError, utils.parse_content_range,
+ "bytes bulbous-bouffant/4994801")
+
+
class TestParseContentDisposition(unittest.TestCase):
def test_basic_content_type(self):
@@ -4622,7 +4639,8 @@ class TestIterMultipartMimeDocuments(unittest.TestCase):
it.next()
except MimeInvalid as err:
exc = err
- self.assertEquals(str(exc), 'invalid starting boundary')
+ self.assertTrue('invalid starting boundary' in str(exc))
+ self.assertTrue('--unique' in str(exc))
def test_empty(self):
it = utils.iter_multipart_mime_documents(StringIO('--unique'),
diff --git a/test/unit/proxy/controllers/test_base.py b/test/unit/proxy/controllers/test_base.py
index 2c2094ffe..037e28b44 100644
--- a/test/unit/proxy/controllers/test_base.py
+++ b/test/unit/proxy/controllers/test_base.py
@@ -21,9 +21,11 @@ from swift.proxy.controllers.base import headers_to_container_info, \
headers_to_account_info, headers_to_object_info, get_container_info, \
get_container_memcache_key, get_account_info, get_account_memcache_key, \
get_object_env_key, get_info, get_object_info, \
- Controller, GetOrHeadHandler, _set_info_cache, _set_object_info_cache
+ Controller, GetOrHeadHandler, _set_info_cache, _set_object_info_cache, \
+ bytes_to_skip
from swift.common.swob import Request, HTTPException, HeaderKeyDict, \
RESPONSE_REASONS
+from swift.common import exceptions
from swift.common.utils import split_path
from swift.common.http import is_success
from swift.common.storage_policy import StoragePolicy
@@ -159,9 +161,11 @@ class TestFuncs(unittest.TestCase):
def test_GETorHEAD_base(self):
base = Controller(self.app)
req = Request.blank('/v1/a/c/o/with/slashes')
+ ring = FakeRing()
+ nodes = list(ring.get_part_nodes(0)) + list(ring.get_more_nodes(0))
with patch('swift.proxy.controllers.base.'
'http_connect', fake_http_connect(200)):
- resp = base.GETorHEAD_base(req, 'object', FakeRing(), 'part',
+ resp = base.GETorHEAD_base(req, 'object', iter(nodes), 'part',
'/a/c/o/with/slashes')
self.assertTrue('swift.object/a/c/o/with/slashes' in resp.environ)
self.assertEqual(
@@ -169,14 +173,14 @@ class TestFuncs(unittest.TestCase):
req = Request.blank('/v1/a/c/o')
with patch('swift.proxy.controllers.base.'
'http_connect', fake_http_connect(200)):
- resp = base.GETorHEAD_base(req, 'object', FakeRing(), 'part',
+ resp = base.GETorHEAD_base(req, 'object', iter(nodes), 'part',
'/a/c/o')
self.assertTrue('swift.object/a/c/o' in resp.environ)
self.assertEqual(resp.environ['swift.object/a/c/o']['status'], 200)
req = Request.blank('/v1/a/c')
with patch('swift.proxy.controllers.base.'
'http_connect', fake_http_connect(200)):
- resp = base.GETorHEAD_base(req, 'container', FakeRing(), 'part',
+ resp = base.GETorHEAD_base(req, 'container', iter(nodes), 'part',
'/a/c')
self.assertTrue('swift.container/a/c' in resp.environ)
self.assertEqual(resp.environ['swift.container/a/c']['status'], 200)
@@ -184,7 +188,7 @@ class TestFuncs(unittest.TestCase):
req = Request.blank('/v1/a')
with patch('swift.proxy.controllers.base.'
'http_connect', fake_http_connect(200)):
- resp = base.GETorHEAD_base(req, 'account', FakeRing(), 'part',
+ resp = base.GETorHEAD_base(req, 'account', iter(nodes), 'part',
'/a')
self.assertTrue('swift.account/a' in resp.environ)
self.assertEqual(resp.environ['swift.account/a']['status'], 200)
@@ -546,7 +550,7 @@ class TestFuncs(unittest.TestCase):
resp,
headers_to_object_info(headers.items(), 200))
- def test_have_quorum(self):
+ def test_base_have_quorum(self):
base = Controller(self.app)
# just throw a bunch of test cases at it
self.assertEqual(base.have_quorum([201, 404], 3), False)
@@ -648,3 +652,88 @@ class TestFuncs(unittest.TestCase):
self.assertEqual(v, dst_headers[k.lower()])
for k, v in bad_hdrs.iteritems():
self.assertFalse(k.lower() in dst_headers)
+
+ def test_client_chunk_size(self):
+
+ class TestSource(object):
+ def __init__(self, chunks):
+ self.chunks = list(chunks)
+
+ def read(self, _read_size):
+ if self.chunks:
+ return self.chunks.pop(0)
+ else:
+ return ''
+
+ source = TestSource((
+ 'abcd', '1234', 'abc', 'd1', '234abcd1234abcd1', '2'))
+ req = Request.blank('/v1/a/c/o')
+ node = {}
+ handler = GetOrHeadHandler(self.app, req, None, None, None, None, {},
+ client_chunk_size=8)
+
+ app_iter = handler._make_app_iter(req, node, source)
+ client_chunks = list(app_iter)
+ self.assertEqual(client_chunks, [
+ 'abcd1234', 'abcd1234', 'abcd1234', 'abcd12'])
+
+ def test_client_chunk_size_resuming(self):
+
+ class TestSource(object):
+ def __init__(self, chunks):
+ self.chunks = list(chunks)
+
+ def read(self, _read_size):
+ if self.chunks:
+ chunk = self.chunks.pop(0)
+ if chunk is None:
+ raise exceptions.ChunkReadTimeout()
+ else:
+ return chunk
+ else:
+ return ''
+
+ node = {'ip': '1.2.3.4', 'port': 6000, 'device': 'sda'}
+
+ source1 = TestSource(['abcd', '1234', 'abc', None])
+ source2 = TestSource(['efgh5678'])
+ req = Request.blank('/v1/a/c/o')
+ handler = GetOrHeadHandler(
+ self.app, req, 'Object', None, None, None, {},
+ client_chunk_size=8)
+
+ app_iter = handler._make_app_iter(req, node, source1)
+ with patch.object(handler, '_get_source_and_node',
+ lambda: (source2, node)):
+ client_chunks = list(app_iter)
+ self.assertEqual(client_chunks, ['abcd1234', 'efgh5678'])
+ self.assertEqual(handler.backend_headers['Range'], 'bytes=8-')
+
+ def test_bytes_to_skip(self):
+ # if you start at the beginning, skip nothing
+ self.assertEqual(bytes_to_skip(1024, 0), 0)
+
+ # missed the first 10 bytes, so we've got 1014 bytes of partial
+ # record
+ self.assertEqual(bytes_to_skip(1024, 10), 1014)
+
+ # skipped some whole records first
+ self.assertEqual(bytes_to_skip(1024, 4106), 1014)
+
+ # landed on a record boundary
+ self.assertEqual(bytes_to_skip(1024, 1024), 0)
+ self.assertEqual(bytes_to_skip(1024, 2048), 0)
+
+ # big numbers
+ self.assertEqual(bytes_to_skip(2 ** 20, 2 ** 32), 0)
+ self.assertEqual(bytes_to_skip(2 ** 20, 2 ** 32 + 1), 2 ** 20 - 1)
+ self.assertEqual(bytes_to_skip(2 ** 20, 2 ** 32 + 2 ** 19), 2 ** 19)
+
+ # odd numbers
+ self.assertEqual(bytes_to_skip(123, 0), 0)
+ self.assertEqual(bytes_to_skip(123, 23), 100)
+ self.assertEqual(bytes_to_skip(123, 247), 122)
+
+ # prime numbers
+ self.assertEqual(bytes_to_skip(11, 7), 4)
+ self.assertEqual(bytes_to_skip(97, 7873823), 55)
diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py
index 002582a1a..a38e753ae 100755
--- a/test/unit/proxy/controllers/test_obj.py
+++ b/test/unit/proxy/controllers/test_obj.py
@@ -14,11 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import email.parser
import itertools
import random
import time
import unittest
+from collections import defaultdict
from contextlib import contextmanager
+import json
+from hashlib import md5
import mock
from eventlet import Timeout
@@ -26,13 +30,26 @@ from eventlet import Timeout
import swift
from swift.common import utils, swob
from swift.proxy import server as proxy_server
-from swift.common.storage_policy import StoragePolicy, POLICIES
+from swift.proxy.controllers import obj
+from swift.proxy.controllers.base import get_info as _real_get_info
+from swift.common.storage_policy import POLICIES, ECDriverError
from test.unit import FakeRing, FakeMemcache, fake_http_connect, \
- debug_logger, patch_policies
+ debug_logger, patch_policies, SlowBody
from test.unit.proxy.test_server import node_error_count
+def unchunk_body(chunked_body):
+ body = ''
+ remaining = chunked_body
+ while remaining:
+ hex_length, remaining = remaining.split('\r\n', 1)
+ length = int(hex_length, 16)
+ body += remaining[:length]
+ remaining = remaining[length + 2:]
+ return body
+
+
@contextmanager
def set_http_connect(*args, **kwargs):
old_connect = swift.proxy.controllers.base.http_connect
@@ -55,31 +72,76 @@ def set_http_connect(*args, **kwargs):
class PatchedObjControllerApp(proxy_server.Application):
"""
- This patch is just a hook over handle_request to ensure that when
- get_controller is called the ObjectController class is patched to
- return a (possibly stubbed) ObjectController class.
+ This patch is just a hook over the proxy server's __call__ to ensure
+ that calls to get_info will return the stubbed value for
+ container_info if it's a container info call.
"""
- object_controller = proxy_server.ObjectController
+ container_info = {}
+ per_container_info = {}
+
+ def __call__(self, *args, **kwargs):
- def handle_request(self, req):
- with mock.patch('swift.proxy.server.ObjectController',
- new=self.object_controller):
- return super(PatchedObjControllerApp, self).handle_request(req)
+ def _fake_get_info(app, env, account, container=None, **kwargs):
+ if container:
+ if container in self.per_container_info:
+ return self.per_container_info[container]
+ return self.container_info
+ else:
+ return _real_get_info(app, env, account, container, **kwargs)
+ mock_path = 'swift.proxy.controllers.base.get_info'
+ with mock.patch(mock_path, new=_fake_get_info):
+ return super(
+ PatchedObjControllerApp, self).__call__(*args, **kwargs)
+
+
+class BaseObjectControllerMixin(object):
+ container_info = {
+ 'write_acl': None,
+ 'read_acl': None,
+ 'storage_policy': None,
+ 'sync_key': None,
+ 'versions': None,
+ }
+
+ # this needs to be set on the test case
+ controller_cls = None
-@patch_policies([StoragePolicy(0, 'zero', True,
- object_ring=FakeRing(max_more_nodes=9))])
-class TestObjControllerWriteAffinity(unittest.TestCase):
def setUp(self):
- self.app = proxy_server.Application(
+ # setup fake rings with handoffs
+ for policy in POLICIES:
+ policy.object_ring.max_more_nodes = policy.object_ring.replicas
+
+ self.logger = debug_logger('proxy-server')
+ self.logger.thread_locals = ('txn1', '127.0.0.2')
+ self.app = PatchedObjControllerApp(
None, FakeMemcache(), account_ring=FakeRing(),
- container_ring=FakeRing(), logger=debug_logger())
- self.app.request_node_count = lambda ring: 10000000
- self.app.sort_nodes = lambda l: l # stop shuffling the primary nodes
+ container_ring=FakeRing(), logger=self.logger)
+ # you can over-ride the container_info just by setting it on the app
+ self.app.container_info = dict(self.container_info)
+ # default policy and ring references
+ self.policy = POLICIES.default
+ self.obj_ring = self.policy.object_ring
+ self._ts_iter = (utils.Timestamp(t) for t in
+ itertools.count(int(time.time())))
+
+ def ts(self):
+ return self._ts_iter.next()
+
+ def replicas(self, policy=None):
+ policy = policy or POLICIES.default
+ return policy.object_ring.replicas
+
+ def quorum(self, policy=None):
+ policy = policy or POLICIES.default
+ return policy.quorum
def test_iter_nodes_local_first_noops_when_no_affinity(self):
- controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
+ # this test needs a stable node order - most don't
+ self.app.sort_nodes = lambda l: l
+ controller = self.controller_cls(
+ self.app, 'a', 'c', 'o')
self.app.write_affinity_is_local_fn = None
object_ring = self.app.get_object_ring(None)
all_nodes = object_ring.get_part_nodes(1)
@@ -93,80 +155,335 @@ class TestObjControllerWriteAffinity(unittest.TestCase):
self.assertEqual(all_nodes, local_first_nodes)
def test_iter_nodes_local_first_moves_locals_first(self):
- controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
+ controller = self.controller_cls(
+ self.app, 'a', 'c', 'o')
self.app.write_affinity_is_local_fn = (
lambda node: node['region'] == 1)
- self.app.write_affinity_node_count = lambda ring: 4
+ # we'll write to one more than replica count local nodes
+ self.app.write_affinity_node_count = lambda r: r + 1
object_ring = self.app.get_object_ring(None)
+ # make our fake ring have plenty of nodes, and not get limited
+ # artificially by the proxy max request node count
+ object_ring.max_more_nodes = 100000
+ self.app.request_node_count = lambda r: 100000
+
all_nodes = object_ring.get_part_nodes(1)
all_nodes.extend(object_ring.get_more_nodes(1))
+ # FakeRing limits its get_more_nodes iterator to a smallish number
+ # of nodes (replicas ** 2) so it can safely be converted to a list;
+ # that limit *can* be lower than max_more_nodes
+ fake_rings_real_max_more_nodes_value = object_ring.replicas ** 2
+ self.assertEqual(len(all_nodes), fake_rings_real_max_more_nodes_value)
+
+ # make sure we have enough local nodes (sanity)
+ all_local_nodes = [n for n in all_nodes if
+ self.app.write_affinity_is_local_fn(n)]
+ self.assertTrue(len(all_local_nodes) >= self.replicas() + 1)
+
+ # finally, create the local_first_nodes iter and flatten it out
local_first_nodes = list(controller.iter_nodes_local_first(
object_ring, 1))
# the local nodes move up in the ordering
- self.assertEqual([1, 1, 1, 1],
- [node['region'] for node in local_first_nodes[:4]])
+ self.assertEqual([1] * (self.replicas() + 1), [
+ node['region'] for node in local_first_nodes[
+ :self.replicas() + 1]])
# we don't skip any nodes
self.assertEqual(len(all_nodes), len(local_first_nodes))
self.assertEqual(sorted(all_nodes), sorted(local_first_nodes))
+ def test_iter_nodes_local_first_best_effort(self):
+ controller = self.controller_cls(
+ self.app, 'a', 'c', 'o')
+ self.app.write_affinity_is_local_fn = (
+ lambda node: node['region'] == 1)
+
+ object_ring = self.app.get_object_ring(None)
+ all_nodes = object_ring.get_part_nodes(1)
+ all_nodes.extend(object_ring.get_more_nodes(1))
+
+ local_first_nodes = list(controller.iter_nodes_local_first(
+ object_ring, 1))
+
+ # we won't have quite enough local nodes...
+ self.assertEqual(len(all_nodes), self.replicas() +
+ POLICIES.default.object_ring.max_more_nodes)
+ all_local_nodes = [n for n in all_nodes if
+ self.app.write_affinity_is_local_fn(n)]
+ self.assertEqual(len(all_local_nodes), self.replicas())
+ # but the local nodes we do have are at the front of the local iter
+ first_n_local_first_nodes = local_first_nodes[:len(all_local_nodes)]
+ self.assertEqual(sorted(all_local_nodes),
+ sorted(first_n_local_first_nodes))
+ # but we *still* don't *skip* any nodes
+ self.assertEqual(len(all_nodes), len(local_first_nodes))
+ self.assertEqual(sorted(all_nodes), sorted(local_first_nodes))
+
def test_connect_put_node_timeout(self):
- controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
+ controller = self.controller_cls(
+ self.app, 'a', 'c', 'o')
self.app.conn_timeout = 0.05
with set_http_connect(slow_connect=True):
nodes = [dict(ip='', port='', device='')]
res = controller._connect_put_node(nodes, '', '', {}, ('', ''))
self.assertTrue(res is None)
+ def test_DELETE_simple(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
+ codes = [204] * self.replicas()
+ with set_http_connect(*codes):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 204)
+
+ def test_DELETE_missing_one(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
+ codes = [404] + [204] * (self.replicas() - 1)
+ random.shuffle(codes)
+ with set_http_connect(*codes):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 204)
-@patch_policies([
- StoragePolicy(0, 'zero', True),
- StoragePolicy(1, 'one'),
- StoragePolicy(2, 'two'),
-])
-class TestObjController(unittest.TestCase):
- container_info = {
- 'partition': 1,
- 'nodes': [
- {'ip': '127.0.0.1', 'port': '1', 'device': 'sda'},
- {'ip': '127.0.0.1', 'port': '2', 'device': 'sda'},
- {'ip': '127.0.0.1', 'port': '3', 'device': 'sda'},
- ],
- 'write_acl': None,
- 'read_acl': None,
- 'storage_policy': None,
- 'sync_key': None,
- 'versions': None,
- }
+ def test_DELETE_not_found(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
+ codes = [404] * (self.replicas() - 1) + [204]
+ with set_http_connect(*codes):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 404)
- def setUp(self):
- # setup fake rings with handoffs
- self.obj_ring = FakeRing(max_more_nodes=3)
- for policy in POLICIES:
- policy.object_ring = self.obj_ring
+ def test_DELETE_mostly_found(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
+ mostly_204s = [204] * self.quorum()
+ codes = mostly_204s + [404] * (self.replicas() - len(mostly_204s))
+ self.assertEqual(len(codes), self.replicas())
+ with set_http_connect(*codes):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 204)
- logger = debug_logger('proxy-server')
- logger.thread_locals = ('txn1', '127.0.0.2')
- self.app = PatchedObjControllerApp(
- None, FakeMemcache(), account_ring=FakeRing(),
- container_ring=FakeRing(), logger=logger)
+ def test_DELETE_mostly_not_found(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
+ mostly_404s = [404] * self.quorum()
+ codes = mostly_404s + [204] * (self.replicas() - len(mostly_404s))
+ self.assertEqual(len(codes), self.replicas())
+ with set_http_connect(*codes):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 404)
+
+ def test_DELETE_half_not_found_statuses(self):
+ self.obj_ring.set_replicas(4)
+
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
+ with set_http_connect(404, 204, 404, 204):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 204)
+
+ def test_DELETE_half_not_found_headers_and_body(self):
+ # Transformed responses have bogus bodies and headers, so make sure we
+ # send the client headers and body from a real node's response.
+ self.obj_ring.set_replicas(4)
+
+ status_codes = (404, 404, 204, 204)
+ bodies = ('not found', 'not found', '', '')
+ headers = [{}, {}, {'Pick-Me': 'yes'}, {'Pick-Me': 'yes'}]
+
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
+ with set_http_connect(*status_codes, body_iter=bodies,
+ headers=headers):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(resp.headers.get('Pick-Me'), 'yes')
+ self.assertEquals(resp.body, '')
+
+ def test_DELETE_handoff(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
+ codes = [204] * self.replicas()
+ with set_http_connect(507, *codes):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 204)
+
+ def test_POST_non_int_delete_after(self):
+ t = str(int(time.time() + 100)) + '.1'
+ req = swob.Request.blank('/v1/a/c/o', method='POST',
+ headers={'Content-Type': 'foo/bar',
+ 'X-Delete-After': t})
+ resp = req.get_response(self.app)
+ self.assertEqual(resp.status_int, 400)
+ self.assertEqual('Non-integer X-Delete-After', resp.body)
+
+ def test_PUT_non_int_delete_after(self):
+ t = str(int(time.time() + 100)) + '.1'
+ req = swob.Request.blank('/v1/a/c/o', method='PUT', body='',
+ headers={'Content-Type': 'foo/bar',
+ 'X-Delete-After': t})
+ with set_http_connect():
+ resp = req.get_response(self.app)
+ self.assertEqual(resp.status_int, 400)
+ self.assertEqual('Non-integer X-Delete-After', resp.body)
+
+ def test_POST_negative_delete_after(self):
+ req = swob.Request.blank('/v1/a/c/o', method='POST',
+ headers={'Content-Type': 'foo/bar',
+ 'X-Delete-After': '-60'})
+ resp = req.get_response(self.app)
+ self.assertEqual(resp.status_int, 400)
+ self.assertEqual('X-Delete-After in past', resp.body)
+
+ def test_PUT_negative_delete_after(self):
+ req = swob.Request.blank('/v1/a/c/o', method='PUT', body='',
+ headers={'Content-Type': 'foo/bar',
+ 'X-Delete-After': '-60'})
+ with set_http_connect():
+ resp = req.get_response(self.app)
+ self.assertEqual(resp.status_int, 400)
+ self.assertEqual('X-Delete-After in past', resp.body)
+
+ def test_POST_delete_at_non_integer(self):
+ t = str(int(time.time() + 100)) + '.1'
+ req = swob.Request.blank('/v1/a/c/o', method='POST',
+ headers={'Content-Type': 'foo/bar',
+ 'X-Delete-At': t})
+ resp = req.get_response(self.app)
+ self.assertEqual(resp.status_int, 400)
+ self.assertEqual('Non-integer X-Delete-At', resp.body)
+
+ def test_PUT_delete_at_non_integer(self):
+ t = str(int(time.time() - 100)) + '.1'
+ req = swob.Request.blank('/v1/a/c/o', method='PUT', body='',
+ headers={'Content-Type': 'foo/bar',
+ 'X-Delete-At': t})
+ with set_http_connect():
+ resp = req.get_response(self.app)
+ self.assertEqual(resp.status_int, 400)
+ self.assertEqual('Non-integer X-Delete-At', resp.body)
+
+ def test_POST_delete_at_in_past(self):
+ t = str(int(time.time() - 100))
+ req = swob.Request.blank('/v1/a/c/o', method='POST',
+ headers={'Content-Type': 'foo/bar',
+ 'X-Delete-At': t})
+ resp = req.get_response(self.app)
+ self.assertEqual(resp.status_int, 400)
+ self.assertEqual('X-Delete-At in past', resp.body)
+
+ def test_PUT_delete_at_in_past(self):
+ t = str(int(time.time() - 100))
+ req = swob.Request.blank('/v1/a/c/o', method='PUT', body='',
+ headers={'Content-Type': 'foo/bar',
+ 'X-Delete-At': t})
+ with set_http_connect():
+ resp = req.get_response(self.app)
+ self.assertEqual(resp.status_int, 400)
+ self.assertEqual('X-Delete-At in past', resp.body)
- class FakeContainerInfoObjController(proxy_server.ObjectController):
+ def test_HEAD_simple(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='HEAD')
+ with set_http_connect(200):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 200)
- def container_info(controller, *args, **kwargs):
- patch_path = 'swift.proxy.controllers.base.get_info'
- with mock.patch(patch_path) as mock_get_info:
- mock_get_info.return_value = dict(self.container_info)
- return super(FakeContainerInfoObjController,
- controller).container_info(*args, **kwargs)
+ def test_HEAD_x_newest(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='HEAD',
+ headers={'X-Newest': 'true'})
+ with set_http_connect(200, 200, 200):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 200)
+
+ def test_HEAD_x_newest_different_timestamps(self):
+ req = swob.Request.blank('/v1/a/c/o', method='HEAD',
+ headers={'X-Newest': 'true'})
+ ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
+ timestamps = [next(ts) for i in range(3)]
+ newest_timestamp = timestamps[-1]
+ random.shuffle(timestamps)
+ backend_response_headers = [{
+ 'X-Backend-Timestamp': t.internal,
+ 'X-Timestamp': t.normal
+ } for t in timestamps]
+ with set_http_connect(200, 200, 200,
+ headers=backend_response_headers):
+ resp = req.get_response(self.app)
+ self.assertEqual(resp.status_int, 200)
+ self.assertEqual(resp.headers['x-timestamp'], newest_timestamp.normal)
- # this is taking advantage of the fact that self.app is a
- # PachedObjControllerApp, so handle_response will route into an
- # instance of our FakeContainerInfoObjController just by
- # overriding the class attribute for object_controller
- self.app.object_controller = FakeContainerInfoObjController
+ def test_HEAD_x_newest_with_two_vector_timestamps(self):
+ req = swob.Request.blank('/v1/a/c/o', method='HEAD',
+ headers={'X-Newest': 'true'})
+ ts = (utils.Timestamp(time.time(), offset=offset)
+ for offset in itertools.count())
+ timestamps = [next(ts) for i in range(3)]
+ newest_timestamp = timestamps[-1]
+ random.shuffle(timestamps)
+ backend_response_headers = [{
+ 'X-Backend-Timestamp': t.internal,
+ 'X-Timestamp': t.normal
+ } for t in timestamps]
+ with set_http_connect(200, 200, 200,
+ headers=backend_response_headers):
+ resp = req.get_response(self.app)
+ self.assertEqual(resp.status_int, 200)
+ self.assertEqual(resp.headers['x-backend-timestamp'],
+ newest_timestamp.internal)
+
+ def test_HEAD_x_newest_with_some_missing(self):
+ req = swob.Request.blank('/v1/a/c/o', method='HEAD',
+ headers={'X-Newest': 'true'})
+ ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
+ request_count = self.app.request_node_count(self.obj_ring.replicas)
+ backend_response_headers = [{
+ 'x-timestamp': next(ts).normal,
+ } for i in range(request_count)]
+ responses = [404] * (request_count - 1)
+ responses.append(200)
+ request_log = []
+
+ def capture_requests(ip, port, device, part, method, path,
+ headers=None, **kwargs):
+ req = {
+ 'ip': ip,
+ 'port': port,
+ 'device': device,
+ 'part': part,
+ 'method': method,
+ 'path': path,
+ 'headers': headers,
+ }
+ request_log.append(req)
+ with set_http_connect(*responses,
+ headers=backend_response_headers,
+ give_connect=capture_requests):
+ resp = req.get_response(self.app)
+ self.assertEqual(resp.status_int, 200)
+ for req in request_log:
+ self.assertEqual(req['method'], 'HEAD')
+ self.assertEqual(req['path'], '/a/c/o')
+
+ def test_container_sync_delete(self):
+ ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
+ test_indexes = [None] + [int(p) for p in POLICIES]
+ for policy_index in test_indexes:
+ req = swob.Request.blank(
+ '/v1/a/c/o', method='DELETE', headers={
+ 'X-Timestamp': ts.next().internal})
+ codes = [409] * self.obj_ring.replicas
+ ts_iter = itertools.repeat(ts.next().internal)
+ with set_http_connect(*codes, timestamps=ts_iter):
+ resp = req.get_response(self.app)
+ self.assertEqual(resp.status_int, 409)
+
+ def test_PUT_requires_length(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 411)
+
+# end of BaseObjectControllerMixin
+
+
+@patch_policies()
+class TestReplicatedObjController(BaseObjectControllerMixin,
+ unittest.TestCase):
+
+ controller_cls = obj.ReplicatedObjectController
def test_PUT_simple(self):
req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
@@ -279,56 +596,6 @@ class TestObjController(unittest.TestCase):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 404)
- def test_DELETE_simple(self):
- req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
- with set_http_connect(204, 204, 204):
- resp = req.get_response(self.app)
- self.assertEquals(resp.status_int, 204)
-
- def test_DELETE_missing_one(self):
- req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
- with set_http_connect(404, 204, 204):
- resp = req.get_response(self.app)
- self.assertEquals(resp.status_int, 204)
-
- def test_DELETE_half_not_found_statuses(self):
- self.obj_ring.set_replicas(4)
-
- req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
- with set_http_connect(404, 204, 404, 204):
- resp = req.get_response(self.app)
- self.assertEquals(resp.status_int, 204)
-
- def test_DELETE_half_not_found_headers_and_body(self):
- # Transformed responses have bogus bodies and headers, so make sure we
- # send the client headers and body from a real node's response.
- self.obj_ring.set_replicas(4)
-
- status_codes = (404, 404, 204, 204)
- bodies = ('not found', 'not found', '', '')
- headers = [{}, {}, {'Pick-Me': 'yes'}, {'Pick-Me': 'yes'}]
-
- req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
- with set_http_connect(*status_codes, body_iter=bodies,
- headers=headers):
- resp = req.get_response(self.app)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(resp.headers.get('Pick-Me'), 'yes')
- self.assertEquals(resp.body, '')
-
- def test_DELETE_not_found(self):
- req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
- with set_http_connect(404, 404, 204):
- resp = req.get_response(self.app)
- self.assertEquals(resp.status_int, 404)
-
- def test_DELETE_handoff(self):
- req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
- codes = [204] * self.obj_ring.replicas
- with set_http_connect(507, *codes):
- resp = req.get_response(self.app)
- self.assertEquals(resp.status_int, 204)
-
def test_POST_as_COPY_simple(self):
req = swift.common.swob.Request.blank('/v1/a/c/o', method='POST')
head_resp = [200] * self.obj_ring.replicas + \
@@ -364,45 +631,11 @@ class TestObjController(unittest.TestCase):
self.assertTrue('X-Delete-At-Partition' in given_headers)
self.assertTrue('X-Delete-At-Container' in given_headers)
- def test_POST_non_int_delete_after(self):
- t = str(int(time.time() + 100)) + '.1'
- req = swob.Request.blank('/v1/a/c/o', method='POST',
- headers={'Content-Type': 'foo/bar',
- 'X-Delete-After': t})
- resp = req.get_response(self.app)
- self.assertEqual(resp.status_int, 400)
- self.assertEqual('Non-integer X-Delete-After', resp.body)
-
- def test_POST_negative_delete_after(self):
- req = swob.Request.blank('/v1/a/c/o', method='POST',
- headers={'Content-Type': 'foo/bar',
- 'X-Delete-After': '-60'})
- resp = req.get_response(self.app)
- self.assertEqual(resp.status_int, 400)
- self.assertEqual('X-Delete-After in past', resp.body)
-
- def test_POST_delete_at_non_integer(self):
- t = str(int(time.time() + 100)) + '.1'
- req = swob.Request.blank('/v1/a/c/o', method='POST',
- headers={'Content-Type': 'foo/bar',
- 'X-Delete-At': t})
- resp = req.get_response(self.app)
- self.assertEqual(resp.status_int, 400)
- self.assertEqual('Non-integer X-Delete-At', resp.body)
-
- def test_POST_delete_at_in_past(self):
- t = str(int(time.time() - 100))
- req = swob.Request.blank('/v1/a/c/o', method='POST',
- headers={'Content-Type': 'foo/bar',
- 'X-Delete-At': t})
- resp = req.get_response(self.app)
- self.assertEqual(resp.status_int, 400)
- self.assertEqual('X-Delete-At in past', resp.body)
-
- def test_PUT_converts_delete_after_to_delete_at(self):
+ def test_PUT_delete_at(self):
+ t = str(int(time.time() + 100))
req = swob.Request.blank('/v1/a/c/o', method='PUT', body='',
headers={'Content-Type': 'foo/bar',
- 'X-Delete-After': '60'})
+ 'X-Delete-At': t})
put_headers = []
def capture_headers(ip, port, device, part, method, path, headers,
@@ -410,44 +643,20 @@ class TestObjController(unittest.TestCase):
if method == 'PUT':
put_headers.append(headers)
codes = [201] * self.obj_ring.replicas
- t = time.time()
with set_http_connect(*codes, give_connect=capture_headers):
- with mock.patch('time.time', lambda: t):
- resp = req.get_response(self.app)
+ resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 201)
- expected_delete_at = str(int(t) + 60)
for given_headers in put_headers:
- self.assertEquals(given_headers.get('X-Delete-At'),
- expected_delete_at)
+ self.assertEquals(given_headers.get('X-Delete-At'), t)
self.assertTrue('X-Delete-At-Host' in given_headers)
self.assertTrue('X-Delete-At-Device' in given_headers)
self.assertTrue('X-Delete-At-Partition' in given_headers)
self.assertTrue('X-Delete-At-Container' in given_headers)
- def test_PUT_non_int_delete_after(self):
- t = str(int(time.time() + 100)) + '.1'
- req = swob.Request.blank('/v1/a/c/o', method='PUT', body='',
- headers={'Content-Type': 'foo/bar',
- 'X-Delete-After': t})
- with set_http_connect():
- resp = req.get_response(self.app)
- self.assertEqual(resp.status_int, 400)
- self.assertEqual('Non-integer X-Delete-After', resp.body)
-
- def test_PUT_negative_delete_after(self):
- req = swob.Request.blank('/v1/a/c/o', method='PUT', body='',
- headers={'Content-Type': 'foo/bar',
- 'X-Delete-After': '-60'})
- with set_http_connect():
- resp = req.get_response(self.app)
- self.assertEqual(resp.status_int, 400)
- self.assertEqual('X-Delete-After in past', resp.body)
-
- def test_PUT_delete_at(self):
- t = str(int(time.time() + 100))
+ def test_PUT_converts_delete_after_to_delete_at(self):
req = swob.Request.blank('/v1/a/c/o', method='PUT', body='',
headers={'Content-Type': 'foo/bar',
- 'X-Delete-At': t})
+ 'X-Delete-After': '60'})
put_headers = []
def capture_headers(ip, port, device, part, method, path, headers,
@@ -455,40 +664,24 @@ class TestObjController(unittest.TestCase):
if method == 'PUT':
put_headers.append(headers)
codes = [201] * self.obj_ring.replicas
+ t = time.time()
with set_http_connect(*codes, give_connect=capture_headers):
- resp = req.get_response(self.app)
+ with mock.patch('time.time', lambda: t):
+ resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 201)
+ expected_delete_at = str(int(t) + 60)
for given_headers in put_headers:
- self.assertEquals(given_headers.get('X-Delete-At'), t)
+ self.assertEquals(given_headers.get('X-Delete-At'),
+ expected_delete_at)
self.assertTrue('X-Delete-At-Host' in given_headers)
self.assertTrue('X-Delete-At-Device' in given_headers)
self.assertTrue('X-Delete-At-Partition' in given_headers)
self.assertTrue('X-Delete-At-Container' in given_headers)
- def test_PUT_delete_at_non_integer(self):
- t = str(int(time.time() - 100)) + '.1'
- req = swob.Request.blank('/v1/a/c/o', method='PUT', body='',
- headers={'Content-Type': 'foo/bar',
- 'X-Delete-At': t})
- with set_http_connect():
- resp = req.get_response(self.app)
- self.assertEqual(resp.status_int, 400)
- self.assertEqual('Non-integer X-Delete-At', resp.body)
-
- def test_PUT_delete_at_in_past(self):
- t = str(int(time.time() - 100))
- req = swob.Request.blank('/v1/a/c/o', method='PUT', body='',
- headers={'Content-Type': 'foo/bar',
- 'X-Delete-At': t})
- with set_http_connect():
- resp = req.get_response(self.app)
- self.assertEqual(resp.status_int, 400)
- self.assertEqual('X-Delete-At in past', resp.body)
-
def test_container_sync_put_x_timestamp_not_found(self):
test_indexes = [None] + [int(p) for p in POLICIES]
for policy_index in test_indexes:
- self.container_info['storage_policy'] = policy_index
+ self.app.container_info['storage_policy'] = policy_index
put_timestamp = utils.Timestamp(time.time()).normal
req = swob.Request.blank(
'/v1/a/c/o', method='PUT', headers={
@@ -502,7 +695,7 @@ class TestObjController(unittest.TestCase):
def test_container_sync_put_x_timestamp_match(self):
test_indexes = [None] + [int(p) for p in POLICIES]
for policy_index in test_indexes:
- self.container_info['storage_policy'] = policy_index
+ self.app.container_info['storage_policy'] = policy_index
put_timestamp = utils.Timestamp(time.time()).normal
req = swob.Request.blank(
'/v1/a/c/o', method='PUT', headers={
@@ -518,7 +711,7 @@ class TestObjController(unittest.TestCase):
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
test_indexes = [None] + [int(p) for p in POLICIES]
for policy_index in test_indexes:
- self.container_info['storage_policy'] = policy_index
+ self.app.container_info['storage_policy'] = policy_index
req = swob.Request.blank(
'/v1/a/c/o', method='PUT', headers={
'Content-Length': 0,
@@ -544,19 +737,6 @@ class TestObjController(unittest.TestCase):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 201)
- def test_container_sync_delete(self):
- ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
- test_indexes = [None] + [int(p) for p in POLICIES]
- for policy_index in test_indexes:
- req = swob.Request.blank(
- '/v1/a/c/o', method='DELETE', headers={
- 'X-Timestamp': ts.next().internal})
- codes = [409] * self.obj_ring.replicas
- ts_iter = itertools.repeat(ts.next().internal)
- with set_http_connect(*codes, timestamps=ts_iter):
- resp = req.get_response(self.app)
- self.assertEqual(resp.status_int, 409)
-
def test_put_x_timestamp_conflict(self):
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
req = swob.Request.blank(
@@ -624,88 +804,6 @@ class TestObjController(unittest.TestCase):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 201)
- def test_HEAD_simple(self):
- req = swift.common.swob.Request.blank('/v1/a/c/o', method='HEAD')
- with set_http_connect(200):
- resp = req.get_response(self.app)
- self.assertEquals(resp.status_int, 200)
-
- def test_HEAD_x_newest(self):
- req = swift.common.swob.Request.blank('/v1/a/c/o', method='HEAD',
- headers={'X-Newest': 'true'})
- with set_http_connect(200, 200, 200):
- resp = req.get_response(self.app)
- self.assertEquals(resp.status_int, 200)
-
- def test_HEAD_x_newest_different_timestamps(self):
- req = swob.Request.blank('/v1/a/c/o', method='HEAD',
- headers={'X-Newest': 'true'})
- ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
- timestamps = [next(ts) for i in range(3)]
- newest_timestamp = timestamps[-1]
- random.shuffle(timestamps)
- backend_response_headers = [{
- 'X-Backend-Timestamp': t.internal,
- 'X-Timestamp': t.normal
- } for t in timestamps]
- with set_http_connect(200, 200, 200,
- headers=backend_response_headers):
- resp = req.get_response(self.app)
- self.assertEqual(resp.status_int, 200)
- self.assertEqual(resp.headers['x-timestamp'], newest_timestamp.normal)
-
- def test_HEAD_x_newest_with_two_vector_timestamps(self):
- req = swob.Request.blank('/v1/a/c/o', method='HEAD',
- headers={'X-Newest': 'true'})
- ts = (utils.Timestamp(time.time(), offset=offset)
- for offset in itertools.count())
- timestamps = [next(ts) for i in range(3)]
- newest_timestamp = timestamps[-1]
- random.shuffle(timestamps)
- backend_response_headers = [{
- 'X-Backend-Timestamp': t.internal,
- 'X-Timestamp': t.normal
- } for t in timestamps]
- with set_http_connect(200, 200, 200,
- headers=backend_response_headers):
- resp = req.get_response(self.app)
- self.assertEqual(resp.status_int, 200)
- self.assertEqual(resp.headers['x-backend-timestamp'],
- newest_timestamp.internal)
-
- def test_HEAD_x_newest_with_some_missing(self):
- req = swob.Request.blank('/v1/a/c/o', method='HEAD',
- headers={'X-Newest': 'true'})
- ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
- request_count = self.app.request_node_count(self.obj_ring.replicas)
- backend_response_headers = [{
- 'x-timestamp': next(ts).normal,
- } for i in range(request_count)]
- responses = [404] * (request_count - 1)
- responses.append(200)
- request_log = []
-
- def capture_requests(ip, port, device, part, method, path,
- headers=None, **kwargs):
- req = {
- 'ip': ip,
- 'port': port,
- 'device': device,
- 'part': part,
- 'method': method,
- 'path': path,
- 'headers': headers,
- }
- request_log.append(req)
- with set_http_connect(*responses,
- headers=backend_response_headers,
- give_connect=capture_requests):
- resp = req.get_response(self.app)
- self.assertEqual(resp.status_int, 200)
- for req in request_log:
- self.assertEqual(req['method'], 'HEAD')
- self.assertEqual(req['path'], '/a/c/o')
-
def test_PUT_log_info(self):
req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
req.headers['x-copy-from'] = 'some/where'
@@ -731,18 +829,15 @@ class TestObjController(unittest.TestCase):
self.assertEquals(req.environ.get('swift.log_info'), None)
-@patch_policies([
- StoragePolicy(0, 'zero', True),
- StoragePolicy(1, 'one'),
- StoragePolicy(2, 'two'),
-])
-class TestObjControllerLegacyCache(TestObjController):
+@patch_policies(legacy_only=True)
+class TestObjControllerLegacyCache(TestReplicatedObjController):
"""
This test pretends like memcache returned a stored value that should
resemble whatever "old" format. It catches KeyErrors you'd get if your
code was expecting some new format during a rolling upgrade.
"""
+ # in this case policy_index is missing
container_info = {
'read_acl': None,
'write_acl': None,
@@ -750,6 +845,567 @@ class TestObjControllerLegacyCache(TestObjController):
'versions': None,
}
+ def test_invalid_storage_policy_cache(self):
+ self.app.container_info['storage_policy'] = 1
+ for method in ('GET', 'HEAD', 'POST', 'PUT', 'COPY'):
+ req = swob.Request.blank('/v1/a/c/o', method=method)
+ with set_http_connect():
+ resp = req.get_response(self.app)
+ self.assertEqual(resp.status_int, 503)
+
+
+@patch_policies(with_ec_default=True)
+class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
+ container_info = {
+ 'read_acl': None,
+ 'write_acl': None,
+ 'sync_key': None,
+ 'versions': None,
+ 'storage_policy': '0',
+ }
+
+ controller_cls = obj.ECObjectController
+
+ def test_determine_chunk_destinations(self):
+ class FakePutter(object):
+ def __init__(self, index):
+ self.node_index = index
+
+ controller = self.controller_cls(
+ self.app, 'a', 'c', 'o')
+
+ # create a dummy list of putters, check no handoffs
+ putters = []
+ for index in range(0, 4):
+ putters.append(FakePutter(index))
+ got = controller._determine_chunk_destinations(putters)
+ expected = {}
+ for i, p in enumerate(putters):
+ expected[p] = i
+ self.assertEquals(got, expected)
+
+        # now let's make a handoff at the end
+ putters[3].node_index = None
+ got = controller._determine_chunk_destinations(putters)
+ self.assertEquals(got, expected)
+ putters[3].node_index = 3
+
+        # now let's make a handoff at the start
+ putters[0].node_index = None
+ got = controller._determine_chunk_destinations(putters)
+ self.assertEquals(got, expected)
+ putters[0].node_index = 0
+
+        # now let's make a handoff in the middle
+ putters[2].node_index = None
+ got = controller._determine_chunk_destinations(putters)
+ self.assertEquals(got, expected)
+ putters[2].node_index = 0
+
+        # now let's make all of them handoffs
+ for index in range(0, 4):
+ putters[index].node_index = None
+ got = controller._determine_chunk_destinations(putters)
+ self.assertEquals(got, expected)
+
+ def test_GET_simple(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o')
+ get_resp = [200] * self.policy.ec_ndata
+ with set_http_connect(*get_resp):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 200)
+
+ def test_GET_simple_x_newest(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o',
+ headers={'X-Newest': 'true'})
+ codes = [200] * self.replicas()
+ codes += [404] * self.obj_ring.max_more_nodes
+ with set_http_connect(*codes):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 200)
+
+ def test_GET_error(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o')
+ get_resp = [503] + [200] * self.policy.ec_ndata
+ with set_http_connect(*get_resp):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 200)
+
+ def test_GET_with_body(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o')
+ # turn a real body into fragments
+ segment_size = self.policy.ec_segment_size
+ real_body = ('asdf' * segment_size)[:-10]
+ # split it up into chunks
+ chunks = [real_body[x:x + segment_size]
+ for x in range(0, len(real_body), segment_size)]
+ fragment_payloads = []
+ for chunk in chunks:
+ fragments = self.policy.pyeclib_driver.encode(chunk)
+ if not fragments:
+ break
+ fragment_payloads.append(fragments)
+ # sanity
+ sanity_body = ''
+ for fragment_payload in fragment_payloads:
+ sanity_body += self.policy.pyeclib_driver.decode(
+ fragment_payload)
+ self.assertEqual(len(real_body), len(sanity_body))
+ self.assertEqual(real_body, sanity_body)
+
+ node_fragments = zip(*fragment_payloads)
+ self.assertEqual(len(node_fragments), self.replicas()) # sanity
+ responses = [(200, ''.join(node_fragments[i]), {})
+ for i in range(POLICIES.default.ec_ndata)]
+ status_codes, body_iter, headers = zip(*responses)
+ with set_http_connect(*status_codes, body_iter=body_iter,
+ headers=headers):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 200)
+ self.assertEqual(len(real_body), len(resp.body))
+ self.assertEqual(real_body, resp.body)
+
+ def test_PUT_simple(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT',
+ body='')
+ codes = [201] * self.replicas()
+ expect_headers = {
+ 'X-Obj-Metadata-Footer': 'yes',
+ 'X-Obj-Multiphase-Commit': 'yes'
+ }
+ with set_http_connect(*codes, expect_headers=expect_headers):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 201)
+
+ def test_PUT_with_explicit_commit_status(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT',
+ body='')
+ codes = [(100, 100, 201)] * self.replicas()
+ expect_headers = {
+ 'X-Obj-Metadata-Footer': 'yes',
+ 'X-Obj-Multiphase-Commit': 'yes'
+ }
+ with set_http_connect(*codes, expect_headers=expect_headers):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 201)
+
+ def test_PUT_error(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT',
+ body='')
+ codes = [503] * self.replicas()
+ expect_headers = {
+ 'X-Obj-Metadata-Footer': 'yes',
+ 'X-Obj-Multiphase-Commit': 'yes'
+ }
+ with set_http_connect(*codes, expect_headers=expect_headers):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 503)
+
+ def test_PUT_mostly_success(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT',
+ body='')
+ codes = [201] * self.quorum()
+ codes += [503] * (self.replicas() - len(codes))
+ random.shuffle(codes)
+ expect_headers = {
+ 'X-Obj-Metadata-Footer': 'yes',
+ 'X-Obj-Multiphase-Commit': 'yes'
+ }
+ with set_http_connect(*codes, expect_headers=expect_headers):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 201)
+
+ def test_PUT_error_commit(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT',
+ body='')
+ codes = [(100, 503, Exception('not used'))] * self.replicas()
+ expect_headers = {
+ 'X-Obj-Metadata-Footer': 'yes',
+ 'X-Obj-Multiphase-Commit': 'yes'
+ }
+ with set_http_connect(*codes, expect_headers=expect_headers):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 503)
+
+ def test_PUT_mostly_success_commit(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT',
+ body='')
+ codes = [201] * self.quorum()
+ codes += [(100, 503, Exception('not used'))] * (
+ self.replicas() - len(codes))
+ random.shuffle(codes)
+ expect_headers = {
+ 'X-Obj-Metadata-Footer': 'yes',
+ 'X-Obj-Multiphase-Commit': 'yes'
+ }
+ with set_http_connect(*codes, expect_headers=expect_headers):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 201)
+
+ def test_PUT_mostly_error_commit(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT',
+ body='')
+ codes = [(100, 503, Exception('not used'))] * self.quorum()
+ codes += [201] * (self.replicas() - len(codes))
+ random.shuffle(codes)
+ expect_headers = {
+ 'X-Obj-Metadata-Footer': 'yes',
+ 'X-Obj-Multiphase-Commit': 'yes'
+ }
+ with set_http_connect(*codes, expect_headers=expect_headers):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 503)
+
+ def test_PUT_commit_timeout(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT',
+ body='')
+ codes = [201] * (self.replicas() - 1)
+ codes.append((100, Timeout(), Exception('not used')))
+ expect_headers = {
+ 'X-Obj-Metadata-Footer': 'yes',
+ 'X-Obj-Multiphase-Commit': 'yes'
+ }
+ with set_http_connect(*codes, expect_headers=expect_headers):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 201)
+
+ def test_PUT_commit_exception(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT',
+ body='')
+ codes = [201] * (self.replicas() - 1)
+ codes.append((100, Exception('kaboom!'), Exception('not used')))
+ expect_headers = {
+ 'X-Obj-Metadata-Footer': 'yes',
+ 'X-Obj-Multiphase-Commit': 'yes'
+ }
+ with set_http_connect(*codes, expect_headers=expect_headers):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 201)
+
+ def test_PUT_with_body(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
+ segment_size = self.policy.ec_segment_size
+ test_body = ('asdf' * segment_size)[:-10]
+ etag = md5(test_body).hexdigest()
+ size = len(test_body)
+ req.body = test_body
+ codes = [201] * self.replicas()
+ expect_headers = {
+ 'X-Obj-Metadata-Footer': 'yes',
+ 'X-Obj-Multiphase-Commit': 'yes'
+ }
+
+ put_requests = defaultdict(lambda: {'boundary': None, 'chunks': []})
+
+ def capture_body(conn_id, chunk):
+ put_requests[conn_id]['chunks'].append(chunk)
+
+ def capture_headers(ip, port, device, part, method, path, headers,
+ **kwargs):
+ conn_id = kwargs['connection_id']
+ put_requests[conn_id]['boundary'] = headers[
+ 'X-Backend-Obj-Multipart-Mime-Boundary']
+
+ with set_http_connect(*codes, expect_headers=expect_headers,
+ give_send=capture_body,
+ give_connect=capture_headers):
+ resp = req.get_response(self.app)
+
+ self.assertEquals(resp.status_int, 201)
+ frag_archives = []
+ for connection_id, info in put_requests.items():
+ body = unchunk_body(''.join(info['chunks']))
+ self.assertTrue(info['boundary'] is not None,
+ "didn't get boundary for conn %r" % (
+ connection_id,))
+
+ # email.parser.FeedParser doesn't know how to take a multipart
+ # message and boundary together and parse it; it only knows how
+ # to take a string, parse the headers, and figure out the
+ # boundary on its own.
+ parser = email.parser.FeedParser()
+ parser.feed(
+ "Content-Type: multipart/nobodycares; boundary=%s\r\n\r\n" %
+ info['boundary'])
+ parser.feed(body)
+ message = parser.close()
+
+ self.assertTrue(message.is_multipart()) # sanity check
+ mime_parts = message.get_payload()
+ self.assertEqual(len(mime_parts), 3)
+ obj_part, footer_part, commit_part = mime_parts
+
+ # attach the body to frag_archives list
+ self.assertEqual(obj_part['X-Document'], 'object body')
+ frag_archives.append(obj_part.get_payload())
+
+ # validate some footer metadata
+ self.assertEqual(footer_part['X-Document'], 'object metadata')
+ footer_metadata = json.loads(footer_part.get_payload())
+ self.assertTrue(footer_metadata)
+ expected = {
+ 'X-Object-Sysmeta-EC-Content-Length': str(size),
+ 'X-Backend-Container-Update-Override-Size': str(size),
+ 'X-Object-Sysmeta-EC-Etag': etag,
+ 'X-Backend-Container-Update-Override-Etag': etag,
+ 'X-Object-Sysmeta-EC-Segment-Size': str(segment_size),
+ }
+ for header, value in expected.items():
+ self.assertEqual(footer_metadata[header], value)
+
+ # sanity on commit message
+ self.assertEqual(commit_part['X-Document'], 'put commit')
+
+ self.assertEqual(len(frag_archives), self.replicas())
+ fragment_size = self.policy.fragment_size
+ node_payloads = []
+ for fa in frag_archives:
+ payload = [fa[x:x + fragment_size]
+ for x in range(0, len(fa), fragment_size)]
+ node_payloads.append(payload)
+ fragment_payloads = zip(*node_payloads)
+
+ expected_body = ''
+ for fragment_payload in fragment_payloads:
+ self.assertEqual(len(fragment_payload), self.replicas())
+ if True:
+ fragment_payload = list(fragment_payload)
+ expected_body += self.policy.pyeclib_driver.decode(
+ fragment_payload)
+
+ self.assertEqual(len(test_body), len(expected_body))
+ self.assertEqual(test_body, expected_body)
+
+ def test_PUT_old_obj_server(self):
+ req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT',
+ body='')
+ responses = [
+            # one server will respond with 100-continue but not include the
+            # needed expect headers and the connection will be dropped
+ ((100, Exception('not used')), {}),
+ ] + [
+            # and plenty of successful responses too
+ (201, {
+ 'X-Obj-Metadata-Footer': 'yes',
+ 'X-Obj-Multiphase-Commit': 'yes',
+ }),
+ ] * self.replicas()
+ random.shuffle(responses)
+ if responses[-1][0] != 201:
+ # whoops, stupid random
+ responses = responses[1:] + [responses[0]]
+ codes, expect_headers = zip(*responses)
+ with set_http_connect(*codes, expect_headers=expect_headers):
+ resp = req.get_response(self.app)
+ self.assertEquals(resp.status_int, 201)
+
+ def test_COPY_cross_policy_type_from_replicated(self):
+ self.app.per_container_info = {
+ 'c1': self.app.container_info.copy(),
+ 'c2': self.app.container_info.copy(),
+ }
+ # make c2 use replicated storage policy 1
+ self.app.per_container_info['c2']['storage_policy'] = '1'
+
+ # a put request with copy from source c2
+ req = swift.common.swob.Request.blank('/v1/a/c1/o', method='PUT',
+ body='', headers={
+ 'X-Copy-From': 'c2/o'})
+
+ # c2 get
+ codes = [200] * self.replicas(POLICIES[1])
+ codes += [404] * POLICIES[1].object_ring.max_more_nodes
+ # c1 put
+ codes += [201] * self.replicas()
+ expect_headers = {
+ 'X-Obj-Metadata-Footer': 'yes',
+ 'X-Obj-Multiphase-Commit': 'yes'
+ }
+ with set_http_connect(*codes, expect_headers=expect_headers):
+ resp = req.get_response(self.app)
+ self.assertEqual(resp.status_int, 201)
+
+ def test_COPY_cross_policy_type_to_replicated(self):
+ self.app.per_container_info = {
+ 'c1': self.app.container_info.copy(),
+ 'c2': self.app.container_info.copy(),
+ }
+ # make c1 use replicated storage policy 1
+ self.app.per_container_info['c1']['storage_policy'] = '1'
+
+ # a put request with copy from source c2
+ req = swift.common.swob.Request.blank('/v1/a/c1/o', method='PUT',
+ body='', headers={
+ 'X-Copy-From': 'c2/o'})
+
+ # c2 get
+ codes = [200] * self.replicas()
+ codes += [404] * self.obj_ring.max_more_nodes
+ headers = {
+ 'X-Object-Sysmeta-Ec-Content-Length': 0,
+ }
+ # c1 put
+ codes += [201] * self.replicas(POLICIES[1])
+ with set_http_connect(*codes, headers=headers):
+ resp = req.get_response(self.app)
+ self.assertEqual(resp.status_int, 201)
+
+ def test_COPY_cross_policy_type_unknown(self):
+ self.app.per_container_info = {
+ 'c1': self.app.container_info.copy(),
+ 'c2': self.app.container_info.copy(),
+ }
+ # make c1 use some made up storage policy index
+ self.app.per_container_info['c1']['storage_policy'] = '13'
+
+ # a COPY request of c2 with destination in c1
+ req = swift.common.swob.Request.blank('/v1/a/c2/o', method='COPY',
+ body='', headers={
+ 'Destination': 'c1/o'})
+ with set_http_connect():
+ resp = req.get_response(self.app)
+ self.assertEqual(resp.status_int, 503)
+
+ def _make_ec_archive_bodies(self, test_body, policy=None):
+ policy = policy or self.policy
+ segment_size = policy.ec_segment_size
+ # split up the body into buffers
+ chunks = [test_body[x:x + segment_size]
+ for x in range(0, len(test_body), segment_size)]
+ # encode the buffers into fragment payloads
+ fragment_payloads = []
+ for chunk in chunks:
+ fragments = self.policy.pyeclib_driver.encode(chunk)
+ if not fragments:
+ break
+ fragment_payloads.append(fragments)
+
+ # join up the fragment payloads per node
+ ec_archive_bodies = [''.join(fragments)
+ for fragments in zip(*fragment_payloads)]
+ return ec_archive_bodies
+
+ def test_GET_mismatched_fragment_archives(self):
+ segment_size = self.policy.ec_segment_size
+ test_data1 = ('test' * segment_size)[:-333]
+ # N.B. the object data *length* here is different
+ test_data2 = ('blah1' * segment_size)[:-333]
+
+ etag1 = md5(test_data1).hexdigest()
+ etag2 = md5(test_data2).hexdigest()
+
+ ec_archive_bodies1 = self._make_ec_archive_bodies(test_data1)
+ ec_archive_bodies2 = self._make_ec_archive_bodies(test_data2)
+
+ headers1 = {'X-Object-Sysmeta-Ec-Etag': etag1}
+        # here we're going to *lie* and say this etag matches too
+ headers2 = {'X-Object-Sysmeta-Ec-Etag': etag1}
+
+ responses1 = [(200, body, headers1)
+ for body in ec_archive_bodies1]
+ responses2 = [(200, body, headers2)
+ for body in ec_archive_bodies2]
+
+ req = swob.Request.blank('/v1/a/c/o')
+
+ # sanity check responses1
+ responses = responses1[:self.policy.ec_ndata]
+ status_codes, body_iter, headers = zip(*responses)
+ with set_http_connect(*status_codes, body_iter=body_iter,
+ headers=headers):
+ resp = req.get_response(self.app)
+ self.assertEqual(resp.status_int, 200)
+ self.assertEqual(md5(resp.body).hexdigest(), etag1)
+
+ # sanity check responses2
+ responses = responses2[:self.policy.ec_ndata]
+ status_codes, body_iter, headers = zip(*responses)
+ with set_http_connect(*status_codes, body_iter=body_iter,
+ headers=headers):
+ resp = req.get_response(self.app)
+ self.assertEqual(resp.status_int, 200)
+ self.assertEqual(md5(resp.body).hexdigest(), etag2)
+
+ # now mix the responses a bit
+ mix_index = random.randint(0, self.policy.ec_ndata - 1)
+ mixed_responses = responses1[:self.policy.ec_ndata]
+ mixed_responses[mix_index] = responses2[mix_index]
+
+ status_codes, body_iter, headers = zip(*mixed_responses)
+ with set_http_connect(*status_codes, body_iter=body_iter,
+ headers=headers):
+ resp = req.get_response(self.app)
+ self.assertEqual(resp.status_int, 200)
+ try:
+ resp.body
+ except ECDriverError:
+ pass
+ else:
+ self.fail('invalid ec fragment response body did not blow up!')
+ error_lines = self.logger.get_lines_for_level('error')
+ self.assertEqual(1, len(error_lines))
+ msg = error_lines[0]
+ self.assertTrue('Error decoding fragments' in msg)
+ self.assertTrue('/a/c/o' in msg)
+ log_msg_args, log_msg_kwargs = self.logger.log_dict['error'][0]
+ self.assertEqual(log_msg_kwargs['exc_info'][0], ECDriverError)
+
+ def test_GET_read_timeout(self):
+ segment_size = self.policy.ec_segment_size
+ test_data = ('test' * segment_size)[:-333]
+ etag = md5(test_data).hexdigest()
+ ec_archive_bodies = self._make_ec_archive_bodies(test_data)
+ headers = {'X-Object-Sysmeta-Ec-Etag': etag}
+ self.app.recoverable_node_timeout = 0.01
+ responses = [(200, SlowBody(body, 0.1), headers)
+ for body in ec_archive_bodies]
+
+ req = swob.Request.blank('/v1/a/c/o')
+
+ status_codes, body_iter, headers = zip(*responses + [
+ (404, '', {}) for i in range(
+ self.policy.object_ring.max_more_nodes)])
+ with set_http_connect(*status_codes, body_iter=body_iter,
+ headers=headers):
+ resp = req.get_response(self.app)
+ self.assertEqual(resp.status_int, 200)
+ # do this inside the fake http context manager, it'll try to
+ # resume but won't be able to give us all the right bytes
+ self.assertNotEqual(md5(resp.body).hexdigest(), etag)
+ error_lines = self.logger.get_lines_for_level('error')
+ self.assertEqual(self.replicas(), len(error_lines))
+ nparity = self.policy.ec_nparity
+ for line in error_lines[:nparity]:
+ self.assertTrue('retrying' in line)
+ for line in error_lines[nparity:]:
+ self.assertTrue('ChunkReadTimeout (0.01s)' in line)
+
+ def test_GET_read_timeout_resume(self):
+ segment_size = self.policy.ec_segment_size
+ test_data = ('test' * segment_size)[:-333]
+ etag = md5(test_data).hexdigest()
+ ec_archive_bodies = self._make_ec_archive_bodies(test_data)
+ headers = {'X-Object-Sysmeta-Ec-Etag': etag}
+ self.app.recoverable_node_timeout = 0.05
+ # first one is slow
+ responses = [(200, SlowBody(ec_archive_bodies[0], 0.1), headers)]
+ # ... the rest are fine
+ responses += [(200, body, headers)
+ for body in ec_archive_bodies[1:]]
+
+ req = swob.Request.blank('/v1/a/c/o')
+
+ status_codes, body_iter, headers = zip(
+ *responses[:self.policy.ec_ndata + 1])
+ with set_http_connect(*status_codes, body_iter=body_iter,
+ headers=headers):
+ resp = req.get_response(self.app)
+ self.assertEqual(resp.status_int, 200)
+ self.assertTrue(md5(resp.body).hexdigest(), etag)
+ error_lines = self.logger.get_lines_for_level('error')
+ self.assertEqual(1, len(error_lines))
+ self.assertTrue('retrying' in error_lines[0])
+
if __name__ == '__main__':
unittest.main()
diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py
index 5bee370fc..969c54d94 100644
--- a/test/unit/proxy/test_server.py
+++ b/test/unit/proxy/test_server.py
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,10 +15,13 @@
# limitations under the License.
import logging
+import math
import os
+import pickle
import sys
import unittest
-from contextlib import contextmanager, nested
+from contextlib import closing, contextmanager, nested
+from gzip import GzipFile
from shutil import rmtree
from StringIO import StringIO
import gc
@@ -25,6 +29,7 @@ import time
from textwrap import dedent
from urllib import quote
from hashlib import md5
+from pyeclib.ec_iface import ECDriverError
from tempfile import mkdtemp
import weakref
import operator
@@ -35,13 +40,14 @@ import random
import mock
from eventlet import sleep, spawn, wsgi, listen, Timeout
-from swift.common.utils import json
+from swift.common.utils import hash_path, json, storage_directory, public
from test.unit import (
connect_tcp, readuntil2crlfs, FakeLogger, fake_http_connect, FakeRing,
FakeMemcache, debug_logger, patch_policies, write_fake_ring,
mocked_http_conn)
from swift.proxy import server as proxy_server
+from swift.proxy.controllers.obj import ReplicatedObjectController
from swift.account import server as account_server
from swift.container import server as container_server
from swift.obj import server as object_server
@@ -49,16 +55,18 @@ from swift.common.middleware import proxy_logging
from swift.common.middleware.acl import parse_acl, format_acl
from swift.common.exceptions import ChunkReadTimeout, DiskFileNotExist
from swift.common import utils, constraints
+from swift.common.ring import RingData
from swift.common.utils import mkdirs, normalize_timestamp, NullLogger
from swift.common.wsgi import monkey_patch_mimetools, loadapp
from swift.proxy.controllers import base as proxy_base
from swift.proxy.controllers.base import get_container_memcache_key, \
get_account_memcache_key, cors_validation
import swift.proxy.controllers
+import swift.proxy.controllers.obj
from swift.common.swob import Request, Response, HTTPUnauthorized, \
- HTTPException
+ HTTPException, HeaderKeyDict
from swift.common import storage_policy
-from swift.common.storage_policy import StoragePolicy, \
+from swift.common.storage_policy import StoragePolicy, ECStoragePolicy, \
StoragePolicyCollection, POLICIES
from swift.common.request_helpers import get_sys_meta_prefix
@@ -101,8 +109,10 @@ def do_setup(the_object_server):
con2lis = listen(('localhost', 0))
obj1lis = listen(('localhost', 0))
obj2lis = listen(('localhost', 0))
+ obj3lis = listen(('localhost', 0))
+ objsocks = [obj1lis, obj2lis, obj3lis]
_test_sockets = \
- (prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis, obj2lis)
+ (prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis, obj2lis, obj3lis)
account_ring_path = os.path.join(_testdir, 'account.ring.gz')
account_devs = [
{'port': acc1lis.getsockname()[1]},
@@ -118,27 +128,45 @@ def do_setup(the_object_server):
storage_policy._POLICIES = StoragePolicyCollection([
StoragePolicy(0, 'zero', True),
StoragePolicy(1, 'one', False),
- StoragePolicy(2, 'two', False)])
+ StoragePolicy(2, 'two', False),
+ ECStoragePolicy(3, 'ec', ec_type='jerasure_rs_vand',
+ ec_ndata=2, ec_nparity=1, ec_segment_size=4096)])
obj_rings = {
0: ('sda1', 'sdb1'),
1: ('sdc1', 'sdd1'),
2: ('sde1', 'sdf1'),
+ # sdg1, sdh1, sdi1 taken by policy 3 (see below)
}
for policy_index, devices in obj_rings.items():
policy = POLICIES[policy_index]
- dev1, dev2 = devices
obj_ring_path = os.path.join(_testdir, policy.ring_name + '.ring.gz')
obj_devs = [
- {'port': obj1lis.getsockname()[1], 'device': dev1},
- {'port': obj2lis.getsockname()[1], 'device': dev2},
- ]
+ {'port': objsock.getsockname()[1], 'device': dev}
+ for objsock, dev in zip(objsocks, devices)]
write_fake_ring(obj_ring_path, *obj_devs)
+
+ # write_fake_ring can't handle a 3-element ring, and the EC policy needs
+ # at least 3 devs to work with, so we do it manually
+ devs = [{'id': 0, 'zone': 0, 'device': 'sdg1', 'ip': '127.0.0.1',
+ 'port': obj1lis.getsockname()[1]},
+ {'id': 1, 'zone': 0, 'device': 'sdh1', 'ip': '127.0.0.1',
+ 'port': obj2lis.getsockname()[1]},
+ {'id': 2, 'zone': 0, 'device': 'sdi1', 'ip': '127.0.0.1',
+ 'port': obj3lis.getsockname()[1]}]
+ pol3_replica2part2dev_id = [[0, 1, 2, 0],
+ [1, 2, 0, 1],
+ [2, 0, 1, 2]]
+ obj3_ring_path = os.path.join(_testdir, POLICIES[3].ring_name + '.ring.gz')
+ part_shift = 30
+ with closing(GzipFile(obj3_ring_path, 'wb')) as fh:
+ pickle.dump(RingData(pol3_replica2part2dev_id, devs, part_shift), fh)
+
prosrv = proxy_server.Application(conf, FakeMemcacheReturnsNone(),
logger=debug_logger('proxy'))
for policy in POLICIES:
# make sure all the rings are loaded
prosrv.get_object_ring(policy.idx)
- # don't loose this one!
+ # don't lose this one!
_test_POLICIES = storage_policy._POLICIES
acc1srv = account_server.AccountController(
conf, logger=debug_logger('acct1'))
@@ -152,8 +180,10 @@ def do_setup(the_object_server):
conf, logger=debug_logger('obj1'))
obj2srv = the_object_server.ObjectController(
conf, logger=debug_logger('obj2'))
+ obj3srv = the_object_server.ObjectController(
+ conf, logger=debug_logger('obj3'))
_test_servers = \
- (prosrv, acc1srv, acc2srv, con1srv, con2srv, obj1srv, obj2srv)
+ (prosrv, acc1srv, acc2srv, con1srv, con2srv, obj1srv, obj2srv, obj3srv)
nl = NullLogger()
logging_prosv = proxy_logging.ProxyLoggingMiddleware(prosrv, conf,
logger=prosrv.logger)
@@ -164,8 +194,9 @@ def do_setup(the_object_server):
con2spa = spawn(wsgi.server, con2lis, con2srv, nl)
obj1spa = spawn(wsgi.server, obj1lis, obj1srv, nl)
obj2spa = spawn(wsgi.server, obj2lis, obj2srv, nl)
+ obj3spa = spawn(wsgi.server, obj3lis, obj3srv, nl)
_test_coros = \
- (prospa, acc1spa, acc2spa, con1spa, con2spa, obj1spa, obj2spa)
+ (prospa, acc1spa, acc2spa, con1spa, con2spa, obj1spa, obj2spa, obj3spa)
# Create account
ts = normalize_timestamp(time.time())
partition, nodes = prosrv.account_ring.get_nodes('a')
@@ -279,6 +310,15 @@ def sortHeaderNames(headerNames):
return ', '.join(headers)
+def parse_headers_string(headers_str):
+ headers_dict = HeaderKeyDict()
+ for line in headers_str.split('\r\n'):
+ if ': ' in line:
+ header, value = line.split(': ', 1)
+ headers_dict[header] = value
+ return headers_dict
+
+
def node_error_count(proxy_app, ring_node):
# Reach into the proxy's internals to get the error count for a
# particular node
@@ -845,12 +885,12 @@ class TestProxyServer(unittest.TestCase):
self.assertTrue(app.admin_key is None)
def test_get_info_controller(self):
- path = '/info'
+ req = Request.blank('/info')
app = proxy_server.Application({}, FakeMemcache(),
account_ring=FakeRing(),
container_ring=FakeRing())
- controller, path_parts = app.get_controller(path)
+ controller, path_parts = app.get_controller(req)
self.assertTrue('version' in path_parts)
self.assertTrue(path_parts['version'] is None)
@@ -860,6 +900,65 @@ class TestProxyServer(unittest.TestCase):
self.assertEqual(controller.__name__, 'InfoController')
+ def test_error_limit_methods(self):
+ logger = debug_logger('test')
+ app = proxy_server.Application({}, FakeMemcache(),
+ account_ring=FakeRing(),
+ container_ring=FakeRing(),
+ logger=logger)
+ node = app.container_ring.get_part_nodes(0)[0]
+ # error occurred
+ app.error_occurred(node, 'test msg')
+ self.assertTrue('test msg' in
+ logger.get_lines_for_level('error')[-1])
+ self.assertEqual(1, node_error_count(app, node))
+
+ # exception occurred
+ try:
+ raise Exception('kaboom1!')
+ except Exception as e1:
+ app.exception_occurred(node, 'test1', 'test1 msg')
+ line = logger.get_lines_for_level('error')[-1]
+ self.assertTrue('test1 server' in line)
+ self.assertTrue('test1 msg' in line)
+ log_args, log_kwargs = logger.log_dict['error'][-1]
+ self.assertTrue(log_kwargs['exc_info'])
+ self.assertEqual(log_kwargs['exc_info'][1], e1)
+ self.assertEqual(2, node_error_count(app, node))
+
+ # warning exception occurred
+ try:
+ raise Exception('kaboom2!')
+ except Exception as e2:
+ app.exception_occurred(node, 'test2', 'test2 msg',
+ level=logging.WARNING)
+ line = logger.get_lines_for_level('warning')[-1]
+ self.assertTrue('test2 server' in line)
+ self.assertTrue('test2 msg' in line)
+ log_args, log_kwargs = logger.log_dict['warning'][-1]
+ self.assertTrue(log_kwargs['exc_info'])
+ self.assertEqual(log_kwargs['exc_info'][1], e2)
+ self.assertEqual(3, node_error_count(app, node))
+
+ # custom exception occurred
+ try:
+ raise Exception('kaboom3!')
+ except Exception as e3:
+ e3_info = sys.exc_info()
+ try:
+ raise Exception('kaboom4!')
+ except Exception:
+ pass
+ app.exception_occurred(node, 'test3', 'test3 msg',
+ level=logging.WARNING, exc_info=e3_info)
+ line = logger.get_lines_for_level('warning')[-1]
+ self.assertTrue('test3 server' in line)
+ self.assertTrue('test3 msg' in line)
+ log_args, log_kwargs = logger.log_dict['warning'][-1]
+ self.assertTrue(log_kwargs['exc_info'])
+ self.assertEqual(log_kwargs['exc_info'][1], e3)
+ self.assertEqual(4, node_error_count(app, node))
+
@patch_policies([
StoragePolicy(0, 'zero', is_default=True),
@@ -980,6 +1079,23 @@ class TestObjectController(unittest.TestCase):
for policy in POLICIES:
policy.object_ring = FakeRing(base_port=3000)
+ def put_container(self, policy_name, container_name):
+ # Note: only works if called with unpatched policies
+ prolis = _test_sockets[0]
+ sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+ fd = sock.makefile()
+ fd.write('PUT /v1/a/%s HTTP/1.1\r\n'
+ 'Host: localhost\r\n'
+ 'Connection: close\r\n'
+ 'Content-Length: 0\r\n'
+ 'X-Storage-Token: t\r\n'
+ 'X-Storage-Policy: %s\r\n'
+ '\r\n' % (container_name, policy_name))
+ fd.flush()
+ headers = readuntil2crlfs(fd)
+ exp = 'HTTP/1.1 2'
+ self.assertEqual(headers[:len(exp)], exp)
+
def assert_status_map(self, method, statuses, expected, raise_exc=False):
with save_globals():
kwargs = {}
@@ -1208,6 +1324,619 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(res.status_int, 200)
self.assertEqual(res.body, obj)
    @unpatch_policies
    def test_PUT_ec(self):
        """PUT a small object to an EC policy and verify, by reading the
        object servers' disks directly, that it was stored as erasure-coded
        fragment archives (with the expected EC sysmeta) rather than as
        whole replicas.
        """
        policy = POLICIES[3]
        self.put_container("ec", "ec-con")

        obj = 'abCD' * 10  # small, so we don't get multiple EC stripes
        prolis = _test_sockets[0]
        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
        fd = sock.makefile()
        fd.write('PUT /v1/a/ec-con/o1 HTTP/1.1\r\n'
                 'Host: localhost\r\n'
                 'Connection: close\r\n'
                 'Etag: "%s"\r\n'
                 'Content-Length: %d\r\n'
                 'X-Storage-Token: t\r\n'
                 'Content-Type: application/octet-stream\r\n'
                 '\r\n%s' % (md5(obj).hexdigest(), len(obj), obj))
        fd.flush()
        headers = readuntil2crlfs(fd)
        exp = 'HTTP/1.1 201'
        self.assertEqual(headers[:len(exp)], exp)

        # what pyeclib itself would produce for this object; the on-disk
        # fragment archives should be exactly this set
        ecd = policy.pyeclib_driver
        expected_pieces = set(ecd.encode(obj))

        # go to disk to make sure it's there and all erasure-coded
        partition, nodes = policy.object_ring.get_nodes('a', 'ec-con', 'o1')
        conf = {'devices': _testdir, 'mount_check': 'false'}
        df_mgr = diskfile.DiskFileManager(conf, FakeLogger())

        got_pieces = set()
        got_indices = set()
        got_durable = []
        for node_index, node in enumerate(nodes):
            df = df_mgr.get_diskfile(node['device'], partition,
                                     'a', 'ec-con', 'o1',
                                     policy=policy)
            with df.open():
                meta = df.get_metadata()
                contents = ''.join(df.reader())
                got_pieces.add(contents)

                # check presence for a .durable file for the timestamp
                durable_file = os.path.join(
                    _testdir, node['device'], storage_directory(
                        diskfile.get_data_dir(policy),
                        partition, hash_path('a', 'ec-con', 'o1')),
                    utils.Timestamp(df.timestamp).internal + '.durable')

                if os.path.isfile(durable_file):
                    got_durable.append(True)

                lmeta = dict((k.lower(), v) for k, v in meta.items())
                got_indices.add(
                    lmeta['x-object-sysmeta-ec-frag-index'])

                # EC sysmeta carries the original object's etag and length
                # so the proxy can reconstruct client-visible headers
                self.assertEqual(
                    lmeta['x-object-sysmeta-ec-etag'],
                    md5(obj).hexdigest())
                self.assertEqual(
                    lmeta['x-object-sysmeta-ec-content-length'],
                    str(len(obj)))
                self.assertEqual(
                    lmeta['x-object-sysmeta-ec-segment-size'],
                    '4096')
                self.assertEqual(
                    lmeta['x-object-sysmeta-ec-scheme'],
                    'jerasure_rs_vand 2+1')
                # the plain 'etag' is the fragment archive's own md5,
                # not the whole object's
                self.assertEqual(
                    lmeta['etag'],
                    md5(contents).hexdigest())

        self.assertEqual(expected_pieces, got_pieces)
        self.assertEqual(set(('0', '1', '2')), got_indices)

        # verify at least 2 puts made it all the way to the end of 2nd
        # phase, ie at least 2 .durable statuses were written
        num_durable_puts = sum(d is True for d in got_durable)
        self.assertTrue(num_durable_puts >= 2)
+
    @unpatch_policies
    def test_PUT_ec_multiple_segments(self):
        """PUT an object spanning several EC segments and verify that each
        node's fragment archive has the expected length and that the
        per-segment fragments decode back to the original object.
        """
        ec_policy = POLICIES[3]
        self.put_container("ec", "ec-con")

        # size of the per-fragment metadata pyeclib prepends when encoding
        pyeclib_header_size = len(ec_policy.pyeclib_driver.encode("")[0])
        segment_size = ec_policy.ec_segment_size

        # Big enough to have multiple segments. Also a multiple of the
        # segment size to get coverage of that path too.
        obj = 'ABC' * segment_size

        prolis = _test_sockets[0]
        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
        fd = sock.makefile()
        fd.write('PUT /v1/a/ec-con/o2 HTTP/1.1\r\n'
                 'Host: localhost\r\n'
                 'Connection: close\r\n'
                 'Content-Length: %d\r\n'
                 'X-Storage-Token: t\r\n'
                 'Content-Type: application/octet-stream\r\n'
                 '\r\n%s' % (len(obj), obj))
        fd.flush()
        headers = readuntil2crlfs(fd)
        exp = 'HTTP/1.1 201'
        self.assertEqual(headers[:len(exp)], exp)

        # it's a 2+1 erasure code, so each fragment archive should be half
        # the length of the object, plus three inline pyeclib metadata
        # things (one per segment)
        expected_length = (len(obj) / 2 + pyeclib_header_size * 3)

        partition, nodes = ec_policy.object_ring.get_nodes(
            'a', 'ec-con', 'o2')

        conf = {'devices': _testdir, 'mount_check': 'false'}
        df_mgr = diskfile.DiskFileManager(conf, FakeLogger())

        got_durable = []
        fragment_archives = []
        for node in nodes:
            df = df_mgr.get_diskfile(
                node['device'], partition, 'a',
                'ec-con', 'o2', policy=ec_policy)
            with df.open():
                contents = ''.join(df.reader())
                fragment_archives.append(contents)
                self.assertEqual(len(contents), expected_length)

                # check presence for a .durable file for the timestamp
                durable_file = os.path.join(
                    _testdir, node['device'], storage_directory(
                        diskfile.get_data_dir(ec_policy),
                        partition, hash_path('a', 'ec-con', 'o2')),
                    utils.Timestamp(df.timestamp).internal + '.durable')

                if os.path.isfile(durable_file):
                    got_durable.append(True)

        # Verify that we can decode each individual fragment and that they
        # are all the correct size
        fragment_size = ec_policy.fragment_size
        nfragments = int(
            math.ceil(float(len(fragment_archives[0])) / fragment_size))

        for fragment_index in range(nfragments):
            fragment_start = fragment_index * fragment_size
            fragment_end = (fragment_index + 1) * fragment_size

            try:
                # the Nth fragment from every archive together make up
                # the Nth segment of the object
                frags = [fa[fragment_start:fragment_end]
                         for fa in fragment_archives]
                seg = ec_policy.pyeclib_driver.decode(frags)
            except ECDriverError:
                self.fail("Failed to decode fragments %d; this probably "
                          "means the fragments are not the sizes they "
                          "should be" % fragment_index)

            segment_start = fragment_index * segment_size
            segment_end = (fragment_index + 1) * segment_size

            self.assertEqual(seg, obj[segment_start:segment_end])

        # verify at least 2 puts made it all the way to the end of 2nd
        # phase, ie at least 2 .durable statuses were written
        num_durable_puts = sum(d is True for d in got_durable)
        self.assertTrue(num_durable_puts >= 2)
+
+ @unpatch_policies
+ def test_PUT_ec_object_etag_mismatch(self):
+ self.put_container("ec", "ec-con")
+
+ obj = '90:6A:02:60:B1:08-96da3e706025537fc42464916427727e'
+ prolis = _test_sockets[0]
+ prosrv = _test_servers[0]
+ sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+ fd = sock.makefile()
+ fd.write('PUT /v1/a/ec-con/o3 HTTP/1.1\r\n'
+ 'Host: localhost\r\n'
+ 'Connection: close\r\n'
+ 'Etag: %s\r\n'
+ 'Content-Length: %d\r\n'
+ 'X-Storage-Token: t\r\n'
+ 'Content-Type: application/octet-stream\r\n'
+ '\r\n%s' % (md5('something else').hexdigest(), len(obj), obj))
+ fd.flush()
+ headers = readuntil2crlfs(fd)
+ exp = 'HTTP/1.1 422'
+ self.assertEqual(headers[:len(exp)], exp)
+
+ # nothing should have made it to disk on the object servers
+ partition, nodes = prosrv.get_object_ring(3).get_nodes(
+ 'a', 'ec-con', 'o3')
+ conf = {'devices': _testdir, 'mount_check': 'false'}
+
+ partition, nodes = prosrv.get_object_ring(3).get_nodes(
+ 'a', 'ec-con', 'o3')
+ conf = {'devices': _testdir, 'mount_check': 'false'}
+ df_mgr = diskfile.DiskFileManager(conf, FakeLogger())
+
+ for node in nodes:
+ df = df_mgr.get_diskfile(node['device'], partition,
+ 'a', 'ec-con', 'o3', policy=POLICIES[3])
+ self.assertRaises(DiskFileNotExist, df.open)
+
+ @unpatch_policies
+ def test_PUT_ec_fragment_archive_etag_mismatch(self):
+ self.put_container("ec", "ec-con")
+
+ # Cause a hash mismatch by feeding one particular MD5 hasher some
+ # extra data. The goal here is to get exactly one of the hashers in
+ # an object server.
+ countdown = [1]
+
+ def busted_md5_constructor(initial_str=""):
+ hasher = md5(initial_str)
+ if countdown[0] == 0:
+ hasher.update('wrong')
+ countdown[0] -= 1
+ return hasher
+
+ obj = 'uvarovite-esurience-cerated-symphysic'
+ prolis = _test_sockets[0]
+ prosrv = _test_servers[0]
+ sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+ with mock.patch('swift.obj.server.md5', busted_md5_constructor):
+ fd = sock.makefile()
+ fd.write('PUT /v1/a/ec-con/pimento HTTP/1.1\r\n'
+ 'Host: localhost\r\n'
+ 'Connection: close\r\n'
+ 'Etag: %s\r\n'
+ 'Content-Length: %d\r\n'
+ 'X-Storage-Token: t\r\n'
+ 'Content-Type: application/octet-stream\r\n'
+ '\r\n%s' % (md5(obj).hexdigest(), len(obj), obj))
+ fd.flush()
+ headers = readuntil2crlfs(fd)
+ exp = 'HTTP/1.1 503' # no quorum
+ self.assertEqual(headers[:len(exp)], exp)
+
+ # 2/3 of the fragment archives should have landed on disk
+ partition, nodes = prosrv.get_object_ring(3).get_nodes(
+ 'a', 'ec-con', 'pimento')
+ conf = {'devices': _testdir, 'mount_check': 'false'}
+
+ partition, nodes = prosrv.get_object_ring(3).get_nodes(
+ 'a', 'ec-con', 'pimento')
+ conf = {'devices': _testdir, 'mount_check': 'false'}
+
+ df_mgr = diskfile.DiskFileManager(conf, FakeLogger())
+
+ found = 0
+ for node in nodes:
+ df = df_mgr.get_diskfile(node['device'], partition,
+ 'a', 'ec-con', 'pimento',
+ policy=POLICIES[3])
+ try:
+ df.open()
+ found += 1
+ except DiskFileNotExist:
+ pass
+ self.assertEqual(found, 2)
+
+ @unpatch_policies
+ def test_PUT_ec_if_none_match(self):
+ self.put_container("ec", "ec-con")
+
+ obj = 'ananepionic-lepidophyllous-ropewalker-neglectful'
+ prolis = _test_sockets[0]
+ sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+ fd = sock.makefile()
+ fd.write('PUT /v1/a/ec-con/inm HTTP/1.1\r\n'
+ 'Host: localhost\r\n'
+ 'Connection: close\r\n'
+ 'Etag: "%s"\r\n'
+ 'Content-Length: %d\r\n'
+ 'X-Storage-Token: t\r\n'
+ 'Content-Type: application/octet-stream\r\n'
+ '\r\n%s' % (md5(obj).hexdigest(), len(obj), obj))
+ fd.flush()
+ headers = readuntil2crlfs(fd)
+ exp = 'HTTP/1.1 201'
+ self.assertEqual(headers[:len(exp)], exp)
+
+ sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+ fd = sock.makefile()
+ fd.write('PUT /v1/a/ec-con/inm HTTP/1.1\r\n'
+ 'Host: localhost\r\n'
+ 'Connection: close\r\n'
+ 'If-None-Match: *\r\n'
+ 'Etag: "%s"\r\n'
+ 'Content-Length: %d\r\n'
+ 'X-Storage-Token: t\r\n'
+ 'Content-Type: application/octet-stream\r\n'
+ '\r\n%s' % (md5(obj).hexdigest(), len(obj), obj))
+ fd.flush()
+ headers = readuntil2crlfs(fd)
+ exp = 'HTTP/1.1 412'
+ self.assertEqual(headers[:len(exp)], exp)
+
    @unpatch_policies
    def test_GET_ec(self):
        """GET of an EC-stored object returns the original body, length,
        etag and user metadata -- the client never sees fragments.
        """
        self.put_container("ec", "ec-con")

        obj = '0123456' * 11 * 17

        prolis = _test_sockets[0]
        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
        fd = sock.makefile()
        fd.write('PUT /v1/a/ec-con/go-get-it HTTP/1.1\r\n'
                 'Host: localhost\r\n'
                 'Connection: close\r\n'
                 'Content-Length: %d\r\n'
                 'X-Storage-Token: t\r\n'
                 'X-Object-Meta-Color: chartreuse\r\n'
                 'Content-Type: application/octet-stream\r\n'
                 '\r\n%s' % (len(obj), obj))
        fd.flush()
        headers = readuntil2crlfs(fd)
        exp = 'HTTP/1.1 201'
        self.assertEqual(headers[:len(exp)], exp)

        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
        fd = sock.makefile()
        fd.write('GET /v1/a/ec-con/go-get-it HTTP/1.1\r\n'
                 'Host: localhost\r\n'
                 'Connection: close\r\n'
                 'X-Storage-Token: t\r\n'
                 '\r\n')
        fd.flush()
        headers = readuntil2crlfs(fd)
        exp = 'HTTP/1.1 200'
        self.assertEqual(headers[:len(exp)], exp)

        # headers must describe the original object, not a fragment archive
        headers = parse_headers_string(headers)
        self.assertEqual(str(len(obj)), headers['Content-Length'])
        self.assertEqual(md5(obj).hexdigest(), headers['Etag'])
        self.assertEqual('chartreuse', headers['X-Object-Meta-Color'])

        # read the response body in small chunks until EOF
        gotten_obj = ''
        while True:
            buf = fd.read(64)
            if not buf:
                break
            gotten_obj += buf
        self.assertEqual(gotten_obj, obj)
+
+ @unpatch_policies
+ def test_conditional_GET_ec(self):
+ self.put_container("ec", "ec-con")
+
+ obj = 'this object has an etag and is otherwise unimportant'
+ etag = md5(obj).hexdigest()
+ not_etag = md5(obj + "blahblah").hexdigest()
+
+ prolis = _test_sockets[0]
+ prosrv = _test_servers[0]
+ sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+ fd = sock.makefile()
+ fd.write('PUT /v1/a/ec-con/conditionals HTTP/1.1\r\n'
+ 'Host: localhost\r\n'
+ 'Connection: close\r\n'
+ 'Content-Length: %d\r\n'
+ 'X-Storage-Token: t\r\n'
+ 'Content-Type: application/octet-stream\r\n'
+ '\r\n%s' % (len(obj), obj))
+ fd.flush()
+ headers = readuntil2crlfs(fd)
+ exp = 'HTTP/1.1 201'
+ self.assertEqual(headers[:len(exp)], exp)
+
+ for verb in ('GET', 'HEAD'):
+ # If-Match
+ req = Request.blank(
+ '/v1/a/ec-con/conditionals',
+ environ={'REQUEST_METHOD': verb},
+ headers={'If-Match': etag})
+ resp = req.get_response(prosrv)
+ self.assertEqual(resp.status_int, 200)
+
+ req = Request.blank(
+ '/v1/a/ec-con/conditionals',
+ environ={'REQUEST_METHOD': verb},
+ headers={'If-Match': not_etag})
+ resp = req.get_response(prosrv)
+ self.assertEqual(resp.status_int, 412)
+
+ req = Request.blank(
+ '/v1/a/ec-con/conditionals',
+ environ={'REQUEST_METHOD': verb},
+ headers={'If-Match': "*"})
+ resp = req.get_response(prosrv)
+ self.assertEqual(resp.status_int, 200)
+
+ # If-None-Match
+ req = Request.blank(
+ '/v1/a/ec-con/conditionals',
+ environ={'REQUEST_METHOD': verb},
+ headers={'If-None-Match': etag})
+ resp = req.get_response(prosrv)
+ self.assertEqual(resp.status_int, 304)
+
+ req = Request.blank(
+ '/v1/a/ec-con/conditionals',
+ environ={'REQUEST_METHOD': verb},
+ headers={'If-None-Match': not_etag})
+ resp = req.get_response(prosrv)
+ self.assertEqual(resp.status_int, 200)
+
+ req = Request.blank(
+ '/v1/a/ec-con/conditionals',
+ environ={'REQUEST_METHOD': verb},
+ headers={'If-None-Match': "*"})
+ resp = req.get_response(prosrv)
+ self.assertEqual(resp.status_int, 304)
+
    @unpatch_policies
    def test_GET_ec_big(self):
        """GET of an EC object large enough to span multiple segments
        returns the full, correctly reassembled body.
        """
        self.put_container("ec", "ec-con")

        # our EC segment size is 4 KiB, so this is multiple (3) segments;
        # we'll verify that with a sanity check
        obj = 'a moose once bit my sister' * 400
        self.assertTrue(
            len(obj) > POLICIES.get_by_name("ec").ec_segment_size * 2,
            "object is too small for proper testing")

        prolis = _test_sockets[0]
        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
        fd = sock.makefile()
        fd.write('PUT /v1/a/ec-con/big-obj-get HTTP/1.1\r\n'
                 'Host: localhost\r\n'
                 'Connection: close\r\n'
                 'Content-Length: %d\r\n'
                 'X-Storage-Token: t\r\n'
                 'Content-Type: application/octet-stream\r\n'
                 '\r\n%s' % (len(obj), obj))
        fd.flush()
        headers = readuntil2crlfs(fd)
        exp = 'HTTP/1.1 201'
        self.assertEqual(headers[:len(exp)], exp)

        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
        fd = sock.makefile()
        fd.write('GET /v1/a/ec-con/big-obj-get HTTP/1.1\r\n'
                 'Host: localhost\r\n'
                 'Connection: close\r\n'
                 'X-Storage-Token: t\r\n'
                 '\r\n')
        fd.flush()
        headers = readuntil2crlfs(fd)
        exp = 'HTTP/1.1 200'
        self.assertEqual(headers[:len(exp)], exp)

        headers = parse_headers_string(headers)
        self.assertEqual(str(len(obj)), headers['Content-Length'])
        self.assertEqual(md5(obj).hexdigest(), headers['Etag'])

        # read the response body in small chunks until EOF
        gotten_obj = ''
        while True:
            buf = fd.read(64)
            if not buf:
                break
            gotten_obj += buf
        # This may look like a redundant test, but when things fail, this
        # has a useful failure message while the subsequent one spews piles
        # of garbage and demolishes your terminal's scrollback buffer.
        self.assertEqual(len(gotten_obj), len(obj))
        self.assertEqual(gotten_obj, obj)
+
    @unpatch_policies
    def test_GET_ec_failure_handling(self):
        """If the EC app iterator blows up after the response has started
        streaming, the GET must terminate promptly (no hang) with a
        truncated body rather than garbage.
        """
        self.put_container("ec", "ec-con")

        obj = 'look at this object; it is simply amazing ' * 500
        prolis = _test_sockets[0]
        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
        fd = sock.makefile()
        fd.write('PUT /v1/a/ec-con/crash-test-dummy HTTP/1.1\r\n'
                 'Host: localhost\r\n'
                 'Connection: close\r\n'
                 'Content-Length: %d\r\n'
                 'X-Storage-Token: t\r\n'
                 'Content-Type: application/octet-stream\r\n'
                 '\r\n%s' % (len(obj), obj))
        fd.flush()
        headers = readuntil2crlfs(fd)
        exp = 'HTTP/1.1 201'
        self.assertEqual(headers[:len(exp)], exp)

        def explodey_iter(inner_iter):
            # yield one item, then simulate a mid-stream failure
            yield next(inner_iter)
            raise Exception("doom ba doom")

        real_ec_app_iter = swift.proxy.controllers.obj.ECAppIter

        def explodey_ec_app_iter(path, policy, iterators, *a, **kw):
            # Each thing in `iterators` here is a document-parts iterator,
            # and we want to fail after getting a little into each part.
            #
            # That way, we ensure we've started streaming the response to
            # the client when things go wrong.
            return real_ec_app_iter(
                path, policy,
                [explodey_iter(i) for i in iterators],
                *a, **kw)

        with mock.patch("swift.proxy.controllers.obj.ECAppIter",
                        explodey_ec_app_iter):
            sock = connect_tcp(('localhost', prolis.getsockname()[1]))
            fd = sock.makefile()
            fd.write('GET /v1/a/ec-con/crash-test-dummy HTTP/1.1\r\n'
                     'Host: localhost\r\n'
                     'Connection: close\r\n'
                     'X-Storage-Token: t\r\n'
                     '\r\n')
            fd.flush()
            headers = readuntil2crlfs(fd)
            # the failure happens after the 200 and headers have been sent
            exp = 'HTTP/1.1 200'
            self.assertEqual(headers[:len(exp)], exp)

            headers = parse_headers_string(headers)
            self.assertEqual(str(len(obj)), headers['Content-Length'])
            self.assertEqual(md5(obj).hexdigest(), headers['Etag'])

            gotten_obj = ''
            try:
                with Timeout(300):  # don't hang the testrun when this fails
                    while True:
                        buf = fd.read(64)
                        if not buf:
                            break
                        gotten_obj += buf
            except Timeout:
                self.fail("GET hung when connection failed")

            # Ensure we failed partway through, otherwise the mocks could
            # get out of date without anyone noticing
            self.assertTrue(0 < len(gotten_obj) < len(obj))
+
    @unpatch_policies
    def test_HEAD_ec(self):
        """HEAD of an EC-stored object reports the original object's
        length, etag and user metadata.
        """
        self.put_container("ec", "ec-con")

        obj = '0123456' * 11 * 17

        prolis = _test_sockets[0]
        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
        fd = sock.makefile()
        fd.write('PUT /v1/a/ec-con/go-head-it HTTP/1.1\r\n'
                 'Host: localhost\r\n'
                 'Connection: close\r\n'
                 'Content-Length: %d\r\n'
                 'X-Storage-Token: t\r\n'
                 'X-Object-Meta-Color: chartreuse\r\n'
                 'Content-Type: application/octet-stream\r\n'
                 '\r\n%s' % (len(obj), obj))
        fd.flush()
        headers = readuntil2crlfs(fd)
        exp = 'HTTP/1.1 201'
        self.assertEqual(headers[:len(exp)], exp)

        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
        fd = sock.makefile()
        fd.write('HEAD /v1/a/ec-con/go-head-it HTTP/1.1\r\n'
                 'Host: localhost\r\n'
                 'Connection: close\r\n'
                 'X-Storage-Token: t\r\n'
                 '\r\n')
        fd.flush()
        headers = readuntil2crlfs(fd)
        exp = 'HTTP/1.1 200'
        self.assertEqual(headers[:len(exp)], exp)

        # headers must describe the original object, not a fragment archive
        headers = parse_headers_string(headers)
        self.assertEqual(str(len(obj)), headers['Content-Length'])
        self.assertEqual(md5(obj).hexdigest(), headers['Etag'])
        self.assertEqual('chartreuse', headers['X-Object-Meta-Color'])
+
+ @unpatch_policies
+ def test_GET_ec_404(self):
+ self.put_container("ec", "ec-con")
+
+ prolis = _test_sockets[0]
+ sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+ fd = sock.makefile()
+ fd.write('GET /v1/a/ec-con/yes-we-have-no-bananas HTTP/1.1\r\n'
+ 'Host: localhost\r\n'
+ 'Connection: close\r\n'
+ 'X-Storage-Token: t\r\n'
+ '\r\n')
+ fd.flush()
+ headers = readuntil2crlfs(fd)
+ exp = 'HTTP/1.1 404'
+ self.assertEqual(headers[:len(exp)], exp)
+
+ @unpatch_policies
+ def test_HEAD_ec_404(self):
+ self.put_container("ec", "ec-con")
+
+ prolis = _test_sockets[0]
+ sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+ fd = sock.makefile()
+ fd.write('HEAD /v1/a/ec-con/yes-we-have-no-bananas HTTP/1.1\r\n'
+ 'Host: localhost\r\n'
+ 'Connection: close\r\n'
+ 'X-Storage-Token: t\r\n'
+ '\r\n')
+ fd.flush()
+ headers = readuntil2crlfs(fd)
+ exp = 'HTTP/1.1 404'
+ self.assertEqual(headers[:len(exp)], exp)
+
def test_PUT_expect_header_zero_content_length(self):
test_errors = []
@@ -1219,8 +1948,8 @@ class TestObjectController(unittest.TestCase):
'server!')
with save_globals():
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
# The (201, Exception('test')) tuples in there have the effect of
# changing the status of the initial expect response. The default
# expect response from FakeConn for 201 is 100.
@@ -1255,8 +1984,8 @@ class TestObjectController(unittest.TestCase):
'non-zero byte PUT!')
with save_globals():
- controller = \
- proxy_server.ObjectController(self.app, 'a', 'c', 'o.jpg')
+ controller = ReplicatedObjectController(
+ self.app, 'a', 'c', 'o.jpg')
# the (100, 201) tuples in there are just being extra explicit
# about the FakeConn returning the 100 Continue status when the
# object controller calls getexpect. Which is FakeConn's default
@@ -1291,7 +2020,8 @@ class TestObjectController(unittest.TestCase):
self.app.write_affinity_node_count = lambda r: 3
controller = \
- proxy_server.ObjectController(self.app, 'a', 'c', 'o.jpg')
+ ReplicatedObjectController(
+ self.app, 'a', 'c', 'o.jpg')
set_http_connect(200, 200, 201, 201, 201,
give_connect=test_connect)
req = Request.blank('/v1/a/c/o.jpg', {})
@@ -1326,7 +2056,8 @@ class TestObjectController(unittest.TestCase):
self.app.write_affinity_node_count = lambda r: 3
controller = \
- proxy_server.ObjectController(self.app, 'a', 'c', 'o.jpg')
+ ReplicatedObjectController(
+ self.app, 'a', 'c', 'o.jpg')
self.app.error_limit(
object_ring.get_part_nodes(1)[0], 'test')
set_http_connect(200, 200, # account, container
@@ -1348,6 +2079,27 @@ class TestObjectController(unittest.TestCase):
self.assertNotEqual(0, written_to[2][1] % 2)
    @unpatch_policies
    def test_PUT_no_etag_fallocate(self):
        """A PUT without a client-supplied Etag must still trigger one
        diskfile fallocate per object server.
        """
        with mock.patch('swift.obj.diskfile.fallocate') as mock_fallocate:
            prolis = _test_sockets[0]
            sock = connect_tcp(('localhost', prolis.getsockname()[1]))
            fd = sock.makefile()
            obj = 'hemoleucocytic-surfactant'
            # note: no Etag header in this request
            fd.write('PUT /v1/a/c/o HTTP/1.1\r\n'
                     'Host: localhost\r\n'
                     'Connection: close\r\n'
                     'Content-Length: %d\r\n'
                     'X-Storage-Token: t\r\n'
                     'Content-Type: application/octet-stream\r\n'
                     '\r\n%s' % (len(obj), obj))
            fd.flush()
            headers = readuntil2crlfs(fd)
            exp = 'HTTP/1.1 201'
            self.assertEqual(headers[:len(exp)], exp)
            # one for each obj server; this test has 2
            self.assertEqual(len(mock_fallocate.mock_calls), 2)
+
+ @unpatch_policies
def test_PUT_message_length_using_content_length(self):
prolis = _test_sockets[0]
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
@@ -1586,7 +2338,8 @@ class TestObjectController(unittest.TestCase):
"last_modified": "1970-01-01T00:00:01.000000"}])
body_iter = ('', '', body, '', '', '', '', '', '', '', '', '', '', '')
with save_globals():
- controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
+ controller = ReplicatedObjectController(
+ self.app, 'a', 'c', 'o')
# HEAD HEAD GET GET HEAD GET GET GET PUT PUT
# PUT DEL DEL DEL
set_http_connect(200, 200, 200, 200, 200, 200, 200, 200, 201, 201,
@@ -1607,6 +2360,8 @@ class TestObjectController(unittest.TestCase):
StoragePolicy(1, 'one', True, object_ring=FakeRing())
])
def test_DELETE_on_expired_versioned_object(self):
+ # reset the router post patch_policies
+ self.app.obj_controller_router = proxy_server.ObjectControllerRouter()
methods = set()
def test_connect(ipaddr, port, device, partition, method, path,
@@ -1634,8 +2389,8 @@ class TestObjectController(unittest.TestCase):
yield obj
with save_globals():
- controller = proxy_server.ObjectController(self.app,
- 'a', 'c', 'o')
+ controller = ReplicatedObjectController(
+ self.app, 'a', 'c', 'o')
controller.container_info = fake_container_info
controller._listing_iter = fake_list_iter
set_http_connect(404, 404, 404, # get for the previous version
@@ -1657,8 +2412,8 @@ class TestObjectController(unittest.TestCase):
def test_PUT_auto_content_type(self):
with save_globals():
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
def test_content_type(filename, expected):
# The three responses here are for account_info() (HEAD to
@@ -1704,8 +2459,8 @@ class TestObjectController(unittest.TestCase):
def test_PUT(self):
with save_globals():
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
def test_status_map(statuses, expected):
set_http_connect(*statuses)
@@ -1724,8 +2479,8 @@ class TestObjectController(unittest.TestCase):
def test_PUT_connect_exceptions(self):
with save_globals():
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
def test_status_map(statuses, expected):
set_http_connect(*statuses)
@@ -1755,8 +2510,8 @@ class TestObjectController(unittest.TestCase):
def test_PUT_send_exceptions(self):
with save_globals():
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
def test_status_map(statuses, expected):
self.app.memcache.store = {}
@@ -1778,8 +2533,8 @@ class TestObjectController(unittest.TestCase):
def test_PUT_max_size(self):
with save_globals():
set_http_connect(201, 201, 201)
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
req = Request.blank('/v1/a/c/o', {}, headers={
'Content-Length': str(constraints.MAX_FILE_SIZE + 1),
'Content-Type': 'foo/bar'})
@@ -1790,8 +2545,8 @@ class TestObjectController(unittest.TestCase):
def test_PUT_bad_content_type(self):
with save_globals():
set_http_connect(201, 201, 201)
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
req = Request.blank('/v1/a/c/o', {}, headers={
'Content-Length': 0, 'Content-Type': 'foo/bar;swift_hey=45'})
self.app.update_request(req)
@@ -1801,8 +2556,8 @@ class TestObjectController(unittest.TestCase):
def test_PUT_getresponse_exceptions(self):
with save_globals():
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
def test_status_map(statuses, expected):
self.app.memcache.store = {}
@@ -1847,6 +2602,8 @@ class TestObjectController(unittest.TestCase):
StoragePolicy(1, 'one', object_ring=FakeRing()),
])
def test_POST_backend_headers(self):
+ # reset the router post patch_policies
+ self.app.obj_controller_router = proxy_server.ObjectControllerRouter()
self.app.object_post_as_copy = False
self.app.sort_nodes = lambda nodes: nodes
backend_requests = []
@@ -2117,8 +2874,8 @@ class TestObjectController(unittest.TestCase):
with save_globals():
limit = constraints.MAX_META_VALUE_LENGTH
self.app.object_post_as_copy = False
- proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
set_http_connect(200, 200, 202, 202, 202)
# acct cont obj obj obj
req = Request.blank('/v1/a/c/o', {'REQUEST_METHOD': 'POST'},
@@ -2665,8 +3422,8 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(node_list, got_nodes)
def test_best_response_sets_headers(self):
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'})
resp = controller.best_response(req, [200] * 3, ['OK'] * 3, [''] * 3,
'Object', headers=[{'X-Test': '1'},
@@ -2675,8 +3432,8 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.headers['X-Test'], '1')
def test_best_response_sets_etag(self):
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'})
resp = controller.best_response(req, [200] * 3, ['OK'] * 3, [''] * 3,
'Object')
@@ -2709,8 +3466,8 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'HEAD'})
self.app.update_request(req)
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
set_http_connect(200, 200, 200)
resp = controller.HEAD(req)
self.assertEquals(resp.status_int, 200)
@@ -2722,8 +3479,8 @@ class TestObjectController(unittest.TestCase):
def test_error_limiting(self):
with save_globals():
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
controller.app.sort_nodes = lambda l: l
object_ring = controller.app.get_object_ring(None)
self.assert_status_map(controller.HEAD, (200, 200, 503, 200, 200),
@@ -2759,8 +3516,8 @@ class TestObjectController(unittest.TestCase):
def test_error_limiting_survives_ring_reload(self):
with save_globals():
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
controller.app.sort_nodes = lambda l: l
object_ring = controller.app.get_object_ring(None)
self.assert_status_map(controller.HEAD, (200, 200, 503, 200, 200),
@@ -2787,8 +3544,8 @@ class TestObjectController(unittest.TestCase):
def test_PUT_error_limiting(self):
with save_globals():
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
controller.app.sort_nodes = lambda l: l
object_ring = controller.app.get_object_ring(None)
# acc con obj obj obj
@@ -2806,8 +3563,8 @@ class TestObjectController(unittest.TestCase):
def test_PUT_error_limiting_last_node(self):
with save_globals():
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
controller.app.sort_nodes = lambda l: l
object_ring = controller.app.get_object_ring(None)
# acc con obj obj obj
@@ -2827,8 +3584,8 @@ class TestObjectController(unittest.TestCase):
with save_globals():
self.app.memcache = FakeMemcacheReturnsNone()
self.app._error_limiting = {}
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
set_http_connect(200, 200, 200, 200, 200, 200)
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'DELETE'})
@@ -2924,8 +3681,8 @@ class TestObjectController(unittest.TestCase):
with save_globals():
self.app.object_post_as_copy = False
self.app.memcache = FakeMemcacheReturnsNone()
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
set_http_connect(200, 404, 404, 404, 200, 200, 200)
req = Request.blank('/v1/a/c/o',
@@ -2945,8 +3702,8 @@ class TestObjectController(unittest.TestCase):
def test_PUT_POST_as_copy_requires_container_exist(self):
with save_globals():
self.app.memcache = FakeMemcacheReturnsNone()
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
set_http_connect(200, 404, 404, 404, 200, 200, 200)
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'})
self.app.update_request(req)
@@ -2963,8 +3720,8 @@ class TestObjectController(unittest.TestCase):
def test_bad_metadata(self):
with save_globals():
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
set_http_connect(200, 200, 201, 201, 201)
# acct cont obj obj obj
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
@@ -3060,8 +3817,8 @@ class TestObjectController(unittest.TestCase):
@contextmanager
def controller_context(self, req, *args, **kwargs):
_v, account, container, obj = utils.split_path(req.path, 4, 4, True)
- controller = proxy_server.ObjectController(self.app, account,
- container, obj)
+ controller = ReplicatedObjectController(
+ self.app, account, container, obj)
self.app.update_request(req)
self.app.memcache.store = {}
with save_globals():
@@ -3678,7 +4435,8 @@ class TestObjectController(unittest.TestCase):
def test_COPY_newest(self):
with save_globals():
- controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
+ controller = ReplicatedObjectController(
+ self.app, 'a', 'c', 'o')
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': '/c/o'})
@@ -3696,7 +4454,8 @@ class TestObjectController(unittest.TestCase):
def test_COPY_account_newest(self):
with save_globals():
- controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
+ controller = ReplicatedObjectController(
+ self.app, 'a', 'c', 'o')
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': '/c1/o',
@@ -3721,8 +4480,8 @@ class TestObjectController(unittest.TestCase):
headers=None, query_string=None):
backend_requests.append((method, path, headers))
- controller = proxy_server.ObjectController(self.app, 'a',
- 'c', 'o')
+ controller = ReplicatedObjectController(
+ self.app, 'a', 'c', 'o')
set_http_connect(200, 200, 200, 200, 200, 201, 201, 201,
give_connect=capture_requests)
self.app.memcache.store = {}
@@ -3751,8 +4510,8 @@ class TestObjectController(unittest.TestCase):
headers=None, query_string=None):
backend_requests.append((method, path, headers))
- controller = proxy_server.ObjectController(self.app, 'a',
- 'c', 'o')
+ controller = ReplicatedObjectController(
+ self.app, 'a', 'c', 'o')
set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201,
give_connect=capture_requests)
self.app.memcache.store = {}
@@ -3797,8 +4556,8 @@ class TestObjectController(unittest.TestCase):
with save_globals():
set_http_connect(201, 201, 201, 201)
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Transfer-Encoding': 'chunked',
@@ -3828,7 +4587,7 @@ class TestObjectController(unittest.TestCase):
def test_chunked_put_bad_version(self):
# Check bad version
(prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis,
- obj2lis) = _test_sockets
+ obj2lis, obj3lis) = _test_sockets
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('GET /v0 HTTP/1.1\r\nHost: localhost\r\n'
@@ -3842,7 +4601,7 @@ class TestObjectController(unittest.TestCase):
def test_chunked_put_bad_path(self):
# Check bad path
(prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis,
- obj2lis) = _test_sockets
+ obj2lis, obj3lis) = _test_sockets
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('GET invalid HTTP/1.1\r\nHost: localhost\r\n'
@@ -3856,7 +4615,7 @@ class TestObjectController(unittest.TestCase):
def test_chunked_put_bad_utf8(self):
# Check invalid utf-8
(prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis,
- obj2lis) = _test_sockets
+ obj2lis, obj3lis) = _test_sockets
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('GET /v1/a%80 HTTP/1.1\r\nHost: localhost\r\n'
@@ -3871,7 +4630,7 @@ class TestObjectController(unittest.TestCase):
def test_chunked_put_bad_path_no_controller(self):
# Check bad path, no controller
(prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis,
- obj2lis) = _test_sockets
+ obj2lis, obj3lis) = _test_sockets
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('GET /v1 HTTP/1.1\r\nHost: localhost\r\n'
@@ -3886,7 +4645,7 @@ class TestObjectController(unittest.TestCase):
def test_chunked_put_bad_method(self):
# Check bad method
(prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis,
- obj2lis) = _test_sockets
+ obj2lis, obj3lis) = _test_sockets
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('LICK /v1/a HTTP/1.1\r\nHost: localhost\r\n'
@@ -3901,9 +4660,9 @@ class TestObjectController(unittest.TestCase):
def test_chunked_put_unhandled_exception(self):
# Check unhandled exception
(prosrv, acc1srv, acc2srv, con1srv, con2srv, obj1srv,
- obj2srv) = _test_servers
+ obj2srv, obj3srv) = _test_servers
(prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis,
- obj2lis) = _test_sockets
+ obj2lis, obj3lis) = _test_sockets
orig_update_request = prosrv.update_request
def broken_update_request(*args, **kwargs):
@@ -3927,7 +4686,7 @@ class TestObjectController(unittest.TestCase):
# the part Application.log_request that 'enforces' a
# content_length on the response.
(prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis,
- obj2lis) = _test_sockets
+ obj2lis, obj3lis) = _test_sockets
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('HEAD /v1/a HTTP/1.1\r\nHost: localhost\r\n'
@@ -3951,7 +4710,7 @@ class TestObjectController(unittest.TestCase):
ustr_short = '\xe1\xbc\xb8\xce\xbf\xe1\xbd\xbatest'
# Create ustr container
(prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis,
- obj2lis) = _test_sockets
+ obj2lis, obj3lis) = _test_sockets
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('PUT /v1/a/%s HTTP/1.1\r\nHost: localhost\r\n'
@@ -4063,7 +4822,7 @@ class TestObjectController(unittest.TestCase):
def test_chunked_put_chunked_put(self):
# Do chunked object put
(prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis,
- obj2lis) = _test_sockets
+ obj2lis, obj3lis) = _test_sockets
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
# Also happens to assert that x-storage-token is taken as a
@@ -4094,7 +4853,7 @@ class TestObjectController(unittest.TestCase):
versions_to_create = 3
# Create a container for our versioned object testing
(prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis,
- obj2lis) = _test_sockets
+ obj2lis, obj3lis) = _test_sockets
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
pre = quote('%03x' % len(o))
@@ -4478,8 +5237,8 @@ class TestObjectController(unittest.TestCase):
@unpatch_policies
def test_conditional_range_get(self):
- (prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis, obj2lis) = \
- _test_sockets
+ (prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis, obj2lis,
+ obj3lis) = _test_sockets
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
# make a container
@@ -4527,8 +5286,8 @@ class TestObjectController(unittest.TestCase):
def test_mismatched_etags(self):
with save_globals():
# no etag supplied, object servers return success w/ diff values
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0'})
self.app.update_request(req)
@@ -4559,8 +5318,8 @@ class TestObjectController(unittest.TestCase):
with save_globals():
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'})
self.app.update_request(req)
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
set_http_connect(200, 200, 200)
resp = controller.GET(req)
self.assert_('accept-ranges' in resp.headers)
@@ -4571,8 +5330,8 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'HEAD'})
self.app.update_request(req)
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
set_http_connect(200, 200, 200)
resp = controller.HEAD(req)
self.assert_('accept-ranges' in resp.headers)
@@ -4586,8 +5345,8 @@ class TestObjectController(unittest.TestCase):
return HTTPUnauthorized(request=req)
with save_globals():
set_http_connect(200, 200, 201, 201, 201)
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
req = Request.blank('/v1/a/c/o')
req.environ['swift.authorize'] = authorize
self.app.update_request(req)
@@ -4602,8 +5361,8 @@ class TestObjectController(unittest.TestCase):
return HTTPUnauthorized(request=req)
with save_globals():
set_http_connect(200, 200, 201, 201, 201)
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
req = Request.blank('/v1/a/c/o', {'REQUEST_METHOD': 'HEAD'})
req.environ['swift.authorize'] = authorize
self.app.update_request(req)
@@ -4619,8 +5378,8 @@ class TestObjectController(unittest.TestCase):
with save_globals():
self.app.object_post_as_copy = False
set_http_connect(200, 200, 201, 201, 201)
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'Content-Length': '5'}, body='12345')
@@ -4637,8 +5396,8 @@ class TestObjectController(unittest.TestCase):
return HTTPUnauthorized(request=req)
with save_globals():
set_http_connect(200, 200, 200, 200, 200, 201, 201, 201)
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'Content-Length': '5'}, body='12345')
@@ -4655,8 +5414,8 @@ class TestObjectController(unittest.TestCase):
return HTTPUnauthorized(request=req)
with save_globals():
set_http_connect(200, 200, 201, 201, 201)
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '5'}, body='12345')
req.environ['swift.authorize'] = authorize
@@ -4672,8 +5431,8 @@ class TestObjectController(unittest.TestCase):
return HTTPUnauthorized(request=req)
with save_globals():
set_http_connect(200, 200, 200, 200, 200, 201, 201, 201)
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': 'c/o'})
@@ -4685,8 +5444,8 @@ class TestObjectController(unittest.TestCase):
def test_POST_converts_delete_after_to_delete_at(self):
with save_globals():
self.app.object_post_as_copy = False
- controller = proxy_server.ObjectController(self.app, 'account',
- 'container', 'object')
+ controller = ReplicatedObjectController(
+ self.app, 'account', 'container', 'object')
set_http_connect(200, 200, 202, 202, 202)
self.app.memcache.store = {}
orig_time = time.time
@@ -4709,6 +5468,8 @@ class TestObjectController(unittest.TestCase):
StoragePolicy(1, 'one', True, object_ring=FakeRing())
])
def test_PUT_versioning_with_nonzero_default_policy(self):
+ # reset the router post patch_policies
+ self.app.obj_controller_router = proxy_server.ObjectControllerRouter()
def test_connect(ipaddr, port, device, partition, method, path,
headers=None, query_string=None):
@@ -4734,8 +5495,8 @@ class TestObjectController(unittest.TestCase):
{'zone': 2, 'ip': '10.0.0.2', 'region': 0,
'id': 2, 'device': 'sdc', 'port': 1002}]}
with save_globals():
- controller = proxy_server.ObjectController(self.app, 'a',
- 'c', 'o.jpg')
+ controller = ReplicatedObjectController(
+ self.app, 'a', 'c', 'o.jpg')
controller.container_info = fake_container_info
set_http_connect(200, 200, 200, # head: for the last version
@@ -4756,6 +5517,8 @@ class TestObjectController(unittest.TestCase):
StoragePolicy(1, 'one', True, object_ring=FakeRing())
])
def test_cross_policy_DELETE_versioning(self):
+ # reset the router post patch_policies
+ self.app.obj_controller_router = proxy_server.ObjectControllerRouter()
requests = []
def capture_requests(ipaddr, port, device, partition, method, path,
@@ -4885,8 +5648,8 @@ class TestObjectController(unittest.TestCase):
def test_OPTIONS(self):
with save_globals():
- controller = proxy_server.ObjectController(self.app, 'a',
- 'c', 'o.jpg')
+ controller = ReplicatedObjectController(
+ self.app, 'a', 'c', 'o.jpg')
def my_empty_container_info(*args):
return {}
@@ -4993,7 +5756,8 @@ class TestObjectController(unittest.TestCase):
def test_CORS_valid(self):
with save_globals():
- controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
+ controller = ReplicatedObjectController(
+ self.app, 'a', 'c', 'o')
def stubContainerInfo(*args):
return {
@@ -5046,7 +5810,8 @@ class TestObjectController(unittest.TestCase):
def test_CORS_valid_with_obj_headers(self):
with save_globals():
- controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
+ controller = ReplicatedObjectController(
+ self.app, 'a', 'c', 'o')
def stubContainerInfo(*args):
return {
@@ -5107,7 +5872,8 @@ class TestObjectController(unittest.TestCase):
def test_PUT_x_container_headers_with_equal_replicas(self):
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '5'}, body='12345')
- controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
+ controller = ReplicatedObjectController(
+ self.app, 'a', 'c', 'o')
seen_headers = self._gather_x_container_headers(
controller.PUT, req,
200, 200, 201, 201, 201) # HEAD HEAD PUT PUT PUT
@@ -5128,7 +5894,8 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '5'}, body='12345')
- controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
+ controller = ReplicatedObjectController(
+ self.app, 'a', 'c', 'o')
seen_headers = self._gather_x_container_headers(
controller.PUT, req,
200, 200, 201, 201, 201) # HEAD HEAD PUT PUT PUT
@@ -5150,7 +5917,8 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '5'}, body='12345')
- controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
+ controller = ReplicatedObjectController(
+ self.app, 'a', 'c', 'o')
seen_headers = self._gather_x_container_headers(
controller.PUT, req,
200, 200, 201, 201, 201) # HEAD HEAD PUT PUT PUT
@@ -5174,7 +5942,8 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'Content-Type': 'application/stuff'})
- controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
+ controller = ReplicatedObjectController(
+ self.app, 'a', 'c', 'o')
seen_headers = self._gather_x_container_headers(
controller.POST, req,
200, 200, 200, 200, 200) # HEAD HEAD POST POST POST
@@ -5197,7 +5966,8 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'Content-Type': 'application/stuff'})
- controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
+ controller = ReplicatedObjectController(
+ self.app, 'a', 'c', 'o')
seen_headers = self._gather_x_container_headers(
controller.DELETE, req,
200, 200, 200, 200, 200) # HEAD HEAD DELETE DELETE DELETE
@@ -5226,7 +5996,8 @@ class TestObjectController(unittest.TestCase):
headers={'Content-Type': 'application/stuff',
'Content-Length': '0',
'X-Delete-At': str(delete_at_timestamp)})
- controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
+ controller = ReplicatedObjectController(
+ self.app, 'a', 'c', 'o')
seen_headers = self._gather_x_container_headers(
controller.PUT, req,
200, 200, 201, 201, 201, # HEAD HEAD PUT PUT PUT
@@ -5262,7 +6033,8 @@ class TestObjectController(unittest.TestCase):
headers={'Content-Type': 'application/stuff',
'Content-Length': 0,
'X-Delete-At': str(delete_at_timestamp)})
- controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
+ controller = ReplicatedObjectController(
+ self.app, 'a', 'c', 'o')
seen_headers = self._gather_x_container_headers(
controller.PUT, req,
200, 200, 201, 201, 201, # HEAD HEAD PUT PUT PUT
@@ -5284,6 +6056,373 @@ class TestObjectController(unittest.TestCase):
])
+class TestECMismatchedFA(unittest.TestCase):
+ def tearDown(self):
+ prosrv = _test_servers[0]
+ # don't leak error limits and poison other tests
+ prosrv._error_limiting = {}
+
+ def test_mixing_different_objects_fragment_archives(self):
+ (prosrv, acc1srv, acc2srv, con1srv, con2srv, obj1srv,
+ obj2srv, obj3srv) = _test_servers
+ ec_policy = POLICIES[3]
+
+ @public
+ def bad_disk(req):
+ return Response(status=507, body="borken")
+
+ ensure_container = Request.blank(
+ "/v1/a/ec-crazytown",
+ environ={"REQUEST_METHOD": "PUT"},
+ headers={"X-Storage-Policy": "ec", "X-Auth-Token": "t"})
+ resp = ensure_container.get_response(prosrv)
+ self.assertTrue(resp.status_int in (201, 202))
+
+ obj1 = "first version..."
+ put_req1 = Request.blank(
+ "/v1/a/ec-crazytown/obj",
+ environ={"REQUEST_METHOD": "PUT"},
+ headers={"X-Auth-Token": "t"})
+ put_req1.body = obj1
+
+ obj2 = u"versiĆ³n segundo".encode("utf-8")
+ put_req2 = Request.blank(
+ "/v1/a/ec-crazytown/obj",
+ environ={"REQUEST_METHOD": "PUT"},
+ headers={"X-Auth-Token": "t"})
+ put_req2.body = obj2
+
+ # pyeclib has checks for unequal-length; we don't want to trip those
+ self.assertEqual(len(obj1), len(obj2))
+
+ # Servers obj1 and obj2 will have the first version of the object
+ prosrv._error_limiting = {}
+ with nested(
+ mock.patch.object(obj3srv, 'PUT', bad_disk),
+ mock.patch(
+ 'swift.common.storage_policy.ECStoragePolicy.quorum')):
+ type(ec_policy).quorum = mock.PropertyMock(return_value=2)
+ resp = put_req1.get_response(prosrv)
+ self.assertEqual(resp.status_int, 201)
+
+ # Server obj3 (and, in real life, some handoffs) will have the
+ # second version of the object.
+ prosrv._error_limiting = {}
+ with nested(
+ mock.patch.object(obj1srv, 'PUT', bad_disk),
+ mock.patch.object(obj2srv, 'PUT', bad_disk),
+ mock.patch(
+ 'swift.common.storage_policy.ECStoragePolicy.quorum'),
+ mock.patch(
+ 'swift.proxy.controllers.base.Controller._quorum_size',
+ lambda *a, **kw: 1)):
+ type(ec_policy).quorum = mock.PropertyMock(return_value=1)
+ resp = put_req2.get_response(prosrv)
+ self.assertEqual(resp.status_int, 201)
+
+ # A GET that only sees 1 fragment archive should fail
+ get_req = Request.blank("/v1/a/ec-crazytown/obj",
+ environ={"REQUEST_METHOD": "GET"},
+ headers={"X-Auth-Token": "t"})
+ prosrv._error_limiting = {}
+ with nested(
+ mock.patch.object(obj1srv, 'GET', bad_disk),
+ mock.patch.object(obj2srv, 'GET', bad_disk)):
+ resp = get_req.get_response(prosrv)
+ self.assertEqual(resp.status_int, 503)
+
+ # A GET that sees 2 matching FAs will work
+ get_req = Request.blank("/v1/a/ec-crazytown/obj",
+ environ={"REQUEST_METHOD": "GET"},
+ headers={"X-Auth-Token": "t"})
+ prosrv._error_limiting = {}
+ with mock.patch.object(obj3srv, 'GET', bad_disk):
+ resp = get_req.get_response(prosrv)
+ self.assertEqual(resp.status_int, 200)
+ self.assertEqual(resp.body, obj1)
+
+ # A GET that sees 2 mismatching FAs will fail
+ get_req = Request.blank("/v1/a/ec-crazytown/obj",
+ environ={"REQUEST_METHOD": "GET"},
+ headers={"X-Auth-Token": "t"})
+ prosrv._error_limiting = {}
+ with mock.patch.object(obj2srv, 'GET', bad_disk):
+ resp = get_req.get_response(prosrv)
+ self.assertEqual(resp.status_int, 503)
+
+
+class TestObjectECRangedGET(unittest.TestCase):
+ def setUp(self):
+ self.app = proxy_server.Application(
+ None, FakeMemcache(),
+ logger=debug_logger('proxy-ut'),
+ account_ring=FakeRing(),
+ container_ring=FakeRing())
+
+ @classmethod
+ def setUpClass(cls):
+ cls.obj_name = 'range-get-test'
+ cls.tiny_obj_name = 'range-get-test-tiny'
+ cls.aligned_obj_name = 'range-get-test-aligned'
+
+ # Note: only works if called with unpatched policies
+ prolis = _test_sockets[0]
+ sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+ fd = sock.makefile()
+ fd.write('PUT /v1/a/ec-con HTTP/1.1\r\n'
+ 'Host: localhost\r\n'
+ 'Connection: close\r\n'
+ 'Content-Length: 0\r\n'
+ 'X-Storage-Token: t\r\n'
+ 'X-Storage-Policy: ec\r\n'
+ '\r\n')
+ fd.flush()
+ headers = readuntil2crlfs(fd)
+ exp = 'HTTP/1.1 2'
+ assert headers[:len(exp)] == exp, "container PUT failed"
+
+ seg_size = POLICIES.get_by_name("ec").ec_segment_size
+ cls.seg_size = seg_size
+ # EC segment size is 4 KiB, hence this gives 4 segments, which we
+ # then verify with a quick sanity check
+ cls.obj = ' my hovercraft is full of eels '.join(
+ str(s) for s in range(431))
+ assert seg_size * 4 > len(cls.obj) > seg_size * 3, \
+ "object is wrong number of segments"
+
+ cls.tiny_obj = 'tiny, tiny object'
+ assert len(cls.tiny_obj) < seg_size, "tiny_obj too large"
+
+ cls.aligned_obj = "".join(
+ "abcdEFGHijkl%04d" % x for x in range(512))
+ assert len(cls.aligned_obj) % seg_size == 0, "aligned obj not aligned"
+
+ for obj_name, obj in ((cls.obj_name, cls.obj),
+ (cls.tiny_obj_name, cls.tiny_obj),
+ (cls.aligned_obj_name, cls.aligned_obj)):
+ sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+ fd = sock.makefile()
+ fd.write('PUT /v1/a/ec-con/%s HTTP/1.1\r\n'
+ 'Host: localhost\r\n'
+ 'Connection: close\r\n'
+ 'Content-Length: %d\r\n'
+ 'X-Storage-Token: t\r\n'
+ 'Content-Type: application/octet-stream\r\n'
+ '\r\n%s' % (obj_name, len(obj), obj))
+ fd.flush()
+ headers = readuntil2crlfs(fd)
+ exp = 'HTTP/1.1 201'
+ assert headers[:len(exp)] == exp, \
+ "object PUT failed %s" % obj_name
+
+ def _get_obj(self, range_value, obj_name=None):
+ if obj_name is None:
+ obj_name = self.obj_name
+
+ prolis = _test_sockets[0]
+ sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+ fd = sock.makefile()
+ fd.write('GET /v1/a/ec-con/%s HTTP/1.1\r\n'
+ 'Host: localhost\r\n'
+ 'Connection: close\r\n'
+ 'X-Storage-Token: t\r\n'
+ 'Range: %s\r\n'
+ '\r\n' % (obj_name, range_value))
+ fd.flush()
+ headers = readuntil2crlfs(fd)
+ # e.g. "HTTP/1.1 206 Partial Content\r\n..."
+ status_code = int(headers[9:12])
+ headers = parse_headers_string(headers)
+
+ gotten_obj = ''
+ while True:
+ buf = fd.read(64)
+ if not buf:
+ break
+ gotten_obj += buf
+
+ return (status_code, headers, gotten_obj)
+
+ def test_unaligned(self):
+ # One segment's worth of data, but straddling two segment boundaries
+ # (so it has data from three segments)
+ status, headers, gotten_obj = self._get_obj("bytes=3783-7878")
+ self.assertEqual(status, 206)
+ self.assertEqual(headers['Content-Length'], "4096")
+ self.assertEqual(headers['Content-Range'], "bytes 3783-7878/14513")
+ self.assertEqual(len(gotten_obj), 4096)
+ self.assertEqual(gotten_obj, self.obj[3783:7879])
+
+ def test_aligned_left(self):
+ # First byte is aligned to a segment boundary, last byte is not
+ status, headers, gotten_obj = self._get_obj("bytes=0-5500")
+ self.assertEqual(status, 206)
+ self.assertEqual(headers['Content-Length'], "5501")
+ self.assertEqual(headers['Content-Range'], "bytes 0-5500/14513")
+ self.assertEqual(len(gotten_obj), 5501)
+ self.assertEqual(gotten_obj, self.obj[:5501])
+
+ def test_aligned_range(self):
+ # Ranged GET that wants exactly one segment
+ status, headers, gotten_obj = self._get_obj("bytes=4096-8191")
+ self.assertEqual(status, 206)
+ self.assertEqual(headers['Content-Length'], "4096")
+ self.assertEqual(headers['Content-Range'], "bytes 4096-8191/14513")
+ self.assertEqual(len(gotten_obj), 4096)
+ self.assertEqual(gotten_obj, self.obj[4096:8192])
+
+ def test_aligned_range_end(self):
+ # Ranged GET that wants exactly the last segment
+ status, headers, gotten_obj = self._get_obj("bytes=12288-14512")
+ self.assertEqual(status, 206)
+ self.assertEqual(headers['Content-Length'], "2225")
+ self.assertEqual(headers['Content-Range'], "bytes 12288-14512/14513")
+ self.assertEqual(len(gotten_obj), 2225)
+ self.assertEqual(gotten_obj, self.obj[12288:])
+
+ def test_aligned_range_aligned_obj(self):
+ # Ranged GET that wants exactly the last segment, which is full-size
+ status, headers, gotten_obj = self._get_obj("bytes=4096-8191",
+ self.aligned_obj_name)
+ self.assertEqual(status, 206)
+ self.assertEqual(headers['Content-Length'], "4096")
+ self.assertEqual(headers['Content-Range'], "bytes 4096-8191/8192")
+ self.assertEqual(len(gotten_obj), 4096)
+ self.assertEqual(gotten_obj, self.aligned_obj[4096:8192])
+
+ def test_byte_0(self):
+ # Just the first byte, but it's index 0, so that's easy to get wrong
+ status, headers, gotten_obj = self._get_obj("bytes=0-0")
+ self.assertEqual(status, 206)
+ self.assertEqual(headers['Content-Length'], "1")
+ self.assertEqual(headers['Content-Range'], "bytes 0-0/14513")
+ self.assertEqual(gotten_obj, self.obj[0])
+
+ def test_unsatisfiable(self):
+ # Goes just one byte too far off the end of the object, so it's
+ # unsatisfiable
+ status, _junk, _junk = self._get_obj(
+ "bytes=%d-%d" % (len(self.obj), len(self.obj) + 100))
+ self.assertEqual(status, 416)
+
+ def test_off_end(self):
+ # Ranged GET that's mostly off the end of the object, but overlaps
+ # it in just the last byte
+ status, headers, gotten_obj = self._get_obj(
+ "bytes=%d-%d" % (len(self.obj) - 1, len(self.obj) + 100))
+ self.assertEqual(status, 206)
+ self.assertEqual(headers['Content-Length'], '1')
+ self.assertEqual(headers['Content-Range'], 'bytes 14512-14512/14513')
+ self.assertEqual(gotten_obj, self.obj[-1])
+
+ def test_aligned_off_end(self):
+ # Ranged GET that starts on a segment boundary but asks for a whole lot
+ status, headers, gotten_obj = self._get_obj(
+ "bytes=%d-%d" % (8192, len(self.obj) + 100))
+ self.assertEqual(status, 206)
+ self.assertEqual(headers['Content-Length'], '6321')
+ self.assertEqual(headers['Content-Range'], 'bytes 8192-14512/14513')
+ self.assertEqual(gotten_obj, self.obj[8192:])
+
+ def test_way_off_end(self):
+ # Ranged GET that's mostly off the end of the object, but overlaps
+ # it in just the last byte, and wants multiple segments' worth off
+ # the end
+ status, headers, gotten_obj = self._get_obj(
+ "bytes=%d-%d" % (len(self.obj) - 1, len(self.obj) * 1000))
+ self.assertEqual(status, 206)
+ self.assertEqual(headers['Content-Length'], '1')
+ self.assertEqual(headers['Content-Range'], 'bytes 14512-14512/14513')
+ self.assertEqual(gotten_obj, self.obj[-1])
+
+ def test_boundaries(self):
+ # Wants the last byte of segment 1 + the first byte of segment 2
+ status, headers, gotten_obj = self._get_obj("bytes=4095-4096")
+ self.assertEqual(status, 206)
+ self.assertEqual(headers['Content-Length'], '2')
+ self.assertEqual(headers['Content-Range'], 'bytes 4095-4096/14513')
+ self.assertEqual(gotten_obj, self.obj[4095:4097])
+
+ def test_until_end(self):
+ # Wants the last byte of segment 1 + the rest
+ status, headers, gotten_obj = self._get_obj("bytes=4095-")
+ self.assertEqual(status, 206)
+ self.assertEqual(headers['Content-Length'], '10418')
+ self.assertEqual(headers['Content-Range'], 'bytes 4095-14512/14513')
+ self.assertEqual(gotten_obj, self.obj[4095:])
+
+ def test_small_suffix(self):
+ # Small range-suffix GET: the last 100 bytes (less than one segment)
+ status, headers, gotten_obj = self._get_obj("bytes=-100")
+ self.assertEqual(status, 206)
+ self.assertEqual(headers['Content-Length'], '100')
+ self.assertEqual(headers['Content-Range'], 'bytes 14413-14512/14513')
+ self.assertEqual(len(gotten_obj), 100)
+ self.assertEqual(gotten_obj, self.obj[-100:])
+
+ def test_small_suffix_aligned(self):
+ # Small range-suffix GET: the last 100 bytes, last segment is
+ # full-size
+ status, headers, gotten_obj = self._get_obj("bytes=-100",
+ self.aligned_obj_name)
+ self.assertEqual(status, 206)
+ self.assertEqual(headers['Content-Length'], '100')
+ self.assertEqual(headers['Content-Range'], 'bytes 8092-8191/8192')
+ self.assertEqual(len(gotten_obj), 100)
+
+ def test_suffix_two_segs(self):
+ # Ask for enough data that we need the last two segments. The last
+ # segment is short, though, so this ensures we compensate for that.
+ #
+ # Note that the total range size is less than one full-size segment.
+ suffix_len = len(self.obj) % self.seg_size + 1
+
+ status, headers, gotten_obj = self._get_obj("bytes=-%d" % suffix_len)
+ self.assertEqual(status, 206)
+ self.assertEqual(headers['Content-Length'], str(suffix_len))
+ self.assertEqual(headers['Content-Range'],
+ 'bytes %d-%d/%d' % (len(self.obj) - suffix_len,
+ len(self.obj) - 1,
+ len(self.obj)))
+ self.assertEqual(len(gotten_obj), suffix_len)
+
+ def test_large_suffix(self):
+ # Large range-suffix GET: the last 5000 bytes (more than one segment)
+ status, headers, gotten_obj = self._get_obj("bytes=-5000")
+ self.assertEqual(status, 206)
+ self.assertEqual(headers['Content-Length'], '5000')
+ self.assertEqual(headers['Content-Range'], 'bytes 9513-14512/14513')
+ self.assertEqual(len(gotten_obj), 5000)
+ self.assertEqual(gotten_obj, self.obj[-5000:])
+
+ def test_overlarge_suffix(self):
+ # The last N+1 bytes of an N-byte object
+ status, headers, gotten_obj = self._get_obj(
+ "bytes=-%d" % (len(self.obj) + 1))
+ self.assertEqual(status, 206)
+ self.assertEqual(headers['Content-Length'], '14513')
+ self.assertEqual(headers['Content-Range'], 'bytes 0-14512/14513')
+ self.assertEqual(len(gotten_obj), len(self.obj))
+ self.assertEqual(gotten_obj, self.obj)
+
+ def test_small_suffix_tiny_object(self):
+ status, headers, gotten_obj = self._get_obj(
+ "bytes=-5", self.tiny_obj_name)
+ self.assertEqual(status, 206)
+ self.assertEqual(headers['Content-Length'], '5')
+ self.assertEqual(headers['Content-Range'], 'bytes 12-16/17')
+ self.assertEqual(gotten_obj, self.tiny_obj[12:])
+
+ def test_overlarge_suffix_tiny_object(self):
+ status, headers, gotten_obj = self._get_obj(
+ "bytes=-1234567890", self.tiny_obj_name)
+ self.assertEqual(status, 206)
+ self.assertEqual(headers['Content-Length'], '17')
+ self.assertEqual(headers['Content-Range'], 'bytes 0-16/17')
+ self.assertEqual(len(gotten_obj), len(self.tiny_obj))
+ self.assertEqual(gotten_obj, self.tiny_obj)
+
+
@patch_policies([
StoragePolicy(0, 'zero', True, object_ring=FakeRing(base_port=3000)),
StoragePolicy(1, 'one', False, object_ring=FakeRing(base_port=3000)),
@@ -5526,7 +6665,7 @@ class TestContainerController(unittest.TestCase):
headers)
self.assertEqual(int(headers
['X-Backend-Storage-Policy-Index']),
- policy.idx)
+ int(policy))
# make sure all mocked responses are consumed
self.assertRaises(StopIteration, mock_conn.code_iter.next)