diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-25 16:27:37 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-25 16:27:37 -0700 |
commit | 22dd002800839fd0788648e8308104bb012d96b7 (patch) | |
tree | fb99058c606a14d46b911ded6ea640364d562b3f | |
parent | c7564d239795f9d20f5a027f32130ee1ae84ab3e (diff) | |
parent | 87e71b4e781a1d1a92d4e538f44036ae9ff9c593 (diff) | |
download | kafka-python-22dd002800839fd0788648e8308104bb012d96b7.tar.gz |
Merge pull request #670 from zackdever/predictable-future
Consistent error handling in future call/errbacks + better test failures
-rw-r--r-- | kafka/future.py | 27 | ||||
-rw-r--r-- | test/__init__.py | 3 | ||||
-rw-r--r-- | test/test_coordinator.py | 2 |
3 files changed, 19 insertions, 13 deletions
diff --git a/kafka/future.py b/kafka/future.py index 2c8fd86..c22755a 100644 --- a/kafka/future.py +++ b/kafka/future.py @@ -5,6 +5,8 @@ log = logging.getLogger(__name__) class Future(object): + error_on_callbacks = False # and errbacks + def __init__(self): self.is_done = False self.value = None @@ -28,11 +30,7 @@ class Future(object): assert not self.is_done, 'Future is already complete' self.value = value self.is_done = True - for f in self._callbacks: - try: - f(value) - except Exception: - log.exception('Error processing callback') + self._call_backs('callback', self._callbacks, self.value) return self def failure(self, e): @@ -41,18 +39,14 @@ class Future(object): assert isinstance(self.exception, BaseException), ( 'future failed without an exception') self.is_done = True - for f in self._errbacks: - try: - f(self.exception) - except Exception: - log.exception('Error processing errback') + self._call_backs('errback', self._errbacks, self.exception) return self def add_callback(self, f, *args, **kwargs): if args or kwargs: f = functools.partial(f, *args, **kwargs) if self.is_done and not self.exception: - f(self.value) + self._call_backs('callback', [f], self.value) else: self._callbacks.append(f) return self @@ -61,7 +55,7 @@ class Future(object): if args or kwargs: f = functools.partial(f, *args, **kwargs) if self.is_done and self.exception: - f(self.exception) + self._call_backs('callback', [f], self.exception) else: self._errbacks.append(f) return self @@ -75,3 +69,12 @@ class Future(object): self.add_callback(future.success) self.add_errback(future.failure) return self + + def _call_backs(self, back_type, backs, value): + for f in backs: + try: + f(value) + except Exception as e: + log.exception('Error processing %s', back_type) + if self.error_on_callbacks: + raise e diff --git a/test/__init__.py b/test/__init__.py index f91d0fa..0eb2edc 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -15,3 +15,6 @@ except ImportError: pass logging.getLogger(__name__).addHandler(NullHandler()) + +from kafka.future import Future +Future.error_on_callbacks = True # always fail during testing diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 4b90f30..15b915d 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -548,7 +548,7 @@ def test_send_offset_fetch_request_success(patched_coord, partitions): patched_coord._client.send.return_value = _f future = patched_coord._send_offset_fetch_request(partitions) (node, request), _ = patched_coord._client.send.call_args - response = OffsetFetchResponse[0]([('foobar', [(0, 0), (1, 0)])]) + response = OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]) _f.success(response) patched_coord._handle_offset_fetch_response.assert_called_with( future, response) |