From f6df696e0ab11ec931283dcca8c518cd54d57687 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Mon, 27 Jan 2014 11:16:53 -0800 Subject: Use TopicAndPartition when producing async messages --- kafka/producer.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) (limited to 'kafka') diff --git a/kafka/producer.py b/kafka/producer.py index 4279e61..12a2934 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -8,7 +8,7 @@ from collections import defaultdict from itertools import cycle from multiprocessing import Queue, Process -from kafka.common import ProduceRequest +from kafka.common import ProduceRequest, TopicAndPartition from kafka.partitioner import HashedPartitioner from kafka.protocol import create_message @@ -44,25 +44,27 @@ def _send_upstream(queue, client, batch_time, batch_size, # timeout is reached while count > 0 and timeout >= 0: try: - topic, partition, msg = queue.get(timeout=timeout) + topic_partition, msg = queue.get(timeout=timeout) except Empty: break # Check if the controller has requested us to stop - if topic == STOP_ASYNC_PRODUCER: + if topic_partition == STOP_ASYNC_PRODUCER: stop = True break # Adjust the timeout to match the remaining period count -= 1 timeout = send_at - time.time() - msgset[(topic, partition)].append(msg) + msgset[topic_partition].append(msg) # Send collected requests upstream reqs = [] - for (topic, partition), messages in msgset.items(): - req = ProduceRequest(topic, partition, messages) + for topic_partition, messages in msgset.items(): + req = ProduceRequest(topic_partition.topic, + topic_partition.partition, + messages) reqs.append(req) try: @@ -136,7 +138,8 @@ class Producer(object): """ if self.async: for m in msg: - self.queue.put((topic, partition, create_message(m))) + self.queue.put((TopicAndPartition(topic, partition), + create_message(m))) resp = [] else: messages = [create_message(m) for m in msg] @@ -155,7 +158,7 @@ class Producer(object): forcefully cleaning up. """ if self.async: - self.queue.put((STOP_ASYNC_PRODUCER, None, None)) + self.queue.put((STOP_ASYNC_PRODUCER, None)) self.proc.join(timeout) if self.proc.is_alive(): -- cgit v1.2.1