author    Taras <voyn1991@gmail.com>  2017-10-12 11:07:14 +0300
committer Taras <voyn1991@gmail.com>  2017-10-12 11:10:44 +0300
commit    d10051bb09942bfd48c6f262a8cdbf5651963c2e (patch)
tree      1f6a262324e3ced9c8655d71fbed5e9ed8a0234d
parent    e992fbfad926486766ff7b63a499f9cf29984fec (diff)
download  kafka-python-d10051bb09942bfd48c6f262a8cdbf5651963c2e.tar.gz
Added minor fixes for PR review
-rw-r--r--  benchmarks/record_batch_compose.py    | 12
-rw-r--r--  benchmarks/record_batch_read.py       | 10
-rw-r--r--  kafka/producer/kafka.py               |  2
-rw-r--r--  kafka/producer/record_accumulator.py  | 18
-rw-r--r--  kafka/protocol/message.py             |  3
-rw-r--r--  kafka/record/abc.py                   |  2
6 files changed, 20 insertions, 27 deletions
diff --git a/benchmarks/record_batch_compose.py b/benchmarks/record_batch_compose.py
index 11320ca..86012df 100644
--- a/benchmarks/record_batch_compose.py
+++ b/benchmarks/record_batch_compose.py
@@ -1,11 +1,13 @@
#!/usr/bin/env python3
from __future__ import print_function
-import perf
-from kafka.record.memory_records import MemoryRecordsBuilder
-import itertools
-import random
import hashlib
+import itertools
import os
+import random
+
+import perf
+
+from kafka.record.memory_records import MemoryRecordsBuilder
DEFAULT_BATCH_SIZE = 1600 * 1024
@@ -13,7 +15,7 @@ KEY_SIZE = 6
VALUE_SIZE = 60
TIMESTAMP_RANGE = [1505824130000, 1505824140000]
-# With values above v1 record is 100 bytes, so 10_000 bytes for 100 messages
+# With values above v1 record is 100 bytes, so 10 000 bytes for 100 messages
MESSAGES_PER_BATCH = 100
diff --git a/benchmarks/record_batch_read.py b/benchmarks/record_batch_read.py
index 4ded5b0..7ae471e 100644
--- a/benchmarks/record_batch_read.py
+++ b/benchmarks/record_batch_read.py
@@ -1,11 +1,13 @@
#!/usr/bin/env python
from __future__ import print_function
-import perf
-from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder
-import itertools
-import random
import hashlib
+import itertools
import os
+import random
+
+import perf
+
+from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder
DEFAULT_BATCH_SIZE = 1600 * 1024
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index a53ac49..5638b61 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -370,7 +370,7 @@ class KafkaProducer(object):
else:
checker, compression_attrs = self._COMPRESSORS[ct]
assert checker(), "Libraries for {} compression codec not found".format(ct)
- self.config['compression_type'] = compression_attrs
+ self.config['compression_attrs'] = compression_attrs
message_version = self._max_usable_produce_magic()
self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)
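Note: the kafka.py change stores the resolved integer codec under a separate 'compression_attrs' key instead of overwriting 'compression_type', so the user-facing name and the wire-level attribute no longer share a config slot. A minimal sketch of that resolution step, assuming the standard Kafka attribute bits (none=0, gzip=1, snappy=2, lz4=3); the real _COMPRESSORS table also pairs each codec with a checker that verifies the compression library is importable:

    # Hypothetical, simplified resolution of compression_type -> compression_attrs.
    _COMPRESSION_ATTRS = {
        None: 0,      # no compression
        'gzip': 1,
        'snappy': 2,
        'lz4': 3,
    }

    def resolve_compression(compression_type):
        # Translate the user-facing name into the integer attrs the
        # record builders expect; unknown names raise KeyError here.
        return _COMPRESSION_ATTRS[compression_type]

    config = {'compression_type': 'gzip'}
    config['compression_attrs'] = resolve_compression(config['compression_type'])
    print(config['compression_attrs'])  # 1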
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 0c0ce27..716ae65 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -149,7 +149,7 @@ class RecordAccumulator(object):
will block up to max_block_ms, raising an exception on timeout.
In the current implementation, this setting is an approximation.
Default: 33554432 (32MB)
- compression_type (int): The compression type for all data generated by
+ compression_attrs (int): The compression type for all data generated by
the producer. Valid values are gzip(1), snappy(2), lz4(3), or
none(0).
Compression is of full batches of data, so the efficacy of batching
@@ -168,7 +168,7 @@ class RecordAccumulator(object):
DEFAULT_CONFIG = {
'buffer_memory': 33554432,
'batch_size': 16384,
- 'compression_type': None,
+ 'compression_attrs': 0,
'linger_ms': 0,
'retry_backoff_ms': 100,
'message_version': 0,
@@ -176,24 +176,12 @@ class RecordAccumulator(object):
'metric_group_prefix': 'producer-metrics',
}
- _COMPRESSORS = {
- 'gzip': LegacyRecordBatchBuilder.CODEC_GZIP,
- 'snappy': LegacyRecordBatchBuilder.CODEC_SNAPPY,
- 'lz4': LegacyRecordBatchBuilder.CODEC_LZ4,
- None: LegacyRecordBatchBuilder.CODEC_NONE
- }
-
def __init__(self, **configs):
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs:
self.config[key] = configs.pop(key)
- # Convert compression type to INT presentation. Mostly for unit tests,
- # as Producer should pass already converted values.
- ct = self.config["compression_type"]
- self.config["compression_type"] = self._COMPRESSORS.get(ct, ct)
-
self._closed = False
self._flushes_in_progress = AtomicInteger()
self._appends_in_progress = AtomicInteger()
@@ -269,7 +257,7 @@ class RecordAccumulator(object):
records = MemoryRecordsBuilder(
self.config['message_version'],
- self.config['compression_type'],
+ self.config['compression_attrs'],
self.config['batch_size']
)
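With the conversion now done once in KafkaProducer, RecordAccumulator drops its own _COMPRESSORS table and expects the integer directly; the default moves from None to 0 (no compression). A sketch of the batch construction after this change, using the positional MemoryRecordsBuilder(magic, compression_attrs, batch_size) call shown in the hunk above (the config values here are illustrative):

    # Sketch of how the accumulator now builds a batch; compression_attrs
    # is already an int by the time it reaches this code.
    from kafka.record.memory_records import MemoryRecordsBuilder

    config = {
        'message_version': 1,     # magic byte for v1 records
        'compression_attrs': 0,   # 0 = no compression (new default)
        'batch_size': 16384,
    }
    records = MemoryRecordsBuilder(
        config['message_version'],
        config['compression_attrs'],
        config['batch_size'],
    )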
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index f5a51a9..a330ed8 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -161,7 +161,8 @@ class MessageSet(AbstractType):
if prepend_size:
# rewind and return all the bytes
items.seek(items.tell() - 4)
- return items.read(size + 4)
+ size += 4
+ return items.read(size)
encoded_values = []
for (offset, message) in items:
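The message.py hunk reworks how the size-prefixed MessageSet bytes are returned: the 4-byte length prefix is decoded, then, when prepend_size is set, the buffer is rewound over the prefix and the size is bumped by 4 so a single read returns prefix plus payload. A rough stand-alone illustration of that framing, assuming the 4-byte big-endian length prefix used by the Kafka protocol:

    import io
    import struct

    payload = b'\x00' * 10                        # stand-in for the encoded messages
    buf = io.BytesIO()
    buf.write(struct.pack('>i', len(payload)))    # 4-byte big-endian size prefix
    buf.write(payload)
    buf.seek(0)

    size = struct.unpack('>i', buf.read(4))[0]    # decode the size prefix
    prepend_size = True
    if prepend_size:
        buf.seek(buf.tell() - 4)                  # rewind over the prefix
        size += 4                                 # include it in the read, as in the patch
    framed = buf.read(size)                       # prefix + payload in one read
    assert framed == struct.pack('>i', len(payload)) + payload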
diff --git a/kafka/record/abc.py b/kafka/record/abc.py
index 3b2395a..8a27276 100644
--- a/kafka/record/abc.py
+++ b/kafka/record/abc.py
@@ -47,7 +47,7 @@ class ABCRecordBatchBuilder(object):
Arguments:
offset (int): Relative offset of record, starting from 0
timestamp (int or None): Timestamp in milliseconds since beginning
- of the epoch (midnight Jan 1, 1970 (UTC)). If omited, will be
+ of the epoch (midnight Jan 1, 1970 (UTC)). If omitted, will be
set to current time.
key (bytes or None): Key of the record
value (bytes or None): Value of the record
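The abc.py docstring describes the append() contract implemented by the concrete record-batch builders: an explicit relative offset, a millisecond timestamp that defaults to the current time when None, and optional key/value bytes. Illustrative only: a stub builder following that documented contract (the real implementations live elsewhere under kafka/record/):

    import time

    class StubRecordBatchBuilder(object):
        def __init__(self):
            self._records = []

        def append(self, offset, timestamp, key, value):
            if timestamp is None:
                # Per the docstring, a missing timestamp defaults to "now"
                # (milliseconds since the Unix epoch).
                timestamp = int(time.time() * 1000)
            self._records.append((offset, timestamp, key, value))

    builder = StubRecordBatchBuilder()
    builder.append(0, None, key=b'key', value=b'value')
    builder.append(1, 1505824130000, key=None, value=b'value-only')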