diff options
-rw-r--r-- | kafka/producer.py | 19 |
1 files changed, 11 insertions, 8 deletions
diff --git a/kafka/producer.py b/kafka/producer.py index 4279e61..12a2934 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -8,7 +8,7 @@ from collections import defaultdict from itertools import cycle from multiprocessing import Queue, Process -from kafka.common import ProduceRequest +from kafka.common import ProduceRequest, TopicAndPartition from kafka.partitioner import HashedPartitioner from kafka.protocol import create_message @@ -44,25 +44,27 @@ def _send_upstream(queue, client, batch_time, batch_size, # timeout is reached while count > 0 and timeout >= 0: try: - topic, partition, msg = queue.get(timeout=timeout) + topic_partition, msg = queue.get(timeout=timeout) except Empty: break # Check if the controller has requested us to stop - if topic == STOP_ASYNC_PRODUCER: + if topic_partition == STOP_ASYNC_PRODUCER: stop = True break # Adjust the timeout to match the remaining period count -= 1 timeout = send_at - time.time() - msgset[(topic, partition)].append(msg) + msgset[topic_partition].append(msg) # Send collected requests upstream reqs = [] - for (topic, partition), messages in msgset.items(): - req = ProduceRequest(topic, partition, messages) + for topic_partition, messages in msgset.items(): + req = ProduceRequest(topic_partition.topic, + topic_partition.partition, + messages) reqs.append(req) try: @@ -136,7 +138,8 @@ class Producer(object): """ if self.async: for m in msg: - self.queue.put((topic, partition, create_message(m))) + self.queue.put((TopicAndPartition(topic, partition), + create_message(m))) resp = [] else: messages = [create_message(m) for m in msg] @@ -155,7 +158,7 @@ class Producer(object): forcefully cleaning up. """ if self.async: - self.queue.put((STOP_ASYNC_PRODUCER, None, None)) + self.queue.put((STOP_ASYNC_PRODUCER, None)) self.proc.join(timeout) if self.proc.is_alive(): |