1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
|
# pylint: skip-file
from __future__ import absolute_import
import pytest
import itertools
from collections import OrderedDict
from kafka.client_async import KafkaClient
from kafka.consumer.fetcher import Fetcher, NoOffsetForPartitionError
from kafka.consumer.subscription_state import SubscriptionState
from kafka.metrics import Metrics
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.offset import OffsetResponse
from kafka.structs import TopicPartition
from kafka.future import Future
from kafka.errors import (
StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
UnknownTopicOrPartitionError
)
@pytest.fixture
def client(mocker):
    """Provide a mock whose interface is modelled on a real ``KafkaClient``.

    The real client (no bootstrap servers, ``api_version`` pinned to (0, 9))
    is only used as the ``spec`` template for the mock.
    """
    template = KafkaClient(bootstrap_servers=[], api_version=(0, 9))
    return mocker.Mock(spec=template)
@pytest.fixture
def subscription_state():
    """Provide a newly constructed ``SubscriptionState`` for the test."""
    state = SubscriptionState()
    return state
@pytest.fixture
def fetcher(client, subscription_state):
    """Build a ``Fetcher`` over the mock client with three seeked partitions.

    The subscription is set to topic 'foobar', partitions 0, 1 and 2 are
    assigned, and each one is seeked to offset 0 before the fetcher is made.
    """
    partitions = [TopicPartition('foobar', partition_id)
                  for partition_id in range(3)]

    subscription_state.subscribe(topics=['foobar'])
    subscription_state.assign_from_subscribed(partitions)
    for partition in partitions:
        subscription_state.seek(partition, 0)

    metrics = Metrics()
    return Fetcher(client, subscription_state, metrics)
def test_send_fetches(fetcher, mocker):
    """send_fetches() sends each node's prepared request via the client.

    ``_create_fetch_requests`` is patched to return two v0 FetchRequests keyed
    by node ids 0 and 1; the test checks each was passed to ``client.send``
    and that one result item comes back per request.
    """
    max_wait_ms = fetcher.config['fetch_max_wait_ms']
    min_bytes = fetcher.config['fetch_min_bytes']
    partition_max_bytes = fetcher.config['max_partition_fetch_bytes']

    def build_request(partition_ids):
        # One 'foobar' topic entry with (partition, 0, max bytes) tuples;
        # the 0 is presumably the fetch offset the fixture seeked to.
        partition_data = [(pid, 0, partition_max_bytes)
                          for pid in partition_ids]
        return FetchRequest[0](-1, max_wait_ms, min_bytes,
                               [('foobar', partition_data)])

    fetch_requests = [build_request([0, 1]), build_request([2])]
    requests_by_node = dict(enumerate(fetch_requests))
    mocker.patch.object(fetcher, '_create_fetch_requests',
                        return_value=requests_by_node)

    result = fetcher.send_fetches()

    for node_id, request in sorted(requests_by_node.items()):
        fetcher._client.send.assert_any_call(node_id, request)
    assert len(result) == len(fetch_requests)
@pytest.mark.parametrize(("api_version", "fetch_version"), [
    ((0, 10, 1), 3),
    ((0, 10, 0), 2),
    ((0, 9), 1),
    ((0, 8), 0),
])
def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version):
    """_create_fetch_requests() builds FetchRequests of the version that
    matches the configured ``api_version``."""
    # Report no in-flight requests on the mock client; presumably the
    # fetcher holds back requests for nodes that still have one pending.
    fetcher._client.in_flight_request_count.return_value = 0
    fetcher.config['api_version'] = api_version

    by_node = fetcher._create_fetch_requests()
    requests = list(by_node.values())

    # all() over an empty sequence is True, so without this guard the
    # version check below would pass even if no request was built.
    assert requests
    assert all(isinstance(r, FetchRequest[fetch_version]) for r in requests)
