diff options
-rw-r--r-- | kafka/producer/simple.py | 8 | ||||
-rw-r--r-- | kafka/util.py | 7 | ||||
-rw-r--r-- | test/test_producer.py | 15 |
3 files changed, 28 insertions, 2 deletions
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index afeae06..2699cf2 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -2,6 +2,7 @@ from __future__ import absolute_import import logging import random +import six from itertools import cycle @@ -68,8 +69,13 @@ class SimpleProducer(Producer): return next(self.partition_cycles[topic]) def send_messages(self, topic, *msg): + if not isinstance(topic, six.binary_type): + topic = topic.encode('utf-8') + partition = self._next_partition(topic) - return super(SimpleProducer, self).send_messages(topic, partition, *msg) + return super(SimpleProducer, self).send_messages( + topic, partition, *msg + ) def __repr__(self): return '<SimpleProducer batch=%s>' % self.async diff --git a/kafka/util.py b/kafka/util.py index 622b1a7..14d2b2c 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -126,7 +126,11 @@ class ReentrantTimer(object): self.active = None def _timer(self, active): - while not active.wait(self.t): + # python2.6 Event.wait() always returns None + # python2.7 and greater returns the flag value (true/false) + # we want the flag value, so add an 'or' here for python2.6 + # this is redundant for later python versions (FLAG OR FLAG == FLAG) + while not (active.wait(self.t) or active.is_set()): self.fn(*self.args, **self.kwargs) def start(self): @@ -146,3 +150,4 @@ class ReentrantTimer(object): self.thread.join(self.t + 1) # noinspection PyAttributeOutsideInit self.timer = None + self.fn = None diff --git a/test/test_producer.py b/test/test_producer.py index caf8fe3..f6b3d6a 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -7,6 +7,7 @@ from . import unittest from kafka.producer.base import Producer + class TestKafkaProducer(unittest.TestCase): def test_producer_message_types(self): @@ -25,3 +26,17 @@ class TestKafkaProducer(unittest.TestCase): # This should not raise an exception producer.send_messages(topic, partition, m) + def test_topic_message_types(self): + from kafka.producer.simple import SimpleProducer + + client = MagicMock() + + def partitions(topic): + return [0, 1] + + client.get_partition_ids_for_topic = partitions + + producer = SimpleProducer(client, random_start=False) + topic = b"test-topic" + producer.send_messages(topic, b'hi') + assert client.send_produce_request.called |