diff options
Diffstat (limited to 'swift/common')
-rw-r--r-- | swift/common/__init__.py | 6 | ||||
-rw-r--r-- | swift/common/auth.py | 98 | ||||
-rw-r--r-- | swift/common/bufferedhttp.py | 158 | ||||
-rw-r--r-- | swift/common/client.py | 718 | ||||
-rw-r--r-- | swift/common/constraints.py | 152 | ||||
-rw-r--r-- | swift/common/db.py | 1463 | ||||
-rw-r--r-- | swift/common/db_replicator.py | 526 | ||||
-rw-r--r-- | swift/common/direct_client.py | 303 | ||||
-rw-r--r-- | swift/common/exceptions.py | 35 | ||||
-rw-r--r-- | swift/common/healthcheck.py | 28 | ||||
-rw-r--r-- | swift/common/memcached.py | 272 | ||||
-rw-r--r-- | swift/common/utils.py | 490 | ||||
-rw-r--r-- | swift/common/wsgi.py | 164 |
13 files changed, 4413 insertions, 0 deletions
# ---- swift/common/__init__.py ----
""" Code common to all of Swift. """

ACCOUNT_LISTING_LIMIT = 10000
CONTAINER_LISTING_LIMIT = 10000
FILE_SIZE_LIMIT = 5368709122


# ---- swift/common/auth.py ----
# Copyright (c) 2010 OpenStack, LLC.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0

import time

from webob.request import Request
from webob.exc import HTTPUnauthorized, HTTPPreconditionFailed
from eventlet.timeout import Timeout

from swift.common.utils import split_path
from swift.common.bufferedhttp import http_connect_raw as http_connect


class DevAuthMiddleware(object):
    """
    WSGI auth middleware that validates each request's token against the
    dev auth server, caching successful validations in memcache for the
    ttl the auth server reports.
    """

    def __init__(self, app, conf, memcache_client, logger):
        """
        :param app: next WSGI app in the pipeline
        :param conf: dict of config values for the auth server
        :param memcache_client: memcache client used to cache validations
        :param logger: logger used to report auth errors
        """
        self.app = app
        self.memcache_client = memcache_client
        self.logger = logger
        self.conf = conf
        self.auth_host = conf.get('bind_ip', '127.0.0.1')
        self.auth_port = int(conf.get('bind_port', 11000))
        self.timeout = int(conf.get('node_timeout', 10))

    def __call__(self, env, start_response):
        req = Request(env)
        if req.path != '/healthcheck':
            # Accept the legacy x-storage-token header as x-auth-token.
            if 'x-storage-token' in req.headers and \
                    'x-auth-token' not in req.headers:
                req.headers['x-auth-token'] = req.headers['x-storage-token']
            version, account, container, obj = split_path(req.path, 1, 4, True)
            # BUG FIX: this account check appeared twice in a row; one
            # check before the token checks suffices.
            if account is None:
                return HTTPPreconditionFailed(request=req, body='Bad URL')(
                    env, start_response)
            if not req.headers.get('x-auth-token'):
                return HTTPPreconditionFailed(request=req,
                    body='Missing Auth Token')(env, start_response)
            if not self.auth(account, req.headers['x-auth-token']):
                return HTTPUnauthorized(request=req)(env, start_response)
        # If we get here, then things should be good.
        return self.app(env, start_response)

    def auth(self, account, token):
        """
        Dev authorization implementation

        :param account: account name
        :param token: auth token

        :returns: True if authorization is successful, False otherwise
        """
        key = 'auth/%s/%s' % (account, token)
        now = time.time()
        cached_auth_data = self.memcache_client.get(key)
        if cached_auth_data:
            start, expiration = cached_auth_data
            if now - start <= expiration:
                return True
        try:
            with Timeout(self.timeout):
                conn = http_connect(self.auth_host, self.auth_port, 'GET',
                                    '/token/%s/%s' % (account, token))
                resp = conn.getresponse()
                resp.read()
                conn.close()
                if resp.status == 204:
                    # 204 means valid; x-auth-ttl is how long (seconds)
                    # the token stays valid.
                    validated = float(resp.getheader('x-auth-ttl'))
                else:
                    validated = False
        except (Exception, Timeout):
            # BUG FIX: was a bare except:, which also swallowed
            # SystemExit/KeyboardInterrupt. Timeout is named explicitly
            # because eventlet's Timeout may not subclass Exception.
            self.logger.exception('ERROR with auth')
            return False
        if not validated:
            return False
        # Cache the successful validation for its reported lifetime.
        val = (now, validated)
        self.memcache_client.set(key, val, timeout=validated)
        return True
# ---- swift/common/bufferedhttp.py ----
# Copyright (c) 2010 OpenStack, LLC.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0

"""
Monkey Patch httplib.HTTPResponse to buffer reads of headers. This can improve
performance when making large numbers of small HTTP requests. This module
also provides helper functions to make HTTP connections using
BufferedHTTPResponse.

.. warning::

    If you use this, be sure that the libraries you are using do not access
    the socket directly (xmlrpclib, I'm looking at you :/), and instead
    make all calls through httplib.
"""

from urllib import quote
import logging
import time

from eventlet.green.httplib import HTTPConnection, HTTPResponse, _UNKNOWN, \
    CONTINUE, HTTPMessage


class BufferedHTTPResponse(HTTPResponse):
    """HTTPResponse class that buffers reading of headers"""

    def __init__(self, sock, debuglevel=0, strict=0,
                 method=None):  # pragma: no cover
        self.sock = sock
        # Buffered file wrapper (default bufsize), unlike the stock
        # HTTPResponse which reads the headers unbuffered.
        self.fp = sock.makefile('rb')
        self.debuglevel = debuglevel
        self.strict = strict
        self._method = method

        self.msg = None

        # from the Status-Line of the response
        self.version = _UNKNOWN         # HTTP-Version
        self.status = _UNKNOWN          # Status-Code
        self.reason = _UNKNOWN          # Reason-Phrase

        self.chunked = _UNKNOWN         # is "chunked" being used?
        self.chunk_left = _UNKNOWN      # bytes left to read in current chunk
        self.length = _UNKNOWN          # number of bytes left in response
        self.will_close = _UNKNOWN      # conn will close at end of response

    def expect_response(self):
        # Unbuffered here: must not read past the "100 Continue" line.
        self.fp = self.sock.makefile('rb', 0)
        version, status, reason = self._read_status()
        if status != CONTINUE:
            # Not a 100 Continue: replay this status line so begin() can
            # parse the real response normally.
            self._read_status = lambda: (version, status, reason)
            self.begin()
        else:
            self.status = status
            self.reason = reason.strip()
            self.version = 11
            self.msg = HTTPMessage(self.fp, 0)
            self.msg.fp = None

class BufferedHTTPConnection(HTTPConnection):
    """HTTPConnection class that uses BufferedHTTPResponse"""
    response_class = BufferedHTTPResponse

    def connect(self):
        # Remember when the connection opened so getresponse() can log the
        # total request latency.
        self._connected_time = time.time()
        return HTTPConnection.connect(self)

    def putrequest(self, method, url, skip_host=0, skip_accept_encoding=0):
        self._method = method
        self._path = url
        self._txn_id = '-'
        return HTTPConnection.putrequest(self, method, url, skip_host,
                                         skip_accept_encoding)

    def putheader(self, header, value):
        # Capture the transaction id for the perf log line below.
        if header.lower() == 'x-cf-trans-id':
            self._txn_id = value
        return HTTPConnection.putheader(self, header, value)

    def getexpect(self):
        response = BufferedHTTPResponse(self.sock, strict=self.strict,
                                        method=self._method)
        response.expect_response()
        return response

    def getresponse(self):
        response = HTTPConnection.getresponse(self)
        # BUG FIX: pass lazy %-style args to logging instead of eagerly
        # formatting, so the work is skipped when DEBUG is disabled.
        logging.debug("HTTP PERF: %.5f seconds to %s %s:%s %s (%s)",
            time.time() - self._connected_time, self._method, self.host,
            self.port, self._path, self._txn_id)
        return response


def http_connect(ipaddr, port, device, partition, method, path,
                 headers=None, query_string=None):
    """
    Helper function to create a HTTPConnection object that is buffered
    for backend Swift services.

    :param ipaddr: IPv4 address to connect to
    :param port: port to connect to
    :param device: device of the node to query
    :param partition: partition on the device
    :param method: HTTP method to request ('GET', 'PUT', 'POST', etc.)
    :param path: request path
    :param headers: dictionary of headers
    :param query_string: request query string
    :returns: HTTPConnection object
    """
    # Prefix the path with device/partition and delegate; everything else
    # was a verbatim copy of http_connect_raw (de-duplicated).
    path = quote('/' + device + '/' + str(partition) + path)
    return http_connect_raw(ipaddr, port, method, path, headers,
                            query_string)


def http_connect_raw(ipaddr, port, method, path, headers=None,
                     query_string=None):
    """
    Helper function to create a HTTPConnection object that is buffered.

    :param ipaddr: IPv4 address to connect to
    :param port: port to connect to
    :param method: HTTP method to request ('GET', 'PUT', 'POST', etc.)
    :param path: request path
    :param headers: dictionary of headers
    :param query_string: request query string
    :returns: HTTPConnection object
    """
    conn = BufferedHTTPConnection('%s:%s' % (ipaddr, port))
    if query_string:
        path += '?' + query_string
    conn.path = path
    conn.putrequest(method, path)
    if headers:
        for header, value in headers.iteritems():
            conn.putheader(header, value)
    conn.endheaders()
    return conn
