diff options
author | David Arthur <mumrah@gmail.com> | 2013-07-11 05:58:54 -0700 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2013-07-11 05:58:54 -0700 |
commit | 5684af438e6cf871540aa8ea8b556737f56e9798 (patch) | |
tree | f9bb1f6046943946236ceae61d9266a8c14bdcfe /test/test_integration.py | |
parent | ffdc08aeec040862d522914a480c135626a19e69 (diff) | |
parent | d2df8f54637490b1dbe858066b74710b57186016 (diff) | |
download | kafka-python-5684af438e6cf871540aa8ea8b556737f56e9798.tar.gz |
Merge pull request #33 from mahendra/asyncproducer
Support for async producer
Merged locally, tests pass, +1
Diffstat (limited to 'test/test_integration.py')
-rw-r--r-- | test/test_integration.py | 277 |
1 files changed, 274 insertions, 3 deletions
diff --git a/test/test_integration.py b/test/test_integration.py index d607b73..6d96b6a 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -1,5 +1,6 @@ import logging import unittest +import time from kafka import * # noqa from kafka.common import * # noqa @@ -254,12 +255,23 @@ class TestKafkaClient(unittest.TestCase): def test_simple_producer(self): producer = SimpleProducer(self.client, "test_simple_producer") - producer.send_messages("one", "two") - producer.send_messages("three") + resp = producer.send_messages("one", "two") + + # Will go to partition 0 + self.assertEquals(len(resp), 1) + self.assertEquals(resp[0].error, 0) + self.assertEquals(resp[0].offset, 0) # offset of first msg + + # Will go to partition 1 + resp = producer.send_messages("three") + self.assertEquals(len(resp), 1) + self.assertEquals(resp[0].error, 0) + self.assertEquals(resp[0].offset, 0) # offset of first msg fetch1 = FetchRequest("test_simple_producer", 0, 0, 1024) fetch2 = FetchRequest("test_simple_producer", 1, 0, 1024) - fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) + fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, + fetch2]) self.assertEquals(fetch_resp1.error, 0) self.assertEquals(fetch_resp1.highwaterMark, 2) messages = list(fetch_resp1.messages) @@ -272,6 +284,265 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(len(messages), 1) self.assertEquals(messages[0].message.value, "three") + # Will go to partition 0 + resp = producer.send_messages("four", "five") + self.assertEquals(len(resp), 1) + self.assertEquals(resp[0].error, 0) + self.assertEquals(resp[0].offset, 2) # offset of first msg + + producer.stop() + + def test_round_robin_partitioner(self): + producer = KeyedProducer(self.client, "test_round_robin_partitioner", + partitioner=RoundRobinPartitioner) + producer.send("key1", "one") + producer.send("key2", "two") + producer.send("key3", "three") + producer.send("key4", "four") + + fetch1 = FetchRequest("test_round_robin_partitioner", 0, 0, 1024) + fetch2 = FetchRequest("test_round_robin_partitioner", 1, 0, 1024) + + fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, + fetch2]) + + self.assertEquals(fetch_resp1.error, 0) + self.assertEquals(fetch_resp1.highwaterMark, 2) + self.assertEquals(fetch_resp1.partition, 0) + + messages = list(fetch_resp1.messages) + self.assertEquals(len(messages), 2) + self.assertEquals(messages[0].message.value, "one") + self.assertEquals(messages[1].message.value, "three") + + self.assertEquals(fetch_resp2.error, 0) + self.assertEquals(fetch_resp2.highwaterMark, 2) + self.assertEquals(fetch_resp2.partition, 1) + + messages = list(fetch_resp2.messages) + self.assertEquals(len(messages), 2) + self.assertEquals(messages[0].message.value, "two") + self.assertEquals(messages[1].message.value, "four") + + producer.stop() + + def test_hashed_partitioner(self): + producer = KeyedProducer(self.client, "test_hash_partitioner", + partitioner=HashedPartitioner) + producer.send(1, "one") + producer.send(2, "two") + producer.send(3, "three") + producer.send(4, "four") + + fetch1 = FetchRequest("test_hash_partitioner", 0, 0, 1024) + fetch2 = FetchRequest("test_hash_partitioner", 1, 0, 1024) + + fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, + fetch2]) + + self.assertEquals(fetch_resp1.error, 0) + self.assertEquals(fetch_resp1.highwaterMark, 2) + self.assertEquals(fetch_resp1.partition, 0) + + messages = list(fetch_resp1.messages) + self.assertEquals(len(messages), 2) + self.assertEquals(messages[0].message.value, "two") + self.assertEquals(messages[1].message.value, "four") + + self.assertEquals(fetch_resp2.error, 0) + self.assertEquals(fetch_resp2.highwaterMark, 2) + self.assertEquals(fetch_resp2.partition, 1) + + messages = list(fetch_resp2.messages) + self.assertEquals(len(messages), 2) + self.assertEquals(messages[0].message.value, "one") + self.assertEquals(messages[1].message.value, "three") + + producer.stop() + + def test_acks_none(self): + producer = SimpleProducer(self.client, "test_acks_none", + req_acks=SimpleProducer.ACK_NOT_REQUIRED) + resp = producer.send_messages("one") + self.assertEquals(len(resp), 0) + + fetch = FetchRequest("test_acks_none", 0, 0, 1024) + fetch_resp = self.client.send_fetch_request([fetch]) + + self.assertEquals(fetch_resp[0].error, 0) + self.assertEquals(fetch_resp[0].highwaterMark, 1) + self.assertEquals(fetch_resp[0].partition, 0) + + messages = list(fetch_resp[0].messages) + self.assertEquals(len(messages), 1) + self.assertEquals(messages[0].message.value, "one") + + producer.stop() + + def test_acks_local_write(self): + producer = SimpleProducer(self.client, "test_acks_local_write", + req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE) + resp = producer.send_messages("one") + self.assertEquals(len(resp), 1) + + fetch = FetchRequest("test_acks_local_write", 0, 0, 1024) + fetch_resp = self.client.send_fetch_request([fetch]) + + self.assertEquals(fetch_resp[0].error, 0) + self.assertEquals(fetch_resp[0].highwaterMark, 1) + self.assertEquals(fetch_resp[0].partition, 0) + + messages = list(fetch_resp[0].messages) + self.assertEquals(len(messages), 1) + self.assertEquals(messages[0].message.value, "one") + + producer.stop() + + def test_acks_cluster_commit(self): + producer = SimpleProducer(self.client, "test_acks_cluster_commit", + req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT) + resp = producer.send_messages("one") + self.assertEquals(len(resp), 1) + + fetch = FetchRequest("test_acks_cluster_commit", 0, 0, 1024) + fetch_resp = self.client.send_fetch_request([fetch]) + + self.assertEquals(fetch_resp[0].error, 0) + self.assertEquals(fetch_resp[0].highwaterMark, 1) + self.assertEquals(fetch_resp[0].partition, 0) + + messages = list(fetch_resp[0].messages) + self.assertEquals(len(messages), 1) + self.assertEquals(messages[0].message.value, "one") + + producer.stop() + + def test_async_simple_producer(self): + producer = SimpleProducer(self.client, "test_async_simple_producer", + async=True) + + resp = producer.send_messages("one") + self.assertEquals(len(resp), 0) + + # Give it some time + time.sleep(2) + + fetch = FetchRequest("test_async_simple_producer", 0, 0, 1024) + fetch_resp = self.client.send_fetch_request([fetch]) + + self.assertEquals(fetch_resp[0].error, 0) + self.assertEquals(fetch_resp[0].highwaterMark, 1) + self.assertEquals(fetch_resp[0].partition, 0) + + messages = list(fetch_resp[0].messages) + self.assertEquals(len(messages), 1) + self.assertEquals(messages[0].message.value, "one") + + producer.stop() + + def test_async_keyed_producer(self): + producer = KeyedProducer(self.client, "test_async_keyed_producer", + async=True) + + resp = producer.send("key1", "one") + self.assertEquals(len(resp), 0) + + # Give it some time + time.sleep(2) + + fetch = FetchRequest("test_async_keyed_producer", 0, 0, 1024) + fetch_resp = self.client.send_fetch_request([fetch]) + + self.assertEquals(fetch_resp[0].error, 0) + self.assertEquals(fetch_resp[0].highwaterMark, 1) + self.assertEquals(fetch_resp[0].partition, 0) + + messages = list(fetch_resp[0].messages) + self.assertEquals(len(messages), 1) + self.assertEquals(messages[0].message.value, "one") + + producer.stop() + + def test_batched_simple_producer(self): + producer = SimpleProducer(self.client, "test_batched_simple_producer", + batch_send=True, + batch_send_every_n=10, + batch_send_every_t=20) + + # Send 5 messages and do a fetch + msgs = ["message-%d" % i for i in range(0, 5)] + resp = producer.send_messages(*msgs) + + # Batch mode is async. No ack + self.assertEquals(len(resp), 0) + + # Give it some time + time.sleep(2) + + fetch1 = FetchRequest("test_batched_simple_producer", 0, 0, 1024) + fetch2 = FetchRequest("test_batched_simple_producer", 1, 0, 1024) + fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, + fetch2]) + + self.assertEquals(fetch_resp1.error, 0) + messages = list(fetch_resp1.messages) + self.assertEquals(len(messages), 0) + + self.assertEquals(fetch_resp2.error, 0) + messages = list(fetch_resp2.messages) + self.assertEquals(len(messages), 0) + + # Send 5 more messages, wait for 2 seconds and do a fetch + msgs = ["message-%d" % i for i in range(5, 10)] + resp = producer.send_messages(*msgs) + + # Give it some time + time.sleep(2) + + fetch1 = FetchRequest("test_batched_simple_producer", 0, 0, 1024) + fetch2 = FetchRequest("test_batched_simple_producer", 1, 0, 1024) + fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, + fetch2]) + + self.assertEquals(fetch_resp1.error, 0) + messages = list(fetch_resp1.messages) + self.assertEquals(len(messages), 5) + + self.assertEquals(fetch_resp2.error, 0) + messages = list(fetch_resp2.messages) + self.assertEquals(len(messages), 5) + + # Send 7 messages and wait for 20 seconds + msgs = ["message-%d" % i for i in range(10, 15)] + resp = producer.send_messages(*msgs) + msgs = ["message-%d" % i for i in range(15, 17)] + resp = producer.send_messages(*msgs) + + fetch1 = FetchRequest("test_batched_simple_producer", 0, 5, 1024) + fetch2 = FetchRequest("test_batched_simple_producer", 1, 5, 1024) + fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, + fetch2]) + + self.assertEquals(fetch_resp1.error, 0) + self.assertEquals(fetch_resp2.error, 0) + messages = list(fetch_resp1.messages) + list(fetch_resp2.messages) + self.assertEquals(len(messages), 0) + + # Give it some time + time.sleep(22) + + fetch1 = FetchRequest("test_batched_simple_producer", 0, 5, 1024) + fetch2 = FetchRequest("test_batched_simple_producer", 1, 5, 1024) + fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, + fetch2]) + + self.assertEquals(fetch_resp1.error, 0) + self.assertEquals(fetch_resp2.error, 0) + messages = list(fetch_resp1.messages) + list(fetch_resp2.messages) + self.assertEquals(len(messages), 7) + + producer.stop() + class TestSimpleConsumer(unittest.TestCase): @classmethod |