summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-23 15:34:50 -0800
committerDana Powers <dana.powers@rd.io>2016-01-24 17:33:08 -0800
commit4761e242c16d184414602296feba4afe8040d14f (patch)
tree717ea62b1f8f7b96d53ab5f0e1197b43861f1eaa
parentf944392273baa6c28db82a76b1197fb498737275 (diff)
downloadkafka-python-4761e242c16d184414602296feba4afe8040d14f.tar.gz
Add thread-aware futures for use with KafkaProducer
-rw-r--r--kafka/producer/future.py66
1 files changed, 66 insertions, 0 deletions
diff --git a/kafka/producer/future.py b/kafka/producer/future.py
new file mode 100644
index 0000000..52c4ffc
--- /dev/null
+++ b/kafka/producer/future.py
@@ -0,0 +1,66 @@
+from __future__ import absolute_import
+
+import collections
+import threading
+
+from ..future import Future
+
+import kafka.common as Errors
+
+
+class FutureProduceResult(Future):
+ def __init__(self, topic_partition):
+ super(FutureProduceResult, self).__init__()
+ self.topic_partition = topic_partition
+ self._latch = threading.Event()
+
+ def success(self, value):
+ ret = super(FutureProduceResult, self).success(value)
+ self._latch.set()
+ return ret
+
+ def failure(self, error):
+ ret = super(FutureProduceResult, self).failure(error)
+ self._latch.set()
+ return ret
+
+ def await(self, timeout=None):
+ return self._latch.wait(timeout)
+
+
+class FutureRecordMetadata(Future):
+ def __init__(self, produce_future, relative_offset):
+ super(FutureRecordMetadata, self).__init__()
+ self._produce_future = produce_future
+ self.relative_offset = relative_offset
+ produce_future.add_callback(self._produce_success)
+ produce_future.add_errback(self.failure)
+
+ def _produce_success(self, base_offset):
+ self.success(RecordMetadata(self._produce_future.topic_partition,
+ base_offset, self.relative_offset))
+
+ def get(self, timeout=None):
+ if not self.is_done and not self._produce_future.await(timeout):
+ raise Errors.KafkaTimeoutError(
+ "Timeout after waiting for %s secs." % timeout)
+ assert self.is_done
+ if self.failed():
+ raise self.exception # pylint: disable-msg=raising-bad-type
+ return self.value
+
+
+class RecordMetadata(collections.namedtuple(
+ 'RecordMetadata', 'topic partition topic_partition offset')):
+ def __new__(cls, tp, base_offset, relative_offset=None):
+ offset = base_offset
+ if relative_offset is not None and base_offset != -1:
+ offset += relative_offset
+ return super(RecordMetadata, cls).__new__(cls, tp.topic, tp.partition, tp, offset)
+
+ def __str__(self):
+ return 'RecordMetadata(topic=%s, partition=%s, offset=%s)' % (
+ self.topic, self.partition, self.offset)
+
+ def __repr__(self):
+ return str(self)