author    Dana Powers <dana.powers@rd.io>    2016-01-07 17:03:08 -0800
committer Dana Powers <dana.powers@rd.io>    2016-01-07 17:03:08 -0800
commit    e080c6b0cdb54563e3c5ad595d582de26561d9f0 (patch)
tree      0a3e686336667bb98c4be4bb324292fa29767a67
parent    c8deb0c276d57209006eebdd910017846860a38d (diff)
download  kafka-python-e080c6b0cdb54563e3c5ad595d582de26561d9f0.tar.gz
Docstring updates
-rw-r--r--  kafka/client.py            39
-rw-r--r--  kafka/codec.py             32
-rw-r--r--  kafka/conn.py              11
-rw-r--r--  kafka/consumer/group.py   256
-rw-r--r--  kafka/producer/base.py     53
5 files changed, 209 insertions, 182 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 2f070cd..14e71bb 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -455,31 +455,28 @@ class KafkaClient(object):
time.sleep(.5)
def load_metadata_for_topics(self, *topics):
- """
- Fetch broker and topic-partition metadata from the server,
- and update internal data:
- broker list, topic/partition list, and topic/parition -> broker map
+ """Fetch broker and topic-partition metadata from the server.
+
+ Updates internal data: broker list, topic/partition list, and
+ topic/partition -> broker map. This method should be called after
+ receiving any error.
- This method should be called after receiving any error
+ Note: Exceptions *will not* be raised in a full refresh (i.e. no topic
+ list). In this case, error codes will be logged as errors.
+ Partition-level errors will also not be raised here (a single partition
+ w/o a leader, for example).
Arguments:
*topics (optional): If a list of topics is provided,
- the metadata refresh will be limited to the specified topics only.
-
- Exceptions:
- ----------
- If the broker is configured to not auto-create topics,
- expect UnknownTopicOrPartitionError for topics that don't exist
-
- If the broker is configured to auto-create topics,
- expect LeaderNotAvailableError for new topics
- until partitions have been initialized.
-
- Exceptions *will not* be raised in a full refresh (i.e. no topic list)
- In this case, error codes will be logged as errors
-
- Partition-level errors will also not be raised here
- (a single partition w/o a leader, for example)
+ the metadata refresh will be limited to the specified topics
+ only.
+
+ Raises:
+ UnknownTopicOrPartitionError: Raised for topics that do not exist,
+ unless the broker is configured to auto-create topics.
+ LeaderNotAvailableError: Raised for topics that do not exist yet,
+ when the broker is configured to auto-create topics. Retry
+ after a short backoff (topics/partitions are initializing).
"""
if topics:
self.reset_topic_metadata(*topics)
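
The new docstring implies a retry pattern for metadata refresh. As a hedged illustration (the topic name and broker address are placeholders, and the client class is the KafkaClient shown in kafka/client.py above):

    import time

    from kafka.client import KafkaClient
    from kafka.common import LeaderNotAvailableError, UnknownTopicOrPartitionError

    client = KafkaClient('localhost:9092')
    try:
        client.load_metadata_for_topics('my-topic')
    except LeaderNotAvailableError:
        # Broker is auto-creating the topic; back off briefly, then retry.
        time.sleep(0.5)
        client.load_metadata_for_topics('my-topic')
    except UnknownTopicOrPartitionError:
        # Broker does not auto-create topics and 'my-topic' does not exist.
        raise
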
diff --git a/kafka/codec.py b/kafka/codec.py
index a9373c7..c27d89b 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -55,24 +55,30 @@ def gzip_decode(payload):
return result
-def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
- """Encodes the given data with snappy if xerial_compatible is set then the
- stream is encoded in a fashion compatible with the xerial snappy library
+def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32*1024):
+ """Encodes the given data with snappy compression.
+
+ If xerial_compatible is set then the stream is encoded in a fashion
+ compatible with the xerial snappy library.
+
+ The block size (xerial_blocksize) controls how frequently the blocking occurs;
+ 32k is the default in the xerial library.
+
+ The format winds up being:
- The block size (xerial_blocksize) controls how frequent the blocking
- occurs 32k is the default in the xerial library.
- The format winds up being
+-------------+------------+--------------+------------+--------------+
| Header | Block1 len | Block1 data | Blockn len | Blockn data |
- |-------------+------------+--------------+------------+--------------|
+ +-------------+------------+--------------+------------+--------------+
| 16 bytes | BE int32 | snappy bytes | BE int32 | snappy bytes |
+-------------+------------+--------------+------------+--------------+
- It is important to not that the blocksize is the amount of uncompressed
- data presented to snappy at each block, whereas the blocklen is the
- number of bytes that will be present in the stream, that is the
- length will always be <= blocksize.
+
+ It is important to note that the blocksize is the amount of uncompressed
+ data presented to snappy at each block, whereas the blocklen is the number
+ of bytes that will be present in the stream; so the length will always be
+ <= blocksize.
+
"""
if not has_snappy():
@@ -109,9 +115,9 @@ def _detect_xerial_stream(payload):
This mode writes a magic header of the format:
+--------+--------------+------------+---------+--------+
| Marker | Magic String | Null / Pad | Version | Compat |
- |--------+--------------+------------+---------+--------|
+ +--------+--------------+------------+---------+--------+
| byte | c-string | byte | int32 | int32 |
- |--------+--------------+------------+---------+--------|
+ +--------+--------------+------------+---------+--------+
| -126 | 'SNAPPY' | \0 | | |
+--------+--------------+------------+---------+--------+
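
As a rough usage sketch of the xerial framing described above (requires the optional python-snappy package; the payload and block size here are arbitrary):

    from kafka.codec import has_snappy, snappy_encode, snappy_decode

    if has_snappy():
        payload = b'x' * (100 * 1024)   # larger than one 32k block
        framed = snappy_encode(payload, xerial_compatible=True,
                               xerial_blocksize=32 * 1024)
        # The stream starts with the 16-byte xerial magic header shown above,
        # followed by length-prefixed snappy blocks.
        assert snappy_decode(framed) == payload
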
diff --git a/kafka/conn.py b/kafka/conn.py
index 9e8a16f..6ee5f5f 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -126,9 +126,17 @@ class BrokerConnection(object):
return False
def connected(self):
+ """Return True iff socket is connected."""
return self.state is ConnectionStates.CONNECTED
def close(self, error=None):
+ """Close socket and fail all in-flight-requests.
+
+ Arguments:
+ error (Exception, optional): pending in-flight-requests
+ will be failed with this exception.
+ Default: kafka.common.ConnectionError.
+ """
if self._sock:
self._sock.close()
self._sock = None
@@ -189,11 +197,12 @@ class BrokerConnection(object):
return future
def can_send_more(self):
+ """Return True unless there are max_in_flight_requests."""
max_ifrs = self.config['max_in_flight_requests_per_connection']
return len(self.in_flight_requests) < max_ifrs
def recv(self, timeout=0):
- """Non-blocking network receive
+ """Non-blocking network receive.
Return response if available
"""
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index bd9d03d..9ce1438 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -18,7 +18,114 @@ log = logging.getLogger(__name__)
class KafkaConsumer(six.Iterator):
- """Consumer for Kafka 0.9"""
+ """Consume records from a Kafka cluster.
+
+ The consumer will transparently handle the failure of servers in the Kafka
+ cluster, and adapt as topic-partitions are created or migrate between
+ brokers. It also interacts with the assigned kafka Group Coordinator node
+ to allow multiple consumers to load balance consumption of topics (requires
+ kafka >= 0.9.0.0).
+
+ Arguments:
+ *topics (str): optional list of topics to subscribe to. If not set,
+ call subscribe() or assign() before consuming records.
+
+ Keyword Arguments:
+ bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
+ strings) that the consumer should contact to bootstrap initial
+ cluster metadata. This does not have to be the full node list.
+ It just needs to have at least one broker that will respond to a
+ Metadata API Request. Default port is 9092. If no servers are
+ specified, will default to localhost:9092.
+ client_id (str): a name for this client. This string is passed in
+ each request to servers and can be used to identify specific
+ server-side log entries that correspond to this client. Also
+ submitted to GroupCoordinator for logging with respect to
+ consumer group administration. Default: 'kafka-python-{version}'
+ group_id (str): name of the consumer group to join for dynamic
+ partition assignment (if enabled), and to use for fetching and
+ committing offsets. Default: 'kafka-python-default-group'
+ key_deserializer (callable): Any callable that takes a
+ raw message key and returns a deserialized key.
+ value_deserializer (callable, optional): Any callable that takes a
+ raw message value and returns a deserialized value.
+ fetch_min_bytes (int): Minimum amount of data the server should
+ return for a fetch request, otherwise wait up to
+ fetch_max_wait_ms for more data to accumulate. Default: 1024.
+ fetch_max_wait_ms (int): The maximum amount of time in milliseconds
+ the server will block before answering the fetch request if
+ there isn't sufficient data to immediately satisfy the
+ requirement given by fetch_min_bytes. Default: 500.
+ max_partition_fetch_bytes (int): The maximum amount of data
+ per-partition the server will return. The maximum total memory
+ used for a request = #partitions * max_partition_fetch_bytes.
+ This size must be at least as large as the maximum message size
+ the server allows or else it is possible for the producer to
+ send messages larger than the consumer can fetch. If that
+ happens, the consumer can get stuck trying to fetch a large
+ message on a certain partition. Default: 1048576.
+ request_timeout_ms (int): Client request timeout in milliseconds.
+ Default: 40000.
+ retry_backoff_ms (int): Milliseconds to backoff when retrying on
+ errors. Default: 100.
+ reconnect_backoff_ms (int): The amount of time in milliseconds to
+ wait before attempting to reconnect to a given host.
+ Default: 50.
+ max_in_flight_requests_per_connection (int): Requests are pipelined
+ to kafka brokers up to this number of maximum requests per
+ broker connection. Default: 5.
+ auto_offset_reset (str): A policy for resetting offsets on
+ OffsetOutOfRange errors: 'earliest' will move to the oldest
+ available message, 'latest' will move to the most recent. Any
+ other value will raise the exception. Default: 'latest'.
+ enable_auto_commit (bool): If true the consumer's offset will be
+ periodically committed in the background. Default: True.
+ auto_commit_interval_ms (int): milliseconds between automatic
+ offset commits, if enable_auto_commit is True. Default: 5000.
+ default_offset_commit_callback (callable): called as
+ callback(offsets, response) response will be either an Exception
+ or a OffsetCommitResponse struct. This callback can be used to
+ trigger custom actions when a commit request completes.
+ check_crcs (bool): Automatically check the CRC32 of the records
+ consumed. This ensures no on-the-wire or on-disk corruption to
+ the messages occurred. This check adds some overhead, so it may
+ be disabled in cases seeking extreme performance. Default: True
+ metadata_max_age_ms (int): The period of time in milliseconds after
+ which we force a refresh of metadata even if we haven't seen any
+ partition leadership changes to proactively discover any new
+ brokers or partitions. Default: 300000
+ partition_assignment_strategy (list): List of objects to use to
+ distribute partition ownership amongst consumer instances when
+ group management is used. Default: [RoundRobinPartitionAssignor]
+ heartbeat_interval_ms (int): The expected time in milliseconds
+ between heartbeats to the consumer coordinator when using
+ Kafka's group management feature. Heartbeats are used to ensure
+ that the consumer's session stays active and to facilitate
+ rebalancing when new consumers join or leave the group. The
+ value must be set lower than session_timeout_ms, but typically
+ should be set no higher than 1/3 of that value. It can be
+ adjusted even lower to control the expected time for normal
+ rebalances. Default: 3000
+ session_timeout_ms (int): The timeout used to detect failures when
+ using Kafka's group management facilities. Default: 30000
+ send_buffer_bytes (int): The size of the TCP send buffer
+ (SO_SNDBUF) to use when sending data. Default: 131072
+ receive_buffer_bytes (int): The size of the TCP receive buffer
+ (SO_RCVBUF) to use when reading data. Default: 32768
+ consumer_timeout_ms (int): number of milliseconds to block before raising
+ a timeout exception if no message is available for
+ consumption. Default: -1 (don't raise an exception)
+ api_version (str): specify which kafka API version to use.
+ 0.9 enables full group coordination features; 0.8.2 enables
+ kafka-storage offset commits; 0.8.1 enables zookeeper-storage
+ offset commits; 0.8.0 is what is left. If set to 'auto', will
+ attempt to infer the broker version by probing various APIs.
+ Default: auto
+
+ Note:
+ Configuration parameters are described in more detail at
+ https://kafka.apache.org/090/configuration.html#newconsumerconfigs
+ """
DEFAULT_CONFIG = {
'bootstrap_servers': 'localhost',
'client_id': 'kafka-python-' + __version__,
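
A minimal consumption sketch using a few of the keyword arguments documented above; the broker address, group name, and topic are placeholders:

    from kafka import KafkaConsumer

    consumer = KafkaConsumer('my-topic',
                             bootstrap_servers='localhost:9092',
                             group_id='my-group',
                             auto_offset_reset='earliest',
                             value_deserializer=lambda m: m.decode('utf-8'))

    for message in consumer:
        # Each record carries topic, partition, offset, key, and value.
        print(message.topic, message.partition, message.offset, message.value)
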
@@ -51,114 +158,6 @@ class KafkaConsumer(six.Iterator):
}
def __init__(self, *topics, **configs):
- """A Kafka client that consumes records from a Kafka cluster.
-
- The consumer will transparently handle the failure of servers in the
- Kafka cluster, and transparently adapt as partitions of data it fetches
- migrate within the cluster. This client also interacts with the server
- to allow groups of consumers to load balance consumption using consumer
- groups.
-
- Requires Kafka Server >= 0.9.0.0
-
- Configuration settings can be passed to constructor as kwargs,
- otherwise defaults will be used:
-
- Keyword Arguments:
- bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
- strings) that the consumer should contact to bootstrap initial
- cluster metadata. This does not have to be the full node list.
- It just needs to have at least one broker that will respond to a
- Metadata API Request. Default port is 9092. If no servers are
- specified, will default to localhost:9092.
- client_id (str): a name for this client. This string is passed in
- each request to servers and can be used to identify specific
- server-side log entries that correspond to this client. Also
- submitted to GroupCoordinator for logging with respect to
- consumer group administration. Default: 'kafka-python-{version}'
- group_id (str): name of the consumer group to join for dynamic
- partition assignment (if enabled), and to use for fetching and
- committing offsets. Default: 'kafka-python-default-group'
- key_deserializer (callable): Any callable that takes a
- raw message key and returns a deserialized key.
- value_deserializer (callable, optional): Any callable that takes a
- raw message value and returns a deserialized value.
- fetch_min_bytes (int): Minimum amount of data the server should
- return for a fetch request, otherwise wait up to
- fetch_max_wait_ms for more data to accumulate. Default: 1024.
- fetch_max_wait_ms (int): The maximum amount of time in milliseconds
- the server will block before answering the fetch request if
- there isn't sufficient data to immediately satisfy the
- requirement given by fetch_min_bytes. Default: 500.
- max_partition_fetch_bytes (int): The maximum amount of data
- per-partition the server will return. The maximum total memory
- used for a request = #partitions * max_partition_fetch_bytes.
- This size must be at least as large as the maximum message size
- the server allows or else it is possible for the producer to
- send messages larger than the consumer can fetch. If that
- happens, the consumer can get stuck trying to fetch a large
- message on a certain partition. Default: 1048576.
- request_timeout_ms (int): Client request timeout in milliseconds.
- Default: 40000.
- retry_backoff_ms (int): Milliseconds to backoff when retrying on
- errors. Default: 100.
- reconnect_backoff_ms (int): The amount of time in milliseconds to
- wait before attempting to reconnect to a given host.
- Default: 50.
- max_in_flight_requests_per_connection (int): Requests are pipelined
- to kafka brokers up to this number of maximum requests per
- broker connection. Default: 5.
- auto_offset_reset (str): A policy for resetting offsets on
- OffsetOutOfRange errors: 'earliest' will move to the oldest
- available message, 'latest' will move to the most recent. Any
- ofther value will raise the exception. Default: 'latest'.
- enable_auto_commit (bool): If true the consumer's offset will be
- periodically committed in the background. Default: True.
- auto_commit_interval_ms (int): milliseconds between automatic
- offset commits, if enable_auto_commit is True. Default: 5000.
- default_offset_commit_callback (callable): called as
- callback(offsets, response) response will be either an Exception
- or a OffsetCommitResponse struct. This callback can be used to
- trigger custom actions when a commit request completes.
- check_crcs (bool): Automatically check the CRC32 of the records
- consumed. This ensures no on-the-wire or on-disk corruption to
- the messages occurred. This check adds some overhead, so it may
- be disabled in cases seeking extreme performance. Default: True
- metadata_max_age_ms (int): The period of time in milliseconds after
- which we force a refresh of metadata even if we haven't seen any
- partition leadership changes to proactively discover any new
- brokers or partitions. Default: 300000
- partition_assignment_strategy (list): List of objects to use to
- distribute partition ownership amongst consumer instances when
- group management is used. Default: [RoundRobinPartitionAssignor]
- heartbeat_interval_ms (int): The expected time in milliseconds
- between heartbeats to the consumer coordinator when using
- Kafka's group management feature. Heartbeats are used to ensure
- that the consumer's session stays active and to facilitate
- rebalancing when new consumers join or leave the group. The
- value must be set lower than session_timeout_ms, but typically
- should be set no higher than 1/3 of that value. It can be
- adjusted even lower to control the expected time for normal
- rebalances. Default: 3000
- session_timeout_ms (int): The timeout used to detect failures when
- using Kafka's group managementment facilities. Default: 30000
- send_buffer_bytes (int): The size of the TCP send buffer
- (SO_SNDBUF) to use when sending data. Default: 131072
- receive_buffer_bytes (int): The size of the TCP receive buffer
- (SO_RCVBUF) to use when reading data. Default: 32768
- consumer_timeout_ms (int): number of millisecond to throw a timeout
- exception to the consumer if no message is available for
- consumption. Default: -1 (dont throw exception)
- api_version (str): specify which kafka API version to use.
- 0.9 enables full group coordination features; 0.8.2 enables
- kafka-storage offset commits; 0.8.1 enables zookeeper-storage
- offset commits; 0.8.0 is what is left. If set to 'auto', will
- attempt to infer the broker version by probing various APIs.
- Default: auto
-
- Configuration parameters are described in more detail at
- https://kafka.apache.org/090/configuration.html#newconsumerconfigs
- """
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs:
@@ -204,20 +203,25 @@ class KafkaConsumer(six.Iterator):
def assign(self, partitions):
"""Manually assign a list of TopicPartitions to this consumer.
- This interface does not allow for incremental assignment and will
- replace the previous assignment (if there was one).
-
- Manual topic assignment through this method does not use the consumer's
- group management functionality. As such, there will be no rebalance
- operation triggered when group membership or cluster and topic metadata
- change. Note that it is not possible to use both manual partition
- assignment with assign() and group assignment with subscribe().
-
Arguments:
partitions (list of TopicPartition): assignment for this instance.
Raises:
IllegalStateError: if consumer has already called subscribe()
+
+ Warning:
+ It is not possible to use both manual partition assignment with
+ assign() and group assignment with subscribe().
+
+ Note:
+ This interface does not support incremental assignment and will
+ replace the previous assignment (if there was one).
+
+ Note:
+ Manual topic assignment through this method does not use the
+ consumer's group management functionality. As such, there will be
+ no rebalance operation triggered when group membership or cluster
+ and topic metadata change.
"""
self._subscription.assign_from_user(partitions)
self._client.set_topics([tp.topic for tp in partitions])
@@ -225,12 +229,12 @@ class KafkaConsumer(six.Iterator):
def assignment(self):
"""Get the TopicPartitions currently assigned to this consumer.
- If partitions were directly assigning using assign(), then this will
- simply return the same partitions that were assigned.
- If topics were subscribed to using subscribe(), then this will give the
+ If partitions were directly assigned using assign(), then this will
+ simply return the same partitions that were previously assigned.
+ If topics were subscribed using subscribe(), then this will give the
set of topic partitions currently assigned to the consumer (which may
- be none if the assignment hasn't happened yet, or the partitions are in
- the process of getting reassigned).
+ be none if the assignment hasn't happened yet, or if the partitions are
+ in the process of being reassigned).
Returns:
set: {TopicPartition, ...}
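
A hypothetical sketch of manual assignment as described in these two docstrings; TopicPartition is assumed to be importable from kafka.common at this point in the codebase:

    from kafka import KafkaConsumer
    from kafka.common import TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    partitions = [TopicPartition('my-topic', 0), TopicPartition('my-topic', 1)]
    consumer.assign(partitions)                  # replaces any previous assignment
    assert consumer.assignment() == set(partitions)
    # Do not also call subscribe(); manual and group assignment cannot be mixed.
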
@@ -654,31 +658,25 @@ class KafkaConsumer(six.Iterator):
# old KafkaConsumer methods are deprecated
def configure(self, **configs):
- """DEPRECATED -- initialize a new consumer"""
raise NotImplementedError(
'deprecated -- initialize a new consumer')
def set_topic_partitions(self, *topics):
- """DEPRECATED -- use subscribe() or assign()"""
raise NotImplementedError(
'deprecated -- use subscribe() or assign()')
def fetch_messages(self):
- """DEPRECATED -- use poll() or iterator interface"""
raise NotImplementedError(
'deprecated -- use poll() or iterator interface')
def get_partition_offsets(self, topic, partition,
request_time_ms, max_num_offsets):
- """DEPRECATED -- send OffsetRequest with KafkaClient"""
raise NotImplementedError(
'deprecated -- send an OffsetRequest with KafkaClient')
def offsets(self, group=None):
- """DEPRECATED -- use committed(partition)"""
raise NotImplementedError('deprecated -- use committed(partition)')
def task_done(self, message):
- """DEPRECATED -- commit manually if needed"""
raise NotImplementedError(
'deprecated -- commit offsets manually if needed')
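
For reference, a hedged sketch of the newer calls that replace these deprecated methods, assuming the poll()/commit()/committed() API of the 0.9 consumer; broker, group, and topic names are placeholders:

    from kafka import KafkaConsumer
    from kafka.common import TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                             group_id='my-group',
                             enable_auto_commit=False)
    consumer.subscribe(['my-topic'])          # instead of set_topic_partitions()

    records = consumer.poll(timeout_ms=500)   # instead of fetch_messages()
    for tp, messages in records.items():
        for message in messages:
            print(message.offset, message.value)

    consumer.commit()                         # instead of task_done()-style commits
    print(consumer.committed(TopicPartition('my-topic', 0)))  # instead of offsets()
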
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 4f5edbc..506da83 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -61,7 +61,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
Arguments:
queue (threading.Queue): the queue from which to get messages
- client (KafkaClient): instance to use for communicating with brokers
+ client (kafka.SimpleClient): instance to use for communicating
+ with brokers
codec (kafka.protocol.ALL_CODECS): compression codec to use
batch_time (int): interval in seconds to send message batches
batch_size (int): count of messages that will trigger an immediate send
@@ -225,9 +226,9 @@ class Producer(object):
Base class to be used by producers
Arguments:
- client (KafkaClient): instance to use for broker communications.
- If async=True, the background thread will use client.copy(),
- which is expected to return a thread-safe object.
+ client (kafka.SimpleClient): instance to use for broker
+ communications. If async=True, the background thread will use
+ client.copy(), which is expected to return a thread-safe object.
codec (kafka.protocol.ALL_CODECS): compression codec to use.
req_acks (int, optional): A value indicating the acknowledgements that
the server must receive before responding to the request,
@@ -345,20 +346,36 @@ class Producer(object):
self.sync_fail_on_error = sync_fail_on_error
def send_messages(self, topic, partition, *msg):
- """
- Helper method to send produce requests
- @param: topic, name of topic for produce request -- type str
- @param: partition, partition number for produce request -- type int
- @param: *msg, one or more message payloads -- type bytes
- @returns: ResponseRequest returned by server
- raises on error
-
- Note that msg type *must* be encoded to bytes by user.
- Passing unicode message will not work, for example
- you should encode before calling send_messages via
- something like `unicode_message.encode('utf-8')`
-
- All messages produced via this method will set the message 'key' to Null
+ """Helper method to send produce requests.
+
+ Note that msg type *must* be encoded to bytes by the user. Passing a
+ unicode message will not work; you should encode before calling
+ send_messages, for example via `unicode_message.encode('utf-8')`.
+ All messages will set the message 'key' to None.
+
+ Arguments:
+ topic (str): name of topic for produce request
+ partition (int): partition number for produce request
+ *msg (bytes): one or more message payloads
+
+ Returns:
+ ResponseRequest returned by server
+
+ Raises:
+ FailedPayloadsError: low-level connection error, can be caused by
+ networking failures, or a malformed request.
+ ConnectionError: network error while communicating with a broker.
+ KafkaUnavailableError: all known brokers are down when attempting
+ to refresh metadata.
+ LeaderNotAvailableError: topic or partition is initializing or
+ a broker failed and leadership election is in progress.
+ NotLeaderForPartitionError: metadata is out of sync; the broker
+ that the request was sent to is not the leader for the topic
+ or partition.
+ UnknownTopicOrPartitionError: the topic or partition has not
+ been created yet and auto-creation is not available.
+ AsyncProducerQueueFull: in async mode, if too many messages are
+ unsent and remain in the internal queue.
"""
return self._send_messages(topic, partition, *msg)
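
A rough synchronous-produce sketch for the method documented above. The broker address and topic are placeholders; the client class is imported under the KafkaClient name shown in kafka/client.py earlier in this diff (the docstring refers to it as kafka.SimpleClient):

    from kafka.client import KafkaClient
    from kafka.producer.base import Producer

    client = KafkaClient('localhost:9092')
    producer = Producer(client)

    # Payloads *must* already be bytes; encode unicode text before sending.
    msg = u'hello kafka'.encode('utf-8')
    responses = producer.send_messages('my-topic', 0, msg)   # partition 0
    print(responses)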