summaryrefslogtreecommitdiff
path: root/old
diff options
context:
space:
mode:
authorGuido van Rossum <guido@google.com>2012-12-18 10:23:53 -0800
committerGuido van Rossum <guido@google.com>2012-12-18 10:23:53 -0800
commit43bd19fa201946d5fa70b721866e20d40cf40498 (patch)
tree906a6d29386807b24d478df9715b19e8cd68e860 /old
parent54e24f2ba2155ed59b4963ad6c040ac1d3c75df8 (diff)
downloadtrollius-43bd19fa201946d5fa70b721866e20d40cf40498.tar.gz
Move old stuff into old/.
Diffstat (limited to 'old')
-rw-r--r--old/Makefile16
-rw-r--r--old/echoclt.py79
-rw-r--r--old/echosvr.py60
-rw-r--r--old/http_client.py78
-rw-r--r--old/http_server.py68
-rw-r--r--old/main.py134
-rw-r--r--old/p3time.py47
-rw-r--r--old/polling.py535
-rw-r--r--old/scheduling.py354
-rw-r--r--old/sockets.py348
-rw-r--r--old/transports.py496
-rwxr-xr-xold/xkcd.py18
-rw-r--r--old/yyftime.py75
13 files changed, 2308 insertions, 0 deletions
diff --git a/old/Makefile b/old/Makefile
new file mode 100644
index 0000000..d352cd7
--- /dev/null
+++ b/old/Makefile
@@ -0,0 +1,16 @@
+PYTHON=python3
+
+main:
+ $(PYTHON) main.py -v
+
+echo:
+ $(PYTHON) echosvr.py -v
+
+profile:
+ $(PYTHON) -m profile -s time main.py
+
+time:
+ $(PYTHON) p3time.py
+
+ytime:
+ $(PYTHON) yyftime.py
diff --git a/old/echoclt.py b/old/echoclt.py
new file mode 100644
index 0000000..c24c573
--- /dev/null
+++ b/old/echoclt.py
@@ -0,0 +1,79 @@
+#!/usr/bin/env python3.3
+"""Example echo client."""
+
+# Stdlib imports.
+import logging
+import socket
+import sys
+import time
+
+# Local imports.
+import scheduling
+import sockets
+
+
+def echoclient(host, port):
+ """COROUTINE"""
+ testdata = b'hi hi hi ha ha ha\n'
+ try:
+ trans = yield from sockets.create_transport(host, port,
+ af=socket.AF_INET)
+ except OSError:
+ return False
+ try:
+ ok = yield from trans.send(testdata)
+ if ok:
+ response = yield from trans.recv(100)
+ ok = response == testdata.upper()
+ return ok
+ finally:
+ trans.close()
+
+
+def doit(n):
+ """COROUTINE"""
+ t0 = time.time()
+ tasks = set()
+ for i in range(n):
+ t = scheduling.Task(echoclient('127.0.0.1', 1111), 'client-%d' % i)
+ tasks.add(t)
+ ok = 0
+ bad = 0
+ for t in tasks:
+ try:
+ yield from t
+ except Exception:
+ bad += 1
+ else:
+ ok += 1
+ t1 = time.time()
+ print('ok: ', ok)
+ print('bad:', bad)
+ print('dt: ', round(t1-t0, 6))
+
+
+def main():
+ # Initialize logging.
+ if '-d' in sys.argv:
+ level = logging.DEBUG
+ elif '-v' in sys.argv:
+ level = logging.INFO
+ elif '-q' in sys.argv:
+ level = logging.ERROR
+ else:
+ level = logging.WARN
+ logging.basicConfig(level=level)
+
+ # Get integer from command line.
+ n = 1
+ for arg in sys.argv[1:]:
+ if not arg.startswith('-'):
+ n = int(arg)
+ break
+
+ # Run scheduler, starting it off with doit().
+ scheduling.run(doit(n))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/old/echosvr.py b/old/echosvr.py
new file mode 100644
index 0000000..4085f4c
--- /dev/null
+++ b/old/echosvr.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env python3.3
+"""Example echo server."""
+
+# Stdlib imports.
+import logging
+import socket
+import sys
+
+# Local imports.
+import scheduling
+import sockets
+
+
+def handler(conn, addr):
+ """COROUTINE: Handle one connection."""
+ logging.info('Accepting connection from %r', addr)
+ trans = sockets.SocketTransport(conn)
+ rdr = sockets.BufferedReader(trans)
+ while True:
+ line = yield from rdr.readline()
+ logging.debug('Received: %r from %r', line, addr)
+ if not line:
+ break
+ yield from trans.send(line.upper())
+ logging.debug('Closing %r', addr)
+ trans.close()
+
+
+def doit():
+ """COROUTINE: Set the wheels in motion."""
+ # Set up listener.
+ listener = yield from sockets.create_listener('localhost', 1111,
+ af=socket.AF_INET,
+ backlog=100)
+ logging.info('Listening on %r', listener.sock.getsockname())
+
+ # Loop accepting connections.
+ while True:
+ conn, addr = yield from listener.accept()
+ t = scheduling.Task(handler(conn, addr))
+
+
+def main():
+ # Initialize logging.
+ if '-d' in sys.argv:
+ level = logging.DEBUG
+ elif '-v' in sys.argv:
+ level = logging.INFO
+ elif '-q' in sys.argv:
+ level = logging.ERROR
+ else:
+ level = logging.WARN
+ logging.basicConfig(level=level)
+
+ # Run scheduler, starting it off with doit().
+ scheduling.run(doit())
+
+
+if __name__ == '__main__':
+ main()
diff --git a/old/http_client.py b/old/http_client.py
new file mode 100644
index 0000000..8937ba2
--- /dev/null
+++ b/old/http_client.py
@@ -0,0 +1,78 @@
+"""Crummy HTTP client.
+
+This is not meant as an example of how to write a good client.
+"""
+
+# Stdlib.
+import re
+import time
+
+# Local.
+import sockets
+
+
+def urlfetch(host, port=None, path='/', method='GET',
+ body=None, hdrs=None, encoding='utf-8', ssl=None, af=0):
+ """COROUTINE: Make an HTTP 1.0 request."""
+ t0 = time.time()
+ if port is None:
+ if ssl:
+ port = 443
+ else:
+ port = 80
+ trans = yield from sockets.create_transport(host, port, ssl=ssl, af=af)
+ yield from trans.send(method.encode(encoding) + b' ' +
+ path.encode(encoding) + b' HTTP/1.0\r\n')
+ if hdrs:
+ kwds = dict(hdrs)
+ else:
+ kwds = {}
+ if 'host' not in kwds:
+ kwds['host'] = host
+ if body is not None:
+ kwds['content_length'] = len(body)
+ for header, value in kwds.items():
+ yield from trans.send(header.replace('_', '-').encode(encoding) +
+ b': ' + value.encode(encoding) + b'\r\n')
+
+ yield from trans.send(b'\r\n')
+ if body is not None:
+ yield from trans.send(body)
+
+ # Read HTTP response line.
+ rdr = sockets.BufferedReader(trans)
+ resp = yield from rdr.readline()
+ m = re.match(br'(?ix) http/(\d\.\d) \s+ (\d\d\d) \s+ ([^\r]*)\r?\n\Z',
+ resp)
+ if not m:
+ trans.close()
+ raise IOError('No valid HTTP response: %r' % resp)
+ http_version, status, message = m.groups()
+
+ # Read HTTP headers.
+ headers = []
+ hdict = {}
+ while True:
+ line = yield from rdr.readline()
+ if not line.strip():
+ break
+ m = re.match(br'([^\s:]+):\s*([^\r]*)\r?\n\Z', line)
+ if not m:
+ raise IOError('Invalid header: %r' % line)
+ header, value = m.groups()
+ headers.append((header, value))
+ hdict[header.decode(encoding).lower()] = value.decode(encoding)
+
+ # Read response body.
+ content_length = hdict.get('content-length')
+ if content_length is not None:
+ size = int(content_length) # TODO: Catch errors.
+ assert size >= 0, size
+ else:
+ size = 2**20 # Protective limit (1 MB).
+ data = yield from rdr.readexactly(size)
+ trans.close() # Can this block?
+ t1 = time.time()
+ result = (host, port, path, int(status), len(data), round(t1-t0, 3))
+## print(result)
+ return result
diff --git a/old/http_server.py b/old/http_server.py
new file mode 100644
index 0000000..2b1e3dd
--- /dev/null
+++ b/old/http_server.py
@@ -0,0 +1,68 @@
+#!/usr/bin/env python3.3
+"""Simple HTTP server.
+
+This currenty exists just so we can benchmark this thing!
+"""
+
+# Stdlib imports.
+import logging
+import re
+import socket
+import sys
+
+# Local imports.
+import scheduling
+import sockets
+
+
+def handler(conn, addr):
+ """COROUTINE: Handle one connection."""
+ ##logging.info('Accepting connection from %r', addr)
+ trans = sockets.SocketTransport(conn)
+ rdr = sockets.BufferedReader(trans)
+
+ # Read but ignore request line.
+ request_line = yield from rdr.readline()
+
+ # Consume headers but don't interpret them.
+ while True:
+ header_line = yield from rdr.readline()
+ if not header_line.strip():
+ break
+
+ # Always send an empty 200 response and close.
+ yield from trans.send(b'HTTP/1.0 200 Ok\r\n\r\n')
+ trans.close()
+
+
+def doit():
+ """COROUTINE: Set the wheels in motion."""
+ # Set up listener.
+ listener = yield from sockets.create_listener('localhost', 8080,
+ af=socket.AF_INET)
+ logging.info('Listening on %r', listener.sock.getsockname())
+
+ # Loop accepting connections.
+ while True:
+ conn, addr = yield from listener.accept()
+ t = scheduling.Task(handler(conn, addr))
+
+
+def main():
+ # Initialize logging.
+ if '-d' in sys.argv:
+ level = logging.DEBUG
+ elif '-v' in sys.argv:
+ level = logging.INFO
+ elif '-q' in sys.argv:
+ level = logging.ERROR
+ else:
+ level = logging.WARN
+ logging.basicConfig(level=level)
+
+ # Run scheduler, starting it off with doit().
+ scheduling.run(doit())
+
+
+if __name__ == '__main__':
+ main()
diff --git a/old/main.py b/old/main.py
new file mode 100644
index 0000000..c1f9d0a
--- /dev/null
+++ b/old/main.py
@@ -0,0 +1,134 @@
+#!/usr/bin/env python3.3
+"""Example HTTP client using yield-from coroutines (PEP 380).
+
+Requires Python 3.3.
+
+There are many micro-optimizations possible here, but that's not the point.
+
+Some incomplete laundry lists:
+
+TODO:
+- Take test urls from command line.
+- Move urlfetch to a separate module.
+- Profiling.
+- Docstrings.
+- Unittests.
+
+FUNCTIONALITY:
+- Connection pool (keep connection open).
+- Chunked encoding (request and response).
+- Pipelining, e.g. zlib (request and response).
+- Automatic encoding/decoding.
+"""
+
+__author__ = 'Guido van Rossum <guido@python.org>'
+
+# Standard library imports (keep in alphabetic order).
+import logging
+import os
+import time
+import socket
+import sys
+
+# Local imports (keep in alphabetic order).
+import scheduling
+import http_client
+
+
+
+def doit2():
+ argses = [
+ ('localhost', 8080, '/'),
+ ('127.0.0.1', 8080, '/home'),
+ ('python.org', 80, '/'),
+ ('xkcd.com', 443, '/'),
+ ]
+ results = yield from scheduling.map_over(
+ lambda args: http_client.urlfetch(*args), argses, timeout=2)
+ for res in results:
+ print('-->', res)
+ return []
+
+
+def doit():
+ TIMEOUT = 2
+ tasks = set()
+
+ # This references NDB's default test service.
+ # (Sadly the service is single-threaded.)
+ task1 = scheduling.Task(http_client.urlfetch('localhost', 8080, path='/'),
+ 'root', timeout=TIMEOUT)
+ tasks.add(task1)
+ task2 = scheduling.Task(http_client.urlfetch('127.0.0.1', 8080,
+ path='/home'),
+ 'home', timeout=TIMEOUT)
+ tasks.add(task2)
+
+ # Fetch python.org home page.
+ task3 = scheduling.Task(http_client.urlfetch('python.org', 80, path='/'),
+ 'python', timeout=TIMEOUT)
+ tasks.add(task3)
+
+ # Fetch XKCD home page using SSL. (Doesn't like IPv6.)
+ task4 = scheduling.Task(http_client.urlfetch('xkcd.com', ssl=True, path='/',
+ af=socket.AF_INET),
+ 'xkcd', timeout=TIMEOUT)
+ tasks.add(task4)
+
+## # Fetch many links from python.org (/x.y.z).
+## for x in '123':
+## for y in '0123456789':
+## path = '/{}.{}'.format(x, y)
+## g = http_client.urlfetch('82.94.164.162', 80,
+## path=path, hdrs={'host': 'python.org'})
+## t = scheduling.Task(g, path, timeout=2)
+## tasks.add(t)
+
+## print(tasks)
+ yield from scheduling.Task(scheduling.sleep(1), timeout=0.2).wait()
+ winners = yield from scheduling.wait_any(tasks)
+ print('And the winners are:', [w.name for w in winners])
+ tasks = yield from scheduling.wait_all(tasks)
+ print('And the players were:', [t.name for t in tasks])
+ return tasks
+
+
+def logtimes(real):
+ utime, stime, cutime, cstime, unused = os.times()
+ logging.info('real %10.3f', real)
+ logging.info('user %10.3f', utime + cutime)
+ logging.info('sys %10.3f', stime + cstime)
+
+
+def main():
+ t0 = time.time()
+
+ # Initialize logging.
+ if '-d' in sys.argv:
+ level = logging.DEBUG
+ elif '-v' in sys.argv:
+ level = logging.INFO
+ elif '-q' in sys.argv:
+ level = logging.ERROR
+ else:
+ level = logging.WARN
+ logging.basicConfig(level=level)
+
+ # Run scheduler, starting it off with doit().
+ task = scheduling.run(doit())
+ if task.exception:
+ print('Exception:', repr(task.exception))
+ if isinstance(task.exception, AssertionError):
+ raise task.exception
+ else:
+ for t in task.result:
+ print(t.name + ':',
+ repr(t.exception) if t.exception else t.result)
+
+ # Report real, user, sys times.
+ t1 = time.time()
+ logtimes(t1-t0)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/old/p3time.py b/old/p3time.py
new file mode 100644
index 0000000..35e14c9
--- /dev/null
+++ b/old/p3time.py
@@ -0,0 +1,47 @@
+"""Compare timing of plain vs. yield-from calls."""
+
+import gc
+import time
+
+def plain(n):
+ if n <= 0:
+ return 1
+ l = plain(n-1)
+ r = plain(n-1)
+ return l + 1 + r
+
+def coroutine(n):
+ if n <= 0:
+ return 1
+ l = yield from coroutine(n-1)
+ r = yield from coroutine(n-1)
+ return l + 1 + r
+
+def submain(depth):
+ t0 = time.time()
+ k = plain(depth)
+ t1 = time.time()
+ fmt = ' {} {} {:-9,.5f}'
+ delta0 = t1-t0
+ print(('plain' + fmt).format(depth, k, delta0))
+
+ t0 = time.time()
+ try:
+ g = coroutine(depth)
+ while True:
+ next(g)
+ except StopIteration as err:
+ k = err.value
+ t1 = time.time()
+ delta1 = t1-t0
+ print(('coro.' + fmt).format(depth, k, delta1))
+ if delta0:
+ print(('relat' + fmt).format(depth, k, delta1/delta0))
+
+def main(reasonable=16):
+ gc.disable()
+ for depth in range(reasonable):
+ submain(depth)
+
+if __name__ == '__main__':
+ main()
diff --git a/old/polling.py b/old/polling.py
new file mode 100644
index 0000000..cffed2a
--- /dev/null
+++ b/old/polling.py
@@ -0,0 +1,535 @@
+"""Event loop and related classes.
+
+The event loop can be broken up into a pollster (the part responsible
+for telling us when file descriptors are ready) and the event loop
+proper, which wraps a pollster with functionality for scheduling
+callbacks, immediately or at a given time in the future.
+
+Whenever a public API takes a callback, subsequent positional
+arguments will be passed to the callback if/when it is called. This
+avoids the proliferation of trivial lambdas implementing closures.
+Keyword arguments for the callback are not supported; this is a
+conscious design decision, leaving the door open for keyword arguments
+to modify the meaning of the API call itself.
+
+There are several implementations of the pollster part, several using
+esoteric system calls that exist only on some platforms. These are:
+
+- kqueue (most BSD systems)
+- epoll (newer Linux systems)
+- poll (most UNIX systems)
+- select (all UNIX systems, and Windows)
+- TODO: Support IOCP on Windows and some UNIX platforms.
+
+NOTE: We don't use select on systems where any of the others is
+available, because select performs poorly as the number of file
+descriptors goes up. The ranking is roughly:
+
+ 1. kqueue, epoll, IOCP
+ 2. poll
+ 3. select
+
+TODO:
+- Optimize the various pollsters.
+- Unittests.
+"""
+
+import collections
+import concurrent.futures
+import heapq
+import logging
+import os
+import select
+import threading
+import time
+
+
+class PollsterBase:
+ """Base class for all polling implementations.
+
+ This defines an interface to register and unregister readers and
+ writers for specific file descriptors, and an interface to get a
+ list of events. There's also an interface to check whether any
+ readers or writers are currently registered.
+ """
+
+ def __init__(self):
+ super().__init__()
+ self.readers = {} # {fd: token, ...}.
+ self.writers = {} # {fd: token, ...}.
+
+ def pollable(self):
+ """Return True if any readers or writers are currently registered."""
+ return bool(self.readers or self.writers)
+
+ # Subclasses are expected to extend the add/remove methods.
+
+ def register_reader(self, fd, token):
+ """Add or update a reader for a file descriptor."""
+ self.readers[fd] = token
+
+ def register_writer(self, fd, token):
+ """Add or update a writer for a file descriptor."""
+ self.writers[fd] = token
+
+ def unregister_reader(self, fd):
+ """Remove the reader for a file descriptor."""
+ del self.readers[fd]
+
+ def unregister_writer(self, fd):
+ """Remove the writer for a file descriptor."""
+ del self.writers[fd]
+
+ def poll(self, timeout=None):
+ """Poll for events. A subclass must implement this.
+
+ If timeout is omitted or None, this blocks until at least one
+ event is ready. Otherwise, timeout gives a maximum time to
+ wait (an int of float in seconds) -- the method returns as
+ soon as at least one event is ready or when the timeout is
+ expired. For a non-blocking poll, pass 0.
+
+ The return value is a list of events; it is empty when the
+ timeout expired before any events were ready. Each event
+ is a token previously passed to register_reader/writer().
+ """
+ raise NotImplementedError
+
+
+class SelectPollster(PollsterBase):
+ """Pollster implementation using select."""
+
+ def poll(self, timeout=None):
+ readable, writable, _ = select.select(self.readers, self.writers,
+ [], timeout)
+ events = []
+ events += (self.readers[fd] for fd in readable)
+ events += (self.writers[fd] for fd in writable)
+ return events
+
+
+class PollPollster(PollsterBase):
+ """Pollster implementation using poll."""
+
+ def __init__(self):
+ super().__init__()
+ self._poll = select.poll()
+
+ def _update(self, fd):
+ assert isinstance(fd, int), fd
+ flags = 0
+ if fd in self.readers:
+ flags |= select.POLLIN
+ if fd in self.writers:
+ flags |= select.POLLOUT
+ if flags:
+ self._poll.register(fd, flags)
+ else:
+ self._poll.unregister(fd)
+
+ def register_reader(self, fd, callback, *args):
+ super().register_reader(fd, callback, *args)
+ self._update(fd)
+
+ def register_writer(self, fd, callback, *args):
+ super().register_writer(fd, callback, *args)
+ self._update(fd)
+
+ def unregister_reader(self, fd):
+ super().unregister_reader(fd)
+ self._update(fd)
+
+ def unregister_writer(self, fd):
+ super().unregister_writer(fd)
+ self._update(fd)
+
+ def poll(self, timeout=None):
+ # Timeout is in seconds, but poll() takes milliseconds.
+ msecs = None if timeout is None else int(round(1000 * timeout))
+ events = []
+ for fd, flags in self._poll.poll(msecs):
+ if flags & (select.POLLIN | select.POLLHUP):
+ if fd in self.readers:
+ events.append(self.readers[fd])
+ if flags & (select.POLLOUT | select.POLLHUP):
+ if fd in self.writers:
+ events.append(self.writers[fd])
+ return events
+
+
+class EPollPollster(PollsterBase):
+ """Pollster implementation using epoll."""
+
+ def __init__(self):
+ super().__init__()
+ self._epoll = select.epoll()
+
+ def _update(self, fd):
+ assert isinstance(fd, int), fd
+ eventmask = 0
+ if fd in self.readers:
+ eventmask |= select.EPOLLIN
+ if fd in self.writers:
+ eventmask |= select.EPOLLOUT
+ if eventmask:
+ try:
+ self._epoll.register(fd, eventmask)
+ except IOError:
+ self._epoll.modify(fd, eventmask)
+ else:
+ self._epoll.unregister(fd)
+
+ def register_reader(self, fd, callback, *args):
+ super().register_reader(fd, callback, *args)
+ self._update(fd)
+
+ def register_writer(self, fd, callback, *args):
+ super().register_writer(fd, callback, *args)
+ self._update(fd)
+
+ def unregister_reader(self, fd):
+ super().unregister_reader(fd)
+ self._update(fd)
+
+ def unregister_writer(self, fd):
+ super().unregister_writer(fd)
+ self._update(fd)
+
+ def poll(self, timeout=None):
+ if timeout is None:
+ timeout = -1 # epoll.poll() uses -1 to mean "wait forever".
+ events = []
+ for fd, eventmask in self._epoll.poll(timeout):
+ if eventmask & select.EPOLLIN:
+ if fd in self.readers:
+ events.append(self.readers[fd])
+ if eventmask & select.EPOLLOUT:
+ if fd in self.writers:
+ events.append(self.writers[fd])
+ return events
+
+
+class KqueuePollster(PollsterBase):
+ """Pollster implementation using kqueue."""
+
+ def __init__(self):
+ super().__init__()
+ self._kqueue = select.kqueue()
+
+ def register_reader(self, fd, callback, *args):
+ if fd not in self.readers:
+ kev = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD)
+ self._kqueue.control([kev], 0, 0)
+ return super().register_reader(fd, callback, *args)
+
+ def register_writer(self, fd, callback, *args):
+ if fd not in self.writers:
+ kev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD)
+ self._kqueue.control([kev], 0, 0)
+ return super().register_writer(fd, callback, *args)
+
+ def unregister_reader(self, fd):
+ super().unregister_reader(fd)
+ kev = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)
+ self._kqueue.control([kev], 0, 0)
+
+ def unregister_writer(self, fd):
+ super().unregister_writer(fd)
+ kev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)
+ self._kqueue.control([kev], 0, 0)
+
+ def poll(self, timeout=None):
+ events = []
+ max_ev = len(self.readers) + len(self.writers)
+ for kev in self._kqueue.control(None, max_ev, timeout):
+ fd = kev.ident
+ flag = kev.filter
+ if flag == select.KQ_FILTER_READ and fd in self.readers:
+ events.append(self.readers[fd])
+ elif flag == select.KQ_FILTER_WRITE and fd in self.writers:
+ events.append(self.writers[fd])
+ return events
+
+
+# Pick the best pollster class for the platform.
+if hasattr(select, 'kqueue'):
+ best_pollster = KqueuePollster
+elif hasattr(select, 'epoll'):
+ best_pollster = EPollPollster
+elif hasattr(select, 'poll'):
+ best_pollster = PollPollster
+else:
+ best_pollster = SelectPollster
+
+
+class DelayedCall:
+ """Object returned by callback registration methods."""
+
+ def __init__(self, when, callback, args, kwds=None):
+ self.when = when
+ self.callback = callback
+ self.args = args
+ self.kwds = kwds
+ self.cancelled = False
+
+ def cancel(self):
+ self.cancelled = True
+
+ def __lt__(self, other):
+ return self.when < other.when
+
+ def __le__(self, other):
+ return self.when <= other.when
+
+ def __eq__(self, other):
+ return self.when == other.when
+
+
+class EventLoop:
+ """Event loop functionality.
+
+ This defines public APIs call_soon(), call_later(), run_once() and
+ run(). It also wraps Pollster APIs register_reader(),
+ register_writer(), remove_reader(), remove_writer() with
+ add_reader() etc.
+
+ This class's instance variables are not part of its API.
+ """
+
+ def __init__(self, pollster=None):
+ super().__init__()
+ if pollster is None:
+ logging.info('Using pollster: %s', best_pollster.__name__)
+ pollster = best_pollster()
+ self.pollster = pollster
+ self.ready = collections.deque() # [(callback, args), ...]
+ self.scheduled = [] # [(when, callback, args), ...]
+
+ def add_reader(self, fd, callback, *args):
+ """Add a reader callback. Return a DelayedCall instance."""
+ dcall = DelayedCall(None, callback, args)
+ self.pollster.register_reader(fd, dcall)
+ return dcall
+
+ def remove_reader(self, fd):
+ """Remove a reader callback."""
+ self.pollster.unregister_reader(fd)
+
+ def add_writer(self, fd, callback, *args):
+ """Add a writer callback. Return a DelayedCall instance."""
+ dcall = DelayedCall(None, callback, args)
+ self.pollster.register_writer(fd, dcall)
+ return dcall
+
+ def remove_writer(self, fd):
+ """Remove a writer callback."""
+ self.pollster.unregister_writer(fd)
+
+ def add_callback(self, dcall):
+ """Add a DelayedCall to ready or scheduled."""
+ if dcall.cancelled:
+ return
+ if dcall.when is None:
+ self.ready.append(dcall)
+ else:
+ heapq.heappush(self.scheduled, dcall)
+
+ def call_soon(self, callback, *args):
+ """Arrange for a callback to be called as soon as possible.
+
+ This operates as a FIFO queue, callbacks are called in the
+ order in which they are registered. Each callback will be
+ called exactly once.
+
+ Any positional arguments after the callback will be passed to
+ the callback when it is called.
+ """
+ dcall = DelayedCall(None, callback, args)
+ self.ready.append(dcall)
+ return dcall
+
+ def call_later(self, when, callback, *args):
+ """Arrange for a callback to be called at a given time.
+
+ Return an object with a cancel() method that can be used to
+ cancel the call.
+
+ The time can be an int or float, expressed in seconds.
+
+ If when is small enough (~11 days), it's assumed to be a
+ relative time, meaning the call will be scheduled that many
+ seconds in the future; otherwise it's assumed to be a posix
+ timestamp as returned by time.time().
+
+ Each callback will be called exactly once. If two callbacks
+ are scheduled for exactly the same time, it undefined which
+ will be called first.
+
+ Any positional arguments after the callback will be passed to
+ the callback when it is called.
+ """
+ if when < 10000000:
+ when += time.time()
+ dcall = DelayedCall(when, callback, args)
+ heapq.heappush(self.scheduled, dcall)
+ return dcall
+
+ def run_once(self):
+ """Run one full iteration of the event loop.
+
+ This calls all currently ready callbacks, polls for I/O,
+ schedules the resulting callbacks, and finally schedules
+ 'call_later' callbacks.
+ """
+ # TODO: Break each of these into smaller pieces.
+ # TODO: Pass in a timeout or deadline or something.
+ # TODO: Refactor to separate the callbacks from the readers/writers.
+ # TODO: As step 4, run everything scheduled by steps 1-3.
+ # TODO: An alternative API would be to do the *minimal* amount
+ # of work, e.g. one callback or one I/O poll.
+
+ # This is the only place where callbacks are actually *called*.
+ # All other places just add them to ready.
+ # TODO: Ensure this loop always finishes, even if some
+ # callbacks keeps registering more callbacks.
+ while self.ready:
+ dcall = self.ready.popleft()
+ if not dcall.cancelled:
+ try:
+ if dcall.kwds:
+ dcall.callback(*dcall.args, **dcall.kwds)
+ else:
+ dcall.callback(*dcall.args)
+ except Exception:
+ logging.exception('Exception in callback %s %r',
+ dcall.callback, dcall.args)
+
+ # Remove delayed calls that were cancelled from head of queue.
+ while self.scheduled and self.scheduled[0].cancelled:
+ heapq.heappop(self.scheduled)
+
+ # Inspect the poll queue.
+ if self.pollster.pollable():
+ if self.scheduled:
+ when = self.scheduled[0].when
+ timeout = max(0, when - time.time())
+ else:
+ timeout = None
+ t0 = time.time()
+ events = self.pollster.poll(timeout)
+ t1 = time.time()
+ argstr = '' if timeout is None else ' %.3f' % timeout
+ if t1-t0 >= 1:
+ level = logging.INFO
+ else:
+ level = logging.DEBUG
+ logging.log(level, 'poll%s took %.3f seconds', argstr, t1-t0)
+ for dcall in events:
+ self.add_callback(dcall)
+
+ # Handle 'later' callbacks that are ready.
+ now = time.time()
+ while self.scheduled:
+ dcall = self.scheduled[0]
+ if dcall.when > now:
+ break
+ dcall = heapq.heappop(self.scheduled)
+ self.call_soon(dcall.callback, *dcall.args)
+
+ def run(self):
+ """Run the event loop until there is no work left to do.
+
+ This keeps going as long as there are either readable and
+ writable file descriptors, or scheduled callbacks (of either
+ variety).
+ """
+ while self.ready or self.scheduled or self.pollster.pollable():
+ self.run_once()
+
+
+MAX_WORKERS = 5 # Default max workers when creating an executor.
+
+
+class ThreadRunner:
+ """Helper to submit work to a thread pool and wait for it.
+
+ This is the glue between the single-threaded callback-based async
+ world and the threaded world. Use it to call functions that must
+ block and don't have an async alternative (e.g. getaddrinfo()).
+
+ The only public API is submit().
+ """
+
+ def __init__(self, eventloop, executor=None):
+ self.eventloop = eventloop
+ self.executor = executor # Will be constructed lazily.
+ self.pipe_read_fd, self.pipe_write_fd = os.pipe()
+ self.active_count = 0
+
+ def read_callback(self):
+ """Semi-permanent callback while at least one future is active."""
+ assert self.active_count > 0, self.active_count
+ data = os.read(self.pipe_read_fd, 8192) # Traditional buffer size.
+ self.active_count -= len(data)
+ if self.active_count == 0:
+ self.eventloop.remove_reader(self.pipe_read_fd)
+ assert self.active_count >= 0, self.active_count
+
+ def submit(self, func, *args, executor=None, callback=None):
+ """Submit a function to the thread pool.
+
+ This returns a concurrent.futures.Future instance. The caller
+ should not wait for that, but rather use the callback argument..
+ """
+ if executor is None:
+ executor = self.executor
+ if executor is None:
+ # Lazily construct a default executor.
+ # TODO: Should this be shared between threads?
+ executor = concurrent.futures.ThreadPoolExecutor(MAX_WORKERS)
+ self.executor = executor
+ assert self.active_count >= 0, self.active_count
+ future = executor.submit(func, *args)
+ if self.active_count == 0:
+ self.eventloop.add_reader(self.pipe_read_fd, self.read_callback)
+ self.active_count += 1
+ def done_callback(fut):
+ if callback is not None:
+ self.eventloop.call_soon(callback, fut)
+ # TODO: Wake up the pipe in call_soon()?
+ os.write(self.pipe_write_fd, b'x')
+ future.add_done_callback(done_callback)
+ return future
+
+
+class Context(threading.local):
+ """Thread-local context.
+
+ We use this to avoid having to explicitly pass around an event loop
+ or something to hold the current task.
+
+ TODO: Add an API so frameworks can substitute a different notion
+ of context more easily.
+ """
+
+ def __init__(self, eventloop=None, threadrunner=None):
+ # Default event loop and thread runner are lazily constructed
+ # when first accessed.
+ self._eventloop = eventloop
+ self._threadrunner = threadrunner
+ self.current_task = None # For the benefit of scheduling.py.
+
+ @property
+ def eventloop(self):
+ if self._eventloop is None:
+ self._eventloop = EventLoop()
+ return self._eventloop
+
+ @property
+ def threadrunner(self):
+ if self._threadrunner is None:
+ self._threadrunner = ThreadRunner(self.eventloop)
+ return self._threadrunner
+
+
+context = Context() # Thread-local!
diff --git a/old/scheduling.py b/old/scheduling.py
new file mode 100644
index 0000000..3864571
--- /dev/null
+++ b/old/scheduling.py
@@ -0,0 +1,354 @@
+#!/usr/bin/env python3.3
+"""Example coroutine scheduler, PEP-380-style ('yield from <generator>').
+
+Requires Python 3.3.
+
+There are likely micro-optimizations possible here, but that's not the point.
+
+TODO:
+- Docstrings.
+- Unittests.
+
+PATTERNS TO TRY:
+- Various synchronization primitives (Lock, RLock, Event, Condition,
+ Semaphore, BoundedSemaphore, Barrier).
+"""
+
+__author__ = 'Guido van Rossum <guido@python.org>'
+
+# Standard library imports (keep in alphabetic order).
+from concurrent.futures import CancelledError, TimeoutError
+import logging
+import time
+import types
+
+# Local imports (keep in alphabetic order).
+import polling
+
+
+context = polling.context
+
+
class Task:
    """Wrapper around a stack of generators.

    This is a bit like a Future, but with a different interface.

    TODO:
    - wait for result.
    """

    def __init__(self, gen, name=None, *, timeout=None):
        # gen: the coroutine (generator object) this task drives.
        # name: display name; defaults to the generator's function name.
        # timeout: seconds after which the task is cancelled, or None.
        assert isinstance(gen, types.GeneratorType), repr(gen)
        self.gen = gen
        self.name = name or gen.__name__
        self.timeout = timeout
        self.eventloop = context.eventloop
        self.canceleer = None
        if timeout is not None:
            # Schedule automatic cancellation; step() cancels this
            # DelayedCall if the task finishes first.
            self.canceleer = self.eventloop.call_later(timeout, self.cancel)
        self.blocked = False
        self.unblocker = None
        self.cancelled = False
        self.must_cancel = False
        self.alive = True
        self.result = None
        self.exception = None
        self.done_callbacks = []
        # Start the task immediately.
        self.eventloop.call_soon(self.step)

    def add_done_callback(self, done_callback):
        """Register a callback to run when the task finishes.

        Returns a polling.DelayedCall which can be cancelled to
        unregister the callback.
        """
        # For better or for worse, the callback will always be called
        # with the task as an argument, like concurrent.futures.Future.
        # TODO: Call it right away if task is no longer alive.
        dcall = polling.DelayedCall(None, done_callback, (self,))
        self.done_callbacks.append(dcall)
        # Drop callbacks that were cancelled in the meantime.
        self.done_callbacks = [dc for dc in self.done_callbacks
                               if not dc.cancelled]
        return dcall

    def __repr__(self):
        # Summarize name, run state, cancellation state, outcome and
        # timeout for debugging.
        parts = [self.name]
        is_current = (self is context.current_task)
        if self.blocked:
            parts.append('blocking' if is_current else 'blocked')
        elif self.alive:
            parts.append('running' if is_current else 'runnable')
        if self.must_cancel:
            parts.append('must_cancel')
        if self.cancelled:
            parts.append('cancelled')
        if self.exception is not None:
            parts.append('exception=%r' % self.exception)
        elif not self.alive:
            parts.append('result=%r' % (self.result,))
        if self.timeout is not None:
            parts.append('timeout=%.3f' % self.timeout)
        return 'Task<' + ', '.join(parts) + '>'

    def cancel(self):
        """Request cancellation; takes effect on the next step."""
        if self.alive:
            if not self.must_cancel and not self.cancelled:
                self.must_cancel = True
                if self.blocked:
                    self.unblock()

    def step(self):
        """Resume the coroutine for one step (called by the event loop)."""
        assert self.alive, self
        try:
            context.current_task = self
            if self.must_cancel:
                self.must_cancel = False
                self.cancelled = True
                self.gen.throw(CancelledError())
            else:
                next(self.gen)
        except StopIteration as exc:
            # Normal exit; the return value rides on StopIteration.
            self.alive = False
            self.result = exc.value
        except Exception as exc:
            self.alive = False
            self.exception = exc
            logging.debug('Uncaught exception in %s', self,
                          exc_info=True, stack_info=True)
        except BaseException as exc:
            # E.g. KeyboardInterrupt: record it but let it propagate.
            self.alive = False
            self.exception = exc
            raise
        else:
            # The coroutine yielded; reschedule unless it blocked itself.
            if not self.blocked:
                self.eventloop.call_soon(self.step)
        finally:
            context.current_task = None
            if not self.alive:
                # Cancel timeout callback if set.
                if self.canceleer is not None:
                    self.canceleer.cancel()
                # Schedule done_callbacks.
                for dcall in self.done_callbacks:
                    self.eventloop.add_callback(dcall)

    def block(self, unblock_callback=None, *unblock_args):
        """Mark this (current) task blocked.

        unblock_callback(*unblock_args), if given, runs when the task
        is unblocked, e.g. to undo an I/O registration.
        """
        assert self is context.current_task, self
        assert self.alive, self
        assert not self.blocked, self
        self.blocked = True
        self.unblocker = (unblock_callback, unblock_args)

    def unblock_if_alive(self, unused=None):
        # Ignore optional argument so we can be a Future's done_callback.
        if self.alive:
            self.unblock()

    def unblock(self, unused=None):
        # Ignore optional argument so we can be a Future's done_callback.
        assert self.alive, self
        assert self.blocked, self
        self.blocked = False
        unblock_callback, unblock_args = self.unblocker
        if unblock_callback is not None:
            try:
                unblock_callback(*unblock_args)
            except Exception:
                logging.error('Exception in unblocker in task %r', self.name)
                raise
            finally:
                self.unblocker = None
        self.eventloop.call_soon(self.step)

    def block_io(self, fd, flag):
        """Block this task until fd is readable ('r') or writable ('w')."""
        assert isinstance(fd, int), repr(fd)
        assert flag in ('r', 'w'), repr(flag)
        if flag == 'r':
            self.block(self.eventloop.remove_reader, fd)
            self.eventloop.add_reader(fd, self.unblock)
        else:
            self.block(self.eventloop.remove_writer, fd)
            self.eventloop.add_writer(fd, self.unblock)

    def wait(self):
        """COROUTINE: Wait until this task is finished."""
        current_task = context.current_task
        assert self is not current_task, (self, current_task)  # How confusing!
        if not self.alive:
            return
        current_task.block()
        self.add_done_callback(current_task.unblock)
        yield

    def __iter__(self):
        """COROUTINE: Wait, then return result or raise exception.

        This adds a little magic so you can say

          x = yield from Task(gen())

        and it is equivalent to

          x = yield from gen()

        but with the option to add a timeout (and only a tad slower).
        """
        if self.alive:
            yield from self.wait()
            assert not self.alive
        if self.exception is not None:
            raise self.exception
        return self.result
