Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--  kafka/consumer/group.py  256
1 file changed, 127 insertions(+), 129 deletions(-)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index bd9d03d..9ce1438 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -18,7 +18,114 @@ log = logging.getLogger(__name__)
class KafkaConsumer(six.Iterator):
- """Consumer for Kafka 0.9"""
+ """Consume records from a Kafka cluster.
+
+ The consumer will transparently handle the failure of servers in the Kafka
+ cluster, and adapt as topic-partitions are created or migrate between
+ brokers. It also interacts with the assigned Kafka Group Coordinator node
+ to allow multiple consumers to load balance consumption of topics (requires
+ Kafka >= 0.9.0.0).
+
+ Arguments:
+ *topics (str): optional list of topics to subscribe to. If not set,
+ call subscribe() or assign() before consuming records.
+
+ Keyword Arguments:
+ bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
+ strings) that the consumer should contact to bootstrap initial
+ cluster metadata. This does not have to be the full node list.
+ It just needs to have at least one broker that will respond to a
+ Metadata API Request. Default port is 9092. If no servers are
+ specified, will default to localhost:9092.
+ client_id (str): a name for this client. This string is passed in
+ each request to servers and can be used to identify specific
+ server-side log entries that correspond to this client. Also
+ submitted to GroupCoordinator for logging with respect to
+ consumer group administration. Default: 'kafka-python-{version}'
+ group_id (str): name of the consumer group to join for dynamic
+ partition assignment (if enabled), and to use for fetching and
+ committing offsets. Default: 'kafka-python-default-group'
+ key_deserializer (callable): Any callable that takes a
+ raw message key and returns a deserialized key.
+ value_deserializer (callable): Any callable that takes a
+ raw message value and returns a deserialized value.
+ fetch_min_bytes (int): Minimum amount of data the server should
+ return for a fetch request, otherwise wait up to
+ fetch_max_wait_ms for more data to accumulate. Default: 1024.
+ fetch_max_wait_ms (int): The maximum amount of time in milliseconds
+ the server will block before answering the fetch request if
+ there isn't sufficient data to immediately satisfy the
+ requirement given by fetch_min_bytes. Default: 500.
+ max_partition_fetch_bytes (int): The maximum amount of data
+ per-partition the server will return. The maximum total memory
+ used for a request = #partitions * max_partition_fetch_bytes.
+ This size must be at least as large as the maximum message size
+ the server allows or else it is possible for the producer to
+ send messages larger than the consumer can fetch. If that
+ happens, the consumer can get stuck trying to fetch a large
+ message on a certain partition. Default: 1048576.
+ request_timeout_ms (int): Client request timeout in milliseconds.
+ Default: 40000.
+ retry_backoff_ms (int): Milliseconds to backoff when retrying on
+ errors. Default: 100.
+ reconnect_backoff_ms (int): The amount of time in milliseconds to
+ wait before attempting to reconnect to a given host.
+ Default: 50.
+ max_in_flight_requests_per_connection (int): Requests are pipelined
+ to kafka brokers up to this number of maximum requests per
+ broker connection. Default: 5.
+ auto_offset_reset (str): A policy for resetting offsets on
+ OffsetOutOfRange errors: 'earliest' will move to the oldest
+ available message, 'latest' will move to the most recent. Any
+ other value will raise an exception. Default: 'latest'.
+ enable_auto_commit (bool): If true the consumer's offset will be
+ periodically committed in the background. Default: True.
+ auto_commit_interval_ms (int): milliseconds between automatic
+ offset commits, if enable_auto_commit is True. Default: 5000.
+ default_offset_commit_callback (callable): called as
+ callback(offsets, response); response will be either an Exception
+ or an OffsetCommitResponse struct. This callback can be used to
+ trigger custom actions when a commit request completes.
+ check_crcs (bool): Automatically check the CRC32 of the records
+ consumed. This ensures no on-the-wire or on-disk corruption to
+ the messages occurred. This check adds some overhead, so it may
+ be disabled in cases seeking extreme performance. Default: True
+ metadata_max_age_ms (int): The period of time in milliseconds after
+ which we force a refresh of metadata even if we haven't seen any
+ partition leadership changes to proactively discover any new
+ brokers or partitions. Default: 300000
+ partition_assignment_strategy (list): List of objects to use to
+ distribute partition ownership amongst consumer instances when
+ group management is used. Default: [RoundRobinPartitionAssignor]
+ heartbeat_interval_ms (int): The expected time in milliseconds
+ between heartbeats to the consumer coordinator when using
+ Kafka's group management feature. Heartbeats are used to ensure
+ that the consumer's session stays active and to facilitate
+ rebalancing when new consumers join or leave the group. The
+ value must be set lower than session_timeout_ms, but typically
+ should be set no higher than 1/3 of that value. It can be
+ adjusted even lower to control the expected time for normal
+ rebalances. Default: 3000
+ session_timeout_ms (int): The timeout used to detect failures when
+ using Kafka's group management facilities. Default: 30000
+ send_buffer_bytes (int): The size of the TCP send buffer
+ (SO_SNDBUF) to use when sending data. Default: 131072
+ receive_buffer_bytes (int): The size of the TCP receive buffer
+ (SO_RCVBUF) to use when reading data. Default: 32768
+ consumer_timeout_ms (int): number of milliseconds to block during
+ message iteration before raising a timeout exception if no message
+ is available for consumption. Default: -1 (do not raise an exception)
+ api_version (str): specify which kafka API version to use.
+ 0.9 enables full group coordination features; 0.8.2 enables
+ kafka-storage offset commits; 0.8.1 enables zookeeper-storage
+ offset commits; 0.8.0 supports none of these. If set to 'auto', will
+ attempt to infer the broker version by probing various APIs.
+ Default: auto
+
+ Note:
+ Configuration parameters are described in more detail at
+ https://kafka.apache.org/090/configuration.html#newconsumerconfigs
+ """
DEFAULT_CONFIG = {
'bootstrap_servers': 'localhost',
'client_id': 'kafka-python-' + __version__,
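
For reference while reviewing the rewritten docstring above, here is a minimal usage sketch of the documented constructor arguments; the topic name, group id, and deserializer are hypothetical and not part of this diff:

    from kafka import KafkaConsumer

    # Construct a consumer subscribed to one topic at creation time.
    # bootstrap_servers, group_id, and auto_offset_reset are all
    # documented in the new docstring above.
    consumer = KafkaConsumer(
        'my-topic',
        bootstrap_servers='localhost:9092',
        group_id='my-group',
        value_deserializer=lambda v: v.decode('utf-8'),
        auto_offset_reset='earliest')

    # KafkaConsumer is an iterator (six.Iterator); each record carries
    # topic, partition, offset, key, and value fields.
    for message in consumer:
        print(message.topic, message.partition, message.offset, message.value)
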
@@ -51,114 +158,6 @@ class KafkaConsumer(six.Iterator):
}
def __init__(self, *topics, **configs):
- """A Kafka client that consumes records from a Kafka cluster.
-
- The consumer will transparently handle the failure of servers in the
- Kafka cluster, and transparently adapt as partitions of data it fetches
- migrate within the cluster. This client also interacts with the server
- to allow groups of consumers to load balance consumption using consumer
- groups.
-
- Requires Kafka Server >= 0.9.0.0
-
- Configuration settings can be passed to constructor as kwargs,
- otherwise defaults will be used:
-
- Keyword Arguments:
- bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
- strings) that the consumer should contact to bootstrap initial
- cluster metadata. This does not have to be the full node list.
- It just needs to have at least one broker that will respond to a
- Metadata API Request. Default port is 9092. If no servers are
- specified, will default to localhost:9092.
- client_id (str): a name for this client. This string is passed in
- each request to servers and can be used to identify specific
- server-side log entries that correspond to this client. Also
- submitted to GroupCoordinator for logging with respect to
- consumer group administration. Default: 'kafka-python-{version}'
- group_id (str): name of the consumer group to join for dynamic
- partition assignment (if enabled), and to use for fetching and
- committing offsets. Default: 'kafka-python-default-group'
- key_deserializer (callable): Any callable that takes a
- raw message key and returns a deserialized key.
- value_deserializer (callable, optional): Any callable that takes a
- raw message value and returns a deserialized value.
- fetch_min_bytes (int): Minimum amount of data the server should
- return for a fetch request, otherwise wait up to
- fetch_max_wait_ms for more data to accumulate. Default: 1024.
- fetch_max_wait_ms (int): The maximum amount of time in milliseconds
- the server will block before answering the fetch request if
- there isn't sufficient data to immediately satisfy the
- requirement given by fetch_min_bytes. Default: 500.
- max_partition_fetch_bytes (int): The maximum amount of data
- per-partition the server will return. The maximum total memory
- used for a request = #partitions * max_partition_fetch_bytes.
- This size must be at least as large as the maximum message size
- the server allows or else it is possible for the producer to
- send messages larger than the consumer can fetch. If that
- happens, the consumer can get stuck trying to fetch a large
- message on a certain partition. Default: 1048576.
- request_timeout_ms (int): Client request timeout in milliseconds.
- Default: 40000.
- retry_backoff_ms (int): Milliseconds to backoff when retrying on
- errors. Default: 100.
- reconnect_backoff_ms (int): The amount of time in milliseconds to
- wait before attempting to reconnect to a given host.
- Default: 50.
- max_in_flight_requests_per_connection (int): Requests are pipelined
- to kafka brokers up to this number of maximum requests per
- broker connection. Default: 5.
- auto_offset_reset (str): A policy for resetting offsets on
- OffsetOutOfRange errors: 'earliest' will move to the oldest
- available message, 'latest' will move to the most recent. Any
- ofther value will raise the exception. Default: 'latest'.
- enable_auto_commit (bool): If true the consumer's offset will be
- periodically committed in the background. Default: True.
- auto_commit_interval_ms (int): milliseconds between automatic
- offset commits, if enable_auto_commit is True. Default: 5000.
- default_offset_commit_callback (callable): called as
- callback(offsets, response) response will be either an Exception
- or a OffsetCommitResponse struct. This callback can be used to
- trigger custom actions when a commit request completes.
- check_crcs (bool): Automatically check the CRC32 of the records
- consumed. This ensures no on-the-wire or on-disk corruption to
- the messages occurred. This check adds some overhead, so it may
- be disabled in cases seeking extreme performance. Default: True
- metadata_max_age_ms (int): The period of time in milliseconds after
- which we force a refresh of metadata even if we haven't seen any
- partition leadership changes to proactively discover any new
- brokers or partitions. Default: 300000
- partition_assignment_strategy (list): List of objects to use to
- distribute partition ownership amongst consumer instances when
- group management is used. Default: [RoundRobinPartitionAssignor]
- heartbeat_interval_ms (int): The expected time in milliseconds
- between heartbeats to the consumer coordinator when using
- Kafka's group management feature. Heartbeats are used to ensure
- that the consumer's session stays active and to facilitate
- rebalancing when new consumers join or leave the group. The
- value must be set lower than session_timeout_ms, but typically
- should be set no higher than 1/3 of that value. It can be
- adjusted even lower to control the expected time for normal
- rebalances. Default: 3000
- session_timeout_ms (int): The timeout used to detect failures when
- using Kafka's group managementment facilities. Default: 30000
- send_buffer_bytes (int): The size of the TCP send buffer
- (SO_SNDBUF) to use when sending data. Default: 131072
- receive_buffer_bytes (int): The size of the TCP receive buffer
- (SO_RCVBUF) to use when reading data. Default: 32768
- consumer_timeout_ms (int): number of millisecond to throw a timeout
- exception to the consumer if no message is available for
- consumption. Default: -1 (dont throw exception)
- api_version (str): specify which kafka API version to use.
- 0.9 enables full group coordination features; 0.8.2 enables
- kafka-storage offset commits; 0.8.1 enables zookeeper-storage
- offset commits; 0.8.0 is what is left. If set to 'auto', will
- attempt to infer the broker version by probing various APIs.
- Default: auto
-
- Configuration parameters are described in more detail at
- https://kafka.apache.org/090/configuration.html#newconsumerconfigs
- """
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs:
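
The truncated constructor body above shows a copy-then-override pattern: the instance config starts as a copy of DEFAULT_CONFIG, and each recognized kwarg replaces its default. A standalone sketch of the same pattern, using an illustrative two-key default dict rather than the module's full one (the hunk is cut off here, so the leftover-key handling below is an assumption):

    import copy

    DEFAULT_CONFIG = {'bootstrap_servers': 'localhost',
                      'fetch_min_bytes': 1024}

    def merge_config(**configs):
        # Copy the defaults, then override any key the caller supplied.
        # Popping matched keys means anything left in configs afterwards
        # was unrecognized and can be flagged.
        config = copy.copy(DEFAULT_CONFIG)
        for key in config:
            if key in configs:
                config[key] = configs.pop(key)
        return config

    print(merge_config(fetch_min_bytes=1))
    # {'bootstrap_servers': 'localhost', 'fetch_min_bytes': 1}
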
@@ -204,20 +203,25 @@ class KafkaConsumer(six.Iterator):
def assign(self, partitions):
"""Manually assign a list of TopicPartitions to this consumer.
- This interface does not allow for incremental assignment and will
- replace the previous assignment (if there was one).
-
- Manual topic assignment through this method does not use the consumer's
- group management functionality. As such, there will be no rebalance
- operation triggered when group membership or cluster and topic metadata
- change. Note that it is not possible to use both manual partition
- assignment with assign() and group assignment with subscribe().
-
Arguments:
partitions (list of TopicPartition): assignment for this instance.
Raises:
IllegalStateError: if consumer has already called subscribe()
+
+ Warning:
+ It is not possible to use both manual partition assignment with
+ assign() and group assignment with subscribe().
+
+ Note:
+ This interface does not support incremental assignment and will
+ replace the previous assignment (if there was one).
+
+ Note:
+ Manual topic assignment through this method does not use the
+ consumer's group management functionality. As such, there will be
+ no rebalance operation triggered when group membership or cluster
+ and topic metadata change.
"""
self._subscription.assign_from_user(partitions)
self._client.set_topics([tp.topic for tp in partitions])
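
A short sketch of manual assignment as the reorganized docstring describes it; the topic and partition numbers are hypothetical, and TopicPartition is assumed importable from the top-level kafka package:

    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')

    # Manual assignment: no group rebalance is ever triggered, and this
    # call replaces any previous assignment rather than extending it.
    consumer.assign([TopicPartition('my-topic', 0),
                     TopicPartition('my-topic', 1)])

Per the Warning block above, mixing styles is an error: calling subscribe() on this consumer afterwards (or assign() on a subscribed consumer) raises IllegalStateError.
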
@@ -225,12 +229,12 @@ class KafkaConsumer(six.Iterator):
def assignment(self):
"""Get the TopicPartitions currently assigned to this consumer.
- If partitions were directly assigning using assign(), then this will
- simply return the same partitions that were assigned.
- If topics were subscribed to using subscribe(), then this will give the
+ If partitions were directly assigned using assign(), then this will
+ simply return the same partitions that were previously assigned.
+ If topics were subscribed to using subscribe(), then this will give the
set of topic partitions currently assigned to the consumer (which may
- be none if the assignment hasn't happened yet, or the partitions are in
- the process of getting reassigned).
+ be none if the assignment hasn't happened yet, or if the partitions are
+ in the process of being reassigned).
Returns:
set: {TopicPartition, ...}
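
To make the assign()/subscribe() distinction in this hunk concrete, a hypothetical check against a running broker at the illustrative localhost:9092:

    from kafka import KafkaConsumer, TopicPartition

    # After assign(), assignment() immediately returns the same set.
    c1 = KafkaConsumer(bootstrap_servers='localhost:9092')
    c1.assign([TopicPartition('my-topic', 0)])
    assert c1.assignment() == {TopicPartition('my-topic', 0)}

    # After subscribe(), assignment() may be empty until the group
    # coordinator finishes the rebalance and hands out partitions.
    c2 = KafkaConsumer(bootstrap_servers='localhost:9092',
                       group_id='my-group')
    c2.subscribe(['my-topic'])
    print(c2.assignment())  # possibly set() at first
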
@@ -654,31 +658,25 @@ class KafkaConsumer(six.Iterator):
# old KafkaConsumer methods are deprecated
def configure(self, **configs):
- """DEPRECATED -- initialize a new consumer"""
raise NotImplementedError(
'deprecated -- initialize a new consumer')
def set_topic_partitions(self, *topics):
- """DEPRECATED -- use subscribe() or assign()"""
raise NotImplementedError(
'deprecated -- use subscribe() or assign()')
def fetch_messages(self):
- """DEPRECATED -- use poll() or iterator interface"""
raise NotImplementedError(
'deprecated -- use poll() or iterator interface')
def get_partition_offsets(self, topic, partition,
request_time_ms, max_num_offsets):
- """DEPRECATED -- send OffsetRequest with KafkaClient"""
raise NotImplementedError(
'deprecated -- send an OffsetRequest with KafkaClient')
def offsets(self, group=None):
- """DEPRECATED -- use committed(partition)"""
raise NotImplementedError('deprecated -- use committed(partition)')
def task_done(self, message):
- """DEPRECATED -- commit manually if needed"""
raise NotImplementedError(
'deprecated -- commit offsets manually if needed')
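
Each deprecated stub above names its replacement; a hedged migration sketch using the new API, with the old calls shown only in comments since they now raise NotImplementedError (topic, group, and timeout values are illustrative):

    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                             group_id='my-group')

    # set_topic_partitions(...)  ->  subscribe() or assign()
    consumer.subscribe(['my-topic'])

    # fetch_messages()           ->  poll() or the iterator interface
    records = consumer.poll(timeout_ms=500)

    # offsets(group)             ->  committed(partition)
    committed = consumer.committed(TopicPartition('my-topic', 0))

    # task_done(message)         ->  commit offsets manually if needed
    consumer.commit()
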