diff options
-rw-r--r-- | kafka/producer/base.py | 8 |
1 files changed, 5 insertions, 3 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 5b41bc9..a9288d9 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -3,12 +3,14 @@ from __future__ import absolute_import import logging import time +from Queue import Queue try: from queue import Empty except ImportError: from Queue import Empty from collections import defaultdict -from multiprocessing import Queue, Process + +from threading import Thread import six @@ -140,7 +142,7 @@ class Producer(object): log.warning("Current implementation does not retry Failed messages") log.warning("Use at your own risk! (or help improve with a PR!)") self.queue = Queue() # Messages are sent through this queue - self.proc = Process(target=_send_upstream, + self.proc = Thread(target=_send_upstream, args=(self.queue, self.client.copy(), self.codec, @@ -211,4 +213,4 @@ class Producer(object): self.proc.join(timeout) if self.proc.is_alive(): - self.proc.terminate() + raise SystemError("Can't join Kafka async thread") |