diff options
-rw-r--r-- | test/test_conn.py | 143 |
1 files changed, 45 insertions, 98 deletions
diff --git a/test/test_conn.py b/test/test_conn.py index 720435e..6213440 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -11,6 +11,13 @@ from kafka.conn import * class ConnTest(unittest2.TestCase): def setUp(self): + self.config = { + 'host': 'localhost', + 'port': 9090, + 'request_id': 0, + 'payload': 'test data' + } + # Mocking socket.create_connection will cause _sock to always be a # MagicMock() patcher = mock.patch('socket.create_connection', spec=True) @@ -20,6 +27,9 @@ class ConnTest(unittest2.TestCase): self.MockCreateConn().sendall.return_value = None self.addCleanup(patcher.stop) + self.conn = KafkaConnection(self.config['host'], self.config['port']) + socket.create_connection.reset_mock() + def test_collect_hosts__happy_path(self): hosts = "localhost:1234,localhost" results = collect_hosts(hosts) @@ -52,35 +62,14 @@ class ConnTest(unittest2.TestCase): ])) def test_send(self): - fake_config = { - 'host': 'localhost', - 'port': 9090, - 'request_id': 0, - 'payload': 'test data' - } - - assert socket.create_connection is self.MockCreateConn - conn = KafkaConnection(fake_config['host'], fake_config['port']) - socket.create_connection.reset_mock() - conn.send(fake_config['request_id'], fake_config['payload']) - conn._sock.sendall.assert_called_with(fake_config['payload']) + self.conn.send(self.config['request_id'], self.config['payload']) + self.conn._sock.sendall.assert_called_with(self.config['payload']) def test_init_creates_socket_connection(self): - fake_config = { - 'host': 'localhost', - 'port': 9090, - } - - assert socket.create_connection is self.MockCreateConn - socket.create_connection.reset_mock() - KafkaConnection(fake_config['host'], fake_config['port']) - socket.create_connection.assert_called_with((fake_config['host'], fake_config['port']), DEFAULT_SOCKET_TIMEOUT_SECONDS) + KafkaConnection(self.config['host'], self.config['port']) + socket.create_connection.assert_called_with((self.config['host'], self.config['port']), DEFAULT_SOCKET_TIMEOUT_SECONDS) def test_init_failure_raises_connection_error(self): - fake_config = { - 'host': 'localhost', - 'port': 9090, - } def raise_error(*args): raise socket.error @@ -88,132 +77,90 @@ class ConnTest(unittest2.TestCase): assert socket.create_connection is self.MockCreateConn socket.create_connection.side_effect=raise_error with self.assertRaises(ConnectionError): - KafkaConnection(fake_config['host'], fake_config['port']) + KafkaConnection(self.config['host'], self.config['port']) def test_send__reconnects_on_dirty_conn(self): - fake_config = { - 'host': 'localhost', - 'port': 9090, - 'request_id': 0, - 'payload': 'test data' - } - # Get a connection (with socket mocked) - assert socket.create_connection is self.MockCreateConn - conn = KafkaConnection(fake_config['host'], fake_config['port']) - - # Dirty it + # Dirty the connection + assert self.conn._dirty is False try: - conn._raise_connection_error() + self.conn._raise_connection_error() except ConnectionError: pass - - # Reset the socket call counts - socket.create_connection.reset_mock() - self.assertEqual(socket.create_connection.call_count, 0) + assert self.conn._dirty is True # Now test that sending attempts to reconnect - conn.send(fake_config['request_id'], fake_config['payload']) + self.assertEqual(socket.create_connection.call_count, 0) + self.conn.send(self.config['request_id'], self.config['payload']) self.assertEqual(socket.create_connection.call_count, 1) # A second way to dirty it... - conn.close() + self.conn.close() # Reset the socket call counts socket.create_connection.reset_mock() self.assertEqual(socket.create_connection.call_count, 0) # Now test that sending attempts to reconnect - conn.send(fake_config['request_id'], fake_config['payload']) + self.conn.send(self.config['request_id'], self.config['payload']) self.assertEqual(socket.create_connection.call_count, 1) - def test_send__failure_sets_dirty_connection(self): - fake_config = { - 'host': 'localhost', - 'port': 9090, - 'request_id': 0, - 'payload': 'test data' - } def raise_error(*args): raise socket.error - # Get a connection (with socket mocked) - assert socket.create_connection is self.MockCreateConn - conn = KafkaConnection(fake_config['host'], fake_config['port']) + assert self.conn._dirty is False - assert isinstance(conn._sock, mock.Mock) - conn._sock.sendall.side_effect=raise_error + assert isinstance(self.conn._sock, mock.Mock) + self.conn._sock.sendall.side_effect=raise_error try: - conn.send(fake_config['request_id'], fake_config['payload']) + self.conn.send(self.config['request_id'], self.config['payload']) except ConnectionError: - self.assertEquals(conn._dirty, True) + self.assertEquals(self.conn._dirty, True) def test_recv(self): - fake_config = { - 'host': 'localhost', - 'port': 9090, - 'request_id': 0, - 'payload': 'some test data', - } - - # Get a connection - assert socket.create_connection is self.MockCreateConn - conn = KafkaConnection(fake_config['host'], fake_config['port']) # A function to mock _read_bytes - conn._mock_sent_size = False - conn._mock_data_sent = 0 + self.conn._mock_sent_size = False + self.conn._mock_data_sent = 0 def mock_socket_recv(num_bytes): - if not conn._mock_sent_size: + if not self.conn._mock_sent_size: assert num_bytes == 4 - conn._mock_sent_size = True - return struct.pack('>i', len(fake_config['payload'])) + self.conn._mock_sent_size = True + return struct.pack('>i', len(self.config['payload'])) - recv_data = struct.pack('>%ds' % num_bytes, fake_config['payload'][conn._mock_data_sent:conn._mock_data_sent+num_bytes]) - conn._mock_data_sent += num_bytes + recv_data = struct.pack('>%ds' % num_bytes, self.config['payload'][self.conn._mock_data_sent:self.conn._mock_data_sent+num_bytes]) + self.conn._mock_data_sent += num_bytes return recv_data - with mock.patch.object(conn, '_read_bytes', new=mock_socket_recv): - self.assertEquals(conn.recv(fake_config['request_id']), fake_config['payload']) + with mock.patch.object(self.conn, '_read_bytes', new=mock_socket_recv): + self.assertEquals(self.conn.recv(self.config['request_id']), self.config['payload']) def test_recv__reconnects_on_dirty_conn(self): - fake_config = { - 'host': 'localhost', - 'port': 9090, - 'request_id': 0, - 'payload': 'some test data', - } - # Get a connection - assert socket.create_connection is self.MockCreateConn - conn = KafkaConnection(fake_config['host'], fake_config['port']) - - # Dirty it + # Dirty the connection try: - conn._raise_connection_error() + self.conn._raise_connection_error() except ConnectionError: pass - - # Reset the socket call counts - socket.create_connection.reset_mock() - self.assertEqual(socket.create_connection.call_count, 0) + assert self.conn._dirty is True # Now test that recv'ing attempts to reconnect - conn._sock.recv.return_value = fake_config['payload'] - conn._read_bytes(len(fake_config['payload'])) + self.assertEqual(socket.create_connection.call_count, 0) + self.conn._sock.recv.return_value = self.config['payload'] + self.conn._read_bytes(len(self.config['payload'])) self.assertEqual(socket.create_connection.call_count, 1) # A second way to dirty it... - conn.close() + self.conn.close() # Reset the socket call counts socket.create_connection.reset_mock() self.assertEqual(socket.create_connection.call_count, 0) # Now test that recv'ing attempts to reconnect - conn._read_bytes(len(fake_config['payload'])) + self.conn._read_bytes(len(self.config['payload'])) self.assertEqual(socket.create_connection.call_count, 1) @unittest2.skip("Not Implemented") |