diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/subunit/chunked.py | 24 | ||||
| -rw-r--r-- | python/subunit/tests/test_chunked.py | 24 |
2 files changed, 42 insertions, 6 deletions
diff --git a/python/subunit/chunked.py b/python/subunit/chunked.py index 82e4b0d..5f8c6f1 100644 --- a/python/subunit/chunked.py +++ b/python/subunit/chunked.py @@ -1,12 +1,13 @@ # # subunit: extensions to python unittest to get test results from subprocesses. # Copyright (C) 2005 Robert Collins <robertc@robertcollins.net> +# Copyright (C) 2011 Martin Pool <mbp@sourcefrog.net> # # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause # license at the users choice. A copy of both licenses are available in the # project source as Apache-2.0 and BSD. You may not use this file except in # compliance with one of these two licences. -# +# # Unless required by applicable law or agreed to in writing, software # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -19,7 +20,7 @@ class Decoder(object): """Decode chunked content to a byte stream.""" - def __init__(self, output): + def __init__(self, output, strict=True): """Create a decoder decoding to output. :param output: A file-like object. Bytes written to the Decoder are @@ -29,11 +30,18 @@ class Decoder(object): when no more data is available, to detect short streams; the write method will return none-None when the end of a stream is detected. + + :param strict: If True (the default), the decoder will not knowingly + accept input that is not conformant to the HTTP specification. + (This does not imply that it will catch every nonconformance.) + If False, it will accept incorrect input that is still + unambiguous. """ self.output = output self.buffered_bytes = [] self.state = self._read_length self.body_length = 0 + self.strict = strict def close(self): """Close the decoder. @@ -72,7 +80,6 @@ class Decoder(object): def _read_length(self): """Try to decode a length from the bytes.""" - count = -1 match_chars = "0123456789abcdefABCDEF\r\n" count_chars = [] for bytes in self.buffered_bytes: @@ -87,7 +94,12 @@ class Decoder(object): if count_chars[-1][-1] != '\n': return count_str = ''.join(count_chars) - self.body_length = int(count_str[:-2], 16) + if self.strict: + if count_str[-2:] != '\r\n': + raise ValueError("chunk header invalid: %r" % count_str) + if '\r' in count_str[:-2]: + raise ValueError("too many CRs in chunk header %r" % count_str) + self.body_length = int(count_str.rstrip('\n\r'), 16) excess_bytes = len(count_str) while excess_bytes: if excess_bytes >= len(self.buffered_bytes[0]): @@ -107,7 +119,7 @@ class Decoder(object): def write(self, bytes): """Decode bytes to the output stream. - + :raises ValueError: If the stream has already seen the end of file marker. :returns: None, or the excess bytes beyond the end of file marker. @@ -133,7 +145,7 @@ class Encoder(object): def flush(self, extra_len=0): """Flush the encoder to the output stream. - + :param extra_len: Increase the size of the chunk by this many bytes to allow for a subsequent write. """ diff --git a/python/subunit/tests/test_chunked.py b/python/subunit/tests/test_chunked.py index a24e31e..eee7fe9 100644 --- a/python/subunit/tests/test_chunked.py +++ b/python/subunit/tests/test_chunked.py @@ -1,6 +1,7 @@ # # subunit: extensions to python unittest to get test results from subprocesses. # Copyright (C) 2005 Robert Collins <robertc@robertcollins.net> +# Copyright (C) 2011 Martin Pool <mbp@sourcefrog.net> # # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause # license at the users choice. A copy of both licenses are available in the @@ -86,6 +87,29 @@ class TestDecode(unittest.TestCase): self.assertEqual('', self.decoder.write('0\r\n')) self.assertEqual('1' * 65536 + '2' * 65536, self.output.getvalue()) + def test_decode_newline_nonstrict(self): + """Tolerate chunk markers with no CR character.""" + # From <http://pad.lv/505078> + self.decoder = subunit.chunked.Decoder(self.output, strict=False) + self.assertEqual(None, self.decoder.write('a\n')) + self.assertEqual(None, self.decoder.write('abcdeabcde')) + self.assertEqual('', self.decoder.write('0\n')) + self.assertEqual('abcdeabcde', self.output.getvalue()) + + def test_decode_strict_newline_only(self): + """Reject chunk markers with no CR character in strict mode.""" + # From <http://pad.lv/505078> + self.assertRaises(ValueError, + self.decoder.write, 'a\n') + + def test_decode_strict_multiple_crs(self): + self.assertRaises(ValueError, + self.decoder.write, 'a\r\r\n') + + def test_decode_short_header(self): + self.assertRaises(ValueError, + self.decoder.write, '\n') + class TestEncode(unittest.TestCase): |