+
+
def run(arg=None):
    """Run the event loop until it's out of work.

    If you pass a generator, it will be spawned for you.
    You can also pass a task (already started).
    Returns the task.
    """
    task = None
    if arg is not None:
        # Accept either an already-started Task or a bare generator.
        task = arg if isinstance(arg, Task) else Task(arg)
    context.eventloop.run()
    if task is not None and task.exception is not None:
        logging.error('Uncaught exception in startup task: %r',
                      task.exception)
    return task
+
+
def sleep(secs):
    """COROUTINE: Sleep for some time (a float in seconds)."""
    task = context.current_task
    # Schedule the wake-up, and let block() cancel it if the task is
    # cancelled before the timer fires.
    wakeup = context.eventloop.call_later(secs, task.unblock)
    task.block(wakeup.cancel)
    yield
+
+
def block_r(fd):
    """COROUTINE: Block until a file descriptor is ready for reading."""
    task = context.current_task
    task.block_io(fd, 'r')
    yield
+
+
def block_w(fd):
    """COROUTINE: Block until a file descriptor is ready for writing."""
    task = context.current_task
    task.block_io(fd, 'w')
    yield
+
+
def call_in_thread(func, *args, executor=None):
    """COROUTINE: Run a function in a thread."""
    current = context.current_task
    # Submit the call; the callback resumes us when the future is done.
    future = context.threadrunner.submit(func, *args,
                                         executor=executor,
                                         callback=current.unblock_if_alive)
    current.block(future.cancel)
    yield
    assert future.done()
    return future.result()
