diff options
author | Enrico Canzonieri <ecanzonieri@gmail.com> | 2015-01-26 15:01:51 -0800 |
---|---|---|
committer | Enrico Canzonieri <enrico@yelp.com> | 2015-01-26 15:01:51 -0800 |
commit | 9ab8415ed75b08c5de9f823708027bb4f10a0643 (patch) | |
tree | f2011cb5cbdc4d5cd3d9bff9c52b35c2a6aca2ad /kafka/client.py | |
parent | f517ddf283a86947a15f95e5ec562e81f4c477e5 (diff) | |
parent | 587206ff6ad59ae01248d24ff9c9fadbdfc1c1fc (diff) | |
download | kafka-python-9ab8415ed75b08c5de9f823708027bb4f10a0643.tar.gz |
Merge branch 'master' of github.com:mumrah/kafka-python into validate_consumer_offset
Conflicts:
kafka/consumer/simple.py
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 55 |
1 files changed, 30 insertions, 25 deletions
diff --git a/kafka/client.py b/kafka/client.py index bc3d853..7b04e71 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -11,7 +11,7 @@ from kafka.common import (TopicAndPartition, BrokerMetadata, ConnectionError, FailedPayloadsError, KafkaTimeoutError, KafkaUnavailableError, LeaderNotAvailableError, UnknownTopicOrPartitionError, - NotLeaderForPartitionError) + NotLeaderForPartitionError, ReplicaNotAvailableError) from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS from kafka.protocol import KafkaProtocol @@ -131,19 +131,21 @@ class KafkaClient(object): the leader broker for that partition using the supplied encode/decode functions - Params - ====== + Arguments: + payloads: list of object-like entities with a topic (str) and - partition (int) attribute + partition (int) attribute + encode_fn: a method to encode the list of payloads to a request body, - must accept client_id, correlation_id, and payloads as - keyword arguments + must accept client_id, correlation_id, and payloads as + keyword arguments + decode_fn: a method to decode a response body into response objects. - The response objects must be object-like and have topic - and partition attributes + The response objects must be object-like and have topic + and partition attributes + + Returns: - Return - ====== List of response objects in the same order as the supplied payloads """ @@ -285,9 +287,9 @@ class KafkaClient(object): This method should be called after receiving any error - @param: *topics (optional) - If a list of topics is provided, the metadata refresh will be limited - to the specified topics only. + Arguments: + *topics (optional): If a list of topics is provided, + the metadata refresh will be limited to the specified topics only. Exceptions: ---------- @@ -350,6 +352,11 @@ class KafkaClient(object): log.error('No leader for topic %s partition %d', topic, partition) self.topics_to_brokers[topic_part] = None continue + # If one of the replicas is unavailable -- ignore + # this error code is provided for admin purposes only + # we never talk to replicas, only the leader + except ReplicaNotAvailableError: + log.warning('Some (non-leader) replicas not available for topic %s partition %d', topic, partition) # If Known Broker, topic_partition -> BrokerMetadata if leader in self.brokers: @@ -379,18 +386,16 @@ class KafkaClient(object): sent to a specific broker. Output is a list of responses in the same order as the list of payloads specified - Params - ====== - payloads: list of ProduceRequest - fail_on_error: boolean, should we raise an Exception if we - encounter an API error? - callback: function, instead of returning the ProduceResponse, - first pass it through this function - - Return - ====== - list of ProduceResponse or callback(ProduceResponse), in the - order of input payloads + Arguments: + payloads: list of ProduceRequest + fail_on_error: boolean, should we raise an Exception if we + encounter an API error? + callback: function, instead of returning the ProduceResponse, + first pass it through this function + + Returns: + list of ProduceResponse or callback(ProduceResponse), in the + order of input payloads """ encoder = functools.partial( |