author | Dana Powers <dana.powers@rd.io> | 2016-01-07 17:03:08 -0800
---|---|---
committer | Dana Powers <dana.powers@rd.io> | 2016-01-07 17:03:08 -0800
commit | e080c6b0cdb54563e3c5ad595d582de26561d9f0 (patch) |
tree | 0a3e686336667bb98c4be4bb324292fa29767a67 /kafka/consumer |
parent | c8deb0c276d57209006eebdd910017846860a38d (diff) |
download | kafka-python-e080c6b0cdb54563e3c5ad595d582de26561d9f0.tar.gz |
Docstring updates
Diffstat (limited to 'kafka/consumer')
-rw-r--r-- | kafka/consumer/group.py | 256
1 file changed, 127 insertions, 129 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index bd9d03d..9ce1438 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -18,7 +18,114 @@ log = logging.getLogger(__name__)
 
 class KafkaConsumer(six.Iterator):
-    """Consumer for Kafka 0.9"""
+    """Consume records from a Kafka cluster.
+
+    The consumer will transparently handle the failure of servers in the Kafka
+    cluster, and adapt as topic-partitions are created or migrate between
+    brokers. It also interacts with the assigned kafka Group Coordinator node
+    to allow multiple consumers to load balance consumption of topics (requires
+    kafka >= 0.9.0.0).
+
+    Arguments:
+        *topics (str): optional list of topics to subscribe to. If not set,
+            call subscribe() or assign() before consuming records.
+
+    Keyword Arguments:
+        bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
+            strings) that the consumer should contact to bootstrap initial
+            cluster metadata. This does not have to be the full node list.
+            It just needs to have at least one broker that will respond to a
+            Metadata API Request. Default port is 9092. If no servers are
+            specified, will default to localhost:9092.
+        client_id (str): a name for this client. This string is passed in
+            each request to servers and can be used to identify specific
+            server-side log entries that correspond to this client. Also
+            submitted to GroupCoordinator for logging with respect to
+            consumer group administration. Default: 'kafka-python-{version}'
+        group_id (str): name of the consumer group to join for dynamic
+            partition assignment (if enabled), and to use for fetching and
+            committing offsets. Default: 'kafka-python-default-group'
+        key_deserializer (callable): Any callable that takes a
+            raw message key and returns a deserialized key.
+        value_deserializer (callable, optional): Any callable that takes a
+            raw message value and returns a deserialized value.
+        fetch_min_bytes (int): Minimum amount of data the server should
+            return for a fetch request, otherwise wait up to
+            fetch_max_wait_ms for more data to accumulate. Default: 1024.
+        fetch_max_wait_ms (int): The maximum amount of time in milliseconds
+            the server will block before answering the fetch request if
+            there isn't sufficient data to immediately satisfy the
+            requirement given by fetch_min_bytes. Default: 500.
+        max_partition_fetch_bytes (int): The maximum amount of data
+            per-partition the server will return. The maximum total memory
+            used for a request = #partitions * max_partition_fetch_bytes.
+            This size must be at least as large as the maximum message size
+            the server allows or else it is possible for the producer to
+            send messages larger than the consumer can fetch. If that
+            happens, the consumer can get stuck trying to fetch a large
+            message on a certain partition. Default: 1048576.
+        request_timeout_ms (int): Client request timeout in milliseconds.
+            Default: 40000.
+        retry_backoff_ms (int): Milliseconds to backoff when retrying on
+            errors. Default: 100.
+        reconnect_backoff_ms (int): The amount of time in milliseconds to
+            wait before attempting to reconnect to a given host.
+            Default: 50.
+        max_in_flight_requests_per_connection (int): Requests are pipelined
+            to kafka brokers up to this number of maximum requests per
+            broker connection. Default: 5.
+        auto_offset_reset (str): A policy for resetting offsets on
+            OffsetOutOfRange errors: 'earliest' will move to the oldest
+            available message, 'latest' will move to the most recent. Any
+            other value will raise an exception. Default: 'latest'.
+        enable_auto_commit (bool): If True, the consumer's offset will be
+            periodically committed in the background. Default: True.
+        auto_commit_interval_ms (int): milliseconds between automatic
+            offset commits, if enable_auto_commit is True. Default: 5000.
+        default_offset_commit_callback (callable): called as
+            callback(offsets, response); response will be either an Exception
+            or an OffsetCommitResponse struct. This callback can be used to
+            trigger custom actions when a commit request completes.
+        check_crcs (bool): Automatically check the CRC32 of the records
+            consumed. This ensures no on-the-wire or on-disk corruption of
+            the messages occurred. This check adds some overhead, so it may
+            be disabled in cases seeking extreme performance. Default: True
+        metadata_max_age_ms (int): The period of time in milliseconds after
+            which we force a refresh of metadata even if we haven't seen any
+            partition leadership changes to proactively discover any new
+            brokers or partitions. Default: 300000
+        partition_assignment_strategy (list): List of objects to use to
+            distribute partition ownership amongst consumer instances when
+            group management is used. Default: [RoundRobinPartitionAssignor]
+        heartbeat_interval_ms (int): The expected time in milliseconds
+            between heartbeats to the consumer coordinator when using
+            Kafka's group management feature. Heartbeats are used to ensure
+            that the consumer's session stays active and to facilitate
+            rebalancing when new consumers join or leave the group. The
+            value must be set lower than session_timeout_ms, but typically
+            should be set no higher than 1/3 of that value. It can be
+            adjusted even lower to control the expected time for normal
+            rebalances. Default: 3000
+        session_timeout_ms (int): The timeout used to detect failures when
+            using Kafka's group management facilities. Default: 30000
+        send_buffer_bytes (int): The size of the TCP send buffer
+            (SO_SNDBUF) to use when sending data. Default: 131072
+        receive_buffer_bytes (int): The size of the TCP receive buffer
+            (SO_RCVBUF) to use when reading data. Default: 32768
+        consumer_timeout_ms (int): number of milliseconds to wait before
+            throwing a timeout exception to the consumer if no message is
+            available for consumption. Default: -1 (don't throw exception)
+        api_version (str): specify which kafka API version to use.
+            0.9 enables full group coordination features; 0.8.2 enables
+            kafka-storage offset commits; 0.8.1 enables zookeeper-storage
+            offset commits; 0.8.0 is what is left. If set to 'auto', will
+            attempt to infer the broker version by probing various APIs.
+            Default: auto
+
+    Note:
+        Configuration parameters are described in more detail at
+        https://kafka.apache.org/090/configuration.html#newconsumerconfigs
+    """
     DEFAULT_CONFIG = {
         'bootstrap_servers': 'localhost',
         'client_id': 'kafka-python-' + __version__,
@@ -51,114 +158,6 @@ class KafkaConsumer(six.Iterator):
     }
 
     def __init__(self, *topics, **configs):