def test_update_fetch_positions(fetcher, mocker):
    """update_fetch_positions() resets or seeks only partitions that need it.

    Scenarios, in order: an unassigned partition, an assigned partition that
    already has a position, a partition flagged for reset with no committed
    offset (expects ``_reset_offset``), and a partition flagged for reset with
    a committed offset (expects a ``seek`` to that offset).
    """
    reset_offset = mocker.patch.object(fetcher, '_reset_offset')
    subscriptions = fetcher._subscriptions
    partition = TopicPartition('foobar', 0)

    # Unassigned partition: no reset is attempted.
    fetcher.update_fetch_positions([TopicPartition('fizzbuzz', 0)])
    assert reset_offset.call_count == 0

    # Fetchable partition (has an offset, not paused): still no reset.
    fetcher.update_fetch_positions([partition])
    assert reset_offset.call_count == 0

    # Flagged for reset with awaiting_reset cleared and no committed offset:
    # _reset_offset is used and awaiting_reset ends up True again.
    subscriptions.need_offset_reset(partition)
    subscriptions.assignment[partition].awaiting_reset = False
    fetcher.update_fetch_positions([partition])
    reset_offset.assert_called_with(partition)
    assert subscriptions.assignment[partition].awaiting_reset is True
    # Another pass still leaves _reset_offset(partition) as the latest call.
    fetcher.update_fetch_positions([partition])
    reset_offset.assert_called_with(partition)

    # Flagged for reset but a committed offset exists: seek to it instead
    # of calling _reset_offset.
    reset_offset.reset_mock()
    subscriptions.need_offset_reset(partition)
    subscriptions.assignment[partition].awaiting_reset = False
    subscriptions.assignment[partition].committed = 123
    seek = mocker.patch.object(subscriptions, 'seek')
    fetcher.update_fetch_positions([partition])
    assert reset_offset.call_count == 0
    seek.assert_called_with(partition, 123)
def test__reset_offset(fetcher, mocker):
    """_reset_offset() raises when no offset is found and seeks otherwise.

    With ``_retrieve_offsets`` mocked: an empty result must raise
    NoOffsetForPartitionError and leave the partition awaiting reset; a
    result of ``{tp: (1001, None)}`` must move the position to 1001 and clear
    the awaiting-reset flag.
    """
    tp = TopicPartition("topic", 0)
    # Pass a list of topics, as the fetcher fixture does, instead of a bare
    # string (which only works if subscribe() coerces it to a list).
    fetcher._subscriptions.subscribe(topics=["topic"])
    fetcher._subscriptions.assign_from_subscribed([tp])
    fetcher._subscriptions.need_offset_reset(tp)
    mocked = mocker.patch.object(fetcher, '_retrieve_offsets')

    mocked.return_value = {}
    with pytest.raises(NoOffsetForPartitionError):
        fetcher._reset_offset(tp)
    # A failed reset must not clear the pending-reset state.
    assert fetcher._subscriptions.assignment[tp].awaiting_reset

    mocked.return_value = {tp: (1001, None)}
    fetcher._reset_offset(tp)
    assert not fetcher._subscriptions.assignment[tp].awaiting_reset
    assert fetcher._subscriptions.assignment[tp].position == 1001
def test__send_offset_requests(fetcher, mocker):
    """_send_offset_requests() fails fast without a leader and otherwise
    chains the per-node send Future into the Future it returns."""
    tp = TopicPartition("topic_send_offset", 1)
    mocked_send = mocker.patch.object(fetcher, "_send_offset_request")
    send_futures = []

    def send_side_effect(*args, **kw):
        # Return a pending Future that the test resolves by hand.
        f = Future()
        send_futures.append(f)
        return f
    mocked_send.side_effect = send_side_effect

    mocked_leader = mocker.patch.object(
        fetcher._client.cluster, "leader_for_partition")
    # Report an unavailable leader twice (None, then -1), then node 0 forever.
    mocked_leader.side_effect = itertools.chain(
        [None, -1], itertools.cycle([0]))

    # Leader == None: already failed with StaleMetadata, nothing sent.
    fut = fetcher._send_offset_requests({tp: 0})
    assert fut.failed()
    assert isinstance(fut.exception, StaleMetadata)
    assert not mocked_send.called

    # Leader == -1: already failed with LeaderNotAvailableError, nothing sent.
    fut = fetcher._send_offset_requests({tp: 0})
    assert fut.failed()
    assert isinstance(fut.exception, LeaderNotAvailableError)
    assert not mocked_send.called

    # Leader == 0, send failed: exactly one request goes out.
    fut = fetcher._send_offset_requests({tp: 0})
    assert not fut.is_done
    assert mocked_send.call_count == 1
    assert len(send_futures) == 1
    # The send failure must propagate to the returned Future.
    send_futures.pop().failure(NotLeaderForPartitionError(tp))
    assert fut.failed()
    assert isinstance(fut.exception, NotLeaderForPartitionError)

    # Leader == 0, send success: a *new* request must go out. Checking
    # call_count (``.called`` is already True from the step above) proves it.
    fut = fetcher._send_offset_requests({tp: 0})
    assert not fut.is_done
    assert mocked_send.call_count == 2
    assert len(send_futures) == 1
    # The send result must propagate to the returned Future.
    send_futures.pop().success({tp: (10, 10000)})
    assert fut.succeeded()
    assert fut.value == {tp: (10, 10000)}
