summaryrefslogtreecommitdiff
path: root/kafka/future.py
blob: 4a3af47132f824d7b4702457c816402dd061c0a9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import functools
import logging

log = logging.getLogger(__name__)


class Future(object):
    error_on_callbacks = False # and errbacks

    def __init__(self):
        self.is_done = False
        self.value = None
        self.exception = None
        self._callbacks = []
        self._errbacks = []

    def succeeded(self):
        return self.is_done and not bool(self.exception)

    def failed(self):
        return self.is_done and bool(self.exception)

    def retriable(self):
        try:
            return self.exception.retriable
        except AttributeError:
            return False

    def success(self, value):
        assert not self.is_done, 'Future is already complete'
        self.value = value
        self.is_done = True
        if self._callbacks:
            self._call_backs('callback', self._callbacks, self.value)
        return self

    def failure(self, e):
        assert not self.is_done, 'Future is already complete'
        self.exception = e if type(e) is not type else e()
        assert isinstance(self.exception, BaseException), (
            'future failed without an exception')
        self.is_done = True
        self._call_backs('errback', self._errbacks, self.exception)
        return self

    def add_callback(self, f, *args, **kwargs):
        if args or kwargs:
            f = functools.partial(f, *args, **kwargs)
        if self.is_done and not self.exception:
            self._call_backs('callback', [f], self.value)
        else:
            self._callbacks.append(f)
        return self

    def add_errback(self, f, *args, **kwargs):
        if args or kwargs:
            f = functools.partial(f, *args, **kwargs)
        if self.is_done and self.exception:
            self._call_backs('errback', [f], self.exception)
        else:
            self._errbacks.append(f)
        return self

    def add_both(self, f, *args, **kwargs):
        self.add_callback(f, *args, **kwargs)
        self.add_errback(f, *args, **kwargs)
        return self

    def chain(self, future):
        self.add_callback(future.success)
        self.add_errback(future.failure)
        return self

    def _call_backs(self, back_type, backs, value):
        for f in backs:
            try:
                f(value)
            except Exception as e:
                log.exception('Error processing %s', back_type)
                if self.error_on_callbacks:
                    raise e