+
+
def wait_for(count, tasks):
    """COROUTINE: Wait for the first N of a set of tasks to complete.

    May return more than N if more than N are immediately ready.

    NOTE: Tasks that were cancelled or raised are also considered ready.
    """
    assert tasks
    assert all(isinstance(task, Task) for task in tasks)
    tasks = set(tasks)
    assert 1 <= count <= len(tasks)
    current_task = context.current_task
    assert all(task is not current_task for task in tasks)
    todo = set()
    done = set()
    dcalls = []
    def wait_for_callback(task):
        # Runs when one of the waited-for tasks finishes: move it to
        # done and, once count tasks have finished, cancel the
        # remaining registrations and resume the waiting task.
        nonlocal todo, done, current_task, count, dcalls
        todo.remove(task)
        if len(done) < count:
            done.add(task)
            if len(done) == count:
                for dcall in dcalls:
                    dcall.cancel()
                current_task.unblock()
    for task in tasks:
        if task.alive:
            todo.add(task)
        else:
            done.add(task)  # Already finished: counts immediately.
    if len(done) < count:
        # Not enough ready yet: register callbacks and block until the
        # callback above unblocks us.
        for task in todo:
            dcall = task.add_done_callback(wait_for_callback)
            dcalls.append(dcall)
        current_task.block()
        yield
    return done
