From 1d89ac72f5efc4ef9b9746a41c94d45519c8f065 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 16 Feb 2018 12:15:11 -0800 Subject: Scratching on __main__ --- kafka/__main__.py | 104 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 kafka/__main__.py diff --git a/kafka/__main__.py b/kafka/__main__.py new file mode 100644 index 0000000..d22fed1 --- /dev/null +++ b/kafka/__main__.py @@ -0,0 +1,104 @@ +from __future__ import absolute_import, print_function + +import argparse +import operator +from pprint import pprint +import sys +import time + +from kafka import KafkaConsumer, KafkaProducer +from kafka.client_async import KafkaClient +from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment +from kafka.errors import for_code as error_for_code +from kafka.protocol.admin import ListGroupsRequest, DescribeGroupsRequest +from kafka.protocol.commit import GroupCoordinatorRequest + +def print_padded(data): + fmt = [] + for i in range(len(data[0])): + pad = max([len(str(operator.getitem(row, i))) for row in data]) + fmt.append('{%d:<%d}' % (i, pad)) + fmt_str = ' '.join(fmt) + for row in data: + print(fmt_str.format(*row)) + + +def check_future(future): + if future.failed(): + print(future.exception) + sys.exit(getattr(future.exception, 'errno', 128)) + elif getattr(future.value, 'error_code', 0) != 0: + print(error_for_code(future.value.error_code)) + sys.exit(future.value.error_code) + + +parser = argparse.ArgumentParser() +parser.add_argument('--server', required=True, action='append') +parser.add_argument('--list-topics', action='store_true') +parser.add_argument('--list-groups', action='store_true') +parser.add_argument('--describe-group') + +args = parser.parse_args() + +cli = KafkaClient(bootstrap_servers=args.server) + +if args.list_topics: + cli.cluster.need_all_topic_metadata = True + metadata = cli.poll(future=cli.cluster.request_update())[0] + + topics = [topic for _, topic, _, _ in metadata.topics] + for topic in sorted(topics): + print(topic) + + +elif args.list_groups: + req = ListGroupsRequest[0]() + + # start connections to all brokers + [cli.ready(broker.nodeId) for broker in cli.cluster.brokers()] + + results = [('GROUP', 'TYPE', 'NODE', 'HOST')] + for broker in cli.cluster.brokers(): + while not cli.ready(broker.nodeId): + time.sleep(0.1) + future = cli.send(broker.nodeId, req) + cli.poll(future=future) + check_future(future) + for group_name, group_type in future.value.groups: + results.append((group_name, group_type, broker.nodeId, broker.host)) + print_padded(results[0:1] + sorted(results[1:])) + + +elif args.describe_group: + req = GroupCoordinatorRequest[0](args.describe_group) + node_id = cli.least_loaded_node() + future = cli.send(node_id, req) + cli.poll(future=future) + check_future(future) + + req = DescribeGroupsRequest[0]([args.describe_group]) + coordinator_id = future.value.coordinator_id + while not cli.ready(coordinator_id): + time.sleep(0.1) + future = cli.send(coordinator_id, req) + cli.poll(future=future) + check_future(future) + + for error_code, group, state, protocol_type, protocol, members in future.value.groups: + results = [('GROUP', 'STATE', 'TYPE', 'PROTOCOL', 'MEMBERS', 'ERRORCODE')] + results.append((group, state, protocol_type, protocol, len(members), error_code)) + print_padded(results) + print() + results = [('MEMBER', 'CLIENT', 'HOST', 'VERSION', 'SUBSCRIPTION', 'ASSIGNMENT')] + for member_id, client_id, client_host, metadata_bytes, assignment_bytes in sorted(members): + metadata = ConsumerProtocolMemberMetadata.decode(metadata_bytes) + assignment = ConsumerProtocolMemberAssignment.decode(assignment_bytes) + assigned = {} + for topic, partitions in assignment.assignment: + assigned[topic] = partitions + + # Print a new row per client - subscribed topic + for topic in metadata.subscription: + results.append((member_id, client_id, client_host, metadata.version, + topic, assigned.get(topic, []))) + print_padded(results[0:1] + sorted(results[1:])) -- cgit v1.2.1