summaryrefslogtreecommitdiff
path: root/test/git/odb/test_db.py
blob: 35ba86802ce4b730ce3c7e80831d0208c67bd575 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
"""Test for object db"""
from test.testlib import *
from lib import ZippedStoreShaWriter

from git.odb import *
from git.odb.stream import Sha1Writer
from git import Blob
from git.errors import BadObject


from cStringIO import StringIO
import os
		
class TestDB(TestBase):
	"""Exercise the object database implementations through their write and read API"""

	# payloads that get written to the database under test
	two_lines = "1234\nhello world"

	all_data = (two_lines, )

	def _assert_stored_object(self, db, sha, payload):
		"""Read the object named by sha back from db and compare it with payload
		:note: checks the type and size reported by info() as well as the type, 
			size and content of the stream returned by stream()"""
		size = len(payload)

		info = db.info(sha)
		assert Blob.type == info.type
		assert info.size == size

		# verify data - the slow way, we want to run code
		reader = db.stream(sha)
		assert reader.read() == payload
		assert reader.type == Blob.type
		assert reader.size == size

	def _assert_direct_stream_copy(self, db, istream, writer):
		"""Store the object data captured by writer a second time, this time 
		using it as the input stream, and verify the result is identical
		:param writer: output stream that received the object during the 
			preceding dry-run store of istream"""
		# rewind the captured object data and feed it back in as input
		writer.seek(0)
		istream.stream = writer
		assert istream.sha is not None
		expected_sha = istream.sha

		db.set_ostream(ZippedStoreShaWriter())
		db.store(istream)
		assert istream.sha == expected_sha
		copy_writer = db.ostream()

		# note: this comparison only holds as long as the store writes with the 
		# same compression, which is zip
		assert writer.getvalue() == copy_writer.getvalue()

	def _assert_object_writing(self, db):
		"""Generic checks for object writing, usable with any database that 
		provides set_ostream, store, has_object, info, stream and ostream
		:note: requires write access to the database"""
		# the first entry runs in 'dry-run' mode, writing into a plain sha1 
		# writer; the None entry then lets the database write on its own
		writer_types = (ZippedStoreShaWriter, None)
		for writer_type in writer_types:
			dry_run = writer_type is not None
			for payload in self.all_data:
				if dry_run:
					writer = writer_type()
					assert isinstance(writer, Sha1Writer)
				else:
					writer = None
				# END create output stream

				previous = db.set_ostream(writer)
				assert type(previous) in writer_types or previous in writer_types

				istream = IStream(Blob.type, len(payload), StringIO(payload))

				# store must hand back the very same istream, with its sha set
				stored = db.store(istream)
				sha = istream.sha
				assert stored is istream
				assert db.has_object(sha) != dry_run
				assert len(sha) == 40		# for now we require 40 byte shas as default

				if dry_run:
					# the object went into our own writer only, so the database 
					# is expected not to know it
					self.failUnlessRaises(BadObject, db.info, sha)
					self.failUnlessRaises(BadObject, db.stream, sha)

					# DIRECT STREAM COPY
					# copying the captured object stream must reproduce 
					# exactly what we fed in
					self._assert_direct_stream_copy(db, istream, writer)
				else:
					self._assert_stored_object(db, sha, payload)
				# END handle dry_run
			# END for each data set
		# END for each output stream type

	@with_bare_rw_repo
	def test_writing(self, rwrepo):
		"""Run the generic writing checks against a LooseObjectDB located in 
		the 'objects' directory of the writable test repository"""
		objects_dir = os.path.join(rwrepo.git_dir, 'objects')
		loose_db = LooseObjectDB(objects_dir)

		# write data
		self._assert_object_writing(loose_db)