diff options
Diffstat (limited to 'test/test_producer_integration.py')
| -rw-r--r-- | test/test_producer_integration.py | 281 | 
1 files changed, 160 insertions, 121 deletions
| diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index d5831d3..38df69f 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -14,6 +14,7 @@ from kafka.common import (      FetchRequest, ProduceRequest,      UnknownTopicOrPartitionError, LeaderNotAvailableError  ) +from kafka.producer.base import Producer  from test.fixtures import ZookeeperFixture, KafkaFixture  from test.testutil import KafkaIntegrationTestCase, kafka_versions @@ -139,25 +140,26 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):      @kafka_versions("all")      def test_simple_producer(self): -        start_offset0 = self.current_offset(self.topic, 0) -        start_offset1 = self.current_offset(self.topic, 1) +        partitions = self.client.get_partition_ids_for_topic(self.topic) +        start_offsets = [self.current_offset(self.topic, p) for p in partitions] +          producer = SimpleProducer(self.client, random_start=False)          # Goes to first partition, randomly.          resp = producer.send_messages(self.topic, self.msg("one"), self.msg("two")) -        self.assert_produce_response(resp, start_offset0) +        self.assert_produce_response(resp, start_offsets[0])          # Goes to the next partition, randomly.          resp = producer.send_messages(self.topic, self.msg("three")) -        self.assert_produce_response(resp, start_offset1) +        self.assert_produce_response(resp, start_offsets[1]) -        self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two") ]) -        self.assert_fetch_offset(1, start_offset1, [ self.msg("three") ]) +        self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("two") ]) +        self.assert_fetch_offset(partitions[1], start_offsets[1], [ self.msg("three") ])          # Goes back to the first partition because there's only two partitions          resp = producer.send_messages(self.topic, self.msg("four"), self.msg("five")) -        self.assert_produce_response(resp, start_offset0+2) -        self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ]) +        self.assert_produce_response(resp, start_offsets[0]+2) +        self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ])          producer.stop() @@ -193,110 +195,38 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):          self.assertEqual(resp3[0].partition, 0)      @kafka_versions("all") -    def test_round_robin_partitioner(self): -        start_offset0 = self.current_offset(self.topic, 0) -        start_offset1 = self.current_offset(self.topic, 1) - -        producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner) -        resp1 = producer.send(self.topic, self.key("key1"), self.msg("one")) -        resp2 = producer.send(self.topic, self.key("key2"), self.msg("two")) -        resp3 = producer.send(self.topic, self.key("key3"), self.msg("three")) -        resp4 = producer.send(self.topic, self.key("key4"), self.msg("four")) - -        self.assert_produce_response(resp1, start_offset0+0) -        self.assert_produce_response(resp2, start_offset1+0) -        self.assert_produce_response(resp3, start_offset0+1) -        self.assert_produce_response(resp4, start_offset1+1) - -        self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("three") ]) -        self.assert_fetch_offset(1, start_offset1, [ self.msg("two"), self.msg("four")  ]) - -        producer.stop() - -    @kafka_versions("all") -    def test_hashed_partitioner(self): -        start_offset0 = self.current_offset(self.topic, 0) -        start_offset1 = self.current_offset(self.topic, 1) - -        producer = KeyedProducer(self.client, partitioner=HashedPartitioner) -        resp1 = producer.send(self.topic, self.key("1"), self.msg("one")) -        resp2 = producer.send(self.topic, self.key("2"), self.msg("two")) -        resp3 = producer.send(self.topic, self.key("3"), self.msg("three")) -        resp4 = producer.send(self.topic, self.key("3"), self.msg("four")) -        resp5 = producer.send(self.topic, self.key("4"), self.msg("five")) - -        offsets = {0: start_offset0, 1: start_offset1} -        messages = {0: [], 1: []} - -        keys = [self.key(k) for k in ["1", "2", "3", "3", "4"]] -        resps = [resp1, resp2, resp3, resp4, resp5] -        msgs = [self.msg(m) for m in ["one", "two", "three", "four", "five"]] - -        for key, resp, msg in zip(keys, resps, msgs): -            k = hash(key) % 2 -            offset = offsets[k] -            self.assert_produce_response(resp, offset) -            offsets[k] += 1 -            messages[k].append(msg) - -        self.assert_fetch_offset(0, start_offset0, messages[0]) -        self.assert_fetch_offset(1, start_offset1, messages[1]) - -        producer.stop() - -    @kafka_versions("all") -    def test_acks_none(self): -        start_offset0 = self.current_offset(self.topic, 0) +    def test_async_simple_producer(self): +        partition = self.client.get_partition_ids_for_topic(self.topic)[0] +        start_offset = self.current_offset(self.topic, partition) -        producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED, -            random_start=False) +        producer = SimpleProducer(self.client, async=True, random_start=False)          resp = producer.send_messages(self.topic, self.msg("one"))          self.assertEqual(len(resp), 0) -        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ]) -        producer.stop() - -    @kafka_versions("all") -    def test_acks_local_write(self): -        start_offset0 = self.current_offset(self.topic, 0) - -        producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE, -            random_start=False) -        resp = producer.send_messages(self.topic, self.msg("one")) - -        self.assert_produce_response(resp, start_offset0) -        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ]) +        # wait for the server to report a new highwatermark +        while self.current_offset(self.topic, partition) == start_offset: +          time.sleep(0.1) -        producer.stop() - -    @kafka_versions("all") -    def test_acks_cluster_commit(self): -        start_offset0 = self.current_offset(self.topic, 0) - -        producer = SimpleProducer( -            self.client, -            req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT, -            random_start=False) - -        resp = producer.send_messages(self.topic, self.msg("one")) -        self.assert_produce_response(resp, start_offset0) -        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ]) +        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])          producer.stop()      @kafka_versions("all")      def test_batched_simple_producer__triggers_by_message(self): -        start_offset0 = self.current_offset(self.topic, 0) -        start_offset1 = self.current_offset(self.topic, 1) +        partitions = self.client.get_partition_ids_for_topic(self.topic) +        start_offsets = [self.current_offset(self.topic, p) for p in partitions] +        # Configure batch producer +        batch_messages = 5 +        batch_interval = 5          producer = SimpleProducer(              self.client,              batch_send=True, -            batch_send_every_n=5, -            batch_send_every_t=20, +            batch_send_every_n=batch_messages, +            batch_send_every_t=batch_interval,              random_start=False) -        # Send 5 messages and do a fetch +        # Send 4 messages -- should not trigger a batch          resp = producer.send_messages(self.topic,              self.msg("one"),              self.msg("two"), @@ -308,9 +238,10 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):          self.assertEqual(len(resp), 0)          # It hasn't sent yet -        self.assert_fetch_offset(0, start_offset0, []) -        self.assert_fetch_offset(1, start_offset1, []) +        self.assert_fetch_offset(partitions[0], start_offsets[0], []) +        self.assert_fetch_offset(partitions[1], start_offsets[1], []) +        # send 3 more messages -- should trigger batch on first 5          resp = producer.send_messages(self.topic,              self.msg("five"),              self.msg("six"), @@ -320,30 +251,32 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):          # Batch mode is async. No ack          self.assertEqual(len(resp), 0) -        self.assert_fetch_offset(0, start_offset0, [ +        # send messages groups all *msgs in a single call to the same partition +        # so we should see all messages from the first call in one partition +        self.assert_fetch_offset(partitions[0], start_offsets[0], [              self.msg("one"),              self.msg("two"),              self.msg("three"),              self.msg("four"),          ]) -        self.assert_fetch_offset(1, start_offset1, [ +        # Because we are batching every 5 messages, we should only see one +        self.assert_fetch_offset(partitions[1], start_offsets[1], [              self.msg("five"), -        #    self.msg("six"), -        #    self.msg("seven"),          ])          producer.stop()      @kafka_versions("all")      def test_batched_simple_producer__triggers_by_time(self): -        start_offset0 = self.current_offset(self.topic, 0) -        start_offset1 = self.current_offset(self.topic, 1) +        partitions = self.client.get_partition_ids_for_topic(self.topic) +        start_offsets = [self.current_offset(self.topic, p) for p in partitions] +        batch_interval = 5          producer = SimpleProducer(self.client,              batch_send=True,              batch_send_every_n=100, -            batch_send_every_t=5, +            batch_send_every_t=batch_interval,              random_start=False)          # Send 5 messages and do a fetch @@ -358,8 +291,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):          self.assertEqual(len(resp), 0)          # It hasn't sent yet -        self.assert_fetch_offset(0, start_offset0, []) -        self.assert_fetch_offset(1, start_offset1, []) +        self.assert_fetch_offset(partitions[0], start_offsets[0], []) +        self.assert_fetch_offset(partitions[1], start_offsets[1], [])          resp = producer.send_messages(self.topic,              self.msg("five"), @@ -371,16 +304,16 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):          self.assertEqual(len(resp), 0)          # Wait the timeout out -        time.sleep(5) +        time.sleep(batch_interval) -        self.assert_fetch_offset(0, start_offset0, [ +        self.assert_fetch_offset(partitions[0], start_offsets[0], [              self.msg("one"),              self.msg("two"),              self.msg("three"),              self.msg("four"),          ]) -        self.assert_fetch_offset(1, start_offset1, [ +        self.assert_fetch_offset(partitions[1], start_offsets[1], [              self.msg("five"),              self.msg("six"),              self.msg("seven"), @@ -388,40 +321,146 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):          producer.stop() + +    ############################ +    #   KeyedProducer Tests    # +    ############################ +      @kafka_versions("all") -    def test_async_simple_producer(self): -        start_offset0 = self.current_offset(self.topic, 0) +    def test_round_robin_partitioner(self): +        partitions = self.client.get_partition_ids_for_topic(self.topic) +        start_offsets = [self.current_offset(self.topic, p) for p in partitions] -        producer = SimpleProducer(self.client, async=True, random_start=False) -        resp = producer.send_messages(self.topic, self.msg("one")) -        self.assertEqual(len(resp), 0) +        producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner) +        resp1 = producer.send(self.topic, self.key("key1"), self.msg("one")) +        resp2 = producer.send(self.topic, self.key("key2"), self.msg("two")) +        resp3 = producer.send(self.topic, self.key("key3"), self.msg("three")) +        resp4 = producer.send(self.topic, self.key("key4"), self.msg("four")) + +        self.assert_produce_response(resp1, start_offsets[0]+0) +        self.assert_produce_response(resp2, start_offsets[1]+0) +        self.assert_produce_response(resp3, start_offsets[0]+1) +        self.assert_produce_response(resp4, start_offsets[1]+1) + +        self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("three") ]) +        self.assert_fetch_offset(partitions[1], start_offsets[1], [ self.msg("two"), self.msg("four")  ]) + +        producer.stop() + +    @kafka_versions("all") +    def test_hashed_partitioner(self): +        partitions = self.client.get_partition_ids_for_topic(self.topic) +        start_offsets = [self.current_offset(self.topic, p) for p in partitions] + +        producer = KeyedProducer(self.client, partitioner=HashedPartitioner) +        resp1 = producer.send(self.topic, self.key("1"), self.msg("one")) +        resp2 = producer.send(self.topic, self.key("2"), self.msg("two")) +        resp3 = producer.send(self.topic, self.key("3"), self.msg("three")) +        resp4 = producer.send(self.topic, self.key("3"), self.msg("four")) +        resp5 = producer.send(self.topic, self.key("4"), self.msg("five")) + +        offsets = {partitions[0]: start_offsets[0], partitions[1]: start_offsets[1]} +        messages = {partitions[0]: [], partitions[1]: []} + +        keys = [self.key(k) for k in ["1", "2", "3", "3", "4"]] +        resps = [resp1, resp2, resp3, resp4, resp5] +        msgs = [self.msg(m) for m in ["one", "two", "three", "four", "five"]] + +        for key, resp, msg in zip(keys, resps, msgs): +            k = hash(key) % 2 +            partition = partitions[k] +            offset = offsets[partition] +            self.assert_produce_response(resp, offset) +            offsets[partition] += 1 +            messages[partition].append(msg) -        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ]) +        self.assert_fetch_offset(partitions[0], start_offsets[0], messages[partitions[0]]) +        self.assert_fetch_offset(partitions[1], start_offsets[1], messages[partitions[1]])          producer.stop()      @kafka_versions("all")      def test_async_keyed_producer(self): -        start_offset0 = self.current_offset(self.topic, 0) +        partition = self.client.get_partition_ids_for_topic(self.topic)[0] +        start_offset = self.current_offset(self.topic, partition)          producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True)          resp = producer.send(self.topic, self.key("key1"), self.msg("one"))          self.assertEqual(len(resp), 0) -        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ]) +        # wait for the server to report a new highwatermark +        while self.current_offset(self.topic, partition) == start_offset: +          time.sleep(0.1) + +        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ]) + +        producer.stop() + +    ############################ +    #   Producer ACK Tests     # +    ############################ + +    @kafka_versions("all") +    def test_acks_none(self): +        partition = self.client.get_partition_ids_for_topic(self.topic)[0] +        start_offset = self.current_offset(self.topic, partition) + +        producer = Producer( +            self.client, +            req_acks=Producer.ACK_NOT_REQUIRED, +        ) +        resp = producer.send_messages(self.topic, partition, self.msg("one")) + +        # No response from produce request with no acks required +        self.assertEqual(len(resp), 0) + +        # But the message should still have been delivered +        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ]) +        producer.stop() + +    @kafka_versions("all") +    def test_acks_local_write(self): +        partition = self.client.get_partition_ids_for_topic(self.topic)[0] +        start_offset = self.current_offset(self.topic, partition) + +        producer = Producer( +            self.client, +            req_acks=Producer.ACK_AFTER_LOCAL_WRITE, +        ) +        resp = producer.send_messages(self.topic, partition, self.msg("one")) + +        self.assert_produce_response(resp, start_offset) +        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ]) + +        producer.stop() + +    @kafka_versions("all") +    def test_acks_cluster_commit(self): +        partition = self.client.get_partition_ids_for_topic(self.topic)[0] +        start_offset = self.current_offset(self.topic, partition) + +        producer = Producer( +            self.client, +            req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT, +        ) + +        resp = producer.send_messages(self.topic, partition, self.msg("one")) +        self.assert_produce_response(resp, start_offset) +        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])          producer.stop() -    def assert_produce_request(self, messages, initial_offset, message_ct): -        produce = ProduceRequest(self.topic, 0, messages=messages) +    def assert_produce_request(self, messages, initial_offset, message_ct, +                               partition=0): +        produce = ProduceRequest(self.topic, partition, messages=messages)          # There should only be one response message from the server.          # This will throw an exception if there's more than one.          resp = self.client.send_produce_request([ produce ])          self.assert_produce_response(resp, initial_offset) -        self.assertEqual(self.current_offset(self.topic, 0), initial_offset + message_ct) +        self.assertEqual(self.current_offset(self.topic, partition), initial_offset + message_ct)      def assert_produce_response(self, resp, initial_offset):          self.assertEqual(len(resp), 1) | 