+
+
def wait_any(tasks):
    """COROUTINE: Wait until at least one of *tasks* has completed.

    Thin wrapper around wait_for() with count=1.
    """
    return wait_for(1, tasks)
+
+
def wait_all(tasks):
    """COROUTINE: Wait until every one of *tasks* has completed.

    Thin wrapper around wait_for() with count=len(tasks).
    """
    return wait_for(len(tasks), tasks)
+
+
def map_over(gen, *args, timeout=None):
    """COROUTINE: map a generator over one or more iterables.

    E.g. map_over(foo, xs, ys) runs

      Task(foo(x, y)) for x, y in zip(xs, ys)

    and returns a list of all results (in that order). However if any
    task raises an exception, the remaining tasks are cancelled and
    the exception is propagated.
    """
    # gen is a generator function; spawn one Task per zipped argument
    # tuple and gather the results.
    spawned = [Task(gobj, timeout=timeout) for gobj in map(gen, *args)]
    return (yield from par_tasks(spawned))
+
+
def par(*args):
    """COROUTINE: Wait for generators, return a list of results.

    Raises as soon as one of the tasks raises an exception (and then
    remaining tasks are cancelled).

    This differs from par_tasks() in two ways:
    - takes *args instead of list of args
    - each arg may be a generator or a task
    """
    # TODO: assert non-Task args are generators or iterators?
    tasks = [arg if isinstance(arg, Task) else Task(arg) for arg in args]
    return (yield from par_tasks(tasks))
