summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-07-16 21:20:42 -0700
committerGitHub <noreply@github.com>2016-07-16 21:20:42 -0700
commit5ab4d5c274112a4e2024dea415a0ec4b79009a28 (patch)
tree2f75731a028194d92d8df916a2a6c553385aae80
parent2a7f4dbb8159464941afa25d49428976cc05f902 (diff)
parent277f0ddd61c230181f5f21d427070ec44b36a257 (diff)
downloadkafka-python-5ab4d5c274112a4e2024dea415a0ec4b79009a28.tar.gz
Merge pull request #762 from dpkp/metadata_v1
Use Metadata Request/Response v1 with 0.10+ brokers
-rw-r--r--kafka/client.py6
-rw-r--r--kafka/client_async.py17
-rw-r--r--kafka/cluster.py60
-rw-r--r--kafka/protocol/metadata.py40
-rw-r--r--kafka/protocol/types.py16
-rw-r--r--kafka/structs.py2
-rw-r--r--test/test_client.py61
-rw-r--r--test/test_client_async.py4
8 files changed, 154 insertions, 52 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 891ae03..8a34cc4 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -137,7 +137,7 @@ class SimpleClient(object):
kafka.errors.check_error(resp)
# Otherwise return the BrokerMetadata
- return BrokerMetadata(resp.nodeId, resp.host, resp.port)
+ return BrokerMetadata(resp.nodeId, resp.host, resp.port, None)
def _next_id(self):
"""Generate a new correlation id"""
@@ -525,7 +525,7 @@ class SimpleClient(object):
log.debug('Updating broker metadata: %s', resp.brokers)
log.debug('Updating topic metadata: %s', [topic for _, topic, _ in resp.topics])
- self.brokers = dict([(nodeId, BrokerMetadata(nodeId, host, port))
+ self.brokers = dict([(nodeId, BrokerMetadata(nodeId, host, port, None))
for nodeId, host, port in resp.brokers])
for error, topic, partitions in resp.topics:
@@ -577,7 +577,7 @@ class SimpleClient(object):
# (not sure how this could happen. server could be in bad state)
else:
self.topics_to_brokers[topic_part] = BrokerMetadata(
- leader, None, None
+ leader, None, None, None
)
def send_metadata_request(self, payloads=[], fail_on_error=True,
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 6fa9434..e064d51 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -178,7 +178,11 @@ class KafkaClient(object):
time.sleep(next_at - now)
self._last_bootstrap = time.time()
- metadata_request = MetadataRequest[0]([])
+ if self.config['api_version'] is None or self.config['api_version'] < (0, 10):
+ metadata_request = MetadataRequest[0]([])
+ else:
+ metadata_request = MetadataRequest[1](None)
+
for host, port, afi in hosts:
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
cb = functools.partial(self._conn_state_change, 'bootstrap')
@@ -643,10 +647,17 @@ class KafkaClient(object):
topics = list(self._topics)
if self.cluster.need_all_topic_metadata:
- topics = []
+ if self.config['api_version'] < (0, 10):
+ topics = []
+ else:
+ topics = None
if self._can_send_request(node_id):
- request = MetadataRequest[0](topics)
+ if self.config['api_version'] < (0, 10):
+ api_version = 0
+ else:
+ api_version = 1
+ request = MetadataRequest[api_version](topics)
log.debug("Sending metadata request %s to node %s", request, node_id)
future = self.send(node_id, request)
future.add_callback(self.cluster.update_metadata)
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 9aabec1..694e115 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -34,6 +34,8 @@ class ClusterMetadata(object):
self._lock = threading.Lock()
self.need_all_topic_metadata = False
self.unauthorized_topics = set()
+ self.internal_topics = set()
+ self.controller = None
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
@@ -150,13 +152,23 @@ class ClusterMetadata(object):
self._future = Future()
return self._future
- def topics(self):
+ def topics(self, exclude_internal_topics=True):
"""Get set of known topics.
+ Arguments:
+ exclude_internal_topics (bool): Whether records from internal topics
+ (such as offsets) should be exposed to the consumer. If set to
+ True the only way to receive records from an internal topic is
+ subscribing to it. Default True
+
Returns:
set: {topic (str), ...}
"""
- return set(self._partitions.keys())
+ topics = set(self._partitions.keys())
+ if exclude_internal_topics:
+ return topics - self.internal_topics
+ else:
+ return topics
def failed_update(self, exception):
"""Update cluster state given a failed MetadataRequest."""
@@ -180,23 +192,41 @@ class ClusterMetadata(object):
# In the common case where we ask for a single topic and get back an
# error, we should fail the future
if len(metadata.topics) == 1 and metadata.topics[0][0] != 0:
- error_code, topic, _ = metadata.topics[0]
+ error_code, topic = metadata.topics[0][:2]
error = Errors.for_code(error_code)(topic)
return self.failed_update(error)
if not metadata.brokers:
log.warning("No broker metadata found in MetadataResponse")
- for node_id, host, port in metadata.brokers:
+ for broker in metadata.brokers:
+ if metadata.API_VERSION == 0:
+ node_id, host, port = broker
+ rack = None
+ else:
+ node_id, host, port, rack = broker
self._brokers.update({
- node_id: BrokerMetadata(node_id, host, port)
+ node_id: BrokerMetadata(node_id, host, port, rack)
})
+ if metadata.API_VERSION == 0:
+ self.controller = None
+ else:
+ self.controller = self._brokers.get(metadata.controller_id)
+
_new_partitions = {}
_new_broker_partitions = collections.defaultdict(set)
_new_unauthorized_topics = set()
+ _new_internal_topics = set()
- for error_code, topic, partitions in metadata.topics:
+ for topic_data in metadata.topics:
+ if metadata.API_VERSION == 0:
+ error_code, topic, partitions = topic_data
+ is_internal = False
+ else:
+ error_code, topic, is_internal, partitions = topic_data
+ if is_internal:
+ _new_internal_topics.add(topic)
error_type = Errors.for_code(error_code)
if error_type is Errors.NoError:
_new_partitions[topic] = {}
@@ -226,6 +256,7 @@ class ClusterMetadata(object):
self._partitions = _new_partitions
self._broker_partitions = _new_broker_partitions
self.unauthorized_topics = _new_unauthorized_topics
+ self.internal_topics = _new_internal_topics
f = None
if self._future:
f = self._future
@@ -272,7 +303,8 @@ class ClusterMetadata(object):
coordinator = BrokerMetadata(
response.coordinator_id,
response.host,
- response.port)
+ response.port,
+ None)
# Assume that group coordinators are just brokers
# (this is true now, but could diverge in future)
@@ -281,12 +313,14 @@ class ClusterMetadata(object):
# If this happens, either brokers have moved without
# changing IDs, or our assumption above is wrong
- elif coordinator != self._brokers[node_id]:
- log.error("GroupCoordinator metadata conflicts with existing"
- " broker metadata. Coordinator: %s, Broker: %s",
- coordinator, self._brokers[node_id])
- self._groups[group] = node_id
- return False
+ else:
+ node = self._brokers[node_id]
+ if coordinator.host != node.host or coordinator.port != node.port:
+ log.error("GroupCoordinator metadata conflicts with existing"
+ " broker metadata. Coordinator: %s, Broker: %s",
+ coordinator, node)
+ self._groups[group] = node_id
+ return False
log.info("Group coordinator for %s is %s", group, coordinator)
self._groups[group] = node_id
diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py
index 8063dda..2711abb 100644
--- a/kafka/protocol/metadata.py
+++ b/kafka/protocol/metadata.py
@@ -1,5 +1,5 @@
from .struct import Struct
-from .types import Array, Int16, Int32, Schema, String
+from .types import Array, Boolean, Int16, Int32, Schema, String
class MetadataResponse_v0(Struct):
@@ -22,14 +22,46 @@ class MetadataResponse_v0(Struct):
)
+class MetadataResponse_v1(Struct):
+ API_KEY = 3
+ API_VERSION = 1
+ SCHEMA = Schema(
+ ('brokers', Array(
+ ('node_id', Int32),
+ ('host', String('utf-8')),
+ ('port', Int32),
+ ('rack', String('utf-8')))),
+ ('controller_id', Int32),
+ ('topics', Array(
+ ('error_code', Int16),
+ ('topic', String('utf-8')),
+ ('is_internal', Boolean),
+ ('partitions', Array(
+ ('error_code', Int16),
+ ('partition', Int32),
+ ('leader', Int32),
+ ('replicas', Array(Int32)),
+ ('isr', Array(Int32))))))
+ )
+
+
class MetadataRequest_v0(Struct):
API_KEY = 3
API_VERSION = 0
RESPONSE_TYPE = MetadataResponse_v0
SCHEMA = Schema(
- ('topics', Array(String('utf-8')))
+ ('topics', Array(String('utf-8'))) # Empty Array (len 0) for all topics
+ )
+
+
+class MetadataRequest_v1(Struct):
+ API_KEY = 3
+ API_VERSION = 1
+ RESPONSE_TYPE = MetadataResponse_v1
+ SCHEMA = Schema(
+ ('topics', Array(String('utf-8'))) # Null Array (len -1) for all topics
)
-MetadataRequest = [MetadataRequest_v0]
-MetadataResponse = [MetadataResponse_v0]
+MetadataRequest = [MetadataRequest_v0, MetadataRequest_v1]
+MetadataResponse = [MetadataResponse_v0, MetadataResponse_v1]
diff --git a/kafka/protocol/types.py b/kafka/protocol/types.py
index 18aaca1..da10326 100644
--- a/kafka/protocol/types.py
+++ b/kafka/protocol/types.py
@@ -99,6 +99,16 @@ class Bytes(AbstractType):
return value
+class Boolean(AbstractType):
+ @classmethod
+ def encode(cls, value):
+ return _pack('>?', value)
+
+ @classmethod
+ def decode(cls, data):
+ return _unpack('>?', data.read(1))
+
+
class Schema(AbstractType):
def __init__(self, *fields):
if fields:
@@ -145,6 +155,8 @@ class Array(AbstractType):
raise ValueError('Array instantiated with no array_of type')
def encode(self, items):
+ if items is None:
+ return Int32.encode(-1)
return b''.join(
[Int32.encode(len(items))] +
[self.array_of.encode(item) for item in items]
@@ -152,7 +164,11 @@ class Array(AbstractType):
def decode(self, data):
length = Int32.decode(data)
+ if length == -1:
+ return None
return [self.array_of.decode(data) for _ in range(length)]
def repr(self, list_of_items):
+ if list_of_items is None:
+ return 'NULL'
return '[' + ', '.join([self.array_of.repr(item) for item in list_of_items]) + ']'
diff --git a/kafka/structs.py b/kafka/structs.py
index 5902930..3188516 100644
--- a/kafka/structs.py
+++ b/kafka/structs.py
@@ -58,7 +58,7 @@ TopicPartition = namedtuple("TopicPartition",
["topic", "partition"])
BrokerMetadata = namedtuple("BrokerMetadata",
- ["nodeId", "host", "port"])
+ ["nodeId", "host", "port", "rack"])
PartitionMetadata = namedtuple("PartitionMetadata",
["topic", "partition", "leader", "replicas", "isr", "error"])
diff --git a/test/test_client.py b/test/test_client.py
index 660af61..79ac8be 100644
--- a/test/test_client.py
+++ b/test/test_client.py
@@ -1,6 +1,7 @@
import socket
from mock import ANY, MagicMock, patch
+from operator import itemgetter
import six
from . import unittest
@@ -117,9 +118,10 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
- BrokerMetadata(0, 'broker_1', 4567),
- BrokerMetadata(1, 'broker_2', 5678)
+ BrokerMetadata(0, 'broker_1', 4567, None),
+ BrokerMetadata(1, 'broker_2', 5678, None)
]
+ resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(NO_ERROR, 'topic_1', [
@@ -137,7 +139,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_ERROR, 2, 0, [0, 1], [0, 1])
])
]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
# client loads metadata at init
client = SimpleClient(hosts=['broker_1:4567'])
@@ -167,9 +169,10 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
- BrokerMetadata(0, 'broker_1', 4567),
- BrokerMetadata(1, 'broker_2', 5678)
+ BrokerMetadata(0, 'broker_1', 4567, None),
+ BrokerMetadata(1, 'broker_2', 5678, None)
]
+ resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(NO_LEADER, 'topic_still_creating', []),
@@ -179,7 +182,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_LEADER, 1, -1, [], []),
]),
]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
@@ -197,9 +200,10 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
- BrokerMetadata(0, 'broker_1', 4567),
- BrokerMetadata(1, 'broker_2', 5678)
+ BrokerMetadata(0, 'broker_1', 4567, None),
+ BrokerMetadata(1, 'broker_2', 5678, None)
]
+ resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(NO_LEADER, 'topic_still_creating', []),
@@ -209,7 +213,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_LEADER, 1, -1, [], []),
]),
]
- decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
+ decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
@@ -230,14 +234,15 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
- BrokerMetadata(0, 'broker_1', 4567),
- BrokerMetadata(1, 'broker_2', 5678)
+ BrokerMetadata(0, 'broker_1', 4567, None),
+ BrokerMetadata(1, 'broker_2', 5678, None)
]
+ resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(NO_LEADER, 'topic_no_partitions', [])
]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
@@ -249,7 +254,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_ERROR, 0, 0, [0, 1], [0, 1])
])
]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
# calling _get_leader_for_partition (from any broker aware request)
# will try loading metadata again for the same topic
@@ -267,15 +272,16 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
- BrokerMetadata(0, 'broker_1', 4567),
- BrokerMetadata(1, 'broker_2', 5678)
+ BrokerMetadata(0, 'broker_1', 4567, None),
+ BrokerMetadata(1, 'broker_2', 5678, None)
]
+ resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(NO_LEADER, 'topic_no_partitions', []),
(UNKNOWN_TOPIC_OR_PARTITION, 'topic_unknown', []),
]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
@@ -294,9 +300,10 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
- BrokerMetadata(0, 'broker_1', 4567),
- BrokerMetadata(1, 'broker_2', 5678)
+ BrokerMetadata(0, 'broker_1', 4567, None),
+ BrokerMetadata(1, 'broker_2', 5678, None)
]
+ resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(NO_ERROR, 'topic_noleader', [
@@ -304,7 +311,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_LEADER, 1, -1, [], []),
]),
]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
self.assertDictEqual(
@@ -330,7 +337,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_ERROR, 1, 1, [1, 0], [1, 0])
]),
]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0))
self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1))
@@ -340,9 +347,10 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
- BrokerMetadata(0, 'broker_1', 4567),
- BrokerMetadata(1, 'broker_2', 5678)
+ BrokerMetadata(0, 'broker_1', 4567, None),
+ BrokerMetadata(1, 'broker_2', 5678, None)
]
+ resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(NO_ERROR, 'topic_noleader', [
@@ -350,7 +358,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_LEADER, 1, -1, [], []),
]),
]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
@@ -368,14 +376,15 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
- BrokerMetadata(0, 'broker_1', 4567),
- BrokerMetadata(1, 'broker_2', 5678)
+ BrokerMetadata(0, 'broker_1', 4567, None),
+ BrokerMetadata(1, 'broker_2', 5678, None)
]
+ resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(UNKNOWN_TOPIC_OR_PARTITION, 'topic_doesnt_exist', []),
]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
diff --git a/test/test_client_async.py b/test/test_client_async.py
index dfe11ea..aa91704 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -53,8 +53,8 @@ def test_bootstrap_success(conn):
conn.connect.assert_called_with()
conn.send.assert_called_once_with(MetadataRequest[0]([]))
assert cli._bootstrap_fails == 0
- assert cli.cluster.brokers() == set([BrokerMetadata(0, 'foo', 12),
- BrokerMetadata(1, 'bar', 34)])
+ assert cli.cluster.brokers() == set([BrokerMetadata(0, 'foo', 12, None),
+ BrokerMetadata(1, 'bar', 34, None)])
def test_bootstrap_failure(conn):
conn.state = ConnectionStates.DISCONNECTED