summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md7
-rw-r--r--kafka/producer.py21
-rw-r--r--test/test_producer.py31
3 files changed, 59 insertions, 0 deletions
diff --git a/README.md b/README.md
index f123435..e176bad 100644
--- a/README.md
+++ b/README.md
@@ -42,9 +42,14 @@ kafka = KafkaClient("localhost:9092")
# To send messages synchronously
producer = SimpleProducer(kafka)
+
+# Note that the application is responsible for encoding messages to type str
producer.send_messages("my-topic", "some message")
producer.send_messages("my-topic", "this method", "is variadic")
+# Send unicode message
+producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))
+
# To send messages asynchronously
producer = SimpleProducer(kafka, async=True)
producer.send_messages("my-topic", "async message")
@@ -78,6 +83,8 @@ producer = SimpleProducer(kafka, batch_send=True,
# To consume messages
consumer = SimpleConsumer(kafka, "my-group", "my-topic")
for message in consumer:
+ # message is raw byte string -- decode if necessary!
+ # e.g., for unicode: `message.decode('utf-8')`
print(message)
kafka.close()
diff --git a/kafka/producer.py b/kafka/producer.py
index 95c75c4..800e677 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -148,7 +148,28 @@ class Producer(object):
def send_messages(self, topic, partition, *msg):
"""
Helper method to send produce requests
+ @param: topic, name of topic for produce request -- type str
+ @param: partition, partition number for produce request -- type int
+ @param: *msg, one or more message payloads -- type str
+ @returns: ResponseRequest returned by server
+ raises on error
+
+ Note that msg type *must* be encoded to str by user.
+ Passing unicode message will not work, for example
+ you should encode before calling send_messages via
+ something like `unicode_message.encode('utf-8')`
+
+ All messages produced via this method will set the message 'key' to Null
"""
+
+ # Guarantee that msg is actually a list or tuple (should always be true)
+ if not isinstance(msg, (list, tuple)):
+ raise TypeError("msg is not a list or tuple!")
+
+ # Raise TypeError if any message is not encoded as a str
+ if any(not isinstance(m, str) for m in msg):
+ raise TypeError("all produce message payloads must be type str")
+
if self.async:
for m in msg:
self.queue.put((TopicAndPartition(topic, partition), m))
diff --git a/test/test_producer.py b/test/test_producer.py
new file mode 100644
index 0000000..a84e20f
--- /dev/null
+++ b/test/test_producer.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+
+import logging
+import os
+import random
+import struct
+import unittest2
+
+from mock import MagicMock, patch
+
+from kafka import KafkaClient
+from kafka.producer import Producer
+
+class TestKafkaProducer(unittest2.TestCase):
+ def test_producer_message_types(self):
+
+ producer = Producer(MagicMock())
+ topic = "test-topic"
+ partition = 0
+
+ bad_data_types = (u'你怎么样?', 12, ['a','list'], ('a','tuple'), {'a': 'dict'})
+ for m in bad_data_types:
+ with self.assertRaises(TypeError):
+ logging.debug("attempting to send message of type %s", type(m))
+ producer.send_messages(topic, partition, m)
+
+ good_data_types = ('a string!',)
+ for m in good_data_types:
+ # This should not raise an exception
+ producer.send_messages(topic, partition, m)
+