diff options
author | Taras <voyn1991@gmail.com> | 2017-10-11 17:39:11 +0300 |
---|---|---|
committer | Taras <voyn1991@gmail.com> | 2017-10-12 11:10:44 +0300 |
commit | a8b25decf1d70e50223ab5c4fe5a122f0a9476ad (patch) | |
tree | 79b891b4326255f7c3e495a79c6362ec19e2e309 | |
parent | a12ca527a4b8ac77e21e63db7d47b4a68015b780 (diff) | |
download | kafka-python-a8b25decf1d70e50223ab5c4fe5a122f0a9476ad.tar.gz |
Remove the check for timestamp None in producer, as it's done in RecordBatch anyway.
Minor abc doc fixes.
-rw-r--r-- | kafka/consumer/fetcher.py | 6 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 2 | ||||
-rw-r--r-- | kafka/record/abc.py | 21 |
3 files changed, 7 insertions, 22 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 54a771a..493c1ff 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -467,12 +467,6 @@ class Fetcher(six.Iterator): log.exception('StopIteration raised unpacking messageset: %s', e) raise Exception('StopIteration raised unpacking messageset') - # If unpacking raises AssertionError, it means decompression unsupported - # See Issue 1033 - except AssertionError as e: - log.exception('AssertionError raised unpacking messageset: %s', e) - raise - def __iter__(self): # pylint: disable=non-iterator-returned return self diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index f2a480b..a53ac49 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -546,8 +546,6 @@ class KafkaProducer(object): self._ensure_valid_record_size(message_size) tp = TopicPartition(topic, partition) - if timestamp_ms is None: - timestamp_ms = int(time.time() * 1000) log.debug("Sending (key=%r value=%r) to %s", key, value, tp) result = self._accumulator.append(tp, timestamp_ms, key_bytes, value_bytes, diff --git a/kafka/record/abc.py b/kafka/record/abc.py index 4f14d76..3b2395a 100644 --- a/kafka/record/abc.py +++ b/kafka/record/abc.py @@ -36,28 +36,21 @@ class ABCRecord(object): be the checksum for v0 and v1 and None for v2 and above. """ - @abc.abstractproperty - def headers(self): - """ If supported by version list of key-value tuples, or empty list if - not supported by format. - """ - class ABCRecordBatchBuilder(object): __metaclass__ = abc.ABCMeta @abc.abstractmethod - def append(self, offset, timestamp, key, value, headers): + def append(self, offset, timestamp, key, value): """ Writes record to internal buffer. Arguments: offset (int): Relative offset of record, starting from 0 - timestamp (int): Timestamp in milliseconds since beginning of the - epoch (midnight Jan 1, 1970 (UTC)) + timestamp (int or None): Timestamp in milliseconds since beginning + of the epoch (midnight Jan 1, 1970 (UTC)). If omited, will be + set to current time. key (bytes or None): Key of the record value (bytes or None): Value of the record - headers (List[Tuple[str, bytes]]): Headers of the record. Header - keys can not be ``None``. Returns: (bytes, int): Checksum of the written record (or None for v2 and @@ -74,10 +67,10 @@ class ABCRecordBatchBuilder(object): @abc.abstractmethod def build(self): """ Close for append, compress if needed, write size and header and - return a ready to send bytes object. + return a ready to send buffer object. Return: - io.BytesIO: finished batch, ready to send. + bytearray: finished batch, ready to send. """ @@ -105,7 +98,7 @@ class ABCRecords(object): @abc.abstractmethod def size_in_bytes(self): - """ Returns the size of buffer. + """ Returns the size of inner buffer. """ @abc.abstractmethod |