summaryrefslogtreecommitdiff
path: root/tulip/tasks.py
blob: f385f49beecb643156c377b869d9f2ac1fdef24c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
"""Support for tasks, coroutines and the scheduler."""

__all__ = ['coroutine', 'task', 'Task',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'as_completed', 'sleep',
           ]

import concurrent.futures
import functools
import inspect
import time

from . import futures
from .log import tulip_log


def coroutine(func):
    """Decorator to mark coroutines.

    Decorator wraps non generator functions and returns generator wrapper.
    If non generator function returns generator of Future it yield-from it.

    TODO: This is a feel-good API only. It is not enforced.
    """
    if inspect.isgeneratorfunction(func):
        coro = func
    else:
        tulip_log.warning(
            'Coroutine function %s is not a generator.', func.__name__)

        @functools.wraps(func)
        def coro(*args, **kw):
            res = func(*args, **kw)

            if isinstance(res, futures.Future) or inspect.isgenerator(res):
                res = yield from res

            return res

    coro._is_coroutine = True  # Not sure who can use this.
    return coro


# TODO: Do we need this?
def iscoroutinefunction(func):
    """Return True if func is a decorated coroutine function."""
    return (inspect.isgeneratorfunction(func) and
            getattr(func, '_is_coroutine', False))


# TODO: Do we need this?
def iscoroutine(obj):
    """Return True if obj is a coroutine object."""
    return inspect.isgenerator(obj)  # TODO: And what?


def task(func):
    """Decorator for a coroutine to be wrapped in a Task."""
    def task_wrapper(*args, **kwds):
        coro = func(*args, **kwds)
        return Task(coro)
    return task_wrapper


class Task(futures.Future):
    """A coroutine wrapped in a Future."""

    def __init__(self, coro, event_loop=None, timeout=None):
        assert inspect.isgenerator(coro)  # Must be a coroutine *object*.
        super().__init__(event_loop=event_loop, timeout=timeout)
        self._coro = coro
        self._must_cancel = False
        self._event_loop.call_soon(self._step)

    def __repr__(self):
        res = super().__repr__()
        if (self._must_cancel and
            self._state == futures._PENDING and
            '<PENDING' in res):
            res = res.replace('<PENDING', '<CANCELLING', 1)
        i = res.find('<')
        if i < 0:
            i = len(res)
        res = res[:i] + '(<{}>)'.format(self._coro.__name__) + res[i:]
        return res

    def cancel(self):
        if self.done():
            return False
        self._must_cancel = True
        # _step() will call super().cancel() to call the callbacks.
        self._event_loop.call_soon(self._step_maybe)
        return True

    def cancelled(self):
        return self._must_cancel or super().cancelled()

    def _step_maybe(self):
        # Helper for cancel().
        if not self.done():
            return self._step()

    def _step(self, value=None, exc=None):
        if self.done():
            tulip_log.warning(
                '_step(): already done: %r, %r, %r', self, value, exc)
            return
        # We'll call either coro.throw(exc) or coro.send(value).
        if self._must_cancel:
            exc = futures.CancelledError
        coro = self._coro
        try:
            if exc is not None:
                result = coro.throw(exc)
            elif value is not None:
                result = coro.send(value)
            else:
                result = next(coro)
        except StopIteration as exc:
            if self._must_cancel:
                super().cancel()
            else:
                self.set_result(exc.value)
        except Exception as exc:
            if self._must_cancel:
                super().cancel()
            else:
                self.set_exception(exc)
                tulip_log.exception('Exception in task')
        except BaseException as exc:
            if self._must_cancel:
                super().cancel()
            else:
                self.set_exception(exc)
                tulip_log.exception('BaseException in task')
            raise
        else:
            # XXX No check for self._must_cancel here?
            if isinstance(result, futures.Future):
                if not result._blocking:
                    self._event_loop.call_soon(
                        self._step,
                        None, RuntimeError(
                            'yield was used instead of yield from in task %r '
                            'with %r' % (self, result)))
                else:
                    result._blocking = False
                    result.add_done_callback(self._wakeup)

            elif isinstance(result, concurrent.futures.Future):
                # This ought to be more efficient than wrap_future(),
                # because we don't create an extra Future.
                result.add_done_callback(
                    lambda future:
                        self._event_loop.call_soon_threadsafe(
                            self._wakeup, future))
            else:
                if inspect.isgenerator(result):
                    self._event_loop.call_soon(
                        self._step,
                        None, RuntimeError(
                            'yield was used instead of yield from for '
                            'generator in task %r with %s' % (self, result)))
                else:
                    if result is not None:
                        tulip_log.warning('_step(): bad yield: %r', result)

                    self._event_loop.call_soon(self._step)

    def _wakeup(self, future):
        try:
            value = future.result()
        except Exception as exc:
            self._step(None, exc)
        else:
            self._step(value, None)


