summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2015-12-02 12:49:49 -0800
committerDana Powers <dana.powers@gmail.com>2015-12-02 12:49:49 -0800
commit0d41e6b07ea16b1074d3a63d521747361f1c7145 (patch)
tree1889ef033e0f0270252bc608b466add391ed42bc
parente99a934bab1d551d07dd0c6365f6a730028489f3 (diff)
parent3fbd1cc80965a0f49bfd6c505a1c021af95354fb (diff)
downloadkafka-python-0d41e6b07ea16b1074d3a63d521747361f1c7145.tar.gz
Merge pull request #409 from scrapinghub/feature-allow-null-payload
Allow null payload for deletion feature
-rw-r--r--kafka/producer/base.py12
-rw-r--r--test/test_producer.py24
-rw-r--r--test/test_producer_integration.py22
3 files changed, 53 insertions, 5 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 3c826cd..d5c013a 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -355,9 +355,15 @@ class Producer(object):
if not isinstance(msg, (list, tuple)):
raise TypeError("msg is not a list or tuple!")
- # Raise TypeError if any message is not encoded as bytes
- if any(not isinstance(m, six.binary_type) for m in msg):
- raise TypeError("all produce message payloads must be type bytes")
+ for m in msg:
+ # The protocol allows to have key & payload with null values both,
+ # (https://goo.gl/o694yN) but having (null,null) pair doesn't make sense.
+ if m is None:
+ if key is None:
+ raise TypeError("key and payload can't be null in one")
+ # Raise TypeError if any non-null message is not encoded as bytes
+ elif not isinstance(m, six.binary_type):
+ raise TypeError("all produce message payloads must be null or type bytes")
# Raise TypeError if topic is not encoded as bytes
if not isinstance(topic, six.binary_type):
diff --git a/test/test_producer.py b/test/test_producer.py
index 27272f6..e681e43 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -7,7 +7,7 @@ import time
from mock import MagicMock, patch
from . import unittest
-from kafka import KafkaClient, SimpleProducer
+from kafka import KafkaClient, SimpleProducer, KeyedProducer
from kafka.common import (
AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
ProduceResponse, RetryOptions, TopicAndPartition
@@ -33,7 +33,8 @@ class TestKafkaProducer(unittest.TestCase):
topic = b"test-topic"
partition = 0
- bad_data_types = (u'你怎么样?', 12, ['a', 'list'], ('a', 'tuple'), {'a': 'dict'})
+ bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
+ ('a', 'tuple'), {'a': 'dict'}, None,)
for m in bad_data_types:
with self.assertRaises(TypeError):
logging.debug("attempting to send message of type %s", type(m))
@@ -44,6 +45,25 @@ class TestKafkaProducer(unittest.TestCase):
# This should not raise an exception
producer.send_messages(topic, partition, m)
+ def test_keyedproducer_message_types(self):
+ client = MagicMock()
+ client.get_partition_ids_for_topic.return_value = [0, 1]
+ producer = KeyedProducer(client)
+ topic = b"test-topic"
+ key = b"testkey"
+
+ bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
+ ('a', 'tuple'), {'a': 'dict'},)
+ for m in bad_data_types:
+ with self.assertRaises(TypeError):
+ logging.debug("attempting to send message of type %s", type(m))
+ producer.send_messages(topic, key, m)
+
+ good_data_types = (b'a string!', None,)
+ for m in good_data_types:
+ # This should not raise an exception
+ producer.send_messages(topic, key, m)
+
def test_topic_message_types(self):
client = MagicMock()
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index abf34c3..46b6851 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -341,6 +341,28 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
# KeyedProducer Tests #
############################
+ @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+ def test_keyedproducer_null_payload(self):
+ partitions = self.client.get_partition_ids_for_topic(self.topic)
+ start_offsets = [self.current_offset(self.topic, p) for p in partitions]
+
+ producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
+ key = "test"
+
+ resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
+ self.assert_produce_response(resp, start_offsets[0])
+ resp = producer.send_messages(self.topic, self.key("key2"), None)
+ self.assert_produce_response(resp, start_offsets[1])
+ resp = producer.send_messages(self.topic, self.key("key3"), None)
+ self.assert_produce_response(resp, start_offsets[0]+1)
+ resp = producer.send_messages(self.topic, self.key("key4"), self.msg("four"))
+ self.assert_produce_response(resp, start_offsets[1]+1)
+
+ self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), None ])
+ self.assert_fetch_offset(partitions[1], start_offsets[1], [ None, self.msg("four") ])
+
+ producer.stop()
+
@kafka_versions("all")
def test_round_robin_partitioner(self):
partitions = self.client.get_partition_ids_for_topic(self.topic)