+ +""" +Cloud Files client library used internally +""" +import socket +from cStringIO import StringIO +from httplib import HTTPConnection, HTTPException, HTTPSConnection +from re import compile, DOTALL +from tokenize import generate_tokens, STRING, NAME, OP +from urllib import quote as _quote, unquote +from urlparse import urlparse, urlunparse + +try: + from eventlet import sleep +except: + from time import sleep + + +def quote(value, safe='/'): + """ + Patched version of urllib.quote that encodes utf8 strings before quoting + """ + if isinstance(value, unicode): + value = value.encode('utf8') + return _quote(value, safe) + + +# look for a real json parser first +try: + # simplejson is popular and pretty good + from simplejson import loads as json_loads +except ImportError: + try: + # 2.6 will have a json module in the stdlib + from json import loads as json_loads + except ImportError: + # fall back on local parser otherwise + comments = compile(r'/\*.*\*/|//[^\r\n]*', DOTALL) + + def json_loads(string): + ''' + Fairly competent json parser exploiting the python tokenizer and + eval(). 
-- From python-cloudfiles + + _loads(serialized_json) -> object + ''' + try: + res = [] + consts = {'true': True, 'false': False, 'null': None} + string = '(' + comments.sub('', string) + ')' + for type, val, _, _, _ in \ + generate_tokens(StringIO(string).readline): + if (type == OP and val not in '[]{}:,()-') or \ + (type == NAME and val not in consts): + raise AttributeError() + elif type == STRING: + res.append('u') + res.append(val.replace('\\/', '/')) + else: + res.append(val) + return eval(''.join(res), {}, consts) + except: + raise AttributeError() + + +class ClientException(Exception): + + def __init__(self, msg, http_scheme='', http_host='', http_port='', + http_path='', http_query='', http_status=0, http_reason='', + http_device=''): + Exception.__init__(self, msg) + self.msg = msg + self.http_scheme = http_scheme + self.http_host = http_host + self.http_port = http_port + self.http_path = http_path + self.http_query = http_query + self.http_status = http_status + self.http_reason = http_reason + self.http_device = http_device + + def __str__(self): + a = self.msg + b = '' + if self.http_scheme: + b += '%s://' % self.http_scheme + if self.http_host: + b += self.http_host + if self.http_port: + b += ':%s' % self.http_port + if self.http_path: + b += self.http_path + if self.http_query: + b += '?%s' % self.http_query + if self.http_status: + if b: + b = '%s %s' % (b, self.http_status) + else: + b = str(self.http_status) + if self.http_reason: + if b: + b = '%s %s' % (b, self.http_reason) + else: + b = '- %s' % self.http_reason + if self.http_device: + if b: + b = '%s: device %s' % (b, self.http_device) + else: + b = 'device %s' % self.http_device + return b and '%s: %s' % (a, b) or a + + +def http_connection(url): + """ + Make an HTTPConnection or HTTPSConnection + + :param url: url to connect to + :returns: tuple of (parsed url, connection object) + :raises ClientException: Unable to handle protocol scheme + """ + parsed = urlparse(url) + if parsed.scheme 
== 'http': + conn = HTTPConnection(parsed.netloc) + elif parsed.scheme == 'https': + conn = HTTPSConnection(parsed.netloc) + else: + raise ClientException('Cannot handle protocol scheme %s for url %s' % + (parsed.scheme, repr(url))) + return parsed, conn + + +def get_auth(url, user, key, snet=False): + """ + Get authentication credentials + + :param url: authentication URL + :param user: user to auth as + :param key: key or passowrd for auth + :param snet: use SERVICENET internal network default is False + :returns: tuple of (storage URL, storage token, auth token) + :raises ClientException: HTTP GET request to auth URL failed + """ + parsed, conn = http_connection(url) + conn.request('GET', parsed.path, '', + {'X-Auth-User': user, 'X-Auth-Key': key}) + resp = conn.getresponse() + if resp.status < 200 or resp.status >= 300: + raise ClientException('Auth GET failed', http_scheme=parsed.scheme, + http_host=conn.host, http_port=conn.port, + http_path=parsed.path, http_status=resp.status, + http_reason=resp.reason) + url = resp.getheader('x-storage-url') + if snet: + parsed = list(urlparse(url)) + # Second item in the list is the netloc + parsed[1] = 'snet-' + parsed[1] + url = urlunparse(parsed) + return url, resp.getheader('x-storage-token', + resp.getheader('x-auth-token')) + + +def get_account(url, token, marker=None, limit=None, prefix=None, + http_conn=None, full_listing=False): + """ + Get a listing of containers for the account. 
def get_account(url, token, marker=None, limit=None, prefix=None,
                http_conn=None, full_listing=False):
    """
    Get a listing of containers for the account.

    :param url: storage URL
    :param token: auth token
    :param marker: marker query
    :param limit: limit query
    :param prefix: prefix query
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :param full_listing: if True, return a full listing, else returns a max
                         of 10000 listings
    :returns: a list of containers (doc fix: previously said "accounts")
    :raises ClientException: HTTP GET request failed
    """
    if not http_conn:
        http_conn = http_connection(url)
    if full_listing:
        # Page through by reissuing the request with the last name seen as
        # the next marker until a page comes back empty.
        rv = []
        listing = get_account(url, token, marker, limit, prefix, http_conn)
        while listing:
            rv.extend(listing)
            marker = listing[-1]['name']
            listing = get_account(url, token, marker, limit, prefix,
                                  http_conn)
        return rv
    parsed, conn = http_conn
    qs = 'format=json'
    if marker:
        qs += '&marker=%s' % quote(marker)
    if limit:
        qs += '&limit=%d' % limit
    if prefix:
        qs += '&prefix=%s' % quote(prefix)
    conn.request('GET', '%s?%s' % (parsed.path, qs), '',
                 {'X-Auth-Token': token})
    resp = conn.getresponse()
    if resp.status < 200 or resp.status >= 300:
        resp.read()
        raise ClientException('Account GET failed', http_scheme=parsed.scheme,
                http_host=conn.host, http_port=conn.port,
                http_path=parsed.path, http_query=qs, http_status=resp.status,
                http_reason=resp.reason)
    if resp.status == 204:
        resp.read()
        return []
    return json_loads(resp.read())


def head_account(url, token, http_conn=None):
    """
    Get account stats.

    :param url: storage URL
    :param token: auth token
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :returns: a tuple of (container count, object count, bytes used)
    :raises ClientException: HTTP HEAD request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
    resp = conn.getresponse()
    # CONSISTENCY FIX: drain the (empty) HEAD response like the other HEAD
    # helpers do, so the connection can safely be reused.
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Account HEAD failed', http_scheme=parsed.scheme,
                http_host=conn.host, http_port=conn.port,
                http_path=parsed.path, http_status=resp.status,
                http_reason=resp.reason)
    return int(resp.getheader('x-account-container-count', 0)), \
        int(resp.getheader('x-account-object-count', 0)), \
        int(resp.getheader('x-account-bytes-used', 0))


def get_container(url, token, container, marker=None, limit=None,
                  prefix=None, delimiter=None, http_conn=None,
                  full_listing=False):
    """
    Get a listing of objects for the container.

    :param url: storage URL
    :param token: auth token
    :param container: container name to get a listing for
    :param marker: marker query
    :param limit: limit query
    :param prefix: prefix query
    :param delimiter: string to delimit the queries on (doc fix: was
                      misspelled "delimeter")
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :param full_listing: if True, return a full listing, else returns a max
                         of 10000 listings
    :returns: a list of objects
    :raises ClientException: HTTP GET request failed
    """
    if not http_conn:
        http_conn = http_connection(url)
    if full_listing:
        rv = []
        listing = get_container(url, token, container, marker, limit, prefix,
                                delimiter, http_conn)
        while listing:
            rv.extend(listing)
            if not delimiter:
                marker = listing[-1]['name']
            else:
                # With a delimiter, entries can be "subdir" placeholders
                # rather than real objects.
                marker = listing[-1].get('name', listing[-1].get('subdir'))
            listing = get_container(url, token, container, marker, limit,
                                    prefix, delimiter, http_conn)
        return rv
    parsed, conn = http_conn
    path = '%s/%s' % (parsed.path, quote(container))
    qs = 'format=json'
    if marker:
        qs += '&marker=%s' % quote(marker)
    if limit:
        qs += '&limit=%d' % limit
    if prefix:
        qs += '&prefix=%s' % quote(prefix)
    if delimiter:
        qs += '&delimiter=%s' % quote(delimiter)
    conn.request('GET', '%s?%s' % (path, qs), '', {'X-Auth-Token': token})
    resp = conn.getresponse()
    if resp.status < 200 or resp.status >= 300:
        resp.read()
        raise ClientException('Container GET failed',
                http_scheme=parsed.scheme, http_host=conn.host,
                http_port=conn.port, http_path=path, http_query=qs,
                http_status=resp.status, http_reason=resp.reason)
    if resp.status == 204:
        resp.read()
        return []
    return json_loads(resp.read())


def head_container(url, token, container, http_conn=None):
    """
    Get container stats.

    :param url: storage URL
    :param token: auth token
    :param container: container name to get stats for
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :returns: a tuple of (object count, bytes used)
    :raises ClientException: HTTP HEAD request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s' % (parsed.path, quote(container))
    conn.request('HEAD', path, '', {'X-Auth-Token': token})
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Container HEAD failed',
                http_scheme=parsed.scheme, http_host=conn.host,
                http_port=conn.port, http_path=path, http_status=resp.status,
                http_reason=resp.reason)
    return int(resp.getheader('x-container-object-count', 0)), \
        int(resp.getheader('x-container-bytes-used', 0))


def put_container(url, token, container, http_conn=None):
    """
    Create a container

    :param url: storage URL
    :param token: auth token
    :param container: container name to create
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :raises ClientException: HTTP PUT request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s' % (parsed.path, quote(container))
    conn.request('PUT', path, '', {'X-Auth-Token': token})
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Container PUT failed',
                http_scheme=parsed.scheme, http_host=conn.host,
                http_port=conn.port, http_path=path, http_status=resp.status,
                http_reason=resp.reason)
def delete_container(url, token, container, http_conn=None):
    """
    Delete a container

    :param url: storage URL
    :param token: auth token
    :param container: container name to delete
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :raises ClientException: HTTP DELETE request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s' % (parsed.path, quote(container))
    conn.request('DELETE', path, '', {'X-Auth-Token': token})
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Container DELETE failed',
                http_scheme=parsed.scheme, http_host=conn.host,
                http_port=conn.port, http_path=path, http_status=resp.status,
                http_reason=resp.reason)


def get_object(url, token, container, name, http_conn=None,
               resp_chunk_size=None):
    """
    Get an object

    :param url: storage URL
    :param token: auth token
    :param container: container name that the object is in
    :param name: object name to get
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :param resp_chunk_size: if defined, chunk size of data to read
    :returns: a tuple of (content type, content length, last modified,
              etag, metadata dict, object body) -- body is an iterator of
              chunks when resp_chunk_size is set, else the full body string
              (doc fix: previously said "a list of objects")
    :raises ClientException: HTTP GET request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
    conn.request('GET', path, '', {'X-Auth-Token': token})
    resp = conn.getresponse()
    if resp.status < 200 or resp.status >= 300:
        resp.read()
        raise ClientException('Object GET failed', http_scheme=parsed.scheme,
                http_host=conn.host, http_port=conn.port, http_path=path,
                http_status=resp.status, http_reason=resp.reason)
    metadata = {}
    for key, value in resp.getheaders():
        if key.lower().startswith('x-object-meta-'):
            metadata[unquote(key[len('x-object-meta-'):])] = unquote(value)
    if resp_chunk_size:
        # NOTE: when streaming, the connection cannot be reused until the
        # iterator has been exhausted.

        def _object_body():
            buf = resp.read(resp_chunk_size)
            while buf:
                yield buf
                buf = resp.read(resp_chunk_size)
        object_body = _object_body()
    else:
        object_body = resp.read()
    # ROBUSTNESS: guard against a missing etag header (was an
    # AttributeError on None).
    return resp.getheader('content-type'), \
        int(resp.getheader('content-length', 0)), \
        resp.getheader('last-modified'), \
        (resp.getheader('etag') or '').strip('"'), \
        metadata, \
        object_body


def head_object(url, token, container, name, http_conn=None):
    """
    Get object info

    :param url: storage URL
    :param token: auth token
    :param container: container name that the object is in
    :param name: object name to get info for
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :returns: a tuple of (content type, content length, last modified, etag,
              dictionary of metadata)
    :raises ClientException: HTTP HEAD request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
    conn.request('HEAD', path, '', {'X-Auth-Token': token})
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Object HEAD failed', http_scheme=parsed.scheme,
                http_host=conn.host, http_port=conn.port, http_path=path,
                http_status=resp.status, http_reason=resp.reason)
    metadata = {}
    for key, value in resp.getheaders():
        if key.lower().startswith('x-object-meta-'):
            metadata[unquote(key[len('x-object-meta-'):])] = unquote(value)
    # ROBUSTNESS: guard against a missing etag header, as in get_object.
    return resp.getheader('content-type'), \
        int(resp.getheader('content-length', 0)), \
        resp.getheader('last-modified'), \
        (resp.getheader('etag') or '').strip('"'), \
        metadata


def put_object(url, token, container, name, contents, metadata=None,
               content_length=None, etag=None, chunk_size=65536,
               content_type=None, http_conn=None):
    """
    Put an object

    :param url: storage URL
    :param token: auth token
    :param container: container name that the object is in
    :param name: object name to put
    :param contents: file like object to read object data from
    :param metadata: dictionary of object metadata (default: no metadata)
    :param content_length: value to send as content-length header
    :param etag: etag of contents
    :param chunk_size: chunk size of data to write
    :param content_type: value to send as content-type header
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :returns: etag from server response
    :raises ClientException: HTTP PUT request failed
    """
    # BUG FIX: the default was a shared mutable dict ({}); use the None
    # sentinel instead.
    if metadata is None:
        metadata = {}
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
    headers = {'X-Auth-Token': token}
    for key, value in metadata.iteritems():
        headers['X-Object-Meta-%s' % quote(key)] = quote(value)
    if etag:
        headers['ETag'] = etag.strip('"')
    if content_length is not None:
        headers['Content-Length'] = str(content_length)
    if content_type is not None:
        headers['Content-Type'] = content_type
    if not contents:
        headers['Content-Length'] = '0'
    if hasattr(contents, 'read'):
        # Stream the file-like object; chunked transfer-encoding when no
        # content length was given.
        conn.putrequest('PUT', path)
        for header, value in headers.iteritems():
            conn.putheader(header, value)
        if not content_length:
            conn.putheader('Transfer-Encoding', 'chunked')
        conn.endheaders()
        chunk = contents.read(chunk_size)
        while chunk:
            if not content_length:
                conn.send('%x\r\n%s\r\n' % (len(chunk), chunk))
            else:
                conn.send(chunk)
            chunk = contents.read(chunk_size)
        if not content_length:
            # Terminating zero-length chunk.
            conn.send('0\r\n\r\n')
    else:
        conn.request('PUT', path, contents, headers)
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Object PUT failed', http_scheme=parsed.scheme,
                http_host=conn.host, http_port=conn.port, http_path=path,
                http_status=resp.status, http_reason=resp.reason)
    # ROBUSTNESS: guard against a missing etag header in the response.
    return (resp.getheader('etag') or '').strip('"')
it will create the + conn object) + :raises ClientException: HTTP POST request failed + """ + if http_conn: + parsed, conn = http_conn + else: + parsed, conn = http_connection(url) + path = '%s/%s/%s' % (parsed.path, quote(container), quote(name)) + headers = {'X-Auth-Token': token} + for key, value in metadata.iteritems(): + headers['X-Object-Meta-%s' % quote(key)] = quote(value) + conn.request('POST', path, '', headers) + resp = conn.getresponse() + resp.read() + if resp.status < 200 or resp.status >= 300: + raise ClientException('Object POST failed', http_scheme=parsed.scheme, + http_host=conn.host, http_port=conn.port, http_path=path, + http_status=resp.status, http_reason=resp.reason) + + +def delete_object(url, token, container, name, http_conn=None): + """ + Delete object + + :param url: storage URL + :param token: auth token + :param container: container name that the object is in + :param name: object name to delete + :param http_conn: HTTP connection object (If None, it will create the + conn object) + :raises ClientException: HTTP DELETE request failed + """ + if http_conn: + parsed, conn = http_conn + else: + parsed, conn = http_connection(url) + path = '%s/%s/%s' % (parsed.path, quote(container), quote(name)) + conn.request('DELETE', path, '', {'X-Auth-Token': token}) + resp = conn.getresponse() + resp.read() + if resp.status < 200 or resp.status >= 300: + raise ClientException('Object DELETE failed', + http_scheme=parsed.scheme, http_host=conn.host, + http_port=conn.port, http_path=path, http_status=resp.status, + http_reason=resp.reason) + + +class Connection(object): + """Convenience class to make requests that will also retry the request""" + + def __init__(self, authurl, user, key, retries=5, preauthurl=None, + preauthtoken=None, snet=False): + """ + :param authurl: authenitcation URL + :param user: user name to authenticate as + :param key: key/password to authenticate with + :param retries: Number of times to retry the request before failing + 
:param preauthurl: storage URL (if you have already authenticated) + :param preauthtoken: authentication token (if you have already + authenticated) + :param snet: use SERVICENET internal network default is False + """ + self.authurl = authurl + self.user = user + self.key = key + self.retries = retries + self.http_conn = None + self.url = preauthurl + self.token = preauthtoken + self.attempts = 0 + self.snet = snet + + def _retry(self, func, *args, **kwargs): + kwargs['http_conn'] = self.http_conn + self.attempts = 0 + backoff = 1 + while self.attempts <= self.retries: + self.attempts += 1 + try: + if not self.url or not self.token: + self.url, self.token = \ + get_auth(self.authurl, self.user, self.key, snet=self.snet) + self.http_conn = None + if not self.http_conn: + self.http_conn = http_connection(self.url) + kwargs['http_conn'] = self.http_conn + rv = func(self.url, self.token, *args, **kwargs) + return rv + except (socket.error, HTTPException): + if self.attempts > self.retries: + raise + self.http_conn = None + except ClientException, err: + if self.attempts > self.retries: + raise + if err.http_status == 401: + self.url = self.token = None + if self.attempts > 1: + raise + elif 500 <= err.http_status <= 599: + pass + else: + raise + sleep(backoff) + backoff *= 2 + + def head_account(self): + """Wrapper for head_account""" + return self._retry(head_account) + + def get_account(self, marker=None, limit=None, prefix=None, + full_listing=False): + """Wrapper for get_account""" + # TODO: With full_listing=True this will restart the entire listing + # with each retry. Need to make a better version that just retries + # where it left off. 
+ return self._retry(get_account, marker=marker, limit=limit, + prefix=prefix, full_listing=full_listing) + + def head_container(self, container): + """Wrapper for head_container""" + return self._retry(head_container, container) + + def get_container(self, container, marker=None, limit=None, prefix=None, + delimiter=None, full_listing=False): + """Wrapper for get_container""" + # TODO: With full_listing=True this will restart the entire listing + # with each retry. Need to make a better version that just retries + # where it left off. + return self._retry(get_container, container, marker=marker, + limit=limit, prefix=prefix, delimiter=delimiter, + full_listing=full_listing) + + def put_container(self, container): + """Wrapper for put_container""" + return self._retry(put_container, container) + + def delete_container(self, container): + """Wrapper for delete_container""" + return self._retry(delete_container, container) + + def head_object(self, container, obj): + """Wrapper for head_object""" + return self._retry(head_object, container, obj) + + def get_object(self, container, obj, resp_chunk_size=None): + """Wrapper for get_object""" + return self._retry(get_object, container, obj, + resp_chunk_size=resp_chunk_size) + + def put_object(self, container, obj, contents, metadata={}, + content_length=None, etag=None, chunk_size=65536, + content_type=None): + """Wrapper for put_object""" + return self._retry(put_object, container, obj, contents, + metadata=metadata, content_length=content_length, etag=etag, + chunk_size=chunk_size, content_type=content_type) + + def post_object(self, container, obj, metadata): + """Wrapper for post_object""" + return self._retry(post_object, container, obj, metadata) + + def delete_object(self, container, obj): + """Wrapper for delete_object""" + return self._retry(delete_object, container, obj) diff --git a/swift/common/constraints.py b/swift/common/constraints.py new file mode 100644 index 000000000..8fdacb5a9 --- /dev/null +++ 
#: Max file size allowed for objects
MAX_FILE_SIZE = 5 * 1024 * 1024 * 1024 + 2
#: Max length of the name of a key for metadata
MAX_META_NAME_LENGTH = 128
#: Max length of the value of a key for metadata
MAX_META_VALUE_LENGTH = 256
#: Max number of metadata items
MAX_META_COUNT = 90
#: Max overall size of metadata
MAX_META_OVERALL_SIZE = 4096
#: Max object name length
MAX_OBJECT_NAME_LENGTH = 1024


def check_metadata(req):
    """
    Check metadata sent for objects in the request headers.

    :param req: request object
    :returns: HTTPBadRequest if the metadata is bad, None otherwise
    """
    meta_count = 0
    meta_size = 0
    for key, value in req.headers.iteritems():
        # Only x-object-meta-* headers count as object metadata
        if not key.lower().startswith('x-object-meta-'):
            continue
        key = key[len('x-object-meta-'):]
        if not key:
            return HTTPBadRequest(body='Metadata name cannot be empty',
                request=req, content_type='text/plain')
        meta_count += 1
        meta_size += len(key) + len(value)
        if len(key) > MAX_META_NAME_LENGTH:
            return HTTPBadRequest(
                body='Metadata name too long; max %d'
                    % MAX_META_NAME_LENGTH,
                request=req, content_type='text/plain')
        elif len(value) > MAX_META_VALUE_LENGTH:
            return HTTPBadRequest(
                body='Metadata value too long; max %d'
                    % MAX_META_VALUE_LENGTH,
                request=req, content_type='text/plain')
        elif meta_count > MAX_META_COUNT:
            return HTTPBadRequest(
                body='Too many metadata items; max %d' % MAX_META_COUNT,
                request=req, content_type='text/plain')
        elif meta_size > MAX_META_OVERALL_SIZE:
            return HTTPBadRequest(
                body='Total metadata too large; max %d'
                    % MAX_META_OVERALL_SIZE,
                request=req, content_type='text/plain')
    return None


def check_object_creation(req, object_name):
    """
    Check to ensure that everything is alright about an object to be created.

    :param req: HTTP request object
    :param object_name: name of object to be created
    :returns: HTTPRequestEntityTooLarge if the object is too large,
              HTTPLengthRequired if the content-length header is missing
              and it is not a chunked request, HTTPBadRequest on a missing
              or bad content-type header or bad metadata, None otherwise
    """
    if req.content_length and req.content_length > MAX_FILE_SIZE:
        return HTTPRequestEntityTooLarge(body='Your request is too large.',
            request=req, content_type='text/plain')
    if req.content_length is None and \
            req.headers.get('transfer-encoding') != 'chunked':
        return HTTPLengthRequired(request=req)
    if len(object_name) > MAX_OBJECT_NAME_LENGTH:
        return HTTPBadRequest(body='Object name length of %d longer than %d' %
            (len(object_name), MAX_OBJECT_NAME_LENGTH), request=req,
            content_type='text/plain')
    if 'Content-Type' not in req.headers:
        return HTTPBadRequest(request=req, content_type='text/plain',
            body='No content type')
    if not check_xml_encodable(req.headers['Content-Type']):
        return HTTPBadRequest(request=req, body='Invalid Content-Type',
            content_type='text/plain')
    return check_metadata(req)


def check_mount(root, drive):
    """
    Verify that the path to the device is a mount point and mounted. This
    allows us to fast fail on drives that have been unmounted because of
    issues, and also prevents us from accidentally filling up the root
    partition.

    :param root: base path where the devices are mounted
    :param drive: drive name to be checked
    :returns: True if it is a valid mounted device, False otherwise
    """
    # Reject anything with path separators or other oddities outright
    if not drive.isalnum():
        return False
    path = os.path.join(root, drive)
    return os.path.exists(path) and os.path.ismount(path)


def check_float(string):
    """
    Helper function for checking if a string can be converted to a float.

    :param string: string to be verified as a float
    :returns: True if the string can be converted to a float, False otherwise
    """
    try:
        float(string)
        return True
    except ValueError:
        return False


# Matches any character not permitted by the XML 1.0 spec.  Written with
# explicit \U escapes (instead of unichr()/ur'' literals) so the pattern
# builds the same way on narrow and wide unicode builds.
_invalid_xml = re.compile(
    u'[^\x09\x0a\x0d\x20-\uD7FF\uE000-\uFFFD\U00010000-\U0010FFFF]')


def check_xml_encodable(string):
    """
    Validate if a string can be encoded in xml.

    :param string: string to be validated
    :returns: True if the string can be encoded in xml, False otherwise
    """
    try:
        return not _invalid_xml.search(string.decode('UTF-8'))
    except UnicodeDecodeError:
        return False
+ +""" Database code for Swift """ + +from __future__ import with_statement +from contextlib import contextmanager +import hashlib +import logging +import operator +import os +from uuid import uuid4 +import time +import cPickle as pickle +import errno +from random import randint +from tempfile import mkstemp +import math + +from eventlet import sleep +import sqlite3 + +from swift.common.utils import normalize_timestamp, renamer, \ + mkdirs, lock_parent_directory, fallocate +from swift.common.exceptions import LockTimeout + + +#: Timeout for trying to connect to a DB +BROKER_TIMEOUT = 25 +#: Pickle protocol to use +PICKLE_PROTOCOL = 2 +#: Max number of pending entries +PENDING_CAP = 131072 + + +class DatabaseConnectionError(sqlite3.DatabaseError): + """More friendly error messages for DB Errors.""" + def __init__(self, path, msg, timeout=0): + self.path = path + self.timeout = timeout + self.msg = msg + + def __str__(self): + return 'DB connection error (%s, %s):\n%s' % ( + self.path, self.timeout, self.msg) + + +class GreenDBConnection(sqlite3.Connection): + """SQLite DB Connection handler that plays well with eventlet.""" + def __init__(self, *args, **kwargs): + self.timeout = kwargs.get('timeout', BROKER_TIMEOUT) + kwargs['timeout'] = 0 + self.db_file = args and args[0] or '-' + sqlite3.Connection.__init__(self, *args, **kwargs) + + def _timeout(self, call): + with LockTimeout(self.timeout, self.db_file): + while True: + try: + return call() + except sqlite3.OperationalError, e: + if 'locked' not in str(e): + raise + sleep(0.05) + + def execute(self, *args, **kwargs): + return self._timeout(lambda: sqlite3.Connection.execute( + self, *args, **kwargs)) + + def commit(self): + return self._timeout(lambda: sqlite3.Connection.commit(self)) + + +def dict_factory(crs, row): + """ + This should only be used when you need a real dict, + i.e. when you're going to serialize the results. 
+ """ + return dict( + ((col[0], row[idx]) for idx, col in enumerate(crs.description))) + + +def chexor(old, name, timestamp): + """ + Each entry in the account and container databases is XORed by the 128-bit + hash on insert or delete. This serves as a rolling, order-independent hash + of the contents. (check + XOR) + + :param old: hex representation of the current DB hash + :param name: name of the object or container being inserted + :param timestamp: timestamp of the new record + :returns: a hex representation of the new hash value + """ + if name is None: + raise Exception('name is None!') + old = old.decode('hex') + new = hashlib.md5(('%s-%s' % (name, timestamp)).encode('utf_8')).digest() + response = ''.join( + map(chr, map(operator.xor, map(ord, old), map(ord, new)))) + return response.encode('hex') + + +def get_db_connection(path, timeout=30, okay_to_create=False): + """ + Returns a properly configured SQLite database connection. + + :param path: path to DB + :param timeout: timeout for connection + :param okay_to_create: if True, create the DB if it doesn't exist + :returns: DB connection object + """ + try: + connect_time = time.time() + conn = sqlite3.connect(path, check_same_thread=False, + factory=GreenDBConnection, timeout=timeout) + if path != ':memory:' and not okay_to_create: + # attempt to detect and fail when connect creates the db file + stat = os.stat(path) + if stat.st_size == 0 and stat.st_ctime >= connect_time: + os.unlink(path) + raise DatabaseConnectionError(path, + 'DB file created by connect?') + conn.row_factory = sqlite3.Row + conn.text_factory = str + conn.execute('PRAGMA synchronous = NORMAL') + conn.execute('PRAGMA count_changes = OFF') + conn.execute('PRAGMA temp_store = MEMORY') + conn.create_function('chexor', 3, chexor) + except sqlite3.DatabaseError: + import traceback + raise DatabaseConnectionError(path, traceback.format_exc(), + timeout=timeout) + return conn + + +class DatabaseBroker(object): + """Encapsulates working with 
a database.""" + + def __init__(self, db_file, timeout=BROKER_TIMEOUT, logger=None, + account=None, container=None, pending_timeout=10, + stale_reads_ok=False): + """ Encapsulates working with a database. """ + self.conn = None + self.db_file = db_file + self.pending_file = self.db_file + '.pending' + self.pending_timeout = pending_timeout + self.stale_reads_ok = stale_reads_ok + self.db_dir = os.path.dirname(db_file) + self.timeout = timeout + self.logger = logger or logging.getLogger() + self.account = account + self.container = container + + def initialize(self, put_timestamp=None): + """ + Create the DB + + :param put_timestamp: timestamp of initial PUT request + """ + if self.db_file == ':memory:': + tmp_db_file = None + conn = get_db_connection(self.db_file, self.timeout) + else: + mkdirs(self.db_dir) + fd, tmp_db_file = mkstemp(suffix='.tmp', dir=self.db_dir) + os.close(fd) + conn = sqlite3.connect(tmp_db_file, check_same_thread=False, + factory=GreenDBConnection, timeout=0) + # creating dbs implicitly does a lot of transactions, so we + # pick fast, unsafe options here and do a big fsync at the end. 
+ conn.execute('PRAGMA synchronous = OFF') + conn.execute('PRAGMA temp_store = MEMORY') + conn.execute('PRAGMA journal_mode = MEMORY') + conn.create_function('chexor', 3, chexor) + conn.row_factory = sqlite3.Row + conn.text_factory = str + conn.executescript(""" + CREATE TABLE outgoing_sync ( + remote_id TEXT UNIQUE, + sync_point INTEGER, + updated_at TEXT DEFAULT 0 + ); + CREATE TABLE incoming_sync ( + remote_id TEXT UNIQUE, + sync_point INTEGER, + updated_at TEXT DEFAULT 0 + ); + CREATE TRIGGER outgoing_sync_insert AFTER INSERT ON outgoing_sync + BEGIN + UPDATE outgoing_sync + SET updated_at = STRFTIME('%s', 'NOW') + WHERE ROWID = new.ROWID; + END; + CREATE TRIGGER outgoing_sync_update AFTER UPDATE ON outgoing_sync + BEGIN + UPDATE outgoing_sync + SET updated_at = STRFTIME('%s', 'NOW') + WHERE ROWID = new.ROWID; + END; + CREATE TRIGGER incoming_sync_insert AFTER INSERT ON incoming_sync + BEGIN + UPDATE incoming_sync + SET updated_at = STRFTIME('%s', 'NOW') + WHERE ROWID = new.ROWID; + END; + CREATE TRIGGER incoming_sync_update AFTER UPDATE ON incoming_sync + BEGIN + UPDATE incoming_sync + SET updated_at = STRFTIME('%s', 'NOW') + WHERE ROWID = new.ROWID; + END; + """) + if not put_timestamp: + put_timestamp = normalize_timestamp(0) + self._initialize(conn, put_timestamp) + conn.commit() + if tmp_db_file: + conn.close() + with open(tmp_db_file, 'r+b') as fp: + os.fsync(fp.fileno()) + with lock_parent_directory(self.db_file, self.pending_timeout): + if os.path.exists(self.db_file): + # It's as if there was a "condition" where different parts + # of the system were "racing" each other. 
+ raise DatabaseConnectionError(self.db_file, + 'DB created by someone else while working?') + renamer(tmp_db_file, self.db_file) + self.conn = get_db_connection(self.db_file, self.timeout) + else: + self.conn = conn + + def delete_db(self, timestamp): + """ + Mark the DB as deleted + + :param timestamp: delete timestamp + """ + timestamp = normalize_timestamp(timestamp) + with self.get() as conn: + self._delete_db(conn, timestamp) + conn.commit() + + @contextmanager + def get(self): + """Use with the "with" statement; returns a database connection.""" + if not self.conn: + if self.db_file != ':memory:' and os.path.exists(self.db_file): + self.conn = get_db_connection(self.db_file, self.timeout) + else: + raise DatabaseConnectionError(self.db_file, "DB doesn't exist") + conn = self.conn + self.conn = None + try: + yield conn + conn.rollback() + self.conn = conn + except: + conn.close() + raise + + @contextmanager + def lock(self): + """Use with the "with" statement; locks a database.""" + if not self.conn: + if self.db_file != ':memory:' and os.path.exists(self.db_file): + self.conn = get_db_connection(self.db_file, self.timeout) + else: + raise DatabaseConnectionError(self.db_file, "DB doesn't exist") + conn = self.conn + self.conn = None + orig_isolation_level = conn.isolation_level + conn.isolation_level = None + conn.execute('BEGIN IMMEDIATE') + try: + yield True + except: + pass + try: + conn.execute('ROLLBACK') + conn.isolation_level = orig_isolation_level + self.conn = conn + except: # pragma: no cover + logging.exception( + 'Broker error trying to rollback locked connection') + conn.close() + + def newid(self, remote_id): + """ + Re-id the database. This should be called after an rsync. + + :param remote_id: the ID of the remote database being rsynced in + """ + with self.get() as conn: + row = conn.execute(''' + UPDATE %s_stat SET id=? 
+ ''' % self.db_type, (str(uuid4()),)) + row = conn.execute(''' + SELECT ROWID FROM %s ORDER BY ROWID DESC LIMIT 1 + ''' % self.db_contains_type).fetchone() + sync_point = row['ROWID'] if row else -1 + conn.execute(''' + INSERT OR REPLACE INTO incoming_sync (sync_point, remote_id) + VALUES (?, ?) + ''', (sync_point, remote_id)) + self._newid(conn) + conn.commit() + + def _newid(self, conn): + # Override for additional work when receiving an rsynced db. + pass + + def merge_timestamps(self, created_at, put_timestamp, delete_timestamp): + """ + Used in replication to handle updating timestamps. + + :param created_at: create timestamp + :param put_timestamp: put timestamp + :param delete_timestamp: delete timestamp + """ + with self.get() as conn: + row = conn.execute(''' + UPDATE %s_stat SET created_at=MIN(?, created_at), + put_timestamp=MAX(?, put_timestamp), + delete_timestamp=MAX(?, delete_timestamp) + ''' % self.db_type, (created_at, put_timestamp, delete_timestamp)) + conn.commit() + + def get_items_since(self, start, count): + """ + Get a list of objects in the database between start and end. + + :param start: start ROWID + :param count: number to get + :returns: list of objects between start and end + """ + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise + with self.get() as conn: + curs = conn.execute(''' + SELECT * FROM %s WHERE ROWID > ? ORDER BY ROWID ASC LIMIT ? + ''' % self.db_contains_type, (start, count)) + curs.row_factory = dict_factory + return [r for r in curs] + + def get_sync(self, id, incoming=True): + """ + Gets the most recent sync point for a server from the sync table. + + :param id: remote ID to get the sync_point for + :param incoming: if True, get the last incoming sync, otherwise get + the last outgoing sync + :returns: the sync point, or -1 if the id doesn't exist. + """ + with self.get() as conn: + row = conn.execute( + "SELECT sync_point FROM %s_sync WHERE remote_id=?" 
+ % ('incoming' if incoming else 'outgoing'), (id,)).fetchone() + if not row: + return -1 + return row['sync_point'] + + def get_syncs(self, incoming=True): + """ + Get a serialized copy of the sync table. + + :param incoming: if True, get the last incoming sync, otherwise get + the last outgoing sync + :returns: list of {'remote_id', 'sync_point'} + """ + with self.get() as conn: + curs = conn.execute(''' + SELECT remote_id, sync_point FROM %s_sync + ''' % 'incoming' if incoming else 'outgoing') + result = [] + for row in curs: + result.append({'remote_id': row[0], 'sync_point': row[1]}) + return result + + def get_replication_info(self): + """ + Get information about the DB required for replication. + + :returns: tuple of (hash, id, created_at, put_timestamp, + delete_timestamp) from the DB + """ + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise + with self.get() as conn: + curs = conn.execute(''' + SELECT hash, id, created_at, put_timestamp, delete_timestamp, + %s_count AS count, + CASE WHEN SQLITE_SEQUENCE.seq IS NOT NULL + THEN SQLITE_SEQUENCE.seq ELSE -1 END AS max_row + FROM (%s_stat LEFT JOIN SQLITE_SEQUENCE + ON SQLITE_SEQUENCE.name == '%s') LIMIT 1 + ''' % (self.db_contains_type, self.db_type, self.db_contains_type)) + curs.row_factory = dict_factory + return curs.fetchone() + + def _commit_puts(self): + pass # stub to be overridden if need be + + def merge_syncs(self, sync_points, incoming=True): + """ + Merge a list of sync points with the incoming sync table. + + :param sync_points: list of sync points where a sync point is a dict of + {'sync_point', 'remote_id'} + :param incoming: if True, get the last incoming sync, otherwise get + the last outgoing sync + """ + with self.get() as conn: + for rec in sync_points: + try: + conn.execute(''' + INSERT INTO %s_sync (sync_point, remote_id) + VALUES (?, ?) 
    def _preallocate(self):
        """
        The idea is to allocate space in front of an expanding db. If it gets
        within 512k of a boundary, it allocates to the next boundary.
        Boundaries are 2m, 5m, 10m, 25m, 50m, then every 50m after.
        """
        if self.db_file == ':memory:':
            return
        MB = (1024 * 1024)

        def prealloc_points():
            # infinite generator: 1m, 2m, 5m, 10m, 25m, 50m, 100m, 150m, ...
            for pm in (1, 2, 5, 10, 25, 50):
                yield pm * MB
            while True:
                pm += 50
                yield pm * MB

        stat = os.stat(self.db_file)
        file_size = stat.st_size
        # actual space on disk (in 512-byte blocks), not apparent size
        allocated_size = stat.st_blocks * 512
        for point in prealloc_points():
            # first boundary at least 512k beyond current size
            # (generator is infinite, so this always breaks)
            if file_size <= point - MB / 2:
                prealloc_size = point
                break
        if allocated_size < prealloc_size:
            with open(self.db_file, 'rb+') as fp:
                fallocate(fp.fileno(), int(prealloc_size))


