diff options
| author | Thomi Richards <thomi.richards@canonical.com> | 2013-11-25 17:52:26 +1300 |
|---|---|---|
| committer | Thomi Richards <thomi.richards@canonical.com> | 2013-11-25 17:52:26 +1300 |
| commit | 3413fbd1d00240b74084d9ac001bdcb647a9d515 (patch) | |
| tree | 8222f451ca1abdbd423e07e1f9d63f071ce98267 /python/subunit/tests | |
| parent | ec7ebd453b31beb5e43097564947fe725181b37a (diff) | |
| download | subunit-git-3413fbd1d00240b74084d9ac001bdcb647a9d515.tar.gz | |
Lots of fixes from code review.
Diffstat (limited to 'python/subunit/tests')
| -rw-r--r-- | python/subunit/tests/test_output_filter.py | 343 |
1 files changed, 226 insertions, 117 deletions
diff --git a/python/subunit/tests/test_output_filter.py b/python/subunit/tests/test_output_filter.py index 21d8172..401ec08 100644 --- a/python/subunit/tests/test_output_filter.py +++ b/python/subunit/tests/test_output_filter.py @@ -13,6 +13,7 @@ # license you chose for the specific language governing permissions and # limitations under that license. # + import datetime from functools import partial from io import BytesIO, StringIO @@ -20,9 +21,10 @@ import optparse import sys from tempfile import NamedTemporaryFile +from contextlib import contextmanager from testscenarios import WithScenarios from testtools import TestCase -from testtools.compat import _b, _u +from testtools.compat import _u from testtools.matchers import ( Equals, Matcher, @@ -35,9 +37,10 @@ from testtools.testresult.doubles import StreamResult from subunit.iso8601 import UTC from subunit.v2 import StreamResultToBytes, ByteStreamToStreamResult from subunit._output import ( - generate_bytestream, + _ALL_ACTIONS, + _FINAL_ACTIONS, + generate_stream_results, parse_arguments, - write_chunked_file, ) import subunit._output as _o @@ -67,15 +70,7 @@ class TestCaseWithPatchedStderr(TestCase): class TestStatusArgParserTests(WithScenarios, TestCaseWithPatchedStderr): scenarios = [ - (cmd, dict(command=cmd, option='--' + cmd)) for cmd in ( - 'exists', - 'fail', - 'inprogress', - 'skip', - 'success', - 'uxsuccess', - 'xfail', - ) + (cmd, dict(command=cmd, option='--' + cmd)) for cmd in _ALL_ACTIONS ] def test_can_parse_all_commands_with_test_id(self): @@ -113,7 +108,7 @@ class TestStatusArgParserTests(WithScenarios, TestCaseWithPatchedStderr): self.assertThat(args.tags, Equals(["foo", "bar", "baz"])) def test_attach_file_with_hyphen_opens_stdin(self): - self.patch(_o, 'stdin', StringIO(_u("Hello"))) + self.patch(_o.sys, 'stdin', StringIO(_u("Hello"))) args = safe_parse_arguments( args=[self.option, "foo", "--attach-file", "-"] ) @@ -196,7 +191,7 @@ class ArgParserTests(TestCaseWithPatchedStderr): def get_result_for(commands): """Get a result object from *commands. - Runs the 'generate_bytestream' function from subunit._output after + Runs the 'generate_stream_results' function from subunit._output after parsing *commands as if they were specified on the command line. The resulting bytestream is then converted back into a result object and returned. @@ -205,7 +200,7 @@ def get_result_for(commands): args = safe_parse_arguments(commands) output_writer = StreamResultToBytes(output_stream=stream) - generate_bytestream(args, output_writer) + generate_stream_results(args, output_writer) stream.seek(0) @@ -215,143 +210,257 @@ def get_result_for(commands): return result -class ByteStreamCompatibilityTests(WithScenarios, TestCase): +@contextmanager +def temp_file_contents(data): + """Create a temporary file on disk containing 'data'.""" + with NamedTemporaryFile() as f: + f.write(data) + f.seek(0) + yield f + + +class StatusStreamResultTests(WithScenarios, TestCase): scenarios = [ - (s, dict(status=s, option='--' + s)) for s in ( - 'exists', - 'fail', - 'inprogress', - 'skip', - 'success', - 'uxsuccess', - 'xfail', - ) + (s, dict(status=s, option='--' + s)) for s in _ALL_ACTIONS ] _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, UTC) def setUp(self): - super(ByteStreamCompatibilityTests, self).setUp() + super(StatusStreamResultTests, self).setUp() self.patch(_o, 'create_timestamp', lambda: self._dummy_timestamp) + self.test_id = self.getUniqueString() + + def test_only_one_packet_is_generated(self): + result = get_result_for([self.option, self.test_id]) + self.assertThat( + len(result._events), + Equals(1) + ) def test_correct_status_is_generated(self): - result = get_result_for([self.option, 'foo']) + result = get_result_for([self.option, self.test_id]) self.assertThat( result._events[0], - MatchesCall( - call='status', - test_id='foo', - test_status=self.status, - timestamp=self._dummy_timestamp, - ) + MatchesStatusCall(test_status=self.status) ) - def test_all_commands_accept_tags(self): - result = get_result_for([self.option, 'foo', '--tags', 'hello,world']) + def test_all_commands_generate_tags(self): + result = get_result_for([self.option, self.test_id, '--tags', 'hello,world']) self.assertThat( result._events[0], - MatchesCall( - call='status', - test_id='foo', - test_tags=set(['hello', 'world']), - timestamp=self._dummy_timestamp, - ) + MatchesStatusCall(test_tags=set(['hello', 'world'])) ) + def test_all_commands_generate_timestamp(self): + result = get_result_for([self.option, self.test_id]) -class FileChunkingTests(WithScenarios, TestCase): + self.assertThat( + result._events[0], + MatchesStatusCall(timestamp=self._dummy_timestamp) + ) - scenarios = [ - ("With test_id", dict(test_id="foo")), - ("Without test_id", dict(test_id=None)), - ] + def test_all_commands_generate_correct_test_id(self): + result = get_result_for([self.option, self.test_id]) - def _write_chunk_file(self, file_data, chunk_size=1024, mimetype=None, filename=None, test_id=None): - """Write file data to a subunit stream, get a StreamResult object.""" - stream = BytesIO() - output_writer = StreamResultToBytes(output_stream=stream) - - with NamedTemporaryFile() as f: - self._tmp_filename = f.name - f.write(file_data) - f.seek(0) - - write_chunked_file( - file_obj=f, - output_writer=output_writer, - chunk_size=chunk_size, - mime_type=mimetype, - test_id=test_id, - file_name=filename, + self.assertThat( + result._events[0], + MatchesStatusCall(test_id=self.test_id) + ) + + def test_file_is_sent_in_single_packet(self): + with temp_file_contents(b"Hello") as f: + result = get_result_for([self.option, self.test_id, '--attach-file', f.name]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(file_bytes=b'Hello', eof=True), + ]) ) - stream.seek(0) + def test_file_is_sent_with_test_id(self): + with temp_file_contents(b"Hello") as f: + result = get_result_for([self.option, self.test_id, '--attach-file', f.name]) - case = ByteStreamToStreamResult(source=stream) - result = StreamResult() - case.run(result) - return result + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(test_id=self.test_id, file_bytes=b'Hello', eof=True), + ]) + ) def test_file_chunk_size_is_honored(self): - result = self._write_chunk_file( - file_data=_b("Hello"), - chunk_size=1, - test_id=self.test_id, - ) - self.assertThat( - result._events, - MatchesListwise([ - MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('H'), mime_type=None, eof=False), - MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('e'), mime_type=None, eof=False), - MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('l'), mime_type=None, eof=False), - MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('l'), mime_type=None, eof=False), - MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('o'), mime_type=None, eof=False), - MatchesCall(call='status', test_id=self.test_id, file_bytes=_b(''), mime_type=None, eof=True), + with temp_file_contents(b"Hello") as f: + self.patch(_o, '_CHUNK_SIZE', 1) + result = get_result_for([self.option, self.test_id, '--attach-file', f.name]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(test_id=self.test_id, file_bytes=b'H', eof=False), + MatchesStatusCall(test_id=self.test_id, file_bytes=b'e', eof=False), + MatchesStatusCall(test_id=self.test_id, file_bytes=b'l', eof=False), + MatchesStatusCall(test_id=self.test_id, file_bytes=b'l', eof=False), + MatchesStatusCall(test_id=self.test_id, file_bytes=b'o', eof=True), + ]) + ) + + def test_file_mimetype_specified_once_only(self): + with temp_file_contents(b"Hi") as f: + self.patch(_o, '_CHUNK_SIZE', 1) + result = get_result_for([ + self.option, + self.test_id, + '--attach-file', + f.name, + '--mimetype', + 'text/plain', ]) - ) - def test_file_mimetype_is_honored(self): - result = self._write_chunk_file( - file_data=_b("SomeData"), - mimetype="text/plain", - test_id=self.test_id, - ) - self.assertThat( - result._events, - MatchesListwise([ - MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('SomeData'), mime_type="text/plain"), - MatchesCall(call='status', test_id=self.test_id, file_bytes=_b(''), mime_type="text/plain"), + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(test_id=self.test_id, mime_type='text/plain', file_bytes=b'H', eof=False), + MatchesStatusCall(test_id=self.test_id, mime_type=None, file_bytes=b'i', eof=True), + ]) + ) + + def test_tags_specified_once_only(self): + with temp_file_contents(b"Hi") as f: + self.patch(_o, '_CHUNK_SIZE', 1) + result = get_result_for([ + self.option, + self.test_id, + '--attach-file', + f.name, + '--tags', + 'foo,bar', ]) - ) - def test_file_name_is_honored(self): - result = self._write_chunk_file( - file_data=_b("data"), - filename="/some/name", - test_id=self.test_id - ) - self.assertThat( - result._events, - MatchesListwise([ - MatchesCall(call='status', test_id=self.test_id, file_name='/some/name'), - MatchesCall(call='status', test_id=self.test_id, file_name='/some/name'), + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(test_id=self.test_id, test_tags=set(['foo', 'bar'])), + MatchesStatusCall(test_id=self.test_id, test_tags=None), + ]) + ) + + def test_timestamp_specified_once_only(self): + with temp_file_contents(b"Hi") as f: + self.patch(_o, '_CHUNK_SIZE', 1) + result = get_result_for([ + self.option, + self.test_id, + '--attach-file', + f.name, ]) - ) - def test_default_filename_is_used(self): - result = self._write_chunk_file(file_data=_b("data")) - self.assertThat( - result._events, - MatchesListwise([ - MatchesCall(call='status', file_name=self._tmp_filename), - MatchesCall(call='status', file_name=self._tmp_filename), + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(test_id=self.test_id, timestamp=self._dummy_timestamp), + MatchesStatusCall(test_id=self.test_id, timestamp=None), + ]) + ) + + def test_test_status_specified_once_only(self): + with temp_file_contents(b"Hi") as f: + self.patch(_o, '_CHUNK_SIZE', 1) + result = get_result_for([ + self.option, + self.test_id, + '--attach-file', + f.name, ]) - ) + + # 'inprogress' status should be on the first packet only, all other + # statuses should be on the last packet. + if self.status in _FINAL_ACTIONS: + first_call = MatchesStatusCall(test_id=self.test_id, test_status=None) + last_call = MatchesStatusCall(test_id=self.test_id, test_status=self.status) + else: + first_call = MatchesStatusCall(test_id=self.test_id, test_status=self.status) + last_call = MatchesStatusCall(test_id=self.test_id, test_status=None) + self.assertThat( + result._events, + MatchesListwise([first_call, last_call]) + ) + + def test_filename_can_be_overridden(self): + with temp_file_contents(b"Hello") as f: + specified_file_name = self.getUniqueString() + result = get_result_for([ + self.option, + self.test_id, + '--attach-file', + f.name, + '--file-name', + specified_file_name]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(file_name=specified_file_name, file_bytes=b'Hello'), + ]) + ) + + def test_file_name_is_used_by_default(self): + with temp_file_contents(b"Hello") as f: + result = get_result_for([self.option, self.test_id, '--attach-file', f.name]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(file_name=f.name, file_bytes=b'Hello', eof=True), + ]) + ) + + +class GlobalFileDataTests(TestCase): + + def test_can_attach_file_without_test_id(self): + with temp_file_contents(b"Hello") as f: + result = get_result_for(['--attach-file', f.name]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(test_id=None, file_bytes=b'Hello', eof=True), + ]) + ) + + def test_file_name_is_used_by_default(self): + with temp_file_contents(b"Hello") as f: + result = get_result_for(['--attach-file', f.name]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(file_name=f.name, file_bytes=b'Hello', eof=True), + ]) + ) + + def test_filename_can_be_overridden(self): + with temp_file_contents(b"Hello") as f: + specified_file_name = self.getUniqueString() + result = get_result_for([ + '--attach-file', + f.name, + '--file-name', + specified_file_name]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(file_name=specified_file_name, file_bytes=b'Hello'), + ]) + ) -class MatchesCall(Matcher): +class MatchesStatusCall(Matcher): _position_lookup = { 'call': 0, @@ -388,4 +497,4 @@ class MatchesCall(Matcher): return Mismatch("Key %s is not present." % k) def __str__(self): - return "<MatchesCall %r>" % self._filters + return "<MatchesStatusCall %r>" % self._filters |
