summaryrefslogtreecommitdiff
path: root/gitdb/test/lib.py
blob: 50645be65fda73b250a1bd5f581ea89bab9781f2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Utilities used in ODB testing"""
from gitdb import (
	OStream, 
	)
from gitdb.stream import ( 
							Sha1Writer, 
							ZippedStoreShaWriter
						)

from gitdb.util import zlib

import sys
import random
from array import array
from cStringIO import StringIO

import glob
import unittest
import tempfile
import shutil
import os
import gc


#{ Bases

class TestBase(unittest.TestCase):
	"""Base class for all tests.

	Currently adds nothing on top of ``unittest.TestCase``; it only provides
	a common ancestor for the test cases."""
	

#} END bases

#{ Decorators

def with_rw_directory(func):
	"""Decorator handing the test a fresh, writable temporary directory.

	The decorated test is invoked as ``func(self, path)``. The directory is
	removed if the test succeeds, but is left in place (and its location is
	reported on stderr) if the test raises an ``Exception``, to aid
	additional debugging.

	:param func: test method accepting ``(self, path)``
	:return: wrapper accepting ``(self)`` only and carrying the name of func"""
	def wrapper(self):
		# mkdtemp creates the directory atomically. The mktemp + mkdir
		# combination leaves a window in which something else can claim the
		# very same name.
		path = tempfile.mkdtemp(prefix=func.__name__)
		keep = False
		try:
			try:
				return func(self, path)
			except Exception:
				# Written through the stream directly so the line is valid
				# syntax no matter whether print is a statement or a function.
				sys.stderr.write("Test %s.%s failed, output is at %r\n" % (type(self).__name__, func.__name__, path))
				keep = True
				raise
		finally:
			# Need to collect here to be sure all handles have been closed. It appears
			# a windows-only issue. In fact things should be deleted, as well as 
			# memory maps closed, once objects go out of scope. For some reason
			# though this is not the case here unless we collect explicitly.
			if not keep:
				gc.collect()
				shutil.rmtree(path)
		# END handle exception
	# END wrapper
	
	wrapper.__name__ = func.__name__
	return wrapper


def with_packs_rw(func):
	"""Decorator that fills the directory handed to the test with the pack fixtures.

	The wrapper is called with ``(self, path)``. Everything matching
	``packs/*`` below the fixture directory is copied (or hard linked) into
	``path`` before the call is forwarded to ``func``.

	:return: wrapper carrying the name of func"""
	def wrapper(self, path):
		# populate the target directory first, then run the actual test
		copy_files_globbed(fixture_path('packs/*'), path, hard_link_ok=True)
		return func(self, path)
	# END wrapper

	wrapper.__name__ = func.__name__
	return wrapper

#} END decorators

#{ Routines

def fixture_path(relapath=''):
	""":return: absolute path into the fixture directory, which is the
		'fixtures' directory located next to this module
	:param relapath: relative path into the fixtures directory, or ''
		to obtain the fixture directory itself (with a trailing separator)"""
	# __file__ is not guaranteed to be absolute - it depends on how the module
	# was loaded. Anchor it first so the result really is absolute.
	module_dir = os.path.dirname(os.path.abspath(__file__))
	return os.path.join(module_dir, 'fixtures', relapath)
	
def copy_files_globbed(source_glob, target_dir, hard_link_ok=False):
	"""Place every file matched by source_glob into target_dir
	:param source_glob: glob pattern selecting the files to transfer
	:param target_dir: directory receiving the files
	:param hard_link_ok: if True, a hard link is attempted for each file where
		the platform offers os.link; a plain copy is made if linking is not
		available or fails"""
	may_link = hard_link_ok and hasattr(os, 'link')
	for path in glob.glob(source_glob):
		if not may_link:
			shutil.copy(path, target_dir)
			continue
		# END plain copy requested

		link_name = os.path.join(target_dir, os.path.basename(path))
		try:
			os.link(path, link_name)
		except OSError:
			# linking failed, e.g. across devices - fall back to copying
			shutil.copy(path, target_dir)
		# END try hard link
	# END for each matched file
	

def make_bytes(size_in_bytes, randomize=False):
	""":return: byte string made of consecutive native ints starting at 0. Its
		length is size_in_bytes rounded down to a multiple of the int item size
	:param size_in_bytes: requested size of the result in bytes
	:param randomize: if True, the ints are shuffled to produce a less
		regular stream"""
	# Ask the array type for its item size instead of assuming 4 bytes.
	item_size = array('i').itemsize
	# Explicit floor division: a plain '/' yields a float under true division,
	# which cannot be used as an item count.
	item_count = size_in_bytes // item_size
	try:
		producer = xrange(item_count)
	except NameError:
		# no xrange available - range is the lazy equivalent there
		producer = range(item_count)
	# END pick lazy range
	if randomize:
		producer = list(producer)
		random.shuffle(producer)
	# END randomize
	a = array('i', producer)
	# tostring() is the legacy spelling; use tobytes() wherever it exists
	serialize = getattr(a, 'tobytes', None) or a.tostring
	return serialize()

def make_object(type, data):
	""":return: bytes resembling an uncompressed object, i.e. a header of the
		form '<type> <size>' terminated by a NUL byte, followed by data
	:param type: name of the object type to put into the header, e.g. 'blob'
	:param data: string with the object's payload"""
	# The header honours the given type instead of always claiming 'blob'.
	header = "%s %i\0" % (type, len(data))
	return header + data
	
def make_memory_file(size_in_bytes, randomize=False):
	"""Create an in-memory stream filled by make_bytes
	:return: tuple(size_of_stream, stream), with the size being the actual
		length of the generated data
	:param size_in_bytes: size to request from make_bytes
	:param randomize: try to produce a very random stream"""
	payload = make_bytes(size_in_bytes, randomize)
	stream = StringIO(payload)
	return len(payload), stream

#} END routines

#{ Stream Utilities

class DummyStream(object):
	"""Stand-in for a stream which merely records how it was used."""

	def __init__(self):
		# nothing was read and nothing was closed yet
		self.was_read, self.bytes, self.closed = False, 0, False

	def read(self, size):
		"""Remember that a read happened and which size was requested.
		No data is returned."""
		self.bytes = size
		self.was_read = True

	def close(self):
		"""Flag the stream as closed"""
		self.closed = True

	def _assert(self):
		"""Verify that read() was called at least once"""
		assert self.was_read


class DeriveTest(OStream):
	"""OStream subclass whose constructor additionally records surplus
	positional arguments and a mandatory 'myarg' keyword argument."""

	def __init__(self, sha, type, size, stream, *args, **kwargs):
		# 'myarg' must be given - a missing key raises KeyError
		myarg = kwargs.pop('myarg')
		self.myarg = myarg
		self.args = args

	def _assert(self):
		"""Verify that both the extra positional arguments and myarg are set
		to something truthy"""
		assert self.args and self.myarg

#} END stream utilities