diff options
author | Dana Powers <dana.powers@rd.io> | 2014-08-25 10:15:16 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-08-25 12:54:54 -0700 |
commit | e151529078d53deca6254dee5b80e41e01226a7f (patch) | |
tree | 488ff0fb3a2ed26f52aa9f4b471ff3ff7060a938 | |
parent | d73d1690b16ea86a9fd51056f4d149f54a4dd8f0 (diff) | |
download | kafka-python-e151529078d53deca6254dee5b80e41e01226a7f.tar.gz |
Add pylint to tox.ini; test both kafka and test; default to error-checking only; fixup errors; skip kafka/queue.py
-rw-r--r-- | .travis.yml | 4 | ||||
-rw-r--r-- | kafka/client.py | 2 | ||||
-rw-r--r-- | kafka/conn.py | 6 | ||||
-rw-r--r-- | test/fixtures.py | 49 | ||||
-rw-r--r-- | test/service.py | 9 | ||||
-rw-r--r-- | test/test_client.py | 7 | ||||
-rw-r--r-- | test/test_client_integration.py | 8 | ||||
-rw-r--r-- | test/test_conn.py | 18 | ||||
-rw-r--r-- | test/testutil.py | 1 | ||||
-rw-r--r-- | tox.ini | 8 |
10 files changed, 54 insertions, 58 deletions
diff --git a/.travis.yml b/.travis.yml index 8eaaad6..b36670c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,7 +6,7 @@ python: - pypy env: - - + - UNIT_AND_LINT_ONLY=true - KAFKA_VERSION=0.8.0 - KAFKA_VERSION=0.8.1 - KAFKA_VERSION=0.8.1.1 @@ -35,4 +35,4 @@ deploy: # branch: master script: - - tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION` + - if [ -n "$UNIT_AND_LINT_ONLY" ]; then tox -e lint,`./travis_selector.sh $TRAVIS_PYTHON_VERSION`; else tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`; fi diff --git a/kafka/client.py b/kafka/client.py index 9474091..8630f66 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -187,7 +187,7 @@ class KafkaClient(object): def _raise_on_response_error(self, resp): try: kafka.common.check_error(resp) - except (UnknownTopicOrPartitionError, NotLeaderForPartitionError) as e: + except (UnknownTopicOrPartitionError, NotLeaderForPartitionError): self.reset_topic_metadata(resp.topic) raise diff --git a/kafka/conn.py b/kafka/conn.py index 0d17cb8..a1b0a80 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -93,8 +93,8 @@ class KafkaConnection(local): # that the socket is in error. we will never get # more data from this socket if data == '': - raise socket.error('Not enough data to read message -- did server kill socket?') - + raise socket.error("Not enough data to read message -- did server kill socket?") + except socket.error: log.exception('Unable to receive data from Kafka') self._raise_connection_error() @@ -170,7 +170,7 @@ class KafkaConnection(local): except socket.error: pass - # Closing the socket should always succeed + # Closing the socket should always succeed self._sock.close() self._sock = None else: diff --git a/test/fixtures.py b/test/fixtures.py index adb0642..152777c 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -1,5 +1,4 @@ import logging -import glob import os import os.path import shutil @@ -9,8 +8,8 @@ import urllib2 import uuid from urlparse import urlparse -from service import ExternalService, SpawnedService -from testutil import get_open_port +from test.service import ExternalService, SpawnedService +from test.testutil import get_open_port class Fixture(object): kafka_version = os.environ.get('KAFKA_VERSION', '0.8.0') @@ -36,23 +35,23 @@ class Fixture(object): output_file = os.path.join(output_dir, distfile + '.tgz') if os.path.isfile(output_file): - logging.info("Found file already on disk: %s" % output_file) + logging.info("Found file already on disk: %s", output_file) return output_file # New tarballs are .tgz, older ones are sometimes .tar.gz try: url = url_base + distfile + '.tgz' - logging.info("Attempting to download %s" % (url,)) + logging.info("Attempting to download %s", url) response = urllib2.urlopen(url) except urllib2.HTTPError: logging.exception("HTTP Error") url = url_base + distfile + '.tar.gz' - logging.info("Attempting to download %s" % (url,)) + logging.info("Attempting to download %s", url) response = urllib2.urlopen(url) - logging.info("Saving distribution file to %s" % (output_file,)) - with open(os.path.join(output_dir, distfile + '.tgz'), 'w') as f: - f.write(response.read()) + logging.info("Saving distribution file to %s", output_file) + with open(output_file, 'w') as output_file_fd: + output_file_fd.write(response.read()) return output_file @@ -117,11 +116,9 @@ class ZookeeperFixture(Fixture): self.render_template(template, properties, vars(self)) # Configure Zookeeper child process - self.child = SpawnedService(args=self.kafka_run_class_args( - "org.apache.zookeeper.server.quorum.QuorumPeerMain", - properties), - env=self.kafka_run_class_env() - ) + args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties) + env = self.kafka_run_class_env() + self.child = SpawnedService(args, env) # Party! self.out("Starting...") @@ -162,7 +159,7 @@ class KafkaFixture(Fixture): self.zk_port = zk_port self.zk_chroot = zk_chroot - self.replicas = replicas + self.replicas = replicas self.partitions = partitions self.tmp_dir = None @@ -199,21 +196,19 @@ class KafkaFixture(Fixture): self.render_template(template, properties, vars(self)) # Configure Kafka child process - self.child = SpawnedService(args=self.kafka_run_class_args( - "kafka.Kafka", properties), - env=self.kafka_run_class_env() - ) + args = self.kafka_run_class_args("kafka.Kafka", properties) + env = self.kafka_run_class_env() + self.child = SpawnedService(args, env) # Party! self.out("Creating Zookeeper chroot node...") - proc = subprocess.Popen(self.kafka_run_class_args( - "org.apache.zookeeper.ZooKeeperMain", - "-server", "%s:%d" % (self.zk_host, self.zk_port), - "create", "/%s" % self.zk_chroot, "kafka-python" - ), - env=self.kafka_run_class_env(), - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain", + "-server", "%s:%d" % (self.zk_host, self.zk_port), + "create", + "/%s" % self.zk_chroot, + "kafka-python") + env = self.kafka_run_class_env() + proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if proc.wait() != 0: self.out("Failed to create Zookeeper chroot node") diff --git a/test/service.py b/test/service.py index df6c1ba..2f66120 100644 --- a/test/service.py +++ b/test/service.py @@ -2,7 +2,6 @@ import logging import re import select import subprocess -import sys import threading import time @@ -14,7 +13,7 @@ __all__ = [ class ExternalService(object): def __init__(self, host, port): - print("Using already running service at %s:%d" % (host, port)) + logging.info("Using already running service at %s:%d", host, port) self.host = host self.port = port @@ -26,9 +25,11 @@ class ExternalService(object): class SpawnedService(threading.Thread): - def __init__(self, args=[], env=None): + def __init__(self, args=None, env=None): threading.Thread.__init__(self) + if args is None: + raise TypeError("args parameter is required") self.args = args self.env = env self.captured_stdout = [] @@ -49,7 +50,7 @@ class SpawnedService(threading.Thread): alive = True while True: - (rds, wds, xds) = select.select([self.child.stdout, self.child.stderr], [], [], 1) + (rds, _, _) = select.select([self.child.stdout, self.child.stderr], [], [], 1) if self.child.stdout in rds: line = self.child.stdout.readline() diff --git a/test/test_client.py b/test/test_client.py index fe9beff..32a2256 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -1,6 +1,3 @@ -import os -import random -import struct import unittest2 from mock import MagicMock, patch @@ -11,9 +8,7 @@ from kafka.common import ( TopicAndPartition, KafkaUnavailableError, LeaderUnavailableError, PartitionUnavailableError ) -from kafka.protocol import ( - create_message, KafkaProtocol -) +from kafka.protocol import create_message class TestKafkaClient(unittest2.TestCase): def test_init_with_list(self): diff --git a/test/test_client_integration.py b/test/test_client_integration.py index 98f2473..b5bcb22 100644 --- a/test/test_client_integration.py +++ b/test/test_client_integration.py @@ -1,13 +1,11 @@ import os -import random import socket -import time import unittest2 import kafka from kafka.common import * -from fixtures import ZookeeperFixture, KafkaFixture -from testutil import * +from test.fixtures import ZookeeperFixture, KafkaFixture +from test.testutil import * class TestKafkaClientIntegration(KafkaIntegrationTestCase): @classmethod @@ -34,7 +32,7 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase): with Timer() as t: with self.assertRaises((socket.timeout, socket.error)): - conn = kafka.conn.KafkaConnection("localhost", server_port, 1.0) + kafka.conn.KafkaConnection("localhost", server_port, 1.0) self.assertGreaterEqual(t.interval, 1.0) @kafka_versions("all") diff --git a/test/test_conn.py b/test/test_conn.py index 184a99e..5451398 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -24,13 +24,13 @@ class ConnTest(unittest2.TestCase): self.addCleanup(patcher.stop) # Also mock socket.sendall() to appear successful - socket.create_connection().sendall.return_value = None + self.MockCreateConn().sendall.return_value = None # And mock socket.recv() to return two payloads, then '', then raise # Note that this currently ignores the num_bytes parameter to sock.recv() payload_size = len(self.config['payload']) payload2_size = len(self.config['payload2']) - socket.create_connection().recv.side_effect = [ + self.MockCreateConn().recv.side_effect = [ struct.pack('>i', payload_size), struct.pack('>%ds' % payload_size, self.config['payload']), struct.pack('>i', payload2_size), @@ -42,7 +42,7 @@ class ConnTest(unittest2.TestCase): self.conn = KafkaConnection(self.config['host'], self.config['port']) # Reset any mock counts caused by __init__ - socket.create_connection.reset_mock() + self.MockCreateConn.reset_mock() def test_collect_hosts__happy_path(self): hosts = "localhost:1234,localhost" @@ -81,7 +81,7 @@ class ConnTest(unittest2.TestCase): def test_init_creates_socket_connection(self): KafkaConnection(self.config['host'], self.config['port']) - socket.create_connection.assert_called_with((self.config['host'], self.config['port']), DEFAULT_SOCKET_TIMEOUT_SECONDS) + self.MockCreateConn.assert_called_with((self.config['host'], self.config['port']), DEFAULT_SOCKET_TIMEOUT_SECONDS) def test_init_failure_raises_connection_error(self): @@ -102,9 +102,9 @@ class ConnTest(unittest2.TestCase): pass # Now test that sending attempts to reconnect - self.assertEqual(socket.create_connection.call_count, 0) + self.assertEqual(self.MockCreateConn.call_count, 0) self.conn.send(self.config['request_id'], self.config['payload']) - self.assertEqual(socket.create_connection.call_count, 1) + self.assertEqual(self.MockCreateConn.call_count, 1) def test_send__failure_sets_dirty_connection(self): @@ -131,9 +131,9 @@ class ConnTest(unittest2.TestCase): pass # Now test that recv'ing attempts to reconnect - self.assertEqual(socket.create_connection.call_count, 0) + self.assertEqual(self.MockCreateConn.call_count, 0) self.conn.recv(self.config['request_id']) - self.assertEqual(socket.create_connection.call_count, 1) + self.assertEqual(self.MockCreateConn.call_count, 1) def test_recv__failure_sets_dirty_connection(self): @@ -160,5 +160,5 @@ class ConnTest(unittest2.TestCase): # will re-connect and send data to the socket self.conn.close() self.conn.send(self.config['request_id'], self.config['payload']) - self.assertEqual(socket.create_connection.call_count, 1) + self.assertEqual(self.MockCreateConn.call_count, 1) self.conn._sock.sendall.assert_called_with(self.config['payload']) diff --git a/test/testutil.py b/test/testutil.py index 4f5f6ee..dc8eea0 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -48,6 +48,7 @@ def get_open_port(): class KafkaIntegrationTestCase(unittest2.TestCase): create_client = True topic = None + server = None def setUp(self): super(KafkaIntegrationTestCase, self).setUp() @@ -1,5 +1,5 @@ [tox] -envlist = py26, py27, pypy +envlist = lint, py26, py27, pypy [testenv] deps = unittest2 @@ -11,3 +11,9 @@ commands = nosetests {posargs:-v --with-id --with-coverage --cover-erase --cover-package kafka} setenv = PROJECT_ROOT = {toxinidir} +[testenv:lint] +deps = + unittest2 + mock + pylint +commands = pylint {posargs: -E --ignore=queue.py kafka test} |