Diffstat (limited to 'swift/proxy/controllers')
-rw-r--r--  swift/proxy/controllers/__init__.py      4
-rw-r--r--  swift/proxy/controllers/account.py       5
-rw-r--r--  swift/proxy/controllers/base.py        171
-rw-r--r--  swift/proxy/controllers/container.py     3
-rw-r--r--  swift/proxy/controllers/obj.py        1225
5 files changed, 1349 insertions, 59 deletions
diff --git a/swift/proxy/controllers/__init__.py b/swift/proxy/controllers/__init__.py
index de4c0145b..706fd9165 100644
--- a/swift/proxy/controllers/__init__.py
+++ b/swift/proxy/controllers/__init__.py
@@ -13,7 +13,7 @@
from swift.proxy.controllers.base import Controller
from swift.proxy.controllers.info import InfoController
-from swift.proxy.controllers.obj import ObjectController
+from swift.proxy.controllers.obj import ObjectControllerRouter
from swift.proxy.controllers.account import AccountController
from swift.proxy.controllers.container import ContainerController
@@ -22,5 +22,5 @@ __all__ = [
'ContainerController',
'Controller',
'InfoController',
- 'ObjectController',
+ 'ObjectControllerRouter',
]
diff --git a/swift/proxy/controllers/account.py b/swift/proxy/controllers/account.py
index ea2f8ae33..915e1c481 100644
--- a/swift/proxy/controllers/account.py
+++ b/swift/proxy/controllers/account.py
@@ -58,9 +58,10 @@ class AccountController(Controller):
constraints.MAX_ACCOUNT_NAME_LENGTH)
return resp
- partition, nodes = self.app.account_ring.get_nodes(self.account_name)
+ partition = self.app.account_ring.get_part(self.account_name)
+ node_iter = self.app.iter_nodes(self.app.account_ring, partition)
resp = self.GETorHEAD_base(
- req, _('Account'), self.app.account_ring, partition,
+ req, _('Account'), node_iter, partition,
req.swift_entity_path.rstrip('/'))
if resp.status_int == HTTP_NOT_FOUND:
if resp.headers.get('X-Account-Status', '').lower() == 'deleted':
diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py
index 0aeb803f1..ca12d343e 100644
--- a/swift/proxy/controllers/base.py
+++ b/swift/proxy/controllers/base.py
@@ -28,6 +28,7 @@ import os
import time
import functools
import inspect
+import logging
import operator
from sys import exc_info
from swift import gettext_ as _
@@ -39,14 +40,14 @@ from eventlet.timeout import Timeout
from swift.common.wsgi import make_pre_authed_env
from swift.common.utils import Timestamp, config_true_value, \
public, split_path, list_from_csv, GreenthreadSafeIterator, \
- quorum_size, GreenAsyncPile
+ GreenAsyncPile, quorum_size, parse_content_range
from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \
ConnectionTimeout
from swift.common.http import is_informational, is_success, is_redirection, \
is_server_error, HTTP_OK, HTTP_PARTIAL_CONTENT, HTTP_MULTIPLE_CHOICES, \
HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \
- HTTP_INSUFFICIENT_STORAGE, HTTP_UNAUTHORIZED
+ HTTP_INSUFFICIENT_STORAGE, HTTP_UNAUTHORIZED, HTTP_CONTINUE
from swift.common.swob import Request, Response, HeaderKeyDict, Range, \
HTTPException, HTTPRequestedRangeNotSatisfiable
from swift.common.request_helpers import strip_sys_meta_prefix, \
@@ -593,16 +594,37 @@ def close_swift_conn(src):
pass
+def bytes_to_skip(record_size, range_start):
+ """
+ Assume an object is composed of N records, where the first N-1 are all
+ the same size and the last is at most that large, but may be smaller.
+
+ When a range request is made, it might start with a partial record. This
+ must be discarded, lest the consumer get bad data. This is particularly
+ true of suffix-byte-range requests, e.g. "Range: bytes=-12345" where the
+ size of the object is unknown at the time the request is made.
+
+ This function computes the number of bytes that must be discarded to
+ ensure only whole records are yielded. Erasure-code decoding needs this.
+
+ This function could have been inlined, but it took enough tries to get
+ right that some targeted unit tests were desirable, hence its extraction.
+ """
+ return (record_size - (range_start % record_size)) % record_size
+
+
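As a quick sanity check of the modular arithmetic above (an editorial sketch, not part of the patch):

    # bytes_to_skip() as defined above, exercised with 1024-byte records:
    assert bytes_to_skip(1024, 0) == 0       # already on a record boundary
    assert bytes_to_skip(1024, 1500) == 548  # 1500 + 548 == 2048, a boundary
    assert bytes_to_skip(512, 1024) == 0     # exact multiples need no skip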
class GetOrHeadHandler(object):
- def __init__(self, app, req, server_type, ring, partition, path,
- backend_headers):
+ def __init__(self, app, req, server_type, node_iter, partition, path,
+ backend_headers, client_chunk_size=None):
self.app = app
- self.ring = ring
+ self.node_iter = node_iter
self.server_type = server_type
self.partition = partition
self.path = path
self.backend_headers = backend_headers
+ self.client_chunk_size = client_chunk_size
+ self.skip_bytes = 0
self.used_nodes = []
self.used_source_etag = ''
@@ -649,6 +671,35 @@ class GetOrHeadHandler(object):
else:
self.backend_headers['Range'] = 'bytes=%d-' % num_bytes
+ def learn_size_from_content_range(self, start, end):
+ """
+ If client_chunk_size is set, makes sure we yield things starting on
+ chunk boundaries based on the Content-Range header in the response.
+
+ Sets the first byterange in our backend Range header to the value
+ learned from the Content-Range header in the response; if we were
+ given a fully-specified range (e.g. "bytes=123-456"), this is a no-op.
+
+ If we were given a half-specified range (e.g. "bytes=123-" or
+ "bytes=-456"), then this changes the Range header to a
+ semantically-equivalent one *and* it lets us resume on a proper
+ boundary instead of just in the middle of a piece somewhere.
+
+ If the original request is for more than one range, this does not
+ affect our backend Range header, since we don't support resuming one
+ of those anyway.
+ """
+ if self.client_chunk_size:
+ self.skip_bytes = bytes_to_skip(self.client_chunk_size, start)
+
+ if 'Range' in self.backend_headers:
+ req_range = Range(self.backend_headers['Range'])
+
+ if len(req_range.ranges) > 1:
+ return
+
+ self.backend_headers['Range'] = "bytes=%d-%d" % (start, end)
+
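To make the half-specified case concrete (an editorial sketch, not part of the patch, assuming a GetOrHeadHandler instance named handler): a suffix request on a 10000-byte object is rewritten to a fully-specified range once the backend's Content-Range is seen, so a resumed GET asks for exactly the remaining bytes.

    handler.backend_headers['Range'] = 'bytes=-456'
    # Backend's 206 response carries "Content-Range: bytes 9544-9999/10000"
    handler.learn_size_from_content_range(9544, 9999)
    assert handler.backend_headers['Range'] == 'bytes=9544-9999'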
def is_good_source(self, src):
"""
Indicates whether or not the request made to the backend found
@@ -674,42 +725,74 @@ class GetOrHeadHandler(object):
"""
try:
nchunks = 0
- bytes_read_from_source = 0
+ client_chunk_size = self.client_chunk_size
+ bytes_consumed_from_backend = 0
node_timeout = self.app.node_timeout
if self.server_type == 'Object':
node_timeout = self.app.recoverable_node_timeout
+ buf = ''
while True:
try:
with ChunkReadTimeout(node_timeout):
chunk = source.read(self.app.object_chunk_size)
nchunks += 1
- bytes_read_from_source += len(chunk)
+ buf += chunk
except ChunkReadTimeout:
exc_type, exc_value, exc_traceback = exc_info()
if self.newest or self.server_type != 'Object':
raise exc_type, exc_value, exc_traceback
try:
- self.fast_forward(bytes_read_from_source)
+ self.fast_forward(bytes_consumed_from_backend)
except (NotImplementedError, HTTPException, ValueError):
raise exc_type, exc_value, exc_traceback
+ buf = ''
new_source, new_node = self._get_source_and_node()
if new_source:
self.app.exception_occurred(
node, _('Object'),
- _('Trying to read during GET (retrying)'))
+ _('Trying to read during GET (retrying)'),
+ level=logging.ERROR, exc_info=(
+ exc_type, exc_value, exc_traceback))
# Close-out the connection as best as possible.
if getattr(source, 'swift_conn', None):
close_swift_conn(source)
source = new_source
node = new_node
- bytes_read_from_source = 0
continue
else:
raise exc_type, exc_value, exc_traceback
+
+ if buf and self.skip_bytes:
+ if self.skip_bytes < len(buf):
+ buf = buf[self.skip_bytes:]
+ bytes_consumed_from_backend += self.skip_bytes
+ self.skip_bytes = 0
+ else:
+ self.skip_bytes -= len(buf)
+ bytes_consumed_from_backend += len(buf)
+ buf = ''
+
if not chunk:
+ if buf:
+ with ChunkWriteTimeout(self.app.client_timeout):
+ bytes_consumed_from_backend += len(buf)
+ yield buf
+ buf = ''
break
- with ChunkWriteTimeout(self.app.client_timeout):
- yield chunk
+
+ if client_chunk_size is not None:
+ while len(buf) >= client_chunk_size:
+ client_chunk = buf[:client_chunk_size]
+ buf = buf[client_chunk_size:]
+ with ChunkWriteTimeout(self.app.client_timeout):
+ yield client_chunk
+ bytes_consumed_from_backend += len(client_chunk)
+ else:
+ with ChunkWriteTimeout(self.app.client_timeout):
+ yield buf
+ bytes_consumed_from_backend += len(buf)
+ buf = ''
+
# This is for fairness; if the network is outpacing the CPU,
# we'll always be able to read and write data without
# encountering an EWOULDBLOCK, and so eventlet will not switch
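The buffering above re-slices arbitrarily-sized backend reads into fixed client_chunk_size pieces (the EC fragment size on EC GETs), holding any remainder for the next read. A standalone sketch of that slicing, assuming nothing beyond stock Python:

    def rechunk(reads, client_chunk_size):
        buf = ''
        for read in reads:
            buf += read
            while len(buf) >= client_chunk_size:
                yield buf[:client_chunk_size]
                buf = buf[client_chunk_size:]
        if buf:
            yield buf  # trailing partial chunk, like the "if not chunk" branch

    assert list(rechunk(['ab', 'cdef', 'g'], 3)) == ['abc', 'def', 'g']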
@@ -757,7 +840,7 @@ class GetOrHeadHandler(object):
node_timeout = self.app.node_timeout
if self.server_type == 'Object' and not self.newest:
node_timeout = self.app.recoverable_node_timeout
- for node in self.app.iter_nodes(self.ring, self.partition):
+ for node in self.node_iter:
if node in self.used_nodes:
continue
start_node_timing = time.time()
@@ -793,8 +876,10 @@ class GetOrHeadHandler(object):
src_headers = dict(
(k.lower(), v) for k, v in
possible_source.getheaders())
- if src_headers.get('etag', '').strip('"') != \
- self.used_source_etag:
+
+ if self.used_source_etag != src_headers.get(
+ 'x-object-sysmeta-ec-etag',
+ src_headers.get('etag', '')).strip('"'):
self.statuses.append(HTTP_NOT_FOUND)
self.reasons.append('')
self.bodies.append('')
@@ -832,7 +917,9 @@ class GetOrHeadHandler(object):
src_headers = dict(
(k.lower(), v) for k, v in
possible_source.getheaders())
- self.used_source_etag = src_headers.get('etag', '').strip('"')
+ self.used_source_etag = src_headers.get(
+ 'x-object-sysmeta-ec-etag',
+ src_headers.get('etag', '')).strip('"')
return source, node
return None, None
@@ -841,13 +928,17 @@ class GetOrHeadHandler(object):
res = None
if source:
res = Response(request=req)
+ res.status = source.status
+ update_headers(res, source.getheaders())
if req.method == 'GET' and \
source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT):
+ cr = res.headers.get('Content-Range')
+ if cr:
+ start, end, total = parse_content_range(cr)
+ self.learn_size_from_content_range(start, end)
res.app_iter = self._make_app_iter(req, node, source)
# See NOTE: swift_conn at top of file about this.
res.swift_conn = source.swift_conn
- res.status = source.status
- update_headers(res, source.getheaders())
if not res.environ:
res.environ = {}
res.environ['swift_x_timestamp'] = \
@@ -993,7 +1084,8 @@ class Controller(object):
else:
info['partition'] = part
info['nodes'] = nodes
- info.setdefault('storage_policy', '0')
+ if info.get('storage_policy') is None:
+ info['storage_policy'] = 0
return info
def _make_request(self, nodes, part, method, path, headers, query,
@@ -1098,6 +1190,13 @@ class Controller(object):
'%s %s' % (self.server_type, req.method),
overrides=overrides, headers=resp_headers)
+ def _quorum_size(self, n):
+ """
+ Number of successful backend responses needed for the proxy to
+ consider the client request successful.
+ """
+ return quorum_size(n)
+
def have_quorum(self, statuses, node_count):
"""
Given a list of statuses from several requests, determine if
@@ -1107,16 +1206,18 @@ class Controller(object):
:param node_count: number of nodes being queried (basically ring count)
:returns: True or False, depending on if quorum is established
"""
- quorum = quorum_size(node_count)
+ quorum = self._quorum_size(node_count)
if len(statuses) >= quorum:
- for hundred in (HTTP_OK, HTTP_MULTIPLE_CHOICES, HTTP_BAD_REQUEST):
+ for hundred in (HTTP_CONTINUE, HTTP_OK, HTTP_MULTIPLE_CHOICES,
+ HTTP_BAD_REQUEST):
if sum(1 for s in statuses
if hundred <= s < hundred + 100) >= quorum:
return True
return False
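For example (editorial sketch, assuming a Controller instance named controller and the default quorum_size(3) == 2): quorum requires at least quorum-many responses falling in the same status class, and the new HTTP_CONTINUE bucket lets 100-continue acks count during the EC commit phase.

    controller.have_quorum([201, 201], 3)       # True: two 2xx responses
    controller.have_quorum([201, 404], 3)       # False: no class reaches 2
    controller.have_quorum([100, 100, 507], 3)  # True: 1xx acks make quorum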
def best_response(self, req, statuses, reasons, bodies, server_type,
- etag=None, headers=None, overrides=None):
+ etag=None, headers=None, overrides=None,
+ quorum_size=None):
"""
Given a list of responses from several servers, choose the best to
return to the API.
@@ -1128,10 +1229,16 @@ class Controller(object):
:param server_type: type of server the responses came from
:param etag: etag
:param headers: headers of each response
+ :param overrides: overrides to apply when lacking quorum
+ :param quorum_size: quorum size to use
:returns: swob.Response object with the correct status, body, etc. set
"""
+ if quorum_size is None:
+ quorum_size = self._quorum_size(len(statuses))
+
resp = self._compute_quorum_response(
- req, statuses, reasons, bodies, etag, headers)
+ req, statuses, reasons, bodies, etag, headers,
+ quorum_size=quorum_size)
if overrides and not resp:
faked_up_status_indices = set()
transformed = []
@@ -1145,7 +1252,8 @@ class Controller(object):
statuses, reasons, headers, bodies = zip(*transformed)
resp = self._compute_quorum_response(
req, statuses, reasons, bodies, etag, headers,
- indices_to_avoid=faked_up_status_indices)
+ indices_to_avoid=faked_up_status_indices,
+ quorum_size=quorum_size)
if not resp:
resp = Response(request=req)
@@ -1156,14 +1264,14 @@ class Controller(object):
return resp
def _compute_quorum_response(self, req, statuses, reasons, bodies, etag,
- headers, indices_to_avoid=()):
+ headers, quorum_size, indices_to_avoid=()):
if not statuses:
return None
for hundred in (HTTP_OK, HTTP_MULTIPLE_CHOICES, HTTP_BAD_REQUEST):
hstatuses = \
[(i, s) for i, s in enumerate(statuses)
if hundred <= s < hundred + 100]
- if len(hstatuses) >= quorum_size(len(statuses)):
+ if len(hstatuses) >= quorum_size:
resp = Response(request=req)
try:
status_index, status = max(
@@ -1228,22 +1336,25 @@ class Controller(object):
else:
self.app.logger.warning('Could not autocreate account %r' % path)
- def GETorHEAD_base(self, req, server_type, ring, partition, path):
+ def GETorHEAD_base(self, req, server_type, node_iter, partition, path,
+ client_chunk_size=None):
"""
Base handler for HTTP GET or HEAD requests.
:param req: swob.Request object
:param server_type: server type used in logging
- :param ring: the ring to obtain nodes from
+ :param node_iter: an iterator to obtain nodes from
:param partition: partition
:param path: path for the request
+ :param client_chunk_size: chunk size for response body iterator
:returns: swob.Response object
"""
backend_headers = self.generate_request_headers(
req, additional=req.headers)
- handler = GetOrHeadHandler(self.app, req, self.server_type, ring,
- partition, path, backend_headers)
+ handler = GetOrHeadHandler(self.app, req, self.server_type, node_iter,
+ partition, path, backend_headers,
+ client_chunk_size=client_chunk_size)
res = handler.get_working_response(req)
if not res:
diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py
index fb422e68d..3e4a2bb03 100644
--- a/swift/proxy/controllers/container.py
+++ b/swift/proxy/controllers/container.py
@@ -93,8 +93,9 @@ class ContainerController(Controller):
return HTTPNotFound(request=req)
part = self.app.container_ring.get_part(
self.account_name, self.container_name)
+ node_iter = self.app.iter_nodes(self.app.container_ring, part)
resp = self.GETorHEAD_base(
- req, _('Container'), self.app.container_ring, part,
+ req, _('Container'), node_iter, part,
req.swift_entity_path)
if 'swift.authorize' in req.environ:
req.acl = resp.headers.get('x-container-read')
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index 2b53ba7a8..a83242b5f 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -24,13 +24,17 @@
# These shenanigans are to ensure all related objects can be garbage
# collected. We've seen objects hang around forever otherwise.
+import collections
import itertools
import mimetypes
import time
import math
+import random
+from hashlib import md5
from swift import gettext_ as _
from urllib import unquote, quote
+from greenlet import GreenletExit
from eventlet import GreenPile
from eventlet.queue import Queue
from eventlet.timeout import Timeout
@@ -38,7 +42,8 @@ from eventlet.timeout import Timeout
from swift.common.utils import (
clean_content_type, config_true_value, ContextPool, csv_append,
GreenAsyncPile, GreenthreadSafeIterator, json, Timestamp,
- normalize_delete_at_timestamp, public, quorum_size, get_expirer_container)
+ normalize_delete_at_timestamp, public, get_expirer_container,
+ quorum_size)
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation, \
check_copy_from_header, check_destination_header, \
@@ -46,21 +51,24 @@ from swift.common.constraints import check_metadata, check_object_creation, \
from swift.common import constraints
from swift.common.exceptions import ChunkReadTimeout, \
ChunkWriteTimeout, ConnectionTimeout, ListingIterNotFound, \
- ListingIterNotAuthorized, ListingIterError
+ ListingIterNotAuthorized, ListingIterError, ResponseTimeout, \
+ InsufficientStorage, FooterNotSupported, MultiphasePUTNotSupported, \
+ PutterConnectError
from swift.common.http import (
is_success, is_client_error, is_server_error, HTTP_CONTINUE, HTTP_CREATED,
HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, HTTP_INTERNAL_SERVER_ERROR,
HTTP_SERVICE_UNAVAILABLE, HTTP_INSUFFICIENT_STORAGE,
- HTTP_PRECONDITION_FAILED, HTTP_CONFLICT)
-from swift.common.storage_policy import POLICIES
+ HTTP_PRECONDITION_FAILED, HTTP_CONFLICT, is_informational)
+from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY,
+ ECDriverError, PolicyError)
from swift.proxy.controllers.base import Controller, delay_denial, \
cors_validation
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \
- HTTPServerError, HTTPServiceUnavailable, Request, \
- HTTPClientDisconnect, HeaderKeyDict, HTTPException
+ HTTPServerError, HTTPServiceUnavailable, Request, HeaderKeyDict, \
+ HTTPClientDisconnect, HTTPUnprocessableEntity, Response, HTTPException
from swift.common.request_helpers import is_sys_or_user_meta, is_sys_meta, \
- remove_items, copy_header_subset
+ remove_items, copy_header_subset, close_if_possible
def copy_headers_into(from_r, to_r):
@@ -85,8 +93,41 @@ def check_content_type(req):
return None
-class ObjectController(Controller):
- """WSGI controller for object requests."""
+class ObjectControllerRouter(object):
+
+ policy_type_to_controller_map = {}
+
+ @classmethod
+ def register(cls, policy_type):
+ """
+ Decorator for Storage Policy implemenations to register
+ their ObjectController implementations.
+
+ This also fills in a policy_type attribute on the class.
+ """
+ def register_wrapper(controller_cls):
+ if policy_type in cls.policy_type_to_controller_map:
+ raise PolicyError(
+ '%r is already registered for the policy_type %r' % (
+ cls.policy_type_to_controller_map[policy_type],
+ policy_type))
+ cls.policy_type_to_controller_map[policy_type] = controller_cls
+ controller_cls.policy_type = policy_type
+ return controller_cls
+ return register_wrapper
+
+ def __init__(self):
+ self.policy_to_controller_cls = {}
+ for policy in POLICIES:
+ self.policy_to_controller_cls[policy] = \
+ self.policy_type_to_controller_map[policy.policy_type]
+
+ def __getitem__(self, policy):
+ return self.policy_to_controller_cls[policy]
+
+
+class BaseObjectController(Controller):
+ """Base WSGI controller for object requests."""
server_type = 'Object'
def __init__(self, app, account_name, container_name, object_name,
@@ -114,8 +155,10 @@ class ObjectController(Controller):
lreq.environ['QUERY_STRING'] = \
'format=json&prefix=%s&marker=%s' % (quote(lprefix),
quote(marker))
+ container_node_iter = self.app.iter_nodes(self.app.container_ring,
+ lpartition)
lresp = self.GETorHEAD_base(
- lreq, _('Container'), self.app.container_ring, lpartition,
+ lreq, _('Container'), container_node_iter, lpartition,
lreq.swift_entity_path)
if 'swift.authorize' in env:
lreq.acl = lresp.headers.get('x-container-read')
@@ -180,6 +223,7 @@ class ObjectController(Controller):
# pass the policy index to storage nodes via req header
policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
container_info['storage_policy'])
+ policy = POLICIES.get_by_index(policy_index)
obj_ring = self.app.get_object_ring(policy_index)
req.headers['X-Backend-Storage-Policy-Index'] = policy_index
if 'swift.authorize' in req.environ:
@@ -188,9 +232,10 @@ class ObjectController(Controller):
return aresp
partition = obj_ring.get_part(
self.account_name, self.container_name, self.object_name)
- resp = self.GETorHEAD_base(
- req, _('Object'), obj_ring, partition,
- req.swift_entity_path)
+ node_iter = self.app.iter_nodes(obj_ring, partition)
+
+ resp = self._reroute(policy)._get_or_head_response(
+ req, node_iter, partition, policy)
if ';' in resp.headers.get('content-type', ''):
resp.content_type = clean_content_type(
@@ -383,7 +428,10 @@ class ObjectController(Controller):
_('Trying to get final status of PUT to %s') % req.path)
return (None, None)
- def _get_put_responses(self, req, conns, nodes):
+ def _get_put_responses(self, req, conns, nodes, **kwargs):
+ """
+ Collect replicated object responses.
+ """
statuses = []
reasons = []
bodies = []
@@ -488,6 +536,7 @@ class ObjectController(Controller):
self.object_name = src_obj_name
self.container_name = src_container_name
self.account_name = src_account_name
+
source_resp = self.GET(source_req)
# This gives middlewares a way to change the source; for example,
@@ -589,8 +638,9 @@ class ObjectController(Controller):
'X-Newest': 'True'}
hreq = Request.blank(req.path_info, headers=_headers,
environ={'REQUEST_METHOD': 'HEAD'})
+ hnode_iter = self.app.iter_nodes(obj_ring, partition)
hresp = self.GETorHEAD_base(
- hreq, _('Object'), obj_ring, partition,
+ hreq, _('Object'), hnode_iter, partition,
hreq.swift_entity_path)
is_manifest = 'X-Object-Manifest' in req.headers or \
@@ -654,7 +704,10 @@ class ObjectController(Controller):
req.headers['X-Timestamp'] = Timestamp(time.time()).internal
return None
- def _check_failure_put_connections(self, conns, req, nodes):
+ def _check_failure_put_connections(self, conns, req, nodes, min_conns):
+ """
+ Identify any failed connections and check minimum connection count.
+ """
if req.if_none_match is not None and '*' in req.if_none_match:
statuses = [conn.resp.status for conn in conns if conn.resp]
if HTTP_PRECONDITION_FAILED in statuses:
@@ -675,7 +728,6 @@ class ObjectController(Controller):
'timestamps': ', '.join(timestamps)})
raise HTTPAccepted(request=req)
- min_conns = quorum_size(len(nodes))
self._check_min_conn(req, conns, min_conns)
def _get_put_connections(self, req, nodes, partition, outgoing_headers,
@@ -709,8 +761,12 @@ class ObjectController(Controller):
raise HTTPServiceUnavailable(request=req)
def _transfer_data(self, req, data_source, conns, nodes):
- min_conns = quorum_size(len(nodes))
+ """
+ Transfer data for a replicated object.
+ This method was added in the PUT method extraction change
+ """
+ min_conns = quorum_size(len(nodes))
bytes_transferred = 0
try:
with ContextPool(len(nodes)) as pool:
@@ -775,11 +831,11 @@ class ObjectController(Controller):
This method is responsible for establishing connection
with storage nodes and sending object to each one of those
- nodes. After sending the data, the "best" reponse will be
+ nodes. After sending the data, the "best" response will be
returned based on statuses from all connections
"""
- policy_idx = req.headers.get('X-Backend-Storage-Policy-Index')
- policy = POLICIES.get_by_index(policy_idx)
+ policy_index = req.headers.get('X-Backend-Storage-Policy-Index')
+ policy = POLICIES.get_by_index(policy_index)
if not nodes:
return HTTPNotFound()
@@ -790,11 +846,11 @@ class ObjectController(Controller):
expect = False
conns = self._get_put_connections(req, nodes, partition,
outgoing_headers, policy, expect)
-
+ min_conns = quorum_size(len(nodes))
try:
# check that a minimum number of connections were established and
# meet all the correct conditions set in the request
- self._check_failure_put_connections(conns, req, nodes)
+ self._check_failure_put_connections(conns, req, nodes, min_conns)
# transfer data
self._transfer_data(req, data_source, conns, nodes)
@@ -1015,6 +1071,21 @@ class ObjectController(Controller):
headers, overrides=status_overrides)
return resp
+ def _reroute(self, policy):
+ """
+ For COPY requests we need to make sure the controller instance the
+ request is routed through is the correct type for the policy.
+ """
+ if not policy:
+ raise HTTPServiceUnavailable('Unknown Storage Policy')
+ if policy.policy_type != self.policy_type:
+ controller = self.app.obj_controller_router[policy](
+ self.app, self.account_name, self.container_name,
+ self.object_name)
+ else:
+ controller = self
+ return controller
+
@public
@cors_validation
@delay_denial
@@ -1031,6 +1102,7 @@ class ObjectController(Controller):
self.account_name = dest_account
del req.headers['Destination-Account']
dest_container, dest_object = check_destination_header(req)
+
source = '/%s/%s' % (self.container_name, self.object_name)
self.container_name = dest_container
self.object_name = dest_object
@@ -1042,4 +1114,1109 @@ class ObjectController(Controller):
req.headers['Content-Length'] = 0
req.headers['X-Copy-From'] = quote(source)
del req.headers['Destination']
- return self.PUT(req)
+
+ container_info = self.container_info(
+ dest_account, dest_container, req)
+ dest_policy = POLICIES.get_by_index(container_info['storage_policy'])
+
+ return self._reroute(dest_policy).PUT(req)
+
+
+@ObjectControllerRouter.register(REPL_POLICY)
+class ReplicatedObjectController(BaseObjectController):
+
+ def _get_or_head_response(self, req, node_iter, partition, policy):
+ resp = self.GETorHEAD_base(
+ req, _('Object'), node_iter, partition,
+ req.swift_entity_path)
+ return resp
+
+
+class ECAppIter(object):
+ """
+ WSGI iterable that decodes EC fragment archives (or portions thereof)
+ into the original object (or portions thereof).
+
+ :param path: path for the request
+
+ :param policy: storage policy for this object
+
+ :param internal_app_iters: list of the WSGI iterables from object server
+ GET responses for fragment archives. For an M+K erasure code, the
+ caller must supply M such iterables.
+
+ :param range_specs: list of dictionaries describing the ranges requested
+ by the client. Each dictionary contains the start and end of the
+ client's requested byte range as well as the start and end of the EC
+ segments containing that byte range.
+
+ :param obj_length: length of the object, in bytes. Learned from the
+ headers in the GET response from the object server.
+
+ :param logger: a logger
+ """
+ def __init__(self, path, policy, internal_app_iters, range_specs,
+ obj_length, logger):
+ self.path = path
+ self.policy = policy
+ self.internal_app_iters = internal_app_iters
+ self.range_specs = range_specs
+ self.obj_length = obj_length
+ self.boundary = ''
+ self.logger = logger
+
+ def close(self):
+ for it in self.internal_app_iters:
+ close_if_possible(it)
+
+ def __iter__(self):
+ segments_iter = self.decode_segments_from_fragments()
+
+ if len(self.range_specs) == 0:
+ # plain GET; just yield up segments
+ for seg in segments_iter:
+ yield seg
+ return
+
+ if len(self.range_specs) > 1:
+ raise NotImplementedError("multi-range GETs not done yet")
+
+ for range_spec in self.range_specs:
+ client_start = range_spec['client_start']
+ client_end = range_spec['client_end']
+ segment_start = range_spec['segment_start']
+ segment_end = range_spec['segment_end']
+
+ seg_size = self.policy.ec_segment_size
+ is_suffix = client_start is None
+
+ if is_suffix:
+ # Suffix byte ranges (i.e. requests for the last N bytes of
+ # an object) are likely to end up not on a segment boundary.
+ client_range_len = client_end
+ client_start = max(self.obj_length - client_range_len, 0)
+ client_end = self.obj_length - 1
+
+ # may be mid-segment; if it is, then everything up to the
+ # first segment boundary is garbage, and is discarded before
+ # ever getting into this function.
+ unaligned_segment_start = max(self.obj_length - segment_end, 0)
+ alignment_offset = (
+ (seg_size - (unaligned_segment_start % seg_size))
+ % seg_size)
+ segment_start = unaligned_segment_start + alignment_offset
+ segment_end = self.obj_length - 1
+ else:
+ # It's entirely possible that the client asked for a range that
+ # includes some bytes we have and some we don't; for example, a
+ # range of bytes 1000-20000000 on a 1500-byte object.
+ segment_end = (min(segment_end, self.obj_length - 1)
+ if segment_end is not None
+ else self.obj_length - 1)
+ client_end = (min(client_end, self.obj_length - 1)
+ if client_end is not None
+ else self.obj_length - 1)
+
+ num_segments = int(
+ math.ceil(float(segment_end + 1 - segment_start)
+ / self.policy.ec_segment_size))
+ # We get full segments here, but the client may have requested a
+ # byte range that begins or ends in the middle of a segment.
+ # Thus, we have some amount of overrun (extra decoded bytes)
+ # that we trim off so the client gets exactly what they
+ # requested.
+ start_overrun = client_start - segment_start
+ end_overrun = segment_end - client_end
+
+ for i, next_seg in enumerate(segments_iter):
+ # We may have a start_overrun of more than one segment in
+ # the case of suffix-byte-range requests. However, we never
+ # have an end_overrun of more than one segment.
+ if start_overrun > 0:
+ seglen = len(next_seg)
+ if seglen <= start_overrun:
+ start_overrun -= seglen
+ continue
+ else:
+ next_seg = next_seg[start_overrun:]
+ start_overrun = 0
+
+ if i == (num_segments - 1) and end_overrun:
+ next_seg = next_seg[:-end_overrun]
+
+ yield next_seg
+
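A worked instance of the trimming above (editorial sketch, not part of the patch): for "Range: bytes=100-700" against 512-byte segments, the decoded stream covers segment bytes 0-1023, so:

    start_overrun = 100 - 0     # 100 bytes cut from the first decoded segment
    end_overrun = 1023 - 700    # 323 bytes cut from the last decoded segment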
+ def decode_segments_from_fragments(self):
+ # Decodes the fragments from the object servers and yields one
+ # segment at a time.
+ queues = [Queue(1) for _junk in range(len(self.internal_app_iters))]
+
+ def put_fragments_in_queue(frag_iter, queue):
+ try:
+ for fragment in frag_iter:
+ if fragment[0] == ' ':
+ raise Exception('Leading whitespace on fragment.')
+ queue.put(fragment)
+ except GreenletExit:
+ # killed by contextpool
+ pass
+ except ChunkReadTimeout:
+ # unable to resume in GetOrHeadHandler
+ pass
+ except: # noqa
+ self.logger.exception("Exception fetching fragments for %r" %
+ self.path)
+ finally:
+ queue.resize(2) # ensure there's room
+ queue.put(None)
+
+ with ContextPool(len(self.internal_app_iters)) as pool:
+ for app_iter, queue in zip(
+ self.internal_app_iters, queues):
+ pool.spawn(put_fragments_in_queue, app_iter, queue)
+
+ while True:
+ fragments = []
+ for qi, queue in enumerate(queues):
+ fragment = queue.get()
+ queue.task_done()
+ fragments.append(fragment)
+
+ # If any object server connection yields out a None; we're
+ # done. Either they are all None, and we've finished
+ # successfully; or some un-recoverable failure has left us
+ # with an un-reconstructible list of fragments - so we'll
+ # break out of the iter so WSGI can tear down the broken
+ # connection.
+ if not all(fragments):
+ break
+ try:
+ segment = self.policy.pyeclib_driver.decode(fragments)
+ except ECDriverError:
+ self.logger.exception("Error decoding fragments for %r" %
+ self.path)
+ raise
+
+ yield segment
+
+ def app_iter_range(self, start, end):
+ return self
+
+ def app_iter_ranges(self, content_type, boundary, content_size):
+ self.boundary = boundary
+
+
+def client_range_to_segment_range(client_start, client_end, segment_size):
+ """
+ Takes a byterange from the client and converts it into a byterange
+ spanning the necessary segments.
+
+ Handles prefix, suffix, and fully-specified byte ranges.
+
+ Examples:
+ client_range_to_segment_range(100, 700, 512) = (0, 1023)
+ client_range_to_segment_range(100, 700, 256) = (0, 767)
+ client_range_to_segment_range(300, None, 256) = (256, None)
+
+ :param client_start: first byte of the range requested by the client
+ :param client_end: last byte of the range requested by the client
+ :param segment_size: size of an EC segment, in bytes
+
+ :returns: a 2-tuple (seg_start, seg_end) where
+
+ * seg_start is the first byte of the first segment, or None if this is
+ a suffix byte range
+
+ * seg_end is the last byte of the last segment, or None if this is a
+ prefix byte range
+ """
+ # the index of the first byte of the first segment
+ segment_start = (
+ int(client_start // segment_size)
+ * segment_size) if client_start is not None else None
+ # the index of the last byte of the last segment
+ segment_end = (
+ # bytes M-
+ None if client_end is None else
+ # bytes M-N
+ (((int(client_end // segment_size) + 1)
+ * segment_size) - 1) if client_start is not None else
+ # bytes -N: we get some extra bytes to make sure we
+ # have all we need.
+ #
+ # To see why, imagine a 100-byte segment size, a
+ # 340-byte object, and a request for the last 50
+ # bytes. Naively requesting the last 100 bytes would
+ # result in a truncated first segment and hence a
+ # truncated download. (Of course, the actual
+ # obj-server requests are for fragments, not
+ # segments, but that doesn't change the
+ # calculation.)
+ #
+ # This does mean that we fetch an extra segment if
+ # the object size is an exact multiple of the
+ # segment size. It's a little wasteful, but it's
+ # better to be a little wasteful than to get some
+ # range requests completely wrong.
+ (int(math.ceil((
+ float(client_end) / segment_size) + 1)) # nsegs
+ * segment_size))
+ return (segment_start, segment_end)
+
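Both the docstring example and the suffix case in the long comment above can be checked directly (editorial sketch): for "bytes=-50" with 100-byte segments, an extra segment's worth of bytes is requested so the partial leading segment can be discarded.

    assert client_range_to_segment_range(100, 700, 512) == (0, 1023)
    assert client_range_to_segment_range(None, 50, 100) == (None, 200)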
+
+def segment_range_to_fragment_range(segment_start, segment_end, segment_size,
+ fragment_size):
+ """
+ Takes a byterange spanning some segments and converts that into a
+ byterange spanning the corresponding fragments within their fragment
+ archives.
+
+ Handles prefix, suffix, and fully-specified byte ranges.
+
+ :param segment_start: first byte of the first segment
+ :param segment_end: last byte of the last segment
+ :param segment_size: size of an EC segment, in bytes
+ :param fragment_size: size of an EC fragment, in bytes
+
+ :returns: a 2-tuple (frag_start, frag_end) where
+
+ * frag_start is the first byte of the first fragment, or None if this
+ is a suffix byte range
+
+ * frag_end is the last byte of the last fragment, or None if this is a
+ prefix byte range
+ """
+ # Note: segment_start and (segment_end + 1) are
+ # multiples of segment_size, so we don't have to worry
+ # about integer math giving us rounding troubles.
+ #
+ # There's a whole bunch of +1 and -1 in here; that's because HTTP wants
+ # byteranges to be inclusive of the start and end, so e.g. bytes 200-300
+ # is a range containing 101 bytes. Python has half-inclusive ranges, of
+ # course, so we have to convert back and forth. We try to keep things in
+ # HTTP-style byteranges for consistency.
+
+ # the index of the first byte of the first fragment
+ fragment_start = ((
+ segment_start / segment_size * fragment_size)
+ if segment_start is not None else None)
+ # the index of the last byte of the last fragment
+ fragment_end = (
+ # range unbounded on the right
+ None if segment_end is None else
+ # range unbounded on the left; no -1 since we're
+ # asking for the last N bytes, not to have a
+ # particular byte be the last one
+ ((segment_end + 1) / segment_size
+ * fragment_size) if segment_start is None else
+ # range bounded on both sides; the -1 is because the
+ # rest of the expression computes the length of the
+ # fragment, and a range of N bytes starts at index M
+ # and ends at M + N - 1.
+ ((segment_end + 1) / segment_size * fragment_size) - 1)
+ return (fragment_start, fragment_end)
+
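One concrete mapping (editorial sketch, assuming a policy whose 4096-byte segments encode to 1024-byte fragments per archive): two whole segments correspond to two whole fragments in each archive, and the suffix case omits the trailing -1 as the comment explains.

    assert segment_range_to_fragment_range(0, 8191, 4096, 1024) == (0, 2047)
    assert segment_range_to_fragment_range(None, 8191, 4096, 1024) == (None, 2048)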
+
+NO_DATA_SENT = 1
+SENDING_DATA = 2
+DATA_SENT = 3
+DATA_ACKED = 4
+COMMIT_SENT = 5
+
+
+class ECPutter(object):
+ """
+ This is here mostly to wrap up the fact that all EC PUTs are
+ chunked because of the mime boundary footer trick and the first
+ half of the two-phase PUT conversation handling.
+
+ An HTTP PUT request that supports streaming.
+
+ Probably deserves more docs than this, but meh.
+ """
+ def __init__(self, conn, node, resp, path, connect_duration,
+ mime_boundary):
+ # Note: you probably want to call Putter.connect() instead of
+ # instantiating one of these directly.
+ self.conn = conn
+ self.node = node
+ self.resp = resp
+ self.path = path
+ self.connect_duration = connect_duration
+ # for handoff nodes node_index is None
+ self.node_index = node.get('index')
+ self.mime_boundary = mime_boundary
+ self.chunk_hasher = md5()
+
+ self.failed = False
+ self.queue = None
+ self.state = NO_DATA_SENT
+
+ def current_status(self):
+ """
+ Returns the current status of the response.
+
+ A response starts off with no current status, then may or may not have
+ a status of 100 for some time, and then ultimately has a final status
+ like 200, 404, et cetera.
+ """
+ return self.resp.status
+
+ def await_response(self, timeout, informational=False):
+ """
+ Get 100-continue response indicating the end of 1st phase of a 2-phase
+ commit or the final response, i.e. the one with status >= 200.
+
+ Might or might not actually wait for anything. If we said Expect:
+ 100-continue but got back a non-100 response, that'll be the thing
+ returned, and we won't do any network IO to get it. OTOH, if we got
+ a 100 Continue response and sent up the PUT request's body, then
+ we'll actually read the 2xx-5xx response off the network here.
+
+ :returns: HTTPResponse
+ :raises: Timeout if the response took too long
+ """
+ conn = self.conn
+ with Timeout(timeout):
+ if not conn.resp:
+ if informational:
+ self.resp = conn.getexpect()
+ else:
+ self.resp = conn.getresponse()
+ return self.resp
+
+ def spawn_sender_greenthread(self, pool, queue_depth, write_timeout,
+ exception_handler):
+ """Call before sending the first chunk of request body"""
+ self.queue = Queue(queue_depth)
+ pool.spawn(self._send_file, write_timeout, exception_handler)
+
+ def wait(self):
+ if self.queue.unfinished_tasks:
+ self.queue.join()
+
+ def _start_mime_doc_object_body(self):
+ self.queue.put("--%s\r\nX-Document: object body\r\n\r\n" %
+ (self.mime_boundary,))
+
+ def send_chunk(self, chunk):
+ if not chunk:
+ # If we're not using chunked transfer-encoding, sending a 0-byte
+ # chunk is just wasteful. If we *are* using chunked
+ # transfer-encoding, sending a 0-byte chunk terminates the
+ # request body. Neither one of these is good.
+ return
+ elif self.state == DATA_SENT:
+ raise ValueError("called send_chunk after end_of_object_data")
+
+ if self.state == NO_DATA_SENT and self.mime_boundary:
+ # We're sending the object plus other stuff in the same request
+ # body, all wrapped up in multipart MIME, so we'd better start
+ # off the MIME document before sending any object data.
+ self._start_mime_doc_object_body()
+ self.state = SENDING_DATA
+
+ self.queue.put(chunk)
+
+ def end_of_object_data(self, footer_metadata):
+ """
+ Call when there is no more data to send.
+
+ :param footer_metadata: dictionary of metadata items
+ """
+ if self.state == DATA_SENT:
+ raise ValueError("called end_of_object_data twice")
+ elif self.state == NO_DATA_SENT and self.mime_boundary:
+ self._start_mime_doc_object_body()
+
+ footer_body = json.dumps(footer_metadata)
+ footer_md5 = md5(footer_body).hexdigest()
+
+ tail_boundary = ("--%s" % (self.mime_boundary,))
+
+ message_parts = [
+ ("\r\n--%s\r\n" % self.mime_boundary),
+ "X-Document: object metadata\r\n",
+ "Content-MD5: %s\r\n" % footer_md5,
+ "\r\n",
+ footer_body, "\r\n",
+ tail_boundary, "\r\n",
+ ]
+ self.queue.put("".join(message_parts))
+
+ self.queue.put('')
+ self.state = DATA_SENT
+
+ def send_commit_confirmation(self):
+ """
+ Call when there are > quorum 2XX responses received. Send commit
+ confirmations to all object nodes to finalize the PUT.
+ """
+ if self.state == COMMIT_SENT:
+ raise ValueError("called send_commit_confirmation twice")
+
+ self.state = DATA_ACKED
+
+ if self.mime_boundary:
+ body = "put_commit_confirmation"
+ tail_boundary = ("--%s--" % (self.mime_boundary,))
+ message_parts = [
+ "X-Document: put commit\r\n",
+ "\r\n",
+ body, "\r\n",
+ tail_boundary,
+ ]
+ self.queue.put("".join(message_parts))
+
+ self.queue.put('')
+ self.state = COMMIT_SENT
+
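Putting _start_mime_doc_object_body(), end_of_object_data(), and send_commit_confirmation() together, the request body framing looks roughly like this (editorial sketch; boundary shortened, blank lines standing for the \r\n pairs built above):

    # --boundary
    # X-Document: object body
    #
    # <fragment archive bytes ...>
    # --boundary
    # X-Document: object metadata
    # Content-MD5: <md5 of the footer JSON>
    #
    # {"X-Object-Sysmeta-EC-Etag": ..., "Etag": <fragment md5>, ...}
    # --boundary
    # X-Document: put commit
    #
    # put_commit_confirmation
    # --boundary--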
+ def _send_file(self, write_timeout, exception_handler):
+ """
+ Method for a file PUT coro. Takes chunks from a queue and sends them
+ down a socket.
+
+ If something goes wrong, the "failed" attribute will be set to true
+ and the exception handler will be called.
+ """
+ while True:
+ chunk = self.queue.get()
+ if not self.failed:
+ to_send = "%x\r\n%s\r\n" % (len(chunk), chunk)
+ try:
+ with ChunkWriteTimeout(write_timeout):
+ self.conn.send(to_send)
+ except (Exception, ChunkWriteTimeout):
+ self.failed = True
+ exception_handler(self.conn.node, _('Object'),
+ _('Trying to write to %s') % self.path)
+ self.queue.task_done()
+
+ @classmethod
+ def connect(cls, node, part, path, headers, conn_timeout, node_timeout,
+ chunked=False):
+ """
+ Connect to a backend node and send the headers.
+
+ :returns: Putter instance
+
+ :raises: ConnectionTimeout if initial connection timed out
+ :raises: ResponseTimeout if header retrieval timed out
+ :raises: InsufficientStorage on 507 response from node
+ :raises: PutterConnectError on non-507 server error response from node
+ :raises: FooterNotSupported if need_metadata_footer is set but
+ backend node can't process footers
+ :raises: MultiphasePUTNotSupported if need_multiphase_support is
+ set but backend node can't handle multiphase PUT
+ """
+ mime_boundary = "%.64x" % random.randint(0, 16 ** 64)
+ headers = HeaderKeyDict(headers)
+ # We're going to be adding some unknown amount of data to the
+ # request, so we can't use an explicit content length, and thus
+ # we must use chunked encoding.
+ headers['Transfer-Encoding'] = 'chunked'
+ headers['Expect'] = '100-continue'
+ if 'Content-Length' in headers:
+ headers['X-Backend-Obj-Content-Length'] = \
+ headers.pop('Content-Length')
+
+ headers['X-Backend-Obj-Multipart-Mime-Boundary'] = mime_boundary
+
+ headers['X-Backend-Obj-Metadata-Footer'] = 'yes'
+
+ headers['X-Backend-Obj-Multiphase-Commit'] = 'yes'
+
+ start_time = time.time()
+ with ConnectionTimeout(conn_timeout):
+ conn = http_connect(node['ip'], node['port'], node['device'],
+ part, 'PUT', path, headers)
+ connect_duration = time.time() - start_time
+
+ with ResponseTimeout(node_timeout):
+ resp = conn.getexpect()
+
+ if resp.status == HTTP_INSUFFICIENT_STORAGE:
+ raise InsufficientStorage
+
+ if is_server_error(resp.status):
+ raise PutterConnectError(resp.status)
+
+ if is_informational(resp.status):
+ continue_headers = HeaderKeyDict(resp.getheaders())
+ can_send_metadata_footer = config_true_value(
+ continue_headers.get('X-Obj-Metadata-Footer', 'no'))
+ can_handle_multiphase_put = config_true_value(
+ continue_headers.get('X-Obj-Multiphase-Commit', 'no'))
+
+ if not can_send_metadata_footer:
+ raise FooterNotSupported()
+
+ if not can_handle_multiphase_put:
+ raise MultiphasePUTNotSupported()
+
+ conn.node = node
+ conn.resp = None
+ if is_success(resp.status) or resp.status == HTTP_CONFLICT:
+ conn.resp = resp
+ elif (headers.get('If-None-Match', None) is not None and
+ resp.status == HTTP_PRECONDITION_FAILED):
+ conn.resp = resp
+
+ return cls(conn, node, resp, path, connect_duration, mime_boundary)
+
+
+def chunk_transformer(policy, nstreams):
+ segment_size = policy.ec_segment_size
+
+ buf = collections.deque()
+ total_buf_len = 0
+
+ chunk = yield
+ while chunk:
+ buf.append(chunk)
+ total_buf_len += len(chunk)
+ if total_buf_len >= segment_size:
+ chunks_to_encode = []
+ # extract as many chunks as we can from the input buffer
+ while total_buf_len >= segment_size:
+ to_take = segment_size
+ pieces = []
+ while to_take > 0:
+ piece = buf.popleft()
+ if len(piece) > to_take:
+ buf.appendleft(piece[to_take:])
+ piece = piece[:to_take]
+ pieces.append(piece)
+ to_take -= len(piece)
+ total_buf_len -= len(piece)
+ chunks_to_encode.append(''.join(pieces))
+
+ frags_by_byte_order = []
+ for chunk_to_encode in chunks_to_encode:
+ frags_by_byte_order.append(
+ policy.pyeclib_driver.encode(chunk_to_encode))
+ # Sequential calls to encode() have given us a list that
+ # looks like this:
+ #
+ # [[frag_A1, frag_B1, frag_C1, ...],
+ # [frag_A2, frag_B2, frag_C2, ...], ...]
+ #
+ # What we need is a list like this:
+ #
+ # [(frag_A1 + frag_A2 + ...), # destined for node A
+ # (frag_B1 + frag_B2 + ...), # destined for node B
+ # (frag_C1 + frag_C2 + ...), # destined for node C
+ # ...]
+ obj_data = [''.join(frags)
+ for frags in zip(*frags_by_byte_order)]
+ chunk = yield obj_data
+ else:
+ # didn't have enough data to encode
+ chunk = yield None
+
+ # Now we've gotten an empty chunk, which indicates end-of-input.
+ # Take any leftover bytes and encode them.
+ last_bytes = ''.join(buf)
+ if last_bytes:
+ last_frags = policy.pyeclib_driver.encode(last_bytes)
+ yield last_frags
+ else:
+ yield [''] * nstreams
+
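The zip(*...) transpose above can be seen with dummy two-fragment "encodings" standing in for pyeclib output (editorial sketch, not part of the patch):

    frags_by_byte_order = [['A1', 'B1'],   # encode(segment 1)
                           ['A2', 'B2']]   # encode(segment 2)
    obj_data = [''.join(frags) for frags in zip(*frags_by_byte_order)]
    assert obj_data == ['A1A2', 'B1B2']    # one concatenated stream per node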
+
+def trailing_metadata(policy, client_obj_hasher,
+ bytes_transferred_from_client,
+ fragment_archive_index):
+ return {
+ # etag and size values are being added twice here.
+ # The container override header is used to update the container db
+ # with these values as they represent the correct etag and size for
+ # the whole object and not just the FA.
+ # The object sysmeta headers will be saved on each FA of the object.
+ 'X-Object-Sysmeta-EC-Etag': client_obj_hasher.hexdigest(),
+ 'X-Object-Sysmeta-EC-Content-Length':
+ str(bytes_transferred_from_client),
+ 'X-Backend-Container-Update-Override-Etag':
+ client_obj_hasher.hexdigest(),
+ 'X-Backend-Container-Update-Override-Size':
+ str(bytes_transferred_from_client),
+ 'X-Object-Sysmeta-Ec-Frag-Index': str(fragment_archive_index),
+ # These fields are for debuggability,
+ # AKA "what is this thing?"
+ 'X-Object-Sysmeta-EC-Scheme': policy.ec_scheme_description,
+ 'X-Object-Sysmeta-EC-Segment-Size': str(policy.ec_segment_size),
+ }
+
+
+@ObjectControllerRouter.register(EC_POLICY)
+class ECObjectController(BaseObjectController):
+
+ def _get_or_head_response(self, req, node_iter, partition, policy):
+ req.headers.setdefault("X-Backend-Etag-Is-At",
+ "X-Object-Sysmeta-Ec-Etag")
+
+ if req.method == 'HEAD':
+ # no fancy EC decoding here, just one plain old HEAD request to
+ # one object server because all fragments hold all metadata
+ # information about the object.
+ resp = self.GETorHEAD_base(
+ req, _('Object'), node_iter, partition,
+ req.swift_entity_path)
+ else: # GET request
+ orig_range = None
+ range_specs = []
+ if req.range:
+ orig_range = req.range
+ # Since segments and fragments have different sizes, we need
+ # to modify the Range header sent to the object servers to
+ # make sure we get the right fragments out of the fragment
+ # archives.
+ segment_size = policy.ec_segment_size
+ fragment_size = policy.fragment_size
+
+ range_specs = []
+ new_ranges = []
+ for client_start, client_end in req.range.ranges:
+
+ segment_start, segment_end = client_range_to_segment_range(
+ client_start, client_end, segment_size)
+
+ fragment_start, fragment_end = \
+ segment_range_to_fragment_range(
+ segment_start, segment_end,
+ segment_size, fragment_size)
+
+ new_ranges.append((fragment_start, fragment_end))
+ range_specs.append({'client_start': client_start,
+ 'client_end': client_end,
+ 'segment_start': segment_start,
+ 'segment_end': segment_end})
+
+ req.range = "bytes=" + ",".join(
+ "%s-%s" % (s if s is not None else "",
+ e if e is not None else "")
+ for s, e in new_ranges)
+
+ node_iter = GreenthreadSafeIterator(node_iter)
+ num_gets = policy.ec_ndata
+ with ContextPool(num_gets) as pool:
+ pile = GreenAsyncPile(pool)
+ for _junk in range(num_gets):
+ pile.spawn(self.GETorHEAD_base,
+ req, 'Object', node_iter, partition,
+ req.swift_entity_path,
+ client_chunk_size=policy.fragment_size)
+
+ responses = list(pile)
+ good_responses = []
+ bad_responses = []
+ for response in responses:
+ if is_success(response.status_int):
+ good_responses.append(response)
+ else:
+ bad_responses.append(response)
+
+ req.range = orig_range
+ if len(good_responses) == num_gets:
+ # If these aren't all for the same object, then error out so
+ # at least the client doesn't get garbage. We can do a lot
+ # better here with more work, but this'll work for now.
+ found_obj_etags = set(
+ resp.headers['X-Object-Sysmeta-Ec-Etag']
+ for resp in good_responses)
+ if len(found_obj_etags) > 1:
+ self.app.logger.debug(
+ "Returning 503 for %s; found too many etags (%s)",
+ req.path,
+ ", ".join(found_obj_etags))
+ return HTTPServiceUnavailable(request=req)
+
+ # we found enough pieces to decode the object, so now let's
+ # decode the object
+ resp_headers = HeaderKeyDict(good_responses[0].headers.items())
+ resp_headers.pop('Content-Range', None)
+ eccl = resp_headers.get('X-Object-Sysmeta-Ec-Content-Length')
+ obj_length = int(eccl) if eccl is not None else None
+
+ resp = Response(
+ request=req,
+ headers=resp_headers,
+ conditional_response=True,
+ app_iter=ECAppIter(
+ req.swift_entity_path,
+ policy,
+ [r.app_iter for r in good_responses],
+ range_specs,
+ obj_length,
+ logger=self.app.logger))
+ else:
+ resp = self.best_response(
+ req,
+ [r.status_int for r in bad_responses],
+ [r.status.split(' ', 1)[1] for r in bad_responses],
+ [r.body for r in bad_responses],
+ 'Object',
+ headers=[r.headers for r in bad_responses])
+
+ self._fix_response_headers(resp)
+ return resp
+
+ def _fix_response_headers(self, resp):
+ # EC fragment archives each have different bytes, hence different
+ # etags. However, they all have the original object's etag stored in
+ # sysmeta, so we copy that here so the client gets it.
+ resp.headers['Etag'] = resp.headers.get(
+ 'X-Object-Sysmeta-Ec-Etag')
+ resp.headers['Content-Length'] = resp.headers.get(
+ 'X-Object-Sysmeta-Ec-Content-Length')
+
+ return resp
+
+ def _connect_put_node(self, node_iter, part, path, headers,
+ logger_thread_locals):
+ """
+ Make a connection for a erasure encoded object.
+
+ Connects to the first working node that it finds in node_iter and sends
+ over the request headers. Returns a Putter to handle the rest of the
+ streaming, or None if no working nodes were found.
+ """
+ # the object server will get different bytes, so these
+ # values do not apply (Content-Length might, in general, but
+ # in the specific case of replication vs. EC, it doesn't).
+ headers.pop('Content-Length', None)
+ headers.pop('Etag', None)
+
+ self.app.logger.thread_locals = logger_thread_locals
+ for node in node_iter:
+ try:
+ putter = ECPutter.connect(
+ node, part, path, headers,
+ conn_timeout=self.app.conn_timeout,
+ node_timeout=self.app.node_timeout)
+ self.app.set_node_timing(node, putter.connect_duration)
+ return putter
+ except InsufficientStorage:
+ self.app.error_limit(node, _('ERROR Insufficient Storage'))
+ except PutterConnectError as e:
+ self.app.error_occurred(
+ node, _('ERROR %(status)d Expect: 100-continue '
+ 'From Object Server') % {
+ 'status': e.status})
+ except (Exception, Timeout):
+ self.app.exception_occurred(
+ node, _('Object'),
+ _('Expect: 100-continue on %s') % path)
+
+ def _determine_chunk_destinations(self, putters):
+ """
+ Given a list of putters, return a dict where the key is the putter
+ and the value is the node index to use.
+
+ This is done so that we line up handoffs using the same node index
+ (in the primary part list) as the primary that the handoff is standing
+ in for. This lets erasure-code fragment archives wind up on the
+ preferred local primary nodes when possible.
+ """
+ # Give each putter a "chunk index": the index of the
+ # transformed chunk that we'll send to it.
+ #
+ # For primary nodes, that's just its index (primary 0 gets
+ # chunk 0, primary 1 gets chunk 1, and so on). For handoffs,
+ # we assign the chunk index of a missing primary.
+ handoff_conns = []
+ chunk_index = {}
+ for p in putters:
+ if p.node_index is not None:
+ chunk_index[p] = p.node_index
+ else:
+ handoff_conns.append(p)
+
+ # Note: we may have more holes than handoffs. This is okay; it
+ # just means that we failed to connect to one or more storage
+ # nodes. Holes occur when a storage node is down, in which
+ # case the connection is not replaced, and when a storage node
+ # returns 507, in which case a handoff is used to replace it.
+ holes = [x for x in range(len(putters))
+ if x not in chunk_index.values()]
+
+ for hole, p in zip(holes, handoff_conns):
+ chunk_index[p] = hole
+ return chunk_index
+
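For instance (editorial sketch with stand-in putter objects, assuming an ECObjectController instance named controller; only node_index matters here): if primaries 0 and 2 connected but primary 1 did not, a single handoff putter is assigned the vacant index 1.

    class FakePutter(object):
        def __init__(self, node_index):
            self.node_index = node_index

    p0, p2, handoff = FakePutter(0), FakePutter(2), FakePutter(None)
    chunk_index = controller._determine_chunk_destinations([p0, p2, handoff])
    assert chunk_index == {p0: 0, p2: 2, handoff: 1}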
+ def _transfer_data(self, req, policy, data_source, putters, nodes,
+ min_conns, etag_hasher):
+ """
+ Transfer data for an erasure coded object.
+
+ This method was added in the PUT method extraction change.
+ """
+ bytes_transferred = 0
+ chunk_transform = chunk_transformer(policy, len(nodes))
+ chunk_transform.send(None)
+
+ def send_chunk(chunk):
+ if etag_hasher:
+ etag_hasher.update(chunk)
+ backend_chunks = chunk_transform.send(chunk)
+ if backend_chunks is None:
+ # If there's not enough bytes buffered for erasure-encoding
+ # or whatever we're doing, the transform will give us None.
+ return
+
+ for putter in list(putters):
+ backend_chunk = backend_chunks[chunk_index[putter]]
+ if not putter.failed:
+ putter.chunk_hasher.update(backend_chunk)
+ putter.send_chunk(backend_chunk)
+ else:
+ putters.remove(putter)
+ self._check_min_conn(
+ req, putters, min_conns, msg='Object PUT exceptions during'
+ ' send, %(conns)s/%(nodes)s required connections')
+
+ try:
+ with ContextPool(len(putters)) as pool:
+
+ # build our chunk index dict to place handoffs in the
+ # same part nodes index as the primaries they are covering
+ chunk_index = self._determine_chunk_destinations(putters)
+
+ for putter in putters:
+ putter.spawn_sender_greenthread(
+ pool, self.app.put_queue_depth, self.app.node_timeout,
+ self.app.exception_occurred)
+ while True:
+ with ChunkReadTimeout(self.app.client_timeout):
+ try:
+ chunk = next(data_source)
+ except StopIteration:
+ computed_etag = (etag_hasher.hexdigest()
+ if etag_hasher else None)
+ received_etag = req.headers.get(
+ 'etag', '').strip('"')
+ if (computed_etag and received_etag and
+ computed_etag != received_etag):
+ raise HTTPUnprocessableEntity(request=req)
+
+ send_chunk('') # flush out any buffered data
+
+ for putter in putters:
+ trail_md = trailing_metadata(
+ policy, etag_hasher,
+ bytes_transferred,
+ chunk_index[putter])
+ trail_md['Etag'] = \
+ putter.chunk_hasher.hexdigest()
+ putter.end_of_object_data(trail_md)
+ break
+ bytes_transferred += len(chunk)
+ if bytes_transferred > constraints.MAX_FILE_SIZE:
+ raise HTTPRequestEntityTooLarge(request=req)
+
+ send_chunk(chunk)
+
+ for putter in putters:
+ putter.wait()
+
+ # for storage policies requiring 2-phase commit (e.g.
+ # erasure coding), enforce >= 'quorum' number of
+ # 100-continue responses - this indicates successful
+ # object data and metadata commit and is a necessary
+ # condition to be met before starting 2nd PUT phase
+ final_phase = False
+ need_quorum = True
+ statuses, reasons, bodies, _junk, quorum = \
+ self._get_put_responses(
+ req, putters, len(nodes), final_phase,
+ min_conns, need_quorum=need_quorum)
+ if not quorum:
+ self.app.logger.error(
+ _('Not enough object servers ack\'ed (got %d)'),
+ statuses.count(HTTP_CONTINUE))
+ raise HTTPServiceUnavailable(request=req)
+ # quorum achieved, start 2nd phase - send commit
+ # confirmation to participating object servers
+ # so they write a .durable state file indicating
+ # a successful PUT
+ for putter in putters:
+ putter.send_commit_confirmation()
+ for putter in putters:
+ putter.wait()
+ except ChunkReadTimeout as err:
+ self.app.logger.warn(
+ _('ERROR Client read timeout (%ss)'), err.seconds)
+ self.app.logger.increment('client_timeouts')
+ raise HTTPRequestTimeout(request=req)
+ except HTTPException:
+ raise
+ except (Exception, Timeout):
+ self.app.logger.exception(
+ _('ERROR Exception causing client disconnect'))
+ raise HTTPClientDisconnect(request=req)
+ if req.content_length and bytes_transferred < req.content_length:
+ req.client_disconnect = True
+ self.app.logger.warn(
+ _('Client disconnected without sending enough data'))
+ self.app.logger.increment('client_disconnects')
+ raise HTTPClientDisconnect(request=req)
+
+ def _have_adequate_successes(self, statuses, min_responses):
+ """
+ Given a list of statuses from several requests, determine if a
+ satisfactory number of nodes have responded with 2xx statuses to
+ deem the transaction successful and respond to the client.
+
+ :param statuses: list of statuses returned so far
+ :param min_responses: minimal pass criterion for number of successes
+ :returns: True or False, depending on current number of successes
+ """
+ if sum(1 for s in statuses if is_success(s)) >= min_responses:
+ return True
+ return False
+
+ def _await_response(self, conn, final_phase):
+ return conn.await_response(
+ self.app.node_timeout, not final_phase)
+
+ def _get_conn_response(self, conn, req, final_phase, **kwargs):
+ try:
+ resp = self._await_response(conn, final_phase=final_phase,
+ **kwargs)
+ except (Exception, Timeout):
+ resp = None
+ if final_phase:
+ status_type = 'final'
+ else:
+ status_type = 'commit'
+ self.app.exception_occurred(
+ conn.node, _('Object'),
+ _('Trying to get %s status of PUT to %s') % (
+ status_type, req.path))
+ return (conn, resp)
+
+ def _get_put_responses(self, req, putters, num_nodes, final_phase,
+ min_responses, need_quorum=True):
+ """
+ Collect erasure coded object responses.
+
+ Collect object responses to a PUT request and determine if
+ satisfactory number of nodes have returned success. Return
+ statuses, quorum result if indicated by 'need_quorum' and
+ etags if this is a final phase or a multiphase PUT transaction.
+
+ :param req: the request
+ :param putters: list of putters for the request
+ :param num_nodes: number of nodes involved
+ :param final_phase: boolean indicating if this is the last phase
+ :param min_responses: minimum needed when not requiring quorum
+ :param need_quorum: boolean indicating if quorum is required
+ """
+ statuses = []
+ reasons = []
+ bodies = []
+ etags = set()
+
+ pile = GreenAsyncPile(len(putters))
+ for putter in putters:
+ if putter.failed:
+ continue
+ pile.spawn(self._get_conn_response, putter, req,
+ final_phase=final_phase)
+
+ def _handle_response(putter, response):
+ statuses.append(response.status)
+ reasons.append(response.reason)
+ if final_phase:
+ body = response.read()
+ bodies.append(body)
+ else:
+ body = ''
+ if response.status == HTTP_INSUFFICIENT_STORAGE:
+ putter.failed = True
+ self.app.error_limit(putter.node,
+ _('ERROR Insufficient Storage'))
+ elif response.status >= HTTP_INTERNAL_SERVER_ERROR:
+ putter.failed = True
+ self.app.error_occurred(
+ putter.node,
+ _('ERROR %(status)d %(body)s From Object Server '
+ 're: %(path)s') %
+ {'status': response.status,
+ 'body': body[:1024], 'path': req.path})
+ elif is_success(response.status):
+ etags.add(response.getheader('etag').strip('"'))
+
+ quorum = False
+ for (putter, response) in pile:
+ if response:
+ _handle_response(putter, response)
+ if self._have_adequate_successes(statuses, min_responses):
+ break
+ else:
+ putter.failed = True
+
+ # give any pending requests *some* chance to finish
+ finished_quickly = pile.waitall(self.app.post_quorum_timeout)
+ for (putter, response) in finished_quickly:
+ if response:
+ _handle_response(putter, response)
+
+ if need_quorum:
+ if final_phase:
+ while len(statuses) < num_nodes:
+ statuses.append(HTTP_SERVICE_UNAVAILABLE)
+ reasons.append('')
+ bodies.append('')
+ else:
+ # intermediate response phase - set return value to true only
+ # if there are enough 100-continue acknowledgements
+ if self.have_quorum(statuses, num_nodes):
+ quorum = True
+
+ return statuses, reasons, bodies, etags, quorum
+
+ def _store_object(self, req, data_source, nodes, partition,
+ outgoing_headers):
+ """
+ Store an erasure coded object.
+ """
+ policy_index = int(req.headers.get('X-Backend-Storage-Policy-Index'))
+ policy = POLICIES.get_by_index(policy_index)
+ # Since the request body sent from client -> proxy is not
+ # the same as the request body sent proxy -> object, we
+ # can't rely on the object-server to do the etag checking -
+ # so we have to do it here.
+ etag_hasher = md5()
+
+ min_conns = policy.quorum
+ putters = self._get_put_connections(
+ req, nodes, partition, outgoing_headers,
+ policy, expect=True)
+
+ try:
+ # check that a minimum number of connections were established and
+ # meet all the correct conditions set in the request
+ self._check_failure_put_connections(putters, req, nodes, min_conns)
+
+ self._transfer_data(req, policy, data_source, putters,
+ nodes, min_conns, etag_hasher)
+ final_phase = True
+ need_quorum = False
+ min_resp = 2
+ putters = [p for p in putters if not p.failed]
+ # ignore response etags, and quorum boolean
+ statuses, reasons, bodies, _etags, _quorum = \
+ self._get_put_responses(req, putters, len(nodes),
+ final_phase, min_resp,
+ need_quorum=need_quorum)
+ except HTTPException as resp:
+ return resp
+
+ etag = etag_hasher.hexdigest()
+ resp = self.best_response(req, statuses, reasons, bodies,
+ _('Object PUT'), etag=etag,
+ quorum_size=min_conns)
+ resp.last_modified = math.ceil(
+ float(Timestamp(req.headers['X-Timestamp'])))
+ return resp