summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/producer.py19
1 files changed, 11 insertions, 8 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index 4279e61..12a2934 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -8,7 +8,7 @@ from collections import defaultdict
from itertools import cycle
from multiprocessing import Queue, Process
-from kafka.common import ProduceRequest
+from kafka.common import ProduceRequest, TopicAndPartition
from kafka.partitioner import HashedPartitioner
from kafka.protocol import create_message
@@ -44,25 +44,27 @@ def _send_upstream(queue, client, batch_time, batch_size,
# timeout is reached
while count > 0 and timeout >= 0:
try:
- topic, partition, msg = queue.get(timeout=timeout)
+ topic_partition, msg = queue.get(timeout=timeout)
except Empty:
break
# Check if the controller has requested us to stop
- if topic == STOP_ASYNC_PRODUCER:
+ if topic_partition == STOP_ASYNC_PRODUCER:
stop = True
break
# Adjust the timeout to match the remaining period
count -= 1
timeout = send_at - time.time()
- msgset[(topic, partition)].append(msg)
+ msgset[topic_partition].append(msg)
# Send collected requests upstream
reqs = []
- for (topic, partition), messages in msgset.items():
- req = ProduceRequest(topic, partition, messages)
+ for topic_partition, messages in msgset.items():
+ req = ProduceRequest(topic_partition.topic,
+ topic_partition.partition,
+ messages)
reqs.append(req)
try:
@@ -136,7 +138,8 @@ class Producer(object):
"""
if self.async:
for m in msg:
- self.queue.put((topic, partition, create_message(m)))
+ self.queue.put((TopicAndPartition(topic, partition),
+ create_message(m)))
resp = []
else:
messages = [create_message(m) for m in msg]
@@ -155,7 +158,7 @@ class Producer(object):
forcefully cleaning up.
"""
if self.async:
- self.queue.put((STOP_ASYNC_PRODUCER, None, None))
+ self.queue.put((STOP_ASYNC_PRODUCER, None))
self.proc.join(timeout)
if self.proc.is_alive():