diff options
author | Guido van Rossum <guido@google.com> | 2012-12-18 10:23:53 -0800 |
---|---|---|
committer | Guido van Rossum <guido@google.com> | 2012-12-18 10:23:53 -0800 |
commit | 43bd19fa201946d5fa70b721866e20d40cf40498 (patch) | |
tree | 906a6d29386807b24d478df9715b19e8cd68e860 /old | |
parent | 54e24f2ba2155ed59b4963ad6c040ac1d3c75df8 (diff) | |
download | trollius-43bd19fa201946d5fa70b721866e20d40cf40498.tar.gz |
Move old stuff into old/.
Diffstat (limited to 'old')
-rw-r--r-- | old/Makefile | 16 | ||||
-rw-r--r-- | old/echoclt.py | 79 | ||||
-rw-r--r-- | old/echosvr.py | 60 | ||||
-rw-r--r-- | old/http_client.py | 78 | ||||
-rw-r--r-- | old/http_server.py | 68 | ||||
-rw-r--r-- | old/main.py | 134 | ||||
-rw-r--r-- | old/p3time.py | 47 | ||||
-rw-r--r-- | old/polling.py | 535 | ||||
-rw-r--r-- | old/scheduling.py | 354 | ||||
-rw-r--r-- | old/sockets.py | 348 | ||||
-rw-r--r-- | old/transports.py | 496 | ||||
-rwxr-xr-x | old/xkcd.py | 18 | ||||
-rw-r--r-- | old/yyftime.py | 75 |
13 files changed, 2308 insertions, 0 deletions
diff --git a/old/Makefile b/old/Makefile new file mode 100644 index 0000000..d352cd7 --- /dev/null +++ b/old/Makefile @@ -0,0 +1,16 @@ +PYTHON=python3 + +main: + $(PYTHON) main.py -v + +echo: + $(PYTHON) echosvr.py -v + +profile: + $(PYTHON) -m profile -s time main.py + +time: + $(PYTHON) p3time.py + +ytime: + $(PYTHON) yyftime.py diff --git a/old/echoclt.py b/old/echoclt.py new file mode 100644 index 0000000..c24c573 --- /dev/null +++ b/old/echoclt.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3.3 +"""Example echo client.""" + +# Stdlib imports. +import logging +import socket +import sys +import time + +# Local imports. +import scheduling +import sockets + + +def echoclient(host, port): + """COROUTINE""" + testdata = b'hi hi hi ha ha ha\n' + try: + trans = yield from sockets.create_transport(host, port, + af=socket.AF_INET) + except OSError: + return False + try: + ok = yield from trans.send(testdata) + if ok: + response = yield from trans.recv(100) + ok = response == testdata.upper() + return ok + finally: + trans.close() + + +def doit(n): + """COROUTINE""" + t0 = time.time() + tasks = set() + for i in range(n): + t = scheduling.Task(echoclient('127.0.0.1', 1111), 'client-%d' % i) + tasks.add(t) + ok = 0 + bad = 0 + for t in tasks: + try: + yield from t + except Exception: + bad += 1 + else: + ok += 1 + t1 = time.time() + print('ok: ', ok) + print('bad:', bad) + print('dt: ', round(t1-t0, 6)) + + +def main(): + # Initialize logging. + if '-d' in sys.argv: + level = logging.DEBUG + elif '-v' in sys.argv: + level = logging.INFO + elif '-q' in sys.argv: + level = logging.ERROR + else: + level = logging.WARN + logging.basicConfig(level=level) + + # Get integer from command line. + n = 1 + for arg in sys.argv[1:]: + if not arg.startswith('-'): + n = int(arg) + break + + # Run scheduler, starting it off with doit(). 
+ scheduling.run(doit(n)) + + +if __name__ == '__main__': + main() diff --git a/old/echosvr.py b/old/echosvr.py new file mode 100644 index 0000000..4085f4c --- /dev/null +++ b/old/echosvr.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3.3 +"""Example echo server.""" + +# Stdlib imports. +import logging +import socket +import sys + +# Local imports. +import scheduling +import sockets + + +def handler(conn, addr): + """COROUTINE: Handle one connection.""" + logging.info('Accepting connection from %r', addr) + trans = sockets.SocketTransport(conn) + rdr = sockets.BufferedReader(trans) + while True: + line = yield from rdr.readline() + logging.debug('Received: %r from %r', line, addr) + if not line: + break + yield from trans.send(line.upper()) + logging.debug('Closing %r', addr) + trans.close() + + +def doit(): + """COROUTINE: Set the wheels in motion.""" + # Set up listener. + listener = yield from sockets.create_listener('localhost', 1111, + af=socket.AF_INET, + backlog=100) + logging.info('Listening on %r', listener.sock.getsockname()) + + # Loop accepting connections. + while True: + conn, addr = yield from listener.accept() + t = scheduling.Task(handler(conn, addr)) + + +def main(): + # Initialize logging. + if '-d' in sys.argv: + level = logging.DEBUG + elif '-v' in sys.argv: + level = logging.INFO + elif '-q' in sys.argv: + level = logging.ERROR + else: + level = logging.WARN + logging.basicConfig(level=level) + + # Run scheduler, starting it off with doit(). + scheduling.run(doit()) + + +if __name__ == '__main__': + main() diff --git a/old/http_client.py b/old/http_client.py new file mode 100644 index 0000000..8937ba2 --- /dev/null +++ b/old/http_client.py @@ -0,0 +1,78 @@ +"""Crummy HTTP client. + +This is not meant as an example of how to write a good client. +""" + +# Stdlib. +import re +import time + +# Local. 
+import sockets + + +def urlfetch(host, port=None, path='/', method='GET', + body=None, hdrs=None, encoding='utf-8', ssl=None, af=0): + """COROUTINE: Make an HTTP 1.0 request.""" + t0 = time.time() + if port is None: + if ssl: + port = 443 + else: + port = 80 + trans = yield from sockets.create_transport(host, port, ssl=ssl, af=af) + yield from trans.send(method.encode(encoding) + b' ' + + path.encode(encoding) + b' HTTP/1.0\r\n') + if hdrs: + kwds = dict(hdrs) + else: + kwds = {} + if 'host' not in kwds: + kwds['host'] = host + if body is not None: + kwds['content_length'] = len(body) + for header, value in kwds.items(): + yield from trans.send(header.replace('_', '-').encode(encoding) + + b': ' + value.encode(encoding) + b'\r\n') + + yield from trans.send(b'\r\n') + if body is not None: + yield from trans.send(body) + + # Read HTTP response line. + rdr = sockets.BufferedReader(trans) + resp = yield from rdr.readline() + m = re.match(br'(?ix) http/(\d\.\d) \s+ (\d\d\d) \s+ ([^\r]*)\r?\n\Z', + resp) + if not m: + trans.close() + raise IOError('No valid HTTP response: %r' % resp) + http_version, status, message = m.groups() + + # Read HTTP headers. + headers = [] + hdict = {} + while True: + line = yield from rdr.readline() + if not line.strip(): + break + m = re.match(br'([^\s:]+):\s*([^\r]*)\r?\n\Z', line) + if not m: + raise IOError('Invalid header: %r' % line) + header, value = m.groups() + headers.append((header, value)) + hdict[header.decode(encoding).lower()] = value.decode(encoding) + + # Read response body. + content_length = hdict.get('content-length') + if content_length is not None: + size = int(content_length) # TODO: Catch errors. + assert size >= 0, size + else: + size = 2**20 # Protective limit (1 MB). + data = yield from rdr.readexactly(size) + trans.close() # Can this block? 
+ t1 = time.time() + result = (host, port, path, int(status), len(data), round(t1-t0, 3)) +## print(result) + return result diff --git a/old/http_server.py b/old/http_server.py new file mode 100644 index 0000000..2b1e3dd --- /dev/null +++ b/old/http_server.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3.3 +"""Simple HTTP server. + +This currenty exists just so we can benchmark this thing! +""" + +# Stdlib imports. +import logging +import re +import socket +import sys + +# Local imports. +import scheduling +import sockets + + +def handler(conn, addr): + """COROUTINE: Handle one connection.""" + ##logging.info('Accepting connection from %r', addr) + trans = sockets.SocketTransport(conn) + rdr = sockets.BufferedReader(trans) + + # Read but ignore request line. + request_line = yield from rdr.readline() + + # Consume headers but don't interpret them. + while True: + header_line = yield from rdr.readline() + if not header_line.strip(): + break + + # Always send an empty 200 response and close. + yield from trans.send(b'HTTP/1.0 200 Ok\r\n\r\n') + trans.close() + + +def doit(): + """COROUTINE: Set the wheels in motion.""" + # Set up listener. + listener = yield from sockets.create_listener('localhost', 8080, + af=socket.AF_INET) + logging.info('Listening on %r', listener.sock.getsockname()) + + # Loop accepting connections. + while True: + conn, addr = yield from listener.accept() + t = scheduling.Task(handler(conn, addr)) + + +def main(): + # Initialize logging. + if '-d' in sys.argv: + level = logging.DEBUG + elif '-v' in sys.argv: + level = logging.INFO + elif '-q' in sys.argv: + level = logging.ERROR + else: + level = logging.WARN + logging.basicConfig(level=level) + + # Run scheduler, starting it off with doit(). 
+ scheduling.run(doit()) + + +if __name__ == '__main__': + main() diff --git a/old/main.py b/old/main.py new file mode 100644 index 0000000..c1f9d0a --- /dev/null +++ b/old/main.py @@ -0,0 +1,134 @@ +#!/usr/bin/env python3.3 +"""Example HTTP client using yield-from coroutines (PEP 380). + +Requires Python 3.3. + +There are many micro-optimizations possible here, but that's not the point. + +Some incomplete laundry lists: + +TODO: +- Take test urls from command line. +- Move urlfetch to a separate module. +- Profiling. +- Docstrings. +- Unittests. + +FUNCTIONALITY: +- Connection pool (keep connection open). +- Chunked encoding (request and response). +- Pipelining, e.g. zlib (request and response). +- Automatic encoding/decoding. +""" + +__author__ = 'Guido van Rossum <guido@python.org>' + +# Standard library imports (keep in alphabetic order). +import logging +import os +import time +import socket +import sys + +# Local imports (keep in alphabetic order). +import scheduling +import http_client + + + +def doit2(): + argses = [ + ('localhost', 8080, '/'), + ('127.0.0.1', 8080, '/home'), + ('python.org', 80, '/'), + ('xkcd.com', 443, '/'), + ] + results = yield from scheduling.map_over( + lambda args: http_client.urlfetch(*args), argses, timeout=2) + for res in results: + print('-->', res) + return [] + + +def doit(): + TIMEOUT = 2 + tasks = set() + + # This references NDB's default test service. + # (Sadly the service is single-threaded.) + task1 = scheduling.Task(http_client.urlfetch('localhost', 8080, path='/'), + 'root', timeout=TIMEOUT) + tasks.add(task1) + task2 = scheduling.Task(http_client.urlfetch('127.0.0.1', 8080, + path='/home'), + 'home', timeout=TIMEOUT) + tasks.add(task2) + + # Fetch python.org home page. + task3 = scheduling.Task(http_client.urlfetch('python.org', 80, path='/'), + 'python', timeout=TIMEOUT) + tasks.add(task3) + + # Fetch XKCD home page using SSL. (Doesn't like IPv6.) 
+ task4 = scheduling.Task(http_client.urlfetch('xkcd.com', ssl=True, path='/', + af=socket.AF_INET), + 'xkcd', timeout=TIMEOUT) + tasks.add(task4) + +## # Fetch many links from python.org (/x.y.z). +## for x in '123': +## for y in '0123456789': +## path = '/{}.{}'.format(x, y) +## g = http_client.urlfetch('82.94.164.162', 80, +## path=path, hdrs={'host': 'python.org'}) +## t = scheduling.Task(g, path, timeout=2) +## tasks.add(t) + +## print(tasks) + yield from scheduling.Task(scheduling.sleep(1), timeout=0.2).wait() + winners = yield from scheduling.wait_any(tasks) + print('And the winners are:', [w.name for w in winners]) + tasks = yield from scheduling.wait_all(tasks) + print('And the players were:', [t.name for t in tasks]) + return tasks + + +def logtimes(real): + utime, stime, cutime, cstime, unused = os.times() + logging.info('real %10.3f', real) + logging.info('user %10.3f', utime + cutime) + logging.info('sys %10.3f', stime + cstime) + + +def main(): + t0 = time.time() + + # Initialize logging. + if '-d' in sys.argv: + level = logging.DEBUG + elif '-v' in sys.argv: + level = logging.INFO + elif '-q' in sys.argv: + level = logging.ERROR + else: + level = logging.WARN + logging.basicConfig(level=level) + + # Run scheduler, starting it off with doit(). + task = scheduling.run(doit()) + if task.exception: + print('Exception:', repr(task.exception)) + if isinstance(task.exception, AssertionError): + raise task.exception + else: + for t in task.result: + print(t.name + ':', + repr(t.exception) if t.exception else t.result) + + # Report real, user, sys times. + t1 = time.time() + logtimes(t1-t0) + + +if __name__ == '__main__': + main() diff --git a/old/p3time.py b/old/p3time.py new file mode 100644 index 0000000..35e14c9 --- /dev/null +++ b/old/p3time.py @@ -0,0 +1,47 @@ +"""Compare timing of plain vs. 
yield-from calls.""" + +import gc +import time + +def plain(n): + if n <= 0: + return 1 + l = plain(n-1) + r = plain(n-1) + return l + 1 + r + +def coroutine(n): + if n <= 0: + return 1 + l = yield from coroutine(n-1) + r = yield from coroutine(n-1) + return l + 1 + r + +def submain(depth): + t0 = time.time() + k = plain(depth) + t1 = time.time() + fmt = ' {} {} {:-9,.5f}' + delta0 = t1-t0 + print(('plain' + fmt).format(depth, k, delta0)) + + t0 = time.time() + try: + g = coroutine(depth) + while True: + next(g) + except StopIteration as err: + k = err.value + t1 = time.time() + delta1 = t1-t0 + print(('coro.' + fmt).format(depth, k, delta1)) + if delta0: + print(('relat' + fmt).format(depth, k, delta1/delta0)) + +def main(reasonable=16): + gc.disable() + for depth in range(reasonable): + submain(depth) + +if __name__ == '__main__': + main() diff --git a/old/polling.py b/old/polling.py new file mode 100644 index 0000000..cffed2a --- /dev/null +++ b/old/polling.py @@ -0,0 +1,535 @@ +"""Event loop and related classes. + +The event loop can be broken up into a pollster (the part responsible +for telling us when file descriptors are ready) and the event loop +proper, which wraps a pollster with functionality for scheduling +callbacks, immediately or at a given time in the future. + +Whenever a public API takes a callback, subsequent positional +arguments will be passed to the callback if/when it is called. This +avoids the proliferation of trivial lambdas implementing closures. +Keyword arguments for the callback are not supported; this is a +conscious design decision, leaving the door open for keyword arguments +to modify the meaning of the API call itself. + +There are several implementations of the pollster part, several using +esoteric system calls that exist only on some platforms. 
These are: + +- kqueue (most BSD systems) +- epoll (newer Linux systems) +- poll (most UNIX systems) +- select (all UNIX systems, and Windows) +- TODO: Support IOCP on Windows and some UNIX platforms. + +NOTE: We don't use select on systems where any of the others is +available, because select performs poorly as the number of file +descriptors goes up. The ranking is roughly: + + 1. kqueue, epoll, IOCP + 2. poll + 3. select + +TODO: +- Optimize the various pollsters. +- Unittests. +""" + +import collections +import concurrent.futures +import heapq +import logging +import os +import select +import threading +import time + + +class PollsterBase: + """Base class for all polling implementations. + + This defines an interface to register and unregister readers and + writers for specific file descriptors, and an interface to get a + list of events. There's also an interface to check whether any + readers or writers are currently registered. + """ + + def __init__(self): + super().__init__() + self.readers = {} # {fd: token, ...}. + self.writers = {} # {fd: token, ...}. + + def pollable(self): + """Return True if any readers or writers are currently registered.""" + return bool(self.readers or self.writers) + + # Subclasses are expected to extend the add/remove methods. + + def register_reader(self, fd, token): + """Add or update a reader for a file descriptor.""" + self.readers[fd] = token + + def register_writer(self, fd, token): + """Add or update a writer for a file descriptor.""" + self.writers[fd] = token + + def unregister_reader(self, fd): + """Remove the reader for a file descriptor.""" + del self.readers[fd] + + def unregister_writer(self, fd): + """Remove the writer for a file descriptor.""" + del self.writers[fd] + + def poll(self, timeout=None): + """Poll for events. A subclass must implement this. + + If timeout is omitted or None, this blocks until at least one + event is ready. 
Otherwise, timeout gives a maximum time to + wait (an int of float in seconds) -- the method returns as + soon as at least one event is ready or when the timeout is + expired. For a non-blocking poll, pass 0. + + The return value is a list of events; it is empty when the + timeout expired before any events were ready. Each event + is a token previously passed to register_reader/writer(). + """ + raise NotImplementedError + + +class SelectPollster(PollsterBase): + """Pollster implementation using select.""" + + def poll(self, timeout=None): + readable, writable, _ = select.select(self.readers, self.writers, + [], timeout) + events = [] + events += (self.readers[fd] for fd in readable) + events += (self.writers[fd] for fd in writable) + return events + + +class PollPollster(PollsterBase): + """Pollster implementation using poll.""" + + def __init__(self): + super().__init__() + self._poll = select.poll() + + def _update(self, fd): + assert isinstance(fd, int), fd + flags = 0 + if fd in self.readers: + flags |= select.POLLIN + if fd in self.writers: + flags |= select.POLLOUT + if flags: + self._poll.register(fd, flags) + else: + self._poll.unregister(fd) + + def register_reader(self, fd, callback, *args): + super().register_reader(fd, callback, *args) + self._update(fd) + + def register_writer(self, fd, callback, *args): + super().register_writer(fd, callback, *args) + self._update(fd) + + def unregister_reader(self, fd): + super().unregister_reader(fd) + self._update(fd) + + def unregister_writer(self, fd): + super().unregister_writer(fd) + self._update(fd) + + def poll(self, timeout=None): + # Timeout is in seconds, but poll() takes milliseconds. 
+ msecs = None if timeout is None else int(round(1000 * timeout)) + events = [] + for fd, flags in self._poll.poll(msecs): + if flags & (select.POLLIN | select.POLLHUP): + if fd in self.readers: + events.append(self.readers[fd]) + if flags & (select.POLLOUT | select.POLLHUP): + if fd in self.writers: + events.append(self.writers[fd]) + return events + + +class EPollPollster(PollsterBase): + """Pollster implementation using epoll.""" + + def __init__(self): + super().__init__() + self._epoll = select.epoll() + + def _update(self, fd): + assert isinstance(fd, int), fd + eventmask = 0 + if fd in self.readers: + eventmask |= select.EPOLLIN + if fd in self.writers: + eventmask |= select.EPOLLOUT + if eventmask: + try: + self._epoll.register(fd, eventmask) + except IOError: + self._epoll.modify(fd, eventmask) + else: + self._epoll.unregister(fd) + + def register_reader(self, fd, callback, *args): + super().register_reader(fd, callback, *args) + self._update(fd) + + def register_writer(self, fd, callback, *args): + super().register_writer(fd, callback, *args) + self._update(fd) + + def unregister_reader(self, fd): + super().unregister_reader(fd) + self._update(fd) + + def unregister_writer(self, fd): + super().unregister_writer(fd) + self._update(fd) + + def poll(self, timeout=None): + if timeout is None: + timeout = -1 # epoll.poll() uses -1 to mean "wait forever". 
+ events = [] + for fd, eventmask in self._epoll.poll(timeout): + if eventmask & select.EPOLLIN: + if fd in self.readers: + events.append(self.readers[fd]) + if eventmask & select.EPOLLOUT: + if fd in self.writers: + events.append(self.writers[fd]) + return events + + +class KqueuePollster(PollsterBase): + """Pollster implementation using kqueue.""" + + def __init__(self): + super().__init__() + self._kqueue = select.kqueue() + + def register_reader(self, fd, callback, *args): + if fd not in self.readers: + kev = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD) + self._kqueue.control([kev], 0, 0) + return super().register_reader(fd, callback, *args) + + def register_writer(self, fd, callback, *args): + if fd not in self.writers: + kev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD) + self._kqueue.control([kev], 0, 0) + return super().register_writer(fd, callback, *args) + + def unregister_reader(self, fd): + super().unregister_reader(fd) + kev = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE) + self._kqueue.control([kev], 0, 0) + + def unregister_writer(self, fd): + super().unregister_writer(fd) + kev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE) + self._kqueue.control([kev], 0, 0) + + def poll(self, timeout=None): + events = [] + max_ev = len(self.readers) + len(self.writers) + for kev in self._kqueue.control(None, max_ev, timeout): + fd = kev.ident + flag = kev.filter + if flag == select.KQ_FILTER_READ and fd in self.readers: + events.append(self.readers[fd]) + elif flag == select.KQ_FILTER_WRITE and fd in self.writers: + events.append(self.writers[fd]) + return events + + +# Pick the best pollster class for the platform. 
+if hasattr(select, 'kqueue'): + best_pollster = KqueuePollster +elif hasattr(select, 'epoll'): + best_pollster = EPollPollster +elif hasattr(select, 'poll'): + best_pollster = PollPollster +else: + best_pollster = SelectPollster + + +class DelayedCall: + """Object returned by callback registration methods.""" + + def __init__(self, when, callback, args, kwds=None): + self.when = when + self.callback = callback + self.args = args + self.kwds = kwds + self.cancelled = False + + def cancel(self): + self.cancelled = True + + def __lt__(self, other): + return self.when < other.when + + def __le__(self, other): + return self.when <= other.when + + def __eq__(self, other): + return self.when == other.when + + +class EventLoop: + """Event loop functionality. + + This defines public APIs call_soon(), call_later(), run_once() and + run(). It also wraps Pollster APIs register_reader(), + register_writer(), remove_reader(), remove_writer() with + add_reader() etc. + + This class's instance variables are not part of its API. + """ + + def __init__(self, pollster=None): + super().__init__() + if pollster is None: + logging.info('Using pollster: %s', best_pollster.__name__) + pollster = best_pollster() + self.pollster = pollster + self.ready = collections.deque() # [(callback, args), ...] + self.scheduled = [] # [(when, callback, args), ...] + + def add_reader(self, fd, callback, *args): + """Add a reader callback. Return a DelayedCall instance.""" + dcall = DelayedCall(None, callback, args) + self.pollster.register_reader(fd, dcall) + return dcall + + def remove_reader(self, fd): + """Remove a reader callback.""" + self.pollster.unregister_reader(fd) + + def add_writer(self, fd, callback, *args): + """Add a writer callback. 
Return a DelayedCall instance.""" + dcall = DelayedCall(None, callback, args) + self.pollster.register_writer(fd, dcall) + return dcall + + def remove_writer(self, fd): + """Remove a writer callback.""" + self.pollster.unregister_writer(fd) + + def add_callback(self, dcall): + """Add a DelayedCall to ready or scheduled.""" + if dcall.cancelled: + return + if dcall.when is None: + self.ready.append(dcall) + else: + heapq.heappush(self.scheduled, dcall) + + def call_soon(self, callback, *args): + """Arrange for a callback to be called as soon as possible. + + This operates as a FIFO queue, callbacks are called in the + order in which they are registered. Each callback will be + called exactly once. + + Any positional arguments after the callback will be passed to + the callback when it is called. + """ + dcall = DelayedCall(None, callback, args) + self.ready.append(dcall) + return dcall + + def call_later(self, when, callback, *args): + """Arrange for a callback to be called at a given time. + + Return an object with a cancel() method that can be used to + cancel the call. + + The time can be an int or float, expressed in seconds. + + If when is small enough (~11 days), it's assumed to be a + relative time, meaning the call will be scheduled that many + seconds in the future; otherwise it's assumed to be a posix + timestamp as returned by time.time(). + + Each callback will be called exactly once. If two callbacks + are scheduled for exactly the same time, it undefined which + will be called first. + + Any positional arguments after the callback will be passed to + the callback when it is called. + """ + if when < 10000000: + when += time.time() + dcall = DelayedCall(when, callback, args) + heapq.heappush(self.scheduled, dcall) + return dcall + + def run_once(self): + """Run one full iteration of the event loop. + + This calls all currently ready callbacks, polls for I/O, + schedules the resulting callbacks, and finally schedules + 'call_later' callbacks. 
+ """ + # TODO: Break each of these into smaller pieces. + # TODO: Pass in a timeout or deadline or something. + # TODO: Refactor to separate the callbacks from the readers/writers. + # TODO: As step 4, run everything scheduled by steps 1-3. + # TODO: An alternative API would be to do the *minimal* amount + # of work, e.g. one callback or one I/O poll. + + # This is the only place where callbacks are actually *called*. + # All other places just add them to ready. + # TODO: Ensure this loop always finishes, even if some + # callbacks keeps registering more callbacks. + while self.ready: + dcall = self.ready.popleft() + if not dcall.cancelled: + try: + if dcall.kwds: + dcall.callback(*dcall.args, **dcall.kwds) + else: + dcall.callback(*dcall.args) + except Exception: + logging.exception('Exception in callback %s %r', + dcall.callback, dcall.args) + + # Remove delayed calls that were cancelled from head of queue. + while self.scheduled and self.scheduled[0].cancelled: + heapq.heappop(self.scheduled) + + # Inspect the poll queue. + if self.pollster.pollable(): + if self.scheduled: + when = self.scheduled[0].when + timeout = max(0, when - time.time()) + else: + timeout = None + t0 = time.time() + events = self.pollster.poll(timeout) + t1 = time.time() + argstr = '' if timeout is None else ' %.3f' % timeout + if t1-t0 >= 1: + level = logging.INFO + else: + level = logging.DEBUG + logging.log(level, 'poll%s took %.3f seconds', argstr, t1-t0) + for dcall in events: + self.add_callback(dcall) + + # Handle 'later' callbacks that are ready. + now = time.time() + while self.scheduled: + dcall = self.scheduled[0] + if dcall.when > now: + break + dcall = heapq.heappop(self.scheduled) + self.call_soon(dcall.callback, *dcall.args) + + def run(self): + """Run the event loop until there is no work left to do. + + This keeps going as long as there are either readable and + writable file descriptors, or scheduled callbacks (of either + variety). 
+ """ + while self.ready or self.scheduled or self.pollster.pollable(): + self.run_once() + + +MAX_WORKERS = 5 # Default max workers when creating an executor. + + +class ThreadRunner: + """Helper to submit work to a thread pool and wait for it. + + This is the glue between the single-threaded callback-based async + world and the threaded world. Use it to call functions that must + block and don't have an async alternative (e.g. getaddrinfo()). + + The only public API is submit(). + """ + + def __init__(self, eventloop, executor=None): + self.eventloop = eventloop + self.executor = executor # Will be constructed lazily. + self.pipe_read_fd, self.pipe_write_fd = os.pipe() + self.active_count = 0 + + def read_callback(self): + """Semi-permanent callback while at least one future is active.""" + assert self.active_count > 0, self.active_count + data = os.read(self.pipe_read_fd, 8192) # Traditional buffer size. + self.active_count -= len(data) + if self.active_count == 0: + self.eventloop.remove_reader(self.pipe_read_fd) + assert self.active_count >= 0, self.active_count + + def submit(self, func, *args, executor=None, callback=None): + """Submit a function to the thread pool. + + This returns a concurrent.futures.Future instance. The caller + should not wait for that, but rather use the callback argument.. + """ + if executor is None: + executor = self.executor + if executor is None: + # Lazily construct a default executor. + # TODO: Should this be shared between threads? + executor = concurrent.futures.ThreadPoolExecutor(MAX_WORKERS) + self.executor = executor + assert self.active_count >= 0, self.active_count + future = executor.submit(func, *args) + if self.active_count == 0: + self.eventloop.add_reader(self.pipe_read_fd, self.read_callback) + self.active_count += 1 + def done_callback(fut): + if callback is not None: + self.eventloop.call_soon(callback, fut) + # TODO: Wake up the pipe in call_soon()? 
+ os.write(self.pipe_write_fd, b'x') + future.add_done_callback(done_callback) + return future + + +class Context(threading.local): + """Thread-local context. + + We use this to avoid having to explicitly pass around an event loop + or something to hold the current task. + + TODO: Add an API so frameworks can substitute a different notion + of context more easily. + """ + + def __init__(self, eventloop=None, threadrunner=None): + # Default event loop and thread runner are lazily constructed + # when first accessed. + self._eventloop = eventloop + self._threadrunner = threadrunner + self.current_task = None # For the benefit of scheduling.py. + + @property + def eventloop(self): + if self._eventloop is None: + self._eventloop = EventLoop() + return self._eventloop + + @property + def threadrunner(self): + if self._threadrunner is None: + self._threadrunner = ThreadRunner(self.eventloop) + return self._threadrunner + + +context = Context() # Thread-local! diff --git a/old/scheduling.py b/old/scheduling.py new file mode 100644 index 0000000..3864571 --- /dev/null +++ b/old/scheduling.py @@ -0,0 +1,354 @@ +#!/usr/bin/env python3.3 +"""Example coroutine scheduler, PEP-380-style ('yield from <generator>'). + +Requires Python 3.3. + +There are likely micro-optimizations possible here, but that's not the point. + +TODO: +- Docstrings. +- Unittests. + +PATTERNS TO TRY: +- Various synchronization primitives (Lock, RLock, Event, Condition, + Semaphore, BoundedSemaphore, Barrier). +""" + +__author__ = 'Guido van Rossum <guido@python.org>' + +# Standard library imports (keep in alphabetic order). +from concurrent.futures import CancelledError, TimeoutError +import logging +import time +import types + +# Local imports (keep in alphabetic order). +import polling + + +context = polling.context + + +class Task: + """Wrapper around a stack of generators. + + This is a bit like a Future, but with a different interface. + + TODO: + - wait for result. 
+ """ + + def __init__(self, gen, name=None, *, timeout=None): + assert isinstance(gen, types.GeneratorType), repr(gen) + self.gen = gen + self.name = name or gen.__name__ + self.timeout = timeout + self.eventloop = context.eventloop + self.canceleer = None + if timeout is not None: + self.canceleer = self.eventloop.call_later(timeout, self.cancel) + self.blocked = False + self.unblocker = None + self.cancelled = False + self.must_cancel = False + self.alive = True + self.result = None + self.exception = None + self.done_callbacks = [] + # Start the task immediately. + self.eventloop.call_soon(self.step) + + def add_done_callback(self, done_callback): + # For better or for worse, the callback will always be called + # with the task as an argument, like concurrent.futures.Future. + # TODO: Call it right away if task is no longer alive. + dcall = polling.DelayedCall(None, done_callback, (self,)) + self.done_callbacks.append(dcall) + self.done_callbacks = [dc for dc in self.done_callbacks + if not dc.cancelled] + return dcall + + def __repr__(self): + parts = [self.name] + is_current = (self is context.current_task) + if self.blocked: + parts.append('blocking' if is_current else 'blocked') + elif self.alive: + parts.append('running' if is_current else 'runnable') + if self.must_cancel: + parts.append('must_cancel') + if self.cancelled: + parts.append('cancelled') + if self.exception is not None: + parts.append('exception=%r' % self.exception) + elif not self.alive: + parts.append('result=%r' % (self.result,)) + if self.timeout is not None: + parts.append('timeout=%.3f' % self.timeout) + return 'Task<' + ', '.join(parts) + '>' + + def cancel(self): + if self.alive: + if not self.must_cancel and not self.cancelled: + self.must_cancel = True + if self.blocked: + self.unblock() + + def step(self): + assert self.alive, self + try: + context.current_task = self + if self.must_cancel: + self.must_cancel = False + self.cancelled = True + self.gen.throw(CancelledError()) + 
else: + next(self.gen) + except StopIteration as exc: + self.alive = False + self.result = exc.value + except Exception as exc: + self.alive = False + self.exception = exc + logging.debug('Uncaught exception in %s', self, + exc_info=True, stack_info=True) + except BaseException as exc: + self.alive = False + self.exception = exc + raise + else: + if not self.blocked: + self.eventloop.call_soon(self.step) + finally: + context.current_task = None + if not self.alive: + # Cancel timeout callback if set. + if self.canceleer is not None: + self.canceleer.cancel() + # Schedule done_callbacks. + for dcall in self.done_callbacks: + self.eventloop.add_callback(dcall) + + def block(self, unblock_callback=None, *unblock_args): + assert self is context.current_task, self + assert self.alive, self + assert not self.blocked, self + self.blocked = True + self.unblocker = (unblock_callback, unblock_args) + + def unblock_if_alive(self, unused=None): + # Ignore optional argument so we can be a Future's done_callback. + if self.alive: + self.unblock() + + def unblock(self, unused=None): + # Ignore optional argument so we can be a Future's done_callback. 
+ assert self.alive, self + assert self.blocked, self + self.blocked = False + unblock_callback, unblock_args = self.unblocker + if unblock_callback is not None: + try: + unblock_callback(*unblock_args) + except Exception: + logging.error('Exception in unblocker in task %r', self.name) + raise + finally: + self.unblocker = None + self.eventloop.call_soon(self.step) + + def block_io(self, fd, flag): + assert isinstance(fd, int), repr(fd) + assert flag in ('r', 'w'), repr(flag) + if flag == 'r': + self.block(self.eventloop.remove_reader, fd) + self.eventloop.add_reader(fd, self.unblock) + else: + self.block(self.eventloop.remove_writer, fd) + self.eventloop.add_writer(fd, self.unblock) + + def wait(self): + """COROUTINE: Wait until this task is finished.""" + current_task = context.current_task + assert self is not current_task, (self, current_task) # How confusing! + if not self.alive: + return + current_task.block() + self.add_done_callback(current_task.unblock) + yield + + def __iter__(self): + """COROUTINE: Wait, then return result or raise exception. + + This adds a little magic so you can say + + x = yield from Task(gen()) + + and it is equivalent to + + x = yield from gen() + + but with the option to add a timeout (and only a tad slower). + """ + if self.alive: + yield from self.wait() + assert not self.alive + if self.exception is not None: + raise self.exception + return self.result + + +def run(arg=None): + """Run the event loop until it's out of work. + + If you pass a generator, it will be spawned for you. + You can also pass a task (already started). + Returns the task. 
+ """ + t = None + if arg is not None: + if isinstance(arg, Task): + t = arg + else: + t = Task(arg) + context.eventloop.run() + if t is not None and t.exception is not None: + logging.error('Uncaught exception in startup task: %r', + t.exception) + return t + + +def sleep(secs): + """COROUTINE: Sleep for some time (a float in seconds).""" + current_task = context.current_task + unblocker = context.eventloop.call_later(secs, current_task.unblock) + current_task.block(unblocker.cancel) + yield + + +def block_r(fd): + """COROUTINE: Block until a file descriptor is ready for reading.""" + context.current_task.block_io(fd, 'r') + yield + + +def block_w(fd): + """COROUTINE: Block until a file descriptor is ready for writing.""" + context.current_task.block_io(fd, 'w') + yield + + +def call_in_thread(func, *args, executor=None): + """COROUTINE: Run a function in a thread.""" + task = context.current_task + eventloop = context.eventloop + future = context.threadrunner.submit(func, *args, + executor=executor, + callback=task.unblock_if_alive) + task.block(future.cancel) + yield + assert future.done() + return future.result() + + +def wait_for(count, tasks): + """COROUTINE: Wait for the first N of a set of tasks to complete. + + May return more than N if more than N are immediately ready. + + NOTE: Tasks that were cancelled or raised are also considered ready. 
+ """ + assert tasks + assert all(isinstance(task, Task) for task in tasks) + tasks = set(tasks) + assert 1 <= count <= len(tasks) + current_task = context.current_task + assert all(task is not current_task for task in tasks) + todo = set() + done = set() + dcalls = [] + def wait_for_callback(task): + nonlocal todo, done, current_task, count, dcalls + todo.remove(task) + if len(done) < count: + done.add(task) + if len(done) == count: + for dcall in dcalls: + dcall.cancel() + current_task.unblock() + for task in tasks: + if task.alive: + todo.add(task) + else: + done.add(task) + if len(done) < count: + for task in todo: + dcall = task.add_done_callback(wait_for_callback) + dcalls.append(dcall) + current_task.block() + yield + return done + + +def wait_any(tasks): + """COROUTINE: Wait for the first of a set of tasks to complete.""" + return wait_for(1, tasks) + + +def wait_all(tasks): + """COROUTINE: Wait for all of a set of tasks to complete.""" + return wait_for(len(tasks), tasks) + + +def map_over(gen, *args, timeout=None): + """COROUTINE: map a generator over one or more iterables. + + E.g. map_over(foo, xs, ys) runs + + Task(foo(x, y)) for x, y in zip(xs, ys) + + and returns a list of all results (in that order). However if any + task raises an exception, the remaining tasks are cancelled and + the exception is propagated. + """ + # gen is a generator function. + tasks = [Task(gobj, timeout=timeout) for gobj in map(gen, *args)] + return (yield from par_tasks(tasks)) + + +def par(*args): + """COROUTINE: Wait for generators, return a list of results. + + Raises as soon as one of the tasks raises an exception (and then + remaining tasks are cancelled). + + This differs from par_tasks() in two ways: + - takes *args instead of list of args + - each arg may be a generator or a task + """ + tasks = [] + for arg in args: + if not isinstance(arg, Task): + # TODO: assert arg is a generator or an iterator? 
+ arg = Task(arg) + tasks.append(arg) + return (yield from par_tasks(tasks)) + + +def par_tasks(tasks): + """COROUTINE: Wait for a list of tasks, return a list of results. + + Raises as soon as one of the tasks raises an exception (and then + remaining tasks are cancelled). + """ + todo = set(tasks) + while todo: + ts = yield from wait_any(todo) + for t in ts: + assert not t.alive, t + todo.remove(t) + if t.exception is not None: + for other in todo: + other.cancel() + raise t.exception + return [t.result for t in tasks] diff --git a/old/sockets.py b/old/sockets.py new file mode 100644 index 0000000..a5005dc --- /dev/null +++ b/old/sockets.py @@ -0,0 +1,348 @@ +"""Socket wrappers to go with scheduling.py. + +Classes: + +- SocketTransport: a transport implementation wrapping a socket. +- SslTransport: a transport implementation wrapping SSL around a socket. +- BufferedReader: a buffer wrapping the read end of a transport. + +Functions (all coroutines): + +- connect(): connect a socket. +- getaddrinfo(): look up an address. +- create_connection(): look up address and return a connected socket for it. +- create_transport(): look up address and return a connected transport. + +TODO: +- Improve transport abstraction. +- Make a nice protocol abstraction. +- Unittests. +- A write() call that isn't a generator (needed so you can substitute it + for sys.stderr, pass it to logging.StreamHandler, etc.). +""" + +__author__ = 'Guido van Rossum <guido@python.org>' + +# Stdlib imports. +import errno +import socket +import ssl + +# Local imports. +import scheduling + +# Errno values indicating the connection was disconnected. +_DISCONNECTED = frozenset((errno.ECONNRESET, + errno.ENOTCONN, + errno.ESHUTDOWN, + errno.ECONNABORTED, + errno.EPIPE, + errno.EBADF, + )) + +# Errno values indicating the socket isn't ready for I/O just yet. +_TRYAGAIN = frozenset((errno.EAGAIN, errno.EWOULDBLOCK)) + + +class SocketTransport: + """Transport wrapping a socket. 
+ + The socket must already be connected in non-blocking mode. + """ + + def __init__(self, sock): + self.sock = sock + + def recv(self, n): + """COROUTINE: Read up to n bytes, blocking as needed. + + Always returns at least one byte, except if the socket was + closed or disconnected and there's no more data; then it + returns b''. + """ + assert n >= 0, n + while True: + try: + return self.sock.recv(n) + except socket.error as err: + if err.errno in _TRYAGAIN: + pass + elif err.errno in _DISCONNECTED: + return b'' + else: + raise # Unexpected, propagate. + yield from scheduling.block_r(self.sock.fileno()) + + def send(self, data): + """COROUTINE; Send data to the socket, blocking until all written. + + Return True if all went well, False if socket was disconnected. + """ + while data: + try: + n = self.sock.send(data) + except socket.error as err: + if err.errno in _TRYAGAIN: + pass + elif err.errno in _DISCONNECTED: + return False + else: + raise # Unexpected, propagate. + else: + assert 0 <= n <= len(data), (n, len(data)) + if n == len(data): + break + data = data[n:] + continue + yield from scheduling.block_w(self.sock.fileno()) + + return True + + def close(self): + """Close the socket. (Not a coroutine.)""" + self.sock.close() + + +class SslTransport: + """Transport wrapping a socket in SSL. + + The socket must already be connected at the TCP level in + non-blocking mode. 
+ """ + + def __init__(self, rawsock, sslcontext=None): + self.rawsock = rawsock + self.sslcontext = sslcontext or ssl.SSLContext(ssl.PROTOCOL_SSLv23) + self.sslsock = self.sslcontext.wrap_socket( + self.rawsock, do_handshake_on_connect=False) + + def do_handshake(self): + """COROUTINE: Finish the SSL handshake.""" + while True: + try: + self.sslsock.do_handshake() + except ssl.SSLWantReadError: + yield from scheduling.block_r(self.sslsock.fileno()) + except ssl.SSLWantWriteError: + yield from scheduling.block_w(self.sslsock.fileno()) + else: + break + + def recv(self, n): + """COROUTINE: Read up to n bytes. + + This blocks until at least one byte is read, or until EOF. + """ + while True: + try: + return self.sslsock.recv(n) + except ssl.SSLWantReadError: + yield from scheduling.block_r(self.sslsock.fileno()) + except ssl.SSLWantWriteError: + yield from scheduling.block_w(self.sslsock.fileno()) + except socket.error as err: + if err.errno in _TRYAGAIN: + yield from scheduling.block_r(self.sslsock.fileno()) + elif err.errno in _DISCONNECTED: + # Can this happen? + return b'' + else: + raise # Unexpected, propagate. + + def send(self, data): + """COROUTINE: Send data to the socket, blocking as needed.""" + while data: + try: + n = self.sslsock.send(data) + except ssl.SSLWantReadError: + yield from scheduling.block_r(self.sslsock.fileno()) + except ssl.SSLWantWriteError: + yield from scheduling.block_w(self.sslsock.fileno()) + except socket.error as err: + if err.errno in _TRYAGAIN: + yield from scheduling.block_w(self.sslsock.fileno()) + elif err.errno in _DISCONNECTED: + return False + else: + raise # Unexpected, propagate. + if n == len(data): + break + data = data[n:] + + return True + + def close(self): + """Close the socket. (Not a coroutine.) + + This also closes the raw socket. + """ + self.sslsock.close() + + # TODO: More SSL-specific methods, e.g. certificate stuff, unwrap(), ... 
class BufferedReader:
    """A buffered reader wrapping a transport.

    The transport must offer a coroutine-style recv(n).  Data is
    buffered so that small read()/readline() calls do not each have to
    hit the transport.
    """

    def __init__(self, trans, limit=8192):
        self.trans = trans
        self.limit = limit  # Soft cap on buffered bytes / line length.
        self.buffer = b''
        self.eof = False

    def read(self, n):
        """COROUTINE: Read up to n bytes, blocking at most once."""
        assert n >= 0, n
        if not self.buffer and not self.eof:
            # Request at least `limit` bytes so subsequent small reads
            # can be served straight from the buffer.
            yield from self._fillbuffer(max(n, self.limit))
        return self._getfrombuffer(n)

    def readexactly(self, n):
        """COROUTINE: Read exactly n bytes, or fewer if EOF comes first."""
        blocks = []
        count = 0
        while count < n:
            block = yield from self.read(n - count)
            if not block:
                break  # EOF before n bytes were available.
            blocks.append(block)
            count += len(block)
        return b''.join(blocks)

    def readline(self):
        """COROUTINE: Read up to newline or limit, whichever comes first."""
        end = self.buffer.find(b'\n') + 1  # Point past newline, or 0.
        while not end and not self.eof and len(self.buffer) < self.limit:
            anchor = len(self.buffer)
            yield from self._fillbuffer(self.limit)
            # Only search the newly appended data.
            end = self.buffer.find(b'\n', anchor) + 1
        if not end:
            end = len(self.buffer)
        if end > self.limit:
            end = self.limit
        return self._getfrombuffer(end)

    def _getfrombuffer(self, n):
        """Read up to n bytes without blocking (not a coroutine)."""
        if n >= len(self.buffer):
            result, self.buffer = self.buffer, b''
        else:
            result, self.buffer = self.buffer[:n], self.buffer[n:]
        return result

    def _fillbuffer(self, n):
        """COROUTINE: Fill buffer with one (up to) n bytes from transport."""
        assert not self.eof, '_fillbuffer called at eof'
        data = yield from self.trans.recv(n)
        if data:
            self.buffer += data
        else:
            self.eof = True


def connect(sock, address):
    """COROUTINE: Connect a socket to an address.

    The socket must be non-blocking.  Raises IOError carrying the
    socket error number if the connection attempt fails.
    """
    try:
        sock.connect(address)
    except socket.error as err:
        if err.errno != errno.EINPROGRESS:
            raise
    else:
        # Connected immediately (possible e.g. on localhost); no need
        # to block for writability or to consult SO_ERROR.
        return
    yield from scheduling.block_w(sock.fileno())
    err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if err != 0:
        # Report the actual failure; it is not always ECONNREFUSED
        # (could be ETIMEDOUT, EHOSTUNREACH, ...).
        import os
        raise IOError(err, os.strerror(err))
+ + +def getaddrinfo(host, port, af=0, socktype=0, proto=0): + """COROUTINE: Look up an address and return a list of infos for it. + + Each info is a tuple (af, socktype, protocol, canonname, address). + """ + infos = yield from scheduling.call_in_thread(socket.getaddrinfo, + host, port, af, + socktype, proto) + return infos + + +def create_connection(host, port, af=0, socktype=socket.SOCK_STREAM, proto=0): + """COROUTINE: Look up address and create a socket connected to it.""" + infos = yield from getaddrinfo(host, port, af, socktype, proto) + if not infos: + raise IOError('getaddrinfo() returned an empty list') + exc = None + for af, socktype, proto, cname, address in infos: + sock = None + try: + sock = socket.socket(af, socktype, proto) + sock.setblocking(False) + yield from connect(sock, address) + break + except socket.error as err: + if sock is not None: + sock.close() + if exc is None: + exc = err + else: + raise exc + return sock + + +def create_transport(host, port, af=0, ssl=None): + """COROUTINE: Look up address and create a transport connected to it.""" + if ssl is None: + ssl = (port == 443) + sock = yield from create_connection(host, port, af) + if ssl: + trans = SslTransport(sock) + yield from trans.do_handshake() + else: + trans = SocketTransport(sock) + return trans + + +class Listener: + """Wrapper for a listening socket.""" + + def __init__(self, sock): + self.sock = sock + + def accept(self): + """COROUTINE: Accept a connection.""" + while True: + try: + conn, addr = self.sock.accept() + except socket.error as err: + if err.errno in _TRYAGAIN: + yield from scheduling.block_r(self.sock.fileno()) + else: + raise # Unexpected, propagate. 
+ else: + conn.setblocking(False) + return conn, addr + + +def create_listener(host, port, af=0, socktype=0, proto=0, + backlog=5, reuse_addr=True): + """COROUTINE: Look up address and create a listener for it.""" + infos = yield from getaddrinfo(host, port, af, socktype, proto) + if not infos: + raise IOError('getaddrinfo() returned an empty list') + exc = None + for af, socktype, proto, cname, address in infos: + sock = None + try: + sock = socket.socket(af, socktype, proto) + sock.setblocking(False) + if reuse_addr: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(address) + sock.listen(backlog) + break + except socket.error as err: + if sock is not None: + sock.close() + if exc is None: + exc = err + else: + raise exc + return Listener(sock) diff --git a/old/transports.py b/old/transports.py new file mode 100644 index 0000000..19095bf --- /dev/null +++ b/old/transports.py @@ -0,0 +1,496 @@ +"""Transports and Protocols, actually. + +Inspired by Twisted, PEP 3153 and github.com/lvh/async-pep. + +THIS IS NOT REAL CODE! IT IS JUST AN EXPERIMENT. +""" + +# Stdlib imports. +import collections +import errno +import logging +import socket +import ssl +import sys +import time + +# Local imports. +import polling +import scheduling +import sockets + +# Errno values indicating the connection was disconnected. +_DISCONNECTED = frozenset((errno.ECONNRESET, + errno.ENOTCONN, + errno.ESHUTDOWN, + errno.ECONNABORTED, + errno.EPIPE, + errno.EBADF, + )) + +# Errno values indicating the socket isn't ready for I/O just yet. +_TRYAGAIN = frozenset((errno.EAGAIN, errno.EWOULDBLOCK)) + + +class Transport: + """ABC representing a transport. + + There may be many implementations. 
The user never instantiates + this directly; they call some utility function, passing it a + protocol, and the utility function will call the protocol's + connection_made() method with a transport (or it will call + connection_lost() with an exception if it fails to create the + desired transport). + + The implementation here raises NotImplemented for every method + except writelines(), which calls write() in a loop. + """ + + def write(self, data): + """Write some data (bytes) to the transport. + + This does not block; it buffers the data and arranges for it + to be sent out asynchronously. + """ + raise NotImplementedError + + def writelines(self, list_of_data): + """Write a list (or any iterable) of data (bytes) to the transport. + + The default implementation just calls write() for each item in + the list/iterable. + """ + for data in list_of_data: + self.write(data) + + def close(self): + """Closes the transport. + + Buffered data will be flushed asynchronously. No more data will + be received. When all buffered data is flushed, the protocol's + connection_lost() method is called with None as its argument. + """ + raise NotImplementedError + + def abort(self): + """Closes the transport immediately. + + Buffered data will be lost. No more data will be received. + The protocol's connection_lost() method is called with None as + its argument. + """ + raise NotImplementedError + + def half_close(self): + """Closes the write end after flushing buffered data. + + Data may still be received. + + TODO: What's the use case for this? How to implement it? + Should it call shutdown(SHUT_WR) after all the data is flushed? + Is there no use case for closing the other half first? + """ + raise NotImplementedError + + def pause(self): + """Pause the receiving end. + + No data will be received until resume() is called. + """ + raise NotImplementedError + + def resume(self): + """Resume the receiving end. + + Cancels a pause() call, resumes receiving data. 
+ """ + raise NotImplementedError + + +class Protocol: + """ABC representing a protocol. + + The user should implement this interface. They can inherit from + this class but don't need to. The implementations here do + nothing. + + When the user wants to requests a transport, they pass a protocol + instance to a utility function. + + When the connection is made successfully, connection_made() is + called with a suitable transport object. Then data_received() + will be called 0 or more times with data (bytes) received from the + transport; finally, connection_list() will be called exactly once + with either an exception object or None as an argument. + + If the utility function does not succeed in creating a transport, + it will call connection_lost() with an exception object. + + State machine of calls: + + start -> [CM -> DR*] -> CL -> end + """ + + def connection_made(self, transport): + """Called when a connection is made. + + The argument is the transport representing the connection. + To send data, call its write() or writelines() method. + To receive data, wait for data_received() calls. + When the connection is closed, connection_lost() is called. + """ + + def data_received(self, data): + """Called when some data is received. + + The argument is a bytes object. + + TODO: Should we allow it to be a bytesarray or some other + memory buffer? + """ + + def connection_lost(self, exc): + """Called when the connection is lost or closed. + + Also called when we fail to make a connection at all (in that + case connection_made() will not be called). + + The argument is an exception object or None (the latter + meaning a regular EOF is received or the connection was + aborted or closed). + """ + + +# TODO: The rest is platform specific and should move elsewhere. 
+ +class UnixSocketTransport(Transport): + + def __init__(self, eventloop, protocol, sock): + self._eventloop = eventloop + self._protocol = protocol + self._sock = sock + self._buffer = collections.deque() # For write(). + self._write_closed = False + + def _on_readable(self): + try: + data = self._sock.recv(8192) + except socket.error as exc: + if exc.errno not in _TRYAGAIN: + self._bad_error(exc) + else: + if not data: + self._eventloop.remove_reader(self._sock.fileno()) + self._sock.close() + self._protocol.connection_lost(None) + else: + self._protocol.data_received(data) # XXX call_soon()? + + def write(self, data): + assert isinstance(data, bytes) + assert not self._write_closed + if not data: + # Silly, but it happens. + return + if self._buffer: + # We've already registered a callback, just buffer the data. + self._buffer.append(data) + # Consider pausing if the total length of the buffer is + # truly huge. + return + + # TODO: Refactor so there's more sharing between this and + # _on_writable(). + + # There's no callback registered yet. It's quite possible + # that the kernel has buffer space for our data, so try to + # write now. Since the socket is non-blocking it will + # give us an error in _TRYAGAIN if it doesn't have enough + # space for even one more byte; it will return the number + # of bytes written if it can write at least one byte. + try: + n = self._sock.send(data) + except socket.error as exc: + # An error. + if exc.errno not in _TRYAGAIN: + self._bad_error(exc) + return + # The kernel doesn't have room for more data right now. + n = 0 + else: + # Wrote at least one byte. + if n == len(data): + # Wrote it all. Done! + if self._write_closed: + self._sock.shutdown(socket.SHUT_WR) + return + # Throw away the data that was already written. + # TODO: Do this without copying the data? 
+ data = data[n:] + self._buffer.append(data) + self._eventloop.add_writer(self._sock.fileno(), self._on_writable) + + def _on_writable(self): + while self._buffer: + data = self._buffer[0] + # TODO: Join small amounts of data? + try: + n = self._sock.send(data) + except socket.error as exc: + # Error handling is the same as in write(). + if exc.errno not in _TRYAGAIN: + self._bad_error(exc) + return + if n < len(data): + self._buffer[0] = data[n:] + return + self._buffer.popleft() + self._eventloop.remove_writer(self._sock.fileno()) + if self._write_closed: + self._sock.shutdown(socket.SHUT_WR) + + def abort(self): + self._bad_error(None) + + def _bad_error(self, exc): + # A serious error. Close the socket etc. + fd = self._sock.fileno() + # TODO: Record whether we have a writer and/or reader registered. + try: + self._eventloop.remove_writer(fd) + except Exception: + pass + try: + self._eventloop.remove_reader(fd) + except Exception: + pass + self._sock.close() + self._protocol.connection_lost(exc) # XXX call_soon()? + + def half_close(self): + self._write_closed = True + + +class UnixSslTransport(Transport): + + # TODO: Refactor Socket and Ssl transport to share some code. + # (E.g. buffering.) + + # TODO: Consider using coroutines instead of callbacks, it seems + # much easier that way. + + def __init__(self, eventloop, protocol, rawsock, sslcontext=None): + self._eventloop = eventloop + self._protocol = protocol + self._rawsock = rawsock + self._sslcontext = sslcontext or ssl.SSLContext(ssl.PROTOCOL_SSLv23) + self._sslsock = self._sslcontext.wrap_socket( + self._rawsock, do_handshake_on_connect=False) + + self._buffer = collections.deque() # For write(). + self._write_closed = False + + # Try the handshake now. Likely it will raise EAGAIN, then it + # will take care of registering the appropriate callback. + self._on_handshake() + + def _bad_error(self, exc): + # A serious error. Close the socket etc. 
+ fd = self._sslsock.fileno() + # TODO: Record whether we have a writer and/or reader registered. + try: + self._eventloop.remove_writer(fd) + except Exception: + pass + try: + self._eventloop.remove_reader(fd) + except Exception: + pass + self._sslsock.close() + self._protocol.connection_lost(exc) # XXX call_soon()? + + def _on_handshake(self): + fd = self._sslsock.fileno() + try: + self._sslsock.do_handshake() + except ssl.SSLWantReadError: + self._eventloop.add_reader(fd, self._on_handshake) + return + except ssl.SSLWantWriteError: + self._eventloop.add_writable(fd, self._on_handshake) + return + # TODO: What if it raises another error? + try: + self._eventloop.remove_reader(fd) + except Exception: + pass + try: + self._eventloop.remove_writer(fd) + except Exception: + pass + self._protocol.connection_made(self) + self._eventloop.add_reader(fd, self._on_ready) + self._eventloop.add_writer(fd, self._on_ready) + + def _on_ready(self): + # Because of renegotiations (?), there's no difference between + # readable and writable. We just try both. XXX This may be + # incorrect; we probably need to keep state about what we + # should do next. + + # Maybe we're already closed... + fd = self._sslsock.fileno() + if fd < 0: + return + + # First try reading. + try: + data = self._sslsock.recv(8192) + except ssl.SSLWantReadError: + pass + except ssl.SSLWantWriteError: + pass + except socket.error as exc: + if exc.errno not in _TRYAGAIN: + self._bad_error(exc) + return + else: + if data: + self._protocol.data_received(data) + else: + # TODO: Don't close when self._buffer is non-empty. + assert not self._buffer + self._eventloop.remove_reader(fd) + self._eventloop.remove_writer(fd) + self._sslsock.close() + self._protocol.connection_lost(None) + return + + # Now try writing, if there's anything to write. 
+ if not self._buffer: + return + + data = self._buffer[0] + try: + n = self._sslsock.send(data) + except ssl.SSLWantReadError: + pass + except ssl.SSLWantWriteError: + pass + except socket.error as exc: + if exc.errno not in _TRYAGAIN: + self._bad_error(exc) + return + else: + if n == len(data): + self._buffer.popleft() + # Could try again, but let's just have the next callback do it. + else: + self._buffer[0] = data[n:] + + def write(self, data): + assert isinstance(data, bytes) + assert not self._write_closed + if not data: + return + self._buffer.append(data) + # We could optimize, but the callback can do this for now. + + def half_close(self): + self._write_closed = True + # Just set the flag. Calling shutdown() on the ssl socket + # breaks something, causing recv() to return binary data. + + +def make_connection(protocol, host, port=None, af=0, socktype=0, proto=0, + use_ssl=None): + # TODO: Pass in a protocol factory, not a protocol. + # What should be the exact sequence of events? + # - socket + # - transport + # - protocol + # - tell transport about protocol + # - tell protocol about transport + # Or should the latter two be reversed? Does it matter? + if port is None: + port = 443 if use_ssl else 80 + if use_ssl is None: + use_ssl = (port == 443) + if not socktype: + socktype = socket.SOCK_STREAM + eventloop = polling.context.eventloop + + def on_socket_connected(task): + assert not task.alive + if task.exception is not None: + # TODO: Call some callback. + raise task.exception + sock = task.result + assert sock is not None + logging.debug('on_socket_connected') + if use_ssl: + # You can pass an ssl.SSLContext object as use_ssl, + # or a bool. + if isinstance(use_ssl, bool): + sslcontext = None + else: + sslcontext = use_ssl + transport = UnixSslTransport(eventloop, protocol, sock, sslcontext) + else: + transport = UnixSocketTransport(eventloop, protocol, sock) + # TODO: Should the ransport make the following calls? 
+ protocol.connection_made(transport) # XXX call_soon()? + # Don't do this before connection_made() is called. + eventloop.add_reader(sock.fileno(), transport._on_readable) + + coro = sockets.create_connection(host, port, af, socktype, proto) + task = scheduling.Task(coro) + task.add_done_callback(on_socket_connected) + + +def main(): # Testing... + + # Initialize logging. + if '-d' in sys.argv: + level = logging.DEBUG + elif '-v' in sys.argv: + level = logging.INFO + elif '-q' in sys.argv: + level = logging.ERROR + else: + level = logging.WARN + logging.basicConfig(level=level) + + host = 'xkcd.com' + if sys.argv[1:] and '.' in sys.argv[-1]: + host = sys.argv[-1] + + t0 = time.time() + + class TestProtocol(Protocol): + def connection_made(self, transport): + logging.info('Connection made at %.3f secs', time.time() - t0) + self.transport = transport + self.transport.write(b'GET / HTTP/1.0\r\nHost: ' + + host.encode('ascii') + + b'\r\n\r\n') + self.transport.half_close() + def data_received(self, data): + logging.info('Received %d bytes at t=%.3f', + len(data), time.time() - t0) + logging.debug('Received %r', data) + def connection_lost(self, exc): + logging.debug('Connection lost: %r', exc) + self.t1 = time.time() + logging.info('Total time %.3f secs', self.t1 - t0) + + tp = TestProtocol() + logging.debug('tp = %r', tp) + make_connection(tp, host, use_ssl=('-S' in sys.argv)) + logging.info('Running...') + polling.context.eventloop.run() + logging.info('Done.') + + +if __name__ == '__main__': + main() diff --git a/old/xkcd.py b/old/xkcd.py new file mode 100755 index 0000000..474009d --- /dev/null +++ b/old/xkcd.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python3.3 +"""Minimal synchronous SSL demo, connecting to xkcd.com.""" + +import socket, ssl + +s = socket.socket() +s.connect(('xkcd.com', 443)) +ss = ssl.wrap_socket(s) + +ss.send(b'GET / HTTP/1.0\r\n\r\n') + +while True: + data = ss.recv(1000000) + print(data) + if not data: + break + +ss.close() diff --git 
a/old/yyftime.py b/old/yyftime.py new file mode 100644 index 0000000..f55234b --- /dev/null +++ b/old/yyftime.py @@ -0,0 +1,75 @@ +"""Compare timing of yield-from <generator> vs. yield <future> calls.""" + +import gc +import time + +def coroutine(n): + if n <= 0: + return 1 + l = yield from coroutine(n-1) + r = yield from coroutine(n-1) + return l + 1 + r + +def run_coro(depth): + t0 = time.time() + try: + g = coroutine(depth) + while True: + next(g) + except StopIteration as err: + k = err.value + t1 = time.time() + print('coro', depth, k, round(t1-t0, 6)) + return t1-t0 + +class Future: + + def __init__(self, g): + self.g = g + + def wait(self): + value = None + try: + while True: + f = self.g.send(value) + f.wait() + value = f.value + except StopIteration as err: + self.value = err.value + + + +def task(func): # Decorator + def wrapper(*args): + g = func(*args) + f = Future(g) + return f + return wrapper + +@task +def oldstyle(n): + if n <= 0: + return 1 + l = yield oldstyle(n-1) + r = yield oldstyle(n-1) + return l + 1 + r + +def run_olds(depth): + t0 = time.time() + f = oldstyle(depth) + f.wait() + k = f.value + t1 = time.time() + print('olds', depth, k, round(t1-t0, 6)) + return t1-t0 + +def main(): + gc.disable() + for depth in range(16): + tc = run_coro(depth) + to = run_olds(depth) + if tc: + print('ratio', round(to/tc, 2)) + +if __name__ == '__main__': + main() |