summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-01 01:48:18 -0700
committerDana Powers <dana.powers@rd.io>2014-09-01 18:02:41 -0700
commit18ac14860791db2382c3e62715f11a6f657f265a (patch)
tree0616db85b4c8ca4bb3c9f7fb8d4c6a7ad9b63dcc /kafka
parenteddd1436c226545237aa057c35719950702466ed (diff)
downloadkafka-python-18ac14860791db2382c3e62715f11a6f657f265a.tar.gz
Improve metadata protocol handling
- Add MetadataRequest and MetadataResponse namedtuples.
- Add a TopicMetadata namedtuple.
- Add error codes to topic and partition metadata.
- Add a KafkaClient.send_metadata_request() method.
- Change KafkaProtocol.decode_metadata_response to return a MetadataResponse object so that it is consistent with the server API: [broker_list, topic_list].
Diffstat (limited to 'kafka')
-rw-r--r--kafka/client.py30
-rw-r--r--kafka/common.py30
-rw-r--r--kafka/protocol.py45
3 files changed, 69 insertions, 36 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 8630f66..e14694f 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -6,7 +6,7 @@ import logging
import time
import kafka.common
-from kafka.common import (TopicAndPartition,
+from kafka.common import (TopicAndPartition, BrokerMetadata,
ConnectionError, FailedPayloadsError,
PartitionUnavailableError, LeaderUnavailableError, KafkaUnavailableError,
KafkaTimeoutError,
@@ -83,20 +83,26 @@ class KafkaClient(object):
"""
return KafkaClient.ID_GEN.next()
- def _send_broker_unaware_request(self, requestId, request):
+ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
"""
Attempt to send a broker-agnostic request to one of the available
brokers. Keep trying until you succeed.
"""
for (host, port) in self.hosts:
+ requestId = self._next_id()
try:
conn = self._get_conn(host, port)
+ request = encoder_fn(client_id=self.client_id,
+ correlation_id=requestId,
+ payloads=payloads)
+
conn.send(requestId, request)
response = conn.recv(requestId)
- return response
+ return decoder_fn(response)
+
except Exception as e:
log.warning("Could not send request [%r] to server %s:%i, "
- "trying next server: %s" % (request, host, port, e))
+ "trying next server: %s" % (requestId, host, port, e))
raise KafkaUnavailableError("All servers failed to process request")
@@ -246,13 +252,11 @@ class KafkaClient(object):
Discover brokers and metadata for a set of topics. This function is called
lazily whenever metadata is unavailable.
"""
- request_id = self._next_id()
- request = KafkaProtocol.encode_metadata_request(self.client_id,
- request_id, topics)
- response = self._send_broker_unaware_request(request_id, request)
+ resp = self.send_metadata_request(topics)
- (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
+ brokers = dict([(broker.nodeId, broker) for broker in resp.brokers])
+ topics = dict([(t.topic, dict([(p.partition, p) for p in t.partitions]) ) for t in resp.topics])
log.debug("Broker metadata: %s", brokers)
log.debug("Topic metadata: %s", topics)
@@ -276,6 +280,14 @@ class KafkaClient(object):
else:
self.topics_to_brokers[topic_part] = brokers[meta.leader]
+ def send_metadata_request(self, payloads=[], fail_on_error=True,
+ callback=None):
+
+ encoder = KafkaProtocol.encode_metadata_request
+ decoder = KafkaProtocol.decode_metadata_response
+
+ return self._send_broker_unaware_request(payloads, encoder, decoder)
+
def send_produce_request(self, payloads=[], acks=1, timeout=1000,
fail_on_error=True, callback=None):
"""
diff --git a/kafka/common.py b/kafka/common.py
index 907e128..e1713cf 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -17,8 +17,14 @@ OffsetRequest = namedtuple("OffsetRequest",
OffsetCommitRequest = namedtuple("OffsetCommitRequest",
["topic", "partition", "offset", "metadata"])
+MetadataRequest = namedtuple("MetadataRequest",
+ ["topics"])
+
OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])
+MetadataResponse = namedtuple("MetadataResponse",
+ ["brokers", "topics"])
+
# Response payloads
ProduceResponse = namedtuple("ProduceResponse",
["topic", "partition", "error", "offset"])
@@ -36,16 +42,26 @@ OffsetFetchResponse = namedtuple("OffsetFetchResponse",
["topic", "partition", "offset",
"metadata", "error"])
-BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
-PartitionMetadata = namedtuple("PartitionMetadata",
- ["topic", "partition", "leader",
- "replicas", "isr"])
# Other useful structs
-OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"])
-Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
-TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
+BrokerMetadata = namedtuple("BrokerMetadata",
+ ["nodeId", "host", "port"])
+
+TopicMetadata = namedtuple("TopicMetadata",
+ ["topic", "error", "partitions"])
+
+PartitionMetadata = namedtuple("PartitionMetadata",
+ ["topic", "partition", "leader", "replicas", "isr", "error"])
+
+OffsetAndMessage = namedtuple("OffsetAndMessage",
+ ["offset", "message"])
+
+Message = namedtuple("Message",
+ ["magic", "attributes", "key", "value"])
+
+TopicAndPartition = namedtuple("TopicAndPartition",
+ ["topic", "partition"])
#################
diff --git a/kafka/protocol.py b/kafka/protocol.py
index 58661c7..db048aa 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -6,11 +6,12 @@ from kafka.codec import (
gzip_encode, gzip_decode, snappy_encode, snappy_decode
)
from kafka.common import (
- BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage,
- ProduceResponse, FetchResponse, OffsetResponse,
- OffsetCommitResponse, OffsetFetchResponse, ProtocolError,
- BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall,
- UnsupportedCodecError
+ Message, OffsetAndMessage, TopicAndPartition,
+ BrokerMetadata, TopicMetadata, PartitionMetadata,
+ MetadataResponse, ProduceResponse, FetchResponse,
+ OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
+ ProtocolError, BufferUnderflowError, ChecksumError,
+ ConsumerFetchSizeTooSmall, UnsupportedCodecError
)
from kafka.util import (
read_short_string, read_int_string, relative_unpack,
@@ -340,7 +341,8 @@ class KafkaProtocol(object):
yield OffsetResponse(topic, partition, error, tuple(offsets))
@classmethod
- def encode_metadata_request(cls, client_id, correlation_id, topics=None):
+ def encode_metadata_request(cls, client_id, correlation_id, topics=None,
+ payloads=None):
"""
Encode a MetadataRequest
@@ -350,7 +352,11 @@ class KafkaProtocol(object):
correlation_id: int
topics: list of strings
"""
- topics = [] if topics is None else topics
+ if payloads is None:
+ topics = [] if topics is None else topics
+ else:
+ topics = payloads
+
message = cls._encode_message_header(client_id, correlation_id,
KafkaProtocol.METADATA_KEY)
@@ -373,28 +379,24 @@ class KafkaProtocol(object):
((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0)
# Broker info
- brokers = {}
+ brokers = []
for i in range(numbrokers):
((nodeId, ), cur) = relative_unpack('>i', data, cur)
(host, cur) = read_short_string(data, cur)
((port,), cur) = relative_unpack('>i', data, cur)
- brokers[nodeId] = BrokerMetadata(nodeId, host, port)
+ brokers.append(BrokerMetadata(nodeId, host, port))
# Topic info
((num_topics,), cur) = relative_unpack('>i', data, cur)
- topic_metadata = {}
+ topic_metadata = []
for i in range(num_topics):
- # NOTE: topic_error is discarded. Should probably be returned with
- # the topic metadata.
((topic_error,), cur) = relative_unpack('>h', data, cur)
(topic_name, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
- partition_metadata = {}
+ partition_metadata = []
for j in range(num_partitions):
- # NOTE: partition_error_code is discarded. Should probably be
- # returned with the partition metadata.
((partition_error_code, partition, leader, numReplicas), cur) = \
relative_unpack('>hiii', data, cur)
@@ -404,13 +406,16 @@ class KafkaProtocol(object):
((num_isr,), cur) = relative_unpack('>i', data, cur)
(isr, cur) = relative_unpack('>%di' % num_isr, data, cur)
- partition_metadata[partition] = \
- PartitionMetadata(
- topic_name, partition, leader, replicas, isr)
+ partition_metadata.append(
+ PartitionMetadata(topic_name, partition, leader,
+ replicas, isr, partition_error_code)
+ )
- topic_metadata[topic_name] = partition_metadata
+ topic_metadata.append(
+ TopicMetadata(topic_name, topic_error, partition_metadata)
+ )
- return brokers, topic_metadata
+ return MetadataResponse(brokers, topic_metadata)
@classmethod
def encode_offset_commit_request(cls, client_id, correlation_id,