summaryrefslogtreecommitdiff
path: root/tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests.py')
-rw-r--r--tests.py75
1 files changed, 64 insertions, 11 deletions
diff --git a/tests.py b/tests.py
index cba8b39..f188dc5 100644
--- a/tests.py
+++ b/tests.py
@@ -9,15 +9,15 @@ from xml.sax.saxutils import XMLGenerator
from xml.sax import SAXParseException
from pyexpat import ExpatError
-from defusedxml import cElementTree, ElementTree, minidom, pulldom, sax
+from defusedxml import cElementTree, ElementTree, minidom, pulldom, sax, xmlrpc
from defusedxml import (DefusedXmlException, DTDForbidden, EntitiesForbidden,
ExternalReferenceForbidden, NotSupportedError)
from defusedxml.common import PY3, PY26, PY31
-if PY3:
- from xmlrpc.client import ExpatParser as XmlRpcParser
-else:
- from xmlrpclib import ExpatParser as XmlRpcParser
+try:
+ import gzip
+except ImportError:
+ gzip = None
try:
from defusedxml import lxml
@@ -432,19 +432,70 @@ class XmlRpcTarget(object):
def end(self, tag):
self._data.append("</%s>" % tag)
-
class TestXmlRpc(DefusedTestCase):
- def parse(self, xmlfile):
+ module = xmlrpc
+ def parse(self, xmlfile, **kwargs):
target = XmlRpcTarget()
- parser = XmlRpcParser(target)
+ parser = self.module.DefusedExpatParser(target, **kwargs)
data = self.get_content(xmlfile)
parser.feed(data)
parser.close()
return target
- #def test_xmlrpc(self):
- # self.parse(self.xml_bomb)
- # self.parse(self.xml_quadratic)
+ def test_xmlrpc(self):
+ self.assertRaises(EntitiesForbidden, self.parse, self.xml_bomb)
+ self.assertRaises(EntitiesForbidden, self.parse, self.xml_quadratic)
+ self.parse(self.xml_dtd)
+ self.assertRaises(DTDForbidden, self.parse, self.xml_dtd,
+ forbid_dtd=True)
+
+
+class TestDefusedGzip(DefusedTestCase):
+ def get_gzipped(self, length):
+ f = io.BytesIO()
+ gzf = gzip.GzipFile(mode="wb", fileobj=f)
+ gzf.write(b"d" * length)
+ gzf.close()
+ f.seek(0)
+ return f
+
+ def decode_response(self, response, limit=None):
+ dec = xmlrpc.DefusedGzipDecodedResponse(response, limit)
+ acc = []
+ while True:
+ data = dec.read(1024)
+ if not data:
+ break
+ acc.append(data)
+ return b"".join(acc)
+
+ def test_defused_gzip_decode(self):
+ data = self.get_gzipped(4096).getvalue()
+ result = xmlrpc.defused_gzip_decode(data)
+ self.assertEqual(result, b"d" *4096)
+ result = xmlrpc.defused_gzip_decode(data, -1)
+ self.assertEqual(result, b"d" *4096)
+ result = xmlrpc.defused_gzip_decode(data, 4096)
+ self.assertEqual(result, b"d" *4096)
+ with self.assertRaises(ValueError):
+ result = xmlrpc.defused_gzip_decode(data, 4095)
+ with self.assertRaises(ValueError):
+ result = xmlrpc.defused_gzip_decode(data, 0)
+
+ def test_defused_gzip_response(self):
+ clen = len(self.get_gzipped(4096).getvalue())
+
+ response = self.get_gzipped(4096)
+ data = self.decode_response(response)
+ self.assertEqual(data, b"d" *4096)
+
+ with self.assertRaises(ValueError):
+ response = self.get_gzipped(4096)
+ xmlrpc.DefusedGzipDecodedResponse(response, clen - 1)
+
+ with self.assertRaises(ValueError):
+ response = self.get_gzipped(4096)
+ self.decode_response(response, 4095)
def test_main():
@@ -457,6 +508,8 @@ def test_main():
suite.addTests(unittest.makeSuite(TestXmlRpc))
if lxml is not None:
suite.addTests(unittest.makeSuite(TestDefusedLxml))
+ if gzip is not None:
+ suite.addTests(unittest.makeSuite(TestDefusedGzip))
return suite
if __name__ == "__main__":