summaryrefslogtreecommitdiff
path: root/tests/thread_test.py
blob: 76e9b2481d96df3b53560058a886d00727cf0540 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import gc
import weakref

import eventlet
from eventlet import corolocal
from eventlet import event
from eventlet import greenthread
from eventlet import patcher
from eventlet.green import thread
import six

from tests import LimitedTestCase


class Locals(LimitedTestCase):
    """Tests for ``corolocal.local`` attribute storage across greenthreads."""

    def passthru(self, *args, **kw):
        # Record the call in self.results and hand the same pair back.
        call = (args, kw)
        self.results.append(call)
        return call

    def setUp(self):
        # Start every test with an empty call log before the base setup runs.
        self.results = list()
        super(Locals, self).setUp()

    def tearDown(self):
        # Drop whatever was recorded, then let the base class clean up.
        self.results = list()
        super(Locals, self).tearDown()

    def test_assignment(self):
        # An attribute set in one greenthread must not be readable in another.
        store = corolocal.local()
        store.a = 1

        def worker():
            store.b = 2
            self.assertEqual(store.b, 2)
            # 'a' was assigned by the spawning greenthread only, so reading
            # it from here is expected to raise AttributeError.
            self.assertRaises(AttributeError, getattr, store, 'a')

        spawned = eventlet.spawn(worker)
        spawned.wait()
        self.assertEqual(store.a, 1)

    def test_calls_init(self):
        # __init__ is expected to run once in the creating greenthread and
        # once more, with the same arguments, in the spawned greenthread.
        calls = []

        class Init(corolocal.local):
            def __init__(self, *args):
                calls.append((args, eventlet.getcurrent()))

        store = Init(1, 2, 3)
        first_args, first_greenlet = calls[0]
        self.assertEqual(first_args, (1, 2, 3))
        self.assertEqual(first_greenlet, eventlet.getcurrent())

        def worker():
            store.foo = 'bar'
            self.assertEqual(len(calls), 2, calls)
            second_args, second_greenlet = calls[1]
            self.assertEqual(second_args, (1, 2, 3))
            self.assertEqual(second_greenlet, eventlet.getcurrent())

        spawned = eventlet.spawn(worker)
        spawned.wait()

    def test_calling_methods(self):
        # Methods on a local subclass must see the calling greenthread's
        # own attribute values.
        class Caller(corolocal.local):
            def callme(self):
                return self.foo

        store = Caller()
        store.foo = "foo1"
        self.assertEqual("foo1", store.callme())

        def worker():
            store.foo = "foo2"
            self.assertEqual("foo2", store.callme())

        spawned = eventlet.spawn(worker)
        spawned.wait()

        store.foo = "foo3"
        self.assertEqual("foo3", store.callme())

    def test_no_leaking(self):
        # Values stored on a local by finished greenthreads must not be
        # kept alive; the weak-key dictionary tracks which ones survive.
        class Tracked(object):
            pass

        survivors = weakref.WeakKeyDictionary()
        store = corolocal.local()

        def worker(_index):
            item = Tracked()
            survivors[item] = True
            store.foo = item

        pool = eventlet.GreenPool()
        for index in six.moves.range(100):
            pool.spawn(worker, index)
        pool.waitall()
        del pool
        # Collect, yield to the hub once, then collect again.
        gc.collect()
        eventlet.sleep(0)
        gc.collect()
        # at this point all our coros have terminated
        self.assertEqual(len(survivors), 1)


def test_compat_lock_release():
    """Check that releasing an unlocked lock fails the same way for both locks.

    Runs the check against the original (unpatched) ``threading`` module and
    against the green ``thread`` module. In both cases ``Lock().release()`` on
    a lock that was never acquired must raise an error whose message contains
    "release unlocked lock".

    Raises:
        AssertionError: if the message does not match, or if ``release()``
            does not raise at all.
    """
    # https://github.com/eventlet/eventlet/issues/697
    # RuntimeError is raised on python3, thread.error on python2.7.
    expected_errors = (RuntimeError, thread.error)
    for mod in (patcher.original("threading"), thread):
        try:
            mod.Lock().release()
        except expected_errors as e:
            assert "release unlocked lock" in str(e).lower(), str((mod, e))
        else:
            # Without this branch the test would pass vacuously when
            # release() silently succeeds on an unlocked lock.
            raise AssertionError(
                "releasing an unlocked lock did not raise: {0!r}".format(mod))


def test_reinit():
    """Check that a held green lock can be acquired again after a reinit.

    The lock is acquired, ``_at_fork_reinit()`` is called on it, and then a
    non-blocking acquire must succeed exactly once.
    """
    # py39+ expects locks to have a _at_fork_reinit() method
    # https://github.com/eventlet/eventlet/pull/721#pullrequestreview-769377850
    lock = thread.Lock()
    lock.acquire()
    lock._at_fork_reinit()

    # First non-blocking attempt after the reinit must get the lock...
    first_attempt = lock.acquire(blocking=False)
    assert first_attempt
    # ...and the next one must be refused because it is now held.
    second_attempt = lock.acquire(blocking=False)
    assert not second_attempt