diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-28 13:24:37 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-28 14:57:54 -0800 |
commit | 87257bddf23850774794d4dc070a15ddddbd7830 (patch) | |
tree | aded09128b8b8dd1110168fb6604b04623668433 /kafka/future.py | |
parent | b643ba1a1ed7838625012e6f75a1ed9b35ffd022 (diff) | |
download | kafka-python-87257bddf23850774794d4dc070a15ddddbd7830.tar.gz |
Improvements to kafka.future.Future
- log exceptions in callbacks/errors - dont raise
- guarantee future.exception is an instance, not a class/type
- *args, **kwargs in add_callback / errback (functools.partial)
- add_both(f, *args, **kwargs) to add same f as callback and errback
- chain(new_future) to trigger new_future on success / failure of this
Diffstat (limited to 'kafka/future.py')
-rw-r--r-- | kafka/future.py | 45 |
1 files changed, 37 insertions, 8 deletions
diff --git a/kafka/future.py b/kafka/future.py index 20c31cf..1f22cb7 100644 --- a/kafka/future.py +++ b/kafka/future.py @@ -1,4 +1,9 @@ -from kafka.common import IllegalStateError +import functools +import logging + +import kafka.common as Errors + +log = logging.getLogger(__name__) class Future(object): @@ -23,32 +28,56 @@ class Future(object): def success(self, value): if self.is_done: - raise IllegalStateError('Invalid attempt to complete a request future which is already complete') + raise Errors.IllegalStateError('Invalid attempt to complete a' + ' request future which is already' + ' complete') self.value = value self.is_done = True for f in self._callbacks: - f(value) + try: + f(value) + except Exception: + log.exception('Error processing callback') return self def failure(self, e): if self.is_done: - raise IllegalStateError('Invalid attempt to complete a request future which is already complete') - self.exception = e + raise Errors.IllegalStateError('Invalid attempt to complete a' + ' request future which is already' + ' complete') + self.exception = e if type(e) is not type else e() self.is_done = True for f in self._errbacks: - f(e) + try: + f(e) + except Exception: + log.exception('Error processing errback') return self - def add_callback(self, f): + def add_callback(self, f, *args, **kwargs): + if args or kwargs: + f = functools.partial(f, *args, **kwargs) if self.is_done and not self.exception: f(self.value) else: self._callbacks.append(f) return self - def add_errback(self, f): + def add_errback(self, f, *args, **kwargs): + if args or kwargs: + f = functools.partial(f, *args, **kwargs) if self.is_done and self.exception: f(self.exception) else: self._errbacks.append(f) return self + + def add_both(self, f, *args, **kwargs): + self.add_callback(f, *args, **kwargs) + self.add_errback(f, *args, **kwargs) + return self + + def chain(self, future): + self.add_callback(future.success) + self.add_errback(future.failure) + return self |