summaryrefslogtreecommitdiff
path: root/test/test_client_async.py
blob: aa8ff114ee8805d6e71507753e1536d5319ab069 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127

import pytest

from kafka.client_async import KafkaClient
from kafka.common import BrokerMetadata
from kafka.conn import ConnectionStates
from kafka.future import Future
from kafka.protocol.metadata import MetadataResponse, MetadataRequest


@pytest.mark.parametrize("bootstrap,expected_hosts", [
    (None, [('localhost', 9092)]),
    ('foobar:1234', [('foobar', 1234)]),
    ('fizzbuzz', [('fizzbuzz', 9092)]),
    ('foo:12,bar:34', [('foo', 12), ('bar', 34)]),
    (['fizz:56', 'buzz'], [('fizz', 56), ('buzz', 9092)]),
])
def test_bootstrap_servers(mocker, bootstrap, expected_hosts):
    """Check the host list that KafkaClient hands to ``_bootstrap``.

    ``KafkaClient._bootstrap`` is replaced with a mock, the client is
    constructed (with no arguments when ``bootstrap`` is None, otherwise
    with ``bootstrap_servers=bootstrap``), and the single positional
    argument recorded by the mock is compared to ``expected_hosts``.
    """
    mocker.patch.object(KafkaClient, '_bootstrap')

    # An empty kwargs dict is the same call as ``KafkaClient()``.
    client_kwargs = {}
    if bootstrap is not None:
        client_kwargs['bootstrap_servers'] = bootstrap
    KafkaClient(**client_kwargs)

    # call_args is an (args, kwargs) pair; _bootstrap is expected to have
    # received exactly one positional argument: the list of hosts.
    positional, _ = KafkaClient._bootstrap.call_args  # pylint: disable=no-member
    (hosts,) = positional

    # Host order is randomized internally, so compare sorted copies.
    assert sorted(hosts) == sorted(expected_hosts)


@pytest.fixture
def conn(mocker):
    """Provide a mocked ``BrokerConnection`` for ``kafka.client_async``.

    The patched class returns itself when called, so assertions made on
    the fixture cover both the constructor call and the calls made on the
    resulting "instance". The mock starts in the CONNECTED state and its
    ``send`` returns the result of ``Future().success(...)`` for a canned
    ``MetadataResponse`` listing two brokers and no topics.
    """
    mock_conn = mocker.patch('kafka.client_async.BrokerConnection')
    mock_conn.return_value = mock_conn  # "instantiating" yields this mock
    mock_conn.state = ConnectionStates.CONNECTED

    canned_response = MetadataResponse(
        [(0, 'foo', 12), (1, 'bar', 34)],  # brokers
        [],  # topics
    )
    mock_conn.send.return_value = Future().success(canned_response)
    return mock_conn


def test_bootstrap_success(conn):
    """Bootstrap against a CONNECTED mock connection.

    Asserts that the client builds one connection to localhost:9092 with
    its own config, connects, sends a single empty ``MetadataRequest``,
    records no bootstrap failures, and exposes the two brokers from the
    fixture's canned metadata response.
    """
    conn.state = ConnectionStates.CONNECTED
    client = KafkaClient()

    # Connection setup and the metadata request issued during bootstrap.
    conn.assert_called_once_with('localhost', 9092, **client.config)
    conn.connect.assert_called_with()
    conn.send.assert_called_once_with(MetadataRequest([]))

    # Resulting client state.
    assert client._bootstrap_fails == 0
    expected_brokers = set([
        BrokerMetadata(0, 'foo', 12),
        BrokerMetadata(1, 'bar', 34),
    ])
    assert client.cluster.brokers() == expected_brokers

def test_bootstrap_failure(conn):
    """Bootstrap against a DISCONNECTED mock connection.

    Asserts that the client still builds one connection to localhost:9092
    with its own config and tries to connect, then closes the connection,
    counts one bootstrap failure, and ends up with no known brokers.
    """
    conn.state = ConnectionStates.DISCONNECTED
    client = KafkaClient()

    # Connection attempt followed by cleanup.
    conn.assert_called_once_with('localhost', 9092, **client.config)
    conn.connect.assert_called_with()
    conn.close.assert_called_with()

    # Resulting client state.
    assert client._bootstrap_fails == 1
    no_brokers = set()
    assert client.cluster.brokers() == no_brokers


def test_can_connect():
    """Placeholder: passes without verifying anything yet.

    TODO: presumably meant to cover ``KafkaClient.can_connect`` -- confirm.
    """


def test_initiate_connect():
    """Placeholder: passes without verifying anything yet.

    TODO: presumably meant to cover the client's connect-initiation
    step -- confirm the exact target.
    """


def test_finish_connect():
    """Placeholder: passes without verifying anything yet.

    TODO: presumably meant to cover the client's connect-completion
    step -- confirm the exact target.
    """


def test_ready():
    """Placeholder: passes without verifying anything yet.

    TODO: presumably meant to cover ``KafkaClient.ready`` -- confirm.
    """


def test_close():
    """Placeholder: passes without verifying anything yet.

    TODO: presumably meant to cover ``KafkaClient.close`` -- confirm.
    """


def test_is_disconnected():
    """Placeholder: passes without verifying anything yet.

    TODO: presumably meant to cover ``KafkaClient.is_disconnected`` --
    confirm.
    """


def test_is_ready():
    """Placeholder: passes without verifying anything yet.

    TODO: presumably meant to cover ``KafkaClient.is_ready`` -- confirm.
    """


def test_can_send_request():
    """Placeholder: passes without verifying anything yet.

    TODO: presumably meant to cover the client's can-send-request
    check -- confirm the exact target.
    """


def test_send():
    """Placeholder: passes without verifying anything yet.

    TODO: presumably meant to cover ``KafkaClient.send`` -- confirm.
    """


def test_poll():
    """Placeholder: passes without verifying anything yet.

    TODO: presumably meant to cover ``KafkaClient.poll`` -- confirm.
    """


def test__poll():
    """Placeholder: passes without verifying anything yet.

    TODO: presumably meant to cover the private ``KafkaClient._poll``
    helper -- confirm.
    """


def test_in_flight_request_count():
    """Placeholder: passes without verifying anything yet.

    TODO: presumably meant to cover ``KafkaClient.in_flight_request_count``
    -- confirm.
    """


def test_least_loaded_node():
    """Placeholder: passes without verifying anything yet.

    TODO: presumably meant to cover ``KafkaClient.least_loaded_node`` --
    confirm.
    """


def test_set_topics():
    """Placeholder: passes without verifying anything yet.

    TODO: presumably meant to cover ``KafkaClient.set_topics`` -- confirm.
    """


def test_maybe_refresh_metadata():
    """Placeholder: passes without verifying anything yet.

    TODO: presumably meant to cover the client's metadata-refresh
    logic -- confirm the exact target.
    """


def test_schedule():
    """Placeholder: passes without verifying anything yet.

    TODO: presumably meant to cover ``KafkaClient.schedule`` -- confirm.
    """


def test_unschedule():
    """Placeholder: passes without verifying anything yet.

    TODO: presumably meant to cover ``KafkaClient.unschedule`` -- confirm.
    """