diff options
Diffstat (limited to 'testrepository/repository/memory.py')
-rw-r--r-- | testrepository/repository/memory.py | 117 |
1 files changed, 39 insertions, 78 deletions
diff --git a/testrepository/repository/memory.py b/testrepository/repository/memory.py index 9eca6da..a332d6f 100644 --- a/testrepository/repository/memory.py +++ b/testrepository/repository/memory.py @@ -17,7 +17,7 @@ from io import BytesIO import subunit -from testtools.content import TracebackContent +import testtools from testrepository.repository import ( AbstractRepository, @@ -107,10 +107,9 @@ class _Failures(AbstractTestRun): return self def run(self, result): - for outcome, test, details in self._repository._failing.values(): - result.startTest(test) - getattr(result, 'add' + outcome)(test, details=details) - result.stopTest(test) + # Speaks original. + for case in self._repository._failing.values(): + case.run(result) class _Inserter(AbstractTestRun): @@ -119,98 +118,60 @@ class _Inserter(AbstractTestRun): def __init__(self, repository, partial): self._repository = repository self._partial = partial - self._outcomes = [] - self._events = [] - self._time = None - self._test_start = None + self._tests = [] + # Subunit V1 stream for get_subunit_stream + self._subunit = None def startTestRun(self): - pass + self._subunit = BytesIO() + serialiser = subunit.TestProtocolClient(self._subunit) + serialiser = testtools.StreamToExtendedDecorator(serialiser) + self._hook = testtools.CopyStreamResult([ + testtools.StreamToDict(self._handle_test), + serialiser]) + self._hook.startTestRun() + + def _handle_test(self, test_dict): + self._tests.append(test_dict) + start, stop = test_dict['timestamps'] + if None in (start, stop): + return + duration_delta = stop - start + duration_seconds = ((duration_delta.microseconds + + (duration_delta.seconds + duration_delta.days * 24 * 3600) + * 10**6) / 10.0**6) + self._repository._times[test_dict['id']] = duration_seconds def stopTestRun(self): + self._hook.stopTestRun() self._repository._runs.append(self) self._run_id = len(self._repository._runs) - 1 if not self._partial: self._repository._failing = {} - for record in self._outcomes: - test_id = record[1].id() - if record[0] in ('Failure', 'Error'): - self._repository._failing[test_id] = record + for test_dict in self._tests: + test_id = test_dict['id'] + if test_dict['status'] == 'fail': + case = testtools.testresult.real.test_dict_to_case(test_dict) + self._repository._failing[test_id] = case else: self._repository._failing.pop(test_id, None) return self._run_id - def startTest(self, test): - self._test_start = self._time - self._events.append(('startTest', test)) - - def stopTest(self, test): - self._events.append(('stopTest', test)) - if None in (self._test_start, self._time): - return - duration_delta = self._time - self._test_start - duration_seconds = ((duration_delta.microseconds + - (duration_delta.seconds + duration_delta.days * 24 * 3600) - * 10**6) / 10.0**6) - self._repository._times[test.id()] = duration_seconds - - def _addOutcome(self, outcome, test, details): - self._outcomes.append((outcome, test, details)) - - def addSuccess(self, test, details=None): - self._events.append(('addSuccess', test, details)) - self._addOutcome('Success', test, details) - - def _force_to_details(self, test, err, details): - if not details: - details = {} - if err is not None: - details['err'] = TracebackContent(err, test) - return details - - def addFailure(self, test, err=None, details=None): - # Don't support old interface for now. - self._events.append(('addFailure', test, err, details)) - details = self._force_to_details(test, err, details) - self._addOutcome('Failure', test, details) - - def addError(self, test, err=None, details=None): - self._events.append(('addError', test, err, details)) - details = self._force_to_details(test, err, details) - self._addOutcome('Error', test, details) - - def addExpectedFailure(self, test, err=None, details=None): - assert err is None - self._events.append(('addExpectedFailure', test, None, details)) - self._addOutcome('ExpectedFailure', test, details) - - def addUnexpectedSuccess(self, test, details=None): - self._events.append(('addUnexpectedSuccess', test, details)) - self._addOutcome('UnexpectedSuccess', test, details) - - def addSkip(self, test, reason=None, details=None): - assert reason is None - self._events.append(('addSkip', test, None, details)) - self._addOutcome('Skip', test, details) + def status(self, *args, **kwargs): + self._hook.status(*args, **kwargs) def get_id(self): return self._run_id def get_subunit_stream(self): - result = BytesIO() - serialiser = subunit.TestProtocolClient(result) - self.run(serialiser) - result.seek(0) - return result + self._subunit.seek(0) + return self._subunit def get_test(self): return self def run(self, result): - for event in self._events: - method = getattr(result, event[0]) - method(*event[1:]) - - def time(self, timestamp): - self._events.append(('time', timestamp)) - self._time = timestamp + # Speaks original. + for test_dict in self._tests: + case = testtools.testresult.real.test_dict_to_case(test_dict) + case.run(result) |