diff options
author | Dana Powers <dana.powers@gmail.com> | 2015-12-02 12:49:49 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2015-12-02 12:49:49 -0800 |
commit | 0d41e6b07ea16b1074d3a63d521747361f1c7145 (patch) | |
tree | 1889ef033e0f0270252bc608b466add391ed42bc | |
parent | e99a934bab1d551d07dd0c6365f6a730028489f3 (diff) | |
parent | 3fbd1cc80965a0f49bfd6c505a1c021af95354fb (diff) | |
download | kafka-python-0d41e6b07ea16b1074d3a63d521747361f1c7145.tar.gz |
Merge pull request #409 from scrapinghub/feature-allow-null-payload
Allow null payload for deletion feature
-rw-r--r-- | kafka/producer/base.py | 12 | ||||
-rw-r--r-- | test/test_producer.py | 24 | ||||
-rw-r--r-- | test/test_producer_integration.py | 22 |
3 files changed, 53 insertions, 5 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 3c826cd..d5c013a 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -355,9 +355,15 @@ class Producer(object): if not isinstance(msg, (list, tuple)): raise TypeError("msg is not a list or tuple!") - # Raise TypeError if any message is not encoded as bytes - if any(not isinstance(m, six.binary_type) for m in msg): - raise TypeError("all produce message payloads must be type bytes") + for m in msg: + # The protocol allows to have key & payload with null values both, + # (https://goo.gl/o694yN) but having (null,null) pair doesn't make sense. + if m is None: + if key is None: + raise TypeError("key and payload can't be null in one") + # Raise TypeError if any non-null message is not encoded as bytes + elif not isinstance(m, six.binary_type): + raise TypeError("all produce message payloads must be null or type bytes") # Raise TypeError if topic is not encoded as bytes if not isinstance(topic, six.binary_type): diff --git a/test/test_producer.py b/test/test_producer.py index 27272f6..e681e43 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -7,7 +7,7 @@ import time from mock import MagicMock, patch from . import unittest -from kafka import KafkaClient, SimpleProducer +from kafka import KafkaClient, SimpleProducer, KeyedProducer from kafka.common import ( AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError, ProduceResponse, RetryOptions, TopicAndPartition @@ -33,7 +33,8 @@ class TestKafkaProducer(unittest.TestCase): topic = b"test-topic" partition = 0 - bad_data_types = (u'你怎么样?', 12, ['a', 'list'], ('a', 'tuple'), {'a': 'dict'}) + bad_data_types = (u'你怎么样?', 12, ['a', 'list'], + ('a', 'tuple'), {'a': 'dict'}, None,) for m in bad_data_types: with self.assertRaises(TypeError): logging.debug("attempting to send message of type %s", type(m)) @@ -44,6 +45,25 @@ class TestKafkaProducer(unittest.TestCase): # This should not raise an exception producer.send_messages(topic, partition, m) + def test_keyedproducer_message_types(self): + client = MagicMock() + client.get_partition_ids_for_topic.return_value = [0, 1] + producer = KeyedProducer(client) + topic = b"test-topic" + key = b"testkey" + + bad_data_types = (u'你怎么样?', 12, ['a', 'list'], + ('a', 'tuple'), {'a': 'dict'},) + for m in bad_data_types: + with self.assertRaises(TypeError): + logging.debug("attempting to send message of type %s", type(m)) + producer.send_messages(topic, key, m) + + good_data_types = (b'a string!', None,) + for m in good_data_types: + # This should not raise an exception + producer.send_messages(topic, key, m) + def test_topic_message_types(self): client = MagicMock() diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index abf34c3..46b6851 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -341,6 +341,28 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): # KeyedProducer Tests # ############################ + @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") + def test_keyedproducer_null_payload(self): + partitions = self.client.get_partition_ids_for_topic(self.topic) + start_offsets = [self.current_offset(self.topic, p) for p in partitions] + + producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner) + key = "test" + + resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one")) + self.assert_produce_response(resp, start_offsets[0]) + resp = producer.send_messages(self.topic, self.key("key2"), None) + self.assert_produce_response(resp, start_offsets[1]) + resp = producer.send_messages(self.topic, self.key("key3"), None) + self.assert_produce_response(resp, start_offsets[0]+1) + resp = producer.send_messages(self.topic, self.key("key4"), self.msg("four")) + self.assert_produce_response(resp, start_offsets[1]+1) + + self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), None ]) + self.assert_fetch_offset(partitions[1], start_offsets[1], [ None, self.msg("four") ]) + + producer.stop() + @kafka_versions("all") def test_round_robin_partitioner(self): partitions = self.client.get_partition_ids_for_topic(self.topic) |