diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2014-02-09 21:23:51 +0100 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2014-02-09 21:23:51 +0100 |
commit | 15dd52cd578691930cea194e003fa80dd02f40eb (patch) | |
tree | ef4c9c5f705dd1ca743b7ceefe5b91b11ad15010 /git/test/test_util.py | |
parent | 660bdca125aa9dcca7a7730535bec433edb8ba02 (diff) | |
download | gitpython-15dd52cd578691930cea194e003fa80dd02f40eb.tar.gz |
tabs to 4 spaces - overall state of this branch is desolate, but fixable. Needs plenty of work
Diffstat (limited to 'git/test/test_util.py')
-rw-r--r-- | git/test/test_util.py | 474 |
1 files changed, 237 insertions, 237 deletions
diff --git a/git/test/test_util.py b/git/test/test_util.py index 7cfcad3f..d2ca8bf2 100644 --- a/git/test/test_util.py +++ b/git/test/test_util.py @@ -16,250 +16,250 @@ from git.cmd import dashify import time from git.util import ( - to_hex_sha, - to_bin_sha, - NULL_HEX_SHA, - LockedFD, - Actor, - IterableList - ) + to_hex_sha, + to_bin_sha, + NULL_HEX_SHA, + LockedFD, + Actor, + IterableList + ) class TestIterableMember(object): - """A member of an iterable list""" - __slots__ = ("name", "prefix_name") - - def __init__(self, name): - self.name = name - self.prefix_name = name - + """A member of an iterable list""" + __slots__ = ("name", "prefix_name") + + def __init__(self, name): + self.name = name + self.prefix_name = name + class TestUtils(TestBase): - def setup(self): - self.testdict = { - "string": "42", - "int": 42, - "array": [ 42 ], - } + def setup(self): + self.testdict = { + "string": "42", + "int": 42, + "array": [ 42 ], + } - def test_it_should_dashify(self): - assert 'this-is-my-argument' == dashify('this_is_my_argument') - assert 'foo' == dashify('foo') - - - def test_lock_file(self): - my_file = tempfile.mktemp() - lock_file = LockFile(my_file) - assert not lock_file._has_lock() - # release lock we don't have - fine - lock_file._release_lock() - - # get lock - lock_file._obtain_lock_or_raise() - assert lock_file._has_lock() - - # concurrent access - other_lock_file = LockFile(my_file) - assert not other_lock_file._has_lock() - self.failUnlessRaises(IOError, other_lock_file._obtain_lock_or_raise) - - lock_file._release_lock() - assert not lock_file._has_lock() - - other_lock_file._obtain_lock_or_raise() - self.failUnlessRaises(IOError, lock_file._obtain_lock_or_raise) - - # auto-release on destruction - del(other_lock_file) - lock_file._obtain_lock_or_raise() - lock_file._release_lock() - - def test_blocking_lock_file(self): - my_file = tempfile.mktemp() - lock_file = BlockingLockFile(my_file) - lock_file._obtain_lock() - - # next one waits for the lock - start = time.time() - wait_time = 0.1 - wait_lock = BlockingLockFile(my_file, 0.05, wait_time) - self.failUnlessRaises(IOError, wait_lock._obtain_lock) - elapsed = time.time() - start - assert elapsed <= wait_time + 0.02 # some extra time it may cost - - def test_user_id(self): - assert '@' in get_user_id() - - def test_parse_date(self): - # test all supported formats - def assert_rval(rval, veri_time, offset=0): - assert len(rval) == 2 - assert isinstance(rval[0], int) and isinstance(rval[1], int) - assert rval[0] == veri_time - assert rval[1] == offset - - # now that we are here, test our conversion functions as well - utctz = altz_to_utctz_str(offset) - assert isinstance(utctz, basestring) - assert utctz_to_altz(verify_utctz(utctz)) == offset - # END assert rval utility - - rfc = ("Thu, 07 Apr 2005 22:13:11 +0000", 0) - iso = ("2005-04-07T22:13:11 -0200", 7200) - iso2 = ("2005-04-07 22:13:11 +0400", -14400) - iso3 = ("2005.04.07 22:13:11 -0000", 0) - alt = ("04/07/2005 22:13:11", 0) - alt2 = ("07.04.2005 22:13:11", 0) - veri_time = 1112904791 # the time this represents - for date, offset in (rfc, iso, iso2, iso3, alt, alt2): - assert_rval(parse_date(date), veri_time, offset) - # END for each date type - - # and failure - self.failUnlessRaises(ValueError, parse_date, 'invalid format') - self.failUnlessRaises(ValueError, parse_date, '123456789 -02000') - self.failUnlessRaises(ValueError, parse_date, ' 123456789 -0200') - - def test_actor(self): - for cr in (None, self.rorepo.config_reader()): - assert isinstance(Actor.committer(cr), Actor) - assert isinstance(Actor.author(cr), Actor) - #END assure config reader is handled - - def test_basics(self): - assert to_hex_sha(NULL_HEX_SHA) == NULL_HEX_SHA - assert len(to_bin_sha(NULL_HEX_SHA)) == 20 - assert to_hex_sha(to_bin_sha(NULL_HEX_SHA)) == NULL_HEX_SHA - - def _cmp_contents(self, file_path, data): - # raise if data from file at file_path - # does not match data string - fp = open(file_path, "rb") - try: - assert fp.read() == data - finally: - fp.close() - - def test_lockedfd(self): - my_file = tempfile.mktemp() - orig_data = "hello" - new_data = "world" - my_file_fp = open(my_file, "wb") - my_file_fp.write(orig_data) - my_file_fp.close() - - try: - lfd = LockedFD(my_file) - lockfilepath = lfd._lockfilepath() - - # cannot end before it was started - self.failUnlessRaises(AssertionError, lfd.rollback) - self.failUnlessRaises(AssertionError, lfd.commit) - - # open for writing - assert not os.path.isfile(lockfilepath) - wfd = lfd.open(write=True) - assert lfd._fd is wfd - assert os.path.isfile(lockfilepath) - - # write data and fail - os.write(wfd, new_data) - lfd.rollback() - assert lfd._fd is None - self._cmp_contents(my_file, orig_data) - assert not os.path.isfile(lockfilepath) - - # additional call doesnt fail - lfd.commit() - lfd.rollback() - - # test reading - lfd = LockedFD(my_file) - rfd = lfd.open(write=False) - assert os.read(rfd, len(orig_data)) == orig_data - - assert os.path.isfile(lockfilepath) - # deletion rolls back - del(lfd) - assert not os.path.isfile(lockfilepath) - - - # write data - concurrently - lfd = LockedFD(my_file) - olfd = LockedFD(my_file) - assert not os.path.isfile(lockfilepath) - wfdstream = lfd.open(write=True, stream=True) # this time as stream - assert os.path.isfile(lockfilepath) - # another one fails - self.failUnlessRaises(IOError, olfd.open) - - wfdstream.write(new_data) - lfd.commit() - assert not os.path.isfile(lockfilepath) - self._cmp_contents(my_file, new_data) - - # could test automatic _end_writing on destruction - finally: - os.remove(my_file) - # END final cleanup - - # try non-existing file for reading - lfd = LockedFD(tempfile.mktemp()) - try: - lfd.open(write=False) - except OSError: - assert not os.path.exists(lfd._lockfilepath()) - else: - self.fail("expected OSError") - # END handle exceptions + def test_it_should_dashify(self): + assert 'this-is-my-argument' == dashify('this_is_my_argument') + assert 'foo' == dashify('foo') + + + def test_lock_file(self): + my_file = tempfile.mktemp() + lock_file = LockFile(my_file) + assert not lock_file._has_lock() + # release lock we don't have - fine + lock_file._release_lock() + + # get lock + lock_file._obtain_lock_or_raise() + assert lock_file._has_lock() + + # concurrent access + other_lock_file = LockFile(my_file) + assert not other_lock_file._has_lock() + self.failUnlessRaises(IOError, other_lock_file._obtain_lock_or_raise) + + lock_file._release_lock() + assert not lock_file._has_lock() + + other_lock_file._obtain_lock_or_raise() + self.failUnlessRaises(IOError, lock_file._obtain_lock_or_raise) + + # auto-release on destruction + del(other_lock_file) + lock_file._obtain_lock_or_raise() + lock_file._release_lock() + + def test_blocking_lock_file(self): + my_file = tempfile.mktemp() + lock_file = BlockingLockFile(my_file) + lock_file._obtain_lock() + + # next one waits for the lock + start = time.time() + wait_time = 0.1 + wait_lock = BlockingLockFile(my_file, 0.05, wait_time) + self.failUnlessRaises(IOError, wait_lock._obtain_lock) + elapsed = time.time() - start + assert elapsed <= wait_time + 0.02 # some extra time it may cost + + def test_user_id(self): + assert '@' in get_user_id() + + def test_parse_date(self): + # test all supported formats + def assert_rval(rval, veri_time, offset=0): + assert len(rval) == 2 + assert isinstance(rval[0], int) and isinstance(rval[1], int) + assert rval[0] == veri_time + assert rval[1] == offset + + # now that we are here, test our conversion functions as well + utctz = altz_to_utctz_str(offset) + assert isinstance(utctz, basestring) + assert utctz_to_altz(verify_utctz(utctz)) == offset + # END assert rval utility + + rfc = ("Thu, 07 Apr 2005 22:13:11 +0000", 0) + iso = ("2005-04-07T22:13:11 -0200", 7200) + iso2 = ("2005-04-07 22:13:11 +0400", -14400) + iso3 = ("2005.04.07 22:13:11 -0000", 0) + alt = ("04/07/2005 22:13:11", 0) + alt2 = ("07.04.2005 22:13:11", 0) + veri_time = 1112904791 # the time this represents + for date, offset in (rfc, iso, iso2, iso3, alt, alt2): + assert_rval(parse_date(date), veri_time, offset) + # END for each date type + + # and failure + self.failUnlessRaises(ValueError, parse_date, 'invalid format') + self.failUnlessRaises(ValueError, parse_date, '123456789 -02000') + self.failUnlessRaises(ValueError, parse_date, ' 123456789 -0200') + + def test_actor(self): + for cr in (None, self.rorepo.config_reader()): + assert isinstance(Actor.committer(cr), Actor) + assert isinstance(Actor.author(cr), Actor) + #END assure config reader is handled + + def test_basics(self): + assert to_hex_sha(NULL_HEX_SHA) == NULL_HEX_SHA + assert len(to_bin_sha(NULL_HEX_SHA)) == 20 + assert to_hex_sha(to_bin_sha(NULL_HEX_SHA)) == NULL_HEX_SHA + + def _cmp_contents(self, file_path, data): + # raise if data from file at file_path + # does not match data string + fp = open(file_path, "rb") + try: + assert fp.read() == data + finally: + fp.close() + + def test_lockedfd(self): + my_file = tempfile.mktemp() + orig_data = "hello" + new_data = "world" + my_file_fp = open(my_file, "wb") + my_file_fp.write(orig_data) + my_file_fp.close() + + try: + lfd = LockedFD(my_file) + lockfilepath = lfd._lockfilepath() + + # cannot end before it was started + self.failUnlessRaises(AssertionError, lfd.rollback) + self.failUnlessRaises(AssertionError, lfd.commit) + + # open for writing + assert not os.path.isfile(lockfilepath) + wfd = lfd.open(write=True) + assert lfd._fd is wfd + assert os.path.isfile(lockfilepath) + + # write data and fail + os.write(wfd, new_data) + lfd.rollback() + assert lfd._fd is None + self._cmp_contents(my_file, orig_data) + assert not os.path.isfile(lockfilepath) + + # additional call doesnt fail + lfd.commit() + lfd.rollback() + + # test reading + lfd = LockedFD(my_file) + rfd = lfd.open(write=False) + assert os.read(rfd, len(orig_data)) == orig_data + + assert os.path.isfile(lockfilepath) + # deletion rolls back + del(lfd) + assert not os.path.isfile(lockfilepath) + + + # write data - concurrently + lfd = LockedFD(my_file) + olfd = LockedFD(my_file) + assert not os.path.isfile(lockfilepath) + wfdstream = lfd.open(write=True, stream=True) # this time as stream + assert os.path.isfile(lockfilepath) + # another one fails + self.failUnlessRaises(IOError, olfd.open) + + wfdstream.write(new_data) + lfd.commit() + assert not os.path.isfile(lockfilepath) + self._cmp_contents(my_file, new_data) + + # could test automatic _end_writing on destruction + finally: + os.remove(my_file) + # END final cleanup + + # try non-existing file for reading + lfd = LockedFD(tempfile.mktemp()) + try: + lfd.open(write=False) + except OSError: + assert not os.path.exists(lfd._lockfilepath()) + else: + self.fail("expected OSError") + # END handle exceptions - def test_iterable_list(self): - for args in (('name',), ('name', 'prefix_')): - l = IterableList('name') - - m1 = TestIterableMember('one') - m2 = TestIterableMember('two') - - l.extend((m1, m2)) - - assert len(l) == 2 - - # contains works with name and identity - assert m1.name in l - assert m2.name in l - assert m2 in l - assert m2 in l - assert 'invalid' not in l - - # with string index - assert l[m1.name] is m1 - assert l[m2.name] is m2 - - # with int index - assert l[0] is m1 - assert l[1] is m2 - - # with getattr - assert l.one is m1 - assert l.two is m2 - - # test exceptions - self.failUnlessRaises(AttributeError, getattr, l, 'something') - self.failUnlessRaises(IndexError, l.__getitem__, 'something') - - # delete by name and index - self.failUnlessRaises(IndexError, l.__delitem__, 'something') - del(l[m2.name]) - assert len(l) == 1 - assert m2.name not in l and m1.name in l - del(l[0]) - assert m1.name not in l - assert len(l) == 0 - - self.failUnlessRaises(IndexError, l.__delitem__, 0) - self.failUnlessRaises(IndexError, l.__delitem__, 'something') - #END for each possible mode - + def test_iterable_list(self): + for args in (('name',), ('name', 'prefix_')): + l = IterableList('name') + + m1 = TestIterableMember('one') + m2 = TestIterableMember('two') + + l.extend((m1, m2)) + + assert len(l) == 2 + + # contains works with name and identity + assert m1.name in l + assert m2.name in l + assert m2 in l + assert m2 in l + assert 'invalid' not in l + + # with string index + assert l[m1.name] is m1 + assert l[m2.name] is m2 + + # with int index + assert l[0] is m1 + assert l[1] is m2 + + # with getattr + assert l.one is m1 + assert l.two is m2 + + # test exceptions + self.failUnlessRaises(AttributeError, getattr, l, 'something') + self.failUnlessRaises(IndexError, l.__getitem__, 'something') + + # delete by name and index + self.failUnlessRaises(IndexError, l.__delitem__, 'something') + del(l[m2.name]) + assert len(l) == 1 + assert m2.name not in l and m1.name in l + del(l[0]) + assert m1.name not in l + assert len(l) == 0 + + self.failUnlessRaises(IndexError, l.__delitem__, 0) + self.failUnlessRaises(IndexError, l.__delitem__, 'something') + #END for each possible mode + class TestActor(TestBase): def test_from_string_should_separate_name_and_email(self): |