# wait() and as_completed() similar to those in PEP 3148.

FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED


# Even though this *is* a @coroutine, we don't mark it as such!
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and and coroutines given by fs to complete.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = yield from tulip.wait(fs)

    Note: This does not raise TimeoutError!  Futures that aren't done
    when the timeout occurs are returned in the second set.
    """
    fs = _wrap_coroutines(fs)
    return _wait(fs, timeout, return_when)


@coroutine
def _wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Internal helper: Like wait() but does not wrap coroutines."""
    done, pending = set(), set()

    errors = 0
    for f in fs:
        if f.done():
            done.add(f)
            if not f.cancelled() and f.exception() is not None:
                errors += 1
        else:
            pending.add(f)

    if (not pending or
        timeout is not None and timeout <= 0 or
        return_when == FIRST_COMPLETED and done or
        return_when == FIRST_EXCEPTION and errors):
        return done, pending

    # Will always be cancelled eventually.
    bail = futures.Future(timeout=timeout)

    def _on_completion(f):
        pending.remove(f)
        done.add(f)
        if (not pending or
            return_when == FIRST_COMPLETED or
            (return_when == FIRST_EXCEPTION and
             not f.cancelled() and
             f.exception() is not None)):
            bail.cancel()

    try:
        for f in pending:
            f.add_done_callback(_on_completion)
        try:
            yield from bail
        except futures.CancelledError:
            pass
    finally:
        for f in pending:
            f.remove_done_callback(_on_completion)

    really_done = set(f for f in pending if f.done())
    if really_done:
        done.update(really_done)
        pending.difference_update(really_done)

    return done, pending


# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
def as_completed(fs, timeout=None):
    """Return an iterator whose values, when waited for, are Futures.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = yield from f  # The 'yield from' may raise.
            # Use result.

    Raises TimeoutError if the timeout occurs before all Futures are
    done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    deadline = None
    if timeout is not None:
        deadline = time.monotonic() + timeout

    done = None  # Make nonlocal happy.
    fs = _wrap_coroutines(fs)

    while fs:
        if deadline is not None:
            timeout = deadline - time.monotonic()

        @coroutine
        def _wait_for_some():
            nonlocal done, fs
            done, fs = yield from _wait(fs, timeout=timeout,
                                        return_when=FIRST_COMPLETED)
            if not done:
                fs = set()
                raise futures.TimeoutError()
            return done.pop().result()  # May raise.

        yield Task(_wait_for_some())
        for f in done:
            yield f


def _wrap_coroutines(fs):
    """Internal helper to process an iterator of Futures and coroutines.

    Returns a set of Futures.
    """
    wrapped = set()
    for f in fs:
        if not isinstance(f, futures.Future):
            assert iscoroutine(f)
            f = Task(f)
        wrapped.add(f)
    return wrapped


def sleep(when, result=None):
    """Return a Future that completes after a given time (in seconds).

    It's okay to cancel the Future.

    Undocumented feature: sleep(when, x) sets the Future's result to x.
    """
    future = futures.Future()
    future._event_loop.call_later(when, future.set_result, result)
    return future