Diffstat (limited to 'swift/proxy/controllers')
-rw-r--r-- | swift/proxy/controllers/__init__.py  |    4
-rw-r--r-- | swift/proxy/controllers/account.py   |    5
-rw-r--r-- | swift/proxy/controllers/base.py      |  171
-rw-r--r-- | swift/proxy/controllers/container.py |    3
-rw-r--r-- | swift/proxy/controllers/obj.py       | 1225
5 files changed, 1349 insertions, 59 deletions
diff --git a/swift/proxy/controllers/__init__.py b/swift/proxy/controllers/__init__.py index de4c0145b..706fd9165 100644 --- a/swift/proxy/controllers/__init__.py +++ b/swift/proxy/controllers/__init__.py @@ -13,7 +13,7 @@ from swift.proxy.controllers.base import Controller from swift.proxy.controllers.info import InfoController -from swift.proxy.controllers.obj import ObjectController +from swift.proxy.controllers.obj import ObjectControllerRouter from swift.proxy.controllers.account import AccountController from swift.proxy.controllers.container import ContainerController @@ -22,5 +22,5 @@ __all__ = [ 'ContainerController', 'Controller', 'InfoController', - 'ObjectController', + 'ObjectControllerRouter', ] diff --git a/swift/proxy/controllers/account.py b/swift/proxy/controllers/account.py index ea2f8ae33..915e1c481 100644 --- a/swift/proxy/controllers/account.py +++ b/swift/proxy/controllers/account.py @@ -58,9 +58,10 @@ class AccountController(Controller): constraints.MAX_ACCOUNT_NAME_LENGTH) return resp - partition, nodes = self.app.account_ring.get_nodes(self.account_name) + partition = self.app.account_ring.get_part(self.account_name) + node_iter = self.app.iter_nodes(self.app.account_ring, partition) resp = self.GETorHEAD_base( - req, _('Account'), self.app.account_ring, partition, + req, _('Account'), node_iter, partition, req.swift_entity_path.rstrip('/')) if resp.status_int == HTTP_NOT_FOUND: if resp.headers.get('X-Account-Status', '').lower() == 'deleted': diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 0aeb803f1..ca12d343e 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -28,6 +28,7 @@ import os import time import functools import inspect +import logging import operator from sys import exc_info from swift import gettext_ as _ @@ -39,14 +40,14 @@ from eventlet.timeout import Timeout from swift.common.wsgi import make_pre_authed_env from swift.common.utils import Timestamp, config_true_value, \ public, split_path, list_from_csv, GreenthreadSafeIterator, \ - quorum_size, GreenAsyncPile + GreenAsyncPile, quorum_size, parse_content_range from swift.common.bufferedhttp import http_connect from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \ ConnectionTimeout from swift.common.http import is_informational, is_success, is_redirection, \ is_server_error, HTTP_OK, HTTP_PARTIAL_CONTENT, HTTP_MULTIPLE_CHOICES, \ HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \ - HTTP_INSUFFICIENT_STORAGE, HTTP_UNAUTHORIZED + HTTP_INSUFFICIENT_STORAGE, HTTP_UNAUTHORIZED, HTTP_CONTINUE from swift.common.swob import Request, Response, HeaderKeyDict, Range, \ HTTPException, HTTPRequestedRangeNotSatisfiable from swift.common.request_helpers import strip_sys_meta_prefix, \ @@ -593,16 +594,37 @@ def close_swift_conn(src): pass +def bytes_to_skip(record_size, range_start): + """ + Assume an object is composed of N records, where the first N-1 are all + the same size and the last is at most that large, but may be smaller. + + When a range request is made, it might start with a partial record. This + must be discarded, lest the consumer get bad data. This is particularly + true of suffix-byte-range requests, e.g. "Range: bytes=-12345" where the + size of the object is unknown at the time the request is made. + + This function computes the number of bytes that must be discarded to + ensure only whole records are yielded. Erasure-code decoding needs this. 
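+
+    For example, with 1024-byte records, a range starting at byte 2500
+    must skip (1024 - (2500 % 1024)) % 1024 == 572 bytes, so that
+    yielding resumes at byte 3072, a record boundary.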
+ + This function could have been inlined, but it took enough tries to get + right that some targeted unit tests were desirable, hence its extraction. + """ + return (record_size - (range_start % record_size)) % record_size + + class GetOrHeadHandler(object): - def __init__(self, app, req, server_type, ring, partition, path, - backend_headers): + def __init__(self, app, req, server_type, node_iter, partition, path, + backend_headers, client_chunk_size=None): self.app = app - self.ring = ring + self.node_iter = node_iter self.server_type = server_type self.partition = partition self.path = path self.backend_headers = backend_headers + self.client_chunk_size = client_chunk_size + self.skip_bytes = 0 self.used_nodes = [] self.used_source_etag = '' @@ -649,6 +671,35 @@ class GetOrHeadHandler(object): else: self.backend_headers['Range'] = 'bytes=%d-' % num_bytes + def learn_size_from_content_range(self, start, end): + """ + If client_chunk_size is set, makes sure we yield things starting on + chunk boundaries based on the Content-Range header in the response. + + Sets our first Range header to the value learned from the + Content-Range header in the response; if we were given a + fully-specified range (e.g. "bytes=123-456"), this is a no-op. + + If we were given a half-specified range (e.g. "bytes=123-" or + "bytes=-456"), then this changes the Range header to a + semantically-equivalent one *and* it lets us resume on a proper + boundary instead of just in the middle of a piece somewhere. + + If the original request is for more than one range, this does not + affect our backend Range header, since we don't support resuming one + of those anyway. + """ + if self.client_chunk_size: + self.skip_bytes = bytes_to_skip(self.client_chunk_size, start) + + if 'Range' in self.backend_headers: + req_range = Range(self.backend_headers['Range']) + + if len(req_range.ranges) > 1: + return + + self.backend_headers['Range'] = "bytes=%d-%d" % (start, end) + def is_good_source(self, src): """ Indicates whether or not the request made to the backend found @@ -674,42 +725,74 @@ class GetOrHeadHandler(object): """ try: nchunks = 0 - bytes_read_from_source = 0 + client_chunk_size = self.client_chunk_size + bytes_consumed_from_backend = 0 node_timeout = self.app.node_timeout if self.server_type == 'Object': node_timeout = self.app.recoverable_node_timeout + buf = '' while True: try: with ChunkReadTimeout(node_timeout): chunk = source.read(self.app.object_chunk_size) nchunks += 1 - bytes_read_from_source += len(chunk) + buf += chunk except ChunkReadTimeout: exc_type, exc_value, exc_traceback = exc_info() if self.newest or self.server_type != 'Object': raise exc_type, exc_value, exc_traceback try: - self.fast_forward(bytes_read_from_source) + self.fast_forward(bytes_consumed_from_backend) except (NotImplementedError, HTTPException, ValueError): raise exc_type, exc_value, exc_traceback + buf = '' new_source, new_node = self._get_source_and_node() if new_source: self.app.exception_occurred( node, _('Object'), - _('Trying to read during GET (retrying)')) + _('Trying to read during GET (retrying)'), + level=logging.ERROR, exc_info=( + exc_type, exc_value, exc_traceback)) # Close-out the connection as best as possible. 
if getattr(source, 'swift_conn', None): close_swift_conn(source) source = new_source node = new_node - bytes_read_from_source = 0 continue else: raise exc_type, exc_value, exc_traceback + + if buf and self.skip_bytes: + if self.skip_bytes < len(buf): + buf = buf[self.skip_bytes:] + bytes_consumed_from_backend += self.skip_bytes + self.skip_bytes = 0 + else: + self.skip_bytes -= len(buf) + bytes_consumed_from_backend += len(buf) + buf = '' + if not chunk: + if buf: + with ChunkWriteTimeout(self.app.client_timeout): + bytes_consumed_from_backend += len(buf) + yield buf + buf = '' break - with ChunkWriteTimeout(self.app.client_timeout): - yield chunk + + if client_chunk_size is not None: + while len(buf) >= client_chunk_size: + client_chunk = buf[:client_chunk_size] + buf = buf[client_chunk_size:] + with ChunkWriteTimeout(self.app.client_timeout): + yield client_chunk + bytes_consumed_from_backend += len(client_chunk) + else: + with ChunkWriteTimeout(self.app.client_timeout): + yield buf + bytes_consumed_from_backend += len(buf) + buf = '' + # This is for fairness; if the network is outpacing the CPU, # we'll always be able to read and write data without # encountering an EWOULDBLOCK, and so eventlet will not switch @@ -757,7 +840,7 @@ class GetOrHeadHandler(object): node_timeout = self.app.node_timeout if self.server_type == 'Object' and not self.newest: node_timeout = self.app.recoverable_node_timeout - for node in self.app.iter_nodes(self.ring, self.partition): + for node in self.node_iter: if node in self.used_nodes: continue start_node_timing = time.time() @@ -793,8 +876,10 @@ class GetOrHeadHandler(object): src_headers = dict( (k.lower(), v) for k, v in possible_source.getheaders()) - if src_headers.get('etag', '').strip('"') != \ - self.used_source_etag: + + if self.used_source_etag != src_headers.get( + 'x-object-sysmeta-ec-etag', + src_headers.get('etag', '')).strip('"'): self.statuses.append(HTTP_NOT_FOUND) self.reasons.append('') self.bodies.append('') @@ -832,7 +917,9 @@ class GetOrHeadHandler(object): src_headers = dict( (k.lower(), v) for k, v in possible_source.getheaders()) - self.used_source_etag = src_headers.get('etag', '').strip('"') + self.used_source_etag = src_headers.get( + 'x-object-sysmeta-ec-etag', + src_headers.get('etag', '')).strip('"') return source, node return None, None @@ -841,13 +928,17 @@ class GetOrHeadHandler(object): res = None if source: res = Response(request=req) + res.status = source.status + update_headers(res, source.getheaders()) if req.method == 'GET' and \ source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT): + cr = res.headers.get('Content-Range') + if cr: + start, end, total = parse_content_range(cr) + self.learn_size_from_content_range(start, end) res.app_iter = self._make_app_iter(req, node, source) # See NOTE: swift_conn at top of file about this. 
res.swift_conn = source.swift_conn - res.status = source.status - update_headers(res, source.getheaders()) if not res.environ: res.environ = {} res.environ['swift_x_timestamp'] = \ @@ -993,7 +1084,8 @@ class Controller(object): else: info['partition'] = part info['nodes'] = nodes - info.setdefault('storage_policy', '0') + if info.get('storage_policy') is None: + info['storage_policy'] = 0 return info def _make_request(self, nodes, part, method, path, headers, query, @@ -1098,6 +1190,13 @@ class Controller(object): '%s %s' % (self.server_type, req.method), overrides=overrides, headers=resp_headers) + def _quorum_size(self, n): + """ + Number of successful backend responses needed for the proxy to + consider the client request successful. + """ + return quorum_size(n) + def have_quorum(self, statuses, node_count): """ Given a list of statuses from several requests, determine if @@ -1107,16 +1206,18 @@ class Controller(object): :param node_count: number of nodes being queried (basically ring count) :returns: True or False, depending on if quorum is established """ - quorum = quorum_size(node_count) + quorum = self._quorum_size(node_count) if len(statuses) >= quorum: - for hundred in (HTTP_OK, HTTP_MULTIPLE_CHOICES, HTTP_BAD_REQUEST): + for hundred in (HTTP_CONTINUE, HTTP_OK, HTTP_MULTIPLE_CHOICES, + HTTP_BAD_REQUEST): if sum(1 for s in statuses if hundred <= s < hundred + 100) >= quorum: return True return False def best_response(self, req, statuses, reasons, bodies, server_type, - etag=None, headers=None, overrides=None): + etag=None, headers=None, overrides=None, + quorum_size=None): """ Given a list of responses from several servers, choose the best to return to the API. @@ -1128,10 +1229,16 @@ class Controller(object): :param server_type: type of server the responses came from :param etag: etag :param headers: headers of each response + :param overrides: overrides to apply when lacking quorum + :param quorum_size: quorum size to use :returns: swob.Response object with the correct status, body, etc. 
set """ + if quorum_size is None: + quorum_size = self._quorum_size(len(statuses)) + resp = self._compute_quorum_response( - req, statuses, reasons, bodies, etag, headers) + req, statuses, reasons, bodies, etag, headers, + quorum_size=quorum_size) if overrides and not resp: faked_up_status_indices = set() transformed = [] @@ -1145,7 +1252,8 @@ class Controller(object): statuses, reasons, headers, bodies = zip(*transformed) resp = self._compute_quorum_response( req, statuses, reasons, bodies, etag, headers, - indices_to_avoid=faked_up_status_indices) + indices_to_avoid=faked_up_status_indices, + quorum_size=quorum_size) if not resp: resp = Response(request=req) @@ -1156,14 +1264,14 @@ class Controller(object): return resp def _compute_quorum_response(self, req, statuses, reasons, bodies, etag, - headers, indices_to_avoid=()): + headers, quorum_size, indices_to_avoid=()): if not statuses: return None for hundred in (HTTP_OK, HTTP_MULTIPLE_CHOICES, HTTP_BAD_REQUEST): hstatuses = \ [(i, s) for i, s in enumerate(statuses) if hundred <= s < hundred + 100] - if len(hstatuses) >= quorum_size(len(statuses)): + if len(hstatuses) >= quorum_size: resp = Response(request=req) try: status_index, status = max( @@ -1228,22 +1336,25 @@ class Controller(object): else: self.app.logger.warning('Could not autocreate account %r' % path) - def GETorHEAD_base(self, req, server_type, ring, partition, path): + def GETorHEAD_base(self, req, server_type, node_iter, partition, path, + client_chunk_size=None): """ Base handler for HTTP GET or HEAD requests. :param req: swob.Request object :param server_type: server type used in logging - :param ring: the ring to obtain nodes from + :param node_iter: an iterator to obtain nodes from :param partition: partition :param path: path for the request + :param client_chunk_size: chunk size for response body iterator :returns: swob.Response object """ backend_headers = self.generate_request_headers( req, additional=req.headers) - handler = GetOrHeadHandler(self.app, req, self.server_type, ring, - partition, path, backend_headers) + handler = GetOrHeadHandler(self.app, req, self.server_type, node_iter, + partition, path, backend_headers, + client_chunk_size=client_chunk_size) res = handler.get_working_response(req) if not res: diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py index fb422e68d..3e4a2bb03 100644 --- a/swift/proxy/controllers/container.py +++ b/swift/proxy/controllers/container.py @@ -93,8 +93,9 @@ class ContainerController(Controller): return HTTPNotFound(request=req) part = self.app.container_ring.get_part( self.account_name, self.container_name) + node_iter = self.app.iter_nodes(self.app.container_ring, part) resp = self.GETorHEAD_base( - req, _('Container'), self.app.container_ring, part, + req, _('Container'), node_iter, part, req.swift_entity_path) if 'swift.authorize' in req.environ: req.acl = resp.headers.get('x-container-read') diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 2b53ba7a8..a83242b5f 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -24,13 +24,17 @@ # These shenanigans are to ensure all related objects can be garbage # collected. We've seen objects hang around forever otherwise. 
+import collections import itertools import mimetypes import time import math +import random +from hashlib import md5 from swift import gettext_ as _ from urllib import unquote, quote +from greenlet import GreenletExit from eventlet import GreenPile from eventlet.queue import Queue from eventlet.timeout import Timeout @@ -38,7 +42,8 @@ from eventlet.timeout import Timeout from swift.common.utils import ( clean_content_type, config_true_value, ContextPool, csv_append, GreenAsyncPile, GreenthreadSafeIterator, json, Timestamp, - normalize_delete_at_timestamp, public, quorum_size, get_expirer_container) + normalize_delete_at_timestamp, public, get_expirer_container, + quorum_size) from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_metadata, check_object_creation, \ check_copy_from_header, check_destination_header, \ @@ -46,21 +51,24 @@ from swift.common.constraints import check_metadata, check_object_creation, \ from swift.common import constraints from swift.common.exceptions import ChunkReadTimeout, \ ChunkWriteTimeout, ConnectionTimeout, ListingIterNotFound, \ - ListingIterNotAuthorized, ListingIterError + ListingIterNotAuthorized, ListingIterError, ResponseTimeout, \ + InsufficientStorage, FooterNotSupported, MultiphasePUTNotSupported, \ + PutterConnectError from swift.common.http import ( is_success, is_client_error, is_server_error, HTTP_CONTINUE, HTTP_CREATED, HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, HTTP_INTERNAL_SERVER_ERROR, HTTP_SERVICE_UNAVAILABLE, HTTP_INSUFFICIENT_STORAGE, - HTTP_PRECONDITION_FAILED, HTTP_CONFLICT) -from swift.common.storage_policy import POLICIES + HTTP_PRECONDITION_FAILED, HTTP_CONFLICT, is_informational) +from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY, + ECDriverError, PolicyError) from swift.proxy.controllers.base import Controller, delay_denial, \ cors_validation from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \ - HTTPServerError, HTTPServiceUnavailable, Request, \ - HTTPClientDisconnect, HeaderKeyDict, HTTPException + HTTPServerError, HTTPServiceUnavailable, Request, HeaderKeyDict, \ + HTTPClientDisconnect, HTTPUnprocessableEntity, Response, HTTPException from swift.common.request_helpers import is_sys_or_user_meta, is_sys_meta, \ - remove_items, copy_header_subset + remove_items, copy_header_subset, close_if_possible def copy_headers_into(from_r, to_r): @@ -85,8 +93,41 @@ def check_content_type(req): return None -class ObjectController(Controller): - """WSGI controller for object requests.""" +class ObjectControllerRouter(object): + + policy_type_to_controller_map = {} + + @classmethod + def register(cls, policy_type): + """ + Decorator for Storage Policy implemenations to register + their ObjectController implementations. + + This also fills in a policy_type attribute on the class. 
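+
+        For example, the two controllers later in this patch register
+        themselves with::
+
+            @ObjectControllerRouter.register(REPL_POLICY)
+            class ReplicatedObjectController(BaseObjectController):
+                ...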
+ """ + def register_wrapper(controller_cls): + if policy_type in cls.policy_type_to_controller_map: + raise PolicyError( + '%r is already registered for the policy_type %r' % ( + cls.policy_type_to_controller_map[policy_type], + policy_type)) + cls.policy_type_to_controller_map[policy_type] = controller_cls + controller_cls.policy_type = policy_type + return controller_cls + return register_wrapper + + def __init__(self): + self.policy_to_controller_cls = {} + for policy in POLICIES: + self.policy_to_controller_cls[policy] = \ + self.policy_type_to_controller_map[policy.policy_type] + + def __getitem__(self, policy): + return self.policy_to_controller_cls[policy] + + +class BaseObjectController(Controller): + """Base WSGI controller for object requests.""" server_type = 'Object' def __init__(self, app, account_name, container_name, object_name, @@ -114,8 +155,10 @@ class ObjectController(Controller): lreq.environ['QUERY_STRING'] = \ 'format=json&prefix=%s&marker=%s' % (quote(lprefix), quote(marker)) + container_node_iter = self.app.iter_nodes(self.app.container_ring, + lpartition) lresp = self.GETorHEAD_base( - lreq, _('Container'), self.app.container_ring, lpartition, + lreq, _('Container'), container_node_iter, lpartition, lreq.swift_entity_path) if 'swift.authorize' in env: lreq.acl = lresp.headers.get('x-container-read') @@ -180,6 +223,7 @@ class ObjectController(Controller): # pass the policy index to storage nodes via req header policy_index = req.headers.get('X-Backend-Storage-Policy-Index', container_info['storage_policy']) + policy = POLICIES.get_by_index(policy_index) obj_ring = self.app.get_object_ring(policy_index) req.headers['X-Backend-Storage-Policy-Index'] = policy_index if 'swift.authorize' in req.environ: @@ -188,9 +232,10 @@ class ObjectController(Controller): return aresp partition = obj_ring.get_part( self.account_name, self.container_name, self.object_name) - resp = self.GETorHEAD_base( - req, _('Object'), obj_ring, partition, - req.swift_entity_path) + node_iter = self.app.iter_nodes(obj_ring, partition) + + resp = self._reroute(policy)._get_or_head_response( + req, node_iter, partition, policy) if ';' in resp.headers.get('content-type', ''): resp.content_type = clean_content_type( @@ -383,7 +428,10 @@ class ObjectController(Controller): _('Trying to get final status of PUT to %s') % req.path) return (None, None) - def _get_put_responses(self, req, conns, nodes): + def _get_put_responses(self, req, conns, nodes, **kwargs): + """ + Collect replicated object responses. 
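+
+        Gathers the status, reason, body and headers of each backend
+        response; the PUT path feeds these to best_response() to pick
+        what the client ultimately sees.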
+ """ statuses = [] reasons = [] bodies = [] @@ -488,6 +536,7 @@ class ObjectController(Controller): self.object_name = src_obj_name self.container_name = src_container_name self.account_name = src_account_name + source_resp = self.GET(source_req) # This gives middlewares a way to change the source; for example, @@ -589,8 +638,9 @@ class ObjectController(Controller): 'X-Newest': 'True'} hreq = Request.blank(req.path_info, headers=_headers, environ={'REQUEST_METHOD': 'HEAD'}) + hnode_iter = self.app.iter_nodes(obj_ring, partition) hresp = self.GETorHEAD_base( - hreq, _('Object'), obj_ring, partition, + hreq, _('Object'), hnode_iter, partition, hreq.swift_entity_path) is_manifest = 'X-Object-Manifest' in req.headers or \ @@ -654,7 +704,10 @@ class ObjectController(Controller): req.headers['X-Timestamp'] = Timestamp(time.time()).internal return None - def _check_failure_put_connections(self, conns, req, nodes): + def _check_failure_put_connections(self, conns, req, nodes, min_conns): + """ + Identify any failed connections and check minimum connection count. + """ if req.if_none_match is not None and '*' in req.if_none_match: statuses = [conn.resp.status for conn in conns if conn.resp] if HTTP_PRECONDITION_FAILED in statuses: @@ -675,7 +728,6 @@ class ObjectController(Controller): 'timestamps': ', '.join(timestamps)}) raise HTTPAccepted(request=req) - min_conns = quorum_size(len(nodes)) self._check_min_conn(req, conns, min_conns) def _get_put_connections(self, req, nodes, partition, outgoing_headers, @@ -709,8 +761,12 @@ class ObjectController(Controller): raise HTTPServiceUnavailable(request=req) def _transfer_data(self, req, data_source, conns, nodes): - min_conns = quorum_size(len(nodes)) + """ + Transfer data for a replicated object. + This method was added in the PUT method extraction change + """ + min_conns = quorum_size(len(nodes)) bytes_transferred = 0 try: with ContextPool(len(nodes)) as pool: @@ -775,11 +831,11 @@ class ObjectController(Controller): This method is responsible for establishing connection with storage nodes and sending object to each one of those - nodes. After sending the data, the "best" reponse will be + nodes. After sending the data, the "best" response will be returned based on statuses from all connections """ - policy_idx = req.headers.get('X-Backend-Storage-Policy-Index') - policy = POLICIES.get_by_index(policy_idx) + policy_index = req.headers.get('X-Backend-Storage-Policy-Index') + policy = POLICIES.get_by_index(policy_index) if not nodes: return HTTPNotFound() @@ -790,11 +846,11 @@ class ObjectController(Controller): expect = False conns = self._get_put_connections(req, nodes, partition, outgoing_headers, policy, expect) - + min_conns = quorum_size(len(nodes)) try: # check that a minimum number of connections were established and # meet all the correct conditions set in the request - self._check_failure_put_connections(conns, req, nodes) + self._check_failure_put_connections(conns, req, nodes, min_conns) # transfer data self._transfer_data(req, data_source, conns, nodes) @@ -1015,6 +1071,21 @@ class ObjectController(Controller): headers, overrides=status_overrides) return resp + def _reroute(self, policy): + """ + For COPY requests we need to make sure the controller instance the + request is routed through is the correct type for the policy. 
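+
+        For example, a COPY whose destination container uses an EC
+        storage policy must be handled by an ECObjectController, even
+        if the request first arrived at a ReplicatedObjectController.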
+ """ + if not policy: + raise HTTPServiceUnavailable('Unknown Storage Policy') + if policy.policy_type != self.policy_type: + controller = self.app.obj_controller_router[policy]( + self.app, self.account_name, self.container_name, + self.object_name) + else: + controller = self + return controller + @public @cors_validation @delay_denial @@ -1031,6 +1102,7 @@ class ObjectController(Controller): self.account_name = dest_account del req.headers['Destination-Account'] dest_container, dest_object = check_destination_header(req) + source = '/%s/%s' % (self.container_name, self.object_name) self.container_name = dest_container self.object_name = dest_object @@ -1042,4 +1114,1109 @@ class ObjectController(Controller): req.headers['Content-Length'] = 0 req.headers['X-Copy-From'] = quote(source) del req.headers['Destination'] - return self.PUT(req) + + container_info = self.container_info( + dest_account, dest_container, req) + dest_policy = POLICIES.get_by_index(container_info['storage_policy']) + + return self._reroute(dest_policy).PUT(req) + + +@ObjectControllerRouter.register(REPL_POLICY) +class ReplicatedObjectController(BaseObjectController): + + def _get_or_head_response(self, req, node_iter, partition, policy): + resp = self.GETorHEAD_base( + req, _('Object'), node_iter, partition, + req.swift_entity_path) + return resp + + +class ECAppIter(object): + """ + WSGI iterable that decodes EC fragment archives (or portions thereof) + into the original object (or portions thereof). + + :param path: path for the request + + :param policy: storage policy for this object + + :param internal_app_iters: list of the WSGI iterables from object server + GET responses for fragment archives. For an M+K erasure code, the + caller must supply M such iterables. + + :param range_specs: list of dictionaries describing the ranges requested + by the client. Each dictionary contains the start and end of the + client's requested byte range as well as the start and end of the EC + segments containing that byte range. + + :param obj_length: length of the object, in bytes. Learned from the + headers in the GET response from the object server. + + :param logger: a logger + """ + def __init__(self, path, policy, internal_app_iters, range_specs, + obj_length, logger): + self.path = path + self.policy = policy + self.internal_app_iters = internal_app_iters + self.range_specs = range_specs + self.obj_length = obj_length + self.boundary = '' + self.logger = logger + + def close(self): + for it in self.internal_app_iters: + close_if_possible(it) + + def __iter__(self): + segments_iter = self.decode_segments_from_fragments() + + if len(self.range_specs) == 0: + # plain GET; just yield up segments + for seg in segments_iter: + yield seg + return + + if len(self.range_specs) > 1: + raise NotImplementedError("multi-range GETs not done yet") + + for range_spec in self.range_specs: + client_start = range_spec['client_start'] + client_end = range_spec['client_end'] + segment_start = range_spec['segment_start'] + segment_end = range_spec['segment_end'] + + seg_size = self.policy.ec_segment_size + is_suffix = client_start is None + + if is_suffix: + # Suffix byte ranges (i.e. requests for the last N bytes of + # an object) are likely to end up not on a segment boundary. 
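+                # For example, the last 50 bytes of a 340-byte object
+                # with 100-byte segments: the client wants bytes
+                # 290-339, but whole segments must be decoded, so
+                # decoding starts at byte 200 and bytes 200-289 are
+                # trimmed off below.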
+ client_range_len = client_end + client_start = max(self.obj_length - client_range_len, 0) + client_end = self.obj_length - 1 + + # may be mid-segment; if it is, then everything up to the + # first segment boundary is garbage, and is discarded before + # ever getting into this function. + unaligned_segment_start = max(self.obj_length - segment_end, 0) + alignment_offset = ( + (seg_size - (unaligned_segment_start % seg_size)) + % seg_size) + segment_start = unaligned_segment_start + alignment_offset + segment_end = self.obj_length - 1 + else: + # It's entirely possible that the client asked for a range that + # includes some bytes we have and some we don't; for example, a + # range of bytes 1000-20000000 on a 1500-byte object. + segment_end = (min(segment_end, self.obj_length - 1) + if segment_end is not None + else self.obj_length - 1) + client_end = (min(client_end, self.obj_length - 1) + if client_end is not None + else self.obj_length - 1) + + num_segments = int( + math.ceil(float(segment_end + 1 - segment_start) + / self.policy.ec_segment_size)) + # We get full segments here, but the client may have requested a + # byte range that begins or ends in the middle of a segment. + # Thus, we have some amount of overrun (extra decoded bytes) + # that we trim off so the client gets exactly what they + # requested. + start_overrun = client_start - segment_start + end_overrun = segment_end - client_end + + for i, next_seg in enumerate(segments_iter): + # We may have a start_overrun of more than one segment in + # the case of suffix-byte-range requests. However, we never + # have an end_overrun of more than one segment. + if start_overrun > 0: + seglen = len(next_seg) + if seglen <= start_overrun: + start_overrun -= seglen + continue + else: + next_seg = next_seg[start_overrun:] + start_overrun = 0 + + if i == (num_segments - 1) and end_overrun: + next_seg = next_seg[:-end_overrun] + + yield next_seg + + def decode_segments_from_fragments(self): + # Decodes the fragments from the object servers and yields one + # segment at a time. + queues = [Queue(1) for _junk in range(len(self.internal_app_iters))] + + def put_fragments_in_queue(frag_iter, queue): + try: + for fragment in frag_iter: + if fragment[0] == ' ': + raise Exception('Leading whitespace on fragment.') + queue.put(fragment) + except GreenletExit: + # killed by contextpool + pass + except ChunkReadTimeout: + # unable to resume in GetOrHeadHandler + pass + except: # noqa + self.logger.exception("Exception fetching fragments for %r" % + self.path) + finally: + queue.resize(2) # ensure there's room + queue.put(None) + + with ContextPool(len(self.internal_app_iters)) as pool: + for app_iter, queue in zip( + self.internal_app_iters, queues): + pool.spawn(put_fragments_in_queue, app_iter, queue) + + while True: + fragments = [] + for qi, queue in enumerate(queues): + fragment = queue.get() + queue.task_done() + fragments.append(fragment) + + # If any object server connection yields out a None; we're + # done. Either they are all None, and we've finished + # successfully; or some un-recoverable failure has left us + # with an un-reconstructible list of fragments - so we'll + # break out of the iter so WSGI can tear down the broken + # connection. 
+ if not all(fragments): + break + try: + segment = self.policy.pyeclib_driver.decode(fragments) + except ECDriverError: + self.logger.exception("Error decoding fragments for %r" % + self.path) + raise + + yield segment + + def app_iter_range(self, start, end): + return self + + def app_iter_ranges(self, content_type, boundary, content_size): + self.boundary = boundary + + +def client_range_to_segment_range(client_start, client_end, segment_size): + """ + Takes a byterange from the client and converts it into a byterange + spanning the necessary segments. + + Handles prefix, suffix, and fully-specified byte ranges. + + Examples: + client_range_to_segment_range(100, 700, 512) = (0, 1023) + client_range_to_segment_range(100, 700, 256) = (0, 767) + client_range_to_segment_range(300, None, 256) = (256, None) + + :param client_start: first byte of the range requested by the client + :param client_end: last byte of the range requested by the client + :param segment_size: size of an EC segment, in bytes + + :returns: a 2-tuple (seg_start, seg_end) where + + * seg_start is the first byte of the first segment, or None if this is + a suffix byte range + + * seg_end is the last byte of the last segment, or None if this is a + prefix byte range + """ + # the index of the first byte of the first segment + segment_start = ( + int(client_start // segment_size) + * segment_size) if client_start is not None else None + # the index of the last byte of the last segment + segment_end = ( + # bytes M- + None if client_end is None else + # bytes M-N + (((int(client_end // segment_size) + 1) + * segment_size) - 1) if client_start is not None else + # bytes -N: we get some extra bytes to make sure we + # have all we need. + # + # To see why, imagine a 100-byte segment size, a + # 340-byte object, and a request for the last 50 + # bytes. Naively requesting the last 100 bytes would + # result in a truncated first segment and hence a + # truncated download. (Of course, the actual + # obj-server requests are for fragments, not + # segments, but that doesn't change the + # calculation.) + # + # This does mean that we fetch an extra segment if + # the object size is an exact multiple of the + # segment size. It's a little wasteful, but it's + # better to be a little wasteful than to get some + # range requests completely wrong. + (int(math.ceil(( + float(client_end) / segment_size) + 1)) # nsegs + * segment_size)) + return (segment_start, segment_end) + + +def segment_range_to_fragment_range(segment_start, segment_end, segment_size, + fragment_size): + """ + Takes a byterange spanning some segments and converts that into a + byterange spanning the corresponding fragments within their fragment + archives. + + Handles prefix, suffix, and fully-specified byte ranges. + + :param segment_start: first byte of the first segment + :param segment_end: last byte of the last segment + :param segment_size: size of an EC segment, in bytes + :param fragment_size: size of an EC fragment, in bytes + + :returns: a 2-tuple (frag_start, frag_end) where + + * frag_start is the first byte of the first fragment, or None if this + is a suffix byte range + + * frag_end is the last byte of the last fragment, or None if this is a + prefix byte range + """ + # Note: segment_start and (segment_end + 1) are + # multiples of segment_size, so we don't have to worry + # about integer math giving us rounding troubles. 
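+    # For example, with segment_size=100 and fragment_size=40, the
+    # segment range (200, 499) maps to the fragment range (80, 199):
+    # three whole segments become three whole fragments.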
+ # + # There's a whole bunch of +1 and -1 in here; that's because HTTP wants + # byteranges to be inclusive of the start and end, so e.g. bytes 200-300 + # is a range containing 101 bytes. Python has half-inclusive ranges, of + # course, so we have to convert back and forth. We try to keep things in + # HTTP-style byteranges for consistency. + + # the index of the first byte of the first fragment + fragment_start = (( + segment_start / segment_size * fragment_size) + if segment_start is not None else None) + # the index of the last byte of the last fragment + fragment_end = ( + # range unbounded on the right + None if segment_end is None else + # range unbounded on the left; no -1 since we're + # asking for the last N bytes, not to have a + # particular byte be the last one + ((segment_end + 1) / segment_size + * fragment_size) if segment_start is None else + # range bounded on both sides; the -1 is because the + # rest of the expression computes the length of the + # fragment, and a range of N bytes starts at index M + # and ends at M + N - 1. + ((segment_end + 1) / segment_size * fragment_size) - 1) + return (fragment_start, fragment_end) + + +NO_DATA_SENT = 1 +SENDING_DATA = 2 +DATA_SENT = 3 +DATA_ACKED = 4 +COMMIT_SENT = 5 + + +class ECPutter(object): + """ + This is here mostly to wrap up the fact that all EC PUTs are + chunked because of the mime boundary footer trick and the first + half of the two-phase PUT conversation handling. + + An HTTP PUT request that supports streaming. + + Probably deserves more docs than this, but meh. + """ + def __init__(self, conn, node, resp, path, connect_duration, + mime_boundary): + # Note: you probably want to call Putter.connect() instead of + # instantiating one of these directly. + self.conn = conn + self.node = node + self.resp = resp + self.path = path + self.connect_duration = connect_duration + # for handoff nodes node_index is None + self.node_index = node.get('index') + self.mime_boundary = mime_boundary + self.chunk_hasher = md5() + + self.failed = False + self.queue = None + self.state = NO_DATA_SENT + + def current_status(self): + """ + Returns the current status of the response. + + A response starts off with no current status, then may or may not have + a status of 100 for some time, and then ultimately has a final status + like 200, 404, et cetera. + """ + return self.resp.status + + def await_response(self, timeout, informational=False): + """ + Get 100-continue response indicating the end of 1st phase of a 2-phase + commit or the final response, i.e. the one with status >= 200. + + Might or might not actually wait for anything. If we said Expect: + 100-continue but got back a non-100 response, that'll be the thing + returned, and we won't do any network IO to get it. OTOH, if we got + a 100 Continue response and sent up the PUT request's body, then + we'll actually read the 2xx-5xx response off the network here. 
+ + :returns: HTTPResponse + :raises: Timeout if the response took too long + """ + conn = self.conn + with Timeout(timeout): + if not conn.resp: + if informational: + self.resp = conn.getexpect() + else: + self.resp = conn.getresponse() + return self.resp + + def spawn_sender_greenthread(self, pool, queue_depth, write_timeout, + exception_handler): + """Call before sending the first chunk of request body""" + self.queue = Queue(queue_depth) + pool.spawn(self._send_file, write_timeout, exception_handler) + + def wait(self): + if self.queue.unfinished_tasks: + self.queue.join() + + def _start_mime_doc_object_body(self): + self.queue.put("--%s\r\nX-Document: object body\r\n\r\n" % + (self.mime_boundary,)) + + def send_chunk(self, chunk): + if not chunk: + # If we're not using chunked transfer-encoding, sending a 0-byte + # chunk is just wasteful. If we *are* using chunked + # transfer-encoding, sending a 0-byte chunk terminates the + # request body. Neither one of these is good. + return + elif self.state == DATA_SENT: + raise ValueError("called send_chunk after end_of_object_data") + + if self.state == NO_DATA_SENT and self.mime_boundary: + # We're sending the object plus other stuff in the same request + # body, all wrapped up in multipart MIME, so we'd better start + # off the MIME document before sending any object data. + self._start_mime_doc_object_body() + self.state = SENDING_DATA + + self.queue.put(chunk) + + def end_of_object_data(self, footer_metadata): + """ + Call when there is no more data to send. + + :param footer_metadata: dictionary of metadata items + """ + if self.state == DATA_SENT: + raise ValueError("called end_of_object_data twice") + elif self.state == NO_DATA_SENT and self.mime_boundary: + self._start_mime_doc_object_body() + + footer_body = json.dumps(footer_metadata) + footer_md5 = md5(footer_body).hexdigest() + + tail_boundary = ("--%s" % (self.mime_boundary,)) + + message_parts = [ + ("\r\n--%s\r\n" % self.mime_boundary), + "X-Document: object metadata\r\n", + "Content-MD5: %s\r\n" % footer_md5, + "\r\n", + footer_body, "\r\n", + tail_boundary, "\r\n", + ] + self.queue.put("".join(message_parts)) + + self.queue.put('') + self.state = DATA_SENT + + def send_commit_confirmation(self): + """ + Call when there are > quorum 2XX responses received. Send commit + confirmations to all object nodes to finalize the PUT. + """ + if self.state == COMMIT_SENT: + raise ValueError("called send_commit_confirmation twice") + + self.state = DATA_ACKED + + if self.mime_boundary: + body = "put_commit_confirmation" + tail_boundary = ("--%s--" % (self.mime_boundary,)) + message_parts = [ + "X-Document: put commit\r\n", + "\r\n", + body, "\r\n", + tail_boundary, + ] + self.queue.put("".join(message_parts)) + + self.queue.put('') + self.state = COMMIT_SENT + + def _send_file(self, write_timeout, exception_handler): + """ + Method for a file PUT coro. Takes chunks from a queue and sends them + down a socket. + + If something goes wrong, the "failed" attribute will be set to true + and the exception handler will be called. 
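+
+        Each chunk is framed by hand ("%x\r\n%s\r\n") because the
+        connection was opened with chunked transfer-encoding.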
+ """ + while True: + chunk = self.queue.get() + if not self.failed: + to_send = "%x\r\n%s\r\n" % (len(chunk), chunk) + try: + with ChunkWriteTimeout(write_timeout): + self.conn.send(to_send) + except (Exception, ChunkWriteTimeout): + self.failed = True + exception_handler(self.conn.node, _('Object'), + _('Trying to write to %s') % self.path) + self.queue.task_done() + + @classmethod + def connect(cls, node, part, path, headers, conn_timeout, node_timeout, + chunked=False): + """ + Connect to a backend node and send the headers. + + :returns: Putter instance + + :raises: ConnectionTimeout if initial connection timed out + :raises: ResponseTimeout if header retrieval timed out + :raises: InsufficientStorage on 507 response from node + :raises: PutterConnectError on non-507 server error response from node + :raises: FooterNotSupported if need_metadata_footer is set but + backend node can't process footers + :raises: MultiphasePUTNotSupported if need_multiphase_support is + set but backend node can't handle multiphase PUT + """ + mime_boundary = "%.64x" % random.randint(0, 16 ** 64) + headers = HeaderKeyDict(headers) + # We're going to be adding some unknown amount of data to the + # request, so we can't use an explicit content length, and thus + # we must use chunked encoding. + headers['Transfer-Encoding'] = 'chunked' + headers['Expect'] = '100-continue' + if 'Content-Length' in headers: + headers['X-Backend-Obj-Content-Length'] = \ + headers.pop('Content-Length') + + headers['X-Backend-Obj-Multipart-Mime-Boundary'] = mime_boundary + + headers['X-Backend-Obj-Metadata-Footer'] = 'yes' + + headers['X-Backend-Obj-Multiphase-Commit'] = 'yes' + + start_time = time.time() + with ConnectionTimeout(conn_timeout): + conn = http_connect(node['ip'], node['port'], node['device'], + part, 'PUT', path, headers) + connect_duration = time.time() - start_time + + with ResponseTimeout(node_timeout): + resp = conn.getexpect() + + if resp.status == HTTP_INSUFFICIENT_STORAGE: + raise InsufficientStorage + + if is_server_error(resp.status): + raise PutterConnectError(resp.status) + + if is_informational(resp.status): + continue_headers = HeaderKeyDict(resp.getheaders()) + can_send_metadata_footer = config_true_value( + continue_headers.get('X-Obj-Metadata-Footer', 'no')) + can_handle_multiphase_put = config_true_value( + continue_headers.get('X-Obj-Multiphase-Commit', 'no')) + + if not can_send_metadata_footer: + raise FooterNotSupported() + + if not can_handle_multiphase_put: + raise MultiphasePUTNotSupported() + + conn.node = node + conn.resp = None + if is_success(resp.status) or resp.status == HTTP_CONFLICT: + conn.resp = resp + elif (headers.get('If-None-Match', None) is not None and + resp.status == HTTP_PRECONDITION_FAILED): + conn.resp = resp + + return cls(conn, node, resp, path, connect_duration, mime_boundary) + + +def chunk_transformer(policy, nstreams): + segment_size = policy.ec_segment_size + + buf = collections.deque() + total_buf_len = 0 + + chunk = yield + while chunk: + buf.append(chunk) + total_buf_len += len(chunk) + if total_buf_len >= segment_size: + chunks_to_encode = [] + # extract as many chunks as we can from the input buffer + while total_buf_len >= segment_size: + to_take = segment_size + pieces = [] + while to_take > 0: + piece = buf.popleft() + if len(piece) > to_take: + buf.appendleft(piece[to_take:]) + piece = piece[:to_take] + pieces.append(piece) + to_take -= len(piece) + total_buf_len -= len(piece) + chunks_to_encode.append(''.join(pieces)) + + frags_by_byte_order = [] + for 
chunk_to_encode in chunks_to_encode: + frags_by_byte_order.append( + policy.pyeclib_driver.encode(chunk_to_encode)) + # Sequential calls to encode() have given us a list that + # looks like this: + # + # [[frag_A1, frag_B1, frag_C1, ...], + # [frag_A2, frag_B2, frag_C2, ...], ...] + # + # What we need is a list like this: + # + # [(frag_A1 + frag_A2 + ...), # destined for node A + # (frag_B1 + frag_B2 + ...), # destined for node B + # (frag_C1 + frag_C2 + ...), # destined for node C + # ...] + obj_data = [''.join(frags) + for frags in zip(*frags_by_byte_order)] + chunk = yield obj_data + else: + # didn't have enough data to encode + chunk = yield None + + # Now we've gotten an empty chunk, which indicates end-of-input. + # Take any leftover bytes and encode them. + last_bytes = ''.join(buf) + if last_bytes: + last_frags = policy.pyeclib_driver.encode(last_bytes) + yield last_frags + else: + yield [''] * nstreams + + +def trailing_metadata(policy, client_obj_hasher, + bytes_transferred_from_client, + fragment_archive_index): + return { + # etag and size values are being added twice here. + # The container override header is used to update the container db + # with these values as they represent the correct etag and size for + # the whole object and not just the FA. + # The object sysmeta headers will be saved on each FA of the object. + 'X-Object-Sysmeta-EC-Etag': client_obj_hasher.hexdigest(), + 'X-Object-Sysmeta-EC-Content-Length': + str(bytes_transferred_from_client), + 'X-Backend-Container-Update-Override-Etag': + client_obj_hasher.hexdigest(), + 'X-Backend-Container-Update-Override-Size': + str(bytes_transferred_from_client), + 'X-Object-Sysmeta-Ec-Frag-Index': str(fragment_archive_index), + # These fields are for debuggability, + # AKA "what is this thing?" + 'X-Object-Sysmeta-EC-Scheme': policy.ec_scheme_description, + 'X-Object-Sysmeta-EC-Segment-Size': str(policy.ec_segment_size), + } + + +@ObjectControllerRouter.register(EC_POLICY) +class ECObjectController(BaseObjectController): + + def _get_or_head_response(self, req, node_iter, partition, policy): + req.headers.setdefault("X-Backend-Etag-Is-At", + "X-Object-Sysmeta-Ec-Etag") + + if req.method == 'HEAD': + # no fancy EC decoding here, just one plain old HEAD request to + # one object server because all fragments hold all metadata + # information about the object. + resp = self.GETorHEAD_base( + req, _('Object'), node_iter, partition, + req.swift_entity_path) + else: # GET request + orig_range = None + range_specs = [] + if req.range: + orig_range = req.range + # Since segments and fragments have different sizes, we need + # to modify the Range header sent to the object servers to + # make sure we get the right fragments out of the fragment + # archives. 
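+                # For example, with a 100-byte segment size and 40-byte
+                # fragments, a client asking for bytes=150-249 spans
+                # segments 100-299, so each object server is asked for
+                # bytes=40-119 of its fragment archive.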
+ segment_size = policy.ec_segment_size + fragment_size = policy.fragment_size + + range_specs = [] + new_ranges = [] + for client_start, client_end in req.range.ranges: + + segment_start, segment_end = client_range_to_segment_range( + client_start, client_end, segment_size) + + fragment_start, fragment_end = \ + segment_range_to_fragment_range( + segment_start, segment_end, + segment_size, fragment_size) + + new_ranges.append((fragment_start, fragment_end)) + range_specs.append({'client_start': client_start, + 'client_end': client_end, + 'segment_start': segment_start, + 'segment_end': segment_end}) + + req.range = "bytes=" + ",".join( + "%s-%s" % (s if s is not None else "", + e if e is not None else "") + for s, e in new_ranges) + + node_iter = GreenthreadSafeIterator(node_iter) + num_gets = policy.ec_ndata + with ContextPool(num_gets) as pool: + pile = GreenAsyncPile(pool) + for _junk in range(num_gets): + pile.spawn(self.GETorHEAD_base, + req, 'Object', node_iter, partition, + req.swift_entity_path, + client_chunk_size=policy.fragment_size) + + responses = list(pile) + good_responses = [] + bad_responses = [] + for response in responses: + if is_success(response.status_int): + good_responses.append(response) + else: + bad_responses.append(response) + + req.range = orig_range + if len(good_responses) == num_gets: + # If these aren't all for the same object, then error out so + # at least the client doesn't get garbage. We can do a lot + # better here with more work, but this'll work for now. + found_obj_etags = set( + resp.headers['X-Object-Sysmeta-Ec-Etag'] + for resp in good_responses) + if len(found_obj_etags) > 1: + self.app.logger.debug( + "Returning 503 for %s; found too many etags (%s)", + req.path, + ", ".join(found_obj_etags)) + return HTTPServiceUnavailable(request=req) + + # we found enough pieces to decode the object, so now let's + # decode the object + resp_headers = HeaderKeyDict(good_responses[0].headers.items()) + resp_headers.pop('Content-Range', None) + eccl = resp_headers.get('X-Object-Sysmeta-Ec-Content-Length') + obj_length = int(eccl) if eccl is not None else None + + resp = Response( + request=req, + headers=resp_headers, + conditional_response=True, + app_iter=ECAppIter( + req.swift_entity_path, + policy, + [r.app_iter for r in good_responses], + range_specs, + obj_length, + logger=self.app.logger)) + else: + resp = self.best_response( + req, + [r.status_int for r in bad_responses], + [r.status.split(' ', 1)[1] for r in bad_responses], + [r.body for r in bad_responses], + 'Object', + headers=[r.headers for r in bad_responses]) + + self._fix_response_headers(resp) + return resp + + def _fix_response_headers(self, resp): + # EC fragment archives each have different bytes, hence different + # etags. However, they all have the original object's etag stored in + # sysmeta, so we copy that here so the client gets it. + resp.headers['Etag'] = resp.headers.get( + 'X-Object-Sysmeta-Ec-Etag') + resp.headers['Content-Length'] = resp.headers.get( + 'X-Object-Sysmeta-Ec-Content-Length') + + return resp + + def _connect_put_node(self, node_iter, part, path, headers, + logger_thread_locals): + """ + Make a connection for a erasure encoded object. + + Connects to the first working node that it finds in node_iter and sends + over the request headers. Returns a Putter to handle the rest of the + streaming, or None if no working nodes were found. 
+ """ + # the object server will get different bytes, so these + # values do not apply (Content-Length might, in general, but + # in the specific case of replication vs. EC, it doesn't). + headers.pop('Content-Length', None) + headers.pop('Etag', None) + + self.app.logger.thread_locals = logger_thread_locals + for node in node_iter: + try: + putter = ECPutter.connect( + node, part, path, headers, + conn_timeout=self.app.conn_timeout, + node_timeout=self.app.node_timeout) + self.app.set_node_timing(node, putter.connect_duration) + return putter + except InsufficientStorage: + self.app.error_limit(node, _('ERROR Insufficient Storage')) + except PutterConnectError as e: + self.app.error_occurred( + node, _('ERROR %(status)d Expect: 100-continue ' + 'From Object Server') % { + 'status': e.status}) + except (Exception, Timeout): + self.app.exception_occurred( + node, _('Object'), + _('Expect: 100-continue on %s') % path) + + def _determine_chunk_destinations(self, putters): + """ + Given a list of putters, return a dict where the key is the putter + and the value is the node index to use. + + This is done so that we line up handoffs using the same node index + (in the primary part list) as the primary that the handoff is standing + in for. This lets erasure-code fragment archives wind up on the + preferred local primary nodes when possible. + """ + # Give each putter a "chunk index": the index of the + # transformed chunk that we'll send to it. + # + # For primary nodes, that's just its index (primary 0 gets + # chunk 0, primary 1 gets chunk 1, and so on). For handoffs, + # we assign the chunk index of a missing primary. + handoff_conns = [] + chunk_index = {} + for p in putters: + if p.node_index is not None: + chunk_index[p] = p.node_index + else: + handoff_conns.append(p) + + # Note: we may have more holes than handoffs. This is okay; it + # just means that we failed to connect to one or more storage + # nodes. Holes occur when a storage node is down, in which + # case the connection is not replaced, and when a storage node + # returns 507, in which case a handoff is used to replace it. + holes = [x for x in range(len(putters)) + if x not in chunk_index.values()] + + for hole, p in zip(holes, handoff_conns): + chunk_index[p] = hole + return chunk_index + + def _transfer_data(self, req, policy, data_source, putters, nodes, + min_conns, etag_hasher): + """ + Transfer data for an erasure coded object. + + This method was added in the PUT method extraction change + """ + bytes_transferred = 0 + chunk_transform = chunk_transformer(policy, len(nodes)) + chunk_transform.send(None) + + def send_chunk(chunk): + if etag_hasher: + etag_hasher.update(chunk) + backend_chunks = chunk_transform.send(chunk) + if backend_chunks is None: + # If there's not enough bytes buffered for erasure-encoding + # or whatever we're doing, the transform will give us None. 
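+                # (leftover bytes are flushed when _transfer_data sends
+                # the final zero-length chunk)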
+ return + + for putter in list(putters): + backend_chunk = backend_chunks[chunk_index[putter]] + if not putter.failed: + putter.chunk_hasher.update(backend_chunk) + putter.send_chunk(backend_chunk) + else: + putters.remove(putter) + self._check_min_conn( + req, putters, min_conns, msg='Object PUT exceptions during' + ' send, %(conns)s/%(nodes)s required connections') + + try: + with ContextPool(len(putters)) as pool: + + # build our chunk index dict to place handoffs in the + # same part nodes index as the primaries they are covering + chunk_index = self._determine_chunk_destinations(putters) + + for putter in putters: + putter.spawn_sender_greenthread( + pool, self.app.put_queue_depth, self.app.node_timeout, + self.app.exception_occurred) + while True: + with ChunkReadTimeout(self.app.client_timeout): + try: + chunk = next(data_source) + except StopIteration: + computed_etag = (etag_hasher.hexdigest() + if etag_hasher else None) + received_etag = req.headers.get( + 'etag', '').strip('"') + if (computed_etag and received_etag and + computed_etag != received_etag): + raise HTTPUnprocessableEntity(request=req) + + send_chunk('') # flush out any buffered data + + for putter in putters: + trail_md = trailing_metadata( + policy, etag_hasher, + bytes_transferred, + chunk_index[putter]) + trail_md['Etag'] = \ + putter.chunk_hasher.hexdigest() + putter.end_of_object_data(trail_md) + break + bytes_transferred += len(chunk) + if bytes_transferred > constraints.MAX_FILE_SIZE: + raise HTTPRequestEntityTooLarge(request=req) + + send_chunk(chunk) + + for putter in putters: + putter.wait() + + # for storage policies requiring 2-phase commit (e.g. + # erasure coding), enforce >= 'quorum' number of + # 100-continue responses - this indicates successful + # object data and metadata commit and is a necessary + # condition to be met before starting 2nd PUT phase + final_phase = False + need_quorum = True + statuses, reasons, bodies, _junk, quorum = \ + self._get_put_responses( + req, putters, len(nodes), final_phase, + min_conns, need_quorum=need_quorum) + if not quorum: + self.app.logger.error( + _('Not enough object servers ack\'ed (got %d)'), + statuses.count(HTTP_CONTINUE)) + raise HTTPServiceUnavailable(request=req) + # quorum achieved, start 2nd phase - send commit + # confirmation to participating object servers + # so they write a .durable state file indicating + # a successful PUT + for putter in putters: + putter.send_commit_confirmation() + for putter in putters: + putter.wait() + except ChunkReadTimeout as err: + self.app.logger.warn( + _('ERROR Client read timeout (%ss)'), err.seconds) + self.app.logger.increment('client_timeouts') + raise HTTPRequestTimeout(request=req) + except HTTPException: + raise + except (Exception, Timeout): + self.app.logger.exception( + _('ERROR Exception causing client disconnect')) + raise HTTPClientDisconnect(request=req) + if req.content_length and bytes_transferred < req.content_length: + req.client_disconnect = True + self.app.logger.warn( + _('Client disconnected without sending enough data')) + self.app.logger.increment('client_disconnects') + raise HTTPClientDisconnect(request=req) + + def _have_adequate_successes(self, statuses, min_responses): + """ + Given a list of statuses from several requests, determine if a + satisfactory number of nodes have responded with 2xx statuses to + deem the transaction for a succssful response to the client. 
+ + :param statuses: list of statuses returned so far + :param min_responses: minimal pass criterion for number of successes + :returns: True or False, depending on current number of successes + """ + if sum(1 for s in statuses if is_success(s)) >= min_responses: + return True + return False + + def _await_response(self, conn, final_phase): + return conn.await_response( + self.app.node_timeout, not final_phase) + + def _get_conn_response(self, conn, req, final_phase, **kwargs): + try: + resp = self._await_response(conn, final_phase=final_phase, + **kwargs) + except (Exception, Timeout): + resp = None + if final_phase: + status_type = 'final' + else: + status_type = 'commit' + self.app.exception_occurred( + conn.node, _('Object'), + _('Trying to get %s status of PUT to %s') % ( + status_type, req.path)) + return (conn, resp) + + def _get_put_responses(self, req, putters, num_nodes, final_phase, + min_responses, need_quorum=True): + """ + Collect erasure coded object responses. + + Collect object responses to a PUT request and determine if + satisfactory number of nodes have returned success. Return + statuses, quorum result if indicated by 'need_quorum' and + etags if this is a final phase or a multiphase PUT transaction. + + :param req: the request + :param putters: list of putters for the request + :param num_nodes: number of nodes involved + :param final_phase: boolean indicating if this is the last phase + :param min_responses: minimum needed when not requiring quorum + :param need_quorum: boolean indicating if quorum is required + """ + statuses = [] + reasons = [] + bodies = [] + etags = set() + + pile = GreenAsyncPile(len(putters)) + for putter in putters: + if putter.failed: + continue + pile.spawn(self._get_conn_response, putter, req, + final_phase=final_phase) + + def _handle_response(putter, response): + statuses.append(response.status) + reasons.append(response.reason) + if final_phase: + body = response.read() + bodies.append(body) + else: + body = '' + if response.status == HTTP_INSUFFICIENT_STORAGE: + putter.failed = True + self.app.error_limit(putter.node, + _('ERROR Insufficient Storage')) + elif response.status >= HTTP_INTERNAL_SERVER_ERROR: + putter.failed = True + self.app.error_occurred( + putter.node, + _('ERROR %(status)d %(body)s From Object Server ' + 're: %(path)s') % + {'status': response.status, + 'body': body[:1024], 'path': req.path}) + elif is_success(response.status): + etags.add(response.getheader('etag').strip('"')) + + quorum = False + for (putter, response) in pile: + if response: + _handle_response(putter, response) + if self._have_adequate_successes(statuses, min_responses): + break + else: + putter.failed = True + + # give any pending requests *some* chance to finish + finished_quickly = pile.waitall(self.app.post_quorum_timeout) + for (putter, response) in finished_quickly: + if response: + _handle_response(putter, response) + + if need_quorum: + if final_phase: + while len(statuses) < num_nodes: + statuses.append(HTTP_SERVICE_UNAVAILABLE) + reasons.append('') + bodies.append('') + else: + # intermediate response phase - set return value to true only + # if there are enough 100-continue acknowledgements + if self.have_quorum(statuses, num_nodes): + quorum = True + + return statuses, reasons, bodies, etags, quorum + + def _store_object(self, req, data_source, nodes, partition, + outgoing_headers): + """ + Store an erasure coded object. 
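+
+        The flow is: connect to the object servers with
+        "Expect: 100-continue", stream out erasure-coded fragments,
+        require a quorum of 100-continue acknowledgements to close the
+        first phase, then send commit confirmations so each node marks
+        its fragment archive durable.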
+ """ + policy_index = int(req.headers.get('X-Backend-Storage-Policy-Index')) + policy = POLICIES.get_by_index(policy_index) + # Since the request body sent from client -> proxy is not + # the same as the request body sent proxy -> object, we + # can't rely on the object-server to do the etag checking - + # so we have to do it here. + etag_hasher = md5() + + min_conns = policy.quorum + putters = self._get_put_connections( + req, nodes, partition, outgoing_headers, + policy, expect=True) + + try: + # check that a minimum number of connections were established and + # meet all the correct conditions set in the request + self._check_failure_put_connections(putters, req, nodes, min_conns) + + self._transfer_data(req, policy, data_source, putters, + nodes, min_conns, etag_hasher) + final_phase = True + need_quorum = False + min_resp = 2 + putters = [p for p in putters if not p.failed] + # ignore response etags, and quorum boolean + statuses, reasons, bodies, _etags, _quorum = \ + self._get_put_responses(req, putters, len(nodes), + final_phase, min_resp, + need_quorum=need_quorum) + except HTTPException as resp: + return resp + + etag = etag_hasher.hexdigest() + resp = self.best_response(req, statuses, reasons, bodies, + _('Object PUT'), etag=etag, + quorum_size=min_conns) + resp.last_modified = math.ceil( + float(Timestamp(req.headers['X-Timestamp']))) + return resp |
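For reference, a minimal sketch of how a proxy can resolve the right controller through the new router; the wiring below is illustrative only, not part of this diff:

    # Illustrative only: the real proxy server wires this up itself.
    router = ObjectControllerRouter()
    policy = POLICIES.get_by_index(policy_index)  # e.g. from container info
    controller_cls = router[policy]  # ReplicatedObjectController or
                                     # ECObjectController, per policy type
    controller = controller_cls(app, account_name, container_name,
                                object_name)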