author    Dana Powers <dana.powers@gmail.com>  2016-12-17 17:06:46 -0800
committer Dana Powers <dana.powers@gmail.com>  2016-12-17 17:06:46 -0800
commit    64347bd9faa0a314c3c49b48df941d3022f9e311 (patch)
tree      f6957cb1f95a7f2a951123975b205323d6d70618
parent    07e09c1c2ec6787fc7e4f3c2578d31b4a15d20bc (diff)
Add kafka.serializer interfaces  (serialize_interface)
-rw-r--r--  kafka/__init__.py             |  1
-rw-r--r--  kafka/consumer/fetcher.py     | 31
-rw-r--r--  kafka/producer/kafka.py       | 25
-rw-r--r--  kafka/serializer/__init__.py  |  3
-rw-r--r--  kafka/serializer/abstract.py  | 31
5 files changed, 67 insertions, 24 deletions
diff --git a/kafka/__init__.py b/kafka/__init__.py
index 0d7d113..6a80418 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -26,6 +26,7 @@ from kafka.protocol import (
     create_message, create_gzip_message, create_snappy_message)
 from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner
 from kafka.structs import TopicPartition, OffsetAndMetadata
+from kafka.serializer import Serializer, Deserializer
 
 # To be deprecated when KafkaProducer interface is released
 from kafka.client import SimpleClient
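Re-exporting the new interfaces from the package root lets user code subclass them with a single import; a minimal check, assuming kafka-python is installed with this patch applied:

    from kafka import Serializer, Deserializer

    # Both are plain abstract base classes; see kafka/serializer/abstract.py below.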
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index d09f9da..cda136d 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -15,6 +15,7 @@ from kafka.metrics.stats import Avg, Count, Max, Rate
 from kafka.protocol.fetch import FetchRequest
 from kafka.protocol.message import PartialMessage
 from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
+from kafka.serializer import Deserializer
 from kafka.structs import TopicPartition
 
 log = logging.getLogger(__name__)
@@ -507,7 +508,12 @@ class Fetcher(six.Iterator):
                         if absolute_base_offset >= 0:
                             inner_offset += absolute_base_offset
 
-                        key, value = self._deserialize(inner_msg)
+                        key = self._deserialize(
+                            self.config['key_deserializer'],
+                            tp.topic, inner_msg.key)
+                        value = self._deserialize(
+                            self.config['value_deserializer'],
+                            tp.topic, inner_msg.value)
                         yield ConsumerRecord(tp.topic, tp.partition, inner_offset,
                                              inner_timestamp, msg.timestamp_type,
                                              key, value, inner_msg.crc,
@@ -515,7 +521,12 @@
                                              len(inner_msg.value) if inner_msg.value is not None else -1)
 
                 else:
-                    key, value = self._deserialize(msg)
+                    key = self._deserialize(
+                        self.config['key_deserializer'],
+                        tp.topic, msg.key)
+                    value = self._deserialize(
+                        self.config['value_deserializer'],
+                        tp.topic, msg.value)
                     yield ConsumerRecord(tp.topic, tp.partition, offset,
                                          msg.timestamp, msg.timestamp_type,
                                          key, value, msg.crc,
@@ -541,16 +552,12 @@
                 self._iterator = None
                 raise
 
-    def _deserialize(self, msg):
-        if self.config['key_deserializer']:
-            key = self.config['key_deserializer'](msg.key)  # pylint: disable-msg=not-callable
-        else:
-            key = msg.key
-        if self.config['value_deserializer']:
-            value = self.config['value_deserializer'](msg.value)  # pylint: disable-msg=not-callable
-        else:
-            value = msg.value
-        return key, value
+    def _deserialize(self, f, topic, bytes_):
+        if not f:
+            return bytes_
+        if isinstance(f, Deserializer):
+            return f.deserialize(topic, bytes_)
+        return f(bytes_)
 
     def _send_offset_request(self, partition, timestamp):
         """Fetch a single offset before the given timestamp for the partition.
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 17f27ab..e19121b 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -13,6 +13,7 @@ from ..client_async import KafkaClient, selectors
 from ..metrics import MetricConfig, Metrics
 from ..partitioner.default import DefaultPartitioner
 from ..protocol.message import Message, MessageSet
+from ..serializer import Serializer
 from ..structs import TopicPartition
 from .future import FutureRecordMetadata, FutureProduceResult
 from .record_accumulator import AtomicInteger, RecordAccumulator
@@ -485,7 +486,12 @@ class KafkaProducer(object):
             # available
             self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
 
-            key_bytes, value_bytes = self._serialize(topic, key, value)
+            key_bytes = self._serialize(
+                self.config['key_serializer'],
+                topic, key)
+            value_bytes = self._serialize(
+                self.config['value_serializer'],
+                topic, value)
             partition = self._partition(topic, partition, key, value,
                                         key_bytes, value_bytes)
@@ -606,17 +612,12 @@ class KafkaProducer(object):
         else:
             log.debug("_wait_on_metadata woke after %s secs.", elapsed)
 
-    def _serialize(self, topic, key, value):
-        # pylint: disable-msg=not-callable
-        if self.config['key_serializer']:
-            serialized_key = self.config['key_serializer'](key)
-        else:
-            serialized_key = key
-        if self.config['value_serializer']:
-            serialized_value = self.config['value_serializer'](value)
-        else:
-            serialized_value = value
-        return serialized_key, serialized_value
+    def _serialize(self, f, topic, data):
+        if not f:
+            return data
+        if isinstance(f, Serializer):
+            return f.serialize(topic, data)
+        return f(data)
 
     def _partition(self, topic, partition, key, value,
                    serialized_key, serialized_value):
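_serialize mirrors the consumer-side dispatch: a Serializer instance gets serialize(topic, data), a bare callable gets f(data), and a falsy config value passes the data through untouched. A producer-side sketch (the JsonSerializer name is illustrative, not part of this commit; assumes a broker on localhost:9092):

    import json

    from kafka import KafkaProducer, Serializer

    class JsonSerializer(Serializer):
        # Illustrative serializer built on the new interface.
        def serialize(self, topic, value):
            if value is None:
                return None
            return json.dumps(value).encode('utf-8')

    producer = KafkaProducer(value_serializer=JsonSerializer())
    producer.send('my-topic', {'answer': 42})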
diff --git a/kafka/serializer/__init__.py b/kafka/serializer/__init__.py
new file mode 100644
index 0000000..c08cffe
--- /dev/null
+++ b/kafka/serializer/__init__.py
@@ -0,0 +1,3 @@
+from __future__ import absolute_import
+
+from .abstract import Serializer, Deserializer
diff --git a/kafka/serializer/abstract.py b/kafka/serializer/abstract.py
new file mode 100644
index 0000000..18ad8d6
--- /dev/null
+++ b/kafka/serializer/abstract.py
@@ -0,0 +1,31 @@
+from __future__ import absolute_import
+
+import abc
+
+
+class Serializer(object):
+    __metaclass__ = abc.ABCMeta
+
+    def __init__(self, **config):
+        pass
+
+    @abc.abstractmethod
+    def serialize(self, topic, value):
+        pass
+
+    def close(self):
+        pass
+
+
+class Deserializer(object):
+    __metaclass__ = abc.ABCMeta
+
+    def __init__(self, **config):
+        pass
+
+    @abc.abstractmethod
+    def deserialize(self, topic, bytes_):
+        pass
+
+    def close(self):
+        pass
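Besides the abstract serialize/deserialize methods, the interfaces define a **config constructor and a close() hook, so implementations can carry configuration and release resources. A sketch of a stateful deserializer using both (all names are illustrative, not part of this commit):

    from kafka import Deserializer

    class PrefixStrippingDeserializer(Deserializer):
        # Illustrative: strips a configurable byte prefix before decoding.
        def __init__(self, **config):
            self.prefix = config.pop('prefix', b'')
            super(PrefixStrippingDeserializer, self).__init__(**config)

        def deserialize(self, topic, bytes_):
            if bytes_ is None:
                return None
            if bytes_.startswith(self.prefix):
                bytes_ = bytes_[len(self.prefix):]
            return bytes_.decode('utf-8')

        def close(self):
            # Nothing to release in this sketch; a real implementation might
            # close a schema-registry connection here.
            pass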