summaryrefslogtreecommitdiff
path: root/test/git/test_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/git/test_utils.py')
-rw-r--r--test/git/test_utils.py73
1 files changed, 1 insertions, 72 deletions
diff --git a/test/git/test_utils.py b/test/git/test_utils.py
index fa07da1b..963e2b55 100644
--- a/test/git/test_utils.py
+++ b/test/git/test_utils.py
@@ -12,6 +12,7 @@ from git.utils import *
from git.objects.utils import *
from git import *
from git.cmd import dashify
+
import time
@@ -68,78 +69,6 @@ class TestUtils(TestCase):
elapsed = time.time() - start
assert elapsed <= wait_time + 0.02 # some extra time it may cost
- def _cmp_contents(self, file_path, data):
- # raise if data from file at file_path
- # does not match data string
- fp = open(file_path, "rb")
- try:
- assert fp.read() == data
- finally:
- fp.close()
-
- def test_safe_operation(self):
- my_file = tempfile.mktemp()
- orig_data = "hello"
- new_data = "world"
- my_file_fp = open(my_file, "wb")
- my_file_fp.write(orig_data)
- my_file_fp.close()
-
- try:
- lfd = LockedFD(my_file)
- lockfilepath = lfd._lockfilepath()
-
- # cannot end before it was started
- self.failUnlessRaises(AssertionError, lfd.rollback)
- self.failUnlessRaises(AssertionError, lfd.commit)
-
- # open for writing
- assert not os.path.isfile(lockfilepath)
- wfd = lfd.open(write=True)
- assert lfd._fd is wfd
- assert os.path.isfile(lockfilepath)
-
- # write data and fail
- os.write(wfd, new_data)
- lfd.rollback()
- assert lfd._fd is None
- self._cmp_contents(my_file, orig_data)
- assert not os.path.isfile(lockfilepath)
-
- # additional call doesnt fail
- lfd.commit()
- lfd.rollback()
-
- # test reading
- lfd = LockedFD(my_file)
- rfd = lfd.open(write=False)
- assert os.read(rfd, len(orig_data)) == orig_data
-
- assert os.path.isfile(lockfilepath)
- # deletion rolls back
- del(lfd)
- assert not os.path.isfile(lockfilepath)
-
-
- # write data - concurrently
- lfd = LockedFD(my_file)
- olfd = LockedFD(my_file)
- assert not os.path.isfile(lockfilepath)
- wfdstream = lfd.open(write=True, stream=True) # this time as stream
- assert os.path.isfile(lockfilepath)
- # another one fails
- self.failUnlessRaises(IOError, olfd.open)
-
- wfdstream.write(new_data)
- lfd.commit()
- assert not os.path.isfile(lockfilepath)
- self._cmp_contents(my_file, new_data)
-
- # could test automatic _end_writing on destruction
- finally:
- os.remove(my_file)
- # END final cleanup
-
def test_user_id(self):
assert '@' in get_user_id()