diff options
author | Mahendra M <mahendra.m@gmail.com> | 2013-06-24 18:05:13 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-06-24 18:05:13 +0530 |
commit | b578725fc338dc80cb82ad3471d488881f9dd785 (patch) | |
tree | 880c3715913fc6ab36c192a6299c56c94d82259b | |
parent | aec388474ee74607683daff3c8f74f4543d26e86 (diff) | |
download | kafka-python-b578725fc338dc80cb82ad3471d488881f9dd785.tar.gz |
Add support for multi-process consumer
-rw-r--r-- | README.md | 14 | ||||
-rw-r--r-- | kafka/consumer.py | 334 |
2 files changed, 300 insertions, 48 deletions
@@ -57,6 +57,20 @@ producer.send("key2", "this methode") producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner) ``` +# Multiprocess consumer +# This will split the number of partitions among two processes +consumer = MultiProcessConsumer(kafka, "my-topic", "my-group", num_procs=2) + +# This will spawn processes such that each handles 2 partitions max +consumer = MultiProcessConsumer(kafka, "my-topic", "my-group", + partitions_per_proc=2) + +for message in consumer: + print(message) + +for message in consumer.get_messages(count=5, block=True, timeout=4): + print(message) + ## Low level ```python diff --git a/kafka/consumer.py b/kafka/consumer.py index 467bd76..e6d95d0 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -1,6 +1,7 @@ from itertools import izip_longest, repeat import logging from threading import Lock +from multiprocessing import Process, Queue, Event, Value from kafka.common import ( ErrorMapping, FetchRequest, @@ -17,36 +18,17 @@ AUTO_COMMIT_MSG_COUNT = 100 AUTO_COMMIT_INTERVAL = 5000 -class SimpleConsumer(object): - """ - A simple consumer implementation that consumes all partitions for a topic - - client: a connected KafkaClient - group: a name for this consumer, used for offset storage and must be unique - topic: the topic to consume - partitions: An optional list of partitions to consume the data from - - auto_commit: default True. Whether or not to auto commit the offsets - auto_commit_every_n: default 100. How many messages to consume - before a commit - auto_commit_every_t: default 5000. How much time (in milliseconds) to - wait before commit - - Auto commit details: - If both auto_commit_every_n and auto_commit_every_t are set, they will - reset one another when one is triggered. These triggers simply call the - commit method on this class. A manual call to commit will also reset - these triggers - - """ - def __init__(self, client, group, topic, auto_commit=True, partitions=None, +class Consumer(object): + def __init__(self, client, topic, partitions=None, auto_commit=True, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL): + self.client = client self.topic = topic self.group = group self.client._load_metadata_for_topics(topic) self.offsets = {} + self.partition_info = False if not partitions: partitions = self.client.topic_partitions[topic] @@ -87,6 +69,123 @@ class SimpleConsumer(object): for partition in partitions: self.offsets[partition] = 0 + def provide_partition_info(self): + self.partition_info = True + + def _timed_commit(self): + """ + Commit offsets as part of timer + """ + self.commit() + + # Once the commit is done, start the timer again + self.commit_timer.start() + + def commit(self, partitions=None): + """ + Commit offsets for this consumer + + partitions: list of partitions to commit, default is to commit + all of them + """ + + # short circuit if nothing happened + if self.count_since_commit == 0: + return + + with self.commit_lock: + reqs = [] + if not partitions: # commit all partitions + partitions = self.offsets.keys() + + for partition in partitions: + offset = self.offsets[partition] + log.debug("Commit offset %d in SimpleConsumer: " + "group=%s, topic=%s, partition=%s" % + (offset, self.group, self.topic, partition)) + + reqs.append(OffsetCommitRequest(self.topic, partition, + offset, None)) + + resps = self.client.send_offset_commit_request(self.group, reqs) + for resp in resps: + assert resp.error == 0 + + self.count_since_commit = 0 + + def _auto_commit(self): + """ + Check if we have to commit based on number of messages and commit + """ + + # Check if we are supposed to do an auto-commit + if not self.auto_commit or self.auto_commit_every_n is None: + return + + if self.count_since_commit > self.auto_commit_every_n: + if self.commit_timer is not None: + self.commit_timer.stop() + self.commit() + self.commit_timer.start() + else: + self.commit() + + def pending(self, partitions=None): + """ + Gets the pending message count + + partitions: list of partitions to check for, default is to check all + """ + if not partitions: + partitions = self.offsets.keys() + + total = 0 + reqs = [] + + for partition in partitions: + reqs.append(OffsetRequest(self.topic, partition, -1, 1)) + + resps = self.client.send_offset_request(reqs) + for resp in resps: + partition = resp.partition + pending = resp.offsets[0] + offset = self.offsets[partition] + total += pending - offset - (1 if offset > 0 else 0) + + return total + + +class SimpleConsumer(Consumer): + """ + A simple consumer implementation that consumes all partitions for a topic + + client: a connected KafkaClient + group: a name for this consumer, used for offset storage and must be unique + topic: the topic to consume + partitions: An optional list of partitions to consume the data from + + auto_commit: default True. Whether or not to auto commit the offsets + auto_commit_every_n: default 100. How many messages to consume + before a commit + auto_commit_every_t: default 5000. How much time (in milliseconds) to + wait before commit + + Auto commit details: + If both auto_commit_every_n and auto_commit_every_t are set, they will + reset one another when one is triggered. These triggers simply call the + commit method on this class. A manual call to commit will also reset + these triggers + + """ + def __init__(self, client, group, topic, auto_commit=True, partitions=None, + auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, + auto_commit_every_t=AUTO_COMMIT_INTERVAL): + + super(SimpleConsumer, self).__init__(client, group, topic, + auto_commit, partitions, + auto_commit_every_n, + auto_commit_every_t) + def stop(self): if self.commit_timer is not None: self.commit_timer.stop() @@ -130,30 +229,6 @@ class SimpleConsumer(object): else: raise ValueError("Unexpected value for `whence`, %d" % whence) - def pending(self, partitions=None): - """ - Gets the pending message count - - partitions: list of partitions to check for, default is to check all - """ - if not partitions: - partitions = self.offsets.keys() - - total = 0 - reqs = [] - - for partition in partitions: - reqs.append(OffsetRequest(self.topic, partition, -1, 1)) - - resps = self.client.send_offset_request(reqs) - for resp in resps: - partition = resp.partition - pending = resp.offsets[0] - offset = self.offsets[partition] - total += pending - offset - (1 if offset > 0 else 0) - - return total - def _timed_commit(self): """ Commit offsets as part of timer @@ -230,7 +305,10 @@ class SimpleConsumer(object): for partition, it in iters.items(): try: - yield it.next() + if self.partition_info: + yield (partition, it.next()) + else: + yield it.next() except StopIteration: log.debug("Done iterating over partition %s" % partition) del iters[partition] @@ -250,6 +328,9 @@ class SimpleConsumer(object): the end of this partition. """ + if offset != 0: + offset += 1 + while True: # TODO: configure fetch size req = FetchRequest(self.topic, partition, offset, 1024) @@ -268,3 +349,160 @@ class SimpleConsumer(object): break else: offset = next_offset + 1 + + +class MultiProcessConsumer(Consumer): + """ + A consumer implementation that consumes partitions for a topic in + parallel from multiple partitions + + client: a connected KafkaClient + group: a name for this consumer, used for offset storage and must be unique + topic: the topic to consume + + auto_commit: default True. Whether or not to auto commit the offsets + auto_commit_every_n: default 100. How many messages to consume + before a commit + auto_commit_every_t: default 5000. How much time (in milliseconds) to + wait before commit + num_procs: Number of processes to start for consuming messages. + The available partitions will be divided among these processes + partitions_per_proc: Number of partitions to be allocated per process + (overrides num_procs) + + Auto commit details: + If both auto_commit_every_n and auto_commit_every_t are set, they will + reset one another when one is triggered. These triggers simply call the + commit method on this class. A manual call to commit will also reset + these triggers + + """ + def __init__(self, client, group, topic, auto_commit=True, + auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, + auto_commit_every_t=AUTO_COMMIT_INTERVAL, + num_procs=1, partitions_per_proc=0): + + # Initiate the base consumer class + super(MultiProcessConsumer, self).__init__(client, group, topic, + auto_commit, partitions, + auto_commit_every_n, + auto_commit_every_t) + + # Variables for managing and controlling the data flow from + # consumer child process to master + self.queue = Queue() # Child consumers dump messages into this + self.start = Event() # Indicates the consumers to start + self.exit = Event() # Requests the consumers to shutdown + self.pause = Event() # Requests the consumers to pause + self.size = Value('i', 0) # Indicator of number of messages to fetch + + partitions = self.offsets.keys() + + # If unspecified, start one consumer per partition + if not partitions_per_proc: + partitions_per_proc = round(len(partitions) * 1.0 / num_procs) + if partitions_per_proc < num_procs * 0.5: + partitions_per_proc += 1 + + self.procs = [] + + for slices in map(None, *[iter(partitions)] * int(partitions_per_proc)): + proc = Process(target=_self._consume, args=(slices,)) + proc.daemon = True + proc.start() + self.procs.append(proc) + + # We do not need a consumer instance anymore + consumer.stop() + + def _consume(self, slices): + + # We will start consumers without auto-commit. Auto-commit will be + # done by the master process. + consumer = SimpleConsumer(self.client, self.group, self.topic, + partitions=slices, + auto_commit=False, + auto_commit_every_n=0, + auto_commit_every_t=None) + + # Ensure that the consumer provides the partition information + consumer.provide_partition_info() + + while True: + self.start.wait() + if self.exit.isSet(): + break + + count = 0 + for partition, message in consumer: + self.queue.put((partition, message)) + count += 1 + + # We have reached the required size. The master might have + # more than what he needs. Wait for a while + if count == self.size.value: + self.pause.wait() + break + + consumer.stop() + + def stop(self): + # Set exit and start off all waiting consumers + self.exit.set() + self.start.set() + + for proc in self.procs: + proc.join() + proc.terminate() + + def __iter__(self): + # Trigger the consumer procs to start off + self.size.value = 0 + self.start.set() + self.pause.set() + + while not self.queue.empty(): + partition, message = self.queue.get() + yield message + + # Count, check and commit messages if necessary + self.offsets[partition] = message.offset + self.count_since_commit += 1 + self._auto_commit() + + self.start.clear() + + def get_messages(self, count=1, block=True, timeout=10): + messages = [] + + # Give a size hint to the consumers + self.size.value = count + self.pause.clear() + + while count > 0: + # Trigger consumption only if the queue is empty + # By doing this, we will ensure that consumers do not + # go into overdrive and keep consuming thousands of + # messages when the user might need only two + if self.queue.empty(): + self.start.set() + + try: + partition, message = self.queue.get(block, timeout) + except Queue.Empty: + break + + messages.append(message) + + # Count, check and commit messages if necessary + self.offsets[partition] = message.offset + self.count_since_commit += 1 + self._auto_commit() + count -= 1 + + self.size.value = 0 + + self.start.clear() + self.pause.set() + + return messages |