-rw-r--r--  kafka/consumer.py                   22
-rw-r--r--  test/test_client_integration.py     16
-rw-r--r--  test/test_codec.py                   3
-rw-r--r--  test/test_consumer_integration.py   31
-rw-r--r--  test/test_failover_integration.py   12
-rw-r--r--  test/test_producer_integration.py   27
-rw-r--r--  test/test_protocol.py                4
-rw-r--r--  test/testutil.py                    17

8 files changed, 96 insertions(+), 36 deletions(-)
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 14b84fe..d855874 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -10,6 +10,7 @@ from Queue import Empty, Queue
from kafka.common import (
    ErrorMapping, FetchRequest,
    OffsetRequest, OffsetCommitRequest,
+    OffsetFetchRequest,
    ConsumerFetchSizeTooSmall, ConsumerNoMoreData
)
@@ -105,17 +106,16 @@ class Consumer(object):
                                "partition=%d failed with errorcode=%s" % (
                                resp.topic, resp.partition, resp.error))

-        # Uncomment for 0.8.1
-        #
-        #for partition in partitions:
-        #    req = OffsetFetchRequest(topic, partition)
-        #    (offset,) = self.client.send_offset_fetch_request(group, [req],
-        #                  callback=get_or_init_offset_callback,
-        #                  fail_on_error=False)
-        #    self.offsets[partition] = offset
-
-        for partition in partitions:
-            self.offsets[partition] = 0
+        if auto_commit:
+            for partition in partitions:
+                req = OffsetFetchRequest(topic, partition)
+                (offset,) = self.client.send_offset_fetch_request(group, [req],
+                              callback=get_or_init_offset_callback,
+                              fail_on_error=False)
+                self.offsets[partition] = offset
+        else:
+            for partition in partitions:
+                self.offsets[partition] = 0

    def commit(self, partitions=None):
        """
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index b3d01fc..881d0ae 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -1,25 +1,32 @@
-import unittest
-import time
-import socket
+import os
import random
+import socket
+import time
+import unittest

import kafka
from kafka.common import *
from fixtures import ZookeeperFixture, KafkaFixture
from testutil import *

-@unittest.skipIf(skip_integration(), 'Skipping Integration')
class TestKafkaClientIntegration(KafkaIntegrationTestCase):
    @classmethod
    def setUpClass(cls): # noqa
+        if not os.environ.get('KAFKA_VERSION'):
+            return
+
        cls.zk = ZookeeperFixture.instance()
        cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)

    @classmethod
    def tearDownClass(cls): # noqa
+        if not os.environ.get('KAFKA_VERSION'):
+            return
+
        cls.server.close()
        cls.zk.close()

+    @kafka_versions("all")
    def test_timeout(self):
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server_port = get_open_port()
@@ -30,6 +37,7 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
        conn = kafka.conn.KafkaConnection("localhost", server_port, 1.0)
        self.assertGreaterEqual(t.interval, 1.0)

+    @kafka_versions("all")
    def test_consume_none(self):
        fetch = FetchRequest(self.topic, 0, 0, 1024)
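
Note: this commit flips integration testing from opt-out to opt-in. Previously
the class-level @unittest.skipIf(skip_integration(), ...) ran these tests
unless SKIP_INTEGRATION was set; now setUpClass/tearDownClass return early when
KAFKA_VERSION is unset, and each test's @kafka_versions("all") decorator
(implemented in testutil.py below) skips it with an explicit reason instead of
letting it fail against fixtures that were never started.
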
diff --git a/test/test_codec.py b/test/test_codec.py
index c311c52..40bd1b4 100644
--- a/test/test_codec.py
+++ b/test/test_codec.py
@@ -14,7 +14,7 @@ from kafka.common import (
    LeaderUnavailableError, PartitionUnavailableError
)
from kafka.codec import (
-    has_gzip, has_snappy, gzip_encode, gzip_decode,
+    has_snappy, gzip_encode, gzip_decode,
    snappy_encode, snappy_decode
)
from kafka.protocol import (
@@ -23,7 +23,6 @@ from kafka.protocol import (
from testutil import *

class TestCodec(unittest.TestCase):
-    @unittest.skipUnless(has_gzip(), "Gzip not available")
    def test_gzip(self):
        for i in xrange(1000):
            s1 = random_string(100)
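
Note: has_gzip() could be removed outright because gzip support comes from the
Python standard library and is therefore always available; only snappy, which
depends on an optional native extension, keeps its runtime check. A minimal
round-trip sketch of the codec API this test exercises:

    from kafka.codec import gzip_encode, gzip_decode

    payload = "some message bytes"
    assert gzip_decode(gzip_encode(payload)) == payload
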
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index a1d9515..b1d1a59 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -1,3 +1,4 @@
+import os
import unittest
from datetime import datetime
@@ -7,10 +8,12 @@ from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
from fixtures import ZookeeperFixture, KafkaFixture
from testutil import *

-@unittest.skipIf(skip_integration(), 'Skipping Integration')
class TestConsumerIntegration(KafkaIntegrationTestCase):
    @classmethod
    def setUpClass(cls):
+        if not os.environ.get('KAFKA_VERSION'):
+            return
+
        cls.zk = ZookeeperFixture.instance()
        cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
        cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
@@ -19,6 +22,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
    @classmethod
    def tearDownClass(cls): # noqa
+        if not os.environ.get('KAFKA_VERSION'):
+            return
+
        cls.server1.close()
        cls.server2.close()
        cls.zk.close()
@@ -38,6 +44,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
        # Make sure there are no duplicates
        self.assertEquals(len(set(messages)), num_messages)

+    @kafka_versions("all")
    def test_simple_consumer(self):
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))
@@ -51,6 +58,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
        consumer.stop()

+    @kafka_versions("all")
    def test_simple_consumer__seek(self):
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))
@@ -69,6 +77,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
        consumer.stop()

+    @kafka_versions("all")
    def test_simple_consumer_blocking(self):
        consumer = SimpleConsumer(self.client, "group1",
                                  self.topic,
@@ -96,6 +105,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
        consumer.stop()

+    @kafka_versions("all")
    def test_simple_consumer_pending(self):
        # Produce 10 messages to partitions 0 and 1
        self.send_messages(0, range(0, 10))
@@ -110,6 +120,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
        consumer.stop()

+    @kafka_versions("all")
    def test_multi_process_consumer(self):
        # Produce 100 messages to partitions 0 and 1
        self.send_messages(0, range(0, 100))
@@ -121,6 +132,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
        consumer.stop()

+    @kafka_versions("all")
    def test_multi_process_consumer_blocking(self):
        consumer = MultiProcessConsumer(self.client, "grp1", self.topic, auto_commit=False)
@@ -148,6 +160,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
        consumer.stop()

+    @kafka_versions("all")
    def test_multi_proc_pending(self):
        self.send_messages(0, range(0, 10))
        self.send_messages(1, range(10, 20))
@@ -160,6 +173,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
        consumer.stop()

+    @kafka_versions("all")
    def test_large_messages(self):
        # Produce 10 "normal" size messages
        small_messages = self.send_messages(0, [ str(x) for x in range(10) ])
@@ -177,6 +191,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
        consumer.stop()

+    @kafka_versions("all")
    def test_huge_messages(self):
        huge_message, = self.send_messages(0, [
            create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)),
@@ -213,23 +228,25 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
        msgs2 = self.send_messages(1, range(100, 200))

        # Start a consumer
-        consumer = SimpleConsumer(self.client, "group1",
+        consumer1 = SimpleConsumer(self.client, "group1",
                                   self.topic, auto_commit=True,
+                                   auto_commit_every_t=600,
                                   auto_commit_every_n=20,
                                   iter_timeout=0)

        # Grab the first 195 messages
-        output_msgs1 = [ consumer.get_message().message.value for _ in xrange(195) ]
+        output_msgs1 = [ consumer1.get_message().message.value for _ in xrange(195) ]
        self.assert_message_count(output_msgs1, 195)
-        consumer.stop()

        # The offset should be at 180
-        consumer = SimpleConsumer(self.client, "group1",
+        consumer2 = SimpleConsumer(self.client, "group1",
                                   self.topic, auto_commit=True,
+                                   auto_commit_every_t=600,
                                   auto_commit_every_n=20,
                                   iter_timeout=0)

        # 180-200
-        self.assert_message_count([ message for message in consumer ], 20)
+        self.assert_message_count([ message for message in consumer2 ], 20)

-        consumer.stop()
+        consumer1.stop()
+        consumer2.stop()
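
Note: auto_commit_every_t is pinned explicitly here, presumably so the
timer-based commit cannot fire at an arbitrary point during the test. With only
the every-20-messages counter in play, the committed offset lands
deterministically at 180 after 195 reads, which is what consumer2's 20-message
assertion relies on.
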
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 782907b..e30b298 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -1,17 +1,20 @@
-import unittest
+import os
import time
+import unittest

from kafka import * # noqa
from kafka.common import * # noqa
from fixtures import ZookeeperFixture, KafkaFixture
from testutil import *

-@unittest.skipIf(skip_integration(), 'Skipping Integration')
class TestFailover(KafkaIntegrationTestCase):
    create_client = False

    @classmethod
    def setUpClass(cls): # noqa
+        if not os.environ.get('KAFKA_VERSION'):
+            return
+
        zk_chroot = random_string(10)
        replicas = 2
        partitions = 2
@@ -26,11 +29,15 @@ class TestFailover(KafkaIntegrationTestCase):
    @classmethod
    def tearDownClass(cls):
+        if not os.environ.get('KAFKA_VERSION'):
+            return
+
        cls.client.close()
        for broker in cls.brokers:
            broker.close()
        cls.zk.close()

+    @kafka_versions("all")
    def test_switch_leader(self):
        key, topic, partition = random_string(5), self.topic, 0
        producer = SimpleProducer(self.client)
@@ -62,6 +69,7 @@ class TestFailover(KafkaIntegrationTestCase):
        producer.stop()

+    @kafka_versions("all")
    def test_switch_leader_async(self):
        key, topic, partition = random_string(5), self.topic, 0
        producer = SimpleProducer(self.client, async=True)
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 6723ff7..41e9c53 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -1,6 +1,7 @@
-import uuid
+import os
import time
import unittest
+import uuid

from kafka import * # noqa
from kafka.common import * # noqa
@@ -13,14 +14,21 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
    @classmethod
    def setUpClass(cls): # noqa
+        if not os.environ.get('KAFKA_VERSION'):
+            return
+
        cls.zk = ZookeeperFixture.instance()
        cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)

    @classmethod
    def tearDownClass(cls): # noqa
+        if not os.environ.get('KAFKA_VERSION'):
+            return
+
        cls.server.close()
        cls.zk.close()

+    @kafka_versions("all")
    def test_produce_many_simple(self):
        start_offset = self.current_offset(self.topic, 0)
@@ -36,6 +44,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
            100,
        )

+    @kafka_versions("all")
    def test_produce_10k_simple(self):
        start_offset = self.current_offset(self.topic, 0)
@@ -45,6 +54,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
            10000,
        )

+    @kafka_versions("all")
    def test_produce_many_gzip(self):
        start_offset = self.current_offset(self.topic, 0)
@@ -57,8 +67,9 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
            200,
        )

-    @unittest.skip("All snappy integration tests fail with nosnappyjava")
+    @kafka_versions("all")
    def test_produce_many_snappy(self):
+        self.skipTest("All snappy integration tests fail with nosnappyjava")
        start_offset = self.current_offset(self.topic, 0)

        self.assert_produce_request([
@@ -69,6 +80,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
            200,
        )

+    @kafka_versions("all")
    def test_produce_mixed(self):
        start_offset = self.current_offset(self.topic, 0)
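
Note: the snappy test keeps its skip, but moves it from a @unittest.skip
decorator to self.skipTest() inside the body, so that @kafka_versions("all")
applies uniformly across all producer tests and the skip reason is reported per
run like every other gated test.
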
@@ -85,6 +97,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        self.assert_produce_request(messages, start_offset, msg_count)

+    @kafka_versions("all")
    def test_produce_100k_gzipped(self):
        start_offset = self.current_offset(self.topic, 0)
@@ -106,6 +119,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
    ############################
    #   SimpleProducer Tests   #
    ############################

+    @kafka_versions("all")
    def test_simple_producer(self):
        start_offset0 = self.current_offset(self.topic, 0)
        start_offset1 = self.current_offset(self.topic, 1)
@@ -130,6 +144,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        producer.stop()

+    @kafka_versions("all")
    def test_round_robin_partitioner(self):
        msg1, msg2, msg3, msg4 = [ str(uuid.uuid4()) for _ in range(4) ]
@@ -152,6 +167,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        producer.stop()

+    @kafka_versions("all")
    def test_hashed_partitioner(self):
        start_offset0 = self.current_offset(self.topic, 0)
        start_offset1 = self.current_offset(self.topic, 1)
@@ -174,6 +190,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        producer.stop()

+    @kafka_versions("all")
    def test_acks_none(self):
        start_offset0 = self.current_offset(self.topic, 0)
        start_offset1 = self.current_offset(self.topic, 1)
@@ -185,6 +202,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
        producer.stop()

+    @kafka_versions("all")
    def test_acks_local_write(self):
        start_offset0 = self.current_offset(self.topic, 0)
        start_offset1 = self.current_offset(self.topic, 1)
@@ -197,6 +215,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        producer.stop()

+    @kafka_versions("all")
    def test_acks_cluster_commit(self):
        start_offset0 = self.current_offset(self.topic, 0)
        start_offset1 = self.current_offset(self.topic, 1)
@@ -211,6 +230,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        producer.stop()

+    @kafka_versions("all")
    def test_batched_simple_producer__triggers_by_message(self):
        start_offset0 = self.current_offset(self.topic, 0)
        start_offset1 = self.current_offset(self.topic, 1)
@@ -259,6 +279,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        producer.stop()

+    @kafka_versions("all")
    def test_batched_simple_producer__triggers_by_time(self):
        start_offset0 = self.current_offset(self.topic, 0)
        start_offset1 = self.current_offset(self.topic, 1)
@@ -310,6 +331,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        producer.stop()

+    @kafka_versions("all")
    def test_async_simple_producer(self):
        start_offset0 = self.current_offset(self.topic, 0)
        start_offset1 = self.current_offset(self.topic, 1)
@@ -322,6 +344,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        producer.stop()

+    @kafka_versions("all")
    def test_async_keyed_producer(self):
        start_offset0 = self.current_offset(self.topic, 0)
        start_offset1 = self.current_offset(self.topic, 1)
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 555fe10..125169f 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -12,7 +12,7 @@ from kafka.common import (
    LeaderUnavailableError, PartitionUnavailableError
)
from kafka.codec import (
-    has_gzip, has_snappy, gzip_encode, gzip_decode,
+    has_snappy, gzip_encode, gzip_decode,
    snappy_encode, snappy_decode
)
from kafka.protocol import (
@@ -29,7 +29,6 @@ class TestProtocol(unittest.TestCase):
        self.assertEqual(msg.key, key)
        self.assertEqual(msg.value, payload)

-    @unittest.skipUnless(has_gzip(), "gzip not available")
    def test_create_gzip(self):
        payloads = ["v1", "v2"]
        msg = create_gzip_message(payloads)
@@ -197,7 +196,6 @@ class TestProtocol(unittest.TestCase):
        self.assertEqual(returned_offset2, 1)
        self.assertEqual(decoded_message2, create_message("v2", "k2"))

-    @unittest.skipUnless(has_gzip(), "Gzip not available")
    def test_decode_message_gzip(self):
        gzip_encoded = ('\xc0\x11\xb2\xf0\x00\x01\xff\xff\xff\xff\x00\x00\x000'
                        '\x1f\x8b\x08\x00\xa1\xc1\xc5R\x02\xffc`\x80\x03\x01'
diff --git a/test/testutil.py b/test/testutil.py
index 9d2ea9c..61fe9bd 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -13,7 +13,6 @@ from kafka import KafkaClient
__all__ = [
    'random_string',
-    'skip_integration',
    'ensure_topic_creation',
    'get_open_port',
    'kafka_versions',
@@ -25,15 +24,17 @@ def random_string(l):
    s = "".join(random.choice(string.letters) for i in xrange(l))
    return s

-def skip_integration():
-    return os.environ.get('SKIP_INTEGRATION')
-
def kafka_versions(*versions):
    def kafka_versions(func):
        @functools.wraps(func)
        def wrapper(self):
-            if os.environ.get('KAFKA_VERSION', None) not in versions:
+            kafka_version = os.environ.get('KAFKA_VERSION')
+
+            if not kafka_version:
+                self.skipTest("no kafka version specified")
+            elif 'all' not in versions and kafka_version not in versions:
                self.skipTest("unsupported kafka version")
+
            return func(self)
        return wrapper
    return kafka_versions
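
Note: with the new 'all' sentinel, tests can be gated on any broker version or
on an explicit list. A usage sketch (the version strings are illustrative, not
from this diff):

    class MyIntegrationTest(KafkaIntegrationTestCase):
        @kafka_versions("all")
        def test_runs_against_any_broker_version(self):
            ...

        @kafka_versions("0.8.0", "0.8.1")
        def test_runs_only_on_listed_versions(self):
            ...
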
@@ -61,6 +62,9 @@ class KafkaIntegrationTestCase(unittest.TestCase):
    def setUp(self):
        super(KafkaIntegrationTestCase, self).setUp()

+        if not os.environ.get('KAFKA_VERSION'):
+            return
+
        if not self.topic:
            self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))
@@ -73,6 +77,9 @@ class KafkaIntegrationTestCase(unittest.TestCase):
    def tearDown(self):
        super(KafkaIntegrationTestCase, self).tearDown()

+        if not os.environ.get('KAFKA_VERSION'):
+            return
+
        if self.create_client:
            self.client.close()