summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/producer/base.py11
-rw-r--r--kafka/producer/keyed.py2
2 files changed, 7 insertions, 6 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index c29491e..7f9b18c 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -49,7 +49,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
# timeout is reached
while count > 0 and timeout >= 0:
try:
- topic_partition, msg = queue.get(timeout=timeout)
+ topic_partition, msg, key = queue.get(timeout=timeout)
except Empty:
break
@@ -67,7 +67,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
# Send collected requests upstream
reqs = []
for topic_partition, msg in msgset.items():
- messages = create_message_set(msg, codec)
+ messages = create_message_set(msg, codec, key)
req = ProduceRequest(topic_partition.topic,
topic_partition.partition,
messages)
@@ -180,12 +180,13 @@ class Producer(object):
if any(not isinstance(m, six.binary_type) for m in msg):
raise TypeError("all produce message payloads must be type bytes")
+ key = kwargs.pop('key', None)
if self.async:
for m in msg:
- self.queue.put((TopicAndPartition(topic, partition), m))
+ self.queue.put((TopicAndPartition(topic, partition), m, key))
resp = []
else:
- messages = create_message_set(msg, self.codec)
+ messages = create_message_set(msg, self.codec, key)
req = ProduceRequest(topic, partition, messages)
try:
resp = self.client.send_produce_request([req], acks=self.req_acks,
@@ -201,7 +202,7 @@ class Producer(object):
forcefully cleaning up.
"""
if self.async:
- self.queue.put((STOP_ASYNC_PRODUCER, None))
+ self.queue.put((STOP_ASYNC_PRODUCER, None, None))
self.proc.join(timeout)
if self.proc.is_alive():
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index d2311c6..473f70a 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -56,7 +56,7 @@ class KeyedProducer(Producer):
def send(self, topic, key, msg):
partition = self._next_partition(topic, key)
- return self.send_messages(topic, partition, msg)
+ return self._send_messages(topic, partition, msg, key=key)
def __repr__(self):
return '<KeyedProducer batch=%s>' % self.async