summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorViktor Shlapakov <vshlapakov@gmail.com>2015-02-24 21:31:47 +0300
committerViktor Shlapakov <vshlapakov@gmail.com>2015-02-25 10:37:24 +0300
commit250778453fcece7316615bd62795e9293c62e525 (patch)
treed46bb79e14ed4c997bcd4a0a8aef8359f90d1c70
parent60a73788ee9036a79078193dfab892c6e6ef8f9b (diff)
downloadkafka-python-250778453fcece7316615bd62795e9293c62e525.tar.gz
Trying to use threading for async batching
-rw-r--r--kafka/producer/base.py8
1 files changed, 5 insertions, 3 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 5b41bc9..a9288d9 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -3,12 +3,14 @@ from __future__ import absolute_import
import logging
import time
+from Queue import Queue
try:
from queue import Empty
except ImportError:
from Queue import Empty
from collections import defaultdict
-from multiprocessing import Queue, Process
+
+from threading import Thread
import six
@@ -140,7 +142,7 @@ class Producer(object):
log.warning("Current implementation does not retry Failed messages")
log.warning("Use at your own risk! (or help improve with a PR!)")
self.queue = Queue() # Messages are sent through this queue
- self.proc = Process(target=_send_upstream,
+ self.proc = Thread(target=_send_upstream,
args=(self.queue,
self.client.copy(),
self.codec,
@@ -211,4 +213,4 @@ class Producer(object):
self.proc.join(timeout)
if self.proc.is_alive():
- self.proc.terminate()
+ raise SystemError("Can't join Kafka async thread")