diff options
| author | John Arbash Meinel <john@arbash-meinel.com> | 2011-06-30 12:59:08 +0200 |
|---|---|---|
| committer | John Arbash Meinel <john@arbash-meinel.com> | 2011-06-30 12:59:08 +0200 |
| commit | 3a41c806338121e7597c52b8e6e79302e67d8008 (patch) | |
| tree | df4154aad123123069b2730846aa6b907dbdb9d7 /python | |
| parent | 6954432d06307b33e53bbeb11799b2f61148ef31 (diff) | |
| parent | fac837618f84531205f6491fb9d72bde8717fbb8 (diff) | |
| download | subunit-git-3a41c806338121e7597c52b8e6e79302e67d8008.tar.gz | |
Merge trunk r142 and resolve conflicts.
Diffstat (limited to 'python')
| -rw-r--r-- | python/subunit/__init__.py | 67 | ||||
| -rw-r--r-- | python/subunit/chunked.py | 24 | ||||
| -rwxr-xr-x | python/subunit/run.py | 3 | ||||
| -rw-r--r-- | python/subunit/test_results.py | 226 | ||||
| -rw-r--r-- | python/subunit/tests/test_chunked.py | 24 | ||||
| -rw-r--r-- | python/subunit/tests/test_details.py | 1 | ||||
| -rw-r--r-- | python/subunit/tests/test_subunit_filter.py | 202 | ||||
| -rw-r--r-- | python/subunit/tests/test_test_protocol.py | 21 | ||||
| -rw-r--r-- | python/subunit/tests/test_test_results.py | 125 |
9 files changed, 503 insertions, 190 deletions
diff --git a/python/subunit/__init__.py b/python/subunit/__init__.py index b6f0108..368d3b2 100644 --- a/python/subunit/__init__.py +++ b/python/subunit/__init__.py @@ -6,7 +6,7 @@ # license at the users choice. A copy of both licenses are available in the # project source as Apache-2.0 and BSD. You may not use this file except in # compliance with one of these two licences. -# +# # Unless required by applicable law or agreed to in writing, software # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -49,7 +49,7 @@ details, tags, timestamping and progress markers). The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``, ``addFailure``, ``addSkip`` take an optional keyword parameter ``details`` which can be used instead of the usual python unittest parameter. -When used the value of details should be a dict from ``string`` to +When used the value of details should be a dict from ``string`` to ``testtools.content.Content`` objects. This is a draft API being worked on with the Python Testing In Python mail list, with the goal of permitting a common way to provide additional data beyond a traceback, such as captured data from @@ -58,7 +58,7 @@ and newer). The ``tags(new_tags, gone_tags)`` method is called (if present) to add or remove tags in the test run that is currently executing. If called when no -test is in progress (that is, if called outside of the ``startTest``, +test is in progress (that is, if called outside of the ``startTest``, ``stopTest`` pair), the the tags apply to all sebsequent tests. If called when a test is in progress, then the tags only apply to that test. @@ -87,7 +87,7 @@ tests, allowing isolation between the test runner and some tests. Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get tests that will fork() before that individual test is run. -`ExecTestCase`` is a convenience wrapper for running an external +`ExecTestCase`` is a convenience wrapper for running an external program to get a Subunit stream and then report that back to an arbitrary result object:: @@ -98,7 +98,7 @@ result object:: def test_script_two(self): './bin/script_two' - + # Normally your normal test loading would take of this automatically, # It is only spelt out in detail here for clarity. suite = unittest.TestSuite([AggregateTests("test_script_one"), @@ -116,7 +116,6 @@ Utility modules * subunit.test_results contains TestResult helper classes. """ -import datetime import os import re from StringIO import StringIO @@ -254,7 +253,7 @@ class _InTest(_ParserState): def _outcome(self, offset, line, no_details, details_state): """An outcome directive has been read. - + :param no_details: Callable to call when no details are presented. :param details_state: The state to switch to for details processing of this outcome. @@ -382,7 +381,7 @@ class _ReadingFailureDetails(_ReadingDetails): def _outcome_label(self): return "failure" - + class _ReadingErrorDetails(_ReadingDetails): """State for the subunit parser when reading error details.""" @@ -430,7 +429,7 @@ class _ReadingSuccessDetails(_ReadingDetails): class TestProtocolServer(object): """A parser for subunit. - + :ivar tags: The current tags associated with the protocol stream. """ @@ -442,7 +441,7 @@ class TestProtocolServer(object): subunit protocol should be written to. This allows custom handling of mixed protocols. By default, sys.stdout will be used for convenience. - :param forward_stream: A stream to forward subunit lines to. This + :param forward_stream: A stream to forward subunit lines to. This allows a filter to forward the entire stream while still parsing and acting on it. By default forward_stream is set to DiscardStream() and no forwarding happens. @@ -510,7 +509,7 @@ class TestProtocolServer(object): def readFrom(self, pipe): """Blocking convenience API to parse an entire stream. - + :param pipe: A file-like object supporting readlines(). :return: None. """ @@ -531,7 +530,7 @@ class TestProtocolServer(object): class TestProtocolClient(testresult.TestResult): """A TestResult which generates a subunit stream for a test run. - + # Get a TestSuite or TestCase to run suite = make_suite() # Create a stream (any object with a 'write' method) @@ -554,7 +553,7 @@ class TestProtocolClient(testresult.TestResult): def addError(self, test, error=None, details=None): """Report an error in test test. - + Only one of error and details should be provided: conceptually there are two separate methods: addError(self, test, error) @@ -569,7 +568,7 @@ class TestProtocolClient(testresult.TestResult): def addExpectedFailure(self, test, error=None, details=None): """Report an expected failure in test test. - + Only one of error and details should be provided: conceptually there are two separate methods: addError(self, test, error) @@ -584,7 +583,7 @@ class TestProtocolClient(testresult.TestResult): def addFailure(self, test, error=None, details=None): """Report a failure in test test. - + Only one of error and details should be provided: conceptually there are two separate methods: addFailure(self, test, error) @@ -599,7 +598,7 @@ class TestProtocolClient(testresult.TestResult): def _addOutcome(self, outcome, test, error=None, details=None): """Report a failure in test test. - + Only one of error and details should be provided: conceptually there are two separate methods: addOutcome(self, test, error) @@ -646,7 +645,13 @@ class TestProtocolClient(testresult.TestResult): def startTest(self, test): """Mark a test as starting its test run.""" + super(TestProtocolClient, self).startTest(test) self._stream.write("test: %s\n" % test.id()) + self._stream.flush() + + def stopTest(self, test): + super(TestProtocolClient, self).stopTest(test) + self._stream.flush() def progress(self, offset, whence): """Provide indication about the progress/length of the test run. @@ -711,7 +716,7 @@ def RemoteError(description=u""): class RemotedTestCase(unittest.TestCase): """A class to represent test cases run in child processes. - + Instances of this class are used to provide the Python test API a TestCase that can be printed to the screen, introspected for metadata and so on. However, as they are a simply a memoisation of a test that was actually @@ -796,7 +801,7 @@ class ExecTestCase(unittest.TestCase): class IsolatedTestCase(unittest.TestCase): """A TestCase which executes in a forked process. - + Each test gets its own process, which has a performance overhead but will provide excellent isolation from global state (such as django configs, zope utilities and so on). @@ -809,7 +814,7 @@ class IsolatedTestCase(unittest.TestCase): class IsolatedTestSuite(unittest.TestSuite): """A TestSuite which runs its tests in a forked process. - + This decorator that will fork() before running the tests and report the results from the child process using a Subunit stream. This is useful for handling tests that mutate global state, or are testing C extensions that @@ -861,7 +866,7 @@ def run_isolated(klass, self, result): def TAP2SubUnit(tap, subunit): """Filter a TAP pipe into a subunit pipe. - + :param tap: A tap pipe/stream/file object. :param subunit: A pipe/stream/file object to write subunit results to. :return: The exit code to exit with. @@ -869,7 +874,6 @@ def TAP2SubUnit(tap, subunit): BEFORE_PLAN = 0 AFTER_PLAN = 1 SKIP_STREAM = 2 - client = TestProtocolClient(subunit) state = BEFORE_PLAN plan_start = 1 plan_stop = 0 @@ -1019,11 +1023,11 @@ class ProtocolTestCase(object): that has been encoded into the stream. The ``unittest.TestCase`` ``debug`` and ``countTestCases`` methods are not supported because there isn't a sensible mapping for those methods. - + # Get a stream (any object with a readline() method), in this case the # stream output by the example from ``subunit.TestProtocolClient``. stream = file('tests.log', 'rb') - # Create a parser which will read from the stream and emit + # Create a parser which will read from the stream and emit # activity to a unittest.TestResult when run() is called. suite = subunit.ProtocolTestCase(stream) # Create a result object to accept the contents of that stream. @@ -1067,7 +1071,7 @@ class ProtocolTestCase(object): class TestResultStats(testresult.TestResult): """A pyunit TestResult interface implementation for making statistics. - + :ivar total_tests: The total tests seen. :ivar passed_tests: The tests that passed. :ivar failed_tests: The tests that failed. @@ -1118,7 +1122,7 @@ class TestResultStats(testresult.TestResult): def get_default_formatter(): """Obtain the default formatter to write to. - + :return: A file-like object. """ formatter = os.getenv("SUBUNIT_FORMATTER") @@ -1128,6 +1132,19 @@ def get_default_formatter(): return sys.stdout +def read_test_list(path): + """Read a list of test ids from a file on disk. + + :param path: Path to the file + :return: Sequence of test ids + """ + f = open(path, 'rb') + try: + return [l.rstrip("\n") for l in f.readlines()] + finally: + f.close() + + def _make_stream_binary(stream): """Ensure that a stream will be binary safe. See _make_binary_on_windows.""" if getattr(stream, 'fileno', None) is not None: diff --git a/python/subunit/chunked.py b/python/subunit/chunked.py index 82e4b0d..5f8c6f1 100644 --- a/python/subunit/chunked.py +++ b/python/subunit/chunked.py @@ -1,12 +1,13 @@ # # subunit: extensions to python unittest to get test results from subprocesses. # Copyright (C) 2005 Robert Collins <robertc@robertcollins.net> +# Copyright (C) 2011 Martin Pool <mbp@sourcefrog.net> # # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause # license at the users choice. A copy of both licenses are available in the # project source as Apache-2.0 and BSD. You may not use this file except in # compliance with one of these two licences. -# +# # Unless required by applicable law or agreed to in writing, software # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -19,7 +20,7 @@ class Decoder(object): """Decode chunked content to a byte stream.""" - def __init__(self, output): + def __init__(self, output, strict=True): """Create a decoder decoding to output. :param output: A file-like object. Bytes written to the Decoder are @@ -29,11 +30,18 @@ class Decoder(object): when no more data is available, to detect short streams; the write method will return none-None when the end of a stream is detected. + + :param strict: If True (the default), the decoder will not knowingly + accept input that is not conformant to the HTTP specification. + (This does not imply that it will catch every nonconformance.) + If False, it will accept incorrect input that is still + unambiguous. """ self.output = output self.buffered_bytes = [] self.state = self._read_length self.body_length = 0 + self.strict = strict def close(self): """Close the decoder. @@ -72,7 +80,6 @@ class Decoder(object): def _read_length(self): """Try to decode a length from the bytes.""" - count = -1 match_chars = "0123456789abcdefABCDEF\r\n" count_chars = [] for bytes in self.buffered_bytes: @@ -87,7 +94,12 @@ class Decoder(object): if count_chars[-1][-1] != '\n': return count_str = ''.join(count_chars) - self.body_length = int(count_str[:-2], 16) + if self.strict: + if count_str[-2:] != '\r\n': + raise ValueError("chunk header invalid: %r" % count_str) + if '\r' in count_str[:-2]: + raise ValueError("too many CRs in chunk header %r" % count_str) + self.body_length = int(count_str.rstrip('\n\r'), 16) excess_bytes = len(count_str) while excess_bytes: if excess_bytes >= len(self.buffered_bytes[0]): @@ -107,7 +119,7 @@ class Decoder(object): def write(self, bytes): """Decode bytes to the output stream. - + :raises ValueError: If the stream has already seen the end of file marker. :returns: None, or the excess bytes beyond the end of file marker. @@ -133,7 +145,7 @@ class Encoder(object): def flush(self, extra_len=0): """Flush the encoder to the output stream. - + :param extra_len: Increase the size of the chunk by this many bytes to allow for a subsequent write. """ diff --git a/python/subunit/run.py b/python/subunit/run.py index daa241a..b390de3 100755 --- a/python/subunit/run.py +++ b/python/subunit/run.py @@ -69,4 +69,5 @@ class SubunitTestProgram(TestProgram): if __name__ == '__main__': stream = get_default_formatter() runner = SubunitTestRunner(stream) - SubunitTestProgram(module=None, argv=sys.argv, testRunner=runner) + SubunitTestProgram(module=None, argv=sys.argv, testRunner=runner, + stdout=sys.stdout) diff --git a/python/subunit/test_results.py b/python/subunit/test_results.py index cc588d2..33fb50e 100644 --- a/python/subunit/test_results.py +++ b/python/subunit/test_results.py @@ -18,9 +18,10 @@ import datetime -import iso8601 import testtools +from subunit import iso8601 + # NOT a TestResult, because we are implementing the interface, not inheriting # it. @@ -81,8 +82,12 @@ class TestResultDecorator(object): def stop(self): return self.decorated.stop() + @property + def testsRun(self): + return self.decorated.testsRun + def tags(self, new_tags, gone_tags): - return self.decorated.time(new_tags, gone_tags) + return self.decorated.tags(new_tags, gone_tags) def time(self, a_datetime): return self.decorated.time(a_datetime) @@ -195,6 +200,87 @@ class AutoTimingTestResultDecorator(HookedTestResultDecorator): return self.decorated.time(a_datetime) +class TagCollapsingDecorator(TestResultDecorator): + """Collapses many 'tags' calls into one where possible.""" + + def __init__(self, result): + super(TagCollapsingDecorator, self).__init__(result) + # The (new, gone) tags for the current test. + self._current_test_tags = None + + def startTest(self, test): + """Start a test. + + Not directly passed to the client, but used for handling of tags + correctly. + """ + self.decorated.startTest(test) + self._current_test_tags = set(), set() + + def stopTest(self, test): + """Stop a test. + + Not directly passed to the client, but used for handling of tags + correctly. + """ + # Tags to output for this test. + if self._current_test_tags[0] or self._current_test_tags[1]: + self.decorated.tags(*self._current_test_tags) + self.decorated.stopTest(test) + self._current_test_tags = None + + def tags(self, new_tags, gone_tags): + """Handle tag instructions. + + Adds and removes tags as appropriate. If a test is currently running, + tags are not affected for subsequent tests. + + :param new_tags: Tags to add, + :param gone_tags: Tags to remove. + """ + if self._current_test_tags is not None: + # gather the tags until the test stops. + self._current_test_tags[0].update(new_tags) + self._current_test_tags[0].difference_update(gone_tags) + self._current_test_tags[1].update(gone_tags) + self._current_test_tags[1].difference_update(new_tags) + else: + return self.decorated.tags(new_tags, gone_tags) + + +class TimeCollapsingDecorator(HookedTestResultDecorator): + """Only pass on the first and last of a consecutive sequence of times.""" + + def __init__(self, decorated): + super(TimeCollapsingDecorator, self).__init__(decorated) + self._last_received_time = None + self._last_sent_time = None + + def _before_event(self): + if self._last_received_time is None: + return + if self._last_received_time != self._last_sent_time: + self.decorated.time(self._last_received_time) + self._last_sent_time = self._last_received_time + self._last_received_time = None + + def time(self, a_time): + # Don't upcall, because we don't want to call _before_event, it's only + # for non-time events. + if self._last_received_time is None: + self.decorated.time(a_time) + self._last_sent_time = a_time + self._last_received_time = a_time + + +def all_true(bools): + """Return True if all of 'bools' are True. False otherwise.""" + for b in bools: + if not b: + return False + return True + + class TestResultFilter(TestResultDecorator): """A pyunit TestResult interface implementation which filters tests. @@ -209,7 +295,7 @@ class TestResultFilter(TestResultDecorator): def __init__(self, result, filter_error=False, filter_failure=False, filter_success=True, filter_skip=False, filter_xfail=False, - filter_predicate=None): + filter_predicate=None, fixup_expected_failures=None): """Create a FilterResult object filtering to result. :param filter_error: Filter out errors. @@ -222,71 +308,96 @@ class TestResultFilter(TestResultDecorator): through. err and details may be none if no error or extra metadata is available. outcome is the name of the outcome such as 'success' or 'failure'. + :param fixup_expected_failures: Set of test ids to consider known + failing. """ - TestResultDecorator.__init__(self, result) - self._filter_error = filter_error - self._filter_failure = filter_failure - self._filter_success = filter_success - self._filter_skip = filter_skip - self._filter_xfail = filter_xfail - if filter_predicate is None: - filter_predicate = lambda test, outcome, err, details: True - self.filter_predicate = filter_predicate + super(TestResultFilter, self).__init__(result) + self.decorated = TimeCollapsingDecorator( + TagCollapsingDecorator(self.decorated)) + predicates = [] + if filter_error: + predicates.append(lambda t, outcome, e, d: outcome != 'error') + if filter_failure: + predicates.append(lambda t, outcome, e, d: outcome != 'failure') + if filter_success: + predicates.append(lambda t, outcome, e, d: outcome != 'success') + if filter_skip: + predicates.append(lambda t, outcome, e, d: outcome != 'skip') + if filter_xfail: + predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure') + if filter_predicate is not None: + predicates.append(filter_predicate) + self.filter_predicate = ( + lambda test, outcome, err, details: + all_true(p(test, outcome, err, details) for p in predicates)) # The current test (for filtering tags) self._current_test = None # Has the current test been filtered (for outputting test tags) self._current_test_filtered = None - # The (new, gone) tags for the current test. - self._current_test_tags = None + # Calls to this result that we don't know whether to forward on yet. + self._buffered_calls = [] + if fixup_expected_failures is None: + self._fixup_expected_failures = frozenset() + else: + self._fixup_expected_failures = fixup_expected_failures def addError(self, test, err=None, details=None): - if (not self._filter_error and - self.filter_predicate(test, 'error', err, details)): - self.decorated.startTest(test) - self.decorated.addError(test, err, details=details) + if (self.filter_predicate(test, 'error', err, details)): + if self._failure_expected(test): + self._buffered_calls.append( + ('addExpectedFailure', [test, err], {'details': details})) + else: + self._buffered_calls.append( + ('addError', [test, err], {'details': details})) else: self._filtered() def addFailure(self, test, err=None, details=None): - if (not self._filter_failure and - self.filter_predicate(test, 'failure', err, details)): - self.decorated.startTest(test) - self.decorated.addFailure(test, err, details=details) + if (self.filter_predicate(test, 'failure', err, details)): + if self._failure_expected(test): + self._buffered_calls.append( + ('addExpectedFailure', [test, err], {'details': details})) + else: + self._buffered_calls.append( + ('addFailure', [test, err], {'details': details})) else: self._filtered() def addSkip(self, test, reason=None, details=None): - if (not self._filter_skip and - self.filter_predicate(test, 'skip', reason, details)): - self.decorated.startTest(test) - self.decorated.addSkip(test, reason, details=details) + if (self.filter_predicate(test, 'skip', reason, details)): + self._buffered_calls.append( + ('addSkip', [test, reason], {'details': details})) else: self._filtered() def addSuccess(self, test, details=None): - if (not self._filter_success and - self.filter_predicate(test, 'success', None, details)): - self.decorated.startTest(test) - self.decorated.addSuccess(test, details=details) + if (self.filter_predicate(test, 'success', None, details)): + if self._failure_expected(test): + self._buffered_calls.append( + ('addUnexpectedSuccess', [test], {'details': details})) + else: + self._buffered_calls.append( + ('addSuccess', [test], {'details': details})) else: self._filtered() def addExpectedFailure(self, test, err=None, details=None): - if (not self._filter_xfail and - self.filter_predicate(test, 'expectedfailure', err, details)): - self.decorated.startTest(test) - return self.decorated.addExpectedFailure(test, err, - details=details) + if self.filter_predicate(test, 'expectedfailure', err, details): + self._buffered_calls.append( + ('addExpectedFailure', [test, err], {'details': details})) else: self._filtered() def addUnexpectedSuccess(self, test, details=None): - self.decorated.startTest(test) - return self.decorated.addUnexpectedSuccess(test, details=details) + self._buffered_calls.append( + ('addUnexpectedSuccess', [test], {'details': details})) def _filtered(self): self._current_test_filtered = True + def _failure_expected(self, test): + return (test.id() in self._fixup_expected_failures) + def startTest(self, test): """Start a test. @@ -295,7 +406,7 @@ class TestResultFilter(TestResultDecorator): """ self._current_test = test self._current_test_filtered = False - self._current_test_tags = set(), set() + self._buffered_calls.append(('startTest', [test], {})) def stopTest(self, test): """Stop a test. @@ -305,29 +416,18 @@ class TestResultFilter(TestResultDecorator): """ if not self._current_test_filtered: # Tags to output for this test. - if self._current_test_tags[0] or self._current_test_tags[1]: - self.decorated.tags(*self._current_test_tags) + for method, args, kwargs in self._buffered_calls: + getattr(self.decorated, method)(*args, **kwargs) self.decorated.stopTest(test) self._current_test = None self._current_test_filtered = None - self._current_test_tags = None + self._buffered_calls = [] - def tags(self, new_tags, gone_tags): - """Handle tag instructions. - - Adds and removes tags as appropriate. If a test is currently running, - tags are not affected for subsequent tests. - - :param new_tags: Tags to add, - :param gone_tags: Tags to remove. - """ + def time(self, a_time): if self._current_test is not None: - # gather the tags until the test stops. - self._current_test_tags[0].update(new_tags) - self._current_test_tags[0].difference_update(gone_tags) - self._current_test_tags[1].update(gone_tags) - self._current_test_tags[1].difference_update(new_tags) - return self.decorated.tags(new_tags, gone_tags) + self._buffered_calls.append(('time', [a_time], {})) + else: + return self.decorated.time(a_time) def id_to_orig_id(self, id): if id.startswith("subunit.RemotedTestCase."): @@ -339,10 +439,10 @@ class TestIdPrintingResult(testtools.TestResult): def __init__(self, stream, show_times=False): """Create a FilterResult object outputting to stream.""" - testtools.TestResult.__init__(self) + super(TestIdPrintingResult, self).__init__() self._stream = stream self.failed_tests = 0 - self.__time = 0 + self.__time = None self.show_times = show_times self._test = None self._test_duration = 0 @@ -358,6 +458,16 @@ class TestIdPrintingResult(testtools.TestResult): def addSuccess(self, test): self._test = test + def addSkip(self, test, reason=None, details=None): + self._test = test + + def addUnexpectedSuccess(self, test, details=None): + self.failed_tests += 1 + self._test = test + + def addExpectedFailure(self, test, err=None, details=None): + self._test = test + def reportTest(self, test, duration): if self.show_times: seconds = duration.seconds diff --git a/python/subunit/tests/test_chunked.py b/python/subunit/tests/test_chunked.py index a24e31e..eee7fe9 100644 --- a/python/subunit/tests/test_chunked.py +++ b/python/subunit/tests/test_chunked.py @@ -1,6 +1,7 @@ # # subunit: extensions to python unittest to get test results from subprocesses. # Copyright (C) 2005 Robert Collins <robertc@robertcollins.net> +# Copyright (C) 2011 Martin Pool <mbp@sourcefrog.net> # # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause # license at the users choice. A copy of both licenses are available in the @@ -86,6 +87,29 @@ class TestDecode(unittest.TestCase): self.assertEqual('', self.decoder.write('0\r\n')) self.assertEqual('1' * 65536 + '2' * 65536, self.output.getvalue()) + def test_decode_newline_nonstrict(self): + """Tolerate chunk markers with no CR character.""" + # From <http://pad.lv/505078> + self.decoder = subunit.chunked.Decoder(self.output, strict=False) + self.assertEqual(None, self.decoder.write('a\n')) + self.assertEqual(None, self.decoder.write('abcdeabcde')) + self.assertEqual('', self.decoder.write('0\n')) + self.assertEqual('abcdeabcde', self.output.getvalue()) + + def test_decode_strict_newline_only(self): + """Reject chunk markers with no CR character in strict mode.""" + # From <http://pad.lv/505078> + self.assertRaises(ValueError, + self.decoder.write, 'a\n') + + def test_decode_strict_multiple_crs(self): + self.assertRaises(ValueError, + self.decoder.write, 'a\r\r\n') + + def test_decode_short_header(self): + self.assertRaises(ValueError, + self.decoder.write, '\n') + class TestEncode(unittest.TestCase): diff --git a/python/subunit/tests/test_details.py b/python/subunit/tests/test_details.py index 41c3212..2fd1a66 100644 --- a/python/subunit/tests/test_details.py +++ b/python/subunit/tests/test_details.py @@ -14,7 +14,6 @@ # limitations under that license. # -from cStringIO import StringIO import unittest import subunit.tests diff --git a/python/subunit/tests/test_subunit_filter.py b/python/subunit/tests/test_subunit_filter.py index 3c65ed3..f8db05b 100644 --- a/python/subunit/tests/test_subunit_filter.py +++ b/python/subunit/tests/test_subunit_filter.py @@ -6,7 +6,7 @@ # license at the users choice. A copy of both licenses are available in the # project source as Apache-2.0 and BSD. You may not use this file except in # compliance with one of these two licences. -# +# # Unless required by applicable law or agreed to in writing, software # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -16,119 +16,177 @@ """Tests for subunit.TestResultFilter.""" +from datetime import datetime +from subunit import iso8601 import unittest from StringIO import StringIO +from testtools import TestCase +from testtools.testresult.doubles import ExtendedTestResult + import subunit from subunit.test_results import TestResultFilter -class TestTestResultFilter(unittest.TestCase): +class TestTestResultFilter(TestCase): """Test for TestResultFilter, a TestResult object which filters tests.""" - def _setUp(self): - self.output = StringIO() + # While TestResultFilter works on python objects, using a subunit stream + # is an easy pithy way of getting a series of test objects to call into + # the TestResult, and as TestResultFilter is intended for use with subunit + # also has the benefit of detecting any interface skew issues. + example_subunit_stream = """\ +tags: global +test passed +success passed +test failed +tags: local +failure failed +test error +error error [ +error details +] +test skipped +skip skipped +test todo +xfail todo +""" + + def run_tests(self, result_filter, input_stream=None): + """Run tests through the given filter. + + :param result_filter: A filtering TestResult object. + :param input_stream: Bytes of subunit stream data. If not provided, + uses TestTestResultFilter.example_subunit_stream. + """ + if input_stream is None: + input_stream = self.example_subunit_stream + test = subunit.ProtocolTestCase(StringIO(input_stream)) + test.run(result_filter) def test_default(self): """The default is to exclude success and include everything else.""" - self.filtered_result = unittest.TestResult() - self.filter = TestResultFilter(self.filtered_result) - self.run_tests() + filtered_result = unittest.TestResult() + result_filter = TestResultFilter(filtered_result) + self.run_tests(result_filter) # skips are seen as success by default python TestResult. self.assertEqual(['error'], - [error[0].id() for error in self.filtered_result.errors]) + [error[0].id() for error in filtered_result.errors]) self.assertEqual(['failed'], [failure[0].id() for failure in - self.filtered_result.failures]) - self.assertEqual(4, self.filtered_result.testsRun) + filtered_result.failures]) + self.assertEqual(4, filtered_result.testsRun) def test_exclude_errors(self): - self.filtered_result = unittest.TestResult() - self.filter = TestResultFilter(self.filtered_result, - filter_error=True) - self.run_tests() + filtered_result = unittest.TestResult() + result_filter = TestResultFilter(filtered_result, filter_error=True) + self.run_tests(result_filter) # skips are seen as errors by default python TestResult. - self.assertEqual([], self.filtered_result.errors) + self.assertEqual([], filtered_result.errors) self.assertEqual(['failed'], [failure[0].id() for failure in - self.filtered_result.failures]) - self.assertEqual(3, self.filtered_result.testsRun) + filtered_result.failures]) + self.assertEqual(3, filtered_result.testsRun) + + def test_fixup_expected_failures(self): + filtered_result = unittest.TestResult() + result_filter = TestResultFilter(filtered_result, + fixup_expected_failures=set(["failed"])) + self.run_tests(result_filter) + self.assertEqual(['failed', 'todo'], + [failure[0].id() for failure in filtered_result.expectedFailures]) + self.assertEqual([], filtered_result.failures) + self.assertEqual(4, filtered_result.testsRun) + + def test_fixup_expected_errors(self): + filtered_result = unittest.TestResult() + result_filter = TestResultFilter(filtered_result, + fixup_expected_failures=set(["error"])) + self.run_tests(result_filter) + self.assertEqual(['error', 'todo'], + [failure[0].id() for failure in filtered_result.expectedFailures]) + self.assertEqual([], filtered_result.errors) + self.assertEqual(4, filtered_result.testsRun) + + def test_fixup_unexpected_success(self): + filtered_result = unittest.TestResult() + result_filter = TestResultFilter(filtered_result, filter_success=False, + fixup_expected_failures=set(["passed"])) + self.run_tests(result_filter) + self.assertEqual(['passed'], + [passed.id() for passed in filtered_result.unexpectedSuccesses]) + self.assertEqual(5, filtered_result.testsRun) def test_exclude_failure(self): - self.filtered_result = unittest.TestResult() - self.filter = TestResultFilter(self.filtered_result, - filter_failure=True) - self.run_tests() + filtered_result = unittest.TestResult() + result_filter = TestResultFilter(filtered_result, filter_failure=True) + self.run_tests(result_filter) self.assertEqual(['error'], - [error[0].id() for error in self.filtered_result.errors]) + [error[0].id() for error in filtered_result.errors]) self.assertEqual([], [failure[0].id() for failure in - self.filtered_result.failures]) - self.assertEqual(3, self.filtered_result.testsRun) + filtered_result.failures]) + self.assertEqual(3, filtered_result.testsRun) def test_exclude_skips(self): - self.filtered_result = subunit.TestResultStats(None) - self.filter = TestResultFilter(self.filtered_result, - filter_skip=True) - self.run_tests() - self.assertEqual(0, self.filtered_result.skipped_tests) - self.assertEqual(2, self.filtered_result.failed_tests) - self.assertEqual(3, self.filtered_result.testsRun) + filtered_result = subunit.TestResultStats(None) + result_filter = TestResultFilter(filtered_result, filter_skip=True) + self.run_tests(result_filter) + self.assertEqual(0, filtered_result.skipped_tests) + self.assertEqual(2, filtered_result.failed_tests) + self.assertEqual(3, filtered_result.testsRun) def test_include_success(self): - """Success's can be included if requested.""" - self.filtered_result = unittest.TestResult() - self.filter = TestResultFilter(self.filtered_result, + """Successes can be included if requested.""" + filtered_result = unittest.TestResult() + result_filter = TestResultFilter(filtered_result, filter_success=False) - self.run_tests() + self.run_tests(result_filter) self.assertEqual(['error'], - [error[0].id() for error in self.filtered_result.errors]) + [error[0].id() for error in filtered_result.errors]) self.assertEqual(['failed'], [failure[0].id() for failure in - self.filtered_result.failures]) - self.assertEqual(5, self.filtered_result.testsRun) + filtered_result.failures]) + self.assertEqual(5, filtered_result.testsRun) def test_filter_predicate(self): """You can filter by predicate callbacks""" - self.filtered_result = unittest.TestResult() + filtered_result = unittest.TestResult() def filter_cb(test, outcome, err, details): return outcome == 'success' - self.filter = TestResultFilter(self.filtered_result, + result_filter = TestResultFilter(filtered_result, filter_predicate=filter_cb, filter_success=False) - self.run_tests() + self.run_tests(result_filter) # Only success should pass - self.assertEqual(1, self.filtered_result.testsRun) - - def run_tests(self): - self.setUpTestStream() - self.test = subunit.ProtocolTestCase(self.input_stream) - self.test.run(self.filter) - - def setUpTestStream(self): - # While TestResultFilter works on python objects, using a subunit - # stream is an easy pithy way of getting a series of test objects to - # call into the TestResult, and as TestResultFilter is intended for - # use with subunit also has the benefit of detecting any interface - # skew issues. - self.input_stream = StringIO() - self.input_stream.write("""tags: global -test passed -success passed -test failed -tags: local -failure failed -test error -error error [ -error details -] -test skipped -skip skipped -test todo -xfail todo -""") - self.input_stream.seek(0) - + self.assertEqual(1, filtered_result.testsRun) + + def test_time_ordering_preserved(self): + # Passing a subunit stream through TestResultFilter preserves the + # relative ordering of 'time' directives and any other subunit + # directives that are still included. + date_a = datetime(year=2000, month=1, day=1, tzinfo=iso8601.UTC) + date_b = datetime(year=2000, month=1, day=2, tzinfo=iso8601.UTC) + date_c = datetime(year=2000, month=1, day=3, tzinfo=iso8601.UTC) + subunit_stream = '\n'.join([ + "time: %s", + "test: foo", + "time: %s", + "error: foo", + "time: %s", + ""]) % (date_a, date_b, date_c) + result = ExtendedTestResult() + result_filter = TestResultFilter(result) + self.run_tests(result_filter, subunit_stream) + foo = subunit.RemotedTestCase('foo') + self.assertEquals( + [('time', date_a), + ('startTest', foo), + ('time', date_b), + ('addError', foo, {}), + ('stopTest', foo), + ('time', date_c)], result._events) + def test_suite(): loader = subunit.tests.TestUtil.TestLoader() diff --git a/python/subunit/tests/test_test_protocol.py b/python/subunit/tests/test_test_protocol.py index e1287b6..f7bab5c 100644 --- a/python/subunit/tests/test_test_protocol.py +++ b/python/subunit/tests/test_test_protocol.py @@ -6,7 +6,7 @@ # license at the users choice. A copy of both licenses are available in the # project source as Apache-2.0 and BSD. You may not use this file except in # compliance with one of these two licences. -# +# # Unless required by applicable law or agreed to in writing, software # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -18,7 +18,6 @@ import datetime import unittest from StringIO import StringIO import os -import sys from testtools.content import Content, TracebackContent from testtools.content_type import ContentType @@ -61,7 +60,6 @@ class TestProtocolServerForward(unittest.TestCase): pipe = StringIO("test old mcdonald\n" "success old mcdonald\n") protocol.readFrom(pipe) - mcdonald = subunit.RemotedTestCase("old mcdonald") self.assertEqual(client.testsRun, 1) self.assertEqual(pipe.getvalue(), out.getvalue()) @@ -74,7 +72,7 @@ class TestProtocolServerForward(unittest.TestCase): protocol.readFrom(pipe) self.assertEqual(client.testsRun, 0) self.assertEqual("", out.getvalue()) - + class TestTestProtocolServerPipe(unittest.TestCase): @@ -90,7 +88,6 @@ class TestTestProtocolServerPipe(unittest.TestCase): "test an error\n" "error an error\n") protocol.readFrom(pipe) - mcdonald = subunit.RemotedTestCase("old mcdonald") bing = subunit.RemotedTestCase("bing crosby") an_error = subunit.RemotedTestCase("an error") self.assertEqual(client.errors, @@ -311,7 +308,7 @@ class TestTestProtocolServerLostConnection(unittest.TestCase): self.protocol.lineReceived("%s old mcdonald %s" % (outcome, opening)) self.protocol.lostConnection() failure = subunit.RemoteError( - u"lost connection during %s report of test 'old mcdonald'" % + u"lost connection during %s report of test 'old mcdonald'" % outcome) self.assertEqual([ ('startTest', self.test), @@ -681,12 +678,6 @@ class TestTestProtocolServerAddSuccess(unittest.TestCase): ], self.client._events) def test_simple_success(self): - self.simple_success_keyword("failure") - - def test_simple_success_colon(self): - self.simple_success_keyword("failure:") - - def test_simple_success(self): self.simple_success_keyword("successful") def test_simple_success_colon(self): @@ -946,7 +937,7 @@ class TestIsolatedTestCase(unittest.TestCase): def test_construct(self): - test = self.SampleIsolatedTestCase("test_sets_global_state") + self.SampleIsolatedTestCase("test_sets_global_state") def test_run(self): result = unittest.TestResult() @@ -982,7 +973,7 @@ class TestIsolatedTestSuite(unittest.TestCase): def test_construct(self): - suite = subunit.IsolatedTestSuite() + subunit.IsolatedTestSuite() def test_run(self): result = unittest.TestResult() @@ -1117,7 +1108,7 @@ class TestTestProtocolClient(unittest.TestCase): self.assertEqual( self.io.getvalue(), 'skip: %s [\nHas it really?\n]\n' % self.test.id()) - + def test_add_skip_details(self): """Test addSkip on a TestProtocolClient with details.""" details = {'reason':Content( diff --git a/python/subunit/tests/test_test_results.py b/python/subunit/tests/test_test_results.py index fe82c04..94d2274 100644 --- a/python/subunit/tests/test_test_results.py +++ b/python/subunit/tests/test_test_results.py @@ -6,7 +6,7 @@ # license at the users choice. A copy of both licenses are available in the # project source as Apache-2.0 and BSD. You may not use this file except in # compliance with one of these two licences. -# +# # Unless required by applicable law or agreed to in writing, software # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -16,12 +16,9 @@ import datetime import unittest -from StringIO import StringIO -import os -import sys -from testtools.content_type import ContentType -from testtools.content import Content +from testtools import TestCase +from testtools.testresult.doubles import ExtendedTestResult import subunit import subunit.iso8601 as iso8601 @@ -82,22 +79,22 @@ class TestHookedTestResultDecorator(unittest.TestCase): def test_startTest(self): self.result.startTest(self) - + def test_startTestRun(self): self.result.startTestRun() - + def test_stopTest(self): self.result.stopTest(self) - + def test_stopTestRun(self): self.result.stopTestRun() def test_addError(self): self.result.addError(self, subunit.RemoteError()) - + def test_addError_details(self): self.result.addError(self, details={}) - + def test_addFailure(self): self.result.addFailure(self, subunit.RemoteError()) @@ -142,7 +139,7 @@ class TestHookedTestResultDecorator(unittest.TestCase): def test_time(self): self.result.time(None) - + class TestAutoTimingTestResultDecorator(unittest.TestCase): @@ -193,6 +190,110 @@ class TestAutoTimingTestResultDecorator(unittest.TestCase): self.assertNotEqual(None, self.decorated._calls[2]) +class TestTagCollapsingDecorator(TestCase): + + def test_tags_forwarded_outside_of_tests(self): + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TagCollapsingDecorator(result) + tag_collapser.tags(set(['a', 'b']), set()) + self.assertEquals( + [('tags', set(['a', 'b']), set([]))], result._events) + + def test_tags_collapsed_inside_of_tests(self): + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TagCollapsingDecorator(result) + test = subunit.RemotedTestCase('foo') + tag_collapser.startTest(test) + tag_collapser.tags(set(['a']), set()) + tag_collapser.tags(set(['b']), set(['a'])) + tag_collapser.tags(set(['c']), set()) + tag_collapser.stopTest(test) + self.assertEquals( + [('startTest', test), + ('tags', set(['b', 'c']), set(['a'])), + ('stopTest', test)], + result._events) + + def test_tags_collapsed_inside_of_tests_different_ordering(self): + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TagCollapsingDecorator(result) + test = subunit.RemotedTestCase('foo') + tag_collapser.startTest(test) + tag_collapser.tags(set(), set(['a'])) + tag_collapser.tags(set(['a', 'b']), set()) + tag_collapser.tags(set(['c']), set()) + tag_collapser.stopTest(test) + self.assertEquals( + [('startTest', test), + ('tags', set(['a', 'b', 'c']), set()), + ('stopTest', test)], + result._events) + + +class TestTimeCollapsingDecorator(TestCase): + + def make_time(self): + # Heh heh. + return datetime.datetime( + 2000, 1, self.getUniqueInteger(), tzinfo=iso8601.UTC) + + def test_initial_time_forwarded(self): + # We always forward the first time event we see. + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TimeCollapsingDecorator(result) + a_time = self.make_time() + tag_collapser.time(a_time) + self.assertEquals([('time', a_time)], result._events) + + def test_time_collapsed_to_first_and_last(self): + # If there are many consecutive time events, only the first and last + # are sent through. + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TimeCollapsingDecorator(result) + times = [self.make_time() for i in range(5)] + for a_time in times: + tag_collapser.time(a_time) + tag_collapser.startTest(subunit.RemotedTestCase('foo')) + self.assertEquals( + [('time', times[0]), ('time', times[-1])], result._events[:-1]) + + def test_only_one_time_sent(self): + # If we receive a single time event followed by a non-time event, we + # send exactly one time event. + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TimeCollapsingDecorator(result) + a_time = self.make_time() + tag_collapser.time(a_time) + tag_collapser.startTest(subunit.RemotedTestCase('foo')) + self.assertEquals([('time', a_time)], result._events[:-1]) + + def test_duplicate_times_not_sent(self): + # Many time events with the exact same time are collapsed into one + # time event. + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TimeCollapsingDecorator(result) + a_time = self.make_time() + for i in range(5): + tag_collapser.time(a_time) + tag_collapser.startTest(subunit.RemotedTestCase('foo')) + self.assertEquals([('time', a_time)], result._events[:-1]) + + def test_no_times_inserted(self): + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TimeCollapsingDecorator(result) + a_time = self.make_time() + tag_collapser.time(a_time) + foo = subunit.RemotedTestCase('foo') + tag_collapser.startTest(foo) + tag_collapser.addSuccess(foo) + tag_collapser.stopTest(foo) + self.assertEquals( + [('time', a_time), + ('startTest', foo), + ('addSuccess', foo), + ('stopTest', foo)], result._events) + + def test_suite(): loader = subunit.tests.TestUtil.TestLoader() result = loader.loadTestsFromName(__name__) |