+
+
def par_tasks(tasks):
    """COROUTINE: Wait for a list of tasks, return a list of results.

    Raises as soon as one of the tasks raises an exception (and then
    remaining tasks are cancelled).
    """
    todo = set(tasks)
    while todo:
        # Wait until at least one task finishes; collect all that did.
        ts = yield from wait_any(todo)
        for t in ts:
            assert not t.alive, t
            todo.remove(t)
            if t.exception is not None:
                # First failure wins: cancel the rest and propagate.
                for other in todo:
                    other.cancel()
                raise t.exception
    # Results in the order of the original task list.
    return [t.result for t in tasks]
diff --git a/old/sockets.py b/old/sockets.py
new file mode 100644
index 0000000..a5005dc
--- /dev/null
+++ b/old/sockets.py
@@ -0,0 +1,348 @@
+"""Socket wrappers to go with scheduling.py.
+
+Classes:
+
+- SocketTransport: a transport implementation wrapping a socket.
+- SslTransport: a transport implementation wrapping SSL around a socket.
+- BufferedReader: a buffer wrapping the read end of a transport.
+
+Functions (all coroutines):
+
+- connect(): connect a socket.
+- getaddrinfo(): look up an address.
+- create_connection(): look up address and return a connected socket for it.
+- create_transport(): look up address and return a connected transport.
+
+TODO:
+- Improve transport abstraction.
+- Make a nice protocol abstraction.
+- Unittests.
+- A write() call that isn't a generator (needed so you can substitute it
+ for sys.stderr, pass it to logging.StreamHandler, etc.).
+"""
+
+__author__ = 'Guido van Rossum <guido@python.org>'
+
+# Stdlib imports.
+import errno
+import socket
+import ssl
+
+# Local imports.
+import scheduling
+
+# Errno values indicating the connection was disconnected.
+_DISCONNECTED = frozenset((errno.ECONNRESET,
+ errno.ENOTCONN,
+ errno.ESHUTDOWN,
+ errno.ECONNABORTED,
+ errno.EPIPE,
+ errno.EBADF,
+ ))
+
+# Errno values indicating the socket isn't ready for I/O just yet.
+_TRYAGAIN = frozenset((errno.EAGAIN, errno.EWOULDBLOCK))
+
+
class SocketTransport:
    """Transport wrapping a socket.

    The socket must already be connected in non-blocking mode.
    """

    def __init__(self, sock):
        # sock: a connected, non-blocking socket object.
        self.sock = sock

    def recv(self, n):
        """COROUTINE: Read up to n bytes, blocking as needed.

        Always returns at least one byte, except if the socket was
        closed or disconnected and there's no more data; then it
        returns b''.
        """
        assert n >= 0, n
        while True:
            try:
                return self.sock.recv(n)
            except socket.error as err:
                if err.errno in _TRYAGAIN:
                    pass  # Not ready yet; wait for readability below.
                elif err.errno in _DISCONNECTED:
                    return b''
                else:
                    raise  # Unexpected, propagate.
            yield from scheduling.block_r(self.sock.fileno())

    def send(self, data):
        """COROUTINE: Send data to the socket, blocking until all written.

        Return True if all went well, False if socket was disconnected.
        """
        while data:
            try:
                n = self.sock.send(data)
            except socket.error as err:
                if err.errno in _TRYAGAIN:
                    pass  # Not ready yet; wait for writability below.
                elif err.errno in _DISCONNECTED:
                    return False
                else:
                    raise  # Unexpected, propagate.
            else:
                assert 0 <= n <= len(data), (n, len(data))
                if n == len(data):
                    break  # Everything written.
                data = data[n:]  # Drop the part already written.
                continue
            yield from scheduling.block_w(self.sock.fileno())

        return True

    def close(self):
        """Close the socket. (Not a coroutine.)"""
        self.sock.close()
