summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThiago da Silva <thiago@redhat.com>2015-02-17 16:55:34 -0500
committerThiago da Silva <thiago@redhat.com>2015-03-21 12:30:58 -0400
commit23d0842dec250905f68df601926ba8228392b322 (patch)
tree5e1581e72ff60149b0e8f82a50faaf2b0567c541
parent3d3db0ab789f37a69fc178eeb74c8cc61e4c6c1b (diff)
downloadswift-23d0842dec250905f68df601926ba8228392b322.tar.gz
Refactoring the PUT method
Extracting large chunks of the PUT method into smaller methods to improve maintainability and reuse of code. Based on the work that Clay Gerrard started: https://review.openstack.org/#/c/77812/ Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com> Change-Id: Id479fc5b159a2782361ac4a6e4a6d8bbaee4fe85 Signed-off-by: Thiago da Silva <thiago@redhat.com>
-rw-r--r--swift/common/swob.py4
-rw-r--r--swift/proxy/controllers/obj.py561
-rw-r--r--test/unit/__init__.py3
-rw-r--r--test/unit/proxy/test_server.py98
-rw-r--r--test/unit/proxy/test_sysmeta.py3
5 files changed, 424 insertions, 245 deletions
diff --git a/swift/common/swob.py b/swift/common/swob.py
index 1c43316ba..729cdd96f 100644
--- a/swift/common/swob.py
+++ b/swift/common/swob.py
@@ -929,6 +929,10 @@ class Request(object):
return '/' + entity_path
@property
+ def is_chunked(self):
+ return 'chunked' in self.headers.get('transfer-encoding', '')
+
+ @property
def url(self):
"Provides the full url of the request"
return self.host_url + self.path_qs
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index 1b9bcab61..70b0d0cf6 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -52,12 +52,13 @@ from swift.common.http import (
HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, HTTP_INTERNAL_SERVER_ERROR,
HTTP_SERVICE_UNAVAILABLE, HTTP_INSUFFICIENT_STORAGE,
HTTP_PRECONDITION_FAILED, HTTP_CONFLICT)
+from swift.common.storage_policy import POLICIES
from swift.proxy.controllers.base import Controller, delay_denial, \
cors_validation
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \
HTTPServerError, HTTPServiceUnavailable, Request, \
- HTTPClientDisconnect, HeaderKeyDict
+ HTTPClientDisconnect, HeaderKeyDict, HTTPException
from swift.common.request_helpers import is_sys_or_user_meta, is_sys_meta, \
remove_items, copy_header_subset
@@ -321,7 +322,13 @@ class ObjectController(Controller):
def _connect_put_node(self, nodes, part, path, headers,
logger_thread_locals):
- """Method for a file PUT connect"""
+ """
+ Make a connection for a replicated object.
+
+ Connects to the first working node that it finds in node_iter
+ and sends over the request headers. Returns an HTTPConnection
+ object to handle the rest of the streaming.
+ """
self.app.logger.thread_locals = logger_thread_locals
for node in nodes:
try:
@@ -350,36 +357,41 @@ class ObjectController(Controller):
self.app.error_limit(node, _('ERROR Insufficient Storage'))
elif is_server_error(resp.status):
self.app.error_occurred(
- node, _('ERROR %(status)d Expect: 100-continue '
- 'From Object Server') % {
- 'status': resp.status})
+ node,
+ _('ERROR %(status)d Expect: 100-continue '
+ 'From Object Server') % {
+ 'status': resp.status})
except (Exception, Timeout):
self.app.exception_occurred(
node, _('Object'),
_('Expect: 100-continue on %s') % path)
+ def _await_response(self, conn, **kwargs):
+ with Timeout(self.app.node_timeout):
+ if conn.resp:
+ return conn.resp
+ else:
+ return conn.getresponse()
+
+ def _get_conn_response(self, conn, req, **kwargs):
+ try:
+ resp = self._await_response(conn, **kwargs)
+ return (conn, resp)
+ except (Exception, Timeout):
+ self.app.exception_occurred(
+ conn.node, _('Object'),
+ _('Trying to get final status of PUT to %s') % req.path)
+ return (None, None)
+
def _get_put_responses(self, req, conns, nodes):
statuses = []
reasons = []
bodies = []
etags = set()
- def get_conn_response(conn):
- try:
- with Timeout(self.app.node_timeout):
- if conn.resp:
- return (conn, conn.resp)
- else:
- return (conn, conn.getresponse())
- except (Exception, Timeout):
- self.app.exception_occurred(
- conn.node, _('Object'),
- _('Trying to get final status of PUT to %s') % req.path)
- return (None, None)
-
pile = GreenAsyncPile(len(conns))
for conn in conns:
- pile.spawn(get_conn_response, conn)
+ pile.spawn(self._get_conn_response, conn, req)
def _handle_response(conn, response):
statuses.append(response.status)
@@ -440,56 +452,135 @@ class ObjectController(Controller):
return req, delete_at_container, delete_at_part, delete_at_nodes
- @public
- @cors_validation
- @delay_denial
- def PUT(self, req):
- """HTTP PUT request handler."""
- if req.if_none_match is not None and '*' not in req.if_none_match:
- # Sending an etag with if-none-match isn't currently supported
- return HTTPBadRequest(request=req, content_type='text/plain',
- body='If-None-Match only supports *')
- container_info = self.container_info(
- self.account_name, self.container_name, req)
- policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
- container_info['storage_policy'])
- obj_ring = self.app.get_object_ring(policy_index)
+ def _handle_copy_request(self, req):
+ """
+ This method handles copying objects based on values set in the headers
+ 'X-Copy-From' and 'X-Copy-From-Account'
- # pass the policy index to storage nodes via req header
- req.headers['X-Backend-Storage-Policy-Index'] = policy_index
- container_partition = container_info['partition']
- containers = container_info['nodes']
- req.acl = container_info['write_acl']
- req.environ['swift_sync_key'] = container_info['sync_key']
- object_versions = container_info['versions']
- if 'swift.authorize' in req.environ:
- aresp = req.environ['swift.authorize'](req)
- if aresp:
- return aresp
+ This method was added as part of the refactoring of the PUT method and
+ the functionality is expected to be moved to middleware
+ """
+ if req.environ.get('swift.orig_req_method', req.method) != 'POST':
+ req.environ.setdefault('swift.log_info', []).append(
+ 'x-copy-from:%s' % req.headers['X-Copy-From'])
+ ver, acct, _rest = req.split_path(2, 3, True)
+ src_account_name = req.headers.get('X-Copy-From-Account', None)
+ if src_account_name:
+ src_account_name = check_account_format(req, src_account_name)
+ else:
+ src_account_name = acct
+ src_container_name, src_obj_name = check_copy_from_header(req)
+ source_header = '/%s/%s/%s/%s' % (
+ ver, src_account_name, src_container_name, src_obj_name)
+ source_req = req.copy_get()
+
+ # make sure the source request uses it's container_info
+ source_req.headers.pop('X-Backend-Storage-Policy-Index', None)
+ source_req.path_info = source_header
+ source_req.headers['X-Newest'] = 'true'
+
+ orig_obj_name = self.object_name
+ orig_container_name = self.container_name
+ orig_account_name = self.account_name
+ sink_req = Request.blank(req.path_info,
+ environ=req.environ, headers=req.headers)
+
+ self.object_name = src_obj_name
+ self.container_name = src_container_name
+ self.account_name = src_account_name
+ source_resp = self.GET(source_req)
+
+ # This gives middlewares a way to change the source; for example,
+ # this lets you COPY a SLO manifest and have the new object be the
+ # concatenation of the segments (like what a GET request gives
+ # the client), not a copy of the manifest file.
+ hook = req.environ.get(
+ 'swift.copy_hook',
+ (lambda source_req, source_resp, sink_req: source_resp))
+ source_resp = hook(source_req, source_resp, sink_req)
+
+ # reset names
+ self.object_name = orig_obj_name
+ self.container_name = orig_container_name
+ self.account_name = orig_account_name
+
+ if source_resp.status_int >= HTTP_MULTIPLE_CHOICES:
+ # this is a bit of ugly code, but I'm willing to live with it
+ # until copy request handling moves to middleware
+ return source_resp, None, None, None
+ if source_resp.content_length is None:
+ # This indicates a transfer-encoding: chunked source object,
+ # which currently only happens because there are more than
+ # CONTAINER_LISTING_LIMIT segments in a segmented object. In
+ # this case, we're going to refuse to do the server-side copy.
+ raise HTTPRequestEntityTooLarge(request=req)
+ if source_resp.content_length > constraints.MAX_FILE_SIZE:
+ raise HTTPRequestEntityTooLarge(request=req)
+
+ data_source = iter(source_resp.app_iter)
+ sink_req.content_length = source_resp.content_length
+ sink_req.etag = source_resp.etag
+
+ # we no longer need the X-Copy-From header
+ del sink_req.headers['X-Copy-From']
+ if 'X-Copy-From-Account' in sink_req.headers:
+ del sink_req.headers['X-Copy-From-Account']
+ if not req.content_type_manually_set:
+ sink_req.headers['Content-Type'] = \
+ source_resp.headers['Content-Type']
+ if config_true_value(
+ sink_req.headers.get('x-fresh-metadata', 'false')):
+ # post-as-copy: ignore new sysmeta, copy existing sysmeta
+ condition = lambda k: is_sys_meta('object', k)
+ remove_items(sink_req.headers, condition)
+ copy_header_subset(source_resp, sink_req, condition)
+ else:
+ # copy/update existing sysmeta and user meta
+ copy_headers_into(source_resp, sink_req)
+ copy_headers_into(req, sink_req)
- if not containers:
- return HTTPNotFound(request=req)
+ # copy over x-static-large-object for POSTs and manifest copies
+ if 'X-Static-Large-Object' in source_resp.headers and \
+ req.params.get('multipart-manifest') == 'get':
+ sink_req.headers['X-Static-Large-Object'] = \
+ source_resp.headers['X-Static-Large-Object']
- # Sometimes the 'content-type' header exists, but is set to None.
- content_type_manually_set = True
- detect_content_type = \
- config_true_value(req.headers.get('x-detect-content-type'))
- if detect_content_type or not req.headers.get('content-type'):
- guessed_type, _junk = mimetypes.guess_type(req.path_info)
- req.headers['Content-Type'] = guessed_type or \
- 'application/octet-stream'
- if detect_content_type:
- req.headers.pop('x-detect-content-type')
- else:
- content_type_manually_set = False
+ req = sink_req
- error_response = check_object_creation(req, self.object_name) or \
- check_content_type(req)
- if error_response:
- return error_response
+ def update_response(req, resp):
+ acct, path = source_resp.environ['PATH_INFO'].split('/', 3)[2:4]
+ resp.headers['X-Copied-From-Account'] = quote(acct)
+ resp.headers['X-Copied-From'] = quote(path)
+ if 'last-modified' in source_resp.headers:
+ resp.headers['X-Copied-From-Last-Modified'] = \
+ source_resp.headers['last-modified']
+ copy_headers_into(req, resp)
+ return resp
+ # this is a bit of ugly code, but I'm willing to live with it
+ # until copy request handling moves to middleware
+ return None, req, data_source, update_response
+
+ def _handle_object_versions(self, req):
+ """
+ This method handles versionining of objects in containers that
+ have the feature enabled.
+
+ When a new PUT request is sent, the proxy checks for previous versions
+ of that same object name. If found, it is copied to a different
+ container and the new version is stored in its place.
+
+ This method was added as part of the PUT method refactoring and the
+ functionality is expected to be moved to middleware
+ """
+ container_info = self.container_info(
+ self.account_name, self.container_name, req)
+ policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
+ container_info['storage_policy'])
+ obj_ring = self.app.get_object_ring(policy_index)
partition, nodes = obj_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
+ object_versions = container_info['versions']
# do a HEAD request for checking object versions
if object_versions and not req.environ.get('swift_versioned_copy'):
@@ -502,20 +593,6 @@ class ObjectController(Controller):
hreq, _('Object'), obj_ring, partition,
hreq.swift_entity_path)
- # Used by container sync feature
- if 'x-timestamp' in req.headers:
- try:
- req_timestamp = Timestamp(req.headers['X-Timestamp'])
- except ValueError:
- return HTTPBadRequest(
- request=req, content_type='text/plain',
- body='X-Timestamp should be a UNIX timestamp float value; '
- 'was %r' % req.headers['x-timestamp'])
- req.headers['X-Timestamp'] = req_timestamp.internal
- else:
- req.headers['X-Timestamp'] = Timestamp(time.time()).internal
-
- if object_versions and not req.environ.get('swift_versioned_copy'):
is_manifest = 'X-Object-Manifest' in req.headers or \
'X-Object-Manifest' in hresp.headers
if hresp.status_int != HTTP_NOT_FOUND and not is_manifest:
@@ -543,120 +620,41 @@ class ObjectController(Controller):
copy_resp = self.COPY(copy_req)
if is_client_error(copy_resp.status_int):
# missing container or bad permissions
- return HTTPPreconditionFailed(request=req)
+ raise HTTPPreconditionFailed(request=req)
elif not is_success(copy_resp.status_int):
# could not copy the data, bail
- return HTTPServiceUnavailable(request=req)
+ raise HTTPServiceUnavailable(request=req)
- reader = req.environ['wsgi.input'].read
- data_source = iter(lambda: reader(self.app.client_chunk_size), '')
- source_header = req.headers.get('X-Copy-From')
- source_resp = None
- if source_header:
- if req.environ.get('swift.orig_req_method', req.method) != 'POST':
- req.environ.setdefault('swift.log_info', []).append(
- 'x-copy-from:%s' % source_header)
- ver, acct, _rest = req.split_path(2, 3, True)
- src_account_name = req.headers.get('X-Copy-From-Account', None)
- if src_account_name:
- src_account_name = check_account_format(req, src_account_name)
- else:
- src_account_name = acct
- src_container_name, src_obj_name = check_copy_from_header(req)
- source_header = '/%s/%s/%s/%s' % (
- ver, src_account_name, src_container_name, src_obj_name)
- source_req = req.copy_get()
-
- # make sure the source request uses it's container_info
- source_req.headers.pop('X-Backend-Storage-Policy-Index', None)
- source_req.path_info = source_header
- source_req.headers['X-Newest'] = 'true'
- orig_obj_name = self.object_name
- orig_container_name = self.container_name
- orig_account_name = self.account_name
- self.object_name = src_obj_name
- self.container_name = src_container_name
- self.account_name = src_account_name
- sink_req = Request.blank(req.path_info,
- environ=req.environ, headers=req.headers)
- source_resp = self.GET(source_req)
-
- # This gives middlewares a way to change the source; for example,
- # this lets you COPY a SLO manifest and have the new object be the
- # concatenation of the segments (like what a GET request gives
- # the client), not a copy of the manifest file.
- hook = req.environ.get(
- 'swift.copy_hook',
- (lambda source_req, source_resp, sink_req: source_resp))
- source_resp = hook(source_req, source_resp, sink_req)
-
- if source_resp.status_int >= HTTP_MULTIPLE_CHOICES:
- return source_resp
- self.object_name = orig_obj_name
- self.container_name = orig_container_name
- self.account_name = orig_account_name
- data_source = iter(source_resp.app_iter)
- sink_req.content_length = source_resp.content_length
- if sink_req.content_length is None:
- # This indicates a transfer-encoding: chunked source object,
- # which currently only happens because there are more than
- # CONTAINER_LISTING_LIMIT segments in a segmented object. In
- # this case, we're going to refuse to do the server-side copy.
- return HTTPRequestEntityTooLarge(request=req)
- if sink_req.content_length > constraints.MAX_FILE_SIZE:
- return HTTPRequestEntityTooLarge(request=req)
- sink_req.etag = source_resp.etag
-
- # we no longer need the X-Copy-From header
- del sink_req.headers['X-Copy-From']
- if 'X-Copy-From-Account' in sink_req.headers:
- del sink_req.headers['X-Copy-From-Account']
- if not content_type_manually_set:
- sink_req.headers['Content-Type'] = \
- source_resp.headers['Content-Type']
- if config_true_value(
- sink_req.headers.get('x-fresh-metadata', 'false')):
- # post-as-copy: ignore new sysmeta, copy existing sysmeta
- condition = lambda k: is_sys_meta('object', k)
- remove_items(sink_req.headers, condition)
- copy_header_subset(source_resp, sink_req, condition)
+ def _update_content_type(self, req):
+ # Sometimes the 'content-type' header exists, but is set to None.
+ req.content_type_manually_set = True
+ detect_content_type = \
+ config_true_value(req.headers.get('x-detect-content-type'))
+ if detect_content_type or not req.headers.get('content-type'):
+ guessed_type, _junk = mimetypes.guess_type(req.path_info)
+ req.headers['Content-Type'] = guessed_type or \
+ 'application/octet-stream'
+ if detect_content_type:
+ req.headers.pop('x-detect-content-type')
else:
- # copy/update existing sysmeta and user meta
- copy_headers_into(source_resp, sink_req)
- copy_headers_into(req, sink_req)
-
- # copy over x-static-large-object for POSTs and manifest copies
- if 'X-Static-Large-Object' in source_resp.headers and \
- req.params.get('multipart-manifest') == 'get':
- sink_req.headers['X-Static-Large-Object'] = \
- source_resp.headers['X-Static-Large-Object']
-
- req = sink_req
-
- req, delete_at_container, delete_at_part, \
- delete_at_nodes = self._config_obj_expiration(req)
-
- node_iter = GreenthreadSafeIterator(
- self.iter_nodes_local_first(obj_ring, partition))
- pile = GreenPile(len(nodes))
- te = req.headers.get('transfer-encoding', '')
- chunked = ('chunked' in te)
-
- outgoing_headers = self._backend_requests(
- req, len(nodes), container_partition, containers,
- delete_at_container, delete_at_part, delete_at_nodes)
+ req.content_type_manually_set = False
- for nheaders in outgoing_headers:
- # RFC2616:8.2.3 disallows 100-continue without a body
- if (req.content_length > 0) or chunked:
- nheaders['Expect'] = '100-continue'
- pile.spawn(self._connect_put_node, node_iter, partition,
- req.swift_entity_path, nheaders,
- self.app.logger.thread_locals)
-
- conns = [conn for conn in pile if conn]
- min_conns = quorum_size(len(nodes))
+ def _update_x_timestamp(self, req):
+ # Used by container sync feature
+ if 'x-timestamp' in req.headers:
+ try:
+ req_timestamp = Timestamp(req.headers['X-Timestamp'])
+ except ValueError:
+ raise HTTPBadRequest(
+ request=req, content_type='text/plain',
+ body='X-Timestamp should be a UNIX timestamp float value; '
+ 'was %r' % req.headers['x-timestamp'])
+ req.headers['X-Timestamp'] = req_timestamp.internal
+ else:
+ req.headers['X-Timestamp'] = Timestamp(time.time()).internal
+ return None
+ def _check_failure_put_connections(self, conns, req, nodes):
if req.if_none_match is not None and '*' in req.if_none_match:
statuses = [conn.resp.status for conn in conns if conn.resp]
if HTTP_PRECONDITION_FAILED in statuses:
@@ -664,7 +662,7 @@ class ObjectController(Controller):
self.app.logger.debug(
_('Object PUT returning 412, %(statuses)r'),
{'statuses': statuses})
- return HTTPPreconditionFailed(request=req)
+ raise HTTPPreconditionFailed(request=req)
if any(conn for conn in conns if conn.resp and
conn.resp.status == HTTP_CONFLICT):
@@ -675,14 +673,44 @@ class ObjectController(Controller):
'%(req_timestamp)s <= %(timestamps)r'),
{'req_timestamp': req.timestamp.internal,
'timestamps': ', '.join(timestamps)})
- return HTTPAccepted(request=req)
+ raise HTTPAccepted(request=req)
+
+ min_conns = quorum_size(len(nodes))
+ self._check_min_conn(req, conns, min_conns)
+
+ def _get_put_connections(self, req, nodes, partition, outgoing_headers,
+ policy, expect):
+ """
+ Establish connections to storage nodes for PUT request
+ """
+ obj_ring = policy.object_ring
+ node_iter = GreenthreadSafeIterator(
+ self.iter_nodes_local_first(obj_ring, partition))
+ pile = GreenPile(len(nodes))
+
+ for nheaders in outgoing_headers:
+ if expect:
+ nheaders['Expect'] = '100-continue'
+ pile.spawn(self._connect_put_node, node_iter, partition,
+ req.swift_entity_path, nheaders,
+ self.app.logger.thread_locals)
+
+ conns = [conn for conn in pile if conn]
+
+ return conns
+
+ def _check_min_conn(self, req, conns, min_conns, msg=None):
+ msg = msg or 'Object PUT returning 503, %(conns)s/%(nodes)s ' \
+ 'required connections'
if len(conns) < min_conns:
- self.app.logger.error(
- _('Object PUT returning 503, %(conns)s/%(nodes)s '
- 'required connections'),
- {'conns': len(conns), 'nodes': min_conns})
- return HTTPServiceUnavailable(request=req)
+ self.app.logger.error((msg),
+ {'conns': len(conns), 'nodes': min_conns})
+ raise HTTPServiceUnavailable(request=req)
+
+ def _transfer_data(self, req, data_source, conns, nodes):
+ min_conns = quorum_size(len(nodes))
+
bytes_transferred = 0
try:
with ContextPool(len(nodes)) as pool:
@@ -695,48 +723,90 @@ class ObjectController(Controller):
try:
chunk = next(data_source)
except StopIteration:
- if chunked:
+ if req.is_chunked:
for conn in conns:
conn.queue.put('0\r\n\r\n')
break
bytes_transferred += len(chunk)
if bytes_transferred > constraints.MAX_FILE_SIZE:
- return HTTPRequestEntityTooLarge(request=req)
+ raise HTTPRequestEntityTooLarge(request=req)
for conn in list(conns):
if not conn.failed:
conn.queue.put(
'%x\r\n%s\r\n' % (len(chunk), chunk)
- if chunked else chunk)
+ if req.is_chunked else chunk)
else:
+ conn.close()
conns.remove(conn)
- if len(conns) < min_conns:
- self.app.logger.error(_(
- 'Object PUT exceptions during'
- ' send, %(conns)s/%(nodes)s required connections'),
- {'conns': len(conns), 'nodes': min_conns})
- return HTTPServiceUnavailable(request=req)
+ self._check_min_conn(
+ req, conns, min_conns,
+ msg='Object PUT exceptions during'
+ ' send, %(conns)s/%(nodes)s required connections')
for conn in conns:
if conn.queue.unfinished_tasks:
conn.queue.join()
conns = [conn for conn in conns if not conn.failed]
+ self._check_min_conn(
+ req, conns, min_conns,
+ msg='Object PUT exceptions after last send, '
+ '%(conns)s/%(nodes)s required connections')
except ChunkReadTimeout as err:
self.app.logger.warn(
_('ERROR Client read timeout (%ss)'), err.seconds)
self.app.logger.increment('client_timeouts')
- return HTTPRequestTimeout(request=req)
+ raise HTTPRequestTimeout(request=req)
+ except HTTPException:
+ raise
except (Exception, Timeout):
self.app.logger.exception(
_('ERROR Exception causing client disconnect'))
- return HTTPClientDisconnect(request=req)
+ raise HTTPClientDisconnect(request=req)
if req.content_length and bytes_transferred < req.content_length:
req.client_disconnect = True
self.app.logger.warn(
_('Client disconnected without sending enough data'))
self.app.logger.increment('client_disconnects')
- return HTTPClientDisconnect(request=req)
+ raise HTTPClientDisconnect(request=req)
+
+ def _store_object(self, req, data_source, nodes, partition,
+ outgoing_headers):
+ """
+ Store a replicated object.
+
+ This method is responsible for establishing connection
+ with storage nodes and sending object to each one of those
+ nodes. After sending the data, the "best" reponse will be
+ returned based on statuses from all connections
+ """
+ policy_idx = req.headers.get('X-Backend-Storage-Policy-Index')
+ policy = POLICIES.get_by_index(policy_idx)
+ if not nodes:
+ return HTTPNotFound()
+
+ # RFC2616:8.2.3 disallows 100-continue without a body
+ if (req.content_length > 0) or req.is_chunked:
+ expect = True
+ else:
+ expect = False
+ conns = self._get_put_connections(req, nodes, partition,
+ outgoing_headers, policy, expect)
+
+ try:
+ # check that a minimum number of connections were established and
+ # meet all the correct conditions set in the request
+ self._check_failure_put_connections(conns, req, nodes)
- statuses, reasons, bodies, etags = self._get_put_responses(req, conns,
- nodes)
+ # transfer data
+ self._transfer_data(req, data_source, conns, nodes)
+
+ # get responses
+ statuses, reasons, bodies, etags = self._get_put_responses(
+ req, conns, nodes)
+ except HTTPException as resp:
+ return resp
+ finally:
+ for conn in conns:
+ conn.close()
if len(etags) > 1:
self.app.logger.error(
@@ -745,14 +815,6 @@ class ObjectController(Controller):
etag = etags.pop() if len(etags) else None
resp = self.best_response(req, statuses, reasons, bodies,
_('Object PUT'), etag=etag)
- if source_header:
- acct, path = source_header.split('/', 3)[2:4]
- resp.headers['X-Copied-From-Account'] = quote(acct)
- resp.headers['X-Copied-From'] = quote(path)
- if 'last-modified' in source_resp.headers:
- resp.headers['X-Copied-From-Last-Modified'] = \
- source_resp.headers['last-modified']
- copy_headers_into(req, resp)
resp.last_modified = math.ceil(
float(Timestamp(req.headers['X-Timestamp'])))
return resp
@@ -760,6 +822,79 @@ class ObjectController(Controller):
@public
@cors_validation
@delay_denial
+ def PUT(self, req):
+ """HTTP PUT request handler."""
+ if req.if_none_match is not None and '*' not in req.if_none_match:
+ # Sending an etag with if-none-match isn't currently supported
+ return HTTPBadRequest(request=req, content_type='text/plain',
+ body='If-None-Match only supports *')
+ container_info = self.container_info(
+ self.account_name, self.container_name, req)
+ policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
+ container_info['storage_policy'])
+ obj_ring = self.app.get_object_ring(policy_index)
+ container_nodes = container_info['nodes']
+ container_partition = container_info['partition']
+ partition, nodes = obj_ring.get_nodes(
+ self.account_name, self.container_name, self.object_name)
+
+ # pass the policy index to storage nodes via req header
+ req.headers['X-Backend-Storage-Policy-Index'] = policy_index
+ req.acl = container_info['write_acl']
+ req.environ['swift_sync_key'] = container_info['sync_key']
+
+ # is request authorized
+ if 'swift.authorize' in req.environ:
+ aresp = req.environ['swift.authorize'](req)
+ if aresp:
+ return aresp
+
+ if not container_info['nodes']:
+ return HTTPNotFound(request=req)
+
+ # update content type in case it is missing
+ self._update_content_type(req)
+
+ # check constraints on object name and request headers
+ error_response = check_object_creation(req, self.object_name) or \
+ check_content_type(req)
+ if error_response:
+ return error_response
+
+ self._update_x_timestamp(req)
+
+ # check if versioning is enabled and handle copying previous version
+ self._handle_object_versions(req)
+
+ # check if request is a COPY of an existing object
+ source_header = req.headers.get('X-Copy-From')
+ if source_header:
+ error_response, req, data_source, update_response = \
+ self._handle_copy_request(req)
+ if error_response:
+ return error_response
+ else:
+ reader = req.environ['wsgi.input'].read
+ data_source = iter(lambda: reader(self.app.client_chunk_size), '')
+ update_response = lambda req, resp: resp
+
+ # check if object is set to be automaticaly deleted (i.e. expired)
+ req, delete_at_container, delete_at_part, \
+ delete_at_nodes = self._config_obj_expiration(req)
+
+ # add special headers to be handled by storage nodes
+ outgoing_headers = self._backend_requests(
+ req, len(nodes), container_partition, container_nodes,
+ delete_at_container, delete_at_part, delete_at_nodes)
+
+ # send object to storage nodes
+ resp = self._store_object(
+ req, data_source, nodes, partition, outgoing_headers)
+ return update_response(req, resp)
+
+ @public
+ @cors_validation
+ @delay_denial
def DELETE(self, req):
"""HTTP DELETE request handler."""
container_info = self.container_info(
diff --git a/test/unit/__init__.py b/test/unit/__init__.py
index 0e10d3bac..da7212c98 100644
--- a/test/unit/__init__.py
+++ b/test/unit/__init__.py
@@ -733,6 +733,9 @@ def fake_http_connect(*code_iter, **kwargs):
def getheader(self, name, default=None):
return swob.HeaderKeyDict(self.getheaders()).get(name, default)
+ def close(self):
+ pass
+
timestamps_iter = iter(kwargs.get('timestamps') or ['1'] * len(code_iter))
etag_iter = iter(kwargs.get('etags') or [None] * len(code_iter))
if isinstance(kwargs.get('headers'), list):
diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py
index 6d5bf0ed5..1b3b2e06d 100644
--- a/test/unit/proxy/test_server.py
+++ b/test/unit/proxy/test_server.py
@@ -993,7 +993,10 @@ class TestObjectController(unittest.TestCase):
headers={'Content-Length': '0',
'Content-Type': 'text/plain'})
self.app.update_request(req)
- res = method(req)
+ try:
+ res = method(req)
+ except HTTPException as res:
+ pass
self.assertEquals(res.status_int, expected)
# repeat test
@@ -1003,7 +1006,10 @@ class TestObjectController(unittest.TestCase):
headers={'Content-Length': '0',
'Content-Type': 'text/plain'})
self.app.update_request(req)
- res = method(req)
+ try:
+ res = method(req)
+ except HTTPException as res:
+ pass
self.assertEquals(res.status_int, expected)
@unpatch_policies
@@ -1734,7 +1740,10 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/v1/a/c/o.jpg', {})
req.content_length = 0
self.app.update_request(req)
- res = controller.PUT(req)
+ try:
+ res = controller.PUT(req)
+ except HTTPException as res:
+ pass
expected = str(expected)
self.assertEquals(res.status[:len(expected)], expected)
test_status_map((200, 200, 201, 201, -1), 201) # connect exc
@@ -1763,7 +1772,10 @@ class TestObjectController(unittest.TestCase):
environ={'REQUEST_METHOD': 'PUT'},
body='some data')
self.app.update_request(req)
- res = controller.PUT(req)
+ try:
+ res = controller.PUT(req)
+ except HTTPException as res:
+ pass
expected = str(expected)
self.assertEquals(res.status[:len(expected)], expected)
test_status_map((200, 200, 201, -1, 201), 201)
@@ -1805,7 +1817,10 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/v1/a/c/o.jpg', {})
req.content_length = 0
self.app.update_request(req)
- res = controller.PUT(req)
+ try:
+ res = controller.PUT(req)
+ except HTTPException as res:
+ pass
expected = str(expected)
self.assertEquals(res.status[:len(str(expected))],
str(expected))
@@ -3391,7 +3406,10 @@ class TestObjectController(unittest.TestCase):
self.app.update_request(req)
self.app.memcache.store = {}
- resp = controller.PUT(req)
+ try:
+ resp = controller.PUT(req)
+ except HTTPException as resp:
+ pass
self.assertEquals(resp.status_int, 413)
def test_basic_COPY(self):
@@ -3632,7 +3650,10 @@ class TestObjectController(unittest.TestCase):
kwargs = dict(body=copy_from_obj_body)
with self.controller_context(req, *status_list,
**kwargs) as controller:
- resp = controller.COPY(req)
+ try:
+ resp = controller.COPY(req)
+ except HTTPException as resp:
+ pass
self.assertEquals(resp.status_int, 413)
@_limit_max_file_size
@@ -3656,7 +3677,10 @@ class TestObjectController(unittest.TestCase):
kwargs = dict(body=copy_from_obj_body)
with self.controller_context(req, *status_list,
**kwargs) as controller:
- resp = controller.COPY(req)
+ try:
+ resp = controller.COPY(req)
+ except HTTPException as resp:
+ pass
self.assertEquals(resp.status_int, 413)
def test_COPY_newest(self):
@@ -3698,41 +3722,46 @@ class TestObjectController(unittest.TestCase):
def test_COPY_delete_at(self):
with save_globals():
- given_headers = {}
+ backend_requests = []
- def fake_connect_put_node(nodes, part, path, headers,
- logger_thread_locals):
- given_headers.update(headers)
+ def capture_requests(ipaddr, port, device, partition, method, path,
+ headers=None, query_string=None):
+ backend_requests.append((method, path, headers))
controller = proxy_server.ObjectController(self.app, 'a',
'c', 'o')
- controller._connect_put_node = fake_connect_put_node
- set_http_connect(200, 200, 200, 200, 200, 201, 201, 201)
+ set_http_connect(200, 200, 200, 200, 200, 201, 201, 201,
+ give_connect=capture_requests)
self.app.memcache.store = {}
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': '/c/o'})
self.app.update_request(req)
- controller.COPY(req)
- self.assertEquals(given_headers.get('X-Delete-At'), '9876543210')
- self.assertTrue('X-Delete-At-Host' in given_headers)
- self.assertTrue('X-Delete-At-Device' in given_headers)
- self.assertTrue('X-Delete-At-Partition' in given_headers)
- self.assertTrue('X-Delete-At-Container' in given_headers)
+ resp = controller.COPY(req)
+ self.assertEqual(201, resp.status_int) # sanity
+ for method, path, given_headers in backend_requests:
+ if method != 'PUT':
+ continue
+ self.assertEquals(given_headers.get('X-Delete-At'),
+ '9876543210')
+ self.assertTrue('X-Delete-At-Host' in given_headers)
+ self.assertTrue('X-Delete-At-Device' in given_headers)
+ self.assertTrue('X-Delete-At-Partition' in given_headers)
+ self.assertTrue('X-Delete-At-Container' in given_headers)
def test_COPY_account_delete_at(self):
with save_globals():
- given_headers = {}
+ backend_requests = []
- def fake_connect_put_node(nodes, part, path, headers,
- logger_thread_locals):
- given_headers.update(headers)
+ def capture_requests(ipaddr, port, device, partition, method, path,
+ headers=None, query_string=None):
+ backend_requests.append((method, path, headers))
controller = proxy_server.ObjectController(self.app, 'a',
'c', 'o')
- controller._connect_put_node = fake_connect_put_node
- set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201)
+ set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201,
+ give_connect=capture_requests)
self.app.memcache.store = {}
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
@@ -3740,12 +3769,17 @@ class TestObjectController(unittest.TestCase):
'Destination-Account': 'a1'})
self.app.update_request(req)
- controller.COPY(req)
- self.assertEquals(given_headers.get('X-Delete-At'), '9876543210')
- self.assertTrue('X-Delete-At-Host' in given_headers)
- self.assertTrue('X-Delete-At-Device' in given_headers)
- self.assertTrue('X-Delete-At-Partition' in given_headers)
- self.assertTrue('X-Delete-At-Container' in given_headers)
+ resp = controller.COPY(req)
+ self.assertEqual(201, resp.status_int) # sanity
+ for method, path, given_headers in backend_requests:
+ if method != 'PUT':
+ continue
+ self.assertEquals(given_headers.get('X-Delete-At'),
+ '9876543210')
+ self.assertTrue('X-Delete-At-Host' in given_headers)
+ self.assertTrue('X-Delete-At-Device' in given_headers)
+ self.assertTrue('X-Delete-At-Partition' in given_headers)
+ self.assertTrue('X-Delete-At-Container' in given_headers)
def test_chunked_put(self):
diff --git a/test/unit/proxy/test_sysmeta.py b/test/unit/proxy/test_sysmeta.py
index c3b673108..d80f2855e 100644
--- a/test/unit/proxy/test_sysmeta.py
+++ b/test/unit/proxy/test_sysmeta.py
@@ -70,6 +70,9 @@ class FakeServerConnection(WSGIContext):
def send(self, data):
self.data += data
+ def close(self):
+ pass
+
def __call__(self, ipaddr, port, device, partition, method, path,
headers=None, query_string=None):
self.path = quote('/' + device + '/' + str(partition) + path)