author     Yury Selivanov <yury@magic.io>   2018-09-17 19:35:30 -0400
committer  GitHub <noreply@github.com>      2018-09-17 19:35:30 -0400
commit     512d7101098b971837cbb406942215244f636547
tree       47d49f63a1e624497af5e68c07586e4f5e94853c /Doc/library
parent     c63d81b3feaa008a6be4c7c83c324174e8d95c24
bpo-33649: Backport asyncio docs from 'master' to 3.7 (GH-9377)
Diffstat (limited to 'Doc/library')
-rw-r--r--  Doc/library/asyncio-api-index.rst    |  218
-rw-r--r--  Doc/library/asyncio-dev.rst          |  451
-rw-r--r--  Doc/library/asyncio-eventloop.rst    | 1374
-rw-r--r--  Doc/library/asyncio-eventloops.rst   |  244
-rw-r--r--  Doc/library/asyncio-exceptions.rst   |   91
-rw-r--r--  Doc/library/asyncio-future.rst       |  250
-rw-r--r--  Doc/library/asyncio-llapi-index.rst  |  510
-rw-r--r--  Doc/library/asyncio-platforms.rst    |  106
-rw-r--r--  Doc/library/asyncio-policy.rst       |  221
-rw-r--r--  Doc/library/asyncio-protocol.rst     | 1126
-rw-r--r--  Doc/library/asyncio-queue.rst        |  204
-rw-r--r--  Doc/library/asyncio-stream.rst       |  488
-rw-r--r--  Doc/library/asyncio-subprocess.rst   |  475
-rw-r--r--  Doc/library/asyncio-sync.rst         |  362
-rw-r--r--  Doc/library/asyncio-task.rst         | 1022
-rw-r--r--  Doc/library/asyncio.rst              |  106
-rw-r--r--  Doc/library/ipc.rst                  |    9
17 files changed, 4294 insertions, 2963 deletions
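The backported pages document the high-level async/await interface (asyncio.run(), asyncio.create_task(), asyncio.gather(), and related helpers) recommended since Python 3.7. As a quick orientation before the full diff below, here is a minimal sketch of the pattern those pages describe; the coroutine names, delays, and return values are illustrative only:

    import asyncio

    async def fetch(delay, value):
        # Stand-in for an I/O-bound operation (network read, subprocess, etc.).
        await asyncio.sleep(delay)
        return value

    async def main():
        # Run two coroutines concurrently and collect both results in order.
        results = await asyncio.gather(fetch(0.1, "first"), fetch(0.2, "second"))
        print(results)  # ['first', 'second']

    # asyncio.run() creates the event loop, runs main(), and closes the loop.
    asyncio.run(main())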
diff --git a/Doc/library/asyncio-api-index.rst b/Doc/library/asyncio-api-index.rst new file mode 100644 index 0000000000..d5b5659abc --- /dev/null +++ b/Doc/library/asyncio-api-index.rst @@ -0,0 +1,218 @@ +.. currentmodule:: asyncio + + +==================== +High-level API Index +==================== + +This page lists all high-level async/await enabled asyncio APIs. + + +Tasks +===== + +Utilities to run asyncio programs, create Tasks, and +await on multiple things with timeouts. + +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - :func:`run` + - Create event loop, run a coroutine, close the loop. + + * - :func:`create_task` + - Start an asyncio Task. + + * - ``await`` :func:`sleep` + - Sleep for a number of seconds. + + * - ``await`` :func:`gather` + - Schedule and wait for things concurrently. + + * - ``await`` :func:`wait_for` + - Run with a timeout. + + * - ``await`` :func:`shield` + - Shield from cancellation. + + * - ``await`` :func:`wait` + - Monitor for completion. + + * - :func:`current_task` + - Return the current Task. + + * - :func:`all_tasks` + - Return all tasks for an event loop. + + * - :class:`Task` + - Task object. + + * - :func:`run_coroutine_threadsafe` + - Schedule a coroutine from another OS thread. + + * - ``for in`` :func:`as_completed` + - Monitor for completion with a ``for`` loop. + + +.. rubric:: Examples + +* :ref:`Using asyncio.gather() to run things in parallel + <asyncio_example_gather>`. + +* :ref:`Using asyncio.wait_for() to enforce a timeout + <asyncio_example_waitfor>`. + +* :ref:`Cancellation <asyncio_example_task_cancel>`. + +* :ref:`Using asyncio.sleep() <asyncio_example_sleep>`. + +* See also the main :ref:`Tasks documentation page <coroutine>`. + + +Queues +====== + +Queues should be used to distribute work amongst multiple asyncio Tasks, +implement connection pools, and pub/sub patterns. + + +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - :class:`Queue` + - A FIFO queue. + + * - :class:`PriorityQueue` + - A priority queue. + + * - :class:`LifoQueue` + - A LIFO queue. + + +.. rubric:: Examples + +* :ref:`Using asyncio.Queue to distribute workload between several + Tasks <asyncio_example_queue_dist>`. + +* See also the :ref:`Queues documentation page <asyncio-queues>`. + + +Subprocesses +============ + +Utilities to spawn subprocesses and run shell commands. + +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - ``await`` :func:`create_subprocess_exec` + - Create a subprocess. + + * - ``await`` :func:`create_subprocess_shell` + - Run a shell command. + + +.. rubric:: Examples + +* :ref:`Executing a shell command <asyncio_example_subprocess_shell>`. + +* See also the :ref:`subprocess APIs <asyncio-subprocess>` + documentation. + + +Streams +======= + +High-level APIs to work with network IO. + +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - ``await`` :func:`open_connection` + - Establish a TCP connection. + + * - ``await`` :func:`open_unix_connection` + - Establish a Unix socket connection. + + * - ``await`` :func:`start_server` + - Start a TCP server. + + * - ``await`` :func:`start_unix_server` + - Start a Unix socket server. + + * - :class:`StreamReader` + - High-level async/await object to receive network data. + + * - :class:`StreamWriter` + - High-level async/await object to send network data. + + +.. rubric:: Examples + +* :ref:`Example TCP client <asyncio_example_stream>`. + +* See also the :ref:`streams APIs <asyncio-streams>` + documentation. 
+ + +Synchronization +=============== + +Threading-like synchronization primitives that can be used in Tasks. + +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - :class:`Lock` + - A mutex lock. + + * - :class:`Event` + - An event object. + + * - :class:`Condition` + - A condition object. + + * - :class:`Semaphore` + - A semaphore. + + * - :class:`BoundedSemaphore` + - A bounded semaphore. + + +.. rubric:: Examples + +* :ref:`Using asyncio.Event <asyncio_example_sync_event>`. + +* See also the documentation of asyncio + :ref:`synchronization primitives <asyncio-sync>`. + + +Exceptions +========== + +.. list-table:: + :widths: 50 50 + :class: full-width-table + + + * - :exc:`asyncio.TimeoutError` + - Raised on timeout by functions like :func:`wait_for`. + Keep in mind that ``asyncio.TimeoutError`` is **unrelated** + to the built-in :exc:`TimeoutError` exception. + + * - :exc:`asyncio.CancelledError` + - Raised when a Task is cancelled. See also :meth:`Task.cancel`. + + +.. rubric:: Examples + +* :ref:`Handling CancelledError to run code on cancellation request + <asyncio_example_task_cancel>`. + +* See also the full list of + :ref:`asyncio-specific exceptions <asyncio-exceptions>`. diff --git a/Doc/library/asyncio-dev.rst b/Doc/library/asyncio-dev.rst index 100fff561c..5f926fceb2 100644 --- a/Doc/library/asyncio-dev.rst +++ b/Doc/library/asyncio-dev.rst @@ -2,415 +2,236 @@ .. _asyncio-dev: -Develop with asyncio -==================== +======================= +Developing with asyncio +======================= -Asynchronous programming is different than classical "sequential" programming. -This page lists common traps and explains how to avoid them. +Asynchronous programming is different from classic "sequential" +programming. +This page lists common mistakes and traps and explains how +to avoid them. -.. _asyncio-debug-mode: - -Debug mode of asyncio ---------------------- - -The implementation of :mod:`asyncio` has been written for performance. -In order to ease the development of asynchronous code, you may wish to -enable *debug mode*. -To enable all debug checks for an application: +.. _asyncio-debug-mode: -* Enable the asyncio debug mode globally by setting the environment variable - :envvar:`PYTHONASYNCIODEBUG` to ``1``, using ``-X dev`` command line option - (see the :option:`-X` option), or by calling - :meth:`AbstractEventLoop.set_debug`. -* Set the log level of the :ref:`asyncio logger <asyncio-logger>` to - :py:data:`logging.DEBUG`. For example, call - ``logging.basicConfig(level=logging.DEBUG)`` at startup. -* Configure the :mod:`warnings` module to display :exc:`ResourceWarning` - warnings. For example, use the ``-Wdefault`` command line option of Python to - display them. +Debug Mode +========== -Examples debug checks: +By default asyncio runs in production mode. In order to ease +the development asyncio has a *debug mode*. -* Log :ref:`coroutines defined but never "yielded from" - <asyncio-coroutine-not-scheduled>` -* :meth:`~AbstractEventLoop.call_soon` and :meth:`~AbstractEventLoop.call_at` methods - raise an exception if they are called from the wrong thread. -* Log the execution time of the selector -* Log callbacks taking more than 100 ms to be executed. The - :attr:`AbstractEventLoop.slow_callback_duration` attribute is the minimum - duration in seconds of "slow" callbacks. -* :exc:`ResourceWarning` warnings are emitted when transports and event loops - are :ref:`not closed explicitly <asyncio-close-transports>`. 
+There are several ways to enable asyncio debug mode: -.. versionchanged:: 3.7 +* Setting the :envvar:`PYTHONASYNCIODEBUG` environment variable to ``1``. - The new ``-X dev`` command line option can now also be used to enable - the debug mode. +* Using the :option:`-X` ``dev`` Python command line option. -.. seealso:: +* Passing ``debug=True`` to :func:`asyncio.run`. - The :meth:`AbstractEventLoop.set_debug` method and the :ref:`asyncio logger - <asyncio-logger>`. +* Calling :meth:`loop.set_debug`. +In addition to enabling the debug mode, consider also: -Cancellation ------------- +* setting the log level of the :ref:`asyncio logger <asyncio-logger>` to + :py:data:`logging.DEBUG`, for example the following snippet of code + can be run at startup of the application:: -Cancellation of tasks is not common in classic programming. In asynchronous -programming, not only is it something common, but you have to prepare your -code to handle it. + logging.basicConfig(level=logging.DEBUG) -Futures and tasks can be cancelled explicitly with their :meth:`Future.cancel` -method. The :func:`wait_for` function cancels the waited task when the timeout -occurs. There are many other cases where a task can be cancelled indirectly. +* configuring the :mod:`warnings` module to display + :exc:`ResourceWarning` warnings. One way of doing that is by + using the :option:`-W` ``default`` command line option. -Don't call :meth:`~Future.set_result` or :meth:`~Future.set_exception` method -of :class:`Future` if the future is cancelled: it would fail with an exception. -For example, write:: - if not fut.cancelled(): - fut.set_result('done') +When the debug mode is enabled: -Don't schedule directly a call to the :meth:`~Future.set_result` or the -:meth:`~Future.set_exception` method of a future with -:meth:`AbstractEventLoop.call_soon`: the future can be cancelled before its method -is called. +* asyncio checks for :ref:`coroutines that were not awaited + <asyncio-coroutine-not-scheduled>` and logs them; this mitigates + the "forgotten await" pitfall. -If you wait for a future, you should check early if the future was cancelled to -avoid useless operations. Example:: +* Many non-treadsafe asyncio APIs (such as :meth:`loop.call_soon` and + :meth:`loop.call_at` methods) raise an exception if they are called + from a wrong thread. - async def slow_operation(fut): - if fut.cancelled(): - return - # ... slow computation ... - await fut - # ... +* The execution time of the I/O selector is logged if it takes too long to + perform an I/O operation. -The :func:`shield` function can also be used to ignore cancellation. +* Callbacks taking longer than 100ms are logged. The + :attr:`loop.slow_callback_duration` attribute can be used to set the + minimum execution duration in seconds that is considered "slow". .. _asyncio-multithreading: -Concurrency and multithreading ------------------------------- +Concurrency and Multithreading +============================== -An event loop runs in a thread and executes all callbacks and tasks in the same -thread. While a task is running in the event loop, no other task is running in -the same thread. But when the task uses ``await``, the task is suspended -and the event loop executes the next task. +An event loop runs in a thread (typically the main thread) and executes +all callbacks and Tasks in its thread. While a Task is running in the +event loop, no other Tasks can run in the same thread. 
When a Task +executes an ``await`` expression, the running Task gets suspended, and +the event loop executes the next Task. -To schedule a callback from a different thread, the -:meth:`AbstractEventLoop.call_soon_threadsafe` method should be used. Example:: +To schedule a callback from a different OS thread, the +:meth:`loop.call_soon_threadsafe` method should be used. Example:: loop.call_soon_threadsafe(callback, *args) -Most asyncio objects are not thread safe. You should only worry if you access -objects outside the event loop. For example, to cancel a future, don't call -directly its :meth:`Future.cancel` method, but:: +Almost all asyncio objects are not thread safe, which is typically +not a problem unless there is code that works with them from outside +of a Task or a callback. If there's a need for such code to call a +low-level asyncio API, the :meth:`loop.call_soon_threadsafe` method +should be used, e.g.:: loop.call_soon_threadsafe(fut.cancel) -To handle signals and to execute subprocesses, the event loop must be run in -the main thread. - -To schedule a coroutine object from a different thread, the +To schedule a coroutine object from a different OS thread, the :func:`run_coroutine_threadsafe` function should be used. It returns a :class:`concurrent.futures.Future` to access the result:: - future = asyncio.run_coroutine_threadsafe(coro_func(), loop) - result = future.result(timeout) # Wait for the result with a timeout - -The :meth:`AbstractEventLoop.run_in_executor` method can be used with a thread pool -executor to execute a callback in different thread to not block the thread of -the event loop. - -.. seealso:: + async def coro_func(): + return await asyncio.sleep(1, 42) - The :ref:`Synchronization primitives <asyncio-sync>` section describes ways - to synchronize tasks. + # Later in another OS thread: - The :ref:`Subprocess and threads <asyncio-subprocess-threads>` section lists - asyncio limitations to run subprocesses from different threads. + future = asyncio.run_coroutine_threadsafe(coro_func(), loop) + # Wait for the result: + result = future.result() +To handle signals and to execute subprocesses, the event loop must be +run in the main thread. +The :meth:`loop.run_in_executor` method can be used with a +:class:`concurrent.futures.ThreadPoolExecutor` to execute +blocking code in a different OS thread without blocking the OS thread +that the event loop runs in. .. _asyncio-handle-blocking: -Handle blocking functions correctly ------------------------------------ - -Blocking functions should not be called directly. For example, if a function -blocks for 1 second, other tasks are delayed by 1 second which can have an -important impact on reactivity. +Running Blocking Code +===================== -For networking and subprocesses, the :mod:`asyncio` module provides high-level -APIs like :ref:`protocols <asyncio-protocol>`. +Blocking (CPU-bound) code should not be called directly. For example, +if a function performs a CPU-intensive calculation for 1 second, +all concurrent asyncio Tasks and IO operations would be delayed +by 1 second. -An executor can be used to run a task in a different thread or even in a -different process, to not block the thread of the event loop. See the -:meth:`AbstractEventLoop.run_in_executor` method. - -.. seealso:: - - The :ref:`Delayed calls <asyncio-delayed-calls>` section details how the - event loop handles time. 
+An executor can be used to run a task in a different thread or even in +a different process to avoid blocking block the OS thread with the +event loop. See the :meth:`loop.run_in_executor` method for more +details. .. _asyncio-logger: Logging -------- - -The :mod:`asyncio` module logs information with the :mod:`logging` module in -the logger ``'asyncio'``. +======= -The default log level for the :mod:`asyncio` module is :py:data:`logging.INFO`. -For those not wanting such verbosity from :mod:`asyncio` the log level can -be changed. For example, to change the level to :py:data:`logging.WARNING`: +asyncio uses the :mod:`logging` module and all logging is performed +via the ``"asyncio"`` logger. -.. code-block:: none +The default log level is :py:data:`logging.INFO`, which can be easily +adjusted:: - logging.getLogger('asyncio').setLevel(logging.WARNING) + logging.getLogger("asyncio").setLevel(logging.WARNING) .. _asyncio-coroutine-not-scheduled: -Detect coroutine objects never scheduled ----------------------------------------- +Detect never-awaited coroutines +=============================== -When a coroutine function is called and its result is not passed to -:func:`ensure_future` or to the :meth:`AbstractEventLoop.create_task` method, -the execution of the coroutine object will never be scheduled which is -probably a bug. :ref:`Enable the debug mode of asyncio <asyncio-debug-mode>` -to :ref:`log a warning <asyncio-logger>` to detect it. - -Example with the bug:: +When a coroutine function is called, but not awaited +(e.g. ``coro()`` instead of ``await coro()``) +or the coroutine is not scheduled with :meth:`asyncio.create_task`, asyncio +will emit a :exc:`RuntimeWarning`:: import asyncio async def test(): print("never scheduled") + async def main(): + test() + + asyncio.run(main()) + +Output:: + + test.py:7: RuntimeWarning: coroutine 'test' was never awaited test() Output in debug mode:: - Coroutine test() at test.py:3 was never yielded from - Coroutine object created at (most recent call last): - File "test.py", line 7, in <module> - test() + test.py:7: RuntimeWarning: coroutine 'test' was never awaited + Coroutine created at (most recent call last) + File "../t.py", line 9, in <module> + asyncio.run(main(), debug=True) + + < .. > -The fix is to call the :func:`ensure_future` function or the -:meth:`AbstractEventLoop.create_task` method with the coroutine object. + File "../t.py", line 7, in main + test() + test() -.. seealso:: +The usual fix is to either await the coroutine or call the +:meth:`asyncio.create_task` function:: - :ref:`Pending task destroyed <asyncio-pending-task-destroyed>`. + async def main(): + await test() -Detect exceptions never consumed --------------------------------- +Detect never-retrieved exceptions +================================= -Python usually calls :func:`sys.excepthook` on unhandled exceptions. If -:meth:`Future.set_exception` is called, but the exception is never consumed, -:func:`sys.excepthook` is not called. Instead, :ref:`a log is emitted -<asyncio-logger>` when the future is deleted by the garbage collector, with the -traceback where the exception was raised. +If a :meth:`Future.set_exception` is called but the Future object is +never awaited on, the exception would never be propagated to the +user code. In this case, asyncio would emit a log message when the +Future object is garbage collected. 
-Example of unhandled exception:: +Example of an unhandled exception:: import asyncio - @asyncio.coroutine - def bug(): + async def bug(): raise Exception("not consumed") - loop = asyncio.get_event_loop() - asyncio.ensure_future(bug()) - loop.run_forever() - loop.close() + async def main(): + asyncio.create_task(bug()) + + asyncio.run(main()) Output:: Task exception was never retrieved - future: <Task finished coro=<coro() done, defined at asyncio/coroutines.py:139> exception=Exception('not consumed',)> - Traceback (most recent call last): - File "asyncio/tasks.py", line 237, in _step - result = next(coro) - File "asyncio/coroutines.py", line 141, in coro - res = func(*args, **kw) - File "test.py", line 5, in bug - raise Exception("not consumed") - Exception: not consumed - -:ref:`Enable the debug mode of asyncio <asyncio-debug-mode>` to get the -traceback where the task was created. Output in debug mode:: + future: <Task finished coro=<bug() done, defined at test.py:3> + exception=Exception('not consumed')> - Task exception was never retrieved - future: <Task finished coro=<bug() done, defined at test.py:3> exception=Exception('not consumed',) created at test.py:8> - source_traceback: Object created at (most recent call last): - File "test.py", line 8, in <module> - asyncio.ensure_future(bug()) Traceback (most recent call last): - File "asyncio/tasks.py", line 237, in _step - result = next(coro) - File "asyncio/coroutines.py", line 79, in __next__ - return next(self.gen) - File "asyncio/coroutines.py", line 141, in coro - res = func(*args, **kw) - File "test.py", line 5, in bug + File "test.py", line 4, in bug raise Exception("not consumed") Exception: not consumed -There are different options to fix this issue. The first option is to chain the -coroutine in another coroutine and use classic try/except:: - - async def handle_exception(): - try: - await bug() - except Exception: - print("exception consumed") - - loop = asyncio.get_event_loop() - asyncio.ensure_future(handle_exception()) - loop.run_forever() - loop.close() - -Another option is to use the :meth:`AbstractEventLoop.run_until_complete` -function:: - - task = asyncio.ensure_future(bug()) - try: - loop.run_until_complete(task) - except Exception: - print("exception consumed") - -.. seealso:: - - The :meth:`Future.exception` method. - - -Chain coroutines correctly --------------------------- - -When a coroutine function calls other coroutine functions and tasks, they -should be chained explicitly with ``await``. Otherwise, the execution is -not guaranteed to be sequential. +:ref:`Enable the debug mode <asyncio-debug-mode>` to get the +traceback where the task was created:: -Example with different bugs using :func:`asyncio.sleep` to simulate slow -operations:: + asyncio.run(main(), debug=True) - import asyncio - - async def create(): - await asyncio.sleep(3.0) - print("(1) create file") - - async def write(): - await asyncio.sleep(1.0) - print("(2) write into file") - - async def close(): - print("(3) close file") - - async def test(): - asyncio.ensure_future(create()) - asyncio.ensure_future(write()) - asyncio.ensure_future(close()) - await asyncio.sleep(2.0) - loop.stop() - - loop = asyncio.get_event_loop() - asyncio.ensure_future(test()) - loop.run_forever() - print("Pending tasks at exit: %s" % asyncio.Task.all_tasks(loop)) - loop.close() - -Expected output: - -.. code-block:: none - - (1) create file - (2) write into file - (3) close file - Pending tasks at exit: set() - -Actual output: - -.. 
code-block:: none - - (3) close file - (2) write into file - Pending tasks at exit: {<Task pending create() at test.py:7 wait_for=<Future pending cb=[Task._wakeup()]>>} - Task was destroyed but it is pending! - task: <Task pending create() done at test.py:5 wait_for=<Future pending cb=[Task._wakeup()]>> - -The loop stopped before the ``create()`` finished, ``close()`` has been called -before ``write()``, whereas coroutine functions were called in this order: -``create()``, ``write()``, ``close()``. - -To fix the example, tasks must be marked with ``await``:: - - async def test(): - await asyncio.ensure_future(create()) - await asyncio.ensure_future(write()) - await asyncio.ensure_future(close()) - await asyncio.sleep(2.0) - loop.stop() - -Or without ``asyncio.ensure_future()``:: - - async def test(): - await create() - await write() - await close() - await asyncio.sleep(2.0) - loop.stop() - - -.. _asyncio-pending-task-destroyed: - -Pending task destroyed ----------------------- - -If a pending task is destroyed, the execution of its wrapped :ref:`coroutine -<coroutine>` did not complete. It is probably a bug and so a warning is logged. - -Example of log: - -.. code-block:: none - - Task was destroyed but it is pending! - task: <Task pending coro=<kill_me() done, defined at test.py:5> wait_for=<Future pending cb=[Task._wakeup()]>> - -:ref:`Enable the debug mode of asyncio <asyncio-debug-mode>` to get the -traceback where the task was created. Example of log in debug mode: +Output in debug mode:: -.. code-block:: none + Task exception was never retrieved + future: <Task finished coro=<bug() done, defined at test.py:3> + exception=Exception('not consumed') created at asyncio/tasks.py:321> - Task was destroyed but it is pending! source_traceback: Object created at (most recent call last): - File "test.py", line 15, in <module> - task = asyncio.ensure_future(coro, loop=loop) - task: <Task pending coro=<kill_me() done, defined at test.py:5> wait_for=<Future pending cb=[Task._wakeup()] created at test.py:7> created at test.py:15> - - -.. seealso:: - - :ref:`Detect coroutine objects never scheduled <asyncio-coroutine-not-scheduled>`. - -.. _asyncio-close-transports: + File "../t.py", line 9, in <module> + asyncio.run(main(), debug=True) -Close transports and event loops --------------------------------- + < .. > -When a transport is no more needed, call its ``close()`` method to release -resources. Event loops must also be closed explicitly. - -If a transport or an event loop is not closed explicitly, a -:exc:`ResourceWarning` warning will be emitted in its destructor. By default, -:exc:`ResourceWarning` warnings are ignored. The :ref:`Debug mode of asyncio -<asyncio-debug-mode>` section explains how to display them. + Traceback (most recent call last): + File "../t.py", line 4, in bug + raise Exception("not consumed") + Exception: not consumed diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index 317f3fb85c..3b13a81a5b 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -1,103 +1,159 @@ .. currentmodule:: asyncio -.. _asyncio-event-loop: -Base Event Loop -=============== +========== +Event Loop +========== -**Source code:** :source:`Lib/asyncio/events.py` -The event loop is the central execution device provided by :mod:`asyncio`. -It provides multiple facilities, including: +.. rubric:: Preface -* Registering, executing and cancelling delayed calls (timeouts). +The event loop is the core of every asyncio application. 
+Event loops run asynchronous tasks and callbacks, perform network +IO operations, and run subprocesses. -* Creating client and server :ref:`transports <asyncio-transport>` for various - kinds of communication. +Application developers should typically use the high-level asyncio functions, +such as :func:`asyncio.run`, and should rarely need to reference the loop +object or call its methods. This section is intended mostly for authors +of lower-level code, libraries, and frameworks, who need finer control over +the event loop behavior. -* Launching subprocesses and the associated :ref:`transports - <asyncio-transport>` for communication with an external program. +.. rubric:: Obtaining the Event Loop -* Delegating costly function calls to a pool of threads. +The following low-level functions can be used to get, set, or create +an event loop: -.. class:: BaseEventLoop +.. function:: get_running_loop() - This class is an implementation detail. It is a subclass of - :class:`AbstractEventLoop` and may be a base class of concrete - event loop implementations found in :mod:`asyncio`. It should not - be used directly; use :class:`AbstractEventLoop` instead. - ``BaseEventLoop`` should not be subclassed by third-party code; the - internal interface is not stable. + Return the running event loop in the current OS thread. -.. class:: AbstractEventLoop + If there is no running event loop a :exc:`RuntimeError` is raised. + This function can only be called from a coroutine or a callback. - Abstract base class of event loops. + .. versionadded:: 3.7 - This class is :ref:`not thread safe <asyncio-multithreading>`. +.. function:: get_event_loop() -Run an event loop ------------------ + Get the current event loop. If there is no current event loop set + in the current OS thread and :func:`set_event_loop` has not yet + been called, asyncio will create a new event loop and set it as the + current one. -.. method:: AbstractEventLoop.run_forever() + Because this function has rather complex behavior (especially + when custom event loop policies are in use), using the + :func:`get_running_loop` function is preferred to :func:`get_event_loop` + in coroutines and callbacks. - Run until :meth:`stop` is called. If :meth:`stop` is called before - :meth:`run_forever()` is called, this polls the I/O selector once - with a timeout of zero, runs all callbacks scheduled in response to - I/O events (and those that were already scheduled), and then exits. - If :meth:`stop` is called while :meth:`run_forever` is running, - this will run the current batch of callbacks and then exit. Note - that callbacks scheduled by callbacks will not run in that case; - they will run the next time :meth:`run_forever` is called. + Consider also using the :func:`asyncio.run` function instead of using + lower level functions to manually create and close an event loop. - .. versionchanged:: 3.5.1 +.. function:: set_event_loop(loop) -.. method:: AbstractEventLoop.run_until_complete(future) + Set *loop* as a current event loop for the current OS thread. - Run until the :class:`Future` is done. +.. function:: new_event_loop() - If the argument is a :ref:`coroutine object <coroutine>`, it is wrapped by - :func:`ensure_future`. + Create a new event loop object. - Return the Future's result, or raise its exception. +Note that the behaviour of :func:`get_event_loop`, :func:`set_event_loop`, +and :func:`new_event_loop` functions can be altered by +:ref:`setting a custom event loop policy <asyncio-policies>`. -.. 
method:: AbstractEventLoop.is_running() - Returns running status of event loop. +.. rubric:: Contents -.. method:: AbstractEventLoop.stop() +This documentation page contains the following sections: - Stop running the event loop. +* The `Event Loop Methods`_ section is the reference documentation of + the event loop APIs; - This causes :meth:`run_forever` to exit at the next suitable - opportunity (see there for more details). +* The `Callback Handles`_ section documents the :class:`Handle` and + :class:`TimerHandle` instances which are returned from scheduling + methods such as :meth:`loop.call_soon` and :meth:`loop.call_later`; - .. versionchanged:: 3.5.1 +* The `Server Objects`_ section documents types returned from + event loop methods like :meth:`loop.create_server`; + +* The `Event Loop Implementations`_ section documents the + :class:`SelectorEventLoop` and :class:`ProactorEventLoop` classes; + +* The `Examples`_ section showcases how to work with some event + loop APIs. + + +.. _asyncio-event-loop: + +Event Loop Methods +================== + +Event loops have **low-level** APIs for the following: + +.. contents:: + :depth: 1 + :local: + + +Running and stopping the loop +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. method:: loop.run_until_complete(future) + + Run until the *future* (an instance of :class:`Future`) has + completed. + + If the argument is a :ref:`coroutine object <coroutine>` it + is implicitly scheduled to run as a :class:`asyncio.Task`. + + Return the Future's result or raise its exception. + +.. method:: loop.run_forever() + + Run the event loop until :meth:`stop` is called. + + If :meth:`stop` is called before :meth:`run_forever()` is called, + the loop will poll the I/O selector once with a timeout of zero, + run all callbacks scheduled in response to I/O events (and + those that were already scheduled), and then exit. + + If :meth:`stop` is called while :meth:`run_forever` is running, + the loop will run the current batch of callbacks and then exit. + Note that new callbacks scheduled by callbacks will not run in this + case; instead, they will run the next time :meth:`run_forever` or + :meth:`run_until_complete` is called. + +.. method:: loop.stop() + + Stop the event loop. -.. method:: AbstractEventLoop.is_closed() +.. method:: loop.is_running() - Returns ``True`` if the event loop was closed. + Return ``True`` if the event loop is currently running. - .. versionadded:: 3.4.2 +.. method:: loop.is_closed() -.. method:: AbstractEventLoop.close() + Return ``True`` if the event loop was closed. - Close the event loop. The loop must not be running. Pending - callbacks will be lost. +.. method:: loop.close() - This clears the queues and shuts down the executor, but does not wait for - the executor to finish. + Close the event loop. - This is idempotent and irreversible. No other methods should be called after - this one. + The loop must be running when this function is called. + Any pending callbacks will be discarded. + This method clears all queues and shuts down the executor, but does + not wait for the executor to finish. -.. coroutinemethod:: AbstractEventLoop.shutdown_asyncgens() + This method is idempotent and irreversible. No other methods + should be called after the event loop is closed. + +.. coroutinemethod:: loop.shutdown_asyncgens() Schedule all currently open :term:`asynchronous generator` objects to close with an :meth:`~agen.aclose()` call. After calling this method, - the event loop will issue a warning whenever a new asynchronous generator - is iterated. 
Should be used to finalize all scheduled asynchronous - generators reliably. Example:: + the event loop will issue a warning if a new asynchronous generator + is iterated. This should be used to reliably finalize all scheduled + asynchronous generators, e.g.:: + try: loop.run_forever() @@ -108,232 +164,223 @@ Run an event loop .. versionadded:: 3.6 -.. _asyncio-pass-keywords: - -Calls ------ - -Most :mod:`asyncio` functions don't accept keywords. If you want to pass -keywords to your callback, use :func:`functools.partial`. For example, -``loop.call_soon(functools.partial(print, "Hello", flush=True))`` will call -``print("Hello", flush=True)``. - -.. note:: - :func:`functools.partial` is better than ``lambda`` functions, because - :mod:`asyncio` can inspect :func:`functools.partial` object to display - parameters in debug mode, whereas ``lambda`` functions have a poor - representation. +Scheduling callbacks +^^^^^^^^^^^^^^^^^^^^ -.. method:: AbstractEventLoop.call_soon(callback, *args, context=None) +.. method:: loop.call_soon(callback, *args, context=None) - Arrange for a callback to be called as soon as possible. The callback is - called after :meth:`call_soon` returns, when control returns to the event - loop. + Schedule a *callback* to be called with *args* arguments at + the next iteration of the event loop. - This operates as a :abbr:`FIFO (first-in, first-out)` queue, callbacks - are called in the order in which they are registered. Each callback - will be called exactly once. + Callbacks are called in the order in which they are registered. + Each callback will be called exactly once. - Any positional arguments after the callback will be passed to the - callback when it is called. - - An optional keyword-only *context* argument allows specifying a custom - :class:`contextvars.Context` for the *callback* to run in. The current - context is used when no *context* is provided. + An optional keyword-only *context* argument allows specifying a + custom :class:`contextvars.Context` for the *callback* to run in. + The current context is used when no *context* is provided. An instance of :class:`asyncio.Handle` is returned, which can be - used to cancel the callback. - - :ref:`Use functools.partial to pass keywords to the callback - <asyncio-pass-keywords>`. + used later to cancel the callback. - .. versionchanged:: 3.7 - The *context* keyword-only parameter was added. See :pep:`567` - for more details. + This method is not thread-safe. -.. method:: AbstractEventLoop.call_soon_threadsafe(callback, *args, context=None) +.. method:: loop.call_soon_threadsafe(callback, *args, context=None) - Like :meth:`call_soon`, but thread safe. + A thread-safe variant of :meth:`call_soon`. Must be used to + schedule callbacks *from another thread*. See the :ref:`concurrency and multithreading <asyncio-multithreading>` section of the documentation. - .. versionchanged:: 3.7 - The *context* keyword-only parameter was added. See :pep:`567` - for more details. +.. versionchanged:: 3.7 + The *context* keyword-only parameter was added. See :pep:`567` + for more details. +.. _asyncio-pass-keywords: -.. _asyncio-delayed-calls: +.. note:: -Delayed calls -------------- + Most :mod:`asyncio` scheduling functions don't allow passing + keyword arguments. To do that, use :func:`functools.partial`:: -The event loop has its own internal clock for computing timeouts. -Which clock is used depends on the (platform-specific) event loop -implementation; ideally it is a monotonic clock. 
This will generally be -a different clock than :func:`time.time`. + # will schedule "print("Hello", flush=True)" + loop.call_soon( + functools.partial(print, "Hello", flush=True)) -.. note:: + Using partial objects is usually more convenient than using lambdas, + as asyncio can render partial objects better in debug and error + messages. - Timeouts (relative *delay* or absolute *when*) should not exceed one day. +.. _asyncio-delayed-calls: + +Scheduling delayed callbacks +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -.. method:: AbstractEventLoop.call_later(delay, callback, *args, context=None) +Event loop provides mechanisms to schedule callback functions +to be called at some point in the future. Event loop uses monotonic +clocks to track time. - Arrange for the *callback* to be called after the given *delay* - seconds (either an int or float). - An instance of :class:`asyncio.TimerHandle` is returned, which can be - used to cancel the callback. +.. method:: loop.call_later(delay, callback, *args, context=None) - *callback* will be called exactly once per call to :meth:`call_later`. - If two callbacks are scheduled for exactly the same time, it is - undefined which will be called first. + Schedule *callback* to be called after the given *delay* + number of seconds (can be either an int or a float). - The optional positional *args* will be passed to the callback when it - is called. If you want the callback to be called with some named - arguments, use a closure or :func:`functools.partial`. + An instance of :class:`asyncio.TimerHandle` is returned which can + be used to cancel the callback. - An optional keyword-only *context* argument allows specifying a custom - :class:`contextvars.Context` for the *callback* to run in. The current - context is used when no *context* is provided. + *callback* will be called exactly once. If two callbacks are + scheduled for exactly the same time, the order in which they + are called is undefined. - :ref:`Use functools.partial to pass keywords to the callback - <asyncio-pass-keywords>`. + The optional positional *args* will be passed to the callback when + it is called. If you want the callback to be called with keyword + arguments use :func:`functools.partial`. + + An optional keyword-only *context* argument allows specifying a + custom :class:`contextvars.Context` for the *callback* to run in. + The current context is used when no *context* is provided. .. versionchanged:: 3.7 The *context* keyword-only parameter was added. See :pep:`567` for more details. -.. method:: AbstractEventLoop.call_at(when, callback, *args, context=None) + .. versionchanged:: 3.7.1 + In Python 3.7.0 and earlier with the default event loop implementation, + the *delay* could not exceed one day. + This has been fixed in Python 3.7.1. - Arrange for the *callback* to be called at the given absolute timestamp - *when* (an int or float), using the same time reference as - :meth:`AbstractEventLoop.time`. +.. method:: loop.call_at(when, callback, *args, context=None) - This method's behavior is the same as :meth:`call_later`. + Schedule *callback* to be called at the given absolute timestamp + *when* (an int or a float), using the same time reference as + :meth:`loop.time`. - An instance of :class:`asyncio.TimerHandle` is returned, which can be - used to cancel the callback. + This method's behavior is the same as :meth:`call_later`. - :ref:`Use functools.partial to pass keywords to the callback - <asyncio-pass-keywords>`. 
+ An instance of :class:`asyncio.TimerHandle` is returned which can + be used to cancel the callback. .. versionchanged:: 3.7 The *context* keyword-only parameter was added. See :pep:`567` for more details. -.. method:: AbstractEventLoop.time() + .. versionchanged:: 3.7.1 + In Python 3.7.0 and earlier with the default event loop implementation, + the difference between *when* and the current time could not exceed + one day. This has been fixed in Python 3.7.1. - Return the current time, as a :class:`float` value, according to the - event loop's internal clock. +.. method:: loop.time() -.. seealso:: + Return the current time, as a :class:`float` value, according to + the event loop's internal monotonic clock. - The :func:`asyncio.sleep` function. +.. note:: + Timeouts (relative *delay* or absolute *when*) should not + exceed one day. -Futures -------- +.. seealso:: -.. method:: AbstractEventLoop.create_future() + The :func:`asyncio.sleep` function. - Create an :class:`asyncio.Future` object attached to the loop. - This is a preferred way to create futures in asyncio, as event - loop implementations can provide alternative implementations - of the Future class (with better performance or instrumentation). +Creating Futures and Tasks +^^^^^^^^^^^^^^^^^^^^^^^^^^ - .. versionadded:: 3.5.2 +.. method:: loop.create_future() + Create an :class:`asyncio.Future` object attached to the event loop. -Tasks ------ + This is the preferred way to create Futures in asyncio. This lets + third-party event loops provide alternative implementations of + the Future object (with better performance or instrumentation). -.. method:: AbstractEventLoop.create_task(coro) + .. versionadded:: 3.5.2 - Schedule the execution of a :ref:`coroutine object <coroutine>`: wrap it in - a future. Return a :class:`Task` object. +.. method:: loop.create_task(coro) - Third-party event loops can use their own subclass of :class:`Task` for - interoperability. In this case, the result type is a subclass of - :class:`Task`. + Schedule the execution of a :ref:`coroutine`. + Return a :class:`Task` object. - .. versionadded:: 3.4.2 + Third-party event loops can use their own subclass of :class:`Task` + for interoperability. In this case, the result type is a subclass + of :class:`Task`. -.. method:: AbstractEventLoop.set_task_factory(factory) +.. method:: loop.set_task_factory(factory) Set a task factory that will be used by - :meth:`AbstractEventLoop.create_task`. + :meth:`loop.create_task`. If *factory* is ``None`` the default task factory will be set. + Otherwise, *factory* must be a *callable* with the signature matching + ``(loop, coro)``, where *loop* is a reference to the active + event loop, and *coro* is a coroutine object. The callable + must return a :class:`asyncio.Future`-compatible object. - If *factory* is a *callable*, it should have a signature matching - ``(loop, coro)``, where *loop* will be a reference to the active - event loop, *coro* will be a coroutine object. The callable - must return an :class:`asyncio.Future` compatible object. +.. method:: loop.get_task_factory() - .. versionadded:: 3.4.4 + Return a task factory or ``None`` if the default one is in use. -.. method:: AbstractEventLoop.get_task_factory() - Return a task factory, or ``None`` if the default one is in use. +Opening network connections +^^^^^^^^^^^^^^^^^^^^^^^^^^^ - .. versionadded:: 3.4.4 +.. 
coroutinemethod:: loop.create_connection(protocol_factory, \ + host=None, port=None, \*, ssl=None, \ + family=0, proto=0, flags=0, sock=None, \ + local_addr=None, server_hostname=None, \ + ssl_handshake_timeout=None) + Open a streaming transport connection to a given + address specified by *host* and *port*. -Creating connections --------------------- + The socket family can be either :py:data:`~socket.AF_INET` or + :py:data:`~socket.AF_INET6` depending on *host* (or the *family* + argument, if provided). -.. coroutinemethod:: AbstractEventLoop.create_connection(protocol_factory, host=None, port=None, \*, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None) + The socket type will be :py:data:`~socket.SOCK_STREAM`. - Create a streaming transport connection to a given Internet *host* and - *port*: socket family :py:data:`~socket.AF_INET` or - :py:data:`~socket.AF_INET6` depending on *host* (or *family* if specified), - socket type :py:data:`~socket.SOCK_STREAM`. *protocol_factory* must be a - callable returning a :ref:`protocol <asyncio-protocol>` instance. + *protocol_factory* must be a callable returning an + :ref:`asyncio protocol <asyncio-protocol>` implementation. This method will try to establish the connection in the background. When successful, it returns a ``(transport, protocol)`` pair. The chronological synopsis of the underlying operation is as follows: - #. The connection is established, and a :ref:`transport <asyncio-transport>` - is created to represent it. + #. The connection is established and a :ref:`transport <asyncio-transport>` + is created for it. - #. *protocol_factory* is called without arguments and must return a - :ref:`protocol <asyncio-protocol>` instance. + #. *protocol_factory* is called without arguments and is expected to + return a :ref:`protocol <asyncio-protocol>` instance. - #. The protocol instance is tied to the transport, and its - :meth:`connection_made` method is called. + #. The protocol instance is coupled with the transport by calling its + :meth:`~BaseProtocol.connection_made` method. - #. The coroutine returns successfully with the ``(transport, protocol)`` - pair. + #. A ``(transport, protocol)`` tuple is returned on success. - The created transport is an implementation-dependent bidirectional stream. + The created transport is an implementation-dependent bidirectional + stream. - .. note:: - *protocol_factory* can be any kind of callable, not necessarily - a class. For example, if you want to use a pre-created - protocol instance, you can pass ``lambda: my_protocol``. - - Options that change how the connection is created: + Other arguments: * *ssl*: if given and not false, a SSL/TLS transport is created (by default a plain TCP transport is created). If *ssl* is a :class:`ssl.SSLContext` object, this context is used to create - the transport; if *ssl* is :const:`True`, a context with some - unspecified default settings is used. + the transport; if *ssl* is :const:`True`, a default context returned + from :func:`ssl.create_default_context` is used. .. seealso:: :ref:`SSL/TLS security considerations <ssl-security>` - * *server_hostname*, is only for use together with *ssl*, - and sets or overrides the hostname that the target server's certificate - will be matched against. By default the value of the *host* argument + * *server_hostname* sets or overrides the hostname that the target + server's certificate will be matched against. Should only be passed + if *ssl* is not ``None``. 
By default the value of the *host* argument is used. If *host* is empty, there is no default and you must pass a value for *server_hostname*. If *server_hostname* is an empty string, hostname matching is disabled (which is a serious security - risk, allowing for man-in-the-middle-attacks). + risk, allowing for potential man-in-the-middle attacks). * *family*, *proto*, *flags* are the optional address family, protocol and flags to be passed through to getaddrinfo() for *host* resolution. @@ -347,38 +394,51 @@ Creating connections * *local_addr*, if given, is a ``(local_host, local_port)`` tuple used to bind the socket to locally. The *local_host* and *local_port* - are looked up using getaddrinfo(), similarly to *host* and *port*. + are looked up using ``getaddrinfo()``, similarly to *host* and *port*. - * *ssl_handshake_timeout* is (for an SSL connection) the time in seconds - to wait for the SSL handshake to complete before aborting the connection. + * *ssl_handshake_timeout* is (for a TLS connection) the time in seconds + to wait for the TLS handshake to complete before aborting the connection. ``60.0`` seconds if ``None`` (default). .. versionadded:: 3.7 The *ssl_handshake_timeout* parameter. + .. versionchanged:: 3.6 + + The socket option :py:data:`~socket.TCP_NODELAY` is set by default + for all TCP connections. + .. versionchanged:: 3.5 - On Windows with :class:`ProactorEventLoop`, SSL/TLS is now supported. + Added support for SSL/TLS in :class:`ProactorEventLoop`. .. seealso:: - The :func:`open_connection` function can be used to get a pair of - (:class:`StreamReader`, :class:`StreamWriter`) instead of a protocol. + The :func:`open_connection` function is a high-level alternative + API. It returns a pair of (:class:`StreamReader`, :class:`StreamWriter`) + that can be used directly in async/await code. +.. coroutinemethod:: loop.create_datagram_endpoint(protocol_factory, \ + local_addr=None, remote_addr=None, \*, \ + family=0, proto=0, flags=0, \ + reuse_address=None, reuse_port=None, \ + allow_broadcast=None, sock=None) -.. coroutinemethod:: AbstractEventLoop.create_datagram_endpoint(protocol_factory, local_addr=None, remote_addr=None, \*, family=0, proto=0, flags=0, reuse_address=None, reuse_port=None, allow_broadcast=None, sock=None) + Create a datagram connection. - Create datagram connection: socket family :py:data:`~socket.AF_INET`, - :py:data:`~socket.AF_INET6` or :py:data:`~socket.AF_UNIX` depending on - *host* (or *family* if specified), socket type - :py:data:`~socket.SOCK_DGRAM`. *protocol_factory* must be a - callable returning a :ref:`protocol <asyncio-protocol>` instance. + The socket family can be either :py:data:`~socket.AF_INET`, + :py:data:`~socket.AF_INET6`, or :py:data:`~socket.AF_UNIX`, + depending on *host* (or the *family* argument, if provided). - This method will try to establish the connection in the background. - When successful, it returns a ``(transport, protocol)`` pair. + The socket type will be :py:data:`~socket.SOCK_DGRAM`. + + *protocol_factory* must be a callable returning a + :ref:`protocol <asyncio-protocol>` implementation. - Options changing how the connection is created: + A tuple of ``(transport, protocol)`` is returned on success. + + Other arguments: * *local_addr*, if given, is a ``(local_host, local_port)`` tuple used to bind the socket to locally. The *local_host* and *local_port* @@ -394,14 +454,14 @@ Creating connections corresponding :mod:`socket` module constants. 
* *reuse_address* tells the kernel to reuse a local socket in - TIME_WAIT state, without waiting for its natural timeout to + ``TIME_WAIT`` state, without waiting for its natural timeout to expire. If not specified will automatically be set to ``True`` on - UNIX. + Unix. * *reuse_port* tells the kernel to allow this endpoint to be bound to the same port as other existing endpoints are bound to, so long as they all set this flag when being created. This option is not supported on Windows - and some UNIX's. If the :py:data:`~socket.SO_REUSEPORT` constant is not + and some Unixes. If the :py:data:`~socket.SO_REUSEPORT` constant is not defined then this capability is unsupported. * *allow_broadcast* tells the kernel to allow this endpoint to send @@ -412,7 +472,7 @@ Creating connections transport. If specified, *local_addr* and *remote_addr* should be omitted (must be :const:`None`). - On Windows with :class:`ProactorEventLoop`, this method is not supported. + On Windows, with :class:`ProactorEventLoop`, this method is not supported. See :ref:`UDP echo client protocol <asyncio-udp-echo-client-protocol>` and :ref:`UDP echo server protocol <asyncio-udp-echo-server-protocol>` examples. @@ -421,23 +481,26 @@ Creating connections The *family*, *proto*, *flags*, *reuse_address*, *reuse_port, *allow_broadcast*, and *sock* parameters were added. -.. coroutinemethod:: AbstractEventLoop.create_unix_connection(protocol_factory, path=None, \*, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None) +.. coroutinemethod:: loop.create_unix_connection(protocol_factory, \ + path=None, \*, ssl=None, sock=None, \ + server_hostname=None, ssl_handshake_timeout=None) - Create UNIX connection: socket family :py:data:`~socket.AF_UNIX`, socket - type :py:data:`~socket.SOCK_STREAM`. The :py:data:`~socket.AF_UNIX` socket - family is used to communicate between processes on the same machine - efficiently. + Create a Unix connection. - This method will try to establish the connection in the background. - When successful, it returns a ``(transport, protocol)`` pair. + The socket family will be :py:data:`~socket.AF_UNIX`; socket + type will be :py:data:`~socket.SOCK_STREAM`. + + A tuple of ``(transport, protocol)`` is returned on success. - *path* is the name of a UNIX domain socket, and is required unless a *sock* - parameter is specified. Abstract UNIX sockets, :class:`str`, - :class:`bytes`, and :class:`~pathlib.Path` paths are supported. + *path* is the name of a Unix domain socket and is required, + unless a *sock* parameter is specified. Abstract Unix sockets, + :class:`str`, :class:`bytes`, and :class:`~pathlib.Path` paths are + supported. - See the :meth:`AbstractEventLoop.create_connection` method for parameters. + See the documentation of the :meth:`loop.create_connection` method + for information about arguments to this method. - Availability: UNIX. + Availability: Unix. .. versionadded:: 3.7 @@ -448,55 +511,68 @@ Creating connections The *path* parameter can now be a :term:`path-like object`. -Creating listening connections ------------------------------- +Creating network servers +^^^^^^^^^^^^^^^^^^^^^^^^ -.. coroutinemethod:: AbstractEventLoop.create_server(protocol_factory, host=None, port=None, \*, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True) +.. 
coroutinemethod:: loop.create_server(protocol_factory, \ + host=None, port=None, \*, \ + family=socket.AF_UNSPEC, \ + flags=socket.AI_PASSIVE, \ + sock=None, backlog=100, ssl=None, \ + reuse_address=None, reuse_port=None, \ + ssl_handshake_timeout=None, start_serving=True) - Create a TCP server (socket type :data:`~socket.SOCK_STREAM`) bound to - *host* and *port*. + Create a TCP server (socket type :data:`~socket.SOCK_STREAM`) listening + on *port* of the *host* address. - Return a :class:`Server` object, its :attr:`~Server.sockets` attribute - contains created sockets. Use the :meth:`Server.close` method to stop the - server: close listening sockets. + Returns a :class:`Server` object. - Parameters: + Arguments: + + * *protocol_factory* must be a callable returning a + :ref:`protocol <asyncio-protocol>` implementation. + + * The *host* parameter can be set to several types which determine where + the server would be listening: - * The *host* parameter can be a string, in that case the TCP server is - bound to *host* and *port*. The *host* parameter can also be a sequence - of strings and in that case the TCP server is bound to all hosts of the - sequence. If *host* is an empty string or ``None``, all interfaces are - assumed and a list of multiple sockets will be returned (most likely one - for IPv4 and another one for IPv6). + - If *host* is a string, the TCP server is bound to a single network + interface specified by *host*. + + - If *host* is a sequence of strings, the TCP server is bound to all + network interfaces specified by the sequence. + + - If *host* is an empty string or ``None``, all interfaces are + assumed and a list of multiple sockets will be returned (most likely + one for IPv4 and another one for IPv6). * *family* can be set to either :data:`socket.AF_INET` or - :data:`~socket.AF_INET6` to force the socket to use IPv4 or IPv6. If not set - it will be determined from host (defaults to :data:`socket.AF_UNSPEC`). + :data:`~socket.AF_INET6` to force the socket to use IPv4 or IPv6. + If not set, the *family* will be determined from host name + (defaults to :data:`~socket.AF_UNSPEC`). * *flags* is a bitmask for :meth:`getaddrinfo`. * *sock* can optionally be specified in order to use a preexisting - socket object. If specified, *host* and *port* should be omitted (must be - :const:`None`). + socket object. If specified, *host* and *port* must not be specified. * *backlog* is the maximum number of queued connections passed to :meth:`~socket.socket.listen` (defaults to 100). - * *ssl* can be set to an :class:`~ssl.SSLContext` to enable SSL over the - accepted connections. + * *ssl* can be set to an :class:`~ssl.SSLContext` instance to enable + TLS over the accepted connections. * *reuse_address* tells the kernel to reuse a local socket in - TIME_WAIT state, without waiting for its natural timeout to + ``TIME_WAIT`` state, without waiting for its natural timeout to expire. If not specified will automatically be set to ``True`` on - UNIX. + Unix. * *reuse_port* tells the kernel to allow this endpoint to be bound to the same port as other existing endpoints are bound to, so long as they all set this flag when being created. This option is not supported on Windows. - * *ssl_handshake_timeout* is (for an SSL server) the time in seconds to wait - for the SSL handshake to complete before aborting the connection. + * *ssl_handshake_timeout* is (for a TLS server) the time in seconds to wait + for the TLS handshake to complete before aborting the connection. 
``60.0`` seconds if ``None`` (default). * *start_serving* set to ``True`` (the default) causes the created server @@ -507,32 +583,44 @@ Creating listening connections .. versionadded:: 3.7 - *ssl_handshake_timeout* and *start_serving* parameters. + Added *ssl_handshake_timeout* and *start_serving* parameters. - .. versionchanged:: 3.5 + .. versionchanged:: 3.6 - On Windows with :class:`ProactorEventLoop`, SSL/TLS is now supported. + The socket option :py:data:`~socket.TCP_NODELAY` is set by default + for all TCP connections. - .. seealso:: + .. versionchanged:: 3.5 - The function :func:`start_server` creates a (:class:`StreamReader`, - :class:`StreamWriter`) pair and calls back a function with this pair. + Added support for SSL/TLS in :class:`ProactorEventLoop`. .. versionchanged:: 3.5.1 - The *host* parameter can now be a sequence of strings. + The *host* parameter can be a sequence of strings. + .. seealso:: -.. coroutinemethod:: AbstractEventLoop.create_unix_server(protocol_factory, path=None, \*, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True) + The :func:`start_server` function is a higher-level alternative API + that returns a pair of :class:`StreamReader` and :class:`StreamWriter` + that can be used in an async/await code. - Similar to :meth:`AbstractEventLoop.create_server`, but specific to the - socket family :py:data:`~socket.AF_UNIX`. - *path* is the name of a UNIX domain socket, and is required unless a *sock* - parameter is specified. Abstract UNIX sockets, :class:`str`, - :class:`bytes`, and :class:`~pathlib.Path` paths are supported. +.. coroutinemethod:: loop.create_unix_server(protocol_factory, path=None, \ + \*, sock=None, backlog=100, ssl=None, \ + ssl_handshake_timeout=None, start_serving=True) - Availability: UNIX. + Similar to :meth:`loop.create_server` but works with the + :py:data:`~socket.AF_UNIX` socket family. + + *path* is the name of a Unix domain socket, and is required, + unless a *sock* argument is provided. Abstract Unix sockets, + :class:`str`, :class:`bytes`, and :class:`~pathlib.Path` paths + are supported. + + See the documentation of the :meth:`loop.create_server` method + for information about arguments to this method. + + Availability: Unix. .. versionadded:: 3.7 @@ -542,26 +630,30 @@ Creating listening connections The *path* parameter can now be a :class:`~pathlib.Path` object. -.. coroutinemethod:: BaseEventLoop.connect_accepted_socket(protocol_factory, sock, \*, ssl=None, ssl_handshake_timeout=None) +.. coroutinemethod:: loop.connect_accepted_socket(protocol_factory, \ + sock, \*, ssl=None, ssl_handshake_timeout=None) - Handle an accepted connection. + Wrap an already accepted connection into a transport/protocol pair. - This is used by servers that accept connections outside of - asyncio but that use asyncio to handle them. + This method can be used by servers that accept connections outside + of asyncio but that use asyncio to handle them. Parameters: - * *sock* is a preexisting socket object returned from an ``accept`` - call. + * *protocol_factory* must be a callable returning a + :ref:`protocol <asyncio-protocol>` implementation. + + * *sock* is a preexisting socket object returned from + :meth:`socket.accept <socket.socket.accept>`. - * *ssl* can be set to an :class:`~ssl.SSLContext` to enable SSL over the - accepted connections. + * *ssl* can be set to an :class:`~ssl.SSLContext` to enable SSL over + the accepted connections. 
* *ssl_handshake_timeout* is (for an SSL connection) the time in seconds to wait for the SSL handshake to complete before aborting the connection. ``60.0`` seconds if ``None`` (default). - When completed it returns a ``(transport, protocol)`` pair. + Returns a ``(transport, protocol)`` pair. .. versionadded:: 3.7 @@ -570,15 +662,14 @@ Creating listening connections .. versionadded:: 3.5.3 -File Transferring ------------------ +Transferring files +^^^^^^^^^^^^^^^^^^ -.. coroutinemethod:: AbstractEventLoop.sendfile(transport, file, \ - offset=0, count=None, \ - *, fallback=True) +.. coroutinemethod:: loop.sendfile(transport, file, \ + offset=0, count=None, *, fallback=True) - Send a *file* to *transport*, return the total number of bytes - which were sent. + Send a *file* over a *transport*. Return the total number of bytes + sent. The method uses high-performance :meth:`os.sendfile` if available. @@ -586,167 +677,163 @@ File Transferring *offset* tells from where to start reading the file. If specified, *count* is the total number of bytes to transmit as opposed to - sending the file until EOF is reached. File position is updated on - return or also in case of error in which case :meth:`file.tell() - <io.IOBase.tell>` can be used to figure out the number of bytes - which were sent. + sending the file until EOF is reached. File position is always updated, + even when this method raises an error, and + :meth:`file.tell() <io.IOBase.tell>` can be used to obtain the actual + number of bytes sent. *fallback* set to ``True`` makes asyncio to manually read and send - the file when the platform does not support the sendfile syscall + the file when the platform does not support the sendfile system call (e.g. Windows or SSL socket on Unix). Raise :exc:`SendfileNotAvailableError` if the system does not support - *sendfile* syscall and *fallback* is ``False``. + the *sendfile* syscall and *fallback* is ``False``. .. versionadded:: 3.7 TLS Upgrade ------------ +^^^^^^^^^^^ -.. coroutinemethod:: AbstractEventLoop.start_tls(transport, protocol, sslcontext, \*, server_side=False, server_hostname=None, ssl_handshake_timeout=None) +.. coroutinemethod:: loop.start_tls(transport, protocol, \ + sslcontext, \*, server_side=False, \ + server_hostname=None, ssl_handshake_timeout=None) - Upgrades an existing connection to TLS. + Upgrade an existing transport-based connection to TLS. - Returns a new transport instance, that the *protocol* must start using + Return a new transport instance, that the *protocol* must start using immediately after the *await*. The *transport* instance passed to the *start_tls* method should never be used again. Parameters: * *transport* and *protocol* instances that methods like - :meth:`~AbstractEventLoop.create_server` and - :meth:`~AbstractEventLoop.create_connection` return. + :meth:`~loop.create_server` and + :meth:`~loop.create_connection` return. * *sslcontext*: a configured instance of :class:`~ssl.SSLContext`. * *server_side* pass ``True`` when a server-side connection is being - upgraded (like the one created by :meth:`~AbstractEventLoop.create_server`). + upgraded (like the one created by :meth:`~loop.create_server`). * *server_hostname*: sets or overrides the host name that the target server's certificate will be matched against. - * *ssl_handshake_timeout* is (for an SSL connection) the time in seconds to - wait for the SSL handshake to complete before aborting the connection. 
+ * *ssl_handshake_timeout* is (for a TLS connection) the time in seconds to + wait for the TLS handshake to complete before aborting the connection. ``60.0`` seconds if ``None`` (default). .. versionadded:: 3.7 -Watch file descriptors ----------------------- - -On Windows with :class:`SelectorEventLoop`, only socket handles are supported -(ex: pipe file descriptors are not supported). - -On Windows with :class:`ProactorEventLoop`, these methods are not supported. +Watching file descriptors +^^^^^^^^^^^^^^^^^^^^^^^^^ -.. method:: AbstractEventLoop.add_reader(fd, callback, \*args) +.. method:: loop.add_reader(fd, callback, \*args) - Start watching the file descriptor for read availability and then call the - *callback* with specified arguments. + Start monitoring the *fd* file descriptor for read availability and + invoke *callback* with the specified arguments once *fd* is available for + reading. - :ref:`Use functools.partial to pass keywords to the callback - <asyncio-pass-keywords>`. +.. method:: loop.remove_reader(fd) -.. method:: AbstractEventLoop.remove_reader(fd) + Stop monitoring the *fd* file descriptor for read availability. - Stop watching the file descriptor for read availability. +.. method:: loop.add_writer(fd, callback, \*args) -.. method:: AbstractEventLoop.add_writer(fd, callback, \*args) + Start monitoring the *fd* file descriptor for write availability and + invoke *callback* with the specified arguments once *fd* is available for + writing. - Start watching the file descriptor for write availability and then call the - *callback* with specified arguments. + Use :func:`functools.partial` :ref:`to pass keywords + <asyncio-pass-keywords>` to *func*. - :ref:`Use functools.partial to pass keywords to the callback - <asyncio-pass-keywords>`. +.. method:: loop.remove_writer(fd) -.. method:: AbstractEventLoop.remove_writer(fd) + Stop monitoring the *fd* file descriptor for write availability. - Stop watching the file descriptor for write availability. +See also :ref:`Platform Support <asyncio-platform-support>` section +for some limitations of these methods. -The :ref:`watch a file descriptor for read events <asyncio-watch-read-event>` -example uses the low-level :meth:`AbstractEventLoop.add_reader` method to register -the file descriptor of a socket. +Working with socket objects directly +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Low-level socket operations ---------------------------- +In general, protocol implementations that use transport-based APIs +such as :meth:`loop.create_connection` and :meth:`loop.create_server` +are faster than implementations that work with sockets directly. +However, there are some use cases when performance is not critical, and +working with :class:`~socket.socket` objects directly is more +convenient. -.. coroutinemethod:: AbstractEventLoop.sock_recv(sock, nbytes) +.. coroutinemethod:: loop.sock_recv(sock, nbytes) - Receive data from the socket. Modeled after blocking - :meth:`socket.socket.recv` method. + Receive up to *nbytes* from *sock*. Asynchronous version of + :meth:`socket.recv() <socket.socket.recv>`. - The return value is a bytes object - representing the data received. The maximum amount of data to be received - at once is specified by *nbytes*. + Return the received data as a bytes object. - With :class:`SelectorEventLoop` event loop, the socket *sock* must be - non-blocking. + *sock* must be a non-blocking socket. .. 
versionchanged:: 3.7 - Even though the method was always documented as a coroutine - method, before Python 3.7 it returned a :class:`Future`. - Since Python 3.7, this is an ``async def`` method. + Even though this method was always documented as a coroutine + method, releases before Python 3.7 returned a :class:`Future`. + Since Python 3.7 this is an ``async def`` method. -.. coroutinemethod:: AbstractEventLoop.sock_recv_into(sock, buf) +.. coroutinemethod:: loop.sock_recv_into(sock, buf) - Receive data from the socket. Modeled after blocking - :meth:`socket.socket.recv_into` method. + Receive data from *sock* into the *buf* buffer. Modeled after the blocking + :meth:`socket.recv_into() <socket.socket.recv_into>` method. - The received data is written into *buf* (a writable buffer). - The return value is the number of bytes written. + Return the number of bytes written to the buffer. - With :class:`SelectorEventLoop` event loop, the socket *sock* must be - non-blocking. + *sock* must be a non-blocking socket. .. versionadded:: 3.7 -.. coroutinemethod:: AbstractEventLoop.sock_sendall(sock, data) +.. coroutinemethod:: loop.sock_sendall(sock, data) - Send data to the socket. Modeled after blocking - :meth:`socket.socket.sendall` method. + Send *data* to the *sock* socket. Asynchronous version of + :meth:`socket.sendall() <socket.socket.sendall>`. - The socket must be connected to a remote socket. - This method continues to send data from *data* until either all data has - been sent or an error occurs. ``None`` is returned on success. On error, - an exception is raised, and there is no way to determine how much data, if - any, was successfully processed by the receiving end of the connection. + This method continues to send to the socket until either all data + in *data* has been sent or an error occurs. ``None`` is returned + on success. On error, an exception is raised. Additionally, there is no way + to determine how much data, if any, was successfully processed by the + receiving end of the connection. - With :class:`SelectorEventLoop` event loop, the socket *sock* must be - non-blocking. + *sock* must be a non-blocking socket. .. versionchanged:: 3.7 Even though the method was always documented as a coroutine method, before Python 3.7 it returned an :class:`Future`. Since Python 3.7, this is an ``async def`` method. -.. coroutinemethod:: AbstractEventLoop.sock_connect(sock, address) +.. coroutinemethod:: loop.sock_connect(sock, address) - Connect to a remote socket at *address*. Modeled after - blocking :meth:`socket.socket.connect` method. + Connect *sock* to a remote socket at *address*. - With :class:`SelectorEventLoop` event loop, the socket *sock* must be - non-blocking. + Asynchronous version of :meth:`socket.connect() <socket.socket.connect>`. + + *sock* must be a non-blocking socket. .. versionchanged:: 3.5.2 ``address`` no longer needs to be resolved. ``sock_connect`` will try to check if the *address* is already resolved by calling :func:`socket.inet_pton`. If not, - :meth:`AbstractEventLoop.getaddrinfo` will be used to resolve the + :meth:`loop.getaddrinfo` will be used to resolve the *address*. .. seealso:: - :meth:`AbstractEventLoop.create_connection` + :meth:`loop.create_connection` and :func:`asyncio.open_connection() <open_connection>`. -.. coroutinemethod:: AbstractEventLoop.sock_accept(sock) +.. coroutinemethod:: loop.sock_accept(sock) - Accept a connection. Modeled after blocking - :meth:`socket.socket.accept`. + Accept a connection. 
Modeled after the blocking + :meth:`socket.accept() <socket.socket.accept>` method. The socket must be bound to an address and listening for connections. The return value is a pair ``(conn, address)`` where *conn* @@ -754,7 +841,7 @@ Low-level socket operations and *address* is the address bound to the socket on the other end of the connection. - The socket *sock* must be non-blocking. + *sock* must be a non-blocking socket. .. versionchanged:: 3.7 Even though the method was always documented as a coroutine @@ -763,51 +850,51 @@ Low-level socket operations .. seealso:: - :meth:`AbstractEventLoop.create_server` and :func:`start_server`. + :meth:`loop.create_server` and :func:`start_server`. -.. coroutinemethod:: AbstractEventLoop.sock_sendfile(sock, file, \ - offset=0, count=None, \ - *, fallback=True) +.. coroutinemethod:: loop.sock_sendfile(sock, file, offset=0, count=None, \ + \*, fallback=True) - Send a file using high-performance :mod:`os.sendfile` if possible - and return the total number of bytes which were sent. + Send a file using high-performance :mod:`os.sendfile` if possible. + Return the total number of bytes sent. - Asynchronous version of :meth:`socket.socket.sendfile`. + Asynchronous version of :meth:`socket.sendfile() <socket.socket.sendfile>`. - *sock* must be non-blocking :class:`~socket.socket` of - :const:`socket.SOCK_STREAM` type. + *sock* must be a non-blocking :const:`socket.SOCK_STREAM` + :class:`~socket.socket`. - *file* must be a regular file object opened in binary mode. + *file* must be a regular file object open in binary mode. *offset* tells from where to start reading the file. If specified, *count* is the total number of bytes to transmit as opposed to - sending the file until EOF is reached. File position is updated on - return or also in case of error in which case :meth:`file.tell() - <io.IOBase.tell>` can be used to figure out the number of bytes - which were sent. + sending the file until EOF is reached. File position is always updated, + even when this method raises an error, and + :meth:`file.tell() <io.IOBase.tell>` can be used to obtain the actual + number of bytes sent. - *fallback* set to ``True`` makes asyncio to manually read and send + *fallback*, when set to ``True``, makes asyncio manually read and send the file when the platform does not support the sendfile syscall (e.g. Windows or SSL socket on Unix). Raise :exc:`SendfileNotAvailableError` if the system does not support *sendfile* syscall and *fallback* is ``False``. + *sock* must be a non-blocking socket. + .. versionadded:: 3.7 -Resolve host name ------------------ +DNS +^^^ -.. coroutinemethod:: AbstractEventLoop.getaddrinfo(host, port, \*, family=0, type=0, proto=0, flags=0) +.. coroutinemethod:: loop.getaddrinfo(host, port, \*, family=0, \ + type=0, proto=0, flags=0) - This method is a :ref:`coroutine <coroutine>`, similar to - :meth:`socket.getaddrinfo` function but non-blocking. + Asynchronous version of :meth:`socket.getaddrinfo`. -.. coroutinemethod:: AbstractEventLoop.getnameinfo(sockaddr, flags=0) +.. coroutinemethod:: loop.getnameinfo(sockaddr, flags=0) - This method is a :ref:`coroutine <coroutine>`, similar to - :meth:`socket.getnameinfo` function but non-blocking. + Asynchronous version of :meth:`socket.getnameinfo`. .. versionchanged:: 3.7 Both *getaddrinfo* and *getnameinfo* methods were always documented @@ -816,141 +903,157 @@ Resolve host name both methods are coroutines. 
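The socket and DNS methods described above can be combined in plain async/await code. The following is a minimal sketch, not part of the original patch: the host ``example.com``, port ``80``, the ``fetch_head()`` helper, and the HTTP request line are only illustrative. It resolves a name with ``loop.getaddrinfo()`` and then drives a non-blocking socket with the ``loop.sock_*()`` methods::

    import asyncio
    import socket

    async def fetch_head(host):
        loop = asyncio.get_running_loop()

        # Resolve the host name asynchronously.
        infos = await loop.getaddrinfo(host, 80, type=socket.SOCK_STREAM)
        family, type_, proto, _, address = infos[0]

        # The socket must be non-blocking to be used with loop.sock_*().
        sock = socket.socket(family, type_, proto)
        sock.setblocking(False)

        await loop.sock_connect(sock, address)
        await loop.sock_sendall(
            sock, b'HEAD / HTTP/1.0\r\nHost: ' + host.encode() + b'\r\n\r\n')

        # Read a single chunk of the response.
        data = await loop.sock_recv(sock, 1024)
        sock.close()
        return data

    print(asyncio.run(fetch_head('example.com')))

As noted above, transport- or stream-based code is usually preferable; this sketch only shows how the low-level socket methods fit together.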
-Connect pipes -------------- +Working with pipes +^^^^^^^^^^^^^^^^^^ -On Windows with :class:`SelectorEventLoop`, these methods are not supported. -Use :class:`ProactorEventLoop` to support pipes on Windows. +.. coroutinemethod:: loop.connect_read_pipe(protocol_factory, pipe) -.. coroutinemethod:: AbstractEventLoop.connect_read_pipe(protocol_factory, pipe) + Register the read end of *pipe* in the event loop. - Register read pipe in eventloop. + *protocol_factory* must be a callable returning an + :ref:`asyncio protocol <asyncio-protocol>` implementation. - *protocol_factory* should instantiate object with :class:`Protocol` - interface. *pipe* is a :term:`file-like object <file object>`. - Return pair ``(transport, protocol)``, where *transport* supports the - :class:`ReadTransport` interface. + *pipe* is a :term:`file-like object <file object>`. + + Return pair ``(transport, protocol)``, where *transport* supports + the :class:`ReadTransport` interface and *protocol* is an object + instantiated by the *protocol_factory*. With :class:`SelectorEventLoop` event loop, the *pipe* is set to non-blocking mode. -.. coroutinemethod:: AbstractEventLoop.connect_write_pipe(protocol_factory, pipe) +.. coroutinemethod:: loop.connect_write_pipe(protocol_factory, pipe) + + Register the write end of *pipe* in the event loop. - Register write pipe in eventloop. + *protocol_factory* must be a callable returning an + :ref:`asyncio protocol <asyncio-protocol>` implementation. + + *pipe* is :term:`file-like object <file object>`. - *protocol_factory* should instantiate object with :class:`BaseProtocol` - interface. *pipe* is :term:`file-like object <file object>`. Return pair ``(transport, protocol)``, where *transport* supports - :class:`WriteTransport` interface. + :class:`WriteTransport` interface and *protocol* is an object + instantiated by the *protocol_factory*. With :class:`SelectorEventLoop` event loop, the *pipe* is set to non-blocking mode. -.. seealso:: +.. note:: - The :meth:`AbstractEventLoop.subprocess_exec` and - :meth:`AbstractEventLoop.subprocess_shell` methods. + :class:`SelectorEventLoop` does not support the above methods on + Windows. Use :class:`ProactorEventLoop` instead for Windows. +.. seealso:: + + The :meth:`loop.subprocess_exec` and + :meth:`loop.subprocess_shell` methods. -UNIX signals ------------- -Availability: UNIX only. +Unix signals +^^^^^^^^^^^^ -.. method:: AbstractEventLoop.add_signal_handler(signum, callback, \*args) +.. method:: loop.add_signal_handler(signum, callback, \*args) - Add a handler for a signal. + Set *callback* as the handler for the *signum* signal. Raise :exc:`ValueError` if the signal number is invalid or uncatchable. Raise :exc:`RuntimeError` if there is a problem setting up the handler. - :ref:`Use functools.partial to pass keywords to the callback - <asyncio-pass-keywords>`. + Use :func:`functools.partial` :ref:`to pass keywords + <asyncio-pass-keywords>` to *func*. + +.. method:: loop.remove_signal_handler(sig) -.. method:: AbstractEventLoop.remove_signal_handler(sig) + Remove the handler for the *sig* signal. - Remove a handler for a signal. + Return ``True`` if the signal handler was removed, or ``False`` if + no handler was set for the given signal. - Return ``True`` if a signal handler was removed, ``False`` if not. +Availability: Unix. .. seealso:: The :mod:`signal` module. 
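To complement the pipe and signal subsections above, here is a small sketch of ``loop.connect_read_pipe()``. It assumes Unix with the default :class:`SelectorEventLoop`; the ``PipeProtocol`` class and the ``os.pipe()`` setup are illustrative, not part of the original patch::

    import asyncio
    import os

    class PipeProtocol(asyncio.Protocol):
        """Print data read from the pipe and signal EOF via a Future."""

        def __init__(self, eof):
            self.eof = eof

        def data_received(self, data):
            print('Received:', data.decode())

        def eof_received(self):
            self.eof.set_result(True)

    async def main():
        loop = asyncio.get_running_loop()
        read_fd, write_fd = os.pipe()
        eof = loop.create_future()

        # Wrap the read end of the pipe into a (transport, protocol) pair.
        transport, _ = await loop.connect_read_pipe(
            lambda: PipeProtocol(eof), os.fdopen(read_fd, 'rb'))

        os.write(write_fd, b'hello through a pipe')
        os.close(write_fd)   # closing the write end triggers eof_received()

        await eof
        transport.close()

    asyncio.run(main())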
-Executor --------- +Executing code in thread or process pools +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Call a function in an :class:`~concurrent.futures.Executor` (pool of threads or -pool of processes). By default, an event loop uses a thread pool executor -(:class:`~concurrent.futures.ThreadPoolExecutor`). +.. coroutinemethod:: loop.run_in_executor(executor, func, \*args) -.. method:: AbstractEventLoop.run_in_executor(executor, func, \*args) + Arrange for *func* to be called in the specified executor. - Arrange for a *func* to be called in the specified executor. - - The *executor* argument should be an :class:`~concurrent.futures.Executor` + The *executor* argument should be an :class:`concurrent.futures.Executor` instance. The default executor is used if *executor* is ``None``. - :ref:`Use functools.partial to pass keywords to the *func* - <asyncio-pass-keywords>`. + Use :func:`functools.partial` :ref:`to pass keywords + <asyncio-pass-keywords>` to *func*. This method returns a :class:`asyncio.Future` object. .. versionchanged:: 3.5.3 - :meth:`BaseEventLoop.run_in_executor` no longer configures the + :meth:`loop.run_in_executor` no longer configures the ``max_workers`` of the thread pool executor it creates, instead leaving it up to the thread pool executor (:class:`~concurrent.futures.ThreadPoolExecutor`) to set the default. -.. method:: AbstractEventLoop.set_default_executor(executor) +.. method:: loop.set_default_executor(executor) + + Set *executor* as the default executor used by :meth:`run_in_executor`. + *executor* should be an instance of + :class:`~concurrent.futures.ThreadPoolExecutor`. - Set the default executor used by :meth:`run_in_executor`. + .. deprecated:: 3.7 + Using an executor that is not an instance of + :class:`~concurrent.futures.ThreadPoolExecutor` is deprecated and + will trigger an error in Python 3.9. + + *executor* must be an instance of + :class:`concurrent.futures.ThreadPoolExecutor`. Error Handling API ------------------- +^^^^^^^^^^^^^^^^^^ Allows customizing how exceptions are handled in the event loop. -.. method:: AbstractEventLoop.set_exception_handler(handler) +.. method:: loop.set_exception_handler(handler) Set *handler* as the new event loop exception handler. If *handler* is ``None``, the default exception handler will - be set. - - If *handler* is a callable object, it should have a - matching signature to ``(loop, context)``, where ``loop`` - will be a reference to the active event loop, ``context`` - will be a ``dict`` object (see :meth:`call_exception_handler` - documentation for details about context). + be set. Otherwise, *handler* must be a callable with the signature + matching ``(loop, context)``, where ``loop`` + is a reference to the active event loop, and ``context`` + is a ``dict`` object containing the details of the exception + (see :meth:`call_exception_handler` documentation for details + about context). -.. method:: AbstractEventLoop.get_exception_handler() +.. method:: loop.get_exception_handler() - Return the exception handler, or ``None`` if the default one - is in use. + Return the current exception handler, or ``None`` if no custom + exception handler was set. .. versionadded:: 3.5.2 -.. method:: AbstractEventLoop.default_exception_handler(context) +.. method:: loop.default_exception_handler(context) Default exception handler. This is called when an exception occurs and no exception - handler is set, and can be called by a custom exception - handler that wants to defer to the default behavior. + handler is set. 
This can be called by a custom exception + handler that wants to defer to the default handler behavior. *context* parameter has the same meaning as in :meth:`call_exception_handler`. -.. method:: AbstractEventLoop.call_exception_handler(context) +.. method:: loop.call_exception_handler(context) Call the current event loop exception handler. *context* is a ``dict`` object containing the following keys - (new keys may be introduced later): + (new keys may be introduced in future Python versions): * 'message': Error message; * 'exception' (optional): Exception object; @@ -962,14 +1065,14 @@ Allows customizing how exceptions are handled in the event loop. .. note:: - Note: this method should not be overloaded in subclassed - event loops. For any custom exception handling, use - :meth:`set_exception_handler()` method. + This method should not be overloaded in subclassed + event loops. For custom exception handling, use + the :meth:`set_exception_handler()` method. -Debug mode ----------- +Enabling debug mode +^^^^^^^^^^^^^^^^^^^ -.. method:: AbstractEventLoop.get_debug() +.. method:: loop.get_debug() Get the debug mode (:class:`bool`) of the event loop. @@ -977,29 +1080,173 @@ Debug mode :envvar:`PYTHONASYNCIODEBUG` is set to a non-empty string, ``False`` otherwise. - .. versionadded:: 3.4.2 - -.. method:: AbstractEventLoop.set_debug(enabled: bool) +.. method:: loop.set_debug(enabled: bool) Set the debug mode of the event loop. - .. versionadded:: 3.4.2 + .. versionchanged:: 3.7 + + The new ``-X dev`` command line option can now also be used + to enable the debug mode. .. seealso:: The :ref:`debug mode of asyncio <asyncio-debug-mode>`. -Server ------- -.. class:: Server +Running Subprocesses +^^^^^^^^^^^^^^^^^^^^ + +Methods described in this subsections are low-level. In regular +async/await code consider using the high-level +:func:`asyncio.create_subprocess_shell` and +:func:`asyncio.create_subprocess_exec` convenience functions instead. + +.. note:: + + The default asyncio event loop on **Windows** does not support + subprocesses. See :ref:`Subprocess Support on Windows + <asyncio-windows-subprocess>` for details. + +.. coroutinemethod:: loop.subprocess_exec(protocol_factory, \*args, \ + stdin=subprocess.PIPE, stdout=subprocess.PIPE, \ + stderr=subprocess.PIPE, \*\*kwargs) + + Create a subprocess from one or more string arguments specified by + *args*. + + *args* must be a list of strings represented by: + + * :class:`str`; + * or :class:`bytes`, encoded to the + :ref:`filesystem encoding <filesystem-encoding>`. + + The first string specifies the program executable, + and the remaining strings specify the arguments. Together, string + arguments form the ``argv`` of the program. + + This is similar to the standard library :class:`subprocess.Popen` + class called with ``shell=False`` and the list of strings passed as + the first argument; however, where :class:`~subprocess.Popen` takes + a single argument which is list of strings, *subprocess_exec* + takes multiple string arguments. + + The *protocol_factory* must be a callable returning a subclass of the + :class:`asyncio.SubprocessProtocol` class. + + Other parameters: + + * *stdin*: either a file-like object representing a pipe to be + connected to the subprocess's standard input stream using + :meth:`~loop.connect_write_pipe`, or the + :const:`subprocess.PIPE` constant (default). By default a new + pipe will be created and connected. 
+ + * *stdout*: either a file-like object representing the pipe to be + connected to the subprocess's standard output stream using + :meth:`~loop.connect_read_pipe`, or the + :const:`subprocess.PIPE` constant (default). By default a new pipe + will be created and connected. + + * *stderr*: either a file-like object representing the pipe to be + connected to the subprocess's standard error stream using + :meth:`~loop.connect_read_pipe`, or one of + :const:`subprocess.PIPE` (default) or :const:`subprocess.STDOUT` + constants. + + By default a new pipe will be created and connected. When + :const:`subprocess.STDOUT` is specified, the subprocess' standard + error stream will be connected to the same pipe as the standard + output stream. + + * All other keyword arguments are passed to :class:`subprocess.Popen` + without interpretation, except for *bufsize*, *universal_newlines* + and *shell*, which should not be specified at all. + + See the constructor of the :class:`subprocess.Popen` class + for documentation on other arguments. + + Returns a pair of ``(transport, protocol)``, where *transport* + conforms to the :class:`asyncio.SubprocessTransport` base class and + *protocol* is an object instantiated by the *protocol_factory*. + +.. coroutinemethod:: loop.subprocess_shell(protocol_factory, cmd, \*, \ + stdin=subprocess.PIPE, stdout=subprocess.PIPE, \ + stderr=subprocess.PIPE, \*\*kwargs) + + Create a subprocess from *cmd*, which can be a :class:`str` or a + :class:`bytes` string encoded to the + :ref:`filesystem encoding <filesystem-encoding>`, + using the platform's "shell" syntax. + + This is similar to the standard library :class:`subprocess.Popen` + class called with ``shell=True``. + + The *protocol_factory* must be a callable returning a subclass of the + :class:`SubprocessProtocol` class. + + See :meth:`~loop.subprocess_exec` for more details about + the remaining arguments. + + Returns a pair of ``(transport, protocol)``, where *transport* + conforms to the :class:`SubprocessTransport` base class and + *protocol* is an object instantiated by the *protocol_factory*. + +.. note:: + It is the application's responsibility to ensure that all whitespace + and special characters are quoted appropriately to avoid `shell injection + <https://en.wikipedia.org/wiki/Shell_injection#Shell_injection>`_ + vulnerabilities. The :func:`shlex.quote` function can be used to + properly escape whitespace and special characters in strings that + are going to be used to construct shell commands. + + +Callback Handles +================ + +.. class:: Handle + + A callback wrapper object returned by :meth:`loop.call_soon`, + :meth:`loop.call_soon_threadsafe`. + + .. method:: cancel() + + Cancel the callback. If the callback has already been canceled + or executed, this method has no effect. + + .. method:: cancelled() + + Return ``True`` if the callback was cancelled. + + .. versionadded:: 3.7 + +.. class:: TimerHandle + + A callback wrapper object returned by :meth:`loop.call_later`, + and :meth:`loop.call_at`. + + This class is a subclass of :class:`Handle`. + + .. method:: when() + + Return a scheduled callback time as :class:`float` seconds. + + The time is an absolute timestamp, using the same time + reference as :meth:`loop.time`. + + .. versionadded:: 3.7 - Server listening on sockets. - Object created by :meth:`AbstractEventLoop.create_server`, - :meth:`AbstractEventLoop.create_unix_server`, :func:`start_server`, - and :func:`start_unix_server` functions. Don't instantiate the class - directly. 
+Server Objects +============== + +Server objects are created by :meth:`loop.create_server`, +:meth:`loop.create_unix_server`, :func:`start_server`, +and :func:`start_unix_server` functions. + +Do not instantiate the class directly. + +.. class:: Server *Server* objects are asynchronous context managers. When used in an ``async with`` statement, it's guaranteed that the Server object is @@ -1022,15 +1269,15 @@ Server Stop serving: close listening sockets and set the :attr:`sockets` attribute to ``None``. - The sockets that represent existing incoming client connections are left - open. + The sockets that represent existing incoming client connections + are left open. The server is closed asynchronously, use the :meth:`wait_closed` coroutine to wait until the server is closed. .. method:: get_loop() - Gives the event loop associated with the server object. + Return the event loop associated with the server object. .. versionadded:: 3.7 @@ -1041,12 +1288,12 @@ Server This method is idempotent, so it can be called when the server is already being serving. - The new *start_serving* keyword-only parameter to - :meth:`AbstractEventLoop.create_server` and - :meth:`asyncio.start_server` allows to create a Server object - that is not accepting connections right away. In which case - this method, or :meth:`Server.serve_forever` can be used - to make the Server object to start accepting connections. + The *start_serving* keyword-only parameter to + :meth:`loop.create_server` and + :meth:`asyncio.start_server` allows creating a Server object + that is not accepting connections initially. In this case + ``Server.start_serving()``, or :meth:`Server.serve_forever` can be used + to make the Server start accepting connections. .. versionadded:: 3.7 @@ -1088,78 +1335,99 @@ Server .. attribute:: sockets - List of :class:`socket.socket` objects the server is listening to, or - ``None`` if the server is closed. + List of :class:`socket.socket` objects the server is listening on, + or ``None`` if the server is closed. .. versionchanged:: 3.7 - Prior to Python 3.7 ``Server.sockets`` used to return the - internal list of server's sockets directly. In 3.7 a copy + Prior to Python 3.7 ``Server.sockets`` used to return an + internal list of server sockets directly. In 3.7 a copy of that list is returned. -Handle ------- +.. _asyncio-event-loops: -.. class:: Handle +Event Loop Implementations +========================== - A callback wrapper object returned by :func:`AbstractEventLoop.call_soon`, - :func:`AbstractEventLoop.call_soon_threadsafe`. +asyncio ships with two different event loop implementations: +:class:`SelectorEventLoop` and :class:`ProactorEventLoop`. - .. method:: cancel() +By default asyncio is configured to use :class:`SelectorEventLoop` +on all platforms. - Cancel the call. If the callback is already canceled or executed, - this method has no effect. - .. method:: cancelled() +.. class:: SelectorEventLoop - Return ``True`` if the call was cancelled. + An event loop based on the :mod:`selectors` module. - .. versionadded:: 3.7 + Uses the most efficient *selector* available for the given + platform. It is also possible to manually configure the + exact selector implementation to be used:: -.. class:: TimerHandle + import asyncio + import selectors - A callback wrapper object returned by :func:`AbstractEventLoop.call_later`, - and :func:`AbstractEventLoop.call_at`. 
+ selector = selectors.SelectSelector() + loop = asyncio.SelectorEventLoop(selector) + asyncio.set_event_loop(loop) - The class is inherited from :class:`Handle`. - .. method:: when() + Availability: Unix, Windows. - Return a scheduled callback time as :class:`float` seconds. - The time is an absolute timestamp, using the same time - reference as :meth:`AbstractEventLoop.time`. +.. class:: ProactorEventLoop - .. versionadded:: 3.7 + An event loop for Windows that uses "I/O Completion Ports" (IOCP). + + Availability: Windows. + An example how to use :class:`ProactorEventLoop` on Windows:: -SendfileNotAvailableError -------------------------- + import asyncio + import sys + if sys.platform == 'win32': + loop = asyncio.ProactorEventLoop() + asyncio.set_event_loop(loop) + + .. seealso:: + + `MSDN documentation on I/O Completion Ports + <https://docs.microsoft.com/en-ca/windows/desktop/FileIO/i-o-completion-ports>`_. + + +.. class:: AbstractEventLoop -.. exception:: SendfileNotAvailableError + Abstract base class for asyncio-compliant event loops. - Sendfile syscall is not available, subclass of :exc:`RuntimeError`. + The :ref:`Event Loop Methods <asyncio-event-loop>` section lists all + methods that an alternative implementation of ``AbstractEventLoop`` + should have defined. - Raised if the OS does not support sendfile syscall for - given socket or file type. +Examples +======== -Event loop examples -------------------- +Note that all examples in this section **purposefully** show how +to use the low-level event loop APIs, such as :meth:`loop.run_forever` +and :meth:`loop.call_soon`. Modern asyncio applications rarely +need to be written this way; consider using the high-level functions +like :func:`asyncio.run`. -.. _asyncio-hello-world-callback: + +.. _asyncio_example_lowlevel_helloworld: Hello World with call_soon() ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Example using the :meth:`AbstractEventLoop.call_soon` method to schedule a -callback. The callback displays ``"Hello World"`` and then stops the event -loop:: +An example using the :meth:`loop.call_soon` method to schedule a +callback. The callback displays ``"Hello World"`` and then stops the +event loop:: import asyncio def hello_world(loop): + """A callback to print 'Hello World' and stop the event loop""" print('Hello World') loop.stop() @@ -1169,23 +1437,25 @@ loop:: loop.call_soon(hello_world, loop) # Blocking call interrupted by loop.stop() - loop.run_forever() - loop.close() + try: + loop.run_forever() + finally: + loop.close() .. seealso:: - The :ref:`Hello World coroutine <asyncio-hello-world-coroutine>` example - uses a :ref:`coroutine <coroutine>`. + A similar :ref:`Hello World <coroutine>` + example created with a coroutine and the :func:`run` function. -.. _asyncio-date-callback: +.. _asyncio_example_call_later: Display the current date with call_later() ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Example of callback displaying the current date every second. The callback uses -the :meth:`AbstractEventLoop.call_later` method to reschedule itself during 5 -seconds, and then stops the event loop:: +An example of a callback displaying the current date every second. 
The +callback uses the :meth:`loop.call_later` method to reschedule itself +after 5 seconds, and then stops the event loop:: import asyncio import datetime @@ -1204,36 +1474,40 @@ seconds, and then stops the event loop:: loop.call_soon(display_date, end_time, loop) # Blocking call interrupted by loop.stop() - loop.run_forever() - loop.close() + try: + loop.run_forever() + finally: + loop.close() .. seealso:: - The :ref:`coroutine displaying the current date - <asyncio-date-coroutine>` example uses a :ref:`coroutine - <coroutine>`. + A similar :ref:`current date <asyncio_example_sleep>` example + created with a coroutine and the :func:`run` function. -.. _asyncio-watch-read-event: +.. _asyncio_example_watch_fd: Watch a file descriptor for read events ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Wait until a file descriptor received some data using the -:meth:`AbstractEventLoop.add_reader` method and then close the event loop:: +:meth:`loop.add_reader` method and then close the event loop:: import asyncio from socket import socketpair # Create a pair of connected file descriptors rsock, wsock = socketpair() + loop = asyncio.get_event_loop() def reader(): data = rsock.recv(100) print("Received:", data.decode()) + # We are done: unregister the file descriptor loop.remove_reader(rsock) + # Stop the event loop loop.stop() @@ -1243,30 +1517,35 @@ Wait until a file descriptor received some data using the # Simulate the reception of data from the network loop.call_soon(wsock.send, 'abc'.encode()) - # Run the event loop - loop.run_forever() - - # We are done, close sockets and the event loop - rsock.close() - wsock.close() - loop.close() + try: + # Run the event loop + loop.run_forever() + finally: + # We are done. Close sockets and the event loop. + rsock.close() + wsock.close() + loop.close() .. seealso:: - The :ref:`register an open socket to wait for data using a protocol - <asyncio-register-socket>` example uses a low-level protocol created by the - :meth:`AbstractEventLoop.create_connection` method. + * A similar :ref:`example <asyncio_example_create_connection>` + using transports, protocols, and the + :meth:`loop.create_connection` method. - The :ref:`register an open socket to wait for data using streams - <asyncio-register-socket-streams>` example uses high-level streams - created by the :func:`open_connection` function in a coroutine. + * Another similar :ref:`example <asyncio_example_create_connection-streams>` + using the high-level :func:`asyncio.open_connection` function + and streams. +.. _asyncio_example_unix_signals: + Set signal handlers for SIGINT and SIGTERM ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Register handlers for signals :py:data:`SIGINT` and :py:data:`SIGTERM` using -the :meth:`AbstractEventLoop.add_signal_handler` method:: +(This ``signals`` example only works on Unix.) + +Register handlers for signals :py:data:`SIGINT` and :py:data:`SIGTERM` +using the :meth:`loop.add_signal_handler` method:: import asyncio import functools @@ -1277,16 +1556,17 @@ the :meth:`AbstractEventLoop.add_signal_handler` method:: print("got signal %s: exit" % signame) loop.stop() - loop = asyncio.get_event_loop() - for signame in ('SIGINT', 'SIGTERM'): - loop.add_signal_handler(getattr(signal, signame), - functools.partial(ask_exit, signame)) + async def main(): + loop = asyncio.get_running_loop() - print("Event loop running forever, press Ctrl+C to interrupt.") - print("pid %s: send SIGINT or SIGTERM to exit." 
% os.getpid()) - try: - loop.run_forever() - finally: - loop.close() + for signame in {'SIGINT', 'SIGTERM'}: + loop.add_signal_handler( + getattr(signal, signame), + functools.partial(ask_exit, signame)) + + await asyncio.sleep(3600) + + print("Event loop running for 1 hour, press Ctrl+C to interrupt.") + print(f"pid {os.getpid()}: send SIGINT or SIGTERM to exit.") -This example only works on UNIX. + asyncio.run(main()) diff --git a/Doc/library/asyncio-eventloops.rst b/Doc/library/asyncio-eventloops.rst deleted file mode 100644 index 7f6e9535a8..0000000000 --- a/Doc/library/asyncio-eventloops.rst +++ /dev/null @@ -1,244 +0,0 @@ -.. currentmodule:: asyncio - -Event loops -=========== - -**Source code:** :source:`Lib/asyncio/events.py` - -Event loop functions --------------------- - -The following functions are convenient shortcuts to accessing the methods of the -global policy. Note that this provides access to the default policy, unless an -alternative policy was set by calling :func:`set_event_loop_policy` earlier in -the execution of the process. - -.. function:: get_event_loop() - - Equivalent to calling ``get_event_loop_policy().get_event_loop()``. - -.. function:: set_event_loop(loop) - - Equivalent to calling ``get_event_loop_policy().set_event_loop(loop)``. - -.. function:: new_event_loop() - - Equivalent to calling ``get_event_loop_policy().new_event_loop()``. - -.. function:: get_running_loop() - - Return the running event loop in the current OS thread. If there - is no running event loop a :exc:`RuntimeError` is raised. - - .. versionadded:: 3.7 - - -.. _asyncio-event-loops: - -Available event loops ---------------------- - -asyncio currently provides two implementations of event loops: -:class:`SelectorEventLoop` and :class:`ProactorEventLoop`. - -.. class:: SelectorEventLoop - - Event loop based on the :mod:`selectors` module. Subclass of - :class:`AbstractEventLoop`. - - Use the most efficient selector available on the platform. - - On Windows, only sockets are supported (ex: pipes are not supported): - see the `MSDN documentation of select - <https://msdn.microsoft.com/en-us/library/windows/desktop/ms740141%28v=vs.85%29.aspx>`_. - -.. class:: ProactorEventLoop - - Proactor event loop for Windows using "I/O Completion Ports" aka IOCP. - Subclass of :class:`AbstractEventLoop`. - - Availability: Windows. - - .. seealso:: - - `MSDN documentation on I/O Completion Ports - <https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198%28v=vs.85%29.aspx>`_. - -Example to use a :class:`ProactorEventLoop` on Windows:: - - import asyncio, sys - - if sys.platform == 'win32': - loop = asyncio.ProactorEventLoop() - asyncio.set_event_loop(loop) - -.. _asyncio-platform-support: - -Platform support ----------------- - -The :mod:`asyncio` module has been designed to be portable, but each platform -still has subtle differences and may not support all :mod:`asyncio` features. - -Windows -^^^^^^^ - -Common limits of Windows event loops: - -- :meth:`~AbstractEventLoop.create_unix_connection` and - :meth:`~AbstractEventLoop.create_unix_server` are not supported: the socket - family :data:`socket.AF_UNIX` is specific to UNIX -- :meth:`~AbstractEventLoop.add_signal_handler` and - :meth:`~AbstractEventLoop.remove_signal_handler` are not supported -- :meth:`EventLoopPolicy.set_child_watcher` is not supported. - :class:`ProactorEventLoop` supports subprocesses. It has only one - implementation to watch child processes, there is no need to configure it. 
- -:class:`SelectorEventLoop` specific limits: - -- :class:`~selectors.SelectSelector` is used which only supports sockets - and is limited to 512 sockets. -- :meth:`~AbstractEventLoop.add_reader` and :meth:`~AbstractEventLoop.add_writer` only - accept file descriptors of sockets -- Pipes are not supported - (ex: :meth:`~AbstractEventLoop.connect_read_pipe`, - :meth:`~AbstractEventLoop.connect_write_pipe`) -- :ref:`Subprocesses <asyncio-subprocess>` are not supported - (ex: :meth:`~AbstractEventLoop.subprocess_exec`, - :meth:`~AbstractEventLoop.subprocess_shell`) - -:class:`ProactorEventLoop` specific limits: - -- :meth:`~AbstractEventLoop.create_datagram_endpoint` (UDP) is not supported -- :meth:`~AbstractEventLoop.add_reader` and :meth:`~AbstractEventLoop.add_writer` are - not supported - -The resolution of the monotonic clock on Windows is usually around 15.6 msec. -The best resolution is 0.5 msec. The resolution depends on the hardware -(availability of `HPET -<https://en.wikipedia.org/wiki/High_Precision_Event_Timer>`_) and on the Windows -configuration. See :ref:`asyncio delayed calls <asyncio-delayed-calls>`. - -.. versionchanged:: 3.5 - - :class:`ProactorEventLoop` now supports SSL. - - -Mac OS X -^^^^^^^^ - -Character devices like PTY are only well supported since Mavericks (Mac OS -10.9). They are not supported at all on Mac OS 10.5 and older. - -On Mac OS 10.6, 10.7 and 10.8, the default event loop is -:class:`SelectorEventLoop` which uses :class:`selectors.KqueueSelector`. -:class:`selectors.KqueueSelector` does not support character devices on these -versions. The :class:`SelectorEventLoop` can be used with -:class:`~selectors.SelectSelector` or :class:`~selectors.PollSelector` to -support character devices on these versions of Mac OS X. Example:: - - import asyncio - import selectors - - selector = selectors.SelectSelector() - loop = asyncio.SelectorEventLoop(selector) - asyncio.set_event_loop(loop) - - -Event loop policies and the default policy ------------------------------------------- - -Event loop management is abstracted with a *policy* pattern, to provide maximal -flexibility for custom platforms and frameworks. Throughout the execution of a -process, a single global policy object manages the event loops available to the -process based on the calling context. A policy is an object implementing the -:class:`AbstractEventLoopPolicy` interface. - -For most users of :mod:`asyncio`, policies never have to be dealt with -explicitly, since the default global policy is sufficient (see below). - -The module-level functions -:func:`get_event_loop` and :func:`set_event_loop` provide convenient access to -event loops managed by the default policy. - - -Event loop policy interface ---------------------------- - -An event loop policy must implement the following interface: - -.. class:: AbstractEventLoopPolicy - - Event loop policy. - - .. method:: get_event_loop() - - Get the event loop for the current context. - - Returns an event loop object implementing the :class:`AbstractEventLoop` - interface. In case called from coroutine, it returns the currently - running event loop. - - Raises an exception in case no event loop has been set for the current - context and the current policy does not specify to create one. It must - never return ``None``. - - .. versionchanged:: 3.6 - - .. method:: set_event_loop(loop) - - Set the event loop for the current context to *loop*. - - .. method:: new_event_loop() - - Create and return a new event loop object according to this policy's - rules. 
- - If there's need to set this loop as the event loop for the current - context, :meth:`set_event_loop` must be called explicitly. - - -The default policy defines context as the current thread, and manages an event -loop per thread that interacts with :mod:`asyncio`. An exception to this rule -happens when :meth:`~AbstractEventLoopPolicy.get_event_loop` is called from a -running future/coroutine, in which case it will return the current loop -running that future/coroutine. - -If the current thread doesn't already have an event loop associated with it, -the default policy's :meth:`~AbstractEventLoopPolicy.get_event_loop` method -creates one when called from the main thread, but raises :exc:`RuntimeError` -otherwise. - - -Access to the global loop policy --------------------------------- - -.. function:: get_event_loop_policy() - - Get the current event loop policy. - -.. function:: set_event_loop_policy(policy) - - Set the current event loop policy. If *policy* is ``None``, the default - policy is restored. - - -Customizing the event loop policy ---------------------------------- - -To implement a new event loop policy, it is recommended you subclass the -concrete default event loop policy :class:`DefaultEventLoopPolicy` -and override the methods for which you want to change behavior, for example:: - - class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy): - - def get_event_loop(self): - """Get the event loop. - - This may be None or an instance of EventLoop. - """ - loop = super().get_event_loop() - # Do something with loop ... - return loop - - asyncio.set_event_loop_policy(MyEventLoopPolicy()) diff --git a/Doc/library/asyncio-exceptions.rst b/Doc/library/asyncio-exceptions.rst new file mode 100644 index 0000000000..dbd5df7208 --- /dev/null +++ b/Doc/library/asyncio-exceptions.rst @@ -0,0 +1,91 @@ +.. currentmodule:: asyncio + + +.. _asyncio-exceptions: + +========== +Exceptions +========== + + +.. exception:: TimeoutError + + The operation has exceeded the given deadline. + + .. important:: + This exception is different from the builtin :exc:`TimeoutError` + exception. + + +.. exception:: CancelledError + + The operation has been cancelled. + + This exception can be caught to perform custom operations + when asyncio Tasks are cancelled. In almost all situations the + exception must be re-raised. + + .. important:: + + This exception is a subclass of :exc:`Exception`, so it can be + accidentally suppressed by an overly broad ``try..except`` block:: + + try: + await operation + except Exception: + # The cancellation is broken because the *except* block + # suppresses the CancelledError exception. + log.log('an error has occurred') + + Instead, the following pattern should be used:: + + try: + await operation + except asyncio.CancelledError: + raise + except Exception: + log.log('an error has occurred') + + +.. exception:: InvalidStateError + + Invalid internal state of :class:`Task` or :class:`Future`. + + Can be raised in situations like setting a result value for a + *Future* object that already has a result value set. + + +.. exception:: SendfileNotAvailableError + + The "sendfile" syscall is not available for the given + socket or file type. + + A subclass of :exc:`RuntimeError`. + + +.. exception:: IncompleteReadError + + The requested read operation did not complete fully. + + Raised by the :ref:`asyncio stream APIs<asyncio-streams>`. + + This exception is a subclass of :exc:`EOFError`. + + .. attribute:: expected + + The total number (:class:`int`) of expected bytes. + + .. 
attribute:: partial + + A string of :class:`bytes` read before the end of stream was reached. + + +.. exception:: LimitOverrunError + + Reached the buffer size limit while looking for a separator. + + Raised by the :ref:`asyncio stream APIs <asyncio-streams>`. + + .. attribute:: consumed + + The total number of to be consumed bytes. diff --git a/Doc/library/asyncio-future.rst b/Doc/library/asyncio-future.rst new file mode 100644 index 0000000000..d6c5335c0e --- /dev/null +++ b/Doc/library/asyncio-future.rst @@ -0,0 +1,250 @@ +.. currentmodule:: asyncio + + +.. _asyncio-futures: + +======= +Futures +======= + +*Future* objects are used to bridge low-level callback-based code +with high-level async/await code. + + +Future Functions +================ + +.. function:: isfuture(obj) + + Return ``True`` if *obj* is either of: + + * an instance of :class:`asyncio.Future`, + * an instance of :class:`asyncio.Task`, + * a Future-like object with a ``_asyncio_future_blocking`` + attribute. + + .. versionadded:: 3.5 + + +.. function:: ensure_future(obj, \*, loop=None) + + Return: + + * *obj* argument as is, if *obj* is a :class:`Future`, + a :class:`Task`, or a Future-like object (:func:`isfuture` + is used for the test.) + + * a :class:`Task` object wrapping *obj*, if *obj* is a + coroutine (:func:`iscoroutine` is used for the test.) + + * a :class:`Task` object that would await on *obj*, if *obj* is an + awaitable (:func:`inspect.isawaitable` is used for the test.) + + If *obj* is neither of the above a :exc:`TypeError` is raised. + + .. important:: + + See also the :func:`create_task` function which is the + preferred way for creating new Tasks. + + .. versionchanged:: 3.5.1 + The function accepts any :term:`awaitable` object. + + +.. function:: wrap_future(future, \*, loop=None) + + Wrap a :class:`concurrent.futures.Future` object in a + :class:`asyncio.Future` object. + + +Future Object +============= + +.. class:: Future(\*, loop=None) + + A Future represents an eventual result of an asynchronous + operation. Not thread-safe. + + Future is an :term:`awaitable` object. Coroutines can await on + Future objects until they either have a result or an exception + set, or until they are cancelled. + + Typically Futures are used to enable low-level + callback-based code (e.g. in protocols implemented using asyncio + :ref:`transports <asyncio-transports-protocols>`) + to interoperate with high-level async/await code. + + The rule of thumb is to never expose Future objects in user-facing + APIs, and the recommended way to create a Future object is to call + :meth:`loop.create_future`. This way alternative event loop + implementations can inject their own optimized implementations + of a Future object. + + .. versionchanged:: 3.7 + Added support for the :mod:`contextvars` module. + + .. method:: result() + + Return the result of the Future. + + If the Future is *done* and has a result set by the + :meth:`set_result` method, the result value is returned. + + If the Future is *done* and has an exception set by the + :meth:`set_exception` method, this method raises the exception. + + If the Future has been *cancelled*, this method raises + a :exc:`CancelledError` exception. + + If the Future's result isn't yet available, this method raises + a :exc:`InvalidStateError` exception. + + .. method:: set_result(result) + + Mark the Future as *done* and set its result. + + Raises a :exc:`InvalidStateError` error if the Future is + already *done*. + + .. 
method:: set_exception(exception) + + Mark the Future as *done* and set an exception. + + Raises a :exc:`InvalidStateError` error if the Future is + already *done*. + + .. method:: done() + + Return ``True`` if the Future is *done*. + + A Future is *done* if it was *cancelled* or if it has a result + or an exception set with :meth:`set_result` or + :meth:`set_exception` calls. + + .. method:: cancelled() + + Return ``True`` if the Future was *cancelled*. + + The method is usually used to check if a Future is not + *cancelled* before setting a result or an exception for it:: + + if not fut.cancelled(): + fut.set_result(42) + + .. method:: add_done_callback(callback, *, context=None) + + Add a callback to be run when the Future is *done*. + + The *callback* is called with the Future object as its only + argument. + + If the Future is already *done* when this method is called, + the callback is scheduled with :meth:`loop.call_soon`. + + An optional keyword-only *context* argument allows specifying a + custom :class:`contextvars.Context` for the *callback* to run in. + The current context is used when no *context* is provided. + + :func:`functools.partial` can be used to pass parameters + to the callback, e.g.:: + + # Call 'print("Future:", fut)' when "fut" is done. + fut.add_done_callback( + functools.partial(print, "Future:")) + + .. versionchanged:: 3.7 + The *context* keyword-only parameter was added. + See :pep:`567` for more details. + + .. method:: remove_done_callback(callback) + + Remove *callback* from the callbacks list. + + Returns the number of callbacks removed, which is typically 1, + unless a callback was added more than once. + + .. method:: cancel() + + Cancel the Future and schedule callbacks. + + If the Future is already *done* or *cancelled*, return ``False``. + Otherwise, change the Future's state to *cancelled*, + schedule the callbacks, and return ``True``. + + .. method:: exception() + + Return the exception that was set on this Future. + + The exception (or ``None`` if no exception was set) is + returned only if the Future is *done*. + + If the Future has been *cancelled*, this method raises a + :exc:`CancelledError` exception. + + If the Future isn't *done* yet, this method raises an + :exc:`InvalidStateError` exception. + + .. method:: get_loop() + + Return the event loop the Future object is bound to. + + .. versionadded:: 3.7 + + +.. _asyncio_example_future: + +This example creates a Future object, creates and schedules an +asynchronous Task to set result for the Future, and waits until +the Future has a result:: + + async def set_after(fut, delay, value): + # Sleep for *delay* seconds. + await asyncio.sleep(delay) + + # Set *value* as a result of *fut* Future. + fut.set_result(value) + + async def main(): + # Get the current event loop. + loop = asyncio.get_running_loop() + + # Create a new Future object. + fut = loop.create_future() + + # Run "set_after()" coroutine in a parallel Task. + # We are using the low-level "loop.create_task()" API here because + # we already have a reference to the event loop at hand. + # Otherwise we could have just used "asyncio.create_task()". + loop.create_task( + set_after(fut, 1, '... world')) + + print('hello ...') + + # Wait until *fut* has a result (1 second) and print it. + print(await fut) + + asyncio.run(main()) + + +.. important:: + + The Future object was designed to mimic + :class:`concurrent.futures.Future`. 
Key differences include: + + - unlike asyncio Futures, :class:`concurrent.futures.Future` + instances cannot be awaited. + + - :meth:`asyncio.Future.result` and :meth:`asyncio.Future.exception` + do not accept the *timeout* argument. + + - :meth:`asyncio.Future.result` and :meth:`asyncio.Future.exception` + raise an :exc:`InvalidStateError` exception when the Future is not + *done*. + + - Callbacks registered with :meth:`asyncio.Future.add_done_callback` + are not called immediately. They are scheduled with + :meth:`loop.call_soon` instead. + + - asyncio Future is not compatible with the + :func:`concurrent.futures.wait` and + :func:`concurrent.futures.as_completed` functions. diff --git a/Doc/library/asyncio-llapi-index.rst b/Doc/library/asyncio-llapi-index.rst new file mode 100644 index 0000000000..7fb1e60f50 --- /dev/null +++ b/Doc/library/asyncio-llapi-index.rst @@ -0,0 +1,510 @@ +.. currentmodule:: asyncio + + +=================== +Low-level API Index +=================== + +This page lists all low-level asyncio APIs. + + +Obtaining the Event Loop +======================== + +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - :func:`asyncio.get_running_loop` + - The **preferred** function to get the running event loop. + + * - :func:`asyncio.get_event_loop` + - Get an event loop instance (current or via the policy). + + * - :func:`asyncio.set_event_loop` + - Set the event loop as current via the current policy. + + * - :func:`asyncio.new_event_loop` + - Create a new event loop. + + +.. rubric:: Examples + +* :ref:`Using asyncio.get_running_loop() <asyncio_example_future>`. + + +Event Loop Methods +================== + +See also the main documentation section about the +:ref:`event loop methods <asyncio-event-loop>`. + +.. rubric:: Lifecycle +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - :meth:`loop.run_until_complete` + - Run a Future/Task/awaitable until complete. + + * - :meth:`loop.run_forever` + - Run the event loop forever. + + * - :meth:`loop.stop` + - Stop the event loop. + + * - :meth:`loop.stop` + - Close the event loop. + + * - :meth:`loop.is_running()` + - Return ``True`` if the event loop is running. + + * - :meth:`loop.is_closed()` + - Return ``True`` if the event loop is closed. + + * - ``await`` :meth:`loop.shutdown_asyncgens` + - Close asynchronous generators. + + +.. rubric:: Debugging +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - :meth:`loop.set_debug` + - Enable or disable the debug mode. + + * - :meth:`loop.get_debug` + - Get the current debug mode. + + +.. rubric:: Scheduling Callbacks +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - :meth:`loop.call_soon` + - Invoke a callback soon. + + * - :meth:`loop.call_soon_threadsafe` + - A thread-safe variant of :meth:`loop.call_soon`. + + * - :meth:`loop.call_later` + - Invoke a callback *after* the given time. + + * - :meth:`loop.call_at` + - Invoke a callback *at* the given time. + + +.. rubric:: Thread/Process Pool +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - ``await`` :meth:`loop.run_in_executor` + - Run a CPU-bound or other blocking function in + a :mod:`concurrent.futures` executor. + + * - :meth:`loop.set_default_executor` + - Set the default executor for :meth:`loop.run_in_executor`. + + +.. rubric:: Tasks and Futures +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - :meth:`loop.create_future` + - Create a :class:`Future` object. 
+ + * - :meth:`loop.create_task` + - Schedule coroutine as a :class:`Task`. + + * - :meth:`loop.set_task_factory` + - Set a factory used by :meth:`loop.create_task` to + create :class:`Tasks <Task>`. + + * - :meth:`loop.get_task_factory` + - Get the factory :meth:`loop.create_task` uses + to create :class:`Tasks <Task>`. + + +.. rubric:: DNS +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - ``await`` :meth:`loop.getaddrinfo` + - Asynchronous version of :meth:`socket.getaddrinfo`. + + * - ``await`` :meth:`loop.getnameinfo` + - Asynchronous version of :meth:`socket.getnameinfo`. + + +.. rubric:: Networking and IPC +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - ``await`` :meth:`loop.create_connection` + - Open a TCP connection. + + * - ``await`` :meth:`loop.create_server` + - Create a TCP server. + + * - ``await`` :meth:`loop.create_unix_connection` + - Open a Unix socket connection. + + * - ``await`` :meth:`loop.create_unix_server` + - Create a Unix socket server. + + * - ``await`` :meth:`loop.connect_accepted_socket` + - Wrap a :class:`~socket.socket` into a ``(transport, protocol)`` + pair. + + * - ``await`` :meth:`loop.create_datagram_endpoint` + - Open a datagram (UDP) connection. + + * - ``await`` :meth:`loop.sendfile` + - Send a file over a transport. + + * - ``await`` :meth:`loop.start_tls` + - Upgrade an existing connection to TLS. + + * - ``await`` :meth:`loop.connect_read_pipe` + - Wrap a read end of a pipe into a ``(transport, protocol)`` pair. + + * - ``await`` :meth:`loop.connect_write_pipe` + - Wrap a write end of a pipe into a ``(transport, protocol)`` pair. + + +.. rubric:: Sockets +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - ``await`` :meth:`loop.sock_recv` + - Receive data from the :class:`~socket.socket`. + + * - ``await`` :meth:`loop.sock_recv_into` + - Receive data from the :class:`~socket.socket` into a buffer. + + * - ``await`` :meth:`loop.sock_sendall` + - Send data to the :class:`~socket.socket`. + + * - ``await`` :meth:`loop.sock_connect` + - Connect the :class:`~socket.socket`. + + * - ``await`` :meth:`loop.sock_accept` + - Accept a :class:`~socket.socket` connection. + + * - ``await`` :meth:`loop.sock_sendfile` + - Send a file over the :class:`~socket.socket`. + + * - :meth:`loop.add_reader` + - Start watching a file descriptor for read availability. + + * - :meth:`loop.remove_reader` + - Stop watching a file descriptor for read availability. + + * - :meth:`loop.add_writer` + - Start watching a file descriptor for write availability. + + * - :meth:`loop.remove_writer` + - Stop watching a file descriptor for write availability. + + +.. rubric:: Unix Signals +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - :meth:`loop.add_signal_handler` + - Add a handler for a :mod:`signal`. + + * - :meth:`loop.remove_signal_handler` + - Remove a handler for a :mod:`signal`. + + +.. rubric:: Subprocesses +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - :meth:`loop.subprocess_exec` + - Spawn a subprocess. + + * - :meth:`loop.subprocess_shell` + - Spawn a subprocess from a shell command. + + +.. rubric:: Error Handling +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - :meth:`loop.call_exception_handler` + - Call the exception handler. + + * - :meth:`loop.set_exception_handler` + - Set a new exception handler. + + * - :meth:`loop.get_exception_handler` + - Get the current exception handler. 
+ + * - :meth:`loop.default_exception_handler` + - The default exception handler implementation. + + +.. rubric:: Examples + +* :ref:`Using asyncio.get_event_loop() and loop.run_forever() + <asyncio_example_lowlevel_helloworld>`. + +* :ref:`Using loop.call_later() <asyncio_example_call_later>`. + +* Using ``loop.create_connection()`` to implement + :ref:`an echo-client <asyncio_example_tcp_echo_client_protocol>`. + +* Using ``loop.create_connection()`` to + :ref:`connect a socket <asyncio_example_create_connection>`. + +* :ref:`Using add_reader() to watch an FD for read events + <asyncio_example_watch_fd>`. + +* :ref:`Using loop.add_signal_handler() <asyncio_example_unix_signals>`. + +* :ref:`Using loop.subprocess_exec() <asyncio_example_subprocess_proto>`. + + +Transports +========== + +All transports implement the following methods: + +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - :meth:`transport.close() <BaseTransport.close>` + - Close the transport. + + * - :meth:`transport.is_closing() <BaseTransport.is_closing>` + - Return ``True`` if the transport is closing or is closed. + + * - :meth:`transport.get_extra_info() <BaseTransport.get_extra_info>` + - Request for information about the transport. + + * - :meth:`transport.set_protocol() <BaseTransport.set_protocol>` + - Set a new protocol. + + * - :meth:`transport.get_protocol() <BaseTransport.get_protocol>` + - Return the current protocol. + + +Transports that can receive data (TCP and Unix connections, +pipes, etc). Returned from methods like +:meth:`loop.create_connection`, :meth:`loop.create_unix_connection`, +:meth:`loop.connect_read_pipe`, etc: + +.. rubric:: Read Transports +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - :meth:`transport.is_reading() <ReadTransport.is_reading>` + - Return ``True`` if the transport is receiving. + + * - :meth:`transport.pause_reading() <ReadTransport.pause_reading>` + - Pause receiving. + + * - :meth:`transport.resume_reading() <ReadTransport.resume_reading>` + - Resume receiving. + + +Transports that can Send data (TCP and Unix connections, +pipes, etc). Returned from methods like +:meth:`loop.create_connection`, :meth:`loop.create_unix_connection`, +:meth:`loop.connect_write_pipe`, etc: + +.. rubric:: Write Transports +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - :meth:`transport.write() <WriteTransport.write>` + - Write data to the transport. + + * - :meth:`transport.writelines() <WriteTransport.writelines>` + - Write buffers to the transport. + + * - :meth:`transport.can_write_eof() <WriteTransport.can_write_eof>` + - Return :const:`True` if the transport supports sending EOF. + + * - :meth:`transport.write_eof() <WriteTransport.write_eof>` + - Close and send EOF after flushing buffered data. + + * - :meth:`transport.abort() <WriteTransport.abort>` + - Close the transport immediately. + + * - :meth:`transport.get_write_buffer_size() + <WriteTransport.get_write_buffer_size>` + - Return high and low water marks for write flow control. + + * - :meth:`transport.set_write_buffer_limits() + <WriteTransport.set_write_buffer_limits>` + - Set new high and low water marks for write flow control. + + +Transports returned by :meth:`loop.create_datagram_endpoint`: + +.. rubric:: Datagram Transports +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - :meth:`transport.sendto() <DatagramTransport.sendto>` + - Send data to the remote peer. 
+ + * - :meth:`transport.abort() <DatagramTransport.abort>` + - Close the transport immediately. + + +Low-level transport abstraction over subprocesses. +Returned by :meth:`loop.subprocess_exec` and +:meth:`loop.subprocess_shell`: + +.. rubric:: Subprocess Transports +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - :meth:`transport.get_pid() <SubprocessTransport.get_pid>` + - Return the subprocess process id. + + * - :meth:`transport.get_pipe_transport() + <SubprocessTransport.get_pipe_transport>` + - Return the transport for the requested communication pipe + (*stdin*, *stdout*, or *stderr*). + + * - :meth:`transport.get_returncode() <SubprocessTransport.get_returncode>` + - Return the subprocess return code. + + * - :meth:`transport.kill() <SubprocessTransport.kill>` + - Kill the subprocess. + + * - :meth:`transport.send_signal() <SubprocessTransport.send_signal>` + - Send a signal to the subprocess. + + * - :meth:`transport.terminate() <SubprocessTransport.terminate>` + - Stop the subprocess. + + * - :meth:`transport.close() <SubprocessTransport.close>` + - Kill the subprocess and close all pipes. + + +Protocols +========= + +Protocol classes can implement the following **callback methods**: + +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - ``callback`` :meth:`connection_made() <BaseProtocol.connection_made>` + - Called when a connection is made. + + * - ``callback`` :meth:`connection_lost() <BaseProtocol.connection_lost>` + - Called when the connection is lost or closed. + + * - ``callback`` :meth:`pause_writing() <BaseProtocol.pause_writing>` + - Called when the transport's buffer goes over the high water mark. + + * - ``callback`` :meth:`resume_writing() <BaseProtocol.resume_writing>` + - Called when the transport's buffer drains below the low water mark. + + +.. rubric:: Streaming Protocols (TCP, Unix Sockets, Pipes) +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - ``callback`` :meth:`data_received() <Protocol.data_received>` + - Called when some data is received. + + * - ``callback`` :meth:`eof_received() <Protocol.eof_received>` + - Called when an EOF is received. + + +.. rubric:: Buffered Streaming Protocols +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - ``callback`` :meth:`get_buffer() <BufferedProtocol.get_buffer>` + - Called to allocate a new receive buffer. + + * - ``callback`` :meth:`buffer_updated() <BufferedProtocol.buffer_updated>` + - Called when the buffer was updated with the received data. + + * - ``callback`` :meth:`eof_received() <BufferedProtocol.eof_received>` + - Called when an EOF is received. + + +.. rubric:: Datagram Protocols +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - ``callback`` :meth:`datagram_received() + <DatagramProtocol.datagram_received>` + - Called when a datagram is received. + + * - ``callback`` :meth:`error_received() <DatagramProtocol.error_received>` + - Called when a previous send or receive operation raises an + :class:`OSError`. + + +.. rubric:: Subprocess Protocols +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - ``callback`` :meth:`pipe_data_received() + <SubprocessProtocol.pipe_data_received>` + - Called when the child process writes data into its + *stdout* or *stderr* pipe. + + * - ``callback`` :meth:`pipe_connection_lost() + <SubprocessProtocol.pipe_connection_lost>` + - Called when one of the pipes communicating with + the child process is closed. 
+ + * - ``callback`` :meth:`process_exited() + <SubprocessProtocol.process_exited>` + - Called when the child process has exited. + + +Event Loop Policies +=================== + +Policies is a low-level mechanism to alter the behavior of +functions like :func:`asyncio.get_event_loop`. See also +the main :ref:`policies section <asyncio-policies>` for more +details. + + +.. rubric:: Accessing Policies +.. list-table:: + :widths: 50 50 + :class: full-width-table + + * - :meth:`asyncio.get_event_loop_policy` + - Return the current process-wide policy. + + * - :meth:`asyncio.set_event_loop_policy` + - Set a new process-wide policy. + + * - :class:`AbstractEventLoopPolicy` + - Base class for policy objects. diff --git a/Doc/library/asyncio-platforms.rst b/Doc/library/asyncio-platforms.rst new file mode 100644 index 0000000000..f8ecb58d3a --- /dev/null +++ b/Doc/library/asyncio-platforms.rst @@ -0,0 +1,106 @@ +.. currentmodule:: asyncio + + +.. _asyncio-platform-support: + + +================ +Platform Support +================ + +The :mod:`asyncio` module is designed to be portable, +but some platforms have subtle differences and limitations +due to the platforms' underlying architecture and capabilities. + + +All Platforms +============= + +* :meth:`loop.add_reader` and :meth:`loop.add_writer` + cannot be used to monitor file I/O. + + +Windows +======= + +All event loops on Windows do not support the following methods: + +* :meth:`loop.create_unix_connection` and + :meth:`loop.create_unix_server` are not supported. + The :data:`socket.AF_UNIX` socket family is specific to Unix. + +* :meth:`loop.add_signal_handler` and + :meth:`loop.remove_signal_handler` are not supported. + +:class:`SelectorEventLoop` has the following limitations: + +* :class:`~selectors.SelectSelector` is used to wait on socket events: + it supports sockets and is limited to 512 sockets. + +* :meth:`loop.add_reader` and :meth:`loop.add_writer` only accept + socket handles (e.g. pipe file descriptors are not supported). + +* Pipes are not supported, so the :meth:`loop.connect_read_pipe` + and :meth:`loop.connect_write_pipe` methods are not implemented. + +* :ref:`Subprocesses <asyncio-subprocess>` are not supported, i.e. + :meth:`loop.subprocess_exec` and :meth:`loop.subprocess_shell` + methods are not implemented. + +:class:`ProactorEventLoop` has the following limitations: + +* The :meth:`loop.create_datagram_endpoint` method + is not supported. + +* The :meth:`loop.add_reader` and :meth:`loop.add_writer` + methods are not supported. + +The resolution of the monotonic clock on Windows is usually around 15.6 +msec. The best resolution is 0.5 msec. The resolution depends on the +hardware (availability of `HPET +<https://en.wikipedia.org/wiki/High_Precision_Event_Timer>`_) and on the +Windows configuration. + + +.. _asyncio-windows-subprocess: + +Subprocess Support on Windows +----------------------------- + +:class:`SelectorEventLoop` on Windows does not support subproceses. +On Windows, :class:`ProactorEventLoop` should be used instead:: + + import asyncio + + asyncio.set_event_loop_policy( + asyncio.WindowsProactorEventLoopPolicy()) + + asyncio.run(your_code()) + + +The :meth:`policy.set_child_watcher() +<AbstractEventLoopPolicy.set_child_watcher>` function is also +not supported, as :class:`ProactorEventLoop` has a different mechanism +to watch child processes. + + +macOS +===== + +Modern macOS versions are fully supported. + +.. 
rubric:: macOS <= 10.8 + +On macOS 10.6, 10.7 and 10.8, the default event loop +uses :class:`selectors.KqueueSelector`, which does not support +character devices on these versions. The :class:`SelectorEventLoop` +can be manually configured to use :class:`~selectors.SelectSelector` +or :class:`~selectors.PollSelector` to support character devices on +these older versions of macOS. Example:: + + import asyncio + import selectors + + selector = selectors.SelectSelector() + loop = asyncio.SelectorEventLoop(selector) + asyncio.set_event_loop(loop) diff --git a/Doc/library/asyncio-policy.rst b/Doc/library/asyncio-policy.rst new file mode 100644 index 0000000000..42f936da46 --- /dev/null +++ b/Doc/library/asyncio-policy.rst @@ -0,0 +1,221 @@ +.. currentmodule:: asyncio + + +.. _asyncio-policies: + +======== +Policies +======== + +An event loop policy is a global per-process object that controls +the management of the event loop. Each event loop has a default +policy, which can be changed and customized using the policy API. + +A policy defines the notion of *context* and manages a +separate event loop per context. The default policy +defines *context* to be the current thread. + +By using a custom event loop policy, the behavior of +:func:`get_event_loop`, :func:`set_event_loop`, and +:func:`new_event_loop` functions can be customized. + +Policy objects should implement the APIs defined +in the :class:`AbstractEventLoopPolicy` abstract base class. + + +Getting and Setting the Policy +============================== + +The following functions can be used to get and set the policy +for the current process: + +.. function:: get_event_loop_policy() + + Return the current process-wide policy. + +.. function:: set_event_loop_policy(policy) + + Set the current process-wide policy to *policy*. + + If *policy* is set to ``None``, the default policy is restored. + + +Policy Objects +============== + +The abstract event loop policy base class is defined as follows: + +.. class:: AbstractEventLoopPolicy + + An abstract base class for asyncio policies. + + .. method:: get_event_loop() + + Get the event loop for the current context. + + Return an event loop object implementing the + :class:`AbstractEventLoop` interface. + + This method should never return ``None``. + + .. versionchanged:: 3.6 + + .. method:: set_event_loop(loop) + + Set the event loop for the current context to *loop*. + + .. method:: new_event_loop() + + Create and return a new event loop object. + + This method should never return ``None``. + + .. method:: get_child_watcher() + + Get a child process watcher object. + + Return a watcher object implementing the + :class:`AbstractChildWatcher` interface. + + This function is Unix specific. + + .. method:: set_child_watcher(watcher) + + Get the current child process watcher to *watcher*. + + This function is Unix specific. + + +asyncio ships with the following built-in policies: + + +.. class:: DefaultEventLoopPolicy + + The default asyncio policy. Uses :class:`SelectorEventLoop` + on both Unix and Windows platforms. + + There is no need to install the default policy manually. asyncio + is configured to use the default policy automatically. + + +.. class:: WindowsProactorEventLoopPolicy + + An alternative event loop policy that uses the + :class:`ProactorEventLoop` event loop implementation. + + Availability: Windows. + + +Process Watchers +================ + +A process watcher allows customization of how an event loop monitors +child processes on Unix. 
Specifically, the event loop needs to know +when a child process has exited. + +In asyncio, child processes are created with +:func:`create_subprocess_exec` and :meth:`loop.subprocess_exec` +functions. + +asyncio defines the :class:`AbstractChildWatcher` abstract base class, +which child watchers should implement, and has two different +implementations: :class:`SafeChildWatcher` (configured to be used +by default) and :class:`FastChildWatcher`. + +See also the :ref:`Subprocess and Threads <asyncio-subprocess-threads>` +section. + +The following two functions can be used to customize the child process watcher +implementation used by the asyncio event loop: + +.. function:: get_child_watcher() + + Return the current child watcher for the current policy. + +.. function:: set_child_watcher(watcher) + + Set the current child watcher to *watcher* for the current + policy. *watcher* must implement methods defined in the + :class:`AbstractChildWatcher` base class. + +.. note:: + Third-party event loops implementations might not support + custom child watchers. For such event loops, using + :func:`set_child_watcher` might be prohibited or have no effect. + +.. class:: AbstractChildWatcher + + .. method:: add_child_handler(pid, callback, \*args) + + Register a new child handler. + + Arrange for ``callback(pid, returncode, *args)`` to be called + when a process with PID equal to *pid* terminates. Specifying + another callback for the same process replaces the previous + handler. + + The *callback* callable must be thread-safe. + + .. method:: remove_child_handler(pid) + + Removes the handler for process with PID equal to *pid*. + + The function returns ``True`` if the handler was successfully + removed, ``False`` if there was nothing to remove. + + .. method:: attach_loop(loop) + + Attach the watcher to an event loop. + + If the watcher was previously attached to an event loop, then + it is first detached before attaching to the new loop. + + Note: loop may be ``None``. + + .. method:: close() + + Close the watcher. + + This method has to be called to ensure that underlying + resources are cleaned-up. + +.. class:: SafeChildWatcher + + This implementation avoids disrupting other code spawning processes + by polling every process explicitly on a :py:data:`SIGCHLD` signal. + + This is a safe solution but it has a significant overhead when + handling a big number of processes (*O(n)* each time a + :py:data:`SIGCHLD` is received). + + asyncio uses this safe implementation by default. + +.. class:: FastChildWatcher + + This implementation reaps every terminated processes by calling + ``os.waitpid(-1)`` directly, possibly breaking other code spawning + processes and waiting for their termination. + + There is no noticeable overhead when handling a big number of + children (*O(1)* each time a child terminates). + + +Custom Policies +=============== + +To implement a new event loop policy, it is recommended to subclass +:class:`DefaultEventLoopPolicy` and override the methods for which +custom behavior is wanted, e.g.:: + + class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy): + + def get_event_loop(self): + """Get the event loop. + + This may be None or an instance of EventLoop. + """ + loop = super().get_event_loop() + # Do something with loop ... 
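            # (Illustrative assumption, not part of the original
            # example.)  A concrete customization could, for instance,
            # enable debug mode on every loop this policy returns:
            #
            #     loop.set_debug(True)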
+ return loop + + asyncio.set_event_loop_policy(MyEventLoopPolicy()) diff --git a/Doc/library/asyncio-protocol.rst b/Doc/library/asyncio-protocol.rst index 9a08a4a490..bdfdcf7ddb 100644 --- a/Doc/library/asyncio-protocol.rst +++ b/Doc/library/asyncio-protocol.rst @@ -1,12 +1,68 @@ .. currentmodule:: asyncio -+++++++++++++++++++++++++++++++++++++++++++++ -Transports and protocols (callback based API) -+++++++++++++++++++++++++++++++++++++++++++++ -**Source code:** :source:`Lib/asyncio/transports.py` +.. _asyncio-transports-protocols: + + +======================== +Transports and Protocols +======================== + +.. rubric:: Preface + +Transports and Protocols are used by the **low-level** event loop +APIs such as :meth:`loop.create_connection`. They use +callback-based programming style and enable high-performance +implementations of network or IPC protocols (e.g. HTTP). + +Essentially, transports and protocols should only be used in +libraries and frameworks and never in high-level asyncio +applications. + +This documentation page covers both `Transports`_ and `Protocols`_. + +.. rubric:: Introduction + +At the highest level, the transport is concerned with *how* bytes +are transmitted, while the protocol determines *which* bytes to +transmit (and to some extent when). + +A different way of saying the same thing: a transport is an +abstraction for a socket (or similar I/O endpoint) while a protocol +is an abstraction for an application, from the transport's point +of view. + +Yet another view is the transport and protocol interfaces +together define an abstract interface for using network I/O and +interprocess I/O. + +There is always a 1:1 relationship between transport and protocol +objects: the protocol calls transport methods to send data, +while the transport calls protocol methods to pass it data that +has been received. + +Most of connection oriented event loop methods +(such as :meth:`loop.create_connection`) usually accept a +*protocol_factory* argument used to create a *Protocol* object +for an accepted connection, represented by a *Transport* object. +Such methods usually return a tuple of ``(transport, protocol)``. + +.. rubric:: Contents + +This documentation page contains the following sections: + +* The `Transports`_ section documents asyncio :class:`BaseTransport`, + :class:`ReadTransport`, :class:`WriteTransport`, :class:`Transport`, + :class:`DatagramTransport`, and :class:`SubprocessTransport` + classes. + +* The `Protocols`_ section documents asyncio :class:`BaseProtocol`, + :class:`Protocol`, :class:`BufferedProtocol`, + :class:`DatagramProtocol`, and :class:`SubprocessProtocol` classes. + +* The `Examples`_ section showcases how to work with transports, + protocols, and low-level event loop APIs. -**Source code:** :source:`Lib/asyncio/protocols.py` .. _asyncio-transport: @@ -14,293 +70,360 @@ Transports ========== Transports are classes provided by :mod:`asyncio` in order to abstract -various kinds of communication channels. You generally won't instantiate -a transport yourself; instead, you will call an :class:`AbstractEventLoop` method -which will create the transport and try to initiate the underlying -communication channel, calling you back when it succeeds. +various kinds of communication channels. -Once the communication channel is established, a transport is always -paired with a :ref:`protocol <asyncio-protocol>` instance. The protocol can -then call the transport's methods for various purposes. 
+Transport objects are always instantiated by an +ref:`asyncio event loop <asyncio-event-loop>`. -:mod:`asyncio` currently implements transports for TCP, UDP, SSL, and -subprocess pipes. The methods available on a transport depend on -the transport's kind. +asyncio implements transports for TCP, UDP, SSL, and subprocess pipes. +The methods available on a transport depend on the transport's kind. The transport classes are :ref:`not thread safe <asyncio-multithreading>`. -.. versionchanged:: 3.6 - The socket option ``TCP_NODELAY`` is now set by default. - -BaseTransport -------------- +Transports Hierarchy +-------------------- .. class:: BaseTransport - Base class for transports. + Base class for all transports. Contains methods that all + asyncio transports share. - .. method:: close() +.. class:: WriteTransport(BaseTransport) - Close the transport. If the transport has a buffer for outgoing - data, buffered data will be flushed asynchronously. No more data - will be received. After all buffered data is flushed, the - protocol's :meth:`connection_lost` method will be called with - :const:`None` as its argument. + A base transport for write-only connections. - .. method:: is_closing() + Instances of the *WriteTransport* class are returned from + the :meth:`loop.connect_write_pipe` event loop method and + are also used by subprocess-related methods like + :meth:`loop.subprocess_exec`. - Return ``True`` if the transport is closing or is closed. +.. class:: ReadTransport(BaseTransport) - .. versionadded:: 3.5.1 + A base transport for read-only connections. - .. method:: get_extra_info(name, default=None) + Instances of the *ReadTransport* class are returned from + the :meth:`loop.connect_read_pipe` event loop method and + are also used by subprocess-related methods like + :meth:`loop.subprocess_exec`. - Return optional transport information. *name* is a string representing - the piece of transport-specific information to get, *default* is the - value to return if the information doesn't exist. +.. class:: Transport(WriteTransport, ReadTransport) - This method allows transport implementations to easily expose - channel-specific information. + Interface representing a bidirectional transport, such as a + TCP connection. - * socket: + The user does not instantiate a transport directly; they call a + utility function, passing it a protocol factory and other + information necessary to create the transport and protocol. - - ``'peername'``: the remote address to which the socket is connected, - result of :meth:`socket.socket.getpeername` (``None`` on error) - - ``'socket'``: :class:`socket.socket` instance - - ``'sockname'``: the socket's own address, - result of :meth:`socket.socket.getsockname` + Instances of the *Transport* class are returned from or used by + event loop methods like :meth:`loop.create_connection`, + :meth:`loop.create_unix_connection`, + :meth:`loop.create_server`, :meth:`loop.sendfile`, etc. 
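
For illustration only (this sketch is not part of the patched
documentation; the protocol class name and the host/port are assumed
placeholders), a *Transport* instance is obtained indirectly by
awaiting an event loop method such as :meth:`loop.create_connection`::

    import asyncio

    class MyProtocol(asyncio.Protocol):
        def connection_made(self, transport):
            # The event loop instantiated the transport and passes it in.
            print('Connected to', transport.get_extra_info('peername'))

    async def main():
        loop = asyncio.get_running_loop()
        # The (transport, protocol) pair is returned once the
        # connection is established.
        transport, protocol = await loop.create_connection(
            lambda: MyProtocol(), '127.0.0.1', 8888)
        transport.close()

    asyncio.run(main())
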
- * SSL socket: - - ``'compression'``: the compression algorithm being used as a string, - or ``None`` if the connection isn't compressed; result of - :meth:`ssl.SSLSocket.compression` - - ``'cipher'``: a three-value tuple containing the name of the cipher - being used, the version of the SSL protocol that defines its use, and - the number of secret bits being used; result of - :meth:`ssl.SSLSocket.cipher` - - ``'peercert'``: peer certificate; result of - :meth:`ssl.SSLSocket.getpeercert` - - ``'sslcontext'``: :class:`ssl.SSLContext` instance - - ``'ssl_object'``: :class:`ssl.SSLObject` or :class:`ssl.SSLSocket` - instance +.. class:: DatagramTransport(BaseTransport) - * pipe: + A transport for datagram (UDP) connections. - - ``'pipe'``: pipe object + Instances of the *DatagramTransport* class are returned from + the :meth:`loop.create_datagram_endpoint` event loop method. - * subprocess: - - ``'subprocess'``: :class:`subprocess.Popen` instance +.. class:: SubprocessTransport(BaseTransport) - .. method:: set_protocol(protocol) + An abstraction to represent a connection between a parent and its + child OS process. - Set a new protocol. Switching protocol should only be done when both - protocols are documented to support the switch. + Instances of the *SubprocessTransport* class are returned from + event loop methods :meth:`loop.subprocess_shell` and + :meth:`loop.subprocess_exec`. - .. versionadded:: 3.5.3 - .. method:: get_protocol +Base Transport +-------------- - Return the current protocol. +.. method:: BaseTransport.close() - .. versionadded:: 3.5.3 + Close the transport. - .. versionchanged:: 3.5.1 - ``'ssl_object'`` info was added to SSL sockets. + If the transport has a buffer for outgoing + data, buffered data will be flushed asynchronously. No more data + will be received. After all buffered data is flushed, the + protocol's :meth:`protocol.connection_lost() + <BaseProtocol.connection_lost>` method will be called with + :const:`None` as its argument. +.. method:: BaseTransport.is_closing() -ReadTransport -------------- + Return ``True`` if the transport is closing or is closed. -.. class:: ReadTransport +.. method:: BaseTransport.get_extra_info(name, default=None) - Interface for read-only transports. + Return information about the transport or underlying resources + it uses. - .. method:: is_reading() + *name* is a string representing the piece of transport-specific + information to get. - Return ``True`` if the transport is receiving new data. + *default* is the value to return if the information is not + available, or if the transport does not support querying it + with the given third-party event loop implementation or on the + current platform. - .. versionadded:: 3.7 + For example, the following code attempts to get the underlying + socket object of the transport:: - .. method:: pause_reading() + sock = transport.get_extra_info('socket') + if sock is not None: + print(sock.getsockopt(...)) - Pause the receiving end of the transport. No data will be passed to - the protocol's :meth:`data_received` method until :meth:`resume_reading` - is called. + Categories of information that can be queried on some transports: - .. versionchanged:: 3.7 - The method is idempotent, i.e. it can be called when the - transport is already paused or closed. + * socket: - .. method:: resume_reading() + - ``'peername'``: the remote address to which the socket is + connected, result of :meth:`socket.socket.getpeername` + (``None`` on error) - Resume the receiving end. 
The protocol's :meth:`data_received` method - will be called once again if some data is available for reading. + - ``'socket'``: :class:`socket.socket` instance - .. versionchanged:: 3.7 - The method is idempotent, i.e. it can be called when the - transport is already reading. + - ``'sockname'``: the socket's own address, + result of :meth:`socket.socket.getsockname` + * SSL socket: -WriteTransport --------------- + - ``'compression'``: the compression algorithm being used as a + string, or ``None`` if the connection isn't compressed; result + of :meth:`ssl.SSLSocket.compression` + + - ``'cipher'``: a three-value tuple containing the name of the + cipher being used, the version of the SSL protocol that defines + its use, and the number of secret bits being used; result of + :meth:`ssl.SSLSocket.cipher` + + - ``'peercert'``: peer certificate; result of + :meth:`ssl.SSLSocket.getpeercert` -.. class:: WriteTransport + - ``'sslcontext'``: :class:`ssl.SSLContext` instance - Interface for write-only transports. + - ``'ssl_object'``: :class:`ssl.SSLObject` or + :class:`ssl.SSLSocket` instance - .. method:: abort() + * pipe: - Close the transport immediately, without waiting for pending operations - to complete. Buffered data will be lost. No more data will be received. - The protocol's :meth:`connection_lost` method will eventually be - called with :const:`None` as its argument. + - ``'pipe'``: pipe object - .. method:: can_write_eof() + * subprocess: - Return :const:`True` if the transport supports :meth:`write_eof`, - :const:`False` if not. + - ``'subprocess'``: :class:`subprocess.Popen` instance - .. method:: get_write_buffer_size() +.. method:: BaseTransport.set_protocol(protocol) - Return the current size of the output buffer used by the transport. + Set a new protocol. - .. method:: get_write_buffer_limits() + Switching protocol should only be done when both + protocols are documented to support the switch. - Get the *high*- and *low*-water limits for write flow control. Return a - tuple ``(low, high)`` where *low* and *high* are positive number of - bytes. +.. method:: BaseTransport.get_protocol() - Use :meth:`set_write_buffer_limits` to set the limits. + Return the current protocol. - .. versionadded:: 3.4.2 - .. method:: set_write_buffer_limits(high=None, low=None) +Read-only Transports +-------------------- - Set the *high*- and *low*-water limits for write flow control. +.. method:: ReadTransport.is_reading() - These two values (measured in number of - bytes) control when the protocol's - :meth:`pause_writing` and :meth:`resume_writing` methods are called. - If specified, the low-water limit must be less than or equal to the - high-water limit. Neither *high* nor *low* can be negative. + Return ``True`` if the transport is receiving new data. - :meth:`pause_writing` is called when the buffer size becomes greater - than or equal to the *high* value. If writing has been paused, - :meth:`resume_writing` is called when the buffer size becomes less - than or equal to the *low* value. + .. versionadded:: 3.7 - The defaults are implementation-specific. If only the - high-water limit is given, the low-water limit defaults to an - implementation-specific value less than or equal to the - high-water limit. Setting *high* to zero forces *low* to zero as - well, and causes :meth:`pause_writing` to be called whenever the - buffer becomes non-empty. Setting *low* to zero causes - :meth:`resume_writing` to be called only once the buffer is empty. 
- Use of zero for either limit is generally sub-optimal as it - reduces opportunities for doing I/O and computation - concurrently. +.. method:: ReadTransport.pause_reading() - Use :meth:`get_write_buffer_limits` to get the limits. + Pause the receiving end of the transport. No data will be passed to + the protocol's :meth:`protocol.data_received() <Protocol.data_received>` + method until :meth:`resume_reading` is called. - .. method:: write(data) + .. versionchanged:: 3.7 + The method is idempotent, i.e. it can be called when the + transport is already paused or closed. - Write some *data* bytes to the transport. +.. method:: ReadTransport.resume_reading() - This method does not block; it buffers the data and arranges for it - to be sent out asynchronously. + Resume the receiving end. The protocol's + :meth:`protocol.data_received() <Protocol.data_received>` method + will be called once again if some data is available for reading. - .. method:: writelines(list_of_data) + .. versionchanged:: 3.7 + The method is idempotent, i.e. it can be called when the + transport is already reading. - Write a list (or any iterable) of data bytes to the transport. - This is functionally equivalent to calling :meth:`write` on each - element yielded by the iterable, but may be implemented more efficiently. - .. method:: write_eof() +Write-only Transports +--------------------- - Close the write end of the transport after flushing buffered data. - Data may still be received. +.. method:: WriteTransport.abort() - This method can raise :exc:`NotImplementedError` if the transport - (e.g. SSL) doesn't support half-closes. + Close the transport immediately, without waiting for pending operations + to complete. Buffered data will be lost. No more data will be received. + The protocol's :meth:`protocol.connection_lost() + <BaseProtocol.connection_lost>` method will eventually be + called with :const:`None` as its argument. +.. method:: WriteTransport.can_write_eof() -DatagramTransport ------------------ + Return :const:`True` if the transport supports + :meth:`~WriteTransport.write_eof`, :const:`False` if not. -.. method:: DatagramTransport.sendto(data, addr=None) +.. method:: WriteTransport.get_write_buffer_size() - Send the *data* bytes to the remote peer given by *addr* (a - transport-dependent target address). If *addr* is :const:`None`, the - data is sent to the target address given on transport creation. + Return the current size of the output buffer used by the transport. + +.. method:: WriteTransport.get_write_buffer_limits() + + Get the *high* and *low* watermarks for write flow control. Return a + tuple ``(low, high)`` where *low* and *high* are positive number of + bytes. + + Use :meth:`set_write_buffer_limits` to set the limits. + + .. versionadded:: 3.4.2 + +.. method:: WriteTransport.set_write_buffer_limits(high=None, low=None) + + Set the *high* and *low* watermarks for write flow control. + + These two values (measured in number of + bytes) control when the protocol's + :meth:`protocol.pause_writing() <BaseProtocol.pause_writing>` + and :meth:`protocol.resume_writing() <BaseProtocol.resume_writing>` + methods are called. If specified, the low watermark must be less + than or equal to the high watermark. Neither *high* nor *low* + can be negative. + + :meth:`~BaseProtocol.pause_writing` is called when the buffer size + becomes greater than or equal to the *high* value. 
If writing has + been paused, :meth:`~BaseProtocol.resume_writing` is called when + the buffer size becomes less than or equal to the *low* value. + + The defaults are implementation-specific. If only the + high watermark is given, the low watermark defaults to an + implementation-specific value less than or equal to the + high watermark. Setting *high* to zero forces *low* to zero as + well, and causes :meth:`~BaseProtocol.pause_writing` to be called + whenever the buffer becomes non-empty. Setting *low* to zero causes + :meth:`~BaseProtocol.resume_writing` to be called only once the + buffer is empty. Use of zero for either limit is generally + sub-optimal as it reduces opportunities for doing I/O and + computation concurrently. + + Use :meth:`~WriteTransport.get_write_buffer_limits` + to get the limits. + +.. method:: WriteTransport.write(data) + + Write some *data* bytes to the transport. This method does not block; it buffers the data and arranges for it to be sent out asynchronously. +.. method:: WriteTransport.writelines(list_of_data) + + Write a list (or any iterable) of data bytes to the transport. + This is functionally equivalent to calling :meth:`write` on each + element yielded by the iterable, but may be implemented more + efficiently. + +.. method:: WriteTransport.write_eof() + + Close the write end of the transport after flushing all buffered data. + Data may still be received. + + This method can raise :exc:`NotImplementedError` if the transport + (e.g. SSL) doesn't support half-closed connections. + + +Datagram Transports +------------------- + +.. method:: DatagramTransport.sendto(data, addr=None) + + Send the *data* bytes to the remote peer given by *addr* (a + transport-dependent target address). If *addr* is :const:`None`, + the data is sent to the target address given on transport + creation. + + This method does not block; it buffers the data and arranges + for it to be sent out asynchronously. + .. method:: DatagramTransport.abort() - Close the transport immediately, without waiting for pending operations - to complete. Buffered data will be lost. No more data will be received. - The protocol's :meth:`connection_lost` method will eventually be - called with :const:`None` as its argument. + Close the transport immediately, without waiting for pending + operations to complete. Buffered data will be lost. + No more data will be received. The protocol's + :meth:`protocol.connection_lost() <BaseProtocol.connection_lost>` + method will eventually be called with :const:`None` as its argument. + + +.. _asyncio-subprocess-transports: + +Subprocess Transports +--------------------- +.. method:: SubprocessTransport.get_pid() -BaseSubprocessTransport ------------------------ + Return the subprocess process id as an integer. -.. class:: BaseSubprocessTransport +.. method:: SubprocessTransport.get_pipe_transport(fd) - .. method:: get_pid() + Return the transport for the communication pipe corresponding to the + integer file descriptor *fd*: - Return the subprocess process id as an integer. + * ``0``: readable streaming transport of the standard input (*stdin*), + or :const:`None` if the subprocess was not created with ``stdin=PIPE`` + * ``1``: writable streaming transport of the standard output (*stdout*), + or :const:`None` if the subprocess was not created with ``stdout=PIPE`` + * ``2``: writable streaming transport of the standard error (*stderr*), + or :const:`None` if the subprocess was not created with ``stderr=PIPE`` + * other *fd*: :const:`None` - .. 
method:: get_pipe_transport(fd) +.. method:: SubprocessTransport.get_returncode() - Return the transport for the communication pipe corresponding to the - integer file descriptor *fd*: + Return the subprocess return code as an integer or :const:`None` + if it hasn't returned, which is similar to the + :attr:`subprocess.Popen.returncode` attribute. - * ``0``: readable streaming transport of the standard input (*stdin*), - or :const:`None` if the subprocess was not created with ``stdin=PIPE`` - * ``1``: writable streaming transport of the standard output (*stdout*), - or :const:`None` if the subprocess was not created with ``stdout=PIPE`` - * ``2``: writable streaming transport of the standard error (*stderr*), - or :const:`None` if the subprocess was not created with ``stderr=PIPE`` - * other *fd*: :const:`None` +.. method:: SubprocessTransport.kill() - .. method:: get_returncode() + Kill the subprocess. - Return the subprocess returncode as an integer or :const:`None` - if it hasn't returned, similarly to the - :attr:`subprocess.Popen.returncode` attribute. + On POSIX systems, the function sends SIGKILL to the subprocess. + On Windows, this method is an alias for :meth:`terminate`. - .. method:: kill() + See also :meth:`subprocess.Popen.kill`. - Kill the subprocess, as in :meth:`subprocess.Popen.kill`. +.. method:: SubprocessTransport.send_signal(signal) - On POSIX systems, the function sends SIGKILL to the subprocess. - On Windows, this method is an alias for :meth:`terminate`. + Send the *signal* number to the subprocess, as in + :meth:`subprocess.Popen.send_signal`. - .. method:: send_signal(signal) +.. method:: SubprocessTransport.terminate() - Send the *signal* number to the subprocess, as in - :meth:`subprocess.Popen.send_signal`. + Stop the subprocess. - .. method:: terminate() + On POSIX systems, this method sends SIGTERM to the subprocess. + On Windows, the Windows API function TerminateProcess() is called to + stop the subprocess. - Ask the subprocess to stop, as in :meth:`subprocess.Popen.terminate`. - This method is an alias for the :meth:`close` method. + See also :meth:`subprocess.Popen.terminate`. - On POSIX systems, this method sends SIGTERM to the subprocess. - On Windows, the Windows API function TerminateProcess() is called to - stop the subprocess. +.. method:: SubprocessTransport.close() - .. method:: close() + Kill the subprocess by calling the :meth:`kill` method. - Ask the subprocess to stop by calling the :meth:`terminate` method if the - subprocess hasn't returned yet, and close transports of all pipes - (*stdin*, *stdout* and *stderr*). + If the subprocess hasn't returned yet, and close transports of + *stdin*, *stdout*, and *stderr* pipes. .. _asyncio-protocol: @@ -308,65 +431,61 @@ BaseSubprocessTransport Protocols ========= -:mod:`asyncio` provides base classes that you can subclass to implement -your network protocols. Those classes are used in conjunction with -:ref:`transports <asyncio-transport>` (see below): the protocol parses incoming -data and asks for the writing of outgoing data, while the transport is -responsible for the actual I/O and buffering. +asyncio provides a set of abstract base classes that should be used +to implement network protocols. Those classes are meant to be used +together with :ref:`transports <asyncio-transport>`. -When subclassing a protocol class, it is recommended you override certain -methods. 
Those methods are callbacks: they will be called by the transport -on certain events (for example when some data is received); you shouldn't -call them yourself, unless you are implementing a transport. +Subclasses of abstract base protocol classes may implement some or +all methods. All these methods are callbacks: they are called by +transports on certain events, for example when some data is received. +A base protocol method should be called by the corresponding transport. -.. note:: - All callbacks have default implementations, which are empty. Therefore, - you only need to implement the callbacks for the events in which you - are interested. +Base Protocols +-------------- + +.. class:: BaseProtocol -Protocol classes ----------------- + Base protocol with methods that all protocols share. -.. class:: Protocol +.. class:: Protocol(BaseProtocol) - The base class for implementing streaming protocols (for use with - e.g. TCP and SSL transports). + The base class for implementing streaming protocols + (TCP, Unix sockets, etc). -.. class:: BufferedProtocol +.. class:: BufferedProtocol(BaseProtocol) A base class for implementing streaming protocols with manual control of the receive buffer. - .. versionadded:: 3.7 - **Important:** this has been added to asyncio in Python 3.7 - *on a provisional basis*! Treat it as an experimental API that - might be changed or removed in Python 3.8. - -.. class:: DatagramProtocol +.. class:: DatagramProtocol(BaseProtocol) - The base class for implementing datagram protocols (for use with - e.g. UDP transports). + The base class for implementing datagram (UDP) protocols. -.. class:: SubprocessProtocol +.. class:: SubprocessProtocol(BaseProtocol) The base class for implementing protocols communicating with child - processes (through a set of unidirectional pipes). + processes (unidirectional pipes). -Connection callbacks --------------------- +Base Protocol +------------- + +All asyncio protocols can implement Base Protocol callbacks. + +.. rubric:: Connection Callbacks -These callbacks may be called on :class:`Protocol`, :class:`DatagramProtocol` -and :class:`SubprocessProtocol` instances: +Connection callbacks are called on all protocols, exactly once per +a successful connection. All other protocol callbacks can only be +called between those two methods. .. method:: BaseProtocol.connection_made(transport) Called when a connection is made. The *transport* argument is the transport representing the - connection. You are responsible for storing it somewhere - (e.g. as an attribute) if you need to. + connection. The protocol is responsible for storing the reference + to its transport. .. method:: BaseProtocol.connection_lost(exc) @@ -376,65 +495,76 @@ and :class:`SubprocessProtocol` instances: The latter means a regular EOF is received, or the connection was aborted or closed by this side of the connection. -:meth:`~BaseProtocol.connection_made` and :meth:`~BaseProtocol.connection_lost` -are called exactly once per successful connection. All other callbacks will be -called between those two methods, which allows for easier resource management -in your protocol implementation. -The following callbacks may be called only on :class:`SubprocessProtocol` -instances: +.. rubric:: Flow Control Callbacks -.. method:: SubprocessProtocol.pipe_data_received(fd, data) +Flow control callbacks can be called by transports to pause or +resume writing performed by the protocol. - Called when the child process writes data into its stdout or stderr pipe. 
- *fd* is the integer file descriptor of the pipe. *data* is a non-empty - bytes object containing the data. +See the documentation of the :meth:`~WriteTransport.set_write_buffer_limits` +method for more details. -.. method:: SubprocessProtocol.pipe_connection_lost(fd, exc) +.. method:: BaseProtocol.pause_writing() - Called when one of the pipes communicating with the child process - is closed. *fd* is the integer file descriptor that was closed. + Called when the transport's buffer goes over the high watermark. -.. method:: SubprocessProtocol.process_exited() +.. method:: BaseProtocol.resume_writing() - Called when the child process has exited. + Called when the transport's buffer drains below the low watermark. + +If the buffer size equals the high watermark, +:meth:`~BaseProtocol.pause_writing` is not called: the buffer size must +go strictly over. + +Conversely, :meth:`~BaseProtocol.resume_writing` is called when the +buffer size is equal or lower than the low watermark. These end +conditions are important to ensure that things go as expected when +either mark is zero. -Streaming protocols +Streaming Protocols ------------------- -The following callbacks are called on :class:`Protocol` instances: +Event methods, such as :meth:`loop.create_server`, +:meth:`loop.create_unix_server`, :meth:`loop.create_connection`, +:meth:`loop.create_unix_connection`, :meth:`loop.connect_accepted_socket`, +:meth:`loop.connect_read_pipe`, and :meth:`loop.connect_write_pipe` +accept factories that return streaming protocols. .. method:: Protocol.data_received(data) - Called when some data is received. *data* is a non-empty bytes object - containing the incoming data. + Called when some data is received. *data* is a non-empty bytes + object containing the incoming data. - .. note:: - Whether the data is buffered, chunked or reassembled depends on - the transport. In general, you shouldn't rely on specific semantics - and instead make your parsing generic and flexible enough. However, - data is always received in the correct order. + Whether the data is buffered, chunked or reassembled depends on + the transport. In general, you shouldn't rely on specific semantics + and instead make your parsing generic and flexible. However, + data is always received in the correct order. + + The method can be called an arbitrary number of times while + a connection is open. + + However, :meth:`protocol.eof_received() <Protocol.eof_received>` + is called at most once. Once `eof_received()` is called, + ``data_received()`` is not called anymore. .. method:: Protocol.eof_received() Called when the other end signals it won't send any more data - (for example by calling :meth:`write_eof`, if the other end also uses + (for example by calling :meth:`transport.write_eof() + <WriteTransport.write_eof>`, if the other end also uses asyncio). This method may return a false value (including ``None``), in which case the transport will close itself. Conversely, if this method returns a - true value, closing the transport is up to the protocol. Since the - default implementation returns ``None``, it implicitly closes the connection. + true value, the protocol used determines whether to close the transport. + Since the default implementation returns ``None``, it implicitly closes the + connection. - .. note:: - Some transports such as SSL don't support half-closed connections, - in which case returning true from this method will not prevent closing - the connection. 
+ Some transports, including SSL, don't support half-closed connections, + in which case returning true from this method will result in the connection + being closed. -:meth:`data_received` can be called an arbitrary number of times during -a connection. However, :meth:`eof_received` is called at most once -and, if called, :meth:`data_received` won't be called after it. State machine: @@ -446,25 +576,23 @@ State machine: -> connection_lost -> end -Streaming protocols with manual receive buffer control ------------------------------------------------------- +Buffered Streaming Protocols +---------------------------- .. versionadded:: 3.7 - **Important:** :class:`BufferedProtocol` has been added to - asyncio in Python 3.7 *on a provisional basis*! Consider it as an - experimental API that might be changed or removed in Python 3.8. - + **Important:** this has been added to asyncio in Python 3.7 + *on a provisional basis*! This is as an experimental API that + might be changed or removed completely in Python 3.8. -Event methods, such as :meth:`AbstractEventLoop.create_server` and -:meth:`AbstractEventLoop.create_connection`, accept factories that -return protocols that implement this interface. +Buffered Protocols can be used with any event loop method +that supports `Streaming Protocols`_. -The idea of BufferedProtocol is that it allows to manually allocate -and control the receive buffer. Event loops can then use the buffer +``BufferedProtocol`` implementations allow explicit manual allocation +and control of the receive buffer. Event loops can then use the buffer provided by the protocol to avoid unnecessary data copies. This can result in noticeable performance improvement for protocols that -receive big amounts of data. Sophisticated protocols implementations -can allocate the buffer only once at creation time. +receive big amounts of data. Sophisticated protocol implementations +can significantly reduce the number of buffer allocations. The following callbacks are called on :class:`BufferedProtocol` instances: @@ -473,12 +601,12 @@ instances: Called to allocate a new receive buffer. - *sizehint* is a recommended minimal size for the returned - buffer. It is acceptable to return smaller or bigger buffers + *sizehint* is the recommended minimum size for the returned + buffer. It is acceptable to return smaller or larger buffers than what *sizehint* suggests. When set to -1, the buffer size - can be arbitrary. It is an error to return a zero-sized buffer. + can be arbitrary. It is an error to return a buffer with a zero size. - Must return an object that implements the + ``get_buffer()`` must return an object implementing the :ref:`buffer protocol <bufferobjects>`. .. method:: BufferedProtocol.buffer_updated(nbytes) @@ -489,13 +617,15 @@ instances: .. method:: BufferedProtocol.eof_received() - See the documentation of the :meth:`Protocol.eof_received` method. + See the documentation of the :meth:`protocol.eof_received() + <Protocol.eof_received>` method. -:meth:`get_buffer` can be called an arbitrary number of times during -a connection. However, :meth:`eof_received` is called at most once -and, if called, :meth:`get_buffer` and :meth:`buffer_updated` -won't be called after it. +:meth:`~BufferedProtocol.get_buffer` can be called an arbitrary number +of times during a connection. However, :meth:`protocol.eof_received() +<Protocol.eof_received>` is called at most once +and, if called, :meth:`~BufferedProtocol.get_buffer` and +:meth:`~BufferedProtocol.buffer_updated` won't be called after it. 
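
As a rough sketch only (not part of the patched documentation; the
class name, the buffer size, and the trivial ``buffer_updated()`` body
are assumptions), a buffered protocol can allocate one receive buffer
up front and hand it to the event loop on every ``get_buffer()`` call::

    import asyncio

    class FixedBufferProtocol(asyncio.BufferedProtocol):
        def __init__(self):
            # Allocate the receive buffer once; the event loop will
            # write incoming data directly into it.
            self._buffer = bytearray(65536)

        def connection_made(self, transport):
            self.transport = transport

        def get_buffer(self, sizehint):
            # Returning the same bytearray avoids per-read allocations.
            return self._buffer

        def buffer_updated(self, nbytes):
            # The first *nbytes* bytes of the buffer contain new data.
            print('received', nbytes, 'bytes')

        def eof_received(self):
            # Returning None (falsy) lets the transport close itself.
            return None

A factory returning such a protocol could then be passed to an event
loop method such as :meth:`loop.create_connection`.
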
State machine:

@@ -509,10 +639,11 @@ State machine:
     -> connection_lost -> end


-Datagram protocols
+Datagram Protocols
 ------------------

-The following callbacks are called on :class:`DatagramProtocol` instances.
+Datagram Protocol instances should be constructed by protocol
+factories passed to the :meth:`loop.create_datagram_endpoint` method.

 .. method:: DatagramProtocol.datagram_received(data, addr)

@@ -526,80 +657,120 @@ The following callbacks are called on :class:`DatagramProtocol` instances.
    :class:`OSError`.  *exc* is the :class:`OSError` instance.

    This method is called in rare conditions, when the transport (e.g. UDP)
-   detects that a datagram couldn't be delivered to its recipient.
+   detects that a datagram could not be delivered to its recipient.
    In many conditions though, undeliverable datagrams will be silently
    dropped.

+.. note::

-Flow control callbacks
-----------------------
+   On BSD systems (macOS, FreeBSD, etc.) flow control is not supported
+   for datagram protocols, because there is no reliable way to detect send
+   failures caused by writing too many packets.

-These callbacks may be called on :class:`Protocol`,
-:class:`DatagramProtocol` and :class:`SubprocessProtocol` instances:
+   The socket always appears 'ready' and excess packets are dropped. An
+   :class:`OSError` with ``errno`` set to :const:`errno.ENOBUFS` may
+   or may not be raised; if it is raised, it will be reported to
+   :meth:`DatagramProtocol.error_received` but otherwise ignored.

-.. method:: BaseProtocol.pause_writing()

-   Called when the transport's buffer goes over the high-water mark.
+.. _asyncio-subprocess-protocols:

-.. method:: BaseProtocol.resume_writing()
+Subprocess Protocols
+--------------------

-   Called when the transport's buffer drains below the low-water mark.
+Subprocess Protocol instances should be constructed by protocol
+factories passed to the :meth:`loop.subprocess_exec` and
+:meth:`loop.subprocess_shell` methods.

+.. method:: SubprocessProtocol.pipe_data_received(fd, data)

-:meth:`pause_writing` and :meth:`resume_writing` calls are paired --
-:meth:`pause_writing` is called once when the buffer goes strictly over
-the high-water mark (even if subsequent writes increases the buffer size
-even more), and eventually :meth:`resume_writing` is called once when the
-buffer size reaches the low-water mark.
+   Called when the child process writes data into its stdout or stderr
+   pipe.

-.. note::
-   If the buffer size equals the high-water mark,
-   :meth:`pause_writing` is not called -- it must go strictly over.
-   Conversely, :meth:`resume_writing` is called when the buffer size is
-   equal or lower than the low-water mark.  These end conditions
-   are important to ensure that things go as expected when either
-   mark is zero.
+   *fd* is the integer file descriptor of the pipe.

-.. note::
-   On BSD systems (OS X, FreeBSD, etc.) flow control is not supported
-   for :class:`DatagramProtocol`, because send failures caused by
-   writing too many packets cannot be detected easily.  The socket
-   always appears 'ready' and excess packets are dropped; an
-   :class:`OSError` with errno set to :const:`errno.ENOBUFS` may or
-   may not be raised; if it is raised, it will be reported to
-   :meth:`DatagramProtocol.error_received` but otherwise ignored.
+   *data* is a non-empty bytes object containing the received data.
+
+.. method:: SubprocessProtocol.pipe_connection_lost(fd, exc)
+
+   Called when one of the pipes communicating with the child process
+   is closed.
+
+   *fd* is the integer file descriptor that was closed.
+
+..
method:: SubprocessProtocol.process_exited() + + Called when the child process has exited. + + +Examples +======== + +.. _asyncio_example_tcp_echo_server_protocol: + +TCP Echo Server +--------------- + +Create a TCP echo server using the :meth:`loop.create_server` method, send back +received data, and close the connection:: + + import asyncio + + + class EchoServerClientProtocol(asyncio.Protocol): + def connection_made(self, transport): + peername = transport.get_extra_info('peername') + print('Connection from {}'.format(peername)) + self.transport = transport + + def data_received(self, data): + message = data.decode() + print('Data received: {!r}'.format(message)) + + print('Send: {!r}'.format(message)) + self.transport.write(data) + + print('Close the client socket') + self.transport.close() + + + async def main(): + # Get a reference to the event loop as we plan to use + # low-level APIs. + loop = asyncio.get_running_loop() + server = await loop.create_server( + lambda: EchoServerClientProtocol(), + '127.0.0.1', 8888) -Coroutines and protocols ------------------------- + async with server: + await server.serve_forever() -Coroutines can be scheduled in a protocol method using :func:`ensure_future`, -but there is no guarantee made about the execution order. Protocols are not -aware of coroutines created in protocol methods and so will not wait for them. -To have a reliable execution order, -use :ref:`stream objects <asyncio-streams>` in a -coroutine with ``await``. For example, the :meth:`StreamWriter.drain` -coroutine can be used to wait until the write buffer is flushed. + asyncio.run(main()) -Protocol examples -================= +.. seealso:: + + The :ref:`TCP echo server using streams <asyncio-tcp-echo-server-streams>` + example uses the high-level :func:`asyncio.start_server` function. -.. _asyncio-tcp-echo-client-protocol: +.. _asyncio_example_tcp_echo_client_protocol: -TCP echo client protocol ------------------------- +TCP Echo Client +--------------- -TCP echo client using the :meth:`AbstractEventLoop.create_connection` method, send -data and wait until the connection is closed:: +A TCP echo client using the :meth:`loop.create_connection` method, sends +data, and waits until the connection is closed:: import asyncio + class EchoClientProtocol(asyncio.Protocol): - def __init__(self, message, loop): + def __init__(self, message, on_con_lost, loop): self.message = message self.loop = loop + self.on_con_lost = on_con_lost def connection_made(self, transport): transport.write(self.message.encode()) @@ -610,99 +781,99 @@ data and wait until the connection is closed:: def connection_lost(self, exc): print('The server closed the connection') - print('Stop the event loop') - self.loop.stop() - - loop = asyncio.get_event_loop() - message = 'Hello World!' - coro = loop.create_connection(lambda: EchoClientProtocol(message, loop), - '127.0.0.1', 8888) - loop.run_until_complete(coro) - loop.run_forever() - loop.close() - -The event loop is running twice. The -:meth:`~AbstractEventLoop.run_until_complete` method is preferred in this short -example to raise an exception if the server is not listening, instead of -having to write a short coroutine to handle the exception and stop the -running loop. At :meth:`~AbstractEventLoop.run_until_complete` exit, the loop is -no longer running, so there is no need to stop the loop in case of an error. + self.on_con_lost.set_result(True) + + + async def main(): + # Get a reference to the event loop as we plan to use + # low-level APIs. 
+ loop = asyncio.get_running_loop() + + on_con_lost = loop.create_future() + message = 'Hello World!' + + transport, protocol = await loop.create_connection( + lambda: EchoClientProtocol(message, on_con_lost, loop), + '127.0.0.1', 8888) + + # Wait until the protocol signals that the connection + # is lost and close the transport. + try: + await on_con_lost + finally: + transport.close() + + + asyncio.run(main()) + .. seealso:: The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>` - example uses the :func:`asyncio.open_connection` function. + example uses the high-level :func:`asyncio.open_connection` function. -.. _asyncio-tcp-echo-server-protocol: +.. _asyncio-udp-echo-server-protocol: -TCP echo server protocol ------------------------- +UDP Echo Server +--------------- -TCP echo server using the :meth:`AbstractEventLoop.create_server` method, send back -received data and close the connection:: +A UDP echo server, using the :meth:`loop.create_datagram_endpoint` +method, sends back received data:: import asyncio - class EchoServerClientProtocol(asyncio.Protocol): + + class EchoServerProtocol: def connection_made(self, transport): - peername = transport.get_extra_info('peername') - print('Connection from {}'.format(peername)) self.transport = transport - def data_received(self, data): + def datagram_received(self, data, addr): message = data.decode() - print('Data received: {!r}'.format(message)) + print('Received %r from %s' % (message, addr)) + print('Send %r to %s' % (message, addr)) + self.transport.sendto(data, addr) - print('Send: {!r}'.format(message)) - self.transport.write(data) - print('Close the client socket') - self.transport.close() - - loop = asyncio.get_event_loop() - # Each client connection will create a new protocol instance - coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888) - server = loop.run_until_complete(coro) + async def main(): + print("Starting UDP server") - # Serve requests until Ctrl+C is pressed - print('Serving on {}'.format(server.sockets[0].getsockname())) - try: - loop.run_forever() - except KeyboardInterrupt: - pass + # Get a reference to the event loop as we plan to use + # low-level APIs. + loop = asyncio.get_running_loop() - # Close the server - server.close() - loop.run_until_complete(server.wait_closed()) - loop.close() + # One protocol instance will be created to serve all + # client requests. + transport, protocol = await loop.create_datagram_endpoint( + lambda: EchoServerProtocol(), + local_addr=('127.0.0.1', 9999)) -:meth:`Transport.close` can be called immediately after -:meth:`WriteTransport.write` even if data are not sent yet on the socket: both -methods are asynchronous. ``await`` is not needed because these transport -methods are not coroutines. + try: + await asyncio.sleep(3600) # Serve for 1 hour. + finally: + transport.close() -.. seealso:: - The :ref:`TCP echo server using streams <asyncio-tcp-echo-server-streams>` - example uses the :func:`asyncio.start_server` function. + asyncio.run(main()) .. 
_asyncio-udp-echo-client-protocol: -UDP echo client protocol ------------------------- +UDP Echo Client +--------------- -UDP echo client using the :meth:`AbstractEventLoop.create_datagram_endpoint` -method, send data and close the transport when we received the answer:: +A UDP echo client, using the :meth:`loop.create_datagram_endpoint` +method, sends data and closes the transport when it receives the answer:: import asyncio + class EchoClientProtocol: def __init__(self, message, loop): self.message = message self.loop = loop self.transport = None + self.on_con_lost = loop.create_future() def connection_made(self, transport): self.transport = transport @@ -719,75 +890,46 @@ method, send data and close the transport when we received the answer:: print('Error received:', exc) def connection_lost(self, exc): - print("Socket closed, stop the event loop") - loop = asyncio.get_event_loop() - loop.stop() + print("Connection closed") + self.on_con_lost.set_result(True) - loop = asyncio.get_event_loop() - message = "Hello World!" - connect = loop.create_datagram_endpoint( - lambda: EchoClientProtocol(message, loop), - remote_addr=('127.0.0.1', 9999)) - transport, protocol = loop.run_until_complete(connect) - loop.run_forever() - transport.close() - loop.close() + async def main(): + # Get a reference to the event loop as we plan to use + # low-level APIs. + loop = asyncio.get_running_loop() -.. _asyncio-udp-echo-server-protocol: - -UDP echo server protocol ------------------------- - -UDP echo server using the :meth:`AbstractEventLoop.create_datagram_endpoint` -method, send back received data:: - - import asyncio - - class EchoServerProtocol: - def connection_made(self, transport): - self.transport = transport - - def datagram_received(self, data, addr): - message = data.decode() - print('Received %r from %s' % (message, addr)) - print('Send %r to %s' % (message, addr)) - self.transport.sendto(data, addr) + message = "Hello World!" + transport, protocol = await loop.create_datagram_endpoint( + lambda: EchoClientProtocol(message, loop), + remote_addr=('127.0.0.1', 9999)) - loop = asyncio.get_event_loop() - print("Starting UDP server") - # One protocol instance will be created to serve all client requests - listen = loop.create_datagram_endpoint( - EchoServerProtocol, local_addr=('127.0.0.1', 9999)) - transport, protocol = loop.run_until_complete(listen) + try: + await protocol.on_con_lost + finally: + transport.close() - try: - loop.run_forever() - except KeyboardInterrupt: - pass - transport.close() - loop.close() + asyncio.run(main()) -.. _asyncio-register-socket: +.. 
_asyncio_example_create_connection: -Register an open socket to wait for data using a protocol ---------------------------------------------------------- +Connecting Existing Sockets +--------------------------- Wait until a socket receives data using the -:meth:`AbstractEventLoop.create_connection` method with a protocol, and then close -the event loop :: +:meth:`loop.create_connection` method with a protocol:: import asyncio - from socket import socketpair + import socket - # Create a pair of connected sockets - rsock, wsock = socketpair() - loop = asyncio.get_event_loop() class MyProtocol(asyncio.Protocol): - transport = None + + def __init__(self, loop): + self.transport = None + self.on_con_lost = loop.create_future() def connection_made(self, transport): self.transport = transport @@ -795,35 +937,105 @@ the event loop :: def data_received(self, data): print("Received:", data.decode()) - # We are done: close the transport (it will call connection_lost()) + # We are done: close the transport; + # connection_lost() will be called automatically. self.transport.close() def connection_lost(self, exc): - # The socket has been closed, stop the event loop - loop.stop() + # The socket has been closed + self.on_con_lost.set_result(True) + + + async def main(): + # Get a reference to the event loop as we plan to use + # low-level APIs. + loop = asyncio.get_running_loop() + + # Create a pair of connected sockets + rsock, wsock = socket.socketpair() - # Register the socket to wait for data - connect_coro = loop.create_connection(MyProtocol, sock=rsock) - transport, protocol = loop.run_until_complete(connect_coro) + # Register the socket to wait for data. + transport, protocol = await loop.create_connection( + lambda: MyProtocol(loop), sock=rsock) - # Simulate the reception of data from the network - loop.call_soon(wsock.send, 'abc'.encode()) + # Simulate the reception of data from the network. + loop.call_soon(wsock.send, 'abc'.encode()) - # Run the event loop - loop.run_forever() + try: + await protocol.on_con_lost + finally: + transport.close() + wsock.close() - # We are done, close sockets and the event loop - rsock.close() - wsock.close() - loop.close() + asyncio.run(main()) .. seealso:: The :ref:`watch a file descriptor for read events - <asyncio-watch-read-event>` example uses the low-level - :meth:`AbstractEventLoop.add_reader` method to register the file descriptor of a - socket. + <asyncio_example_watch_fd>` example uses the low-level + :meth:`loop.add_reader` method to register an FD. The :ref:`register an open socket to wait for data using streams - <asyncio-register-socket-streams>` example uses high-level streams + <asyncio_example_create_connection-streams>` example uses high-level streams created by the :func:`open_connection` function in a coroutine. + +.. _asyncio_example_subprocess_proto: + +loop.subprocess_exec() and SubprocessProtocol +--------------------------------------------- + +An example of a subprocess protocol used to get the output of a +subprocess and to wait for the subprocess exit. + +The subprocess is created by th :meth:`loop.subprocess_exec` method:: + + import asyncio + import sys + + class DateProtocol(asyncio.SubprocessProtocol): + def __init__(self, exit_future): + self.exit_future = exit_future + self.output = bytearray() + + def pipe_data_received(self, fd, data): + self.output.extend(data) + + def process_exited(self): + self.exit_future.set_result(True) + + async def get_date(): + # Get a reference to the event loop as we plan to use + # low-level APIs. 
+ loop = asyncio.get_running_loop() + + code = 'import datetime; print(datetime.datetime.now())' + exit_future = asyncio.Future(loop=loop) + + # Create the subprocess controlled by DateProtocol; + # redirect the standard output into a pipe. + transport, protocol = await loop.subprocess_exec( + lambda: DateProtocol(exit_future), + sys.executable, '-c', code, + stdin=None, stderr=None) + + # Wait for the subprocess exit using the process_exited() + # method of the protocol. + await exit_future + + # Close the stdout pipe. + transport.close() + + # Read the output which was collected by the + # pipe_data_received() method of the protocol. + data = bytes(protocol.output) + return data.decode('ascii').rstrip() + + if sys.platform == "win32": + asyncio.set_event_loop_policy( + asyncio.WindowsProactorEventLoopPolicy()) + + date = asyncio.run(get_date()) + print(f"Current date: {date}") + +See also the :ref:`same example <asyncio_example_create_subprocess_exec>` +written using high-level APIs. diff --git a/Doc/library/asyncio-queue.rst b/Doc/library/asyncio-queue.rst index 65497f29d8..bd0e70c0d9 100644 --- a/Doc/library/asyncio-queue.rst +++ b/Doc/library/asyncio-queue.rst @@ -1,41 +1,42 @@ .. currentmodule:: asyncio +.. _asyncio-queues: + +====== Queues ====== -**Source code:** :source:`Lib/asyncio/queues.py` - -Queues: +asyncio queues are designed to be similar to classes of the +:mod:`queue` module. Although asyncio queues are not thread-safe, +they are designed to be used specifically in async/await code. -* :class:`Queue` -* :class:`PriorityQueue` -* :class:`LifoQueue` +Note that methods of asyncio queues don't have a *timeout* parameter; +use :func:`asyncio.wait_for` function to do queue operations with a +timeout. -asyncio queue API was designed to be close to classes of the :mod:`queue` -module (:class:`~queue.Queue`, :class:`~queue.PriorityQueue`, -:class:`~queue.LifoQueue`), but it has no *timeout* parameter. The -:func:`asyncio.wait_for` function can be used to cancel a task after a timeout. +See also the `Examples`_ section below. Queue ------ +===== .. class:: Queue(maxsize=0, \*, loop=None) - A queue, useful for coordinating producer and consumer coroutines. + A first in, first out (FIFO) queue. - If *maxsize* is less than or equal to zero, the queue size is infinite. If - it is an integer greater than ``0``, then ``await put()`` will block - when the queue reaches *maxsize*, until an item is removed by :meth:`get`. + If *maxsize* is less than or equal to zero, the queue size is + infinite. If it is an integer greater than ``0``, then + ``await put()`` blocks when the queue reaches *maxsize* + until an item is removed by :meth:`get`. - Unlike the standard library :mod:`queue`, you can reliably know this Queue's - size with :meth:`qsize`, since your single-threaded asyncio application won't - be interrupted between calling :meth:`qsize` and doing an operation on the - Queue. + Unlike the standard library threading :mod:`queue`, the size of + the queue is always known and can be returned by calling the + :meth:`qsize` method. This class is :ref:`not thread safe <asyncio-multithreading>`. - .. versionchanged:: 3.4.4 - New :meth:`join` and :meth:`task_done` methods. + .. attribute:: maxsize + + Number of items allowed in the queue. .. method:: empty() @@ -45,53 +46,33 @@ Queue Return ``True`` if there are :attr:`maxsize` items in the queue. - .. note:: - - If the Queue was initialized with ``maxsize=0`` (the default), then - :meth:`full()` is never ``True``. 
+ If the queue was initialized with ``maxsize=0`` (the default), + then :meth:`full()` never returns ``True``. .. coroutinemethod:: get() - Remove and return an item from the queue. If queue is empty, wait until - an item is available. - - This method is a :ref:`coroutine <coroutine>`. - - .. seealso:: - - The :meth:`empty` method. + Remove and return an item from the queue. If queue is empty, + wait until an item is available. .. method:: get_nowait() - Remove and return an item from the queue. - Return an item if one is immediately available, else raise :exc:`QueueEmpty`. .. coroutinemethod:: join() - Block until all items in the queue have been gotten and processed. - - The count of unfinished tasks goes up whenever an item is added to the - queue. The count goes down whenever a consumer thread calls - :meth:`task_done` to indicate that the item was retrieved and all work on - it is complete. When the count of unfinished tasks drops to zero, - :meth:`join` unblocks. - - This method is a :ref:`coroutine <coroutine>`. + Block until all items in the queue have been received and processed. - .. versionadded:: 3.4.4 + The count of unfinished tasks goes up whenever an item is added + to the queue. The count goes down whenever a consumer thread calls + :meth:`task_done` to indicate that the item was retrieved and all + work on it is complete. When the count of unfinished tasks drops + to zero, :meth:`join` unblocks. .. coroutinemethod:: put(item) - Put an item into the queue. If the queue is full, wait until a free slot - is available before adding item. - - This method is a :ref:`coroutine <coroutine>`. - - .. seealso:: - - The :meth:`full` method. + Put an item into the queue. If the queue is full, wait until a + free slot is available before adding the item. .. method:: put_nowait(item) @@ -101,60 +82,119 @@ Queue .. method:: qsize() - Number of items in the queue. + Return the number of items in the queue. .. method:: task_done() Indicate that a formerly enqueued task is complete. - Used by queue consumers. For each :meth:`~Queue.get` used to fetch a task, a - subsequent call to :meth:`task_done` tells the queue that the processing - on the task is complete. - - If a :meth:`join` is currently blocking, it will resume when all items - have been processed (meaning that a :meth:`task_done` call was received - for every item that had been :meth:`~Queue.put` into the queue). - - Raises :exc:`ValueError` if called more times than there were items - placed in the queue. + Used by queue consumers. For each :meth:`~Queue.get` used to + fetch a task, a subsequent call to :meth:`task_done` tells the + queue that the processing on the task is complete. - .. versionadded:: 3.4.4 - - .. attribute:: maxsize + If a :meth:`join` is currently blocking, it will resume when all + items have been processed (meaning that a :meth:`task_done` + call was received for every item that had been :meth:`~Queue.put` + into the queue). - Number of items allowed in the queue. + Raises :exc:`ValueError` if called more times than there were + items placed in the queue. -PriorityQueue -------------- +Priority Queue +============== .. class:: PriorityQueue - A subclass of :class:`Queue`; retrieves entries in priority order (lowest - first). + A variant of :class:`Queue`; retrieves entries in priority order + (lowest first). - Entries are typically tuples of the form: (priority number, data). + Entries are typically tuples of the form + ``(priority_number, data)``. -LifoQueue ---------- +LIFO Queue +========== .. 
class:: LifoQueue - A subclass of :class:`Queue` that retrieves most recently added entries - first. + A variant of :class:`Queue` that retrieves most recently added + entries first (last in, first out). Exceptions -^^^^^^^^^^ +========== .. exception:: QueueEmpty - Exception raised when the :meth:`~Queue.get_nowait` method is called on a - :class:`Queue` object which is empty. + This exception is raised when the :meth:`~Queue.get_nowait` method + is called on an empty queue. .. exception:: QueueFull - Exception raised when the :meth:`~Queue.put_nowait` method is called on a - :class:`Queue` object which is full. + Exception raised when the :meth:`~Queue.put_nowait` method is called + on a queue that has reached its *maxsize*. + + +Examples +======== + +.. _asyncio_example_queue_dist: + +Queues can be used to distribute workload between several +concurrent tasks:: + + import asyncio + import random + import time + + + async def worker(name, queue): + while True: + # Get a "work item" out of the queue. + sleep_for = await queue.get() + + # Sleep for the "sleep_for" seconds. + await asyncio.sleep(sleep_for) + + # Notify the queue that the "work item" has been processed. + queue.task_done() + + print(f'{name} has slept for {sleep_for:.2f} seconds') + + + async def main(): + # Create a queue that we will use to store our "workload". + queue = asyncio.Queue() + + # Generate random timings and put them into the queue. + total_sleep_time = 0 + for _ in range(20): + sleep_for = random.uniform(0.05, 1.0) + total_sleep_time += sleep_for + queue.put_nowait(sleep_for) + + # Create three worker tasks to process the queue concurrently. + tasks = [] + for i in range(3): + task = asyncio.create_task(worker(f'worker-{i}', queue)) + tasks.append(task) + + # Wait until the queue is fully processed. + started_at = time.monotonic() + await queue.join() + total_slept_for = time.monotonic() - started_at + + # Cancel our worker tasks. + for task in tasks: + task.cancel() + # Wait until all worker tasks are cancelled. + await asyncio.gather(*tasks, return_exceptions=True) + + print('====') + print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds') + print(f'total expected sleep time: {total_sleep_time:.2f} seconds') + + + asyncio.run(main()) diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst index ca7daabdad..c543aa6d41 100644 --- a/Doc/library/asyncio-stream.rst +++ b/Doc/library/asyncio-stream.rst @@ -2,85 +2,120 @@ .. _asyncio-streams: -+++++++++++++++++++++++++++++ -Streams (coroutine based API) -+++++++++++++++++++++++++++++ +======= +Streams +======= -**Source code:** :source:`Lib/asyncio/streams.py` +Streams are high-level async/await-ready primitives to work with +network connections. Streams allow sending and receiving data without +using callbacks or low-level protocols and transports. -Stream functions -================ +.. _asyncio_example_stream: -.. note:: +Here is an example of a TCP echo client written using asyncio +streams:: - The top-level functions in this module are meant as convenience wrappers - only; there's really nothing special there, and if they don't do - exactly what you want, feel free to copy their code. + import asyncio + + async def tcp_echo_client(message): + reader, writer = await asyncio.open_connection( + '127.0.0.1', 8888) + print(f'Send: {message!r}') + writer.write(message.encode()) -.. 
coroutinefunction:: open_connection(host=None, port=None, \*, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None) + data = await reader.read(100) + print(f'Received: {data.decode()!r}') - A wrapper for :meth:`~AbstractEventLoop.create_connection()` returning a (reader, - writer) pair. + print('Close the connection') + writer.close() + await writer.wait_closed() - The reader returned is a :class:`StreamReader` instance; the writer is - a :class:`StreamWriter` instance. + asyncio.run(tcp_echo_client('Hello World!')) - When specified, the *loop* argument determines which event loop to use, - and the *limit* argument determines the buffer size limit used by the - returned :class:`StreamReader` instance. - The rest of the arguments are passed directly to - :meth:`AbstractEventLoop.create_connection`. +See also the `Examples`_ section below. + + +.. rubric:: Stream Functions + +The following top-level asyncio functions can be used to create +and work with streams: + + +.. coroutinefunction:: open_connection(host=None, port=None, \*, \ + loop=None, limit=None, ssl=None, family=0, \ + proto=0, flags=0, sock=None, local_addr=None, \ + server_hostname=None, ssl_handshake_timeout=None) + + Establish a network connection and return a pair of + ``(reader, writer)`` objects. - This function is a :ref:`coroutine <coroutine>`. + The returned *reader* and *writer* objects are instances of + :class:`StreamReader` and :class:`StreamWriter` classes. + + The *loop* argument is optional and can always be determined + automatically when this function is awaited from a coroutine. + + *limit* determines the buffer size limit used by the + returned :class:`StreamReader` instance. By default the *limit* + is set to 64 KiB. + + The rest of the arguments are passed directly to + :meth:`loop.create_connection`. .. versionadded:: 3.7 The *ssl_handshake_timeout* parameter. -.. coroutinefunction:: start_server(client_connected_cb, host=None, port=None, \*, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True) +.. coroutinefunction:: start_server(client_connected_cb, host=None, \ + port=None, \*, loop=None, limit=None, \ + family=socket.AF_UNSPEC, \ + flags=socket.AI_PASSIVE, sock=None, \ + backlog=100, ssl=None, reuse_address=None, \ + reuse_port=None, ssl_handshake_timeout=None, \ + start_serving=True) - Start a socket server, with a callback for each client connected. The return - value is the same as :meth:`~AbstractEventLoop.create_server()`. + Start a socket server. The *client_connected_cb* callback is called whenever a new client - connection is established. It receives a reader/writer pair as two - arguments, the first is a :class:`StreamReader` instance, - and the second is a :class:`StreamWriter` instance. + connection is established. It receives a ``(reader, writer)`` pair + as two arguments, instances of the :class:`StreamReader` and + :class:`StreamWriter` classes. - *client_connected_cb* accepts a plain callable or a + *client_connected_cb* can be a plain callable or a :ref:`coroutine function <coroutine>`; if it is a coroutine function, - it will be automatically converted into a :class:`Task`. + it will be automatically scheduled as a :class:`Task`. 
- When specified, the *loop* argument determines which event loop to use, - and the *limit* argument determines the buffer size limit used by the - :class:`StreamReader` instance passed to *client_connected_cb*. + The *loop* argument is optional and can always be determined + automatically when this method is awaited from a coroutine. - The rest of the arguments are passed directly to - :meth:`~AbstractEventLoop.create_server()`. + *limit* determines the buffer size limit used by the + returned :class:`StreamReader` instance. By default the *limit* + is set to 64 KiB. - This function is a :ref:`coroutine <coroutine>`. + The rest of the arguments are passed directly to + :meth:`loop.create_server`. .. versionadded:: 3.7 The *ssl_handshake_timeout* and *start_serving* parameters. -.. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None) - A wrapper for :meth:`~AbstractEventLoop.create_unix_connection()` returning - a (reader, writer) pair. +.. rubric:: Unix Sockets - When specified, the *loop* argument determines which event loop to use, - and the *limit* argument determines the buffer size limit used by the - returned :class:`StreamReader` instance. +.. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, \ + limit=None, ssl=None, sock=None, \ + server_hostname=None, ssl_handshake_timeout=None) - The rest of the arguments are passed directly to - :meth:`~AbstractEventLoop.create_unix_connection()`. + Establish a Unix socket connection and return a pair of + ``(reader, writer)``. + + Similar to :func:`open_connection` but operates on Unix sockets. - This function is a :ref:`coroutine <coroutine>`. + See also the documentation of :meth:`loop.create_unix_connection`. - Availability: UNIX. + Availability: Unix. .. versionadded:: 3.7 @@ -90,29 +125,19 @@ Stream functions The *path* parameter can now be a :term:`path-like object` -.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \*, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True) - Start a UNIX Domain Socket server, with a callback for each client connected. +.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \ + \*, loop=None, limit=None, sock=None, \ + backlog=100, ssl=None, ssl_handshake_timeout=None, \ + start_serving=True) - The *client_connected_cb* callback is called whenever a new client - connection is established. It receives a reader/writer pair as two - arguments, the first is a :class:`StreamReader` instance, - and the second is a :class:`StreamWriter` instance. + Start a Unix socket server. - *client_connected_cb* accepts a plain callable or a - :ref:`coroutine function <coroutine>`; if it is a coroutine function, - it will be automatically converted into a :class:`Task`. + Similar to :func:`start_server` but works with Unix sockets. - When specified, the *loop* argument determines which event loop to use, - and the *limit* argument determines the buffer size limit used by the - :class:`StreamReader` instance passed to *client_connected_cb*. + See also the documentation of :meth:`loop.create_unix_server`. - The rest of the arguments are passed directly to - :meth:`~AbstractEventLoop.create_unix_server()`. - - This function is a :ref:`coroutine <coroutine>`. - - Availability: UNIX. + Availability: Unix. .. versionadded:: 3.7 @@ -123,229 +148,156 @@ Stream functions The *path* parameter can now be a :term:`path-like object`. 
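As an illustration, the Unix-socket helpers are used exactly like their TCP
counterparts. The sketch below is a minimal echo server; the socket path
``/tmp/echo.sock`` is only an example, and the code runs on Unix only::

    import asyncio


    async def handle_echo(reader, writer):
        # Echo one chunk of data back to the client, then disconnect.
        data = await reader.read(100)
        writer.write(data)
        await writer.drain()
        writer.close()


    async def main():
        server = await asyncio.start_unix_server(
            handle_echo, path='/tmp/echo.sock')
        async with server:
            await server.serve_forever()


    asyncio.run(main())

A client could connect with
``reader, writer = await asyncio.open_unix_connection(path='/tmp/echo.sock')``
and then use the usual :class:`StreamReader` / :class:`StreamWriter` APIs.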
-StreamReader -============ - -.. class:: StreamReader(limit=_DEFAULT_LIMIT, loop=None) - - This class is :ref:`not thread safe <asyncio-multithreading>`. - - The *limit* argument's default value is set to _DEFAULT_LIMIT which is 2**16 (64 KiB) - - .. method:: exception() - - Get the exception. - - .. method:: feed_eof() - - Acknowledge the EOF. +--------- - .. method:: feed_data(data) - Feed *data* bytes in the internal buffer. Any operations waiting - for the data will be resumed. - - .. method:: set_exception(exc) +StreamReader +============ - Set the exception. +.. class:: StreamReader - .. method:: set_transport(transport) + Represents a reader object that provides APIs to read data + from the IO stream. - Set the transport. + It is not recommended to instantiate *StreamReader* objects + directly; use :func:`open_connection` and :func:`start_server` + instead. .. coroutinemethod:: read(n=-1) Read up to *n* bytes. If *n* is not provided, or set to ``-1``, read until EOF and return all read bytes. - If the EOF was received and the internal buffer is empty, + If EOF was received and the internal buffer is empty, return an empty ``bytes`` object. - This method is a :ref:`coroutine <coroutine>`. - .. coroutinemethod:: readline() - Read one line, where "line" is a sequence of bytes ending with ``\n``. + Read one line, where "line" is a sequence of bytes + ending with ``\n``. - If EOF is received, and ``\n`` was not found, the method will - return the partial read bytes. + If EOF is received and ``\n`` was not found, the method + returns partially read data. - If the EOF was received and the internal buffer is empty, + If EOF is received and the internal buffer is empty, return an empty ``bytes`` object. - This method is a :ref:`coroutine <coroutine>`. - .. coroutinemethod:: readexactly(n) - Read exactly *n* bytes. Raise an :exc:`IncompleteReadError` if the end of - the stream is reached before *n* can be read, the - :attr:`IncompleteReadError.partial` attribute of the exception contains - the partial read bytes. + Read exactly *n* bytes. - This method is a :ref:`coroutine <coroutine>`. + Raise an :exc:`IncompleteReadError` if EOF is reached before *n* + can be read. Use the :attr:`IncompleteReadError.partial` + attribute to get the partially read data. .. coroutinemethod:: readuntil(separator=b'\\n') - Read data from the stream until ``separator`` is found. + Read data from the stream until *separator* is found. On success, the data and separator will be removed from the internal buffer (consumed). Returned data will include the separator at the end. - Configured stream limit is used to check result. Limit sets the - maximal length of data that can be returned, not counting the - separator. - - If an EOF occurs and the complete separator is still not found, - an :exc:`IncompleteReadError` exception will be - raised, and the internal buffer will be reset. The - :attr:`IncompleteReadError.partial` attribute may contain the - separator partially. + If the amount of data read exceeds the configured stream limit, a + :exc:`LimitOverrunError` exception is raised, and the data + is left in the internal buffer and can be read again. - If the data cannot be read because of over limit, a - :exc:`LimitOverrunError` exception will be raised, and the data - will be left in the internal buffer, so it can be read again. + If EOF is reached before the complete separator is found, + an :exc:`IncompleteReadError` exception is raised, and the internal + buffer is reset. 
The :attr:`IncompleteReadError.partial` attribute + may contain a portion of the separator. .. versionadded:: 3.5.2 .. method:: at_eof() - Return ``True`` if the buffer is empty and :meth:`feed_eof` was called. + Return ``True`` if the buffer is empty and :meth:`feed_eof` + was called. StreamWriter ============ -.. class:: StreamWriter(transport, protocol, reader, loop) +.. class:: StreamWriter - Wraps a Transport. + Represents a writer object that provides APIs to write data + to the IO stream. - This exposes :meth:`write`, :meth:`writelines`, :meth:`can_write_eof()`, - :meth:`write_eof`, :meth:`get_extra_info` and :meth:`close`. It adds - :meth:`drain` which returns an optional :class:`Future` on which you can - wait for flow control. It also adds a transport attribute which references - the :class:`Transport` directly. - - This class is :ref:`not thread safe <asyncio-multithreading>`. - - .. attribute:: transport - - Transport. + It is not recommended to instantiate *StreamWriter* objects + directly; use :func:`open_connection` and :func:`start_server` + instead. .. method:: can_write_eof() - Return :const:`True` if the transport supports :meth:`write_eof`, - :const:`False` if not. See :meth:`WriteTransport.can_write_eof`. - - .. method:: close() - - Close the transport: see :meth:`BaseTransport.close`. - - .. method:: is_closing() - - Return ``True`` if the writer is closing or is closed. - - .. versionadded:: 3.7 - - .. coroutinemethod:: wait_closed() + Return *True* if the underlying transport supports + the :meth:`write_eof` method, *False* otherwise. - Wait until the writer is closed. - - Should be called after :meth:`close` to wait until the underlying - connection (and the associated transport/protocol pair) is closed. - - .. versionadded:: 3.7 - - .. coroutinemethod:: drain() - - Let the write buffer of the underlying transport a chance to be flushed. - - The intended use is to write:: - - w.write(data) - await w.drain() + .. method:: write_eof() - When the size of the transport buffer reaches the high-water limit (the - protocol is paused), block until the size of the buffer is drained down - to the low-water limit and the protocol is resumed. When there is nothing - to wait for, the yield-from continues immediately. + Close the write end of the stream after the buffered write + data is flushed. - Yielding from :meth:`drain` gives the opportunity for the loop to - schedule the write operation and flush the buffer. It should especially - be used when a possibly large amount of data is written to the transport, - and the coroutine does not yield-from between calls to :meth:`write`. + .. attribute:: transport - This method is a :ref:`coroutine <coroutine>`. + Return the underlying asyncio transport. .. method:: get_extra_info(name, default=None) - Return optional transport information: see - :meth:`BaseTransport.get_extra_info`. + Access optional transport information; see + :meth:`BaseTransport.get_extra_info` for details. .. method:: write(data) - Write some *data* bytes to the transport: see - :meth:`WriteTransport.write`. - - .. method:: writelines(data) + Write *data* to the stream. - Write a list (or any iterable) of data bytes to the transport: - see :meth:`WriteTransport.writelines`. - - .. method:: write_eof() + This method is not subject to flow control. Calls to ``write()`` should + be followed by :meth:`drain`. - Close the write end of the transport after flushing buffered data: - see :meth:`WriteTransport.write_eof`. 
- - -StreamReaderProtocol -==================== - -.. class:: StreamReaderProtocol(stream_reader, client_connected_cb=None, loop=None) - - Trivial helper class to adapt between :class:`Protocol` and - :class:`StreamReader`. Subclass of :class:`Protocol`. - - *stream_reader* is a :class:`StreamReader` instance, *client_connected_cb* - is an optional function called with (stream_reader, stream_writer) when a - connection is made, *loop* is the event loop instance to use. + .. method:: writelines(data) - (This is a helper class instead of making :class:`StreamReader` itself a - :class:`Protocol` subclass, because the :class:`StreamReader` has other - potential uses, and to prevent the user of the :class:`StreamReader` from - accidentally calling inappropriate methods of the protocol.) + Write a list (or any iterable) of bytes to the stream. + This method is not subject to flow control. Calls to ``writelines()`` + should be followed by :meth:`drain`. -IncompleteReadError -=================== + .. coroutinemethod:: drain() -.. exception:: IncompleteReadError + Wait until it is appropriate to resume writing to the stream. + Example:: - Incomplete read error, subclass of :exc:`EOFError`. + writer.write(data) + await writer.drain() - .. attribute:: expected + This is a flow control method that interacts with the underlying + IO write buffer. When the size of the buffer reaches + the high watermark, *drain()* blocks until the size of the + buffer is drained down to the low watermark and writing can + be resumed. When there is nothing to wait for, the :meth:`drain` + returns immediately. - Total number of expected bytes (:class:`int`). + .. method:: close() - .. attribute:: partial + Close the stream. - Read bytes string before the end of stream was reached (:class:`bytes`). + .. method:: is_closing() + Return ``True`` if the stream is closed or in the process of + being closed. -LimitOverrunError -================= + .. versionadded:: 3.7 -.. exception:: LimitOverrunError + .. coroutinemethod:: wait_closed() - Reached the buffer limit while looking for a separator. + Wait until the stream is closed. - .. attribute:: consumed + Should be called after :meth:`close` to wait until the underlying + connection is closed. - Total number of to be consumed bytes. + .. versionadded:: 3.7 -Stream examples -=============== +Examples +======== .. _asyncio-tcp-echo-client-streams: @@ -356,28 +308,26 @@ TCP echo client using the :func:`asyncio.open_connection` function:: import asyncio - async def tcp_echo_client(message, loop): - reader, writer = await asyncio.open_connection('127.0.0.1', 8888, - loop=loop) + async def tcp_echo_client(message): + reader, writer = await asyncio.open_connection( + '127.0.0.1', 8888) - print('Send: %r' % message) + print(f'Send: {message!r}') writer.write(message.encode()) data = await reader.read(100) - print('Received: %r' % data.decode()) + print(f'Received: {data.decode()!r}') - print('Close the socket') + print('Close the connection') writer.close() - message = 'Hello World!' - loop = asyncio.get_event_loop() - loop.run_until_complete(tcp_echo_client(message, loop)) - loop.close() + asyncio.run(tcp_echo_client('Hello World!')) + .. seealso:: - The :ref:`TCP echo client protocol <asyncio-tcp-echo-client-protocol>` - example uses the :meth:`AbstractEventLoop.create_connection` method. + The :ref:`TCP echo client protocol <asyncio_example_tcp_echo_client_protocol>` + example uses the low-level :meth:`loop.create_connection` method. .. 
_asyncio-tcp-echo-server-streams: @@ -393,35 +343,33 @@ TCP echo server using the :func:`asyncio.start_server` function:: data = await reader.read(100) message = data.decode() addr = writer.get_extra_info('peername') - print("Received %r from %r" % (message, addr)) - print("Send: %r" % message) + print(f"Received {message!r} from {addr!r}") + + print(f"Send: {message!r}") writer.write(data) await writer.drain() - print("Close the client socket") + print("Close the connection") writer.close() - loop = asyncio.get_event_loop() - coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop) - server = loop.run_until_complete(coro) + async def main(): + server = await asyncio.start_server( + handle_echo, '127.0.0.1', 8888) - # Serve requests until Ctrl+C is pressed - print('Serving on {}'.format(server.sockets[0].getsockname())) - try: - loop.run_forever() - except KeyboardInterrupt: - pass + addr = server.sockets[0].getsockname() + print(f'Serving on {addr}') + + async with server: + await server.serve_forever() + + asyncio.run(main()) - # Close the server - server.close() - loop.run_until_complete(server.wait_closed()) - loop.close() .. seealso:: - The :ref:`TCP echo server protocol <asyncio-tcp-echo-server-protocol>` - example uses the :meth:`AbstractEventLoop.create_server` method. + The :ref:`TCP echo server protocol <asyncio_example_tcp_echo_server_protocol>` + example uses the :meth:`loop.create_server` method. Get HTTP headers @@ -436,30 +384,34 @@ Simple example querying HTTP headers of the URL passed on the command line:: async def print_http_headers(url): url = urllib.parse.urlsplit(url) if url.scheme == 'https': - connect = asyncio.open_connection(url.hostname, 443, ssl=True) + reader, writer = await asyncio.open_connection( + url.hostname, 443, ssl=True) else: - connect = asyncio.open_connection(url.hostname, 80) - reader, writer = await connect - query = ('HEAD {path} HTTP/1.0\r\n' - 'Host: {hostname}\r\n' - '\r\n').format(path=url.path or '/', hostname=url.hostname) + reader, writer = await asyncio.open_connection( + url.hostname, 80) + + query = ( + f"HEAD {url.path or '/'} HTTP/1.0\r\n" + f"Host: {url.hostname}\r\n" + f"\r\n" + ) + writer.write(query.encode('latin-1')) while True: line = await reader.readline() if not line: break + line = line.decode('latin1').rstrip() if line: - print('HTTP header> %s' % line) + print(f'HTTP header> {line}') # Ignore the body, close the socket writer.close() url = sys.argv[1] - loop = asyncio.get_event_loop() - task = asyncio.ensure_future(print_http_headers(url)) - loop.run_until_complete(task) - loop.close() + asyncio.run(print_http_headers(url)) + Usage:: @@ -469,7 +421,8 @@ or with HTTPS:: python example.py https://example.com/path/page.html -.. _asyncio-register-socket-streams: + +.. _asyncio_example_create_connection-streams: Register an open socket to wait for data using streams ------------------------------------------------------ @@ -478,14 +431,18 @@ Coroutine waiting until a socket receives data using the :func:`open_connection` function:: import asyncio - from socket import socketpair + import socket + + async def wait_for_data(): + # Get a reference to the current event loop because + # we want to access low-level APIs. + loop = asyncio.get_running_loop() - async def wait_for_data(loop): - # Create a pair of connected sockets - rsock, wsock = socketpair() + # Create a pair of connected sockets. 
+ rsock, wsock = socket.socketpair() - # Register the open socket to wait for data - reader, writer = await asyncio.open_connection(sock=rsock, loop=loop) + # Register the open socket to wait for data. + reader, writer = await asyncio.open_connection(sock=rsock) # Simulate the reception of data from the network loop.call_soon(wsock.send, 'abc'.encode()) @@ -500,17 +457,14 @@ Coroutine waiting until a socket receives data using the # Close the second socket wsock.close() - loop = asyncio.get_event_loop() - loop.run_until_complete(wait_for_data(loop)) - loop.close() + asyncio.run(wait_for_data()) .. seealso:: The :ref:`register an open socket to wait for data using a protocol - <asyncio-register-socket>` example uses a low-level protocol created by the - :meth:`AbstractEventLoop.create_connection` method. + <asyncio_example_create_connection>` example uses a low-level protocol and + the :meth:`loop.create_connection` method. The :ref:`watch a file descriptor for read events - <asyncio-watch-read-event>` example uses the low-level - :meth:`AbstractEventLoop.add_reader` method to register the file descriptor of a - socket. + <asyncio_example_watch_fd>` example uses the low-level + :meth:`loop.add_reader` method to watch a file descriptor. diff --git a/Doc/library/asyncio-subprocess.rst b/Doc/library/asyncio-subprocess.rst index 60e174574b..0bcf66175c 100644 --- a/Doc/library/asyncio-subprocess.rst +++ b/Doc/library/asyncio-subprocess.rst @@ -2,245 +2,220 @@ .. _asyncio-subprocess: -Subprocess -========== +============ +Subprocesses +============ -**Source code:** :source:`Lib/asyncio/subprocess.py` +This section describes high-level async/await asyncio APIs to +create and manage subprocesses. -Windows event loop ------------------- +.. _asyncio_example_subprocess_shell: -On Windows, the default event loop is :class:`SelectorEventLoop` which does not -support subprocesses. :class:`ProactorEventLoop` should be used instead. -Example to use it on Windows:: +Here's an example of how asyncio can run a shell command and +obtain its result:: - import asyncio, sys - - if sys.platform == 'win32': - loop = asyncio.ProactorEventLoop() - asyncio.set_event_loop(loop) - -.. seealso:: - - :ref:`Available event loops <asyncio-event-loops>` and :ref:`Platform - support <asyncio-platform-support>`. - - -Create a subprocess: high-level API using Process -------------------------------------------------- - -.. coroutinefunction:: create_subprocess_exec(\*args, stdin=None, stdout=None, stderr=None, loop=None, limit=None, \*\*kwds) - - Create a subprocess. - - The *limit* parameter sets the buffer limit passed to the - :class:`StreamReader`. See :meth:`AbstractEventLoop.subprocess_exec` for other - parameters. - - Return a :class:`~asyncio.subprocess.Process` instance. - - This function is a :ref:`coroutine <coroutine>`. - -.. coroutinefunction:: create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, loop=None, limit=None, \*\*kwds) - - Run the shell command *cmd*. - - The *limit* parameter sets the buffer limit passed to the - :class:`StreamReader`. See :meth:`AbstractEventLoop.subprocess_shell` for other - parameters. + import asyncio - Return a :class:`~asyncio.subprocess.Process` instance. 
+ async def run(cmd): + proc = await asyncio.create_subprocess_shell( + cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE) - It is the application's responsibility to ensure that all whitespace and - metacharacters are quoted appropriately to avoid `shell injection - <https://en.wikipedia.org/wiki/Shell_injection#Shell_injection>`_ - vulnerabilities. The :func:`shlex.quote` function can be used to properly - escape whitespace and shell metacharacters in strings that are going to be - used to construct shell commands. + stdout, stderr = await proc.communicate() - This function is a :ref:`coroutine <coroutine>`. + print(f'[{cmd!r} exited with {proc.returncode}]') + if stdout: + print(f'[stdout]\n{stdout.decode()}') + if stderr: + print(f'[stderr]\n{stderr.decode()}') -Use the :meth:`AbstractEventLoop.connect_read_pipe` and -:meth:`AbstractEventLoop.connect_write_pipe` methods to connect pipes. + asyncio.run(run('ls /zzz')) +will print:: -Create a subprocess: low-level API using subprocess.Popen ---------------------------------------------------------- + ['ls /zzz' exited with 1] + [stderr] + ls: /zzz: No such file or directory -Run subprocesses asynchronously using the :mod:`subprocess` module. +Because all asyncio subprocess functions are asynchronous and asyncio +provides many tools to work with such functions, it is easy to execute +and monitor multiple subprocesses in parallel. It is indeed trivial +to modify the above example to run several commands simultaneously:: -.. coroutinemethod:: AbstractEventLoop.subprocess_exec(protocol_factory, \*args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, \*\*kwargs) + async def main(): + await asyncio.gather( + run('ls /zzz'), + run('sleep 1; echo "hello"')) - Create a subprocess from one or more string arguments (character strings or - bytes strings encoded to the :ref:`filesystem encoding - <filesystem-encoding>`), where the first string - specifies the program to execute, and the remaining strings specify the - program's arguments. (Thus, together the string arguments form the - ``sys.argv`` value of the program, assuming it is a Python script.) This is - similar to the standard library :class:`subprocess.Popen` class called with - shell=False and the list of strings passed as the first argument; - however, where :class:`~subprocess.Popen` takes a single argument which is - list of strings, :func:`subprocess_exec` takes multiple string arguments. + asyncio.run(main()) - The *protocol_factory* must instantiate a subclass of the - :class:`asyncio.SubprocessProtocol` class. +See also the `Examples`_ subsection. - Other parameters: - * *stdin*: Either a file-like object representing the pipe to be connected - to the subprocess's standard input stream using - :meth:`~AbstractEventLoop.connect_write_pipe`, or the constant - :const:`subprocess.PIPE` (the default). By default a new pipe will be - created and connected. +Creating Subprocesses +===================== - * *stdout*: Either a file-like object representing the pipe to be connected - to the subprocess's standard output stream using - :meth:`~AbstractEventLoop.connect_read_pipe`, or the constant - :const:`subprocess.PIPE` (the default). By default a new pipe will be - created and connected. +.. 
coroutinefunction:: create_subprocess_exec(\*args, stdin=None, \ + stdout=None, stderr=None, loop=None, \ + limit=None, \*\*kwds) - * *stderr*: Either a file-like object representing the pipe to be connected - to the subprocess's standard error stream using - :meth:`~AbstractEventLoop.connect_read_pipe`, or one of the constants - :const:`subprocess.PIPE` (the default) or :const:`subprocess.STDOUT`. - By default a new pipe will be created and connected. When - :const:`subprocess.STDOUT` is specified, the subprocess's standard error - stream will be connected to the same pipe as the standard output stream. + Create a subprocess. - * All other keyword arguments are passed to :class:`subprocess.Popen` - without interpretation, except for *bufsize*, *universal_newlines* and - *shell*, which should not be specified at all. + The *limit* argument sets the buffer limit for :class:`StreamReader` + wrappers for :attr:`Process.stdout` and :attr:`Process.stderr` + (if :attr:`subprocess.PIPE` is passed to *stdout* and *stderr* arguments). - Returns a pair of ``(transport, protocol)``, where *transport* is an - instance of :class:`BaseSubprocessTransport`. + Return a :class:`~asyncio.subprocess.Process` instance. - This method is a :ref:`coroutine <coroutine>`. + See the documentation of :meth:`loop.subprocess_exec` for other + parameters. - See the constructor of the :class:`subprocess.Popen` class for parameters. +.. coroutinefunction:: create_subprocess_shell(cmd, stdin=None, \ + stdout=None, stderr=None, loop=None, \ + limit=None, \*\*kwds) -.. coroutinemethod:: AbstractEventLoop.subprocess_shell(protocol_factory, cmd, \*, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, \*\*kwargs) + Run the *cmd* shell command. - Create a subprocess from *cmd*, which is a character string or a bytes - string encoded to the :ref:`filesystem encoding <filesystem-encoding>`, - using the platform's "shell" syntax. This is similar to the standard library - :class:`subprocess.Popen` class called with ``shell=True``. + The *limit* argument sets the buffer limit for :class:`StreamReader` + wrappers for :attr:`Process.stdout` and :attr:`Process.stderr` + (if :attr:`subprocess.PIPE` is passed to *stdout* and *stderr* arguments). - The *protocol_factory* must instantiate a subclass of the - :class:`asyncio.SubprocessProtocol` class. + Return a :class:`~asyncio.subprocess.Process` instance. - See :meth:`~AbstractEventLoop.subprocess_exec` for more details about - the remaining arguments. + See the documentation of :meth:`loop.subprocess_shell` for other + parameters. - Returns a pair of ``(transport, protocol)``, where *transport* is an - instance of :class:`BaseSubprocessTransport`. +.. important:: It is the application's responsibility to ensure that all whitespace and - metacharacters are quoted appropriately to avoid `shell injection + special characters are quoted appropriately to avoid `shell injection <https://en.wikipedia.org/wiki/Shell_injection#Shell_injection>`_ vulnerabilities. The :func:`shlex.quote` function can be used to properly - escape whitespace and shell metacharacters in strings that are going to be - used to construct shell commands. + escape whitespace and special shell characters in strings that are going + to be used to construct shell commands. - This method is a :ref:`coroutine <coroutine>`. +.. note:: + + The default asyncio event loop implementation on **Windows** does not + support subprocesses. 
Subprocesses are available for Windows if a + :class:`ProactorEventLoop` is used. + See :ref:`Subprocess Support on Windows <asyncio-windows-subprocess>` + for details. .. seealso:: - The :meth:`AbstractEventLoop.connect_read_pipe` and - :meth:`AbstractEventLoop.connect_write_pipe` methods. + asyncio also has the following *low-level* APIs to work with subprocesses: + :meth:`loop.subprocess_exec`, :meth:`loop.subprocess_shell`, + :meth:`loop.connect_read_pipe`, :meth:`loop.connect_write_pipe`, + as well as the :ref:`Subprocess Transports <asyncio-subprocess-transports>` + and :ref:`Subprocess Protocols <asyncio-subprocess-protocols>`. Constants ---------- +========= .. data:: asyncio.subprocess.PIPE - Special value that can be used as the *stdin*, *stdout* or *stderr* argument - to :func:`create_subprocess_shell` and :func:`create_subprocess_exec` and - indicates that a pipe to the standard stream should be opened. + Can be passed to the *stdin*, *stdout* or *stderr* parameters. + + If *PIPE* is passed to *stdin* argument, the + :attr:`Process.stdin <asyncio.subprocess.Process.stdin>` attribute + will point to a :class:`StreamWriter` instance. + + If *PIPE* is passed to *stdout* or *stderr* arguments, the + :attr:`Process.stdout <asyncio.subprocess.Process.stdout>` and + :attr:`Process.stderr <asyncio.subprocess.Process.stderr>` + attributes will point to :class:`StreamReader` instances. .. data:: asyncio.subprocess.STDOUT - Special value that can be used as the *stderr* argument to - :func:`create_subprocess_shell` and :func:`create_subprocess_exec` and - indicates that standard error should go into the same handle as standard - output. + Special value that can be used as the *stderr* argument and indicates + that standard error should be redirected into standard output. .. data:: asyncio.subprocess.DEVNULL Special value that can be used as the *stdin*, *stdout* or *stderr* argument - to :func:`create_subprocess_shell` and :func:`create_subprocess_exec` and - indicates that the special file :data:`os.devnull` will be used. + to process creation functions. It indicates that the special file + :data:`os.devnull` will be used for the corresponding subprocess stream. -Process -------- +Interacting with Subprocesses +============================= + +Both :func:`create_subprocess_exec` and :func:`create_subprocess_shell` +functions return instances of the *Process* class. *Process* is a high-level +wrapper that allows communicating with subprocesses and watching for +their completion. .. class:: asyncio.subprocess.Process - A subprocess created by the :func:`create_subprocess_exec` or the - :func:`create_subprocess_shell` function. + An object that wraps OS processes created by the + :func:`create_subprocess_exec` and :func:`create_subprocess_shell` + functions. 
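A minimal sketch (the child command and names here are arbitrary choices, not part of the documented API) of how the *PIPE* constants and the *Process* wrapper fit together::

    import asyncio
    import sys

    async def main():
        # Connect both standard streams of the child to pipes.
        proc = await asyncio.create_subprocess_exec(
            sys.executable, '-c', 'print("hello from the child")',
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)

        # communicate() reads both pipes to EOF and waits for exit.
        stdout, stderr = await proc.communicate()

        print(f'exit code: {proc.returncode}')
        print(f'stdout: {stdout.decode().rstrip()}')

    asyncio.run(main())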
+ + This class is designed to have a similar API to the + :class:`subprocess.Popen` class, but there are some + notable differences: - The API of the :class:`~asyncio.subprocess.Process` class was designed to be - close to the API of the :class:`subprocess.Popen` class, but there are some - differences: + * unlike Popen, Process instances do not have an equivalent to + the :meth:`~subprocess.Popen.poll` method; - * There is no explicit :meth:`~subprocess.Popen.poll` method - * The :meth:`~subprocess.Popen.communicate` and - :meth:`~subprocess.Popen.wait` methods don't take a *timeout* parameter: - use the :func:`wait_for` function - * The *universal_newlines* parameter is not supported (only bytes strings - are supported) - * The :meth:`~asyncio.subprocess.Process.wait` method of - the :class:`~asyncio.subprocess.Process` class is asynchronous whereas the - :meth:`~subprocess.Popen.wait` method of the :class:`~subprocess.Popen` - class is implemented as a busy loop. + * the :meth:`~asyncio.subprocess.Process.communicate` and + :meth:`~asyncio.subprocess.Process.wait` methods don't have a + *timeout* parameter: use the :func:`wait_for` function; - This class is :ref:`not thread safe <asyncio-multithreading>`. See also the - :ref:`Subprocess and threads <asyncio-subprocess-threads>` section. + * the :meth:`Process.wait() <asyncio.subprocess.Process.wait>` method + is asynchronous, whereas :meth:`subprocess.Popen.wait` method + is implemented as a blocking busy loop; + + * the *universal_newlines* parameter is not supported. + + This class is :ref:`not thread safe <asyncio-multithreading>`. + + See also the :ref:`Subprocess and Threads <asyncio-subprocess-threads>` + section. .. coroutinemethod:: wait() - Wait for child process to terminate. Set and return :attr:`returncode` - attribute. + Wait for the child process to terminate. - This method is a :ref:`coroutine <coroutine>`. + Set and return the :attr:`returncode` attribute. .. note:: - This will deadlock when using ``stdout=PIPE`` or ``stderr=PIPE`` and - the child process generates enough output to a pipe such that it - blocks waiting for the OS pipe buffer to accept more data. Use the - :meth:`communicate` method when using pipes to avoid that. + This method can deadlock when using ``stdout=PIPE`` or + ``stderr=PIPE`` and the child process generates so much output + that it blocks waiting for the OS pipe buffer to accept + more data. Use the :meth:`communicate` method when using pipes + to avoid this condition. .. coroutinemethod:: communicate(input=None) - Interact with process: Send data to stdin. Read data from stdout and - stderr, until end-of-file is reached. Wait for process to terminate. - The optional *input* argument should be data to be sent to the child - process, or ``None``, if no data should be sent to the child. The type - of *input* must be bytes. + Interact with process: - :meth:`communicate` returns a tuple ``(stdout_data, stderr_data)``. + 1. send data to *stdin* (if *input* is not ``None``); + 2. read data from *stdout* and *stderr*, until EOF is reached; + 3. wait for process to terminate. - If a :exc:`BrokenPipeError` or :exc:`ConnectionResetError` exception is - raised when writing *input* into stdin, the exception is ignored. It - occurs when the process exits before all data are written into stdin. + The optional *input* argument is the data (:class:`bytes` object) + that will be sent to the child process. 
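For instance, a rough sketch of feeding data to a child process through *stdin* (the helper name and the child program are our own illustrative choices)::

    import asyncio
    import sys

    async def to_upper(data):
        # The child reads stdin and writes it back upper-cased.
        proc = await asyncio.create_subprocess_exec(
            sys.executable, '-c',
            'import sys; sys.stdout.write(sys.stdin.read().upper())',
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE)

        # communicate() sends *input*, closes stdin, reads stdout
        # until EOF and waits for the process to exit.
        stdout, _ = await proc.communicate(input=data)
        return stdout

    print(asyncio.run(to_upper(b'hello')))   # b'HELLO'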
- Note that if you want to send data to the process's stdin, you need to - create the Process object with ``stdin=PIPE``. Similarly, to get anything - other than ``None`` in the result tuple, you need to give ``stdout=PIPE`` - and/or ``stderr=PIPE`` too. + Return a tuple ``(stdout_data, stderr_data)``. - This method is a :ref:`coroutine <coroutine>`. - - .. note:: + If either :exc:`BrokenPipeError` or :exc:`ConnectionResetError` + exception is raised when writing *input* into *stdin*, the + exception is ignored. This condition occurs when the process + exits before all data are written into *stdin*. - The data read is buffered in memory, so do not use this method if the - data size is large or unlimited. + If it is desired to send data to the process' *stdin*, + the process needs to be created with ``stdin=PIPE``. Similarly, + to get anything other than ``None`` in the result tuple, the + process has to be created with ``stdout=PIPE`` and/or + ``stderr=PIPE`` arguments. - .. versionchanged:: 3.4.2 - The method now ignores :exc:`BrokenPipeError` and - :exc:`ConnectionResetError`. + Note, that the data read is buffered in memory, so do not use + this method if the data size is large or unlimited. .. method:: send_signal(signal) @@ -255,67 +230,81 @@ Process .. method:: terminate() - Stop the child. On Posix OSs the method sends :py:data:`signal.SIGTERM` - to the child. On Windows the Win32 API function - :c:func:`TerminateProcess` is called to stop the child. + Stop the child process. + + On POSIX systems this method sends :py:data:`signal.SIGTERM` to the + child process. + + On Windows the Win32 API function :c:func:`TerminateProcess` is + called to stop the child process. .. method:: kill() - Kills the child. On Posix OSs the function sends :py:data:`SIGKILL` to - the child. On Windows :meth:`kill` is an alias for :meth:`terminate`. + Kill the child. + + On POSIX systems this method sends :py:data:`SIGKILL` to the child + process. + + On Windows this method is an alias for :meth:`terminate`. .. attribute:: stdin - Standard input stream (:class:`StreamWriter`), ``None`` if the process - was created with ``stdin=None``. + Standard input stream (:class:`StreamWriter`) or ``None`` + if the process was created with ``stdin=None``. .. attribute:: stdout - Standard output stream (:class:`StreamReader`), ``None`` if the process - was created with ``stdout=None``. + Standard output stream (:class:`StreamReader`) or ``None`` + if the process was created with ``stdout=None``. .. attribute:: stderr - Standard error stream (:class:`StreamReader`), ``None`` if the process - was created with ``stderr=None``. + Standard error stream (:class:`StreamReader`) or ``None`` + if the process was created with ``stderr=None``. .. warning:: - Use the :meth:`communicate` method rather than :attr:`.stdin.write - <stdin>`, :attr:`.stdout.read <stdout>` or :attr:`.stderr.read <stderr>` - to avoid deadlocks due to streams pausing reading or writing and blocking - the child process. + Use the :meth:`communicate` method rather than + :attr:`process.stdin.write() <stdin>`, + :attr:`await process.stdout.read() <stdout>` or + :attr:`await process.stderr.read <stderr>`. + This avoids deadlocks due to streams pausing reading or writing + and blocking the child process. .. attribute:: pid - The identifier of the process. + Process identification number (PID). Note that for processes created by the :func:`create_subprocess_shell` - function, this attribute is the process identifier of the spawned shell. 
+ function, this attribute is the PID of the spawned shell. .. attribute:: returncode - Return code of the process when it exited. A ``None`` value indicates - that the process has not terminated yet. + Return code of the process when it exits. - A negative value ``-N`` indicates that the child was terminated by signal - ``N`` (Unix only). + A ``None`` value indicates that the process has not terminated yet. + + A negative value ``-N`` indicates that the child was terminated + by signal ``N`` (POSIX only). .. _asyncio-subprocess-threads: -Subprocess and threads +Subprocess and Threads ---------------------- -asyncio supports running subprocesses from different threads, but there -are limits: +Standard asyncio event loop supports running subprocesses from +different threads, but there are limitations: + +* An event loop must run in the main thread. -* An event loop must run in the main thread -* The child watcher must be instantiated in the main thread, before executing - subprocesses from other threads. Call the :func:`get_child_watcher` - function in the main thread to instantiate the child watcher. +* The child watcher must be instantiated in the main thread + before executing subprocesses from other threads. Call the + :func:`get_child_watcher` function in the main thread to instantiate + the child watcher. -The :class:`asyncio.subprocess.Process` class is not thread safe. +Note that alternative event loop implementations might not share +the above limitations; please refer to their documentation. .. seealso:: @@ -323,97 +312,45 @@ The :class:`asyncio.subprocess.Process` class is not thread safe. <asyncio-multithreading>` section. -Subprocess examples -------------------- - -Subprocess using transport and protocol -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -Example of a subprocess protocol using to get the output of a subprocess and to -wait for the subprocess exit. The subprocess is created by the -:meth:`AbstractEventLoop.subprocess_exec` method:: - - import asyncio - import sys - - class DateProtocol(asyncio.SubprocessProtocol): - def __init__(self, exit_future): - self.exit_future = exit_future - self.output = bytearray() +Examples +-------- - def pipe_data_received(self, fd, data): - self.output.extend(data) +An example using the :class:`~asyncio.subprocess.Process` class to +control a subprocess and the :class:`StreamReader` class to read from +its standard output. - def process_exited(self): - self.exit_future.set_result(True) +.. 
_asyncio_example_create_subprocess_exec: - async def get_date(loop): - code = 'import datetime; print(datetime.datetime.now())' - exit_future = asyncio.Future(loop=loop) - - # Create the subprocess controlled by the protocol DateProtocol, - # redirect the standard output into a pipe - transport, protocol = await loop.subprocess_exec( - lambda: DateProtocol(exit_future), - sys.executable, '-c', code, - stdin=None, stderr=None) - - # Wait for the subprocess exit using the process_exited() method - # of the protocol - await exit_future - - # Close the stdout pipe - transport.close() - - # Read the output which was collected by the pipe_data_received() - # method of the protocol - data = bytes(protocol.output) - return data.decode('ascii').rstrip() - - if sys.platform == "win32": - loop = asyncio.ProactorEventLoop() - asyncio.set_event_loop(loop) - else: - loop = asyncio.get_event_loop() - - date = loop.run_until_complete(get_date(loop)) - print("Current date: %s" % date) - loop.close() - - -Subprocess using streams -^^^^^^^^^^^^^^^^^^^^^^^^ - -Example using the :class:`~asyncio.subprocess.Process` class to control the -subprocess and the :class:`StreamReader` class to read from the standard -output. The subprocess is created by the :func:`create_subprocess_exec` +The subprocess is created by the :func:`create_subprocess_exec` function:: - import asyncio.subprocess + import asyncio import sys async def get_date(): code = 'import datetime; print(datetime.datetime.now())' - # Create the subprocess, redirect the standard output into a pipe + # Create the subprocess; redirect the standard output + # into a pipe. proc = await asyncio.create_subprocess_exec( sys.executable, '-c', code, stdout=asyncio.subprocess.PIPE) - # Read one line of output + # Read one line of output. data = await proc.stdout.readline() line = data.decode('ascii').rstrip() - # Wait for the subprocess exit + # Wait for the subprocess exit. await proc.wait() return line if sys.platform == "win32": - loop = asyncio.ProactorEventLoop() - asyncio.set_event_loop(loop) - else: - loop = asyncio.get_event_loop() - - date = loop.run_until_complete(get_date()) - print("Current date: %s" % date) - loop.close() + asyncio.set_event_loop_policy( + asyncio.WindowsProactorEventLoopPolicy()) + + date = asyncio.run(get_date()) + print(f"Current date: {date}") + + +See also the :ref:`same example <asyncio_example_subprocess_proto>` +written using low-level APIs. diff --git a/Doc/library/asyncio-sync.rst b/Doc/library/asyncio-sync.rst index 574f70f069..18b5629704 100644 --- a/Doc/library/asyncio-sync.rst +++ b/Doc/library/asyncio-sync.rst @@ -1,172 +1,206 @@ .. currentmodule:: asyncio + .. _asyncio-sync: -Synchronization primitives ========================== +Synchronization Primitives +========================== + +asyncio synchronization primitives are designed to be similar to +those of the :mod:`threading` module with two important caveats: + +* asyncio primitives are not thread-safe, therefore they should not + be used for OS thread synchronization (use :mod:`threading` for + that); -**Source code:** :source:`Lib/asyncio/locks.py` +* methods of these synchronization primitives do not accept the *timeout* + argument; use the :func:`asyncio.wait_for` function to perform + operations with timeouts. 
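As noted in the second point above, a timeout for an individual operation such as :meth:`Lock.acquire` can be emulated with :func:`asyncio.wait_for`; a minimal sketch (names are ours)::

    import asyncio

    async def guarded_work(lock):
        try:
            # Wait at most 0.5 seconds for the lock.
            await asyncio.wait_for(lock.acquire(), timeout=0.5)
        except asyncio.TimeoutError:
            print('could not acquire the lock in time')
            return
        try:
            pass  # work with the shared state
        finally:
            lock.release()

    async def main():
        await guarded_work(asyncio.Lock())

    asyncio.run(main())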
-Locks: +asyncio has the following basic sychronization primitives: * :class:`Lock` * :class:`Event` * :class:`Condition` - -Semaphores: - * :class:`Semaphore` * :class:`BoundedSemaphore` -asyncio lock API was designed to be close to classes of the :mod:`threading` -module (:class:`~threading.Lock`, :class:`~threading.Event`, -:class:`~threading.Condition`, :class:`~threading.Semaphore`, -:class:`~threading.BoundedSemaphore`), but it has no *timeout* parameter. The -:func:`asyncio.wait_for` function can be used to cancel a task after a timeout. + +--------- Lock ----- +==== .. class:: Lock(\*, loop=None) - Primitive lock objects. + Implements a mutex lock for asyncio tasks. Not thread-safe. - A primitive lock is a synchronization primitive that is not owned by a - particular coroutine when locked. A primitive lock is in one of two states, - 'locked' or 'unlocked'. + An asyncio lock can be used to guarantee exclusive access to a + shared resource. - The lock is created in the unlocked state. - It has two basic methods, :meth:`acquire` and :meth:`release`. - When the state is unlocked, acquire() changes the state to - locked and returns immediately. When the state is locked, acquire() blocks - until a call to release() in another coroutine changes it to unlocked, then - the acquire() call resets it to locked and returns. The release() method - should only be called in the locked state; it changes the state to unlocked - and returns immediately. If an attempt is made to release an unlocked lock, - a :exc:`RuntimeError` will be raised. + The preferred way to use a Lock is an :keyword:`async with` + statement:: - When more than one coroutine is blocked in acquire() waiting for the state - to turn to unlocked, only one coroutine proceeds when a release() call - resets the state to unlocked; first coroutine which is blocked in acquire() - is being processed. + lock = asyncio.Lock() - :meth:`acquire` is a coroutine and should be called with ``await``. + # ... later + async with lock: + # access shared state - Locks support the :ref:`context management protocol <async-with-locks>`. + which is equivalent to:: - This class is :ref:`not thread safe <asyncio-multithreading>`. + lock = asyncio.Lock() - .. method:: locked() - - Return ``True`` if the lock is acquired. + # ... later + await lock.acquire() + try: + # access shared state + finally: + lock.release() .. coroutinemethod:: acquire() - Acquire a lock. - - This method blocks until the lock is unlocked, then sets it to locked and - returns ``True``. + Acquire the lock. - This method is a :ref:`coroutine <coroutine>`. + This method waits until the lock is *unlocked*, sets it to + *locked* and returns ``True``. .. method:: release() - Release a lock. + Release the lock. - When the lock is locked, reset it to unlocked, and return. If any other - coroutines are blocked waiting for the lock to become unlocked, allow - exactly one of them to proceed. + When the lock is *locked*, reset it to *unlocked* and return. - When invoked on an unlocked lock, a :exc:`RuntimeError` is raised. + If the lock is *unlocked*, a :exc:`RuntimeError` is raised. - There is no return value. + .. method:: locked() + + Return ``True`` if the lock is *locked*. Event ------ +===== .. class:: Event(\*, loop=None) - An Event implementation, asynchronous equivalent to :class:`threading.Event`. + An event object. Not thread-safe. - Class implementing event objects. 
An event manages a flag that can be set to - true with the :meth:`set` method and reset to false with the :meth:`clear` - method. The :meth:`wait` method blocks until the flag is true. The flag is - initially false. + An asyncio event can be used to notify multiple asyncio tasks + that some event has happened. - This class is :ref:`not thread safe <asyncio-multithreading>`. + An Event object manages an internal flag that can be set to *true* + with the :meth:`set` method and reset to *false* with the + :meth:`clear` method. The :meth:`wait` method blocks until the + flag is set to *true*. The flag is set to *false* initially. - .. method:: clear() + .. _asyncio_example_sync_event: - Reset the internal flag to false. Subsequently, coroutines calling - :meth:`wait` will block until :meth:`set` is called to set the internal - flag to true again. + Example:: - .. method:: is_set() + async def waiter(event): + print('waiting for it ...') + await event.wait() + print('... got it!') - Return ``True`` if and only if the internal flag is true. + async def main(): + # Create an Event object. + event = asyncio.Event() - .. method:: set() + # Spawn a Task to wait until 'event' is set. + waiter_task = asyncio.create_task(waiter(event)) + + # Sleep for 1 second and set the event. + await asyncio.sleep(1) + event.set() - Set the internal flag to true. All coroutines waiting for it to become - true are awakened. Coroutine that call :meth:`wait` once the flag is true - will not block at all. + # Wait until the waiter task is finished. + await waiter_task + + asyncio.run(main()) .. coroutinemethod:: wait() - Block until the internal flag is true. + Wait until the event is set. + + If the event is set, return ``True`` immediately. + Otherwise block until another task calls :meth:`set`. - If the internal flag is true on entry, return ``True`` immediately. - Otherwise, block until another coroutine calls :meth:`set` to set the - flag to true, then return ``True``. + .. method:: set() + + Set the event. - This method is a :ref:`coroutine <coroutine>`. + All tasks waiting for event to be set will be immediately + awakened. + + .. method:: clear() + + Clear (unset) the event. + + Tasks awaiting on :meth:`wait` will now block until the + :meth:`set` method is called again. + + .. method:: is_set() + + Return ``True`` if the event is set. Condition ---------- +========= .. class:: Condition(lock=None, \*, loop=None) - A Condition implementation, asynchronous equivalent to - :class:`threading.Condition`. + A Condition object. Not thread-safe. - This class implements condition variable objects. A condition variable - allows one or more coroutines to wait until they are notified by another - coroutine. + An asyncio condition primitive can be used by a task to wait for + some event to happen and then get exclusive access to a shared + resource. - If the *lock* argument is given and not ``None``, it must be a :class:`Lock` - object, and it is used as the underlying lock. Otherwise, - a new :class:`Lock` object is created and used as the underlying lock. + In essence, a Condition object combines the functionality + of an :class:`Event` and a :class:`Lock`. It is possible to have + multiple Condition objects share one Lock, which allows coordinating + exclusive access to a shared resource between different tasks + interested in particular states of that shared resource. - Conditions support the :ref:`context management protocol - <async-with-locks>`. 
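To make the wait/notify flow concrete, here is a small illustrative sketch (the coroutine names and the shared list are our own; the methods used are documented below)::

    import asyncio

    async def consumer(cond, items):
        async with cond:
            # wait_for() releases the lock while waiting and
            # re-acquires it before re-checking the predicate.
            await cond.wait_for(lambda: len(items) > 0)
            print('consumed', items.pop())

    async def producer(cond, items):
        await asyncio.sleep(1)
        async with cond:
            items.append('spam')
            cond.notify()

    async def main():
        cond = asyncio.Condition()
        items = []
        await asyncio.gather(consumer(cond, items),
                             producer(cond, items))

    asyncio.run(main())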
+ The optional *lock* argument must be a :class:`Lock` object or + ``None``. In the latter case a new Lock object is created + automatically. - This class is :ref:`not thread safe <asyncio-multithreading>`. + The preferred way to use a Condition is an :keyword:`async with` + statement:: - .. coroutinemethod:: acquire() + cond = asyncio.Condition() - Acquire the underlying lock. + # ... later + async with cond: + await cond.wait() - This method blocks until the lock is unlocked, then sets it to locked and - returns ``True``. + which is equivalent to:: - This method is a :ref:`coroutine <coroutine>`. + cond = asyncio.Condition() - .. method:: notify(n=1) + # ... later + await cond.acquire() + try: + await cond.wait() + finally: + cond.release() - By default, wake up one coroutine waiting on this condition, if any. - If the calling coroutine has not acquired the lock when this method is - called, a :exc:`RuntimeError` is raised. + .. coroutinemethod:: acquire() + + Acquire the underlying lock. + + This method waits until the underlying lock is *unlocked*, + sets it to *locked* and returns ``True``. - This method wakes up at most *n* of the coroutines waiting for the - condition variable; it is a no-op if no coroutines are waiting. + .. method:: notify(n=1) - .. note:: + Wake up at most *n* tasks (1 by default) waiting on this + condition. The method is a no-op if no tasks are waiting. - An awakened coroutine does not actually return from its :meth:`wait` - call until it can reacquire the lock. Since :meth:`notify` does not - release the lock, its caller should. + The lock must be acquired before this method is called and + released shortly after. If called with an *unlocked* lock + a :exc:`RuntimeError` is raised. .. method:: locked() @@ -174,78 +208,87 @@ Condition .. method:: notify_all() - Wake up all coroutines waiting on this condition. This method acts like - :meth:`notify`, but wakes up all waiting coroutines instead of one. If the - calling coroutine has not acquired the lock when this method is called, a - :exc:`RuntimeError` is raised. + Wake up all tasks waiting on this condition. - .. method:: release() + This method acts like :meth:`notify`, but wakes up all waiting + tasks. - Release the underlying lock. + The lock must be acquired before this method is called and + released shortly after. If called with an *unlocked* lock + a :exc:`RuntimeError` is raised. - When the lock is locked, reset it to unlocked, and return. If any other - coroutines are blocked waiting for the lock to become unlocked, allow - exactly one of them to proceed. + .. method:: release() - When invoked on an unlocked lock, a :exc:`RuntimeError` is raised. + Release the underlying lock. - There is no return value. + When invoked on an unlocked lock, a :exc:`RuntimeError` is + raised. .. coroutinemethod:: wait() Wait until notified. - If the calling coroutine has not acquired the lock when this method is + If the calling task has not acquired the lock when this method is called, a :exc:`RuntimeError` is raised. - This method releases the underlying lock, and then blocks until it is - awakened by a :meth:`notify` or :meth:`notify_all` call for the same - condition variable in another coroutine. Once awakened, it re-acquires - the lock and returns ``True``. - - This method is a :ref:`coroutine <coroutine>`. + This method releases the underlying lock, and then blocks until + it is awakened by a :meth:`notify` or :meth:`notify_all` call.
+ Once awakened, the Condition re-acquires its lock and this method + returns ``True``. .. coroutinemethod:: wait_for(predicate) - Wait until a predicate becomes true. - - The predicate should be a callable which result will be interpreted as a - boolean value. The final predicate value is the return value. + Wait until a predicate becomes *true*. - This method is a :ref:`coroutine <coroutine>`. + The predicate must be a callable which result will be + interpreted as a boolean value. The final value is the + return value. Semaphore ---------- +========= .. class:: Semaphore(value=1, \*, loop=None) - A Semaphore implementation. + A Semaphore object. Not thread-safe. A semaphore manages an internal counter which is decremented by each - :meth:`acquire` call and incremented by each :meth:`release` call. The - counter can never go below zero; when :meth:`acquire` finds that it is zero, - it blocks, waiting until some other coroutine calls :meth:`release`. + :meth:`acquire` call and incremented by each :meth:`release` call. + The counter can never go below zero; when :meth:`acquire` finds + that it is zero, it blocks, waiting until some task calls + :meth:`release`. + + The optional *value* argument gives the initial value for the + internal counter (``1`` by default). If the given value is + less than ``0`` a :exc:`ValueError` is raised. + + The preferred way to use a Semaphore is an :keyword:`async with` + statement:: + + sem = asyncio.Semaphore(10) - The optional argument gives the initial value for the internal counter; it - defaults to ``1``. If the value given is less than ``0``, :exc:`ValueError` - is raised. + # ... later + async with sem: + # work with shared resource - Semaphores support the :ref:`context management protocol - <async-with-locks>`. + which is equivalent to:: - This class is :ref:`not thread safe <asyncio-multithreading>`. + sem = asyncio.Semaphore(10) + + # ... later + await sem.acquire() + try: + # work with shared resource + finally: + sem.release() .. coroutinemethod:: acquire() Acquire a semaphore. - If the internal counter is larger than zero on entry, decrement it by one - and return ``True`` immediately. If it is zero on entry, block, waiting - until some other coroutine has called :meth:`release` to make it larger - than ``0``, and then return ``True``. - - This method is a :ref:`coroutine <coroutine>`. + If the internal counter is greater than zero, decrement + it by one and return ``True`` immediately. If it is zero, wait + until a :meth:`release` is called and return ``True``. .. method:: locked() @@ -253,53 +296,30 @@ Semaphore .. method:: release() - Release a semaphore, incrementing the internal counter by one. When it - was zero on entry and another coroutine is waiting for it to become - larger than zero again, wake up that coroutine. + Release a semaphore, incrementing the internal counter by one. + Can wake up a task waiting to acquire the semaphore. + + Unlike :class:`BoundedSemaphore`, :class:`Semaphore` allows + making more ``release()`` calls than ``acquire()`` calls. BoundedSemaphore ----------------- +================ .. class:: BoundedSemaphore(value=1, \*, loop=None) - A bounded semaphore implementation. Inherit from :class:`Semaphore`. - - This raises :exc:`ValueError` in :meth:`~Semaphore.release` if it would - increase the value above the initial value. + A bounded semaphore object. Not thread-safe. - Bounded semaphores support the :ref:`context management - protocol <async-with-locks>`. 
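A tiny illustrative sketch (ours) of the bounded behaviour described below: releasing more times than the semaphore was acquired raises :exc:`ValueError`::

    import asyncio

    async def main():
        sem = asyncio.BoundedSemaphore(1)

        async with sem:
            pass  # work with the shared resource

        # The counter is back at its initial value here, so one
        # extra release() would exceed it and raise ValueError.
        try:
            sem.release()
        except ValueError:
            print('released more times than acquired')

    asyncio.run(main())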
+ Bounded Semaphore is a version of :class:`Semaphore` that raises + a :exc:`ValueError` in :meth:`~Semaphore.release` if it + increases the internal counter above the initial *value*. - This class is :ref:`not thread safe <asyncio-multithreading>`. +--------- -.. _async-with-locks: - -Using locks, conditions and semaphores in the :keyword:`async with` statement ------------------------------------------------------------------------------ - -:class:`Lock`, :class:`Condition`, :class:`Semaphore`, and -:class:`BoundedSemaphore` objects can be used in :keyword:`async with` -statements. - -The :meth:`acquire` method will be called when the block is entered, -and :meth:`release` will be called when the block is exited. Hence, -the following snippet:: - - async with lock: - # do something... - -is equivalent to:: - - await lock.acquire() - try: - # do something... - finally: - lock.release() .. deprecated:: 3.7 - Lock acquiring using ``await lock`` or ``yield from lock`` and + Acquiring a lock using ``await lock`` or ``yield from lock`` and/or :keyword:`with` statement (``with await lock``, ``with (yield from - lock)``) are deprecated. + lock)``) is deprecated. Use ``async with lock`` instead. diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index 2b480d4be3..670e4a5fbe 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -1,97 +1,119 @@ .. currentmodule:: asyncio -Tasks and coroutines + +==================== +Coroutines and Tasks ==================== -**Source code:** :source:`Lib/asyncio/tasks.py` +This section outlines high-level asyncio APIs to work with coroutines +and Tasks. + +.. contents:: + :depth: 1 + :local: -**Source code:** :source:`Lib/asyncio/coroutines.py` .. _coroutine: Coroutines ----------- +========== -Coroutines used with :mod:`asyncio` may be implemented using the -:keyword:`async def` statement, or by using :term:`generators <generator>`. -The :keyword:`async def` type of coroutine was added in Python 3.5, and -is recommended if there is no need to support older Python versions. +Coroutines declared with async/await syntax is the preferred way of +writing asyncio applications. For example, the following snippet +of code prints "hello", waits 1 second, and then prints "world":: -Generator-based coroutines should be decorated with :func:`@asyncio.coroutine -<asyncio.coroutine>`, although this is not strictly enforced. -The decorator enables compatibility with :keyword:`async def` coroutines, -and also serves as documentation. Generator-based -coroutines use the ``yield from`` syntax introduced in :pep:`380`, -instead of the original ``yield`` syntax. + >>> import asyncio -The word "coroutine", like the word "generator", is used for two -different (though related) concepts: + >>> async def main(): + ... print('hello') + ... await asyncio.sleep(1) + ... print('world') -- The function that defines a coroutine - (a function definition using :keyword:`async def` or - decorated with ``@asyncio.coroutine``). If disambiguation is needed - we will call this a *coroutine function* (:func:`iscoroutinefunction` - returns ``True``). + >>> asyncio.run(main()) + hello + world -- The object obtained by calling a coroutine function. This object - represents a computation or an I/O operation (usually a combination) - that will complete eventually. If disambiguation is needed we will - call it a *coroutine object* (:func:`iscoroutine` returns ``True``). 
+Note that simply calling a coroutine will not schedule it to +be executed:: -Things a coroutine can do: + >>> main() + <coroutine object main at 0x1053bb7c8> -- ``result = await future`` or ``result = yield from future`` -- - suspends the coroutine until the - future is done, then returns the future's result, or raises an - exception, which will be propagated. (If the future is cancelled, - it will raise a ``CancelledError`` exception.) Note that tasks are - futures, and everything said about futures also applies to tasks. +To actually run a coroutine asyncio provides three main mechanisms: -- ``result = await coroutine`` or ``result = yield from coroutine`` -- - wait for another coroutine to - produce a result (or raise an exception, which will be propagated). - The ``coroutine`` expression must be a *call* to another coroutine. +* The :func:`asyncio.run` function to run the top-level + entry point "main()" function (see the above example.) -- ``return expression`` -- produce a result to the coroutine that is - waiting for this one using :keyword:`await` or ``yield from``. +* Awaiting on a coroutine. The following snippet of code will + print "hello" after waiting for 1 second, and then print "world" + after waiting for *another* 2 seconds:: -- ``raise exception`` -- raise an exception in the coroutine that is - waiting for this one using :keyword:`await` or ``yield from``. + import asyncio + import time -Calling a coroutine does not start its code running -- -the coroutine object returned by the call doesn't do anything until you -schedule its execution. There are two basic ways to start it running: -call ``await coroutine`` or ``yield from coroutine`` from another coroutine -(assuming the other coroutine is already running!), or schedule its execution -using the :func:`ensure_future` function or the :meth:`AbstractEventLoop.create_task` -method. + async def say_after(delay, what): + await asyncio.sleep(delay) + print(what) + async def main(): + print('started at', time.strftime('%X')) -Coroutines (and tasks) can only run when the event loop is running. + await say_after(1, 'hello') + await say_after(2, 'world') -.. decorator:: coroutine + print('finished at', time.strftime('%X')) - Decorator to mark generator-based coroutines. This enables - the generator use :keyword:`!yield from` to call :keyword:`async - def` coroutines, and also enables the generator to be called by - :keyword:`async def` coroutines, for instance using an - :keyword:`await` expression. + asyncio.run(main()) - There is no need to decorate :keyword:`async def` coroutines themselves. + Expected output:: - If the generator is not yielded from before it is destroyed, an error - message is logged. See :ref:`Detect coroutines never scheduled - <asyncio-coroutine-not-scheduled>`. + started at 17:13:52 + hello + world + finished at 17:13:55 -.. note:: +* The :func:`asyncio.create_task` function to run coroutines + concurrently as asyncio :class:`Tasks <Task>`. + + Let's modify the above example and run two "set_after" coroutines + *concurrently*:: + + async def main(): + task1 = asyncio.create_task( + say_after(1, 'hello')) + + task2 = asyncio.create_task( + say_after(2, 'world')) + + print('started at', time.strftime('%X')) + + # Wait until both tasks are completed (should take + # around 2 seconds.) 
+ await task1 + await task2 + + print('finished at', time.strftime('%X')) + + Note that expected output now shows that the snippet runs + 1 second faster than before:: + + started at 17:14:32 + hello + world + finished at 17:14:34 + +Note that in this documentation the term "coroutine" can be used for +two closely related concepts: + +* a *coroutine function*: an :keyword:`async def` function; - In this documentation, some methods are documented as coroutines, - even if they are plain Python functions returning a :class:`Future`. - This is intentional to have a freedom of tweaking the implementation - of these functions in the future. If such a function is needed to be - used in a callback-style code, wrap its result with :func:`ensure_future`. +* a *coroutine object*: object returned by calling a + *coroutine function*. +Running an asyncio Program +========================== + .. function:: run(coro, \*, debug=False) This function runs the passed coroutine, taking care of @@ -101,45 +123,46 @@ Coroutines (and tasks) can only run when the event loop is running. This function cannot be called when another asyncio event loop is running in the same thread. - If debug is True, the event loop will be run in debug mode. + If *debug* is ``True``, the event loop will be run in debug mode. This function always creates a new event loop and closes it at the end. It should be used as a main entry point for asyncio programs, and should ideally only be called once. .. versionadded:: 3.7 - **Important:** this has been been added to asyncio in Python 3.7 - on a :term:`provisional basis <provisional api>`. + **Important:** this function has been added to asyncio in + Python 3.7 on a :term:`provisional basis <provisional api>`. -.. _asyncio-hello-world-coroutine: +Creating Tasks +============== -Example: Hello World coroutine -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. function:: create_task(coro) -Example of coroutine displaying ``"Hello World"``:: + Wrap the *coro* :ref:`coroutine <coroutine>` into a task and schedule + its execution. Return the task object. - import asyncio + The task is executed in the loop returned by :func:`get_running_loop`, + :exc:`RuntimeError` is raised if there is no running loop in + current thread. - async def hello_world(): - print("Hello World!") + .. versionadded:: 3.7 - asyncio.run(hello_world()) -.. seealso:: +Sleeping +======== - The :ref:`Hello World with call_soon() <asyncio-hello-world-callback>` - example uses the :meth:`AbstractEventLoop.call_soon` method to schedule a - callback. +.. coroutinefunction:: sleep(delay, result=None, \*, loop=None) + Block for *delay* seconds. -.. _asyncio-date-coroutine: + If *result* is provided, it is returned to the caller + when the coroutine completes. -Example: Coroutine displaying the current date -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + .. _asyncio_example_sleep: -Example of coroutine displaying the current date every second during 5 seconds -using the :meth:`sleep` function:: + Example of coroutine displaying the current date every second + for 5 seconds:: import asyncio import datetime @@ -155,408 +178,283 @@ using the :meth:`sleep` function:: asyncio.run(display_date()) -.. seealso:: - - The :ref:`display the current date with call_later() - <asyncio-date-callback>` example uses a callback with the - :meth:`AbstractEventLoop.call_later` method. - - -Example: Chain coroutines -^^^^^^^^^^^^^^^^^^^^^^^^^ - -Example chaining coroutines:: - - import asyncio - - async def compute(x, y): - print("Compute %s + %s ..." 
% (x, y)) - await asyncio.sleep(1.0) - return x + y - - async def print_sum(x, y): - result = await compute(x, y) - print("%s + %s = %s" % (x, y, result)) - - loop = asyncio.get_event_loop() - loop.run_until_complete(print_sum(1, 2)) - loop.close() - -``compute()`` is chained to ``print_sum()``: ``print_sum()`` coroutine waits -until ``compute()`` is completed before returning its result. - -Sequence diagram of the example: - -.. image:: tulip_coro.png - :align: center - -The "Task" is created by the :meth:`AbstractEventLoop.run_until_complete` method -when it gets a coroutine object instead of a task. - -The diagram shows the control flow, it does not describe exactly how things -work internally. For example, the sleep coroutine creates an internal future -which uses :meth:`AbstractEventLoop.call_later` to wake up the task in 1 second. - - -InvalidStateError ------------------ - -.. exception:: InvalidStateError - - The operation is not allowed in this state. - - -TimeoutError ------------- - -.. exception:: TimeoutError - - The operation exceeded the given deadline. - -.. note:: - - This exception is different from the builtin :exc:`TimeoutError` exception! - - -Future ------- - -.. class:: Future(\*, loop=None) - - This class is *almost* compatible with :class:`concurrent.futures.Future`. - - Differences: - - - :meth:`result` and :meth:`exception` do not take a timeout argument and - raise an exception when the future isn't done yet. - - - Callbacks registered with :meth:`add_done_callback` are always called - via the event loop's :meth:`~AbstractEventLoop.call_soon`. - - - This class is not compatible with the :func:`~concurrent.futures.wait` and - :func:`~concurrent.futures.as_completed` functions in the - :mod:`concurrent.futures` package. - - This class is :ref:`not thread safe <asyncio-multithreading>`. - - .. method:: cancel() - - Cancel the future and schedule callbacks. - - If the future is already done or cancelled, return ``False``. Otherwise, - change the future's state to cancelled, schedule the callbacks and return - ``True``. - - .. method:: cancelled() - - Return ``True`` if the future was cancelled. - - .. method:: done() - - Return ``True`` if the future is done. - - Done means either that a result / exception are available, or that the - future was cancelled. - - .. method:: result() - - Return the result this future represents. - - If the future has been cancelled, raises :exc:`CancelledError`. If the - future's result isn't yet available, raises :exc:`InvalidStateError`. If - the future is done and has an exception set, this exception is raised. - - .. method:: exception() - - Return the exception that was set on this future. - - The exception (or ``None`` if no exception was set) is returned only if - the future is done. If the future has been cancelled, raises - :exc:`CancelledError`. If the future isn't done yet, raises - :exc:`InvalidStateError`. - .. method:: add_done_callback(callback, *, context=None) +Running Tasks Concurrently +========================== - Add a callback to be run when the future becomes done. +.. function:: gather(\*fs, loop=None, return_exceptions=False) - The *callback* is called with a single argument - the future object. If the - future is already done when this is called, the callback is scheduled - with :meth:`~AbstractEventLoop.call_soon`. + Return a Future aggregating results from the given coroutine objects, + Tasks, or Futures. 
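A quick illustrative sketch (the coroutines are our own) of the *return_exceptions* flag shown in the signature above::

    import asyncio

    async def ok():
        return 'ok'

    async def boom():
        raise ValueError('boom')

    async def main():
        # With return_exceptions=True the exception object is
        # placed into the result list instead of being raised.
        results = await asyncio.gather(ok(), boom(),
                                       return_exceptions=True)
        print(results)   # ['ok', ValueError('boom')]

    asyncio.run(main())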
- An optional keyword-only *context* argument allows specifying a custom - :class:`contextvars.Context` for the *callback* to run in. The current - context is used when no *context* is provided. + If all Tasks/Futures are completed successfully, the result is an + aggregate list of returned values. The result values are in the + order of the original *fs* sequence. - :ref:`Use functools.partial to pass parameters to the callback - <asyncio-pass-keywords>`. For example, - ``fut.add_done_callback(functools.partial(print, "Future:", - flush=True))`` will call ``print("Future:", fut, flush=True)``. + All coroutines in the *fs* list are automatically + scheduled as :class:`Tasks <Task>`. - .. versionchanged:: 3.7 - The *context* keyword-only parameter was added. See :pep:`567` - for more details. + If *return_exceptions* is ``True``, exceptions in the Tasks/Futures + are treated the same as successful results, and gathered in the + result list. Otherwise, the first raised exception is immediately + propagated to the returned Future. - .. method:: remove_done_callback(fn) + If the outer Future is *cancelled*, all submitted Tasks/Futures + (that have not completed yet) are also *cancelled*. - Remove all instances of a callback from the "call when done" list. + If any child is *cancelled*, it is treated as if it raised + :exc:`CancelledError` -- the outer Future is **not** cancelled in + this case. This is to prevent the cancellation of one submitted + Task/Future from causing other Tasks/Futures to be cancelled. - Returns the number of callbacks removed. + All futures must share the same event loop. - .. method:: set_result(result) + .. versionchanged:: 3.7 + If the *gather* itself is cancelled, the cancellation is + propagated regardless of *return_exceptions*. - Mark the future done and set its result. + .. _asyncio_example_gather: - If the future is already done when this method is called, raises - :exc:`InvalidStateError`. + Example:: - .. method:: set_exception(exception) + import asyncio - Mark the future done and set an exception. + async def factorial(name, number): + f = 1 + for i in range(2, number + 1): + print(f"Task {name}: Compute factorial({i})...") + await asyncio.sleep(1) + f *= i + print(f"Task {name}: factorial({number}) = {f}") - If the future is already done when this method is called, raises - :exc:`InvalidStateError`. + async def main(): + await asyncio.gather( + factorial("A", 2), + factorial("B", 3), + factorial("C", 4), + ) - .. method:: get_loop() + asyncio.run(main()) - Return the event loop the future object is bound to. + # Expected output: + # + # Task A: Compute factorial(2)... + # Task B: Compute factorial(2)... + # Task C: Compute factorial(2)... + # Task A: factorial(2) = 2 + # Task B: Compute factorial(3)... + # Task C: Compute factorial(3)... + # Task B: factorial(3) = 6 + # Task C: Compute factorial(4)... + # Task C: factorial(4) = 24 - .. versionadded:: 3.7 +Shielding Tasks From Cancellation +================================= -Example: Future with run_until_complete() -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. coroutinefunction:: shield(fut, \*, loop=None) -Example combining a :class:`Future` and a :ref:`coroutine function -<coroutine>`:: + Wait for a Future/Task while protecting it from being cancelled. - import asyncio + *fut* can be a coroutine, a Task, or a Future-like object. If + *fut* is a coroutine it is automatically scheduled as a + :class:`Task`.
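One common illustrative pattern (a sketch with our own names) combines ``shield()`` with :func:`wait_for` so that a timeout abandons the wait without cancelling the underlying Task::

    import asyncio

    async def slow():
        await asyncio.sleep(2)
        return 42

    async def main():
        task = asyncio.create_task(slow())
        try:
            # On timeout only the shield is cancelled; the task
            # itself keeps running in the background.
            return await asyncio.wait_for(asyncio.shield(task),
                                          timeout=0.5)
        except asyncio.TimeoutError:
            print('taking longer than expected, waiting anyway...')
            return await task

    print(asyncio.run(main()))   # 42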
- async def slow_operation(future): - await asyncio.sleep(1) - future.set_result('Future is done!') + The statement:: - loop = asyncio.get_event_loop() - future = asyncio.Future() - asyncio.ensure_future(slow_operation(future)) - loop.run_until_complete(future) - print(future.result()) - loop.close() + res = await shield(something()) -The coroutine function is responsible for the computation (which takes 1 second) -and it stores the result into the future. The -:meth:`~AbstractEventLoop.run_until_complete` method waits for the completion of -the future. + is equivalent to:: -.. note:: - The :meth:`~AbstractEventLoop.run_until_complete` method uses internally the - :meth:`~Future.add_done_callback` method to be notified when the future is - done. + res = await something() + *except* that if the coroutine containing it is cancelled, the + Task running in ``something()`` is not cancelled. From the point + of view of ``something()``, the cancellation did not happen. + Although its caller is still cancelled, so the "await" expression + still raises a :exc:`CancelledError`. -Example: Future with run_forever() -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + If ``something()`` is cancelled by other means (i.e. from within + itself) that would also cancel ``shield()``. -The previous example can be written differently using the -:meth:`Future.add_done_callback` method to describe explicitly the control -flow:: + If it is desired to completely ignore cancellation (not recommended) + the ``shield()`` function should be combined with a try/except + clause, as follows:: - import asyncio - - async def slow_operation(future): - await asyncio.sleep(1) - future.set_result('Future is done!') + try: + res = await shield(something()) + except CancelledError: + res = None - def got_result(future): - print(future.result()) - loop.stop() - loop = asyncio.get_event_loop() - future = asyncio.Future() - asyncio.ensure_future(slow_operation(future)) - future.add_done_callback(got_result) - try: - loop.run_forever() - finally: - loop.close() +Timeouts +======== -In this example, the future is used to link ``slow_operation()`` to -``got_result()``: when ``slow_operation()`` is done, ``got_result()`` is called -with the result. +.. coroutinefunction:: wait_for(fut, timeout, \*, loop=None) + Wait for a coroutine, Task, or Future to complete with timeout. -Task ----- + *fut* can be a coroutine, a Task, or a Future-like object. If + *fut* is a coroutine it is automatically scheduled as a + :class:`Task`. -.. function:: create_task(coro) + *timeout* can either be ``None`` or a float or int number of seconds + to wait for. If *timeout* is ``None``, block until the future + completes. - Wrap a :ref:`coroutine <coroutine>` *coro* into a task and schedule - its execution. Return the task object. + If a timeout occurs, it cancels the task and raises + :exc:`asyncio.TimeoutError`. - The task is executed in :func:`get_running_loop` context, - :exc:`RuntimeError` is raised if there is no running loop in - current thread. + To avoid the task cancellation, wrap it in :func:`shield`. - .. versionadded:: 3.7 + The function will wait until the future is actually cancelled, + so the total wait time may exceed the *timeout*. -.. class:: Task(coro, \*, loop=None) + If the wait is cancelled, the future *fut* is also cancelled. - A unit for concurrent running of :ref:`coroutines <coroutine>`, - subclass of :class:`Future`. + .. _asyncio_example_waitfor: - A task is responsible for executing a coroutine object in an event loop. 
If - the wrapped coroutine yields from a future, the task suspends the execution - of the wrapped coroutine and waits for the completion of the future. When - the future is done, the execution of the wrapped coroutine restarts with the - result or the exception of the future. + Example:: - Event loops use cooperative scheduling: an event loop only runs one task at - a time. Other tasks may run in parallel if other event loops are - running in different threads. While a task waits for the completion of a - future, the event loop executes a new task. + async def eternity(): + # Sleep for one hour + await asyncio.sleep(3600) + print('yay!') - The cancellation of a task is different from the cancellation of a - future. Calling :meth:`cancel` will throw a - :exc:`~concurrent.futures.CancelledError` to the wrapped - coroutine. :meth:`~Future.cancelled` only returns ``True`` if the - wrapped coroutine did not catch the - :exc:`~concurrent.futures.CancelledError` exception, or raised a - :exc:`~concurrent.futures.CancelledError` exception. + async def main(): + # Wait for at most 1 second + try: + await asyncio.wait_for(eternity(), timeout=1.0) + except asyncio.TimeoutError: + print('timeout!') - If a pending task is destroyed, the execution of its wrapped :ref:`coroutine - <coroutine>` did not complete. It is probably a bug and a warning is - logged: see :ref:`Pending task destroyed <asyncio-pending-task-destroyed>`. + asyncio.run(main()) - Don't directly create :class:`Task` instances: use the :func:`create_task` - function or the :meth:`AbstractEventLoop.create_task` method. + # Expected output: + # + # timeout! - Tasks support the :mod:`contextvars` module. When a Task - is created it copies the current context and later runs its coroutine - in the copied context. See :pep:`567` for more details. + .. versionchanged:: 3.7 + When *fut* is cancelled due to a timeout, ``wait_for`` waits + for *fut* to be cancelled. Previously, it raised + :exc:`asyncio.TimeoutError` immediately. - This class is :ref:`not thread safe <asyncio-multithreading>`. - .. versionchanged:: 3.7 - Added support for the :mod:`contextvars` module. +Waiting Primitives +================== - .. classmethod:: all_tasks(loop=None) +.. coroutinefunction:: wait(fs, \*, loop=None, timeout=None,\ + return_when=ALL_COMPLETED) - Return a set of all tasks for an event loop. + Wait for a set of coroutines, Tasks, or Futures to complete. - By default all tasks for the current event loop are returned. - If *loop* is ``None``, :func:`get_event_loop` function - is used to get the current loop. + *fs* is a list of coroutines, Futures, and/or Tasks. Coroutines + are automatically scheduled as :class:`Tasks <Task>`. - .. classmethod:: current_task(loop=None) + Returns two sets of Tasks/Futures: ``(done, pending)``. - Return the currently running task in an event loop or ``None``. + *timeout* (a float or int), if specified, can be used to control + the maximum number of seconds to wait before returning. - By default the current task for the current event loop is returned. + Note that this function does not raise :exc:`asyncio.TimeoutError`. + Futures or Tasks that aren't done when the timeout occurs are simply + returned in the second set. - ``None`` is returned when called not in the context of a :class:`Task`. + *return_when* indicates when this function should return. It must + be one of the following constants: - .. method:: cancel() + .. tabularcolumns:: |l|L| - Request that this task cancel itself. 
+ +-----------------------------+----------------------------------------+ + | Constant | Description | + +=============================+========================================+ + | :const:`FIRST_COMPLETED` | The function will return when any | + | | future finishes or is cancelled. | + +-----------------------------+----------------------------------------+ + | :const:`FIRST_EXCEPTION` | The function will return when any | + | | future finishes by raising an | + | | exception. If no future raises an | + | | exception then it is equivalent to | + | | :const:`ALL_COMPLETED`. | + +-----------------------------+----------------------------------------+ + | :const:`ALL_COMPLETED` | The function will return when all | + | | futures finish or are cancelled. | + +-----------------------------+----------------------------------------+ - This arranges for a :exc:`~concurrent.futures.CancelledError` to be - thrown into the wrapped coroutine on the next cycle through the event - loop. The coroutine then has a chance to clean up or even deny the - request using try/except/finally. + Unlike :func:`~asyncio.wait_for`, ``wait()`` does not cancel the + futures when a timeout occurs. - Unlike :meth:`Future.cancel`, this does not guarantee that the task - will be cancelled: the exception might be caught and acted upon, delaying - cancellation of the task or preventing cancellation completely. The task - may also return a value or raise a different exception. + Usage:: - Immediately after this method is called, :meth:`~Future.cancelled` will - not return ``True`` (unless the task was already cancelled). A task will - be marked as cancelled when the wrapped coroutine terminates with a - :exc:`~concurrent.futures.CancelledError` exception (even if - :meth:`cancel` was not called). + done, pending = await asyncio.wait(fs) - .. method:: get_stack(\*, limit=None) - Return the list of stack frames for this task's coroutine. +.. function:: as_completed(fs, \*, loop=None, timeout=None) - If the coroutine is not done, this returns the stack where it is - suspended. If the coroutine has completed successfully or was - cancelled, this returns an empty list. If the coroutine was - terminated by an exception, this returns the list of traceback - frames. + Return an iterator of awaitables which return + :class:`Future` instances. - The frames are always ordered from oldest to newest. + Raises :exc:`asyncio.TimeoutError` if the timeout occurs before + all Futures are done. - The optional limit gives the maximum number of frames to return; by - default all available frames are returned. Its meaning differs depending - on whether a stack or a traceback is returned: the newest frames of a - stack are returned, but the oldest frames of a traceback are returned. - (This matches the behavior of the traceback module.) + Example:: - For reasons beyond our control, only one stack frame is returned for a - suspended coroutine. + for f in as_completed(fs): + result = await f + # ... - .. method:: print_stack(\*, limit=None, file=None) - Print the stack or traceback for this task's coroutine. +Scheduling From Other Threads +============================= - This produces output similar to that of the traceback module, for the - frames retrieved by get_stack(). The limit argument is passed to - get_stack(). The file argument is an I/O stream to which the output - is written; by default output is written to sys.stderr. +.. function:: run_coroutine_threadsafe(coro, loop) + Submit a coroutine to the given event loop. Thread-safe. 
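For context, one illustrative way (a sketch; the helper name is ours) to obtain a running event loop in another OS thread for this call::

    import asyncio
    import threading

    def start_loop(loop):
        # Run the loop forever in a dedicated OS thread.
        asyncio.set_event_loop(loop)
        loop.run_forever()

    loop = asyncio.new_event_loop()
    threading.Thread(target=start_loop, args=(loop,),
                     daemon=True).start()

    # From the main thread, submit a coroutine to that loop.
    future = asyncio.run_coroutine_threadsafe(
        asyncio.sleep(1, result=3), loop)
    print(future.result())   # 3

    loop.call_soon_threadsafe(loop.stop)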
-Example: Parallel execution of tasks
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+   Return a :class:`concurrent.futures.Future` to access the result.
 
-Example executing 3 tasks (A, B, C) in parallel::
+   This function is meant to be called from a different OS thread
+   than the one where the event loop is running. Example::
 
-    import asyncio
+     # Create a coroutine
+     coro = asyncio.sleep(1, result=3)
 
-    async def factorial(name, number):
-        f = 1
-        for i in range(2, number+1):
-            print("Task %s: Compute factorial(%s)..." % (name, i))
-            await asyncio.sleep(1)
-            f *= i
-        print("Task %s: factorial(%s) = %s" % (name, number, f))
+     # Submit the coroutine to a given loop
+     future = asyncio.run_coroutine_threadsafe(coro, loop)
 
-    loop = asyncio.get_event_loop()
-    loop.run_until_complete(asyncio.gather(
-        factorial("A", 2),
-        factorial("B", 3),
-        factorial("C", 4),
-    ))
-    loop.close()
+     # Wait for the result with an optional timeout argument
+     assert future.result(timeout) == 3
 
-Output::
+   If an exception is raised in the coroutine, the returned Future
+   will be notified. It can also be used to cancel the task in
+   the event loop::
 
-    Task A: Compute factorial(2)...
-    Task B: Compute factorial(2)...
-    Task C: Compute factorial(2)...
-    Task A: factorial(2) = 2
-    Task B: Compute factorial(3)...
-    Task C: Compute factorial(3)...
-    Task B: factorial(3) = 6
-    Task C: Compute factorial(4)...
-    Task C: factorial(4) = 24
+     try:
+         result = future.result(timeout)
+     except asyncio.TimeoutError:
+         print('The coroutine took too long, cancelling the task...')
+         future.cancel()
+     except Exception as exc:
+         print('The coroutine raised an exception: {!r}'.format(exc))
+     else:
+         print('The coroutine returned: {!r}'.format(result))
 
-A task is automatically scheduled for execution when it is created. The event
-loop stops when all tasks are done.
+   See the :ref:`concurrency and multithreading <asyncio-multithreading>`
+   section of the documentation.
 
+   Unlike other asyncio functions this function requires the *loop*
+   argument to be passed explicitly.
 
-Task functions
---------------
+   .. versionadded:: 3.5.1
 
-.. note::
-    In the functions below, the optional *loop* argument allows explicitly setting
-    the event loop object used by the underlying task or coroutine. If it's
-    not provided, the default event loop is used.
 
+Introspection
+=============
 
 .. function:: current_task(loop=None)
 
-   Return the current running :class:`Task` instance or ``None``, if
+   Return the currently running :class:`Task` instance, or ``None`` if
    no task is running.
 
    If *loop* is ``None`` :func:`get_running_loop` is used to get
@@ -567,246 +465,236 @@ Task functions
 
 .. function:: all_tasks(loop=None)
 
-   Return a set of :class:`Task` objects created for the loop.
+   Return a set of not yet finished :class:`Task` objects run by
+   the loop.
 
    If *loop* is ``None``, :func:`get_running_loop` is used for getting
-   current loop (contrary to the deprecated :meth:`Task.all_tasks` method
-   that uses :func:`get_event_loop`.)
+   current loop.
 
   .. versionadded:: 3.7
 
-.. function:: as_completed(fs, \*, loop=None, timeout=None)
-
-   Return an iterator whose values, when waited for, are :class:`Future`
-   instances.
+Task Object
+===========
 
-   Raises :exc:`asyncio.TimeoutError` if the timeout occurs before all Futures
-   are done.
-
-   Example::
-
-       for f in as_completed(fs):
-           result = await f  # The 'await' may raise
-           # Use result
-
-   .. note::
-
-       The futures ``f`` are not necessarily members of fs.
+.. class:: Task(coro, \*, loop=None)
 
-..
function:: ensure_future(coro_or_future, \*, loop=None) + A :class:`Future`-like object that wraps a Python + :ref:`coroutine <coroutine>`. Not thread-safe. - Schedule the execution of a :ref:`coroutine object <coroutine>`: wrap it in - a future. Return a :class:`Task` object. + Tasks are used to run coroutines in event loops. + If a coroutine awaits on a Future, the Task suspends + the execution of the coroutine and waits for the completion + of the Future. When the Future is *done*, the execution of + the wrapped coroutine resumes. - If the argument is a :class:`Future`, it is returned directly. + Event loops use cooperative scheduling: an event loop runs + one Task at a time. While a Task awaits for the completion of a + Future, the event loop runs other Tasks, callbacks, or performs + IO operations. - .. versionadded:: 3.4.4 + Use the high-level :func:`asyncio.create_task` function to create + Tasks, or the low-level :meth:`loop.create_task` or + :func:`ensure_future` functions. Manual instantiation of Tasks + is discouraged. - .. versionchanged:: 3.5.1 - The function accepts any :term:`awaitable` object. + To cancel a running Task use the :meth:`cancel` method. Calling it + will cause the Task to throw a :exc:`CancelledError` exception into + the wrapped coroutine. If a coroutine is awaiting on a Future + object during cancellation, the Future object will be cancelled. - .. note:: + :meth:`cancelled` can be used to check if the Task was cancelled. + The method returns ``True`` if the wrapped coroutine did not + suppress the :exc:`CancelledError` exception and was actually + cancelled. - :func:`create_task` (added in Python 3.7) is the preferable way - for spawning new tasks. + :class:`asyncio.Task` inherits from :class:`Future` all of its + APIs except :meth:`Future.set_result` and + :meth:`Future.set_exception`. - .. seealso:: + Tasks support the :mod:`contextvars` module. When a Task + is created it copies the current context and later runs its + coroutine in the copied context. - The :func:`create_task` function and - :meth:`AbstractEventLoop.create_task` method. + .. versionchanged:: 3.7 + Added support for the :mod:`contextvars` module. -.. function:: wrap_future(future, \*, loop=None) + .. method:: cancel() - Wrap a :class:`concurrent.futures.Future` object in a :class:`Future` - object. + Request the Task to be cancelled. -.. function:: gather(\*coros_or_futures, loop=None, return_exceptions=False) + This arranges for a :exc:`CancelledError` exception to be thrown + into the wrapped coroutine on the next cycle of the event loop. - Return a future aggregating results from the given coroutine objects or - futures. + The coroutine then has a chance to clean up or even deny the + request by suppressing the exception with a :keyword:`try` ... + ... ``except CancelledError`` ... :keyword:`finally` block. + Therefore, unlike :meth:`Future.cancel`, :meth:`Task.cancel` does + not guarantee that the Task will be cancelled, although + suppressing cancellation completely is not common and is actively + discouraged. - All futures must share the same event loop. If all the tasks are done - successfully, the returned future's result is the list of results (in the - order of the original sequence, not necessarily the order of results - arrival). If *return_exceptions* is true, exceptions in the tasks are - treated the same as successful results, and gathered in the result list; - otherwise, the first raised exception will be immediately propagated to the - returned future. + .. 
_asyncio_example_task_cancel: - Cancellation: if the outer Future is cancelled, all children (that have not - completed yet) are also cancelled. If any child is cancelled, this is - treated as if it raised :exc:`~concurrent.futures.CancelledError` -- the - outer Future is *not* cancelled in this case. (This is to prevent the - cancellation of one child to cause other children to be cancelled.) + The following example illustrates how coroutines can intercept + the cancellation request:: - .. versionchanged:: 3.7.0 - If the *gather* itself is cancelled, the cancellation is propagated - regardless of *return_exceptions*. + async def cancel_me(): + print('cancel_me(): before sleep') -.. function:: iscoroutine(obj) + try: + # Wait for 1 hour + await asyncio.sleep(3600) + except asyncio.CancelledError: + print('cancel_me(): cancel sleep') + raise + finally: + print('cancel_me(): after sleep') - Return ``True`` if *obj* is a :ref:`coroutine object <coroutine>`, - which may be based on a generator or an :keyword:`async def` coroutine. + async def main(): + # Create a "cancel_me" Task + task = asyncio.create_task(cancel_me()) -.. function:: iscoroutinefunction(func) + # Wait for 1 second + await asyncio.sleep(1) - Return ``True`` if *func* is determined to be a :ref:`coroutine function - <coroutine>`, which may be a decorated generator function or an - :keyword:`async def` function. + task.cancel() + try: + await task + except asyncio.CancelledError: + print("main(): cancel_me is cancelled now") -.. function:: run_coroutine_threadsafe(coro, loop) + asyncio.run(main()) - Submit a :ref:`coroutine object <coroutine>` to a given event loop. + # Expected output: + # + # cancel_me(): before sleep + # cancel_me(): cancel sleep + # cancel_me(): after sleep + # main(): cancel_me is cancelled now - Return a :class:`concurrent.futures.Future` to access the result. + .. method:: cancelled() - This function is meant to be called from a different thread than the one - where the event loop is running. Usage:: + Return ``True`` if the Task is *cancelled*. - # Create a coroutine - coro = asyncio.sleep(1, result=3) - # Submit the coroutine to a given loop - future = asyncio.run_coroutine_threadsafe(coro, loop) - # Wait for the result with an optional timeout argument - assert future.result(timeout) == 3 + The Task is *cancelled* when the cancellation was requested with + :meth:`cancel` and the wrapped coroutine propagated the + :exc:`CancelledError` exception thrown into it. - If an exception is raised in the coroutine, the returned future will be - notified. It can also be used to cancel the task in the event loop:: + .. method:: done() - try: - result = future.result(timeout) - except asyncio.TimeoutError: - print('The coroutine took too long, cancelling the task...') - future.cancel() - except Exception as exc: - print('The coroutine raised an exception: {!r}'.format(exc)) - else: - print('The coroutine returned: {!r}'.format(result)) + Return ``True`` if the Task is *done*. - See the :ref:`concurrency and multithreading <asyncio-multithreading>` - section of the documentation. + A Task is *done* when the wrapped coroutine either returned + a value, raised an exception, or the Task was cancelled. - .. note:: + .. method:: get_stack(\*, limit=None) - Unlike other functions from the module, - :func:`run_coroutine_threadsafe` requires the *loop* argument to - be passed explicitly. + Return the list of stack frames for this Task. - .. 
versionadded:: 3.5.1 + If the wrapped coroutine is not done, this returns the stack + where it is suspended. If the coroutine has completed + successfully or was cancelled, this returns an empty list. + If the coroutine was terminated by an exception, this returns + the list of traceback frames. -.. coroutinefunction:: sleep(delay, result=None, \*, loop=None) + The frames are always ordered from oldest to newest. - Create a :ref:`coroutine <coroutine>` that completes after a given - time (in seconds). If *result* is provided, it is produced to the caller - when the coroutine completes. + Only one stack frame is returned for a suspended coroutine. - The resolution of the sleep depends on the :ref:`granularity of the event - loop <asyncio-delayed-calls>`. + The optional *limit* argument sets the maximum number of frames + to return; by default all available frames are returned. + The ordering of the returned list differs depending on whether + a stack or a traceback is returned: the newest frames of a + stack are returned, but the oldest frames of a traceback are + returned. (This matches the behavior of the traceback module.) - This function is a :ref:`coroutine <coroutine>`. + .. method:: print_stack(\*, limit=None, file=None) -.. coroutinefunction:: shield(arg, \*, loop=None) + Print the stack or traceback for this Task. - Wait for a future, shielding it from cancellation. + This produces output similar to that of the traceback module + for the frames retrieved by :meth:`get_stack`. - The statement:: + The *limit* argument is passed to :meth:`get_stack` directly. - res = await shield(something()) + The *file* argument is an I/O stream to which the output + is written; by default output is written to :data:`sys.stderr`. - is exactly equivalent to the statement:: + .. classmethod:: all_tasks(loop=None) - res = await something() + Return a set of all tasks for an event loop. - *except* that if the coroutine containing it is cancelled, the task running - in ``something()`` is not cancelled. From the point of view of - ``something()``, the cancellation did not happen. But its caller is still - cancelled, so the yield-from expression still raises - :exc:`~concurrent.futures.CancelledError`. Note: If ``something()`` is - cancelled by other means this will still cancel ``shield()``. + By default all tasks for the current event loop are returned. + If *loop* is ``None``, the :func:`get_event_loop` function + is used to get the current loop. - If you want to completely ignore cancellation (not recommended) you can - combine ``shield()`` with a try/except clause, as follows:: + This method is **deprecated** and will be removed in + Python 3.9. Use the :func:`all_tasks` function instead. - try: - res = await shield(something()) - except CancelledError: - res = None + .. classmethod:: current_task(loop=None) + Return the currently running task or ``None``. -.. coroutinefunction:: wait(futures, \*, loop=None, timeout=None,\ - return_when=ALL_COMPLETED) + If *loop* is ``None``, the :func:`get_event_loop` function + is used to get the current loop. - Wait for the Futures and coroutine objects given by the sequence *futures* - to complete. Coroutines will be wrapped in Tasks. Returns two sets of - :class:`Future`: (done, pending). + This method is **deprecated** and will be removed in + Python 3.9. Use the :func:`current_task` function instead. - The sequence *futures* must not be empty. - *timeout* can be used to control the maximum number of seconds to wait before - returning. 
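The ``get_stack()`` and ``print_stack()`` descriptions above can be exercised with a small sketch; the ``worker`` coroutine is a made-up stand-in and the exact output depends on the interpreter::

    import asyncio

    async def worker():
        await asyncio.sleep(3600)   # the coroutine suspends here

    async def main():
        task = asyncio.create_task(worker())
        await asyncio.sleep(0)      # let the worker start and suspend
        task.print_stack()          # traceback-style dump of the suspended frame
        frames = task.get_stack()   # the same (single) frame, as a list
        print(len(frames))
        task.cancel()

    asyncio.run(main())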
*timeout* can be an int or float. If *timeout* is not specified - or ``None``, there is no limit to the wait time. +.. _asyncio_generator_based_coro: - *return_when* indicates when this function should return. It must be one of - the following constants of the :mod:`concurrent.futures` module: +Generator-based Coroutines +========================== - .. tabularcolumns:: |l|L| +.. note:: - +-----------------------------+----------------------------------------+ - | Constant | Description | - +=============================+========================================+ - | :const:`FIRST_COMPLETED` | The function will return when any | - | | future finishes or is cancelled. | - +-----------------------------+----------------------------------------+ - | :const:`FIRST_EXCEPTION` | The function will return when any | - | | future finishes by raising an | - | | exception. If no future raises an | - | | exception then it is equivalent to | - | | :const:`ALL_COMPLETED`. | - +-----------------------------+----------------------------------------+ - | :const:`ALL_COMPLETED` | The function will return when all | - | | futures finish or are cancelled. | - +-----------------------------+----------------------------------------+ + Support for generator-based coroutines is **deprecated** and + is scheduled for removal in Python 4.0. - Unlike :func:`~asyncio.wait_for`, ``wait()`` will not cancel the futures - when a timeout occurs. +Generator-based coroutines predate async/await syntax. They are +Python generators that use ``yield from`` expressions to await +on Futures and other coroutines. - This function is a :ref:`coroutine <coroutine>`. +Generator-based coroutines should be decorated with +:func:`@asyncio.coroutine <asyncio.coroutine>`, although this is not +enforced. - Usage:: - done, pending = await asyncio.wait(fs) +.. decorator:: coroutine - .. note:: + Decorator to mark generator-based coroutines. - This does not raise :exc:`asyncio.TimeoutError`! Futures that aren't done - when the timeout occurs are returned in the second set. + This decorator enables legacy generator-based coroutines to be + compatible with async/await code:: + @asyncio.coroutine + def old_style_coroutine(): + yield from asyncio.sleep(1) -.. coroutinefunction:: wait_for(fut, timeout, \*, loop=None) + async def main(): + await old_style_coroutine() - Wait for the single :class:`Future` or :ref:`coroutine object <coroutine>` - to complete with timeout. If *timeout* is ``None``, block until the future - completes. + This decorator is **deprecated** and is scheduled for removal in + Python 4.0. - Coroutine will be wrapped in :class:`Task`. + This decorator should not be used for :keyword:`async def` + coroutines. - Returns result of the Future or coroutine. When a timeout occurs, it - cancels the task and raises :exc:`asyncio.TimeoutError`. To avoid the task - cancellation, wrap it in :func:`shield`. The function will wait until - the future is actually cancelled, so the total wait time may exceed - the *timeout*. +.. function:: iscoroutine(obj) - If the wait is cancelled, the future *fut* is also cancelled. + Return ``True`` if *obj* is a :ref:`coroutine object <coroutine>`. - This function is a :ref:`coroutine <coroutine>`, usage:: + This method is different from :func:`inspect.iscoroutine` because + it returns ``True`` for generator-based coroutines decorated with + :func:`@coroutine <coroutine>`. - result = await asyncio.wait_for(fut, 60.0) +.. function:: iscoroutinefunction(func) - .. 
versionchanged:: 3.4.3 - If the wait is cancelled, the future *fut* is now also cancelled. + Return ``True`` if *func* is a :ref:`coroutine function + <coroutine>`. - .. versionchanged:: 3.7 - When *fut* is cancelled due to a timeout, ``wait_for`` now waits - for *fut* to be cancelled. Previously, - it raised :exc:`~asyncio.TimeoutError` immediately. + This method is different from :func:`inspect.iscoroutinefunction` + because it returns ``True`` for generator-based coroutine functions + decorated with :func:`@coroutine <coroutine>`. diff --git a/Doc/library/asyncio.rst b/Doc/library/asyncio.rst index b076b7d009..bfc97001bb 100644 --- a/Doc/library/asyncio.rst +++ b/Doc/library/asyncio.rst @@ -1,66 +1,92 @@ -:mod:`asyncio` --- Asynchronous I/O, event loop, coroutines and tasks -===================================================================== +:mod:`asyncio` --- Asynchronous I/O +=================================== .. module:: asyncio - :synopsis: Asynchronous I/O, event loop, coroutines and tasks. + :synopsis: Asynchronous I/O. -.. versionadded:: 3.4 +-------------- -**Source code:** :source:`Lib/asyncio/` +.. sidebar:: Hello World! --------------- + .. code-block:: python + + import asyncio + + async def main(): + print('Hello ...') + await asyncio.sleep(1) + print('... World!') + + asyncio.run(main()) + +asyncio is a library to write **concurrent** code using +the **async/await** syntax. -This module provides infrastructure for writing single-threaded concurrent -code using coroutines, multiplexing I/O access over sockets and other -resources, running network clients and servers, and other related primitives. -Here is a more detailed list of the package contents: +asyncio is used as a foundation for multiple Python asynchronous +frameworks that provide high-performance network and web-servers, +database connection libraries, distributed task queues, etc. -* a pluggable :ref:`event loop <asyncio-event-loop>` with various system-specific - implementations; +asyncio is often a perfect fit for IO-bound and high-level +**structured** network code. -* :ref:`transport <asyncio-transport>` and :ref:`protocol <asyncio-protocol>` abstractions - (similar to those in `Twisted <https://twistedmatrix.com/trac/>`_); +asyncio provides a set of **high-level** APIs to: -* concrete support for TCP, UDP, SSL, subprocess pipes, delayed calls, and - others (some may be system-dependent); +* :ref:`run Python coroutines <coroutine>` concurrently and + have full control over their execution; -* a :class:`Future` class that mimics the one in the :mod:`concurrent.futures` - module, but adapted for use with the event loop; +* perform :ref:`network IO and IPC <asyncio-streams>`; -* coroutines and tasks based on ``yield from`` (:PEP:`380`), to help write - concurrent code in a sequential fashion; +* control :ref:`subprocesses <asyncio-subprocess>`; -* cancellation support for :class:`Future`\s and coroutines; +* distribute tasks via :ref:`queues <asyncio-queues>`; -* :ref:`synchronization primitives <asyncio-sync>` for use between coroutines in - a single thread, mimicking those in the :mod:`threading` module; +* :ref:`synchronize <asyncio-sync>` concurrent code; -* an interface for passing work off to a threadpool, for times when - you absolutely, positively have to use a library that makes blocking - I/O calls. 
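The rewritten overview leans on Tasks and :func:`asyncio.run`; a minimal sketch of the "run coroutines concurrently" bullet, with invented coroutine names, might look like this::

    import asyncio

    async def say(delay, message):
        await asyncio.sleep(delay)
        print(message)

    async def main():
        # Both coroutines run concurrently as Tasks.
        hello = asyncio.create_task(say(1, 'hello'))
        world = asyncio.create_task(say(2, 'world'))
        await asyncio.gather(hello, world)

    asyncio.run(main())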
+Additionally, there are **low-level** APIs for
+*library and framework developers* to:
 
-Asynchronous programming is more complex than classical "sequential"
-programming: see the :ref:`Develop with asyncio <asyncio-dev>` page which lists
-common traps and explains how to avoid them. :ref:`Enable the debug mode
-<asyncio-debug-mode>` during development to detect common issues.
+* create and manage :ref:`event loops <asyncio-event-loop>`, which
+  provide asynchronous APIs for :meth:`networking <loop.create_server>`,
+  running :meth:`subprocesses <loop.subprocess_exec>`,
+  handling :meth:`OS signals <loop.add_signal_handler>`, etc;
 
-Table of contents:
+* implement efficient protocols using
+  :ref:`transports <asyncio-transports-protocols>`;
+
+* :ref:`bridge <asyncio-futures>` callback-based libraries and code
+  with async/await syntax.
+
+
+.. We use the "rubric" directive here to avoid creating
+   the "Reference" subsection in the TOC.
+
+.. rubric:: Reference
 
 .. toctree::
-   :maxdepth: 3
+   :caption: High-level APIs
+   :maxdepth: 1
 
-   asyncio-eventloop.rst
-   asyncio-eventloops.rst
    asyncio-task.rst
-   asyncio-protocol.rst
    asyncio-stream.rst
-   asyncio-subprocess.rst
    asyncio-sync.rst
+   asyncio-subprocess.rst
    asyncio-queue.rst
-   asyncio-dev.rst
+   asyncio-exceptions.rst
+
+.. toctree::
+   :caption: Low-level APIs
+   :maxdepth: 1
 
-.. seealso::
+   asyncio-eventloop.rst
+   asyncio-future.rst
+   asyncio-protocol.rst
+   asyncio-policy.rst
+   asyncio-platforms.rst
 
-   The :mod:`asyncio` module was designed in :PEP:`3156`. For a
-   motivational primer on transports and protocols, see :PEP:`3153`.
+.. toctree::
+   :caption: Guides and Tutorials
+   :maxdepth: 1
 
+   asyncio-api-index.rst
+   asyncio-llapi-index.rst
+   asyncio-dev.rst
diff --git a/Doc/library/ipc.rst b/Doc/library/ipc.rst
index 6b1756331e..b88a174eb9 100644
--- a/Doc/library/ipc.rst
+++ b/Doc/library/ipc.rst
@@ -1,11 +1,11 @@
 .. _ipc:
 
 *****************************************
-Interprocess Communication and Networking
+Networking and Interprocess Communication
 *****************************************
 
-The modules described in this chapter provide mechanisms for different processes
-to communicate.
+The modules described in this chapter provide mechanisms for
+networking and inter-process communication.
 
 Some modules only work for two processes that are on the same machine, e.g.
 :mod:`signal` and :mod:`mmap`.  Other modules support networking protocols
@@ -15,12 +15,13 @@ The list of modules described in this chapter is:
 
 .. toctree::
+   :maxdepth: 1
+
    asyncio.rst
    socket.rst
    ssl.rst
    select.rst
    selectors.rst
-   asyncio.rst
    asyncore.rst
    asynchat.rst
    signal.rst
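One way to picture the "bridge callback-based libraries" bullet above is a Future created with :meth:`loop.create_future` and resolved from a callback; the ``callback_api`` helper below is hypothetical and stands in for any callback-style library::

    import asyncio

    def callback_api(on_done):
        # Hypothetical callback-based library: invokes on_done(result) later.
        asyncio.get_event_loop().call_later(0.1, on_done, 42)

    async def bridged():
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        callback_api(fut.set_result)   # deliver the callback result via a Future
        return await fut

    print(asyncio.run(bridged()))      # 42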