diff options
author | Taras <voyn1991@gmail.com> | 2017-10-12 11:07:14 +0300 |
---|---|---|
committer | Taras <voyn1991@gmail.com> | 2017-10-12 11:10:44 +0300 |
commit | d10051bb09942bfd48c6f262a8cdbf5651963c2e (patch) | |
tree | 1f6a262324e3ced9c8655d71fbed5e9ed8a0234d | |
parent | e992fbfad926486766ff7b63a499f9cf29984fec (diff) | |
download | kafka-python-d10051bb09942bfd48c6f262a8cdbf5651963c2e.tar.gz |
Added minor fixes for PR review
-rw-r--r-- | benchmarks/record_batch_compose.py | 12 | ||||
-rw-r--r-- | benchmarks/record_batch_read.py | 10 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 2 | ||||
-rw-r--r-- | kafka/producer/record_accumulator.py | 18 | ||||
-rw-r--r-- | kafka/protocol/message.py | 3 | ||||
-rw-r--r-- | kafka/record/abc.py | 2 |
6 files changed, 20 insertions, 27 deletions
```diff
diff --git a/benchmarks/record_batch_compose.py b/benchmarks/record_batch_compose.py
index 11320ca..86012df 100644
--- a/benchmarks/record_batch_compose.py
+++ b/benchmarks/record_batch_compose.py
@@ -1,11 +1,13 @@
 #!/usr/bin/env python3
 from __future__ import print_function
-import perf
-from kafka.record.memory_records import MemoryRecordsBuilder
-import itertools
-import random
 import hashlib
+import itertools
 import os
+import random
+
+import perf
+
+from kafka.record.memory_records import MemoryRecordsBuilder
 
 
 DEFAULT_BATCH_SIZE = 1600 * 1024
@@ -13,7 +15,7 @@ KEY_SIZE = 6
 VALUE_SIZE = 60
 TIMESTAMP_RANGE = [1505824130000, 1505824140000]
 
-# With values above v1 record is 100 bytes, so 10_000 bytes for 100 messages
+# With values above v1 record is 100 bytes, so 10 000 bytes for 100 messages
 MESSAGES_PER_BATCH = 100
 
 
diff --git a/benchmarks/record_batch_read.py b/benchmarks/record_batch_read.py
index 4ded5b0..7ae471e 100644
--- a/benchmarks/record_batch_read.py
+++ b/benchmarks/record_batch_read.py
@@ -1,11 +1,13 @@
 #!/usr/bin/env python
 from __future__ import print_function
-import perf
-from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder
-import itertools
-import random
 import hashlib
+import itertools
 import os
+import random
+
+import perf
+
+from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder
 
 
 DEFAULT_BATCH_SIZE = 1600 * 1024
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index a53ac49..5638b61 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -370,7 +370,7 @@ class KafkaProducer(object):
         else:
             checker, compression_attrs = self._COMPRESSORS[ct]
             assert checker(), "Libraries for {} compression codec not found".format(ct)
-            self.config['compression_type'] = compression_attrs
+            self.config['compression_attrs'] = compression_attrs
 
         message_version = self._max_usable_produce_magic()
         self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 0c0ce27..716ae65 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -149,7 +149,7 @@ class RecordAccumulator(object):
             will block up to max_block_ms, raising an exception on timeout.
             In the current implementation, this setting is an approximation.
             Default: 33554432 (32MB)
-        compression_type (int): The compression type for all data generated by
+        compression_attrs (int): The compression type for all data generated by
             the producer. Valid values are gzip(1), snappy(2), lz4(3), or
             none(0).
             Compression is of full batches of data, so the efficacy of batching
@@ -168,7 +168,7 @@ class RecordAccumulator(object):
     DEFAULT_CONFIG = {
         'buffer_memory': 33554432,
         'batch_size': 16384,
-        'compression_type': None,
+        'compression_attrs': 0,
         'linger_ms': 0,
         'retry_backoff_ms': 100,
         'message_version': 0,
@@ -176,24 +176,12 @@ class RecordAccumulator(object):
         'metric_group_prefix': 'producer-metrics',
     }
 
-    _COMPRESSORS = {
-        'gzip': LegacyRecordBatchBuilder.CODEC_GZIP,
-        'snappy': LegacyRecordBatchBuilder.CODEC_SNAPPY,
-        'lz4': LegacyRecordBatchBuilder.CODEC_LZ4,
-        None: LegacyRecordBatchBuilder.CODEC_NONE
-    }
-
     def __init__(self, **configs):
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
             if key in configs:
                 self.config[key] = configs.pop(key)
 
-        # Convert compression type to INT presentation. Mostly for unit tests,
-        # as Producer should pass already converted values.
-        ct = self.config["compression_type"]
-        self.config["compression_type"] = self._COMPRESSORS.get(ct, ct)
-
         self._closed = False
         self._flushes_in_progress = AtomicInteger()
         self._appends_in_progress = AtomicInteger()
@@ -269,7 +257,7 @@ class RecordAccumulator(object):
 
                 records = MemoryRecordsBuilder(
                     self.config['message_version'],
-                    self.config['compression_type'],
+                    self.config['compression_attrs'],
                     self.config['batch_size']
                 )
 
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index f5a51a9..a330ed8 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -161,7 +161,8 @@ class MessageSet(AbstractType):
             if prepend_size:
                 # rewind and return all the bytes
                 items.seek(items.tell() - 4)
-                return items.read(size + 4)
+                size += 4
+            return items.read(size)
 
         encoded_values = []
         for (offset, message) in items:
diff --git a/kafka/record/abc.py b/kafka/record/abc.py
index 3b2395a..8a27276 100644
--- a/kafka/record/abc.py
+++ b/kafka/record/abc.py
@@ -47,7 +47,7 @@ class ABCRecordBatchBuilder(object):
         Arguments:
             offset (int): Relative offset of record, starting from 0
             timestamp (int or None): Timestamp in milliseconds since beginning
-                of the epoch (midnight Jan 1, 1970 (UTC)). If omited, will be
+                of the epoch (midnight Jan 1, 1970 (UTC)). If omitted, will be
                 set to current time.
             key (bytes or None): Key of the record
             value (bytes or None): Value of the record
```