diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-01 01:24:35 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-09-01 01:24:35 -0700 |
commit | 1954f385d15f8d2d8608b13f2c5b175db37d38f8 (patch) | |
tree | c2aff041cf7371ec9e34c06ccfc8b5594259fa08 | |
parent | 8ca55d6a7458c593e307879f285f6d7f53a8219b (diff) | |
download | kafka-python-1954f385d15f8d2d8608b13f2c5b175db37d38f8.tar.gz |
Fix socket timeout test -- mock the side_effect
-rw-r--r-- | test/test_client.py | 20 | ||||
-rw-r--r-- | test/test_client_integration.py | 11 |
2 files changed, 19 insertions, 12 deletions
diff --git a/test/test_client.py b/test/test_client.py index 32a2256..6a80bbb 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -1,3 +1,5 @@ +import socket +from time import sleep import unittest2 from mock import MagicMock, patch @@ -6,10 +8,14 @@ from kafka import KafkaClient from kafka.common import ( ProduceRequest, BrokerMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError, - LeaderUnavailableError, PartitionUnavailableError + LeaderUnavailableError, PartitionUnavailableError, + ConnectionError ) +from kafka.conn import KafkaConnection from kafka.protocol import create_message +from test.testutil import Timer + class TestKafkaClient(unittest2.TestCase): def test_init_with_list(self): with patch.object(KafkaClient, 'load_metadata_for_topics'): @@ -242,3 +248,15 @@ class TestKafkaClient(unittest2.TestCase): with self.assertRaises(LeaderUnavailableError): client.send_produce_request(requests) + def test_timeout(self): + def _timeout(*args, **kwargs): + timeout = args[1] + sleep(timeout) + raise socket.timeout + + with patch.object(socket, "create_connection", side_effect=_timeout): + + with Timer() as t: + with self.assertRaises(ConnectionError): + KafkaConnection("nowhere", 1234, 1.0) + self.assertGreaterEqual(t.interval, 1.0) diff --git a/test/test_client_integration.py b/test/test_client_integration.py index b5bcb22..b33d17c 100644 --- a/test/test_client_integration.py +++ b/test/test_client_integration.py @@ -24,17 +24,6 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase): cls.server.close() cls.zk.close() - @unittest2.skip("This doesn't appear to work on Linux?") - def test_timeout(self): - server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server_port = get_open_port() - server_socket.bind(('localhost', server_port)) - - with Timer() as t: - with self.assertRaises((socket.timeout, socket.error)): - kafka.conn.KafkaConnection("localhost", server_port, 1.0) - self.assertGreaterEqual(t.interval, 1.0) - @kafka_versions("all") def test_consume_none(self): fetch = FetchRequest(self.topic, 0, 0, 1024) |