def test__send_offset_requests_multiple_nodes(fetcher, mocker):
    """_send_offset_requests() sends one request per leader node and merges
    the replies; a failure from any node fails the combined Future."""
    tp1 = TopicPartition("topic_send_offset", 1)
    tp2 = TopicPartition("topic_send_offset", 2)
    tp3 = TopicPartition("topic_send_offset", 3)
    tp4 = TopicPartition("topic_send_offset", 4)
    mocked_send = mocker.patch.object(fetcher, "_send_offset_request")
    send_futures = []

    def send_side_effect(node_id, timestamps):
        # Record what was sent to which node and hand back a pending Future.
        f = Future()
        send_futures.append((node_id, timestamps, f))
        return f
    mocked_send.side_effect = send_side_effect

    mocked_leader = mocker.patch.object(
        fetcher._client.cluster, "leader_for_partition")
    # Leaders alternate 0, 1, 0, 1, ...; with the ordered input below
    # (4 partitions per call) tp1/tp3 map to node 0 and tp2/tp4 to node 1.
    mocked_leader.side_effect = itertools.cycle([0, 1])

    def pending_by_node():
        # Look futures up by node id, so the test does not depend on the
        # order in which the per-node requests were sent.
        return {node: f for node, _, f in send_futures}

    # -- All nodes succeed
    tss = OrderedDict([(tp1, 0), (tp2, 0), (tp3, 0), (tp4, 0)])
    fut = fetcher._send_offset_requests(tss)
    assert not fut.is_done
    assert mocked_send.call_count == 2

    req_by_node = {node: timestamps for node, timestamps, _ in send_futures}
    assert req_by_node == {
        0: {tp1: 0, tp3: 0},
        1: {tp2: 0, tp4: 0}
    }

    pending = pending_by_node()
    # Node 0 replies first; tp3 has no messages, so it is missing.
    pending[0].success({tp1: (11, 1001)})
    # Only one of the two futures is resolved, so the result is not ready.
    assert not fut.is_done
    pending[1].success({tp2: (12, 1002), tp4: (14, 1004)})
    assert fut.succeeded()
    assert fut.value == {tp1: (11, 1001), tp2: (12, 1002), tp4: (14, 1004)}

    # -- Node 0 succeeds, node 1 fails: the combined Future fails
    del send_futures[:]
    fut = fetcher._send_offset_requests(tss)
    assert len(send_futures) == 2
    pending = pending_by_node()
    pending[0].success({tp1: (11, 1001)})
    pending[1].failure(UnknownTopicOrPartitionError(tp2))
    assert fut.failed()
    assert isinstance(fut.exception, UnknownTopicOrPartitionError)

    # -- Node 0 fails, node 1 succeeds: the combined Future still fails
    del send_futures[:]
    fut = fetcher._send_offset_requests(tss)
    assert len(send_futures) == 2
    pending = pending_by_node()
    pending[0].failure(UnknownTopicOrPartitionError(tp1))
    pending[1].success({tp2: (12, 1002), tp4: (14, 1004)})
    assert fut.failed()
    assert isinstance(fut.exception, UnknownTopicOrPartitionError)
def test__handle_offset_response(fetcher, mocker):
    """_handle_offset_response() turns an OffsetResponse v1 into a Future.

    Partition entries below are presumably (partition, error_code, timestamp,
    offset); a successful entry resolves to {TopicPartition: (offset,
    timestamp)}, as the first scenario shows.
    """
    def handle(*topic_entries):
        # Run one OffsetResponse v1 through the handler; return its Future.
        result = Future()
        response = OffsetResponse[1](list(topic_entries))
        fetcher._handle_offset_response(result, response)
        return result

    # Error 43 (UnsupportedForMessageFormatError): the partition is omitted
    # and the remaining partition still succeeds.
    fut = handle(("topic", [(0, 43, -1, -1)]),
                 ("topic", [(1, 0, 1000, 9999)]))
    assert fut.succeeded()
    assert fut.value == {TopicPartition("topic", 1): (9999, 1000)}

    # Error 6 -> NotLeaderForPartitionError
    fut = handle(("topic", [(0, 6, -1, -1)]))
    assert fut.failed()
    assert isinstance(fut.exception, NotLeaderForPartitionError)

    # Error 3 -> UnknownTopicOrPartitionError
    fut = handle(("topic", [(0, 3, -1, -1)]))
    assert fut.failed()
    assert isinstance(fut.exception, UnknownTopicOrPartitionError)

    # Several errors plus one success: error 43 is skipped, and the Future
    # fails with the first non-skipped error (6, NotLeaderForPartition)
    # rather than the later error 3.
    fut = handle(("topic", [(0, 43, -1, -1)]),
                 ("topic", [(1, 6, -1, -1)]),
                 ("topic", [(2, 3, -1, -1)]),
                 ("topic", [(3, 0, 1000, 9999)]))
    assert fut.failed()
    assert isinstance(fut.exception, NotLeaderForPartitionError)
|