summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/consumer/multiprocess.py6
1 files changed, 3 insertions, 3 deletions
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index 2bb97f3..cdfaeeb 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -21,7 +21,7 @@ Events = namedtuple("Events", ["start", "pause", "exit"])
log = logging.getLogger("kafka")
-def _mp_consume(client, group, topic, queue, size, events, consumer_options):
+def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
"""
A child process worker which consumes messages based on the
notifications given by the controller process
@@ -153,8 +153,8 @@ class MultiProcessConsumer(Consumer):
options.update(simple_consumer_options)
args = (client.copy(), group, topic, self.queue,
- self.size, self.events, options)
- proc = Process(target=_mp_consume, args=args)
+ self.size, self.events)
+ proc = Process(target=_mp_consume, args=args, kwargs=options)
proc.daemon = True
proc.start()
self.procs.append(proc)