class ContainerBroker(DatabaseBroker):
    """Encapsulates working with a container database."""
    db_type = 'container'
    db_contains_type = 'object'

    def _initialize(self, conn, put_timestamp):
        """Creates a brand new database (tables, indices, triggers, etc.)"""
        if not self.account:
            raise ValueError(
                'Attempting to create a new database with no account set')
        if not self.container:
            raise ValueError(
                'Attempting to create a new database with no container set')
        self.create_object_table(conn)
        self.create_container_stat_table(conn, put_timestamp)

    def create_object_table(self, conn):
        """
        Create the object table which is specific to the container DB.

        The triggers keep container_stat's object_count, bytes_used and
        rolling hash in sync with inserts/deletes; direct UPDATEs on the
        object table are forbidden (delete + insert instead).

        :param conn: DB connection object
        """
        conn.executescript("""
            CREATE TABLE object (
                ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT UNIQUE,
                created_at TEXT,
                size INTEGER,
                content_type TEXT,
                etag TEXT,
                deleted INTEGER DEFAULT 0
            );

            CREATE INDEX ix_object_deleted ON object (deleted);

            CREATE TRIGGER object_insert AFTER INSERT ON object
            BEGIN
                UPDATE container_stat
                SET object_count = object_count + (1 - new.deleted),
                    bytes_used = bytes_used + new.size,
                    hash = chexor(hash, new.name, new.created_at);
            END;

            CREATE TRIGGER object_update BEFORE UPDATE ON object
            BEGIN
                SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
            END;

            CREATE TRIGGER object_delete AFTER DELETE ON object
            BEGIN
                UPDATE container_stat
                SET object_count = object_count - (1 - old.deleted),
                    bytes_used = bytes_used - old.size,
                    hash = chexor(hash, old.name, old.created_at);
            END;
        """)

    def create_container_stat_table(self, conn, put_timestamp=None):
        """
        Create the container_stat table which is specific to the container DB.

        :param conn: DB connection object
        :param put_timestamp: put timestamp; defaults to the zero timestamp
        """
        if put_timestamp is None:
            put_timestamp = normalize_timestamp(0)
        conn.executescript("""
            CREATE TABLE container_stat (
                account TEXT,
                container TEXT,
                created_at TEXT,
                put_timestamp TEXT DEFAULT '0',
                delete_timestamp TEXT DEFAULT '0',
                object_count INTEGER,
                bytes_used INTEGER,
                reported_put_timestamp TEXT DEFAULT '0',
                reported_delete_timestamp TEXT DEFAULT '0',
                reported_object_count INTEGER DEFAULT 0,
                reported_bytes_used INTEGER DEFAULT 0,
                hash TEXT default '00000000000000000000000000000000',
                id TEXT,
                status TEXT DEFAULT '',
                status_changed_at TEXT DEFAULT '0'
            );

            INSERT INTO container_stat (object_count, bytes_used)
                VALUES (0, 0);
        """)
        conn.execute('''
            UPDATE container_stat
            SET account = ?, container = ?, created_at = ?, id = ?,
                put_timestamp = ?
        ''', (self.account, self.container, normalize_timestamp(time.time()),
              str(uuid4()), put_timestamp))
    def _newid(self, conn):
        # After taking on a new identity (post-rsync), forget what stats
        # were previously reported to the account layer.
        conn.execute('''
            UPDATE container_stat
            SET reported_put_timestamp = 0, reported_delete_timestamp = 0,
                reported_object_count = 0, reported_bytes_used = 0''')

    def update_put_timestamp(self, timestamp):
        """
        Update the put_timestamp.  Only modifies it if it is greater than
        the current timestamp.

        :param timestamp: put timestamp
        """
        with self.get() as conn:
            conn.execute('''
                UPDATE container_stat SET put_timestamp = ?
                WHERE put_timestamp < ? ''', (timestamp, timestamp))
            conn.commit()

    def _delete_db(self, conn, timestamp):
        """
        Mark the DB as deleted

        :param conn: DB connection object
        :param timestamp: timestamp to mark as deleted
        """
        conn.execute("""
            UPDATE container_stat
            SET delete_timestamp = ?,
                status = 'DELETED',
                status_changed_at = ?
            WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp))

    def empty(self):
        """
        Check if the DB is empty.

        :returns: True if the database has no active objects, False otherwise
        """
        try:
            self._commit_puts()
        except LockTimeout:
            if not self.stale_reads_ok:
                raise
        with self.get() as conn:
            row = conn.execute(
                'SELECT object_count from container_stat').fetchone()
            return (row[0] == 0)

    def _commit_puts(self, item_list=None):
        """
        Handles committing rows in .pending files.

        Entries are base64-encoded pickles joined by ':' (see put_object);
        they are decoded, merged via merge_items, and the pending file is
        truncated.  `item_list` allows extra records to be merged in the
        same pass.
        """
        if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
            return
        if item_list is None:
            item_list = []
        with lock_parent_directory(self.pending_file, self.pending_timeout):
            self._preallocate()
            if not os.path.getsize(self.pending_file):
                if item_list:
                    self.merge_items(item_list)
                return
            with open(self.pending_file, 'r+b') as fp:
                for entry in fp.read().split(':'):
                    if entry:
                        try:
                            (name, timestamp, size, content_type, etag,
                                deleted) = pickle.loads(entry.decode('base64'))
                            item_list.append({'name': name, 'created_at':
                                timestamp, 'size': size, 'content_type':
                                content_type, 'etag': etag,
                                'deleted': deleted})
                        except:
                            # corrupt pending entries are logged and skipped
                            # rather than blocking all other updates
                            self.logger.exception(
                                'Invalid pending entry %s: %s'
                                % (self.pending_file, entry))
                if item_list:
                    self.merge_items(item_list)
                try:
                    os.ftruncate(fp.fileno(), 0)
                except OSError, err:
                    if err.errno != errno.ENOENT:
                        raise

    def reclaim(self, object_timestamp, sync_timestamp):
        """
        Delete rows from the object table that are marked deleted and
        whose created_at timestamp is < object_timestamp.  Also deletes rows
        from incoming_sync and outgoing_sync where the updated_at timestamp is
        < sync_timestamp.

        :param object_timestamp: max created_at timestamp of object rows to
                                 delete
        :param sync_timestamp: max update_at timestamp of sync rows to delete
        """
        self._commit_puts()
        with self.get() as conn:
            conn.execute("""
                DELETE FROM object
                WHERE deleted = 1
                AND created_at < ?""", (object_timestamp,))
            try:
                conn.execute('''
                    DELETE FROM outgoing_sync WHERE updated_at < ?
                ''', (sync_timestamp,))
                conn.execute('''
                    DELETE FROM incoming_sync WHERE updated_at < ?
                ''', (sync_timestamp,))
            except sqlite3.OperationalError, err:
                # Old dbs didn't have updated_at in the _sync tables.
                if 'no such column: updated_at' not in str(err):
                    raise
            conn.commit()

    def delete_object(self, name, timestamp):
        """
        Mark an object deleted.

        :param name: object name to be deleted
        :param timestamp: timestamp when the object was marked as deleted
        """
        self.put_object(name, timestamp, 0, 'application/deleted', 'noetag', 1)

    def put_object(self, name, timestamp, size, content_type, etag, deleted=0):
        """
        Creates an object in the DB with its metadata.

        The record is normally appended to the .pending file (cheap) and
        merged into the DB later; it is merged immediately when the DB is
        in-memory or the pending file has grown past PENDING_CAP.

        :param name: object name to be created
        :param timestamp: timestamp of when the object was created
        :param size: object size
        :param content_type: object content-type
        :param etag: object etag
        :param deleted: if True, marks the object as deleted and sets the
                        deleted_at timestamp to timestamp
        """
        record = {'name': name, 'created_at': timestamp, 'size': size,
                  'content_type': content_type, 'etag': etag,
                  'deleted': deleted}
        if self.db_file == ':memory:':
            self.merge_items([record])
            return
        if not os.path.exists(self.db_file):
            raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
        pending_size = 0
        try:
            pending_size = os.path.getsize(self.pending_file)
        except OSError, err:
            if err.errno != errno.ENOENT:
                raise
        if pending_size > PENDING_CAP:
            # pending file is full; flush it (plus this record) into the DB
            self._commit_puts([record])
        else:
            with lock_parent_directory(
                    self.pending_file, self.pending_timeout):
                with open(self.pending_file, 'a+b') as fp:
                    # Colons aren't used in base64 encoding; so they are our
                    # delimiter
                    fp.write(':')
                    fp.write(pickle.dumps(
                        (name, timestamp, size, content_type, etag, deleted),
                        protocol=PICKLE_PROTOCOL).encode('base64'))
                    fp.flush()

    def is_deleted(self, timestamp=None):
        """
        Check if the DB is considered to be deleted.

        :param timestamp: if given, a delete_timestamp newer than this keeps
                          the DB alive as a tombstone (consistency window)
        :returns: True if the DB is considered to be deleted, False otherwise
        """
        if self.db_file != ':memory:' and not os.path.exists(self.db_file):
            return True
        try:
            self._commit_puts()
        except LockTimeout:
            if not self.stale_reads_ok:
                raise
        with self.get() as conn:
            row = conn.execute('''
                SELECT put_timestamp, delete_timestamp, object_count
                FROM container_stat''').fetchone()
            # leave this db as a tombstone for a consistency window
            if timestamp and row['delete_timestamp'] > timestamp:
                return False
            # The container is considered deleted if the delete_timestamp
            # value is greater than the put_timestamp, and there are no
            # objects in the container.
            return (row['object_count'] in (None, '', 0, '0')) and \
                (float(row['delete_timestamp']) > float(row['put_timestamp']))
    def get_info(self):
        """
        Get global data for the container.

        :returns: a tuple of (account, container, created_at, put_timestamp,
                  delete_timestamp, object_count, bytes_used,
                  reported_put_timestamp, reported_delete_timestamp,
                  reported_object_count, reported_bytes_used, hash, id)
        """
        try:
            self._commit_puts()
        except LockTimeout:
            if not self.stale_reads_ok:
                raise
        with self.get() as conn:
            return conn.execute('''
                SELECT account, container, created_at, put_timestamp,
                    delete_timestamp, object_count, bytes_used,
                    reported_put_timestamp, reported_delete_timestamp,
                    reported_object_count, reported_bytes_used, hash, id
                FROM container_stat
            ''').fetchone()

    def reported(self, put_timestamp, delete_timestamp, object_count,
                 bytes_used):
        """
        Update reported stats (what has been told to the account layer).

        :param put_timestamp: put_timestamp to update
        :param delete_timestamp: delete_timestamp to update
        :param object_count: object_count to update
        :param bytes_used: bytes_used to update
        """
        with self.get() as conn:
            conn.execute('''
                UPDATE container_stat
                SET reported_put_timestamp = ?, reported_delete_timestamp = ?,
                    reported_object_count = ?, reported_bytes_used = ?
            ''', (put_timestamp, delete_timestamp, object_count, bytes_used))
            conn.commit()

    def get_random_objects(self, max_count=100):
        """
        Get random objects from the DB.  This is used by the container_auditor
        when testing random objects for existence.

        :param max_count: maximum number of objects to get

        :returns: list of object names (deduplicated, so possibly fewer than
                  max_count)
        """
        try:
            self._commit_puts()
        except LockTimeout:
            if not self.stale_reads_ok:
                raise
        rv = []
        with self.get() as conn:
            row = conn.execute('''
                SELECT ROWID FROM object ORDER BY ROWID DESC LIMIT 1
            ''').fetchone()
            if not row:
                return []
            max_rowid = row['ROWID']
            for _ in xrange(min(max_count, max_rowid)):
                # pick a random rowid; '>=' skips over holes left by deletes
                row = conn.execute('''
                    SELECT name FROM object WHERE ROWID >= ? AND +deleted = 0
                    LIMIT 1
                ''', (randint(0, max_rowid),)).fetchone()
                if row:
                    rv.append(row['name'])
        return list(set(rv))

    def list_objects_iter(self, limit, marker, prefix, delimiter, path=None,
                          format=None):
        """
        Get a list of objects sorted by name starting at marker onward, up
        to limit entries.  Entries will begin with the prefix and will not
        have the delimiter after the prefix.

        When a delimiter is in play, names containing the delimiter past the
        prefix are rolled up into a single "directory" entry of the form
        [dir_name, '0', 0, None, ''] and the query resumes just past that
        subtree (marker is bumped to prefix + chr(delimiter + 1)).

        :param limit: maximum number of entries to get
        :param marker: marker query
        :param prefix: prefix query
        :param delimiter: delimiter for query
        :param path: if defined, will set the prefix and delimiter based on
                     the path
        :param format: TODO: remove as it is no longer used

        :returns: list of tuples of (name, created_at, size, content_type,
                  etag)
        """
        try:
            self._commit_puts()
        except LockTimeout:
            if not self.stale_reads_ok:
                raise
        if path is not None:
            # path mode implies a '/' delimiter and a normalized 'path/'
            # prefix ('' path means the container root)
            prefix = path
            if path:
                prefix = path = path.rstrip('/') + '/'
            delimiter = '/'
        elif delimiter and not prefix:
            prefix = ''
        orig_marker = marker
        with self.get() as conn:
            results = []
            while len(results) < limit:
                query = '''SELECT name, created_at, size, content_type, etag
                    FROM object WHERE'''
                query_args = []
                if marker and marker >= prefix:
                    query += ' name > ? AND'
                    query_args.append(marker)
                elif prefix:
                    query += ' name >= ? AND'
                    query_args.append(prefix)
                # '+deleted' defeats the index so sqlite orders by name
                query += ' +deleted = 0 ORDER BY name LIMIT ?'
                query_args.append(limit - len(results))
                curs = conn.execute(query, query_args)
                curs.row_factory = None

                if prefix is None:
                    return [r for r in curs]
                if not delimiter:
                    return [r for r in curs if r[0].startswith(prefix)]
                rowcount = 0
                for row in curs:
                    rowcount += 1
                    marker = name = row[0]
                    if len(results) >= limit or not name.startswith(prefix):
                        curs.close()
                        return results
                    end = name.find(delimiter, len(prefix))
                    if path is not None:
                        if name == path:
                            continue
                        if end >= 0 and len(name) > end + len(delimiter):
                            # nested subdirectory: skip its whole subtree
                            marker = name[:end] + chr(ord(delimiter) + 1)
                            curs.close()
                            break
                    elif end > 0:
                        # roll the subtree up into one directory entry and
                        # restart the query past it
                        marker = name[:end] + chr(ord(delimiter) + 1)
                        dir_name = name[:end + 1]
                        if dir_name != orig_marker:
                            results.append([dir_name, '0', 0, None, ''])
                        curs.close()
                        break
                    results.append(row)
                if not rowcount:
                    break
            return results

    def merge_items(self, item_list, source=None):
        """
        Merge items into the object table.

        Newer records replace older rows of the same name (delete + insert,
        since UPDATE is trigger-blocked); older duplicates are dropped via
        the IntegrityError on the UNIQUE name column.

        :param item_list: list of dictionaries of {'name', 'created_at',
                          'size', 'content_type', 'etag', 'deleted'}
        :param source: if defined, update incoming_sync with the source
        """
        with self.get() as conn:
            max_rowid = -1
            for rec in item_list:
                curs = conn.execute('''
                    DELETE FROM object WHERE name = ? AND
                        (created_at < ?)
                ''', (rec['name'], rec['created_at']))
                try:
                    conn.execute('''
                        INSERT INTO object (name, created_at, size,
                            content_type, etag, deleted)
                        VALUES (?, ?, ?, ?, ?, ?)
                    ''', ([rec['name'], rec['created_at'], rec['size'],
                        rec['content_type'], rec['etag'], rec['deleted']]))
                except sqlite3.IntegrityError:
                    # an equal-or-newer row for this name already exists
                    pass
                if source:
                    max_rowid = max(max_rowid, rec['ROWID'])
            if source:
                # record how far we've merged from this replication source
                try:
                    conn.execute('''
                        INSERT INTO incoming_sync (sync_point, remote_id)
                        VALUES (?, ?)
                    ''', (max_rowid, source))
                except sqlite3.IntegrityError:
                    conn.execute('''
                        UPDATE incoming_sync SET sync_point=max(?, sync_point)
                        WHERE remote_id=?
                    ''', (max_rowid, source))
            conn.commit()
+ + :param item_list: list of dictionaries of {'name', 'created_at', + 'size', 'content_type', 'etag', 'deleted'} + :param source: if defined, update incoming_sync with the source + """ + with self.get() as conn: + max_rowid = -1 + for rec in item_list: + curs = conn.execute(''' + DELETE FROM object WHERE name = ? AND + (created_at < ?) + ''', (rec['name'], rec['created_at'])) + try: + conn.execute(''' + INSERT INTO object (name, created_at, size, + content_type, etag, deleted) + VALUES (?, ?, ?, ?, ?, ?) + ''', ([rec['name'], rec['created_at'], rec['size'], + rec['content_type'], rec['etag'], rec['deleted']])) + except sqlite3.IntegrityError: + pass + if source: + max_rowid = max(max_rowid, rec['ROWID']) + if source: + try: + conn.execute(''' + INSERT INTO incoming_sync (sync_point, remote_id) + VALUES (?, ?) + ''', (max_rowid, source)) + except sqlite3.IntegrityError: + conn.execute(''' + UPDATE incoming_sync SET sync_point=max(?, sync_point) + WHERE remote_id=? + ''', (max_rowid, source)) + conn.commit() + + +class AccountBroker(DatabaseBroker): + """Encapsulates working with a account database.""" + db_type = 'account' + db_contains_type = 'container' + + def _initialize(self, conn, put_timestamp): + """ + Create a brand new database (tables, indices, triggers, etc.) + + :param conn: DB connection object + :param put_timestamp: put timestamp + """ + if not self.account: + raise ValueError( + 'Attempting to create a new database with no account set') + self.create_container_table(conn) + self.create_account_stat_table(conn, put_timestamp) + + def create_container_table(self, conn): + """ + Create container table which is specific to the account DB. 
+ + :param conn: DB connection object + """ + conn.executescript(""" + CREATE TABLE container ( + ROWID INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT UNIQUE, + put_timestamp TEXT, + delete_timestamp TEXT, + object_count INTEGER, + bytes_used INTEGER, + deleted INTEGER DEFAULT 0 + ); + + CREATE INDEX ix_container_deleted ON container (deleted); + CREATE INDEX ix_container_name ON container (name); + CREATE TRIGGER container_insert AFTER INSERT ON container + BEGIN + UPDATE account_stat + SET container_count = container_count + (1 - new.deleted), + object_count = object_count + new.object_count, + bytes_used = bytes_used + new.bytes_used, + hash = chexor(hash, new.name, + new.put_timestamp || '-' || + new.delete_timestamp || '-' || + new.object_count || '-' || new.bytes_used); + END; + + CREATE TRIGGER container_update BEFORE UPDATE ON container + BEGIN + SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT'); + END; + + + CREATE TRIGGER container_delete AFTER DELETE ON container + BEGIN + UPDATE account_stat + SET container_count = container_count - (1 - old.deleted), + object_count = object_count - old.object_count, + bytes_used = bytes_used - old.bytes_used, + hash = chexor(hash, old.name, + old.put_timestamp || '-' || + old.delete_timestamp || '-' || + old.object_count || '-' || old.bytes_used); + END; + """) + + def create_account_stat_table(self, conn, put_timestamp): + """ + Create account_stat table which is specific to the account DB. 
+ + :param conn: DB connection object + :param put_timestamp: put timestamp + """ + conn.executescript(""" + CREATE TABLE account_stat ( + account TEXT, + created_at TEXT, + put_timestamp TEXT DEFAULT '0', + delete_timestamp TEXT DEFAULT '0', + container_count INTEGER, + object_count INTEGER DEFAULT 0, + bytes_used INTEGER DEFAULT 0, + hash TEXT default '00000000000000000000000000000000', + id TEXT, + status TEXT DEFAULT '', + status_changed_at TEXT DEFAULT '0' + ); + + INSERT INTO account_stat (container_count) VALUES (0); + """) + + conn.execute(''' + UPDATE account_stat SET account = ?, created_at = ?, id = ?, + put_timestamp = ? + ''', (self.account, normalize_timestamp(time.time()), str(uuid4()), + put_timestamp)) + + def update_put_timestamp(self, timestamp): + """ + Update the put_timestamp. Only modifies it if it is greater than + the current timestamp. + + :param timestamp: put timestamp + """ + with self.get() as conn: + conn.execute(''' + UPDATE account_stat SET put_timestamp = ? + WHERE put_timestamp < ? ''', (timestamp, timestamp)) + conn.commit() + + def _delete_db(self, conn, timestamp, force=False): + """ + Mark the DB as deleted. + + :param conn: DB connection object + :param timestamp: timestamp to mark as deleted + """ + conn.execute(""" + UPDATE account_stat + SET delete_timestamp = ?, + status = 'DELETED', + status_changed_at = ? + WHERE delete_timestamp < ? 
""", (timestamp, timestamp, timestamp)) + + def _commit_puts(self, item_list=None): + """Handles commiting rows in .pending files.""" + if self.db_file == ':memory:' or not os.path.exists(self.pending_file): + return + if item_list is None: + item_list = [] + with lock_parent_directory(self.pending_file, self.pending_timeout): + self._preallocate() + if not os.path.getsize(self.pending_file): + if item_list: + self.merge_items(item_list) + return + with open(self.pending_file, 'r+b') as fp: + for entry in fp.read().split(':'): + if entry: + try: + (name, put_timestamp, delete_timestamp, + object_count, bytes_used, deleted) = \ + pickle.loads(entry.decode('base64')) + item_list.append({'name': name, + 'put_timestamp': put_timestamp, + 'delete_timestamp': delete_timestamp, + 'object_count': object_count, + 'bytes_used': bytes_used, + 'deleted': deleted}) + except: + self.logger.exception( + 'Invalid pending entry %s: %s' + % (self.pending_file, entry)) + if item_list: + self.merge_items(item_list) + try: + os.ftruncate(fp.fileno(), 0) + except OSError, err: + if err.errno != errno.ENOENT: + raise + + def empty(self): + """ + Check if the account DB is empty. + + :returns: True if the database has no active containers. + """ + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise + with self.get() as conn: + row = conn.execute( + 'SELECT container_count from account_stat').fetchone() + return (row[0] == 0) + + def reclaim(self, container_timestamp, sync_timestamp): + """ + Delete rows from the container table that are marked deleted and + whose created_at timestamp is < object_timestamp. Also deletes rows + from incoming_sync and outgoing_sync where the updated_at timestamp is + < sync_timestamp. 
+ + :param object_timestamp: max created_at timestamp of container rows to + delete + :param sync_timestamp: max update_at timestamp of sync rows to delete + """ + + self._commit_puts() + with self.get() as conn: + conn.execute(''' + DELETE FROM container WHERE + deleted = 1 AND delete_timestamp < ? + ''', (container_timestamp,)) + try: + conn.execute(''' + DELETE FROM outgoing_sync WHERE updated_at < ? + ''', (sync_timestamp,)) + conn.execute(''' + DELETE FROM incoming_sync WHERE updated_at < ? + ''', (sync_timestamp,)) + except sqlite3.OperationalError, err: + # Old dbs didn't have updated_at in the _sync tables. + if 'no such column: updated_at' not in str(err): + raise + conn.commit() + + def get_container_timestamp(self, container_name): + """ + Get the put_timestamp of a container. + + :param container_name: container name + + :returns: put_timestamp of the container + """ + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise + with self.get() as conn: + ret = conn.execute(''' + SELECT put_timestamp FROM container + WHERE name = ? AND deleted != 1''', + (container_name,)).fetchone() + if ret: + ret = ret[0] + return ret + + def put_container(self, name, put_timestamp, delete_timestamp, + object_count, bytes_used): + """ + Create a container with the given attributes. 
+ + :param name: name of the container to create + :param put_timestamp: put_timestamp of the container to create + :param delete_timestamp: delete_timestamp of the container to create + :param object_count: number of objects in the container + :param bytes_used: number of bytes used by the container + """ + if delete_timestamp > put_timestamp and \ + object_count in (None, '', 0, '0'): + deleted = 1 + else: + deleted = 0 + record = {'name': name, 'put_timestamp': put_timestamp, + 'delete_timestamp': delete_timestamp, + 'object_count': object_count, + 'bytes_used': bytes_used, + 'deleted': deleted} + if self.db_file == ':memory:': + self.merge_items([record]) + return + commit = False + with lock_parent_directory(self.pending_file, self.pending_timeout): + with open(self.pending_file, 'a+b') as fp: + # Colons aren't used in base64 encoding; so they are our + # delimiter + fp.write(':') + fp.write(pickle.dumps( + (name, put_timestamp, delete_timestamp, object_count, + bytes_used, deleted), + protocol=PICKLE_PROTOCOL).encode('base64')) + fp.flush() + if fp.tell() > PENDING_CAP: + commit = True + if commit: + self._commit_puts() + + def can_delete_db(self, cutoff): + """ + Check if the accont DB can be deleted. 
+ + :returns: True if the account can be deleted, False otherwise + """ + self._commit_puts() + with self.get() as conn: + row = conn.execute(''' + SELECT status, put_timestamp, delete_timestamp, container_count + FROM account_stat''').fetchone() + # The account is considered deleted if its status is marked + # as 'DELETED" and the delete_timestamp is older than the supplied + # cutoff date; or if the delete_timestamp value is greater than + # the put_timestamp, and there are no containers for the account + status_del = (row['status'] == 'DELETED') + deltime = float(row['delete_timestamp']) + past_cutoff = (deltime < cutoff) + time_later = (row['delete_timestamp'] > row['put_timestamp']) + no_containers = (row['container_count'] in (None, '', 0, '0')) + return ( + (status_del and past_cutoff) or (time_later and no_containers)) + + def is_deleted(self): + """ + Check if the account DB is considered to be deleted. + + :returns: True if the account DB is considered to be deleted, False + otherwise + """ + if self.db_file != ':memory:' and not os.path.exists(self.db_file): + return True + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise + with self.get() as conn: + row = conn.execute(''' + SELECT put_timestamp, delete_timestamp, container_count, status + FROM account_stat''').fetchone() + return row['status'] == 'DELETED' or ( + row['container_count'] in (None, '', 0, '0') and + row['delete_timestamp'] > row['put_timestamp']) + + def is_status_deleted(self): + """Only returns true if the status field is set to DELETED.""" + with self.get() as conn: + row = conn.execute(''' + SELECT status + FROM account_stat''').fetchone() + return (row['status'] == "DELETED") + + def get_info(self): + """ + Get global data for the account. 
+ + :returns: a tuple of (account, created_at, put_timestamp, + delete_timestamp, container_count, object_count, + bytes_used, hash, id) + """ + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise + with self.get() as conn: + return conn.execute(''' + SELECT account, created_at, put_timestamp, delete_timestamp, + container_count, object_count, bytes_used, hash, id + FROM account_stat + ''').fetchone() + + def get_random_containers(self, max_count=100): + """ + Get random containers from the DB. This is used by the + account_auditor when testing random containerss for existence. + + :param max_count: maximum number of containers to get + + :returns: list of container names + """ + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise + rv = [] + with self.get() as conn: + row = conn.execute(''' + SELECT ROWID FROM container ORDER BY ROWID DESC LIMIT 1 + ''').fetchone() + if not row: + return [] + max_rowid = row['ROWID'] + for _ in xrange(min(max_count, max_rowid)): + row = conn.execute(''' + SELECT name FROM container WHERE + ROWID >= ? AND +deleted = 0 + LIMIT 1 + ''', (randint(0, max_rowid),)).fetchone() + if row: + rv.append(row['name']) + return list(set(rv)) + + def list_containers_iter(self, limit, marker, prefix, delimiter): + """ + Get a list of containerss sorted by name starting at marker onward, up + to limit entries. Entries will begin with the prefix and will not + have the delimiter after the prefix. 
+ + :param limit: maximum number of entries to get + :param marker: marker query + :param prefix: prefix query + :param delimeter: delimeter for query + + :returns: list of tuples of (name, object_count, bytes_used, 0) + """ + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise + if delimiter and not prefix: + prefix = '' + orig_marker = marker + with self.get() as conn: + results = [] + while len(results) < limit: + query = """ + SELECT name, object_count, bytes_used, 0 + FROM container + WHERE deleted = 0 AND """ + query_args = [] + if marker and marker >= prefix: + query += ' name > ? AND' + query_args.append(marker) + elif prefix: + query += ' name >= ? AND' + query_args.append(prefix) + query += ' +deleted = 0 ORDER BY name LIMIT ?' + query_args.append(limit - len(results)) + curs = conn.execute(query, query_args) + curs.row_factory = None + + if prefix is None: + return [r for r in curs] + if not delimiter: + return [r for r in curs if r[0].startswith(prefix)] + rowcount = 0 + for row in curs: + rowcount += 1 + marker = name = row[0] + if len(results) >= limit or not name.startswith(prefix): + curs.close() + return results + end = name.find(delimiter, len(prefix)) + if end > 0: + marker = name[:end] + chr(ord(delimiter) + 1) + dir_name = name[:end + 1] + if dir_name != orig_marker: + results.append([dir_name, 0, 0, 1]) + curs.close() + break + results.append(row) + if not rowcount: + break + return results + + def merge_items(self, item_list, source=None): + """ + Merge items into the container table. 
+ + :param item_list: list of dictionaries of {'name', 'put_timestamp', + 'delete_timestamp', 'object_count', 'bytes_used', + 'deleted'} + :param source: if defined, update incoming_sync with the source + """ + with self.get() as conn: + max_rowid = -1 + for rec in item_list: + record = [rec['name'], rec['put_timestamp'], + rec['delete_timestamp'], rec['object_count'], + rec['bytes_used'], rec['deleted']] + try: + conn.execute(''' + INSERT INTO container (name, put_timestamp, + delete_timestamp, object_count, bytes_used, + deleted) + VALUES (?, ?, ?, ?, ?, ?) + ''', record) + except sqlite3.IntegrityError: + curs = conn.execute(''' + SELECT name, put_timestamp, delete_timestamp, + object_count, bytes_used, deleted + FROM container WHERE name = ? AND + (put_timestamp < ? OR delete_timestamp < ? OR + object_count != ? OR bytes_used != ?)''', + (rec['name'], rec['put_timestamp'], + rec['delete_timestamp'], rec['object_count'], + rec['bytes_used'])) + curs.row_factory = None + row = curs.fetchone() + if row: + row = list(row) + for i in xrange(5): + if record[i] is None and row[i] is not None: + record[i] = row[i] + if row[1] > record[1]: # Keep newest put_timestamp + record[1] = row[1] + if row[2] > record[2]: # Keep newest delete_timestamp + record[2] = row[2] + conn.execute('DELETE FROM container WHERE name = ?', + (record[0],)) + # If deleted, mark as such + if record[2] > record[1] and \ + record[3] in (None, '', 0, '0'): + record[5] = 1 + else: + record[5] = 0 + try: + conn.execute(''' + INSERT INTO container (name, put_timestamp, + delete_timestamp, object_count, bytes_used, + deleted) + VALUES (?, ?, ?, ?, ?, ?) + ''', record) + except sqlite3.IntegrityError: + continue + if source: + max_rowid = max(max_rowid, rec['ROWID']) + if source: + try: + conn.execute(''' + INSERT INTO incoming_sync (sync_point, remote_id) + VALUES (?, ?) 
+ ''', (max_rowid, source)) + except sqlite3.IntegrityError: + conn.execute(''' + UPDATE incoming_sync SET sync_point=max(?, sync_point) + WHERE remote_id=? + ''', (max_rowid, source)) + conn.commit() diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py new file mode 100644 index 000000000..b8df33038 --- /dev/null +++ b/swift/common/db_replicator.py @@ -0,0 +1,526 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import with_statement +import sys +import os +import random +import math +import time +import shutil + +from eventlet import GreenPool, sleep, Timeout +from eventlet.green import subprocess +import simplejson +from webob import Response +from webob.exc import HTTPNotFound, HTTPNoContent, HTTPAccepted, \ + HTTPInsufficientStorage, HTTPBadRequest + +from swift.common.utils import get_logger, whataremyips, storage_directory, \ + renamer, mkdirs, lock_parent_directory, unlink_older_than, LoggerFileObject +from swift.common import ring +from swift.common.bufferedhttp import BufferedHTTPConnection +from swift.common.exceptions import DriveNotMounted, ConnectionTimeout + + +def quarantine_db(object_file, server_type): + """ + In the case that a corrupt file is found, move it to a quarantined area to + allow replication to fix it. 
+ + :param object_file: path to corrupt file + :param server_type: type of file that is corrupt + ('container' or 'account') + """ + object_dir = os.path.dirname(object_file) + quarantine_dir = os.path.abspath(os.path.join(object_dir, '..', + '..', '..', '..', 'quarantined', server_type + 's', + os.path.basename(object_dir))) + renamer(object_dir, quarantine_dir) + + +class ReplConnection(BufferedHTTPConnection): + """ + Helper to simplify POSTing to a remote server. + """ + def __init__(self, node, partition, hash_, logger): + "" + self.logger = logger + self.node = node + BufferedHTTPConnection.__init__(self, '%(ip)s:%(port)s' % node) + self.path = '/%s/%s/%s' % (node['device'], partition, hash_) + + def post(self, *args): + """ + Make an HTTP POST request + + :param args: list of json-encodable objects + + :returns: httplib response object + """ + try: + body = simplejson.dumps(args) + self.request('POST', self.path, body, + {'Content-Type': 'application/json'}) + response = self.getresponse() + response.data = response.read() + return response + except: + self.logger.exception( + 'ERROR reading HTTP response from %s' % self.node) + return None + + +class Replicator(object): + """ + Implements the logic for directing db replication. 
+ """ + + def __init__(self, server_conf, replicator_conf): + self.logger = \ + get_logger(replicator_conf, '%s-replicator' % self.server_type) + # log uncaught exceptions + sys.excepthook = lambda *exc_info: \ + self.logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info) + sys.stdout = sys.stderr = LoggerFileObject(self.logger) + self.root = server_conf.get('devices', '/srv/node') + self.mount_check = server_conf.get('mount_check', 'true').lower() in \ + ('true', 't', '1', 'on', 'yes', 'y') + self.port = int(server_conf.get('bind_port', self.default_port)) + concurrency = int(replicator_conf.get('concurrency', 8)) + self.cpool = GreenPool(size=concurrency) + swift_dir = server_conf.get('swift_dir', '/etc/swift') + self.ring = ring.Ring(os.path.join(swift_dir, self.ring_file)) + self.per_diff = int(replicator_conf.get('per_diff', 1000)) + self.run_pause = int(replicator_conf.get('run_pause', 30)) + self.vm_test_mode = replicator_conf.get( + 'vm_test_mode', 'no').lower() in ('yes', 'true', 'on', '1') + self.node_timeout = int(replicator_conf.get('node_timeout', 10)) + self.conn_timeout = float(replicator_conf.get('conn_timeout', 0.5)) + self.reclaim_age = float(replicator_conf.get('reclaim_age', 86400 * 7)) + self._zero_stats() + + def _zero_stats(self): + """Zero out the stats.""" + self.stats = {'attempted': 0, 'success': 0, 'failure': 0, 'ts_repl': 0, + 'no_change': 0, 'hashmatch': 0, 'rsync': 0, 'diff': 0, + 'remove': 0, 'empty': 0, 'remote_merge': 0, + 'start': time.time()} + + def _report_stats(self): + """Report the current stats to the logs.""" + self.logger.info( + 'Attempted to replicate %d dbs in %.5f seconds (%.5f/s)' + % (self.stats['attempted'], time.time() - self.stats['start'], + self.stats['attempted'] / + (time.time() - self.stats['start'] + 0.0000001))) + self.logger.info('Removed %(remove)d dbs' % self.stats) + self.logger.info('%(success)s successes, %(failure)s failures' + % self.stats) + self.logger.info(' '.join(['%s:%s' % item for item in 
+ self.stats.items() if item[0] in + ('no_change', 'hashmatch', 'rsync', 'diff', 'ts_repl', 'empty')])) + + def _rsync_file(self, db_file, remote_file, whole_file=True): + """ + Sync a single file using rsync. Used by _rsync_db to handle syncing. + + :param db_file: file to be synced + :param remote_file: remote location to sync the DB file to + :param whole-file: if True, uses rsync's --whole-file flag + + :returns: True if the sync was successful, False otherwise + """ + popen_args = ['rsync', '--quiet', '--no-motd', + '--timeout=%s' % int(math.ceil(self.node_timeout)), + '--contimeout=%s' % int(math.ceil(self.conn_timeout))] + if whole_file: + popen_args.append('--whole-file') + popen_args.extend([db_file, remote_file]) + proc = subprocess.Popen(popen_args) + proc.communicate() + if proc.returncode != 0: + self.logger.error('ERROR rsync failed with %s: %s' % + (proc.returncode, popen_args)) + return proc.returncode == 0 + + def _rsync_db(self, broker, device, http, local_id, + post_method='complete_rsync', post_timeout=None): + """ + Sync a whole db using rsync. 
+ + :param broker: DB broker object of DB to be synced + :param device: device to sync to + :param http: ReplConnection object + :param local_id: unique ID of the local database replica + :param post_method: remote operation to perform after rsync + :param post_timeout: timeout to wait in seconds + """ + if self.vm_test_mode: + remote_file = '%s::%s%s/%s/tmp/%s' % (device['ip'], + self.server_type, device['port'], device['device'], + local_id) + else: + remote_file = '%s::%s/%s/tmp/%s' % (device['ip'], + self.server_type, device['device'], local_id) + mtime = os.path.getmtime(broker.db_file) + if not self._rsync_file(broker.db_file, remote_file): + return False + # perform block-level sync if the db was modified during the first sync + if os.path.exists(broker.db_file + '-journal') or \ + os.path.getmtime(broker.db_file) > mtime: + # grab a lock so nobody else can modify it + with broker.lock(): + if not self._rsync_file(broker.db_file, remote_file, False): + return False + with Timeout(post_timeout or self.node_timeout): + response = http.post(post_method, local_id) + return response and response.status >= 200 and response.status < 300 + + def _usync_db(self, point, broker, http, remote_id, local_id): + """ + Sync a db by sending all records since the last sync. 
+ + :param point: synchronization high water mark between the replicas + :param broker: database broker object + :param http: ReplConnection object for the remote server + :param remote_id: database id for the remote replica + :param local_id: database id for the local replica + + :returns: boolean indicating completion and success + """ + self.stats['diff'] += 1 + self.logger.debug('Syncing chunks with %s', http.host) + sync_table = broker.get_syncs() + objects = broker.get_items_since(point, self.per_diff) + while len(objects): + with Timeout(self.node_timeout): + response = http.post('merge_items', objects, local_id) + if not response or response.status >= 300 or response.status < 200: + if response: + self.logger.error('ERROR Bad response %s from %s' % + (response.status, http.host)) + return False + point = objects[-1]['ROWID'] + objects = broker.get_items_since(point, self.per_diff) + with Timeout(self.node_timeout): + response = http.post('merge_syncs', sync_table) + if response and response.status >= 200 and response.status < 300: + broker.merge_syncs([{'remote_id': remote_id, + 'sync_point': point}], incoming=False) + return True + return False + + def _in_sync(self, rinfo, info, broker, local_sync): + """ + Determine whether or not two replicas of a databases are considered + to be in sync. 
+ + :param rinfo: remote database info + :param info: local database info + :param broker: database broker object + :param local_sync: cached last sync point between replicas + + :returns: boolean indicating whether or not the replicas are in sync + """ + if max(rinfo['point'], local_sync) >= info['max_row']: + self.stats['no_change'] += 1 + return True + if rinfo['hash'] == info['hash']: + self.stats['hashmatch'] += 1 + broker.merge_syncs([{'remote_id': rinfo['id'], + 'sync_point': rinfo['point']}], incoming=False) + return True + + def _http_connect(self, node, partition, db_file): + """ + Make an http_connection using ReplConnection + + :param node: node dictionary from the ring + :param partition: partition partition to send in the url + :param db_file: DB file + + :returns: ReplConnection object + """ + return ReplConnection(node, partition, + os.path.basename(db_file).split('.', 1)[0], self.logger) + + def _repl_to_node(self, node, broker, partition, info): + """ + Replicate a database to a node. 
+ + :param node: node dictionary from the ring to be replicated to + :param broker: DB broker for the DB to be replication + :param partition: partition on the node to replicate to + :param info: DB info as a dictionary of {'max_row', 'hash', 'id', + 'created_at', 'put_timestamp', 'delete_timestamp'} + + :returns: True if successful, False otherwise + """ + with ConnectionTimeout(self.conn_timeout): + http = self._http_connect(node, partition, broker.db_file) + if not http: + self.logger.error( + 'ERROR Unable to connect to remote server: %s' % node) + return False + with Timeout(self.node_timeout): + response = http.post('sync', info['max_row'], info['hash'], + info['id'], info['created_at'], info['put_timestamp'], + info['delete_timestamp']) + if not response: + return False + elif response.status == HTTPNotFound.code: # completely missing, rsync + self.stats['rsync'] += 1 + return self._rsync_db(broker, node, http, info['id']) + elif response.status == HTTPInsufficientStorage.code: + raise DriveNotMounted() + elif response.status >= 200 and response.status < 300: + rinfo = simplejson.loads(response.data) + local_sync = broker.get_sync(rinfo['id'], incoming=False) + if self._in_sync(rinfo, info, broker, local_sync): + return True + # if the difference in rowids between the two differs by + # more than 50%, rsync then do a remote merge. + if rinfo['max_row'] / float(info['max_row']) < 0.5: + self.stats['remote_merge'] += 1 + return self._rsync_db(broker, node, http, info['id'], + post_method='rsync_then_merge', + post_timeout=(info['count'] / 2000)) + # else send diffs over to the remote server + return self._usync_db(max(rinfo['point'], local_sync), + broker, http, rinfo['id'], info['id']) + + def _replicate_object(self, partition, object_file, node_id): + """ + Replicate the db, choosing method based on whether or not it + already exists on peers. 
+ + :param partition: partition to be replicated to + :param object_file: DB file name to be replicated + :param node_id: node id of the node to be replicated to + """ + self.logger.debug('Replicating db %s' % object_file) + self.stats['attempted'] += 1 + try: + broker = self.brokerclass(object_file, pending_timeout=30) + broker.reclaim(time.time() - self.reclaim_age, + time.time() - (self.reclaim_age * 2)) + info = broker.get_replication_info() + except Exception, e: + if 'no such table' in str(e): + self.logger.error('Quarantining DB %s' % object_file) + quarantine_db(broker.db_file, broker.db_type) + else: + self.logger.exception('ERROR reading db %s' % object_file) + self.stats['failure'] += 1 + return + # The db is considered deleted if the delete_timestamp value is greater + # than the put_timestamp, and there are no objects. + delete_timestamp = 0 + try: + delete_timestamp = float(info['delete_timestamp']) + except ValueError: + pass + put_timestamp = 0 + try: + put_timestamp = float(info['put_timestamp']) + except ValueError: + pass + if delete_timestamp < (time.time() - self.reclaim_age) and \ + delete_timestamp > put_timestamp and \ + info['count'] in (None, '', 0, '0'): + with lock_parent_directory(object_file): + shutil.rmtree(os.path.dirname(object_file), True) + self.stats['remove'] += 1 + return + responses = [] + nodes = self.ring.get_part_nodes(int(partition)) + shouldbehere = bool([n for n in nodes if n['id'] == node_id]) + repl_nodes = [n for n in nodes if n['id'] != node_id] + more_nodes = self.ring.get_more_nodes(int(partition)) + for node in repl_nodes: + success = False + try: + success = self._repl_to_node(node, broker, partition, info) + except DriveNotMounted: + repl_nodes.append(more_nodes.next()) + self.logger.error('ERROR Remote drive not mounted %s' % node) + except: + self.logger.exception('ERROR syncing %s with node %s' % + (object_file, node)) + self.stats['success' if success else 'failure'] += 1 + responses.append(success) + if 
not shouldbehere and all(responses): + # If the db shouldn't be on this node and has been successfully + # synced to all of its peers, it can be removed. + with lock_parent_directory(object_file): + shutil.rmtree(os.path.dirname(object_file), True) + self.stats['remove'] += 1 + + def roundrobin_datadirs(self, datadirs): + """ + Generator to walk the data dirs in a round robin manner, evenly + hitting each device on the system. + + :param datadirs: a list of paths to walk + """ + def walk_datadir(datadir, node_id): + partitions = os.listdir(datadir) + random.shuffle(partitions) + for partition in partitions: + part_dir = os.path.join(datadir, partition) + for root, dirs, files in os.walk(part_dir, topdown=False): + for fname in (f for f in files if f.endswith('.db')): + object_file = os.path.join(root, fname) + yield (partition, object_file, node_id) + its = [walk_datadir(datadir, node_id) for datadir, node_id in datadirs] + while its: + for it in its: + try: + yield it.next() + except StopIteration: + its.remove(it) + + def replicate_once(self): + """Run a replication pass once.""" + self._zero_stats() + dirs = [] + ips = whataremyips() + if not ips: + self.logger.error('ERROR Failed to get my own IPs?') + return + for node in self.ring.devs: + if node and node['ip'] in ips and node['port'] == self.port: + if self.mount_check and not os.path.ismount( + os.path.join(self.root, node['device'])): + self.logger.warn( + 'Skipping %(device)s as it is not mounted' % node) + continue + unlink_older_than( + os.path.join(self.root, node['device'], 'tmp'), + time.time() - self.reclaim_age) + datadir = os.path.join(self.root, node['device'], self.datadir) + if os.path.isdir(datadir): + dirs.append((datadir, node['id'])) + self.logger.info('Beginning replication run') + for part, object_file, node_id in self.roundrobin_datadirs(dirs): + self.cpool.spawn_n( + self._replicate_object, part, object_file, node_id) + self.cpool.waitall() + self.logger.info('Replication run OVER') + 
self._report_stats() + + def replicate_forever(self): + """ + Replicate dbs under the given root in an infinite loop. + """ + while True: + try: + self.replicate_once() + except: + self.logger.exception('ERROR trying to replicate') + sleep(self.run_pause) + + +class ReplicatorRpc(object): + """Handle Replication RPC calls. TODO: redbo document please :)""" + + def __init__(self, root, datadir, broker_class, mount_check=True): + self.root = root + self.datadir = datadir + self.broker_class = broker_class + self.mount_check = mount_check + + def dispatch(self, post_args, args): + if not hasattr(args, 'pop'): + return HTTPBadRequest(body='Invalid object type') + op = args.pop(0) + drive, partition, hsh = post_args + if self.mount_check and \ + not os.path.ismount(os.path.join(self.root, drive)): + return Response(status='507 %s is not mounted' % drive) + db_file = os.path.join(self.root, drive, + storage_directory(self.datadir, partition, hsh), hsh + '.db') + if op == 'rsync_then_merge': + return self.rsync_then_merge(drive, db_file, args) + if op == 'complete_rsync': + return self.complete_rsync(drive, db_file, args) + else: + # someone might be about to rsync a db to us, + # make sure there's a tmp dir to receive it. 
def sync(self, broker, args):
    """
    Handle a replication sync rpc call.

    :param broker: database broker for the local db being synced
    :param args: tuple of (remote_sync, hash_, id_, created_at,
                 put_timestamp, delete_timestamp)
    :returns: HTTPNotFound if the local db is corrupt (it gets
              quarantined), otherwise a Response whose body is the local
              replication info serialized as json
    """
    (remote_sync, hash_, id_, created_at, put_timestamp,
     delete_timestamp) = args
    try:
        info = broker.get_replication_info()
    except Exception as e:
        # py2-only `except Exception, e` replaced with the `as` form
        # (valid on 2.6+); 'no such table' means the db file is
        # corrupt/truncated, so move it aside for re-replication.
        if 'no such table' in str(e):
            # TODO find a real logger
            print("Quarantining DB %s" % broker.db_file)
            quarantine_db(broker.db_file, broker.db_type)
            return HTTPNotFound()
        raise
    if info['put_timestamp'] != put_timestamp or \
            info['created_at'] != created_at or \
            info['delete_timestamp'] != delete_timestamp:
        broker.merge_timestamps(
            created_at, put_timestamp, delete_timestamp)
    info['point'] = broker.get_sync(id_)
    # If contents already match, just record the remote's sync point so
    # the peer doesn't re-send rows we already have.
    if hash_ == info['hash'] and info['point'] < remote_sync:
        broker.merge_syncs([{'remote_id': id_,
                             'sync_point': remote_sync}])
        info['point'] = remote_sync
    return Response(simplejson.dumps(info))
def quote(value, safe='/'):
    """URL-quote value, UTF-8-encoding any unicode string first."""
    encoded = value.encode('utf8') if isinstance(value, unicode) else value
    return _quote(encoded, safe)
def direct_get_container(node, part, account, container, marker=None,
                         limit=None, prefix=None, delimiter=None,
                         conn_timeout=5, response_timeout=15):
    """
    Get container listings directly from the container server.

    :param node: node dictionary from the ring
    :param part: partition the container is on
    :param account: account name
    :param container: container name
    :param marker: marker query
    :param limit: query limit
    :param prefix: prefix query
    :param delimiter: delimiter for the query
    :param conn_timeout: timeout in seconds for establishing the connection
    :param response_timeout: timeout in seconds for getting the response
    :returns: list of objects
    """
    path = '/%s/%s' % (account, container)
    qs = 'format=json'
    if marker:
        qs += '&marker=%s' % quote(marker)
    if limit:
        qs += '&limit=%d' % limit
    if prefix:
        qs += '&prefix=%s' % quote(prefix)
    if delimiter:
        qs += '&delimiter=%s' % quote(delimiter)
    with Timeout(conn_timeout):
        # Pass the assembled query string; previously a hardcoded
        # 'format=json' was sent, silently dropping the marker, limit,
        # prefix, and delimiter parameters built above.
        conn = http_connect(node['ip'], node['port'], node['device'], part,
                            'GET', path, query_string=qs)
    with Timeout(response_timeout):
        resp = conn.getresponse()
    if resp.status < 200 or resp.status >= 300:
        resp.read()
        # Error message fixed: said 'gave stats' instead of 'gave status'.
        raise ClientException(
            'Container server %s:%s direct GET %s gave status %s' %
            (node['ip'], node['port'],
             repr('/%s/%s%s' % (node['device'], part, path)),
             resp.status),
            http_host=node['ip'], http_port=node['port'],
            http_device=node['device'], http_status=resp.status,
            http_reason=resp.reason)
    if resp.status == 204:
        resp.read()
        return []
    return json_loads(resp.read())
def direct_head_object(node, part, account, container, obj, conn_timeout=5,
                       response_timeout=15):
    """
    Request object information directly from the object server.

    :param node: node dictionary from the ring
    :param part: partition the container is on
    :param account: account name
    :param container: container name
    :param obj: object name
    :param conn_timeout: timeout in seconds for establishing the connection
    :param response_timeout: timeout in seconds for getting the response
    :returns: tuple of (content-type, object size, last modified timestamp,
              etag, metadata dictionary)
    """
    path = '/%s/%s/%s' % (account, container, obj)
    with Timeout(conn_timeout):
        conn = http_connect(node['ip'], node['port'], node['device'], part,
                            'HEAD', path)
    with Timeout(response_timeout):
        resp = conn.getresponse()
        resp.read()
    if not (200 <= resp.status < 300):
        raise ClientException(
            'Object server %s:%s direct HEAD %s gave status %s' %
            (node['ip'], node['port'],
             repr('/%s/%s%s' % (node['device'], part, path)),
             resp.status),
            http_host=node['ip'], http_port=node['port'],
            http_device=node['device'], http_status=resp.status,
            http_reason=resp.reason)
    # Collect the user metadata headers, stripping the reserved prefix.
    meta_prefix = 'x-object-meta-'
    metadata = dict((unquote(key[len(meta_prefix):]), unquote(value))
                    for key, value in resp.getheaders()
                    if key.lower().startswith(meta_prefix))
    return (resp.getheader('content-type'),
            int(resp.getheader('content-length')),
            resp.getheader('last-modified'),
            resp.getheader('etag').strip('"'),
            metadata)
def direct_delete_object(node, part, account, container, obj,
                         conn_timeout=5, response_timeout=15, headers=None):
    """
    Delete object directly from the object server.

    :param node: node dictionary from the ring
    :param part: partition the container is on
    :param account: account name
    :param container: container name
    :param obj: object name
    :param conn_timeout: timeout in seconds for establishing the connection
    :param response_timeout: timeout in seconds for getting the response
    :param headers: dict of extra headers to send with the request
    :returns: response from server
    """
    # Work on a fresh dict per call: the old mutable default headers={}
    # was shared across calls, so the X-Timestamp set below leaked from
    # one invocation to the next (and into caller-supplied dicts).
    headers = dict(headers or {})
    path = '/%s/%s/%s' % (account, container, obj)
    headers['X-Timestamp'] = normalize_timestamp(time())
    with Timeout(conn_timeout):
        conn = http_connect(node['ip'], node['port'], node['device'], part,
                            'DELETE', path, headers)
    with Timeout(response_timeout):
        resp = conn.getresponse()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException(
            'Object server %s:%s direct DELETE %s gave status %s' %
            (node['ip'], node['port'],
             repr('/%s/%s%s' % (node['device'], part, path)),
             resp.status),
            http_host=node['ip'], http_port=node['port'],
            http_device=node['device'], http_status=resp.status,
            http_reason=resp.reason)
    return resp
def retry(func, *args, **kwargs):
    """
    Helper function to retry a given function a number of times.

    :param func: callable to be called
    :param retries: number of retries
    :param error_log: logger for errors
    :param args: arguments to send to func
    :param kwargs: keyword arguments to send to func (if retries or
                   error_log are sent, they will be deleted from kwargs
                   before sending on to func)
    :returns: tuple of (attempts used, result of func)
    """
    # kwargs.pop replaces the old test-then-del dance; `except ... as`
    # replaces the py2-only `except X, err` syntax (valid on 2.6+).
    retries = kwargs.pop('retries', 5)
    error_log = kwargs.pop('error_log', None)
    attempts = 0
    backoff = 1
    while attempts <= retries:
        attempts += 1
        try:
            return attempts, func(*args, **kwargs)
        except (socket.error, HTTPException, Timeout) as err:
            if error_log:
                error_log(err)
            if attempts > retries:
                raise
        except ClientException as err:
            if error_log:
                error_log(err)
            # Only retry server-side errors (5xx), and never 507
            # (drive unmounted) -- client errors won't get better.
            if attempts > retries or err.http_status < 500 or \
                    err.http_status == 507 or err.http_status > 599:
                raise
        sleep(backoff)
        backoff *= 2
    # Shouldn't actually get down here, but just in case.
    if args and 'ip' in args[0]:
        raise ClientException('Raise too many retries',
            http_host=args[0]['ip'], http_port=args[0]['port'],
            http_device=args[0]['device'])
    else:
        raise ClientException('Raise too many retries')
class MessageTimeout(TimeoutError):
    """A TimeoutError that also carries an explanatory message."""

    def __init__(self, seconds=None, msg=None):
        TimeoutError.__init__(self, seconds=seconds)
        self.msg = msg

    def __str__(self):
        base = TimeoutError.__str__(self)
        return '%s: %s' % (base, self.msg)
def md5hash(key):
    """Return the hex MD5 digest of key (used for ring placement)."""
    hasher = md5(key)
    return hasher.hexdigest()
+ """ + + def __init__(self, servers, connect_timeout=CONN_TIMEOUT, + io_timeout=IO_TIMEOUT, tries=TRY_COUNT): + self._ring = {} + self._errors = dict(((serv, []) for serv in servers)) + self._error_limited = dict(((serv, 0) for serv in servers)) + for server in sorted(servers): + for i in xrange(NODE_WEIGHT): + self._ring[md5hash('%s-%s' % (server, i))] = server + self._tries = tries if tries <= len(servers) else len(servers) + self._sorted = sorted(self._ring.keys()) + self._client_cache = dict(((server, []) for server in servers)) + self._connect_timeout = connect_timeout + self._io_timeout = io_timeout + + def _exception_occurred(self, server, e, action='talking'): + if isinstance(e, socket.timeout): + logging.error("Timeout %s to memcached: %s" % (action, server)) + else: + logging.exception("Error %s to memcached: %s" % (action, server)) + now = time.time() + self._errors[server].append(time.time()) + if len(self._errors[server]) > ERROR_LIMIT_COUNT: + self._errors[server] = [err for err in self._errors[server] + if err > now - ERROR_LIMIT_TIME] + if len(self._errors[server]) > ERROR_LIMIT_COUNT: + self._error_limited[server] = now + ERROR_LIMIT_DURATION + logging.error('Error limiting server %s' % server) + + def _get_conns(self, key): + """ + Retrieves a server conn from the pool, or connects a new one. + Chooses the server based on a consistent hash of "key". 
+ """ + pos = bisect(self._sorted, key) + served = [] + while len(served) < self._tries: + pos = (pos + 1) % len(self._sorted) + server = self._ring[self._sorted[pos]] + if server in served: + continue + served.append(server) + if self._error_limited[server] > time.time(): + continue + try: + fp, sock = self._client_cache[server].pop() + yield server, fp, sock + except IndexError: + try: + host, port = server.split(':') + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + sock.settimeout(self._connect_timeout) + sock.connect((host, int(port))) + sock.settimeout(self._io_timeout) + yield server, sock.makefile(), sock + except Exception, e: + self._exception_occurred(server, e, 'connecting') + + def _return_conn(self, server, fp, sock): + """ Returns a server connection to the pool """ + self._client_cache[server].append((fp, sock)) + + def set(self, key, value, serialize=True, timeout=0): + """ + Set a key/value pair in memcache + + :param key: key + :param value: value + :param serialize: if True, value is pickled before sending to memcache + :param timeout: ttl in memcache + """ + key = md5hash(key) + if timeout > 0: + timeout += time.time() + flags = 0 + if serialize: + value = pickle.dumps(value, PICKLE_PROTOCOL) + flags |= PICKLE_FLAG + for (server, fp, sock) in self._get_conns(key): + try: + sock.sendall('set %s %d %d %s noreply\r\n%s\r\n' % \ + (key, flags, timeout, len(value), value)) + self._return_conn(server, fp, sock) + return + except Exception, e: + self._exception_occurred(server, e) + + def get(self, key): + """ + Gets the object specified by key. It will also unpickle the object + before returning if it is pickled in memcache. 
def incr(self, key, delta=1, timeout=0):
    """
    Increments a key which has a numeric value by delta.
    If the key can't be found, it's added as delta.

    :param key: key
    :param delta: amount to add to the value of key (or set as the value
                  if the key is not found)
    :param timeout: ttl in memcache
    """
    hashed = md5hash(key)
    for (server, fp, sock) in self._get_conns(hashed):
        try:
            sock.sendall('incr %s %s\r\n' % (hashed, delta))
            fields = fp.readline().strip().split()
            if fields[0].upper() == 'NOT_FOUND':
                # Key is absent: store delta as the initial value.
                initial = str(delta)
                sock.sendall('add %s %d %d %s noreply\r\n%s\r\n' % (
                    hashed, 0, timeout, len(initial), initial))
                fields[0] = initial
            new_value = int(fields[0].strip())
            self._return_conn(server, fp, sock)
            return new_value
        except Exception as e:
            self._exception_occurred(server, e)
def get_multi(self, keys, server_key):
    """
    Gets multiple values from memcache for the given keys.

    :param keys: keys for values to be retrieved from memcache
    :param server_key: key to use in determining which server in the ring
                       is used
    :returns: list of values (None for keys that were not found)
    """
    server_key = md5hash(server_key)
    hashed_keys = [md5hash(key) for key in keys]
    for (server, fp, sock) in self._get_conns(server_key):
        try:
            sock.sendall('get %s\r\n' % ' '.join(hashed_keys))
            responses = {}
            fields = fp.readline().strip().split()
            while fields[0].upper() != 'END':
                if fields[0].upper() == 'VALUE':
                    size = int(fields[3])
                    value = fp.read(size)
                    if int(fields[2]) & PICKLE_FLAG:
                        value = pickle.loads(value)
                    responses[fields[1]] = value
                    # consume the trailing \r\n after the data block
                    fp.readline()
                fields = fp.readline().strip().split()
            # Preserve request order; missing keys come back as None.
            values = [responses.get(hkey) for hkey in hashed_keys]
            self._return_conn(server, fp, sock)
            return values
        except Exception as e:
            self._exception_occurred(server, e)
(c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Miscellaneous utility functions for use with Swift.""" + +import errno +import fcntl +import os +import pwd +import signal +import sys +import time +import mimetools +from hashlib import md5 +from random import shuffle +from urllib import quote +from contextlib import contextmanager +import ctypes +import ctypes.util +import fcntl +import struct + +import eventlet +from eventlet import greenio, GreenPool, sleep, Timeout, listen +from eventlet.green import socket, subprocess, ssl, thread, threading + +from swift.common.exceptions import LockTimeout, MessageTimeout + +# logging doesn't import patched as cleanly as one would like +from logging.handlers import SysLogHandler +import logging +logging.thread = eventlet.green.thread +logging.threading = eventlet.green.threading +logging._lock = logging.threading.RLock() + + +libc = ctypes.CDLL(ctypes.util.find_library('c')) +sys_fallocate = libc.fallocate +posix_fadvise = libc.posix_fadvise + + +# Used by hash_path to offer a bit more security when generating hashes for +# paths. It simply appends this value to all paths; guessing the hash a path +# will end up with would also require knowing this suffix. +HASH_PATH_SUFFIX = os.environ.get('SWIFT_HASH_PATH_SUFFIX', 'endcap') + + +def get_param(req, name, default=None): + """ + Get parameters from an HTTP request ensuring proper handling UTF-8 + encoding. 
def normalize_timestamp(timestamp):
    """
    Format a timestamp (string or numeric) into a standardized
    xxxxxxxxxx.xxxxx format.

    :param timestamp: unix timestamp
    :returns: normalized timestamp as a string
    """
    # Zero-pad to a fixed 16-character width with 5 decimal places so
    # normalized timestamps sort correctly as strings.
    value = float(timestamp)
    return '%016.05f' % value
