summaryrefslogtreecommitdiff
path: root/test/test_producer_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_producer_integration.py')
-rw-r--r--test/test_producer_integration.py64
1 files changed, 45 insertions, 19 deletions
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index a304e83..ca0da6a 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -15,7 +15,50 @@ from kafka.producer.base import Producer
from kafka.structs import FetchRequestPayload, ProduceRequestPayload
from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import KafkaIntegrationTestCase, kafka_versions
+from test.testutil import KafkaIntegrationTestCase, kafka_versions, current_offset
+
+# TODO: This duplicates a TestKafkaProducerIntegration method temporarily
+# while the migration to pytest is in progress
+def assert_produce_request(client, topic, messages, initial_offset, message_ct,
+ partition=0):
+ """Verify the correctness of a produce request
+ """
+ produce = ProduceRequestPayload(topic, partition, messages=messages)
+
+ # There should only be one response message from the server.
+ # This will throw an exception if there's more than one.
+ resp = client.send_produce_request([produce])
+ assert_produce_response(resp, initial_offset)
+
+ assert current_offset(client, topic, partition) == initial_offset + message_ct
+
+def assert_produce_response(resp, initial_offset):
+ """Verify that a produce response is well-formed
+ """
+ assert len(resp) == 1
+ assert resp[0].error == 0
+ assert resp[0].offset == initial_offset
+
+def test_produce_many_simple(simple_client, topic):
+ """Test multiple produces using the SimpleClient
+ """
+ start_offset = current_offset(simple_client, topic, 0)
+
+ assert_produce_request(
+ simple_client, topic,
+ [create_message(("Test message %d" % i).encode('utf-8'))
+ for i in range(100)],
+ start_offset,
+ 100,
+ )
+
+ assert_produce_request(
+ simple_client, topic,
+ [create_message(("Test message %d" % i).encode('utf-8'))
+ for i in range(100)],
+ start_offset+100,
+ 100,
+ )
class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@@ -26,7 +69,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
return
cls.zk = ZookeeperFixture.instance()
- cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
+ cls.server = KafkaFixture.instance(0, cls.zk)
@classmethod
def tearDownClass(cls): # noqa
@@ -36,23 +79,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
cls.server.close()
cls.zk.close()
- def test_produce_many_simple(self):
- start_offset = self.current_offset(self.topic, 0)
-
- self.assert_produce_request(
- [create_message(("Test message %d" % i).encode('utf-8'))
- for i in range(100)],
- start_offset,
- 100,
- )
-
- self.assert_produce_request(
- [create_message(("Test message %d" % i).encode('utf-8'))
- for i in range(100)],
- start_offset+100,
- 100,
- )
-
def test_produce_10k_simple(self):
start_offset = self.current_offset(self.topic, 0)