summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2014-02-19 18:23:01 +0000
committerAsk Solem <ask@celeryproject.org>2014-05-03 22:27:17 +0100
commitb5df9fbe27a3350d4e0bb317a2254f0baa8d8bd7 (patch)
tree3187a6de40b46eade23b6b398954a138a3ce52e3
parent5952dcdedd613cdf3faaaf6d90912626b7079fee (diff)
downloadkombu-b5df9fbe27a3350d4e0bb317a2254f0baa8d8bd7.tar.gz
Async HTTP client using curl
-rw-r--r--kombu/async/http/__init__.py10
-rw-r--r--kombu/async/http/base.py221
-rw-r--r--kombu/async/http/curl.py273
-rw-r--r--kombu/async/hub.py5
-rw-r--r--kombu/exceptions.py14
-rw-r--r--kombu/tests/async/test_http.py71
-rw-r--r--kombu/utils/__init__.py10
-rw-r--r--kombu/utils/functional.py139
8 files changed, 740 insertions, 3 deletions
diff --git a/kombu/async/http/__init__.py b/kombu/async/http/__init__.py
new file mode 100644
index 00000000..8c18dfa9
--- /dev/null
+++ b/kombu/async/http/__init__.py
@@ -0,0 +1,10 @@
+from __future__ import absolute_import
+
+from .base import Request, Headers, Response
+
+__all__ = ['Client', 'Headers', 'Response', 'Request']
+
+
+def Client(hub, **kwargs):
+ from .curl import CurlClient
+ return CurlClient(hub, **kwargs)
diff --git a/kombu/async/http/base.py b/kombu/async/http/base.py
new file mode 100644
index 00000000..4b1c56cc
--- /dev/null
+++ b/kombu/async/http/base.py
@@ -0,0 +1,221 @@
+from __future__ import absolute_import
+
+import sys
+
+from amqp import promise
+
+from kombu.exceptions import HttpError
+from kombu.five import items
+from kombu.utils import cached_property, coro
+from kombu.utils.encoding import bytes_to_str
+from kombu.utils.functional import memoize
+
+try: # pragma: no cover
+ from http.client import responses
+except ImportError:
+ from httplib import responses # noqa
+
+__all__ = ['Headers', 'Response', 'Request']
+
+PYPY = hasattr(sys, 'pypy_version_info')
+
+
@memoize(maxsize=1000)
def normalize_header(key):
    """Return canonical capitalization of header *key*,
    e.g. ``'content-type'`` -> ``'Content-Type'``."""
    parts = [part.capitalize() for part in key.split('-')]
    return '-'.join(parts)
+
+
class Headers(dict):
    """Mapping of HTTP header field names to values.

    TODO: This is just a regular dict and will not perform normalization
    when looking up keys etc.
    """

    #: Set when all of the headers have been read.
    complete = False

    #: Internal attribute used to keep track of continuation lines.
    _prev_key = None
