summaryrefslogtreecommitdiff
path: root/kafka/producer/kafka.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer/kafka.py')
-rw-r--r--kafka/producer/kafka.py25
1 files changed, 13 insertions, 12 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 17f27ab..e19121b 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -13,6 +13,7 @@ from ..client_async import KafkaClient, selectors
from ..metrics import MetricConfig, Metrics
from ..partitioner.default import DefaultPartitioner
from ..protocol.message import Message, MessageSet
+from ..serializer import Serializer
from ..structs import TopicPartition
from .future import FutureRecordMetadata, FutureProduceResult
from .record_accumulator import AtomicInteger, RecordAccumulator
@@ -485,7 +486,12 @@ class KafkaProducer(object):
# available
self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
- key_bytes, value_bytes = self._serialize(topic, key, value)
+ key_bytes = self._serialize(
+ self.config['key_serializer'],
+ topic, key)
+ value_bytes = self._serialize(
+ self.config['value_serializer'],
+ topic, value)
partition = self._partition(topic, partition, key, value,
key_bytes, value_bytes)
@@ -606,17 +612,12 @@ class KafkaProducer(object):
else:
log.debug("_wait_on_metadata woke after %s secs.", elapsed)
- def _serialize(self, topic, key, value):
- # pylint: disable-msg=not-callable
- if self.config['key_serializer']:
- serialized_key = self.config['key_serializer'](key)
- else:
- serialized_key = key
- if self.config['value_serializer']:
- serialized_value = self.config['value_serializer'](value)
- else:
- serialized_value = value
- return serialized_key, serialized_value
+ def _serialize(self, f, topic, data):
+ if not f:
+ return data
+ if isinstance(f, Serializer):
+ return f.serialize(topic, data)
+ return f(data)
def _partition(self, topic, partition, key, value,
serialized_key, serialized_value):