summaryrefslogtreecommitdiff
path: root/kafka/producer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer.py')
-rw-r--r--kafka/producer.py14
1 files changed, 10 insertions, 4 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index 8a6bff0..e1f4a22 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -4,11 +4,17 @@ import logging
import time
import random
-from Queue import Empty
+try:
+ from queue import Empty
+except ImportError:
+ from Queue import Empty
from collections import defaultdict
from itertools import cycle
from multiprocessing import Queue, Process
+import six
+from six.moves import xrange
+
from kafka.common import (
ProduceRequest, TopicAndPartition, UnsupportedCodecError, UnknownTopicOrPartitionError
)
@@ -172,8 +178,8 @@ class Producer(object):
if not isinstance(msg, (list, tuple)):
raise TypeError("msg is not a list or tuple!")
- # Raise TypeError if any message is not encoded as a str
- if any(not isinstance(m, str) for m in msg):
+ # Raise TypeError if any message is not encoded as bytes
+ if any(not isinstance(m, six.binary_type) for m in msg):
raise TypeError("all produce message payloads must be type str")
if self.async:
@@ -221,7 +227,7 @@ class SimpleProducer(Producer):
batch_send_every_t - If set, messages are send after this timeout
random_start - If true, randomize the initial partition which the
the first message block will be published to, otherwise
- if false, the first message block will always publish
+ if false, the first message block will always publish
to partition 0 before cycling through each partition
"""
def __init__(self, client, async=False,