diff options
-rw-r--r-- | kafka/producer/base.py | 11 | ||||
-rw-r--r-- | kafka/producer/keyed.py | 2 |
2 files changed, 7 insertions, 6 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index c29491e..7f9b18c 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -49,7 +49,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, # timeout is reached while count > 0 and timeout >= 0: try: - topic_partition, msg = queue.get(timeout=timeout) + topic_partition, msg, key = queue.get(timeout=timeout) except Empty: break @@ -67,7 +67,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, # Send collected requests upstream reqs = [] for topic_partition, msg in msgset.items(): - messages = create_message_set(msg, codec) + messages = create_message_set(msg, codec, key) req = ProduceRequest(topic_partition.topic, topic_partition.partition, messages) @@ -180,12 +180,13 @@ class Producer(object): if any(not isinstance(m, six.binary_type) for m in msg): raise TypeError("all produce message payloads must be type bytes") + key = kwargs.pop('key', None) if self.async: for m in msg: - self.queue.put((TopicAndPartition(topic, partition), m)) + self.queue.put((TopicAndPartition(topic, partition), m, key)) resp = [] else: - messages = create_message_set(msg, self.codec) + messages = create_message_set(msg, self.codec, key) req = ProduceRequest(topic, partition, messages) try: resp = self.client.send_produce_request([req], acks=self.req_acks, @@ -201,7 +202,7 @@ class Producer(object): forcefully cleaning up. """ if self.async: - self.queue.put((STOP_ASYNC_PRODUCER, None)) + self.queue.put((STOP_ASYNC_PRODUCER, None, None)) self.proc.join(timeout) if self.proc.is_alive(): diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index d2311c6..473f70a 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -56,7 +56,7 @@ class KeyedProducer(Producer): def send(self, topic, key, msg): partition = self._next_partition(topic, key) - return self.send_messages(topic, partition, msg) + return self._send_messages(topic, partition, msg, key=key) def __repr__(self): return '<KeyedProducer batch=%s>' % self.async |