diff options
author | Lou Marvin Caraig <loumarvincaraig@gmail.com> | 2014-11-26 13:06:21 +0100 |
---|---|---|
committer | Lou Marvin Caraig <loumarvincaraig@gmail.com> | 2014-11-26 17:10:03 +0100 |
commit | a9e77bdaa2490d4c8c343d18f32d7b256c50ddd7 (patch) | |
tree | 02d7a7fc69748552d2e478b344e6fb8d4cc22a42 /test | |
parent | 2716d06ea01edc07a52ffb6a645b4b331965b781 (diff) | |
download | kafka-python-a9e77bdaa2490d4c8c343d18f32d7b256c50ddd7.tar.gz |
Fixed TestKafkaProducerIntegration
Diffstat (limited to 'test')
-rw-r--r-- | test/test_producer_integration.py | 42 | ||||
-rw-r--r-- | test/testutil.py | 4 |
2 files changed, 29 insertions, 17 deletions
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 1ebbc86..d68af72 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -199,10 +199,10 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): start_offset1 = self.current_offset(self.topic, 1) producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner) - resp1 = producer.send(self.topic, "key1", self.msg("one")) - resp2 = producer.send(self.topic, "key2", self.msg("two")) - resp3 = producer.send(self.topic, "key3", self.msg("three")) - resp4 = producer.send(self.topic, "key4", self.msg("four")) + resp1 = producer.send(self.topic, self.key("key1"), self.msg("one")) + resp2 = producer.send(self.topic, self.key("key2"), self.msg("two")) + resp3 = producer.send(self.topic, self.key("key3"), self.msg("three")) + resp4 = producer.send(self.topic, self.key("key4"), self.msg("four")) self.assert_produce_response(resp1, start_offset0+0) self.assert_produce_response(resp2, start_offset1+0) @@ -220,20 +220,28 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): start_offset1 = self.current_offset(self.topic, 1) producer = KeyedProducer(self.client, partitioner=HashedPartitioner) - resp1 = producer.send(self.topic, 1, self.msg("one")) - resp2 = producer.send(self.topic, 2, self.msg("two")) - resp3 = producer.send(self.topic, 3, self.msg("three")) - resp4 = producer.send(self.topic, 3, self.msg("four")) - resp5 = producer.send(self.topic, 4, self.msg("five")) + resp1 = producer.send(self.topic, self.key("1"), self.msg("one")) + resp2 = producer.send(self.topic, self.key("2"), self.msg("two")) + resp3 = producer.send(self.topic, self.key("3"), self.msg("three")) + resp4 = producer.send(self.topic, self.key("3"), self.msg("four")) + resp5 = producer.send(self.topic, self.key("4"), self.msg("five")) - self.assert_produce_response(resp1, start_offset1+0) - self.assert_produce_response(resp2, start_offset0+0) - self.assert_produce_response(resp3, start_offset1+1) - self.assert_produce_response(resp4, start_offset1+2) - self.assert_produce_response(resp5, start_offset0+1) + offsets = {0: start_offset0, 1: start_offset1} + messages = {0: [], 1: []} - self.assert_fetch_offset(0, start_offset0, [ self.msg("two"), self.msg("five") ]) - self.assert_fetch_offset(1, start_offset1, [ self.msg("one"), self.msg("three"), self.msg("four") ]) + keys = [self.key(k) for k in ["1", "2", "3", "3", "4"]] + resps = [resp1, resp2, resp3, resp4, resp5] + msgs = [self.msg(m) for m in ["one", "two", "three", "four", "five"]] + + for key, resp, msg in zip(keys, resps, msgs): + k = hash(key) % 2 + offset = offsets[k] + self.assert_produce_response(resp, offset) + offsets[k] += 1 + messages[k].append(msg) + + self.assert_fetch_offset(0, start_offset0, messages[0]) + self.assert_fetch_offset(1, start_offset1, messages[1]) producer.stop() @@ -393,7 +401,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True) - resp = producer.send(self.topic, "key1", self.msg("one")) + resp = producer.send(self.topic, self.key("key1"), self.msg("one")) self.assertEquals(len(resp), 0) self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ]) diff --git a/test/testutil.py b/test/testutil.py index fba3869..7661cbc 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -89,6 +89,10 @@ class KafkaIntegrationTestCase(unittest.TestCase): return self._messages[s].encode('utf-8') + def key(self, k): + return k.encode('utf-8') + + class Timer(object): def __enter__(self): self.start = time.time() |