summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2014-03-21 22:50:53 -0700
committerDana Powers <dana.powers@gmail.com>2014-03-21 22:50:53 -0700
commite937e3f971f5958c8da6249b08288aafd5ed5bcd (patch)
tree0d495dfaee84d551ec97ca6ddac13d635fca75d1 /kafka/client.py
parent9599215bf28b65a29908b8644dcaa6f3614a425d (diff)
parent51246fb31859e9cee03b1e1359ad871274dca87e (diff)
downloadkafka-python-e937e3f971f5958c8da6249b08288aafd5ed5bcd.tar.gz
Merge pull request #109 from mrtheb/develop
TopicAndPartition fix when partition has no leader = -1
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py33
1 files changed, 26 insertions, 7 deletions
diff --git a/kafka/client.py b/kafka/client.py
index a683fe0..ab0eb8d 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -8,7 +8,8 @@ from itertools import count
from kafka.common import (ErrorMapping, TopicAndPartition,
ConnectionError, FailedPayloadsError,
BrokerResponseError, PartitionUnavailableError,
- KafkaUnavailableError, KafkaRequestError)
+ LeaderUnavailableError,
+ KafkaUnavailableError)
from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
@@ -62,12 +63,22 @@ class KafkaClient(object):
return self._get_conn(broker.host, broker.port)
def _get_leader_for_partition(self, topic, partition):
+ """
+ Returns the leader for a partition or None if the partition exists
+ but has no leader.
+
+ PartitionUnavailableError will be raised if the topic or partition
+ is not part of the metadata.
+ """
+
key = TopicAndPartition(topic, partition)
- if key not in self.topics_to_brokers:
+ # reload metadata whether the partition is not available
+ # or has no leader (broker is None)
+ if self.topics_to_brokers.get(key) is None:
self.load_metadata_for_topics(topic)
if key not in self.topics_to_brokers:
- raise KafkaRequestError("Partition does not exist: %s" % str(key))
+ raise PartitionUnavailableError("%s not available" % str(key))
return self.topics_to_brokers[key]
@@ -124,8 +135,11 @@ class KafkaClient(object):
for payload in payloads:
leader = self._get_leader_for_partition(payload.topic,
payload.partition)
- if leader == -1:
- raise PartitionUnavailableError("Leader is unassigned for %s-%s" % payload.topic, payload.partition)
+ if leader is None:
+ raise LeaderUnavailableError(
+ "Leader not available for topic %s partition %s" %
+ (payload.topic, payload.partition))
+
payloads_by_broker[leader].append(payload)
original_keys.append((payload.topic, payload.partition))
@@ -250,13 +264,18 @@ class KafkaClient(object):
self.reset_topic_metadata(topic)
if not partitions:
+ log.warning('No partitions for %s', topic)
continue
self.topic_partitions[topic] = []
for partition, meta in partitions.items():
- topic_part = TopicAndPartition(topic, partition)
- self.topics_to_brokers[topic_part] = brokers[meta.leader]
self.topic_partitions[topic].append(partition)
+ topic_part = TopicAndPartition(topic, partition)
+ if meta.leader == -1:
+ log.warning('No leader for topic %s partition %s', topic, partition)
+ self.topics_to_brokers[topic_part] = None
+ else:
+ self.topics_to_brokers[topic_part] = brokers[meta.leader]
def send_produce_request(self, payloads=[], acks=1, timeout=1000,
fail_on_error=True, callback=None):