summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-08-25 10:15:16 -0700
committerDana Powers <dana.powers@rd.io>2014-08-25 12:54:54 -0700
commite151529078d53deca6254dee5b80e41e01226a7f (patch)
tree488ff0fb3a2ed26f52aa9f4b471ff3ff7060a938
parentd73d1690b16ea86a9fd51056f4d149f54a4dd8f0 (diff)
downloadkafka-python-e151529078d53deca6254dee5b80e41e01226a7f.tar.gz
Add pylint to tox.ini; test both kafka and test; default to error-checking only; fixup errors; skip kafka/queue.py
-rw-r--r--.travis.yml4
-rw-r--r--kafka/client.py2
-rw-r--r--kafka/conn.py6
-rw-r--r--test/fixtures.py49
-rw-r--r--test/service.py9
-rw-r--r--test/test_client.py7
-rw-r--r--test/test_client_integration.py8
-rw-r--r--test/test_conn.py18
-rw-r--r--test/testutil.py1
-rw-r--r--tox.ini8
10 files changed, 54 insertions, 58 deletions
diff --git a/.travis.yml b/.travis.yml
index 8eaaad6..b36670c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -6,7 +6,7 @@ python:
- pypy
env:
- -
+ - UNIT_AND_LINT_ONLY=true
- KAFKA_VERSION=0.8.0
- KAFKA_VERSION=0.8.1
- KAFKA_VERSION=0.8.1.1
@@ -35,4 +35,4 @@ deploy:
# branch: master
script:
- - tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`
+ - if [ -n "$UNIT_AND_LINT_ONLY" ]; then tox -e lint,`./travis_selector.sh $TRAVIS_PYTHON_VERSION`; else tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`; fi
diff --git a/kafka/client.py b/kafka/client.py
index 9474091..8630f66 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -187,7 +187,7 @@ class KafkaClient(object):
def _raise_on_response_error(self, resp):
try:
kafka.common.check_error(resp)
- except (UnknownTopicOrPartitionError, NotLeaderForPartitionError) as e:
+ except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
self.reset_topic_metadata(resp.topic)
raise
diff --git a/kafka/conn.py b/kafka/conn.py
index 0d17cb8..a1b0a80 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -93,8 +93,8 @@ class KafkaConnection(local):
# that the socket is in error. we will never get
# more data from this socket
if data == '':
- raise socket.error('Not enough data to read message -- did server kill socket?')
-
+ raise socket.error("Not enough data to read message -- did server kill socket?")
+
except socket.error:
log.exception('Unable to receive data from Kafka')
self._raise_connection_error()
@@ -170,7 +170,7 @@ class KafkaConnection(local):
except socket.error:
pass
- # Closing the socket should always succeed
+ # Closing the socket should always succeed
self._sock.close()
self._sock = None
else:
diff --git a/test/fixtures.py b/test/fixtures.py
index adb0642..152777c 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -1,5 +1,4 @@
import logging
-import glob
import os
import os.path
import shutil
@@ -9,8 +8,8 @@ import urllib2
import uuid
from urlparse import urlparse
-from service import ExternalService, SpawnedService
-from testutil import get_open_port
+from test.service import ExternalService, SpawnedService
+from test.testutil import get_open_port
class Fixture(object):
kafka_version = os.environ.get('KAFKA_VERSION', '0.8.0')
@@ -36,23 +35,23 @@ class Fixture(object):
output_file = os.path.join(output_dir, distfile + '.tgz')
if os.path.isfile(output_file):
- logging.info("Found file already on disk: %s" % output_file)
+ logging.info("Found file already on disk: %s", output_file)
return output_file
# New tarballs are .tgz, older ones are sometimes .tar.gz
try:
url = url_base + distfile + '.tgz'
- logging.info("Attempting to download %s" % (url,))
+ logging.info("Attempting to download %s", url)
response = urllib2.urlopen(url)
except urllib2.HTTPError:
logging.exception("HTTP Error")
url = url_base + distfile + '.tar.gz'
- logging.info("Attempting to download %s" % (url,))
+ logging.info("Attempting to download %s", url)
response = urllib2.urlopen(url)
- logging.info("Saving distribution file to %s" % (output_file,))
- with open(os.path.join(output_dir, distfile + '.tgz'), 'w') as f:
- f.write(response.read())
+ logging.info("Saving distribution file to %s", output_file)
+ with open(output_file, 'w') as output_file_fd:
+ output_file_fd.write(response.read())
return output_file
@@ -117,11 +116,9 @@ class ZookeeperFixture(Fixture):
self.render_template(template, properties, vars(self))
# Configure Zookeeper child process
- self.child = SpawnedService(args=self.kafka_run_class_args(
- "org.apache.zookeeper.server.quorum.QuorumPeerMain",
- properties),
- env=self.kafka_run_class_env()
- )
+ args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties)
+ env = self.kafka_run_class_env()
+ self.child = SpawnedService(args, env)
# Party!
self.out("Starting...")
@@ -162,7 +159,7 @@ class KafkaFixture(Fixture):
self.zk_port = zk_port
self.zk_chroot = zk_chroot
- self.replicas = replicas
+ self.replicas = replicas
self.partitions = partitions
self.tmp_dir = None
@@ -199,21 +196,19 @@ class KafkaFixture(Fixture):
self.render_template(template, properties, vars(self))
# Configure Kafka child process
- self.child = SpawnedService(args=self.kafka_run_class_args(
- "kafka.Kafka", properties),
- env=self.kafka_run_class_env()
- )
+ args = self.kafka_run_class_args("kafka.Kafka", properties)
+ env = self.kafka_run_class_env()
+ self.child = SpawnedService(args, env)
# Party!
self.out("Creating Zookeeper chroot node...")
- proc = subprocess.Popen(self.kafka_run_class_args(
- "org.apache.zookeeper.ZooKeeperMain",
- "-server", "%s:%d" % (self.zk_host, self.zk_port),
- "create", "/%s" % self.zk_chroot, "kafka-python"
- ),
- env=self.kafka_run_class_env(),
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain",
+ "-server", "%s:%d" % (self.zk_host, self.zk_port),
+ "create",
+ "/%s" % self.zk_chroot,
+ "kafka-python")
+ env = self.kafka_run_class_env()
+ proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if proc.wait() != 0:
self.out("Failed to create Zookeeper chroot node")
diff --git a/test/service.py b/test/service.py
index df6c1ba..2f66120 100644
--- a/test/service.py
+++ b/test/service.py
@@ -2,7 +2,6 @@ import logging
import re
import select
import subprocess
-import sys
import threading
import time
@@ -14,7 +13,7 @@ __all__ = [
class ExternalService(object):
def __init__(self, host, port):
- print("Using already running service at %s:%d" % (host, port))
+ logging.info("Using already running service at %s:%d", host, port)
self.host = host
self.port = port
@@ -26,9 +25,11 @@ class ExternalService(object):
class SpawnedService(threading.Thread):
- def __init__(self, args=[], env=None):
+ def __init__(self, args=None, env=None):
threading.Thread.__init__(self)
+ if args is None:
+ raise TypeError("args parameter is required")
self.args = args
self.env = env
self.captured_stdout = []
@@ -49,7 +50,7 @@ class SpawnedService(threading.Thread):
alive = True
while True:
- (rds, wds, xds) = select.select([self.child.stdout, self.child.stderr], [], [], 1)
+ (rds, _, _) = select.select([self.child.stdout, self.child.stderr], [], [], 1)
if self.child.stdout in rds:
line = self.child.stdout.readline()
diff --git a/test/test_client.py b/test/test_client.py
index fe9beff..32a2256 100644
--- a/test/test_client.py
+++ b/test/test_client.py
@@ -1,6 +1,3 @@
-import os
-import random
-import struct
import unittest2
from mock import MagicMock, patch
@@ -11,9 +8,7 @@ from kafka.common import (
TopicAndPartition, KafkaUnavailableError,
LeaderUnavailableError, PartitionUnavailableError
)
-from kafka.protocol import (
- create_message, KafkaProtocol
-)
+from kafka.protocol import create_message
class TestKafkaClient(unittest2.TestCase):
def test_init_with_list(self):
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index 98f2473..b5bcb22 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -1,13 +1,11 @@
import os
-import random
import socket
-import time
import unittest2
import kafka
from kafka.common import *
-from fixtures import ZookeeperFixture, KafkaFixture
-from testutil import *
+from test.fixtures import ZookeeperFixture, KafkaFixture
+from test.testutil import *
class TestKafkaClientIntegration(KafkaIntegrationTestCase):
@classmethod
@@ -34,7 +32,7 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
with Timer() as t:
with self.assertRaises((socket.timeout, socket.error)):
- conn = kafka.conn.KafkaConnection("localhost", server_port, 1.0)
+ kafka.conn.KafkaConnection("localhost", server_port, 1.0)
self.assertGreaterEqual(t.interval, 1.0)
@kafka_versions("all")
diff --git a/test/test_conn.py b/test/test_conn.py
index 184a99e..5451398 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -24,13 +24,13 @@ class ConnTest(unittest2.TestCase):
self.addCleanup(patcher.stop)
# Also mock socket.sendall() to appear successful
- socket.create_connection().sendall.return_value = None
+ self.MockCreateConn().sendall.return_value = None
# And mock socket.recv() to return two payloads, then '', then raise
# Note that this currently ignores the num_bytes parameter to sock.recv()
payload_size = len(self.config['payload'])
payload2_size = len(self.config['payload2'])
- socket.create_connection().recv.side_effect = [
+ self.MockCreateConn().recv.side_effect = [
struct.pack('>i', payload_size),
struct.pack('>%ds' % payload_size, self.config['payload']),
struct.pack('>i', payload2_size),
@@ -42,7 +42,7 @@ class ConnTest(unittest2.TestCase):
self.conn = KafkaConnection(self.config['host'], self.config['port'])
# Reset any mock counts caused by __init__
- socket.create_connection.reset_mock()
+ self.MockCreateConn.reset_mock()
def test_collect_hosts__happy_path(self):
hosts = "localhost:1234,localhost"
@@ -81,7 +81,7 @@ class ConnTest(unittest2.TestCase):
def test_init_creates_socket_connection(self):
KafkaConnection(self.config['host'], self.config['port'])
- socket.create_connection.assert_called_with((self.config['host'], self.config['port']), DEFAULT_SOCKET_TIMEOUT_SECONDS)
+ self.MockCreateConn.assert_called_with((self.config['host'], self.config['port']), DEFAULT_SOCKET_TIMEOUT_SECONDS)
def test_init_failure_raises_connection_error(self):
@@ -102,9 +102,9 @@ class ConnTest(unittest2.TestCase):
pass
# Now test that sending attempts to reconnect
- self.assertEqual(socket.create_connection.call_count, 0)
+ self.assertEqual(self.MockCreateConn.call_count, 0)
self.conn.send(self.config['request_id'], self.config['payload'])
- self.assertEqual(socket.create_connection.call_count, 1)
+ self.assertEqual(self.MockCreateConn.call_count, 1)
def test_send__failure_sets_dirty_connection(self):
@@ -131,9 +131,9 @@ class ConnTest(unittest2.TestCase):
pass
# Now test that recv'ing attempts to reconnect
- self.assertEqual(socket.create_connection.call_count, 0)
+ self.assertEqual(self.MockCreateConn.call_count, 0)
self.conn.recv(self.config['request_id'])
- self.assertEqual(socket.create_connection.call_count, 1)
+ self.assertEqual(self.MockCreateConn.call_count, 1)
def test_recv__failure_sets_dirty_connection(self):
@@ -160,5 +160,5 @@ class ConnTest(unittest2.TestCase):
# will re-connect and send data to the socket
self.conn.close()
self.conn.send(self.config['request_id'], self.config['payload'])
- self.assertEqual(socket.create_connection.call_count, 1)
+ self.assertEqual(self.MockCreateConn.call_count, 1)
self.conn._sock.sendall.assert_called_with(self.config['payload'])
diff --git a/test/testutil.py b/test/testutil.py
index 4f5f6ee..dc8eea0 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -48,6 +48,7 @@ def get_open_port():
class KafkaIntegrationTestCase(unittest2.TestCase):
create_client = True
topic = None
+ server = None
def setUp(self):
super(KafkaIntegrationTestCase, self).setUp()
diff --git a/tox.ini b/tox.ini
index 09ec1e3..068118b 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,5 +1,5 @@
[tox]
-envlist = py26, py27, pypy
+envlist = lint, py26, py27, pypy
[testenv]
deps =
unittest2
@@ -11,3 +11,9 @@ commands =
nosetests {posargs:-v --with-id --with-coverage --cover-erase --cover-package kafka}
setenv =
PROJECT_ROOT = {toxinidir}
+[testenv:lint]
+deps =
+ unittest2
+ mock
+ pylint
+commands = pylint {posargs: -E --ignore=queue.py kafka test}