diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-08 13:15:56 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-09-08 13:17:43 -0700 |
commit | fff812ddc80243208233f785b3f005904cf33482 (patch) | |
tree | 30f1eb703dcd8ce1063c413fd70ac11f3fff5072 /kafka/client.py | |
parent | 42a7ab18bb84fea60deed5f7e3a6cfdfaaaeecd6 (diff) | |
parent | 0dabb1fbe8a9f538527a03c2903475ed77a12c10 (diff) | |
download | kafka-python-fff812ddc80243208233f785b3f005904cf33482.tar.gz |
Merge pull request #223 from dpkp/metadata_refactor
Metadata Refactor
* add MetadataRequest and MetadataResponse namedtuples
* add TopicMetadata namedtuple
* add error codes to Topic and Partition Metadata
* add KafkaClient.send_metadata_request() method
* KafkaProtocol.decode_metadata_response changed to return a
MetadataResponse object so that it is consistent with the server API:
[broker_list, topic_list]
* raise server exceptions in load_metadata_for_topics(*topics)
unless no topics are given (full refresh), in which case errors are only logged
* Replace non-standard exceptions (LeaderUnavailableError,
PartitionUnavailableError) with server standard exceptions
(LeaderNotAvailableError, UnknownTopicOrPartitionError)
Conflicts:
kafka/client.py
test/test_client.py
test/test_producer_integration.py
test/test_protocol.py
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 219 |
1 files changed, 156 insertions, 63 deletions
diff --git a/kafka/client.py b/kafka/client.py index a918091..8c78694 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -7,11 +7,11 @@ import logging import time import kafka.common -from kafka.common import (TopicAndPartition, +from kafka.common import (TopicAndPartition, BrokerMetadata, ConnectionError, FailedPayloadsError, - PartitionUnavailableError, LeaderUnavailableError, KafkaUnavailableError, - KafkaTimeoutError, - UnknownTopicOrPartitionError, NotLeaderForPartitionError) + KafkaTimeoutError, KafkaUnavailableError, + LeaderNotAvailableError, UnknownTopicOrPartitionError, + NotLeaderForPartitionError) from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS from kafka.protocol import KafkaProtocol @@ -37,8 +37,9 @@ class KafkaClient(object): # create connections only when we need them self.conns = {} self.brokers = {} # broker_id -> BrokerMetadata - self.topics_to_brokers = {} # topic_id -> broker_id - self.topic_partitions = {} # topic_id -> [0, 1, 2, ...] + self.topics_to_brokers = {} # TopicAndPartition -> BrokerMetadata + self.topic_partitions = {} # topic -> partition -> PartitionMetadata + self.load_metadata_for_topics() # bootstrap with all metadata @@ -63,20 +64,37 @@ class KafkaClient(object): Returns the leader for a partition or None if the partition exists but has no leader. - PartitionUnavailableError will be raised if the topic or partition + UnknownTopicOrPartitionError will be raised if the topic or partition is not part of the metadata. 
+ + LeaderNotAvailableError is raised if server has metadata, but there is + no current leader """ key = TopicAndPartition(topic, partition) - # reload metadata whether the partition is not available - # or has no leader (broker is None) - if self.topics_to_brokers.get(key) is None: - self.load_metadata_for_topics(topic) - if key not in self.topics_to_brokers: - raise PartitionUnavailableError("%s not available" % str(key)) + # Use cached metadata if it is there + if self.topics_to_brokers.get(key) is not None: + return self.topics_to_brokers[key] + + # Otherwise refresh metadata + + # If topic does not already exist, this will raise + # UnknownTopicOrPartitionError if not auto-creating + # LeaderNotAvailableError otherwise until partitions are created + self.load_metadata_for_topics(topic) - return self.topics_to_brokers[key] + # If the partition doesn't actually exist, raise + if partition not in self.topic_partitions[topic]: + raise UnknownTopicOrPartitionError(key) + + # If there's no leader for the partition, raise + meta = self.topic_partitions[topic][partition] + if meta.leader == -1: + raise LeaderNotAvailableError(meta) + + # Otherwise return the BrokerMetadata + return self.brokers[meta.leader] def _next_id(self): """ @@ -84,20 +102,26 @@ class KafkaClient(object): """ return next(KafkaClient.ID_GEN) - def _send_broker_unaware_request(self, requestId, request): + def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn): """ Attempt to send a broker-agnostic request to one of the available brokers. Keep trying until you succeed. 
""" for (host, port) in self.hosts: + requestId = self._next_id() try: conn = self._get_conn(host, port) + request = encoder_fn(client_id=self.client_id, + correlation_id=requestId, + payloads=payloads) + conn.send(requestId, request) response = conn.recv(requestId) - return response + return decoder_fn(response) + except Exception as e: log.warning("Could not send request [%r] to server %s:%i, " - "trying next server: %s" % (binascii.b2a_hex(request), host, port, e)) + "trying next server: %s" % (requestId, host, port, e)) raise KafkaUnavailableError("All servers failed to process request") @@ -109,8 +133,8 @@ class KafkaClient(object): Params ====== - payloads: list of object-like entities with a topic and - partition attribute + payloads: list of object-like entities with a topic (str) and + partition (int) attribute encode_fn: a method to encode the list of payloads to a request body, must accept client_id, correlation_id, and payloads as keyword arguments @@ -130,10 +154,6 @@ class KafkaClient(object): for payload in payloads: leader = self._get_leader_for_partition(payload.topic, payload.partition) - if leader is None: - raise LeaderUnavailableError( - "Leader not available for topic %s partition %s" % - (payload.topic, payload.partition)) payloads_by_broker[leader].append(payload) original_keys.append((payload.topic, payload.partition)) @@ -195,6 +215,24 @@ class KafkaClient(object): ################# # Public API # ################# + def close(self): + for conn in self.conns.values(): + conn.close() + + def copy(self): + """ + Create an inactive copy of the client object + A reinit() has to be done on the copy before it can be used again + """ + c = copy.deepcopy(self) + for key in c.conns: + c.conns[key] = self.conns[key].copy() + return c + + def reinit(self): + for conn in self.conns.values(): + conn.reinit() + def reset_topic_metadata(self, *topics): for topic in topics: try: @@ -212,70 +250,125 @@ class KafkaClient(object): 
self.topic_partitions.clear() def has_metadata_for_topic(self, topic): - return topic in self.topic_partitions + return ( + topic in self.topic_partitions + and len(self.topic_partitions[topic]) > 0 + ) + + def get_partition_ids_for_topic(self, topic): + if topic not in self.topic_partitions: + return None + + return list(self.topic_partitions[topic]) def ensure_topic_exists(self, topic, timeout = 30): start_time = time.time() - self.load_metadata_for_topics(topic) while not self.has_metadata_for_topic(topic): if time.time() > start_time + timeout: raise KafkaTimeoutError("Unable to create topic {0}".format(topic)) - self.load_metadata_for_topics(topic) + try: + self.load_metadata_for_topics(topic) + except LeaderNotAvailableError: + pass + except UnknownTopicOrPartitionError: + # Server is not configured to auto-create + # retrying in this case will not help + raise time.sleep(.5) - def close(self): - for conn in self.conns.values(): - conn.close() - - def copy(self): - """ - Create an inactive copy of the client object - A reinit() has to be done on the copy before it can be used again + def load_metadata_for_topics(self, *topics): """ - c = copy.deepcopy(self) - for key in c.conns: - c.conns[key] = self.conns[key].copy() - return c + Fetch broker and topic-partition metadata from the server, + and update internal data: + broker list, topic/partition list, and topic/parition -> broker map - def reinit(self): - for conn in self.conns.values(): - conn.reinit() + This method should be called after receiving any error - def load_metadata_for_topics(self, *topics): - """ - Discover brokers and metadata for a set of topics. This function is called - lazily whenever metadata is unavailable. - """ - request_id = self._next_id() - request = KafkaProtocol.encode_metadata_request(self.client_id, - request_id, topics) + @param: *topics (optional) + If a list of topics is provided, the metadata refresh will be limited + to the specified topics only. 
+ + Exceptions: + ---------- + If the broker is configured to not auto-create topics, + expect UnknownTopicOrPartitionError for topics that don't exist - response = self._send_broker_unaware_request(request_id, request) + If the broker is configured to auto-create topics, + expect LeaderNotAvailableError for new topics + until partitions have been initialized. - (brokers, topics) = KafkaProtocol.decode_metadata_response(response) + Exceptions *will not* be raised in a full refresh (i.e. no topic list) + In this case, error codes will be logged as errors - log.debug("Broker metadata: %s", brokers) - log.debug("Topic metadata: %s", topics) + Partition-level errors will also not be raised here + (a single partition w/o a leader, for example) + """ + resp = self.send_metadata_request(topics) + + log.debug("Broker metadata: %s", resp.brokers) + log.debug("Topic metadata: %s", resp.topics) + + self.brokers = dict([(broker.nodeId, broker) + for broker in resp.brokers]) - self.brokers = brokers + for topic_metadata in resp.topics: + topic = topic_metadata.topic + partitions = topic_metadata.partitions - for topic, partitions in topics.items(): self.reset_topic_metadata(topic) - if not partitions: - log.warning('No partitions for %s', topic) + # Errors expected for new topics + try: + kafka.common.check_error(topic_metadata) + except (UnknownTopicOrPartitionError, LeaderNotAvailableError) as e: + + # Raise if the topic was passed in explicitly + if topic in topics: + raise + + # Otherwise, just log a warning + log.error("Error loading topic metadata for %s: %s", topic, type(e)) continue - self.topic_partitions[topic] = [] - for partition, meta in partitions.items(): - self.topic_partitions[topic].append(partition) + self.topic_partitions[topic] = {} + for partition_metadata in partitions: + partition = partition_metadata.partition + leader = partition_metadata.leader + + self.topic_partitions[topic][partition] = partition_metadata + + # Populate topics_to_brokers dict 
topic_part = TopicAndPartition(topic, partition) - if meta.leader == -1: - log.warning('No leader for topic %s partition %s', topic, partition) + + # Check for partition errors + try: + kafka.common.check_error(partition_metadata) + + # If No Leader, topics_to_brokers topic_partition -> None + except LeaderNotAvailableError: + log.error('No leader for topic %s partition %d', topic, partition) self.topics_to_brokers[topic_part] = None + continue + + # If Known Broker, topic_partition -> BrokerMetadata + if leader in self.brokers: + self.topics_to_brokers[topic_part] = self.brokers[leader] + + # If Unknown Broker, fake BrokerMetadata so we dont lose the id + # (not sure how this could happen. server could be in bad state) else: - self.topics_to_brokers[topic_part] = brokers[meta.leader] + self.topics_to_brokers[topic_part] = BrokerMetadata( + leader, None, None + ) + + def send_metadata_request(self, payloads=[], fail_on_error=True, + callback=None): + + encoder = KafkaProtocol.encode_metadata_request + decoder = KafkaProtocol.decode_metadata_response + + return self._send_broker_unaware_request(payloads, encoder, decoder) def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None): |