summaryrefslogtreecommitdiff
path: root/Tools/Scripts/webkitpy/layout_tests/controllers/manager.py
diff options
context:
space:
mode:
Diffstat (limited to 'Tools/Scripts/webkitpy/layout_tests/controllers/manager.py')
-rw-r--r--Tools/Scripts/webkitpy/layout_tests/controllers/manager.py200
1 files changed, 46 insertions, 154 deletions
diff --git a/Tools/Scripts/webkitpy/layout_tests/controllers/manager.py b/Tools/Scripts/webkitpy/layout_tests/controllers/manager.py
index f2fed3f4a..7aee0c2fb 100644
--- a/Tools/Scripts/webkitpy/layout_tests/controllers/manager.py
+++ b/Tools/Scripts/webkitpy/layout_tests/controllers/manager.py
@@ -44,7 +44,7 @@ import re
import sys
import time
-from webkitpy.layout_tests.controllers import manager_worker_broker
+from webkitpy.common import message_pool
from webkitpy.layout_tests.controllers import worker
from webkitpy.layout_tests.controllers.test_result_writer import TestResultWriter
from webkitpy.layout_tests.layout_package import json_layout_results_generator
@@ -88,11 +88,9 @@ def interpret_test_failures(port, test_name, failures):
test_dict['image_diff_percent'] = failure.diff_percent
elif isinstance(failure, test_failures.FailureReftestMismatch):
test_dict['is_reftest'] = True
- test_dict['ref_file'] = port.relative_test_filename(failure.reference_filename)
test_dict['image_diff_percent'] = failure.diff_percent
elif isinstance(failure, test_failures.FailureReftestMismatchDidNotOccur):
test_dict['is_mismatch_reftest'] = True
- test_dict['ref_file'] = port.relative_test_filename(failure.reference_filename)
if test_failures.FailureMissingResult in failure_types:
test_dict['is_missing_text'] = True
@@ -266,7 +264,8 @@ class TestRunInterruptedException(Exception):
return self.__class__, (self.reason,)
-WorkerException = manager_worker_broker.WorkerException
+# Export this so callers don't need to know about message pools.
+WorkerException = message_pool.WorkerException
class TestShard(object):
@@ -300,7 +299,6 @@ class Manager(object):
self._filesystem = port.host.filesystem
self._options = options
self._printer = printer
- self._message_broker = None
self._expectations = None
self.HTTP_SUBDIR = 'http' + port.TEST_PATH_SEPARATOR
@@ -328,11 +326,9 @@ class Manager(object):
self._all_results = []
self._group_stats = {}
+ self._worker_stats = {}
self._current_result_summary = None
- # This maps worker names to the state we are tracking for each of them.
- self._worker_states = {}
-
def collect_tests(self, args):
"""Find all the files to test.
@@ -484,20 +480,13 @@ class Manager(object):
# now make sure we're explicitly running any tests passed on the command line.
self._test_files.update(found_test_files.intersection(self._paths))
- if not num_all_test_files:
+ num_to_run = len(self._test_files)
+ num_skipped = num_all_test_files - num_to_run
+
+ if not num_to_run:
_log.critical('No tests to run.')
return None
- num_skipped = num_all_test_files - len(self._test_files)
- if num_skipped:
- self._printer.print_expected("Running %s (found %d, skipping %d)." % (
- grammar.pluralize('test', num_all_test_files - num_skipped),
- num_all_test_files, num_skipped))
- elif len(self._test_files) > 1:
- self._printer.print_expected("Running all %d tests." % len(self._test_files))
- else:
- self._printer.print_expected("Running %1 test.")
-
# Create a sorted list of test files so the subset chunk,
# if used, contains alphabetically consecutive tests.
self._test_files_list = list(self._test_files)
@@ -522,6 +511,8 @@ class Manager(object):
(self._options.repeat_each if self._options.repeat_each else 1) * \
(self._options.iterations if self._options.iterations else 1)
result_summary = ResultSummary(self._expectations, self._test_files | skipped, iterations)
+
+ self._printer.print_expected('Found %s.' % grammar.pluralize('test', num_all_test_files))
self._print_expected_results_of_type(result_summary, test_expectations.PASS, "passes")
self._print_expected_results_of_type(result_summary, test_expectations.FAIL, "failures")
self._print_expected_results_of_type(result_summary, test_expectations.FLAKY, "flaky")
@@ -534,17 +525,16 @@ class Manager(object):
for test in skipped:
result = test_results.TestResult(test)
result.type = test_expectations.SKIP
- iterations = \
- (self._options.repeat_each if self._options.repeat_each else 1) * \
- (self._options.iterations if self._options.iterations else 1)
for iteration in range(iterations):
result_summary.add(result, expected=True)
self._printer.print_expected('')
- # Check to make sure we didn't filter out all of the tests.
- if not len(self._test_files):
- _log.info("All tests are being skipped")
- return None
+ if self._options.repeat_each > 1:
+ self._printer.print_expected('Running each test %d times.' % self._options.repeat_each)
+ if self._options.iterations > 1:
+ self._printer.print_expected('Running %d iterations of the tests.' % self._options.iterations)
+ if iterations > 1:
+ self._printer.print_expected('')
return result_summary
@@ -719,11 +709,12 @@ class Manager(object):
def _log_num_workers(self, num_workers, num_shards, num_locked_shards):
driver_name = self._port.driver_name()
if num_workers == 1:
- self._printer.print_config("Running 1 %s over %s" %
+ self._printer.print_config("Running 1 %s over %s." %
(driver_name, grammar.pluralize('shard', num_shards)))
else:
- self._printer.print_config("Running %d %ss in parallel over %d shards (%d locked)" %
+ self._printer.print_config("Running %d %ss in parallel over %d shards (%d locked)." %
(num_workers, driver_name, num_shards, num_locked_shards))
+ self._printer.print_config('')
def _run_tests(self, file_list, result_summary, num_workers):
"""Runs the tests in the file_list.
@@ -744,11 +735,10 @@ class Manager(object):
self._current_result_summary = result_summary
self._all_results = []
self._group_stats = {}
- self._worker_states = {}
+ self._worker_stats = {}
keyboard_interrupted = False
interrupted = False
- thread_timings = []
self._printer.print_update('Sharding tests ...')
locked_shards, unlocked_shards = self._shard_tests(file_list, int(self._options.child_processes), self._options.fully_parallel)
@@ -769,71 +759,32 @@ class Manager(object):
num_workers = min(num_workers, len(all_shards))
self._log_num_workers(num_workers, len(all_shards), len(locked_shards))
- def worker_factory(worker_connection, worker_number):
- return worker.Worker(worker_connection, worker_number, self.results_directory(), self._options)
-
- manager_connection = manager_worker_broker.get(num_workers, self, worker_factory, self._port.host)
+ def worker_factory(worker_connection):
+ return worker.Worker(worker_connection, self.results_directory(), self._options)
if self._options.dry_run:
- return (keyboard_interrupted, interrupted, thread_timings, self._group_stats, self._all_results)
+ return (keyboard_interrupted, interrupted, self._worker_stats.values(), self._group_stats, self._all_results)
self._printer.print_update('Starting %s ...' % grammar.pluralize('worker', num_workers))
- for worker_number in xrange(num_workers):
- worker_connection = manager_connection.start_worker(worker_number)
- worker_state = _WorkerState(worker_number, worker_connection)
- self._worker_states[worker_connection.name()] = worker_state
-
- time.sleep(self._port.worker_startup_delay_secs())
-
- self._printer.print_update("Starting testing ...")
- for shard in all_shards:
- # FIXME: Change 'test_list' to 'shard', make sharding public.
- manager_connection.post_message('test_list', shard.name, shard.test_inputs)
-
- # We post one 'stop' message for each worker. Because the stop message
- # are sent after all of the tests, and because each worker will stop
- # reading messsages after receiving a stop, we can be sure each
- # worker will get a stop message and hence they will all shut down.
- for _ in xrange(num_workers):
- manager_connection.post_message('stop')
try:
- while not self.is_done():
- manager_connection.run_message_loop(delay_secs=1.0)
-
- # Make sure all of the workers have shut down (if possible).
- for worker_state in self._worker_states.values():
- if worker_state.worker_connection.is_alive():
- _log.debug('Waiting for worker %d to exit' % worker_state.number)
- worker_state.worker_connection.join(5.0)
- if worker_state.worker_connection.is_alive():
- _log.error('Worker %d did not exit in time.' % worker_state.number)
-
+ with message_pool.get(self, worker_factory, num_workers, self._port.worker_startup_delay_secs(), self._port.host) as pool:
+ pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards)
except KeyboardInterrupt:
self._printer.flush()
self._printer.write('Interrupted, exiting ...')
- self.cancel_workers()
keyboard_interrupted = True
except TestRunInterruptedException, e:
_log.warning(e.reason)
- self.cancel_workers()
interrupted = True
- except WorkerException:
- self.cancel_workers()
- raise
- except:
- # Unexpected exception; don't try to clean up workers.
- _log.error("Exception raised, exiting")
- self.cancel_workers()
+ except Exception, e:
+ _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e)))
raise
finally:
- manager_connection.cleanup()
self.stop_servers_with_lock()
- thread_timings = [worker_state.stats for worker_state in self._worker_states.values()]
-
# FIXME: should this be a class instead of a tuple?
- return (interrupted, keyboard_interrupted, thread_timings, self._group_stats, self._all_results)
+ return (interrupted, keyboard_interrupted, self._worker_stats.values(), self._group_stats, self._all_results)
def results_directory(self):
if not self._retrying:
@@ -934,7 +885,7 @@ class Manager(object):
self._print_timing_statistics(end_time - start_time, thread_timings, test_timings, individual_test_timings, result_summary)
self._print_result_summary(result_summary)
- self._printer.print_one_line_summary(result_summary.total, result_summary.expected, result_summary.unexpected)
+ self._printer.print_one_line_summary(result_summary.total - result_summary.expected_skips, result_summary.expected - result_summary.expected_skips, result_summary.unexpected)
unexpected_results = summarize_results(self._port, self._expectations, result_summary, retry_summary, individual_test_timings, only_unexpected=True, interrupted=interrupted)
self._printer.print_unexpected_results(unexpected_results)
@@ -1375,9 +1326,8 @@ class Manager(object):
result_summary: information to log
"""
failed = result_summary.total_failures
- skipped = result_summary.total_tests_by_expectation[test_expectations.SKIP]
- total = result_summary.total
- passed = total - failed - skipped
+ total = result_summary.total - result_summary.expected_skips
+ passed = total - failed
pct_passed = 0.0
if total > 0:
pct_passed = float(passed) * 100 / total
@@ -1442,42 +1392,17 @@ class Manager(object):
results_filename = self._filesystem.join(self._results_directory, "results.html")
self._port.show_results_html_file(results_filename)
- def name(self):
- return 'Manager'
-
- def is_done(self):
- worker_states = self._worker_states.values()
- return worker_states and all(self._worker_is_done(worker_state) for worker_state in worker_states)
-
- # FIXME: Inline this function.
- def _worker_is_done(self, worker_state):
- return worker_state.done
-
- def cancel_workers(self):
- for worker_state in self._worker_states.values():
- worker_state.worker_connection.cancel()
-
- def handle_started_test(self, source, test_info, hang_timeout):
- worker_state = self._worker_states[source]
- worker_state.current_test_name = test_info.test_name
- worker_state.next_timeout = time.time() + hang_timeout
-
- def handle_done(self, source, log_messages=None):
- worker_state = self._worker_states[source]
- worker_state.done = True
- self._log_messages(log_messages)
-
- def handle_exception(self, source, exception_type, exception_value, stack):
- if exception_type in (KeyboardInterrupt, TestRunInterruptedException):
- raise exception_type(exception_value)
- _log.error("%s raised %s('%s'):" % (
- source,
- exception_value.__class__.__name__,
- str(exception_value)))
- self._log_worker_stack(stack)
- raise WorkerException(str(exception_value))
-
- def handle_finished_list(self, source, list_name, num_tests, elapsed_time):
+ def handle(self, name, source, *args):
+ method = getattr(self, '_handle_' + name)
+ if method:
+ return method(source, *args)
+ raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args)))
+
+ def _handle_started_test(self, worker_name, test_input, test_timeout_sec):
+ # FIXME: log that we've started another test.
+ pass
+
+ def _handle_finished_test_list(self, worker_name, list_name, num_tests, elapsed_time):
self._group_stats[list_name] = (num_tests, elapsed_time)
def find(name, test_lists):
@@ -1492,29 +1417,13 @@ class Manager(object):
if not self._remaining_locked_shards:
self.stop_servers_with_lock()
- def handle_finished_test(self, source, result, elapsed_time, log_messages=None):
- worker_state = self._worker_states[source]
- worker_state.next_timeout = None
- worker_state.current_test_name = None
- worker_state.stats['total_time'] += elapsed_time
- worker_state.stats['num_tests'] += 1
-
- self._log_messages(log_messages)
+ def _handle_finished_test(self, worker_name, result, elapsed_time, log_messages=[]):
+ self._worker_stats.setdefault(worker_name, {'name': worker_name, 'num_tests': 0, 'total_time': 0})
+ self._worker_stats[worker_name]['total_time'] += elapsed_time
+ self._worker_stats[worker_name]['num_tests'] += 1
self._all_results.append(result)
self._update_summary_with_result(self._current_result_summary, result)
- def _log_messages(self, messages):
- for message in messages:
- logging.root.handle(message)
-
- def _log_worker_stack(self, stack):
- webkitpydir = self._port.path_from_webkit_base('Tools', 'Scripts', 'webkitpy') + self._filesystem.sep
- for filename, line_number, function_name, text in stack:
- if filename.startswith(webkitpydir):
- filename = filename.replace(webkitpydir, '')
- _log.error(' %s:%u (in %s)' % (filename, line_number, function_name))
- _log.error(' %s' % text)
-
def read_test_files(fs, filenames, test_path_separator):
tests = []
@@ -1563,20 +1472,3 @@ def natural_sort_key(string_to_split):
return val
return [tryint(chunk) for chunk in re.split('(\d+)', string_to_split)]
-
-
-class _WorkerState(object):
- """A class for the manager to use to track the current state of the workers."""
- def __init__(self, number, worker_connection):
- self.worker_connection = worker_connection
- self.number = number
- self.done = False
- self.current_test_name = None
- self.next_timeout = None
- self.stats = {}
- self.stats['name'] = worker_connection.name()
- self.stats['num_tests'] = 0
- self.stats['total_time'] = 0
-
- def __repr__(self):
- return "_WorkerState(" + str(self.__dict__) + ")"