diff options
author    | Lou Marvin Caraig <loumarvincaraig@gmail.com> | 2014-11-26 11:53:57 +0100
committer | Lou Marvin Caraig <loumarvincaraig@gmail.com> | 2014-11-26 15:04:28 +0100
commit    | 664240ab4dd846c63efa58b210d4832f88446bf6 (patch)
tree      | 261402d4bb4cce59b8788c1be1f440db001d5d7f
parent    | 7041a27e2008d94a09f374cf681ca6fc0b4e0525 (diff)
download  | kafka-python-664240ab4dd846c63efa58b210d4832f88446bf6.tar.gz
Key is passed when creating messages for both async=False and async=True
-rw-r--r-- | kafka/producer/base.py  | 11
-rw-r--r-- | kafka/producer/keyed.py |  2
2 files changed, 7 insertions, 6 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index c29491e..7f9b18c 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -49,7 +49,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, # timeout is reached while count > 0 and timeout >= 0: try: - topic_partition, msg = queue.get(timeout=timeout) + topic_partition, msg, key = queue.get(timeout=timeout) except Empty: break @@ -67,7 +67,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, # Send collected requests upstream reqs = [] for topic_partition, msg in msgset.items(): - messages = create_message_set(msg, codec) + messages = create_message_set(msg, codec, key) req = ProduceRequest(topic_partition.topic, topic_partition.partition, messages) @@ -180,12 +180,13 @@ class Producer(object): if any(not isinstance(m, six.binary_type) for m in msg): raise TypeError("all produce message payloads must be type bytes") + key = kwargs.pop('key', None) if self.async: for m in msg: - self.queue.put((TopicAndPartition(topic, partition), m)) + self.queue.put((TopicAndPartition(topic, partition), m, key)) resp = [] else: - messages = create_message_set(msg, self.codec) + messages = create_message_set(msg, self.codec, key) req = ProduceRequest(topic, partition, messages) try: resp = self.client.send_produce_request([req], acks=self.req_acks, @@ -201,7 +202,7 @@ class Producer(object): forcefully cleaning up. 
""" if self.async: - self.queue.put((STOP_ASYNC_PRODUCER, None)) + self.queue.put((STOP_ASYNC_PRODUCER, None, None)) self.proc.join(timeout) if self.proc.is_alive(): diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index d2311c6..473f70a 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -56,7 +56,7 @@ class KeyedProducer(Producer): def send(self, topic, key, msg): partition = self._next_partition(topic, key) - return self.send_messages(topic, partition, msg) + return self._send_messages(topic, partition, msg, key=key) def __repr__(self): return '<KeyedProducer batch=%s>' % self.async |