-        """A Kafka client that consumes records from a Kafka cluster.
-
-        The consumer will transparently handle the failure of servers in the
-        Kafka cluster, and transparently adapt as partitions of data it fetches
-        migrate within the cluster. This client also interacts with the server
-        to allow groups of consumers to load balance consumption using consumer
-        groups.
-
-        Requires Kafka Server >= 0.9.0.0
-
-        Configuration settings can be passed to constructor as kwargs,
-        otherwise defaults will be used:
-
-        Keyword Arguments:
-            bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
-                strings) that the consumer should contact to bootstrap initial
-                cluster metadata. This does not have to be the full node list.
-                It just needs to have at least one broker that will respond to a
-                Metadata API Request. Default port is 9092. If no servers are
-                specified, will default to localhost:9092.
-            client_id (str): a name for this client. This string is passed in
-                each request to servers and can be used to identify specific
-                server-side log entries that correspond to this client. Also
-                submitted to GroupCoordinator for logging with respect to
-                consumer group administration. Default: 'kafka-python-{version}'
-            group_id (str): name of the consumer group to join for dynamic
-                partition assignment (if enabled), and to use for fetching and
-                committing offsets. Default: 'kafka-python-default-group'
-            key_deserializer (callable): Any callable that takes a
-                raw message key and returns a deserialized key.
-            value_deserializer (callable, optional): Any callable that takes a
-                raw message value and returns a deserialized value.
-            fetch_min_bytes (int): Minimum amount of data the server should
-                return for a fetch request, otherwise wait up to
-                fetch_max_wait_ms for more data to accumulate. Default: 1024.
-            fetch_max_wait_ms (int): The maximum amount of time in milliseconds
-                the server will block before answering the fetch request if
-                there isn't sufficient data to immediately satisfy the
-                requirement given by fetch_min_bytes. Default: 500.
-            max_partition_fetch_bytes (int): The maximum amount of data
-                per-partition the server will return. The maximum total memory
-                used for a request = #partitions * max_partition_fetch_bytes.
-                This size must be at least as large as the maximum message size
-                the server allows or else it is possible for the producer to
-                send messages larger than the consumer can fetch. If that
-                happens, the consumer can get stuck trying to fetch a large
-                message on a certain partition. Default: 1048576.
-            request_timeout_ms (int): Client request timeout in milliseconds.
-                Default: 40000.
-            retry_backoff_ms (int): Milliseconds to backoff when retrying on
-                errors. Default: 100.
-            reconnect_backoff_ms (int): The amount of time in milliseconds to
-                wait before attempting to reconnect to a given host.
-                Default: 50.
-            max_in_flight_requests_per_connection (int): Requests are pipelined
-                to kafka brokers up to this number of maximum requests per
-                broker connection. Default: 5.
-            auto_offset_reset (str): A policy for resetting offsets on
-                OffsetOutOfRange errors: 'earliest' will move to the oldest
-                available message, 'latest' will move to the most recent. Any
-                ofther value will raise the exception. Default: 'latest'.
-            enable_auto_commit (bool): If true the consumer's offset will be
-                periodically committed in the background. Default: True.
-            auto_commit_interval_ms (int): milliseconds between automatic
-                offset commits, if enable_auto_commit is True. Default: 5000.
-            default_offset_commit_callback (callable): called as
-                callback(offsets, response) response will be either an Exception
-                or a OffsetCommitResponse struct. This callback can be used to
-                trigger custom actions when a commit request completes.
-            check_crcs (bool): Automatically check the CRC32 of the records
-                consumed. This ensures no on-the-wire or on-disk corruption to
-                the messages occurred. This check adds some overhead, so it may
-                be disabled in cases seeking extreme performance. Default: True
-            metadata_max_age_ms (int): The period of time in milliseconds after
-                which we force a refresh of metadata even if we haven't seen any
-                partition leadership changes to proactively discover any new
-                brokers or partitions. Default: 300000
-            partition_assignment_strategy (list): List of objects to use to
-                distribute partition ownership amongst consumer instances when
-                group management is used. Default: [RoundRobinPartitionAssignor]
-            heartbeat_interval_ms (int): The expected time in milliseconds
-                between heartbeats to the consumer coordinator when using
-                Kafka's group management feature. Heartbeats are used to ensure
-                that the consumer's session stays active and to facilitate
-                rebalancing when new consumers join or leave the group. The
-                value must be set lower than session_timeout_ms, but typically
-                should be set no higher than 1/3 of that value. It can be
-                adjusted even lower to control the expected time for normal
-                rebalances. Default: 3000
-            session_timeout_ms (int): The timeout used to detect failures when
-                using Kafka's group managementment facilities. Default: 30000
-            send_buffer_bytes (int): The size of the TCP send buffer
-                (SO_SNDBUF) to use when sending data. Default: 131072
-            receive_buffer_bytes (int): The size of the TCP receive buffer
-                (SO_RCVBUF) to use when reading data. Default: 32768
-            consumer_timeout_ms (int): number of millisecond to throw a timeout
-                exception to the consumer if no message is available for
-                consumption. Default: -1 (dont throw exception)
-            api_version (str): specify which kafka API version to use.
-                0.9 enables full group coordination features; 0.8.2 enables
-                kafka-storage offset commits; 0.8.1 enables zookeeper-storage
-                offset commits; 0.8.0 is what is left. If set to 'auto', will
-                attempt to infer the broker version by probing various APIs.
-                Default: auto
-
-        Configuration parameters are described in more detail at
-        https://kafka.apache.org/090/configuration.html#newconsumerconfigs
-        """
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
             if key in configs:
