summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOmar Ghishan <omar.ghishan@rd.io>2014-01-27 11:16:53 -0800
committerOmar Ghishan <omar.ghishan@rd.io>2014-01-27 11:16:53 -0800
commitf6df696e0ab11ec931283dcca8c518cd54d57687 (patch)
tree9f9325eb38ceab96bea0136cf1463000bbe89625
parenta46898d01515989f01cc6d39107fd6beecc508f0 (diff)
downloadkafka-python-f6df696e0ab11ec931283dcca8c518cd54d57687.tar.gz
Use TopicAndPartition when producing async messages
-rw-r--r--kafka/producer.py19
1 files changed, 11 insertions, 8 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index 4279e61..12a2934 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -8,7 +8,7 @@ from collections import defaultdict
from itertools import cycle
from multiprocessing import Queue, Process
-from kafka.common import ProduceRequest
+from kafka.common import ProduceRequest, TopicAndPartition
from kafka.partitioner import HashedPartitioner
from kafka.protocol import create_message
@@ -44,25 +44,27 @@ def _send_upstream(queue, client, batch_time, batch_size,
# timeout is reached
while count > 0 and timeout >= 0:
try:
- topic, partition, msg = queue.get(timeout=timeout)
+ topic_partition, msg = queue.get(timeout=timeout)
except Empty:
break
# Check if the controller has requested us to stop
- if topic == STOP_ASYNC_PRODUCER:
+ if topic_partition == STOP_ASYNC_PRODUCER:
stop = True
break
# Adjust the timeout to match the remaining period
count -= 1
timeout = send_at - time.time()
- msgset[(topic, partition)].append(msg)
+ msgset[topic_partition].append(msg)
# Send collected requests upstream
reqs = []
- for (topic, partition), messages in msgset.items():
- req = ProduceRequest(topic, partition, messages)
+ for topic_partition, messages in msgset.items():
+ req = ProduceRequest(topic_partition.topic,
+ topic_partition.partition,
+ messages)
reqs.append(req)
try:
@@ -136,7 +138,8 @@ class Producer(object):
"""
if self.async:
for m in msg:
- self.queue.put((topic, partition, create_message(m)))
+ self.queue.put((TopicAndPartition(topic, partition),
+ create_message(m)))
resp = []
else:
messages = [create_message(m) for m in msg]
@@ -155,7 +158,7 @@ class Producer(object):
forcefully cleaning up.
"""
if self.async:
- self.queue.put((STOP_ASYNC_PRODUCER, None, None))
+ self.queue.put((STOP_ASYNC_PRODUCER, None))
self.proc.join(timeout)
if self.proc.is_alive():