summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMahendra M <mahendra.m@gmail.com>2013-10-08 08:50:52 +0530
committerMahendra M <mahendra.m@gmail.com>2013-10-08 08:50:52 +0530
commitf9cf62816ff2c2255d414a2d9f3dd32d8c81418b (patch)
treeb43b90fcdaaef0839329b20a02c79f8229773b26 /test
parent75de0f00956eb7cf0394fcfabb6a7d63057409fe (diff)
parenteb2c1735f26ce11540fb92ea94817f43b9b3a798 (diff)
downloadkafka-python-f9cf62816ff2c2255d414a2d9f3dd32d8c81418b.tar.gz
Merge branch 'master' into prod-windows
Conflicts: kafka/producer.py
Diffstat (limited to 'test')
-rw-r--r--test/fixtures.py25
-rw-r--r--test/resources/kafka.properties3
-rw-r--r--test/test_integration.py107
3 files changed, 124 insertions, 11 deletions
diff --git a/test/fixtures.py b/test/fixtures.py
index 00c1afd..946c64f 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -242,7 +242,7 @@ class ZookeeperFixture(object):
class KafkaFixture(object):
@staticmethod
- def instance(broker_id, zk_host, zk_port, zk_chroot=None):
+ def instance(broker_id, zk_host, zk_port, zk_chroot=None, replicas=1, partitions=2):
if zk_chroot is None:
zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
if "KAFKA_URI" in os.environ:
@@ -251,11 +251,11 @@ class KafkaFixture(object):
fixture = ExternalService(host, port)
else:
(host, port) = ("127.0.0.1", get_open_port())
- fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot)
+ fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot, replicas, partitions)
fixture.open()
return fixture
- def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot):
+ def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas=1, partitions=2):
self.host = host
self.port = port
@@ -265,19 +265,24 @@ class KafkaFixture(object):
self.zk_port = zk_port
self.zk_chroot = zk_chroot
+ self.replicas = replicas
+ self.partitions = partitions
+
self.tmp_dir = None
self.child = None
def open(self):
self.tmp_dir = tempfile.mkdtemp()
print("*** Running local Kafka instance")
- print(" host = %s" % self.host)
- print(" port = %s" % self.port)
- print(" broker_id = %s" % self.broker_id)
- print(" zk_host = %s" % self.zk_host)
- print(" zk_port = %s" % self.zk_port)
- print(" zk_chroot = %s" % self.zk_chroot)
- print(" tmp_dir = %s" % self.tmp_dir)
+ print(" host = %s" % self.host)
+ print(" port = %s" % self.port)
+ print(" broker_id = %s" % self.broker_id)
+ print(" zk_host = %s" % self.zk_host)
+ print(" zk_port = %s" % self.zk_port)
+ print(" zk_chroot = %s" % self.zk_chroot)
+ print(" replicas = %s" % self.replicas)
+ print(" partitions = %s" % self.partitions)
+ print(" tmp_dir = %s" % self.tmp_dir)
# Create directories
os.mkdir(os.path.join(self.tmp_dir, "logs"))
diff --git a/test/resources/kafka.properties b/test/resources/kafka.properties
index d42c097..f8732fb 100644
--- a/test/resources/kafka.properties
+++ b/test/resources/kafka.properties
@@ -32,7 +32,8 @@ socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dir={tmp_dir}/data
-num.partitions=2
+num.partitions={partitions}
+default.replication.factor={replicas}
############################# Log Flush Policy #############################
diff --git a/test/test_integration.py b/test/test_integration.py
index d8ead59..a10dae2 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -770,6 +770,113 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(all_messages[i], message.message)
self.assertEquals(i, 19)
+class TestFailover(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+
+ zk_chroot = random_string(10)
+ replicas = 2
+ partitions = 2
+
+ # mini zookeeper, 2 kafka brokers
+ cls.zk = ZookeeperFixture.instance()
+ kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
+ cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
+ cls.client = KafkaClient(cls.brokers[0].host, cls.brokers[0].port)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.client.close()
+ for broker in cls.brokers:
+ broker.close()
+ cls.zk.close()
+
+ def test_switch_leader(self):
+
+ key, topic, partition = random_string(5), 'test_switch_leader', 0
+ producer = SimpleProducer(self.client, topic)
+
+ for i in range(1, 4):
+
+ # XXX unfortunately, the conns dict needs to be warmed for this to work
+ # XXX unfortunately, for warming to work, we need at least as many partitions as brokers
+ self._send_random_messages(producer, 10)
+
+ # kill leader for partition 0
+ broker = self._kill_leader(topic, partition)
+
+ # expect failure, reload metadata
+ with self.assertRaises(FailedPayloadsException):
+ producer.send_messages('part 1')
+ producer.send_messages('part 2')
+ time.sleep(1)
+
+ # send to new leader
+ self._send_random_messages(producer, 10)
+
+ broker.open()
+ time.sleep(3)
+
+ # count number of messages
+ count = self._count_messages('test_switch_leader group %s' % i, topic)
+ self.assertIn(count, range(20 * i, 22 * i + 1))
+
+ producer.stop()
+
+ def test_switch_leader_async(self):
+
+ key, topic, partition = random_string(5), 'test_switch_leader_async', 0
+ producer = SimpleProducer(self.client, topic, async=True)
+
+ for i in range(1, 4):
+
+ self._send_random_messages(producer, 10)
+
+ # kill leader for partition 0
+ broker = self._kill_leader(topic, partition)
+
+ # expect failure, reload metadata
+ producer.send_messages('part 1')
+ producer.send_messages('part 2')
+ time.sleep(1)
+
+ # send to new leader
+ self._send_random_messages(producer, 10)
+
+ broker.open()
+ time.sleep(3)
+
+ # count number of messages
+ count = self._count_messages('test_switch_leader_async group %s' % i, topic)
+ self.assertIn(count, range(20 * i, 22 * i + 1))
+
+ producer.stop()
+
+ def _send_random_messages(self, producer, n):
+ for j in range(n):
+ resp = producer.send_messages(random_string(10))
+ if len(resp) > 0:
+ self.assertEquals(resp[0].error, 0)
+ time.sleep(1) # give it some time
+
+ def _kill_leader(self, topic, partition):
+ leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
+ broker = self.brokers[leader.nodeId]
+ broker.close()
+ time.sleep(1) # give it some time
+ return broker
+
+ def _count_messages(self, group, topic):
+ client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+ consumer = SimpleConsumer(client, group, topic, auto_commit=False)
+ all_messages = []
+ for message in consumer:
+ all_messages.append(message)
+ consumer.stop()
+ client.close()
+ return len(all_messages)
+
def random_string(l):
s = "".join(random.choice(string.letters) for i in xrange(l))