diff options
-rw-r--r-- | kombu/async/http/__init__.py | 10 | ||||
-rw-r--r-- | kombu/async/http/base.py | 221 | ||||
-rw-r--r-- | kombu/async/http/curl.py | 273 | ||||
-rw-r--r-- | kombu/async/hub.py | 5 | ||||
-rw-r--r-- | kombu/exceptions.py | 14 | ||||
-rw-r--r-- | kombu/tests/async/test_http.py | 71 | ||||
-rw-r--r-- | kombu/utils/__init__.py | 10 | ||||
-rw-r--r-- | kombu/utils/functional.py | 139 |
8 files changed, 740 insertions, 3 deletions
diff --git a/kombu/async/http/__init__.py b/kombu/async/http/__init__.py new file mode 100644 index 00000000..8c18dfa9 --- /dev/null +++ b/kombu/async/http/__init__.py @@ -0,0 +1,10 @@ +from __future__ import absolute_import + +from .base import Request, Headers, Response + +__all__ = ['Client', 'Headers', 'Response', 'Request'] + + +def Client(hub, **kwargs): + from .curl import CurlClient + return CurlClient(hub, **kwargs) diff --git a/kombu/async/http/base.py b/kombu/async/http/base.py new file mode 100644 index 00000000..4b1c56cc --- /dev/null +++ b/kombu/async/http/base.py @@ -0,0 +1,221 @@ +from __future__ import absolute_import + +import sys + +from amqp import promise + +from kombu.exceptions import HttpError +from kombu.five import items +from kombu.utils import cached_property, coro +from kombu.utils.encoding import bytes_to_str +from kombu.utils.functional import memoize + +try: # pragma: no cover + from http.client import responses +except ImportError: + from httplib import responses # noqa + +__all__ = ['Headers', 'Response', 'Request'] + +PYPY = hasattr(sys, 'pypy_version_info') + + +@memoize(maxsize=1000) +def normalize_header(key): + return '-'.join(p.capitalize() for p in key.split('-')) + + +class Headers(dict): + # TODO: This is just a regular dict and will not perform normalization + # when looking up keys etc. + + #: Set when all of the headers have been read. + complete = False + + #: Internal attribute used to keep track of continuation lines. + _prev_key = None + + +class Request(object): + """A HTTP Request. + + :param url: The URL to request. + :param method: The HTTP method to use (defaults to ``GET``). + :keyword headers: Optional headers for this request + (:class:`dict` or :class:`~kombu.async.http.Headers`). + :keyword body: Optional body for this request. + :keyword connect_timeout: Connection timeout in float seconds + (default 30.0). + :keyword timeout: Time in float seconds before the request times out + (default 30.0). 
+ :keyword follow_redirects: Specify if the client should follow redirects + (enabled by default). + :keyword max_redirects: Maximum number of redirects (default 6). + :keyword use_gzip: Allow the server to use gzip compression (enabled by + default). + :keyword validate_cert: Set to true if the server certificate should be + verified when performing ``https://`` requests (enabled by default). + :keyword user_agent: Custom user agent for this request. + :keyword network_interace: Network interface to use for this request. + :keyword on_ready: Callback to be called when the response has been + received. Must accept single ``response`` argument. + :kewyord on_stream: Optional callback to be called every time body content + has been read from the socket. If specified then the response body + and buffer attributes will not be available. + :keyword on_timeout: Optional callback to be called if the request + times out. + :keyword on_header: Optional callback to be called for every header line + received from the server. The signature is ``(headers, line)`` + and note that if you want ``response.headers`` to be populated + then your callback needs to also call + ``client.on_header(headers, line)``. + :keyword on_prepare: Optional callback that is implementation specific + (e.g. curl client will pass the ``curl`` instance to this callback). + :keyword proxy_host: Optional proxy host. Note that a ``proxy_port`` must + also be provided or a :exc:`ValueError` will be raised. + :keyword proxy_username: Optional username to use when logging in + to the proxy. + :keyword proxy_password: Optional password to use when authenticating + with the proxy server. + :keyword ca_certs: Custom CA certificates file to use. + :keyword client_key: Optional filename for client SSL key. + :keyword client_cert: Optional filename for client SSL certificate. 
+ + """ + + url = headers = body = user_agent = network_interface = \ + on_stream = on_timeout = on_ready = on_header = on_prepare = \ + proxy_host = proxy_port = proxy_username = proxy_password = \ + ca_certs = client_key = client_cert = None + + method = 'GET' + connect_timeout = 30.0 + request_timeout = 30.0 + follow_redirects = True + max_redirects = 6 + use_gzip = True + validate_cert = True + + if PYPY: + __slots__ = ('__weakref__', ) + + def __init__(self, url, method='GET', on_ready=None, **kwargs): + self.url = url + self.method = method or self.method + self.on_ready = on_ready or promise() + if kwargs: + for k, v in items(kwargs): + setattr(self, k, v) + if not isinstance(self.headers, Headers): + self.headers = Headers(self.headers or {}) + + def then(self, callback, errback=None): + self.on_ready.then(callback, errback) + + +class Response(object): + """HTTP Response. + + :param request: See :attr:`request`. + :keyword code: See :attr:`code`. + :keyword headers: See :attr:`headers`. + :keyword buffer: See :attr:`buffer` + :keyword effective_url: See :attr:`effective_url`. + :keyword status: See :attr:`status`. + + """ + + # :class:`Request` object used to get this response. + request = None + + #: HTTP response code (e.g. 200, 404, or 500). + code = None + + #: HTTP headers for this response (:class:`Headers`). + headers = None + + #: Socket read buffer. + buffer = None + + #: The destination url for this request after following redirects. + effective_url = None + + #: Error instance if the request resulted in a HTTP error code. + + #: Human equivalent of :attr:`code`, e.g. ``OK``, `Not found`, or + #: 'Internal Server Error'. 
+ status = None + + if PYPY: + __slots__ = ('__weakref__', ) + + def __init__(self, request, code, headers=None, buffer=None, + effective_url=None, error=None, status=None): + self.request = request + self.code = code + self.headers = headers if headers is not None else Headers() + self.buffer = buffer + self.effective_url = effective_url or request.url + + self.status = status or responses.get(self.code, 'Unknown') + self.error = error + if self.error is None and (self.code < 200 or self.code > 299): + self.error = HttpError(self.code, self.status, self) + + def raise_for_error(self): + """Raise :class:`~kombu.exceptions.HttpError` if the request resulted + in a HTTP error code.""" + if self.error: + raise self.error + + @cached_property + def body(self): + """The full contents of the response body. + + Note that accessing this propery will evaluate the buffer + and subsequent accesses will be cached. + + """ + if self.buffer is not None: + return self.buffer.getvalue() + + +@coro +def header_parser(keyt=normalize_header): + while 1: + (line, headers) = yield + if line.startswith('HTTP/'): + continue + elif not line: + headers.complete = True + continue + elif line[0].isspace(): + headers[headers._prev_key] = ' '.join(['', line.lstrip()]) + else: + key, value = line.split(':', 1) + key = headers._prev_key = keyt(key) + headers[key] = value.strip() + + +class BaseClient(object): + Headers = Headers + Request = Request + Response = Response + + def __init__(self, hub, **kwargs): + self.hub = hub + self._header_parser = header_parser() + + def perform(self, request, **kwargs): + if not isinstance(request, self.Request): + request = self.Request(request, **kwargs) + self.add_request(request) + + def close(self): + pass + + def on_header(self, headers, line): + try: + self._header_parser.send((bytes_to_str(line), headers)) + except StopIteration: + self._header_parser = header_parser() diff --git a/kombu/async/http/curl.py b/kombu/async/http/curl.py new file mode 
100644 index 00000000..6d31463f --- /dev/null +++ b/kombu/async/http/curl.py @@ -0,0 +1,273 @@ +from __future__ import absolute_import + +import pycurl + +from collections import deque +from functools import partial +from io import BytesIO +from time import time + +from kombu.async.hub import READ, WRITE +from kombu.exceptions import HttpError +from kombu.five import items +from kombu.utils.encoding import bytes_to_str + +from .base import BaseClient + +try: + import pycurl # noqa +except ImportError: + pycurl = Curl = METH_TO_CURL = None # noqa +else: + from pycurl import Curl # noqa + + METH_TO_CURL = { # noqa + 'GET': pycurl.HTTPGET, + 'POST': pycurl.POST, + 'PUT': pycurl.UPLOAD, + 'HEAD': pycurl.NOBODY, + } + +__all__ = ['CurlClient'] + +DEFAULT_USER_AGENT = 'Mozilla/5.0 (compatible; pycurl)' +EXTRA_METHODS = frozenset(['DELETE', 'OPTIONS', 'PATCH']) + + +class CurlClient(BaseClient): + Curl = Curl + + def __init__(self, hub, max_clients=10): + if pycurl is None: + raise ImportError('The curl client requires the pycurl library.') + super(CurlClient, self).__init__(hub) + self.max_clients = max_clients + + self._multi = pycurl.CurlMulti() + self._multi.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout) + self._multi.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket) + self._curls = [self.Curl() for i in range(max_clients)] + self._free_list = self._curls[:] + self._pending = deque() + self._fds = {} + + self._socket_action = self._multi.socket_action + self._timeout_check_tref = self.hub.call_repeatedly( + 1.0, self._timeout_check, + ) + + # pycurl 7.29.0 workaround + dummy_curl_handle = pycurl.Curl() + self._multi.add_handle(dummy_curl_handle) + self._multi.remove_handle(dummy_curl_handle) + + def __enter__(self): + return self + + def __exit__(self, *exc_info): + self.close() + + def close(self): + self._timeout_check_tref.cancel() + for _curl in self._curls: + _curl.close() + self._multi.close() + + def add_request(self, request): + 
self._pending.append(request) + self._process_queue() + self._set_timeout(0) + + def _handle_socket(self, event, fd, multi, data, _pycurl=pycurl): + if event == _pycurl.POLL_REMOVE: + if fd in self._fds: + self.hub.remove(fd) + self._fds.pop(fd, None) + else: + if fd in self._fds: + self.hub.remove(fd) + if event == _pycurl.POLL_IN: + self.hub.add_reader(fd, self.on_readable, fd) + self._fds[fd] = READ + elif event == _pycurl.POLL_OUT: + self.hub.add_writer(fd, self.on_writable, fd) + self._fds[fd] = WRITE + elif event == _pycurl.POLL_INOUT: + self.hub.add_reader(fd, self.on_readable, fd) + self.hub.add_writer(fd, self.on_writable, fd) + self._fds[fd] = READ | WRITE + + def _set_timeout(self, msecs): + pass # TODO + + def _timeout_check(self, _pycurl=pycurl): + while 1: + try: + ret, _ = self._multi.socket_all() + except pycurl.error as exc: + ret = exc.args[0] + if ret != _pycurl.E_CALL_MULTI_PERFORM: + break + self._process_pending_requests() + + def on_readable(self, fd, _pycurl=pycurl): + return self._on_event(fd, _pycurl.CSELECT_IN) + + def on_writable(self, fd, _pycurl=pycurl): + return self._on_event(fd, _pycurl.CSELECT_OUT) + + def _on_event(self, fd, event, _pycurl=pycurl): + while 1: + try: + ret, _ = self._socket_action(fd, event) + except pycurl.error as exc: + ret = exc.args[0] + if ret != _pycurl.E_CALL_MULTI_PERFORM: + break + self._process_pending_requests() + + def _process_pending_requests(self): + while 1: + q, succeeded, failed = self._multi.info_read() + for curl in succeeded: + self._process(curl) + for curl, errno, reason in failed: + self._process(curl, errno, reason) + if q == 0: + break + self._process_queue() + + def _process_queue(self): + while 1: + started = 0 + while self._free_list and self._pending: + started += 1 + curl = self._free_list.pop() + request = self._pending.popleft() + headers = self.Headers() + buf = BytesIO() + curl.info = { + 'headers': headers, + 'buffer': buf, + 'request': request, + 'curl_start_time': time(), + } 
+ self._setup_request(curl, request, buf, headers) + self._multi.add_handle(curl) + if not started: + break + + def _process(self, curl, errno=None, reason=None, _pycurl=pycurl): + info, curl.info = curl.info, None + self._multi.remove_handle(curl) + self._free_list.append(curl) + buffer = info['buffer'] + if errno: + code = 599 + error = HttpError(code, reason) + error.errno = errno + effective_url = None + buffer.close() + buffer = None + else: + error = None + code = curl.getinfo(_pycurl.HTTP_CODE) + effective_url = curl.getinfo(_pycurl.EFFECTIVE_URL) + buffer.seek(0) + try: + request = info['request'] + request.on_ready(self.Response( + request=request, code=code, headers=info['headers'], + buffer=buffer, effective_url=effective_url, error=error, + )) + except Exception as exc: + raise + self.hub.on_callback_error(request.on_ready, exc) + + def _setup_request(self, curl, request, buffer, headers, _pycurl=pycurl): + setopt = curl.setopt + setopt(_pycurl.URL, bytes_to_str(request.url)) + + # see tornado curl client + request.headers.setdefault('Expect', '') + request.headers.setdefault('Pragma', '') + + curl.setopt( + _pycurl.HTTPHEADER, + ['%s %s'.format(h) for h in items(request.headers)], + ) + + setopt( + _pycurl.HEADERFUNCTION, + partial(request.on_header or self.on_header, request.headers), + ) + setopt( + _pycurl.WRITEFUNCTION, request.on_stream or buffer.write, + ) + setopt( + _pycurl.FOLLOWLOCATION, request.follow_redirects, + ) + setopt( + _pycurl.USERAGENT, + bytes_to_str(request.user_agent or DEFAULT_USER_AGENT), + ) + if request.network_interface: + setopt(_pycurl.INTERFACE, request.network_interface) + setopt( + _pycurl.ENCODING, 'gzip,deflate' if request.use_gzip else 'none', + ) + if request.proxy_host: + if not request.proxy_port: + raise ValueError('Request with proxy_host but no proxy_port') + setopt(_pycurl.PROXY, request.proxy_host) + setopt(_pycurl.PROXYPORT, request.proxy_port) + if request.proxy_username: + setopt(_pycurl.PROXYUSERPWD, 
'{0}:{1}'.format( + request.proxy_username, request.proxy_password or '')) + else: + setopt(_pycurl.PROXY, '') + curl.unsetopt(_pycurl.PROXYUSERPWD) + + setopt(_pycurl.SSL_VERIFYPEER, 1 if request.validate_cert else 0) + setopt(_pycurl.SSL_VERIFYHOST, 2 if request.validate_cert else 0) + if request.ca_certs is not None: + setopt(_pycurl.CAINFO, request.ca_certs) + + setopt(_pycurl.IPRESOLVE, pycurl.IPRESOLVE_WHATEVER) + + for meth in METH_TO_CURL.values(): + setopt(meth, False) + try: + meth = METH_TO_CURL[request.method] + except KeyError: + curl.setopt(_pycurl.CUSTOMREQUEST, request.method) + else: + curl.unsetopt(_pycurl.CUSTOMREQUEST) + setopt(meth, True) + + if request.method in ('POST', 'PUT'): + assert request.body is not None + reqbuffer = BytesIO(request.body) + setopt(_pycurl.READFUNCTION, reqbuffer.read) + if request.method == 'POST': + + def ioctl(cmd): + if cmd == _pycurl.IOCMD_RESTARTREAD: + reqbuffer.seek(0) + setopt(_pycurl.IOCTLFUNCTION, ioctl) + setopt(_pycurl.POSTFIELDSIZE, len(request.body)) + else: + setopt(_pycurl.INFILESIZE, len(request.body)) + elif request.method == 'GET': + assert request.body is None + + # TODO Does not support Basic AUTH + curl.unsetopt(_pycurl.USERPWD) + + if request.client_cert is not None: + setopt(_pycurl.SSLCERT, request.client_cert) + if request.client_key is not None: + setopt(_pycurl.SSLKEY, request.client_key) + + if request.on_prepare is not None: + request.on_prepare(curl) diff --git a/kombu/async/hub.py b/kombu/async/hub.py index 673a60ac..264f8188 100644 --- a/kombu/async/hub.py +++ b/kombu/async/hub.py @@ -247,6 +247,11 @@ class Hub(object): self.writers.pop(fd, None) self.consolidate.discard(fd) + def on_callback_error(self, callback, exc): + logger.error( + 'Callback %r raised exception: %r', callback, exc, exc_info=1, + ) + def create_loop(self, generator=generator, sleep=sleep, min=min, next=next, Empty=Empty, StopIteration=StopIteration, diff --git a/kombu/exceptions.py b/kombu/exceptions.py index 
class HttpError(Exception):
    """Raised when an HTTP request fails.

    :param code: HTTP response code (e.g. 404, 500, or the curl
        pseudo-code 599 for transport errors).
    :param message: Optional human-readable status message.
    :param response: Optional response object associated with the error.
    """

    def __init__(self, code, message=None, response=None):
        self.code = code
        self.message = message
        self.response = response
        super(HttpError, self).__init__(code, message, response)

    def __str__(self):
        return 'HTTP {0}: {1}'.format(self.code, self.message)
def coro(gen):
    """Decorator marking ``gen`` as a coroutine-style generator.

    Calling the decorated function creates the generator and advances
    it to its first ``yield``, so the returned generator is immediately
    ready to receive values via ``.send()``.
    """

    @wraps(gen)
    def wind_up(*args, **kwargs):
        coroutine = gen(*args, **kwargs)
        next(coroutine)  # prime: run up to the first ``yield``
        return coroutine

    return wind_up
class LRUCache(UserDict):
    """LRU Cache implementation using a doubly linked list to track access.

    :keyword limit: The maximum number of keys to keep in the cache.
        When a new key is inserted and the limit has been exceeded,
        the *Least Recently Used* key will be discarded from the
        cache.

    """

    def __init__(self, limit=None):
        self.limit = limit
        self.mutex = threading.RLock()
        self.data = OrderedDict()

    def __getitem__(self, key):
        with self.mutex:
            # pop + reinsert moves the key to the most-recently-used end.
            value = self[key] = self.data.pop(key)
            return value

    def update(self, *args, **kwargs):
        with self.mutex:
            data, limit = self.data, self.limit
            data.update(*args, **kwargs)
            if limit and len(data) > limit:
                # pop additional items in case limit exceeded.
                # BUGFIX: the previous implementation popped keys while
                # iterating ``islice(iter(data), ...)`` over the same
                # dict, which mutates the dict during iteration and
                # raises RuntimeError on Python 3.  Pop the oldest
                # entries directly instead.
                for _ in range(len(data) - limit):
                    data.popitem(last=False)

    def __setitem__(self, key, value):
        # remove least recently used key.
        with self.mutex:
            if self.limit and len(self.data) >= self.limit:
                self.data.pop(next(iter(self.data)))
            self.data[key] = value

    def __iter__(self):
        return iter(self.data)

    def _iterate_items(self):
        # Tolerate concurrent eviction: skip keys removed mid-iteration.
        for k in self:
            try:
                yield (k, self.data[k])
            except KeyError:  # pragma: no cover
                pass
    iteritems = _iterate_items

    def _iterate_values(self):
        for k in self:
            try:
                yield self.data[k]
            except KeyError:  # pragma: no cover
                pass
    itervalues = _iterate_values

    def _iterate_keys(self):
        # userdict.keys in py3k calls __getitem__
        return keys(self.data)
    iterkeys = _iterate_keys

    def incr(self, key, delta=1):
        with self.mutex:
            # this acts as memcached does- store as a string, but return a
            # integer as long as it exists and we can cast it
            newval = int(self.data.pop(key)) + delta
            self[key] = str(newval)
            return newval

    def __getstate__(self):
        # The RLock cannot be pickled; recreate it in __setstate__.
        d = dict(vars(self))
        d.pop('mutex')
        return d

    def __setstate__(self, state):
        self.__dict__ = state
        self.mutex = threading.RLock()

    if sys.version_info[0] == 3:  # pragma: no cover
        keys = _iterate_keys
        values = _iterate_values
        items = _iterate_items
    else:  # noqa

        def keys(self):
            return list(self._iterate_keys())

        def values(self):
            return list(self._iterate_values())

        def items(self):
            return list(self._iterate_items())


def memoize(maxsize=None, Cache=LRUCache):
    """Decorator factory caching function results in an LRU cache.

    :keyword maxsize: Maximum number of results to keep (unbounded
        when :const:`None`).
    :keyword Cache: Cache class to use (defaults to :class:`LRUCache`).

    The decorated function gains ``hits``/``misses`` counters, a
    ``clear()`` method and an ``original_func`` attribute.
    """

    def _memoize(fun):
        mutex = threading.Lock()
        cache = Cache(limit=maxsize)

        @wraps(fun)
        def _M(*args, **kwargs):
            # The sentinel separates positional args from sorted kwargs
            # so that e.g. f(1, x=2) and f(1, 2) never collide.
            key = args + (KEYWORD_MARK, ) + tuple(sorted(kwargs.items()))
            try:
                with mutex:
                    value = cache[key]
            except KeyError:
                value = fun(*args, **kwargs)
                _M.misses += 1
                with mutex:
                    cache[key] = value
            else:
                _M.hits += 1
            return value

        def clear():
            """Clear the cache and reset cache statistics."""
            cache.clear()
            _M.hits = _M.misses = 0

        _M.hits = _M.misses = 0
        _M.clear = clear
        _M.original_func = fun
        return _M

    return _memoize