author     Dana Powers <dana.powers@gmail.com>    2014-09-07 12:17:54 -0700
committer  Dana Powers <dana.powers@gmail.com>    2014-09-07 12:17:54 -0700
commit     a99384f4c601d127ab1c4fe5b272ea5c07fd695d
tree       d559e3c3f650dab1ce9247aa7a89f41bdd410e46
parent     9856cc36d7742922133af0aa53767c8ed4731957
parent     1b282d21522d101f4129d5fc3e70e2b904d3b171
Merge pull request #221 from dpkp/minor_cleanups
Minor cleanups
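Note: the README hunks below consolidate every usage example around the new top-level imports (`from kafka import ...`). A minimal end-to-end sketch in that style; the topic and group names are illustrative, and a broker is assumed to be reachable at localhost:9092:

```python
from kafka import KafkaClient, SimpleProducer, SimpleConsumer

# Send a message synchronously; payloads must be encoded to str/bytes by the caller
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
producer.send_messages("my-topic", u"some message".encode("utf-8"))

# Iterate over messages with the high-level consumer
consumer = SimpleConsumer(kafka, "my-group", "my-topic")
for message in consumer:
    print(message)

kafka.close()
```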
 README.md                          | 49
 kafka/conn.py                      |  2
 kafka/consumer.py                  | 12
 kafka/producer.py                  | 10
 setup.py                           |  4
 test/test_client_integration.py    | 12
 test/test_codec.py                 |  5
 test/test_conn.py                  |  4
 test/test_consumer.py              | 16
 test/test_consumer_integration.py  | 16
 test/test_failover_integration.py  | 53
 test/test_producer.py              |  8
 test/test_producer_integration.py  | 24
 test/test_protocol.py              | 25
 test/test_util.py                  |  2
15 files changed, 129 insertions, 113 deletions
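Among the cleanups in the patch, the failover test replaces a one-shot `_count_messages` helper with a polling `assert_message_count` assertion. The underlying pattern, shown in isolation below, is generic: re-check an observed value until it matches or a deadline passes, instead of asserting on a single snapshot. The helper name and timing value here are illustrative, not part of the patch:

```python
import time

def wait_for_count(get_count, expected, timeout=10.0):
    # Poll get_count() until it returns `expected` or `timeout` seconds elapse
    started_at = time.time()
    observed = get_count()
    while observed != expected and time.time() - started_at < timeout:
        observed = get_count()
    return observed
```

Asserting on the returned value keeps the test fast when the system converges quickly, while still tolerating slow recovery up to the timeout.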
diff --git a/README.md b/README.md
--- a/README.md
+++ b/README.md
@@ -37,13 +37,10 @@ Python versions
 ## High level
 
 ```python
-from kafka.client import KafkaClient
-from kafka.consumer import SimpleConsumer
-from kafka.producer import SimpleProducer, KeyedProducer
-
-kafka = KafkaClient("localhost:9092")
+from kafka import KafkaClient, SimpleProducer, SimpleConsumer
 
 # To send messages synchronously
+kafka = KafkaClient("localhost:9092")
 producer = SimpleProducer(kafka)
 
 # Note that the application is responsible for encoding messages to type str
@@ -97,9 +94,7 @@ kafka.close()
 ## Keyed messages
 
 ```python
-from kafka.client import KafkaClient
-from kafka.producer import KeyedProducer
-from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
+from kafka import KafkaClient, KeyedProducer, HashedPartitioner, RoundRobinPartitioner
 
 kafka = KafkaClient("localhost:9092")
 
@@ -113,8 +108,7 @@ producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
 ## Multiprocess consumer
 
 ```python
-from kafka.client import KafkaClient
-from kafka.consumer import MultiProcessConsumer
+from kafka import KafkaClient, MultiProcessConsumer
 
 kafka = KafkaClient("localhost:9092")
 
@@ -135,10 +129,13 @@ for message in consumer.get_messages(count=5, block=True, timeout=4):
 ## Low level
 
 ```python
-from kafka.client import KafkaClient
+from kafka import KafkaClient
+from kafka.protocol import KafkaProtocol, ProduceRequest
+
 kafka = KafkaClient("localhost:9092")
+
 req = ProduceRequest(topic="my-topic", partition=1,
-    messages=[KafkaProdocol.encode_message("some message")])
+    messages=[KafkaProtocol.encode_message("some message")])
 resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
 kafka.close()
 ```
@@ -152,9 +149,18 @@ resps[0].offset # offset of the first message sent in this request
 
 Install with your favorite package manager
 
+## Latest Release
 Pip:
 
 ```shell
+pip install kafka-python
+```
+
+Releases are also listed at https://github.com/mumrah/kafka-python/releases
+
+
+## Bleeding-Edge
+```shell
 git clone https://github.com/mumrah/kafka-python
 pip install ./kafka-python
 ```
@@ -211,8 +217,21 @@ pip install python-snappy
 tox
 ```
 
-## Run a single unit test
+## Run a subset of unit tests
+```shell
+# run protocol tests only
+tox -- -v test.test_protocol
+```
+
 ```shell
+# test with pypy only
+tox -e pypy
+```
+
+```shell
+# Run only 1 test, and use python 2.7
+tox -e py27 -- -v --with-id --collect-only
+# pick a test number from the list like #102
 tox -e py27 -- -v --with-id 102
 ```
 
@@ -233,11 +252,11 @@ and optionally set SCALA_VERSION (defaults to 2.8.0, but 2.10.1 is recommended)
 SCALA_VERSION=2.10.1 KAFKA_VERSION=trunk ./build_integration.sh
 ```
 
-Then run the tests against supported Kafka versions:
+Then run the tests against supported Kafka versions, simply set the `KAFKA_VERSION`
+env variable to the server build you want to use for testing:
 ```shell
 KAFKA_VERSION=0.8.0 tox
 KAFKA_VERSION=0.8.1 tox
 KAFKA_VERSION=0.8.1.1 tox
 KAFKA_VERSION=trunk tox
 ```
-
diff --git a/kafka/conn.py b/kafka/conn.py
index a1b0a80..a577eba 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -1,8 +1,8 @@
 import copy
 import logging
+from random import shuffle
 import socket
 import struct
-from random import shuffle
 from threading import local
 
 from kafka.common import ConnectionError
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 0935dd2..928bbac 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -8,12 +8,12 @@ from threading import Lock
 from multiprocessing import Process, Queue as MPQueue, Event, Value
 from Queue import Empty, Queue
 
-import kafka
+import kafka.common
 from kafka.common import (
-    FetchRequest,
-    OffsetRequest, OffsetCommitRequest,
-    OffsetFetchRequest,
-    ConsumerFetchSizeTooSmall, ConsumerNoMoreData
+    FetchRequest, OffsetRequest,
+    OffsetCommitRequest, OffsetFetchRequest,
+    ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
+    UnknownTopicOrPartitionError
 )
 
 from kafka.util import ReentrantTimer
@@ -114,7 +114,7 @@ class Consumer(object):
             try:
                 kafka.common.check_error(resp)
                 return resp.offset
-            except kafka.common.UnknownTopicOrPartitionError:
+            except UnknownTopicOrPartitionError:
                 return 0
 
         for partition in partitions:
diff --git a/kafka/producer.py b/kafka/producer.py
index 8a6bff0..b28a424 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -156,11 +156,11 @@ class Producer(object):
         Helper method to send produce requests
         @param: topic, name of topic for produce request -- type str
         @param: partition, partition number for produce request -- type int
-        @param: *msg, one or more message payloads -- type str
+        @param: *msg, one or more message payloads -- type bytes
         @returns: ResponseRequest returned by server
         raises on error
 
-        Note that msg type *must* be encoded to str by user.
+        Note that msg type *must* be encoded to bytes by user.
         Passing unicode message will not work, for example
         you should encode before calling send_messages via
         something like `unicode_message.encode('utf-8')`
@@ -172,9 +172,9 @@ class Producer(object):
         if not isinstance(msg, (list, tuple)):
             raise TypeError("msg is not a list or tuple!")
 
-        # Raise TypeError if any message is not encoded as a str
-        if any(not isinstance(m, str) for m in msg):
-            raise TypeError("all produce message payloads must be type str")
+        # Raise TypeError if any message is not encoded as bytes
+        if any(not isinstance(m, bytes) for m in msg):
+            raise TypeError("all produce message payloads must be type bytes")
 
         if self.async:
             for m in msg:
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -48,6 +48,10 @@ is also supported for message sets.
         "Intended Audience :: Developers",
         "License :: OSI Approved :: Apache Software License",
         "Programming Language :: Python",
+        "Programming Language :: Python :: 2",
+        "Programming Language :: Python :: 2.6",
+        "Programming Language :: Python :: 2.7",
+        "Programming Language :: Python :: Implementation :: PyPy",
         "Topic :: Software Development :: Libraries :: Python Modules"
     ]
 )
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index b33d17c..b433146 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -2,10 +2,16 @@ import os
 import socket
 import unittest2
 
-import kafka
-from kafka.common import *
+from kafka.conn import KafkaConnection
+from kafka.common import (
+    FetchRequest, OffsetCommitRequest, OffsetFetchRequest,
+    KafkaTimeoutError
+)
+
 from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import *
+from test.testutil import (
+    KafkaIntegrationTestCase, get_open_port, kafka_versions, Timer
+)
 
 class TestKafkaClientIntegration(KafkaIntegrationTestCase):
     @classmethod
diff --git a/test/test_codec.py b/test/test_codec.py
index 2e6f67e..0ee7ce0 100644
--- a/test/test_codec.py
+++ b/test/test_codec.py
@@ -5,10 +5,7 @@ from kafka.codec import (
     has_snappy, gzip_encode, gzip_decode,
     snappy_encode, snappy_decode
 )
-from kafka.protocol import (
-    create_gzip_message, create_message, create_snappy_message, KafkaProtocol
-)
-from testutil import *
+from testutil import random_string
 
 class TestCodec(unittest2.TestCase):
     def test_gzip(self):
diff --git a/test/test_conn.py b/test/test_conn.py
index 5451398..931ace7 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -4,8 +4,8 @@ import struct
 import mock
 import unittest2
 
-from kafka.common import *
-from kafka.conn import *
+from kafka.common import ConnectionError
+from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SECONDS
 
 class ConnTest(unittest2.TestCase):
     def setUp(self):
diff --git a/test/test_consumer.py b/test/test_consumer.py
index 778d76a..f70e292 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -1,22 +1,10 @@
-import os
-import random
-import struct
 import unittest2
 
-from mock import MagicMock, patch
+from mock import MagicMock
 
-from kafka import KafkaClient
 from kafka.consumer import SimpleConsumer
-from kafka.common import (
-    ProduceRequest, BrokerMetadata, PartitionMetadata,
-    TopicAndPartition, KafkaUnavailableError,
-    LeaderUnavailableError, PartitionUnavailableError
-)
-from kafka.protocol import (
-    create_message, KafkaProtocol
-)
 
 class TestKafkaConsumer(unittest2.TestCase):
     def test_non_integer_partitions(self):
         with self.assertRaises(AssertionError):
-            consumer = SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])
+            SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 44dafe4..6576a32 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -1,11 +1,13 @@
 import os
-from datetime import datetime
 
-from kafka import *  # noqa
-from kafka.common import *  # noqa
+from kafka import SimpleConsumer, MultiProcessConsumer, create_message
+from kafka.common import ProduceRequest, ConsumerFetchSizeTooSmall
 from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
-from fixtures import ZookeeperFixture, KafkaFixture
-from testutil import *
+
+from test.fixtures import ZookeeperFixture, KafkaFixture
+from test.testutil import (
+    KafkaIntegrationTestCase, kafka_versions, random_string, Timer
+)
 
 class TestConsumerIntegration(KafkaIntegrationTestCase):
     @classmethod
@@ -215,8 +217,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
 
     @kafka_versions("0.8.1", "0.8.1.1")
     def test_offset_behavior__resuming_behavior(self):
-        msgs1 = self.send_messages(0, range(0, 100))
-        msgs2 = self.send_messages(1, range(100, 200))
+        self.send_messages(0, range(0, 100))
+        self.send_messages(1, range(100, 200))
 
         # Start a consumer
         consumer1 = self.consumer(
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 6c0e662..5e737b0 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -3,11 +3,14 @@ import os
 import time
 import unittest2
 
-from kafka import *  # noqa
-from kafka.common import *  # noqa
+from kafka import KafkaClient, SimpleConsumer
+from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
 from kafka.producer import Producer
-from fixtures import ZookeeperFixture, KafkaFixture
-from testutil import *
+
+from test.fixtures import ZookeeperFixture, KafkaFixture
+from test.testutil import (
+    KafkaIntegrationTestCase, kafka_versions, random_string
+)
 
 class TestFailover(KafkaIntegrationTestCase):
 
@@ -42,16 +45,18 @@ class TestFailover(KafkaIntegrationTestCase):
 
     @kafka_versions("all")
     def test_switch_leader(self):
-        key, topic, partition = random_string(5), self.topic, 0
+        topic = self.topic
+        partition = 0
 
         # Test the base class Producer -- send_messages to a specific partition
-        producer = Producer(self.client, async=False)
+        producer = Producer(self.client, async=False,
+                            req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
 
         # Send 10 random messages
-        self._send_random_messages(producer, topic, partition, 10)
+        self._send_random_messages(producer, topic, partition, 100)
 
         # kill leader for partition
-        broker = self._kill_leader(topic, partition)
+        self._kill_leader(topic, partition)
 
         # expect failure, but dont wait more than 60 secs to recover
         recovered = False
@@ -71,20 +76,18 @@ class TestFailover(KafkaIntegrationTestCase):
         self.assertTrue(recovered)
 
         # send some more messages to new leader
-        self._send_random_messages(producer, topic, partition, 10)
+        self._send_random_messages(producer, topic, partition, 100)
 
         # count number of messages
-        count = self._count_messages('test_switch_leader group', topic,
-                                     partitions=(partition,))
-        # Should be equal to 10 before + 1 recovery + 10 after
-        self.assertEquals(count, 21)
+        self.assert_message_count(topic, 201, partitions=(partition,))
 
 
     #@kafka_versions("all")
     @unittest2.skip("async producer does not support reliable failover yet")
     def test_switch_leader_async(self):
-        key, topic, partition = random_string(5), self.topic, 0
+        topic = self.topic
+        partition = 0
 
         # Test the base class Producer -- send_messages to a specific partition
         producer = Producer(self.client, async=True)
@@ -93,7 +96,7 @@ class TestFailover(KafkaIntegrationTestCase):
         self._send_random_messages(producer, topic, partition, 10)
 
         # kill leader for partition
-        broker = self._kill_leader(topic, partition)
+        self._kill_leader(topic, partition)
 
         logging.debug("attempting to send 'success' message after leader killed")
 
@@ -109,12 +112,8 @@ class TestFailover(KafkaIntegrationTestCase):
         producer.stop()
 
         # count number of messages
-        count = self._count_messages('test_switch_leader_async group', topic,
-                                     partitions=(partition,))
-        # Should be equal to 10 before + 1 recovery + 10 after
-        self.assertEquals(count, 21)
-
+        self.assert_message_count(topic, 21, partitions=(partition,))
 
     def _send_random_messages(self, producer, topic, partition, n):
         for j in range(n):
@@ -130,17 +129,25 @@ class TestFailover(KafkaIntegrationTestCase):
         broker.close()
         return broker
 
-    def _count_messages(self, group, topic, timeout=1, partitions=None):
+    def assert_message_count(self, topic, check_count, timeout=10, partitions=None):
         hosts = ','.join(['%s:%d' % (broker.host, broker.port)
                           for broker in self.brokers])
 
         client = KafkaClient(hosts)
+        group = random_string(10)
         consumer = SimpleConsumer(client, group, topic,
                                   partitions=partitions,
                                   auto_commit=False,
                                   iter_timeout=timeout)
 
-        count = consumer.pending(partitions)
+        started_at = time.time()
+        pending = consumer.pending(partitions)
+
+        # Keep checking if it isn't immediately correct, subject to timeout
+        while pending != check_count and (time.time() - started_at < timeout):
+            pending = consumer.pending(partitions)
+
         consumer.stop()
         client.close()
-        return count
+
+        self.assertEqual(pending, check_count)
diff --git a/test/test_producer.py b/test/test_producer.py
index a84e20f..e00f9af 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -1,14 +1,10 @@
 # -*- coding: utf-8 -*-
 
 import logging
-import os
-import random
-import struct
-import unittest2
 
-from mock import MagicMock, patch
+import unittest2
+from mock import MagicMock
 
-from kafka import KafkaClient
 from kafka.producer import Producer
 
 class TestKafkaProducer(unittest2.TestCase):
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 7d3a180..19d3a6d 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -2,11 +2,18 @@ import os
 import time
 import uuid
 
-from kafka import *  # noqa
-from kafka.common import *  # noqa
-from kafka.codec import has_gzip, has_snappy
-from fixtures import ZookeeperFixture, KafkaFixture
-from testutil import *
+from kafka import (
+    SimpleProducer, KeyedProducer,
+    create_message, create_gzip_message, create_snappy_message,
+    RoundRobinPartitioner, HashedPartitioner
+)
+from kafka.common import (
+    FetchRequest, ProduceRequest, UnknownTopicOrPartitionError
+)
+from kafka.codec import has_snappy
+
+from test.fixtures import ZookeeperFixture, KafkaFixture
+from test.testutil import KafkaIntegrationTestCase, kafka_versions
 
 class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
     topic = 'produce_topic'
@@ -149,7 +156,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
 
         # At first it doesn't exist
         with self.assertRaises(UnknownTopicOrPartitionError):
-            resp = producer.send_messages(new_topic, self.msg("one"))
+            producer.send_messages(new_topic, self.msg("one"))
 
     @kafka_versions("all")
     def test_producer_random_order(self):
@@ -219,7 +226,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
     @kafka_versions("all")
     def test_acks_none(self):
         start_offset0 = self.current_offset(self.topic, 0)
-        start_offset1 = self.current_offset(self.topic, 1)
 
         producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED)
         resp = producer.send_messages(self.topic, self.msg("one"))
@@ -231,7 +237,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
     @kafka_versions("all")
     def test_acks_local_write(self):
         start_offset0 = self.current_offset(self.topic, 0)
-        start_offset1 = self.current_offset(self.topic, 1)
 
         producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
         resp = producer.send_messages(self.topic, self.msg("one"))
@@ -244,7 +249,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
     @kafka_versions("all")
     def test_acks_cluster_commit(self):
         start_offset0 = self.current_offset(self.topic, 0)
-        start_offset1 = self.current_offset(self.topic, 1)
 
         producer = SimpleProducer(
             self.client,
@@ -360,7 +364,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
     @kafka_versions("all")
    def test_async_simple_producer(self):
         start_offset0 = self.current_offset(self.topic, 0)
-        start_offset1 = self.current_offset(self.topic, 1)
 
         producer = SimpleProducer(self.client, async=True)
         resp = producer.send_messages(self.topic, self.msg("one"))
@@ -373,7 +376,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
     @kafka_versions("all")
     def test_async_keyed_producer(self):
         start_offset0 = self.current_offset(self.topic, 0)
-        start_offset1 = self.current_offset(self.topic, 1)
 
         producer = KeyedProducer(self.client,
                                  partitioner = RoundRobinPartitioner,
                                  async=True)
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 2089f48..f6e3c96 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -1,25 +1,18 @@
-import contextlib
-from contextlib import contextmanager
+from contextlib import contextmanager, nested
 import struct
 
-import unittest2
-import mock
-from mock import sentinel
+import unittest2
+from mock import patch, sentinel
 
-from kafka import KafkaClient
 from kafka.common import (
     OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
     OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
     ProduceRequest, FetchRequest, Message, ChecksumError,
     ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage,
-    BrokerMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError,
-    ProtocolError, LeaderUnavailableError, PartitionUnavailableError,
+    BrokerMetadata, PartitionMetadata, ProtocolError, UnsupportedCodecError
 )
 
-from kafka.codec import (
-    has_snappy, gzip_encode, gzip_decode,
-    snappy_encode, snappy_decode
-)
+from kafka.codec import has_snappy, gzip_decode, snappy_decode
 import kafka.protocol
 from kafka.protocol import (
     ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
@@ -701,12 +694,12 @@ class TestProtocol(unittest2.TestCase):
 
     @contextmanager
     def mock_create_message_fns(self):
-        patches = contextlib.nested(
-            mock.patch.object(kafka.protocol, "create_message",
+        patches = nested(
+            patch.object(kafka.protocol, "create_message",
                               return_value=sentinel.message),
-            mock.patch.object(kafka.protocol, "create_gzip_message",
+            patch.object(kafka.protocol, "create_gzip_message",
                               return_value=sentinel.gzip_message),
-            mock.patch.object(kafka.protocol, "create_snappy_message",
+            patch.object(kafka.protocol, "create_snappy_message",
                               return_value=sentinel.snappy_message),
         )
 
diff --git a/test/test_util.py b/test/test_util.py
index 7b5f294..dbc3fe6 100644
--- a/test/test_util.py
+++ b/test/test_util.py
@@ -1,6 +1,8 @@
 # -*- coding: utf-8 -*-
 import struct
+
 import unittest2
+
 import kafka.util
 import kafka.common
 
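The `kafka/producer.py` hunk above tightens the payload check from `str` to `bytes`; on Python 2, which this codebase targets, `bytes` is an alias of `str`, so the enforced contract is the documented one: callers must encode before sending. A short sketch of what the check accepts and rejects (the topic name is hypothetical):

```python
from kafka import KafkaClient, SimpleProducer

kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)

# OK: the payload has been encoded to bytes by the caller
producer.send_messages("my-topic", u"caf\xe9".encode("utf-8"))

# Raises TypeError: unicode payloads are not encoded automatically
producer.send_messages("my-topic", u"caf\xe9")
```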