summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Mahendra M <mahendra.m@gmail.com> 2013-10-08 09:28:58 +0530
committer Mahendra M <mahendra.m@gmail.com> 2013-10-08 09:28:58 +0530
commit 6e3ee64ef3d8e65de9aab601e7239fbd1ac0de93 (patch)
tree 06267ed9ed686bc73646c1637762b727f03a6d32
parent 92d70e7310b5519a1be86f189a4ddb9d772a0434 (diff)
download kafka-python-6e3ee64ef3d8e65de9aab601e7239fbd1ac0de93.tar.gz
Ensure that the multiprocess consumer works on Windows
-rw-r--r-- kafka/consumer.py 116
1 files changed, 63 insertions, 53 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 7d44f28..f2898ad 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -404,6 +404,63 @@ class SimpleConsumer(Consumer):
offset = next_offset + 1
+def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
+ """
+ A child process worker which consumes messages based on the
+ notifications given by the controller process
+
+ NOTE: Ideally, this would be a method of the Consumer class.
+ However, the multiprocessing module has issues on Windows: the
+ functionality breaks unless this function is kept outside of a class.
+ """
+
+ # Make the child processes open separate socket connections
+ client.reinit()
+
+ # We will start consumers without auto-commit. Auto-commit will be
+ # done by the master controller process.
+ consumer = SimpleConsumer(client, group, topic,
+ partitions=chunk,
+ auto_commit=False,
+ auto_commit_every_n=None,
+ auto_commit_every_t=None)
+
+ # Ensure that the consumer provides the partition information
+ consumer.provide_partition_info()
+
+ while True:
+ # Wait until the controller signals us to start consuming
+ start.wait()
+
+ # If we are asked to quit, do so
+ if exit.is_set():
+ break
+
+ # Consume messages and add them to the queue. If the controller
+ # indicates a specific number of messages, follow that advice
+ count = 0
+
+ for partition, message in consumer:
+ queue.put((partition, message))
+ count += 1
+
+ # We have reached the required size. The controller might already
+ # have more than it needs, so wait on the 'pause' event.
+ # Without this logic, we could run into a big loop, consuming
+ # all available messages before the controller can reset the
+ # 'start' event.
+ if count == size.value:
+ pause.wait()
+ break
+
+ # In case we did not receive any message, give up the CPU for
+ # a while before we try again
+ if count == 0:
+ time.sleep(0.1)
+
+ consumer.stop()
+
+
class MultiProcessConsumer(Consumer):
"""
A consumer implementation that consumes partitions for a topic in
@@ -468,63 +525,16 @@ class MultiProcessConsumer(Consumer):
self.procs = []
for chunk in chunks:
chunk = filter(lambda x: x is not None, chunk)
- proc = Process(target=self._consume, args=(chunk,))
+ args = (client.copy(),
+ group, topic, chunk,
+ self.queue, self.start, self.exit,
+ self.pause, self.size)
+
+ proc = Process(target=_mp_consume, args=args)
proc.daemon = True
proc.start()
self.procs.append(proc)
- def _consume(self, partitions):
- """
- A child process worker which consumes messages based on the
- notifications given by the controller process
- """
-
- # Make the child processes open separate socket connections
- self.client.reinit()
-
- # We will start consumers without auto-commit. Auto-commit will be
- # done by the master controller process.
- consumer = SimpleConsumer(self.client, self.group, self.topic,
- partitions=partitions,
- auto_commit=False,
- auto_commit_every_n=None,
- auto_commit_every_t=None)
-
- # Ensure that the consumer provides the partition information
- consumer.provide_partition_info()
-
- while True:
- # Wait till the controller indicates us to start consumption
- self.start.wait()
-
- # If we are asked to quit, do so
- if self.exit.is_set():
- break
-
- # Consume messages and add them to the queue. If the controller
- # indicates a specific number of messages, follow that advice
- count = 0
-
- for partition, message in consumer:
- self.queue.put((partition, message))
- count += 1
-
- # We have reached the required size. The controller might have
- # more than what he needs. Wait for a while.
- # Without this logic, it is possible that we run into a big
- # loop consuming all available messages before the controller
- # can reset the 'start' event
- if count == self.size.value:
- self.pause.wait()
- break
-
- # In case we did not receive any message, give up the CPU for
- # a while before we try again
- if count == 0:
- time.sleep(0.1)
-
- consumer.stop()
-
def stop(self):
# Set exit and start off all waiting consumers
self.exit.set()