diff options
-rw-r--r-- | kafka/producer.py | 15 |
1 files changed, 9 insertions, 6 deletions
diff --git a/kafka/producer.py b/kafka/producer.py index 5eb1bd8..ce71d66 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -104,13 +104,16 @@ class Producer(object): msgset[partition].append(msg) # Send collected requests upstream + reqs = [] for partition, messages in msgset.items(): - try: - req = ProduceRequest(self.topic, partition, messages) - self.client.send_produce_request([req], acks=self.req_acks, - timeout=self.ack_timeout) - except Exception as exp: - log.error("Error sending message", exc_info=sys.exc_info()) + req = ProduceRequest(self.topic, partition, messages) + reqs.append(req) + + try: + self.client.send_produce_request(reqs, acks=self.req_acks, + timeout=self.ack_timeout) + except Exception as exp: + log.error("Error sending message", exc_info=sys.exc_info()) def send_messages(self, partition, *msg): """ |