diff options
-rw-r--r-- | README.rst | 9 | ||||
-rw-r--r-- | docs/index.rst | 10 | ||||
-rwxr-xr-x | example.py | 7 | ||||
-rw-r--r-- | kafka/consumer/group.py | 2 |
4 files changed, 25 insertions, 3 deletions
@@ -110,6 +110,15 @@ for more details. >>> for i in range(1000): ... producer.send('foobar', b'msg %d' % i) +Thread safety +************* + +The KafkaProducer can be used across threads without issue, unlike the +KafkaConsumer which cannot. + +While it is possible to use the KafkaConsumer in a thread-local manner, +multiprocessing is recommended. + Compression *********** diff --git a/docs/index.rst b/docs/index.rst index 18f0721..550d246 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -109,6 +109,16 @@ client. See `KafkaProducer <apidoc/KafkaProducer.html>`_ for more details. ... producer.send('foobar', b'msg %d' % i) +Thread safety +************* + +The KafkaProducer can be used across threads without issue, unlike the +KafkaConsumer which cannot. + +While it is possible to use the KafkaConsumer in a thread-local manner, +multiprocessing is recommended. + + Compression *********** @@ -1,5 +1,6 @@ #!/usr/bin/env python import threading, logging, time +import multiprocessing from kafka import KafkaConsumer, KafkaProducer @@ -16,7 +17,7 @@ class Producer(threading.Thread): time.sleep(1) -class Consumer(threading.Thread): +class Consumer(multiprocessing.Process): daemon = True def run(self): @@ -29,12 +30,12 @@ class Consumer(threading.Thread): def main(): - threads = [ + tasks = [ Producer(), Consumer() ] - for t in threads: + for t in tasks: t.start() time.sleep(10) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 7fa5710..15a8947 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -33,6 +33,8 @@ class KafkaConsumer(six.Iterator): to allow multiple consumers to load balance consumption of topics (requires kafka >= 0.9.0.0). + The consumer is not thread safe and should not be shared across threads. + Arguments: *topics (str): optional list of topics to subscribe to. If not set, call :meth:`~kafka.KafkaConsumer.subscribe` or |