diff options
author | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-06-17 16:11:37 +0300 |
---|---|---|
committer | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-06-17 16:11:37 +0300 |
commit | 4df78c293f162c5b99f18c860af10ddafc0c8861 (patch) | |
tree | 0110e9af6f1dbecd8629695e4dca3081ec35dd56 /test | |
parent | 86772332474500ec251ff9bd5e4be761563e9aac (diff) | |
download | kafka-python-4df78c293f162c5b99f18c860af10ddafc0c8861.tar.gz |
Add KeyedProducer test with null payloads
Diffstat (limited to 'test')
-rw-r--r-- | test/test_producer_integration.py | 22 |
1 files changed, 22 insertions, 0 deletions
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index abf34c3..46b6851 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -341,6 +341,28 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): # KeyedProducer Tests # ############################ + @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") + def test_keyedproducer_null_payload(self): + partitions = self.client.get_partition_ids_for_topic(self.topic) + start_offsets = [self.current_offset(self.topic, p) for p in partitions] + + producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner) + key = "test" + + resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one")) + self.assert_produce_response(resp, start_offsets[0]) + resp = producer.send_messages(self.topic, self.key("key2"), None) + self.assert_produce_response(resp, start_offsets[1]) + resp = producer.send_messages(self.topic, self.key("key3"), None) + self.assert_produce_response(resp, start_offsets[0]+1) + resp = producer.send_messages(self.topic, self.key("key4"), self.msg("four")) + self.assert_produce_response(resp, start_offsets[1]+1) + + self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), None ]) + self.assert_fetch_offset(partitions[1], start_offsets[1], [ None, self.msg("four") ]) + + producer.stop() + @kafka_versions("all") def test_round_robin_partitioner(self): partitions = self.client.get_partition_ids_for_topic(self.topic) |