"""Event loop and related classes.

The event loop can be broken up into a pollster (the part responsible
for telling us when file descriptors are ready) and the event loop
proper, which wraps a pollster with functionality for scheduling
callbacks, immediately or at a given time in the future.

Whenever a public API takes a callback, subsequent positional
arguments will be passed to the callback if/when it is called.  This
avoids the proliferation of trivial lambdas implementing closures.
Keyword arguments for the callback are not supported; this is a
conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.

There are several implementations of the pollster part, some using
esoteric system calls that exist only on some platforms.  These are:

- kqueue (most BSD systems)
- epoll (newer Linux systems)
- poll (most UNIX systems)
- select (all UNIX systems, and Windows)
- TODO: Support IOCP on Windows and some UNIX platforms.

NOTE: We don't use select on systems where any of the others is
available, because select performs poorly as the number of file
descriptors goes up.  The ranking is roughly:

  1. kqueue, epoll, IOCP
  2. poll
  3. select

TODO:
- Optimize the various pollsters.
- Unittests.
"""

import collections
import concurrent.futures
import heapq
import logging
import os
import select
import socket
import time

# local imports
from proactor import Proactor, Future


class DelayedCall:
    """Object returned by callback registration methods."""

    def __init__(self, when, callback, args, kwds=None):
        self.when = when
        self.callback = callback
        self.args = args
        self.kwds = kwds
        self.cancelled = False

    def cancel(self):
        self.cancelled = True

    def __lt__(self, other):
        return self.when < other.when

    def __le__(self, other):
        return self.when <= other.when

    def __eq__(self, other):
        return self.when == other.when


class EventLoop:
    """Event loop functionality.

    This defines the public APIs call_soon(), call_later(), run_once()
    and run().  Waiting for I/O is delegated to the Proactor instance
    passed to (or created by) the constructor.

    This class's instance variables are not part of its API.
    """

    def __init__(self, proactor=None):
        super().__init__()
        if proactor is None:
            logging.info('Using proactor: %s', Proactor.__name__)
            proactor = Proactor()
        self.proactor = proactor
        self.ready = collections.deque()  # deque of DelayedCall objects
        self.scheduled = []  # heap (managed via heapq) of DelayedCall objects

    def add_callback(self, dcall):
        """Add a DelayedCall to ready or scheduled."""
        if dcall.cancelled:
            return
        if dcall.when is None:
            self.ready.append(dcall)
        else:
            heapq.heappush(self.scheduled, dcall)

    def call_soon(self, callback, *args):
        """Arrange for a callback to be called as soon as possible.

        This operates as a FIFO queue: callbacks are called in the
        order in which they are registered.  Each callback will be
        called exactly once.

        Any positional arguments after the callback will be passed to
        the callback when it is called.
        """
        dcall = DelayedCall(None, callback, args)
        self.ready.append(dcall)
        return dcall

    def call_later(self, when, callback, *args):
        """Arrange for a callback to be called at a given time.

        Return an object with a cancel() method that can be used to
        cancel the call.

        The time can be an int or float, expressed in seconds.

        If when is small enough (less than ten million seconds, about
        115 days), it's assumed to be a relative time, meaning the
        call will be scheduled that many seconds in the future;
        otherwise it's assumed to be a posix timestamp as returned by
        time.time().

        Each callback will be called exactly once.  If two callbacks
        are scheduled for exactly the same time, it is undefined which
        will be called first.

        Any positional arguments after the callback will be passed to
        the callback when it is called.
        """
        if when < 10000000:
            when += time.time()
        dcall = DelayedCall(when, callback, args)
        heapq.heappush(self.scheduled, dcall)
        return dcall
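
    # Illustrative examples of the two accepted time forms (`loop` is assumed
    # to be an EventLoop instance and `cb` some callable):
    #
    #   loop.call_later(60, cb)                 # relative: a minute from now
    #   loop.call_later(time.time() + 60, cb)   # absolute posix timestamp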

    def run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """
        # TODO: Break each of these into smaller pieces.
        # TODO: Pass in a timeout or deadline or something.
        # TODO: Refactor to separate the callbacks from the readers/writers.
        # TODO: As step 4, run everything scheduled by steps 1-3.
        # TODO: An alternative API would be to do the *minimal* amount
        # of work, e.g. one callback or one I/O poll.

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # TODO: Ensure this loop always finishes, even if some
        # callbacks keep registering more callbacks.
        while self.ready:
            dcall = self.ready.popleft()
            if not dcall.cancelled:
                try:
                    if dcall.kwds:
                        dcall.callback(*dcall.args, **dcall.kwds)
                    else:
                        dcall.callback(*dcall.args)
                except Exception:
                    logging.exception('Exception in callback %s %r',
                                      dcall.callback, dcall.args)

        # Remove delayed calls that were cancelled from head of queue.
        while self.scheduled and self.scheduled[0].cancelled:
            heapq.heappop(self.scheduled)

        # Inspect the poll queue.
        if self.proactor.pollable():
            if self.scheduled:
                when = self.scheduled[0].when
                timeout = max(0, when - time.time())
            else:
                timeout = None
            t0 = time.time()
            # Done callbacks attached to completed futures are run by poll().
            self.proactor.poll(timeout)
            t1 = time.time()
            argstr = '' if timeout is None else ' %.3f' % timeout
            if t1-t0 >= 1:
                level = logging.INFO
            else:
                level = logging.DEBUG
            logging.log(level, 'poll%s took %.3f seconds', argstr, t1-t0)

        # Handle 'later' callbacks that are ready.
        while self.scheduled:
            dcall = self.scheduled[0]
            if dcall.when > time.time():
                break
            dcall = heapq.heappop(self.scheduled)
            self.call_soon(dcall.callback, *dcall.args)

    def run(self):
        """Run the event loop until there is no work left to do.

        This keeps going as long as there are ready or scheduled
        callbacks, or the proactor's pollable() method reports there
        is still I/O to wait for.
        """
        while self.ready or self.scheduled or self.proactor.pollable():
            self.run_once()



MAX_WORKERS = 5  # Default max workers when creating an executor.

try:
    from socket import socketpair
except ImportError:
    def socketpair():
        """Emulate socket.socketpair() on platforms that don't provide it."""
        with socket.socket() as lsock:
            lsock.bind(('127.0.0.1', 0))
            lsock.listen(1)
            csock = socket.socket()
            csock.connect(lsock.getsockname())
            asock, _ = lsock.accept()
            return asock, csock

class ThreadRunner:
    """Helper to submit work to a thread pool and wait for it.

    This is the glue between the single-threaded callback-based async
    world and the threaded world.  Use it to call functions that must
    block and don't have an async alternative (e.g. getaddrinfo()).

    The only public API is submit().
    """

    def __init__(self, eventloop, executor=None):
        self.eventloop = eventloop
        self.executor = executor  # Will be constructed lazily.
        self.rsock, self.wsock = socketpair()
        self.rsock.settimeout(0)
        self.active_count = 0
        self.read_future = None

    def start_read(self):
        # Drain wakeup bytes (one per completed work item) from the read
        # end of the socket pair.  The proactor's recv() raises a Future
        # when no data is available yet; in that case draining resumes in
        # read_callback() once that future completes.
        while self.active_count > 0:
            try:
                res = self.eventloop.proactor.recv(self.rsock, 8192)
            except Future as f:
                self.read_future = f
                self.read_future.add_done_callback(self.read_callback)
                break
            else:
                self.active_count -= len(res)

    def read_callback(self, f):
        assert self.active_count > 0, self.active_count
        assert f is self.read_future
        try:
            self.active_count -= len(self.read_future.result())
        except OSError:
            pass
        finally:
            self.read_future = None
        assert self.active_count >= 0, self.active_count
        if self.active_count > 0:
            self.start_read()

    def submit(self, func, *args, executor=None, insert_callback=None):
        """Submit a function to the thread pool.

        This returns a concurrent.futures.Future instance.  The caller
        should not wait for that, but rather add a callback to it.
        """
        if executor is None:
            executor = self.executor
            if executor is None:
                # Lazily construct a default executor.
                # TODO: Should this be shared between threads?
                executor = concurrent.futures.ThreadPoolExecutor(MAX_WORKERS)
                self.executor = executor
        assert self.active_count >= 0, self.active_count
        self.active_count += 1
        future = executor.submit(func, *args)
        if self.read_future is None or self.read_future.done():
            self.start_read()
        if insert_callback is not None:
            future.add_done_callback(insert_callback)
        def done_callback(future):
            # Write one byte to the socket pair so the event loop wakes up
            # and start_read()/read_callback() can account for the finished
            # work item.
            self.wsock.sendall(b'x')
        future.add_done_callback(done_callback)
        return future
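

# Illustrative sketch (not part of the module): offload a blocking call such
# as socket.getaddrinfo() to the thread pool and pick up the result through a
# done callback, as described in ThreadRunner's docstring.  Assumes the
# default Proactor supports the recv() used by start_read().
#
#   loop = EventLoop()
#   runner = ThreadRunner(loop)
#   fut = runner.submit(socket.getaddrinfo, 'python.org', 80)
#   fut.add_done_callback(lambda f: logging.info('resolved: %r', f.result()))
#   loop.run()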