summaryrefslogtreecommitdiff
path: root/kafka/consumer/multiprocess.py
diff options
context:
space:
mode:
authorViktor Shlapakov <vshlapakov@gmail.com>2015-03-11 13:51:07 +0300
committerViktor Shlapakov <vshlapakov@gmail.com>2015-03-11 13:51:07 +0300
commit4bab2fa5d1bc67e18b2f7791ff5fbb8e73143a5e (patch)
tree47ee05544a9ca2669f2f21fd529426f3174e4559 /kafka/consumer/multiprocess.py
parentac66fe9159723c6707258bbb4c6727cf66a78219 (diff)
downloadkafka-python-4bab2fa5d1bc67e18b2f7791ff5fbb8e73143a5e.tar.gz
Cleaned code for MP consumer chunking
Diffstat (limited to 'kafka/consumer/multiprocess.py')
-rw-r--r--kafka/consumer/multiprocess.py19
1 files changed, 9 insertions, 10 deletions
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index 40aecf8..db59f7b 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -123,26 +123,25 @@ class MultiProcessConsumer(Consumer):
self.pause = Event() # Requests the consumers to pause fetch
self.size = Value('i', 0) # Indicator of number of messages to fetch
- partitions = self.offsets.keys()
+ partitions = list(self.offsets.keys())
- # If unspecified, start one consumer per partition
+ # By default, start one consumer process for all partitions
# The logic below ensures that
# * we do not cross the num_procs limit
# * we have an even distribution of partitions among processes
- if not partitions_per_proc:
- partitions_per_proc = round(len(partitions) * 1.0 / num_procs)
- if partitions_per_proc * num_procs < len(partitions):
- partitions_per_proc += 1
+
+ if partitions_per_proc:
+ num_procs = len(partitions) / partitions_per_proc
+ if num_procs * partitions_per_proc < len(partitions):
+ num_procs += 1
# The final set of chunks
- chunker = lambda *x: [] + list(x)
- chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc))
+ chunks = [partitions[proc::num_procs] for proc in range(num_procs)]
self.procs = []
for chunk in chunks:
- chunk = filter(lambda x: x is not None, chunk)
args = (client.copy(),
- group, topic, list(chunk),
+ group, topic, chunk,
self.queue, self.start, self.exit,
self.pause, self.size)