diff options
author | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-03-11 12:41:34 +0300 |
---|---|---|
committer | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-03-13 15:48:30 +0300 |
commit | f8012c1a74a60623d541d30ff5a21e86fd48c81a (patch) | |
tree | fed3b5887fb5baa529dc364720b59b749cf706b8 | |
parent | 09028f0b22389a6319a2c1283c902b5f83b81917 (diff) | |
download | kafka-python-f8012c1a74a60623d541d30ff5a21e86fd48c81a.tar.gz |
Moved additional MP consumer options to **kwargs
-rw-r--r-- | kafka/consumer/multiprocess.py | 6 |
1 files changed, 3 insertions, 3 deletions
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index 2bb97f3..cdfaeeb 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -21,7 +21,7 @@ Events = namedtuple("Events", ["start", "pause", "exit"]) log = logging.getLogger("kafka") -def _mp_consume(client, group, topic, queue, size, events, consumer_options): +def _mp_consume(client, group, topic, queue, size, events, **consumer_options): """ A child process worker which consumes messages based on the notifications given by the controller process @@ -153,8 +153,8 @@ class MultiProcessConsumer(Consumer): options.update(simple_consumer_options) args = (client.copy(), group, topic, self.queue, - self.size, self.events, options) - proc = Process(target=_mp_consume, args=args) + self.size, self.events) + proc = Process(target=_mp_consume, args=args, kwargs=options) proc.daemon = True proc.start() self.procs.append(proc) |