Diffstat (limited to 'swift/common')
-rw-r--r--  swift/common/__init__.py          6
-rw-r--r--  swift/common/auth.py             98
-rw-r--r--  swift/common/bufferedhttp.py    158
-rw-r--r--  swift/common/client.py          718
-rw-r--r--  swift/common/constraints.py     152
-rw-r--r--  swift/common/db.py             1463
-rw-r--r--  swift/common/db_replicator.py   526
-rw-r--r--  swift/common/direct_client.py   303
-rw-r--r--  swift/common/exceptions.py       35
-rw-r--r--  swift/common/healthcheck.py      28
-rw-r--r--  swift/common/memcached.py       272
-rw-r--r--  swift/common/utils.py           490
-rw-r--r--  swift/common/wsgi.py            164
13 files changed, 4413 insertions(+), 0 deletions(-)
diff --git a/swift/common/__init__.py b/swift/common/__init__.py
new file mode 100644
index 000000000..fe1925beb
--- /dev/null
+++ b/swift/common/__init__.py
@@ -0,0 +1,6 @@
+""" Code common to all of Swift. """
+
+ACCOUNT_LISTING_LIMIT = 10000
+CONTAINER_LISTING_LIMIT = 10000
+FILE_SIZE_LIMIT = 5368709122
+
diff --git a/swift/common/auth.py b/swift/common/auth.py
new file mode 100644
index 000000000..1e7be05da
--- /dev/null
+++ b/swift/common/auth.py
@@ -0,0 +1,98 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ConfigParser import ConfigParser, NoOptionError
+import os
+import time
+
+from webob.request import Request
+from webob.exc import HTTPUnauthorized, HTTPPreconditionFailed
+from eventlet.timeout import Timeout
+
+from swift.common.utils import split_path
+from swift.common.bufferedhttp import http_connect_raw as http_connect
+
+
+class DevAuthMiddleware(object):
+ """
+ Auth Middleware that uses the dev auth server
+ """
+ def __init__(self, app, conf, memcache_client, logger):
+ self.app = app
+ self.memcache_client = memcache_client
+ self.logger = logger
+ self.conf = conf
+ self.auth_host = conf.get('bind_ip', '127.0.0.1')
+ self.auth_port = int(conf.get('bind_port', 11000))
+ self.timeout = int(conf.get('node_timeout', 10))
+
+ def __call__(self, env, start_response):
+ req = Request(env)
+ if req.path != '/healthcheck':
+ if 'x-storage-token' in req.headers and \
+ 'x-auth-token' not in req.headers:
+ req.headers['x-auth-token'] = req.headers['x-storage-token']
+ version, account, container, obj = split_path(req.path, 1, 4, True)
+ if account is None:
+ return HTTPPreconditionFailed(request=req, body='Bad URL')(
+ env, start_response)
+ if not req.headers.get('x-auth-token'):
+ return HTTPPreconditionFailed(request=req,
+ body='Missing Auth Token')(env, start_response)
+ if not self.auth(account, req.headers['x-auth-token']):
+ return HTTPUnauthorized(request=req)(env, start_response)
+
+ # If we get here, then things should be good.
+ return self.app(env, start_response)
+
+ def auth(self, account, token):
+ """
+        Dev authorization implementation
+
+ :param account: account name
+ :param token: auth token
+
+ :returns: True if authorization is successful, False otherwise
+ """
+ key = 'auth/%s/%s' % (account, token)
+ now = time.time()
+ cached_auth_data = self.memcache_client.get(key)
+ if cached_auth_data:
+ start, expiration = cached_auth_data
+ if now - start <= expiration:
+ return True
+ try:
+ with Timeout(self.timeout):
+ conn = http_connect(self.auth_host, self.auth_port, 'GET',
+ '/token/%s/%s' % (account, token))
+ resp = conn.getresponse()
+ resp.read()
+ conn.close()
+ if resp.status == 204:
+ validated = float(resp.getheader('x-auth-ttl'))
+ else:
+ validated = False
+ except:
+ self.logger.exception('ERROR with auth')
+ return False
+ if not validated:
+ return False
+ else:
+ val = (now, validated)
+ self.memcache_client.set(key, val, timeout=validated)
+ return True
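For reference, a minimal sketch of how this middleware might be wired up (illustrative only, not part of the commit): the conf values and backing WSGI app are hypothetical, and MemcacheRing is assumed to be the client from swift/common/memcached.py listed in the diffstat above.

import logging

from swift.common.memcached import MemcacheRing
from swift.common.auth import DevAuthMiddleware


def backend_app(env, start_response):
    # Stand-in for the real proxy application being protected.
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return ['OK']

conf = {'bind_ip': '127.0.0.1', 'bind_port': 11000, 'node_timeout': 10}
app = DevAuthMiddleware(backend_app, conf, MemcacheRing(['127.0.0.1:11211']),
                        logging.getLogger('auth'))
# All requests except /healthcheck now need a valid X-Auth-Token (or
# X-Storage-Token); successful validations are cached in memcache for the
# TTL returned by the auth server.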
diff --git a/swift/common/bufferedhttp.py b/swift/common/bufferedhttp.py
new file mode 100644
index 000000000..ed5f0cec2
--- /dev/null
+++ b/swift/common/bufferedhttp.py
@@ -0,0 +1,158 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Monkey Patch httplib.HTTPResponse to buffer reads of headers. This can improve
+performance when making large numbers of small HTTP requests. This module
+also provides helper functions to make HTTP connections using
+BufferedHTTPResponse.
+
+.. warning::
+
+ If you use this, be sure that the libraries you are using do not access
+ the socket directly (xmlrpclib, I'm looking at you :/), and instead
+ make all calls through httplib.
+"""
+
+from urllib import quote
+import logging
+import time
+
+from eventlet.green.httplib import HTTPConnection, HTTPResponse, _UNKNOWN, \
+ CONTINUE, HTTPMessage
+
+
+class BufferedHTTPResponse(HTTPResponse):
+ """HTTPResponse class that buffers reading of headers"""
+
+ def __init__(self, sock, debuglevel=0, strict=0,
+ method=None): # pragma: no cover
+ self.sock = sock
+ self.fp = sock.makefile('rb')
+ self.debuglevel = debuglevel
+ self.strict = strict
+ self._method = method
+
+ self.msg = None
+
+ # from the Status-Line of the response
+ self.version = _UNKNOWN # HTTP-Version
+ self.status = _UNKNOWN # Status-Code
+ self.reason = _UNKNOWN # Reason-Phrase
+
+ self.chunked = _UNKNOWN # is "chunked" being used?
+ self.chunk_left = _UNKNOWN # bytes left to read in current chunk
+ self.length = _UNKNOWN # number of bytes left in response
+ self.will_close = _UNKNOWN # conn will close at end of response
+
+ def expect_response(self):
+ self.fp = self.sock.makefile('rb', 0)
+ version, status, reason = self._read_status()
+ if status != CONTINUE:
+ self._read_status = lambda: (version, status, reason)
+ self.begin()
+ else:
+ self.status = status
+ self.reason = reason.strip()
+ self.version = 11
+ self.msg = HTTPMessage(self.fp, 0)
+ self.msg.fp = None
+
+
+class BufferedHTTPConnection(HTTPConnection):
+ """HTTPConnection class that uses BufferedHTTPResponse"""
+ response_class = BufferedHTTPResponse
+
+ def connect(self):
+ self._connected_time = time.time()
+ return HTTPConnection.connect(self)
+
+ def putrequest(self, method, url, skip_host=0, skip_accept_encoding=0):
+ self._method = method
+ self._path = url
+ self._txn_id = '-'
+ return HTTPConnection.putrequest(self, method, url, skip_host,
+ skip_accept_encoding)
+
+ def putheader(self, header, value):
+ if header.lower() == 'x-cf-trans-id':
+ self._txn_id = value
+ return HTTPConnection.putheader(self, header, value)
+
+ def getexpect(self):
+ response = BufferedHTTPResponse(self.sock, strict=self.strict,
+ method=self._method)
+ response.expect_response()
+ return response
+
+ def getresponse(self):
+ response = HTTPConnection.getresponse(self)
+ logging.debug("HTTP PERF: %.5f seconds to %s %s:%s %s (%s)" %
+ (time.time() - self._connected_time, self._method, self.host,
+ self.port, self._path, self._txn_id))
+ return response
+
+
+def http_connect(ipaddr, port, device, partition, method, path,
+ headers=None, query_string=None):
+ """
+ Helper function to create a HTTPConnection object that is buffered
+ for backend Swift services.
+
+ :param ipaddr: IPv4 address to connect to
+ :param port: port to connect to
+ :param device: device of the node to query
+ :param partition: partition on the device
+ :param method: HTTP method to request ('GET', 'PUT', 'POST', etc.)
+ :param path: request path
+ :param headers: dictionary of headers
+ :param query_string: request query string
+ :returns: HTTPConnection object
+ """
+ conn = BufferedHTTPConnection('%s:%s' % (ipaddr, port))
+ path = quote('/' + device + '/' + str(partition) + path)
+ if query_string:
+ path += '?' + query_string
+ conn.path = path
+ conn.putrequest(method, path)
+ if headers:
+ for header, value in headers.iteritems():
+ conn.putheader(header, value)
+ conn.endheaders()
+ return conn
+
+def http_connect_raw(ipaddr, port, method, path, headers=None,
+ query_string=None):
+ """
+ Helper function to create a HTTPConnection object that is buffered.
+
+ :param ipaddr: IPv4 address to connect to
+ :param port: port to connect to
+ :param method: HTTP method to request ('GET', 'PUT', 'POST', etc.)
+ :param path: request path
+ :param headers: dictionary of headers
+ :param query_string: request query string
+ :returns: HTTPConnection object
+ """
+ conn = BufferedHTTPConnection('%s:%s' % (ipaddr, port))
+ if query_string:
+ path += '?' + query_string
+ conn.path = path
+ conn.putrequest(method, path)
+ if headers:
+ for header, value in headers.iteritems():
+ conn.putheader(header, value)
+ conn.endheaders()
+ return conn
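Usage sketch for the two helpers above (illustrative only; the host, device and partition values are made up):

from swift.common.bufferedhttp import http_connect

conn = http_connect('10.0.0.1', 6000, 'sda1', 312, 'HEAD',
                    '/account/container/object',
                    headers={'X-CF-Trans-ID': 'tx-example'})
resp = conn.getresponse()  # BufferedHTTPResponse; headers come via buffered fp
resp.read()
conn.close()
print resp.status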
diff --git a/swift/common/client.py b/swift/common/client.py
new file mode 100644
index 000000000..3f3b9d8b4
--- /dev/null
+++ b/swift/common/client.py
@@ -0,0 +1,718 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Cloud Files client library used internally
+"""
+import socket
+from cStringIO import StringIO
+from httplib import HTTPConnection, HTTPException, HTTPSConnection
+from re import compile, DOTALL
+from tokenize import generate_tokens, STRING, NAME, OP
+from urllib import quote as _quote, unquote
+from urlparse import urlparse, urlunparse
+
+try:
+ from eventlet import sleep
+except:
+ from time import sleep
+
+
+def quote(value, safe='/'):
+ """
+ Patched version of urllib.quote that encodes utf8 strings before quoting
+ """
+ if isinstance(value, unicode):
+ value = value.encode('utf8')
+ return _quote(value, safe)
+
+
+# look for a real json parser first
+try:
+ # simplejson is popular and pretty good
+ from simplejson import loads as json_loads
+except ImportError:
+ try:
+ # 2.6 will have a json module in the stdlib
+ from json import loads as json_loads
+ except ImportError:
+ # fall back on local parser otherwise
+ comments = compile(r'/\*.*\*/|//[^\r\n]*', DOTALL)
+
+ def json_loads(string):
+ '''
+ Fairly competent json parser exploiting the python tokenizer and
+ eval(). -- From python-cloudfiles
+
+ _loads(serialized_json) -> object
+ '''
+ try:
+ res = []
+ consts = {'true': True, 'false': False, 'null': None}
+ string = '(' + comments.sub('', string) + ')'
+ for type, val, _, _, _ in \
+ generate_tokens(StringIO(string).readline):
+ if (type == OP and val not in '[]{}:,()-') or \
+ (type == NAME and val not in consts):
+ raise AttributeError()
+ elif type == STRING:
+ res.append('u')
+ res.append(val.replace('\\/', '/'))
+ else:
+ res.append(val)
+ return eval(''.join(res), {}, consts)
+ except:
+ raise AttributeError()
+
+
+class ClientException(Exception):
+
+ def __init__(self, msg, http_scheme='', http_host='', http_port='',
+ http_path='', http_query='', http_status=0, http_reason='',
+ http_device=''):
+ Exception.__init__(self, msg)
+ self.msg = msg
+ self.http_scheme = http_scheme
+ self.http_host = http_host
+ self.http_port = http_port
+ self.http_path = http_path
+ self.http_query = http_query
+ self.http_status = http_status
+ self.http_reason = http_reason
+ self.http_device = http_device
+
+ def __str__(self):
+ a = self.msg
+ b = ''
+ if self.http_scheme:
+ b += '%s://' % self.http_scheme
+ if self.http_host:
+ b += self.http_host
+ if self.http_port:
+ b += ':%s' % self.http_port
+ if self.http_path:
+ b += self.http_path
+ if self.http_query:
+ b += '?%s' % self.http_query
+ if self.http_status:
+ if b:
+ b = '%s %s' % (b, self.http_status)
+ else:
+ b = str(self.http_status)
+ if self.http_reason:
+ if b:
+ b = '%s %s' % (b, self.http_reason)
+ else:
+ b = '- %s' % self.http_reason
+ if self.http_device:
+ if b:
+ b = '%s: device %s' % (b, self.http_device)
+ else:
+ b = 'device %s' % self.http_device
+ return b and '%s: %s' % (a, b) or a
+
+
+def http_connection(url):
+ """
+ Make an HTTPConnection or HTTPSConnection
+
+ :param url: url to connect to
+ :returns: tuple of (parsed url, connection object)
+ :raises ClientException: Unable to handle protocol scheme
+ """
+ parsed = urlparse(url)
+ if parsed.scheme == 'http':
+ conn = HTTPConnection(parsed.netloc)
+ elif parsed.scheme == 'https':
+ conn = HTTPSConnection(parsed.netloc)
+ else:
+ raise ClientException('Cannot handle protocol scheme %s for url %s' %
+ (parsed.scheme, repr(url)))
+ return parsed, conn
+
+
+def get_auth(url, user, key, snet=False):
+ """
+ Get authentication credentials
+
+ :param url: authentication URL
+ :param user: user to auth as
+    :param key: key or password for auth
+    :param snet: use SERVICENET internal network, default is False
+    :returns: tuple of (storage URL, auth token)
+ :raises ClientException: HTTP GET request to auth URL failed
+ """
+ parsed, conn = http_connection(url)
+ conn.request('GET', parsed.path, '',
+ {'X-Auth-User': user, 'X-Auth-Key': key})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Auth GET failed', http_scheme=parsed.scheme,
+ http_host=conn.host, http_port=conn.port,
+ http_path=parsed.path, http_status=resp.status,
+ http_reason=resp.reason)
+ url = resp.getheader('x-storage-url')
+ if snet:
+ parsed = list(urlparse(url))
+ # Second item in the list is the netloc
+ parsed[1] = 'snet-' + parsed[1]
+ url = urlunparse(parsed)
+ return url, resp.getheader('x-storage-token',
+ resp.getheader('x-auth-token'))
+
+
+def get_account(url, token, marker=None, limit=None, prefix=None,
+ http_conn=None, full_listing=False):
+ """
+ Get a listing of containers for the account.
+
+ :param url: storage URL
+ :param token: auth token
+ :param marker: marker query
+ :param limit: limit query
+ :param prefix: prefix query
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :param full_listing: if True, return a full listing, else returns a max
+ of 10000 listings
+    :returns: a list of containers
+ :raises ClientException: HTTP GET request failed
+ """
+ if not http_conn:
+ http_conn = http_connection(url)
+ if full_listing:
+ rv = []
+ listing = get_account(url, token, marker, limit, prefix, http_conn)
+ while listing:
+ rv.extend(listing)
+ marker = listing[-1]['name']
+ listing = get_account(url, token, marker, limit, prefix, http_conn)
+ return rv
+ parsed, conn = http_conn
+ qs = 'format=json'
+ if marker:
+ qs += '&marker=%s' % quote(marker)
+ if limit:
+ qs += '&limit=%d' % limit
+ if prefix:
+ qs += '&prefix=%s' % quote(prefix)
+ conn.request('GET', '%s?%s' % (parsed.path, qs), '',
+ {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ resp.read()
+ raise ClientException('Account GET failed', http_scheme=parsed.scheme,
+ http_host=conn.host, http_port=conn.port,
+ http_path=parsed.path, http_query=qs, http_status=resp.status,
+ http_reason=resp.reason)
+ if resp.status == 204:
+ resp.read()
+ return []
+ return json_loads(resp.read())
+
+
+def head_account(url, token, http_conn=None):
+ """
+ Get account stats.
+
+ :param url: storage URL
+ :param token: auth token
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :returns: a tuple of (container count, object count, bytes used)
+ :raises ClientException: HTTP HEAD request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Account HEAD failed', http_scheme=parsed.scheme,
+ http_host=conn.host, http_port=conn.port,
+ http_path=parsed.path, http_status=resp.status,
+ http_reason=resp.reason)
+ return int(resp.getheader('x-account-container-count', 0)), \
+ int(resp.getheader('x-account-object-count', 0)), \
+ int(resp.getheader('x-account-bytes-used', 0))
+
+
+def get_container(url, token, container, marker=None, limit=None,
+ prefix=None, delimiter=None, http_conn=None,
+ full_listing=False):
+ """
+ Get a listing of objects for the container.
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to get a listing for
+ :param marker: marker query
+ :param limit: limit query
+ :param prefix: prefix query
+    :param delimiter: string to delimit the queries on
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :param full_listing: if True, return a full listing, else returns a max
+ of 10000 listings
+ :returns: a list of objects
+ :raises ClientException: HTTP GET request failed
+ """
+ if not http_conn:
+ http_conn = http_connection(url)
+ if full_listing:
+ rv = []
+ listing = get_container(url, token, container, marker, limit, prefix,
+ delimiter, http_conn)
+ while listing:
+ rv.extend(listing)
+ if not delimiter:
+ marker = listing[-1]['name']
+ else:
+ marker = listing[-1].get('name', listing[-1].get('subdir'))
+ listing = get_container(url, token, container, marker, limit,
+ prefix, delimiter, http_conn)
+ return rv
+ parsed, conn = http_conn
+ path = '%s/%s' % (parsed.path, quote(container))
+ qs = 'format=json'
+ if marker:
+ qs += '&marker=%s' % quote(marker)
+ if limit:
+ qs += '&limit=%d' % limit
+ if prefix:
+ qs += '&prefix=%s' % quote(prefix)
+ if delimiter:
+ qs += '&delimiter=%s' % quote(delimiter)
+ conn.request('GET', '%s?%s' % (path, qs), '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ resp.read()
+ raise ClientException('Container GET failed',
+ http_scheme=parsed.scheme, http_host=conn.host,
+ http_port=conn.port, http_path=path, http_query=qs,
+ http_status=resp.status, http_reason=resp.reason)
+ if resp.status == 204:
+ resp.read()
+ return []
+ return json_loads(resp.read())
+
+
+def head_container(url, token, container, http_conn=None):
+ """
+ Get container stats.
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to get stats for
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :returns: a tuple of (object count, bytes used)
+ :raises ClientException: HTTP HEAD request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s' % (parsed.path, quote(container))
+ conn.request('HEAD', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Container HEAD failed',
+ http_scheme=parsed.scheme, http_host=conn.host,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason)
+ return int(resp.getheader('x-container-object-count', 0)), \
+ int(resp.getheader('x-container-bytes-used', 0))
+
+
+def put_container(url, token, container, http_conn=None):
+ """
+ Create a container
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to create
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :raises ClientException: HTTP PUT request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s' % (parsed.path, quote(container))
+ conn.request('PUT', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Container PUT failed',
+ http_scheme=parsed.scheme, http_host=conn.host,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason)
+
+
+def delete_container(url, token, container, http_conn=None):
+ """
+ Delete a container
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to delete
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :raises ClientException: HTTP DELETE request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s' % (parsed.path, quote(container))
+ conn.request('DELETE', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Container DELETE failed',
+ http_scheme=parsed.scheme, http_host=conn.host,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason)
+
+
+def get_object(url, token, container, name, http_conn=None,
+ resp_chunk_size=None):
+ """
+ Get an object
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name that the object is in
+ :param name: object name to get
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :param resp_chunk_size: if defined, chunk size of data to read
+    :returns: a tuple of (content type, content length, last modified, etag,
+              dictionary of metadata, object body)
+ :raises ClientException: HTTP GET request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ conn.request('GET', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ resp.read()
+ raise ClientException('Object GET failed', http_scheme=parsed.scheme,
+ http_host=conn.host, http_port=conn.port, http_path=path,
+ http_status=resp.status, http_reason=resp.reason)
+ metadata = {}
+ for key, value in resp.getheaders():
+ if key.lower().startswith('x-object-meta-'):
+ metadata[unquote(key[len('x-object-meta-'):])] = unquote(value)
+ if resp_chunk_size:
+
+ def _object_body():
+ buf = resp.read(resp_chunk_size)
+ while buf:
+ yield buf
+ buf = resp.read(resp_chunk_size)
+ object_body = _object_body()
+ else:
+ object_body = resp.read()
+ return resp.getheader('content-type'), \
+ int(resp.getheader('content-length', 0)), \
+ resp.getheader('last-modified'), \
+ resp.getheader('etag').strip('"'), \
+ metadata, \
+ object_body
+
+
+def head_object(url, token, container, name, http_conn=None):
+ """
+ Get object info
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name that the object is in
+ :param name: object name to get info for
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+    :returns: a tuple of (content type, content length, last modified, etag,
+ dictionary of metadata)
+ :raises ClientException: HTTP HEAD request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ conn.request('HEAD', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object HEAD failed', http_scheme=parsed.scheme,
+ http_host=conn.host, http_port=conn.port, http_path=path,
+ http_status=resp.status, http_reason=resp.reason)
+ metadata = {}
+ for key, value in resp.getheaders():
+ if key.lower().startswith('x-object-meta-'):
+ metadata[unquote(key[len('x-object-meta-'):])] = unquote(value)
+ return resp.getheader('content-type'), \
+ int(resp.getheader('content-length', 0)), \
+ resp.getheader('last-modified'), \
+ resp.getheader('etag').strip('"'), \
+ metadata
+
+
+def put_object(url, token, container, name, contents, metadata={},
+ content_length=None, etag=None, chunk_size=65536,
+ content_type=None, http_conn=None):
+ """
+ Put an object
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name that the object is in
+ :param name: object name to put
+ :param contents: file like object to read object data from
+ :param metadata: dictionary of object metadata
+ :param content_length: value to send as content-length header
+ :param etag: etag of contents
+ :param chunk_size: chunk size of data to write
+ :param content_type: value to send as content-type header
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :returns: etag from server response
+ :raises ClientException: HTTP PUT request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ headers = {'X-Auth-Token': token}
+ for key, value in metadata.iteritems():
+ headers['X-Object-Meta-%s' % quote(key)] = quote(value)
+ if etag:
+ headers['ETag'] = etag.strip('"')
+ if content_length is not None:
+ headers['Content-Length'] = str(content_length)
+ if content_type is not None:
+ headers['Content-Type'] = content_type
+ if not contents:
+ headers['Content-Length'] = '0'
+ if hasattr(contents, 'read'):
+ conn.putrequest('PUT', path)
+ for header, value in headers.iteritems():
+ conn.putheader(header, value)
+ if not content_length:
+ conn.putheader('Transfer-Encoding', 'chunked')
+ conn.endheaders()
+ chunk = contents.read(chunk_size)
+ while chunk:
+ if not content_length:
+ conn.send('%x\r\n%s\r\n' % (len(chunk), chunk))
+ else:
+ conn.send(chunk)
+ chunk = contents.read(chunk_size)
+ if not content_length:
+ conn.send('0\r\n\r\n')
+ else:
+ conn.request('PUT', path, contents, headers)
+ resp = conn.getresponse()
+ resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object PUT failed', http_scheme=parsed.scheme,
+ http_host=conn.host, http_port=conn.port, http_path=path,
+ http_status=resp.status, http_reason=resp.reason)
+ return resp.getheader('etag').strip('"')
+
+
+def post_object(url, token, container, name, metadata, http_conn=None):
+ """
+ Change object metadata
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name that the object is in
+ :param name: object name to change
+ :param metadata: dictionary of object metadata
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :raises ClientException: HTTP POST request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ headers = {'X-Auth-Token': token}
+ for key, value in metadata.iteritems():
+ headers['X-Object-Meta-%s' % quote(key)] = quote(value)
+ conn.request('POST', path, '', headers)
+ resp = conn.getresponse()
+ resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object POST failed', http_scheme=parsed.scheme,
+ http_host=conn.host, http_port=conn.port, http_path=path,
+ http_status=resp.status, http_reason=resp.reason)
+
+
+def delete_object(url, token, container, name, http_conn=None):
+ """
+ Delete object
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name that the object is in
+ :param name: object name to delete
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :raises ClientException: HTTP DELETE request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ conn.request('DELETE', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object DELETE failed',
+ http_scheme=parsed.scheme, http_host=conn.host,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason)
+
+
+class Connection(object):
+ """Convenience class to make requests that will also retry the request"""
+
+ def __init__(self, authurl, user, key, retries=5, preauthurl=None,
+ preauthtoken=None, snet=False):
+ """
+        :param authurl: authentication URL
+ :param user: user name to authenticate as
+ :param key: key/password to authenticate with
+ :param retries: Number of times to retry the request before failing
+ :param preauthurl: storage URL (if you have already authenticated)
+ :param preauthtoken: authentication token (if you have already
+ authenticated)
+        :param snet: use SERVICENET internal network, default is False
+ """
+ self.authurl = authurl
+ self.user = user
+ self.key = key
+ self.retries = retries
+ self.http_conn = None
+ self.url = preauthurl
+ self.token = preauthtoken
+ self.attempts = 0
+ self.snet = snet
+
+ def _retry(self, func, *args, **kwargs):
+ kwargs['http_conn'] = self.http_conn
+ self.attempts = 0
+ backoff = 1
+ while self.attempts <= self.retries:
+ self.attempts += 1
+ try:
+ if not self.url or not self.token:
+ self.url, self.token = \
+ get_auth(self.authurl, self.user, self.key, snet=self.snet)
+ self.http_conn = None
+ if not self.http_conn:
+ self.http_conn = http_connection(self.url)
+ kwargs['http_conn'] = self.http_conn
+ rv = func(self.url, self.token, *args, **kwargs)
+ return rv
+ except (socket.error, HTTPException):
+ if self.attempts > self.retries:
+ raise
+ self.http_conn = None
+ except ClientException, err:
+ if self.attempts > self.retries:
+ raise
+ if err.http_status == 401:
+ self.url = self.token = None
+ if self.attempts > 1:
+ raise
+ elif 500 <= err.http_status <= 599:
+ pass
+ else:
+ raise
+ sleep(backoff)
+ backoff *= 2
+
+ def head_account(self):
+ """Wrapper for head_account"""
+ return self._retry(head_account)
+
+ def get_account(self, marker=None, limit=None, prefix=None,
+ full_listing=False):
+ """Wrapper for get_account"""
+ # TODO: With full_listing=True this will restart the entire listing
+ # with each retry. Need to make a better version that just retries
+ # where it left off.
+ return self._retry(get_account, marker=marker, limit=limit,
+ prefix=prefix, full_listing=full_listing)
+
+ def head_container(self, container):
+ """Wrapper for head_container"""
+ return self._retry(head_container, container)
+
+ def get_container(self, container, marker=None, limit=None, prefix=None,
+ delimiter=None, full_listing=False):
+ """Wrapper for get_container"""
+ # TODO: With full_listing=True this will restart the entire listing
+ # with each retry. Need to make a better version that just retries
+ # where it left off.
+ return self._retry(get_container, container, marker=marker,
+ limit=limit, prefix=prefix, delimiter=delimiter,
+ full_listing=full_listing)
+
+ def put_container(self, container):
+ """Wrapper for put_container"""
+ return self._retry(put_container, container)
+
+ def delete_container(self, container):
+ """Wrapper for delete_container"""
+ return self._retry(delete_container, container)
+
+ def head_object(self, container, obj):
+ """Wrapper for head_object"""
+ return self._retry(head_object, container, obj)
+
+ def get_object(self, container, obj, resp_chunk_size=None):
+ """Wrapper for get_object"""
+ return self._retry(get_object, container, obj,
+ resp_chunk_size=resp_chunk_size)
+
+ def put_object(self, container, obj, contents, metadata={},
+ content_length=None, etag=None, chunk_size=65536,
+ content_type=None):
+ """Wrapper for put_object"""
+ return self._retry(put_object, container, obj, contents,
+ metadata=metadata, content_length=content_length, etag=etag,
+ chunk_size=chunk_size, content_type=content_type)
+
+ def post_object(self, container, obj, metadata):
+ """Wrapper for post_object"""
+ return self._retry(post_object, container, obj, metadata)
+
+ def delete_object(self, container, obj):
+ """Wrapper for delete_object"""
+ return self._retry(delete_object, container, obj)
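Usage sketch for the Connection wrapper (illustrative only; the auth URL, credentials and names are placeholders):

from swift.common.client import Connection

conn = Connection('http://127.0.0.1:11000/v1.0', 'test:tester', 'testing',
                  retries=3)
conn.put_container('photos')
conn.put_object('photos', 'cat.jpg', open('cat.jpg', 'rb'),
                metadata={'color': 'tabby'})
content_type, length, modified, etag, meta, body = \
    conn.get_object('photos', 'cat.jpg')
containers = conn.get_account(full_listing=True)
# Each wrapper retries on socket errors, 5xx responses and a single 401
# (re-authenticating), with exponential backoff, as implemented in _retry().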
diff --git a/swift/common/constraints.py b/swift/common/constraints.py
new file mode 100644
index 000000000..8fdacb5a9
--- /dev/null
+++ b/swift/common/constraints.py
@@ -0,0 +1,152 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import re
+
+from webob.exc import HTTPBadRequest, HTTPLengthRequired, \
+ HTTPRequestEntityTooLarge
+
+
+#: Max file size allowed for objects
+MAX_FILE_SIZE = 5 * 1024 * 1024 * 1024 + 2
+#: Max length of the name of a key for metadata
+MAX_META_NAME_LENGTH = 128
+#: Max length of the value of a key for metadata
+MAX_META_VALUE_LENGTH = 256
+#: Max number of metadata items
+MAX_META_COUNT = 90
+#: Max overall size of metadata
+MAX_META_OVERALL_SIZE = 4096
+#: Max object name length
+MAX_OBJECT_NAME_LENGTH = 1024
+
+
+def check_metadata(req):
+ """
+ Check metadata sent for objects in the request headers.
+
+ :param req: request object
+ :raises HTTPBadRequest: bad metadata
+ """
+ meta_count = 0
+ meta_size = 0
+ for key, value in req.headers.iteritems():
+ if not key.lower().startswith('x-object-meta-'):
+ continue
+ key = key[len('x-object-meta-'):]
+ if not key:
+ return HTTPBadRequest(body='Metadata name cannot be empty',
+ request=req, content_type='text/plain')
+ meta_count += 1
+ meta_size += len(key) + len(value)
+ if len(key) > MAX_META_NAME_LENGTH:
+ return HTTPBadRequest(
+ body='Metadata name too long; max %d'
+ % MAX_META_NAME_LENGTH,
+ request=req, content_type='text/plain')
+ elif len(value) > MAX_META_VALUE_LENGTH:
+ return HTTPBadRequest(
+ body='Metadata value too long; max %d'
+ % MAX_META_VALUE_LENGTH,
+ request=req, content_type='text/plain')
+ elif meta_count > MAX_META_COUNT:
+ return HTTPBadRequest(
+ body='Too many metadata items; max %d' % MAX_META_COUNT,
+ request=req, content_type='text/plain')
+ elif meta_size > MAX_META_OVERALL_SIZE:
+ return HTTPBadRequest(
+ body='Total metadata too large; max %d'
+ % MAX_META_OVERALL_SIZE,
+ request=req, content_type='text/plain')
+ return None
+
+
+def check_object_creation(req, object_name):
+ """
+ Check to ensure that everything is alright about an object to be created.
+
+ :param req: HTTP request object
+ :param object_name: name of object to be created
+ :raises HTTPRequestEntityTooLarge: the object is too large
+    :raises HTTPLengthRequired: missing content-length header and not
+ a chunked request
+ :raises HTTPBadRequest: missing or bad content-type header, or
+ bad metadata
+ """
+ if req.content_length and req.content_length > MAX_FILE_SIZE:
+ return HTTPRequestEntityTooLarge(body='Your request is too large.',
+ request=req, content_type='text/plain')
+ if req.content_length is None and \
+ req.headers.get('transfer-encoding') != 'chunked':
+ return HTTPLengthRequired(request=req)
+ if len(object_name) > MAX_OBJECT_NAME_LENGTH:
+ return HTTPBadRequest(body='Object name length of %d longer than %d' %
+ (len(object_name), MAX_OBJECT_NAME_LENGTH), request=req,
+ content_type='text/plain')
+ if 'Content-Type' not in req.headers:
+ return HTTPBadRequest(request=req, content_type='text/plain',
+ body='No content type')
+ if not check_xml_encodable(req.headers['Content-Type']):
+ return HTTPBadRequest(request=req, body='Invalid Content-Type',
+ content_type='text/plain')
+ return check_metadata(req)
+
+
+def check_mount(root, drive):
+ """
+ Verify that the path to the device is a mount point and mounted. This
+ allows us to fast fail on drives that have been unmounted because of
+    issues, and also prevents us from accidentally filling up the root
+    partition.
+
+ :param root: base path where the devices are mounted
+ :param drive: drive name to be checked
+ :returns: True if it is a valid mounted device, False otherwise
+ """
+ if not drive.isalnum():
+ return False
+ path = os.path.join(root, drive)
+ return os.path.exists(path) and os.path.ismount(path)
+
+
+def check_float(string):
+ """
+ Helper function for checking if a string can be converted to a float.
+
+ :param string: string to be verified as a float
+ :returns: True if the string can be converted to a float, False otherwise
+ """
+ try:
+ float(string)
+ return True
+ except ValueError:
+ return False
+
+
+_invalid_xml = re.compile(ur'[^\x09\x0a\x0d\x20-\uD7FF\uE000-\uFFFD%s-%s]' %
+ (unichr(0x10000), unichr(0x10FFFF)))
+
+
+def check_xml_encodable(string):
+ """
+ Validate if a string can be encoded in xml.
+
+ :param string: string to be validated
+ :returns: True if the string can be encoded in xml, False otherwise
+ """
+ try:
+ return not _invalid_xml.search(string.decode('UTF-8'))
+ except UnicodeDecodeError:
+ return False
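A quick sketch of exercising these checks with a synthetic webob request (illustrative only):

from webob import Request

from swift.common.constraints import check_object_creation

req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
                    headers={'Content-Length': '42',
                             'Content-Type': 'text/plain',
                             'X-Object-Meta-Color': 'blue'})
error_response = check_object_creation(req, 'o')
# None here; an oversize body, missing Content-Length/Content-Type or bad
# metadata would instead yield HTTPRequestEntityTooLarge, HTTPLengthRequired
# or HTTPBadRequest.
assert error_response is None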
diff --git a/swift/common/db.py b/swift/common/db.py
new file mode 100644
index 000000000..3464d451b
--- /dev/null
+++ b/swift/common/db.py
@@ -0,0 +1,1463 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" Database code for Swift """
+
+from __future__ import with_statement
+from contextlib import contextmanager
+import hashlib
+import logging
+import operator
+import os
+from uuid import uuid4
+import time
+import cPickle as pickle
+import errno
+from random import randint
+from tempfile import mkstemp
+import math
+
+from eventlet import sleep
+import sqlite3
+
+from swift.common.utils import normalize_timestamp, renamer, \
+ mkdirs, lock_parent_directory, fallocate
+from swift.common.exceptions import LockTimeout
+
+
+#: Timeout for trying to connect to a DB
+BROKER_TIMEOUT = 25
+#: Pickle protocol to use
+PICKLE_PROTOCOL = 2
+#: Max number of pending entries
+PENDING_CAP = 131072
+
+
+class DatabaseConnectionError(sqlite3.DatabaseError):
+ """More friendly error messages for DB Errors."""
+ def __init__(self, path, msg, timeout=0):
+ self.path = path
+ self.timeout = timeout
+ self.msg = msg
+
+ def __str__(self):
+ return 'DB connection error (%s, %s):\n%s' % (
+ self.path, self.timeout, self.msg)
+
+
+class GreenDBConnection(sqlite3.Connection):
+ """SQLite DB Connection handler that plays well with eventlet."""
+ def __init__(self, *args, **kwargs):
+ self.timeout = kwargs.get('timeout', BROKER_TIMEOUT)
+ kwargs['timeout'] = 0
+ self.db_file = args and args[0] or '-'
+ sqlite3.Connection.__init__(self, *args, **kwargs)
+
+ def _timeout(self, call):
+ with LockTimeout(self.timeout, self.db_file):
+ while True:
+ try:
+ return call()
+ except sqlite3.OperationalError, e:
+ if 'locked' not in str(e):
+ raise
+ sleep(0.05)
+
+ def execute(self, *args, **kwargs):
+ return self._timeout(lambda: sqlite3.Connection.execute(
+ self, *args, **kwargs))
+
+ def commit(self):
+ return self._timeout(lambda: sqlite3.Connection.commit(self))
+
+
+def dict_factory(crs, row):
+ """
+ This should only be used when you need a real dict,
+ i.e. when you're going to serialize the results.
+ """
+ return dict(
+ ((col[0], row[idx]) for idx, col in enumerate(crs.description)))
+
+
+def chexor(old, name, timestamp):
+ """
+    The MD5 hash of each entry's name and timestamp is XORed into the
+    database's 128-bit hash on insert or delete, giving a rolling,
+    order-independent hash of the contents (check + XOR, hence "chexor").
+
+ :param old: hex representation of the current DB hash
+ :param name: name of the object or container being inserted
+ :param timestamp: timestamp of the new record
+ :returns: a hex representation of the new hash value
+ """
+ if name is None:
+ raise Exception('name is None!')
+ old = old.decode('hex')
+ new = hashlib.md5(('%s-%s' % (name, timestamp)).encode('utf_8')).digest()
+ response = ''.join(
+ map(chr, map(operator.xor, map(ord, old), map(ord, new))))
+ return response.encode('hex')
+
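The rolling-hash property can be sanity-checked directly (illustrative only; the names and normalized timestamps are made up):

from swift.common.db import chexor

h0 = '0' * 32  # the initial hash stored in the *_stat tables
a = chexor(chexor(h0, 'obj1', '0000000001.00000'), 'obj2', '0000000002.00000')
b = chexor(chexor(h0, 'obj2', '0000000002.00000'), 'obj1', '0000000001.00000')
assert a == b                                 # order-independent
assert chexor(a, 'obj2', '0000000002.00000') == \
    chexor(h0, 'obj1', '0000000001.00000')    # XORing again removes the entry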
+
+def get_db_connection(path, timeout=30, okay_to_create=False):
+ """
+ Returns a properly configured SQLite database connection.
+
+ :param path: path to DB
+ :param timeout: timeout for connection
+ :param okay_to_create: if True, create the DB if it doesn't exist
+ :returns: DB connection object
+ """
+ try:
+ connect_time = time.time()
+ conn = sqlite3.connect(path, check_same_thread=False,
+ factory=GreenDBConnection, timeout=timeout)
+ if path != ':memory:' and not okay_to_create:
+ # attempt to detect and fail when connect creates the db file
+ stat = os.stat(path)
+ if stat.st_size == 0 and stat.st_ctime >= connect_time:
+ os.unlink(path)
+ raise DatabaseConnectionError(path,
+ 'DB file created by connect?')
+ conn.row_factory = sqlite3.Row
+ conn.text_factory = str
+ conn.execute('PRAGMA synchronous = NORMAL')
+ conn.execute('PRAGMA count_changes = OFF')
+ conn.execute('PRAGMA temp_store = MEMORY')
+ conn.create_function('chexor', 3, chexor)
+ except sqlite3.DatabaseError:
+ import traceback
+ raise DatabaseConnectionError(path, traceback.format_exc(),
+ timeout=timeout)
+ return conn
+
+
+class DatabaseBroker(object):
+ """Encapsulates working with a database."""
+
+ def __init__(self, db_file, timeout=BROKER_TIMEOUT, logger=None,
+ account=None, container=None, pending_timeout=10,
+ stale_reads_ok=False):
+ """ Encapsulates working with a database. """
+ self.conn = None
+ self.db_file = db_file
+ self.pending_file = self.db_file + '.pending'
+ self.pending_timeout = pending_timeout
+ self.stale_reads_ok = stale_reads_ok
+ self.db_dir = os.path.dirname(db_file)
+ self.timeout = timeout
+ self.logger = logger or logging.getLogger()
+ self.account = account
+ self.container = container
+
+ def initialize(self, put_timestamp=None):
+ """
+ Create the DB
+
+ :param put_timestamp: timestamp of initial PUT request
+ """
+ if self.db_file == ':memory:':
+ tmp_db_file = None
+ conn = get_db_connection(self.db_file, self.timeout)
+ else:
+ mkdirs(self.db_dir)
+ fd, tmp_db_file = mkstemp(suffix='.tmp', dir=self.db_dir)
+ os.close(fd)
+ conn = sqlite3.connect(tmp_db_file, check_same_thread=False,
+ factory=GreenDBConnection, timeout=0)
+ # creating dbs implicitly does a lot of transactions, so we
+ # pick fast, unsafe options here and do a big fsync at the end.
+ conn.execute('PRAGMA synchronous = OFF')
+ conn.execute('PRAGMA temp_store = MEMORY')
+ conn.execute('PRAGMA journal_mode = MEMORY')
+ conn.create_function('chexor', 3, chexor)
+ conn.row_factory = sqlite3.Row
+ conn.text_factory = str
+ conn.executescript("""
+ CREATE TABLE outgoing_sync (
+ remote_id TEXT UNIQUE,
+ sync_point INTEGER,
+ updated_at TEXT DEFAULT 0
+ );
+ CREATE TABLE incoming_sync (
+ remote_id TEXT UNIQUE,
+ sync_point INTEGER,
+ updated_at TEXT DEFAULT 0
+ );
+ CREATE TRIGGER outgoing_sync_insert AFTER INSERT ON outgoing_sync
+ BEGIN
+ UPDATE outgoing_sync
+ SET updated_at = STRFTIME('%s', 'NOW')
+ WHERE ROWID = new.ROWID;
+ END;
+ CREATE TRIGGER outgoing_sync_update AFTER UPDATE ON outgoing_sync
+ BEGIN
+ UPDATE outgoing_sync
+ SET updated_at = STRFTIME('%s', 'NOW')
+ WHERE ROWID = new.ROWID;
+ END;
+ CREATE TRIGGER incoming_sync_insert AFTER INSERT ON incoming_sync
+ BEGIN
+ UPDATE incoming_sync
+ SET updated_at = STRFTIME('%s', 'NOW')
+ WHERE ROWID = new.ROWID;
+ END;
+ CREATE TRIGGER incoming_sync_update AFTER UPDATE ON incoming_sync
+ BEGIN
+ UPDATE incoming_sync
+ SET updated_at = STRFTIME('%s', 'NOW')
+ WHERE ROWID = new.ROWID;
+ END;
+ """)
+ if not put_timestamp:
+ put_timestamp = normalize_timestamp(0)
+ self._initialize(conn, put_timestamp)
+ conn.commit()
+ if tmp_db_file:
+ conn.close()
+ with open(tmp_db_file, 'r+b') as fp:
+ os.fsync(fp.fileno())
+ with lock_parent_directory(self.db_file, self.pending_timeout):
+ if os.path.exists(self.db_file):
+ # It's as if there was a "condition" where different parts
+ # of the system were "racing" each other.
+ raise DatabaseConnectionError(self.db_file,
+ 'DB created by someone else while working?')
+ renamer(tmp_db_file, self.db_file)
+ self.conn = get_db_connection(self.db_file, self.timeout)
+ else:
+ self.conn = conn
+
+ def delete_db(self, timestamp):
+ """
+ Mark the DB as deleted
+
+ :param timestamp: delete timestamp
+ """
+ timestamp = normalize_timestamp(timestamp)
+ with self.get() as conn:
+ self._delete_db(conn, timestamp)
+ conn.commit()
+
+ @contextmanager
+ def get(self):
+ """Use with the "with" statement; returns a database connection."""
+ if not self.conn:
+ if self.db_file != ':memory:' and os.path.exists(self.db_file):
+ self.conn = get_db_connection(self.db_file, self.timeout)
+ else:
+ raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
+ conn = self.conn
+ self.conn = None
+ try:
+ yield conn
+ conn.rollback()
+ self.conn = conn
+ except:
+ conn.close()
+ raise
+
+ @contextmanager
+ def lock(self):
+ """Use with the "with" statement; locks a database."""
+ if not self.conn:
+ if self.db_file != ':memory:' and os.path.exists(self.db_file):
+ self.conn = get_db_connection(self.db_file, self.timeout)
+ else:
+ raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
+ conn = self.conn
+ self.conn = None
+ orig_isolation_level = conn.isolation_level
+ conn.isolation_level = None
+ conn.execute('BEGIN IMMEDIATE')
+ try:
+ yield True
+ except:
+ pass
+ try:
+ conn.execute('ROLLBACK')
+ conn.isolation_level = orig_isolation_level
+ self.conn = conn
+ except: # pragma: no cover
+ logging.exception(
+ 'Broker error trying to rollback locked connection')
+ conn.close()
+
+ def newid(self, remote_id):
+ """
+ Re-id the database. This should be called after an rsync.
+
+ :param remote_id: the ID of the remote database being rsynced in
+ """
+ with self.get() as conn:
+ row = conn.execute('''
+ UPDATE %s_stat SET id=?
+ ''' % self.db_type, (str(uuid4()),))
+ row = conn.execute('''
+ SELECT ROWID FROM %s ORDER BY ROWID DESC LIMIT 1
+ ''' % self.db_contains_type).fetchone()
+ sync_point = row['ROWID'] if row else -1
+ conn.execute('''
+ INSERT OR REPLACE INTO incoming_sync (sync_point, remote_id)
+ VALUES (?, ?)
+ ''', (sync_point, remote_id))
+ self._newid(conn)
+ conn.commit()
+
+ def _newid(self, conn):
+ # Override for additional work when receiving an rsynced db.
+ pass
+
+ def merge_timestamps(self, created_at, put_timestamp, delete_timestamp):
+ """
+ Used in replication to handle updating timestamps.
+
+ :param created_at: create timestamp
+ :param put_timestamp: put timestamp
+ :param delete_timestamp: delete timestamp
+ """
+ with self.get() as conn:
+ row = conn.execute('''
+ UPDATE %s_stat SET created_at=MIN(?, created_at),
+ put_timestamp=MAX(?, put_timestamp),
+ delete_timestamp=MAX(?, delete_timestamp)
+ ''' % self.db_type, (created_at, put_timestamp, delete_timestamp))
+ conn.commit()
+
+ def get_items_since(self, start, count):
+ """
+ Get a list of objects in the database between start and end.
+
+ :param start: start ROWID
+ :param count: number to get
+ :returns: list of objects between start and end
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ with self.get() as conn:
+ curs = conn.execute('''
+ SELECT * FROM %s WHERE ROWID > ? ORDER BY ROWID ASC LIMIT ?
+ ''' % self.db_contains_type, (start, count))
+ curs.row_factory = dict_factory
+ return [r for r in curs]
+
+ def get_sync(self, id, incoming=True):
+ """
+ Gets the most recent sync point for a server from the sync table.
+
+ :param id: remote ID to get the sync_point for
+ :param incoming: if True, get the last incoming sync, otherwise get
+ the last outgoing sync
+ :returns: the sync point, or -1 if the id doesn't exist.
+ """
+ with self.get() as conn:
+ row = conn.execute(
+ "SELECT sync_point FROM %s_sync WHERE remote_id=?"
+ % ('incoming' if incoming else 'outgoing'), (id,)).fetchone()
+ if not row:
+ return -1
+ return row['sync_point']
+
+ def get_syncs(self, incoming=True):
+ """
+ Get a serialized copy of the sync table.
+
+ :param incoming: if True, get the last incoming sync, otherwise get
+ the last outgoing sync
+ :returns: list of {'remote_id', 'sync_point'}
+ """
+ with self.get() as conn:
+ curs = conn.execute('''
+ SELECT remote_id, sync_point FROM %s_sync
+            ''' % ('incoming' if incoming else 'outgoing'))
+ result = []
+ for row in curs:
+ result.append({'remote_id': row[0], 'sync_point': row[1]})
+ return result
+
+ def get_replication_info(self):
+ """
+ Get information about the DB required for replication.
+
+        :returns: dict containing hash, id, created_at, put_timestamp,
+            delete_timestamp, count and max_row for the DB
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ with self.get() as conn:
+ curs = conn.execute('''
+ SELECT hash, id, created_at, put_timestamp, delete_timestamp,
+ %s_count AS count,
+ CASE WHEN SQLITE_SEQUENCE.seq IS NOT NULL
+ THEN SQLITE_SEQUENCE.seq ELSE -1 END AS max_row
+ FROM (%s_stat LEFT JOIN SQLITE_SEQUENCE
+ ON SQLITE_SEQUENCE.name == '%s') LIMIT 1
+ ''' % (self.db_contains_type, self.db_type, self.db_contains_type))
+ curs.row_factory = dict_factory
+ return curs.fetchone()
+
+ def _commit_puts(self):
+ pass # stub to be overridden if need be
+
+ def merge_syncs(self, sync_points, incoming=True):
+ """
+ Merge a list of sync points with the incoming sync table.
+
+ :param sync_points: list of sync points where a sync point is a dict of
+ {'sync_point', 'remote_id'}
+ :param incoming: if True, get the last incoming sync, otherwise get
+ the last outgoing sync
+ """
+ with self.get() as conn:
+ for rec in sync_points:
+ try:
+ conn.execute('''
+ INSERT INTO %s_sync (sync_point, remote_id)
+ VALUES (?, ?)
+ ''' % ('incoming' if incoming else 'outgoing'),
+ (rec['sync_point'], rec['remote_id']))
+ except sqlite3.IntegrityError:
+ conn.execute('''
+ UPDATE %s_sync SET sync_point=max(?, sync_point)
+ WHERE remote_id=?
+ ''' % ('incoming' if incoming else 'outgoing'),
+ (rec['sync_point'], rec['remote_id']))
+ conn.commit()
+
+ def _preallocate(self):
+ """
+ The idea is to allocate space in front of an expanding db. If it gets
+ within 512k of a boundary, it allocates to the next boundary.
+ Boundaries are 2m, 5m, 10m, 25m, 50m, then every 50m after.
+ """
+ if self.db_file == ':memory:':
+ return
+ MB = (1024 * 1024)
+
+ def prealloc_points():
+ for pm in (1, 2, 5, 10, 25, 50):
+ yield pm * MB
+ while True:
+ pm += 50
+ yield pm * MB
+
+ stat = os.stat(self.db_file)
+ file_size = stat.st_size
+ allocated_size = stat.st_blocks * 512
+ for point in prealloc_points():
+ if file_size <= point - MB / 2:
+ prealloc_size = point
+ break
+ if allocated_size < prealloc_size:
+ with open(self.db_file, 'rb+') as fp:
+ fallocate(fp.fileno(), int(prealloc_size))
+
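A worked instance of the boundary rule in _preallocate (illustrative only; file size chosen to show the 512 KB threshold):

MB = 1024 * 1024

def prealloc_points():
    for pm in (1, 2, 5, 10, 25, 50):
        yield pm * MB
    pm = 50
    while True:
        pm += 50
        yield pm * MB

# A 9.68 MB file is within 512 KB of the 10 MB boundary, so the next
# boundary (25 MB) is chosen and fallocate() reserves up to that size.
file_size = 9 * MB + 700 * 1024
target = next(p for p in prealloc_points() if file_size <= p - MB / 2)
assert target == 25 * MB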
+
+class ContainerBroker(DatabaseBroker):
+ """Encapsulates working with a container database."""
+ db_type = 'container'
+ db_contains_type = 'object'
+
+ def _initialize(self, conn, put_timestamp):
+ """Creates a brand new database (tables, indices, triggers, etc.)"""
+ if not self.account:
+ raise ValueError(
+ 'Attempting to create a new database with no account set')
+ if not self.container:
+ raise ValueError(
+ 'Attempting to create a new database with no container set')
+ self.create_object_table(conn)
+ self.create_container_stat_table(conn, put_timestamp)
+
+ def create_object_table(self, conn):
+ """
+        Create the object table which is specific to the container DB.
+
+ :param conn: DB connection object
+ """
+ conn.executescript("""
+ CREATE TABLE object (
+ ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
+ name TEXT UNIQUE,
+ created_at TEXT,
+ size INTEGER,
+ content_type TEXT,
+ etag TEXT,
+ deleted INTEGER DEFAULT 0
+ );
+
+ CREATE INDEX ix_object_deleted ON object (deleted);
+
+ CREATE TRIGGER object_insert AFTER INSERT ON object
+ BEGIN
+ UPDATE container_stat
+ SET object_count = object_count + (1 - new.deleted),
+ bytes_used = bytes_used + new.size,
+ hash = chexor(hash, new.name, new.created_at);
+ END;
+
+ CREATE TRIGGER object_update BEFORE UPDATE ON object
+ BEGIN
+ SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
+ END;
+
+ CREATE TRIGGER object_delete AFTER DELETE ON object
+ BEGIN
+ UPDATE container_stat
+ SET object_count = object_count - (1 - old.deleted),
+ bytes_used = bytes_used - old.size,
+ hash = chexor(hash, old.name, old.created_at);
+ END;
+ """)
+
+ def create_container_stat_table(self, conn, put_timestamp=None):
+ """
+        Create the container_stat table which is specific to the container DB.
+
+ :param conn: DB connection object
+ :param put_timestamp: put timestamp
+ """
+ if put_timestamp is None:
+ put_timestamp = normalize_timestamp(0)
+ conn.executescript("""
+ CREATE TABLE container_stat (
+ account TEXT,
+ container TEXT,
+ created_at TEXT,
+ put_timestamp TEXT DEFAULT '0',
+ delete_timestamp TEXT DEFAULT '0',
+ object_count INTEGER,
+ bytes_used INTEGER,
+ reported_put_timestamp TEXT DEFAULT '0',
+ reported_delete_timestamp TEXT DEFAULT '0',
+ reported_object_count INTEGER DEFAULT 0,
+ reported_bytes_used INTEGER DEFAULT 0,
+ hash TEXT default '00000000000000000000000000000000',
+ id TEXT,
+ status TEXT DEFAULT '',
+ status_changed_at TEXT DEFAULT '0'
+ );
+
+ INSERT INTO container_stat (object_count, bytes_used)
+ VALUES (0, 0);
+ """)
+ conn.execute('''
+ UPDATE container_stat
+ SET account = ?, container = ?, created_at = ?, id = ?,
+ put_timestamp = ?
+ ''', (self.account, self.container, normalize_timestamp(time.time()),
+ str(uuid4()), put_timestamp))
+
+ def _newid(self, conn):
+ conn.execute('''
+ UPDATE container_stat
+ SET reported_put_timestamp = 0, reported_delete_timestamp = 0,
+ reported_object_count = 0, reported_bytes_used = 0''')
+
+ def update_put_timestamp(self, timestamp):
+ """
+ Update the put_timestamp. Only modifies it if it is greater than
+ the current timestamp.
+
+ :param timestamp: put timestamp
+ """
+ with self.get() as conn:
+ conn.execute('''
+ UPDATE container_stat SET put_timestamp = ?
+ WHERE put_timestamp < ? ''', (timestamp, timestamp))
+ conn.commit()
+
+ def _delete_db(self, conn, timestamp):
+ """
+ Mark the DB as deleted
+
+ :param conn: DB connection object
+ :param timestamp: timestamp to mark as deleted
+ """
+ conn.execute("""
+ UPDATE container_stat
+ SET delete_timestamp = ?,
+ status = 'DELETED',
+ status_changed_at = ?
+ WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp))
+
+ def empty(self):
+ """
+ Check if the DB is empty.
+
+ :returns: True if the database has no active objects, False otherwise
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ with self.get() as conn:
+ row = conn.execute(
+ 'SELECT object_count from container_stat').fetchone()
+ return (row[0] == 0)
+
+ def _commit_puts(self, item_list=None):
+        """Handles committing rows in .pending files."""
+ if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
+ return
+ if item_list is None:
+ item_list = []
+ with lock_parent_directory(self.pending_file, self.pending_timeout):
+ self._preallocate()
+ if not os.path.getsize(self.pending_file):
+ if item_list:
+ self.merge_items(item_list)
+ return
+ with open(self.pending_file, 'r+b') as fp:
+ for entry in fp.read().split(':'):
+ if entry:
+ try:
+ (name, timestamp, size, content_type, etag,
+ deleted) = pickle.loads(entry.decode('base64'))
+ item_list.append({'name': name, 'created_at':
+ timestamp, 'size': size, 'content_type':
+ content_type, 'etag': etag,
+ 'deleted': deleted})
+ except:
+ self.logger.exception(
+ 'Invalid pending entry %s: %s'
+ % (self.pending_file, entry))
+ if item_list:
+ self.merge_items(item_list)
+ try:
+ os.ftruncate(fp.fileno(), 0)
+ except OSError, err:
+ if err.errno != errno.ENOENT:
+ raise
+
+ def reclaim(self, object_timestamp, sync_timestamp):
+ """
+ Delete rows from the object table that are marked deleted and
+ whose created_at timestamp is < object_timestamp. Also deletes rows
+ from incoming_sync and outgoing_sync where the updated_at timestamp is
+ < sync_timestamp.
+
+ :param object_timestamp: max created_at timestamp of object rows to
+ delete
+ :param sync_timestamp: max updated_at timestamp of sync rows to delete
+ """
+ self._commit_puts()
+ with self.get() as conn:
+ conn.execute("""
+ DELETE FROM object
+ WHERE deleted = 1
+ AND created_at < ?""", (object_timestamp,))
+ try:
+ conn.execute('''
+ DELETE FROM outgoing_sync WHERE updated_at < ?
+ ''', (sync_timestamp,))
+ conn.execute('''
+ DELETE FROM incoming_sync WHERE updated_at < ?
+ ''', (sync_timestamp,))
+ except sqlite3.OperationalError, err:
+ # Old dbs didn't have updated_at in the _sync tables.
+ if 'no such column: updated_at' not in str(err):
+ raise
+ conn.commit()
+
+ def delete_object(self, name, timestamp):
+ """
+ Mark an object deleted.
+
+ :param name: object name to be deleted
+ :param timestamp: timestamp when the object was marked as deleted
+ """
+ self.put_object(name, timestamp, 0, 'application/deleted', 'noetag', 1)
+
+ def put_object(self, name, timestamp, size, content_type, etag, deleted=0):
+ """
+ Creates an object in the DB with its metadata.
+
+ :param name: object name to be created
+ :param timestamp: timestamp of when the object was created
+ :param size: object size
+ :param content_type: object content-type
+ :param etag: object etag
+ :param deleted: if True, marks the object as deleted and sets the
+ deleted_at timestamp to timestamp
+ """
+ record = {'name': name, 'created_at': timestamp, 'size': size,
+ 'content_type': content_type, 'etag': etag,
+ 'deleted': deleted}
+ if self.db_file == ':memory:':
+ self.merge_items([record])
+ return
+ if not os.path.exists(self.db_file):
+ raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
+ pending_size = 0
+ try:
+ pending_size = os.path.getsize(self.pending_file)
+ except OSError, err:
+ if err.errno != errno.ENOENT:
+ raise
+ if pending_size > PENDING_CAP:
+ self._commit_puts([record])
+ else:
+ with lock_parent_directory(
+ self.pending_file, self.pending_timeout):
+ with open(self.pending_file, 'a+b') as fp:
+ # Colons aren't used in base64 encoding, so they make a safe
+ # record delimiter
+ fp.write(':')
+ fp.write(pickle.dumps(
+ (name, timestamp, size, content_type, etag, deleted),
+ protocol=PICKLE_PROTOCOL).encode('base64'))
+ fp.flush()
+
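+ # Illustrative sketch of one .pending entry as written above and later
+ # read back by _commit_puts (the values shown are hypothetical):
+ #
+ # entry = pickle.dumps(('obj1', normalize_timestamp(time.time()), 1024,
+ # 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e', 0),
+ # protocol=PICKLE_PROTOCOL).encode('base64')
+ # name, timestamp, size, content_type, etag, deleted = \
+ # pickle.loads(entry.decode('base64'))
+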
+ def is_deleted(self, timestamp=None):
+ """
+ Check if the DB is considered to be deleted.
+
+ :returns: True if the DB is considered to be deleted, False otherwise
+ """
+ if self.db_file != ':memory:' and not os.path.exists(self.db_file):
+ return True
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ with self.get() as conn:
+ row = conn.execute('''
+ SELECT put_timestamp, delete_timestamp, object_count
+ FROM container_stat''').fetchone()
+ # leave this db as a tombstone for a consistency window
+ if timestamp and row['delete_timestamp'] > timestamp:
+ return False
+ # The container is considered deleted if the delete_timestamp
+ # value is greater than the put_timestamp, and there are no
+ # objects in the container.
+ return (row['object_count'] in (None, '', 0, '0')) and \
+ (float(row['delete_timestamp']) > float(row['put_timestamp']))
+
+ def get_info(self):
+ """
+ Get global data for the container.
+
+ :returns: a tuple of (account, container, created_at, put_timestamp,
+ delete_timestamp, object_count, bytes_used,
+ reported_put_timestamp, reported_delete_timestamp,
+ reported_object_count, reported_bytes_used, hash, id)
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ with self.get() as conn:
+ return conn.execute('''
+ SELECT account, container, created_at, put_timestamp,
+ delete_timestamp, object_count, bytes_used,
+ reported_put_timestamp, reported_delete_timestamp,
+ reported_object_count, reported_bytes_used, hash, id
+ FROM container_stat
+ ''').fetchone()
+
+ def reported(self, put_timestamp, delete_timestamp, object_count,
+ bytes_used):
+ """
+ Update reported stats.
+
+ :param put_timestamp: put_timestamp to update
+ :param delete_timestamp: delete_timestamp to update
+ :param object_count: object_count to update
+ :param bytes_used: bytes_used to update
+ """
+ with self.get() as conn:
+ conn.execute('''
+ UPDATE container_stat
+ SET reported_put_timestamp = ?, reported_delete_timestamp = ?,
+ reported_object_count = ?, reported_bytes_used = ?
+ ''', (put_timestamp, delete_timestamp, object_count, bytes_used))
+ conn.commit()
+
+ def get_random_objects(self, max_count=100):
+ """
+ Get random objects from the DB. This is used by the container_auditor
+ when testing random objects for existence.
+
+ :param max_count: maximum number of objects to get
+
+ :returns: list of object names
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ rv = []
+ with self.get() as conn:
+ row = conn.execute('''
+ SELECT ROWID FROM object ORDER BY ROWID DESC LIMIT 1
+ ''').fetchone()
+ if not row:
+ return []
+ max_rowid = row['ROWID']
+ for _ in xrange(min(max_count, max_rowid)):
+ row = conn.execute('''
+ SELECT name FROM object WHERE ROWID >= ? AND +deleted = 0
+ LIMIT 1
+ ''', (randint(0, max_rowid),)).fetchone()
+ if row:
+ rv.append(row['name'])
+ return list(set(rv))
+
+ def list_objects_iter(self, limit, marker, prefix, delimiter, path=None,
+ format=None):
+ """
+ Get a list of objects sorted by name starting at marker onward, up
+ to limit entries. Entries will begin with the prefix and will not
+ have the delimiter after the prefix.
+
+ :param limit: maximum number of entries to get
+ :param marker: marker query
+ :param prefix: prefix query
+ :param delimiter: delimiter for query
+ :param path: if defined, will set the prefix and delimiter based on
+ the path
+ :param format: TODO: remove as it is no longer used
+
+ :returns: list of tuples of (name, created_at, size, content_type,
+ etag)
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ if path is not None:
+ prefix = path
+ if path:
+ prefix = path = path.rstrip('/') + '/'
+ delimiter = '/'
+ elif delimiter and not prefix:
+ prefix = ''
+ orig_marker = marker
+ with self.get() as conn:
+ results = []
+ while len(results) < limit:
+ query = '''SELECT name, created_at, size, content_type, etag
+ FROM object WHERE'''
+ query_args = []
+ if marker and marker >= prefix:
+ query += ' name > ? AND'
+ query_args.append(marker)
+ elif prefix:
+ query += ' name >= ? AND'
+ query_args.append(prefix)
+ query += ' +deleted = 0 ORDER BY name LIMIT ?'
+ query_args.append(limit - len(results))
+ curs = conn.execute(query, query_args)
+ curs.row_factory = None
+
+ if prefix is None:
+ return [r for r in curs]
+ if not delimiter:
+ return [r for r in curs if r[0].startswith(prefix)]
+ rowcount = 0
+ for row in curs:
+ rowcount += 1
+ marker = name = row[0]
+ if len(results) >= limit or not name.startswith(prefix):
+ curs.close()
+ return results
+ end = name.find(delimiter, len(prefix))
+ if path is not None:
+ if name == path:
+ continue
+ if end >= 0 and len(name) > end + len(delimiter):
+ marker = name[:end] + chr(ord(delimiter) + 1)
+ curs.close()
+ break
+ elif end > 0:
+ marker = name[:end] + chr(ord(delimiter) + 1)
+ dir_name = name[:end + 1]
+ if dir_name != orig_marker:
+ results.append([dir_name, '0', 0, None, ''])
+ curs.close()
+ break
+ results.append(row)
+ if not rowcount:
+ break
+ return results
+
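+ # A rough worked example of the delimiter logic above (object names are
+ # hypothetical): with prefix='photos/' and delimiter='/', rows such as
+ # 'photos/2010/a.jpg' and 'photos/2010/b.jpg' collapse into a single
+ # placeholder entry ['photos/2010/', '0', 0, None, ''], and marker is
+ # advanced to 'photos/2010' + chr(ord('/') + 1) so the next query skips
+ # the rest of that pseudo-directory.
+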
+ def merge_items(self, item_list, source=None):
+ """
+ Merge items into the object table.
+
+ :param item_list: list of dictionaries of {'name', 'created_at',
+ 'size', 'content_type', 'etag', 'deleted'}
+ :param source: if defined, update incoming_sync with the source
+ """
+ with self.get() as conn:
+ max_rowid = -1
+ for rec in item_list:
+ curs = conn.execute('''
+ DELETE FROM object WHERE name = ? AND
+ (created_at < ?)
+ ''', (rec['name'], rec['created_at']))
+ try:
+ conn.execute('''
+ INSERT INTO object (name, created_at, size,
+ content_type, etag, deleted)
+ VALUES (?, ?, ?, ?, ?, ?)
+ ''', ([rec['name'], rec['created_at'], rec['size'],
+ rec['content_type'], rec['etag'], rec['deleted']]))
+ except sqlite3.IntegrityError:
+ pass
+ if source:
+ max_rowid = max(max_rowid, rec['ROWID'])
+ if source:
+ try:
+ conn.execute('''
+ INSERT INTO incoming_sync (sync_point, remote_id)
+ VALUES (?, ?)
+ ''', (max_rowid, source))
+ except sqlite3.IntegrityError:
+ conn.execute('''
+ UPDATE incoming_sync SET sync_point=max(?, sync_point)
+ WHERE remote_id=?
+ ''', (max_rowid, source))
+ conn.commit()
+
+
+class AccountBroker(DatabaseBroker):
+ """Encapsulates working with a account database."""
+ db_type = 'account'
+ db_contains_type = 'container'
+
+ def _initialize(self, conn, put_timestamp):
+ """
+ Create a brand new database (tables, indices, triggers, etc.)
+
+ :param conn: DB connection object
+ :param put_timestamp: put timestamp
+ """
+ if not self.account:
+ raise ValueError(
+ 'Attempting to create a new database with no account set')
+ self.create_container_table(conn)
+ self.create_account_stat_table(conn, put_timestamp)
+
+ def create_container_table(self, conn):
+ """
+ Create container table which is specific to the account DB.
+
+ :param conn: DB connection object
+ """
+ conn.executescript("""
+ CREATE TABLE container (
+ ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
+ name TEXT UNIQUE,
+ put_timestamp TEXT,
+ delete_timestamp TEXT,
+ object_count INTEGER,
+ bytes_used INTEGER,
+ deleted INTEGER DEFAULT 0
+ );
+
+ CREATE INDEX ix_container_deleted ON container (deleted);
+ CREATE INDEX ix_container_name ON container (name);
+ CREATE TRIGGER container_insert AFTER INSERT ON container
+ BEGIN
+ UPDATE account_stat
+ SET container_count = container_count + (1 - new.deleted),
+ object_count = object_count + new.object_count,
+ bytes_used = bytes_used + new.bytes_used,
+ hash = chexor(hash, new.name,
+ new.put_timestamp || '-' ||
+ new.delete_timestamp || '-' ||
+ new.object_count || '-' || new.bytes_used);
+ END;
+
+ CREATE TRIGGER container_update BEFORE UPDATE ON container
+ BEGIN
+ SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
+ END;
+
+
+ CREATE TRIGGER container_delete AFTER DELETE ON container
+ BEGIN
+ UPDATE account_stat
+ SET container_count = container_count - (1 - old.deleted),
+ object_count = object_count - old.object_count,
+ bytes_used = bytes_used - old.bytes_used,
+ hash = chexor(hash, old.name,
+ old.put_timestamp || '-' ||
+ old.delete_timestamp || '-' ||
+ old.object_count || '-' || old.bytes_used);
+ END;
+ """)
+
+ def create_account_stat_table(self, conn, put_timestamp):
+ """
+ Create account_stat table which is specific to the account DB.
+
+ :param conn: DB connection object
+ :param put_timestamp: put timestamp
+ """
+ conn.executescript("""
+ CREATE TABLE account_stat (
+ account TEXT,
+ created_at TEXT,
+ put_timestamp TEXT DEFAULT '0',
+ delete_timestamp TEXT DEFAULT '0',
+ container_count INTEGER,
+ object_count INTEGER DEFAULT 0,
+ bytes_used INTEGER DEFAULT 0,
+ hash TEXT default '00000000000000000000000000000000',
+ id TEXT,
+ status TEXT DEFAULT '',
+ status_changed_at TEXT DEFAULT '0'
+ );
+
+ INSERT INTO account_stat (container_count) VALUES (0);
+ """)
+
+ conn.execute('''
+ UPDATE account_stat SET account = ?, created_at = ?, id = ?,
+ put_timestamp = ?
+ ''', (self.account, normalize_timestamp(time.time()), str(uuid4()),
+ put_timestamp))
+
+ def update_put_timestamp(self, timestamp):
+ """
+ Update the put_timestamp. Only modifies it if it is greater than
+ the current timestamp.
+
+ :param timestamp: put timestamp
+ """
+ with self.get() as conn:
+ conn.execute('''
+ UPDATE account_stat SET put_timestamp = ?
+ WHERE put_timestamp < ? ''', (timestamp, timestamp))
+ conn.commit()
+
+ def _delete_db(self, conn, timestamp, force=False):
+ """
+ Mark the DB as deleted.
+
+ :param conn: DB connection object
+ :param timestamp: timestamp to mark as deleted
+ """
+ conn.execute("""
+ UPDATE account_stat
+ SET delete_timestamp = ?,
+ status = 'DELETED',
+ status_changed_at = ?
+ WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp))
+
+ def _commit_puts(self, item_list=None):
+ """Handles commiting rows in .pending files."""
+ if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
+ return
+ if item_list is None:
+ item_list = []
+ with lock_parent_directory(self.pending_file, self.pending_timeout):
+ self._preallocate()
+ if not os.path.getsize(self.pending_file):
+ if item_list:
+ self.merge_items(item_list)
+ return
+ with open(self.pending_file, 'r+b') as fp:
+ for entry in fp.read().split(':'):
+ if entry:
+ try:
+ (name, put_timestamp, delete_timestamp,
+ object_count, bytes_used, deleted) = \
+ pickle.loads(entry.decode('base64'))
+ item_list.append({'name': name,
+ 'put_timestamp': put_timestamp,
+ 'delete_timestamp': delete_timestamp,
+ 'object_count': object_count,
+ 'bytes_used': bytes_used,
+ 'deleted': deleted})
+ except:
+ self.logger.exception(
+ 'Invalid pending entry %s: %s'
+ % (self.pending_file, entry))
+ if item_list:
+ self.merge_items(item_list)
+ try:
+ os.ftruncate(fp.fileno(), 0)
+ except OSError, err:
+ if err.errno != errno.ENOENT:
+ raise
+
+ def empty(self):
+ """
+ Check if the account DB is empty.
+
+ :returns: True if the database has no active containers, False otherwise
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ with self.get() as conn:
+ row = conn.execute(
+ 'SELECT container_count from account_stat').fetchone()
+ return (row[0] == 0)
+
+ def reclaim(self, container_timestamp, sync_timestamp):
+ """
+ Delete rows from the container table that are marked deleted and
+ whose delete_timestamp is < container_timestamp. Also deletes rows
+ from incoming_sync and outgoing_sync where the updated_at timestamp is
+ < sync_timestamp.
+
+ :param container_timestamp: max delete_timestamp of container rows to
+ delete
+ :param sync_timestamp: max updated_at timestamp of sync rows to delete
+ """
+
+ self._commit_puts()
+ with self.get() as conn:
+ conn.execute('''
+ DELETE FROM container WHERE
+ deleted = 1 AND delete_timestamp < ?
+ ''', (container_timestamp,))
+ try:
+ conn.execute('''
+ DELETE FROM outgoing_sync WHERE updated_at < ?
+ ''', (sync_timestamp,))
+ conn.execute('''
+ DELETE FROM incoming_sync WHERE updated_at < ?
+ ''', (sync_timestamp,))
+ except sqlite3.OperationalError, err:
+ # Old dbs didn't have updated_at in the _sync tables.
+ if 'no such column: updated_at' not in str(err):
+ raise
+ conn.commit()
+
+ def get_container_timestamp(self, container_name):
+ """
+ Get the put_timestamp of a container.
+
+ :param container_name: container name
+
+ :returns: put_timestamp of the container
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ with self.get() as conn:
+ ret = conn.execute('''
+ SELECT put_timestamp FROM container
+ WHERE name = ? AND deleted != 1''',
+ (container_name,)).fetchone()
+ if ret:
+ ret = ret[0]
+ return ret
+
+ def put_container(self, name, put_timestamp, delete_timestamp,
+ object_count, bytes_used):
+ """
+ Create a container with the given attributes.
+
+ :param name: name of the container to create
+ :param put_timestamp: put_timestamp of the container to create
+ :param delete_timestamp: delete_timestamp of the container to create
+ :param object_count: number of objects in the container
+ :param bytes_used: number of bytes used by the container
+ """
+ if delete_timestamp > put_timestamp and \
+ object_count in (None, '', 0, '0'):
+ deleted = 1
+ else:
+ deleted = 0
+ record = {'name': name, 'put_timestamp': put_timestamp,
+ 'delete_timestamp': delete_timestamp,
+ 'object_count': object_count,
+ 'bytes_used': bytes_used,
+ 'deleted': deleted}
+ if self.db_file == ':memory:':
+ self.merge_items([record])
+ return
+ commit = False
+ with lock_parent_directory(self.pending_file, self.pending_timeout):
+ with open(self.pending_file, 'a+b') as fp:
+ # Colons aren't used in base64 encoding, so they make a safe
+ # record delimiter
+ fp.write(':')
+ fp.write(pickle.dumps(
+ (name, put_timestamp, delete_timestamp, object_count,
+ bytes_used, deleted),
+ protocol=PICKLE_PROTOCOL).encode('base64'))
+ fp.flush()
+ if fp.tell() > PENDING_CAP:
+ commit = True
+ if commit:
+ self._commit_puts()
+
+ def can_delete_db(self, cutoff):
+ """
+ Check if the account DB can be deleted.
+
+ :returns: True if the account can be deleted, False otherwise
+ """
+ self._commit_puts()
+ with self.get() as conn:
+ row = conn.execute('''
+ SELECT status, put_timestamp, delete_timestamp, container_count
+ FROM account_stat''').fetchone()
+ # The account is considered deleted if its status is marked
+ # as 'DELETED' and the delete_timestamp is older than the supplied
+ # cutoff date; or if the delete_timestamp value is greater than
+ # the put_timestamp, and there are no containers for the account
+ status_del = (row['status'] == 'DELETED')
+ deltime = float(row['delete_timestamp'])
+ past_cutoff = (deltime < cutoff)
+ time_later = (row['delete_timestamp'] > row['put_timestamp'])
+ no_containers = (row['container_count'] in (None, '', 0, '0'))
+ return (
+ (status_del and past_cutoff) or (time_later and no_containers))
+
+ def is_deleted(self):
+ """
+ Check if the account DB is considered to be deleted.
+
+ :returns: True if the account DB is considered to be deleted, False
+ otherwise
+ """
+ if self.db_file != ':memory:' and not os.path.exists(self.db_file):
+ return True
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ with self.get() as conn:
+ row = conn.execute('''
+ SELECT put_timestamp, delete_timestamp, container_count, status
+ FROM account_stat''').fetchone()
+ return row['status'] == 'DELETED' or (
+ row['container_count'] in (None, '', 0, '0') and
+ row['delete_timestamp'] > row['put_timestamp'])
+
+ def is_status_deleted(self):
+ """Only returns true if the status field is set to DELETED."""
+ with self.get() as conn:
+ row = conn.execute('''
+ SELECT status
+ FROM account_stat''').fetchone()
+ return (row['status'] == "DELETED")
+
+ def get_info(self):
+ """
+ Get global data for the account.
+
+ :returns: a tuple of (account, created_at, put_timestamp,
+ delete_timestamp, container_count, object_count,
+ bytes_used, hash, id)
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ with self.get() as conn:
+ return conn.execute('''
+ SELECT account, created_at, put_timestamp, delete_timestamp,
+ container_count, object_count, bytes_used, hash, id
+ FROM account_stat
+ ''').fetchone()
+
+ def get_random_containers(self, max_count=100):
+ """
+ Get random containers from the DB. This is used by the
+ account_auditor when testing random containers for existence.
+
+ :param max_count: maximum number of containers to get
+
+ :returns: list of container names
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ rv = []
+ with self.get() as conn:
+ row = conn.execute('''
+ SELECT ROWID FROM container ORDER BY ROWID DESC LIMIT 1
+ ''').fetchone()
+ if not row:
+ return []
+ max_rowid = row['ROWID']
+ for _ in xrange(min(max_count, max_rowid)):
+ row = conn.execute('''
+ SELECT name FROM container WHERE
+ ROWID >= ? AND +deleted = 0
+ LIMIT 1
+ ''', (randint(0, max_rowid),)).fetchone()
+ if row:
+ rv.append(row['name'])
+ return list(set(rv))
+
+ def list_containers_iter(self, limit, marker, prefix, delimiter):
+ """
+ Get a list of containers sorted by name starting at marker onward, up
+ to limit entries. Entries will begin with the prefix and will not
+ have the delimiter after the prefix.
+
+ :param limit: maximum number of entries to get
+ :param marker: marker query
+ :param prefix: prefix query
+ :param delimiter: delimiter for query
+
+ :returns: list of tuples of (name, object_count, bytes_used, 0)
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ if delimiter and not prefix:
+ prefix = ''
+ orig_marker = marker
+ with self.get() as conn:
+ results = []
+ while len(results) < limit:
+ query = """
+ SELECT name, object_count, bytes_used, 0
+ FROM container
+ WHERE deleted = 0 AND """
+ query_args = []
+ if marker and marker >= prefix:
+ query += ' name > ? AND'
+ query_args.append(marker)
+ elif prefix:
+ query += ' name >= ? AND'
+ query_args.append(prefix)
+ query += ' +deleted = 0 ORDER BY name LIMIT ?'
+ query_args.append(limit - len(results))
+ curs = conn.execute(query, query_args)
+ curs.row_factory = None
+
+ if prefix is None:
+ return [r for r in curs]
+ if not delimiter:
+ return [r for r in curs if r[0].startswith(prefix)]
+ rowcount = 0
+ for row in curs:
+ rowcount += 1
+ marker = name = row[0]
+ if len(results) >= limit or not name.startswith(prefix):
+ curs.close()
+ return results
+ end = name.find(delimiter, len(prefix))
+ if end > 0:
+ marker = name[:end] + chr(ord(delimiter) + 1)
+ dir_name = name[:end + 1]
+ if dir_name != orig_marker:
+ results.append([dir_name, 0, 0, 1])
+ curs.close()
+ break
+ results.append(row)
+ if not rowcount:
+ break
+ return results
+
+ def merge_items(self, item_list, source=None):
+ """
+ Merge items into the container table.
+
+ :param item_list: list of dictionaries of {'name', 'put_timestamp',
+ 'delete_timestamp', 'object_count', 'bytes_used',
+ 'deleted'}
+ :param source: if defined, update incoming_sync with the source
+ """
+ with self.get() as conn:
+ max_rowid = -1
+ for rec in item_list:
+ record = [rec['name'], rec['put_timestamp'],
+ rec['delete_timestamp'], rec['object_count'],
+ rec['bytes_used'], rec['deleted']]
+ try:
+ conn.execute('''
+ INSERT INTO container (name, put_timestamp,
+ delete_timestamp, object_count, bytes_used,
+ deleted)
+ VALUES (?, ?, ?, ?, ?, ?)
+ ''', record)
+ except sqlite3.IntegrityError:
+ curs = conn.execute('''
+ SELECT name, put_timestamp, delete_timestamp,
+ object_count, bytes_used, deleted
+ FROM container WHERE name = ? AND
+ (put_timestamp < ? OR delete_timestamp < ? OR
+ object_count != ? OR bytes_used != ?)''',
+ (rec['name'], rec['put_timestamp'],
+ rec['delete_timestamp'], rec['object_count'],
+ rec['bytes_used']))
+ curs.row_factory = None
+ row = curs.fetchone()
+ if row:
+ row = list(row)
+ for i in xrange(5):
+ if record[i] is None and row[i] is not None:
+ record[i] = row[i]
+ if row[1] > record[1]: # Keep newest put_timestamp
+ record[1] = row[1]
+ if row[2] > record[2]: # Keep newest delete_timestamp
+ record[2] = row[2]
+ conn.execute('DELETE FROM container WHERE name = ?',
+ (record[0],))
+ # If deleted, mark as such
+ if record[2] > record[1] and \
+ record[3] in (None, '', 0, '0'):
+ record[5] = 1
+ else:
+ record[5] = 0
+ try:
+ conn.execute('''
+ INSERT INTO container (name, put_timestamp,
+ delete_timestamp, object_count, bytes_used,
+ deleted)
+ VALUES (?, ?, ?, ?, ?, ?)
+ ''', record)
+ except sqlite3.IntegrityError:
+ continue
+ if source:
+ max_rowid = max(max_rowid, rec['ROWID'])
+ if source:
+ try:
+ conn.execute('''
+ INSERT INTO incoming_sync (sync_point, remote_id)
+ VALUES (?, ?)
+ ''', (max_rowid, source))
+ except sqlite3.IntegrityError:
+ conn.execute('''
+ UPDATE incoming_sync SET sync_point=max(?, sync_point)
+ WHERE remote_id=?
+ ''', (max_rowid, source))
+ conn.commit()
diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py
new file mode 100644
index 000000000..b8df33038
--- /dev/null
+++ b/swift/common/db_replicator.py
@@ -0,0 +1,526 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import with_statement
+import sys
+import os
+import random
+import math
+import time
+import shutil
+
+from eventlet import GreenPool, sleep, Timeout
+from eventlet.green import subprocess
+import simplejson
+from webob import Response
+from webob.exc import HTTPNotFound, HTTPNoContent, HTTPAccepted, \
+ HTTPInsufficientStorage, HTTPBadRequest
+
+from swift.common.utils import get_logger, whataremyips, storage_directory, \
+ renamer, mkdirs, lock_parent_directory, unlink_older_than, LoggerFileObject
+from swift.common import ring
+from swift.common.bufferedhttp import BufferedHTTPConnection
+from swift.common.exceptions import DriveNotMounted, ConnectionTimeout
+
+
+def quarantine_db(object_file, server_type):
+ """
+ In the case that a corrupt file is found, move it to a quarantined area to
+ allow replication to fix it.
+
+ :param object_file: path to corrupt file
+ :param server_type: type of file that is corrupt
+ ('container' or 'account')
+ """
+ object_dir = os.path.dirname(object_file)
+ quarantine_dir = os.path.abspath(os.path.join(object_dir, '..',
+ '..', '..', '..', 'quarantined', server_type + 's',
+ os.path.basename(object_dir)))
+ renamer(object_dir, quarantine_dir)
+
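+# For example (paths are hypothetical): a corrupt container db at
+# /srv/node/sda1/containers/<part>/<suffix>/<hash>/<hash>.db is renamed,
+# together with its directory, to /srv/node/sda1/quarantined/containers/<hash>.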
+
+class ReplConnection(BufferedHTTPConnection):
+ """
+ Helper to simplify POSTing to a remote server.
+ """
+ def __init__(self, node, partition, hash_, logger):
+ ""
+ self.logger = logger
+ self.node = node
+ BufferedHTTPConnection.__init__(self, '%(ip)s:%(port)s' % node)
+ self.path = '/%s/%s/%s' % (node['device'], partition, hash_)
+
+ def post(self, *args):
+ """
+ Make an HTTP POST request
+
+ :param args: list of json-encodable objects
+
+ :returns: httplib response object
+ """
+ try:
+ body = simplejson.dumps(args)
+ self.request('POST', self.path, body,
+ {'Content-Type': 'application/json'})
+ response = self.getresponse()
+ response.data = response.read()
+ return response
+ except:
+ self.logger.exception(
+ 'ERROR reading HTTP response from %s' % self.node)
+ return None
+
+
+class Replicator(object):
+ """
+ Implements the logic for directing db replication.
+ """
+
+ def __init__(self, server_conf, replicator_conf):
+ self.logger = \
+ get_logger(replicator_conf, '%s-replicator' % self.server_type)
+ # log uncaught exceptions
+ sys.excepthook = lambda *exc_info: \
+ self.logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
+ sys.stdout = sys.stderr = LoggerFileObject(self.logger)
+ self.root = server_conf.get('devices', '/srv/node')
+ self.mount_check = server_conf.get('mount_check', 'true').lower() in \
+ ('true', 't', '1', 'on', 'yes', 'y')
+ self.port = int(server_conf.get('bind_port', self.default_port))
+ concurrency = int(replicator_conf.get('concurrency', 8))
+ self.cpool = GreenPool(size=concurrency)
+ swift_dir = server_conf.get('swift_dir', '/etc/swift')
+ self.ring = ring.Ring(os.path.join(swift_dir, self.ring_file))
+ self.per_diff = int(replicator_conf.get('per_diff', 1000))
+ self.run_pause = int(replicator_conf.get('run_pause', 30))
+ self.vm_test_mode = replicator_conf.get(
+ 'vm_test_mode', 'no').lower() in ('yes', 'true', 'on', '1')
+ self.node_timeout = int(replicator_conf.get('node_timeout', 10))
+ self.conn_timeout = float(replicator_conf.get('conn_timeout', 0.5))
+ self.reclaim_age = float(replicator_conf.get('reclaim_age', 86400 * 7))
+ self._zero_stats()
+
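+ # A minimal sketch of the replicator options read above, with their coded
+ # defaults (the section name is illustrative; only keys referenced in
+ # __init__ are shown):
+ #
+ # [replicator]
+ # per_diff = 1000
+ # concurrency = 8
+ # run_pause = 30
+ # node_timeout = 10
+ # conn_timeout = 0.5
+ # reclaim_age = 604800
+ # vm_test_mode = no
+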
+ def _zero_stats(self):
+ """Zero out the stats."""
+ self.stats = {'attempted': 0, 'success': 0, 'failure': 0, 'ts_repl': 0,
+ 'no_change': 0, 'hashmatch': 0, 'rsync': 0, 'diff': 0,
+ 'remove': 0, 'empty': 0, 'remote_merge': 0,
+ 'start': time.time()}
+
+ def _report_stats(self):
+ """Report the current stats to the logs."""
+ self.logger.info(
+ 'Attempted to replicate %d dbs in %.5f seconds (%.5f/s)'
+ % (self.stats['attempted'], time.time() - self.stats['start'],
+ self.stats['attempted'] /
+ (time.time() - self.stats['start'] + 0.0000001)))
+ self.logger.info('Removed %(remove)d dbs' % self.stats)
+ self.logger.info('%(success)s successes, %(failure)s failures'
+ % self.stats)
+ self.logger.info(' '.join(['%s:%s' % item for item in
+ self.stats.items() if item[0] in
+ ('no_change', 'hashmatch', 'rsync', 'diff', 'ts_repl', 'empty')]))
+
+ def _rsync_file(self, db_file, remote_file, whole_file=True):
+ """
+ Sync a single file using rsync. Used by _rsync_db to handle syncing.
+
+ :param db_file: file to be synced
+ :param remote_file: remote location to sync the DB file to
+ :param whole_file: if True, uses rsync's --whole-file flag
+
+ :returns: True if the sync was successful, False otherwise
+ """
+ popen_args = ['rsync', '--quiet', '--no-motd',
+ '--timeout=%s' % int(math.ceil(self.node_timeout)),
+ '--contimeout=%s' % int(math.ceil(self.conn_timeout))]
+ if whole_file:
+ popen_args.append('--whole-file')
+ popen_args.extend([db_file, remote_file])
+ proc = subprocess.Popen(popen_args)
+ proc.communicate()
+ if proc.returncode != 0:
+ self.logger.error('ERROR rsync failed with %s: %s' %
+ (proc.returncode, popen_args))
+ return proc.returncode == 0
+
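+ # With the defaults above, the Popen call is roughly equivalent to running
+ # (paths and host are hypothetical):
+ # rsync --quiet --no-motd --timeout=10 --contimeout=1 --whole-file \
+ # /srv/node/sda1/containers/<part>/<suffix>/<hash>/<hash>.db \
+ # 10.0.0.1::container/sda1/tmp/<local_id>
+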
+ def _rsync_db(self, broker, device, http, local_id,
+ post_method='complete_rsync', post_timeout=None):
+ """
+ Sync a whole db using rsync.
+
+ :param broker: DB broker object of DB to be synced
+ :param device: device to sync to
+ :param http: ReplConnection object
+ :param local_id: unique ID of the local database replica
+ :param post_method: remote operation to perform after rsync
+ :param post_timeout: timeout to wait in seconds
+ """
+ if self.vm_test_mode:
+ remote_file = '%s::%s%s/%s/tmp/%s' % (device['ip'],
+ self.server_type, device['port'], device['device'],
+ local_id)
+ else:
+ remote_file = '%s::%s/%s/tmp/%s' % (device['ip'],
+ self.server_type, device['device'], local_id)
+ mtime = os.path.getmtime(broker.db_file)
+ if not self._rsync_file(broker.db_file, remote_file):
+ return False
+ # perform block-level sync if the db was modified during the first sync
+ if os.path.exists(broker.db_file + '-journal') or \
+ os.path.getmtime(broker.db_file) > mtime:
+ # grab a lock so nobody else can modify it
+ with broker.lock():
+ if not self._rsync_file(broker.db_file, remote_file, False):
+ return False
+ with Timeout(post_timeout or self.node_timeout):
+ response = http.post(post_method, local_id)
+ return response and response.status >= 200 and response.status < 300
+
+ def _usync_db(self, point, broker, http, remote_id, local_id):
+ """
+ Sync a db by sending all records since the last sync.
+
+ :param point: synchronization high water mark between the replicas
+ :param broker: database broker object
+ :param http: ReplConnection object for the remote server
+ :param remote_id: database id for the remote replica
+ :param local_id: database id for the local replica
+
+ :returns: boolean indicating completion and success
+ """
+ self.stats['diff'] += 1
+ self.logger.debug('Syncing chunks with %s', http.host)
+ sync_table = broker.get_syncs()
+ objects = broker.get_items_since(point, self.per_diff)
+ while len(objects):
+ with Timeout(self.node_timeout):
+ response = http.post('merge_items', objects, local_id)
+ if not response or response.status >= 300 or response.status < 200:
+ if response:
+ self.logger.error('ERROR Bad response %s from %s' %
+ (response.status, http.host))
+ return False
+ point = objects[-1]['ROWID']
+ objects = broker.get_items_since(point, self.per_diff)
+ with Timeout(self.node_timeout):
+ response = http.post('merge_syncs', sync_table)
+ if response and response.status >= 200 and response.status < 300:
+ broker.merge_syncs([{'remote_id': remote_id,
+ 'sync_point': point}], incoming=False)
+ return True
+ return False
+
+ def _in_sync(self, rinfo, info, broker, local_sync):
+ """
+ Determine whether or not two replicas of a database are considered
+ to be in sync.
+
+ :param rinfo: remote database info
+ :param info: local database info
+ :param broker: database broker object
+ :param local_sync: cached last sync point between replicas
+
+ :returns: boolean indicating whether or not the replicas are in sync
+ """
+ if max(rinfo['point'], local_sync) >= info['max_row']:
+ self.stats['no_change'] += 1
+ return True
+ if rinfo['hash'] == info['hash']:
+ self.stats['hashmatch'] += 1
+ broker.merge_syncs([{'remote_id': rinfo['id'],
+ 'sync_point': rinfo['point']}], incoming=False)
+ return True
+
+ def _http_connect(self, node, partition, db_file):
+ """
+ Make an http_connection using ReplConnection
+
+ :param node: node dictionary from the ring
+ :param partition: partition to send in the url
+ :param db_file: DB file
+
+ :returns: ReplConnection object
+ """
+ return ReplConnection(node, partition,
+ os.path.basename(db_file).split('.', 1)[0], self.logger)
+
+ def _repl_to_node(self, node, broker, partition, info):
+ """
+ Replicate a database to a node.
+
+ :param node: node dictionary from the ring to be replicated to
+ :param broker: DB broker for the DB to be replicated
+ :param partition: partition on the node to replicate to
+ :param info: DB info as a dictionary of {'max_row', 'hash', 'id',
+ 'created_at', 'put_timestamp', 'delete_timestamp'}
+
+ :returns: True if successful, False otherwise
+ """
+ with ConnectionTimeout(self.conn_timeout):
+ http = self._http_connect(node, partition, broker.db_file)
+ if not http:
+ self.logger.error(
+ 'ERROR Unable to connect to remote server: %s' % node)
+ return False
+ with Timeout(self.node_timeout):
+ response = http.post('sync', info['max_row'], info['hash'],
+ info['id'], info['created_at'], info['put_timestamp'],
+ info['delete_timestamp'])
+ if not response:
+ return False
+ elif response.status == HTTPNotFound.code: # completely missing, rsync
+ self.stats['rsync'] += 1
+ return self._rsync_db(broker, node, http, info['id'])
+ elif response.status == HTTPInsufficientStorage.code:
+ raise DriveNotMounted()
+ elif response.status >= 200 and response.status < 300:
+ rinfo = simplejson.loads(response.data)
+ local_sync = broker.get_sync(rinfo['id'], incoming=False)
+ if self._in_sync(rinfo, info, broker, local_sync):
+ return True
+ # if the remote's max rowid is less than half of the local one,
+ # rsync the whole db over and have the remote merge it.
+ if rinfo['max_row'] / float(info['max_row']) < 0.5:
+ self.stats['remote_merge'] += 1
+ return self._rsync_db(broker, node, http, info['id'],
+ post_method='rsync_then_merge',
+ post_timeout=(info['count'] / 2000))
+ # else send diffs over to the remote server
+ return self._usync_db(max(rinfo['point'], local_sync),
+ broker, http, rinfo['id'], info['id'])
+
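+ # In short: a 404 from the peer means the db is missing there, so the
+ # whole file is rsynced; a 507 raises DriveNotMounted so the caller can
+ # try a handoff node; on a 2xx the replicas are compared, falling back to
+ # rsync plus a remote merge when the peer is more than 50% behind in
+ # rowids, and to row-by-row _usync_db otherwise.
+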
+ def _replicate_object(self, partition, object_file, node_id):
+ """
+ Replicate the db, choosing method based on whether or not it
+ already exists on peers.
+
+ :param partition: partition to be replicated to
+ :param object_file: DB file name to be replicated
+ :param node_id: node id of the node to be replicated to
+ """
+ self.logger.debug('Replicating db %s' % object_file)
+ self.stats['attempted'] += 1
+ try:
+ broker = self.brokerclass(object_file, pending_timeout=30)
+ broker.reclaim(time.time() - self.reclaim_age,
+ time.time() - (self.reclaim_age * 2))
+ info = broker.get_replication_info()
+ except Exception, e:
+ if 'no such table' in str(e):
+ self.logger.error('Quarantining DB %s' % object_file)
+ quarantine_db(broker.db_file, broker.db_type)
+ else:
+ self.logger.exception('ERROR reading db %s' % object_file)
+ self.stats['failure'] += 1
+ return
+ # The db is considered deleted if the delete_timestamp value is greater
+ # than the put_timestamp, and there are no objects.
+ delete_timestamp = 0
+ try:
+ delete_timestamp = float(info['delete_timestamp'])
+ except ValueError:
+ pass
+ put_timestamp = 0
+ try:
+ put_timestamp = float(info['put_timestamp'])
+ except ValueError:
+ pass
+ if delete_timestamp < (time.time() - self.reclaim_age) and \
+ delete_timestamp > put_timestamp and \
+ info['count'] in (None, '', 0, '0'):
+ with lock_parent_directory(object_file):
+ shutil.rmtree(os.path.dirname(object_file), True)
+ self.stats['remove'] += 1
+ return
+ responses = []
+ nodes = self.ring.get_part_nodes(int(partition))
+ shouldbehere = bool([n for n in nodes if n['id'] == node_id])
+ repl_nodes = [n for n in nodes if n['id'] != node_id]
+ more_nodes = self.ring.get_more_nodes(int(partition))
+ for node in repl_nodes:
+ success = False
+ try:
+ success = self._repl_to_node(node, broker, partition, info)
+ except DriveNotMounted:
+ repl_nodes.append(more_nodes.next())
+ self.logger.error('ERROR Remote drive not mounted %s' % node)
+ except:
+ self.logger.exception('ERROR syncing %s with node %s' %
+ (object_file, node))
+ self.stats['success' if success else 'failure'] += 1
+ responses.append(success)
+ if not shouldbehere and all(responses):
+ # If the db shouldn't be on this node and has been successfully
+ # synced to all of its peers, it can be removed.
+ with lock_parent_directory(object_file):
+ shutil.rmtree(os.path.dirname(object_file), True)
+ self.stats['remove'] += 1
+
+ def roundrobin_datadirs(self, datadirs):
+ """
+ Generator to walk the data dirs in a round robin manner, evenly
+ hitting each device on the system.
+
+ :param datadirs: a list of paths to walk
+ """
+ def walk_datadir(datadir, node_id):
+ partitions = os.listdir(datadir)
+ random.shuffle(partitions)
+ for partition in partitions:
+ part_dir = os.path.join(datadir, partition)
+ for root, dirs, files in os.walk(part_dir, topdown=False):
+ for fname in (f for f in files if f.endswith('.db')):
+ object_file = os.path.join(root, fname)
+ yield (partition, object_file, node_id)
+ its = [walk_datadir(datadir, node_id) for datadir, node_id in datadirs]
+ while its:
+ for it in its:
+ try:
+ yield it.next()
+ except StopIteration:
+ its.remove(it)
+
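+ # Usage sketch (paths and ids are hypothetical): given
+ # datadirs = [('/srv/node/sda1/containers', 1),
+ # ('/srv/node/sdb1/containers', 2)]
+ # the generator alternates between the two devices, yielding tuples such
+ # as ('1234', '/srv/node/sda1/containers/1234/abc/<hash>/<hash>.db', 1).
+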
+ def replicate_once(self):
+ """Run a replication pass once."""
+ self._zero_stats()
+ dirs = []
+ ips = whataremyips()
+ if not ips:
+ self.logger.error('ERROR Failed to get my own IPs?')
+ return
+ for node in self.ring.devs:
+ if node and node['ip'] in ips and node['port'] == self.port:
+ if self.mount_check and not os.path.ismount(
+ os.path.join(self.root, node['device'])):
+ self.logger.warn(
+ 'Skipping %(device)s as it is not mounted' % node)
+ continue
+ unlink_older_than(
+ os.path.join(self.root, node['device'], 'tmp'),
+ time.time() - self.reclaim_age)
+ datadir = os.path.join(self.root, node['device'], self.datadir)
+ if os.path.isdir(datadir):
+ dirs.append((datadir, node['id']))
+ self.logger.info('Beginning replication run')
+ for part, object_file, node_id in self.roundrobin_datadirs(dirs):
+ self.cpool.spawn_n(
+ self._replicate_object, part, object_file, node_id)
+ self.cpool.waitall()
+ self.logger.info('Replication run OVER')
+ self._report_stats()
+
+ def replicate_forever(self):
+ """
+ Replicate dbs under the given root in an infinite loop.
+ """
+ while True:
+ try:
+ self.replicate_once()
+ except:
+ self.logger.exception('ERROR trying to replicate')
+ sleep(self.run_pause)
+
+
+class ReplicatorRpc(object):
+ """Handle Replication RPC calls. TODO: redbo document please :)"""
+
+ def __init__(self, root, datadir, broker_class, mount_check=True):
+ self.root = root
+ self.datadir = datadir
+ self.broker_class = broker_class
+ self.mount_check = mount_check
+
+ def dispatch(self, post_args, args):
+ if not hasattr(args, 'pop'):
+ return HTTPBadRequest(body='Invalid object type')
+ op = args.pop(0)
+ drive, partition, hsh = post_args
+ if self.mount_check and \
+ not os.path.ismount(os.path.join(self.root, drive)):
+ return Response(status='507 %s is not mounted' % drive)
+ db_file = os.path.join(self.root, drive,
+ storage_directory(self.datadir, partition, hsh), hsh + '.db')
+ if op == 'rsync_then_merge':
+ return self.rsync_then_merge(drive, db_file, args)
+ if op == 'complete_rsync':
+ return self.complete_rsync(drive, db_file, args)
+ else:
+ # someone might be about to rsync a db to us,
+ # make sure there's a tmp dir to receive it.
+ mkdirs(os.path.join(self.root, drive, 'tmp'))
+ if not os.path.exists(db_file):
+ return HTTPNotFound()
+ return getattr(self, op)(self.broker_class(db_file), args)
+
+ def sync(self, broker, args):
+ (remote_sync, hash_, id_, created_at, put_timestamp,
+ delete_timestamp) = args
+ try:
+ info = broker.get_replication_info()
+ except Exception, e:
+ if 'no such table' in str(e):
+ # TODO find a real logger
+ print "Quarantining DB %s" % broker.db_file
+ quarantine_db(broker.db_file, broker.db_type)
+ return HTTPNotFound()
+ raise
+ if info['put_timestamp'] != put_timestamp or \
+ info['created_at'] != created_at or \
+ info['delete_timestamp'] != delete_timestamp:
+ broker.merge_timestamps(
+ created_at, put_timestamp, delete_timestamp)
+ info['point'] = broker.get_sync(id_)
+ if hash_ == info['hash'] and info['point'] < remote_sync:
+ broker.merge_syncs([{'remote_id': id_,
+ 'sync_point': remote_sync}])
+ info['point'] = remote_sync
+ return Response(simplejson.dumps(info))
+
+ def merge_syncs(self, broker, args):
+ broker.merge_syncs(args[0])
+ return HTTPAccepted()
+
+ def merge_items(self, broker, args):
+ broker.merge_items(args[0], args[1])
+ return HTTPAccepted()
+
+ def complete_rsync(self, drive, db_file, args):
+ old_filename = os.path.join(self.root, drive, 'tmp', args[0])
+ if os.path.exists(db_file):
+ return HTTPNotFound()
+ if not os.path.exists(old_filename):
+ return HTTPNotFound()
+ broker = self.broker_class(old_filename)
+ broker.newid(args[0])
+ renamer(old_filename, db_file)
+ return HTTPNoContent()
+
+ def rsync_then_merge(self, drive, db_file, args):
+ old_filename = os.path.join(self.root, drive, 'tmp', args[0])
+ if not os.path.exists(db_file) or not os.path.exists(old_filename):
+ return HTTPNotFound()
+ new_broker = self.broker_class(old_filename)
+ existing_broker = self.broker_class(db_file)
+ point = -1
+ objects = existing_broker.get_items_since(point, 1000)
+ while len(objects):
+ new_broker.merge_items(objects)
+ point = objects[-1]['ROWID']
+ objects = existing_broker.get_items_since(point, 1000)
+ sleep()
+ new_broker.newid(args[0])
+ renamer(old_filename, db_file)
+ return HTTPNoContent()
diff --git a/swift/common/direct_client.py b/swift/common/direct_client.py
new file mode 100644
index 000000000..1d7030c09
--- /dev/null
+++ b/swift/common/direct_client.py
@@ -0,0 +1,303 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Internal client library for making calls directly to the servers rather than
+through the proxy.
+"""
+
+import socket
+from httplib import HTTPException
+from time import time
+from urllib import quote as _quote, unquote
+
+from eventlet import sleep, Timeout
+
+from swift.common.bufferedhttp import http_connect
+from swift.common.client import ClientException, json_loads
+from swift.common.utils import normalize_timestamp
+
+
+def quote(value, safe='/'):
+ if isinstance(value, unicode):
+ value = value.encode('utf8')
+ return _quote(value, safe)
+
+
+def direct_head_container(node, part, account, container, conn_timeout=5,
+ response_timeout=15):
+ """
+ Request container information directly from the container server.
+
+ :param node: node dictionary from the ring
+ :param part: partition the container is on
+ :param account: account name
+ :param container: container name
+ :param conn_timeout: timeout in seconds for establishing the connection
+ :param response_timeout: timeout in seconds for getting the response
+ :returns: tuple of (object count, bytes used)
+ """
+ path = '/%s/%s' % (account, container)
+ with Timeout(conn_timeout):
+ conn = http_connect(node['ip'], node['port'], node['device'], part,
+ 'HEAD', path)
+ with Timeout(response_timeout):
+ resp = conn.getresponse()
+ resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException(
+ 'Container server %s:%s direct HEAD %s gave status %s' %
+ (node['ip'], node['port'],
+ repr('/%s/%s%s' % (node['device'], part, path)),
+ resp.status),
+ http_host=node['ip'], http_port=node['port'],
+ http_device=node['device'], http_status=resp.status,
+ http_reason=resp.reason)
+ return int(resp.getheader('x-container-object-count')), \
+ int(resp.getheader('x-container-bytes-used'))
+
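+# Usage sketch (the node dict and names are hypothetical; the keys match
+# those used above):
+# node = {'ip': '10.0.0.1', 'port': 6001, 'device': 'sda1'}
+# count, used = direct_head_container(node, part, 'AUTH_test', 'photos')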
+
+def direct_get_container(node, part, account, container, marker=None,
+ limit=None, prefix=None, delimiter=None,
+ conn_timeout=5, response_timeout=15):
+ """
+ Get container listings directly from the container server.
+
+ :param node: node dictionary from the ring
+ :param part: partition the container is on
+ :param account: account name
+ :param container: container name
+ :param marker: marker query
+ :param limit: query limit
+ :param prefix: prefix query
+ :param delimiter: delimiter for the query
+ :param conn_timeout: timeout in seconds for establishing the connection
+ :param response_timeout: timeout in seconds for getting the response
+ :returns: list of objects
+ """
+ path = '/%s/%s' % (account, container)
+ qs = 'format=json'
+ if marker:
+ qs += '&marker=%s' % quote(marker)
+ if limit:
+ qs += '&limit=%d' % limit
+ if prefix:
+ qs += '&prefix=%s' % quote(prefix)
+ if delimiter:
+ qs += '&delimiter=%s' % quote(delimiter)
+ with Timeout(conn_timeout):
+ conn = http_connect(node['ip'], node['port'], node['device'], part,
+ 'GET', path, query_string=qs)
+ with Timeout(response_timeout):
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ resp.read()
+ raise ClientException(
+ 'Container server %s:%s direct GET %s gave status %s' % (node['ip'],
+ node['port'], repr('/%s/%s%s' % (node['device'], part, path)),
+ resp.status),
+ http_host=node['ip'], http_port=node['port'],
+ http_device=node['device'], http_status=resp.status,
+ http_reason=resp.reason)
+ if resp.status == 204:
+ resp.read()
+ return []
+ return json_loads(resp.read())
+
+
+def direct_delete_container(node, part, account, container, conn_timeout=5,
+ response_timeout=15, headers={}):
+ path = '/%s/%s' % (account, container)
+ headers['X-Timestamp'] = normalize_timestamp(time())
+ with Timeout(conn_timeout):
+ conn = http_connect(node['ip'], node['port'], node['device'], part,
+ 'DELETE', path, headers)
+ with Timeout(response_timeout):
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException(
+ 'Container server %s:%s direct DELETE %s gave status %s' %
+ (node['ip'], node['port'],
+ repr('/%s/%s%s' % (node['device'], part, path)),
+ resp.status),
+ http_host=node['ip'], http_port=node['port'],
+ http_device=node['device'], http_status=resp.status,
+ http_reason=resp.reason)
+ return resp
+
+
+def direct_head_object(node, part, account, container, obj, conn_timeout=5,
+ response_timeout=15):
+ """
+ Request object information directly from the object server.
+
+ :param node: node dictionary from the ring
+ :param part: partition the container is on
+ :param account: account name
+ :param container: container name
+ :param obj: object name
+ :param conn_timeout: timeout in seconds for establishing the connection
+ :param response_timeout: timeout in seconds for getting the response
+ :returns: tuple of (content-type, object size, last modified timestamp,
+ etag, metadata dictionary)
+ """
+ path = '/%s/%s/%s' % (account, container, obj)
+ with Timeout(conn_timeout):
+ conn = http_connect(node['ip'], node['port'], node['device'], part,
+ 'HEAD', path)
+ with Timeout(response_timeout):
+ resp = conn.getresponse()
+ resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException(
+ 'Object server %s:%s direct HEAD %s gave status %s' %
+ (node['ip'], node['port'],
+ repr('/%s/%s%s' % (node['device'], part, path)),
+ resp.status),
+ http_host=node['ip'], http_port=node['port'],
+ http_device=node['device'], http_status=resp.status,
+ http_reason=resp.reason)
+ metadata = {}
+ for key, value in resp.getheaders():
+ if key.lower().startswith('x-object-meta-'):
+ metadata[unquote(key[len('x-object-meta-'):])] = unquote(value)
+ return resp.getheader('content-type'), \
+ int(resp.getheader('content-length')), \
+ resp.getheader('last-modified'), \
+ resp.getheader('etag').strip('"'), \
+ metadata
+
+
+def direct_get_object(node, part, account, container, obj, conn_timeout=5,
+ response_timeout=15):
+ """
+ Get object directly from the object server.
+
+ :param node: node dictionary from the ring
+ :param part: partition the container is on
+ :param account: account name
+ :param container: container name
+ :param obj: object name
+ :param conn_timeout: timeout in seconds for establishing the connection
+ :param response_timeout: timeout in seconds for getting the response
+ :returns: tuple of (content-type, object size, last modified timestamp,
+ etag, metadata dictionary, object body)
+ """
+ path = '/%s/%s/%s' % (account, container, obj)
+ with Timeout(conn_timeout):
+ conn = http_connect(node['ip'], node['port'], node['device'], part,
+ 'GET', path)
+ with Timeout(response_timeout):
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException(
+ 'Object server %s:%s direct GET %s gave status %s' %
+ (node['ip'], node['port'],
+ repr('/%s/%s%s' % (node['device'], part, path)),
+ resp.status),
+ http_host=node['ip'], http_port=node['port'],
+ http_device=node['device'], http_status=resp.status,
+ http_reason=resp.reason)
+ metadata = {}
+ for key, value in resp.getheaders():
+ if key.lower().startswith('x-object-meta-'):
+ metadata[unquote(key[len('x-object-meta-'):])] = unquote(value)
+ return (resp.getheader('content-type'),
+ int(resp.getheader('content-length')),
+ resp.getheader('last-modified'),
+ resp.getheader('etag').strip('"'),
+ metadata,
+ resp.read())
+
+
+def direct_delete_object(node, part, account, container, obj,
+ conn_timeout=5, response_timeout=15, headers={}):
+ """
+ Delete object directly from the object server.
+
+ :param node: node dictionary from the ring
+ :param part: partition the container is on
+ :param account: account name
+ :param container: container name
+ :param obj: object name
+ :param conn_timeout: timeout in seconds for establishing the connection
+ :param response_timeout: timeout in seconds for getting the response
+ :returns: response from server
+ """
+ path = '/%s/%s/%s' % (account, container, obj)
+ headers['X-Timestamp'] = normalize_timestamp(time())
+ with Timeout(conn_timeout):
+ conn = http_connect(node['ip'], node['port'], node['device'], part,
+ 'DELETE', path, headers)
+ with Timeout(response_timeout):
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException(
+ 'Object server %s:%s direct DELETE %s gave status %s' %
+ (node['ip'], node['port'],
+ repr('/%s/%s%s' % (node['device'], part, path)),
+ resp.status),
+ http_host=node['ip'], http_port=node['port'],
+ http_device=node['device'], http_status=resp.status,
+ http_reason=resp.reason)
+ return resp
+
+
+def retry(func, *args, **kwargs):
+ """
+ Helper function to retry a given function a number of times.
+
+ :param func: callable to be called
+ :param retries: number of retries
+ :param error_log: logger for errors
+ :param args: arguments to send to func
+ :param kwargs: keyword arguments to send to func (if retries or
+ error_log are sent, they will be deleted from kwargs
+ before sending on to func)
+ :returns: tuple of (attempts made, result of func)
+ """
+ retries = 5
+ if 'retries' in kwargs:
+ retries = kwargs['retries']
+ del kwargs['retries']
+ error_log = None
+ if 'error_log' in kwargs:
+ error_log = kwargs['error_log']
+ del kwargs['error_log']
+ attempts = 0
+ backoff = 1
+ while attempts <= retries:
+ attempts += 1
+ try:
+ return attempts, func(*args, **kwargs)
+ except (socket.error, HTTPException, Timeout), err:
+ if error_log:
+ error_log(err)
+ if attempts > retries:
+ raise
+ except ClientException, err:
+ if error_log:
+ error_log(err)
+ if attempts > retries or err.http_status < 500 or \
+ err.http_status == 507 or err.http_status > 599:
+ raise
+ sleep(backoff)
+ backoff *= 2
+ # Shouldn't actually get down here, but just in case.
+ if args and 'ip' in args[0]:
+ raise ClientException('Too many retries',
+ http_host=args[0]['ip'], http_port=args[0]['port'],
+ http_device=args[0]['device'])
+ else:
+ raise ClientException('Too many retries')
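+
+
+# Usage sketch combining retry with one of the direct_* helpers above (node,
+# part, account and container are hypothetical values obtained elsewhere):
+# attempts, listing = retry(direct_get_container, node, part, account,
+# container, retries=3)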
diff --git a/swift/common/exceptions.py b/swift/common/exceptions.py
new file mode 100644
index 000000000..bf1b07cae
--- /dev/null
+++ b/swift/common/exceptions.py
@@ -0,0 +1,35 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from eventlet import TimeoutError
+
+
+class MessageTimeout(TimeoutError):
+
+ def __init__(self, seconds=None, msg=None):
+ TimeoutError.__init__(self, seconds=seconds)
+ self.msg = msg
+
+ def __str__(self):
+ return '%s: %s' % (TimeoutError.__str__(self), self.msg)
+
+
+class AuditException(Exception): pass
+class AuthException(Exception): pass
+class ChunkReadTimeout(TimeoutError): pass
+class ChunkWriteTimeout(TimeoutError): pass
+class ConnectionTimeout(TimeoutError): pass
+class DriveNotMounted(Exception): pass
+class LockTimeout(MessageTimeout): pass
diff --git a/swift/common/healthcheck.py b/swift/common/healthcheck.py
new file mode 100644
index 000000000..a483eaffd
--- /dev/null
+++ b/swift/common/healthcheck.py
@@ -0,0 +1,28 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from webob import Response
+
+class HealthCheckController(object):
+ """Basic controller used for monitoring."""
+
+ def __init__(self, *args, **kwargs):
+ pass
+
+ @classmethod
+ def GET(cls, req):
+ return Response(request=req, body="OK", content_type="text/plain")
+
+healthcheck = HealthCheckController.GET
diff --git a/swift/common/memcached.py b/swift/common/memcached.py
new file mode 100644
index 000000000..e232e404f
--- /dev/null
+++ b/swift/common/memcached.py
@@ -0,0 +1,272 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Lucid comes with memcached: v1.4.2. Protocol documentation for that
+version is at:
+
+http://github.com/memcached/memcached/blob/1.4.2/doc/protocol.txt
+"""
+
+import cPickle as pickle
+import logging
+import socket
+import time
+from bisect import bisect
+from hashlib import md5
+
+
+CONN_TIMEOUT = 0.3
+IO_TIMEOUT = 2.0
+PICKLE_FLAG = 1
+NODE_WEIGHT = 50
+PICKLE_PROTOCOL = 2
+TRY_COUNT = 3
+
+# if ERROR_LIMIT_COUNT errors occur in ERROR_LIMIT_TIME seconds, the server
+# will be considered failed for ERROR_LIMIT_DURATION seconds.
+ERROR_LIMIT_COUNT = 10
+ERROR_LIMIT_TIME = 60
+ERROR_LIMIT_DURATION = 300
+
+
+def md5hash(key):
+ return md5(key).hexdigest()
+
+
+class MemcacheRing(object):
+ """
+ Simple, consistent-hashed memcache client.
+ """
+
+ def __init__(self, servers, connect_timeout=CONN_TIMEOUT,
+ io_timeout=IO_TIMEOUT, tries=TRY_COUNT):
+ self._ring = {}
+ self._errors = dict(((serv, []) for serv in servers))
+ self._error_limited = dict(((serv, 0) for serv in servers))
+ for server in sorted(servers):
+ for i in xrange(NODE_WEIGHT):
+ self._ring[md5hash('%s-%s' % (server, i))] = server
+ self._tries = tries if tries <= len(servers) else len(servers)
+ self._sorted = sorted(self._ring.keys())
+ self._client_cache = dict(((server, []) for server in servers))
+ self._connect_timeout = connect_timeout
+ self._io_timeout = io_timeout
+
+ def _exception_occurred(self, server, e, action='talking'):
+ if isinstance(e, socket.timeout):
+ logging.error("Timeout %s to memcached: %s" % (action, server))
+ else:
+ logging.exception("Error %s to memcached: %s" % (action, server))
+ now = time.time()
+        self._errors[server].append(now)
+ if len(self._errors[server]) > ERROR_LIMIT_COUNT:
+ self._errors[server] = [err for err in self._errors[server]
+ if err > now - ERROR_LIMIT_TIME]
+ if len(self._errors[server]) > ERROR_LIMIT_COUNT:
+ self._error_limited[server] = now + ERROR_LIMIT_DURATION
+ logging.error('Error limiting server %s' % server)
+
+ def _get_conns(self, key):
+ """
+ Retrieves a server conn from the pool, or connects a new one.
+ Chooses the server based on a consistent hash of "key".
+ """
+ pos = bisect(self._sorted, key)
+ served = []
+ while len(served) < self._tries:
+ pos = (pos + 1) % len(self._sorted)
+ server = self._ring[self._sorted[pos]]
+ if server in served:
+ continue
+ served.append(server)
+ if self._error_limited[server] > time.time():
+ continue
+ try:
+ fp, sock = self._client_cache[server].pop()
+ yield server, fp, sock
+ except IndexError:
+ try:
+ host, port = server.split(':')
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+ sock.settimeout(self._connect_timeout)
+ sock.connect((host, int(port)))
+ sock.settimeout(self._io_timeout)
+ yield server, sock.makefile(), sock
+ except Exception, e:
+ self._exception_occurred(server, e, 'connecting')
+
+ def _return_conn(self, server, fp, sock):
+ """ Returns a server connection to the pool """
+ self._client_cache[server].append((fp, sock))
+
+ def set(self, key, value, serialize=True, timeout=0):
+ """
+ Set a key/value pair in memcache
+
+ :param key: key
+ :param value: value
+ :param serialize: if True, value is pickled before sending to memcache
+ :param timeout: ttl in memcache
+ """
+ key = md5hash(key)
+ if timeout > 0:
+ timeout += time.time()
+ flags = 0
+ if serialize:
+ value = pickle.dumps(value, PICKLE_PROTOCOL)
+ flags |= PICKLE_FLAG
+ for (server, fp, sock) in self._get_conns(key):
+ try:
+ sock.sendall('set %s %d %d %s noreply\r\n%s\r\n' % \
+ (key, flags, timeout, len(value), value))
+ self._return_conn(server, fp, sock)
+ return
+ except Exception, e:
+ self._exception_occurred(server, e)
+
+ def get(self, key):
+ """
+ Gets the object specified by key. It will also unpickle the object
+ before returning if it is pickled in memcache.
+
+ :param key: key
+ :returns: value of the key in memcache
+ """
+ key = md5hash(key)
+ value = None
+ for (server, fp, sock) in self._get_conns(key):
+ try:
+ sock.sendall('get %s\r\n' % key)
+ line = fp.readline().strip().split()
+ while line[0].upper() != 'END':
+ if line[0].upper() == 'VALUE' and line[1] == key:
+ size = int(line[3])
+ value = fp.read(size)
+ if int(line[2]) & PICKLE_FLAG:
+ value = pickle.loads(value)
+ fp.readline()
+ line = fp.readline().strip().split()
+ self._return_conn(server, fp, sock)
+ return value
+ except Exception, e:
+ self._exception_occurred(server, e)
+
+ def incr(self, key, delta=1, timeout=0):
+ """
+ Increments a key which has a numeric value by delta.
+ If the key can't be found, it's added as delta.
+
+ :param key: key
+ :param delta: amount to add to the value of key (or set as the value
+ if the key is not found)
+ :param timeout: ttl in memcache
+ """
+ key = md5hash(key)
+ for (server, fp, sock) in self._get_conns(key):
+ try:
+ sock.sendall('incr %s %s\r\n' % (key, delta))
+ line = fp.readline().strip().split()
+ if line[0].upper() == 'NOT_FOUND':
+ line[0] = str(delta)
+ sock.sendall('add %s %d %d %s noreply\r\n%s\r\n' % \
+ (key, 0, timeout, len(line[0]), line[0]))
+ ret = int(line[0].strip())
+ self._return_conn(server, fp, sock)
+ return ret
+ except Exception, e:
+ self._exception_occurred(server, e)
+
+ def delete(self, key):
+ """
+ Deletes a key/value pair from memcache.
+
+ :param key: key to be deleted
+ """
+ key = md5hash(key)
+ for (server, fp, sock) in self._get_conns(key):
+ try:
+ sock.sendall('delete %s noreply\r\n' % key)
+ self._return_conn(server, fp, sock)
+ return
+ except Exception, e:
+ self._exception_occurred(server, e)
+
+ def set_multi(self, mapping, server_key, serialize=True, timeout=0):
+ """
+ Sets multiple key/value pairs in memcache.
+
+    :param mapping: dictionary of keys and values to be set in memcache
+    :param server_key: key to use in determining which server in the ring
+                       is used
+ :param serialize: if True, value is pickled before sending to memcache
+ :param timeout: ttl for memcache
+ """
+ server_key = md5hash(server_key)
+ if timeout > 0:
+ timeout += time.time()
+ msg = ''
+ for key, value in mapping.iteritems():
+ key = md5hash(key)
+ flags = 0
+ if serialize:
+ value = pickle.dumps(value, PICKLE_PROTOCOL)
+ flags |= PICKLE_FLAG
+ msg += ('set %s %d %d %s noreply\r\n%s\r\n' %
+ (key, flags, timeout, len(value), value))
+ for (server, fp, sock) in self._get_conns(server_key):
+ try:
+ sock.sendall(msg)
+ self._return_conn(server, fp, sock)
+ return
+ except Exception, e:
+ self._exception_occurred(server, e)
+
+ def get_multi(self, keys, server_key):
+ """
+ Gets multiple values from memcache for the given keys.
+
+ :param keys: keys for values to be retrieved from memcache
+    :param server_key: key to use in determining which server in the ring
+ is used
+ :returns: list of values
+ """
+ server_key = md5hash(server_key)
+ keys = [md5hash(key) for key in keys]
+ for (server, fp, sock) in self._get_conns(server_key):
+ try:
+ sock.sendall('get %s\r\n' % ' '.join(keys))
+ line = fp.readline().strip().split()
+ responses = {}
+ while line[0].upper() != 'END':
+ if line[0].upper() == 'VALUE':
+ size = int(line[3])
+ value = fp.read(size)
+ if int(line[2]) & PICKLE_FLAG:
+ value = pickle.loads(value)
+ responses[line[1]] = value
+ fp.readline()
+ line = fp.readline().strip().split()
+ values = []
+ for key in keys:
+ if key in responses:
+ values.append(responses[key])
+ else:
+ values.append(None)
+ self._return_conn(server, fp, sock)
+ return values
+ except Exception, e:
+ self._exception_occurred(server, e)
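+
+
+# Example usage (a minimal sketch): the server address and keys below are
+# made up, and a memcached instance must actually be listening on the given
+# address -- otherwise the client just logs connection errors and the calls
+# return None.
+if __name__ == '__main__':
+    cache = MemcacheRing(['127.0.0.1:11211'])
+    cache.set('example-key', {'some': 'value'}, timeout=300)
+    print cache.get('example-key')
+    print cache.incr('example-counter', delta=5, timeout=300)
+    cache.set_multi({'k1': 'v1', 'k2': 'v2'}, 'example-server-key')
+    print cache.get_multi(['k1', 'k2'], 'example-server-key')
+    cache.delete('example-key')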
diff --git a/swift/common/utils.py b/swift/common/utils.py
new file mode 100644
index 000000000..c2c091ce7
--- /dev/null
+++ b/swift/common/utils.py
@@ -0,0 +1,490 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Miscellaneous utility functions for use with Swift."""
+
+import errno
+import fcntl
+import os
+import pwd
+import signal
+import sys
+import time
+import mimetools
+from hashlib import md5
+from random import shuffle
+from urllib import quote
+from contextlib import contextmanager
+import ctypes
+import ctypes.util
+import struct
+
+import eventlet
+from eventlet import greenio, GreenPool, sleep, Timeout, listen
+from eventlet.green import socket, subprocess, ssl, thread, threading
+
+from swift.common.exceptions import LockTimeout, MessageTimeout
+
+# the logging module doesn't get monkey patched as cleanly as one would like,
+# so point its thread/threading references at eventlet's green versions by hand
+from logging.handlers import SysLogHandler
+import logging
+logging.thread = eventlet.green.thread
+logging.threading = eventlet.green.threading
+logging._lock = logging.threading.RLock()
+
+
+libc = ctypes.CDLL(ctypes.util.find_library('c'))
+sys_fallocate = libc.fallocate
+posix_fadvise = libc.posix_fadvise
+
+
+# Used by hash_path to offer a bit more security when generating hashes for
+# paths. It simply appends this value to all paths; guessing the hash a path
+# will end up with would also require knowing this suffix.
+HASH_PATH_SUFFIX = os.environ.get('SWIFT_HASH_PATH_SUFFIX', 'endcap')
+
+
+def get_param(req, name, default=None):
+ """
+    Get a parameter from an HTTP request, ensuring proper handling of UTF-8
+    encoding.
+
+ :param req: Webob request object
+ :param name: parameter name
+ :param default: result to return if the parameter is not found
+ :returns: HTTP request parameter value
+ """
+ value = req.str_params.get(name, default)
+ if value:
+ value.decode('utf8') # Ensure UTF8ness
+ return value
+
+
+def fallocate(fd, size):
+ """
+    Pre-allocate disk space for a file.
+
+ :param fd: file descriptor
+ :param size: size to allocate (in bytes)
+ """
+ if size > 0:
+ # 1 means "FALLOC_FL_KEEP_SIZE", which means it pre-allocates invisibly
+ ret = sys_fallocate(fd, 1, 0, ctypes.c_uint64(size))
+ # XXX: in (not very thorough) testing, errno always seems to be 0?
+ err = ctypes.get_errno()
+ if ret and err not in (0, errno.ENOSYS):
+ raise OSError(err, 'Unable to fallocate(%s)' % size)
+
+
+def drop_buffer_cache(fd, offset, length):
+ """
+ Drop 'buffer' cache for the given range of the given file.
+
+ :param fd: file descriptor
+ :param offset: start offset
+ :param length: length
+ """
+ # 4 means "POSIX_FADV_DONTNEED"
+ ret = posix_fadvise(fd, ctypes.c_uint64(offset), ctypes.c_uint64(length), 4)
+ if ret != 0:
+ print "posix_fadvise(%s, %s, %s, 4) -> %s" % (fd, offset, length, ret)
+
+
+def normalize_timestamp(timestamp):
+ """
+ Format a timestamp (string or numeric) into a standardized
+ xxxxxxxxxx.xxxxx format.
+
+ :param timestamp: unix timestamp
+ :returns: normalized timestamp as a string
+ """
+ return "%016.05f" % (float(timestamp))
+
+
+def mkdirs(path):
+ """
+ Ensures the path is a directory or makes it if not. Errors if the path
+ exists but is a file or on permissions failure.
+
+ :param path: path to create
+ """
+ if not os.path.isdir(path):
+ try:
+ os.makedirs(path)
+ except OSError, err:
+ if err.errno != errno.EEXIST or not os.path.isdir(path):
+ raise
+
+
+def renamer(old, new): # pragma: no cover
+ """
+ Attempt to fix^H^H^Hhide race conditions like empty object directories
+ being removed by backend processes during uploads, by retrying.
+
+ :param old: old path to be renamed
+ :param new: new path to be renamed to
+ """
+ try:
+ mkdirs(os.path.dirname(new))
+ os.rename(old, new)
+ except OSError:
+ mkdirs(os.path.dirname(new))
+ os.rename(old, new)
+
+
+def split_path(path, minsegs=1, maxsegs=None, rest_with_last=False):
+ """
+ Validate and split the given HTTP request path.
+
+ **Examples**::
+
+ ['a'] = split_path('/a')
+ ['a', None] = split_path('/a', 1, 2)
+ ['a', 'c'] = split_path('/a/c', 1, 2)
+ ['a', 'c', 'o/r'] = split_path('/a/c/o/r', 1, 3, True)
+
+ :param path: HTTP Request path to be split
+ :param minsegs: Minimum number of segments to be extracted
+ :param maxsegs: Maximum number of segments to be extracted
+    :param rest_with_last: If True, trailing data will be returned as part
+                           of the last segment.  If False and there is
+                           trailing data, raises ValueError.
+    :returns: list of segments with a length of maxsegs (non-existent
+              segments will return as None)
+ """
+ if not maxsegs:
+ maxsegs = minsegs
+ if minsegs > maxsegs:
+ raise ValueError('minsegs > maxsegs: %d > %d' % (minsegs, maxsegs))
+ if rest_with_last:
+ segs = path.split('/', maxsegs)
+ minsegs += 1
+ maxsegs += 1
+ count = len(segs)
+ if segs[0] or count < minsegs or count > maxsegs or \
+ '' in segs[1:minsegs]:
+ raise ValueError('Invalid path: %s' % quote(path))
+ else:
+ minsegs += 1
+ maxsegs += 1
+ segs = path.split('/', maxsegs)
+ count = len(segs)
+ if segs[0] or count < minsegs or count > maxsegs + 1 or \
+ '' in segs[1:minsegs] or (count == maxsegs + 1 and segs[maxsegs]):
+ raise ValueError('Invalid path: %s' % quote(path))
+ segs = segs[1:maxsegs]
+ segs.extend([None] * (maxsegs - 1 - len(segs)))
+ return segs
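+
+# A couple of quick sketches of the failure modes, to go with the success
+# cases above:
+#
+#   split_path('/a/c/o/r', 1, 3)   -> raises ValueError (trailing data with
+#                                     rest_with_last left False)
+#   split_path('//c', 2)           -> raises ValueError (empty segment)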
+
+
+class NullLogger():
+ """A no-op logger for eventlet wsgi."""
+
+ def write(self, *args):
+ #"Logs" the args to nowhere
+ pass
+
+
+class LoggerFileObject(object):
+
+ def __init__(self, logger):
+ self.logger = logger
+
+ def write(self, value):
+ value = value.strip()
+ if value:
+ if 'Connection reset by peer' in value:
+ self.logger.error('STDOUT: Connection reset by peer')
+ else:
+ self.logger.error('STDOUT: %s' % value)
+
+ def writelines(self, values):
+ self.logger.error('STDOUT: %s' % '#012'.join(values))
+
+ def close(self):
+ pass
+
+ def flush(self):
+ pass
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ raise IOError(errno.EBADF, 'Bad file descriptor')
+
+ def read(self, size=-1):
+ raise IOError(errno.EBADF, 'Bad file descriptor')
+
+ def readline(self, size=-1):
+ raise IOError(errno.EBADF, 'Bad file descriptor')
+
+ def tell(self):
+ return 0
+
+ def xreadlines(self):
+ return self
+
+
+def drop_privileges(user):
+ """
+    Sets the userid and groupid of the current process
+
+    :param user: User name to change privileges to
+ """
+ user = pwd.getpwnam(user)
+ os.setgid(user[3])
+ os.setuid(user[2])
+
+
+class NamedLogger(object):
+ """Cheesy version of the LoggerAdapter available in Python 3"""
+
+ def __init__(self, logger, server):
+ self.logger = logger
+ self.server = server
+ for proxied_method in ('debug', 'info', 'log', 'warn', 'warning',
+ 'error', 'critical'):
+ setattr(self, proxied_method,
+ self._proxy(getattr(logger, proxied_method)))
+
+ def _proxy(self, logger_meth):
+ def _inner_proxy(msg, *args, **kwargs):
+ msg = '%s %s' % (self.server, msg)
+ logger_meth(msg, *args, **kwargs)
+ return _inner_proxy
+
+ def getEffectiveLevel(self):
+ return self.logger.getEffectiveLevel()
+
+ def exception(self, msg, *args):
+ _, exc, _ = sys.exc_info()
+ call = self.logger.error
+ emsg = ''
+ if isinstance(exc, OSError):
+ if exc.errno in (errno.EIO, errno.ENOSPC):
+ emsg = str(exc)
+ else:
+ call = self.logger.exception
+ elif isinstance(exc, socket.error):
+ if exc.errno == errno.ECONNREFUSED:
+ emsg = 'Connection refused'
+ elif exc.errno == errno.EHOSTUNREACH:
+ emsg = 'Host unreachable'
+ else:
+ call = self.logger.exception
+ elif isinstance(exc, eventlet.Timeout):
+ emsg = exc.__class__.__name__
+ if hasattr(exc, 'seconds'):
+ emsg += ' (%ss)' % exc.seconds
+ if isinstance(exc, MessageTimeout):
+ if exc.msg:
+ emsg += ' %s' % exc.msg
+ else:
+ call = self.logger.exception
+ call('%s %s: %s' % (self.server, msg, emsg), *args)
+
+
+def get_logger(conf, name):
+ """
+ Get the current system logger using config settings.
+
+ **Log config and defaults**::
+
+ log_facility = LOG_LOCAL0
+ log_level = INFO
+
+ :param conf: Configuration dict to read settings from
+ :param name: Name of the logger
+ """
+ root_logger = logging.getLogger()
+ if hasattr(get_logger, 'handler') and get_logger.handler:
+ root_logger.removeHandler(get_logger.handler)
+ get_logger.handler = None
+ if conf is None:
+ root_logger.setLevel(logging.INFO)
+ return NamedLogger(root_logger, name)
+ get_logger.handler = SysLogHandler(address='/dev/log',
+ facility=getattr(SysLogHandler, conf.get('log_facility', 'LOG_LOCAL0'),
+ SysLogHandler.LOG_LOCAL0))
+ root_logger.addHandler(get_logger.handler)
+ root_logger.setLevel(
+ getattr(logging, conf.get('log_level', 'INFO').upper(), logging.INFO))
+ return NamedLogger(root_logger, name)
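+
+# Example usage (a minimal sketch; the conf values are made up, and syslog
+# must be listening on /dev/log for the messages to land anywhere):
+#
+#   logger = get_logger({'log_facility': 'LOG_LOCAL1',
+#                        'log_level': 'DEBUG'}, 'proxy-server')
+#   logger.info('something happened')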
+
+
+def whataremyips():
+ """
+    Get the machine's IP addresses by parsing ifconfig output
+
+    :returns: list of IPv4 address strings
+ """
+ proc = subprocess.Popen(['/sbin/ifconfig'], stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ ret_val = proc.wait()
+ results = proc.stdout.read().split('\n')
+ return [x.split(':')[1].split()[0] for x in results if 'inet addr' in x]
+
+
+def storage_directory(datadir, partition, hash):
+ """
+ Get the storage directory
+
+ :param datadir: Base data directory
+ :param partition: Partition
+ :param hash: Account, container or object hash
+ :returns: Storage directory
+ """
+ return os.path.join(datadir, partition, hash[-3:], hash)
+
+
+def hash_path(account, container=None, object=None, raw_digest=False):
+ """
+    Get the canonical hash for an account/container/object
+
+ :param account: Account
+ :param container: Container
+ :param object: Object
+ :param raw_digest: If True, return the raw version rather than a hex digest
+ :returns: hash string
+ """
+ if object and not container:
+ raise ValueError('container is required if object is provided')
+ paths = [account]
+ if container:
+ paths.append(container)
+ if object:
+ paths.append(object)
+ if raw_digest:
+ return md5('/' + '/'.join(paths) + HASH_PATH_SUFFIX).digest()
+ else:
+ return md5('/' + '/'.join(paths) + HASH_PATH_SUFFIX).hexdigest()
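+
+# A quick sketch of how this combines with storage_directory() to map a
+# name onto its on-disk location (the account/container/object names and
+# the partition below are made up):
+#
+#   name_hash = hash_path('AUTH_test', 'photos', 'kitten.jpg')
+#   storage_directory('objects', '1023', name_hash)
+#   -> 'objects/1023/<last three hex chars of name_hash>/<name_hash>'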
+
+
+@contextmanager
+def lock_path(directory, timeout=10):
+ """
+ Context manager that acquires a lock on a directory. This will block until
+ the lock can be acquired, or the timeout time has expired (whichever occurs
+ first).
+
+ :param directory: directory to be locked
+ :param timeout: timeout (in seconds)
+ """
+ mkdirs(directory)
+ fd = os.open(directory, os.O_RDONLY)
+ try:
+ with LockTimeout(timeout, directory):
+ while True:
+ try:
+ fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ break
+ except IOError, err:
+ if err.errno != errno.EAGAIN:
+ raise
+ sleep(0.01)
+ yield True
+ finally:
+ os.close(fd)
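+
+# Example usage (a minimal sketch; the directory is made up). The flock is
+# held only for the duration of the with-block, and a LockTimeout is raised
+# if the lock cannot be acquired within the timeout:
+#
+#   with lock_path('/srv/node/sda/tmp', timeout=5):
+#       pass  # work that must not race with other holders of the lock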
+
+
+def lock_parent_directory(filename, timeout=10):
+ """
+ Context manager that acquires a lock on the parent directory of the given
+ file path. This will block until the lock can be acquired, or the timeout
+ time has expired (whichever occurs first).
+
+    :param filename: file path whose parent directory will be locked
+    :param timeout: timeout (in seconds)
+    """
+    return lock_path(os.path.dirname(filename), timeout=timeout)
+
+
+def get_time_units(time_amount):
+ """
+    Get a normalized length of time in the largest unit of time (hours,
+    minutes, or seconds).
+
+    :param time_amount: length of time in seconds
+    :returns: A tuple of (length of time, unit of time) where unit of time is
+              one of ('h', 'm', 's')
+ """
+ time_unit = 's'
+ if time_amount > 60:
+ time_amount /= 60
+ time_unit = 'm'
+ if time_amount > 60:
+ time_amount /= 60
+ time_unit = 'h'
+ return time_amount, time_unit
+
+
+def compute_eta(start_time, current_value, final_value):
+ """
+    Compute an ETA. Now if only we could also have a progress bar...
+
+ :param start_time: Unix timestamp when the operation began
+ :param current_value: Current value
+ :param final_value: Final value
+ :returns: ETA as a tuple of (length of time, unit of time) where unit of
+ time is one of ('h', 'm', 's')
+ """
+ elapsed = time.time() - start_time
+ completion = (float(current_value) / final_value) or 0.00001
+ return get_time_units(1.0 / completion * elapsed - elapsed)
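+
+# A quick sketch: if 1,000 of 10,000 items were handled in the first sixty
+# seconds, the remaining work is estimated at roughly nine minutes:
+#
+#   compute_eta(time.time() - 60, 1000, 10000)  -> (approximately) (9.0, 'm')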
+
+
+def iter_devices_partitions(devices_dir, item_type):
+ """
+    Iterate over partitions across all devices.
+
+ :param devices_dir: Path to devices
+ :param item_type: One of 'accounts', 'containers', or 'objects'
+ :returns: Each iteration returns a tuple of (device, partition)
+ """
+ devices = os.listdir(devices_dir)
+ shuffle(devices)
+ devices_partitions = []
+ for device in devices:
+ partitions = os.listdir(os.path.join(devices_dir, device, item_type))
+ shuffle(partitions)
+ devices_partitions.append((device, iter(partitions)))
+ yielded = True
+ while yielded:
+ yielded = False
+ for device, partitions in devices_partitions:
+ try:
+ yield device, partitions.next()
+ yielded = True
+ except StopIteration:
+ pass
+
+
+def unlink_older_than(path, mtime):
+ """
+    Remove any file in a given path that was last modified before mtime.
+
+    :param path: Path to remove files from
+    :param mtime: Timestamp of oldest file to keep
+ """
+ if os.path.exists(path):
+ for fname in os.listdir(path):
+ fpath = os.path.join(path, fname)
+ try:
+ if os.path.getmtime(fpath) < mtime:
+ os.unlink(fpath)
+ except OSError:
+ pass
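+
+
+# Example usage (a minimal sketch exercising a few of the helpers above;
+# the request path and the one-hour cutoff are made up):
+if __name__ == '__main__':
+    import tempfile
+    print normalize_timestamp(time.time())
+    print split_path('/v1/AUTH_test/photos/kitten.jpg', 1, 4, True)
+    print get_time_units(7265)
+    tmpdir = tempfile.mkdtemp()
+    mkdirs(os.path.join(tmpdir, 'sub'))
+    # removes anything in tmpdir not modified in the last hour (nothing yet)
+    unlink_older_than(tmpdir, time.time() - 3600)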
diff --git a/swift/common/wsgi.py b/swift/common/wsgi.py
new file mode 100644
index 000000000..d025896d3
--- /dev/null
+++ b/swift/common/wsgi.py
@@ -0,0 +1,164 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""WSGI tools for use with swift."""
+
+import errno
+import os
+import signal
+import sys
+import time
+import mimetools
+
+import eventlet
+from eventlet import greenio, GreenPool, sleep, wsgi, listen
+
+# Hook to ensure connection resets don't blow up our servers.
+# Remove once a released Eventlet includes ECONNRESET in this set by default.
+from errno import ECONNRESET
+wsgi.ACCEPT_ERRNO.add(ECONNRESET)
+
+from eventlet.green import socket, ssl
+
+from swift.common.utils import get_logger, drop_privileges, \
+ LoggerFileObject, NullLogger
+
+def monkey_patch_mimetools():
+ """
+ mimetools.Message defaults content-type to "text/plain"
+ This changes it to default to None, so we can detect missing headers.
+ """
+
+ orig_parsetype = mimetools.Message.parsetype
+
+ def parsetype(self):
+ if not self.typeheader:
+ self.type = None
+ self.maintype = None
+ self.subtype = None
+ self.plisttext = ''
+ else:
+ orig_parsetype(self)
+
+ mimetools.Message.parsetype = parsetype
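+
+# A quick sketch of the effect on a message with no Content-Type header:
+# before patching, mimetools defaults the type to 'text/plain'; afterwards
+# it is None, so a missing header can be detected. (The header below is
+# made up.)
+#
+#   from StringIO import StringIO
+#   monkey_patch_mimetools()
+#   msg = mimetools.Message(StringIO('X-Example: 1\r\n\r\n'))
+#   msg.type  -> None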
+
+# We might be able to pull pieces of this out to test, but right now it seems
+# like more work than it's worth.
+def run_wsgi(app, conf, *args, **kwargs): # pragma: no cover
+ """
+ Loads common settings from conf, then instantiates app and runs
+ the server using the specified number of workers.
+
+ :param app: WSGI callable
+ :param conf: Configuration dictionary
+ """
+ if 'logger' in kwargs:
+ logger = kwargs['logger']
+ else:
+ logger = get_logger(conf, app.log_name)
+ # log uncaught exceptions
+ sys.excepthook = lambda *exc_info: \
+ logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
+ sys.stdout = sys.stderr = LoggerFileObject(logger)
+
+ try:
+ os.setsid()
+ except OSError:
+ no_cover = True # pass
+ bind_addr = (conf.get('bind_ip', '0.0.0.0'),
+ int(conf.get('bind_port', kwargs.get('default_port', 8080))))
+ sock = None
+ retry_until = time.time() + 30
+ while not sock and time.time() < retry_until:
+ try:
+ sock = listen(bind_addr)
+ if 'cert_file' in conf:
+ sock = ssl.wrap_socket(sock, certfile=conf['cert_file'],
+ keyfile=conf['key_file'])
+ except socket.error, err:
+ if err.args[0] != errno.EADDRINUSE:
+ raise
+ sleep(0.1)
+ if not sock:
+ raise Exception('Could not bind to %s:%s after trying for 30 seconds' %
+ bind_addr)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ # in my experience, sockets can hang around forever without keepalive
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 600)
+ worker_count = int(conf.get('workers', '1'))
+ drop_privileges(conf.get('user', 'swift'))
+ if isinstance(app, type):
+ # Instantiate app if it hasn't been already
+ app = app(conf, *args)
+
+ def run_server():
+ wsgi.HttpProtocol.default_request_version = "HTTP/1.0"
+ eventlet.hubs.use_hub('poll')
+ eventlet.patcher.monkey_patch(all=False, socket=True)
+ monkey_patch_mimetools()
+ pool = GreenPool(size=1024)
+ try:
+ wsgi.server(sock, app, NullLogger(), custom_pool=pool)
+ except socket.error, err:
+ if err[0] != errno.EINVAL:
+ raise
+ pool.waitall()
+
+ # Useful for profiling [no forks].
+ if worker_count == 0:
+ run_server()
+ return
+
+ def kill_children(*args):
+ """Kills the entire process group."""
+ logger.error('SIGTERM received')
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
+ running[0] = False
+ os.killpg(0, signal.SIGTERM)
+
+ def hup(*args):
+ """Shuts down the server, but allows running requests to complete"""
+ logger.error('SIGHUP received')
+ signal.signal(signal.SIGHUP, signal.SIG_IGN)
+ running[0] = False
+
+ running = [True]
+ signal.signal(signal.SIGTERM, kill_children)
+ signal.signal(signal.SIGHUP, hup)
+ children = []
+ while running[0]:
+ while len(children) < worker_count:
+ pid = os.fork()
+ if pid == 0:
+ signal.signal(signal.SIGHUP, signal.SIG_DFL)
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ run_server()
+ logger.info('Child %d exiting normally' % os.getpid())
+ return
+ else:
+ logger.info('Started child %s' % pid)
+ children.append(pid)
+ try:
+ pid, status = os.wait()
+ if os.WIFEXITED(status) or os.WIFSIGNALED(status):
+ logger.error('Removing dead child %s' % pid)
+ children.remove(pid)
+ except OSError, err:
+ if err.errno not in (errno.EINTR, errno.ECHILD):
+ raise
+ greenio.shutdown_safe(sock)
+ sock.close()
+ logger.info('Exited')
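+
+
+# Example usage (a minimal sketch; HelloApp and the conf values are made up,
+# and calling run_wsgi() really does bind the port, drop privileges to the
+# configured user and fork the requested number of workers):
+#
+#   class HelloApp(object):
+#       log_name = 'hello-server'
+#
+#       def __init__(self, conf):
+#           self.conf = conf
+#
+#       def __call__(self, env, start_response):
+#           start_response('200 OK', [('Content-Type', 'text/plain')])
+#           return ['hello\n']
+#
+#   if __name__ == '__main__':
+#       run_wsgi(HelloApp, {'bind_port': '8080', 'workers': '2',
+#                           'user': 'swift'})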