From ca9d2fabc352f5b6f2709295df7382f5dd7bfc97 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 13 Jul 2016 09:15:57 -0700 Subject: Fix bug causing KafkaProducer to double-compress message batches on retry --- kafka/producer/buffer.py | 39 +++++++++++++++++++++++---------------- 1 file changed, 23 insertions(+), 16 deletions(-) (limited to 'kafka/producer/buffer.py') diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py index 5dc2e1f..5fcb35f 100644 --- a/kafka/producer/buffer.py +++ b/kafka/producer/buffer.py @@ -89,22 +89,29 @@ class MessageSetBuffer(object): return self._buffer.tell() >= self._batch_size def close(self): - if self._compressor: - # TODO: avoid copies with bytearray / memoryview - self._buffer.seek(4) - msg = Message(self._compressor(self._buffer.read()), - attributes=self._compression_attributes, - magic=self._message_version) - encoded = msg.encode() - self._buffer.seek(4) - self._buffer.write(Int64.encode(0)) # offset 0 for wrapper msg - self._buffer.write(Int32.encode(len(encoded))) - self._buffer.write(encoded) - - # Update the message set size, and return ready for full read() - size = self._buffer.tell() - 4 - self._buffer.seek(0) - self._buffer.write(Int32.encode(size)) + # This method may be called multiple times on the same batch + # i.e., on retries + # we need to make sure we only close it out once + # otherwise compressed messages may be double-compressed + # see Issue 718 + if not self._closed: + if self._compressor: + # TODO: avoid copies with bytearray / memoryview + self._buffer.seek(4) + msg = Message(self._compressor(self._buffer.read()), + attributes=self._compression_attributes, + magic=self._message_version) + encoded = msg.encode() + self._buffer.seek(4) + self._buffer.write(Int64.encode(0)) # offset 0 for wrapper msg + self._buffer.write(Int32.encode(len(encoded))) + self._buffer.write(encoded) + + # Update the message set size, and return ready for full read() + size = self._buffer.tell() - 4 + self._buffer.seek(0) + self._buffer.write(Int32.encode(size)) + self._buffer.seek(0) self._closed = True -- cgit v1.2.1