Diffstat (limited to 'test/test_producer_integration.py')
-rw-r--r-- | test/test_producer_integration.py | 529
1 files changed, 0 insertions, 529 deletions
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
deleted file mode 100644
index 8f32cf8..0000000
--- a/test/test_producer_integration.py
+++ /dev/null
@@ -1,529 +0,0 @@
-import os
-import time
-import uuid
-
-import pytest
-from kafka.vendor.six.moves import range
-
-from kafka import (
-    SimpleProducer, KeyedProducer,
-    create_message, create_gzip_message, create_snappy_message,
-    RoundRobinPartitioner, HashedPartitioner
-)
-from kafka.codec import has_snappy
-from kafka.errors import UnknownTopicOrPartitionError, LeaderNotAvailableError
-from kafka.producer.base import Producer
-from kafka.protocol.message import PartialMessage
-from kafka.structs import FetchRequestPayload, ProduceRequestPayload
-
-from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import KafkaIntegrationTestCase, env_kafka_version, current_offset
-
-
-# TODO: This duplicates a TestKafkaProducerIntegration method temporarily
-# while the migration to pytest is in progress
-def assert_produce_request(client, topic, messages, initial_offset, message_ct,
-                           partition=0):
-    """Verify the correctness of a produce request
-    """
-    produce = ProduceRequestPayload(topic, partition, messages=messages)
-
-    # There should only be one response message from the server.
-    # This will throw an exception if there's more than one.
-    resp = client.send_produce_request([produce])
-    assert_produce_response(resp, initial_offset)
-
-    assert current_offset(client, topic, partition) == initial_offset + message_ct
-
-
-def assert_produce_response(resp, initial_offset):
-    """Verify that a produce response is well-formed
-    """
-    assert len(resp) == 1
-    assert resp[0].error == 0
-    assert resp[0].offset == initial_offset
-
-
-@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-def test_produce_many_simple(simple_client, topic):
-    """Test multiple produces using the SimpleClient
-    """
-    start_offset = current_offset(simple_client, topic, 0)
-
-    assert_produce_request(
-        simple_client, topic,
-        [create_message(("Test message %d" % i).encode('utf-8'))
-         for i in range(100)],
-        start_offset,
-        100,
-    )
-
-    assert_produce_request(
-        simple_client, topic,
-        [create_message(("Test message %d" % i).encode('utf-8'))
-         for i in range(100)],
-        start_offset+100,
-        100,
-    )
-
-
-class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
-
-    @classmethod
-    def setUpClass(cls):  # noqa
-        if not os.environ.get('KAFKA_VERSION'):
-            return
-
-        cls.zk = ZookeeperFixture.instance()
-        cls.server = KafkaFixture.instance(0, cls.zk)
-
-    @classmethod
-    def tearDownClass(cls):  # noqa
-        if not os.environ.get('KAFKA_VERSION'):
-            return
-
-        cls.server.close()
-        cls.zk.close()
-
-    def test_produce_10k_simple(self):
-        start_offset = self.current_offset(self.topic, 0)
-
-        self.assert_produce_request(
-            [create_message(("Test message %d" % i).encode('utf-8'))
-             for i in range(10000)],
-            start_offset,
-            10000,
-        )
-
-    def test_produce_many_gzip(self):
-        start_offset = self.current_offset(self.topic, 0)
-
-        message1 = create_gzip_message([
-            (("Gzipped 1 %d" % i).encode('utf-8'), None) for i in range(100)])
-        message2 = create_gzip_message([
-            (("Gzipped 2 %d" % i).encode('utf-8'), None) for i in range(100)])
-
-        self.assert_produce_request(
-            [ message1, message2 ],
-            start_offset,
-            200,
-        )
-
-    def test_produce_many_snappy(self):
-        self.skipTest("All snappy integration tests fail with nosnappyjava")
-        start_offset = self.current_offset(self.topic, 0)
-
-        self.assert_produce_request([
-            create_snappy_message([("Snappy 1 %d" % i, None) for i in range(100)]),
-            create_snappy_message([("Snappy 2 %d" % i, None) for i in range(100)]),
-            ],
-            start_offset,
-            200,
-        )
-
-    def test_produce_mixed(self):
-        start_offset = self.current_offset(self.topic, 0)
-
-        msg_count = 1+100
-        messages = [
-            create_message(b"Just a plain message"),
-            create_gzip_message([
-                (("Gzipped %d" % i).encode('utf-8'), None) for i in range(100)]),
-        ]
-
-        # All snappy integration tests fail with nosnappyjava
-        if False and has_snappy():
-            msg_count += 100
-            messages.append(create_snappy_message([("Snappy %d" % i, None) for i in range(100)]))
-
-        self.assert_produce_request(messages, start_offset, msg_count)
-
-    def test_produce_100k_gzipped(self):
-        start_offset = self.current_offset(self.topic, 0)
-
-        self.assert_produce_request([
-            create_gzip_message([
-                (("Gzipped batch 1, message %d" % i).encode('utf-8'), None)
-                for i in range(50000)])
-            ],
-            start_offset,
-            50000,
-        )
-
-        self.assert_produce_request([
-            create_gzip_message([
-                (("Gzipped batch 1, message %d" % i).encode('utf-8'), None)
-                for i in range(50000)])
-            ],
-            start_offset+50000,
-            50000,
-        )
-
-    ############################
-    #   SimpleProducer Tests   #
-    ############################
-
-    def test_simple_producer_new_topic(self):
-        producer = SimpleProducer(self.client)
-        resp = producer.send_messages('new_topic', self.msg('foobar'))
-        self.assert_produce_response(resp, 0)
-        producer.stop()
-
-    def test_simple_producer(self):
-        partitions = self.client.get_partition_ids_for_topic(self.topic)
-        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
-
-        producer = SimpleProducer(self.client, random_start=False)
-
-        # Goes to first partition, randomly.
-        resp = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
-        self.assert_produce_response(resp, start_offsets[0])
-
-        # Goes to the next partition, randomly.
-        resp = producer.send_messages(self.topic, self.msg("three"))
-        self.assert_produce_response(resp, start_offsets[1])
-
-        self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("two") ])
-        self.assert_fetch_offset(partitions[1], start_offsets[1], [ self.msg("three") ])
-
-        # Goes back to the first partition because there's only two partitions
-        resp = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
-        self.assert_produce_response(resp, start_offsets[0]+2)
-        self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ])
-
-        producer.stop()
-
-    def test_producer_random_order(self):
-        producer = SimpleProducer(self.client, random_start=True)
-        resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
-        resp2 = producer.send_messages(self.topic, self.msg("three"))
-        resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
-
-        self.assertEqual(resp1[0].partition, resp3[0].partition)
-        self.assertNotEqual(resp1[0].partition, resp2[0].partition)
-
-    def test_producer_ordered_start(self):
-        producer = SimpleProducer(self.client, random_start=False)
-        resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
-        resp2 = producer.send_messages(self.topic, self.msg("three"))
-        resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
-
-        self.assertEqual(resp1[0].partition, 0)
-        self.assertEqual(resp2[0].partition, 1)
-        self.assertEqual(resp3[0].partition, 0)
-
-    def test_async_simple_producer(self):
-        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
-        start_offset = self.current_offset(self.topic, partition)
-
-        producer = SimpleProducer(self.client, async_send=True, random_start=False)
-        resp = producer.send_messages(self.topic, self.msg("one"))
-        self.assertEqual(len(resp), 0)
-
-        # flush messages
-        producer.stop()
-
-        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
-
-
-    def test_batched_simple_producer__triggers_by_message(self):
-        partitions = self.client.get_partition_ids_for_topic(self.topic)
-        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
-
-        # Configure batch producer
-        batch_messages = 5
-        batch_interval = 5
-        producer = SimpleProducer(
-            self.client,
-            async_send=True,
-            batch_send_every_n=batch_messages,
-            batch_send_every_t=batch_interval,
-            random_start=False)
-
-        # Send 4 messages -- should not trigger a batch
-        resp = producer.send_messages(
-            self.topic,
-            self.msg("one"),
-            self.msg("two"),
-            self.msg("three"),
-            self.msg("four"),
-        )
-
-        # Batch mode is async. No ack
-        self.assertEqual(len(resp), 0)
-
-        # It hasn't sent yet
-        self.assert_fetch_offset(partitions[0], start_offsets[0], [])
-        self.assert_fetch_offset(partitions[1], start_offsets[1], [])
-
-        # send 3 more messages -- should trigger batch on first 5
-        resp = producer.send_messages(
-            self.topic,
-            self.msg("five"),
-            self.msg("six"),
-            self.msg("seven"),
-        )
-
-        # Batch mode is async. No ack
-        self.assertEqual(len(resp), 0)
-
-        # Wait until producer has pulled all messages from internal queue
-        # this should signal that the first batch was sent, and the producer
-        # is now waiting for enough messages to batch again (or a timeout)
-        timeout = 5
-        start = time.time()
-        while not producer.queue.empty():
-            if time.time() - start > timeout:
-                self.fail('timeout waiting for producer queue to empty')
-            time.sleep(0.1)
-
-        # send messages groups all *msgs in a single call to the same partition
-        # so we should see all messages from the first call in one partition
-        self.assert_fetch_offset(partitions[0], start_offsets[0], [
-            self.msg("one"),
-            self.msg("two"),
-            self.msg("three"),
-            self.msg("four"),
-        ])
-
-        # Because we are batching every 5 messages, we should only see one
-        self.assert_fetch_offset(partitions[1], start_offsets[1], [
-            self.msg("five"),
-        ])
-
-        producer.stop()
-
-    def test_batched_simple_producer__triggers_by_time(self):
-        self.skipTest("Flakey test -- should be refactored or removed")
-        partitions = self.client.get_partition_ids_for_topic(self.topic)
-        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
-
-        batch_interval = 5
-        producer = SimpleProducer(
-            self.client,
-            async_send=True,
-            batch_send_every_n=100,
-            batch_send_every_t=batch_interval,
-            random_start=False)
-
-        # Send 5 messages and do a fetch
-        resp = producer.send_messages(
-            self.topic,
-            self.msg("one"),
-            self.msg("two"),
-            self.msg("three"),
-            self.msg("four"),
-        )
-
-        # Batch mode is async. No ack
-        self.assertEqual(len(resp), 0)
-
-        # It hasn't sent yet
-        self.assert_fetch_offset(partitions[0], start_offsets[0], [])
-        self.assert_fetch_offset(partitions[1], start_offsets[1], [])
-
-        resp = producer.send_messages(self.topic,
-            self.msg("five"),
-            self.msg("six"),
-            self.msg("seven"),
-        )
-
-        # Batch mode is async. No ack
-        self.assertEqual(len(resp), 0)
-
-        # Wait the timeout out
-        time.sleep(batch_interval)
-
-        self.assert_fetch_offset(partitions[0], start_offsets[0], [
-            self.msg("one"),
-            self.msg("two"),
-            self.msg("three"),
-            self.msg("four"),
-        ])
-
-        self.assert_fetch_offset(partitions[1], start_offsets[1], [
-            self.msg("five"),
-            self.msg("six"),
-            self.msg("seven"),
-        ])
-
-        producer.stop()
-
-
-    ############################
-    #   KeyedProducer Tests    #
-    ############################
-
-    @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-    def test_keyedproducer_null_payload(self):
-        partitions = self.client.get_partition_ids_for_topic(self.topic)
-        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
-
-        producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
-        key = "test"
-
-        resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
-        self.assert_produce_response(resp, start_offsets[0])
-        resp = producer.send_messages(self.topic, self.key("key2"), None)
-        self.assert_produce_response(resp, start_offsets[1])
-        resp = producer.send_messages(self.topic, self.key("key3"), None)
-        self.assert_produce_response(resp, start_offsets[0]+1)
-        resp = producer.send_messages(self.topic, self.key("key4"), self.msg("four"))
-        self.assert_produce_response(resp, start_offsets[1]+1)
-
-        self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), None ])
-        self.assert_fetch_offset(partitions[1], start_offsets[1], [ None, self.msg("four") ])
-
-        producer.stop()
-
-    def test_round_robin_partitioner(self):
-        partitions = self.client.get_partition_ids_for_topic(self.topic)
-        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
-
-        producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
-        resp1 = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
-        resp2 = producer.send_messages(self.topic, self.key("key2"), self.msg("two"))
-        resp3 = producer.send_messages(self.topic, self.key("key3"), self.msg("three"))
-        resp4 = producer.send_messages(self.topic, self.key("key4"), self.msg("four"))
-
-        self.assert_produce_response(resp1, start_offsets[0]+0)
-        self.assert_produce_response(resp2, start_offsets[1]+0)
-        self.assert_produce_response(resp3, start_offsets[0]+1)
-        self.assert_produce_response(resp4, start_offsets[1]+1)
-
-        self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("three") ])
-        self.assert_fetch_offset(partitions[1], start_offsets[1], [ self.msg("two"), self.msg("four") ])
-
-        producer.stop()
-
-    def test_hashed_partitioner(self):
-        partitions = self.client.get_partition_ids_for_topic(self.topic)
-        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
-
-        producer = KeyedProducer(self.client, partitioner=HashedPartitioner)
-        resp1 = producer.send_messages(self.topic, self.key("1"), self.msg("one"))
-        resp2 = producer.send_messages(self.topic, self.key("2"), self.msg("two"))
-        resp3 = producer.send_messages(self.topic, self.key("3"), self.msg("three"))
-        resp4 = producer.send_messages(self.topic, self.key("3"), self.msg("four"))
-        resp5 = producer.send_messages(self.topic, self.key("4"), self.msg("five"))
-
-        offsets = {partitions[0]: start_offsets[0], partitions[1]: start_offsets[1]}
-        messages = {partitions[0]: [], partitions[1]: []}
-
-        keys = [self.key(k) for k in ["1", "2", "3", "3", "4"]]
-        resps = [resp1, resp2, resp3, resp4, resp5]
-        msgs = [self.msg(m) for m in ["one", "two", "three", "four", "five"]]
-
-        for key, resp, msg in zip(keys, resps, msgs):
-            k = hash(key) % 2
-            partition = partitions[k]
-            offset = offsets[partition]
-            self.assert_produce_response(resp, offset)
-            offsets[partition] += 1
-            messages[partition].append(msg)
-
-        self.assert_fetch_offset(partitions[0], start_offsets[0], messages[partitions[0]])
-        self.assert_fetch_offset(partitions[1], start_offsets[1], messages[partitions[1]])
-
-        producer.stop()
-
-    def test_async_keyed_producer(self):
-        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
-        start_offset = self.current_offset(self.topic, partition)
-
-        producer = KeyedProducer(self.client,
-                                 partitioner=RoundRobinPartitioner,
-                                 async_send=True,
-                                 batch_send_every_t=1)
-
-        resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
-        self.assertEqual(len(resp), 0)
-
-        # wait for the server to report a new highwatermark
-        while self.current_offset(self.topic, partition) == start_offset:
-            time.sleep(0.1)
-
-        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
-
-        producer.stop()
-
-    ############################
-    #    Producer ACK Tests    #
-    ############################
-
-    def test_acks_none(self):
-        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
-        start_offset = self.current_offset(self.topic, partition)
-
-        producer = Producer(
-            self.client,
-            req_acks=Producer.ACK_NOT_REQUIRED,
-        )
-        resp = producer.send_messages(self.topic, partition, self.msg("one"))
-
-        # No response from produce request with no acks required
-        self.assertEqual(len(resp), 0)
-
-        # But the message should still have been delivered
-        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
-        producer.stop()
-
-    def test_acks_local_write(self):
-        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
-        start_offset = self.current_offset(self.topic, partition)
-
-        producer = Producer(
-            self.client,
-            req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
-        )
-        resp = producer.send_messages(self.topic, partition, self.msg("one"))
-
-        self.assert_produce_response(resp, start_offset)
-        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
-
-        producer.stop()
-
-    def test_acks_cluster_commit(self):
-        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
-        start_offset = self.current_offset(self.topic, partition)
-
-        producer = Producer(
-            self.client,
-            req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
-        )
-
-        resp = producer.send_messages(self.topic, partition, self.msg("one"))
-        self.assert_produce_response(resp, start_offset)
-        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
-
-        producer.stop()
-
-    def assert_produce_request(self, messages, initial_offset, message_ct,
-                               partition=0):
-        produce = ProduceRequestPayload(self.topic, partition, messages=messages)
-
-        # There should only be one response message from the server.
-        # This will throw an exception if there's more than one.
-        resp = self.client.send_produce_request([ produce ])
-        self.assert_produce_response(resp, initial_offset)
-
-        self.assertEqual(self.current_offset(self.topic, partition), initial_offset + message_ct)
-
-    def assert_produce_response(self, resp, initial_offset):
-        self.assertEqual(len(resp), 1)
-        self.assertEqual(resp[0].error, 0)
-        self.assertEqual(resp[0].offset, initial_offset)
-
-    def assert_fetch_offset(self, partition, start_offset, expected_messages):
-        # There should only be one response message from the server.
-        # This will throw an exception if there's more than one.
-
-        resp, = self.client.send_fetch_request([FetchRequestPayload(self.topic, partition, start_offset, 1024)])
-
-        self.assertEqual(resp.error, 0)
-        self.assertEqual(resp.partition, partition)
-        messages = [ x.message.value for x in resp.messages
-                     if not isinstance(x.message, PartialMessage) ]
-
-        self.assertEqual(messages, expected_messages)
-        self.assertEqual(resp.highwaterMark, start_offset+len(expected_messages))
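For orientation: the tests removed above exercised the legacy SimpleProducer/KeyedProducer interfaces, which kafka-python deprecated in favor of KafkaProducer. A minimal sketch of the same produce-and-verify pattern against the modern API follows; the broker address, topic name, and timeout are illustrative assumptions, not part of this change.

from kafka import KafkaProducer

# Connect to an assumed local broker; acks=1 corresponds roughly to the
# legacy Producer.ACK_AFTER_LOCAL_WRITE setting tested above.
producer = KafkaProducer(bootstrap_servers='localhost:9092', acks=1)

# send() is asynchronous and returns a future; get() blocks for the broker
# ack, mirroring the offset check in assert_produce_response() above.
metadata = producer.send('my-topic', b'one').get(timeout=10)
assert metadata.offset >= 0  # server-assigned offset of the appended message

producer.flush()  # drain any buffered batches, as SimpleProducer.stop() did
producer.close()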