-rw-r--r--  kafka/consumer/group.py | 1277
1 file changed, 480 insertions(+), 797 deletions(-)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index dba5f60..abd9473 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -1,883 +1,566 @@
-#pylint: skip-file
 from __future__ import absolute_import

-from collections import namedtuple
-from copy import deepcopy
 import logging
-import random
-import sys
 import time

-import six
-
-from kafka.cluster import Cluster
-from kafka.common import (
-    OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest,
-    check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError,
-    OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
-    FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError
-)
-
-logger = logging.getLogger(__name__)
-
-OffsetsStruct = namedtuple("OffsetsStruct", ["fetch", "highwater", "commit", "task_done"])
-
-NEW_CONSUMER_CONFIGS = {
-    'bootstrap_servers': None,
-    'client_id': None,
-    'group_id': None,
-    'key_deserializer': None,
-    'value_deserializer': None,
-    'auto_commit_interval_ms': 5000,
-    'auto_offset_reset': 'latest',
-    'check_crcs': True,  # "Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance."
-    'connections_max_idle_ms': 9 * 60 * 1000,
-    'enable_auto_commit': True,
-    'fetch_max_wait_ms': 500,
-    'fetch_min_bytes': 1024,
-    'heartbeat_interval_ms': 3000,
-    'max_partition_fetch_bytes': 1 * 1024 * 1024,
-    'metadata_max_age_ms': 5 * 60 * 1000,  # >0
-    'metric_reporters': None,
-    'metrics_num_samples': 2,
-    'metrics_sample_window_ms': 30000,
-    'partition_assignment_strategy': None,  # This should default to something like 'roundrobin' or 'range'
-    'reconnect_backoff_ms': 50,
-    'request_timeout_ms': 40 * 1000,
-    'retry_backoff_ms': 100,
-    'send_buffer_bytes': 128 * 1024,
-    'receive_buffer_bytes': 32 * 1024,
-    'session_timeout_ms': 30000,  # "The timeout used to detect failures when using Kafka's group management facilities."
-}
-
-DEFAULT_CONSUMER_CONFIG = {
-    'client_id': __name__,
-    'group_id': None,
-    'bootstrap_servers': [],
-    'socket_timeout_ms': 30 * 1000,
-    'fetch_message_max_bytes': 1024 * 1024,
-    'auto_offset_reset': 'largest',
-    'fetch_min_bytes': 1,
-    'fetch_wait_max_ms': 100,
-    'refresh_leader_backoff_ms': 200,
-    'deserializer_class': lambda msg: msg,
-    'auto_commit_enable': False,
-    'auto_commit_interval_ms': 60 * 1000,
-    'auto_commit_interval_messages': None,
-    'consumer_timeout_ms': -1,
-
-    # Currently unused
-    'socket_receive_buffer_bytes': 64 * 1024,
-    'num_consumer_fetchers': 1,
-    'default_fetcher_backoff_ms': 1000,
-    'queued_max_message_chunks': 10,
-    'rebalance_max_retries': 4,
-    'rebalance_backoff_ms': 2000,
-}
-
-DEPRECATED_CONFIG_KEYS = {
-    'metadata_broker_list': 'bootstrap_servers',
-}
+import kafka.common as Errors

-class KafkaConsumer(object):
-    """A simpler kafka consumer"""
-
-    def __init__(self, *topics, **configs):
-        self._config = deepcopy(DEFAULT_CONSUMER_CONFIG)
-        self._topics = topics
-        self._partitions = []
-        self._offsets = OffsetsStruct(fetch=dict(), commit=dict(), highwater=dict(), task_done=dict())
-        self._consumer_timeout = False
-        self._uncommitted_message_count = 0
-        self._next_commit_time = None
-        self._msg_iter = None
-
-        self._configure(**configs)
-        self._cluster = Cluster(**self._config)
-
-    def assign(self, topic_partitions):
-        pass
+from kafka.client_async import KafkaClient
+from kafka.consumer.fetcher import Fetcher
+from kafka.consumer.subscription_state import SubscriptionState
+from kafka.coordinator.consumer import ConsumerCoordinator
+from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.protocol.offset import OffsetResetStrategy
+from kafka.version import __version__

-    def assignment(self):
-        """Get the set of partitions currently assigned to this consumer."""
-        pass
+log = logging.getLogger(__name__)

-    def close(self):
-        """Close the consumer, waiting indefinitely for any needed cleanup."""
-        pass
-
-    def commitAsync(self, topic_partition_offsets_and_metadata=None, callback=None):
-        """
-        Commit the specified offsets, or those returned on the last poll(),
-        for all the subscribed list of topics and partitions. Asynchronous.
-        """
-        pass

+class KafkaConsumer(object):
+    """Consumer for Kafka 0.9"""
+    _bootstrap_servers = 'localhost'
+    _client_id = 'kafka-python-' + __version__
+    _group_id = 'kafka-python-default-group'
+    _key_deserializer = None
+    _value_deserializer = None
+    _fetch_max_wait_ms = 500
+    _fetch_min_bytes = 1024
+    _max_partition_fetch_bytes = 1 * 1024 * 1024
+    _request_timeout_ms = 40 * 1000
+    _retry_backoff_ms = 100
+    _reconnect_backoff_ms = 50
+    _auto_offset_reset = 'latest'
+    _enable_auto_commit = True
+    _auto_commit_interval_ms = 5000
+    _check_crcs = True
+    _metadata_max_age_ms = 5 * 60 * 1000
+    _partition_assignment_strategy = (RoundRobinPartitionAssignor,)
+    _heartbeat_interval_ms = 3000
+    _session_timeout_ms = 30000
+    _send_buffer_bytes = 128 * 1024
+    _receive_buffer_bytes = 32 * 1024
+    _connections_max_idle_ms = 9 * 60 * 1000  # not implemented yet
+    #_metric_reporters = None
+    #_metrics_num_samples = 2
+    #_metrics_sample_window_ms = 30000
+
+    def __init__(self, *topics, **kwargs):
+        """A Kafka client that consumes records from a Kafka cluster.
+
+        The consumer will transparently handle the failure of servers in the
+        Kafka cluster, and transparently adapt as partitions of data it fetches
+        migrate within the cluster. This client also interacts with the server
+        to allow groups of consumers to load balance consumption using consumer
+        groups.
+
+        Requires Kafka Server >= 0.9.0.0
+
+        Configuration settings can be passed to the constructor as kwargs,
+        otherwise defaults will be used:

-    def commitSync(self, topic_partition_offsets_and_metadata=None):
-        """
-        Commit the specified offsets, or those returned on the last poll(),
-        for all the subscribed list of topics and partitions. Synchronous.
-        Blocks until either the commit succeeds or an unrecoverable error is
-        encountered (in which case it is thrown to the caller).
-        """
-        pass

+        Keyword Arguments:
+            bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
+                strings) that the consumer should contact to bootstrap initial
+                cluster metadata. This does not have to be the full node list.
+                It just needs to have at least one broker that will respond to a
+                Metadata API Request. Default port is 9092. If no servers are
+                specified, will default to localhost:9092.
+            client_id (str): a name for this client. This string is passed in
+                each request to servers and can be used to identify specific
+                server-side log entries that correspond to this client. Also
+                submitted to GroupCoordinator for logging with respect to
+                consumer group administration. Default: 'kafka-python-{version}'
+            group_id (str): name of the consumer group to join for dynamic
+                partition assignment (if enabled), and to use for fetching and
+                committing offsets.
+                Default: 'kafka-python-default-group'
+            key_deserializer (callable): Any callable that takes a
+                raw message key and returns a deserialized key.
+            value_deserializer (callable, optional): Any callable that takes a
+                raw message value and returns a deserialized value.
+            fetch_min_bytes (int): Minimum amount of data the server should
+                return for a fetch request, otherwise wait up to
+                fetch_max_wait_ms for more data to accumulate. Default: 1024.
+            fetch_max_wait_ms (int): The maximum amount of time in milliseconds
+                the server will block before answering the fetch request if
+                there isn't sufficient data to immediately satisfy the
+                requirement given by fetch_min_bytes. Default: 500.
+            max_partition_fetch_bytes (int): The maximum amount of data
+                per-partition the server will return. The maximum total memory
+                used for a request = #partitions * max_partition_fetch_bytes.
+                This size must be at least as large as the maximum message size
+                the server allows or else it is possible for the producer to
+                send messages larger than the consumer can fetch. If that
+                happens, the consumer can get stuck trying to fetch a large
+                message on a certain partition. Default: 1048576.
+            request_timeout_ms (int): Client request timeout in milliseconds.
+                Default: 40000.
+            retry_backoff_ms (int): Milliseconds to backoff when retrying on
+                errors. Default: 100.
+            reconnect_backoff_ms (int): The amount of time in milliseconds to
+                wait before attempting to reconnect to a given host. Defaults
+                to 50.
+            auto_offset_reset (str): A policy for resetting offsets on
+                OffsetOutOfRange errors: 'earliest' will move to the oldest
+                available message, 'latest' will move to the most recent. Any
+                other value will raise an exception. Default: 'latest'.
+            enable_auto_commit (bool): If true the consumer's offset will be
+                periodically committed in the background. Default: True.
+            auto_commit_interval_ms (int): milliseconds between automatic
+                offset commits, if enable_auto_commit is True. Default: 5000.
+            check_crcs (bool): Automatically check the CRC32 of the records
+                consumed. This ensures no on-the-wire or on-disk corruption to
+                the messages occurred. This check adds some overhead, so it may
+                be disabled in cases seeking extreme performance. Default: True
+            metadata_max_age_ms (int): The period of time in milliseconds after
+                which we force a refresh of metadata even if we haven't seen any
+                partition leadership changes to proactively discover any new
+                brokers or partitions. Default: 300000
+            partition_assignment_strategy (list): List of objects to use to
+                distribute partition ownership amongst consumer instances when
+                group management is used. Default: [RoundRobinPartitionAssignor]
+            heartbeat_interval_ms (int): The expected time in milliseconds
+                between heartbeats to the consumer coordinator when using
+                Kafka's group management feature. Heartbeats are used to ensure
+                that the consumer's session stays active and to facilitate
+                rebalancing when new consumers join or leave the group. The
+                value must be set lower than session_timeout_ms, but typically
+                should be set no higher than 1/3 of that value. It can be
+                adjusted even lower to control the expected time for normal
+                rebalances. Default: 3000
+            session_timeout_ms (int): The timeout used to detect failures when
+                using Kafka's group management facilities. Default: 30000
+            send_buffer_bytes (int): The size of the TCP send buffer
+                (SO_SNDBUF) to use when sending data.
+                Default: 131072
+            receive_buffer_bytes (int): The size of the TCP receive buffer
+                (SO_RCVBUF) to use when reading data. Default: 32768

-    def committed(self, topic_partition):
-        """
-        Get the last committed offset for the given partition (whether the
-        commit happened by this process or another).
-        Returns: offset_and_metadata
+        Configuration parameters are described in more detail at
+        https://kafka.apache.org/090/configuration.html#newconsumerconfigs
         """
-        pass
+        for config in ('bootstrap_servers', 'client_id', 'group_id',
+                       'key_deserializer', 'value_deserializer',
+                       'fetch_max_wait_ms', 'fetch_min_bytes',
+                       'max_partition_fetch_bytes', 'request_timeout_ms',
+                       'retry_backoff_ms', 'reconnect_backoff_ms',
+                       'auto_offset_reset', 'enable_auto_commit',
+                       'auto_commit_interval_ms', 'check_crcs',
+                       'metadata_max_age_ms', 'partition_assignment_strategy',
+                       'heartbeat_interval_ms', 'session_timeout_ms',
+                       'send_buffer_bytes', 'receive_buffer_bytes'):
+            if config in kwargs:
+                setattr(self, '_' + config, kwargs[config])
+
+        self._client = KafkaClient(**kwargs)
+        self._subscription = SubscriptionState(self._auto_offset_reset)
+        self._fetcher = Fetcher(
+            self._client, self._subscription, **kwargs)
+        self._coordinator = ConsumerCoordinator(
+            self._client, self._group_id, self._subscription,
+            assignors=self._partition_assignment_strategy,
+            **kwargs)
+        self._closed = False
+
+        #self.metrics = None
+        if topics:
+            self._subscription.subscribe(topics=topics)
+            self._client.set_topics(topics)
+
+    def assign(self, partitions):
+        """Manually assign a list of TopicPartitions to this consumer.
+
+        This interface does not allow for incremental assignment and will
+        replace the previous assignment (if there was one).
+
+        Manual topic assignment through this method does not use the consumer's
+        group management functionality. As such, there will be no rebalance
+        operation triggered when group membership or cluster and topic metadata
+        change. Note that it is not possible to use both manual partition
+        assignment with assign() and group assignment with subscribe().

-    def listTopics(self):
-        """
-        Get metadata about partitions for all topics that the user is authorized
-        to view.
-        Returns: {topic: [partition_info]}
-        """
-        pass
+        Arguments:
+            partitions (list of TopicPartition): assignment for this instance.

-    def metrics(self):
-        """
-        Get the metrics kept by the consumer.
-        Returns: {metric_name: metric}
+        Raises:
+            IllegalStateError: if consumer has already called subscribe()
         """
-        pass
+        self._subscription.assign_from_user(partitions)
+        self._client.set_topics([tp.topic for tp in partitions])
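The new constructor plus assign() replace the old set_topic_partitions() API. A minimal sketch of manual assignment (broker address, group name, and topic are hypothetical; TopicPartition is assumed to be the namedtuple defined in kafka.common):

    from kafka.common import TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                             group_id='my-group',
                             enable_auto_commit=False)
    # Explicit assignment bypasses group management entirely
    consumer.assign([TopicPartition('my-topic', 0),
                     TopicPartition('my-topic', 1)])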
-    def partitionsFor(self, topic):
-        """
-        Get metadata about the partitions for a given topic.
-        Returns: [partition_info]
-        """
-        pass
+    def assignment(self):
+        """Get the TopicPartitions currently assigned to this consumer.

-    def pause(self, *topic_partitions):
-        """Suspend fetching from the requested partitions."""
-        pass
+        If partitions were directly assigned using assign(), then this will
+        simply return the same partitions that were assigned.
+        If topics were subscribed to using subscribe(), then this will give the
+        set of topic partitions currently assigned to the consumer (which may
+        be none if the assignment hasn't happened yet, or the partitions are in
+        the process of getting reassigned).

-    def poll(self, timeout):
-        """
-        Fetch data for the topics or partitions specified using one of the
-        subscribe/assign APIs.
-        Returns: [consumer_records]
+        Returns:
+            set: {TopicPartition, ...}
         """
+        return self._subscription.assigned_partitions()

-    def position(self, topic_partition):
-        """Get the offset of the next record that will be fetched (if a record
-        with that offset exists)."""
-        pass
-
-    def resume(self, *topic_partitions):
-        """Resume specified partitions which have been paused"""
-        pass
-
-    def seek(self, topic_partition, offset):
-        """Overrides the fetch offsets that the consumer will use on the next
-        poll(timeout)."""
-        pass
-
-    def seekToBeginning(self, *topic_partitions):
-        """Seek to the first offset for each of the given partitions."""
-        pass
-
-    def seekToEnd(self, *topic_partitions):
-        """Seek to the last offset for each of the given partitions."""
-        pass
+    def close(self):
+        """Close the consumer, waiting indefinitely for any needed cleanup."""
+        if self._closed:
+            return
+        log.debug("Closing the KafkaConsumer.")
+        self._closed = True
+        self._coordinator.close()
+        #self.metrics.close()
+        self._client.close()
+        try:
+            self._key_deserializer.close()
+        except AttributeError:
+            pass
+        try:
+            self._value_deserializer.close()
+        except AttributeError:
+            pass
+        log.debug("The KafkaConsumer has closed.")
+
+    def commit_async(self, offsets=None, callback=None):
+        """Commit offsets to kafka asynchronously, optionally firing callback
+
+        This commits offsets only to Kafka. The offsets committed using this API
+        will be used on the first fetch after every rebalance and also on
+        startup. As such, if you need to store offsets in anything other than
+        Kafka, this API should not be used.
+
+        This is an asynchronous call and will not block. Any errors encountered
+        are either passed to the callback (if provided) or discarded.

-    def subscribe(self, topics, callback=None):
-        """Subscribe to the given list of topics or those matching a regex to get dynamically assigned
-        partitions."""
-        pass
+        Arguments:
+            offsets (dict, optional): {TopicPartition: OffsetAndMetadata} dict
+                to commit with the configured group_id. Defaults to current
+                consumed offsets for all subscribed partitions.
+            callback (callable, optional): called as callback(offsets, response)
+                with response as either an Exception or an OffsetCommitResponse
+                struct. This callback can be used to trigger custom actions when
+                a commit request completes.

-    def subscription(self):
-        """
-        Get the current subscription.
-        Returns: [topic]
+        Returns:
+            kafka.future.Future
         """
-        pass
-
-    def unsubscribe(self):
-        """Unsubscribe from topics currently subscribed with subscribe(List)."""
-        pass
+        if offsets is None:
+            offsets = self._subscription.all_consumed_offsets()
+        log.debug("Committing offsets: %s", offsets)
+        future = self._coordinator.commit_offsets_async(
+            offsets, callback=callback)
+        return future
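For illustration, a hedged sketch of a fire-and-forget commit with a completion callback, following the docstring above (the consumer instance is carried over from the previous example):

    def on_commit(offsets, response):
        # response is an OffsetCommitResponse struct on success,
        # or an Exception on failure
        if isinstance(response, Exception):
            log.error('Commit failed for %s: %s', offsets, response)

    # Commits the current consumed offsets; returns a kafka.future.Future
    future = consumer.commit_async(callback=on_commit)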
-    def wakeup(self):
-        """Wakeup the consumer."""
-        pass
+    def commit(self, offsets=None):
+        """Commit offsets to kafka, blocking until success or error

-    def _configure(self, **configs):
-        """Configure the consumer instance
+        This commits offsets only to Kafka. The offsets committed using this API
+        will be used on the first fetch after every rebalance and also on
+        startup. As such, if you need to store offsets in anything other than
+        Kafka, this API should not be used.

-        Configuration settings can be passed to the constructor,
-        otherwise defaults will be used:
-
-        Keyword Arguments:
-            bootstrap_servers (list): List of initial broker nodes the consumer
-                should contact to bootstrap initial cluster metadata. This does
-                not have to be the full node list. It just needs to have at
-                least one broker that will respond to a Metadata API Request.
-            client_id (str): a unique name for this client. Defaults to
-                'kafka.consumer.kafka'.
-            group_id (str): the name of the consumer group to join,
-                Offsets are fetched / committed to this group name.
-            fetch_message_max_bytes (int, optional): Maximum bytes for each
-                topic/partition fetch request. Defaults to 1024*1024.
-            fetch_min_bytes (int, optional): Minimum amount of data the server
-                should return for a fetch request, otherwise wait up to
-                fetch_wait_max_ms for more data to accumulate. Defaults to 1.
-            fetch_wait_max_ms (int, optional): Maximum time for the server to
-                block waiting for fetch_min_bytes messages to accumulate.
-                Defaults to 100.
-            refresh_leader_backoff_ms (int, optional): Milliseconds to backoff
-                when refreshing metadata on errors (subject to random jitter).
-                Defaults to 200.
-            socket_timeout_ms (int, optional): TCP socket timeout in
-                milliseconds. Defaults to 30*1000.
-            auto_offset_reset (str, optional): A policy for resetting offsets on
-                OffsetOutOfRange errors. 'smallest' will move to the oldest
-                available message, 'largest' will move to the most recent. Any
-                other value will raise an exception. Defaults to 'largest'.
-            deserializer_class (callable, optional): Any callable that takes a
-                raw message value and returns a deserialized value. Defaults to
-                lambda msg: msg.
-            auto_commit_enable (bool, optional): Enabling auto-commit will cause
-                the KafkaConsumer to periodically commit offsets without an
-                explicit call to commit(). Defaults to False.
-            auto_commit_interval_ms (int, optional): If auto_commit_enabled,
-                the milliseconds between automatic offset commits. Defaults to
-                60 * 1000.
-            auto_commit_interval_messages (int, optional): If
-                auto_commit_enabled, a number of messages consumed between
-                automatic offset commits. Defaults to None (disabled).
-            consumer_timeout_ms (int, optional): number of milliseconds to throw
-                a timeout exception to the consumer if no message is available
-                for consumption. Defaults to -1 (don't throw exception).
-
-        Configuration parameters are described in more detail at
-        http://kafka.apache.org/documentation.html#highlevelconsumerapi
-        """
-        configs = self._deprecate_configs(**configs)
-        self._config.update(configs)
+        Blocks until either the commit succeeds or an unrecoverable error is
+        encountered (in which case it is thrown to the caller).

-        if self._config['auto_commit_enable']:
-            logger.info('Configuring consumer to auto-commit offsets')
-            self._reset_auto_commit()
+        Currently only supports kafka-topic offset storage (not zookeeper)

-    def set_topic_partitions(self, *topics):
+        Arguments:
+            offsets (dict, optional): {TopicPartition: OffsetAndMetadata} dict
+                to commit with the configured group_id. Defaults to current
+                consumed offsets for all subscribed partitions.
""" - Set the topic/partitions to consume - Optionally specify offsets to start from - - Accepts types: + if offsets is None: + offsets = self._subscription.all_consumed_offsets() + self._coordinator.commit_offsets_sync(offsets) - * str (utf-8): topic name (will consume all available partitions) - * tuple: (topic, partition) - * dict: - - { topic: partition } - - { topic: [partition list] } - - { topic: (partition tuple,) } + def committed(self, partition): + """Get the last committed offset for the given partition - Optionally, offsets can be specified directly: + This offset will be used as the position for the consumer + in the event of a failure. - * tuple: (topic, partition, offset) - * dict: { (topic, partition): offset, ... } + This call may block to do a remote call if the partition in question + isn't assigned to this consumer or if the consumer hasn't yet + initialized its cache of committed offsets. - Example: - - .. code:: python - - kafka = KafkaConsumer() - - # Consume topic1-all; topic2-partition2; topic3-partition0 - kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0}) - - # Consume topic1-0 starting at offset 12, and topic2-1 at offset 45 - # using tuples -- - kafka.set_topic_partitions(("topic1", 0, 12), ("topic2", 1, 45)) - - # using dict -- - kafka.set_topic_partitions({ ("topic1", 0): 12, ("topic2", 1): 45 }) + Arguments: + partition (TopicPartition): the partition to check + Returns: + The last committed offset, or None if there was no prior commit. """ - self._cluster.refresh_metadata() - - # Handle different topic types - for arg in topics: - - # Topic name str -- all partitions - if isinstance(arg, (six.string_types, six.binary_type)): - topic = arg - for partition in self._cluster.partitions_for_topic(topic): - self._consume_topic_partition(topic, partition) - - # (topic, partition [, offset]) tuple - elif isinstance(arg, tuple): - topic = arg[0] - partition = arg[1] - self._consume_topic_partition(topic, partition) - if len(arg) == 3: - offset = arg[2] - self._offsets.fetch[(topic, partition)] = offset - - # { topic: partitions, ... } dict - elif isinstance(arg, dict): - for key, value in six.iteritems(arg): - - # key can be string (a topic) - if isinstance(key, (six.string_types, six.binary_type)): - topic = key - - # topic: partition - if isinstance(value, int): - self._consume_topic_partition(topic, value) - - # topic: [ partition1, partition2, ... 
-        * str (utf-8): topic name (will consume all available partitions)
-        * tuple: (topic, partition)
-        * dict:
-            - { topic: partition }
-            - { topic: [partition list] }
-            - { topic: (partition tuple,) }
+    def committed(self, partition):
+        """Get the last committed offset for the given partition

-        Optionally, offsets can be specified directly:
+        This offset will be used as the position for the consumer
+        in the event of a failure.

-        * tuple: (topic, partition, offset)
-        * dict: { (topic, partition): offset, ... }
+        This call may block to do a remote call if the partition in question
+        isn't assigned to this consumer or if the consumer hasn't yet
+        initialized its cache of committed offsets.

-        Example:
-
-        .. code:: python
-
-            kafka = KafkaConsumer()
-
-            # Consume topic1-all; topic2-partition2; topic3-partition0
-            kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})
-
-            # Consume topic1-0 starting at offset 12, and topic2-1 at offset 45
-            # using tuples --
-            kafka.set_topic_partitions(("topic1", 0, 12), ("topic2", 1, 45))
-
-            # using dict --
-            kafka.set_topic_partitions({ ("topic1", 0): 12, ("topic2", 1): 45 })
+        Arguments:
+            partition (TopicPartition): the partition to check

+        Returns:
+            The last committed offset, or None if there was no prior commit.
         """
-        self._cluster.refresh_metadata()
-
-        # Handle different topic types
-        for arg in topics:
-
-            # Topic name str -- all partitions
-            if isinstance(arg, (six.string_types, six.binary_type)):
-                topic = arg
-                for partition in self._cluster.partitions_for_topic(topic):
-                    self._consume_topic_partition(topic, partition)
-
-            # (topic, partition [, offset]) tuple
-            elif isinstance(arg, tuple):
-                topic = arg[0]
-                partition = arg[1]
-                self._consume_topic_partition(topic, partition)
-                if len(arg) == 3:
-                    offset = arg[2]
-                    self._offsets.fetch[(topic, partition)] = offset
-
-            # { topic: partitions, ... } dict
-            elif isinstance(arg, dict):
-                for key, value in six.iteritems(arg):
-
-                    # key can be string (a topic)
-                    if isinstance(key, (six.string_types, six.binary_type)):
-                        topic = key
-
-                        # topic: partition
-                        if isinstance(value, int):
-                            self._consume_topic_partition(topic, value)
-
-                        # topic: [ partition1, partition2, ... ]
-                        elif isinstance(value, (list, tuple)):
-                            for partition in value:
-                                self._consume_topic_partition(topic, partition)
-                        else:
-                            raise KafkaConfigurationError(
-                                'Unknown topic type '
-                                '(dict key must be int or list/tuple of ints)'
-                            )
-
-                    # (topic, partition): offset
-                    elif isinstance(key, tuple):
-                        topic = key[0]
-                        partition = key[1]
-                        self._consume_topic_partition(topic, partition)
-                        self._offsets.fetch[(topic, partition)] = value
-
+        if self._subscription.is_assigned(partition):
+            committed = self._subscription.assignment[partition].committed
+            if committed is None:
+                self._coordinator.refresh_committed_offsets_if_needed()
+                committed = self._subscription.assignment[partition].committed
+        else:
+            commit_map = self._coordinator.fetch_committed_offsets([partition])
+            if partition in commit_map:
+                committed = commit_map[partition].offset
             else:
-                raise KafkaConfigurationError('Unknown topic type (%s)' % type(arg))
-
-        # If we have a consumer group, try to fetch stored offsets
-        if self._config['group_id']:
-            self._get_commit_offsets()
-
-        # Update missing fetch/commit offsets
-        for topic_partition in self._topics:
-
-            # Commit offsets default is None
-            if topic_partition not in self._offsets.commit:
-                self._offsets.commit[topic_partition] = None
-
-            # Skip if we already have a fetch offset from user args
-            if topic_partition not in self._offsets.fetch:
-
-                # Fetch offsets default is (1) commit
-                if self._offsets.commit[topic_partition] is not None:
-                    self._offsets.fetch[topic_partition] = self._offsets.commit[topic_partition]
-
-                # or (2) auto reset
-                else:
-                    self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition)
+                committed = None
+        return committed

-        # highwater marks (received from server on fetch response)
-        # and task_done (set locally by user)
-        # should always get initialized to None
-        self._reset_highwater_offsets()
-        self._reset_task_done_offsets()
+    def _ensure_not_closed(self):
+        if self._closed:
+            raise Errors.IllegalStateError("This consumer has already been closed.")

-        # Reset message iterator in case we were in the middle of one
-        self._reset_message_iterator()
+    def topics(self):
+        """Get all topic metadata for topics the user is authorized to view.

-    def next(self):
-        """Return the next available message
-
-        Blocks indefinitely unless consumer_timeout_ms > 0
+        [Not Implemented Yet]

         Returns:
-            a single KafkaMessage from the message iterator
+            {topic: [partition_info]}
+        """
+        raise NotImplementedError('TODO')

-        Raises:
-            ConsumerTimeout after consumer_timeout_ms and no message
+    def partitions_for_topic(self, topic):
+        """Get metadata about the partitions for a given topic.

-        Note:
-            This is also the method called internally during iteration
+        Arguments:
+            topic (str): topic to check

+        Returns:
+            set: partition ids
         """
-        self._set_consumer_timeout_start()
-        while True:
-
-            try:
-                return six.next(self._get_message_iterator())
+        return self._client.cluster.partitions_for_topic(topic)
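Together, partitions_for_topic() and committed() allow a quick audit of the group's progress; a sketch (topic name hypothetical):

    for p in consumer.partitions_for_topic('my-topic'):
        tp = TopicPartition('my-topic', p)
        print(tp, consumer.committed(tp))  # None if nothing committed yet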
-            # Handle batch completion
-            except StopIteration:
-                self._reset_message_iterator()
+    def poll(self, timeout_ms=0):
+        """
+        Fetch data for the topics or partitions specified using one of the
+        subscribe/assign APIs. It is an error not to have subscribed to any
+        topics or partitions before polling for data.

-            self._check_consumer_timeout()
+        On each poll, the consumer will try to use the last consumed offset as
+        the starting offset and fetch sequentially. The last consumed offset
+        can be manually set through seek(partition, offset) or automatically
+        set as the last committed offset for the subscribed list of partitions.

-    def fetch_messages(self):
-        """Sends FetchRequests for all topic/partitions set for consumption

         Arguments:
+            timeout_ms (int, optional): milliseconds to spend waiting in poll if
+                data is not available. If 0, returns immediately with any
+                records that are available now. Must not be negative. Default: 0

         Returns:
-            Generator that yields KafkaMessage structs
-            after deserializing with the configured `deserializer_class`
-
-        Note:
-            Refreshes metadata on errors, and resets fetch offset on
-            OffsetOutOfRange, per the configured `auto_offset_reset` policy
-
-        See Also:
-            Key KafkaConsumer configuration parameters:
-            * `fetch_message_max_bytes`
-            * `fetch_max_wait_ms`
-            * `fetch_min_bytes`
-            * `deserializer_class`
-            * `auto_offset_reset`
-
+            dict: topic to deque of records since the last fetch for the
+                subscribed list of topics and partitions
         """
+        if timeout_ms < 0:
+            raise Errors.IllegalArgumentError("Timeout must not be negative")

-        max_bytes = self._config['fetch_message_max_bytes']
-        max_wait_time = self._config['fetch_wait_max_ms']
-        min_bytes = self._config['fetch_min_bytes']
-
-        if not self._topics:
-            raise KafkaConfigurationError('No topics or partitions configured')
-
-        if not self._offsets.fetch:
-            raise KafkaConfigurationError(
-                'No fetch offsets found when calling fetch_messages'
-            )
-
-        fetches = [FetchRequest(topic, partition,
-                                self._offsets.fetch[(topic, partition)],
-                                max_bytes)
-                   for (topic, partition) in self._topics]
-
-        # send_fetch_request will batch topic/partition requests by leader
-        responses = self._client.send_fetch_request(
-            fetches,
-            max_wait_time=max_wait_time,
-            min_bytes=min_bytes,
-            fail_on_error=False
-        )
-
-        for resp in responses:
-
-            if isinstance(resp, FailedPayloadsError):
-                logger.warning('FailedPayloadsError attempting to fetch data')
-                self._refresh_metadata_on_error()
-                continue
-
-            topic = resp.topic
-            partition = resp.partition
-            try:
-                check_error(resp)
-            except OffsetOutOfRangeError:
-                logger.warning('OffsetOutOfRange: topic %s, partition %d, '
-                               'offset %d (Highwatermark: %d)',
-                               topic, partition,
-                               self._offsets.fetch[(topic, partition)],
-                               resp.highwaterMark)
-                # Reset offset
-                self._offsets.fetch[(topic, partition)] = (
-                    self._reset_partition_offset((topic, partition))
-                )
-                continue
-
-            except NotLeaderForPartitionError:
-                logger.warning("NotLeaderForPartitionError for %s - %d. "
-                               "Metadata may be out of date",
-                               topic, partition)
-                self._refresh_metadata_on_error()
-                continue
-
-            except RequestTimedOutError:
-                logger.warning("RequestTimedOutError for %s - %d",
-                               topic, partition)
-                continue
-
-            # Track server highwater mark
-            self._offsets.highwater[(topic, partition)] = resp.highwaterMark
-
-            # Yield each message
-            # Kafka-python could raise an exception during iteration
-            # we are not catching -- user will need to address
-            for (offset, message) in resp.messages:
-                # deserializer_class could raise an exception here
-                val = self._config['deserializer_class'](message.value)
-                msg = KafkaMessage(topic, partition, offset, message.key, val)
-
-                # in some cases the server will return earlier messages
-                # than we requested. skip them per kafka spec
-                if offset < self._offsets.fetch[(topic, partition)]:
-                    logger.debug('message offset less than fetched offset '
-                                 'skipping: %s', msg)
-                    continue
-
-                # Only increment fetch offset
-                # if we safely got the message and deserialized
-                self._offsets.fetch[(topic, partition)] = offset + 1
-
-                # Then yield to user
-                yield msg
-
-    def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets):
-        """Request available fetch offsets for a single topic/partition
+        # poll for new data until the timeout expires
+        start = time.time()
+        remaining = timeout_ms
+        while True:
+            records = self._poll_once(remaining)
+            if records:
+                # before returning the fetched records, we can send off the
+                # next round of fetches and avoid blocking while waiting for
+                # their responses, to enable pipelining while the user is
+                # handling the fetched records.
+                self._fetcher.init_fetches()
+                return records
+
+            elapsed_ms = (time.time() - start) * 1000
+            remaining = timeout_ms - elapsed_ms
+
+            if remaining <= 0:
+                break
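A minimal consume loop built on poll(), assuming the returned dict is keyed by topic as the docstring above describes; poll() may return None or an empty dict when the timeout expires, and process() is a hypothetical handler:

    while True:
        records = consumer.poll(timeout_ms=500)
        if not records:
            continue
        for topic, messages in records.items():
            for message in messages:
                process(message)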
+    def _poll_once(self, timeout_ms):
+        """
+        Do one round of polling. In addition to checking for new data, this does
+        any needed heart-beating, auto-commits, and offset updates.

-        Keyword Arguments:
-            topic (str): topic for offset request
-            partition (int): partition for offset request
-            request_time_ms (int): Used to ask for all messages before a
-                certain time (ms). There are two special values.
-                Specify -1 to receive the latest offset (i.e. the offset of the
-                next coming message) and -2 to receive the earliest available
-                offset. Note that because offsets are pulled in descending
-                order, asking for the earliest offset will always return you a
-                single element.
-            max_num_offsets (int): Maximum offsets to include in the OffsetResponse
+        Arguments:
+            timeout_ms (int): The maximum time in milliseconds to block

         Returns:
-            a list of offsets in the OffsetResponse submitted for the provided
-            topic / partition. See:
-            https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
+            dict: map of topic to deque of records (may be empty)
         """
-        reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)]
+        # TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
+        self._coordinator.ensure_coordinator_known()

-        (resp,) = self._client.send_offset_request(reqs)
+        # ensure we have partitions assigned if we expect to
+        if self._subscription.partitions_auto_assigned():
+            self._coordinator.ensure_active_group()

-        check_error(resp)
+        # fetch positions if we have partitions we're subscribed to that we
+        # don't know the offset for
+        if not self._subscription.has_all_fetch_positions():
+            self._update_fetch_positions(self._subscription.missing_fetch_positions())

-        # Just for sanity..
-        # probably unnecessary
-        assert resp.topic == topic
-        assert resp.partition == partition
+        # init any new fetches (won't resend pending fetches)
+        records = self._fetcher.fetched_records()

-        return resp.offsets
+        # if data is available already, e.g. from a previous network client
+        # poll() call to commit, then just return it immediately
+        if records:
+            return records

-    def offsets(self, group=None):
-        """Get internal consumer offset values
+        self._fetcher.init_fetches()
+        self._client.poll(timeout_ms / 1000.0)
+        return self._fetcher.fetched_records()

-        Keyword Arguments:
-            group: Either "fetch", "commit", "task_done", or "highwater".
-                If no group specified, returns all groups.
+    def position(self, partition):
+        """Get the offset of the next record that will be fetched

-        Returns:
-            A copy of internal offsets struct
+        Arguments:
+            partition (TopicPartition): partition to check
         """
-        if not group:
-            return {
-                'fetch': self.offsets('fetch'),
-                'commit': self.offsets('commit'),
-                'task_done': self.offsets('task_done'),
-                'highwater': self.offsets('highwater')
-            }
-        else:
-            return dict(deepcopy(getattr(self._offsets, group)))
+        if not self._subscription.is_assigned(partition):
+            raise Errors.IllegalStateError("You can only check the position for partitions assigned to this consumer.")
+        offset = self._subscription.assignment[partition].consumed
+        if offset is None:
+            self._update_fetch_positions([partition])
+            offset = self._subscription.assignment[partition].consumed
+        return offset

-    def task_done(self, message):
-        """Mark a fetched message as consumed.
+    def pause(self, *partitions):
+        """Suspend fetching from the requested partitions.

-        Offsets for messages marked as "task_done" will be stored back
-        to the kafka cluster for this consumer group on commit()
+        Future calls to poll() will not return any records from these partitions
+        until they have been resumed using resume(). Note that this method does
+        not affect partition subscription. In particular, it does not cause a
+        group rebalance when automatic assignment is used.

         Arguments:
-            message (KafkaMessage): the message to mark as complete
-
-        Returns:
-            True, unless the topic-partition for this message has not
-            been configured for the consumer. In normal operation, this
-            should not happen. But see github issue 364.
+            *partitions (TopicPartition): partitions to pause
         """
-        topic_partition = (message.topic, message.partition)
-        if topic_partition not in self._topics:
-            logger.warning('Unrecognized topic/partition in task_done message: '
-                           '{0}:{1}'.format(*topic_partition))
-            return False
+        for partition in partitions:
+            log.debug("Pausing partition %s", partition)
+            self._subscription.pause(partition)

-        offset = message.offset
+    def resume(self, *partitions):
+        """Resume fetching from the specified (paused) partitions.

-        # Warn on non-contiguous offsets
-        prev_done = self._offsets.task_done[topic_partition]
-        if prev_done is not None and offset != (prev_done + 1):
-            logger.warning('Marking task_done on a non-continuous offset: %d != %d + 1',
-                           offset, prev_done)
+        Arguments:
+            *partitions (TopicPartition): partitions to resume
+        """
+        for partition in partitions:
+            log.debug("Resuming partition %s", partition)
+            self._subscription.resume(partition)
+
+    def seek(self, partition, offset):
+        """Manually specify the fetch offset for a TopicPartition
+
+        Overrides the fetch offsets that the consumer will use on the next
+        poll(). If this API is invoked for the same partition more than once,
+        the latest offset will be used on the next poll(). Note that you may
+        lose data if this API is arbitrarily used in the middle of consumption,
+        to reset the fetch offsets.
+        """
+        if offset < 0:
+            raise Errors.IllegalStateError("seek offset must not be a negative number")
+        log.debug("Seeking to offset %s for partition %s", offset, partition)
+        self._subscription.assignment[partition].seek(offset)
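A sketch of rewinding a single partition before the next poll() (the offset value is hypothetical):

    tp = TopicPartition('my-topic', 0)
    consumer.seek(tp, 1000)
    # position() should now report 1000, assuming the subscription state
    # records the seek as the consumed position
    print(consumer.position(tp))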
-        # Warn on smaller offsets than previous commit
-        # "commit" offsets are actually the offset of the next message to fetch.
-        prev_commit = self._offsets.commit[topic_partition]
-        if prev_commit is not None and ((offset + 1) <= prev_commit):
-            logger.warning('Marking task_done on a previously committed offset?: %d (+1) <= %d',
-                           offset, prev_commit)
+    def seek_to_beginning(self, *partitions):
+        """Seek to the oldest available offset for partitions.

-        self._offsets.task_done[topic_partition] = offset
+        Arguments:
+            *partitions: optionally provide specific TopicPartitions, otherwise
+                default to all assigned partitions
+        """
+        if not partitions:
+            partitions = self._subscription.assigned_partitions()
+        for tp in partitions:
+            log.debug("Seeking to beginning of partition %s", tp)
+            self._subscription.need_offset_reset(tp, OffsetResetStrategy.EARLIEST)

-        # Check for auto-commit
-        if self._does_auto_commit_messages():
-            self._incr_auto_commit_message_count()
+    def seek_to_end(self, *partitions):
+        """Seek to the most recent available offset for partitions.

-            if self._should_auto_commit():
-                self.commit()
+        Arguments:
+            *partitions: optionally provide specific TopicPartitions, otherwise
+                default to all assigned partitions
+        """
+        if not partitions:
+            partitions = self._subscription.assigned_partitions()
+        for tp in partitions:
+            log.debug("Seeking to end of partition %s", tp)
+            self._subscription.need_offset_reset(tp, OffsetResetStrategy.LATEST)
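Both helpers only flag the partitions for an offset reset, applied on the next poll(); a sketch of replaying data from the start, for all assigned partitions or just one:

    consumer.seek_to_beginning()                          # all assigned partitions
    consumer.seek_to_end(TopicPartition('my-topic', 0))   # or a single partition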
""" - if not self._config['group_id']: - logger.warning('Cannot commit without a group_id!') - raise KafkaConfigurationError( - 'Attempted to commit offsets ' - 'without a configured consumer group (group_id)' - ) - - # API supports storing metadata with each commit - # but for now it is unused - metadata = b'' - - offsets = self._offsets.task_done - commits = [] - for topic_partition, task_done_offset in six.iteritems(offsets): - - # Skip if None - if task_done_offset is None: - continue - - # Commit offsets as the next offset to fetch - # which is consistent with the Java Client - # task_done is marked by messages consumed, - # so add one to mark the next message for fetching - commit_offset = (task_done_offset + 1) - - # Skip if no change from previous committed - if commit_offset == self._offsets.commit[topic_partition]: - continue - - commits.append( - OffsetCommitRequest(topic_partition[0], topic_partition[1], - commit_offset, metadata) - ) - - if commits: - logger.info('committing consumer offsets to group %s', self._config['group_id']) - resps = self._client.send_offset_commit_request( - self._config['group_id'], commits, - fail_on_error=False - ) - - for r in resps: - check_error(r) - topic_partition = (r.topic, r.partition) - task_done = self._offsets.task_done[topic_partition] - self._offsets.commit[topic_partition] = (task_done + 1) - - if self._config['auto_commit_enable']: - self._reset_auto_commit() - - return True - + if not topics: + self.unsubscribe() else: - logger.info('No new offsets found to commit in group %s', self._config['group_id']) - return False - - # - # Topic/partition management private methods - # - - def _consume_topic_partition(self, topic, partition): - if not isinstance(partition, int): - raise KafkaConfigurationError('Unknown partition type (%s) ' - '-- expected int' % type(partition)) - - if topic not in self._cluster.topics(): - raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic) - if partition not in self._cluster.partitions_for_topic(topic): - raise UnknownTopicOrPartitionError("Partition %d not found in Topic %s " - "in broker metadata" % (partition, topic)) - logger.info("Configuring consumer to fetch topic '%s', partition %d", topic, partition) - self._topics.append((topic, partition)) - - def _refresh_metadata_on_error(self): - refresh_ms = self._config['refresh_leader_backoff_ms'] - jitter_pct = 0.20 - sleep_ms = random.randint( - int((1.0 - 0.5 * jitter_pct) * refresh_ms), - int((1.0 + 0.5 * jitter_pct) * refresh_ms) - ) - while True: - logger.info("Sleeping for refresh_leader_backoff_ms: %d", sleep_ms) - time.sleep(sleep_ms / 1000.0) - try: - self._client.load_metadata_for_topics() - except KafkaUnavailableError: - logger.warning("Unable to refresh topic metadata... 
cluster unavailable") - self._check_consumer_timeout() + self._subscription.subscribe(topics=topics, + pattern=pattern, + listener=listener) + # regex will need all topic metadata + if pattern is not None: + self._client.cluster.need_metadata_for_all = True + log.debug("Subscribed to topic pattern: %s", topics) else: - logger.info("Topic metadata refreshed") - return - - # - # Offset-managment private methods - # - - def _get_commit_offsets(self): - logger.info("Consumer fetching stored offsets") - for topic_partition in self._topics: - (resp,) = self._client.send_offset_fetch_request( - self._config['group_id'], - [OffsetFetchRequest(topic_partition[0], topic_partition[1])], - fail_on_error=False) - try: - check_error(resp) - # API spec says server wont set an error here - # but 0.8.1.1 does actually... - except UnknownTopicOrPartitionError: - pass - - # -1 offset signals no commit is currently stored - if resp.offset == -1: - self._offsets.commit[topic_partition] = None - - # Otherwise we committed the stored offset - # and need to fetch the next one - else: - self._offsets.commit[topic_partition] = resp.offset - - def _reset_highwater_offsets(self): - for topic_partition in self._topics: - self._offsets.highwater[topic_partition] = None - - def _reset_task_done_offsets(self): - for topic_partition in self._topics: - self._offsets.task_done[topic_partition] = None - - def _reset_partition_offset(self, topic_partition): - (topic, partition) = topic_partition - LATEST = -1 - EARLIEST = -2 - - request_time_ms = None - if self._config['auto_offset_reset'] == 'largest': - request_time_ms = LATEST - elif self._config['auto_offset_reset'] == 'smallest': - request_time_ms = EARLIEST - else: + self._client.set_topics(self._subscription.group_subscription()) + log.debug("Subscribed to topic(s): %s", topics) - # Let's raise an reasonable exception type if user calls - # outside of an exception context - if sys.exc_info() == (None, None, None): - raise OffsetOutOfRangeError('Cannot reset partition offsets without a ' - 'valid auto_offset_reset setting ' - '(largest|smallest)') + def subscription(self): + """Get the current topic subscription. - # Otherwise we should re-raise the upstream exception - # b/c it typically includes additional data about - # the request that triggered it, and we do not want to drop that - raise + Returns: + set: {topic, ...} + """ + return self._subscription.subscription - (offset, ) = self.get_partition_offsets(topic, partition, - request_time_ms, max_num_offsets=1) - return offset + def unsubscribe(self): + """Unsubscribe from all topics and clear all assigned partitions.""" + self._subscription.unsubscribe() + self._coordinator.close() + self._client.cluster.need_metadata_for_all_topics = False + log.debug("Unsubscribed all topics or patterns and assigned partitions") + + def _update_fetch_positions(self, partitions): + """ + Set the fetch position to the committed position (if there is one) + or reset it using the offset reset policy the user has configured. 
+
+    def _update_fetch_positions(self, partitions):
+        """
+        Set the fetch position to the committed position (if there is one)
+        or reset it using the offset reset policy the user has configured.
+
+        Arguments:
+            partitions (List[TopicPartition]): The partitions that need
+                updating fetch positions
+
+        Raises:
+            NoOffsetForPartitionError: If no offset is stored for a given
+                partition and no offset reset policy is defined
+        """
+        # refresh commits for all assigned partitions
+        self._coordinator.refresh_committed_offsets_if_needed()

-    #
-    # Consumer Timeout private methods
-    #
-
-    def _set_consumer_timeout_start(self):
-        self._consumer_timeout = False
-        if self._config['consumer_timeout_ms'] >= 0:
-            self._consumer_timeout = time.time() + (self._config['consumer_timeout_ms'] / 1000.0)
-
-    def _check_consumer_timeout(self):
-        if self._consumer_timeout and time.time() > self._consumer_timeout:
-            raise ConsumerTimeout('Consumer timed out after %d ms' % + self._config['consumer_timeout_ms'])
-
-    #
-    # Autocommit private methods
-    #
-
-    def _should_auto_commit(self):
-        if self._does_auto_commit_ms():
-            if time.time() >= self._next_commit_time:
-                return True
-
-        if self._does_auto_commit_messages():
-            if self._uncommitted_message_count >= self._config['auto_commit_interval_messages']:
-                return True
-
-        return False
-
-    def _reset_auto_commit(self):
-        if not self._config['group_id']:
-            raise KafkaConfigurationError('auto_commit requires group_id')
-        self._uncommitted_message_count = 0
-        self._next_commit_time = None
-        if self._does_auto_commit_ms():
-            self._next_commit_time = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0)
-
-    def _incr_auto_commit_message_count(self, n=1):
-        self._uncommitted_message_count += n
-
-    def _does_auto_commit_ms(self):
-        if not self._config['auto_commit_enable']:
-            return False
-
-        conf = self._config['auto_commit_interval_ms']
-        if conf is not None and conf > 0:
-            return True
-        return False
-
-    def _does_auto_commit_messages(self):
-        if not self._config['auto_commit_enable']:
-            return False
-
-        conf = self._config['auto_commit_interval_messages']
-        if conf is not None and conf > 0:
-            return True
-        return False
-
-    #
-    # Message iterator private methods
-    #
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        return self.next()
-
-    def _get_message_iterator(self):
-        # Fetch a new batch if needed
-        if self._msg_iter is None:
-            self._msg_iter = self.fetch_messages()
-
-        return self._msg_iter
-
-    def _reset_message_iterator(self):
-        self._msg_iter = None
-
-    #
-    # python private methods
-    #
-
-    def __repr__(self):
-        return '<{0} topics=({1})>'.format(
-            self.__class__.__name__,
-            '|'.join(["%s-%d" % topic_partition
-                      for topic_partition in self._topics])
-        )
-
-    #
-    # other private methods
-    #
-
-    def _deprecate_configs(self, **configs):
-        for old, new in six.iteritems(DEPRECATED_CONFIG_KEYS):
-            if old in configs:
-                logger.warning('Deprecated Kafka Consumer configuration: %s. '
-                               'Please use %s instead.', old, new)
-                old_value = configs.pop(old)
-                if new not in configs:
-                    configs[new] = old_value
-        return configs

+        # then do any offset lookups in case some positions are not known
+        self._fetcher.update_fetch_positions(partitions)