 kafka/client.py                   | 219
 kafka/common.py                   |  46
 kafka/consumer.py                 |  11
 kafka/producer.py                 |  19
 kafka/protocol.py                 |  45
 test/test_client.py               | 297
 test/test_producer_integration.py |   6
 test/test_protocol.py             |  73
 8 files changed, 479 insertions(+), 237 deletions(-)
diff --git a/kafka/client.py b/kafka/client.py
index a918091..8c78694 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -7,11 +7,11 @@ import logging
import time
import kafka.common
-from kafka.common import (TopicAndPartition,
+from kafka.common import (TopicAndPartition, BrokerMetadata,
ConnectionError, FailedPayloadsError,
- PartitionUnavailableError, LeaderUnavailableError, KafkaUnavailableError,
- KafkaTimeoutError,
- UnknownTopicOrPartitionError, NotLeaderForPartitionError)
+ KafkaTimeoutError, KafkaUnavailableError,
+ LeaderNotAvailableError, UnknownTopicOrPartitionError,
+ NotLeaderForPartitionError)
from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
@@ -37,8 +37,9 @@ class KafkaClient(object):
# create connections only when we need them
self.conns = {}
self.brokers = {} # broker_id -> BrokerMetadata
- self.topics_to_brokers = {} # topic_id -> broker_id
- self.topic_partitions = {} # topic_id -> [0, 1, 2, ...]
+ self.topics_to_brokers = {} # TopicAndPartition -> BrokerMetadata
+ self.topic_partitions = {} # topic -> partition -> PartitionMetadata
+
self.load_metadata_for_topics() # bootstrap with all metadata
@@ -63,20 +64,37 @@ class KafkaClient(object):
Returns the leader for a partition or None if the partition exists
but has no leader.
- PartitionUnavailableError will be raised if the topic or partition
+ UnknownTopicOrPartitionError will be raised if the topic or partition
is not part of the metadata.
+
+ LeaderNotAvailableError is raised if the server has metadata but there is
+ no current leader.
"""
key = TopicAndPartition(topic, partition)
- # reload metadata whether the partition is not available
- # or has no leader (broker is None)
- if self.topics_to_brokers.get(key) is None:
- self.load_metadata_for_topics(topic)
- if key not in self.topics_to_brokers:
- raise PartitionUnavailableError("%s not available" % str(key))
+ # Use cached metadata if it is there
+ if self.topics_to_brokers.get(key) is not None:
+ return self.topics_to_brokers[key]
+
+ # Otherwise refresh metadata
+
+ # If topic does not already exist, this will raise
+ # UnknownTopicOrPartitionError if not auto-creating
+ # LeaderNotAvailableError otherwise until partitions are created
+ self.load_metadata_for_topics(topic)
- return self.topics_to_brokers[key]
+ # If the partition doesn't actually exist, raise
+ if partition not in self.topic_partitions[topic]:
+ raise UnknownTopicOrPartitionError(key)
+
+ # If there's no leader for the partition, raise
+ meta = self.topic_partitions[topic][partition]
+ if meta.leader == -1:
+ raise LeaderNotAvailableError(meta)
+
+ # Otherwise return the BrokerMetadata
+ return self.brokers[meta.leader]
def _next_id(self):
"""
@@ -84,20 +102,26 @@ class KafkaClient(object):
"""
return next(KafkaClient.ID_GEN)
- def _send_broker_unaware_request(self, requestId, request):
+ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
"""
Attempt to send a broker-agnostic request to one of the available
brokers. Keep trying until you succeed.
"""
for (host, port) in self.hosts:
+ requestId = self._next_id()
try:
conn = self._get_conn(host, port)
+ request = encoder_fn(client_id=self.client_id,
+ correlation_id=requestId,
+ payloads=payloads)
+
conn.send(requestId, request)
response = conn.recv(requestId)
- return response
+ return decoder_fn(response)
+
except Exception as e:
log.warning("Could not send request [%r] to server %s:%i, "
- "trying next server: %s" % (binascii.b2a_hex(request), host, port, e))
+ "trying next server: %s" % (requestId, host, port, e))
raise KafkaUnavailableError("All servers failed to process request")
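
The rewritten _send_broker_unaware_request now owns the correlation id and the encode/decode steps: each retry gets a fresh id, and callers pass payloads plus an encoder/decoder pair instead of a pre-built request. A minimal standalone sketch of that retry pattern, assuming hypothetical hosts/get_conn/encoder_fn/decoder_fn stand-ins rather than the library's exact internals:

    import itertools
    import logging

    log = logging.getLogger(__name__)
    _ids = itertools.count()

    def send_unaware(hosts, get_conn, payloads, encoder_fn, decoder_fn):
        # Try each bootstrap host in turn; first success wins.
        for host, port in hosts:
            request_id = next(_ids)  # fresh correlation id per attempt
            try:
                conn = get_conn(host, port)
                request = encoder_fn(client_id=b'sketch',
                                     correlation_id=request_id,
                                     payloads=payloads)
                conn.send(request_id, request)
                return decoder_fn(conn.recv(request_id))
            except Exception as e:
                log.warning('request %d to %s:%d failed (%s), trying next host',
                            request_id, host, port, e)
        raise RuntimeError('All servers failed to process request')
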
@@ -109,8 +133,8 @@ class KafkaClient(object):
Params
======
- payloads: list of object-like entities with a topic and
- partition attribute
+ payloads: list of object-like entities with a topic (str) and
+ partition (int) attribute
encode_fn: a method to encode the list of payloads to a request body,
must accept client_id, correlation_id, and payloads as
keyword arguments
@@ -130,10 +154,6 @@ class KafkaClient(object):
for payload in payloads:
leader = self._get_leader_for_partition(payload.topic,
payload.partition)
- if leader is None:
- raise LeaderUnavailableError(
- "Leader not available for topic %s partition %s" %
- (payload.topic, payload.partition))
payloads_by_broker[leader].append(payload)
original_keys.append((payload.topic, payload.partition))
@@ -195,6 +215,24 @@ class KafkaClient(object):
#################
# Public API #
#################
+ def close(self):
+ for conn in self.conns.values():
+ conn.close()
+
+ def copy(self):
+ """
+ Create an inactive copy of the client object
+ A reinit() has to be done on the copy before it can be used again
+ """
+ c = copy.deepcopy(self)
+ for key in c.conns:
+ c.conns[key] = self.conns[key].copy()
+ return c
+
+ def reinit(self):
+ for conn in self.conns.values():
+ conn.reinit()
+
def reset_topic_metadata(self, *topics):
for topic in topics:
try:
@@ -212,70 +250,125 @@ class KafkaClient(object):
self.topic_partitions.clear()
def has_metadata_for_topic(self, topic):
- return topic in self.topic_partitions
+ return (
+ topic in self.topic_partitions
+ and len(self.topic_partitions[topic]) > 0
+ )
+
+ def get_partition_ids_for_topic(self, topic):
+ if topic not in self.topic_partitions:
+ return None
+
+ return list(self.topic_partitions[topic])
def ensure_topic_exists(self, topic, timeout = 30):
start_time = time.time()
- self.load_metadata_for_topics(topic)
while not self.has_metadata_for_topic(topic):
if time.time() > start_time + timeout:
raise KafkaTimeoutError("Unable to create topic {0}".format(topic))
- self.load_metadata_for_topics(topic)
+ try:
+ self.load_metadata_for_topics(topic)
+ except LeaderNotAvailableError:
+ pass
+ except UnknownTopicOrPartitionError:
+ # Server is not configured to auto-create
+ # retrying in this case will not help
+ raise
time.sleep(.5)
- def close(self):
- for conn in self.conns.values():
- conn.close()
-
- def copy(self):
- """
- Create an inactive copy of the client object
- A reinit() has to be done on the copy before it can be used again
+ def load_metadata_for_topics(self, *topics):
"""
- c = copy.deepcopy(self)
- for key in c.conns:
- c.conns[key] = self.conns[key].copy()
- return c
+ Fetch broker and topic-partition metadata from the server,
+ and update internal data:
+ broker list, topic/partition list, and topic/partition -> broker map
- def reinit(self):
- for conn in self.conns.values():
- conn.reinit()
+ This method should be called after receiving any error.
- def load_metadata_for_topics(self, *topics):
- """
- Discover brokers and metadata for a set of topics. This function is called
- lazily whenever metadata is unavailable.
- """
- request_id = self._next_id()
- request = KafkaProtocol.encode_metadata_request(self.client_id,
- request_id, topics)
+ @param: *topics (optional)
+ If a list of topics is provided, the metadata refresh will be limited
+ to the specified topics only.
+
+ Exceptions:
+ ----------
+ If the broker is configured to not auto-create topics,
+ expect UnknownTopicOrPartitionError for topics that don't exist
- response = self._send_broker_unaware_request(request_id, request)
+ If the broker is configured to auto-create topics,
+ expect LeaderNotAvailableError for new topics
+ until partitions have been initialized.
- (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
+ Exceptions *will not* be raised in a full refresh (i.e. no topic list).
+ In this case, error codes will be logged as errors.
- log.debug("Broker metadata: %s", brokers)
- log.debug("Topic metadata: %s", topics)
+ Partition-level errors will also not be raised here
+ (a single partition w/o a leader, for example)
+ """
+ resp = self.send_metadata_request(topics)
+
+ log.debug("Broker metadata: %s", resp.brokers)
+ log.debug("Topic metadata: %s", resp.topics)
+
+ self.brokers = dict([(broker.nodeId, broker)
+ for broker in resp.brokers])
- self.brokers = brokers
+ for topic_metadata in resp.topics:
+ topic = topic_metadata.topic
+ partitions = topic_metadata.partitions
- for topic, partitions in topics.items():
self.reset_topic_metadata(topic)
- if not partitions:
- log.warning('No partitions for %s', topic)
+ # Errors expected for new topics
+ try:
+ kafka.common.check_error(topic_metadata)
+ except (UnknownTopicOrPartitionError, LeaderNotAvailableError) as e:
+
+ # Raise if the topic was passed in explicitly
+ if topic in topics:
+ raise
+
+ # Otherwise, just log the error and continue
+ log.error("Error loading topic metadata for %s: %s", topic, type(e))
continue
- self.topic_partitions[topic] = []
- for partition, meta in partitions.items():
- self.topic_partitions[topic].append(partition)
+ self.topic_partitions[topic] = {}
+ for partition_metadata in partitions:
+ partition = partition_metadata.partition
+ leader = partition_metadata.leader
+
+ self.topic_partitions[topic][partition] = partition_metadata
+
+ # Populate topics_to_brokers dict
topic_part = TopicAndPartition(topic, partition)
- if meta.leader == -1:
- log.warning('No leader for topic %s partition %s', topic, partition)
+
+ # Check for partition errors
+ try:
+ kafka.common.check_error(partition_metadata)
+
+ # If No Leader, topics_to_brokers topic_partition -> None
+ except LeaderNotAvailableError:
+ log.error('No leader for topic %s partition %d', topic, partition)
self.topics_to_brokers[topic_part] = None
+ continue
+
+ # If Known Broker, topic_partition -> BrokerMetadata
+ if leader in self.brokers:
+ self.topics_to_brokers[topic_part] = self.brokers[leader]
+
+ # If Unknown Broker, fake BrokerMetadata so we don't lose the id
+ # (not sure how this could happen. server could be in a bad state)
else:
- self.topics_to_brokers[topic_part] = brokers[meta.leader]
+ self.topics_to_brokers[topic_part] = BrokerMetadata(
+ leader, None, None
+ )
+
+ def send_metadata_request(self, payloads=[], fail_on_error=True,
+ callback=None):
+
+ encoder = KafkaProtocol.encode_metadata_request
+ decoder = KafkaProtocol.decode_metadata_response
+
+ return self._send_broker_unaware_request(payloads, encoder, decoder)
def send_produce_request(self, payloads=[], acks=1, timeout=1000,
fail_on_error=True, callback=None):
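
Taken together, the client.py changes move callers off the raw topic_partitions dict and onto typed accessors, with missing metadata surfacing as typed exceptions. A rough usage sketch of the post-change API; the broker address and topic name are placeholders:

    from kafka import KafkaClient
    from kafka.common import KafkaTimeoutError, UnknownTopicOrPartitionError

    client = KafkaClient(hosts='localhost:9092')  # placeholder address

    try:
        # Polls metadata (every 0.5s, up to timeout) until the topic
        # reports at least one partition.
        client.ensure_topic_exists('my-topic', timeout=30)
        print(client.get_partition_ids_for_topic('my-topic'))  # e.g. [0, 1, 2]
    except UnknownTopicOrPartitionError:
        print('broker does not auto-create topics; retrying would not help')
    except KafkaTimeoutError:
        print('topic still had no partitions after 30s')
    finally:
        client.close()
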
diff --git a/kafka/common.py b/kafka/common.py
index 907e128..008736c 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -17,8 +17,14 @@ OffsetRequest = namedtuple("OffsetRequest",
OffsetCommitRequest = namedtuple("OffsetCommitRequest",
["topic", "partition", "offset", "metadata"])
+MetadataRequest = namedtuple("MetadataRequest",
+ ["topics"])
+
OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])
+MetadataResponse = namedtuple("MetadataResponse",
+ ["brokers", "topics"])
+
# Response payloads
ProduceResponse = namedtuple("ProduceResponse",
["topic", "partition", "error", "offset"])
@@ -36,16 +42,26 @@ OffsetFetchResponse = namedtuple("OffsetFetchResponse",
["topic", "partition", "offset",
"metadata", "error"])
-BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
-PartitionMetadata = namedtuple("PartitionMetadata",
- ["topic", "partition", "leader",
- "replicas", "isr"])
# Other useful structs
-OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"])
-Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
-TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
+BrokerMetadata = namedtuple("BrokerMetadata",
+ ["nodeId", "host", "port"])
+
+TopicMetadata = namedtuple("TopicMetadata",
+ ["topic", "error", "partitions"])
+
+PartitionMetadata = namedtuple("PartitionMetadata",
+ ["topic", "partition", "leader", "replicas", "isr", "error"])
+
+OffsetAndMessage = namedtuple("OffsetAndMessage",
+ ["offset", "message"])
+
+Message = namedtuple("Message",
+ ["magic", "attributes", "key", "value"])
+
+TopicAndPartition = namedtuple("TopicAndPartition",
+ ["topic", "partition"])
#################
@@ -60,6 +76,9 @@ class KafkaError(RuntimeError):
class BrokerResponseError(KafkaError):
pass
+class NoError(BrokerResponseError):
+ errno = 0
+ message = 'SUCCESS'
class UnknownError(BrokerResponseError):
errno = -1
@@ -139,14 +158,6 @@ class KafkaTimeoutError(KafkaError):
pass
-class LeaderUnavailableError(KafkaError):
- pass
-
-
-class PartitionUnavailableError(KafkaError):
- pass
-
-
class FailedPayloadsError(KafkaError):
pass
@@ -181,6 +192,7 @@ class UnsupportedCodecError(KafkaError):
kafka_errors = {
-1 : UnknownError,
+ 0 : NoError,
1 : OffsetOutOfRangeError,
2 : InvalidMessageError,
3 : UnknownTopicOrPartitionError,
@@ -198,7 +210,7 @@ kafka_errors = {
def check_error(response):
- error = kafka_errors.get(response.error)
- if error:
+ error = kafka_errors.get(response.error, UnknownError)
+ if error is not NoError:
raise error(response)
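
The behavioral fix in check_error: error code 0 now maps to an explicit NoError, and codes missing from the table fall back to UnknownError instead of passing silently. A condensed standalone mirror of the new lookup (trimmed error table, stand-in response type):

    from collections import namedtuple

    Resp = namedtuple('Resp', ['error'])

    class BrokerResponseError(RuntimeError):
        pass

    class NoError(BrokerResponseError):
        errno = 0

    class UnknownError(BrokerResponseError):
        errno = -1

    kafka_errors = {0: NoError, -1: UnknownError}  # trimmed table

    def check_error(response):
        # Code 0 maps to NoError and passes; unmapped codes no longer
        # slip through silently -- they fall back to UnknownError.
        error = kafka_errors.get(response.error, UnknownError)
        if error is not NoError:
            raise error(response)

    check_error(Resp(0))          # returns quietly
    try:
        check_error(Resp(9999))   # code missing from the table
    except UnknownError:
        print('unmapped error codes now raise UnknownError')
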
diff --git a/kafka/consumer.py b/kafka/consumer.py
index fa1b8bc..42628e1 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -91,7 +91,7 @@ class Consumer(object):
self.offsets = {}
if not partitions:
- partitions = self.client.topic_partitions[topic]
+ partitions = self.client.get_partition_ids_for_topic(topic)
else:
assert all(isinstance(x, numbers.Integral) for x in partitions)
@@ -117,9 +117,9 @@ class Consumer(object):
def fetch_last_known_offsets(self, partitions=None):
if not partitions:
- partitions = self.client.topic_partitions[self.topic]
+ partitions = self.client.get_partition_ids_for_topic(self.topic)
- def get_or_init_offset_callback(resp):
+ def get_or_init_offset(resp):
try:
kafka.common.check_error(resp)
return resp.offset
@@ -128,10 +128,9 @@ class Consumer(object):
for partition in partitions:
req = OffsetFetchRequest(self.topic, partition)
- (offset,) = self.client.send_offset_fetch_request(self.group, [req],
- callback=get_or_init_offset_callback,
+ (resp,) = self.client.send_offset_fetch_request(self.group, [req],
fail_on_error=False)
- self.offsets[partition] = offset
+ self.offsets[partition] = get_or_init_offset(resp)
self.fetch_offsets = self.offsets.copy()
def commit(self, partitions=None):
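
The consumer change is mechanical: the error-normalizing helper is applied to the returned response rather than passed in as a callback. The pattern it implements, treating an unknown committed offset as 0, looks roughly like this standalone sketch with stand-in types:

    from collections import namedtuple

    OffsetFetchResponse = namedtuple('OffsetFetchResponse',
                                     ['topic', 'partition', 'offset',
                                      'metadata', 'error'])

    class UnknownTopicOrPartitionError(RuntimeError):
        pass

    def check_error(resp):
        if resp.error == 3:  # UNKNOWN_TOPIC_OR_PARTITION
            raise UnknownTopicOrPartitionError(resp)

    def get_or_init_offset(resp):
        # A consumer group with no committed offset starts from 0.
        try:
            check_error(resp)
            return resp.offset
        except UnknownTopicOrPartitionError:
            return 0

    print(get_or_init_offset(OffsetFetchResponse('t', 0, 42, b'', 0)))  # 42
    print(get_or_init_offset(OffsetFetchResponse('t', 1, -1, b'', 3)))  # 0
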
diff --git a/kafka/producer.py b/kafka/producer.py
index 4a04b38..f186649 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -247,16 +247,14 @@ class SimpleProducer(Producer):
def _next_partition(self, topic):
if topic not in self.partition_cycles:
- if topic not in self.client.topic_partitions:
+ if not self.client.has_metadata_for_topic(topic):
self.client.load_metadata_for_topics(topic)
- try:
- self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
- except KeyError:
- raise UnknownTopicOrPartitionError(topic)
+
+ self.partition_cycles[topic] = cycle(self.client.get_partition_ids_for_topic(topic))
# Randomize the initial partition that is returned
if self.random_start:
- num_partitions = len(self.client.topic_partitions[topic])
+ num_partitions = len(self.client.get_partition_ids_for_topic(topic))
for _ in xrange(random.randint(0, num_partitions-1)):
next(self.partition_cycles[topic])
@@ -305,12 +303,13 @@ class KeyedProducer(Producer):
def _next_partition(self, topic, key):
if topic not in self.partitioners:
- if topic not in self.client.topic_partitions:
+ if not self.client.has_metadata_for_topic(topic):
self.client.load_metadata_for_topics(topic)
- self.partitioners[topic] = \
- self.partitioner_class(self.client.topic_partitions[topic])
+
+ self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic))
+
partitioner = self.partitioners[topic]
- return partitioner.partition(key, self.client.topic_partitions[topic])
+ return partitioner.partition(key, self.client.get_partition_ids_for_topic(topic))
def send(self, topic, key, msg):
partition = self._next_partition(topic, key)
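
Both producers now go through get_partition_ids_for_topic. SimpleProducer's cycle-plus-random-start scheme, which keeps freshly started producers from always hitting partition 0 first, can be sketched as:

    import random
    from itertools import cycle

    def make_partition_cycle(partition_ids, random_start=True):
        # Round-robin over the topic's partitions, optionally starting
        # at a random position so fresh producers spread their load.
        it = cycle(partition_ids)
        if random_start:
            for _ in range(random.randint(0, len(partition_ids) - 1)):
                next(it)
        return it

    partitions = make_partition_cycle([0, 1, 2])
    print([next(partitions) for _ in range(5)])  # e.g. [1, 2, 0, 1, 2]
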
diff --git a/kafka/protocol.py b/kafka/protocol.py
index e5356c5..9e01f5a 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -9,11 +9,12 @@ from kafka.codec import (
gzip_encode, gzip_decode, snappy_encode, snappy_decode
)
from kafka.common import (
- BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage,
- ProduceResponse, FetchResponse, OffsetResponse,
- OffsetCommitResponse, OffsetFetchResponse, ProtocolError,
- BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall,
- UnsupportedCodecError
+ Message, OffsetAndMessage, TopicAndPartition,
+ BrokerMetadata, TopicMetadata, PartitionMetadata,
+ MetadataResponse, ProduceResponse, FetchResponse,
+ OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
+ ProtocolError, BufferUnderflowError, ChecksumError,
+ ConsumerFetchSizeTooSmall, UnsupportedCodecError
)
from kafka.util import (
crc32, read_short_string, read_int_string, relative_unpack,
@@ -343,7 +344,8 @@ class KafkaProtocol(object):
yield OffsetResponse(topic, partition, error, tuple(offsets))
@classmethod
- def encode_metadata_request(cls, client_id, correlation_id, topics=None):
+ def encode_metadata_request(cls, client_id, correlation_id, topics=None,
+ payloads=None):
"""
Encode a MetadataRequest
@@ -353,7 +355,11 @@ class KafkaProtocol(object):
correlation_id: int
topics: list of strings
"""
- topics = [] if topics is None else topics
+ if payloads is None:
+ topics = [] if topics is None else topics
+ else:
+ topics = payloads
+
message = cls._encode_message_header(client_id, correlation_id,
KafkaProtocol.METADATA_KEY)
@@ -376,28 +382,24 @@ class KafkaProtocol(object):
((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0)
# Broker info
- brokers = {}
+ brokers = []
for i in range(numbrokers):
((nodeId, ), cur) = relative_unpack('>i', data, cur)
(host, cur) = read_short_string(data, cur)
((port,), cur) = relative_unpack('>i', data, cur)
- brokers[nodeId] = BrokerMetadata(nodeId, host, port)
+ brokers.append(BrokerMetadata(nodeId, host, port))
# Topic info
((num_topics,), cur) = relative_unpack('>i', data, cur)
- topic_metadata = {}
+ topic_metadata = []
for i in range(num_topics):
- # NOTE: topic_error is discarded. Should probably be returned with
- # the topic metadata.
((topic_error,), cur) = relative_unpack('>h', data, cur)
(topic_name, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
- partition_metadata = {}
+ partition_metadata = []
for j in range(num_partitions):
- # NOTE: partition_error_code is discarded. Should probably be
- # returned with the partition metadata.
((partition_error_code, partition, leader, numReplicas), cur) = \
relative_unpack('>hiii', data, cur)
@@ -407,13 +409,16 @@ class KafkaProtocol(object):
((num_isr,), cur) = relative_unpack('>i', data, cur)
(isr, cur) = relative_unpack('>%di' % num_isr, data, cur)
- partition_metadata[partition] = \
- PartitionMetadata(
- topic_name, partition, leader, replicas, isr)
+ partition_metadata.append(
+ PartitionMetadata(topic_name, partition, leader,
+ replicas, isr, partition_error_code)
+ )
- topic_metadata[topic_name] = partition_metadata
+ topic_metadata.append(
+ TopicMetadata(topic_name, topic_error, partition_metadata)
+ )
- return brokers, topic_metadata
+ return MetadataResponse(brokers, topic_metadata)
@classmethod
def encode_offset_commit_request(cls, client_id, correlation_id,
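
With lists of namedtuples replacing nested dicts, decode_metadata_response now preserves wire order and keeps the topic- and partition-level error codes it used to discard. A self-contained sketch of consuming the new shape, with the namedtuples mirrored inline so it runs without the library:

    from collections import namedtuple

    MetadataResponse = namedtuple('MetadataResponse', ['brokers', 'topics'])
    BrokerMetadata = namedtuple('BrokerMetadata', ['nodeId', 'host', 'port'])
    TopicMetadata = namedtuple('TopicMetadata', ['topic', 'error', 'partitions'])
    PartitionMetadata = namedtuple('PartitionMetadata',
                                   ['topic', 'partition', 'leader',
                                    'replicas', 'isr', 'error'])

    resp = MetadataResponse(
        brokers=[BrokerMetadata(0, b'broker_1', 4567)],
        topics=[TopicMetadata(b'topic_1', 0, [
            PartitionMetadata(b'topic_1', 0, 0, (0,), (0,), 0),
        ])],
    )

    brokers = dict((b.nodeId, b) for b in resp.brokers)  # nodeId -> metadata
    for t in resp.topics:
        for p in t.partitions:
            leader = brokers.get(p.leader)  # None if the leader id is unknown
            print(t.topic, p.partition, leader)
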
diff --git a/test/test_client.py b/test/test_client.py
index 274655e..ba6e3b1 100644
--- a/test/test_client.py
+++ b/test/test_client.py
@@ -1,15 +1,17 @@
import socket
from time import sleep
-from mock import MagicMock, patch
+from mock import ANY, MagicMock, patch
import six
from . import unittest
from kafka import KafkaClient
from kafka.common import (
- ProduceRequest, BrokerMetadata, PartitionMetadata,
+ ProduceRequest, MetadataResponse,
+ BrokerMetadata, TopicMetadata, PartitionMetadata,
TopicAndPartition, KafkaUnavailableError,
- LeaderUnavailableError, PartitionUnavailableError,
+ LeaderNotAvailableError, NoError,
+ UnknownTopicOrPartitionError, KafkaTimeoutError,
ConnectionError
)
from kafka.conn import KafkaConnection
@@ -17,6 +19,10 @@ from kafka.protocol import KafkaProtocol, create_message
from test.testutil import Timer
+NO_ERROR = 0
+UNKNOWN_TOPIC_OR_PARTITION = 3
+NO_LEADER = 5
+
class TestKafkaClient(unittest.TestCase):
def test_init_with_list(self):
with patch.object(KafkaClient, 'load_metadata_for_topics'):
@@ -64,10 +70,12 @@ class TestKafkaClient(unittest.TestCase):
req = KafkaProtocol.encode_metadata_request(b'client', 0)
with self.assertRaises(KafkaUnavailableError):
- client._send_broker_unaware_request(1, req)
+ client._send_broker_unaware_request(payloads=['fake request'],
+ encoder_fn=MagicMock(return_value='fake encoded message'),
+ decoder_fn=lambda x: x)
for key, conn in six.iteritems(mocked_conns):
- conn.send.assert_called_with(1, req)
+ conn.send.assert_called_with(ANY, 'fake encoded message')
def test_send_broker_unaware_request(self):
'Tests that call works when at least one of the host is available'
@@ -88,40 +96,46 @@ class TestKafkaClient(unittest.TestCase):
# patch to avoid making requests before we want it
with patch.object(KafkaClient, 'load_metadata_for_topics'):
with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
- client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+ with patch.object(KafkaClient, '_next_id', return_value=1):
+ client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
- req = KafkaProtocol.encode_metadata_request(b'client', 0)
- resp = client._send_broker_unaware_request(1, req)
+ resp = client._send_broker_unaware_request(payloads=['fake request'],
+ encoder_fn=MagicMock(),
+ decoder_fn=lambda x: x)
- self.assertEqual('valid response', resp)
- mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
+ self.assertEqual('valid response', resp)
+ mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_load_metadata(self, protocol, conn):
- "Load metadata for all topics"
conn.recv.return_value = 'response' # anything but None
- brokers = {}
- brokers[0] = BrokerMetadata(1, 'broker_1', 4567)
- brokers[1] = BrokerMetadata(2, 'broker_2', 5678)
-
- topics = {}
- topics['topic_1'] = {
- 0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2])
- }
- topics['topic_noleader'] = {
- 0: PartitionMetadata('topic_noleader', 0, -1, [], []),
- 1: PartitionMetadata('topic_noleader', 1, -1, [], [])
- }
- topics['topic_no_partitions'] = {}
- topics['topic_3'] = {
- 0: PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1]),
- 1: PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0]),
- 2: PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1])
- }
- protocol.decode_metadata_response.return_value = (brokers, topics)
+ brokers = [
+ BrokerMetadata(0, 'broker_1', 4567),
+ BrokerMetadata(1, 'broker_2', 5678)
+ ]
+
+ topics = [
+ TopicMetadata('topic_1', NO_ERROR, [
+ PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2], NO_ERROR)
+ ]),
+ TopicMetadata('topic_noleader', NO_ERROR, [
+ PartitionMetadata('topic_noleader', 0, -1, [], [],
+ NO_LEADER),
+ PartitionMetadata('topic_noleader', 1, -1, [], [],
+ NO_LEADER),
+ ]),
+ TopicMetadata('topic_no_partitions', NO_LEADER, []),
+ TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
+ TopicMetadata('topic_3', NO_ERROR, [
+ PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1], NO_ERROR),
+ PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0], NO_ERROR),
+ PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1], NO_ERROR)
+ ])
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
# client loads metadata at init
client = KafkaClient(hosts=['broker_1:4567'])
@@ -134,6 +148,78 @@ class TestKafkaClient(unittest.TestCase):
TopicAndPartition('topic_3', 2): brokers[0]},
client.topics_to_brokers)
+ # if we ask for metadata explicitly, it should raise errors
+ with self.assertRaises(LeaderNotAvailableError):
+ client.load_metadata_for_topics('topic_no_partitions')
+
+ with self.assertRaises(UnknownTopicOrPartitionError):
+ client.load_metadata_for_topics('topic_unknown')
+
+ # This should not raise
+ client.load_metadata_for_topics('topic_noleader')
+
+ @patch('kafka.client.KafkaConnection')
+ @patch('kafka.client.KafkaProtocol')
+ def test_has_metadata_for_topic(self, protocol, conn):
+
+ conn.recv.return_value = 'response' # anything but None
+
+ brokers = [
+ BrokerMetadata(0, 'broker_1', 4567),
+ BrokerMetadata(1, 'broker_2', 5678)
+ ]
+
+ topics = [
+ TopicMetadata('topic_still_creating', NO_LEADER, []),
+ TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
+ TopicMetadata('topic_noleaders', NO_ERROR, [
+ PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER),
+ PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER),
+ ]),
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+ client = KafkaClient(hosts=['broker_1:4567'])
+
+ # Topics with no partitions return False
+ self.assertFalse(client.has_metadata_for_topic('topic_still_creating'))
+ self.assertFalse(client.has_metadata_for_topic('topic_doesnt_exist'))
+
+ # Topic with partition metadata, but no leaders return True
+ self.assertTrue(client.has_metadata_for_topic('topic_noleaders'))
+
+ @patch('kafka.client.KafkaConnection')
+ @patch('kafka.client.KafkaProtocol')
+ def test_ensure_topic_exists(self, protocol, conn):
+
+ conn.recv.return_value = 'response' # anything but None
+
+ brokers = [
+ BrokerMetadata(0, 'broker_1', 4567),
+ BrokerMetadata(1, 'broker_2', 5678)
+ ]
+
+ topics = [
+ TopicMetadata('topic_still_creating', NO_LEADER, []),
+ TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
+ TopicMetadata('topic_noleaders', NO_ERROR, [
+ PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER),
+ PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER),
+ ]),
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+ client = KafkaClient(hosts=['broker_1:4567'])
+
+ with self.assertRaises(UnknownTopicOrPartitionError):
+ client.ensure_topic_exists('topic_doesnt_exist', timeout=1)
+
+ with self.assertRaises(KafkaTimeoutError):
+ client.ensure_topic_exists('topic_still_creating', timeout=1)
+
+ # This should not raise
+ client.ensure_topic_exists('topic_noleaders', timeout=1)
+
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn):
@@ -141,70 +227,84 @@ class TestKafkaClient(unittest.TestCase):
conn.recv.return_value = 'response' # anything but None
- brokers = {}
- brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
- brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+ brokers = [
+ BrokerMetadata(0, 'broker_1', 4567),
+ BrokerMetadata(1, 'broker_2', 5678)
+ ]
- topics = {'topic_no_partitions': {}}
- protocol.decode_metadata_response.return_value = (brokers, topics)
+ topics = [
+ TopicMetadata('topic_no_partitions', NO_LEADER, [])
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
client = KafkaClient(hosts=['broker_1:4567'])
# topic metadata is loaded but empty
self.assertDictEqual({}, client.topics_to_brokers)
- topics['topic_no_partitions'] = {
- 0: PartitionMetadata('topic_no_partitions', 0, 0, [0, 1], [0, 1])
- }
- protocol.decode_metadata_response.return_value = (brokers, topics)
+ topics = [
+ TopicMetadata('topic_one_partition', NO_ERROR, [
+ PartitionMetadata('topic_one_partition', 0, 0, [0, 1], [0, 1], NO_ERROR)
+ ])
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
# calling _get_leader_for_partition (from any broker aware request)
# will try loading metadata again for the same topic
- leader = client._get_leader_for_partition('topic_no_partitions', 0)
+ leader = client._get_leader_for_partition('topic_one_partition', 0)
self.assertEqual(brokers[0], leader)
self.assertDictEqual({
- TopicAndPartition('topic_no_partitions', 0): brokers[0]},
+ TopicAndPartition('topic_one_partition', 0): brokers[0]},
client.topics_to_brokers)
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_get_leader_for_unassigned_partitions(self, protocol, conn):
- "Get leader raises if no partitions is defined for a topic"
conn.recv.return_value = 'response' # anything but None
- brokers = {}
- brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
- brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+ brokers = [
+ BrokerMetadata(0, 'broker_1', 4567),
+ BrokerMetadata(1, 'broker_2', 5678)
+ ]
- topics = {'topic_no_partitions': {}}
- protocol.decode_metadata_response.return_value = (brokers, topics)
+ topics = [
+ TopicMetadata('topic_no_partitions', NO_LEADER, []),
+ TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
client = KafkaClient(hosts=['broker_1:4567'])
self.assertDictEqual({}, client.topics_to_brokers)
- with self.assertRaises(PartitionUnavailableError):
+ with self.assertRaises(LeaderNotAvailableError):
client._get_leader_for_partition('topic_no_partitions', 0)
+ with self.assertRaises(UnknownTopicOrPartitionError):
+ client._get_leader_for_partition('topic_unknown', 0)
+
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
- def test_get_leader_returns_none_when_noleader(self, protocol, conn):
- "Getting leader for partitions returns None when the partiion has no leader"
+ def test_get_leader_exceptions_when_noleader(self, protocol, conn):
conn.recv.return_value = 'response' # anything but None
- brokers = {}
- brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
- brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
-
- topics = {}
- topics['topic_noleader'] = {
- 0: PartitionMetadata('topic_noleader', 0, -1, [], []),
- 1: PartitionMetadata('topic_noleader', 1, -1, [], [])
- }
- protocol.decode_metadata_response.return_value = (brokers, topics)
+ brokers = [
+ BrokerMetadata(0, 'broker_1', 4567),
+ BrokerMetadata(1, 'broker_2', 5678)
+ ]
+
+ topics = [
+ TopicMetadata('topic_noleader', NO_ERROR, [
+ PartitionMetadata('topic_noleader', 0, -1, [], [],
+ NO_LEADER),
+ PartitionMetadata('topic_noleader', 1, -1, [], [],
+ NO_LEADER),
+ ]),
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
client = KafkaClient(hosts=['broker_1:4567'])
self.assertDictEqual(
@@ -213,34 +313,48 @@ class TestKafkaClient(unittest.TestCase):
TopicAndPartition('topic_noleader', 1): None
},
client.topics_to_brokers)
- self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0))
- self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1))
- topics['topic_noleader'] = {
- 0: PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1]),
- 1: PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0])
- }
- protocol.decode_metadata_response.return_value = (brokers, topics)
+ # No leader partitions -- raise LeaderNotAvailableError
+ with self.assertRaises(LeaderNotAvailableError):
+ client._get_leader_for_partition('topic_noleader', 0)
+ with self.assertRaises(LeaderNotAvailableError):
+ client._get_leader_for_partition('topic_noleader', 1)
+
+ # Unknown partitions -- raise UnknownTopicOrPartitionError
+ with self.assertRaises(UnknownTopicOrPartitionError):
+ client._get_leader_for_partition('topic_noleader', 2)
+
+ topics = [
+ TopicMetadata('topic_noleader', NO_ERROR, [
+ PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1], NO_ERROR),
+ PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0], NO_ERROR)
+ ]),
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0))
self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1))
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_send_produce_request_raises_when_noleader(self, protocol, conn):
- "Send producer request raises LeaderUnavailableError if leader is not available"
+ "Send producer request raises LeaderNotAvailableError if leader is not available"
conn.recv.return_value = 'response' # anything but None
- brokers = {}
- brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
- brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
-
- topics = {}
- topics['topic_noleader'] = {
- 0: PartitionMetadata('topic_noleader', 0, -1, [], []),
- 1: PartitionMetadata('topic_noleader', 1, -1, [], [])
- }
- protocol.decode_metadata_response.return_value = (brokers, topics)
+ brokers = [
+ BrokerMetadata(0, 'broker_1', 4567),
+ BrokerMetadata(1, 'broker_2', 5678)
+ ]
+
+ topics = [
+ TopicMetadata('topic_noleader', NO_ERROR, [
+ PartitionMetadata('topic_noleader', 0, -1, [], [],
+ NO_LEADER),
+ PartitionMetadata('topic_noleader', 1, -1, [], [],
+ NO_LEADER),
+ ]),
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
client = KafkaClient(hosts=['broker_1:4567'])
@@ -248,7 +362,32 @@ class TestKafkaClient(unittest.TestCase):
"topic_noleader", 0,
[create_message("a"), create_message("b")])]
- with self.assertRaises(LeaderUnavailableError):
+ with self.assertRaises(LeaderNotAvailableError):
+ client.send_produce_request(requests)
+
+ @patch('kafka.client.KafkaConnection')
+ @patch('kafka.client.KafkaProtocol')
+ def test_send_produce_request_raises_when_topic_unknown(self, protocol, conn):
+
+ conn.recv.return_value = 'response' # anything but None
+
+ brokers = [
+ BrokerMetadata(0, 'broker_1', 4567),
+ BrokerMetadata(1, 'broker_2', 5678)
+ ]
+
+ topics = [
+ TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+ client = KafkaClient(hosts=['broker_1:4567'])
+
+ requests = [ProduceRequest(
+ "topic_doesnt_exist", 0,
+ [create_message("a"), create_message("b")])]
+
+ with self.assertRaises(UnknownTopicOrPartitionError):
client.send_produce_request(requests)
def test_timeout(self):
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 125df2c..71fd26c 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -9,7 +9,8 @@ from kafka import (
)
from kafka.codec import has_snappy
from kafka.common import (
- FetchRequest, ProduceRequest, UnknownTopicOrPartitionError
+ FetchRequest, ProduceRequest,
+ UnknownTopicOrPartitionError, LeaderNotAvailableError
)
from test.fixtures import ZookeeperFixture, KafkaFixture
@@ -165,7 +166,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer = SimpleProducer(self.client)
# At first it doesn't exist
- with self.assertRaises(UnknownTopicOrPartitionError):
+ with self.assertRaises((UnknownTopicOrPartitionError,
+ LeaderNotAvailableError)):
producer.send_messages(new_topic, self.msg("one"))
@kafka_versions("all")
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 11d4687..a028be9 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -10,9 +10,10 @@ from kafka.common import (
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
ProduceRequest, FetchRequest, Message, ChecksumError,
- ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage,
- BrokerMetadata, PartitionMetadata, ProtocolError,
- UnsupportedCodecError
+ ProduceResponse, FetchResponse, OffsetAndMessage,
+ BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition,
+ KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
+ ProtocolError
)
from kafka.protocol import (
ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
@@ -451,21 +452,22 @@ class TestProtocol(unittest.TestCase):
self.assertEqual(encoded, expected)
- def _create_encoded_metadata_response(self, broker_data, topic_data,
- topic_errors, partition_errors):
- encoded = struct.pack('>ii', 3, len(broker_data))
- for node_id, broker in six.iteritems(broker_data):
- encoded += struct.pack('>ih%dsi' % len(broker.host), node_id,
+ def _create_encoded_metadata_response(self, brokers, topics):
+ encoded = struct.pack('>ii', 3, len(brokers))
+ for broker in brokers:
+ encoded += struct.pack('>ih%dsi' % len(broker.host), broker.nodeId,
len(broker.host), broker.host, broker.port)
- encoded += struct.pack('>i', len(topic_data))
- for topic, partitions in six.iteritems(topic_data):
- encoded += struct.pack('>hh%dsi' % len(topic), topic_errors[topic],
- len(topic), topic, len(partitions))
- for partition, metadata in six.iteritems(partitions):
+ encoded += struct.pack('>i', len(topics))
+ for topic in topics:
+ encoded += struct.pack('>hh%dsi' % len(topic.topic),
+ topic.error, len(topic.topic),
+ topic.topic, len(topic.partitions))
+ for metadata in topic.partitions:
encoded += struct.pack('>hiii',
- partition_errors[(topic, partition)],
- partition, metadata.leader,
+ metadata.error,
+ metadata.partition,
+ metadata.leader,
len(metadata.replicas))
if len(metadata.replicas) > 0:
encoded += struct.pack('>%di' % len(metadata.replicas),
@@ -475,35 +477,26 @@ class TestProtocol(unittest.TestCase):
if len(metadata.isr) > 0:
encoded += struct.pack('>%di' % len(metadata.isr),
*metadata.isr)
-
return encoded
def test_decode_metadata_response(self):
- node_brokers = {
- 0: BrokerMetadata(0, b"brokers1.kafka.rdio.com", 1000),
- 1: BrokerMetadata(1, b"brokers1.kafka.rdio.com", 1001),
- 3: BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000)
- }
-
- topic_partitions = {
- b"topic1": {
- 0: PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,)),
- 1: PartitionMetadata(b"topic1", 1, 3, (0, 1), (0, 1))
- },
- b"topic2": {
- 0: PartitionMetadata(b"topic2", 0, 0, (), ())
- }
- }
- topic_errors = {b"topic1": 0, b"topic2": 1}
- partition_errors = {
- (b"topic1", 0): 0,
- (b"topic1", 1): 1,
- (b"topic2", 0): 0
- }
+ node_brokers = [
+ BrokerMetadata(0, b"brokers1.kafka.rdio.com", 1000),
+ BrokerMetadata(1, b"brokers1.kafka.rdio.com", 1001),
+ BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000)
+ ]
+
+ topic_partitions = [
+ TopicMetadata(b"topic1", 0, [
+ PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,), 0),
+ PartitionMetadata(b"topic1", 1, 3, (0, 1), (0, 1), 1)
+ ]),
+ TopicMetadata(b"topic2", 1, [
+ PartitionMetadata(b"topic2", 0, 0, (), (), 0),
+ ]),
+ ]
encoded = self._create_encoded_metadata_response(node_brokers,
- topic_partitions,
- topic_errors,
- partition_errors)
+ topic_partitions)
decoded = KafkaProtocol.decode_metadata_response(encoded)
self.assertEqual(decoded, (node_brokers, topic_partitions))