summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-10 10:57:27 -0800
committerDana Powers <dana.powers@rd.io>2015-12-10 11:25:15 -0800
commit7470cade6bb8629d17541e136527369f9d2ec387 (patch)
treeba424c4d0cc27ffb5ec77196a0e121f3075bc992
parentc3d2fda3c368771cb93a09bb2f1edaa7a3cf9c2b (diff)
downloadkafka-python-7470cade6bb8629d17541e136527369f9d2ec387.tar.gz
Convert OffsetCommit and OffsetFetch protocol encode/decode
-rw-r--r--kafka/common.py16
-rw-r--r--kafka/consumer/base.py6
-rw-r--r--kafka/consumer/kafka.py6
-rw-r--r--kafka/context.py4
-rw-r--r--kafka/protocol/legacy.py128
-rw-r--r--test/test_client_integration.py6
-rw-r--r--test/test_consumer.py4
-rw-r--r--test/test_protocol.py4
8 files changed, 69 insertions, 105 deletions
diff --git a/kafka/common.py b/kafka/common.py
index 7ae3294..4fc1e19 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -28,30 +28,30 @@ ProduceResponsePayload = namedtuple("ProduceResponsePayload",
["topic", "partition", "error", "offset"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI
-FetchRequestPayload = namedtuple("FetchRequest",
+FetchRequestPayload = namedtuple("FetchRequestPayload",
["topic", "partition", "offset", "max_bytes"])
-FetchResponsePayload = namedtuple("FetchResponse",
+FetchResponsePayload = namedtuple("FetchResponsePayload",
["topic", "partition", "error", "highwaterMark", "messages"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
-OffsetRequestPayload = namedtuple("OffsetRequest",
+OffsetRequestPayload = namedtuple("OffsetRequestPayload",
["topic", "partition", "time", "max_offsets"])
-OffsetResponsePayload = namedtuple("OffsetResponse",
+OffsetResponsePayload = namedtuple("OffsetResponsePayload",
["topic", "partition", "error", "offsets"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
-OffsetCommitRequest = namedtuple("OffsetCommitRequest",
+OffsetCommitRequestPayload = namedtuple("OffsetCommitRequestPayload",
["topic", "partition", "offset", "metadata"])
-OffsetCommitResponse = namedtuple("OffsetCommitResponse",
+OffsetCommitResponsePayload = namedtuple("OffsetCommitResponsePayload",
["topic", "partition", "error"])
-OffsetFetchRequest = namedtuple("OffsetFetchRequest",
+OffsetFetchRequestPayload = namedtuple("OffsetFetchRequestPayload",
["topic", "partition"])
-OffsetFetchResponse = namedtuple("OffsetFetchResponse",
+OffsetFetchResponsePayload = namedtuple("OffsetFetchResponsePayload",
["topic", "partition", "offset", "metadata", "error"])
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 034d35c..5859d36 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -7,7 +7,7 @@ from threading import Lock
import kafka.common
from kafka.common import (
- OffsetRequestPayload, OffsetCommitRequest, OffsetFetchRequest,
+ OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
UnknownTopicOrPartitionError, check_error, KafkaError
)
@@ -101,7 +101,7 @@ class Consumer(object):
responses = self.client.send_offset_fetch_request(
self.group,
- [OffsetFetchRequest(self.topic, p) for p in partitions],
+ [OffsetFetchRequestPayload(self.topic, p) for p in partitions],
fail_on_error=False
)
@@ -155,7 +155,7 @@ class Consumer(object):
'group=%s, topic=%s, partition=%s',
offset, self.group, self.topic, partition)
- reqs.append(OffsetCommitRequest(self.topic, partition,
+ reqs.append(OffsetCommitRequestPayload(self.topic, partition,
offset, None))
try:
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index 1bd3def..fa70124 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -11,7 +11,7 @@ import six
from kafka.client import KafkaClient
from kafka.common import (
- OffsetFetchRequest, OffsetCommitRequest,
+ OffsetFetchRequestPayload, OffsetCommitRequestPayload,
OffsetRequestPayload, FetchRequestPayload,
check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError,
OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
@@ -546,7 +546,7 @@ class KafkaConsumer(object):
continue
commits.append(
- OffsetCommitRequest(topic_partition[0], topic_partition[1],
+ OffsetCommitRequestPayload(topic_partition[0], topic_partition[1],
commit_offset, metadata)
)
@@ -618,7 +618,7 @@ class KafkaConsumer(object):
for topic_partition in self._topics:
(resp,) = self._client.send_offset_fetch_request(
kafka_bytestring(self._config['group_id']),
- [OffsetFetchRequest(topic_partition[0], topic_partition[1])],
+ [OffsetFetchRequestPayload(topic_partition[0], topic_partition[1])],
fail_on_error=False)
try:
check_error(resp)
diff --git a/kafka/context.py b/kafka/context.py
index ade4db8..376fad1 100644
--- a/kafka/context.py
+++ b/kafka/context.py
@@ -3,7 +3,7 @@ Context manager to commit/rollback consumer offsets.
"""
from logging import getLogger
-from kafka.common import check_error, OffsetCommitRequest, OffsetOutOfRangeError
+from kafka.common import check_error, OffsetCommitRequestPayload, OffsetOutOfRangeError
class OffsetCommitContext(object):
@@ -139,7 +139,7 @@ class OffsetCommitContext(object):
self.logger.debug("Committing partition offsets: %s", partition_offsets)
commit_requests = [
- OffsetCommitRequest(self.consumer.topic, partition, offset, None)
+ OffsetCommitRequestPayload(self.consumer.topic, partition, offset, None)
for partition, offset in partition_offsets.items()
]
commit_responses = self.consumer.client.send_offset_commit_request(
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index c5babf7..feabed3 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -19,7 +19,6 @@ from kafka.codec import (
gzip_encode, gzip_decode, snappy_encode, snappy_decode
)
from kafka.common import (
- OffsetCommitResponse, OffsetFetchResponse,
ProtocolError, ChecksumError,
UnsupportedCodecError,
ConsumerMetadataResponse
@@ -258,8 +257,8 @@ class KafkaProtocol(object):
partition,
payload.time,
payload.max_offsets)
- for partition, payload in six.iteritems(topic_payloads)])
- for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
+ for partition, payload in six.iteritems(topic_payloads)])
+ for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
@classmethod
def decode_offset_response(cls, response):
@@ -327,115 +326,80 @@ class KafkaProtocol(object):
return ConsumerMetadataResponse(error, nodeId, host, port)
@classmethod
- def encode_offset_commit_request(cls, client_id, correlation_id,
- group, payloads):
+ def encode_offset_commit_request(cls, group, payloads):
"""
- Encode some OffsetCommitRequest structs
+ Encode an OffsetCommitRequest struct
Arguments:
- client_id: string
- correlation_id: int
group: string, the consumer group you are committing offsets for
- payloads: list of OffsetCommitRequest
+ payloads: list of OffsetCommitRequestPayload
"""
- grouped_payloads = group_by_topic_and_partition(payloads)
-
- message = []
- message.append(cls._encode_message_header(client_id, correlation_id,
- KafkaProtocol.OFFSET_COMMIT_KEY))
- message.append(write_short_string(group))
- message.append(struct.pack('>i', len(grouped_payloads)))
-
- for topic, topic_payloads in grouped_payloads.items():
- message.append(write_short_string(topic))
- message.append(struct.pack('>i', len(topic_payloads)))
-
- for partition, payload in topic_payloads.items():
- message.append(struct.pack('>iq', partition, payload.offset))
- message.append(write_short_string(payload.metadata))
+ return kafka.protocol.commit.OffsetCommitRequest_v0(
+ consumer_group=group,
+ topics=[(
+ topic,
+ [(
+ partition,
+ payload.offset,
+ payload.metadata)
+ for partition, payload in six.iteritems(topic_payloads)])
+ for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
- msg = b''.join(message)
- return struct.pack('>i%ds' % len(msg), len(msg), msg)
@classmethod
- def decode_offset_commit_response(cls, data):
+ def decode_offset_commit_response(cls, response):
"""
- Decode bytes to an OffsetCommitResponse
+ Decode OffsetCommitResponse to an OffsetCommitResponsePayload
Arguments:
- data: bytes to decode
+ response: OffsetCommitResponse
"""
- ((correlation_id,), cur) = relative_unpack('>i', data, 0)
- ((num_topics,), cur) = relative_unpack('>i', data, cur)
-
- for _ in xrange(num_topics):
- (topic, cur) = read_short_string(data, cur)
- ((num_partitions,), cur) = relative_unpack('>i', data, cur)
-
- for _ in xrange(num_partitions):
- ((partition, error), cur) = relative_unpack('>ih', data, cur)
- yield OffsetCommitResponse(topic, partition, error)
+ return [
+ kafka.common.OffsetCommitResponsePayload(topic, partition, error)
+ for topic, partitions in response.topics
+ for partition, error in partitions
+ ]
@classmethod
- def encode_offset_fetch_request(cls, client_id, correlation_id,
- group, payloads, from_kafka=False):
+ def encode_offset_fetch_request(cls, group, payloads, from_kafka=False):
"""
- Encode some OffsetFetchRequest structs. The request is encoded using
+ Encode an OffsetFetchRequest struct. The request is encoded using
version 0 if from_kafka is false, indicating a request for Zookeeper
offsets. It is encoded using version 1 otherwise, indicating a request
for Kafka offsets.
Arguments:
- client_id: string
- correlation_id: int
group: string, the consumer group you are fetching offsets for
- payloads: list of OffsetFetchRequest
+ payloads: list of OffsetFetchRequestPayload
from_kafka: bool, default False, set True for Kafka-committed offsets
"""
- grouped_payloads = group_by_topic_and_partition(payloads)
-
- message = []
- reqver = 1 if from_kafka else 0
- message.append(cls._encode_message_header(client_id, correlation_id,
- KafkaProtocol.OFFSET_FETCH_KEY,
- version=reqver))
-
- message.append(write_short_string(group))
- message.append(struct.pack('>i', len(grouped_payloads)))
-
- for topic, topic_payloads in grouped_payloads.items():
- message.append(write_short_string(topic))
- message.append(struct.pack('>i', len(topic_payloads)))
-
- for partition, payload in topic_payloads.items():
- message.append(struct.pack('>i', partition))
+ if from_kafka:
+ request_class = kafka.protocol.commit.OffsetFetchRequest_v1
+ else:
+ request_class = kafka.protocol.commit.OffsetFetchRequest_v0
- msg = b''.join(message)
- return struct.pack('>i%ds' % len(msg), len(msg), msg)
+ return request_class(
+ consumer_group=group,
+ topics=[(
+ topic,
+ list(topic_payloads.keys()))
+ for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
@classmethod
- def decode_offset_fetch_response(cls, data):
+ def decode_offset_fetch_response(cls, response):
"""
- Decode bytes to an OffsetFetchResponse
+ Decode OffsetFetchResponse to OffsetFetchResponsePayloads
Arguments:
- data: bytes to decode
+ response: OffsetFetchResponse
"""
-
- ((correlation_id,), cur) = relative_unpack('>i', data, 0)
- ((num_topics,), cur) = relative_unpack('>i', data, cur)
-
- for _ in range(num_topics):
- (topic, cur) = read_short_string(data, cur)
- ((num_partitions,), cur) = relative_unpack('>i', data, cur)
-
- for _ in range(num_partitions):
- ((partition, offset), cur) = relative_unpack('>iq', data, cur)
- (metadata, cur) = read_short_string(data, cur)
- ((error,), cur) = relative_unpack('>h', data, cur)
-
- yield OffsetFetchResponse(topic, partition, offset,
- metadata, error)
+ return [
+ kafka.common.OffsetFetchResponsePayload(
+ topic, partition, offset, metadata, error
+ )
+ for topic, partitions in response.topics
+ for partition, offset, metadata, error in partitions
+ ]
def create_message(payload, key=None):
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index 70da4a3..edd62da 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -1,7 +1,7 @@
import os
from kafka.common import (
- FetchRequestPayload, OffsetCommitRequest, OffsetFetchRequest,
+ FetchRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
KafkaTimeoutError, ProduceRequestPayload
)
from kafka.protocol import create_message
@@ -85,11 +85,11 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
@kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
def test_commit_fetch_offsets(self):
- req = OffsetCommitRequest(self.bytes_topic, 0, 42, b"metadata")
+ req = OffsetCommitRequestPayload(self.bytes_topic, 0, 42, b"metadata")
(resp,) = self.client.send_offset_commit_request(b"group", [req])
self.assertEqual(resp.error, 0)
- req = OffsetFetchRequest(self.bytes_topic, 0)
+ req = OffsetFetchRequestPayload(self.bytes_topic, 0)
(resp,) = self.client.send_offset_fetch_request(b"group", [req])
self.assertEqual(resp.error, 0)
self.assertEqual(resp.offset, 42)
diff --git a/test/test_consumer.py b/test/test_consumer.py
index 31b7e72..ffce578 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -4,7 +4,7 @@ from . import unittest
from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
from kafka.common import (
- KafkaConfigurationError, FetchResponsePayload, OffsetFetchResponse,
+ KafkaConfigurationError, FetchResponsePayload, OffsetFetchResponsePayload,
FailedPayloadsError, OffsetAndMessage,
NotLeaderForPartitionError, UnknownTopicOrPartitionError
)
@@ -86,7 +86,7 @@ class TestSimpleConsumer(unittest.TestCase):
client.get_partition_ids_for_topic.return_value = [0, 1]
def mock_offset_fetch_request(group, payloads, **kwargs):
- return [OffsetFetchResponse(p.topic, p.partition, 0, b'', 0) for p in payloads]
+ return [OffsetFetchResponsePayload(p.topic, p.partition, 0, b'', 0) for p in payloads]
client.send_offset_fetch_request.side_effect = mock_offset_fetch_request
diff --git a/test/test_protocol.py b/test/test_protocol.py
index c5086b1..8cd4fee 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -7,8 +7,8 @@ from . import unittest
from kafka.codec import has_snappy, gzip_decode, snappy_decode
from kafka.common import (
- OffsetRequestPayload, OffsetCommitRequest, OffsetFetchRequest,
- OffsetResponsePayload, OffsetCommitResponse, OffsetFetchResponse,
+ OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
+ OffsetResponsePayload, OffsetCommitResponsePayload, OffsetFetchResponsePayload,
ProduceRequestPayload, FetchRequestPayload, Message, ChecksumError,
ProduceResponsePayload, FetchResponsePayload, OffsetAndMessage,
BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition,