summaryrefslogtreecommitdiff
path: root/gitdb/test/lib.py
blob: d09b1cb8e2b11868127d3294a0bb98837bfe212a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Utilities used in ODB testing"""
from gitdb import OStream
from gitdb.utils.compat import xrange

import sys
import random
from array import array

from io import BytesIO

import glob
import unittest
import tempfile
import shutil
import os
import gc
from functools import wraps


#{ Bases

class TestBase(unittest.TestCase):
    """Base class for all tests"""


#} END bases

#{ Decorators

def skip_on_travis_ci(func):
    """All tests decorated with this one will raise SkipTest when run on travis ci.
    Use it to workaround difficult to solve issues
    NOTE: copied from bcore (https://github.com/Byron/bcore)"""
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if 'TRAVIS' in os.environ:
            import nose
            raise nose.SkipTest("Cannot run on travis-ci")
        # end check for travis ci
        return func(self, *args, **kwargs)
    # end wrapper
    return wrapper


def with_rw_directory(func):
    """Create a temporary directory which can be written to, remove it if the
    test suceeds, but leave it otherwise to aid additional debugging"""
    def wrapper(self):
        path = tempfile.mktemp(prefix=func.__name__)
        os.mkdir(path)
        keep = False
        try:
            try:
                return func(self, path)
            except Exception:
                sys.stderr.write("Test %s.%s failed, output is at %r\n" % (type(self).__name__, func.__name__, path))
                keep = True
                raise
        finally:
            # Need to collect here to be sure all handles have been closed. It appears
            # a windows-only issue. In fact things should be deleted, as well as
            # memory maps closed, once objects go out of scope. For some reason
            # though this is not the case here unless we collect explicitly.
            if not keep:
                gc.collect()
                shutil.rmtree(path)
        # END handle exception
    # END wrapper

    wrapper.__name__ = func.__name__
    return wrapper


def with_packs_rw(func):
    """Function that provides a path into which the packs for testing should be
    copied. Will pass on the path to the actual function afterwards"""
    def wrapper(self, path):
        src_pack_glob = fixture_path('packs/*')
        copy_files_globbed(src_pack_glob, path, hard_link_ok=True)
        return func(self, path)
    # END wrapper

    wrapper.__name__ = func.__name__
    return wrapper

#} END decorators

#{ Routines

def fixture_path(relapath=''):
    """:return: absolute path into the fixture directory
    :param relapath: relative path into the fixtures directory, or ''
        to obtain the fixture directory itself"""
    return os.path.join(os.path.dirname(__file__), 'fixtures', relapath)

def copy_files_globbed(source_glob, target_dir, hard_link_ok=False):
    """Copy all files found according to the given source glob into the target directory
    :param hard_link_ok: if True, hard links will be created if possible. Otherwise
        the files will be copied"""
    for src_file in glob.glob(source_glob):
        if hard_link_ok and hasattr(os, 'link'):
            target = os.path.join(target_dir, os.path.basename(src_file))
            try:
                os.link(src_file, target)
            except OSError:
                shutil.copy(src_file, target_dir)
            # END handle cross device links ( and resulting failure )
        else:
            shutil.copy(src_file, target_dir)
        # END try hard link
    # END for each file to copy


def make_bytes(size_in_bytes, randomize=False):
    """:return: string with given size in bytes
    :param randomize: try to produce a very random stream"""
    actual_size = size_in_bytes // 4
    producer = xrange(actual_size)
    if randomize:
        producer = list(producer)
        random.shuffle(producer)
    # END randomize
    a = array('i', producer)
    return a.tostring()

def make_object(type, data):
    """:return: bytes resembling an uncompressed object"""
    odata = "blob %i\0" % len(data)
    return odata.encode("ascii") + data

def make_memory_file(size_in_bytes, randomize=False):
    """:return: tuple(size_of_stream, stream)
    :param randomize: try to produce a very random stream"""
    d = make_bytes(size_in_bytes, randomize)
    return len(d), BytesIO(d)

#} END routines

#{ Stream Utilities

class DummyStream(object):
        def __init__(self):
            self.was_read = False
            self.bytes = 0
            self.closed = False

        def read(self, size):
            self.was_read = True
            self.bytes = size

        def close(self):
            self.closed = True

        def _assert(self):
            assert self.was_read


class DeriveTest(OStream):
    def __init__(self, sha, type, size, stream, *args, **kwargs):
        self.myarg = kwargs.pop('myarg')
        self.args = args

    def _assert(self):
        assert self.args
        assert self.myarg

#} END stream utilitiess