diff options
author | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-02-24 21:31:47 +0300 |
---|---|---|
committer | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-02-25 10:37:24 +0300 |
commit | 250778453fcece7316615bd62795e9293c62e525 (patch) | |
tree | d46bb79e14ed4c997bcd4a0a8aef8359f90d1c70 | |
parent | 60a73788ee9036a79078193dfab892c6e6ef8f9b (diff) | |
download | kafka-python-250778453fcece7316615bd62795e9293c62e525.tar.gz |
Trying to use threading for async batching
-rw-r--r-- | kafka/producer/base.py | 8 |
1 files changed, 5 insertions, 3 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 5b41bc9..a9288d9 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -3,12 +3,14 @@ from __future__ import absolute_import import logging import time +from Queue import Queue try: from queue import Empty except ImportError: from Queue import Empty from collections import defaultdict -from multiprocessing import Queue, Process + +from threading import Thread import six @@ -140,7 +142,7 @@ class Producer(object): log.warning("Current implementation does not retry Failed messages") log.warning("Use at your own risk! (or help improve with a PR!)") self.queue = Queue() # Messages are sent through this queue - self.proc = Process(target=_send_upstream, + self.proc = Thread(target=_send_upstream, args=(self.queue, self.client.copy(), self.codec, @@ -211,4 +213,4 @@ class Producer(object): self.proc.join(timeout) if self.proc.is_alive(): - self.proc.terminate() + raise SystemError("Can't join Kafka async thread") |