From 64347bd9faa0a314c3c49b48df941d3022f9e311 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Sat, 17 Dec 2016 17:06:46 -0800
Subject: Add kafka.serializer interfaces

---
 kafka/__init__.py            |  1 +
 kafka/consumer/fetcher.py    | 31 +++++++++++++++++++------------
 kafka/producer/kafka.py      | 25 +++++++++++++------------
 kafka/serializer/__init__.py |  3 +++
 kafka/serializer/abstract.py | 31 +++++++++++++++++++++++++++++++
 5 files changed, 67 insertions(+), 24 deletions(-)
 create mode 100644 kafka/serializer/__init__.py
 create mode 100644 kafka/serializer/abstract.py

diff --git a/kafka/__init__.py b/kafka/__init__.py
index 0d7d113..6a80418 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -26,6 +26,7 @@ from kafka.protocol import (
     create_message, create_gzip_message, create_snappy_message)
 from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner
 from kafka.structs import TopicPartition, OffsetAndMetadata
+from kafka.serializer import Serializer, Deserializer
 
 # To be deprecated when KafkaProducer interface is released
 from kafka.client import SimpleClient
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index d09f9da..cda136d 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -15,6 +15,7 @@ from kafka.metrics.stats import Avg, Count, Max, Rate
 from kafka.protocol.fetch import FetchRequest
 from kafka.protocol.message import PartialMessage
 from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
+from kafka.serializer import Deserializer
 from kafka.structs import TopicPartition
 
 log = logging.getLogger(__name__)
@@ -507,7 +508,12 @@ class Fetcher(six.Iterator):
                     if absolute_base_offset >= 0:
                         inner_offset += absolute_base_offset
 
-                    key, value = self._deserialize(inner_msg)
+                    key = self._deserialize(
+                        self.config['key_deserializer'],
+                        tp.topic, inner_msg.key)
+                    value = self._deserialize(
+                        self.config['value_deserializer'],
+                        tp.topic, inner_msg.value)
                     yield ConsumerRecord(tp.topic, tp.partition, inner_offset,
                                          inner_timestamp, msg.timestamp_type,
                                          key, value, inner_msg.crc,
@@ -515,7 +521,12 @@
                                          len(inner_msg.value) if inner_msg.value is not None else -1)
 
                 else:
-                    key, value = self._deserialize(msg)
+                    key = self._deserialize(
+                        self.config['key_deserializer'],
+                        tp.topic, msg.key)
+                    value = self._deserialize(
+                        self.config['value_deserializer'],
+                        tp.topic, msg.value)
                     yield ConsumerRecord(tp.topic, tp.partition, offset,
                                          msg.timestamp, msg.timestamp_type,
                                          key, value, msg.crc,
@@ -541,16 +552,12 @@
                 self._iterator = None
                 raise
 
-    def _deserialize(self, msg):
-        if self.config['key_deserializer']:
-            key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable
-        else:
-            key = msg.key
-        if self.config['value_deserializer']:
-            value = self.config['value_deserializer'](msg.value) # pylint: disable-msg=not-callable
-        else:
-            value = msg.value
-        return key, value
+    def _deserialize(self, f, topic, bytes_):
+        if not f:
+            return bytes_
+        if isinstance(f, Deserializer):
+            return f.deserialize(topic, bytes_)
+        return f(bytes_)
 
     def _send_offset_request(self, partition, timestamp):
         """Fetch a single offset before the given timestamp for the partition.
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 17f27ab..e19121b 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -13,6 +13,7 @@ from ..client_async import KafkaClient, selectors
 from ..metrics import MetricConfig, Metrics
 from ..partitioner.default import DefaultPartitioner
 from ..protocol.message import Message, MessageSet
+from ..serializer import Serializer
 from ..structs import TopicPartition
 from .future import FutureRecordMetadata, FutureProduceResult
 from .record_accumulator import AtomicInteger, RecordAccumulator
@@ -485,7 +486,12 @@ class KafkaProducer(object):
         # available
         self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
 
-        key_bytes, value_bytes = self._serialize(topic, key, value)
+        key_bytes = self._serialize(
+            self.config['key_serializer'],
+            topic, key)
+        value_bytes = self._serialize(
+            self.config['value_serializer'],
+            topic, value)
         partition = self._partition(topic, partition, key, value,
                                     key_bytes, value_bytes)
 
@@ -606,17 +612,12 @@
             else:
                 log.debug("_wait_on_metadata woke after %s secs.", elapsed)
 
-    def _serialize(self, topic, key, value):
-        # pylint: disable-msg=not-callable
-        if self.config['key_serializer']:
-            serialized_key = self.config['key_serializer'](key)
-        else:
-            serialized_key = key
-        if self.config['value_serializer']:
-            serialized_value = self.config['value_serializer'](value)
-        else:
-            serialized_value = value
-        return serialized_key, serialized_value
+    def _serialize(self, f, topic, data):
+        if not f:
+            return data
+        if isinstance(f, Serializer):
+            return f.serialize(topic, data)
+        return f(data)
 
     def _partition(self, topic, partition, key, value,
                    serialized_key, serialized_value):
diff --git a/kafka/serializer/__init__.py b/kafka/serializer/__init__.py
new file mode 100644
index 0000000..c08cffe
--- /dev/null
+++ b/kafka/serializer/__init__.py
@@ -0,0 +1,3 @@
+from __future__ import absolute_import
+
+from .abstract import Serializer, Deserializer
diff --git a/kafka/serializer/abstract.py b/kafka/serializer/abstract.py
new file mode 100644
index 0000000..18ad8d6
--- /dev/null
+++ b/kafka/serializer/abstract.py
@@ -0,0 +1,31 @@
+from __future__ import absolute_import
+
+import abc
+
+
+class Serializer(object):
+    __meta__ = abc.ABCMeta
+
+    def __init__(self, **config):
+        pass
+
+    @abc.abstractmethod
+    def serialize(self, topic, value):
+        pass
+
+    def close(self):
+        pass
+
+
+class Deserializer(object):
+    __meta__ = abc.ABCMeta
+
+    def __init__(self, **config):
+        pass
+
+    @abc.abstractmethod
+    def deserialize(self, topic, bytes_):
+        pass
+
+    def close(self):
+        pass
--
cgit v1.2.1
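
The patch adds no usage documentation, so a brief sketch may help. The following
is a hypothetical example, not part of the change: it assumes a broker at
localhost:9092 and a topic named 'my-topic' (both placeholders), and the
JsonSerializer/JsonDeserializer classes are invented here for illustration.
Because the new _serialize/_deserialize helpers dispatch on
isinstance(f, Serializer) and isinstance(f, Deserializer), instances of the new
base classes can be passed through the existing key_serializer/value_serializer
and key_deserializer/value_deserializer config options:

    from __future__ import absolute_import

    import json

    from kafka import KafkaConsumer, KafkaProducer
    from kafka.serializer import Deserializer, Serializer


    class JsonSerializer(Serializer):
        """Hypothetical example: encode python objects as UTF-8 JSON bytes."""

        def serialize(self, topic, value):
            if value is None:
                return None
            return json.dumps(value).encode('utf-8')


    class JsonDeserializer(Deserializer):
        """Hypothetical example: decode UTF-8 JSON bytes back to objects."""

        def deserialize(self, topic, bytes_):
            if bytes_ is None:
                return None
            return json.loads(bytes_.decode('utf-8'))


    # Placeholder broker address and topic name
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             value_serializer=JsonSerializer())
    producer.send('my-topic', {'answer': 42})
    producer.flush()

    consumer = KafkaConsumer('my-topic',
                             bootstrap_servers='localhost:9092',
                             value_deserializer=JsonDeserializer())
    for record in consumer:
        print(record.value)

Passing the topic into serialize()/deserialize() mirrors the Java client's
org.apache.kafka.common.serialization interfaces, letting a single serializer
instance vary its behavior per topic.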
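
Two behavioral notes on the dispatch helpers, as commentary rather than part of
the patch. First, plain callables remain fully supported via the final
f(data)/f(bytes_) fallback, so pre-existing lambda-style configuration is
unchanged:

    import json

    from kafka import KafkaProducer

    # Unchanged pre-patch style: a bare callable is not a Serializer instance,
    # so _serialize() takes the f(data) fallback branch.
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',  # placeholder address
        value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    producer.send('my-topic', {'answer': 42})  # placeholder topic

Second, the base classes assign __meta__ rather than __metaclass__ (or a
Python 3 metaclass= keyword), so abc.ABCMeta never actually becomes the
metaclass and @abc.abstractmethod is not enforced: a subclass that forgets to
override serialize()/deserialize() still instantiates, and the inherited method
silently returns None.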