-rw-r--r--  kafka/client.py                   24
-rw-r--r--  kafka/common.py                    5
-rw-r--r--  kafka/conn.py                     28
-rw-r--r--  kafka/producer.py                 11
-rw-r--r--  test/fixtures.py                  25
-rw-r--r--  test/resources/kafka.properties    3
-rw-r--r--  test/test_integration.py         107
7 files changed, 173 insertions, 30 deletions
diff --git a/kafka/client.py b/kafka/client.py
index b7ceb2e..71ededa 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -3,9 +3,11 @@ from collections import defaultdict
from functools import partial
from itertools import count
import logging
+import socket
import time
from kafka.common import ErrorMapping, TopicAndPartition
+from kafka.common import ConnectionError, FailedPayloadsException
from kafka.conn import KafkaConnection
from kafka.protocol import KafkaProtocol
@@ -71,7 +73,7 @@ class KafkaClient(object):
log.debug("Broker metadata: %s", brokers)
log.debug("Topic metadata: %s", topics)
- self.brokers.update(brokers)
+ self.brokers = brokers
self.topics_to_brokers = {}
for topic, partitions in topics.items():
@@ -147,13 +149,15 @@ class KafkaClient(object):
for payload in payloads:
leader = self._get_leader_for_partition(payload.topic,
payload.partition)
-
payloads_by_broker[leader].append(payload)
original_keys.append((payload.topic, payload.partition))
# Accumulate the responses in a dictionary
acc = {}
+ # keep a list of payloads that failed to be sent to brokers
+ failed_payloads = []
+
# For each broker, send the list of request payloads
for broker, payloads in payloads_by_broker.items():
conn = self._get_conn_for_broker(broker)
@@ -162,15 +166,23 @@ class KafkaClient(object):
correlation_id=requestId, payloads=payloads)
# Send the request, recv the response
- conn.send(requestId, request)
-
- if decoder_fn is None:
+ try:
+ conn.send(requestId, request)
+ if decoder_fn is None:
+ continue
+ response = conn.recv(requestId)
+ except ConnectionError as e: # ignore BufferUnderflowError for now
+ log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
+ failed_payloads += payloads
+ self.topics_to_brokers = {} # reset metadata
continue
- response = conn.recv(requestId)
for response in decoder_fn(response):
acc[(response.topic, response.partition)] = response
+ if failed_payloads:
+ raise FailedPayloadsException(failed_payloads)
+
# Order the accumulated responses by the original key order
return (acc[k] for k in original_keys) if acc else ()
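
The net effect of this hunk is that a dead broker no longer aborts the whole request: payloads that could not be delivered are collected, the topic-to-broker metadata is reset, and a FailedPayloadsException surfaces to the caller. A minimal retry sketch, assuming a broker at localhost:9092 and an existing topic (names and retry counts are illustrative, not part of this change):

    import time

    from kafka.client import KafkaClient
    from kafka.common import FailedPayloadsException, ProduceRequest
    from kafka.protocol import create_message

    client = KafkaClient("localhost", 9092)
    req = ProduceRequest("my-topic", 0, [create_message("hello")])

    for attempt in range(3):
        try:
            resps = client.send_produce_request([req], acks=1, timeout=1000)
            break
        except FailedPayloadsException:
            # topics_to_brokers was cleared above, so the next attempt
            # reloads metadata and can find the new partition leader
            time.sleep(1)
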
diff --git a/kafka/common.py b/kafka/common.py
index 8f3154c..6f0dd32 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -69,6 +69,11 @@ class ErrorMapping(object):
# Exceptions #
#################
+class FailedPayloadsException(Exception):
+ pass
+
+class ConnectionError(Exception):
+ pass
class BufferUnderflowError(Exception):
pass
diff --git a/kafka/conn.py b/kafka/conn.py
index 194a19c..14aebc6 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -5,7 +5,7 @@ import struct
from threading import local
from kafka.common import BufferUnderflowError
-
+from kafka.common import ConnectionError
log = logging.getLogger("kafka")
@@ -27,6 +27,7 @@ class KafkaConnection(local):
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect((host, port))
self._sock.settimeout(10)
+ self._dirty = False
def __str__(self):
return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
@@ -54,7 +55,7 @@ class KafkaConnection(local):
# Read the size off of the header
resp = self._sock.recv(4)
if resp == "":
- raise Exception("Got no response from Kafka")
+ self._raise_connection_error()
(size,) = struct.unpack('>i', resp)
messagesize = size - 4
@@ -72,6 +73,10 @@ class KafkaConnection(local):
total += len(resp)
yield resp
+ def _raise_connection_error(self):
+ self._dirty = True
+ raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port))
+
##################
# Public API #
##################
@@ -80,14 +85,16 @@ class KafkaConnection(local):
def send(self, request_id, payload):
"Send a request to Kafka"
-
- log.debug(
- "About to send %d bytes to Kafka, request %d" %
- (len(payload), request_id))
-
- sent = self._sock.sendall(payload)
- if sent is not None:
- raise RuntimeError("Kafka went away")
+ log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id))
+ try:
+ if self._dirty:
+ self.reinit()
+ sent = self._sock.sendall(payload)
+ if sent is not None:
+ self._raise_connection_error()
+ except socket.error:
+ log.exception('Unable to send payload to Kafka')
+ self._raise_connection_error()
def recv(self, request_id):
"""
@@ -121,3 +128,4 @@ class KafkaConnection(local):
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect((self.host, self.port))
self._sock.settimeout(10)
+ self._dirty = False
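
Taken together, send() and reinit() give the connection a simple recovery cycle: a socket error marks the connection dirty and is re-raised as ConnectionError, and the next send() reopens the socket before writing. A rough illustration, assuming a broker at localhost:9092; close() is used here only to simulate the broker going away, and the payload is an opaque placeholder rather than a real encoded request:

    from kafka.conn import KafkaConnection
    from kafka.common import ConnectionError

    conn = KafkaConnection("localhost", 9092)
    conn.close()                    # simulate the broker going away
    try:
        conn.send(0, "\x00" * 8)    # placeholder bytes, not a valid request
    except ConnectionError:
        # send() caught the socket error, marked the connection dirty and
        # re-raised ConnectionError; the next send() will reinit() first
        pass
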
diff --git a/kafka/producer.py b/kafka/producer.py
index a7bfe28..7ef7896 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -7,6 +7,7 @@ import logging
import sys
from kafka.common import ProduceRequest
+from kafka.common import FailedPayloadsException
from kafka.protocol import create_message
from kafka.partitioner import HashedPartitioner
@@ -67,7 +68,7 @@ def _send_upstream(topic, queue, client, batch_time, batch_size,
acks=req_acks,
timeout=ack_timeout)
except Exception as exp:
- log.error("Error sending message", exc_info=sys.exc_info())
+ log.exception("Unable to send message")
class Producer(object):
@@ -140,8 +141,12 @@ class Producer(object):
else:
messages = [create_message(m) for m in msg]
req = ProduceRequest(self.topic, partition, messages)
- resp = self.client.send_produce_request([req], acks=self.req_acks,
- timeout=self.ack_timeout)
+ try:
+ resp = self.client.send_produce_request([req], acks=self.req_acks,
+ timeout=self.ack_timeout)
+ except Exception as e:
+ log.exception("Unable to send messages")
+ raise e
return resp
def stop(self, timeout=1):
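
For callers of the synchronous producer the behaviour is now: the failure is logged and then re-raised, so the application decides whether to retry or drop the batch. A hedged usage sketch (broker address and topic name are placeholders):

    from kafka.client import KafkaClient
    from kafka.common import FailedPayloadsException
    from kafka.producer import SimpleProducer

    client = KafkaClient("localhost", 9092)
    producer = SimpleProducer(client, "my-topic")
    try:
        producer.send_messages("hello kafka")
    except FailedPayloadsException:
        # the leader moved or the broker went away; metadata was reset,
        # so a later retry can pick up the new leader
        pass
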
diff --git a/test/fixtures.py b/test/fixtures.py
index 00c1afd..946c64f 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -242,7 +242,7 @@ class ZookeeperFixture(object):
class KafkaFixture(object):
@staticmethod
- def instance(broker_id, zk_host, zk_port, zk_chroot=None):
+ def instance(broker_id, zk_host, zk_port, zk_chroot=None, replicas=1, partitions=2):
if zk_chroot is None:
zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
if "KAFKA_URI" in os.environ:
@@ -251,11 +251,11 @@ class KafkaFixture(object):
fixture = ExternalService(host, port)
else:
(host, port) = ("127.0.0.1", get_open_port())
- fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot)
+ fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot, replicas, partitions)
fixture.open()
return fixture
- def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot):
+ def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas=1, partitions=2):
self.host = host
self.port = port
@@ -265,19 +265,24 @@ class KafkaFixture(object):
self.zk_port = zk_port
self.zk_chroot = zk_chroot
+ self.replicas = replicas
+ self.partitions = partitions
+
self.tmp_dir = None
self.child = None
def open(self):
self.tmp_dir = tempfile.mkdtemp()
print("*** Running local Kafka instance")
- print(" host = %s" % self.host)
- print(" port = %s" % self.port)
- print(" broker_id = %s" % self.broker_id)
- print(" zk_host = %s" % self.zk_host)
- print(" zk_port = %s" % self.zk_port)
- print(" zk_chroot = %s" % self.zk_chroot)
- print(" tmp_dir = %s" % self.tmp_dir)
+ print(" host = %s" % self.host)
+ print(" port = %s" % self.port)
+ print(" broker_id = %s" % self.broker_id)
+ print(" zk_host = %s" % self.zk_host)
+ print(" zk_port = %s" % self.zk_port)
+ print(" zk_chroot = %s" % self.zk_chroot)
+ print(" replicas = %s" % self.replicas)
+ print(" partitions = %s" % self.partitions)
+ print(" tmp_dir = %s" % self.tmp_dir)
# Create directories
os.mkdir(os.path.join(self.tmp_dir, "logs"))
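
The extra parameters make it possible to stand up a small replicated cluster from the fixtures, mirroring what the failover test below does (the chroot name here is illustrative):

    from test.fixtures import ZookeeperFixture, KafkaFixture

    zk = ZookeeperFixture.instance()
    chroot = "failover_example"
    brokers = [KafkaFixture.instance(i, zk.host, zk.port, chroot,
                                     replicas=2, partitions=2)
               for i in range(2)]

    # ... run whatever needs the cluster ...

    for broker in brokers:
        broker.close()
    zk.close()
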
diff --git a/test/resources/kafka.properties b/test/resources/kafka.properties
index d42c097..f8732fb 100644
--- a/test/resources/kafka.properties
+++ b/test/resources/kafka.properties
@@ -32,7 +32,8 @@ socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dir={tmp_dir}/data
-num.partitions=2
+num.partitions={partitions}
+default.replication.factor={replicas}
############################# Log Flush Policy #############################
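
With the values the failover test uses (replicas=2, partitions=2), the rendered section of this template would come out as:

    num.partitions=2
    default.replication.factor=2
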
diff --git a/test/test_integration.py b/test/test_integration.py
index d8ead59..a10dae2 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -770,6 +770,113 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(all_messages[i], message.message)
self.assertEquals(i, 19)
+class TestFailover(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+
+ zk_chroot = random_string(10)
+ replicas = 2
+ partitions = 2
+
+ # mini zookeeper, 2 kafka brokers
+ cls.zk = ZookeeperFixture.instance()
+ kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
+ cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
+ cls.client = KafkaClient(cls.brokers[0].host, cls.brokers[0].port)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.client.close()
+ for broker in cls.brokers:
+ broker.close()
+ cls.zk.close()
+
+ def test_switch_leader(self):
+
+ key, topic, partition = random_string(5), 'test_switch_leader', 0
+ producer = SimpleProducer(self.client, topic)
+
+ for i in range(1, 4):
+
+ # XXX unfortunately, the conns dict needs to be warmed for this to work
+ # XXX unfortunately, for warming to work, we need at least as many partitions as brokers
+ self._send_random_messages(producer, 10)
+
+ # kill the leader for partition 0
+ broker = self._kill_leader(topic, partition)
+
+ # expect failure, reload metadata
+ with self.assertRaises(FailedPayloadsException):
+ producer.send_messages('part 1')
+ producer.send_messages('part 2')
+ time.sleep(1)
+
+ # send to new leader
+ self._send_random_messages(producer, 10)
+
+ broker.open()
+ time.sleep(3)
+
+ # count number of messages
+ count = self._count_messages('test_switch_leader group %s' % i, topic)
+ self.assertIn(count, range(20 * i, 22 * i + 1))
+
+ producer.stop()
+
+ def test_switch_leader_async(self):
+
+ key, topic, partition = random_string(5), 'test_switch_leader_async', 0
+ producer = SimpleProducer(self.client, topic, async=True)
+
+ for i in range(1, 4):
+
+ self._send_random_messages(producer, 10)
+
+ # kill the leader for partition 0
+ broker = self._kill_leader(topic, partition)
+
+ # expect failure, reload metadata
+ producer.send_messages('part 1')
+ producer.send_messages('part 2')
+ time.sleep(1)
+
+ # send to new leader
+ self._send_random_messages(producer, 10)
+
+ broker.open()
+ time.sleep(3)
+
+ # count number of messages
+ count = self._count_messages('test_switch_leader_async group %s' % i, topic)
+ self.assertIn(count, range(20 * i, 22 * i + 1))
+
+ producer.stop()
+
+ def _send_random_messages(self, producer, n):
+ for j in range(n):
+ resp = producer.send_messages(random_string(10))
+ if len(resp) > 0:
+ self.assertEquals(resp[0].error, 0)
+ time.sleep(1) # give it some time
+
+ def _kill_leader(self, topic, partition):
+ leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
+ broker = self.brokers[leader.nodeId]
+ broker.close()
+ time.sleep(1) # give it some time
+ return broker
+
+ def _count_messages(self, group, topic):
+ client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+ consumer = SimpleConsumer(client, group, topic, auto_commit=False)
+ all_messages = []
+ for message in consumer:
+ all_messages.append(message)
+ consumer.stop()
+ client.close()
+ return len(all_messages)
+
def random_string(l):
s = "".join(random.choice(string.letters) for i in xrange(l))