# kafka/__main__.py (blob d22fed16812ea7820176ec9d9ba0fbc97a074783)
from __future__ import absolute_import, print_function

import argparse
import operator
from pprint import pprint
import sys
import time

from kafka import KafkaConsumer, KafkaProducer
from kafka.client_async import KafkaClient
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
from kafka.errors import for_code as error_for_code
from kafka.protocol.admin import ListGroupsRequest, DescribeGroupsRequest
from kafka.protocol.commit import GroupCoordinatorRequest

def print_padded(data):
    """Print rows as a left-aligned table with equal-width columns.

    Args:
        data: A sequence of rows (tuples or lists). The first row fixes the
            number of columns that are printed; every row must have at least
            that many cells (extra cells are ignored). Cells may be any
            object and are rendered with ``str()``.

    Each column is as wide as its longest rendered cell and columns are
    separated by three spaces. Nothing is printed when ``data`` is empty.

    Raises:
        IndexError: If a row has fewer cells than the first row.
    """
    if not data:
        return

    # Render every cell up front: the width calculation and the output then
    # agree, and cells whose type rejects a format spec (lists, None) no
    # longer raise TypeError when padded.
    column_count = len(data[0])
    table = [[str(row[i]) for i in range(column_count)] for row in data]
    widths = [max(len(cell) for cell in column) for column in zip(*table)]

    for cells in table:
        print('   '.join(cell.ljust(width)
                         for cell, width in zip(cells, widths)))


def check_future(future):
    """Exit the process if ``future`` failed or carries a non-zero error code.

    Args:
        future: An object exposing ``failed()``, ``exception`` and ``value``.

    When ``future.failed()`` is true, the exception is printed and the
    process exits with the exception's ``errno`` attribute (128 if it has
    none). Otherwise, if ``future.value`` has a non-zero ``error_code``, the
    result of ``error_for_code`` for that code is printed and the process
    exits with the code. In every other case the function simply returns.
    """
    if future.failed():
        failure = future.exception
        print(failure)
        sys.exit(getattr(failure, 'errno', 128))

    # Responses without an error_code attribute are treated as successful.
    code = getattr(future.value, 'error_code', 0)
    if code == 0:
        return

    print(error_for_code(code))
    sys.exit(code)


# Command-line interface. The actions are checked in the order
# --list-topics, --list-groups, --describe-group by the if/elif chain that
# follows, so only the first one supplied is carried out.
parser = argparse.ArgumentParser(
    description='Inspect a Kafka cluster: list topics, list groups, or '
                'describe one group. If several actions are given, only the '
                'first of --list-topics, --list-groups, --describe-group is '
                'performed.')
parser.add_argument('--server', required=True, action='append',
                    help='bootstrap server to connect to; repeat the option '
                         'to supply several')
parser.add_argument('--list-topics', action='store_true',
                    help='print the sorted names of the topics in the '
                         'cluster metadata')
parser.add_argument('--list-groups', action='store_true',
                    help='print a table of the groups reported by each '
                         'broker')
parser.add_argument('--describe-group', metavar='GROUP',
                    help='print the state and members of the named group')

args = parser.parse_args()

# Every --server value collected above is handed over as a bootstrap server.
cli = KafkaClient(bootstrap_servers=args.server)

# Perform the first requested action; later flags are ignored once one matches.
if args.list_topics:
    # Request metadata for all topics, then block on the update future.
    cli.cluster.need_all_topic_metadata = True
    update = cli.cluster.request_update()
    metadata = cli.poll(future=update)[0]

    # Each metadata.topics entry is unpacked as a 4-tuple; only the second
    # field (the topic name) is printed.
    for topic in sorted(name for _, name, _, _ in metadata.topics):
        print(topic)


elif args.list_groups:
    req = ListGroupsRequest[0]()

    # start connections to all brokers before waiting on any single one
    for broker in cli.cluster.brokers():
        cli.ready(broker.nodeId)

    header = ('GROUP', 'TYPE', 'NODE', 'HOST')
    rows = []
    for broker in cli.cluster.brokers():
        # Re-check readiness every 100ms until the broker can take a request.
        while not cli.ready(broker.nodeId):
            time.sleep(0.1)
        future = cli.send(broker.nodeId, req)
        cli.poll(future=future)
        check_future(future)
        rows.extend((group_name, group_type, broker.nodeId, broker.host)
                    for group_name, group_type in future.value.groups)
    # Header stays on top; data rows are sorted beneath it.
    print_padded([header] + sorted(rows))


elif args.describe_group:
    # Step 1: send a GroupCoordinatorRequest to the least loaded node and
    # read the coordinator's node id from the response.
    lookup = GroupCoordinatorRequest[0](args.describe_group)
    future = cli.send(cli.least_loaded_node(), lookup)
    cli.poll(future=future)
    check_future(future)
    coordinator_id = future.value.coordinator_id

    # Step 2: send the DescribeGroupsRequest to that coordinator once the
    # client reports it ready.
    req = DescribeGroupsRequest[0]([args.describe_group])
    while not cli.ready(coordinator_id):
        time.sleep(0.1)
    future = cli.send(coordinator_id, req)
    cli.poll(future=future)
    check_future(future)

    for error_code, group, state, protocol_type, protocol, members in future.value.groups:
        # One-row summary table for the group itself.
        print_padded([
            ('GROUP', 'STATE', 'TYPE', 'PROTOCOL', 'MEMBERS', 'ERRORCODE'),
            (group, state, protocol_type, protocol, len(members), error_code),
        ])
        print()

        member_header = ('MEMBER', 'CLIENT', 'HOST', 'VERSION', 'SUBSCRIPTION', 'ASSIGNMENT')
        member_rows = []
        for member_id, client_id, client_host, metadata_bytes, assignment_bytes in sorted(members):
            metadata = ConsumerProtocolMemberMetadata.decode(metadata_bytes)
            assignment = ConsumerProtocolMemberAssignment.decode(assignment_bytes)
            # Map each assigned topic to its partitions for lookup below.
            assigned = {topic: partitions
                        for topic, partitions in assignment.assignment}

            # One row per (member, subscribed topic); topics without an
            # assignment show an empty list.
            for topic in metadata.subscription:
                member_rows.append((member_id, client_id, client_host,
                                    metadata.version, topic,
                                    assigned.get(topic, [])))
        print_padded([member_header] + sorted(member_rows))