author     Mark Roberts <wizzat@gmail.com>  2014-04-25 10:55:04 -0700
committer  Mark Roberts <wizzat@gmail.com>  2014-04-25 10:55:04 -0700
commit     57913f9f914a959f52bc9040a172f8c9ff77e491
tree       fe5cc6c14283a4c9d9175a748ef97f7d55df6fd7
parent     0e50f33ec678f6d656d488ce8a4537f95bba003e
download   kafka-python-57913f9f914a959f52bc9040a172f8c9ff77e491.tar.gz
Various fixes

Bump version number to 0.9.1
Update readme to show supported Kafka/Python versions
Validate arguments in consumer.py, add initial consumer unit test
Make service kill() child processes when startup fails
Add tests for util.py, fix Python 2.6 specific bug.
-rw-r--r--  README.md              19
-rw-r--r--  kafka/consumer.py       3
-rw-r--r--  kafka/util.py           8
-rw-r--r--  setup.py                2
-rw-r--r--  test/service.py        20
-rw-r--r--  test/test_consumer.py  22
-rw-r--r--  test/test_util.py      92
-rw-r--r--  tox.ini                 2
8 files changed, 146 insertions, 22 deletions
diff --git a/README.md b/README.md
index ece8d80..8e99124 100644
--- a/README.md
+++ b/README.md
@@ -7,8 +7,6 @@ high-level consumer and producer classes. Request batching is supported by the
protocol as well as broker-aware request routing. Gzip and Snappy compression
is also supported for message sets.
-Compatible with Apache Kafka 0.8.1
-
http://kafka.apache.org/
# License
@@ -17,8 +15,17 @@ Copyright 2013, David Arthur under Apache License, v2.0. See `LICENSE`
# Status
-The current version of this package is **0.9.0** and is compatible with
-Kafka brokers running version **0.8.1**.
+The current version of this package is **0.9.1** and is compatible with:
+
+Kafka broker versions:
+- 0.8.0
+- 0.8.1
+- 0.8.1.1
+
+Python versions:
+- 2.6.9
+- 2.7.6
+- pypy 2.2.1
# Usage
@@ -209,6 +216,6 @@ git submodule update --init
Then run the tests against supported Kafka versions:
```shell
- KAFKA_VERSION=0.8.0 tox
- KAFKA_VERSION=0.8.1 tox
+KAFKA_VERSION=0.8.0 tox
+KAFKA_VERSION=0.8.1 tox
```
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 3f8d8c2..98f18a0 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -3,6 +3,7 @@ from __future__ import absolute_import
from itertools import izip_longest, repeat
import logging
import time
+import numbers
from threading import Lock
from multiprocessing import Process, Queue as MPQueue, Event, Value
from Queue import Empty, Queue
@@ -81,6 +82,8 @@ class Consumer(object):
if not partitions:
partitions = self.client.topic_partitions[topic]
+ else:
+ assert all(isinstance(x, numbers.Integral) for x in partitions)
# Variables for handling offset commits
self.commit_lock = Lock()
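
With this check in place, constructing a consumer with non-integer partition ids fails immediately instead of surfacing later as confusing fetch errors. A minimal sketch of the new behavior, mirroring the unit test added further down (the MagicMock stands in for a real KafkaClient):

```python
from mock import MagicMock
from kafka.consumer import SimpleConsumer

# Partition ids must be integers; string ids now trip the new assertion
# as soon as the consumer is constructed.
try:
    SimpleConsumer(MagicMock(), 'group', 'topic', partitions=['0'])
except AssertionError:
    print('non-integer partitions are rejected at construction time')
```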
diff --git a/kafka/util.py b/kafka/util.py
index 54052fb..0577a88 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -1,5 +1,6 @@
-from collections import defaultdict
+import sys
import struct
+import collections
from threading import Thread, Event
from kafka.common import BufferUnderflowError
@@ -15,6 +16,9 @@ def write_int_string(s):
def write_short_string(s):
if s is None:
return struct.pack('>h', -1)
+ elif len(s) > 32767 and sys.version_info < (2, 7):
+ # Python 2.6 issues a deprecation warning instead of a struct error
+ raise struct.error(len(s))
else:
return struct.pack('>h%ds' % len(s), len(s), s)
@@ -63,7 +67,7 @@ def relative_unpack(fmt, data, cur):
def group_by_topic_and_partition(tuples):
- out = defaultdict(dict)
+ out = collections.defaultdict(dict)
for t in tuples:
out[t.topic][t.partition] = t
return out
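
The new guard in write_short_string exists because Python 2.6's struct module only issues a DeprecationWarning when a value overflows the signed 16-bit length prefix, while 2.7 raises struct.error; raising explicitly makes both versions fail the same way. A short sketch of the resulting behavior, which is what the new test_write_short_string__too_long test asserts:

```python
import struct

from kafka.util import write_short_string

# Short strings carry a signed 16-bit length prefix, so anything longer than
# 32767 bytes cannot be encoded; the guard makes this a struct.error on 2.6 too.
try:
    write_short_string(' ' * 33000)
except struct.error as exc:
    print('string too long for a short string: %s' % exc)
```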
diff --git a/setup.py b/setup.py
index 009e14f..86d1d9f 100644
--- a/setup.py
+++ b/setup.py
@@ -20,7 +20,7 @@ class Tox(Command):
setup(
name="kafka-python",
- version="0.9.0",
+ version="0.9.1",
install_requires=["distribute"],
tests_require=["tox", "mock"],
diff --git a/test/service.py b/test/service.py
index 1b95cbc..78a5f24 100644
--- a/test/service.py
+++ b/test/service.py
@@ -66,7 +66,7 @@ class SpawnedService(threading.Thread):
stderr_handle.close()
def run_with_handles(self, stdout_handle, stderr_handle):
- child = subprocess.Popen(
+ self.child = subprocess.Popen(
self.args,
bufsize=1,
stdout=subprocess.PIPE,
@@ -74,10 +74,10 @@ class SpawnedService(threading.Thread):
alive = True
while True:
- (rds, wds, xds) = select.select([child.stdout, child.stderr], [], [], 1)
+ (rds, wds, xds) = select.select([self.child.stdout, self.child.stderr], [], [], 1)
- if child.stdout in rds:
- line = child.stdout.readline()
+ if self.child.stdout in rds:
+ line = self.child.stdout.readline()
if stdout_handle:
stdout_handle.write(line)
stdout_handle.flush()
@@ -87,8 +87,8 @@ class SpawnedService(threading.Thread):
sys.stdout.write(line)
sys.stdout.flush()
- if child.stderr in rds:
- line = child.stderr.readline()
+ if self.child.stderr in rds:
+ line = self.child.stderr.readline()
if stderr_handle:
stderr_handle.write(line)
stderr_handle.flush()
@@ -99,10 +99,10 @@ class SpawnedService(threading.Thread):
sys.stderr.flush()
if self.should_die.is_set():
- child.terminate()
+ self.child.terminate()
alive = False
- if child.poll() is not None:
+ if self.child.poll() is not None:
if not alive:
break
else:
@@ -113,6 +113,10 @@ class SpawnedService(threading.Thread):
while True:
t2 = time.time()
if t2 - t1 >= timeout:
+ try:
+ self.child.kill()
+ except Exception:
+ logging.exception("Received exception when killing child process")
raise RuntimeError("Waiting for %r timed out" % pattern)
if re.search(pattern, self.captured_stdout, re.IGNORECASE) is not None:
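
Keeping the Popen handle on self is what lets the timeout path clean up: wait_for can now kill the spawned process instead of leaking it when startup never prints the expected pattern. A rough, self-contained sketch of the same kill-on-timeout idea (the names and the sleep command are illustrative, not the test harness API):

```python
import logging
import subprocess
import time

# Spawn a child and give it a bounded amount of time; if the deadline passes,
# kill it before raising so no orphan process is left behind.
child = subprocess.Popen(['sleep', '60'])
deadline = time.time() + 5
while child.poll() is None:
    if time.time() >= deadline:
        try:
            child.kill()
        except OSError:
            logging.exception("Received exception when killing child process")
        raise RuntimeError("Waiting for child timed out")
    time.sleep(0.1)
```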
diff --git a/test/test_consumer.py b/test/test_consumer.py
new file mode 100644
index 0000000..778d76a
--- /dev/null
+++ b/test/test_consumer.py
@@ -0,0 +1,22 @@
+import os
+import random
+import struct
+import unittest2
+
+from mock import MagicMock, patch
+
+from kafka import KafkaClient
+from kafka.consumer import SimpleConsumer
+from kafka.common import (
+ ProduceRequest, BrokerMetadata, PartitionMetadata,
+ TopicAndPartition, KafkaUnavailableError,
+ LeaderUnavailableError, PartitionUnavailableError
+)
+from kafka.protocol import (
+ create_message, KafkaProtocol
+)
+
+class TestKafkaConsumer(unittest2.TestCase):
+ def test_non_integer_partitions(self):
+ with self.assertRaises(AssertionError):
+ consumer = SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])
diff --git a/test/test_util.py b/test/test_util.py
index b85585b..8179b01 100644
--- a/test/test_util.py
+++ b/test/test_util.py
@@ -3,16 +3,100 @@ import random
import struct
import unittest2
import kafka.util
+import kafka.common
class UtilTest(unittest2.TestCase):
@unittest2.skip("Unwritten")
def test_relative_unpack(self):
pass
- @unittest2.skip("Unwritten")
def test_write_int_string(self):
- pass
+ self.assertEqual(
+ kafka.util.write_int_string('some string'),
+ '\x00\x00\x00\x0bsome string'
+ )
+
+ def test_write_int_string__empty(self):
+ self.assertEqual(
+ kafka.util.write_int_string(''),
+ '\x00\x00\x00\x00'
+ )
+
+ def test_write_int_string__null(self):
+ self.assertEqual(
+ kafka.util.write_int_string(None),
+ '\xff\xff\xff\xff'
+ )
- @unittest2.skip("Unwritten")
def test_read_int_string(self):
- pass
+ self.assertEqual(kafka.util.read_int_string('\xff\xff\xff\xff', 0), (None, 4))
+ self.assertEqual(kafka.util.read_int_string('\x00\x00\x00\x00', 0), ('', 4))
+ self.assertEqual(kafka.util.read_int_string('\x00\x00\x00\x0bsome string', 0), ('some string', 15))
+
+ def test_read_int_string__insufficient_data(self):
+ with self.assertRaises(kafka.common.BufferUnderflowError):
+ kafka.util.read_int_string('\x00\x00\x00\x021', 0)
+
+ def test_write_short_string(self):
+ self.assertEqual(
+ kafka.util.write_short_string('some string'),
+ '\x00\x0bsome string'
+ )
+
+ def test_write_short_string__empty(self):
+ self.assertEqual(
+ kafka.util.write_short_string(''),
+ '\x00\x00'
+ )
+
+ def test_write_short_string__null(self):
+ self.assertEqual(
+ kafka.util.write_short_string(None),
+ '\xff\xff'
+ )
+
+ def test_write_short_string__too_long(self):
+ with self.assertRaises(struct.error):
+ kafka.util.write_short_string(' ' * 33000)
+
+ def test_read_short_string(self):
+ self.assertEqual(kafka.util.read_short_string('\xff\xff', 0), (None, 2))
+ self.assertEqual(kafka.util.read_short_string('\x00\x00', 0), ('', 2))
+ self.assertEqual(kafka.util.read_short_string('\x00\x0bsome string', 0), ('some string', 13))
+
+ def test_read_short_string__insufficient_data(self):
+ with self.assertRaises(kafka.common.BufferUnderflowError):
+ kafka.util.read_short_string('\x00\x021', 0)
+
+ def test_relative_unpack(self):
+ self.assertEqual(
+ kafka.util.relative_unpack('>hh', '\x00\x01\x00\x00\x02', 0),
+ ((1, 0), 4)
+ )
+
+ def test_relative_unpack__insufficient_data(self):
+ with self.assertRaises(kafka.common.BufferUnderflowError):
+ kafka.util.relative_unpack('>hh', '\x00', 0)
+
+
+ def test_group_by_topic_and_partition(self):
+ t = kafka.common.TopicAndPartition
+
+ l = [
+ t("a", 1),
+ t("a", 1),
+ t("a", 2),
+ t("a", 3),
+ t("b", 3),
+ ]
+
+ self.assertEqual(kafka.util.group_by_topic_and_partition(l), {
+ "a" : {
+ 1 : t("a", 1),
+ 2 : t("a", 2),
+ 3 : t("a", 3),
+ },
+ "b" : {
+ 3 : t("b", 3),
+ }
+ })
diff --git a/tox.ini b/tox.ini
index 01f5a86..3c5fd17 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,5 +1,5 @@
[tox]
-envlist = py26, py27
+envlist = py26, py27, pypy
[testenv]
deps =
unittest2