summaryrefslogtreecommitdiff
path: root/kafka/future.py
blob: 06b8c3a04250cd7071da6bdc2ba7e7c304d6d4f8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import functools
import logging

import kafka.common as Errors

log = logging.getLogger(__name__)


class Future(object):
    def __init__(self):
        self.is_done = False
        self.value = None
        self.exception = None
        self._callbacks = []
        self._errbacks = []

    def succeeded(self):
        return self.is_done and not self.exception

    def failed(self):
        return self.is_done and self.exception

    def retriable(self):
        try:
            return self.exception.retriable
        except AttributeError:
            return False

    def success(self, value):
        assert not self.is_done, 'Future is already complete'
        self.value = value
        self.is_done = True
        for f in self._callbacks:
            try:
                f(value)
            except Exception:
                log.exception('Error processing callback')
        return self

    def failure(self, e):
        assert not self.is_done, 'Future is already complete'
        self.exception = e if type(e) is not type else e()
        assert isinstance(self.exception, BaseException), (
            'future failed without an exception')
        self.is_done = True
        for f in self._errbacks:
            try:
                f(self.exception)
            except Exception:
                log.exception('Error processing errback')
        return self

    def add_callback(self, f, *args, **kwargs):
        if args or kwargs:
            f = functools.partial(f, *args, **kwargs)
        if self.is_done and not self.exception:
            f(self.value)
        else:
            self._callbacks.append(f)
        return self

    def add_errback(self, f, *args, **kwargs):
        if args or kwargs:
            f = functools.partial(f, *args, **kwargs)
        if self.is_done and self.exception:
            f(self.exception)
        else:
            self._errbacks.append(f)
        return self

    def add_both(self, f, *args, **kwargs):
        self.add_callback(f, *args, **kwargs)
        self.add_errback(f, *args, **kwargs)
        return self

    def chain(self, future):
        self.add_callback(future.success)
        self.add_errback(future.failure)
        return self