summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/subunit/chunked.py24
-rw-r--r--python/subunit/tests/test_chunked.py24
2 files changed, 42 insertions, 6 deletions
diff --git a/python/subunit/chunked.py b/python/subunit/chunked.py
index 82e4b0d..5f8c6f1 100644
--- a/python/subunit/chunked.py
+++ b/python/subunit/chunked.py
@@ -1,12 +1,13 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
+# Copyright (C) 2011 Martin Pool <mbp@sourcefrog.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -19,7 +20,7 @@
class Decoder(object):
"""Decode chunked content to a byte stream."""
- def __init__(self, output):
+ def __init__(self, output, strict=True):
"""Create a decoder decoding to output.
:param output: A file-like object. Bytes written to the Decoder are
@@ -29,11 +30,18 @@ class Decoder(object):
when no more data is available, to detect short streams; the
write method will return none-None when the end of a stream is
detected.
+
+ :param strict: If True (the default), the decoder will not knowingly
+ accept input that is not conformant to the HTTP specification.
+ (This does not imply that it will catch every nonconformance.)
+ If False, it will accept incorrect input that is still
+ unambiguous.
"""
self.output = output
self.buffered_bytes = []
self.state = self._read_length
self.body_length = 0
+ self.strict = strict
def close(self):
"""Close the decoder.
@@ -72,7 +80,6 @@ class Decoder(object):
def _read_length(self):
"""Try to decode a length from the bytes."""
- count = -1
match_chars = "0123456789abcdefABCDEF\r\n"
count_chars = []
for bytes in self.buffered_bytes:
@@ -87,7 +94,12 @@ class Decoder(object):
if count_chars[-1][-1] != '\n':
return
count_str = ''.join(count_chars)
- self.body_length = int(count_str[:-2], 16)
+ if self.strict:
+ if count_str[-2:] != '\r\n':
+ raise ValueError("chunk header invalid: %r" % count_str)
+ if '\r' in count_str[:-2]:
+ raise ValueError("too many CRs in chunk header %r" % count_str)
+ self.body_length = int(count_str.rstrip('\n\r'), 16)
excess_bytes = len(count_str)
while excess_bytes:
if excess_bytes >= len(self.buffered_bytes[0]):
@@ -107,7 +119,7 @@ class Decoder(object):
def write(self, bytes):
"""Decode bytes to the output stream.
-
+
:raises ValueError: If the stream has already seen the end of file
marker.
:returns: None, or the excess bytes beyond the end of file marker.
@@ -133,7 +145,7 @@ class Encoder(object):
def flush(self, extra_len=0):
"""Flush the encoder to the output stream.
-
+
:param extra_len: Increase the size of the chunk by this many bytes
to allow for a subsequent write.
"""
diff --git a/python/subunit/tests/test_chunked.py b/python/subunit/tests/test_chunked.py
index a24e31e..eee7fe9 100644
--- a/python/subunit/tests/test_chunked.py
+++ b/python/subunit/tests/test_chunked.py
@@ -1,6 +1,7 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
+# Copyright (C) 2011 Martin Pool <mbp@sourcefrog.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
@@ -86,6 +87,29 @@ class TestDecode(unittest.TestCase):
self.assertEqual('', self.decoder.write('0\r\n'))
self.assertEqual('1' * 65536 + '2' * 65536, self.output.getvalue())
+ def test_decode_newline_nonstrict(self):
+ """Tolerate chunk markers with no CR character."""
+ # From <http://pad.lv/505078>
+ self.decoder = subunit.chunked.Decoder(self.output, strict=False)
+ self.assertEqual(None, self.decoder.write('a\n'))
+ self.assertEqual(None, self.decoder.write('abcdeabcde'))
+ self.assertEqual('', self.decoder.write('0\n'))
+ self.assertEqual('abcdeabcde', self.output.getvalue())
+
+ def test_decode_strict_newline_only(self):
+ """Reject chunk markers with no CR character in strict mode."""
+ # From <http://pad.lv/505078>
+ self.assertRaises(ValueError,
+ self.decoder.write, 'a\n')
+
+ def test_decode_strict_multiple_crs(self):
+ self.assertRaises(ValueError,
+ self.decoder.write, 'a\r\r\n')
+
+ def test_decode_short_header(self):
+ self.assertRaises(ValueError,
+ self.decoder.write, '\n')
+
class TestEncode(unittest.TestCase):