-rw-r--r--  kafka/consumer.py                    2
-rw-r--r--  kafka/partitioner.py                 1
-rw-r--r--  test/test_producer_integration.py  366
-rw-r--r--  test/testutil.py                    47
4 files changed, 344 insertions(+), 72 deletions(-)
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 8ac28da..14b84fe 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -101,7 +101,7 @@ class Consumer(object):
elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
return 0
else:
- raise Exception("OffsetFetchRequest for topic=%s, "
+ raise ProtocolError("OffsetFetchRequest for topic=%s, "
"partition=%d failed with errorcode=%s" % (
resp.topic, resp.partition, resp.error))
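Switching from a bare Exception to ProtocolError gives callers a typed error to catch. A minimal caller-side sketch, assuming ProtocolError is importable from kafka.common (the unqualified reference above suggests it is defined or imported in the consumer module); the broker address is a placeholder:

    import logging

    from kafka import KafkaClient, SimpleConsumer
    from kafka.common import ProtocolError  # assumed location of the class

    client = KafkaClient('localhost:9092')  # placeholder broker address
    try:
        # Consumer initialization issues the OffsetFetchRequest shown above;
        # broker-side failures now surface as ProtocolError rather than a
        # bare Exception.
        consumer = SimpleConsumer(client, 'my-group', 'my-topic')
    except ProtocolError as e:
        logging.error("Offset fetch failed: %s", e)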
diff --git a/kafka/partitioner.py b/kafka/partitioner.py
index 8190c34..5287cef 100644
--- a/kafka/partitioner.py
+++ b/kafka/partitioner.py
@@ -54,4 +54,5 @@ class HashedPartitioner(Partitioner):
def partition(self, key, partitions):
size = len(partitions)
idx = hash(key) % size
+
return partitions[idx]
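The added blank line is cosmetic; the logic is unchanged: a key maps to hash(key) modulo the partition count, so a fixed key always lands on the same partition while the partition list is stable. A standalone illustration of that property (not part of the patch):

    partitions = [0, 1, 2, 3]

    def pick(key):
        # Mirrors HashedPartitioner.partition()
        return partitions[hash(key) % len(partitions)]

    # Stable for a fixed key; different keys may still collide.
    assert pick("key1") == pick("key1")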
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 8bab4d5..e148ad8 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -1,5 +1,5 @@
-import unittest
import time
+import unittest
from kafka import * # noqa
from kafka.common import * # noqa
@@ -14,48 +14,35 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
def setUpClass(cls): # noqa
cls.zk = ZookeeperFixture.instance()
cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
- cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port))
@classmethod
def tearDownClass(cls): # noqa
- cls.client.close()
cls.server.close()
cls.zk.close()
def test_produce_many_simple(self):
start_offset = self.current_offset(self.topic, 0)
- produce = ProduceRequest(self.topic, 0, messages=[
- create_message("Test message %d" % i) for i in range(100)
- ])
-
- resp = self.client.send_produce_request([produce])
- self.assertEqual(len(resp), 1) # Only one response
- self.assertEqual(resp[0].error, 0) # No error
- self.assertEqual(resp[0].offset, start_offset) # Initial offset of first message
+ self.assert_produce_request(
+ [ create_message("Test message %d" % i) for i in range(100) ],
+ start_offset,
+ 100,
+ )
- self.assertEqual(self.current_offset(self.topic, 0), start_offset+100)
-
- resp = self.client.send_produce_request([produce])
- self.assertEqual(len(resp), 1) # Only one response
- self.assertEqual(resp[0].error, 0) # No error
- self.assertEqual(resp[0].offset, start_offset+100) # Initial offset of first message
-
- self.assertEqual(self.current_offset(self.topic, 0), start_offset+200)
+ self.assert_produce_request(
+ [ create_message("Test message %d" % i) for i in range(100) ],
+ start_offset+100,
+ 100,
+ )
def test_produce_10k_simple(self):
start_offset = self.current_offset(self.topic, 0)
- produce = ProduceRequest(self.topic, 0, messages=[
- create_message("Test message %d" % i) for i in range(10000)
- ])
-
- resp = self.client.send_produce_request([produce])
- self.assertEqual(len(resp), 1) # Only one response
- self.assertEqual(resp[0].error, 0) # No error
- self.assertEqual(resp[0].offset, start_offset) # Initial offset of first message
-
- self.assertEqual(self.current_offset(self.topic, 0), start_offset+10000)
+ self.assert_produce_request(
+ [ create_message("Test message %d" % i) for i in range(10000) ],
+ start_offset,
+ 10000,
+ )
def test_produce_many_gzip(self):
start_offset = self.current_offset(self.topic, 0)
@@ -63,31 +50,23 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
message1 = create_gzip_message(["Gzipped 1 %d" % i for i in range(100)])
message2 = create_gzip_message(["Gzipped 2 %d" % i for i in range(100)])
- produce = ProduceRequest(self.topic, 0, messages=[message1, message2])
-
- resp = self.client.send_produce_request([produce])
- self.assertEqual(len(resp), 1) # Only one response
- self.assertEqual(resp[0].error, 0) # No error
- self.assertEqual(resp[0].offset, start_offset) # Initial offset of first message
-
- self.assertEqual(self.current_offset(self.topic, 0), start_offset+200)
+ self.assert_produce_request(
+ [ message1, message2 ],
+ start_offset,
+ 200,
+ )
@unittest.skip("All snappy integration tests fail with nosnappyjava")
def test_produce_many_snappy(self):
start_offset = self.current_offset(self.topic, 0)
- produce = ProduceRequest(self.topic, 0, messages=[
- create_snappy_message(["Snappy 1 %d" % i for i in range(100)]),
- create_snappy_message(["Snappy 2 %d" % i for i in range(100)]),
- ])
-
- resp = self.client.send_produce_request([produce])
-
- self.assertEqual(len(resp), 1) # Only one response
- self.assertEqual(resp[0].error, 0) # No error
- self.assertEqual(resp[0].offset, start_offset) # Initial offset of first message
-
- self.assertEqual(self.current_offset(self.topic, 0), start_offset+200)
+ self.assert_produce_request([
+ create_snappy_message(["Snappy 1 %d" % i for i in range(100)]),
+ create_snappy_message(["Snappy 2 %d" % i for i in range(100)]),
+ ],
+ start_offset,
+ 200,
+ )
def test_produce_mixed(self):
start_offset = self.current_offset(self.topic, 0)
@@ -103,37 +82,282 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
msg_count += 100
messages.append(create_snappy_message(["Snappy %d" % i for i in range(100)]))
- produce = ProduceRequest(self.topic, 0, messages=messages)
- resp = self.client.send_produce_request([produce])
-
- self.assertEqual(len(resp), 1) # Only one response
- self.assertEqual(resp[0].error, 0) # No error
- self.assertEqual(resp[0].offset, start_offset) # Initial offset of first message
-
- self.assertEqual(self.current_offset(self.topic, 0), start_offset+msg_count)
+ self.assert_produce_request(messages, start_offset, msg_count)
def test_produce_100k_gzipped(self):
start_offset = self.current_offset(self.topic, 0)
- req1 = ProduceRequest(self.topic, 0, messages=[
- create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)])
+ self.assert_produce_request([
+ create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)])
+ ],
+ start_offset,
+ 50000,
+ )
+
+ self.assert_produce_request([
+ create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)])
+ ],
+ start_offset+50000,
+ 50000,
+ )
+
+ ############################
+ # SimpleProducer Tests #
+ ############################
+
+ def test_simple_producer(self):
+ start_offset0 = self.current_offset(self.topic, 0)
+ start_offset1 = self.current_offset(self.topic, 1)
+ producer = SimpleProducer(self.client)
+
+ # Will go to partition 0
+ resp = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
+ self.assert_produce_response(resp, start_offset0)
+
+ # Will go to partition 1
+ resp = producer.send_messages(self.topic, self.msg("three"))
+ self.assert_produce_response(resp, start_offset1)
+
+ self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two") ])
+ self.assert_fetch_offset(1, start_offset1, [ self.msg("three") ])
+
+ # Will go to partition 0
+ resp = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
+ self.assert_produce_response(resp, start_offset0+2)
+ self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ])
+
+ producer.stop()
+
+ def test_round_robin_partitioner(self):
+ start_offset0 = self.current_offset(self.topic, 0)
+ start_offset1 = self.current_offset(self.topic, 1)
+
+ producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
+ resp1 = producer.send(self.topic, "key1", self.msg("one"))
+ resp2 = producer.send(self.topic, "key2", self.msg("two"))
+ resp3 = producer.send(self.topic, "key3", self.msg("three"))
+ resp4 = producer.send(self.topic, "key4", self.msg("four"))
+
+ self.assert_produce_response(resp1, start_offset0+0)
+ self.assert_produce_response(resp2, start_offset1+0)
+ self.assert_produce_response(resp3, start_offset0+1)
+ self.assert_produce_response(resp4, start_offset1+1)
+
+ self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("three") ])
+ self.assert_fetch_offset(1, start_offset1, [ self.msg("two"), self.msg("four") ])
+
+ producer.stop()
+
+ def test_hashed_partitioner(self):
+ start_offset0 = self.current_offset(self.topic, 0)
+ start_offset1 = self.current_offset(self.topic, 1)
+
+ producer = KeyedProducer(self.client, partitioner=HashedPartitioner)
+ resp1 = producer.send(self.topic, 1, self.msg("one"))
+ resp2 = producer.send(self.topic, 2, self.msg("two"))
+ resp3 = producer.send(self.topic, 3, self.msg("three"))
+ resp4 = producer.send(self.topic, 3, self.msg("four"))
+ resp5 = producer.send(self.topic, 4, self.msg("five"))
+
+ self.assert_produce_response(resp1, start_offset1+0)
+ self.assert_produce_response(resp2, start_offset0+0)
+ self.assert_produce_response(resp3, start_offset1+1)
+ self.assert_produce_response(resp4, start_offset1+2)
+ self.assert_produce_response(resp5, start_offset0+1)
+
+ self.assert_fetch_offset(0, start_offset0, [ self.msg("two"), self.msg("five") ])
+ self.assert_fetch_offset(1, start_offset1, [ self.msg("one"), self.msg("three"), self.msg("four") ])
+
+ producer.stop()
+
+ def test_acks_none(self):
+ start_offset0 = self.current_offset(self.topic, 0)
+ start_offset1 = self.current_offset(self.topic, 1)
+
+ producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED)
+ resp = producer.send_messages(self.topic, self.msg("one"))
+ self.assertEqual(len(resp), 0)
+
+ self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+ producer.stop()
+
+ def test_acks_local_write(self):
+ start_offset0 = self.current_offset(self.topic, 0)
+ start_offset1 = self.current_offset(self.topic, 1)
+
+ producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
+ resp = producer.send_messages(self.topic, self.msg("one"))
+
+ self.assert_produce_response(resp, start_offset0)
+ self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+
+ producer.stop()
+
+ def test_acks_cluster_commit(self):
+ start_offset0 = self.current_offset(self.topic, 0)
+ start_offset1 = self.current_offset(self.topic, 1)
+
+ producer = SimpleProducer(
+ self.client,
+ req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT)
+
+ resp = producer.send_messages(self.topic, self.msg("one"))
+ self.assert_produce_response(resp, start_offset0)
+ self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+
+ producer.stop()
+
+ def test_batched_simple_producer__triggers_by_message(self):
+ start_offset0 = self.current_offset(self.topic, 0)
+ start_offset1 = self.current_offset(self.topic, 1)
+
+ producer = SimpleProducer(self.client,
+ batch_send=True,
+ batch_send_every_n=5,
+ batch_send_every_t=20)
+
+ # Send 4 messages and do a fetch
+ resp = producer.send_messages(self.topic,
+ self.msg("one"),
+ self.msg("two"),
+ self.msg("three"),
+ self.msg("four"),
+ )
+
+ # Batch mode is async. No ack
+ self.assertEqual(len(resp), 0)
+
+ # The batch hasn't been flushed yet
+ self.assert_fetch_offset(0, start_offset0, [])
+ self.assert_fetch_offset(1, start_offset1, [])
+
+ resp = producer.send_messages(self.topic,
+ self.msg("five"),
+ self.msg("six"),
+ self.msg("seven"),
+ )
+
+ # Batch mode is async. No ack
+ self.assertEqual(len(resp), 0)
+
+ self.assert_fetch_offset(0, start_offset0, [
+ self.msg("one"),
+ self.msg("two"),
+ self.msg("three"),
+ self.msg("four"),
])
- resp1 = self.client.send_produce_request([req1])
- self.assertEqual(len(resp1), 1) # Only one response
- self.assertEqual(resp1[0].error, 0) # No error
- self.assertEqual(resp1[0].offset, start_offset) # Initial offset of first message
+ self.assert_fetch_offset(1, start_offset1, [
+ self.msg("five"),
+ # Only five of the seven messages have flushed
+ # (batch_send_every_n=5); "six" and "seven" are still queued.
+ ])
- self.assertEqual(self.current_offset(self.topic, 0), start_offset+50000)
+ producer.stop()
+
+ def test_batched_simple_producer__triggers_by_time(self):
+ start_offset0 = self.current_offset(self.topic, 0)
+ start_offset1 = self.current_offset(self.topic, 1)
+
+ producer = SimpleProducer(self.client,
+ batch_send=True,
+ batch_send_every_n=100,
+ batch_send_every_t=5)
+
+ # Send 4 messages and do a fetch
+ resp = producer.send_messages(self.topic,
+ self.msg("one"),
+ self.msg("two"),
+ self.msg("three"),
+ self.msg("four"),
+ )
+
+ # Batch mode is async. No ack
+ self.assertEqual(len(resp), 0)
+
+ # The batch hasn't been flushed yet
+ self.assert_fetch_offset(0, start_offset0, [])
+ self.assert_fetch_offset(1, start_offset1, [])
+
+ resp = producer.send_messages(self.topic,
+ self.msg("five"),
+ self.msg("six"),
+ self.msg("seven"),
+ )
+
+ # Batch mode is async. No ack
+ self.assertEqual(len(resp), 0)
+
+ # Wait out the batch interval (batch_send_every_t=5) so the
+ # timer-driven flush fires
+ time.sleep(5)
+
+ self.assert_fetch_offset(0, start_offset0, [
+ self.msg("one"),
+ self.msg("two"),
+ self.msg("three"),
+ self.msg("four"),
+ ])
- req2 = ProduceRequest(self.topic, 0, messages=[
- create_gzip_message(["Gzipped batch 2, message %d" % i for i in range(50000)])
+ self.assert_fetch_offset(1, start_offset1, [
+ self.msg("five"),
+ self.msg("six"),
+ self.msg("seven"),
])
- resp2 = self.client.send_produce_request([req2])
+ producer.stop()
+
+ def test_async_simple_producer(self):
+ start_offset0 = self.current_offset(self.topic, 0)
+ start_offset1 = self.current_offset(self.topic, 1)
+
+ producer = SimpleProducer(self.client, async=True)
+ resp = producer.send_messages(self.topic, self.msg("one"))
+ self.assertEqual(len(resp), 0)
+
+ self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+
+ producer.stop()
+
+ def test_async_keyed_producer(self):
+ start_offset0 = self.current_offset(self.topic, 0)
+ start_offset1 = self.current_offset(self.topic, 1)
+
+ producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner, async=True)
+
+ resp = producer.send(self.topic, "key1", self.msg("one"))
+ self.assertEqual(len(resp), 0)
+
+ self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+
+ producer.stop()
+
+ def assert_produce_request(self, messages, initial_offset, message_ct):
+ produce = ProduceRequest(self.topic, 0, messages=messages)
+
+ # assert_produce_response checks that exactly one response came
+ # back from the server and that it reports no error.
+ resp = self.client.send_produce_request([ produce ])
+ self.assert_produce_response(resp, initial_offset)
+
+ self.assertEqual(self.current_offset(self.topic, 0), initial_offset + message_ct)
+
+ def assert_produce_response(self, resp, initial_offset):
+ self.assertEqual(len(resp), 1)
+ self.assertEqual(resp[0].error, 0)
+ self.assertEqual(resp[0].offset, initial_offset)
+
+ def assert_fetch_offset(self, partition, start_offset, expected_messages):
+ # There should only be one response message from the server.
+ # This will throw an exception if there's more than one.
+
+ resp, = self.client.send_fetch_request([ FetchRequest(self.topic, partition, start_offset, 1024) ])
- self.assertEqual(len(resp2), 1) # Only one response
- self.assertEqual(resp2[0].error, 0) # No error
- self.assertEqual(resp2[0].offset, start_offset+50000) # Initial offset of first message
+ self.assertEqual(resp.error, 0)
+ self.assertEqual(resp.partition, partition)
+ messages = [ x.message.value for x in resp.messages ]
- self.assertEqual(self.current_offset(self.topic, 0), start_offset+100000)
+ self.assertEqual(messages, expected_messages)
+ self.assertEqual(resp.highwaterMark, start_offset+len(expected_messages))
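Taken together, the assert_* helpers reduce each test to its intent. The batching tests also pin down the contract this refactor leans on: in batch mode send_messages returns an empty list immediately, and the background thread flushes once batch_send_every_n messages accumulate or batch_send_every_t seconds elapse. A usage sketch, assuming a broker at a placeholder address:

    from kafka import KafkaClient, SimpleProducer

    client = KafkaClient('localhost:9092')  # placeholder broker address
    producer = SimpleProducer(client,
                              batch_send=True,
                              batch_send_every_n=5,   # flush after 5 messages...
                              batch_send_every_t=20)  # ...or 20 seconds

    # Batch mode is async: no acks are collected, so the response is empty.
    resp = producer.send_messages('my-topic', 'one', 'two')
    assert len(resp) == 0

    producer.stop()  # shut down the background send thread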
diff --git a/test/testutil.py b/test/testutil.py
index 7d57ff6..4866b9d 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -1,6 +1,12 @@
+import uuid
+import time
+import unittest
import os
import random
import string
+import logging
+from kafka.common import OffsetRequest
+from kafka import KafkaClient
def random_string(l):
s = "".join(random.choice(string.letters) for i in xrange(l))
@@ -8,3 +14,44 @@ def random_string(l):
def skip_integration():
return os.environ.get('SKIP_INTEGRATION')
+
+def ensure_topic_creation(client, topic_name, timeout=30):
+ start_time = time.time()
+
+ client.load_metadata_for_topics(topic_name)
+ while not client.has_metadata_for_topic(topic_name):
+ if time.time() > start_time + timeout:
+ raise Exception("Unable to create topic %s" % topic_name)
+ client.load_metadata_for_topics(topic_name)
+ time.sleep(1)
+
+class KafkaIntegrationTestCase(unittest.TestCase):
+ topic = None
+
+ def setUp(self):
+ super(KafkaIntegrationTestCase, self).setUp()
+ if not self.topic:
+ self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))
+
+ self.client = KafkaClient('%s:%d' % (self.server.host, self.server.port))
+ ensure_topic_creation(self.client, self.topic)
+ self._messages = {}
+
+ def tearDown(self):
+ super(KafkaIntegrationTestCase, self).tearDown()
+ self.client.close()
+
+ def current_offset(self, topic, partition):
+ offsets, = self.client.send_offset_request([ OffsetRequest(topic, partition, -1, 1) ])
+ return offsets.offsets[0]
+
+ def msgs(self, iterable):
+ return [ self.msg(x) for x in iterable ]
+
+ def msg(self, s):
+ if s not in self._messages:
+ self._messages[s] = '%s-%s-%s' % (s, self.id(), str(uuid.uuid4()))
+
+ return self._messages[s]
+
+logging.basicConfig(level=logging.DEBUG)
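The base class owns the per-test client, the uniquely named topic, and the deduplicated message payloads, so a new integration test only supplies the class-level fixtures. A sketch of the intended shape, mirroring TestKafkaProducerIntegration above (fixture imports elided, as in the project's other test modules):

    class MyIntegrationTest(KafkaIntegrationTestCase):
        @classmethod
        def setUpClass(cls):  # noqa
            cls.zk = ZookeeperFixture.instance()
            cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)

        @classmethod
        def tearDownClass(cls):  # noqa
            cls.server.close()
            cls.zk.close()

        def test_roundtrip(self):
            # self.client and self.topic are built by KafkaIntegrationTestCase.setUp();
            # current_offset() and msg() are the helpers defined above.
            start = self.current_offset(self.topic, 0)
            producer = SimpleProducer(self.client)
            producer.send_messages(self.topic, self.msg("hello"))
            self.assertEqual(self.current_offset(self.topic, 0), start + 1)
            producer.stop()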