diff options
author | Joel Wright <joel.wright@sohonet.com> | 2014-04-04 21:13:01 +0200 |
---|---|---|
committer | Joel Wright <joel.wright@sohonet.com> | 2014-08-26 14:14:21 +0200 |
commit | 24673f8d19fe2f48964f528369081c37e880ec47 (patch) | |
tree | 8cd0cabfc9b8d858339da556fe561674dd8bc83a /tests/unit/test_multithreading.py | |
parent | d97ec374cb1ef91c34e49302842e5a151ee3e476 (diff) | |
download | python-swiftclient-24673f8d19fe2f48964f528369081c37e880ec47.tar.gz |
Add importable SwiftService incorporating shell.py logic
This patch adds a SwiftService class that incorporates the high
level logic from swiftclient/shell.py. It also ports shell.py to
use the new class, and updates the code in swiftclient/multithreading.py
to allow the SwiftService to be used for multiple operations whilst
using only one thread pool.
Currently, code that imports swiftclient has to have its own logic for
things like creating large objects, parallel uploads, and parallel
downloads. This patch adds a SwiftService class that makes that
functionality available in Python code as well as through the shell.
Change-Id: I08c5796b4c01001d79fd571651c3017c16462ffd
Implements: blueprint bin-swift-logic-as-importable-library
Diffstat (limited to 'tests/unit/test_multithreading.py')
-rw-r--r-- | tests/unit/test_multithreading.py | 377 |
1 files changed, 134 insertions, 243 deletions
diff --git a/tests/unit/test_multithreading.py b/tests/unit/test_multithreading.py index 1df0d4f..cd2e9a6 100644 --- a/tests/unit/test_multithreading.py +++ b/tests/unit/test_multithreading.py @@ -14,41 +14,40 @@ # limitations under the License. import sys -import time - -try: - from unittest import mock -except ImportError: - import mock - import testtools import threading import six + +from concurrent.futures import as_completed from six.moves.queue import Queue, Empty +from time import sleep from swiftclient import multithreading as mt -from swiftclient.exceptions import ClientException class ThreadTestCase(testtools.TestCase): def setUp(self): super(ThreadTestCase, self).setUp() + self.got_items = Queue() self.got_args_kwargs = Queue() self.starting_thread_count = threading.active_count() - def _func(self, q_item, *args, **kwargs): - self.got_items.put(q_item) + def _func(self, conn, item, *args, **kwargs): + self.got_items.put((conn, item)) self.got_args_kwargs.put((args, kwargs)) - if q_item == 'go boom': + if item == 'sleep': + sleep(1) + if item == 'go boom': raise Exception('I went boom!') - if q_item == 'c boom': - raise ClientException( - 'Client Boom', http_scheme='http', http_host='192.168.22.1', - http_port=80, http_path='/booze', http_status=404, - http_reason='to much', http_response_content='no sir!') - return 'best result EVAR!' + return 'success' + + def _create_conn(self): + return "This is a connection" + + def _create_conn_fail(self): + raise Exception("This is a failed connection") def assertQueueContains(self, queue, expected_contents): got_contents = [] @@ -62,240 +61,125 @@ class ThreadTestCase(testtools.TestCase): self.assertEqual(expected_contents, got_contents) -class TestQueueFunctionThread(ThreadTestCase): +class TestConnectionThreadPoolExecutor(ThreadTestCase): def setUp(self): - super(TestQueueFunctionThread, self).setUp() - + super(TestConnectionThreadPoolExecutor, self).setUp() self.input_queue = Queue() - self.got_items = Queue() self.stored_results = [] - self.qft = mt.QueueFunctionThread(self.input_queue, self._func, - 'one_arg', 'two_arg', - red_fish='blue_arg', - store_results=self.stored_results) - self.qft.start() - def tearDown(self): - if self.qft.is_alive(): - self.finish_up_thread() - - super(TestQueueFunctionThread, self).tearDown() - - def finish_up_thread(self): - self.input_queue.put(mt.StopWorkerThreadSignal()) - while self.qft.is_alive(): - time.sleep(0.05) - - def test_plumbing_and_store_results(self): - self.input_queue.put('abc') - self.input_queue.put(123) - self.finish_up_thread() - - self.assertQueueContains(self.got_items, ['abc', 123]) - self.assertQueueContains(self.got_args_kwargs, [ - (('one_arg', 'two_arg'), {'red_fish': 'blue_arg'}), - (('one_arg', 'two_arg'), {'red_fish': 'blue_arg'})]) - self.assertEqual(self.stored_results, - ['best result EVAR!', 'best result EVAR!']) - - def test_exception_handling(self): - self.input_queue.put('go boom') - self.input_queue.put('ok') - self.input_queue.put('go boom') - self.finish_up_thread() - - self.assertQueueContains(self.got_items, - ['go boom', 'ok', 'go boom']) - self.assertEqual(len(self.qft.exc_infos), 2) - self.assertEqual(Exception, self.qft.exc_infos[0][0]) - self.assertEqual(Exception, self.qft.exc_infos[1][0]) - self.assertEqual(('I went boom!',), self.qft.exc_infos[0][1].args) - self.assertEqual(('I went boom!',), self.qft.exc_infos[1][1].args) - - -class TestQueueFunctionManager(ThreadTestCase): - def setUp(self): - super(TestQueueFunctionManager, self).setUp() - self.thread_manager = mock.create_autospec( - mt.MultiThreadingManager, spec_set=True, instance=True) - self.thread_count = 4 - self.error_counter = [0] - self.got_items = Queue() - self.stored_results = [] - self.qfq = mt.QueueFunctionManager( - self._func, self.thread_count, self.thread_manager, - thread_args=('1arg', '2arg'), - thread_kwargs={'a': 'b', 'store_results': self.stored_results}, - error_counter=self.error_counter, - connection_maker=self.connection_maker) - - def connection_maker(self): - return 'yup, I made a connection' - - def test_context_manager_without_error_counter(self): - self.qfq = mt.QueueFunctionManager( - self._func, self.thread_count, self.thread_manager, - thread_args=('1arg', '2arg'), - thread_kwargs={'a': 'b', 'store_results': self.stored_results}, - connection_maker=self.connection_maker) - - with self.qfq as input_queue: - self.assertEqual(self.starting_thread_count + self.thread_count, - threading.active_count()) - input_queue.put('go boom') - - self.assertEqual(self.starting_thread_count, threading.active_count()) - error_strs = list(map(str, self.thread_manager.error.call_args_list)) - self.assertEqual(1, len(error_strs)) - self.assertTrue('Exception: I went boom!' in error_strs[0]) - - def test_context_manager_without_conn_maker_or_error_counter(self): - self.qfq = mt.QueueFunctionManager( - self._func, self.thread_count, self.thread_manager, - thread_args=('1arg', '2arg'), thread_kwargs={'a': 'b'}) - - with self.qfq as input_queue: - self.assertEqual(self.starting_thread_count + self.thread_count, - threading.active_count()) - for i in range(20): - input_queue.put('slap%d' % i) - - self.assertEqual(self.starting_thread_count, threading.active_count()) - self.assertEqual([], self.thread_manager.error.call_args_list) - self.assertEqual(0, self.error_counter[0]) - self.assertQueueContains(self.got_items, - set(['slap%d' % i for i in range(20)])) - self.assertQueueContains( - self.got_args_kwargs, - [(('1arg', '2arg'), {'a': 'b'})] * 20) - self.assertEqual(self.stored_results, []) - - def test_context_manager_with_exceptions(self): - with self.qfq as input_queue: - self.assertEqual(self.starting_thread_count + self.thread_count, - threading.active_count()) - for i in range(20): - input_queue.put('item%d' % i if i % 2 == 0 else 'go boom') - - self.assertEqual(self.starting_thread_count, threading.active_count()) - error_strs = list(map(str, self.thread_manager.error.call_args_list)) - self.assertEqual(10, len(error_strs)) - self.assertTrue(all(['Exception: I went boom!' in s for s in - error_strs])) - self.assertEqual(10, self.error_counter[0]) - expected_items = set(['go boom'] + - ['item%d' % i for i in range(20) - if i % 2 == 0]) - self.assertQueueContains(self.got_items, expected_items) - self.assertQueueContains( - self.got_args_kwargs, - [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20) - self.assertEqual(self.stored_results, ['best result EVAR!'] * 10) - - def test_context_manager_with_client_exceptions(self): - with self.qfq as input_queue: - self.assertEqual(self.starting_thread_count + self.thread_count, - threading.active_count()) - for i in range(20): - input_queue.put('item%d' % i if i % 2 == 0 else 'c boom') - - self.assertEqual(self.starting_thread_count, threading.active_count()) - error_strs = list(map(str, self.thread_manager.error.call_args_list)) - self.assertEqual(10, len(error_strs)) - stringification = 'Client Boom: ' \ - 'http://192.168.22.1:80/booze 404 to much no sir!' - self.assertTrue(all([stringification in s for s in error_strs])) - self.assertEqual(10, self.error_counter[0]) - expected_items = set(['c boom'] + - ['item%d' % i for i in range(20) - if i % 2 == 0]) - self.assertQueueContains(self.got_items, expected_items) - self.assertQueueContains( - self.got_args_kwargs, - [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20) - self.assertEqual(self.stored_results, ['best result EVAR!'] * 10) - - def test_context_manager_with_connection_maker(self): - with self.qfq as input_queue: - self.assertEqual(self.starting_thread_count + self.thread_count, - threading.active_count()) - for i in range(20): - input_queue.put('item%d' % i) - - self.assertEqual(self.starting_thread_count, threading.active_count()) - self.assertEqual([], self.thread_manager.error.call_args_list) - self.assertEqual(0, self.error_counter[0]) - self.assertQueueContains(self.got_items, - set(['item%d' % i for i in range(20)])) - self.assertQueueContains( - self.got_args_kwargs, - [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20) - self.assertEqual(self.stored_results, ['best result EVAR!'] * 20) - - -class TestMultiThreadingManager(ThreadTestCase): - - @mock.patch('swiftclient.multithreading.QueueFunctionManager') - def test_instantiation(self, mock_qfq): - thread_manager = mt.MultiThreadingManager() - - self.assertEqual([ - mock.call(thread_manager._print, 1, thread_manager), - mock.call(thread_manager._print_error, 1, thread_manager), - ], mock_qfq.call_args_list) - - # These contexts don't get entered into until the - # MultiThreadingManager's context is entered. - self.assertEqual([], thread_manager.printer.__enter__.call_args_list) - self.assertEqual([], - thread_manager.error_printer.__enter__.call_args_list) - - # Test default values for the streams. - self.assertEqual(sys.stdout, thread_manager.print_stream) - self.assertEqual(sys.stderr, thread_manager.error_stream) - - @mock.patch('swiftclient.multithreading.QueueFunctionManager') - def test_queue_manager_no_args(self, mock_qfq): - thread_manager = mt.MultiThreadingManager() - - mock_qfq.reset_mock() - mock_qfq.return_value = 'slap happy!' - - self.assertEqual( - 'slap happy!', - thread_manager.queue_manager(self._func, 88)) - - self.assertEqual([ - mock.call(self._func, 88, thread_manager, thread_args=(), - thread_kwargs={}, connection_maker=None, - error_counter=None) - ], mock_qfq.call_args_list) - - @mock.patch('swiftclient.multithreading.QueueFunctionManager') - def test_queue_manager_with_args(self, mock_qfq): - thread_manager = mt.MultiThreadingManager() - - mock_qfq.reset_mock() - mock_qfq.return_value = 'do run run' - - self.assertEqual( - 'do run run', - thread_manager.queue_manager(self._func, 88, 'fun', times='are', - connection_maker='abc', to='be had', - error_counter='def')) - - self.assertEqual([ - mock.call(self._func, 88, thread_manager, thread_args=('fun',), - thread_kwargs={'times': 'are', 'to': 'be had'}, - connection_maker='abc', error_counter='def') - ], mock_qfq.call_args_list) + super(TestConnectionThreadPoolExecutor, self).tearDown() + + def test_submit_good_connection(self): + ctpe = mt.ConnectionThreadPoolExecutor(self._create_conn, 1) + with ctpe as pool: + # Try submitting a job that should succeed + f = pool.submit(self._func, "succeed") + f.result() + self.assertQueueContains( + self.got_items, + [("This is a connection", "succeed")] + ) + + # Now a job that fails + went_boom = False + try: + f = pool.submit(self._func, "go boom") + f.result() + except Exception as e: + went_boom = True + self.assertEquals('I went boom!', str(e)) + self.assertTrue(went_boom) + + # Has the connection been returned to the pool? + f = pool.submit(self._func, "succeed") + f.result() + self.assertQueueContains( + self.got_items, + [ + ("This is a connection", "go boom"), + ("This is a connection", "succeed") + ] + ) + + def test_submit_bad_connection(self): + ctpe = mt.ConnectionThreadPoolExecutor(self._create_conn_fail, 1) + with ctpe as pool: + # Now a connection that fails + connection_failed = False + try: + f = pool.submit(self._func, "succeed") + f.result() + except Exception as e: + connection_failed = True + self.assertEquals('This is a failed connection', str(e)) + self.assertTrue(connection_failed) + + # Make sure we don't lock up on failed connections + connection_failed = False + try: + f = pool.submit(self._func, "go boom") + f.result() + except Exception as e: + connection_failed = True + self.assertEquals('This is a failed connection', str(e)) + self.assertTrue(connection_failed) + + def test_lazy_connections(self): + ctpe = mt.ConnectionThreadPoolExecutor(self._create_conn, 10) + with ctpe as pool: + # Submit multiple jobs sequentially - should only use 1 conn + f = pool.submit(self._func, "succeed") + f.result() + f = pool.submit(self._func, "succeed") + f.result() + f = pool.submit(self._func, "succeed") + f.result() + + expected_connections = [(0, "This is a connection")] + expected_connections.extend([(x, None) for x in range(1, 10)]) + + self.assertQueueContains( + pool._connections, expected_connections + ) + + ctpe = mt.ConnectionThreadPoolExecutor(self._create_conn, 10) + with ctpe as pool: + fs = [] + f1 = pool.submit(self._func, "sleep") + f2 = pool.submit(self._func, "sleep") + f3 = pool.submit(self._func, "sleep") + fs.extend([f1, f2, f3]) + + expected_connections = [ + (0, "This is a connection"), + (1, "This is a connection"), + (2, "This is a connection") + ] + expected_connections.extend([(x, None) for x in range(3, 10)]) + + for f in as_completed(fs): + f.result() + + self.assertQueueContains( + pool._connections, expected_connections + ) + + +class TestOutputManager(testtools.TestCase): + + def test_instantiation(self): + output_manager = mt.OutputManager() + + self.assertEqual(sys.stdout, output_manager.print_stream) + self.assertEqual(sys.stderr, output_manager.error_stream) def test_printers(self): out_stream = six.StringIO() err_stream = six.StringIO() + starting_thread_count = threading.active_count() - with mt.MultiThreadingManager( + with mt.OutputManager( print_stream=out_stream, error_stream=err_stream) as thread_manager: @@ -304,7 +188,8 @@ class TestMultiThreadingManager(ThreadTestCase): self.assertEqual(out_stream, thread_manager.print_stream) self.assertEqual(err_stream, thread_manager.error_stream) - self.assertEqual(self.starting_thread_count + 2, + # No printing has happened yet, so no new threads + self.assertEqual(starting_thread_count, threading.active_count()) thread_manager.print_msg('one-argument') @@ -317,7 +202,13 @@ class TestMultiThreadingManager(ThreadTestCase): thread_manager.error('Sometimes\n%.1f%% just\ndoes not\nwork!', 3.14159) - self.assertEqual(self.starting_thread_count, threading.active_count()) + # Now we have a thread for error printing and a thread for + # normal print messages + self.assertEqual(starting_thread_count + 2, + threading.active_count()) + + # The threads should have been cleaned up + self.assertEqual(starting_thread_count, threading.active_count()) out_stream.seek(0) if six.PY3: |