diff options
Diffstat (limited to 'test/test_conn.py')
-rw-r--r-- | test/test_conn.py | 139 |
1 files changed, 117 insertions, 22 deletions
diff --git a/test/test_conn.py b/test/test_conn.py index 4ab6d4f..184a99e 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -1,13 +1,52 @@ -import os -import random +import socket import struct + +import mock import unittest2 -import kafka.conn + +from kafka.common import * +from kafka.conn import * class ConnTest(unittest2.TestCase): + def setUp(self): + self.config = { + 'host': 'localhost', + 'port': 9090, + 'request_id': 0, + 'payload': 'test data', + 'payload2': 'another packet' + } + + # Mocking socket.create_connection will cause _sock to always be a + # MagicMock() + patcher = mock.patch('socket.create_connection', spec=True) + self.MockCreateConn = patcher.start() + self.addCleanup(patcher.stop) + + # Also mock socket.sendall() to appear successful + socket.create_connection().sendall.return_value = None + + # And mock socket.recv() to return two payloads, then '', then raise + # Note that this currently ignores the num_bytes parameter to sock.recv() + payload_size = len(self.config['payload']) + payload2_size = len(self.config['payload2']) + socket.create_connection().recv.side_effect = [ + struct.pack('>i', payload_size), + struct.pack('>%ds' % payload_size, self.config['payload']), + struct.pack('>i', payload2_size), + struct.pack('>%ds' % payload2_size, self.config['payload2']), + '' + ] + + # Create a connection object + self.conn = KafkaConnection(self.config['host'], self.config['port']) + + # Reset any mock counts caused by __init__ + socket.create_connection.reset_mock() + def test_collect_hosts__happy_path(self): hosts = "localhost:1234,localhost" - results = kafka.conn.collect_hosts(hosts) + results = collect_hosts(hosts) self.assertEqual(set(results), set([ ('localhost', 1234), @@ -20,7 +59,7 @@ class ConnTest(unittest2.TestCase): 'localhost', ] - results = kafka.conn.collect_hosts(hosts) + results = collect_hosts(hosts) self.assertEqual(set(results), set([ ('localhost', 1234), @@ -29,41 +68,97 @@ class ConnTest(unittest2.TestCase): def test_collect_hosts__with_spaces(self): hosts = "localhost:1234, localhost" - results = kafka.conn.collect_hosts(hosts) + results = collect_hosts(hosts) self.assertEqual(set(results), set([ ('localhost', 1234), ('localhost', 9092), ])) - @unittest2.skip("Not Implemented") def test_send(self): - pass + self.conn.send(self.config['request_id'], self.config['payload']) + self.conn._sock.sendall.assert_called_with(self.config['payload']) + + def test_init_creates_socket_connection(self): + KafkaConnection(self.config['host'], self.config['port']) + socket.create_connection.assert_called_with((self.config['host'], self.config['port']), DEFAULT_SOCKET_TIMEOUT_SECONDS) + + def test_init_failure_raises_connection_error(self): + + def raise_error(*args): + raise socket.error + + assert socket.create_connection is self.MockCreateConn + socket.create_connection.side_effect=raise_error + with self.assertRaises(ConnectionError): + KafkaConnection(self.config['host'], self.config['port']) - @unittest2.skip("Not Implemented") def test_send__reconnects_on_dirty_conn(self): - pass - @unittest2.skip("Not Implemented") + # Dirty the connection + try: + self.conn._raise_connection_error() + except ConnectionError: + pass + + # Now test that sending attempts to reconnect + self.assertEqual(socket.create_connection.call_count, 0) + self.conn.send(self.config['request_id'], self.config['payload']) + self.assertEqual(socket.create_connection.call_count, 1) + def test_send__failure_sets_dirty_connection(self): - pass - @unittest2.skip("Not Implemented") + def raise_error(*args): + raise socket.error + + assert isinstance(self.conn._sock, mock.Mock) + self.conn._sock.sendall.side_effect=raise_error + try: + self.conn.send(self.config['request_id'], self.config['payload']) + except ConnectionError: + self.assertIsNone(self.conn._sock) + def test_recv(self): - pass - @unittest2.skip("Not Implemented") + self.assertEquals(self.conn.recv(self.config['request_id']), self.config['payload']) + def test_recv__reconnects_on_dirty_conn(self): - pass - @unittest2.skip("Not Implemented") + # Dirty the connection + try: + self.conn._raise_connection_error() + except ConnectionError: + pass + + # Now test that recv'ing attempts to reconnect + self.assertEqual(socket.create_connection.call_count, 0) + self.conn.recv(self.config['request_id']) + self.assertEqual(socket.create_connection.call_count, 1) + def test_recv__failure_sets_dirty_connection(self): - pass - @unittest2.skip("Not Implemented") + def raise_error(*args): + raise socket.error + + # test that recv'ing attempts to reconnect + assert isinstance(self.conn._sock, mock.Mock) + self.conn._sock.recv.side_effect=raise_error + try: + self.conn.recv(self.config['request_id']) + except ConnectionError: + self.assertIsNone(self.conn._sock) + def test_recv__doesnt_consume_extra_data_in_stream(self): - pass - @unittest2.skip("Not Implemented") + # Here just test that each call to recv will return a single payload + self.assertEquals(self.conn.recv(self.config['request_id']), self.config['payload']) + self.assertEquals(self.conn.recv(self.config['request_id']), self.config['payload2']) + def test_close__object_is_reusable(self): - pass + + # test that sending to a closed connection + # will re-connect and send data to the socket + self.conn.close() + self.conn.send(self.config['request_id'], self.config['payload']) + self.assertEqual(socket.create_connection.call_count, 1) + self.conn._sock.sendall.assert_called_with(self.config['payload']) |