diff options
Diffstat (limited to 'Tools/Scripts/webkitpy/layout_tests/controllers/manager.py')
| -rw-r--r-- | Tools/Scripts/webkitpy/layout_tests/controllers/manager.py | 200 |
1 files changed, 46 insertions, 154 deletions
diff --git a/Tools/Scripts/webkitpy/layout_tests/controllers/manager.py b/Tools/Scripts/webkitpy/layout_tests/controllers/manager.py index f2fed3f4a..7aee0c2fb 100644 --- a/Tools/Scripts/webkitpy/layout_tests/controllers/manager.py +++ b/Tools/Scripts/webkitpy/layout_tests/controllers/manager.py @@ -44,7 +44,7 @@ import re import sys import time -from webkitpy.layout_tests.controllers import manager_worker_broker +from webkitpy.common import message_pool from webkitpy.layout_tests.controllers import worker from webkitpy.layout_tests.controllers.test_result_writer import TestResultWriter from webkitpy.layout_tests.layout_package import json_layout_results_generator @@ -88,11 +88,9 @@ def interpret_test_failures(port, test_name, failures): test_dict['image_diff_percent'] = failure.diff_percent elif isinstance(failure, test_failures.FailureReftestMismatch): test_dict['is_reftest'] = True - test_dict['ref_file'] = port.relative_test_filename(failure.reference_filename) test_dict['image_diff_percent'] = failure.diff_percent elif isinstance(failure, test_failures.FailureReftestMismatchDidNotOccur): test_dict['is_mismatch_reftest'] = True - test_dict['ref_file'] = port.relative_test_filename(failure.reference_filename) if test_failures.FailureMissingResult in failure_types: test_dict['is_missing_text'] = True @@ -266,7 +264,8 @@ class TestRunInterruptedException(Exception): return self.__class__, (self.reason,) -WorkerException = manager_worker_broker.WorkerException +# Export this so callers don't need to know about message pools. +WorkerException = message_pool.WorkerException class TestShard(object): @@ -300,7 +299,6 @@ class Manager(object): self._filesystem = port.host.filesystem self._options = options self._printer = printer - self._message_broker = None self._expectations = None self.HTTP_SUBDIR = 'http' + port.TEST_PATH_SEPARATOR @@ -328,11 +326,9 @@ class Manager(object): self._all_results = [] self._group_stats = {} + self._worker_stats = {} self._current_result_summary = None - # This maps worker names to the state we are tracking for each of them. - self._worker_states = {} - def collect_tests(self, args): """Find all the files to test. @@ -484,20 +480,13 @@ class Manager(object): # now make sure we're explicitly running any tests passed on the command line. self._test_files.update(found_test_files.intersection(self._paths)) - if not num_all_test_files: + num_to_run = len(self._test_files) + num_skipped = num_all_test_files - num_to_run + + if not num_to_run: _log.critical('No tests to run.') return None - num_skipped = num_all_test_files - len(self._test_files) - if num_skipped: - self._printer.print_expected("Running %s (found %d, skipping %d)." % ( - grammar.pluralize('test', num_all_test_files - num_skipped), - num_all_test_files, num_skipped)) - elif len(self._test_files) > 1: - self._printer.print_expected("Running all %d tests." % len(self._test_files)) - else: - self._printer.print_expected("Running %1 test.") - # Create a sorted list of test files so the subset chunk, # if used, contains alphabetically consecutive tests. self._test_files_list = list(self._test_files) @@ -522,6 +511,8 @@ class Manager(object): (self._options.repeat_each if self._options.repeat_each else 1) * \ (self._options.iterations if self._options.iterations else 1) result_summary = ResultSummary(self._expectations, self._test_files | skipped, iterations) + + self._printer.print_expected('Found %s.' % grammar.pluralize('test', num_all_test_files)) self._print_expected_results_of_type(result_summary, test_expectations.PASS, "passes") self._print_expected_results_of_type(result_summary, test_expectations.FAIL, "failures") self._print_expected_results_of_type(result_summary, test_expectations.FLAKY, "flaky") @@ -534,17 +525,16 @@ class Manager(object): for test in skipped: result = test_results.TestResult(test) result.type = test_expectations.SKIP - iterations = \ - (self._options.repeat_each if self._options.repeat_each else 1) * \ - (self._options.iterations if self._options.iterations else 1) for iteration in range(iterations): result_summary.add(result, expected=True) self._printer.print_expected('') - # Check to make sure we didn't filter out all of the tests. - if not len(self._test_files): - _log.info("All tests are being skipped") - return None + if self._options.repeat_each > 1: + self._printer.print_expected('Running each test %d times.' % self._options.repeat_each) + if self._options.iterations > 1: + self._printer.print_expected('Running %d iterations of the tests.' % self._options.iterations) + if iterations > 1: + self._printer.print_expected('') return result_summary @@ -719,11 +709,12 @@ class Manager(object): def _log_num_workers(self, num_workers, num_shards, num_locked_shards): driver_name = self._port.driver_name() if num_workers == 1: - self._printer.print_config("Running 1 %s over %s" % + self._printer.print_config("Running 1 %s over %s." % (driver_name, grammar.pluralize('shard', num_shards))) else: - self._printer.print_config("Running %d %ss in parallel over %d shards (%d locked)" % + self._printer.print_config("Running %d %ss in parallel over %d shards (%d locked)." % (num_workers, driver_name, num_shards, num_locked_shards)) + self._printer.print_config('') def _run_tests(self, file_list, result_summary, num_workers): """Runs the tests in the file_list. @@ -744,11 +735,10 @@ class Manager(object): self._current_result_summary = result_summary self._all_results = [] self._group_stats = {} - self._worker_states = {} + self._worker_stats = {} keyboard_interrupted = False interrupted = False - thread_timings = [] self._printer.print_update('Sharding tests ...') locked_shards, unlocked_shards = self._shard_tests(file_list, int(self._options.child_processes), self._options.fully_parallel) @@ -769,71 +759,32 @@ class Manager(object): num_workers = min(num_workers, len(all_shards)) self._log_num_workers(num_workers, len(all_shards), len(locked_shards)) - def worker_factory(worker_connection, worker_number): - return worker.Worker(worker_connection, worker_number, self.results_directory(), self._options) - - manager_connection = manager_worker_broker.get(num_workers, self, worker_factory, self._port.host) + def worker_factory(worker_connection): + return worker.Worker(worker_connection, self.results_directory(), self._options) if self._options.dry_run: - return (keyboard_interrupted, interrupted, thread_timings, self._group_stats, self._all_results) + return (keyboard_interrupted, interrupted, self._worker_stats.values(), self._group_stats, self._all_results) self._printer.print_update('Starting %s ...' % grammar.pluralize('worker', num_workers)) - for worker_number in xrange(num_workers): - worker_connection = manager_connection.start_worker(worker_number) - worker_state = _WorkerState(worker_number, worker_connection) - self._worker_states[worker_connection.name()] = worker_state - - time.sleep(self._port.worker_startup_delay_secs()) - - self._printer.print_update("Starting testing ...") - for shard in all_shards: - # FIXME: Change 'test_list' to 'shard', make sharding public. - manager_connection.post_message('test_list', shard.name, shard.test_inputs) - - # We post one 'stop' message for each worker. Because the stop message - # are sent after all of the tests, and because each worker will stop - # reading messsages after receiving a stop, we can be sure each - # worker will get a stop message and hence they will all shut down. - for _ in xrange(num_workers): - manager_connection.post_message('stop') try: - while not self.is_done(): - manager_connection.run_message_loop(delay_secs=1.0) - - # Make sure all of the workers have shut down (if possible). - for worker_state in self._worker_states.values(): - if worker_state.worker_connection.is_alive(): - _log.debug('Waiting for worker %d to exit' % worker_state.number) - worker_state.worker_connection.join(5.0) - if worker_state.worker_connection.is_alive(): - _log.error('Worker %d did not exit in time.' % worker_state.number) - + with message_pool.get(self, worker_factory, num_workers, self._port.worker_startup_delay_secs(), self._port.host) as pool: + pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards) except KeyboardInterrupt: self._printer.flush() self._printer.write('Interrupted, exiting ...') - self.cancel_workers() keyboard_interrupted = True except TestRunInterruptedException, e: _log.warning(e.reason) - self.cancel_workers() interrupted = True - except WorkerException: - self.cancel_workers() - raise - except: - # Unexpected exception; don't try to clean up workers. - _log.error("Exception raised, exiting") - self.cancel_workers() + except Exception, e: + _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e))) raise finally: - manager_connection.cleanup() self.stop_servers_with_lock() - thread_timings = [worker_state.stats for worker_state in self._worker_states.values()] - # FIXME: should this be a class instead of a tuple? - return (interrupted, keyboard_interrupted, thread_timings, self._group_stats, self._all_results) + return (interrupted, keyboard_interrupted, self._worker_stats.values(), self._group_stats, self._all_results) def results_directory(self): if not self._retrying: @@ -934,7 +885,7 @@ class Manager(object): self._print_timing_statistics(end_time - start_time, thread_timings, test_timings, individual_test_timings, result_summary) self._print_result_summary(result_summary) - self._printer.print_one_line_summary(result_summary.total, result_summary.expected, result_summary.unexpected) + self._printer.print_one_line_summary(result_summary.total - result_summary.expected_skips, result_summary.expected - result_summary.expected_skips, result_summary.unexpected) unexpected_results = summarize_results(self._port, self._expectations, result_summary, retry_summary, individual_test_timings, only_unexpected=True, interrupted=interrupted) self._printer.print_unexpected_results(unexpected_results) @@ -1375,9 +1326,8 @@ class Manager(object): result_summary: information to log """ failed = result_summary.total_failures - skipped = result_summary.total_tests_by_expectation[test_expectations.SKIP] - total = result_summary.total - passed = total - failed - skipped + total = result_summary.total - result_summary.expected_skips + passed = total - failed pct_passed = 0.0 if total > 0: pct_passed = float(passed) * 100 / total @@ -1442,42 +1392,17 @@ class Manager(object): results_filename = self._filesystem.join(self._results_directory, "results.html") self._port.show_results_html_file(results_filename) - def name(self): - return 'Manager' - - def is_done(self): - worker_states = self._worker_states.values() - return worker_states and all(self._worker_is_done(worker_state) for worker_state in worker_states) - - # FIXME: Inline this function. - def _worker_is_done(self, worker_state): - return worker_state.done - - def cancel_workers(self): - for worker_state in self._worker_states.values(): - worker_state.worker_connection.cancel() - - def handle_started_test(self, source, test_info, hang_timeout): - worker_state = self._worker_states[source] - worker_state.current_test_name = test_info.test_name - worker_state.next_timeout = time.time() + hang_timeout - - def handle_done(self, source, log_messages=None): - worker_state = self._worker_states[source] - worker_state.done = True - self._log_messages(log_messages) - - def handle_exception(self, source, exception_type, exception_value, stack): - if exception_type in (KeyboardInterrupt, TestRunInterruptedException): - raise exception_type(exception_value) - _log.error("%s raised %s('%s'):" % ( - source, - exception_value.__class__.__name__, - str(exception_value))) - self._log_worker_stack(stack) - raise WorkerException(str(exception_value)) - - def handle_finished_list(self, source, list_name, num_tests, elapsed_time): + def handle(self, name, source, *args): + method = getattr(self, '_handle_' + name) + if method: + return method(source, *args) + raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args))) + + def _handle_started_test(self, worker_name, test_input, test_timeout_sec): + # FIXME: log that we've started another test. + pass + + def _handle_finished_test_list(self, worker_name, list_name, num_tests, elapsed_time): self._group_stats[list_name] = (num_tests, elapsed_time) def find(name, test_lists): @@ -1492,29 +1417,13 @@ class Manager(object): if not self._remaining_locked_shards: self.stop_servers_with_lock() - def handle_finished_test(self, source, result, elapsed_time, log_messages=None): - worker_state = self._worker_states[source] - worker_state.next_timeout = None - worker_state.current_test_name = None - worker_state.stats['total_time'] += elapsed_time - worker_state.stats['num_tests'] += 1 - - self._log_messages(log_messages) + def _handle_finished_test(self, worker_name, result, elapsed_time, log_messages=[]): + self._worker_stats.setdefault(worker_name, {'name': worker_name, 'num_tests': 0, 'total_time': 0}) + self._worker_stats[worker_name]['total_time'] += elapsed_time + self._worker_stats[worker_name]['num_tests'] += 1 self._all_results.append(result) self._update_summary_with_result(self._current_result_summary, result) - def _log_messages(self, messages): - for message in messages: - logging.root.handle(message) - - def _log_worker_stack(self, stack): - webkitpydir = self._port.path_from_webkit_base('Tools', 'Scripts', 'webkitpy') + self._filesystem.sep - for filename, line_number, function_name, text in stack: - if filename.startswith(webkitpydir): - filename = filename.replace(webkitpydir, '') - _log.error(' %s:%u (in %s)' % (filename, line_number, function_name)) - _log.error(' %s' % text) - def read_test_files(fs, filenames, test_path_separator): tests = [] @@ -1563,20 +1472,3 @@ def natural_sort_key(string_to_split): return val return [tryint(chunk) for chunk in re.split('(\d+)', string_to_split)] - - -class _WorkerState(object): - """A class for the manager to use to track the current state of the workers.""" - def __init__(self, number, worker_connection): - self.worker_connection = worker_connection - self.number = number - self.done = False - self.current_test_name = None - self.next_timeout = None - self.stats = {} - self.stats['name'] = worker_connection.name() - self.stats['num_tests'] = 0 - self.stats['total_time'] = 0 - - def __repr__(self): - return "_WorkerState(" + str(self.__dict__) + ")" |
