summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Merritt <sam@swiftstack.com>2014-09-16 18:40:41 -0700
committerClay Gerrard <clay.gerrard@gmail.com>2015-04-14 00:52:17 -0700
commitb1eda4aef8a228961d5aafe7e4fbd4e812d233ad (patch)
tree42c051e086beec1b9640b0ea925973aceb9c94c5
parentfa89064933fefa33702520b40734c11f08b2c569 (diff)
downloadswift-b1eda4aef8a228961d5aafe7e4fbd4e812d233ad.tar.gz
Allow sending object metadata after data
This lets the proxy server send object metadata to the object server after the object data. This is necessary for EC, as it allows us to compute the etag of the object in the proxy server and still store it with the object. The wire format is a multipart MIME document. For sanity during a rolling upgrade, the multipart MIME document is only sent to the object server if it indicates, via 100 Continue header, that it knows how to consume it. Example 1 (new proxy, new obj server): proxy: PUT /p/a/c/o X-Backend-Obj-Metadata-Footer: yes obj: 100 Continue X-Obj-Metadata-Footer: yes proxy: --MIMEmimeMIMEmime... Example2: (new proxy, old obj server) proxy: PUT /p/a/c/o X-Backend-Obj-Metadata-Footer: yes obj: 100 Continue proxy: <obj body> Co-Authored-By: Alistair Coles <alistair.coles@hp.com> Co-Authored-By: Thiago da Silva <thiago@redhat.com> Co-Authored-By: John Dickinson <me@not.mn> Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com> Co-Authored-By: Tushar Gohad <tushar.gohad@intel.com> Co-Authored-By: Paul Luse <paul.e.luse@intel.com> Co-Authored-By: Christian Schwede <christian.schwede@enovance.com> Co-Authored-By: Yuan Zhou <yuan.zhou@intel.com> Change-Id: Id38f7e93e3473f19ff88123ae0501000ed9b2e89
-rw-r--r--swift/common/request_helpers.py28
-rw-r--r--swift/common/swob.py63
-rw-r--r--swift/obj/server.py275
-rw-r--r--swift/obj/ssync_receiver.py17
-rw-r--r--swift/proxy/controllers/obj.py3
-rw-r--r--test/unit/common/test_request_helpers.py81
-rw-r--r--test/unit/common/test_swob.py21
-rwxr-xr-xtest/unit/obj/test_server.py954
-rw-r--r--test/unit/obj/test_ssync_receiver.py66
9 files changed, 1333 insertions, 175 deletions
diff --git a/swift/common/request_helpers.py b/swift/common/request_helpers.py
index 08e0ab5dc..14b9fd884 100644
--- a/swift/common/request_helpers.py
+++ b/swift/common/request_helpers.py
@@ -26,10 +26,12 @@ import time
from contextlib import contextmanager
from urllib import unquote
from swift import gettext_ as _
+from swift.common.storage_policy import POLICIES
from swift.common.constraints import FORMAT2CONTENT_TYPE
from swift.common.exceptions import ListingIterError, SegmentError
from swift.common.http import is_success
-from swift.common.swob import HTTPBadRequest, HTTPNotAcceptable
+from swift.common.swob import (HTTPBadRequest, HTTPNotAcceptable,
+ HTTPServiceUnavailable)
from swift.common.utils import split_path, validate_device_partition
from swift.common.wsgi import make_subrequest
@@ -82,21 +84,27 @@ def get_listing_content_type(req):
def get_name_and_placement(request, minsegs=1, maxsegs=None,
rest_with_last=False):
"""
- Utility function to split and validate the request path and
- storage_policy_index. The storage_policy_index is extracted from
- the headers of the request and converted to an integer, and then the
- args are passed through to :meth:`split_and_validate_path`.
+ Utility function to split and validate the request path and storage
+ policy. The storage policy index is extracted from the headers of
+ the request and converted to a StoragePolicy instance. The
+ remaining args are passed through to
+ :meth:`split_and_validate_path`.
:returns: a list, result of :meth:`split_and_validate_path` with
- storage_policy_index appended on the end
- :raises: HTTPBadRequest
+ the BaseStoragePolicy instance appended on the end
+    :raises: HTTPServiceUnavailable if no policy exists with the
+        extracted policy_index.
"""
- policy_idx = request.headers.get('X-Backend-Storage-Policy-Index', '0')
- policy_idx = int(policy_idx)
+ policy_index = request.headers.get('X-Backend-Storage-Policy-Index')
+ policy = POLICIES.get_by_index(policy_index)
+ if not policy:
+ raise HTTPServiceUnavailable(
+ body=_("No policy with index %s") % policy_index,
+ request=request, content_type='text/plain')
results = split_and_validate_path(request, minsegs=minsegs,
maxsegs=maxsegs,
rest_with_last=rest_with_last)
- results.append(policy_idx)
+ results.append(policy)
return results
diff --git a/swift/common/swob.py b/swift/common/swob.py
index 729cdd96f..c2e3afb4e 100644
--- a/swift/common/swob.py
+++ b/swift/common/swob.py
@@ -36,7 +36,7 @@ needs to change.
"""
from collections import defaultdict
-from cStringIO import StringIO
+from StringIO import StringIO
import UserDict
import time
from functools import partial
@@ -128,6 +128,20 @@ class _UTC(tzinfo):
UTC = _UTC()
+class WsgiStringIO(StringIO):
+ """
+ This class adds support for the additional wsgi.input methods defined on
+ eventlet.wsgi.Input to the StringIO class which would otherwise be a fine
+ stand-in for the file-like object in the WSGI environment.
+ """
+
+ def set_hundred_continue_response_headers(self, headers):
+ pass
+
+ def send_hundred_continue_response(self):
+ pass
+
+
def _datetime_property(header):
"""
Set and retrieve the datetime value of self.headers[header]
@@ -743,16 +757,16 @@ def _req_environ_property(environ_field):
def _req_body_property():
"""
Set and retrieve the Request.body parameter. It consumes wsgi.input and
- returns the results. On assignment, uses a StringIO to create a new
+ returns the results. On assignment, uses a WsgiStringIO to create a new
wsgi.input.
"""
def getter(self):
body = self.environ['wsgi.input'].read()
- self.environ['wsgi.input'] = StringIO(body)
+ self.environ['wsgi.input'] = WsgiStringIO(body)
return body
def setter(self, value):
- self.environ['wsgi.input'] = StringIO(value)
+ self.environ['wsgi.input'] = WsgiStringIO(value)
self.environ['CONTENT_LENGTH'] = str(len(value))
return property(getter, setter, doc="Get and set the request body str")
@@ -820,7 +834,7 @@ class Request(object):
:param path: encoded, parsed, and unquoted into PATH_INFO
:param environ: WSGI environ dictionary
:param headers: HTTP headers
- :param body: stuffed in a StringIO and hung on wsgi.input
+ :param body: stuffed in a WsgiStringIO and hung on wsgi.input
:param kwargs: any environ key with an property setter
"""
headers = headers or {}
@@ -855,10 +869,10 @@ class Request(object):
}
env.update(environ)
if body is not None:
- env['wsgi.input'] = StringIO(body)
+ env['wsgi.input'] = WsgiStringIO(body)
env['CONTENT_LENGTH'] = str(len(body))
elif 'wsgi.input' not in env:
- env['wsgi.input'] = StringIO('')
+ env['wsgi.input'] = WsgiStringIO('')
req = Request(env)
for key, val in headers.iteritems():
req.headers[key] = val
@@ -965,7 +979,7 @@ class Request(object):
env.update({
'REQUEST_METHOD': 'GET',
'CONTENT_LENGTH': '0',
- 'wsgi.input': StringIO(''),
+ 'wsgi.input': WsgiStringIO(''),
})
return Request(env)
@@ -1102,10 +1116,12 @@ class Response(object):
app_iter = _resp_app_iter_property()
def __init__(self, body=None, status=200, headers=None, app_iter=None,
- request=None, conditional_response=False, **kw):
+ request=None, conditional_response=False,
+ conditional_etag=None, **kw):
self.headers = HeaderKeyDict(
[('Content-Type', 'text/html; charset=UTF-8')])
self.conditional_response = conditional_response
+ self._conditional_etag = conditional_etag
self.request = request
self.body = body
self.app_iter = app_iter
@@ -1131,6 +1147,26 @@ class Response(object):
if 'charset' in kw and 'content_type' in kw:
self.charset = kw['charset']
+ @property
+ def conditional_etag(self):
+ """
+ The conditional_etag keyword argument for Response will allow the
+ conditional match value of a If-Match request to be compared to a
+ non-standard value.
+
+        This is available for Storage Policies that do not store the client
+        object data verbatim on the storage nodes, but still need to
+        support conditional requests.
+
+        It's most effectively used with X-Backend-Etag-Is-At which would
+        define the additional Metadata key where the original ETag of the
+        clear-form client request data is stored.
+ """
+ if self._conditional_etag is not None:
+ return self._conditional_etag
+ else:
+ return self.etag
+
def _prepare_for_ranges(self, ranges):
"""
Prepare the Response for multiple ranges.
@@ -1161,15 +1197,16 @@ class Response(object):
return content_size, content_type
def _response_iter(self, app_iter, body):
+ etag = self.conditional_etag
if self.conditional_response and self.request:
- if self.etag and self.request.if_none_match and \
- self.etag in self.request.if_none_match:
+ if etag and self.request.if_none_match and \
+ etag in self.request.if_none_match:
self.status = 304
self.content_length = 0
return ['']
- if self.etag and self.request.if_match and \
- self.etag not in self.request.if_match:
+ if etag and self.request.if_match and \
+ etag not in self.request.if_match:
self.status = 412
self.content_length = 0
return ['']
diff --git a/swift/obj/server.py b/swift/obj/server.py
index f4dbb4264..0ebaeb5f0 100644
--- a/swift/obj/server.py
+++ b/swift/obj/server.py
@@ -16,10 +16,12 @@
""" Object Server for Swift """
import cPickle as pickle
+import json
import os
import multiprocessing
import time
import traceback
+import rfc822
import socket
import math
from swift import gettext_ as _
@@ -30,7 +32,7 @@ from eventlet import sleep, wsgi, Timeout
from swift.common.utils import public, get_logger, \
config_true_value, timing_stats, replication, \
normalize_delete_at_timestamp, get_log_line, Timestamp, \
- get_expirer_container
+ get_expirer_container, iter_multipart_mime_documents
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_object_creation, \
valid_timestamp, check_utf8
@@ -48,8 +50,35 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
HTTPPreconditionFailed, HTTPRequestTimeout, HTTPUnprocessableEntity, \
HTTPClientDisconnect, HTTPMethodNotAllowed, Request, Response, \
HTTPInsufficientStorage, HTTPForbidden, HTTPException, HeaderKeyDict, \
- HTTPConflict
-from swift.obj.diskfile import DATAFILE_SYSTEM_META, DiskFileManager
+ HTTPConflict, HTTPServerError
+from swift.obj.diskfile import DATAFILE_SYSTEM_META, DiskFileRouter
+
+
+def iter_mime_headers_and_bodies(wsgi_input, mime_boundary, read_chunk_size):
+ mime_documents_iter = iter_multipart_mime_documents(
+ wsgi_input, mime_boundary, read_chunk_size)
+
+ for file_like in mime_documents_iter:
+ hdrs = HeaderKeyDict(rfc822.Message(file_like, 0))
+ yield (hdrs, file_like)
+
+
+def drain(file_like, read_size, timeout):
+ """
+ Read and discard any bytes from file_like.
+
+ :param file_like: file-like object to read from
+ :param read_size: how big a chunk to read at a time
+ :param timeout: how long to wait for a read (use None for no timeout)
+
+ :raises ChunkReadTimeout: if no chunk was read in time
+ """
+
+ while True:
+ with ChunkReadTimeout(timeout):
+ chunk = file_like.read(read_size)
+ if not chunk:
+ break
class EventletPlungerString(str):
@@ -142,7 +171,7 @@ class ObjectController(BaseStorageServer):
# Common on-disk hierarchy shared across account, container and object
# servers.
- self._diskfile_mgr = DiskFileManager(conf, self.logger)
+ self._diskfile_router = DiskFileRouter(conf, self.logger)
# This is populated by global_conf_callback way below as the semaphore
# is shared by all workers.
if 'replication_semaphore' in conf:
@@ -156,7 +185,7 @@ class ObjectController(BaseStorageServer):
conf.get('replication_failure_ratio') or 1.0)
def get_diskfile(self, device, partition, account, container, obj,
- policy_idx, **kwargs):
+ policy, **kwargs):
"""
Utility method for instantiating a DiskFile object supporting a given
REST API.
@@ -165,11 +194,11 @@ class ObjectController(BaseStorageServer):
DiskFile class would simply over-ride this method to provide that
behavior.
"""
- return self._diskfile_mgr.get_diskfile(
- device, partition, account, container, obj, policy_idx, **kwargs)
+ return self._diskfile_router[policy].get_diskfile(
+ device, partition, account, container, obj, policy, **kwargs)
def async_update(self, op, account, container, obj, host, partition,
- contdevice, headers_out, objdevice, policy_index):
+ contdevice, headers_out, objdevice, policy):
"""
Sends or saves an async update.
@@ -183,7 +212,7 @@ class ObjectController(BaseStorageServer):
:param headers_out: dictionary of headers to send in the container
request
:param objdevice: device name that the object is in
- :param policy_index: the associated storage policy index
+ :param policy: the associated BaseStoragePolicy instance
"""
headers_out['user-agent'] = 'object-server %s' % os.getpid()
full_path = '/%s/%s/%s' % (account, container, obj)
@@ -213,12 +242,11 @@ class ObjectController(BaseStorageServer):
data = {'op': op, 'account': account, 'container': container,
'obj': obj, 'headers': headers_out}
timestamp = headers_out['x-timestamp']
- self._diskfile_mgr.pickle_async_update(objdevice, account, container,
- obj, data, timestamp,
- policy_index)
+ self._diskfile_router[policy].pickle_async_update(
+ objdevice, account, container, obj, data, timestamp, policy)
def container_update(self, op, account, container, obj, request,
- headers_out, objdevice, policy_idx):
+ headers_out, objdevice, policy):
"""
Update the container when objects are updated.
@@ -230,6 +258,7 @@ class ObjectController(BaseStorageServer):
:param headers_out: dictionary of headers to send in the container
request(s)
:param objdevice: device name that the object is in
+ :param policy: the BaseStoragePolicy instance
"""
headers_in = request.headers
conthosts = [h.strip() for h in
@@ -255,14 +284,14 @@ class ObjectController(BaseStorageServer):
headers_out['x-trans-id'] = headers_in.get('x-trans-id', '-')
headers_out['referer'] = request.as_referer()
- headers_out['X-Backend-Storage-Policy-Index'] = policy_idx
+ headers_out['X-Backend-Storage-Policy-Index'] = int(policy)
for conthost, contdevice in updates:
self.async_update(op, account, container, obj, conthost,
contpartition, contdevice, headers_out,
- objdevice, policy_idx)
+ objdevice, policy)
def delete_at_update(self, op, delete_at, account, container, obj,
- request, objdevice, policy_index):
+ request, objdevice, policy):
"""
Update the expiring objects container when objects are updated.
@@ -273,7 +302,7 @@ class ObjectController(BaseStorageServer):
:param obj: object name
:param request: the original request driving the update
:param objdevice: device name that the object is in
- :param policy_index: the policy index to be used for tmp dir
+ :param policy: the BaseStoragePolicy instance (used for tmp dir)
"""
if config_true_value(
request.headers.get('x-backend-replication', 'f')):
@@ -333,13 +362,66 @@ class ObjectController(BaseStorageServer):
op, self.expiring_objects_account, delete_at_container,
'%s-%s/%s/%s' % (delete_at, account, container, obj),
host, partition, contdevice, headers_out, objdevice,
- policy_index)
+ policy)
+
+ def _make_timeout_reader(self, file_like):
+ def timeout_reader():
+ with ChunkReadTimeout(self.client_timeout):
+ return file_like.read(self.network_chunk_size)
+ return timeout_reader
+
+ def _read_put_commit_message(self, mime_documents_iter):
+ rcvd_commit = False
+ try:
+ with ChunkReadTimeout(self.client_timeout):
+ commit_hdrs, commit_iter = next(mime_documents_iter)
+ if commit_hdrs.get('X-Document', None) == "put commit":
+ rcvd_commit = True
+ drain(commit_iter, self.network_chunk_size, self.client_timeout)
+ except ChunkReadTimeout:
+ raise HTTPClientDisconnect()
+ except StopIteration:
+ raise HTTPBadRequest(body="couldn't find PUT commit MIME doc")
+ return rcvd_commit
+
+ def _read_metadata_footer(self, mime_documents_iter):
+ try:
+ with ChunkReadTimeout(self.client_timeout):
+ footer_hdrs, footer_iter = next(mime_documents_iter)
+ except ChunkReadTimeout:
+ raise HTTPClientDisconnect()
+ except StopIteration:
+ raise HTTPBadRequest(body="couldn't find footer MIME doc")
+
+ timeout_reader = self._make_timeout_reader(footer_iter)
+ try:
+ footer_body = ''.join(iter(timeout_reader, ''))
+ except ChunkReadTimeout:
+ raise HTTPClientDisconnect()
+
+ footer_md5 = footer_hdrs.get('Content-MD5')
+ if not footer_md5:
+ raise HTTPBadRequest(body="no Content-MD5 in footer")
+ if footer_md5 != md5(footer_body).hexdigest():
+ raise HTTPUnprocessableEntity(body="footer MD5 mismatch")
+
+ try:
+ return HeaderKeyDict(json.loads(footer_body))
+ except ValueError:
+ raise HTTPBadRequest("invalid JSON for footer doc")
+
+ def _check_container_override(self, update_headers, metadata):
+ for key, val in metadata.iteritems():
+ override_prefix = 'x-backend-container-update-override-'
+ if key.lower().startswith(override_prefix):
+ override = key.lower().replace(override_prefix, 'x-')
+ update_headers[override] = val
@public
@timing_stats()
def POST(self, request):
"""Handle HTTP POST requests for the Swift Object Server."""
- device, partition, account, container, obj, policy_idx = \
+ device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True)
req_timestamp = valid_timestamp(request)
new_delete_at = int(request.headers.get('X-Delete-At') or 0)
@@ -349,7 +431,7 @@ class ObjectController(BaseStorageServer):
try:
disk_file = self.get_diskfile(
device, partition, account, container, obj,
- policy_idx=policy_idx)
+ policy=policy)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
try:
@@ -374,11 +456,11 @@ class ObjectController(BaseStorageServer):
if orig_delete_at != new_delete_at:
if new_delete_at:
self.delete_at_update('PUT', new_delete_at, account, container,
- obj, request, device, policy_idx)
+ obj, request, device, policy)
if orig_delete_at:
self.delete_at_update('DELETE', orig_delete_at, account,
container, obj, request, device,
- policy_idx)
+ policy)
try:
disk_file.write_metadata(metadata)
except (DiskFileXattrNotSupported, DiskFileNoSpace):
@@ -389,7 +471,7 @@ class ObjectController(BaseStorageServer):
@timing_stats()
def PUT(self, request):
"""Handle HTTP PUT requests for the Swift Object Server."""
- device, partition, account, container, obj, policy_idx = \
+ device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True)
req_timestamp = valid_timestamp(request)
error_response = check_object_creation(request, obj)
@@ -404,10 +486,22 @@ class ObjectController(BaseStorageServer):
except ValueError as e:
return HTTPBadRequest(body=str(e), request=request,
content_type='text/plain')
+
+ # In case of multipart-MIME put, the proxy sends a chunked request,
+ # but may let us know the real content length so we can verify that
+ # we have enough disk space to hold the object.
+ if fsize is None:
+ fsize = request.headers.get('X-Backend-Obj-Content-Length')
+ if fsize is not None:
+ try:
+ fsize = int(fsize)
+ except ValueError as e:
+ return HTTPBadRequest(body=str(e), request=request,
+ content_type='text/plain')
try:
disk_file = self.get_diskfile(
device, partition, account, container, obj,
- policy_idx=policy_idx)
+ policy=policy)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
try:
@@ -439,13 +533,51 @@ class ObjectController(BaseStorageServer):
with disk_file.create(size=fsize) as writer:
upload_size = 0
- def timeout_reader():
- with ChunkReadTimeout(self.client_timeout):
- return request.environ['wsgi.input'].read(
- self.network_chunk_size)
+ # If the proxy wants to send us object metadata after the
+ # object body, it sets some headers. We have to tell the
+ # proxy, in the 100 Continue response, that we're able to
+ # parse a multipart MIME document and extract the object and
+ # metadata from it. If we don't, then the proxy won't
+ # actually send the footer metadata.
+ have_metadata_footer = False
+ use_multiphase_commit = False
+ mime_documents_iter = iter([])
+ obj_input = request.environ['wsgi.input']
+
+ hundred_continue_headers = []
+ if config_true_value(
+ request.headers.get(
+ 'X-Backend-Obj-Multiphase-Commit')):
+ use_multiphase_commit = True
+ hundred_continue_headers.append(
+ ('X-Obj-Multiphase-Commit', 'yes'))
+
+ if config_true_value(
+ request.headers.get('X-Backend-Obj-Metadata-Footer')):
+ have_metadata_footer = True
+ hundred_continue_headers.append(
+ ('X-Obj-Metadata-Footer', 'yes'))
+
+ if have_metadata_footer or use_multiphase_commit:
+ obj_input.set_hundred_continue_response_headers(
+ hundred_continue_headers)
+ mime_boundary = request.headers.get(
+ 'X-Backend-Obj-Multipart-Mime-Boundary')
+ if not mime_boundary:
+ return HTTPBadRequest("no MIME boundary")
+ try:
+ with ChunkReadTimeout(self.client_timeout):
+ mime_documents_iter = iter_mime_headers_and_bodies(
+ request.environ['wsgi.input'],
+ mime_boundary, self.network_chunk_size)
+ _junk_hdrs, obj_input = next(mime_documents_iter)
+ except ChunkReadTimeout:
+ return HTTPRequestTimeout(request=request)
+
+ timeout_reader = self._make_timeout_reader(obj_input)
try:
- for chunk in iter(lambda: timeout_reader(), ''):
+ for chunk in iter(timeout_reader, ''):
start_time = time.time()
if start_time > upload_expiration:
self.logger.increment('PUT.timeouts')
@@ -461,9 +593,16 @@ class ObjectController(BaseStorageServer):
upload_size)
if fsize is not None and fsize != upload_size:
return HTTPClientDisconnect(request=request)
+
+ footer_meta = {}
+ if have_metadata_footer:
+ footer_meta = self._read_metadata_footer(
+ mime_documents_iter)
+
+ request_etag = (footer_meta.get('etag') or
+ request.headers.get('etag', '')).lower()
etag = etag.hexdigest()
- if 'etag' in request.headers and \
- request.headers['etag'].lower() != etag:
+ if request_etag and request_etag != etag:
return HTTPUnprocessableEntity(request=request)
metadata = {
'X-Timestamp': request.timestamp.internal,
@@ -473,6 +612,8 @@ class ObjectController(BaseStorageServer):
}
metadata.update(val for val in request.headers.iteritems()
if is_sys_or_user_meta('object', val[0]))
+ metadata.update(val for val in footer_meta.iteritems()
+ if is_sys_or_user_meta('object', val[0]))
headers_to_copy = (
request.headers.get(
'X-Backend-Replication-Headers', '').split() +
@@ -482,39 +623,63 @@ class ObjectController(BaseStorageServer):
header_caps = header_key.title()
metadata[header_caps] = request.headers[header_key]
writer.put(metadata)
+
+ # if the PUT requires a two-phase commit (a data and a commit
+ # phase) send the proxy server another 100-continue response
+ # to indicate that we are finished writing object data
+ if use_multiphase_commit:
+ request.environ['wsgi.input'].\
+ send_hundred_continue_response()
+ if not self._read_put_commit_message(mime_documents_iter):
+ return HTTPServerError(request=request)
+ # got 2nd phase confirmation, write a timestamp.durable
+ # state file to indicate a successful PUT
+
+ writer.commit(request.timestamp)
+
+ # Drain any remaining MIME docs from the socket. There
+ # shouldn't be any, but we must read the whole request body.
+ try:
+ while True:
+ with ChunkReadTimeout(self.client_timeout):
+ _junk_hdrs, _junk_body = next(mime_documents_iter)
+ drain(_junk_body, self.network_chunk_size,
+ self.client_timeout)
+ except ChunkReadTimeout:
+ raise HTTPClientDisconnect()
+ except StopIteration:
+ pass
+
except (DiskFileXattrNotSupported, DiskFileNoSpace):
return HTTPInsufficientStorage(drive=device, request=request)
if orig_delete_at != new_delete_at:
if new_delete_at:
self.delete_at_update(
'PUT', new_delete_at, account, container, obj, request,
- device, policy_idx)
+ device, policy)
if orig_delete_at:
self.delete_at_update(
'DELETE', orig_delete_at, account, container, obj,
- request, device, policy_idx)
+ request, device, policy)
update_headers = HeaderKeyDict({
'x-size': metadata['Content-Length'],
'x-content-type': metadata['Content-Type'],
'x-timestamp': metadata['X-Timestamp'],
'x-etag': metadata['ETag']})
# apply any container update header overrides sent with request
- for key, val in request.headers.iteritems():
- override_prefix = 'x-backend-container-update-override-'
- if key.lower().startswith(override_prefix):
- override = key.lower().replace(override_prefix, 'x-')
- update_headers[override] = val
+ self._check_container_override(update_headers, request.headers)
+ self._check_container_override(update_headers, footer_meta)
self.container_update(
'PUT', account, container, obj, request,
update_headers,
- device, policy_idx)
+ device, policy)
return HTTPCreated(request=request, etag=etag)
@public
@timing_stats()
def GET(self, request):
"""Handle HTTP GET requests for the Swift Object Server."""
- device, partition, account, container, obj, policy_idx = \
+ device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True)
keep_cache = self.keep_cache_private or (
'X-Auth-Token' not in request.headers and
@@ -522,7 +687,7 @@ class ObjectController(BaseStorageServer):
try:
disk_file = self.get_diskfile(
device, partition, account, container, obj,
- policy_idx=policy_idx)
+ policy=policy)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
try:
@@ -533,9 +698,14 @@ class ObjectController(BaseStorageServer):
keep_cache = (self.keep_cache_private or
('X-Auth-Token' not in request.headers and
'X-Storage-Token' not in request.headers))
+ conditional_etag = None
+ if 'X-Backend-Etag-Is-At' in request.headers:
+ conditional_etag = metadata.get(
+ request.headers['X-Backend-Etag-Is-At'])
response = Response(
app_iter=disk_file.reader(keep_cache=keep_cache),
- request=request, conditional_response=True)
+ request=request, conditional_response=True,
+ conditional_etag=conditional_etag)
response.headers['Content-Type'] = metadata.get(
'Content-Type', 'application/octet-stream')
for key, value in metadata.iteritems():
@@ -567,12 +737,12 @@ class ObjectController(BaseStorageServer):
@timing_stats(sample_rate=0.8)
def HEAD(self, request):
"""Handle HTTP HEAD requests for the Swift Object Server."""
- device, partition, account, container, obj, policy_idx = \
+ device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True)
try:
disk_file = self.get_diskfile(
device, partition, account, container, obj,
- policy_idx=policy_idx)
+ policy=policy)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
try:
@@ -585,7 +755,12 @@ class ObjectController(BaseStorageServer):
headers['X-Backend-Timestamp'] = e.timestamp.internal
return HTTPNotFound(request=request, headers=headers,
conditional_response=True)
- response = Response(request=request, conditional_response=True)
+ conditional_etag = None
+ if 'X-Backend-Etag-Is-At' in request.headers:
+ conditional_etag = metadata.get(
+ request.headers['X-Backend-Etag-Is-At'])
+ response = Response(request=request, conditional_response=True,
+ conditional_etag=conditional_etag)
response.headers['Content-Type'] = metadata.get(
'Content-Type', 'application/octet-stream')
for key, value in metadata.iteritems():
@@ -609,13 +784,13 @@ class ObjectController(BaseStorageServer):
@timing_stats()
def DELETE(self, request):
"""Handle HTTP DELETE requests for the Swift Object Server."""
- device, partition, account, container, obj, policy_idx = \
+ device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True)
req_timestamp = valid_timestamp(request)
try:
disk_file = self.get_diskfile(
device, partition, account, container, obj,
- policy_idx=policy_idx)
+ policy=policy)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
try:
@@ -667,13 +842,13 @@ class ObjectController(BaseStorageServer):
if orig_delete_at:
self.delete_at_update('DELETE', orig_delete_at, account,
container, obj, request, device,
- policy_idx)
+ policy)
if orig_timestamp < req_timestamp:
disk_file.delete(req_timestamp)
self.container_update(
'DELETE', account, container, obj, request,
HeaderKeyDict({'x-timestamp': req_timestamp.internal}),
- device, policy_idx)
+ device, policy)
return response_class(
request=request,
headers={'X-Backend-Timestamp': response_timestamp.internal})
@@ -694,7 +869,7 @@ class ObjectController(BaseStorageServer):
get_name_and_placement(request, 2, 3, True)
suffixes = suffix_parts.split('-') if suffix_parts else []
try:
- hashes = self._diskfile_mgr.get_hashes(
+ hashes = self._diskfile_router[policy].get_hashes(
device, partition, suffixes, policy)
except DiskFileDeviceUnavailable:
resp = HTTPInsufficientStorage(drive=device, request=request)
diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py
index 248715d00..99495cd48 100644
--- a/swift/obj/ssync_receiver.py
+++ b/swift/obj/ssync_receiver.py
@@ -24,6 +24,7 @@ from swift.common import exceptions
from swift.common import http
from swift.common import swob
from swift.common import utils
+from swift.common import request_helpers
class Receiver(object):
@@ -98,7 +99,7 @@ class Receiver(object):
if not self.app.replication_semaphore.acquire(False):
raise swob.HTTPServiceUnavailable()
try:
- with self.app._diskfile_mgr.replication_lock(self.device):
+ with self.diskfile_mgr.replication_lock(self.device):
for data in self.missing_check():
yield data
for data in self.updates():
@@ -166,14 +167,14 @@ class Receiver(object):
"""
# The following is the setting we talk about above in _ensure_flush.
self.request.environ['eventlet.minimum_write_chunk_size'] = 0
- self.device, self.partition = utils.split_path(
- urllib.unquote(self.request.path), 2, 2, False)
+ self.device, self.partition, self.policy = \
+ request_helpers.get_name_and_placement(self.request, 2, 2, False)
self.policy_idx = \
int(self.request.headers.get('X-Backend-Storage-Policy-Index', 0))
utils.validate_device_partition(self.device, self.partition)
- if self.app._diskfile_mgr.mount_check and \
- not constraints.check_mount(
- self.app._diskfile_mgr.devices, self.device):
+ self.diskfile_mgr = self.app._diskfile_router[self.policy]
+ if self.diskfile_mgr.mount_check and not constraints.check_mount(
+ self.diskfile_mgr.devices, self.device):
raise swob.HTTPInsufficientStorage(drive=self.device)
self.fp = self.request.environ['wsgi.input']
for data in self._ensure_flush():
@@ -229,8 +230,8 @@ class Receiver(object):
object_hash, timestamp = [urllib.unquote(v) for v in line.split()]
want = False
try:
- df = self.app._diskfile_mgr.get_diskfile_from_hash(
- self.device, self.partition, object_hash, self.policy_idx)
+ df = self.diskfile_mgr.get_diskfile_from_hash(
+ self.device, self.partition, object_hash, self.policy)
except exceptions.DiskFileNotExist:
want = True
else:
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index 70b0d0cf6..5407c0e73 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -882,6 +882,9 @@ class ObjectController(Controller):
req, delete_at_container, delete_at_part, \
delete_at_nodes = self._config_obj_expiration(req)
+ # XXX hack for PUT to EC until the proxy learns how to encode
+ req.headers['X-Object-Sysmeta-Ec-Archive-Index'] = 0
+
# add special headers to be handled by storage nodes
outgoing_headers = self._backend_requests(
req, len(nodes), container_partition, container_nodes,
diff --git a/test/unit/common/test_request_helpers.py b/test/unit/common/test_request_helpers.py
index c87a39979..d2dc02c48 100644
--- a/test/unit/common/test_request_helpers.py
+++ b/test/unit/common/test_request_helpers.py
@@ -16,10 +16,13 @@
"""Tests for swift.common.request_helpers"""
import unittest
-from swift.common.swob import Request
+from swift.common.swob import Request, HTTPException
+from swift.common.storage_policy import POLICIES, EC_POLICY, REPL_POLICY
from swift.common.request_helpers import is_sys_meta, is_user_meta, \
is_sys_or_user_meta, strip_sys_meta_prefix, strip_user_meta_prefix, \
- remove_items, copy_header_subset
+ remove_items, copy_header_subset, get_name_and_placement
+
+from test.unit import patch_policies
server_types = ['account', 'container', 'object']
@@ -81,3 +84,77 @@ class TestRequestHelpers(unittest.TestCase):
self.assertEqual(to_req.headers['A'], 'b')
self.assertFalse('c' in to_req.headers)
self.assertFalse('C' in to_req.headers)
+
+ @patch_policies(with_ec_default=True)
+ def test_get_name_and_placement_object_req(self):
+ path = '/device/part/account/container/object'
+ req = Request.blank(path, headers={
+ 'X-Backend-Storage-Policy-Index': '0'})
+ device, part, account, container, obj, policy = \
+ get_name_and_placement(req, 5, 5, True)
+ self.assertEqual(device, 'device')
+ self.assertEqual(part, 'part')
+ self.assertEqual(account, 'account')
+ self.assertEqual(container, 'container')
+ self.assertEqual(obj, 'object')
+ self.assertEqual(policy, POLICIES[0])
+ self.assertEqual(policy.policy_type, EC_POLICY)
+
+ req.headers['X-Backend-Storage-Policy-Index'] = 1
+ device, part, account, container, obj, policy = \
+ get_name_and_placement(req, 5, 5, True)
+ self.assertEqual(device, 'device')
+ self.assertEqual(part, 'part')
+ self.assertEqual(account, 'account')
+ self.assertEqual(container, 'container')
+ self.assertEqual(obj, 'object')
+ self.assertEqual(policy, POLICIES[1])
+ self.assertEqual(policy.policy_type, REPL_POLICY)
+
+ req.headers['X-Backend-Storage-Policy-Index'] = 'foo'
+ try:
+ device, part, account, container, obj, policy = \
+ get_name_and_placement(req, 5, 5, True)
+ except HTTPException as e:
+ self.assertEqual(e.status_int, 503)
+ self.assertEqual(str(e), '503 Service Unavailable')
+ self.assertEqual(e.body, "No policy with index foo")
+ else:
+ self.fail('get_name_and_placement did not raise error '
+ 'for invalid storage policy index')
+
+ @patch_policies(with_ec_default=True)
+ def test_get_name_and_placement_object_replication(self):
+ # yup, suffixes are sent '-'.joined in the path
+ path = '/device/part/012-345-678-9ab-cde'
+ req = Request.blank(path, headers={
+ 'X-Backend-Storage-Policy-Index': '0'})
+ device, partition, suffix_parts, policy = \
+ get_name_and_placement(req, 2, 3, True)
+ self.assertEqual(device, 'device')
+ self.assertEqual(partition, 'part')
+ self.assertEqual(suffix_parts, '012-345-678-9ab-cde')
+ self.assertEqual(policy, POLICIES[0])
+ self.assertEqual(policy.policy_type, EC_POLICY)
+
+ path = '/device/part'
+ req = Request.blank(path, headers={
+ 'X-Backend-Storage-Policy-Index': '1'})
+ device, partition, suffix_parts, policy = \
+ get_name_and_placement(req, 2, 3, True)
+ self.assertEqual(device, 'device')
+ self.assertEqual(partition, 'part')
+ self.assertEqual(suffix_parts, None) # false-y
+ self.assertEqual(policy, POLICIES[1])
+ self.assertEqual(policy.policy_type, REPL_POLICY)
+
+ path = '/device/part/' # with a trailing slash
+ req = Request.blank(path, headers={
+ 'X-Backend-Storage-Policy-Index': '1'})
+ device, partition, suffix_parts, policy = \
+ get_name_and_placement(req, 2, 3, True)
+ self.assertEqual(device, 'device')
+ self.assertEqual(partition, 'part')
+ self.assertEqual(suffix_parts, '') # still false-y
+ self.assertEqual(policy, POLICIES[1])
+ self.assertEqual(policy.policy_type, REPL_POLICY)
diff --git a/test/unit/common/test_swob.py b/test/unit/common/test_swob.py
index fffb33ecf..7015abb8e 100644
--- a/test/unit/common/test_swob.py
+++ b/test/unit/common/test_swob.py
@@ -1553,6 +1553,17 @@ class TestConditionalIfMatch(unittest.TestCase):
self.assertEquals(resp.status_int, 200)
self.assertEquals(body, 'hi')
+ def test_simple_conditional_etag_match(self):
+ # if etag matches, proceed as normal
+ req = swift.common.swob.Request.blank(
+ '/', headers={'If-Match': 'not-the-etag'})
+ resp = req.get_response(self.fake_app)
+ resp.conditional_response = True
+ resp._conditional_etag = 'not-the-etag'
+ body = ''.join(resp(req.environ, self.fake_start_response))
+ self.assertEquals(resp.status_int, 200)
+ self.assertEquals(body, 'hi')
+
def test_quoted_simple_match(self):
# double quotes or not, doesn't matter
req = swift.common.swob.Request.blank(
@@ -1573,6 +1584,16 @@ class TestConditionalIfMatch(unittest.TestCase):
self.assertEquals(resp.status_int, 412)
self.assertEquals(body, '')
+ def test_simple_conditional_etag_no_match(self):
+ req = swift.common.swob.Request.blank(
+ '/', headers={'If-Match': 'the-etag'})
+ resp = req.get_response(self.fake_app)
+ resp.conditional_response = True
+ resp._conditional_etag = 'not-the-etag'
+ body = ''.join(resp(req.environ, self.fake_start_response))
+ self.assertEquals(resp.status_int, 412)
+ self.assertEquals(body, '')
+
def test_match_star(self):
# "*" means match anything; see RFC 2616 section 14.24
req = swift.common.swob.Request.blank(
diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py
index ee0d364c9..cfb9fa281 100755
--- a/test/unit/obj/test_server.py
+++ b/test/unit/obj/test_server.py
@@ -18,6 +18,7 @@
import cPickle as pickle
import datetime
+import json
import errno
import operator
import os
@@ -39,17 +40,19 @@ from eventlet.green import httplib
from nose import SkipTest
from swift import __version__ as swift_version
+from swift.common.http import is_success
from test.unit import FakeLogger, debug_logger, mocked_http_conn
from test.unit import connect_tcp, readuntil2crlfs, patch_policies
from swift.obj import server as object_server
from swift.obj import diskfile
-from swift.common import utils, storage_policy, bufferedhttp
+from swift.common import utils, bufferedhttp
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
NullLogger, storage_directory, public, replication
from swift.common import constraints
-from swift.common.swob import Request, HeaderKeyDict
+from swift.common.swob import Request, HeaderKeyDict, WsgiStringIO
from swift.common.splice import splice
-from swift.common.storage_policy import POLICIES
+from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy,
+ POLICIES, EC_POLICY)
from swift.common.exceptions import DiskFileDeviceUnavailable
@@ -57,7 +60,14 @@ def mock_time(*args, **kwargs):
return 5000.0
-@patch_policies
+test_policies = [
+ StoragePolicy(0, name='zero', is_default=True),
+ ECStoragePolicy(1, name='one', ec_type='jerasure_rs_vand',
+ ec_ndata=10, ec_nparity=4),
+]
+
+
+@patch_policies(test_policies)
class TestObjectController(unittest.TestCase):
"""Test swift.obj.server.ObjectController"""
@@ -733,6 +743,241 @@ class TestObjectController(unittest.TestCase):
'X-Object-Meta-1': 'One',
'X-Object-Meta-Two': 'Two'})
+ def test_PUT_etag_in_footer(self):
+ timestamp = normalize_timestamp(time())
+ req = Request.blank(
+ '/sda1/p/a/c/o',
+ headers={'X-Timestamp': timestamp,
+ 'Content-Type': 'text/plain',
+ 'Transfer-Encoding': 'chunked',
+ 'Etag': 'other-etag',
+ 'X-Backend-Obj-Metadata-Footer': 'yes',
+ 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary'},
+ environ={'REQUEST_METHOD': 'PUT'})
+
+ obj_etag = md5("obj data").hexdigest()
+ footer_meta = json.dumps({"Etag": obj_etag})
+ footer_meta_cksum = md5(footer_meta).hexdigest()
+
+ req.body = "\r\n".join((
+ "--boundary",
+ "",
+ "obj data",
+ "--boundary",
+ "Content-MD5: " + footer_meta_cksum,
+ "",
+ footer_meta,
+ "--boundary--",
+ ))
+ req.headers.pop("Content-Length", None)
+
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.etag, obj_etag)
+ self.assertEqual(resp.status_int, 201)
+
+ objfile = os.path.join(
+ self.testdir, 'sda1',
+ storage_directory(diskfile.get_data_dir(POLICIES[0]), 'p',
+ hash_path('a', 'c', 'o')),
+ utils.Timestamp(timestamp).internal + '.data')
+ with open(objfile) as fh:
+ self.assertEqual(fh.read(), "obj data")
+
+ def test_PUT_etag_in_footer_mismatch(self):
+ timestamp = normalize_timestamp(time())
+ req = Request.blank(
+ '/sda1/p/a/c/o',
+ headers={'X-Timestamp': timestamp,
+ 'Content-Type': 'text/plain',
+ 'Transfer-Encoding': 'chunked',
+ 'X-Backend-Obj-Metadata-Footer': 'yes',
+ 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary'},
+ environ={'REQUEST_METHOD': 'PUT'})
+
+ footer_meta = json.dumps({"Etag": md5("green").hexdigest()})
+ footer_meta_cksum = md5(footer_meta).hexdigest()
+
+ req.body = "\r\n".join((
+ "--boundary",
+ "",
+ "blue",
+ "--boundary",
+ "Content-MD5: " + footer_meta_cksum,
+ "",
+ footer_meta,
+ "--boundary--",
+ ))
+ req.headers.pop("Content-Length", None)
+
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 422)
+
+ def test_PUT_meta_in_footer(self):
+ timestamp = normalize_timestamp(time())
+ req = Request.blank(
+ '/sda1/p/a/c/o',
+ headers={'X-Timestamp': timestamp,
+ 'Content-Type': 'text/plain',
+ 'Transfer-Encoding': 'chunked',
+ 'X-Object-Meta-X': 'Z',
+ 'X-Object-Sysmeta-X': 'Z',
+ 'X-Backend-Obj-Metadata-Footer': 'yes',
+ 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary'},
+ environ={'REQUEST_METHOD': 'PUT'})
+
+ footer_meta = json.dumps({
+ 'X-Object-Meta-X': 'Y',
+ 'X-Object-Sysmeta-X': 'Y',
+ })
+ footer_meta_cksum = md5(footer_meta).hexdigest()
+
+ req.body = "\r\n".join((
+ "--boundary",
+ "",
+ "stuff stuff stuff",
+ "--boundary",
+ "Content-MD5: " + footer_meta_cksum,
+ "",
+ footer_meta,
+ "--boundary--",
+ ))
+ req.headers.pop("Content-Length", None)
+
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 201)
+
+ timestamp = normalize_timestamp(time())
+ req = Request.blank(
+ '/sda1/p/a/c/o',
+ headers={'X-Timestamp': timestamp},
+ environ={'REQUEST_METHOD': 'HEAD'})
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.headers.get('X-Object-Meta-X'), 'Y')
+ self.assertEqual(resp.headers.get('X-Object-Sysmeta-X'), 'Y')
+
+ def test_PUT_missing_footer_checksum(self):
+ timestamp = normalize_timestamp(time())
+ req = Request.blank(
+ '/sda1/p/a/c/o',
+ headers={'X-Timestamp': timestamp,
+ 'Content-Type': 'text/plain',
+ 'Transfer-Encoding': 'chunked',
+ 'X-Backend-Obj-Metadata-Footer': 'yes',
+ 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary'},
+ environ={'REQUEST_METHOD': 'PUT'})
+
+ footer_meta = json.dumps({"Etag": md5("obj data").hexdigest()})
+
+ req.body = "\r\n".join((
+ "--boundary",
+ "",
+ "obj data",
+ "--boundary",
+ # no Content-MD5
+ "",
+ footer_meta,
+ "--boundary--",
+ ))
+ req.headers.pop("Content-Length", None)
+
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 400)
+
+ def test_PUT_bad_footer_checksum(self):
+ timestamp = normalize_timestamp(time())
+ req = Request.blank(
+ '/sda1/p/a/c/o',
+ headers={'X-Timestamp': timestamp,
+ 'Content-Type': 'text/plain',
+ 'Transfer-Encoding': 'chunked',
+ 'X-Backend-Obj-Metadata-Footer': 'yes',
+ 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary'},
+ environ={'REQUEST_METHOD': 'PUT'})
+
+ footer_meta = json.dumps({"Etag": md5("obj data").hexdigest()})
+ bad_footer_meta_cksum = md5(footer_meta + "bad").hexdigest()
+
+ req.body = "\r\n".join((
+ "--boundary",
+ "",
+ "obj data",
+ "--boundary",
+ "Content-MD5: " + bad_footer_meta_cksum,
+ "",
+ footer_meta,
+ "--boundary--",
+ ))
+ req.headers.pop("Content-Length", None)
+
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 422)
+
+ def test_PUT_bad_footer_json(self):
+ timestamp = normalize_timestamp(time())
+ req = Request.blank(
+ '/sda1/p/a/c/o',
+ headers={'X-Timestamp': timestamp,
+ 'Content-Type': 'text/plain',
+ 'Transfer-Encoding': 'chunked',
+ 'X-Backend-Obj-Metadata-Footer': 'yes',
+ 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary'},
+ environ={'REQUEST_METHOD': 'PUT'})
+
+ footer_meta = "{{{[[{{[{[[{[{[[{{{[{{{{[[{{[{["
+ footer_meta_cksum = md5(footer_meta).hexdigest()
+
+ req.body = "\r\n".join((
+ "--boundary",
+ "",
+ "obj data",
+ "--boundary",
+ "Content-MD5: " + footer_meta_cksum,
+ "",
+ footer_meta,
+ "--boundary--",
+ ))
+ req.headers.pop("Content-Length", None)
+
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 400)
+
+ def test_PUT_extra_mime_docs_ignored(self):
+ timestamp = normalize_timestamp(time())
+ req = Request.blank(
+ '/sda1/p/a/c/o',
+ headers={'X-Timestamp': timestamp,
+ 'Content-Type': 'text/plain',
+ 'Transfer-Encoding': 'chunked',
+ 'X-Backend-Obj-Metadata-Footer': 'yes',
+ 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary'},
+ environ={'REQUEST_METHOD': 'PUT'})
+
+ footer_meta = json.dumps({'X-Object-Meta-Mint': 'pepper'})
+ footer_meta_cksum = md5(footer_meta).hexdigest()
+
+ req.body = "\r\n".join((
+ "--boundary",
+ "",
+ "obj data",
+ "--boundary",
+ "Content-MD5: " + footer_meta_cksum,
+ "",
+ footer_meta,
+ "--boundary",
+ "This-Document-Is-Useless: yes",
+ "",
+ "blah blah I take up space",
+ "--boundary--"
+ ))
+ req.headers.pop("Content-Length", None)
+
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 201)
+
+ # swob made this into a StringIO for us
+ wsgi_input = req.environ['wsgi.input']
+ self.assertEqual(wsgi_input.tell(), len(wsgi_input.getvalue()))
+
def test_PUT_user_metadata_no_xattr(self):
timestamp = normalize_timestamp(time())
req = Request.blank(
@@ -772,7 +1017,7 @@ class TestObjectController(unittest.TestCase):
headers={'X-Timestamp': timestamp,
'Content-Type': 'text/plain',
'Content-Length': '6'})
- req.environ['wsgi.input'] = StringIO('VERIFY')
+ req.environ['wsgi.input'] = WsgiStringIO('VERIFY')
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 408)
@@ -1021,6 +1266,40 @@ class TestObjectController(unittest.TestCase):
finally:
object_server.http_connect = old_http_connect
+ def test_PUT_durable_files(self):
+ for policy in POLICIES:
+ timestamp = utils.Timestamp(int(time())).internal
+ data_file_tail = '.data'
+ headers = {'X-Timestamp': timestamp,
+ 'Content-Length': '6',
+ 'Content-Type': 'application/octet-stream',
+ 'X-Backend-Storage-Policy-Index': int(policy)}
+ if policy.policy_type == EC_POLICY:
+ headers['X-Object-Sysmeta-Ec-Frag-Index'] = '2'
+ data_file_tail = '#2.data'
+ req = Request.blank(
+ '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
+ headers=headers)
+ req.body = 'VERIFY'
+ resp = req.get_response(self.object_controller)
+
+ self.assertEquals(resp.status_int, 201)
+ obj_dir = os.path.join(
+ self.testdir, 'sda1',
+ storage_directory(diskfile.get_data_dir(int(policy)),
+ 'p', hash_path('a', 'c', 'o')))
+ data_file = os.path.join(obj_dir, timestamp) + data_file_tail
+ self.assertTrue(os.path.isfile(data_file),
+ 'Expected file %r not found in %r for policy %r'
+ % (data_file, os.listdir(obj_dir), int(policy)))
+ durable_file = os.path.join(obj_dir, timestamp) + '.durable'
+ if policy.policy_type == EC_POLICY:
+ self.assertTrue(os.path.isfile(durable_file))
+ self.assertFalse(os.path.getsize(durable_file))
+ else:
+ self.assertFalse(os.path.isfile(durable_file))
+ rmtree(obj_dir)
+
def test_HEAD(self):
# Test swift.obj.server.ObjectController.HEAD
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'})
@@ -1295,6 +1574,58 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 412)
+ def test_GET_if_match_etag_is_at(self):
+ headers = {
+ 'X-Timestamp': utils.Timestamp(time()).internal,
+ 'Content-Type': 'application/octet-stream',
+ 'X-Object-Meta-Xtag': 'madeup',
+ }
+ req = Request.blank('/sda1/p/a/c/o', method='PUT',
+ headers=headers)
+ req.body = 'test'
+ resp = req.get_response(self.object_controller)
+ self.assertEquals(resp.status_int, 201)
+ real_etag = resp.etag
+
+ # match x-backend-etag-is-at
+ req = Request.blank('/sda1/p/a/c/o', headers={
+ 'If-Match': 'madeup',
+ 'X-Backend-Etag-Is-At': 'X-Object-Meta-Xtag'})
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 200)
+
+ # no match x-backend-etag-is-at
+ req = Request.blank('/sda1/p/a/c/o', headers={
+ 'If-Match': real_etag,
+ 'X-Backend-Etag-Is-At': 'X-Object-Meta-Xtag'})
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 412)
+
+ # etag-is-at metadata doesn't exist, default to real etag
+ req = Request.blank('/sda1/p/a/c/o', headers={
+ 'If-Match': real_etag,
+ 'X-Backend-Etag-Is-At': 'X-Object-Meta-Missing'})
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 200)
+
+ # sanity no-match with no etag-is-at
+ req = Request.blank('/sda1/p/a/c/o', headers={
+ 'If-Match': 'madeup'})
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 412)
+
+ # sanity match with no etag-is-at
+ req = Request.blank('/sda1/p/a/c/o', headers={
+ 'If-Match': real_etag})
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 200)
+
+ # sanity with no if-match
+ req = Request.blank('/sda1/p/a/c/o', headers={
+ 'X-Backend-Etag-Is-At': 'X-Object-Meta-Xtag'})
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 200)
+
def test_HEAD_if_match(self):
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={
@@ -2191,7 +2522,7 @@ class TestObjectController(unittest.TestCase):
def test_call_bad_request(self):
# Test swift.obj.server.ObjectController.__call__
- inbuf = StringIO()
+ inbuf = WsgiStringIO()
errbuf = StringIO()
outbuf = StringIO()
@@ -2218,7 +2549,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(outbuf.getvalue()[:4], '400 ')
def test_call_not_found(self):
- inbuf = StringIO()
+ inbuf = WsgiStringIO()
errbuf = StringIO()
outbuf = StringIO()
@@ -2245,7 +2576,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(outbuf.getvalue()[:4], '404 ')
def test_call_bad_method(self):
- inbuf = StringIO()
+ inbuf = WsgiStringIO()
errbuf = StringIO()
outbuf = StringIO()
@@ -2281,7 +2612,7 @@ class TestObjectController(unittest.TestCase):
with mock.patch("swift.obj.diskfile.hash_path", my_hash_path):
with mock.patch("swift.obj.server.check_object_creation",
my_check):
- inbuf = StringIO()
+ inbuf = WsgiStringIO()
errbuf = StringIO()
outbuf = StringIO()
@@ -2310,7 +2641,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(errbuf.getvalue(), '')
self.assertEquals(outbuf.getvalue()[:4], '201 ')
- inbuf = StringIO()
+ inbuf = WsgiStringIO()
errbuf = StringIO()
outbuf = StringIO()
@@ -2461,6 +2792,9 @@ class TestObjectController(unittest.TestCase):
return ' '
return ''
+ def set_hundred_continue_response_headers(*a, **kw):
+ pass
+
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': SlowBody()},
@@ -2490,6 +2824,9 @@ class TestObjectController(unittest.TestCase):
return ' '
return ''
+ def set_hundred_continue_response_headers(*a, **kw):
+ pass
+
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': ShortBody()},
@@ -2572,10 +2909,13 @@ class TestObjectController(unittest.TestCase):
'user-agent': 'object-server %s' % os.getpid(),
'X-Backend-Storage-Policy-Index': int(policy)}])
- @patch_policies([storage_policy.StoragePolicy(0, 'zero', True),
- storage_policy.StoragePolicy(1, 'one'),
- storage_policy.StoragePolicy(37, 'fantastico')])
+ @patch_policies([StoragePolicy(0, 'zero', True),
+ StoragePolicy(1, 'one'),
+ StoragePolicy(37, 'fantastico')])
def test_updating_multiple_delete_at_container_servers(self):
+ # update router post patch
+ self.object_controller._diskfile_router = diskfile.DiskFileRouter(
+ self.conf, self.object_controller.logger)
policy = random.choice(list(POLICIES))
self.object_controller.expiring_objects_account = 'exp'
self.object_controller.expiring_objects_container_divisor = 60
@@ -2691,10 +3031,13 @@ class TestObjectController(unittest.TestCase):
'X-Backend-Storage-Policy-Index': 0,
'x-trans-id': '-'})})
- @patch_policies([storage_policy.StoragePolicy(0, 'zero', True),
- storage_policy.StoragePolicy(1, 'one'),
- storage_policy.StoragePolicy(26, 'twice-thirteen')])
+ @patch_policies([StoragePolicy(0, 'zero', True),
+ StoragePolicy(1, 'one'),
+ StoragePolicy(26, 'twice-thirteen')])
def test_updating_multiple_container_servers(self):
+ # update router post patch
+ self.object_controller._diskfile_router = diskfile.DiskFileRouter(
+ self.conf, self.object_controller.logger)
http_connect_args = []
def fake_http_connect(ipaddr, port, device, partition, method, path,
@@ -2807,6 +3150,8 @@ class TestObjectController(unittest.TestCase):
'X-Delete-At-Host': '10.0.0.2:6002',
'X-Delete-At-Device': 'sda1',
'X-Backend-Storage-Policy-Index': int(policy)}
+ if policy.policy_type == EC_POLICY:
+ headers['X-Object-Sysmeta-Ec-Frag-Index'] = '2'
req = Request.blank(
'/sda1/p/a/c/o', method='PUT', body='', headers=headers)
with mocked_http_conn(
@@ -3010,6 +3355,7 @@ class TestObjectController(unittest.TestCase):
utils.HASH_PATH_PREFIX = _prefix
def test_container_update_no_async_update(self):
+ policy = random.choice(list(POLICIES))
given_args = []
def fake_async_update(*args):
@@ -3020,12 +3366,13 @@ class TestObjectController(unittest.TestCase):
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': 1,
- 'X-Trans-Id': '1234'})
+ 'X-Trans-Id': '1234',
+ 'X-Backend-Storage-Policy-Index': int(policy)})
self.object_controller.container_update(
'PUT', 'a', 'c', 'o', req, {
'x-size': '0', 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
'x-content-type': 'text/plain', 'x-timestamp': '1'},
- 'sda1', 0)
+ 'sda1', policy)
self.assertEquals(given_args, [])
def test_container_update_success(self):
@@ -3107,6 +3454,7 @@ class TestObjectController(unittest.TestCase):
'x-foo': 'bar'}))
def test_container_update_async(self):
+ policy = random.choice(list(POLICIES))
req = Request.blank(
'/sda1/0/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
@@ -3115,26 +3463,28 @@ class TestObjectController(unittest.TestCase):
'X-Container-Host': 'chost:cport',
'X-Container-Partition': 'cpartition',
'X-Container-Device': 'cdevice',
- 'Content-Type': 'text/plain'}, body='')
+ 'Content-Type': 'text/plain',
+ 'X-Object-Sysmeta-Ec-Frag-Index': 0,
+ 'X-Backend-Storage-Policy-Index': int(policy)}, body='')
given_args = []
def fake_pickle_async_update(*args):
given_args[:] = args
- self.object_controller._diskfile_mgr.pickle_async_update = \
- fake_pickle_async_update
+ diskfile_mgr = self.object_controller._diskfile_router[policy]
+ diskfile_mgr.pickle_async_update = fake_pickle_async_update
with mocked_http_conn(500) as fake_conn:
resp = req.get_response(self.object_controller)
self.assertRaises(StopIteration, fake_conn.code_iter.next)
self.assertEqual(resp.status_int, 201)
self.assertEqual(len(given_args), 7)
(objdevice, account, container, obj, data, timestamp,
- policy_index) = given_args
+ policy) = given_args
self.assertEqual(objdevice, 'sda1')
self.assertEqual(account, 'a')
self.assertEqual(container, 'c')
self.assertEqual(obj, 'o')
self.assertEqual(timestamp, utils.Timestamp(1).internal)
- self.assertEqual(policy_index, 0)
+ self.assertEqual(policy, policy)
self.assertEqual(data, {
'headers': HeaderKeyDict({
'X-Size': '0',
@@ -3143,7 +3493,7 @@ class TestObjectController(unittest.TestCase):
'X-Timestamp': utils.Timestamp(1).internal,
'X-Trans-Id': '123',
'Referer': 'PUT http://localhost/sda1/0/a/c/o',
- 'X-Backend-Storage-Policy-Index': '0',
+ 'X-Backend-Storage-Policy-Index': int(policy),
'X-Etag': 'd41d8cd98f00b204e9800998ecf8427e'}),
'obj': 'o',
'account': 'a',
@@ -3151,6 +3501,7 @@ class TestObjectController(unittest.TestCase):
'op': 'PUT'})
def test_container_update_bad_args(self):
+ policy = random.choice(list(POLICIES))
given_args = []
def fake_async_update(*args):
@@ -3163,7 +3514,8 @@ class TestObjectController(unittest.TestCase):
'X-Trans-Id': '123',
'X-Container-Host': 'chost,badhost',
'X-Container-Partition': 'cpartition',
- 'X-Container-Device': 'cdevice'})
+ 'X-Container-Device': 'cdevice',
+ 'X-Backend-Storage-Policy-Index': int(policy)})
with mock.patch.object(self.object_controller, 'async_update',
fake_async_update):
self.object_controller.container_update(
@@ -3171,7 +3523,7 @@ class TestObjectController(unittest.TestCase):
'x-size': '0',
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
'x-content-type': 'text/plain', 'x-timestamp': '1'},
- 'sda1', 0)
+ 'sda1', policy)
self.assertEqual(given_args, [])
errors = self.object_controller.logger.get_lines_for_level('error')
self.assertEqual(len(errors), 1)
@@ -3184,6 +3536,7 @@ class TestObjectController(unittest.TestCase):
def test_delete_at_update_on_put(self):
# Test how delete_at_update works when issued a delete for old
# expiration info after a new put with no new expiration info.
+ policy = random.choice(list(POLICIES))
given_args = []
def fake_async_update(*args):
@@ -3193,11 +3546,12 @@ class TestObjectController(unittest.TestCase):
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': 1,
- 'X-Trans-Id': '123'})
+ 'X-Trans-Id': '123',
+ 'X-Backend-Storage-Policy-Index': int(policy)})
with mock.patch.object(self.object_controller, 'async_update',
fake_async_update):
self.object_controller.delete_at_update(
- 'DELETE', 2, 'a', 'c', 'o', req, 'sda1', 0)
+ 'DELETE', 2, 'a', 'c', 'o', req, 'sda1', policy)
self.assertEquals(
given_args, [
'DELETE', '.expiring_objects', '0000000000',
@@ -3207,12 +3561,13 @@ class TestObjectController(unittest.TestCase):
'x-timestamp': utils.Timestamp('1').internal,
'x-trans-id': '123',
'referer': 'PUT http://localhost/v1/a/c/o'}),
- 'sda1', 0])
+ 'sda1', policy])
def test_delete_at_negative(self):
# Test how delete_at_update works when issued a delete for old
# expiration info after a new put with no new expiration info.
# Test negative is reset to 0
+ policy = random.choice(list(POLICIES))
given_args = []
def fake_async_update(*args):
@@ -3223,23 +3578,26 @@ class TestObjectController(unittest.TestCase):
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': 1,
- 'X-Trans-Id': '1234'})
+ 'X-Trans-Id': '1234', 'X-Backend-Storage-Policy-Index':
+ int(policy)})
self.object_controller.delete_at_update(
- 'DELETE', -2, 'a', 'c', 'o', req, 'sda1', 0)
+ 'DELETE', -2, 'a', 'c', 'o', req, 'sda1', policy)
self.assertEquals(given_args, [
'DELETE', '.expiring_objects', '0000000000', '0000000000-a/c/o',
None, None, None,
HeaderKeyDict({
+ # the expiring objects account is always 0
'X-Backend-Storage-Policy-Index': 0,
'x-timestamp': utils.Timestamp('1').internal,
'x-trans-id': '1234',
'referer': 'PUT http://localhost/v1/a/c/o'}),
- 'sda1', 0])
+ 'sda1', policy])
def test_delete_at_cap(self):
# Test how delete_at_update works when issued a delete for old
# expiration info after a new put with no new expiration info.
# Test past cap is reset to cap
+ policy = random.choice(list(POLICIES))
given_args = []
def fake_async_update(*args):
@@ -3250,9 +3608,10 @@ class TestObjectController(unittest.TestCase):
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': 1,
- 'X-Trans-Id': '1234'})
+ 'X-Trans-Id': '1234',
+ 'X-Backend-Storage-Policy-Index': int(policy)})
self.object_controller.delete_at_update(
- 'DELETE', 12345678901, 'a', 'c', 'o', req, 'sda1', 0)
+ 'DELETE', 12345678901, 'a', 'c', 'o', req, 'sda1', policy)
expiring_obj_container = given_args.pop(2)
expected_exp_cont = utils.get_expirer_container(
utils.normalize_delete_at_timestamp(12345678901),
@@ -3267,12 +3626,13 @@ class TestObjectController(unittest.TestCase):
'x-timestamp': utils.Timestamp('1').internal,
'x-trans-id': '1234',
'referer': 'PUT http://localhost/v1/a/c/o'}),
- 'sda1', 0])
+ 'sda1', policy])
def test_delete_at_update_put_with_info(self):
# Keep next test,
# test_delete_at_update_put_with_info_but_missing_container, in sync
# with this one but just missing the X-Delete-At-Container header.
+ policy = random.choice(list(POLICIES))
given_args = []
def fake_async_update(*args):
@@ -3287,14 +3647,16 @@ class TestObjectController(unittest.TestCase):
'X-Delete-At-Container': '0',
'X-Delete-At-Host': '127.0.0.1:1234',
'X-Delete-At-Partition': '3',
- 'X-Delete-At-Device': 'sdc1'})
+ 'X-Delete-At-Device': 'sdc1',
+ 'X-Backend-Storage-Policy-Index': int(policy)})
self.object_controller.delete_at_update('PUT', 2, 'a', 'c', 'o',
- req, 'sda1', 0)
+ req, 'sda1', policy)
self.assertEquals(
given_args, [
'PUT', '.expiring_objects', '0000000000', '0000000002-a/c/o',
'127.0.0.1:1234',
'3', 'sdc1', HeaderKeyDict({
+ # the .expiring_objects account is always policy-0
'X-Backend-Storage-Policy-Index': 0,
'x-size': '0',
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
@@ -3302,11 +3664,12 @@ class TestObjectController(unittest.TestCase):
'x-timestamp': utils.Timestamp('1').internal,
'x-trans-id': '1234',
'referer': 'PUT http://localhost/v1/a/c/o'}),
- 'sda1', 0])
+ 'sda1', policy])
def test_delete_at_update_put_with_info_but_missing_container(self):
# Same as previous test, test_delete_at_update_put_with_info, but just
# missing the X-Delete-At-Container header.
+ policy = random.choice(list(POLICIES))
given_args = []
def fake_async_update(*args):
@@ -3321,9 +3684,10 @@ class TestObjectController(unittest.TestCase):
'X-Trans-Id': '1234',
'X-Delete-At-Host': '127.0.0.1:1234',
'X-Delete-At-Partition': '3',
- 'X-Delete-At-Device': 'sdc1'})
+ 'X-Delete-At-Device': 'sdc1',
+ 'X-Backend-Storage-Policy-Index': int(policy)})
self.object_controller.delete_at_update('PUT', 2, 'a', 'c', 'o',
- req, 'sda1', 0)
+ req, 'sda1', policy)
self.assertEquals(
self.logger.get_lines_for_level('warning'),
['X-Delete-At-Container header must be specified for expiring '
@@ -3331,6 +3695,7 @@ class TestObjectController(unittest.TestCase):
'to the container name for now.'])
def test_delete_at_update_delete(self):
+ policy = random.choice(list(POLICIES))
given_args = []
def fake_async_update(*args):
@@ -3341,9 +3706,10 @@ class TestObjectController(unittest.TestCase):
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'X-Timestamp': 1,
- 'X-Trans-Id': '1234'})
+ 'X-Trans-Id': '1234',
+ 'X-Backend-Storage-Policy-Index': int(policy)})
self.object_controller.delete_at_update('DELETE', 2, 'a', 'c', 'o',
- req, 'sda1', 0)
+ req, 'sda1', policy)
self.assertEquals(
given_args, [
'DELETE', '.expiring_objects', '0000000000',
@@ -3353,11 +3719,12 @@ class TestObjectController(unittest.TestCase):
'x-timestamp': utils.Timestamp('1').internal,
'x-trans-id': '1234',
'referer': 'DELETE http://localhost/v1/a/c/o'}),
- 'sda1', 0])
+ 'sda1', policy])
def test_delete_backend_replication(self):
# If X-Backend-Replication: True delete_at_update should completely
# short-circuit.
+ policy = random.choice(list(POLICIES))
given_args = []
def fake_async_update(*args):
@@ -3369,12 +3736,14 @@ class TestObjectController(unittest.TestCase):
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': 1,
'X-Trans-Id': '1234',
- 'X-Backend-Replication': 'True'})
+ 'X-Backend-Replication': 'True',
+ 'X-Backend-Storage-Policy-Index': int(policy)})
self.object_controller.delete_at_update(
- 'DELETE', -2, 'a', 'c', 'o', req, 'sda1', 0)
+ 'DELETE', -2, 'a', 'c', 'o', req, 'sda1', policy)
self.assertEquals(given_args, [])
def test_POST_calls_delete_at(self):
+ policy = random.choice(list(POLICIES))
given_args = []
def fake_delete_at_update(*args):
@@ -3386,7 +3755,9 @@ class TestObjectController(unittest.TestCase):
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(time()),
'Content-Length': '4',
- 'Content-Type': 'application/octet-stream'})
+ 'Content-Type': 'application/octet-stream',
+ 'X-Backend-Storage-Policy-Index': int(policy),
+ 'X-Object-Sysmeta-Ec-Frag-Index': 2})
req.body = 'TEST'
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
@@ -3397,7 +3768,8 @@ class TestObjectController(unittest.TestCase):
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': normalize_timestamp(time()),
- 'Content-Type': 'application/x-test'})
+ 'Content-Type': 'application/x-test',
+ 'X-Backend-Storage-Policy-Index': int(policy)})
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 202)
self.assertEquals(given_args, [])
@@ -3410,13 +3782,14 @@ class TestObjectController(unittest.TestCase):
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': timestamp1,
'Content-Type': 'application/x-test',
- 'X-Delete-At': delete_at_timestamp1})
+ 'X-Delete-At': delete_at_timestamp1,
+ 'X-Backend-Storage-Policy-Index': int(policy)})
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 202)
self.assertEquals(
given_args, [
'PUT', int(delete_at_timestamp1), 'a', 'c', 'o',
- given_args[5], 'sda1', 0])
+ given_args[5], 'sda1', policy])
while given_args:
given_args.pop()
@@ -3429,17 +3802,19 @@ class TestObjectController(unittest.TestCase):
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': timestamp2,
'Content-Type': 'application/x-test',
- 'X-Delete-At': delete_at_timestamp2})
+ 'X-Delete-At': delete_at_timestamp2,
+ 'X-Backend-Storage-Policy-Index': int(policy)})
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 202)
self.assertEquals(
given_args, [
'PUT', int(delete_at_timestamp2), 'a', 'c', 'o',
- given_args[5], 'sda1', 0,
+ given_args[5], 'sda1', policy,
'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o',
- given_args[5], 'sda1', 0])
+ given_args[5], 'sda1', policy])
def test_PUT_calls_delete_at(self):
+ policy = random.choice(list(POLICIES))
given_args = []
def fake_delete_at_update(*args):
@@ -3451,7 +3826,9 @@ class TestObjectController(unittest.TestCase):
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(time()),
'Content-Length': '4',
- 'Content-Type': 'application/octet-stream'})
+ 'Content-Type': 'application/octet-stream',
+ 'X-Backend-Storage-Policy-Index': int(policy),
+ 'X-Object-Sysmeta-Ec-Frag-Index': 4})
req.body = 'TEST'
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
@@ -3465,14 +3842,16 @@ class TestObjectController(unittest.TestCase):
headers={'X-Timestamp': timestamp1,
'Content-Length': '4',
'Content-Type': 'application/octet-stream',
- 'X-Delete-At': delete_at_timestamp1})
+ 'X-Delete-At': delete_at_timestamp1,
+ 'X-Backend-Storage-Policy-Index': int(policy),
+ 'X-Object-Sysmeta-Ec-Frag-Index': 3})
req.body = 'TEST'
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
self.assertEquals(
given_args, [
'PUT', int(delete_at_timestamp1), 'a', 'c', 'o',
- given_args[5], 'sda1', 0])
+ given_args[5], 'sda1', policy])
while given_args:
given_args.pop()
@@ -3486,16 +3865,18 @@ class TestObjectController(unittest.TestCase):
headers={'X-Timestamp': timestamp2,
'Content-Length': '4',
'Content-Type': 'application/octet-stream',
- 'X-Delete-At': delete_at_timestamp2})
+ 'X-Delete-At': delete_at_timestamp2,
+ 'X-Backend-Storage-Policy-Index': int(policy),
+ 'X-Object-Sysmeta-Ec-Frag-Index': 3})
req.body = 'TEST'
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
self.assertEquals(
given_args, [
'PUT', int(delete_at_timestamp2), 'a', 'c', 'o',
- given_args[5], 'sda1', 0,
+ given_args[5], 'sda1', policy,
'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o',
- given_args[5], 'sda1', 0])
+ given_args[5], 'sda1', policy])
def test_GET_but_expired(self):
test_time = time() + 10000
@@ -3917,7 +4298,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 201)
self.assertEquals(given_args, [
'PUT', int(delete_at_timestamp1), 'a', 'c', 'o',
- given_args[5], 'sda1', 0])
+ given_args[5], 'sda1', POLICIES[0]])
while given_args:
given_args.pop()
@@ -3933,7 +4314,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 204)
self.assertEquals(given_args, [
'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o',
- given_args[5], 'sda1', 0])
+ given_args[5], 'sda1', POLICIES[0]])
def test_PUT_delete_at_in_past(self):
req = Request.blank(
@@ -4132,7 +4513,7 @@ class TestObjectController(unittest.TestCase):
def test_correct_allowed_method(self):
# Test correct work for allowed method using
# swift.obj.server.ObjectController.__call__
- inbuf = StringIO()
+ inbuf = WsgiStringIO()
errbuf = StringIO()
outbuf = StringIO()
self.object_controller = object_server.app_factory(
@@ -4170,7 +4551,7 @@ class TestObjectController(unittest.TestCase):
def test_not_allowed_method(self):
# Test correct work for NOT allowed method using
# swift.obj.server.ObjectController.__call__
- inbuf = StringIO()
+ inbuf = WsgiStringIO()
errbuf = StringIO()
outbuf = StringIO()
self.object_controller = object_server.ObjectController(
@@ -4253,7 +4634,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(outbuf.getvalue()[:4], '405 ')
def test_not_utf8_and_not_logging_requests(self):
- inbuf = StringIO()
+ inbuf = WsgiStringIO()
errbuf = StringIO()
outbuf = StringIO()
self.object_controller = object_server.ObjectController(
@@ -4291,7 +4672,7 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(self.logger.get_lines_for_level('info'), [])
def test__call__returns_500(self):
- inbuf = StringIO()
+ inbuf = WsgiStringIO()
errbuf = StringIO()
outbuf = StringIO()
self.logger = debug_logger('test')
@@ -4337,7 +4718,7 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(self.logger.get_lines_for_level('info'), [])
def test_PUT_slow(self):
- inbuf = StringIO()
+ inbuf = WsgiStringIO()
errbuf = StringIO()
outbuf = StringIO()
self.object_controller = object_server.ObjectController(
@@ -4398,9 +4779,12 @@ class TestObjectController(unittest.TestCase):
['1.2.3.4 - - [01/Jan/1970:02:46:41 +0000] "HEAD /sda1/p/a/c/o" '
'404 - "-" "-" "-" 2.0000 "-" 1234 -'])
- @patch_policies([storage_policy.StoragePolicy(0, 'zero', True),
- storage_policy.StoragePolicy(1, 'one', False)])
+ @patch_policies([StoragePolicy(0, 'zero', True),
+ StoragePolicy(1, 'one', False)])
def test_dynamic_datadir(self):
+ # update router post patch
+ self.object_controller._diskfile_router = diskfile.DiskFileRouter(
+ self.conf, self.object_controller.logger)
timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': timestamp,
@@ -4434,8 +4818,50 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 201)
self.assertTrue(os.path.isdir(object_dir))
+ def test_storage_policy_index_is_validated(self):
+ # sanity check that index for existing policy is ok
+ ts = (utils.Timestamp(t).internal for t in
+ itertools.count(int(time())))
+ methods = ('PUT', 'POST', 'GET', 'HEAD', 'REPLICATE', 'DELETE')
+ valid_indices = sorted([int(policy) for policy in POLICIES])
+ for index in valid_indices:
+ object_dir = self.testdir + "/sda1/objects"
+ if index > 0:
+ object_dir = "%s-%s" % (object_dir, index)
+ self.assertFalse(os.path.isdir(object_dir))
+ for method in methods:
+ headers = {
+ 'X-Timestamp': ts.next(),
+ 'Content-Type': 'application/x-test',
+ 'X-Backend-Storage-Policy-Index': index}
+ if POLICIES[index].policy_type == EC_POLICY:
+ headers['X-Object-Sysmeta-Ec-Frag-Index'] = '2'
+ req = Request.blank(
+ '/sda1/p/a/c/o',
+ environ={'REQUEST_METHOD': method},
+ headers=headers)
+ req.body = 'VERIFY'
+ resp = req.get_response(self.object_controller)
+ self.assertTrue(is_success(resp.status_int),
+ '%s method failed: %r' % (method, resp.status))
-@patch_policies
+ # index for non-existent policy should return 503
+ index = valid_indices[-1] + 1
+ for method in methods:
+ req = Request.blank('/sda1/p/a/c/o',
+ environ={'REQUEST_METHOD': method},
+ headers={
+ 'X-Timestamp': ts.next(),
+ 'Content-Type': 'application/x-test',
+ 'X-Backend-Storage-Policy-Index': index})
+ req.body = 'VERIFY'
+ object_dir = self.testdir + "/sda1/objects-%s" % index
+ resp = req.get_response(self.object_controller)
+ self.assertEquals(resp.status_int, 503)
+ self.assertFalse(os.path.isdir(object_dir))
+
+
+@patch_policies(test_policies)
class TestObjectServer(unittest.TestCase):
def setUp(self):
@@ -4447,13 +4873,13 @@ class TestObjectServer(unittest.TestCase):
for device in ('sda1', 'sdb1'):
os.makedirs(os.path.join(self.devices, device))
- conf = {
+ self.conf = {
'devices': self.devices,
'swift_dir': self.tempdir,
'mount_check': 'false',
}
self.logger = debug_logger('test-object-server')
- app = object_server.ObjectController(conf, logger=self.logger)
+ app = object_server.ObjectController(self.conf, logger=self.logger)
sock = listen(('127.0.0.1', 0))
self.server = spawn(wsgi.server, sock, app, utils.NullLogger())
self.port = sock.getsockname()[1]
@@ -4486,6 +4912,23 @@ class TestObjectServer(unittest.TestCase):
resp.read()
resp.close()
+ def test_expect_on_put_footer(self):
+ test_body = 'test'
+ headers = {
+ 'Expect': '100-continue',
+ 'Content-Length': len(test_body),
+ 'X-Timestamp': utils.Timestamp(time()).internal,
+ 'X-Backend-Obj-Metadata-Footer': 'yes',
+ 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
+ }
+ conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0',
+ 'PUT', '/a/c/o', headers=headers)
+ resp = conn.getexpect()
+ self.assertEqual(resp.status, 100)
+ headers = HeaderKeyDict(resp.getheaders())
+ self.assertEqual(headers['X-Obj-Metadata-Footer'], 'yes')
+ resp.close()
+
def test_expect_on_put_conflict(self):
test_body = 'test'
put_timestamp = utils.Timestamp(time())
@@ -4514,6 +4957,377 @@ class TestObjectServer(unittest.TestCase):
resp.read()
resp.close()
+ def test_multiphase_put_no_mime_boundary(self):
+ test_data = 'obj data'
+ put_timestamp = utils.Timestamp(time()).internal
+ headers = {
+ 'Content-Type': 'text/plain',
+ 'X-Timestamp': put_timestamp,
+ 'Transfer-Encoding': 'chunked',
+ 'Expect': '100-continue',
+ 'X-Backend-Obj-Content-Length': len(test_data),
+ 'X-Backend-Obj-Multiphase-Commit': 'yes',
+ }
+ conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0',
+ 'PUT', '/a/c/o', headers=headers)
+ resp = conn.getexpect()
+ self.assertEqual(resp.status, 400)
+ resp.read()
+ resp.close()
+
+ def test_expect_on_multiphase_put(self):
+ test_data = 'obj data'
+ test_doc = "\r\n".join((
+ "--boundary123",
+ "X-Document: object body",
+ "",
+ test_data,
+ "--boundary123",
+ ))
+
+ put_timestamp = utils.Timestamp(time()).internal
+ headers = {
+ 'Content-Type': 'text/plain',
+ 'X-Timestamp': put_timestamp,
+ 'Transfer-Encoding': 'chunked',
+ 'Expect': '100-continue',
+ 'X-Backend-Obj-Content-Length': len(test_data),
+ 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
+ 'X-Backend-Obj-Multiphase-Commit': 'yes',
+ }
+ conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0',
+ 'PUT', '/a/c/o', headers=headers)
+ resp = conn.getexpect()
+ self.assertEqual(resp.status, 100)
+ headers = HeaderKeyDict(resp.getheaders())
+ self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes')
+
+ to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc)
+ conn.send(to_send)
+
+ # verify 100-continue response to mark end of phase1
+ resp = conn.getexpect()
+ self.assertEqual(resp.status, 100)
+ resp.close()
+
+ def test_multiphase_put_metadata_footer(self):
+ # Test 2-phase commit conversation - end of 1st phase marked
+ # by 100-continue response from the object server, with a
+ # successful 2nd phase marked by the presence of a .durable
+ # file along with .data file in the object data directory
+ test_data = 'obj data'
+ footer_meta = {
+ "X-Object-Sysmeta-Ec-Frag-Index": "2",
+ "Etag": md5(test_data).hexdigest(),
+ }
+ footer_json = json.dumps(footer_meta)
+ footer_meta_cksum = md5(footer_json).hexdigest()
+ test_doc = "\r\n".join((
+ "--boundary123",
+ "X-Document: object body",
+ "",
+ test_data,
+ "--boundary123",
+ "X-Document: object metadata",
+ "Content-MD5: " + footer_meta_cksum,
+ "",
+ footer_json,
+ "--boundary123",
+ ))
+
+ # phase1 - PUT request with object metadata in footer and
+ # multiphase commit conversation
+ put_timestamp = utils.Timestamp(time()).internal
+ headers = {
+ 'Content-Type': 'text/plain',
+ 'X-Timestamp': put_timestamp,
+ 'Transfer-Encoding': 'chunked',
+ 'Expect': '100-continue',
+ 'X-Backend-Storage-Policy-Index': '1',
+ 'X-Backend-Obj-Content-Length': len(test_data),
+ 'X-Backend-Obj-Metadata-Footer': 'yes',
+ 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
+ 'X-Backend-Obj-Multiphase-Commit': 'yes',
+ }
+ conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0',
+ 'PUT', '/a/c/o', headers=headers)
+ resp = conn.getexpect()
+ self.assertEqual(resp.status, 100)
+ headers = HeaderKeyDict(resp.getheaders())
+ self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes')
+ self.assertEqual(headers['X-Obj-Metadata-Footer'], 'yes')
+
+ to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc)
+ conn.send(to_send)
+ # verify 100-continue response to mark end of phase1
+ resp = conn.getexpect()
+ self.assertEqual(resp.status, 100)
+
+ # send commit confirmation to start phase2
+ commit_confirmation_doc = "\r\n".join((
+ "X-Document: put commit",
+ "",
+ "commit_confirmation",
+ "--boundary123--",
+ ))
+ to_send = "%x\r\n%s\r\n0\r\n\r\n" % \
+ (len(commit_confirmation_doc), commit_confirmation_doc)
+ conn.send(to_send)
+
+        # verify success (2xx) to mark end of phase2
+ resp = conn.getresponse()
+ self.assertEqual(resp.status, 201)
+ resp.read()
+ resp.close()
+
+ # verify successful object data and durable state file write
+ obj_basename = os.path.join(
+ self.devices, 'sda1',
+ storage_directory(diskfile.get_data_dir(POLICIES[1]), '0',
+ hash_path('a', 'c', 'o')),
+ put_timestamp)
+ obj_datafile = obj_basename + '#2.data'
+ self.assertTrue(os.path.isfile(obj_datafile))
+ obj_durablefile = obj_basename + '.durable'
+ self.assertTrue(os.path.isfile(obj_durablefile))
+
+ def test_multiphase_put_no_metadata_footer(self):
+ # Test 2-phase commit conversation, with no metadata footer
+ # at the end of object data - end of 1st phase marked
+ # by 100-continue response from the object server, with a
+ # successful 2nd phase marked by the presence of a .durable
+ # file along with .data file in the object data directory
+ # (No metadata footer case)
+ test_data = 'obj data'
+ test_doc = "\r\n".join((
+ "--boundary123",
+ "X-Document: object body",
+ "",
+ test_data,
+ "--boundary123",
+ ))
+
+ # phase1 - PUT request with multiphase commit conversation
+ # no object metadata in footer
+ put_timestamp = utils.Timestamp(time()).internal
+ headers = {
+ 'Content-Type': 'text/plain',
+ 'X-Timestamp': put_timestamp,
+ 'Transfer-Encoding': 'chunked',
+ 'Expect': '100-continue',
+ # normally the frag index gets sent in the MIME footer (which this
+ # test doesn't have, see `test_multiphase_put_metadata_footer`),
+ # but the proxy *could* send the frag index in the headers and
+ # this test verifies that would work.
+ 'X-Object-Sysmeta-Ec-Frag-Index': '2',
+ 'X-Backend-Storage-Policy-Index': '1',
+ 'X-Backend-Obj-Content-Length': len(test_data),
+ 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
+ 'X-Backend-Obj-Multiphase-Commit': 'yes',
+ }
+ conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0',
+ 'PUT', '/a/c/o', headers=headers)
+ resp = conn.getexpect()
+ self.assertEqual(resp.status, 100)
+ headers = HeaderKeyDict(resp.getheaders())
+ self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes')
+
+ to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc)
+ conn.send(to_send)
+ # verify 100-continue response to mark end of phase1
+ resp = conn.getexpect()
+ self.assertEqual(resp.status, 100)
+
+ # send commit confirmation to start phase2
+ commit_confirmation_doc = "\r\n".join((
+ "X-Document: put commit",
+ "",
+ "commit_confirmation",
+ "--boundary123--",
+ ))
+ to_send = "%x\r\n%s\r\n0\r\n\r\n" % \
+ (len(commit_confirmation_doc), commit_confirmation_doc)
+ conn.send(to_send)
+
+        # verify success (2xx) to mark end of phase2
+ resp = conn.getresponse()
+ self.assertEqual(resp.status, 201)
+ resp.read()
+ resp.close()
+
+ # verify successful object data and durable state file write
+ obj_basename = os.path.join(
+ self.devices, 'sda1',
+ storage_directory(diskfile.get_data_dir(POLICIES[1]), '0',
+ hash_path('a', 'c', 'o')),
+ put_timestamp)
+ obj_datafile = obj_basename + '#2.data'
+ self.assertTrue(os.path.isfile(obj_datafile))
+ obj_durablefile = obj_basename + '.durable'
+ self.assertTrue(os.path.isfile(obj_durablefile))
+
+ def test_multiphase_put_draining(self):
+ # We want to ensure that we read the whole response body even if
+ # it's multipart MIME and there's document parts that we don't
+ # expect or understand. This'll help save our bacon if we ever jam
+ # more stuff in there.
+ in_a_timeout = [False]
+
+ # inherit from BaseException so we get a stack trace when the test
+ # fails instead of just a 500
+ class NotInATimeout(BaseException):
+ pass
+
+ class FakeTimeout(BaseException):
+ def __enter__(self):
+ in_a_timeout[0] = True
+
+ def __exit__(self, typ, value, tb):
+ in_a_timeout[0] = False
+
+ class PickyWsgiStringIO(WsgiStringIO):
+ def read(self, *a, **kw):
+ if not in_a_timeout[0]:
+ raise NotInATimeout()
+ return WsgiStringIO.read(self, *a, **kw)
+
+ def readline(self, *a, **kw):
+ if not in_a_timeout[0]:
+ raise NotInATimeout()
+ return WsgiStringIO.readline(self, *a, **kw)
+
+ test_data = 'obj data'
+ footer_meta = {
+ "X-Object-Sysmeta-Ec-Frag-Index": "7",
+ "Etag": md5(test_data).hexdigest(),
+ }
+ footer_json = json.dumps(footer_meta)
+ footer_meta_cksum = md5(footer_json).hexdigest()
+ test_doc = "\r\n".join((
+ "--boundary123",
+ "X-Document: object body",
+ "",
+ test_data,
+ "--boundary123",
+ "X-Document: object metadata",
+ "Content-MD5: " + footer_meta_cksum,
+ "",
+ footer_json,
+ "--boundary123",
+ "X-Document: we got cleverer",
+ "",
+ "stuff stuff meaningless stuuuuuuuuuuff",
+ "--boundary123",
+ "X-Document: we got even cleverer; can you believe it?",
+ "Waneshaft: ambifacient lunar",
+ "Casing: malleable logarithmic",
+ "",
+ "potato potato potato potato potato potato potato",
+ "--boundary123--"
+ ))
+
+ # phase1 - PUT request with object metadata in footer and
+ # multiphase commit conversation
+ put_timestamp = utils.Timestamp(time()).internal
+ headers = {
+ 'Content-Type': 'text/plain',
+ 'X-Timestamp': put_timestamp,
+ 'Transfer-Encoding': 'chunked',
+ 'Expect': '100-continue',
+ 'X-Backend-Storage-Policy-Index': '1',
+ 'X-Backend-Obj-Content-Length': len(test_data),
+ 'X-Backend-Obj-Metadata-Footer': 'yes',
+ 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
+ }
+ wsgi_input = PickyWsgiStringIO(test_doc)
+ req = Request.blank(
+ "/sda1/0/a/c/o",
+ environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': wsgi_input},
+ headers=headers)
+
+ app = object_server.ObjectController(self.conf, logger=self.logger)
+ with mock.patch('swift.obj.server.ChunkReadTimeout', FakeTimeout):
+ resp = req.get_response(app)
+ self.assertEqual(resp.status_int, 201) # sanity check
+
+ in_a_timeout[0] = True # so we can check without an exception
+ self.assertEqual(wsgi_input.read(), '') # we read all the bytes
+
+ def test_multiphase_put_bad_commit_message(self):
+ # Test 2-phase commit conversation - end of 1st phase marked
+ # by 100-continue response from the object server, with 2nd
+ # phase commit confirmation being received corrupt
+ test_data = 'obj data'
+ footer_meta = {
+ "X-Object-Sysmeta-Ec-Frag-Index": "7",
+ "Etag": md5(test_data).hexdigest(),
+ }
+ footer_json = json.dumps(footer_meta)
+ footer_meta_cksum = md5(footer_json).hexdigest()
+ test_doc = "\r\n".join((
+ "--boundary123",
+ "X-Document: object body",
+ "",
+ test_data,
+ "--boundary123",
+ "X-Document: object metadata",
+ "Content-MD5: " + footer_meta_cksum,
+ "",
+ footer_json,
+ "--boundary123",
+ ))
+
+ # phase1 - PUT request with object metadata in footer and
+ # multiphase commit conversation
+ put_timestamp = utils.Timestamp(time()).internal
+ headers = {
+ 'Content-Type': 'text/plain',
+ 'X-Timestamp': put_timestamp,
+ 'Transfer-Encoding': 'chunked',
+ 'Expect': '100-continue',
+ 'X-Backend-Storage-Policy-Index': '1',
+ 'X-Backend-Obj-Content-Length': len(test_data),
+ 'X-Backend-Obj-Metadata-Footer': 'yes',
+ 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
+ 'X-Backend-Obj-Multiphase-Commit': 'yes',
+ }
+ conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0',
+ 'PUT', '/a/c/o', headers=headers)
+ resp = conn.getexpect()
+ self.assertEqual(resp.status, 100)
+ headers = HeaderKeyDict(resp.getheaders())
+ self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes')
+ self.assertEqual(headers['X-Obj-Metadata-Footer'], 'yes')
+
+ to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc)
+ conn.send(to_send)
+ # verify 100-continue response to mark end of phase1
+ resp = conn.getexpect()
+ self.assertEqual(resp.status, 100)
+
+ # send commit confirmation to start phase2
+ commit_confirmation_doc = "\r\n".join((
+ "junkjunk",
+ "--boundary123--",
+ ))
+ to_send = "%x\r\n%s\r\n0\r\n\r\n" % \
+ (len(commit_confirmation_doc), commit_confirmation_doc)
+ conn.send(to_send)
+ resp = conn.getresponse()
+ self.assertEqual(resp.status, 500)
+ resp.read()
+ resp.close()
+ # verify that durable file was NOT created
+ obj_basename = os.path.join(
+ self.devices, 'sda1',
+ storage_directory(diskfile.get_data_dir(1), '0',
+ hash_path('a', 'c', 'o')),
+ put_timestamp)
+ obj_datafile = obj_basename + '#7.data'
+ self.assertTrue(os.path.isfile(obj_datafile))
+ obj_durablefile = obj_basename + '.durable'
+ self.assertFalse(os.path.isfile(obj_durablefile))
+
@patch_policies
class TestZeroCopy(unittest.TestCase):
diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py
index 9af76185b..002a08a72 100644
--- a/test/unit/obj/test_ssync_receiver.py
+++ b/test/unit/obj/test_ssync_receiver.py
@@ -27,6 +27,7 @@ from swift.common import constraints
from swift.common import exceptions
from swift.common import swob
from swift.common import utils
+from swift.common.storage_policy import POLICIES
from swift.obj import diskfile
from swift.obj import server
from swift.obj import ssync_receiver
@@ -34,6 +35,7 @@ from swift.obj import ssync_receiver
from test import unit
+@unit.patch_policies()
class TestReceiver(unittest.TestCase):
def setUp(self):
@@ -46,12 +48,12 @@ class TestReceiver(unittest.TestCase):
self.testdir = os.path.join(
tempfile.mkdtemp(), 'tmp_test_ssync_receiver')
utils.mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
- conf = {
+ self.conf = {
'devices': self.testdir,
'mount_check': 'false',
'replication_one_per_device': 'false',
'log_requests': 'false'}
- self.controller = server.ObjectController(conf)
+ self.controller = server.ObjectController(self.conf)
self.controller.bytes_per_sync = 1
self.account1 = 'a'
@@ -111,8 +113,8 @@ class TestReceiver(unittest.TestCase):
def test_REPLICATION_calls_replication_lock(self):
with mock.patch.object(
- self.controller._diskfile_mgr, 'replication_lock') as \
- mocked_replication_lock:
+ self.controller._diskfile_router[POLICIES.legacy],
+ 'replication_lock') as mocked_replication_lock:
req = swob.Request.blank(
'/sda1/1',
environ={'REQUEST_METHOD': 'REPLICATION'},
@@ -140,9 +142,8 @@ class TestReceiver(unittest.TestCase):
body_lines,
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
- self.assertEqual(rcvr.policy_idx, 0)
+ self.assertEqual(rcvr.policy, POLICIES[0])
- @unit.patch_policies()
def test_Receiver_with_storage_policy_index_header(self):
req = swob.Request.blank(
'/sda1/1',
@@ -157,15 +158,30 @@ class TestReceiver(unittest.TestCase):
body_lines,
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
- self.assertEqual(rcvr.policy_idx, 1)
+ self.assertEqual(rcvr.policy, POLICIES[1])
+
+ def test_Receiver_with_bad_storage_policy_index_header(self):
+ valid_indices = sorted([int(policy) for policy in POLICIES])
+ bad_index = valid_indices[-1] + 1
+ req = swob.Request.blank(
+ '/sda1/1',
+ environ={'REQUEST_METHOD': 'SSYNC',
+ 'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': bad_index},
+ body=':MISSING_CHECK: START\r\n'
+ ':MISSING_CHECK: END\r\n'
+ ':UPDATES: START\r\n:UPDATES: END\r\n')
+ self.controller.logger = mock.MagicMock()
+ receiver = ssync_receiver.Receiver(self.controller, req)
+ body_lines = [chunk.strip() for chunk in receiver() if chunk.strip()]
+ self.assertEqual(body_lines, [":ERROR: 503 'No policy with index 2'"])
def test_REPLICATION_replication_lock_fail(self):
def _mock(path):
with exceptions.ReplicationLockTimeout(0.01, '/somewhere/' + path):
eventlet.sleep(0.05)
with mock.patch.object(
- self.controller._diskfile_mgr, 'replication_lock', _mock):
- self.controller._diskfile_mgr
+ self.controller._diskfile_router[POLICIES.legacy],
+ 'replication_lock', _mock):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/sda1/1',
@@ -190,7 +206,7 @@ class TestReceiver(unittest.TestCase):
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
- [":ERROR: 0 'Invalid path: /device'"])
+ [":ERROR: 400 'Invalid path: /device'"])
self.assertEqual(resp.status_int, 200)
self.assertFalse(mocked_replication_semaphore.acquire.called)
self.assertFalse(mocked_replication_semaphore.release.called)
@@ -203,7 +219,7 @@ class TestReceiver(unittest.TestCase):
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
- [":ERROR: 0 'Invalid path: /device/'"])
+ [":ERROR: 400 'Invalid path: /device/'"])
self.assertEqual(resp.status_int, 200)
self.assertFalse(mocked_replication_semaphore.acquire.called)
self.assertFalse(mocked_replication_semaphore.release.called)
@@ -230,7 +246,7 @@ class TestReceiver(unittest.TestCase):
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
- [":ERROR: 0 'Invalid path: /device/partition/junk'"])
+ [":ERROR: 400 'Invalid path: /device/partition/junk'"])
self.assertEqual(resp.status_int, 200)
self.assertFalse(mocked_replication_semaphore.acquire.called)
self.assertFalse(mocked_replication_semaphore.release.called)
@@ -240,7 +256,8 @@ class TestReceiver(unittest.TestCase):
mock.patch.object(
self.controller, 'replication_semaphore'),
mock.patch.object(
- self.controller._diskfile_mgr, 'mount_check', False),
+ self.controller._diskfile_router[POLICIES.legacy],
+ 'mount_check', False),
mock.patch.object(
constraints, 'check_mount', return_value=False)) as (
mocked_replication_semaphore,
@@ -259,7 +276,8 @@ class TestReceiver(unittest.TestCase):
mock.patch.object(
self.controller, 'replication_semaphore'),
mock.patch.object(
- self.controller._diskfile_mgr, 'mount_check', True),
+ self.controller._diskfile_router[POLICIES.legacy],
+ 'mount_check', True),
mock.patch.object(
constraints, 'check_mount', return_value=False)) as (
mocked_replication_semaphore,
@@ -275,7 +293,8 @@ class TestReceiver(unittest.TestCase):
"device</p></html>'"])
self.assertEqual(resp.status_int, 200)
mocked_check_mount.assert_called_once_with(
- self.controller._diskfile_mgr.devices, 'device')
+ self.controller._diskfile_router[POLICIES.legacy].devices,
+ 'device')
mocked_check_mount.reset_mock()
mocked_check_mount.return_value = True
@@ -287,7 +306,8 @@ class TestReceiver(unittest.TestCase):
[':ERROR: 0 "Looking for :MISSING_CHECK: START got \'\'"'])
self.assertEqual(resp.status_int, 200)
mocked_check_mount.assert_called_once_with(
- self.controller._diskfile_mgr.devices, 'device')
+ self.controller._diskfile_router[POLICIES.legacy].devices,
+ 'device')
def test_REPLICATION_Exception(self):
@@ -486,7 +506,8 @@ class TestReceiver(unittest.TestCase):
def test_MISSING_CHECK_have_one_exact(self):
object_dir = utils.storage_directory(
- os.path.join(self.testdir, 'sda1', diskfile.get_data_dir(0)),
+ os.path.join(self.testdir, 'sda1',
+ diskfile.get_data_dir(POLICIES[0])),
'1', self.hash1)
utils.mkdirs(object_dir)
fp = open(os.path.join(object_dir, self.ts1 + '.data'), 'w+')
@@ -515,10 +536,10 @@ class TestReceiver(unittest.TestCase):
self.assertFalse(self.controller.logger.error.called)
self.assertFalse(self.controller.logger.exception.called)
- @unit.patch_policies
def test_MISSING_CHECK_storage_policy(self):
object_dir = utils.storage_directory(
- os.path.join(self.testdir, 'sda1', diskfile.get_data_dir(1)),
+ os.path.join(self.testdir, 'sda1',
+ diskfile.get_data_dir(POLICIES[1])),
'1', self.hash1)
utils.mkdirs(object_dir)
fp = open(os.path.join(object_dir, self.ts1 + '.data'), 'w+')
@@ -550,7 +571,8 @@ class TestReceiver(unittest.TestCase):
def test_MISSING_CHECK_have_one_newer(self):
object_dir = utils.storage_directory(
- os.path.join(self.testdir, 'sda1', diskfile.get_data_dir(0)),
+ os.path.join(self.testdir, 'sda1',
+ diskfile.get_data_dir(POLICIES[0])),
'1', self.hash1)
utils.mkdirs(object_dir)
newer_ts1 = utils.normalize_timestamp(float(self.ts1) + 1)
@@ -583,7 +605,8 @@ class TestReceiver(unittest.TestCase):
def test_MISSING_CHECK_have_one_older(self):
object_dir = utils.storage_directory(
- os.path.join(self.testdir, 'sda1', diskfile.get_data_dir(0)),
+ os.path.join(self.testdir, 'sda1',
+ diskfile.get_data_dir(POLICIES[0])),
'1', self.hash1)
utils.mkdirs(object_dir)
older_ts1 = utils.normalize_timestamp(float(self.ts1) - 1)
@@ -1072,7 +1095,6 @@ class TestReceiver(unittest.TestCase):
'content-encoding specialty-header')})
self.assertEqual(req.read_body, '1')
- @unit.patch_policies()
def test_UPDATES_with_storage_policy(self):
_PUT_request = [None]