From 72bcdbd0a0c8cc6aa2a7433169aa49c7fc19b55b Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 4 Mar 2010 09:06:03 +0100 Subject: Converted all tabs to 4 space characters each to comply with pep8 --- test/git/test_utils.py | 204 ++++++++++++++++++++++++------------------------- 1 file changed, 102 insertions(+), 102 deletions(-) (limited to 'test/git/test_utils.py') diff --git a/test/git/test_utils.py b/test/git/test_utils.py index ade2bffa..f843c12e 100644 --- a/test/git/test_utils.py +++ b/test/git/test_utils.py @@ -15,106 +15,106 @@ import time class TestUtils(TestCase): - def setup(self): - self.testdict = { - "string": "42", - "int": 42, - "array": [ 42 ], - } + def setup(self): + self.testdict = { + "string": "42", + "int": 42, + "array": [ 42 ], + } - def test_it_should_dashify(self): - assert_equal('this-is-my-argument', dashify('this_is_my_argument')) - assert_equal('foo', dashify('foo')) - - - def test_lock_file(self): - my_file = tempfile.mktemp() - lock_file = LockFile(my_file) - assert not lock_file._has_lock() - # release lock we don't have - fine - lock_file._release_lock() - - # get lock - lock_file._obtain_lock_or_raise() - assert lock_file._has_lock() - - # concurrent access - other_lock_file = LockFile(my_file) - assert not other_lock_file._has_lock() - self.failUnlessRaises(IOError, other_lock_file._obtain_lock_or_raise) - - lock_file._release_lock() - assert not lock_file._has_lock() - - other_lock_file._obtain_lock_or_raise() - self.failUnlessRaises(IOError, lock_file._obtain_lock_or_raise) - - # auto-release on destruction - del(other_lock_file) - lock_file._obtain_lock_or_raise() - lock_file._release_lock() - - def test_blocking_lock_file(self): - my_file = tempfile.mktemp() - lock_file = BlockingLockFile(my_file) - lock_file._obtain_lock() - - # next one waits for the lock - start = time.time() - wait_time = 0.1 - wait_lock = BlockingLockFile(my_file, 0.05, wait_time) - self.failUnlessRaises(IOError, wait_lock._obtain_lock) - elapsed = time.time() - start - assert elapsed <= wait_time + 0.02 # some extra time it may cost - - def _cmp_contents(self, file_path, data): - # raise if data from file at file_path - # does not match data string - fp = open(file_path, "rb") - try: - assert fp.read() == data - finally: - fp.close() - - def test_safe_operation(self): - my_file = tempfile.mktemp() - orig_data = "hello" - new_data = "world" - my_file_fp = open(my_file, "wb") - my_file_fp.write(orig_data) - my_file_fp.close() - - try: - cwrite = ConcurrentWriteOperation(my_file) - - # didn't start writing, doesnt matter - cwrite._end_writing(False) - cwrite._end_writing(True) - assert not cwrite._is_writing() - - # write data and fail - stream = cwrite._begin_writing() - assert cwrite._is_writing() - stream.write(new_data) - cwrite._end_writing(successful=False) - self._cmp_contents(my_file, orig_data) - assert not os.path.exists(stream.name) - - # write data - concurrently - ocwrite = ConcurrentWriteOperation(my_file) - stream = cwrite._begin_writing() - self.failUnlessRaises(IOError, ocwrite._begin_writing) - - stream.write("world") - cwrite._end_writing(successful=True) - self._cmp_contents(my_file, new_data) - assert not os.path.exists(stream.name) - - # could test automatic _end_writing on destruction - finally: - os.remove(my_file) - # END final cleanup - - - - + def test_it_should_dashify(self): + assert_equal('this-is-my-argument', dashify('this_is_my_argument')) + assert_equal('foo', dashify('foo')) + + + def test_lock_file(self): + my_file = tempfile.mktemp() + lock_file = LockFile(my_file) + assert not lock_file._has_lock() + # release lock we don't have - fine + lock_file._release_lock() + + # get lock + lock_file._obtain_lock_or_raise() + assert lock_file._has_lock() + + # concurrent access + other_lock_file = LockFile(my_file) + assert not other_lock_file._has_lock() + self.failUnlessRaises(IOError, other_lock_file._obtain_lock_or_raise) + + lock_file._release_lock() + assert not lock_file._has_lock() + + other_lock_file._obtain_lock_or_raise() + self.failUnlessRaises(IOError, lock_file._obtain_lock_or_raise) + + # auto-release on destruction + del(other_lock_file) + lock_file._obtain_lock_or_raise() + lock_file._release_lock() + + def test_blocking_lock_file(self): + my_file = tempfile.mktemp() + lock_file = BlockingLockFile(my_file) + lock_file._obtain_lock() + + # next one waits for the lock + start = time.time() + wait_time = 0.1 + wait_lock = BlockingLockFile(my_file, 0.05, wait_time) + self.failUnlessRaises(IOError, wait_lock._obtain_lock) + elapsed = time.time() - start + assert elapsed <= wait_time + 0.02 # some extra time it may cost + + def _cmp_contents(self, file_path, data): + # raise if data from file at file_path + # does not match data string + fp = open(file_path, "rb") + try: + assert fp.read() == data + finally: + fp.close() + + def test_safe_operation(self): + my_file = tempfile.mktemp() + orig_data = "hello" + new_data = "world" + my_file_fp = open(my_file, "wb") + my_file_fp.write(orig_data) + my_file_fp.close() + + try: + cwrite = ConcurrentWriteOperation(my_file) + + # didn't start writing, doesnt matter + cwrite._end_writing(False) + cwrite._end_writing(True) + assert not cwrite._is_writing() + + # write data and fail + stream = cwrite._begin_writing() + assert cwrite._is_writing() + stream.write(new_data) + cwrite._end_writing(successful=False) + self._cmp_contents(my_file, orig_data) + assert not os.path.exists(stream.name) + + # write data - concurrently + ocwrite = ConcurrentWriteOperation(my_file) + stream = cwrite._begin_writing() + self.failUnlessRaises(IOError, ocwrite._begin_writing) + + stream.write("world") + cwrite._end_writing(successful=True) + self._cmp_contents(my_file, new_data) + assert not os.path.exists(stream.name) + + # could test automatic _end_writing on destruction + finally: + os.remove(my_file) + # END final cleanup + + + + -- cgit v1.2.1