diff options
author | Mahendra M <mahendra.m@gmail.com> | 2013-06-28 13:59:06 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-06-28 13:59:06 +0530 |
commit | c13ee1df6ab2900ffe0bd48e6376993c0d312a70 (patch) | |
tree | 031a5fbf46c8b26e54aad3c984ec6cd31f63998f | |
parent | 7b2a08faed84296221164bcaca15af08dbb88581 (diff) | |
download | kafka-python-c13ee1df6ab2900ffe0bd48e6376993c0d312a70.tar.gz |
Fix cases of single partition
-rw-r--r-- | README.md | 2 | ||||
-rw-r--r-- | kafka/consumer.py | 5 |
2 files changed, 5 insertions, 2 deletions
@@ -59,6 +59,8 @@ producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner) ## Multiprocess consumer ```python +from kafka.consume import MultiProcessConsumer + # This will split the number of partitions among two processes consumer = MultiProcessConsumer(kafka, "my-topic", "my-group", num_procs=2) diff --git a/kafka/consumer.py b/kafka/consumer.py index 9e6a0eb..23a2f90 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -437,11 +437,12 @@ class MultiProcessConsumer(Consumer): partitions_per_proc += 1 # The final set of chunks - chunks = map(None, *[iter(partitions)] * int(partitions_per_proc)) + chunker = lambda *x: [] + list(x) + chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc)) self.procs = [] for chunk in chunks: - chunk = filter(lambda x: x is not None, list(chunk)) + chunk = filter(lambda x: x is not None, chunk) proc = Process(target=self._consume, args=(chunk,)) proc.daemon = True proc.start() |