diff options
-rw-r--r-- | test/fixtures.py | 4 | ||||
-rw-r--r-- | test/test_client_integration.py | 6 | ||||
-rw-r--r-- | test/test_codec.py | 2 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 31 | ||||
-rw-r--r-- | test/test_failover_integration.py | 4 | ||||
-rw-r--r-- | test/test_producer_integration.py | 4 | ||||
-rw-r--r-- | test/testutil.py | 12 |
7 files changed, 51 insertions, 12 deletions
diff --git a/test/fixtures.py b/test/fixtures.py index 7b032f1..df6faec 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -7,8 +7,8 @@ import tempfile import uuid from urlparse import urlparse -from .service import ExternalService, SpawnedService -from .testutil import get_open_port +from service import ExternalService, SpawnedService +from testutil import get_open_port class Fixture(object): kafka_version = os.environ.get('KAFKA_VERSION', '0.8.0') diff --git a/test/test_client_integration.py b/test/test_client_integration.py index 29a0cd0..b3d01fc 100644 --- a/test/test_client_integration.py +++ b/test/test_client_integration.py @@ -5,8 +5,8 @@ import random import kafka from kafka.common import * -from .fixtures import ZookeeperFixture, KafkaFixture -from .testutil import * +from fixtures import ZookeeperFixture, KafkaFixture +from testutil import * @unittest.skipIf(skip_integration(), 'Skipping Integration') class TestKafkaClientIntegration(KafkaIntegrationTestCase): @@ -45,7 +45,7 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase): # Offset Tests # #################### - @unittest.skip('commit offset not supported in this version') + @kafka_versions("0.8.1") def test_commit_fetch_offsets(self): req = OffsetCommitRequest(self.topic, 0, 42, "metadata") (resp,) = self.client.send_offset_commit_request("group", [req]) diff --git a/test/test_codec.py b/test/test_codec.py index 7fedb71..c311c52 100644 --- a/test/test_codec.py +++ b/test/test_codec.py @@ -20,7 +20,7 @@ from kafka.codec import ( from kafka.protocol import ( create_gzip_message, create_message, create_snappy_message, KafkaProtocol ) -from .testutil import * +from testutil import * class TestCodec(unittest.TestCase): @unittest.skipUnless(has_gzip(), "Gzip not available") diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index b8050a4..a1d9515 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -4,8 +4,8 @@ from datetime import datetime from kafka import * # noqa from kafka.common import * # noqa from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES -from .fixtures import ZookeeperFixture, KafkaFixture -from .testutil import * +from fixtures import ZookeeperFixture, KafkaFixture +from testutil import * @unittest.skipIf(skip_integration(), 'Skipping Integration') class TestConsumerIntegration(KafkaIntegrationTestCase): @@ -206,3 +206,30 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assertEquals(message.message.value, huge_message) big_consumer.stop() + + @kafka_versions("0.8.1") + def test_offset_behavior__resuming_behavior(self): + msgs1 = self.send_messages(0, range(0, 100)) + msgs2 = self.send_messages(1, range(100, 200)) + + # Start a consumer + consumer = SimpleConsumer(self.client, "group1", + self.topic, auto_commit=True, + auto_commit_every_n=20, + iter_timeout=0) + + # Grab the first 195 messages + output_msgs1 = [ consumer.get_message().message.value for _ in xrange(195) ] + self.assert_message_count(output_msgs1, 195) + consumer.stop() + + # The offset should be at 180 + consumer = SimpleConsumer(self.client, "group1", + self.topic, auto_commit=True, + auto_commit_every_n=20, + iter_timeout=0) + + # 180-200 + self.assert_message_count([ message for message in consumer ], 20) + + consumer.stop() diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index 1211087..782907b 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -3,8 +3,8 @@ import time from kafka import * # noqa from kafka.common import * # noqa -from .fixtures import ZookeeperFixture, KafkaFixture -from .testutil import * +from fixtures import ZookeeperFixture, KafkaFixture +from testutil import * @unittest.skipIf(skip_integration(), 'Skipping Integration') class TestFailover(KafkaIntegrationTestCase): diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index eb07d0a..6723ff7 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -5,8 +5,8 @@ import unittest from kafka import * # noqa from kafka.common import * # noqa from kafka.codec import has_gzip, has_snappy -from .fixtures import ZookeeperFixture, KafkaFixture -from .testutil import * +from fixtures import ZookeeperFixture, KafkaFixture +from testutil import * class TestKafkaProducerIntegration(KafkaIntegrationTestCase): topic = 'produce_topic' diff --git a/test/testutil.py b/test/testutil.py index ccb3955..9d2ea9c 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -1,3 +1,4 @@ +import functools import logging import os import random @@ -15,6 +16,7 @@ __all__ = [ 'skip_integration', 'ensure_topic_creation', 'get_open_port', + 'kafka_versions', 'KafkaIntegrationTestCase', 'Timer', ] @@ -26,6 +28,16 @@ def random_string(l): def skip_integration(): return os.environ.get('SKIP_INTEGRATION') +def kafka_versions(*versions): + def kafka_versions(func): + @functools.wraps(func) + def wrapper(self): + if os.environ.get('KAFKA_VERSION', None) not in versions: + self.skipTest("unsupported kafka version") + return func(self) + return wrapper + return kafka_versions + def ensure_topic_creation(client, topic_name, timeout = 30): start_time = time.time() |