diff options
-rw-r--r-- | test/test_integration.py | 81 |
1 files changed, 58 insertions, 23 deletions
diff --git a/test/test_integration.py b/test/test_integration.py index 3d6ccf6..4087df7 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -287,21 +287,26 @@ class TestKafkaClient(KafkaTestCase): producer = SimpleProducer(self.client) resp = producer.send_messages(self.topic, "one", "two") - # Will go to partition 0 + partition_for_first_batch = resp[0].partition + self.assertEquals(len(resp), 1) self.assertEquals(resp[0].error, 0) self.assertEquals(resp[0].offset, 0) # offset of first msg - # Will go to partition 1 + # ensure this partition is different from the first partition resp = producer.send_messages(self.topic, "three") + partition_for_second_batch = resp[0].partition + self.assertNotEquals(partition_for_first_batch, partition_for_second_batch) + self.assertEquals(len(resp), 1) self.assertEquals(resp[0].error, 0) self.assertEquals(resp[0].offset, 0) # offset of first msg - fetch1 = FetchRequest(self.topic, 0, 0, 1024) - fetch2 = FetchRequest(self.topic, 1, 0, 1024) - fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, - fetch2]) + fetch_requests = ( + FetchRequest(self.topic, partition_for_first_batch, 0, 1024), + FetchRequest(self.topic, partition_for_second_batch, 0, 1024), + ) + fetch_resp1, fetch_resp2 = self.client.send_fetch_request(fetch_requests) self.assertEquals(fetch_resp1.error, 0) self.assertEquals(fetch_resp1.highwaterMark, 2) messages = list(fetch_resp1.messages) @@ -314,11 +319,12 @@ class TestKafkaClient(KafkaTestCase): self.assertEquals(len(messages), 1) self.assertEquals(messages[0].message.value, "three") - # Will go to partition 0 + # Will go to same partition as first batch resp = producer.send_messages(self.topic, "four", "five") self.assertEquals(len(resp), 1) self.assertEquals(resp[0].error, 0) self.assertEquals(resp[0].offset, 2) # offset of first msg + self.assertEquals(resp[0].partition, partition_for_first_batch) producer.stop() @@ -396,14 +402,25 @@ class TestKafkaClient(KafkaTestCase): resp = producer.send_messages(self.topic, "one") self.assertEquals(len(resp), 0) - fetch = FetchRequest(self.topic, 0, 0, 1024) - fetch_resp = self.client.send_fetch_request([fetch]) + # fetch from both partitions + fetch_requests = ( + FetchRequest(self.topic, 0, 0, 1024), + FetchRequest(self.topic, 1, 0, 1024), + ) + fetch_resps = self.client.send_fetch_request(fetch_requests) - self.assertEquals(fetch_resp[0].error, 0) - self.assertEquals(fetch_resp[0].highwaterMark, 1) - self.assertEquals(fetch_resp[0].partition, 0) + # determine which partition was selected (due to random round-robin) + published_to_resp = max(fetch_resps, key=lambda x: x.highwaterMark) + not_published_to_resp = min(fetch_resps, key=lambda x: x.highwaterMark) + self.assertNotEquals(published_to_resp.partition, not_published_to_resp.partition) - messages = list(fetch_resp[0].messages) + self.assertEquals(published_to_resp.error, 0) + self.assertEquals(published_to_resp.highwaterMark, 1) + + self.assertEquals(not_published_to_resp.error, 0) + self.assertEquals(not_published_to_resp.highwaterMark, 0) + + messages = list(published_to_resp.messages) self.assertEquals(len(messages), 1) self.assertEquals(messages[0].message.value, "one") @@ -415,12 +432,14 @@ class TestKafkaClient(KafkaTestCase): resp = producer.send_messages(self.topic, "one") self.assertEquals(len(resp), 1) - fetch = FetchRequest(self.topic, 0, 0, 1024) + partition = resp[0].partition + + fetch = FetchRequest(self.topic, partition, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch]) self.assertEquals(fetch_resp[0].error, 0) self.assertEquals(fetch_resp[0].highwaterMark, 1) - self.assertEquals(fetch_resp[0].partition, 0) + self.assertEquals(fetch_resp[0].partition, partition) messages = list(fetch_resp[0].messages) self.assertEquals(len(messages), 1) @@ -435,12 +454,14 @@ class TestKafkaClient(KafkaTestCase): resp = producer.send_messages(self.topic, "one") self.assertEquals(len(resp), 1) - fetch = FetchRequest(self.topic, 0, 0, 1024) + partition = resp[0].partition + + fetch = FetchRequest(self.topic, partition, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch]) self.assertEquals(fetch_resp[0].error, 0) self.assertEquals(fetch_resp[0].highwaterMark, 1) - self.assertEquals(fetch_resp[0].partition, 0) + self.assertEquals(fetch_resp[0].partition, partition) messages = list(fetch_resp[0].messages) self.assertEquals(len(messages), 1) @@ -456,17 +477,31 @@ class TestKafkaClient(KafkaTestCase): # Give it some time time.sleep(2) - fetch = FetchRequest(self.topic, 0, 0, 1024) - fetch_resp = self.client.send_fetch_request([fetch]) + # fetch from both partitions + fetch_requests = ( + FetchRequest(self.topic, 0, 0, 1024), + FetchRequest(self.topic, 1, 0, 1024), + ) + fetch_resps = self.client.send_fetch_request(fetch_requests) - self.assertEquals(fetch_resp[0].error, 0) - self.assertEquals(fetch_resp[0].highwaterMark, 1) - self.assertEquals(fetch_resp[0].partition, 0) + # determine which partition was selected (due to random round-robin) + published_to_resp = max(fetch_resps, key=lambda x: x.highwaterMark) + not_published_to_resp = min(fetch_resps, key=lambda x: x.highwaterMark) + self.assertNotEquals(published_to_resp.partition, not_published_to_resp.partition) - messages = list(fetch_resp[0].messages) + self.assertEquals(published_to_resp.error, 0) + self.assertEquals(published_to_resp.highwaterMark, 1) + + self.assertEquals(not_published_to_resp.error, 0) + self.assertEquals(not_published_to_resp.highwaterMark, 0) + + messages = list(published_to_resp.messages) self.assertEquals(len(messages), 1) self.assertEquals(messages[0].message.value, "one") + messages = list(not_published_to_resp.messages) + self.assertEquals(len(messages), 0) + producer.stop() def test_async_keyed_producer(self): |