summaryrefslogtreecommitdiff
path: root/git/test/test_util.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2014-02-09 21:23:51 +0100
committerSebastian Thiel <byronimo@gmail.com>2014-02-09 21:23:51 +0100
commit15dd52cd578691930cea194e003fa80dd02f40eb (patch)
treeef4c9c5f705dd1ca743b7ceefe5b91b11ad15010 /git/test/test_util.py
parent660bdca125aa9dcca7a7730535bec433edb8ba02 (diff)
downloadgitpython-15dd52cd578691930cea194e003fa80dd02f40eb.tar.gz
tabs to 4 spaces - overall state of this branch is desolate, but fixable. Needs plenty of work
Diffstat (limited to 'git/test/test_util.py')
-rw-r--r--git/test/test_util.py474
1 files changed, 237 insertions, 237 deletions
diff --git a/git/test/test_util.py b/git/test/test_util.py
index 7cfcad3f..d2ca8bf2 100644
--- a/git/test/test_util.py
+++ b/git/test/test_util.py
@@ -16,250 +16,250 @@ from git.cmd import dashify
import time
from git.util import (
- to_hex_sha,
- to_bin_sha,
- NULL_HEX_SHA,
- LockedFD,
- Actor,
- IterableList
- )
+ to_hex_sha,
+ to_bin_sha,
+ NULL_HEX_SHA,
+ LockedFD,
+ Actor,
+ IterableList
+ )
class TestIterableMember(object):
- """A member of an iterable list"""
- __slots__ = ("name", "prefix_name")
-
- def __init__(self, name):
- self.name = name
- self.prefix_name = name
-
+ """A member of an iterable list"""
+ __slots__ = ("name", "prefix_name")
+
+ def __init__(self, name):
+ self.name = name
+ self.prefix_name = name
+
class TestUtils(TestBase):
- def setup(self):
- self.testdict = {
- "string": "42",
- "int": 42,
- "array": [ 42 ],
- }
+ def setup(self):
+ self.testdict = {
+ "string": "42",
+ "int": 42,
+ "array": [ 42 ],
+ }
- def test_it_should_dashify(self):
- assert 'this-is-my-argument' == dashify('this_is_my_argument')
- assert 'foo' == dashify('foo')
-
-
- def test_lock_file(self):
- my_file = tempfile.mktemp()
- lock_file = LockFile(my_file)
- assert not lock_file._has_lock()
- # release lock we don't have - fine
- lock_file._release_lock()
-
- # get lock
- lock_file._obtain_lock_or_raise()
- assert lock_file._has_lock()
-
- # concurrent access
- other_lock_file = LockFile(my_file)
- assert not other_lock_file._has_lock()
- self.failUnlessRaises(IOError, other_lock_file._obtain_lock_or_raise)
-
- lock_file._release_lock()
- assert not lock_file._has_lock()
-
- other_lock_file._obtain_lock_or_raise()
- self.failUnlessRaises(IOError, lock_file._obtain_lock_or_raise)
-
- # auto-release on destruction
- del(other_lock_file)
- lock_file._obtain_lock_or_raise()
- lock_file._release_lock()
-
- def test_blocking_lock_file(self):
- my_file = tempfile.mktemp()
- lock_file = BlockingLockFile(my_file)
- lock_file._obtain_lock()
-
- # next one waits for the lock
- start = time.time()
- wait_time = 0.1
- wait_lock = BlockingLockFile(my_file, 0.05, wait_time)
- self.failUnlessRaises(IOError, wait_lock._obtain_lock)
- elapsed = time.time() - start
- assert elapsed <= wait_time + 0.02 # some extra time it may cost
-
- def test_user_id(self):
- assert '@' in get_user_id()
-
- def test_parse_date(self):
- # test all supported formats
- def assert_rval(rval, veri_time, offset=0):
- assert len(rval) == 2
- assert isinstance(rval[0], int) and isinstance(rval[1], int)
- assert rval[0] == veri_time
- assert rval[1] == offset
-
- # now that we are here, test our conversion functions as well
- utctz = altz_to_utctz_str(offset)
- assert isinstance(utctz, basestring)
- assert utctz_to_altz(verify_utctz(utctz)) == offset
- # END assert rval utility
-
- rfc = ("Thu, 07 Apr 2005 22:13:11 +0000", 0)
- iso = ("2005-04-07T22:13:11 -0200", 7200)
- iso2 = ("2005-04-07 22:13:11 +0400", -14400)
- iso3 = ("2005.04.07 22:13:11 -0000", 0)
- alt = ("04/07/2005 22:13:11", 0)
- alt2 = ("07.04.2005 22:13:11", 0)
- veri_time = 1112904791 # the time this represents
- for date, offset in (rfc, iso, iso2, iso3, alt, alt2):
- assert_rval(parse_date(date), veri_time, offset)
- # END for each date type
-
- # and failure
- self.failUnlessRaises(ValueError, parse_date, 'invalid format')
- self.failUnlessRaises(ValueError, parse_date, '123456789 -02000')
- self.failUnlessRaises(ValueError, parse_date, ' 123456789 -0200')
-
- def test_actor(self):
- for cr in (None, self.rorepo.config_reader()):
- assert isinstance(Actor.committer(cr), Actor)
- assert isinstance(Actor.author(cr), Actor)
- #END assure config reader is handled
-
- def test_basics(self):
- assert to_hex_sha(NULL_HEX_SHA) == NULL_HEX_SHA
- assert len(to_bin_sha(NULL_HEX_SHA)) == 20
- assert to_hex_sha(to_bin_sha(NULL_HEX_SHA)) == NULL_HEX_SHA
-
- def _cmp_contents(self, file_path, data):
- # raise if data from file at file_path
- # does not match data string
- fp = open(file_path, "rb")
- try:
- assert fp.read() == data
- finally:
- fp.close()
-
- def test_lockedfd(self):
- my_file = tempfile.mktemp()
- orig_data = "hello"
- new_data = "world"
- my_file_fp = open(my_file, "wb")
- my_file_fp.write(orig_data)
- my_file_fp.close()
-
- try:
- lfd = LockedFD(my_file)
- lockfilepath = lfd._lockfilepath()
-
- # cannot end before it was started
- self.failUnlessRaises(AssertionError, lfd.rollback)
- self.failUnlessRaises(AssertionError, lfd.commit)
-
- # open for writing
- assert not os.path.isfile(lockfilepath)
- wfd = lfd.open(write=True)
- assert lfd._fd is wfd
- assert os.path.isfile(lockfilepath)
-
- # write data and fail
- os.write(wfd, new_data)
- lfd.rollback()
- assert lfd._fd is None
- self._cmp_contents(my_file, orig_data)
- assert not os.path.isfile(lockfilepath)
-
- # additional call doesnt fail
- lfd.commit()
- lfd.rollback()
-
- # test reading
- lfd = LockedFD(my_file)
- rfd = lfd.open(write=False)
- assert os.read(rfd, len(orig_data)) == orig_data
-
- assert os.path.isfile(lockfilepath)
- # deletion rolls back
- del(lfd)
- assert not os.path.isfile(lockfilepath)
-
-
- # write data - concurrently
- lfd = LockedFD(my_file)
- olfd = LockedFD(my_file)
- assert not os.path.isfile(lockfilepath)
- wfdstream = lfd.open(write=True, stream=True) # this time as stream
- assert os.path.isfile(lockfilepath)
- # another one fails
- self.failUnlessRaises(IOError, olfd.open)
-
- wfdstream.write(new_data)
- lfd.commit()
- assert not os.path.isfile(lockfilepath)
- self._cmp_contents(my_file, new_data)
-
- # could test automatic _end_writing on destruction
- finally:
- os.remove(my_file)
- # END final cleanup
-
- # try non-existing file for reading
- lfd = LockedFD(tempfile.mktemp())
- try:
- lfd.open(write=False)
- except OSError:
- assert not os.path.exists(lfd._lockfilepath())
- else:
- self.fail("expected OSError")
- # END handle exceptions
+ def test_it_should_dashify(self):
+ assert 'this-is-my-argument' == dashify('this_is_my_argument')
+ assert 'foo' == dashify('foo')
+
+
+ def test_lock_file(self):
+ my_file = tempfile.mktemp()
+ lock_file = LockFile(my_file)
+ assert not lock_file._has_lock()
+ # release lock we don't have - fine
+ lock_file._release_lock()
+
+ # get lock
+ lock_file._obtain_lock_or_raise()
+ assert lock_file._has_lock()
+
+ # concurrent access
+ other_lock_file = LockFile(my_file)
+ assert not other_lock_file._has_lock()
+ self.failUnlessRaises(IOError, other_lock_file._obtain_lock_or_raise)
+
+ lock_file._release_lock()
+ assert not lock_file._has_lock()
+
+ other_lock_file._obtain_lock_or_raise()
+ self.failUnlessRaises(IOError, lock_file._obtain_lock_or_raise)
+
+ # auto-release on destruction
+ del(other_lock_file)
+ lock_file._obtain_lock_or_raise()
+ lock_file._release_lock()
+
+ def test_blocking_lock_file(self):
+ my_file = tempfile.mktemp()
+ lock_file = BlockingLockFile(my_file)
+ lock_file._obtain_lock()
+
+ # next one waits for the lock
+ start = time.time()
+ wait_time = 0.1
+ wait_lock = BlockingLockFile(my_file, 0.05, wait_time)
+ self.failUnlessRaises(IOError, wait_lock._obtain_lock)
+ elapsed = time.time() - start
+ assert elapsed <= wait_time + 0.02 # some extra time it may cost
+
+ def test_user_id(self):
+ assert '@' in get_user_id()
+
+ def test_parse_date(self):
+ # test all supported formats
+ def assert_rval(rval, veri_time, offset=0):
+ assert len(rval) == 2
+ assert isinstance(rval[0], int) and isinstance(rval[1], int)
+ assert rval[0] == veri_time
+ assert rval[1] == offset
+
+ # now that we are here, test our conversion functions as well
+ utctz = altz_to_utctz_str(offset)
+ assert isinstance(utctz, basestring)
+ assert utctz_to_altz(verify_utctz(utctz)) == offset
+ # END assert rval utility
+
+ rfc = ("Thu, 07 Apr 2005 22:13:11 +0000", 0)
+ iso = ("2005-04-07T22:13:11 -0200", 7200)
+ iso2 = ("2005-04-07 22:13:11 +0400", -14400)
+ iso3 = ("2005.04.07 22:13:11 -0000", 0)
+ alt = ("04/07/2005 22:13:11", 0)
+ alt2 = ("07.04.2005 22:13:11", 0)
+ veri_time = 1112904791 # the time this represents
+ for date, offset in (rfc, iso, iso2, iso3, alt, alt2):
+ assert_rval(parse_date(date), veri_time, offset)
+ # END for each date type
+
+ # and failure
+ self.failUnlessRaises(ValueError, parse_date, 'invalid format')
+ self.failUnlessRaises(ValueError, parse_date, '123456789 -02000')
+ self.failUnlessRaises(ValueError, parse_date, ' 123456789 -0200')
+
+ def test_actor(self):
+ for cr in (None, self.rorepo.config_reader()):
+ assert isinstance(Actor.committer(cr), Actor)
+ assert isinstance(Actor.author(cr), Actor)
+ #END assure config reader is handled
+
+ def test_basics(self):
+ assert to_hex_sha(NULL_HEX_SHA) == NULL_HEX_SHA
+ assert len(to_bin_sha(NULL_HEX_SHA)) == 20
+ assert to_hex_sha(to_bin_sha(NULL_HEX_SHA)) == NULL_HEX_SHA
+
+ def _cmp_contents(self, file_path, data):
+ # raise if data from file at file_path
+ # does not match data string
+ fp = open(file_path, "rb")
+ try:
+ assert fp.read() == data
+ finally:
+ fp.close()
+
+ def test_lockedfd(self):
+ my_file = tempfile.mktemp()
+ orig_data = "hello"
+ new_data = "world"
+ my_file_fp = open(my_file, "wb")
+ my_file_fp.write(orig_data)
+ my_file_fp.close()
+
+ try:
+ lfd = LockedFD(my_file)
+ lockfilepath = lfd._lockfilepath()
+
+ # cannot end before it was started
+ self.failUnlessRaises(AssertionError, lfd.rollback)
+ self.failUnlessRaises(AssertionError, lfd.commit)
+
+ # open for writing
+ assert not os.path.isfile(lockfilepath)
+ wfd = lfd.open(write=True)
+ assert lfd._fd is wfd
+ assert os.path.isfile(lockfilepath)
+
+ # write data and fail
+ os.write(wfd, new_data)
+ lfd.rollback()
+ assert lfd._fd is None
+ self._cmp_contents(my_file, orig_data)
+ assert not os.path.isfile(lockfilepath)
+
+ # additional call doesnt fail
+ lfd.commit()
+ lfd.rollback()
+
+ # test reading
+ lfd = LockedFD(my_file)
+ rfd = lfd.open(write=False)
+ assert os.read(rfd, len(orig_data)) == orig_data
+
+ assert os.path.isfile(lockfilepath)
+ # deletion rolls back
+ del(lfd)
+ assert not os.path.isfile(lockfilepath)
+
+
+ # write data - concurrently
+ lfd = LockedFD(my_file)
+ olfd = LockedFD(my_file)
+ assert not os.path.isfile(lockfilepath)
+ wfdstream = lfd.open(write=True, stream=True) # this time as stream
+ assert os.path.isfile(lockfilepath)
+ # another one fails
+ self.failUnlessRaises(IOError, olfd.open)
+
+ wfdstream.write(new_data)
+ lfd.commit()
+ assert not os.path.isfile(lockfilepath)
+ self._cmp_contents(my_file, new_data)
+
+ # could test automatic _end_writing on destruction
+ finally:
+ os.remove(my_file)
+ # END final cleanup
+
+ # try non-existing file for reading
+ lfd = LockedFD(tempfile.mktemp())
+ try:
+ lfd.open(write=False)
+ except OSError:
+ assert not os.path.exists(lfd._lockfilepath())
+ else:
+ self.fail("expected OSError")
+ # END handle exceptions
- def test_iterable_list(self):
- for args in (('name',), ('name', 'prefix_')):
- l = IterableList('name')
-
- m1 = TestIterableMember('one')
- m2 = TestIterableMember('two')
-
- l.extend((m1, m2))
-
- assert len(l) == 2
-
- # contains works with name and identity
- assert m1.name in l
- assert m2.name in l
- assert m2 in l
- assert m2 in l
- assert 'invalid' not in l
-
- # with string index
- assert l[m1.name] is m1
- assert l[m2.name] is m2
-
- # with int index
- assert l[0] is m1
- assert l[1] is m2
-
- # with getattr
- assert l.one is m1
- assert l.two is m2
-
- # test exceptions
- self.failUnlessRaises(AttributeError, getattr, l, 'something')
- self.failUnlessRaises(IndexError, l.__getitem__, 'something')
-
- # delete by name and index
- self.failUnlessRaises(IndexError, l.__delitem__, 'something')
- del(l[m2.name])
- assert len(l) == 1
- assert m2.name not in l and m1.name in l
- del(l[0])
- assert m1.name not in l
- assert len(l) == 0
-
- self.failUnlessRaises(IndexError, l.__delitem__, 0)
- self.failUnlessRaises(IndexError, l.__delitem__, 'something')
- #END for each possible mode
-
+ def test_iterable_list(self):
+ for args in (('name',), ('name', 'prefix_')):
+ l = IterableList('name')
+
+ m1 = TestIterableMember('one')
+ m2 = TestIterableMember('two')
+
+ l.extend((m1, m2))
+
+ assert len(l) == 2
+
+ # contains works with name and identity
+ assert m1.name in l
+ assert m2.name in l
+ assert m2 in l
+ assert m2 in l
+ assert 'invalid' not in l
+
+ # with string index
+ assert l[m1.name] is m1
+ assert l[m2.name] is m2
+
+ # with int index
+ assert l[0] is m1
+ assert l[1] is m2
+
+ # with getattr
+ assert l.one is m1
+ assert l.two is m2
+
+ # test exceptions
+ self.failUnlessRaises(AttributeError, getattr, l, 'something')
+ self.failUnlessRaises(IndexError, l.__getitem__, 'something')
+
+ # delete by name and index
+ self.failUnlessRaises(IndexError, l.__delitem__, 'something')
+ del(l[m2.name])
+ assert len(l) == 1
+ assert m2.name not in l and m1.name in l
+ del(l[0])
+ assert m1.name not in l
+ assert len(l) == 0
+
+ self.failUnlessRaises(IndexError, l.__delitem__, 0)
+ self.failUnlessRaises(IndexError, l.__delitem__, 'something')
+ #END for each possible mode
+
class TestActor(TestBase):
def test_from_string_should_separate_name_and_email(self):