diff options
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 511 |
1 files changed, 422 insertions, 89 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index db7a793..4c64cf2 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -1,6 +1,10 @@ +from collections import defaultdict from itertools import izip_longest, repeat import logging +import time from threading import Lock +from multiprocessing import Process, Queue, Event, Value +from Queue import Empty from kafka.common import ( ErrorMapping, FetchRequest, @@ -16,37 +20,60 @@ log = logging.getLogger("kafka") AUTO_COMMIT_MSG_COUNT = 100 AUTO_COMMIT_INTERVAL = 5000 +FETCH_DEFAULT_BLOCK_TIMEOUT = 1 +FETCH_MAX_WAIT_TIME = 100 +FETCH_MIN_BYTES = 4096 -class SimpleConsumer(object): + +class FetchContext(object): + """ + Class for managing the state of a consumer during fetch """ - A simple consumer implementation that consumes all partitions for a topic + def __init__(self, consumer, block, timeout): + self.consumer = consumer + self.block = block - client: a connected KafkaClient - group: a name for this consumer, used for offset storage and must be unique - topic: the topic to consume + if block and not timeout: + timeout = FETCH_DEFAULT_BLOCK_TIMEOUT - auto_commit: default True. Whether or not to auto commit the offsets - auto_commit_every_n: default 100. How many messages to consume - before a commit - auto_commit_every_t: default 5000. How much time (in milliseconds) to - wait before commit + self.timeout = timeout * 1000 - Auto commit details: - If both auto_commit_every_n and auto_commit_every_t are set, they will - reset one another when one is triggered. These triggers simply call the - commit method on this class. A manual call to commit will also reset - these triggers + def __enter__(self): + """Set fetch values based on blocking status""" + if self.block: + self.consumer.fetch_max_wait_time = self.timeout + self.consumer.fetch_min_bytes = 1 + else: + self.consumer.fetch_min_bytes = 0 + def __exit__(self, type, value, traceback): + """Reset values to default""" + self.consumer.fetch_max_wait_time = FETCH_MAX_WAIT_TIME + self.consumer.fetch_min_bytes = FETCH_MIN_BYTES + + +class Consumer(object): + """ + Base class to be used by other consumers. Not to be used directly + + This base class provides logic for + * initialization and fetching metadata of partitions + * Auto-commit logic + * APIs for fetching pending message count """ - def __init__(self, client, group, topic, auto_commit=False, + def __init__(self, client, group, topic, partitions=None, auto_commit=True, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL): + self.client = client self.topic = topic self.group = group self.client._load_metadata_for_topics(topic) self.offsets = {} + if not partitions: + partitions = self.client.topic_partitions[topic] + # Variables for handling offset commits self.commit_lock = Lock() self.commit_timer = None @@ -73,21 +100,139 @@ class SimpleConsumer(object): # Uncomment for 0.8.1 # - #for partition in self.client.topic_partitions[topic]: + #for partition in partitions: # req = OffsetFetchRequest(topic, partition) # (offset,) = self.client.send_offset_fetch_request(group, [req], # callback=get_or_init_offset_callback, # fail_on_error=False) # self.offsets[partition] = offset - for partition in self.client.topic_partitions[topic]: + for partition in partitions: self.offsets[partition] = 0 + def commit(self, partitions=None): + """ + Commit offsets for this consumer + + partitions: list of partitions to commit, default is to commit + all of them + """ + + # short circuit if nothing happened. This check is kept outside + # to prevent un-necessarily acquiring a lock for checking the state + if self.count_since_commit == 0: + return + + with self.commit_lock: + # Do this check again, just in case the state has changed + # during the lock acquiring timeout + if self.count_since_commit == 0: + return + + reqs = [] + if not partitions: # commit all partitions + partitions = self.offsets.keys() + + for partition in partitions: + offset = self.offsets[partition] + log.debug("Commit offset %d in SimpleConsumer: " + "group=%s, topic=%s, partition=%s" % + (offset, self.group, self.topic, partition)) + + reqs.append(OffsetCommitRequest(self.topic, partition, + offset, None)) + + resps = self.client.send_offset_commit_request(self.group, reqs) + for resp in resps: + assert resp.error == 0 + + self.count_since_commit = 0 + + def _auto_commit(self): + """ + Check if we have to commit based on number of messages and commit + """ + + # Check if we are supposed to do an auto-commit + if not self.auto_commit or self.auto_commit_every_n is None: + return + + if self.count_since_commit > self.auto_commit_every_n: + self.commit() + def stop(self): if self.commit_timer is not None: self.commit_timer.stop() self.commit() + def pending(self, partitions=None): + """ + Gets the pending message count + + partitions: list of partitions to check for, default is to check all + """ + if not partitions: + partitions = self.offsets.keys() + + total = 0 + reqs = [] + + for partition in partitions: + reqs.append(OffsetRequest(self.topic, partition, -1, 1)) + + resps = self.client.send_offset_request(reqs) + for resp in resps: + partition = resp.partition + pending = resp.offsets[0] + offset = self.offsets[partition] + total += pending - offset - (1 if offset > 0 else 0) + + return total + + +class SimpleConsumer(Consumer): + """ + A simple consumer implementation that consumes all/specified partitions + for a topic + + client: a connected KafkaClient + group: a name for this consumer, used for offset storage and must be unique + topic: the topic to consume + partitions: An optional list of partitions to consume the data from + + auto_commit: default True. Whether or not to auto commit the offsets + auto_commit_every_n: default 100. How many messages to consume + before a commit + auto_commit_every_t: default 5000. How much time (in milliseconds) to + wait before commit + + Auto commit details: + If both auto_commit_every_n and auto_commit_every_t are set, they will + reset one another when one is triggered. These triggers simply call the + commit method on this class. A manual call to commit will also reset + these triggers + """ + def __init__(self, client, group, topic, auto_commit=True, partitions=None, + auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, + auto_commit_every_t=AUTO_COMMIT_INTERVAL): + + self.partition_info = False # Do not return partition info in msgs + self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME + self.fetch_min_bytes = FETCH_MIN_BYTES + self.fetch_started = defaultdict(bool) # defaults to false + + super(SimpleConsumer, self).__init__(client, group, topic, + partitions=partitions, + auto_commit=auto_commit, + auto_commit_every_n=auto_commit_every_n, + auto_commit_every_t=auto_commit_every_t) + + def provide_partition_info(self): + """ + Indicates that partition info must be returned by the consumer + """ + self.partition_info = True + def seek(self, offset, whence): """ Alter the current offset in the consumer, similar to fseek @@ -116,6 +261,12 @@ class SimpleConsumer(object): reqs.append(OffsetRequest(self.topic, partition, -2, 1)) elif whence == 2: reqs.append(OffsetRequest(self.topic, partition, -1, 1)) + + # The API returns back the next available offset + # For eg: if the current offset is 18, the API will return + # back 19. So, if we have to seek 5 points before, we will + # end up going back to 14, instead of 13. Adjust this + deltas[partition] -= 1 else: pass @@ -126,79 +277,30 @@ class SimpleConsumer(object): else: raise ValueError("Unexpected value for `whence`, %d" % whence) - def pending(self, partitions=[]): - """ - Gets the pending message count - - partitions: list of partitions to check for, default is to check all + def get_messages(self, count=1, block=True, timeout=0.1): """ - if len(partitions) == 0: - partitions = self.offsets.keys() - - total = 0 - reqs = [] - - for partition in partitions: - reqs.append(OffsetRequest(self.topic, partition, -1, 1)) + Fetch the specified number of messages - resps = self.client.send_offset_request(reqs) - for resp in resps: - partition = resp.partition - pending = resp.offsets[0] - offset = self.offsets[partition] - total += pending - offset - (1 if offset > 0 else 0) - - return total - - def commit(self, partitions=[]): - """ - Commit offsets for this consumer - - partitions: list of partitions to commit, default is to commit - all of them + count: Indicates the maximum number of messages to be fetched + block: If True, the API will block till some messages are fetched. + timeout: If None, and block=True, the API will block infinitely. + If >0, API will block for specified time (in seconds) """ + messages = [] + iterator = self.__iter__() - # short circuit if nothing happened. This check is kept outside - # to prevent un-necessarily acquiring a lock for checking the state - if self.count_since_commit == 0: - return - - with self.commit_lock: - # Do this check again, just in case the state has changed - # during the lock acquiring timeout - if self.count_since_commit == 0: - return - - reqs = [] - if len(partitions) == 0: # commit all partitions - partitions = self.offsets.keys() - - for partition in partitions: - offset = self.offsets[partition] - log.debug("Commit offset %d in SimpleConsumer: " - "group=%s, topic=%s, partition=%s" % - (offset, self.group, self.topic, partition)) - - reqs.append(OffsetCommitRequest(self.topic, partition, - offset, None)) + # HACK: This splits the timeout between available partitions + timeout = timeout * 1.0 / len(self.offsets) - resps = self.client.send_offset_commit_request(self.group, reqs) - for resp in resps: - assert resp.error == 0 - - self.count_since_commit = 0 - - def _auto_commit(self): - """ - Check if we have to commit based on number of messages and commit - """ - - # Check if we are supposed to do an auto-commit - if not self.auto_commit or self.auto_commit_every_n is None: - return + with FetchContext(self, block, timeout): + while count > 0: + try: + messages.append(next(iterator)) + except StopIteration as exp: + break + count -= 1 - if self.count_since_commit > self.auto_commit_every_n: - self.commit() + return messages def __iter__(self): """ @@ -218,7 +320,10 @@ class SimpleConsumer(object): for partition, it in iters.items(): try: - yield it.next() + if self.partition_info: + yield (partition, it.next()) + else: + yield it.next() except StopIteration: log.debug("Done iterating over partition %s" % partition) del iters[partition] @@ -238,10 +343,23 @@ class SimpleConsumer(object): the end of this partition. """ + # The offset that is stored in the consumer is the offset that + # we have consumed. In subsequent iterations, we are supposed to + # fetch the next message (that is from the next offset) + # However, for the 0th message, the offset should be as-is. + # An OffsetFetchRequest to Kafka gives 0 for a new queue. This is + # problematic, since 0 is offset of a message which we have not yet + # consumed. + if self.fetch_started[partition]: + offset += 1 + while True: # TODO: configure fetch size req = FetchRequest(self.topic, partition, offset, 1024) - (resp,) = self.client.send_fetch_request([req]) + + (resp,) = self.client.send_fetch_request([req], + max_wait_time=self.fetch_max_wait_time, + min_bytes=self.fetch_min_bytes) assert resp.topic == self.topic assert resp.partition == partition @@ -249,10 +367,225 @@ class SimpleConsumer(object): next_offset = None for message in resp.messages: next_offset = message.offset - yield message - # update the internal state _after_ we yield the message + + # update the offset before the message is yielded. This is + # so that the consumer state is not lost in certain cases. + # For eg: the message is yielded and consumed by the caller, + # but the caller does not come back into the generator again. + # The message will be consumed but the status will not be + # updated in the consumer + self.fetch_started[partition] = True self.offsets[partition] = message.offset + yield message if next_offset is None: break else: offset = next_offset + 1 + + +class MultiProcessConsumer(Consumer): + """ + A consumer implementation that consumes partitions for a topic in + parallel using multiple processes + + client: a connected KafkaClient + group: a name for this consumer, used for offset storage and must be unique + topic: the topic to consume + + auto_commit: default True. Whether or not to auto commit the offsets + auto_commit_every_n: default 100. How many messages to consume + before a commit + auto_commit_every_t: default 5000. How much time (in milliseconds) to + wait before commit + num_procs: Number of processes to start for consuming messages. + The available partitions will be divided among these processes + partitions_per_proc: Number of partitions to be allocated per process + (overrides num_procs) + + Auto commit details: + If both auto_commit_every_n and auto_commit_every_t are set, they will + reset one another when one is triggered. These triggers simply call the + commit method on this class. A manual call to commit will also reset + these triggers + """ + def __init__(self, client, group, topic, auto_commit=True, + auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, + auto_commit_every_t=AUTO_COMMIT_INTERVAL, + num_procs=1, partitions_per_proc=0): + + # Initiate the base consumer class + super(MultiProcessConsumer, self).__init__(client, group, topic, + partitions=None, + auto_commit=auto_commit, + auto_commit_every_n=auto_commit_every_n, + auto_commit_every_t=auto_commit_every_t) + + # Variables for managing and controlling the data flow from + # consumer child process to master + self.queue = Queue(1024) # Child consumers dump messages into this + self.start = Event() # Indicates the consumers to start fetch + self.exit = Event() # Requests the consumers to shutdown + self.pause = Event() # Requests the consumers to pause fetch + self.size = Value('i', 0) # Indicator of number of messages to fetch + + partitions = self.offsets.keys() + + # If unspecified, start one consumer per partition + # The logic below ensures that + # * we do not cross the num_procs limit + # * we have an even distribution of partitions among processes + if not partitions_per_proc: + partitions_per_proc = round(len(partitions) * 1.0 / num_procs) + if partitions_per_proc < num_procs * 0.5: + partitions_per_proc += 1 + + # The final set of chunks + chunker = lambda *x: [] + list(x) + chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc)) + + self.procs = [] + for chunk in chunks: + chunk = filter(lambda x: x is not None, chunk) + proc = Process(target=self._consume, args=(chunk,)) + proc.daemon = True + proc.start() + self.procs.append(proc) + + def _consume(self, partitions): + """ + A child process worker which consumes messages based on the + notifications given by the controller process + """ + + # Make the child processes open separate socket connections + self.client.reinit() + + # We will start consumers without auto-commit. Auto-commit will be + # done by the master controller process. + consumer = SimpleConsumer(self.client, self.group, self.topic, + partitions=partitions, + auto_commit=False, + auto_commit_every_n=None, + auto_commit_every_t=None) + + # Ensure that the consumer provides the partition information + consumer.provide_partition_info() + + while True: + # Wait till the controller indicates us to start consumption + self.start.wait() + + # If we are asked to quit, do so + if self.exit.is_set(): + break + + # Consume messages and add them to the queue. If the controller + # indicates a specific number of messages, follow that advice + count = 0 + + for partition, message in consumer: + self.queue.put((partition, message)) + count += 1 + + # We have reached the required size. The controller might have + # more than what he needs. Wait for a while. + # Without this logic, it is possible that we run into a big + # loop consuming all available messages before the controller + # can reset the 'start' event + if count == self.size.value: + self.pause.wait() + break + + # In case we did not receive any message, give up the CPU for + # a while before we try again + if count == 0: + time.sleep(0.1) + + consumer.stop() + + def stop(self): + # Set exit and start off all waiting consumers + self.exit.set() + self.pause.set() + self.start.set() + + for proc in self.procs: + proc.join() + proc.terminate() + + super(MultiProcessConsumer, self).stop() + + def __iter__(self): + """ + Iterator to consume the messages available on this consumer + """ + # Trigger the consumer procs to start off. + # We will iterate till there are no more messages available + self.size.value = 0 + self.pause.set() + + while True: + self.start.set() + try: + # We will block for a small while so that the consumers get + # a chance to run and put some messages in the queue + # TODO: This is a hack and will make the consumer block for + # at least one second. Need to find a better way of doing this + partition, message = self.queue.get(block=True, timeout=1) + except Empty: + break + + # Count, check and commit messages if necessary + self.offsets[partition] = message.offset + self.start.clear() + yield message + + self.count_since_commit += 1 + self._auto_commit() + + self.start.clear() + + def get_messages(self, count=1, block=True, timeout=10): + """ + Fetch the specified number of messages + + count: Indicates the maximum number of messages to be fetched + block: If True, the API will block till some messages are fetched. + timeout: If None, and block=True, the API will block infinitely. + If >0, API will block for specified time (in seconds) + """ + messages = [] + + # Give a size hint to the consumers. Each consumer process will fetch + # a maximum of "count" messages. This will fetch more messages than + # necessary, but these will not be committed to kafka. Also, the extra + # messages can be provided in subsequent runs + self.size.value = count + self.pause.clear() + + while count > 0: + # Trigger consumption only if the queue is empty + # By doing this, we will ensure that consumers do not + # go into overdrive and keep consuming thousands of + # messages when the user might need only a few + if self.queue.empty(): + self.start.set() + + try: + partition, message = self.queue.get(block, timeout) + except Empty: + break + + messages.append(message) + + # Count, check and commit messages if necessary + self.offsets[partition] = message.offset + self.count_since_commit += 1 + self._auto_commit() + count -= 1 + + self.size.value = 0 + self.start.clear() + self.pause.set() + + return messages |