diff options
author | Marc Labbe <mrlabbe@gmail.com> | 2013-11-14 09:26:49 -0500 |
---|---|---|
committer | Marc Labbe <mrlabbe@gmail.com> | 2013-11-14 09:26:49 -0500 |
commit | 0bdff4e833f73518a7219fca04dfbc3ed201b06e (patch) | |
tree | fe7b43720946aa9d86c92f4a8a5a9c9f4244a683 | |
parent | af3a57edb2c83c35b832e759b4c24ec72149841a (diff) | |
download | kafka-python-0bdff4e833f73518a7219fca04dfbc3ed201b06e.tar.gz |
Allow KafkaClient to take in a list of brokers for bootstrapping
-rw-r--r-- | README.md | 8 | ||||
-rw-r--r-- | example.py | 2 | ||||
-rw-r--r-- | kafka/NOTES.md | 2 | ||||
-rw-r--r-- | kafka/client.py | 35 | ||||
-rw-r--r-- | kafka/conn.py | 33 | ||||
-rw-r--r-- | setup.py | 5 | ||||
-rw-r--r-- | test/test_integration.py | 23 | ||||
-rw-r--r-- | test/test_unit.py | 187 |
8 files changed, 256 insertions, 39 deletions
@@ -30,7 +30,7 @@ from kafka.client import KafkaClient from kafka.consumer import SimpleConsumer from kafka.producer import SimpleProducer, KeyedProducer -kafka = KafkaClient("localhost", 9092) +kafka = KafkaClient("localhost:9092") # To send messages synchronously producer = SimpleProducer(kafka, "my-topic") @@ -81,7 +81,7 @@ from kafka.client import KafkaClient from kafka.producer import KeyedProducer from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner -kafka = KafkaClient("localhost", 9092) +kafka = KafkaClient("localhost:9092") # HashedPartitioner is default producer = KeyedProducer(kafka, "my-topic") @@ -96,7 +96,7 @@ producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner) from kafka.client import KafkaClient from kafka.consumer import MultiProcessConsumer -kafka = KafkaClient("localhost", 9092) +kafka = KafkaClient("localhost:9092") # This will split the number of partitions among two processes consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2) @@ -116,7 +116,7 @@ for message in consumer.get_messages(count=5, block=True, timeout=4): ```python from kafka.client import KafkaClient -kafka = KafkaClient("localhost", 9092) +kafka = KafkaClient("localhost:9092") req = ProduceRequest(topic="my-topic", partition=1, messages=[KafkaProdocol.encode_message("some message")]) resps = kafka.send_produce_request(payloads=[req], fail_on_error=True) @@ -14,7 +14,7 @@ def consume_example(client): print(message) def main(): - client = KafkaClient("localhost", 9092) + client = KafkaClient("localhost:9092") produce_example(client) consume_example(client) diff --git a/kafka/NOTES.md b/kafka/NOTES.md index 540cdad..8fb0f47 100644 --- a/kafka/NOTES.md +++ b/kafka/NOTES.md @@ -18,7 +18,7 @@ There are a few levels of abstraction: # Possible API - client = KafkaClient("localhost", 9092) + client = KafkaClient("localhost:9092") producer = KafkaProducer(client, "topic") producer.send_string("hello") diff --git a/kafka/client.py b/kafka/client.py index 71ededa..81eec7d 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -8,7 +8,7 @@ import time from kafka.common import ErrorMapping, TopicAndPartition from kafka.common import ConnectionError, FailedPayloadsException -from kafka.conn import KafkaConnection +from kafka.conn import collect_hosts, KafkaConnection from kafka.protocol import KafkaProtocol log = logging.getLogger("kafka") @@ -19,13 +19,15 @@ class KafkaClient(object): CLIENT_ID = "kafka-python" ID_GEN = count() - def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID): + def __init__(self, hosts, bufsize=4096, client_id=CLIENT_ID): # We need one connection to bootstrap self.bufsize = bufsize self.client_id = client_id - self.conns = { # (host, port) -> KafkaConnection - (host, port): KafkaConnection(host, port, bufsize) - } + + self.hosts = collect_hosts(hosts) + + # create connections only when we need them + self.conns = {} self.brokers = {} # broker_id -> BrokerMetadata self.topics_to_brokers = {} # topic_id -> broker_id self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...] @@ -35,15 +37,19 @@ class KafkaClient(object): # Private API # ################## + def _get_conn(self, host, port): + "Get or create a connection to a broker using host and port" + + host_key = (host, port) + if host_key not in self.conns: + self.conns[host_key] = KafkaConnection(host, port, self.bufsize) + + return self.conns[host_key] + def _get_conn_for_broker(self, broker): - """ - Get or create a connection to a broker - """ - if (broker.host, broker.port) not in self.conns: - self.conns[(broker.host, broker.port)] = \ - KafkaConnection(broker.host, broker.port, self.bufsize) + "Get or create a connection to a broker" - return self.conns[(broker.host, broker.port)] + return self._get_conn(broker.host, broker.port) def _get_leader_for_partition(self, topic, partition): key = TopicAndPartition(topic, partition) @@ -108,7 +114,8 @@ class KafkaClient(object): Attempt to send a broker-agnostic request to one of the available brokers. Keep trying until you succeed. """ - for conn in self.conns.values(): + for (host, port) in self.hosts: + conn = self._get_conn(host, port) try: conn.send(requestId, request) response = conn.recv(requestId) @@ -174,7 +181,7 @@ class KafkaClient(object): except ConnectionError, e: # ignore BufferUnderflow for now log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e)) failed_payloads += payloads - self.topics_to_brokers = {} # reset metadata + self.topics_to_brokers = {} # reset metadata continue for response in decoder_fn(response): diff --git a/kafka/conn.py b/kafka/conn.py index 14aebc6..614b1bb 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -2,6 +2,7 @@ import copy import logging import socket import struct +from random import shuffle from threading import local from kafka.common import BufferUnderflowError @@ -10,6 +11,26 @@ from kafka.common import ConnectionError log = logging.getLogger("kafka") +def collect_hosts(hosts, randomize=True): + """ + Collects a comma-separated set of hosts (host:port) and optionnaly + randomize the returned list. + """ + + result = [] + for host_port in hosts.split(","): + + res = host_port.split(':') + host = res[0] + port = int(res[1]) if len(res) > 1 else 9092 + result.append((host.strip(), port)) + + if randomize: + shuffle(result) + + return result + + class KafkaConnection(local): """ A socket connection to a single Kafka broker @@ -19,14 +40,14 @@ class KafkaConnection(local): we can do something in here to facilitate multiplexed requests/responses since the Kafka API includes a correlation id. """ - def __init__(self, host, port, bufsize=4096): + def __init__(self, host, port, bufsize=4096, timeout=10): super(KafkaConnection, self).__init__() self.host = host self.port = port self.bufsize = bufsize - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.connect((host, port)) - self._sock.settimeout(10) + self.timeout = timeout + + self._sock = socket.create_connection((host, port), timeout=timeout) self._dirty = False def __str__(self): @@ -125,7 +146,5 @@ class KafkaConnection(local): Re-initialize the socket connection """ self.close() - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.connect((self.host, self.port)) - self._sock.settimeout(10) + self._sock = socket.create_connection((self.host, self.port), timeout=self.timeout) self._dirty = False @@ -1,11 +1,12 @@ -import os.path import sys from setuptools import setup, Command class Tox(Command): + user_options = [] + def initialize_options(self): pass @@ -21,7 +22,7 @@ setup( name="kafka-python", version="0.8.1-1", - install_requires=["distribute", "tox"], + install_requires=["distribute", "tox", "mock"], tests_require=["tox"], cmdclass={"test": Tox}, diff --git a/test/test_integration.py b/test/test_integration.py index a10dae2..1f37ebf 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -16,7 +16,7 @@ class TestKafkaClient(unittest.TestCase): def setUpClass(cls): # noqa cls.zk = ZookeeperFixture.instance() cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) - cls.client = KafkaClient(cls.server.host, cls.server.port) + cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port)) @classmethod def tearDownClass(cls): # noqa @@ -554,7 +554,7 @@ class TestConsumer(unittest.TestCase): cls.zk = ZookeeperFixture.instance() cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port) - cls.client = KafkaClient(cls.server2.host, cls.server2.port, bufsize=8192) + cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port), bufsize=8192) @classmethod def tearDownClass(cls): # noqa @@ -770,20 +770,23 @@ class TestConsumer(unittest.TestCase): self.assertEquals(all_messages[i], message.message) self.assertEquals(i, 19) + class TestFailover(unittest.TestCase): @classmethod def setUpClass(cls): zk_chroot = random_string(10) - replicas = 2 + replicas = 2 partitions = 2 # mini zookeeper, 2 kafka brokers - cls.zk = ZookeeperFixture.instance() - kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions] + cls.zk = ZookeeperFixture.instance() + kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions] cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] - cls.client = KafkaClient(cls.brokers[0].host, cls.brokers[0].port) + + hosts = ','.join(['%s:%d' % (b.host, b.port) for b in cls.brokers]) + cls.client = KafkaClient(hosts) @classmethod def tearDownClass(cls): @@ -858,17 +861,19 @@ class TestFailover(unittest.TestCase): resp = producer.send_messages(random_string(10)) if len(resp) > 0: self.assertEquals(resp[0].error, 0) - time.sleep(1) # give it some time + time.sleep(1) # give it some time def _kill_leader(self, topic, partition): leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)] broker = self.brokers[leader.nodeId] broker.close() - time.sleep(1) # give it some time + time.sleep(1) # give it some time return broker def _count_messages(self, group, topic): - client = KafkaClient(self.brokers[0].host, self.brokers[0].port) + + hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port) + client = KafkaClient(hosts) consumer = SimpleConsumer(client, group, topic, auto_commit=False) all_messages = [] for message in consumer: diff --git a/test/test_unit.py b/test/test_unit.py index 3f3af66..4ea9442 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -3,13 +3,18 @@ import random import struct import unittest +from mock import patch + from kafka.client import KafkaClient -from kafka.common import ProduceRequest, FetchRequest from kafka.codec import ( has_gzip, has_snappy, gzip_encode, gzip_decode, snappy_encode, snappy_decode ) +from kafka.common import ( + ProduceRequest, FetchRequest, + BrokerMetadata, PartitionMetadata, TopicAndPartition +) ITERATIONS = 1000 STRLEN = 100 @@ -217,5 +222,185 @@ class TestRequests(unittest.TestCase): self.assertEquals(enc, expect) +class TestKafkaClient(unittest.TestCase): + + def test_send_broker_unaware_request_fail(self): + 'Tests that call fails when all hosts are unavailable' + + from mock import MagicMock + + mocked_conns = { + ('kafka01', 9092): MagicMock(), + ('kafka02', 9092): MagicMock() + } + # inject conns + mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)") + mocked_conns[('kafka02', 9092)].send.side_effect = RuntimeError("Kafka02 went away (unittest)") + + def mock_get_conn(host, port): + print 'mock_get_conn: %s:%d=%s' % (host, port, mocked_conns[(host, port)]) + return mocked_conns[(host, port)] + + # patch to avoid making requests before we want it + with patch.object(KafkaClient, '_load_metadata_for_topics'), \ + patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn): + + client = KafkaClient(hosts='kafka01:9092,kafka02:9092') + + resp = client._send_broker_unaware_request(1, 'fake request') + + self.assertIsNone(resp) + + for key, conn in mocked_conns.iteritems(): + conn.send.assert_called_with(1, 'fake request') + + def test_send_broker_unaware_request(self): + 'Tests that call fails when one of the host is available' + + from mock import MagicMock + + mocked_conns = { + ('kafka01', 9092): MagicMock(), + ('kafka02', 9092): MagicMock(), + ('kafka03', 9092): MagicMock() + } + # inject conns + mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)") + mocked_conns[('kafka02', 9092)].recv.return_value = 'valid response' + mocked_conns[('kafka03', 9092)].send.side_effect = RuntimeError("kafka03 went away (unittest)") + + def mock_get_conn(host, port): + print 'mock_get_conn: %s:%d=%s' % (host, port, mocked_conns[(host, port)]) + return mocked_conns[(host, port)] + + # patch to avoid making requests before we want it + with patch.object(KafkaClient, '_load_metadata_for_topics'), \ + patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn): + + client = KafkaClient(hosts='kafka01:9092,kafka02:9092') + + resp = client._send_broker_unaware_request(1, 'fake request') + + self.assertEqual('valid response', resp) + mocked_conns[('kafka02', 9092)].recv.assert_called_with(1) + + @unittest.skip('requires disabling recursion on _load_metadata_for_topics') + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') + def test_client_load_metadata(self, protocol, conn): + + conn.recv.return_value = 'response' # anything but None + + brokers = {} + brokers[0] = BrokerMetadata(1, 'broker_1', 4567) + brokers[1] = BrokerMetadata(2, 'broker_2', 5678) + + topics = {} + topics['topic_1'] = { + 0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2]) + } + topics['topic_2'] = { + 0: PartitionMetadata('topic_2', 0, 0, [0, 1], [0, 1]), + 1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0]) + } + protocol.decode_metadata_response.return_value = (brokers, topics) + + client = KafkaClient(hosts='broker_1:4567') + self.assertItemsEqual( + { + TopicAndPartition('topic_1', 0): brokers[0], + TopicAndPartition('topic_2', 0): brokers[0], + TopicAndPartition('topic_2', 1): brokers[1] + }, + client.topics_to_brokers) + + @unittest.skip('requires disabling recursion on _load_metadata_for_topics') + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') + def test_client_load_metadata_unassigned_partitions(self, protocol, conn): + + conn.recv.return_value = 'response' # anything but None + + brokers = {} + brokers[0] = BrokerMetadata(0, 'broker_1', 4567) + brokers[1] = BrokerMetadata(1, 'broker_2', 5678) + + topics = {} + topics['topic_1'] = { + 0: PartitionMetadata('topic_1', 0, -1, [], []) + } + protocol.decode_metadata_response.return_value = (brokers, topics) + + client = KafkaClient(hosts='broker_1:4567') + + self.assertItemsEqual({}, client.topics_to_brokers) + self.assertRaises( + Exception, + client._get_leader_for_partition, + 'topic_1', 0) + + # calling _get_leader_for_partition (from any broker aware request) + # will try loading metadata again for the same topic + topics['topic_1'] = { + 0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1]) + } + leader = client._get_leader_for_partition('topic_1', 0) + + self.assertEqual(brokers[0], leader) + self.assertItemsEqual( + { + TopicAndPartition('topic_1', 0): brokers[0], + }, + client.topics_to_brokers) + + @unittest.skip('requires disabling recursion on _load_metadata_for_topics') + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') + def test_client_load_metadata_noleader_partitions(self, protocol, conn): + + conn.recv.return_value = 'response' # anything but None + + brokers = {} + brokers[0] = BrokerMetadata(0, 'broker_1', 4567) + brokers[1] = BrokerMetadata(1, 'broker_2', 5678) + + topics = {} + topics['topic_1'] = { + 0: PartitionMetadata('topic_1', 0, -1, [], []) + } + topics['topic_2'] = { + 0: PartitionMetadata('topic_2', 0, 0, [0, 1], []), + 1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0]) + } + protocol.decode_metadata_response.return_value = (brokers, topics) + + client = KafkaClient(hosts='broker_1:4567') + self.assertItemsEqual( + { + TopicAndPartition('topic_2', 0): brokers[0], + TopicAndPartition('topic_2', 1): brokers[1] + }, + client.topics_to_brokers) + self.assertRaises( + Exception, + client._get_leader_for_partition, + 'topic_1', 0) + + # calling _get_leader_for_partition (from any broker aware request) + # will try loading metadata again for the same topic + topics['topic_1'] = { + 0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1]) + } + leader = client._get_leader_for_partition('topic_1', 0) + + self.assertEqual(brokers[0], leader) + self.assertItemsEqual( + { + TopicAndPartition('topic_1', 0): brokers[0], + TopicAndPartition('topic_2', 0): brokers[0], + TopicAndPartition('topic_2', 1): brokers[1] + }, + client.topics_to_brokers) + if __name__ == '__main__': unittest.main() |