summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-01 01:48:18 -0700
committerDana Powers <dana.powers@rd.io>2014-09-01 18:02:41 -0700
commit18ac14860791db2382c3e62715f11a6f657f265a (patch)
tree0616db85b4c8ca4bb3c9f7fb8d4c6a7ad9b63dcc /kafka/client.py
parenteddd1436c226545237aa057c35719950702466ed (diff)
downloadkafka-python-18ac14860791db2382c3e62715f11a6f657f265a.tar.gz
Improve metadata protocol handling
- add MetadataRequest and MetadataResponse namedtuples - add TopicMetadata namedtuple - add error codes to Topic and Partition Metadata - add KafkaClient.send_metadata_request() method - KafkaProtocol.decode_metadata_response changed to return a MetadataResponse object so that it is consistent with server api: [broker_list, topic_list]
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py30
1 files changed, 21 insertions, 9 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 8630f66..e14694f 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -6,7 +6,7 @@ import logging
import time
import kafka.common
-from kafka.common import (TopicAndPartition,
+from kafka.common import (TopicAndPartition, BrokerMetadata,
ConnectionError, FailedPayloadsError,
PartitionUnavailableError, LeaderUnavailableError, KafkaUnavailableError,
KafkaTimeoutError,
@@ -83,20 +83,26 @@ class KafkaClient(object):
"""
return KafkaClient.ID_GEN.next()
- def _send_broker_unaware_request(self, requestId, request):
+ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
"""
Attempt to send a broker-agnostic request to one of the available
brokers. Keep trying until you succeed.
"""
for (host, port) in self.hosts:
+ requestId = self._next_id()
try:
conn = self._get_conn(host, port)
+ request = encoder_fn(client_id=self.client_id,
+ correlation_id=requestId,
+ payloads=payloads)
+
conn.send(requestId, request)
response = conn.recv(requestId)
- return response
+ return decoder_fn(response)
+
except Exception as e:
log.warning("Could not send request [%r] to server %s:%i, "
- "trying next server: %s" % (request, host, port, e))
+ "trying next server: %s" % (requestId, host, port, e))
raise KafkaUnavailableError("All servers failed to process request")
@@ -246,13 +252,11 @@ class KafkaClient(object):
Discover brokers and metadata for a set of topics. This function is called
lazily whenever metadata is unavailable.
"""
- request_id = self._next_id()
- request = KafkaProtocol.encode_metadata_request(self.client_id,
- request_id, topics)
- response = self._send_broker_unaware_request(request_id, request)
+ resp = self.send_metadata_request(topics)
- (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
+ brokers = dict([(broker.nodeId, broker) for broker in resp.brokers])
+ topics = dict([(t.topic, dict([(p.partition, p) for p in t.partitions]) ) for t in resp.topics])
log.debug("Broker metadata: %s", brokers)
log.debug("Topic metadata: %s", topics)
@@ -276,6 +280,14 @@ class KafkaClient(object):
else:
self.topics_to_brokers[topic_part] = brokers[meta.leader]
+ def send_metadata_request(self, payloads=[], fail_on_error=True,
+ callback=None):
+
+ encoder = KafkaProtocol.encode_metadata_request
+ decoder = KafkaProtocol.decode_metadata_response
+
+ return self._send_broker_unaware_request(payloads, encoder, decoder)
+
def send_produce_request(self, payloads=[], acks=1, timeout=1000,
fail_on_error=True, callback=None):
"""