summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py30
1 files changed, 21 insertions, 9 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 8630f66..e14694f 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -6,7 +6,7 @@ import logging
import time
import kafka.common
-from kafka.common import (TopicAndPartition,
+from kafka.common import (TopicAndPartition, BrokerMetadata,
ConnectionError, FailedPayloadsError,
PartitionUnavailableError, LeaderUnavailableError, KafkaUnavailableError,
KafkaTimeoutError,
@@ -83,20 +83,26 @@ class KafkaClient(object):
"""
return KafkaClient.ID_GEN.next()
- def _send_broker_unaware_request(self, requestId, request):
+ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
"""
Attempt to send a broker-agnostic request to one of the available
brokers. Keep trying until you succeed.
"""
for (host, port) in self.hosts:
+ requestId = self._next_id()
try:
conn = self._get_conn(host, port)
+ request = encoder_fn(client_id=self.client_id,
+ correlation_id=requestId,
+ payloads=payloads)
+
conn.send(requestId, request)
response = conn.recv(requestId)
- return response
+ return decoder_fn(response)
+
except Exception as e:
log.warning("Could not send request [%r] to server %s:%i, "
- "trying next server: %s" % (request, host, port, e))
+ "trying next server: %s" % (requestId, host, port, e))
raise KafkaUnavailableError("All servers failed to process request")
@@ -246,13 +252,11 @@ class KafkaClient(object):
Discover brokers and metadata for a set of topics. This function is called
lazily whenever metadata is unavailable.
"""
- request_id = self._next_id()
- request = KafkaProtocol.encode_metadata_request(self.client_id,
- request_id, topics)
- response = self._send_broker_unaware_request(request_id, request)
+ resp = self.send_metadata_request(topics)
- (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
+ brokers = dict([(broker.nodeId, broker) for broker in resp.brokers])
+ topics = dict([(t.topic, dict([(p.partition, p) for p in t.partitions]) ) for t in resp.topics])
log.debug("Broker metadata: %s", brokers)
log.debug("Topic metadata: %s", topics)
@@ -276,6 +280,14 @@ class KafkaClient(object):
else:
self.topics_to_brokers[topic_part] = brokers[meta.leader]
+ def send_metadata_request(self, payloads=[], fail_on_error=True,
+ callback=None):
+
+ encoder = KafkaProtocol.encode_metadata_request
+ decoder = KafkaProtocol.decode_metadata_response
+
+ return self._send_broker_unaware_request(payloads, encoder, decoder)
+
def send_produce_request(self, payloads=[], acks=1, timeout=1000,
fail_on_error=True, callback=None):
"""