summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-01 01:24:35 -0700
committerDana Powers <dana.powers@rd.io>2014-09-01 01:24:35 -0700
commit1954f385d15f8d2d8608b13f2c5b175db37d38f8 (patch)
treec2aff041cf7371ec9e34c06ccfc8b5594259fa08
parent8ca55d6a7458c593e307879f285f6d7f53a8219b (diff)
downloadkafka-python-1954f385d15f8d2d8608b13f2c5b175db37d38f8.tar.gz
Fix socket timeout test -- mock the side_effect
-rw-r--r--test/test_client.py20
-rw-r--r--test/test_client_integration.py11
2 files changed, 19 insertions, 12 deletions
diff --git a/test/test_client.py b/test/test_client.py
index 32a2256..6a80bbb 100644
--- a/test/test_client.py
+++ b/test/test_client.py
@@ -1,3 +1,5 @@
+import socket
+from time import sleep
import unittest2
from mock import MagicMock, patch
@@ -6,10 +8,14 @@ from kafka import KafkaClient
from kafka.common import (
ProduceRequest, BrokerMetadata, PartitionMetadata,
TopicAndPartition, KafkaUnavailableError,
- LeaderUnavailableError, PartitionUnavailableError
+ LeaderUnavailableError, PartitionUnavailableError,
+ ConnectionError
)
+from kafka.conn import KafkaConnection
from kafka.protocol import create_message
+from test.testutil import Timer
+
class TestKafkaClient(unittest2.TestCase):
def test_init_with_list(self):
with patch.object(KafkaClient, 'load_metadata_for_topics'):
@@ -242,3 +248,15 @@ class TestKafkaClient(unittest2.TestCase):
with self.assertRaises(LeaderUnavailableError):
client.send_produce_request(requests)
+ def test_timeout(self):
+ def _timeout(*args, **kwargs):
+ timeout = args[1]
+ sleep(timeout)
+ raise socket.timeout
+
+ with patch.object(socket, "create_connection", side_effect=_timeout):
+
+ with Timer() as t:
+ with self.assertRaises(ConnectionError):
+ KafkaConnection("nowhere", 1234, 1.0)
+ self.assertGreaterEqual(t.interval, 1.0)
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index b5bcb22..b33d17c 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -24,17 +24,6 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
cls.server.close()
cls.zk.close()
- @unittest2.skip("This doesn't appear to work on Linux?")
- def test_timeout(self):
- server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- server_port = get_open_port()
- server_socket.bind(('localhost', server_port))
-
- with Timer() as t:
- with self.assertRaises((socket.timeout, socket.error)):
- kafka.conn.KafkaConnection("localhost", server_port, 1.0)
- self.assertGreaterEqual(t.interval, 1.0)
-
@kafka_versions("all")
def test_consume_none(self):
fetch = FetchRequest(self.topic, 0, 0, 1024)