summaryrefslogtreecommitdiff
path: root/tests/test_multithreading.py
diff options
context:
space:
mode:
authorChristian Schwede <christian.schwede@enovance.com>2014-02-25 21:42:17 +0000
committerChmouel Boudjnah <chmouel@enovance.com>2014-05-05 11:53:37 +0200
commiteb94ac076dec7e08f4e6c4c55d7c6f60b1b1cbfd (patch)
tree07a9ccd6ff5c1382e13df5259940c0b2cd4a9c03 /tests/test_multithreading.py
parent9ff3bad4e22fbd561d9253af2b6e38580124b450 (diff)
downloadpython-swiftclient-eb94ac076dec7e08f4e6c4c55d7c6f60b1b1cbfd.tar.gz
Add functional tests for python-swiftclient
Coverage for swiftclient.client is 71% with these tests. Unit tests have been moved into another subdirectory to separate them from functional tests. Change-Id: Ib8c4d78f7169cee893f82906f6388a5b06c45602
Diffstat (limited to 'tests/test_multithreading.py')
-rw-r--r--tests/test_multithreading.py347
1 files changed, 0 insertions, 347 deletions
diff --git a/tests/test_multithreading.py b/tests/test_multithreading.py
deleted file mode 100644
index 875e43a..0000000
--- a/tests/test_multithreading.py
+++ /dev/null
@@ -1,347 +0,0 @@
-# Copyright (c) 2010-2013 OpenStack, LLC.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import sys
-import time
-
-try:
- from unittest import mock
-except ImportError:
- import mock
-
-import testtools
-import threading
-import six
-from six.moves.queue import Queue, Empty
-
-from swiftclient import multithreading as mt
-from swiftclient.exceptions import ClientException
-
-
-class ThreadTestCase(testtools.TestCase):
- def setUp(self):
- super(ThreadTestCase, self).setUp()
- self.got_args_kwargs = Queue()
- self.starting_thread_count = threading.active_count()
-
- def _func(self, q_item, *args, **kwargs):
- self.got_items.put(q_item)
- self.got_args_kwargs.put((args, kwargs))
-
- if q_item == 'go boom':
- raise Exception('I went boom!')
- if q_item == 'c boom':
- raise ClientException(
- 'Client Boom', http_scheme='http', http_host='192.168.22.1',
- http_port=80, http_path='/booze', http_status=404,
- http_reason='to much', http_response_content='no sir!')
-
- return 'best result EVAR!'
-
- def assertQueueContains(self, queue, expected_contents):
- got_contents = []
- try:
- while True:
- got_contents.append(queue.get(timeout=0.1))
- except Empty:
- pass
- if isinstance(expected_contents, set):
- got_contents = set(got_contents)
- self.assertEqual(expected_contents, got_contents)
-
-
-class TestQueueFunctionThread(ThreadTestCase):
- def setUp(self):
- super(TestQueueFunctionThread, self).setUp()
-
- self.input_queue = Queue()
- self.got_items = Queue()
- self.stored_results = []
-
- self.qft = mt.QueueFunctionThread(self.input_queue, self._func,
- 'one_arg', 'two_arg',
- red_fish='blue_arg',
- store_results=self.stored_results)
- self.qft.start()
-
- def tearDown(self):
- if self.qft.is_alive():
- self.finish_up_thread()
-
- super(TestQueueFunctionThread, self).tearDown()
-
- def finish_up_thread(self):
- self.input_queue.put(mt.StopWorkerThreadSignal())
- while self.qft.is_alive():
- time.sleep(0.05)
-
- def test_plumbing_and_store_results(self):
- self.input_queue.put('abc')
- self.input_queue.put(123)
- self.finish_up_thread()
-
- self.assertQueueContains(self.got_items, ['abc', 123])
- self.assertQueueContains(self.got_args_kwargs, [
- (('one_arg', 'two_arg'), {'red_fish': 'blue_arg'}),
- (('one_arg', 'two_arg'), {'red_fish': 'blue_arg'})])
- self.assertEqual(self.stored_results,
- ['best result EVAR!', 'best result EVAR!'])
-
- def test_exception_handling(self):
- self.input_queue.put('go boom')
- self.input_queue.put('ok')
- self.input_queue.put('go boom')
- self.finish_up_thread()
-
- self.assertQueueContains(self.got_items,
- ['go boom', 'ok', 'go boom'])
- self.assertEqual(len(self.qft.exc_infos), 2)
- self.assertEqual(Exception, self.qft.exc_infos[0][0])
- self.assertEqual(Exception, self.qft.exc_infos[1][0])
- self.assertEqual(('I went boom!',), self.qft.exc_infos[0][1].args)
- self.assertEqual(('I went boom!',), self.qft.exc_infos[1][1].args)
-
-
-class TestQueueFunctionManager(ThreadTestCase):
- def setUp(self):
- super(TestQueueFunctionManager, self).setUp()
- self.thread_manager = mock.create_autospec(
- mt.MultiThreadingManager, spec_set=True, instance=True)
- self.thread_count = 4
- self.error_counter = [0]
- self.got_items = Queue()
- self.stored_results = []
- self.qfq = mt.QueueFunctionManager(
- self._func, self.thread_count, self.thread_manager,
- thread_args=('1arg', '2arg'),
- thread_kwargs={'a': 'b', 'store_results': self.stored_results},
- error_counter=self.error_counter,
- connection_maker=self.connection_maker)
-
- def connection_maker(self):
- return 'yup, I made a connection'
-
- def test_context_manager_without_error_counter(self):
- self.qfq = mt.QueueFunctionManager(
- self._func, self.thread_count, self.thread_manager,
- thread_args=('1arg', '2arg'),
- thread_kwargs={'a': 'b', 'store_results': self.stored_results},
- connection_maker=self.connection_maker)
-
- with self.qfq as input_queue:
- self.assertEqual(self.starting_thread_count + self.thread_count,
- threading.active_count())
- input_queue.put('go boom')
-
- self.assertEqual(self.starting_thread_count, threading.active_count())
- error_strs = list(map(str, self.thread_manager.error.call_args_list))
- self.assertEqual(1, len(error_strs))
- self.assertTrue('Exception: I went boom!' in error_strs[0])
-
- def test_context_manager_without_conn_maker_or_error_counter(self):
- self.qfq = mt.QueueFunctionManager(
- self._func, self.thread_count, self.thread_manager,
- thread_args=('1arg', '2arg'), thread_kwargs={'a': 'b'})
-
- with self.qfq as input_queue:
- self.assertEqual(self.starting_thread_count + self.thread_count,
- threading.active_count())
- for i in range(20):
- input_queue.put('slap%d' % i)
-
- self.assertEqual(self.starting_thread_count, threading.active_count())
- self.assertEqual([], self.thread_manager.error.call_args_list)
- self.assertEqual(0, self.error_counter[0])
- self.assertQueueContains(self.got_items,
- set(['slap%d' % i for i in range(20)]))
- self.assertQueueContains(
- self.got_args_kwargs,
- [(('1arg', '2arg'), {'a': 'b'})] * 20)
- self.assertEqual(self.stored_results, [])
-
- def test_context_manager_with_exceptions(self):
- with self.qfq as input_queue:
- self.assertEqual(self.starting_thread_count + self.thread_count,
- threading.active_count())
- for i in range(20):
- input_queue.put('item%d' % i if i % 2 == 0 else 'go boom')
-
- self.assertEqual(self.starting_thread_count, threading.active_count())
- error_strs = list(map(str, self.thread_manager.error.call_args_list))
- self.assertEqual(10, len(error_strs))
- self.assertTrue(all(['Exception: I went boom!' in s for s in
- error_strs]))
- self.assertEqual(10, self.error_counter[0])
- expected_items = set(['go boom'] +
- ['item%d' % i for i in range(20)
- if i % 2 == 0])
- self.assertQueueContains(self.got_items, expected_items)
- self.assertQueueContains(
- self.got_args_kwargs,
- [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20)
- self.assertEqual(self.stored_results, ['best result EVAR!'] * 10)
-
- def test_context_manager_with_client_exceptions(self):
- with self.qfq as input_queue:
- self.assertEqual(self.starting_thread_count + self.thread_count,
- threading.active_count())
- for i in range(20):
- input_queue.put('item%d' % i if i % 2 == 0 else 'c boom')
-
- self.assertEqual(self.starting_thread_count, threading.active_count())
- error_strs = list(map(str, self.thread_manager.error.call_args_list))
- self.assertEqual(10, len(error_strs))
- stringification = 'Client Boom: ' \
- 'http://192.168.22.1:80/booze 404 to much no sir!'
- self.assertTrue(all([stringification in s for s in error_strs]))
- self.assertEqual(10, self.error_counter[0])
- expected_items = set(['c boom'] +
- ['item%d' % i for i in range(20)
- if i % 2 == 0])
- self.assertQueueContains(self.got_items, expected_items)
- self.assertQueueContains(
- self.got_args_kwargs,
- [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20)
- self.assertEqual(self.stored_results, ['best result EVAR!'] * 10)
-
- def test_context_manager_with_connection_maker(self):
- with self.qfq as input_queue:
- self.assertEqual(self.starting_thread_count + self.thread_count,
- threading.active_count())
- for i in range(20):
- input_queue.put('item%d' % i)
-
- self.assertEqual(self.starting_thread_count, threading.active_count())
- self.assertEqual([], self.thread_manager.error.call_args_list)
- self.assertEqual(0, self.error_counter[0])
- self.assertQueueContains(self.got_items,
- set(['item%d' % i for i in range(20)]))
- self.assertQueueContains(
- self.got_args_kwargs,
- [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20)
- self.assertEqual(self.stored_results, ['best result EVAR!'] * 20)
-
-
-class TestMultiThreadingManager(ThreadTestCase):
-
- @mock.patch('swiftclient.multithreading.QueueFunctionManager')
- def test_instantiation(self, mock_qfq):
- thread_manager = mt.MultiThreadingManager()
-
- self.assertEqual([
- mock.call(thread_manager._print, 1, thread_manager),
- mock.call(thread_manager._print_error, 1, thread_manager),
- ], mock_qfq.call_args_list)
-
- # These contexts don't get entered into until the
- # MultiThreadingManager's context is entered.
- self.assertEqual([], thread_manager.printer.__enter__.call_args_list)
- self.assertEqual([],
- thread_manager.error_printer.__enter__.call_args_list)
-
- # Test default values for the streams.
- self.assertEqual(sys.stdout, thread_manager.print_stream)
- self.assertEqual(sys.stderr, thread_manager.error_stream)
-
- @mock.patch('swiftclient.multithreading.QueueFunctionManager')
- def test_queue_manager_no_args(self, mock_qfq):
- thread_manager = mt.MultiThreadingManager()
-
- mock_qfq.reset_mock()
- mock_qfq.return_value = 'slap happy!'
-
- self.assertEqual(
- 'slap happy!',
- thread_manager.queue_manager(self._func, 88))
-
- self.assertEqual([
- mock.call(self._func, 88, thread_manager, thread_args=(),
- thread_kwargs={}, connection_maker=None,
- error_counter=None)
- ], mock_qfq.call_args_list)
-
- @mock.patch('swiftclient.multithreading.QueueFunctionManager')
- def test_queue_manager_with_args(self, mock_qfq):
- thread_manager = mt.MultiThreadingManager()
-
- mock_qfq.reset_mock()
- mock_qfq.return_value = 'do run run'
-
- self.assertEqual(
- 'do run run',
- thread_manager.queue_manager(self._func, 88, 'fun', times='are',
- connection_maker='abc', to='be had',
- error_counter='def'))
-
- self.assertEqual([
- mock.call(self._func, 88, thread_manager, thread_args=('fun',),
- thread_kwargs={'times': 'are', 'to': 'be had'},
- connection_maker='abc', error_counter='def')
- ], mock_qfq.call_args_list)
-
- def test_printers(self):
- out_stream = six.StringIO()
- err_stream = six.StringIO()
-
- with mt.MultiThreadingManager(
- print_stream=out_stream,
- error_stream=err_stream) as thread_manager:
-
- # Sanity-checking these gives power to the previous test which
- # looked at the default values of thread_manager.print/error_stream
- self.assertEqual(out_stream, thread_manager.print_stream)
- self.assertEqual(err_stream, thread_manager.error_stream)
-
- self.assertEqual(self.starting_thread_count + 2,
- threading.active_count())
-
- thread_manager.print_msg('one-argument')
- thread_manager.print_msg('one %s, %d fish', 'fish', 88)
- thread_manager.error('I have %d problems, but a %s is not one',
- 99, u'\u062A\u062A')
- thread_manager.print_msg('some\n%s\nover the %r', 'where',
- u'\u062A\u062A')
- thread_manager.error('one-error-argument')
- thread_manager.error('Sometimes\n%.1f%% just\ndoes not\nwork!',
- 3.14159)
-
- self.assertEqual(self.starting_thread_count, threading.active_count())
-
- out_stream.seek(0)
- if six.PY3:
- over_the = "over the '\u062a\u062a'\n"
- else:
- over_the = "over the u'\\u062a\\u062a'\n"
- self.assertEqual([
- 'one-argument\n',
- 'one fish, 88 fish\n',
- 'some\n', 'where\n', over_the,
- ], list(out_stream.readlines()))
-
- err_stream.seek(0)
- first_item = u'I have 99 problems, but a \u062A\u062A is not one\n'
- if six.PY2:
- first_item = first_item.encode('utf8')
- self.assertEqual([
- first_item,
- 'one-error-argument\n',
- 'Sometimes\n', '3.1% just\n', 'does not\n', 'work!\n',
- ], list(err_stream.readlines()))
-
- self.assertEqual(3, thread_manager.error_count)
-
-
-if __name__ == '__main__':
- testtools.main()