summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorRobert Collins <robertc@robertcollins.net>2013-02-26 22:08:32 +1300
committerRobert Collins <robertc@robertcollins.net>2013-02-26 22:08:32 +1300
commit11eee3968fd640b279d70261c11c6990fac68478 (patch)
tree8ca4111a742c77611c8f668df9910828b969f6bd /python
parentc960118f3bcefc95deb3499e655c3ef0d6f9981c (diff)
downloadsubunit-git-11eee3968fd640b279d70261c11c6990fac68478.tar.gz
Update subunit-filter to consume and emit v2.
Diffstat (limited to 'python')
-rw-r--r--python/subunit/filters.py51
-rw-r--r--python/subunit/tests/test_subunit_filter.py105
2 files changed, 87 insertions, 69 deletions
diff --git a/python/subunit/filters.py b/python/subunit/filters.py
index dc3fd8a..208bd8b 100644
--- a/python/subunit/filters.py
+++ b/python/subunit/filters.py
@@ -17,7 +17,12 @@
from optparse import OptionParser
import sys
-from subunit import DiscardStream, ProtocolTestCase
+from testtools import StreamResultRouter
+
+from subunit import (
+ DiscardStream, ProtocolTestCase, ByteStreamToStreamResult,
+ StreamResultToBytes,
+ )
def make_options(description):
@@ -36,28 +41,53 @@ def make_options(description):
def run_tests_from_stream(input_stream, result, passthrough_stream=None,
- forward_stream=None):
+ forward_stream=None, protocol_version=1):
"""Run tests from a subunit input stream through 'result'.
+ Non-test events - top level file attachments - are expected to be
+ dropped by v2 StreamResults at the present time (as all the analysis code
+ is in ExtendedTestResult API's), so to implement passthrough_stream they
+ are diverted and copied directly when that is set.
+
:param input_stream: A stream containing subunit input.
:param result: A TestResult that will receive the test events.
+ NB: This should be an ExtendedTestResult for v1 and a StreamResult for
+ v2.
:param passthrough_stream: All non-subunit input received will be
sent to this stream. If not provided, uses the ``TestProtocolServer``
default, which is ``sys.stdout``.
:param forward_stream: All subunit input received will be forwarded
to this stream. If not provided, uses the ``TestProtocolServer``
default, which is to not forward any input.
+ :param protocol_version: What version of the subunit protocol to expect.
"""
- test = ProtocolTestCase(
- input_stream, passthrough=passthrough_stream,
- forward=forward_stream)
+ orig_result = None
+ if 1==protocol_version:
+ test = ProtocolTestCase(
+ input_stream, passthrough=passthrough_stream,
+ forward=forward_stream)
+ elif 2==protocol_version:
+ if passthrough_stream is not None:
+ # As the StreamToExtendedDecorator discards non-test events
+ # we merely have to copy them IFF they are requested.
+ passthrough_result = StreamResultToBytes(passthrough_stream)
+ orig_result = result
+ result = StreamResultRouter(result)
+ result.map(passthrough_result, 'test_id', test_id=None)
+ orig_result.startTestRun()
+ test = ByteStreamToStreamResult(input_stream,
+ non_subunit_name='stdout')
+ else:
+ raise Exception("Unknown protocol version.")
result.startTestRun()
test.run(result)
+ if orig_result is not None:
+ orig_result.stopTestRun()
result.stopTestRun()
def filter_by_result(result_factory, output_path, passthrough, forward,
- input_stream=sys.stdin):
+ input_stream=sys.stdin, protocol_version=1):
"""Filter an input stream using a test result.
:param result_factory: A callable that when passed an output stream
@@ -71,12 +101,16 @@ def filter_by_result(result_factory, output_path, passthrough, forward,
``sys.stdout`` as well as to the ``TestResult``.
:param input_stream: The source of subunit input. Defaults to
``sys.stdin``.
+ :param protocol_version: The subunit protocol version to expect.
:return: A test result with the resultts of the run.
"""
if passthrough:
passthrough_stream = sys.stdout
else:
- passthrough_stream = DiscardStream()
+ if 1==protocol_version:
+ passthrough_stream = DiscardStream()
+ else:
+ passthrough_stream = None
if forward:
forward_stream = sys.stdout
@@ -91,7 +125,8 @@ def filter_by_result(result_factory, output_path, passthrough, forward,
try:
result = result_factory(output_to)
run_tests_from_stream(
- input_stream, result, passthrough_stream, forward_stream)
+ input_stream, result, passthrough_stream, forward_stream,
+ protocol_version=protocol_version)
finally:
if output_path:
output_to.close()
diff --git a/python/subunit/tests/test_subunit_filter.py b/python/subunit/tests/test_subunit_filter.py
index 33b9248..1cde16b 100644
--- a/python/subunit/tests/test_subunit_filter.py
+++ b/python/subunit/tests/test_subunit_filter.py
@@ -25,10 +25,11 @@ import unittest
from testtools import TestCase
from testtools.compat import _b, BytesIO
-from testtools.testresult.doubles import ExtendedTestResult
+from testtools.testresult.doubles import ExtendedTestResult, StreamResult
import subunit
from subunit.test_results import make_tag_filter, TestResultFilter
+from subunit import ByteStreamToStreamResult, StreamResultToBytes
class TestTestResultFilter(TestCase):
@@ -286,23 +287,6 @@ xfail todo
class TestFilterCommand(TestCase):
- example_subunit_stream = _b("""\
-tags: global
-test passed
-success passed
-test failed
-tags: local
-failure failed
-test error
-error error [
-error details
-]
-test skipped
-skip skipped
-test todo
-xfail todo
-""")
-
def run_command(self, args, stream):
root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
@@ -316,52 +300,51 @@ xfail todo
raise RuntimeError("%s failed: %s" % (command, err))
return out
- def to_events(self, stream):
- test = subunit.ProtocolTestCase(BytesIO(stream))
- result = ExtendedTestResult()
- test.run(result)
- return result._events
-
def test_default(self):
- output = self.run_command([], _b(
- "test: foo\n"
- "skip: foo\n"
- ))
- events = self.to_events(output)
- foo = subunit.RemotedTestCase('foo')
- self.assertEqual(
- [('startTest', foo),
- ('addSkip', foo, {}),
- ('stopTest', foo)],
- events)
+ byte_stream = BytesIO()
+ stream = StreamResultToBytes(byte_stream)
+ stream.status(test_id="foo", test_status="inprogress")
+ stream.status(test_id="foo", test_status="skip")
+ output = self.run_command([], byte_stream.getvalue())
+ events = StreamResult()
+ ByteStreamToStreamResult(BytesIO(output)).run(events)
+ ids = set(event[1] for event in events._events)
+ self.assertEqual([
+ ('status', 'foo', 'inprogress'),
+ ('status', 'foo', 'skip'),
+ ], [event[:3] for event in events._events])
def test_tags(self):
- output = self.run_command(['-s', '--with-tag', 'a'], _b(
- "tags: a\n"
- "test: foo\n"
- "success: foo\n"
- "tags: -a\n"
- "test: bar\n"
- "success: bar\n"
- "test: baz\n"
- "tags: a\n"
- "success: baz\n"
- ))
- events = self.to_events(output)
- foo = subunit.RemotedTestCase('foo')
- baz = subunit.RemotedTestCase('baz')
- self.assertEqual(
- [('tags', set(['a']), set()),
- ('startTest', foo),
- ('addSuccess', foo),
- ('stopTest', foo),
- ('tags', set(), set(['a'])),
- ('startTest', baz),
- ('tags', set(['a']), set()),
- ('addSuccess', baz),
- ('stopTest', baz),
- ],
- events)
+ byte_stream = BytesIO()
+ stream = StreamResultToBytes(byte_stream)
+ stream.status(
+ test_id="foo", test_status="inprogress", test_tags=set(["a"]))
+ stream.status(
+ test_id="foo", test_status="success", test_tags=set(["a"]))
+ stream.status(test_id="bar", test_status="inprogress")
+ stream.status(test_id="bar", test_status="inprogress")
+ stream.status(
+ test_id="baz", test_status="inprogress", test_tags=set(["a"]))
+ stream.status(
+ test_id="baz", test_status="success", test_tags=set(["a"]))
+ output = self.run_command(
+ ['-s', '--with-tag', 'a'], byte_stream.getvalue())
+ events = StreamResult()
+ ByteStreamToStreamResult(BytesIO(output)).run(events)
+ ids = set(event[1] for event in events._events)
+ self.assertEqual(set(['foo', 'baz']), ids)
+
+ def test_no_passthrough(self):
+ output = self.run_command(['--no-passthrough'], b'hi thar')
+ self.assertEqual(b'', output)
+
+ def test_passthrough(self):
+ output = self.run_command([], b'hi thar')
+ byte_stream = BytesIO()
+ stream = StreamResultToBytes(byte_stream)
+ for pos, _ in enumerate(b'hi thar'):
+ stream.status(file_name="stdout", file_bytes=b'hi thar'[pos:pos+1])
+ self.assertEqual(byte_stream.getvalue(), output)
def test_suite():