diff options
-rw-r--r-- | README.md | 11 | ||||
-rw-r--r-- | kafka/producer.py | 2 |
2 files changed, 12 insertions, 1 deletions
@@ -54,6 +54,17 @@ if response: print(response[0].error) print(response[0].offset) +# To send messages in batch. You can use any of the available +# producers for doing this. The following producer will collect +# messages in batch and send them to Kafka after 20 messages are +# collected or every 60 seconds +# Notes: +# * If the producer dies before the messages are sent, there will be losses +# * Call producer.stop() to send the messages and cleanup +producer = SimpleProducer(kafka, "my-topic", batch_send=True, + batch_send_every_n=20, + batch_send_every_t=60) + # To consume messages consumer = SimpleConsumer(kafka, "my-group", "my-topic") for message in consumer: diff --git a/kafka/producer.py b/kafka/producer.py index da7cd96..be80c62 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -152,7 +152,7 @@ class Producer(object): self.queue.put(STOP_ASYNC_PRODUCER) self.proc.join(timeout) - if self.proc.is_alive() + if self.proc.is_alive(): self.proc.terminate() |