Diffstat (limited to 'test/test_integration.py')
-rw-r--r--  test/test_integration.py  82
1 file changed, 41 insertions, 41 deletions
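
The diff below updates the integration tests for a producer API change: the topic is no longer bound to the producer at construction time, but is passed to each send_messages()/send() call instead. A minimal before/after sketch, assuming a reachable broker; the import paths, client constructor, and topic name are assumptions (they vary by kafka-python version), while the producer call shapes come from the diff itself:

from kafka.client import KafkaClient        # assumed import path
from kafka.producer import SimpleProducer   # assumed import path

client = KafkaClient("localhost:9092")      # assumed broker address and constructor signature
topic = "my-topic"                          # hypothetical topic name

# Old style (removed by this change): topic fixed at construction.
# producer = SimpleProducer(client, topic)
# producer.send_messages("one", "two")

# New style: the producer is topic-agnostic; the topic leads every call,
# so a single producer instance can write to multiple topics.
producer = SimpleProducer(client)
resp = producer.send_messages(topic, "one", "two")
producer.stop()
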
diff --git a/test/test_integration.py b/test/test_integration.py
index 5a22630..d0da523 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -284,8 +284,8 @@ class TestKafkaClient(KafkaTestCase):
# Producer Tests
def test_simple_producer(self):
- producer = SimpleProducer(self.client, self.topic)
- resp = producer.send_messages("one", "two")
+ producer = SimpleProducer(self.client)
+ resp = producer.send_messages(self.topic, "one", "two")
# Will go to partition 0
self.assertEquals(len(resp), 1)
@@ -293,7 +293,7 @@ class TestKafkaClient(KafkaTestCase):
self.assertEquals(resp[0].offset, 0) # offset of first msg
# Will go to partition 1
- resp = producer.send_messages("three")
+ resp = producer.send_messages(self.topic, "three")
self.assertEquals(len(resp), 1)
self.assertEquals(resp[0].error, 0)
self.assertEquals(resp[0].offset, 0) # offset of first msg
@@ -315,7 +315,7 @@ class TestKafkaClient(KafkaTestCase):
self.assertEquals(messages[0].message.value, "three")
# Will go to partition 0
- resp = producer.send_messages("four", "five")
+ resp = producer.send_messages(self.topic, "four", "five")
self.assertEquals(len(resp), 1)
self.assertEquals(resp[0].error, 0)
self.assertEquals(resp[0].offset, 2) # offset of first msg
@@ -323,12 +323,12 @@ class TestKafkaClient(KafkaTestCase):
producer.stop()
def test_round_robin_partitioner(self):
- producer = KeyedProducer(self.client, self.topic,
+ producer = KeyedProducer(self.client,
partitioner=RoundRobinPartitioner)
- producer.send("key1", "one")
- producer.send("key2", "two")
- producer.send("key3", "three")
- producer.send("key4", "four")
+ producer.send(self.topic, "key1", "one")
+ producer.send(self.topic, "key2", "two")
+ producer.send(self.topic, "key3", "three")
+ producer.send(self.topic, "key4", "four")
fetch1 = FetchRequest(self.topic, 0, 0, 1024)
fetch2 = FetchRequest(self.topic, 1, 0, 1024)
@@ -357,12 +357,12 @@ class TestKafkaClient(KafkaTestCase):
producer.stop()
def test_hashed_partitioner(self):
- producer = KeyedProducer(self.client, self.topic,
+ producer = KeyedProducer(self.client,
partitioner=HashedPartitioner)
- producer.send(1, "one")
- producer.send(2, "two")
- producer.send(3, "three")
- producer.send(4, "four")
+ producer.send(self.topic, 1, "one")
+ producer.send(self.topic, 2, "two")
+ producer.send(self.topic, 3, "three")
+ producer.send(self.topic, 4, "four")
fetch1 = FetchRequest(self.topic, 0, 0, 1024)
fetch2 = FetchRequest(self.topic, 1, 0, 1024)
@@ -391,9 +391,9 @@ class TestKafkaClient(KafkaTestCase):
producer.stop()
def test_acks_none(self):
- producer = SimpleProducer(self.client, self.topic,
+ producer = SimpleProducer(self.client,
req_acks=SimpleProducer.ACK_NOT_REQUIRED)
- resp = producer.send_messages("one")
+ resp = producer.send_messages(self.topic, "one")
self.assertEquals(len(resp), 0)
fetch = FetchRequest(self.topic, 0, 0, 1024)
@@ -410,9 +410,9 @@ class TestKafkaClient(KafkaTestCase):
producer.stop()
def test_acks_local_write(self):
- producer = SimpleProducer(self.client, self.topic,
+ producer = SimpleProducer(self.client,
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
- resp = producer.send_messages("one")
+ resp = producer.send_messages(self.topic, "one")
self.assertEquals(len(resp), 1)
fetch = FetchRequest(self.topic, 0, 0, 1024)
@@ -430,9 +430,9 @@ class TestKafkaClient(KafkaTestCase):
def test_acks_cluster_commit(self):
producer = SimpleProducer(
- self.client, self.topic,
+ self.client,
req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT)
- resp = producer.send_messages("one")
+ resp = producer.send_messages(self.topic, "one")
self.assertEquals(len(resp), 1)
fetch = FetchRequest(self.topic, 0, 0, 1024)
@@ -449,8 +449,8 @@ class TestKafkaClient(KafkaTestCase):
producer.stop()
def test_async_simple_producer(self):
- producer = SimpleProducer(self.client, self.topic, async=True)
- resp = producer.send_messages("one")
+ producer = SimpleProducer(self.client, async=True)
+ resp = producer.send_messages(self.topic, "one")
self.assertEquals(len(resp), 0)
# Give it some time
@@ -470,9 +470,9 @@ class TestKafkaClient(KafkaTestCase):
producer.stop()
def test_async_keyed_producer(self):
- producer = KeyedProducer(self.client, self.topic, async=True)
+ producer = KeyedProducer(self.client, async=True)
- resp = producer.send("key1", "one")
+ resp = producer.send(self.topic, "key1", "one")
self.assertEquals(len(resp), 0)
# Give it some time
@@ -492,14 +492,14 @@ class TestKafkaClient(KafkaTestCase):
producer.stop()
def test_batched_simple_producer(self):
- producer = SimpleProducer(self.client, self.topic,
+ producer = SimpleProducer(self.client,
batch_send=True,
batch_send_every_n=10,
batch_send_every_t=20)
# Send 5 messages and do a fetch
msgs = ["message-%d" % i for i in range(0, 5)]
- resp = producer.send_messages(*msgs)
+ resp = producer.send_messages(self.topic, *msgs)
# Batch mode is async. No ack
self.assertEquals(len(resp), 0)
@@ -522,7 +522,7 @@ class TestKafkaClient(KafkaTestCase):
# Send 5 more messages, wait for 2 seconds and do a fetch
msgs = ["message-%d" % i for i in range(5, 10)]
- resp = producer.send_messages(*msgs)
+ resp = producer.send_messages(self.topic, *msgs)
# Give it some time
time.sleep(2)
@@ -542,9 +542,9 @@ class TestKafkaClient(KafkaTestCase):
# Send 7 messages and wait for 20 seconds
msgs = ["message-%d" % i for i in range(10, 15)]
- resp = producer.send_messages(*msgs)
+ resp = producer.send_messages(self.topic, *msgs)
msgs = ["message-%d" % i for i in range(15, 17)]
- resp = producer.send_messages(*msgs)
+ resp = producer.send_messages(self.topic, *msgs)
fetch1 = FetchRequest(self.topic, 0, 5, 1024)
fetch2 = FetchRequest(self.topic, 1, 5, 1024)
@@ -846,25 +846,25 @@ class TestFailover(KafkaTestCase):
def test_switch_leader(self):
key, topic, partition = random_string(5), self.topic, 0
- producer = SimpleProducer(self.client, topic)
+ producer = SimpleProducer(self.client)
for i in range(1, 4):
# XXX unfortunately, the conns dict needs to be warmed for this to work
# XXX unfortunately, for warming to work, we need at least as many partitions as brokers
- self._send_random_messages(producer, 10)
+ self._send_random_messages(producer, self.topic, 10)
# kill leader for partition 0
broker = self._kill_leader(topic, partition)
# expect failure, reload meta data
with self.assertRaises(FailedPayloadsError):
- producer.send_messages('part 1')
- producer.send_messages('part 2')
+ producer.send_messages(self.topic, 'part 1')
+ producer.send_messages(self.topic, 'part 2')
time.sleep(1)
# send to new leader
- self._send_random_messages(producer, 10)
+ self._send_random_messages(producer, self.topic, 10)
broker.open()
time.sleep(3)
@@ -877,22 +877,22 @@ class TestFailover(KafkaTestCase):
def test_switch_leader_async(self):
key, topic, partition = random_string(5), self.topic, 0
- producer = SimpleProducer(self.client, topic, async=True)
+ producer = SimpleProducer(self.client, async=True)
for i in range(1, 4):
- self._send_random_messages(producer, 10)
+ self._send_random_messages(producer, self.topic, 10)
# kill leader for partition 0
broker = self._kill_leader(topic, partition)
# expect failure, reload meta data
- producer.send_messages('part 1')
- producer.send_messages('part 2')
+ producer.send_messages(self.topic, 'part 1')
+ producer.send_messages(self.topic, 'part 2')
time.sleep(1)
# send to new leader
- self._send_random_messages(producer, 10)
+ self._send_random_messages(producer, self.topic, 10)
broker.open()
time.sleep(3)
@@ -903,9 +903,9 @@ class TestFailover(KafkaTestCase):
producer.stop()
- def _send_random_messages(self, producer, n):
+ def _send_random_messages(self, producer, topic, n):
for j in range(n):
- resp = producer.send_messages(random_string(10))
+ resp = producer.send_messages(topic, random_string(10))
if len(resp) > 0:
self.assertEquals(resp[0].error, 0)
time.sleep(1) # give it some time
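
The same topic-per-call pattern applies to KeyedProducer and to producers constructed with configuration-only keyword arguments (partitioner, req_acks, async, batch_send_every_n/batch_send_every_t): the constructor now takes just the client plus options, and the topic becomes the first positional argument of send(). A hedged sketch, with import paths and client setup assumed as in the example above:

from kafka.client import KafkaClient                 # assumed import path
from kafka.producer import KeyedProducer             # assumed import path
from kafka.partitioner import RoundRobinPartitioner  # assumed import path

client = KafkaClient("localhost:9092")               # assumed; signature varies by version
producer = KeyedProducer(client, partitioner=RoundRobinPartitioner)

# Topic first, then the key, then the message; the partitioner still
# chooses the partition from the key as before.
producer.send("my-topic", "key1", "one")
producer.send("my-topic", "key2", "two")
producer.stop()
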