diff options
Diffstat (limited to 'test/test_integration.py')
-rw-r--r-- | test/test_integration.py | 82 |
1 files changed, 41 insertions, 41 deletions
diff --git a/test/test_integration.py b/test/test_integration.py index 5a22630..d0da523 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -284,8 +284,8 @@ class TestKafkaClient(KafkaTestCase): # Producer Tests def test_simple_producer(self): - producer = SimpleProducer(self.client, self.topic) - resp = producer.send_messages("one", "two") + producer = SimpleProducer(self.client) + resp = producer.send_messages(self.topic, "one", "two") # Will go to partition 0 self.assertEquals(len(resp), 1) @@ -293,7 +293,7 @@ class TestKafkaClient(KafkaTestCase): self.assertEquals(resp[0].offset, 0) # offset of first msg # Will go to partition 1 - resp = producer.send_messages("three") + resp = producer.send_messages(self.topic, "three") self.assertEquals(len(resp), 1) self.assertEquals(resp[0].error, 0) self.assertEquals(resp[0].offset, 0) # offset of first msg @@ -315,7 +315,7 @@ class TestKafkaClient(KafkaTestCase): self.assertEquals(messages[0].message.value, "three") # Will go to partition 0 - resp = producer.send_messages("four", "five") + resp = producer.send_messages(self.topic, "four", "five") self.assertEquals(len(resp), 1) self.assertEquals(resp[0].error, 0) self.assertEquals(resp[0].offset, 2) # offset of first msg @@ -323,12 +323,12 @@ class TestKafkaClient(KafkaTestCase): producer.stop() def test_round_robin_partitioner(self): - producer = KeyedProducer(self.client, self.topic, + producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner) - producer.send("key1", "one") - producer.send("key2", "two") - producer.send("key3", "three") - producer.send("key4", "four") + producer.send(self.topic, "key1", "one") + producer.send(self.topic, "key2", "two") + producer.send(self.topic, "key3", "three") + producer.send(self.topic, "key4", "four") fetch1 = FetchRequest(self.topic, 0, 0, 1024) fetch2 = FetchRequest(self.topic, 1, 0, 1024) @@ -357,12 +357,12 @@ class TestKafkaClient(KafkaTestCase): producer.stop() def test_hashed_partitioner(self): - producer = KeyedProducer(self.client, self.topic, + producer = KeyedProducer(self.client, partitioner=HashedPartitioner) - producer.send(1, "one") - producer.send(2, "two") - producer.send(3, "three") - producer.send(4, "four") + producer.send(self.topic, 1, "one") + producer.send(self.topic, 2, "two") + producer.send(self.topic, 3, "three") + producer.send(self.topic, 4, "four") fetch1 = FetchRequest(self.topic, 0, 0, 1024) fetch2 = FetchRequest(self.topic, 1, 0, 1024) @@ -391,9 +391,9 @@ class TestKafkaClient(KafkaTestCase): producer.stop() def test_acks_none(self): - producer = SimpleProducer(self.client, self.topic, + producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED) - resp = producer.send_messages("one") + resp = producer.send_messages(self.topic, "one") self.assertEquals(len(resp), 0) fetch = FetchRequest(self.topic, 0, 0, 1024) @@ -410,9 +410,9 @@ class TestKafkaClient(KafkaTestCase): producer.stop() def test_acks_local_write(self): - producer = SimpleProducer(self.client, self.topic, + producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE) - resp = producer.send_messages("one") + resp = producer.send_messages(self.topic, "one") self.assertEquals(len(resp), 1) fetch = FetchRequest(self.topic, 0, 0, 1024) @@ -430,9 +430,9 @@ class TestKafkaClient(KafkaTestCase): def test_acks_cluster_commit(self): producer = SimpleProducer( - self.client, self.topic, + self.client, req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT) - resp = producer.send_messages("one") + resp = producer.send_messages(self.topic, "one") self.assertEquals(len(resp), 1) fetch = FetchRequest(self.topic, 0, 0, 1024) @@ -449,8 +449,8 @@ class TestKafkaClient(KafkaTestCase): producer.stop() def test_async_simple_producer(self): - producer = SimpleProducer(self.client, self.topic, async=True) - resp = producer.send_messages("one") + producer = SimpleProducer(self.client, async=True) + resp = producer.send_messages(self.topic, "one") self.assertEquals(len(resp), 0) # Give it some time @@ -470,9 +470,9 @@ class TestKafkaClient(KafkaTestCase): producer.stop() def test_async_keyed_producer(self): - producer = KeyedProducer(self.client, self.topic, async=True) + producer = KeyedProducer(self.client, async=True) - resp = producer.send("key1", "one") + resp = producer.send(self.topic, "key1", "one") self.assertEquals(len(resp), 0) # Give it some time @@ -492,14 +492,14 @@ class TestKafkaClient(KafkaTestCase): producer.stop() def test_batched_simple_producer(self): - producer = SimpleProducer(self.client, self.topic, + producer = SimpleProducer(self.client, batch_send=True, batch_send_every_n=10, batch_send_every_t=20) # Send 5 messages and do a fetch msgs = ["message-%d" % i for i in range(0, 5)] - resp = producer.send_messages(*msgs) + resp = producer.send_messages(self.topic, *msgs) # Batch mode is async. No ack self.assertEquals(len(resp), 0) @@ -522,7 +522,7 @@ class TestKafkaClient(KafkaTestCase): # Send 5 more messages, wait for 2 seconds and do a fetch msgs = ["message-%d" % i for i in range(5, 10)] - resp = producer.send_messages(*msgs) + resp = producer.send_messages(self.topic, *msgs) # Give it some time time.sleep(2) @@ -542,9 +542,9 @@ class TestKafkaClient(KafkaTestCase): # Send 7 messages and wait for 20 seconds msgs = ["message-%d" % i for i in range(10, 15)] - resp = producer.send_messages(*msgs) + resp = producer.send_messages(self.topic, *msgs) msgs = ["message-%d" % i for i in range(15, 17)] - resp = producer.send_messages(*msgs) + resp = producer.send_messages(self.topic, *msgs) fetch1 = FetchRequest(self.topic, 0, 5, 1024) fetch2 = FetchRequest(self.topic, 1, 5, 1024) @@ -846,25 +846,25 @@ class TestFailover(KafkaTestCase): def test_switch_leader(self): key, topic, partition = random_string(5), self.topic, 0 - producer = SimpleProducer(self.client, topic) + producer = SimpleProducer(self.client) for i in range(1, 4): # XXX unfortunately, the conns dict needs to be warmed for this to work # XXX unfortunately, for warming to work, we need at least as many partitions as brokers - self._send_random_messages(producer, 10) + self._send_random_messages(producer, self.topic, 10) # kil leader for partition 0 broker = self._kill_leader(topic, partition) # expect failure, reload meta data with self.assertRaises(FailedPayloadsError): - producer.send_messages('part 1') - producer.send_messages('part 2') + producer.send_messages(self.topic, 'part 1') + producer.send_messages(self.topic, 'part 2') time.sleep(1) # send to new leader - self._send_random_messages(producer, 10) + self._send_random_messages(producer, self.topic, 10) broker.open() time.sleep(3) @@ -877,22 +877,22 @@ class TestFailover(KafkaTestCase): def test_switch_leader_async(self): key, topic, partition = random_string(5), self.topic, 0 - producer = SimpleProducer(self.client, topic, async=True) + producer = SimpleProducer(self.client, async=True) for i in range(1, 4): - self._send_random_messages(producer, 10) + self._send_random_messages(producer, self.topic, 10) # kil leader for partition 0 broker = self._kill_leader(topic, partition) # expect failure, reload meta data - producer.send_messages('part 1') - producer.send_messages('part 2') + producer.send_messages(self.topic, 'part 1') + producer.send_messages(self.topic, 'part 2') time.sleep(1) # send to new leader - self._send_random_messages(producer, 10) + self._send_random_messages(producer, self.topic, 10) broker.open() time.sleep(3) @@ -903,9 +903,9 @@ class TestFailover(KafkaTestCase): producer.stop() - def _send_random_messages(self, producer, n): + def _send_random_messages(self, producer, topic, n): for j in range(n): - resp = producer.send_messages(random_string(10)) + resp = producer.send_messages(topic, random_string(10)) if len(resp) > 0: self.assertEquals(resp[0].error, 0) time.sleep(1) # give it some time |