1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
from __future__ import absolute_import, print_function
import argparse
import operator
from pprint import pprint
import sys
import time
from kafka import KafkaConsumer, KafkaProducer
from kafka.client_async import KafkaClient
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
from kafka.errors import for_code as error_for_code
from kafka.protocol.admin import ListGroupsRequest, DescribeGroupsRequest
from kafka.protocol.commit import GroupCoordinatorRequest
def print_padded(data):
fmt = []
for i in range(len(data[0])):
pad = max([len(str(operator.getitem(row, i))) for row in data])
fmt.append('{%d:<%d}' % (i, pad))
fmt_str = ' '.join(fmt)
for row in data:
print(fmt_str.format(*row))
def check_future(future):
if future.failed():
print(future.exception)
sys.exit(getattr(future.exception, 'errno', 128))
elif getattr(future.value, 'error_code', 0) != 0:
print(error_for_code(future.value.error_code))
sys.exit(future.value.error_code)
parser = argparse.ArgumentParser()
parser.add_argument('--server', required=True, action='append')
parser.add_argument('--list-topics', action='store_true')
parser.add_argument('--list-groups', action='store_true')
parser.add_argument('--describe-group')
args = parser.parse_args()
cli = KafkaClient(bootstrap_servers=args.server)
if args.list_topics:
cli.cluster.need_all_topic_metadata = True
metadata = cli.poll(future=cli.cluster.request_update())[0]
topics = [topic for _, topic, _, _ in metadata.topics]
for topic in sorted(topics):
print(topic)
elif args.list_groups:
req = ListGroupsRequest[0]()
# start connections to all brokers
[cli.ready(broker.nodeId) for broker in cli.cluster.brokers()]
results = [('GROUP', 'TYPE', 'NODE', 'HOST')]
for broker in cli.cluster.brokers():
while not cli.ready(broker.nodeId):
time.sleep(0.1)
future = cli.send(broker.nodeId, req)
cli.poll(future=future)
check_future(future)
for group_name, group_type in future.value.groups:
results.append((group_name, group_type, broker.nodeId, broker.host))
print_padded(results[0:1] + sorted(results[1:]))
elif args.describe_group:
req = GroupCoordinatorRequest[0](args.describe_group)
node_id = cli.least_loaded_node()
future = cli.send(node_id, req)
cli.poll(future=future)
check_future(future)
req = DescribeGroupsRequest[0]([args.describe_group])
coordinator_id = future.value.coordinator_id
while not cli.ready(coordinator_id):
time.sleep(0.1)
future = cli.send(coordinator_id, req)
cli.poll(future=future)
check_future(future)
for error_code, group, state, protocol_type, protocol, members in future.value.groups:
results = [('GROUP', 'STATE', 'TYPE', 'PROTOCOL', 'MEMBERS', 'ERRORCODE')]
results.append((group, state, protocol_type, protocol, len(members), error_code))
print_padded(results)
print()
results = [('MEMBER', 'CLIENT', 'HOST', 'VERSION', 'SUBSCRIPTION', 'ASSIGNMENT')]
for member_id, client_id, client_host, metadata_bytes, assignment_bytes in sorted(members):
metadata = ConsumerProtocolMemberMetadata.decode(metadata_bytes)
assignment = ConsumerProtocolMemberAssignment.decode(assignment_bytes)
assigned = {}
for topic, partitions in assignment.assignment:
assigned[topic] = partitions
# Print a new row per client - subscribed topic
for topic in metadata.subscription:
results.append((member_id, client_id, client_host, metadata.version,
topic, assigned.get(topic, [])))
print_padded(results[0:1] + sorted(results[1:]))
|