diff options
Diffstat (limited to 'test/base/utils.py')
| -rw-r--r-- | test/base/utils.py | 480 |
1 files changed, 418 insertions, 62 deletions
diff --git a/test/base/utils.py b/test/base/utils.py index a00338f5f..070ffb583 100644 --- a/test/base/utils.py +++ b/test/base/utils.py @@ -1,8 +1,9 @@ import testenv; testenv.configure_for_tests() -import unittest -from sqlalchemy import util, sql, exceptions -from testlib import * -from testlib import sorted +import threading, unittest +from sqlalchemy import util, sql, exc +from testlib import TestBase +from testlib.testing import eq_, is_, ne_ +from testlib.compat import frozenset, set, sorted class OrderedDictTest(TestBase): def test_odict(self): @@ -12,40 +13,37 @@ class OrderedDictTest(TestBase): o['snack'] = 'attack' o['c'] = 3 - self.assert_(o.keys() == ['a', 'b', 'snack', 'c']) - self.assert_(o.values() == [1, 2, 'attack', 3]) + eq_(o.keys(), ['a', 'b', 'snack', 'c']) + eq_(o.values(), [1, 2, 'attack', 3]) o.pop('snack') - self.assert_(o.keys() == ['a', 'b', 'c']) - self.assert_(o.values() == [1, 2, 3]) + eq_(o.keys(), ['a', 'b', 'c']) + eq_(o.values(), [1, 2, 3]) o2 = util.OrderedDict(d=4) o2['e'] = 5 - self.assert_(o2.keys() == ['d', 'e']) - self.assert_(o2.values() == [4, 5]) + eq_(o2.keys(), ['d', 'e']) + eq_(o2.values(), [4, 5]) o.update(o2) - self.assert_(o.keys() == ['a', 'b', 'c', 'd', 'e']) - self.assert_(o.values() == [1, 2, 3, 4, 5]) + eq_(o.keys(), ['a', 'b', 'c', 'd', 'e']) + eq_(o.values(), [1, 2, 3, 4, 5]) o.setdefault('c', 'zzz') o.setdefault('f', 6) - self.assert_(o.keys() == ['a', 'b', 'c', 'd', 'e', 'f']) - self.assert_(o.values() == [1, 2, 3, 4, 5, 6]) + eq_(o.keys(), ['a', 'b', 'c', 'd', 'e', 'f']) + eq_(o.values(), [1, 2, 3, 4, 5, 6]) class OrderedSetTest(TestBase): def test_mutators_against_iter(self): # testing a set modified against an iterator o = util.OrderedSet([3,2, 4, 5]) - self.assertEquals(o.difference(iter([3,4])), - util.OrderedSet([2,5])) - self.assertEquals(o.intersection(iter([3,4, 6])), - util.OrderedSet([3, 4])) - self.assertEquals(o.union(iter([3,4, 6])), - util.OrderedSet([2, 3, 4, 5, 6])) + eq_(o.difference(iter([3,4])), util.OrderedSet([2,5])) + eq_(o.intersection(iter([3,4, 6])), util.OrderedSet([3, 4])) + eq_(o.union(iter([3,4, 6])), util.OrderedSet([2, 3, 4, 5, 6])) class ColumnCollectionTest(TestBase): def test_in(self): @@ -59,8 +57,8 @@ class ColumnCollectionTest(TestBase): try: cc['col1'] in cc assert False - except exceptions.ArgumentError, e: - assert str(e) == "__contains__ requires a string argument" + except exc.ArgumentError, e: + eq_(str(e), "__contains__ requires a string argument") def test_compare(self): cc1 = sql.ColumnCollection() @@ -90,11 +88,11 @@ class ArgSingletonTest(unittest.TestCase): m3 = MyClass(3, 4) assert m1 is m3 assert m2 is not m3 - assert len(util.ArgSingleton.instances) == 2 + eq_(len(util.ArgSingleton.instances), 2) m1 = m2 = m3 = None MyClass.dispose(MyClass) - assert len(util.ArgSingleton.instances) == 0 + eq_(len(util.ArgSingleton.instances), 0) class ImmutableSubclass(str): @@ -140,7 +138,7 @@ class IdentitySetTest(unittest.TestCase): def assert_eq(self, identityset, expected_iterable): expected = sorted([id(o) for o in expected_iterable]) found = sorted([id(o) for o in identityset]) - self.assertEquals(found, expected) + eq_(found, expected) def test_init(self): ids = util.IdentitySet([1,2,3,2,1]) @@ -184,32 +182,35 @@ class IdentitySetTest(unittest.TestCase): ids.remove(o1) self.assertRaises(KeyError, ids.remove, o1) - self.assert_(ids.copy() == ids) - self.assert_(ids != None) - self.assert_(not(ids == None)) - self.assert_(ids != IdentitySet([o1,o2,o3])) + eq_(ids.copy(), ids) + + # explicit __eq__ and __ne__ tests + assert ids != None + assert not(ids == None) + + ne_(ids, IdentitySet([o1,o2,o3])) ids.clear() - self.assert_(o1 not in ids) + assert o1 not in ids ids.add(o2) - self.assert_(o2 in ids) - self.assert_(ids.pop() == o2) + assert o2 in ids + eq_(ids.pop(), o2) ids.add(o1) - self.assert_(len(ids) == 1) + eq_(len(ids), 1) isuper = IdentitySet([o1,o2]) - self.assert_(ids < isuper) - self.assert_(ids.issubset(isuper)) - self.assert_(isuper.issuperset(ids)) - self.assert_(isuper > ids) - - self.assert_(ids.union(isuper) == isuper) - self.assert_(ids | isuper == isuper) - self.assert_(isuper - ids == IdentitySet([o2])) - self.assert_(isuper.difference(ids) == IdentitySet([o2])) - self.assert_(ids.intersection(isuper) == IdentitySet([o1])) - self.assert_(ids & isuper == IdentitySet([o1])) - self.assert_(ids.symmetric_difference(isuper) == IdentitySet([o2])) - self.assert_(ids ^ isuper == IdentitySet([o2])) + assert ids < isuper + assert ids.issubset(isuper) + assert isuper.issuperset(ids) + assert isuper > ids + + eq_(ids.union(isuper), isuper) + eq_(ids | isuper, isuper) + eq_(isuper - ids, IdentitySet([o2])) + eq_(isuper.difference(ids), IdentitySet([o2])) + eq_(ids.intersection(isuper), IdentitySet([o1])) + eq_(ids & isuper, IdentitySet([o1])) + eq_(ids.symmetric_difference(isuper), IdentitySet([o2])) + eq_(ids ^ isuper, IdentitySet([o2])) ids.update(isuper) ids |= isuper @@ -223,16 +224,16 @@ class IdentitySetTest(unittest.TestCase): ids.update('foobar') try: ids |= 'foobar' - self.assert_(False) + assert False except TypeError: - self.assert_(True) + assert True try: s = set([o1,o2]) s |= ids - self.assert_(False) + assert False except TypeError: - self.assert_(True) + assert True self.assertRaises(TypeError, cmp, ids) self.assertRaises(TypeError, hash, ids) @@ -243,8 +244,8 @@ class IdentitySetTest(unittest.TestCase): s1 = set([1,2,3]) s2 = set([3,4,5]) - self.assertEquals(os1 - os2, util.IdentitySet([1, 2])) - self.assertEquals(os2 - os1, util.IdentitySet([4, 5])) + eq_(os1 - os2, util.IdentitySet([1, 2])) + eq_(os2 - os1, util.IdentitySet([4, 5])) self.assertRaises(TypeError, lambda: os1 - s2) self.assertRaises(TypeError, lambda: os1 - [3, 4, 5]) self.assertRaises(TypeError, lambda: s1 - os2) @@ -256,7 +257,7 @@ class DictlikeIteritemsTest(unittest.TestCase): def _ok(self, instance): iterator = util.dictlike_iteritems(instance) - self.assertEquals(set(iterator), self.baseline) + eq_(set(iterator), self.baseline) def _notok(self, instance): self.assertRaises(TypeError, @@ -322,6 +323,33 @@ class DictlikeIteritemsTest(unittest.TestCase): self._notok(duck6()) +class DuckTypeCollectionTest(TestBase): + def test_sets(self): + import sets + class SetLike(object): + def add(self): + pass + + class ForcedSet(list): + __emulates__ = set + + for type_ in (set, + sets.Set, + util.Set, + SetLike, + ForcedSet): + eq_(util.duck_type_collection(type_), util.Set) + instance = type_() + eq_(util.duck_type_collection(instance), util.Set) + + for type_ in (frozenset, + sets.ImmutableSet, + util.FrozenSet): + is_(util.duck_type_collection(type_), None) + instance = type_() + is_(util.duck_type_collection(instance), None) + + class ArgInspectionTest(TestBase): def test_get_cls_kwargs(self): class A(object): @@ -359,7 +387,7 @@ class ArgInspectionTest(TestBase): pass def test(cls, *expected): - self.assertEquals(set(util.get_cls_kwargs(cls)), set(expected)) + eq_(set(util.get_cls_kwargs(cls)), set(expected)) test(A, 'a') test(A1, 'a1') @@ -382,7 +410,7 @@ class ArgInspectionTest(TestBase): def f4(**foo): pass def test(fn, *expected): - self.assertEquals(set(util.get_func_kwargs(fn)), set(expected)) + eq_(set(util.get_func_kwargs(fn)), set(expected)) test(f1) test(f2, 'foo') @@ -419,7 +447,336 @@ class SymbolTest(TestBase): assert rt is sym1 assert rt is sym2 +class WeakIdentityMappingTest(TestBase): + class Data(object): + pass + + def _some_data(self, some=20): + return [self.Data() for _ in xrange(some)] + + def _fixture(self, some=20): + data = self._some_data() + wim = util.WeakIdentityMapping() + for idx, obj in enumerate(data): + wim[obj] = idx + return data, wim + + def test_delitem(self): + data, wim = self._fixture() + needle = data[-1] + + assert needle in wim + assert id(needle) in wim.by_id + eq_(wim[needle], wim.by_id[id(needle)]) + + del wim[needle] + + assert needle not in wim + assert id(needle) not in wim.by_id + eq_(len(wim), (len(data) - 1)) + + data.remove(needle) + + assert needle not in wim + assert id(needle) not in wim.by_id + eq_(len(wim), len(data)) + + def test_setitem(self): + data, wim = self._fixture() + + o1, oid1 = data[-1], id(data[-1]) + + assert o1 in wim + assert oid1 in wim.by_id + eq_(wim[o1], wim.by_id[oid1]) + id_keys = set(wim.by_id.keys()) + + wim[o1] = 1234 + assert o1 in wim + assert oid1 in wim.by_id + eq_(wim[o1], wim.by_id[oid1]) + eq_(set(wim.by_id.keys()), id_keys) + + o2 = self.Data() + oid2 = id(o2) + + wim[o2] = 5678 + assert o2 in wim + assert oid2 in wim.by_id + eq_(wim[o2], wim.by_id[oid2]) + + def test_pop(self): + data, wim = self._fixture() + needle = data[-1] + + needle = data.pop() + assert needle in wim + assert id(needle) in wim.by_id + eq_(wim[needle], wim.by_id[id(needle)]) + eq_(len(wim), (len(data) + 1)) + + wim.pop(needle) + assert needle not in wim + assert id(needle) not in wim.by_id + eq_(len(wim), len(data)) + + def test_pop_default(self): + data, wim = self._fixture() + needle = data[-1] + + value = wim[needle] + x = wim.pop(needle, 123) + ne_(x, 123) + eq_(x, value) + assert needle not in wim + assert id(needle) not in wim.by_id + eq_(len(data), (len(wim) + 1)) + + n2 = self.Data() + y = wim.pop(n2, 456) + eq_(y, 456) + assert n2 not in wim + assert id(n2) not in wim.by_id + eq_(len(data), (len(wim) + 1)) + + def test_popitem(self): + data, wim = self._fixture() + (needle, idx) = wim.popitem() + + assert needle in data + eq_(len(data), (len(wim) + 1)) + assert id(needle) not in wim.by_id + + def test_setdefault(self): + data, wim = self._fixture() + + o1 = self.Data() + oid1 = id(o1) + + assert o1 not in wim + + res1 = wim.setdefault(o1, 123) + assert o1 in wim + assert oid1 in wim.by_id + eq_(res1, 123) + id_keys = set(wim.by_id.keys()) + + res2 = wim.setdefault(o1, 456) + assert o1 in wim + assert oid1 in wim.by_id + eq_(res2, 123) + assert set(wim.by_id.keys()) == id_keys + + del wim[o1] + assert o1 not in wim + assert oid1 not in wim.by_id + ne_(set(wim.by_id.keys()), id_keys) + + res3 = wim.setdefault(o1, 789) + assert o1 in wim + assert oid1 in wim.by_id + eq_(res3, 789) + eq_(set(wim.by_id.keys()), id_keys) + + def test_clear(self): + data, wim = self._fixture() + + assert len(data) == len(wim) == len(wim.by_id) + wim.clear() + + eq_(wim, {}) + eq_(wim.by_id, {}) + + def test_update(self): + data, wim = self._fixture() + self.assertRaises(NotImplementedError, wim.update) + + def test_weak_clear(self): + data, wim = self._fixture() + + assert len(data) == len(wim) == len(wim.by_id) + + del data[:] + eq_(wim, {}) + eq_(wim.by_id, {}) + eq_(wim._weakrefs, {}) + + def test_weak_single(self): + data, wim = self._fixture() + + assert len(data) == len(wim) == len(wim.by_id) + + oid = id(data[0]) + del data[0] + + assert len(data) == len(wim) == len(wim.by_id) + assert oid not in wim.by_id + + def test_weak_threadhop(self): + data, wim = self._fixture() + data = set(data) + + cv = threading.Condition() + + def empty(obj): + cv.acquire() + obj.clear() + cv.notify() + cv.release() + + th = threading.Thread(target=empty, args=(data,)) + + cv.acquire() + th.start() + cv.wait() + cv.release() + + eq_(wim, {}) + eq_(wim.by_id, {}) + eq_(wim._weakrefs, {}) + + +class TestFormatArgspec(TestBase): + def test_specs(self): + def test(fn, wanted, grouped=None): + if grouped is None: + parsed = util.format_argspec_plus(fn) + else: + parsed = util.format_argspec_plus(fn, grouped=grouped) + eq_(parsed, wanted) + + test(lambda: None, + {'args': '()', 'self_arg': None, + 'apply_kw': '()', 'apply_pos': '()' }) + + test(lambda: None, + {'args': '', 'self_arg': None, + 'apply_kw': '', 'apply_pos': '' }, + grouped=False) + + test(lambda self: None, + {'args': '(self)', 'self_arg': 'self', + 'apply_kw': '(self)', 'apply_pos': '(self)' }) + + test(lambda self: None, + {'args': 'self', 'self_arg': 'self', + 'apply_kw': 'self', 'apply_pos': 'self' }, + grouped=False) + + test(lambda *a: None, + {'args': '(*a)', 'self_arg': None, + 'apply_kw': '(*a)', 'apply_pos': '(*a)' }) + + test(lambda **kw: None, + {'args': '(**kw)', 'self_arg': None, + 'apply_kw': '(**kw)', 'apply_pos': '(**kw)' }) + + test(lambda *a, **kw: None, + {'args': '(*a, **kw)', 'self_arg': None, + 'apply_kw': '(*a, **kw)', 'apply_pos': '(*a, **kw)' }) + + test(lambda a, *b: None, + {'args': '(a, *b)', 'self_arg': 'a', + 'apply_kw': '(a, *b)', 'apply_pos': '(a, *b)' }) + + test(lambda a, **b: None, + {'args': '(a, **b)', 'self_arg': 'a', + 'apply_kw': '(a, **b)', 'apply_pos': '(a, **b)' }) + + test(lambda a, *b, **c: None, + {'args': '(a, *b, **c)', 'self_arg': 'a', + 'apply_kw': '(a, *b, **c)', 'apply_pos': '(a, *b, **c)' }) + + test(lambda a, b=1, **c: None, + {'args': '(a, b=1, **c)', 'self_arg': 'a', + 'apply_kw': '(a, b=b, **c)', 'apply_pos': '(a, b, **c)' }) + + test(lambda a=1, b=2: None, + {'args': '(a=1, b=2)', 'self_arg': 'a', + 'apply_kw': '(a=a, b=b)', 'apply_pos': '(a, b)' }) + + test(lambda a=1, b=2: None, + {'args': 'a=1, b=2', 'self_arg': 'a', + 'apply_kw': 'a=a, b=b', 'apply_pos': 'a, b' }, + grouped=False) + + def test_init_grouped(self): + object_spec = { + 'args': '(self)', 'self_arg': 'self', + 'apply_pos': '(self)', 'apply_kw': '(self)'} + wrapper_spec = { + 'args': '(self, *args, **kwargs)', 'self_arg': 'self', + 'apply_pos': '(self, *args, **kwargs)', + 'apply_kw': '(self, *args, **kwargs)'} + custom_spec = { + 'args': '(slef, a=123)', 'self_arg': 'slef', # yes, slef + 'apply_pos': '(slef, a)', 'apply_kw': '(slef, a=a)'} + + self._test_init(None, object_spec, wrapper_spec, custom_spec) + self._test_init(True, object_spec, wrapper_spec, custom_spec) + + def test_init_bare(self): + object_spec = { + 'args': 'self', 'self_arg': 'self', + 'apply_pos': 'self', 'apply_kw': 'self'} + wrapper_spec = { + 'args': 'self, *args, **kwargs', 'self_arg': 'self', + 'apply_pos': 'self, *args, **kwargs', + 'apply_kw': 'self, *args, **kwargs'} + custom_spec = { + 'args': 'slef, a=123', 'self_arg': 'slef', # yes, slef + 'apply_pos': 'slef, a', 'apply_kw': 'slef, a=a'} + + self._test_init(False, object_spec, wrapper_spec, custom_spec) + + def _test_init(self, grouped, object_spec, wrapper_spec, custom_spec): + def test(fn, wanted): + if grouped is None: + parsed = util.format_argspec_init(fn) + else: + parsed = util.format_argspec_init(fn, grouped=grouped) + eq_(parsed, wanted) + + class O(object): pass + + test(O.__init__, object_spec) + + class O(object): + def __init__(self): + pass + + test(O.__init__, object_spec) + + class O(object): + def __init__(slef, a=123): + pass + + test(O.__init__, custom_spec) + + class O(list): pass + + test(O.__init__, wrapper_spec) + + class O(list): + def __init__(self, *args, **kwargs): + pass + + test(O.__init__, wrapper_spec) + + class O(list): + def __init__(self): + pass + + test(O.__init__, object_spec) + + class O(list): + def __init__(slef, a=123): + pass + + test(O.__init__, custom_spec) + class AsInterfaceTest(TestBase): + class Something(object): def _ignoreme(self): pass def foo(self): pass @@ -442,9 +799,9 @@ class AsInterfaceTest(TestBase): cls=self.Something, required=('foo')) obj = self.Something() - self.assertEqual(obj, util.as_interface(obj, cls=self.Something)) - self.assertEqual(obj, util.as_interface(obj, methods=('foo',))) - self.assertEqual( + eq_(obj, util.as_interface(obj, cls=self.Something)) + eq_(obj, util.as_interface(obj, methods=('foo',))) + eq_( obj, util.as_interface(obj, cls=self.Something, required=('outofband',))) partial = self.Partial() @@ -453,12 +810,11 @@ class AsInterfaceTest(TestBase): slotted.bar = lambda self: 123 for obj in partial, slotted: - self.assertEqual(obj, util.as_interface(obj, cls=self.Something)) + eq_(obj, util.as_interface(obj, cls=self.Something)) self.assertRaises(TypeError, util.as_interface, obj, methods=('foo')) - self.assertEqual(obj, util.as_interface(obj, methods=('bar',))) - self.assertEqual( - obj, util.as_interface(obj, cls=self.Something, + eq_(obj, util.as_interface(obj, methods=('bar',))) + eq_(obj, util.as_interface(obj, cls=self.Something, required=('bar',))) self.assertRaises(TypeError, util.as_interface, obj, cls=self.Something, required=('foo',)) |
