summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorRobert Collins <robertc@robertcollins.net>2009-10-10 18:59:09 +1100
committerRobert Collins <robertc@robertcollins.net>2009-10-10 18:59:09 +1100
commita5370700d7c9a70af8b7baa897b5edef81a1ebe0 (patch)
treed4de236f30d24a11b14ea908784786a1c37e9403 /python
parent40ae70b04c7c88ed80a5e5b3f340f0c523b95e59 (diff)
downloadsubunit-git-a5370700d7c9a70af8b7baa897b5edef81a1ebe0.tar.gz
Implement a chunked decoder.
Diffstat (limited to 'python')
-rw-r--r--python/subunit/chunked.py101
-rw-r--r--python/subunit/tests/test_chunked.py57
2 files changed, 157 insertions, 1 deletions
diff --git a/python/subunit/chunked.py b/python/subunit/chunked.py
index 2cfc7ff..89fb97b 100644
--- a/python/subunit/chunked.py
+++ b/python/subunit/chunked.py
@@ -16,13 +16,112 @@
"""Encoder/decoder for http style chunked encoding."""
+class Decoder(object):
+ """Decode chunked content to a byte stream."""
+
+ def __init__(self, output):
+ """Create a decoder decoding to output.
+
+ :param output: A file-like object. Bytes written to the Decoder are
+ decoded to strip off the chunking and written to the output.
+ Up to a full write worth of data or a single control line may be
+ buffered (whichever is larger). The close method should be called
+ when no more data is available, to detect short streams; the
+ write method will return none-None when the end of a stream is
+ detected.
+ """
+ self.output = output
+ self.buffered_bytes = []
+ self.state = self._read_length
+ self.body_length = 0
+
+ def close(self):
+ """Close the decoder.
+
+ :raises ValueError: If the stream is incomplete ValueError is raised.
+ """
+ if self.state != self._finished:
+ raise ValueError("incomplete stream")
+
+ def _finished(self):
+ """Finished reading, return any remaining bytes."""
+ if self.buffered_bytes:
+ buffered_bytes = self.buffered_bytes
+ self.buffered_bytes = []
+ return ''.join(buffered_bytes)
+ else:
+ raise ValueError("stream is finished")
+
+ def _read_body(self):
+ """Pass body bytes to the output."""
+ while self.body_length and self.buffered_bytes:
+ if self.body_length >= self.buffered_bytes[0]:
+ self.output.write(self.buffered_bytes[0])
+ self.body_length -= len(self.buffered_bytes[0])
+ self.state = self._read_length
+ # No more data.
+ else:
+ self.output.write(self.buffered_bytes[0][:self.body_length])
+ self.buffered_bytes[0] = \
+ self.buffered_bytes[0][self.body_length:]
+ self.body_length = 0
+ self.state = self._read_length
+ return self.state()
+
+ def _read_length(self):
+ """Try to decode a length from the bytes."""
+ count = -1
+ match_chars = "0123456789abcdefABCDEF\r\n"
+ count_chars = []
+ for bytes in self.buffered_bytes:
+ for byte in bytes:
+ if byte not in match_chars:
+ break
+ count_chars.append(byte)
+ if byte == '\n':
+ break
+ if not count_chars:
+ return
+ if count_chars[-1][-1] != '\n':
+ return
+ count_str = ''.join(count_chars)
+ self.body_length = int(count_str[:-2], 16)
+ excess_bytes = len(count_str)
+ while excess_bytes:
+ if excess_bytes >= len(self.buffered_bytes[0]):
+ excess_bytes -= len(self.buffered_bytes[0])
+ del self.buffered_bytes[0]
+ else:
+ self.buffered_bytes[0] = self.buffered_bytes[0][excess_bytes:]
+ excess_bytes = 0
+ if not self.body_length:
+ self.state = self._finished
+ if not self.buffered_bytes:
+ # May not call into self._finished with no buffered data.
+ return ''
+ else:
+ self.state = self._read_body
+ return self.state()
+
+ def write(self, bytes):
+ """Decode bytes to the output stream.
+
+ :raises ValueError: If the stream has already seen the end of file
+ marker.
+ :returns: None, or the excess bytes beyond the end of file marker.
+ """
+ if bytes:
+ self.buffered_bytes.append(bytes)
+ return self.state()
+
+
class Encoder(object):
"""Encode content to a stream using HTTP Chunked coding."""
def __init__(self, output):
"""Create an encoder encoding to output.
- :param output: A file-like object. Bytes written to the Encoder
+ :param output: A file-like object. Bytes written to the Encoder
will be encoded using HTTP chunking. Small writes may be buffered
and the ``close`` method must be called to finish the stream.
"""
diff --git a/python/subunit/tests/test_chunked.py b/python/subunit/tests/test_chunked.py
index 2bf82b2..35de613 100644
--- a/python/subunit/tests/test_chunked.py
+++ b/python/subunit/tests/test_chunked.py
@@ -26,9 +26,66 @@ def test_suite():
return result
+class TestDecode(unittest.TestCase):
+
+ def setUp(self):
+ unittest.TestCase.setUp(self)
+ self.output = StringIO()
+ self.decoder = subunit.chunked.Decoder(self.output)
+
+ def test_close_read_length_short_errors(self):
+ self.assertRaises(ValueError, self.decoder.close)
+
+ def test_close_body_short_errors(self):
+ self.assertEqual(None, self.decoder.write('2\r\na'))
+ self.assertRaises(ValueError, self.decoder.close)
+
+ def test_close_body_buffered_data_errors(self):
+ self.assertEqual(None, self.decoder.write('2\r'))
+ self.assertRaises(ValueError, self.decoder.close)
+
+ def test_close_after_finished_stream_safe(self):
+ self.assertEqual(None, self.decoder.write('2\r\nab'))
+ self.assertEqual('', self.decoder.write('0\r\n'))
+ self.decoder.close()
+
+ def test_decode_nothing(self):
+ self.assertEqual('', self.decoder.write('0\r\n'))
+ self.assertEqual('', self.output.getvalue())
+
+ def test_decode_short(self):
+ self.assertEqual('', self.decoder.write('3\r\nabc0\r\n'))
+ self.assertEqual('abc', self.output.getvalue())
+
+ def test_decode_combines_short(self):
+ self.assertEqual('', self.decoder.write('6\r\nabcdef0\r\n'))
+ self.assertEqual('abcdef', self.output.getvalue())
+
+ def test_decode_excess_bytes_from_write(self):
+ self.assertEqual('1234', self.decoder.write('3\r\nabc0\r\n1234'))
+ self.assertEqual('abc', self.output.getvalue())
+
+ def test_decode_write_after_finished_errors(self):
+ self.assertEqual('1234', self.decoder.write('3\r\nabc0\r\n1234'))
+ self.assertRaises(ValueError, self.decoder.write, '')
+
+ def test_decode_hex(self):
+ self.assertEqual('', self.decoder.write('A\r\n12345678900\r\n'))
+ self.assertEqual('1234567890', self.output.getvalue())
+
+ def test_decode_long_ranges(self):
+ self.assertEqual(None, self.decoder.write('10000\r\n'))
+ self.assertEqual(None, self.decoder.write('1' * 65536))
+ self.assertEqual(None, self.decoder.write('10000\r\n'))
+ self.assertEqual(None, self.decoder.write('2' * 65536))
+ self.assertEqual('', self.decoder.write('0\r\n'))
+ self.assertEqual('1' * 65536 + '2' * 65536, self.output.getvalue())
+
+
class TestEncode(unittest.TestCase):
def setUp(self):
+ unittest.TestCase.setUp(self)
self.output = StringIO()
self.encoder = subunit.chunked.Encoder(self.output)