diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-01 01:48:18 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-09-01 18:02:41 -0700 |
commit | 18ac14860791db2382c3e62715f11a6f657f265a (patch) | |
tree | 0616db85b4c8ca4bb3c9f7fb8d4c6a7ad9b63dcc /kafka/client.py | |
parent | eddd1436c226545237aa057c35719950702466ed (diff) | |
download | kafka-python-18ac14860791db2382c3e62715f11a6f657f265a.tar.gz |
Improve metadata protocol handling
- add MetadataRequest and MetadataResponse namedtuples
- add TopicMetadata namedtuple
- add error codes to Topic and Partition Metadata
- add KafkaClient.send_metadata_request() method
- KafkaProtocol.decode_metadata_response changed
to return a MetadataResponse object
so that it is consistent with server api: [broker_list, topic_list]
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 30 |
1 files changed, 21 insertions, 9 deletions
diff --git a/kafka/client.py b/kafka/client.py index 8630f66..e14694f 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -6,7 +6,7 @@ import logging import time import kafka.common -from kafka.common import (TopicAndPartition, +from kafka.common import (TopicAndPartition, BrokerMetadata, ConnectionError, FailedPayloadsError, PartitionUnavailableError, LeaderUnavailableError, KafkaUnavailableError, KafkaTimeoutError, @@ -83,20 +83,26 @@ class KafkaClient(object): """ return KafkaClient.ID_GEN.next() - def _send_broker_unaware_request(self, requestId, request): + def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn): """ Attempt to send a broker-agnostic request to one of the available brokers. Keep trying until you succeed. """ for (host, port) in self.hosts: + requestId = self._next_id() try: conn = self._get_conn(host, port) + request = encoder_fn(client_id=self.client_id, + correlation_id=requestId, + payloads=payloads) + conn.send(requestId, request) response = conn.recv(requestId) - return response + return decoder_fn(response) + except Exception as e: log.warning("Could not send request [%r] to server %s:%i, " - "trying next server: %s" % (request, host, port, e)) + "trying next server: %s" % (requestId, host, port, e)) raise KafkaUnavailableError("All servers failed to process request") @@ -246,13 +252,11 @@ class KafkaClient(object): Discover brokers and metadata for a set of topics. This function is called lazily whenever metadata is unavailable. """ - request_id = self._next_id() - request = KafkaProtocol.encode_metadata_request(self.client_id, - request_id, topics) - response = self._send_broker_unaware_request(request_id, request) + resp = self.send_metadata_request(topics) - (brokers, topics) = KafkaProtocol.decode_metadata_response(response) + brokers = dict([(broker.nodeId, broker) for broker in resp.brokers]) + topics = dict([(t.topic, dict([(p.partition, p) for p in t.partitions]) ) for t in resp.topics]) log.debug("Broker metadata: %s", brokers) log.debug("Topic metadata: %s", topics) @@ -276,6 +280,14 @@ class KafkaClient(object): else: self.topics_to_brokers[topic_part] = brokers[meta.leader] + def send_metadata_request(self, payloads=[], fail_on_error=True, + callback=None): + + encoder = KafkaProtocol.encode_metadata_request + decoder = KafkaProtocol.decode_metadata_response + + return self._send_broker_unaware_request(payloads, encoder, decoder) + def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None): """ |