summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-25 16:27:37 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-25 16:27:37 -0700
commit22dd002800839fd0788648e8308104bb012d96b7 (patch)
treefb99058c606a14d46b911ded6ea640364d562b3f
parentc7564d239795f9d20f5a027f32130ee1ae84ab3e (diff)
parent87e71b4e781a1d1a92d4e538f44036ae9ff9c593 (diff)
downloadkafka-python-22dd002800839fd0788648e8308104bb012d96b7.tar.gz
Merge pull request #670 from zackdever/predictable-future
Consistent error handling in future call/errbacks + better test failures
-rw-r--r--kafka/future.py27
-rw-r--r--test/__init__.py3
-rw-r--r--test/test_coordinator.py2
3 files changed, 19 insertions, 13 deletions
diff --git a/kafka/future.py b/kafka/future.py
index 2c8fd86..c22755a 100644
--- a/kafka/future.py
+++ b/kafka/future.py
@@ -5,6 +5,8 @@ log = logging.getLogger(__name__)
class Future(object):
+ error_on_callbacks = False # and errbacks
+
def __init__(self):
self.is_done = False
self.value = None
@@ -28,11 +30,7 @@ class Future(object):
assert not self.is_done, 'Future is already complete'
self.value = value
self.is_done = True
- for f in self._callbacks:
- try:
- f(value)
- except Exception:
- log.exception('Error processing callback')
+ self._call_backs('callback', self._callbacks, self.value)
return self
def failure(self, e):
@@ -41,18 +39,14 @@ class Future(object):
assert isinstance(self.exception, BaseException), (
'future failed without an exception')
self.is_done = True
- for f in self._errbacks:
- try:
- f(self.exception)
- except Exception:
- log.exception('Error processing errback')
+ self._call_backs('errback', self._errbacks, self.exception)
return self
def add_callback(self, f, *args, **kwargs):
if args or kwargs:
f = functools.partial(f, *args, **kwargs)
if self.is_done and not self.exception:
- f(self.value)
+ self._call_backs('callback', [f], self.value)
else:
self._callbacks.append(f)
return self
@@ -61,7 +55,7 @@ class Future(object):
if args or kwargs:
f = functools.partial(f, *args, **kwargs)
if self.is_done and self.exception:
- f(self.exception)
+ self._call_backs('callback', [f], self.exception)
else:
self._errbacks.append(f)
return self
@@ -75,3 +69,12 @@ class Future(object):
self.add_callback(future.success)
self.add_errback(future.failure)
return self
+
+ def _call_backs(self, back_type, backs, value):
+ for f in backs:
+ try:
+ f(value)
+ except Exception as e:
+ log.exception('Error processing %s', back_type)
+ if self.error_on_callbacks:
+ raise e
diff --git a/test/__init__.py b/test/__init__.py
index f91d0fa..0eb2edc 100644
--- a/test/__init__.py
+++ b/test/__init__.py
@@ -15,3 +15,6 @@ except ImportError:
pass
logging.getLogger(__name__).addHandler(NullHandler())
+
+from kafka.future import Future
+Future.error_on_callbacks = True # always fail during testing
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 4b90f30..15b915d 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -548,7 +548,7 @@ def test_send_offset_fetch_request_success(patched_coord, partitions):
patched_coord._client.send.return_value = _f
future = patched_coord._send_offset_fetch_request(partitions)
(node, request), _ = patched_coord._client.send.call_args
- response = OffsetFetchResponse[0]([('foobar', [(0, 0), (1, 0)])])
+ response = OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])])
_f.success(response)
patched_coord._handle_offset_fetch_response.assert_called_with(
future, response)