summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMark Roberts <wizzat@gmail.com>2014-04-23 09:29:30 -0700
committerMark Roberts <wizzat@gmail.com>2014-04-23 09:29:30 -0700
commit86e1ac7b96a41cf84e220fa25a11f138555d5c7e (patch)
tree624d8f57f8109d47fa355af31223220c9807770d
parent7c21dfece73e717029c8a582a28ed9ff1f885cb7 (diff)
downloadkafka-python-86e1ac7b96a41cf84e220fa25a11f138555d5c7e.tar.gz
Add test support for multiple versions of kafka. Uncomment first 0.8.1 specific test. Add rudimentary (failing) consumer resumption test
-rw-r--r--test/fixtures.py4
-rw-r--r--test/test_client_integration.py6
-rw-r--r--test/test_codec.py2
-rw-r--r--test/test_consumer_integration.py31
-rw-r--r--test/test_failover_integration.py4
-rw-r--r--test/test_producer_integration.py4
-rw-r--r--test/testutil.py12
7 files changed, 51 insertions, 12 deletions
diff --git a/test/fixtures.py b/test/fixtures.py
index 7b032f1..df6faec 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -7,8 +7,8 @@ import tempfile
import uuid
from urlparse import urlparse
-from .service import ExternalService, SpawnedService
-from .testutil import get_open_port
+from service import ExternalService, SpawnedService
+from testutil import get_open_port
class Fixture(object):
kafka_version = os.environ.get('KAFKA_VERSION', '0.8.0')
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index 29a0cd0..b3d01fc 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -5,8 +5,8 @@ import random
import kafka
from kafka.common import *
-from .fixtures import ZookeeperFixture, KafkaFixture
-from .testutil import *
+from fixtures import ZookeeperFixture, KafkaFixture
+from testutil import *
@unittest.skipIf(skip_integration(), 'Skipping Integration')
class TestKafkaClientIntegration(KafkaIntegrationTestCase):
@@ -45,7 +45,7 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
# Offset Tests #
####################
- @unittest.skip('commit offset not supported in this version')
+ @kafka_versions("0.8.1")
def test_commit_fetch_offsets(self):
req = OffsetCommitRequest(self.topic, 0, 42, "metadata")
(resp,) = self.client.send_offset_commit_request("group", [req])
diff --git a/test/test_codec.py b/test/test_codec.py
index 7fedb71..c311c52 100644
--- a/test/test_codec.py
+++ b/test/test_codec.py
@@ -20,7 +20,7 @@ from kafka.codec import (
from kafka.protocol import (
create_gzip_message, create_message, create_snappy_message, KafkaProtocol
)
-from .testutil import *
+from testutil import *
class TestCodec(unittest.TestCase):
@unittest.skipUnless(has_gzip(), "Gzip not available")
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index b8050a4..a1d9515 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -4,8 +4,8 @@ from datetime import datetime
from kafka import * # noqa
from kafka.common import * # noqa
from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
-from .fixtures import ZookeeperFixture, KafkaFixture
-from .testutil import *
+from fixtures import ZookeeperFixture, KafkaFixture
+from testutil import *
@unittest.skipIf(skip_integration(), 'Skipping Integration')
class TestConsumerIntegration(KafkaIntegrationTestCase):
@@ -206,3 +206,30 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assertEquals(message.message.value, huge_message)
big_consumer.stop()
+
+ @kafka_versions("0.8.1")
+ def test_offset_behavior__resuming_behavior(self):
+ msgs1 = self.send_messages(0, range(0, 100))
+ msgs2 = self.send_messages(1, range(100, 200))
+
+ # Start a consumer
+ consumer = SimpleConsumer(self.client, "group1",
+ self.topic, auto_commit=True,
+ auto_commit_every_n=20,
+ iter_timeout=0)
+
+ # Grab the first 195 messages
+ output_msgs1 = [ consumer.get_message().message.value for _ in xrange(195) ]
+ self.assert_message_count(output_msgs1, 195)
+ consumer.stop()
+
+ # The offset should be at 180
+ consumer = SimpleConsumer(self.client, "group1",
+ self.topic, auto_commit=True,
+ auto_commit_every_n=20,
+ iter_timeout=0)
+
+ # 180-200
+ self.assert_message_count([ message for message in consumer ], 20)
+
+ consumer.stop()
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 1211087..782907b 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -3,8 +3,8 @@ import time
from kafka import * # noqa
from kafka.common import * # noqa
-from .fixtures import ZookeeperFixture, KafkaFixture
-from .testutil import *
+from fixtures import ZookeeperFixture, KafkaFixture
+from testutil import *
@unittest.skipIf(skip_integration(), 'Skipping Integration')
class TestFailover(KafkaIntegrationTestCase):
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index eb07d0a..6723ff7 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -5,8 +5,8 @@ import unittest
from kafka import * # noqa
from kafka.common import * # noqa
from kafka.codec import has_gzip, has_snappy
-from .fixtures import ZookeeperFixture, KafkaFixture
-from .testutil import *
+from fixtures import ZookeeperFixture, KafkaFixture
+from testutil import *
class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
topic = 'produce_topic'
diff --git a/test/testutil.py b/test/testutil.py
index ccb3955..9d2ea9c 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -1,3 +1,4 @@
+import functools
import logging
import os
import random
@@ -15,6 +16,7 @@ __all__ = [
'skip_integration',
'ensure_topic_creation',
'get_open_port',
+ 'kafka_versions',
'KafkaIntegrationTestCase',
'Timer',
]
@@ -26,6 +28,16 @@ def random_string(l):
def skip_integration():
return os.environ.get('SKIP_INTEGRATION')
+def kafka_versions(*versions):
+ def kafka_versions(func):
+ @functools.wraps(func)
+ def wrapper(self):
+ if os.environ.get('KAFKA_VERSION', None) not in versions:
+ self.skipTest("unsupported kafka version")
+ return func(self)
+ return wrapper
+ return kafka_versions
+
def ensure_topic_creation(client, topic_name, timeout = 30):
start_time = time.time()