diff options
-rw-r--r-- | kafka/client.py | 30 | ||||
-rw-r--r-- | kafka/common.py | 30 | ||||
-rw-r--r-- | kafka/protocol.py | 45 | ||||
-rw-r--r-- | test/test_client.py | 158 | ||||
-rw-r--r-- | test/test_protocol.py | 74 |
5 files changed, 192 insertions, 145 deletions
diff --git a/kafka/client.py b/kafka/client.py index 8630f66..e14694f 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -6,7 +6,7 @@ import logging import time import kafka.common -from kafka.common import (TopicAndPartition, +from kafka.common import (TopicAndPartition, BrokerMetadata, ConnectionError, FailedPayloadsError, PartitionUnavailableError, LeaderUnavailableError, KafkaUnavailableError, KafkaTimeoutError, @@ -83,20 +83,26 @@ class KafkaClient(object): """ return KafkaClient.ID_GEN.next() - def _send_broker_unaware_request(self, requestId, request): + def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn): """ Attempt to send a broker-agnostic request to one of the available brokers. Keep trying until you succeed. """ for (host, port) in self.hosts: + requestId = self._next_id() try: conn = self._get_conn(host, port) + request = encoder_fn(client_id=self.client_id, + correlation_id=requestId, + payloads=payloads) + conn.send(requestId, request) response = conn.recv(requestId) - return response + return decoder_fn(response) + except Exception as e: log.warning("Could not send request [%r] to server %s:%i, " - "trying next server: %s" % (request, host, port, e)) + "trying next server: %s" % (requestId, host, port, e)) raise KafkaUnavailableError("All servers failed to process request") @@ -246,13 +252,11 @@ class KafkaClient(object): Discover brokers and metadata for a set of topics. This function is called lazily whenever metadata is unavailable. """ - request_id = self._next_id() - request = KafkaProtocol.encode_metadata_request(self.client_id, - request_id, topics) - response = self._send_broker_unaware_request(request_id, request) + resp = self.send_metadata_request(topics) - (brokers, topics) = KafkaProtocol.decode_metadata_response(response) + brokers = dict([(broker.nodeId, broker) for broker in resp.brokers]) + topics = dict([(t.topic, dict([(p.partition, p) for p in t.partitions]) ) for t in resp.topics]) log.debug("Broker metadata: %s", brokers) log.debug("Topic metadata: %s", topics) @@ -276,6 +280,14 @@ class KafkaClient(object): else: self.topics_to_brokers[topic_part] = brokers[meta.leader] + def send_metadata_request(self, payloads=[], fail_on_error=True, + callback=None): + + encoder = KafkaProtocol.encode_metadata_request + decoder = KafkaProtocol.decode_metadata_response + + return self._send_broker_unaware_request(payloads, encoder, decoder) + def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None): """ diff --git a/kafka/common.py b/kafka/common.py index 907e128..e1713cf 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -17,8 +17,14 @@ OffsetRequest = namedtuple("OffsetRequest", OffsetCommitRequest = namedtuple("OffsetCommitRequest", ["topic", "partition", "offset", "metadata"]) +MetadataRequest = namedtuple("MetadataRequest", + ["topics"]) + OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"]) +MetadataResponse = namedtuple("MetadataResponse", + ["brokers", "topics"]) + # Response payloads ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"]) @@ -36,16 +42,26 @@ OffsetFetchResponse = namedtuple("OffsetFetchResponse", ["topic", "partition", "offset", "metadata", "error"]) -BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"]) -PartitionMetadata = namedtuple("PartitionMetadata", - ["topic", "partition", "leader", - "replicas", "isr"]) # Other useful structs -OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"]) -Message = namedtuple("Message", ["magic", "attributes", "key", "value"]) -TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"]) +BrokerMetadata = namedtuple("BrokerMetadata", + ["nodeId", "host", "port"]) + +TopicMetadata = namedtuple("TopicMetadata", + ["topic", "error", "partitions"]) + +PartitionMetadata = namedtuple("PartitionMetadata", + ["topic", "partition", "leader", "replicas", "isr", "error"]) + +OffsetAndMessage = namedtuple("OffsetAndMessage", + ["offset", "message"]) + +Message = namedtuple("Message", + ["magic", "attributes", "key", "value"]) + +TopicAndPartition = namedtuple("TopicAndPartition", + ["topic", "partition"]) ################# diff --git a/kafka/protocol.py b/kafka/protocol.py index 58661c7..db048aa 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -6,11 +6,12 @@ from kafka.codec import ( gzip_encode, gzip_decode, snappy_encode, snappy_decode ) from kafka.common import ( - BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage, - ProduceResponse, FetchResponse, OffsetResponse, - OffsetCommitResponse, OffsetFetchResponse, ProtocolError, - BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall, - UnsupportedCodecError + Message, OffsetAndMessage, TopicAndPartition, + BrokerMetadata, TopicMetadata, PartitionMetadata, + MetadataResponse, ProduceResponse, FetchResponse, + OffsetResponse, OffsetCommitResponse, OffsetFetchResponse, + ProtocolError, BufferUnderflowError, ChecksumError, + ConsumerFetchSizeTooSmall, UnsupportedCodecError ) from kafka.util import ( read_short_string, read_int_string, relative_unpack, @@ -340,7 +341,8 @@ class KafkaProtocol(object): yield OffsetResponse(topic, partition, error, tuple(offsets)) @classmethod - def encode_metadata_request(cls, client_id, correlation_id, topics=None): + def encode_metadata_request(cls, client_id, correlation_id, topics=None, + payloads=None): """ Encode a MetadataRequest @@ -350,7 +352,11 @@ class KafkaProtocol(object): correlation_id: int topics: list of strings """ - topics = [] if topics is None else topics + if payloads is None: + topics = [] if topics is None else topics + else: + topics = payloads + message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.METADATA_KEY) @@ -373,28 +379,24 @@ class KafkaProtocol(object): ((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0) # Broker info - brokers = {} + brokers = [] for i in range(numbrokers): ((nodeId, ), cur) = relative_unpack('>i', data, cur) (host, cur) = read_short_string(data, cur) ((port,), cur) = relative_unpack('>i', data, cur) - brokers[nodeId] = BrokerMetadata(nodeId, host, port) + brokers.append(BrokerMetadata(nodeId, host, port)) # Topic info ((num_topics,), cur) = relative_unpack('>i', data, cur) - topic_metadata = {} + topic_metadata = [] for i in range(num_topics): - # NOTE: topic_error is discarded. Should probably be returned with - # the topic metadata. ((topic_error,), cur) = relative_unpack('>h', data, cur) (topic_name, cur) = read_short_string(data, cur) ((num_partitions,), cur) = relative_unpack('>i', data, cur) - partition_metadata = {} + partition_metadata = [] for j in range(num_partitions): - # NOTE: partition_error_code is discarded. Should probably be - # returned with the partition metadata. ((partition_error_code, partition, leader, numReplicas), cur) = \ relative_unpack('>hiii', data, cur) @@ -404,13 +406,16 @@ class KafkaProtocol(object): ((num_isr,), cur) = relative_unpack('>i', data, cur) (isr, cur) = relative_unpack('>%di' % num_isr, data, cur) - partition_metadata[partition] = \ - PartitionMetadata( - topic_name, partition, leader, replicas, isr) + partition_metadata.append( + PartitionMetadata(topic_name, partition, leader, + replicas, isr, partition_error_code) + ) - topic_metadata[topic_name] = partition_metadata + topic_metadata.append( + TopicMetadata(topic_name, topic_error, partition_metadata) + ) - return brokers, topic_metadata + return MetadataResponse(brokers, topic_metadata) @classmethod def encode_offset_commit_request(cls, client_id, correlation_id, diff --git a/test/test_client.py b/test/test_client.py index 32a2256..58254fc 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -1,10 +1,11 @@ import unittest2 -from mock import MagicMock, patch +from mock import ANY, MagicMock, patch from kafka import KafkaClient from kafka.common import ( - ProduceRequest, BrokerMetadata, PartitionMetadata, + ProduceRequest, MetadataResponse, + BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError, LeaderUnavailableError, PartitionUnavailableError ) @@ -56,10 +57,12 @@ class TestKafkaClient(unittest2.TestCase): client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092']) with self.assertRaises(KafkaUnavailableError): - client._send_broker_unaware_request(1, 'fake request') + client._send_broker_unaware_request(payloads=['fake request'], + encoder_fn=MagicMock(return_value='fake encoded message'), + decoder_fn=lambda x: x) for key, conn in mocked_conns.iteritems(): - conn.send.assert_called_with(1, 'fake request') + conn.send.assert_called_with(ANY, 'fake encoded message') def test_send_broker_unaware_request(self): 'Tests that call works when at least one of the host is available' @@ -80,12 +83,15 @@ class TestKafkaClient(unittest2.TestCase): # patch to avoid making requests before we want it with patch.object(KafkaClient, 'load_metadata_for_topics'): with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn): - client = KafkaClient(hosts='kafka01:9092,kafka02:9092') + with patch.object(KafkaClient, '_next_id', return_value=1): + client = KafkaClient(hosts='kafka01:9092,kafka02:9092') - resp = client._send_broker_unaware_request(1, 'fake request') + resp = client._send_broker_unaware_request(payloads=['fake request'], + encoder_fn=MagicMock(), + decoder_fn=lambda x: x) - self.assertEqual('valid response', resp) - mocked_conns[('kafka02', 9092)].recv.assert_called_with(1) + self.assertEqual('valid response', resp) + mocked_conns[('kafka02', 9092)].recv.assert_called_with(1) @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') @@ -94,25 +100,27 @@ class TestKafkaClient(unittest2.TestCase): conn.recv.return_value = 'response' # anything but None - brokers = {} - brokers[0] = BrokerMetadata(1, 'broker_1', 4567) - brokers[1] = BrokerMetadata(2, 'broker_2', 5678) - - topics = {} - topics['topic_1'] = { - 0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2]) - } - topics['topic_noleader'] = { - 0: PartitionMetadata('topic_noleader', 0, -1, [], []), - 1: PartitionMetadata('topic_noleader', 1, -1, [], []) - } - topics['topic_no_partitions'] = {} - topics['topic_3'] = { - 0: PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1]), - 1: PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0]), - 2: PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1]) - } - protocol.decode_metadata_response.return_value = (brokers, topics) + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] + + topics = [ + TopicMetadata('topic_1', 0, [ + PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2], 0) + ]), + TopicMetadata('topic_noleader', 0, [ + PartitionMetadata('topic_noleader', 0, -1, [], [], 0), + PartitionMetadata('topic_noleader', 1, -1, [], [], 0) + ]), + TopicMetadata('topic_no_partitions', 0, []), + TopicMetadata('topic_3', 0, [ + PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1], 0), + PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0], 0), + PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1], 0) + ]) + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) # client loads metadata at init client = KafkaClient(hosts=['broker_1:4567']) @@ -132,30 +140,35 @@ class TestKafkaClient(unittest2.TestCase): conn.recv.return_value = 'response' # anything but None - brokers = {} - brokers[0] = BrokerMetadata(0, 'broker_1', 4567) - brokers[1] = BrokerMetadata(1, 'broker_2', 5678) + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] - topics = {'topic_no_partitions': {}} - protocol.decode_metadata_response.return_value = (brokers, topics) + topics = [ + TopicMetadata('topic_no_partitions', 0, []) + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) client = KafkaClient(hosts=['broker_1:4567']) # topic metadata is loaded but empty self.assertDictEqual({}, client.topics_to_brokers) - topics['topic_no_partitions'] = { - 0: PartitionMetadata('topic_no_partitions', 0, 0, [0, 1], [0, 1]) - } - protocol.decode_metadata_response.return_value = (brokers, topics) + topics = [ + TopicMetadata('topic_one_partition', 0, [ + PartitionMetadata('topic_no_partition', 0, 0, [0, 1], [0, 1], 0) + ]) + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) # calling _get_leader_for_partition (from any broker aware request) # will try loading metadata again for the same topic - leader = client._get_leader_for_partition('topic_no_partitions', 0) + leader = client._get_leader_for_partition('topic_one_partition', 0) self.assertEqual(brokers[0], leader) self.assertDictEqual({ - TopicAndPartition('topic_no_partitions', 0): brokers[0]}, + TopicAndPartition('topic_one_partition', 0): brokers[0]}, client.topics_to_brokers) @patch('kafka.client.KafkaConnection') @@ -165,12 +178,15 @@ class TestKafkaClient(unittest2.TestCase): conn.recv.return_value = 'response' # anything but None - brokers = {} - brokers[0] = BrokerMetadata(0, 'broker_1', 4567) - brokers[1] = BrokerMetadata(1, 'broker_2', 5678) + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] - topics = {'topic_no_partitions': {}} - protocol.decode_metadata_response.return_value = (brokers, topics) + topics = [ + TopicMetadata('topic_no_partitions', 0, []) + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) client = KafkaClient(hosts=['broker_1:4567']) @@ -186,16 +202,18 @@ class TestKafkaClient(unittest2.TestCase): conn.recv.return_value = 'response' # anything but None - brokers = {} - brokers[0] = BrokerMetadata(0, 'broker_1', 4567) - brokers[1] = BrokerMetadata(1, 'broker_2', 5678) + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] - topics = {} - topics['topic_noleader'] = { - 0: PartitionMetadata('topic_noleader', 0, -1, [], []), - 1: PartitionMetadata('topic_noleader', 1, -1, [], []) - } - protocol.decode_metadata_response.return_value = (brokers, topics) + topics = [ + TopicMetadata('topic_noleader', 0, [ + PartitionMetadata('topic_noleader', 0, -1, [], [], 0), + PartitionMetadata('topic_noleader', 1, -1, [], [], 0) + ]), + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) client = KafkaClient(hosts=['broker_1:4567']) self.assertDictEqual( @@ -207,31 +225,35 @@ class TestKafkaClient(unittest2.TestCase): self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0)) self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1)) - topics['topic_noleader'] = { - 0: PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1]), - 1: PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0]) - } - protocol.decode_metadata_response.return_value = (brokers, topics) + topics = [ + TopicMetadata('topic_noleader', 0, [ + PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1], 0), + PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0], 0) + ]), + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0)) self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1)) @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') def test_send_produce_request_raises_when_noleader(self, protocol, conn): - "Send producer request raises LeaderUnavailableError if leader is not available" + "Send producer request raises LeaderNotAvailableError if leader is not available" conn.recv.return_value = 'response' # anything but None - brokers = {} - brokers[0] = BrokerMetadata(0, 'broker_1', 4567) - brokers[1] = BrokerMetadata(1, 'broker_2', 5678) - - topics = {} - topics['topic_noleader'] = { - 0: PartitionMetadata('topic_noleader', 0, -1, [], []), - 1: PartitionMetadata('topic_noleader', 1, -1, [], []) - } - protocol.decode_metadata_response.return_value = (brokers, topics) + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] + + topics = [ + TopicMetadata('topic_noleader', 0, [ + PartitionMetadata('topic_noleader', 0, -1, [], [], 0), + PartitionMetadata('topic_noleader', 1, -1, [], [], 0) + ]), + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) client = KafkaClient(hosts=['broker_1:4567']) diff --git a/test/test_protocol.py b/test/test_protocol.py index 2089f48..a92d20e 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -11,10 +11,10 @@ from kafka.common import ( OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, OffsetResponse, OffsetCommitResponse, OffsetFetchResponse, ProduceRequest, FetchRequest, Message, ChecksumError, - ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage, - BrokerMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError, - ProtocolError, LeaderUnavailableError, PartitionUnavailableError, - UnsupportedCodecError + ProduceResponse, FetchResponse, OffsetAndMessage, + BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition, + KafkaUnavailableError, PartitionUnavailableError, + UnsupportedCodecError, ConsumerFetchSizeTooSmall, ProtocolError, ) from kafka.codec import ( has_snappy, gzip_encode, gzip_decode, @@ -454,21 +454,22 @@ class TestProtocol(unittest2.TestCase): self.assertEqual(encoded, expected) - def _create_encoded_metadata_response(self, broker_data, topic_data, - topic_errors, partition_errors): - encoded = struct.pack('>ii', 3, len(broker_data)) - for node_id, broker in broker_data.iteritems(): - encoded += struct.pack('>ih%dsi' % len(broker.host), node_id, + def _create_encoded_metadata_response(self, brokers, topics): + encoded = struct.pack('>ii', 3, len(brokers)) + for broker in brokers: + encoded += struct.pack('>ih%dsi' % len(broker.host), broker.nodeId, len(broker.host), broker.host, broker.port) - encoded += struct.pack('>i', len(topic_data)) - for topic, partitions in topic_data.iteritems(): - encoded += struct.pack('>hh%dsi' % len(topic), topic_errors[topic], - len(topic), topic, len(partitions)) - for partition, metadata in partitions.iteritems(): + encoded += struct.pack('>i', len(topics)) + for topic in topics: + encoded += struct.pack('>hh%dsi' % len(topic.topic), + topic.error, len(topic.topic), + topic.topic, len(topic.partitions)) + for metadata in topic.partitions: encoded += struct.pack('>hiii', - partition_errors[(topic, partition)], - partition, metadata.leader, + metadata.error, + metadata.partition, + metadata.leader, len(metadata.replicas)) if len(metadata.replicas) > 0: encoded += struct.pack('>%di' % len(metadata.replicas), @@ -478,35 +479,26 @@ class TestProtocol(unittest2.TestCase): if len(metadata.isr) > 0: encoded += struct.pack('>%di' % len(metadata.isr), *metadata.isr) - return encoded def test_decode_metadata_response(self): - node_brokers = { - 0: BrokerMetadata(0, "brokers1.kafka.rdio.com", 1000), - 1: BrokerMetadata(1, "brokers1.kafka.rdio.com", 1001), - 3: BrokerMetadata(3, "brokers2.kafka.rdio.com", 1000) - } - - topic_partitions = { - "topic1": { - 0: PartitionMetadata("topic1", 0, 1, (0, 2), (2,)), - 1: PartitionMetadata("topic1", 1, 3, (0, 1), (0, 1)) - }, - "topic2": { - 0: PartitionMetadata("topic2", 0, 0, (), ()) - } - } - topic_errors = {"topic1": 0, "topic2": 1} - partition_errors = { - ("topic1", 0): 0, - ("topic1", 1): 1, - ("topic2", 0): 0 - } + node_brokers = [ + BrokerMetadata(0, "brokers1.kafka.rdio.com", 1000), + BrokerMetadata(1, "brokers1.kafka.rdio.com", 1001), + BrokerMetadata(3, "brokers2.kafka.rdio.com", 1000) + ] + + topic_partitions = [ + TopicMetadata("topic1", 0, [ + PartitionMetadata("topic1", 0, 1, (0, 2), (2,), 0), + PartitionMetadata("topic1", 1, 3, (0, 1), (0, 1), 1) + ]), + TopicMetadata("topic2", 1, [ + PartitionMetadata("topic2", 0, 0, (), (), 0), + ]), + ] encoded = self._create_encoded_metadata_response(node_brokers, - topic_partitions, - topic_errors, - partition_errors) + topic_partitions) decoded = KafkaProtocol.decode_metadata_response(encoded) self.assertEqual(decoded, (node_brokers, topic_partitions)) |