summaryrefslogtreecommitdiff
path: root/tests/unit/test_multithreading.py
diff options
context:
space:
mode:
authorJoel Wright <joel.wright@sohonet.com>2014-04-04 21:13:01 +0200
committerJoel Wright <joel.wright@sohonet.com>2014-08-26 14:14:21 +0200
commit24673f8d19fe2f48964f528369081c37e880ec47 (patch)
tree8cd0cabfc9b8d858339da556fe561674dd8bc83a /tests/unit/test_multithreading.py
parentd97ec374cb1ef91c34e49302842e5a151ee3e476 (diff)
downloadpython-swiftclient-24673f8d19fe2f48964f528369081c37e880ec47.tar.gz
Add importable SwiftService incorporating shell.py logic
This patch adds a SwiftService class that incorporates the high level logic from swiftclient/shell.py. It also ports shell.py to use the new class, and updates the code in swiftclient/multithreading.py to allow the SwiftService to be used for multiple operations whilst using only one thread pool. Currently, code that imports swiftclient has to have its own logic for things like creating large objects, parallel uploads, and parallel downloads. This patch adds a SwiftService class that makes that functionality available in Python code as well as through the shell. Change-Id: I08c5796b4c01001d79fd571651c3017c16462ffd Implements: blueprint bin-swift-logic-as-importable-library
Diffstat (limited to 'tests/unit/test_multithreading.py')
-rw-r--r--tests/unit/test_multithreading.py377
1 files changed, 134 insertions, 243 deletions
diff --git a/tests/unit/test_multithreading.py b/tests/unit/test_multithreading.py
index 1df0d4f..cd2e9a6 100644
--- a/tests/unit/test_multithreading.py
+++ b/tests/unit/test_multithreading.py
@@ -14,41 +14,40 @@
# limitations under the License.
import sys
-import time
-
-try:
- from unittest import mock
-except ImportError:
- import mock
-
import testtools
import threading
import six
+
+from concurrent.futures import as_completed
from six.moves.queue import Queue, Empty
+from time import sleep
from swiftclient import multithreading as mt
-from swiftclient.exceptions import ClientException
class ThreadTestCase(testtools.TestCase):
def setUp(self):
super(ThreadTestCase, self).setUp()
+ self.got_items = Queue()
self.got_args_kwargs = Queue()
self.starting_thread_count = threading.active_count()
- def _func(self, q_item, *args, **kwargs):
- self.got_items.put(q_item)
+ def _func(self, conn, item, *args, **kwargs):
+ self.got_items.put((conn, item))
self.got_args_kwargs.put((args, kwargs))
- if q_item == 'go boom':
+ if item == 'sleep':
+ sleep(1)
+ if item == 'go boom':
raise Exception('I went boom!')
- if q_item == 'c boom':
- raise ClientException(
- 'Client Boom', http_scheme='http', http_host='192.168.22.1',
- http_port=80, http_path='/booze', http_status=404,
- http_reason='to much', http_response_content='no sir!')
- return 'best result EVAR!'
+ return 'success'
+
+ def _create_conn(self):
+ return "This is a connection"
+
+ def _create_conn_fail(self):
+ raise Exception("This is a failed connection")
def assertQueueContains(self, queue, expected_contents):
got_contents = []
@@ -62,240 +61,125 @@ class ThreadTestCase(testtools.TestCase):
self.assertEqual(expected_contents, got_contents)
-class TestQueueFunctionThread(ThreadTestCase):
+class TestConnectionThreadPoolExecutor(ThreadTestCase):
def setUp(self):
- super(TestQueueFunctionThread, self).setUp()
-
+ super(TestConnectionThreadPoolExecutor, self).setUp()
self.input_queue = Queue()
- self.got_items = Queue()
self.stored_results = []
- self.qft = mt.QueueFunctionThread(self.input_queue, self._func,
- 'one_arg', 'two_arg',
- red_fish='blue_arg',
- store_results=self.stored_results)
- self.qft.start()
-
def tearDown(self):
- if self.qft.is_alive():
- self.finish_up_thread()
-
- super(TestQueueFunctionThread, self).tearDown()
-
- def finish_up_thread(self):
- self.input_queue.put(mt.StopWorkerThreadSignal())
- while self.qft.is_alive():
- time.sleep(0.05)
-
- def test_plumbing_and_store_results(self):
- self.input_queue.put('abc')
- self.input_queue.put(123)
- self.finish_up_thread()
-
- self.assertQueueContains(self.got_items, ['abc', 123])
- self.assertQueueContains(self.got_args_kwargs, [
- (('one_arg', 'two_arg'), {'red_fish': 'blue_arg'}),
- (('one_arg', 'two_arg'), {'red_fish': 'blue_arg'})])
- self.assertEqual(self.stored_results,
- ['best result EVAR!', 'best result EVAR!'])
-
- def test_exception_handling(self):
- self.input_queue.put('go boom')
- self.input_queue.put('ok')
- self.input_queue.put('go boom')
- self.finish_up_thread()
-
- self.assertQueueContains(self.got_items,
- ['go boom', 'ok', 'go boom'])
- self.assertEqual(len(self.qft.exc_infos), 2)
- self.assertEqual(Exception, self.qft.exc_infos[0][0])
- self.assertEqual(Exception, self.qft.exc_infos[1][0])
- self.assertEqual(('I went boom!',), self.qft.exc_infos[0][1].args)
- self.assertEqual(('I went boom!',), self.qft.exc_infos[1][1].args)
-
-
-class TestQueueFunctionManager(ThreadTestCase):
- def setUp(self):
- super(TestQueueFunctionManager, self).setUp()
- self.thread_manager = mock.create_autospec(
- mt.MultiThreadingManager, spec_set=True, instance=True)
- self.thread_count = 4
- self.error_counter = [0]
- self.got_items = Queue()
- self.stored_results = []
- self.qfq = mt.QueueFunctionManager(
- self._func, self.thread_count, self.thread_manager,
- thread_args=('1arg', '2arg'),
- thread_kwargs={'a': 'b', 'store_results': self.stored_results},
- error_counter=self.error_counter,
- connection_maker=self.connection_maker)
-
- def connection_maker(self):
- return 'yup, I made a connection'
-
- def test_context_manager_without_error_counter(self):
- self.qfq = mt.QueueFunctionManager(
- self._func, self.thread_count, self.thread_manager,
- thread_args=('1arg', '2arg'),
- thread_kwargs={'a': 'b', 'store_results': self.stored_results},
- connection_maker=self.connection_maker)
-
- with self.qfq as input_queue:
- self.assertEqual(self.starting_thread_count + self.thread_count,
- threading.active_count())
- input_queue.put('go boom')
-
- self.assertEqual(self.starting_thread_count, threading.active_count())
- error_strs = list(map(str, self.thread_manager.error.call_args_list))
- self.assertEqual(1, len(error_strs))
- self.assertTrue('Exception: I went boom!' in error_strs[0])
-
- def test_context_manager_without_conn_maker_or_error_counter(self):
- self.qfq = mt.QueueFunctionManager(
- self._func, self.thread_count, self.thread_manager,
- thread_args=('1arg', '2arg'), thread_kwargs={'a': 'b'})
-
- with self.qfq as input_queue:
- self.assertEqual(self.starting_thread_count + self.thread_count,
- threading.active_count())
- for i in range(20):
- input_queue.put('slap%d' % i)
-
- self.assertEqual(self.starting_thread_count, threading.active_count())
- self.assertEqual([], self.thread_manager.error.call_args_list)
- self.assertEqual(0, self.error_counter[0])
- self.assertQueueContains(self.got_items,
- set(['slap%d' % i for i in range(20)]))
- self.assertQueueContains(
- self.got_args_kwargs,
- [(('1arg', '2arg'), {'a': 'b'})] * 20)
- self.assertEqual(self.stored_results, [])
-
- def test_context_manager_with_exceptions(self):
- with self.qfq as input_queue:
- self.assertEqual(self.starting_thread_count + self.thread_count,
- threading.active_count())
- for i in range(20):
- input_queue.put('item%d' % i if i % 2 == 0 else 'go boom')
-
- self.assertEqual(self.starting_thread_count, threading.active_count())
- error_strs = list(map(str, self.thread_manager.error.call_args_list))
- self.assertEqual(10, len(error_strs))
- self.assertTrue(all(['Exception: I went boom!' in s for s in
- error_strs]))
- self.assertEqual(10, self.error_counter[0])
- expected_items = set(['go boom'] +
- ['item%d' % i for i in range(20)
- if i % 2 == 0])
- self.assertQueueContains(self.got_items, expected_items)
- self.assertQueueContains(
- self.got_args_kwargs,
- [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20)
- self.assertEqual(self.stored_results, ['best result EVAR!'] * 10)
-
- def test_context_manager_with_client_exceptions(self):
- with self.qfq as input_queue:
- self.assertEqual(self.starting_thread_count + self.thread_count,
- threading.active_count())
- for i in range(20):
- input_queue.put('item%d' % i if i % 2 == 0 else 'c boom')
-
- self.assertEqual(self.starting_thread_count, threading.active_count())
- error_strs = list(map(str, self.thread_manager.error.call_args_list))
- self.assertEqual(10, len(error_strs))
- stringification = 'Client Boom: ' \
- 'http://192.168.22.1:80/booze 404 to much no sir!'
- self.assertTrue(all([stringification in s for s in error_strs]))
- self.assertEqual(10, self.error_counter[0])
- expected_items = set(['c boom'] +
- ['item%d' % i for i in range(20)
- if i % 2 == 0])
- self.assertQueueContains(self.got_items, expected_items)
- self.assertQueueContains(
- self.got_args_kwargs,
- [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20)
- self.assertEqual(self.stored_results, ['best result EVAR!'] * 10)
-
- def test_context_manager_with_connection_maker(self):
- with self.qfq as input_queue:
- self.assertEqual(self.starting_thread_count + self.thread_count,
- threading.active_count())
- for i in range(20):
- input_queue.put('item%d' % i)
-
- self.assertEqual(self.starting_thread_count, threading.active_count())
- self.assertEqual([], self.thread_manager.error.call_args_list)
- self.assertEqual(0, self.error_counter[0])
- self.assertQueueContains(self.got_items,
- set(['item%d' % i for i in range(20)]))
- self.assertQueueContains(
- self.got_args_kwargs,
- [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20)
- self.assertEqual(self.stored_results, ['best result EVAR!'] * 20)
-
-
-class TestMultiThreadingManager(ThreadTestCase):
-
- @mock.patch('swiftclient.multithreading.QueueFunctionManager')
- def test_instantiation(self, mock_qfq):
- thread_manager = mt.MultiThreadingManager()
-
- self.assertEqual([
- mock.call(thread_manager._print, 1, thread_manager),
- mock.call(thread_manager._print_error, 1, thread_manager),
- ], mock_qfq.call_args_list)
-
- # These contexts don't get entered into until the
- # MultiThreadingManager's context is entered.
- self.assertEqual([], thread_manager.printer.__enter__.call_args_list)
- self.assertEqual([],
- thread_manager.error_printer.__enter__.call_args_list)
-
- # Test default values for the streams.
- self.assertEqual(sys.stdout, thread_manager.print_stream)
- self.assertEqual(sys.stderr, thread_manager.error_stream)
-
- @mock.patch('swiftclient.multithreading.QueueFunctionManager')
- def test_queue_manager_no_args(self, mock_qfq):
- thread_manager = mt.MultiThreadingManager()
-
- mock_qfq.reset_mock()
- mock_qfq.return_value = 'slap happy!'
-
- self.assertEqual(
- 'slap happy!',
- thread_manager.queue_manager(self._func, 88))
-
- self.assertEqual([
- mock.call(self._func, 88, thread_manager, thread_args=(),
- thread_kwargs={}, connection_maker=None,
- error_counter=None)
- ], mock_qfq.call_args_list)
-
- @mock.patch('swiftclient.multithreading.QueueFunctionManager')
- def test_queue_manager_with_args(self, mock_qfq):
- thread_manager = mt.MultiThreadingManager()
-
- mock_qfq.reset_mock()
- mock_qfq.return_value = 'do run run'
-
- self.assertEqual(
- 'do run run',
- thread_manager.queue_manager(self._func, 88, 'fun', times='are',
- connection_maker='abc', to='be had',
- error_counter='def'))
-
- self.assertEqual([
- mock.call(self._func, 88, thread_manager, thread_args=('fun',),
- thread_kwargs={'times': 'are', 'to': 'be had'},
- connection_maker='abc', error_counter='def')
- ], mock_qfq.call_args_list)
+ super(TestConnectionThreadPoolExecutor, self).tearDown()
+
+ def test_submit_good_connection(self):
+ ctpe = mt.ConnectionThreadPoolExecutor(self._create_conn, 1)
+ with ctpe as pool:
+ # Try submitting a job that should succeed
+ f = pool.submit(self._func, "succeed")
+ f.result()
+ self.assertQueueContains(
+ self.got_items,
+ [("This is a connection", "succeed")]
+ )
+
+ # Now a job that fails
+ went_boom = False
+ try:
+ f = pool.submit(self._func, "go boom")
+ f.result()
+ except Exception as e:
+ went_boom = True
+ self.assertEquals('I went boom!', str(e))
+ self.assertTrue(went_boom)
+
+ # Has the connection been returned to the pool?
+ f = pool.submit(self._func, "succeed")
+ f.result()
+ self.assertQueueContains(
+ self.got_items,
+ [
+ ("This is a connection", "go boom"),
+ ("This is a connection", "succeed")
+ ]
+ )
+
+ def test_submit_bad_connection(self):
+ ctpe = mt.ConnectionThreadPoolExecutor(self._create_conn_fail, 1)
+ with ctpe as pool:
+ # Now a connection that fails
+ connection_failed = False
+ try:
+ f = pool.submit(self._func, "succeed")
+ f.result()
+ except Exception as e:
+ connection_failed = True
+ self.assertEquals('This is a failed connection', str(e))
+ self.assertTrue(connection_failed)
+
+ # Make sure we don't lock up on failed connections
+ connection_failed = False
+ try:
+ f = pool.submit(self._func, "go boom")
+ f.result()
+ except Exception as e:
+ connection_failed = True
+ self.assertEquals('This is a failed connection', str(e))
+ self.assertTrue(connection_failed)
+
+ def test_lazy_connections(self):
+ ctpe = mt.ConnectionThreadPoolExecutor(self._create_conn, 10)
+ with ctpe as pool:
+ # Submit multiple jobs sequentially - should only use 1 conn
+ f = pool.submit(self._func, "succeed")
+ f.result()
+ f = pool.submit(self._func, "succeed")
+ f.result()
+ f = pool.submit(self._func, "succeed")
+ f.result()
+
+ expected_connections = [(0, "This is a connection")]
+ expected_connections.extend([(x, None) for x in range(1, 10)])
+
+ self.assertQueueContains(
+ pool._connections, expected_connections
+ )
+
+ ctpe = mt.ConnectionThreadPoolExecutor(self._create_conn, 10)
+ with ctpe as pool:
+ fs = []
+ f1 = pool.submit(self._func, "sleep")
+ f2 = pool.submit(self._func, "sleep")
+ f3 = pool.submit(self._func, "sleep")
+ fs.extend([f1, f2, f3])
+
+ expected_connections = [
+ (0, "This is a connection"),
+ (1, "This is a connection"),
+ (2, "This is a connection")
+ ]
+ expected_connections.extend([(x, None) for x in range(3, 10)])
+
+ for f in as_completed(fs):
+ f.result()
+
+ self.assertQueueContains(
+ pool._connections, expected_connections
+ )
+
+
+class TestOutputManager(testtools.TestCase):
+
+ def test_instantiation(self):
+ output_manager = mt.OutputManager()
+
+ self.assertEqual(sys.stdout, output_manager.print_stream)
+ self.assertEqual(sys.stderr, output_manager.error_stream)
def test_printers(self):
out_stream = six.StringIO()
err_stream = six.StringIO()
+ starting_thread_count = threading.active_count()
- with mt.MultiThreadingManager(
+ with mt.OutputManager(
print_stream=out_stream,
error_stream=err_stream) as thread_manager:
@@ -304,7 +188,8 @@ class TestMultiThreadingManager(ThreadTestCase):
self.assertEqual(out_stream, thread_manager.print_stream)
self.assertEqual(err_stream, thread_manager.error_stream)
- self.assertEqual(self.starting_thread_count + 2,
+ # No printing has happened yet, so no new threads
+ self.assertEqual(starting_thread_count,
threading.active_count())
thread_manager.print_msg('one-argument')
@@ -317,7 +202,13 @@ class TestMultiThreadingManager(ThreadTestCase):
thread_manager.error('Sometimes\n%.1f%% just\ndoes not\nwork!',
3.14159)
- self.assertEqual(self.starting_thread_count, threading.active_count())
+ # Now we have a thread for error printing and a thread for
+ # normal print messages
+ self.assertEqual(starting_thread_count + 2,
+ threading.active_count())
+
+ # The threads should have been cleaned up
+ self.assertEqual(starting_thread_count, threading.active_count())
out_stream.seek(0)
if six.PY3: