diff options
Diffstat (limited to 'test/test_producer_integration.py')
-rw-r--r-- | test/test_producer_integration.py | 64 |
1 files changed, 45 insertions, 19 deletions
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index a304e83..ca0da6a 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -15,7 +15,50 @@ from kafka.producer.base import Producer from kafka.structs import FetchRequestPayload, ProduceRequestPayload from test.fixtures import ZookeeperFixture, KafkaFixture -from test.testutil import KafkaIntegrationTestCase, kafka_versions +from test.testutil import KafkaIntegrationTestCase, kafka_versions, current_offset + +# TODO: This duplicates a TestKafkaProducerIntegration method temporarily +# while the migration to pytest is in progress +def assert_produce_request(client, topic, messages, initial_offset, message_ct, + partition=0): + """Verify the correctness of a produce request + """ + produce = ProduceRequestPayload(topic, partition, messages=messages) + + # There should only be one response message from the server. + # This will throw an exception if there's more than one. + resp = client.send_produce_request([produce]) + assert_produce_response(resp, initial_offset) + + assert current_offset(client, topic, partition) == initial_offset + message_ct + +def assert_produce_response(resp, initial_offset): + """Verify that a produce response is well-formed + """ + assert len(resp) == 1 + assert resp[0].error == 0 + assert resp[0].offset == initial_offset + +def test_produce_many_simple(simple_client, topic): + """Test multiple produces using the SimpleClient + """ + start_offset = current_offset(simple_client, topic, 0) + + assert_produce_request( + simple_client, topic, + [create_message(("Test message %d" % i).encode('utf-8')) + for i in range(100)], + start_offset, + 100, + ) + + assert_produce_request( + simple_client, topic, + [create_message(("Test message %d" % i).encode('utf-8')) + for i in range(100)], + start_offset+100, + 100, + ) class TestKafkaProducerIntegration(KafkaIntegrationTestCase): @@ -26,7 +69,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): return cls.zk = ZookeeperFixture.instance() - cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) + cls.server = KafkaFixture.instance(0, cls.zk) @classmethod def tearDownClass(cls): # noqa @@ -36,23 +79,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): cls.server.close() cls.zk.close() - def test_produce_many_simple(self): - start_offset = self.current_offset(self.topic, 0) - - self.assert_produce_request( - [create_message(("Test message %d" % i).encode('utf-8')) - for i in range(100)], - start_offset, - 100, - ) - - self.assert_produce_request( - [create_message(("Test message %d" % i).encode('utf-8')) - for i in range(100)], - start_offset+100, - 100, - ) - def test_produce_10k_simple(self): start_offset = self.current_offset(self.topic, 0) |