author | Dana Powers <dana.powers@rd.io> | 2014-09-08 13:15:56 -0700
---|---|---
committer | Dana Powers <dana.powers@rd.io> | 2014-09-08 13:17:43 -0700
commit | fff812ddc80243208233f785b3f005904cf33482 (patch) |
tree | 30f1eb703dcd8ce1063c413fd70ac11f3fff5072 /kafka |
parent | 42a7ab18bb84fea60deed5f7e3a6cfdfaaaeecd6 (diff) |
parent | 0dabb1fbe8a9f538527a03c2903475ed77a12c10 (diff) |
download | kafka-python-fff812ddc80243208233f785b3f005904cf33482.tar.gz |
Merge pull request #223 from dpkp/metadata_refactor
Metadata Refactor
* add MetadataRequest and MetadataResponse namedtuples
* add TopicMetadata namedtuple
* add error codes to Topic and Partition Metadata
* add KafkaClient.send_metadata_request() method
* KafkaProtocol.decode_metadata_response changed to return a
MetadataResponse object so that it is consistent with server api:
[broker_list, topic_list]
* raise server exceptions in load_metadata_for_topics(*topics)
unless topics is empty (full refresh)
* Replace non-standard exceptions (LeaderUnavailable,
PartitionUnavailable) with server standard exceptions
(LeaderNotAvailableError, UnknownTopicOrPartitionError)
Conflicts:
kafka/client.py
test/test_client.py
test/test_producer_integration.py
test/test_protocol.py
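
Below is a minimal sketch of the refactored metadata API described in the commit message. It is illustrative only: the broker address and topic name are placeholders, and the `KafkaClient` constructor call is assumed from this era of kafka-python rather than shown in this diff.

```python
# Hedged sketch: exercises the new MetadataResponse / TopicMetadata /
# PartitionMetadata namedtuples and KafkaClient.send_metadata_request().
# 'localhost:9092' and 'my-topic' are placeholder values.
from kafka.client import KafkaClient
from kafka.common import check_error

client = KafkaClient('localhost:9092')

# send_metadata_request() returns a MetadataResponse namedtuple,
# (brokers, topics), mirroring the server's [broker_list, topic_list].
resp = client.send_metadata_request(['my-topic'])

for broker in resp.brokers:
    print(broker.nodeId, broker.host, broker.port)

for topic_metadata in resp.topics:
    # Error codes now ride along on the metadata namedtuples;
    # check_error() raises the matching server exception unless NoError.
    check_error(topic_metadata)
    for partition_metadata in topic_metadata.partitions:
        print(partition_metadata.partition,
              partition_metadata.leader,
              partition_metadata.error)

client.close()
```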
Diffstat (limited to 'kafka')

-rw-r--r-- | kafka/client.py | 219
-rw-r--r-- | kafka/common.py | 46
-rw-r--r-- | kafka/consumer.py | 11
-rw-r--r-- | kafka/producer.py | 19
-rw-r--r-- | kafka/protocol.py | 45

5 files changed, 224 insertions, 116 deletions
```diff
diff --git a/kafka/client.py b/kafka/client.py
index a918091..8c78694 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -7,11 +7,11 @@ import logging
 import time
 import kafka.common
 
-from kafka.common import (TopicAndPartition,
+from kafka.common import (TopicAndPartition, BrokerMetadata,
                           ConnectionError, FailedPayloadsError,
-                          PartitionUnavailableError, LeaderUnavailableError, KafkaUnavailableError,
-                          KafkaTimeoutError,
-                          UnknownTopicOrPartitionError, NotLeaderForPartitionError)
+                          KafkaTimeoutError, KafkaUnavailableError,
+                          LeaderNotAvailableError, UnknownTopicOrPartitionError,
+                          NotLeaderForPartitionError)
 
 from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
 from kafka.protocol import KafkaProtocol
@@ -37,8 +37,9 @@ class KafkaClient(object):
         # create connections only when we need them
         self.conns = {}
         self.brokers = {}            # broker_id -> BrokerMetadata
-        self.topics_to_brokers = {}  # topic_id -> broker_id
-        self.topic_partitions = {}   # topic_id -> [0, 1, 2, ...]
+        self.topics_to_brokers = {}  # TopicAndPartition -> BrokerMetadata
+        self.topic_partitions = {}   # topic -> partition -> PartitionMetadata
+
+        self.load_metadata_for_topics()  # bootstrap with all metadata
@@ -63,20 +64,37 @@ class KafkaClient(object):
         Returns the leader for a partition or None if the partition exists
         but has no leader.
 
-        PartitionUnavailableError will be raised if the topic or partition
+        UnknownTopicOrPartitionError will be raised if the topic or partition
         is not part of the metadata.
+
+        LeaderNotAvailableError is raised if server has metadata, but there is
+        no current leader
         """
 
         key = TopicAndPartition(topic, partition)
 
-        # reload metadata whether the partition is not available
-        # or has no leader (broker is None)
-        if self.topics_to_brokers.get(key) is None:
-            self.load_metadata_for_topics(topic)
-
-        if key not in self.topics_to_brokers:
-            raise PartitionUnavailableError("%s not available" % str(key))
+        # Use cached metadata if it is there
+        if self.topics_to_brokers.get(key) is not None:
+            return self.topics_to_brokers[key]
+
+        # Otherwise refresh metadata
+
+        # If topic does not already exist, this will raise
+        # UnknownTopicOrPartitionError if not auto-creating
+        # LeaderNotAvailableError otherwise until partitions are created
+        self.load_metadata_for_topics(topic)
 
-        return self.topics_to_brokers[key]
+        # If the partition doesn't actually exist, raise
+        if partition not in self.topic_partitions[topic]:
+            raise UnknownTopicOrPartitionError(key)
+
+        # If there's no leader for the partition, raise
+        meta = self.topic_partitions[topic][partition]
+        if meta.leader == -1:
+            raise LeaderNotAvailableError(meta)
+
+        # Otherwise return the BrokerMetadata
+        return self.brokers[meta.leader]
 
     def _next_id(self):
         """
@@ -84,20 +102,26 @@ class KafkaClient(object):
         """
         return next(KafkaClient.ID_GEN)
 
-    def _send_broker_unaware_request(self, requestId, request):
+    def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
         """
         Attempt to send a broker-agnostic request to one of the available
         brokers. Keep trying until you succeed.
         """
         for (host, port) in self.hosts:
+            requestId = self._next_id()
             try:
                 conn = self._get_conn(host, port)
+                request = encoder_fn(client_id=self.client_id,
+                                     correlation_id=requestId,
+                                     payloads=payloads)
+
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
-                return response
+                return decoder_fn(response)
+
             except Exception as e:
                 log.warning("Could not send request [%r] to server %s:%i, "
-                            "trying next server: %s" % (binascii.b2a_hex(request), host, port, e))
+                            "trying next server: %s" % (requestId, host, port, e))
 
         raise KafkaUnavailableError("All servers failed to process request")
@@ -109,8 +133,8 @@ class KafkaClient(object):
 
         Params
         ======
-        payloads: list of object-like entities with a topic and
-            partition attribute
+        payloads: list of object-like entities with a topic (str) and
+            partition (int) attribute
         encode_fn: a method to encode the list of payloads to a request body,
             must accept client_id, correlation_id, and payloads as
             keyword arguments
@@ -130,10 +154,6 @@ class KafkaClient(object):
         for payload in payloads:
             leader = self._get_leader_for_partition(payload.topic,
                                                     payload.partition)
-            if leader is None:
-                raise LeaderUnavailableError(
-                    "Leader not available for topic %s partition %s" %
-                    (payload.topic, payload.partition))
 
             payloads_by_broker[leader].append(payload)
             original_keys.append((payload.topic, payload.partition))
@@ -195,6 +215,24 @@ class KafkaClient(object):
     #################
     # Public API #
     #################
+    def close(self):
+        for conn in self.conns.values():
+            conn.close()
+
+    def copy(self):
+        """
+        Create an inactive copy of the client object
+        A reinit() has to be done on the copy before it can be used again
+        """
+        c = copy.deepcopy(self)
+        for key in c.conns:
+            c.conns[key] = self.conns[key].copy()
+        return c
+
+    def reinit(self):
+        for conn in self.conns.values():
+            conn.reinit()
+
     def reset_topic_metadata(self, *topics):
         for topic in topics:
             try:
@@ -212,70 +250,125 @@ class KafkaClient(object):
             self.topic_partitions.clear()
 
     def has_metadata_for_topic(self, topic):
-        return topic in self.topic_partitions
+        return (
+            topic in self.topic_partitions
+            and len(self.topic_partitions[topic]) > 0
+        )
+
+    def get_partition_ids_for_topic(self, topic):
+        if topic not in self.topic_partitions:
+            return None
+
+        return list(self.topic_partitions[topic])
 
     def ensure_topic_exists(self, topic, timeout = 30):
         start_time = time.time()
 
-        self.load_metadata_for_topics(topic)
         while not self.has_metadata_for_topic(topic):
             if time.time() > start_time + timeout:
                 raise KafkaTimeoutError("Unable to create topic {0}".format(topic))
-            self.load_metadata_for_topics(topic)
+            try:
+                self.load_metadata_for_topics(topic)
+            except LeaderNotAvailableError:
+                pass
+            except UnknownTopicOrPartitionError:
+                # Server is not configured to auto-create
+                # retrying in this case will not help
+                raise
             time.sleep(.5)
 
-    def close(self):
-        for conn in self.conns.values():
-            conn.close()
-
-    def copy(self):
-        """
-        Create an inactive copy of the client object
-        A reinit() has to be done on the copy before it can be used again
+    def load_metadata_for_topics(self, *topics):
         """
-        c = copy.deepcopy(self)
-        for key in c.conns:
-            c.conns[key] = self.conns[key].copy()
-        return c
+        Fetch broker and topic-partition metadata from the server,
+        and update internal data:
+        broker list, topic/partition list, and topic/partition -> broker map
 
-    def reinit(self):
-        for conn in self.conns.values():
-            conn.reinit()
+        This method should be called after receiving any error
 
-    def load_metadata_for_topics(self, *topics):
-        """
-        Discover brokers and metadata for a set of topics. This function is called
-        lazily whenever metadata is unavailable.
-        """
-        request_id = self._next_id()
-        request = KafkaProtocol.encode_metadata_request(self.client_id,
-                                                        request_id, topics)
+        @param: *topics (optional)
+        If a list of topics is provided, the metadata refresh will be limited
+        to the specified topics only.
+
+        Exceptions:
+        ----------
+        If the broker is configured to not auto-create topics,
+        expect UnknownTopicOrPartitionError for topics that don't exist
 
-        response = self._send_broker_unaware_request(request_id, request)
+        If the broker is configured to auto-create topics,
+        expect LeaderNotAvailableError for new topics
+        until partitions have been initialized.
 
-        (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
+        Exceptions *will not* be raised in a full refresh (i.e. no topic list)
+        In this case, error codes will be logged as errors
 
-        log.debug("Broker metadata: %s", brokers)
-        log.debug("Topic metadata: %s", topics)
+        Partition-level errors will also not be raised here
+        (a single partition w/o a leader, for example)
+        """
+        resp = self.send_metadata_request(topics)
+
+        log.debug("Broker metadata: %s", resp.brokers)
+        log.debug("Topic metadata: %s", resp.topics)
+
+        self.brokers = dict([(broker.nodeId, broker)
+                             for broker in resp.brokers])
 
-        self.brokers = brokers
+        for topic_metadata in resp.topics:
+            topic = topic_metadata.topic
+            partitions = topic_metadata.partitions
 
-        for topic, partitions in topics.items():
             self.reset_topic_metadata(topic)
 
-            if not partitions:
-                log.warning('No partitions for %s', topic)
+            # Errors expected for new topics
+            try:
+                kafka.common.check_error(topic_metadata)
+            except (UnknownTopicOrPartitionError, LeaderNotAvailableError) as e:
+
+                # Raise if the topic was passed in explicitly
+                if topic in topics:
+                    raise
+
+                # Otherwise, just log a warning
+                log.error("Error loading topic metadata for %s: %s", topic, type(e))
                 continue
 
-            self.topic_partitions[topic] = []
-            for partition, meta in partitions.items():
-                self.topic_partitions[topic].append(partition)
+            self.topic_partitions[topic] = {}
+            for partition_metadata in partitions:
+                partition = partition_metadata.partition
+                leader = partition_metadata.leader
+
+                self.topic_partitions[topic][partition] = partition_metadata
+
+                # Populate topics_to_brokers dict
                 topic_part = TopicAndPartition(topic, partition)
-                if meta.leader == -1:
-                    log.warning('No leader for topic %s partition %s', topic, partition)
+
+                # Check for partition errors
+                try:
+                    kafka.common.check_error(partition_metadata)
+
+                # If No Leader, topics_to_brokers topic_partition -> None
+                except LeaderNotAvailableError:
+                    log.error('No leader for topic %s partition %d', topic, partition)
                     self.topics_to_brokers[topic_part] = None
+                    continue
+
+                # If Known Broker, topic_partition -> BrokerMetadata
+                if leader in self.brokers:
+                    self.topics_to_brokers[topic_part] = self.brokers[leader]
+
+                # If Unknown Broker, fake BrokerMetadata so we don't lose the id
+                # (not sure how this could happen. server could be in bad state)
                 else:
-                    self.topics_to_brokers[topic_part] = brokers[meta.leader]
+                    self.topics_to_brokers[topic_part] = BrokerMetadata(
+                        leader, None, None
+                    )
+
+    def send_metadata_request(self, payloads=[], fail_on_error=True,
+                              callback=None):
+
+        encoder = KafkaProtocol.encode_metadata_request
+        decoder = KafkaProtocol.decode_metadata_response
+
+        return self._send_broker_unaware_request(payloads, encoder, decoder)
 
     def send_produce_request(self, payloads=[], acks=1, timeout=1000,
                              fail_on_error=True, callback=None):
diff --git a/kafka/common.py b/kafka/common.py
index 907e128..008736c 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -17,8 +17,14 @@ OffsetRequest = namedtuple("OffsetRequest",
 OffsetCommitRequest = namedtuple("OffsetCommitRequest",
                                  ["topic", "partition", "offset", "metadata"])
 
+MetadataRequest = namedtuple("MetadataRequest",
+                             ["topics"])
+
 OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])
 
+MetadataResponse = namedtuple("MetadataResponse",
+                              ["brokers", "topics"])
+
 # Response payloads
 ProduceResponse = namedtuple("ProduceResponse",
                              ["topic", "partition", "error", "offset"])
@@ -36,16 +42,26 @@ OffsetFetchResponse = namedtuple("OffsetFetchResponse",
                                  ["topic", "partition", "offset",
                                   "metadata", "error"])
 
-BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
 
-PartitionMetadata = namedtuple("PartitionMetadata",
-                               ["topic", "partition", "leader",
-                                "replicas", "isr"])
 
 # Other useful structs
-OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"])
-Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
-TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
+BrokerMetadata = namedtuple("BrokerMetadata",
+                            ["nodeId", "host", "port"])
+
+TopicMetadata = namedtuple("TopicMetadata",
+                           ["topic", "error", "partitions"])
+
+PartitionMetadata = namedtuple("PartitionMetadata",
+                               ["topic", "partition", "leader", "replicas", "isr", "error"])
+
+OffsetAndMessage = namedtuple("OffsetAndMessage",
+                              ["offset", "message"])
+
+Message = namedtuple("Message",
+                     ["magic", "attributes", "key", "value"])
+
+TopicAndPartition = namedtuple("TopicAndPartition",
+                               ["topic", "partition"])
 
 
 #################
@@ -60,6 +76,9 @@ class KafkaError(RuntimeError):
 class BrokerResponseError(KafkaError):
     pass
 
+class NoError(BrokerResponseError):
+    errno = 0
+    message = 'SUCCESS'
 
 class UnknownError(BrokerResponseError):
     errno = -1
@@ -139,14 +158,6 @@ class KafkaTimeoutError(KafkaError):
     pass
 
 
-class LeaderUnavailableError(KafkaError):
-    pass
-
-
-class PartitionUnavailableError(KafkaError):
-    pass
-
-
 class FailedPayloadsError(KafkaError):
     pass
@@ -181,6 +192,7 @@ class UnsupportedCodecError(KafkaError):
 
 kafka_errors = {
     -1 : UnknownError,
+     0 : NoError,
     1 : OffsetOutOfRangeError,
     2 : InvalidMessageError,
     3 : UnknownTopicOrPartitionError,
@@ -198,7 +210,7 @@ kafka_errors = {
 
 def check_error(response):
-    error = kafka_errors.get(response.error)
-    if error:
+    error = kafka_errors.get(response.error, UnknownError)
+    if error is not NoError:
         raise error(response)
diff --git a/kafka/consumer.py b/kafka/consumer.py
index fa1b8bc..42628e1 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -91,7 +91,7 @@ class Consumer(object):
         self.offsets = {}
 
         if not partitions:
-            partitions = self.client.topic_partitions[topic]
+            partitions = self.client.get_partition_ids_for_topic(topic)
         else:
             assert all(isinstance(x, numbers.Integral) for x in partitions)
@@ -117,9 +117,9 @@ class Consumer(object):
 
     def fetch_last_known_offsets(self, partitions=None):
         if not partitions:
-            partitions = self.client.topic_partitions[self.topic]
+            partitions = self.client.get_partition_ids_for_topic(self.topic)
 
-        def get_or_init_offset_callback(resp):
+        def get_or_init_offset(resp):
             try:
                 kafka.common.check_error(resp)
                 return resp.offset
@@ -128,10 +128,9 @@ class Consumer(object):
 
         for partition in partitions:
             req = OffsetFetchRequest(self.topic, partition)
-            (offset,) = self.client.send_offset_fetch_request(self.group, [req],
-                                                              callback=get_or_init_offset_callback,
+            (resp,) = self.client.send_offset_fetch_request(self.group, [req],
                                                               fail_on_error=False)
-            self.offsets[partition] = offset
+            self.offsets[partition] = get_or_init_offset(resp)
 
         self.fetch_offsets = self.offsets.copy()
 
     def commit(self, partitions=None):
diff --git a/kafka/producer.py b/kafka/producer.py
index 4a04b38..f186649 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -247,16 +247,14 @@ class SimpleProducer(Producer):
 
     def _next_partition(self, topic):
         if topic not in self.partition_cycles:
-            if topic not in self.client.topic_partitions:
+            if not self.client.has_metadata_for_topic(topic):
                 self.client.load_metadata_for_topics(topic)
-            try:
-                self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
-            except KeyError:
-                raise UnknownTopicOrPartitionError(topic)
+
+            self.partition_cycles[topic] = cycle(self.client.get_partition_ids_for_topic(topic))
 
             # Randomize the initial partition that is returned
             if self.random_start:
-                num_partitions = len(self.client.topic_partitions[topic])
+                num_partitions = len(self.client.get_partition_ids_for_topic(topic))
                 for _ in xrange(random.randint(0, num_partitions-1)):
                     next(self.partition_cycles[topic])
@@ -305,12 +303,13 @@ class KeyedProducer(Producer):
 
     def _next_partition(self, topic, key):
         if topic not in self.partitioners:
-            if topic not in self.client.topic_partitions:
+            if not self.client.has_metadata_for_topic(topic):
                 self.client.load_metadata_for_topics(topic)
-            self.partitioners[topic] = \
-                self.partitioner_class(self.client.topic_partitions[topic])
+
+            self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic))
+
         partitioner = self.partitioners[topic]
-        return partitioner.partition(key, self.client.topic_partitions[topic])
+        return partitioner.partition(key, self.client.get_partition_ids_for_topic(topic))
 
     def send(self, topic, key, msg):
         partition = self._next_partition(topic, key)
diff --git a/kafka/protocol.py b/kafka/protocol.py
index e5356c5..9e01f5a 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -9,11 +9,12 @@ from kafka.codec import (
     gzip_encode, gzip_decode, snappy_encode, snappy_decode
 )
 from kafka.common import (
-    BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage,
-    ProduceResponse, FetchResponse, OffsetResponse,
-    OffsetCommitResponse, OffsetFetchResponse, ProtocolError,
-    BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall,
-    UnsupportedCodecError
+    Message, OffsetAndMessage, TopicAndPartition,
+    BrokerMetadata, TopicMetadata, PartitionMetadata,
+    MetadataResponse, ProduceResponse, FetchResponse,
+    OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
+    ProtocolError, BufferUnderflowError, ChecksumError,
+    ConsumerFetchSizeTooSmall, UnsupportedCodecError
 )
 from kafka.util import (
     crc32, read_short_string, read_int_string, relative_unpack,
@@ -343,7 +344,8 @@ class KafkaProtocol(object):
             yield OffsetResponse(topic, partition, error, tuple(offsets))
 
     @classmethod
-    def encode_metadata_request(cls, client_id, correlation_id, topics=None):
+    def encode_metadata_request(cls, client_id, correlation_id, topics=None,
+                                payloads=None):
         """
         Encode a MetadataRequest
 
@@ -353,7 +355,11 @@ class KafkaProtocol(object):
         correlation_id: int
         topics: list of strings
         """
-        topics = [] if topics is None else topics
+        if payloads is None:
+            topics = [] if topics is None else topics
+        else:
+            topics = payloads
+
         message = cls._encode_message_header(client_id, correlation_id,
                                              KafkaProtocol.METADATA_KEY)
 
@@ -376,28 +382,24 @@ class KafkaProtocol(object):
         ((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0)
 
         # Broker info
-        brokers = {}
+        brokers = []
         for i in range(numbrokers):
             ((nodeId, ), cur) = relative_unpack('>i', data, cur)
             (host, cur) = read_short_string(data, cur)
             ((port,), cur) = relative_unpack('>i', data, cur)
-            brokers[nodeId] = BrokerMetadata(nodeId, host, port)
+            brokers.append(BrokerMetadata(nodeId, host, port))
 
         # Topic info
         ((num_topics,), cur) = relative_unpack('>i', data, cur)
-        topic_metadata = {}
+        topic_metadata = []
 
         for i in range(num_topics):
-            # NOTE: topic_error is discarded. Should probably be returned with
-            # the topic metadata.
             ((topic_error,), cur) = relative_unpack('>h', data, cur)
             (topic_name, cur) = read_short_string(data, cur)
             ((num_partitions,), cur) = relative_unpack('>i', data, cur)
-            partition_metadata = {}
+            partition_metadata = []
 
             for j in range(num_partitions):
-                # NOTE: partition_error_code is discarded. Should probably be
-                # returned with the partition metadata.
                 ((partition_error_code, partition, leader, numReplicas), cur) = \
                     relative_unpack('>hiii', data, cur)
@@ -407,13 +409,16 @@ class KafkaProtocol(object):
                 ((num_isr,), cur) = relative_unpack('>i', data, cur)
                 (isr, cur) = relative_unpack('>%di' % num_isr, data, cur)
 
-                partition_metadata[partition] = \
-                    PartitionMetadata(
-                        topic_name, partition, leader, replicas, isr)
+                partition_metadata.append(
+                    PartitionMetadata(topic_name, partition, leader,
+                                      replicas, isr, partition_error_code)
+                )
 
-            topic_metadata[topic_name] = partition_metadata
+            topic_metadata.append(
+                TopicMetadata(topic_name, topic_error, partition_metadata)
+            )
 
-        return brokers, topic_metadata
+        return MetadataResponse(brokers, topic_metadata)
 
     @classmethod
     def encode_offset_commit_request(cls, client_id, correlation_id,
```
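
The exception behavior introduced by this merge can be summarized with a short, hypothetical usage sketch. Topic name and broker address are placeholders, and the retry shape follows `ensure_topic_exists` in the diff above.

```python
# Hedged sketch of the new load_metadata_for_topics() semantics.
from kafka.client import KafkaClient
from kafka.common import (LeaderNotAvailableError,
                          UnknownTopicOrPartitionError)

client = KafkaClient('localhost:9092')  # placeholder broker address

try:
    # With an explicit topic list, server error codes are raised.
    client.load_metadata_for_topics('new-topic')
except LeaderNotAvailableError:
    # Broker auto-creates topics: partitions are still initializing,
    # so retrying (as ensure_topic_exists does) can succeed.
    pass
except UnknownTopicOrPartitionError:
    # Broker does not auto-create: retrying will not help.
    raise

# Full refresh (no topic arguments): errors are logged, not raised.
client.load_metadata_for_topics()

# Partition ids are now read through a helper rather than by poking at
# client.topic_partitions, whose values are now PartitionMetadata dicts.
partition_ids = client.get_partition_ids_for_topic('new-topic')
```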