diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2014-06-11 12:18:26 +0200 |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2014-06-11 12:18:26 +0200 |
commit | e3f4e5f7ae38b975d7d4ed2941157b25ecafe1a2 (patch) | |
tree | 27493cc152b3ee7600fe9c8b90e7ec9e061d7c3f /trollius/events.py | |
parent | 9bcb6690366a799dcf7a73157bbdfc8785056ac7 (diff) | |
download | trollius-e3f4e5f7ae38b975d7d4ed2941157b25ecafe1a2.tar.gz |
Basic interoperability with asyncio
* Reuse AbstractEventLoopPolicy class from asyncio, if available, so
asyncio.set_event_loop() accepts Trollius event loops
* trollius.Task._step() retrieves the result from StopIteration, so it's
possible to chain a Trollius coroutine with an asyncio coroutine
* trollius.Task._step() accepts also asyncio.Future objects
Diffstat (limited to 'trollius/events.py')
-rw-r--r-- | trollius/events.py | 345 |
1 files changed, 177 insertions, 168 deletions
diff --git a/trollius/events.py b/trollius/events.py index 57872c6..4e0e860 100644 --- a/trollius/events.py +++ b/trollius/events.py @@ -12,6 +12,10 @@ __all__ = ['AbstractEventLoopPolicy', import subprocess import threading import socket +try: + import asyncio +except ImportError: + asyncio = None class Handle(object): @@ -113,241 +117,246 @@ class AbstractServer(object): return NotImplemented -class AbstractEventLoop(object): - """Abstract event loop.""" +if asyncio is not None: + # Reuse asyncio class so asyncio.set_event_loop() accepts Trollius event + # loops + AbstractEventLoop = asyncio.AbstractEventLoop +else: + class AbstractEventLoop(object): + """Abstract event loop.""" - # Running and stopping the event loop. + # Running and stopping the event loop. - def run_forever(self): - """Run the event loop until stop() is called.""" - raise NotImplementedError + def run_forever(self): + """Run the event loop until stop() is called.""" + raise NotImplementedError - def run_until_complete(self, future): - """Run the event loop until a Future is done. + def run_until_complete(self, future): + """Run the event loop until a Future is done. - Return the Future's result, or raise its exception. - """ - raise NotImplementedError + Return the Future's result, or raise its exception. + """ + raise NotImplementedError - def stop(self): - """Stop the event loop as soon as reasonable. + def stop(self): + """Stop the event loop as soon as reasonable. - Exactly how soon that is may depend on the implementation, but - no more I/O callbacks should be scheduled. - """ - raise NotImplementedError + Exactly how soon that is may depend on the implementation, but + no more I/O callbacks should be scheduled. + """ + raise NotImplementedError - def is_running(self): - """Return whether the event loop is currently running.""" - raise NotImplementedError + def is_running(self): + """Return whether the event loop is currently running.""" + raise NotImplementedError - def close(self): - """Close the loop. + def close(self): + """Close the loop. - The loop should not be running. + The loop should not be running. - This is idempotent and irreversible. + This is idempotent and irreversible. - No other methods should be called after this one. - """ - raise NotImplementedError + No other methods should be called after this one. + """ + raise NotImplementedError - # Methods scheduling callbacks. All these return Handles. + # Methods scheduling callbacks. All these return Handles. - def call_soon(self, callback, *args): - return self.call_later(0, callback, *args) + def call_soon(self, callback, *args): + return self.call_later(0, callback, *args) - def call_later(self, delay, callback, *args): - raise NotImplementedError + def call_later(self, delay, callback, *args): + raise NotImplementedError - def call_at(self, when, callback, *args): - raise NotImplementedError + def call_at(self, when, callback, *args): + raise NotImplementedError - def time(self): - raise NotImplementedError + def time(self): + raise NotImplementedError - # Methods for interacting with threads. + # Methods for interacting with threads. - def call_soon_threadsafe(self, callback, *args): - raise NotImplementedError + def call_soon_threadsafe(self, callback, *args): + raise NotImplementedError - def run_in_executor(self, executor, callback, *args): - raise NotImplementedError + def run_in_executor(self, executor, callback, *args): + raise NotImplementedError - def set_default_executor(self, executor): - raise NotImplementedError + def set_default_executor(self, executor): + raise NotImplementedError - # Network I/O methods returning Futures. + # Network I/O methods returning Futures. - def getaddrinfo(self, host, port, family=0, type=0, proto=0, flags=0): - raise NotImplementedError + def getaddrinfo(self, host, port, family=0, type=0, proto=0, flags=0): + raise NotImplementedError - def getnameinfo(self, sockaddr, flags=0): - raise NotImplementedError + def getnameinfo(self, sockaddr, flags=0): + raise NotImplementedError - def create_connection(self, protocol_factory, host=None, port=None, - ssl=None, family=0, proto=0, flags=0, sock=None, - local_addr=None, server_hostname=None): - raise NotImplementedError + def create_connection(self, protocol_factory, host=None, port=None, + ssl=None, family=0, proto=0, flags=0, sock=None, + local_addr=None, server_hostname=None): + raise NotImplementedError - def create_server(self, protocol_factory, host=None, port=None, - family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, - sock=None, backlog=100, ssl=None, reuse_address=None): - """A coroutine which creates a TCP server bound to host and port. + def create_server(self, protocol_factory, host=None, port=None, + family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, + sock=None, backlog=100, ssl=None, reuse_address=None): + """A coroutine which creates a TCP server bound to host and port. - The return value is a Server object which can be used to stop - the service. + The return value is a Server object which can be used to stop + the service. - If host is an empty string or None all interfaces are assumed - and a list of multiple sockets will be returned (most likely - one for IPv4 and another one for IPv6). + If host is an empty string or None all interfaces are assumed + and a list of multiple sockets will be returned (most likely + one for IPv4 and another one for IPv6). - family can be set to either AF_INET or AF_INET6 to force the - socket to use IPv4 or IPv6. If not set it will be determined - from host (defaults to AF_UNSPEC). + family can be set to either AF_INET or AF_INET6 to force the + socket to use IPv4 or IPv6. If not set it will be determined + from host (defaults to AF_UNSPEC). - flags is a bitmask for getaddrinfo(). + flags is a bitmask for getaddrinfo(). - sock can optionally be specified in order to use a preexisting - socket object. + sock can optionally be specified in order to use a preexisting + socket object. - backlog is the maximum number of queued connections passed to - listen() (defaults to 100). + backlog is the maximum number of queued connections passed to + listen() (defaults to 100). - ssl can be set to an SSLContext to enable SSL over the - accepted connections. + ssl can be set to an SSLContext to enable SSL over the + accepted connections. - reuse_address tells the kernel to reuse a local socket in - TIME_WAIT state, without waiting for its natural timeout to - expire. If not specified will automatically be set to True on - UNIX. - """ - raise NotImplementedError + reuse_address tells the kernel to reuse a local socket in + TIME_WAIT state, without waiting for its natural timeout to + expire. If not specified will automatically be set to True on + UNIX. + """ + raise NotImplementedError - def create_unix_connection(self, protocol_factory, path, - ssl=None, sock=None, - server_hostname=None): - raise NotImplementedError + def create_unix_connection(self, protocol_factory, path, + ssl=None, sock=None, + server_hostname=None): + raise NotImplementedError - def create_unix_server(self, protocol_factory, path, - sock=None, backlog=100, ssl=None): - """A coroutine which creates a UNIX Domain Socket server. + def create_unix_server(self, protocol_factory, path, + sock=None, backlog=100, ssl=None): + """A coroutine which creates a UNIX Domain Socket server. - The return value is a Server object, which can be used to stop - the service. + The return value is a Server object, which can be used to stop + the service. - path is a str, representing a file systsem path to bind the - server socket to. + path is a str, representing a file systsem path to bind the + server socket to. - sock can optionally be specified in order to use a preexisting - socket object. + sock can optionally be specified in order to use a preexisting + socket object. - backlog is the maximum number of queued connections passed to - listen() (defaults to 100). + backlog is the maximum number of queued connections passed to + listen() (defaults to 100). - ssl can be set to an SSLContext to enable SSL over the - accepted connections. - """ - raise NotImplementedError + ssl can be set to an SSLContext to enable SSL over the + accepted connections. + """ + raise NotImplementedError - def create_datagram_endpoint(self, protocol_factory, - local_addr=None, remote_addr=None, - family=0, proto=0, flags=0): - raise NotImplementedError + def create_datagram_endpoint(self, protocol_factory, + local_addr=None, remote_addr=None, + family=0, proto=0, flags=0): + raise NotImplementedError - # Pipes and subprocesses. + # Pipes and subprocesses. - def connect_read_pipe(self, protocol_factory, pipe): - """Register read pipe in event loop. Set the pipe to non-blocking mode. + def connect_read_pipe(self, protocol_factory, pipe): + """Register read pipe in event loop. Set the pipe to non-blocking mode. - protocol_factory should instantiate object with Protocol interface. - pipe is a file-like object. - Return pair (transport, protocol), where transport supports the - ReadTransport interface.""" - # The reason to accept file-like object instead of just file descriptor - # is: we need to own pipe and close it at transport finishing - # Can got complicated errors if pass f.fileno(), - # close fd in pipe transport then close f and vise versa. - raise NotImplementedError + protocol_factory should instantiate object with Protocol interface. + pipe is a file-like object. + Return pair (transport, protocol), where transport supports the + ReadTransport interface.""" + # The reason to accept file-like object instead of just file descriptor + # is: we need to own pipe and close it at transport finishing + # Can got complicated errors if pass f.fileno(), + # close fd in pipe transport then close f and vise versa. + raise NotImplementedError - def connect_write_pipe(self, protocol_factory, pipe): - """Register write pipe in event loop. - - protocol_factory should instantiate object with BaseProtocol interface. - Pipe is file-like object already switched to nonblocking. - Return pair (transport, protocol), where transport support - WriteTransport interface.""" - # The reason to accept file-like object instead of just file descriptor - # is: we need to own pipe and close it at transport finishing - # Can got complicated errors if pass f.fileno(), - # close fd in pipe transport then close f and vise versa. - raise NotImplementedError + def connect_write_pipe(self, protocol_factory, pipe): + """Register write pipe in event loop. - def subprocess_shell(self, protocol_factory, cmd, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - **kwargs): - raise NotImplementedError + protocol_factory should instantiate object with BaseProtocol interface. + Pipe is file-like object already switched to nonblocking. + Return pair (transport, protocol), where transport support + WriteTransport interface.""" + # The reason to accept file-like object instead of just file descriptor + # is: we need to own pipe and close it at transport finishing + # Can got complicated errors if pass f.fileno(), + # close fd in pipe transport then close f and vise versa. + raise NotImplementedError - def subprocess_exec(self, protocol_factory, *args, **kwargs): - raise NotImplementedError + def subprocess_shell(self, protocol_factory, cmd, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + **kwargs): + raise NotImplementedError - # Ready-based callback registration methods. - # The add_*() methods return None. - # The remove_*() methods return True if something was removed, - # False if there was nothing to delete. + def subprocess_exec(self, protocol_factory, *args, **kwargs): + raise NotImplementedError - def add_reader(self, fd, callback, *args): - raise NotImplementedError + # Ready-based callback registration methods. + # The add_*() methods return None. + # The remove_*() methods return True if something was removed, + # False if there was nothing to delete. - def remove_reader(self, fd): - raise NotImplementedError + def add_reader(self, fd, callback, *args): + raise NotImplementedError - def add_writer(self, fd, callback, *args): - raise NotImplementedError + def remove_reader(self, fd): + raise NotImplementedError - def remove_writer(self, fd): - raise NotImplementedError + def add_writer(self, fd, callback, *args): + raise NotImplementedError - # Completion based I/O methods returning Futures. + def remove_writer(self, fd): + raise NotImplementedError - def sock_recv(self, sock, nbytes): - raise NotImplementedError + # Completion based I/O methods returning Futures. - def sock_sendall(self, sock, data): - raise NotImplementedError + def sock_recv(self, sock, nbytes): + raise NotImplementedError - def sock_connect(self, sock, address): - raise NotImplementedError + def sock_sendall(self, sock, data): + raise NotImplementedError - def sock_accept(self, sock): - raise NotImplementedError + def sock_connect(self, sock, address): + raise NotImplementedError - # Signal handling. + def sock_accept(self, sock): + raise NotImplementedError - def add_signal_handler(self, sig, callback, *args): - raise NotImplementedError + # Signal handling. - def remove_signal_handler(self, sig): - raise NotImplementedError + def add_signal_handler(self, sig, callback, *args): + raise NotImplementedError - # Error handlers. + def remove_signal_handler(self, sig): + raise NotImplementedError - def set_exception_handler(self, handler): - raise NotImplementedError + # Error handlers. - def default_exception_handler(self, context): - raise NotImplementedError + def set_exception_handler(self, handler): + raise NotImplementedError - def call_exception_handler(self, context): - raise NotImplementedError + def default_exception_handler(self, context): + raise NotImplementedError - # Debug flag management. + def call_exception_handler(self, context): + raise NotImplementedError - def get_debug(self): - raise NotImplementedError + # Debug flag management. - def set_debug(self, enabled): - raise NotImplementedError + def get_debug(self): + raise NotImplementedError + + def set_debug(self, enabled): + raise NotImplementedError class AbstractEventLoopPolicy(object): |