diff options
| author | Robert Collins <robertc@robertcollins.net> | 2013-02-26 22:08:32 +1300 |
|---|---|---|
| committer | Robert Collins <robertc@robertcollins.net> | 2013-02-26 22:08:32 +1300 |
| commit | 11eee3968fd640b279d70261c11c6990fac68478 (patch) | |
| tree | 8ca4111a742c77611c8f668df9910828b969f6bd /python | |
| parent | c960118f3bcefc95deb3499e655c3ef0d6f9981c (diff) | |
| download | subunit-git-11eee3968fd640b279d70261c11c6990fac68478.tar.gz | |
Update subunit-filter to consume and emit v2.
Diffstat (limited to 'python')
| -rw-r--r-- | python/subunit/filters.py | 51 | ||||
| -rw-r--r-- | python/subunit/tests/test_subunit_filter.py | 105 |
2 files changed, 87 insertions, 69 deletions
diff --git a/python/subunit/filters.py b/python/subunit/filters.py index dc3fd8a..208bd8b 100644 --- a/python/subunit/filters.py +++ b/python/subunit/filters.py @@ -17,7 +17,12 @@ from optparse import OptionParser import sys -from subunit import DiscardStream, ProtocolTestCase +from testtools import StreamResultRouter + +from subunit import ( + DiscardStream, ProtocolTestCase, ByteStreamToStreamResult, + StreamResultToBytes, + ) def make_options(description): @@ -36,28 +41,53 @@ def make_options(description): def run_tests_from_stream(input_stream, result, passthrough_stream=None, - forward_stream=None): + forward_stream=None, protocol_version=1): """Run tests from a subunit input stream through 'result'. + Non-test events - top level file attachments - are expected to be + dropped by v2 StreamResults at the present time (as all the analysis code + is in ExtendedTestResult API's), so to implement passthrough_stream they + are diverted and copied directly when that is set. + :param input_stream: A stream containing subunit input. :param result: A TestResult that will receive the test events. + NB: This should be an ExtendedTestResult for v1 and a StreamResult for + v2. :param passthrough_stream: All non-subunit input received will be sent to this stream. If not provided, uses the ``TestProtocolServer`` default, which is ``sys.stdout``. :param forward_stream: All subunit input received will be forwarded to this stream. If not provided, uses the ``TestProtocolServer`` default, which is to not forward any input. + :param protocol_version: What version of the subunit protocol to expect. """ - test = ProtocolTestCase( - input_stream, passthrough=passthrough_stream, - forward=forward_stream) + orig_result = None + if 1==protocol_version: + test = ProtocolTestCase( + input_stream, passthrough=passthrough_stream, + forward=forward_stream) + elif 2==protocol_version: + if passthrough_stream is not None: + # As the StreamToExtendedDecorator discards non-test events + # we merely have to copy them IFF they are requested. + passthrough_result = StreamResultToBytes(passthrough_stream) + orig_result = result + result = StreamResultRouter(result) + result.map(passthrough_result, 'test_id', test_id=None) + orig_result.startTestRun() + test = ByteStreamToStreamResult(input_stream, + non_subunit_name='stdout') + else: + raise Exception("Unknown protocol version.") result.startTestRun() test.run(result) + if orig_result is not None: + orig_result.stopTestRun() result.stopTestRun() def filter_by_result(result_factory, output_path, passthrough, forward, - input_stream=sys.stdin): + input_stream=sys.stdin, protocol_version=1): """Filter an input stream using a test result. :param result_factory: A callable that when passed an output stream @@ -71,12 +101,16 @@ def filter_by_result(result_factory, output_path, passthrough, forward, ``sys.stdout`` as well as to the ``TestResult``. :param input_stream: The source of subunit input. Defaults to ``sys.stdin``. + :param protocol_version: The subunit protocol version to expect. :return: A test result with the resultts of the run. """ if passthrough: passthrough_stream = sys.stdout else: - passthrough_stream = DiscardStream() + if 1==protocol_version: + passthrough_stream = DiscardStream() + else: + passthrough_stream = None if forward: forward_stream = sys.stdout @@ -91,7 +125,8 @@ def filter_by_result(result_factory, output_path, passthrough, forward, try: result = result_factory(output_to) run_tests_from_stream( - input_stream, result, passthrough_stream, forward_stream) + input_stream, result, passthrough_stream, forward_stream, + protocol_version=protocol_version) finally: if output_path: output_to.close() diff --git a/python/subunit/tests/test_subunit_filter.py b/python/subunit/tests/test_subunit_filter.py index 33b9248..1cde16b 100644 --- a/python/subunit/tests/test_subunit_filter.py +++ b/python/subunit/tests/test_subunit_filter.py @@ -25,10 +25,11 @@ import unittest from testtools import TestCase from testtools.compat import _b, BytesIO -from testtools.testresult.doubles import ExtendedTestResult +from testtools.testresult.doubles import ExtendedTestResult, StreamResult import subunit from subunit.test_results import make_tag_filter, TestResultFilter +from subunit import ByteStreamToStreamResult, StreamResultToBytes class TestTestResultFilter(TestCase): @@ -286,23 +287,6 @@ xfail todo class TestFilterCommand(TestCase): - example_subunit_stream = _b("""\ -tags: global -test passed -success passed -test failed -tags: local -failure failed -test error -error error [ -error details -] -test skipped -skip skipped -test todo -xfail todo -""") - def run_command(self, args, stream): root = os.path.dirname( os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) @@ -316,52 +300,51 @@ xfail todo raise RuntimeError("%s failed: %s" % (command, err)) return out - def to_events(self, stream): - test = subunit.ProtocolTestCase(BytesIO(stream)) - result = ExtendedTestResult() - test.run(result) - return result._events - def test_default(self): - output = self.run_command([], _b( - "test: foo\n" - "skip: foo\n" - )) - events = self.to_events(output) - foo = subunit.RemotedTestCase('foo') - self.assertEqual( - [('startTest', foo), - ('addSkip', foo, {}), - ('stopTest', foo)], - events) + byte_stream = BytesIO() + stream = StreamResultToBytes(byte_stream) + stream.status(test_id="foo", test_status="inprogress") + stream.status(test_id="foo", test_status="skip") + output = self.run_command([], byte_stream.getvalue()) + events = StreamResult() + ByteStreamToStreamResult(BytesIO(output)).run(events) + ids = set(event[1] for event in events._events) + self.assertEqual([ + ('status', 'foo', 'inprogress'), + ('status', 'foo', 'skip'), + ], [event[:3] for event in events._events]) def test_tags(self): - output = self.run_command(['-s', '--with-tag', 'a'], _b( - "tags: a\n" - "test: foo\n" - "success: foo\n" - "tags: -a\n" - "test: bar\n" - "success: bar\n" - "test: baz\n" - "tags: a\n" - "success: baz\n" - )) - events = self.to_events(output) - foo = subunit.RemotedTestCase('foo') - baz = subunit.RemotedTestCase('baz') - self.assertEqual( - [('tags', set(['a']), set()), - ('startTest', foo), - ('addSuccess', foo), - ('stopTest', foo), - ('tags', set(), set(['a'])), - ('startTest', baz), - ('tags', set(['a']), set()), - ('addSuccess', baz), - ('stopTest', baz), - ], - events) + byte_stream = BytesIO() + stream = StreamResultToBytes(byte_stream) + stream.status( + test_id="foo", test_status="inprogress", test_tags=set(["a"])) + stream.status( + test_id="foo", test_status="success", test_tags=set(["a"])) + stream.status(test_id="bar", test_status="inprogress") + stream.status(test_id="bar", test_status="inprogress") + stream.status( + test_id="baz", test_status="inprogress", test_tags=set(["a"])) + stream.status( + test_id="baz", test_status="success", test_tags=set(["a"])) + output = self.run_command( + ['-s', '--with-tag', 'a'], byte_stream.getvalue()) + events = StreamResult() + ByteStreamToStreamResult(BytesIO(output)).run(events) + ids = set(event[1] for event in events._events) + self.assertEqual(set(['foo', 'baz']), ids) + + def test_no_passthrough(self): + output = self.run_command(['--no-passthrough'], b'hi thar') + self.assertEqual(b'', output) + + def test_passthrough(self): + output = self.run_command([], b'hi thar') + byte_stream = BytesIO() + stream = StreamResultToBytes(byte_stream) + for pos, _ in enumerate(b'hi thar'): + stream.status(file_name="stdout", file_bytes=b'hi thar'[pos:pos+1]) + self.assertEqual(byte_stream.getvalue(), output) def test_suite(): |
