summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-10 00:25:12 -0800
committerDana Powers <dana.powers@rd.io>2016-01-10 00:25:12 -0800
commitb7104957f7294d3cb0e47d47ff1b6710acf5653e (patch)
tree3a6cd7f5fd7e7a782982169529c9a42cbbe6d476
parentcc22d1bab82fd234f2a47d347152a321aaa0b53e (diff)
downloadkafka-python-b7104957f7294d3cb0e47d47ff1b6710acf5653e.tar.gz
Move ConsumerProtocol definition to kafka.coordinator.protocol
-rw-r--r--kafka/coordinator/assignors/roundrobin.py2
-rw-r--r--kafka/coordinator/consumer.py44
-rw-r--r--kafka/coordinator/protocol.py33
3 files changed, 44 insertions, 35 deletions
diff --git a/kafka/coordinator/assignors/roundrobin.py b/kafka/coordinator/assignors/roundrobin.py
index 55b73e1..d7cd884 100644
--- a/kafka/coordinator/assignors/roundrobin.py
+++ b/kafka/coordinator/assignors/roundrobin.py
@@ -6,7 +6,7 @@ import six
from .abstract import AbstractPartitionAssignor
from ...common import TopicPartition
-from ..consumer import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
+from ..protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
log = logging.getLogger(__name__)
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 48d5e14..af3e019 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -1,3 +1,5 @@
+from __future__ import absolute_import
+
import copy
import collections
import logging
@@ -6,44 +8,18 @@ import time
import six
from .base import BaseCoordinator
-import kafka.common as Errors
-from kafka.common import OffsetAndMetadata, TopicPartition
-from kafka.future import Future
-from kafka.protocol.commit import (
+from .protocol import (
+ ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment,
+ ConsumerProtocol)
+from ..common import OffsetAndMetadata, TopicPartition
+from ..future import Future
+from ..protocol.commit import (
OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0,
OffsetFetchRequest_v0, OffsetFetchRequest_v1)
-from kafka.protocol.struct import Struct
-from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
-
-log = logging.getLogger(__name__)
-
-
-class ConsumerProtocolMemberMetadata(Struct):
- SCHEMA = Schema(
- ('version', Int16),
- ('subscription', Array(String('utf-8'))),
- ('user_data', Bytes))
-
-
-class ConsumerProtocolMemberAssignment(Struct):
- SCHEMA = Schema(
- ('version', Int16),
- ('assignment', Array(
- ('topic', String('utf-8')),
- ('partitions', Array(Int32)))),
- ('user_data', Bytes))
-
- def partitions(self):
- return [TopicPartition(topic, partition)
- for topic, partitions in self.assignment # pylint: disable-msg=no-member
- for partition in partitions]
+import kafka.common as Errors
-class ConsumerProtocol(object):
- PROTOCOL_TYPE = 'consumer'
- ASSIGNMENT_STRATEGIES = ('roundrobin',)
- METADATA = ConsumerProtocolMemberMetadata
- ASSIGNMENT = ConsumerProtocolMemberAssignment
+log = logging.getLogger(__name__)
class ConsumerCoordinator(BaseCoordinator):
diff --git a/kafka/coordinator/protocol.py b/kafka/coordinator/protocol.py
new file mode 100644
index 0000000..9af7225
--- /dev/null
+++ b/kafka/coordinator/protocol.py
@@ -0,0 +1,33 @@
+from __future__ import absolute_import
+
+from kafka.common import TopicPartition
+from kafka.protocol.struct import Struct
+from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
+
+
+class ConsumerProtocolMemberMetadata(Struct):
+ SCHEMA = Schema(
+ ('version', Int16),
+ ('subscription', Array(String('utf-8'))),
+ ('user_data', Bytes))
+
+
+class ConsumerProtocolMemberAssignment(Struct):
+ SCHEMA = Schema(
+ ('version', Int16),
+ ('assignment', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(Int32)))),
+ ('user_data', Bytes))
+
+ def partitions(self):
+ return [TopicPartition(topic, partition)
+ for topic, partitions in self.assignment # pylint: disable-msg=no-member
+ for partition in partitions]
+
+
+class ConsumerProtocol(object):
+ PROTOCOL_TYPE = 'consumer'
+ ASSIGNMENT_STRATEGIES = ('roundrobin',)
+ METADATA = ConsumerProtocolMemberMetadata
+ ASSIGNMENT = ConsumerProtocolMemberAssignment