diff options
authorMichael Barton <>2010-07-08 01:37:44 +0000
committerMonty Taylor <>2010-07-08 01:37:44 +0000
commitfe067413b1fb61cdfba97d543189c5a01230b45c (patch)
Import upstream version 1.0.0upstream-1.0.0
41 files changed, 9069 insertions, 0 deletions
diff --git a/PKG-INFO b/PKG-INFO
new file mode 100644
index 000000000..40150b514
--- /dev/null
+++ b/PKG-INFO
@@ -0,0 +1,15 @@
+Metadata-Version: 1.0
+Name: swift
+Version: 1.0.0-1
+Summary: Swift
+Author: OpenStack, LLC.
+Author-email: UNKNOWN
+License: Apache License (2.0)
+Description: UNKNOWN
+Platform: UNKNOWN
+Classifier: Development Status :: 4 - Beta
+Classifier: License :: OSI Approved :: Apache Software License
+Classifier: Operating System :: POSIX :: Linux
+Classifier: Programming Language :: Python :: 2.6
+Classifier: Environment :: No Input/Output (Daemon)
diff --git a/README b/README
new file mode 100644
index 000000000..ac8681ae0
--- /dev/null
+++ b/README
@@ -0,0 +1,17 @@
+A distributed object store that was originally developed as the basis for
+Rackspace's Cloud Files.
+To build documentation run `make html` in the /doc folder, and then browse to
+The best place to get started is the "SAIO - Swift All In One", which will walk
+you through setting up a development cluster of Swift in a VM.
+For more information, vist us at, or come hang out
+on our IRC channel, #openstack on freenode.
+Swift Development Team
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..ede5c985d
--- /dev/null
+++ b/bin/
@@ -0,0 +1,1276 @@
+#!/usr/bin/python -u
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ # Try to use installed swift.common.client...
+ from swift.common.client import get_auth, ClientException, Connection
+ # But if not installed, use an included copy.
+ # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
+ # Inclusion of swift.common.client
+ """
+ Cloud Files client library used internally
+ """
+ import socket
+ from cStringIO import StringIO
+ from httplib import HTTPConnection, HTTPException, HTTPSConnection
+ from re import compile, DOTALL
+ from tokenize import generate_tokens, STRING, NAME, OP
+ from urllib import quote as _quote, unquote
+ from urlparse import urlparse, urlunparse
+ try:
+ from eventlet import sleep
+ except:
+ from time import sleep
+ def quote(value, safe='/'):
+ """
+ Patched version of urllib.quote that encodes utf8 strings before quoting
+ """
+ if isinstance(value, unicode):
+ value = value.encode('utf8')
+ return _quote(value, safe)
+ # look for a real json parser first
+ try:
+ # simplejson is popular and pretty good
+ from simplejson import loads as json_loads
+ except ImportError:
+ try:
+ # 2.6 will have a json module in the stdlib
+ from json import loads as json_loads
+ except ImportError:
+ # fall back on local parser otherwise
+ comments = compile(r'/\*.*\*/|//[^\r\n]*', DOTALL)
+ def json_loads(string):
+ '''
+ Fairly competent json parser exploiting the python tokenizer and
+ eval(). -- From python-cloudfiles
+ _loads(serialized_json) -> object
+ '''
+ try:
+ res = []
+ consts = {'true': True, 'false': False, 'null': None}
+ string = '(' + comments.sub('', string) + ')'
+ for type, val, _, _, _ in \
+ generate_tokens(StringIO(string).readline):
+ if (type == OP and val not in '[]{}:,()-') or \
+ (type == NAME and val not in consts):
+ raise AttributeError()
+ elif type == STRING:
+ res.append('u')
+ res.append(val.replace('\\/', '/'))
+ else:
+ res.append(val)
+ return eval(''.join(res), {}, consts)
+ except:
+ raise AttributeError()
+ class ClientException(Exception):
+ def __init__(self, msg, http_scheme='', http_host='', http_port='',
+ http_path='', http_query='', http_status=0, http_reason='',
+ http_device=''):
+ Exception.__init__(self, msg)
+ self.msg = msg
+ self.http_scheme = http_scheme
+ self.http_host = http_host
+ self.http_port = http_port
+ self.http_path = http_path
+ self.http_query = http_query
+ self.http_status = http_status
+ self.http_reason = http_reason
+ self.http_device = http_device
+ def __str__(self):
+ a = self.msg
+ b = ''
+ if self.http_scheme:
+ b += '%s://' % self.http_scheme
+ if self.http_host:
+ b += self.http_host
+ if self.http_port:
+ b += ':%s' % self.http_port
+ if self.http_path:
+ b += self.http_path
+ if self.http_query:
+ b += '?%s' % self.http_query
+ if self.http_status:
+ if b:
+ b = '%s %s' % (b, self.http_status)
+ else:
+ b = str(self.http_status)
+ if self.http_reason:
+ if b:
+ b = '%s %s' % (b, self.http_reason)
+ else:
+ b = '- %s' % self.http_reason
+ if self.http_device:
+ if b:
+ b = '%s: device %s' % (b, self.http_device)
+ else:
+ b = 'device %s' % self.http_device
+ return b and '%s: %s' % (a, b) or a
+ def http_connection(url):
+ """
+ Make an HTTPConnection or HTTPSConnection
+ :param url: url to connect to
+ :returns: tuple of (parsed url, connection object)
+ :raises ClientException: Unable to handle protocol scheme
+ """
+ parsed = urlparse(url)
+ if parsed.scheme == 'http':
+ conn = HTTPConnection(parsed.netloc)
+ elif parsed.scheme == 'https':
+ conn = HTTPSConnection(parsed.netloc)
+ else:
+ raise ClientException('Cannot handle protocol scheme %s for url %s' %
+ (parsed.scheme, repr(url)))
+ return parsed, conn
+ def get_auth(url, user, key, snet=False):
+ """
+ Get authentication credentials
+ :param url: authentication URL
+ :param user: user to auth as
+ :param key: key or passowrd for auth
+ :param snet: use SERVICENET internal network default is False
+ :returns: tuple of (storage URL, storage token, auth token)
+ :raises ClientException: HTTP GET request to auth URL failed
+ """
+ parsed, conn = http_connection(url)
+ conn.request('GET', parsed.path, '',
+ {'X-Auth-User': user, 'X-Auth-Key': key})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Auth GET failed', http_scheme=parsed.scheme,
+, http_port=conn.port,
+ http_path=parsed.path, http_status=resp.status,
+ http_reason=resp.reason)
+ url = resp.getheader('x-storage-url')
+ if snet:
+ parsed = list(urlparse(url))
+ # Second item in the list is the netloc
+ parsed[1] = 'snet-' + parsed[1]
+ url = urlunparse(parsed)
+ return url, resp.getheader('x-storage-token',
+ resp.getheader('x-auth-token'))
+ def get_account(url, token, marker=None, limit=None, prefix=None,
+ http_conn=None, full_listing=False):
+ """
+ Get a listing of containers for the account.
+ :param url: storage URL
+ :param token: auth token
+ :param marker: marker query
+ :param limit: limit query
+ :param prefix: prefix query
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :param full_listing: if True, return a full listing, else returns a max
+ of 10000 listings
+ :returns: a list of accounts
+ :raises ClientException: HTTP GET request failed
+ """
+ if not http_conn:
+ http_conn = http_connection(url)
+ if full_listing:
+ rv = []
+ listing = get_account(url, token, marker, limit, prefix, http_conn)
+ while listing:
+ rv.extend(listing)
+ marker = listing[-1]['name']
+ listing = get_account(url, token, marker, limit, prefix, http_conn)
+ return rv
+ parsed, conn = http_conn
+ qs = 'format=json'
+ if marker:
+ qs += '&marker=%s' % quote(marker)
+ if limit:
+ qs += '&limit=%d' % limit
+ if prefix:
+ qs += '&prefix=%s' % quote(prefix)
+ conn.request('GET', '%s?%s' % (parsed.path, qs), '',
+ {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Account GET failed', http_scheme=parsed.scheme,
+, http_port=conn.port,
+ http_path=parsed.path, http_query=qs, http_status=resp.status,
+ http_reason=resp.reason)
+ if resp.status == 204:
+ return []
+ return json_loads(
+ def head_account(url, token, http_conn=None):
+ """
+ Get account stats.
+ :param url: storage URL
+ :param token: auth token
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :returns: a tuple of (container count, object count, bytes used)
+ :raises ClientException: HTTP HEAD request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Account HEAD failed', http_scheme=parsed.scheme,
+, http_port=conn.port,
+ http_path=parsed.path, http_status=resp.status,
+ http_reason=resp.reason)
+ return int(resp.getheader('x-account-container-count', 0)), \
+ int(resp.getheader('x-account-object-count', 0)), \
+ int(resp.getheader('x-account-bytes-used', 0))
+ def get_container(url, token, container, marker=None, limit=None,
+ prefix=None, delimiter=None, http_conn=None,
+ full_listing=False):
+ """
+ Get a listing of objects for the container.
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to get a listing for
+ :param marker: marker query
+ :param limit: limit query
+ :param prefix: prefix query
+ :param delimeter: string to delimit the queries on
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :param full_listing: if True, return a full listing, else returns a max
+ of 10000 listings
+ :returns: a list of objects
+ :raises ClientException: HTTP GET request failed
+ """
+ if not http_conn:
+ http_conn = http_connection(url)
+ if full_listing:
+ rv = []
+ listing = get_container(url, token, container, marker, limit, prefix,
+ delimiter, http_conn)
+ while listing:
+ rv.extend(listing)
+ if not delimiter:
+ marker = listing[-1]['name']
+ else:
+ marker = listing[-1].get('name', listing[-1].get('subdir'))
+ listing = get_container(url, token, container, marker, limit,
+ prefix, delimiter, http_conn)
+ return rv
+ parsed, conn = http_conn
+ path = '%s/%s' % (parsed.path, quote(container))
+ qs = 'format=json'
+ if marker:
+ qs += '&marker=%s' % quote(marker)
+ if limit:
+ qs += '&limit=%d' % limit
+ if prefix:
+ qs += '&prefix=%s' % quote(prefix)
+ if delimiter:
+ qs += '&delimiter=%s' % quote(delimiter)
+ conn.request('GET', '%s?%s' % (path, qs), '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Container GET failed',
+ http_scheme=parsed.scheme,,
+ http_port=conn.port, http_path=path, http_query=qs,
+ http_status=resp.status, http_reason=resp.reason)
+ if resp.status == 204:
+ return []
+ return json_loads(
+ def head_container(url, token, container, http_conn=None):
+ """
+ Get container stats.
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to get stats for
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :returns: a tuple of (object count, bytes used)
+ :raises ClientException: HTTP HEAD request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s' % (parsed.path, quote(container))
+ conn.request('HEAD', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Container HEAD failed',
+ http_scheme=parsed.scheme,,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason)
+ return int(resp.getheader('x-container-object-count', 0)), \
+ int(resp.getheader('x-container-bytes-used', 0))
+ def put_container(url, token, container, http_conn=None):
+ """
+ Create a container
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to create
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :raises ClientException: HTTP PUT request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s' % (parsed.path, quote(container))
+ conn.request('PUT', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Container PUT failed',
+ http_scheme=parsed.scheme,,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason)
+ def delete_container(url, token, container, http_conn=None):
+ """
+ Delete a container
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to delete
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :raises ClientException: HTTP DELETE request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s' % (parsed.path, quote(container))
+ conn.request('DELETE', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Container DELETE failed',
+ http_scheme=parsed.scheme,,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason)
+ def get_object(url, token, container, name, http_conn=None,
+ resp_chunk_size=None):
+ """
+ Get an object
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name that the object is in
+ :param name: object name to get
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :param resp_chunk_size: if defined, chunk size of data to read
+ :returns: a list of objects
+ :raises ClientException: HTTP GET request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ conn.request('GET', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object GET failed', http_scheme=parsed.scheme,
+, http_port=conn.port, http_path=path,
+ http_status=resp.status, http_reason=resp.reason)
+ metadata = {}
+ for key, value in resp.getheaders():
+ if key.lower().startswith('x-object-meta-'):
+ metadata[unquote(key[len('x-object-meta-'):])] = unquote(value)
+ if resp_chunk_size:
+ def _object_body():
+ buf =
+ while buf:
+ yield buf
+ buf =
+ object_body = _object_body()
+ else:
+ object_body =
+ return resp.getheader('content-type'), \
+ int(resp.getheader('content-length', 0)), \
+ resp.getheader('last-modified'), \
+ resp.getheader('etag').strip('"'), \
+ metadata, \
+ object_body
+ def head_object(url, token, container, name, http_conn=None):
+ """
+ Get object info
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name that the object is in
+ :param name: object name to get info for
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :returns: a tuple of (content type, content length, last modfied, etag,
+ dictionary of metadata)
+ :raises ClientException: HTTP HEAD request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ conn.request('HEAD', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object HEAD failed', http_scheme=parsed.scheme,
+, http_port=conn.port, http_path=path,
+ http_status=resp.status, http_reason=resp.reason)
+ metadata = {}
+ for key, value in resp.getheaders():
+ if key.lower().startswith('x-object-meta-'):
+ metadata[unquote(key[len('x-object-meta-'):])] = unquote(value)
+ return resp.getheader('content-type'), \
+ int(resp.getheader('content-length', 0)), \
+ resp.getheader('last-modified'), \
+ resp.getheader('etag').strip('"'), \
+ metadata
+ def put_object(url, token, container, name, contents, metadata={},
+ content_length=None, etag=None, chunk_size=65536,
+ content_type=None, http_conn=None):
+ """
+ Put an object
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name that the object is in
+ :param name: object name to put
+ :param contents: file like object to read object data from
+ :param metadata: dictionary of object metadata
+ :param content_length: value to send as content-length header
+ :param etag: etag of contents
+ :param chunk_size: chunk size of data to write
+ :param content_type: value to send as content-type header
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :returns: etag from server response
+ :raises ClientException: HTTP PUT request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ headers = {'X-Auth-Token': token}
+ for key, value in metadata.iteritems():
+ headers['X-Object-Meta-%s' % quote(key)] = quote(value)
+ if etag:
+ headers['ETag'] = etag.strip('"')
+ if content_length is not None:
+ headers['Content-Length'] = str(content_length)
+ if content_type is not None:
+ headers['Content-Type'] = content_type
+ if not contents:
+ headers['Content-Length'] = '0'
+ if hasattr(contents, 'read'):
+ conn.putrequest('PUT', path)
+ for header, value in headers.iteritems():
+ conn.putheader(header, value)
+ if not content_length:
+ conn.putheader('Transfer-Encoding', 'chunked')
+ conn.endheaders()
+ chunk =
+ while chunk:
+ if not content_length:
+ conn.send('%x\r\n%s\r\n' % (len(chunk), chunk))
+ else:
+ conn.send(chunk)
+ chunk =
+ if not content_length:
+ conn.send('0\r\n\r\n')
+ else:
+ conn.request('PUT', path, contents, headers)
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object PUT failed', http_scheme=parsed.scheme,
+, http_port=conn.port, http_path=path,
+ http_status=resp.status, http_reason=resp.reason)
+ return resp.getheader('etag').strip('"')
+ def post_object(url, token, container, name, metadata, http_conn=None):
+ """
+ Change object metadata
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name that the object is in
+ :param name: object name to change
+ :param metadata: dictionary of object metadata
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :raises ClientException: HTTP POST request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ headers = {'X-Auth-Token': token}
+ for key, value in metadata.iteritems():
+ headers['X-Object-Meta-%s' % quote(key)] = quote(value)
+ conn.request('POST', path, '', headers)
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object POST failed', http_scheme=parsed.scheme,
+, http_port=conn.port, http_path=path,
+ http_status=resp.status, http_reason=resp.reason)
+ def delete_object(url, token, container, name, http_conn=None):
+ """
+ Delete object
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name that the object is in
+ :param name: object name to delete
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :raises ClientException: HTTP DELETE request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ conn.request('DELETE', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object DELETE failed',
+ http_scheme=parsed.scheme,,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason)
+ class Connection(object):
+ """Convenience class to make requests that will also retry the request"""
+ def __init__(self, authurl, user, key, retries=5, preauthurl=None,
+ preauthtoken=None, snet=False):
+ """
+ :param authurl: authenitcation URL
+ :param user: user name to authenticate as
+ :param key: key/password to authenticate with
+ :param retries: Number of times to retry the request before failing
+ :param preauthurl: storage URL (if you have already authenticated)
+ :param preauthtoken: authentication token (if you have already
+ authenticated)
+ :param snet: use SERVICENET internal network default is False
+ """
+ self.authurl = authurl
+ self.user = user
+ self.key = key
+ self.retries = retries
+ self.http_conn = None
+ self.url = preauthurl
+ self.token = preauthtoken
+ self.attempts = 0
+ self.snet = snet
+ def _retry(self, func, *args, **kwargs):
+ kwargs['http_conn'] = self.http_conn
+ self.attempts = 0
+ backoff = 1
+ while self.attempts <= self.retries:
+ self.attempts += 1
+ try:
+ if not self.url or not self.token:
+ self.url, self.token = \
+ get_auth(self.authurl, self.user, self.key, snet=self.snet)
+ self.http_conn = None
+ if not self.http_conn:
+ self.http_conn = http_connection(self.url)
+ kwargs['http_conn'] = self.http_conn
+ rv = func(self.url, self.token, *args, **kwargs)
+ return rv
+ except (socket.error, HTTPException):
+ if self.attempts > self.retries:
+ raise
+ self.http_conn = None
+ except ClientException, err:
+ if self.attempts > self.retries:
+ raise
+ if err.http_status == 401:
+ self.url = self.token = None
+ if self.attempts > 1:
+ raise
+ elif 500 <= err.http_status <= 599:
+ pass
+ else:
+ raise
+ sleep(backoff)
+ backoff *= 2
+ def head_account(self):
+ """Wrapper for head_account"""
+ return self._retry(head_account)
+ def get_account(self, marker=None, limit=None, prefix=None,
+ full_listing=False):
+ """Wrapper for get_account"""
+ # TODO: With full_listing=True this will restart the entire listing
+ # with each retry. Need to make a better version that just retries
+ # where it left off.
+ return self._retry(get_account, marker=marker, limit=limit,
+ prefix=prefix, full_listing=full_listing)
+ def head_container(self, container):
+ """Wrapper for head_container"""
+ return self._retry(head_container, container)
+ def get_container(self, container, marker=None, limit=None, prefix=None,
+ delimiter=None, full_listing=False):
+ """Wrapper for get_container"""
+ # TODO: With full_listing=True this will restart the entire listing
+ # with each retry. Need to make a better version that just retries
+ # where it left off.
+ return self._retry(get_container, container, marker=marker,
+ limit=limit, prefix=prefix, delimiter=delimiter,
+ full_listing=full_listing)
+ def put_container(self, container):
+ """Wrapper for put_container"""
+ return self._retry(put_container, container)
+ def delete_container(self, container):
+ """Wrapper for delete_container"""
+ return self._retry(delete_container, container)
+ def head_object(self, container, obj):
+ """Wrapper for head_object"""
+ return self._retry(head_object, container, obj)
+ def get_object(self, container, obj, resp_chunk_size=None):
+ """Wrapper for get_object"""
+ return self._retry(get_object, container, obj,
+ resp_chunk_size=resp_chunk_size)
+ def put_object(self, container, obj, contents, metadata={},
+ content_length=None, etag=None, chunk_size=65536,
+ content_type=None):
+ """Wrapper for put_object"""
+ return self._retry(put_object, container, obj, contents,
+ metadata=metadata, content_length=content_length, etag=etag,
+ chunk_size=chunk_size, content_type=content_type)
+ def post_object(self, container, obj, metadata):
+ """Wrapper for post_object"""
+ return self._retry(post_object, container, obj, metadata)
+ def delete_object(self, container, obj):
+ """Wrapper for delete_object"""
+ return self._retry(delete_object, container, obj)
+ # End inclusion of swift.common.client
+ # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
+from errno import EEXIST, ENOENT
+from hashlib import md5
+from optparse import OptionParser
+from os import environ, listdir, makedirs, utime
+from os.path import basename, dirname, getmtime, getsize, isdir, join
+from Queue import Empty, Queue
+from sys import argv, exit, stderr
+from threading import enumerate as threading_enumerate, Thread
+from time import sleep
+def mkdirs(path):
+ try:
+ makedirs(path)
+ except OSError, err:
+ if err.errno != EEXIST:
+ raise
+class QueueFunctionThread(Thread):
+ def __init__(self, queue, func, *args, **kwargs):
+ """ Calls func for each item in queue; func is called with a queued
+ item as the first arg followed by *args and **kwargs. Use the abort
+ attribute to have the thread empty the queue (without processing)
+ and exit. """
+ Thread.__init__(self)
+ self.abort = False
+ self.queue = queue
+ self.func = func
+ self.args = args
+ self.kwargs = kwargs
+ def run(self):
+ while True:
+ try:
+ item = self.queue.get_nowait()
+ if not self.abort:
+ self.func(item, *self.args, **self.kwargs)
+ self.queue.task_done()
+ except Empty:
+ if self.abort:
+ break
+ sleep(0.01)
+st_delete_help = '''
+delete --all OR delete container [object] [object] ...
+ Deletes everything in the account (with --all), or everything in a
+ container, or a list of objects depending on the args given.'''.strip('\n')
+def st_delete(options, args):
+ if (not args and not options.yes_all) or (args and options.yes_all):
+ options.error_queue.put('Usage: %s [options] %s' %
+ (basename(argv[0]), st_delete_help))
+ return
+ object_queue = Queue(10000)
+ def _delete_object((container, obj), conn):
+ try:
+ conn.delete_object(container, obj)
+ if options.verbose:
+ path = options.yes_all and join(container, obj) or obj
+ if path[:1] in ('/', '\\'):
+ path = path[1:]
+ options.print_queue.put(path)
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ options.error_queue.put('Object %s not found' %
+ repr('%s/%s' % (container, obj)))
+ container_queue = Queue(10000)
+ def _delete_container(container, conn):
+ try:
+ marker = ''
+ while True:
+ objects = [o['name'] for o in
+ conn.get_container(container, marker=marker)]
+ if not objects:
+ break
+ for obj in objects:
+ object_queue.put((container, obj))
+ marker = objects[-1]
+ while not object_queue.empty():
+ sleep(0.01)
+ attempts = 1
+ while True:
+ try:
+ conn.delete_container(container)
+ break
+ except ClientException, err:
+ if err.http_status != 409:
+ raise
+ if attempts > 10:
+ raise
+ attempts += 1
+ sleep(1)
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ options.error_queue.put('Container %s not found' % repr(container))
+ url, token = get_auth(options.auth, options.user, options.key, snet=options.snet)
+ create_connection = lambda: Connection(options.auth, options.user,
+ options.key, preauthurl=url,
+ preauthtoken=token, snet=options.snet)
+ object_threads = [QueueFunctionThread(object_queue, _delete_object,
+ create_connection()) for _ in xrange(10)]
+ for thread in object_threads:
+ thread.start()
+ container_threads = [QueueFunctionThread(container_queue,
+ _delete_container, create_connection()) for _ in xrange(10)]
+ for thread in container_threads:
+ thread.start()
+ if not args:
+ conn = create_connection()
+ try:
+ marker = ''
+ while True:
+ containers = \
+ [c['name'] for c in conn.get_account(marker=marker)]
+ if not containers:
+ break
+ for container in containers:
+ container_queue.put(container)
+ marker = containers[-1]
+ while not container_queue.empty():
+ sleep(0.01)
+ while not object_queue.empty():
+ sleep(0.01)
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ options.error_queue.put('Account not found')
+ elif len(args) == 1:
+ conn = create_connection()
+ _delete_container(args[0], conn)
+ else:
+ for obj in args[1:]:
+ object_queue.put((args[0], obj))
+ while not container_queue.empty():
+ sleep(0.01)
+ for thread in container_threads:
+ thread.abort = True
+ while thread.isAlive():
+ thread.join(0.01)
+ while not object_queue.empty():
+ sleep(0.01)
+ for thread in object_threads:
+ thread.abort = True
+ while thread.isAlive():
+ thread.join(0.01)
+st_download_help = '''
+download --all OR download container [object] [object] ...
+ Downloads everything in the account (with --all), or everything in a
+ container, or a list of objects depending on the args given.'''.strip('\n')
+def st_download(options, args):
+ if (not args and not options.yes_all) or (args and options.yes_all):
+ options.error_queue.put('Usage: %s [options] %s' %
+ (basename(argv[0]), st_download_help))
+ return
+ object_queue = Queue(10000)
+ def _download_object((container, obj), conn):
+ try:
+ content_type, content_length, _, etag, metadata, body = \
+ conn.get_object(container, obj, resp_chunk_size=65536)
+ path = options.yes_all and join(container, obj) or obj
+ if path[:1] in ('/', '\\'):
+ path = path[1:]
+ if content_type.split(';', 1)[0] == 'text/directory':
+ if not isdir(path):
+ mkdirs(path)
+ read_length = 0
+ md5sum = md5()
+ for chunk in body:
+ read_length += len(chunk)
+ md5sum.update(chunk)
+ else:
+ dirpath = dirname(path)
+ if dirpath and not isdir(dirpath):
+ mkdirs(dirpath)
+ fp = open(path, 'wb')
+ read_length = 0
+ md5sum = md5()
+ for chunk in body :
+ fp.write(chunk)
+ read_length += len(chunk)
+ md5sum.update(chunk)
+ fp.close()
+ if md5sum.hexdigest() != etag:
+ options.error_queue.put('%s: md5sum != etag, %s != %s' %
+ (path, md5sum.hexdigest(), etag))
+ if read_length != content_length:
+ options.error_queue.put(
+ '%s: read_length != content_length, %d != %d' %
+ (path, read_length, content_length))
+ if 'mtime' in metadata:
+ mtime = float(metadata['mtime'])
+ utime(path, (mtime, mtime))
+ if options.verbose:
+ options.print_queue.put(path)
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ options.error_queue.put('Object %s not found' %
+ repr('%s/%s' % (container, obj)))
+ container_queue = Queue(10000)
+ def _download_container(container, conn):
+ try:
+ marker = ''
+ while True:
+ objects = [o['name'] for o in
+ conn.get_container(container, marker=marker)]
+ if not objects:
+ break
+ for obj in objects:
+ object_queue.put((container, obj))
+ marker = objects[-1]
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ options.error_queue.put('Container %s not found' % repr(container))
+ url, token = get_auth(options.auth, options.user, options.key, snet=options.snet)
+ create_connection = lambda: Connection(options.auth, options.user,
+ options.key, preauthurl=url,
+ preauthtoken=token, snet=options.snet)
+ object_threads = [QueueFunctionThread(object_queue, _download_object,
+ create_connection()) for _ in xrange(10)]
+ for thread in object_threads:
+ thread.start()
+ container_threads = [QueueFunctionThread(container_queue,
+ _download_container, create_connection()) for _ in xrange(10)]
+ for thread in container_threads:
+ thread.start()
+ if not args:
+ conn = create_connection()
+ try:
+ marker = ''
+ while True:
+ containers = [c['name']
+ for c in conn.get_account(marker=marker)]
+ if not containers:
+ break
+ for container in containers:
+ container_queue.put(container)
+ marker = containers[-1]
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ options.error_queue.put('Account not found')
+ elif len(args) == 1:
+ _download_container(args[0], create_connection())
+ else:
+ for obj in args[1:]:
+ object_queue.put((args[0], obj))
+ while not container_queue.empty():
+ sleep(0.01)
+ for thread in container_threads:
+ thread.abort = True
+ while thread.isAlive():
+ thread.join(0.01)
+ while not object_queue.empty():
+ sleep(0.01)
+ for thread in object_threads:
+ thread.abort = True
+ while thread.isAlive():
+ thread.join(0.01)
+st_list_help = '''
+list [options] [container]
+ Lists the containers for the account or the objects for a container. -p or
+ --prefix is an option that will only list items beginning with that prefix.
+ -d or --delimiter is option (for container listings only) that will roll up
+ items with the given delimiter (see Cloud Files general documentation for
+ what this means).
+def st_list(options, args):
+ if len(args) > 1:
+ options.error_queue.put('Usage: %s [options] %s' %
+ (basename(argv[0]), st_list_help))
+ return
+ conn = Connection(options.auth, options.user, options.key, snet=options.snet)
+ try:
+ marker = ''
+ while True:
+ if not args:
+ items = conn.get_account(marker=marker, prefix=options.prefix)
+ else:
+ items = conn.get_container(args[0], marker=marker,
+ prefix=options.prefix, delimiter=options.delimiter)
+ if not items:
+ break
+ for item in items:
+ options.print_queue.put(item.get('name', item.get('subdir')))
+ marker = items[-1].get('name', items[-1].get('subdir'))
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ if not args:
+ options.error_queue.put('Account not found')
+ else:
+ options.error_queue.put('Container %s not found' % repr(args[0]))
+st_stat_help = '''
+stat [container] [object]
+ Displays information for the account, container, or object depending on the
+ args given (if any).'''.strip('\n')
+def st_stat(options, args):
+ conn = Connection(options.auth, options.user, options.key)
+ if not args:
+ try:
+ container_count, object_count, bytes_used = conn.head_account()
+ options.print_queue.put('''
+ Account: %s
+Containers: %d
+ Objects: %d
+ Bytes: %d'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], container_count,
+ object_count, bytes_used))
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ options.error_queue.put('Account not found')
+ elif len(args) == 1:
+ try:
+ object_count, bytes_used = conn.head_container(args[0])
+ options.print_queue.put('''
+ Account: %s
+Container: %s
+ Objects: %d
+ Bytes: %d'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
+ object_count, bytes_used))
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ options.error_queue.put('Container %s not found' % repr(args[0]))
+ elif len(args) == 2:
+ try:
+ content_type, content_length, last_modified, etag, metadata = \
+ conn.head_object(args[0], args[1])
+ options.print_queue.put('''
+ Account: %s
+ Container: %s
+ Object: %s
+ Content Type: %s
+Content Length: %d
+ Last Modified: %s
+ ETag: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
+ args[1], content_type, content_length,
+ last_modified, etag))
+ for key, value in metadata.items():
+ options.print_queue.put('%14s: %s' % ('Meta %s' % key, value))
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ options.error_queue.put('Object %s not found' %
+ repr('%s/%s' % (args[0], args[1])))
+ else:
+ options.error_queue.put('Usage: %s [options] %s' %
+ (basename(argv[0]), st_stat_help))
+st_upload_help = '''
+upload [options] container file_or_directory [file_or_directory] [...]
+ Uploads to the given container the files and directories specified by the
+ remaining args. -c or --changed is an option that will only upload files
+ that have changed since the last upload.'''.strip('\n')
+def st_upload(options, args):
+ if len(args) < 2:
+ options.error_queue.put('Usage: %s [options] %s' %
+ (basename(argv[0]), st_upload_help))
+ return
+ file_queue = Queue(10000)
+ def _upload_file((path, dir_marker), conn):
+ try:
+ obj = path
+ if obj.startswith('./') or obj.startswith('.\\'):
+ obj = obj[2:]
+ metadata = {'mtime': str(getmtime(path))}
+ if dir_marker:
+ if options.changed:
+ try:
+ ct, cl, lm, et, md = conn.head_object(args[0], obj)
+ if ct.split(';', 1)[0] == 'text/directory' and \
+ cl == 0 and \
+ et == 'd41d8cd98f00b204e9800998ecf8427e' and \
+ md.get('mtime') == metadata['mtime']:
+ return
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ conn.put_object(args[0], obj, '', content_length=0,
+ content_type='text/directory',
+ metadata=metadata)
+ else:
+ if options.changed:
+ try:
+ ct, cl, lm, et, md = conn.head_object(args[0], obj)
+ if cl == getsize(path) and \
+ md.get('mtime') == metadata['mtime']:
+ return
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ conn.put_object(args[0], obj, open(path, 'rb'),
+ content_length=getsize(path),
+ metadata=metadata)
+ if options.verbose:
+ options.print_queue.put(obj)
+ except OSError, err:
+ if err.errno != ENOENT:
+ raise
+ options.error_queue.put('Local file %s not found' % repr(path))
+ def _upload_dir(path):
+ names = listdir(path)
+ if not names:
+ file_queue.put((path, True)) # dir_marker = True
+ else:
+ for name in listdir(path):
+ subpath = join(path, name)
+ if isdir(subpath):
+ _upload_dir(subpath)
+ else:
+ file_queue.put((subpath, False)) # dir_marker = False
+ url, token = get_auth(options.auth, options.user, options.key, snet=options.snet)
+ create_connection = lambda: Connection(options.auth, options.user,
+ options.key, preauthurl=url,
+ preauthtoken=token, snet=options.snet)
+ file_threads = [QueueFunctionThread(file_queue, _upload_file,
+ create_connection()) for _ in xrange(10)]
+ for thread in file_threads:
+ thread.start()
+ conn = create_connection()
+ try:
+ conn.put_container(args[0])
+ for arg in args[1:]:
+ if isdir(arg):
+ _upload_dir(arg)
+ else:
+ file_queue.put((arg, False)) # dir_marker = False
+ while not file_queue.empty():
+ sleep(0.01)
+ for thread in file_threads:
+ thread.abort = True
+ while thread.isAlive():
+ thread.join(0.01)
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ options.error_queue.put('Account not found')
+if __name__ == '__main__':
+ parser = OptionParser(version='%prog 1.0', usage='''
+Usage: %%prog [options] <command> [args]
+ %(st_stat_help)s
+ %(st_list_help)s
+ %(st_upload_help)s
+ %(st_download_help)s
+ %(st_delete_help)s
+ %%prog -A -U user -K key stat
+'''.strip('\n') % globals())
+ parser.add_option('-s', '--snet', action='store_true', dest='snet',
+ default=False, help='Use SERVICENET internal network')
+ parser.add_option('-q', '--quiet', action='store_false', dest='verbose',
+ default=True, help='Suppress status output')
+ parser.add_option('-a', '--all', action='store_true', dest='yes_all',
+ default=False, help='Indicate that you really want the '
+ 'whole account for commands that require --all in such '
+ 'a case')
+ parser.add_option('-c', '--changed', action='store_true', dest='changed',
+ default=False, help='For the upload command: will '
+ 'only upload files that have changed since the last '
+ 'upload')
+ parser.add_option('-p', '--prefix', dest='prefix',
+ help='For the list command: will only list items '
+ 'beginning with the prefix')
+ parser.add_option('-d', '--delimiter', dest='delimiter',
+ help='For the list command on containers: will roll up '
+ 'items with the given delimiter (see Cloud Files '
+ 'general documentation for what this means).')
+ parser.add_option('-A', '--auth', dest='auth',
+ help='URL for obtaining an auth token')
+ parser.add_option('-U', '--user', dest='user',
+ help='User name for obtaining an auth token')
+ parser.add_option('-K', '--key', dest='key',
+ help='Key for obtaining an auth token')
+ args = argv[1:]
+ if not args:
+ args.append('-h')
+ (options, args) = parser.parse_args(args)
+ required_help = '''
+Requires ST_AUTH, ST_USER, and ST_KEY environment variables be set or
+overridden with -A, -U, or -K.'''.strip('\n')
+ for attr in ('auth', 'user', 'key'):
+ if not getattr(options, attr, None):
+ setattr(options, attr, environ.get('ST_%s' % attr.upper()))
+ if not getattr(options, attr, None):
+ exit(required_help)
+ commands = ('delete', 'download', 'list', 'stat', 'upload')
+ if not args or args[0] not in commands:
+ parser.print_usage()
+ if args:
+ exit('no such command: %s' % args[0])
+ exit()
+ options.print_queue = Queue(10000)
+ def _print(item):
+ if isinstance(item, unicode):
+ item = item.encode('utf8')
+ print item
+ print_thread = QueueFunctionThread(options.print_queue, _print)
+ print_thread.start()
+ options.error_queue = Queue(10000)
+ def _error(item):
+ if isinstance(item, unicode):
+ item = item.encode('utf8')
+ print >>stderr, item
+ error_thread = QueueFunctionThread(options.error_queue, _error)
+ error_thread.start()
+ try:
+ globals()['st_%s' % args[0]](options, args[1:])
+ while not options.print_queue.empty():
+ sleep(0.01)
+ print_thread.abort = True
+ while print_thread.isAlive():
+ print_thread.join(0.01)
+ while not options.error_queue.empty():
+ sleep(0.01)
+ error_thread.abort = True
+ while error_thread.isAlive():
+ error_thread.join(0.01)
+ except:
+ for thread in threading_enumerate():
+ thread.abort = True
+ raise
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..fe611562d
--- /dev/null
+++ b/bin/
@@ -0,0 +1,351 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import sys
+from urllib import quote
+from hashlib import md5
+import getopt
+from itertools import chain
+import simplejson
+from eventlet.greenpool import GreenPool
+from eventlet.event import Event
+from swift.common.ring import Ring
+from swift.common.utils import split_path
+from swift.common.bufferedhttp import http_connect
+usage = """
+%(cmd)s [options] [url 1] [url 2] ...
+ -c [concurrency] Set the concurrency, default 50
+ -r [ring dir] Ring locations, default /etc/swift
+ -e [filename] File for writing a list of inconsistent urls
+ -d Also download files and verify md5
+You can also feed a list of urls to the script through stdin.
+ %(cmd)s SOSO_88ad0b83-b2c5-4fa1-b2d6-60c597202076
+ %(cmd)s SOSO_88ad0b83-b2c5-4fa1-b2d6-60c597202076/container/object
+ %(cmd)s -e errors.txt SOSO_88ad0b83-b2c5-4fa1-b2d6-60c597202076/container
+ %(cmd)s < errors.txt
+ %(cmd)s -c 25 -d < errors.txt
+""" % {'cmd': sys.argv[0]}
+class Auditor(object):
+ def __init__(self, swift_dir='/etc/swift', concurrency=50, deep=False,
+ error_file=None):
+ self.pool = GreenPool(concurrency)
+ self.object_ring = Ring(os.path.join(swift_dir, 'object.ring.gz'))
+ self.container_ring = Ring(os.path.join(swift_dir, 'container.ring.gz'))
+ self.account_ring = Ring(os.path.join(swift_dir, 'account.ring.gz'))
+ self.deep = deep
+ self.error_file = error_file
+ # zero out stats
+ self.accounts_checked = self.account_exceptions = \
+ self.account_not_found = self.account_container_mismatch = \
+ self.account_object_mismatch = self.objects_checked = \
+ self.object_exceptions = self.object_not_found = \
+ self.object_checksum_mismatch = self.containers_checked = \
+ self.container_exceptions = self.container_count_mismatch = \
+ self.container_not_found = self.container_obj_mismatch = 0
+ self.list_cache = {}
+ self.in_progress = {}
+ def audit_object(self, account, container, name):
+ path = '/%s/%s/%s' % (quote(account), quote(container), quote(name))
+ part, nodes = self.object_ring.get_nodes(account, container, name)
+ container_listing = self.audit_container(account, container)
+ consistent = True
+ if name not in container_listing:
+ print " Object %s missing in container listing!" % path
+ consistent = False
+ hash = None
+ else:
+ hash = container_listing[name]['hash']
+ etags = []
+ for node in nodes:
+ try:
+ if self.deep:
+ conn = http_connect(node['ip'], node['port'],
+ node['device'], part, 'GET', path, {})
+ resp = conn.getresponse()
+ calc_hash = md5()
+ chunk = True
+ while chunk:
+ chunk =
+ calc_hash.update(chunk)
+ calc_hash = calc_hash.hexdigest()
+ if resp.status // 100 != 2:
+ self.object_not_found += 1
+ consistent = False
+ print ' Bad status GETting object "%s" on %s/%s' \
+ % (path, node['ip'], node['device'])
+ continue
+ if resp.getheader('ETag').strip('"') != calc_hash:
+ self.object_checksum_mismatch += 1
+ consistent = False
+ print ' MD5 doesnt match etag for "%s" on %s/%s' \
+ % (path, node['ip'], node['device'])
+ etags.append(resp.getheader('ETag'))
+ else:
+ conn = http_connect(node['ip'], node['port'],
+ node['device'], part, 'HEAD', path, {})
+ resp = conn.getresponse()
+ if resp.status // 100 != 2:
+ self.object_not_found += 1
+ consistent = False
+ print ' Bad status HEADing object "%s" on %s/%s' \
+ % (path, node['ip'], node['device'])
+ continue
+ etags.append(resp.getheader('ETag'))
+ except Exception:
+ self.object_exceptions += 1
+ consistent = False
+ print ' Exception fetching object "%s" on %s/%s' \
+ % (path, node['ip'], node['device'])
+ continue
+ if not etags:
+ consistent = False
+ print " Failed fo fetch object %s at all!" % path
+ elif hash:
+ for etag in etags:
+ if resp.getheader('ETag').strip('"') != hash:
+ consistent = False
+ self.object_checksum_mismatch += 1
+ print ' ETag mismatch for "%s" on %s/%s' \
+ % (path, node['ip'], node['device'])
+ if not consistent and self.error_file:
+ print >>open(self.error_file, 'a'), path
+ self.objects_checked += 1
+ def audit_container(self, account, name, recurse=False):
+ if (account, name) in self.in_progress:
+ self.in_progress[(account, name)].wait()
+ if (account, name) in self.list_cache:
+ return self.list_cache[(account, name)]
+ self.in_progress[(account, name)] = Event()
+ print 'Auditing container "%s"...' % name
+ path = '/%s/%s' % (quote(account), quote(name))
+ account_listing = self.audit_account(account)
+ consistent = True
+ if name not in account_listing:
+ consistent = False
+ print " Container %s not in account listing!" % path
+ part, nodes = self.container_ring.get_nodes(account, name)
+ rec_d = {}
+ responses = {}
+ for node in nodes:
+ marker = ''
+ results = True
+ while results:
+ node_id = node['id']
+ try:
+ conn = http_connect(node['ip'], node['port'], node['device'],
+ part, 'GET', path, {},
+ 'format=json&marker=%s' % quote(marker))
+ resp = conn.getresponse()
+ if resp.status // 100 != 2:
+ self.container_not_found += 1
+ consistent = False
+ print ' Bad status GETting container "%s" on %s/%s' % \
+ (path, node['ip'], node['device'])
+ break
+ if node['id'] not in responses:
+ responses[node['id']] = dict(resp.getheaders())
+ results = simplejson.loads(
+ except Exception:
+ self.container_exceptions += 1
+ consistent = False
+ print ' Exception GETting container "%s" on %s/%s' % \
+ (path, node['ip'], node['device'])
+ break
+ if results:
+ marker = results[-1]['name']
+ for obj in results:
+ obj_name = obj['name']
+ if obj_name not in rec_d:
+ rec_d[obj_name] = obj
+ if obj['last_modified'] != rec_d[obj_name]['last_modified']:
+ self.container_obj_mismatch += 1
+ consistent = False
+ print " Different versions of %s/%s in container dbs." % \
+ (quote(name), quote(obj['name']))
+ if obj['last_modified'] > rec_d[obj_name]['last_modified']:
+ rec_d[obj_name] = obj
+ obj_counts = [int(header['x-container-object-count'])
+ for header in responses.values()]
+ if not obj_counts:
+ consistent = False
+ print " Failed to fetch container %s at all!" % path
+ else:
+ if len(set(obj_counts)) != 1:
+ self.container_count_mismatch += 1
+ consistent = False
+ print " Container databases don't agree on number of objects."
+ print " Max: %s, Min: %s" % (max(obj_counts), min(obj_counts))
+ self.containers_checked += 1
+ self.list_cache[(account, name)] = rec_d
+ self.in_progress[(account, name)].send(True)
+ del self.in_progress[(account, name)]
+ if recurse:
+ for obj in rec_d.keys():
+ self.pool.spawn_n(self.audit_object, account, name, obj)
+ if not consistent and self.error_file:
+ print >>open(self.error_file, 'a'), path
+ return rec_d
+ def audit_account(self, account, recurse=False):
+ if account in self.in_progress:
+ self.in_progress[account].wait()
+ if account in self.list_cache:
+ return self.list_cache[account]
+ self.in_progress[account] = Event()
+ print "Auditing account %s..." % account
+ consistent = True
+ path = '/%s' % account
+ part, nodes = self.account_ring.get_nodes(account)
+ responses = {}
+ for node in nodes:
+ marker = ''
+ results = True
+ while results:
+ node_id = node['id']
+ try:
+ conn = http_connect(node['ip'], node['port'],
+ node['device'], part, 'GET', path, {},
+ 'format=json&marker=%s' % quote(marker))
+ resp = conn.getresponse()
+ if resp.status // 100 != 2:
+ self.account_not_found += 1
+ consistent = False
+ print " Bad status GETting account %(ip)s:%(device)s" \
+ % node
+ break
+ results = simplejson.loads(
+ except Exception:
+ self.account_exceptions += 1
+ consistent = False
+ print " Exception GETting account %(ip)s:%(device)s" % node
+ break
+ if node_id not in responses:
+ responses[node_id] = [dict(resp.getheaders()), []]
+ responses[node_id][1].extend(results)
+ if results:
+ marker = results[-1]['name']
+ headers = [resp[0] for resp in responses.values()]
+ cont_counts = [int(header['x-account-container-count'])
+ for header in headers]
+ if len(set(cont_counts)) != 1:
+ self.account_container_mismatch += 1
+ consistent = False
+ print " Account databases don't agree on number of containers."
+ print " Max: %s, Min: %s" % (max(cont_counts), min(cont_counts))
+ obj_counts = [int(header['x-account-object-count'])
+ for header in headers]
+ if len(set(obj_counts)) != 1:
+ self.account_object_mismatch += 1
+ consistent = False
+ print " Account databases don't agree on number of objects."
+ print " Max: %s, Min: %s" % (max(obj_counts), min(obj_counts))
+ containers = set()
+ for resp in responses.values():
+ containers.update(container['name'] for container in resp[1])
+ self.list_cache[account] = containers
+ self.in_progress[account].send(True)
+ del self.in_progress[account]
+ self.accounts_checked += 1
+ if recurse:
+ for container in containers:
+ self.pool.spawn_n(self.audit_container, account, container, True)
+ if not consistent and self.error_file:
+ print >>open(self.error_file, 'a'), path
+ return containers
+ def audit(self, account, container=None, obj=None):
+ if obj and container:
+ self.pool.spawn_n(self.audit_object, account, container, obj)
+ elif container:
+ self.pool.spawn_n(self.audit_container, account, container, True)
+ else:
+ self.pool.spawn_n(self.audit_account, account, True)
+ def wait(self):
+ self.pool.waitall()
+ def print_stats(self):
+ print
+ print " Accounts checked: %d" % self.accounts_checked
+ if self.account_not_found:
+ print " Missing Replicas: %d" % self.account_not_found
+ if self.account_exceptions:
+ print " Exceptions: %d" % self.account_exceptions
+ if self.account_container_mismatch:
+ print " Cntainer mismatch: %d" % self.account_container_mismatch
+ if self.account_object_mismatch:
+ print " Object mismatch: %d" % self.account_object_mismatch
+ print
+ print "Containers checked: %d" % self.containers_checked
+ if self.container_not_found:
+ print " Missing Replicas: %d" % self.container_not_found
+ if self.container_exceptions:
+ print " Exceptions: %d" % self.container_exceptions
+ if self.container_count_mismatch:
+ print " Count mismatch: %d" % self.container_count_mismatch
+ if self.container_obj_mismatch:
+ print " Obj mismatch: %d" % self.container_obj_mismatch
+ print
+ print " Objects checked: %d" % self.objects_checked
+ if self.object_not_found:
+ print " Missing Replicas: %d" % self.object_not_found
+ if self.object_exceptions:
+ print " Exceptions: %d" % self.object_exceptions
+ if self.object_checksum_mismatch:
+ print " MD5 Mismatch: %d" % self.object_checksum_mismatch
+if __name__ == '__main__':
+ try:
+ optlist, args = getopt.getopt(sys.argv[1:], 'c:r:e:d')
+ except getopt.GetoptError, err:
+ print str(err)
+ print usage
+ sys.exit(2)
+ if not args and os.isatty(sys.stdin.fileno()):
+ print usage
+ sys.exit()
+ opts = dict(optlist)
+ options = {
+ 'concurrency': int(opts.get('-c', 50)),
+ 'error_file': opts.get('-e', None),
+ 'swift_dir': opts.get('-r', '/etc/swift'),
+ 'deep': '-d' in opts,
+ }
+ auditor = Auditor(**options)
+ if not os.isatty(sys.stdin.fileno()):
+ args = chain(args, sys.stdin)
+ for path in args:
+ path = '/' + path.rstrip('\r\n').lstrip('/')
+ auditor.audit(*split_path(path, 1, 3, True))
+ auditor.wait()
+ auditor.print_stats()
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..2bbb73c23
--- /dev/null
+++ b/bin/
@@ -0,0 +1,69 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import signal
+import sys
+from ConfigParser import ConfigParser
+from swift.account.auditor import AccountAuditor
+from swift.common import utils
+if __name__ == '__main__':
+ if len(sys.argv) < 2:
+ print "Usage: account-auditor CONFIG_FILE [once]"
+ sys.exit()
+ once = len(sys.argv) > 2 and sys.argv[2] == 'once'
+ c = ConfigParser()
+ if not[1]):
+ print "Unable to read config file."
+ sys.exit(1)
+ server_conf = dict(c.items('account-server'))
+ if c.has_section('account-auditor'):
+ auditor_conf = dict(c.items('account-auditor'))
+ else:
+ print "Unable to find account-auditor config section in %s." % \
+ sys.argv[1]
+ sys.exit(1)
+ logger = utils.get_logger(auditor_conf, 'account-auditor')
+ # log uncaught exceptions
+ sys.excepthook = lambda *exc_info: \
+ logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
+ sys.stdout = sys.stderr = utils.LoggerFileObject(logger)
+ utils.drop_privileges(server_conf.get('user', 'swift'))
+ try:
+ os.setsid()
+ except OSError:
+ pass
+ def kill_children(*args):
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
+ os.killpg(0, signal.SIGTERM)
+ sys.exit()
+ signal.signal(signal.SIGTERM, kill_children)
+ auditor = AccountAuditor(server_conf, auditor_conf)
+ if once:
+ auditor.audit_once()
+ else:
+ auditor.audit_forever()
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..d62e43add
--- /dev/null
+++ b/bin/
@@ -0,0 +1,69 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import signal
+import sys
+from ConfigParser import ConfigParser
+from swift.account.reaper import AccountReaper
+from swift.common import utils
+if __name__ == '__main__':
+ if len(sys.argv) < 2:
+ print "Usage: account-reaper CONFIG_FILE [once]"
+ sys.exit()
+ once = len(sys.argv) > 2 and sys.argv[2] == 'once'
+ c = ConfigParser()
+ if not[1]):
+ print "Unable to read config file."
+ sys.exit(1)
+ server_conf = dict(c.items('account-server'))
+ if c.has_section('account-reaper'):
+ reaper_conf = dict(c.items('account-reaper'))
+ else:
+ print "Unable to find account-reaper config section in %s." % \
+ sys.argv[1]
+ sys.exit(1)
+ logger = utils.get_logger(reaper_conf, 'account-reaper')
+ # log uncaught exceptions
+ sys.excepthook = lambda *exc_info: \
+ logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
+ sys.stdout = sys.stderr = utils.LoggerFileObject(logger)
+ utils.drop_privileges(server_conf.get('user', 'swift'))
+ try:
+ os.setsid()
+ except OSError:
+ pass
+ def kill_children(*args):
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
+ os.killpg(0, signal.SIGTERM)
+ sys.exit()
+ signal.signal(signal.SIGTERM, kill_children)
+ reaper = AccountReaper(server_conf, reaper_conf)
+ if once:
+ reaper.reap_once()
+ else:
+ reaper.reap_forever()
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..3e47eaa4f
--- /dev/null
+++ b/bin/
@@ -0,0 +1,57 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import sys
+from ConfigParser import ConfigParser
+import getopt
+from swift.account import server as account_server
+from swift.common import db, db_replicator, utils
+class AccountReplicator(db_replicator.Replicator):
+ server_type = 'account'
+ ring_file = 'account.ring.gz'
+ brokerclass = db.AccountBroker
+ datadir = account_server.DATADIR
+ default_port = 6002
+if __name__ == '__main__':
+ optlist, args = getopt.getopt(sys.argv[1:], '', ['once'])
+ if not args:
+ print "Usage: account-replicator <--once> CONFIG_FILE [once]"
+ sys.exit()
+ c = ConfigParser()
+ if not[0]):
+ print "Unable to read config file."
+ sys.exit(1)
+ once = len(args) > 1 and args[1] == 'once'
+ server_conf = dict(c.items('account-server'))
+ if c.has_section('account-replicator'):
+ replicator_conf = dict(c.items('account-replicator'))
+ else:
+ print "Unable to find account-replicator config section in %s." % \
+ args[0]
+ sys.exit(1)
+ utils.drop_privileges(server_conf.get('user', 'swift'))
+ if once or '--once' in [opt[0] for opt in optlist]:
+ AccountReplicator(server_conf, replicator_conf).replicate_once()
+ else:
+ AccountReplicator(server_conf, replicator_conf).replicate_forever()
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..d5e802104
--- /dev/null
+++ b/bin/
@@ -0,0 +1,30 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ConfigParser import ConfigParser
+import sys
+from swift.common.wsgi import run_wsgi
+from swift.account.server import AccountController
+if __name__ == '__main__':
+ c = ConfigParser()
+ if not[1]):
+ print "Unable to read config file."
+ sys.exit(1)
+ conf = dict(c.items('account-server'))
+ run_wsgi(AccountController, conf, default_port=6002)
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..ccaf93cf5
--- /dev/null
+++ b/bin/
@@ -0,0 +1,45 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ConfigParser import ConfigParser
+from sys import argv, exit
+from swift.common.bufferedhttp import http_connect_raw as http_connect
+if __name__ == '__main__':
+ f = '/etc/swift/auth-server.conf'
+ if len(argv) == 5:
+ f = argv[4]
+ elif len(argv) != 4:
+ exit('Syntax: %s <new_account> <new_user> <new_password> [conf_file]' %
+ argv[0])
+ new_account = argv[1]
+ new_user = argv[2]
+ new_password = argv[3]
+ c = ConfigParser()
+ if not
+ exit('Unable to read conf file: %s' % f)
+ conf = dict(c.items('auth-server'))
+ host = conf.get('bind_ip', '')
+ port = int(conf.get('bind_port', 11000))
+ path = '/account/%s/%s' % (new_account, new_user)
+ conn = http_connect(host, port, 'PUT', path, {'x-auth-key':new_password})
+ resp = conn.getresponse()
+ if resp.status == 204:
+ print resp.getheader('x-storage-url')
+ else:
+ print 'Account creation failed. (%d)' % resp.status
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..51212b4f4
--- /dev/null
+++ b/bin/
@@ -0,0 +1,40 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ConfigParser import ConfigParser
+from sys import argv, exit
+from swift.common.bufferedhttp import http_connect_raw as http_connect
+if __name__ == '__main__':
+ f = '/etc/swift/auth-server.conf'
+ if len(argv) == 2:
+ f = argv[1]
+ elif len(argv) != 1:
+ exit('Syntax: %s [conf_file]' % argv[0])
+ c = ConfigParser()
+ if not
+ exit('Unable to read conf file: %s' % f)
+ conf = dict(c.items('auth-server'))
+ host = conf.get('bind_ip', '')
+ port = int(conf.get('bind_port', 11000))
+ path = '/recreate_accounts'
+ conn = http_connect(host, port, 'POST', path)
+ resp = conn.getresponse()
+ if resp.status == 200:
+ print
+ else:
+ print 'Recreating accounts failed. (%d)' % resp.status
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..775530e07
--- /dev/null
+++ b/bin/
@@ -0,0 +1,30 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ConfigParser import ConfigParser
+import sys
+from swift.common.wsgi import run_wsgi
+from swift.auth.server import AuthController
+if __name__ == '__main__':
+ c = ConfigParser()
+ if not[1]):
+ print "Unable to read config file."
+ sys.exit(1)
+ conf = dict(c.items('auth-server'))
+ run_wsgi(AuthController, conf, default_port=11000)
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..1ff1682c5
--- /dev/null
+++ b/bin/
@@ -0,0 +1,69 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import signal
+import sys
+from ConfigParser import ConfigParser
+from swift.container.auditor import ContainerAuditor
+from swift.common import utils
+if __name__ == '__main__':
+ if len(sys.argv) < 2:
+ print "Usage: container-auditor CONFIG_FILE [once]"
+ sys.exit()
+ once = len(sys.argv) > 2 and sys.argv[2] == 'once'
+ c = ConfigParser()
+ if not[1]):
+ print "Unable to read config file."
+ sys.exit(1)
+ server_conf = dict(c.items('container-server'))
+ if c.has_section('container-auditor'):
+ auditor_conf = dict(c.items('container-auditor'))
+ else:
+ print "Unable to find container-auditor config section in %s." % \
+ sys.argv[1]
+ sys.exit(1)
+ logger = utils.get_logger(auditor_conf, 'container-auditor')
+ # log uncaught exceptions
+ sys.excepthook = lambda *exc_info: \
+ logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
+ sys.stdout = sys.stderr = utils.LoggerFileObject(logger)
+ utils.drop_privileges(server_conf.get('user', 'swift'))
+ try:
+ os.setsid()
+ except OSError:
+ pass
+ def kill_children(*args):
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
+ os.killpg(0, signal.SIGTERM)
+ sys.exit()
+ signal.signal(signal.SIGTERM, kill_children)
+ auditor = ContainerAuditor(server_conf, auditor_conf)
+ if once:
+ auditor.audit_once()
+ else:
+ auditor.audit_forever()
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..db2ffc8b2
--- /dev/null
+++ b/bin/
@@ -0,0 +1,57 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import sys
+from ConfigParser import ConfigParser
+import getopt
+from swift.container import server as container_server
+from swift.common import db, db_replicator, utils
+class ContainerReplicator(db_replicator.Replicator):
+ server_type = 'container'
+ ring_file = 'container.ring.gz'
+ brokerclass = db.ContainerBroker
+ datadir = container_server.DATADIR
+ default_port = 6001
+if __name__ == '__main__':
+ optlist, args = getopt.getopt(sys.argv[1:], '', ['once'])
+ if not args:
+ print "Usage: container-replicator <--once> CONFIG_FILE [once]"
+ sys.exit()
+ c = ConfigParser()
+ if not[0]):
+ print "Unable to read config file."
+ sys.exit(1)
+ once = len(args) > 1 and args[1] == 'once'
+ server_conf = dict(c.items('container-server'))
+ if c.has_section('container-replicator'):
+ replicator_conf = dict(c.items('container-replicator'))
+ else:
+ print "Unable to find container-replicator config section in %s." % \
+ args[0]
+ sys.exit(1)
+ utils.drop_privileges(server_conf.get('user', 'swift'))
+ if once or '--once' in [opt[0] for opt in optlist]:
+ ContainerReplicator(server_conf, replicator_conf).replicate_once()
+ else:
+ ContainerReplicator(server_conf, replicator_conf).replicate_forever()
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..e0ad813f4
--- /dev/null
+++ b/bin/
@@ -0,0 +1,30 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ConfigParser import ConfigParser
+import sys
+from swift.common.wsgi import run_wsgi
+from swift.container.server import ContainerController
+if __name__ == '__main__':
+ c = ConfigParser()
+ if not[1]):
+ print "Unable to read config file."
+ sys.exit(1)
+ conf = dict(c.items('container-server'))
+ run_wsgi(ContainerController, conf, default_port=6001)
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..c37ba3680
--- /dev/null
+++ b/bin/
@@ -0,0 +1,63 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import signal
+import sys
+from ConfigParser import ConfigParser
+from swift.container.updater import ContainerUpdater
+from swift.common import utils
+if __name__ == '__main__':
+ if len(sys.argv) < 2:
+ print "Usage: container-updater CONFIG_FILE [once]"
+ sys.exit()
+ once = len(sys.argv) > 2 and sys.argv[2] == 'once'
+ c = ConfigParser()
+ if not[1]):
+ print "Unable to read config file."
+ sys.exit(1)
+ server_conf = dict(c.items('container-server'))
+ if c.has_section('container-updater'):
+ updater_conf = dict(c.items('container-updater'))
+ else:
+ print "Unable to find container-updater config section in %s." % \
+ sys.argv[1]
+ sys.exit(1)
+ utils.drop_privileges(server_conf.get('user', 'swift'))
+ try:
+ os.setsid()
+ except OSError:
+ pass
+ def kill_children(*args):
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
+ os.killpg(0, signal.SIGTERM)
+ sys.exit()
+ signal.signal(signal.SIGTERM, kill_children)
+ updater = ContainerUpdater(server_conf, updater_conf)
+ if once:
+ updater.update_once_single_threaded()
+ else:
+ updater.update_forever()
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..cde28c1ed
--- /dev/null
+++ b/bin/
@@ -0,0 +1,125 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import datetime
+import os
+import re
+import subprocess
+import sys
+from ConfigParser import ConfigParser
+from swift.common.utils import get_logger
+# To search for more types of errors, add the regex to the list below
+error_re = [
+ 'error.*(sd[a-z])',
+ '(sd[a-z]).*error',
+def get_devices(device_dir, logger):
+ devices = []
+ for line in open('/proc/mounts').readlines():
+ data = line.strip().split()
+ block_device = data[0]
+ mount_point = data[1]
+ if mount_point.startswith(device_dir):
+ device = {}
+ device['mount_point'] = mount_point
+ device['block_device'] = block_device
+ try:
+ device_num = os.stat(block_device).st_rdev
+ except OSError, e:
+ # If we can't stat the device, then something weird is going on
+ logger.error("Error: Could not stat %s!" %
+ block_device)
+ continue
+ device['major'] = str(os.major(device_num))
+ device['minor'] = str(os.minor(device_num))
+ devices.append(device)
+ for line in open('/proc/partitions').readlines()[2:]:
+ major,minor,blocks,kernel_device = line.strip().split()
+ device = [d for d in devices
+ if d['major'] == major and d['minor'] == minor]
+ if device:
+ device[0]['kernel_device'] = kernel_device
+ return devices
+def get_errors(minutes):
+ errors = {}
+ start_time = - datetime.timedelta(minutes=minutes)
+ for line in open('/var/log/kern.log'):
+ if '[ 0.000000]' in line:
+ # Ignore anything before the last boot
+ errors = {}
+ continue
+ log_time_string = '%s %s' % (start_time.year,' '.join(line.split()[:3]))
+ log_time = datetime.datetime.strptime(
+ log_time_string,'%Y %b %d %H:%M:%S')
+ if log_time > start_time:
+ for err in error_re:
+ for device in re.findall(err,line):
+ errors[device] = errors.get(device,0) + 1
+ return errors
+def comment_fstab(mount_point):
+ with open('/etc/fstab', 'r') as fstab:
+ with open('/etc/', 'w') as new_fstab:
+ for line in fstab:
+ parts = line.split()
+ if len(parts) > 2 and line.split()[1] == mount_point:
+ new_fstab.write('#' + line)
+ else:
+ new_fstab.write(line)
+ os.rename('/etc/', '/etc/fstab')
+if __name__ == '__main__':
+ c = ConfigParser()
+ try:
+ conf_path = sys.argv[1]
+ except:
+ print "Usage: %s CONF_FILE" % sys.argv[0].split('/')[-1]
+ sys.exit(1)
+ if not
+ print "Unable to read config file %s" % conf_path
+ sys.exit(1)
+ conf = dict(c.items('drive-audit'))
+ device_dir = conf.get('device_dir', '/srv/node')
+ minutes = int(conf.get('minutes', 60))
+ error_limit = int(conf.get('error_limit', 1))
+ logger = get_logger(conf, 'drive-audit')
+ devices = get_devices(device_dir, logger)
+ logger.debug("Devices found: %s" % str(devices))
+ if not devices:
+ logger.error("Error: No devices found!")
+ errors = get_errors(minutes)
+ logger.debug("Errors found: %s" % str(errors))
+ unmounts = 0
+ for kernel_device,count in errors.items():
+ if count >= error_limit:
+ device = [d for d in devices
+ if d['kernel_device'].startswith(kernel_device)]
+ if device:
+ mount_point = device[0]['mount_point']
+ if mount_point.startswith('/srv/node'):
+"Unmounting %s with %d errors" %
+ (mount_point, count))
+"Commenting out %s from /etc/fstab" %
+ (mount_point))
+ comment_fstab(mount_point)
+ unmounts += 1
+ if unmounts == 0:
+"No drives were unmounted")
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..0a0c5be8f
--- /dev/null
+++ b/bin/
@@ -0,0 +1,87 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import sys
+import urllib
+from swift.common.ring import Ring
+from swift.common.utils import hash_path
+if len(sys.argv) < 3 or len(sys.argv) > 5:
+ print 'Usage: %s <ring.gz> <account> [<container>] [<object>]' % sys.argv[0]
+ print 'Shows the nodes responsible for the item specified.'
+ print 'Example:'
+ print ' $ %s /etc/swift/account.ring.gz MyAccount' % sys.argv[0]
+ print ' Partition 5743883'
+ print ' Hash 96ae332a60b58910784e4417a03e1ad0'
+ print ' sdd1'
+ print ' sdb1'
+ print ' sdf1'
+ sys.exit(1)
+ringloc = None
+account = None
+container = None
+obj = None
+if len(sys.argv) > 4: ring,account,container,obj = sys.argv[1:5]
+elif len(sys.argv) > 3: ring,account,container = sys.argv[1:4]
+elif len(sys.argv) > 2: ring,account = sys.argv[1:3]
+print '\nAccount \t%s' % account
+print 'Container\t%s' % container
+print 'Object \t%s\n' % obj
+if obj:
+ hash_str = hash_path(account,container,obj)
+ part, nodes = Ring(ring).get_nodes(account,container,obj)
+ for node in nodes:
+ print 'Server:Port Device\t%s:%s %s' % (node['ip'], node['port'], node['device'])
+ print '\nPartition\t%s' % part
+ print 'Hash \t%s\n' % hash_str
+ for node in nodes:
+ acct_cont_obj = "%s/%s/%s" % (account, container, obj)
+ print 'curl -I -XHEAD "http://%s:%s/%s/%s/%s"' % (node['ip'],node['port'],node['device'],part,urllib.quote(acct_cont_obj))
+ print "\n"
+ for node in nodes:
+ print 'ssh %s "ls -lah /srv/node/%s/objects/%s/%s/%s/"' % (node['ip'],node['device'],part,hash_str[-3:],hash_str)
+elif container:
+ hash_str = hash_path(account,container)
+ part, nodes = Ring(ring).get_nodes(account,container)
+ for node in nodes:
+ print 'Server:Port Device\t%s:%s %s' % (node['ip'], node['port'], node['device'])
+ print '\nPartition %s' % part
+ print 'Hash %s\n' % hash_str
+ for node in nodes:
+ acct_cont = "%s/%s" % (account,container)
+ print 'curl -I -XHEAD "http://%s:%s/%s/%s/%s"' % (node['ip'],node['port'],node['device'],part,urllib.quote(acct_cont))
+ print "\n"
+ for node in nodes:
+ print 'ssh %s "ls -lah /srv/node/%s/containers/%s/%s/%s/%s.db"' % (node['ip'],node['device'],part,hash_str[-3:],hash_str,hash_str)
+elif account:
+ hash_str = hash_path(account)
+ part, nodes = Ring(ring).get_nodes(account)
+ for node in nodes:
+ print 'Server:Port Device\t%s:%s %s' % (node['ip'], node['port'], node['device'])
+ print '\nPartition %s' % part
+ print 'Hash %s\n' % hash_str
+ for node in nodes:
+ print 'curl -I -XHEAD "http://%s:%s/%s/%s/%s"' % (node['ip'],node['port'],node['device'],part, urllib.quote(account))
+ print "\n"
+ for node in nodes:
+ print 'ssh %s "ls -lah /srv/node/%s/accounts/%s/%s/%s/%s.db"' % (node['ip'],node['device'],part,hash_str[-3:],hash_str,hash_str)
+ print "\n\n"
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..e403764b4
--- /dev/null
+++ b/bin/
@@ -0,0 +1,181 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import with_statement
+import errno
+import glob
+import os
+import resource
+import signal
+import sys
+import time
+ALL_SERVERS = ['account-auditor', 'account-server', 'container-auditor',
+ 'container-replicator', 'container-server', 'container-updater',
+ 'object-auditor', 'object-server', 'object-replicator', 'object-updater',
+ 'proxy-server', 'account-replicator', 'auth-server', 'account-reaper']
+GRACEFUL_SHUTDOWN_SERVERS = ['account-server', 'container-server',
+ 'object-server', 'proxy-server', 'auth-server']
+MAX_MEMORY = (1024 * 1024 * 1024) * 2 # 2 GB
+_, server, command = sys.argv
+if server == 'all':
+ servers = ALL_SERVERS
+ if '-' not in server:
+ server = '%s-server' % server
+ servers = [server]
+command = command.lower()
+def pid_files(server):
+ if os.path.exists('/var/run/swift/' % server):
+ pid_files = ['/var/run/swift/' % server]
+ else:
+ pid_files = glob.glob('/var/run/swift/%s/*.pid' % server)
+ for pid_file in pid_files:
+ pid = int(open(pid_file).read().strip())
+ yield pid_file, pid
+def do_start(server, once=False):
+ server_type = '-'.join(server.split('-')[:-1])
+ for pid_file, pid in pid_files(server):
+ if os.path.exists('/proc/%s' % pid):
+ print "%s appears to already be running: %s" % (server, pid_file)
+ return
+ else:
+ print "Removing stale pid file %s" % pid_file
+ os.unlink(pid_file)
+ try:
+ resource.setrlimit(resource.RLIMIT_NOFILE,
+ resource.setrlimit(resource.RLIMIT_DATA,
+ except ValueError:
+ print "Unable to increase file descriptor limit. Running as non-root?"
+ os.environ['PYTHON_EGG_CACHE'] = '/tmp'
+ def launch(ini_file, pid_file):
+ pid = os.fork()
+ if pid == 0:
+ os.setsid()
+ with open(os.devnull, 'r+b') as nullfile:
+ for desc in (0, 1, 2): # close stdio
+ try:
+ os.dup2(nullfile.fileno(), desc)
+ except OSError:
+ pass
+ try:
+ if once:
+ os.execl('/usr/bin/swift-%s' % server, server,
+ ini_file, 'once')
+ else:
+ os.execl('/usr/bin/swift-%s' % server, server, ini_file)
+ except OSError:
+ print 'unable to launch %s' % server
+ sys.exit(0)
+ else:
+ fp = open(pid_file, 'w')
+ fp.write('%d\n' % pid)
+ fp.close()
+ try:
+ os.mkdir('/var/run/swift')
+ except OSError, err:
+ if err.errno == errno.EACCES:
+ sys.exit('Unable to create /var/run/swift. Running as non-root?')
+ elif err.errno != errno.EEXIST:
+ raise
+ if os.path.exists('/etc/swift/%s-server.conf' % server_type):
+ if once:
+ print 'Running %s once' % server
+ else:
+ print 'Starting %s' % server
+ launch('/etc/swift/%s-server.conf' % server_type,
+ '/var/run/swift/' % server)
+ else:
+ try:
+ os.mkdir('/var/run/swift/%s' % server)
+ except OSError, err:
+ if err.errno == errno.EACCES:
+ sys.exit(
+ 'Unable to create /var/run/swift. Running as non-root?')
+ elif err.errno != errno.EEXIST:
+ raise
+ if once:
+ print 'Running %ss once' % server
+ else:
+ print 'Starting %ss' % server
+ for num, ini_file in enumerate(glob.glob('/etc/swift/%s-server/*.conf' % server_type)):
+ launch(ini_file, '/var/run/swift/%s/' % (server, num))
+def do_stop(server, graceful=False):
+ if graceful and server in GRACEFUL_SHUTDOWN_SERVERS:
+ sig = signal.SIGHUP
+ else:
+ sig = signal.SIGTERM
+ did_anything = False
+ pfiles = pid_files(server)
+ for pid_file, pid in pfiles:
+ did_anything = True
+ try:
+ print 'Stopping %s pid: %s signal: %s' % (server, pid, sig)
+ os.kill(pid, sig)
+ except OSError:
+ print "Process %d not running" % pid
+ try:
+ os.unlink(pid_file)
+ except OSError:
+ pass
+ for pid_file, pid in pfiles:
+ for _ in xrange(150): # 15 seconds
+ if not os.path.exists('/proc/%s' % pid):
+ break
+ time.sleep(0.1)
+ else:
+ print 'Waited 15 seconds for pid %s (%s) to die; giving up' % \
+ (pid, pid_file)
+ if not did_anything:
+ print 'No %s running' % server
+if command == 'start':
+ for server in servers:
+ do_start(server)
+if command == 'stop':
+ for server in servers:
+ do_stop(server)
+if command == 'shutdown':
+ for server in servers:
+ do_stop(server, graceful=True)
+if command == 'restart':
+ for server in servers:
+ do_stop(server)
+ for server in servers:
+ do_start(server)
+if command == 'reload':
+ for server in servers:
+ do_stop(server, graceful=True)
+ do_start(server)
+if command == 'once':
+ for server in servers:
+ do_start(server, once=True)
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..5d0e54e66
--- /dev/null
+++ b/bin/
@@ -0,0 +1,69 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import signal
+import sys
+from ConfigParser import ConfigParser
+from swift.obj.auditor import ObjectAuditor
+from swift.common import utils
+if __name__ == '__main__':
+ if len(sys.argv) < 2:
+ print "Usage: object-auditor CONFIG_FILE [once]"
+ sys.exit()
+ once = len(sys.argv) > 2 and sys.argv[2] == 'once'
+ c = ConfigParser()
+ if not[1]):
+ print "Unable to read config file."
+ sys.exit(1)
+ server_conf = dict(c.items('object-server'))
+ if c.has_section('object-auditor'):
+ auditor_conf = dict(c.items('object-auditor'))
+ else:
+ print "Unable to find object-auditor config section in %s." % \
+ sys.argv[1]
+ sys.exit(1)
+ logger = utils.get_logger(auditor_conf, 'object-auditor')
+ # log uncaught exceptions
+ sys.excepthook = lambda *exc_info: \
+ logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
+ sys.stdout = sys.stderr = utils.LoggerFileObject(logger)
+ utils.drop_privileges(server_conf.get('user', 'swift'))
+ try:
+ os.setsid()
+ except OSError:
+ pass
+ def kill_children(*args):
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
+ os.killpg(0, signal.SIGTERM)
+ sys.exit()
+ signal.signal(signal.SIGTERM, kill_children)
+ auditor = ObjectAuditor(server_conf, auditor_conf)
+ if once:
+ auditor.audit_once()
+ else:
+ auditor.audit_forever()
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..57f252207
--- /dev/null
+++ b/bin/
@@ -0,0 +1,92 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import sys
+import cPickle as pickle
+from datetime import datetime
+from hashlib import md5
+from swift.common.ring import Ring
+from swift.obj.server import read_metadata
+from swift.common.utils import hash_path
+if __name__ == '__main__':
+ if len(sys.argv) <= 1:
+ print "Usage: %s OBJECT_FILE" % sys.argv[0]
+ sys.exit(1)
+ try:
+ ring = Ring('/etc/swift/object.ring.gz')
+ except:
+ ring = None
+ datafile = sys.argv[1]
+ fp = open(datafile, 'rb')
+ metadata = read_metadata(fp)
+ path = metadata.pop('name','')
+ content_type = metadata.pop('Content-Type','')
+ ts = metadata.pop('X-Timestamp','')
+ etag = metadata.pop('ETag','')
+ length = metadata.pop('Content-Length','')
+ if path:
+ print 'Path: %s' % path
+ account, container, obj = path.split('/',3)[1:]
+ print ' Account: %s' % account
+ print ' Container: %s' % container
+ print ' Object: %s' % obj
+ obj_hash = hash_path(account, container, obj)
+ print ' Object hash: %s' % obj_hash
+ if ring is not None:
+ print 'Ring locations:'
+ part, nodes = ring.get_nodes(account, container, obj)
+ for node in nodes:
+ print (' %s:%s - /srv/node/%s/objects/%s/%s/%s/' %
+ (node['ip'], node['port'], node['device'], part,
+ obj_hash[-3:], obj_hash, ts))
+ else:
+ print 'Path: Not found in metadata'
+ if content_type:
+ print 'Content-Type: %s' % content_type
+ else:
+ print 'Content-Type: Not found in metadata'
+ if ts:
+ print 'Timestamp: %s (%s)' % (datetime.fromtimestamp(float(ts)), ts)
+ else:
+ print 'Timestamp: Not found in metadata'
+ h = md5()
+ file_len = 0
+ while True:
+ data =*1024)
+ if not data:
+ break
+ h.update(data)
+ file_len += len(data)
+ h = h.hexdigest()
+ if etag:
+ if h == etag:
+ print 'ETag: %s (valid)' % etag
+ else:
+ print "Etag: %s doesn't match file hash of %s!" % (etag, h)
+ else:
+ print 'ETag: Not found in metadata'
+ if length:
+ if file_len == int(length):
+ print 'Content-Length: %s (valid)' % length
+ else:
+ print "Content-Length: %s doesn't match file length of %s" % (
+ length, file_len)
+ else:
+ print 'Content-Length: Not found in metadata'
+ print 'User Metadata: %s' % metadata
+ fp.close()
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..c0603b96a
--- /dev/null
+++ b/bin/
@@ -0,0 +1,30 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ConfigParser import ConfigParser
+import sys
+from swift.common.wsgi import run_wsgi
+from swift.obj.server import ObjectController
+if __name__ == '__main__':
+ c = ConfigParser()
+ if not[1]):
+ print "Unable to read config file."
+ sys.exit(1)
+ conf = dict(c.items('object-server'))
+ run_wsgi(ObjectController, conf, default_port=6000)
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..595840a07
--- /dev/null
+++ b/bin/
@@ -0,0 +1,64 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import signal
+import sys
+from ConfigParser import ConfigParser
+from swift.obj.updater import ObjectUpdater
+from swift.common import utils
+if __name__ == '__main__':
+ if len(sys.argv) < 2:
+ print "Usage: object-updater CONFIG_FILE [once]"
+ sys.exit()
+ once = len(sys.argv) > 2 and sys.argv[2] == 'once'
+ c = ConfigParser()
+ if not[1]):
+ print "Unable to read config file."
+ sys.exit(1)
+ server_conf = dict(c.items('object-server'))
+ if c.has_section('object-updater'):
+ updater_conf = dict(c.items('object-updater'))
+ else:
+ print "Unable to find object-updater config section in %s." % \
+ sys.argv[1]
+ sys.exit(1)
+ utils.drop_privileges(server_conf.get('user', 'swift'))
+ try:
+ os.setsid()
+ except OSError:
+ pass
+ def kill_children(*args):
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
+ os.killpg(0, signal.SIGTERM)
+ sys.exit()
+ signal.signal(signal.SIGTERM, kill_children)
+ updater = ObjectUpdater(server_conf, updater_conf)
+ if once:
+ updater.update_once_single_threaded()
+ else:
+ updater.update_forever()
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..d1d19677e
--- /dev/null
+++ b/bin/
@@ -0,0 +1,45 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ConfigParser import ConfigParser
+import os
+import sys
+from swift.common.wsgi import run_wsgi
+from swift.common.auth import DevAuthMiddleware
+from swift.common.memcached import MemcacheRing
+from swift.common.utils import get_logger
+from swift.proxy.server import Application
+if __name__ == '__main__':
+ c = ConfigParser()
+ if not[1]):
+ print "Unable to read config file."
+ sys.exit(1)
+ conf = dict(c.items('proxy-server'))
+ swift_dir = conf.get('swift_dir', '/etc/swift')
+ c = ConfigParser()
+, 'auth-server.conf'))
+ auth_conf = dict(c.items('auth-server'))
+ memcache = MemcacheRing([s.strip() for s in
+ conf.get('memcache_servers', '').split(',')
+ if s.strip()])
+ logger = get_logger(conf, 'proxy')
+ app = Application(conf, memcache, logger)
+ # Wrap the app with auth
+ app = DevAuthMiddleware(app, auth_conf, memcache, logger)
+ run_wsgi(app, conf, logger=logger, default_port=80)
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..6c12fb582
--- /dev/null
+++ b/bin/
@@ -0,0 +1,558 @@
+#!/usr/bin/python -uO
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import cPickle as pickle
+from errno import EEXIST
+from gzip import GzipFile
+from os import mkdir
+from os.path import basename, dirname, exists, join as pathjoin
+from sys import argv, exit
+from time import time
+from swift.common.ring import RingBuilder
+def search_devs(builder, search_value):
+ # d<device_id>z<zone>-<ip>:<port>/<device_name>_<meta>
+ orig_search_value = search_value
+ match = []
+ if search_value.startswith('d'):
+ i = 1
+ while i < len(search_value) and search_value[i].isdigit():
+ i += 1
+ match.append(('id', int(search_value[1:i])))
+ search_value = search_value[i:]
+ if search_value.startswith('z'):
+ i = 1
+ while i < len(search_value) and search_value[i].isdigit():
+ i += 1
+ match.append(('zone', int(search_value[1:i])))
+ search_value = search_value[i:]
+ if search_value.startswith('-'):
+ search_value = search_value[1:]
+ if len(search_value) and search_value[0].isdigit():
+ i = 1
+ while i < len(search_value) and search_value[i] in '0123456789.':
+ i += 1
+ match.append(('ip', search_value[:i]))
+ search_value = search_value[i:]
+ if search_value.startswith(':'):
+ i = 1
+ while i < len(search_value) and search_value[i].isdigit():
+ i += 1
+ match.append(('port', int(search_value[1:i])))
+ search_value = search_value[i:]
+ if search_value.startswith('/'):
+ i = 1
+ while i < len(search_value) and search_value[i] != '_':
+ i += 1
+ match.append(('device', search_value[1:i]))
+ search_value = search_value[i:]
+ if search_value.startswith('_'):
+ match.append(('meta', search_value[1:]))
+ search_value = ''
+ if search_value:
+ raise ValueError('Invalid <search-value>: %s' % repr(orig_search_value))
+ devs = []
+ for dev in builder.devs:
+ if not dev:
+ continue
+ matched = True
+ for key, value in match:
+ if key == 'meta':
+ if value not in dev.get(key):
+ matched = False
+ elif dev.get(key) != value:
+ matched = False
+ if matched:
+ devs.append(dev)
+ return devs
+ The <search-value> can be of the form:
+ d<device_id>z<zone>-<ip>:<port>/<device_name>_<meta>
+ Any part is optional, but you must include at least one part.
+ Examples:
+ d74 Matches the device id 74
+ z1 Matches devices in zone 1
+ z1- Matches devices in zone 1 with the ip
+ Matches devices in any zone with the ip
+ z1:5678 Matches devices in zone 1 using port 5678
+ :5678 Matches devices that use port 5678
+ /sdb1 Matches devices with the device name sdb1
+ _shiny Matches devices with shiny in the meta data
+ _"snet:" Matches devices with snet: in the meta data
+ Most specific example:
+ d74z1-"snet:"
+ Nerd explanation:
+ All items require their single character prefix except the ip, in which
+ case the - is optional unless the device id or zone is also included.
+ring_builder <builder_file> create <part_power> <replicas> <min_part_hours>
+ Creates <builder_file> with 2^<part_power> partitions and <replicas>.
+ <min_part_hours> is number of hours to restrict moving a partition more
+ than once.
+ring_builder <builder_file> search <search-value>
+ Shows information about matching devices.
+'''.strip() % globals()
+ADD_HELP = '''
+ring_builder <builder_file> add z<zone>-<ip>:<port>/<device_name>_<meta> <wght>
+ Adds a device to the ring with the given information. No partitions will be
+ assigned to the new device until after running 'rebalance'. This is so you
+ can make multiple device changes and rebalance them all just once.
+ring_builder <builder_file> set_weight <search-value> <weight>
+ Resets the device's weight. No partitions will be reassigned to or from the
+ device until after running 'rebalance'. This is so you can make multiple
+ device changes and rebalance them all just once.
+'''.strip() % globals()
+ring_builder <builder_file> set_info <search-value>
+ <ip>:<port>/<device_name>_<meta>
+ Resets the device's information. This information isn't used to assign
+ partitions, so you can use 'write_ring' afterward to rewrite the current
+ ring with the newer device information. Any of the parts are optional
+ in the final <ip>:<port>/<device_name>_<meta> parameter; just give what you
+ want to change. For instance set_info d74 _"snet:" would just
+ update the meta data for device id 74.
+'''.strip() % globals()
+ring_builder <builder_file> remove <search-value>
+ Removes the device(s) from the ring. This should normally just be used for
+ a device that has failed. For a device you wish to decommission, it's best
+ to set its weight to 0, wait for it to drain all its data, then use this
+ remove command. This will not take effect until after running 'rebalance'.
+ This is so you can make multiple device changes and rebalance them all just
+ once.
+'''.strip() % globals()
+ring_builder <builder_file> set_min_part_hours <hours>
+ Changes the <min_part_hours> to the given <hours>. This should be set to
+ however long a full replication/update cycle takes. We're working on a way
+ to determine this more easily than scanning logs.
+if __name__ == '__main__':
+ if len(argv) < 2:
+ print '''
+ring_builder %(MAJOR_VERSION)s.%(MINOR_VERSION)s
+ring_builder <builder_file>
+ Shows information about the ring and the devices within.
+ring_builder <builder_file> rebalance
+ Attempts to rebalance the ring by reassigning partitions that haven't been
+ recently reassigned.
+ring_builder <builder_file> validate
+ Just runs the validation routines on the ring.
+ring_builder <builder_file> write_ring
+ Just rewrites the distributable ring file. This is done automatically after
+ a successful rebalance, so really this is only useful after one or more
+ 'set_info' calls when no rebalance is needed but you want to send out the
+ new device information.
+Quick list: create search add set_weight set_info remove rebalance write_ring
+ set_min_part_hours
+Exit codes: 0 = ring changed, 1 = ring did not change, 2 = error
+'''.strip() % globals()
+ if exists(argv[1]):
+ builder = pickle.load(open(argv[1], 'rb'))
+ for dev in builder.devs:
+ if dev and 'meta' not in dev:
+ dev['meta'] = ''
+ elif len(argv) < 3 or argv[2] != 'create':
+ print 'Ring Builder file does not exist: %s' % argv[1]
+ exit(EXIT_ERROR)
+ elif argv[2] == 'create':
+ if len(argv) < 6:
+ builder = RingBuilder(int(argv[3]), int(argv[4]), int(argv[5]))
+ backup_dir = pathjoin(dirname(argv[1]), 'backups')
+ try:
+ mkdir(backup_dir)
+ except OSError, err:
+ if err.errno != EEXIST:
+ raise
+ pickle.dump(builder, open(pathjoin(backup_dir,
+ '%d.' % time() + basename(argv[1])), 'wb'), protocol=2)
+ pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
+ backup_dir = pathjoin(dirname(argv[1]), 'backups')
+ try:
+ mkdir(backup_dir)
+ except OSError, err:
+ if err.errno != EEXIST:
+ raise
+ ring_file = argv[1]
+ if ring_file.endswith('.builder'):
+ ring_file = ring_file[:-len('.builder')]
+ ring_file += '.ring.gz'
+ if len(argv) == 2:
+ print '%s, build version %d' % (argv[1], builder.version)
+ zones = 0
+ balance = 0
+ if builder.devs:
+ zones = len(set(d['zone'] for d in builder.devs if d is not None))
+ balance = builder.get_balance()
+ print '%d partitions, %d replicas, %d zones, %d devices, %.02f ' \
+ 'balance' % (, builder.replicas, zones,
+ len([d for d in builder.devs if d]), balance)
+ print 'The minimum number of hours before a partition can be ' \
+ 'reassigned is %s' % builder.min_part_hours
+ if builder.devs:
+ print 'Devices: id zone ip address port name ' \
+ 'weight partitions balance meta'
+ weighted_parts = * builder.replicas / \
+ sum(d['weight'] for d in builder.devs if d is not None)
+ for dev in builder.devs:
+ if dev is None:
+ continue
+ if not dev['weight']:
+ if dev['parts']:
+ balance = 999.99
+ else:
+ balance = 0
+ else:
+ balance = 100.0 * dev['parts'] / \
+ (dev['weight'] * weighted_parts) - 100.0
+ print ' %5d %5d %15s %5d %9s %6.02f %10s %7.02f %s' % \
+ (dev['id'], dev['zone'], dev['ip'], dev['port'],
+ dev['device'], dev['weight'], dev['parts'], balance,
+ dev['meta'])
+ if argv[2] == 'search':
+ if len(argv) < 4:
+ devs = search_devs(builder, argv[3])
+ if not devs:
+ print 'No matching devices found'
+ exit(EXIT_ERROR)
+ print 'Devices: id zone ip address port name ' \
+ 'weight partitions balance meta'
+ weighted_parts = * builder.replicas / \
+ sum(d['weight'] for d in builder.devs if d is not None)
+ for dev in devs:
+ if not dev['weight']:
+ if dev['parts']:
+ balance = 999.99
+ else:
+ balance = 0
+ else:
+ balance = 100.0 * dev['parts'] / \
+ (dev['weight'] * weighted_parts) - 100.0
+ print ' %5d %5d %15s %5d %9s %6.02f %10s %7.02f %s' % \
+ (dev['id'], dev['zone'], dev['ip'], dev['port'],
+ dev['device'], dev['weight'], dev['parts'], balance,
+ dev['meta'])
+ elif argv[2] == 'add':
+ # add z<zone>-<ip>:<port>/<device_name>_<meta> <wght>
+ if len(argv) < 5:
+ print ADD_HELP
+ if not argv[3].startswith('z'):
+ print 'Invalid add value: %s' % argv[3]
+ exit(EXIT_ERROR)
+ i = 1
+ while i < len(argv[3]) and argv[3][i].isdigit():
+ i += 1
+ zone = int(argv[3][1:i])
+ rest = argv[3][i:]
+ if not rest.startswith('-'):
+ print 'Invalid add value: %s' % argv[3]
+ exit(EXIT_ERROR)
+ i = 1
+ while i < len(rest) and rest[i] in '0123456789.':
+ i += 1
+ ip = rest[1:i]
+ rest = rest[i:]
+ if not rest.startswith(':'):
+ print 'Invalid add value: %s' % argv[3]
+ exit(EXIT_ERROR)
+ i = 1
+ while i < len(rest) and rest[i].isdigit():
+ i += 1
+ port = int(rest[1:i])
+ rest = rest[i:]
+ if not rest.startswith('/'):
+ print 'Invalid add value: %s' % argv[3]
+ exit(EXIT_ERROR)
+ i = 1
+ while i < len(rest) and rest[i] != '_':
+ i += 1
+ device_name = rest[1:i]
+ rest = rest[i:]
+ meta = ''
+ if rest.startswith('_'):
+ meta = rest[1:]
+ weight = float(argv[4])
+ for dev in builder.devs:
+ if dev is None:
+ continue
+ if dev['ip'] == ip and dev['port'] == port and \
+ dev['device'] == device_name:
+ print 'Device %d already uses %s:%d/%s.' % \
+ (dev['id'], dev['ip'], dev['port'], dev['device'])
+ exit(EXIT_ERROR)
+ next_dev_id = 0
+ if builder.devs:
+ next_dev_id = max(d['id'] for d in builder.devs if d) + 1
+ builder.add_dev({'id': next_dev_id, 'zone': zone, 'ip': ip,
+ 'port': port, 'device': device_name, 'weight': weight,
+ 'meta': meta})
+ print 'Device z%s-%s:%s/%s_"%s" with %s weight got id %s' % \
+ (zone, ip, port, device_name, meta, weight, next_dev_id)
+ pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
+ elif argv[2] == 'set_weight':
+ if len(argv) != 5:
+ devs = search_devs(builder, argv[3])
+ weight = float(argv[4])
+ if not devs:
+ print 'No matching devices found'
+ exit(EXIT_ERROR)
+ if len(devs) > 1:
+ print 'Matched more than one device:'
+ for dev in devs:
+ print ' d%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_' \
+ '"%(meta)s"' % dev
+ if raw_input('Are you sure you want to update the weight for '
+ 'these %s devices? (y/N) ' % len(devs)) != 'y':
+ print 'Aborting device modifications'
+ exit(EXIT_ERROR)
+ for dev in devs:
+ builder.set_dev_weight(dev['id'], weight)
+ print 'd%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_"%(meta)s" ' \
+ 'weight set to %(weight)s' % dev
+ pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
+ elif argv[2] == 'set_info':
+ if len(argv) != 5:
+ devs = search_devs(builder, argv[3])
+ change_value = argv[4]
+ change = []
+ if len(change_value) and change_value[0].isdigit():
+ i = 1
+ while i < len(change_value) and change_value[i] in '0123456789.':
+ i += 1
+ change.append(('ip', change_value[:i]))
+ change_value = change_value[i:]
+ if change_value.startswith(':'):
+ i = 1
+ while i < len(change_value) and change_value[i].isdigit():
+ i += 1
+ change.append(('port', int(change_value[1:i])))
+ change_value = change_value[i:]
+ if change_value.startswith('/'):
+ i = 1
+ while i < len(change_value) and change_value[i] != '_':
+ i += 1
+ change.append(('device', change_value[1:i]))
+ change_value = change_value[i:]
+ if change_value.startswith('_'):
+ change.append(('meta', change_value[1:]))
+ change_value = ''
+ if change_value or not change:
+ raise ValueError('Invalid set info change value: %s' %
+ repr(argv[4]))
+ if not devs:
+ print 'No matching devices found'
+ exit(EXIT_ERROR)
+ if len(devs) > 1:
+ print 'Matched more than one device:'
+ for dev in devs:
+ print ' d%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_' \
+ '"%(meta)s"' % dev
+ if raw_input('Are you sure you want to update the info for '
+ 'these %s devices? (y/N) ' % len(devs)) != 'y':
+ print 'Aborting device modifications'
+ exit(EXIT_ERROR)
+ for dev in devs:
+ orig_dev_string = \
+ 'd%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_"%(meta)s"' % dev
+ test_dev = dict(dev)
+ for key, value in change:
+ test_dev[key] = value
+ for check_dev in builder.devs:
+ if not check_dev or check_dev['id'] == test_dev['id']:
+ continue
+ if check_dev['ip'] == test_dev['ip'] and \
+ check_dev['port'] == test_dev['port'] and \
+ check_dev['device'] == test_dev['device']:
+ print 'Device %d already uses %s:%d/%s.' % \
+ (check_dev['id'], check_dev['ip'], check_dev['port'],
+ check_dev['device'])
+ exit(EXIT_ERROR)
+ for key, value in change:
+ dev[key] = value
+ new_dev_string = \
+ 'd%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_"%(meta)s"' % dev
+ print 'Device %s is now %s' % (orig_dev_string, new_dev_string)
+ pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
+ elif argv[2] == 'remove':
+ if len(argv) < 4:
+ devs = search_devs(builder, argv[3])
+ if not devs:
+ print 'No matching devices found'
+ exit(EXIT_ERROR)
+ if len(devs) > 1:
+ print 'Matched more than one device:'
+ for dev in devs:
+ print ' d%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_' \
+ '"%(meta)s"' % dev
+ if raw_input('Are you sure you want to remove these %s devices? '
+ '(y/N) ' % len(devs)) != 'y':
+ print 'Aborting device removals'
+ exit(EXIT_ERROR)
+ for dev in devs:
+ builder.remove_dev(dev['id'])
+ print 'd%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_"%(meta)s" ' \
+ 'marked for removal and will be removed next rebalance.' % dev
+ pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
+ elif argv[2] == 'rebalance':
+ devs_changed = builder.devs_changed
+ last_balance = builder.get_balance()
+ parts, balance = builder.rebalance()
+ if not parts:
+ print 'No partitions could be reassigned.'
+ print 'Either none need to be or none can be due to ' \
+ 'min_part_hours [%s].' % builder.min_part_hours
+ if not devs_changed and abs(last_balance - balance) < 1:
+ print 'Cowardly refusing to save rebalance as it did not change ' \
+ 'at least 1%.'
+ builder.validate()
+ print 'Reassigned %d (%.02f%%) partitions. Balance is now %.02f.' % \
+ (parts, 100.0 * parts /, balance)
+ if balance > 5:
+ print '-' * 79
+ print 'NOTE: Balance of %.02f indicates you should push this ' % \
+ balance
+ print ' ring, wait at least %d hours, and rebalance/repush.' \
+ % builder.min_part_hours
+ print '-' * 79
+ ts = time()
+ pickle.dump(builder.get_ring(),
+ GzipFile(pathjoin(backup_dir, '%d.' % ts +
+ basename(ring_file)), 'wb'), protocol=2)
+ pickle.dump(builder, open(pathjoin(backup_dir,
+ '%d.' % ts + basename(argv[1])), 'wb'), protocol=2)
+ pickle.dump(builder.get_ring(), GzipFile(ring_file, 'wb'), protocol=2)
+ pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
+ elif argv[2] == 'validate':
+ builder.validate()
+ elif argv[2] == 'write_ring':
+ pickle.dump(builder.get_ring(),
+ GzipFile(pathjoin(backup_dir, '%d.' % time() +
+ basename(ring_file)), 'wb'), protocol=2)
+ pickle.dump(builder.get_ring(), GzipFile(ring_file, 'wb'), protocol=2)
+ elif argv[2] == 'pretend_min_part_hours_passed':
+ builder.pretend_min_part_hours_passed()
+ pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
+ elif argv[2] == 'set_min_part_hours':
+ if len(argv) < 4:
+ builder.change_min_part_hours(int(argv[3]))
+ print 'The minimum number of hours before a partition can be ' \
+ 'reassigned is now set to %s' % argv[3]
+ pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
+ print 'Unknown command: %s' % argv[2]
+ exit(EXIT_ERROR)
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..793c8b6ce
--- /dev/null
+++ b/bin/
@@ -0,0 +1,197 @@
+#!/usr/bin/python -u
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import traceback
+from ConfigParser import ConfigParser
+from optparse import OptionParser
+from sys import exit, argv
+from time import time
+from uuid import uuid4
+from eventlet import GreenPool, patcher, sleep
+from eventlet.pools import Pool
+from swift.common.client import Connection, get_auth
+from swift.common.ring import Ring
+from swift.common.utils import compute_eta, get_time_units
+def put_container(connpool, container, report):
+ global retries_done
+ try:
+ with connpool.item() as conn:
+ conn.put_container(container)
+ retries_done += conn.attempts - 1
+ if report:
+ report(True)
+ except:
+ if report:
+ report(False)
+ raise
+def put_object(connpool, container, obj, report):
+ global retries_done
+ try:
+ with connpool.item() as conn:
+ conn.put_object(container, obj, obj, metadata={'stats': obj})
+ retries_done += conn.attempts - 1
+ if report:
+ report(True)
+ except:
+ if report:
+ report(False)
+ raise
+def report(success):
+ global begun, created, item_type, next_report, need_to_create, retries_done
+ if not success:
+ traceback.print_exc()
+ exit('Gave up due to error(s).')
+ created += 1
+ if time() < next_report:
+ return
+ next_report = time() + 5
+ eta, eta_unit = compute_eta(begun, created, need_to_create)
+ print '\r\x1B[KCreating %s: %d of %d, %d%s left, %d retries' % (item_type,
+ created, need_to_create, round(eta), eta_unit, retries_done),
+if __name__ == '__main__':
+ global begun, created, item_type, next_report, need_to_create, retries_done
+ patcher.monkey_patch()
+ parser = OptionParser()
+ parser.add_option('-d', '--dispersion', action='store_true',
+ dest='dispersion', default=False,
+ help='Run the dispersion population')
+ parser.add_option('-p', '--performance', action='store_true',
+ dest='performance', default=False,
+ help='Run the performance population')
+ args = argv[1:]
+ if not args:
+ args.append('-h')
+ (options, args) = parser.parse_args(args)
+ conf_file = '/etc/swift/stats.conf'
+ if args:
+ conf_file = args[0]
+ c = ConfigParser()
+ if not
+ exit('Unable to read config file: %s' % conf_file)
+ conf = dict(c.items('stats'))
+ swift_dir = conf.get('swift_dir', '/etc/swift')
+ dispersion_coverage = int(conf.get('dispersion_coverage', 1))
+ big_container_count = int(conf.get('big_container_count', 1000000))
+ retries = int(conf.get('retries', 5))
+ concurrency = int(conf.get('concurrency', 50))
+ coropool = GreenPool(size=concurrency)
+ retries_done = 0
+ url, token = get_auth(conf['auth_url'], conf['auth_user'],
+ conf['auth_key'])
+ account = url.rsplit('/', 1)[1]
+ connpool = Pool(max_size=concurrency)
+ connpool.create = lambda: Connection(conf['auth_url'],
+ conf['auth_user'], conf['auth_key'],
+ retries=retries,
+ preauthurl=url, preauthtoken=token)
+ if options.dispersion:
+ container_ring = Ring(os.path.join(swift_dir, 'container.ring.gz'))
+ parts_left = \
+ dict((x, x) for x in xrange(container_ring.partition_count))
+ item_type = 'containers'
+ created = 0
+ retries_done = 0
+ need_to_create = need_to_queue = \
+ dispersion_coverage / 100.0 * container_ring.partition_count
+ begun = next_report = time()
+ next_report += 2
+ while need_to_queue >= 1:
+ container = 'stats_container_dispersion_%s' % uuid4()
+ part, _ = container_ring.get_nodes(account, container)
+ if part in parts_left:
+ coropool.spawn(put_container, connpool, container, report)
+ sleep()
+ del parts_left[part]
+ need_to_queue -= 1
+ coropool.waitall()
+ elapsed, elapsed_unit = get_time_units(time() - begun)
+ print '\r\x1B[KCreated %d containers for dispersion reporting, ' \
+ '%d%s, %d retries' % \
+ (need_to_create, round(elapsed), elapsed_unit, retries_done)
+ container = 'stats_objects'
+ put_container(connpool, container, None)
+ object_ring = Ring(os.path.join(swift_dir, 'object.ring.gz'))
+ parts_left = dict((x, x) for x in xrange(object_ring.partition_count))
+ item_type = 'objects'
+ created = 0
+ retries_done = 0
+ need_to_create = need_to_queue = \
+ dispersion_coverage / 100.0 * object_ring.partition_count
+ begun = next_report = time()
+ next_report += 2
+ while need_to_queue >= 1:
+ obj = 'stats_object_dispersion_%s' % uuid4()
+ part, _ = object_ring.get_nodes(account, container, obj)
+ if part in parts_left:
+ coropool.spawn(put_object, connpool, container, obj, report)
+ sleep()
+ del parts_left[part]
+ need_to_queue -= 1
+ coropool.waitall()
+ elapsed, elapsed_unit = get_time_units(time() - begun)
+ print '\r\x1B[KCreated %d objects for dispersion reporting, ' \
+ '%d%s, %d retries' % \
+ (need_to_create, round(elapsed), elapsed_unit, retries_done)
+ if options.performance:
+ container = 'big_container'
+ put_container(connpool, container, None)
+ item_type = 'objects'
+ created = 0
+ retries_done = 0
+ need_to_create = need_to_queue = big_container_count
+ begun = next_report = time()
+ next_report += 2
+ segments = ['00']
+ for x in xrange(big_container_count):
+ obj = '%s/%02x' % ('/'.join(segments), x)
+ coropool.spawn(put_object, connpool, container, obj, report)
+ sleep()
+ need_to_queue -= 1
+ i = 0
+ while True:
+ nxt = int(segments[i], 16) + 1
+ if nxt < 10005:
+ segments[i] = '%02x' % nxt
+ break
+ else:
+ segments[i] = '00'
+ i += 1
+ if len(segments) <= i:
+ segments.append('00')
+ break
+ coropool.waitall()
+ elapsed, elapsed_unit = get_time_units(time() - begun)
+ print '\r\x1B[KCreated %d objects for performance reporting, ' \
+ '%d%s, %d retries' % \
+ (need_to_create, round(elapsed), elapsed_unit, retries_done)
diff --git a/bin/ b/bin/
new file mode 100755
index 000000000..537a73169
--- /dev/null
+++ b/bin/
@@ -0,0 +1,942 @@
+#!/usr/bin/python -u
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import csv
+import os
+import socket
+from ConfigParser import ConfigParser
+from httplib import HTTPException
+from optparse import OptionParser
+from sys import argv, exit, stderr
+from time import time
+from uuid import uuid4
+from eventlet import GreenPool, hubs, patcher, sleep, Timeout
+from eventlet.pools import Pool
+from swift.common import direct_client
+from swift.common.client import ClientException, Connection, get_auth
+from swift.common.ring import Ring
+from swift.common.utils import compute_eta, get_time_units
+unmounted = []
+def get_error_log(prefix):
+ def error_log(msg_or_exc):
+ global unmounted
+ if hasattr(msg_or_exc, 'http_status') and \
+ msg_or_exc.http_status == 507:
+ identifier = '%s:%s/%s'
+ if identifier not in unmounted:
+ unmounted.append(identifier)
+ print >>stderr, 'ERROR: %s:%s/%s is unmounted -- This will ' \
+ 'cause replicas designated for that device to be ' \
+ 'considered missing until resolved or the ring is ' \
+ 'updated.' % (msg_or_exc.http_host, msg_or_exc.http_port,
+ msg_or_exc.http_device)
+ if not hasattr(msg_or_exc, 'http_status') or \
+ msg_or_exc.http_status not in (404, 507):
+ print >>stderr, 'ERROR: %s: %s' % (prefix, msg_or_exc)
+ return error_log
+def audit(coropool, connpool, account, container_ring, object_ring, options):
+ begun = time()
+ with connpool.item() as conn:
+ estimated_items = [conn.head_account()[0]]
+ items_completed = [0]
+ retries_done = [0]
+ containers_missing_replicas = {}
+ objects_missing_replicas = {}
+ next_report = [time() + 2]
+ def report():
+ if options.verbose and time() >= next_report[0]:
+ next_report[0] = time() + 5
+ eta, eta_unit = \
+ compute_eta(begun, items_completed[0], estimated_items[0])
+ print '\r\x1B[KAuditing items: %d of %d, %d%s left, %d ' \
+ 'retries' % (items_completed[0], estimated_items[0],
+ round(eta), eta_unit, retries_done[0]),
+ def direct_container(container, part, nodes):
+ estimated_objects = 0
+ for node in nodes:
+ found = False
+ error_log = get_error_log('%(ip)s:%(port)s/%(device)s' % node)
+ try:
+ attempts, info = direct_client.retry(
+ direct_client.direct_head_container, node,
+ part, account, container,
+ error_log=error_log,
+ retries=options.retries)
+ retries_done[0] += attempts - 1
+ found = True
+ if not estimated_objects:
+ estimated_objects = info[0]
+ except ClientException, err:
+ if err.http_status not in (404, 507):
+ error_log('Giving up on /%s/%s/%s: %s' % (part, account,
+ container, err))
+ except (Exception, Timeout), err:
+ error_log('Giving up on /%s/%s/%s: %s' % (part, account,
+ container, err))
+ if not found:
+ if container in containers_missing_replicas:
+ containers_missing_replicas[container].append(node)
+ else:
+ containers_missing_replicas[container] = [node]
+ estimated_items[0] += estimated_objects
+ items_completed[0] += 1
+ report()
+ def direct_object(container, obj, part, nodes):
+ for node in nodes:
+ found = False
+ error_log = get_error_log('%(ip)s:%(port)s/%(device)s' % node)
+ try:
+ attempts, _ = direct_client.retry(
+ direct_client.direct_head_object, node, part,
+ account, container, obj, error_log=error_log,
+ retries=options.retries)
+ retries_done[0] += attempts - 1
+ found = True
+ except ClientException, err:
+ if err.http_status not in (404, 507):
+ error_log('Giving up on /%s/%s/%s: %s' % (part, account,
+ container, err))
+ except (Exception, Timeout), err:
+ error_log('Giving up on /%s/%s/%s: %s' % (part, account,
+ container, err))
+ if not found:
+ opath = '/%s/%s' % (container, obj)
+ if opath in objects_missing_replicas:
+ objects_missing_replicas[opath].append(node)
+ else:
+ objects_missing_replicas[opath] = [node]
+ items_completed[0] += 1
+ report()
+ cmarker = ''
+ while True:
+ with connpool.item() as conn:
+ containers = [c['name'] for c in conn.get_account(marker=cmarker)]
+ if not containers:
+ break
+ cmarker = containers[-1]
+ for container in containers:
+ part, nodes = container_ring.get_nodes(account, container)
+ coropool.spawn(direct_container, container, part, nodes)
+ for container in containers:
+ omarker = ''
+ while True:
+ with connpool.item() as conn:
+ objects = [o['name'] for o in
+ conn.get_container(container, marker=omarker)]
+ if not objects:
+ break
+ omarker = objects[-1]
+ for obj in objects:
+ part, nodes = object_ring.get_nodes(account, container, obj)
+ coropool.spawn(direct_object, container, obj, part, nodes)
+ coropool.waitall()
+ print '\r\x1B[K\r',
+ if not containers_missing_replicas and not objects_missing_replicas:
+ print 'No missing items.'
+ return
+ if containers_missing_replicas:
+ print 'Containers Missing'
+ print '-' * 78
+ for container in sorted(containers_missing_replicas.keys()):
+ part, _ = container_ring.get_nodes(account, container)
+ for node in containers_missing_replicas[container]:
+ print 'http://%s:%s/%s/%s/%s/%s' % (node['ip'], node['port'],
+ node['device'], part, account, container)
+ if objects_missing_replicas:
+ if containers_missing_replicas:
+ print
+ print 'Objects Missing'
+ print '-' * 78
+ for opath in sorted(objects_missing_replicas.keys()):
+ _, container, obj = opath.split('/', 2)
+ part, _ = object_ring.get_nodes(account, container, obj)
+ for node in objects_missing_replicas[opath]:
+ print 'http://%s:%s/%s/%s/%s/%s/%s' % (node['ip'],
+ node['port'], node['device'], part, account, container,
+ obj)
+def container_dispersion_report(coropool, connpool, account, container_ring,
+ options):
+ """ Returns (number of containers listed, number of distinct partitions,
+ number of container copies found) """
+ with connpool.item() as conn:
+ containers = [c['name'] for c in
+ conn.get_account(prefix='stats_container_dispersion_',
+ full_listing=True)]
+ containers_listed = len(containers)
+ if not containers_listed:
+ print >>stderr, 'No containers to query. Has stats-populate been run?'
+ return 0
+ retries_done = [0]
+ containers_queried = [0]
+ container_copies_found = [0, 0, 0, 0]
+ begun = time()
+ next_report = [time() + 2]
+ def direct(container, part, nodes):
+ found_count = 0
+ for node in nodes:
+ error_log = get_error_log('%(ip)s:%(port)s/%(device)s' % node)
+ try:
+ attempts, _ = direct_client.retry(
+ direct_client.direct_head_container, node,
+ part, account, container, error_log=error_log,
+ retries=options.retries)
+ retries_done[0] += attempts - 1
+ found_count += 1
+ except ClientException, err:
+ if err.http_status not in (404, 507):
+ error_log('Giving up on /%s/%s/%s: %s' % (part, account,
+ container, err))
+ except (Exception, Timeout), err:
+ error_log('Giving up on /%s/%s/%s: %s' % (part, account,
+ container, err))
+ container_copies_found[found_count] += 1
+ containers_queried[0] += 1
+ if options.verbose and time() >= next_report[0]:
+ next_report[0] = time() + 5
+ eta, eta_unit = compute_eta(begun, containers_queried[0],
+ containers_listed)
+ print '\r\x1B[KQuerying containers: %d of %d, %d%s left, %d ' \
+ 'retries' % (containers_queried[0], containers_listed,
+ round(eta), eta_unit, retries_done[0]),
+ container_parts = {}
+ for container in containers:
+ part, nodes = container_ring.get_nodes(account, container)
+ if part not in container_parts:
+ container_parts[part] = part
+ coropool.spawn(direct, container, part, nodes)
+ coropool.waitall()
+ distinct_partitions = len(container_parts)
+ copies_expected = distinct_partitions * container_ring.replica_count
+ copies_found = sum(a * b for a, b in enumerate(container_copies_found))
+ value = 100.0 * copies_found / copies_expected
+ if options.verbose:
+ elapsed, elapsed_unit = get_time_units(time() - begun)
+ print '\r\x1B[KQueried %d containers for dispersion reporting, ' \
+ '%d%s, %d retries' % (containers_listed, round(elapsed),
+ elapsed_unit, retries_done[0])
+ if containers_listed - distinct_partitions:
+ print 'There were %d overlapping partitions' % (
+ containers_listed - distinct_partitions)
+ if container_copies_found[2]:
+ print 'There were %d partitions missing one copy.' % \
+ container_copies_found[2]
+ if container_copies_found[1]:
+ print '! There were %d partitions missing two copies.' % \
+ container_copies_found[1]
+ if container_copies_found[0]:
+ print '!!! There were %d partitions missing all copies.' % \
+ container_copies_found[0]
+ print '%.02f%% of container copies found (%d of %d)' % (
+ value, copies_found, copies_expected)
+ print 'Sample represents %.02f%% of the container partition space' % (
+ 100.0 * distinct_partitions / container_ring.partition_count)
+ return value
+def object_dispersion_report(coropool, connpool, account, object_ring, options):
+ """ Returns (number of objects listed, number of distinct partitions,
+ number of object copies found) """
+ container = 'stats_objects'
+ with connpool.item() as conn:
+ try:
+ objects = [o['name'] for o in conn.get_container(container,
+ prefix='stats_object_dispersion_', full_listing=True)]
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ print >>stderr, 'No objects to query. Has stats-populate been run?'
+ return 0
+ objects_listed = len(objects)
+ if not objects_listed:
+ print >>stderr, 'No objects to query. Has stats-populate been run?'
+ return 0
+ retries_done = [0]
+ objects_queried = [0]
+ object_copies_found = [0, 0, 0, 0]
+ begun = time()
+ next_report = [time() + 2]
+ def direct(obj, part, nodes):
+ found_count = 0
+ for node in nodes:
+ error_log = get_error_log('%(ip)s:%(port)s/%(device)s' % node)
+ try:
+ attempts, _ = direct_client.retry(
+ direct_client.direct_head_object, node, part,
+ account, container, obj, error_log=error_log,
+ retries=options.retries)
+ retries_done[0] += attempts - 1
+ found_count += 1
+ except ClientException, err:
+ if err.http_status not in (404, 507):
+ error_log('Giving up on /%s/%s/%s/%s: %s' % (part, account,
+ container, obj, err))
+ except (Exception, Timeout), err:
+ error_log('Giving up on /%s/%s/%s/%s: %s' % (part, account,
+ container, obj, err))
+ object_copies_found[found_count] += 1
+ objects_queried[0] += 1
+ if options.verbose and time() >= next_report[0]:
+ next_report[0] = time() + 5
+ eta, eta_unit = compute_eta(begun, objects_queried[0],
+ objects_listed)
+ print '\r\x1B[KQuerying objects: %d of %d, %d%s left, %d ' \
+ 'retries' % (objects_queried[0], objects_listed, round(eta),
+ eta_unit, retries_done[0]),
+ object_parts = {}
+ for obj in objects:
+ part, nodes = object_ring.get_nodes(account, container, obj)
+ if part not in object_parts:
+ object_parts[part] = part
+ coropool.spawn(direct, obj, part, nodes)
+ coropool.waitall()
+ distinct_partitions = len(object_parts)
+ copies_expected = distinct_partitions * object_ring.replica_count
+ copies_found = sum(a * b for a, b in enumerate(object_copies_found))
+ value = 100.0 * copies_found / copies_expected
+ if options.verbose:
+ elapsed, elapsed_unit = get_time_units(time() - begun)
+ print '\r\x1B[KQueried %d objects for dispersion reporting, ' \
+ '%d%s, %d retries' % (objects_listed, round(elapsed),
+ elapsed_unit, retries_done[0])
+ if objects_listed - distinct_partitions:
+ print 'There were %d overlapping partitions' % (
+ objects_listed - distinct_partitions)
+ if object_copies_found[2]:
+ print 'There were %d partitions missing one copy.' % \
+ object_copies_found[2]
+ if object_copies_found[1]:
+ print '! There were %d partitions missing two copies.' % \
+ object_copies_found[1]
+ if object_copies_found[0]:
+ print '!!! There were %d partitions missing all copies.' % \
+ object_copies_found[0]
+ print '%.02f%% of object copies found (%d of %d)' % (
+ value, copies_found, copies_expected)
+ print 'Sample represents %.02f%% of the object partition space' % (
+ 100.0 * distinct_partitions / object_ring.partition_count)
+ return value
+def container_put_report(coropool, connpool, count, options):
+ successes = [0]
+ failures = [0]
+ retries_done = [0]
+ begun = time()
+ next_report = [time() + 2]
+ def put(container):
+ with connpool.item() as conn:
+ try:
+ conn.put_container(container)
+ successes[0] += 1
+ except (Exception, Timeout):
+ failures[0] += 1
+ if options.verbose and time() >= next_report[0]:
+ next_report[0] = time() + 5
+ eta, eta_unit = compute_eta(begun, successes[0] + failures[0],
+ count)
+ print '\r\x1B[KCreating containers: %d of %d, %d%s left, %d ' \
+ 'retries' % (successes[0] + failures[0], count, eta,
+ eta_unit, retries_done[0]),
+ for x in xrange(count):
+ coropool.spawn(put, 'stats_container_put_%02x' % x)
+ coropool.waitall()
+ successes = successes[0]
+ failures = failures[0]
+ value = 100.0 * successes / count
+ if options.verbose:
+ elapsed, elapsed_unit = get_time_units(time() - begun)
+ print '\r\x1B[KCreated %d containers for performance reporting, ' \
+ '%d%s, %d retries' % (count, round(elapsed), elapsed_unit,
+ retries_done[0])
+ print '%d succeeded, %d failed, %.02f%% success rate' % (
+ successes, failures, value)
+ return value
+def container_head_report(coropool, connpool, options):
+ successes = [0]
+ failures = [0]
+ retries_done = [0]
+ begun = time()
+ next_report = [time() + 2]
+ with connpool.item() as conn:
+ containers = [c['name'] for c in
+ conn.get_account(prefix='stats_container_put_',
+ full_listing=True)]
+ count = len(containers)
+ def head(container):
+ with connpool.item() as conn:
+ try:
+ conn.head_container(container)
+ successes[0] += 1
+ except (Exception, Timeout):
+ failures[0] += 1
+ if options.verbose and time() >= next_report[0]:
+ next_report[0] = time() + 5
+ eta, eta_unit = compute_eta(begun, successes[0] + failures[0],
+ count)
+ print '\r\x1B[KHeading containers: %d of %d, %d%s left, %d ' \
+ 'retries' % (successes[0] + failures[0], count, eta,
+ eta_unit, retries_done[0]),
+ for container in containers:
+ coropool.spawn(head, container)
+ coropool.waitall()
+ successes = successes[0]
+ failures = failures[0]
+ value = 100.0 * successes / len(containers)
+ if options.verbose:
+ elapsed, elapsed_unit = get_time_units(time() - begun)
+ print '\r\x1B[KHeaded %d containers for performance reporting, ' \
+ '%d%s, %d retries' % (count, round(elapsed), elapsed_unit,
+ retries_done[0])
+ print '%d succeeded, %d failed, %.02f%% success rate' % (
+ successes, failures, value)
+ return value
+def container_get_report(coropool, connpool, options):
+ successes = [0]
+ failures = [0]
+ retries_done = [0]
+ begun = time()
+ next_report = [time() + 2]
+ with connpool.item() as conn:
+ containers = [c['name'] for c in
+ conn.get_account(prefix='stats_container_put_',
+ full_listing=True)]
+ count = len(containers)
+ def get(container):
+ with connpool.item() as conn:
+ try:
+ conn.get_container(container)
+ successes[0] += 1
+ except (Exception, Timeout):
+ failures[0] += 1
+ if options.verbose and time() >= next_report[0]:
+ next_report[0] = time() + 5
+ eta, eta_unit = compute_eta(begun, successes[0] + failures[0],
+ count)
+ print '\r\x1B[KListing containers: %d of %d, %d%s left, %d ' \
+ 'retries' % (successes[0] + failures[0], count, eta,
+ eta_unit, retries_done[0]),
+ for container in containers:
+ coropool.spawn(get, container)
+ coropool.waitall()
+ successes = successes[0]
+ failures = failures[0]
+ value = 100.0 * successes / len(containers)
+ if options.verbose:
+ elapsed, elapsed_unit = get_time_units(time() - begun)
+ print '\r\x1B[KListing %d containers for performance reporting, ' \
+ '%d%s, %d retries' % (count, round(elapsed), elapsed_unit,
+ retries_done[0])
+ print '%d succeeded, %d failed, %.02f%% success rate' % (
+ successes, failures, value)
+ return value
+def container_standard_listing_report(coropool, connpool, options):
+ begun = time()
+ if options.verbose:
+ print 'Listing big_container',
+ with connpool.item() as conn:
+ try:
+ value = len(conn.get_container('big_container', full_listing=True))
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ print >>stderr, \
+ "big_container doesn't exist. Has stats-populate been run?"
+ return 0
+ if options.verbose:
+ elapsed, elapsed_unit = get_time_units(time() - begun)
+ print '\rGot %d objects (standard listing) in big_container, %d%s' % \
+ (value, elapsed, elapsed_unit)
+ return value
+def container_prefix_listing_report(coropool, connpool, options):
+ begun = time()
+ if options.verbose:
+ print 'Prefix-listing big_container',
+ value = 0
+ with connpool.item() as conn:
+ try:
+ for x in xrange(256):
+ value += len(conn.get_container('big_container',
+ prefix=('%02x' % x), full_listing=True))
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ print >>stderr, \
+ "big_container doesn't exist. Has stats-populate been run?"
+ return 0
+ if options.verbose:
+ elapsed, elapsed_unit = get_time_units(time() - begun)
+ print '\rGot %d objects (prefix listing) in big_container, %d%s' % \
+ (value, elapsed, elapsed_unit)
+ return value
+def container_prefix_delimiter_listing_report(coropool, connpool, options):
+ begun = time()
+ if options.verbose:
+ print 'Prefix-delimiter-listing big_container',
+ value = [0]
+ def list(prefix=None):
+ marker = None
+ while True:
+ try:
+ with connpool.item() as conn:
+ listing = conn.get_container('big_container',
+ marker=marker, prefix=prefix, delimiter='/')
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ print >>stderr, "big_container doesn't exist. " \
+ "Has stats-populate been run?"
+ return 0
+ if not len(listing):
+ break
+ marker = listing[-1].get('name', listing[-1].get('subdir'))
+ value[0] += len(listing)
+ subdirs = []
+ i = 0
+ # Capping the subdirs we'll list per dir to 10
+ while len(subdirs) < 10 and i < len(listing):
+ if 'subdir' in listing[i]:
+ subdirs.append(listing[i]['subdir'])
+ i += 1
+ del listing
+ for subdir in subdirs:
+ coropool.spawn(list, subdir)
+ sleep()
+ coropool.spawn(list)
+ coropool.waitall()
+ value = value[0]
+ if options.verbose:
+ elapsed, elapsed_unit = get_time_units(time() - begun)
+ print '\rGot %d objects/subdirs in big_container, %d%s' % (value,
+ elapsed, elapsed_unit)
+ return value
+def container_delete_report(coropool, connpool, options):
+ successes = [0]
+ failures = [0]
+ retries_done = [0]
+ begun = time()
+ next_report = [time() + 2]
+ with connpool.item() as conn:
+ containers = [c['name'] for c in
+ conn.get_account(prefix='stats_container_put_',
+ full_listing=True)]
+ count = len(containers)
+ def delete(container):
+ with connpool.item() as conn:
+ try:
+ conn.delete_container(container)
+ successes[0] += 1
+ except (Exception, Timeout):
+ failures[0] += 1
+ if options.verbose and time() >= next_report[0]:
+ next_report[0] = time() + 5
+ eta, eta_unit = compute_eta(begun, successes[0] + failures[0],
+ count)
+ print '\r\x1B[KDeleting containers: %d of %d, %d%s left, %d ' \
+ 'retries' % (successes[0] + failures[0], count, eta,
+ eta_unit, retries_done[0]),
+ for container in containers:
+ coropool.spawn(delete, container)
+ coropool.waitall()
+ successes = successes[0]
+ failures = failures[0]
+ value = 100.0 * successes / len(containers)
+ if options.verbose:
+ elapsed, elapsed_unit = get_time_units(time() - begun)
+ print '\r\x1B[KDeleting %d containers for performance reporting, ' \
+ '%d%s, %d retries' % (count, round(elapsed), elapsed_unit,
+ retries_done[0])
+ print '%d succeeded, %d failed, %.02f%% success rate' % (
+ successes, failures, value)
+ return value
+def object_put_report(coropool, connpool, count, options):
+ successes = [0]
+ failures = [0]
+ retries_done = [0]
+ begun = time()
+ next_report = [time() + 2]
+ def put(obj):
+ with connpool.item() as conn:
+ try:
+ conn.put_object('stats_object_put', obj, '')
+ successes[0] += 1
+ except (Exception, Timeout):
+ failures[0] += 1
+ if options.verbose and time() >= next_report[0]:
+ next_report[0] = time() + 5
+ eta, eta_unit = compute_eta(begun, successes[0] + failures[0],
+ count)
+ print '\r\x1B[KCreating objects: %d of %d, %d%s left, %d ' \
+ 'retries' % (successes[0] + failures[0], count, eta,
+ eta_unit, retries_done[0]),
+ with connpool.item() as conn:
+ conn.put_container('stats_object_put')
+ for x in xrange(count):
+ coropool.spawn(put, 'stats_object_put_%02x' % x)
+ coropool.waitall()
+ successes = successes[0]
+ failures = failures[0]
+ value = 100.0 * successes / count
+ if options.verbose:
+ elapsed, elapsed_unit = get_time_units(time() - begun)
+ print '\r\x1B[KCreated %d objects for performance reporting, ' \
+ '%d%s, %d retries' % (count, round(elapsed), elapsed_unit,
+ retries_done[0])
+ print '%d succeeded, %d failed, %.02f%% success rate' % (
+ successes, failures, value)
+ return value
+def object_head_report(coropool, connpool, options):
+ successes = [0]
+ failures = [0]
+ retries_done = [0]
+ begun = time()
+ next_report = [time() + 2]
+ with connpool.item() as conn:
+ objects = [o['name'] for o in conn.get_container('stats_object_put',
+ prefix='stats_object_put_', full_listing=True)]
+ count = len(objects)
+ def head(obj):
+ with connpool.item() as conn:
+ try:
+ conn.head_object('stats_object_put', obj)
+ successes[0] += 1
+ except (Exception, Timeout):
+ failures[0] += 1
+ if options.verbose and time() >= next_report[0]:
+ next_report[0] = time() + 5
+ eta, eta_unit = compute_eta(begun, successes[0] + failures[0],
+ count)
+ print '\r\x1B[KHeading objects: %d of %d, %d%s left, %d ' \
+ 'retries' % (successes[0] + failures[0], count, eta,
+ eta_unit, retries_done[0]),
+ for obj in objects:
+ coropool.spawn(head, obj)
+ coropool.waitall()
+ successes = successes[0]
+ failures = failures[0]
+ value = 100.0 * successes / len(objects)
+ if options.verbose:
+ elapsed, elapsed_unit = get_time_units(time() - begun)
+ print '\r\x1B[KHeaded %d objects for performance reporting, ' \
+ '%d%s, %d retries' % (count, round(elapsed), elapsed_unit,
+ retries_done[0])
+ print '%d succeeded, %d failed, %.02f%% success rate' % (
+ successes, failures, value)
+ return value
+def object_get_report(coropool, connpool, options):
+ successes = [0]
+ failures = [0]
+ retries_done = [0]
+ begun = time()
+ next_report = [time() + 2]
+ with connpool.item() as conn:
+ objects = [o['name'] for o in conn.get_container('stats_object_put',
+ prefix='stats_object_put_', full_listing=True)]
+ count = len(objects)
+ def get(obj):
+ with connpool.item() as conn:
+ try:
+ conn.get_object('stats_object_put', obj)
+ successes[0] += 1
+ except (Exception, Timeout):
+ failures[0] += 1
+ if options.verbose and time() >= next_report[0]:
+ next_report[0] = time() + 5
+ eta, eta_unit = compute_eta(begun, successes[0] + failures[0],
+ count)
+ print '\r\x1B[KRetrieving objects: %d of %d, %d%s left, %d ' \
+ 'retries' % (successes[0] + failures[0], count, eta,
+ eta_unit, retries_done[0]),
+ for obj in objects:
+ coropool.spawn(get, obj)
+ coropool.waitall()
+ successes = successes[0]
+ failures = failures[0]
+ value = 100.0 * successes / len(objects)
+ if options.verbose:
+ elapsed, elapsed_unit = get_time_units(time() - begun)
+ print '\r\x1B[KRetrieved %d objects for performance reporting, ' \
+ '%d%s, %d retries' % (count, round(elapsed), elapsed_unit,
+ retries_done[0])
+ print '%d succeeded, %d failed, %.02f%% success rate' % (
+ successes, failures, value)
+ return value
+def object_delete_report(coropool, connpool, options):
+ successes = [0]
+ failures = [0]
+ retries_done = [0]
+ begun = time()
+ next_report = [time() + 2]
+ with connpool.item() as conn:
+ objects = [o['name'] for o in conn.get_container('stats_object_put',
+ prefix='stats_object_put_', full_listing=True)]
+ count = len(objects)
+ def delete(obj):
+ with connpool.item() as conn:
+ try:
+ conn.delete_object('stats_object_put', obj)
+ successes[0] += 1
+ except (Exception, Timeout):
+ failures[0] += 1
+ if options.verbose and time() >= next_report[0]:
+ next_report[0] = time() + 5
+ eta, eta_unit = compute_eta(begun, successes[0] + failures[0],
+ count)
+ print '\r\x1B[KDeleting objects: %d of %d, %d%s left, %d ' \
+ 'retries' % (successes[0] + failures[0], count, eta,
+ eta_unit, retries_done[0]),
+ for obj in objects:
+ coropool.spawn(delete, obj)
+ coropool.waitall()
+ successes = successes[0]
+ failures = failures[0]
+ value = 100.0 * successes / len(objects)
+ if options.verbose:
+ elapsed, elapsed_unit = get_time_units(time() - begun)
+ print '\r\x1B[KDeleted %d objects for performance reporting, ' \
+ '%d%s, %d retries' % (count, round(elapsed), elapsed_unit,
+ retries_done[0])
+ print '%d succeeded, %d failed, %.02f%% success rate' % (
+ successes, failures, value)
+ return value
+if __name__ == '__main__':
+ patcher.monkey_patch()
+ hubs.get_hub().debug_exceptions = False
+ parser = OptionParser(usage='''
+Usage: %prog [options] [conf_file]
+[conf_file] defaults to /etc/swift/stats.conf'''.strip())
+ parser.add_option('-a', '--audit', action='store_true',
+ dest='audit', default=False,
+ help='Run the audit checks')
+ parser.add_option('-d', '--dispersion', action='store_true',
+ dest='dispersion', default=False,
+ help='Run the dispersion reports')
+ parser.add_option('-o', '--output', dest='csv_output',
+ default=None,
+ help='Override where the CSV report is written '
+ '(default from conf file); the keyword None will '
+ 'suppress the CSV report')
+ parser.add_option('-p', '--performance', action='store_true',
+ dest='performance', default=False,
+ help='Run the performance reports')
+ parser.add_option('-q', '--quiet', action='store_false', dest='verbose',
+ default=True, help='Suppress status output')
+ parser.add_option('-r', '--retries', dest='retries',
+ default=None,
+ help='Override retry attempts (default from conf file)')
+ args = argv[1:]
+ if not args:
+ args.append('-h')
+ (options, args) = parser.parse_args(args)
+ conf_file = '/etc/swift/stats.conf'
+ if args:
+ conf_file = args.pop(0)
+ c = ConfigParser()
+ if not
+ exit('Unable to read config file: %s' % conf_file)
+ conf = dict(c.items('stats'))
+ swift_dir = conf.get('swift_dir', '/etc/swift')
+ dispersion_coverage = int(conf.get('dispersion_coverage', 1))
+ container_put_count = int(conf.get('container_put_count', 1000))
+ object_put_count = int(conf.get('object_put_count', 1000))
+ concurrency = int(conf.get('concurrency', 50))
+ if options.retries:
+ options.retries = int(options.retries)
+ else:
+ options.retries = int(conf.get('retries', 5))
+ if not options.csv_output:
+ csv_output = conf.get('csv_output', '/etc/swift/stats.csv')
+ coropool = GreenPool(size=concurrency)
+ url, token = get_auth(conf['auth_url'], conf['auth_user'],
+ conf['auth_key'])
+ account = url.rsplit('/', 1)[1]
+ connpool = Pool(max_size=concurrency)
+ connpool.create = lambda: Connection(conf['auth_url'],
+ conf['auth_user'], conf['auth_key'],
+ retries=options.retries, preauthurl=url,
+ preauthtoken=token)
+ report = [time(), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0]
+ xrange(len(report))
+ container_ring = Ring(os.path.join(swift_dir, 'container.ring.gz'))
+ object_ring = Ring(os.path.join(swift_dir, 'object.ring.gz'))
+ if options.audit:
+ audit(coropool, connpool, account, container_ring, object_ring, options)
+ if options.verbose and (options.dispersion or options.performance):
+ print
+ if options.dispersion:
+ begin = time()
+ report[R_CDR_VALUE] = container_dispersion_report(coropool, connpool,
+ account, container_ring, options)
+ report[R_CDR_TIME] = time() - begin
+ if options.verbose:
+ print
+ begin = time()
+ report[R_ODR_VALUE] = object_dispersion_report(coropool, connpool,
+ account, object_ring, options)
+ report[R_ODR_TIME] = time() - begin
+ if options.verbose and options.performance:
+ print
+ if options.performance:
+ begin = time()
+ report[R_CPUT_RATE] = container_put_report(coropool, connpool,
+ container_put_count, options)
+ report[R_CPUT_TIME] = time() - begin
+ if options.verbose:
+ print
+ begin = time()
+ report[R_CHEAD_RATE] = \
+ container_head_report(coropool, connpool, options)
+ report[R_CHEAD_TIME] = time() - begin
+ if options.verbose:
+ print
+ begin = time()
+ report[R_CGET_RATE] = container_get_report(coropool, connpool, options)
+ report[R_CGET_TIME] = time() - begin
+ if options.verbose:
+ print
+ begin = time()
+ report[R_CDELETE_RATE] = \
+ container_delete_report(coropool, connpool, options)
+ report[R_CDELETE_TIME] = time() - begin
+ if options.verbose:
+ print
+ begin = time()
+ container_standard_listing_report(coropool, connpool, options)
+ report[R_CLSTANDARD_TIME] = time() - begin
+ if options.verbose:
+ print
+ begin = time()
+ container_prefix_listing_report(coropool, connpool, options)
+ report[R_CLPREFIX_TIME] = time() - begin
+ if options.verbose:
+ print
+ begin = time()
+ container_prefix_delimiter_listing_report(coropool, connpool, options)
+ report[R_CLPREDELIM_TIME] = time() - begin
+ if options.verbose:
+ print
+ begin = time()
+ report[R_OPUT_RATE] = \
+ object_put_report(coropool, connpool, object_put_count, options)
+ report[R_OPUT_TIME] = time() - begin
+ if options.verbose:
+ print
+ begin = time()
+ report[R_OHEAD_RATE] = object_head_report(coropool, connpool, options)
+ report[R_OHEAD_TIME] = time() - begin
+ if options.verbose:
+ print
+ begin = time()
+ report[R_OGET_RATE] = object_get_report(coropool, connpool, options)
+ report[R_OGET_TIME] = time() - begin
+ if options.verbose:
+ print
+ begin = time()
+ report[R_ODELETE_RATE] = \
+ object_delete_report(coropool, connpool, options)
+ report[R_ODELETE_TIME] = time() - begin
+ if options.csv_output != 'None':
+ try:
+ if not os.path.exists(csv_output):
+ f = open(csv_output, 'wb')
+ f.write('Timestamp,'
+ 'Container Dispersion Report Time,'
+ 'Container Dispersion Report Value,'
+ 'Object Dispersion Report Time,'
+ 'Object Dispersion Report Value,'
+ 'Container PUT Report Time,'
+ 'Container PUT Report Success Rate,'
+ 'Container HEAD Report Time,'
+ 'Container HEAD Report Success Rate,'
+ 'Container GET Report Time,'
+ 'Container GET Report Success Rate'
+ 'Container DELETE Report Time,'
+ 'Container DELETE Report Success Rate,'
+ 'Container Standard Listing Time,'
+ 'Container Prefix Listing Time,'
+ 'Container Prefix Delimiter Listing Time,'
+ 'Object PUT Report Time,'
+ 'Object PUT Report Success Rate,'
+ 'Object HEAD Report Time,'
+ 'Object HEAD Report Success Rate,'
+ 'Object GET Report Time,'
+ 'Object GET Report Success Rate'
+ 'Object DELETE Report Time,'
+ 'Object DELETE Report Success Rate\r\n')
+ csv = csv.writer(f)
+ else:
+ csv = csv.writer(open(csv_output, 'ab'))
+ csv.writerow(report)
+ except Exception, err:
+ print >>stderr, 'Could not write CSV report:', err
diff --git a/ b/
new file mode 100644
index 000000000..14d911b6c
--- /dev/null
+++ b/
@@ -0,0 +1,48 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from distutils.core import setup
+ name='swift',
+ version='1.0.0-1',
+ description='Swift',
+ license='Apache License (2.0)',
+ author='OpenStack, LLC.',
+ url='',
+ packages=['swift', 'swift.common'],
+ classifiers=[
+ 'Development Status :: 4 - Beta',
+ 'License :: OSI Approved :: Apache Software License',
+ 'Operating System :: POSIX :: Linux',
+ 'Programming Language :: Python :: 2.6',
+ 'Environment :: No Input/Output (Daemon)',
+ ],
+ scripts=['bin/', 'bin/',
+ 'bin/', 'bin/',
+ 'bin/', 'bin/',
+ 'bin/',
+ 'bin/', 'bin/',
+ 'bin/',
+ 'bin/',
+ 'bin/', 'bin/',
+ 'bin/', 'bin/',
+ 'bin/', 'bin/',
+ 'bin/', 'bin/',
+ 'bin/', 'bin/',
+ 'bin/', 'bin/',
+ 'bin/']
diff --git a/swift/ b/swift/
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/swift/
diff --git a/swift/common/ b/swift/common/
new file mode 100644
index 000000000..fe1925beb
--- /dev/null
+++ b/swift/common/
@@ -0,0 +1,6 @@
+""" Code common to all of Swift. """
+FILE_SIZE_LIMIT = 5368709122
diff --git a/swift/common/ b/swift/common/
new file mode 100644
index 000000000..1e7be05da
--- /dev/null
+++ b/swift/common/
@@ -0,0 +1,98 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ConfigParser import ConfigParser, NoOptionError
+import os
+import time
+from webob.request import Request
+from webob.exc import HTTPUnauthorized, HTTPPreconditionFailed
+from eventlet.timeout import Timeout
+from swift.common.utils import split_path
+from swift.common.bufferedhttp import http_connect_raw as http_connect
+class DevAuthMiddleware(object):
+ """
+ Auth Middleware that uses the dev auth server
+ """
+ def __init__(self, app, conf, memcache_client, logger):
+ = app
+ self.memcache_client = memcache_client
+ self.logger = logger
+ self.conf = conf
+ self.auth_host = conf.get('bind_ip', '')
+ self.auth_port = int(conf.get('bind_port', 11000))
+ self.timeout = int(conf.get('node_timeout', 10))
+ def __call__(self, env, start_response):
+ req = Request(env)
+ if req.path != '/healthcheck':
+ if 'x-storage-token' in req.headers and \
+ 'x-auth-token' not in req.headers:
+ req.headers['x-auth-token'] = req.headers['x-storage-token']
+ version, account, container, obj = split_path(req.path, 1, 4, True)
+ if account is None:
+ return HTTPPreconditionFailed(request=req, body='Bad URL')(
+ env, start_response)
+ if not req.headers.get('x-auth-token'):
+ return HTTPPreconditionFailed(request=req,
+ body='Missing Auth Token')(env, start_response)
+ if account is None:
+ return HTTPPreconditionFailed(
+ request=req, body='Bad URL')(env, start_response)
+ if not self.auth(account, req.headers['x-auth-token']):
+ return HTTPUnauthorized(request=req)(env, start_response)
+ # If we get here, then things should be good.
+ return, start_response)
+ def auth(self, account, token):
+ """
+ Dev authorization implmentation
+ :param account: account name
+ :param token: auth token
+ :returns: True if authorization is successful, False otherwise
+ """
+ key = 'auth/%s/%s' % (account, token)
+ now = time.time()
+ cached_auth_data = self.memcache_client.get(key)
+ if cached_auth_data:
+ start, expiration = cached_auth_data
+ if now - start <= expiration:
+ return True
+ try:
+ with Timeout(self.timeout):
+ conn = http_connect(self.auth_host, self.auth_port, 'GET',
+ '/token/%s/%s' % (account, token))
+ resp = conn.getresponse()
+ conn.close()
+ if resp.status == 204:
+ validated = float(resp.getheader('x-auth-ttl'))
+ else:
+ validated = False
+ except:
+ self.logger.exception('ERROR with auth')
+ return False
+ if not validated:
+ return False
+ else:
+ val = (now, validated)
+ self.memcache_client.set(key, val, timeout=validated)
+ return True
diff --git a/swift/common/ b/swift/common/
new file mode 100644
index 000000000..ed5f0cec2
--- /dev/null
+++ b/swift/common/
@@ -0,0 +1,158 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+Monkey Patch httplib.HTTPResponse to buffer reads of headers. This can improve
+performance when making large numbers of small HTTP requests. This module
+also provides helper functions to make HTTP connections using
+.. warning::
+ If you use this, be sure that the libraries you are using do not access
+ the socket directly (xmlrpclib, I'm looking at you :/), and instead
+ make all calls through httplib.
+from urllib import quote
+import logging
+import time
+from import HTTPConnection, HTTPResponse, _UNKNOWN, \
+class BufferedHTTPResponse(HTTPResponse):
+ """HTTPResponse class that buffers reading of headers"""
+ def __init__(self, sock, debuglevel=0, strict=0,
+ method=None): # pragma: no cover
+ self.sock = sock
+ self.fp = sock.makefile('rb')
+ self.debuglevel = debuglevel
+ self.strict = strict
+ self._method = method
+ self.msg = None
+ # from the Status-Line of the response
+ self.version = _UNKNOWN # HTTP-Version
+ self.status = _UNKNOWN # Status-Code
+ self.reason = _UNKNOWN # Reason-Phrase
+ self.chunked = _UNKNOWN # is "chunked" being used?
+ self.chunk_left = _UNKNOWN # bytes left to read in current chunk
+ self.length = _UNKNOWN # number of bytes left in response
+ self.will_close = _UNKNOWN # conn will close at end of response
+ def expect_response(self):
+ self.fp = self.sock.makefile('rb', 0)
+ version, status, reason = self._read_status()
+ if status != CONTINUE:
+ self._read_status = lambda: (version, status, reason)
+ self.begin()
+ else:
+ self.status = status
+ self.reason = reason.strip()
+ self.version = 11
+ self.msg = HTTPMessage(self.fp, 0)
+ self.msg.fp = None
+class BufferedHTTPConnection(HTTPConnection):
+ """HTTPConnection class that uses BufferedHTTPResponse"""
+ response_class = BufferedHTTPResponse
+ def connect(self):
+ self._connected_time = time.time()
+ return HTTPConnection.connect(self)
+ def putrequest(self, method, url, skip_host=0, skip_accept_encoding=0):
+ self._method = method
+ self._path = url
+ self._txn_id = '-'
+ return HTTPConnection.putrequest(self, method, url, skip_host,
+ skip_accept_encoding)
+ def putheader(self, header, value):
+ if header.lower() == 'x-cf-trans-id':
+ self._txn_id = value
+ return HTTPConnection.putheader(self, header, value)
+ def getexpect(self):
+ response = BufferedHTTPResponse(self.sock, strict=self.strict,
+ method=self._method)
+ response.expect_response()
+ return response
+ def getresponse(self):
+ response = HTTPConnection.getresponse(self)
+ logging.debug("HTTP PERF: %.5f seconds to %s %s:%s %s (%s)" %
+ (time.time() - self._connected_time, self._method,,
+ self.port, self._path, self._txn_id))
+ return response
+def http_connect(ipaddr, port, device, partition, method, path,
+ headers=None, query_string=None):
+ """
+ Helper function to create a HTTPConnection object that is buffered
+ for backend Swift services.
+ :param ipaddr: IPv4 address to connect to
+ :param port: port to connect to
+ :param device: device of the node to query
+ :param partition: partition on the device
+ :param method: HTTP method to request ('GET', 'PUT', 'POST', etc.)
+ :param path: request path
+ :param headers: dictionary of headers
+ :param query_string: request query string
+ :returns: HTTPConnection object
+ """
+ conn = BufferedHTTPConnection('%s:%s' % (ipaddr, port))
+ path = quote('/' + device + '/' + str(partition) + path)
+ if query_string:
+ path += '?' + query_string
+ conn.path = path
+ conn.putrequest(method, path)
+ if headers:
+ for header, value in headers.iteritems():
+ conn.putheader(header, value)
+ conn.endheaders()
+ return conn
+def http_connect_raw(ipaddr, port, method, path, headers=None,
+ query_string=None):
+ """
+ Helper function to create a HTTPConnection object that is buffered.
+ :param ipaddr: IPv4 address to connect to
+ :param port: port to connect to
+ :param method: HTTP method to request ('GET', 'PUT', 'POST', etc.)
+ :param path: request path
+ :param headers: dictionary of headers
+ :param query_string: request query string
+ :returns: HTTPConnection object
+ """
+ conn = BufferedHTTPConnection('%s:%s' % (ipaddr, port))
+ if query_string:
+ path += '?' + query_string
+ conn.path = path
+ conn.putrequest(method, path)
+ if headers:
+ for header, value in headers.iteritems():
+ conn.putheader(header, value)
+ conn.endheaders()
+ return conn
diff --git a/swift/common/ b/swift/common/
new file mode 100644
index 000000000..3f3b9d8b4
--- /dev/null
+++ b/swift/common/
@@ -0,0 +1,718 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+Cloud Files client library used internally
+import socket
+from cStringIO import StringIO
+from httplib import HTTPConnection, HTTPException, HTTPSConnection
+from re import compile, DOTALL
+from tokenize import generate_tokens, STRING, NAME, OP
+from urllib import quote as _quote, unquote
+from urlparse import urlparse, urlunparse
+ from eventlet import sleep
+ from time import sleep
+def quote(value, safe='/'):
+ """
+ Patched version of urllib.quote that encodes utf8 strings before quoting
+ """
+ if isinstance(value, unicode):
+ value = value.encode('utf8')
+ return _quote(value, safe)
+# look for a real json parser first
+ # simplejson is popular and pretty good
+ from simplejson import loads as json_loads
+except ImportError:
+ try:
+ # 2.6 will have a json module in the stdlib
+ from json import loads as json_loads
+ except ImportError:
+ # fall back on local parser otherwise
+ comments = compile(r'/\*.*\*/|//[^\r\n]*', DOTALL)
+ def json_loads(string):
+ '''
+ Fairly competent json parser exploiting the python tokenizer and
+ eval(). -- From python-cloudfiles
+ _loads(serialized_json) -> object
+ '''
+ try:
+ res = []
+ consts = {'true': True, 'false': False, 'null': None}
+ string = '(' + comments.sub('', string) + ')'
+ for type, val, _, _, _ in \
+ generate_tokens(StringIO(string).readline):
+ if (type == OP and val not in '[]{}:,()-') or \
+ (type == NAME and val not in consts):
+ raise AttributeError()
+ elif type == STRING:
+ res.append('u')
+ res.append(val.replace('\\/', '/'))
+ else:
+ res.append(val)
+ return eval(''.join(res), {}, consts)
+ except:
+ raise AttributeError()
+class ClientException(Exception):
+ def __init__(self, msg, http_scheme='', http_host='', http_port='',
+ http_path='', http_query='', http_status=0, http_reason='',
+ http_device=''):
+ Exception.__init__(self, msg)
+ self.msg = msg
+ self.http_scheme = http_scheme
+ self.http_host = http_host
+ self.http_port = http_port
+ self.http_path = http_path
+ self.http_query = http_query
+ self.http_status = http_status
+ self.http_reason = http_reason
+ self.http_device = http_device
+ def __str__(self):
+ a = self.msg
+ b = ''
+ if self.http_scheme:
+ b += '%s://' % self.http_scheme
+ if self.http_host:
+ b += self.http_host
+ if self.http_port:
+ b += ':%s' % self.http_port
+ if self.http_path:
+ b += self.http_path
+ if self.http_query:
+ b += '?%s' % self.http_query
+ if self.http_status:
+ if b:
+ b = '%s %s' % (b, self.http_status)
+ else:
+ b = str(self.http_status)
+ if self.http_reason:
+ if b:
+ b = '%s %s' % (b, self.http_reason)
+ else:
+ b = '- %s' % self.http_reason
+ if self.http_device:
+ if b:
+ b = '%s: device %s' % (b, self.http_device)
+ else:
+ b = 'device %s' % self.http_device
+ return b and '%s: %s' % (a, b) or a
+def http_connection(url):
+ """
+ Make an HTTPConnection or HTTPSConnection
+ :param url: url to connect to
+ :returns: tuple of (parsed url, connection object)
+ :raises ClientException: Unable to handle protocol scheme
+ """
+ parsed = urlparse(url)
+ if parsed.scheme == 'http':
+ conn = HTTPConnection(parsed.netloc)
+ elif parsed.scheme == 'https':
+ conn = HTTPSConnection(parsed.netloc)
+ else:
+ raise ClientException('Cannot handle protocol scheme %s for url %s' %
+ (parsed.scheme, repr(url)))
+ return parsed, conn
+def get_auth(url, user, key, snet=False):
+ """
+ Get authentication credentials
+ :param url: authentication URL
+ :param user: user to auth as
+ :param key: key or passowrd for auth
+ :param snet: use SERVICENET internal network default is False
+ :returns: tuple of (storage URL, storage token, auth token)
+ :raises ClientException: HTTP GET request to auth URL failed
+ """
+ parsed, conn = http_connection(url)
+ conn.request('GET', parsed.path, '',
+ {'X-Auth-User': user, 'X-Auth-Key': key})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Auth GET failed', http_scheme=parsed.scheme,
+, http_port=conn.port,
+ http_path=parsed.path, http_status=resp.status,
+ http_reason=resp.reason)
+ url = resp.getheader('x-storage-url')
+ if snet:
+ parsed = list(urlparse(url))
+ # Second item in the list is the netloc
+ parsed[1] = 'snet-' + parsed[1]
+ url = urlunparse(parsed)
+ return url, resp.getheader('x-storage-token',
+ resp.getheader('x-auth-token'))
+def get_account(url, token, marker=None, limit=None, prefix=None,
+ http_conn=None, full_listing=False):
+ """
+ Get a listing of containers for the account.
+ :param url: storage URL
+ :param token: auth token
+ :param marker: marker query
+ :param limit: limit query
+ :param prefix: prefix query
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :param full_listing: if True, return a full listing, else returns a max
+ of 10000 listings
+ :returns: a list of accounts
+ :raises ClientException: HTTP GET request failed
+ """
+ if not http_conn:
+ http_conn = http_connection(url)
+ if full_listing:
+ rv = []
+ listing = get_account(url, token, marker, limit, prefix, http_conn)
+ while listing:
+ rv.extend(listing)
+ marker = listing[-1]['name']
+ listing = get_account(url, token, marker, limit, prefix, http_conn)
+ return rv
+ parsed, conn = http_conn
+ qs = 'format=json'
+ if marker:
+ qs += '&marker=%s' % quote(marker)
+ if limit:
+ qs += '&limit=%d' % limit
+ if prefix:
+ qs += '&prefix=%s' % quote(prefix)
+ conn.request('GET', '%s?%s' % (parsed.path, qs), '',
+ {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Account GET failed', http_scheme=parsed.scheme,
+, http_port=conn.port,
+ http_path=parsed.path, http_query=qs, http_status=resp.status,
+ http_reason=resp.reason)
+ if resp.status == 204:
+ return []
+ return json_loads(
+def head_account(url, token, http_conn=None):
+ """
+ Get account stats.
+ :param url: storage URL
+ :param token: auth token
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :returns: a tuple of (container count, object count, bytes used)
+ :raises ClientException: HTTP HEAD request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Account HEAD failed', http_scheme=parsed.scheme,
+, http_port=conn.port,
+ http_path=parsed.path, http_status=resp.status,
+ http_reason=resp.reason)
+ return int(resp.getheader('x-account-container-count', 0)), \
+ int(resp.getheader('x-account-object-count', 0)), \
+ int(resp.getheader('x-account-bytes-used', 0))
+def get_container(url, token, container, marker=None, limit=None,
+ prefix=None, delimiter=None, http_conn=None,
+ full_listing=False):
+ """
+ Get a listing of objects for the container.
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to get a listing for
+ :param marker: marker query
+ :param limit: limit query
+ :param prefix: prefix query
+ :param delimeter: string to delimit the queries on
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :param full_listing: if True, return a full listing, else returns a max
+ of 10000 listings
+ :returns: a list of objects
+ :raises ClientException: HTTP GET request failed
+ """
+ if not http_conn:
+ http_conn = http_connection(url)
+ if full_listing:
+ rv = []
+ listing = get_container(url, token, container, marker, limit, prefix,
+ delimiter, http_conn)
+ while listing:
+ rv.extend(listing)
+ if not delimiter:
+ marker = listing[-1]['name']
+ else:
+ marker = listing[-1].get('name', listing[-1].get('subdir'))
+ listing = get_container(url, token, container, marker, limit,
+ prefix, delimiter, http_conn)
+ return rv
+ parsed, conn = http_conn
+ path = '%s/%s' % (parsed.path, quote(container))
+ qs = 'format=json'
+ if marker:
+ qs += '&marker=%s' % quote(marker)
+ if limit:
+ qs += '&limit=%d' % limit
+ if prefix:
+ qs += '&prefix=%s' % quote(prefix)
+ if delimiter:
+ qs += '&delimiter=%s' % quote(delimiter)
+ conn.request('GET', '%s?%s' % (path, qs), '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Container GET failed',
+ http_scheme=parsed.scheme,,
+ http_port=conn.port, http_path=path, http_query=qs,
+ http_status=resp.status, http_reason=resp.reason)
+ if resp.status == 204:
+ return []
+ return json_loads(
+def head_container(url, token, container, http_conn=None):
+ """
+ Get container stats.
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to get stats for
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :returns: a tuple of (object count, bytes used)
+ :raises ClientException: HTTP HEAD request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s' % (parsed.path, quote(container))
+ conn.request('HEAD', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Container HEAD failed',
+ http_scheme=parsed.scheme,,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason)
+ return int(resp.getheader('x-container-object-count', 0)), \
+ int(resp.getheader('x-container-bytes-used', 0))
+def put_container(url, token, container, http_conn=None):
+ """
+ Create a container
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to create
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :raises ClientException: HTTP PUT request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s' % (parsed.path, quote(container))
+ conn.request('PUT', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Container PUT failed',
+ http_scheme=parsed.scheme,,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason)
+def delete_container(url, token, container, http_conn=None):
+ """
+ Delete a container
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to delete
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :raises ClientException: HTTP DELETE request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s' % (parsed.path, quote(container))
+ conn.request('DELETE', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Container DELETE failed',
+ http_scheme=parsed.scheme,,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason)
+def get_object(url, token, container, name, http_conn=None,
+ resp_chunk_size=None):
+ """
+ Get an object
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name that the object is in
+ :param name: object name to get
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :param resp_chunk_size: if defined, chunk size of data to read
+ :returns: a list of objects
+ :raises ClientException: HTTP GET request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ conn.request('GET', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object GET failed', http_scheme=parsed.scheme,
+, http_port=conn.port, http_path=path,
+ http_status=resp.status, http_reason=resp.reason)
+ metadata = {}
+ for key, value in resp.getheaders():
+ if key.lower().startswith('x-object-meta-'):
+ metadata[unquote(key[len('x-object-meta-'):])] = unquote(value)
+ if resp_chunk_size:
+ def _object_body():
+ buf =
+ while buf:
+ yield buf
+ buf =
+ object_body = _object_body()
+ else:
+ object_body =
+ return resp.getheader('content-type'), \
+ int(resp.getheader('content-length', 0)), \
+ resp.getheader('last-modified'), \
+ resp.getheader('etag').strip('"'), \
+ metadata, \
+ object_body
+def head_object(url, token, container, name, http_conn=None):
+ """
+ Get object info
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name that the object is in
+ :param name: object name to get info for
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :returns: a tuple of (content type, content length, last modfied, etag,
+ dictionary of metadata)
+ :raises ClientException: HTTP HEAD request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ conn.request('HEAD', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object HEAD failed', http_scheme=parsed.scheme,
+, http_port=conn.port, http_path=path,
+ http_status=resp.status, http_reason=resp.reason)
+ metadata = {}
+ for key, value in resp.getheaders():
+ if key.lower().startswith('x-object-meta-'):
+ metadata[unquote(key[len('x-object-meta-'):])] = unquote(value)
+ return resp.getheader('content-type'), \
+ int(resp.getheader('content-length', 0)), \
+ resp.getheader('last-modified'), \
+ resp.getheader('etag').strip('"'), \
+ metadata
+def put_object(url, token, container, name, contents, metadata={},
+ content_length=None, etag=None, chunk_size=65536,
+ content_type=None, http_conn=None):
+ """
+ Put an object
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name that the object is in
+ :param name: object name to put
+ :param contents: file like object to read object data from
+ :param metadata: dictionary of object metadata
+ :param content_length: value to send as content-length header
+ :param etag: etag of contents
+ :param chunk_size: chunk size of data to write
+ :param content_type: value to send as content-type header
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :returns: etag from server response
+ :raises ClientException: HTTP PUT request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ headers = {'X-Auth-Token': token}
+ for key, value in metadata.iteritems():
+ headers['X-Object-Meta-%s' % quote(key)] = quote(value)
+ if etag:
+ headers['ETag'] = etag.strip('"')
+ if content_length is not None:
+ headers['Content-Length'] = str(content_length)
+ if content_type is not None:
+ headers['Content-Type'] = content_type
+ if not contents:
+ headers['Content-Length'] = '0'
+ if hasattr(contents, 'read'):
+ conn.putrequest('PUT', path)
+ for header, value in headers.iteritems():
+ conn.putheader(header, value)
+ if not content_length:
+ conn.putheader('Transfer-Encoding', 'chunked')
+ conn.endheaders()
+ chunk =
+ while chunk:
+ if not content_length:
+ conn.send('%x\r\n%s\r\n' % (len(chunk), chunk))
+ else:
+ conn.send(chunk)
+ chunk =
+ if not content_length:
+ conn.send('0\r\n\r\n')
+ else:
+ conn.request('PUT', path, contents, headers)
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object PUT failed', http_scheme=parsed.scheme,
+, http_port=conn.port, http_path=path,
+ http_status=resp.status, http_reason=resp.reason)
+ return resp.getheader('etag').strip('"')
+def post_object(url, token, container, name, metadata, http_conn=None):
+ """
+ Change object metadata
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name that the object is in
+ :param name: object name to change
+ :param metadata: dictionary of object metadata
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :raises ClientException: HTTP POST request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ headers = {'X-Auth-Token': token}
+ for key, value in metadata.iteritems():
+ headers['X-Object-Meta-%s' % quote(key)] = quote(value)
+ conn.request('POST', path, '', headers)
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object POST failed', http_scheme=parsed.scheme,
+, http_port=conn.port, http_path=path,
+ http_status=resp.status, http_reason=resp.reason)
+def delete_object(url, token, container, name, http_conn=None):
+ """
+ Delete object
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name that the object is in
+ :param name: object name to delete
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :raises ClientException: HTTP DELETE request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ conn.request('DELETE', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object DELETE failed',
+ http_scheme=parsed.scheme,,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason)
+class Connection(object):
+ """Convenience class to make requests that will also retry the request"""
+ def __init__(self, authurl, user, key, retries=5, preauthurl=None,
+ preauthtoken=None, snet=False):
+ """
+ :param authurl: authenitcation URL
+ :param user: user name to authenticate as
+ :param key: key/password to authenticate with
+ :param retries: Number of times to retry the request before failing
+ :param preauthurl: storage URL (if you have already authenticated)
+ :param preauthtoken: authentication token (if you have already
+ authenticated)
+ :param snet: use SERVICENET internal network default is False
+ """
+ self.authurl = authurl
+ self.user = user
+ self.key = key
+ self.retries = retries
+ self.http_conn = None
+ self.url = preauthurl
+ self.token = preauthtoken
+ self.attempts = 0
+ self.snet = snet
+ def _retry(self, func, *args, **kwargs):
+ kwargs['http_conn'] = self.http_conn
+ self.attempts = 0
+ backoff = 1
+ while self.attempts <= self.retries:
+ self.attempts += 1
+ try:
+ if not self.url or not self.token:
+ self.url, self.token = \
+ get_auth(self.authurl, self.user, self.key, snet=self.snet)
+ self.http_conn = None
+ if not self.http_conn:
+ self.http_conn = http_connection(self.url)
+ kwargs['http_conn'] = self.http_conn
+ rv = func(self.url, self.token, *args, **kwargs)
+ return rv
+ except (socket.error, HTTPException):
+ if self.attempts > self.retries:
+ raise
+ self.http_conn = None
+ except ClientException, err:
+ if self.attempts > self.retries:
+ raise
+ if err.http_status == 401:
+ self.url = self.token = None
+ if self.attempts > 1:
+ raise
+ elif 500 <= err.http_status <= 599:
+ pass
+ else:
+ raise
+ sleep(backoff)
+ backoff *= 2
+ def head_account(self):
+ """Wrapper for head_account"""
+ return self._retry(head_account)
+ def get_account(self, marker=None, limit=None, prefix=None,
+ full_listing=False):
+ """Wrapper for get_account"""
+ # TODO: With full_listing=True this will restart the entire listing
+ # with each retry. Need to make a better version that just retries
+ # where it left off.
+ return self._retry(get_account, marker=marker, limit=limit,
+ prefix=prefix, full_listing=full_listing)
+ def head_container(self, container):
+ """Wrapper for head_container"""
+ return self._retry(head_container, container)
+ def get_container(self, container, marker=None, limit=None, prefix=None,
+ delimiter=None, full_listing=False):
+ """Wrapper for get_container"""
+ # TODO: With full_listing=True this will restart the entire listing
+ # with each retry. Need to make a better version that just retries
+ # where it left off.
+ return self._retry(get_container, container, marker=marker,
+ limit=limit, prefix=prefix, delimiter=delimiter,
+ full_listing=full_listing)
+ def put_container(self, container):
+ """Wrapper for put_container"""
+ return self._retry(put_container, container)
+ def delete_container(self, container):
+ """Wrapper for delete_container"""
+ return self._retry(delete_container, container)
+ def head_object(self, container, obj):
+ """Wrapper for head_object"""
+ return self._retry(head_object, container, obj)
+ def get_object(self, container, obj, resp_chunk_size=None):
+ """Wrapper for get_object"""
+ return self._retry(get_object, container, obj,
+ resp_chunk_size=resp_chunk_size)
+ def put_object(self, container, obj, contents, metadata={},
+ content_length=None, etag=None, chunk_size=65536,
+ content_type=None):
+ """Wrapper for put_object"""
+ return self._retry(put_object, container, obj, contents,
+ metadata=metadata, content_length=content_length, etag=etag,
+ chunk_size=chunk_size, content_type=content_type)
+ def post_object(self, container, obj, metadata):
+ """Wrapper for post_object"""
+ return self._retry(post_object, container, obj, metadata)
+ def delete_object(self, container, obj):
+ """Wrapper for delete_object"""
+ return self._retry(delete_object, container, obj)
diff --git a/swift/common/ b/swift/common/
new file mode 100644
index 000000000..8fdacb5a9
--- /dev/null
+++ b/swift/common/
@@ -0,0 +1,152 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import re
+from webob.exc import HTTPBadRequest, HTTPLengthRequired, \
+ HTTPRequestEntityTooLarge
+#: Max file size allowed for objects
+MAX_FILE_SIZE = 5 * 1024 * 1024 * 1024 + 2
+#: Max length of the name of a key for metadata
+#: Max length of the value of a key for metadata
+#: Max number of metadata items
+#: Max overall size of metadata
+#: Max object name length
+def check_metadata(req):
+ """
+ Check metadata sent for objects in the request headers.
+ :param req: request object
+ :raises HTTPBadRequest: bad metadata
+ """
+ meta_count = 0
+ meta_size = 0
+ for key, value in req.headers.iteritems():
+ if not key.lower().startswith('x-object-meta-'):
+ continue
+ key = key[len('x-object-meta-'):]
+ if not key:
+ return HTTPBadRequest(body='Metadata name cannot be empty',
+ request=req, content_type='text/plain')
+ meta_count += 1
+ meta_size += len(key) + len(value)
+ if len(key) > MAX_META_NAME_LENGTH:
+ return HTTPBadRequest(
+ body='Metadata name too long; max %d'
+ request=req, content_type='text/plain')
+ elif len(value) > MAX_META_VALUE_LENGTH:
+ return HTTPBadRequest(
+ body='Metadata value too long; max %d'
+ request=req, content_type='text/plain')
+ elif meta_count > MAX_META_COUNT:
+ return HTTPBadRequest(
+ body='Too many metadata items; max %d' % MAX_META_COUNT,
+ request=req, content_type='text/plain')
+ elif meta_size > MAX_META_OVERALL_SIZE:
+ return HTTPBadRequest(
+ body='Total metadata too large; max %d'
+ request=req, content_type='text/plain')
+ return None
+def check_object_creation(req, object_name):
+ """
+ Check to ensure that everything is alright about an object to be created.
+ :param req: HTTP request object
+ :param object_name: name of object to be created
+ :raises HTTPRequestEntityTooLarge: the object is too large
+ :raises HTTPLengthRequered: missing content-length header and not
+ a chunked request
+ :raises HTTPBadRequest: missing or bad content-type header, or
+ bad metadata
+ """
+ if req.content_length and req.content_length > MAX_FILE_SIZE:
+ return HTTPRequestEntityTooLarge(body='Your request is too large.',
+ request=req, content_type='text/plain')
+ if req.content_length is None and \
+ req.headers.get('transfer-encoding') != 'chunked':
+ return HTTPLengthRequired(request=req)
+ if len(object_name) > MAX_OBJECT_NAME_LENGTH:
+ return HTTPBadRequest(body='Object name length of %d longer than %d' %
+ (len(object_name), MAX_OBJECT_NAME_LENGTH), request=req,
+ content_type='text/plain')
+ if 'Content-Type' not in req.headers:
+ return HTTPBadRequest(request=req, content_type='text/plain',
+ body='No content type')
+ if not check_xml_encodable(req.headers['Content-Type']):
+ return HTTPBadRequest(request=req, body='Invalid Content-Type',
+ content_type='text/plain')
+ return check_metadata(req)
+def check_mount(root, drive):
+ """
+ Verify that the path to the device is a mount point and mounted. This
+ allows us to fast fail on drives that have been unmounted because of
+ issues, and also prevents us for accidently filling up the root partition.
+ :param root: base path where the devices are mounted
+ :param drive: drive name to be checked
+ :returns: True if it is a valid mounted device, False otherwise
+ """
+ if not drive.isalnum():
+ return False
+ path = os.path.join(root, drive)
+ return os.path.exists(path) and os.path.ismount(path)
+def check_float(string):
+ """
+ Helper function for checking if a string can be converted to a float.
+ :param string: string to be verified as a float
+ :returns: True if the string can be converted to a float, False otherwise
+ """
+ try:
+ float(string)
+ return True
+ except ValueError:
+ return False
+_invalid_xml = re.compile(ur'[^\x09\x0a\x0d\x20-\uD7FF\uE000-\uFFFD%s-%s]' %
+ (unichr(0x10000), unichr(0x10FFFF)))
+def check_xml_encodable(string):
+ """
+ Validate if a string can be encoded in xml.
+ :param string: string to be validated
+ :returns: True if the string can be encoded in xml, False otherwise
+ """
+ try:
+ return not'UTF-8'))
+ except UnicodeDecodeError:
+ return False
diff --git a/swift/common/ b/swift/common/
new file mode 100644
index 000000000..3464d451b
--- /dev/null
+++ b/swift/common/
@@ -0,0 +1,1463 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" Database code for Swift """
+from __future__ import with_statement
+from contextlib import contextmanager
+import hashlib
+import logging
+import operator
+import os
+from uuid import uuid4
+import time
+import cPickle as pickle
+import errno
+from random import randint
+from tempfile import mkstemp
+import math
+from eventlet import sleep
+import sqlite3
+from swift.common.utils import normalize_timestamp, renamer, \
+ mkdirs, lock_parent_directory, fallocate
+from swift.common.exceptions import LockTimeout
+#: Timeout for trying to connect to a DB
+#: Pickle protocol to use
+#: Max number of pending entries
+PENDING_CAP = 131072
+class DatabaseConnectionError(sqlite3.DatabaseError):
+ """More friendly error messages for DB Errors."""
+ def __init__(self, path, msg, timeout=0):
+ self.path = path
+ self.timeout = timeout
+ self.msg = msg
+ def __str__(self):
+ return 'DB connection error (%s, %s):\n%s' % (
+ self.path, self.timeout, self.msg)
+class GreenDBConnection(sqlite3.Connection):
+ """SQLite DB Connection handler that plays well with eventlet."""
+ def __init__(self, *args, **kwargs):
+ self.timeout = kwargs.get('timeout', BROKER_TIMEOUT)
+ kwargs['timeout'] = 0
+ self.db_file = args and args[0] or '-'
+ sqlite3.Connection.__init__(self, *args, **kwargs)
+ def _timeout(self, call):
+ with LockTimeout(self.timeout, self.db_file):
+ while True:
+ try:
+ return call()
+ except sqlite3.OperationalError, e:
+ if 'locked' not in str(e):
+ raise
+ sleep(0.05)
+ def execute(self, *args, **kwargs):
+ return self._timeout(lambda: sqlite3.Connection.execute(
+ self, *args, **kwargs))
+ def commit(self):
+ return self._timeout(lambda: sqlite3.Connection.commit(self))
+def dict_factory(crs, row):
+ """
+ This should only be used when you need a real dict,
+ i.e. when you're going to serialize the results.
+ """
+ return dict(
+ ((col[0], row[idx]) for idx, col in enumerate(crs.description)))
+def chexor(old, name, timestamp):
+ """
+ Each entry in the account and container databases is XORed by the 128-bit
+ hash on insert or delete. This serves as a rolling, order-independent hash
+ of the contents. (check + XOR)
+ :param old: hex representation of the current DB hash
+ :param name: name of the object or container being inserted
+ :param timestamp: timestamp of the new record
+ :returns: a hex representation of the new hash value
+ """
+ if name is None:
+ raise Exception('name is None!')
+ old = old.decode('hex')
+ new = hashlib.md5(('%s-%s' % (name, timestamp)).encode('utf_8')).digest()
+ response = ''.join(
+ map(chr, map(operator.xor, map(ord, old), map(ord, new))))
+ return response.encode('hex')
+def get_db_connection(path, timeout=30, okay_to_create=False):
+ """
+ Returns a properly configured SQLite database connection.
+ :param path: path to DB
+ :param timeout: timeout for connection
+ :param okay_to_create: if True, create the DB if it doesn't exist
+ :returns: DB connection object
+ """
+ try:
+ connect_time = time.time()
+ conn = sqlite3.connect(path, check_same_thread=False,
+ factory=GreenDBConnection, timeout=timeout)
+ if path != ':memory:' and not okay_to_create:
+ # attempt to detect and fail when connect creates the db file
+ stat = os.stat(path)
+ if stat.st_size == 0 and stat.st_ctime >= connect_time:
+ os.unlink(path)
+ raise DatabaseConnectionError(path,
+ 'DB file created by connect?')
+ conn.row_factory = sqlite3.Row
+ conn.text_factory = str
+ conn.execute('PRAGMA synchronous = NORMAL')
+ conn.execute('PRAGMA count_changes = OFF')
+ conn.execute('PRAGMA temp_store = MEMORY')
+ conn.create_function('chexor', 3, chexor)
+ except sqlite3.DatabaseError:
+ import traceback
+ raise DatabaseConnectionError(path, traceback.format_exc(),
+ timeout=timeout)
+ return conn
+class DatabaseBroker(object):
+ """Encapsulates working with a database."""
+ def __init__(self, db_file, timeout=BROKER_TIMEOUT, logger=None,
+ account=None, container=None, pending_timeout=10,
+ stale_reads_ok=False):
+ """ Encapsulates working with a database. """
+ self.conn = None
+ self.db_file = db_file
+ self.pending_file = self.db_file + '.pending'
+ self.pending_timeout = pending_timeout
+ self.stale_reads_ok = stale_reads_ok
+ self.db_dir = os.path.dirname(db_file)
+ self.timeout = timeout
+ self.logger = logger or logging.getLogger()
+ self.account = account
+ self.container = container
+ def initialize(self, put_timestamp=None):
+ """
+ Create the DB
+ :param put_timestamp: timestamp of initial PUT request
+ """
+ if self.db_file == ':memory:':
+ tmp_db_file = None
+ conn = get_db_connection(self.db_file, self.timeout)
+ else:
+ mkdirs(self.db_dir)
+ fd, tmp_db_file = mkstemp(suffix='.tmp', dir=self.db_dir)
+ os.close(fd)
+ conn = sqlite3.connect(tmp_db_file, check_same_thread=False,
+ factory=GreenDBConnection, timeout=0)
+ # creating dbs implicitly does a lot of transactions, so we
+ # pick fast, unsafe options here and do a big fsync at the end.
+ conn.execute('PRAGMA synchronous = OFF')
+ conn.execute('PRAGMA temp_store = MEMORY')
+ conn.execute('PRAGMA journal_mode = MEMORY')
+ conn.create_function('chexor', 3, chexor)
+ conn.row_factory = sqlite3.Row
+ conn.text_factory = str
+ conn.executescript("""
+ CREATE TABLE outgoing_sync (
+ remote_id TEXT UNIQUE,
+ sync_point INTEGER,
+ updated_at TEXT DEFAULT 0
+ );
+ CREATE TABLE incoming_sync (
+ remote_id TEXT UNIQUE,
+ sync_point INTEGER,
+ updated_at TEXT DEFAULT 0
+ );
+ CREATE TRIGGER outgoing_sync_insert AFTER INSERT ON outgoing_sync
+ UPDATE outgoing_sync
+ SET updated_at = STRFTIME('%s', 'NOW')
+ END;
+ CREATE TRIGGER outgoing_sync_update AFTER UPDATE ON outgoing_sync
+ UPDATE outgoing_sync
+ SET updated_at = STRFTIME('%s', 'NOW')
+ END;
+ CREATE TRIGGER incoming_sync_insert AFTER INSERT ON incoming_sync
+ UPDATE incoming_sync
+ SET updated_at = STRFTIME('%s', 'NOW')
+ END;
+ CREATE TRIGGER incoming_sync_update AFTER UPDATE ON incoming_sync
+ UPDATE incoming_sync
+ SET updated_at = STRFTIME('%s', 'NOW')
+ END;
+ """)
+ if not put_timestamp:
+ put_timestamp = normalize_timestamp(0)
+ self._initialize(conn, put_timestamp)
+ conn.commit()
+ if tmp_db_file:
+ conn.close()
+ with open(tmp_db_file, 'r+b') as fp:
+ os.fsync(fp.fileno())
+ with lock_parent_directory(self.db_file, self.pending_timeout):
+ if os.path.exists(self.db_file):
+ # It's as if there was a "condition" where different parts
+ # of the system were "racing" each other.
+ raise DatabaseConnectionError(self.db_file,
+ 'DB created by someone else while working?')
+ renamer(tmp_db_file, self.db_file)
+ self.conn = get_db_connection(self.db_file, self.timeout)
+ else:
+ self.conn = conn
+ def delete_db(self, timestamp):
+ """
+ Mark the DB as deleted
+ :param timestamp: delete timestamp
+ """
+ timestamp = normalize_timestamp(timestamp)
+ with self.get() as conn:
+ self._delete_db(conn, timestamp)
+ conn.commit()
+ @contextmanager
+ def get(self):
+ """Use with the "with" statement; returns a database connection."""
+ if not self.conn:
+ if self.db_file != ':memory:' and os.path.exists(self.db_file):
+ self.conn = get_db_connection(self.db_file, self.timeout)
+ else:
+ raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
+ conn = self.conn
+ self.conn = None
+ try:
+ yield conn
+ conn.rollback()
+ self.conn = conn
+ except:
+ conn.close()
+ raise
+ @contextmanager
+ def lock(self):
+ """Use with the "with" statement; locks a database."""
+ if not self.conn:
+ if self.db_file != ':memory:' and os.path.exists(self.db_file):
+ self.conn = get_db_connection(self.db_file, self.timeout)
+ else:
+ raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
+ conn = self.conn
+ self.conn = None
+ orig_isolation_level = conn.isolation_level
+ conn.isolation_level = None
+ conn.execute('BEGIN IMMEDIATE')
+ try:
+ yield True
+ except:
+ pass
+ try:
+ conn.execute('ROLLBACK')
+ conn.isolation_level = orig_isolation_level
+ self.conn = conn
+ except: # pragma: no cover
+ logging.exception(
+ 'Broker error trying to rollback locked connection')
+ conn.close()
+ def newid(self, remote_id):
+ """
+ Re-id the database. This should be called after an rsync.
+ :param remote_id: the ID of the remote database being rsynced in
+ """
+ with self.get() as conn:
+ row = conn.execute('''
+ UPDATE %s_stat SET id=?
+ ''' % self.db_type, (str(uuid4()),))
+ row = conn.execute('''
+ ''' % self.db_contains_type).fetchone()
+ sync_point = row['ROWID'] if row else -1
+ conn.execute('''
+ INSERT OR REPLACE INTO incoming_sync (sync_point, remote_id)
+ VALUES (?, ?)
+ ''', (sync_point, remote_id))
+ self._newid(conn)
+ conn.commit()
+ def _newid(self, conn):
+ # Override for additional work when receiving an rsynced db.
+ pass
+ def merge_timestamps(self, created_at, put_timestamp, delete_timestamp):
+ """
+ Used in replication to handle updating timestamps.
+ :param created_at: create timestamp
+ :param put_timestamp: put timestamp
+ :param delete_timestamp: delete timestamp
+ """
+ with self.get() as conn:
+ row = conn.execute('''
+ UPDATE %s_stat SET created_at=MIN(?, created_at),
+ put_timestamp=MAX(?, put_timestamp),
+ delete_timestamp=MAX(?, delete_timestamp)
+ ''' % self.db_type, (created_at, put_timestamp, delete_timestamp))
+ conn.commit()
+ def get_items_since(self, start, count):
+ """
+ Get a list of objects in the database between start and end.
+ :param start: start ROWID
+ :param count: number to get
+ :returns: list of objects between start and end
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ with self.get() as conn:
+ curs = conn.execute('''
+ ''' % self.db_contains_type, (start, count))
+ curs.row_factory = dict_factory
+ return [r for r in curs]
+ def get_sync(self, id, incoming=True):
+ """
+ Gets the most recent sync point for a server from the sync table.
+ :param id: remote ID to get the sync_point for
+ :param incoming: if True, get the last incoming sync, otherwise get
+ the last outgoing sync
+ :returns: the sync point, or -1 if the id doesn't exist.
+ """
+ with self.get() as conn:
+ row = conn.execute(
+ "SELECT sync_point FROM %s_sync WHERE remote_id=?"
+ % ('incoming' if incoming else 'outgoing'), (id,)).fetchone()
+ if not row:
+ return -1
+ return row['sync_point']
+ def get_syncs(self, incoming=True):
+ """
+ Get a serialized copy of the sync table.
+ :param incoming: if True, get the last incoming sync, otherwise get
+ the last outgoing sync
+ :returns: list of {'remote_id', 'sync_point'}
+ """
+ with self.get() as conn:
+ curs = conn.execute('''
+ SELECT remote_id, sync_point FROM %s_sync
+ ''' % 'incoming' if incoming else 'outgoing')
+ result = []
+ for row in curs:
+ result.append({'remote_id': row[0], 'sync_point': row[1]})
+ return result
+ def get_replication_info(self):
+ """
+ Get information about the DB required for replication.
+ :returns: tuple of (hash, id, created_at, put_timestamp,
+ delete_timestamp) from the DB
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ with self.get() as conn:
+ curs = conn.execute('''
+ SELECT hash, id, created_at, put_timestamp, delete_timestamp,
+ %s_count AS count,
+ ON == '%s') LIMIT 1
+ ''' % (self.db_contains_type, self.db_type, self.db_contains_type))
+ curs.row_factory = dict_factory
+ return curs.fetchone()
+ def _commit_puts(self):
+ pass # stub to be overridden if need be
+ def merge_syncs(self, sync_points, incoming=True):
+ """
+ Merge a list of sync points with the incoming sync table.
+ :param sync_points: list of sync points where a sync point is a dict of
+ {'sync_point', 'remote_id'}
+ :param incoming: if True, get the last incoming sync, otherwise get
+ the last outgoing sync
+ """
+ with self.get() as conn:
+ for rec in sync_points:
+ try:
+ conn.execute('''
+ INSERT INTO %s_sync (sync_point, remote_id)
+ VALUES (?, ?)
+ ''' % ('incoming' if incoming else 'outgoing'),
+ (rec['sync_point'], rec['remote_id']))
+ except sqlite3.IntegrityError:
+ conn.execute('''
+ UPDATE %s_sync SET sync_point=max(?, sync_point)
+ WHERE remote_id=?
+ ''' % ('incoming' if incoming else 'outgoing'),
+ (rec['sync_point'], rec['remote_id']))
+ conn.commit()
+ def _preallocate(self):
+ """
+ The idea is to allocate space in front of an expanding db. If it gets
+ within 512k of a boundary, it allocates to the next boundary.
+ Boundaries are 2m, 5m, 10m, 25m, 50m, then every 50m after.
+ """
+ if self.db_file == ':memory:':
+ return
+ MB = (1024 * 1024)
+ def prealloc_points():
+ for pm in (1, 2, 5, 10, 25, 50):
+ yield pm * MB
+ while True:
+ pm += 50
+ yield pm * MB
+ stat = os.stat(self.db_file)
+ file_size = stat.st_size
+ allocated_size = stat.st_blocks * 512
+ for point in prealloc_points():
+ if file_size <= point - MB / 2:
+ prealloc_size = point
+ break
+ if allocated_size < prealloc_size:
+ with open(self.db_file, 'rb+') as fp:
+ fallocate(fp.fileno(), int(prealloc_size))
+class ContainerBroker(DatabaseBroker):
+ """Encapsulates working with a container database."""
+ db_type = 'container'
+ db_contains_type = 'object'
+ def _initialize(self, conn, put_timestamp):
+ """Creates a brand new database (tables, indices, triggers, etc.)"""
+ if not self.account:
+ raise ValueError(
+ 'Attempting to create a new database with no account set')
+ if not self.container:
+ raise ValueError(
+ 'Attempting to create a new database with no container set')
+ self.create_object_table(conn)
+ self.create_container_stat_table(conn, put_timestamp)
+ def create_object_table(self, conn):
+ """
+ Create the object table which is specifc to the container DB.
+ :param conn: DB connection object
+ """
+ conn.executescript("""
+ CREATE TABLE object (
+ created_at TEXT,
+ size INTEGER,
+ content_type TEXT,
+ etag TEXT,
+ );
+ CREATE INDEX ix_object_deleted ON object (deleted);
+ CREATE TRIGGER object_insert AFTER INSERT ON object
+ UPDATE container_stat
+ SET object_count = object_count + (1 - new.deleted),
+ bytes_used = bytes_used + new.size,
+ hash = chexor(hash,, new.created_at);
+ END;
+ CREATE TRIGGER object_update BEFORE UPDATE ON object
+ END;
+ CREATE TRIGGER object_delete AFTER DELETE ON object
+ UPDATE container_stat
+ SET object_count = object_count - (1 - old.deleted),
+ bytes_used = bytes_used - old.size,
+ hash = chexor(hash,, old.created_at);
+ END;
+ """)
+ def create_container_stat_table(self, conn, put_timestamp=None):
+ """
+ Create the container_stat table which is specifc to the container DB.
+ :param conn: DB connection object
+ :param put_timestamp: put timestamp
+ """
+ if put_timestamp is None:
+ put_timestamp = normalize_timestamp(0)
+ conn.executescript("""
+ CREATE TABLE container_stat (
+ account TEXT,
+ container TEXT,
+ created_at TEXT,
+ put_timestamp TEXT DEFAULT '0',
+ delete_timestamp TEXT DEFAULT '0',
+ object_count INTEGER,
+ bytes_used INTEGER,
+ reported_put_timestamp TEXT DEFAULT '0',
+ reported_delete_timestamp TEXT DEFAULT '0',
+ reported_object_count INTEGER DEFAULT 0,
+ reported_bytes_used INTEGER DEFAULT 0,
+ hash TEXT default '00000000000000000000000000000000',
+ id TEXT,
+ status TEXT DEFAULT '',
+ status_changed_at TEXT DEFAULT '0'
+ );
+ INSERT INTO container_stat (object_count, bytes_used)
+ VALUES (0, 0);
+ """)
+ conn.execute('''
+ UPDATE container_stat
+ SET account = ?, container = ?, created_at = ?, id = ?,
+ put_timestamp = ?
+ ''', (self.account, self.container, normalize_timestamp(time.time()),
+ str(uuid4()), put_timestamp))
+ def _newid(self, conn):
+ conn.execute('''
+ UPDATE container_stat
+ SET reported_put_timestamp = 0, reported_delete_timestamp = 0,
+ reported_object_count = 0, reported_bytes_used = 0''')
+ def update_put_timestamp(self, timestamp):
+ """
+ Update the put_timestamp. Only modifies it if it is greater than
+ the current timestamp.
+ :param timestamp: put timestamp
+ """
+ with self.get() as conn:
+ conn.execute('''
+ UPDATE container_stat SET put_timestamp = ?
+ WHERE put_timestamp < ? ''', (timestamp, timestamp))
+ conn.commit()
+ def _delete_db(self, conn, timestamp):
+ """
+ Mark the DB as deleted
+ :param conn: DB connection object
+ :param timestamp: timestamp to mark as deleted
+ """
+ conn.execute("""
+ UPDATE container_stat
+ SET delete_timestamp = ?,
+ status = 'DELETED',
+ status_changed_at = ?
+ WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp))
+ def empty(self):
+ """
+ Check if the DB is empty.
+ :returns: True if the database has no active objects, False otherwise
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ with self.get() as conn:
+ row = conn.execute(
+ 'SELECT object_count from container_stat').fetchone()
+ return (row[0] == 0)
+ def _commit_puts(self, item_list=None):
+ """Handles commiting rows in .pending files."""
+ if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
+ return
+ if item_list is None:
+ item_list = []
+ with lock_parent_directory(self.pending_file, self.pending_timeout):
+ self._preallocate()
+ if not os.path.getsize(self.pending_file):
+ if item_list:
+ self.merge_items(item_list)
+ return
+ with open(self.pending_file, 'r+b') as fp:
+ for entry in':'):
+ if entry:
+ try:
+ (name, timestamp, size, content_type, etag,
+ deleted) = pickle.loads(entry.decode('base64'))
+ item_list.append({'name': name, 'created_at':
+ timestamp, 'size': size, 'content_type':
+ content_type, 'etag': etag,
+ 'deleted': deleted})
+ except:
+ self.logger.exception(
+ 'Invalid pending entry %s: %s'
+ % (self.pending_file, entry))
+ if item_list:
+ self.merge_items(item_list)
+ try:
+ os.ftruncate(fp.fileno(), 0)
+ except OSError, err:
+ if err.errno != errno.ENOENT:
+ raise
+ def reclaim(self, object_timestamp, sync_timestamp):
+ """
+ Delete rows from the object table that are marked deleted and
+ whose created_at timestamp is < object_timestamp. Also deletes rows
+ from incoming_sync and outgoing_sync where the updated_at timestamp is
+ < sync_timestamp.
+ :param object_timestamp: max created_at timestamp of object rows to
+ delete
+ :param sync_timestamp: max update_at timestamp of sync rows to delete
+ """
+ self._commit_puts()
+ with self.get() as conn:
+ conn.execute("""
+ DELETE FROM object
+ WHERE deleted = 1
+ AND created_at < ?""", (object_timestamp,))
+ try:
+ conn.execute('''
+ DELETE FROM outgoing_sync WHERE updated_at < ?
+ ''', (sync_timestamp,))
+ conn.execute('''
+ DELETE FROM incoming_sync WHERE updated_at < ?
+ ''', (sync_timestamp,))
+ except sqlite3.OperationalError, err:
+ # Old dbs didn't have updated_at in the _sync tables.
+ if 'no such column: updated_at' not in str(err):
+ raise
+ conn.commit()
+ def delete_object(self, name, timestamp):
+ """
+ Mark an object deleted.
+ :param name: object name to be deleted
+ :param timestamp: timestamp when the object was marked as deleted
+ """
+ self.put_object(name, timestamp, 0, 'application/deleted', 'noetag', 1)
+ def put_object(self, name, timestamp, size, content_type, etag, deleted=0):
+ """
+ Creates an object in the DB with its metadata.
+ :param name: object name to be created
+ :param timestamp: timestamp of when the object was created
+ :param size: object size
+ :param content_type: object content-type
+ :param etag: object etag
+ :param deleted: if True, marks the object as deleted and sets the
+ deteleted_at timestamp to timestamp
+ """
+ record = {'name': name, 'created_at': timestamp, 'size': size,
+ 'content_type': content_type, 'etag': etag,
+ 'deleted': deleted}
+ if self.db_file == ':memory:':
+ self.merge_items([record])
+ return
+ if not os.path.exists(self.db_file):
+ raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
+ pending_size = 0
+ try:
+ pending_size = os.path.getsize(self.pending_file)
+ except OSError, err:
+ if err.errno != errno.ENOENT:
+ raise
+ if pending_size > PENDING_CAP:
+ self._commit_puts([record])
+ else:
+ with lock_parent_directory(
+ self.pending_file, self.pending_timeout):
+ with open(self.pending_file, 'a+b') as fp:
+ # Colons aren't used in base64 encoding; so they are our
+ # delimiter
+ fp.write(':')
+ fp.write(pickle.dumps(
+ (name, timestamp, size, content_type, etag, deleted),
+ protocol=PICKLE_PROTOCOL).encode('base64'))
+ fp.flush()
+ def is_deleted(self, timestamp=None):
+ """
+ Check if the DB is considered to be deleted.
+ :returns: True if the DB is considered to be deleted, False otherwise
+ """
+ if self.db_file != ':memory:' and not os.path.exists(self.db_file):
+ return True
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ with self.get() as conn:
+ row = conn.execute('''
+ SELECT put_timestamp, delete_timestamp, object_count
+ FROM container_stat''').fetchone()
+ # leave this db as a tombstone for a consistency window
+ if timestamp and row['delete_timestamp'] > timestamp:
+ return False
+ # The container is considered deleted if the delete_timestamp
+ # value is greater than the put_timestamp, and there are no
+ # objects in the container.
+ return (row['object_count'] in (None, '', 0, '0')) and \
+ (float(row['delete_timestamp']) > float(row['put_timestamp']))
+ def get_info(self):
+ """
+ Get global data for the container.
+ :returns: a tuple of (account, container, created_at, put_timestamp,
+ delete_timestamp, object_count, bytes_used,
+ reported_put_timestamp, reported_delete_timestamp,
+ reported_object_count, reported_bytes_used, hash, id)
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ with self.get() as conn:
+ return conn.execute('''
+ SELECT account, container, created_at, put_timestamp,
+ delete_timestamp, object_count, bytes_used,
+ reported_put_timestamp, reported_delete_timestamp,
+ reported_object_count, reported_bytes_used, hash, id
+ FROM container_stat
+ ''').fetchone()
+ def reported(self, put_timestamp, delete_timestamp, object_count,
+ bytes_used):
+ """
+ Update reported stats.
+ :param put_timestamp: put_timestamp to update
+ :param delete_timestamp: delete_timestamp to update
+ :param object_count: object_count to update
+ :param bytes_used: bytes_used to update
+ """
+ with self.get() as conn:
+ conn.execute('''
+ UPDATE container_stat
+ SET reported_put_timestamp = ?, reported_delete_timestamp = ?,
+ reported_object_count = ?, reported_bytes_used = ?
+ ''', (put_timestamp, delete_timestamp, object_count, bytes_used))
+ conn.commit()
+ def get_random_objects(self, max_count=100):
+ """
+ Get random objects from the DB. This is used by the container_auditor
+ when testing random objects for existence.
+ :param max_count: maximum number of objects to get
+ :returns: list of object names
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ rv = []
+ with self.get() as conn:
+ row = conn.execute('''
+ ''').fetchone()
+ if not row:
+ return []
+ max_rowid = row['ROWID']
+ for _ in xrange(min(max_count, max_rowid)):
+ row = conn.execute('''
+ SELECT name FROM object WHERE ROWID >= ? AND +deleted = 0
+ ''', (randint(0, max_rowid),)).fetchone()
+ if row:
+ rv.append(row['name'])
+ return list(set(rv))
+ def list_objects_iter(self, limit, marker, prefix, delimiter, path=None,
+ format=None):
+ """
+ Get a list of objects sorted by name starting at marker onward, up
+ to limit entries. Entries will begin with the prefix and will not
+ have the delimiter after the prefix.
+ :param limit: maximum number of entries to get
+ :param marker: marker query
+ :param prefix: prefix query
+ :param delimeter: delimeter for query
+ :param path: if defined, will set the prefix and delimter based on
+ the path
+ :param format: TOOD: remove as it is no longer used
+ :returns: list of tuples of (name, created_at, size, content_type,
+ etag)
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ if path is not None:
+ prefix = path
+ if path:
+ prefix = path = path.rstrip('/') + '/'
+ delimiter = '/'
+ elif delimiter and not prefix:
+ prefix = ''
+ orig_marker = marker
+ with self.get() as conn:
+ results = []
+ while len(results) < limit:
+ query = '''SELECT name, created_at, size, content_type, etag
+ FROM object WHERE'''
+ query_args = []
+ if marker and marker >= prefix:
+ query += ' name > ? AND'
+ query_args.append(marker)
+ elif prefix:
+ query += ' name >= ? AND'
+ query_args.append(prefix)
+ query += ' +deleted = 0 ORDER BY name LIMIT ?'
+ query_args.append(limit - len(results))
+ curs = conn.execute(query, query_args)
+ curs.row_factory = None
+ if prefix is None:
+ return [r for r in curs]
+ if not delimiter:
+ return [r for r in curs if r[0].startswith(prefix)]
+ rowcount = 0
+ for row in curs:
+ rowcount += 1
+ marker = name = row[0]
+ if len(results) >= limit or not name.startswith(prefix):
+ curs.close()
+ return results
+ end = name.find(delimiter, len(prefix))
+ if path is not None:
+ if name == path:
+ continue
+ if end >= 0 and len(name) > end + len(delimiter):
+ marker = name[:end] + chr(ord(delimiter) + 1)
+ curs.close()
+ break
+ elif end > 0:
+ marker = name[:end] + chr(ord(delimiter) + 1)
+ dir_name = name[:end + 1]
+ if dir_name != orig_marker:
+ results.append([dir_name, '0', 0, None, ''])
+ curs.close()
+ break
+ results.append(row)
+ if not rowcount:
+ break
+ return results
+ def merge_items(self, item_list, source=None):
+ """
+ Merge items into the object table.
+ :param item_list: list of dictionaries of {'name', 'created_at',
+ 'size', 'content_type', 'etag', 'deleted'}
+ :param source: if defined, update incoming_sync with the source
+ """
+ with self.get() as conn:
+ max_rowid = -1
+ for rec in item_list:
+ curs = conn.execute('''
+ DELETE FROM object WHERE name = ? AND
+ (created_at < ?)
+ ''', (rec['name'], rec['created_at']))
+ try:
+ conn.execute('''
+ INSERT INTO object (name, created_at, size,
+ content_type, etag, deleted)
+ VALUES (?, ?, ?, ?, ?, ?)
+ ''', ([rec['name'], rec['created_at'], rec['size'],
+ rec['content_type'], rec['etag'], rec['deleted']]))
+ except sqlite3.IntegrityError:
+ pass
+ if source:
+ max_rowid = max(max_rowid, rec['ROWID'])
+ if source:
+ try:
+ conn.execute('''
+ INSERT INTO incoming_sync (sync_point, remote_id)
+ VALUES (?, ?)
+ ''', (max_rowid, source))
+ except sqlite3.IntegrityError:
+ conn.execute('''
+ UPDATE incoming_sync SET sync_point=max(?, sync_point)
+ WHERE remote_id=?
+ ''', (max_rowid, source))
+ conn.commit()
+class AccountBroker(DatabaseBroker):
+ """Encapsulates working with a account database."""
+ db_type = 'account'
+ db_contains_type = 'container'
+ def _initialize(self, conn, put_timestamp):
+ """
+ Create a brand new database (tables, indices, triggers, etc.)
+ :param conn: DB connection object
+ :param put_timestamp: put timestamp
+ """
+ if not self.account:
+ raise ValueError(
+ 'Attempting to create a new database with no account set')
+ self.create_container_table(conn)
+ self.create_account_stat_table(conn, put_timestamp)
+ def create_container_table(self, conn):
+ """
+ Create container table which is specific to the account DB.
+ :param conn: DB connection object
+ """
+ conn.executescript("""
+ CREATE TABLE container (
+ put_timestamp TEXT,
+ delete_timestamp TEXT,
+ object_count INTEGER,
+ bytes_used INTEGER,
+ );
+ CREATE INDEX ix_container_deleted ON container (deleted);
+ CREATE INDEX ix_container_name ON container (name);
+ CREATE TRIGGER container_insert AFTER INSERT ON container
+ UPDATE account_stat
+ SET container_count = container_count + (1 - new.deleted),
+ object_count = object_count + new.object_count,
+ bytes_used = bytes_used + new.bytes_used,
+ hash = chexor(hash,,
+ new.put_timestamp || '-' ||
+ new.delete_timestamp || '-' ||
+ new.object_count || '-' || new.bytes_used);
+ END;
+ CREATE TRIGGER container_update BEFORE UPDATE ON container
+ END;
+ CREATE TRIGGER container_delete AFTER DELETE ON container
+ UPDATE account_stat
+ SET container_count = container_count - (1 - old.deleted),
+ object_count = object_count - old.object_count,
+ bytes_used = bytes_used - old.bytes_used,
+ hash = chexor(hash,,
+ old.put_timestamp || '-' ||
+ old.delete_timestamp || '-' ||
+ old.object_count || '-' || old.bytes_used);
+ END;
+ """)
+ def create_account_stat_table(self, conn, put_timestamp):
+ """
+ Create account_stat table which is specific to the account DB.
+ :param conn: DB connection object
+ :param put_timestamp: put timestamp
+ """
+ conn.executescript("""
+ CREATE TABLE account_stat (
+ account TEXT,
+ created_at TEXT,
+ put_timestamp TEXT DEFAULT '0',
+ delete_timestamp TEXT DEFAULT '0',
+ container_count INTEGER,
+ object_count INTEGER DEFAULT 0,
+ bytes_used INTEGER DEFAULT 0,
+ hash TEXT default '00000000000000000000000000000000',
+ id TEXT,
+ status TEXT DEFAULT '',
+ status_changed_at TEXT DEFAULT '0'
+ );
+ INSERT INTO account_stat (container_count) VALUES (0);
+ """)
+ conn.execute('''
+ UPDATE account_stat SET account = ?, created_at = ?, id = ?,
+ put_timestamp = ?
+ ''', (self.account, normalize_timestamp(time.time()), str(uuid4()),
+ put_timestamp))
+ def update_put_timestamp(self, timestamp):
+ """
+ Update the put_timestamp. Only modifies it if it is greater than
+ the current timestamp.
+ :param timestamp: put timestamp
+ """
+ with self.get() as conn:
+ conn.execute('''
+ UPDATE account_stat SET put_timestamp = ?
+ WHERE put_timestamp < ? ''', (timestamp, timestamp))
+ conn.commit()
+ def _delete_db(self, conn, timestamp, force=False):
+ """
+ Mark the DB as deleted.
+ :param conn: DB connection object
+ :param timestamp: timestamp to mark as deleted
+ """
+ conn.execute("""
+ UPDATE account_stat
+ SET delete_timestamp = ?,
+ status = 'DELETED',
+ status_changed_at = ?
+ WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp))
+ def _commit_puts(self, item_list=None):
+ """Handles commiting rows in .pending files."""
+ if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
+ return
+ if item_list is None:
+ item_list = []
+ with lock_parent_directory(self.pending_file, self.pending_timeout):
+ self._preallocate()
+ if not os.path.getsize(self.pending_file):
+ if item_list:
+ self.merge_items(item_list)
+ return
+ with open(self.pending_file, 'r+b') as fp:
+ for entry in':'):
+ if entry:
+ try:
+ (name, put_timestamp, delete_timestamp,
+ object_count, bytes_used, deleted) = \
+ pickle.loads(entry.decode('base64'))
+ item_list.append({'name': name,
+ 'put_timestamp': put_timestamp,
+ 'delete_timestamp': delete_timestamp,
+ 'object_count': object_count,
+ 'bytes_used': bytes_used,
+ 'deleted': deleted})
+ except:
+ self.logger.exception(
+ 'Invalid pending entry %s: %s'
+ % (self.pending_file, entry))
+ if item_list:
+ self.merge_items(item_list)
+ try:
+ os.ftruncate(fp.fileno(), 0)
+ except OSError, err:
+ if err.errno != errno.ENOENT:
+ raise
+ def empty(self):
+ """
+ Check if the account DB is empty.
+ :returns: True if the database has no active containers.
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ with self.get() as conn:
+ row = conn.execute(
+ 'SELECT container_count from account_stat').fetchone()
+ return (row[0] == 0)
+ def reclaim(self, container_timestamp, sync_timestamp):
+ """
+ Delete rows from the container table that are marked deleted and
+ whose created_at timestamp is < object_timestamp. Also deletes rows
+ from incoming_sync and outgoing_sync where the updated_at timestamp is
+ < sync_timestamp.
+ :param object_timestamp: max created_at timestamp of container rows to
+ delete
+ :param sync_timestamp: max update_at timestamp of sync rows to delete
+ """
+ self._commit_puts()
+ with self.get() as conn:
+ conn.execute('''
+ deleted = 1 AND delete_timestamp < ?
+ ''', (container_timestamp,))
+ try:
+ conn.execute('''
+ DELETE FROM outgoing_sync WHERE updated_at < ?
+ ''', (sync_timestamp,))
+ conn.execute('''
+ DELETE FROM incoming_sync WHERE updated_at < ?
+ ''', (sync_timestamp,))
+ except sqlite3.OperationalError, err:
+ # Old dbs didn't have updated_at in the _sync tables.
+ if 'no such column: updated_at' not in str(err):
+ raise
+ conn.commit()
+ def get_container_timestamp(self, container_name):
+ """
+ Get the put_timestamp of a container.
+ :param container_name: container name
+ :returns: put_timestamp of the container
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ with self.get() as conn:
+ ret = conn.execute('''
+ SELECT put_timestamp FROM container
+ WHERE name = ? AND deleted != 1''',
+ (container_name,)).fetchone()
+ if ret:
+ ret = ret[0]
+ return ret
+ def put_container(self, name, put_timestamp, delete_timestamp,
+ object_count, bytes_used):
+ """
+ Create a container with the given attributes.
+ :param name: name of the container to create
+ :param put_timestamp: put_timestamp of the container to create
+ :param delete_timestamp: delete_timestamp of the container to create
+ :param object_count: number of objects in the container
+ :param bytes_used: number of bytes used by the container
+ """
+ if delete_timestamp > put_timestamp and \
+ object_count in (None, '', 0, '0'):
+ deleted = 1
+ else:
+ deleted = 0
+ record = {'name': name, 'put_timestamp': put_timestamp,
+ 'delete_timestamp': delete_timestamp,
+ 'object_count': object_count,
+ 'bytes_used': bytes_used,
+ 'deleted': deleted}
+ if self.db_file == ':memory:':
+ self.merge_items([record])
+ return
+ commit = False
+ with lock_parent_directory(self.pending_file, self.pending_timeout):
+ with open(self.pending_file, 'a+b') as fp:
+ # Colons aren't used in base64 encoding; so they are our
+ # delimiter
+ fp.write(':')
+ fp.write(pickle.dumps(
+ (name, put_timestamp, delete_timestamp, object_count,
+ bytes_used, deleted),
+ protocol=PICKLE_PROTOCOL).encode('base64'))
+ fp.flush()
+ if fp.tell() > PENDING_CAP:
+ commit = True
+ if commit:
+ self._commit_puts()
+ def can_delete_db(self, cutoff):
+ """
+ Check if the accont DB can be deleted.
+ :returns: True if the account can be deleted, False otherwise
+ """
+ self._commit_puts()
+ with self.get() as conn:
+ row = conn.execute('''
+ SELECT status, put_timestamp, delete_timestamp, container_count
+ FROM account_stat''').fetchone()
+ # The account is considered deleted if its status is marked
+ # as 'DELETED" and the delete_timestamp is older than the supplied
+ # cutoff date; or if the delete_timestamp value is greater than
+ # the put_timestamp, and there are no containers for the account
+ status_del = (row['status'] == 'DELETED')
+ deltime = float(row['delete_timestamp'])
+ past_cutoff = (deltime < cutoff)
+ time_later = (row['delete_timestamp'] > row['put_timestamp'])
+ no_containers = (row['container_count'] in (None, '', 0, '0'))
+ return (
+ (status_del and past_cutoff) or (time_later and no_containers))
+ def is_deleted(self):
+ """
+ Check if the account DB is considered to be deleted.
+ :returns: True if the account DB is considered to be deleted, False
+ otherwise
+ """
+ if self.db_file != ':memory:' and not os.path.exists(self.db_file):
+ return True
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ with self.get() as conn:
+ row = conn.execute('''
+ SELECT put_timestamp, delete_timestamp, container_count, status
+ FROM account_stat''').fetchone()
+ return row['status'] == 'DELETED' or (
+ row['container_count'] in (None, '', 0, '0') and
+ row['delete_timestamp'] > row['put_timestamp'])
+ def is_status_deleted(self):
+ """Only returns true if the status field is set to DELETED."""
+ with self.get() as conn:
+ row = conn.execute('''
+ SELECT status
+ FROM account_stat''').fetchone()
+ return (row['status'] == "DELETED")
+ def get_info(self):
+ """
+ Get global data for the account.
+ :returns: a tuple of (account, created_at, put_timestamp,
+ delete_timestamp, container_count, object_count,
+ bytes_used, hash, id)
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ with self.get() as conn:
+ return conn.execute('''
+ SELECT account, created_at, put_timestamp, delete_timestamp,
+ container_count, object_count, bytes_used, hash, id
+ FROM account_stat
+ ''').fetchone()
+ def get_random_containers(self, max_count=100):
+ """
+ Get random containers from the DB. This is used by the
+ account_auditor when testing random containerss for existence.
+ :param max_count: maximum number of containers to get
+ :returns: list of container names
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ rv = []
+ with self.get() as conn:
+ row = conn.execute('''
+ ''').fetchone()
+ if not row:
+ return []
+ max_rowid = row['ROWID']
+ for _ in xrange(min(max_count, max_rowid)):
+ row = conn.execute('''
+ SELECT name FROM container WHERE
+ ROWID >= ? AND +deleted = 0
+ ''', (randint(0, max_rowid),)).fetchone()
+ if row:
+ rv.append(row['name'])
+ return list(set(rv))
+ def list_containers_iter(self, limit, marker, prefix, delimiter):
+ """
+ Get a list of containerss sorted by name starting at marker onward, up
+ to limit entries. Entries will begin with the prefix and will not
+ have the delimiter after the prefix.
+ :param limit: maximum number of entries to get
+ :param marker: marker query
+ :param prefix: prefix query
+ :param delimeter: delimeter for query
+ :returns: list of tuples of (name, object_count, bytes_used, 0)
+ """
+ try:
+ self._commit_puts()
+ except LockTimeout:
+ if not self.stale_reads_ok:
+ raise
+ if delimiter and not prefix:
+ prefix = ''
+ orig_marker = marker
+ with self.get() as conn:
+ results = []
+ while len(results) < limit:
+ query = """
+ SELECT name, object_count, bytes_used, 0
+ FROM container
+ WHERE deleted = 0 AND """
+ query_args = []
+ if marker and marker >= prefix:
+ query += ' name > ? AND'
+ query_args.append(marker)
+ elif prefix:
+ query += ' name >= ? AND'
+ query_args.append(prefix)
+ query += ' +deleted = 0 ORDER BY name LIMIT ?'
+ query_args.append(limit - len(results))
+ curs = conn.execute(query, query_args)
+ curs.row_factory = None
+ if prefix is None:
+ return [r for r in curs]
+ if not delimiter:
+ return [r for r in curs if r[0].startswith(prefix)]
+ rowcount = 0
+ for row in curs:
+ rowcount += 1
+ marker = name = row[0]
+ if len(results) >= limit or not name.startswith(prefix):
+ curs.close()
+ return results
+ end = name.find(delimiter, len(prefix))
+ if end > 0:
+ marker = name[:end] + chr(ord(delimiter) + 1)
+ dir_name = name[:end + 1]
+ if dir_name != orig_marker:
+ results.append([dir_name, 0, 0, 1])
+ curs.close()
+ break
+ results.append(row)
+ if not rowcount:
+ break
+ return results
+ def merge_items(self, item_list, source=None):
+ """
+ Merge items into the container table.
+ :param item_list: list of dictionaries of {'name', 'put_timestamp',
+ 'delete_timestamp', 'object_count', 'bytes_used',
+ 'deleted'}
+ :param source: if defined, update incoming_sync with the source
+ """
+ with self.get() as conn:
+ max_rowid = -1
+ for rec in item_list:
+ record = [rec['name'], rec['put_timestamp'],
+ rec['delete_timestamp'], rec['object_count'],
+ rec['bytes_used'], rec['deleted']]
+ try:
+ conn.execute('''
+ INSERT INTO container (name, put_timestamp,
+ delete_timestamp, object_count, bytes_used,
+ deleted)
+ VALUES (?, ?, ?, ?, ?, ?)
+ ''', record)
+ except sqlite3.IntegrityError:
+ curs = conn.execute('''
+ SELECT name, put_timestamp, delete_timestamp,
+ object_count, bytes_used, deleted
+ FROM container WHERE name = ? AND
+ (put_timestamp < ? OR delete_timestamp < ? OR
+ object_count != ? OR bytes_used != ?)''',
+ (rec['name'], rec['put_timestamp'],
+ rec['delete_timestamp'], rec['object_count'],
+ rec['bytes_used']))
+ curs.row_factory = None
+ row = curs.fetchone()
+ if row:
+ row = list(row)
+ for i in xrange(5):
+ if record[i] is None and row[i] is not None:
+ record[i] = row[i]
+ if row[1] > record[1]: # Keep newest put_timestamp
+ record[1] = row[1]
+ if row[2] > record[2]: # Keep newest delete_timestamp
+ record[2] = row[2]
+ conn.execute('DELETE FROM container WHERE name = ?',
+ (record[0],))
+ # If deleted, mark as such
+ if record[2] > record[1] and \
+ record[3] in (None, '', 0, '0'):
+ record[5] = 1
+ else:
+ record[5] = 0
+ try:
+ conn.execute('''
+ INSERT INTO container (name, put_timestamp,
+ delete_timestamp, object_count, bytes_used,
+ deleted)
+ VALUES (?, ?, ?, ?, ?, ?)
+ ''', record)
+ except sqlite3.IntegrityError:
+ continue
+ if source:
+ max_rowid = max(max_rowid, rec['ROWID'])
+ if source:
+ try:
+ conn.execute('''
+ INSERT INTO incoming_sync (sync_point, remote_id)
+ VALUES (?, ?)
+ ''', (max_rowid, source))
+ except sqlite3.IntegrityError:
+ conn.execute('''
+ UPDATE incoming_sync SET sync_point=max(?, sync_point)
+ WHERE remote_id=?
+ ''', (max_rowid, source))
+ conn.commit()
diff --git a/swift/common/ b/swift/common/
new file mode 100644
index 000000000..b8df33038
--- /dev/null
+++ b/swift/common/
@@ -0,0 +1,526 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import with_statement
+import sys
+import os
+import random
+import math
+import time
+import shutil
+from eventlet import GreenPool, sleep, Timeout
+from import subprocess
+import simplejson
+from webob import Response
+from webob.exc import HTTPNotFound, HTTPNoContent, HTTPAccepted, \
+ HTTPInsufficientStorage, HTTPBadRequest
+from swift.common.utils import get_logger, whataremyips, storage_directory, \
+ renamer, mkdirs, lock_parent_directory, unlink_older_than, LoggerFileObject
+from swift.common import ring
+from swift.common.bufferedhttp import BufferedHTTPConnection
+from swift.common.exceptions import DriveNotMounted, ConnectionTimeout
+def quarantine_db(object_file, server_type):
+ """
+ In the case that a corrupt file is found, move it to a quarantined area to
+ allow replication to fix it.
+ :param object_file: path to corrupt file
+ :param server_type: type of file that is corrupt
+ ('container' or 'account')
+ """
+ object_dir = os.path.dirname(object_file)
+ quarantine_dir = os.path.abspath(os.path.join(object_dir, '..',
+ '..', '..', '..', 'quarantined', server_type + 's',
+ os.path.basename(object_dir)))
+ renamer(object_dir, quarantine_dir)
+class ReplConnection(BufferedHTTPConnection):
+ """
+ Helper to simplify POSTing to a remote server.
+ """
+ def __init__(self, node, partition, hash_, logger):
+ ""
+ self.logger = logger
+ self.node = node
+ BufferedHTTPConnection.__init__(self, '%(ip)s:%(port)s' % node)
+ self.path = '/%s/%s/%s' % (node['device'], partition, hash_)
+ def post(self, *args):
+ """
+ Make an HTTP POST request
+ :param args: list of json-encodable objects
+ :returns: httplib response object
+ """
+ try:
+ body = simplejson.dumps(args)
+ self.request('POST', self.path, body,
+ {'Content-Type': 'application/json'})
+ response = self.getresponse()
+ =
+ return response
+ except:
+ self.logger.exception(
+ 'ERROR reading HTTP response from %s' % self.node)
+ return None
+class Replicator(object):
+ """
+ Implements the logic for directing db replication.
+ """
+ def __init__(self, server_conf, replicator_conf):
+ self.logger = \
+ get_logger(replicator_conf, '%s-replicator' % self.server_type)
+ # log uncaught exceptions
+ sys.excepthook = lambda *exc_info: \
+ self.logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
+ sys.stdout = sys.stderr = LoggerFileObject(self.logger)
+ self.root = server_conf.get('devices', '/srv/node')
+ self.mount_check = server_conf.get('mount_check', 'true').lower() in \
+ ('true', 't', '1', 'on', 'yes', 'y')
+ self.port = int(server_conf.get('bind_port', self.default_port))
+ concurrency = int(replicator_conf.get('concurrency', 8))
+ self.cpool = GreenPool(size=concurrency)
+ swift_dir = server_conf.get('swift_dir', '/etc/swift')
+ self.ring = ring.Ring(os.path.join(swift_dir, self.ring_file))
+ self.per_diff = int(replicator_conf.get('per_diff', 1000))
+ self.run_pause = int(replicator_conf.get('run_pause', 30))
+ self.vm_test_mode = replicator_conf.get(
+ 'vm_test_mode', 'no').lower() in ('yes', 'true', 'on', '1')
+ self.node_timeout = int(replicator_conf.get('node_timeout', 10))
+ self.conn_timeout = float(replicator_conf.get('conn_timeout', 0.5))
+ self.reclaim_age = float(replicator_conf.get('reclaim_age', 86400 * 7))
+ self._zero_stats()
+ def _zero_stats(self):
+ """Zero out the stats."""
+ self.stats = {'attempted': 0, 'success': 0, 'failure': 0, 'ts_repl': 0,
+ 'no_change': 0, 'hashmatch': 0, 'rsync': 0, 'diff': 0,
+ 'remove': 0, 'empty': 0, 'remote_merge': 0,
+ 'start': time.time()}
+ def _report_stats(self):
+ """Report the current stats to the logs."""
+ 'Attempted to replicate %d dbs in %.5f seconds (%.5f/s)'
+ % (self.stats['attempted'], time.time() - self.stats['start'],
+ self.stats['attempted'] /
+ (time.time() - self.stats['start'] + 0.0000001)))
+'Removed %(remove)d dbs' % self.stats)
+'%(success)s successes, %(failure)s failures'
+ % self.stats)
+' '.join(['%s:%s' % item for item in
+ self.stats.items() if item[0] in
+ ('no_change', 'hashmatch', 'rsync', 'diff', 'ts_repl', 'empty')]))
+ def _rsync_file(self, db_file, remote_file, whole_file=True):
+ """
+ Sync a single file using rsync. Used by _rsync_db to handle syncing.
+ :param db_file: file to be synced
+ :param remote_file: remote location to sync the DB file to
+ :param whole-file: if True, uses rsync's --whole-file flag
+ :returns: True if the sync was successful, False otherwise
+ """
+ popen_args = ['rsync', '--quiet', '--no-motd',
+ '--timeout=%s' % int(math.ceil(self.node_timeout)),
+ '--contimeout=%s' % int(math.ceil(self.conn_timeout))]
+ if whole_file:
+ popen_args.append('--whole-file')
+ popen_args.extend([db_file, remote_file])
+ proc = subprocess.Popen(popen_args)
+ proc.communicate()
+ if proc.returncode != 0:
+ self.logger.error('ERROR rsync failed with %s: %s' %
+ (proc.returncode, popen_args))
+ return proc.returncode == 0
+ def _rsync_db(self, broker, device, http, local_id,
+ post_method='complete_rsync', post_timeout=None):
+ """
+ Sync a whole db using rsync.
+ :param broker: DB broker object of DB to be synced
+ :param device: device to sync to
+ :param http: ReplConnection object
+ :param local_id: unique ID of the local database replica
+ :param post_method: remote operation to perform after rsync
+ :param post_timeout: timeout to wait in seconds
+ """
+ if self.vm_test_mode:
+ remote_file = '%s::%s%s/%s/tmp/%s' % (device['ip'],
+ self.server_type, device['port'], device['device'],
+ local_id)
+ else:
+ remote_file = '%s::%s/%s/tmp/%s' % (device['ip'],
+ self.server_type, device['device'], local_id)
+ mtime = os.path.getmtime(broker.db_file)
+ if not self._rsync_file(broker.db_file, remote_file):
+ return False
+ # perform block-level sync if the db was modified during the first sync
+ if os.path.exists(broker.db_file + '-journal') or \
+ os.path.getmtime(broker.db_file) > mtime:
+ # grab a lock so nobody else can modify it
+ with broker.lock():
+ if not self._rsync_file(broker.db_file, remote_file, False):
+ return False
+ with Timeout(post_timeout or self.node_timeout):
+ response =, local_id)
+ return response and response.status >= 200 and response.status < 300
+ def _usync_db(self, point, broker, http, remote_id, local_id):
+ """
+ Sync a db by sending all records since the last sync.
+ :param point: synchronization high water mark between the replicas
+ :param broker: database broker object
+ :param http: ReplConnection object for the remote server
+ :param remote_id: database id for the remote replica
+ :param local_id: database id for the local replica
+ :returns: boolean indicating completion and success
+ """
+ self.stats['diff'] += 1
+ self.logger.debug('Syncing chunks with %s',
+ sync_table = broker.get_syncs()
+ objects = broker.get_items_since(point, self.per_diff)
+ while len(objects):
+ with Timeout(self.node_timeout):
+ response ='merge_items', objects, local_id)
+ if not response or response.status >= 300 or response.status < 200:
+ if response:
+ self.logger.error('ERROR Bad response %s from %s' %
+ (response.status,
+ return False
+ point = objects[-1]['ROWID']
+ objects = broker.get_items_since(point, self.per_diff)
+ with Timeout(self.node_timeout):
+ response ='merge_syncs', sync_table)
+ if response and response.status >= 200 and response.status < 300:
+ broker.merge_syncs([{'remote_id': remote_id,
+ 'sync_point': point}], incoming=False)
+ return True
+ return False
+ def _in_sync(self, rinfo, info, broker, local_sync):
+ """
+ Determine whether or not two replicas of a databases are considered
+ to be in sync.
+ :param rinfo: remote database info
+ :param info: local database info
+ :param broker: database broker object
+ :param local_sync: cached last sync point between replicas
+ :returns: boolean indicating whether or not the replicas are in sync
+ """
+ if max(rinfo['point'], local_sync) >= info['max_row']:
+ self.stats['no_change'] += 1
+ return True
+ if rinfo['hash'] == info['hash']:
+ self.stats['hashmatch'] += 1
+ broker.merge_syncs([{'remote_id': rinfo['id'],
+ 'sync_point': rinfo['point']}], incoming=False)
+ return True
+ def _http_connect(self, node, partition, db_file):
+ """
+ Make an http_connection using ReplConnection
+ :param node: node dictionary from the ring
+ :param partition: partition partition to send in the url
+ :param db_file: DB file
+ :returns: ReplConnection object
+ """
+ return ReplConnection(node, partition,
+ os.path.basename(db_file).split('.', 1)[0], self.logger)
+ def _repl_to_node(self, node, broker, partition, info):
+ """
+ Replicate a database to a node.
+ :param node: node dictionary from the ring to be replicated to
+ :param broker: DB broker for the DB to be replication
+ :param partition: partition on the node to replicate to
+ :param info: DB info as a dictionary of {'max_row', 'hash', 'id',
+ 'created_at', 'put_timestamp', 'delete_timestamp'}
+ :returns: True if successful, False otherwise
+ """
+ with ConnectionTimeout(self.conn_timeout):
+ http = self._http_connect(node, partition, broker.db_file)
+ if not http:
+ self.logger.error(
+ 'ERROR Unable to connect to remote server: %s' % node)
+ return False
+ with Timeout(self.node_timeout):
+ response ='sync', info['max_row'], info['hash'],
+ info['id'], info['created_at'], info['put_timestamp'],
+ info['delete_timestamp'])
+ if not response:
+ return False
+ elif response.status == HTTPNotFound.code: # completely missing, rsync
+ self.stats['rsync'] += 1
+ return self._rsync_db(broker, node, http, info['id'])
+ elif response.status == HTTPInsufficientStorage.code:
+ raise DriveNotMounted()
+ elif response.status >= 200 and response.status < 300:
+ rinfo = simplejson.loads(
+ local_sync = broker.get_sync(rinfo['id'], incoming=False)
+ if self._in_sync(rinfo, info, broker, local_sync):
+ return True
+ # if the difference in rowids between the two differs by
+ # more than 50%, rsync then do a remote merge.
+ if rinfo['max_row'] / float(info['max_row']) < 0.5:
+ self.stats['remote_merge'] += 1
+ return self._rsync_db(broker, node, http, info['id'],
+ post_method='rsync_then_merge',
+ post_timeout=(info['count'] / 2000))
+ # else send diffs over to the remote server
+ return self._usync_db(max(rinfo['point'], local_sync),
+ broker, http, rinfo['id'], info['id'])
+ def _replicate_object(self, partition, object_file, node_id):
+ """
+ Replicate the db, choosing method based on whether or not it
+ already exists on peers.
+ :param partition: partition to be replicated to
+ :param object_file: DB file name to be replicated
+ :param node_id: node id of the node to be replicated to
+ """
+ self.logger.debug('Replicating db %s' % object_file)
+ self.stats['attempted'] += 1
+ try:
+ broker = self.brokerclass(object_file, pending_timeout=30)
+ broker.reclaim(time.time() - self.reclaim_age,
+ time.time() - (self.reclaim_age * 2))
+ info = broker.get_replication_info()
+ except Exception, e:
+ if 'no such table' in str(e):
+ self.logger.error('Quarantining DB %s' % object_file)
+ quarantine_db(broker.db_file, broker.db_type)
+ else:
+ self.logger.exception('ERROR reading db %s' % object_file)
+ self.stats['failure'] += 1
+ return
+ # The db is considered deleted if the delete_timestamp value is greater
+ # than the put_timestamp, and there are no objects.
+ delete_timestamp = 0
+ try:
+ delete_timestamp = float(info['delete_timestamp'])
+ except ValueError:
+ pass
+ put_timestamp = 0
+ try:
+ put_timestamp = float(info['put_timestamp'])
+ except ValueError:
+ pass
+ if delete_timestamp < (time.time() - self.reclaim_age) and \
+ delete_timestamp > put_timestamp and \
+ info['count'] in (None, '', 0, '0'):
+ with lock_parent_directory(object_file):
+ shutil.rmtree(os.path.dirname(object_file), True)
+ self.stats['remove'] += 1
+ return
+ responses = []
+ nodes = self.ring.get_part_nodes(int(partition))
+ shouldbehere = bool([n for n in nodes if n['id'] == node_id])
+ repl_nodes = [n for n in nodes if n['id'] != node_id]
+ more_nodes = self.ring.get_more_nodes(int(partition))
+ for node in repl_nodes:
+ success = False
+ try:
+ success = self._repl_to_node(node, broker, partition, info)
+ except DriveNotMounted:
+ repl_nodes.append(
+ self.logger.error('ERROR Remote drive not mounted %s' % node)
+ except:
+ self.logger.exception('ERROR syncing %s with node %s' %
+ (object_file, node))
+ self.stats['success' if success else 'failure'] += 1
+ responses.append(success)
+ if not shouldbehere and all(responses):
+ # If the db shouldn't be on this node and has been successfully
+ # synced to all of its peers, it can be removed.
+ with lock_parent_directory(object_file):
+ shutil.rmtree(os.path.dirname(object_file), True)
+ self.stats['remove'] += 1
+ def roundrobin_datadirs(self, datadirs):
+ """
+ Generator to walk the data dirs in a round robin manner, evenly
+ hitting each device on the system.
+ :param datadirs: a list of paths to walk
+ """
+ def walk_datadir(datadir, node_id):
+ partitions = os.listdir(datadir)
+ random.shuffle(partitions)
+ for partition in partitions:
+ part_dir = os.path.join(datadir, partition)
+ for root, dirs, files in os.walk(part_dir, topdown=False):
+ for fname in (f for f in files if f.endswith('.db')):
+ object_file = os.path.join(root, fname)
+ yield (partition, object_file, node_id)
+ its = [walk_datadir(datadir, node_id) for datadir, node_id in datadirs]
+ while its:
+ for it in its:
+ try:
+ yield
+ except StopIteration:
+ its.remove(it)
+ def replicate_once(self):
+ """Run a replication pass once."""
+ self._zero_stats()
+ dirs = []
+ ips = whataremyips()
+ if not ips:
+ self.logger.error('ERROR Failed to get my own IPs?')
+ return
+ for node in self.ring.devs:
+ if node and node['ip'] in ips and node['port'] == self.port:
+ if self.mount_check and not os.path.ismount(
+ os.path.join(self.root, node['device'])):
+ self.logger.warn(
+ 'Skipping %(device)s as it is not mounted' % node)
+ continue
+ unlink_older_than(
+ os.path.join(self.root, node['device'], 'tmp'),
+ time.time() - self.reclaim_age)
+ datadir = os.path.join(self.root, node['device'], self.datadir)
+ if os.path.isdir(datadir):
+ dirs.append((datadir, node['id']))
+'Beginning replication run')
+ for part, object_file, node_id in self.roundrobin_datadirs(dirs):
+ self.cpool.spawn_n(
+ self._replicate_object, part, object_file, node_id)
+ self.cpool.waitall()
+'Replication run OVER')
+ self._report_stats()
+ def replicate_forever(self):
+ """
+ Replicate dbs under the given root in an infinite loop.
+ """
+ while True:
+ try:
+ self.replicate_once()
+ except:
+ self.logger.exception('ERROR trying to replicate')
+ sleep(self.run_pause)
+class ReplicatorRpc(object):
+ """Handle Replication RPC calls. TODO: redbo document please :)"""
+ def __init__(self, root, datadir, broker_class, mount_check=True):
+ self.root = root
+ self.datadir = datadir
+ self.broker_class = broker_class
+ self.mount_check = mount_check
+ def dispatch(self, post_args, args):
+ if not hasattr(args, 'pop'):
+ return HTTPBadRequest(body='Invalid object type')
+ op = args.pop(0)
+ drive, partition, hsh = post_args
+ if self.mount_check and \
+ not os.path.ismount(os.path.join(self.root, drive)):
+ return Response(status='507 %s is not mounted' % drive)
+ db_file = os.path.join(self.root, drive,
+ storage_directory(self.datadir, partition, hsh), hsh + '.db')
+ if op == 'rsync_then_merge':
+ return self.rsync_then_merge(drive, db_file, args)
+ if op == 'complete_rsync':
+ return self.complete_rsync(drive, db_file, args)
+ else:
+ # someone might be about to rsync a db to us,
+ # make sure there's a tmp dir to receive it.
+ mkdirs(os.path.join(self.root, drive, 'tmp'))
+ if not os.path.exists(db_file):
+ return HTTPNotFound()
+ return getattr(self, op)(self.broker_class(db_file), args)
+ def sync(self, broker, args):
+ (remote_sync, hash_, id_, created_at, put_timestamp,
+ delete_timestamp) = args
+ try:
+ info = broker.get_replication_info()
+ except Exception, e:
+ if 'no such table' in str(e):
+ # TODO find a real logger
+ print "Quarantining DB %s" % broker.db_file
+ quarantine_db(broker.db_file, broker.db_type)
+ return HTTPNotFound()
+ raise
+ if info['put_timestamp'] != put_timestamp or \
+ info['created_at'] != created_at or \
+ info['delete_timestamp'] != delete_timestamp:
+ broker.merge_timestamps(
+ created_at, put_timestamp, delete_timestamp)
+ info['point'] = broker.get_sync(id_)
+ if hash_ == info['hash'] and info['point'] < remote_sync:
+ broker.merge_syncs([{'remote_id': id_,
+ 'sync_point': remote_sync}])
+ info['point'] = remote_sync
+ return Response(simplejson.dumps(info))
+ def merge_syncs(self, broker, args):
+ broker.merge_syncs(args[0])
+ return HTTPAccepted()
+ def merge_items(self, broker, args):
+ broker.merge_items(args[0], args[1])
+ return HTTPAccepted()
+ def complete_rsync(self, drive, db_file, args):
+ old_filename = os.path.join(self.root, drive, 'tmp', args[0])
+ if os.path.exists(db_file):
+ return HTTPNotFound()
+ if not os.path.exists(old_filename):
+ return HTTPNotFound()
+ broker = self.broker_class(old_filename)
+ broker.newid(args[0])
+ renamer(old_filename, db_file)
+ return HTTPNoContent()
+ def rsync_then_merge(self, drive, db_file, args):
+ old_filename = os.path.join(self.root, drive, 'tmp', args[0])
+ if not os.path.exists(db_file) or not os.path.exists(old_filename):
+ return HTTPNotFound()
+ new_broker = self.broker_class(old_filename)
+ existing_broker = self.broker_class(db_file)
+ point = -1
+ objects = existing_broker.get_items_since(point, 1000)
+ while len(objects):
+ new_broker.merge_items(objects)
+ point = objects[-1]['ROWID']
+ objects = existing_broker.get_items_since(point, 1000)
+ sleep()
+ new_broker.newid(args[0])
+ renamer(old_filename, db_file)
+ return HTTPNoContent()
diff --git a/swift/common/ b/swift/common/
new file mode 100644
index 000000000..1d7030c09
--- /dev/null
+++ b/swift/common/
@@ -0,0 +1,303 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+Internal client library for making calls directly to the servers rather than
+through the proxy.
+import socket
+from httplib import HTTPException
+from time import time
+from urllib import quote as _quote, unquote
+from eventlet import sleep, Timeout
+from swift.common.bufferedhttp import http_connect
+from swift.common.client import ClientException, json_loads
+from swift.common.utils import normalize_timestamp
+def quote(value, safe='/'):
+ if isinstance(value, unicode):
+ value = value.encode('utf8')
+ return _quote(value, safe)
+def direct_head_container(node, part, account, container, conn_timeout=5,
+ response_timeout=15):
+ """
+ Request container information directly from the container server.
+ :param node: node dictionary from the ring
+ :param part: partition the container is on
+ :param account: account name
+ :param container: container name
+ :param conn_timeout: timeout in seconds for establishing the connection
+ :param response_timeout: timeout in seconds for getting the response
+ :returns: tuple of (object count, bytes used)
+ """
+ path = '/%s/%s' % (account, container)
+ with Timeout(conn_timeout):
+ conn = http_connect(node['ip'], node['port'], node['device'], part,
+ 'HEAD', path)
+ with Timeout(response_timeout):
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException(
+ 'Container server %s:%s direct HEAD %s gave status %s' %
+ (node['ip'], node['port'],
+ repr('/%s/%s%s' % (node['device'], part, path)),
+ resp.status),
+ http_host=node['ip'], http_port=node['port'],
+ http_device=node['device'], http_status=resp.status,
+ http_reason=resp.reason)
+ return int(resp.getheader('x-container-object-count')), \
+ int(resp.getheader('x-container-bytes-used'))
+def direct_get_container(node, part, account, container, marker=None,
+ limit=None, prefix=None, delimiter=None,
+ conn_timeout=5, response_timeout=15):
+ """
+ Get container listings directly from the container server.
+ :param node: node dictionary from the ring
+ :param part: partition the container is on
+ :param account: account name
+ :param container: container name
+ :param marker: marker query
+ :param limit: query limit
+ :param prefix: prefix query
+ :param delimeter: delimeter for the query
+ :param conn_timeout: timeout in seconds for establishing the connection
+ :param response_timeout: timeout in seconds for getting the response
+ :returns: list of objects
+ """
+ path = '/%s/%s' % (account, container)
+ qs = 'format=json'
+ if marker:
+ qs += '&marker=%s' % quote(marker)
+ if limit:
+ qs += '&limit=%d' % limit
+ if prefix:
+ qs += '&prefix=%s' % quote(prefix)
+ if delimiter:
+ qs += '&delimiter=%s' % quote(delimiter)
+ with Timeout(conn_timeout):
+ conn = http_connect(node['ip'], node['port'], node['device'], part,
+ 'GET', path, query_string='format=json')
+ with Timeout(response_timeout):
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException(
+ 'Container server %s:%s direct GET %s gave stats %s' % (node['ip'],
+ node['port'], repr('/%s/%s%s' % (node['device'], part, path)),
+ resp.status),
+ http_host=node['ip'], http_port=node['port'],
+ http_device=node['device'], http_status=resp.status,
+ http_reason=resp.reason)
+ if resp.status == 204:
+ return []
+ return json_loads(
+def direct_delete_container(node, part, account, container, conn_timeout=5,
+ response_timeout=15, headers={}):
+ path = '/%s/%s' % (account, container)
+ headers['X-Timestamp'] = normalize_timestamp(time())
+ with Timeout(conn_timeout):
+ conn = http_connect(node['ip'], node['port'], node['device'], part,
+ 'DELETE', path, headers)
+ with Timeout(response_timeout):
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException(
+ 'Container server %s:%s direct DELETE %s gave status %s' %
+ (node['ip'], node['port'],
+ repr('/%s/%s%s' % (node['device'], part, path)),
+ resp.status),
+ http_host=node['ip'], http_port=node['port'],
+ http_device=node['device'], http_status=resp.status,
+ http_reason=resp.reason)
+ return resp
+def direct_head_object(node, part, account, container, obj, conn_timeout=5,
+ response_timeout=15):
+ """
+ Request object information directly from the object server.
+ :param node: node dictionary from the ring
+ :param part: partition the container is on
+ :param account: account name
+ :param container: container name
+ :param obj: object name
+ :param conn_timeout: timeout in seconds for establishing the connection
+ :param response_timeout: timeout in seconds for getting the response
+ :returns: tuple of (content-type, object size, last modified timestamp,
+ etag, metadata dictionary)
+ """
+ path = '/%s/%s/%s' % (account, container, obj)
+ with Timeout(conn_timeout):
+ conn = http_connect(node['ip'], node['port'], node['device'], part,
+ 'HEAD', path)
+ with Timeout(response_timeout):
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException(
+ 'Object server %s:%s direct HEAD %s gave status %s' %
+ (node['ip'], node['port'],
+ repr('/%s/%s%s' % (node['device'], part, path)),
+ resp.status),
+ http_host=node['ip'], http_port=node['port'],
+ http_device=node['device'], http_status=resp.status,
+ http_reason=resp.reason)
+ metadata = {}
+ for key, value in resp.getheaders():
+ if key.lower().startswith('x-object-meta-'):
+ metadata[unquote(key[len('x-object-meta-'):])] = unquote(value)
+ return resp.getheader('content-type'), \
+ int(resp.getheader('content-length')), \
+ resp.getheader('last-modified'), \
+ resp.getheader('etag').strip('"'), \
+ metadata
+def direct_get_object(node, part, account, container, obj, conn_timeout=5,
+ response_timeout=15):
+ """
+ Get object directly from the object server.
+ :param node: node dictionary from the ring
+ :param part: partition the container is on
+ :param account: account name
+ :param container: container name
+ :param obj: object name
+ :param conn_timeout: timeout in seconds for establishing the connection
+ :param response_timeout: timeout in seconds for getting the response
+ :returns: object
+ """
+ path = '/%s/%s/%s' % (account, container, obj)
+ with Timeout(conn_timeout):
+ conn = http_connect(node['ip'], node['port'], node['device'], part,
+ 'GET', path)
+ with Timeout(response_timeout):
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException(
+ 'Object server %s:%s direct GET %s gave status %s' %
+ (node['ip'], node['port'],
+ repr('/%s/%s%s' % (node['device'], part, path)),
+ resp.status),
+ http_host=node['ip'], http_port=node['port'],
+ http_device=node['device'], http_status=resp.status,
+ http_reason=resp.reason)
+ metadata = {}
+ for key, value in resp.getheaders():
+ if key.lower().startswith('x-object-meta-'):
+ metadata[unquote(key[len('x-object-meta-'):])] = unquote(value)
+ return (resp.getheader('content-type'),
+ int(resp.getheader('content-length')),
+ resp.getheader('last-modified'),
+ resp.getheader('etag').strip('"'),
+ metadata,
+def direct_delete_object(node, part, account, container, obj,
+ conn_timeout=5, response_timeout=15, headers={}):
+ """
+ Delete object directly from the object server.
+ :param node: node dictionary from the ring
+ :param part: partition the container is on
+ :param account: account name
+ :param container: container name
+ :param obj: object name
+ :param conn_timeout: timeout in seconds for establishing the connection
+ :param response_timeout: timeout in seconds for getting the response
+ :returns: response from server
+ """
+ path = '/%s/%s/%s' % (account, container, obj)
+ headers['X-Timestamp'] = normalize_timestamp(time())
+ with Timeout(conn_timeout):
+ conn = http_connect(node['ip'], node['port'], node['device'], part,
+ 'DELETE', path, headers)
+ with Timeout(response_timeout):
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException(
+ 'Object server %s:%s direct DELETE %s gave status %s' %
+ (node['ip'], node['port'],
+ repr('/%s/%s%s' % (node['device'], part, path)),
+ resp.status),
+ http_host=node['ip'], http_port=node['port'],
+ http_device=node['device'], http_status=resp.status,
+ http_reason=resp.reason)
+ return resp
+def retry(func, *args, **kwargs):
+ """
+ Helper function to retry a given function a number of times.
+ :param func: callable to be called
+ :param retries: number of retries
+ :param error_log: logger for errors
+ :param args: arguments to send to func
+ :param kwargs: keyward arguments to send to func (if retries or
+ error_log are sent, they will be deleted from kwargs
+ before sending on to func)
+ :returns: restult of func
+ """
+ retries = 5
+ if 'retries' in kwargs:
+ retries = kwargs['retries']
+ del kwargs['retries']
+ error_log = None
+ if 'error_log' in kwargs:
+ error_log = kwargs['error_log']
+ del kwargs['error_log']
+ attempts = 0
+ backoff = 1
+ while attempts <= retries:
+ attempts += 1
+ try:
+ return attempts, func(*args, **kwargs)
+ except (socket.error, HTTPException, Timeout), err:
+ if error_log:
+ error_log(err)
+ if attempts > retries:
+ raise
+ except ClientException, err:
+ if error_log:
+ error_log(err)
+ if attempts > retries or err.http_status < 500 or \
+ err.http_status == 507 or err.http_status > 599:
+ raise
+ sleep(backoff)
+ backoff *= 2
+ # Shouldn't actually get down here, but just in case.
+ if args and 'ip' in args[0]:
+ raise ClientException('Raise too many retries',
+ http_host=args[0]['ip'], http_port=args[0]['port'],
+ http_device=args[0]['device'])
+ else:
+ raise ClientException('Raise too many retries')
diff --git a/swift/common/ b/swift/common/
new file mode 100644
index 000000000..bf1b07cae
--- /dev/null
+++ b/swift/common/
@@ -0,0 +1,35 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from eventlet import TimeoutError
+class MessageTimeout(TimeoutError):
+ def __init__(self, seconds=None, msg=None):
+ TimeoutError.__init__(self, seconds=seconds)
+ self.msg = msg
+ def __str__(self):
+ return '%s: %s' % (TimeoutError.__str__(self), self.msg)
+class AuditException(Exception): pass
+class AuthException(Exception): pass
+class ChunkReadTimeout(TimeoutError): pass
+class ChunkWriteTimeout(TimeoutError): pass
+class ConnectionTimeout(TimeoutError): pass
+class DriveNotMounted(Exception): pass
+class LockTimeout(MessageTimeout): pass
diff --git a/swift/common/ b/swift/common/
new file mode 100644
index 000000000..a483eaffd
--- /dev/null
+++ b/swift/common/
@@ -0,0 +1,28 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from webob import Response
+class HealthCheckController(object):
+ """Basic controller used for monitoring."""
+ def __init__(self, *args, **kwargs):
+ pass
+ @classmethod
+ def GET(self, req):
+ return Response(request=req, body="OK", content_type="text/plain")
+healthcheck = HealthCheckController.GET
diff --git a/swift/common/ b/swift/common/
new file mode 100644
index 000000000..e232e404f
--- /dev/null
+++ b/swift/common/
@@ -0,0 +1,272 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+Lucid comes with memcached: v1.4.2. Protocol documentation for that
+version is at:
+import cPickle as pickle
+import logging
+import socket
+import time
+from bisect import bisect
+from hashlib import md5
+# if ERROR_LIMIT_COUNT errors occur in ERROR_LIMIT_TIME seconds, the server
+# will be considered failed for ERROR_LIMIT_DURATION seconds.
+def md5hash(key):
+ return md5(key).hexdigest()
+class MemcacheRing(object):
+ """
+ Simple, consistent-hashed memcache client.
+ """
+ def __init__(self, servers, connect_timeout=CONN_TIMEOUT,
+ io_timeout=IO_TIMEOUT, tries=TRY_COUNT):
+ self._ring = {}
+ self._errors = dict(((serv, []) for serv in servers))
+ self._error_limited = dict(((serv, 0) for serv in servers))
+ for server in sorted(servers):
+ for i in xrange(NODE_WEIGHT):
+ self._ring[md5hash('%s-%s' % (server, i))] = server
+ self._tries = tries if tries <= len(servers) else len(servers)
+ self._sorted = sorted(self._ring.keys())
+ self._client_cache = dict(((server, []) for server in servers))
+ self._connect_timeout = connect_timeout
+ self._io_timeout = io_timeout
+ def _exception_occurred(self, server, e, action='talking'):
+ if isinstance(e, socket.timeout):
+ logging.error("Timeout %s to memcached: %s" % (action, server))
+ else:
+ logging.exception("Error %s to memcached: %s" % (action, server))
+ now = time.time()
+ self._errors[server].append(time.time())
+ if len(self._errors[server]) > ERROR_LIMIT_COUNT:
+ self._errors[server] = [err for err in self._errors[server]
+ if err > now - ERROR_LIMIT_TIME]
+ if len(self._errors[server]) > ERROR_LIMIT_COUNT:
+ self._error_limited[server] = now + ERROR_LIMIT_DURATION
+ logging.error('Error limiting server %s' % server)
+ def _get_conns(self, key):
+ """
+ Retrieves a server conn from the pool, or connects a new one.
+ Chooses the server based on a consistent hash of "key".
+ """
+ pos = bisect(self._sorted, key)
+ served = []
+ while len(served) < self._tries:
+ pos = (pos + 1) % len(self._sorted)
+ server = self._ring[self._sorted[pos]]
+ if server in served:
+ continue
+ served.append(server)
+ if self._error_limited[server] > time.time():
+ continue
+ try:
+ fp, sock = self._client_cache[server].pop()
+ yield server, fp, sock
+ except IndexError:
+ try:
+ host, port = server.split(':')
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+ sock.settimeout(self._connect_timeout)
+ sock.connect((host, int(port)))
+ sock.settimeout(self._io_timeout)
+ yield server, sock.makefile(), sock
+ except Exception, e:
+ self._exception_occurred(server, e, 'connecting')
+ def _return_conn(self, server, fp, sock):
+ """ Returns a server connection to the pool """
+ self._client_cache[server].append((fp, sock))
+ def set(self, key, value, serialize=True, timeout=0):
+ """
+ Set a key/value pair in memcache
+ :param key: key
+ :param value: value
+ :param serialize: if True, value is pickled before sending to memcache
+ :param timeout: ttl in memcache
+ """
+ key = md5hash(key)
+ if timeout > 0:
+ timeout += time.time()
+ flags = 0
+ if serialize:
+ value = pickle.dumps(value, PICKLE_PROTOCOL)
+ flags |= PICKLE_FLAG
+ for (server, fp, sock) in self._get_conns(key):
+ try:
+ sock.sendall('set %s %d %d %s noreply\r\n%s\r\n' % \
+ (key, flags, timeout, len(value), value))
+ self._return_conn(server, fp, sock)
+ return
+ except Exception, e:
+ self._exception_occurred(server, e)
+ def get(self, key):
+ """
+ Gets the object specified by key. It will also unpickle the object
+ before returning if it is pickled in memcache.
+ :param key: key
+ :returns: value of the key in memcache
+ """
+ key = md5hash(key)
+ value = None
+ for (server, fp, sock) in self._get_conns(key):
+ try:
+ sock.sendall('get %s\r\n' % key)
+ line = fp.readline().strip().split()
+ while line[0].upper() != 'END':
+ if line[0].upper() == 'VALUE' and line[1] == key:
+ size = int(line[3])
+ value =
+ if int(line[2]) & PICKLE_FLAG:
+ value = pickle.loads(value)
+ fp.readline()
+ line = fp.readline().strip().split()
+ self._return_conn(server, fp, sock)
+ return value
+ except Exception, e:
+ self._exception_occurred(server, e)
+ def incr(self, key, delta=1, timeout=0):
+ """
+ Increments a key which has a numeric value by delta.
+ If the key can't be found, it's added as delta.
+ :param key: key
+ :param delta: amount to add to the value of key (or set as the value
+ if the key is not found)
+ :param timeout: ttl in memcache
+ """
+ key = md5hash(key)
+ for (server, fp, sock) in self._get_conns(key):
+ try:
+ sock.sendall('incr %s %s\r\n' % (key, delta))
+ line = fp.readline().strip().split()
+ if line[0].upper() == 'NOT_FOUND':
+ line[0] = str(delta)
+ sock.sendall('add %s %d %d %s noreply\r\n%s\r\n' % \
+ (key, 0, timeout, len(line[0]), line[0]))
+ ret = int(line[0].strip())
+ self._return_conn(server, fp, sock)
+ return ret
+ except Exception, e:
+ self._exception_occurred(server, e)
+ def delete(self, key):
+ """
+ Deletes a key/value pair from memcache.
+ :param key: key to be deleted
+ """
+ key = md5hash(key)
+ for (server, fp, sock) in self._get_conns(key):
+ try:
+ sock.sendall('delete %s noreply\r\n' % key)
+ self._return_conn(server, fp, sock)
+ return
+ except Exception, e:
+ self._exception_occurred(server, e)
+ def set_multi(self, mapping, server_key, serialize=True, timeout=0):
+ """
+ Sets multiple key/value pairs in memcache.
+ :param mapping: dictonary of keys and values to be set in memcache
+ :param servery_key: key to use in determining which server in the ring
+ is used
+ :param serialize: if True, value is pickled before sending to memcache
+ :param timeout: ttl for memcache
+ """
+ server_key = md5hash(server_key)
+ if timeout > 0:
+ timeout += time.time()
+ msg = ''
+ for key, value in mapping.iteritems():
+ key = md5hash(key)
+ flags = 0
+ if serialize:
+ value = pickle.dumps(value, PICKLE_PROTOCOL)
+ flags |= PICKLE_FLAG
+ msg += ('set %s %d %d %s noreply\r\n%s\r\n' %
+ (key, flags, timeout, len(value), value))
+ for (server, fp, sock) in self._get_conns(server_key):
+ try:
+ sock.sendall(msg)
+ self._return_conn(server, fp, sock)
+ return
+ except Exception, e:
+ self._exception_occurred(server, e)
+ def get_multi(self, keys, server_key):
+ """
+ Gets multiple values from memcache for the given keys.
+ :param keys: keys for values to be retrieved from memcache
+ :param servery_key: key to use in determining which server in the ring
+ is used
+ :returns: list of values
+ """
+ server_key = md5hash(server_key)
+ keys = [md5hash(key) for key in keys]
+ for (server, fp, sock) in self._get_conns(server_key):
+ try:
+ sock.sendall('get %s\r\n' % ' '.join(keys))
+ line = fp.readline().strip().split()
+ responses = {}
+ while line[0].upper() != 'END':
+ if line[0].upper() == 'VALUE':
+ size = int(line[3])
+ value =
+ if int(line[2]) & PICKLE_FLAG:
+ value = pickle.loads(value)
+ responses[line[1]] = value
+ fp.readline()
+ line = fp.readline().strip().split()
+ values = []
+ for key in keys:
+ if key in responses:
+ values.append(responses[key])
+ else:
+ values.append(None)
+ self._return_conn(server, fp, sock)
+ return values
+ except Exception, e:
+ self._exception_occurred(server, e)
diff --git a/swift/common/ b/swift/common/
new file mode 100644
index 000000000..c2c091ce7
--- /dev/null
+++ b/swift/common/
@@ -0,0 +1,490 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Miscellaneous utility functions for use with Swift."""
+import errno
+import fcntl
+import os
+import pwd
+import signal
+import sys
+import time
+import mimetools
+from hashlib import md5
+from random import shuffle
+from urllib import quote
+from contextlib import contextmanager
+import ctypes
+import ctypes.util
+import fcntl
+import struct
+import eventlet
+from eventlet import greenio, GreenPool, sleep, Timeout, listen
+from import socket, subprocess, ssl, thread, threading
+from swift.common.exceptions import LockTimeout, MessageTimeout
+# logging doesn't import patched as cleanly as one would like
+from logging.handlers import SysLogHandler
+import logging
+logging.thread =
+logging.threading =
+logging._lock = logging.threading.RLock()
+libc = ctypes.CDLL(ctypes.util.find_library('c'))
+sys_fallocate = libc.fallocate
+posix_fadvise = libc.posix_fadvise
+# Used by hash_path to offer a bit more security when generating hashes for
+# paths. It simply appends this value to all paths; guessing the hash a path
+# will end up with would also require knowing this suffix.
+HASH_PATH_SUFFIX = os.environ.get('SWIFT_HASH_PATH_SUFFIX', 'endcap')
+def get_param(req, name, default=None):
+ """
+ Get parameters from an HTTP request ensuring proper handling UTF-8
+ encoding.
+ :param req: Webob request object
+ :param name: parameter name
+ :param default: result to return if the parameter is not found
+ :returns: HTTP request parameter value
+ """
+ value = req.str_params.get(name, default)
+ if value:
+ value.decode('utf8') # Ensure UTF8ness
+ return value
+def fallocate(fd, size):
+ """
+ Pre-allocate disk space for a file file.
+ :param fd: file descriptor
+ :param size: size to allocate (in bytes)
+ """
+ if size > 0:
+ # 1 means "FALLOC_FL_KEEP_SIZE", which means it pre-allocates invisibly
+ ret = sys_fallocate(fd, 1, 0, ctypes.c_uint64(size))
+ # XXX: in (not very thorough) testing, errno always seems to be 0?
+ err = ctypes.get_errno()
+ if ret and err not in (0, errno.ENOSYS):
+ raise OSError(err, 'Unable to fallocate(%s)' % size)
+def drop_buffer_cache(fd, offset, length):
+ """
+ Drop 'buffer' cache for the given range of the given file.
+ :param fd: file descriptor
+ :param offset: start offset
+ :param length: length
+ """
+ ret = posix_fadvise(fd, ctypes.c_uint64(offset), ctypes.c_uint64(length), 4)
+ if ret != 0:
+ print "posix_fadvise(%s, %s, %s, 4) -> %s" % (fd, offset, length, ret)
+def normalize_timestamp(timestamp):
+ """
+ Format a timestamp (string or numeric) into a standardized
+ xxxxxxxxxx.xxxxx format.
+ :param timestamp: unix timestamp
+ :returns: normalized timestamp as a string
+ """
+ return "%016.05f" % (float(timestamp))
+def mkdirs(path):
+ """
+ Ensures the path is a directory or makes it if not. Errors if the path
+ exists but is a file or on permissions failure.
+ :param path: path to create
+ """
+ if not os.path.isdir(path):
+ try:
+ os.makedirs(path)
+ except OSError, err:
+ if err.errno != errno.EEXIST or not os.path.isdir(path):
+ raise
+def renamer(old, new): # pragma: no cover
+ """
+ Attempt to fix^H^H^Hhide race conditions like empty object directories
+ being removed by backend processes during uploads, by retrying.
+ :param old: old path to be renamed
+ :param new: new path to be renamed to
+ """
+ try:
+ mkdirs(os.path.dirname(new))
+ os.rename(old, new)
+ except OSError:
+ mkdirs(os.path.dirname(new))
+ os.rename(old, new)
+def split_path(path, minsegs=1, maxsegs=None, rest_with_last=False):
+ """
+ Validate and split the given HTTP request path.
+ **Examples**::
+ ['a'] = split_path('/a')
+ ['a', None] = split_path('/a', 1, 2)
+ ['a', 'c'] = split_path('/a/c', 1, 2)
+ ['a', 'c', 'o/r'] = split_path('/a/c/o/r', 1, 3, True)
+ :param path: HTTP Request path to be split
+ :param minsegs: Minimum number of segments to be extracted
+ :param maxsegs: Maximum number of segments to be extracted
+ :param rest_with_last: If True, trailing data will be returned as part
+ of last segment. If False, and there is
+ trailing data, raises ValueError.
+ :returns: list of segments with a length of maxsegs (non-existant
+ segments will return as None)
+ """
+ if not maxsegs:
+ maxsegs = minsegs
+ if minsegs > maxsegs:
+ raise ValueError('minsegs > maxsegs: %d > %d' % (minsegs, maxsegs))
+ if rest_with_last:
+ segs = path.split('/', maxsegs)
+ minsegs += 1
+ maxsegs += 1
+ count = len(segs)
+ if segs[0] or count < minsegs or count > maxsegs or \
+ '' in segs[1:minsegs]:
+ raise ValueError('Invalid path: %s' % quote(path))
+ else:
+ minsegs += 1
+ maxsegs += 1
+ segs = path.split('/', maxsegs)
+ count = len(segs)
+ if segs[0] or count < minsegs or count > maxsegs + 1 or \
+ '' in segs[1:minsegs] or (count == maxsegs + 1 and segs[maxsegs]):
+ raise ValueError('Invalid path: %s' % quote(path))
+ segs = segs[1:maxsegs]
+ segs.extend([None] * (maxsegs - 1 - len(segs)))
+ return segs
+class NullLogger():
+ """A no-op logger for eventlet wsgi."""
+ def write(self, *args):
+ #"Logs" the args to nowhere
+ pass
+class LoggerFileObject(object):
+ def __init__(self, logger):
+ self.logger = logger
+ def write(self, value):
+ value = value.strip()
+ if value:
+ if 'Connection reset by peer' in value:
+ self.logger.error('STDOUT: Connection reset by peer')
+ else:
+ self.logger.error('STDOUT: %s' % value)
+ def writelines(self, values):
+ self.logger.error('STDOUT: %s' % '#012'.join(values))
+ def close(self):
+ pass
+ def flush(self):
+ pass
+ def __iter__(self):
+ return self
+ def next(self):
+ raise IOError(errno.EBADF, 'Bad file descriptor')
+ def read(self, size=-1):
+ raise IOError(errno.EBADF, 'Bad file descriptor')
+ def readline(self, size=-1):
+ raise IOError(errno.EBADF, 'Bad file descriptor')
+ def tell(self):
+ return 0
+ def xreadlines(self):
+ return self
+def drop_privileges(user):
+ """
+ Sets the userid of the current process
+ :param user: User id to change privileges to
+ """
+ user = pwd.getpwnam(user)
+ os.setgid(user[3])
+ os.setuid(user[2])
+class NamedLogger(object):
+ """Cheesy version of the LoggerAdapter available in Python 3"""
+ def __init__(self, logger, server):
+ self.logger = logger
+ self.server = server
+ for proxied_method in ('debug', 'info', 'log', 'warn', 'warning',
+ 'error', 'critical'):
+ setattr(self, proxied_method,
+ self._proxy(getattr(logger, proxied_method)))
+ def _proxy(self, logger_meth):
+ def _inner_proxy(msg, *args, **kwargs):
+ msg = '%s %s' % (self.server, msg)
+ logger_meth(msg, *args, **kwargs)
+ return _inner_proxy
+ def getEffectiveLevel(self):
+ return self.logger.getEffectiveLevel()
+ def exception(self, msg, *args):
+ _, exc, _ = sys.exc_info()
+ call = self.logger.error
+ emsg = ''
+ if isinstance(exc, OSError):
+ if exc.errno in (errno.EIO, errno.ENOSPC):
+ emsg = str(exc)
+ else:
+ call = self.logger.exception
+ elif isinstance(exc, socket.error):
+ if exc.errno == errno.ECONNREFUSED:
+ emsg = 'Connection refused'
+ elif exc.errno == errno.EHOSTUNREACH:
+ emsg = 'Host unreachable'
+ else:
+ call = self.logger.exception
+ elif isinstance(exc, eventlet.Timeout):
+ emsg = exc.__class__.__name__
+ if hasattr(exc, 'seconds'):
+ emsg += ' (%ss)' % exc.seconds
+ if isinstance(exc, MessageTimeout):
+ if exc.msg:
+ emsg += ' %s' % exc.msg
+ else:
+ call = self.logger.exception
+ call('%s %s: %s' % (self.server, msg, emsg), *args)
+def get_logger(conf, name):
+ """
+ Get the current system logger using config settings.
+ **Log config and defaults**::
+ log_facility = LOG_LOCAL0
+ log_level = INFO
+ :param conf: Configuration dict to read settings from
+ :param name: Name of the logger
+ """
+ root_logger = logging.getLogger()
+ if hasattr(get_logger, 'handler') and get_logger.handler:
+ root_logger.removeHandler(get_logger.handler)
+ get_logger.handler = None
+ if conf is None:
+ root_logger.setLevel(logging.INFO)
+ return NamedLogger(root_logger, name)
+ get_logger.handler = SysLogHandler(address='/dev/log',
+ facility=getattr(SysLogHandler, conf.get('log_facility', 'LOG_LOCAL0'),
+ SysLogHandler.LOG_LOCAL0))
+ root_logger.addHandler(get_logger.handler)
+ root_logger.setLevel(
+ getattr(logging, conf.get('log_level', 'INFO').upper(), logging.INFO))
+ return NamedLogger(root_logger, name)
+def whataremyips():
+ """
+ Get the machine's ip addresses using ifconfig
+ :returns: list of Strings of IPv4 ip addresses
+ """
+ proc = subprocess.Popen(['/sbin/ifconfig'], stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ ret_val = proc.wait()
+ results ='\n')
+ return [x.split(':')[1].split()[0] for x in results if 'inet addr' in x]
+def storage_directory(datadir, partition, hash):
+ """
+ Get the storage directory
+ :param datadir: Base data directory
+ :param partition: Partition
+ :param hash: Account, container or object hash
+ :returns: Storage directory
+ """
+ return os.path.join(datadir, partition, hash[-3:], hash)
+def hash_path(account, container=None, object=None, raw_digest=False):
+ """
+ Get the connonical hash for an account/container/object
+ :param account: Account
+ :param container: Container
+ :param object: Object
+ :param raw_digest: If True, return the raw version rather than a hex digest
+ :returns: hash string
+ """
+ if object and not container:
+ raise ValueError('container is required if object is provided')
+ paths = [account]
+ if container:
+ paths.append(container)
+ if object:
+ paths.append(object)
+ if raw_digest:
+ return md5('/' + '/'.join(paths) + HASH_PATH_SUFFIX).digest()
+ else:
+ return md5('/' + '/'.join(paths) + HASH_PATH_SUFFIX).hexdigest()
+def lock_path(directory, timeout=10):
+ """
+ Context manager that acquires a lock on a directory. This will block until
+ the lock can be acquired, or the timeout time has expired (whichever occurs
+ first).
+ :param directory: directory to be locked
+ :param timeout: timeout (in seconds)
+ """
+ mkdirs(directory)
+ fd =, os.O_RDONLY)
+ try:
+ with LockTimeout(timeout, directory):
+ while True:
+ try:
+ fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ break
+ except IOError, err:
+ if err.errno != errno.EAGAIN:
+ raise
+ sleep(0.01)
+ yield True
+ finally:
+ os.close(fd)
+def lock_parent_directory(filename, timeout=10):
+ """
+ Context manager that acquires a lock on the parent directory of the given
+ file path. This will block until the lock can be acquired, or the timeout
+ time has expired (whichever occurs first).
+ :param filename: file path of the parent directory to be locked
+ :param timeout: timeout (in seconds)
+ """
+ return lock_path(os.path.dirname(filename))
+def get_time_units(time_amount):
+ """
+ Get a nomralized length of time in the largest unit of time (hours,
+ minutes, or seconds.)
+ :param time_amount: length of time in seconds
+ :returns: A touple of (length of time, unit of time) where unit of time is
+ one of ('h', 'm', 's')
+ """
+ time_unit = 's'
+ if time_amount > 60:
+ time_amount /= 60
+ time_unit = 'm'
+ if time_amount > 60:
+ time_amount /= 60
+ time_unit = 'h'
+ return time_amount, time_unit
+def compute_eta(start_time, current_value, final_value):
+ """
+ Compute an ETA. Now only if we could also have a progress bar...
+ :param start_time: Unix timestamp when the operation began
+ :param current_value: Current value
+ :param final_value: Final value
+ :returns: ETA as a tuple of (length of time, unit of time) where unit of
+ time is one of ('h', 'm', 's')
+ """
+ elapsed = time.time() - start_time
+ completion = (float(current_value) / final_value) or 0.00001
+ return get_time_units(1.0 / completion * elapsed - elapsed)
+def iter_devices_partitions(devices_dir, item_type):
+ """
+ Iterate over partitions accross all devices.
+ :param devices_dir: Path to devices
+ :param item_type: One of 'accounts', 'containers', or 'objects'
+ :returns: Each iteration returns a tuple of (device, partition)
+ """
+ devices = os.listdir(devices_dir)
+ shuffle(devices)
+ devices_partitions = []
+ for device in devices:
+ partitions = os.listdir(os.path.join(devices_dir, device, item_type))
+ shuffle(partitions)
+ devices_partitions.append((device, iter(partitions)))
+ yielded = True
+ while yielded:
+ yielded = False
+ for device, partitions in devices_partitions:
+ try:
+ yield device,
+ yielded = True
+ except StopIteration:
+ pass
+def unlink_older_than(path, mtime):
+ """
+ Remove any file in a given path that that was last modified before mtime.
+ :param path: Path to remove file from
+ :mtime: Timestamp of oldest file to keep
+ """
+ if os.path.exists(path):
+ for fname in os.listdir(path):
+ fpath = os.path.join(path, fname)
+ try:
+ if os.path.getmtime(fpath) < mtime:
+ os.unlink(fpath)
+ except OSError:
+ pass
diff --git a/swift/common/ b/swift/common/
new file mode 100644
index 000000000..d025896d3
--- /dev/null
+++ b/swift/common/
@@ -0,0 +1,164 @@
+# Copyright (c) 2010 OpenStack, LLC.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""WSGI tools for use with swift."""
+import errno
+import os
+import signal
+import sys
+import time
+import mimetools
+import eventlet
+from eventlet import greenio, GreenPool, sleep, wsgi, listen
+# Hook to ensure connection resets don't blow up our servers.
+# Remove with next release of Eventlet that has it in the set already.
+from errno import ECONNRESET
+from import socket, ssl
+from swift.common.utils import get_logger, drop_privileges, \
+ LoggerFileObject, NullLogger
+def monkey_patch_mimetools():
+ """
+ mimetools.Message defaults content-type to "text/plain"
+ This changes it to default to None, so we can detect missing headers.
+ """
+ orig_parsetype = mimetools.Message.parsetype
+ def parsetype(self):
+ if not self.typeheader:
+ self.type = None
+ self.maintype = None
+ self.subtype = None
+ self.plisttext = ''
+ else:
+ orig_parsetype(self)
+ mimetools.Message.parsetype = parsetype
+# We might be able to pull pieces of this out to test, but right now it seems
+# like more work than it's worth.
+def run_wsgi(app, conf, *args, **kwargs): # pragma: no cover
+ """
+ Loads common settings from conf, then instantiates app and runs
+ the server using the specified number of workers.
+ :param app: WSGI callable
+ :param conf: Configuration dictionary
+ """
+ if 'logger' in kwargs:
+ logger = kwargs['logger']
+ else:
+ logger = get_logger(conf, app.log_name)
+ # log uncaught exceptions
+ sys.excepthook = lambda *exc_info: \
+ logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
+ sys.stdout = sys.stderr = LoggerFileObject(logger)
+ try:
+ os.setsid()
+ except OSError:
+ no_cover = True # pass
+ bind_addr = (conf.get('bind_ip', ''),
+ int(conf.get('bind_port', kwargs.get('default_port', 8080))))
+ sock = None
+ retry_until = time.time() + 30
+ while not sock and time.time() < retry_until:
+ try:
+ sock = listen(bind_addr)
+ if 'cert_file' in conf:
+ sock = ssl.wrap_socket(sock, certfile=conf['cert_file'],
+ keyfile=conf['key_file'])
+ except socket.error, err:
+ if err.args[0] != errno.EADDRINUSE:
+ raise
+ sleep(0.1)
+ if not sock:
+ raise Exception('Could not bind to %s:%s after trying for 30 seconds' %
+ bind_addr)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ # in my experience, sockets can hang around forever without keepalive
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 600)
+ worker_count = int(conf.get('workers', '1'))
+ drop_privileges(conf.get('user', 'swift'))
+ if isinstance(app, type):
+ # Instantiate app if it hasn't been already
+ app = app(conf, *args)
+ def run_server():
+ wsgi.HttpProtocol.default_request_version = "HTTP/1.0"
+ eventlet.hubs.use_hub('poll')
+ eventlet.patcher.monkey_patch(all=False, socket=True)
+ monkey_patch_mimetools()
+ pool = GreenPool(size=1024)
+ try:
+ wsgi.server(sock, app, NullLogger(), custom_pool=pool)
+ except socket.error, err:
+ if err[0] != errno.EINVAL:
+ raise
+ pool.waitall()
+ # Useful for profiling [no forks].
+ if worker_count == 0:
+ run_server()
+ return
+ def kill_children(*args):
+ """Kills the entire process group."""
+ logger.error('SIGTERM received')
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
+ running[0] = False
+ os.killpg(0, signal.SIGTERM)
+ def hup(*args):
+ """Shuts down the server, but allows running requests to complete"""
+ logger.error('SIGHUP received')
+ signal.signal(signal.SIGHUP, signal.SIG_IGN)
+ running[0] = False
+ running = [True]
+ signal.signal(signal.SIGTERM, kill_children)
+ signal.signal(signal.SIGHUP, hup)
+ children = []
+ while running[0]:
+ while len(children) < worker_count:
+ pid = os.fork()
+ if pid == 0:
+ signal.signal(signal.SIGHUP, signal.SIG_DFL)
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ run_server()
+'Child %d exiting normally' % os.getpid())
+ return
+ else:
+'Started child %s' % pid)
+ children.append(pid)
+ try:
+ pid, status = os.wait()
+ if os.WIFEXITED(status) or os.WIFSIGNALED(status):
+ logger.error('Removing dead child %s' % pid)
+ children.remove(pid)
+ except OSError, err:
+ if err.errno not in (errno.EINTR, errno.ECHILD):
+ raise
+ greenio.shutdown_safe(sock)
+ sock.close()