summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2018-02-16 12:15:11 -0800
committerDana Powers <dana.powers@gmail.com>2018-02-16 12:15:11 -0800
commit1d89ac72f5efc4ef9b9746a41c94d45519c8f065 (patch)
tree6d7b841534bd971f60c0955b9ae40c1ce6e958e7
parent3fb09697146d7e0f03672f75cb9724a2e0b7af4f (diff)
downloadkafka-python-kafka-snake.tar.gz
Scratching on __main__kafka-snake
-rw-r--r--kafka/__main__.py104
1 files changed, 104 insertions, 0 deletions
diff --git a/kafka/__main__.py b/kafka/__main__.py
new file mode 100644
index 0000000..d22fed1
--- /dev/null
+++ b/kafka/__main__.py
@@ -0,0 +1,104 @@
+from __future__ import absolute_import, print_function
+
+import argparse
+import operator
+from pprint import pprint
+import sys
+import time
+
+from kafka import KafkaConsumer, KafkaProducer
+from kafka.client_async import KafkaClient
+from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
+from kafka.errors import for_code as error_for_code
+from kafka.protocol.admin import ListGroupsRequest, DescribeGroupsRequest
+from kafka.protocol.commit import GroupCoordinatorRequest
+
+def print_padded(data):
+ fmt = []
+ for i in range(len(data[0])):
+ pad = max([len(str(operator.getitem(row, i))) for row in data])
+ fmt.append('{%d:<%d}' % (i, pad))
+ fmt_str = ' '.join(fmt)
+ for row in data:
+ print(fmt_str.format(*row))
+
+
+def check_future(future):
+ if future.failed():
+ print(future.exception)
+ sys.exit(getattr(future.exception, 'errno', 128))
+ elif getattr(future.value, 'error_code', 0) != 0:
+ print(error_for_code(future.value.error_code))
+ sys.exit(future.value.error_code)
+
+
+parser = argparse.ArgumentParser()
+parser.add_argument('--server', required=True, action='append')
+parser.add_argument('--list-topics', action='store_true')
+parser.add_argument('--list-groups', action='store_true')
+parser.add_argument('--describe-group')
+
+args = parser.parse_args()
+
+cli = KafkaClient(bootstrap_servers=args.server)
+
+if args.list_topics:
+ cli.cluster.need_all_topic_metadata = True
+ metadata = cli.poll(future=cli.cluster.request_update())[0]
+
+ topics = [topic for _, topic, _, _ in metadata.topics]
+ for topic in sorted(topics):
+ print(topic)
+
+
+elif args.list_groups:
+ req = ListGroupsRequest[0]()
+
+ # start connections to all brokers
+ [cli.ready(broker.nodeId) for broker in cli.cluster.brokers()]
+
+ results = [('GROUP', 'TYPE', 'NODE', 'HOST')]
+ for broker in cli.cluster.brokers():
+ while not cli.ready(broker.nodeId):
+ time.sleep(0.1)
+ future = cli.send(broker.nodeId, req)
+ cli.poll(future=future)
+ check_future(future)
+ for group_name, group_type in future.value.groups:
+ results.append((group_name, group_type, broker.nodeId, broker.host))
+ print_padded(results[0:1] + sorted(results[1:]))
+
+
+elif args.describe_group:
+ req = GroupCoordinatorRequest[0](args.describe_group)
+ node_id = cli.least_loaded_node()
+ future = cli.send(node_id, req)
+ cli.poll(future=future)
+ check_future(future)
+
+ req = DescribeGroupsRequest[0]([args.describe_group])
+ coordinator_id = future.value.coordinator_id
+ while not cli.ready(coordinator_id):
+ time.sleep(0.1)
+ future = cli.send(coordinator_id, req)
+ cli.poll(future=future)
+ check_future(future)
+
+ for error_code, group, state, protocol_type, protocol, members in future.value.groups:
+ results = [('GROUP', 'STATE', 'TYPE', 'PROTOCOL', 'MEMBERS', 'ERRORCODE')]
+ results.append((group, state, protocol_type, protocol, len(members), error_code))
+ print_padded(results)
+ print()
+ results = [('MEMBER', 'CLIENT', 'HOST', 'VERSION', 'SUBSCRIPTION', 'ASSIGNMENT')]
+ for member_id, client_id, client_host, metadata_bytes, assignment_bytes in sorted(members):
+ metadata = ConsumerProtocolMemberMetadata.decode(metadata_bytes)
+ assignment = ConsumerProtocolMemberAssignment.decode(assignment_bytes)
+ assigned = {}
+ for topic, partitions in assignment.assignment:
+ assigned[topic] = partitions
+
+ # Print a new row per client - subscribed topic
+ for topic in metadata.subscription:
+ results.append((member_id, client_id, client_host, metadata.version,
+ topic, assigned.get(topic, [])))
+ print_padded(results[0:1] + sorted(results[1:]))