summaryrefslogtreecommitdiff
path: root/test/git/test_utils.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2009-11-04 19:53:42 +0100
committerSebastian Thiel <byronimo@gmail.com>2009-11-04 19:53:42 +0100
commitd884adc80c80300b4cc05321494713904ef1df2d (patch)
tree3878d5e0282531596d42505d8725482dde002c20 /test/git/test_utils.py
parent05d2687afcc78cd192714ee3d71fdf36a37d110f (diff)
parentace1fed6321bb8dd6d38b2f58d7cf815fa16db7a (diff)
downloadgitpython-d884adc80c80300b4cc05321494713904ef1df2d.tar.gz
Merge branch 'improvements'
Diffstat (limited to 'test/git/test_utils.py')
-rw-r--r--test/git/test_utils.py87
1 files changed, 86 insertions, 1 deletions
diff --git a/test/git/test_utils.py b/test/git/test_utils.py
index 2983a14a..029d2054 100644
--- a/test/git/test_utils.py
+++ b/test/git/test_utils.py
@@ -5,10 +5,15 @@
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
import os
+import tempfile
+
from test.testlib import *
+from git.utils import *
from git import *
+from git.cmd import dashify
+
-class TestUtils(object):
+class TestUtils(TestCase):
def setup(self):
self.testdict = {
"string": "42",
@@ -19,3 +24,83 @@ class TestUtils(object):
def test_it_should_dashify(self):
assert_equal('this-is-my-argument', dashify('this_is_my_argument'))
assert_equal('foo', dashify('foo'))
+
+
+ def test_lock_file(self):
+ my_file = tempfile.mktemp()
+ lock_file = LockFile(my_file)
+ assert not lock_file._has_lock()
+ # release lock we don't have - fine
+ lock_file._release_lock()
+
+ # get lock
+ lock_file._obtain_lock_or_raise()
+ assert lock_file._has_lock()
+
+ # concurrent access
+ other_lock_file = LockFile(my_file)
+ assert not other_lock_file._has_lock()
+ self.failUnlessRaises(IOError, other_lock_file._obtain_lock_or_raise)
+
+ lock_file._release_lock()
+ assert not lock_file._has_lock()
+
+ other_lock_file._obtain_lock_or_raise()
+ self.failUnlessRaises(IOError, lock_file._obtain_lock_or_raise)
+
+ # auto-release on destruction
+ del(other_lock_file)
+ lock_file._obtain_lock_or_raise()
+ lock_file._release_lock()
+
+ def _cmp_contents(self, file_path, data):
+ # raise if data from file at file_path
+ # does not match data string
+ fp = open(file_path, "r")
+ try:
+ assert fp.read() == data
+ finally:
+ fp.close()
+
+ def test_safe_operation(self):
+ my_file = tempfile.mktemp()
+ orig_data = "hello"
+ new_data = "world"
+ my_file_fp = open(my_file, "w")
+ my_file_fp.write(orig_data)
+ my_file_fp.close()
+
+ try:
+ cwrite = ConcurrentWriteOperation(my_file)
+
+ # didn't start writing, doesnt matter
+ cwrite._end_writing(False)
+ cwrite._end_writing(True)
+ assert not cwrite._is_writing()
+
+ # write data and fail
+ stream = cwrite._begin_writing()
+ assert cwrite._is_writing()
+ stream.write(new_data)
+ cwrite._end_writing(successful=False)
+ self._cmp_contents(my_file, orig_data)
+ assert not os.path.exists(stream.name)
+
+ # write data - concurrently
+ ocwrite = ConcurrentWriteOperation(my_file)
+ stream = cwrite._begin_writing()
+ self.failUnlessRaises(IOError, ocwrite._begin_writing)
+
+ stream.write("world")
+ cwrite._end_writing(successful=True)
+ self._cmp_contents(my_file, new_data)
+ assert not os.path.exists(stream.name)
+
+ # could test automatic _end_writing on destruction
+ finally:
+ os.remove(my_file)
+ # END final cleanup
+
+
+
+