-rw-r--r--  README.md                         | 49
-rw-r--r--  kafka/conn.py                     |  2
-rw-r--r--  kafka/consumer.py                 | 12
-rw-r--r--  kafka/producer.py                 | 10
-rw-r--r--  setup.py                          |  4
-rw-r--r--  test/test_client_integration.py   | 12
-rw-r--r--  test/test_codec.py                |  5
-rw-r--r--  test/test_conn.py                 |  4
-rw-r--r--  test/test_consumer.py             | 16
-rw-r--r--  test/test_consumer_integration.py | 16
-rw-r--r--  test/test_failover_integration.py | 53
-rw-r--r--  test/test_producer.py             |  8
-rw-r--r--  test/test_producer_integration.py | 24
-rw-r--r--  test/test_protocol.py             | 25
-rw-r--r--  test/test_util.py                 |  2
15 files changed, 129 insertions(+), 113 deletions(-)
diff --git a/README.md b/README.md
index da6605f..a866ce7 100644
--- a/README.md
+++ b/README.md
@@ -37,13 +37,10 @@ Python versions
## High level
```python
-from kafka.client import KafkaClient
-from kafka.consumer import SimpleConsumer
-from kafka.producer import SimpleProducer, KeyedProducer
-
-kafka = KafkaClient("localhost:9092")
+from kafka import KafkaClient, SimpleProducer, SimpleConsumer
# To send messages synchronously
+kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
# Note that the application is responsible for encoding messages to type str
@@ -97,9 +94,7 @@ kafka.close()
## Keyed messages
```python
-from kafka.client import KafkaClient
-from kafka.producer import KeyedProducer
-from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
+from kafka import KafkaClient, KeyedProducer, HashedPartitioner, RoundRobinPartitioner
kafka = KafkaClient("localhost:9092")
@@ -113,8 +108,7 @@ producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
## Multiprocess consumer
```python
-from kafka.client import KafkaClient
-from kafka.consumer import MultiProcessConsumer
+from kafka import KafkaClient, MultiProcessConsumer
kafka = KafkaClient("localhost:9092")
@@ -135,10 +129,13 @@ for message in consumer.get_messages(count=5, block=True, timeout=4):
## Low level
```python
-from kafka.client import KafkaClient
+from kafka import KafkaClient
+from kafka.protocol import KafkaProtocol, ProduceRequest
+
kafka = KafkaClient("localhost:9092")
+
req = ProduceRequest(topic="my-topic", partition=1,
- messages=[KafkaProdocol.encode_message("some message")])
+ messages=[KafkaProtocol.encode_message("some message")])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()
@@ -152,9 +149,18 @@ resps[0].offset # offset of the first message sent in this request
Install with your favorite package manager
+## Latest Release
Pip:
```shell
+pip install kafka-python
+```
+
+Releases are also listed at https://github.com/mumrah/kafka-python/releases
+
+
+## Bleeding-Edge
+```shell
git clone https://github.com/mumrah/kafka-python
pip install ./kafka-python
```
@@ -211,8 +217,21 @@ pip install python-snappy
tox
```
-## Run a single unit test
+## Run a subset of unit tests
+```shell
+# run protocol tests only
+tox -- -v test.test_protocol
+```
+
```shell
+# test with pypy only
+tox -e pypy
+```
+
+```shell
+# Run only 1 test, and use python 2.7
+tox -e py27 -- -v --with-id --collect-only
+# pick a test number from the list like #102
tox -e py27 -- -v --with-id 102
```
@@ -233,11 +252,11 @@ and optionally set SCALA_VERSION (defaults to 2.8.0, but 2.10.1 is recommended)
SCALA_VERSION=2.10.1 KAFKA_VERSION=trunk ./build_integration.sh
```
-Then run the tests against supported Kafka versions:
+Then run the tests against supported Kafka versions: simply set the `KAFKA_VERSION`
+env variable to the server build you want to use for testing:
```shell
KAFKA_VERSION=0.8.0 tox
KAFKA_VERSION=0.8.1 tox
KAFKA_VERSION=0.8.1.1 tox
KAFKA_VERSION=trunk tox
```
-
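The README hunks above consolidate every example onto top-level imports from `kafka` rather than submodule paths, and fix the `KafkaProdocol` typo. A minimal sketch of the resulting high-level usage (broker address, topic, and group names are illustrative):

```python
# Sketch of the consolidated import style the new README examples use.
# Assumes a broker listening on localhost:9092.
from kafka import KafkaClient, SimpleProducer, SimpleConsumer

kafka = KafkaClient("localhost:9092")

producer = SimpleProducer(kafka)
# Payloads must already be encoded (see the kafka/producer.py hunk below)
producer.send_messages("my-topic", u"some message".encode("utf-8"))

consumer = SimpleConsumer(kafka, "my-group", "my-topic")
for message in consumer:
    print(message)

kafka.close()
```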
diff --git a/kafka/conn.py b/kafka/conn.py
index a1b0a80..a577eba 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -1,8 +1,8 @@
import copy
import logging
+from random import shuffle
import socket
import struct
-from random import shuffle
from threading import local
from kafka.common import ConnectionError
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 0935dd2..928bbac 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -8,12 +8,12 @@ from threading import Lock
from multiprocessing import Process, Queue as MPQueue, Event, Value
from Queue import Empty, Queue
-import kafka
+import kafka.common
from kafka.common import (
- FetchRequest,
- OffsetRequest, OffsetCommitRequest,
- OffsetFetchRequest,
- ConsumerFetchSizeTooSmall, ConsumerNoMoreData
+ FetchRequest, OffsetRequest,
+ OffsetCommitRequest, OffsetFetchRequest,
+ ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
+ UnknownTopicOrPartitionError
)
from kafka.util import ReentrantTimer
@@ -114,7 +114,7 @@ class Consumer(object):
try:
kafka.common.check_error(resp)
return resp.offset
- except kafka.common.UnknownTopicOrPartitionError:
+ except UnknownTopicOrPartitionError:
return 0
for partition in partitions:
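This hunk trades the bare `import kafka` for explicit names and catches `UnknownTopicOrPartitionError` directly; the behavior is unchanged: fetching a never-committed offset falls back to 0. A hedged sketch of that fallback pattern in isolation (the helper name is hypothetical, not part of the module):

```python
import kafka.common
from kafka.common import UnknownTopicOrPartitionError

def committed_offset_or_zero(resp):
    # check_error() raises the exception type mapped to the response's
    # error code; "unknown topic or partition" here just means nothing
    # has been committed yet, so treat it as offset 0.
    try:
        kafka.common.check_error(resp)
        return resp.offset
    except UnknownTopicOrPartitionError:
        return 0
```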
diff --git a/kafka/producer.py b/kafka/producer.py
index 8a6bff0..b28a424 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -156,11 +156,11 @@ class Producer(object):
Helper method to send produce requests
@param: topic, name of topic for produce request -- type str
@param: partition, partition number for produce request -- type int
- @param: *msg, one or more message payloads -- type str
+ @param: *msg, one or more message payloads -- type bytes
@returns: ResponseRequest returned by server
raises on error
- Note that msg type *must* be encoded to str by user.
+ Note that msg type *must* be encoded to bytes by user.
Passing unicode message will not work, for example
you should encode before calling send_messages via
something like `unicode_message.encode('utf-8')`
@@ -172,9 +172,9 @@ class Producer(object):
if not isinstance(msg, (list, tuple)):
raise TypeError("msg is not a list or tuple!")
- # Raise TypeError if any message is not encoded as a str
- if any(not isinstance(m, str) for m in msg):
- raise TypeError("all produce message payloads must be type str")
+ # Raise TypeError if any message is not encoded as bytes
+ if any(not isinstance(m, bytes) for m in msg):
+ raise TypeError("all produce message payloads must be type bytes")
if self.async:
for m in msg:
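The producer hunk retypes the payload contract from `str` to `bytes`. On Python 2, which is all that setup.py below advertises, `bytes` is an alias of `str`, so the check behaves identically while reading forward-compatibly. A short sketch of what callers are expected to do (the message text is illustrative):

```python
# -*- coding: utf-8 -*-
# Encode before sending: the producer rejects any non-bytes payload.
unicode_message = u"caf\xe9"
payload = unicode_message.encode("utf-8")  # bytes, as the docstring requires

# The guard added above, shown standalone:
msg = [payload]
if any(not isinstance(m, bytes) for m in msg):
    raise TypeError("all produce message payloads must be type bytes")
```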
diff --git a/setup.py b/setup.py
index a7e1400..d8ec1d1 100644
--- a/setup.py
+++ b/setup.py
@@ -48,6 +48,10 @@ is also supported for message sets.
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python",
+ "Programming Language :: Python :: 2",
+ "Programming Language :: Python :: 2.6",
+ "Programming Language :: Python :: 2.7",
+ "Programming Language :: Python :: Implementation :: PyPy",
"Topic :: Software Development :: Libraries :: Python Modules"
]
)
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index b33d17c..b433146 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -2,10 +2,16 @@ import os
import socket
import unittest2
-import kafka
-from kafka.common import *
+from kafka.conn import KafkaConnection
+from kafka.common import (
+ FetchRequest, OffsetCommitRequest, OffsetFetchRequest,
+ KafkaTimeoutError
+)
+
from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import *
+from test.testutil import (
+ KafkaIntegrationTestCase, get_open_port, kafka_versions, Timer
+)
class TestKafkaClientIntegration(KafkaIntegrationTestCase):
@classmethod
diff --git a/test/test_codec.py b/test/test_codec.py
index 2e6f67e..0ee7ce0 100644
--- a/test/test_codec.py
+++ b/test/test_codec.py
@@ -5,10 +5,7 @@ from kafka.codec import (
has_snappy, gzip_encode, gzip_decode,
snappy_encode, snappy_decode
)
-from kafka.protocol import (
- create_gzip_message, create_message, create_snappy_message, KafkaProtocol
-)
-from testutil import *
+from testutil import random_string
class TestCodec(unittest2.TestCase):
def test_gzip(self):
diff --git a/test/test_conn.py b/test/test_conn.py
index 5451398..931ace7 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -4,8 +4,8 @@ import struct
import mock
import unittest2
-from kafka.common import *
-from kafka.conn import *
+from kafka.common import ConnectionError
+from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SECONDS
class ConnTest(unittest2.TestCase):
def setUp(self):
diff --git a/test/test_consumer.py b/test/test_consumer.py
index 778d76a..f70e292 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -1,22 +1,10 @@
-import os
-import random
-import struct
import unittest2
-from mock import MagicMock, patch
+from mock import MagicMock
-from kafka import KafkaClient
from kafka.consumer import SimpleConsumer
-from kafka.common import (
- ProduceRequest, BrokerMetadata, PartitionMetadata,
- TopicAndPartition, KafkaUnavailableError,
- LeaderUnavailableError, PartitionUnavailableError
-)
-from kafka.protocol import (
- create_message, KafkaProtocol
-)
class TestKafkaConsumer(unittest2.TestCase):
def test_non_integer_partitions(self):
with self.assertRaises(AssertionError):
- consumer = SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])
+ SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 44dafe4..6576a32 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -1,11 +1,13 @@
import os
-from datetime import datetime
-from kafka import * # noqa
-from kafka.common import * # noqa
+from kafka import SimpleConsumer, MultiProcessConsumer, create_message
+from kafka.common import ProduceRequest, ConsumerFetchSizeTooSmall
from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
-from fixtures import ZookeeperFixture, KafkaFixture
-from testutil import *
+
+from test.fixtures import ZookeeperFixture, KafkaFixture
+from test.testutil import (
+ KafkaIntegrationTestCase, kafka_versions, random_string, Timer
+)
class TestConsumerIntegration(KafkaIntegrationTestCase):
@classmethod
@@ -215,8 +217,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
@kafka_versions("0.8.1", "0.8.1.1")
def test_offset_behavior__resuming_behavior(self):
- msgs1 = self.send_messages(0, range(0, 100))
- msgs2 = self.send_messages(1, range(100, 200))
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
# Start a consumer
consumer1 = self.consumer(
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 6c0e662..5e737b0 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -3,11 +3,14 @@ import os
import time
import unittest2
-from kafka import * # noqa
-from kafka.common import * # noqa
+from kafka import KafkaClient, SimpleConsumer
+from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
from kafka.producer import Producer
-from fixtures import ZookeeperFixture, KafkaFixture
-from testutil import *
+
+from test.fixtures import ZookeeperFixture, KafkaFixture
+from test.testutil import (
+ KafkaIntegrationTestCase, kafka_versions, random_string
+)
class TestFailover(KafkaIntegrationTestCase):
@@ -42,16 +45,18 @@ class TestFailover(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_switch_leader(self):
- key, topic, partition = random_string(5), self.topic, 0
+ topic = self.topic
+ partition = 0
# Test the base class Producer -- send_messages to a specific partition
- producer = Producer(self.client, async=False)
+ producer = Producer(self.client, async=False,
+ req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
# Send 10 random messages
- self._send_random_messages(producer, topic, partition, 10)
+ self._send_random_messages(producer, topic, partition, 100)
# kill leader for partition
- broker = self._kill_leader(topic, partition)
+ self._kill_leader(topic, partition)
# expect failure, but dont wait more than 60 secs to recover
recovered = False
@@ -71,20 +76,18 @@ class TestFailover(KafkaIntegrationTestCase):
self.assertTrue(recovered)
# send some more messages to new leader
- self._send_random_messages(producer, topic, partition, 10)
+ self._send_random_messages(producer, topic, partition, 100)
# count number of messages
- count = self._count_messages('test_switch_leader group', topic,
- partitions=(partition,))
-
    # Should be equal to 100 before + 1 recovery + 100 after
- self.assertEquals(count, 21)
+ self.assert_message_count(topic, 201, partitions=(partition,))
#@kafka_versions("all")
@unittest2.skip("async producer does not support reliable failover yet")
def test_switch_leader_async(self):
- key, topic, partition = random_string(5), self.topic, 0
+ topic = self.topic
+ partition = 0
# Test the base class Producer -- send_messages to a specific partition
producer = Producer(self.client, async=True)
@@ -93,7 +96,7 @@ class TestFailover(KafkaIntegrationTestCase):
self._send_random_messages(producer, topic, partition, 10)
# kill leader for partition
- broker = self._kill_leader(topic, partition)
+ self._kill_leader(topic, partition)
logging.debug("attempting to send 'success' message after leader killed")
@@ -109,12 +112,8 @@ class TestFailover(KafkaIntegrationTestCase):
producer.stop()
# count number of messages
- count = self._count_messages('test_switch_leader_async group', topic,
- partitions=(partition,))
-
# Should be equal to 10 before + 1 recovery + 10 after
- self.assertEquals(count, 21)
-
+ self.assert_message_count(topic, 21, partitions=(partition,))
def _send_random_messages(self, producer, topic, partition, n):
for j in range(n):
@@ -130,17 +129,25 @@ class TestFailover(KafkaIntegrationTestCase):
broker.close()
return broker
- def _count_messages(self, group, topic, timeout=1, partitions=None):
+ def assert_message_count(self, topic, check_count, timeout=10, partitions=None):
hosts = ','.join(['%s:%d' % (broker.host, broker.port)
for broker in self.brokers])
client = KafkaClient(hosts)
+ group = random_string(10)
consumer = SimpleConsumer(client, group, topic,
partitions=partitions,
auto_commit=False,
iter_timeout=timeout)
- count = consumer.pending(partitions)
+ started_at = time.time()
+ pending = consumer.pending(partitions)
+
+ # Keep checking if it isn't immediately correct, subject to timeout
+ while pending != check_count and (time.time() - started_at < timeout):
+ pending = consumer.pending(partitions)
+
consumer.stop()
client.close()
- return count
+
+ self.assertEqual(pending, check_count)
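The biggest change above replaces the one-shot `_count_messages` helper with `assert_message_count`, which re-polls `consumer.pending()` until the expected total appears or a deadline passes; that absorbs the lag a freshly failed-over partition can show. A generic sketch of the poll-then-assert idea (names are hypothetical, and a short sleep is added where the hunk busy-loops):

```python
import time

def wait_for_count(get_count, expected, timeout=10, interval=0.1):
    # Re-check until the observed value matches or the deadline passes,
    # then return the last observation for the caller to assert on.
    deadline = time.time() + timeout
    observed = get_count()
    while observed != expected and time.time() < deadline:
        time.sleep(interval)
        observed = get_count()
    return observed

# e.g. self.assertEqual(wait_for_count(lambda: consumer.pending(), 201), 201)
```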
diff --git a/test/test_producer.py b/test/test_producer.py
index a84e20f..e00f9af 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -1,14 +1,10 @@
# -*- coding: utf-8 -*-
import logging
-import os
-import random
-import struct
-import unittest2
-from mock import MagicMock, patch
+import unittest2
+from mock import MagicMock
-from kafka import KafkaClient
from kafka.producer import Producer
class TestKafkaProducer(unittest2.TestCase):
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 7d3a180..19d3a6d 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -2,11 +2,18 @@ import os
import time
import uuid
-from kafka import * # noqa
-from kafka.common import * # noqa
-from kafka.codec import has_gzip, has_snappy
-from fixtures import ZookeeperFixture, KafkaFixture
-from testutil import *
+from kafka import (
+ SimpleProducer, KeyedProducer,
+ create_message, create_gzip_message, create_snappy_message,
+ RoundRobinPartitioner, HashedPartitioner
+)
+from kafka.common import (
+ FetchRequest, ProduceRequest, UnknownTopicOrPartitionError
+)
+from kafka.codec import has_snappy
+
+from test.fixtures import ZookeeperFixture, KafkaFixture
+from test.testutil import KafkaIntegrationTestCase, kafka_versions
class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
topic = 'produce_topic'
@@ -149,7 +156,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
# At first it doesn't exist
with self.assertRaises(UnknownTopicOrPartitionError):
- resp = producer.send_messages(new_topic, self.msg("one"))
+ producer.send_messages(new_topic, self.msg("one"))
@kafka_versions("all")
def test_producer_random_order(self):
@@ -219,7 +226,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_acks_none(self):
start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED)
resp = producer.send_messages(self.topic, self.msg("one"))
@@ -231,7 +237,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_acks_local_write(self):
start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
resp = producer.send_messages(self.topic, self.msg("one"))
@@ -244,7 +249,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_acks_cluster_commit(self):
start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
producer = SimpleProducer(
self.client,
@@ -360,7 +364,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_async_simple_producer(self):
start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
producer = SimpleProducer(self.client, async=True)
resp = producer.send_messages(self.topic, self.msg("one"))
@@ -373,7 +376,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_async_keyed_producer(self):
start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True)
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 2089f48..f6e3c96 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -1,25 +1,18 @@
-import contextlib
-from contextlib import contextmanager
+from contextlib import contextmanager, nested
import struct
-import unittest2
-import mock
-from mock import sentinel
+import unittest2
+from mock import patch, sentinel
-from kafka import KafkaClient
from kafka.common import (
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
ProduceRequest, FetchRequest, Message, ChecksumError,
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage,
- BrokerMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError,
- ProtocolError, LeaderUnavailableError, PartitionUnavailableError,
+ BrokerMetadata, PartitionMetadata, ProtocolError,
UnsupportedCodecError
)
-from kafka.codec import (
- has_snappy, gzip_encode, gzip_decode,
- snappy_encode, snappy_decode
-)
+from kafka.codec import has_snappy, gzip_decode, snappy_decode
import kafka.protocol
from kafka.protocol import (
ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
@@ -701,12 +694,12 @@ class TestProtocol(unittest2.TestCase):
@contextmanager
def mock_create_message_fns(self):
- patches = contextlib.nested(
- mock.patch.object(kafka.protocol, "create_message",
+ patches = nested(
+ patch.object(kafka.protocol, "create_message",
return_value=sentinel.message),
- mock.patch.object(kafka.protocol, "create_gzip_message",
+ patch.object(kafka.protocol, "create_gzip_message",
return_value=sentinel.gzip_message),
- mock.patch.object(kafka.protocol, "create_snappy_message",
+ patch.object(kafka.protocol, "create_snappy_message",
return_value=sentinel.snappy_message),
)
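This hunk keeps `contextlib.nested` for stacking the three `patch.object` calls, which pins these tests to Python 2 (`nested` was removed in Python 3), consistent with the classifiers added in setup.py. For context only, not something this commit does: the equivalent stacking on Python 3 would go through `contextlib.ExitStack`, sketched here:

```python
# Python 3 counterpart of the nested() pattern above, for context only.
from contextlib import ExitStack, contextmanager
from unittest.mock import patch, sentinel

import kafka.protocol

@contextmanager
def mock_create_message_fns():
    # Enter each patch on one stack; all are undone when the stack exits.
    with ExitStack() as stack:
        for fn, ret in [("create_message", sentinel.message),
                        ("create_gzip_message", sentinel.gzip_message),
                        ("create_snappy_message", sentinel.snappy_message)]:
            stack.enter_context(
                patch.object(kafka.protocol, fn, return_value=ret))
        yield
```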
diff --git a/test/test_util.py b/test/test_util.py
index 7b5f294..dbc3fe6 100644
--- a/test/test_util.py
+++ b/test/test_util.py
@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
import struct
+
import unittest2
+
import kafka.util
import kafka.common