diff options
-rw-r--r-- | kafka/consumer/multiprocess.py | 6 |
1 files changed, 3 insertions, 3 deletions
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index 2bb97f3..cdfaeeb 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -21,7 +21,7 @@ Events = namedtuple("Events", ["start", "pause", "exit"]) log = logging.getLogger("kafka") -def _mp_consume(client, group, topic, queue, size, events, consumer_options): +def _mp_consume(client, group, topic, queue, size, events, **consumer_options): """ A child process worker which consumes messages based on the notifications given by the controller process @@ -153,8 +153,8 @@ class MultiProcessConsumer(Consumer): options.update(simple_consumer_options) args = (client.copy(), group, topic, self.queue, - self.size, self.events, options) - proc = Process(target=_mp_consume, args=args) + self.size, self.events) + proc = Process(target=_mp_consume, args=args, kwargs=options) proc.daemon = True proc.start() self.procs.append(proc) |