From 0de847bb324e3bf3345c355f899c939180ecb406 Mon Sep 17 00:00:00 2001 From: Martin Pool Date: Tue, 11 Jan 2011 22:49:26 -0600 Subject: Default to (more) strict decoding of chunked parts --- python/subunit/chunked.py | 15 ++++++++++++++- python/subunit/tests/test_chunked.py | 20 ++++++++++++++++++-- 2 files changed, 32 insertions(+), 3 deletions(-) (limited to 'python') diff --git a/python/subunit/chunked.py b/python/subunit/chunked.py index 9c51357..c15675b 100644 --- a/python/subunit/chunked.py +++ b/python/subunit/chunked.py @@ -1,6 +1,7 @@ # # subunit: extensions to python unittest to get test results from subprocesses. # Copyright (C) 2005 Robert Collins +# Copyright (C) 2011 Martin Pool # # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause # license at the users choice. A copy of both licenses are available in the @@ -19,7 +20,7 @@ class Decoder(object): """Decode chunked content to a byte stream.""" - def __init__(self, output): + def __init__(self, output, strict=True): """Create a decoder decoding to output. :param output: A file-like object. Bytes written to the Decoder are @@ -29,11 +30,18 @@ class Decoder(object): when no more data is available, to detect short streams; the write method will return none-None when the end of a stream is detected. + + :param strict: If True (the default), the decoder will not knowingly + accept input that is not conformant to the HTTP specification. + (This does not imply that it will catch every nonconformance.) + If False, it will accept incorrect input that is still + unambiguous. """ self.output = output self.buffered_bytes = [] self.state = self._read_length self.body_length = 0 + self.strict = strict def close(self): """Close the decoder. @@ -87,6 +95,11 @@ class Decoder(object): if count_chars[-1][-1] != '\n': return count_str = ''.join(count_chars) + if self.strict: + if count_str[-2:] != '\r\n': + raise ValueError("chunk header invalid: %r" % count_str) + if '\r' in count_str[:-2]: + raise ValueError("too many crs in chunk header %r" % count_str) self.body_length = int(count_str.rstrip('\n\r'), 16) excess_bytes = len(count_str) while excess_bytes: diff --git a/python/subunit/tests/test_chunked.py b/python/subunit/tests/test_chunked.py index 681af6b..4fb99bf 100644 --- a/python/subunit/tests/test_chunked.py +++ b/python/subunit/tests/test_chunked.py @@ -87,13 +87,29 @@ class TestDecode(unittest.TestCase): self.assertEqual('', self.decoder.write('0\r\n')) self.assertEqual('1' * 65536 + '2' * 65536, self.output.getvalue()) - def test_decode_newline(self): + def test_decode_newline_nonstrict(self): """Tolerate chunk markers with no cr character.""" + # From + self.decoder = subunit.chunked.Decoder(self.output, strict=False) self.assertEqual(None, self.decoder.write('a\n')) self.assertEqual(None, self.decoder.write('abcdeabcde')) - self.assertEqual(None, self.decoder.write('0\n')) + self.assertEqual('', self.decoder.write('0\n')) self.assertEqual('abcdeabcde', self.output.getvalue()) + def test_decode_strict_newline_only(self): + """Reject chunk markers with no cr character in strict mode.""" + # From + self.assertRaises(ValueError, + self.decoder.write, 'a\n') + + def test_decode_strict_multiple_crs(self): + self.assertRaises(ValueError, + self.decoder.write, 'a\r\r\n') + + def test_decode_short_header(self): + self.assertRaises(ValueError, + self.decoder.write, '\n') + class TestEncode(unittest.TestCase): -- cgit v1.2.1