summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/producer.py15
1 files changed, 9 insertions, 6 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index 5eb1bd8..ce71d66 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -104,13 +104,16 @@ class Producer(object):
msgset[partition].append(msg)
# Send collected requests upstream
+ reqs = []
for partition, messages in msgset.items():
- try:
- req = ProduceRequest(self.topic, partition, messages)
- self.client.send_produce_request([req], acks=self.req_acks,
- timeout=self.ack_timeout)
- except Exception as exp:
- log.error("Error sending message", exc_info=sys.exc_info())
+ req = ProduceRequest(self.topic, partition, messages)
+ reqs.append(req)
+
+ try:
+ self.client.send_produce_request(reqs, acks=self.req_acks,
+ timeout=self.ack_timeout)
+ except Exception as exp:
+ log.error("Error sending message", exc_info=sys.exc_info())
def send_messages(self, partition, *msg):
"""