summaryrefslogtreecommitdiff
path: root/testrepository/repository/memory.py
diff options
context:
space:
mode:
Diffstat (limited to 'testrepository/repository/memory.py')
-rw-r--r--testrepository/repository/memory.py117
1 files changed, 39 insertions, 78 deletions
diff --git a/testrepository/repository/memory.py b/testrepository/repository/memory.py
index 9eca6da..a332d6f 100644
--- a/testrepository/repository/memory.py
+++ b/testrepository/repository/memory.py
@@ -17,7 +17,7 @@
from io import BytesIO
import subunit
-from testtools.content import TracebackContent
+import testtools
from testrepository.repository import (
AbstractRepository,
@@ -107,10 +107,9 @@ class _Failures(AbstractTestRun):
return self
def run(self, result):
- for outcome, test, details in self._repository._failing.values():
- result.startTest(test)
- getattr(result, 'add' + outcome)(test, details=details)
- result.stopTest(test)
+ # Speaks original.
+ for case in self._repository._failing.values():
+ case.run(result)
class _Inserter(AbstractTestRun):
@@ -119,98 +118,60 @@ class _Inserter(AbstractTestRun):
def __init__(self, repository, partial):
self._repository = repository
self._partial = partial
- self._outcomes = []
- self._events = []
- self._time = None
- self._test_start = None
+ self._tests = []
+ # Subunit V1 stream for get_subunit_stream
+ self._subunit = None
def startTestRun(self):
- pass
+ self._subunit = BytesIO()
+ serialiser = subunit.TestProtocolClient(self._subunit)
+ serialiser = testtools.StreamToExtendedDecorator(serialiser)
+ self._hook = testtools.CopyStreamResult([
+ testtools.StreamToDict(self._handle_test),
+ serialiser])
+ self._hook.startTestRun()
+
+ def _handle_test(self, test_dict):
+ self._tests.append(test_dict)
+ start, stop = test_dict['timestamps']
+ if None in (start, stop):
+ return
+ duration_delta = stop - start
+ duration_seconds = ((duration_delta.microseconds +
+ (duration_delta.seconds + duration_delta.days * 24 * 3600)
+ * 10**6) / 10.0**6)
+ self._repository._times[test_dict['id']] = duration_seconds
def stopTestRun(self):
+ self._hook.stopTestRun()
self._repository._runs.append(self)
self._run_id = len(self._repository._runs) - 1
if not self._partial:
self._repository._failing = {}
- for record in self._outcomes:
- test_id = record[1].id()
- if record[0] in ('Failure', 'Error'):
- self._repository._failing[test_id] = record
+ for test_dict in self._tests:
+ test_id = test_dict['id']
+ if test_dict['status'] == 'fail':
+ case = testtools.testresult.real.test_dict_to_case(test_dict)
+ self._repository._failing[test_id] = case
else:
self._repository._failing.pop(test_id, None)
return self._run_id
- def startTest(self, test):
- self._test_start = self._time
- self._events.append(('startTest', test))
-
- def stopTest(self, test):
- self._events.append(('stopTest', test))
- if None in (self._test_start, self._time):
- return
- duration_delta = self._time - self._test_start
- duration_seconds = ((duration_delta.microseconds +
- (duration_delta.seconds + duration_delta.days * 24 * 3600)
- * 10**6) / 10.0**6)
- self._repository._times[test.id()] = duration_seconds
-
- def _addOutcome(self, outcome, test, details):
- self._outcomes.append((outcome, test, details))
-
- def addSuccess(self, test, details=None):
- self._events.append(('addSuccess', test, details))
- self._addOutcome('Success', test, details)
-
- def _force_to_details(self, test, err, details):
- if not details:
- details = {}
- if err is not None:
- details['err'] = TracebackContent(err, test)
- return details
-
- def addFailure(self, test, err=None, details=None):
- # Don't support old interface for now.
- self._events.append(('addFailure', test, err, details))
- details = self._force_to_details(test, err, details)
- self._addOutcome('Failure', test, details)
-
- def addError(self, test, err=None, details=None):
- self._events.append(('addError', test, err, details))
- details = self._force_to_details(test, err, details)
- self._addOutcome('Error', test, details)
-
- def addExpectedFailure(self, test, err=None, details=None):
- assert err is None
- self._events.append(('addExpectedFailure', test, None, details))
- self._addOutcome('ExpectedFailure', test, details)
-
- def addUnexpectedSuccess(self, test, details=None):
- self._events.append(('addUnexpectedSuccess', test, details))
- self._addOutcome('UnexpectedSuccess', test, details)
-
- def addSkip(self, test, reason=None, details=None):
- assert reason is None
- self._events.append(('addSkip', test, None, details))
- self._addOutcome('Skip', test, details)
+ def status(self, *args, **kwargs):
+ self._hook.status(*args, **kwargs)
def get_id(self):
return self._run_id
def get_subunit_stream(self):
- result = BytesIO()
- serialiser = subunit.TestProtocolClient(result)
- self.run(serialiser)
- result.seek(0)
- return result
+ self._subunit.seek(0)
+ return self._subunit
def get_test(self):
return self
def run(self, result):
- for event in self._events:
- method = getattr(result, event[0])
- method(*event[1:])
-
- def time(self, timestamp):
- self._events.append(('time', timestamp))
- self._time = timestamp
+ # Speaks original.
+ for test_dict in self._tests:
+ case = testtools.testresult.real.test_dict_to_case(test_dict)
+ case.run(result)