diff options
Diffstat (limited to 'kafka/producer.py')
-rw-r--r-- | kafka/producer.py | 14 |
1 files changed, 10 insertions, 4 deletions
diff --git a/kafka/producer.py b/kafka/producer.py index 8a6bff0..e1f4a22 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -4,11 +4,17 @@ import logging import time import random -from Queue import Empty +try: + from queue import Empty +except ImportError: + from Queue import Empty from collections import defaultdict from itertools import cycle from multiprocessing import Queue, Process +import six +from six.moves import xrange + from kafka.common import ( ProduceRequest, TopicAndPartition, UnsupportedCodecError, UnknownTopicOrPartitionError ) @@ -172,8 +178,8 @@ class Producer(object): if not isinstance(msg, (list, tuple)): raise TypeError("msg is not a list or tuple!") - # Raise TypeError if any message is not encoded as a str - if any(not isinstance(m, str) for m in msg): + # Raise TypeError if any message is not encoded as bytes + if any(not isinstance(m, six.binary_type) for m in msg): raise TypeError("all produce message payloads must be type str") if self.async: @@ -221,7 +227,7 @@ class SimpleProducer(Producer): batch_send_every_t - If set, messages are send after this timeout random_start - If true, randomize the initial partition which the the first message block will be published to, otherwise - if false, the first message block will always publish + if false, the first message block will always publish to partition 0 before cycling through each partition """ def __init__(self, client, async=False, |