summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2018-11-23 10:17:59 -0800
committerJeff Widman <jeff@jeffwidman.com>2019-01-03 13:46:47 -0800
commit2e0ada055886ad01cc193b1007d3f79717b5c9df (patch)
tree335b8f1c9a3c867a8b6590b26fac139c9980a01d
parent807ac8244cd39ca8426cfeda245ec27802c0a600 (diff)
downloadkafka-python-2e0ada055886ad01cc193b1007d3f79717b5c9df.tar.gz
Fix response error checking in KafkaAdminClient send_to_controller
Previously we weren't accounting for when the response tuple also has a `error_message` value. Note that in Java, the error fieldname is inconsistent: - `CreateTopicsResponse` / `CreatePartitionsResponse` uses `topic_errors` - `DeleteTopicsResponse` uses `topic_error_codes` So this updates the `CreateTopicsResponse` classes to match. The fix is a little brittle, but should suffice for now.
-rw-r--r--kafka/admin/client.py14
-rw-r--r--kafka/protocol/admin.py6
2 files changed, 15 insertions, 5 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 4c780fb..bd173b9 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -331,8 +331,18 @@ class KafkaAdminClient(object):
while tries:
tries -= 1
response = self._send_request_to_node(self._controller_id, request)
- # DeleteTopicsResponse returns topic_error_codes rather than topic_errors
- for topic, error_code in getattr(response, "topic_errors", response.topic_error_codes):
+ # In Java, the error fieldname is inconsistent:
+ # - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors
+ # - DeleteTopicsResponse uses topic_error_codes
+ # So this is a little brittle in that it assumes all responses have
+ # one of these attributes and that they always unpack into
+ # (topic, error_code) tuples.
+ topic_error_tuples = getattr(response, "topic_errors", response.topic_error_codes)
+ # Also small py2/py3 compatibility -- py3 can ignore extra values
+ # during unpack via: for x, y, *rest in list_of_values. py2 cannot.
+ # So for now we have to map across the list and explicitly drop any
+ # extra values (usually the error_message)
+ for topic, error_code in map(lambda e: e[:2], topic_error_tuples):
error_type = Errors.for_code(error_code)
if tries and error_type is NotControllerError:
# No need to inspect the rest of the errors for
diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py
index fc62c35..e6efad7 100644
--- a/kafka/protocol/admin.py
+++ b/kafka/protocol/admin.py
@@ -51,7 +51,7 @@ class CreateTopicsResponse_v0(Response):
API_KEY = 19
API_VERSION = 0
SCHEMA = Schema(
- ('topic_error_codes', Array(
+ ('topic_errors', Array(
('topic', String('utf-8')),
('error_code', Int16)))
)
@@ -61,7 +61,7 @@ class CreateTopicsResponse_v1(Response):
API_KEY = 19
API_VERSION = 1
SCHEMA = Schema(
- ('topic_error_codes', Array(
+ ('topic_errors', Array(
('topic', String('utf-8')),
('error_code', Int16),
('error_message', String('utf-8'))))
@@ -73,7 +73,7 @@ class CreateTopicsResponse_v2(Response):
API_VERSION = 2
SCHEMA = Schema(
('throttle_time_ms', Int32),
- ('topic_error_codes', Array(
+ ('topic_errors', Array(
('topic', String('utf-8')),
('error_code', Int16),
('error_message', String('utf-8'))))