diff options
authorThiago da Silva <>2015-02-17 16:55:34 -0500
committerThiago da Silva <>2015-03-21 12:30:58 -0400
commit23d0842dec250905f68df601926ba8228392b322 (patch)
parent3d3db0ab789f37a69fc178eeb74c8cc61e4c6c1b (diff)
Refactoring the PUT method
Extracting large chunks of the PUT method into smaller methods to improve maintainability and reuse of code. Based on the work that Clay Gerrard started: Co-Authored-By: Clay Gerrard <> Change-Id: Id479fc5b159a2782361ac4a6e4a6d8bbaee4fe85 Signed-off-by: Thiago da Silva <>
5 files changed, 424 insertions, 245 deletions
diff --git a/swift/common/ b/swift/common/
index 1c43316ba..729cdd96f 100644
--- a/swift/common/
+++ b/swift/common/
@@ -929,6 +929,10 @@ class Request(object):
return '/' + entity_path
+ def is_chunked(self):
+ return 'chunked' in self.headers.get('transfer-encoding', '')
+ @property
def url(self):
"Provides the full url of the request"
return self.host_url + self.path_qs
diff --git a/swift/proxy/controllers/ b/swift/proxy/controllers/
index 1b9bcab61..70b0d0cf6 100644
--- a/swift/proxy/controllers/
+++ b/swift/proxy/controllers/
@@ -52,12 +52,13 @@ from swift.common.http import (
+from swift.common.storage_policy import POLICIES
from swift.proxy.controllers.base import Controller, delay_denial, \
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \
HTTPServerError, HTTPServiceUnavailable, Request, \
- HTTPClientDisconnect, HeaderKeyDict
+ HTTPClientDisconnect, HeaderKeyDict, HTTPException
from swift.common.request_helpers import is_sys_or_user_meta, is_sys_meta, \
remove_items, copy_header_subset
@@ -321,7 +322,13 @@ class ObjectController(Controller):
def _connect_put_node(self, nodes, part, path, headers,
- """Method for a file PUT connect"""
+ """
+ Make a connection for a replicated object.
+ Connects to the first working node that it finds in node_iter
+ and sends over the request headers. Returns an HTTPConnection
+ object to handle the rest of the streaming.
+ """ = logger_thread_locals
for node in nodes:
@@ -350,36 +357,41 @@ class ObjectController(Controller):, _('ERROR Insufficient Storage'))
elif is_server_error(resp.status):
- node, _('ERROR %(status)d Expect: 100-continue '
- 'From Object Server') % {
- 'status': resp.status})
+ node,
+ _('ERROR %(status)d Expect: 100-continue '
+ 'From Object Server') % {
+ 'status': resp.status})
except (Exception, Timeout):
node, _('Object'),
_('Expect: 100-continue on %s') % path)
+ def _await_response(self, conn, **kwargs):
+ with Timeout(
+ if conn.resp:
+ return conn.resp
+ else:
+ return conn.getresponse()
+ def _get_conn_response(self, conn, req, **kwargs):
+ try:
+ resp = self._await_response(conn, **kwargs)
+ return (conn, resp)
+ except (Exception, Timeout):
+ conn.node, _('Object'),
+ _('Trying to get final status of PUT to %s') % req.path)
+ return (None, None)
def _get_put_responses(self, req, conns, nodes):
statuses = []
reasons = []
bodies = []
etags = set()
- def get_conn_response(conn):
- try:
- with Timeout(
- if conn.resp:
- return (conn, conn.resp)
- else:
- return (conn, conn.getresponse())
- except (Exception, Timeout):
- conn.node, _('Object'),
- _('Trying to get final status of PUT to %s') % req.path)
- return (None, None)
pile = GreenAsyncPile(len(conns))
for conn in conns:
- pile.spawn(get_conn_response, conn)
+ pile.spawn(self._get_conn_response, conn, req)
def _handle_response(conn, response):
@@ -440,56 +452,135 @@ class ObjectController(Controller):
return req, delete_at_container, delete_at_part, delete_at_nodes
- @public
- @cors_validation
- @delay_denial
- def PUT(self, req):
- """HTTP PUT request handler."""
- if req.if_none_match is not None and '*' not in req.if_none_match:
- # Sending an etag with if-none-match isn't currently supported
- return HTTPBadRequest(request=req, content_type='text/plain',
- body='If-None-Match only supports *')
- container_info = self.container_info(
- self.account_name, self.container_name, req)
- policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
- container_info['storage_policy'])
- obj_ring =
+ def _handle_copy_request(self, req):
+ """
+ This method handles copying objects based on values set in the headers
+ 'X-Copy-From' and 'X-Copy-From-Account'
- # pass the policy index to storage nodes via req header
- req.headers['X-Backend-Storage-Policy-Index'] = policy_index
- container_partition = container_info['partition']
- containers = container_info['nodes']
- req.acl = container_info['write_acl']
- req.environ['swift_sync_key'] = container_info['sync_key']
- object_versions = container_info['versions']
- if 'swift.authorize' in req.environ:
- aresp = req.environ['swift.authorize'](req)
- if aresp:
- return aresp
+ This method was added as part of the refactoring of the PUT method and
+ the functionality is expected to be moved to middleware
+ """
+ if req.environ.get('swift.orig_req_method', req.method) != 'POST':
+ req.environ.setdefault('swift.log_info', []).append(
+ 'x-copy-from:%s' % req.headers['X-Copy-From'])
+ ver, acct, _rest = req.split_path(2, 3, True)
+ src_account_name = req.headers.get('X-Copy-From-Account', None)
+ if src_account_name:
+ src_account_name = check_account_format(req, src_account_name)
+ else:
+ src_account_name = acct
+ src_container_name, src_obj_name = check_copy_from_header(req)
+ source_header = '/%s/%s/%s/%s' % (
+ ver, src_account_name, src_container_name, src_obj_name)
+ source_req = req.copy_get()
+ # make sure the source request uses it's container_info
+ source_req.headers.pop('X-Backend-Storage-Policy-Index', None)
+ source_req.path_info = source_header
+ source_req.headers['X-Newest'] = 'true'
+ orig_obj_name = self.object_name
+ orig_container_name = self.container_name
+ orig_account_name = self.account_name
+ sink_req = Request.blank(req.path_info,
+ environ=req.environ, headers=req.headers)
+ self.object_name = src_obj_name
+ self.container_name = src_container_name
+ self.account_name = src_account_name
+ source_resp = self.GET(source_req)
+ # This gives middlewares a way to change the source; for example,
+ # this lets you COPY a SLO manifest and have the new object be the
+ # concatenation of the segments (like what a GET request gives
+ # the client), not a copy of the manifest file.
+ hook = req.environ.get(
+ 'swift.copy_hook',
+ (lambda source_req, source_resp, sink_req: source_resp))
+ source_resp = hook(source_req, source_resp, sink_req)
+ # reset names
+ self.object_name = orig_obj_name
+ self.container_name = orig_container_name
+ self.account_name = orig_account_name
+ if source_resp.status_int >= HTTP_MULTIPLE_CHOICES:
+ # this is a bit of ugly code, but I'm willing to live with it
+ # until copy request handling moves to middleware
+ return source_resp, None, None, None
+ if source_resp.content_length is None:
+ # This indicates a transfer-encoding: chunked source object,
+ # which currently only happens because there are more than
+ # CONTAINER_LISTING_LIMIT segments in a segmented object. In
+ # this case, we're going to refuse to do the server-side copy.
+ raise HTTPRequestEntityTooLarge(request=req)
+ if source_resp.content_length > constraints.MAX_FILE_SIZE:
+ raise HTTPRequestEntityTooLarge(request=req)
+ data_source = iter(source_resp.app_iter)
+ sink_req.content_length = source_resp.content_length
+ sink_req.etag = source_resp.etag
+ # we no longer need the X-Copy-From header
+ del sink_req.headers['X-Copy-From']
+ if 'X-Copy-From-Account' in sink_req.headers:
+ del sink_req.headers['X-Copy-From-Account']
+ if not req.content_type_manually_set:
+ sink_req.headers['Content-Type'] = \
+ source_resp.headers['Content-Type']
+ if config_true_value(
+ sink_req.headers.get('x-fresh-metadata', 'false')):
+ # post-as-copy: ignore new sysmeta, copy existing sysmeta
+ condition = lambda k: is_sys_meta('object', k)
+ remove_items(sink_req.headers, condition)
+ copy_header_subset(source_resp, sink_req, condition)
+ else:
+ # copy/update existing sysmeta and user meta
+ copy_headers_into(source_resp, sink_req)
+ copy_headers_into(req, sink_req)
- if not containers:
- return HTTPNotFound(request=req)
+ # copy over x-static-large-object for POSTs and manifest copies
+ if 'X-Static-Large-Object' in source_resp.headers and \
+ req.params.get('multipart-manifest') == 'get':
+ sink_req.headers['X-Static-Large-Object'] = \
+ source_resp.headers['X-Static-Large-Object']
- # Sometimes the 'content-type' header exists, but is set to None.
- content_type_manually_set = True
- detect_content_type = \
- config_true_value(req.headers.get('x-detect-content-type'))
- if detect_content_type or not req.headers.get('content-type'):
- guessed_type, _junk = mimetypes.guess_type(req.path_info)
- req.headers['Content-Type'] = guessed_type or \
- 'application/octet-stream'
- if detect_content_type:
- req.headers.pop('x-detect-content-type')
- else:
- content_type_manually_set = False
+ req = sink_req
- error_response = check_object_creation(req, self.object_name) or \
- check_content_type(req)
- if error_response:
- return error_response
+ def update_response(req, resp):
+ acct, path = source_resp.environ['PATH_INFO'].split('/', 3)[2:4]
+ resp.headers['X-Copied-From-Account'] = quote(acct)
+ resp.headers['X-Copied-From'] = quote(path)
+ if 'last-modified' in source_resp.headers:
+ resp.headers['X-Copied-From-Last-Modified'] = \
+ source_resp.headers['last-modified']
+ copy_headers_into(req, resp)
+ return resp
+ # this is a bit of ugly code, but I'm willing to live with it
+ # until copy request handling moves to middleware
+ return None, req, data_source, update_response
+ def _handle_object_versions(self, req):
+ """
+ This method handles versionining of objects in containers that
+ have the feature enabled.
+ When a new PUT request is sent, the proxy checks for previous versions
+ of that same object name. If found, it is copied to a different
+ container and the new version is stored in its place.
+ This method was added as part of the PUT method refactoring and the
+ functionality is expected to be moved to middleware
+ """
+ container_info = self.container_info(
+ self.account_name, self.container_name, req)
+ policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
+ container_info['storage_policy'])
+ obj_ring =
partition, nodes = obj_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
+ object_versions = container_info['versions']
# do a HEAD request for checking object versions
if object_versions and not req.environ.get('swift_versioned_copy'):
@@ -502,20 +593,6 @@ class ObjectController(Controller):
hreq, _('Object'), obj_ring, partition,
- # Used by container sync feature
- if 'x-timestamp' in req.headers:
- try:
- req_timestamp = Timestamp(req.headers['X-Timestamp'])
- except ValueError:
- return HTTPBadRequest(
- request=req, content_type='text/plain',
- body='X-Timestamp should be a UNIX timestamp float value; '
- 'was %r' % req.headers['x-timestamp'])
- req.headers['X-Timestamp'] = req_timestamp.internal
- else:
- req.headers['X-Timestamp'] = Timestamp(time.time()).internal
- if object_versions and not req.environ.get('swift_versioned_copy'):
is_manifest = 'X-Object-Manifest' in req.headers or \
'X-Object-Manifest' in hresp.headers
if hresp.status_int != HTTP_NOT_FOUND and not is_manifest:
@@ -543,120 +620,41 @@ class ObjectController(Controller):
copy_resp = self.COPY(copy_req)
if is_client_error(copy_resp.status_int):
# missing container or bad permissions
- return HTTPPreconditionFailed(request=req)
+ raise HTTPPreconditionFailed(request=req)
elif not is_success(copy_resp.status_int):
# could not copy the data, bail
- return HTTPServiceUnavailable(request=req)
+ raise HTTPServiceUnavailable(request=req)
- reader = req.environ['wsgi.input'].read
- data_source = iter(lambda: reader(, '')
- source_header = req.headers.get('X-Copy-From')
- source_resp = None
- if source_header:
- if req.environ.get('swift.orig_req_method', req.method) != 'POST':
- req.environ.setdefault('swift.log_info', []).append(
- 'x-copy-from:%s' % source_header)
- ver, acct, _rest = req.split_path(2, 3, True)
- src_account_name = req.headers.get('X-Copy-From-Account', None)
- if src_account_name:
- src_account_name = check_account_format(req, src_account_name)
- else:
- src_account_name = acct
- src_container_name, src_obj_name = check_copy_from_header(req)
- source_header = '/%s/%s/%s/%s' % (
- ver, src_account_name, src_container_name, src_obj_name)
- source_req = req.copy_get()
- # make sure the source request uses it's container_info
- source_req.headers.pop('X-Backend-Storage-Policy-Index', None)
- source_req.path_info = source_header
- source_req.headers['X-Newest'] = 'true'
- orig_obj_name = self.object_name
- orig_container_name = self.container_name
- orig_account_name = self.account_name
- self.object_name = src_obj_name
- self.container_name = src_container_name
- self.account_name = src_account_name
- sink_req = Request.blank(req.path_info,
- environ=req.environ, headers=req.headers)
- source_resp = self.GET(source_req)
- # This gives middlewares a way to change the source; for example,
- # this lets you COPY a SLO manifest and have the new object be the
- # concatenation of the segments (like what a GET request gives
- # the client), not a copy of the manifest file.
- hook = req.environ.get(
- 'swift.copy_hook',
- (lambda source_req, source_resp, sink_req: source_resp))
- source_resp = hook(source_req, source_resp, sink_req)
- if source_resp.status_int >= HTTP_MULTIPLE_CHOICES:
- return source_resp
- self.object_name = orig_obj_name
- self.container_name = orig_container_name
- self.account_name = orig_account_name
- data_source = iter(source_resp.app_iter)
- sink_req.content_length = source_resp.content_length
- if sink_req.content_length is None:
- # This indicates a transfer-encoding: chunked source object,
- # which currently only happens because there are more than
- # CONTAINER_LISTING_LIMIT segments in a segmented object. In
- # this case, we're going to refuse to do the server-side copy.
- return HTTPRequestEntityTooLarge(request=req)
- if sink_req.content_length > constraints.MAX_FILE_SIZE:
- return HTTPRequestEntityTooLarge(request=req)
- sink_req.etag = source_resp.etag
- # we no longer need the X-Copy-From header
- del sink_req.headers['X-Copy-From']
- if 'X-Copy-From-Account' in sink_req.headers:
- del sink_req.headers['X-Copy-From-Account']
- if not content_type_manually_set:
- sink_req.headers['Content-Type'] = \
- source_resp.headers['Content-Type']
- if config_true_value(
- sink_req.headers.get('x-fresh-metadata', 'false')):
- # post-as-copy: ignore new sysmeta, copy existing sysmeta
- condition = lambda k: is_sys_meta('object', k)
- remove_items(sink_req.headers, condition)
- copy_header_subset(source_resp, sink_req, condition)
+ def _update_content_type(self, req):
+ # Sometimes the 'content-type' header exists, but is set to None.
+ req.content_type_manually_set = True
+ detect_content_type = \
+ config_true_value(req.headers.get('x-detect-content-type'))
+ if detect_content_type or not req.headers.get('content-type'):
+ guessed_type, _junk = mimetypes.guess_type(req.path_info)
+ req.headers['Content-Type'] = guessed_type or \
+ 'application/octet-stream'
+ if detect_content_type:
+ req.headers.pop('x-detect-content-type')
- # copy/update existing sysmeta and user meta
- copy_headers_into(source_resp, sink_req)
- copy_headers_into(req, sink_req)
- # copy over x-static-large-object for POSTs and manifest copies
- if 'X-Static-Large-Object' in source_resp.headers and \
- req.params.get('multipart-manifest') == 'get':
- sink_req.headers['X-Static-Large-Object'] = \
- source_resp.headers['X-Static-Large-Object']
- req = sink_req
- req, delete_at_container, delete_at_part, \
- delete_at_nodes = self._config_obj_expiration(req)
- node_iter = GreenthreadSafeIterator(
- self.iter_nodes_local_first(obj_ring, partition))
- pile = GreenPile(len(nodes))
- te = req.headers.get('transfer-encoding', '')
- chunked = ('chunked' in te)
- outgoing_headers = self._backend_requests(
- req, len(nodes), container_partition, containers,
- delete_at_container, delete_at_part, delete_at_nodes)
+ req.content_type_manually_set = False
- for nheaders in outgoing_headers:
- # RFC2616:8.2.3 disallows 100-continue without a body
- if (req.content_length > 0) or chunked:
- nheaders['Expect'] = '100-continue'
- pile.spawn(self._connect_put_node, node_iter, partition,
- req.swift_entity_path, nheaders,
- conns = [conn for conn in pile if conn]
- min_conns = quorum_size(len(nodes))
+ def _update_x_timestamp(self, req):
+ # Used by container sync feature
+ if 'x-timestamp' in req.headers:
+ try:
+ req_timestamp = Timestamp(req.headers['X-Timestamp'])
+ except ValueError:
+ raise HTTPBadRequest(
+ request=req, content_type='text/plain',
+ body='X-Timestamp should be a UNIX timestamp float value; '
+ 'was %r' % req.headers['x-timestamp'])
+ req.headers['X-Timestamp'] = req_timestamp.internal
+ else:
+ req.headers['X-Timestamp'] = Timestamp(time.time()).internal
+ return None
+ def _check_failure_put_connections(self, conns, req, nodes):
if req.if_none_match is not None and '*' in req.if_none_match:
statuses = [conn.resp.status for conn in conns if conn.resp]
@@ -664,7 +662,7 @@ class ObjectController(Controller):
_('Object PUT returning 412, %(statuses)r'),
{'statuses': statuses})
- return HTTPPreconditionFailed(request=req)
+ raise HTTPPreconditionFailed(request=req)
if any(conn for conn in conns if conn.resp and
conn.resp.status == HTTP_CONFLICT):
@@ -675,14 +673,44 @@ class ObjectController(Controller):
'%(req_timestamp)s <= %(timestamps)r'),
{'req_timestamp': req.timestamp.internal,
'timestamps': ', '.join(timestamps)})
- return HTTPAccepted(request=req)
+ raise HTTPAccepted(request=req)
+ min_conns = quorum_size(len(nodes))
+ self._check_min_conn(req, conns, min_conns)
+ def _get_put_connections(self, req, nodes, partition, outgoing_headers,
+ policy, expect):
+ """
+ Establish connections to storage nodes for PUT request
+ """
+ obj_ring = policy.object_ring
+ node_iter = GreenthreadSafeIterator(
+ self.iter_nodes_local_first(obj_ring, partition))
+ pile = GreenPile(len(nodes))
+ for nheaders in outgoing_headers:
+ if expect:
+ nheaders['Expect'] = '100-continue'
+ pile.spawn(self._connect_put_node, node_iter, partition,
+ req.swift_entity_path, nheaders,
+ conns = [conn for conn in pile if conn]
+ return conns
+ def _check_min_conn(self, req, conns, min_conns, msg=None):
+ msg = msg or 'Object PUT returning 503, %(conns)s/%(nodes)s ' \
+ 'required connections'
if len(conns) < min_conns:
- _('Object PUT returning 503, %(conns)s/%(nodes)s '
- 'required connections'),
- {'conns': len(conns), 'nodes': min_conns})
- return HTTPServiceUnavailable(request=req)
+ {'conns': len(conns), 'nodes': min_conns})
+ raise HTTPServiceUnavailable(request=req)
+ def _transfer_data(self, req, data_source, conns, nodes):
+ min_conns = quorum_size(len(nodes))
bytes_transferred = 0
with ContextPool(len(nodes)) as pool:
@@ -695,48 +723,90 @@ class ObjectController(Controller):
chunk = next(data_source)
except StopIteration:
- if chunked:
+ if req.is_chunked:
for conn in conns:
bytes_transferred += len(chunk)
if bytes_transferred > constraints.MAX_FILE_SIZE:
- return HTTPRequestEntityTooLarge(request=req)
+ raise HTTPRequestEntityTooLarge(request=req)
for conn in list(conns):
if not conn.failed:
'%x\r\n%s\r\n' % (len(chunk), chunk)
- if chunked else chunk)
+ if req.is_chunked else chunk)
+ conn.close()
- if len(conns) < min_conns:
- 'Object PUT exceptions during'
- ' send, %(conns)s/%(nodes)s required connections'),
- {'conns': len(conns), 'nodes': min_conns})
- return HTTPServiceUnavailable(request=req)
+ self._check_min_conn(
+ req, conns, min_conns,
+ msg='Object PUT exceptions during'
+ ' send, %(conns)s/%(nodes)s required connections')
for conn in conns:
if conn.queue.unfinished_tasks:
conns = [conn for conn in conns if not conn.failed]
+ self._check_min_conn(
+ req, conns, min_conns,
+ msg='Object PUT exceptions after last send, '
+ '%(conns)s/%(nodes)s required connections')
except ChunkReadTimeout as err:
_('ERROR Client read timeout (%ss)'), err.seconds)'client_timeouts')
- return HTTPRequestTimeout(request=req)
+ raise HTTPRequestTimeout(request=req)
+ except HTTPException:
+ raise
except (Exception, Timeout):
_('ERROR Exception causing client disconnect'))
- return HTTPClientDisconnect(request=req)
+ raise HTTPClientDisconnect(request=req)
if req.content_length and bytes_transferred < req.content_length:
req.client_disconnect = True
_('Client disconnected without sending enough data'))'client_disconnects')
- return HTTPClientDisconnect(request=req)
+ raise HTTPClientDisconnect(request=req)
+ def _store_object(self, req, data_source, nodes, partition,
+ outgoing_headers):
+ """
+ Store a replicated object.
+ This method is responsible for establishing connection
+ with storage nodes and sending object to each one of those
+ nodes. After sending the data, the "best" reponse will be
+ returned based on statuses from all connections
+ """
+ policy_idx = req.headers.get('X-Backend-Storage-Policy-Index')
+ policy = POLICIES.get_by_index(policy_idx)
+ if not nodes:
+ return HTTPNotFound()
+ # RFC2616:8.2.3 disallows 100-continue without a body
+ if (req.content_length > 0) or req.is_chunked:
+ expect = True
+ else:
+ expect = False
+ conns = self._get_put_connections(req, nodes, partition,
+ outgoing_headers, policy, expect)
+ try:
+ # check that a minimum number of connections were established and
+ # meet all the correct conditions set in the request
+ self._check_failure_put_connections(conns, req, nodes)
- statuses, reasons, bodies, etags = self._get_put_responses(req, conns,
- nodes)
+ # transfer data
+ self._transfer_data(req, data_source, conns, nodes)
+ # get responses
+ statuses, reasons, bodies, etags = self._get_put_responses(
+ req, conns, nodes)
+ except HTTPException as resp:
+ return resp
+ finally:
+ for conn in conns:
+ conn.close()
if len(etags) > 1:
@@ -745,14 +815,6 @@ class ObjectController(Controller):
etag = etags.pop() if len(etags) else None
resp = self.best_response(req, statuses, reasons, bodies,
_('Object PUT'), etag=etag)
- if source_header:
- acct, path = source_header.split('/', 3)[2:4]
- resp.headers['X-Copied-From-Account'] = quote(acct)
- resp.headers['X-Copied-From'] = quote(path)
- if 'last-modified' in source_resp.headers:
- resp.headers['X-Copied-From-Last-Modified'] = \
- source_resp.headers['last-modified']
- copy_headers_into(req, resp)
resp.last_modified = math.ceil(
return resp
@@ -760,6 +822,79 @@ class ObjectController(Controller):
+ def PUT(self, req):
+ """HTTP PUT request handler."""
+ if req.if_none_match is not None and '*' not in req.if_none_match:
+ # Sending an etag with if-none-match isn't currently supported
+ return HTTPBadRequest(request=req, content_type='text/plain',
+ body='If-None-Match only supports *')
+ container_info = self.container_info(
+ self.account_name, self.container_name, req)
+ policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
+ container_info['storage_policy'])
+ obj_ring =
+ container_nodes = container_info['nodes']
+ container_partition = container_info['partition']
+ partition, nodes = obj_ring.get_nodes(
+ self.account_name, self.container_name, self.object_name)
+ # pass the policy index to storage nodes via req header
+ req.headers['X-Backend-Storage-Policy-Index'] = policy_index
+ req.acl = container_info['write_acl']
+ req.environ['swift_sync_key'] = container_info['sync_key']
+ # is request authorized
+ if 'swift.authorize' in req.environ:
+ aresp = req.environ['swift.authorize'](req)
+ if aresp:
+ return aresp
+ if not container_info['nodes']:
+ return HTTPNotFound(request=req)
+ # update content type in case it is missing
+ self._update_content_type(req)
+ # check constraints on object name and request headers
+ error_response = check_object_creation(req, self.object_name) or \
+ check_content_type(req)
+ if error_response:
+ return error_response
+ self._update_x_timestamp(req)
+ # check if versioning is enabled and handle copying previous version
+ self._handle_object_versions(req)
+ # check if request is a COPY of an existing object
+ source_header = req.headers.get('X-Copy-From')
+ if source_header:
+ error_response, req, data_source, update_response = \
+ self._handle_copy_request(req)
+ if error_response:
+ return error_response
+ else:
+ reader = req.environ['wsgi.input'].read
+ data_source = iter(lambda: reader(, '')
+ update_response = lambda req, resp: resp
+ # check if object is set to be automaticaly deleted (i.e. expired)
+ req, delete_at_container, delete_at_part, \
+ delete_at_nodes = self._config_obj_expiration(req)
+ # add special headers to be handled by storage nodes
+ outgoing_headers = self._backend_requests(
+ req, len(nodes), container_partition, container_nodes,
+ delete_at_container, delete_at_part, delete_at_nodes)
+ # send object to storage nodes
+ resp = self._store_object(
+ req, data_source, nodes, partition, outgoing_headers)
+ return update_response(req, resp)
+ @public
+ @cors_validation
+ @delay_denial
def DELETE(self, req):
"""HTTP DELETE request handler."""
container_info = self.container_info(
diff --git a/test/unit/ b/test/unit/
index 0e10d3bac..da7212c98 100644
--- a/test/unit/
+++ b/test/unit/
@@ -733,6 +733,9 @@ def fake_http_connect(*code_iter, **kwargs):
def getheader(self, name, default=None):
return swob.HeaderKeyDict(self.getheaders()).get(name, default)
+ def close(self):
+ pass
timestamps_iter = iter(kwargs.get('timestamps') or ['1'] * len(code_iter))
etag_iter = iter(kwargs.get('etags') or [None] * len(code_iter))
if isinstance(kwargs.get('headers'), list):
diff --git a/test/unit/proxy/ b/test/unit/proxy/
index 6d5bf0ed5..1b3b2e06d 100644
--- a/test/unit/proxy/
+++ b/test/unit/proxy/
@@ -993,7 +993,10 @@ class TestObjectController(unittest.TestCase):
headers={'Content-Length': '0',
'Content-Type': 'text/plain'})
- res = method(req)
+ try:
+ res = method(req)
+ except HTTPException as res:
+ pass
self.assertEquals(res.status_int, expected)
# repeat test
@@ -1003,7 +1006,10 @@ class TestObjectController(unittest.TestCase):
headers={'Content-Length': '0',
'Content-Type': 'text/plain'})
- res = method(req)
+ try:
+ res = method(req)
+ except HTTPException as res:
+ pass
self.assertEquals(res.status_int, expected)
@@ -1734,7 +1740,10 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/v1/a/c/o.jpg', {})
req.content_length = 0
- res = controller.PUT(req)
+ try:
+ res = controller.PUT(req)
+ except HTTPException as res:
+ pass
expected = str(expected)
self.assertEquals(res.status[:len(expected)], expected)
test_status_map((200, 200, 201, 201, -1), 201) # connect exc
@@ -1763,7 +1772,10 @@ class TestObjectController(unittest.TestCase):
environ={'REQUEST_METHOD': 'PUT'},
body='some data')
- res = controller.PUT(req)
+ try:
+ res = controller.PUT(req)
+ except HTTPException as res:
+ pass
expected = str(expected)
self.assertEquals(res.status[:len(expected)], expected)
test_status_map((200, 200, 201, -1, 201), 201)
@@ -1805,7 +1817,10 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/v1/a/c/o.jpg', {})
req.content_length = 0
- res = controller.PUT(req)
+ try:
+ res = controller.PUT(req)
+ except HTTPException as res:
+ pass
expected = str(expected)
@@ -3391,7 +3406,10 @@ class TestObjectController(unittest.TestCase): = {}
- resp = controller.PUT(req)
+ try:
+ resp = controller.PUT(req)
+ except HTTPException as resp:
+ pass
self.assertEquals(resp.status_int, 413)
def test_basic_COPY(self):
@@ -3632,7 +3650,10 @@ class TestObjectController(unittest.TestCase):
kwargs = dict(body=copy_from_obj_body)
with self.controller_context(req, *status_list,
**kwargs) as controller:
- resp = controller.COPY(req)
+ try:
+ resp = controller.COPY(req)
+ except HTTPException as resp:
+ pass
self.assertEquals(resp.status_int, 413)
@@ -3656,7 +3677,10 @@ class TestObjectController(unittest.TestCase):
kwargs = dict(body=copy_from_obj_body)
with self.controller_context(req, *status_list,
**kwargs) as controller:
- resp = controller.COPY(req)
+ try:
+ resp = controller.COPY(req)
+ except HTTPException as resp:
+ pass
self.assertEquals(resp.status_int, 413)
def test_COPY_newest(self):
@@ -3698,41 +3722,46 @@ class TestObjectController(unittest.TestCase):
def test_COPY_delete_at(self):
with save_globals():
- given_headers = {}
+ backend_requests = []
- def fake_connect_put_node(nodes, part, path, headers,
- logger_thread_locals):
- given_headers.update(headers)
+ def capture_requests(ipaddr, port, device, partition, method, path,
+ headers=None, query_string=None):
+ backend_requests.append((method, path, headers))
controller = proxy_server.ObjectController(, 'a',
'c', 'o')
- controller._connect_put_node = fake_connect_put_node
- set_http_connect(200, 200, 200, 200, 200, 201, 201, 201)
+ set_http_connect(200, 200, 200, 200, 200, 201, 201, 201,
+ give_connect=capture_requests) = {}
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': '/c/o'})
- controller.COPY(req)
- self.assertEquals(given_headers.get('X-Delete-At'), '9876543210')
- self.assertTrue('X-Delete-At-Host' in given_headers)
- self.assertTrue('X-Delete-At-Device' in given_headers)
- self.assertTrue('X-Delete-At-Partition' in given_headers)
- self.assertTrue('X-Delete-At-Container' in given_headers)
+ resp = controller.COPY(req)
+ self.assertEqual(201, resp.status_int) # sanity
+ for method, path, given_headers in backend_requests:
+ if method != 'PUT':
+ continue
+ self.assertEquals(given_headers.get('X-Delete-At'),
+ '9876543210')
+ self.assertTrue('X-Delete-At-Host' in given_headers)
+ self.assertTrue('X-Delete-At-Device' in given_headers)
+ self.assertTrue('X-Delete-At-Partition' in given_headers)
+ self.assertTrue('X-Delete-At-Container' in given_headers)
def test_COPY_account_delete_at(self):
with save_globals():
- given_headers = {}
+ backend_requests = []
- def fake_connect_put_node(nodes, part, path, headers,
- logger_thread_locals):
- given_headers.update(headers)
+ def capture_requests(ipaddr, port, device, partition, method, path,
+ headers=None, query_string=None):
+ backend_requests.append((method, path, headers))
controller = proxy_server.ObjectController(, 'a',
'c', 'o')
- controller._connect_put_node = fake_connect_put_node
- set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201)
+ set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201,
+ give_connect=capture_requests) = {}
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
@@ -3740,12 +3769,17 @@ class TestObjectController(unittest.TestCase):
'Destination-Account': 'a1'})
- controller.COPY(req)
- self.assertEquals(given_headers.get('X-Delete-At'), '9876543210')
- self.assertTrue('X-Delete-At-Host' in given_headers)
- self.assertTrue('X-Delete-At-Device' in given_headers)
- self.assertTrue('X-Delete-At-Partition' in given_headers)
- self.assertTrue('X-Delete-At-Container' in given_headers)
+ resp = controller.COPY(req)
+ self.assertEqual(201, resp.status_int) # sanity
+ for method, path, given_headers in backend_requests:
+ if method != 'PUT':
+ continue
+ self.assertEquals(given_headers.get('X-Delete-At'),
+ '9876543210')
+ self.assertTrue('X-Delete-At-Host' in given_headers)
+ self.assertTrue('X-Delete-At-Device' in given_headers)
+ self.assertTrue('X-Delete-At-Partition' in given_headers)
+ self.assertTrue('X-Delete-At-Container' in given_headers)
def test_chunked_put(self):
diff --git a/test/unit/proxy/ b/test/unit/proxy/
index c3b673108..d80f2855e 100644
--- a/test/unit/proxy/
+++ b/test/unit/proxy/
@@ -70,6 +70,9 @@ class FakeServerConnection(WSGIContext):
def send(self, data): += data
+ def close(self):
+ pass
def __call__(self, ipaddr, port, device, partition, method, path,
headers=None, query_string=None):
self.path = quote('/' + device + '/' + str(partition) + path)