From 64347bd9faa0a314c3c49b48df941d3022f9e311 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 17 Dec 2016 17:06:46 -0800 Subject: Add kafka.serializer interfaces --- kafka/producer/kafka.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) (limited to 'kafka/producer/kafka.py') diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 17f27ab..e19121b 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -13,6 +13,7 @@ from ..client_async import KafkaClient, selectors from ..metrics import MetricConfig, Metrics from ..partitioner.default import DefaultPartitioner from ..protocol.message import Message, MessageSet +from ..serializer import Serializer from ..structs import TopicPartition from .future import FutureRecordMetadata, FutureProduceResult from .record_accumulator import AtomicInteger, RecordAccumulator @@ -485,7 +486,12 @@ class KafkaProducer(object): # available self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0) - key_bytes, value_bytes = self._serialize(topic, key, value) + key_bytes = self._serialize( + self.config['key_serializer'], + topic, key) + value_bytes = self._serialize( + self.config['value_serializer'], + topic, value) partition = self._partition(topic, partition, key, value, key_bytes, value_bytes) @@ -606,17 +612,12 @@ class KafkaProducer(object): else: log.debug("_wait_on_metadata woke after %s secs.", elapsed) - def _serialize(self, topic, key, value): - # pylint: disable-msg=not-callable - if self.config['key_serializer']: - serialized_key = self.config['key_serializer'](key) - else: - serialized_key = key - if self.config['value_serializer']: - serialized_value = self.config['value_serializer'](value) - else: - serialized_value = value - return serialized_key, serialized_value + def _serialize(self, f, topic, data): + if not f: + return data + if isinstance(f, Serializer): + return f.serialize(topic, data) + return f(data) def _partition(self, topic, partition, key, value, serialized_key, serialized_value): -- cgit v1.2.1