Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 698
1 file changed, 0 insertions, 698 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
deleted file mode 100644
index 42628e1..0000000
--- a/kafka/consumer.py
+++ /dev/null
@@ -1,698 +0,0 @@
-from __future__ import absolute_import
-
-try:
-    from itertools import zip_longest as izip_longest, repeat  # pylint: disable-msg=E0611
-except ImportError:  # python 2
-    from itertools import izip_longest as izip_longest, repeat
-import logging
-import time
-import numbers
-from threading import Lock
-from multiprocessing import Process, Queue as MPQueue, Event, Value
-
-import six
-
-try:
-    from Queue import Empty, Queue
-except ImportError:  # python 3
-    from queue import Empty, Queue
-
-import kafka.common
-from kafka.common import (
-    FetchRequest, OffsetRequest,
-    OffsetCommitRequest, OffsetFetchRequest,
-    ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
-    UnknownTopicOrPartitionError
-)
-
-from kafka.util import ReentrantTimer
-
-log = logging.getLogger("kafka")
-
-AUTO_COMMIT_MSG_COUNT = 100
-AUTO_COMMIT_INTERVAL = 5000
-
-FETCH_DEFAULT_BLOCK_TIMEOUT = 1
-FETCH_MAX_WAIT_TIME = 100
-FETCH_MIN_BYTES = 4096
-FETCH_BUFFER_SIZE_BYTES = 4096
-MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
-
-ITER_TIMEOUT_SECONDS = 60
-NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
-
-
-class FetchContext(object):
-    """
-    Class for managing the state of a consumer during fetch
-    """
-    def __init__(self, consumer, block, timeout):
-        self.consumer = consumer
-        self.block = block
-
-        if block:
-            if not timeout:
-                timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
-            self.timeout = timeout * 1000
-
-    def __enter__(self):
-        """Set fetch values based on blocking status"""
-        self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time
-        self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes
-        if self.block:
-            self.consumer.fetch_max_wait_time = self.timeout
-            self.consumer.fetch_min_bytes = 1
-        else:
-            self.consumer.fetch_min_bytes = 0
-
-    def __exit__(self, type, value, traceback):
-        """Reset values"""
-        self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time
-        self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes
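FetchContext is a save-and-restore context manager: it tightens fetch_max_wait_time and fetch_min_bytes for one blocking fetch and puts the original values back on exit. Below is a minimal standalone sketch of the same pattern; the DummyConsumer and BlockingFetch names are illustrative, not part of this file:

    # Minimal sketch of the save-and-restore pattern FetchContext uses.
    # Only the two attribute names mirror the code above.
    class DummyConsumer(object):
        fetch_max_wait_time = 100
        fetch_min_bytes = 4096

    class BlockingFetch(object):
        def __init__(self, consumer, timeout_ms):
            self.consumer = consumer
            self.timeout_ms = timeout_ms

        def __enter__(self):
            # Save current values, then force a blocking fetch
            self.saved = (self.consumer.fetch_max_wait_time,
                          self.consumer.fetch_min_bytes)
            self.consumer.fetch_max_wait_time = self.timeout_ms
            self.consumer.fetch_min_bytes = 1

        def __exit__(self, exc_type, exc_value, traceback):
            # Restore whatever was configured before, even on error
            (self.consumer.fetch_max_wait_time,
             self.consumer.fetch_min_bytes) = self.saved

    consumer = DummyConsumer()
    with BlockingFetch(consumer, timeout_ms=1000):
        assert consumer.fetch_min_bytes == 1    # blocking settings active
    assert consumer.fetch_min_bytes == 4096    # restored on exit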
-
-
-class Consumer(object):
-    """
-    Base class to be used by other consumers. Not to be used directly
-
-    This base class provides logic for
-
-    * initialization and fetching metadata of partitions
-    * Auto-commit logic
-    * APIs for fetching pending message count
-    """
-    def __init__(self, client, group, topic, partitions=None, auto_commit=True,
-                 auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
-                 auto_commit_every_t=AUTO_COMMIT_INTERVAL):
-
-        self.client = client
-        self.topic = topic
-        self.group = group
-        self.client.load_metadata_for_topics(topic)
-        self.offsets = {}
-
-        if not partitions:
-            partitions = self.client.get_partition_ids_for_topic(topic)
-        else:
-            assert all(isinstance(x, numbers.Integral) for x in partitions)
-
-        # Variables for handling offset commits
-        self.commit_lock = Lock()
-        self.commit_timer = None
-        self.count_since_commit = 0
-        self.auto_commit = auto_commit
-        self.auto_commit_every_n = auto_commit_every_n
-        self.auto_commit_every_t = auto_commit_every_t
-
-        # Set up the auto-commit timer
-        if auto_commit is True and auto_commit_every_t is not None:
-            self.commit_timer = ReentrantTimer(auto_commit_every_t,
-                                               self.commit)
-            self.commit_timer.start()
-
-        if auto_commit:
-            self.fetch_last_known_offsets(partitions)
-        else:
-            for partition in partitions:
-                self.offsets[partition] = 0
-
-    def fetch_last_known_offsets(self, partitions=None):
-        if not partitions:
-            partitions = self.client.get_partition_ids_for_topic(self.topic)
-
-        def get_or_init_offset(resp):
-            try:
-                kafka.common.check_error(resp)
-                return resp.offset
-            except UnknownTopicOrPartitionError:
-                return 0
-
-        for partition in partitions:
-            req = OffsetFetchRequest(self.topic, partition)
-            (resp,) = self.client.send_offset_fetch_request(self.group, [req],
-                                                            fail_on_error=False)
-            self.offsets[partition] = get_or_init_offset(resp)
-        self.fetch_offsets = self.offsets.copy()
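The base class commits on two independent triggers: a message-count threshold and a ReentrantTimer firing every auto_commit_every_t milliseconds, with either trigger resetting the state the other watches. A self-contained sketch of that dual-trigger pattern using a plain threading.Timer follows; all names here are hypothetical, and this is not the ReentrantTimer API:

    import threading

    class AutoCommitter(object):
        def __init__(self, commit_fn, every_n=100, every_t=5.0):
            self.commit_fn = commit_fn
            self.every_n = every_n
            self.every_t = every_t
            self.count = 0
            self.lock = threading.Lock()
            self._schedule()

        def _schedule(self):
            # One-shot timer; commit() re-arms it, so either trigger
            # firing pushes the next timer deadline back out
            self.timer = threading.Timer(self.every_t, self.commit)
            self.timer.daemon = True
            self.timer.start()

        def record(self):
            # Call once per consumed message
            self.count += 1
            if self.count >= self.every_n:
                self.commit()

        def commit(self):
            with self.lock:
                self.timer.cancel()
                if self.count:          # skip the round trip if idle
                    self.commit_fn()
                    self.count = 0
                self._schedule()        # both triggers reset the timer

    committer = AutoCommitter(lambda: print("commit"), every_n=3, every_t=60.0)
    for _ in range(7):
        committer.record()       # prints "commit" twice (after 3 and 6)
    committer.timer.cancel()     # tidy up the daemon timer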
-
-    def commit(self, partitions=None):
-        """
-        Commit offsets for this consumer
-
-        partitions: list of partitions to commit, default is to commit
-                    all of them
-        """
-
-        # Short circuit if nothing happened. This check is kept outside
-        # the lock to avoid acquiring it just to inspect the state
-        if self.count_since_commit == 0:
-            return
-
-        with self.commit_lock:
-            # Do this check again, just in case the state changed
-            # while we were waiting for the lock
-            if self.count_since_commit == 0:
-                return
-
-            reqs = []
-            if not partitions:  # commit all partitions
-                partitions = self.offsets.keys()
-
-            for partition in partitions:
-                offset = self.offsets[partition]
-                log.debug("Commit offset %d in SimpleConsumer: "
-                          "group=%s, topic=%s, partition=%s",
-                          offset, self.group, self.topic, partition)
-
-                reqs.append(OffsetCommitRequest(self.topic, partition,
-                                                offset, None))
-
-            resps = self.client.send_offset_commit_request(self.group, reqs)
-            for resp in resps:
-                kafka.common.check_error(resp)
-
-            self.count_since_commit = 0
-
-    def _auto_commit(self):
-        """
-        Check if we have to commit based on number of messages and commit
-        """
-
-        # Check if we are supposed to do an auto-commit
-        if not self.auto_commit or self.auto_commit_every_n is None:
-            return
-
-        if self.count_since_commit >= self.auto_commit_every_n:
-            self.commit()
-
-    def stop(self):
-        if self.commit_timer is not None:
-            self.commit_timer.stop()
-            self.commit()
-
-    def pending(self, partitions=None):
-        """
-        Gets the pending message count
-
-        partitions: list of partitions to check for, default is to check all
-        """
-        if not partitions:
-            partitions = self.offsets.keys()
-
-        total = 0
-        reqs = []
-
-        for partition in partitions:
-            reqs.append(OffsetRequest(self.topic, partition, -1, 1))
-
-        resps = self.client.send_offset_request(reqs)
-        for resp in resps:
-            partition = resp.partition
-            pending = resp.offsets[0]
-            offset = self.offsets[partition]
-            total += pending - offset - (1 if offset > 0 else 0)
-
-        return total
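pending() relies on the Kafka offset-request convention that a time of -1 returns the latest (log-end) offset, while -2, used by seek() below, returns the earliest. A worked example of the resulting lag arithmetic in simplified form; the extra "- 1" for nonzero offsets in the code above is preserved behavior and is not reproduced here:

    # Kafka offset API convention (time field of an OffsetRequest):
    #   -1 -> latest offset (log end, where the next message will land)
    #   -2 -> earliest available offset (log head)
    LATEST, EARLIEST = -1, -2

    def lag(log_end_offset, next_offset):
        # Simplified pending-message count for one partition
        return max(0, log_end_offset - next_offset)

    assert lag(42, 40) == 2   # offsets 40 and 41 are still unread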
-
-
-class SimpleConsumer(Consumer):
-    """
-    A simple consumer implementation that consumes all/specified partitions
-    for a topic
-
-    client: a connected KafkaClient
-    group: a name for this consumer, used for offset storage and must be unique
-    topic: the topic to consume
-    partitions: An optional list of partitions to consume the data from
-
-    auto_commit: default True. Whether or not to auto commit the offsets
-    auto_commit_every_n: default 100. How many messages to consume
-                         before a commit
-    auto_commit_every_t: default 5000. How much time (in milliseconds) to
-                         wait before commit
-    fetch_size_bytes: number of bytes to request in a FetchRequest
-    buffer_size: default 4K. Initial number of bytes to tell kafka we
-                 have available. This will double as needed.
-    max_buffer_size: default 32K. Max number of bytes to tell kafka we have
-                     available. None means no limit.
-    iter_timeout: default None. How much time (in seconds) to wait for a
-                  message in the iterator before exiting. None means no
-                  timeout, so it will wait forever.
-
-    Auto commit details:
-    If both auto_commit_every_n and auto_commit_every_t are set, they will
-    reset one another when one is triggered. These triggers simply call the
-    commit method on this class. A manual call to commit will also reset
-    these triggers
-    """
-    def __init__(self, client, group, topic, auto_commit=True, partitions=None,
-                 auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
-                 auto_commit_every_t=AUTO_COMMIT_INTERVAL,
-                 fetch_size_bytes=FETCH_MIN_BYTES,
-                 buffer_size=FETCH_BUFFER_SIZE_BYTES,
-                 max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
-                 iter_timeout=None):
-        super(SimpleConsumer, self).__init__(
-            client, group, topic,
-            partitions=partitions,
-            auto_commit=auto_commit,
-            auto_commit_every_n=auto_commit_every_n,
-            auto_commit_every_t=auto_commit_every_t)
-
-        if max_buffer_size is not None and buffer_size > max_buffer_size:
-            raise ValueError("buffer_size (%d) is greater than "
-                             "max_buffer_size (%d)" %
-                             (buffer_size, max_buffer_size))
-        self.buffer_size = buffer_size
-        self.max_buffer_size = max_buffer_size
-        self.partition_info = False     # Do not return partition info in msgs
-        self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
-        self.fetch_min_bytes = fetch_size_bytes
-        self.fetch_offsets = self.offsets.copy()
-        self.iter_timeout = iter_timeout
-        self.queue = Queue()
-
-    def __repr__(self):
-        return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
-            (self.group, self.topic, str(self.offsets.keys()))
-
-    def provide_partition_info(self):
-        """
-        Indicates that partition info must be returned by the consumer
-        """
-        self.partition_info = True
-
-    def seek(self, offset, whence):
-        """
-        Alter the current offset in the consumer, similar to fseek
-
-        offset: how much to modify the offset
-        whence: where to modify it from
-
-                0 is relative to the earliest available offset (head)
-                1 is relative to the current offset
-                2 is relative to the latest known offset (tail)
-        """
-
-        if whence == 1:  # relative to current position
-            for partition, _offset in self.offsets.items():
-                self.offsets[partition] = _offset + offset
-        elif whence in (0, 2):  # relative to beginning or end
-            # divide the request offset by number of partitions,
-            # distribute the remainder evenly
-            (delta, rem) = divmod(offset, len(self.offsets))
-            deltas = {}
-            for partition, r in izip_longest(self.offsets.keys(),
-                                             repeat(1, rem), fillvalue=0):
-                deltas[partition] = delta + r
-
-            reqs = []
-            for partition in self.offsets.keys():
-                if whence == 0:
-                    reqs.append(OffsetRequest(self.topic, partition, -2, 1))
-                elif whence == 2:
-                    reqs.append(OffsetRequest(self.topic, partition, -1, 1))
-                else:
-                    pass
-
-            resps = self.client.send_offset_request(reqs)
-            for resp in resps:
-                self.offsets[resp.partition] = \
-                    resp.offsets[0] + deltas[resp.partition]
-        else:
-            raise ValueError("Unexpected value for `whence`, %d" % whence)
-
-        # Reset queue and fetch offsets since they are invalid
-        self.fetch_offsets = self.offsets.copy()
-        if self.auto_commit:
-            self.count_since_commit += 1
-            self.commit()
-
-        self.queue = Queue()
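For whence 0 and 2, seek() splits the requested offset across partitions with divmod and hands the remainder out one partition at a time. Here is the distribution step in isolation, runnable as-is; the partition numbers are made up:

    # Standalone illustration of how seek() spreads an offset delta over
    # partitions: divmod the total, then give the remainder out one by one.
    from itertools import zip_longest, repeat   # izip_longest on Python 2

    offsets = {0: 0, 1: 0, 2: 0}   # three partitions
    offset = -10                   # e.g. seek(-10, 2): back up 10 from the tail

    delta, rem = divmod(offset, len(offsets))
    deltas = {}
    for partition, r in zip_longest(offsets, repeat(1, rem), fillvalue=0):
        deltas[partition] = delta + r

    print(deltas)   # {0: -3, 1: -3, 2: -4} -- the deltas sum to -10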
- """ - messages = [] - if timeout is not None: - max_time = time.time() + timeout - - new_offsets = {} - while count > 0 and (timeout is None or timeout > 0): - result = self._get_message(block, timeout, get_partition_info=True, - update_offset=False) - if result: - partition, message = result - if self.partition_info: - messages.append(result) - else: - messages.append(message) - new_offsets[partition] = message.offset + 1 - count -= 1 - else: - # Ran out of messages for the last request. - if not block: - # If we're not blocking, break. - break - if timeout is not None: - # If we're blocking and have a timeout, reduce it to the - # appropriate value - timeout = max_time - time.time() - - # Update and commit offsets if necessary - self.offsets.update(new_offsets) - self.count_since_commit += len(messages) - self._auto_commit() - return messages - - def get_message(self, block=True, timeout=0.1, get_partition_info=None): - return self._get_message(block, timeout, get_partition_info) - - def _get_message(self, block=True, timeout=0.1, get_partition_info=None, - update_offset=True): - """ - If no messages can be fetched, returns None. - If get_partition_info is None, it defaults to self.partition_info - If get_partition_info is True, returns (partition, message) - If get_partition_info is False, returns message - """ - if self.queue.empty(): - # We're out of messages, go grab some more. - with FetchContext(self, block, timeout): - self._fetch() - try: - partition, message = self.queue.get_nowait() - - if update_offset: - # Update partition offset - self.offsets[partition] = message.offset + 1 - - # Count, check and commit messages if necessary - self.count_since_commit += 1 - self._auto_commit() - - if get_partition_info is None: - get_partition_info = self.partition_info - if get_partition_info: - return partition, message - else: - return message - except Empty: - return None - - def __iter__(self): - if self.iter_timeout is None: - timeout = ITER_TIMEOUT_SECONDS - else: - timeout = self.iter_timeout - - while True: - message = self.get_message(True, timeout) - if message: - yield message - elif self.iter_timeout is None: - # We did not receive any message yet but we don't have a - # timeout, so give up the CPU for a while before trying again - time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) - else: - # Timed out waiting for a message - break - - def _fetch(self): - # Create fetch request payloads for all the partitions - partitions = dict((p, self.buffer_size) - for p in self.fetch_offsets.keys()) - while partitions: - requests = [] - for partition, buffer_size in six.iteritems(partitions): - requests.append(FetchRequest(self.topic, partition, - self.fetch_offsets[partition], - buffer_size)) - # Send request - responses = self.client.send_fetch_request( - requests, - max_wait_time=int(self.fetch_max_wait_time), - min_bytes=self.fetch_min_bytes) - - retry_partitions = {} - for resp in responses: - partition = resp.partition - buffer_size = partitions[partition] - try: - for message in resp.messages: - # Put the message in our queue - self.queue.put((partition, message)) - self.fetch_offsets[partition] = message.offset + 1 - except ConsumerFetchSizeTooSmall: - if (self.max_buffer_size is not None and - buffer_size == self.max_buffer_size): - log.error("Max fetch size %d too small", - self.max_buffer_size) - raise - if self.max_buffer_size is None: - buffer_size *= 2 - else: - buffer_size = max(buffer_size * 2, - self.max_buffer_size) - log.warn("Fetch size too small, increase to %d (2x) " - "and 
retry", buffer_size) - retry_partitions[partition] = buffer_size - except ConsumerNoMoreData as e: - log.debug("Iteration was ended by %r", e) - except StopIteration: - # Stop iterating through this partition - log.debug("Done iterating over partition %s" % partition) - partitions = retry_partitions - -def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): - """ - A child process worker which consumes messages based on the - notifications given by the controller process - - NOTE: Ideally, this should have been a method inside the Consumer - class. However, multiprocessing module has issues in windows. The - functionality breaks unless this function is kept outside of a class - """ - - # Make the child processes open separate socket connections - client.reinit() - - # We will start consumers without auto-commit. Auto-commit will be - # done by the master controller process. - consumer = SimpleConsumer(client, group, topic, - partitions=chunk, - auto_commit=False, - auto_commit_every_n=None, - auto_commit_every_t=None) - - # Ensure that the consumer provides the partition information - consumer.provide_partition_info() - - while True: - # Wait till the controller indicates us to start consumption - start.wait() - - # If we are asked to quit, do so - if exit.is_set(): - break - - # Consume messages and add them to the queue. If the controller - # indicates a specific number of messages, follow that advice - count = 0 - - message = consumer.get_message() - if message: - queue.put(message) - count += 1 - - # We have reached the required size. The controller might have - # more than what he needs. Wait for a while. - # Without this logic, it is possible that we run into a big - # loop consuming all available messages before the controller - # can reset the 'start' event - if count == size.value: - pause.wait() - - else: - # In case we did not receive any message, give up the CPU for - # a while before we try again - time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) - - consumer.stop() - - -class MultiProcessConsumer(Consumer): - """ - A consumer implementation that consumes partitions for a topic in - parallel using multiple processes - - client: a connected KafkaClient - group: a name for this consumer, used for offset storage and must be unique - topic: the topic to consume - - auto_commit: default True. Whether or not to auto commit the offsets - auto_commit_every_n: default 100. How many messages to consume - before a commit - auto_commit_every_t: default 5000. How much time (in milliseconds) to - wait before commit - num_procs: Number of processes to start for consuming messages. - The available partitions will be divided among these processes - partitions_per_proc: Number of partitions to be allocated per process - (overrides num_procs) - - Auto commit details: - If both auto_commit_every_n and auto_commit_every_t are set, they will - reset one another when one is triggered. These triggers simply call the - commit method on this class. 
-
-
-class MultiProcessConsumer(Consumer):
-    """
-    A consumer implementation that consumes partitions for a topic in
-    parallel using multiple processes
-
-    client: a connected KafkaClient
-    group: a name for this consumer, used for offset storage and must be unique
-    topic: the topic to consume
-
-    auto_commit: default True. Whether or not to auto commit the offsets
-    auto_commit_every_n: default 100. How many messages to consume
-                         before a commit
-    auto_commit_every_t: default 5000. How much time (in milliseconds) to
-                         wait before commit
-    num_procs: Number of processes to start for consuming messages.
-               The available partitions will be divided among these processes
-    partitions_per_proc: Number of partitions to be allocated per process
-                         (overrides num_procs)
-
-    Auto commit details:
-    If both auto_commit_every_n and auto_commit_every_t are set, they will
-    reset one another when one is triggered. These triggers simply call the
-    commit method on this class. A manual call to commit will also reset
-    these triggers
-    """
-    def __init__(self, client, group, topic, auto_commit=True,
-                 auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
-                 auto_commit_every_t=AUTO_COMMIT_INTERVAL,
-                 num_procs=1, partitions_per_proc=0):
-
-        # Initiate the base consumer class
-        super(MultiProcessConsumer, self).__init__(
-            client, group, topic,
-            partitions=None,
-            auto_commit=auto_commit,
-            auto_commit_every_n=auto_commit_every_n,
-            auto_commit_every_t=auto_commit_every_t)
-
-        # Variables for managing and controlling the data flow from
-        # consumer child process to master
-        self.queue = MPQueue(1024)  # Child consumers dump messages into this
-        self.start = Event()        # Signals the consumers to start fetching
-        self.exit = Event()         # Requests the consumers to shut down
-        self.pause = Event()        # Requests the consumers to pause fetching
-        self.size = Value('i', 0)   # Number of messages to fetch (size hint)
-
-        partitions = self.offsets.keys()
-
-        # If unspecified, start one consumer per partition
-        # The logic below ensures that
-        # * we do not cross the num_procs limit
-        # * we have an even distribution of partitions among processes
-        if not partitions_per_proc:
-            partitions_per_proc = round(len(partitions) * 1.0 / num_procs)
-            if partitions_per_proc < num_procs * 0.5:
-                partitions_per_proc += 1
-
-        # The final set of chunks
-        chunker = lambda *x: [] + list(x)
-        chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc))
-
-        self.procs = []
-        for chunk in chunks:
-            chunk = filter(lambda x: x is not None, chunk)
-            args = (client.copy(),
-                    group, topic, list(chunk),
-                    self.queue, self.start, self.exit,
-                    self.pause, self.size)
-
-            proc = Process(target=_mp_consume, args=args)
-            proc.daemon = True
-            proc.start()
-            self.procs.append(proc)
-
-    def __repr__(self):
-        return '<MultiProcessConsumer group=%s, topic=%s, consumers=%d>' % \
-            (self.group, self.topic, len(self.procs))
-
-    def stop(self):
-        # Set exit and start off all waiting consumers
-        self.exit.set()
-        self.pause.set()
-        self.start.set()
-
-        for proc in self.procs:
-            proc.join()
-            proc.terminate()
-
-        super(MultiProcessConsumer, self).stop()
-
-    def __iter__(self):
-        """
-        Iterator to consume the messages available on this consumer
-        """
-        # Trigger the consumer procs to start off.
-        # We will iterate till there are no more messages available
-        self.size.value = 0
-        self.pause.set()
-
-        while True:
-            self.start.set()
-            try:
-                # We will block for a small while so that the consumers get
-                # a chance to run and put some messages in the queue
-                # TODO: This is a hack and will make the consumer block for
-                # at least one second. Need to find a better way of doing this
-                partition, message = self.queue.get(block=True, timeout=1)
-            except Empty:
-                break
-
-            # Count, check and commit messages if necessary
-            self.offsets[partition] = message.offset + 1
-            self.start.clear()
-            self.count_since_commit += 1
-            self._auto_commit()
-            yield message
-
-        self.start.clear()
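The chunking above leans on Python 2's map(), which pads with None when given multiple iterables of unequal length, i.e. it behaves like zip_longest; passing the same iterator N times groups consecutive partitions into one chunk. The idiom in isolation:

    # Standalone illustration of the chunking idiom used above
    from itertools import zip_longest   # izip_longest on Python 2

    partitions = [0, 1, 2, 3, 4, 5, 6]
    partitions_per_proc = 3

    chunks = [
        [p for p in chunk if p is not None]   # drop the fill values
        for chunk in zip_longest(*[iter(partitions)] * partitions_per_proc)
    ]
    print(chunks)   # [[0, 1, 2], [3, 4, 5], [6]]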
-
-    def get_messages(self, count=1, block=True, timeout=10):
-        """
-        Fetch the specified number of messages
-
-        count: Indicates the maximum number of messages to be fetched
-        block: If True, the API will block till some messages are fetched.
-        timeout: If block is True, the function will block for the specified
-                 time (in seconds) until count messages are fetched. If None,
-                 it will block forever.
-        """
-        messages = []
-
-        # Give a size hint to the consumers. Each consumer process will fetch
-        # a maximum of "count" messages. This will fetch more messages than
-        # necessary, but these will not be committed to kafka. Also, the extra
-        # messages can be provided in subsequent runs
-        self.size.value = count
-        self.pause.clear()
-
-        if timeout is not None:
-            max_time = time.time() + timeout
-
-        new_offsets = {}
-        while count > 0 and (timeout is None or timeout > 0):
-            # Trigger consumption only if the queue is empty
-            # By doing this, we will ensure that consumers do not
-            # go into overdrive and keep consuming thousands of
-            # messages when the user might need only a few
-            if self.queue.empty():
-                self.start.set()
-
-            try:
-                partition, message = self.queue.get(block, timeout)
-            except Empty:
-                break
-
-            messages.append(message)
-            new_offsets[partition] = message.offset + 1
-            count -= 1
-            if timeout is not None:
-                timeout = max_time - time.time()
-
-        self.size.value = 0
-        self.start.clear()
-        self.pause.set()
-
-        # Update and commit offsets if necessary
-        self.offsets.update(new_offsets)
-        self.count_since_commit += len(messages)
-        self._auto_commit()
-
-        return messages
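Putting it together, a usage sketch for the multiprocess consumer; the import path matches kafka-python of this era, while the broker address and topic are placeholders and a running broker is required:

    from kafka import KafkaClient, MultiProcessConsumer

    client = KafkaClient("localhost:9092")

    # Two worker processes share the topic's partitions between them
    consumer = MultiProcessConsumer(client, "my-group", "my-topic",
                                    num_procs=2)

    for msg in consumer.get_messages(count=10, block=True, timeout=5):
        print(msg.offset, msg.message.value)

    consumer.stop()   # signals exit, joins workers, commits offsets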