diff options
author | Christian Schwede <christian.schwede@enovance.com> | 2014-02-25 21:42:17 +0000 |
---|---|---|
committer | Chmouel Boudjnah <chmouel@enovance.com> | 2014-05-05 11:53:37 +0200 |
commit | eb94ac076dec7e08f4e6c4c55d7c6f60b1b1cbfd (patch) | |
tree | 07a9ccd6ff5c1382e13df5259940c0b2cd4a9c03 /tests/test_multithreading.py | |
parent | 9ff3bad4e22fbd561d9253af2b6e38580124b450 (diff) | |
download | python-swiftclient-eb94ac076dec7e08f4e6c4c55d7c6f60b1b1cbfd.tar.gz |
Add functional tests for python-swiftclient
Coverage for swiftclient.client is 71% with these tests.
Unit tests have been moved into another subdirectory
to separate them from functional tests.
Change-Id: Ib8c4d78f7169cee893f82906f6388a5b06c45602
Diffstat (limited to 'tests/test_multithreading.py')
-rw-r--r-- | tests/test_multithreading.py | 347 |
1 files changed, 0 insertions, 347 deletions
diff --git a/tests/test_multithreading.py b/tests/test_multithreading.py deleted file mode 100644 index 875e43a..0000000 --- a/tests/test_multithreading.py +++ /dev/null @@ -1,347 +0,0 @@ -# Copyright (c) 2010-2013 OpenStack, LLC. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import sys -import time - -try: - from unittest import mock -except ImportError: - import mock - -import testtools -import threading -import six -from six.moves.queue import Queue, Empty - -from swiftclient import multithreading as mt -from swiftclient.exceptions import ClientException - - -class ThreadTestCase(testtools.TestCase): - def setUp(self): - super(ThreadTestCase, self).setUp() - self.got_args_kwargs = Queue() - self.starting_thread_count = threading.active_count() - - def _func(self, q_item, *args, **kwargs): - self.got_items.put(q_item) - self.got_args_kwargs.put((args, kwargs)) - - if q_item == 'go boom': - raise Exception('I went boom!') - if q_item == 'c boom': - raise ClientException( - 'Client Boom', http_scheme='http', http_host='192.168.22.1', - http_port=80, http_path='/booze', http_status=404, - http_reason='to much', http_response_content='no sir!') - - return 'best result EVAR!' - - def assertQueueContains(self, queue, expected_contents): - got_contents = [] - try: - while True: - got_contents.append(queue.get(timeout=0.1)) - except Empty: - pass - if isinstance(expected_contents, set): - got_contents = set(got_contents) - self.assertEqual(expected_contents, got_contents) - - -class TestQueueFunctionThread(ThreadTestCase): - def setUp(self): - super(TestQueueFunctionThread, self).setUp() - - self.input_queue = Queue() - self.got_items = Queue() - self.stored_results = [] - - self.qft = mt.QueueFunctionThread(self.input_queue, self._func, - 'one_arg', 'two_arg', - red_fish='blue_arg', - store_results=self.stored_results) - self.qft.start() - - def tearDown(self): - if self.qft.is_alive(): - self.finish_up_thread() - - super(TestQueueFunctionThread, self).tearDown() - - def finish_up_thread(self): - self.input_queue.put(mt.StopWorkerThreadSignal()) - while self.qft.is_alive(): - time.sleep(0.05) - - def test_plumbing_and_store_results(self): - self.input_queue.put('abc') - self.input_queue.put(123) - self.finish_up_thread() - - self.assertQueueContains(self.got_items, ['abc', 123]) - self.assertQueueContains(self.got_args_kwargs, [ - (('one_arg', 'two_arg'), {'red_fish': 'blue_arg'}), - (('one_arg', 'two_arg'), {'red_fish': 'blue_arg'})]) - self.assertEqual(self.stored_results, - ['best result EVAR!', 'best result EVAR!']) - - def test_exception_handling(self): - self.input_queue.put('go boom') - self.input_queue.put('ok') - self.input_queue.put('go boom') - self.finish_up_thread() - - self.assertQueueContains(self.got_items, - ['go boom', 'ok', 'go boom']) - self.assertEqual(len(self.qft.exc_infos), 2) - self.assertEqual(Exception, self.qft.exc_infos[0][0]) - self.assertEqual(Exception, self.qft.exc_infos[1][0]) - self.assertEqual(('I went boom!',), self.qft.exc_infos[0][1].args) - self.assertEqual(('I went boom!',), self.qft.exc_infos[1][1].args) - - -class TestQueueFunctionManager(ThreadTestCase): - def setUp(self): - super(TestQueueFunctionManager, self).setUp() - self.thread_manager = mock.create_autospec( - mt.MultiThreadingManager, spec_set=True, instance=True) - self.thread_count = 4 - self.error_counter = [0] - self.got_items = Queue() - self.stored_results = [] - self.qfq = mt.QueueFunctionManager( - self._func, self.thread_count, self.thread_manager, - thread_args=('1arg', '2arg'), - thread_kwargs={'a': 'b', 'store_results': self.stored_results}, - error_counter=self.error_counter, - connection_maker=self.connection_maker) - - def connection_maker(self): - return 'yup, I made a connection' - - def test_context_manager_without_error_counter(self): - self.qfq = mt.QueueFunctionManager( - self._func, self.thread_count, self.thread_manager, - thread_args=('1arg', '2arg'), - thread_kwargs={'a': 'b', 'store_results': self.stored_results}, - connection_maker=self.connection_maker) - - with self.qfq as input_queue: - self.assertEqual(self.starting_thread_count + self.thread_count, - threading.active_count()) - input_queue.put('go boom') - - self.assertEqual(self.starting_thread_count, threading.active_count()) - error_strs = list(map(str, self.thread_manager.error.call_args_list)) - self.assertEqual(1, len(error_strs)) - self.assertTrue('Exception: I went boom!' in error_strs[0]) - - def test_context_manager_without_conn_maker_or_error_counter(self): - self.qfq = mt.QueueFunctionManager( - self._func, self.thread_count, self.thread_manager, - thread_args=('1arg', '2arg'), thread_kwargs={'a': 'b'}) - - with self.qfq as input_queue: - self.assertEqual(self.starting_thread_count + self.thread_count, - threading.active_count()) - for i in range(20): - input_queue.put('slap%d' % i) - - self.assertEqual(self.starting_thread_count, threading.active_count()) - self.assertEqual([], self.thread_manager.error.call_args_list) - self.assertEqual(0, self.error_counter[0]) - self.assertQueueContains(self.got_items, - set(['slap%d' % i for i in range(20)])) - self.assertQueueContains( - self.got_args_kwargs, - [(('1arg', '2arg'), {'a': 'b'})] * 20) - self.assertEqual(self.stored_results, []) - - def test_context_manager_with_exceptions(self): - with self.qfq as input_queue: - self.assertEqual(self.starting_thread_count + self.thread_count, - threading.active_count()) - for i in range(20): - input_queue.put('item%d' % i if i % 2 == 0 else 'go boom') - - self.assertEqual(self.starting_thread_count, threading.active_count()) - error_strs = list(map(str, self.thread_manager.error.call_args_list)) - self.assertEqual(10, len(error_strs)) - self.assertTrue(all(['Exception: I went boom!' in s for s in - error_strs])) - self.assertEqual(10, self.error_counter[0]) - expected_items = set(['go boom'] + - ['item%d' % i for i in range(20) - if i % 2 == 0]) - self.assertQueueContains(self.got_items, expected_items) - self.assertQueueContains( - self.got_args_kwargs, - [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20) - self.assertEqual(self.stored_results, ['best result EVAR!'] * 10) - - def test_context_manager_with_client_exceptions(self): - with self.qfq as input_queue: - self.assertEqual(self.starting_thread_count + self.thread_count, - threading.active_count()) - for i in range(20): - input_queue.put('item%d' % i if i % 2 == 0 else 'c boom') - - self.assertEqual(self.starting_thread_count, threading.active_count()) - error_strs = list(map(str, self.thread_manager.error.call_args_list)) - self.assertEqual(10, len(error_strs)) - stringification = 'Client Boom: ' \ - 'http://192.168.22.1:80/booze 404 to much no sir!' - self.assertTrue(all([stringification in s for s in error_strs])) - self.assertEqual(10, self.error_counter[0]) - expected_items = set(['c boom'] + - ['item%d' % i for i in range(20) - if i % 2 == 0]) - self.assertQueueContains(self.got_items, expected_items) - self.assertQueueContains( - self.got_args_kwargs, - [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20) - self.assertEqual(self.stored_results, ['best result EVAR!'] * 10) - - def test_context_manager_with_connection_maker(self): - with self.qfq as input_queue: - self.assertEqual(self.starting_thread_count + self.thread_count, - threading.active_count()) - for i in range(20): - input_queue.put('item%d' % i) - - self.assertEqual(self.starting_thread_count, threading.active_count()) - self.assertEqual([], self.thread_manager.error.call_args_list) - self.assertEqual(0, self.error_counter[0]) - self.assertQueueContains(self.got_items, - set(['item%d' % i for i in range(20)])) - self.assertQueueContains( - self.got_args_kwargs, - [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20) - self.assertEqual(self.stored_results, ['best result EVAR!'] * 20) - - -class TestMultiThreadingManager(ThreadTestCase): - - @mock.patch('swiftclient.multithreading.QueueFunctionManager') - def test_instantiation(self, mock_qfq): - thread_manager = mt.MultiThreadingManager() - - self.assertEqual([ - mock.call(thread_manager._print, 1, thread_manager), - mock.call(thread_manager._print_error, 1, thread_manager), - ], mock_qfq.call_args_list) - - # These contexts don't get entered into until the - # MultiThreadingManager's context is entered. - self.assertEqual([], thread_manager.printer.__enter__.call_args_list) - self.assertEqual([], - thread_manager.error_printer.__enter__.call_args_list) - - # Test default values for the streams. - self.assertEqual(sys.stdout, thread_manager.print_stream) - self.assertEqual(sys.stderr, thread_manager.error_stream) - - @mock.patch('swiftclient.multithreading.QueueFunctionManager') - def test_queue_manager_no_args(self, mock_qfq): - thread_manager = mt.MultiThreadingManager() - - mock_qfq.reset_mock() - mock_qfq.return_value = 'slap happy!' - - self.assertEqual( - 'slap happy!', - thread_manager.queue_manager(self._func, 88)) - - self.assertEqual([ - mock.call(self._func, 88, thread_manager, thread_args=(), - thread_kwargs={}, connection_maker=None, - error_counter=None) - ], mock_qfq.call_args_list) - - @mock.patch('swiftclient.multithreading.QueueFunctionManager') - def test_queue_manager_with_args(self, mock_qfq): - thread_manager = mt.MultiThreadingManager() - - mock_qfq.reset_mock() - mock_qfq.return_value = 'do run run' - - self.assertEqual( - 'do run run', - thread_manager.queue_manager(self._func, 88, 'fun', times='are', - connection_maker='abc', to='be had', - error_counter='def')) - - self.assertEqual([ - mock.call(self._func, 88, thread_manager, thread_args=('fun',), - thread_kwargs={'times': 'are', 'to': 'be had'}, - connection_maker='abc', error_counter='def') - ], mock_qfq.call_args_list) - - def test_printers(self): - out_stream = six.StringIO() - err_stream = six.StringIO() - - with mt.MultiThreadingManager( - print_stream=out_stream, - error_stream=err_stream) as thread_manager: - - # Sanity-checking these gives power to the previous test which - # looked at the default values of thread_manager.print/error_stream - self.assertEqual(out_stream, thread_manager.print_stream) - self.assertEqual(err_stream, thread_manager.error_stream) - - self.assertEqual(self.starting_thread_count + 2, - threading.active_count()) - - thread_manager.print_msg('one-argument') - thread_manager.print_msg('one %s, %d fish', 'fish', 88) - thread_manager.error('I have %d problems, but a %s is not one', - 99, u'\u062A\u062A') - thread_manager.print_msg('some\n%s\nover the %r', 'where', - u'\u062A\u062A') - thread_manager.error('one-error-argument') - thread_manager.error('Sometimes\n%.1f%% just\ndoes not\nwork!', - 3.14159) - - self.assertEqual(self.starting_thread_count, threading.active_count()) - - out_stream.seek(0) - if six.PY3: - over_the = "over the '\u062a\u062a'\n" - else: - over_the = "over the u'\\u062a\\u062a'\n" - self.assertEqual([ - 'one-argument\n', - 'one fish, 88 fish\n', - 'some\n', 'where\n', over_the, - ], list(out_stream.readlines())) - - err_stream.seek(0) - first_item = u'I have 99 problems, but a \u062A\u062A is not one\n' - if six.PY2: - first_item = first_item.encode('utf8') - self.assertEqual([ - first_item, - 'one-error-argument\n', - 'Sometimes\n', '3.1% just\n', 'does not\n', 'work!\n', - ], list(err_stream.readlines())) - - self.assertEqual(3, thread_manager.error_count) - - -if __name__ == '__main__': - testtools.main() |