+
+
class Request(object):
    """A HTTP Request.

    :param url: The URL to request.
    :param method: The HTTP method to use (defaults to ``GET``).
    :keyword headers: Optional headers for this request
        (:class:`dict` or :class:`~kombu.async.http.Headers`).
    :keyword body: Optional body for this request.
    :keyword connect_timeout: Connection timeout in float seconds
        (default 30.0).
    :keyword timeout: Time in float seconds before the request times out
        (default 30.0).
    :keyword follow_redirects: Specify if the client should follow redirects
        (enabled by default).
    :keyword max_redirects: Maximum number of redirects (default 6).
    :keyword use_gzip: Allow the server to use gzip compression (enabled by
        default).
    :keyword validate_cert: Set to true if the server certificate should be
        verified when performing ``https://`` requests (enabled by default).
    :keyword user_agent: Custom user agent for this request.
    :keyword network_interface: Network interface to use for this request.
    :keyword on_ready: Callback to be called when the response has been
        received. Must accept single ``response`` argument.
    :keyword on_stream: Optional callback to be called every time body content
        has been read from the socket. If specified then the response body
        and buffer attributes will not be available.
    :keyword on_timeout: Optional callback to be called if the request
        times out.
    :keyword on_header: Optional callback to be called for every header line
        received from the server. The signature is ``(headers, line)``
        and note that if you want ``response.headers`` to be populated
        then your callback needs to also call
        ``client.on_header(headers, line)``.
    :keyword on_prepare: Optional callback that is implementation specific
        (e.g. curl client will pass the ``curl`` instance to this callback).
    :keyword proxy_host: Optional proxy host. Note that a ``proxy_port`` must
        also be provided or a :exc:`ValueError` will be raised.
    :keyword proxy_username: Optional username to use when logging in
        to the proxy.
    :keyword proxy_password: Optional password to use when authenticating
        with the proxy server.
    :keyword ca_certs: Custom CA certificates file to use.
    :keyword client_key: Optional filename for client SSL key.
    :keyword client_cert: Optional filename for client SSL certificate.

    """

    # Class-level defaults for every supported option; __init__ overrides
    # these per-instance from **kwargs.
    url = headers = body = user_agent = network_interface = \
        on_stream = on_timeout = on_ready = on_header = on_prepare = \
        proxy_host = proxy_port = proxy_username = proxy_password = \
        ca_certs = client_key = client_cert = None

    method = 'GET'
    connect_timeout = 30.0
    request_timeout = 30.0
    follow_redirects = True
    max_redirects = 6
    use_gzip = True
    validate_cert = True

    if PYPY:
        # '__weakref__' in __slots__ keeps weak references working.
        __slots__ = ('__weakref__', )

    def __init__(self, url, method='GET', on_ready=None, **kwargs):
        self.url = url
        self.method = method or self.method
        # on_ready is always a promise so that .then() can be chained
        # even when no callback was given.
        self.on_ready = on_ready or promise()
        if kwargs:
            for k, v in items(kwargs):
                setattr(self, k, v)
        if not isinstance(self.headers, Headers):
            self.headers = Headers(self.headers or {})

    def then(self, callback, errback=None):
        """Add *callback* (and optional *errback*) to be called when
        the response is ready."""
        self.on_ready.then(callback, errback)
+
+
class Response(object):
    """HTTP Response.

    :param request: See :attr:`request`.
    :keyword code: See :attr:`code`.
    :keyword headers: See :attr:`headers`.
    :keyword buffer: See :attr:`buffer`.
    :keyword effective_url: See :attr:`effective_url`.
    :keyword status: See :attr:`status`.

    """

    #: :class:`Request` object used to get this response.
    request = None

    #: HTTP response code (e.g. 200, 404, or 500).
    code = None

    #: HTTP headers for this response (:class:`Headers`).
    headers = None

    #: Socket read buffer.
    buffer = None

    #: The destination url for this request after following redirects.
    effective_url = None

    #: Error instance if the request resulted in a HTTP error code.
    error = None

    #: Human equivalent of :attr:`code`, e.g. ``OK``, `Not found`, or
    #: 'Internal Server Error'.
    status = None

    if PYPY:
        __slots__ = ('__weakref__', )

    def __init__(self, request, code, headers=None, buffer=None,
                 effective_url=None, error=None, status=None):
        self.request = request
        self.code = code
        self.headers = headers if headers is not None else Headers()
        self.buffer = buffer
        self.effective_url = effective_url or request.url

        self.status = status or responses.get(self.code, 'Unknown')
        self.error = error
        # Promote any non-2xx code to an HttpError so that
        # raise_for_error can simply re-raise it.
        if self.error is None and (self.code < 200 or self.code > 299):
            self.error = HttpError(self.code, self.status, self)

    def raise_for_error(self):
        """Raise :class:`~kombu.exceptions.HttpError` if the request resulted
        in a HTTP error code."""
        if self.error:
            raise self.error

    @cached_property
    def body(self):
        """The full contents of the response body.

        Note that accessing this property will evaluate the buffer
        and subsequent accesses will be cached.

        """
        if self.buffer is not None:
            return self.buffer.getvalue()
+
+
@coro
def header_parser(keyt=normalize_header):
    """Coroutine parsing raw header lines into a :class:`Headers` mapping.

    Send ``(line, headers)`` tuples into the generator; parsed fields
    are stored into *headers* and ``headers.complete`` is set once the
    blank line terminating the header section has been seen.
    """
    while 1:
        (line, headers) = yield
        if line.startswith('HTTP/'):
            # status line, not a header field.
            continue
        elif not line:
            # blank line terminates the header section.
            headers.complete = True
            continue
        elif line[0].isspace():
            # continuation line: fold into the value of the header
            # started on the previous line instead of discarding it.
            pkey = headers._prev_key
            headers[pkey] = ' '.join([headers[pkey], line.lstrip()])
        else:
            key, value = line.split(':', 1)
            key = headers._prev_key = keyt(key)
            headers[key] = value.strip()
