diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-01 17:24:50 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-09-04 18:05:05 -0700 |
commit | 1b282d21522d101f4129d5fc3e70e2b904d3b171 (patch) | |
tree | d559e3c3f650dab1ce9247aa7a89f41bdd410e46 | |
parent | b0f85932216ddd4083b67a5e0595636f4d7b25ce (diff) | |
download | kafka-python-1b282d21522d101f4129d5fc3e70e2b904d3b171.tar.gz |
Cleanup tests: no more import *; remove unused
-rw-r--r-- | test/test_client_integration.py | 13 | ||||
-rw-r--r-- | test/test_codec.py | 5 | ||||
-rw-r--r-- | test/test_conn.py | 4 | ||||
-rw-r--r-- | test/test_consumer.py | 16 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 5 | ||||
-rw-r--r-- | test/test_failover_integration.py | 11 | ||||
-rw-r--r-- | test/test_producer.py | 8 | ||||
-rw-r--r-- | test/test_producer_integration.py | 9 | ||||
-rw-r--r-- | test/test_protocol.py | 9 |
9 files changed, 26 insertions, 54 deletions
diff --git a/test/test_client_integration.py b/test/test_client_integration.py index ba84ef6..b433146 100644 --- a/test/test_client_integration.py +++ b/test/test_client_integration.py @@ -2,11 +2,16 @@ import os import socket import unittest2 -import kafka -from kafka.common import * +from kafka.conn import KafkaConnection +from kafka.common import ( + FetchRequest, OffsetCommitRequest, OffsetFetchRequest, + KafkaTimeoutError +) -from .fixtures import ZookeeperFixture, KafkaFixture -from .testutil import * +from test.fixtures import ZookeeperFixture, KafkaFixture +from test.testutil import ( + KafkaIntegrationTestCase, get_open_port, kafka_versions, Timer +) class TestKafkaClientIntegration(KafkaIntegrationTestCase): @classmethod diff --git a/test/test_codec.py b/test/test_codec.py index 2e6f67e..0ee7ce0 100644 --- a/test/test_codec.py +++ b/test/test_codec.py @@ -5,10 +5,7 @@ from kafka.codec import ( has_snappy, gzip_encode, gzip_decode, snappy_encode, snappy_decode ) -from kafka.protocol import ( - create_gzip_message, create_message, create_snappy_message, KafkaProtocol -) -from testutil import * +from testutil import random_string class TestCodec(unittest2.TestCase): def test_gzip(self): diff --git a/test/test_conn.py b/test/test_conn.py index 5451398..931ace7 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -4,8 +4,8 @@ import struct import mock import unittest2 -from kafka.common import * -from kafka.conn import * +from kafka.common import ConnectionError +from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SECONDS class ConnTest(unittest2.TestCase): def setUp(self): diff --git a/test/test_consumer.py b/test/test_consumer.py index 778d76a..f70e292 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -1,22 +1,10 @@ -import os -import random -import struct import unittest2 -from mock import MagicMock, patch +from mock import MagicMock -from kafka import KafkaClient from kafka.consumer import SimpleConsumer -from kafka.common import ( - ProduceRequest, BrokerMetadata, PartitionMetadata, - TopicAndPartition, KafkaUnavailableError, - LeaderUnavailableError, PartitionUnavailableError -) -from kafka.protocol import ( - create_message, KafkaProtocol -) class TestKafkaConsumer(unittest2.TestCase): def test_non_integer_partitions(self): with self.assertRaises(AssertionError): - consumer = SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ]) + SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ]) diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 2405faf..6576a32 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -1,4 +1,3 @@ -from datetime import datetime import os from kafka import SimpleConsumer, MultiProcessConsumer, create_message @@ -218,8 +217,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): @kafka_versions("0.8.1", "0.8.1.1") def test_offset_behavior__resuming_behavior(self): - msgs1 = self.send_messages(0, range(0, 100)) - msgs2 = self.send_messages(1, range(100, 200)) + self.send_messages(0, range(0, 100)) + self.send_messages(1, range(100, 200)) # Start a consumer consumer1 = self.consumer( diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index 51b791f..5e737b0 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -4,7 +4,7 @@ import time import unittest2 from kafka import KafkaClient, SimpleConsumer -from kafka.common import TopicAndPartition, FailedPayloadsError +from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError from kafka.producer import Producer from test.fixtures import ZookeeperFixture, KafkaFixture @@ -45,7 +45,6 @@ class TestFailover(KafkaIntegrationTestCase): @kafka_versions("all") def test_switch_leader(self): - key = random_string(5) topic = self.topic partition = 0 @@ -57,7 +56,7 @@ class TestFailover(KafkaIntegrationTestCase): self._send_random_messages(producer, topic, partition, 100) # kill leader for partition - broker = self._kill_leader(topic, partition) + self._kill_leader(topic, partition) # expect failure, but dont wait more than 60 secs to recover recovered = False @@ -87,7 +86,6 @@ class TestFailover(KafkaIntegrationTestCase): #@kafka_versions("all") @unittest2.skip("async producer does not support reliable failover yet") def test_switch_leader_async(self): - key = random_string(5) topic = self.topic partition = 0 @@ -98,7 +96,7 @@ class TestFailover(KafkaIntegrationTestCase): self._send_random_messages(producer, topic, partition, 10) # kill leader for partition - broker = self._kill_leader(topic, partition) + self._kill_leader(topic, partition) logging.debug("attempting to send 'success' message after leader killed") @@ -115,8 +113,7 @@ class TestFailover(KafkaIntegrationTestCase): # count number of messages # Should be equal to 10 before + 1 recovery + 10 after - count = self.assert_message_count(topic, 21, partitions=(partition,)) - + self.assert_message_count(topic, 21, partitions=(partition,)) def _send_random_messages(self, producer, topic, partition, n): for j in range(n): diff --git a/test/test_producer.py b/test/test_producer.py index a84e20f..e00f9af 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -1,14 +1,10 @@ # -*- coding: utf-8 -*- import logging -import os -import random -import struct -import unittest2 -from mock import MagicMock, patch +import unittest2 +from mock import MagicMock -from kafka import KafkaClient from kafka.producer import Producer class TestKafkaProducer(unittest2.TestCase): diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index edffa5e..19d3a6d 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -10,7 +10,7 @@ from kafka import ( from kafka.common import ( FetchRequest, ProduceRequest, UnknownTopicOrPartitionError ) -from kafka.codec import has_gzip, has_snappy +from kafka.codec import has_snappy from test.fixtures import ZookeeperFixture, KafkaFixture from test.testutil import KafkaIntegrationTestCase, kafka_versions @@ -156,7 +156,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): # At first it doesn't exist with self.assertRaises(UnknownTopicOrPartitionError): - resp = producer.send_messages(new_topic, self.msg("one")) + producer.send_messages(new_topic, self.msg("one")) @kafka_versions("all") def test_producer_random_order(self): @@ -226,7 +226,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): @kafka_versions("all") def test_acks_none(self): start_offset0 = self.current_offset(self.topic, 0) - start_offset1 = self.current_offset(self.topic, 1) producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED) resp = producer.send_messages(self.topic, self.msg("one")) @@ -238,7 +237,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): @kafka_versions("all") def test_acks_local_write(self): start_offset0 = self.current_offset(self.topic, 0) - start_offset1 = self.current_offset(self.topic, 1) producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE) resp = producer.send_messages(self.topic, self.msg("one")) @@ -251,7 +249,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): @kafka_versions("all") def test_acks_cluster_commit(self): start_offset0 = self.current_offset(self.topic, 0) - start_offset1 = self.current_offset(self.topic, 1) producer = SimpleProducer( self.client, @@ -367,7 +364,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): @kafka_versions("all") def test_async_simple_producer(self): start_offset0 = self.current_offset(self.topic, 0) - start_offset1 = self.current_offset(self.topic, 1) producer = SimpleProducer(self.client, async=True) resp = producer.send_messages(self.topic, self.msg("one")) @@ -380,7 +376,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): @kafka_versions("all") def test_async_keyed_producer(self): start_offset0 = self.current_offset(self.topic, 0) - start_offset1 = self.current_offset(self.topic, 1) producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True) diff --git a/test/test_protocol.py b/test/test_protocol.py index b1c5201..f6e3c96 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -4,20 +4,15 @@ import struct import unittest2 from mock import patch, sentinel -from kafka import KafkaClient from kafka.common import ( OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, OffsetResponse, OffsetCommitResponse, OffsetFetchResponse, ProduceRequest, FetchRequest, Message, ChecksumError, ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage, - BrokerMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError, - ProtocolError, LeaderUnavailableError, PartitionUnavailableError, + BrokerMetadata, PartitionMetadata, ProtocolError, UnsupportedCodecError ) -from kafka.codec import ( - has_snappy, gzip_encode, gzip_decode, - snappy_encode, snappy_decode -) +from kafka.codec import has_snappy, gzip_decode, snappy_decode import kafka.protocol from kafka.protocol import ( ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol, |