+
+
class SslTransport:
    """Transport wrapping a socket in SSL.

    The socket must already be connected at the TCP level in
    non-blocking mode.
    """

    def __init__(self, rawsock, sslcontext=None):
        # rawsock: connected non-blocking TCP socket.
        # sslcontext: optional ssl.SSLContext; a default SSLv23 context
        # is created when not given.
        self.rawsock = rawsock
        self.sslcontext = sslcontext or ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        self.sslsock = self.sslcontext.wrap_socket(
            self.rawsock, do_handshake_on_connect=False)

    def do_handshake(self):
        """COROUTINE: Finish the SSL handshake."""
        while True:
            try:
                self.sslsock.do_handshake()
            except ssl.SSLWantReadError:
                yield from scheduling.block_r(self.sslsock.fileno())
            except ssl.SSLWantWriteError:
                yield from scheduling.block_w(self.sslsock.fileno())
            else:
                break

    def recv(self, n):
        """COROUTINE: Read up to n bytes.

        This blocks until at least one byte is read, or until EOF.
        """
        while True:
            try:
                return self.sslsock.recv(n)
            except ssl.SSLWantReadError:
                yield from scheduling.block_r(self.sslsock.fileno())
            except ssl.SSLWantWriteError:
                yield from scheduling.block_w(self.sslsock.fileno())
            except socket.error as err:
                if err.errno in _TRYAGAIN:
                    yield from scheduling.block_r(self.sslsock.fileno())
                elif err.errno in _DISCONNECTED:
                    # Can this happen?
                    return b''
                else:
                    raise  # Unexpected, propagate.

    def send(self, data):
        """COROUTINE: Send data to the socket, blocking as needed.

        Return True if all data was written, False if the socket was
        disconnected.
        """
        while data:
            try:
                n = self.sslsock.send(data)
            except ssl.SSLWantReadError:
                yield from scheduling.block_r(self.sslsock.fileno())
            except ssl.SSLWantWriteError:
                yield from scheduling.block_w(self.sslsock.fileno())
            except socket.error as err:
                if err.errno in _TRYAGAIN:
                    yield from scheduling.block_w(self.sslsock.fileno())
                elif err.errno in _DISCONNECTED:
                    return False
                else:
                    raise  # Unexpected, propagate.
            else:
                # BUGFIX: only inspect n after a successful send().
                # Previously this check also ran after a retryable
                # exception, where n was stale or (on the first loop
                # iteration) unbound, raising NameError.
                if n == len(data):
                    break
                data = data[n:]

        return True

    def close(self):
        """Close the socket. (Not a coroutine.)

        This also closes the raw socket.
        """
        self.sslsock.close()

    # TODO: More SSL-specific methods, e.g. certificate stuff, unwrap(), ...
+
+
class BufferedReader:
    """Buffer the read side of a transport.

    Wraps any transport exposing a recv(n) coroutine and layers
    read(), readexactly() and readline() coroutines over an internal
    byte buffer refilled at most *limit* bytes at a time.
    """

    def __init__(self, trans, limit=8192):
        self.trans = trans
        self.limit = limit
        self.buffer = b''
        self.eof = False

    def read(self, n):
        """COROUTINE: Read up to n bytes, blocking at most once."""
        assert n >= 0, n
        if not (self.buffer or self.eof):
            yield from self._fillbuffer(max(n, self.limit))
        return self._getfrombuffer(n)

    def readexactly(self, n):
        """COROUTINE: Read exactly n bytes, or fewer if EOF comes first."""
        pieces = []
        remaining = n
        while remaining > 0:
            chunk = yield from self.read(remaining)
            if not chunk:
                break  # EOF.
            pieces.append(chunk)
            remaining -= len(chunk)
        return b''.join(pieces)

    def readline(self):
        """COROUTINE: Read up to newline or limit, whichever comes first."""
        end = self.buffer.find(b'\n') + 1  # 0 means no newline yet.
        while not end and not self.eof and len(self.buffer) < self.limit:
            anchor = len(self.buffer)
            yield from self._fillbuffer(self.limit)
            end = self.buffer.find(b'\n', anchor) + 1
        if not end:
            end = len(self.buffer)
        end = min(end, self.limit)
        return self._getfrombuffer(end)

    def _getfrombuffer(self, n):
        """Pop up to n buffered bytes without blocking (not a coroutine)."""
        head, tail = self.buffer[:n], self.buffer[n:]
        self.buffer = tail
        return head

    def _fillbuffer(self, n):
        """COROUTINE: Append up to n bytes from the transport to the buffer."""
        assert not self.eof, '_fillbuffer called at eof'
        chunk = yield from self.trans.recv(n)
        if not chunk:
            self.eof = True
        else:
            self.buffer += chunk
+
+
def connect(sock, address):
    """COROUTINE: Connect a non-blocking socket to *address*."""
    try:
        sock.connect(address)
    except socket.error as exc:
        # EINPROGRESS is the normal outcome of a non-blocking connect.
        if exc.errno != errno.EINPROGRESS:
            raise
    # Wait until the socket is writable, then read the final status.
    yield from scheduling.block_w(sock.fileno())
    status = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if status != 0:
        raise IOError(status, 'Connection refused')
+
+
def getaddrinfo(host, port, af=0, socktype=0, proto=0):
    """COROUTINE: Look up an address and return a list of infos for it.

    Each info is a tuple (af, socktype, protocol, canonname, address).
    The lookup runs in a thread because socket.getaddrinfo() blocks.
    """
    result = yield from scheduling.call_in_thread(
        socket.getaddrinfo, host, port, af, socktype, proto)
    return result
+
+
def create_connection(host, port, af=0, socktype=socket.SOCK_STREAM, proto=0):
    """COROUTINE: Look up address and create a socket connected to it."""
    infos = yield from getaddrinfo(host, port, af, socktype, proto)
    if not infos:
        raise IOError('getaddrinfo() returned an empty list')
    first_error = None
    for family, kind, protocol, _cname, address in infos:
        sock = None
        try:
            sock = socket.socket(family, kind, protocol)
            sock.setblocking(False)
            yield from connect(sock, address)
            break  # Connected; use this socket.
        except socket.error as err:
            if sock is not None:
                sock.close()
            if first_error is None:
                first_error = err  # Remember the first failure.
    else:
        # Every candidate address failed; report the first error.
        raise first_error
    return sock
+
+
def create_transport(host, port, af=0, ssl=None):
    """COROUTINE: Look up address and create a transport connected to it.

    ssl=None means: use SSL if and only if the port is 443.
    """
    use_ssl = (port == 443) if ssl is None else ssl
    sock = yield from create_connection(host, port, af)
    if use_ssl:
        trans = SslTransport(sock)
        yield from trans.do_handshake()
        return trans
    return SocketTransport(sock)
+
+
class Listener:
    """Wrapper for a listening socket."""

    def __init__(self, sock):
        # sock: a non-blocking socket, already bound and listening.
        self.sock = sock

    def accept(self):
        """COROUTINE: Accept a connection.

        Returns (conn, addr); conn is set non-blocking.
        """
        while True:
            try:
                conn, addr = self.sock.accept()
            except socket.error as err:
                if err.errno in _TRYAGAIN:
                    # No pending connection yet; wait for readability.
                    yield from scheduling.block_r(self.sock.fileno())
                else:
                    raise  # Unexpected, propagate.
            else:
                conn.setblocking(False)
                return conn, addr
+
+
def create_listener(host, port, af=0, socktype=0, proto=0,
                    backlog=5, reuse_addr=True):
    """COROUTINE: Look up address and create a listener for it."""
    infos = yield from getaddrinfo(host, port, af, socktype, proto)
    if not infos:
        raise IOError('getaddrinfo() returned an empty list')
    first_error = None
    for family, kind, protocol, _cname, address in infos:
        sock = None
        try:
            sock = socket.socket(family, kind, protocol)
            sock.setblocking(False)
            if reuse_addr:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(address)
            sock.listen(backlog)
            break  # Bound and listening; use this socket.
        except socket.error as err:
            if sock is not None:
                sock.close()
            if first_error is None:
                first_error = err  # Remember the first failure.
    else:
        # Every candidate address failed; report the first error.
        raise first_error
    return Listener(sock)
