summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/producer/simple.py8
-rw-r--r--kafka/util.py7
2 files changed, 13 insertions, 2 deletions
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index afeae06..2699cf2 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -2,6 +2,7 @@ from __future__ import absolute_import
import logging
import random
+import six
from itertools import cycle
@@ -68,8 +69,13 @@ class SimpleProducer(Producer):
return next(self.partition_cycles[topic])
def send_messages(self, topic, *msg):
+ if not isinstance(topic, six.binary_type):
+ topic = topic.encode('utf-8')
+
partition = self._next_partition(topic)
- return super(SimpleProducer, self).send_messages(topic, partition, *msg)
+ return super(SimpleProducer, self).send_messages(
+ topic, partition, *msg
+ )
def __repr__(self):
return '<SimpleProducer batch=%s>' % self.async
diff --git a/kafka/util.py b/kafka/util.py
index 622b1a7..14d2b2c 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -126,7 +126,11 @@ class ReentrantTimer(object):
self.active = None
def _timer(self, active):
- while not active.wait(self.t):
+ # python2.6 Event.wait() always returns None
+ # python2.7 and greater returns the flag value (true/false)
+ # we want the flag value, so add an 'or' here for python2.6
+ # this is redundant for later python versions (FLAG OR FLAG == FLAG)
+ while not (active.wait(self.t) or active.is_set()):
self.fn(*self.args, **self.kwargs)
def start(self):
@@ -146,3 +150,4 @@ class ReentrantTimer(object):
self.thread.join(self.t + 1)
# noinspection PyAttributeOutsideInit
self.timer = None
+ self.fn = None