summary refs log tree commit diff
path: root/test/test_producer_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_producer_integration.py')
-rw-r--r--  test/test_producer_integration.py  366
1 file changed, 295 insertions, 71 deletions
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 8bab4d5..e148ad8 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -1,5 +1,5 @@
-import unittest
import time
+import unittest
from kafka import * # noqa
from kafka.common import * # noqa
@@ -14,48 +14,35 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
def setUpClass(cls): # noqa
cls.zk = ZookeeperFixture.instance()
cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
- cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port))
@classmethod
def tearDownClass(cls): # noqa
- cls.client.close()
cls.server.close()
cls.zk.close()
def test_produce_many_simple(self):
start_offset = self.current_offset(self.topic, 0)
- produce = ProduceRequest(self.topic, 0, messages=[
- create_message("Test message %d" % i) for i in range(100)
- ])
-
- resp = self.client.send_produce_request([produce])
- self.assertEqual(len(resp), 1) # Only one response
- self.assertEqual(resp[0].error, 0) # No error
- self.assertEqual(resp[0].offset, start_offset) # Initial offset of first message
+ self.assert_produce_request(
+ [ create_message("Test message %d" % i) for i in range(100) ],
+ start_offset,
+ 100,
+ )
- self.assertEqual(self.current_offset(self.topic, 0), start_offset+100)
-
- resp = self.client.send_produce_request([produce])
- self.assertEqual(len(resp), 1) # Only one response
- self.assertEqual(resp[0].error, 0) # No error
- self.assertEqual(resp[0].offset, start_offset+100) # Initial offset of first message
-
- self.assertEqual(self.current_offset(self.topic, 0), start_offset+200)
+ self.assert_produce_request(
+ [ create_message("Test message %d" % i) for i in range(100) ],
+ start_offset+100,
+ 100,
+ )
def test_produce_10k_simple(self):
start_offset = self.current_offset(self.topic, 0)
- produce = ProduceRequest(self.topic, 0, messages=[
- create_message("Test message %d" % i) for i in range(10000)
- ])
-
- resp = self.client.send_produce_request([produce])
- self.assertEqual(len(resp), 1) # Only one response
- self.assertEqual(resp[0].error, 0) # No error
- self.assertEqual(resp[0].offset, start_offset) # Initial offset of first message
-
- self.assertEqual(self.current_offset(self.topic, 0), start_offset+10000)
+ self.assert_produce_request(
+ [ create_message("Test message %d" % i) for i in range(10000) ],
+ start_offset,
+ 10000,
+ )
def test_produce_many_gzip(self):
start_offset = self.current_offset(self.topic, 0)
@@ -63,31 +50,23 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
message1 = create_gzip_message(["Gzipped 1 %d" % i for i in range(100)])
message2 = create_gzip_message(["Gzipped 2 %d" % i for i in range(100)])
- produce = ProduceRequest(self.topic, 0, messages=[message1, message2])
-
- resp = self.client.send_produce_request([produce])
- self.assertEqual(len(resp), 1) # Only one response
- self.assertEqual(resp[0].error, 0) # No error
- self.assertEqual(resp[0].offset, start_offset) # Initial offset of first message
-
- self.assertEqual(self.current_offset(self.topic, 0), start_offset+200)
+ self.assert_produce_request(
+ [ message1, message2 ],
+ start_offset,
+ 200,
+ )
@unittest.skip("All snappy integration tests fail with nosnappyjava")
def test_produce_many_snappy(self):
start_offset = self.current_offset(self.topic, 0)
- produce = ProduceRequest(self.topic, 0, messages=[
- create_snappy_message(["Snappy 1 %d" % i for i in range(100)]),
- create_snappy_message(["Snappy 2 %d" % i for i in range(100)]),
- ])
-
- resp = self.client.send_produce_request([produce])
-
- self.assertEqual(len(resp), 1) # Only one response
- self.assertEqual(resp[0].error, 0) # No error
- self.assertEqual(resp[0].offset, start_offset) # Initial offset of first message
-
- self.assertEqual(self.current_offset(self.topic, 0), start_offset+200)
+ self.assert_produce_request([
+ create_snappy_message(["Snappy 1 %d" % i for i in range(100)]),
+ create_snappy_message(["Snappy 2 %d" % i for i in range(100)]),
+ ],
+ start_offset,
+ 200,
+ )
def test_produce_mixed(self):
start_offset = self.current_offset(self.topic, 0)
@@ -103,37 +82,282 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
msg_count += 100
messages.append(create_snappy_message(["Snappy %d" % i for i in range(100)]))
- produce = ProduceRequest(self.topic, 0, messages=messages)
- resp = self.client.send_produce_request([produce])
-
- self.assertEqual(len(resp), 1) # Only one response
- self.assertEqual(resp[0].error, 0) # No error
- self.assertEqual(resp[0].offset, start_offset) # Initial offset of first message
-
- self.assertEqual(self.current_offset(self.topic, 0), start_offset+msg_count)
+ self.assert_produce_request(messages, start_offset, msg_count)
def test_produce_100k_gzipped(self):
start_offset = self.current_offset(self.topic, 0)
- req1 = ProduceRequest(self.topic, 0, messages=[
- create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)])
+ self.assert_produce_request([
+ create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)])
+ ],
+ start_offset,
+ 50000,
+ )
+
+ self.assert_produce_request([
+ create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)])
+ ],
+ start_offset+50000,
+ 50000,
+ )
+
+ ############################
+ # SimpleProducer Tests #
+ ############################
+
+ def test_simple_producer(self):
+ start_offset0 = self.current_offset(self.topic, 0)
+ start_offset1 = self.current_offset(self.topic, 1)
+ producer = SimpleProducer(self.client)
+
+ # Will go to partition 0
+ msg1, msg2, msg3, msg4, msg5 = [ str(uuid.uuid4()) for x in xrange(5) ]
+ resp = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
+ self.assert_produce_response(resp, start_offset0)
+
+ # Will go to partition 1
+ resp = producer.send_messages(self.topic, self.msg("three"))
+ self.assert_produce_response(resp, start_offset1)
+
+ self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two") ])
+ self.assert_fetch_offset(1, start_offset1, [ self.msg("three") ])
+
+ # Will go to partition 0
+ resp = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
+ self.assert_produce_response(resp, start_offset0+2)
+ self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ])
+
+ producer.stop()
+
+ def test_round_robin_partitioner(self):
+ msg1, msg2, msg3, msg4 = [ str(uuid.uuid4()) for _ in range(4) ]
+
+ start_offset0 = self.current_offset(self.topic, 0)
+ start_offset1 = self.current_offset(self.topic, 1)
+
+ producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
+ resp1 = producer.send(self.topic, "key1", self.msg("one"))
+ resp2 = producer.send(self.topic, "key2", self.msg("two"))
+ resp3 = producer.send(self.topic, "key3", self.msg("three"))
+ resp4 = producer.send(self.topic, "key4", self.msg("four"))
+
+ self.assert_produce_response(resp1, start_offset0+0)
+ self.assert_produce_response(resp2, start_offset1+0)
+ self.assert_produce_response(resp3, start_offset0+1)
+ self.assert_produce_response(resp4, start_offset1+1)
+
+ self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("three") ])
+ self.assert_fetch_offset(1, start_offset1, [ self.msg("two"), self.msg("four") ])
+
+ producer.stop()
+
+ def test_hashed_partitioner(self):
+ start_offset0 = self.current_offset(self.topic, 0)
+ start_offset1 = self.current_offset(self.topic, 1)
+
+ producer = KeyedProducer(self.client, partitioner=HashedPartitioner)
+ resp1 = producer.send(self.topic, 1, self.msg("one"))
+ resp2 = producer.send(self.topic, 2, self.msg("two"))
+ resp3 = producer.send(self.topic, 3, self.msg("three"))
+ resp4 = producer.send(self.topic, 3, self.msg("four"))
+ resp5 = producer.send(self.topic, 4, self.msg("five"))
+
+ self.assert_produce_response(resp1, start_offset1+0)
+ self.assert_produce_response(resp2, start_offset0+0)
+ self.assert_produce_response(resp3, start_offset1+1)
+ self.assert_produce_response(resp4, start_offset1+2)
+ self.assert_produce_response(resp5, start_offset0+1)
+
+ self.assert_fetch_offset(0, start_offset0, [ self.msg("two"), self.msg("five") ])
+ self.assert_fetch_offset(1, start_offset1, [ self.msg("one"), self.msg("three"), self.msg("four") ])
+
+ producer.stop()
+
+ def test_acks_none(self):
+ start_offset0 = self.current_offset(self.topic, 0)
+ start_offset1 = self.current_offset(self.topic, 1)
+
+ producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED)
+ resp = producer.send_messages(self.topic, self.msg("one"))
+ self.assertEquals(len(resp), 0)
+
+ self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+ producer.stop()
+
+ def test_acks_local_write(self):
+ start_offset0 = self.current_offset(self.topic, 0)
+ start_offset1 = self.current_offset(self.topic, 1)
+
+ producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
+ resp = producer.send_messages(self.topic, self.msg("one"))
+
+ self.assert_produce_response(resp, start_offset0)
+ self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+
+ producer.stop()
+
+ def test_acks_cluster_commit(self):
+ start_offset0 = self.current_offset(self.topic, 0)
+ start_offset1 = self.current_offset(self.topic, 1)
+
+ producer = SimpleProducer(
+ self.client,
+ req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT)
+
+ resp = producer.send_messages(self.topic, self.msg("one"))
+ self.assert_produce_response(resp, start_offset0)
+ self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+
+ producer.stop()
+
+ def test_batched_simple_producer__triggers_by_message(self):
+ start_offset0 = self.current_offset(self.topic, 0)
+ start_offset1 = self.current_offset(self.topic, 1)
+
+ producer = SimpleProducer(self.client,
+ batch_send=True,
+ batch_send_every_n=5,
+ batch_send_every_t=20)
+
+ # Send 5 messages and do a fetch
+ resp = producer.send_messages(self.topic,
+ self.msg("one"),
+ self.msg("two"),
+ self.msg("three"),
+ self.msg("four"),
+ )
+
+ # Batch mode is async. No ack
+ self.assertEquals(len(resp), 0)
+
+ # It hasn't sent yet
+ self.assert_fetch_offset(0, start_offset0, [])
+ self.assert_fetch_offset(1, start_offset1, [])
+
+ resp = producer.send_messages(self.topic,
+ self.msg("five"),
+ self.msg("six"),
+ self.msg("seven"),
+ )
+
+ # Batch mode is async. No ack
+ self.assertEquals(len(resp), 0)
+
+ self.assert_fetch_offset(0, start_offset0, [
+ self.msg("one"),
+ self.msg("two"),
+ self.msg("three"),
+ self.msg("four"),
])
- resp1 = self.client.send_produce_request([req1])
- self.assertEqual(len(resp1), 1) # Only one response
- self.assertEqual(resp1[0].error, 0) # No error
- self.assertEqual(resp1[0].offset, start_offset) # Initial offset of first message
+ self.assert_fetch_offset(1, start_offset1, [
+ self.msg("five"),
+ # self.msg("six"),
+ # self.msg("seven"),
+ ])
- self.assertEqual(self.current_offset(self.topic, 0), start_offset+50000)
+ producer.stop()
+
+ def test_batched_simple_producer__triggers_by_time(self):
+ start_offset0 = self.current_offset(self.topic, 0)
+ start_offset1 = self.current_offset(self.topic, 1)
+
+ producer = SimpleProducer(self.client,
+ batch_send=True,
+ batch_send_every_n=100,
+ batch_send_every_t=5)
+
+ # Send 5 messages and do a fetch
+ resp = producer.send_messages(self.topic,
+ self.msg("one"),
+ self.msg("two"),
+ self.msg("three"),
+ self.msg("four"),
+ )
+
+ # Batch mode is async. No ack
+ self.assertEquals(len(resp), 0)
+
+ # It hasn't sent yet
+ self.assert_fetch_offset(0, start_offset0, [])
+ self.assert_fetch_offset(1, start_offset1, [])
+
+ resp = producer.send_messages(self.topic,
+ self.msg("five"),
+ self.msg("six"),
+ self.msg("seven"),
+ )
+
+ # Batch mode is async. No ack
+ self.assertEquals(len(resp), 0)
+
+ # Wait the timeout out
+ time.sleep(5)
+
+ self.assert_fetch_offset(0, start_offset0, [
+ self.msg("one"),
+ self.msg("two"),
+ self.msg("three"),
+ self.msg("four"),
+ ])
- req2 = ProduceRequest(self.topic, 0, messages=[
- create_gzip_message(["Gzipped batch 2, message %d" % i for i in range(50000)])
+ self.assert_fetch_offset(1, start_offset1, [
+ self.msg("five"),
+ self.msg("six"),
+ self.msg("seven"),
])
- resp2 = self.client.send_produce_request([req2])
+ producer.stop()
+
+ def test_async_simple_producer(self):
+ start_offset0 = self.current_offset(self.topic, 0)
+ start_offset1 = self.current_offset(self.topic, 1)
+
+ producer = SimpleProducer(self.client, async=True)
+ resp = producer.send_messages(self.topic, self.msg("one"))
+ self.assertEquals(len(resp), 0)
+
+ self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+
+ producer.stop()
+
+ def test_async_keyed_producer(self):
+ start_offset0 = self.current_offset(self.topic, 0)
+ start_offset1 = self.current_offset(self.topic, 1)
+
+ producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True)
+
+ resp = producer.send(self.topic, "key1", self.msg("one"))
+ self.assertEquals(len(resp), 0)
+
+ self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+
+ producer.stop()
+
+ def assert_produce_request(self, messages, initial_offset, message_ct):
+ produce = ProduceRequest(self.topic, 0, messages=messages)
+
+ # There should only be one response message from the server.
+ # This will throw an exception if there's more than one.
+ resp = self.client.send_produce_request([ produce ])
+ self.assert_produce_response(resp, initial_offset)
+
+ self.assertEqual(self.current_offset(self.topic, 0), initial_offset + message_ct)
+
+ def assert_produce_response(self, resp, initial_offset):
+ self.assertEqual(len(resp), 1)
+ self.assertEqual(resp[0].error, 0)
+ self.assertEqual(resp[0].offset, initial_offset)
+
+ def assert_fetch_offset(self, partition, start_offset, expected_messages):
+ # There should only be one response message from the server.
+ # This will throw an exception if there's more than one.
+
+ resp, = self.client.send_fetch_request([ FetchRequest(self.topic, partition, start_offset, 1024) ])
- self.assertEqual(len(resp2), 1) # Only one response
- self.assertEqual(resp2[0].error, 0) # No error
- self.assertEqual(resp2[0].offset, start_offset+50000) # Initial offset of first message
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.partition, partition)
+ messages = [ x.message.value for x in resp.messages ]
- self.assertEqual(self.current_offset(self.topic, 0), start_offset+100000)
+ self.assertEqual(messages, expected_messages)
+ self.assertEquals(resp.highwaterMark, start_offset+len(expected_messages))