diff options
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 116 |
1 files changed, 63 insertions, 53 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 7d44f28..f2898ad 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -404,6 +404,63 @@ class SimpleConsumer(Consumer):
                 offset = next_offset + 1
 
 
+def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
+    """
+    A child process worker which consumes messages based on the
+    notifications given by the controller process
+
+    NOTE: Ideally, this should have been a method inside the Consumer
+    class. However, multiprocessing module has issues in windows. The
+    functionality breaks unless this function is kept outside of a class
+    """
+
+    # Make the child processes open separate socket connections
+    client.reinit()
+
+    # We will start consumers without auto-commit. Auto-commit will be
+    # done by the master controller process.
+    consumer = SimpleConsumer(client, group, topic,
+                              partitions=chunk,
+                              auto_commit=False,
+                              auto_commit_every_n=None,
+                              auto_commit_every_t=None)
+
+    # Ensure that the consumer provides the partition information
+    consumer.provide_partition_info()
+
+    while True:
+        # Wait till the controller indicates us to start consumption
+        start.wait()
+
+        # If we are asked to quit, do so
+        if exit.is_set():
+            break
+
+        # Consume messages and add them to the queue. If the controller
+        # indicates a specific number of messages, follow that advice
+        count = 0
+
+        for partition, message in consumer:
+            queue.put((partition, message))
+            count += 1
+
+            # We have reached the required size. The controller might have
+            # more than what he needs. Wait for a while.
+            # Without this logic, it is possible that we run into a big
+            # loop consuming all available messages before the controller
+            # can reset the 'start' event
+            if count == size.value:
+                pause.wait()
+                break
+
+        # In case we did not receive any message, give up the CPU for
+        # a while before we try again
+        if count == 0:
+            time.sleep(0.1)
+
+    consumer.stop()
+
+
 class MultiProcessConsumer(Consumer):
     """
     A consumer implementation that consumes partitions for a topic in
@@ -468,63 +525,16 @@ class MultiProcessConsumer(Consumer):
         self.procs = []
         for chunk in chunks:
             chunk = filter(lambda x: x is not None, chunk)
-            proc = Process(target=self._consume, args=(chunk,))
+            args = (client.copy(),
+                    group, topic, chunk,
+                    self.queue, self.start, self.exit,
+                    self.pause, self.size)
+
+            proc = Process(target=_mp_consume, args=args)
             proc.daemon = True
             proc.start()
             self.procs.append(proc)
 
-    def _consume(self, partitions):
-        """
-        A child process worker which consumes messages based on the
-        notifications given by the controller process
-        """
-
-        # Make the child processes open separate socket connections
-        self.client.reinit()
-
-        # We will start consumers without auto-commit. Auto-commit will be
-        # done by the master controller process.
-        consumer = SimpleConsumer(self.client, self.group, self.topic,
-                                  partitions=partitions,
-                                  auto_commit=False,
-                                  auto_commit_every_n=None,
-                                  auto_commit_every_t=None)
-
-        # Ensure that the consumer provides the partition information
-        consumer.provide_partition_info()
-
-        while True:
-            # Wait till the controller indicates us to start consumption
-            self.start.wait()
-
-            # If we are asked to quit, do so
-            if self.exit.is_set():
-                break
-
-            # Consume messages and add them to the queue. If the controller
-            # indicates a specific number of messages, follow that advice
-            count = 0
-
-            for partition, message in consumer:
-                self.queue.put((partition, message))
-                count += 1
-
-                # We have reached the required size. The controller might have
-                # more than what he needs. Wait for a while.
-                # Without this logic, it is possible that we run into a big
-                # loop consuming all available messages before the controller
-                # can reset the 'start' event
-                if count == self.size.value:
-                    self.pause.wait()
-                    break
-
-            # In case we did not receive any message, give up the CPU for
-            # a while before we try again
-            if count == 0:
-                time.sleep(0.1)
-
-        consumer.stop()
-
     def stop(self):
         # Set exit and start off all waiting consumers
         self.exit.set()