diff options
-rw-r--r-- | kafka/consumer.py | 2 | ||||
-rw-r--r-- | kafka/partitioner.py | 1 | ||||
-rw-r--r-- | test/test_producer_integration.py | 366 | ||||
-rw-r--r-- | test/testutil.py | 47 |
4 files changed, 344 insertions, 72 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 8ac28da..14b84fe 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -101,7 +101,7 @@ class Consumer(object): elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON: return 0 else: - raise Exception("OffsetFetchRequest for topic=%s, " + raise ProtocolError("OffsetFetchRequest for topic=%s, " "partition=%d failed with errorcode=%s" % ( resp.topic, resp.partition, resp.error)) diff --git a/kafka/partitioner.py b/kafka/partitioner.py index 8190c34..5287cef 100644 --- a/kafka/partitioner.py +++ b/kafka/partitioner.py @@ -54,4 +54,5 @@ class HashedPartitioner(Partitioner): def partition(self, key, partitions): size = len(partitions) idx = hash(key) % size + return partitions[idx] diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 8bab4d5..e148ad8 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -1,5 +1,5 @@ -import unittest import time +import unittest from kafka import * # noqa from kafka.common import * # noqa @@ -14,48 +14,35 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): def setUpClass(cls): # noqa cls.zk = ZookeeperFixture.instance() cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) - cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port)) @classmethod def tearDownClass(cls): # noqa - cls.client.close() cls.server.close() cls.zk.close() def test_produce_many_simple(self): start_offset = self.current_offset(self.topic, 0) - produce = ProduceRequest(self.topic, 0, messages=[ - create_message("Test message %d" % i) for i in range(100) - ]) - - resp = self.client.send_produce_request([produce]) - self.assertEqual(len(resp), 1) # Only one response - self.assertEqual(resp[0].error, 0) # No error - self.assertEqual(resp[0].offset, start_offset) # Initial offset of first message + self.assert_produce_request( + [ create_message("Test message %d" % i) for i in range(100) ], + start_offset, + 100, + ) - self.assertEqual(self.current_offset(self.topic, 0), start_offset+100) - - resp = self.client.send_produce_request([produce]) - self.assertEqual(len(resp), 1) # Only one response - self.assertEqual(resp[0].error, 0) # No error - self.assertEqual(resp[0].offset, start_offset+100) # Initial offset of first message - - self.assertEqual(self.current_offset(self.topic, 0), start_offset+200) + self.assert_produce_request( + [ create_message("Test message %d" % i) for i in range(100) ], + start_offset+100, + 100, + ) def test_produce_10k_simple(self): start_offset = self.current_offset(self.topic, 0) - produce = ProduceRequest(self.topic, 0, messages=[ - create_message("Test message %d" % i) for i in range(10000) - ]) - - resp = self.client.send_produce_request([produce]) - self.assertEqual(len(resp), 1) # Only one response - self.assertEqual(resp[0].error, 0) # No error - self.assertEqual(resp[0].offset, start_offset) # Initial offset of first message - - self.assertEqual(self.current_offset(self.topic, 0), start_offset+10000) + self.assert_produce_request( + [ create_message("Test message %d" % i) for i in range(10000) ], + start_offset, + 10000, + ) def test_produce_many_gzip(self): start_offset = self.current_offset(self.topic, 0) @@ -63,31 +50,23 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): message1 = create_gzip_message(["Gzipped 1 %d" % i for i in range(100)]) message2 = create_gzip_message(["Gzipped 2 %d" % i for i in range(100)]) - produce = ProduceRequest(self.topic, 0, messages=[message1, message2]) - - resp = self.client.send_produce_request([produce]) - self.assertEqual(len(resp), 1) # Only one response - self.assertEqual(resp[0].error, 0) # No error - self.assertEqual(resp[0].offset, start_offset) # Initial offset of first message - - self.assertEqual(self.current_offset(self.topic, 0), start_offset+200) + self.assert_produce_request( + [ message1, message2 ], + start_offset, + 200, + ) @unittest.skip("All snappy integration tests fail with nosnappyjava") def test_produce_many_snappy(self): start_offset = self.current_offset(self.topic, 0) - produce = ProduceRequest(self.topic, 0, messages=[ - create_snappy_message(["Snappy 1 %d" % i for i in range(100)]), - create_snappy_message(["Snappy 2 %d" % i for i in range(100)]), - ]) - - resp = self.client.send_produce_request([produce]) - - self.assertEqual(len(resp), 1) # Only one response - self.assertEqual(resp[0].error, 0) # No error - self.assertEqual(resp[0].offset, start_offset) # Initial offset of first message - - self.assertEqual(self.current_offset(self.topic, 0), start_offset+200) + self.assert_produce_request([ + create_snappy_message(["Snappy 1 %d" % i for i in range(100)]), + create_snappy_message(["Snappy 2 %d" % i for i in range(100)]), + ], + start_offset, + 200, + ) def test_produce_mixed(self): start_offset = self.current_offset(self.topic, 0) @@ -103,37 +82,282 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): msg_count += 100 messages.append(create_snappy_message(["Snappy %d" % i for i in range(100)])) - produce = ProduceRequest(self.topic, 0, messages=messages) - resp = self.client.send_produce_request([produce]) - - self.assertEqual(len(resp), 1) # Only one response - self.assertEqual(resp[0].error, 0) # No error - self.assertEqual(resp[0].offset, start_offset) # Initial offset of first message - - self.assertEqual(self.current_offset(self.topic, 0), start_offset+msg_count) + self.assert_produce_request(messages, start_offset, msg_count) def test_produce_100k_gzipped(self): start_offset = self.current_offset(self.topic, 0) - req1 = ProduceRequest(self.topic, 0, messages=[ - create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)]) + self.assert_produce_request([ + create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)]) + ], + start_offset, + 50000, + ) + + self.assert_produce_request([ + create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)]) + ], + start_offset+50000, + 50000, + ) + + ############################ + # SimpleProducer Tests # + ############################ + + def test_simple_producer(self): + start_offset0 = self.current_offset(self.topic, 0) + start_offset1 = self.current_offset(self.topic, 1) + producer = SimpleProducer(self.client) + + # Will go to partition 0 + msg1, msg2, msg3, msg4, msg5 = [ str(uuid.uuid4()) for x in xrange(5) ] + resp = producer.send_messages(self.topic, self.msg("one"), self.msg("two")) + self.assert_produce_response(resp, start_offset0) + + # Will go to partition 1 + resp = producer.send_messages(self.topic, self.msg("three")) + self.assert_produce_response(resp, start_offset1) + + self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two") ]) + self.assert_fetch_offset(1, start_offset1, [ self.msg("three") ]) + + # Will go to partition 0 + resp = producer.send_messages(self.topic, self.msg("four"), self.msg("five")) + self.assert_produce_response(resp, start_offset0+2) + self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ]) + + producer.stop() + + def test_round_robin_partitioner(self): + msg1, msg2, msg3, msg4 = [ str(uuid.uuid4()) for _ in range(4) ] + + start_offset0 = self.current_offset(self.topic, 0) + start_offset1 = self.current_offset(self.topic, 1) + + producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner) + resp1 = producer.send(self.topic, "key1", self.msg("one")) + resp2 = producer.send(self.topic, "key2", self.msg("two")) + resp3 = producer.send(self.topic, "key3", self.msg("three")) + resp4 = producer.send(self.topic, "key4", self.msg("four")) + + self.assert_produce_response(resp1, start_offset0+0) + self.assert_produce_response(resp2, start_offset1+0) + self.assert_produce_response(resp3, start_offset0+1) + self.assert_produce_response(resp4, start_offset1+1) + + self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("three") ]) + self.assert_fetch_offset(1, start_offset1, [ self.msg("two"), self.msg("four") ]) + + producer.stop() + + def test_hashed_partitioner(self): + start_offset0 = self.current_offset(self.topic, 0) + start_offset1 = self.current_offset(self.topic, 1) + + producer = KeyedProducer(self.client, partitioner=HashedPartitioner) + resp1 = producer.send(self.topic, 1, self.msg("one")) + resp2 = producer.send(self.topic, 2, self.msg("two")) + resp3 = producer.send(self.topic, 3, self.msg("three")) + resp4 = producer.send(self.topic, 3, self.msg("four")) + resp5 = producer.send(self.topic, 4, self.msg("five")) + + self.assert_produce_response(resp1, start_offset1+0) + self.assert_produce_response(resp2, start_offset0+0) + self.assert_produce_response(resp3, start_offset1+1) + self.assert_produce_response(resp4, start_offset1+2) + self.assert_produce_response(resp5, start_offset0+1) + + self.assert_fetch_offset(0, start_offset0, [ self.msg("two"), self.msg("five") ]) + self.assert_fetch_offset(1, start_offset1, [ self.msg("one"), self.msg("three"), self.msg("four") ]) + + producer.stop() + + def test_acks_none(self): + start_offset0 = self.current_offset(self.topic, 0) + start_offset1 = self.current_offset(self.topic, 1) + + producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED) + resp = producer.send_messages(self.topic, self.msg("one")) + self.assertEquals(len(resp), 0) + + self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ]) + producer.stop() + + def test_acks_local_write(self): + start_offset0 = self.current_offset(self.topic, 0) + start_offset1 = self.current_offset(self.topic, 1) + + producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE) + resp = producer.send_messages(self.topic, self.msg("one")) + + self.assert_produce_response(resp, start_offset0) + self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ]) + + producer.stop() + + def test_acks_cluster_commit(self): + start_offset0 = self.current_offset(self.topic, 0) + start_offset1 = self.current_offset(self.topic, 1) + + producer = SimpleProducer( + self.client, + req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT) + + resp = producer.send_messages(self.topic, self.msg("one")) + self.assert_produce_response(resp, start_offset0) + self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ]) + + producer.stop() + + def test_batched_simple_producer__triggers_by_message(self): + start_offset0 = self.current_offset(self.topic, 0) + start_offset1 = self.current_offset(self.topic, 1) + + producer = SimpleProducer(self.client, + batch_send=True, + batch_send_every_n=5, + batch_send_every_t=20) + + # Send 5 messages and do a fetch + resp = producer.send_messages(self.topic, + self.msg("one"), + self.msg("two"), + self.msg("three"), + self.msg("four"), + ) + + # Batch mode is async. No ack + self.assertEquals(len(resp), 0) + + # It hasn't sent yet + self.assert_fetch_offset(0, start_offset0, []) + self.assert_fetch_offset(1, start_offset1, []) + + resp = producer.send_messages(self.topic, + self.msg("five"), + self.msg("six"), + self.msg("seven"), + ) + + # Batch mode is async. No ack + self.assertEquals(len(resp), 0) + + self.assert_fetch_offset(0, start_offset0, [ + self.msg("one"), + self.msg("two"), + self.msg("three"), + self.msg("four"), ]) - resp1 = self.client.send_produce_request([req1]) - self.assertEqual(len(resp1), 1) # Only one response - self.assertEqual(resp1[0].error, 0) # No error - self.assertEqual(resp1[0].offset, start_offset) # Initial offset of first message + self.assert_fetch_offset(1, start_offset1, [ + self.msg("five"), + # self.msg("six"), + # self.msg("seven"), + ]) - self.assertEqual(self.current_offset(self.topic, 0), start_offset+50000) + producer.stop() + + def test_batched_simple_producer__triggers_by_time(self): + start_offset0 = self.current_offset(self.topic, 0) + start_offset1 = self.current_offset(self.topic, 1) + + producer = SimpleProducer(self.client, + batch_send=True, + batch_send_every_n=100, + batch_send_every_t=5) + + # Send 5 messages and do a fetch + resp = producer.send_messages(self.topic, + self.msg("one"), + self.msg("two"), + self.msg("three"), + self.msg("four"), + ) + + # Batch mode is async. No ack + self.assertEquals(len(resp), 0) + + # It hasn't sent yet + self.assert_fetch_offset(0, start_offset0, []) + self.assert_fetch_offset(1, start_offset1, []) + + resp = producer.send_messages(self.topic, + self.msg("five"), + self.msg("six"), + self.msg("seven"), + ) + + # Batch mode is async. No ack + self.assertEquals(len(resp), 0) + + # Wait the timeout out + time.sleep(5) + + self.assert_fetch_offset(0, start_offset0, [ + self.msg("one"), + self.msg("two"), + self.msg("three"), + self.msg("four"), + ]) - req2 = ProduceRequest(self.topic, 0, messages=[ - create_gzip_message(["Gzipped batch 2, message %d" % i for i in range(50000)]) + self.assert_fetch_offset(1, start_offset1, [ + self.msg("five"), + self.msg("six"), + self.msg("seven"), ]) - resp2 = self.client.send_produce_request([req2]) + producer.stop() + + def test_async_simple_producer(self): + start_offset0 = self.current_offset(self.topic, 0) + start_offset1 = self.current_offset(self.topic, 1) + + producer = SimpleProducer(self.client, async=True) + resp = producer.send_messages(self.topic, self.msg("one")) + self.assertEquals(len(resp), 0) + + self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ]) + + producer.stop() + + def test_async_keyed_producer(self): + start_offset0 = self.current_offset(self.topic, 0) + start_offset1 = self.current_offset(self.topic, 1) + + producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True) + + resp = producer.send(self.topic, "key1", self.msg("one")) + self.assertEquals(len(resp), 0) + + self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ]) + + producer.stop() + + def assert_produce_request(self, messages, initial_offset, message_ct): + produce = ProduceRequest(self.topic, 0, messages=messages) + + # There should only be one response message from the server. + # This will throw an exception if there's more than one. + resp = self.client.send_produce_request([ produce ]) + self.assert_produce_response(resp, initial_offset) + + self.assertEqual(self.current_offset(self.topic, 0), initial_offset + message_ct) + + def assert_produce_response(self, resp, initial_offset): + self.assertEqual(len(resp), 1) + self.assertEqual(resp[0].error, 0) + self.assertEqual(resp[0].offset, initial_offset) + + def assert_fetch_offset(self, partition, start_offset, expected_messages): + # There should only be one response message from the server. + # This will throw an exception if there's more than one. + + resp, = self.client.send_fetch_request([ FetchRequest(self.topic, partition, start_offset, 1024) ]) - self.assertEqual(len(resp2), 1) # Only one response - self.assertEqual(resp2[0].error, 0) # No error - self.assertEqual(resp2[0].offset, start_offset+50000) # Initial offset of first message + self.assertEquals(resp.error, 0) + self.assertEquals(resp.partition, partition) + messages = [ x.message.value for x in resp.messages ] - self.assertEqual(self.current_offset(self.topic, 0), start_offset+100000) + self.assertEqual(messages, expected_messages) + self.assertEquals(resp.highwaterMark, start_offset+len(expected_messages)) diff --git a/test/testutil.py b/test/testutil.py index 7d57ff6..4866b9d 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -1,6 +1,12 @@ +import uuid +import time +import unittest import os import random import string +import logging +from kafka.common import OffsetRequest +from kafka import KafkaClient def random_string(l): s = "".join(random.choice(string.letters) for i in xrange(l)) @@ -8,3 +14,44 @@ def random_string(l): def skip_integration(): return os.environ.get('SKIP_INTEGRATION') + +def ensure_topic_creation(client, topic_name, timeout = 30): + start_time = time.time() + + client.load_metadata_for_topics(topic_name) + while not client.has_metadata_for_topic(topic_name): + if time.time() > start_time + timeout: + raise Exception("Unable to create topic %s" % topic_name) + client.load_metadata_for_topics(topic_name) + time.sleep(1) + +class KafkaIntegrationTestCase(unittest.TestCase): + topic = None + + def setUp(self): + super(KafkaIntegrationTestCase, self).setUp() + if not self.topic: + self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10)) + + self.client = KafkaClient('%s:%d' % (self.server.host, self.server.port)) + ensure_topic_creation(self.client, self.topic) + self._messages = {} + + def tearDown(self): + super(KafkaIntegrationTestCase, self).tearDown() + self.client.close() + + def current_offset(self, topic, partition): + offsets, = self.client.send_offset_request([ OffsetRequest(topic, partition, -1, 1) ]) + return offsets.offsets[0] + + def msgs(self, iterable): + return [ self.msg(x) for x in iterable ] + + def msg(self, s): + if s not in self._messages: + self._messages[s] = '%s-%s-%s' % (s, self.id(), str(uuid.uuid4())) + + return self._messages[s] + +logging.basicConfig(level=logging.DEBUG) |