-rw-r--r--   kafka/client.py          |  39
-rw-r--r--   kafka/codec.py           |  32
-rw-r--r--   kafka/conn.py            |  11
-rw-r--r--   kafka/consumer/group.py  | 256
-rw-r--r--   kafka/producer/base.py   |  53
5 files changed, 209 insertions(+), 182 deletions(-)
diff --git a/kafka/client.py b/kafka/client.py
index 2f070cd..14e71bb 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -455,31 +455,28 @@ class KafkaClient(object):
time.sleep(.5) def load_metadata_for_topics(self, *topics): - """ - Fetch broker and topic-partition metadata from the server, - and update internal data: - broker list, topic/partition list, and topic/partition -> broker map + """Fetch broker and topic-partition metadata from the server. + + Updates internal data: broker list, topic/partition list, and + topic/partition -> broker map. This method should be called after + receiving any error. - This method should be called after receiving any error + Note: Exceptions *will not* be raised in a full refresh (i.e. no topic + list). In this case, error codes will be logged as errors. + Partition-level errors will also not be raised here (a single partition + w/o a leader, for example). Arguments: *topics (optional): If a list of topics is provided, - the metadata refresh will be limited to the specified topics only. - - Exceptions: - ---------- - If the broker is configured to not auto-create topics, - expect UnknownTopicOrPartitionError for topics that don't exist - - If the broker is configured to auto-create topics, - expect LeaderNotAvailableError for new topics - until partitions have been initialized. - - Exceptions *will not* be raised in a full refresh (i.e. no topic list) - In this case, error codes will be logged as errors - - Partition-level errors will also not be raised here - (a single partition w/o a leader, for example) + the metadata refresh will be limited to the specified topics + only. + + Raises: + UnknownTopicOrPartitionError: Raised for topics that do not exist, + unless the broker is configured to auto-create topics. + LeaderNotAvailableError: Raised for topics that do not exist yet, + when the broker is configured to auto-create topics. Retry + after a short backoff (topics/partitions are initializing). """ if topics: self.reset_topic_metadata(*topics)
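The reworked Raises: section above implies a retry loop when the broker auto-creates topics. A minimal sketch of that pattern (the client import path, topic name, and backoff interval are illustrative assumptions, not part of this diff):

    import time

    from kafka.client import KafkaClient
    from kafka.common import LeaderNotAvailableError, UnknownTopicOrPartitionError

    client = KafkaClient('localhost:9092')
    for _ in range(5):
        try:
            client.load_metadata_for_topics('example-topic')  # hypothetical topic
            break
        except LeaderNotAvailableError:
            # topic is auto-creating; partitions are still initializing
            time.sleep(0.5)
        except UnknownTopicOrPartitionError:
            # auto-creation is disabled and the topic does not exist
            raise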
diff --git a/kafka/codec.py b/kafka/codec.py
index a9373c7..c27d89b 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -55,24 +55,30 @@ def gzip_decode(payload):
return result -def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024): - """Encodes the given data with snappy if xerial_compatible is set then the - stream is encoded in a fashion compatible with the xerial snappy library +def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32*1024): + """Encodes the given data with snappy compression. + + If xerial_compatible is set then the stream is encoded in a fashion + compatible with the xerial snappy library. + + The block size (xerial_blocksize) controls how frequently the blocking occurs; + 32k is the default in the xerial library. + + The format winds up being: - The block size (xerial_blocksize) controls how frequent the blocking - occurs 32k is the default in the xerial library. - The format winds up being +-------------+------------+--------------+------------+--------------+ | Header | Block1 len | Block1 data | Blockn len | Blockn data | - |-------------+------------+--------------+------------+--------------| + +-------------+------------+--------------+------------+--------------+ | 16 bytes | BE int32 | snappy bytes | BE int32 | snappy bytes | +-------------+------------+--------------+------------+--------------+ - It is important to not that the blocksize is the amount of uncompressed - data presented to snappy at each block, whereas the blocklen is the - number of bytes that will be present in the stream, that is the - length will always be <= blocksize. + + It is important to note that the blocksize is the amount of uncompressed + data presented to snappy at each block, whereas the blocklen is the number + of bytes that will be present in the stream; so the length will always be + <= blocksize. + """ if not has_snappy():
@@ -109,9 +115,9 @@ def _detect_xerial_stream(payload):
This mode writes a magic header of the format: +--------+--------------+------------+---------+--------+ | Marker | Magic String | Null / Pad | Version | Compat | - |--------+--------------+------------+---------+--------| + +--------+--------------+------------+---------+--------+ | byte | c-string | byte | int32 | int32 | - |--------+--------------+------------+---------+--------| + +--------+--------------+------------+---------+--------+ | -126 | 'SNAPPY' | \0 | | | +--------+--------------+------------+---------+--------+
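The block framing described in the tables above is easy to illustrate. This is only an illustrative sketch (it assumes the python-snappy package and hard-codes the 16-byte header fields shown in the _detect_xerial_stream table); see kafka/codec.py for the real implementation:

    import struct

    import snappy  # python-snappy

    def xerial_frame(payload, blocksize=32 * 1024):
        # marker byte (-126), 'SNAPPY' magic, null pad, version, compat -> 16 bytes
        header = struct.pack('!bccccccBii',
                             -126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
        chunks = [header]
        for i in range(0, len(payload), blocksize):
            block = snappy.compress(payload[i:i + blocksize])
            chunks.append(struct.pack('!i', len(block)))  # BE int32 block length
            chunks.append(block)                          # blocklen <= blocksize
        return b''.join(chunks)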
diff --git a/kafka/conn.py b/kafka/conn.py
index 9e8a16f..6ee5f5f 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -126,9 +126,17 @@ class BrokerConnection(object):
return False def connected(self): + """Return True iff socket is connected.""" return self.state is ConnectionStates.CONNECTED def close(self, error=None): + """Close socket and fail all in-flight-requests. + + Arguments: + error (Exception, optional): pending in-flight-requests + will be failed with this exception. + Default: kafka.common.ConnectionError. + """ if self._sock: self._sock.close() self._sock = None
@@ -189,11 +197,12 @@ class BrokerConnection(object):
return future def can_send_more(self): + """Return True unless there are max_in_flight_requests.""" max_ifrs = self.config['max_in_flight_requests_per_connection'] return len(self.in_flight_requests) < max_ifrs def recv(self, timeout=0): - """Non-blocking network receive + """Non-blocking network receive. Return response if available """
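A rough usage sketch for the newly documented BrokerConnection helpers; the constructor arguments and the polling loop are assumptions for illustration (the connection is normally driven internally by the client rather than used directly):

    from kafka.conn import BrokerConnection

    conn = BrokerConnection('localhost', 9092, request_timeout_ms=40000)
    while not conn.connected():   # connect() is non-blocking; poll until connected
        conn.connect()
    print(conn.can_send_more())   # True while in-flight requests < max allowed
    conn.close()                  # pending in-flight requests fail with ConnectionError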
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index bd9d03d..9ce1438 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -18,7 +18,114 @@ log = logging.getLogger(__name__)
class KafkaConsumer(six.Iterator): - """Consumer for Kafka 0.9""" + """Consume records from a Kafka cluster. + + The consumer will transparently handle the failure of servers in the Kafka + cluster, and adapt as topic-partitions are created or migrate between + brokers. It also interacts with the assigned kafka Group Coordinator node + to allow multiple consumers to load balance consumption of topics (requires + kafka >= 0.9.0.0). + + Arguments: + *topics (str): optional list of topics to subscribe to. If not set, + call subscribe() or assign() before consuming records. + + Keyword Arguments: + bootstrap_servers: 'host[:port]' string (or list of 'host[:port]' + strings) that the consumer should contact to bootstrap initial + cluster metadata. This does not have to be the full node list. + It just needs to have at least one broker that will respond to a + Metadata API Request. Default port is 9092. If no servers are + specified, will default to localhost:9092. + client_id (str): a name for this client. This string is passed in + each request to servers and can be used to identify specific + server-side log entries that correspond to this client. Also + submitted to GroupCoordinator for logging with respect to + consumer group administration. Default: 'kafka-python-{version}' + group_id (str): name of the consumer group to join for dynamic + partition assignment (if enabled), and to use for fetching and + committing offsets. Default: 'kafka-python-default-group' + key_deserializer (callable): Any callable that takes a + raw message key and returns a deserialized key. + value_deserializer (callable, optional): Any callable that takes a + raw message value and returns a deserialized value. + fetch_min_bytes (int): Minimum amount of data the server should + return for a fetch request, otherwise wait up to + fetch_max_wait_ms for more data to accumulate. Default: 1024. + fetch_max_wait_ms (int): The maximum amount of time in milliseconds + the server will block before answering the fetch request if + there isn't sufficient data to immediately satisfy the + requirement given by fetch_min_bytes. Default: 500. + max_partition_fetch_bytes (int): The maximum amount of data + per-partition the server will return. The maximum total memory + used for a request = #partitions * max_partition_fetch_bytes. + This size must be at least as large as the maximum message size + the server allows or else it is possible for the producer to + send messages larger than the consumer can fetch. If that + happens, the consumer can get stuck trying to fetch a large + message on a certain partition. Default: 1048576. + request_timeout_ms (int): Client request timeout in milliseconds. + Default: 40000. + retry_backoff_ms (int): Milliseconds to backoff when retrying on + errors. Default: 100. + reconnect_backoff_ms (int): The amount of time in milliseconds to + wait before attempting to reconnect to a given host. + Default: 50. + max_in_flight_requests_per_connection (int): Requests are pipelined + to kafka brokers up to this number of maximum requests per + broker connection. Default: 5. + auto_offset_reset (str): A policy for resetting offsets on + OffsetOutOfRange errors: 'earliest' will move to the oldest + available message, 'latest' will move to the most recent. Any + other value will raise the exception. Default: 'latest'. + enable_auto_commit (bool): If true the consumer's offset will be + periodically committed in the background. Default: True. + auto_commit_interval_ms (int): milliseconds between automatic + offset commits, if enable_auto_commit is True. Default: 5000. + default_offset_commit_callback (callable): called as + callback(offsets, response) response will be either an Exception + or a OffsetCommitResponse struct. This callback can be used to + trigger custom actions when a commit request completes. + check_crcs (bool): Automatically check the CRC32 of the records + consumed. This ensures no on-the-wire or on-disk corruption to + the messages occurred. This check adds some overhead, so it may + be disabled in cases seeking extreme performance. Default: True + metadata_max_age_ms (int): The period of time in milliseconds after + which we force a refresh of metadata even if we haven't seen any + partition leadership changes to proactively discover any new + brokers or partitions. Default: 300000 + partition_assignment_strategy (list): List of objects to use to + distribute partition ownership amongst consumer instances when + group management is used. Default: [RoundRobinPartitionAssignor] + heartbeat_interval_ms (int): The expected time in milliseconds + between heartbeats to the consumer coordinator when using + Kafka's group management feature. Heartbeats are used to ensure + that the consumer's session stays active and to facilitate + rebalancing when new consumers join or leave the group. The + value must be set lower than session_timeout_ms, but typically + should be set no higher than 1/3 of that value. It can be + adjusted even lower to control the expected time for normal + rebalances. Default: 3000 + session_timeout_ms (int): The timeout used to detect failures when + using Kafka's group management facilities. Default: 30000 + send_buffer_bytes (int): The size of the TCP send buffer + (SO_SNDBUF) to use when sending data. Default: 131072 + receive_buffer_bytes (int): The size of the TCP receive buffer + (SO_RCVBUF) to use when reading data. Default: 32768 + consumer_timeout_ms (int): number of milliseconds to throw a timeout + exception to the consumer if no message is available for + consumption. Default: -1 (don't throw exception) + api_version (str): specify which kafka API version to use. + 0.9 enables full group coordination features; 0.8.2 enables + kafka-storage offset commits; 0.8.1 enables zookeeper-storage + offset commits; 0.8.0 is what is left. If set to 'auto', will + attempt to infer the broker version by probing various APIs. + Default: auto + + Note: + Configuration parameters are described in more detail at + https://kafka.apache.org/090/configuration.html#newconsumerconfigs + """ DEFAULT_CONFIG = { 'bootstrap_servers': 'localhost', 'client_id': 'kafka-python-' + __version__,
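A minimal construction example using a few of the options documented in the new class docstring; the topic name and bootstrap servers are placeholders:

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'example-topic',                               # placeholder topic
        bootstrap_servers='localhost:9092',
        group_id='kafka-python-default-group',
        auto_offset_reset='earliest',
        value_deserializer=lambda v: v.decode('utf-8'),
        consumer_timeout_ms=10000,                     # stop iterating when idle 10s
    )
    for message in consumer:
        print(message.topic, message.partition, message.offset, message.value)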
@@ -51,114 +158,6 @@ class KafkaConsumer(six.Iterator):
} def __init__(self, *topics, **configs): - """A Kafka client that consumes records from a Kafka cluster. - - The consumer will transparently handle the failure of servers in the - Kafka cluster, and transparently adapt as partitions of data it fetches - migrate within the cluster. This client also interacts with the server - to allow groups of consumers to load balance consumption using consumer - groups. - - Requires Kafka Server >= 0.9.0.0 - - Configuration settings can be passed to constructor as kwargs, - otherwise defaults will be used: - - Keyword Arguments: - bootstrap_servers: 'host[:port]' string (or list of 'host[:port]' - strings) that the consumer should contact to bootstrap initial - cluster metadata. This does not have to be the full node list. - It just needs to have at least one broker that will respond to a - Metadata API Request. Default port is 9092. If no servers are - specified, will default to localhost:9092. - client_id (str): a name for this client. This string is passed in - each request to servers and can be used to identify specific - server-side log entries that correspond to this client. Also - submitted to GroupCoordinator for logging with respect to - consumer group administration. Default: 'kafka-python-{version}' - group_id (str): name of the consumer group to join for dynamic - partition assignment (if enabled), and to use for fetching and - committing offsets. Default: 'kafka-python-default-group' - key_deserializer (callable): Any callable that takes a - raw message key and returns a deserialized key. - value_deserializer (callable, optional): Any callable that takes a - raw message value and returns a deserialized value. - fetch_min_bytes (int): Minimum amount of data the server should - return for a fetch request, otherwise wait up to - fetch_max_wait_ms for more data to accumulate. Default: 1024. - fetch_max_wait_ms (int): The maximum amount of time in milliseconds - the server will block before answering the fetch request if - there isn't sufficient data to immediately satisfy the - requirement given by fetch_min_bytes. Default: 500. - max_partition_fetch_bytes (int): The maximum amount of data - per-partition the server will return. The maximum total memory - used for a request = #partitions * max_partition_fetch_bytes. - This size must be at least as large as the maximum message size - the server allows or else it is possible for the producer to - send messages larger than the consumer can fetch. If that - happens, the consumer can get stuck trying to fetch a large - message on a certain partition. Default: 1048576. - request_timeout_ms (int): Client request timeout in milliseconds. - Default: 40000. - retry_backoff_ms (int): Milliseconds to backoff when retrying on - errors. Default: 100. - reconnect_backoff_ms (int): The amount of time in milliseconds to - wait before attempting to reconnect to a given host. - Default: 50. - max_in_flight_requests_per_connection (int): Requests are pipelined - to kafka brokers up to this number of maximum requests per - broker connection. Default: 5. - auto_offset_reset (str): A policy for resetting offsets on - OffsetOutOfRange errors: 'earliest' will move to the oldest - available message, 'latest' will move to the most recent. Any - ofther value will raise the exception. Default: 'latest'. - enable_auto_commit (bool): If true the consumer's offset will be - periodically committed in the background. Default: True. - auto_commit_interval_ms (int): milliseconds between automatic - offset commits, if enable_auto_commit is True. Default: 5000. - default_offset_commit_callback (callable): called as - callback(offsets, response) response will be either an Exception - or a OffsetCommitResponse struct. This callback can be used to - trigger custom actions when a commit request completes. - check_crcs (bool): Automatically check the CRC32 of the records - consumed. This ensures no on-the-wire or on-disk corruption to - the messages occurred. This check adds some overhead, so it may - be disabled in cases seeking extreme performance. Default: True - metadata_max_age_ms (int): The period of time in milliseconds after - which we force a refresh of metadata even if we haven't seen any - partition leadership changes to proactively discover any new - brokers or partitions. Default: 300000 - partition_assignment_strategy (list): List of objects to use to - distribute partition ownership amongst consumer instances when - group management is used. Default: [RoundRobinPartitionAssignor] - heartbeat_interval_ms (int): The expected time in milliseconds - between heartbeats to the consumer coordinator when using - Kafka's group management feature. Heartbeats are used to ensure - that the consumer's session stays active and to facilitate - rebalancing when new consumers join or leave the group. The - value must be set lower than session_timeout_ms, but typically - should be set no higher than 1/3 of that value. It can be - adjusted even lower to control the expected time for normal - rebalances. Default: 3000 - session_timeout_ms (int): The timeout used to detect failures when - using Kafka's group managementment facilities. Default: 30000 - send_buffer_bytes (int): The size of the TCP send buffer - (SO_SNDBUF) to use when sending data. Default: 131072 - receive_buffer_bytes (int): The size of the TCP receive buffer - (SO_RCVBUF) to use when reading data. Default: 32768 - consumer_timeout_ms (int): number of millisecond to throw a timeout - exception to the consumer if no message is available for - consumption. Default: -1 (dont throw exception) - api_version (str): specify which kafka API version to use. - 0.9 enables full group coordination features; 0.8.2 enables - kafka-storage offset commits; 0.8.1 enables zookeeper-storage - offset commits; 0.8.0 is what is left. If set to 'auto', will - attempt to infer the broker version by probing various APIs. - Default: auto - - Configuration parameters are described in more detail at - https://kafka.apache.org/090/configuration.html#newconsumerconfigs - """ self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: if key in configs:
@@ -204,20 +203,25 @@ class KafkaConsumer(six.Iterator):
def assign(self, partitions): """Manually assign a list of TopicPartitions to this consumer. - This interface does not allow for incremental assignment and will - replace the previous assignment (if there was one). - - Manual topic assignment through this method does not use the consumer's - group management functionality. As such, there will be no rebalance - operation triggered when group membership or cluster and topic metadata - change. Note that it is not possible to use both manual partition - assignment with assign() and group assignment with subscribe(). - Arguments: partitions (list of TopicPartition): assignment for this instance. Raises: IllegalStateError: if consumer has already called subscribe() + + Warning: + It is not possible to use both manual partition assignment with + assign() and group assignment with subscribe(). + + Note: + This interface does not support incremental assignment and will + replace the previous assignment (if there was one). + + Note: + Manual topic assignment through this method does not use the + consumer's group management functionality. As such, there will be + no rebalance operation triggered when group membership or cluster + and topic metadata change. """ self._subscription.assign_from_user(partitions) self._client.set_topics([tp.topic for tp in partitions])
@@ -225,12 +229,12 @@ class KafkaConsumer(six.Iterator):
def assignment(self): """Get the TopicPartitions currently assigned to this consumer. - If partitions were directly assigning using assign(), then this will - simply return the same partitions that were assigned. - If topics were subscribed to using subscribe(), then this will give the + If partitions were directly assigned using assign(), then this will + simply return the same partitions that were previously assigned. + If topics were subscribed using subscribe(), then this will give the set of topic partitions currently assigned to the consumer (which may - be none if the assignment hasn't happened yet, or the partitions are in - the process of getting reassigned). + be none if the assignment hasn't happened yet, or if the partitions are + in the process of being reassigned). Returns: set: {TopicPartition, ...}
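Manual assignment per the new assign() notes, shown as a short sketch; the topic name, partition numbers, and the TopicPartition import path are assumptions for illustration:

    from kafka import KafkaConsumer
    from kafka.common import TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    consumer.assign([TopicPartition('example-topic', 0),
                     TopicPartition('example-topic', 1)])
    print(consumer.assignment())   # the same set of TopicPartitions just assigned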
@@ -654,31 +658,25 @@ class KafkaConsumer(six.Iterator):
# old KafkaConsumer methods are deprecated def configure(self, **configs): - """DEPRECATED -- initialize a new consumer""" raise NotImplementedError( 'deprecated -- initialize a new consumer') def set_topic_partitions(self, *topics): - """DEPRECATED -- use subscribe() or assign()""" raise NotImplementedError( 'deprecated -- use subscribe() or assign()') def fetch_messages(self): - """DEPRECATED -- use poll() or iterator interface""" raise NotImplementedError( 'deprecated -- use poll() or iterator interface') def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets): - """DEPRECATED -- send OffsetRequest with KafkaClient""" raise NotImplementedError( 'deprecated -- send an OffsetRequest with KafkaClient') def offsets(self, group=None): - """DEPRECATED -- use committed(partition)""" raise NotImplementedError('deprecated -- use committed(partition)') def task_done(self, message): - """DEPRECATED -- commit manually if needed""" raise NotImplementedError( 'deprecated -- commit offsets manually if needed')
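The deprecated stubs above point to poll() and the iterator interface. A hedged sketch of poll(), which returns a dict mapping TopicPartition to a list of consumed records; the timeout, topic, and manual commit call are illustrative:

    from kafka import KafkaConsumer

    consumer = KafkaConsumer('example-topic',
                             bootstrap_servers='localhost:9092',
                             enable_auto_commit=False)
    records = consumer.poll(timeout_ms=500)   # {TopicPartition: [records], ...}
    for tp, messages in records.items():
        print(tp, len(messages))
    consumer.commit()   # commit offsets manually, replacing the old task_done()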
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 4f5edbc..506da83 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -61,7 +61,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
Arguments: queue (threading.Queue): the queue from which to get messages - client (KafkaClient): instance to use for communicating with brokers + client (kafka.SimpleClient): instance to use for communicating + with brokers codec (kafka.protocol.ALL_CODECS): compression codec to use batch_time (int): interval in seconds to send message batches batch_size (int): count of messages that will trigger an immediate send
@@ -225,9 +226,9 @@ class Producer(object):
Base class to be used by producers Arguments: - client (KafkaClient): instance to use for broker communications. - If async=True, the background thread will use client.copy(), - which is expected to return a thread-safe object. + client (kafka.SimpleClient): instance to use for broker + communications. If async=True, the background thread will use + client.copy(), which is expected to return a thread-safe object. codec (kafka.protocol.ALL_CODECS): compression codec to use. req_acks (int, optional): A value indicating the acknowledgements that the server must receive before responding to the request,
@@ -345,20 +346,36 @@ class Producer(object):
self.sync_fail_on_error = sync_fail_on_error def send_messages(self, topic, partition, *msg): - """ - Helper method to send produce requests - @param: topic, name of topic for produce request -- type str - @param: partition, partition number for produce request -- type int - @param: *msg, one or more message payloads -- type bytes - @returns: ResponseRequest returned by server - raises on error - - Note that msg type *must* be encoded to bytes by user. - Passing unicode message will not work, for example - you should encode before calling send_messages via - something like `unicode_message.encode('utf-8')` - - All messages produced via this method will set the message 'key' to Null + """Helper method to send produce requests. + + Note that msg type *must* be encoded to bytes by user. Passing unicode + message will not work, for example you should encode before calling + send_messages via something like `unicode_message.encode('utf-8')`. + All messages will set the message 'key' to None. + + Arguments: + topic (str): name of topic for produce request + partition (int): partition number for produce request + *msg (bytes): one or more message payloads + + Returns: + ResponseRequest returned by server + + Raises: + FailedPayloadsError: low-level connection error, can be caused by + networking failures, or a malformed request. + ConnectionError: + KafkaUnavailableError: all known brokers are down when attempting + to refresh metadata. + LeaderNotAvailableError: topic or partition is initializing or + a broker failed and leadership election is in progress. + NotLeaderForPartitionError: metadata is out of sync; the broker + that the request was sent to is not the leader for the topic + or partition. + UnknownTopicOrPartitionError: the topic or partition has not + been created yet and auto-creation is not available. + AsyncProducerQueueFull: in async mode, if too many messages are + unsent and remain in the internal queue. """ return self._send_messages(topic, partition, *msg)
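A short usage sketch for send_messages() and the new Raises: list; SimpleProducer, the client construction, and the topic name are placeholders for whatever producer subclass is actually in use:

    from kafka import SimpleProducer
    from kafka.client import KafkaClient
    from kafka.common import FailedPayloadsError, LeaderNotAvailableError

    client = KafkaClient('localhost:9092')
    producer = SimpleProducer(client)
    try:
        # payloads must already be bytes; encode unicode before sending.
        # SimpleProducer picks the partition itself; the base Producer
        # method documented above also takes an explicit partition number.
        producer.send_messages('example-topic', u'hello kafka'.encode('utf-8'))
    except (FailedPayloadsError, LeaderNotAvailableError):
        client.load_metadata_for_topics('example-topic')  # refresh metadata, then retry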