diff options
author | Mahendra M <mahendra.m@gmail.com> | 2013-06-28 13:59:06 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-06-28 13:59:06 +0530 |
commit | c13ee1df6ab2900ffe0bd48e6376993c0d312a70 (patch) | |
tree | 031a5fbf46c8b26e54aad3c984ec6cd31f63998f /kafka/consumer.py | |
parent | 7b2a08faed84296221164bcaca15af08dbb88581 (diff) | |
download | kafka-python-c13ee1df6ab2900ffe0bd48e6376993c0d312a70.tar.gz |
Fix cases of single partition
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 5 |
1 files changed, 3 insertions, 2 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 9e6a0eb..23a2f90 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -437,11 +437,12 @@ class MultiProcessConsumer(Consumer): partitions_per_proc += 1 # The final set of chunks - chunks = map(None, *[iter(partitions)] * int(partitions_per_proc)) + chunker = lambda *x: [] + list(x) + chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc)) self.procs = [] for chunk in chunks: - chunk = filter(lambda x: x is not None, list(chunk)) + chunk = filter(lambda x: x is not None, chunk) proc = Process(target=self._consume, args=(chunk,)) proc.daemon = True proc.start() |