# cgit view: summary | refs | log | tree | commit | diff
# path: root/test/git/test_utils.py
# blob: 029d2054c683f4988c715d3cd96cbccb127752a9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# test_utils.py
# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
#
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php

import os
import tempfile

from test.testlib import *
from git.utils import *
from git import *
from git.cmd import dashify


class TestUtils(TestCase):
	"""Tests for ``dashify``, ``LockFile`` and ``ConcurrentWriteOperation``."""

	def setUp(self):
		# Fixture dictionary made available to each test as ``self.testdict``.
		self.testdict = {
			"string":	"42",
			"int":		42,
			"array":	[ 42 ],
		}

	# unittest only calls the camel-case ``setUp`` hook, so the original
	# lower-case ``setup`` was never run as a fixture (assuming TestCase is, or
	# derives from, unittest.TestCase - it is star-imported and not visible
	# here). The old spelling stays available as an alias.
	setup = setUp

	def test_it_should_dashify(self):
		# underscores become dashes, strings without underscores stay untouched
		assert_equal('this-is-my-argument', dashify('this_is_my_argument'))
		assert_equal('foo', dashify('foo'))

	def test_lock_file(self):
		# Walk two LockFile instances that share one target path through
		# obtain / contend / release / auto-release.

		# Function-scope import: shutil is only needed for the cleanup below.
		import shutil

		# A private scratch directory yields a unique, not-yet-existing target
		# path without the race inherent in tempfile.mktemp(), and lets the
		# cleanup remove whatever a failing run leaves behind.
		tmp_dir = tempfile.mkdtemp()
		try:
			my_file = os.path.join(tmp_dir, "lock_target")
			lock_file = LockFile(my_file)
			assert not lock_file._has_lock()
			# release lock we don't have - fine
			lock_file._release_lock()

			# get lock
			lock_file._obtain_lock_or_raise()
			assert lock_file._has_lock()

			# concurrent access
			other_lock_file = LockFile(my_file)
			assert not other_lock_file._has_lock()
			self.assertRaises(IOError, other_lock_file._obtain_lock_or_raise)

			lock_file._release_lock()
			assert not lock_file._has_lock()

			other_lock_file._obtain_lock_or_raise()
			self.assertRaises(IOError, lock_file._obtain_lock_or_raise)

			# auto-release on destruction
			# NOTE: ``del`` only drops the name; this step relies on the object
			# being finalized right away (true with CPython's refcounting).
			del other_lock_file
			lock_file._obtain_lock_or_raise()
			lock_file._release_lock()
		finally:
			shutil.rmtree(tmp_dir, ignore_errors=True)
		# END final cleanup

	def _cmp_contents(self, file_path, data):
		"""Assert that the file at ``file_path`` contains exactly ``data``.

		:param file_path: path of the file to read (opened in text mode)
		:param data: string the complete file contents are compared against
		:raise AssertionError: if the contents differ from ``data``"""
		fp = open(file_path, "r")
		try:
			contents = fp.read()
		finally:
			fp.close()
		assert contents == data, "%r != %r" % (contents, data)

	def test_safe_operation(self):
		# A failed ConcurrentWriteOperation must leave the target untouched, a
		# successful one must replace its contents, and a second writer must be
		# rejected while the first one is active.
		orig_data = "hello"
		new_data = "world"

		# mkstemp() creates the file atomically, unlike the deprecated mktemp()
		fd, my_file = tempfile.mkstemp()
		try:
			my_file_fp = os.fdopen(fd, "w")
			try:
				my_file_fp.write(orig_data)
			finally:
				my_file_fp.close()

			cwrite = ConcurrentWriteOperation(my_file)

			# didn't start writing, doesn't matter
			cwrite._end_writing(False)
			cwrite._end_writing(True)
			assert not cwrite._is_writing()

			# write data and fail
			stream = cwrite._begin_writing()
			assert cwrite._is_writing()
			stream.write(new_data)
			cwrite._end_writing(successful=False)
			self._cmp_contents(my_file, orig_data)
			assert not os.path.exists(stream.name)

			# write data - concurrently
			ocwrite = ConcurrentWriteOperation(my_file)
			stream = cwrite._begin_writing()
			self.assertRaises(IOError, ocwrite._begin_writing)

			stream.write(new_data)
			cwrite._end_writing(successful=True)
			self._cmp_contents(my_file, new_data)
			assert not os.path.exists(stream.name)

			# could test automatic _end_writing on destruction
		finally:
			os.remove(my_file)
		# END final cleanup