def split_path(path, minsegs=1, maxsegs=None, rest_with_last=False):
    """
    Validate and split the given HTTP request path.

    **Examples**::

        ['a'] = split_path('/a')
        ['a', None] = split_path('/a', 1, 2)
        ['a', 'c'] = split_path('/a/c', 1, 2)
        ['a', 'c', 'o/r'] = split_path('/a/c/o/r', 1, 3, True)

    :param path: HTTP Request path to be split
    :param minsegs: Minimum number of segments to be extracted
    :param maxsegs: Maximum number of segments to be extracted
    :param rest_with_last: If True, trailing data will be returned as part
                           of last segment. If False, and there is
                           trailing data, raises ValueError.
    :returns: list of segments with a length of maxsegs (non-existant
              segments will return as None)
    :raises ValueError: if the path does not satisfy the constraints
    """
    if not maxsegs:
        maxsegs = minsegs
    if minsegs > maxsegs:
        raise ValueError('minsegs > maxsegs: %d > %d' % (minsegs, maxsegs))
    # Shift both bounds by one to account for the empty segment produced
    # by the leading '/'.
    minsegs += 1
    maxsegs += 1
    if rest_with_last:
        parts = path.split('/', maxsegs - 1)
        count = len(parts)
        invalid = (parts[0] or count < minsegs or count > maxsegs or
                   '' in parts[1:minsegs])
    else:
        parts = path.split('/', maxsegs)
        count = len(parts)
        invalid = (parts[0] or count < minsegs or count > maxsegs + 1 or
                   '' in parts[1:minsegs] or
                   (count == maxsegs + 1 and parts[maxsegs]))
    if invalid:
        raise ValueError('Invalid path: %s' % quote(path))
    parts = parts[1:maxsegs]
    parts.extend([None] * (maxsegs - 1 - len(parts)))
    return parts
