author     Dana Powers <dana.powers@gmail.com>    2016-07-15 13:13:39 -0700
committer  GitHub <noreply@github.com>            2016-07-15 13:13:39 -0700
commit     a577053e5e804f444e81bd430402b5eb94a65e99 (patch)
tree       102f7c3250b4c995a1e29135fb78aebce59d3b49
parent     916c25726f6238c5af92728aa8df8d8fddd809a7 (diff)
parent     ad13500cd1276b71bd88fbe3836d7982a6bf1ce3 (diff)
download   kafka-python-a577053e5e804f444e81bd430402b5eb94a65e99.tar.gz
Merge pull request #755 from dpkp/unrecurse_unpack_message_set
Drop recursion in Fetcher _unpack_message_set
-rw-r--r--  kafka/consumer/fetcher.py  73
-rw-r--r--  kafka/consumer/group.py     8
2 files changed, 64 insertions, 17 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 5f3eb1d..34ff4cb 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -39,6 +39,7 @@ class Fetcher(six.Iterator):
         'fetch_max_wait_ms': 500,
         'max_partition_fetch_bytes': 1048576,
         'check_crcs': True,
+        'skip_double_compressed_messages': False,
         'iterator_refetch_records': 1,  # undocumented -- interface may change
         'api_version': (0, 8, 0),
     }
@@ -71,6 +72,13 @@ class Fetcher(six.Iterator):
                 consumed. This ensures no on-the-wire or on-disk corruption to
                 the messages occurred. This check adds some overhead, so it may
                 be disabled in cases seeking extreme performance. Default: True
+            skip_double_compressed_messages (bool): A bug in KafkaProducer
+                caused some messages to be corrupted via double-compression.
+                By default, the fetcher will return the messages as a compressed
+                blob of bytes with a single offset, i.e. how the message was
+                actually published to the cluster. If you prefer to have the
+                fetcher automatically detect corrupt messages and skip them,
+                set this option to True. Default: False.
         """
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
@@ -352,33 +360,64 @@
                               position)
         return dict(drained)
 
-    def _unpack_message_set(self, tp, messages, relative_offset=0):
+    def _unpack_message_set(self, tp, messages):
         try:
             for offset, size, msg in messages:
                 if self.config['check_crcs'] and not msg.validate_crc():
                     raise Errors.InvalidMessageError(msg)
                 elif msg.is_compressed():
-                    mset = msg.decompress()
-                    # new format uses relative offsets for compressed messages
+                    # If relative offset is used, we need to decompress the entire message first to compute
+                    # the absolute offset.
+                    inner_mset = msg.decompress()
+
+                    # There should only ever be a single layer of compression
+                    if inner_mset[0][-1].is_compressed():
+                        log.warning('MessageSet at %s offset %d appears'
+                                    ' double-compressed. This should not'
+                                    ' happen -- check your producers!',
+                                    tp, offset)
+                        if self.config['skip_double_compressed_messages']:
+                            log.warning('Skipping double-compressed message at'
+                                        ' %s %d', tp, offset)
+                            continue
+
                     if msg.magic > 0:
-                        last_offset, _, _ = mset[-1]
-                        relative = offset - last_offset
+                        last_offset, _, _ = inner_mset[-1]
+                        absolute_base_offset = offset - last_offset
                     else:
-                        relative = 0
-                    for record in self._unpack_message_set(tp, mset, relative):
-                        yield record
+                        absolute_base_offset = -1
+
+                    for inner_offset, inner_size, inner_msg in inner_mset:
+                        if msg.magic > 0:
+                            # When magic value is greater than 0, the timestamp
+                            # of a compressed message depends on the
+                            # timestamp type of the wrapper message:
+
+                            if msg.timestamp_type == 0:  # CREATE_TIME (0)
+                                inner_timestamp = inner_msg.timestamp
+
+                            elif msg.timestamp_type == 1:  # LOG_APPEND_TIME (1)
+                                inner_timestamp = msg.timestamp
+
+                            else:
+                                raise ValueError('Unknown timestamp type: {}'.format(msg.timestamp_type))
+                        else:
+                            inner_timestamp = msg.timestamp
+
+                        if absolute_base_offset >= 0:
+                            inner_offset += absolute_base_offset
+
+                        key, value = self._deserialize(inner_msg)
+                        yield ConsumerRecord(tp.topic, tp.partition, inner_offset,
+                                             inner_timestamp, msg.timestamp_type,
+                                             key, value)
+
                 else:
-                    # Message v1 adds timestamp
-                    if msg.magic > 0:
-                        timestamp = msg.timestamp
-                        timestamp_type = msg.timestamp_type
-                    else:
-                        timestamp = timestamp_type = None
                     key, value = self._deserialize(msg)
-                    yield ConsumerRecord(tp.topic, tp.partition,
-                                         offset + relative_offset,
-                                         timestamp, timestamp_type,
+                    yield ConsumerRecord(tp.topic, tp.partition, offset,
+                                         msg.timestamp, msg.timestamp_type,
                                          key, value)
+
         # If unpacking raises StopIteration, it is erroneously
         # caught by the generator. We want all exceptions to be raised
         # back to the user. See Issue 545
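
To illustrate the rules the rewritten _unpack_message_set applies, here is a minimal standalone sketch (not library code). The resolve_inner helper and Msg tuple are hypothetical names used only for this example, and error handling for unknown timestamp types is omitted:

# Illustrative sketch only -- resolve_inner and Msg are hypothetical names,
# not part of this commit. It mirrors the rules above: magic v1 wrappers carry
# relative inner offsets (the wrapper's offset is the absolute offset of the
# last inner message), and LOG_APPEND_TIME wrappers override inner timestamps.
from collections import namedtuple

Msg = namedtuple('Msg', ['timestamp'])

def resolve_inner(wrapper_offset, wrapper_magic, wrapper_timestamp,
                  wrapper_timestamp_type, inner_mset):
    """inner_mset: list of (offset, size, Msg) tuples, as from msg.decompress()."""
    if wrapper_magic > 0:
        # Relative offsets: base = wrapper absolute offset - last relative offset
        last_offset, _, _ = inner_mset[-1]
        absolute_base_offset = wrapper_offset - last_offset
    else:
        # magic v0 inner offsets are already absolute
        absolute_base_offset = -1

    for inner_offset, _size, inner_msg in inner_mset:
        if wrapper_magic > 0 and wrapper_timestamp_type == 1:
            timestamp = wrapper_timestamp    # LOG_APPEND_TIME: broker stamps the wrapper
        else:
            timestamp = inner_msg.timestamp  # CREATE_TIME, or magic v0
        if absolute_base_offset >= 0:
            inner_offset += absolute_base_offset
        yield inner_offset, timestamp

# A wrapper at absolute offset 102 holding inner messages at relative offsets
# 0, 1, 2 resolves to absolute offsets 100, 101, 102.
inner = [(0, 0, Msg(1000)), (1, 0, Msg(1001)), (2, 0, Msg(1002))]
assert list(resolve_inner(102, 1, 2000, 1, inner)) == [
    (100, 2000), (101, 2000), (102, 2000)]
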
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 8509999..7fe509a 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -123,6 +123,13 @@ class KafkaConsumer(six.Iterator):
         consumer_timeout_ms (int): number of milliseconds to block during
             message iteration before raising StopIteration (i.e., ending the
             iterator). Default -1 (block forever).
+        skip_double_compressed_messages (bool): A bug in KafkaProducer <= 1.2.4
+            caused some messages to be corrupted via double-compression.
+            By default, the fetcher will return these messages as a compressed
+            blob of bytes with a single offset, i.e. how the message was
+            actually published to the cluster. If you prefer to have the
+            fetcher automatically detect corrupt messages and skip them,
+            set this option to True. Default: False.
         security_protocol (str): Protocol used to communicate with brokers.
             Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
         ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
@@ -189,6 +196,7 @@ class KafkaConsumer(six.Iterator):
         'send_buffer_bytes': None,
         'receive_buffer_bytes': None,
         'consumer_timeout_ms': -1,
+        'skip_double_compressed_messages': False,
         'security_protocol': 'PLAINTEXT',
         'ssl_context': None,
         'ssl_check_hostname': True,
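
For completeness, a hypothetical consumer-side usage of the new option; the topic name and bootstrap_servers value below are placeholders, not part of this change:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',                           # placeholder topic
    bootstrap_servers='localhost:9092',   # placeholder broker address
    # Skip messages that a buggy producer double-compressed, rather than the
    # default behavior of returning them as a single compressed blob of bytes.
    skip_double_compressed_messages=True,
)

for record in consumer:
    print(record.topic, record.partition, record.offset, record.key, record.value)
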