# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Utilities used in ODB testing"""

#{ Bases

from array import array
from functools import wraps
import gc
import glob
from io import BytesIO
import logging
import os
import random
import shutil
import sys
import tempfile
import unittest

from gitdb.test import HIDE_WINDOWS_KNOWN_ERRORS
from gitdb.util import rmtree, mman
from gitdb.utils.compat import xrange


log = logging.getLogger(__name__)


class TestBase(unittest.TestCase):
    """Base class for all tests

    TestCase providing access to readonly repositories using the following member variables.

    * gitrepopath

     * read-only base path of the git source repository, i.e. .../git/.git
    """

    #{ Invariants
    # Name of the environment variable that may point to an alternative .git directory
    k_env_git_repo = "GITDB_TEST_GIT_REPO_BASE"
    #} END invariants

    @classmethod
    def setUpClass(cls):
        """Set ``cls.gitrepopath`` from the environment variable named by
        ``k_env_git_repo``, falling back to the ``.git`` directory three
        levels above this file."""
        try:
            super(TestBase, cls).setUpClass()
        except AttributeError:
            # Tolerate base classes that do not provide setUpClass
            pass

        cls.gitrepopath = os.environ.get(cls.k_env_git_repo)
        if not cls.gitrepopath:
            log.info("You can set the %s environment variable to a .git repository of your choice"
                     " - defaulting to the gitdb repository", cls.k_env_git_repo)
            ospd = os.path.dirname
            cls.gitrepopath = os.path.join(ospd(ospd(ospd(__file__))), '.git')
        # end assure gitrepo is set
        assert cls.gitrepopath.endswith('.git'), \
            "%s must point to a .git directory, got %r" % (cls.k_env_git_repo, cls.gitrepopath)


#} END bases

#{ Decorators

def skip_on_travis_ci(func):
    """All tests decorated with this one will raise SkipTest when run on travis ci.
    Use it to workaround difficult to solve issues
    NOTE: copied from bcore (https://github.com/Byron/bcore)"""
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if 'TRAVIS' in os.environ:
            # unittest.SkipTest is understood by unittest itself and by the runners
            # built on top of it, so no third-party import is needed here.
            raise unittest.SkipTest("Cannot run on travis-ci")
        # end check for travis ci
        return func(self, *args, **kwargs)
    # end wrapper
    return wrapper


def with_rw_directory(func):
    """Create a temporary directory which can be written to, remove it if the
    test succeeds, but leave it otherwise to aid additional debugging

    The decorated function is called as ``func(self, path)``; the returned
    wrapper only takes ``self``."""

    def wrapper(self):
        # mkdtemp creates the directory atomically, unlike mktemp() + mkdir()
        # which leaves a window in which the name can be taken by someone else.
        path = tempfile.mkdtemp(prefix=func.__name__)
        keep = False
        try:
            try:
                return func(self, path)
            except Exception:
                sys.stderr.write("Test %s.%s failed, output is at %r\n" % (type(self).__name__, func.__name__, path))
                keep = True
                raise
        finally:
            # Need to collect here to be sure all handles have been closed. It appears
            # a windows-only issue. In fact things should be deleted, as well as
            # memory maps closed, once objects go out of scope. For some reason
            # though this is not the case here unless we collect explicitly.
            if not keep:
                if HIDE_WINDOWS_KNOWN_ERRORS:
                    ## Or else 2 Windows TCs fail with:
                    # File "D:\Work\gitdb.git\gitdb\util.py", line 141, in onerror
                    #     func(path)  # Will scream if still not possible to delete.
                    # PermissionError: [WinError 32] The process cannot access the file
                    #     because it is being used by another process: 'sss\\index_cc_wll5'
                    mman.collect()
                gc.collect()
                rmtree(path)
            # END handle exception
    # END wrapper

    # Only the name is copied: functools.wraps would also set __wrapped__, and
    # signature-introspecting runners would then see the inner `path` parameter,
    # which this wrapper supplies itself.
    wrapper.__name__ = func.__name__
    return wrapper


def with_packs_rw(func):
    """Function that provides a path into which the packs for testing should be
    copied. Will pass on the path to the actual function afterwards"""

    def wrapper(self, path):
        src_pack_glob = fixture_path('packs/*')
        copy_files_globbed(src_pack_glob, path, hard_link_ok=True)
        return func(self, path)
    # END wrapper

    wrapper.__name__ = func.__name__
    return wrapper

#} END decorators

#{ Routines


def fixture_path(relapath=''):
    """:return: absolute path into the fixture directory
    :param relapath: relative path into the fixtures directory, or ''
        to obtain the fixture directory itself"""
    return os.path.join(os.path.dirname(__file__), 'fixtures', relapath)


def copy_files_globbed(source_glob, target_dir, hard_link_ok=False):
    """Copy all files found according to the given source glob into the target directory
    :param hard_link_ok: if True, hard links will be created if possible. Otherwise
        the files will be copied"""
    # Linking is only attempted when requested and supported by this platform
    try_link = hard_link_ok and hasattr(os, 'link')
    for src_file in glob.glob(source_glob):
        if try_link:
            target = os.path.join(target_dir, os.path.basename(src_file))
            try:
                os.link(src_file, target)
            except OSError:
                shutil.copy(src_file, target_dir)
            # END handle cross device links ( and resulting failure )
        else:
            shutil.copy(src_file, target_dir)
        # END try hard link
    # END for each file to copy


def make_bytes(size_in_bytes, randomize=False):
    """:return: bytes made of ``size_in_bytes // 4`` serialized integers; the
        length equals ``size_in_bytes`` when that is a multiple of 4 and the
        array type 'i' occupies 4 bytes per item
    :param randomize: try to produce a very random stream"""
    actual_size = size_in_bytes // 4
    producer = xrange(actual_size)
    if randomize:
        producer = list(producer)
        random.shuffle(producer)
    # END randomize
    a = array('i', producer)
    # array.tostring() was removed in Python 3.9; tobytes() is its replacement.
    # Interpreters without tobytes() still get the old call.
    if hasattr(a, 'tobytes'):
        return a.tobytes()
    return a.tostring()


def make_object(otype, data):
    """:return: bytes resembling an uncompressed object: a ``<type> <size>``
        header terminated by a NUL byte, followed by the data
    :param otype: object type name, as bytes or as ascii-only text, e.g. b'blob'
    :param data: bytes forming the object's payload"""
    if not isinstance(otype, bytes):
        otype = otype.encode("ascii")
    header = otype + (" %i\0" % len(data)).encode("ascii")
    return header + data


def make_memory_file(size_in_bytes, randomize=False):
    """:return: tuple(size_of_stream, stream)
    :param randomize: try to produce a very random stream"""
    d = make_bytes(size_in_bytes, randomize)
    return len(d), BytesIO(d)

#} END routines

#{ Stream Utilities


class DummyStream(object):
    """Stream stand-in which records how it was used instead of providing data"""

    def __init__(self):
        self.was_read = False   # set once read() was called
        self.bytes = 0          # size passed to the most recent read() call
        self.closed = False     # set once close() was called

    def read(self, size):
        """Record the read request; no data is returned"""
        self.was_read = True
        self.bytes = size

    def close(self):
        """Record that the stream was closed"""
        self.closed = True

    def _assert(self):
        """Fail unless read() was called at least once"""
        assert self.was_read


#} END stream utilities