diff --git a/old/transports.py b/old/transports.py
new file mode 100644
index 0000000..19095bf
--- /dev/null
+++ b/old/transports.py
@@ -0,0 +1,496 @@
+"""Transports and Protocols, actually.
+
+Inspired by Twisted, PEP 3153 and github.com/lvh/async-pep.
+
+THIS IS NOT REAL CODE! IT IS JUST AN EXPERIMENT.
+"""
+
+# Stdlib imports.
+import collections
+import errno
+import logging
+import socket
+import ssl
+import sys
+import time
+
+# Local imports.
+import polling
+import scheduling
+import sockets
+
+# Errno values indicating the connection was disconnected.
+_DISCONNECTED = frozenset((errno.ECONNRESET,
+ errno.ENOTCONN,
+ errno.ESHUTDOWN,
+ errno.ECONNABORTED,
+ errno.EPIPE,
+ errno.EBADF,
+ ))
+
+# Errno values indicating the socket isn't ready for I/O just yet.
+_TRYAGAIN = frozenset((errno.EAGAIN, errno.EWOULDBLOCK))
+
+
class Transport:
    """ABC representing a transport.

    There may be many implementations. The user never instantiates
    this directly; they call some utility function, passing it a
    protocol, and the utility function will call the protocol's
    connection_made() method with a transport (or it will call
    connection_lost() with an exception if it fails to create the
    desired transport).

    Every method here raises NotImplementedError except writelines(),
    which is implemented in terms of write().
    """

    def write(self, data):
        """Buffer *data* (bytes) for asynchronous transmission.

        Never blocks; the data is sent out in the background.
        """
        raise NotImplementedError

    def writelines(self, list_of_data):
        """Write each item of an iterable of bytes objects.

        Default implementation: one write() call per item.
        """
        for chunk in list_of_data:
            self.write(chunk)

    def close(self):
        """Close the transport after flushing buffered data.

        No more data will be received.  Once all buffered data has
        been flushed, the protocol's connection_lost() method is
        called with None as its argument.
        """
        raise NotImplementedError

    def abort(self):
        """Close the transport immediately, discarding buffered data.

        No more data will be received.  The protocol's
        connection_lost() method is called with None as its argument.
        """
        raise NotImplementedError

    def half_close(self):
        """Close the write end after flushing buffered data.

        Data may still be received.

        TODO: What's the use case for this? How to implement it?
        Should it call shutdown(SHUT_WR) after all the data is flushed?
        Is there no use case for closing the other half first?
        """
        raise NotImplementedError

    def pause(self):
        """Stop delivering incoming data until resume() is called."""
        raise NotImplementedError

    def resume(self):
        """Undo a pause() call: resume delivering incoming data."""
        raise NotImplementedError
+
+
class Protocol:
    """ABC representing a protocol.

    The user should implement this interface. They can inherit from
    this class but don't need to. The implementations here do
    nothing.

    When the user wants to request a transport, they pass a protocol
    instance to a utility function.

    When the connection is made successfully, connection_made() is
    called with a suitable transport object. Then data_received()
    will be called 0 or more times with data (bytes) received from the
    transport; finally, connection_lost() will be called exactly once
    with either an exception object or None as an argument.

    If the utility function does not succeed in creating a transport,
    it will call connection_lost() with an exception object.

    State machine of calls:

      start -> [CM -> DR*] -> CL -> end
    """

    def connection_made(self, transport):
        """Called when a connection is made.

        The argument is the transport representing the connection.
        To send data, call its write() or writelines() method.
        To receive data, wait for data_received() calls.
        When the connection is closed, connection_lost() is called.
        """

    def data_received(self, data):
        """Called when some data is received.

        The argument is a bytes object.

        TODO: Should we allow it to be a bytearray or some other
        memory buffer?
        """

    def connection_lost(self, exc):
        """Called when the connection is lost or closed.

        Also called when we fail to make a connection at all (in that
        case connection_made() will not be called).

        The argument is an exception object or None (the latter
        meaning a regular EOF is received or the connection was
        aborted or closed).
        """
+
+
+# TODO: The rest is platform specific and should move elsewhere.
+
class UnixSocketTransport(Transport):
    """Callback-driven transport over a plain non-blocking socket.

    The event loop must expose add/remove_reader and add/remove_writer;
    the protocol receives data_received()/connection_lost() callbacks.
    """

    def __init__(self, eventloop, protocol, sock):
        # eventloop: polling event loop driving the callbacks.
        # protocol: Protocol instance to notify.
        # sock: connected non-blocking socket.
        self._eventloop = eventloop
        self._protocol = protocol
        self._sock = sock
        self._buffer = collections.deque()  # For write().
        self._write_closed = False

    def _on_readable(self):
        # Called by the event loop when the socket has data (or EOF).
        try:
            data = self._sock.recv(8192)
        except socket.error as exc:
            if exc.errno not in _TRYAGAIN:
                self._bad_error(exc)
        else:
            if not data:
                # EOF: unregister, close and notify the protocol.
                self._eventloop.remove_reader(self._sock.fileno())
                self._sock.close()
                self._protocol.connection_lost(None)
            else:
                self._protocol.data_received(data)  # XXX call_soon()?

    def write(self, data):
        """Buffer *data* and arrange for it to be sent asynchronously."""
        assert isinstance(data, bytes)
        assert not self._write_closed
        if not data:
            # Silly, but it happens.
            return
        if self._buffer:
            # We've already registered a callback, just buffer the data.
            self._buffer.append(data)
            # Consider pausing if the total length of the buffer is
            # truly huge.
            return

        # TODO: Refactor so there's more sharing between this and
        # _on_writable().

        # There's no callback registered yet. It's quite possible
        # that the kernel has buffer space for our data, so try to
        # write now. Since the socket is non-blocking it will
        # give us an error in _TRYAGAIN if it doesn't have enough
        # space for even one more byte; it will return the number
        # of bytes written if it can write at least one byte.
        try:
            n = self._sock.send(data)
        except socket.error as exc:
            # An error.
            if exc.errno not in _TRYAGAIN:
                self._bad_error(exc)
                return
            # The kernel doesn't have room for more data right now.
            n = 0
        else:
            # Wrote at least one byte.
            if n == len(data):
                # Wrote it all. Done!
                if self._write_closed:
                    self._sock.shutdown(socket.SHUT_WR)
                return
            # Throw away the data that was already written.
            # TODO: Do this without copying the data?
            data = data[n:]
        self._buffer.append(data)
        self._eventloop.add_writer(self._sock.fileno(), self._on_writable)

    def _on_writable(self):
        # Called by the event loop when the socket accepts more data;
        # drain as much of the buffer as the kernel will take.
        while self._buffer:
            data = self._buffer[0]
            # TODO: Join small amounts of data?
            try:
                n = self._sock.send(data)
            except socket.error as exc:
                # Error handling is the same as in write().
                if exc.errno not in _TRYAGAIN:
                    self._bad_error(exc)
                return
            if n < len(data):
                self._buffer[0] = data[n:]
                return
            self._buffer.popleft()
        self._eventloop.remove_writer(self._sock.fileno())
        if self._write_closed:
            self._sock.shutdown(socket.SHUT_WR)

    def abort(self):
        """Close immediately, discarding buffered data."""
        self._bad_error(None)

    def _bad_error(self, exc):
        # A serious error. Close the socket etc.
        fd = self._sock.fileno()
        # TODO: Record whether we have a writer and/or reader registered.
        try:
            self._eventloop.remove_writer(fd)
        except Exception:
            pass
        try:
            self._eventloop.remove_reader(fd)
        except Exception:
            pass
        self._sock.close()
        self._protocol.connection_lost(exc)  # XXX call_soon()?

    def half_close(self):
        # NOTE(review): this only sets the flag; shutdown(SHUT_WR) is
        # issued by write()/_on_writable() when the buffer drains.  If
        # the buffer is already empty, the shutdown never happens —
        # confirm whether that is intended.
        self._write_closed = True
