author    | Victor Stinner <victor.stinner@gmail.com> | 2015-01-26 10:52:45 +0100
committer | Victor Stinner <victor.stinner@gmail.com> | 2015-01-26 10:52:45 +0100
commit    | 684f3be00011d3c6cc4f81f5cb61c157e5eb5205 (patch)
tree      | 886f9bbb1d98945db650803971bc72c56cebd142 /examples
download  | trollius-git-684f3be00011d3c6cc4f81f5cb61c157e5eb5205.tar.gz
Python issue #23208: Add BaseEventLoop._current_handle
In debug mode, BaseEventLoop._run_once() now sets the
BaseEventLoop._current_handle attribute to the handle currently being executed.
In release mode, or when no handle is being executed, the attribute is None.
BaseEventLoop.default_exception_handler() displays the traceback of the current
handle if available.
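A minimal sketch (not part of this patch) of the behaviour the commit message describes: with debug mode enabled, an exception raised inside a scheduled callback is caught by the event loop and reported through the default exception handler, which can then include the traceback of the handle that was executing. The callback name and delay below are illustrative only.

```python
import asyncio


def faulty_callback():
    # The loop catches this exception while the handle is running and
    # reports it via BaseEventLoop.default_exception_handler().
    raise RuntimeError("boom")


loop = asyncio.new_event_loop()
loop.set_debug(True)  # debug mode: the loop tracks the currently executed handle
loop.call_soon(faulty_callback)
loop.call_later(0.1, loop.stop)
try:
    loop.run_forever()  # the error report can include the handle's creation traceback
finally:
    loop.close()
```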
Diffstat (limited to 'examples')
-rw-r--r-- | examples/cacheclt.py | 213
-rw-r--r-- | examples/cachesvr.py | 249
-rw-r--r-- | examples/child_process.py | 128
-rw-r--r-- | examples/crawl.py | 863
-rw-r--r-- | examples/echo_client_tulip.py | 20
-rw-r--r-- | examples/echo_server_tulip.py | 20
-rw-r--r-- | examples/fetch0.py | 35
-rw-r--r-- | examples/fetch1.py | 78
-rw-r--r-- | examples/fetch2.py | 141
-rw-r--r-- | examples/fetch3.py | 230
-rw-r--r-- | examples/fuzz_as_completed.py | 69
-rw-r--r-- | examples/hello_callback.py | 17
-rw-r--r-- | examples/hello_coroutine.py | 18
-rw-r--r-- | examples/shell.py | 50
-rw-r--r-- | examples/simple_tcp_server.py | 154
-rw-r--r-- | examples/sink.py | 94
-rw-r--r-- | examples/source.py | 100
-rw-r--r-- | examples/source1.py | 98
-rw-r--r-- | examples/stacks.py | 44
-rw-r--r-- | examples/subprocess_attach_read_pipe.py | 33
-rw-r--r-- | examples/subprocess_attach_write_pipe.py | 35
-rw-r--r-- | examples/subprocess_shell.py | 87
-rwxr-xr-x | examples/tcp_echo.py | 128
-rw-r--r-- | examples/timing_tcp_server.py | 168
-rwxr-xr-x | examples/udp_echo.py | 104
25 files changed, 3176 insertions, 0 deletions
diff --git a/examples/cacheclt.py b/examples/cacheclt.py new file mode 100644 index 0000000..b11a4d1 --- /dev/null +++ b/examples/cacheclt.py @@ -0,0 +1,213 @@ +"""Client for cache server. + +See cachesvr.py for protocol description. +""" + +import argparse +import asyncio +from asyncio import test_utils +import json +import logging + +ARGS = argparse.ArgumentParser(description='Cache client example.') +ARGS.add_argument( + '--tls', action='store_true', dest='tls', + default=False, help='Use TLS') +ARGS.add_argument( + '--iocp', action='store_true', dest='iocp', + default=False, help='Use IOCP event loop (Windows only)') +ARGS.add_argument( + '--host', action='store', dest='host', + default='localhost', help='Host name') +ARGS.add_argument( + '--port', action='store', dest='port', + default=54321, type=int, help='Port number') +ARGS.add_argument( + '--timeout', action='store', dest='timeout', + default=5, type=float, help='Timeout') +ARGS.add_argument( + '--max_backoff', action='store', dest='max_backoff', + default=5, type=float, help='Max backoff on reconnect') +ARGS.add_argument( + '--ntasks', action='store', dest='ntasks', + default=10, type=int, help='Number of tester tasks') +ARGS.add_argument( + '--ntries', action='store', dest='ntries', + default=5, type=int, help='Number of request tries before giving up') + + +args = ARGS.parse_args() + + +class CacheClient: + """Multiplexing cache client. + + This wraps a single connection to the cache client. The + connection is automatically re-opened when an error occurs. + + Multiple tasks may share this object; the requests will be + serialized. + + The public API is get(), set(), delete() (all are coroutines). + """ + + def __init__(self, host, port, sslctx=None, loop=None): + self.host = host + self.port = port + self.sslctx = sslctx + self.loop = loop + self.todo = set() + self.initialized = False + self.task = asyncio.Task(self.activity(), loop=self.loop) + + @asyncio.coroutine + def get(self, key): + resp = yield from self.request('get', key) + if resp is None: + return None + return resp.get('value') + + @asyncio.coroutine + def set(self, key, value): + resp = yield from self.request('set', key, value) + if resp is None: + return False + return resp.get('status') == 'ok' + + @asyncio.coroutine + def delete(self, key): + resp = yield from self.request('delete', key) + if resp is None: + return False + return resp.get('status') == 'ok' + + @asyncio.coroutine + def request(self, type, key, value=None): + assert not self.task.done() + data = {'type': type, 'key': key} + if value is not None: + data['value'] = value + payload = json.dumps(data).encode('utf8') + waiter = asyncio.Future(loop=self.loop) + if self.initialized: + try: + yield from self.send(payload, waiter) + except IOError: + self.todo.add((payload, waiter)) + else: + self.todo.add((payload, waiter)) + return (yield from waiter) + + @asyncio.coroutine + def activity(self): + backoff = 0 + while True: + try: + self.reader, self.writer = yield from asyncio.open_connection( + self.host, self.port, ssl=self.sslctx, loop=self.loop) + except Exception as exc: + backoff = min(args.max_backoff, backoff + (backoff//2) + 1) + logging.info('Error connecting: %r; sleep %s', exc, backoff) + yield from asyncio.sleep(backoff, loop=self.loop) + continue + backoff = 0 + self.next_id = 0 + self.pending = {} + self. 
initialized = True + try: + while self.todo: + payload, waiter = self.todo.pop() + if not waiter.done(): + yield from self.send(payload, waiter) + while True: + resp_id, resp = yield from self.process() + if resp_id in self.pending: + payload, waiter = self.pending.pop(resp_id) + if not waiter.done(): + waiter.set_result(resp) + except Exception as exc: + self.initialized = False + self.writer.close() + while self.pending: + req_id, pair = self.pending.popitem() + payload, waiter = pair + if not waiter.done(): + self.todo.add(pair) + logging.info('Error processing: %r', exc) + + @asyncio.coroutine + def send(self, payload, waiter): + self.next_id += 1 + req_id = self.next_id + frame = 'request %d %d\n' % (req_id, len(payload)) + self.writer.write(frame.encode('ascii')) + self.writer.write(payload) + self.pending[req_id] = payload, waiter + yield from self.writer.drain() + + @asyncio.coroutine + def process(self): + frame = yield from self.reader.readline() + if not frame: + raise EOFError() + head, tail = frame.split(None, 1) + if head == b'error': + raise IOError('OOB error: %r' % tail) + if head != b'response': + raise IOError('Bad frame: %r' % frame) + resp_id, resp_size = map(int, tail.split()) + data = yield from self.reader.readexactly(resp_size) + if len(data) != resp_size: + raise EOFError() + resp = json.loads(data.decode('utf8')) + return resp_id, resp + + +def main(): + asyncio.set_event_loop(None) + if args.iocp: + from asyncio.windows_events import ProactorEventLoop + loop = ProactorEventLoop() + else: + loop = asyncio.new_event_loop() + sslctx = None + if args.tls: + sslctx = test_utils.dummy_ssl_context() + cache = CacheClient(args.host, args.port, sslctx=sslctx, loop=loop) + try: + loop.run_until_complete( + asyncio.gather( + *[testing(i, cache, loop) for i in range(args.ntasks)], + loop=loop)) + finally: + loop.close() + + +@asyncio.coroutine +def testing(label, cache, loop): + + def w(g): + return asyncio.wait_for(g, args.timeout, loop=loop) + + key = 'foo-%s' % label + while True: + logging.info('%s %s', label, '-'*20) + try: + ret = yield from w(cache.set(key, 'hello-%s-world' % label)) + logging.info('%s set %s', label, ret) + ret = yield from w(cache.get(key)) + logging.info('%s get %s', label, ret) + ret = yield from w(cache.delete(key)) + logging.info('%s del %s', label, ret) + ret = yield from w(cache.get(key)) + logging.info('%s get2 %s', label, ret) + except asyncio.TimeoutError: + logging.warn('%s Timeout', label) + except Exception as exc: + logging.exception('%s Client exception: %r', label, exc) + break + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + main() diff --git a/examples/cachesvr.py b/examples/cachesvr.py new file mode 100644 index 0000000..053f9c2 --- /dev/null +++ b/examples/cachesvr.py @@ -0,0 +1,249 @@ +"""A simple memcache-like server. + +The basic data structure maintained is a single in-memory dictionary +mapping string keys to string values, with operations get, set and +delete. (Both keys and values may contain Unicode.) + +This is a TCP server listening on port 54321. There is no +authentication. + +Requests provide an operation and return a response. A connection may +be used for multiple requests. The connection is closed when a client +sends a bad request. + +If a client is idle for over 5 seconds (i.e., it does not send another +request, or fails to read the whole response, within this time), it is +disconnected. + +Framing of requests and responses within a connection uses a +line-based protocol. 
The first line of a request is the frame header +and contains three whitespace-delimited token followed by LF or CRLF: + +- the keyword 'request' +- a decimal request ID; the first request is '1', the second '2', etc. +- a decimal byte count giving the size of the rest of the request + +Note that the requests ID *must* be consecutive and start at '1' for +each connection. + +Response frames look the same except the keyword is 'response'. The +response ID matches the request ID. There should be exactly one +response to each request and responses should be seen in the same +order as the requests. + +After the frame, individual requests and responses are JSON encoded. + +If the frame header or the JSON request body cannot be parsed, an +unframed error message (always starting with 'error') is written back +and the connection is closed. + +JSON-encoded requests can be: + +- {"type": "get", "key": <string>} +- {"type": "set", "key": <string>, "value": <string>} +- {"type": "delete", "key": <string>} + +Responses are also JSON-encoded: + +- {"status": "ok", "value": <string>} # Successful get request +- {"status": "ok"} # Successful set or delete request +- {"status": "notfound"} # Key not found for get or delete request + +If the request is valid JSON but cannot be handled (e.g., the type or +key field is absent or invalid), an error response of the following +form is returned, but the connection is not closed: + +- {"error": <string>} +""" + +import argparse +import asyncio +import json +import logging +import os +import random + +ARGS = argparse.ArgumentParser(description='Cache server example.') +ARGS.add_argument( + '--tls', action='store_true', dest='tls', + default=False, help='Use TLS') +ARGS.add_argument( + '--iocp', action='store_true', dest='iocp', + default=False, help='Use IOCP event loop (Windows only)') +ARGS.add_argument( + '--host', action='store', dest='host', + default='localhost', help='Host name') +ARGS.add_argument( + '--port', action='store', dest='port', + default=54321, type=int, help='Port number') +ARGS.add_argument( + '--timeout', action='store', dest='timeout', + default=5, type=float, help='Timeout') +ARGS.add_argument( + '--random_failure_percent', action='store', dest='fail_percent', + default=0, type=float, help='Fail randomly N percent of the time') +ARGS.add_argument( + '--random_failure_sleep', action='store', dest='fail_sleep', + default=0, type=float, help='Sleep time when randomly failing') +ARGS.add_argument( + '--random_response_sleep', action='store', dest='resp_sleep', + default=0, type=float, help='Sleep time before responding') + +args = ARGS.parse_args() + + +class Cache: + + def __init__(self, loop): + self.loop = loop + self.table = {} + + @asyncio.coroutine + def handle_client(self, reader, writer): + # Wrapper to log stuff and close writer (i.e., transport). + peer = writer.get_extra_info('socket').getpeername() + logging.info('got a connection from %s', peer) + try: + yield from self.frame_parser(reader, writer) + except Exception as exc: + logging.error('error %r from %s', exc, peer) + else: + logging.info('end connection from %s', peer) + finally: + writer.close() + + @asyncio.coroutine + def frame_parser(self, reader, writer): + # This takes care of the framing. + last_request_id = 0 + while True: + # Read the frame header, parse it, read the data. + # NOTE: The readline() and readexactly() calls will hang + # if the client doesn't send enough data but doesn't + # disconnect either. We add a timeout to each. 
(But the + # timeout should really be implemented by StreamReader.) + framing_b = yield from asyncio.wait_for( + reader.readline(), + timeout=args.timeout, loop=self.loop) + if random.random()*100 < args.fail_percent: + logging.warn('Inserting random failure') + yield from asyncio.sleep(args.fail_sleep*random.random(), + loop=self.loop) + writer.write(b'error random failure\r\n') + break + logging.debug('framing_b = %r', framing_b) + if not framing_b: + break # Clean close. + try: + frame_keyword, request_id_b, byte_count_b = framing_b.split() + except ValueError: + writer.write(b'error unparseable frame\r\n') + break + if frame_keyword != b'request': + writer.write(b'error frame does not start with request\r\n') + break + try: + request_id, byte_count = int(request_id_b), int(byte_count_b) + except ValueError: + writer.write(b'error unparsable frame parameters\r\n') + break + if request_id != last_request_id + 1 or byte_count < 2: + writer.write(b'error invalid frame parameters\r\n') + break + last_request_id = request_id + request_b = yield from asyncio.wait_for( + reader.readexactly(byte_count), + timeout=args.timeout, loop=self.loop) + try: + request = json.loads(request_b.decode('utf8')) + except ValueError: + writer.write(b'error unparsable json\r\n') + break + response = self.handle_request(request) # Not a coroutine. + if response is None: + writer.write(b'error unhandlable request\r\n') + break + response_b = json.dumps(response).encode('utf8') + b'\r\n' + byte_count = len(response_b) + framing_s = 'response {} {}\r\n'.format(request_id, byte_count) + writer.write(framing_s.encode('ascii')) + yield from asyncio.sleep(args.resp_sleep*random.random(), + loop=self.loop) + writer.write(response_b) + + def handle_request(self, request): + # This parses one request and farms it out to a specific handler. + # Return None for all errors. + if not isinstance(request, dict): + return {'error': 'request is not a dict'} + request_type = request.get('type') + if request_type is None: + return {'error': 'no type in request'} + if request_type not in {'get', 'set', 'delete'}: + return {'error': 'unknown request type'} + key = request.get('key') + if not isinstance(key, str): + return {'error': 'key is not a string'} + if request_type == 'get': + return self.handle_get(key) + if request_type == 'set': + value = request.get('value') + if not isinstance(value, str): + return {'error': 'value is not a string'} + return self.handle_set(key, value) + if request_type == 'delete': + return self.handle_delete(key) + assert False, 'bad request type' # Should have been caught above. + + def handle_get(self, key): + value = self.table.get(key) + if value is None: + return {'status': 'notfound'} + else: + return {'status': 'ok', 'value': value} + + def handle_set(self, key, value): + self.table[key] = value + return {'status': 'ok'} + + def handle_delete(self, key): + if key not in self.table: + return {'status': 'notfound'} + else: + del self.table[key] + return {'status': 'ok'} + + +def main(): + asyncio.set_event_loop(None) + if args.iocp: + from asyncio.windows_events import ProactorEventLoop + loop = ProactorEventLoop() + else: + loop = asyncio.new_event_loop() + sslctx = None + if args.tls: + import ssl + # TODO: take cert/key from args as well. 
+ here = os.path.join(os.path.dirname(__file__), '..', 'tests') + sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + sslctx.options |= ssl.OP_NO_SSLv2 + sslctx.load_cert_chain( + certfile=os.path.join(here, 'ssl_cert.pem'), + keyfile=os.path.join(here, 'ssl_key.pem')) + cache = Cache(loop) + task = asyncio.streams.start_server(cache.handle_client, + args.host, args.port, + ssl=sslctx, loop=loop) + svr = loop.run_until_complete(task) + for sock in svr.sockets: + logging.info('socket %s', sock.getsockname()) + try: + loop.run_forever() + finally: + loop.close() + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + main() diff --git a/examples/child_process.py b/examples/child_process.py new file mode 100644 index 0000000..3fac175 --- /dev/null +++ b/examples/child_process.py @@ -0,0 +1,128 @@ +""" +Example of asynchronous interaction with a child python process. + +This example shows how to attach an existing Popen object and use the low level +transport-protocol API. See shell.py and subprocess_shell.py for higher level +examples. +""" + +import os +import sys + +try: + import asyncio +except ImportError: + # asyncio is not installed + sys.path.append(os.path.join(os.path.dirname(__file__), '..')) + import asyncio + +if sys.platform == 'win32': + from asyncio.windows_utils import Popen, PIPE + from asyncio.windows_events import ProactorEventLoop +else: + from subprocess import Popen, PIPE + +# +# Return a write-only transport wrapping a writable pipe +# + +@asyncio.coroutine +def connect_write_pipe(file): + loop = asyncio.get_event_loop() + transport, _ = yield from loop.connect_write_pipe(asyncio.Protocol, file) + return transport + +# +# Wrap a readable pipe in a stream +# + +@asyncio.coroutine +def connect_read_pipe(file): + loop = asyncio.get_event_loop() + stream_reader = asyncio.StreamReader(loop=loop) + def factory(): + return asyncio.StreamReaderProtocol(stream_reader) + transport, _ = yield from loop.connect_read_pipe(factory, file) + return stream_reader, transport + + +# +# Example +# + +@asyncio.coroutine +def main(loop): + # program which prints evaluation of each expression from stdin + code = r'''if 1: + import os + def writeall(fd, buf): + while buf: + n = os.write(fd, buf) + buf = buf[n:] + while True: + s = os.read(0, 1024) + if not s: + break + s = s.decode('ascii') + s = repr(eval(s)) + '\n' + s = s.encode('ascii') + writeall(1, s) + ''' + + # commands to send to input + commands = iter([b"1+1\n", + b"2**16\n", + b"1/3\n", + b"'x'*50", + b"1/0\n"]) + + # start subprocess and wrap stdin, stdout, stderr + p = Popen([sys.executable, '-c', code], + stdin=PIPE, stdout=PIPE, stderr=PIPE) + + stdin = yield from connect_write_pipe(p.stdin) + stdout, stdout_transport = yield from connect_read_pipe(p.stdout) + stderr, stderr_transport = yield from connect_read_pipe(p.stderr) + + # interact with subprocess + name = {stdout:'OUT', stderr:'ERR'} + registered = {asyncio.Task(stderr.readline()): stderr, + asyncio.Task(stdout.readline()): stdout} + while registered: + # write command + cmd = next(commands, None) + if cmd is None: + stdin.close() + else: + print('>>>', cmd.decode('ascii').rstrip()) + stdin.write(cmd) + + # get and print lines from stdout, stderr + timeout = None + while registered: + done, pending = yield from asyncio.wait( + registered, timeout=timeout, + return_when=asyncio.FIRST_COMPLETED) + if not done: + break + for f in done: + stream = registered.pop(f) + res = f.result() + print(name[stream], res.decode('ascii').rstrip()) + if res != b'': + 
registered[asyncio.Task(stream.readline())] = stream + timeout = 0.0 + + stdout_transport.close() + stderr_transport.close() + +if __name__ == '__main__': + if sys.platform == 'win32': + loop = ProactorEventLoop() + asyncio.set_event_loop(loop) + else: + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(main(loop)) + finally: + loop.close() diff --git a/examples/crawl.py b/examples/crawl.py new file mode 100644 index 0000000..4bb0b4e --- /dev/null +++ b/examples/crawl.py @@ -0,0 +1,863 @@ +#!/usr/bin/env python3.4 + +"""A simple web crawler.""" + +# TODO: +# - More organized logging (with task ID or URL?). +# - Use logging module for Logger. +# - KeyboardInterrupt in HTML parsing may hang or report unretrieved error. +# - Support gzip encoding. +# - Close connection if HTTP/1.0 response. +# - Add timeouts. (E.g. when switching networks, all seems to hang.) +# - Add arguments to specify TLS settings (e.g. cert/key files). +# - Skip reading large non-text/html files? +# - Use ETag and If-Modified-Since? +# - Handle out of file descriptors directly? (How?) + +import argparse +import asyncio +import asyncio.locks +import cgi +from http.client import BadStatusLine +import logging +import re +import sys +import time +import urllib.parse + + +ARGS = argparse.ArgumentParser(description="Web crawler") +ARGS.add_argument( + '--iocp', action='store_true', dest='iocp', + default=False, help='Use IOCP event loop (Windows only)') +ARGS.add_argument( + '--select', action='store_true', dest='select', + default=False, help='Use Select event loop instead of default') +ARGS.add_argument( + 'roots', nargs='*', + default=[], help='Root URL (may be repeated)') +ARGS.add_argument( + '--max_redirect', action='store', type=int, metavar='N', + default=10, help='Limit redirection chains (for 301, 302 etc.)') +ARGS.add_argument( + '--max_tries', action='store', type=int, metavar='N', + default=4, help='Limit retries on network errors') +ARGS.add_argument( + '--max_tasks', action='store', type=int, metavar='N', + default=100, help='Limit concurrent connections') +ARGS.add_argument( + '--max_pool', action='store', type=int, metavar='N', + default=100, help='Limit connection pool size') +ARGS.add_argument( + '--exclude', action='store', metavar='REGEX', + help='Exclude matching URLs') +ARGS.add_argument( + '--strict', action='store_true', + default=True, help='Strict host matching (default)') +ARGS.add_argument( + '--lenient', action='store_false', dest='strict', + default=False, help='Lenient host matching') +ARGS.add_argument( + '-v', '--verbose', action='count', dest='level', + default=1, help='Verbose logging (repeat for more verbose)') +ARGS.add_argument( + '-q', '--quiet', action='store_const', const=0, dest='level', + default=1, help='Quiet logging (opposite of --verbose)') + + +ESCAPES = [('quot', '"'), + ('gt', '>'), + ('lt', '<'), + ('amp', '&') # Must be last. + ] + + +def unescape(url): + """Turn & into &, and so on. + + This is the inverse of cgi.escape(). + """ + for name, char in ESCAPES: + url = url.replace('&' + name + ';', char) + return url + + +def fix_url(url): + """Prefix a schema-less URL with http://.""" + if '://' not in url: + url = 'http://' + url + return url + + +class Logger: + + def __init__(self, level): + self.level = level + + def _log(self, n, args): + if self.level >= n: + print(*args, file=sys.stderr, flush=True) + + def log(self, n, *args): + self._log(n, args) + + def __call__(self, n, *args): + self._log(n, args) + + +class ConnectionPool: + """A connection pool. 
+ + To open a connection, use reserve(). To recycle it, use unreserve(). + + The pool is mostly just a mapping from (host, port, ssl) tuples to + lists of Connections. The currently active connections are *not* + in the data structure; get_connection() takes the connection out, + and recycle_connection() puts it back in. To recycle a + connection, call conn.close(recycle=True). + + There are limits to both the overall pool and the per-key pool. + """ + + def __init__(self, log, max_pool=10, max_tasks=5): + self.log = log + self.max_pool = max_pool # Overall limit. + self.max_tasks = max_tasks # Per-key limit. + self.loop = asyncio.get_event_loop() + self.connections = {} # {(host, port, ssl): [Connection, ...], ...} + self.queue = [] # [Connection, ...] + + def close(self): + """Close all connections available for reuse.""" + for conns in self.connections.values(): + for conn in conns: + conn.close() + self.connections.clear() + self.queue.clear() + + @asyncio.coroutine + def get_connection(self, host, port, ssl): + """Create or reuse a connection.""" + port = port or (443 if ssl else 80) + try: + ipaddrs = yield from self.loop.getaddrinfo(host, port) + except Exception as exc: + self.log(0, 'Exception %r for (%r, %r)' % (exc, host, port)) + raise + self.log(1, '* %s resolves to %s' % + (host, ', '.join(ip[4][0] for ip in ipaddrs))) + + # Look for a reusable connection. + for _, _, _, _, (h, p, *_) in ipaddrs: + key = h, p, ssl + conn = None + conns = self.connections.get(key) + while conns: + conn = conns.pop(0) + self.queue.remove(conn) + if not conns: + del self.connections[key] + if conn.stale(): + self.log(1, 'closing stale connection for', key) + conn.close() # Just in case. + else: + self.log(1, '* Reusing pooled connection', key, + 'FD =', conn.fileno()) + return conn + + # Create a new connection. + conn = Connection(self.log, self, host, port, ssl) + yield from conn.connect() + self.log(1, '* New connection', conn.key, 'FD =', conn.fileno()) + return conn + + def recycle_connection(self, conn): + """Make a connection available for reuse. + + This also prunes the pool if it exceeds the size limits. + """ + if conn.stale(): + conn.close() + return + + key = conn.key + conns = self.connections.setdefault(key, []) + conns.append(conn) + self.queue.append(conn) + + if len(conns) <= self.max_tasks and len(self.queue) <= self.max_pool: + return + + # Prune the queue. + + # Close stale connections for this key first. + stale = [conn for conn in conns if conn.stale()] + if stale: + for conn in stale: + conns.remove(conn) + self.queue.remove(conn) + self.log(1, 'closing stale connection for', key) + conn.close() + if not conns: + del self.connections[key] + + # Close oldest connection(s) for this key if limit reached. + while len(conns) > self.max_tasks: + conn = conns.pop(0) + self.queue.remove(conn) + self.log(1, 'closing oldest connection for', key) + conn.close() + + if len(self.queue) <= self.max_pool: + return + + # Close overall stale connections. + stale = [conn for conn in self.queue if conn.stale()] + if stale: + for conn in stale: + conns = self.connections.get(conn.key) + conns.remove(conn) + self.queue.remove(conn) + self.log(1, 'closing stale connection for', key) + conn.close() + + # Close oldest overall connection(s) if limit reached. 
+ while len(self.queue) > self.max_pool: + conn = self.queue.pop(0) + conns = self.connections.get(conn.key) + c = conns.pop(0) + assert conn == c, (conn.key, conn, c, conns) + self.log(1, 'closing overall oldest connection for', conn.key) + conn.close() + + +class Connection: + + def __init__(self, log, pool, host, port, ssl): + self.log = log + self.pool = pool + self.host = host + self.port = port + self.ssl = ssl + self.reader = None + self.writer = None + self.key = None + + def stale(self): + return self.reader is None or self.reader.at_eof() + + def fileno(self): + writer = self.writer + if writer is not None: + transport = writer.transport + if transport is not None: + sock = transport.get_extra_info('socket') + if sock is not None: + return sock.fileno() + return None + + @asyncio.coroutine + def connect(self): + self.reader, self.writer = yield from asyncio.open_connection( + self.host, self.port, ssl=self.ssl) + peername = self.writer.get_extra_info('peername') + if peername: + self.host, self.port = peername[:2] + else: + self.log(1, 'NO PEERNAME???', self.host, self.port, self.ssl) + self.key = self.host, self.port, self.ssl + + def close(self, recycle=False): + if recycle and not self.stale(): + self.pool.recycle_connection(self) + else: + self.writer.close() + self.pool = self.reader = self.writer = None + + +class Request: + """HTTP request. + + Use connect() to open a connection; send_request() to send the + request; get_response() to receive the response headers. + """ + + def __init__(self, log, url, pool): + self.log = log + self.url = url + self.pool = pool + self.parts = urllib.parse.urlparse(self.url) + self.scheme = self.parts.scheme + assert self.scheme in ('http', 'https'), repr(url) + self.ssl = self.parts.scheme == 'https' + self.netloc = self.parts.netloc + self.hostname = self.parts.hostname + self.port = self.parts.port or (443 if self.ssl else 80) + self.path = (self.parts.path or '/') + self.query = self.parts.query + if self.query: + self.full_path = '%s?%s' % (self.path, self.query) + else: + self.full_path = self.path + self.http_version = 'HTTP/1.1' + self.method = 'GET' + self.headers = [] + self.conn = None + + @asyncio.coroutine + def connect(self): + """Open a connection to the server.""" + self.log(1, '* Connecting to %s:%s using %s for %s' % + (self.hostname, self.port, + 'ssl' if self.ssl else 'tcp', + self.url)) + self.conn = yield from self.pool.get_connection(self.hostname, + self.port, self.ssl) + + def close(self, recycle=False): + """Close the connection, recycle if requested.""" + if self.conn is not None: + if not recycle: + self.log(1, 'closing connection for', self.conn.key) + self.conn.close(recycle) + self.conn = None + + @asyncio.coroutine + def putline(self, line): + """Write a line to the connection. + + Used for the request line and headers. + """ + self.log(2, '>', line) + self.conn.writer.write(line.encode('latin-1') + b'\r\n') + + @asyncio.coroutine + def send_request(self): + """Send the request.""" + request_line = '%s %s %s' % (self.method, self.full_path, + self.http_version) + yield from self.putline(request_line) + # TODO: What if a header is already set? 
+ self.headers.append(('User-Agent', 'asyncio-example-crawl/0.0')) + self.headers.append(('Host', self.netloc)) + self.headers.append(('Accept', '*/*')) + ##self.headers.append(('Accept-Encoding', 'gzip')) + for key, value in self.headers: + line = '%s: %s' % (key, value) + yield from self.putline(line) + yield from self.putline('') + + @asyncio.coroutine + def get_response(self): + """Receive the response.""" + response = Response(self.log, self.conn.reader) + yield from response.read_headers() + return response + + +class Response: + """HTTP response. + + Call read_headers() to receive the request headers. Then check + the status attribute and call get_header() to inspect the headers. + Finally call read() to receive the body. + """ + + def __init__(self, log, reader): + self.log = log + self.reader = reader + self.http_version = None # 'HTTP/1.1' + self.status = None # 200 + self.reason = None # 'Ok' + self.headers = [] # [('Content-Type', 'text/html')] + + @asyncio.coroutine + def getline(self): + """Read one line from the connection.""" + line = (yield from self.reader.readline()).decode('latin-1').rstrip() + self.log(2, '<', line) + return line + + @asyncio.coroutine + def read_headers(self): + """Read the response status and the request headers.""" + status_line = yield from self.getline() + status_parts = status_line.split(None, 2) + if len(status_parts) != 3: + self.log(0, 'bad status_line', repr(status_line)) + raise BadStatusLine(status_line) + self.http_version, status, self.reason = status_parts + self.status = int(status) + while True: + header_line = yield from self.getline() + if not header_line: + break + # TODO: Continuation lines. + key, value = header_line.split(':', 1) + self.headers.append((key, value.strip())) + + def get_redirect_url(self, default=''): + """Inspect the status and return the redirect url if appropriate.""" + if self.status not in (300, 301, 302, 303, 307): + return default + return self.get_header('Location', default) + + def get_header(self, key, default=''): + """Get one header value, using a case insensitive header name.""" + key = key.lower() + for k, v in self.headers: + if k.lower() == key: + return v + return default + + @asyncio.coroutine + def read(self): + """Read the response body. + + This honors Content-Length and Transfer-Encoding: chunked. + """ + nbytes = None + for key, value in self.headers: + if key.lower() == 'content-length': + nbytes = int(value) + break + if nbytes is None: + if self.get_header('transfer-encoding').lower() == 'chunked': + self.log(2, 'parsing chunked response') + blocks = [] + while True: + size_header = yield from self.reader.readline() + if not size_header: + self.log(0, 'premature end of chunked response') + break + self.log(3, 'size_header =', repr(size_header)) + parts = size_header.split(b';') + size = int(parts[0], 16) + if size: + self.log(3, 'reading chunk of', size, 'bytes') + block = yield from self.reader.readexactly(size) + assert len(block) == size, (len(block), size) + blocks.append(block) + crlf = yield from self.reader.readline() + assert crlf == b'\r\n', repr(crlf) + if not size: + break + body = b''.join(blocks) + self.log(1, 'chunked response had', len(body), + 'bytes in', len(blocks), 'blocks') + else: + self.log(3, 'reading until EOF') + body = yield from self.reader.read() + # TODO: Should make sure not to recycle the connection + # in this case. + else: + body = yield from self.reader.readexactly(nbytes) + return body + + +class Fetcher: + """Logic and state for one URL. 
+ + When found in crawler.busy, this represents a URL to be fetched or + in the process of being fetched; when found in crawler.done, this + holds the results from fetching it. + + This is usually associated with a task. This references the + crawler for the connection pool and to add more URLs to its todo + list. + + Call fetch() to do the fetching, then report() to print the results. + """ + + def __init__(self, log, url, crawler, max_redirect=10, max_tries=4): + self.log = log + self.url = url + self.crawler = crawler + # We don't loop resolving redirects here -- we just use this + # to decide whether to add the redirect URL to crawler.todo. + self.max_redirect = max_redirect + # But we do loop to retry on errors a few times. + self.max_tries = max_tries + # Everything we collect from the response goes here. + self.task = None + self.exceptions = [] + self.tries = 0 + self.request = None + self.response = None + self.body = None + self.next_url = None + self.ctype = None + self.pdict = None + self.encoding = None + self.urls = None + self.new_urls = None + + @asyncio.coroutine + def fetch(self): + """Attempt to fetch the contents of the URL. + + If successful, and the data is HTML, extract further links and + add them to the crawler. Redirects are also added back there. + """ + while self.tries < self.max_tries: + self.tries += 1 + self.request = None + try: + self.request = Request(self.log, self.url, self.crawler.pool) + yield from self.request.connect() + yield from self.request.send_request() + self.response = yield from self.request.get_response() + self.body = yield from self.response.read() + h_conn = self.response.get_header('connection').lower() + if h_conn != 'close': + self.request.close(recycle=True) + self.request = None + if self.tries > 1: + self.log(1, 'try', self.tries, 'for', self.url, 'success') + break + except (BadStatusLine, OSError) as exc: + self.exceptions.append(exc) + self.log(1, 'try', self.tries, 'for', self.url, + 'raised', repr(exc)) + ##import pdb; pdb.set_trace() + # Don't reuse the connection in this case. + finally: + if self.request is not None: + self.request.close() + else: + # We never broke out of the while loop, i.e. all tries failed. + self.log(0, 'no success for', self.url, + 'in', self.max_tries, 'tries') + return + next_url = self.response.get_redirect_url() + if next_url: + self.next_url = urllib.parse.urljoin(self.url, next_url) + if self.max_redirect > 0: + self.log(1, 'redirect to', self.next_url, 'from', self.url) + self.crawler.add_url(self.next_url, self.max_redirect-1) + else: + self.log(0, 'redirect limit reached for', self.next_url, + 'from', self.url) + else: + if self.response.status == 200: + self.ctype = self.response.get_header('content-type') + self.pdict = {} + if self.ctype: + self.ctype, self.pdict = cgi.parse_header(self.ctype) + self.encoding = self.pdict.get('charset', 'utf-8') + if self.ctype == 'text/html': + body = self.body.decode(self.encoding, 'replace') + # Replace href with (?:href|src) to follow image links. + self.urls = set(re.findall(r'(?i)href=["\']?([^\s"\'<>]+)', + body)) + if self.urls: + self.log(1, 'got', len(self.urls), + 'distinct urls from', self.url) + self.new_urls = set() + for url in self.urls: + url = unescape(url) + url = urllib.parse.urljoin(self.url, url) + url, frag = urllib.parse.urldefrag(url) + if self.crawler.add_url(url): + self.new_urls.add(url) + + def report(self, stats, file=None): + """Print a report on the state for this URL. + + Also update the Stats instance. 
+ """ + if self.task is not None: + if not self.task.done(): + stats.add('pending') + print(self.url, 'pending', file=file) + return + elif self.task.cancelled(): + stats.add('cancelled') + print(self.url, 'cancelled', file=file) + return + elif self.task.exception(): + stats.add('exception') + exc = self.task.exception() + stats.add('exception_' + exc.__class__.__name__) + print(self.url, exc, file=file) + return + if len(self.exceptions) == self.tries: + stats.add('fail') + exc = self.exceptions[-1] + stats.add('fail_' + str(exc.__class__.__name__)) + print(self.url, 'error', exc, file=file) + elif self.next_url: + stats.add('redirect') + print(self.url, self.response.status, 'redirect', self.next_url, + file=file) + elif self.ctype == 'text/html': + stats.add('html') + size = len(self.body or b'') + stats.add('html_bytes', size) + print(self.url, self.response.status, + self.ctype, self.encoding, + size, + '%d/%d' % (len(self.new_urls or ()), len(self.urls or ())), + file=file) + elif self.response is None: + print(self.url, 'no response object') + else: + size = len(self.body or b'') + if self.response.status == 200: + stats.add('other') + stats.add('other_bytes', size) + else: + stats.add('error') + stats.add('error_bytes', size) + stats.add('status_%s' % self.response.status) + print(self.url, self.response.status, + self.ctype, self.encoding, + size, + file=file) + + +class Stats: + """Record stats of various sorts.""" + + def __init__(self): + self.stats = {} + + def add(self, key, count=1): + self.stats[key] = self.stats.get(key, 0) + count + + def report(self, file=None): + for key, count in sorted(self.stats.items()): + print('%10d' % count, key, file=file) + + +class Crawler: + """Crawl a set of URLs. + + This manages three disjoint sets of URLs (todo, busy, done). The + data structures actually store dicts -- the values in todo give + the redirect limit, while the values in busy and done are Fetcher + instances. + """ + def __init__(self, log, + roots, exclude=None, strict=True, # What to crawl. + max_redirect=10, max_tries=4, # Per-url limits. + max_tasks=10, max_pool=10, # Global limits. + ): + self.log = log + self.roots = roots + self.exclude = exclude + self.strict = strict + self.max_redirect = max_redirect + self.max_tries = max_tries + self.max_tasks = max_tasks + self.max_pool = max_pool + self.todo = {} + self.busy = {} + self.done = {} + self.pool = ConnectionPool(self.log, max_pool, max_tasks) + self.root_domains = set() + for root in roots: + parts = urllib.parse.urlparse(root) + host, port = urllib.parse.splitport(parts.netloc) + if not host: + continue + if re.match(r'\A[\d\.]*\Z', host): + self.root_domains.add(host) + else: + host = host.lower() + if self.strict: + self.root_domains.add(host) + if host.startswith('www.'): + self.root_domains.add(host[4:]) + else: + self.root_domains.add('www.' + host) + else: + parts = host.split('.') + if len(parts) > 2: + host = '.'.join(parts[-2:]) + self.root_domains.add(host) + for root in roots: + self.add_url(root) + self.governor = asyncio.locks.Semaphore(max_tasks) + self.termination = asyncio.locks.Condition() + self.t0 = time.time() + self.t1 = None + + def close(self): + """Close resources (currently only the pool).""" + self.pool.close() + + def host_okay(self, host): + """Check if a host should be crawled. + + A literal match (after lowercasing) is always good. For hosts + that don't look like IP addresses, some approximate matches + are okay depending on the strict flag. 
+ """ + host = host.lower() + if host in self.root_domains: + return True + if re.match(r'\A[\d\.]*\Z', host): + return False + if self.strict: + return self._host_okay_strictish(host) + else: + return self._host_okay_lenient(host) + + def _host_okay_strictish(self, host): + """Check if a host should be crawled, strict-ish version. + + This checks for equality modulo an initial 'www.' component. + """ + if host.startswith('www.'): + if host[4:] in self.root_domains: + return True + else: + if 'www.' + host in self.root_domains: + return True + return False + + def _host_okay_lenient(self, host): + """Check if a host should be crawled, lenient version. + + This compares the last two components of the host. + """ + parts = host.split('.') + if len(parts) > 2: + host = '.'.join(parts[-2:]) + return host in self.root_domains + + def add_url(self, url, max_redirect=None): + """Add a URL to the todo list if not seen before.""" + if self.exclude and re.search(self.exclude, url): + return False + parts = urllib.parse.urlparse(url) + if parts.scheme not in ('http', 'https'): + self.log(2, 'skipping non-http scheme in', url) + return False + host, port = urllib.parse.splitport(parts.netloc) + if not self.host_okay(host): + self.log(2, 'skipping non-root host in', url) + return False + if max_redirect is None: + max_redirect = self.max_redirect + if url in self.todo or url in self.busy or url in self.done: + return False + self.log(1, 'adding', url, max_redirect) + self.todo[url] = max_redirect + return True + + @asyncio.coroutine + def crawl(self): + """Run the crawler until all finished.""" + with (yield from self.termination): + while self.todo or self.busy: + if self.todo: + url, max_redirect = self.todo.popitem() + fetcher = Fetcher(self.log, url, + crawler=self, + max_redirect=max_redirect, + max_tries=self.max_tries, + ) + self.busy[url] = fetcher + fetcher.task = asyncio.Task(self.fetch(fetcher)) + else: + yield from self.termination.wait() + self.t1 = time.time() + + @asyncio.coroutine + def fetch(self, fetcher): + """Call the Fetcher's fetch(), with a limit on concurrency. + + Once this returns, move the fetcher from busy to done. + """ + url = fetcher.url + with (yield from self.governor): + try: + yield from fetcher.fetch() # Fetcher gonna fetch. + finally: + # Force GC of the task, so the error is logged. + fetcher.task = None + with (yield from self.termination): + self.done[url] = fetcher + del self.busy[url] + self.termination.notify() + + def report(self, file=None): + """Print a report on all completed URLs.""" + if self.t1 is None: + self.t1 = time.time() + dt = self.t1 - self.t0 + if dt and self.max_tasks: + speed = len(self.done) / dt / self.max_tasks + else: + speed = 0 + stats = Stats() + print('*** Report ***', file=file) + try: + show = [] + show.extend(self.done.items()) + show.extend(self.busy.items()) + show.sort() + for url, fetcher in show: + fetcher.report(stats, file=file) + except KeyboardInterrupt: + print('\nInterrupted', file=file) + print('Finished', len(self.done), + 'urls in %.3f secs' % dt, + '(max_tasks=%d)' % self.max_tasks, + '(%.3f urls/sec/task)' % speed, + file=file) + stats.report(file=file) + print('Todo:', len(self.todo), file=file) + print('Busy:', len(self.busy), file=file) + print('Done:', len(self.done), file=file) + print('Date:', time.ctime(), 'local time', file=file) + + +def main(): + """Main program. + + Parse arguments, set up event loop, run crawler, print report. 
+ """ + args = ARGS.parse_args() + if not args.roots: + print('Use --help for command line help') + return + + log = Logger(args.level) + + if args.iocp: + from asyncio.windows_events import ProactorEventLoop + loop = ProactorEventLoop() + asyncio.set_event_loop(loop) + elif args.select: + loop = asyncio.SelectorEventLoop() + asyncio.set_event_loop(loop) + else: + loop = asyncio.get_event_loop() + + roots = {fix_url(root) for root in args.roots} + + crawler = Crawler(log, + roots, exclude=args.exclude, + strict=args.strict, + max_redirect=args.max_redirect, + max_tries=args.max_tries, + max_tasks=args.max_tasks, + max_pool=args.max_pool, + ) + try: + loop.run_until_complete(crawler.crawl()) # Crawler gonna crawl. + except KeyboardInterrupt: + sys.stderr.flush() + print('\nInterrupted\n') + finally: + crawler.report() + crawler.close() + loop.close() + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + main() diff --git a/examples/echo_client_tulip.py b/examples/echo_client_tulip.py new file mode 100644 index 0000000..88124ef --- /dev/null +++ b/examples/echo_client_tulip.py @@ -0,0 +1,20 @@ +import asyncio + +END = b'Bye-bye!\n' + +@asyncio.coroutine +def echo_client(): + reader, writer = yield from asyncio.open_connection('localhost', 8000) + writer.write(b'Hello, world\n') + writer.write(b'What a fine day it is.\n') + writer.write(END) + while True: + line = yield from reader.readline() + print('received:', line) + if line == END or not line: + break + writer.close() + +loop = asyncio.get_event_loop() +loop.run_until_complete(echo_client()) +loop.close() diff --git a/examples/echo_server_tulip.py b/examples/echo_server_tulip.py new file mode 100644 index 0000000..8167e54 --- /dev/null +++ b/examples/echo_server_tulip.py @@ -0,0 +1,20 @@ +import asyncio + +@asyncio.coroutine +def echo_server(): + yield from asyncio.start_server(handle_connection, 'localhost', 8000) + +@asyncio.coroutine +def handle_connection(reader, writer): + while True: + data = yield from reader.read(8192) + if not data: + break + writer.write(data) + +loop = asyncio.get_event_loop() +loop.run_until_complete(echo_server()) +try: + loop.run_forever() +finally: + loop.close() diff --git a/examples/fetch0.py b/examples/fetch0.py new file mode 100644 index 0000000..180fcf2 --- /dev/null +++ b/examples/fetch0.py @@ -0,0 +1,35 @@ +"""Simplest possible HTTP client.""" + +import sys + +from asyncio import * + + +@coroutine +def fetch(): + r, w = yield from open_connection('python.org', 80) + request = 'GET / HTTP/1.0\r\n\r\n' + print('>', request, file=sys.stderr) + w.write(request.encode('latin-1')) + while True: + line = yield from r.readline() + line = line.decode('latin-1').rstrip() + if not line: + break + print('<', line, file=sys.stderr) + print(file=sys.stderr) + body = yield from r.read() + return body + + +def main(): + loop = get_event_loop() + try: + body = loop.run_until_complete(fetch()) + finally: + loop.close() + print(body.decode('latin-1'), end='') + + +if __name__ == '__main__': + main() diff --git a/examples/fetch1.py b/examples/fetch1.py new file mode 100644 index 0000000..8dbb6e4 --- /dev/null +++ b/examples/fetch1.py @@ -0,0 +1,78 @@ +"""Fetch one URL and write its content to stdout. + +This version adds URL parsing (including SSL) and a Response object. 
+""" + +import sys +import urllib.parse + +from asyncio import * + + +class Response: + + def __init__(self, verbose=True): + self.verbose = verbose + self.http_version = None # 'HTTP/1.1' + self.status = None # 200 + self.reason = None # 'Ok' + self.headers = [] # [('Content-Type', 'text/html')] + + @coroutine + def read(self, reader): + @coroutine + def getline(): + return (yield from reader.readline()).decode('latin-1').rstrip() + status_line = yield from getline() + if self.verbose: print('<', status_line, file=sys.stderr) + self.http_version, status, self.reason = status_line.split(None, 2) + self.status = int(status) + while True: + header_line = yield from getline() + if not header_line: + break + if self.verbose: print('<', header_line, file=sys.stderr) + # TODO: Continuation lines. + key, value = header_line.split(':', 1) + self.headers.append((key, value.strip())) + if self.verbose: print(file=sys.stderr) + + +@coroutine +def fetch(url, verbose=True): + parts = urllib.parse.urlparse(url) + if parts.scheme == 'http': + ssl = False + elif parts.scheme == 'https': + ssl = True + else: + print('URL must use http or https.') + sys.exit(1) + port = parts.port + if port is None: + port = 443 if ssl else 80 + path = parts.path or '/' + if parts.query: + path += '?' + parts.query + request = 'GET %s HTTP/1.0\r\n\r\n' % path + if verbose: + print('>', request, file=sys.stderr, end='') + r, w = yield from open_connection(parts.hostname, port, ssl=ssl) + w.write(request.encode('latin-1')) + response = Response(verbose) + yield from response.read(r) + body = yield from r.read() + return body + + +def main(): + loop = get_event_loop() + try: + body = loop.run_until_complete(fetch(sys.argv[1], '-v' in sys.argv)) + finally: + loop.close() + print(body.decode('latin-1'), end='') + + +if __name__ == '__main__': + main() diff --git a/examples/fetch2.py b/examples/fetch2.py new file mode 100644 index 0000000..7617b59 --- /dev/null +++ b/examples/fetch2.py @@ -0,0 +1,141 @@ +"""Fetch one URL and write its content to stdout. + +This version adds a Request object. 
+""" + +import sys +import urllib.parse +from http.client import BadStatusLine + +from asyncio import * + + +class Request: + + def __init__(self, url, verbose=True): + self.url = url + self.verbose = verbose + self.parts = urllib.parse.urlparse(self.url) + self.scheme = self.parts.scheme + assert self.scheme in ('http', 'https'), repr(url) + self.ssl = self.parts.scheme == 'https' + self.netloc = self.parts.netloc + self.hostname = self.parts.hostname + self.port = self.parts.port or (443 if self.ssl else 80) + self.path = (self.parts.path or '/') + self.query = self.parts.query + if self.query: + self.full_path = '%s?%s' % (self.path, self.query) + else: + self.full_path = self.path + self.http_version = 'HTTP/1.1' + self.method = 'GET' + self.headers = [] + self.reader = None + self.writer = None + + @coroutine + def connect(self): + if self.verbose: + print('* Connecting to %s:%s using %s' % + (self.hostname, self.port, 'ssl' if self.ssl else 'tcp'), + file=sys.stderr) + self.reader, self.writer = yield from open_connection(self.hostname, + self.port, + ssl=self.ssl) + if self.verbose: + print('* Connected to %s' % + (self.writer.get_extra_info('peername'),), + file=sys.stderr) + + def putline(self, line): + self.writer.write(line.encode('latin-1') + b'\r\n') + + @coroutine + def send_request(self): + request = '%s %s %s' % (self.method, self.full_path, self.http_version) + if self.verbose: print('>', request, file=sys.stderr) + self.putline(request) + if 'host' not in {key.lower() for key, _ in self.headers}: + self.headers.insert(0, ('Host', self.netloc)) + for key, value in self.headers: + line = '%s: %s' % (key, value) + if self.verbose: print('>', line, file=sys.stderr) + self.putline(line) + self.putline('') + + @coroutine + def get_response(self): + response = Response(self.reader, self.verbose) + yield from response.read_headers() + return response + + +class Response: + + def __init__(self, reader, verbose=True): + self.reader = reader + self.verbose = verbose + self.http_version = None # 'HTTP/1.1' + self.status = None # 200 + self.reason = None # 'Ok' + self.headers = [] # [('Content-Type', 'text/html')] + + @coroutine + def getline(self): + return (yield from self.reader.readline()).decode('latin-1').rstrip() + + @coroutine + def read_headers(self): + status_line = yield from self.getline() + if self.verbose: print('<', status_line, file=sys.stderr) + status_parts = status_line.split(None, 2) + if len(status_parts) != 3: + raise BadStatusLine(status_line) + self.http_version, status, self.reason = status_parts + self.status = int(status) + while True: + header_line = yield from self.getline() + if not header_line: + break + if self.verbose: print('<', header_line, file=sys.stderr) + # TODO: Continuation lines. 
+ key, value = header_line.split(':', 1) + self.headers.append((key, value.strip())) + if self.verbose: print(file=sys.stderr) + + @coroutine + def read(self): + nbytes = None + for key, value in self.headers: + if key.lower() == 'content-length': + nbytes = int(value) + break + if nbytes is None: + body = yield from self.reader.read() + else: + body = yield from self.reader.readexactly(nbytes) + return body + + +@coroutine +def fetch(url, verbose=True): + request = Request(url, verbose) + yield from request.connect() + yield from request.send_request() + response = yield from request.get_response() + body = yield from response.read() + return body + + +def main(): + loop = get_event_loop() + try: + body = loop.run_until_complete(fetch(sys.argv[1], '-v' in sys.argv)) + finally: + loop.close() + sys.stdout.buffer.write(body) + + +if __name__ == '__main__': + main() diff --git a/examples/fetch3.py b/examples/fetch3.py new file mode 100644 index 0000000..9419afd --- /dev/null +++ b/examples/fetch3.py @@ -0,0 +1,230 @@ +"""Fetch one URL and write its content to stdout. + +This version adds a primitive connection pool, redirect following and +chunked transfer-encoding. It also supports a --iocp flag. +""" + +import sys +import urllib.parse +from http.client import BadStatusLine + +from asyncio import * + + +class ConnectionPool: + # TODO: Locking? Close idle connections? + + def __init__(self, verbose=False): + self.verbose = verbose + self.connections = {} # {(host, port, ssl): (reader, writer)} + + def close(self): + for _, writer in self.connections.values(): + writer.close() + + @coroutine + def open_connection(self, host, port, ssl): + port = port or (443 if ssl else 80) + ipaddrs = yield from get_event_loop().getaddrinfo(host, port) + if self.verbose: + print('* %s resolves to %s' % + (host, ', '.join(ip[4][0] for ip in ipaddrs)), + file=sys.stderr) + for _, _, _, _, (h, p, *_) in ipaddrs: + key = h, p, ssl + conn = self.connections.get(key) + if conn: + reader, writer = conn + if reader._eof: + self.connections.pop(key) + continue + if self.verbose: + print('* Reusing pooled connection', key, file=sys.stderr) + return conn + reader, writer = yield from open_connection(host, port, ssl=ssl) + host, port, *_ = writer.get_extra_info('peername') + key = host, port, ssl + self.connections[key] = reader, writer + if self.verbose: + print('* New connection', key, file=sys.stderr) + return reader, writer + + +class Request: + + def __init__(self, url, verbose=True): + self.url = url + self.verbose = verbose + self.parts = urllib.parse.urlparse(self.url) + self.scheme = self.parts.scheme + assert self.scheme in ('http', 'https'), repr(url) + self.ssl = self.parts.scheme == 'https' + self.netloc = self.parts.netloc + self.hostname = self.parts.hostname + self.port = self.parts.port or (443 if self.ssl else 80) + self.path = (self.parts.path or '/') + self.query = self.parts.query + if self.query: + self.full_path = '%s?%s' % (self.path, self.query) + else: + self.full_path = self.path + self.http_version = 'HTTP/1.1' + self.method = 'GET' + self.headers = [] + self.reader = None + self.writer = None + + def vprint(self, *args): + if self.verbose: + print(*args, file=sys.stderr) + + @coroutine + def connect(self, pool): + self.vprint('* Connecting to %s:%s using %s' % + (self.hostname, self.port, 'ssl' if self.ssl else 'tcp')) + self.reader, self.writer = \ + yield from pool.open_connection(self.hostname, + self.port, + ssl=self.ssl) + self.vprint('* Connected to %s' % + 
(self.writer.get_extra_info('peername'),)) + + @coroutine + def putline(self, line): + self.vprint('>', line) + self.writer.write(line.encode('latin-1') + b'\r\n') + ##yield from self.writer.drain() + + @coroutine + def send_request(self): + request = '%s %s %s' % (self.method, self.full_path, self.http_version) + yield from self.putline(request) + if 'host' not in {key.lower() for key, _ in self.headers}: + self.headers.insert(0, ('Host', self.netloc)) + for key, value in self.headers: + line = '%s: %s' % (key, value) + yield from self.putline(line) + yield from self.putline('') + + @coroutine + def get_response(self): + response = Response(self.reader, self.verbose) + yield from response.read_headers() + return response + + +class Response: + + def __init__(self, reader, verbose=True): + self.reader = reader + self.verbose = verbose + self.http_version = None # 'HTTP/1.1' + self.status = None # 200 + self.reason = None # 'Ok' + self.headers = [] # [('Content-Type', 'text/html')] + + def vprint(self, *args): + if self.verbose: + print(*args, file=sys.stderr) + + @coroutine + def getline(self): + line = (yield from self.reader.readline()).decode('latin-1').rstrip() + self.vprint('<', line) + return line + + @coroutine + def read_headers(self): + status_line = yield from self.getline() + status_parts = status_line.split(None, 2) + if len(status_parts) != 3: + raise BadStatusLine(status_line) + self.http_version, status, self.reason = status_parts + self.status = int(status) + while True: + header_line = yield from self.getline() + if not header_line: + break + # TODO: Continuation lines. + key, value = header_line.split(':', 1) + self.headers.append((key, value.strip())) + + def get_redirect_url(self, default=None): + if self.status not in (300, 301, 302, 303, 307): + return default + return self.get_header('Location', default) + + def get_header(self, key, default=None): + key = key.lower() + for k, v in self.headers: + if k.lower() == key: + return v + return default + + @coroutine + def read(self): + nbytes = None + for key, value in self.headers: + if key.lower() == 'content-length': + nbytes = int(value) + break + if nbytes is None: + if self.get_header('transfer-encoding', '').lower() == 'chunked': + blocks = [] + size = -1 + while size: + size_header = yield from self.reader.readline() + if not size_header: + break + parts = size_header.split(b';') + size = int(parts[0], 16) + if size: + block = yield from self.reader.readexactly(size) + assert len(block) == size, (len(block), size) + blocks.append(block) + crlf = yield from self.reader.readline() + assert crlf == b'\r\n', repr(crlf) + body = b''.join(blocks) + else: + body = yield from self.reader.read() + else: + body = yield from self.reader.readexactly(nbytes) + return body + + +@coroutine +def fetch(url, verbose=True, max_redirect=10): + pool = ConnectionPool(verbose) + try: + for _ in range(max_redirect): + request = Request(url, verbose) + yield from request.connect(pool) + yield from request.send_request() + response = yield from request.get_response() + body = yield from response.read() + next_url = response.get_redirect_url() + if not next_url: + break + url = urllib.parse.urljoin(url, next_url) + print('redirect to', url, file=sys.stderr) + return body + finally: + pool.close() + + +def main(): + if '--iocp' in sys.argv: + from asyncio.windows_events import ProactorEventLoop + loop = ProactorEventLoop() + set_event_loop(loop) + else: + loop = get_event_loop() + try: + body = loop.run_until_complete(fetch(sys.argv[1], '-v' 
in sys.argv)) + finally: + loop.close() + sys.stdout.buffer.write(body) + + +if __name__ == '__main__': + main() diff --git a/examples/fuzz_as_completed.py b/examples/fuzz_as_completed.py new file mode 100644 index 0000000..123fbf1 --- /dev/null +++ b/examples/fuzz_as_completed.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python3 + +"""Fuzz tester for as_completed(), by Glenn Langford.""" + +import asyncio +import itertools +import random +import sys + +@asyncio.coroutine +def sleeper(time): + yield from asyncio.sleep(time) + return time + +@asyncio.coroutine +def watcher(tasks,delay=False): + res = [] + for t in asyncio.as_completed(tasks): + r = yield from t + res.append(r) + if delay: + # simulate processing delay + process_time = random.random() / 10 + yield from asyncio.sleep(process_time) + #print(res) + #assert(sorted(res) == res) + if sorted(res) != res: + print('FAIL', res) + print('------------') + else: + print('.', end='') + sys.stdout.flush() + +loop = asyncio.get_event_loop() + +print('Pass 1') +# All permutations of discrete task running times must be returned +# by as_completed in the correct order. +task_times = [0, 0.1, 0.2, 0.3, 0.4 ] # 120 permutations +for times in itertools.permutations(task_times): + tasks = [ asyncio.Task(sleeper(t)) for t in times ] + loop.run_until_complete(asyncio.Task(watcher(tasks))) + +print() +print('Pass 2') +# Longer task times, with randomized duplicates. 100 tasks each time. +longer_task_times = [x/10 for x in range(30)] +for i in range(20): + task_times = longer_task_times * 10 + random.shuffle(task_times) + #print('Times', task_times[:500]) + tasks = [ asyncio.Task(sleeper(t)) for t in task_times[:100] ] + loop.run_until_complete(asyncio.Task(watcher(tasks))) + +print() +print('Pass 3') +# Same as pass 2, but with a random processing delay (0 - 0.1s) after +# retrieving each future from as_completed and 200 tasks. This tests whether +# the order that callbacks are triggered is preserved through to the +# as_completed caller. 
+for i in range(20): + task_times = longer_task_times * 10 + random.shuffle(task_times) + #print('Times', task_times[:200]) + tasks = [ asyncio.Task(sleeper(t)) for t in task_times[:200] ] + loop.run_until_complete(asyncio.Task(watcher(tasks, delay=True))) + +print() +loop.close() diff --git a/examples/hello_callback.py b/examples/hello_callback.py new file mode 100644 index 0000000..7ccbea1 --- /dev/null +++ b/examples/hello_callback.py @@ -0,0 +1,17 @@ +"""Print 'Hello World' every two seconds, using a callback.""" + +import asyncio + + +def print_and_repeat(loop): + print('Hello World') + loop.call_later(2, print_and_repeat, loop) + + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + print_and_repeat(loop) + try: + loop.run_forever() + finally: + loop.close() diff --git a/examples/hello_coroutine.py b/examples/hello_coroutine.py new file mode 100644 index 0000000..b9347aa --- /dev/null +++ b/examples/hello_coroutine.py @@ -0,0 +1,18 @@ +"""Print 'Hello World' every two seconds, using a coroutine.""" + +import asyncio + + +@asyncio.coroutine +def greet_every_two_seconds(): + while True: + print('Hello World') + yield from asyncio.sleep(2) + + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(greet_every_two_seconds()) + finally: + loop.close() diff --git a/examples/shell.py b/examples/shell.py new file mode 100644 index 0000000..7dc7caf --- /dev/null +++ b/examples/shell.py @@ -0,0 +1,50 @@ +"""Examples using create_subprocess_exec() and create_subprocess_shell().""" + +import asyncio +import signal +from asyncio.subprocess import PIPE + +@asyncio.coroutine +def cat(loop): + proc = yield from asyncio.create_subprocess_shell("cat", + stdin=PIPE, + stdout=PIPE) + print("pid: %s" % proc.pid) + + message = "Hello World!" + print("cat write: %r" % message) + + stdout, stderr = yield from proc.communicate(message.encode('ascii')) + print("cat read: %r" % stdout.decode('ascii')) + + exitcode = yield from proc.wait() + print("(exit code %s)" % exitcode) + +@asyncio.coroutine +def ls(loop): + proc = yield from asyncio.create_subprocess_exec("ls", + stdout=PIPE) + while True: + line = yield from proc.stdout.readline() + if not line: + break + print("ls>>", line.decode('ascii').rstrip()) + try: + proc.send_signal(signal.SIGINT) + except ProcessLookupError: + pass + +@asyncio.coroutine +def test_call(*args, timeout=None): + try: + proc = yield from asyncio.create_subprocess_exec(*args) + exitcode = yield from asyncio.wait_for(proc.wait(), timeout) + print("%s: exit code %s" % (' '.join(args), exitcode)) + except asyncio.TimeoutError: + print("timeout! (%.1f sec)" % timeout) + +loop = asyncio.get_event_loop() +loop.run_until_complete(cat(loop)) +loop.run_until_complete(ls(loop)) +loop.run_until_complete(test_call("bash", "-c", "sleep 3", timeout=1.0)) +loop.close() diff --git a/examples/simple_tcp_server.py b/examples/simple_tcp_server.py new file mode 100644 index 0000000..5f874ff --- /dev/null +++ b/examples/simple_tcp_server.py @@ -0,0 +1,154 @@ +""" +Example of a simple TCP server that is written in (mostly) coroutine +style and uses asyncio.streams.start_server() and +asyncio.streams.open_connection(). + +Note that running this example starts both the TCP server and client +in the same process. It listens on port 12345 on 127.0.0.1, so it will +fail if this port is currently in use. 
+""" + +import sys +import asyncio +import asyncio.streams + + +class MyServer: + """ + This is just an example of how a TCP server might be potentially + structured. This class has basically 3 methods: start the server, + handle a client, and stop the server. + + Note that you don't have to follow this structure, it is really + just an example or possible starting point. + """ + + def __init__(self): + self.server = None # encapsulates the server sockets + + # this keeps track of all the clients that connected to our + # server. It can be useful in some cases, for instance to + # kill client connections or to broadcast some data to all + # clients... + self.clients = {} # task -> (reader, writer) + + def _accept_client(self, client_reader, client_writer): + """ + This method accepts a new client connection and creates a Task + to handle this client. self.clients is updated to keep track + of the new client. + """ + + # start a new Task to handle this specific client connection + task = asyncio.Task(self._handle_client(client_reader, client_writer)) + self.clients[task] = (client_reader, client_writer) + + def client_done(task): + print("client task done:", task, file=sys.stderr) + del self.clients[task] + + task.add_done_callback(client_done) + + @asyncio.coroutine + def _handle_client(self, client_reader, client_writer): + """ + This method actually does the work to handle the requests for + a specific client. The protocol is line oriented, so there is + a main loop that reads a line with a request and then sends + out one or more lines back to the client with the result. + """ + while True: + data = (yield from client_reader.readline()).decode("utf-8") + if not data: # an empty string means the client disconnected + break + cmd, *args = data.rstrip().split(' ') + if cmd == 'add': + arg1 = float(args[0]) + arg2 = float(args[1]) + retval = arg1 + arg2 + client_writer.write("{!r}\n".format(retval).encode("utf-8")) + elif cmd == 'repeat': + times = int(args[0]) + msg = args[1] + client_writer.write("begin\n".encode("utf-8")) + for idx in range(times): + client_writer.write("{}. {}\n".format(idx+1, msg) + .encode("utf-8")) + client_writer.write("end\n".encode("utf-8")) + else: + print("Bad command {!r}".format(data), file=sys.stderr) + + # This enables us to have flow control in our connection. + yield from client_writer.drain() + + def start(self, loop): + """ + Starts the TCP server, so that it listens on port 12345. + + For each client that connects, the accept_client method gets + called. This method runs the loop until the server sockets + are ready to accept connections. + """ + self.server = loop.run_until_complete( + asyncio.streams.start_server(self._accept_client, + '127.0.0.1', 12345, + loop=loop)) + + def stop(self, loop): + """ + Stops the TCP server, i.e. closes the listening socket(s). + + This method runs the loop until the server sockets are closed. 
+ """ + if self.server is not None: + self.server.close() + loop.run_until_complete(self.server.wait_closed()) + self.server = None + + +def main(): + loop = asyncio.get_event_loop() + + # creates a server and starts listening to TCP connections + server = MyServer() + server.start(loop) + + @asyncio.coroutine + def client(): + reader, writer = yield from asyncio.streams.open_connection( + '127.0.0.1', 12345, loop=loop) + + def send(msg): + print("> " + msg) + writer.write((msg + '\n').encode("utf-8")) + + def recv(): + msgback = (yield from reader.readline()).decode("utf-8").rstrip() + print("< " + msgback) + return msgback + + # send a line + send("add 1 2") + msg = yield from recv() + + send("repeat 5 hello") + msg = yield from recv() + assert msg == 'begin' + while True: + msg = yield from recv() + if msg == 'end': + break + + writer.close() + yield from asyncio.sleep(0.5) + + # creates a client and connects to our server + try: + loop.run_until_complete(client()) + server.stop(loop) + finally: + loop.close() + + +if __name__ == '__main__': + main() diff --git a/examples/sink.py b/examples/sink.py new file mode 100644 index 0000000..d362cbb --- /dev/null +++ b/examples/sink.py @@ -0,0 +1,94 @@ +"""Test service that accepts connections and reads all data off them.""" + +import argparse +import os +import sys + +from asyncio import * + +ARGS = argparse.ArgumentParser(description="TCP data sink example.") +ARGS.add_argument( + '--tls', action='store_true', dest='tls', + default=False, help='Use TLS with a self-signed cert') +ARGS.add_argument( + '--iocp', action='store_true', dest='iocp', + default=False, help='Use IOCP event loop (Windows only)') +ARGS.add_argument( + '--host', action='store', dest='host', + default='127.0.0.1', help='Host name') +ARGS.add_argument( + '--port', action='store', dest='port', + default=1111, type=int, help='Port number') +ARGS.add_argument( + '--maxsize', action='store', dest='maxsize', + default=16*1024*1024, type=int, help='Max total data size') + +server = None +args = None + + +def dprint(*args): + print('sink:', *args, file=sys.stderr) + + +class Service(Protocol): + + def connection_made(self, tr): + dprint('connection from', tr.get_extra_info('peername')) + dprint('my socket is', tr.get_extra_info('sockname')) + self.tr = tr + self.total = 0 + + def data_received(self, data): + if data == b'stop': + dprint('stopping server') + server.close() + self.tr.close() + return + self.total += len(data) + dprint('received', len(data), 'bytes; total', self.total) + if self.total > args.maxsize: + dprint('closing due to too much data') + self.tr.close() + + def connection_lost(self, how): + dprint('closed', repr(how)) + + +@coroutine +def start(loop, host, port): + global server + sslctx = None + if args.tls: + import ssl + # TODO: take cert/key from args as well. 
+ here = os.path.join(os.path.dirname(__file__), '..', 'tests') + sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + sslctx.options |= ssl.OP_NO_SSLv2 + sslctx.load_cert_chain( + certfile=os.path.join(here, 'ssl_cert.pem'), + keyfile=os.path.join(here, 'ssl_key.pem')) + + server = yield from loop.create_server(Service, host, port, ssl=sslctx) + dprint('serving TLS' if sslctx else 'serving', + [s.getsockname() for s in server.sockets]) + yield from server.wait_closed() + + +def main(): + global args + args = ARGS.parse_args() + if args.iocp: + from asyncio.windows_events import ProactorEventLoop + loop = ProactorEventLoop() + set_event_loop(loop) + else: + loop = get_event_loop() + try: + loop.run_until_complete(start(loop, args.host, args.port)) + finally: + loop.close() + + +if __name__ == '__main__': + main() diff --git a/examples/source.py b/examples/source.py new file mode 100644 index 0000000..7fd11fb --- /dev/null +++ b/examples/source.py @@ -0,0 +1,100 @@ +"""Test client that connects and sends infinite data.""" + +import argparse +import sys + +from asyncio import * +from asyncio import test_utils + + +ARGS = argparse.ArgumentParser(description="TCP data sink example.") +ARGS.add_argument( + '--tls', action='store_true', dest='tls', + default=False, help='Use TLS') +ARGS.add_argument( + '--iocp', action='store_true', dest='iocp', + default=False, help='Use IOCP event loop (Windows only)') +ARGS.add_argument( + '--stop', action='store_true', dest='stop', + default=False, help='Stop the server by sending it b"stop" as data') +ARGS.add_argument( + '--host', action='store', dest='host', + default='127.0.0.1', help='Host name') +ARGS.add_argument( + '--port', action='store', dest='port', + default=1111, type=int, help='Port number') +ARGS.add_argument( + '--size', action='store', dest='size', + default=16*1024, type=int, help='Data size') + +args = None + + +def dprint(*args): + print('source:', *args, file=sys.stderr) + + +class Client(Protocol): + + total = 0 + + def connection_made(self, tr): + dprint('connecting to', tr.get_extra_info('peername')) + dprint('my socket is', tr.get_extra_info('sockname')) + self.tr = tr + self.lost = False + self.loop = get_event_loop() + self.waiter = Future() + if args.stop: + self.tr.write(b'stop') + self.tr.close() + else: + self.data = b'x'*args.size + self.write_some_data() + + def write_some_data(self): + if self.lost: + dprint('lost already') + return + data = self.data + size = len(data) + self.total += size + dprint('writing', size, 'bytes; total', self.total) + self.tr.write(data) + self.loop.call_soon(self.write_some_data) + + def connection_lost(self, exc): + dprint('lost connection', repr(exc)) + self.lost = True + self.waiter.set_result(None) + + +@coroutine +def start(loop, host, port): + sslctx = None + if args.tls: + sslctx = test_utils.dummy_ssl_context() + tr, pr = yield from loop.create_connection(Client, host, port, + ssl=sslctx) + dprint('tr =', tr) + dprint('pr =', pr) + yield from pr.waiter + + +def main(): + global args + args = ARGS.parse_args() + if args.iocp: + from asyncio.windows_events import ProactorEventLoop + loop = ProactorEventLoop() + set_event_loop(loop) + else: + loop = get_event_loop() + try: + loop.run_until_complete(start(loop, args.host, args.port)) + finally: + loop.close() + + +if __name__ == '__main__': + main() diff --git a/examples/source1.py b/examples/source1.py new file mode 100644 index 0000000..6802e96 --- /dev/null +++ b/examples/source1.py @@ -0,0 +1,98 @@ +"""Like source.py, but uses streams.""" + 
+import argparse +import sys + +from asyncio import * +from asyncio import test_utils + +ARGS = argparse.ArgumentParser(description="TCP data sink example.") +ARGS.add_argument( + '--tls', action='store_true', dest='tls', + default=False, help='Use TLS') +ARGS.add_argument( + '--iocp', action='store_true', dest='iocp', + default=False, help='Use IOCP event loop (Windows only)') +ARGS.add_argument( + '--stop', action='store_true', dest='stop', + default=False, help='Stop the server by sending it b"stop" as data') +ARGS.add_argument( + '--host', action='store', dest='host', + default='127.0.0.1', help='Host name') +ARGS.add_argument( + '--port', action='store', dest='port', + default=1111, type=int, help='Port number') +ARGS.add_argument( + '--size', action='store', dest='size', + default=16*1024, type=int, help='Data size') + + +class Debug: + """A clever little class that suppresses repetitive messages.""" + + overwriting = False + label = 'stream1:' + + def print(self, *args): + if self.overwriting: + print(file=sys.stderr) + self.overwriting = 0 + print(self.label, *args, file=sys.stderr) + + def oprint(self, *args): + self.overwriting += 1 + end = '\n' + if self.overwriting >= 3: + if self.overwriting == 3: + print(self.label, '[...]', file=sys.stderr) + end = '\r' + print(self.label, *args, file=sys.stderr, end=end, flush=True) + + +@coroutine +def start(loop, args): + d = Debug() + total = 0 + sslctx = None + if args.tls: + d.print('using dummy SSLContext') + sslctx = test_utils.dummy_ssl_context() + r, w = yield from open_connection(args.host, args.port, ssl=sslctx) + d.print('r =', r) + d.print('w =', w) + if args.stop: + w.write(b'stop') + w.close() + else: + size = args.size + data = b'x'*size + try: + while True: + total += size + d.oprint('writing', size, 'bytes; total', total) + w.write(data) + f = w.drain() + if f: + d.print('pausing') + yield from f + except (ConnectionResetError, BrokenPipeError) as exc: + d.print('caught', repr(exc)) + + +def main(): + global args + args = ARGS.parse_args() + if args.iocp: + from asyncio.windows_events import ProactorEventLoop + loop = ProactorEventLoop() + set_event_loop(loop) + else: + loop = get_event_loop() + try: + loop.run_until_complete(start(loop, args)) + finally: + loop.close() + + +if __name__ == '__main__': + main() diff --git a/examples/stacks.py b/examples/stacks.py new file mode 100644 index 0000000..0b7e0b2 --- /dev/null +++ b/examples/stacks.py @@ -0,0 +1,44 @@ +"""Crude demo for print_stack().""" + + +from asyncio import * + + +@coroutine +def helper(r): + print('--- helper ---') + for t in Task.all_tasks(): + t.print_stack() + print('--- end helper ---') + line = yield from r.readline() + 1/0 + return line + +def doit(): + l = get_event_loop() + lr = l.run_until_complete + r, w = lr(open_connection('python.org', 80)) + t1 = async(helper(r)) + for t in Task.all_tasks(): t.print_stack() + print('---') + l._run_once() + for t in Task.all_tasks(): t.print_stack() + print('---') + w.write(b'GET /\r\n') + w.write_eof() + try: + lr(t1) + except Exception as e: + print('catching', e) + finally: + for t in Task.all_tasks(): + t.print_stack() + l.close() + + +def main(): + doit() + + +if __name__ == '__main__': + main() diff --git a/examples/subprocess_attach_read_pipe.py b/examples/subprocess_attach_read_pipe.py new file mode 100644 index 0000000..d8a6242 --- /dev/null +++ b/examples/subprocess_attach_read_pipe.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 +"""Example showing how to attach a read pipe to a subprocess.""" +import 
asyncio +import os, sys + +code = """ +import os, sys +fd = int(sys.argv[1]) +os.write(fd, b'data') +os.close(fd) +""" + +loop = asyncio.get_event_loop() + +@asyncio.coroutine +def task(): + rfd, wfd = os.pipe() + args = [sys.executable, '-c', code, str(wfd)] + + pipe = open(rfd, 'rb', 0) + reader = asyncio.StreamReader(loop=loop) + protocol = asyncio.StreamReaderProtocol(reader, loop=loop) + transport, _ = yield from loop.connect_read_pipe(lambda: protocol, pipe) + + proc = yield from asyncio.create_subprocess_exec(*args, pass_fds={wfd}) + yield from proc.wait() + + os.close(wfd) + data = yield from reader.read() + print("read = %r" % data.decode()) + +loop.run_until_complete(task()) +loop.close() diff --git a/examples/subprocess_attach_write_pipe.py b/examples/subprocess_attach_write_pipe.py new file mode 100644 index 0000000..8614877 --- /dev/null +++ b/examples/subprocess_attach_write_pipe.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python3 +"""Example showing how to attach a write pipe to a subprocess.""" +import asyncio +import os, sys +from asyncio import subprocess + +code = """ +import os, sys +fd = int(sys.argv[1]) +data = os.read(fd, 1024) +sys.stdout.buffer.write(data) +""" + +loop = asyncio.get_event_loop() + +@asyncio.coroutine +def task(): + rfd, wfd = os.pipe() + args = [sys.executable, '-c', code, str(rfd)] + proc = yield from asyncio.create_subprocess_exec( + *args, + pass_fds={rfd}, + stdout=subprocess.PIPE) + + pipe = open(wfd, 'wb', 0) + transport, _ = yield from loop.connect_write_pipe(asyncio.Protocol, + pipe) + transport.write(b'data') + + stdout, stderr = yield from proc.communicate() + print("stdout = %r" % stdout.decode()) + pipe.close() + +loop.run_until_complete(task()) +loop.close() diff --git a/examples/subprocess_shell.py b/examples/subprocess_shell.py new file mode 100644 index 0000000..745cb64 --- /dev/null +++ b/examples/subprocess_shell.py @@ -0,0 +1,87 @@ +"""Example writing to and reading from a subprocess at the same time using +tasks.""" + +import asyncio +import os +from asyncio.subprocess import PIPE + + +@asyncio.coroutine +def send_input(writer, input): + try: + for line in input: + print('sending', len(line), 'bytes') + writer.write(line) + d = writer.drain() + if d: + print('pause writing') + yield from d + print('resume writing') + writer.close() + except BrokenPipeError: + print('stdin: broken pipe error') + except ConnectionResetError: + print('stdin: connection reset error') + +@asyncio.coroutine +def log_errors(reader): + while True: + line = yield from reader.readline() + if not line: + break + print('ERROR', repr(line)) + +@asyncio.coroutine +def read_stdout(stdout): + while True: + line = yield from stdout.readline() + print('received', repr(line)) + if not line: + break + +@asyncio.coroutine +def start(cmd, input=None, **kwds): + kwds['stdout'] = PIPE + kwds['stderr'] = PIPE + if input is None and 'stdin' not in kwds: + kwds['stdin'] = None + else: + kwds['stdin'] = PIPE + proc = yield from asyncio.create_subprocess_shell(cmd, **kwds) + + tasks = [] + if input is not None: + tasks.append(send_input(proc.stdin, input)) + else: + print('No stdin') + if proc.stderr is not None: + tasks.append(log_errors(proc.stderr)) + else: + print('No stderr') + if proc.stdout is not None: + tasks.append(read_stdout(proc.stdout)) + else: + print('No stdout') + + if tasks: + # feed stdin while consuming stdout to avoid hang + # when stdin pipe is full + yield from asyncio.wait(tasks) + + exitcode = yield from proc.wait() + print("exit code: %s" % exitcode) + + 
+def main(): + if os.name == 'nt': + loop = asyncio.ProactorEventLoop() + asyncio.set_event_loop(loop) + else: + loop = asyncio.get_event_loop() + loop.run_until_complete(start( + 'sleep 2; wc', input=[b'foo bar baz\n'*300 for i in range(100)])) + loop.close() + + +if __name__ == '__main__': + main() diff --git a/examples/tcp_echo.py b/examples/tcp_echo.py new file mode 100755 index 0000000..d743242 --- /dev/null +++ b/examples/tcp_echo.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python3 +"""TCP echo server example.""" +import argparse +import asyncio +import sys +try: + import signal +except ImportError: + signal = None + + +class EchoServer(asyncio.Protocol): + + TIMEOUT = 5.0 + + def timeout(self): + print('connection timeout, closing.') + self.transport.close() + + def connection_made(self, transport): + print('connection made') + self.transport = transport + + # start 5 seconds timeout timer + self.h_timeout = asyncio.get_event_loop().call_later( + self.TIMEOUT, self.timeout) + + def data_received(self, data): + print('data received: ', data.decode()) + self.transport.write(b'Re: ' + data) + + # restart timeout timer + self.h_timeout.cancel() + self.h_timeout = asyncio.get_event_loop().call_later( + self.TIMEOUT, self.timeout) + + def eof_received(self): + pass + + def connection_lost(self, exc): + print('connection lost:', exc) + self.h_timeout.cancel() + + +class EchoClient(asyncio.Protocol): + + message = 'This is the message. It will be echoed.' + + def connection_made(self, transport): + self.transport = transport + self.transport.write(self.message.encode()) + print('data sent:', self.message) + + def data_received(self, data): + print('data received:', data) + + # disconnect after 10 seconds + asyncio.get_event_loop().call_later(10.0, self.transport.close) + + def eof_received(self): + pass + + def connection_lost(self, exc): + print('connection lost:', exc) + asyncio.get_event_loop().stop() + + +def start_client(loop, host, port): + t = asyncio.Task(loop.create_connection(EchoClient, host, port)) + loop.run_until_complete(t) + + +def start_server(loop, host, port): + f = loop.create_server(EchoServer, host, port) + return loop.run_until_complete(f) + + +ARGS = argparse.ArgumentParser(description="TCP Echo example.") +ARGS.add_argument( + '--server', action="store_true", dest='server', + default=False, help='Run tcp server') +ARGS.add_argument( + '--client', action="store_true", dest='client', + default=False, help='Run tcp client') +ARGS.add_argument( + '--host', action="store", dest='host', + default='127.0.0.1', help='Host name') +ARGS.add_argument( + '--port', action="store", dest='port', + default=9999, type=int, help='Port number') +ARGS.add_argument( + '--iocp', action="store_true", dest='iocp', + default=False, help='Use IOCP event loop') + + +if __name__ == '__main__': + args = ARGS.parse_args() + + if ':' in args.host: + args.host, port = args.host.split(':', 1) + args.port = int(port) + + if (not (args.server or args.client)) or (args.server and args.client): + print('Please specify --server or --client\n') + ARGS.print_help() + else: + if args.iocp: + from asyncio import windows_events + loop = windows_events.ProactorEventLoop() + asyncio.set_event_loop(loop) + else: + loop = asyncio.get_event_loop() + print ('Using backend: {0}'.format(loop.__class__.__name__)) + + if signal is not None and sys.platform != 'win32': + loop.add_signal_handler(signal.SIGINT, loop.stop) + + if args.server: + server = start_server(loop, args.host, args.port) + else: + start_client(loop, 
args.host, args.port) + + try: + loop.run_forever() + finally: + if args.server: + server.close() + loop.close() diff --git a/examples/timing_tcp_server.py b/examples/timing_tcp_server.py new file mode 100644 index 0000000..883ce6d --- /dev/null +++ b/examples/timing_tcp_server.py @@ -0,0 +1,168 @@ +""" +A variant of simple_tcp_server.py that measures the time it takes to +send N messages for a range of N. (This was O(N**2) in a previous +version of Tulip.) + +Note that running this example starts both the TCP server and client +in the same process. It listens on port 1234 on 127.0.0.1, so it will +fail if this port is currently in use. +""" + +import sys +import time +import random + +import asyncio +import asyncio.streams + + +class MyServer: + """ + This is just an example of how a TCP server might be potentially + structured. This class has basically 3 methods: start the server, + handle a client, and stop the server. + + Note that you don't have to follow this structure, it is really + just an example or possible starting point. + """ + + def __init__(self): + self.server = None # encapsulates the server sockets + + # this keeps track of all the clients that connected to our + # server. It can be useful in some cases, for instance to + # kill client connections or to broadcast some data to all + # clients... + self.clients = {} # task -> (reader, writer) + + def _accept_client(self, client_reader, client_writer): + """ + This method accepts a new client connection and creates a Task + to handle this client. self.clients is updated to keep track + of the new client. + """ + + # start a new Task to handle this specific client connection + task = asyncio.Task(self._handle_client(client_reader, client_writer)) + self.clients[task] = (client_reader, client_writer) + + def client_done(task): + print("client task done:", task, file=sys.stderr) + del self.clients[task] + + task.add_done_callback(client_done) + + @asyncio.coroutine + def _handle_client(self, client_reader, client_writer): + """ + This method actually does the work to handle the requests for + a specific client. The protocol is line oriented, so there is + a main loop that reads a line with a request and then sends + out one or more lines back to the client with the result. + """ + while True: + data = (yield from client_reader.readline()).decode("utf-8") + if not data: # an empty string means the client disconnected + break + cmd, *args = data.rstrip().split(' ') + if cmd == 'add': + arg1 = float(args[0]) + arg2 = float(args[1]) + retval = arg1 + arg2 + client_writer.write("{!r}\n".format(retval).encode("utf-8")) + elif cmd == 'repeat': + times = int(args[0]) + msg = args[1] + client_writer.write("begin\n".encode("utf-8")) + for idx in range(times): + client_writer.write("{}. {}\n".format( + idx+1, msg + 'x'*random.randint(10, 50)) + .encode("utf-8")) + client_writer.write("end\n".encode("utf-8")) + else: + print("Bad command {!r}".format(data), file=sys.stderr) + + # This enables us to have flow control in our connection. + yield from client_writer.drain() + + def start(self, loop): + """ + Starts the TCP server, so that it listens on port 1234. + + For each client that connects, the accept_client method gets + called. This method runs the loop until the server sockets + are ready to accept connections. + """ + self.server = loop.run_until_complete( + asyncio.streams.start_server(self._accept_client, + '127.0.0.1', 12345, + loop=loop)) + + def stop(self, loop): + """ + Stops the TCP server, i.e. 
closes the listening socket(s). + + This method runs the loop until the server sockets are closed. + """ + if self.server is not None: + self.server.close() + loop.run_until_complete(self.server.wait_closed()) + self.server = None + + +def main(): + loop = asyncio.get_event_loop() + + # creates a server and starts listening to TCP connections + server = MyServer() + server.start(loop) + + @asyncio.coroutine + def client(): + reader, writer = yield from asyncio.streams.open_connection( + '127.0.0.1', 12345, loop=loop) + + def send(msg): + print("> " + msg) + writer.write((msg + '\n').encode("utf-8")) + + def recv(): + msgback = (yield from reader.readline()).decode("utf-8").rstrip() + print("< " + msgback) + return msgback + + # send a line + send("add 1 2") + msg = yield from recv() + + Ns = list(range(100, 100000, 10000)) + times = [] + + for N in Ns: + t0 = time.time() + send("repeat {} hello world ".format(N)) + msg = yield from recv() + assert msg == 'begin' + while True: + msg = (yield from reader.readline()).decode("utf-8").rstrip() + if msg == 'end': + break + t1 = time.time() + dt = t1 - t0 + print("Time taken: {:.3f} seconds ({:.6f} per repetition)" + .format(dt, dt/N)) + times.append(dt) + + writer.close() + yield from asyncio.sleep(0.5) + + # creates a client and connects to our server + try: + loop.run_until_complete(client()) + server.stop(loop) + finally: + loop.close() + + +if __name__ == '__main__': + main() diff --git a/examples/udp_echo.py b/examples/udp_echo.py new file mode 100755 index 0000000..93ac7e6 --- /dev/null +++ b/examples/udp_echo.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 +"""UDP echo example.""" +import argparse +import sys +import asyncio +try: + import signal +except ImportError: + signal = None + + +class MyServerUdpEchoProtocol: + + def connection_made(self, transport): + print('start', transport) + self.transport = transport + + def datagram_received(self, data, addr): + print('Data received:', data, addr) + self.transport.sendto(data, addr) + + def error_received(self, exc): + print('Error received:', exc) + + def connection_lost(self, exc): + print('stop', exc) + + +class MyClientUdpEchoProtocol: + + message = 'This is the message. It will be echoed.' 
+ + def connection_made(self, transport): + self.transport = transport + print('sending "{}"'.format(self.message)) + self.transport.sendto(self.message.encode()) + print('waiting to receive') + + def datagram_received(self, data, addr): + print('received "{}"'.format(data.decode())) + self.transport.close() + + def error_received(self, exc): + print('Error received:', exc) + + def connection_lost(self, exc): + print('closing transport', exc) + loop = asyncio.get_event_loop() + loop.stop() + + +def start_server(loop, addr): + t = asyncio.Task(loop.create_datagram_endpoint( + MyServerUdpEchoProtocol, local_addr=addr)) + transport, server = loop.run_until_complete(t) + return transport + + +def start_client(loop, addr): + t = asyncio.Task(loop.create_datagram_endpoint( + MyClientUdpEchoProtocol, remote_addr=addr)) + loop.run_until_complete(t) + + +ARGS = argparse.ArgumentParser(description="UDP Echo example.") +ARGS.add_argument( + '--server', action="store_true", dest='server', + default=False, help='Run udp server') +ARGS.add_argument( + '--client', action="store_true", dest='client', + default=False, help='Run udp client') +ARGS.add_argument( + '--host', action="store", dest='host', + default='127.0.0.1', help='Host name') +ARGS.add_argument( + '--port', action="store", dest='port', + default=9999, type=int, help='Port number') + + +if __name__ == '__main__': + args = ARGS.parse_args() + if ':' in args.host: + args.host, port = args.host.split(':', 1) + args.port = int(port) + + if (not (args.server or args.client)) or (args.server and args.client): + print('Please specify --server or --client\n') + ARGS.print_help() + else: + loop = asyncio.get_event_loop() + if signal is not None: + loop.add_signal_handler(signal.SIGINT, loop.stop) + + if '--server' in sys.argv: + server = start_server(loop, (args.host, args.port)) + else: + start_client(loop, (args.host, args.port)) + + try: + loop.run_forever() + finally: + if '--server' in sys.argv: + server.close() + loop.close() |
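
The udp_echo.py example above runs either the server or the client, each in its own process. For quick local testing the two protocol roles can also be driven inside one process, much as simple_tcp_server.py does for TCP. The following is a minimal sketch only, not part of the files added by this commit; it assumes an arbitrary local endpoint 127.0.0.1:9999 and keeps the same @asyncio.coroutine / yield from style used throughout the examples.

    import asyncio


    class EchoServer(asyncio.DatagramProtocol):

        def connection_made(self, transport):
            self.transport = transport

        def datagram_received(self, data, addr):
            # echo the payload straight back to the sender
            self.transport.sendto(data, addr)


    class EchoClient(asyncio.DatagramProtocol):

        def __init__(self, message, reply):
            self.message = message
            self.reply = reply  # Future resolved with the echoed payload

        def connection_made(self, transport):
            self.transport = transport
            # remote_addr was given to create_datagram_endpoint,
            # so sendto() needs no explicit address here
            transport.sendto(self.message)

        def datagram_received(self, data, addr):
            self.reply.set_result(data)


    @asyncio.coroutine
    def run(loop):
        addr = ('127.0.0.1', 9999)  # assumed free local port
        server_transport, _ = yield from loop.create_datagram_endpoint(
            EchoServer, local_addr=addr)
        reply = asyncio.Future(loop=loop)
        client_transport, _ = yield from loop.create_datagram_endpoint(
            lambda: EchoClient(b'ping', reply), remote_addr=addr)
        data = yield from reply
        print('echoed:', data)
        client_transport.close()
        server_transport.close()


    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.close()

Run as a plain script it prints echoed: b'ping' and exits, without the argparse and signal plumbing that the full udp_echo.py example carries.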