diff options
author | Dana Powers <dana.powers@gmail.com> | 2018-11-23 10:17:59 -0800 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2019-01-03 13:46:47 -0800 |
commit | 2e0ada055886ad01cc193b1007d3f79717b5c9df (patch) | |
tree | 335b8f1c9a3c867a8b6590b26fac139c9980a01d | |
parent | 807ac8244cd39ca8426cfeda245ec27802c0a600 (diff) | |
download | kafka-python-2e0ada055886ad01cc193b1007d3f79717b5c9df.tar.gz |
Fix response error checking in KafkaAdminClient send_to_controller
Previously we weren't accounting for when the response tuple also has a
`error_message` value.
Note that in Java, the error fieldname is inconsistent:
- `CreateTopicsResponse` / `CreatePartitionsResponse` uses `topic_errors`
- `DeleteTopicsResponse` uses `topic_error_codes`
So this updates the `CreateTopicsResponse` classes to match.
The fix is a little brittle, but should suffice for now.
-rw-r--r-- | kafka/admin/client.py | 14 | ||||
-rw-r--r-- | kafka/protocol/admin.py | 6 |
2 files changed, 15 insertions, 5 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 4c780fb..bd173b9 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -331,8 +331,18 @@ class KafkaAdminClient(object): while tries: tries -= 1 response = self._send_request_to_node(self._controller_id, request) - # DeleteTopicsResponse returns topic_error_codes rather than topic_errors - for topic, error_code in getattr(response, "topic_errors", response.topic_error_codes): + # In Java, the error fieldname is inconsistent: + # - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors + # - DeleteTopicsResponse uses topic_error_codes + # So this is a little brittle in that it assumes all responses have + # one of these attributes and that they always unpack into + # (topic, error_code) tuples. + topic_error_tuples = getattr(response, "topic_errors", response.topic_error_codes) + # Also small py2/py3 compatibility -- py3 can ignore extra values + # during unpack via: for x, y, *rest in list_of_values. py2 cannot. + # So for now we have to map across the list and explicitly drop any + # extra values (usually the error_message) + for topic, error_code in map(lambda e: e[:2], topic_error_tuples): error_type = Errors.for_code(error_code) if tries and error_type is NotControllerError: # No need to inspect the rest of the errors for diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index fc62c35..e6efad7 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -51,7 +51,7 @@ class CreateTopicsResponse_v0(Response): API_KEY = 19 API_VERSION = 0 SCHEMA = Schema( - ('topic_error_codes', Array( + ('topic_errors', Array( ('topic', String('utf-8')), ('error_code', Int16))) ) @@ -61,7 +61,7 @@ class CreateTopicsResponse_v1(Response): API_KEY = 19 API_VERSION = 1 SCHEMA = Schema( - ('topic_error_codes', Array( + ('topic_errors', Array( ('topic', String('utf-8')), ('error_code', Int16), ('error_message', String('utf-8')))) @@ -73,7 +73,7 @@ class CreateTopicsResponse_v2(Response): API_VERSION = 2 SCHEMA = Schema( ('throttle_time_ms', Int32), - ('topic_error_codes', Array( + ('topic_errors', Array( ('topic', String('utf-8')), ('error_code', Int16), ('error_message', String('utf-8')))) |