+
+
class BaseClient(object):
    """Base class for asynchronous HTTP clients.

    Subclasses provide ``add_request`` to actually schedule requests.
    """

    Headers = Headers
    Request = Request
    Response = Response

    def __init__(self, hub, **kwargs):
        self.hub = hub
        self._header_parser = header_parser()

    def perform(self, request, **kwargs):
        """Schedule *request*, coercing a plain URL into :class:`Request`."""
        if isinstance(request, self.Request):
            pending = request
        else:
            pending = self.Request(request, **kwargs)
        self.add_request(pending)

    def close(self):
        """Release client resources (no-op in the base class)."""
        pass

    def on_header(self, headers, line):
        """Feed one raw header *line* into the parser for *headers*."""
        try:
            self._header_parser.send((bytes_to_str(line), headers))
        except StopIteration:
            # parser generator finished; start a fresh one.
            self._header_parser = header_parser()
diff --git a/kombu/async/http/curl.py b/kombu/async/http/curl.py
new file mode 100644
index 00000000..6d31463f
--- /dev/null
+++ b/kombu/async/http/curl.py
@@ -0,0 +1,273 @@
+from __future__ import absolute_import
+
+import pycurl
+
+from collections import deque
+from functools import partial
+from io import BytesIO
+from time import time
+
+from kombu.async.hub import READ, WRITE
+from kombu.exceptions import HttpError
+from kombu.five import items
+from kombu.utils.encoding import bytes_to_str
+
+from .base import BaseClient
+
+try:
+ import pycurl # noqa
+except ImportError:
+ pycurl = Curl = METH_TO_CURL = None # noqa
+else:
+ from pycurl import Curl # noqa
+
+ METH_TO_CURL = { # noqa
+ 'GET': pycurl.HTTPGET,
+ 'POST': pycurl.POST,
+ 'PUT': pycurl.UPLOAD,
+ 'HEAD': pycurl.NOBODY,
+ }
+
+__all__ = ['CurlClient']
+
+DEFAULT_USER_AGENT = 'Mozilla/5.0 (compatible; pycurl)'
+EXTRA_METHODS = frozenset(['DELETE', 'OPTIONS', 'PATCH'])
+
+
class CurlClient(BaseClient):
    """HTTP client implemented on top of pycurl's multi interface.

    :param hub: Event loop (:class:`~kombu.async.hub.Hub`) used to wait
        for the sockets curl asks to be notified about.
    :keyword max_clients: Maximum number of curl easy handles kept
        around, i.e. maximum number of concurrent transfers.
    """
    Curl = Curl

    def __init__(self, hub, max_clients=10):
        if pycurl is None:
            raise ImportError('The curl client requires the pycurl library.')
        super(CurlClient, self).__init__(hub)
        self.max_clients = max_clients

        self._multi = pycurl.CurlMulti()
        self._multi.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
        self._multi.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)
        self._curls = [self.Curl() for i in range(max_clients)]
        self._free_list = self._curls[:]
        self._pending = deque()
        #: fd -> READ/WRITE flags currently registered with the hub.
        self._fds = {}

        self._socket_action = self._multi.socket_action
        # poll for timed-out transfers once per second.
        self._timeout_check_tref = self.hub.call_repeatedly(
            1.0, self._timeout_check,
        )

        # pycurl 7.29.0 workaround
        dummy_curl_handle = pycurl.Curl()
        self._multi.add_handle(dummy_curl_handle)
        self._multi.remove_handle(dummy_curl_handle)

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

    def close(self):
        """Cancel the timeout timer and close all curl handles."""
        self._timeout_check_tref.cancel()
        for _curl in self._curls:
            _curl.close()
        self._multi.close()

    def add_request(self, request):
        """Queue *request* and kick the transfer machinery."""
        self._pending.append(request)
        self._process_queue()
        self._set_timeout(0)

    def _handle_socket(self, event, fd, multi, data, _pycurl=pycurl):
        # Called by curl to (de)register interest in a file descriptor;
        # translate curl poll flags into hub reader/writer registrations.
        if event == _pycurl.POLL_REMOVE:
            if fd in self._fds:
                self.hub.remove(fd)
                self._fds.pop(fd, None)
        else:
            # re-register from scratch so stale flags are dropped.
            if fd in self._fds:
                self.hub.remove(fd)
            if event == _pycurl.POLL_IN:
                self.hub.add_reader(fd, self.on_readable, fd)
                self._fds[fd] = READ
            elif event == _pycurl.POLL_OUT:
                self.hub.add_writer(fd, self.on_writable, fd)
                self._fds[fd] = WRITE
            elif event == _pycurl.POLL_INOUT:
                self.hub.add_reader(fd, self.on_readable, fd)
                self.hub.add_writer(fd, self.on_writable, fd)
                self._fds[fd] = READ | WRITE

    def _set_timeout(self, msecs):
        pass  # TODO

    def _timeout_check(self, _pycurl=pycurl):
        # Periodic fallback: let curl act on all sockets so timed-out
        # transfers are detected even without fd activity.
        while 1:
            try:
                ret, _ = self._multi.socket_all()
            except pycurl.error as exc:
                ret = exc.args[0]
            if ret != _pycurl.E_CALL_MULTI_PERFORM:
                break
        self._process_pending_requests()

    def on_readable(self, fd, _pycurl=pycurl):
        return self._on_event(fd, _pycurl.CSELECT_IN)

    def on_writable(self, fd, _pycurl=pycurl):
        return self._on_event(fd, _pycurl.CSELECT_OUT)

    def _on_event(self, fd, event, _pycurl=pycurl):
        # Drive curl for one fd until it stops asking to be called again.
        while 1:
            try:
                ret, _ = self._socket_action(fd, event)
            except pycurl.error as exc:
                ret = exc.args[0]
            if ret != _pycurl.E_CALL_MULTI_PERFORM:
                break
        self._process_pending_requests()

    def _process_pending_requests(self):
        """Reap finished transfers and start queued ones."""
        while 1:
            q, succeeded, failed = self._multi.info_read()
            for curl in succeeded:
                self._process(curl)
            for curl, errno, reason in failed:
                self._process(curl, errno, reason)
            if q == 0:
                break
        self._process_queue()

    def _process_queue(self):
        """Attach pending requests to free curl handles."""
        while 1:
            started = 0
            while self._free_list and self._pending:
                started += 1
                curl = self._free_list.pop()
                request = self._pending.popleft()
                headers = self.Headers()
                buf = BytesIO()
                curl.info = {
                    'headers': headers,
                    'buffer': buf,
                    'request': request,
                    'curl_start_time': time(),
                }
                self._setup_request(curl, request, buf, headers)
                self._multi.add_handle(curl)
            if not started:
                break

    def _process(self, curl, errno=None, reason=None, _pycurl=pycurl):
        """Finalize one completed/failed transfer and fire its callback."""
        info, curl.info = curl.info, None
        self._multi.remove_handle(curl)
        self._free_list.append(curl)
        buffer = info['buffer']
        if errno:
            # transport-level failure: synthesize a 599 response.
            code = 599
            error = HttpError(code, reason)
            error.errno = errno
            effective_url = None
            buffer.close()
            buffer = None
        else:
            error = None
            code = curl.getinfo(_pycurl.HTTP_CODE)
            effective_url = curl.getinfo(_pycurl.EFFECTIVE_URL)
            buffer.seek(0)
        request = info['request']
        try:
            request.on_ready(self.Response(
                request=request, code=code, headers=info['headers'],
                buffer=buffer, effective_url=effective_url, error=error,
            ))
        except Exception as exc:
            # A faulty callback must not take down the event loop;
            # report it through the hub instead of propagating.
            # (A stray bare ``raise`` here previously made this
            # error handler unreachable.)
            self.hub.on_callback_error(request.on_ready, exc)

    def _setup_request(self, curl, request, buffer, headers, _pycurl=pycurl):
        """Translate a :class:`Request` into curl easy-handle options."""
        setopt = curl.setopt
        setopt(_pycurl.URL, bytes_to_str(request.url))

        # see tornado curl client
        request.headers.setdefault('Expect', '')
        request.headers.setdefault('Pragma', '')

        # Each header must be rendered as 'Name: value'; the previous
        # '%s %s'.format(h) had no placeholders and sent the literal
        # string '%s %s' for every header.
        curl.setopt(
            _pycurl.HTTPHEADER,
            ['{0}: {1}'.format(*h) for h in items(request.headers)],
        )

        setopt(
            _pycurl.HEADERFUNCTION,
            partial(request.on_header or self.on_header, request.headers),
        )
        setopt(
            _pycurl.WRITEFUNCTION, request.on_stream or buffer.write,
        )
        setopt(
            _pycurl.FOLLOWLOCATION, request.follow_redirects,
        )
        setopt(
            _pycurl.USERAGENT,
            bytes_to_str(request.user_agent or DEFAULT_USER_AGENT),
        )
        if request.network_interface:
            setopt(_pycurl.INTERFACE, request.network_interface)
        setopt(
            _pycurl.ENCODING, 'gzip,deflate' if request.use_gzip else 'none',
        )
        if request.proxy_host:
            if not request.proxy_port:
                raise ValueError('Request with proxy_host but no proxy_port')
            setopt(_pycurl.PROXY, request.proxy_host)
            setopt(_pycurl.PROXYPORT, request.proxy_port)
            if request.proxy_username:
                setopt(_pycurl.PROXYUSERPWD, '{0}:{1}'.format(
                    request.proxy_username, request.proxy_password or ''))
        else:
            setopt(_pycurl.PROXY, '')
            curl.unsetopt(_pycurl.PROXYUSERPWD)

        setopt(_pycurl.SSL_VERIFYPEER, 1 if request.validate_cert else 0)
        setopt(_pycurl.SSL_VERIFYHOST, 2 if request.validate_cert else 0)
        if request.ca_certs is not None:
            setopt(_pycurl.CAINFO, request.ca_certs)

        setopt(_pycurl.IPRESOLVE, pycurl.IPRESOLVE_WHATEVER)

        # reset all method flags before selecting the right one.
        for meth in METH_TO_CURL.values():
            setopt(meth, False)
        try:
            meth = METH_TO_CURL[request.method]
        except KeyError:
            curl.setopt(_pycurl.CUSTOMREQUEST, request.method)
        else:
            curl.unsetopt(_pycurl.CUSTOMREQUEST)
            setopt(meth, True)

        if request.method in ('POST', 'PUT'):
            assert request.body is not None
            reqbuffer = BytesIO(request.body)
            setopt(_pycurl.READFUNCTION, reqbuffer.read)
            if request.method == 'POST':

                def ioctl(cmd):
                    # let curl rewind the body on redirects/retries.
                    if cmd == _pycurl.IOCMD_RESTARTREAD:
                        reqbuffer.seek(0)
                setopt(_pycurl.IOCTLFUNCTION, ioctl)
                setopt(_pycurl.POSTFIELDSIZE, len(request.body))
            else:
                setopt(_pycurl.INFILESIZE, len(request.body))
        elif request.method == 'GET':
            assert request.body is None

        # TODO Does not support Basic AUTH
        curl.unsetopt(_pycurl.USERPWD)

        if request.client_cert is not None:
            setopt(_pycurl.SSLCERT, request.client_cert)
        if request.client_key is not None:
            setopt(_pycurl.SSLKEY, request.client_key)

        if request.on_prepare is not None:
            request.on_prepare(curl)
