1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
# test_utils.py
# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
#
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
import os
import tempfile
from test.testlib import *
from git.utils import *
from git import *
from git.cmd import dashify
class TestUtils(TestCase):
    """Tests for ``dashify``, ``LockFile`` and ``ConcurrentWriteOperation``."""

    def setup(self):
        """Create a small sample dictionary fixture on the instance."""
        # NOTE(review): if TestCase is unittest.TestCase, only ``setUp``
        # (camel case) is invoked automatically; whether this lowercase
        # hook runs depends on the test runner - confirm. No test in this
        # class reads ``self.testdict``.
        self.testdict = {
            "string": "42",
            "int": 42,
            "array": [42],
        }

    def test_it_should_dashify(self):
        """``dashify`` turns underscores into dashes and keeps other text."""
        assert_equal('this-is-my-argument', dashify('this_is_my_argument'))
        assert_equal('foo', dashify('foo'))

    def test_lock_file(self):
        """Obtain, contend for and release a lock via two ``LockFile`` objects
        that refer to the same path."""
        # Only an unused path is needed here; nothing in this test creates
        # the file itself.
        my_file = tempfile.mktemp()
        lock_file = LockFile(my_file)
        assert not lock_file._has_lock()

        # release lock we don't have - fine
        lock_file._release_lock()

        # get lock
        lock_file._obtain_lock_or_raise()
        assert lock_file._has_lock()

        # concurrent access
        other_lock_file = LockFile(my_file)
        assert not other_lock_file._has_lock()
        self.assertRaises(IOError, other_lock_file._obtain_lock_or_raise)

        lock_file._release_lock()
        assert not lock_file._has_lock()

        other_lock_file._obtain_lock_or_raise()
        self.assertRaises(IOError, lock_file._obtain_lock_or_raise)

        # auto-release on destruction
        del other_lock_file
        lock_file._obtain_lock_or_raise()
        lock_file._release_lock()

    def _cmp_contents(self, file_path, data):
        """Assert that the text read from ``file_path`` equals ``data``.

        :param file_path: path of the file to read (opened in text mode)
        :param data: string the file content is compared against
        :raise AssertionError: if the content differs from ``data``
        """
        fp = open(file_path, "r")
        try:
            assert fp.read() == data
        finally:
            fp.close()

    def test_safe_operation(self):
        """A failed concurrent write leaves the file untouched, a successful
        one replaces its content, and a second writer is rejected while a
        write is in progress."""
        orig_data = "hello"
        new_data = "world"

        # mkstemp creates the file atomically, avoiding the name race that
        # mktemp is subject to.
        fd, my_file = tempfile.mkstemp()
        try:
            my_file_fp = os.fdopen(fd, "w")
            try:
                my_file_fp.write(orig_data)
            finally:
                my_file_fp.close()

            cwrite = ConcurrentWriteOperation(my_file)

            # didn't start writing, doesn't matter
            cwrite._end_writing(False)
            cwrite._end_writing(True)
            assert not cwrite._is_writing()

            # write data and fail
            stream = cwrite._begin_writing()
            assert cwrite._is_writing()
            stream.write(new_data)
            cwrite._end_writing(successful=False)
            self._cmp_contents(my_file, orig_data)
            assert not os.path.exists(stream.name)

            # write data - concurrently
            ocwrite = ConcurrentWriteOperation(my_file)
            stream = cwrite._begin_writing()
            self.assertRaises(IOError, ocwrite._begin_writing)

            stream.write(new_data)
            cwrite._end_writing(successful=True)
            self._cmp_contents(my_file, new_data)
            assert not os.path.exists(stream.name)

            # could test automatic _end_writing on destruction
        finally:
            os.remove(my_file)
        # END final cleanup
|