summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--test/test_conn.py88
1 files changed, 82 insertions, 6 deletions
diff --git a/test/test_conn.py b/test/test_conn.py
index 8534137..720435e 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -128,17 +128,93 @@ class ConnTest(unittest2.TestCase):
self.assertEqual(socket.create_connection.call_count, 1)
- @unittest2.skip("Not Implemented")
def test_send__failure_sets_dirty_connection(self):
- pass
+ fake_config = {
+ 'host': 'localhost',
+ 'port': 9090,
+ 'request_id': 0,
+ 'payload': 'test data'
+ }
+
+ def raise_error(*args):
+ raise socket.error
+
+ # Get a connection (with socket mocked)
+ assert socket.create_connection is self.MockCreateConn
+ conn = KafkaConnection(fake_config['host'], fake_config['port'])
+
+ assert isinstance(conn._sock, mock.Mock)
+ conn._sock.sendall.side_effect=raise_error
+ try:
+ conn.send(fake_config['request_id'], fake_config['payload'])
+ except ConnectionError:
+ self.assertEquals(conn._dirty, True)
- @unittest2.skip("Not Implemented")
def test_recv(self):
- pass
+ fake_config = {
+ 'host': 'localhost',
+ 'port': 9090,
+ 'request_id': 0,
+ 'payload': 'some test data',
+ }
+
+ # Get a connection
+ assert socket.create_connection is self.MockCreateConn
+ conn = KafkaConnection(fake_config['host'], fake_config['port'])
+
+ # A function to mock _read_bytes
+ conn._mock_sent_size = False
+ conn._mock_data_sent = 0
+ def mock_socket_recv(num_bytes):
+ if not conn._mock_sent_size:
+ assert num_bytes == 4
+ conn._mock_sent_size = True
+ return struct.pack('>i', len(fake_config['payload']))
+
+ recv_data = struct.pack('>%ds' % num_bytes, fake_config['payload'][conn._mock_data_sent:conn._mock_data_sent+num_bytes])
+ conn._mock_data_sent += num_bytes
+ return recv_data
+
+ with mock.patch.object(conn, '_read_bytes', new=mock_socket_recv):
+ self.assertEquals(conn.recv(fake_config['request_id']), fake_config['payload'])
- @unittest2.skip("Not Implemented")
def test_recv__reconnects_on_dirty_conn(self):
- pass
+ fake_config = {
+ 'host': 'localhost',
+ 'port': 9090,
+ 'request_id': 0,
+ 'payload': 'some test data',
+ }
+
+ # Get a connection
+ assert socket.create_connection is self.MockCreateConn
+ conn = KafkaConnection(fake_config['host'], fake_config['port'])
+
+ # Dirty it
+ try:
+ conn._raise_connection_error()
+ except ConnectionError:
+ pass
+
+ # Reset the socket call counts
+ socket.create_connection.reset_mock()
+ self.assertEqual(socket.create_connection.call_count, 0)
+
+ # Now test that recv'ing attempts to reconnect
+ conn._sock.recv.return_value = fake_config['payload']
+ conn._read_bytes(len(fake_config['payload']))
+ self.assertEqual(socket.create_connection.call_count, 1)
+
+ # A second way to dirty it...
+ conn.close()
+
+ # Reset the socket call counts
+ socket.create_connection.reset_mock()
+ self.assertEqual(socket.create_connection.call_count, 0)
+
+ # Now test that recv'ing attempts to reconnect
+ conn._read_bytes(len(fake_config['payload']))
+ self.assertEqual(socket.create_connection.call_count, 1)
@unittest2.skip("Not Implemented")
def test_recv__failure_sets_dirty_connection(self):