diff options
-rw-r--r-- | README.md | 2 | ||||
-rw-r--r-- | kafka/consumer.py | 5 |
2 files changed, 5 insertions, 2 deletions
@@ -59,6 +59,8 @@ producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner) ## Multiprocess consumer ```python +from kafka.consume import MultiProcessConsumer + # This will split the number of partitions among two processes consumer = MultiProcessConsumer(kafka, "my-topic", "my-group", num_procs=2) diff --git a/kafka/consumer.py b/kafka/consumer.py index 9e6a0eb..23a2f90 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -437,11 +437,12 @@ class MultiProcessConsumer(Consumer): partitions_per_proc += 1 # The final set of chunks - chunks = map(None, *[iter(partitions)] * int(partitions_per_proc)) + chunker = lambda *x: [] + list(x) + chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc)) self.procs = [] for chunk in chunks: - chunk = filter(lambda x: x is not None, list(chunk)) + chunk = filter(lambda x: x is not None, chunk) proc = Process(target=self._consume, args=(chunk,)) proc.daemon = True proc.start() |