summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorEnrico Canzonieri <ecanzonieri@gmail.com>2015-01-26 15:01:51 -0800
committerEnrico Canzonieri <enrico@yelp.com>2015-01-26 15:01:51 -0800
commit9ab8415ed75b08c5de9f823708027bb4f10a0643 (patch)
treef2011cb5cbdc4d5cd3d9bff9c52b35c2a6aca2ad /kafka/client.py
parentf517ddf283a86947a15f95e5ec562e81f4c477e5 (diff)
parent587206ff6ad59ae01248d24ff9c9fadbdfc1c1fc (diff)
downloadkafka-python-9ab8415ed75b08c5de9f823708027bb4f10a0643.tar.gz
Merge branch 'master' of github.com:mumrah/kafka-python into validate_consumer_offset
Conflicts: kafka/consumer/simple.py
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py55
1 files changed, 30 insertions, 25 deletions
diff --git a/kafka/client.py b/kafka/client.py
index bc3d853..7b04e71 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -11,7 +11,7 @@ from kafka.common import (TopicAndPartition, BrokerMetadata,
ConnectionError, FailedPayloadsError,
KafkaTimeoutError, KafkaUnavailableError,
LeaderNotAvailableError, UnknownTopicOrPartitionError,
- NotLeaderForPartitionError)
+ NotLeaderForPartitionError, ReplicaNotAvailableError)
from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
@@ -131,19 +131,21 @@ class KafkaClient(object):
the leader broker for that partition using the supplied encode/decode
functions
- Params
- ======
+ Arguments:
+
payloads: list of object-like entities with a topic (str) and
- partition (int) attribute
+ partition (int) attribute
+
encode_fn: a method to encode the list of payloads to a request body,
- must accept client_id, correlation_id, and payloads as
- keyword arguments
+ must accept client_id, correlation_id, and payloads as
+ keyword arguments
+
decode_fn: a method to decode a response body into response objects.
- The response objects must be object-like and have topic
- and partition attributes
+ The response objects must be object-like and have topic
+ and partition attributes
+
+ Returns:
- Return
- ======
List of response objects in the same order as the supplied payloads
"""
@@ -285,9 +287,9 @@ class KafkaClient(object):
This method should be called after receiving any error
- @param: *topics (optional)
- If a list of topics is provided, the metadata refresh will be limited
- to the specified topics only.
+ Arguments:
+ *topics (optional): If a list of topics is provided,
+ the metadata refresh will be limited to the specified topics only.
Exceptions:
----------
@@ -350,6 +352,11 @@ class KafkaClient(object):
log.error('No leader for topic %s partition %d', topic, partition)
self.topics_to_brokers[topic_part] = None
continue
+ # If one of the replicas is unavailable -- ignore
+ # this error code is provided for admin purposes only
+ # we never talk to replicas, only the leader
+ except ReplicaNotAvailableError:
+ log.warning('Some (non-leader) replicas not available for topic %s partition %d', topic, partition)
# If Known Broker, topic_partition -> BrokerMetadata
if leader in self.brokers:
@@ -379,18 +386,16 @@ class KafkaClient(object):
sent to a specific broker. Output is a list of responses in the
same order as the list of payloads specified
- Params
- ======
- payloads: list of ProduceRequest
- fail_on_error: boolean, should we raise an Exception if we
- encounter an API error?
- callback: function, instead of returning the ProduceResponse,
- first pass it through this function
-
- Return
- ======
- list of ProduceResponse or callback(ProduceResponse), in the
- order of input payloads
+ Arguments:
+ payloads: list of ProduceRequest
+ fail_on_error: boolean, should we raise an Exception if we
+ encounter an API error?
+ callback: function, instead of returning the ProduceResponse,
+ first pass it through this function
+
+ Returns:
+ list of ProduceResponse or callback(ProduceResponse), in the
+ order of input payloads
"""
encoder = functools.partial(