@@ -204,20 +203,25 @@ class KafkaConsumer(six.Iterator):
     def assign(self, partitions):
         """Manually assign a list of TopicPartitions to this consumer.
 
-        This interface does not allow for incremental assignment and will
-        replace the previous assignment (if there was one).
-
-        Manual topic assignment through this method does not use the consumer's
-        group management functionality. As such, there will be no rebalance
-        operation triggered when group membership or cluster and topic metadata
-        change. Note that it is not possible to use both manual partition
-        assignment with assign() and group assignment with subscribe().
-
         Arguments:
             partitions (list of TopicPartition): assignment for this instance.
 
         Raises:
             IllegalStateError: if consumer has already called subscribe()
+
+        Warning:
+            It is not possible to use both manual partition assignment with
+            assign() and group assignment with subscribe().
+
+        Note:
+            This interface does not support incremental assignment and will
+            replace the previous assignment (if there was one).
+
+        Note:
+            Manual topic assignment through this method does not use the
+            consumer's group management functionality. As such, there will be
+            no rebalance operation triggered when group membership or cluster
+            and topic metadata change.
         """
         self._subscription.assign_from_user(partitions)
         self._client.set_topics([tp.topic for tp in partitions])
@@ -225,12 +229,12 @@
     def assignment(self):
         """Get the TopicPartitions currently assigned to this consumer.
 
-        If partitions were directly assigning using assign(), then this will
-        simply return the same partitions that were assigned.
-        If topics were subscribed to using subscribe(), then this will give the
+        If partitions were directly assigned using assign(), then this will
+        simply return the same partitions that were previously assigned.
+        If topics were subscribed using subscribe(), then this will give the
         set of topic partitions currently assigned to the consumer (which may
-        be none if the assignment hasn't happened yet, or the partitions are in
-        the process of getting reassigned).
+        be none if the assignment hasn't happened yet, or if the partitions are
+        in the process of being reassigned).
 
         Returns:
             set: {TopicPartition, ...}
@@ -654,31 +658,25 @@ class KafkaConsumer(six.Iterator):
 
     # old KafkaConsumer methods are deprecated
 
     def configure(self, **configs):
-        """DEPRECATED -- initialize a new consumer"""
         raise NotImplementedError(
             'deprecated -- initialize a new consumer')
 
     def set_topic_partitions(self, *topics):
-        """DEPRECATED -- use subscribe() or assign()"""
         raise NotImplementedError(
             'deprecated -- use subscribe() or assign()')
 
     def fetch_messages(self):
-        """DEPRECATED -- use poll() or iterator interface"""
         raise NotImplementedError(
             'deprecated -- use poll() or iterator interface')
 
     def get_partition_offsets(self, topic, partition,
                               request_time_ms, max_num_offsets):
-        """DEPRECATED -- send OffsetRequest with KafkaClient"""
         raise NotImplementedError(
             'deprecated -- send an OffsetRequest with KafkaClient')
 
     def offsets(self, group=None):
-        """DEPRECATED -- use committed(partition)"""
         raise NotImplementedError('deprecated -- use committed(partition)')
 
     def task_done(self, message):
-        """DEPRECATED -- commit manually if needed"""
         raise NotImplementedError(
             'deprecated -- commit offsets manually if needed')