def drop_privileges(user):
    """
    Sets the userid of the current process

    :param user: User id to change privileges to
    """
    pw_entry = pwd.getpwnam(user)
    # Drop the group first: once the uid is dropped we would no longer
    # have permission to call setgid.
    os.setgid(pw_entry.pw_gid)
    os.setuid(pw_entry.pw_uid)
def storage_directory(datadir, partition, hash):
    """
    Get the storage directory

    :param datadir: Base data directory
    :param partition: Partition
    :param hash: Account, container or object hash
    :returns: Storage directory
    """
    # Layout is <datadir>/<partition>/<last 3 chars of hash>/<hash>; the
    # short suffix directory keeps per-directory entry counts manageable.
    suffix = hash[-3:]
    return os.path.join(datadir, partition, suffix, hash)
If True, return the raw version rather than a hex digest + :returns: hash string + """ + if object and not container: + raise ValueError('container is required if object is provided') + paths = [account] + if container: + paths.append(container) + if object: + paths.append(object) + if raw_digest: + return md5('/' + '/'.join(paths) + HASH_PATH_SUFFIX).digest() + else: + return md5('/' + '/'.join(paths) + HASH_PATH_SUFFIX).hexdigest() + + +@contextmanager +def lock_path(directory, timeout=10): + """ + Context manager that acquires a lock on a directory. This will block until + the lock can be acquired, or the timeout time has expired (whichever occurs + first). + + :param directory: directory to be locked + :param timeout: timeout (in seconds) + """ + mkdirs(directory) + fd = os.open(directory, os.O_RDONLY) + try: + with LockTimeout(timeout, directory): + while True: + try: + fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + break + except IOError, err: + if err.errno != errno.EAGAIN: + raise + sleep(0.01) + yield True + finally: + os.close(fd) + + +def lock_parent_directory(filename, timeout=10): + """ + Context manager that acquires a lock on the parent directory of the given + file path. This will block until the lock can be acquired, or the timeout + time has expired (whichever occurs first). + + :param filename: file path of the parent directory to be locked + :param timeout: timeout (in seconds) + """ + return lock_path(os.path.dirname(filename)) + + +def get_time_units(time_amount): + """ + Get a nomralized length of time in the largest unit of time (hours, + minutes, or seconds.) 
+ + :param time_amount: length of time in seconds + :returns: A touple of (length of time, unit of time) where unit of time is + one of ('h', 'm', 's') + """ + time_unit = 's' + if time_amount > 60: + time_amount /= 60 + time_unit = 'm' + if time_amount > 60: + time_amount /= 60 + time_unit = 'h' + return time_amount, time_unit + + +def compute_eta(start_time, current_value, final_value): + """ + Compute an ETA. Now only if we could also have a progress bar... + + :param start_time: Unix timestamp when the operation began + :param current_value: Current value + :param final_value: Final value + :returns: ETA as a tuple of (length of time, unit of time) where unit of + time is one of ('h', 'm', 's') + """ + elapsed = time.time() - start_time + completion = (float(current_value) / final_value) or 0.00001 + return get_time_units(1.0 / completion * elapsed - elapsed) + + +def iter_devices_partitions(devices_dir, item_type): + """ + Iterate over partitions accross all devices. + + :param devices_dir: Path to devices + :param item_type: One of 'accounts', 'containers', or 'objects' + :returns: Each iteration returns a tuple of (device, partition) + """ + devices = os.listdir(devices_dir) + shuffle(devices) + devices_partitions = [] + for device in devices: + partitions = os.listdir(os.path.join(devices_dir, device, item_type)) + shuffle(partitions) + devices_partitions.append((device, iter(partitions))) + yielded = True + while yielded: + yielded = False + for device, partitions in devices_partitions: + try: + yield device, partitions.next() + yielded = True + except StopIteration: + pass + + +def unlink_older_than(path, mtime): + """ + Remove any file in a given path that that was last modified before mtime. 
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""WSGI tools for use with swift."""

import errno
import os
import signal
import sys
import time
import mimetools

import eventlet
from eventlet import greenio, GreenPool, sleep, wsgi, listen

# Hook to ensure connection resets don't blow up our servers.
# Remove with next release of Eventlet that has it in the set already.
from errno import ECONNRESET
wsgi.ACCEPT_ERRNO.add(ECONNRESET)

from eventlet.green import socket, ssl

from swift.common.utils import get_logger, drop_privileges, \
    LoggerFileObject, NullLogger


def monkey_patch_mimetools():
    """
    mimetools.Message defaults content-type to "text/plain"
    This changes it to default to None, so we can detect missing headers.
    """

    orig_parsetype = mimetools.Message.parsetype

    # Replacement that leaves type/maintype/subtype as None when there is
    # no Content-Type header, instead of silently assuming text/plain.
    def parsetype(self):
        if not self.typeheader:
            self.type = None
            self.maintype = None
            self.subtype = None
            self.plisttext = ''
        else:
            orig_parsetype(self)

    mimetools.Message.parsetype = parsetype


# We might be able to pull pieces of this out to test, but right now it seems
# like more work than it's worth.
def run_wsgi(app, conf, *args, **kwargs):  # pragma: no cover
    """
    Loads common settings from conf, then instantiates app and runs
    the server using the specified number of workers.

    Forks ``workers`` child processes that each run an eventlet WSGI server
    on a shared listening socket; the parent reaps dead children and
    replaces them until it receives SIGTERM (kill the whole process group)
    or SIGHUP (stop forking, let running requests finish).

    :param app: WSGI callable, or a class to instantiate with (conf, *args)
    :param conf: Configuration dictionary (bind_ip, bind_port, cert_file,
                 key_file, workers, user, ...)
    """
    if 'logger' in kwargs:
        logger = kwargs['logger']
    else:
        logger = get_logger(conf, app.log_name)
    # log uncaught exceptions
    sys.excepthook = lambda *exc_info: \
        logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
    # Redirect stdout/stderr into the logger so stray prints are captured.
    sys.stdout = sys.stderr = LoggerFileObject(logger)

    # Detach into our own session so signals sent to the process group
    # reach the whole server family; ignore failure if already a leader.
    try:
        os.setsid()
    except OSError:
        # NOTE(review): odd idiom -- assignment stands in for `pass` so the
        # line registers under coverage tooling; confirm before changing.
        no_cover = True  # pass
    bind_addr = (conf.get('bind_ip', '0.0.0.0'),
                 int(conf.get('bind_port', kwargs.get('default_port', 8080))))
    # Retry the bind for up to 30 seconds to ride out a predecessor process
    # still holding the port (EADDRINUSE).
    sock = None
    retry_until = time.time() + 30
    while not sock and time.time() < retry_until:
        try:
            sock = listen(bind_addr)
            if 'cert_file' in conf:
                sock = ssl.wrap_socket(sock, certfile=conf['cert_file'],
                                       keyfile=conf['key_file'])
        except socket.error, err:
            if err.args[0] != errno.EADDRINUSE:
                raise
            sleep(0.1)
    if not sock:
        raise Exception('Could not bind to %s:%s after trying for 30 seconds' %
                        bind_addr)
    # NOTE(review): SO_REUSEADDR is set *after* the socket is already bound,
    # so it cannot help this process's own bind; verify intent.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # in my experience, sockets can hang around forever without keepalive
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 600)
    worker_count = int(conf.get('workers', '1'))
    # Privileges are dropped before forking so all children run unprivileged.
    drop_privileges(conf.get('user', 'swift'))
    if isinstance(app, type):
        # Instantiate app if it hasn't been already
        app = app(conf, *args)

    def run_server():
        # Serve requests on the shared socket inside a green-thread pool;
        # runs in each worker child (or inline when workers == 0).
        wsgi.HttpProtocol.default_request_version = "HTTP/1.0"
        eventlet.hubs.use_hub('poll')
        eventlet.patcher.monkey_patch(all=False, socket=True)
        monkey_patch_mimetools()
        pool = GreenPool(size=1024)
        try:
            wsgi.server(sock, app, NullLogger(), custom_pool=pool)
        except socket.error, err:
            # EINVAL is expected when the listening socket is shut down.
            if err[0] != errno.EINVAL:
                raise
        pool.waitall()

    # Useful for profiling [no forks].
    if worker_count == 0:
        run_server()
        return

    def kill_children(*args):
        """Kills the entire process group."""
        logger.error('SIGTERM received')
        # Ignore our own SIGTERM so killpg below doesn't re-enter here.
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        running[0] = False
        os.killpg(0, signal.SIGTERM)

    def hup(*args):
        """Shuts down the server, but allows running requests to complete"""
        logger.error('SIGHUP received')
        signal.signal(signal.SIGHUP, signal.SIG_IGN)
        running[0] = False

    running = [True]
    signal.signal(signal.SIGTERM, kill_children)
    signal.signal(signal.SIGHUP, hup)
    children = []
    # Parent loop: keep worker_count children alive, reaping and replacing
    # any that exit, until a signal handler clears running[0].
    while running[0]:
        while len(children) < worker_count:
            pid = os.fork()
            if pid == 0:
                # Child: restore default signal handling and serve; the
                # return below ends run_wsgi in the child process.
                signal.signal(signal.SIGHUP, signal.SIG_DFL)
                signal.signal(signal.SIGTERM, signal.SIG_DFL)
                run_server()
                logger.info('Child %d exiting normally' % os.getpid())
                return
            else:
                logger.info('Started child %s' % pid)
                children.append(pid)
        try:
            pid, status = os.wait()
            if os.WIFEXITED(status) or os.WIFSIGNALED(status):
                logger.error('Removing dead child %s' % pid)
                children.remove(pid)
        except OSError, err:
            # EINTR: interrupted by one of our signal handlers.
            # ECHILD: no children left to wait for.
            if err.errno not in (errno.EINTR, errno.ECHILD):
                raise
    greenio.shutdown_safe(sock)
    sock.close()
    logger.info('Exited')