author     Victor Stinner <victor.stinner@gmail.com>  2015-01-26 10:52:45 +0100
committer  Victor Stinner <victor.stinner@gmail.com>  2015-01-26 10:52:45 +0100
commit     684f3be00011d3c6cc4f81f5cb61c157e5eb5205 (patch)
tree       886f9bbb1d98945db650803971bc72c56cebd142 /examples
download   trollius-git-684f3be00011d3c6cc4f81f5cb61c157e5eb5205.tar.gz
Python issue #23208: Add BaseEventLoop._current_handle
In debug mode, BaseEventLoop._run_once() now sets the BaseEventLoop._current_handle attribute to the handle currently executed. In release mode or when no handle is executed, the attribute is None. BaseEventLoop.default_exception_handler() displays the traceback of the current handle if available.
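A minimal sketch of the behavior this enables (illustrative, not part of the patch): with debug mode on, a callback that raises is reported through default_exception_handler(), which can now also point at the handle that was running.

    import asyncio

    def boom():
        raise RuntimeError('callback failed')

    loop = asyncio.new_event_loop()
    loop.set_debug(True)        # debug mode: _run_once() tracks the current handle
    loop.call_soon(boom)        # this handle is "current" when boom() raises
    loop.call_soon(loop.stop)
    loop.run_forever()          # error logged via default_exception_handler();
                                # in debug mode the report includes the handle's traceback
    loop.close()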
Diffstat (limited to 'examples')
-rw-r--r--  examples/cacheclt.py                      213
-rw-r--r--  examples/cachesvr.py                      249
-rw-r--r--  examples/child_process.py                 128
-rw-r--r--  examples/crawl.py                         863
-rw-r--r--  examples/echo_client_tulip.py              20
-rw-r--r--  examples/echo_server_tulip.py              20
-rw-r--r--  examples/fetch0.py                         35
-rw-r--r--  examples/fetch1.py                         78
-rw-r--r--  examples/fetch2.py                        141
-rw-r--r--  examples/fetch3.py                        230
-rw-r--r--  examples/fuzz_as_completed.py              69
-rw-r--r--  examples/hello_callback.py                 17
-rw-r--r--  examples/hello_coroutine.py                18
-rw-r--r--  examples/shell.py                          50
-rw-r--r--  examples/simple_tcp_server.py             154
-rw-r--r--  examples/sink.py                           94
-rw-r--r--  examples/source.py                        100
-rw-r--r--  examples/source1.py                        98
-rw-r--r--  examples/stacks.py                         44
-rw-r--r--  examples/subprocess_attach_read_pipe.py    33
-rw-r--r--  examples/subprocess_attach_write_pipe.py   35
-rw-r--r--  examples/subprocess_shell.py               87
-rwxr-xr-x  examples/tcp_echo.py                      128
-rw-r--r--  examples/timing_tcp_server.py             168
-rwxr-xr-x  examples/udp_echo.py                      104
25 files changed, 3176 insertions, 0 deletions
diff --git a/examples/cacheclt.py b/examples/cacheclt.py
new file mode 100644
index 0000000..b11a4d1
--- /dev/null
+++ b/examples/cacheclt.py
@@ -0,0 +1,213 @@
+"""Client for cache server.
+
+See cachesvr.py for protocol description.
+"""
+
+import argparse
+import asyncio
+from asyncio import test_utils
+import json
+import logging
+
+ARGS = argparse.ArgumentParser(description='Cache client example.')
+ARGS.add_argument(
+ '--tls', action='store_true', dest='tls',
+ default=False, help='Use TLS')
+ARGS.add_argument(
+ '--iocp', action='store_true', dest='iocp',
+ default=False, help='Use IOCP event loop (Windows only)')
+ARGS.add_argument(
+ '--host', action='store', dest='host',
+ default='localhost', help='Host name')
+ARGS.add_argument(
+ '--port', action='store', dest='port',
+ default=54321, type=int, help='Port number')
+ARGS.add_argument(
+ '--timeout', action='store', dest='timeout',
+ default=5, type=float, help='Timeout')
+ARGS.add_argument(
+ '--max_backoff', action='store', dest='max_backoff',
+ default=5, type=float, help='Max backoff on reconnect')
+ARGS.add_argument(
+ '--ntasks', action='store', dest='ntasks',
+ default=10, type=int, help='Number of tester tasks')
+ARGS.add_argument(
+ '--ntries', action='store', dest='ntries',
+ default=5, type=int, help='Number of request tries before giving up')
+
+
+args = ARGS.parse_args()
+
+
+class CacheClient:
+ """Multiplexing cache client.
+
+ This wraps a single connection to the cache server. The
+ connection is automatically re-opened when an error occurs.
+
+ Multiple tasks may share this object; the requests will be
+ serialized.
+
+ The public API is get(), set(), delete() (all are coroutines).
+ """
+
+ def __init__(self, host, port, sslctx=None, loop=None):
+ self.host = host
+ self.port = port
+ self.sslctx = sslctx
+ self.loop = loop
+ self.todo = set()
+ self.initialized = False
+ self.task = asyncio.Task(self.activity(), loop=self.loop)
+
+ @asyncio.coroutine
+ def get(self, key):
+ resp = yield from self.request('get', key)
+ if resp is None:
+ return None
+ return resp.get('value')
+
+ @asyncio.coroutine
+ def set(self, key, value):
+ resp = yield from self.request('set', key, value)
+ if resp is None:
+ return False
+ return resp.get('status') == 'ok'
+
+ @asyncio.coroutine
+ def delete(self, key):
+ resp = yield from self.request('delete', key)
+ if resp is None:
+ return False
+ return resp.get('status') == 'ok'
+
+ @asyncio.coroutine
+ def request(self, type, key, value=None):
+ assert not self.task.done()
+ data = {'type': type, 'key': key}
+ if value is not None:
+ data['value'] = value
+ payload = json.dumps(data).encode('utf8')
+ waiter = asyncio.Future(loop=self.loop)
+ if self.initialized:
+ try:
+ yield from self.send(payload, waiter)
+ except IOError:
+ self.todo.add((payload, waiter))
+ else:
+ self.todo.add((payload, waiter))
+ return (yield from waiter)
+
+ @asyncio.coroutine
+ def activity(self):
+ backoff = 0
+ while True:
+ try:
+ self.reader, self.writer = yield from asyncio.open_connection(
+ self.host, self.port, ssl=self.sslctx, loop=self.loop)
+ except Exception as exc:
+ backoff = min(args.max_backoff, backoff + (backoff//2) + 1)
+ logging.info('Error connecting: %r; sleep %s', exc, backoff)
+ yield from asyncio.sleep(backoff, loop=self.loop)
+ continue
+ backoff = 0
+ self.next_id = 0
+ self.pending = {}
+ self.initialized = True
+ try:
+ while self.todo:
+ payload, waiter = self.todo.pop()
+ if not waiter.done():
+ yield from self.send(payload, waiter)
+ while True:
+ resp_id, resp = yield from self.process()
+ if resp_id in self.pending:
+ payload, waiter = self.pending.pop(resp_id)
+ if not waiter.done():
+ waiter.set_result(resp)
+ except Exception as exc:
+ self.initialized = False
+ self.writer.close()
+ while self.pending:
+ req_id, pair = self.pending.popitem()
+ payload, waiter = pair
+ if not waiter.done():
+ self.todo.add(pair)
+ logging.info('Error processing: %r', exc)
+
+ @asyncio.coroutine
+ def send(self, payload, waiter):
+ self.next_id += 1
+ req_id = self.next_id
+ frame = 'request %d %d\n' % (req_id, len(payload))
+ self.writer.write(frame.encode('ascii'))
+ self.writer.write(payload)
+ self.pending[req_id] = payload, waiter
+ yield from self.writer.drain()
+
+ @asyncio.coroutine
+ def process(self):
+ frame = yield from self.reader.readline()
+ if not frame:
+ raise EOFError()
+ head, tail = frame.split(None, 1)
+ if head == b'error':
+ raise IOError('OOB error: %r' % tail)
+ if head != b'response':
+ raise IOError('Bad frame: %r' % frame)
+ resp_id, resp_size = map(int, tail.split())
+ data = yield from self.reader.readexactly(resp_size)
+ if len(data) != resp_size:
+ raise EOFError()
+ resp = json.loads(data.decode('utf8'))
+ return resp_id, resp
+
+
+def main():
+ asyncio.set_event_loop(None)
+ if args.iocp:
+ from asyncio.windows_events import ProactorEventLoop
+ loop = ProactorEventLoop()
+ else:
+ loop = asyncio.new_event_loop()
+ sslctx = None
+ if args.tls:
+ sslctx = test_utils.dummy_ssl_context()
+ cache = CacheClient(args.host, args.port, sslctx=sslctx, loop=loop)
+ try:
+ loop.run_until_complete(
+ asyncio.gather(
+ *[testing(i, cache, loop) for i in range(args.ntasks)],
+ loop=loop))
+ finally:
+ loop.close()
+
+
+@asyncio.coroutine
+def testing(label, cache, loop):
+
+ def w(g):
+ return asyncio.wait_for(g, args.timeout, loop=loop)
+
+ key = 'foo-%s' % label
+ while True:
+ logging.info('%s %s', label, '-'*20)
+ try:
+ ret = yield from w(cache.set(key, 'hello-%s-world' % label))
+ logging.info('%s set %s', label, ret)
+ ret = yield from w(cache.get(key))
+ logging.info('%s get %s', label, ret)
+ ret = yield from w(cache.delete(key))
+ logging.info('%s del %s', label, ret)
+ ret = yield from w(cache.get(key))
+ logging.info('%s get2 %s', label, ret)
+ except asyncio.TimeoutError:
+ logging.warning('%s Timeout', label)
+ except Exception as exc:
+ logging.exception('%s Client exception: %r', label, exc)
+ break
+
+
+if __name__ == '__main__':
+ logging.basicConfig(level=logging.INFO)
+ main()
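Hypothetical driver for the client above (not part of the diff; it assumes cachesvr.py is listening on localhost:54321), exercising the three public coroutines named in the CacheClient docstring:

    import asyncio

    @asyncio.coroutine
    def demo(cache):
        ok = yield from cache.set('spam', 'eggs')   # True when the server answers 'ok'
        value = yield from cache.get('spam')        # 'eggs', or None on failure
        gone = yield from cache.delete('spam')      # True when the key existed
        return ok, value, gone

    loop = asyncio.new_event_loop()
    cache = CacheClient('localhost', 54321, loop=loop)
    print(loop.run_until_complete(demo(cache)))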
diff --git a/examples/cachesvr.py b/examples/cachesvr.py
new file mode 100644
index 0000000..053f9c2
--- /dev/null
+++ b/examples/cachesvr.py
@@ -0,0 +1,249 @@
+"""A simple memcache-like server.
+
+The basic data structure maintained is a single in-memory dictionary
+mapping string keys to string values, with operations get, set and
+delete. (Both keys and values may contain Unicode.)
+
+This is a TCP server listening on port 54321. There is no
+authentication.
+
+Requests provide an operation and return a response. A connection may
+be used for multiple requests. The connection is closed when a client
+sends a bad request.
+
+If a client is idle for over 5 seconds (i.e., it does not send another
+request, or fails to read the whole response, within this time), it is
+disconnected.
+
+Framing of requests and responses within a connection uses a
+line-based protocol. The first line of a request is the frame header
+and contains three whitespace-delimited tokens, followed by LF or CRLF:
+
+- the keyword 'request'
+- a decimal request ID; the first request is '1', the second '2', etc.
+- a decimal byte count giving the size of the rest of the request
+
+Note that the request IDs *must* be consecutive and start at '1' for
+each connection.
+
+Response frames look the same except the keyword is 'response'. The
+response ID matches the request ID. There should be exactly one
+response to each request and responses should be seen in the same
+order as the requests.
+
+After the frame, individual requests and responses are JSON encoded.
+
+If the frame header or the JSON request body cannot be parsed, an
+unframed error message (always starting with 'error') is written back
+and the connection is closed.
+
+JSON-encoded requests can be:
+
+- {"type": "get", "key": <string>}
+- {"type": "set", "key": <string>, "value": <string>}
+- {"type": "delete", "key": <string>}
+
+Responses are also JSON-encoded:
+
+- {"status": "ok", "value": <string>} # Successful get request
+- {"status": "ok"} # Successful set or delete request
+- {"status": "notfound"} # Key not found for get or delete request
+
+If the request is valid JSON but cannot be handled (e.g., the type or
+key field is absent or invalid), an error response of the following
+form is returned, but the connection is not closed:
+
+- {"error": <string>}
+"""
+
+import argparse
+import asyncio
+import json
+import logging
+import os
+import random
+
+ARGS = argparse.ArgumentParser(description='Cache server example.')
+ARGS.add_argument(
+ '--tls', action='store_true', dest='tls',
+ default=False, help='Use TLS')
+ARGS.add_argument(
+ '--iocp', action='store_true', dest='iocp',
+ default=False, help='Use IOCP event loop (Windows only)')
+ARGS.add_argument(
+ '--host', action='store', dest='host',
+ default='localhost', help='Host name')
+ARGS.add_argument(
+ '--port', action='store', dest='port',
+ default=54321, type=int, help='Port number')
+ARGS.add_argument(
+ '--timeout', action='store', dest='timeout',
+ default=5, type=float, help='Timeout')
+ARGS.add_argument(
+ '--random_failure_percent', action='store', dest='fail_percent',
+ default=0, type=float, help='Fail randomly N percent of the time')
+ARGS.add_argument(
+ '--random_failure_sleep', action='store', dest='fail_sleep',
+ default=0, type=float, help='Sleep time when randomly failing')
+ARGS.add_argument(
+ '--random_response_sleep', action='store', dest='resp_sleep',
+ default=0, type=float, help='Sleep time before responding')
+
+args = ARGS.parse_args()
+
+
+class Cache:
+
+ def __init__(self, loop):
+ self.loop = loop
+ self.table = {}
+
+ @asyncio.coroutine
+ def handle_client(self, reader, writer):
+ # Wrapper to log stuff and close writer (i.e., transport).
+ peer = writer.get_extra_info('socket').getpeername()
+ logging.info('got a connection from %s', peer)
+ try:
+ yield from self.frame_parser(reader, writer)
+ except Exception as exc:
+ logging.error('error %r from %s', exc, peer)
+ else:
+ logging.info('end connection from %s', peer)
+ finally:
+ writer.close()
+
+ @asyncio.coroutine
+ def frame_parser(self, reader, writer):
+ # This takes care of the framing.
+ last_request_id = 0
+ while True:
+ # Read the frame header, parse it, read the data.
+ # NOTE: The readline() and readexactly() calls will hang
+ # if the client doesn't send enough data but doesn't
+ # disconnect either. We add a timeout to each. (But the
+ # timeout should really be implemented by StreamReader.)
+ framing_b = yield from asyncio.wait_for(
+ reader.readline(),
+ timeout=args.timeout, loop=self.loop)
+ if random.random()*100 < args.fail_percent:
+ logging.warning('Inserting random failure')
+ yield from asyncio.sleep(args.fail_sleep*random.random(),
+ loop=self.loop)
+ writer.write(b'error random failure\r\n')
+ break
+ logging.debug('framing_b = %r', framing_b)
+ if not framing_b:
+ break # Clean close.
+ try:
+ frame_keyword, request_id_b, byte_count_b = framing_b.split()
+ except ValueError:
+ writer.write(b'error unparseable frame\r\n')
+ break
+ if frame_keyword != b'request':
+ writer.write(b'error frame does not start with request\r\n')
+ break
+ try:
+ request_id, byte_count = int(request_id_b), int(byte_count_b)
+ except ValueError:
+ writer.write(b'error unparseable frame parameters\r\n')
+ break
+ if request_id != last_request_id + 1 or byte_count < 2:
+ writer.write(b'error invalid frame parameters\r\n')
+ break
+ last_request_id = request_id
+ request_b = yield from asyncio.wait_for(
+ reader.readexactly(byte_count),
+ timeout=args.timeout, loop=self.loop)
+ try:
+ request = json.loads(request_b.decode('utf8'))
+ except ValueError:
+ writer.write(b'error unparseable json\r\n')
+ break
+ response = self.handle_request(request) # Not a coroutine.
+ if response is None:
+ writer.write(b'error unhandlable request\r\n')
+ break
+ response_b = json.dumps(response).encode('utf8') + b'\r\n'
+ byte_count = len(response_b)
+ framing_s = 'response {} {}\r\n'.format(request_id, byte_count)
+ writer.write(framing_s.encode('ascii'))
+ yield from asyncio.sleep(args.resp_sleep*random.random(),
+ loop=self.loop)
+ writer.write(response_b)
+
+ def handle_request(self, request):
+ # This parses one request and farms it out to a specific handler.
+ # Return None for all errors.
+ if not isinstance(request, dict):
+ return {'error': 'request is not a dict'}
+ request_type = request.get('type')
+ if request_type is None:
+ return {'error': 'no type in request'}
+ if request_type not in {'get', 'set', 'delete'}:
+ return {'error': 'unknown request type'}
+ key = request.get('key')
+ if not isinstance(key, str):
+ return {'error': 'key is not a string'}
+ if request_type == 'get':
+ return self.handle_get(key)
+ if request_type == 'set':
+ value = request.get('value')
+ if not isinstance(value, str):
+ return {'error': 'value is not a string'}
+ return self.handle_set(key, value)
+ if request_type == 'delete':
+ return self.handle_delete(key)
+ assert False, 'bad request type' # Should have been caught above.
+
+ def handle_get(self, key):
+ value = self.table.get(key)
+ if value is None:
+ return {'status': 'notfound'}
+ else:
+ return {'status': 'ok', 'value': value}
+
+ def handle_set(self, key, value):
+ self.table[key] = value
+ return {'status': 'ok'}
+
+ def handle_delete(self, key):
+ if key not in self.table:
+ return {'status': 'notfound'}
+ else:
+ del self.table[key]
+ return {'status': 'ok'}
+
+
+def main():
+ asyncio.set_event_loop(None)
+ if args.iocp:
+ from asyncio.windows_events import ProactorEventLoop
+ loop = ProactorEventLoop()
+ else:
+ loop = asyncio.new_event_loop()
+ sslctx = None
+ if args.tls:
+ import ssl
+ # TODO: take cert/key from args as well.
+ here = os.path.join(os.path.dirname(__file__), '..', 'tests')
+ sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ sslctx.options |= ssl.OP_NO_SSLv2
+ sslctx.load_cert_chain(
+ certfile=os.path.join(here, 'ssl_cert.pem'),
+ keyfile=os.path.join(here, 'ssl_key.pem'))
+ cache = Cache(loop)
+ task = asyncio.streams.start_server(cache.handle_client,
+ args.host, args.port,
+ ssl=sslctx, loop=loop)
+ svr = loop.run_until_complete(task)
+ for sock in svr.sockets:
+ logging.info('socket %s', sock.getsockname())
+ try:
+ loop.run_forever()
+ finally:
+ loop.close()
+
+
+if __name__ == '__main__':
+ logging.basicConfig(level=logging.INFO)
+ main()
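The framing described in the module docstring, condensed into a sketch (frame_request is illustrative, not part of the server):

    import json

    def frame_request(req_id, obj):
        # Header: 'request <id> <byte count>' plus LF, then the JSON body.
        payload = json.dumps(obj).encode('utf8')
        header = 'request %d %d\n' % (req_id, len(payload))
        return header.encode('ascii') + payload

    # e.g. b'request 1 41\n{"type": "set", "key": "k", "value": "v"}'
    print(frame_request(1, {'type': 'set', 'key': 'k', 'value': 'v'}))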
diff --git a/examples/child_process.py b/examples/child_process.py
new file mode 100644
index 0000000..3fac175
--- /dev/null
+++ b/examples/child_process.py
@@ -0,0 +1,128 @@
+"""
+Example of asynchronous interaction with a child python process.
+
+This example shows how to attach an existing Popen object and use the low level
+transport-protocol API. See shell.py and subprocess_shell.py for higher level
+examples.
+"""
+
+import os
+import sys
+
+try:
+ import asyncio
+except ImportError:
+ # asyncio is not installed
+ sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
+ import asyncio
+
+if sys.platform == 'win32':
+ from asyncio.windows_utils import Popen, PIPE
+ from asyncio.windows_events import ProactorEventLoop
+else:
+ from subprocess import Popen, PIPE
+
+#
+# Return a write-only transport wrapping a writable pipe
+#
+
+@asyncio.coroutine
+def connect_write_pipe(file):
+ loop = asyncio.get_event_loop()
+ transport, _ = yield from loop.connect_write_pipe(asyncio.Protocol, file)
+ return transport
+
+#
+# Wrap a readable pipe in a stream
+#
+
+@asyncio.coroutine
+def connect_read_pipe(file):
+ loop = asyncio.get_event_loop()
+ stream_reader = asyncio.StreamReader(loop=loop)
+ def factory():
+ return asyncio.StreamReaderProtocol(stream_reader)
+ transport, _ = yield from loop.connect_read_pipe(factory, file)
+ return stream_reader, transport
+
+
+#
+# Example
+#
+
+@asyncio.coroutine
+def main(loop):
+ # program which prints evaluation of each expression from stdin
+ code = r'''if 1:
+ import os
+ def writeall(fd, buf):
+ while buf:
+ n = os.write(fd, buf)
+ buf = buf[n:]
+ while True:
+ s = os.read(0, 1024)
+ if not s:
+ break
+ s = s.decode('ascii')
+ s = repr(eval(s)) + '\n'
+ s = s.encode('ascii')
+ writeall(1, s)
+ '''
+
+ # commands to send to input
+ commands = iter([b"1+1\n",
+ b"2**16\n",
+ b"1/3\n",
+ b"'x'*50",
+ b"1/0\n"])
+
+ # start subprocess and wrap stdin, stdout, stderr
+ p = Popen([sys.executable, '-c', code],
+ stdin=PIPE, stdout=PIPE, stderr=PIPE)
+
+ stdin = yield from connect_write_pipe(p.stdin)
+ stdout, stdout_transport = yield from connect_read_pipe(p.stdout)
+ stderr, stderr_transport = yield from connect_read_pipe(p.stderr)
+
+ # interact with subprocess
+ name = {stdout:'OUT', stderr:'ERR'}
+ registered = {asyncio.Task(stderr.readline()): stderr,
+ asyncio.Task(stdout.readline()): stdout}
+ while registered:
+ # write command
+ cmd = next(commands, None)
+ if cmd is None:
+ stdin.close()
+ else:
+ print('>>>', cmd.decode('ascii').rstrip())
+ stdin.write(cmd)
+
+ # get and print lines from stdout, stderr
+ timeout = None
+ while registered:
+ done, pending = yield from asyncio.wait(
+ registered, timeout=timeout,
+ return_when=asyncio.FIRST_COMPLETED)
+ if not done:
+ break
+ for f in done:
+ stream = registered.pop(f)
+ res = f.result()
+ print(name[stream], res.decode('ascii').rstrip())
+ if res != b'':
+ registered[asyncio.Task(stream.readline())] = stream
+ timeout = 0.0
+
+ stdout_transport.close()
+ stderr_transport.close()
+
+if __name__ == '__main__':
+ if sys.platform == 'win32':
+ loop = ProactorEventLoop()
+ asyncio.set_event_loop(loop)
+ else:
+ loop = asyncio.get_event_loop()
+ try:
+ loop.run_until_complete(main(loop))
+ finally:
+ loop.close()
diff --git a/examples/crawl.py b/examples/crawl.py
new file mode 100644
index 0000000..4bb0b4e
--- /dev/null
+++ b/examples/crawl.py
@@ -0,0 +1,863 @@
+#!/usr/bin/env python3.4
+
+"""A simple web crawler."""
+
+# TODO:
+# - More organized logging (with task ID or URL?).
+# - Use logging module for Logger.
+# - KeyboardInterrupt in HTML parsing may hang or report an unretrieved error.
+# - Support gzip encoding.
+# - Close connection if HTTP/1.0 response.
+# - Add timeouts. (E.g. when switching networks, all seems to hang.)
+# - Add arguments to specify TLS settings (e.g. cert/key files).
+# - Skip reading large non-text/html files?
+# - Use ETag and If-Modified-Since?
+# - Handle out of file descriptors directly? (How?)
+
+import argparse
+import asyncio
+import asyncio.locks
+import cgi
+from http.client import BadStatusLine
+import logging
+import re
+import sys
+import time
+import urllib.parse
+
+
+ARGS = argparse.ArgumentParser(description="Web crawler")
+ARGS.add_argument(
+ '--iocp', action='store_true', dest='iocp',
+ default=False, help='Use IOCP event loop (Windows only)')
+ARGS.add_argument(
+ '--select', action='store_true', dest='select',
+ default=False, help='Use Select event loop instead of default')
+ARGS.add_argument(
+ 'roots', nargs='*',
+ default=[], help='Root URL (may be repeated)')
+ARGS.add_argument(
+ '--max_redirect', action='store', type=int, metavar='N',
+ default=10, help='Limit redirection chains (for 301, 302 etc.)')
+ARGS.add_argument(
+ '--max_tries', action='store', type=int, metavar='N',
+ default=4, help='Limit retries on network errors')
+ARGS.add_argument(
+ '--max_tasks', action='store', type=int, metavar='N',
+ default=100, help='Limit concurrent connections')
+ARGS.add_argument(
+ '--max_pool', action='store', type=int, metavar='N',
+ default=100, help='Limit connection pool size')
+ARGS.add_argument(
+ '--exclude', action='store', metavar='REGEX',
+ help='Exclude matching URLs')
+ARGS.add_argument(
+ '--strict', action='store_true',
+ default=True, help='Strict host matching (default)')
+ARGS.add_argument(
+ '--lenient', action='store_false', dest='strict',
+ default=False, help='Lenient host matching')
+ARGS.add_argument(
+ '-v', '--verbose', action='count', dest='level',
+ default=1, help='Verbose logging (repeat for more verbose)')
+ARGS.add_argument(
+ '-q', '--quiet', action='store_const', const=0, dest='level',
+ default=1, help='Quiet logging (opposite of --verbose)')
+
+
+ESCAPES = [('quot', '"'),
+ ('gt', '>'),
+ ('lt', '<'),
+ ('amp', '&') # Must be last.
+ ]
+
+
+def unescape(url):
+ """Turn &amp; into &, and so on.
+
+ This is the inverse of cgi.escape().
+ """
+ for name, char in ESCAPES:
+ url = url.replace('&' + name + ';', char)
+ return url
+
+
+def fix_url(url):
+ """Prefix a schema-less URL with http://."""
+ if '://' not in url:
+ url = 'http://' + url
+ return url
+
+
+class Logger:
+
+ def __init__(self, level):
+ self.level = level
+
+ def _log(self, n, args):
+ if self.level >= n:
+ print(*args, file=sys.stderr, flush=True)
+
+ def log(self, n, *args):
+ self._log(n, args)
+
+ def __call__(self, n, *args):
+ self._log(n, args)
+
+
+class ConnectionPool:
+ """A connection pool.
+
+ To open a connection, use get_connection(). To recycle it, call conn.close(recycle=True).
+
+ The pool is mostly just a mapping from (host, port, ssl) tuples to
+ lists of Connections. The currently active connections are *not*
+ in the data structure; get_connection() takes the connection out,
+ and recycle_connection() puts it back in. To recycle a
+ connection, call conn.close(recycle=True).
+
+ There are limits to both the overall pool and the per-key pool.
+ """
+
+ def __init__(self, log, max_pool=10, max_tasks=5):
+ self.log = log
+ self.max_pool = max_pool # Overall limit.
+ self.max_tasks = max_tasks # Per-key limit.
+ self.loop = asyncio.get_event_loop()
+ self.connections = {} # {(host, port, ssl): [Connection, ...], ...}
+ self.queue = [] # [Connection, ...]
+
+ def close(self):
+ """Close all connections available for reuse."""
+ for conns in self.connections.values():
+ for conn in conns:
+ conn.close()
+ self.connections.clear()
+ self.queue.clear()
+
+ @asyncio.coroutine
+ def get_connection(self, host, port, ssl):
+ """Create or reuse a connection."""
+ port = port or (443 if ssl else 80)
+ try:
+ ipaddrs = yield from self.loop.getaddrinfo(host, port)
+ except Exception as exc:
+ self.log(0, 'Exception %r for (%r, %r)' % (exc, host, port))
+ raise
+ self.log(1, '* %s resolves to %s' %
+ (host, ', '.join(ip[4][0] for ip in ipaddrs)))
+
+ # Look for a reusable connection.
+ for _, _, _, _, (h, p, *_) in ipaddrs:
+ key = h, p, ssl
+ conn = None
+ conns = self.connections.get(key)
+ while conns:
+ conn = conns.pop(0)
+ self.queue.remove(conn)
+ if not conns:
+ del self.connections[key]
+ if conn.stale():
+ self.log(1, 'closing stale connection for', key)
+ conn.close() # Just in case.
+ else:
+ self.log(1, '* Reusing pooled connection', key,
+ 'FD =', conn.fileno())
+ return conn
+
+ # Create a new connection.
+ conn = Connection(self.log, self, host, port, ssl)
+ yield from conn.connect()
+ self.log(1, '* New connection', conn.key, 'FD =', conn.fileno())
+ return conn
+
+ def recycle_connection(self, conn):
+ """Make a connection available for reuse.
+
+ This also prunes the pool if it exceeds the size limits.
+ """
+ if conn.stale():
+ conn.close()
+ return
+
+ key = conn.key
+ conns = self.connections.setdefault(key, [])
+ conns.append(conn)
+ self.queue.append(conn)
+
+ if len(conns) <= self.max_tasks and len(self.queue) <= self.max_pool:
+ return
+
+ # Prune the queue.
+
+ # Close stale connections for this key first.
+ stale = [conn for conn in conns if conn.stale()]
+ if stale:
+ for conn in stale:
+ conns.remove(conn)
+ self.queue.remove(conn)
+ self.log(1, 'closing stale connection for', key)
+ conn.close()
+ if not conns:
+ del self.connections[key]
+
+ # Close oldest connection(s) for this key if limit reached.
+ while len(conns) > self.max_tasks:
+ conn = conns.pop(0)
+ self.queue.remove(conn)
+ self.log(1, 'closing oldest connection for', key)
+ conn.close()
+
+ if len(self.queue) <= self.max_pool:
+ return
+
+ # Close overall stale connections.
+ stale = [conn for conn in self.queue if conn.stale()]
+ if stale:
+ for conn in stale:
+ conns = self.connections.get(conn.key)
+ conns.remove(conn)
+ self.queue.remove(conn)
+ self.log(1, 'closing stale connection for', conn.key)
+ conn.close()
+
+ # Close oldest overall connection(s) if limit reached.
+ while len(self.queue) > self.max_pool:
+ conn = self.queue.pop(0)
+ conns = self.connections.get(conn.key)
+ c = conns.pop(0)
+ assert conn == c, (conn.key, conn, c, conns)
+ self.log(1, 'closing overall oldest connection for', conn.key)
+ conn.close()
+
+
+class Connection:
+
+ def __init__(self, log, pool, host, port, ssl):
+ self.log = log
+ self.pool = pool
+ self.host = host
+ self.port = port
+ self.ssl = ssl
+ self.reader = None
+ self.writer = None
+ self.key = None
+
+ def stale(self):
+ return self.reader is None or self.reader.at_eof()
+
+ def fileno(self):
+ writer = self.writer
+ if writer is not None:
+ transport = writer.transport
+ if transport is not None:
+ sock = transport.get_extra_info('socket')
+ if sock is not None:
+ return sock.fileno()
+ return None
+
+ @asyncio.coroutine
+ def connect(self):
+ self.reader, self.writer = yield from asyncio.open_connection(
+ self.host, self.port, ssl=self.ssl)
+ peername = self.writer.get_extra_info('peername')
+ if peername:
+ self.host, self.port = peername[:2]
+ else:
+ self.log(1, 'NO PEERNAME???', self.host, self.port, self.ssl)
+ self.key = self.host, self.port, self.ssl
+
+ def close(self, recycle=False):
+ if recycle and not self.stale():
+ self.pool.recycle_connection(self)
+ else:
+ self.writer.close()
+ self.pool = self.reader = self.writer = None
+
+
+class Request:
+ """HTTP request.
+
+ Use connect() to open a connection; send_request() to send the
+ request; get_response() to receive the response headers.
+ """
+
+ def __init__(self, log, url, pool):
+ self.log = log
+ self.url = url
+ self.pool = pool
+ self.parts = urllib.parse.urlparse(self.url)
+ self.scheme = self.parts.scheme
+ assert self.scheme in ('http', 'https'), repr(url)
+ self.ssl = self.parts.scheme == 'https'
+ self.netloc = self.parts.netloc
+ self.hostname = self.parts.hostname
+ self.port = self.parts.port or (443 if self.ssl else 80)
+ self.path = (self.parts.path or '/')
+ self.query = self.parts.query
+ if self.query:
+ self.full_path = '%s?%s' % (self.path, self.query)
+ else:
+ self.full_path = self.path
+ self.http_version = 'HTTP/1.1'
+ self.method = 'GET'
+ self.headers = []
+ self.conn = None
+
+ @asyncio.coroutine
+ def connect(self):
+ """Open a connection to the server."""
+ self.log(1, '* Connecting to %s:%s using %s for %s' %
+ (self.hostname, self.port,
+ 'ssl' if self.ssl else 'tcp',
+ self.url))
+ self.conn = yield from self.pool.get_connection(self.hostname,
+ self.port, self.ssl)
+
+ def close(self, recycle=False):
+ """Close the connection, recycle if requested."""
+ if self.conn is not None:
+ if not recycle:
+ self.log(1, 'closing connection for', self.conn.key)
+ self.conn.close(recycle)
+ self.conn = None
+
+ @asyncio.coroutine
+ def putline(self, line):
+ """Write a line to the connection.
+
+ Used for the request line and headers.
+ """
+ self.log(2, '>', line)
+ self.conn.writer.write(line.encode('latin-1') + b'\r\n')
+
+ @asyncio.coroutine
+ def send_request(self):
+ """Send the request."""
+ request_line = '%s %s %s' % (self.method, self.full_path,
+ self.http_version)
+ yield from self.putline(request_line)
+ # TODO: What if a header is already set?
+ self.headers.append(('User-Agent', 'asyncio-example-crawl/0.0'))
+ self.headers.append(('Host', self.netloc))
+ self.headers.append(('Accept', '*/*'))
+ ##self.headers.append(('Accept-Encoding', 'gzip'))
+ for key, value in self.headers:
+ line = '%s: %s' % (key, value)
+ yield from self.putline(line)
+ yield from self.putline('')
+
+ @asyncio.coroutine
+ def get_response(self):
+ """Receive the response."""
+ response = Response(self.log, self.conn.reader)
+ yield from response.read_headers()
+ return response
+
+
+class Response:
+ """HTTP response.
+
+ Call read_headers() to receive the response headers. Then check
+ the status attribute and call get_header() to inspect the headers.
+ Finally call read() to receive the body.
+ """
+
+ def __init__(self, log, reader):
+ self.log = log
+ self.reader = reader
+ self.http_version = None # 'HTTP/1.1'
+ self.status = None # 200
+ self.reason = None # 'Ok'
+ self.headers = [] # [('Content-Type', 'text/html')]
+
+ @asyncio.coroutine
+ def getline(self):
+ """Read one line from the connection."""
+ line = (yield from self.reader.readline()).decode('latin-1').rstrip()
+ self.log(2, '<', line)
+ return line
+
+ @asyncio.coroutine
+ def read_headers(self):
+ """Read the response status and the request headers."""
+ status_line = yield from self.getline()
+ status_parts = status_line.split(None, 2)
+ if len(status_parts) != 3:
+ self.log(0, 'bad status_line', repr(status_line))
+ raise BadStatusLine(status_line)
+ self.http_version, status, self.reason = status_parts
+ self.status = int(status)
+ while True:
+ header_line = yield from self.getline()
+ if not header_line:
+ break
+ # TODO: Continuation lines.
+ key, value = header_line.split(':', 1)
+ self.headers.append((key, value.strip()))
+
+ def get_redirect_url(self, default=''):
+ """Inspect the status and return the redirect url if appropriate."""
+ if self.status not in (300, 301, 302, 303, 307):
+ return default
+ return self.get_header('Location', default)
+
+ def get_header(self, key, default=''):
+ """Get one header value, using a case insensitive header name."""
+ key = key.lower()
+ for k, v in self.headers:
+ if k.lower() == key:
+ return v
+ return default
+
+ @asyncio.coroutine
+ def read(self):
+ """Read the response body.
+
+ This honors Content-Length and Transfer-Encoding: chunked.
+ """
+ nbytes = None
+ for key, value in self.headers:
+ if key.lower() == 'content-length':
+ nbytes = int(value)
+ break
+ if nbytes is None:
+ if self.get_header('transfer-encoding').lower() == 'chunked':
+ self.log(2, 'parsing chunked response')
+ blocks = []
+ while True:
+ size_header = yield from self.reader.readline()
+ if not size_header:
+ self.log(0, 'premature end of chunked response')
+ break
+ self.log(3, 'size_header =', repr(size_header))
+ parts = size_header.split(b';')
+ size = int(parts[0], 16)
+ if size:
+ self.log(3, 'reading chunk of', size, 'bytes')
+ block = yield from self.reader.readexactly(size)
+ assert len(block) == size, (len(block), size)
+ blocks.append(block)
+ crlf = yield from self.reader.readline()
+ assert crlf == b'\r\n', repr(crlf)
+ if not size:
+ break
+ body = b''.join(blocks)
+ self.log(1, 'chunked response had', len(body),
+ 'bytes in', len(blocks), 'blocks')
+ else:
+ self.log(3, 'reading until EOF')
+ body = yield from self.reader.read()
+ # TODO: Should make sure not to recycle the connection
+ # in this case.
+ else:
+ body = yield from self.reader.readexactly(nbytes)
+ return body
+
+
+class Fetcher:
+ """Logic and state for one URL.
+
+ When found in crawler.busy, this represents a URL to be fetched or
+ in the process of being fetched; when found in crawler.done, this
+ holds the results from fetching it.
+
+ This is usually associated with a task. This references the
+ crawler for the connection pool and to add more URLs to its todo
+ list.
+
+ Call fetch() to do the fetching, then report() to print the results.
+ """
+
+ def __init__(self, log, url, crawler, max_redirect=10, max_tries=4):
+ self.log = log
+ self.url = url
+ self.crawler = crawler
+ # We don't loop resolving redirects here -- we just use this
+ # to decide whether to add the redirect URL to crawler.todo.
+ self.max_redirect = max_redirect
+ # But we do loop to retry on errors a few times.
+ self.max_tries = max_tries
+ # Everything we collect from the response goes here.
+ self.task = None
+ self.exceptions = []
+ self.tries = 0
+ self.request = None
+ self.response = None
+ self.body = None
+ self.next_url = None
+ self.ctype = None
+ self.pdict = None
+ self.encoding = None
+ self.urls = None
+ self.new_urls = None
+
+ @asyncio.coroutine
+ def fetch(self):
+ """Attempt to fetch the contents of the URL.
+
+ If successful, and the data is HTML, extract further links and
+ add them to the crawler. Redirects are also added back there.
+ """
+ while self.tries < self.max_tries:
+ self.tries += 1
+ self.request = None
+ try:
+ self.request = Request(self.log, self.url, self.crawler.pool)
+ yield from self.request.connect()
+ yield from self.request.send_request()
+ self.response = yield from self.request.get_response()
+ self.body = yield from self.response.read()
+ h_conn = self.response.get_header('connection').lower()
+ if h_conn != 'close':
+ self.request.close(recycle=True)
+ self.request = None
+ if self.tries > 1:
+ self.log(1, 'try', self.tries, 'for', self.url, 'success')
+ break
+ except (BadStatusLine, OSError) as exc:
+ self.exceptions.append(exc)
+ self.log(1, 'try', self.tries, 'for', self.url,
+ 'raised', repr(exc))
+ ##import pdb; pdb.set_trace()
+ # Don't reuse the connection in this case.
+ finally:
+ if self.request is not None:
+ self.request.close()
+ else:
+ # We never broke out of the while loop, i.e. all tries failed.
+ self.log(0, 'no success for', self.url,
+ 'in', self.max_tries, 'tries')
+ return
+ next_url = self.response.get_redirect_url()
+ if next_url:
+ self.next_url = urllib.parse.urljoin(self.url, next_url)
+ if self.max_redirect > 0:
+ self.log(1, 'redirect to', self.next_url, 'from', self.url)
+ self.crawler.add_url(self.next_url, self.max_redirect-1)
+ else:
+ self.log(0, 'redirect limit reached for', self.next_url,
+ 'from', self.url)
+ else:
+ if self.response.status == 200:
+ self.ctype = self.response.get_header('content-type')
+ self.pdict = {}
+ if self.ctype:
+ self.ctype, self.pdict = cgi.parse_header(self.ctype)
+ self.encoding = self.pdict.get('charset', 'utf-8')
+ if self.ctype == 'text/html':
+ body = self.body.decode(self.encoding, 'replace')
+ # Replace href with (?:href|src) to follow image links.
+ self.urls = set(re.findall(r'(?i)href=["\']?([^\s"\'<>]+)',
+ body))
+ if self.urls:
+ self.log(1, 'got', len(self.urls),
+ 'distinct urls from', self.url)
+ self.new_urls = set()
+ for url in self.urls:
+ url = unescape(url)
+ url = urllib.parse.urljoin(self.url, url)
+ url, frag = urllib.parse.urldefrag(url)
+ if self.crawler.add_url(url):
+ self.new_urls.add(url)
+
+ def report(self, stats, file=None):
+ """Print a report on the state for this URL.
+
+ Also update the Stats instance.
+ """
+ if self.task is not None:
+ if not self.task.done():
+ stats.add('pending')
+ print(self.url, 'pending', file=file)
+ return
+ elif self.task.cancelled():
+ stats.add('cancelled')
+ print(self.url, 'cancelled', file=file)
+ return
+ elif self.task.exception():
+ stats.add('exception')
+ exc = self.task.exception()
+ stats.add('exception_' + exc.__class__.__name__)
+ print(self.url, exc, file=file)
+ return
+ if len(self.exceptions) == self.tries:
+ stats.add('fail')
+ exc = self.exceptions[-1]
+ stats.add('fail_' + str(exc.__class__.__name__))
+ print(self.url, 'error', exc, file=file)
+ elif self.next_url:
+ stats.add('redirect')
+ print(self.url, self.response.status, 'redirect', self.next_url,
+ file=file)
+ elif self.ctype == 'text/html':
+ stats.add('html')
+ size = len(self.body or b'')
+ stats.add('html_bytes', size)
+ print(self.url, self.response.status,
+ self.ctype, self.encoding,
+ size,
+ '%d/%d' % (len(self.new_urls or ()), len(self.urls or ())),
+ file=file)
+ elif self.response is None:
+ print(self.url, 'no response object', file=file)
+ else:
+ size = len(self.body or b'')
+ if self.response.status == 200:
+ stats.add('other')
+ stats.add('other_bytes', size)
+ else:
+ stats.add('error')
+ stats.add('error_bytes', size)
+ stats.add('status_%s' % self.response.status)
+ print(self.url, self.response.status,
+ self.ctype, self.encoding,
+ size,
+ file=file)
+
+
+class Stats:
+ """Record stats of various sorts."""
+
+ def __init__(self):
+ self.stats = {}
+
+ def add(self, key, count=1):
+ self.stats[key] = self.stats.get(key, 0) + count
+
+ def report(self, file=None):
+ for key, count in sorted(self.stats.items()):
+ print('%10d' % count, key, file=file)
+
+
+class Crawler:
+ """Crawl a set of URLs.
+
+ This manages three disjoint sets of URLs (todo, busy, done). The
+ data structures actually store dicts -- the values in todo give
+ the redirect limit, while the values in busy and done are Fetcher
+ instances.
+ """
+ def __init__(self, log,
+ roots, exclude=None, strict=True, # What to crawl.
+ max_redirect=10, max_tries=4, # Per-url limits.
+ max_tasks=10, max_pool=10, # Global limits.
+ ):
+ self.log = log
+ self.roots = roots
+ self.exclude = exclude
+ self.strict = strict
+ self.max_redirect = max_redirect
+ self.max_tries = max_tries
+ self.max_tasks = max_tasks
+ self.max_pool = max_pool
+ self.todo = {}
+ self.busy = {}
+ self.done = {}
+ self.pool = ConnectionPool(self.log, max_pool, max_tasks)
+ self.root_domains = set()
+ for root in roots:
+ parts = urllib.parse.urlparse(root)
+ host, port = urllib.parse.splitport(parts.netloc)
+ if not host:
+ continue
+ if re.match(r'\A[\d\.]*\Z', host):
+ self.root_domains.add(host)
+ else:
+ host = host.lower()
+ if self.strict:
+ self.root_domains.add(host)
+ if host.startswith('www.'):
+ self.root_domains.add(host[4:])
+ else:
+ self.root_domains.add('www.' + host)
+ else:
+ parts = host.split('.')
+ if len(parts) > 2:
+ host = '.'.join(parts[-2:])
+ self.root_domains.add(host)
+ for root in roots:
+ self.add_url(root)
+ self.governor = asyncio.locks.Semaphore(max_tasks)
+ self.termination = asyncio.locks.Condition()
+ self.t0 = time.time()
+ self.t1 = None
+
+ def close(self):
+ """Close resources (currently only the pool)."""
+ self.pool.close()
+
+ def host_okay(self, host):
+ """Check if a host should be crawled.
+
+ A literal match (after lowercasing) is always good. For hosts
+ that don't look like IP addresses, some approximate matches
+ are okay depending on the strict flag.
+ """
+ host = host.lower()
+ if host in self.root_domains:
+ return True
+ if re.match(r'\A[\d\.]*\Z', host):
+ return False
+ if self.strict:
+ return self._host_okay_strictish(host)
+ else:
+ return self._host_okay_lenient(host)
+
+ def _host_okay_strictish(self, host):
+ """Check if a host should be crawled, strict-ish version.
+
+ This checks for equality modulo an initial 'www.' component.
+ """
+ if host.startswith('www.'):
+ if host[4:] in self.root_domains:
+ return True
+ else:
+ if 'www.' + host in self.root_domains:
+ return True
+ return False
+
+ def _host_okay_lenient(self, host):
+ """Check if a host should be crawled, lenient version.
+
+ This compares the last two components of the host.
+ """
+ parts = host.split('.')
+ if len(parts) > 2:
+ host = '.'.join(parts[-2:])
+ return host in self.root_domains
+
+ def add_url(self, url, max_redirect=None):
+ """Add a URL to the todo list if not seen before."""
+ if self.exclude and re.search(self.exclude, url):
+ return False
+ parts = urllib.parse.urlparse(url)
+ if parts.scheme not in ('http', 'https'):
+ self.log(2, 'skipping non-http scheme in', url)
+ return False
+ host, port = urllib.parse.splitport(parts.netloc)
+ if not self.host_okay(host):
+ self.log(2, 'skipping non-root host in', url)
+ return False
+ if max_redirect is None:
+ max_redirect = self.max_redirect
+ if url in self.todo or url in self.busy or url in self.done:
+ return False
+ self.log(1, 'adding', url, max_redirect)
+ self.todo[url] = max_redirect
+ return True
+
+ @asyncio.coroutine
+ def crawl(self):
+ """Run the crawler until all finished."""
+ with (yield from self.termination):
+ while self.todo or self.busy:
+ if self.todo:
+ url, max_redirect = self.todo.popitem()
+ fetcher = Fetcher(self.log, url,
+ crawler=self,
+ max_redirect=max_redirect,
+ max_tries=self.max_tries,
+ )
+ self.busy[url] = fetcher
+ fetcher.task = asyncio.Task(self.fetch(fetcher))
+ else:
+ yield from self.termination.wait()
+ self.t1 = time.time()
+
+ @asyncio.coroutine
+ def fetch(self, fetcher):
+ """Call the Fetcher's fetch(), with a limit on concurrency.
+
+ Once this returns, move the fetcher from busy to done.
+ """
+ url = fetcher.url
+ with (yield from self.governor):
+ try:
+ yield from fetcher.fetch() # Fetcher gonna fetch.
+ finally:
+ # Force GC of the task, so the error is logged.
+ fetcher.task = None
+ with (yield from self.termination):
+ self.done[url] = fetcher
+ del self.busy[url]
+ self.termination.notify()
+
+ def report(self, file=None):
+ """Print a report on all completed URLs."""
+ if self.t1 is None:
+ self.t1 = time.time()
+ dt = self.t1 - self.t0
+ if dt and self.max_tasks:
+ speed = len(self.done) / dt / self.max_tasks
+ else:
+ speed = 0
+ stats = Stats()
+ print('*** Report ***', file=file)
+ try:
+ show = []
+ show.extend(self.done.items())
+ show.extend(self.busy.items())
+ show.sort()
+ for url, fetcher in show:
+ fetcher.report(stats, file=file)
+ except KeyboardInterrupt:
+ print('\nInterrupted', file=file)
+ print('Finished', len(self.done),
+ 'urls in %.3f secs' % dt,
+ '(max_tasks=%d)' % self.max_tasks,
+ '(%.3f urls/sec/task)' % speed,
+ file=file)
+ stats.report(file=file)
+ print('Todo:', len(self.todo), file=file)
+ print('Busy:', len(self.busy), file=file)
+ print('Done:', len(self.done), file=file)
+ print('Date:', time.ctime(), 'local time', file=file)
+
+
+def main():
+ """Main program.
+
+ Parse arguments, set up event loop, run crawler, print report.
+ """
+ args = ARGS.parse_args()
+ if not args.roots:
+ print('Use --help for command line help')
+ return
+
+ log = Logger(args.level)
+
+ if args.iocp:
+ from asyncio.windows_events import ProactorEventLoop
+ loop = ProactorEventLoop()
+ asyncio.set_event_loop(loop)
+ elif args.select:
+ loop = asyncio.SelectorEventLoop()
+ asyncio.set_event_loop(loop)
+ else:
+ loop = asyncio.get_event_loop()
+
+ roots = {fix_url(root) for root in args.roots}
+
+ crawler = Crawler(log,
+ roots, exclude=args.exclude,
+ strict=args.strict,
+ max_redirect=args.max_redirect,
+ max_tries=args.max_tries,
+ max_tasks=args.max_tasks,
+ max_pool=args.max_pool,
+ )
+ try:
+ loop.run_until_complete(crawler.crawl()) # Crawler gonna crawl.
+ except KeyboardInterrupt:
+ sys.stderr.flush()
+ print('\nInterrupted\n')
+ finally:
+ crawler.report()
+ crawler.close()
+ loop.close()
+
+
+if __name__ == '__main__':
+ logging.basicConfig(level=logging.INFO)
+ main()
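Response.read() above decodes Transfer-Encoding: chunked incrementally from the stream. The same framing on a literal byte string, as a self-contained sketch (decode_chunked is illustrative, not part of the crawler):

    def decode_chunked(data):
        # Each chunk: hex size line, CRLF, <size> bytes, CRLF; size 0 ends the body.
        blocks, pos = [], 0
        while True:
            eol = data.index(b'\r\n', pos)
            size = int(data[pos:eol].split(b';')[0], 16)
            pos = eol + 2
            if not size:
                return b''.join(blocks)
            blocks.append(data[pos:pos + size])
            pos += size + 2  # skip the chunk data and its trailing CRLF

    assert decode_chunked(b'4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n') == b'Wikipedia'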
diff --git a/examples/echo_client_tulip.py b/examples/echo_client_tulip.py
new file mode 100644
index 0000000..88124ef
--- /dev/null
+++ b/examples/echo_client_tulip.py
@@ -0,0 +1,20 @@
+import asyncio
+
+END = b'Bye-bye!\n'
+
+@asyncio.coroutine
+def echo_client():
+ reader, writer = yield from asyncio.open_connection('localhost', 8000)
+ writer.write(b'Hello, world\n')
+ writer.write(b'What a fine day it is.\n')
+ writer.write(END)
+ while True:
+ line = yield from reader.readline()
+ print('received:', line)
+ if line == END or not line:
+ break
+ writer.close()
+
+loop = asyncio.get_event_loop()
+loop.run_until_complete(echo_client())
+loop.close()
diff --git a/examples/echo_server_tulip.py b/examples/echo_server_tulip.py
new file mode 100644
index 0000000..8167e54
--- /dev/null
+++ b/examples/echo_server_tulip.py
@@ -0,0 +1,20 @@
+import asyncio
+
+@asyncio.coroutine
+def echo_server():
+ yield from asyncio.start_server(handle_connection, 'localhost', 8000)
+
+@asyncio.coroutine
+def handle_connection(reader, writer):
+ while True:
+ data = yield from reader.read(8192)
+ if not data:
+ break
+ writer.write(data)
+
+loop = asyncio.get_event_loop()
+loop.run_until_complete(echo_server())
+try:
+ loop.run_forever()
+finally:
+ loop.close()
diff --git a/examples/fetch0.py b/examples/fetch0.py
new file mode 100644
index 0000000..180fcf2
--- /dev/null
+++ b/examples/fetch0.py
@@ -0,0 +1,35 @@
+"""Simplest possible HTTP client."""
+
+import sys
+
+from asyncio import *
+
+
+@coroutine
+def fetch():
+ r, w = yield from open_connection('python.org', 80)
+ request = 'GET / HTTP/1.0\r\n\r\n'
+ print('>', request, file=sys.stderr)
+ w.write(request.encode('latin-1'))
+ while True:
+ line = yield from r.readline()
+ line = line.decode('latin-1').rstrip()
+ if not line:
+ break
+ print('<', line, file=sys.stderr)
+ print(file=sys.stderr)
+ body = yield from r.read()
+ return body
+
+
+def main():
+ loop = get_event_loop()
+ try:
+ body = loop.run_until_complete(fetch())
+ finally:
+ loop.close()
+ print(body.decode('latin-1'), end='')
+
+
+if __name__ == '__main__':
+ main()
diff --git a/examples/fetch1.py b/examples/fetch1.py
new file mode 100644
index 0000000..8dbb6e4
--- /dev/null
+++ b/examples/fetch1.py
@@ -0,0 +1,78 @@
+"""Fetch one URL and write its content to stdout.
+
+This version adds URL parsing (including SSL) and a Response object.
+"""
+
+import sys
+import urllib.parse
+
+from asyncio import *
+
+
+class Response:
+
+ def __init__(self, verbose=True):
+ self.verbose = verbose
+ self.http_version = None # 'HTTP/1.1'
+ self.status = None # 200
+ self.reason = None # 'Ok'
+ self.headers = [] # [('Content-Type', 'text/html')]
+
+ @coroutine
+ def read(self, reader):
+ @coroutine
+ def getline():
+ return (yield from reader.readline()).decode('latin-1').rstrip()
+ status_line = yield from getline()
+ if self.verbose: print('<', status_line, file=sys.stderr)
+ self.http_version, status, self.reason = status_line.split(None, 2)
+ self.status = int(status)
+ while True:
+ header_line = yield from getline()
+ if not header_line:
+ break
+ if self.verbose: print('<', header_line, file=sys.stderr)
+ # TODO: Continuation lines.
+ key, value = header_line.split(':', 1)
+ self.headers.append((key, value.strip()))
+ if self.verbose: print(file=sys.stderr)
+
+
+@coroutine
+def fetch(url, verbose=True):
+ parts = urllib.parse.urlparse(url)
+ if parts.scheme == 'http':
+ ssl = False
+ elif parts.scheme == 'https':
+ ssl = True
+ else:
+ print('URL must use http or https.')
+ sys.exit(1)
+ port = parts.port
+ if port is None:
+ port = 443 if ssl else 80
+ path = parts.path or '/'
+ if parts.query:
+ path += '?' + parts.query
+ request = 'GET %s HTTP/1.0\r\n\r\n' % path
+ if verbose:
+ print('>', request, file=sys.stderr, end='')
+ r, w = yield from open_connection(parts.hostname, port, ssl=ssl)
+ w.write(request.encode('latin-1'))
+ response = Response(verbose)
+ yield from response.read(r)
+ body = yield from r.read()
+ return body
+
+
+def main():
+ loop = get_event_loop()
+ try:
+ body = loop.run_until_complete(fetch(sys.argv[1], '-v' in sys.argv))
+ finally:
+ loop.close()
+ print(body.decode('latin-1'), end='')
+
+
+if __name__ == '__main__':
+ main()
diff --git a/examples/fetch2.py b/examples/fetch2.py
new file mode 100644
index 0000000..7617b59
--- /dev/null
+++ b/examples/fetch2.py
@@ -0,0 +1,141 @@
+"""Fetch one URL and write its content to stdout.
+
+This version adds a Request object.
+"""
+
+import sys
+import urllib.parse
+from http.client import BadStatusLine
+
+from asyncio import *
+
+
+class Request:
+
+ def __init__(self, url, verbose=True):
+ self.url = url
+ self.verbose = verbose
+ self.parts = urllib.parse.urlparse(self.url)
+ self.scheme = self.parts.scheme
+ assert self.scheme in ('http', 'https'), repr(url)
+ self.ssl = self.parts.scheme == 'https'
+ self.netloc = self.parts.netloc
+ self.hostname = self.parts.hostname
+ self.port = self.parts.port or (443 if self.ssl else 80)
+ self.path = (self.parts.path or '/')
+ self.query = self.parts.query
+ if self.query:
+ self.full_path = '%s?%s' % (self.path, self.query)
+ else:
+ self.full_path = self.path
+ self.http_version = 'HTTP/1.1'
+ self.method = 'GET'
+ self.headers = []
+ self.reader = None
+ self.writer = None
+
+ @coroutine
+ def connect(self):
+ if self.verbose:
+ print('* Connecting to %s:%s using %s' %
+ (self.hostname, self.port, 'ssl' if self.ssl else 'tcp'),
+ file=sys.stderr)
+ self.reader, self.writer = yield from open_connection(self.hostname,
+ self.port,
+ ssl=self.ssl)
+ if self.verbose:
+ print('* Connected to %s' %
+ (self.writer.get_extra_info('peername'),),
+ file=sys.stderr)
+
+ def putline(self, line):
+ self.writer.write(line.encode('latin-1') + b'\r\n')
+
+ @coroutine
+ def send_request(self):
+ request = '%s %s %s' % (self.method, self.full_path, self.http_version)
+ if self.verbose: print('>', request, file=sys.stderr)
+ self.putline(request)
+ if 'host' not in {key.lower() for key, _ in self.headers}:
+ self.headers.insert(0, ('Host', self.netloc))
+ for key, value in self.headers:
+ line = '%s: %s' % (key, value)
+ if self.verbose: print('>', line, file=sys.stderr)
+ self.putline(line)
+ self.putline('')
+
+ @coroutine
+ def get_response(self):
+ response = Response(self.reader, self.verbose)
+ yield from response.read_headers()
+ return response
+
+
+class Response:
+
+ def __init__(self, reader, verbose=True):
+ self.reader = reader
+ self.verbose = verbose
+ self.http_version = None # 'HTTP/1.1'
+ self.status = None # 200
+ self.reason = None # 'Ok'
+ self.headers = [] # [('Content-Type', 'text/html')]
+
+ @coroutine
+ def getline(self):
+ return (yield from self.reader.readline()).decode('latin-1').rstrip()
+
+ @coroutine
+ def read_headers(self):
+ status_line = yield from self.getline()
+ if self.verbose: print('<', status_line, file=sys.stderr)
+ status_parts = status_line.split(None, 2)
+ if len(status_parts) != 3:
+ raise BadStatusLine(status_line)
+ self.http_version, status, self.reason = status_parts
+ self.status = int(status)
+ while True:
+ header_line = yield from self.getline()
+ if not header_line:
+ break
+ if self.verbose: print('<', header_line, file=sys.stderr)
+ # TODO: Continuation lines.
+ key, value = header_line.split(':', 1)
+ self.headers.append((key, value.strip()))
+ if self.verbose: print(file=sys.stderr)
+
+ @coroutine
+ def read(self):
+ nbytes = None
+ for key, value in self.headers:
+ if key.lower() == 'content-length':
+ nbytes = int(value)
+ break
+ if nbytes is None:
+ body = yield from self.reader.read()
+ else:
+ body = yield from self.reader.readexactly(nbytes)
+ return body
+
+
+@coroutine
+def fetch(url, verbose=True):
+ request = Request(url, verbose)
+ yield from request.connect()
+ yield from request.send_request()
+ response = yield from request.get_response()
+ body = yield from response.read()
+ return body
+
+
+def main():
+ loop = get_event_loop()
+ try:
+ body = loop.run_until_complete(fetch(sys.argv[1], '-v' in sys.argv))
+ finally:
+ loop.close()
+ sys.stdout.buffer.write(body)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/examples/fetch3.py b/examples/fetch3.py
new file mode 100644
index 0000000..9419afd
--- /dev/null
+++ b/examples/fetch3.py
@@ -0,0 +1,230 @@
+"""Fetch one URL and write its content to stdout.
+
+This version adds a primitive connection pool, redirect following and
+chunked transfer-encoding. It also supports a --iocp flag.
+"""
+
+import sys
+import urllib.parse
+from http.client import BadStatusLine
+
+from asyncio import *
+
+
+class ConnectionPool:
+ # TODO: Locking? Close idle connections?
+
+ def __init__(self, verbose=False):
+ self.verbose = verbose
+ self.connections = {} # {(host, port, ssl): (reader, writer)}
+
+ def close(self):
+ for _, writer in self.connections.values():
+ writer.close()
+
+ @coroutine
+ def open_connection(self, host, port, ssl):
+ port = port or (443 if ssl else 80)
+ ipaddrs = yield from get_event_loop().getaddrinfo(host, port)
+ if self.verbose:
+ print('* %s resolves to %s' %
+ (host, ', '.join(ip[4][0] for ip in ipaddrs)),
+ file=sys.stderr)
+ for _, _, _, _, (h, p, *_) in ipaddrs:
+ key = h, p, ssl
+ conn = self.connections.get(key)
+ if conn:
+ reader, writer = conn
+ if reader._eof:
+ self.connections.pop(key)
+ continue
+ if self.verbose:
+ print('* Reusing pooled connection', key, file=sys.stderr)
+ return conn
+ reader, writer = yield from open_connection(host, port, ssl=ssl)
+ host, port, *_ = writer.get_extra_info('peername')
+ key = host, port, ssl
+ self.connections[key] = reader, writer
+ if self.verbose:
+ print('* New connection', key, file=sys.stderr)
+ return reader, writer
+
+
+class Request:
+
+ def __init__(self, url, verbose=True):
+ self.url = url
+ self.verbose = verbose
+ self.parts = urllib.parse.urlparse(self.url)
+ self.scheme = self.parts.scheme
+ assert self.scheme in ('http', 'https'), repr(url)
+ self.ssl = self.parts.scheme == 'https'
+ self.netloc = self.parts.netloc
+ self.hostname = self.parts.hostname
+ self.port = self.parts.port or (443 if self.ssl else 80)
+ self.path = (self.parts.path or '/')
+ self.query = self.parts.query
+ if self.query:
+ self.full_path = '%s?%s' % (self.path, self.query)
+ else:
+ self.full_path = self.path
+ self.http_version = 'HTTP/1.1'
+ self.method = 'GET'
+ self.headers = []
+ self.reader = None
+ self.writer = None
+
+ def vprint(self, *args):
+ if self.verbose:
+ print(*args, file=sys.stderr)
+
+ @coroutine
+ def connect(self, pool):
+ self.vprint('* Connecting to %s:%s using %s' %
+ (self.hostname, self.port, 'ssl' if self.ssl else 'tcp'))
+ self.reader, self.writer = \
+ yield from pool.open_connection(self.hostname,
+ self.port,
+ ssl=self.ssl)
+ self.vprint('* Connected to %s' %
+ (self.writer.get_extra_info('peername'),))
+
+ @coroutine
+ def putline(self, line):
+ self.vprint('>', line)
+ self.writer.write(line.encode('latin-1') + b'\r\n')
+ ##yield from self.writer.drain()
+
+ @coroutine
+ def send_request(self):
+ request = '%s %s %s' % (self.method, self.full_path, self.http_version)
+ yield from self.putline(request)
+ if 'host' not in {key.lower() for key, _ in self.headers}:
+ self.headers.insert(0, ('Host', self.netloc))
+ for key, value in self.headers:
+ line = '%s: %s' % (key, value)
+ yield from self.putline(line)
+ yield from self.putline('')
+
+ @coroutine
+ def get_response(self):
+ response = Response(self.reader, self.verbose)
+ yield from response.read_headers()
+ return response
+
+
+class Response:
+
+ def __init__(self, reader, verbose=True):
+ self.reader = reader
+ self.verbose = verbose
+ self.http_version = None # 'HTTP/1.1'
+ self.status = None # 200
+ self.reason = None # 'Ok'
+ self.headers = [] # [('Content-Type', 'text/html')]
+
+ def vprint(self, *args):
+ if self.verbose:
+ print(*args, file=sys.stderr)
+
+ @coroutine
+ def getline(self):
+ line = (yield from self.reader.readline()).decode('latin-1').rstrip()
+ self.vprint('<', line)
+ return line
+
+ @coroutine
+ def read_headers(self):
+ status_line = yield from self.getline()
+ status_parts = status_line.split(None, 2)
+ if len(status_parts) != 3:
+ raise BadStatusLine(status_line)
+ self.http_version, status, self.reason = status_parts
+ self.status = int(status)
+ while True:
+ header_line = yield from self.getline()
+ if not header_line:
+ break
+ # TODO: Continuation lines.
+ key, value = header_line.split(':', 1)
+ self.headers.append((key, value.strip()))
+
+ def get_redirect_url(self, default=None):
+ if self.status not in (300, 301, 302, 303, 307):
+ return default
+ return self.get_header('Location', default)
+
+ def get_header(self, key, default=None):
+ key = key.lower()
+ for k, v in self.headers:
+ if k.lower() == key:
+ return v
+ return default
+
+ @coroutine
+ def read(self):
+ nbytes = None
+ for key, value in self.headers:
+ if key.lower() == 'content-length':
+ nbytes = int(value)
+ break
+ if nbytes is None:
+ if self.get_header('transfer-encoding', '').lower() == 'chunked':
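+                # Chunked body: each chunk is "<hex size>[;ext]\r\n<data>\r\n",
+                # and a zero-size chunk terminates the body.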
+ blocks = []
+ size = -1
+ while size:
+ size_header = yield from self.reader.readline()
+ if not size_header:
+ break
+ parts = size_header.split(b';')
+ size = int(parts[0], 16)
+ if size:
+ block = yield from self.reader.readexactly(size)
+ assert len(block) == size, (len(block), size)
+ blocks.append(block)
+ crlf = yield from self.reader.readline()
+ assert crlf == b'\r\n', repr(crlf)
+ body = b''.join(blocks)
+ else:
+ body = yield from self.reader.read()
+ else:
+ body = yield from self.reader.readexactly(nbytes)
+ return body
+
+
+@coroutine
+def fetch(url, verbose=True, max_redirect=10):
+ pool = ConnectionPool(verbose)
+ try:
+ for _ in range(max_redirect):
+ request = Request(url, verbose)
+ yield from request.connect(pool)
+ yield from request.send_request()
+ response = yield from request.get_response()
+ body = yield from response.read()
+ next_url = response.get_redirect_url()
+ if not next_url:
+ break
+ url = urllib.parse.urljoin(url, next_url)
+ print('redirect to', url, file=sys.stderr)
+ return body
+ finally:
+ pool.close()
+
+
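+# Example invocation (a sketch): pass a URL and optionally -v for verbose
+# tracing on stderr:
+#
+#     python3 fetch3.py http://python.org/ -v
+#
+# The final response body (after following any redirects) goes to stdout.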
+def main():
+ if '--iocp' in sys.argv:
+ from asyncio.windows_events import ProactorEventLoop
+ loop = ProactorEventLoop()
+ set_event_loop(loop)
+ else:
+ loop = get_event_loop()
+ try:
+ body = loop.run_until_complete(fetch(sys.argv[1], '-v' in sys.argv))
+ finally:
+ loop.close()
+ sys.stdout.buffer.write(body)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/examples/fuzz_as_completed.py b/examples/fuzz_as_completed.py
new file mode 100644
index 0000000..123fbf1
--- /dev/null
+++ b/examples/fuzz_as_completed.py
@@ -0,0 +1,69 @@
+#!/usr/bin/env python3
+
+"""Fuzz tester for as_completed(), by Glenn Langford."""
+
+import asyncio
+import itertools
+import random
+import sys
+
+@asyncio.coroutine
+def sleeper(time):
+ yield from asyncio.sleep(time)
+ return time
+
+@asyncio.coroutine
+def watcher(tasks, delay=False):
+ res = []
+ for t in asyncio.as_completed(tasks):
+ r = yield from t
+ res.append(r)
+ if delay:
+ # simulate processing delay
+ process_time = random.random() / 10
+ yield from asyncio.sleep(process_time)
+ #print(res)
+ #assert(sorted(res) == res)
+ if sorted(res) != res:
+ print('FAIL', res)
+ print('------------')
+ else:
+ print('.', end='')
+ sys.stdout.flush()
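+
+# The invariant being fuzzed (an illustrative sketch): as_completed() must
+# yield futures in completion order, so sleepers come back sorted:
+#
+#     for f in asyncio.as_completed([sleeper(0.2), sleeper(0.1)]):
+#         t = yield from f    # yields 0.1 first, then 0.2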
+
+loop = asyncio.get_event_loop()
+
+print('Pass 1')
+# All permutations of discrete task running times must be returned
+# by as_completed in the correct order.
+task_times = [0, 0.1, 0.2, 0.3, 0.4]  # 120 permutations
+for times in itertools.permutations(task_times):
+ tasks = [ asyncio.Task(sleeper(t)) for t in times ]
+ loop.run_until_complete(asyncio.Task(watcher(tasks)))
+
+print()
+print('Pass 2')
+# Longer task times, with randomized duplicates. 100 tasks each time.
+longer_task_times = [x/10 for x in range(30)]
+for i in range(20):
+ task_times = longer_task_times * 10
+ random.shuffle(task_times)
+ #print('Times', task_times[:500])
+ tasks = [ asyncio.Task(sleeper(t)) for t in task_times[:100] ]
+ loop.run_until_complete(asyncio.Task(watcher(tasks)))
+
+print()
+print('Pass 3')
+# Same as pass 2, but with 200 tasks and a random processing delay
+# (0 - 0.1s) after retrieving each future from as_completed. This tests
+# whether the order in which callbacks are triggered is preserved through
+# to the as_completed caller.
+for i in range(20):
+ task_times = longer_task_times * 10
+ random.shuffle(task_times)
+ #print('Times', task_times[:200])
+ tasks = [ asyncio.Task(sleeper(t)) for t in task_times[:200] ]
+ loop.run_until_complete(asyncio.Task(watcher(tasks, delay=True)))
+
+print()
+loop.close()
diff --git a/examples/hello_callback.py b/examples/hello_callback.py
new file mode 100644
index 0000000..7ccbea1
--- /dev/null
+++ b/examples/hello_callback.py
@@ -0,0 +1,17 @@
+"""Print 'Hello World' every two seconds, using a callback."""
+
+import asyncio
+
+
+def print_and_repeat(loop):
+ print('Hello World')
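+    # call_later(delay, callback, *args) schedules a one-shot callback;
+    # re-arming it from inside the callback makes a simple periodic timer.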
+ loop.call_later(2, print_and_repeat, loop)
+
+
+if __name__ == '__main__':
+ loop = asyncio.get_event_loop()
+ print_and_repeat(loop)
+ try:
+ loop.run_forever()
+ finally:
+ loop.close()
diff --git a/examples/hello_coroutine.py b/examples/hello_coroutine.py
new file mode 100644
index 0000000..b9347aa
--- /dev/null
+++ b/examples/hello_coroutine.py
@@ -0,0 +1,18 @@
+"""Print 'Hello World' every two seconds, using a coroutine."""
+
+import asyncio
+
+
+@asyncio.coroutine
+def greet_every_two_seconds():
+ while True:
+ print('Hello World')
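+        # sleep() suspends this coroutine without blocking the event loop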
+ yield from asyncio.sleep(2)
+
+
+if __name__ == '__main__':
+ loop = asyncio.get_event_loop()
+ try:
+ loop.run_until_complete(greet_every_two_seconds())
+ finally:
+ loop.close()
diff --git a/examples/shell.py b/examples/shell.py
new file mode 100644
index 0000000..7dc7caf
--- /dev/null
+++ b/examples/shell.py
@@ -0,0 +1,50 @@
+"""Examples using create_subprocess_exec() and create_subprocess_shell()."""
+
+import asyncio
+import signal
+from asyncio.subprocess import PIPE
+
+@asyncio.coroutine
+def cat(loop):
+ proc = yield from asyncio.create_subprocess_shell("cat",
+ stdin=PIPE,
+ stdout=PIPE)
+ print("pid: %s" % proc.pid)
+
+ message = "Hello World!"
+ print("cat write: %r" % message)
+
+ stdout, stderr = yield from proc.communicate(message.encode('ascii'))
+ print("cat read: %r" % stdout.decode('ascii'))
+
+ exitcode = yield from proc.wait()
+ print("(exit code %s)" % exitcode)
+
+@asyncio.coroutine
+def ls(loop):
+ proc = yield from asyncio.create_subprocess_exec("ls",
+ stdout=PIPE)
+ while True:
+ line = yield from proc.stdout.readline()
+ if not line:
+ break
+ print("ls>>", line.decode('ascii').rstrip())
+ try:
+ proc.send_signal(signal.SIGINT)
+ except ProcessLookupError:
+ pass
+
+@asyncio.coroutine
+def test_call(*args, timeout=None):
+ try:
+ proc = yield from asyncio.create_subprocess_exec(*args)
+ exitcode = yield from asyncio.wait_for(proc.wait(), timeout)
+ print("%s: exit code %s" % (' '.join(args), exitcode))
+ except asyncio.TimeoutError:
+ print("timeout! (%.1f sec)" % timeout)
+
+loop = asyncio.get_event_loop()
+loop.run_until_complete(cat(loop))
+loop.run_until_complete(ls(loop))
+loop.run_until_complete(test_call("bash", "-c", "sleep 3", timeout=1.0))
+loop.close()
diff --git a/examples/simple_tcp_server.py b/examples/simple_tcp_server.py
new file mode 100644
index 0000000..5f874ff
--- /dev/null
+++ b/examples/simple_tcp_server.py
@@ -0,0 +1,154 @@
+"""
+Example of a simple TCP server that is written in (mostly) coroutine
+style and uses asyncio.streams.start_server() and
+asyncio.streams.open_connection().
+
+Note that running this example starts both the TCP server and client
+in the same process. It listens on port 12345 on 127.0.0.1, so it will
+fail if this port is currently in use.
+"""
+
+import sys
+import asyncio
+import asyncio.streams
+
+
+class MyServer:
+ """
+ This is just an example of how a TCP server might be potentially
+ structured. This class has basically 3 methods: start the server,
+ handle a client, and stop the server.
+
+    Note that you don't have to follow this structure; it is really
+    just an example of a possible starting point.
+ """
+
+ def __init__(self):
+ self.server = None # encapsulates the server sockets
+
+ # this keeps track of all the clients that connected to our
+ # server. It can be useful in some cases, for instance to
+ # kill client connections or to broadcast some data to all
+ # clients...
+ self.clients = {} # task -> (reader, writer)
+
+ def _accept_client(self, client_reader, client_writer):
+ """
+ This method accepts a new client connection and creates a Task
+ to handle this client. self.clients is updated to keep track
+ of the new client.
+ """
+
+ # start a new Task to handle this specific client connection
+ task = asyncio.Task(self._handle_client(client_reader, client_writer))
+ self.clients[task] = (client_reader, client_writer)
+
+ def client_done(task):
+ print("client task done:", task, file=sys.stderr)
+ del self.clients[task]
+
+ task.add_done_callback(client_done)
+
+ @asyncio.coroutine
+ def _handle_client(self, client_reader, client_writer):
+ """
+ This method actually does the work to handle the requests for
+ a specific client. The protocol is line oriented, so there is
+ a main loop that reads a line with a request and then sends
+ out one or more lines back to the client with the result.
+ """
+ while True:
+ data = (yield from client_reader.readline()).decode("utf-8")
+ if not data: # an empty string means the client disconnected
+ break
+ cmd, *args = data.rstrip().split(' ')
+ if cmd == 'add':
+ arg1 = float(args[0])
+ arg2 = float(args[1])
+ retval = arg1 + arg2
+ client_writer.write("{!r}\n".format(retval).encode("utf-8"))
+ elif cmd == 'repeat':
+ times = int(args[0])
+ msg = args[1]
+ client_writer.write("begin\n".encode("utf-8"))
+ for idx in range(times):
+ client_writer.write("{}. {}\n".format(idx+1, msg)
+ .encode("utf-8"))
+ client_writer.write("end\n".encode("utf-8"))
+ else:
+ print("Bad command {!r}".format(data), file=sys.stderr)
+
+ # This enables us to have flow control in our connection.
+ yield from client_writer.drain()
+
+ def start(self, loop):
+ """
+ Starts the TCP server, so that it listens on port 12345.
+
+ For each client that connects, the accept_client method gets
+ called. This method runs the loop until the server sockets
+ are ready to accept connections.
+ """
+ self.server = loop.run_until_complete(
+ asyncio.streams.start_server(self._accept_client,
+ '127.0.0.1', 12345,
+ loop=loop))
+
+ def stop(self, loop):
+ """
+ Stops the TCP server, i.e. closes the listening socket(s).
+
+ This method runs the loop until the server sockets are closed.
+ """
+ if self.server is not None:
+ self.server.close()
+ loop.run_until_complete(self.server.wait_closed())
+ self.server = None
+
+
+def main():
+ loop = asyncio.get_event_loop()
+
+ # creates a server and starts listening to TCP connections
+ server = MyServer()
+ server.start(loop)
+
+ @asyncio.coroutine
+ def client():
+ reader, writer = yield from asyncio.streams.open_connection(
+ '127.0.0.1', 12345, loop=loop)
+
+ def send(msg):
+ print("> " + msg)
+ writer.write((msg + '\n').encode("utf-8"))
+
+ def recv():
+ msgback = (yield from reader.readline()).decode("utf-8").rstrip()
+ print("< " + msgback)
+ return msgback
+
+ # send a line
+ send("add 1 2")
+ msg = yield from recv()
+
+ send("repeat 5 hello")
+ msg = yield from recv()
+ assert msg == 'begin'
+ while True:
+ msg = yield from recv()
+ if msg == 'end':
+ break
+
+ writer.close()
+ yield from asyncio.sleep(0.5)
+
+ # creates a client and connects to our server
+ try:
+ loop.run_until_complete(client())
+ server.stop(loop)
+ finally:
+ loop.close()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/examples/sink.py b/examples/sink.py
new file mode 100644
index 0000000..d362cbb
--- /dev/null
+++ b/examples/sink.py
@@ -0,0 +1,94 @@
+"""Test service that accepts connections and reads all data off them."""
+
+import argparse
+import os
+import sys
+
+from asyncio import *
+
+ARGS = argparse.ArgumentParser(description="TCP data sink example.")
+ARGS.add_argument(
+ '--tls', action='store_true', dest='tls',
+ default=False, help='Use TLS with a self-signed cert')
+ARGS.add_argument(
+ '--iocp', action='store_true', dest='iocp',
+ default=False, help='Use IOCP event loop (Windows only)')
+ARGS.add_argument(
+ '--host', action='store', dest='host',
+ default='127.0.0.1', help='Host name')
+ARGS.add_argument(
+ '--port', action='store', dest='port',
+ default=1111, type=int, help='Port number')
+ARGS.add_argument(
+ '--maxsize', action='store', dest='maxsize',
+ default=16*1024*1024, type=int, help='Max total data size')
+
+server = None
+args = None
+
+
+def dprint(*args):
+ print('sink:', *args, file=sys.stderr)
+
+
+class Service(Protocol):
+
+ def connection_made(self, tr):
+ dprint('connection from', tr.get_extra_info('peername'))
+ dprint('my socket is', tr.get_extra_info('sockname'))
+ self.tr = tr
+ self.total = 0
+
+ def data_received(self, data):
+ if data == b'stop':
+ dprint('stopping server')
+ server.close()
+ self.tr.close()
+ return
+ self.total += len(data)
+ dprint('received', len(data), 'bytes; total', self.total)
+ if self.total > args.maxsize:
+ dprint('closing due to too much data')
+ self.tr.close()
+
+    def connection_lost(self, exc):
+        dprint('closed', repr(exc))
+
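+# Pairs with source.py: run a source against the same host and port to
+# feed this sink; sending the literal bytes b'stop' shuts the server down
+# (see Service.data_received above).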
+
+@coroutine
+def start(loop, host, port):
+ global server
+ sslctx = None
+ if args.tls:
+ import ssl
+ # TODO: take cert/key from args as well.
+ here = os.path.join(os.path.dirname(__file__), '..', 'tests')
+ sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ sslctx.options |= ssl.OP_NO_SSLv2
+ sslctx.load_cert_chain(
+ certfile=os.path.join(here, 'ssl_cert.pem'),
+ keyfile=os.path.join(here, 'ssl_key.pem'))
+
+ server = yield from loop.create_server(Service, host, port, ssl=sslctx)
+ dprint('serving TLS' if sslctx else 'serving',
+ [s.getsockname() for s in server.sockets])
+ yield from server.wait_closed()
+
+
+def main():
+ global args
+ args = ARGS.parse_args()
+ if args.iocp:
+ from asyncio.windows_events import ProactorEventLoop
+ loop = ProactorEventLoop()
+ set_event_loop(loop)
+ else:
+ loop = get_event_loop()
+ try:
+ loop.run_until_complete(start(loop, args.host, args.port))
+ finally:
+ loop.close()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/examples/source.py b/examples/source.py
new file mode 100644
index 0000000..7fd11fb
--- /dev/null
+++ b/examples/source.py
@@ -0,0 +1,100 @@
+"""Test client that connects and sends infinite data."""
+
+import argparse
+import sys
+
+from asyncio import *
+from asyncio import test_utils
+
+
+ARGS = argparse.ArgumentParser(description="TCP data sink example.")
+ARGS.add_argument(
+ '--tls', action='store_true', dest='tls',
+ default=False, help='Use TLS')
+ARGS.add_argument(
+ '--iocp', action='store_true', dest='iocp',
+ default=False, help='Use IOCP event loop (Windows only)')
+ARGS.add_argument(
+ '--stop', action='store_true', dest='stop',
+ default=False, help='Stop the server by sending it b"stop" as data')
+ARGS.add_argument(
+ '--host', action='store', dest='host',
+ default='127.0.0.1', help='Host name')
+ARGS.add_argument(
+ '--port', action='store', dest='port',
+ default=1111, type=int, help='Port number')
+ARGS.add_argument(
+ '--size', action='store', dest='size',
+ default=16*1024, type=int, help='Data size')
+
+args = None
+
+
+def dprint(*args):
+ print('source:', *args, file=sys.stderr)
+
+
+class Client(Protocol):
+
+ total = 0
+
+ def connection_made(self, tr):
+ dprint('connecting to', tr.get_extra_info('peername'))
+ dprint('my socket is', tr.get_extra_info('sockname'))
+ self.tr = tr
+ self.lost = False
+ self.loop = get_event_loop()
+ self.waiter = Future()
+ if args.stop:
+ self.tr.write(b'stop')
+ self.tr.close()
+ else:
+ self.data = b'x'*args.size
+ self.write_some_data()
+
+ def write_some_data(self):
+ if self.lost:
+ dprint('lost already')
+ return
+ data = self.data
+ size = len(data)
+ self.total += size
+ dprint('writing', size, 'bytes; total', self.total)
+ self.tr.write(data)
+ self.loop.call_soon(self.write_some_data)
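+        # transport.write() never blocks, and this protocol does not
+        # implement pause_writing(), so this is deliberately an unbounded
+        # firehose of data.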
+
+ def connection_lost(self, exc):
+ dprint('lost connection', repr(exc))
+ self.lost = True
+ self.waiter.set_result(None)
+
+
+@coroutine
+def start(loop, host, port):
+ sslctx = None
+ if args.tls:
+ sslctx = test_utils.dummy_ssl_context()
+ tr, pr = yield from loop.create_connection(Client, host, port,
+ ssl=sslctx)
+ dprint('tr =', tr)
+ dprint('pr =', pr)
+ yield from pr.waiter
+
+
+def main():
+ global args
+ args = ARGS.parse_args()
+ if args.iocp:
+ from asyncio.windows_events import ProactorEventLoop
+ loop = ProactorEventLoop()
+ set_event_loop(loop)
+ else:
+ loop = get_event_loop()
+ try:
+ loop.run_until_complete(start(loop, args.host, args.port))
+ finally:
+ loop.close()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/examples/source1.py b/examples/source1.py
new file mode 100644
index 0000000..6802e96
--- /dev/null
+++ b/examples/source1.py
@@ -0,0 +1,98 @@
+"""Like source.py, but uses streams."""
+
+import argparse
+import sys
+
+from asyncio import *
+from asyncio import test_utils
+
+ARGS = argparse.ArgumentParser(
+    description="TCP data source example (using streams).")
+ARGS.add_argument(
+ '--tls', action='store_true', dest='tls',
+ default=False, help='Use TLS')
+ARGS.add_argument(
+ '--iocp', action='store_true', dest='iocp',
+ default=False, help='Use IOCP event loop (Windows only)')
+ARGS.add_argument(
+ '--stop', action='store_true', dest='stop',
+ default=False, help='Stop the server by sending it b"stop" as data')
+ARGS.add_argument(
+ '--host', action='store', dest='host',
+ default='127.0.0.1', help='Host name')
+ARGS.add_argument(
+ '--port', action='store', dest='port',
+ default=1111, type=int, help='Port number')
+ARGS.add_argument(
+ '--size', action='store', dest='size',
+ default=16*1024, type=int, help='Data size')
+
+
+class Debug:
+ """A clever little class that suppresses repetitive messages."""
+
+ overwriting = False
+ label = 'stream1:'
+
+ def print(self, *args):
+ if self.overwriting:
+ print(file=sys.stderr)
+ self.overwriting = 0
+ print(self.label, *args, file=sys.stderr)
+
+ def oprint(self, *args):
+ self.overwriting += 1
+ end = '\n'
+ if self.overwriting >= 3:
+ if self.overwriting == 3:
+ print(self.label, '[...]', file=sys.stderr)
+ end = '\r'
+ print(self.label, *args, file=sys.stderr, end=end, flush=True)
+
+
+@coroutine
+def start(loop, args):
+ d = Debug()
+ total = 0
+ sslctx = None
+ if args.tls:
+ d.print('using dummy SSLContext')
+ sslctx = test_utils.dummy_ssl_context()
+ r, w = yield from open_connection(args.host, args.port, ssl=sslctx)
+ d.print('r =', r)
+ d.print('w =', w)
+ if args.stop:
+ w.write(b'stop')
+ w.close()
+ else:
+ size = args.size
+ data = b'x'*size
+ try:
+ while True:
+ total += size
+ d.oprint('writing', size, 'bytes; total', total)
+ w.write(data)
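+                # This targets the old Tulip flow-control API, where
+                # drain() returned a false value when no pause was needed;
+                # modern asyncio's drain() is a coroutine you always await.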
+ f = w.drain()
+ if f:
+ d.print('pausing')
+ yield from f
+ except (ConnectionResetError, BrokenPipeError) as exc:
+ d.print('caught', repr(exc))
+
+
+def main():
+    args = ARGS.parse_args()
+ if args.iocp:
+ from asyncio.windows_events import ProactorEventLoop
+ loop = ProactorEventLoop()
+ set_event_loop(loop)
+ else:
+ loop = get_event_loop()
+ try:
+ loop.run_until_complete(start(loop, args))
+ finally:
+ loop.close()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/examples/stacks.py b/examples/stacks.py
new file mode 100644
index 0000000..0b7e0b2
--- /dev/null
+++ b/examples/stacks.py
@@ -0,0 +1,44 @@
+"""Crude demo for print_stack()."""
+
+
+from asyncio import *
+
+
+@coroutine
+def helper(r):
+ print('--- helper ---')
+ for t in Task.all_tasks():
+ t.print_stack()
+ print('--- end helper ---')
+ line = yield from r.readline()
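+    # deliberately raise ZeroDivisionError so the failure shows up in the
+    # task stacks printed below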
+ 1/0
+ return line
+
+def doit():
+ l = get_event_loop()
+ lr = l.run_until_complete
+ r, w = lr(open_connection('python.org', 80))
+ t1 = async(helper(r))
+ for t in Task.all_tasks(): t.print_stack()
+ print('---')
+ l._run_once()
+ for t in Task.all_tasks(): t.print_stack()
+ print('---')
+ w.write(b'GET /\r\n')
+ w.write_eof()
+ try:
+ lr(t1)
+ except Exception as e:
+ print('catching', e)
+ finally:
+ for t in Task.all_tasks():
+ t.print_stack()
+ l.close()
+
+
+def main():
+ doit()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/examples/subprocess_attach_read_pipe.py b/examples/subprocess_attach_read_pipe.py
new file mode 100644
index 0000000..d8a6242
--- /dev/null
+++ b/examples/subprocess_attach_read_pipe.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python3
+"""Example showing how to attach a read pipe to a subprocess."""
+import asyncio
+import os, sys
+
+code = """
+import os, sys
+fd = int(sys.argv[1])
+os.write(fd, b'data')
+os.close(fd)
+"""
+
+loop = asyncio.get_event_loop()
+
+@asyncio.coroutine
+def task():
+ rfd, wfd = os.pipe()
+ args = [sys.executable, '-c', code, str(wfd)]
+
+ pipe = open(rfd, 'rb', 0)
+ reader = asyncio.StreamReader(loop=loop)
+ protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
+ transport, _ = yield from loop.connect_read_pipe(lambda: protocol, pipe)
+
+ proc = yield from asyncio.create_subprocess_exec(*args, pass_fds={wfd})
+ yield from proc.wait()
+
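+    # close the parent's copy of the write end so reader.read() sees EOF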
+ os.close(wfd)
+ data = yield from reader.read()
+ print("read = %r" % data.decode())
+
+loop.run_until_complete(task())
+loop.close()
diff --git a/examples/subprocess_attach_write_pipe.py b/examples/subprocess_attach_write_pipe.py
new file mode 100644
index 0000000..8614877
--- /dev/null
+++ b/examples/subprocess_attach_write_pipe.py
@@ -0,0 +1,35 @@
+#!/usr/bin/env python3
+"""Example showing how to attach a write pipe to a subprocess."""
+import asyncio
+import os, sys
+from asyncio import subprocess
+
+code = """
+import os, sys
+fd = int(sys.argv[1])
+data = os.read(fd, 1024)
+sys.stdout.buffer.write(data)
+"""
+
+loop = asyncio.get_event_loop()
+
+@asyncio.coroutine
+def task():
+ rfd, wfd = os.pipe()
+ args = [sys.executable, '-c', code, str(rfd)]
+ proc = yield from asyncio.create_subprocess_exec(
+ *args,
+ pass_fds={rfd},
+ stdout=subprocess.PIPE)
+
+ pipe = open(wfd, 'wb', 0)
+ transport, _ = yield from loop.connect_write_pipe(asyncio.Protocol,
+ pipe)
+ transport.write(b'data')
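+    # the child inherited rfd via pass_fds and reads from the other end of
+    # the pipe what we just wrote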
+
+ stdout, stderr = yield from proc.communicate()
+ print("stdout = %r" % stdout.decode())
+ pipe.close()
+
+loop.run_until_complete(task())
+loop.close()
diff --git a/examples/subprocess_shell.py b/examples/subprocess_shell.py
new file mode 100644
index 0000000..745cb64
--- /dev/null
+++ b/examples/subprocess_shell.py
@@ -0,0 +1,87 @@
+"""Example writing to and reading from a subprocess at the same time using
+tasks."""
+
+import asyncio
+import os
+from asyncio.subprocess import PIPE
+
+
+@asyncio.coroutine
+def send_input(writer, input):
+ try:
+ for line in input:
+ print('sending', len(line), 'bytes')
+ writer.write(line)
+ d = writer.drain()
+ if d:
+ print('pause writing')
+ yield from d
+ print('resume writing')
+ writer.close()
+ except BrokenPipeError:
+ print('stdin: broken pipe error')
+ except ConnectionResetError:
+ print('stdin: connection reset error')
+
+@asyncio.coroutine
+def log_errors(reader):
+ while True:
+ line = yield from reader.readline()
+ if not line:
+ break
+ print('ERROR', repr(line))
+
+@asyncio.coroutine
+def read_stdout(stdout):
+ while True:
+ line = yield from stdout.readline()
+ print('received', repr(line))
+ if not line:
+ break
+
+@asyncio.coroutine
+def start(cmd, input=None, **kwds):
+ kwds['stdout'] = PIPE
+ kwds['stderr'] = PIPE
+ if input is None and 'stdin' not in kwds:
+ kwds['stdin'] = None
+ else:
+ kwds['stdin'] = PIPE
+ proc = yield from asyncio.create_subprocess_shell(cmd, **kwds)
+
+ tasks = []
+ if input is not None:
+ tasks.append(send_input(proc.stdin, input))
+ else:
+ print('No stdin')
+ if proc.stderr is not None:
+ tasks.append(log_errors(proc.stderr))
+ else:
+ print('No stderr')
+ if proc.stdout is not None:
+ tasks.append(read_stdout(proc.stdout))
+ else:
+ print('No stdout')
+
+ if tasks:
+ # feed stdin while consuming stdout to avoid hang
+ # when stdin pipe is full
+ yield from asyncio.wait(tasks)
+
+ exitcode = yield from proc.wait()
+ print("exit code: %s" % exitcode)
+
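+# Other ways to drive start() (a sketch; run via loop.run_until_complete):
+#     start('ls -l')                     # no stdin; stream stdout/stderr
+#     start('cat', input=[b'hello\n'])   # feed stdin from a list of bytes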
+
+def main():
+ if os.name == 'nt':
+ loop = asyncio.ProactorEventLoop()
+ asyncio.set_event_loop(loop)
+ else:
+ loop = asyncio.get_event_loop()
+ loop.run_until_complete(start(
+ 'sleep 2; wc', input=[b'foo bar baz\n'*300 for i in range(100)]))
+ loop.close()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/examples/tcp_echo.py b/examples/tcp_echo.py
new file mode 100755
index 0000000..d743242
--- /dev/null
+++ b/examples/tcp_echo.py
@@ -0,0 +1,128 @@
+#!/usr/bin/env python3
+"""TCP echo server example."""
+import argparse
+import asyncio
+import sys
+try:
+ import signal
+except ImportError:
+ signal = None
+
+
+class EchoServer(asyncio.Protocol):
+
+ TIMEOUT = 5.0
+
+ def timeout(self):
+ print('connection timeout, closing.')
+ self.transport.close()
+
+ def connection_made(self, transport):
+ print('connection made')
+ self.transport = transport
+
+        # start the 5-second timeout timer
+ self.h_timeout = asyncio.get_event_loop().call_later(
+ self.TIMEOUT, self.timeout)
+
+ def data_received(self, data):
+ print('data received: ', data.decode())
+ self.transport.write(b'Re: ' + data)
+
+ # restart timeout timer
+ self.h_timeout.cancel()
+ self.h_timeout = asyncio.get_event_loop().call_later(
+ self.TIMEOUT, self.timeout)
+
+ def eof_received(self):
+ pass
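+        # (returning a falsy value here lets the transport close itself
+        # when the peer half-closes the connection)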
+
+ def connection_lost(self, exc):
+ print('connection lost:', exc)
+ self.h_timeout.cancel()
+
+
+class EchoClient(asyncio.Protocol):
+
+ message = 'This is the message. It will be echoed.'
+
+ def connection_made(self, transport):
+ self.transport = transport
+ self.transport.write(self.message.encode())
+ print('data sent:', self.message)
+
+ def data_received(self, data):
+ print('data received:', data)
+
+ # disconnect after 10 seconds
+ asyncio.get_event_loop().call_later(10.0, self.transport.close)
+
+ def eof_received(self):
+ pass
+
+ def connection_lost(self, exc):
+ print('connection lost:', exc)
+ asyncio.get_event_loop().stop()
+
+
+def start_client(loop, host, port):
+ t = asyncio.Task(loop.create_connection(EchoClient, host, port))
+ loop.run_until_complete(t)
+
+
+def start_server(loop, host, port):
+ f = loop.create_server(EchoServer, host, port)
+ return loop.run_until_complete(f)
+
+
+ARGS = argparse.ArgumentParser(description="TCP Echo example.")
+ARGS.add_argument(
+ '--server', action="store_true", dest='server',
+ default=False, help='Run tcp server')
+ARGS.add_argument(
+ '--client', action="store_true", dest='client',
+ default=False, help='Run tcp client')
+ARGS.add_argument(
+ '--host', action="store", dest='host',
+ default='127.0.0.1', help='Host name')
+ARGS.add_argument(
+ '--port', action="store", dest='port',
+ default=9999, type=int, help='Port number')
+ARGS.add_argument(
+ '--iocp', action="store_true", dest='iocp',
+ default=False, help='Use IOCP event loop')
+
+
+if __name__ == '__main__':
+ args = ARGS.parse_args()
+
+ if ':' in args.host:
+ args.host, port = args.host.split(':', 1)
+ args.port = int(port)
+
+ if (not (args.server or args.client)) or (args.server and args.client):
+ print('Please specify --server or --client\n')
+ ARGS.print_help()
+ else:
+ if args.iocp:
+ from asyncio import windows_events
+ loop = windows_events.ProactorEventLoop()
+ asyncio.set_event_loop(loop)
+ else:
+ loop = asyncio.get_event_loop()
+        print('Using backend: {0}'.format(loop.__class__.__name__))
+
+ if signal is not None and sys.platform != 'win32':
+ loop.add_signal_handler(signal.SIGINT, loop.stop)
+
+ if args.server:
+ server = start_server(loop, args.host, args.port)
+ else:
+ start_client(loop, args.host, args.port)
+
+ try:
+ loop.run_forever()
+ finally:
+ if args.server:
+ server.close()
+ loop.close()
diff --git a/examples/timing_tcp_server.py b/examples/timing_tcp_server.py
new file mode 100644
index 0000000..883ce6d
--- /dev/null
+++ b/examples/timing_tcp_server.py
@@ -0,0 +1,168 @@
+"""
+A variant of simple_tcp_server.py that measures the time it takes to
+send N messages for a range of N. (This was O(N**2) in a previous
+version of Tulip.)
+
+Note that running this example starts both the TCP server and client
+in the same process. It listens on port 12345 on 127.0.0.1, so it will
+fail if this port is currently in use.
+"""
+
+import sys
+import time
+import random
+
+import asyncio
+import asyncio.streams
+
+
+class MyServer:
+ """
+ This is just an example of how a TCP server might be potentially
+ structured. This class has basically 3 methods: start the server,
+ handle a client, and stop the server.
+
+    Note that you don't have to follow this structure; it is really
+    just an example of a possible starting point.
+ """
+
+ def __init__(self):
+ self.server = None # encapsulates the server sockets
+
+ # this keeps track of all the clients that connected to our
+ # server. It can be useful in some cases, for instance to
+ # kill client connections or to broadcast some data to all
+ # clients...
+ self.clients = {} # task -> (reader, writer)
+
+ def _accept_client(self, client_reader, client_writer):
+ """
+ This method accepts a new client connection and creates a Task
+ to handle this client. self.clients is updated to keep track
+ of the new client.
+ """
+
+ # start a new Task to handle this specific client connection
+ task = asyncio.Task(self._handle_client(client_reader, client_writer))
+ self.clients[task] = (client_reader, client_writer)
+
+ def client_done(task):
+ print("client task done:", task, file=sys.stderr)
+ del self.clients[task]
+
+ task.add_done_callback(client_done)
+
+ @asyncio.coroutine
+ def _handle_client(self, client_reader, client_writer):
+ """
+ This method actually does the work to handle the requests for
+ a specific client. The protocol is line oriented, so there is
+ a main loop that reads a line with a request and then sends
+ out one or more lines back to the client with the result.
+ """
+ while True:
+ data = (yield from client_reader.readline()).decode("utf-8")
+ if not data: # an empty string means the client disconnected
+ break
+ cmd, *args = data.rstrip().split(' ')
+ if cmd == 'add':
+ arg1 = float(args[0])
+ arg2 = float(args[1])
+ retval = arg1 + arg2
+ client_writer.write("{!r}\n".format(retval).encode("utf-8"))
+ elif cmd == 'repeat':
+ times = int(args[0])
+ msg = args[1]
+ client_writer.write("begin\n".encode("utf-8"))
+ for idx in range(times):
+ client_writer.write("{}. {}\n".format(
+ idx+1, msg + 'x'*random.randint(10, 50))
+ .encode("utf-8"))
+ client_writer.write("end\n".encode("utf-8"))
+ else:
+ print("Bad command {!r}".format(data), file=sys.stderr)
+
+ # This enables us to have flow control in our connection.
+ yield from client_writer.drain()
+
+ def start(self, loop):
+ """
+        Starts the TCP server, so that it listens on port 12345.
+
+ For each client that connects, the accept_client method gets
+ called. This method runs the loop until the server sockets
+ are ready to accept connections.
+ """
+ self.server = loop.run_until_complete(
+ asyncio.streams.start_server(self._accept_client,
+ '127.0.0.1', 12345,
+ loop=loop))
+
+ def stop(self, loop):
+ """
+ Stops the TCP server, i.e. closes the listening socket(s).
+
+ This method runs the loop until the server sockets are closed.
+ """
+ if self.server is not None:
+ self.server.close()
+ loop.run_until_complete(self.server.wait_closed())
+ self.server = None
+
+
+def main():
+ loop = asyncio.get_event_loop()
+
+ # creates a server and starts listening to TCP connections
+ server = MyServer()
+ server.start(loop)
+
+ @asyncio.coroutine
+ def client():
+ reader, writer = yield from asyncio.streams.open_connection(
+ '127.0.0.1', 12345, loop=loop)
+
+ def send(msg):
+ print("> " + msg)
+ writer.write((msg + '\n').encode("utf-8"))
+
+ def recv():
+ msgback = (yield from reader.readline()).decode("utf-8").rstrip()
+ print("< " + msgback)
+ return msgback
+
+ # send a line
+ send("add 1 2")
+ msg = yield from recv()
+
+ Ns = list(range(100, 100000, 10000))
+ times = []
+
+ for N in Ns:
+ t0 = time.time()
+ send("repeat {} hello world ".format(N))
+ msg = yield from recv()
+ assert msg == 'begin'
+ while True:
+ msg = (yield from reader.readline()).decode("utf-8").rstrip()
+ if msg == 'end':
+ break
+ t1 = time.time()
+ dt = t1 - t0
+ print("Time taken: {:.3f} seconds ({:.6f} per repetition)"
+ .format(dt, dt/N))
+ times.append(dt)
+
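+        # 'times' could be inspected here to confirm that dt grows roughly
+        # linearly in N, guarding against the O(N**2) behavior mentioned in
+        # the module docstring.
+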
+ writer.close()
+ yield from asyncio.sleep(0.5)
+
+ # creates a client and connects to our server
+ try:
+ loop.run_until_complete(client())
+ server.stop(loop)
+ finally:
+ loop.close()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/examples/udp_echo.py b/examples/udp_echo.py
new file mode 100755
index 0000000..93ac7e6
--- /dev/null
+++ b/examples/udp_echo.py
@@ -0,0 +1,104 @@
+#!/usr/bin/env python3
+"""UDP echo example."""
+import argparse
+import sys
+import asyncio
+try:
+ import signal
+except ImportError:
+ signal = None
+
+
+class MyServerUdpEchoProtocol:
+
+ def connection_made(self, transport):
+ print('start', transport)
+ self.transport = transport
+
+ def datagram_received(self, data, addr):
+ print('Data received:', data, addr)
+ self.transport.sendto(data, addr)
+
+ def error_received(self, exc):
+ print('Error received:', exc)
+
+ def connection_lost(self, exc):
+ print('stop', exc)
+
+
+class MyClientUdpEchoProtocol:
+
+ message = 'This is the message. It will be echoed.'
+
+ def connection_made(self, transport):
+ self.transport = transport
+ print('sending "{}"'.format(self.message))
+ self.transport.sendto(self.message.encode())
+ print('waiting to receive')
+
+ def datagram_received(self, data, addr):
+ print('received "{}"'.format(data.decode()))
+ self.transport.close()
+
+ def error_received(self, exc):
+ print('Error received:', exc)
+
+ def connection_lost(self, exc):
+ print('closing transport', exc)
+ loop = asyncio.get_event_loop()
+ loop.stop()
+
+
+def start_server(loop, addr):
+ t = asyncio.Task(loop.create_datagram_endpoint(
+ MyServerUdpEchoProtocol, local_addr=addr))
+ transport, server = loop.run_until_complete(t)
+ return transport
+
+
+def start_client(loop, addr):
+ t = asyncio.Task(loop.create_datagram_endpoint(
+ MyClientUdpEchoProtocol, remote_addr=addr))
+ loop.run_until_complete(t)
+
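+# create_datagram_endpoint() takes a protocol *factory* (here the class
+# itself) and resolves to a (transport, protocol) pair once the socket is
+# bound (server side) or connected (client side).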
+
+ARGS = argparse.ArgumentParser(description="UDP Echo example.")
+ARGS.add_argument(
+ '--server', action="store_true", dest='server',
+ default=False, help='Run udp server')
+ARGS.add_argument(
+ '--client', action="store_true", dest='client',
+ default=False, help='Run udp client')
+ARGS.add_argument(
+ '--host', action="store", dest='host',
+ default='127.0.0.1', help='Host name')
+ARGS.add_argument(
+ '--port', action="store", dest='port',
+ default=9999, type=int, help='Port number')
+
+
+if __name__ == '__main__':
+ args = ARGS.parse_args()
+ if ':' in args.host:
+ args.host, port = args.host.split(':', 1)
+ args.port = int(port)
+
+ if (not (args.server or args.client)) or (args.server and args.client):
+ print('Please specify --server or --client\n')
+ ARGS.print_help()
+ else:
+ loop = asyncio.get_event_loop()
+        if signal is not None and sys.platform != 'win32':
+ loop.add_signal_handler(signal.SIGINT, loop.stop)
+
+        if args.server:
+ server = start_server(loop, (args.host, args.port))
+ else:
+ start_client(loop, (args.host, args.port))
+
+ try:
+ loop.run_forever()
+ finally:
+            if args.server:
+ server.close()
+ loop.close()