-rw-r--r--  kafka/consumer/group.py  1277
1 file changed, 480 insertions(+), 797 deletions(-)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index dba5f60..abd9473 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -1,883 +1,566 @@
-#pylint: skip-file
from __future__ import absolute_import
-from collections import namedtuple
-from copy import deepcopy
import logging
-import random
-import sys
import time
-import six
-
-from kafka.cluster import Cluster
-from kafka.common import (
- OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest,
- check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError,
- OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
- FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError
-)
-
-logger = logging.getLogger(__name__)
-
-OffsetsStruct = namedtuple("OffsetsStruct", ["fetch", "highwater", "commit", "task_done"])
-
-NEW_CONSUMER_CONFIGS = {
- 'bootstrap_servers': None,
- 'client_id': None,
- 'group_id': None,
- 'key_deserializer': None,
- 'value_deserializer': None,
- 'auto_commit_interval_ms': 5000,
- 'auto_offset_reset': 'latest',
- 'check_crcs': True, # "Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance.";
- 'connections_max_idle_ms': 9 * 60 * 1000,
- 'enable_auto_commit': True,
- 'fetch_max_wait_ms': 500,
- 'fetch_min_bytes': 1024,
- 'heartbeat_interval_ms': 3000,
- 'max_partition_fetch_bytes': 1 * 1024 * 1024,
- 'metadata_max_age_ms': 5 * 60 * 1000, # >0
- 'metric_reporters': None,
- 'metrics_num_samples': 2,
- 'metrics_sample_window_ms': 30000,
- 'partition_assignment_strategy': None, # This should default to something like 'roundrobin' or 'range'
- 'reconnect_backoff_ms': 50,
- 'request_timeout_ms': 40 * 1000,
- 'retry_backoff_ms': 100,
- 'send_buffer_bytes': 128 * 1024,
- 'receive_buffer_bytes': 32 * 1024,
- 'session_timeout_ms': 30000, # "The timeout used to detect failures when using Kafka's group management facilities.";
-}
-
-DEFAULT_CONSUMER_CONFIG = {
- 'client_id': __name__,
- 'group_id': None,
- 'bootstrap_servers': [],
- 'socket_timeout_ms': 30 * 1000,
- 'fetch_message_max_bytes': 1024 * 1024,
- 'auto_offset_reset': 'largest',
- 'fetch_min_bytes': 1,
- 'fetch_wait_max_ms': 100,
- 'refresh_leader_backoff_ms': 200,
- 'deserializer_class': lambda msg: msg,
- 'auto_commit_enable': False,
- 'auto_commit_interval_ms': 60 * 1000,
- 'auto_commit_interval_messages': None,
- 'consumer_timeout_ms': -1,
-
- # Currently unused
- 'socket_receive_buffer_bytes': 64 * 1024,
- 'num_consumer_fetchers': 1,
- 'default_fetcher_backoff_ms': 1000,
- 'queued_max_message_chunks': 10,
- 'rebalance_max_retries': 4,
- 'rebalance_backoff_ms': 2000,
-}
-
-DEPRECATED_CONFIG_KEYS = {
- 'metadata_broker_list': 'bootstrap_servers',
-}
+import kafka.common as Errors
-class KafkaConsumer(object):
- """A simpler kafka consumer"""
-
- def __init__(self, *topics, **configs):
- self._config = deepcopy(DEFAULT_CONSUMER_CONFIG)
- self._topics = topics
- self._partitions = []
- self._offsets = OffsetsStruct(fetch=dict(), commit=dict(), highwater=dict(), task_done=dict())
- self._consumer_timeout = False
- self._uncommitted_message_count = 0
- self._next_commit_time = None
- self._msg_iter = None
-
- self._configure(**configs)
- self._cluster = Cluster(**self._config)
-
- def assign(self, topic_partitions):
- pass
+from kafka.client_async import KafkaClient
+from kafka.consumer.fetcher import Fetcher
+from kafka.consumer.subscription_state import SubscriptionState
+from kafka.coordinator.consumer import ConsumerCoordinator
+from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.protocol.offset import OffsetResetStrategy
+from kafka.version import __version__
- def assignment(self):
- """Get the set of partitions currently assigned to this consumer."""
- pass
+log = logging.getLogger(__name__)
- def close(self):
- """Close the consumer, waiting indefinitely for any needed cleanup."""
- pass
- def commitAsync(self, topic_partition_offsets_and_metadata=None, callback=None):
- """
- Commit offsets the specified offsets, or those returned on the last poll(),
- for all the subscribed list of topics and partition. Asynchronous.
- """
- pass
+class KafkaConsumer(object):
+ """Consumer for Kafka 0.9"""
+ _bootstrap_servers = 'localhost'
+ _client_id = 'kafka-python-' + __version__
+ _group_id = 'kafka-python-default-group'
+ _key_deserializer = None
+ _value_deserializer = None
+ _fetch_max_wait_ms = 500
+ _fetch_min_bytes = 1024
+ _max_partition_fetch_bytes = 1 * 1024 * 1024
+ _request_timeout_ms = 40 * 1000
+ _retry_backoff_ms = 100
+ _reconnect_backoff_ms = 50
+ _auto_offset_reset = 'latest'
+ _enable_auto_commit = True
+ _auto_commit_interval_ms = 5000
+ _check_crcs = True
+ _metadata_max_age_ms = 5 * 60 * 1000
+ _partition_assignment_strategy = (RoundRobinPartitionAssignor,)
+ _heartbeat_interval_ms = 3000
+ _session_timeout_ms = 30000
+ _send_buffer_bytes = 128 * 1024
+ _receive_buffer_bytes = 32 * 1024
+ _connections_max_idle_ms = 9 * 60 * 1000 # not implemented yet
+ #_metric_reporters = None
+ #_metrics_num_samples = 2
+ #_metrics_sample_window_ms = 30000
+
+ def __init__(self, *topics, **kwargs):
+ """A Kafka client that consumes records from a Kafka cluster.
+
+ The consumer will transparently handle the failure of servers in the
+ Kafka cluster, and transparently adapt as partitions of data it fetches
+ migrate within the cluster. This client also interacts with the server
+ to allow groups of consumers to load balance consumption using consumer
+ groups.
+
+ Requires Kafka Server >= 0.9.0.0
+
+ Configuration settings can be passed to the constructor as kwargs;
+ otherwise defaults will be used:
- def commitSync(self, topic_partition_offsets_and_metadata=None):
- """
- Commit offsets the specified offsets, or those returned on the last poll(),
- for all the subscribed list of topics and partition. Synchronous.
- Blocks until either the commit succeeds or an unrecoverable error is
- encountered (in which case it is thrown to the caller).
- """
- pass
+ Keyword Arguments:
+ bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
+ strings) that the consumer should contact to bootstrap initial
+ cluster metadata. This does not have to be the full node list.
+ It just needs to have at least one broker that will respond to a
+ Metadata API Request. Default port is 9092. If no servers are
+ specified, will default to localhost:9092.
+ client_id (str): a name for this client. This string is passed in
+ each request to servers and can be used to identify specific
+ server-side log entries that correspond to this client. Also
+ submitted to GroupCoordinator for logging with respect to
+ consumer group administration. Default: 'kafka-python-{version}'
+ group_id (str): name of the consumer group to join for dynamic
+ partition assignment (if enabled), and to use for fetching and
+ committing offsets. Default: 'kafka-python-default-group'
+ key_deserializer (callable): Any callable that takes a
+ raw message key and returns a deserialized key.
+ value_deserializer (callable, optional): Any callable that takes a
+ raw message value and returns a deserialized value.
+ fetch_min_bytes (int): Minimum amount of data the server should
+ return for a fetch request, otherwise wait up to
+ fetch_max_wait_ms for more data to accumulate. Default: 1024.
+ fetch_max_wait_ms (int): The maximum amount of time in milliseconds
+ the server will block before answering the fetch request if
+ there isn't sufficient data to immediately satisfy the
+ requirement given by fetch_min_bytes. Default: 500.
+ max_partition_fetch_bytes (int): The maximum amount of data
+ per-partition the server will return. The maximum total memory
+ used for a request = #partitions * max_partition_fetch_bytes.
+ This size must be at least as large as the maximum message size
+ the server allows or else it is possible for the producer to
+ send messages larger than the consumer can fetch. If that
+ happens, the consumer can get stuck trying to fetch a large
+ message on a certain partition. Default: 1048576.
+ request_timeout_ms (int): Client request timeout in milliseconds.
+ Default: 40000.
+ retry_backoff_ms (int): Milliseconds to backoff when retrying on
+ errors. Default: 100.
+ reconnect_backoff_ms (int): The amount of time in milliseconds to
+ wait before attempting to reconnect to a given host. Defaults
+ to 50.
+ auto_offset_reset (str): A policy for resetting offsets on
+ OffsetOutOfRange errors: 'earliest' will move to the oldest
+ available message, 'latest' will move to the most recent. Any
+ other value will raise an exception. Default: 'latest'.
+ enable_auto_commit (bool): If true the consumer's offset will be
+ periodically committed in the background. Default: True.
+ auto_commit_interval_ms (int): milliseconds between automatic
+ offset commits, if enable_auto_commit is True. Default: 5000.
+ check_crcs (bool): Automatically check the CRC32 of the records
+ consumed. This ensures no on-the-wire or on-disk corruption to
+ the messages occurred. This check adds some overhead, so it may
+ be disabled in cases seeking extreme performance. Default: True
+ metadata_max_age_ms (int): The period of time in milliseconds after
+ which we force a refresh of metadata even if we haven't seen any
+ partition leadership changes to proactively discover any new
+ brokers or partitions. Default: 300000
+ partition_assignment_strategy (list): List of objects to use to
+ distribute partition ownership amongst consumer instances when
+ group management is used. Default: [RoundRobinPartitionAssignor]
+ heartbeat_interval_ms (int): The expected time in milliseconds
+ between heartbeats to the consumer coordinator when using
+ Kafka's group management feature. Heartbeats are used to ensure
+ that the consumer's session stays active and to facilitate
+ rebalancing when new consumers join or leave the group. The
+ value must be set lower than session_timeout_ms, but typically
+ should be set no higher than 1/3 of that value. It can be
+ adjusted even lower to control the expected time for normal
+ rebalances. Default: 3000
+ session_timeout_ms (int): The timeout used to detect failures when
+ using Kafka's group management facilities. Default: 30000
+ send_buffer_bytes (int): The size of the TCP send buffer
+ (SO_SNDBUF) to use when sending data. Default: 131072
+ receive_buffer_bytes (int): The size of the TCP receive buffer
+ (SO_RCVBUF) to use when reading data. Default: 32768
- def committed(self, topic_partition):
- """
- Get the last committed offset for the given partition (whether the
- commit happened by this process or another).
- Returns: offset_and_metadata
+ Configuration parameters are described in more detail at
+ https://kafka.apache.org/090/configuration.html#newconsumerconfigs
"""
- pass
+ for config in ('bootstrap_servers', 'client_id', 'group_id',
+ 'key_deserializer', 'value_deserializer',
+ 'fetch_max_wait_ms', 'fetch_min_bytes',
+ 'max_partition_fetch_bytes', 'request_timeout_ms',
+ 'retry_backoff_ms', 'reconnect_backoff_ms',
+ 'auto_offset_reset', 'enable_auto_commit',
+ 'auto_commit_interval_ms', 'check_crcs',
+ 'metadata_max_age_ms', 'partition_assignment_strategy',
+ 'heartbeat_interval_ms', 'session_timeout_ms',
+ 'send_buffer_bytes', 'receive_buffer_bytes'):
+ if config in kwargs:
+ setattr(self, '_' + config, kwargs[config])
+
+ self._client = KafkaClient(**kwargs)
+ self._subscription = SubscriptionState(self._auto_offset_reset)
+ self._fetcher = Fetcher(
+ self._client, self._subscription, **kwargs)
+ self._coordinator = ConsumerCoordinator(
+ self._client, self._group_id, self._subscription,
+ assignors=self._partition_assignment_strategy,
+ **kwargs)
+ self._closed = False
+
+ #self.metrics = None
+ if topics:
+ self._subscription.subscribe(topics=topics)
+ self._client.set_topics(topics)
+
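For orientation, here is a minimal usage sketch of the constructor and poll() API added in this patch. The broker address, topic, and group names are placeholders, and the package-level import assumes KafkaConsumer is re-exported from the kafka package as in prior releases:

    from kafka import KafkaConsumer  # assumes the usual package-level re-export

    consumer = KafkaConsumer('my-topic',                        # placeholder topic name
                             bootstrap_servers='localhost:9092',
                             group_id='my-group',
                             auto_offset_reset='earliest')
    records = consumer.poll(timeout_ms=1000)   # mapping of partition/topic -> fetched records
    for key, messages in (records or {}).items():
        for message in messages:
            print(message)
    consumer.close()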
+ def assign(self, partitions):
+ """Manually assign a list of TopicPartitions to this consumer.
+
+ This interface does not allow for incremental assignment and will
+ replace the previous assignment (if there was one).
+
+ Manual topic assignment through this method does not use the consumer's
+ group management functionality. As such, there will be no rebalance
+ operation triggered when group membership or cluster and topic metadata
+ change. Note that it is not possible to use both manual partition
+ assignment with assign() and group assignment with subscribe().
- def listTopics(self):
- """
- Get metadata about partitions for all topics that the user is authorized
- to view.
- Returns: {topic: [partition_info]}
- """
- pass
+ Arguments:
+ partitions (list of TopicPartition): assignment for this instance.
- def metrics(self):
- """
- Get the metrics kept by the consumer.
- Returns: {metric_name: metric}
+ Raises:
+ IllegalStateError: if consumer has already called subscribe()
"""
- pass
+ self._subscription.assign_from_user(partitions)
+ self._client.set_topics([tp.topic for tp in partitions])
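A sketch of manual assignment; TopicPartition is assumed to be the namedtuple exposed by kafka.common:

    from kafka import KafkaConsumer
    from kafka.common import TopicPartition   # assumed import location

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    consumer.assign([TopicPartition('my-topic', 0),
                     TopicPartition('my-topic', 1)])
    print(consumer.assignment())   # set containing the two TopicPartitions assigned above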
- def partitionsFor(self, topic):
- """
- Get metadata about the partitions for a given topic.
- Returns: [partition_info]
- """
- pass
+ def assignment(self):
+ """Get the TopicPartitions currently assigned to this consumer.
- def pause(self, *topic_partitions):
- """Suspend fetching from the requested partitions."""
- pass
+ If partitions were directly assigned using assign(), then this will
+ simply return the same partitions that were assigned.
+ If topics were subscribed to using subscribe(), then this will give the
+ set of topic partitions currently assigned to the consumer (which may
+ be none if the assignment hasn't happened yet, or the partitions are in
+ the process of getting reassigned).
- def poll(self, timeout):
- """
- Fetch data for the topics or partitions specified using one of the
- subscribe/assign APIs.
- Returns: [consumer_records]
+ Returns:
+ set: {TopicPartition, ...}
"""
- pass
+ return self._subscription.assigned_partitions()
- def position(self, topic_partition):
- """Get the offset of the next record that will be fetched (if a record
- with that offset exists)."""
- pass
-
- def resume(self, *topic_partitions):
- """Resume specified partitions which have been paused"""
- pass
-
- def seek(self, topic_partition, offset):
- """Overrides the fetch offsets that the consumer will use on the next
- poll(timeout)."""
- pass
-
- def seekToBeginning(self, *topic_partitions):
- """Seek to the first offset for each of the given partitions."""
- pass
-
- def seekToEnd(self, *topic_partitions):
- """Seek to the last offset for each of the given partitions."""
- pass
+ def close(self):
+ """Close the consumer, waiting indefinitely for any needed cleanup."""
+ if self._closed:
+ return
+ log.debug("Closing the KafkaConsumer.")
+ self._closed = True
+ self._coordinator.close()
+ #self.metrics.close()
+ self._client.close()
+ try:
+ self._key_deserializer.close()
+ except AttributeError:
+ pass
+ try:
+ self._value_deserializer.close()
+ except AttributeError:
+ pass
+ log.debug("The KafkaConsumer has closed.")
+
+ def commit_async(self, offsets=None, callback=None):
+ """Commit offsets to kafka asynchronously, optionally firing callback
+
+ This commits offsets only to Kafka. The offsets committed using this API
+ will be used on the first fetch after every rebalance and also on
+ startup. As such, if you need to store offsets in anything other than
+ Kafka, this API should not be used.
+
+ This is an asynchronous call and will not block. Any errors encountered
+ are either passed to the callback (if provided) or discarded.
- def subscribe(self, topics, callback=None):
- """Subscribe to the given list of topics or those matching a regex to get dynamically assigned
- partitions."""
- pass
+ Arguments:
+ offsets (dict, optional): {TopicPartition: OffsetAndMetadata} dict
+ to commit with the configured group_id. Defaults to current
+ consumed offsets for all subscribed partitions.
+ callback (callable, optional): called as callback(offsets, response)
+ with response as either an Exception or an OffsetCommitResponse
+ struct. This callback can be used to trigger custom actions when
+ a commit request completes.
- def subscription(self):
- """
- Get the current subscription.
- Returns: [topic]
+ Returns:
+ kafka.future.Future
"""
- pass
-
- def unsubscribe(self):
- """Unsubscribe from topics currently subscribed with subscribe(List)."""
- pass
+ if offsets is None:
+ offsets = self._subscription.all_consumed_offsets()
+ log.debug("Committing offsets: %s", offsets)
+ future = self._coordinator.commit_offsets_async(
+ offsets, callback=callback)
+ return future
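A sketch of the callback contract described in the docstring; the handler name is illustrative, and consumer is the instance from the earlier sketch:

    def on_commit(offsets, response):
        # response is either an Exception or an OffsetCommitResponse struct
        if isinstance(response, Exception):
            print('offset commit failed:', response)
        else:
            print('committed:', offsets)

    consumer.commit_async(callback=on_commit)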
- def wakeup(self):
- """Wakeup the consumer."""
- pass
+ def commit(self, offsets=None):
+ """Commit offsets to kafka, blocking until success or error
- def _configure(self, **configs):
- """Configure the consumer instance
+ This commits offsets only to Kafka. The offsets committed using this API
+ will be used on the first fetch after every rebalance and also on
+ startup. As such, if you need to store offsets in anything other than
+ Kafka, this API should not be used.
- Configuration settings can be passed to constructor,
- otherwise defaults will be used:
-
- Keyword Arguments:
- bootstrap_servers (list): List of initial broker nodes the consumer
- should contact to bootstrap initial cluster metadata. This does
- not have to be the full node list. It just needs to have at
- least one broker that will respond to a Metadata API Request.
- client_id (str): a unique name for this client. Defaults to
- 'kafka.consumer.kafka'.
- group_id (str): the name of the consumer group to join,
- Offsets are fetched / committed to this group name.
- fetch_message_max_bytes (int, optional): Maximum bytes for each
- topic/partition fetch request. Defaults to 1024*1024.
- fetch_min_bytes (int, optional): Minimum amount of data the server
- should return for a fetch request, otherwise wait up to
- fetch_wait_max_ms for more data to accumulate. Defaults to 1.
- fetch_wait_max_ms (int, optional): Maximum time for the server to
- block waiting for fetch_min_bytes messages to accumulate.
- Defaults to 100.
- refresh_leader_backoff_ms (int, optional): Milliseconds to backoff
- when refreshing metadata on errors (subject to random jitter).
- Defaults to 200.
- socket_timeout_ms (int, optional): TCP socket timeout in
- milliseconds. Defaults to 30*1000.
- auto_offset_reset (str, optional): A policy for resetting offsets on
- OffsetOutOfRange errors. 'smallest' will move to the oldest
- available message, 'largest' will move to the most recent. Any
- ofther value will raise the exception. Defaults to 'largest'.
- deserializer_class (callable, optional): Any callable that takes a
- raw message value and returns a deserialized value. Defaults to
- lambda msg: msg.
- auto_commit_enable (bool, optional): Enabling auto-commit will cause
- the KafkaConsumer to periodically commit offsets without an
- explicit call to commit(). Defaults to False.
- auto_commit_interval_ms (int, optional): If auto_commit_enabled,
- the milliseconds between automatic offset commits. Defaults to
- 60 * 1000.
- auto_commit_interval_messages (int, optional): If
- auto_commit_enabled, a number of messages consumed between
- automatic offset commits. Defaults to None (disabled).
- consumer_timeout_ms (int, optional): number of millisecond to throw
- a timeout exception to the consumer if no message is available
- for consumption. Defaults to -1 (dont throw exception).
-
- Configuration parameters are described in more detail at
- http://kafka.apache.org/documentation.html#highlevelconsumerapi
- """
- configs = self._deprecate_configs(**configs)
- self._config.update(configs)
+ Blocks until either the commit succeeds or an unrecoverable error is
+ encountered (in which case it is thrown to the caller).
- if self._config['auto_commit_enable']:
- logger.info('Configuring consumer to auto-commit offsets')
- self._reset_auto_commit()
+ Currently only supports kafka-topic offset storage (not zookeeper)
- def set_topic_partitions(self, *topics):
+ Arguments:
+ offsets (dict, optional): {TopicPartition: OffsetAndMetadata} dict
+ to commit with the configured group_id. Defaults to current
+ consumed offsets for all subscribed partitions.
"""
- Set the topic/partitions to consume
- Optionally specify offsets to start from
-
- Accepts types:
+ if offsets is None:
+ offsets = self._subscription.all_consumed_offsets()
+ self._coordinator.commit_offsets_sync(offsets)
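A blocking-commit sketch, continuing the poll loop from the constructor example; per the docstring, an explicit {TopicPartition: OffsetAndMetadata} dict may also be passed:

    records = consumer.poll(timeout_ms=1000)
    # ... process records ...
    consumer.commit()   # blocks until the consumed offsets are committed (or raises)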
- * str (utf-8): topic name (will consume all available partitions)
- * tuple: (topic, partition)
- * dict:
- - { topic: partition }
- - { topic: [partition list] }
- - { topic: (partition tuple,) }
+ def committed(self, partition):
+ """Get the last committed offset for the given partition
- Optionally, offsets can be specified directly:
+ This offset will be used as the position for the consumer
+ in the event of a failure.
- * tuple: (topic, partition, offset)
- * dict: { (topic, partition): offset, ... }
+ This call may block to do a remote call if the partition in question
+ isn't assigned to this consumer or if the consumer hasn't yet
+ initialized its cache of committed offsets.
- Example:
-
- .. code:: python
-
- kafka = KafkaConsumer()
-
- # Consume topic1-all; topic2-partition2; topic3-partition0
- kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})
-
- # Consume topic1-0 starting at offset 12, and topic2-1 at offset 45
- # using tuples --
- kafka.set_topic_partitions(("topic1", 0, 12), ("topic2", 1, 45))
-
- # using dict --
- kafka.set_topic_partitions({ ("topic1", 0): 12, ("topic2", 1): 45 })
+ Arguments:
+ partition (TopicPartition): the partition to check
+ Returns:
+ The last committed offset, or None if there was no prior commit.
"""
- self._cluster.refresh_metadata()
-
- # Handle different topic types
- for arg in topics:
-
- # Topic name str -- all partitions
- if isinstance(arg, (six.string_types, six.binary_type)):
- topic = arg
- for partition in self._cluster.partitions_for_topic(topic):
- self._consume_topic_partition(topic, partition)
-
- # (topic, partition [, offset]) tuple
- elif isinstance(arg, tuple):
- topic = arg[0]
- partition = arg[1]
- self._consume_topic_partition(topic, partition)
- if len(arg) == 3:
- offset = arg[2]
- self._offsets.fetch[(topic, partition)] = offset
-
- # { topic: partitions, ... } dict
- elif isinstance(arg, dict):
- for key, value in six.iteritems(arg):
-
- # key can be string (a topic)
- if isinstance(key, (six.string_types, six.binary_type)):
- topic = key
-
- # topic: partition
- if isinstance(value, int):
- self._consume_topic_partition(topic, value)
-
- # topic: [ partition1, partition2, ... ]
- elif isinstance(value, (list, tuple)):
- for partition in value:
- self._consume_topic_partition(topic, partition)
- else:
- raise KafkaConfigurationError(
- 'Unknown topic type '
- '(dict key must be int or list/tuple of ints)'
- )
-
- # (topic, partition): offset
- elif isinstance(key, tuple):
- topic = key[0]
- partition = key[1]
- self._consume_topic_partition(topic, partition)
- self._offsets.fetch[(topic, partition)] = value
-
+ if self._subscription.is_assigned(partition):
+ committed = self._subscription.assignment[partition].committed
+ if committed is None:
+ self._coordinator.refresh_committed_offsets_if_needed()
+ committed = self._subscription.assignment[partition].committed
+ else:
+ commit_map = self._coordinator.fetch_committed_offsets([partition])
+ if partition in commit_map:
+ committed = commit_map[partition].offset
else:
- raise KafkaConfigurationError('Unknown topic type (%s)' % type(arg))
-
- # If we have a consumer group, try to fetch stored offsets
- if self._config['group_id']:
- self._get_commit_offsets()
-
- # Update missing fetch/commit offsets
- for topic_partition in self._topics:
-
- # Commit offsets default is None
- if topic_partition not in self._offsets.commit:
- self._offsets.commit[topic_partition] = None
-
- # Skip if we already have a fetch offset from user args
- if topic_partition not in self._offsets.fetch:
-
- # Fetch offsets default is (1) commit
- if self._offsets.commit[topic_partition] is not None:
- self._offsets.fetch[topic_partition] = self._offsets.commit[topic_partition]
-
- # or (2) auto reset
- else:
- self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition)
+ committed = None
+ return committed
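For example, reusing the TopicPartition import from the assign() sketch above:

    tp = TopicPartition('my-topic', 0)
    print(consumer.committed(tp))   # last committed offset, or None if nothing was committed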
- # highwater marks (received from server on fetch response)
- # and task_done (set locally by user)
- # should always get initialized to None
- self._reset_highwater_offsets()
- self._reset_task_done_offsets()
+ def _ensure_not_closed(self):
+ if self._closed:
+ raise Errors.IllegalStateError("This consumer has already been closed.")
- # Reset message iterator in case we were in the middle of one
- self._reset_message_iterator()
+ def topics(self):
+ """Get all topic metadata topics the user is authorized to view.
- def next(self):
- """Return the next available message
-
- Blocks indefinitely unless consumer_timeout_ms > 0
+ [Not Implemented Yet]
Returns:
- a single KafkaMessage from the message iterator
+ {topic: [partition_info]}
+ """
+ raise NotImplementedError('TODO')
- Raises:
- ConsumerTimeout after consumer_timeout_ms and no message
+ def partitions_for_topic(self, topic):
+ """Get metadata about the partitions for a given topic.
- Note:
- This is also the method called internally during iteration
+ Arguments:
+ topic (str): topic to check
+ Returns:
+ set: partition ids
"""
- self._set_consumer_timeout_start()
- while True:
-
- try:
- return six.next(self._get_message_iterator())
+ return self._client.cluster.partitions_for_topic(topic)
- # Handle batch completion
- except StopIteration:
- self._reset_message_iterator()
+ def poll(self, timeout_ms=0):
+ """
+ Fetch data for the topics or partitions specified using one of the
+ subscribe/assign APIs. It is an error to not have subscribed to any
+ topics or partitions before polling for data.
- self._check_consumer_timeout()
+ On each poll, the consumer will try to use the last consumed offset as the
+ starting offset and fetch sequentially. The last consumed offset can be
+ manually set through seek(partition, offset) or automatically set as
+ the last committed offset for the subscribed list of partitions.
- def fetch_messages(self):
- """Sends FetchRequests for all topic/partitions set for consumption
+ Arguments:
+ timeout_ms (int, optional): milliseconds to spend waiting in poll if
+ data is not available. If 0, returns immediately with any
+ records that are available now. Must not be negative. Default: 0
Returns:
- Generator that yields KafkaMessage structs
- after deserializing with the configured `deserializer_class`
-
- Note:
- Refreshes metadata on errors, and resets fetch offset on
- OffsetOutOfRange, per the configured `auto_offset_reset` policy
-
- See Also:
- Key KafkaConsumer configuration parameters:
- * `fetch_message_max_bytes`
- * `fetch_max_wait_ms`
- * `fetch_min_bytes`
- * `deserializer_class`
- * `auto_offset_reset`
-
+ dict: topic to deque of records since the last fetch for the
+ subscribed list of topics and partitions
"""
+ if timeout_ms < 0:
+ raise Errors.IllegalArgumentError("Timeout must not be negative")
- max_bytes = self._config['fetch_message_max_bytes']
- max_wait_time = self._config['fetch_wait_max_ms']
- min_bytes = self._config['fetch_min_bytes']
-
- if not self._topics:
- raise KafkaConfigurationError('No topics or partitions configured')
-
- if not self._offsets.fetch:
- raise KafkaConfigurationError(
- 'No fetch offsets found when calling fetch_messages'
- )
-
- fetches = [FetchRequest(topic, partition,
- self._offsets.fetch[(topic, partition)],
- max_bytes)
- for (topic, partition) in self._topics]
-
- # send_fetch_request will batch topic/partition requests by leader
- responses = self._client.send_fetch_request(
- fetches,
- max_wait_time=max_wait_time,
- min_bytes=min_bytes,
- fail_on_error=False
- )
-
- for resp in responses:
-
- if isinstance(resp, FailedPayloadsError):
- logger.warning('FailedPayloadsError attempting to fetch data')
- self._refresh_metadata_on_error()
- continue
-
- topic = resp.topic
- partition = resp.partition
- try:
- check_error(resp)
- except OffsetOutOfRangeError:
- logger.warning('OffsetOutOfRange: topic %s, partition %d, '
- 'offset %d (Highwatermark: %d)',
- topic, partition,
- self._offsets.fetch[(topic, partition)],
- resp.highwaterMark)
- # Reset offset
- self._offsets.fetch[(topic, partition)] = (
- self._reset_partition_offset((topic, partition))
- )
- continue
-
- except NotLeaderForPartitionError:
- logger.warning("NotLeaderForPartitionError for %s - %d. "
- "Metadata may be out of date",
- topic, partition)
- self._refresh_metadata_on_error()
- continue
-
- except RequestTimedOutError:
- logger.warning("RequestTimedOutError for %s - %d",
- topic, partition)
- continue
-
- # Track server highwater mark
- self._offsets.highwater[(topic, partition)] = resp.highwaterMark
-
- # Yield each message
- # Kafka-python could raise an exception during iteration
- # we are not catching -- user will need to address
- for (offset, message) in resp.messages:
- # deserializer_class could raise an exception here
- val = self._config['deserializer_class'](message.value)
- msg = KafkaMessage(topic, partition, offset, message.key, val)
-
- # in some cases the server will return earlier messages
- # than we requested. skip them per kafka spec
- if offset < self._offsets.fetch[(topic, partition)]:
- logger.debug('message offset less than fetched offset '
- 'skipping: %s', msg)
- continue
- # Only increment fetch offset
- # if we safely got the message and deserialized
- self._offsets.fetch[(topic, partition)] = offset + 1
-
- # Then yield to user
- yield msg
-
- def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets):
- """Request available fetch offsets for a single topic/partition
+ # poll for new data until the timeout expires
+ start = time.time()
+ remaining = timeout_ms
+ while True:
+ records = self._poll_once(remaining)
+ if records:
+ # before returning the fetched records, we can send off the
+ # next round of fetches and avoid blocking while waiting for
+ # their responses, enabling pipelining while the user handles
+ # the fetched records.
+ self._fetcher.init_fetches()
+ return records
+
+ elapsed_ms = (time.time() - start) * 1000
+ remaining = timeout_ms - elapsed_ms
+
+ if remaining <= 0:
+ return {}
+
+ def _poll_once(self, timeout_ms):
+ """
+ Do one round of polling. In addition to checking for new data, this does
+ any needed heart-beating, auto-commits, and offset updates.
- Keyword Arguments:
- topic (str): topic for offset request
- partition (int): partition for offset request
- request_time_ms (int): Used to ask for all messages before a
- certain time (ms). There are two special values.
- Specify -1 to receive the latest offset (i.e. the offset of the
- next coming message) and -2 to receive the earliest available
- offset. Note that because offsets are pulled in descending
- order, asking for the earliest offset will always return you a
- single element.
- max_num_offsets (int): Maximum offsets to include in the OffsetResponse
+ Arguments:
+ timeout_ms (int): The maximum time in milliseconds to block
Returns:
- a list of offsets in the OffsetResponse submitted for the provided
- topic / partition. See:
- https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
+ dict: map of topic to deque of records (may be empty)
"""
- reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)]
+ # TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
+ self._coordinator.ensure_coordinator_known()
- (resp,) = self._client.send_offset_request(reqs)
+ # ensure we have partitions assigned if we expect to
+ if self._subscription.partitions_auto_assigned():
+ self._coordinator.ensure_active_group()
- check_error(resp)
+ # fetch positions if we have partitions we're subscribed to that we
+ # don't know the offset for
+ if not self._subscription.has_all_fetch_positions():
+ self._update_fetch_positions(self._subscription.missing_fetch_positions())
- # Just for sanity..
- # probably unnecessary
- assert resp.topic == topic
- assert resp.partition == partition
+ # init any new fetches (won't resend pending fetches)
+ records = self._fetcher.fetched_records()
- return resp.offsets
+ # if data is available already, e.g. from a previous network client
+ # poll() call to commit, then just return it immediately
+ if records:
+ return records
- def offsets(self, group=None):
- """Get internal consumer offset values
+ self._fetcher.init_fetches()
+ self._client.poll(timeout_ms / 1000.0)
+ return self._fetcher.fetched_records()
- Keyword Arguments:
- group: Either "fetch", "commit", "task_done", or "highwater".
- If no group specified, returns all groups.
+ def position(self, partition):
+ """Get the offset of the next record that will be fetched
- Returns:
- A copy of internal offsets struct
+ Arguments:
+ partition (TopicPartition): partition to check
"""
- if not group:
- return {
- 'fetch': self.offsets('fetch'),
- 'commit': self.offsets('commit'),
- 'task_done': self.offsets('task_done'),
- 'highwater': self.offsets('highwater')
- }
- else:
- return dict(deepcopy(getattr(self._offsets, group)))
+ if not self._subscription.is_assigned(partition):
+ raise Errors.IllegalStateError("You can only check the position for partitions assigned to this consumer.")
+ offset = self._subscription.assignment[partition].consumed
+ if offset is None:
+ self._update_fetch_positions([partition])
+ offset = self._subscription.assignment[partition].consumed
+ return offset
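Usage sketch (the partition must be assigned to this consumer, e.g. via assign() above):

    tp = TopicPartition('my-topic', 0)
    print(consumer.position(tp))   # offset of the next record that will be fetched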
- def task_done(self, message):
- """Mark a fetched message as consumed.
+ def pause(self, *partitions):
+ """Suspend fetching from the requested partitions.
- Offsets for messages marked as "task_done" will be stored back
- to the kafka cluster for this consumer group on commit()
+ Future calls to poll() will not return any records from these partitions
+ until they have been resumed using resume(). Note that this method does
+ not affect partition subscription. In particular, it does not cause a
+ group rebalance when automatic assignment is used.
Arguments:
- message (KafkaMessage): the message to mark as complete
-
- Returns:
- True, unless the topic-partition for this message has not
- been configured for the consumer. In normal operation, this
- should not happen. But see github issue 364.
+ *partitions (TopicPartition): partitions to pause
"""
- topic_partition = (message.topic, message.partition)
- if topic_partition not in self._topics:
- logger.warning('Unrecognized topic/partition in task_done message: '
- '{0}:{1}'.format(*topic_partition))
- return False
+ for partition in partitions:
+ log.debug("Pausing partition %s", partition)
+ self._subscription.pause(partition)
- offset = message.offset
+ def resume(self, *partitions):
+ """Resume fetching from the specified (paused) partitions.
- # Warn on non-contiguous offsets
- prev_done = self._offsets.task_done[topic_partition]
- if prev_done is not None and offset != (prev_done + 1):
- logger.warning('Marking task_done on a non-continuous offset: %d != %d + 1',
- offset, prev_done)
+ Arguments:
+ *partitions (TopicPartition): partitions to resume
+ """
+ for partition in partitions:
+ log.debug("Resuming partition %s", partition)
+ self._subscription.resume(partition)
+
+ def seek(self, partition, offset):
+ """Manually specify the fetch offset for a TopicPartition
+
+ Overrides the fetch offsets that the consumer will use on the next
+ poll(). If this API is invoked for the same partition more than once,
+ the latest offset will be used on the next poll(). Note that you may
+ lose data if this API is arbitrarily used in the middle of consumption,
+ to reset the fetch offsets.
+ """
+ if offset < 0:
+ raise Errors.IllegalStateError("seek offset must not be a negative number")
+ log.debug("Seeking to offset %s for partition %s", offset, partition)
+ self._subscription.assignment[partition].seek(offset)
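For instance, to replay an assigned partition from a known offset (the offset value is illustrative):

    tp = TopicPartition('my-topic', 0)
    consumer.seek(tp, 100)                    # next poll() for this partition starts at offset 100
    records = consumer.poll(timeout_ms=1000)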
- # Warn on smaller offsets than previous commit
- # "commit" offsets are actually the offset of the next message to fetch.
- prev_commit = self._offsets.commit[topic_partition]
- if prev_commit is not None and ((offset + 1) <= prev_commit):
- logger.warning('Marking task_done on a previously committed offset?: %d (+1) <= %d',
- offset, prev_commit)
+ def seek_to_beginning(self, *partitions):
+ """Seek to the oldest available offset for partitions.
- self._offsets.task_done[topic_partition] = offset
+ Arguments:
+ *partitions: optionally provide specific TopicPartitions, otherwise
+ default to all assigned partitions
+ """
+ if not partitions:
+ partitions = self._subscription.assigned_partitions()
+ for tp in partitions:
+ log.debug("Seeking to beginning of partition %s", tp)
+ self._subscription.need_offset_reset(tp, OffsetResetStrategy.EARLIEST)
- # Check for auto-commit
- if self._does_auto_commit_messages():
- self._incr_auto_commit_message_count()
+ def seek_to_end(self, *partitions):
+ """Seek to the most recent available offset for partitions.
- if self._should_auto_commit():
- self.commit()
+ Arguments:
+ *partitions: optionally provide specific TopicPartitions, otherwise
+ default to all assigned partitions
+ """
+ if not partitions:
+ partitions = self._subscription.assigned_partitions()
+ for tp in partitions:
+ log.debug("Seeking to end of partition %s", tp)
+ self._subscription.need_offset_reset(tp, OffsetResetStrategy.LATEST)
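The reset helpers can target all assigned partitions or an explicit subset:

    consumer.seek_to_beginning()                          # all currently assigned partitions
    consumer.seek_to_end(TopicPartition('my-topic', 0))   # just the tail of one partition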
- return True
+ def subscribe(self, topics=(), pattern=None, listener=None):
+ """Subscribe to a list of topics, or a topic regex pattern
- def commit(self):
- """Store consumed message offsets (marked via task_done())
- to kafka cluster for this consumer_group.
+ Partitions will be dynamically assigned via a group coordinator.
+ Topic subscriptions are not incremental: this list will replace the
+ current assignment (if there is one).
- Returns:
- True on success, or False if no offsets were found for commit
+ This method is incompatible with assign()
- Note:
- this functionality requires server version >=0.8.1.1
- https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+ Arguments:
+ topics (list): List of topics for subscription.
+ pattern (str): Pattern to match available topics. You must provide
+ either topics or pattern, but not both.
+ listener (ConsumerRebalanceListener): Optionally include listener
+ callback, which will be called before and after each rebalance
+ operation.
+
+ As part of group management, the consumer will keep track of the
+ list of consumers that belong to a particular group and will
+ trigger a rebalance operation if one of the following events
+ occurs:
+
+ * Number of partitions change for any of the subscribed topics
+ * Topic is created or deleted
+ * An existing member of the consumer group dies
+ * A new member is added to the consumer group
+
+ When any of these events are triggered, the provided listener
+ will be invoked first to indicate that the consumer's assignment
+ has been revoked, and then again when the new assignment has
+ been received. Note that this listener will immediately override
+ any listener set in a previous call to subscribe. It is
+ guaranteed, however, that the partitions revoked/assigned
+ through this interface are from topics subscribed in this call.
"""
- if not self._config['group_id']:
- logger.warning('Cannot commit without a group_id!')
- raise KafkaConfigurationError(
- 'Attempted to commit offsets '
- 'without a configured consumer group (group_id)'
- )
-
- # API supports storing metadata with each commit
- # but for now it is unused
- metadata = b''
-
- offsets = self._offsets.task_done
- commits = []
- for topic_partition, task_done_offset in six.iteritems(offsets):
-
- # Skip if None
- if task_done_offset is None:
- continue
-
- # Commit offsets as the next offset to fetch
- # which is consistent with the Java Client
- # task_done is marked by messages consumed,
- # so add one to mark the next message for fetching
- commit_offset = (task_done_offset + 1)
-
- # Skip if no change from previous committed
- if commit_offset == self._offsets.commit[topic_partition]:
- continue
-
- commits.append(
- OffsetCommitRequest(topic_partition[0], topic_partition[1],
- commit_offset, metadata)
- )
-
- if commits:
- logger.info('committing consumer offsets to group %s', self._config['group_id'])
- resps = self._client.send_offset_commit_request(
- self._config['group_id'], commits,
- fail_on_error=False
- )
-
- for r in resps:
- check_error(r)
- topic_partition = (r.topic, r.partition)
- task_done = self._offsets.task_done[topic_partition]
- self._offsets.commit[topic_partition] = (task_done + 1)
-
- if self._config['auto_commit_enable']:
- self._reset_auto_commit()
-
- return True
-
+ if not topics and not pattern:
+ self.unsubscribe()
else:
- logger.info('No new offsets found to commit in group %s', self._config['group_id'])
- return False
-
- #
- # Topic/partition management private methods
- #
-
- def _consume_topic_partition(self, topic, partition):
- if not isinstance(partition, int):
- raise KafkaConfigurationError('Unknown partition type (%s) '
- '-- expected int' % type(partition))
-
- if topic not in self._cluster.topics():
- raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic)
- if partition not in self._cluster.partitions_for_topic(topic):
- raise UnknownTopicOrPartitionError("Partition %d not found in Topic %s "
- "in broker metadata" % (partition, topic))
- logger.info("Configuring consumer to fetch topic '%s', partition %d", topic, partition)
- self._topics.append((topic, partition))
-
- def _refresh_metadata_on_error(self):
- refresh_ms = self._config['refresh_leader_backoff_ms']
- jitter_pct = 0.20
- sleep_ms = random.randint(
- int((1.0 - 0.5 * jitter_pct) * refresh_ms),
- int((1.0 + 0.5 * jitter_pct) * refresh_ms)
- )
- while True:
- logger.info("Sleeping for refresh_leader_backoff_ms: %d", sleep_ms)
- time.sleep(sleep_ms / 1000.0)
- try:
- self._client.load_metadata_for_topics()
- except KafkaUnavailableError:
- logger.warning("Unable to refresh topic metadata... cluster unavailable")
- self._check_consumer_timeout()
+ self._subscription.subscribe(topics=topics,
+ pattern=pattern,
+ listener=listener)
+ # regex will need all topic metadata
+ if pattern is not None:
+ self._client.cluster.need_metadata_for_all_topics = True
+ log.debug("Subscribed to topic pattern: %s", pattern)
else:
- logger.info("Topic metadata refreshed")
- return
-
- #
- # Offset-managment private methods
- #
-
- def _get_commit_offsets(self):
- logger.info("Consumer fetching stored offsets")
- for topic_partition in self._topics:
- (resp,) = self._client.send_offset_fetch_request(
- self._config['group_id'],
- [OffsetFetchRequest(topic_partition[0], topic_partition[1])],
- fail_on_error=False)
- try:
- check_error(resp)
- # API spec says server wont set an error here
- # but 0.8.1.1 does actually...
- except UnknownTopicOrPartitionError:
- pass
-
- # -1 offset signals no commit is currently stored
- if resp.offset == -1:
- self._offsets.commit[topic_partition] = None
-
- # Otherwise we committed the stored offset
- # and need to fetch the next one
- else:
- self._offsets.commit[topic_partition] = resp.offset
-
- def _reset_highwater_offsets(self):
- for topic_partition in self._topics:
- self._offsets.highwater[topic_partition] = None
-
- def _reset_task_done_offsets(self):
- for topic_partition in self._topics:
- self._offsets.task_done[topic_partition] = None
-
- def _reset_partition_offset(self, topic_partition):
- (topic, partition) = topic_partition
- LATEST = -1
- EARLIEST = -2
-
- request_time_ms = None
- if self._config['auto_offset_reset'] == 'largest':
- request_time_ms = LATEST
- elif self._config['auto_offset_reset'] == 'smallest':
- request_time_ms = EARLIEST
- else:
+ self._client.set_topics(self._subscription.group_subscription())
+ log.debug("Subscribed to topic(s): %s", topics)
- # Let's raise an reasonable exception type if user calls
- # outside of an exception context
- if sys.exc_info() == (None, None, None):
- raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
- 'valid auto_offset_reset setting '
- '(largest|smallest)')
+ def subscription(self):
+ """Get the current topic subscription.
- # Otherwise we should re-raise the upstream exception
- # b/c it typically includes additional data about
- # the request that triggered it, and we do not want to drop that
- raise
+ Returns:
+ set: {topic, ...}
+ """
+ return self._subscription.subscription
- (offset, ) = self.get_partition_offsets(topic, partition,
- request_time_ms, max_num_offsets=1)
- return offset
+ def unsubscribe(self):
+ """Unsubscribe from all topics and clear all assigned partitions."""
+ self._subscription.unsubscribe()
+ self._coordinator.close()
+ self._client.cluster.need_metadata_for_all_topics = False
+ log.debug("Unsubscribed all topics or patterns and assigned partitions")
+
+ def _update_fetch_positions(self, partitions):
+ """
+ Set the fetch position to the committed position (if there is one)
+ or reset it using the offset reset policy the user has configured.
+
+ Arguments:
+ partitions (List[TopicPartition]): The partitions whose fetch
+ positions need updating
+
+ Raises:
+ NoOffsetForPartitionError: If no offset is stored for a given
+ partition and no offset reset policy is defined
+ """
+ # refresh commits for all assigned partitions
+ self._coordinator.refresh_committed_offsets_if_needed()
- #
- # Consumer Timeout private methods
- #
-
- def _set_consumer_timeout_start(self):
- self._consumer_timeout = False
- if self._config['consumer_timeout_ms'] >= 0:
- self._consumer_timeout = time.time() + (self._config['consumer_timeout_ms'] / 1000.0)
-
- def _check_consumer_timeout(self):
- if self._consumer_timeout and time.time() > self._consumer_timeout:
- raise ConsumerTimeout('Consumer timed out after %d ms' % + self._config['consumer_timeout_ms'])
-
- #
- # Autocommit private methods
- #
-
- def _should_auto_commit(self):
- if self._does_auto_commit_ms():
- if time.time() >= self._next_commit_time:
- return True
-
- if self._does_auto_commit_messages():
- if self._uncommitted_message_count >= self._config['auto_commit_interval_messages']:
- return True
-
- return False
-
- def _reset_auto_commit(self):
- if not self._config['group_id']:
- raise KafkaConfigurationError('auto_commit requires group_id')
- self._uncommitted_message_count = 0
- self._next_commit_time = None
- if self._does_auto_commit_ms():
- self._next_commit_time = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0)
-
- def _incr_auto_commit_message_count(self, n=1):
- self._uncommitted_message_count += n
-
- def _does_auto_commit_ms(self):
- if not self._config['auto_commit_enable']:
- return False
-
- conf = self._config['auto_commit_interval_ms']
- if conf is not None and conf > 0:
- return True
- return False
-
- def _does_auto_commit_messages(self):
- if not self._config['auto_commit_enable']:
- return False
-
- conf = self._config['auto_commit_interval_messages']
- if conf is not None and conf > 0:
- return True
- return False
-
- #
- # Message iterator private methods
- #
-
- def __iter__(self):
- return self
-
- def __next__(self):
- return self.next()
-
- def _get_message_iterator(self):
- # Fetch a new batch if needed
- if self._msg_iter is None:
- self._msg_iter = self.fetch_messages()
-
- return self._msg_iter
-
- def _reset_message_iterator(self):
- self._msg_iter = None
-
- #
- # python private methods
- #
-
- def __repr__(self):
- return '<{0} topics=({1})>'.format(
- self.__class__.__name__,
- '|'.join(["%s-%d" % topic_partition
- for topic_partition in self._topics])
- )
-
- #
- # other private methods
- #
-
- def _deprecate_configs(self, **configs):
- for old, new in six.iteritems(DEPRECATED_CONFIG_KEYS):
- if old in configs:
- logger.warning('Deprecated Kafka Consumer configuration: %s. '
- 'Please use %s instead.', old, new)
- old_value = configs.pop(old)
- if new not in configs:
- configs[new] = old_value
- return configs
+ # then do any offset lookups in case some positions are not known
+ self._fetcher.update_fetch_positions(partitions)