author | Dana Powers <dana.powers@gmail.com> | 2015-02-19 23:25:19 -0800
committer | Dana Powers <dana.powers@gmail.com> | 2015-02-19 23:25:19 -0800
commit | 9ad0be662d388b47aadf04d712f5744add6456e3 (patch)
tree | f520b34f440b7d2e9513b8802841c32b1126a106
parent | 60a73788ee9036a79078193dfab892c6e6ef8f9b (diff)
parent | 6ed6ad53f23fae8dd7ccadc15a7c84f67334d0c0 (diff)
download | kafka-python-9ad0be662d388b47aadf04d712f5744add6456e3.tar.gz
Merge pull request #314 from dpkp/keyed_producer_failover
Handle keyed producer failover
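The change: `Partitioner.partition()` now treats the partition list argument as optional and falls back to the list the partitioner was constructed with, and `KeyedProducer` no longer passes a freshly fetched partition list on every send. This keeps partitioner state (notably the round-robin cycle) stable while cluster metadata is refreshed during a leader failover. A minimal sketch of the patched interface — the two-partition topic and the key values are assumptions for illustration:

```python
from kafka.partitioner.roundrobin import RoundRobinPartitioner

# Sketch of the patched Partitioner API, assuming a topic with partitions [0, 1]
partitioner = RoundRobinPartitioner([0, 1])

partitioner.partition(b'key-a')  # -> 0, uses the cached partition list
partitioner.partition(b'key-b')  # -> 1
partitioner.partition(b'key-c')  # -> 0, the cycle is not reset between calls

# Passing a different, non-empty list still refreshes the partitioner
partitioner.partition(b'key-d', [0, 1, 2])  # -> 0, cycle rebuilt over 3 partitions
```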
-rw-r--r-- | kafka/partitioner/base.py          |   7
-rw-r--r-- | kafka/partitioner/hashed.py        |   4
-rw-r--r-- | kafka/partitioner/roundrobin.py    |   4
-rw-r--r-- | kafka/producer/keyed.py            |   2
-rw-r--r-- | test/test_failover_integration.py  |  77
-rw-r--r-- | test/test_producer_integration.py  | 282
6 files changed, 231 insertions, 145 deletions
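A note on the test reorganization in the diff below: the produce-acknowledgement tests now exercise the base `Producer` class directly instead of going through `SimpleProducer`. Roughly, the three ack levels behave as follows — a sketch only; the broker address and topic are placeholders:

```python
from kafka import KafkaClient
from kafka.producer.base import Producer

client = KafkaClient('localhost:9092')  # placeholder broker address

# Fire-and-forget: send_messages returns no ProduceResponses
p_none = Producer(client, req_acks=Producer.ACK_NOT_REQUIRED)

# Ack once the partition leader has written the message locally
p_local = Producer(client, req_acks=Producer.ACK_AFTER_LOCAL_WRITE)

# Ack only after all in-sync replicas have committed -- the level the
# failover test uses so that a leader crash cannot lose acked messages
p_cluster = Producer(client, req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)

# The base Producer takes an explicit partition, unlike SimpleProducer
p_cluster.send_messages(b'test-topic', 0, b'payload')
```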
diff --git a/kafka/partitioner/base.py b/kafka/partitioner/base.py
index 0b1bb59..857f634 100644
--- a/kafka/partitioner/base.py
+++ b/kafka/partitioner/base.py
@@ -12,14 +12,13 @@ class Partitioner(object):
         """
         self.partitions = partitions
 
-    def partition(self, key, partitions):
+    def partition(self, key, partitions=None):
         """
         Takes a string key and num_partitions as argument and returns
         a partition to be used for the message
 
         Arguments:
-            partitions: The list of partitions is passed in every call. This
-                may look like an overhead, but it will be useful
-                (in future) when we handle cases like rebalancing
+            key: the key to use for partitioning
+            partitions: (optional) a list of partitions.
         """
         raise NotImplementedError('partition function has to be implemented')
diff --git a/kafka/partitioner/hashed.py b/kafka/partitioner/hashed.py
index 587a3de..fb5e598 100644
--- a/kafka/partitioner/hashed.py
+++ b/kafka/partitioner/hashed.py
@@ -5,7 +5,9 @@ class HashedPartitioner(Partitioner):
     Implements a partitioner which selects the target partition based on
     the hash of the key
     """
-    def partition(self, key, partitions):
+    def partition(self, key, partitions=None):
+        if not partitions:
+            partitions = self.partitions
         size = len(partitions)
         idx = hash(key) % size
 
diff --git a/kafka/partitioner/roundrobin.py b/kafka/partitioner/roundrobin.py
index 54d00da..6439e53 100644
--- a/kafka/partitioner/roundrobin.py
+++ b/kafka/partitioner/roundrobin.py
@@ -15,9 +15,9 @@ class RoundRobinPartitioner(Partitioner):
         self.partitions = partitions
         self.iterpart = cycle(partitions)
 
-    def partition(self, key, partitions):
+    def partition(self, key, partitions=None):
         # Refresh the partition list if necessary
-        if self.partitions != partitions:
+        if partitions and self.partitions != partitions:
             self._set_partitions(partitions)
 
         return next(self.iterpart)
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index fe5b056..36328ed 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -54,7 +54,7 @@ class KeyedProducer(Producer):
             self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic))
 
         partitioner = self.partitioners[topic]
-        return partitioner.partition(key, self.client.get_partition_ids_for_topic(topic))
+        return partitioner.partition(key)
 
     def send_messages(self,topic,key,*msg):
         partition = self._next_partition(topic, key)
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index ca71f2d..7d27526 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -7,6 +7,7 @@ from . import unittest
 from kafka import KafkaClient, SimpleConsumer
 from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
 from kafka.producer.base import Producer
+from kafka.producer import KeyedProducer
 
 from test.fixtures import ZookeeperFixture, KafkaFixture
 from test.testutil import (
@@ -17,8 +18,7 @@ from test.testutil import (
 class TestFailover(KafkaIntegrationTestCase):
     create_client = False
 
-    @classmethod
-    def setUpClass(cls):  # noqa
+    def setUp(self):
         if not os.environ.get('KAFKA_VERSION'):
             return
 
@@ -27,33 +27,41 @@ class TestFailover(KafkaIntegrationTestCase):
         partitions = 2
 
         # mini zookeeper, 2 kafka brokers
-        cls.zk = ZookeeperFixture.instance()
-        kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
-        cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
+        self.zk = ZookeeperFixture.instance()
+        kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
+        self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
 
-        hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers]
-        cls.client = KafkaClient(hosts)
+        hosts = ['%s:%d' % (b.host, b.port) for b in self.brokers]
+        self.client = KafkaClient(hosts)
+        super(TestFailover, self).setUp()
 
-    @classmethod
-    def tearDownClass(cls):
+    def tearDown(self):
+        super(TestFailover, self).tearDown()
         if not os.environ.get('KAFKA_VERSION'):
             return
 
-        cls.client.close()
-        for broker in cls.brokers:
+        self.client.close()
+        for broker in self.brokers:
             broker.close()
-        cls.zk.close()
+        self.zk.close()
 
     @kafka_versions("all")
     def test_switch_leader(self):
         topic = self.topic
         partition = 0
 
-        # Test the base class Producer -- send_messages to a specific partition
+        # Testing the base Producer class here so that we can easily send
+        # messages to a specific partition, kill the leader for that partition
+        # and check that after another broker takes leadership the producer
+        # is able to resume sending messages
+
+        # require that the server commit messages to all in-sync replicas
+        # so that failover doesn't lose any messages on server-side
+        # and we can assert that server-side message count equals client-side
         producer = Producer(self.client, async=False,
                             req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
 
-        # Send 10 random messages
+        # Send 100 random messages to a specific partition
         self._send_random_messages(producer, topic, partition, 100)
 
         # kill leader for partition
@@ -80,7 +88,7 @@ class TestFailover(KafkaIntegrationTestCase):
         self._send_random_messages(producer, topic, partition, 100)
 
         # count number of messages
-        # Should be equal to 10 before + 1 recovery + 10 after
+        # Should be equal to 100 before + 1 recovery + 100 after
         self.assert_message_count(topic, 201, partitions=(partition,))
 
@@ -116,6 +124,45 @@ class TestFailover(KafkaIntegrationTestCase):
         # Should be equal to 10 before + 1 recovery + 10 after
         self.assert_message_count(topic, 21, partitions=(partition,))
 
+    @kafka_versions("all")
+    def test_switch_leader_keyed_producer(self):
+        topic = self.topic
+
+        producer = KeyedProducer(self.client, async=False)
+
+        # Send 10 random messages
+        for _ in range(10):
+            key = random_string(3)
+            msg = random_string(10)
+            producer.send_messages(topic, key, msg)
+
+        # kill leader for partition 0
+        self._kill_leader(topic, 0)
+
+        recovered = False
+        started = time.time()
+        timeout = 60
+        while not recovered and (time.time() - started) < timeout:
+            try:
+                key = random_string(3)
+                msg = random_string(10)
+                producer.send_messages(topic, key, msg)
+                if producer.partitioners[topic].partition(key) == 0:
+                    recovered = True
+            except (FailedPayloadsError, ConnectionError):
+                logging.debug("caught exception sending message -- will retry")
+                continue
+
+        # Verify we successfully sent the message
+        self.assertTrue(recovered)
+
+        # send some more messages just to make sure no more exceptions
+        for _ in range(10):
+            key = random_string(3)
+            msg = random_string(10)
+            producer.send_messages(topic, key, msg)
+
+
     def _send_random_messages(self, producer, topic, partition, n):
         for j in range(n):
             logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j)
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 19d28bd..38df69f 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -14,12 +14,12 @@ from kafka.common import (
     FetchRequest, ProduceRequest,
     UnknownTopicOrPartitionError, LeaderNotAvailableError
 )
+from kafka.producer.base import Producer
 
 from test.fixtures import ZookeeperFixture, KafkaFixture
 from test.testutil import KafkaIntegrationTestCase, kafka_versions
 
 class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
-    topic = b'produce_topic'
 
     @classmethod
     def setUpClass(cls):  # noqa
@@ -140,25 +140,26 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
 
     @kafka_versions("all")
     def test_simple_producer(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-        start_offset1 = self.current_offset(self.topic, 1)
+        partitions = self.client.get_partition_ids_for_topic(self.topic)
+        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
+
         producer = SimpleProducer(self.client, random_start=False)
 
         # Goes to first partition, randomly.
         resp = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
-        self.assert_produce_response(resp, start_offset0)
+        self.assert_produce_response(resp, start_offsets[0])
 
         # Goes to the next partition, randomly.
         resp = producer.send_messages(self.topic, self.msg("three"))
-        self.assert_produce_response(resp, start_offset1)
+        self.assert_produce_response(resp, start_offsets[1])
 
-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two") ])
-        self.assert_fetch_offset(1, start_offset1, [ self.msg("three") ])
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("two") ])
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [ self.msg("three") ])
 
         # Goes back to the first partition because there's only two partitions
         resp = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
-        self.assert_produce_response(resp, start_offset0+2)
-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ])
+        self.assert_produce_response(resp, start_offsets[0]+2)
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ])
 
         producer.stop()
@@ -194,110 +195,38 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         self.assertEqual(resp3[0].partition, 0)
 
     @kafka_versions("all")
-    def test_round_robin_partitioner(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-        start_offset1 = self.current_offset(self.topic, 1)
-
-        producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
-        resp1 = producer.send(self.topic, self.key("key1"), self.msg("one"))
-        resp2 = producer.send(self.topic, self.key("key2"), self.msg("two"))
-        resp3 = producer.send(self.topic, self.key("key3"), self.msg("three"))
-        resp4 = producer.send(self.topic, self.key("key4"), self.msg("four"))
-
-        self.assert_produce_response(resp1, start_offset0+0)
-        self.assert_produce_response(resp2, start_offset1+0)
-        self.assert_produce_response(resp3, start_offset0+1)
-        self.assert_produce_response(resp4, start_offset1+1)
-
-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("three") ])
-        self.assert_fetch_offset(1, start_offset1, [ self.msg("two"), self.msg("four") ])
-
-        producer.stop()
-
-    @kafka_versions("all")
-    def test_hashed_partitioner(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-        start_offset1 = self.current_offset(self.topic, 1)
-
-        producer = KeyedProducer(self.client, partitioner=HashedPartitioner)
-        resp1 = producer.send(self.topic, self.key("1"), self.msg("one"))
-        resp2 = producer.send(self.topic, self.key("2"), self.msg("two"))
-        resp3 = producer.send(self.topic, self.key("3"), self.msg("three"))
-        resp4 = producer.send(self.topic, self.key("3"), self.msg("four"))
-        resp5 = producer.send(self.topic, self.key("4"), self.msg("five"))
-
-        offsets = {0: start_offset0, 1: start_offset1}
-        messages = {0: [], 1: []}
-
-        keys = [self.key(k) for k in ["1", "2", "3", "3", "4"]]
-        resps = [resp1, resp2, resp3, resp4, resp5]
-        msgs = [self.msg(m) for m in ["one", "two", "three", "four", "five"]]
-
-        for key, resp, msg in zip(keys, resps, msgs):
-            k = hash(key) % 2
-            offset = offsets[k]
-            self.assert_produce_response(resp, offset)
-            offsets[k] += 1
-            messages[k].append(msg)
-
-        self.assert_fetch_offset(0, start_offset0, messages[0])
-        self.assert_fetch_offset(1, start_offset1, messages[1])
-
-        producer.stop()
-
-    @kafka_versions("all")
-    def test_acks_none(self):
-        start_offset0 = self.current_offset(self.topic, 0)
+    def test_async_simple_producer(self):
+        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+        start_offset = self.current_offset(self.topic, partition)
 
-        producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED,
-                                  random_start=False)
+        producer = SimpleProducer(self.client, async=True, random_start=False)
         resp = producer.send_messages(self.topic, self.msg("one"))
         self.assertEqual(len(resp), 0)
 
-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
-        producer.stop()
-
-    @kafka_versions("all")
-    def test_acks_local_write(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-
-        producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
-                                  random_start=False)
-        resp = producer.send_messages(self.topic, self.msg("one"))
-
-        self.assert_produce_response(resp, start_offset0)
-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+        # wait for the server to report a new highwatermark
+        while self.current_offset(self.topic, partition) == start_offset:
+            time.sleep(0.1)
 
-        producer.stop()
-
-    @kafka_versions("all")
-    def test_acks_cluster_commit(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-
-        producer = SimpleProducer(
-            self.client,
-            req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT,
-            random_start=False)
-
-        resp = producer.send_messages(self.topic, self.msg("one"))
-        self.assert_produce_response(resp, start_offset0)
-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
 
         producer.stop()
 
     @kafka_versions("all")
     def test_batched_simple_producer__triggers_by_message(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-        start_offset1 = self.current_offset(self.topic, 1)
+        partitions = self.client.get_partition_ids_for_topic(self.topic)
+        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
 
+        # Configure batch producer
+        batch_messages = 5
+        batch_interval = 5
         producer = SimpleProducer(
             self.client,
             batch_send=True,
-            batch_send_every_n=5,
-            batch_send_every_t=20,
+            batch_send_every_n=batch_messages,
+            batch_send_every_t=batch_interval,
             random_start=False)
 
-        # Send 5 messages and do a fetch
+        # Send 4 messages -- should not trigger a batch
        resp = producer.send_messages(self.topic,
             self.msg("one"),
             self.msg("two"),
@@ -309,9 +238,10 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         self.assertEqual(len(resp), 0)
 
         # It hasn't sent yet
-        self.assert_fetch_offset(0, start_offset0, [])
-        self.assert_fetch_offset(1, start_offset1, [])
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [])
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [])
 
+        # send 3 more messages -- should trigger batch on first 5
         resp = producer.send_messages(self.topic,
             self.msg("five"),
             self.msg("six"),
@@ -321,30 +251,32 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         # Batch mode is async. No ack
         self.assertEqual(len(resp), 0)
 
-        self.assert_fetch_offset(0, start_offset0, [
+        # send messages groups all *msgs in a single call to the same partition
+        # so we should see all messages from the first call in one partition
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [
             self.msg("one"),
             self.msg("two"),
             self.msg("three"),
             self.msg("four"),
         ])
 
-        self.assert_fetch_offset(1, start_offset1, [
+        # Because we are batching every 5 messages, we should only see one
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [
             self.msg("five"),
-            # self.msg("six"),
-            # self.msg("seven"),
         ])
 
         producer.stop()
 
     @kafka_versions("all")
     def test_batched_simple_producer__triggers_by_time(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-        start_offset1 = self.current_offset(self.topic, 1)
+        partitions = self.client.get_partition_ids_for_topic(self.topic)
+        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
 
+        batch_interval = 5
         producer = SimpleProducer(self.client,
             batch_send=True,
             batch_send_every_n=100,
-            batch_send_every_t=5,
+            batch_send_every_t=batch_interval,
             random_start=False)
 
         # Send 5 messages and do a fetch
@@ -359,8 +291,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         self.assertEqual(len(resp), 0)
 
         # It hasn't sent yet
-        self.assert_fetch_offset(0, start_offset0, [])
-        self.assert_fetch_offset(1, start_offset1, [])
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [])
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [])
 
         resp = producer.send_messages(self.topic,
             self.msg("five"),
@@ -372,16 +304,16 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         self.assertEqual(len(resp), 0)
 
         # Wait the timeout out
-        time.sleep(5)
+        time.sleep(batch_interval)
 
-        self.assert_fetch_offset(0, start_offset0, [
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [
             self.msg("one"),
             self.msg("two"),
             self.msg("three"),
             self.msg("four"),
         ])
 
-        self.assert_fetch_offset(1, start_offset1, [
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [
             self.msg("five"),
             self.msg("six"),
             self.msg("seven"),
@@ -389,40 +321,146 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         ])
 
         producer.stop()
 
+
+    ############################
+    #   KeyedProducer Tests    #
+    ############################
+
     @kafka_versions("all")
-    def test_async_simple_producer(self):
-        start_offset0 = self.current_offset(self.topic, 0)
+    def test_round_robin_partitioner(self):
+        partitions = self.client.get_partition_ids_for_topic(self.topic)
+        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
 
-        producer = SimpleProducer(self.client, async=True, random_start=False)
-        resp = producer.send_messages(self.topic, self.msg("one"))
-        self.assertEqual(len(resp), 0)
+        producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
+        resp1 = producer.send(self.topic, self.key("key1"), self.msg("one"))
+        resp2 = producer.send(self.topic, self.key("key2"), self.msg("two"))
+        resp3 = producer.send(self.topic, self.key("key3"), self.msg("three"))
+        resp4 = producer.send(self.topic, self.key("key4"), self.msg("four"))
+
+        self.assert_produce_response(resp1, start_offsets[0]+0)
+        self.assert_produce_response(resp2, start_offsets[1]+0)
+        self.assert_produce_response(resp3, start_offsets[0]+1)
+        self.assert_produce_response(resp4, start_offsets[1]+1)
+
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("three") ])
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [ self.msg("two"), self.msg("four") ])
+
+        producer.stop()
+
+    @kafka_versions("all")
+    def test_hashed_partitioner(self):
+        partitions = self.client.get_partition_ids_for_topic(self.topic)
+        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
+
+        producer = KeyedProducer(self.client, partitioner=HashedPartitioner)
+        resp1 = producer.send(self.topic, self.key("1"), self.msg("one"))
+        resp2 = producer.send(self.topic, self.key("2"), self.msg("two"))
+        resp3 = producer.send(self.topic, self.key("3"), self.msg("three"))
+        resp4 = producer.send(self.topic, self.key("3"), self.msg("four"))
+        resp5 = producer.send(self.topic, self.key("4"), self.msg("five"))
+
+        offsets = {partitions[0]: start_offsets[0], partitions[1]: start_offsets[1]}
+        messages = {partitions[0]: [], partitions[1]: []}
+
+        keys = [self.key(k) for k in ["1", "2", "3", "3", "4"]]
+        resps = [resp1, resp2, resp3, resp4, resp5]
+        msgs = [self.msg(m) for m in ["one", "two", "three", "four", "five"]]
+
+        for key, resp, msg in zip(keys, resps, msgs):
+            k = hash(key) % 2
+            partition = partitions[k]
+            offset = offsets[partition]
+            self.assert_produce_response(resp, offset)
+            offsets[partition] += 1
+            messages[partition].append(msg)
 
-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+        self.assert_fetch_offset(partitions[0], start_offsets[0], messages[partitions[0]])
+        self.assert_fetch_offset(partitions[1], start_offsets[1], messages[partitions[1]])
 
         producer.stop()
 
     @kafka_versions("all")
     def test_async_keyed_producer(self):
-        start_offset0 = self.current_offset(self.topic, 0)
+        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+        start_offset = self.current_offset(self.topic, partition)
 
         producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True)
 
         resp = producer.send(self.topic, self.key("key1"), self.msg("one"))
         self.assertEqual(len(resp), 0)
 
-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+        # wait for the server to report a new highwatermark
+        while self.current_offset(self.topic, partition) == start_offset:
+            time.sleep(0.1)
+
+        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
+
+        producer.stop()
+
+    ############################
+    #   Producer ACK Tests     #
+    ############################
+
+    @kafka_versions("all")
+    def test_acks_none(self):
+        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+        start_offset = self.current_offset(self.topic, partition)
+
+        producer = Producer(
+            self.client,
+            req_acks=Producer.ACK_NOT_REQUIRED,
+        )
+        resp = producer.send_messages(self.topic, partition, self.msg("one"))
+
+        # No response from produce request with no acks required
+        self.assertEqual(len(resp), 0)
+
+        # But the message should still have been delivered
+        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
+
+        producer.stop()
+
+    @kafka_versions("all")
+    def test_acks_local_write(self):
+        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+        start_offset = self.current_offset(self.topic, partition)
+
+        producer = Producer(
+            self.client,
+            req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
+        )
+        resp = producer.send_messages(self.topic, partition, self.msg("one"))
+
+        self.assert_produce_response(resp, start_offset)
+        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
+
+        producer.stop()
+
+    @kafka_versions("all")
+    def test_acks_cluster_commit(self):
+        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+        start_offset = self.current_offset(self.topic, partition)
+
+        producer = Producer(
+            self.client,
+            req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
+        )
+
+        resp = producer.send_messages(self.topic, partition, self.msg("one"))
+        self.assert_produce_response(resp, start_offset)
+        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
 
         producer.stop()
 
-    def assert_produce_request(self, messages, initial_offset, message_ct):
-        produce = ProduceRequest(self.topic, 0, messages=messages)
+    def assert_produce_request(self, messages, initial_offset, message_ct,
+                               partition=0):
+        produce = ProduceRequest(self.topic, partition, messages=messages)
 
         # There should only be one response message from the server.
         # This will throw an exception if there's more than one.
         resp = self.client.send_produce_request([ produce ])
 
         self.assert_produce_response(resp, initial_offset)
-        self.assertEqual(self.current_offset(self.topic, 0), initial_offset + message_ct)
+        self.assertEqual(self.current_offset(self.topic, partition), initial_offset + message_ct)
 
     def assert_produce_response(self, resp, initial_offset):
         self.assertEqual(len(resp), 1)
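The new `test_switch_leader_keyed_producer` test above encodes the expected recovery behavior: a send may raise `FailedPayloadsError` or `ConnectionError` while leadership moves, but retrying eventually succeeds once the client's metadata refreshes. A minimal application-side sketch of the same retry pattern — the broker address, topic, key, and timeout are placeholder assumptions:

```python
import logging
import time

from kafka import KafkaClient
from kafka.common import FailedPayloadsError, ConnectionError
from kafka.producer import KeyedProducer

client = KafkaClient('localhost:9092')  # placeholder broker address
producer = KeyedProducer(client, async=False)

deadline = time.time() + 60  # placeholder retry budget
while time.time() < deadline:
    try:
        producer.send_messages(b'test-topic', b'user-42', b'payload')
        break  # a send succeeded against the newly elected leader
    except (FailedPayloadsError, ConnectionError):
        # leader election in progress -- metadata is refreshed on retry
        logging.debug('send failed during failover -- retrying')
        time.sleep(1)
```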