summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLinus Wallgren <linus.wallgren@gmail.com>2017-05-18 22:49:20 +0200
committerDana Powers <dana.powers@gmail.com>2017-06-16 23:26:14 -0700
commit97261f491d02fb7f72ba30abcfc26240f520a9b4 (patch)
treef01f4d23df893acb4093c2e330730208e6c5e7f3
parent73d78bc76ade2b42abcdea32095d1df930e21c55 (diff)
downloadkafka-python-97261f491d02fb7f72ba30abcfc26240f520a9b4.tar.gz
Describe consumer thread-safety
-rw-r--r--README.rst9
-rw-r--r--docs/index.rst10
-rwxr-xr-xexample.py7
-rw-r--r--kafka/consumer/group.py2
4 files changed, 25 insertions, 3 deletions
diff --git a/README.rst b/README.rst
index 8462504..751a524 100644
--- a/README.rst
+++ b/README.rst
@@ -110,6 +110,15 @@ for more details.
>>> for i in range(1000):
... producer.send('foobar', b'msg %d' % i)
+Thread safety
+*************
+
+The KafkaProducer can be used across threads without issue, unlike the
+KafkaConsumer which cannot.
+
+While it is possible to use the KafkaConsumer in a thread-local manner,
+multiprocessing is recommended.
+
Compression
***********
diff --git a/docs/index.rst b/docs/index.rst
index 18f0721..550d246 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -109,6 +109,16 @@ client. See `KafkaProducer <apidoc/KafkaProducer.html>`_ for more details.
... producer.send('foobar', b'msg %d' % i)
+Thread safety
+*************
+
+The KafkaProducer can be used across threads without issue, unlike the
+KafkaConsumer which cannot.
+
+While it is possible to use the KafkaConsumer in a thread-local manner,
+multiprocessing is recommended.
+
+
Compression
***********
diff --git a/example.py b/example.py
index a1a1e1e..2431ee2 100755
--- a/example.py
+++ b/example.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python
import threading, logging, time
+import multiprocessing
from kafka import KafkaConsumer, KafkaProducer
@@ -16,7 +17,7 @@ class Producer(threading.Thread):
time.sleep(1)
-class Consumer(threading.Thread):
+class Consumer(multiprocessing.Process):
daemon = True
def run(self):
@@ -29,12 +30,12 @@ class Consumer(threading.Thread):
def main():
- threads = [
+ tasks = [
Producer(),
Consumer()
]
- for t in threads:
+ for t in tasks:
t.start()
time.sleep(10)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 7fa5710..15a8947 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -33,6 +33,8 @@ class KafkaConsumer(six.Iterator):
to allow multiple consumers to load balance consumption of topics (requires
kafka >= 0.9.0.0).
+ The consumer is not thread safe and should not be shared across threads.
+
Arguments:
*topics (str): optional list of topics to subscribe to. If not set,
call :meth:`~kafka.KafkaConsumer.subscribe` or