diff options
Diffstat (limited to 'test/test_producer_integration.py')
-rw-r--r-- | test/test_producer_integration.py | 22 |
1 files changed, 22 insertions, 0 deletions
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index abf34c3..46b6851 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -341,6 +341,28 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): # KeyedProducer Tests # ############################ + @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") + def test_keyedproducer_null_payload(self): + partitions = self.client.get_partition_ids_for_topic(self.topic) + start_offsets = [self.current_offset(self.topic, p) for p in partitions] + + producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner) + key = "test" + + resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one")) + self.assert_produce_response(resp, start_offsets[0]) + resp = producer.send_messages(self.topic, self.key("key2"), None) + self.assert_produce_response(resp, start_offsets[1]) + resp = producer.send_messages(self.topic, self.key("key3"), None) + self.assert_produce_response(resp, start_offsets[0]+1) + resp = producer.send_messages(self.topic, self.key("key4"), self.msg("four")) + self.assert_produce_response(resp, start_offsets[1]+1) + + self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), None ]) + self.assert_fetch_offset(partitions[1], start_offsets[1], [ None, self.msg("four") ]) + + producer.stop() + @kafka_versions("all") def test_round_robin_partitioner(self): partitions = self.client.get_partition_ids_for_topic(self.topic) |