diff options
Diffstat (limited to 'tests.py')
| -rw-r--r-- | tests.py | 75 |
1 files changed, 64 insertions, 11 deletions
@@ -9,15 +9,15 @@ from xml.sax.saxutils import XMLGenerator from xml.sax import SAXParseException from pyexpat import ExpatError -from defusedxml import cElementTree, ElementTree, minidom, pulldom, sax +from defusedxml import cElementTree, ElementTree, minidom, pulldom, sax, xmlrpc from defusedxml import (DefusedXmlException, DTDForbidden, EntitiesForbidden, ExternalReferenceForbidden, NotSupportedError) from defusedxml.common import PY3, PY26, PY31 -if PY3: - from xmlrpc.client import ExpatParser as XmlRpcParser -else: - from xmlrpclib import ExpatParser as XmlRpcParser +try: + import gzip +except ImportError: + gzip = None try: from defusedxml import lxml @@ -432,19 +432,70 @@ class XmlRpcTarget(object): def end(self, tag): self._data.append("</%s>" % tag) - class TestXmlRpc(DefusedTestCase): - def parse(self, xmlfile): + module = xmlrpc + def parse(self, xmlfile, **kwargs): target = XmlRpcTarget() - parser = XmlRpcParser(target) + parser = self.module.DefusedExpatParser(target, **kwargs) data = self.get_content(xmlfile) parser.feed(data) parser.close() return target - #def test_xmlrpc(self): - # self.parse(self.xml_bomb) - # self.parse(self.xml_quadratic) + def test_xmlrpc(self): + self.assertRaises(EntitiesForbidden, self.parse, self.xml_bomb) + self.assertRaises(EntitiesForbidden, self.parse, self.xml_quadratic) + self.parse(self.xml_dtd) + self.assertRaises(DTDForbidden, self.parse, self.xml_dtd, + forbid_dtd=True) + + +class TestDefusedGzip(DefusedTestCase): + def get_gzipped(self, length): + f = io.BytesIO() + gzf = gzip.GzipFile(mode="wb", fileobj=f) + gzf.write(b"d" * length) + gzf.close() + f.seek(0) + return f + + def decode_response(self, response, limit=None): + dec = xmlrpc.DefusedGzipDecodedResponse(response, limit) + acc = [] + while True: + data = dec.read(1024) + if not data: + break + acc.append(data) + return b"".join(acc) + + def test_defused_gzip_decode(self): + data = self.get_gzipped(4096).getvalue() + result = xmlrpc.defused_gzip_decode(data) + self.assertEqual(result, b"d" *4096) + result = xmlrpc.defused_gzip_decode(data, -1) + self.assertEqual(result, b"d" *4096) + result = xmlrpc.defused_gzip_decode(data, 4096) + self.assertEqual(result, b"d" *4096) + with self.assertRaises(ValueError): + result = xmlrpc.defused_gzip_decode(data, 4095) + with self.assertRaises(ValueError): + result = xmlrpc.defused_gzip_decode(data, 0) + + def test_defused_gzip_response(self): + clen = len(self.get_gzipped(4096).getvalue()) + + response = self.get_gzipped(4096) + data = self.decode_response(response) + self.assertEqual(data, b"d" *4096) + + with self.assertRaises(ValueError): + response = self.get_gzipped(4096) + xmlrpc.DefusedGzipDecodedResponse(response, clen - 1) + + with self.assertRaises(ValueError): + response = self.get_gzipped(4096) + self.decode_response(response, 4095) def test_main(): @@ -457,6 +508,8 @@ def test_main(): suite.addTests(unittest.makeSuite(TestXmlRpc)) if lxml is not None: suite.addTests(unittest.makeSuite(TestDefusedLxml)) + if gzip is not None: + suite.addTests(unittest.makeSuite(TestDefusedGzip)) return suite if __name__ == "__main__": |
