diff options
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/support.py | 2 | ||||
-rw-r--r-- | Lib/test/test_hmac.py | 44 | ||||
-rw-r--r-- | Lib/test/test_mailbox.py | 11 | ||||
-rw-r--r-- | Lib/test/test_multiprocessing.py | 355 | ||||
-rw-r--r-- | Lib/test/test_structseq.py | 5 | ||||
-rw-r--r-- | Lib/test/test_time.py | 65 | ||||
-rw-r--r-- | Lib/test/test_xml_etree.py | 226 | ||||
-rw-r--r-- | Lib/test/test_xml_etree_c.py | 28 |
8 files changed, 566 insertions, 170 deletions
diff --git a/Lib/test/support.py b/Lib/test/support.py index 6749a5115e..3ff1df51d1 100644 --- a/Lib/test/support.py +++ b/Lib/test/support.py @@ -1593,7 +1593,7 @@ def strip_python_stderr(stderr): This will typically be run on the result of the communicate() method of a subprocess.Popen object. """ - stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip() + stderr = re.sub(br"\[\d+ refs\]\r?\n?", b"", stderr).strip() return stderr def args_from_interpreter_flags(): diff --git a/Lib/test/test_hmac.py b/Lib/test/test_hmac.py index 042bc5d8f7..4e5961db71 100644 --- a/Lib/test/test_hmac.py +++ b/Lib/test/test_hmac.py @@ -302,40 +302,42 @@ class CopyTestCase(unittest.TestCase): self.assertEqual(h1.hexdigest(), h2.hexdigest(), "Hexdigest of copy doesn't match original hexdigest.") -class SecureCompareTestCase(unittest.TestCase): +class CompareDigestTestCase(unittest.TestCase): def test_compare(self): # Testing input type exception handling a, b = 100, 200 - self.assertRaises(TypeError, hmac.secure_compare, a, b) - a, b = 100, "foobar" - self.assertRaises(TypeError, hmac.secure_compare, a, b) + self.assertRaises(TypeError, hmac.compare_digest, a, b) + a, b = 100, b"foobar" + self.assertRaises(TypeError, hmac.compare_digest, a, b) + a, b = b"foobar", 200 + self.assertRaises(TypeError, hmac.compare_digest, a, b) a, b = "foobar", b"foobar" - self.assertRaises(TypeError, hmac.secure_compare, a, b) + self.assertRaises(TypeError, hmac.compare_digest, a, b) + a, b = b"foobar", "foobar" + self.assertRaises(TypeError, hmac.compare_digest, a, b) + a, b = "foobar", "foobar" + self.assertRaises(TypeError, hmac.compare_digest, a, b) + a, b = bytearray(b"foobar"), bytearray(b"foobar") + self.assertRaises(TypeError, hmac.compare_digest, a, b) - # Testing str/bytes of different lengths - a, b = "foobar", "foo" - self.assertFalse(hmac.secure_compare(a, b)) + # Testing bytes of different lengths a, b = b"foobar", b"foo" - self.assertFalse(hmac.secure_compare(a, b)) + self.assertFalse(hmac.compare_digest(a, b)) a, b = b"\xde\xad\xbe\xef", b"\xde\xad" - self.assertFalse(hmac.secure_compare(a, b)) + self.assertFalse(hmac.compare_digest(a, b)) - # Testing str/bytes of same lengths, different values - a, b = "foobar", "foobaz" - self.assertFalse(hmac.secure_compare(a, b)) + # Testing bytes of same lengths, different values a, b = b"foobar", b"foobaz" - self.assertFalse(hmac.secure_compare(a, b)) + self.assertFalse(hmac.compare_digest(a, b)) a, b = b"\xde\xad\xbe\xef", b"\xab\xad\x1d\xea" - self.assertFalse(hmac.secure_compare(a, b)) + self.assertFalse(hmac.compare_digest(a, b)) - # Testing str/bytes of same lengths, same values - a, b = "foobar", "foobar" - self.assertTrue(hmac.secure_compare(a, b)) + # Testing bytes of same lengths, same values a, b = b"foobar", b"foobar" - self.assertTrue(hmac.secure_compare(a, b)) + self.assertTrue(hmac.compare_digest(a, b)) a, b = b"\xde\xad\xbe\xef", b"\xde\xad\xbe\xef" - self.assertTrue(hmac.secure_compare(a, b)) + self.assertTrue(hmac.compare_digest(a, b)) def test_main(): support.run_unittest( @@ -343,7 +345,7 @@ def test_main(): ConstructorTestCase, SanityTestCase, CopyTestCase, - SecureCompareTestCase + CompareDigestTestCase ) if __name__ == "__main__": diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py index 0fe4bd9dd0..cb54505d9a 100644 --- a/Lib/test/test_mailbox.py +++ b/Lib/test/test_mailbox.py @@ -504,6 +504,17 @@ class TestMailbox(TestBase): # Write changes to disk self._test_flush_or_close(self._box.flush, True) + def test_popitem_and_flush_twice(self): + # See #15036. + self._box.add(self._template % 0) + self._box.add(self._template % 1) + self._box.flush() + + self._box.popitem() + self._box.flush() + self._box.popitem() + self._box.flush() + def test_lock_unlock(self): # Lock and unlock the mailbox self.assertFalse(os.path.exists(self._get_lock_path())) diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index 65e7b0be6e..2704827f49 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -18,6 +18,7 @@ import array import socket import random import logging +import struct import test.support @@ -1057,6 +1058,340 @@ class _TestEvent(BaseTestCase): self.assertEqual(wait(), True) # +# Tests for Barrier - adapted from tests in test/lock_tests.py +# + +# Many of the tests for threading.Barrier use a list as an atomic +# counter: a value is appended to increment the counter, and the +# length of the list gives the value. We use the class DummyList +# for the same purpose. + +class _DummyList(object): + + def __init__(self): + wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i')) + lock = multiprocessing.Lock() + self.__setstate__((wrapper, lock)) + self._lengthbuf[0] = 0 + + def __setstate__(self, state): + (self._wrapper, self._lock) = state + self._lengthbuf = self._wrapper.create_memoryview().cast('i') + + def __getstate__(self): + return (self._wrapper, self._lock) + + def append(self, _): + with self._lock: + self._lengthbuf[0] += 1 + + def __len__(self): + with self._lock: + return self._lengthbuf[0] + +def _wait(): + # A crude wait/yield function not relying on synchronization primitives. + time.sleep(0.01) + + +class Bunch(object): + """ + A bunch of threads. + """ + def __init__(self, namespace, f, args, n, wait_before_exit=False): + """ + Construct a bunch of `n` threads running the same function `f`. + If `wait_before_exit` is True, the threads won't terminate until + do_finish() is called. + """ + self.f = f + self.args = args + self.n = n + self.started = namespace.DummyList() + self.finished = namespace.DummyList() + self._can_exit = namespace.Event() + if not wait_before_exit: + self._can_exit.set() + for i in range(n): + p = namespace.Process(target=self.task) + p.daemon = True + p.start() + + def task(self): + pid = os.getpid() + self.started.append(pid) + try: + self.f(*self.args) + finally: + self.finished.append(pid) + self._can_exit.wait(30) + assert self._can_exit.is_set() + + def wait_for_started(self): + while len(self.started) < self.n: + _wait() + + def wait_for_finished(self): + while len(self.finished) < self.n: + _wait() + + def do_finish(self): + self._can_exit.set() + + +class AppendTrue(object): + def __init__(self, obj): + self.obj = obj + def __call__(self): + self.obj.append(True) + + +class _TestBarrier(BaseTestCase): + """ + Tests for Barrier objects. + """ + N = 5 + defaultTimeout = 10.0 # XXX Slow Windows buildbots need generous timeout + + def setUp(self): + self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout) + + def tearDown(self): + self.barrier.abort() + self.barrier = None + + def DummyList(self): + if self.TYPE == 'threads': + return [] + elif self.TYPE == 'manager': + return self.manager.list() + else: + return _DummyList() + + def run_threads(self, f, args): + b = Bunch(self, f, args, self.N-1) + f(*args) + b.wait_for_finished() + + @classmethod + def multipass(cls, barrier, results, n): + m = barrier.parties + assert m == cls.N + for i in range(n): + results[0].append(True) + assert len(results[1]) == i * m + barrier.wait() + results[1].append(True) + assert len(results[0]) == (i + 1) * m + barrier.wait() + try: + assert barrier.n_waiting == 0 + except NotImplementedError: + pass + assert not barrier.broken + + def test_barrier(self, passes=1): + """ + Test that a barrier is passed in lockstep + """ + results = [self.DummyList(), self.DummyList()] + self.run_threads(self.multipass, (self.barrier, results, passes)) + + def test_barrier_10(self): + """ + Test that a barrier works for 10 consecutive runs + """ + return self.test_barrier(10) + + @classmethod + def _test_wait_return_f(cls, barrier, queue): + res = barrier.wait() + queue.put(res) + + def test_wait_return(self): + """ + test the return value from barrier.wait + """ + queue = self.Queue() + self.run_threads(self._test_wait_return_f, (self.barrier, queue)) + results = [queue.get() for i in range(self.N)] + self.assertEqual(results.count(0), 1) + + @classmethod + def _test_action_f(cls, barrier, results): + barrier.wait() + if len(results) != 1: + raise RuntimeError + + def test_action(self): + """ + Test the 'action' callback + """ + results = self.DummyList() + barrier = self.Barrier(self.N, action=AppendTrue(results)) + self.run_threads(self._test_action_f, (barrier, results)) + self.assertEqual(len(results), 1) + + @classmethod + def _test_abort_f(cls, barrier, results1, results2): + try: + i = barrier.wait() + if i == cls.N//2: + raise RuntimeError + barrier.wait() + results1.append(True) + except threading.BrokenBarrierError: + results2.append(True) + except RuntimeError: + barrier.abort() + + def test_abort(self): + """ + Test that an abort will put the barrier in a broken state + """ + results1 = self.DummyList() + results2 = self.DummyList() + self.run_threads(self._test_abort_f, + (self.barrier, results1, results2)) + self.assertEqual(len(results1), 0) + self.assertEqual(len(results2), self.N-1) + self.assertTrue(self.barrier.broken) + + @classmethod + def _test_reset_f(cls, barrier, results1, results2, results3): + i = barrier.wait() + if i == cls.N//2: + # Wait until the other threads are all in the barrier. + while barrier.n_waiting < cls.N-1: + time.sleep(0.001) + barrier.reset() + else: + try: + barrier.wait() + results1.append(True) + except threading.BrokenBarrierError: + results2.append(True) + # Now, pass the barrier again + barrier.wait() + results3.append(True) + + def test_reset(self): + """ + Test that a 'reset' on a barrier frees the waiting threads + """ + results1 = self.DummyList() + results2 = self.DummyList() + results3 = self.DummyList() + self.run_threads(self._test_reset_f, + (self.barrier, results1, results2, results3)) + self.assertEqual(len(results1), 0) + self.assertEqual(len(results2), self.N-1) + self.assertEqual(len(results3), self.N) + + @classmethod + def _test_abort_and_reset_f(cls, barrier, barrier2, + results1, results2, results3): + try: + i = barrier.wait() + if i == cls.N//2: + raise RuntimeError + barrier.wait() + results1.append(True) + except threading.BrokenBarrierError: + results2.append(True) + except RuntimeError: + barrier.abort() + # Synchronize and reset the barrier. Must synchronize first so + # that everyone has left it when we reset, and after so that no + # one enters it before the reset. + if barrier2.wait() == cls.N//2: + barrier.reset() + barrier2.wait() + barrier.wait() + results3.append(True) + + def test_abort_and_reset(self): + """ + Test that a barrier can be reset after being broken. + """ + results1 = self.DummyList() + results2 = self.DummyList() + results3 = self.DummyList() + barrier2 = self.Barrier(self.N) + + self.run_threads(self._test_abort_and_reset_f, + (self.barrier, barrier2, results1, results2, results3)) + self.assertEqual(len(results1), 0) + self.assertEqual(len(results2), self.N-1) + self.assertEqual(len(results3), self.N) + + @classmethod + def _test_timeout_f(cls, barrier, results): + i = barrier.wait(20) + if i == cls.N//2: + # One thread is late! + time.sleep(4.0) + try: + barrier.wait(0.5) + except threading.BrokenBarrierError: + results.append(True) + + def test_timeout(self): + """ + Test wait(timeout) + """ + results = self.DummyList() + self.run_threads(self._test_timeout_f, (self.barrier, results)) + self.assertEqual(len(results), self.barrier.parties) + + @classmethod + def _test_default_timeout_f(cls, barrier, results): + i = barrier.wait(20) + if i == cls.N//2: + # One thread is later than the default timeout + time.sleep(4.0) + try: + barrier.wait() + except threading.BrokenBarrierError: + results.append(True) + + def test_default_timeout(self): + """ + Test the barrier's default timeout + """ + barrier = self.Barrier(self.N, timeout=1.0) + results = self.DummyList() + self.run_threads(self._test_default_timeout_f, (barrier, results)) + self.assertEqual(len(results), barrier.parties) + + def test_single_thread(self): + b = self.Barrier(1) + b.wait() + b.wait() + + @classmethod + def _test_thousand_f(cls, barrier, passes, conn, lock): + for i in range(passes): + barrier.wait() + with lock: + conn.send(i) + + def test_thousand(self): + if self.TYPE == 'manager': + return + passes = 1000 + lock = self.Lock() + conn, child_conn = self.Pipe(False) + for j in range(self.N): + p = self.Process(target=self._test_thousand_f, + args=(self.barrier, passes, child_conn, lock)) + p.start() + + for i in range(passes): + for j in range(self.N): + self.assertEqual(conn.recv(), i) + +# # # @@ -1485,6 +1820,11 @@ class _TestZZZNumberOfObjects(BaseTestCase): # run after all the other tests for the manager. It tests that # there have been no "reference leaks" for the manager's shared # objects. Note the comment in _TestPool.test_terminate(). + + # If some other test using ManagerMixin.manager fails, then the + # raised exception may keep alive a frame which holds a reference + # to a managed object. This will cause test_number_of_objects to + # also fail. ALLOWED_TYPES = ('manager',) def test_number_of_objects(self): @@ -1564,6 +1904,11 @@ class _TestMyManager(BaseTestCase): manager.shutdown() + # If the manager process exited cleanly then the exitcode + # will be zero. Otherwise (after a short timeout) + # terminate() is used, resulting in an exitcode of -SIGTERM. + self.assertEqual(manager._process.exitcode, 0) + # # Test of connecting to a remote server and using xmlrpclib for serialization # @@ -1923,7 +2268,7 @@ class _TestConnection(BaseTestCase): class _TestListener(BaseTestCase): - ALLOWED_TYPES = ('processes') + ALLOWED_TYPES = ('processes',) def test_multiple_bind(self): for family in self.connection.families: @@ -2505,10 +2850,12 @@ def create_test_cases(Mixin, type): result = {} glob = globals() Type = type.capitalize() + ALL_TYPES = {'processes', 'threads', 'manager'} for name in list(glob.keys()): if name.startswith('_Test'): base = glob[name] + assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES) if type in base.ALLOWED_TYPES: newname = 'With' + Type + name[1:] class Temp(base, unittest.TestCase, Mixin): @@ -2527,7 +2874,7 @@ class ProcessesMixin(object): Process = multiprocessing.Process locals().update(get_attributes(multiprocessing, ( 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', - 'Condition', 'Event', 'Value', 'Array', 'RawValue', + 'Condition', 'Event', 'Barrier', 'Value', 'Array', 'RawValue', 'RawArray', 'current_process', 'active_children', 'Pipe', 'connection', 'JoinableQueue', 'Pool' ))) @@ -2542,7 +2889,7 @@ class ManagerMixin(object): manager = object.__new__(multiprocessing.managers.SyncManager) locals().update(get_attributes(manager, ( 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', - 'Condition', 'Event', 'Value', 'Array', 'list', 'dict', + 'Condition', 'Event', 'Barrier', 'Value', 'Array', 'list', 'dict', 'Namespace', 'JoinableQueue', 'Pool' ))) @@ -2555,7 +2902,7 @@ class ThreadsMixin(object): Process = multiprocessing.dummy.Process locals().update(get_attributes(multiprocessing.dummy, ( 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', - 'Condition', 'Event', 'Value', 'Array', 'current_process', + 'Condition', 'Event', 'Barrier', 'Value', 'Array', 'current_process', 'active_children', 'Pipe', 'connection', 'dict', 'list', 'Namespace', 'JoinableQueue', 'Pool' ))) diff --git a/Lib/test/test_structseq.py b/Lib/test/test_structseq.py index d6c63b792f..a89e9556c1 100644 --- a/Lib/test/test_structseq.py +++ b/Lib/test/test_structseq.py @@ -78,8 +78,9 @@ class StructSeqTest(unittest.TestCase): def test_fields(self): t = time.gmtime() - self.assertEqual(len(t), t.n_fields) - self.assertEqual(t.n_fields, t.n_sequence_fields+t.n_unnamed_fields) + self.assertEqual(len(t), t.n_sequence_fields) + self.assertEqual(t.n_unnamed_fields, 0) + self.assertEqual(t.n_fields, time._STRUCT_TM_ITEMS) def test_constructor(self): t = time.struct_time diff --git a/Lib/test/test_time.py b/Lib/test/test_time.py index 02f05c364a..63e14530eb 100644 --- a/Lib/test/test_time.py +++ b/Lib/test/test_time.py @@ -31,15 +31,14 @@ class TimeTestCase(unittest.TestCase): time.time() info = time.get_clock_info('time') self.assertFalse(info.monotonic) - if sys.platform != 'win32': - self.assertTrue(info.adjusted) + self.assertTrue(info.adjustable) def test_clock(self): time.clock() info = time.get_clock_info('clock') self.assertTrue(info.monotonic) - self.assertFalse(info.adjusted) + self.assertFalse(info.adjustable) @unittest.skipUnless(hasattr(time, 'clock_gettime'), 'need time.clock_gettime()') @@ -371,10 +370,7 @@ class TimeTestCase(unittest.TestCase): info = time.get_clock_info('monotonic') self.assertTrue(info.monotonic) - if sys.platform == 'linux': - self.assertTrue(info.adjusted) - else: - self.assertFalse(info.adjusted) + self.assertFalse(info.adjustable) def test_perf_counter(self): time.perf_counter() @@ -390,7 +386,7 @@ class TimeTestCase(unittest.TestCase): info = time.get_clock_info('process_time') self.assertTrue(info.monotonic) - self.assertFalse(info.adjusted) + self.assertFalse(info.adjustable) @unittest.skipUnless(hasattr(time, 'monotonic'), 'need time.monotonic') @@ -441,7 +437,7 @@ class TimeTestCase(unittest.TestCase): # 0.0 < resolution <= 1.0 self.assertGreater(info.resolution, 0.0) self.assertLessEqual(info.resolution, 1.0) - self.assertIsInstance(info.adjusted, bool) + self.assertIsInstance(info.adjustable, bool) self.assertRaises(ValueError, time.get_clock_info, 'xxx') @@ -624,7 +620,58 @@ class TestPytime(unittest.TestCase): for invalid in self.invalid_values: self.assertRaises(OverflowError, pytime_object_to_timespec, invalid) + @unittest.skipUnless(time._STRUCT_TM_ITEMS == 11, "needs tm_zone support") + def test_localtime_timezone(self): + + # Get the localtime and examine it for the offset and zone. + lt = time.localtime() + self.assertTrue(hasattr(lt, "tm_gmtoff")) + self.assertTrue(hasattr(lt, "tm_zone")) + # See if the offset and zone are similar to the module + # attributes. + if lt.tm_gmtoff is None: + self.assertTrue(not hasattr(time, "timezone")) + else: + self.assertEqual(lt.tm_gmtoff, -[time.timezone, time.altzone][lt.tm_isdst]) + if lt.tm_zone is None: + self.assertTrue(not hasattr(time, "tzname")) + else: + self.assertEqual(lt.tm_zone, time.tzname[lt.tm_isdst]) + + # Try and make UNIX times from the localtime and a 9-tuple + # created from the localtime. Test to see that the times are + # the same. + t = time.mktime(lt); t9 = time.mktime(lt[:9]) + self.assertEqual(t, t9) + + # Make localtimes from the UNIX times and compare them to + # the original localtime, thus making a round trip. + new_lt = time.localtime(t); new_lt9 = time.localtime(t9) + self.assertEqual(new_lt, lt) + self.assertEqual(new_lt.tm_gmtoff, lt.tm_gmtoff) + self.assertEqual(new_lt.tm_zone, lt.tm_zone) + self.assertEqual(new_lt9, lt) + self.assertEqual(new_lt.tm_gmtoff, lt.tm_gmtoff) + self.assertEqual(new_lt9.tm_zone, lt.tm_zone) + + @unittest.skipUnless(time._STRUCT_TM_ITEMS == 11, "needs tm_zone support") + def test_strptime_timezone(self): + t = time.strptime("UTC", "%Z") + self.assertEqual(t.tm_zone, 'UTC') + t = time.strptime("+0500", "%z") + self.assertEqual(t.tm_gmtoff, 5 * 3600) + + @unittest.skipUnless(time._STRUCT_TM_ITEMS == 11, "needs tm_zone support") + def test_short_times(self): + + import pickle + + # Load a short time structure using pickle. + st = b"ctime\nstruct_time\np0\n((I2007\nI8\nI11\nI1\nI24\nI49\nI5\nI223\nI1\ntp1\n(dp2\ntp3\nRp4\n." + lt = pickle.loads(st) + self.assertIs(lt.tm_gmtoff, None) + self.assertIs(lt.tm_zone, None) def test_main(): support.run_unittest( diff --git a/Lib/test/test_xml_etree.py b/Lib/test/test_xml_etree.py index 49a5633a83..24ecae5aca 100644 --- a/Lib/test/test_xml_etree.py +++ b/Lib/test/test_xml_etree.py @@ -23,7 +23,8 @@ import weakref from test import support from test.support import findfile, import_fresh_module, gc_collect -pyET = import_fresh_module('xml.etree.ElementTree', blocked=['_elementtree']) +pyET = None +ET = None SIMPLE_XMLFILE = findfile("simple.xml", subdir="xmltestdata") try: @@ -209,10 +210,8 @@ def interface(): These methods return an iterable. See bug 6472. - >>> check_method(element.iter("tag").__next__) >>> check_method(element.iterfind("tag").__next__) >>> check_method(element.iterfind("*").__next__) - >>> check_method(tree.iter("tag").__next__) >>> check_method(tree.iterfind("tag").__next__) >>> check_method(tree.iterfind("*").__next__) @@ -291,42 +290,6 @@ def cdata(): '<tag>hello</tag>' """ -# Only with Python implementation -def simplefind(): - """ - Test find methods using the elementpath fallback. - - >>> ElementTree = pyET - - >>> CurrentElementPath = ElementTree.ElementPath - >>> ElementTree.ElementPath = ElementTree._SimpleElementPath() - >>> elem = ElementTree.XML(SAMPLE_XML) - >>> elem.find("tag").tag - 'tag' - >>> ElementTree.ElementTree(elem).find("tag").tag - 'tag' - >>> elem.findtext("tag") - 'text' - >>> elem.findtext("tog") - >>> elem.findtext("tog", "default") - 'default' - >>> ElementTree.ElementTree(elem).findtext("tag") - 'text' - >>> summarize_list(elem.findall("tag")) - ['tag', 'tag'] - >>> summarize_list(elem.findall(".//tag")) - ['tag', 'tag', 'tag'] - - Path syntax doesn't work in this case. - - >>> elem.find("section/tag") - >>> elem.findtext("section/tag") - >>> summarize_list(elem.findall("section/tag")) - [] - - >>> ElementTree.ElementPath = CurrentElementPath - """ - def find(): """ Test find methods (including xpath syntax). @@ -1002,36 +965,6 @@ def methods(): '1 < 2\n' """ -def iterators(): - """ - Test iterators. - - >>> e = ET.XML("<html><body>this is a <i>paragraph</i>.</body>..</html>") - >>> summarize_list(e.iter()) - ['html', 'body', 'i'] - >>> summarize_list(e.find("body").iter()) - ['body', 'i'] - >>> summarize(next(e.iter())) - 'html' - >>> "".join(e.itertext()) - 'this is a paragraph...' - >>> "".join(e.find("body").itertext()) - 'this is a paragraph.' - >>> next(e.itertext()) - 'this is a ' - - Method iterparse should return an iterator. See bug 6472. - - >>> sourcefile = serialize(e, to_string=False) - >>> next(ET.iterparse(sourcefile)) # doctest: +ELLIPSIS - ('end', <Element 'i' at 0x...>) - - >>> tree = ET.ElementTree(None) - >>> tree.iter() - Traceback (most recent call last): - AttributeError: 'NoneType' object has no attribute 'iter' - """ - ENTITY_XML = """\ <!DOCTYPE points [ <!ENTITY % user-entities SYSTEM 'user-entities.xml'> @@ -1339,6 +1272,7 @@ XINCLUDE["default.xml"] = """\ </document> """.format(html.escape(SIMPLE_XMLFILE, True)) + def xinclude_loader(href, parse="xml", encoding=None): try: data = XINCLUDE[href] @@ -1411,22 +1345,6 @@ def xinclude(): >>> # print(serialize(document)) # C5 """ -def xinclude_default(): - """ - >>> from xml.etree import ElementInclude - - >>> document = xinclude_loader("default.xml") - >>> ElementInclude.include(document) - >>> print(serialize(document)) # default - <document> - <p>Example.</p> - <root> - <element key="value">text</element> - <element>text</element>tail - <empty-element /> - </root> - </document> - """ # # badly formatted xi:include tags @@ -1917,9 +1835,8 @@ class ElementTreeTest(unittest.TestCase): self.assertIsInstance(ET.QName, type) self.assertIsInstance(ET.ElementTree, type) self.assertIsInstance(ET.Element, type) - # XXX issue 14128 with C ElementTree - # self.assertIsInstance(ET.TreeBuilder, type) - # self.assertIsInstance(ET.XMLParser, type) + self.assertIsInstance(ET.TreeBuilder, type) + self.assertIsInstance(ET.XMLParser, type) def test_Element_subclass_trivial(self): class MyElement(ET.Element): @@ -1953,6 +1870,73 @@ class ElementTreeTest(unittest.TestCase): self.assertEqual(mye.newmethod(), 'joe') +class ElementIterTest(unittest.TestCase): + def _ilist(self, elem, tag=None): + return summarize_list(elem.iter(tag)) + + def test_basic(self): + doc = ET.XML("<html><body>this is a <i>paragraph</i>.</body>..</html>") + self.assertEqual(self._ilist(doc), ['html', 'body', 'i']) + self.assertEqual(self._ilist(doc.find('body')), ['body', 'i']) + self.assertEqual(next(doc.iter()).tag, 'html') + self.assertEqual(''.join(doc.itertext()), 'this is a paragraph...') + self.assertEqual(''.join(doc.find('body').itertext()), + 'this is a paragraph.') + self.assertEqual(next(doc.itertext()), 'this is a ') + + # iterparse should return an iterator + sourcefile = serialize(doc, to_string=False) + self.assertEqual(next(ET.iterparse(sourcefile))[0], 'end') + + tree = ET.ElementTree(None) + self.assertRaises(AttributeError, tree.iter) + + def test_corners(self): + # single root, no subelements + a = ET.Element('a') + self.assertEqual(self._ilist(a), ['a']) + + # one child + b = ET.SubElement(a, 'b') + self.assertEqual(self._ilist(a), ['a', 'b']) + + # one child and one grandchild + c = ET.SubElement(b, 'c') + self.assertEqual(self._ilist(a), ['a', 'b', 'c']) + + # two children, only first with grandchild + d = ET.SubElement(a, 'd') + self.assertEqual(self._ilist(a), ['a', 'b', 'c', 'd']) + + # replace first child by second + a[0] = a[1] + del a[1] + self.assertEqual(self._ilist(a), ['a', 'd']) + + def test_iter_by_tag(self): + doc = ET.XML(''' + <document> + <house> + <room>bedroom1</room> + <room>bedroom2</room> + </house> + <shed>nothing here + </shed> + <house> + <room>bedroom8</room> + </house> + </document>''') + + self.assertEqual(self._ilist(doc, 'room'), ['room'] * 3) + self.assertEqual(self._ilist(doc, 'house'), ['house'] * 2) + + # make sure both tag=None and tag='*' return all tags + all_tags = ['document', 'house', 'room', 'room', + 'shed', 'house', 'room'] + self.assertEqual(self._ilist(doc), all_tags) + self.assertEqual(self._ilist(doc, '*'), all_tags) + + class TreeBuilderTest(unittest.TestCase): sample1 = ('<!DOCTYPE html PUBLIC' ' "-//W3C//DTD XHTML 1.0 Transitional//EN"' @@ -2027,6 +2011,23 @@ class TreeBuilderTest(unittest.TestCase): 'http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd')) +@unittest.skip('Unstable due to module monkeypatching') +class XincludeTest(unittest.TestCase): + def test_xinclude_default(self): + from xml.etree import ElementInclude + doc = xinclude_loader('default.xml') + ElementInclude.include(doc) + s = serialize(doc) + self.assertEqual(s.strip(), '''<document> + <p>Example.</p> + <root> + <element key="value">text</element> + <element>text</element>tail + <empty-element /> +</root> +</document>''') + + class XMLParserTest(unittest.TestCase): sample1 = '<file><line>22</line></file>' sample2 = ('<!DOCTYPE html PUBLIC' @@ -2073,13 +2074,6 @@ class XMLParserTest(unittest.TestCase): 'http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd')) -class NoAcceleratorTest(unittest.TestCase): - # Test that the C accelerator was not imported for pyET - def test_correct_import_pyET(self): - self.assertEqual(pyET.Element.__module__, 'xml.etree.ElementTree') - self.assertEqual(pyET.SubElement.__module__, 'xml.etree.ElementTree') - - class NamespaceParseTest(unittest.TestCase): def test_find_with_namespace(self): nsmap = {'h': 'hello', 'f': 'foo'} @@ -2090,7 +2084,6 @@ class NamespaceParseTest(unittest.TestCase): self.assertEqual(len(doc.findall('.//{foo}name', nsmap)), 1) - class ElementSlicingTest(unittest.TestCase): def _elem_tags(self, elemlist): return [e.tag for e in elemlist] @@ -2232,6 +2225,14 @@ class KeywordArgsTest(unittest.TestCase): with self.assertRaisesRegex(TypeError, 'must be dict, not str'): ET.Element('a', attrib="I'm not a dict") +# -------------------------------------------------------------------- + +@unittest.skipUnless(pyET, 'only for the Python version') +class NoAcceleratorTest(unittest.TestCase): + # Test that the C accelerator was not imported for pyET + def test_correct_import_pyET(self): + self.assertEqual(pyET.Element.__module__, 'xml.etree.ElementTree') + self.assertEqual(pyET.SubElement.__module__, 'xml.etree.ElementTree') # -------------------------------------------------------------------- @@ -2276,31 +2277,42 @@ class CleanContext(object): self.checkwarnings.__exit__(*args) -def test_main(module=pyET): - from test import test_xml_etree +def test_main(module=None): + # When invoked without a module, runs the Python ET tests by loading pyET. + # Otherwise, uses the given module as the ET. + if module is None: + global pyET + pyET = import_fresh_module('xml.etree.ElementTree', + blocked=['_elementtree']) + module = pyET - # The same doctests are used for both the Python and the C implementations - test_xml_etree.ET = module + global ET + ET = module test_classes = [ ElementSlicingTest, BasicElementTest, StringIOTest, ParseErrorTest, + XincludeTest, ElementTreeTest, - NamespaceParseTest, + ElementIterTest, TreeBuilderTest, - XMLParserTest, - KeywordArgsTest] - if module is pyET: - # Run the tests specific to the Python implementation - test_classes += [NoAcceleratorTest] + ] + + # These tests will only run for the pure-Python version that doesn't import + # _elementtree. We can't use skipUnless here, because pyET is filled in only + # after the module is loaded. + if pyET: + test_classes.extend([ + NoAcceleratorTest, + ]) support.run_unittest(*test_classes) # XXX the C module should give the same warnings as the Python module with CleanContext(quiet=(module is not pyET)): - support.run_doctest(test_xml_etree, verbosity=True) + support.run_doctest(sys.modules[__name__], verbosity=True) if __name__ == '__main__': test_main() diff --git a/Lib/test/test_xml_etree_c.py b/Lib/test/test_xml_etree_c.py index 10416d2591..142a22ff98 100644 --- a/Lib/test/test_xml_etree_c.py +++ b/Lib/test/test_xml_etree_c.py @@ -8,31 +8,6 @@ cET = import_fresh_module('xml.etree.ElementTree', fresh=['_elementtree']) cET_alias = import_fresh_module('xml.etree.cElementTree', fresh=['_elementtree', 'xml.etree']) -# cElementTree specific tests - -def sanity(): - r""" - Import sanity. - - Issue #6697. - - >>> cElementTree = cET - >>> e = cElementTree.Element('a') - >>> getattr(e, '\uD800') # doctest: +ELLIPSIS - Traceback (most recent call last): - ... - UnicodeEncodeError: ... - - >>> p = cElementTree.XMLParser() - >>> p.version.split()[0] - 'Expat' - >>> getattr(p, '\uD800') - Traceback (most recent call last): - ... - AttributeError: 'XMLParser' object has no attribute '\ud800' - """ - - class MiscTests(unittest.TestCase): # Issue #8651. @support.bigmemtest(size=support._2G + 100, memuse=1) @@ -46,6 +21,7 @@ class MiscTests(unittest.TestCase): finally: data = None + @unittest.skipUnless(cET, 'requires _elementtree') class TestAliasWorking(unittest.TestCase): # Test that the cET alias module is alive @@ -53,6 +29,7 @@ class TestAliasWorking(unittest.TestCase): e = cET_alias.Element('foo') self.assertEqual(e.tag, 'foo') + @unittest.skipUnless(cET, 'requires _elementtree') class TestAcceleratorImported(unittest.TestCase): # Test that the C accelerator was imported, as expected @@ -67,7 +44,6 @@ def test_main(): from test import test_xml_etree, test_xml_etree_c # Run the tests specific to the C implementation - support.run_doctest(test_xml_etree_c, verbosity=True) support.run_unittest( MiscTests, TestAliasWorking, |