diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-10 10:57:27 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-10 11:25:15 -0800 |
commit | 7470cade6bb8629d17541e136527369f9d2ec387 (patch) | |
tree | ba424c4d0cc27ffb5ec77196a0e121f3075bc992 | |
parent | c3d2fda3c368771cb93a09bb2f1edaa7a3cf9c2b (diff) | |
download | kafka-python-7470cade6bb8629d17541e136527369f9d2ec387.tar.gz |
Convert OffsetCommit and OffsetFetch protocol encode/decode
-rw-r--r-- | kafka/common.py | 16 | ||||
-rw-r--r-- | kafka/consumer/base.py | 6 | ||||
-rw-r--r-- | kafka/consumer/kafka.py | 6 | ||||
-rw-r--r-- | kafka/context.py | 4 | ||||
-rw-r--r-- | kafka/protocol/legacy.py | 128 | ||||
-rw-r--r-- | test/test_client_integration.py | 6 | ||||
-rw-r--r-- | test/test_consumer.py | 4 | ||||
-rw-r--r-- | test/test_protocol.py | 4 |
8 files changed, 69 insertions, 105 deletions
diff --git a/kafka/common.py b/kafka/common.py index 7ae3294..4fc1e19 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -28,30 +28,30 @@ ProduceResponsePayload = namedtuple("ProduceResponsePayload", ["topic", "partition", "error", "offset"]) # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI -FetchRequestPayload = namedtuple("FetchRequest", +FetchRequestPayload = namedtuple("FetchRequestPayload", ["topic", "partition", "offset", "max_bytes"]) -FetchResponsePayload = namedtuple("FetchResponse", +FetchResponsePayload = namedtuple("FetchResponsePayload", ["topic", "partition", "error", "highwaterMark", "messages"]) # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI -OffsetRequestPayload = namedtuple("OffsetRequest", +OffsetRequestPayload = namedtuple("OffsetRequestPayload", ["topic", "partition", "time", "max_offsets"]) -OffsetResponsePayload = namedtuple("OffsetResponse", +OffsetResponsePayload = namedtuple("OffsetResponsePayload", ["topic", "partition", "error", "offsets"]) # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI -OffsetCommitRequest = namedtuple("OffsetCommitRequest", +OffsetCommitRequestPayload = namedtuple("OffsetCommitRequestPayload", ["topic", "partition", "offset", "metadata"]) -OffsetCommitResponse = namedtuple("OffsetCommitResponse", +OffsetCommitResponsePayload = namedtuple("OffsetCommitResponsePayload", ["topic", "partition", "error"]) -OffsetFetchRequest = namedtuple("OffsetFetchRequest", +OffsetFetchRequestPayload = namedtuple("OffsetFetchRequestPayload", ["topic", "partition"]) -OffsetFetchResponse = namedtuple("OffsetFetchResponse", +OffsetFetchResponsePayload = namedtuple("OffsetFetchResponsePayload", ["topic", "partition", "offset", "metadata", "error"]) diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index 034d35c..5859d36 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -7,7 +7,7 @@ from threading import Lock import kafka.common from kafka.common import ( - OffsetRequestPayload, OffsetCommitRequest, OffsetFetchRequest, + OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload, UnknownTopicOrPartitionError, check_error, KafkaError ) @@ -101,7 +101,7 @@ class Consumer(object): responses = self.client.send_offset_fetch_request( self.group, - [OffsetFetchRequest(self.topic, p) for p in partitions], + [OffsetFetchRequestPayload(self.topic, p) for p in partitions], fail_on_error=False ) @@ -155,7 +155,7 @@ class Consumer(object): 'group=%s, topic=%s, partition=%s', offset, self.group, self.topic, partition) - reqs.append(OffsetCommitRequest(self.topic, partition, + reqs.append(OffsetCommitRequestPayload(self.topic, partition, offset, None)) try: diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index 1bd3def..fa70124 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -11,7 +11,7 @@ import six from kafka.client import KafkaClient from kafka.common import ( - OffsetFetchRequest, OffsetCommitRequest, + OffsetFetchRequestPayload, OffsetCommitRequestPayload, OffsetRequestPayload, FetchRequestPayload, check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError, OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout, @@ -546,7 +546,7 @@ class KafkaConsumer(object): continue commits.append( - OffsetCommitRequest(topic_partition[0], topic_partition[1], + OffsetCommitRequestPayload(topic_partition[0], topic_partition[1], commit_offset, metadata) ) @@ -618,7 +618,7 @@ class KafkaConsumer(object): for topic_partition in self._topics: (resp,) = self._client.send_offset_fetch_request( kafka_bytestring(self._config['group_id']), - [OffsetFetchRequest(topic_partition[0], topic_partition[1])], + [OffsetFetchRequestPayload(topic_partition[0], topic_partition[1])], fail_on_error=False) try: check_error(resp) diff --git a/kafka/context.py b/kafka/context.py index ade4db8..376fad1 100644 --- a/kafka/context.py +++ b/kafka/context.py @@ -3,7 +3,7 @@ Context manager to commit/rollback consumer offsets. """ from logging import getLogger -from kafka.common import check_error, OffsetCommitRequest, OffsetOutOfRangeError +from kafka.common import check_error, OffsetCommitRequestPayload, OffsetOutOfRangeError class OffsetCommitContext(object): @@ -139,7 +139,7 @@ class OffsetCommitContext(object): self.logger.debug("Committing partition offsets: %s", partition_offsets) commit_requests = [ - OffsetCommitRequest(self.consumer.topic, partition, offset, None) + OffsetCommitRequestPayload(self.consumer.topic, partition, offset, None) for partition, offset in partition_offsets.items() ] commit_responses = self.consumer.client.send_offset_commit_request( diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index c5babf7..feabed3 100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -19,7 +19,6 @@ from kafka.codec import ( gzip_encode, gzip_decode, snappy_encode, snappy_decode ) from kafka.common import ( - OffsetCommitResponse, OffsetFetchResponse, ProtocolError, ChecksumError, UnsupportedCodecError, ConsumerMetadataResponse @@ -258,8 +257,8 @@ class KafkaProtocol(object): partition, payload.time, payload.max_offsets) - for partition, payload in six.iteritems(topic_payloads)]) - for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))]) + for partition, payload in six.iteritems(topic_payloads)]) + for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))]) @classmethod def decode_offset_response(cls, response): @@ -327,115 +326,80 @@ class KafkaProtocol(object): return ConsumerMetadataResponse(error, nodeId, host, port) @classmethod - def encode_offset_commit_request(cls, client_id, correlation_id, - group, payloads): + def encode_offset_commit_request(cls, group, payloads): """ - Encode some OffsetCommitRequest structs + Encode an OffsetCommitRequest struct Arguments: - client_id: string - correlation_id: int group: string, the consumer group you are committing offsets for - payloads: list of OffsetCommitRequest + payloads: list of OffsetCommitRequestPayload """ - grouped_payloads = group_by_topic_and_partition(payloads) - - message = [] - message.append(cls._encode_message_header(client_id, correlation_id, - KafkaProtocol.OFFSET_COMMIT_KEY)) - message.append(write_short_string(group)) - message.append(struct.pack('>i', len(grouped_payloads))) - - for topic, topic_payloads in grouped_payloads.items(): - message.append(write_short_string(topic)) - message.append(struct.pack('>i', len(topic_payloads))) - - for partition, payload in topic_payloads.items(): - message.append(struct.pack('>iq', partition, payload.offset)) - message.append(write_short_string(payload.metadata)) + return kafka.protocol.commit.OffsetCommitRequest_v0( + consumer_group=group, + topics=[( + topic, + [( + partition, + payload.offset, + payload.metadata) + for partition, payload in six.iteritems(topic_payloads)]) + for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))]) - msg = b''.join(message) - return struct.pack('>i%ds' % len(msg), len(msg), msg) @classmethod - def decode_offset_commit_response(cls, data): + def decode_offset_commit_response(cls, response): """ - Decode bytes to an OffsetCommitResponse + Decode OffsetCommitResponse to an OffsetCommitResponsePayload Arguments: - data: bytes to decode + response: OffsetCommitResponse """ - ((correlation_id,), cur) = relative_unpack('>i', data, 0) - ((num_topics,), cur) = relative_unpack('>i', data, cur) - - for _ in xrange(num_topics): - (topic, cur) = read_short_string(data, cur) - ((num_partitions,), cur) = relative_unpack('>i', data, cur) - - for _ in xrange(num_partitions): - ((partition, error), cur) = relative_unpack('>ih', data, cur) - yield OffsetCommitResponse(topic, partition, error) + return [ + kafka.common.OffsetCommitResponsePayload(topic, partition, error) + for topic, partitions in response.topics + for partition, error in partitions + ] @classmethod - def encode_offset_fetch_request(cls, client_id, correlation_id, - group, payloads, from_kafka=False): + def encode_offset_fetch_request(cls, group, payloads, from_kafka=False): """ - Encode some OffsetFetchRequest structs. The request is encoded using + Encode an OffsetFetchRequest struct. The request is encoded using version 0 if from_kafka is false, indicating a request for Zookeeper offsets. It is encoded using version 1 otherwise, indicating a request for Kafka offsets. Arguments: - client_id: string - correlation_id: int group: string, the consumer group you are fetching offsets for - payloads: list of OffsetFetchRequest + payloads: list of OffsetFetchRequestPayload from_kafka: bool, default False, set True for Kafka-committed offsets """ - grouped_payloads = group_by_topic_and_partition(payloads) - - message = [] - reqver = 1 if from_kafka else 0 - message.append(cls._encode_message_header(client_id, correlation_id, - KafkaProtocol.OFFSET_FETCH_KEY, - version=reqver)) - - message.append(write_short_string(group)) - message.append(struct.pack('>i', len(grouped_payloads))) - - for topic, topic_payloads in grouped_payloads.items(): - message.append(write_short_string(topic)) - message.append(struct.pack('>i', len(topic_payloads))) - - for partition, payload in topic_payloads.items(): - message.append(struct.pack('>i', partition)) + if from_kafka: + request_class = kafka.protocol.commit.OffsetFetchRequest_v1 + else: + request_class = kafka.protocol.commit.OffsetFetchRequest_v0 - msg = b''.join(message) - return struct.pack('>i%ds' % len(msg), len(msg), msg) + return request_class( + consumer_group=group, + topics=[( + topic, + list(topic_payloads.keys())) + for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))]) @classmethod - def decode_offset_fetch_response(cls, data): + def decode_offset_fetch_response(cls, response): """ - Decode bytes to an OffsetFetchResponse + Decode OffsetFetchResponse to OffsetFetchResponsePayloads Arguments: - data: bytes to decode + response: OffsetFetchResponse """ - - ((correlation_id,), cur) = relative_unpack('>i', data, 0) - ((num_topics,), cur) = relative_unpack('>i', data, cur) - - for _ in range(num_topics): - (topic, cur) = read_short_string(data, cur) - ((num_partitions,), cur) = relative_unpack('>i', data, cur) - - for _ in range(num_partitions): - ((partition, offset), cur) = relative_unpack('>iq', data, cur) - (metadata, cur) = read_short_string(data, cur) - ((error,), cur) = relative_unpack('>h', data, cur) - - yield OffsetFetchResponse(topic, partition, offset, - metadata, error) + return [ + kafka.common.OffsetFetchResponsePayload( + topic, partition, offset, metadata, error + ) + for topic, partitions in response.topics + for partition, offset, metadata, error in partitions + ] def create_message(payload, key=None): diff --git a/test/test_client_integration.py b/test/test_client_integration.py index 70da4a3..edd62da 100644 --- a/test/test_client_integration.py +++ b/test/test_client_integration.py @@ -1,7 +1,7 @@ import os from kafka.common import ( - FetchRequestPayload, OffsetCommitRequest, OffsetFetchRequest, + FetchRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload, KafkaTimeoutError, ProduceRequestPayload ) from kafka.protocol import create_message @@ -85,11 +85,11 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase): @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1") def test_commit_fetch_offsets(self): - req = OffsetCommitRequest(self.bytes_topic, 0, 42, b"metadata") + req = OffsetCommitRequestPayload(self.bytes_topic, 0, 42, b"metadata") (resp,) = self.client.send_offset_commit_request(b"group", [req]) self.assertEqual(resp.error, 0) - req = OffsetFetchRequest(self.bytes_topic, 0) + req = OffsetFetchRequestPayload(self.bytes_topic, 0) (resp,) = self.client.send_offset_fetch_request(b"group", [req]) self.assertEqual(resp.error, 0) self.assertEqual(resp.offset, 42) diff --git a/test/test_consumer.py b/test/test_consumer.py index 31b7e72..ffce578 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -4,7 +4,7 @@ from . import unittest from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer from kafka.common import ( - KafkaConfigurationError, FetchResponsePayload, OffsetFetchResponse, + KafkaConfigurationError, FetchResponsePayload, OffsetFetchResponsePayload, FailedPayloadsError, OffsetAndMessage, NotLeaderForPartitionError, UnknownTopicOrPartitionError ) @@ -86,7 +86,7 @@ class TestSimpleConsumer(unittest.TestCase): client.get_partition_ids_for_topic.return_value = [0, 1] def mock_offset_fetch_request(group, payloads, **kwargs): - return [OffsetFetchResponse(p.topic, p.partition, 0, b'', 0) for p in payloads] + return [OffsetFetchResponsePayload(p.topic, p.partition, 0, b'', 0) for p in payloads] client.send_offset_fetch_request.side_effect = mock_offset_fetch_request diff --git a/test/test_protocol.py b/test/test_protocol.py index c5086b1..8cd4fee 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -7,8 +7,8 @@ from . import unittest from kafka.codec import has_snappy, gzip_decode, snappy_decode from kafka.common import ( - OffsetRequestPayload, OffsetCommitRequest, OffsetFetchRequest, - OffsetResponsePayload, OffsetCommitResponse, OffsetFetchResponse, + OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload, + OffsetResponsePayload, OffsetCommitResponsePayload, OffsetFetchResponsePayload, ProduceRequestPayload, FetchRequestPayload, Message, ChecksumError, ProduceResponsePayload, FetchResponsePayload, OffsetAndMessage, BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition, |