diff --git a/kombu/async/hub.py b/kombu/async/hub.py
index 673a60ac..264f8188 100644
--- a/kombu/async/hub.py
+++ b/kombu/async/hub.py
@@ -247,6 +247,11 @@ class Hub(object):
self.writers.pop(fd, None)
self.consolidate.discard(fd)
    def on_callback_error(self, callback, exc):
        # Log an exception raised by *callback* without propagating it,
        # so a faulty callback cannot break the event loop.
        logger.error(
            'Callback %r raised exception: %r', callback, exc, exc_info=1,
        )
+
def create_loop(self,
generator=generator, sleep=sleep, min=min, next=next,
Empty=Empty, StopIteration=StopIteration,
diff --git a/kombu/exceptions.py b/kombu/exceptions.py
index 716bc693..0798bf7a 100644
--- a/kombu/exceptions.py
+++ b/kombu/exceptions.py
@@ -15,7 +15,7 @@ __all__ = ['NotBoundError', 'MessageStateError', 'TimeoutError',
'LimitExceeded', 'ConnectionLimitExceeded',
'ChannelLimitExceeded', 'ConnectionError', 'ChannelError',
'VersionMismatch', 'SerializerNotInstalled', 'ResourceError',
- 'SerializationError', 'EncodeError', 'DecodeError']
+ 'SerializationError', 'EncodeError', 'DecodeError', 'HttpError']
TimeoutError = socket.timeout
@@ -81,3 +81,15 @@ class InconsistencyError(ConnectionError):
"""Data or environment has been found to be inconsistent,
depending on the cause it may be possible to retry the operation."""
pass
+
+
class HttpError(Exception):
    """Raised when a HTTP request results in an error.

    :param code: HTTP status code.
    :keyword message: Optional human-readable reason phrase.
    :keyword response: Optional response object associated with the error.
    """

    def __init__(self, code, message=None, response=None):
        self.code, self.message, self.response = code, message, response
        super(HttpError, self).__init__(code, message, response)

    def __str__(self):
        return 'HTTP {0.code}: {0.message}'.format(self)
diff --git a/kombu/tests/async/test_http.py b/kombu/tests/async/test_http.py
new file mode 100644
index 00000000..785f9e82
--- /dev/null
+++ b/kombu/tests/async/test_http.py
@@ -0,0 +1,71 @@
+from __future__ import absolute_import
+
+from amqp import promise
+from kombu.async import Hub
+from kombu.async import http
+
+from kombu.tests.case import Case, Mock
+
+
class test_Request(Case):

    def test_init(self):
        request = http.Request('http://foo', method='POST')
        self.assertEqual(request.url, 'http://foo')
        self.assertEqual(request.method, 'POST')

        request = http.Request('x', max_redirects=100)
        self.assertEqual(request.max_redirects, 100)

        self.assertIsInstance(request.headers, http.Headers)
        headers = http.Headers()
        request = http.Request('x', headers=headers)
        self.assertIs(request.headers, headers)
        self.assertIsInstance(request.on_ready, promise)

    def test_then(self):
        callback = Mock()
        request = http.Request('http://foo')
        request.then(callback)

        request.on_ready(1)
        callback.assert_called_with(1)
+
+
class test_Client(Case):

    def test_get_request(self):
        # Integration-style smoke test: requires a local file server.
        hub = Hub()
        callback = Mock(name='callback')

        def on_ready(response):
            print('{0.effective_url} -> {0.code}'.format(response))
        on_ready = promise(on_ready)
        on_ready.then(callback)

        paths = ['README.rst', 'AUTHORS', 'pavement.py', 'setup.py']
        requests = [
            http.Request('http://localhost:8000/' + path, on_ready=on_ready)
            for path in paths
        ]
        client = http.Client(hub)
        for request in requests:
            request.then(promise(callback))
            client.perform(request)

        print('START PERFORM')
        while callback.call_count < len(requests):
            hub.run_once()
        print('-END PERFORM')
diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py
index 0745ddfe..5d5e7bfa 100644
--- a/kombu/utils/__init__.py
+++ b/kombu/utils/__init__.py
@@ -448,3 +448,13 @@ def maybe_fileno(f):
return fileno(f)
except FILENO_ERRORS:
pass
+
+
def coro(gen):
    """Decorator: instantiate the generator and advance it to the first
    ``yield`` so the caller can immediately ``send()`` values into it."""

    @wraps(gen)
    def wind_up(*args, **kwargs):
        produced = gen(*args, **kwargs)
        next(produced)
        return produced
    return wind_up
diff --git a/kombu/utils/functional.py b/kombu/utils/functional.py
index 746f42fa..f959520b 100644
--- a/kombu/utils/functional.py
+++ b/kombu/utils/functional.py
@@ -1,12 +1,147 @@
from __future__ import absolute_import
import sys
+import threading
from collections import Iterable, Mapping
+from functools import wraps
+from itertools import islice
-from kombu.five import string_t
+from kombu.five import UserDict, string_t, keys
-__all__ = ['lazy', 'maybe_evaluate', 'is_list', 'maybe_list']
+from .compat import OrderedDict
+
+__all__ = ['LRUCache', 'memoize', 'lazy', 'maybe_evaluate',
+ 'is_list', 'maybe_list']
+KEYWORD_MARK = object()
+
+
class LRUCache(UserDict):
    """LRU Cache implementation using a doubly linked list to track access.

    :keyword limit: The maximum number of keys to keep in the cache.
        When a new key is inserted and the limit has been exceeded,
        the *Least Recently Used* key will be discarded from the
        cache.

    """

    def __init__(self, limit=None):
        self.limit = limit
        self.mutex = threading.RLock()
        # OrderedDict insertion order doubles as recency order:
        # first key = least recently used, last key = most recently used.
        self.data = OrderedDict()

    def __getitem__(self, key):
        with self.mutex:
            # pop + reinsert moves the key to the end of the ordering,
            # marking it as most recently used.
            value = self[key] = self.data.pop(key)
        return value

    def update(self, *args, **kwargs):
        with self.mutex:
            data, limit = self.data, self.limit
            data.update(*args, **kwargs)
            if limit and len(data) > limit:
                # pop additional items in case limit exceeded
                # negative overflow will lead to an empty list
                for item in islice(iter(data), len(data) - limit):
                    data.pop(item)

    def __setitem__(self, key, value):
        # remove least recently used key.
        with self.mutex:
            if self.limit and len(self.data) >= self.limit:
                self.data.pop(next(iter(self.data)))
            self.data[key] = value

    def __iter__(self):
        return iter(self.data)

    def _iterate_items(self):
        # iterates a live view: keys evicted concurrently are skipped.
        for k in self:
            try:
                yield (k, self.data[k])
            except KeyError:  # pragma: no cover
                pass
    iteritems = _iterate_items

    def _iterate_values(self):
        for k in self:
            try:
                yield self.data[k]
            except KeyError:  # pragma: no cover
                pass
    itervalues = _iterate_values

    def _iterate_keys(self):
        # userdict.keys in py3k calls __getitem__
        return keys(self.data)
    iterkeys = _iterate_keys

    def incr(self, key, delta=1):
        with self.mutex:
            # this acts as memcached does- store as a string, but return a
            # integer as long as it exists and we can cast it
            newval = int(self.data.pop(key)) + delta
            self[key] = str(newval)
        return newval

    def __getstate__(self):
        # the mutex cannot be pickled; __setstate__ recreates it.
        d = dict(vars(self))
        d.pop('mutex')
        return d

    def __setstate__(self, state):
        self.__dict__ = state
        self.mutex = threading.RLock()

    if sys.version_info[0] == 3:  # pragma: no cover
        keys = _iterate_keys
        values = _iterate_values
        items = _iterate_items
    else:  # noqa
        # Python 2: dict-protocol methods return lists, the
        # iter* aliases above return generators.

        def keys(self):
            return list(self._iterate_keys())

        def values(self):
            return list(self._iterate_values())

        def items(self):
            return list(self._iterate_items())
