diff options
-rw-r--r-- | README.md | 8 | ||||
-rw-r--r-- | example.py | 2 | ||||
-rw-r--r-- | kafka/NOTES.md | 2 | ||||
-rw-r--r-- | kafka/__init__.py | 2 | ||||
-rw-r--r-- | kafka/client.py | 29 | ||||
-rw-r--r-- | kafka/codec.py | 98 | ||||
-rw-r--r-- | kafka/conn.py | 28 | ||||
-rw-r--r-- | setup.py | 1 | ||||
-rw-r--r-- | test/test_integration.py | 34 | ||||
-rw-r--r-- | test/test_unit.py | 151 |
10 files changed, 303 insertions, 52 deletions
@@ -29,7 +29,7 @@ from kafka.client import KafkaClient from kafka.consumer import SimpleConsumer from kafka.producer import SimpleProducer, KeyedProducer -kafka = KafkaClient("localhost", 9092) +kafka = KafkaClient("localhost:9092") # To send messages synchronously producer = SimpleProducer(kafka) @@ -80,7 +80,7 @@ from kafka.client import KafkaClient from kafka.producer import KeyedProducer from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner -kafka = KafkaClient("localhost", 9092) +kafka = KafkaClient("localhost:9092") # HashedPartitioner is default producer = KeyedProducer(kafka) @@ -95,7 +95,7 @@ producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner) from kafka.client import KafkaClient from kafka.consumer import MultiProcessConsumer -kafka = KafkaClient("localhost", 9092) +kafka = KafkaClient("localhost:9092") # This will split the number of partitions among two processes consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2) @@ -115,7 +115,7 @@ for message in consumer.get_messages(count=5, block=True, timeout=4): ```python from kafka.client import KafkaClient -kafka = KafkaClient("localhost", 9092) +kafka = KafkaClient("localhost:9092") req = ProduceRequest(topic="my-topic", partition=1, messages=[KafkaProdocol.encode_message("some message")]) resps = kafka.send_produce_request(payloads=[req], fail_on_error=True) @@ -14,7 +14,7 @@ def consume_example(client): print(message) def main(): - client = KafkaClient("localhost", 9092) + client = KafkaClient("localhost:9092") produce_example(client) consume_example(client) diff --git a/kafka/NOTES.md b/kafka/NOTES.md index 540cdad..8fb0f47 100644 --- a/kafka/NOTES.md +++ b/kafka/NOTES.md @@ -18,7 +18,7 @@ There are a few levels of abstraction: # Possible API - client = KafkaClient("localhost", 9092) + client = KafkaClient("localhost:9092") producer = KafkaProducer(client, "topic") producer.send_string("hello") diff --git a/kafka/__init__.py b/kafka/__init__.py index 73aa760..e446f58 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -1,5 +1,5 @@ __title__ = 'kafka' -__version__ = '0.2-alpha' +__version__ = '0.9.0' __author__ = 'David Arthur' __license__ = 'Apache License 2.0' __copyright__ = 'Copyright 2012, David Arthur under Apache License, v2.0' diff --git a/kafka/client.py b/kafka/client.py index c3606e4..ab0eb8d 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -11,7 +11,7 @@ from kafka.common import (ErrorMapping, TopicAndPartition, LeaderUnavailableError, KafkaUnavailableError) -from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS +from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS from kafka.protocol import KafkaProtocol log = logging.getLogger("kafka") @@ -25,14 +25,15 @@ class KafkaClient(object): # NOTE: The timeout given to the client should always be greater than the # one passed to SimpleConsumer.get_message(), otherwise you can get a # socket timeout. - def __init__(self, host, port, client_id=CLIENT_ID, + def __init__(self, hosts, client_id=CLIENT_ID, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): # We need one connection to bootstrap self.client_id = client_id self.timeout = timeout - self.conns = { # (host, port) -> KafkaConnection - (host, port): KafkaConnection(host, port, timeout=timeout) - } + self.hosts = collect_hosts(hosts) + + # create connections only when we need them + self.conns = {} self.brokers = {} # broker_id -> BrokerMetadata self.topics_to_brokers = {} # topic_id -> broker_id self.topic_partitions = {} # topic_id -> [0, 1, 2, ...] @@ -42,6 +43,15 @@ class KafkaClient(object): # Private API # ################## + def _get_conn(self, host, port): + "Get or create a connection to a broker using host and port" + + host_key = (host, port) + if host_key not in self.conns: + self.conns[host_key] = KafkaConnection(host, port) + + return self.conns[host_key] + def _get_conn_for_broker(self, broker): """ Get or create a connection to a broker @@ -50,7 +60,7 @@ class KafkaClient(object): self.conns[(broker.host, broker.port)] = \ KafkaConnection(broker.host, broker.port, timeout=self.timeout) - return self.conns[(broker.host, broker.port)] + return self._get_conn(broker.host, broker.port) def _get_leader_for_partition(self, topic, partition): """ @@ -83,14 +93,15 @@ class KafkaClient(object): Attempt to send a broker-agnostic request to one of the available brokers. Keep trying until you succeed. """ - for conn in self.conns.values(): + for (host, port) in self.hosts: try: + conn = self._get_conn(host, port) conn.send(requestId, request) response = conn.recv(requestId) return response except Exception, e: - log.warning("Could not send request [%r] to server %s, " - "trying next server: %s" % (request, conn, e)) + log.warning("Could not send request [%r] to server %s:%i, " + "trying next server: %s" % (request, host, port, e)) continue raise KafkaUnavailableError("All servers failed to process request") diff --git a/kafka/codec.py b/kafka/codec.py index eb5d03c..206ddb4 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -1,5 +1,9 @@ from cStringIO import StringIO import gzip +import struct + +_XERIAL_V1_HEADER = (-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 1) +_XERIAL_V1_FORMAT = 'bccccccBii' try: import snappy @@ -36,13 +40,101 @@ def gzip_decode(payload): return result -def snappy_encode(payload): +def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024): + """Encodes the given data with snappy if xerial_compatible is set then the + stream is encoded in a fashion compatible with the xerial snappy library + + The block size (xerial_blocksize) controls how frequent the blocking occurs + 32k is the default in the xerial library. + + The format winds up being + +-------------+------------+--------------+------------+--------------+ + | Header | Block1 len | Block1 data | Blockn len | Blockn data | + |-------------+------------+--------------+------------+--------------| + | 16 bytes | BE int32 | snappy bytes | BE int32 | snappy bytes | + +-------------+------------+--------------+------------+--------------+ + + It is important to not that the blocksize is the amount of uncompressed + data presented to snappy at each block, whereas the blocklen is the + number of bytes that will be present in the stream, that is the + length will always be <= blocksize. + """ + if not _has_snappy: raise NotImplementedError("Snappy codec is not available") - return snappy.compress(payload) + + if xerial_compatible: + def _chunker(): + for i in xrange(0, len(payload), xerial_blocksize): + yield payload[i:i+xerial_blocksize] + + out = StringIO() + + header = ''.join([struct.pack('!' + fmt, dat) for fmt, dat + in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)]) + + out.write(header) + for chunk in _chunker(): + block = snappy.compress(chunk) + block_size = len(block) + out.write(struct.pack('!i', block_size)) + out.write(block) + + out.seek(0) + return out.read() + + else: + return snappy.compress(payload) + + +def _detect_xerial_stream(payload): + """Detects if the data given might have been encoded with the blocking mode + of the xerial snappy library. + + This mode writes a magic header of the format: + +--------+--------------+------------+---------+--------+ + | Marker | Magic String | Null / Pad | Version | Compat | + |--------+--------------+------------+---------+--------| + | byte | c-string | byte | int32 | int32 | + |--------+--------------+------------+---------+--------| + | -126 | 'SNAPPY' | \0 | | | + +--------+--------------+------------+---------+--------+ + + The pad appears to be to ensure that SNAPPY is a valid cstring + The version is the version of this format as written by xerial, + in the wild this is currently 1 as such we only support v1. + + Compat is there to claim the miniumum supported version that + can read a xerial block stream, presently in the wild this is + 1. + """ + + if len(payload) > 16: + header = header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16]) + return header == _XERIAL_V1_HEADER + return False def snappy_decode(payload): if not _has_snappy: raise NotImplementedError("Snappy codec is not available") - return snappy.decompress(payload) + + if _detect_xerial_stream(payload): + # TODO ? Should become a fileobj ? + out = StringIO() + byt = buffer(payload[16:]) + length = len(byt) + cursor = 0 + + while cursor < length: + block_size = struct.unpack_from('!i', byt[cursor:])[0] + # Skip the block size + cursor += 4 + end = cursor + block_size + out.write(snappy.decompress(byt[cursor:end])) + cursor = end + + out.seek(0) + return out.read() + else: + return snappy.decompress(payload) diff --git a/kafka/conn.py b/kafka/conn.py index 2b8f1c2..7266ae8 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -2,6 +2,7 @@ import copy import logging import socket import struct +from random import shuffle from threading import local from kafka.common import ConnectionError @@ -9,6 +10,31 @@ from kafka.common import ConnectionError log = logging.getLogger("kafka") DEFAULT_SOCKET_TIMEOUT_SECONDS = 120 +DEFAULT_KAFKA_PORT = 9092 + + +def collect_hosts(hosts, randomize=True): + """ + Collects a comma-separated set of hosts (host:port) and optionally + randomize the returned list. + """ + + if isinstance(hosts, str): + hosts = hosts.strip().split(',') + + result = [] + for host_port in hosts: + + res = host_port.split(':') + host = res[0] + port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT + result.append((host.strip(), port)) + + if randomize: + shuffle(result) + + return result + class KafkaConnection(local): """ @@ -81,7 +107,7 @@ class KafkaConnection(local): sent = self._sock.sendall(payload) if sent is not None: self._raise_connection_error() - except socket.error, e: + except socket.error: log.exception('Unable to send payload to Kafka') self._raise_connection_error() @@ -4,6 +4,7 @@ from setuptools import setup, Command class Tox(Command): + user_options = [] def initialize_options(self): diff --git a/test/test_integration.py b/test/test_integration.py index d0da523..3d6ccf6 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -33,7 +33,7 @@ def ensure_topic_creation(client, topic_name): class KafkaTestCase(unittest.TestCase): def setUp(self): - self.topic = "%s-%s" % (self.id()[self.id().rindex(".")+1:], random_string(10)) + self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10)) ensure_topic_creation(self.client, self.topic) @@ -42,7 +42,7 @@ class TestKafkaClient(KafkaTestCase): def setUpClass(cls): # noqa cls.zk = ZookeeperFixture.instance() cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) - cls.client = KafkaClient(cls.server.host, cls.server.port) + cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port)) @classmethod def tearDownClass(cls): # noqa @@ -578,7 +578,7 @@ class TestConsumer(KafkaTestCase): cls.zk = ZookeeperFixture.instance() cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port) - cls.client = KafkaClient(cls.server2.host, cls.server2.port) + cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port)) @classmethod def tearDownClass(cls): # noqa @@ -826,23 +826,26 @@ class TestConsumer(KafkaTestCase): class TestFailover(KafkaTestCase): - def setUp(self): + @classmethod + def setUpClass(cls): # noqa zk_chroot = random_string(10) replicas = 2 partitions = 2 # mini zookeeper, 2 kafka brokers - self.zk = ZookeeperFixture.instance() - kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions] - self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] - self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port) - super(TestFailover, self).setUp() - - def tearDown(self): - self.client.close() - for broker in self.brokers: + cls.zk = ZookeeperFixture.instance() + kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions] + cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] + + hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers] + cls.client = KafkaClient(hosts) + + @classmethod + def tearDownClass(cls): + cls.client.close() + for broker in cls.brokers: broker.close() - self.zk.close() + cls.zk.close() def test_switch_leader(self): key, topic, partition = random_string(5), self.topic, 0 @@ -918,7 +921,8 @@ class TestFailover(KafkaTestCase): return broker def _count_messages(self, group, topic): - client = KafkaClient(self.brokers[0].host, self.brokers[0].port) + hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port) + client = KafkaClient(hosts) consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0) all_messages = [] for message in consumer: diff --git a/test/test_unit.py b/test/test_unit.py index 0b2b339..47ec561 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -3,11 +3,15 @@ import random import struct import unittest +from mock import MagicMock, patch + +from kafka import KafkaClient from kafka.common import ( ProduceRequest, FetchRequest, Message, ChecksumError, ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage, BrokerMetadata, PartitionMetadata, - TopicAndPartition, LeaderUnavailableError, PartitionUnavailableError + TopicAndPartition, KafkaUnavailableError, + LeaderUnavailableError, PartitionUnavailableError ) from kafka.codec import ( has_gzip, has_snappy, gzip_encode, gzip_decode, @@ -17,10 +21,6 @@ from kafka.protocol import ( create_gzip_message, create_message, create_snappy_message, KafkaProtocol ) -from kafka.client import KafkaClient - -from mock import patch - ITERATIONS = 1000 STRLEN = 100 @@ -55,6 +55,7 @@ class TestPackage(unittest.TestCase): from kafka import KafkaClient as KafkaClient2 self.assertEquals(KafkaClient2.__name__, "KafkaClient") + from kafka.codec import snappy_encode self.assertEquals(snappy_encode.__name__, "snappy_encode") @@ -74,6 +75,49 @@ class TestCodec(unittest.TestCase): s2 = snappy_decode(snappy_encode(s1)) self.assertEquals(s1, s2) + @unittest.skipUnless(has_snappy(), "Snappy not available") + def test_snappy_detect_xerial(self): + import kafka as kafka1 + _detect_xerial_stream = kafka1.codec._detect_xerial_stream + + header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes' + false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01' + random_snappy = snappy_encode('SNAPPY' * 50) + short_data = b'\x01\x02\x03\x04' + + self.assertTrue(_detect_xerial_stream(header)) + self.assertFalse(_detect_xerial_stream(b'')) + self.assertFalse(_detect_xerial_stream(b'\x00')) + self.assertFalse(_detect_xerial_stream(false_header)) + self.assertFalse(_detect_xerial_stream(random_snappy)) + self.assertFalse(_detect_xerial_stream(short_data)) + + @unittest.skipUnless(has_snappy(), "Snappy not available") + def test_snappy_decode_xerial(self): + header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + random_snappy = snappy_encode('SNAPPY' * 50) + block_len = len(random_snappy) + random_snappy2 = snappy_encode('XERIAL' * 50) + block_len2 = len(random_snappy2) + + to_test = header \ + + struct.pack('!i', block_len) + random_snappy \ + + struct.pack('!i', block_len2) + random_snappy2 \ + + self.assertEquals(snappy_decode(to_test), ('SNAPPY' * 50) + ('XERIAL' * 50)) + + @unittest.skipUnless(has_snappy(), "Snappy not available") + def test_snappy_encode_xerial(self): + to_ensure = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + \ + '\x00\x00\x00\x18' + \ + '\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + \ + '\x00\x00\x00\x18' + \ + '\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + + to_test = ('SNAPPY' * 50) + ('XERIAL' * 50) + + compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300) + self.assertEquals(compressed, to_ensure) class TestProtocol(unittest.TestCase): @@ -91,9 +135,8 @@ class TestProtocol(unittest.TestCase): payloads = ["v1", "v2"] msg = create_gzip_message(payloads) self.assertEqual(msg.magic, 0) - self.assertEqual( - msg.attributes, - KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP) + self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK & + KafkaProtocol.CODEC_GZIP) self.assertEqual(msg.key, None) # Need to decode to check since gzipped payload is non-deterministic decoded = gzip_decode(msg.value) @@ -108,9 +151,8 @@ class TestProtocol(unittest.TestCase): payloads = ["v1", "v2"] msg = create_snappy_message(payloads) self.assertEqual(msg.magic, 0) - self.assertEqual( - msg.attributes, - KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY) + self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK & + KafkaProtocol.CODEC_SNAPPY) self.assertEqual(msg.key, None) expect = ("8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff\xff\xff\xff" "\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5\x96\nx\x00\x00\xff" @@ -281,7 +323,6 @@ class TestProtocol(unittest.TestCase): len(ms3), ms3) responses = list(KafkaProtocol.decode_fetch_response(encoded)) - def expand_messages(response): return FetchResponse(response.topic, response.partition, response.error, response.highwaterMark, @@ -369,6 +410,7 @@ class TestProtocol(unittest.TestCase): def test_decode_offset_response(self): pass + @unittest.skip("Not Implemented") def test_encode_offset_commit_request(self): pass @@ -388,6 +430,81 @@ class TestProtocol(unittest.TestCase): class TestKafkaClient(unittest.TestCase): + def test_init_with_list(self): + + with patch.object(KafkaClient, 'load_metadata_for_topics'): + client = KafkaClient( + hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092']) + + self.assertItemsEqual( + [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)], + client.hosts) + + def test_init_with_csv(self): + + with patch.object(KafkaClient, 'load_metadata_for_topics'): + client = KafkaClient( + hosts='kafka01:9092,kafka02:9092,kafka03:9092') + + self.assertItemsEqual( + [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)], + client.hosts) + + def test_send_broker_unaware_request_fail(self): + 'Tests that call fails when all hosts are unavailable' + + mocked_conns = { + ('kafka01', 9092): MagicMock(), + ('kafka02', 9092): MagicMock() + } + # inject KafkaConnection side effects + mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)") + mocked_conns[('kafka02', 9092)].send.side_effect = RuntimeError("Kafka02 went away (unittest)") + + def mock_get_conn(host, port): + return mocked_conns[(host, port)] + + # patch to avoid making requests before we want it + with patch.object(KafkaClient, 'load_metadata_for_topics'), \ + patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn): + + client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092']) + + self.assertRaises( + KafkaUnavailableError, + client._send_broker_unaware_request, + 1, 'fake request') + + for key, conn in mocked_conns.iteritems(): + conn.send.assert_called_with(1, 'fake request') + + def test_send_broker_unaware_request(self): + 'Tests that call works when at least one of the host is available' + + mocked_conns = { + ('kafka01', 9092): MagicMock(), + ('kafka02', 9092): MagicMock(), + ('kafka03', 9092): MagicMock() + } + # inject KafkaConnection side effects + mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)") + mocked_conns[('kafka02', 9092)].recv.return_value = 'valid response' + mocked_conns[('kafka03', 9092)].send.side_effect = RuntimeError("kafka03 went away (unittest)") + + def mock_get_conn(host, port): + return mocked_conns[(host, port)] + + # patch to avoid making requests before we want it + with patch.object(KafkaClient, 'load_metadata_for_topics'), \ + patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn): + + client = KafkaClient(hosts='kafka01:9092,kafka02:9092') + + resp = client._send_broker_unaware_request(1, 'fake request') + + self.assertEqual('valid response', resp) + mocked_conns[('kafka02', 9092)].recv.assert_called_with(1) + @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') def test_load_metadata(self, protocol, conn): @@ -416,7 +533,7 @@ class TestKafkaClient(unittest.TestCase): protocol.decode_metadata_response.return_value = (brokers, topics) # client loads metadata at init - client = KafkaClient(host='broker_1', port=4567) + client = KafkaClient(hosts=['broker_1:4567']) self.assertDictEqual({ TopicAndPartition('topic_1', 0): brokers[1], TopicAndPartition('topic_noleader', 0): None, @@ -440,7 +557,7 @@ class TestKafkaClient(unittest.TestCase): topics = {'topic_no_partitions': {}} protocol.decode_metadata_response.return_value = (brokers, topics) - client = KafkaClient(host='broker_1', port=4567) + client = KafkaClient(hosts=['broker_1:4567']) # topic metadata is loaded but empty self.assertDictEqual({}, client.topics_to_brokers) @@ -473,7 +590,7 @@ class TestKafkaClient(unittest.TestCase): topics = {'topic_no_partitions': {}} protocol.decode_metadata_response.return_value = (brokers, topics) - client = KafkaClient(host='broker_1', port=4567) + client = KafkaClient(hosts=['broker_1:4567']) self.assertDictEqual({}, client.topics_to_brokers) self.assertRaises( @@ -499,7 +616,7 @@ class TestKafkaClient(unittest.TestCase): } protocol.decode_metadata_response.return_value = (brokers, topics) - client = KafkaClient(host='broker_1', port=4567) + client = KafkaClient(hosts=['broker_1:4567']) self.assertDictEqual( { TopicAndPartition('topic_noleader', 0): None, @@ -535,7 +652,7 @@ class TestKafkaClient(unittest.TestCase): } protocol.decode_metadata_response.return_value = (brokers, topics) - client = KafkaClient(host='broker_1', port=4567) + client = KafkaClient(hosts=['broker_1:4567']) requests = [ProduceRequest( "topic_noleader", 0, |