diff options
-rw-r--r-- | README.md | 7 | ||||
-rw-r--r-- | kafka/producer.py | 21 | ||||
-rw-r--r-- | test/test_producer.py | 31 |
3 files changed, 59 insertions, 0 deletions
@@ -42,9 +42,14 @@ kafka = KafkaClient("localhost:9092")
 
 # To send messages synchronously
 producer = SimpleProducer(kafka)
+
+# Note that the application is responsible for encoding messages to type str
 producer.send_messages("my-topic", "some message")
 producer.send_messages("my-topic", "this method", "is variadic")
 
+# Send unicode message
+producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))
+
 # To send messages asynchronously
 producer = SimpleProducer(kafka, async=True)
 producer.send_messages("my-topic", "async message")
@@ -78,6 +83,8 @@ producer = SimpleProducer(kafka, batch_send=True,
 # To consume messages
 consumer = SimpleConsumer(kafka, "my-group", "my-topic")
 for message in consumer:
+    # message is raw byte string -- decode if necessary!
+    # e.g., for unicode: `message.decode('utf-8')`
     print(message)
 
 kafka.close()
diff --git a/kafka/producer.py b/kafka/producer.py
index 95c75c4..800e677 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -148,7 +148,28 @@ class Producer(object):
     def send_messages(self, topic, partition, *msg):
         """
         Helper method to send produce requests
+        @param: topic, name of topic for produce request -- type str
+        @param: partition, partition number for produce request -- type int
+        @param: *msg, one or more message payloads -- type str
+        @returns: ResponseRequest returned by server
+        raises on error
+
+        Note that msg type *must* be encoded to str by user.
+        Passing unicode message will not work, for example
+        you should encode before calling send_messages via
+        something like `unicode_message.encode('utf-8')`
+
+        All messages produced via this method will set the message 'key' to Null
         """
+
+        # Guarantee that msg is actually a list or tuple (should always be true)
+        if not isinstance(msg, (list, tuple)):
+            raise TypeError("msg is not a list or tuple!")
+
+        # Raise TypeError if any message is not encoded as a str
+        if any(not isinstance(m, str) for m in msg):
+            raise TypeError("all produce message payloads must be type str")
+
         if self.async:
             for m in msg:
                 self.queue.put((TopicAndPartition(topic, partition), m))
diff --git a/test/test_producer.py b/test/test_producer.py
new file mode 100644
index 0000000..a84e20f
--- /dev/null
+++ b/test/test_producer.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+
+import logging
+import os
+import random
+import struct
+import unittest2
+
+from mock import MagicMock, patch
+
+from kafka import KafkaClient
+from kafka.producer import Producer
+
+class TestKafkaProducer(unittest2.TestCase):
+    def test_producer_message_types(self):
+
+        producer = Producer(MagicMock())
+        topic = "test-topic"
+        partition = 0
+
+        bad_data_types = (u'你怎么样?', 12, ['a','list'], ('a','tuple'), {'a': 'dict'})
+        for m in bad_data_types:
+            with self.assertRaises(TypeError):
+                logging.debug("attempting to send message of type %s", type(m))
+                producer.send_messages(topic, partition, m)
+
+        good_data_types = ('a string!',)
+        for m in good_data_types:
+            # This should not raise an exception
+            producer.send_messages(topic, partition, m)
+