author     mrtheb <mrlabbe@gmail.com>  2014-02-09 13:44:47 -0500
committer  mrtheb <mrlabbe@gmail.com>  2014-02-09 13:44:47 -0500
commit     a2191e5be5d5fcd212582580c163f4533cca6c73 (patch)
tree       6d13a0928f572d99a47299b536cdf209dbd29a5f
parent     84de472a4d5b583ff3ed6cc6d92250a7c9291ceb (diff)
download   kafka-python-a2191e5be5d5fcd212582580c163f4533cca6c73.tar.gz
Support list (or comma-separated) of hosts (replaces host and port arguments)
-rw-r--r--  kafka/client.py            13
-rw-r--r--  kafka/conn.py               5
-rw-r--r--  test/test_integration.py   26
-rw-r--r--  test/test_unit.py          40
4 files changed, 56 insertions(+), 28 deletions(-)
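
In short: the KafkaClient constructor now takes a single hosts argument, either a list of 'host:port' strings or one comma-separated string, in place of the old host and port parameters. A brief before/after sketch (broker names are placeholders):

    from kafka import KafkaClient

    # before this commit: separate host and port arguments
    # client = KafkaClient('kafka01', 9092)

    # after: a list of 'host:port' strings ...
    client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092'])

    # ... or a single comma-separated string
    client = KafkaClient(hosts='kafka01:9092,kafka02:9092')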
diff --git a/kafka/client.py b/kafka/client.py
index 33c4419..96cc1df 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -10,7 +10,7 @@ from kafka.common import (ErrorMapping, TopicAndPartition,
BrokerResponseError, PartitionUnavailableError,
KafkaUnavailableError, KafkaRequestError)
-from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
+from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
log = logging.getLogger("kafka")
@@ -24,14 +24,15 @@ class KafkaClient(object):
# NOTE: The timeout given to the client should always be greater than the
# one passed to SimpleConsumer.get_message(), otherwise you can get a
# socket timeout.
- def __init__(self, host, port, client_id=CLIENT_ID,
+ def __init__(self, hosts, client_id=CLIENT_ID,
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
# We need one connection to bootstrap
self.client_id = client_id
self.timeout = timeout
- self.conns = { # (host, port) -> KafkaConnection
- (host, port): KafkaConnection(host, port, timeout=timeout)
- }
+ self.hosts = collect_hosts(hosts)
+
+ # create connections only when we need them
+ self.conns = {}
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
self.topic_partitions = {} # topic_id -> [0, 1, 2, ...]
@@ -46,7 +47,7 @@ class KafkaClient(object):
host_key = (host, port)
if host_key not in self.conns:
- self.conns[host_key] = KafkaConnection(host, port, self.bufsize)
+ self.conns[host_key] = KafkaConnection(host, port)
return self.conns[host_key]
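
To make the constructor change concrete, a small illustration of the state the client now holds (placeholder broker names; the (host, port) tuple form is what the unit tests further down assert, and ordering may vary since collect_hosts can randomize):

    client = KafkaClient(hosts='kafka01:9092,kafka02:9092')

    client.hosts  # [('kafka01', 9092), ('kafka02', 9092)], order may vary
    client.conns  # {} -- a KafkaConnection is opened lazily by _get_conn()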
diff --git a/kafka/conn.py b/kafka/conn.py
index de2d385..20f22dc 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -17,8 +17,11 @@ def collect_hosts(hosts, randomize=True):
randomize the returned list.
"""
+ if isinstance(hosts, str):
+ hosts = hosts.split(',')
+
result = []
- for host_port in hosts.split(","):
+ for host_port in hosts:
res = host_port.split(':')
host = res[0]
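
The hunk above cuts off before the port handling. A plausible completion of collect_hosts, assuming a default port of 9092 and a random shuffle behind the randomize flag (neither detail is visible in the hunk):

    from random import shuffle

    def collect_hosts(hosts, randomize=True):
        # Normalize a list or comma-separated string of 'host:port'
        # entries into (host, port) tuples, optionally shuffled.
        if isinstance(hosts, str):
            hosts = hosts.split(',')

        result = []
        for host_port in hosts:
            res = host_port.split(':')
            host = res[0]
            # assumption: fall back to the standard Kafka port
            port = int(res[1]) if len(res) > 1 else 9092
            result.append((host.strip(), port))

        if randomize:
            shuffle(result)  # spread bootstrap load across brokers

        return result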
diff --git a/test/test_integration.py b/test/test_integration.py
index 000f44a..3d6ccf6 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -33,7 +33,7 @@ def ensure_topic_creation(client, topic_name):
class KafkaTestCase(unittest.TestCase):
def setUp(self):
- self.topic = "%s-%s" % (self.id()[self.id().rindex(".")+1:], random_string(10))
+ self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))
ensure_topic_creation(self.client, self.topic)
@@ -578,7 +578,7 @@ class TestConsumer(KafkaTestCase):
cls.zk = ZookeeperFixture.instance()
cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
- cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port), bufsize=8192)
+ cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port))
@classmethod
def tearDownClass(cls): # noqa
@@ -800,7 +800,6 @@ class TestConsumer(KafkaTestCase):
self.assertEquals(all_messages[i], message.message)
self.assertEquals(i, 19)
-
# Produce 1 message that is too large (bigger than max fetch size)
big_message_size = MAX_FETCH_BUFFER_SIZE_BYTES + 10
big_message = create_message(random_string(big_message_size))
@@ -827,25 +826,26 @@ class TestConsumer(KafkaTestCase):
class TestFailover(KafkaTestCase):
- def setUp(self):
+ @classmethod
+ def setUpClass(cls): # noqa
zk_chroot = random_string(10)
replicas = 2
partitions = 2
# mini zookeeper, 2 kafka brokers
- self.zk = ZookeeperFixture.instance()
- kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
- self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
+ cls.zk = ZookeeperFixture.instance()
+ kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
+ cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
- hosts = ','.join(['%s:%d' % (b.host, b.port) for b in cls.brokers])
+ hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers]
cls.client = KafkaClient(hosts)
- super(TestFailover, self).setUp()
- def tearDown(self):
- self.client.close()
- for broker in self.brokers:
+ @classmethod
+ def tearDownClass(cls):
+ cls.client.close()
+ for broker in cls.brokers:
broker.close()
- self.zk.close()
+ cls.zk.close()
def test_switch_leader(self):
key, topic, partition = random_string(5), self.topic, 0
diff --git a/test/test_unit.py b/test/test_unit.py
index 4c78c1b..624fe39 100644
--- a/test/test_unit.py
+++ b/test/test_unit.py
@@ -5,11 +5,13 @@ import unittest
from mock import patch
+from kafka import KafkaClient
from kafka.common import (
ProduceRequest, FetchRequest, Message, ChecksumError,
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
OffsetAndMessage, BrokerMetadata, PartitionMetadata
)
+from kafka.common import KafkaUnavailableError
from kafka.codec import (
has_gzip, has_snappy, gzip_encode, gzip_decode,
snappy_encode, snappy_decode
@@ -384,6 +386,26 @@ class TestProtocol(unittest.TestCase):
class TestKafkaClient(unittest.TestCase):
+ def test_init_with_list(self):
+
+ with patch.object(KafkaClient, 'load_metadata_for_topics'):
+ client = KafkaClient(
+ hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])
+
+ self.assertItemsEqual(
+ [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
+ client.hosts)
+
+ def test_init_with_csv(self):
+
+ with patch.object(KafkaClient, 'load_metadata_for_topics'):
+ client = KafkaClient(
+ hosts='kafka01:9092,kafka02:9092,kafka03:9092')
+
+ self.assertItemsEqual(
+ [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
+ client.hosts)
+
def test_send_broker_unaware_request_fail(self):
'Tests that call fails when all hosts are unavailable'
@@ -402,14 +424,16 @@ class TestKafkaClient(unittest.TestCase):
return mocked_conns[(host, port)]
# patch to avoid making requests before we want it
- with patch.object(KafkaClient, '_load_metadata_for_topics'), \
+ with patch.object(KafkaClient, 'load_metadata_for_topics'), \
patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
- client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+ client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092'])
- resp = client._send_broker_unaware_request(1, 'fake request')
- self.assertIsNone(resp)
+ self.assertRaises(
+ KafkaUnavailableError,
+ client._send_broker_unaware_request,
+ 1, 'fake request')
for key, conn in mocked_conns.iteritems():
conn.send.assert_called_with(1, 'fake request')
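
The rewritten test pins down the new failure mode: rather than returning None, the client raises KafkaUnavailableError once every bootstrap host has been tried. A minimal sketch of how such a loop could look (only the raise-on-total-failure behavior is confirmed by the test; the loop structure is an assumption):

    import logging
    from kafka.common import KafkaUnavailableError

    log = logging.getLogger("kafka")

    def _send_broker_unaware_request(self, requestId, request):
        # Attempt the request against each bootstrap host in turn.
        for (host, port) in self.hosts:
            try:
                conn = self._get_conn(host, port)  # created lazily, then cached
                conn.send(requestId, request)
                return conn.recv(requestId)
            except Exception:
                log.warning("Could not send request [%r] to server %s:%i, "
                            "trying next server", request, host, port)

        # every host failed: surface one descriptive error instead of None
        raise KafkaUnavailableError("All servers failed to process request")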
@@ -434,7 +458,7 @@ class TestKafkaClient(unittest.TestCase):
return mocked_conns[(host, port)]
# patch to avoid making requests before we want it
- with patch.object(KafkaClient, '_load_metadata_for_topics'), \
+ with patch.object(KafkaClient, 'load_metadata_for_topics'), \
patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
@@ -444,7 +468,7 @@ class TestKafkaClient(unittest.TestCase):
self.assertEqual('valid response', resp)
mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
- @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+ @unittest.skip('requires disabling recursion on load_metadata_for_topics')
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_client_load_metadata(self, protocol, conn):
@@ -474,7 +498,7 @@ class TestKafkaClient(unittest.TestCase):
},
client.topics_to_brokers)
- @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+ @unittest.skip('requires disabling recursion on load_metadata_for_topics')
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_client_load_metadata_unassigned_partitions(self, protocol, conn):
@@ -513,7 +537,7 @@ class TestKafkaClient(unittest.TestCase):
},
client.topics_to_brokers)
- @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+ @unittest.skip('requires disabling recursion on load_metadata_for_topics')
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_client_load_metadata_noleader_partitions(self, protocol, conn):