diff options
-rw-r--r-- | kafka/consumer.py | 22 | ||||
-rw-r--r-- | test/test_client_integration.py | 16 | ||||
-rw-r--r-- | test/test_codec.py | 3 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 31 | ||||
-rw-r--r-- | test/test_failover_integration.py | 12 | ||||
-rw-r--r-- | test/test_producer_integration.py | 27 | ||||
-rw-r--r-- | test/test_protocol.py | 4 | ||||
-rw-r--r-- | test/testutil.py | 17 |
8 files changed, 96 insertions, 36 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 14b84fe..d855874 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -10,6 +10,7 @@ from Queue import Empty, Queue from kafka.common import ( ErrorMapping, FetchRequest, OffsetRequest, OffsetCommitRequest, + OffsetFetchRequest, ConsumerFetchSizeTooSmall, ConsumerNoMoreData ) @@ -105,17 +106,16 @@ class Consumer(object): "partition=%d failed with errorcode=%s" % ( resp.topic, resp.partition, resp.error)) - # Uncomment for 0.8.1 - # - #for partition in partitions: - # req = OffsetFetchRequest(topic, partition) - # (offset,) = self.client.send_offset_fetch_request(group, [req], - # callback=get_or_init_offset_callback, - # fail_on_error=False) - # self.offsets[partition] = offset - - for partition in partitions: - self.offsets[partition] = 0 + if auto_commit: + for partition in partitions: + req = OffsetFetchRequest(topic, partition) + (offset,) = self.client.send_offset_fetch_request(group, [req], + callback=get_or_init_offset_callback, + fail_on_error=False) + self.offsets[partition] = offset + else: + for partition in partitions: + self.offsets[partition] = 0 def commit(self, partitions=None): """ diff --git a/test/test_client_integration.py b/test/test_client_integration.py index b3d01fc..881d0ae 100644 --- a/test/test_client_integration.py +++ b/test/test_client_integration.py @@ -1,25 +1,32 @@ -import unittest -import time -import socket +import os import random +import socket +import time +import unittest import kafka from kafka.common import * from fixtures import ZookeeperFixture, KafkaFixture from testutil import * -@unittest.skipIf(skip_integration(), 'Skipping Integration') class TestKafkaClientIntegration(KafkaIntegrationTestCase): @classmethod def setUpClass(cls): # noqa + if not os.environ.get('KAFKA_VERSION'): + return + cls.zk = ZookeeperFixture.instance() cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) @classmethod def tearDownClass(cls): # noqa + if not os.environ.get('KAFKA_VERSION'): + return + cls.server.close() cls.zk.close() + @kafka_versions("all") def test_timeout(self): server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_port = get_open_port() @@ -30,6 +37,7 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase): conn = kafka.conn.KafkaConnection("localhost", server_port, 1.0) self.assertGreaterEqual(t.interval, 1.0) + @kafka_versions("all") def test_consume_none(self): fetch = FetchRequest(self.topic, 0, 0, 1024) diff --git a/test/test_codec.py b/test/test_codec.py index c311c52..40bd1b4 100644 --- a/test/test_codec.py +++ b/test/test_codec.py @@ -14,7 +14,7 @@ from kafka.common import ( LeaderUnavailableError, PartitionUnavailableError ) from kafka.codec import ( - has_gzip, has_snappy, gzip_encode, gzip_decode, + has_snappy, gzip_encode, gzip_decode, snappy_encode, snappy_decode ) from kafka.protocol import ( @@ -23,7 +23,6 @@ from kafka.protocol import ( from testutil import * class TestCodec(unittest.TestCase): - @unittest.skipUnless(has_gzip(), "Gzip not available") def test_gzip(self): for i in xrange(1000): s1 = random_string(100) diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index a1d9515..b1d1a59 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -1,3 +1,4 @@ +import os import unittest from datetime import datetime @@ -7,10 +8,12 @@ from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES from fixtures import ZookeeperFixture, KafkaFixture from testutil import * -@unittest.skipIf(skip_integration(), 'Skipping Integration') class TestConsumerIntegration(KafkaIntegrationTestCase): @classmethod def setUpClass(cls): + if not os.environ.get('KAFKA_VERSION'): + return + cls.zk = ZookeeperFixture.instance() cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port) @@ -19,6 +22,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): @classmethod def tearDownClass(cls): # noqa + if not os.environ.get('KAFKA_VERSION'): + return + cls.server1.close() cls.server2.close() cls.zk.close() @@ -38,6 +44,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): # Make sure there are no duplicates self.assertEquals(len(set(messages)), num_messages) + @kafka_versions("all") def test_simple_consumer(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -51,6 +58,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() + @kafka_versions("all") def test_simple_consumer__seek(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -69,6 +77,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() + @kafka_versions("all") def test_simple_consumer_blocking(self): consumer = SimpleConsumer(self.client, "group1", self.topic, @@ -96,6 +105,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() + @kafka_versions("all") def test_simple_consumer_pending(self): # Produce 10 messages to partitions 0 and 1 self.send_messages(0, range(0, 10)) @@ -110,6 +120,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() + @kafka_versions("all") def test_multi_process_consumer(self): # Produce 100 messages to partitions 0 and 1 self.send_messages(0, range(0, 100)) @@ -121,6 +132,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() + @kafka_versions("all") def test_multi_process_consumer_blocking(self): consumer = MultiProcessConsumer(self.client, "grp1", self.topic, auto_commit=False) @@ -148,6 +160,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() + @kafka_versions("all") def test_multi_proc_pending(self): self.send_messages(0, range(0, 10)) self.send_messages(1, range(10, 20)) @@ -160,6 +173,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() + @kafka_versions("all") def test_large_messages(self): # Produce 10 "normal" size messages small_messages = self.send_messages(0, [ str(x) for x in range(10) ]) @@ -177,6 +191,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() + @kafka_versions("all") def test_huge_messages(self): huge_message, = self.send_messages(0, [ create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)), @@ -213,23 +228,25 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): msgs2 = self.send_messages(1, range(100, 200)) # Start a consumer - consumer = SimpleConsumer(self.client, "group1", + consumer1 = SimpleConsumer(self.client, "group1", self.topic, auto_commit=True, + auto_commit_every_t=600, auto_commit_every_n=20, iter_timeout=0) # Grab the first 195 messages - output_msgs1 = [ consumer.get_message().message.value for _ in xrange(195) ] + output_msgs1 = [ consumer1.get_message().message.value for _ in xrange(195) ] self.assert_message_count(output_msgs1, 195) - consumer.stop() # The offset should be at 180 - consumer = SimpleConsumer(self.client, "group1", + consumer2 = SimpleConsumer(self.client, "group1", self.topic, auto_commit=True, + auto_commit_every_t=600, auto_commit_every_n=20, iter_timeout=0) # 180-200 - self.assert_message_count([ message for message in consumer ], 20) + self.assert_message_count([ message for message in consumer2 ], 20) - consumer.stop() + consumer1.stop() + consumer2.stop() diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index 782907b..e30b298 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -1,17 +1,20 @@ -import unittest +import os import time +import unittest from kafka import * # noqa from kafka.common import * # noqa from fixtures import ZookeeperFixture, KafkaFixture from testutil import * -@unittest.skipIf(skip_integration(), 'Skipping Integration') class TestFailover(KafkaIntegrationTestCase): create_client = False @classmethod def setUpClass(cls): # noqa + if not os.environ.get('KAFKA_VERSION'): + return + zk_chroot = random_string(10) replicas = 2 partitions = 2 @@ -26,11 +29,15 @@ class TestFailover(KafkaIntegrationTestCase): @classmethod def tearDownClass(cls): + if not os.environ.get('KAFKA_VERSION'): + return + cls.client.close() for broker in cls.brokers: broker.close() cls.zk.close() + @kafka_versions("all") def test_switch_leader(self): key, topic, partition = random_string(5), self.topic, 0 producer = SimpleProducer(self.client) @@ -62,6 +69,7 @@ class TestFailover(KafkaIntegrationTestCase): producer.stop() + @kafka_versions("all") def test_switch_leader_async(self): key, topic, partition = random_string(5), self.topic, 0 producer = SimpleProducer(self.client, async=True) diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 6723ff7..41e9c53 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -1,6 +1,7 @@ -import uuid +import os import time import unittest +import uuid from kafka import * # noqa from kafka.common import * # noqa @@ -13,14 +14,21 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): @classmethod def setUpClass(cls): # noqa + if not os.environ.get('KAFKA_VERSION'): + return + cls.zk = ZookeeperFixture.instance() cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) @classmethod def tearDownClass(cls): # noqa + if not os.environ.get('KAFKA_VERSION'): + return + cls.server.close() cls.zk.close() + @kafka_versions("all") def test_produce_many_simple(self): start_offset = self.current_offset(self.topic, 0) @@ -36,6 +44,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): 100, ) + @kafka_versions("all") def test_produce_10k_simple(self): start_offset = self.current_offset(self.topic, 0) @@ -45,6 +54,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): 10000, ) + @kafka_versions("all") def test_produce_many_gzip(self): start_offset = self.current_offset(self.topic, 0) @@ -57,8 +67,9 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): 200, ) - @unittest.skip("All snappy integration tests fail with nosnappyjava") + @kafka_versions("all") def test_produce_many_snappy(self): + self.skipTest("All snappy integration tests fail with nosnappyjava") start_offset = self.current_offset(self.topic, 0) self.assert_produce_request([ @@ -69,6 +80,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): 200, ) + @kafka_versions("all") def test_produce_mixed(self): start_offset = self.current_offset(self.topic, 0) @@ -85,6 +97,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): self.assert_produce_request(messages, start_offset, msg_count) + @kafka_versions("all") def test_produce_100k_gzipped(self): start_offset = self.current_offset(self.topic, 0) @@ -106,6 +119,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): # SimpleProducer Tests # ############################ + @kafka_versions("all") def test_simple_producer(self): start_offset0 = self.current_offset(self.topic, 0) start_offset1 = self.current_offset(self.topic, 1) @@ -130,6 +144,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() + @kafka_versions("all") def test_round_robin_partitioner(self): msg1, msg2, msg3, msg4 = [ str(uuid.uuid4()) for _ in range(4) ] @@ -152,6 +167,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() + @kafka_versions("all") def test_hashed_partitioner(self): start_offset0 = self.current_offset(self.topic, 0) start_offset1 = self.current_offset(self.topic, 1) @@ -174,6 +190,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() + @kafka_versions("all") def test_acks_none(self): start_offset0 = self.current_offset(self.topic, 0) start_offset1 = self.current_offset(self.topic, 1) @@ -185,6 +202,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ]) producer.stop() + @kafka_versions("all") def test_acks_local_write(self): start_offset0 = self.current_offset(self.topic, 0) start_offset1 = self.current_offset(self.topic, 1) @@ -197,6 +215,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() + @kafka_versions("all") def test_acks_cluster_commit(self): start_offset0 = self.current_offset(self.topic, 0) start_offset1 = self.current_offset(self.topic, 1) @@ -211,6 +230,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() + @kafka_versions("all") def test_batched_simple_producer__triggers_by_message(self): start_offset0 = self.current_offset(self.topic, 0) start_offset1 = self.current_offset(self.topic, 1) @@ -259,6 +279,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() + @kafka_versions("all") def test_batched_simple_producer__triggers_by_time(self): start_offset0 = self.current_offset(self.topic, 0) start_offset1 = self.current_offset(self.topic, 1) @@ -310,6 +331,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() + @kafka_versions("all") def test_async_simple_producer(self): start_offset0 = self.current_offset(self.topic, 0) start_offset1 = self.current_offset(self.topic, 1) @@ -322,6 +344,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() + @kafka_versions("all") def test_async_keyed_producer(self): start_offset0 = self.current_offset(self.topic, 0) start_offset1 = self.current_offset(self.topic, 1) diff --git a/test/test_protocol.py b/test/test_protocol.py index 555fe10..125169f 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -12,7 +12,7 @@ from kafka.common import ( LeaderUnavailableError, PartitionUnavailableError ) from kafka.codec import ( - has_gzip, has_snappy, gzip_encode, gzip_decode, + has_snappy, gzip_encode, gzip_decode, snappy_encode, snappy_decode ) from kafka.protocol import ( @@ -29,7 +29,6 @@ class TestProtocol(unittest.TestCase): self.assertEqual(msg.key, key) self.assertEqual(msg.value, payload) - @unittest.skipUnless(has_gzip(), "gzip not available") def test_create_gzip(self): payloads = ["v1", "v2"] msg = create_gzip_message(payloads) @@ -197,7 +196,6 @@ class TestProtocol(unittest.TestCase): self.assertEqual(returned_offset2, 1) self.assertEqual(decoded_message2, create_message("v2", "k2")) - @unittest.skipUnless(has_gzip(), "Gzip not available") def test_decode_message_gzip(self): gzip_encoded = ('\xc0\x11\xb2\xf0\x00\x01\xff\xff\xff\xff\x00\x00\x000' '\x1f\x8b\x08\x00\xa1\xc1\xc5R\x02\xffc`\x80\x03\x01' diff --git a/test/testutil.py b/test/testutil.py index 9d2ea9c..61fe9bd 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -13,7 +13,6 @@ from kafka import KafkaClient __all__ = [ 'random_string', - 'skip_integration', 'ensure_topic_creation', 'get_open_port', 'kafka_versions', @@ -25,15 +24,17 @@ def random_string(l): s = "".join(random.choice(string.letters) for i in xrange(l)) return s -def skip_integration(): - return os.environ.get('SKIP_INTEGRATION') - def kafka_versions(*versions): def kafka_versions(func): @functools.wraps(func) def wrapper(self): - if os.environ.get('KAFKA_VERSION', None) not in versions: + kafka_version = os.environ.get('KAFKA_VERSION') + + if not kafka_version: + self.skipTest("no kafka version specified") + elif 'all' not in versions and kafka_version not in versions: self.skipTest("unsupported kafka version") + return func(self) return wrapper return kafka_versions @@ -61,6 +62,9 @@ class KafkaIntegrationTestCase(unittest.TestCase): def setUp(self): super(KafkaIntegrationTestCase, self).setUp() + if not os.environ.get('KAFKA_VERSION'): + return + if not self.topic: self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10)) @@ -73,6 +77,9 @@ class KafkaIntegrationTestCase(unittest.TestCase): def tearDown(self): super(KafkaIntegrationTestCase, self).tearDown() + if not os.environ.get('KAFKA_VERSION'): + return + if self.create_client: self.client.close() |