summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorJohn Arbash Meinel <john@arbash-meinel.com>2011-06-30 12:59:08 +0200
committerJohn Arbash Meinel <john@arbash-meinel.com>2011-06-30 12:59:08 +0200
commit3a41c806338121e7597c52b8e6e79302e67d8008 (patch)
treedf4154aad123123069b2730846aa6b907dbdb9d7 /python
parent6954432d06307b33e53bbeb11799b2f61148ef31 (diff)
parentfac837618f84531205f6491fb9d72bde8717fbb8 (diff)
downloadsubunit-git-3a41c806338121e7597c52b8e6e79302e67d8008.tar.gz
Merge trunk r142 and resolve conflicts.
Diffstat (limited to 'python')
-rw-r--r--python/subunit/__init__.py67
-rw-r--r--python/subunit/chunked.py24
-rwxr-xr-xpython/subunit/run.py3
-rw-r--r--python/subunit/test_results.py226
-rw-r--r--python/subunit/tests/test_chunked.py24
-rw-r--r--python/subunit/tests/test_details.py1
-rw-r--r--python/subunit/tests/test_subunit_filter.py202
-rw-r--r--python/subunit/tests/test_test_protocol.py21
-rw-r--r--python/subunit/tests/test_test_results.py125
9 files changed, 503 insertions, 190 deletions
diff --git a/python/subunit/__init__.py b/python/subunit/__init__.py
index b6f0108..368d3b2 100644
--- a/python/subunit/__init__.py
+++ b/python/subunit/__init__.py
@@ -6,7 +6,7 @@
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -49,7 +49,7 @@ details, tags, timestamping and progress markers).
The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
which can be used instead of the usual python unittest parameter.
-When used the value of details should be a dict from ``string`` to
+When used the value of details should be a dict from ``string`` to
``testtools.content.Content`` objects. This is a draft API being worked on with
the Python Testing In Python mail list, with the goal of permitting a common
way to provide additional data beyond a traceback, such as captured data from
@@ -58,7 +58,7 @@ and newer).
The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
remove tags in the test run that is currently executing. If called when no
-test is in progress (that is, if called outside of the ``startTest``,
+test is in progress (that is, if called outside of the ``startTest``,
``stopTest`` pair), the the tags apply to all sebsequent tests. If called
when a test is in progress, then the tags only apply to that test.
@@ -87,7 +87,7 @@ tests, allowing isolation between the test runner and some tests.
Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
tests that will fork() before that individual test is run.
-`ExecTestCase`` is a convenience wrapper for running an external
+`ExecTestCase`` is a convenience wrapper for running an external
program to get a Subunit stream and then report that back to an arbitrary
result object::
@@ -98,7 +98,7 @@ result object::
def test_script_two(self):
'./bin/script_two'
-
+
# Normally your normal test loading would take of this automatically,
# It is only spelt out in detail here for clarity.
suite = unittest.TestSuite([AggregateTests("test_script_one"),
@@ -116,7 +116,6 @@ Utility modules
* subunit.test_results contains TestResult helper classes.
"""
-import datetime
import os
import re
from StringIO import StringIO
@@ -254,7 +253,7 @@ class _InTest(_ParserState):
def _outcome(self, offset, line, no_details, details_state):
"""An outcome directive has been read.
-
+
:param no_details: Callable to call when no details are presented.
:param details_state: The state to switch to for details
processing of this outcome.
@@ -382,7 +381,7 @@ class _ReadingFailureDetails(_ReadingDetails):
def _outcome_label(self):
return "failure"
-
+
class _ReadingErrorDetails(_ReadingDetails):
"""State for the subunit parser when reading error details."""
@@ -430,7 +429,7 @@ class _ReadingSuccessDetails(_ReadingDetails):
class TestProtocolServer(object):
"""A parser for subunit.
-
+
:ivar tags: The current tags associated with the protocol stream.
"""
@@ -442,7 +441,7 @@ class TestProtocolServer(object):
subunit protocol should be written to. This allows custom handling
of mixed protocols. By default, sys.stdout will be used for
convenience.
- :param forward_stream: A stream to forward subunit lines to. This
+ :param forward_stream: A stream to forward subunit lines to. This
allows a filter to forward the entire stream while still parsing
and acting on it. By default forward_stream is set to
DiscardStream() and no forwarding happens.
@@ -510,7 +509,7 @@ class TestProtocolServer(object):
def readFrom(self, pipe):
"""Blocking convenience API to parse an entire stream.
-
+
:param pipe: A file-like object supporting readlines().
:return: None.
"""
@@ -531,7 +530,7 @@ class TestProtocolServer(object):
class TestProtocolClient(testresult.TestResult):
"""A TestResult which generates a subunit stream for a test run.
-
+
# Get a TestSuite or TestCase to run
suite = make_suite()
# Create a stream (any object with a 'write' method)
@@ -554,7 +553,7 @@ class TestProtocolClient(testresult.TestResult):
def addError(self, test, error=None, details=None):
"""Report an error in test test.
-
+
Only one of error and details should be provided: conceptually there
are two separate methods:
addError(self, test, error)
@@ -569,7 +568,7 @@ class TestProtocolClient(testresult.TestResult):
def addExpectedFailure(self, test, error=None, details=None):
"""Report an expected failure in test test.
-
+
Only one of error and details should be provided: conceptually there
are two separate methods:
addError(self, test, error)
@@ -584,7 +583,7 @@ class TestProtocolClient(testresult.TestResult):
def addFailure(self, test, error=None, details=None):
"""Report a failure in test test.
-
+
Only one of error and details should be provided: conceptually there
are two separate methods:
addFailure(self, test, error)
@@ -599,7 +598,7 @@ class TestProtocolClient(testresult.TestResult):
def _addOutcome(self, outcome, test, error=None, details=None):
"""Report a failure in test test.
-
+
Only one of error and details should be provided: conceptually there
are two separate methods:
addOutcome(self, test, error)
@@ -646,7 +645,13 @@ class TestProtocolClient(testresult.TestResult):
def startTest(self, test):
"""Mark a test as starting its test run."""
+ super(TestProtocolClient, self).startTest(test)
self._stream.write("test: %s\n" % test.id())
+ self._stream.flush()
+
+ def stopTest(self, test):
+ super(TestProtocolClient, self).stopTest(test)
+ self._stream.flush()
def progress(self, offset, whence):
"""Provide indication about the progress/length of the test run.
@@ -711,7 +716,7 @@ def RemoteError(description=u""):
class RemotedTestCase(unittest.TestCase):
"""A class to represent test cases run in child processes.
-
+
Instances of this class are used to provide the Python test API a TestCase
that can be printed to the screen, introspected for metadata and so on.
However, as they are a simply a memoisation of a test that was actually
@@ -796,7 +801,7 @@ class ExecTestCase(unittest.TestCase):
class IsolatedTestCase(unittest.TestCase):
"""A TestCase which executes in a forked process.
-
+
Each test gets its own process, which has a performance overhead but will
provide excellent isolation from global state (such as django configs,
zope utilities and so on).
@@ -809,7 +814,7 @@ class IsolatedTestCase(unittest.TestCase):
class IsolatedTestSuite(unittest.TestSuite):
"""A TestSuite which runs its tests in a forked process.
-
+
This decorator that will fork() before running the tests and report the
results from the child process using a Subunit stream. This is useful for
handling tests that mutate global state, or are testing C extensions that
@@ -861,7 +866,7 @@ def run_isolated(klass, self, result):
def TAP2SubUnit(tap, subunit):
"""Filter a TAP pipe into a subunit pipe.
-
+
:param tap: A tap pipe/stream/file object.
:param subunit: A pipe/stream/file object to write subunit results to.
:return: The exit code to exit with.
@@ -869,7 +874,6 @@ def TAP2SubUnit(tap, subunit):
BEFORE_PLAN = 0
AFTER_PLAN = 1
SKIP_STREAM = 2
- client = TestProtocolClient(subunit)
state = BEFORE_PLAN
plan_start = 1
plan_stop = 0
@@ -1019,11 +1023,11 @@ class ProtocolTestCase(object):
that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
and ``countTestCases`` methods are not supported because there isn't a
sensible mapping for those methods.
-
+
# Get a stream (any object with a readline() method), in this case the
# stream output by the example from ``subunit.TestProtocolClient``.
stream = file('tests.log', 'rb')
- # Create a parser which will read from the stream and emit
+ # Create a parser which will read from the stream and emit
# activity to a unittest.TestResult when run() is called.
suite = subunit.ProtocolTestCase(stream)
# Create a result object to accept the contents of that stream.
@@ -1067,7 +1071,7 @@ class ProtocolTestCase(object):
class TestResultStats(testresult.TestResult):
"""A pyunit TestResult interface implementation for making statistics.
-
+
:ivar total_tests: The total tests seen.
:ivar passed_tests: The tests that passed.
:ivar failed_tests: The tests that failed.
@@ -1118,7 +1122,7 @@ class TestResultStats(testresult.TestResult):
def get_default_formatter():
"""Obtain the default formatter to write to.
-
+
:return: A file-like object.
"""
formatter = os.getenv("SUBUNIT_FORMATTER")
@@ -1128,6 +1132,19 @@ def get_default_formatter():
return sys.stdout
+def read_test_list(path):
+ """Read a list of test ids from a file on disk.
+
+ :param path: Path to the file
+ :return: Sequence of test ids
+ """
+ f = open(path, 'rb')
+ try:
+ return [l.rstrip("\n") for l in f.readlines()]
+ finally:
+ f.close()
+
+
def _make_stream_binary(stream):
"""Ensure that a stream will be binary safe. See _make_binary_on_windows."""
if getattr(stream, 'fileno', None) is not None:
diff --git a/python/subunit/chunked.py b/python/subunit/chunked.py
index 82e4b0d..5f8c6f1 100644
--- a/python/subunit/chunked.py
+++ b/python/subunit/chunked.py
@@ -1,12 +1,13 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
+# Copyright (C) 2011 Martin Pool <mbp@sourcefrog.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -19,7 +20,7 @@
class Decoder(object):
"""Decode chunked content to a byte stream."""
- def __init__(self, output):
+ def __init__(self, output, strict=True):
"""Create a decoder decoding to output.
:param output: A file-like object. Bytes written to the Decoder are
@@ -29,11 +30,18 @@ class Decoder(object):
when no more data is available, to detect short streams; the
write method will return none-None when the end of a stream is
detected.
+
+ :param strict: If True (the default), the decoder will not knowingly
+ accept input that is not conformant to the HTTP specification.
+ (This does not imply that it will catch every nonconformance.)
+ If False, it will accept incorrect input that is still
+ unambiguous.
"""
self.output = output
self.buffered_bytes = []
self.state = self._read_length
self.body_length = 0
+ self.strict = strict
def close(self):
"""Close the decoder.
@@ -72,7 +80,6 @@ class Decoder(object):
def _read_length(self):
"""Try to decode a length from the bytes."""
- count = -1
match_chars = "0123456789abcdefABCDEF\r\n"
count_chars = []
for bytes in self.buffered_bytes:
@@ -87,7 +94,12 @@ class Decoder(object):
if count_chars[-1][-1] != '\n':
return
count_str = ''.join(count_chars)
- self.body_length = int(count_str[:-2], 16)
+ if self.strict:
+ if count_str[-2:] != '\r\n':
+ raise ValueError("chunk header invalid: %r" % count_str)
+ if '\r' in count_str[:-2]:
+ raise ValueError("too many CRs in chunk header %r" % count_str)
+ self.body_length = int(count_str.rstrip('\n\r'), 16)
excess_bytes = len(count_str)
while excess_bytes:
if excess_bytes >= len(self.buffered_bytes[0]):
@@ -107,7 +119,7 @@ class Decoder(object):
def write(self, bytes):
"""Decode bytes to the output stream.
-
+
:raises ValueError: If the stream has already seen the end of file
marker.
:returns: None, or the excess bytes beyond the end of file marker.
@@ -133,7 +145,7 @@ class Encoder(object):
def flush(self, extra_len=0):
"""Flush the encoder to the output stream.
-
+
:param extra_len: Increase the size of the chunk by this many bytes
to allow for a subsequent write.
"""
diff --git a/python/subunit/run.py b/python/subunit/run.py
index daa241a..b390de3 100755
--- a/python/subunit/run.py
+++ b/python/subunit/run.py
@@ -69,4 +69,5 @@ class SubunitTestProgram(TestProgram):
if __name__ == '__main__':
stream = get_default_formatter()
runner = SubunitTestRunner(stream)
- SubunitTestProgram(module=None, argv=sys.argv, testRunner=runner)
+ SubunitTestProgram(module=None, argv=sys.argv, testRunner=runner,
+ stdout=sys.stdout)
diff --git a/python/subunit/test_results.py b/python/subunit/test_results.py
index cc588d2..33fb50e 100644
--- a/python/subunit/test_results.py
+++ b/python/subunit/test_results.py
@@ -18,9 +18,10 @@
import datetime
-import iso8601
import testtools
+from subunit import iso8601
+
# NOT a TestResult, because we are implementing the interface, not inheriting
# it.
@@ -81,8 +82,12 @@ class TestResultDecorator(object):
def stop(self):
return self.decorated.stop()
+ @property
+ def testsRun(self):
+ return self.decorated.testsRun
+
def tags(self, new_tags, gone_tags):
- return self.decorated.time(new_tags, gone_tags)
+ return self.decorated.tags(new_tags, gone_tags)
def time(self, a_datetime):
return self.decorated.time(a_datetime)
@@ -195,6 +200,87 @@ class AutoTimingTestResultDecorator(HookedTestResultDecorator):
return self.decorated.time(a_datetime)
+class TagCollapsingDecorator(TestResultDecorator):
+ """Collapses many 'tags' calls into one where possible."""
+
+ def __init__(self, result):
+ super(TagCollapsingDecorator, self).__init__(result)
+ # The (new, gone) tags for the current test.
+ self._current_test_tags = None
+
+ def startTest(self, test):
+ """Start a test.
+
+ Not directly passed to the client, but used for handling of tags
+ correctly.
+ """
+ self.decorated.startTest(test)
+ self._current_test_tags = set(), set()
+
+ def stopTest(self, test):
+ """Stop a test.
+
+ Not directly passed to the client, but used for handling of tags
+ correctly.
+ """
+ # Tags to output for this test.
+ if self._current_test_tags[0] or self._current_test_tags[1]:
+ self.decorated.tags(*self._current_test_tags)
+ self.decorated.stopTest(test)
+ self._current_test_tags = None
+
+ def tags(self, new_tags, gone_tags):
+ """Handle tag instructions.
+
+ Adds and removes tags as appropriate. If a test is currently running,
+ tags are not affected for subsequent tests.
+
+ :param new_tags: Tags to add,
+ :param gone_tags: Tags to remove.
+ """
+ if self._current_test_tags is not None:
+ # gather the tags until the test stops.
+ self._current_test_tags[0].update(new_tags)
+ self._current_test_tags[0].difference_update(gone_tags)
+ self._current_test_tags[1].update(gone_tags)
+ self._current_test_tags[1].difference_update(new_tags)
+ else:
+ return self.decorated.tags(new_tags, gone_tags)
+
+
+class TimeCollapsingDecorator(HookedTestResultDecorator):
+ """Only pass on the first and last of a consecutive sequence of times."""
+
+ def __init__(self, decorated):
+ super(TimeCollapsingDecorator, self).__init__(decorated)
+ self._last_received_time = None
+ self._last_sent_time = None
+
+ def _before_event(self):
+ if self._last_received_time is None:
+ return
+ if self._last_received_time != self._last_sent_time:
+ self.decorated.time(self._last_received_time)
+ self._last_sent_time = self._last_received_time
+ self._last_received_time = None
+
+ def time(self, a_time):
+ # Don't upcall, because we don't want to call _before_event, it's only
+ # for non-time events.
+ if self._last_received_time is None:
+ self.decorated.time(a_time)
+ self._last_sent_time = a_time
+ self._last_received_time = a_time
+
+
+def all_true(bools):
+ """Return True if all of 'bools' are True. False otherwise."""
+ for b in bools:
+ if not b:
+ return False
+ return True
+
+
class TestResultFilter(TestResultDecorator):
"""A pyunit TestResult interface implementation which filters tests.
@@ -209,7 +295,7 @@ class TestResultFilter(TestResultDecorator):
def __init__(self, result, filter_error=False, filter_failure=False,
filter_success=True, filter_skip=False, filter_xfail=False,
- filter_predicate=None):
+ filter_predicate=None, fixup_expected_failures=None):
"""Create a FilterResult object filtering to result.
:param filter_error: Filter out errors.
@@ -222,71 +308,96 @@ class TestResultFilter(TestResultDecorator):
through. err and details may be none if no error or extra
metadata is available. outcome is the name of the outcome such
as 'success' or 'failure'.
+ :param fixup_expected_failures: Set of test ids to consider known
+ failing.
"""
- TestResultDecorator.__init__(self, result)
- self._filter_error = filter_error
- self._filter_failure = filter_failure
- self._filter_success = filter_success
- self._filter_skip = filter_skip
- self._filter_xfail = filter_xfail
- if filter_predicate is None:
- filter_predicate = lambda test, outcome, err, details: True
- self.filter_predicate = filter_predicate
+ super(TestResultFilter, self).__init__(result)
+ self.decorated = TimeCollapsingDecorator(
+ TagCollapsingDecorator(self.decorated))
+ predicates = []
+ if filter_error:
+ predicates.append(lambda t, outcome, e, d: outcome != 'error')
+ if filter_failure:
+ predicates.append(lambda t, outcome, e, d: outcome != 'failure')
+ if filter_success:
+ predicates.append(lambda t, outcome, e, d: outcome != 'success')
+ if filter_skip:
+ predicates.append(lambda t, outcome, e, d: outcome != 'skip')
+ if filter_xfail:
+ predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure')
+ if filter_predicate is not None:
+ predicates.append(filter_predicate)
+ self.filter_predicate = (
+ lambda test, outcome, err, details:
+ all_true(p(test, outcome, err, details) for p in predicates))
# The current test (for filtering tags)
self._current_test = None
# Has the current test been filtered (for outputting test tags)
self._current_test_filtered = None
- # The (new, gone) tags for the current test.
- self._current_test_tags = None
+ # Calls to this result that we don't know whether to forward on yet.
+ self._buffered_calls = []
+ if fixup_expected_failures is None:
+ self._fixup_expected_failures = frozenset()
+ else:
+ self._fixup_expected_failures = fixup_expected_failures
def addError(self, test, err=None, details=None):
- if (not self._filter_error and
- self.filter_predicate(test, 'error', err, details)):
- self.decorated.startTest(test)
- self.decorated.addError(test, err, details=details)
+ if (self.filter_predicate(test, 'error', err, details)):
+ if self._failure_expected(test):
+ self._buffered_calls.append(
+ ('addExpectedFailure', [test, err], {'details': details}))
+ else:
+ self._buffered_calls.append(
+ ('addError', [test, err], {'details': details}))
else:
self._filtered()
def addFailure(self, test, err=None, details=None):
- if (not self._filter_failure and
- self.filter_predicate(test, 'failure', err, details)):
- self.decorated.startTest(test)
- self.decorated.addFailure(test, err, details=details)
+ if (self.filter_predicate(test, 'failure', err, details)):
+ if self._failure_expected(test):
+ self._buffered_calls.append(
+ ('addExpectedFailure', [test, err], {'details': details}))
+ else:
+ self._buffered_calls.append(
+ ('addFailure', [test, err], {'details': details}))
else:
self._filtered()
def addSkip(self, test, reason=None, details=None):
- if (not self._filter_skip and
- self.filter_predicate(test, 'skip', reason, details)):
- self.decorated.startTest(test)
- self.decorated.addSkip(test, reason, details=details)
+ if (self.filter_predicate(test, 'skip', reason, details)):
+ self._buffered_calls.append(
+ ('addSkip', [test, reason], {'details': details}))
else:
self._filtered()
def addSuccess(self, test, details=None):
- if (not self._filter_success and
- self.filter_predicate(test, 'success', None, details)):
- self.decorated.startTest(test)
- self.decorated.addSuccess(test, details=details)
+ if (self.filter_predicate(test, 'success', None, details)):
+ if self._failure_expected(test):
+ self._buffered_calls.append(
+ ('addUnexpectedSuccess', [test], {'details': details}))
+ else:
+ self._buffered_calls.append(
+ ('addSuccess', [test], {'details': details}))
else:
self._filtered()
def addExpectedFailure(self, test, err=None, details=None):
- if (not self._filter_xfail and
- self.filter_predicate(test, 'expectedfailure', err, details)):
- self.decorated.startTest(test)
- return self.decorated.addExpectedFailure(test, err,
- details=details)
+ if self.filter_predicate(test, 'expectedfailure', err, details):
+ self._buffered_calls.append(
+ ('addExpectedFailure', [test, err], {'details': details}))
else:
self._filtered()
def addUnexpectedSuccess(self, test, details=None):
- self.decorated.startTest(test)
- return self.decorated.addUnexpectedSuccess(test, details=details)
+ self._buffered_calls.append(
+ ('addUnexpectedSuccess', [test], {'details': details}))
def _filtered(self):
self._current_test_filtered = True
+ def _failure_expected(self, test):
+ return (test.id() in self._fixup_expected_failures)
+
def startTest(self, test):
"""Start a test.
@@ -295,7 +406,7 @@ class TestResultFilter(TestResultDecorator):
"""
self._current_test = test
self._current_test_filtered = False
- self._current_test_tags = set(), set()
+ self._buffered_calls.append(('startTest', [test], {}))
def stopTest(self, test):
"""Stop a test.
@@ -305,29 +416,18 @@ class TestResultFilter(TestResultDecorator):
"""
if not self._current_test_filtered:
# Tags to output for this test.
- if self._current_test_tags[0] or self._current_test_tags[1]:
- self.decorated.tags(*self._current_test_tags)
+ for method, args, kwargs in self._buffered_calls:
+ getattr(self.decorated, method)(*args, **kwargs)
self.decorated.stopTest(test)
self._current_test = None
self._current_test_filtered = None
- self._current_test_tags = None
+ self._buffered_calls = []
- def tags(self, new_tags, gone_tags):
- """Handle tag instructions.
-
- Adds and removes tags as appropriate. If a test is currently running,
- tags are not affected for subsequent tests.
-
- :param new_tags: Tags to add,
- :param gone_tags: Tags to remove.
- """
+ def time(self, a_time):
if self._current_test is not None:
- # gather the tags until the test stops.
- self._current_test_tags[0].update(new_tags)
- self._current_test_tags[0].difference_update(gone_tags)
- self._current_test_tags[1].update(gone_tags)
- self._current_test_tags[1].difference_update(new_tags)
- return self.decorated.tags(new_tags, gone_tags)
+ self._buffered_calls.append(('time', [a_time], {}))
+ else:
+ return self.decorated.time(a_time)
def id_to_orig_id(self, id):
if id.startswith("subunit.RemotedTestCase."):
@@ -339,10 +439,10 @@ class TestIdPrintingResult(testtools.TestResult):
def __init__(self, stream, show_times=False):
"""Create a FilterResult object outputting to stream."""
- testtools.TestResult.__init__(self)
+ super(TestIdPrintingResult, self).__init__()
self._stream = stream
self.failed_tests = 0
- self.__time = 0
+ self.__time = None
self.show_times = show_times
self._test = None
self._test_duration = 0
@@ -358,6 +458,16 @@ class TestIdPrintingResult(testtools.TestResult):
def addSuccess(self, test):
self._test = test
+ def addSkip(self, test, reason=None, details=None):
+ self._test = test
+
+ def addUnexpectedSuccess(self, test, details=None):
+ self.failed_tests += 1
+ self._test = test
+
+ def addExpectedFailure(self, test, err=None, details=None):
+ self._test = test
+
def reportTest(self, test, duration):
if self.show_times:
seconds = duration.seconds
diff --git a/python/subunit/tests/test_chunked.py b/python/subunit/tests/test_chunked.py
index a24e31e..eee7fe9 100644
--- a/python/subunit/tests/test_chunked.py
+++ b/python/subunit/tests/test_chunked.py
@@ -1,6 +1,7 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
+# Copyright (C) 2011 Martin Pool <mbp@sourcefrog.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
@@ -86,6 +87,29 @@ class TestDecode(unittest.TestCase):
self.assertEqual('', self.decoder.write('0\r\n'))
self.assertEqual('1' * 65536 + '2' * 65536, self.output.getvalue())
+ def test_decode_newline_nonstrict(self):
+ """Tolerate chunk markers with no CR character."""
+ # From <http://pad.lv/505078>
+ self.decoder = subunit.chunked.Decoder(self.output, strict=False)
+ self.assertEqual(None, self.decoder.write('a\n'))
+ self.assertEqual(None, self.decoder.write('abcdeabcde'))
+ self.assertEqual('', self.decoder.write('0\n'))
+ self.assertEqual('abcdeabcde', self.output.getvalue())
+
+ def test_decode_strict_newline_only(self):
+ """Reject chunk markers with no CR character in strict mode."""
+ # From <http://pad.lv/505078>
+ self.assertRaises(ValueError,
+ self.decoder.write, 'a\n')
+
+ def test_decode_strict_multiple_crs(self):
+ self.assertRaises(ValueError,
+ self.decoder.write, 'a\r\r\n')
+
+ def test_decode_short_header(self):
+ self.assertRaises(ValueError,
+ self.decoder.write, '\n')
+
class TestEncode(unittest.TestCase):
diff --git a/python/subunit/tests/test_details.py b/python/subunit/tests/test_details.py
index 41c3212..2fd1a66 100644
--- a/python/subunit/tests/test_details.py
+++ b/python/subunit/tests/test_details.py
@@ -14,7 +14,6 @@
# limitations under that license.
#
-from cStringIO import StringIO
import unittest
import subunit.tests
diff --git a/python/subunit/tests/test_subunit_filter.py b/python/subunit/tests/test_subunit_filter.py
index 3c65ed3..f8db05b 100644
--- a/python/subunit/tests/test_subunit_filter.py
+++ b/python/subunit/tests/test_subunit_filter.py
@@ -6,7 +6,7 @@
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -16,119 +16,177 @@
"""Tests for subunit.TestResultFilter."""
+from datetime import datetime
+from subunit import iso8601
import unittest
from StringIO import StringIO
+from testtools import TestCase
+from testtools.testresult.doubles import ExtendedTestResult
+
import subunit
from subunit.test_results import TestResultFilter
-class TestTestResultFilter(unittest.TestCase):
+class TestTestResultFilter(TestCase):
"""Test for TestResultFilter, a TestResult object which filters tests."""
- def _setUp(self):
- self.output = StringIO()
+ # While TestResultFilter works on python objects, using a subunit stream
+ # is an easy pithy way of getting a series of test objects to call into
+ # the TestResult, and as TestResultFilter is intended for use with subunit
+ # also has the benefit of detecting any interface skew issues.
+ example_subunit_stream = """\
+tags: global
+test passed
+success passed
+test failed
+tags: local
+failure failed
+test error
+error error [
+error details
+]
+test skipped
+skip skipped
+test todo
+xfail todo
+"""
+
+ def run_tests(self, result_filter, input_stream=None):
+ """Run tests through the given filter.
+
+ :param result_filter: A filtering TestResult object.
+ :param input_stream: Bytes of subunit stream data. If not provided,
+ uses TestTestResultFilter.example_subunit_stream.
+ """
+ if input_stream is None:
+ input_stream = self.example_subunit_stream
+ test = subunit.ProtocolTestCase(StringIO(input_stream))
+ test.run(result_filter)
def test_default(self):
"""The default is to exclude success and include everything else."""
- self.filtered_result = unittest.TestResult()
- self.filter = TestResultFilter(self.filtered_result)
- self.run_tests()
+ filtered_result = unittest.TestResult()
+ result_filter = TestResultFilter(filtered_result)
+ self.run_tests(result_filter)
# skips are seen as success by default python TestResult.
self.assertEqual(['error'],
- [error[0].id() for error in self.filtered_result.errors])
+ [error[0].id() for error in filtered_result.errors])
self.assertEqual(['failed'],
[failure[0].id() for failure in
- self.filtered_result.failures])
- self.assertEqual(4, self.filtered_result.testsRun)
+ filtered_result.failures])
+ self.assertEqual(4, filtered_result.testsRun)
def test_exclude_errors(self):
- self.filtered_result = unittest.TestResult()
- self.filter = TestResultFilter(self.filtered_result,
- filter_error=True)
- self.run_tests()
+ filtered_result = unittest.TestResult()
+ result_filter = TestResultFilter(filtered_result, filter_error=True)
+ self.run_tests(result_filter)
# skips are seen as errors by default python TestResult.
- self.assertEqual([], self.filtered_result.errors)
+ self.assertEqual([], filtered_result.errors)
self.assertEqual(['failed'],
[failure[0].id() for failure in
- self.filtered_result.failures])
- self.assertEqual(3, self.filtered_result.testsRun)
+ filtered_result.failures])
+ self.assertEqual(3, filtered_result.testsRun)
+
+ def test_fixup_expected_failures(self):
+ filtered_result = unittest.TestResult()
+ result_filter = TestResultFilter(filtered_result,
+ fixup_expected_failures=set(["failed"]))
+ self.run_tests(result_filter)
+ self.assertEqual(['failed', 'todo'],
+ [failure[0].id() for failure in filtered_result.expectedFailures])
+ self.assertEqual([], filtered_result.failures)
+ self.assertEqual(4, filtered_result.testsRun)
+
+ def test_fixup_expected_errors(self):
+ filtered_result = unittest.TestResult()
+ result_filter = TestResultFilter(filtered_result,
+ fixup_expected_failures=set(["error"]))
+ self.run_tests(result_filter)
+ self.assertEqual(['error', 'todo'],
+ [failure[0].id() for failure in filtered_result.expectedFailures])
+ self.assertEqual([], filtered_result.errors)
+ self.assertEqual(4, filtered_result.testsRun)
+
+ def test_fixup_unexpected_success(self):
+ filtered_result = unittest.TestResult()
+ result_filter = TestResultFilter(filtered_result, filter_success=False,
+ fixup_expected_failures=set(["passed"]))
+ self.run_tests(result_filter)
+ self.assertEqual(['passed'],
+ [passed.id() for passed in filtered_result.unexpectedSuccesses])
+ self.assertEqual(5, filtered_result.testsRun)
def test_exclude_failure(self):
- self.filtered_result = unittest.TestResult()
- self.filter = TestResultFilter(self.filtered_result,
- filter_failure=True)
- self.run_tests()
+ filtered_result = unittest.TestResult()
+ result_filter = TestResultFilter(filtered_result, filter_failure=True)
+ self.run_tests(result_filter)
self.assertEqual(['error'],
- [error[0].id() for error in self.filtered_result.errors])
+ [error[0].id() for error in filtered_result.errors])
self.assertEqual([],
[failure[0].id() for failure in
- self.filtered_result.failures])
- self.assertEqual(3, self.filtered_result.testsRun)
+ filtered_result.failures])
+ self.assertEqual(3, filtered_result.testsRun)
def test_exclude_skips(self):
- self.filtered_result = subunit.TestResultStats(None)
- self.filter = TestResultFilter(self.filtered_result,
- filter_skip=True)
- self.run_tests()
- self.assertEqual(0, self.filtered_result.skipped_tests)
- self.assertEqual(2, self.filtered_result.failed_tests)
- self.assertEqual(3, self.filtered_result.testsRun)
+ filtered_result = subunit.TestResultStats(None)
+ result_filter = TestResultFilter(filtered_result, filter_skip=True)
+ self.run_tests(result_filter)
+ self.assertEqual(0, filtered_result.skipped_tests)
+ self.assertEqual(2, filtered_result.failed_tests)
+ self.assertEqual(3, filtered_result.testsRun)
def test_include_success(self):
- """Success's can be included if requested."""
- self.filtered_result = unittest.TestResult()
- self.filter = TestResultFilter(self.filtered_result,
+ """Successes can be included if requested."""
+ filtered_result = unittest.TestResult()
+ result_filter = TestResultFilter(filtered_result,
filter_success=False)
- self.run_tests()
+ self.run_tests(result_filter)
self.assertEqual(['error'],
- [error[0].id() for error in self.filtered_result.errors])
+ [error[0].id() for error in filtered_result.errors])
self.assertEqual(['failed'],
[failure[0].id() for failure in
- self.filtered_result.failures])
- self.assertEqual(5, self.filtered_result.testsRun)
+ filtered_result.failures])
+ self.assertEqual(5, filtered_result.testsRun)
def test_filter_predicate(self):
"""You can filter by predicate callbacks"""
- self.filtered_result = unittest.TestResult()
+ filtered_result = unittest.TestResult()
def filter_cb(test, outcome, err, details):
return outcome == 'success'
- self.filter = TestResultFilter(self.filtered_result,
+ result_filter = TestResultFilter(filtered_result,
filter_predicate=filter_cb,
filter_success=False)
- self.run_tests()
+ self.run_tests(result_filter)
# Only success should pass
- self.assertEqual(1, self.filtered_result.testsRun)
-
- def run_tests(self):
- self.setUpTestStream()
- self.test = subunit.ProtocolTestCase(self.input_stream)
- self.test.run(self.filter)
-
- def setUpTestStream(self):
- # While TestResultFilter works on python objects, using a subunit
- # stream is an easy pithy way of getting a series of test objects to
- # call into the TestResult, and as TestResultFilter is intended for
- # use with subunit also has the benefit of detecting any interface
- # skew issues.
- self.input_stream = StringIO()
- self.input_stream.write("""tags: global
-test passed
-success passed
-test failed
-tags: local
-failure failed
-test error
-error error [
-error details
-]
-test skipped
-skip skipped
-test todo
-xfail todo
-""")
- self.input_stream.seek(0)
-
+ self.assertEqual(1, filtered_result.testsRun)
+
+ def test_time_ordering_preserved(self):
+ # Passing a subunit stream through TestResultFilter preserves the
+ # relative ordering of 'time' directives and any other subunit
+ # directives that are still included.
+ date_a = datetime(year=2000, month=1, day=1, tzinfo=iso8601.UTC)
+ date_b = datetime(year=2000, month=1, day=2, tzinfo=iso8601.UTC)
+ date_c = datetime(year=2000, month=1, day=3, tzinfo=iso8601.UTC)
+ subunit_stream = '\n'.join([
+ "time: %s",
+ "test: foo",
+ "time: %s",
+ "error: foo",
+ "time: %s",
+ ""]) % (date_a, date_b, date_c)
+ result = ExtendedTestResult()
+ result_filter = TestResultFilter(result)
+ self.run_tests(result_filter, subunit_stream)
+ foo = subunit.RemotedTestCase('foo')
+ self.assertEquals(
+ [('time', date_a),
+ ('startTest', foo),
+ ('time', date_b),
+ ('addError', foo, {}),
+ ('stopTest', foo),
+ ('time', date_c)], result._events)
+
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()
diff --git a/python/subunit/tests/test_test_protocol.py b/python/subunit/tests/test_test_protocol.py
index e1287b6..f7bab5c 100644
--- a/python/subunit/tests/test_test_protocol.py
+++ b/python/subunit/tests/test_test_protocol.py
@@ -6,7 +6,7 @@
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -18,7 +18,6 @@ import datetime
import unittest
from StringIO import StringIO
import os
-import sys
from testtools.content import Content, TracebackContent
from testtools.content_type import ContentType
@@ -61,7 +60,6 @@ class TestProtocolServerForward(unittest.TestCase):
pipe = StringIO("test old mcdonald\n"
"success old mcdonald\n")
protocol.readFrom(pipe)
- mcdonald = subunit.RemotedTestCase("old mcdonald")
self.assertEqual(client.testsRun, 1)
self.assertEqual(pipe.getvalue(), out.getvalue())
@@ -74,7 +72,7 @@ class TestProtocolServerForward(unittest.TestCase):
protocol.readFrom(pipe)
self.assertEqual(client.testsRun, 0)
self.assertEqual("", out.getvalue())
-
+
class TestTestProtocolServerPipe(unittest.TestCase):
@@ -90,7 +88,6 @@ class TestTestProtocolServerPipe(unittest.TestCase):
"test an error\n"
"error an error\n")
protocol.readFrom(pipe)
- mcdonald = subunit.RemotedTestCase("old mcdonald")
bing = subunit.RemotedTestCase("bing crosby")
an_error = subunit.RemotedTestCase("an error")
self.assertEqual(client.errors,
@@ -311,7 +308,7 @@ class TestTestProtocolServerLostConnection(unittest.TestCase):
self.protocol.lineReceived("%s old mcdonald %s" % (outcome, opening))
self.protocol.lostConnection()
failure = subunit.RemoteError(
- u"lost connection during %s report of test 'old mcdonald'" %
+ u"lost connection during %s report of test 'old mcdonald'" %
outcome)
self.assertEqual([
('startTest', self.test),
@@ -681,12 +678,6 @@ class TestTestProtocolServerAddSuccess(unittest.TestCase):
], self.client._events)
def test_simple_success(self):
- self.simple_success_keyword("failure")
-
- def test_simple_success_colon(self):
- self.simple_success_keyword("failure:")
-
- def test_simple_success(self):
self.simple_success_keyword("successful")
def test_simple_success_colon(self):
@@ -946,7 +937,7 @@ class TestIsolatedTestCase(unittest.TestCase):
def test_construct(self):
- test = self.SampleIsolatedTestCase("test_sets_global_state")
+ self.SampleIsolatedTestCase("test_sets_global_state")
def test_run(self):
result = unittest.TestResult()
@@ -982,7 +973,7 @@ class TestIsolatedTestSuite(unittest.TestCase):
def test_construct(self):
- suite = subunit.IsolatedTestSuite()
+ subunit.IsolatedTestSuite()
def test_run(self):
result = unittest.TestResult()
@@ -1117,7 +1108,7 @@ class TestTestProtocolClient(unittest.TestCase):
self.assertEqual(
self.io.getvalue(),
'skip: %s [\nHas it really?\n]\n' % self.test.id())
-
+
def test_add_skip_details(self):
"""Test addSkip on a TestProtocolClient with details."""
details = {'reason':Content(
diff --git a/python/subunit/tests/test_test_results.py b/python/subunit/tests/test_test_results.py
index fe82c04..94d2274 100644
--- a/python/subunit/tests/test_test_results.py
+++ b/python/subunit/tests/test_test_results.py
@@ -6,7 +6,7 @@
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -16,12 +16,9 @@
import datetime
import unittest
-from StringIO import StringIO
-import os
-import sys
-from testtools.content_type import ContentType
-from testtools.content import Content
+from testtools import TestCase
+from testtools.testresult.doubles import ExtendedTestResult
import subunit
import subunit.iso8601 as iso8601
@@ -82,22 +79,22 @@ class TestHookedTestResultDecorator(unittest.TestCase):
def test_startTest(self):
self.result.startTest(self)
-
+
def test_startTestRun(self):
self.result.startTestRun()
-
+
def test_stopTest(self):
self.result.stopTest(self)
-
+
def test_stopTestRun(self):
self.result.stopTestRun()
def test_addError(self):
self.result.addError(self, subunit.RemoteError())
-
+
def test_addError_details(self):
self.result.addError(self, details={})
-
+
def test_addFailure(self):
self.result.addFailure(self, subunit.RemoteError())
@@ -142,7 +139,7 @@ class TestHookedTestResultDecorator(unittest.TestCase):
def test_time(self):
self.result.time(None)
-
+
class TestAutoTimingTestResultDecorator(unittest.TestCase):
@@ -193,6 +190,110 @@ class TestAutoTimingTestResultDecorator(unittest.TestCase):
self.assertNotEqual(None, self.decorated._calls[2])
+class TestTagCollapsingDecorator(TestCase):
+
+ def test_tags_forwarded_outside_of_tests(self):
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+ tag_collapser.tags(set(['a', 'b']), set())
+ self.assertEquals(
+ [('tags', set(['a', 'b']), set([]))], result._events)
+
+ def test_tags_collapsed_inside_of_tests(self):
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+ test = subunit.RemotedTestCase('foo')
+ tag_collapser.startTest(test)
+ tag_collapser.tags(set(['a']), set())
+ tag_collapser.tags(set(['b']), set(['a']))
+ tag_collapser.tags(set(['c']), set())
+ tag_collapser.stopTest(test)
+ self.assertEquals(
+ [('startTest', test),
+ ('tags', set(['b', 'c']), set(['a'])),
+ ('stopTest', test)],
+ result._events)
+
+ def test_tags_collapsed_inside_of_tests_different_ordering(self):
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+ test = subunit.RemotedTestCase('foo')
+ tag_collapser.startTest(test)
+ tag_collapser.tags(set(), set(['a']))
+ tag_collapser.tags(set(['a', 'b']), set())
+ tag_collapser.tags(set(['c']), set())
+ tag_collapser.stopTest(test)
+ self.assertEquals(
+ [('startTest', test),
+ ('tags', set(['a', 'b', 'c']), set()),
+ ('stopTest', test)],
+ result._events)
+
+
+class TestTimeCollapsingDecorator(TestCase):
+
+ def make_time(self):
+ # Heh heh.
+ return datetime.datetime(
+ 2000, 1, self.getUniqueInteger(), tzinfo=iso8601.UTC)
+
+ def test_initial_time_forwarded(self):
+ # We always forward the first time event we see.
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+ a_time = self.make_time()
+ tag_collapser.time(a_time)
+ self.assertEquals([('time', a_time)], result._events)
+
+ def test_time_collapsed_to_first_and_last(self):
+ # If there are many consecutive time events, only the first and last
+ # are sent through.
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+ times = [self.make_time() for i in range(5)]
+ for a_time in times:
+ tag_collapser.time(a_time)
+ tag_collapser.startTest(subunit.RemotedTestCase('foo'))
+ self.assertEquals(
+ [('time', times[0]), ('time', times[-1])], result._events[:-1])
+
+ def test_only_one_time_sent(self):
+ # If we receive a single time event followed by a non-time event, we
+ # send exactly one time event.
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+ a_time = self.make_time()
+ tag_collapser.time(a_time)
+ tag_collapser.startTest(subunit.RemotedTestCase('foo'))
+ self.assertEquals([('time', a_time)], result._events[:-1])
+
+ def test_duplicate_times_not_sent(self):
+ # Many time events with the exact same time are collapsed into one
+ # time event.
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+ a_time = self.make_time()
+ for i in range(5):
+ tag_collapser.time(a_time)
+ tag_collapser.startTest(subunit.RemotedTestCase('foo'))
+ self.assertEquals([('time', a_time)], result._events[:-1])
+
+ def test_no_times_inserted(self):
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+ a_time = self.make_time()
+ tag_collapser.time(a_time)
+ foo = subunit.RemotedTestCase('foo')
+ tag_collapser.startTest(foo)
+ tag_collapser.addSuccess(foo)
+ tag_collapser.stopTest(foo)
+ self.assertEquals(
+ [('time', a_time),
+ ('startTest', foo),
+ ('addSuccess', foo),
+ ('stopTest', foo)], result._events)
+
+
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()
result = loader.loadTestsFromName(__name__)