summary refs log tree commit diff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--  python/subunit/__init__.py    |  5
-rwxr-xr-x  python/subunit/run.py         | 23
-rw-r--r--  python/subunit/test_results.py | 47
3 files changed, 63 insertions, 12 deletions
diff --git a/python/subunit/__init__.py b/python/subunit/__init__.py
index ad749ff..7470f92 100644
--- a/python/subunit/__init__.py
+++ b/python/subunit/__init__.py
@@ -126,7 +126,7 @@ try:
except ImportError:
_UnsupportedOperation = AttributeError
-
+from extras import safe_hasattr
from testtools import content, content_type, ExtendedToOriginalDecorator
from testtools.content import TracebackContent
from testtools.compat import _b, _u, BytesIO, StringIO
@@ -1268,7 +1268,8 @@ def get_default_formatter():
else:
stream = sys.stdout
if sys.version_info > (3, 0):
- stream = stream.buffer
+ if safe_hasattr(stream, 'buffer'):
+ stream = stream.buffer
return stream
diff --git a/python/subunit/run.py b/python/subunit/run.py
index 612d7ea..d0a4a91 100755
--- a/python/subunit/run.py
+++ b/python/subunit/run.py
@@ -50,9 +50,7 @@ class SubunitTestRunner(object):
def run(self, test):
"Run the given test case or test suite."
- result = StreamResultToBytes(self.stream)
- for case in iterate_tests(test):
- result.status(test_id=case.id(), test_status='exists')
+ result = self._list(test)
result = ExtendedToStreamDecorator(result)
result = AutoTimingTestResultDecorator(result)
if self.failfast is not None:
@@ -64,6 +62,24 @@ class SubunitTestRunner(object):
result.stopTestRun()
return result
+ def list(self, test):
+ "List the test."
+ self._list(test)
+
+ def _list(self, test):
+ try:
+ fileno = self.stream.fileno()
+ except:
+ fileno = None
+ if fileno is not None:
+ stream = os.fdopen(fileno, 'wb', 0)
+ else:
+ stream = self.stream
+ result = StreamResultToBytes(stream)
+ for case in iterate_tests(test):
+ result.status(test_id=case.id(), test_status='exists')
+ return result
+
class SubunitTestProgram(TestProgram):
@@ -91,7 +107,6 @@ class SubunitTestProgram(TestProgram):
if __name__ == '__main__':
# Disable the default buffering, for Python 2.x where pdb doesn't do it
# on non-ttys.
- sys.stdout = os.fdopen(sys.stdout.fileno(), 'ab', 0)
stream = get_default_formatter()
runner = SubunitTestRunner
SubunitTestProgram(module=None, argv=sys.argv, testRunner=runner,
diff --git a/python/subunit/test_results.py b/python/subunit/test_results.py
index c9c7681..8c89d9b 100644
--- a/python/subunit/test_results.py
+++ b/python/subunit/test_results.py
@@ -527,16 +527,24 @@ class TestResultFilter(TestResultDecorator):
class TestIdPrintingResult(testtools.TestResult):
+ """Print test ids to a stream.
- def __init__(self, stream, show_times=False):
+ Implements both TestResult and StreamResult, for compatibility.
+ """
+
+ def __init__(self, stream, show_times=False, show_exists=False):
"""Create a FilterResult object outputting to stream."""
super(TestIdPrintingResult, self).__init__()
self._stream = stream
+ self.show_exists = show_exists
+ self.show_times = show_times
+
+ def startTestRun(self):
self.failed_tests = 0
self.__time = None
- self.show_times = show_times
self._test = None
self._test_duration = 0
+ self._active_tests = {}
def addError(self, test, err):
self.failed_tests += 1
@@ -559,21 +567,44 @@ class TestIdPrintingResult(testtools.TestResult):
def addExpectedFailure(self, test, err=None, details=None):
self._test = test
- def reportTest(self, test, duration):
+ def reportTest(self, test_id, duration):
if self.show_times:
seconds = duration.seconds
seconds += duration.days * 3600 * 24
seconds += duration.microseconds / 1000000.0
- self._stream.write(test.id() + ' %0.3f\n' % seconds)
+ self._stream.write(test_id + ' %0.3f\n' % seconds)
else:
- self._stream.write(test.id() + '\n')
+ self._stream.write(test_id + '\n')
def startTest(self, test):
self._start_time = self._time()
+ def status(self, test_id=None, test_status=None, test_tags=None,
+ runnable=True, file_name=None, file_bytes=None, eof=False,
+ mime_type=None, route_code=None, timestamp=None):
+ if not test_id:
+ return
+ if timestamp is not None:
+ self.time(timestamp)
+ if test_status=='exists':
+ if self.show_exists:
+ self.reportTest(test_id, 0)
+ elif test_status in ('inprogress', None):
+ self._active_tests[test_id] = self._time()
+ else:
+ self._end_test(test_id)
+
+ def _end_test(self, test_id):
+ test_start = self._active_tests.pop(test_id, None)
+ if not test_start:
+ test_duration = 0
+ else:
+ test_duration = self._time() - test_start
+ self.reportTest(test_id, test_duration)
+
def stopTest(self, test):
test_duration = self._time() - self._start_time
- self.reportTest(self._test, test_duration)
+ self.reportTest(self._test.id(), test_duration)
def time(self, time):
self.__time = time
@@ -585,6 +616,10 @@ class TestIdPrintingResult(testtools.TestResult):
"Tells whether or not this result was a success"
return self.failed_tests == 0
+ def stopTestRun(self):
+ for test_id in list(self._active_tests.keys()):
+ self._end_test(test_id)
+
class TestByTestResult(testtools.TestResult):
"""Call something every time a test completes."""