summaryrefslogtreecommitdiff
path: root/kafka/consumer/fetcher.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--kafka/consumer/fetcher.py31
1 files changed, 19 insertions, 12 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index d09f9da..cda136d 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -15,6 +15,7 @@ from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.message import PartialMessage
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
+from kafka.serializer import Deserializer
from kafka.structs import TopicPartition
log = logging.getLogger(__name__)
@@ -507,7 +508,12 @@ class Fetcher(six.Iterator):
if absolute_base_offset >= 0:
inner_offset += absolute_base_offset
- key, value = self._deserialize(inner_msg)
+ key = self._deserialize(
+ self.config['key_deserializer'],
+ tp.topic, inner_msg.key)
+ value = self._deserialize(
+ self.config['value_deserializer'],
+ tp.topic, inner_msg.value)
yield ConsumerRecord(tp.topic, tp.partition, inner_offset,
inner_timestamp, msg.timestamp_type,
key, value, inner_msg.crc,
@@ -515,7 +521,12 @@ class Fetcher(six.Iterator):
len(inner_msg.value) if inner_msg.value is not None else -1)
else:
- key, value = self._deserialize(msg)
+ key = self._deserialize(
+ self.config['key_deserializer'],
+ tp.topic, msg.key)
+ value = self._deserialize(
+ self.config['value_deserializer'],
+ tp.topic, msg.value)
yield ConsumerRecord(tp.topic, tp.partition, offset,
msg.timestamp, msg.timestamp_type,
key, value, msg.crc,
@@ -541,16 +552,12 @@ class Fetcher(six.Iterator):
self._iterator = None
raise
- def _deserialize(self, msg):
- if self.config['key_deserializer']:
- key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable
- else:
- key = msg.key
- if self.config['value_deserializer']:
- value = self.config['value_deserializer'](msg.value) # pylint: disable-msg=not-callable
- else:
- value = msg.value
- return key, value
+ def _deserialize(self, f, topic, bytes_):
+ if not f:
+ return bytes_
+ if isinstance(f, Deserializer):
+ return f.deserialize(topic, bytes_)
+ return f(bytes_)
def _send_offset_request(self, partition, timestamp):
"""Fetch a single offset before the given timestamp for the partition.