diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/subunit/__init__.py | 5 | ||||
| -rwxr-xr-x | python/subunit/run.py | 23 | ||||
| -rw-r--r-- | python/subunit/test_results.py | 47 |
3 files changed, 63 insertions, 12 deletions
diff --git a/python/subunit/__init__.py b/python/subunit/__init__.py index ad749ff..7470f92 100644 --- a/python/subunit/__init__.py +++ b/python/subunit/__init__.py @@ -126,7 +126,7 @@ try: except ImportError: _UnsupportedOperation = AttributeError - +from extras import safe_hasattr from testtools import content, content_type, ExtendedToOriginalDecorator from testtools.content import TracebackContent from testtools.compat import _b, _u, BytesIO, StringIO @@ -1268,7 +1268,8 @@ def get_default_formatter(): else: stream = sys.stdout if sys.version_info > (3, 0): - stream = stream.buffer + if safe_hasattr(stream, 'buffer'): + stream = stream.buffer return stream diff --git a/python/subunit/run.py b/python/subunit/run.py index 612d7ea..d0a4a91 100755 --- a/python/subunit/run.py +++ b/python/subunit/run.py @@ -50,9 +50,7 @@ class SubunitTestRunner(object): def run(self, test): "Run the given test case or test suite." - result = StreamResultToBytes(self.stream) - for case in iterate_tests(test): - result.status(test_id=case.id(), test_status='exists') + result = self._list(test) result = ExtendedToStreamDecorator(result) result = AutoTimingTestResultDecorator(result) if self.failfast is not None: @@ -64,6 +62,24 @@ class SubunitTestRunner(object): result.stopTestRun() return result + def list(self, test): + "List the test." + self._list(test) + + def _list(self, test): + try: + fileno = self.stream.fileno() + except: + fileno = None + if fileno is not None: + stream = os.fdopen(fileno, 'wb', 0) + else: + stream = self.stream + result = StreamResultToBytes(stream) + for case in iterate_tests(test): + result.status(test_id=case.id(), test_status='exists') + return result + class SubunitTestProgram(TestProgram): @@ -91,7 +107,6 @@ class SubunitTestProgram(TestProgram): if __name__ == '__main__': # Disable the default buffering, for Python 2.x where pdb doesn't do it # on non-ttys. - sys.stdout = os.fdopen(sys.stdout.fileno(), 'ab', 0) stream = get_default_formatter() runner = SubunitTestRunner SubunitTestProgram(module=None, argv=sys.argv, testRunner=runner, diff --git a/python/subunit/test_results.py b/python/subunit/test_results.py index c9c7681..8c89d9b 100644 --- a/python/subunit/test_results.py +++ b/python/subunit/test_results.py @@ -527,16 +527,24 @@ class TestResultFilter(TestResultDecorator): class TestIdPrintingResult(testtools.TestResult): + """Print test ids to a stream. - def __init__(self, stream, show_times=False): + Implements both TestResult and StreamResult, for compatibility. + """ + + def __init__(self, stream, show_times=False, show_exists=False): """Create a FilterResult object outputting to stream.""" super(TestIdPrintingResult, self).__init__() self._stream = stream + self.show_exists = show_exists + self.show_times = show_times + + def startTestRun(self): self.failed_tests = 0 self.__time = None - self.show_times = show_times self._test = None self._test_duration = 0 + self._active_tests = {} def addError(self, test, err): self.failed_tests += 1 @@ -559,21 +567,44 @@ class TestIdPrintingResult(testtools.TestResult): def addExpectedFailure(self, test, err=None, details=None): self._test = test - def reportTest(self, test, duration): + def reportTest(self, test_id, duration): if self.show_times: seconds = duration.seconds seconds += duration.days * 3600 * 24 seconds += duration.microseconds / 1000000.0 - self._stream.write(test.id() + ' %0.3f\n' % seconds) + self._stream.write(test_id + ' %0.3f\n' % seconds) else: - self._stream.write(test.id() + '\n') + self._stream.write(test_id + '\n') def startTest(self, test): self._start_time = self._time() + def status(self, test_id=None, test_status=None, test_tags=None, + runnable=True, file_name=None, file_bytes=None, eof=False, + mime_type=None, route_code=None, timestamp=None): + if not test_id: + return + if timestamp is not None: + self.time(timestamp) + if test_status=='exists': + if self.show_exists: + self.reportTest(test_id, 0) + elif test_status in ('inprogress', None): + self._active_tests[test_id] = self._time() + else: + self._end_test(test_id) + + def _end_test(self, test_id): + test_start = self._active_tests.pop(test_id, None) + if not test_start: + test_duration = 0 + else: + test_duration = self._time() - test_start + self.reportTest(test_id, test_duration) + def stopTest(self, test): test_duration = self._time() - self._start_time - self.reportTest(self._test, test_duration) + self.reportTest(self._test.id(), test_duration) def time(self, time): self.__time = time @@ -585,6 +616,10 @@ class TestIdPrintingResult(testtools.TestResult): "Tells whether or not this result was a success" return self.failed_tests == 0 + def stopTestRun(self): + for test_id in list(self._active_tests.keys()): + self._end_test(test_id) + class TestByTestResult(testtools.TestResult): """Call something every time a test completes.""" |
