| field | value | date |
|---|---|---|
| author | Dana Powers <dana.powers@rd.io> | 2015-12-01 11:11:51 -0800 |
| committer | Zack Dever <zack.dever@rd.io> | 2015-12-04 11:25:40 -0800 |
| commit | 235f7ac855f937207c3d430ad0dc762ff0c21091 (patch) | |
| tree | 2ec233a82a0c41c48eb0daec2a317f66992e4278 | |
| parent | c94cb620292f93a4cd3cfc0bb57c5fa38d95a717 (diff) | |
| download | kafka-python-235f7ac855f937207c3d430ad0dc762ff0c21091.tar.gz | |
Unfinished kafka.consumer.group commit
-rw-r--r-- | kafka/consumer/group.py | 883 |
1 file changed, 883 insertions, 0 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
new file mode 100644
index 0000000..4a630ed
--- /dev/null
+++ b/kafka/consumer/group.py
@@ -0,0 +1,883 @@
+from __future__ import absolute_import
+
+from collections import namedtuple
+from copy import deepcopy
+import logging
+import random
+import sys
+import time
+
+import six
+
+from kafka.cluster import Cluster
+from kafka.common import (
+    OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest,
+    check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError,
+    OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
+    FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError
+)
+from kafka.util import kafka_bytestring
+
+logger = logging.getLogger(__name__)
+
+OffsetsStruct = namedtuple("OffsetsStruct", ["fetch", "highwater", "commit", "task_done"])
+
+NEW_CONSUMER_CONFIGS = {
+    'bootstrap_servers': None,
+    'client_id': None,
+    'group_id': None,
+    'key_deserializer': None,
+    'value_deserializer': None,
+    'auto_commit_interval_ms': 5000,
+    'auto_offset_reset': 'latest',
+    'check_crcs': True,  # "Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance."
+    'connections_max_idle_ms': 9 * 60 * 1000,
+    'enable_auto_commit': True,
+    'fetch_max_wait_ms': 500,
+    'fetch_min_bytes': 1024,
+    'heartbeat_interval_ms': 3000,
+    'max_partition_fetch_bytes': 1 * 1024 * 1024,
+    'metadata_max_age_ms': 5 * 60 * 1000,  # >0
+    'metric_reporters': None,
+    'metrics_num_samples': 2,
+    'metrics_sample_window_ms': 30000,
+    'partition_assignment_strategy': None,  # This should default to something like 'roundrobin' or 'range'
+    'reconnect_backoff_ms': 50,
+    'request_timeout_ms': 40 * 1000,
+    'retry_backoff_ms': 100,
+    'send_buffer_bytes': 128 * 1024,
+    'receive_buffer_bytes': 32 * 1024,
+    'session_timeout_ms': 30000,  # "The timeout used to detect failures when using Kafka's group management facilities."
+}
+
+DEFAULT_CONSUMER_CONFIG = {
+    'client_id': __name__,
+    'group_id': None,
+    'bootstrap_servers': [],
+    'socket_timeout_ms': 30 * 1000,
+    'fetch_message_max_bytes': 1024 * 1024,
+    'auto_offset_reset': 'largest',
+    'fetch_min_bytes': 1,
+    'fetch_wait_max_ms': 100,
+    'refresh_leader_backoff_ms': 200,
+    'deserializer_class': lambda msg: msg,
+    'auto_commit_enable': False,
+    'auto_commit_interval_ms': 60 * 1000,
+    'auto_commit_interval_messages': None,
+    'consumer_timeout_ms': -1,
+
+    # Currently unused
+    'socket_receive_buffer_bytes': 64 * 1024,
+    'num_consumer_fetchers': 1,
+    'default_fetcher_backoff_ms': 1000,
+    'queued_max_message_chunks': 10,
+    'rebalance_max_retries': 4,
+    'rebalance_backoff_ms': 2000,
+}
+
+DEPRECATED_CONFIG_KEYS = {
+    'metadata_broker_list': 'bootstrap_servers',
+}
+
+class KafkaConsumer(object):
+    """A simpler kafka consumer"""
+
+    def __init__(self, *topics, **configs):
+        self._config = deepcopy(DEFAULT_CONSUMER_CONFIG)
+        self._topics = topics
+        self._partitions = []
+        self._offsets = OffsetsStruct(fetch=dict(), commit=dict(), highwater=dict(), task_done=dict())
+        self._consumer_timeout = False
+        self._uncommitted_message_count = 0
+        self._next_commit_time = None
+        self._msg_iter = None
+
+        self._configure(**configs)
+        self._cluster = Cluster(**self._config)
+
+    def assign(self, topic_partitions):
+        pass
+
+    def assignment(self):
+        """Get the set of partitions currently assigned to this consumer."""
+        pass
+
+    def close(self):
+        """Close the consumer, waiting indefinitely for any needed cleanup."""
+        pass
+
+    def commitAsync(self, topic_partition_offsets_and_metadata=None, callback=None):
+        """
+        Commit the specified offsets, or those returned on the last poll(),
+        for the subscribed list of topics and partitions. Asynchronous.
+        """
+        pass
+
+    def commitSync(self, topic_partition_offsets_and_metadata=None):
+        """
+        Commit the specified offsets, or those returned on the last poll(),
+        for the subscribed list of topics and partitions. Synchronous.
+        Blocks until either the commit succeeds or an unrecoverable error is
+        encountered (in which case it is thrown to the caller).
+        """
+        pass
+
+    def committed(self, topic_partition):
+        """
+        Get the last committed offset for the given partition (whether the
+        commit happened by this process or another).
+        Returns: offset_and_metadata
+        """
+        pass
+
+    def listTopics(self):
+        """
+        Get metadata about partitions for all topics that the user is authorized
+        to view.
+        Returns: {topic: [partition_info]}
+        """
+        pass
+
+    def metrics(self):
+        """
+        Get the metrics kept by the consumer.
+        Returns: {metric_name: metric}
+        """
+        pass
+
+    def partitionsFor(self, topic):
+        """
+        Get metadata about the partitions for a given topic.
+        Returns: [partition_info]
+        """
+        pass
+
+    def pause(self, *topic_partitions):
+        """Suspend fetching from the requested partitions."""
+        pass
+
+    def poll(self, timeout):
+        """
+        Fetch data for the topics or partitions specified using one of the
+        subscribe/assign APIs.
+        Returns: [consumer_records]
+        """
+        pass
+
+    def position(self, topic_partition):
+        """Get the offset of the next record that will be fetched (if a record
+        with that offset exists)."""
+        pass
+
+    def resume(self, *topic_partitions):
+        """Resume specified partitions which have been paused."""
+        pass
+
+    def seek(self, topic_partition, offset):
+        """Overrides the fetch offsets that the consumer will use on the next
+        poll(timeout)."""
+        pass
+
+    def seekToBeginning(self, *topic_partitions):
+        """Seek to the first offset for each of the given partitions."""
+        pass
+
+    def seekToEnd(self, *topic_partitions):
+        """Seek to the last offset for each of the given partitions."""
+        pass
+
+    def subscribe(self, topics, callback=None):
+        """Subscribe to the given list of topics, or those matching a regex,
+        to get dynamically assigned partitions."""
+        pass
+
+    def subscription(self):
+        """
+        Get the current subscription.
+        Returns: [topic]
+        """
+        pass
+
+    def unsubscribe(self):
+        """Unsubscribe from topics currently subscribed with subscribe(List)."""
+        pass
+
+    def wakeup(self):
+        """Wake up the consumer."""
+        pass
+
+    def _configure(self, **configs):
+        """Configure the consumer instance
+
+        Configuration settings can be passed to the constructor;
+        otherwise defaults will be used:
+
+        Keyword Arguments:
+            bootstrap_servers (list): List of initial broker nodes the consumer
+                should contact to bootstrap initial cluster metadata. This does
+                not have to be the full node list. It just needs to have at
+                least one broker that will respond to a Metadata API Request.
+            client_id (str): a unique name for this client. Defaults to
+                'kafka.consumer.kafka'.
+            group_id (str): the name of the consumer group to join.
+                Offsets are fetched / committed to this group name.
+            fetch_message_max_bytes (int, optional): Maximum bytes for each
+                topic/partition fetch request. Defaults to 1024*1024.
+            fetch_min_bytes (int, optional): Minimum amount of data the server
+                should return for a fetch request, otherwise wait up to
+                fetch_wait_max_ms for more data to accumulate. Defaults to 1.
+            fetch_wait_max_ms (int, optional): Maximum time for the server to
+                block waiting for fetch_min_bytes messages to accumulate.
+                Defaults to 100.
+            refresh_leader_backoff_ms (int, optional): Milliseconds to backoff
+                when refreshing metadata on errors (subject to random jitter).
+                Defaults to 200.
+            socket_timeout_ms (int, optional): TCP socket timeout in
+                milliseconds. Defaults to 30*1000.
+            auto_offset_reset (str, optional): A policy for resetting offsets on
+                OffsetOutOfRange errors. 'smallest' will move to the oldest
+                available message, 'largest' will move to the most recent. Any
+                other value will raise the exception. Defaults to 'largest'.
+            deserializer_class (callable, optional): Any callable that takes a
+                raw message value and returns a deserialized value. Defaults to
+                lambda msg: msg.
+            auto_commit_enable (bool, optional): Enabling auto-commit will cause
+                the KafkaConsumer to periodically commit offsets without an
+                explicit call to commit(). Defaults to False.
+            auto_commit_interval_ms (int, optional): If auto-commit is enabled,
+                the milliseconds between automatic offset commits. Defaults to
+                60 * 1000.
+            auto_commit_interval_messages (int, optional): If auto-commit is
+                enabled, a number of messages consumed between automatic offset
+                commits. Defaults to None (disabled).
+            consumer_timeout_ms (int, optional): number of milliseconds after
+                which to raise a timeout exception to the consumer if no
+                message is available for consumption. Defaults to -1 (don't
+                raise an exception).
+
+        Configuration parameters are described in more detail at
+        http://kafka.apache.org/documentation.html#highlevelconsumerapi
+        """
+        configs = self._deprecate_configs(**configs)
+        self._config.update(configs)
+
+        if self._config['auto_commit_enable']:
+            logger.info('Configuring consumer to auto-commit offsets')
+            self._reset_auto_commit()
+
+    def set_topic_partitions(self, *topics):
+        """
+        Set the topic/partitions to consume.
+        Optionally specify offsets to start from.
+
+        Accepts types:
+
+        * str (utf-8): topic name (will consume all available partitions)
+        * tuple: (topic, partition)
+        * dict:
+            - { topic: partition }
+            - { topic: [partition list] }
+            - { topic: (partition tuple,) }
+
+        Optionally, offsets can be specified directly:
+
+        * tuple: (topic, partition, offset)
+        * dict: { (topic, partition): offset, ... }
+
+        Example:
+
+        .. code:: python
+
+            kafka = KafkaConsumer()
+
+            # Consume topic1-all; topic2-partition2; topic3-partition0
+            kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})
+
+            # Consume topic1-0 starting at offset 12, and topic2-1 at offset 45
+            # using tuples --
+            kafka.set_topic_partitions(("topic1", 0, 12), ("topic2", 1, 45))
+
+            # using dict --
+            kafka.set_topic_partitions({ ("topic1", 0): 12, ("topic2", 1): 45 })
+
+        """
+        self._cluster.refresh_metadata()
+
+        # Handle different topic types
+        for arg in topics:
+
+            # Topic name str -- all partitions
+            if isinstance(arg, (six.string_types, six.binary_type)):
+                topic = kafka_bytestring(arg)
+                for partition in self._cluster.partitions_for_topic(topic):
+                    self._consume_topic_partition(topic, partition)
+
+            # (topic, partition [, offset]) tuple
+            elif isinstance(arg, tuple):
+                topic = kafka_bytestring(arg[0])
+                partition = arg[1]
+                self._consume_topic_partition(topic, partition)
+                if len(arg) == 3:
+                    offset = arg[2]
+                    self._offsets.fetch[(topic, partition)] = offset
+
+            # { topic: partitions, ... } dict
+            elif isinstance(arg, dict):
+                for key, value in six.iteritems(arg):
+
+                    # key can be string (a topic)
+                    if isinstance(key, (six.string_types, six.binary_type)):
+                        topic = kafka_bytestring(key)
+
+                        # topic: partition
+                        if isinstance(value, int):
+                            self._consume_topic_partition(topic, value)
+
+                        # topic: [ partition1, partition2, ... ]
+                        elif isinstance(value, (list, tuple)):
+                            for partition in value:
+                                self._consume_topic_partition(topic, partition)
+                        else:
+                            raise KafkaConfigurationError(
+                                'Unknown topic type '
+                                '(dict key must be int or list/tuple of ints)'
+                            )
+
+                    # (topic, partition): offset
+                    elif isinstance(key, tuple):
+                        topic = kafka_bytestring(key[0])
+                        partition = key[1]
+                        self._consume_topic_partition(topic, partition)
+                        self._offsets.fetch[(topic, partition)] = value
+
+            else:
+                raise KafkaConfigurationError('Unknown topic type (%s)' % type(arg))
+
+        # If we have a consumer group, try to fetch stored offsets
+        if self._config['group_id']:
+            self._get_commit_offsets()
+
+        # Update missing fetch/commit offsets
+        for topic_partition in self._topics:
+
+            # Commit offsets default is None
+            if topic_partition not in self._offsets.commit:
+                self._offsets.commit[topic_partition] = None
+
+            # Skip if we already have a fetch offset from user args
+            if topic_partition not in self._offsets.fetch:
+
+                # Fetch offsets default is (1) commit
+                if self._offsets.commit[topic_partition] is not None:
+                    self._offsets.fetch[topic_partition] = self._offsets.commit[topic_partition]
+
+                # or (2) auto reset
+                else:
+                    self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition)
+
+        # highwater marks (received from server on fetch response)
+        # and task_done (set locally by user)
+        # should always get initialized to None
+        self._reset_highwater_offsets()
+        self._reset_task_done_offsets()
+
+        # Reset message iterator in case we were in the middle of one
+        self._reset_message_iterator()
+
+    def next(self):
+        """Return the next available message
+
+        Blocks indefinitely unless consumer_timeout_ms > 0
+
+        Returns:
+            a single KafkaMessage from the message iterator
+
+        Raises:
+            ConsumerTimeout after consumer_timeout_ms and no message
+
+        Note:
+            This is also the method called internally during iteration
+
+        """
+        self._set_consumer_timeout_start()
+        while True:
+
+            try:
+                return six.next(self._get_message_iterator())
+
+            # Handle batch completion
+            except StopIteration:
+                self._reset_message_iterator()
+
+            self._check_consumer_timeout()
+
+    def fetch_messages(self):
+        """Sends FetchRequests for all topic/partitions set for consumption
+
+        Returns:
+            Generator that yields KafkaMessage structs
+            after deserializing with the configured `deserializer_class`
+
+        Note:
+            Refreshes metadata on errors, and resets fetch offset on
+            OffsetOutOfRange, per the configured `auto_offset_reset` policy
+
+        See Also:
+            Key KafkaConsumer configuration parameters:
+            * `fetch_message_max_bytes`
+            * `fetch_wait_max_ms`
+            * `fetch_min_bytes`
+            * `deserializer_class`
+            * `auto_offset_reset`
+
+        """
+
+        max_bytes = self._config['fetch_message_max_bytes']
+        max_wait_time = self._config['fetch_wait_max_ms']
+        min_bytes = self._config['fetch_min_bytes']
+
+        if not self._topics:
+            raise KafkaConfigurationError('No topics or partitions configured')
+
+        if not self._offsets.fetch:
+            raise KafkaConfigurationError(
+                'No fetch offsets found when calling fetch_messages'
+            )
+
+        fetches = [FetchRequest(topic, partition,
+                                self._offsets.fetch[(topic, partition)],
+                                max_bytes)
+                   for (topic, partition) in self._topics]
+
+        # send_fetch_request will batch topic/partition requests by leader
+        responses = self._client.send_fetch_request(
+            fetches,
+            max_wait_time=max_wait_time,
+            min_bytes=min_bytes,
+            fail_on_error=False
+        )
+
+        for resp in responses:
+
+            if isinstance(resp, FailedPayloadsError):
+                logger.warning('FailedPayloadsError attempting to fetch data')
+                self._refresh_metadata_on_error()
+                continue
+
+            topic = kafka_bytestring(resp.topic)
+            partition = resp.partition
+            try:
+                check_error(resp)
+            except OffsetOutOfRangeError:
+                logger.warning('OffsetOutOfRange: topic %s, partition %d, '
+                               'offset %d (Highwatermark: %d)',
+                               topic, partition,
+                               self._offsets.fetch[(topic, partition)],
+                               resp.highwaterMark)
+                # Reset offset
+                self._offsets.fetch[(topic, partition)] = (
+                    self._reset_partition_offset((topic, partition))
+                )
+                continue
+
+            except NotLeaderForPartitionError:
+                logger.warning("NotLeaderForPartitionError for %s - %d. "
+                               "Metadata may be out of date",
+                               topic, partition)
+                self._refresh_metadata_on_error()
+                continue
+
+            except RequestTimedOutError:
+                logger.warning("RequestTimedOutError for %s - %d",
+                               topic, partition)
+                continue
+
+            # Track server highwater mark
+            self._offsets.highwater[(topic, partition)] = resp.highwaterMark
+
+            # Yield each message.
+            # kafka-python could raise an exception during iteration
+            # that we are not catching -- the user will need to address it.
+            for (offset, message) in resp.messages:
+                # deserializer_class could raise an exception here
+                val = self._config['deserializer_class'](message.value)
+                msg = KafkaMessage(topic, partition, offset, message.key, val)
+
+                # In some cases the server will return earlier messages
+                # than we requested. Skip them per kafka spec.
+                if offset < self._offsets.fetch[(topic, partition)]:
+                    logger.debug('message offset less than fetched offset '
+                                 'skipping: %s', msg)
+                    continue
+                # Only increment fetch offset
+                # if we safely got the message and deserialized
+                self._offsets.fetch[(topic, partition)] = offset + 1
+
+                # Then yield to user
+                yield msg
+
+    def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets):
+        """Request available fetch offsets for a single topic/partition
+
+        Keyword Arguments:
+            topic (str): topic for offset request
+            partition (int): partition for offset request
+            request_time_ms (int): Used to ask for all messages before a
+                certain time (ms). There are two special values.
+                Specify -1 to receive the latest offset (i.e. the offset of the
+                next coming message) and -2 to receive the earliest available
+                offset. Note that because offsets are pulled in descending
+                order, asking for the earliest offset will always return you a
+                single element.
+            max_num_offsets (int): Maximum offsets to include in the OffsetResponse
+
+        Returns:
+            a list of offsets in the OffsetResponse submitted for the provided
+            topic / partition. See:
+            https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
+        """
+        reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)]
+
+        (resp,) = self._client.send_offset_request(reqs)
+
+        check_error(resp)
+
+        # Just for sanity..
+        # probably unnecessary
+        assert resp.topic == topic
+        assert resp.partition == partition
+
+        return resp.offsets
+
+    def offsets(self, group=None):
+        """Get internal consumer offset values
+
+        Keyword Arguments:
+            group: Either "fetch", "commit", "task_done", or "highwater".
+                If no group specified, returns all groups.
+
+        Returns:
+            A copy of internal offsets struct
+        """
+        if not group:
+            return {
+                'fetch': self.offsets('fetch'),
+                'commit': self.offsets('commit'),
+                'task_done': self.offsets('task_done'),
+                'highwater': self.offsets('highwater')
+            }
+        else:
+            return dict(deepcopy(getattr(self._offsets, group)))
+
+    def task_done(self, message):
+        """Mark a fetched message as consumed.
+
+        Offsets for messages marked as "task_done" will be stored back
+        to the kafka cluster for this consumer group on commit()
+
+        Arguments:
+            message (KafkaMessage): the message to mark as complete
+
+        Returns:
+            True, unless the topic-partition for this message has not
+            been configured for the consumer. In normal operation, this
+            should not happen. But see github issue 364.
+        """
+        topic_partition = (message.topic, message.partition)
+        if topic_partition not in self._topics:
+            logger.warning('Unrecognized topic/partition in task_done message: '
+                           '{0}:{1}'.format(*topic_partition))
+            return False
+
+        offset = message.offset
+
+        # Warn on non-contiguous offsets
+        prev_done = self._offsets.task_done[topic_partition]
+        if prev_done is not None and offset != (prev_done + 1):
+            logger.warning('Marking task_done on a non-contiguous offset: %d != %d + 1',
+                           offset, prev_done)
+
+        # Warn on smaller offsets than previous commit
+        # "commit" offsets are actually the offset of the next message to fetch.
+        prev_commit = self._offsets.commit[topic_partition]
+        if prev_commit is not None and ((offset + 1) <= prev_commit):
+            logger.warning('Marking task_done on a previously committed offset?: %d (+1) <= %d',
+                           offset, prev_commit)
+
+        self._offsets.task_done[topic_partition] = offset
+
+        # Check for auto-commit
+        if self._does_auto_commit_messages():
+            self._incr_auto_commit_message_count()
+
+        if self._should_auto_commit():
+            self.commit()
+
+        return True
+
+    def commit(self):
+        """Store consumed message offsets (marked via task_done())
+        to kafka cluster for this consumer_group.
+
+        Returns:
+            True on success, or False if no offsets were found for commit
+
+        Note:
+            This functionality requires server version >=0.8.1.1
+            https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+        """
+        if not self._config['group_id']:
+            logger.warning('Cannot commit without a group_id!')
+            raise KafkaConfigurationError(
+                'Attempted to commit offsets '
+                'without a configured consumer group (group_id)'
+            )
+
+        # API supports storing metadata with each commit
+        # but for now it is unused
+        metadata = b''
+
+        offsets = self._offsets.task_done
+        commits = []
+        for topic_partition, task_done_offset in six.iteritems(offsets):
+
+            # Skip if None
+            if task_done_offset is None:
+                continue
+
+            # Commit offsets as the next offset to fetch
+            # which is consistent with the Java Client
+            # task_done is marked by messages consumed,
+            # so add one to mark the next message for fetching
+            commit_offset = (task_done_offset + 1)
+
+            # Skip if no change from previous committed
+            if commit_offset == self._offsets.commit[topic_partition]:
+                continue
+
+            commits.append(
+                OffsetCommitRequest(topic_partition[0], topic_partition[1],
+                                    commit_offset, metadata)
+            )
+
+        if commits:
+            logger.info('committing consumer offsets to group %s', self._config['group_id'])
+            resps = self._client.send_offset_commit_request(
+                kafka_bytestring(self._config['group_id']), commits,
+                fail_on_error=False
+            )
+
+            for r in resps:
+                check_error(r)
+                topic_partition = (r.topic, r.partition)
+                task_done = self._offsets.task_done[topic_partition]
+                self._offsets.commit[topic_partition] = (task_done + 1)
+
+            if self._config['auto_commit_enable']:
+                self._reset_auto_commit()
+
+            return True
+
+        else:
+            logger.info('No new offsets found to commit in group %s', self._config['group_id'])
+            return False
+
+    #
+    # Topic/partition management private methods
+    #
+
+    def _consume_topic_partition(self, topic, partition):
+        if not isinstance(partition, int):
+            raise KafkaConfigurationError('Unknown partition type (%s) '
+                                          '-- expected int' % type(partition))
+
+        if topic not in self._cluster.topics():
+            raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic)
+        if partition not in self._cluster.partitions_for_topic(topic):
+            raise UnknownTopicOrPartitionError("Partition %d not found in Topic %s "
+                                               "in broker metadata" % (partition, topic))
+        logger.info("Configuring consumer to fetch topic '%s', partition %d", topic, partition)
+        self._topics.append((topic, partition))
+
+    def _refresh_metadata_on_error(self):
+        refresh_ms = self._config['refresh_leader_backoff_ms']
+        jitter_pct = 0.20
+        sleep_ms = random.randint(
+            int((1.0 - 0.5 * jitter_pct) * refresh_ms),
+            int((1.0 + 0.5 * jitter_pct) * refresh_ms)
+        )
+        while True:
+            logger.info("Sleeping for refresh_leader_backoff_ms: %d", sleep_ms)
+            time.sleep(sleep_ms / 1000.0)
+            try:
+                self._client.load_metadata_for_topics()
+            except KafkaUnavailableError:
+                logger.warning("Unable to refresh topic metadata... cluster unavailable")
+                self._check_consumer_timeout()
+            else:
+                logger.info("Topic metadata refreshed")
+                return
+
+    #
+    # Offset-management private methods
+    #
+
+    def _get_commit_offsets(self):
+        logger.info("Consumer fetching stored offsets")
+        for topic_partition in self._topics:
+            (resp,) = self._client.send_offset_fetch_request(
+                kafka_bytestring(self._config['group_id']),
+                [OffsetFetchRequest(topic_partition[0], topic_partition[1])],
+                fail_on_error=False)
+            try:
+                check_error(resp)
+            # API spec says server won't set an error here
+            # but 0.8.1.1 does actually...
+            except UnknownTopicOrPartitionError:
+                pass
+
+            # -1 offset signals no commit is currently stored
+            if resp.offset == -1:
+                self._offsets.commit[topic_partition] = None
+
+            # Otherwise we committed the stored offset
+            # and need to fetch the next one
+            else:
+                self._offsets.commit[topic_partition] = resp.offset
+
+    def _reset_highwater_offsets(self):
+        for topic_partition in self._topics:
+            self._offsets.highwater[topic_partition] = None
+
+    def _reset_task_done_offsets(self):
+        for topic_partition in self._topics:
+            self._offsets.task_done[topic_partition] = None
+
+    def _reset_partition_offset(self, topic_partition):
+        (topic, partition) = topic_partition
+        LATEST = -1
+        EARLIEST = -2
+
+        request_time_ms = None
+        if self._config['auto_offset_reset'] == 'largest':
+            request_time_ms = LATEST
+        elif self._config['auto_offset_reset'] == 'smallest':
+            request_time_ms = EARLIEST
+        else:
+
+            # Let's raise a reasonable exception type if user calls
+            # outside of an exception context
+            if sys.exc_info() == (None, None, None):
+                raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
+                                            'valid auto_offset_reset setting '
+                                            '(largest|smallest)')
+
+            # Otherwise we should re-raise the upstream exception
+            # b/c it typically includes additional data about
+            # the request that triggered it, and we do not want to drop that
+            raise
+
+        (offset, ) = self.get_partition_offsets(topic, partition,
+                                                request_time_ms, max_num_offsets=1)
+        return offset
+
+    #
+    # Consumer Timeout private methods
+    #
+
+    def _set_consumer_timeout_start(self):
+        self._consumer_timeout = False
+        if self._config['consumer_timeout_ms'] >= 0:
+            self._consumer_timeout = time.time() + (self._config['consumer_timeout_ms'] / 1000.0)
+
+    def _check_consumer_timeout(self):
+        if self._consumer_timeout and time.time() > self._consumer_timeout:
+            raise ConsumerTimeout('Consumer timed out after %d ms' %
+                                  self._config['consumer_timeout_ms'])
+
+    #
+    # Autocommit private methods
+    #
+
+    def _should_auto_commit(self):
+        if self._does_auto_commit_ms():
+            if time.time() >= self._next_commit_time:
+                return True
+
+        if self._does_auto_commit_messages():
+            if self._uncommitted_message_count >= self._config['auto_commit_interval_messages']:
+                return True
+
+        return False
+
+    def _reset_auto_commit(self):
+        if not self._config['group_id']:
+            raise KafkaConfigurationError('auto_commit requires group_id')
+        self._uncommitted_message_count = 0
+        self._next_commit_time = None
+        if self._does_auto_commit_ms():
+            self._next_commit_time = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0)
+
+    def _incr_auto_commit_message_count(self, n=1):
+        self._uncommitted_message_count += n
+
+    def _does_auto_commit_ms(self):
+        if not self._config['auto_commit_enable']:
+            return False
+
+        conf = self._config['auto_commit_interval_ms']
+        if conf is not None and conf > 0:
+            return True
+        return False
+
+    def _does_auto_commit_messages(self):
+        if not self._config['auto_commit_enable']:
+            return False
+
+        conf = self._config['auto_commit_interval_messages']
+        if conf is not None and conf > 0:
+            return True
+        return False
+
+    #
+    # Message iterator private methods
+    #
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        return self.next()
+
+    def _get_message_iterator(self):
+        # Fetch a new batch if needed
+        if self._msg_iter is None:
+            self._msg_iter = self.fetch_messages()
+
+        return self._msg_iter
+
+    def _reset_message_iterator(self):
+        self._msg_iter = None
+
+    #
+    # python private methods
+    #
+
+    def __repr__(self):
+        return '<{0} topics=({1})>'.format(
+            self.__class__.__name__,
+            '|'.join(["%s-%d" % topic_partition
+                      for topic_partition in self._topics])
+        )
+
+    #
+    # other private methods
+    #
+
+    def _deprecate_configs(self, **configs):
+        for old, new in six.iteritems(DEPRECATED_CONFIG_KEYS):
+            if old in configs:
+                logger.warning('Deprecated Kafka Consumer configuration: %s. '
+                               'Please use %s instead.', old, new)
+                old_value = configs.pop(old)
+                if new not in configs:
+                    configs[new] = old_value
+        return configs
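
For orientation, here is a minimal usage sketch of the legacy-style iteration API whose docstrings appear in the file above (construct, iterate, `task_done()`, `commit()`). The broker address, group name, and topic are hypothetical placeholders, and the sketch reflects the documented intent rather than this exact revision: the commit is explicitly unfinished, and for example the constructor does not yet route `*topics` through `set_topic_partitions()`.

```python
# Minimal sketch of the legacy-style API documented above.
# Hypothetical broker/topic/group names; assumes the module is importable.
from kafka.common import ConsumerTimeout
from kafka.consumer.group import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',                           # consume all partitions of this topic
    group_id='my-group',                  # offsets are fetched/committed under this group
    bootstrap_servers=['localhost:9092'],
    auto_commit_enable=False,             # commit explicitly via task_done() + commit()
    consumer_timeout_ms=5000,             # next() raises ConsumerTimeout after 5s idle
)

try:
    # Iteration calls next(), which drives fetch_messages() under the hood
    for msg in consumer:
        print(msg.topic, msg.partition, msg.offset, msg.value)
        consumer.task_done(msg)           # mark this offset for the next commit()
except ConsumerTimeout:
    pass
finally:
    consumer.commit()                     # store task_done offsets for the group
```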
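The new methods added in this commit (`subscribe`/`assign`/`poll`, plus the camelCase `commitSync`/`commitAsync` and friends) are all `pass` stubs that mirror the Java 0.9 consumer interface. The sketch below only illustrates the intended call shapes based on those stub signatures; nothing here is functional at this commit, and the broker/topic names and `process()` helper are hypothetical.

```python
# Call-shape sketch for the new, currently stubbed API. Not functional yet:
# every method shown is a `pass` stub in this commit.
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                         group_id='my-group')

consumer.subscribe(['my-topic'])        # request dynamically assigned partitions
records = consumer.poll(timeout=1000)   # intended to return [consumer_records]
for record in records or []:
    process(record)                     # hypothetical user-supplied handler
consumer.commitSync()                   # commit offsets returned by the last poll()
consumer.close()
```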