diff options
Diffstat (limited to 'kombu/async/http/curl.py')
-rw-r--r-- | kombu/async/http/curl.py | 273 |
1 files changed, 273 insertions, 0 deletions
diff --git a/kombu/async/http/curl.py b/kombu/async/http/curl.py new file mode 100644 index 00000000..6d31463f --- /dev/null +++ b/kombu/async/http/curl.py @@ -0,0 +1,273 @@ +from __future__ import absolute_import + +import pycurl + +from collections import deque +from functools import partial +from io import BytesIO +from time import time + +from kombu.async.hub import READ, WRITE +from kombu.exceptions import HttpError +from kombu.five import items +from kombu.utils.encoding import bytes_to_str + +from .base import BaseClient + +try: + import pycurl # noqa +except ImportError: + pycurl = Curl = METH_TO_CURL = None # noqa +else: + from pycurl import Curl # noqa + + METH_TO_CURL = { # noqa + 'GET': pycurl.HTTPGET, + 'POST': pycurl.POST, + 'PUT': pycurl.UPLOAD, + 'HEAD': pycurl.NOBODY, + } + +__all__ = ['CurlClient'] + +DEFAULT_USER_AGENT = 'Mozilla/5.0 (compatible; pycurl)' +EXTRA_METHODS = frozenset(['DELETE', 'OPTIONS', 'PATCH']) + + +class CurlClient(BaseClient): + Curl = Curl + + def __init__(self, hub, max_clients=10): + if pycurl is None: + raise ImportError('The curl client requires the pycurl library.') + super(CurlClient, self).__init__(hub) + self.max_clients = max_clients + + self._multi = pycurl.CurlMulti() + self._multi.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout) + self._multi.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket) + self._curls = [self.Curl() for i in range(max_clients)] + self._free_list = self._curls[:] + self._pending = deque() + self._fds = {} + + self._socket_action = self._multi.socket_action + self._timeout_check_tref = self.hub.call_repeatedly( + 1.0, self._timeout_check, + ) + + # pycurl 7.29.0 workaround + dummy_curl_handle = pycurl.Curl() + self._multi.add_handle(dummy_curl_handle) + self._multi.remove_handle(dummy_curl_handle) + + def __enter__(self): + return self + + def __exit__(self, *exc_info): + self.close() + + def close(self): + self._timeout_check_tref.cancel() + for _curl in self._curls: + _curl.close() + self._multi.close() + + def add_request(self, request): + self._pending.append(request) + self._process_queue() + self._set_timeout(0) + + def _handle_socket(self, event, fd, multi, data, _pycurl=pycurl): + if event == _pycurl.POLL_REMOVE: + if fd in self._fds: + self.hub.remove(fd) + self._fds.pop(fd, None) + else: + if fd in self._fds: + self.hub.remove(fd) + if event == _pycurl.POLL_IN: + self.hub.add_reader(fd, self.on_readable, fd) + self._fds[fd] = READ + elif event == _pycurl.POLL_OUT: + self.hub.add_writer(fd, self.on_writable, fd) + self._fds[fd] = WRITE + elif event == _pycurl.POLL_INOUT: + self.hub.add_reader(fd, self.on_readable, fd) + self.hub.add_writer(fd, self.on_writable, fd) + self._fds[fd] = READ | WRITE + + def _set_timeout(self, msecs): + pass # TODO + + def _timeout_check(self, _pycurl=pycurl): + while 1: + try: + ret, _ = self._multi.socket_all() + except pycurl.error as exc: + ret = exc.args[0] + if ret != _pycurl.E_CALL_MULTI_PERFORM: + break + self._process_pending_requests() + + def on_readable(self, fd, _pycurl=pycurl): + return self._on_event(fd, _pycurl.CSELECT_IN) + + def on_writable(self, fd, _pycurl=pycurl): + return self._on_event(fd, _pycurl.CSELECT_OUT) + + def _on_event(self, fd, event, _pycurl=pycurl): + while 1: + try: + ret, _ = self._socket_action(fd, event) + except pycurl.error as exc: + ret = exc.args[0] + if ret != _pycurl.E_CALL_MULTI_PERFORM: + break + self._process_pending_requests() + + def _process_pending_requests(self): + while 1: + q, succeeded, failed = self._multi.info_read() + for curl in succeeded: + self._process(curl) + for curl, errno, reason in failed: + self._process(curl, errno, reason) + if q == 0: + break + self._process_queue() + + def _process_queue(self): + while 1: + started = 0 + while self._free_list and self._pending: + started += 1 + curl = self._free_list.pop() + request = self._pending.popleft() + headers = self.Headers() + buf = BytesIO() + curl.info = { + 'headers': headers, + 'buffer': buf, + 'request': request, + 'curl_start_time': time(), + } + self._setup_request(curl, request, buf, headers) + self._multi.add_handle(curl) + if not started: + break + + def _process(self, curl, errno=None, reason=None, _pycurl=pycurl): + info, curl.info = curl.info, None + self._multi.remove_handle(curl) + self._free_list.append(curl) + buffer = info['buffer'] + if errno: + code = 599 + error = HttpError(code, reason) + error.errno = errno + effective_url = None + buffer.close() + buffer = None + else: + error = None + code = curl.getinfo(_pycurl.HTTP_CODE) + effective_url = curl.getinfo(_pycurl.EFFECTIVE_URL) + buffer.seek(0) + try: + request = info['request'] + request.on_ready(self.Response( + request=request, code=code, headers=info['headers'], + buffer=buffer, effective_url=effective_url, error=error, + )) + except Exception as exc: + raise + self.hub.on_callback_error(request.on_ready, exc) + + def _setup_request(self, curl, request, buffer, headers, _pycurl=pycurl): + setopt = curl.setopt + setopt(_pycurl.URL, bytes_to_str(request.url)) + + # see tornado curl client + request.headers.setdefault('Expect', '') + request.headers.setdefault('Pragma', '') + + curl.setopt( + _pycurl.HTTPHEADER, + ['%s %s'.format(h) for h in items(request.headers)], + ) + + setopt( + _pycurl.HEADERFUNCTION, + partial(request.on_header or self.on_header, request.headers), + ) + setopt( + _pycurl.WRITEFUNCTION, request.on_stream or buffer.write, + ) + setopt( + _pycurl.FOLLOWLOCATION, request.follow_redirects, + ) + setopt( + _pycurl.USERAGENT, + bytes_to_str(request.user_agent or DEFAULT_USER_AGENT), + ) + if request.network_interface: + setopt(_pycurl.INTERFACE, request.network_interface) + setopt( + _pycurl.ENCODING, 'gzip,deflate' if request.use_gzip else 'none', + ) + if request.proxy_host: + if not request.proxy_port: + raise ValueError('Request with proxy_host but no proxy_port') + setopt(_pycurl.PROXY, request.proxy_host) + setopt(_pycurl.PROXYPORT, request.proxy_port) + if request.proxy_username: + setopt(_pycurl.PROXYUSERPWD, '{0}:{1}'.format( + request.proxy_username, request.proxy_password or '')) + else: + setopt(_pycurl.PROXY, '') + curl.unsetopt(_pycurl.PROXYUSERPWD) + + setopt(_pycurl.SSL_VERIFYPEER, 1 if request.validate_cert else 0) + setopt(_pycurl.SSL_VERIFYHOST, 2 if request.validate_cert else 0) + if request.ca_certs is not None: + setopt(_pycurl.CAINFO, request.ca_certs) + + setopt(_pycurl.IPRESOLVE, pycurl.IPRESOLVE_WHATEVER) + + for meth in METH_TO_CURL.values(): + setopt(meth, False) + try: + meth = METH_TO_CURL[request.method] + except KeyError: + curl.setopt(_pycurl.CUSTOMREQUEST, request.method) + else: + curl.unsetopt(_pycurl.CUSTOMREQUEST) + setopt(meth, True) + + if request.method in ('POST', 'PUT'): + assert request.body is not None + reqbuffer = BytesIO(request.body) + setopt(_pycurl.READFUNCTION, reqbuffer.read) + if request.method == 'POST': + + def ioctl(cmd): + if cmd == _pycurl.IOCMD_RESTARTREAD: + reqbuffer.seek(0) + setopt(_pycurl.IOCTLFUNCTION, ioctl) + setopt(_pycurl.POSTFIELDSIZE, len(request.body)) + else: + setopt(_pycurl.INFILESIZE, len(request.body)) + elif request.method == 'GET': + assert request.body is None + + # TODO Does not support Basic AUTH + curl.unsetopt(_pycurl.USERPWD) + + if request.client_cert is not None: + setopt(_pycurl.SSLCERT, request.client_cert) + if request.client_key is not None: + setopt(_pycurl.SSLKEY, request.client_key) + + if request.on_prepare is not None: + request.on_prepare(curl) |