+
+
def memoize(maxsize=None, Cache=LRUCache):
    """Decorator caching return values of the wrapped function.

    :keyword maxsize: Maximum number of results to keep
        (passed on as the cache ``limit``).
    :keyword Cache: Cache factory used to hold the results
        (defaults to :class:`LRUCache`).

    The memoized function gains ``hits``/``misses`` counters,
    a ``clear()`` method resetting cache and statistics, and an
    ``original_func`` attribute referencing the undecorated function.
    """

    def _memoize(fun):
        lock = threading.Lock()
        store = Cache(limit=maxsize)

        @wraps(fun)
        def _M(*args, **kwargs):
            # keyword args are keyed in sorted order, separated from the
            # positional args by the KEYWORD_MARK sentinel.
            key = args + (KEYWORD_MARK, ) + tuple(sorted(kwargs.items()))
            try:
                with lock:
                    value = store[key]
            except KeyError:
                # miss: compute outside the lock, then insert.
                value = fun(*args, **kwargs)
                _M.misses += 1
                with lock:
                    store[key] = value
            else:
                _M.hits += 1
            return value

        def clear():
            """Clear the cache and reset cache statistics."""
            store.clear()
            _M.hits = _M.misses = 0

        _M.hits = _M.misses = 0
        _M.clear = clear
        _M.original_func = fun
        return _M

    return _memoize
class lazy(object):