From 97261f491d02fb7f72ba30abcfc26240f520a9b4 Mon Sep 17 00:00:00 2001 From: Linus Wallgren Date: Thu, 18 May 2017 22:49:20 +0200 Subject: Describe consumer thread-safety --- README.rst | 9 +++++++++ docs/index.rst | 10 ++++++++++ example.py | 7 ++++--- kafka/consumer/group.py | 2 ++ 4 files changed, 25 insertions(+), 3 deletions(-) diff --git a/README.rst b/README.rst index 8462504..751a524 100644 --- a/README.rst +++ b/README.rst @@ -110,6 +110,15 @@ for more details. >>> for i in range(1000): ... producer.send('foobar', b'msg %d' % i) +Thread safety +************* + +The KafkaProducer can be used across threads without issue, unlike the +KafkaConsumer which cannot. + +While it is possible to use the KafkaConsumer in a thread-local manner, +multiprocessing is recommended. + Compression *********** diff --git a/docs/index.rst b/docs/index.rst index 18f0721..550d246 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -109,6 +109,16 @@ client. See `KafkaProducer `_ for more details. ... producer.send('foobar', b'msg %d' % i) +Thread safety +************* + +The KafkaProducer can be used across threads without issue, unlike the +KafkaConsumer which cannot. + +While it is possible to use the KafkaConsumer in a thread-local manner, +multiprocessing is recommended. + + Compression *********** diff --git a/example.py b/example.py index a1a1e1e..2431ee2 100755 --- a/example.py +++ b/example.py @@ -1,5 +1,6 @@ #!/usr/bin/env python import threading, logging, time +import multiprocessing from kafka import KafkaConsumer, KafkaProducer @@ -16,7 +17,7 @@ class Producer(threading.Thread): time.sleep(1) -class Consumer(threading.Thread): +class Consumer(multiprocessing.Process): daemon = True def run(self): @@ -29,12 +30,12 @@ class Consumer(threading.Thread): def main(): - threads = [ + tasks = [ Producer(), Consumer() ] - for t in threads: + for t in tasks: t.start() time.sleep(10) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 7fa5710..15a8947 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -33,6 +33,8 @@ class KafkaConsumer(six.Iterator): to allow multiple consumers to load balance consumption of topics (requires kafka >= 0.9.0.0). + The consumer is not thread safe and should not be shared across threads. + Arguments: *topics (str): optional list of topics to subscribe to. If not set, call :meth:`~kafka.KafkaConsumer.subscribe` or -- cgit v1.2.1