summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTaras <voyn1991@gmail.com>2017-10-11 17:39:11 +0300
committerTaras <voyn1991@gmail.com>2017-10-12 11:10:44 +0300
commita8b25decf1d70e50223ab5c4fe5a122f0a9476ad (patch)
tree79b891b4326255f7c3e495a79c6362ec19e2e309
parenta12ca527a4b8ac77e21e63db7d47b4a68015b780 (diff)
downloadkafka-python-a8b25decf1d70e50223ab5c4fe5a122f0a9476ad.tar.gz
Remove the check for timestamp None in producer, as it's done in RecordBatch anyway.
Minor abc doc fixes.
-rw-r--r--kafka/consumer/fetcher.py6
-rw-r--r--kafka/producer/kafka.py2
-rw-r--r--kafka/record/abc.py21
3 files changed, 7 insertions, 22 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 54a771a..493c1ff 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -467,12 +467,6 @@ class Fetcher(six.Iterator):
log.exception('StopIteration raised unpacking messageset: %s', e)
raise Exception('StopIteration raised unpacking messageset')
- # If unpacking raises AssertionError, it means decompression unsupported
- # See Issue 1033
- except AssertionError as e:
- log.exception('AssertionError raised unpacking messageset: %s', e)
- raise
-
def __iter__(self): # pylint: disable=non-iterator-returned
return self
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index f2a480b..a53ac49 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -546,8 +546,6 @@ class KafkaProducer(object):
self._ensure_valid_record_size(message_size)
tp = TopicPartition(topic, partition)
- if timestamp_ms is None:
- timestamp_ms = int(time.time() * 1000)
log.debug("Sending (key=%r value=%r) to %s", key, value, tp)
result = self._accumulator.append(tp, timestamp_ms,
key_bytes, value_bytes,
diff --git a/kafka/record/abc.py b/kafka/record/abc.py
index 4f14d76..3b2395a 100644
--- a/kafka/record/abc.py
+++ b/kafka/record/abc.py
@@ -36,28 +36,21 @@ class ABCRecord(object):
be the checksum for v0 and v1 and None for v2 and above.
"""
- @abc.abstractproperty
- def headers(self):
- """ If supported by version list of key-value tuples, or empty list if
- not supported by format.
- """
-
class ABCRecordBatchBuilder(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
- def append(self, offset, timestamp, key, value, headers):
+ def append(self, offset, timestamp, key, value):
""" Writes record to internal buffer.
Arguments:
offset (int): Relative offset of record, starting from 0
- timestamp (int): Timestamp in milliseconds since beginning of the
- epoch (midnight Jan 1, 1970 (UTC))
+ timestamp (int or None): Timestamp in milliseconds since beginning
+ of the epoch (midnight Jan 1, 1970 (UTC)). If omited, will be
+ set to current time.
key (bytes or None): Key of the record
value (bytes or None): Value of the record
- headers (List[Tuple[str, bytes]]): Headers of the record. Header
- keys can not be ``None``.
Returns:
(bytes, int): Checksum of the written record (or None for v2 and
@@ -74,10 +67,10 @@ class ABCRecordBatchBuilder(object):
@abc.abstractmethod
def build(self):
""" Close for append, compress if needed, write size and header and
- return a ready to send bytes object.
+ return a ready to send buffer object.
Return:
- io.BytesIO: finished batch, ready to send.
+ bytearray: finished batch, ready to send.
"""
@@ -105,7 +98,7 @@ class ABCRecords(object):
@abc.abstractmethod
def size_in_bytes(self):
- """ Returns the size of buffer.
+ """ Returns the size of inner buffer.
"""
@abc.abstractmethod