diff options
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 30 |
1 files changed, 21 insertions, 9 deletions
diff --git a/kafka/client.py b/kafka/client.py index 8630f66..e14694f 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -6,7 +6,7 @@ import logging import time import kafka.common -from kafka.common import (TopicAndPartition, +from kafka.common import (TopicAndPartition, BrokerMetadata, ConnectionError, FailedPayloadsError, PartitionUnavailableError, LeaderUnavailableError, KafkaUnavailableError, KafkaTimeoutError, @@ -83,20 +83,26 @@ class KafkaClient(object): """ return KafkaClient.ID_GEN.next() - def _send_broker_unaware_request(self, requestId, request): + def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn): """ Attempt to send a broker-agnostic request to one of the available brokers. Keep trying until you succeed. """ for (host, port) in self.hosts: + requestId = self._next_id() try: conn = self._get_conn(host, port) + request = encoder_fn(client_id=self.client_id, + correlation_id=requestId, + payloads=payloads) + conn.send(requestId, request) response = conn.recv(requestId) - return response + return decoder_fn(response) + except Exception as e: log.warning("Could not send request [%r] to server %s:%i, " - "trying next server: %s" % (request, host, port, e)) + "trying next server: %s" % (requestId, host, port, e)) raise KafkaUnavailableError("All servers failed to process request") @@ -246,13 +252,11 @@ class KafkaClient(object): Discover brokers and metadata for a set of topics. This function is called lazily whenever metadata is unavailable. """ - request_id = self._next_id() - request = KafkaProtocol.encode_metadata_request(self.client_id, - request_id, topics) - response = self._send_broker_unaware_request(request_id, request) + resp = self.send_metadata_request(topics) - (brokers, topics) = KafkaProtocol.decode_metadata_response(response) + brokers = dict([(broker.nodeId, broker) for broker in resp.brokers]) + topics = dict([(t.topic, dict([(p.partition, p) for p in t.partitions]) ) for t in resp.topics]) log.debug("Broker metadata: %s", brokers) log.debug("Topic metadata: %s", topics) @@ -276,6 +280,14 @@ class KafkaClient(object): else: self.topics_to_brokers[topic_part] = brokers[meta.leader] + def send_metadata_request(self, payloads=[], fail_on_error=True, + callback=None): + + encoder = KafkaProtocol.encode_metadata_request + decoder = KafkaProtocol.decode_metadata_response + + return self._send_broker_unaware_request(payloads, encoder, decoder) + def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None): """ |