summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-01 17:24:50 -0700
committerDana Powers <dana.powers@rd.io>2014-09-04 18:05:05 -0700
commit1b282d21522d101f4129d5fc3e70e2b904d3b171 (patch)
treed559e3c3f650dab1ce9247aa7a89f41bdd410e46
parentb0f85932216ddd4083b67a5e0595636f4d7b25ce (diff)
downloadkafka-python-1b282d21522d101f4129d5fc3e70e2b904d3b171.tar.gz
Cleanup tests: no more import *; remove unused
-rw-r--r--test/test_client_integration.py13
-rw-r--r--test/test_codec.py5
-rw-r--r--test/test_conn.py4
-rw-r--r--test/test_consumer.py16
-rw-r--r--test/test_consumer_integration.py5
-rw-r--r--test/test_failover_integration.py11
-rw-r--r--test/test_producer.py8
-rw-r--r--test/test_producer_integration.py9
-rw-r--r--test/test_protocol.py9
9 files changed, 26 insertions, 54 deletions
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index ba84ef6..b433146 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -2,11 +2,16 @@ import os
import socket
import unittest2
-import kafka
-from kafka.common import *
+from kafka.conn import KafkaConnection
+from kafka.common import (
+ FetchRequest, OffsetCommitRequest, OffsetFetchRequest,
+ KafkaTimeoutError
+)
-from .fixtures import ZookeeperFixture, KafkaFixture
-from .testutil import *
+from test.fixtures import ZookeeperFixture, KafkaFixture
+from test.testutil import (
+ KafkaIntegrationTestCase, get_open_port, kafka_versions, Timer
+)
class TestKafkaClientIntegration(KafkaIntegrationTestCase):
@classmethod
diff --git a/test/test_codec.py b/test/test_codec.py
index 2e6f67e..0ee7ce0 100644
--- a/test/test_codec.py
+++ b/test/test_codec.py
@@ -5,10 +5,7 @@ from kafka.codec import (
has_snappy, gzip_encode, gzip_decode,
snappy_encode, snappy_decode
)
-from kafka.protocol import (
- create_gzip_message, create_message, create_snappy_message, KafkaProtocol
-)
-from testutil import *
+from testutil import random_string
class TestCodec(unittest2.TestCase):
def test_gzip(self):
diff --git a/test/test_conn.py b/test/test_conn.py
index 5451398..931ace7 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -4,8 +4,8 @@ import struct
import mock
import unittest2
-from kafka.common import *
-from kafka.conn import *
+from kafka.common import ConnectionError
+from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SECONDS
class ConnTest(unittest2.TestCase):
def setUp(self):
diff --git a/test/test_consumer.py b/test/test_consumer.py
index 778d76a..f70e292 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -1,22 +1,10 @@
-import os
-import random
-import struct
import unittest2
-from mock import MagicMock, patch
+from mock import MagicMock
-from kafka import KafkaClient
from kafka.consumer import SimpleConsumer
-from kafka.common import (
- ProduceRequest, BrokerMetadata, PartitionMetadata,
- TopicAndPartition, KafkaUnavailableError,
- LeaderUnavailableError, PartitionUnavailableError
-)
-from kafka.protocol import (
- create_message, KafkaProtocol
-)
class TestKafkaConsumer(unittest2.TestCase):
def test_non_integer_partitions(self):
with self.assertRaises(AssertionError):
- consumer = SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])
+ SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 2405faf..6576a32 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -1,4 +1,3 @@
-from datetime import datetime
import os
from kafka import SimpleConsumer, MultiProcessConsumer, create_message
@@ -218,8 +217,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
@kafka_versions("0.8.1", "0.8.1.1")
def test_offset_behavior__resuming_behavior(self):
- msgs1 = self.send_messages(0, range(0, 100))
- msgs2 = self.send_messages(1, range(100, 200))
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
# Start a consumer
consumer1 = self.consumer(
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 51b791f..5e737b0 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -4,7 +4,7 @@ import time
import unittest2
from kafka import KafkaClient, SimpleConsumer
-from kafka.common import TopicAndPartition, FailedPayloadsError
+from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
from kafka.producer import Producer
from test.fixtures import ZookeeperFixture, KafkaFixture
@@ -45,7 +45,6 @@ class TestFailover(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_switch_leader(self):
- key = random_string(5)
topic = self.topic
partition = 0
@@ -57,7 +56,7 @@ class TestFailover(KafkaIntegrationTestCase):
self._send_random_messages(producer, topic, partition, 100)
# kill leader for partition
- broker = self._kill_leader(topic, partition)
+ self._kill_leader(topic, partition)
# expect failure, but dont wait more than 60 secs to recover
recovered = False
@@ -87,7 +86,6 @@ class TestFailover(KafkaIntegrationTestCase):
#@kafka_versions("all")
@unittest2.skip("async producer does not support reliable failover yet")
def test_switch_leader_async(self):
- key = random_string(5)
topic = self.topic
partition = 0
@@ -98,7 +96,7 @@ class TestFailover(KafkaIntegrationTestCase):
self._send_random_messages(producer, topic, partition, 10)
# kill leader for partition
- broker = self._kill_leader(topic, partition)
+ self._kill_leader(topic, partition)
logging.debug("attempting to send 'success' message after leader killed")
@@ -115,8 +113,7 @@ class TestFailover(KafkaIntegrationTestCase):
# count number of messages
# Should be equal to 10 before + 1 recovery + 10 after
- count = self.assert_message_count(topic, 21, partitions=(partition,))
-
+ self.assert_message_count(topic, 21, partitions=(partition,))
def _send_random_messages(self, producer, topic, partition, n):
for j in range(n):
diff --git a/test/test_producer.py b/test/test_producer.py
index a84e20f..e00f9af 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -1,14 +1,10 @@
# -*- coding: utf-8 -*-
import logging
-import os
-import random
-import struct
-import unittest2
-from mock import MagicMock, patch
+import unittest2
+from mock import MagicMock
-from kafka import KafkaClient
from kafka.producer import Producer
class TestKafkaProducer(unittest2.TestCase):
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index edffa5e..19d3a6d 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -10,7 +10,7 @@ from kafka import (
from kafka.common import (
FetchRequest, ProduceRequest, UnknownTopicOrPartitionError
)
-from kafka.codec import has_gzip, has_snappy
+from kafka.codec import has_snappy
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import KafkaIntegrationTestCase, kafka_versions
@@ -156,7 +156,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
# At first it doesn't exist
with self.assertRaises(UnknownTopicOrPartitionError):
- resp = producer.send_messages(new_topic, self.msg("one"))
+ producer.send_messages(new_topic, self.msg("one"))
@kafka_versions("all")
def test_producer_random_order(self):
@@ -226,7 +226,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_acks_none(self):
start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED)
resp = producer.send_messages(self.topic, self.msg("one"))
@@ -238,7 +237,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_acks_local_write(self):
start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
resp = producer.send_messages(self.topic, self.msg("one"))
@@ -251,7 +249,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_acks_cluster_commit(self):
start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
producer = SimpleProducer(
self.client,
@@ -367,7 +364,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_async_simple_producer(self):
start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
producer = SimpleProducer(self.client, async=True)
resp = producer.send_messages(self.topic, self.msg("one"))
@@ -380,7 +376,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_async_keyed_producer(self):
start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True)
diff --git a/test/test_protocol.py b/test/test_protocol.py
index b1c5201..f6e3c96 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -4,20 +4,15 @@ import struct
import unittest2
from mock import patch, sentinel
-from kafka import KafkaClient
from kafka.common import (
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
ProduceRequest, FetchRequest, Message, ChecksumError,
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage,
- BrokerMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError,
- ProtocolError, LeaderUnavailableError, PartitionUnavailableError,
+ BrokerMetadata, PartitionMetadata, ProtocolError,
UnsupportedCodecError
)
-from kafka.codec import (
- has_snappy, gzip_encode, gzip_decode,
- snappy_encode, snappy_decode
-)
+from kafka.codec import has_snappy, gzip_decode, snappy_decode
import kafka.protocol
from kafka.protocol import (
ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,