diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-15 01:08:25 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-15 01:08:25 +0200 |
commit | 1d2307532d679393ae067326e4b6fa1a2ba5cc06 (patch) | |
tree | db41bfd94fcf9422aa4798e2a15234e3243c302f /test/git/test_utils.py | |
parent | 06590aee389f4466e02407f39af1674366a74705 (diff) | |
download | gitpython-1d2307532d679393ae067326e4b6fa1a2ba5cc06.tar.gz |
Moved LockedFD and its test into the gitdb project
Diffstat (limited to 'test/git/test_utils.py')
-rw-r--r-- | test/git/test_utils.py | 73 |
1 files changed, 1 insertions, 72 deletions
diff --git a/test/git/test_utils.py b/test/git/test_utils.py index fa07da1b..963e2b55 100644 --- a/test/git/test_utils.py +++ b/test/git/test_utils.py @@ -12,6 +12,7 @@ from git.utils import * from git.objects.utils import * from git import * from git.cmd import dashify + import time @@ -68,78 +69,6 @@ class TestUtils(TestCase): elapsed = time.time() - start assert elapsed <= wait_time + 0.02 # some extra time it may cost - def _cmp_contents(self, file_path, data): - # raise if data from file at file_path - # does not match data string - fp = open(file_path, "rb") - try: - assert fp.read() == data - finally: - fp.close() - - def test_safe_operation(self): - my_file = tempfile.mktemp() - orig_data = "hello" - new_data = "world" - my_file_fp = open(my_file, "wb") - my_file_fp.write(orig_data) - my_file_fp.close() - - try: - lfd = LockedFD(my_file) - lockfilepath = lfd._lockfilepath() - - # cannot end before it was started - self.failUnlessRaises(AssertionError, lfd.rollback) - self.failUnlessRaises(AssertionError, lfd.commit) - - # open for writing - assert not os.path.isfile(lockfilepath) - wfd = lfd.open(write=True) - assert lfd._fd is wfd - assert os.path.isfile(lockfilepath) - - # write data and fail - os.write(wfd, new_data) - lfd.rollback() - assert lfd._fd is None - self._cmp_contents(my_file, orig_data) - assert not os.path.isfile(lockfilepath) - - # additional call doesnt fail - lfd.commit() - lfd.rollback() - - # test reading - lfd = LockedFD(my_file) - rfd = lfd.open(write=False) - assert os.read(rfd, len(orig_data)) == orig_data - - assert os.path.isfile(lockfilepath) - # deletion rolls back - del(lfd) - assert not os.path.isfile(lockfilepath) - - - # write data - concurrently - lfd = LockedFD(my_file) - olfd = LockedFD(my_file) - assert not os.path.isfile(lockfilepath) - wfdstream = lfd.open(write=True, stream=True) # this time as stream - assert os.path.isfile(lockfilepath) - # another one fails - self.failUnlessRaises(IOError, olfd.open) - - wfdstream.write(new_data) - lfd.commit() - assert not os.path.isfile(lockfilepath) - self._cmp_contents(my_file, new_data) - - # could test automatic _end_writing on destruction - finally: - os.remove(my_file) - # END final cleanup - def test_user_id(self): assert '@' in get_user_id() |