1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
import os
import time
import unittest
from kafka import * # noqa
from kafka.common import * # noqa
from fixtures import ZookeeperFixture, KafkaFixture
from testutil import *
class TestFailover(KafkaIntegrationTestCase):
    """Integration tests that kill a partition leader and keep producing.

    The ZooKeeper/Kafka fixtures are only started when the ``KAFKA_VERSION``
    environment variable is set; without it ``setUpClass`` and
    ``tearDownClass`` return immediately.
    """

    # The client is built in setUpClass from the broker fixtures.
    # NOTE(review): presumably this flag tells the base class not to create
    # its own client -- confirm against testutil.KafkaIntegrationTestCase.
    create_client = False

    @classmethod
    def setUpClass(cls):  # noqa
        """Start one ZooKeeper and two Kafka brokers, then connect a client."""
        if not os.environ.get('KAFKA_VERSION'):
            return

        zk_chroot = random_string(10)
        replicas = 2
        partitions = 2

        # mini zookeeper, 2 kafka brokers
        cls.zk = ZookeeperFixture.instance()
        kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
        cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]

        hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers]
        cls.client = KafkaClient(hosts)

    @classmethod
    def tearDownClass(cls):
        """Close the client, every broker fixture and the ZooKeeper fixture."""
        if not os.environ.get('KAFKA_VERSION'):
            return

        cls.client.close()
        for broker in cls.brokers:
            broker.close()
        cls.zk.close()

    @kafka_versions("all")
    def test_switch_leader(self):
        """Synchronous producer must fail once, then reach the new leader."""
        topic, partition = self.topic, 0
        producer = SimpleProducer(self.client)

        # Stop the producer even when an assertion in the loop fails.
        try:
            for i in range(1, 4):
                # XXX unfortunately, the conns dict needs to be warmed for this to work
                # XXX unfortunately, for warming to work, we need at least as many
                # partitions as brokers
                self._send_random_messages(producer, topic, 10)

                # kill leader for partition 0
                broker = self._kill_leader(topic, partition)

                # expect failure, reload meta data
                # The block ends at the first send that raises; a send that
                # succeeds before it still lands in the topic.
                with self.assertRaises(FailedPayloadsError):
                    producer.send_messages(topic, 'part 1')
                    producer.send_messages(topic, 'part 2')
                time.sleep(1)

                # send to new leader
                self._send_random_messages(producer, topic, 10)

                broker.open()
                time.sleep(3)

                # count number of messages: 20 random messages per round plus
                # at most the two 'part N' messages per round
                count = self._count_messages('test_switch_leader group %s' % i, topic)
                self.assertIn(count, range(20 * i, 22 * i + 1))
        finally:
            producer.stop()

    @kafka_versions("all")
    def test_switch_leader_async(self):
        """Asynchronous producer keeps working across a leader switch."""
        topic, partition = self.topic, 0
        # ``async`` is a reserved word from Python 3.7 on, so the keyword is
        # passed through a dict to keep this module importable there while
        # still calling the producer with the same argument.
        producer = SimpleProducer(self.client, **{'async': True})

        # Stop the producer even when an assertion in the loop fails.
        try:
            for i in range(1, 4):
                self._send_random_messages(producer, topic, 10)

                # kill leader for partition 0
                broker = self._kill_leader(topic, partition)

                # no exception is asserted here, unlike the synchronous test
                producer.send_messages(topic, 'part 1')
                producer.send_messages(topic, 'part 2')
                time.sleep(1)

                # send to new leader
                self._send_random_messages(producer, topic, 10)

                broker.open()
                time.sleep(3)

                # count number of messages: 20 random messages per round plus
                # at most the two 'part N' messages per round
                count = self._count_messages('test_switch_leader_async group %s' % i, topic)
                self.assertIn(count, range(20 * i, 22 * i + 1))
        finally:
            producer.stop()

    def _send_random_messages(self, producer, topic, n):
        """Send ``n`` random 10-character messages and check each response.

        When the producer returns a non-empty response, the first entry's
        ``error`` must be 0.
        """
        for _ in range(n):
            resp = producer.send_messages(topic, random_string(10))
            if resp:
                self.assertEqual(resp[0].error, 0)
        time.sleep(1)  # give it some time

    def _kill_leader(self, topic, partition):
        """Close the broker leading ``topic``/``partition`` and return it.

        The caller is responsible for calling ``open()`` on the returned
        broker to bring it back.
        """
        leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
        # Broker fixtures were created with ids 0..replicas-1, so the node id
        # is used directly as the list index.
        broker = self.brokers[leader.nodeId]
        broker.close()
        time.sleep(1)  # give it some time
        return broker

    def _count_messages(self, group, topic):
        """Return how many messages a new consumer in ``group`` reads from ``topic``.

        A separate client connected to the first broker is used, and both the
        consumer and the client are released even if iteration fails.
        """
        hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port)
        client = KafkaClient(hosts)
        try:
            consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0)
            try:
                return sum(1 for _ in consumer)
            finally:
                consumer.stop()
        finally:
            client.close()
|