+
+
class UnixSslTransport(Transport):
    """Callback-driven transport over a non-blocking SSL socket.

    Starts the SSL handshake in the constructor; the protocol's
    connection_made() is called once the handshake completes.
    """

    # TODO: Refactor Socket and Ssl transport to share some code.
    # (E.g. buffering.)

    # TODO: Consider using coroutines instead of callbacks, it seems
    # much easier that way.

    def __init__(self, eventloop, protocol, rawsock, sslcontext=None):
        # rawsock: connected non-blocking TCP socket; it is wrapped in
        # an SSL socket using sslcontext (default SSLv23 context).
        self._eventloop = eventloop
        self._protocol = protocol
        self._rawsock = rawsock
        self._sslcontext = sslcontext or ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        self._sslsock = self._sslcontext.wrap_socket(
            self._rawsock, do_handshake_on_connect=False)

        self._buffer = collections.deque()  # For write().
        self._write_closed = False

        # Try the handshake now. Likely it will raise EAGAIN, then it
        # will take care of registering the appropriate callback.
        self._on_handshake()

    def _bad_error(self, exc):
        # A serious error. Close the socket etc.
        fd = self._sslsock.fileno()
        # TODO: Record whether we have a writer and/or reader registered.
        try:
            self._eventloop.remove_writer(fd)
        except Exception:
            pass
        try:
            self._eventloop.remove_reader(fd)
        except Exception:
            pass
        self._sslsock.close()
        self._protocol.connection_lost(exc)  # XXX call_soon()?

    def _on_handshake(self):
        # Advance the SSL handshake; re-register ourselves until it
        # completes, then switch to the regular ready callbacks.
        fd = self._sslsock.fileno()
        try:
            self._sslsock.do_handshake()
        except ssl.SSLWantReadError:
            self._eventloop.add_reader(fd, self._on_handshake)
            return
        except ssl.SSLWantWriteError:
            # BUGFIX: was add_writable(), a method that doesn't exist
            # (every other call site uses add_writer), so a
            # write-blocked handshake raised AttributeError.
            self._eventloop.add_writer(fd, self._on_handshake)
            return
        # TODO: What if it raises another error?
        try:
            self._eventloop.remove_reader(fd)
        except Exception:
            pass
        try:
            self._eventloop.remove_writer(fd)
        except Exception:
            pass
        self._protocol.connection_made(self)
        self._eventloop.add_reader(fd, self._on_ready)
        self._eventloop.add_writer(fd, self._on_ready)

    def _on_ready(self):
        # Because of renegotiations (?), there's no difference between
        # readable and writable. We just try both. XXX This may be
        # incorrect; we probably need to keep state about what we
        # should do next.

        # Maybe we're already closed...
        fd = self._sslsock.fileno()
        if fd < 0:
            return

        # First try reading.
        try:
            data = self._sslsock.recv(8192)
        except ssl.SSLWantReadError:
            pass
        except ssl.SSLWantWriteError:
            pass
        except socket.error as exc:
            if exc.errno not in _TRYAGAIN:
                self._bad_error(exc)
                return
        else:
            if data:
                self._protocol.data_received(data)
            else:
                # TODO: Don't close when self._buffer is non-empty.
                assert not self._buffer
                self._eventloop.remove_reader(fd)
                self._eventloop.remove_writer(fd)
                self._sslsock.close()
                self._protocol.connection_lost(None)
                return

        # Now try writing, if there's anything to write.
        if not self._buffer:
            return

        data = self._buffer[0]
        try:
            n = self._sslsock.send(data)
        except ssl.SSLWantReadError:
            pass
        except ssl.SSLWantWriteError:
            pass
        except socket.error as exc:
            if exc.errno not in _TRYAGAIN:
                self._bad_error(exc)
                return
        else:
            if n == len(data):
                self._buffer.popleft()
                # Could try again, but let's just have the next callback do it.
            else:
                self._buffer[0] = data[n:]

    def write(self, data):
        """Buffer *data*; the ready callback sends it out."""
        assert isinstance(data, bytes)
        assert not self._write_closed
        if not data:
            return
        self._buffer.append(data)
        # We could optimize, but the callback can do this for now.

    def half_close(self):
        self._write_closed = True
        # Just set the flag. Calling shutdown() on the ssl socket
        # breaks something, causing recv() to return binary data.
+
+
def make_connection(protocol, host, port=None, af=0, socktype=0, proto=0,
                    use_ssl=None):
    """Connect to host:port and wire *protocol* up to a new transport.

    use_ssl may be True/False, an ssl.SSLContext, or None (meaning:
    infer from the port).  port defaults to 443 with SSL, else 80.
    """
    # TODO: Pass in a protocol factory, not a protocol.
    # What should be the exact sequence of events?
    # - socket
    # - transport
    # - protocol
    # - tell transport about protocol
    # - tell protocol about transport
    # Or should the latter two be reversed? Does it matter?
    if port is None:
        port = 443 if use_ssl else 80
    if use_ssl is None:
        use_ssl = (port == 443)
    if not socktype:
        socktype = socket.SOCK_STREAM
    eventloop = polling.context.eventloop

    def on_socket_connected(task):
        # Runs when the create_connection task finishes.
        assert not task.alive
        if task.exception is not None:
            # TODO: Call some callback.
            raise task.exception
        sock = task.result
        assert sock is not None
        logging.debug('on_socket_connected')
        if use_ssl:
            # You can pass an ssl.SSLContext object as use_ssl,
            # or a bool.
            if isinstance(use_ssl, bool):
                sslcontext = None
            else:
                sslcontext = use_ssl
            transport = UnixSslTransport(eventloop, protocol, sock, sslcontext)
        else:
            transport = UnixSocketTransport(eventloop, protocol, sock)
        # TODO: Should the transport make the following calls?
        # NOTE(review): for the SSL case, UnixSslTransport already calls
        # connection_made() from _on_handshake(), and it defines no
        # _on_readable() — the two lines below look correct only for the
        # plain-socket case; confirm the SSL path.
        protocol.connection_made(transport)  # XXX call_soon()?
        # Don't do this before connection_made() is called.
        eventloop.add_reader(sock.fileno(), transport._on_readable)

    coro = sockets.create_connection(host, port, af, socktype, proto)
    task = scheduling.Task(coro)
    task.add_done_callback(on_socket_connected)
+
+
def main():  # Testing...
    """Manual test: fetch / from a host and log timing of each event."""

    # Initialize logging.
    if '-d' in sys.argv:
        level = logging.DEBUG
    elif '-v' in sys.argv:
        level = logging.INFO
    elif '-q' in sys.argv:
        level = logging.ERROR
    else:
        level = logging.WARN
    logging.basicConfig(level=level)

    # The last command-line argument overrides the host if it contains
    # a dot (so flags like -v are not mistaken for host names).
    host = 'xkcd.com'
    if sys.argv[1:] and '.' in sys.argv[-1]:
        host = sys.argv[-1]

    t0 = time.time()

    class TestProtocol(Protocol):
        # Issues a GET request on connect and logs every event with a
        # timestamp relative to t0.
        def connection_made(self, transport):
            logging.info('Connection made at %.3f secs', time.time() - t0)
            self.transport = transport
            self.transport.write(b'GET / HTTP/1.0\r\nHost: ' +
                                 host.encode('ascii') +
                                 b'\r\n\r\n')
            self.transport.half_close()
        def data_received(self, data):
            logging.info('Received %d bytes at t=%.3f',
                         len(data), time.time() - t0)
            logging.debug('Received %r', data)
        def connection_lost(self, exc):
            logging.debug('Connection lost: %r', exc)
            self.t1 = time.time()
            logging.info('Total time %.3f secs', self.t1 - t0)

    tp = TestProtocol()
    logging.debug('tp = %r', tp)
    make_connection(tp, host, use_ssl=('-S' in sys.argv))
    logging.info('Running...')
    polling.context.eventloop.run()
    logging.info('Done.')


if __name__ == '__main__':
    main()
diff --git a/old/xkcd.py b/old/xkcd.py
new file mode 100755
index 0000000..474009d
--- /dev/null
+++ b/old/xkcd.py
@@ -0,0 +1,18 @@
+#!/usr/bin/env python3.3
+"""Minimal synchronous SSL demo, connecting to xkcd.com."""
+
+import socket, ssl
+
# NOTE(review): ssl.wrap_socket() performs no certificate or hostname
# verification and is deprecated in favor of SSLContext.wrap_socket();
# acceptable for this throwaway demo only.
s = socket.socket()
s.connect(('xkcd.com', 443))
ss = ssl.wrap_socket(s)

# Minimal HTTP/1.0 request; the server closes the connection when done.
ss.send(b'GET / HTTP/1.0\r\n\r\n')

# Read and print until EOF (recv() returns b'').
while True:
    data = ss.recv(1000000)
    print(data)
    if not data:
        break

ss.close()
diff --git a/old/yyftime.py b/old/yyftime.py
new file mode 100644
index 0000000..f55234b
--- /dev/null
+++ b/old/yyftime.py
@@ -0,0 +1,75 @@
+"""Compare timing of yield-from <generator> vs. yield <future> calls."""
+
+import gc
+import time
+
def coroutine(n):
    """Recursive PEP-380 benchmark coroutine.

    Builds a full binary recursion of depth n and returns the number
    of calls made (2**(n+1) - 1).
    """
    if n <= 0:
        return 1
    left = yield from coroutine(n - 1)
    right = yield from coroutine(n - 1)
    return left + right + 1
+
def run_coro(depth):
    """Drive coroutine(depth) to completion.

    Prints and returns the elapsed wall-clock time in seconds.
    """
    start = time.time()
    gen = coroutine(depth)
    try:
        while True:
            next(gen)
    except StopIteration as err:
        total = err.value
    elapsed = time.time() - start
    print('coro', depth, total, round(elapsed, 6))
    return elapsed
+
class Future:
    """Minimal future driving an old-style 'yield <future>' generator."""

    def __init__(self, g):
        # g: generator whose yielded values are themselves Futures.
        self.g = g

    def wait(self):
        """Run the generator to completion, storing its return value.

        Every Future the generator yields is waited on recursively and
        its value sent back in; the final return value ends up in
        self.value.
        """
        result = None
        while True:
            try:
                fut = self.g.send(result)
            except StopIteration as err:
                self.value = err.value
                return
            fut.wait()
            result = fut.value
+
+
+
def task(func):  # Decorator
    """Wrap a generator function so that calling it returns a Future."""
    def wrapper(*args):
        return Future(func(*args))
    return wrapper
+
@task
def oldstyle(n):
    """Old-style recursive counter using 'yield <future>' calls."""
    if n <= 0:
        return 1
    left = yield oldstyle(n - 1)
    right = yield oldstyle(n - 1)
    return left + right + 1
+
def run_olds(depth):
    """Drive oldstyle(depth) via Future.wait().

    Prints and returns the elapsed wall-clock time in seconds.
    """
    start = time.time()
    fut = oldstyle(depth)
    fut.wait()
    total = fut.value
    elapsed = time.time() - start
    print('olds', depth, total, round(elapsed, 6))
    return elapsed
+
def main():
    """Benchmark yield-from vs. old-style yield at increasing depths."""
    gc.disable()  # Keep GC pauses out of the timings.
    for depth in range(16):
        coro_time = run_coro(depth)
        olds_time = run_olds(depth)
        if coro_time:
            print('ratio', round(olds_time / coro_time, 2))

if __name__ == '__main__':
    main()