-rw-r--r--  README.md                   8
-rw-r--r--  example.py                  2
-rw-r--r--  kafka/NOTES.md              2
-rw-r--r--  kafka/client.py            35
-rw-r--r--  kafka/conn.py              33
-rw-r--r--  setup.py                    5
-rw-r--r--  test/test_integration.py   23
-rw-r--r--  test/test_unit.py         187
8 files changed, 256 insertions, 39 deletions
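This patch replaces the `KafkaClient(host, port)` constructor with a single `hosts` string that can list several bootstrap brokers. A minimal usage sketch (it needs a reachable broker to actually run; the multi-broker hostnames are placeholders, not part of this patch):

```python
from kafka.client import KafkaClient

# Old constructor: KafkaClient("localhost", 9092)
# New constructor takes a bootstrap string; the port defaults to 9092 when omitted.
client = KafkaClient("localhost:9092")

# Several bootstrap brokers, comma-separated; connections are opened lazily,
# only when a broker is actually contacted.
client = KafkaClient("kafka01:9092,kafka02:9092,kafka03")
```
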
diff --git a/README.md b/README.md
index c9f782d..edf3931 100644
--- a/README.md
+++ b/README.md
@@ -30,7 +30,7 @@ from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer, KeyedProducer
-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
# To send messages synchronously
producer = SimpleProducer(kafka, "my-topic")
@@ -81,7 +81,7 @@ from kafka.client import KafkaClient
from kafka.producer import KeyedProducer
from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
# HashedPartitioner is default
producer = KeyedProducer(kafka, "my-topic")
@@ -96,7 +96,7 @@ producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
from kafka.client import KafkaClient
from kafka.consumer import MultiProcessConsumer
-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
# This will split the number of partitions among two processes
consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)
@@ -116,7 +116,7 @@ for message in consumer.get_messages(count=5, block=True, timeout=4):
```python
from kafka.client import KafkaClient
-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
req = ProduceRequest(topic="my-topic", partition=1,
messages=[KafkaProtocol.encode_message("some message")])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
diff --git a/example.py b/example.py
index 3a2dc92..0cf5583 100644
--- a/example.py
+++ b/example.py
@@ -14,7 +14,7 @@ def consume_example(client):
print(message)
def main():
- client = KafkaClient("localhost", 9092)
+ client = KafkaClient("localhost:9092")
produce_example(client)
consume_example(client)
diff --git a/kafka/NOTES.md b/kafka/NOTES.md
index 540cdad..8fb0f47 100644
--- a/kafka/NOTES.md
+++ b/kafka/NOTES.md
@@ -18,7 +18,7 @@ There are a few levels of abstraction:
# Possible API
- client = KafkaClient("localhost", 9092)
+ client = KafkaClient("localhost:9092")
producer = KafkaProducer(client, "topic")
producer.send_string("hello")
diff --git a/kafka/client.py b/kafka/client.py
index 71ededa..81eec7d 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -8,7 +8,7 @@ import time
from kafka.common import ErrorMapping, TopicAndPartition
from kafka.common import ConnectionError, FailedPayloadsException
-from kafka.conn import KafkaConnection
+from kafka.conn import collect_hosts, KafkaConnection
from kafka.protocol import KafkaProtocol
log = logging.getLogger("kafka")
@@ -19,13 +19,15 @@ class KafkaClient(object):
CLIENT_ID = "kafka-python"
ID_GEN = count()
- def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID):
+ def __init__(self, hosts, bufsize=4096, client_id=CLIENT_ID):
# We need one connection to bootstrap
self.bufsize = bufsize
self.client_id = client_id
- self.conns = { # (host, port) -> KafkaConnection
- (host, port): KafkaConnection(host, port, bufsize)
- }
+
+ self.hosts = collect_hosts(hosts)
+
+ # create connections only when we need them
+ self.conns = {}
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...]
@@ -35,15 +37,19 @@ class KafkaClient(object):
# Private API #
##################
+ def _get_conn(self, host, port):
+ "Get or create a connection to a broker using host and port"
+
+ host_key = (host, port)
+ if host_key not in self.conns:
+ self.conns[host_key] = KafkaConnection(host, port, self.bufsize)
+
+ return self.conns[host_key]
+
def _get_conn_for_broker(self, broker):
- """
- Get or create a connection to a broker
- """
- if (broker.host, broker.port) not in self.conns:
- self.conns[(broker.host, broker.port)] = \
- KafkaConnection(broker.host, broker.port, self.bufsize)
+ "Get or create a connection to a broker"
- return self.conns[(broker.host, broker.port)]
+ return self._get_conn(broker.host, broker.port)
def _get_leader_for_partition(self, topic, partition):
key = TopicAndPartition(topic, partition)
@@ -108,7 +114,8 @@ class KafkaClient(object):
Attempt to send a broker-agnostic request to one of the available
brokers. Keep trying until you succeed.
"""
- for conn in self.conns.values():
+ for (host, port) in self.hosts:
+ conn = self._get_conn(host, port)
try:
conn.send(requestId, request)
response = conn.recv(requestId)
@@ -174,7 +181,7 @@ class KafkaClient(object):
except ConnectionError, e: # ignore BufferUnderflow for now
log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
failed_payloads += payloads
- self.topics_to_brokers = {} # reset metadata
+ self.topics_to_brokers = {} # reset metadata
continue
for response in decoder_fn(response):
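
The client no longer opens a socket per host at construction time: `_get_conn` creates and caches a `KafkaConnection` on first use, and `_send_broker_unaware_request` walks `self.hosts` until one broker answers. A self-contained sketch of both patterns, with a stub standing in for the real connection class (everything below is illustrative, not code from the patch):

```python
class StubConnection(object):
    """Stand-in for kafka.conn.KafkaConnection, used only for illustration."""
    def __init__(self, host, port, bufsize=4096):
        self.host, self.port, self.bufsize = host, port, bufsize

    def send(self, request_id, request):
        pass

    def recv(self, request_id):
        return "response from %s:%d" % (self.host, self.port)


def get_conn(conns, host, port, bufsize=4096):
    """Get-or-create pattern used by KafkaClient._get_conn."""
    host_key = (host, port)
    if host_key not in conns:
        conns[host_key] = StubConnection(host, port, bufsize)
    return conns[host_key]


def send_broker_unaware(hosts, conns, request_id, request):
    """Try each bootstrap host in turn; return the first response, else None."""
    for host, port in hosts:
        conn = get_conn(conns, host, port)
        try:
            conn.send(request_id, request)
            return conn.recv(request_id)
        except Exception:
            continue  # the real client logs a warning and tries the next host
    return None


conns = {}
print(send_broker_unaware([("kafka01", 9092)], conns, 1, "fake request"))
```
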
diff --git a/kafka/conn.py b/kafka/conn.py
index 14aebc6..614b1bb 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -2,6 +2,7 @@ import copy
import logging
import socket
import struct
+from random import shuffle
from threading import local
from kafka.common import BufferUnderflowError
@@ -10,6 +11,26 @@ from kafka.common import ConnectionError
log = logging.getLogger("kafka")
+def collect_hosts(hosts, randomize=True):
+ """
+ Collects a comma-separated string of hosts (host:port) and optionally
+ randomizes the returned list.
+ """
+
+ result = []
+ for host_port in hosts.split(","):
+
+ res = host_port.split(':')
+ host = res[0]
+ port = int(res[1]) if len(res) > 1 else 9092
+ result.append((host.strip(), port))
+
+ if randomize:
+ shuffle(result)
+
+ return result
+
+
class KafkaConnection(local):
"""
A socket connection to a single Kafka broker
@@ -19,14 +40,14 @@ class KafkaConnection(local):
we can do something in here to facilitate multiplexed requests/responses
since the Kafka API includes a correlation id.
"""
- def __init__(self, host, port, bufsize=4096):
+ def __init__(self, host, port, bufsize=4096, timeout=10):
super(KafkaConnection, self).__init__()
self.host = host
self.port = port
self.bufsize = bufsize
- self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._sock.connect((host, port))
- self._sock.settimeout(10)
+ self.timeout = timeout
+
+ self._sock = socket.create_connection((host, port), timeout=timeout)
self._dirty = False
def __str__(self):
@@ -125,7 +146,5 @@ class KafkaConnection(local):
Re-initialize the socket connection
"""
self.close()
- self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._sock.connect((self.host, self.port))
- self._sock.settimeout(10)
+ self._sock = socket.create_connection((self.host, self.port), timeout=self.timeout)
self._dirty = False
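
`collect_hosts` turns the bootstrap string into `(host, port)` tuples, defaulting the port to 9092 and shuffling the list unless `randomize=False`; the connection itself now goes through `socket.create_connection` with a configurable timeout (default 10 seconds). A quick, deterministic usage sketch:

```python
from kafka.conn import collect_hosts

hosts = collect_hosts("kafka01:9092, kafka02:9093,kafka03", randomize=False)
assert hosts == [("kafka01", 9092), ("kafka02", 9093), ("kafka03", 9092)]
```
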
diff --git a/setup.py b/setup.py
index a246914..fedf139 100644
--- a/setup.py
+++ b/setup.py
@@ -1,11 +1,12 @@
-import os.path
import sys
from setuptools import setup, Command
class Tox(Command):
+
user_options = []
+
def initialize_options(self):
pass
@@ -21,7 +22,7 @@ setup(
name="kafka-python",
version="0.8.1-1",
- install_requires=["distribute", "tox"],
+ install_requires=["distribute", "tox", "mock"],
tests_require=["tox"],
cmdclass={"test": Tox},
diff --git a/test/test_integration.py b/test/test_integration.py
index a10dae2..1f37ebf 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -16,7 +16,7 @@ class TestKafkaClient(unittest.TestCase):
def setUpClass(cls): # noqa
cls.zk = ZookeeperFixture.instance()
cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
- cls.client = KafkaClient(cls.server.host, cls.server.port)
+ cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port))
@classmethod
def tearDownClass(cls): # noqa
@@ -554,7 +554,7 @@ class TestConsumer(unittest.TestCase):
cls.zk = ZookeeperFixture.instance()
cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
- cls.client = KafkaClient(cls.server2.host, cls.server2.port, bufsize=8192)
+ cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port), bufsize=8192)
@classmethod
def tearDownClass(cls): # noqa
@@ -770,20 +770,23 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(all_messages[i], message.message)
self.assertEquals(i, 19)
+
class TestFailover(unittest.TestCase):
@classmethod
def setUpClass(cls):
zk_chroot = random_string(10)
- replicas = 2
+ replicas = 2
partitions = 2
# mini zookeeper, 2 kafka brokers
- cls.zk = ZookeeperFixture.instance()
- kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
+ cls.zk = ZookeeperFixture.instance()
+ kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
- cls.client = KafkaClient(cls.brokers[0].host, cls.brokers[0].port)
+
+ hosts = ','.join(['%s:%d' % (b.host, b.port) for b in cls.brokers])
+ cls.client = KafkaClient(hosts)
@classmethod
def tearDownClass(cls):
@@ -858,17 +861,19 @@ class TestFailover(unittest.TestCase):
resp = producer.send_messages(random_string(10))
if len(resp) > 0:
self.assertEquals(resp[0].error, 0)
- time.sleep(1) # give it some time
+ time.sleep(1) # give it some time
def _kill_leader(self, topic, partition):
leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
broker = self.brokers[leader.nodeId]
broker.close()
- time.sleep(1) # give it some time
+ time.sleep(1) # give it some time
return broker
def _count_messages(self, group, topic):
- client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+
+ hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port)
+ client = KafkaClient(hosts)
consumer = SimpleConsumer(client, group, topic, auto_commit=False)
all_messages = []
for message in consumer:
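
The integration fixtures now build the bootstrap string by joining each broker's `host:port`; the same pattern works for any list of `(host, port)` pairs (the broker list below is illustrative, not from the fixtures):

```python
brokers = [("kafka01", 9092), ("kafka02", 9093)]

# Join (host, port) pairs into the comma-separated form KafkaClient expects
hosts = ",".join("%s:%d" % (host, port) for host, port in brokers)
assert hosts == "kafka01:9092,kafka02:9093"
```
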
diff --git a/test/test_unit.py b/test/test_unit.py
index 3f3af66..4ea9442 100644
--- a/test/test_unit.py
+++ b/test/test_unit.py
@@ -3,13 +3,18 @@ import random
import struct
import unittest
+from mock import patch
+
from kafka.client import KafkaClient
-from kafka.common import ProduceRequest, FetchRequest
from kafka.codec import (
has_gzip, has_snappy,
gzip_encode, gzip_decode,
snappy_encode, snappy_decode
)
+from kafka.common import (
+ ProduceRequest, FetchRequest,
+ BrokerMetadata, PartitionMetadata, TopicAndPartition
+)
ITERATIONS = 1000
STRLEN = 100
@@ -217,5 +222,185 @@ class TestRequests(unittest.TestCase):
self.assertEquals(enc, expect)
+class TestKafkaClient(unittest.TestCase):
+
+ def test_send_broker_unaware_request_fail(self):
+ 'Tests that the call fails when all hosts are unavailable'
+
+ from mock import MagicMock
+
+ mocked_conns = {
+ ('kafka01', 9092): MagicMock(),
+ ('kafka02', 9092): MagicMock()
+ }
+ # inject conns
+ mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
+ mocked_conns[('kafka02', 9092)].send.side_effect = RuntimeError("kafka02 went away (unittest)")
+
+ def mock_get_conn(host, port):
+ print 'mock_get_conn: %s:%d=%s' % (host, port, mocked_conns[(host, port)])
+ return mocked_conns[(host, port)]
+
+ # patch to avoid making requests before we want it
+ with patch.object(KafkaClient, '_load_metadata_for_topics'), \
+ patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+
+ client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+
+ resp = client._send_broker_unaware_request(1, 'fake request')
+
+ self.assertIsNone(resp)
+
+ for key, conn in mocked_conns.iteritems():
+ conn.send.assert_called_with(1, 'fake request')
+
+ def test_send_broker_unaware_request(self):
+ 'Tests that the call succeeds when at least one host is available'
+
+ from mock import MagicMock
+
+ mocked_conns = {
+ ('kafka01', 9092): MagicMock(),
+ ('kafka02', 9092): MagicMock(),
+ ('kafka03', 9092): MagicMock()
+ }
+ # inject conns
+ mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
+ mocked_conns[('kafka02', 9092)].recv.return_value = 'valid response'
+ mocked_conns[('kafka03', 9092)].send.side_effect = RuntimeError("kafka03 went away (unittest)")
+
+ def mock_get_conn(host, port):
+ print 'mock_get_conn: %s:%d=%s' % (host, port, mocked_conns[(host, port)])
+ return mocked_conns[(host, port)]
+
+ # patch to avoid making requests before we want it
+ with patch.object(KafkaClient, '_load_metadata_for_topics'), \
+ patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+
+ client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+
+ resp = client._send_broker_unaware_request(1, 'fake request')
+
+ self.assertEqual('valid response', resp)
+ mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
+
+ @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+ @patch('kafka.client.KafkaConnection')
+ @patch('kafka.client.KafkaProtocol')
+ def test_client_load_metadata(self, protocol, conn):
+
+ conn.recv.return_value = 'response' # anything but None
+
+ brokers = {}
+ brokers[0] = BrokerMetadata(1, 'broker_1', 4567)
+ brokers[1] = BrokerMetadata(2, 'broker_2', 5678)
+
+ topics = {}
+ topics['topic_1'] = {
+ 0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2])
+ }
+ topics['topic_2'] = {
+ 0: PartitionMetadata('topic_2', 0, 0, [0, 1], [0, 1]),
+ 1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0])
+ }
+ protocol.decode_metadata_response.return_value = (brokers, topics)
+
+ client = KafkaClient(hosts='broker_1:4567')
+ self.assertItemsEqual(
+ {
+ TopicAndPartition('topic_1', 0): brokers[0],
+ TopicAndPartition('topic_2', 0): brokers[0],
+ TopicAndPartition('topic_2', 1): brokers[1]
+ },
+ client.topics_to_brokers)
+
+ @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+ @patch('kafka.client.KafkaConnection')
+ @patch('kafka.client.KafkaProtocol')
+ def test_client_load_metadata_unassigned_partitions(self, protocol, conn):
+
+ conn.recv.return_value = 'response' # anything but None
+
+ brokers = {}
+ brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
+ brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+
+ topics = {}
+ topics['topic_1'] = {
+ 0: PartitionMetadata('topic_1', 0, -1, [], [])
+ }
+ protocol.decode_metadata_response.return_value = (brokers, topics)
+
+ client = KafkaClient(hosts='broker_1:4567')
+
+ self.assertItemsEqual({}, client.topics_to_brokers)
+ self.assertRaises(
+ Exception,
+ client._get_leader_for_partition,
+ 'topic_1', 0)
+
+ # calling _get_leader_for_partition (from any broker aware request)
+ # will try loading metadata again for the same topic
+ topics['topic_1'] = {
+ 0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1])
+ }
+ leader = client._get_leader_for_partition('topic_1', 0)
+
+ self.assertEqual(brokers[0], leader)
+ self.assertItemsEqual(
+ {
+ TopicAndPartition('topic_1', 0): brokers[0],
+ },
+ client.topics_to_brokers)
+
+ @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+ @patch('kafka.client.KafkaConnection')
+ @patch('kafka.client.KafkaProtocol')
+ def test_client_load_metadata_noleader_partitions(self, protocol, conn):
+
+ conn.recv.return_value = 'response' # anything but None
+
+ brokers = {}
+ brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
+ brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+
+ topics = {}
+ topics['topic_1'] = {
+ 0: PartitionMetadata('topic_1', 0, -1, [], [])
+ }
+ topics['topic_2'] = {
+ 0: PartitionMetadata('topic_2', 0, 0, [0, 1], []),
+ 1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0])
+ }
+ protocol.decode_metadata_response.return_value = (brokers, topics)
+
+ client = KafkaClient(hosts='broker_1:4567')
+ self.assertItemsEqual(
+ {
+ TopicAndPartition('topic_2', 0): brokers[0],
+ TopicAndPartition('topic_2', 1): brokers[1]
+ },
+ client.topics_to_brokers)
+ self.assertRaises(
+ Exception,
+ client._get_leader_for_partition,
+ 'topic_1', 0)
+
+ # calling _get_leader_for_partition (from any broker aware request)
+ # will try loading metadata again for the same topic
+ topics['topic_1'] = {
+ 0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1])
+ }
+ leader = client._get_leader_for_partition('topic_1', 0)
+
+ self.assertEqual(brokers[0], leader)
+ self.assertItemsEqual(
+ {
+ TopicAndPartition('topic_1', 0): brokers[0],
+ TopicAndPartition('topic_2', 0): brokers[0],
+ TopicAndPartition('topic_2', 1): brokers[1]
+ },
+ client.topics_to_brokers)
+
if __name__ == '__main__':
unittest.main()