summaryrefslogtreecommitdiff
path: root/git/test/test_util.py
diff options
context:
space:
mode:
Diffstat (limited to 'git/test/test_util.py')
-rw-r--r--git/test/test_util.py111
1 files changed, 56 insertions, 55 deletions
diff --git a/git/test/test_util.py b/git/test/test_util.py
index d2ca8bf2..66103d4c 100644
--- a/git/test/test_util.py
+++ b/git/test/test_util.py
@@ -16,69 +16,70 @@ from git.cmd import dashify
import time
from git.util import (
- to_hex_sha,
- to_bin_sha,
- NULL_HEX_SHA,
- LockedFD,
+ to_hex_sha,
+ to_bin_sha,
+ NULL_HEX_SHA,
+ LockedFD,
Actor,
IterableList
- )
+)
class TestIterableMember(object):
+
"""A member of an iterable list"""
__slots__ = ("name", "prefix_name")
-
+
def __init__(self, name):
self.name = name
self.prefix_name = name
-
+
class TestUtils(TestBase):
+
def setup(self):
self.testdict = {
"string": "42",
"int": 42,
- "array": [ 42 ],
+ "array": [42],
}
def test_it_should_dashify(self):
assert 'this-is-my-argument' == dashify('this_is_my_argument')
assert 'foo' == dashify('foo')
-
-
+
def test_lock_file(self):
my_file = tempfile.mktemp()
lock_file = LockFile(my_file)
assert not lock_file._has_lock()
# release lock we don't have - fine
lock_file._release_lock()
-
+
# get lock
lock_file._obtain_lock_or_raise()
assert lock_file._has_lock()
-
+
# concurrent access
other_lock_file = LockFile(my_file)
assert not other_lock_file._has_lock()
self.failUnlessRaises(IOError, other_lock_file._obtain_lock_or_raise)
-
+
lock_file._release_lock()
assert not lock_file._has_lock()
-
+
other_lock_file._obtain_lock_or_raise()
self.failUnlessRaises(IOError, lock_file._obtain_lock_or_raise)
-
+
# auto-release on destruction
del(other_lock_file)
lock_file._obtain_lock_or_raise()
lock_file._release_lock()
-
+
def test_blocking_lock_file(self):
my_file = tempfile.mktemp()
lock_file = BlockingLockFile(my_file)
lock_file._obtain_lock()
-
+
# next one waits for the lock
start = time.time()
wait_time = 0.1
@@ -86,10 +87,10 @@ class TestUtils(TestBase):
self.failUnlessRaises(IOError, wait_lock._obtain_lock)
elapsed = time.time() - start
assert elapsed <= wait_time + 0.02 # some extra time it may cost
-
+
def test_user_id(self):
assert '@' in get_user_id()
-
+
def test_parse_date(self):
# test all supported formats
def assert_rval(rval, veri_time, offset=0):
@@ -97,13 +98,13 @@ class TestUtils(TestBase):
assert isinstance(rval[0], int) and isinstance(rval[1], int)
assert rval[0] == veri_time
assert rval[1] == offset
-
+
# now that we are here, test our conversion functions as well
utctz = altz_to_utctz_str(offset)
assert isinstance(utctz, basestring)
assert utctz_to_altz(verify_utctz(utctz)) == offset
# END assert rval utility
-
+
rfc = ("Thu, 07 Apr 2005 22:13:11 +0000", 0)
iso = ("2005-04-07T22:13:11 -0200", 7200)
iso2 = ("2005-04-07 22:13:11 +0400", -14400)
@@ -114,32 +115,32 @@ class TestUtils(TestBase):
for date, offset in (rfc, iso, iso2, iso3, alt, alt2):
assert_rval(parse_date(date), veri_time, offset)
# END for each date type
-
+
# and failure
self.failUnlessRaises(ValueError, parse_date, 'invalid format')
self.failUnlessRaises(ValueError, parse_date, '123456789 -02000')
self.failUnlessRaises(ValueError, parse_date, ' 123456789 -0200')
-
+
def test_actor(self):
for cr in (None, self.rorepo.config_reader()):
assert isinstance(Actor.committer(cr), Actor)
assert isinstance(Actor.author(cr), Actor)
- #END assure config reader is handled
-
+ # END assure config reader is handled
+
def test_basics(self):
assert to_hex_sha(NULL_HEX_SHA) == NULL_HEX_SHA
assert len(to_bin_sha(NULL_HEX_SHA)) == 20
assert to_hex_sha(to_bin_sha(NULL_HEX_SHA)) == NULL_HEX_SHA
-
+
def _cmp_contents(self, file_path, data):
- # raise if data from file at file_path
+ # raise if data from file at file_path
# does not match data string
fp = open(file_path, "rb")
try:
assert fp.read() == data
finally:
fp.close()
-
+
def test_lockedfd(self):
my_file = tempfile.mktemp()
orig_data = "hello"
@@ -147,43 +148,42 @@ class TestUtils(TestBase):
my_file_fp = open(my_file, "wb")
my_file_fp.write(orig_data)
my_file_fp.close()
-
+
try:
lfd = LockedFD(my_file)
- lockfilepath = lfd._lockfilepath()
-
+ lockfilepath = lfd._lockfilepath()
+
# cannot end before it was started
self.failUnlessRaises(AssertionError, lfd.rollback)
self.failUnlessRaises(AssertionError, lfd.commit)
-
+
# open for writing
assert not os.path.isfile(lockfilepath)
wfd = lfd.open(write=True)
assert lfd._fd is wfd
assert os.path.isfile(lockfilepath)
-
+
# write data and fail
os.write(wfd, new_data)
lfd.rollback()
assert lfd._fd is None
self._cmp_contents(my_file, orig_data)
assert not os.path.isfile(lockfilepath)
-
+
# additional call doesnt fail
lfd.commit()
lfd.rollback()
-
+
# test reading
lfd = LockedFD(my_file)
rfd = lfd.open(write=False)
assert os.read(rfd, len(orig_data)) == orig_data
-
+
assert os.path.isfile(lockfilepath)
# deletion rolls back
del(lfd)
assert not os.path.isfile(lockfilepath)
-
-
+
# write data - concurrently
lfd = LockedFD(my_file)
olfd = LockedFD(my_file)
@@ -192,17 +192,17 @@ class TestUtils(TestBase):
assert os.path.isfile(lockfilepath)
# another one fails
self.failUnlessRaises(IOError, olfd.open)
-
+
wfdstream.write(new_data)
lfd.commit()
assert not os.path.isfile(lockfilepath)
self._cmp_contents(my_file, new_data)
-
+
# could test automatic _end_writing on destruction
finally:
os.remove(my_file)
# END final cleanup
-
+
# try non-existing file for reading
lfd = LockedFD(tempfile.mktemp())
try:
@@ -216,37 +216,37 @@ class TestUtils(TestBase):
def test_iterable_list(self):
for args in (('name',), ('name', 'prefix_')):
l = IterableList('name')
-
+
m1 = TestIterableMember('one')
m2 = TestIterableMember('two')
-
+
l.extend((m1, m2))
-
+
assert len(l) == 2
-
+
# contains works with name and identity
assert m1.name in l
assert m2.name in l
assert m2 in l
assert m2 in l
assert 'invalid' not in l
-
+
# with string index
assert l[m1.name] is m1
assert l[m2.name] is m2
-
+
# with int index
assert l[0] is m1
assert l[1] is m2
-
+
# with getattr
assert l.one is m1
assert l.two is m2
-
+
# test exceptions
self.failUnlessRaises(AttributeError, getattr, l, 'something')
self.failUnlessRaises(IndexError, l.__getitem__, 'something')
-
+
# delete by name and index
self.failUnlessRaises(IndexError, l.__delitem__, 'something')
del(l[m2.name])
@@ -255,21 +255,22 @@ class TestUtils(TestBase):
del(l[0])
assert m1.name not in l
assert len(l) == 0
-
+
self.failUnlessRaises(IndexError, l.__delitem__, 0)
self.failUnlessRaises(IndexError, l.__delitem__, 'something')
- #END for each possible mode
-
+ # END for each possible mode
+
class TestActor(TestBase):
+
def test_from_string_should_separate_name_and_email(self):
a = Actor._from_string("Michael Trier <mtrier@example.com>")
assert "Michael Trier" == a.name
assert "mtrier@example.com" == a.email
-
+
# base type capabilities
assert a == a
- assert not ( a != a )
+ assert not (a != a)
m = set()
m.add(a)
m.add(a)