summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/producer/base.py8
1 files changed, 5 insertions, 3 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 5b41bc9..a9288d9 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -3,12 +3,14 @@ from __future__ import absolute_import
import logging
import time
+from Queue import Queue
try:
from queue import Empty
except ImportError:
from Queue import Empty
from collections import defaultdict
-from multiprocessing import Queue, Process
+
+from threading import Thread
import six
@@ -140,7 +142,7 @@ class Producer(object):
log.warning("Current implementation does not retry Failed messages")
log.warning("Use at your own risk! (or help improve with a PR!)")
self.queue = Queue() # Messages are sent through this queue
- self.proc = Process(target=_send_upstream,
+ self.proc = Thread(target=_send_upstream,
args=(self.queue,
self.client.copy(),
self.codec,
@@ -211,4 +213,4 @@ class Producer(object):
self.proc.join(timeout)
if self.proc.is_alive():
- self.proc.terminate()
+ raise SystemError("Can't join Kafka async thread")