1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
|
# test_utils.py
# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
#
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
import os
import tempfile
from test.testlib import *
from git.utils import *
from git.objects.utils import *
from git import *
from git.cmd import dashify
import time
class TestUtils(TestCase):
def setup(self):
self.testdict = {
"string": "42",
"int": 42,
"array": [ 42 ],
}
def test_it_should_dashify(self):
assert_equal('this-is-my-argument', dashify('this_is_my_argument'))
assert_equal('foo', dashify('foo'))
def test_lock_file(self):
my_file = tempfile.mktemp()
lock_file = LockFile(my_file)
assert not lock_file._has_lock()
# release lock we don't have - fine
lock_file._release_lock()
# get lock
lock_file._obtain_lock_or_raise()
assert lock_file._has_lock()
# concurrent access
other_lock_file = LockFile(my_file)
assert not other_lock_file._has_lock()
self.failUnlessRaises(IOError, other_lock_file._obtain_lock_or_raise)
lock_file._release_lock()
assert not lock_file._has_lock()
other_lock_file._obtain_lock_or_raise()
self.failUnlessRaises(IOError, lock_file._obtain_lock_or_raise)
# auto-release on destruction
del(other_lock_file)
lock_file._obtain_lock_or_raise()
lock_file._release_lock()
def test_blocking_lock_file(self):
my_file = tempfile.mktemp()
lock_file = BlockingLockFile(my_file)
lock_file._obtain_lock()
# next one waits for the lock
start = time.time()
wait_time = 0.1
wait_lock = BlockingLockFile(my_file, 0.05, wait_time)
self.failUnlessRaises(IOError, wait_lock._obtain_lock)
elapsed = time.time() - start
assert elapsed <= wait_time + 0.02 # some extra time it may cost
def _cmp_contents(self, file_path, data):
# raise if data from file at file_path
# does not match data string
fp = open(file_path, "rb")
try:
assert fp.read() == data
finally:
fp.close()
def test_safe_operation(self):
my_file = tempfile.mktemp()
orig_data = "hello"
new_data = "world"
my_file_fp = open(my_file, "wb")
my_file_fp.write(orig_data)
my_file_fp.close()
try:
lfd = LockedFD(my_file)
lockfilepath = lfd._lockfilepath()
# cannot end before it was started
self.failUnlessRaises(AssertionError, lfd.rollback)
self.failUnlessRaises(AssertionError, lfd.commit)
# open for writing
assert not os.path.isfile(lockfilepath)
wfd = lfd.open(write=True)
assert lfd._fd is wfd
assert os.path.isfile(lockfilepath)
# write data and fail
os.write(wfd, new_data)
lfd.rollback()
assert lfd._fd is None
self._cmp_contents(my_file, orig_data)
assert not os.path.isfile(lockfilepath)
# additional call doesnt fail
lfd.commit()
lfd.rollback()
# test reading
lfd = LockedFD(my_file)
rfd = lfd.open(write=False)
assert os.read(rfd, len(orig_data)) == orig_data
assert os.path.isfile(lockfilepath)
# deletion rolls back
del(lfd)
assert not os.path.isfile(lockfilepath)
# write data - concurrently
lfd = LockedFD(my_file)
olfd = LockedFD(my_file)
assert not os.path.isfile(lockfilepath)
wfdstream = lfd.open(write=True, stream=True) # this time as stream
assert os.path.isfile(lockfilepath)
# another one fails
self.failUnlessRaises(IOError, olfd.open)
wfdstream.write(new_data)
lfd.commit()
assert not os.path.isfile(lockfilepath)
self._cmp_contents(my_file, new_data)
# could test automatic _end_writing on destruction
finally:
os.remove(my_file)
# END final cleanup
def test_user_id(self):
assert '@' in get_user_id()
def test_parse_date(self):
# test all supported formats
def assert_rval(rval, veri_time, offset=0):
assert len(rval) == 2
assert isinstance(rval[0], int) and isinstance(rval[1], int)
assert rval[0] == veri_time
assert rval[1] == offset
# now that we are here, test our conversion functions as well
utctz = altz_to_utctz_str(offset)
assert isinstance(utctz, basestring)
assert utctz_to_altz(verify_utctz(utctz)) == offset
# END assert rval utility
rfc = ("Thu, 07 Apr 2005 22:13:11 +0000", 0)
iso = ("2005-04-07T22:13:11 -0200", 7200)
iso2 = ("2005-04-07 22:13:11 +0400", -14400)
iso3 = ("2005.04.07 22:13:11 -0000", 0)
alt = ("04/07/2005 22:13:11", 0)
alt2 = ("07.04.2005 22:13:11", 0)
veri_time = 1112904791 # the time this represents
for date, offset in (rfc, iso, iso2, iso3, alt, alt2):
assert_rval(parse_date(date), veri_time, offset)
# END for each date type
# and failure
self.failUnlessRaises(ValueError, parse_date, 'invalid format')
self.failUnlessRaises(ValueError, parse_date, '123456789 -02000')
self.failUnlessRaises(ValueError, parse_date, ' 123456789 -0200')
|