path: root/gitdb/test
Diffstat (limited to 'gitdb/test')
-rw-r--r--  gitdb/test/__init__.py                   9
-rw-r--r--  gitdb/test/db/test_base.py              17
-rw-r--r--  gitdb/test/lib.py                      199
-rw-r--r--  gitdb/test/object/test_blob.py          21
-rw-r--r--  gitdb/test/object/test_commit.py       274
-rw-r--r--  gitdb/test/object/test_tree.py         143
-rw-r--r--  gitdb/test/performance/lib.py           48
-rw-r--r--  gitdb/test/performance/test_pack.py     89
-rw-r--r--  gitdb/test/performance/test_stream.py  189
-rw-r--r--  gitdb/test/test_base.py                 91
-rw-r--r--  gitdb/test/test_config.py              101
-rw-r--r--  gitdb/test/test_example.py              63
-rw-r--r--  gitdb/test/test_pack.py                246
-rw-r--r--  gitdb/test/test_refs.py                 39
-rw-r--r--  gitdb/test/test_util.py                125
15 files changed, 20 insertions, 1634 deletions
diff --git a/gitdb/test/__init__.py b/gitdb/test/__init__.py
index 760f531..a19fc87 100644
--- a/gitdb/test/__init__.py
+++ b/gitdb/test/__init__.py
@@ -3,14 +3,5 @@
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
-import gitdb.util
-#{ Initialization
-def _init_pool():
- """Assure the pool is actually threaded"""
- size = 2
- print "Setting ThreadPool to %i" % size
- gitdb.util.pool.set_size(size)
-
-#} END initialization
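
Note: the removed hook above is Python 2 syntax and relied on the thread pool
that left gitdb together with the async dependency. For reference, a Python 3
sketch of the same hook, assuming an object with the old gitdb.util.pool
interface:

    import gitdb.util

    def _init_pool():
        """Assure the pool is actually threaded"""
        size = 2
        print("Setting ThreadPool to %i" % size)
        gitdb.util.pool.set_size(size)
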
diff --git a/gitdb/test/db/test_base.py b/gitdb/test/db/test_base.py
index 0a381be..8b13789 100644
--- a/gitdb/test/db/test_base.py
+++ b/gitdb/test/db/test_base.py
@@ -1,18 +1 @@
-# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
-#
-# This module is part of GitDB and is released under
-# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
-from lib import *
-from gitdb.db import RefSpec
-class TestBase(TestDBBase):
-
- @with_rw_directory
- def test_basics(self, path):
- self.failUnlessRaises(ValueError, RefSpec, None, None)
- rs = RefSpec(None, "something")
- assert rs.force == False
- assert rs.delete_destination()
- assert rs.source is None
- assert rs.destination == "something"
-
diff --git a/gitdb/test/lib.py b/gitdb/test/lib.py
index 9224f5f..8b13789 100644
--- a/gitdb/test/lib.py
+++ b/gitdb/test/lib.py
@@ -1,200 +1 @@
-# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
-#
-# This module is part of PureGitDB and is released under
-# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
-"""Utilities used in ODB testing"""
-from gitdb import OStream
-from gitdb.db.py import PureGitDB
-from gitdb.stream import (
- Sha1Writer,
- ZippedStoreShaWriter
- )
-
-from gitdb.util import zlib
-
-import sys
-import random
-from array import array
-from cStringIO import StringIO
-
-import glob
-import unittest
-import tempfile
-import shutil
-import os
-import gc
-
-
-#{ Decorators
-
-def with_rw_directory(func):
- """Create a temporary directory which can be written to, remove it if the
-	test succeeds, but leave it otherwise to aid additional debugging"""
- def wrapper(self):
- path = tempfile.mktemp(prefix=func.__name__)
- os.mkdir(path)
- keep = False
- try:
- try:
- return func(self, path)
- except Exception:
- print >> sys.stderr, "Test %s.%s failed, output is at %r" % (type(self).__name__, func.__name__, path)
- keep = True
- raise
- finally:
-			# Need to collect here to be sure all handles have been closed. This appears
-			# to be a Windows-only issue. In fact things should be deleted, as well as
-			# memory maps closed, once objects go out of scope. For some reason
-			# this is not the case here unless we collect explicitly.
- if not keep:
- gc.collect()
- shutil.rmtree(path)
- # END handle exception
- # END wrapper
-
- wrapper.__name__ = func.__name__
- return wrapper
-
-
-def with_rw_repo(func):
- """Create a copy of our repository and put it into a writable location. It will
- be removed if the test doesn't result in an error.
- As we can currently only copy the fully working tree, tests must not rely on
- being on a certain branch or on anything really except for the default tags
-	that should exist.
-	The wrapped function obtains a git repository."""
- def wrapper(self, path):
- src_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
- assert(os.path.isdir(path))
- os.rmdir(path) # created by wrapper, but must not exist for copy operation
- shutil.copytree(src_dir, path)
- target_gitdir = os.path.join(path, '.git')
- assert os.path.isdir(target_gitdir)
- return func(self, PureGitDB(target_gitdir))
- #END wrapper
- wrapper.__name__ = func.__name__
- return with_rw_directory(wrapper)
-
-
-
-def with_packs_rw(func):
- """Function that provides a path into which the packs for testing should be
- copied. Will pass on the path to the actual function afterwards
-
- :note: needs with_rw_directory wrapped around it"""
- def wrapper(self, path):
- src_pack_glob = fixture_path('packs/*')
- copy_files_globbed(src_pack_glob, path, hard_link_ok=True)
- return func(self, path)
- # END wrapper
-
- wrapper.__name__ = func.__name__
- return with_rw_directory(wrapper)
-
-#} END decorators
-
-#{ Routines
-
-def repo_dir():
- """:return: path to our own repository, being our own .git directory.
- :note: doesn't work in bare repositories"""
- base = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), '.git')
- assert os.path.isdir(base)
- return base
-
-
-def maketemp(*args):
- """Wrapper around default tempfile.mktemp to fix an osx issue"""
- tdir = tempfile.mktemp(*args)
- if sys.platform == 'darwin':
- tdir = '/private' + tdir
- return tdir
-
-def fixture_path(relapath=''):
- """:return: absolute path into the fixture directory
- :param relapath: relative path into the fixtures directory, or ''
- to obtain the fixture directory itself"""
- return os.path.join(os.path.dirname(__file__), 'fixtures', relapath)
-
-def copy_files_globbed(source_glob, target_dir, hard_link_ok=False):
- """Copy all files found according to the given source glob into the target directory
- :param hard_link_ok: if True, hard links will be created if possible. Otherwise
- the files will be copied"""
- for src_file in glob.glob(source_glob):
- if hard_link_ok and hasattr(os, 'link'):
- target = os.path.join(target_dir, os.path.basename(src_file))
- try:
- os.link(src_file, target)
- except OSError:
- shutil.copy(src_file, target_dir)
- # END handle cross device links ( and resulting failure )
- else:
- shutil.copy(src_file, target_dir)
- # END try hard link
- # END for each file to copy
-
-
-def make_bytes(size_in_bytes, randomize=False):
- """:return: string with given size in bytes
- :param randomize: try to produce a very random stream"""
- actual_size = size_in_bytes / 4
- producer = xrange(actual_size)
- if randomize:
- producer = list(producer)
- random.shuffle(producer)
- # END randomize
- a = array('i', producer)
- return a.tostring()
-
-def make_object(type, data):
- """:return: bytes resembling an uncompressed object"""
- odata = "blob %i\0" % len(data)
- return odata + data
-
-def make_memory_file(size_in_bytes, randomize=False):
- """:return: tuple(size_of_stream, stream)
- :param randomize: try to produce a very random stream"""
- d = make_bytes(size_in_bytes, randomize)
- return len(d), StringIO(d)
-
-#} END routines
-
-#{ Stream Utilities
-
-class DummyStream(object):
- def __init__(self):
- self.was_read = False
- self.bytes = 0
- self.closed = False
-
- def read(self, size):
- self.was_read = True
- self.bytes = size
-
- def close(self):
- self.closed = True
-
- def _assert(self):
- assert self.was_read
-
-
-class DeriveTest(OStream):
- def __init__(self, sha, type, size, stream, *args, **kwargs):
- self.myarg = kwargs.pop('myarg')
- self.args = args
-
- def _assert(self):
- assert self.args
- assert self.myarg
-
-#} END stream utilities
-
-#{ Bases
-
-class TestBase(unittest.TestCase):
- """Base class for all tests"""
-	# The non-database-specific tests just use a default pure git database
- rorepo = PureGitDB(repo_dir())
-
-#} END bases
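
Note: with_rw_directory is the most reusable helper deleted here. A Python 3
sketch of the same idea, using mkdtemp instead of the racy mktemp/mkdir pair:

    import functools
    import gc
    import shutil
    import sys
    import tempfile

    def with_rw_directory(func):
        """Pass a fresh writable directory to the test; keep it on failure."""
        @functools.wraps(func)
        def wrapper(self):
            path = tempfile.mkdtemp(prefix=func.__name__)
            keep = False
            try:
                return func(self, path)
            except Exception:
                print("Test %s.%s failed, output is at %r"
                      % (type(self).__name__, func.__name__, path), file=sys.stderr)
                keep = True
                raise
            finally:
                if not keep:
                    gc.collect()  # close lingering mmap handles (a Windows quirk)
                    shutil.rmtree(path)
        return wrapper
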
diff --git a/gitdb/test/object/test_blob.py b/gitdb/test/object/test_blob.py
index 661c050..139597f 100644
--- a/gitdb/test/object/test_blob.py
+++ b/gitdb/test/object/test_blob.py
@@ -1,23 +1,2 @@
-# test_blob.py
-# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
-#
-# This module is part of GitPython and is released under
-# the BSD License: http://www.opensource.org/licenses/bsd-license.php
-from git.test.lib import *
-from git import *
-from gitdb.util import hex_to_bin
-class TestBlob(TestBase):
-
- def test_mime_type_should_return_mime_type_for_known_types(self):
- blob = Blob(self.rorepo, **{'binsha': Blob.NULL_BIN_SHA, 'path': 'foo.png'})
- assert_equal("image/png", blob.mime_type)
-
- def test_mime_type_should_return_text_plain_for_unknown_types(self):
- blob = Blob(self.rorepo, **{'binsha': Blob.NULL_BIN_SHA,'path': 'something'})
- assert_equal("text/plain", blob.mime_type)
-
- def test_nodict(self):
- self.failUnlessRaises(AttributeError, setattr, self.rorepo.tree()['AUTHORS'], 'someattr', 2)
-
diff --git a/gitdb/test/object/test_commit.py b/gitdb/test/object/test_commit.py
index 4a8d8b8..8b13789 100644
--- a/gitdb/test/object/test_commit.py
+++ b/gitdb/test/object/test_commit.py
@@ -1,275 +1 @@
-# -*- coding: utf-8 -*-
-# test_commit.py
-# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
-#
-# This module is part of GitPython and is released under
-# the BSD License: http://www.opensource.org/licenses/bsd-license.php
-from git.test.lib import *
-from git import *
-from gitdb import IStream
-from gitdb.util import hex_to_bin
-
-from cStringIO import StringIO
-import time
-import sys
-
-
-def assert_commit_serialization(rwrepo, commit_id, print_performance_info=False):
- """traverse all commits in the history of commit identified by commit_id and check
- if the serialization works.
- :param print_performance_info: if True, we will show how fast we are"""
- ns = 0 # num serializations
- nds = 0 # num deserializations
-
- st = time.time()
- for cm in rwrepo.commit(commit_id).traverse():
- nds += 1
-
- # assert that we deserialize commits correctly, hence we get the same
- # sha on serialization
- stream = StringIO()
- cm._serialize(stream)
- ns += 1
- streamlen = stream.tell()
- stream.seek(0)
-
- istream = rwrepo.odb.store(IStream(Commit.type, streamlen, stream))
- assert istream.hexsha == cm.hexsha
-
- nc = Commit(rwrepo, Commit.NULL_BIN_SHA, cm.tree,
- cm.author, cm.authored_date, cm.author_tz_offset,
- cm.committer, cm.committed_date, cm.committer_tz_offset,
- cm.message, cm.parents, cm.encoding)
-
- assert nc.parents == cm.parents
- stream = StringIO()
- nc._serialize(stream)
- ns += 1
- streamlen = stream.tell()
- stream.seek(0)
-
- # reuse istream
- istream.size = streamlen
- istream.stream = stream
- istream.binsha = None
- nc.binsha = rwrepo.odb.store(istream).binsha
-
- # if it worked, we have exactly the same contents !
- assert nc.hexsha == cm.hexsha
- # END check commits
- elapsed = time.time() - st
-
- if print_performance_info:
-		print >> sys.stderr, "Serialized %i and deserialized %i commits in %f s ( (%f, %f) commits / s )" % (ns, nds, elapsed, ns/elapsed, nds/elapsed)
- # END handle performance info
-
-
-class TestCommit(TestBase):
-
- def test_bake(self):
-
- commit = self.rorepo.commit('2454ae89983a4496a445ce347d7a41c0bb0ea7ae')
- # commits have no dict
- self.failUnlessRaises(AttributeError, setattr, commit, 'someattr', 1)
- commit.author # bake
-
- assert_equal("Sebastian Thiel", commit.author.name)
- assert_equal("byronimo@gmail.com", commit.author.email)
- assert commit.author == commit.committer
- assert isinstance(commit.authored_date, int) and isinstance(commit.committed_date, int)
- assert isinstance(commit.author_tz_offset, int) and isinstance(commit.committer_tz_offset, int)
- assert commit.message == "Added missing information to docstrings of commit and stats module\n"
-
-
- def test_stats(self):
- commit = self.rorepo.commit('33ebe7acec14b25c5f84f35a664803fcab2f7781')
- stats = commit.stats
-
- def check_entries(d):
- assert isinstance(d, dict)
- for key in ("insertions", "deletions", "lines"):
- assert key in d
- # END assertion helper
- assert stats.files
- assert stats.total
-
- check_entries(stats.total)
- assert "files" in stats.total
-
- for filepath, d in stats.files.items():
- check_entries(d)
- # END for each stated file
-
- # assure data is parsed properly
- michael = Actor._from_string("Michael Trier <mtrier@gmail.com>")
- assert commit.author == michael
- assert commit.committer == michael
- assert commit.authored_date == 1210193388
- assert commit.committed_date == 1210193388
- assert commit.author_tz_offset == 14400, commit.author_tz_offset
- assert commit.committer_tz_offset == 14400, commit.committer_tz_offset
- assert commit.message == "initial project\n"
-
- def test_unicode_actor(self):
- # assure we can parse unicode actors correctly
- name = "Üäöß ÄußÉ".decode("utf-8")
- assert len(name) == 9
- special = Actor._from_string(u"%s <something@this.com>" % name)
- assert special.name == name
- assert isinstance(special.name, unicode)
-
- def test_traversal(self):
- start = self.rorepo.commit("a4d06724202afccd2b5c54f81bcf2bf26dea7fff")
- first = self.rorepo.commit("33ebe7acec14b25c5f84f35a664803fcab2f7781")
- p0 = start.parents[0]
- p1 = start.parents[1]
- p00 = p0.parents[0]
- p10 = p1.parents[0]
-
- # basic branch first, depth first
- dfirst = start.traverse(branch_first=False)
- bfirst = start.traverse(branch_first=True)
- assert dfirst.next() == p0
- assert dfirst.next() == p00
-
- assert bfirst.next() == p0
- assert bfirst.next() == p1
- assert bfirst.next() == p00
- assert bfirst.next() == p10
-
- # at some point, both iterations should stop
- assert list(bfirst)[-1] == first
- stoptraverse = self.rorepo.commit("254d04aa3180eb8b8daf7b7ff25f010cd69b4e7d").traverse(as_edge=True)
- l = list(stoptraverse)
- assert len(l[0]) == 2
-
- # ignore self
- assert start.traverse(ignore_self=False).next() == start
-
- # depth
- assert len(list(start.traverse(ignore_self=False, depth=0))) == 1
-
- # prune
- assert start.traverse(branch_first=1, prune=lambda i,d: i==p0).next() == p1
-
- # predicate
- assert start.traverse(branch_first=1, predicate=lambda i,d: i==p1).next() == p1
-
- # traversal should stop when the beginning is reached
- self.failUnlessRaises(StopIteration, first.traverse().next)
-
- # parents of the first commit should be empty ( as the only parent has a null
- # sha )
- assert len(first.parents) == 0
-
- def test_iteration(self):
- # we can iterate commits
- all_commits = Commit.list_items(self.rorepo, self.rorepo.head)
- assert all_commits
- assert all_commits == list(self.rorepo.iter_commits())
-
- # this includes merge commits
- mcomit = self.rorepo.commit('d884adc80c80300b4cc05321494713904ef1df2d')
- assert mcomit in all_commits
-
- # we can limit the result to paths
- ltd_commits = list(self.rorepo.iter_commits(paths='CHANGES'))
- assert ltd_commits and len(ltd_commits) < len(all_commits)
-
- # show commits of multiple paths, resulting in a union of commits
- less_ltd_commits = list(Commit.iter_items(self.rorepo, 'master', paths=('CHANGES', 'AUTHORS')))
- assert len(ltd_commits) < len(less_ltd_commits)
-
- def test_iter_items(self):
- # pretty not allowed
- self.failUnlessRaises(ValueError, Commit.iter_items, self.rorepo, 'master', pretty="raw")
-
- def test_rev_list_bisect_all(self):
- """
- 'git rev-list --bisect-all' returns additional information
- in the commit header. This test ensures that we properly parse it.
- """
- revs = self.rorepo.git.rev_list('933d23bf95a5bd1624fbcdf328d904e1fa173474',
- first_parent=True,
- bisect_all=True)
-
- commits = Commit._iter_from_process_or_stream(self.rorepo, StringProcessAdapter(revs))
- expected_ids = (
- '7156cece3c49544abb6bf7a0c218eb36646fad6d',
- '1f66cfbbce58b4b552b041707a12d437cc5f400a',
- '33ebe7acec14b25c5f84f35a664803fcab2f7781',
- '933d23bf95a5bd1624fbcdf328d904e1fa173474'
- )
- for sha1, commit in zip(expected_ids, commits):
- assert_equal(sha1, commit.hexsha)
-
- def test_count(self):
- assert self.rorepo.tag('refs/tags/0.1.5').commit.count( ) == 143
-
- def test_list(self):
- assert isinstance(Commit.list_items(self.rorepo, '0.1.5', max_count=5)[hex_to_bin('5117c9c8a4d3af19a9958677e45cda9269de1541')], Commit)
-
- def test_str(self):
- commit = Commit(self.rorepo, Commit.NULL_BIN_SHA)
- assert_equal(Commit.NULL_HEX_SHA, str(commit))
-
- def test_repr(self):
- commit = Commit(self.rorepo, Commit.NULL_BIN_SHA)
- assert_equal('<git.Commit "%s">' % Commit.NULL_HEX_SHA, repr(commit))
-
- def test_equality(self):
- commit1 = Commit(self.rorepo, Commit.NULL_BIN_SHA)
- commit2 = Commit(self.rorepo, Commit.NULL_BIN_SHA)
- commit3 = Commit(self.rorepo, "\1"*20)
- assert_equal(commit1, commit2)
- assert_not_equal(commit2, commit3)
-
- def test_iter_parents(self):
- # should return all but ourselves, even if skip is defined
- c = self.rorepo.commit('0.1.5')
- for skip in (0, 1):
- piter = c.iter_parents(skip=skip)
- first_parent = piter.next()
- assert first_parent != c
- assert first_parent == c.parents[0]
- # END for each
-
- def test_base(self):
- name_rev = self.rorepo.head.commit.name_rev
- assert isinstance(name_rev, basestring)
-
- @with_rw_repo('HEAD', bare=True)
- def test_serialization(self, rwrepo):
- # create all commits of our repo
- assert_commit_serialization(rwrepo, '0.1.6')
-
- def test_serialization_unicode_support(self):
- assert Commit.default_encoding.lower() == 'utf-8'
-
- # create a commit with unicode in the message, and the author's name
- # Verify its serialization and deserialization
- cmt = self.rorepo.commit('0.1.6')
- assert isinstance(cmt.message, unicode) # it automatically decodes it as such
- assert isinstance(cmt.author.name, unicode) # same here
-
- cmt.message = "üäêèß".decode("utf-8")
- assert len(cmt.message) == 5
-
- cmt.author.name = "äüß".decode("utf-8")
- assert len(cmt.author.name) == 3
-
- cstream = StringIO()
- cmt._serialize(cstream)
- cstream.seek(0)
- assert len(cstream.getvalue())
-
- ncmt = Commit(self.rorepo, cmt.binsha)
- ncmt._deserialize(cstream)
-
- assert cmt.author.name == ncmt.author.name
- assert cmt.message == ncmt.message
-		# it can't actually be printed in a shell, as repr apparently
-		# wants to have ascii only
- cmt.author.__repr__()
-
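
Note: the heart of the deleted assert_commit_serialization helper is a
round-trip check: serialize a commit, store the bytes, and require the
identical sha. A condensed Python 3 sketch (GitPython Commit plus a gitdb
IStream; _serialize is private API, as in the removed test, and hexsha may
come back as bytes on Python 3 depending on the gitdb version):

    import io
    from git import Commit
    from gitdb import IStream

    def assert_roundtrip(repo, commit):
        stream = io.BytesIO()
        commit._serialize(stream)          # private API, as in the removed test
        length = stream.tell()
        stream.seek(0)
        istream = repo.odb.store(IStream(Commit.type, length, stream))
        hexsha = istream.hexsha
        if isinstance(hexsha, bytes):      # gitdb may return bytes on Python 3
            hexsha = hexsha.decode("ascii")
        assert hexsha == commit.hexsha
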
diff --git a/gitdb/test/object/test_tree.py b/gitdb/test/object/test_tree.py
index ec10e96..8b13789 100644
--- a/gitdb/test/object/test_tree.py
+++ b/gitdb/test/object/test_tree.py
@@ -1,144 +1 @@
-# test_tree.py
-# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
-#
-# This module is part of GitPython and is released under
-# the BSD License: http://www.opensource.org/licenses/bsd-license.php
-import os
-from git.test.lib import *
-from git import *
-from git.objects.fun import (
- traverse_tree_recursive,
- traverse_trees_recursive
- )
-from cStringIO import StringIO
-
-class TestTree(TestBase):
-
- def test_serializable(self):
- # tree at the given commit contains a submodule as well
- roottree = self.rorepo.tree('6c1faef799095f3990e9970bc2cb10aa0221cf9c')
- for item in roottree.traverse(ignore_self=False):
- if item.type != Tree.type:
- continue
- # END skip non-trees
- tree = item
- # trees have no dict
- self.failUnlessRaises(AttributeError, setattr, tree, 'someattr', 1)
-
- orig_data = tree.data_stream.read()
- orig_cache = tree._cache
-
- stream = StringIO()
- tree._serialize(stream)
- assert stream.getvalue() == orig_data
-
- stream.seek(0)
- testtree = Tree(self.rorepo, Tree.NULL_BIN_SHA, 0, '')
- testtree._deserialize(stream)
- assert testtree._cache == orig_cache
-
-
- # TEST CACHE MUTATOR
- mod = testtree.cache
- self.failUnlessRaises(ValueError, mod.add, "invalid sha", 0, "name")
- self.failUnlessRaises(ValueError, mod.add, Tree.NULL_HEX_SHA, 0, "invalid mode")
- self.failUnlessRaises(ValueError, mod.add, Tree.NULL_HEX_SHA, tree.mode, "invalid/name")
-
- # add new item
- name = "fake_dir"
- mod.add(testtree.NULL_HEX_SHA, tree.mode, name)
- assert name in testtree
-
-		# it's available in the tree immediately
- assert isinstance(testtree[name], Tree)
-
-		# adding it again will not cause multiple of them to be present
- cur_count = len(testtree)
- mod.add(testtree.NULL_HEX_SHA, tree.mode, name)
- assert len(testtree) == cur_count
-
- # fails with a different sha - name exists
- hexsha = "1"*40
- self.failUnlessRaises(ValueError, mod.add, hexsha, tree.mode, name)
-
- # force it - replace existing one
- mod.add(hexsha, tree.mode, name, force=True)
- assert testtree[name].hexsha == hexsha
- assert len(testtree) == cur_count
-
- # unchecked addition always works, even with invalid items
- invalid_name = "hi/there"
- mod.add_unchecked(hexsha, 0, invalid_name)
- assert len(testtree) == cur_count + 1
-
- del(mod[invalid_name])
- assert len(testtree) == cur_count
-		# del again, it's fine
- del(mod[invalid_name])
-
- # have added one item, we are done
- mod.set_done()
- mod.set_done() # multiple times are okay
-
- # serialize, its different now
- stream = StringIO()
- testtree._serialize(stream)
- stream.seek(0)
- assert stream.getvalue() != orig_data
-
- # replaces cache, but we make sure of it
- del(testtree._cache)
- testtree._deserialize(stream)
- assert name in testtree
- assert invalid_name not in testtree
- # END for each item in tree
-
- def test_traverse(self):
- root = self.rorepo.tree('0.1.6')
- num_recursive = 0
- all_items = list()
- for obj in root.traverse():
- if "/" in obj.path:
- num_recursive += 1
-
- assert isinstance(obj, (Blob, Tree))
- all_items.append(obj)
- # END for each object
- assert all_items == root.list_traverse()
-
- # limit recursion level to 0 - should be same as default iteration
- assert all_items
- assert 'CHANGES' in root
- assert len(list(root)) == len(list(root.traverse(depth=1)))
-
- # only choose trees
- trees_only = lambda i,d: i.type == "tree"
- trees = list(root.traverse(predicate = trees_only))
- assert len(trees) == len(list( i for i in root.traverse() if trees_only(i,0) ))
-
- # test prune
- lib_folder = lambda t,d: t.path == "lib"
- pruned_trees = list(root.traverse(predicate = trees_only,prune = lib_folder))
- assert len(pruned_trees) < len(trees)
-
- # trees and blobs
- assert len(set(trees)|set(root.trees)) == len(trees)
- assert len(set(b for b in root if isinstance(b, Blob)) | set(root.blobs)) == len( root.blobs )
- subitem = trees[0][0]
- assert "/" in subitem.path
- assert subitem.name == os.path.basename(subitem.path)
-
- # assure that at some point the traversed paths have a slash in them
- found_slash = False
- for item in root.traverse():
- assert os.path.isabs(item.abspath)
- if '/' in item.path:
- found_slash = True
- # END check for slash
-
- # slashes in paths are supported as well
- assert root[item.path] == item == root/item.path
- # END for each item
- assert found_slash
-
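
Note: the deleted tree test documents the TreeModifier workflow: obtain the
mutator via tree.cache, add or replace entries, then call set_done() before
serializing. A minimal sketch of that flow (the repository path is a
placeholder):

    from io import BytesIO
    from git import Repo, Tree

    repo = Repo("/path/to/repo")                # placeholder
    tree = repo.tree()
    mod = tree.cache                            # TreeModifier over the entry cache
    mod.add(Tree.NULL_HEX_SHA, tree.mode, "fake_dir")  # validated addition
    mod.set_done()                              # finalize; safe to call repeatedly
    stream = BytesIO()
    tree._serialize(stream)                     # the new entry is serialized too
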
diff --git a/gitdb/test/performance/lib.py b/gitdb/test/performance/lib.py
index 761113d..cb6303f 100644
--- a/gitdb/test/performance/lib.py
+++ b/gitdb/test/performance/lib.py
@@ -3,52 +3,4 @@
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Contains library functions"""
-import os
-from gitdb.test.lib import *
-import shutil
-import tempfile
-
-#{ Invariants
-k_env_git_repo = "GITDB_TEST_GIT_REPO_BASE"
-#} END invariants
-
-
-#{ Utilities
-def resolve_or_fail(env_var):
- """:return: resolved environment variable or raise EnvironmentError"""
- try:
- return os.environ[env_var]
- except KeyError:
-		raise EnvironmentError("Please set the %r environment variable and retry" % env_var)
- # END exception handling
-
-#} END utilities
-
-
-#{ Base Classes
-
-class TestBigRepoR(TestBase):
- """TestCase providing access to readonly 'big' repositories using the following
- member variables:
-
- * gitrepopath
-
-	  read-only base path of the git source repository, i.e. .../git/.git"""
-
- #{ Invariants
- head_sha_2k = '235d521da60e4699e5bd59ac658b5b48bd76ddca'
- head_sha_50 = '32347c375250fd470973a5d76185cac718955fd5'
- #} END invariants
-
- @classmethod
- def setUpAll(cls):
- try:
- super(TestBigRepoR, cls).setUpAll()
- except AttributeError:
- pass
- cls.gitrepopath = resolve_or_fail(k_env_git_repo)
- assert cls.gitrepopath.endswith('.git')
-
-
-#} END base classes
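
Note: the performance suite locates its fixture repository through the
GITDB_TEST_GIT_REPO_BASE environment variable. The deleted resolver in
Python 3 form:

    import os

    def resolve_or_fail(env_var):
        """:return: resolved environment variable or raise EnvironmentError"""
        try:
            return os.environ[env_var]
        except KeyError:
            raise EnvironmentError(
                "Please set the %r environment variable and retry" % env_var)

    # e.g. export GITDB_TEST_GIT_REPO_BASE=/path/to/git/.git before running
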
diff --git a/gitdb/test/performance/test_pack.py b/gitdb/test/performance/test_pack.py
index da952b1..8b13789 100644
--- a/gitdb/test/performance/test_pack.py
+++ b/gitdb/test/performance/test_pack.py
@@ -1,90 +1 @@
-# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
-#
-# This module is part of GitDB and is released under
-# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
-"""Performance tests for object store"""
-from lib import (
- TestBigRepoR
- )
-from gitdb.exc import UnsupportedOperation
-from gitdb.db.pack import PackedDB
-
-import sys
-import os
-from time import time
-import random
-
-class TestPackedDBPerformance(TestBigRepoR):
-
- def _test_pack_random_access(self):
- pdb = PackedDB(os.path.join(self.gitrepopath, "objects/pack"))
-
- # sha lookup
- st = time()
- sha_list = list(pdb.sha_iter())
- elapsed = time() - st
- ns = len(sha_list)
- print >> sys.stderr, "PDB: looked up %i shas by index in %f s ( %f shas/s )" % (ns, elapsed, ns / elapsed)
-
- # sha lookup: best-case and worst case access
- pdb_pack_info = pdb._pack_info
- # END shuffle shas
- st = time()
- for sha in sha_list:
- pdb_pack_info(sha)
- # END for each sha to look up
- elapsed = time() - st
-
- # discard cache
- del(pdb._entities)
- pdb.entities()
- print >> sys.stderr, "PDB: looked up %i sha in %i packs in %f s ( %f shas/s )" % (ns, len(pdb.entities()), elapsed, ns / elapsed)
- # END for each random mode
-
- # query info and streams only
- max_items = 10000 # can wait longer when testing memory
- for pdb_fun in (pdb.info, pdb.stream):
- st = time()
- for sha in sha_list[:max_items]:
- pdb_fun(sha)
- elapsed = time() - st
- print >> sys.stderr, "PDB: Obtained %i object %s by sha in %f s ( %f items/s )" % (max_items, pdb_fun.__name__.upper(), elapsed, max_items / elapsed)
- # END for each function
-
- # retrieve stream and read all
- max_items = 5000
- pdb_stream = pdb.stream
- total_size = 0
- st = time()
- for sha in sha_list[:max_items]:
- stream = pdb_stream(sha)
- stream.read()
- total_size += stream.size
- elapsed = time() - st
- total_kib = total_size / 1000
-		print >> sys.stderr, "PDB: Obtained %i streams by sha and read all bytes totalling %i KiB ( %f KiB / s ) in %f s ( %f streams/s )" % (max_items, total_kib, total_kib/elapsed, elapsed, max_items / elapsed)
-
- def test_correctness(self):
- pdb = PackedDB(os.path.join(self.gitrepopath, "objects/pack"))
-		# disabled for now as it used to work perfectly; checking big repositories takes a long time
- print >> sys.stderr, "Endurance run: verify streaming of objects (crc and sha)"
- for crc in range(2):
- count = 0
- st = time()
- for entity in pdb.entities():
- pack_verify = entity.is_valid_stream
- sha_by_index = entity.index().sha
- for index in xrange(entity.index().size()):
- try:
- assert pack_verify(sha_by_index(index), use_crc=crc)
- count += 1
- except UnsupportedOperation:
- pass
- # END ignore old indices
- # END for each index
- # END for each entity
- elapsed = time() - st
- print >> sys.stderr, "PDB: verified %i objects (crc=%i) in %f s ( %f objects/s )" % (count, crc, elapsed, count / elapsed)
- # END for each verify mode
-
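
Note: the endurance test above boils down to opening a PackedDB over a pack
directory and CRC/sha-verifying every indexed object of every pack entity.
A Python 3 sketch (the path is a placeholder; old pre-v2 indices raise
UnsupportedOperation on CRC checks and are skipped, as in the removed code):

    from gitdb.db.pack import PackedDB
    from gitdb.exc import UnsupportedOperation

    pdb = PackedDB("/path/to/repo/.git/objects/pack")   # placeholder
    for entity in pdb.entities():
        index = entity.index()
        for i in range(index.size()):
            try:
                assert entity.is_valid_stream(index.sha(i), use_crc=True)
            except UnsupportedOperation:
                pass                                    # old index: no CRCs stored
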
diff --git a/gitdb/test/performance/test_stream.py b/gitdb/test/performance/test_stream.py
index 0e47484..d64346e 100644
--- a/gitdb/test/performance/test_stream.py
+++ b/gitdb/test/performance/test_stream.py
@@ -3,193 +3,4 @@
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Performance data streaming performance"""
-from lib import TestBigRepoR
-from gitdb.db.py import *
-from gitdb.base import *
-from gitdb.stream import *
-from gitdb.util import (
- pool,
- bin_to_hex
- )
-from gitdb.typ import str_blob_type
-from gitdb.fun import chunk_size
-from async import (
- IteratorReader,
- ChannelThreadTask,
- )
-
-from cStringIO import StringIO
-from time import time
-import os
-import sys
-import stat
-import subprocess
-
-
-from lib import (
- TestBigRepoR,
- make_memory_file,
- with_rw_directory
- )
-
-
-#{ Utilities
-def read_chunked_stream(stream):
- total = 0
- while True:
- chunk = stream.read(chunk_size)
- total += len(chunk)
- if len(chunk) < chunk_size:
- break
- # END read stream loop
- assert total == stream.size
- return stream
-
-
-class TestStreamReader(ChannelThreadTask):
- """Expects input streams and reads them in chunks. It will read one at a time,
-	requiring a queue chunk of size 1"""
- def __init__(self, *args):
- super(TestStreamReader, self).__init__(*args)
- self.fun = read_chunked_stream
- self.max_chunksize = 1
-
-
-#} END utilities
-
-class TestObjDBPerformance(TestBigRepoR):
-
- large_data_size_bytes = 1000*1000*50 # some MiB should do it
- moderate_data_size_bytes = 1000*1000*1 # just 1 MiB
-
- @with_rw_directory
- def test_large_data_streaming(self, path):
- ldb = PureLooseObjectODB(path)
- string_ios = list() # list of streams we previously created
-
- # serial mode
- for randomize in range(2):
- desc = (randomize and 'random ') or ''
- print >> sys.stderr, "Creating %s data ..." % desc
- st = time()
- size, stream = make_memory_file(self.large_data_size_bytes, randomize)
- elapsed = time() - st
- print >> sys.stderr, "Done (in %f s)" % elapsed
- string_ios.append(stream)
-
- # writing - due to the compression it will seem faster than it is
- st = time()
- sha = ldb.store(IStream('blob', size, stream)).binsha
- elapsed_add = time() - st
- assert ldb.has_object(sha)
- db_file = ldb.readable_db_object_path(bin_to_hex(sha))
- fsize_kib = os.path.getsize(db_file) / 1000
-
-
- size_kib = size / 1000
- print >> sys.stderr, "Added %i KiB (filesize = %i KiB) of %s data to loose odb in %f s ( %f Write KiB / s)" % (size_kib, fsize_kib, desc, elapsed_add, size_kib / elapsed_add)
-
- # reading all at once
- st = time()
- ostream = ldb.stream(sha)
- shadata = ostream.read()
- elapsed_readall = time() - st
-
- stream.seek(0)
- assert shadata == stream.getvalue()
- print >> sys.stderr, "Read %i KiB of %s data at once from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, elapsed_readall, size_kib / elapsed_readall)
-
-
- # reading in chunks of 1 MiB
- cs = 512*1000
- chunks = list()
- st = time()
- ostream = ldb.stream(sha)
- while True:
- data = ostream.read(cs)
- chunks.append(data)
- if len(data) < cs:
- break
- # END read in chunks
- elapsed_readchunks = time() - st
-
- stream.seek(0)
- assert ''.join(chunks) == stream.getvalue()
-
- cs_kib = cs / 1000
- print >> sys.stderr, "Read %i KiB of %s data in %i KiB chunks from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, cs_kib, elapsed_readchunks, size_kib / elapsed_readchunks)
-
- # del db file so we keep something to do
- os.remove(db_file)
- # END for each randomization factor
-
-
- # multi-threaded mode
-		# want two, should be supported by most of today's CPUs
- pool.set_size(2)
- total_kib = 0
- nsios = len(string_ios)
- for stream in string_ios:
- stream.seek(0)
- total_kib += len(stream.getvalue()) / 1000
- # END rewind
-
- def istream_iter():
- for stream in string_ios:
- stream.seek(0)
- yield IStream(str_blob_type, len(stream.getvalue()), stream)
- # END for each stream
- # END util
-
- # write multiple objects at once, involving concurrent compression
- reader = IteratorReader(istream_iter())
- istream_reader = ldb.store_async(reader)
- istream_reader.task().max_chunksize = 1
-
- st = time()
- istreams = istream_reader.read(nsios)
- assert len(istreams) == nsios
- elapsed = time() - st
-
- print >> sys.stderr, "Threads(%i): Compressed %i KiB of data in loose odb in %f s ( %f Write KiB / s)" % (pool.size(), total_kib, elapsed, total_kib / elapsed)
-
- # decompress multiple at once, by reading them
- # chunk size is not important as the stream will not really be decompressed
-
-		# until it's read
- istream_reader = IteratorReader(iter([ i.binsha for i in istreams ]))
- ostream_reader = ldb.stream_async(istream_reader)
-
- chunk_task = TestStreamReader(ostream_reader, "chunker", None)
- output_reader = pool.add_task(chunk_task)
- output_reader.task().max_chunksize = 1
-
- st = time()
- assert len(output_reader.read(nsios)) == nsios
- elapsed = time() - st
-
- print >> sys.stderr, "Threads(%i): Decompressed %i KiB of data in loose odb in %f s ( %f Read KiB / s)" % (pool.size(), total_kib, elapsed, total_kib / elapsed)
-
- # store the files, and read them back. For the reading, we use a task
- # as well which is chunked into one item per task. Reading all will
- # very quickly result in two threads handling two bytestreams of
- # chained compression/decompression streams
- reader = IteratorReader(istream_iter())
- istream_reader = ldb.store_async(reader)
- istream_reader.task().max_chunksize = 1
-
- istream_to_sha = lambda items: [ i.binsha for i in items ]
- istream_reader.set_post_cb(istream_to_sha)
-
- ostream_reader = ldb.stream_async(istream_reader)
-
- chunk_task = TestStreamReader(ostream_reader, "chunker", None)
- output_reader = pool.add_task(chunk_task)
- output_reader.max_chunksize = 1
-
- st = time()
- assert len(output_reader.read(nsios)) == nsios
- elapsed = time() - st
-
- print >> sys.stderr, "Threads(%i): Compressed and decompressed and read %i KiB of data in loose odb in %f s ( %f Combined KiB / s)" % (pool.size(), total_kib, elapsed, total_kib / elapsed)
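
Note: everything async in this file depended on the removed async package;
only the serial pattern survives. Its essence: store a blob stream into a
loose object database, then read it back in chunks. A Python 3 sketch with
placeholder data and path (the directory must exist; LooseObjectDB is
assumed to be the current name of what this era called PureLooseObjectODB):

    from io import BytesIO
    from gitdb import IStream
    from gitdb.db import LooseObjectDB

    ldb = LooseObjectDB("/tmp/objects")             # placeholder path
    data = b"x" * (1000 * 1000)
    binsha = ldb.store(IStream(b"blob", len(data), BytesIO(data))).binsha

    cs = 512 * 1000                                 # chunk size from the test above
    ostream = ldb.stream(binsha)
    chunks = []
    while True:
        chunk = ostream.read(cs)
        chunks.append(chunk)
        if len(chunk) < cs:
            break
    assert b"".join(chunks) == data
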
diff --git a/gitdb/test/test_base.py b/gitdb/test/test_base.py
index 1b20faf..fbe0135 100644
--- a/gitdb/test/test_base.py
+++ b/gitdb/test/test_base.py
@@ -3,96 +3,5 @@
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Test for object db"""
-from lib import (
- TestBase,
- DummyStream,
- DeriveTest,
- )
-from gitdb import *
-from gitdb.util import (
- NULL_BIN_SHA
- )
-from gitdb.typ import (
- str_blob_type
- )
-
-
-class TestBaseTypes(TestBase):
-
- def test_streams(self):
- # test info
- sha = NULL_BIN_SHA
- s = 20
- blob_id = 3
-
- info = OInfo(sha, str_blob_type, s)
- assert info.binsha == sha
- assert info.type == str_blob_type
- assert info.type_id == blob_id
- assert info.size == s
-
- # test pack info
- # provides type_id
- pinfo = OPackInfo(0, blob_id, s)
- assert pinfo.type == str_blob_type
- assert pinfo.type_id == blob_id
- assert pinfo.pack_offset == 0
-
- dpinfo = ODeltaPackInfo(0, blob_id, s, sha)
- assert dpinfo.type == str_blob_type
- assert dpinfo.type_id == blob_id
- assert dpinfo.delta_info == sha
- assert dpinfo.pack_offset == 0
-
-
- # test ostream
- stream = DummyStream()
- ostream = OStream(*(info + (stream, )))
- assert ostream.stream is stream
- ostream.read(15)
- stream._assert()
- assert stream.bytes == 15
- ostream.read(20)
- assert stream.bytes == 20
-
- # test packstream
- postream = OPackStream(*(pinfo + (stream, )))
- assert postream.stream is stream
- postream.read(10)
- stream._assert()
- assert stream.bytes == 10
-
- # test deltapackstream
- dpostream = ODeltaPackStream(*(dpinfo + (stream, )))
-		assert dpostream.stream is stream
- dpostream.read(5)
- stream._assert()
- assert stream.bytes == 5
-
- # derive with own args
- DeriveTest(sha, str_blob_type, s, stream, 'mine',myarg = 3)._assert()
-
- # test istream
- istream = IStream(str_blob_type, s, stream)
- assert istream.binsha == None
- istream.binsha = sha
- assert istream.binsha == sha
-
- assert len(istream.binsha) == 20
- assert len(istream.hexsha) == 40
-
- assert istream.size == s
- istream.size = s * 2
-		assert istream.size == s * 2
- assert istream.type == str_blob_type
- istream.type = "something"
- assert istream.type == "something"
- assert istream.stream is stream
- istream.stream = None
- assert istream.stream is None
-
- assert istream.error is None
- istream.error = Exception()
- assert isinstance(istream.error, Exception)
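
Note: the deleted test documents that OInfo and its relatives are plain tuple
subclasses: an OStream is literally an OInfo with the raw stream appended,
which is why construction via tuple concatenation works. An illustration
(Python 3; whether the type tag is bytes or str depends on the gitdb version):

    from io import BytesIO
    from gitdb import OInfo, OStream
    from gitdb.util import NULL_BIN_SHA

    info = OInfo(NULL_BIN_SHA, b"blob", 20)     # (binsha, type, size)
    stream = BytesIO(b"\0" * 20)
    ostream = OStream(*(info + (stream,)))      # extend the info tuple with the stream
    assert ostream.binsha == info.binsha
    assert ostream.stream is stream
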
diff --git a/gitdb/test/test_config.py b/gitdb/test/test_config.py
index 205f3c2..8b13789 100644
--- a/gitdb/test/test_config.py
+++ b/gitdb/test/test_config.py
@@ -1,102 +1 @@
-# test_config.py
-# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
-#
-# This module is part of GitPython and is released under
-# the BSD License: http://www.opensource.org/licenses/bsd-license.php
-from gitdb.test.lib import *
-from gitdb.config import *
-import StringIO
-from copy import copy
-from ConfigParser import NoSectionError
-
-class TestBase(TestBase):
-
- def _to_memcache(self, file_path):
- fp = open(file_path, "r")
- sio = StringIO.StringIO(fp.read())
- sio.name = file_path
- return sio
-
- def _parsers_equal_or_raise(self, lhs, rhs):
- pass
-
- def test_read_write(self):
- # writer must create the exact same file as the one read before
- for filename in ("git_config", "git_config_global"):
- file_obj = self._to_memcache(fixture_path(filename))
- file_obj_orig = copy(file_obj)
- w_config = GitConfigParser(file_obj, read_only = False)
- w_config.read() # enforce reading
- assert w_config._sections
- w_config.write() # enforce writing
- assert file_obj.getvalue() == file_obj_orig.getvalue()
-
- # creating an additional config writer must fail due to exclusive access
- self.failUnlessRaises(IOError, GitConfigParser, file_obj, read_only = False)
-
- # should still have a lock and be able to make changes
- assert w_config._lock._has_lock()
-
- # changes should be written right away
- sname = "my_section"
- oname = "mykey"
- val = "myvalue"
- w_config.add_section(sname)
- assert w_config.has_section(sname)
- w_config.set(sname, oname, val)
- assert w_config.has_option(sname,oname)
- assert w_config.get(sname, oname) == val
-
- sname_new = "new_section"
- oname_new = "new_key"
- ival = 10
- w_config.set_value(sname_new, oname_new, ival)
- assert w_config.get_value(sname_new, oname_new) == ival
-
- file_obj.seek(0)
- r_config = GitConfigParser(file_obj, read_only=True)
- assert r_config.has_section(sname)
- assert r_config.has_option(sname, oname)
- assert r_config.get(sname, oname) == val
-
- # END for each filename
-
- def test_base(self):
- path_repo = fixture_path("git_config")
- path_global = fixture_path("git_config_global")
- r_config = GitConfigParser([path_repo, path_global], read_only=True)
- assert r_config.read_only
- num_sections = 0
- num_options = 0
-
- # test reader methods
- assert r_config._is_initialized == False
- for section in r_config.sections():
- num_sections += 1
- for option in r_config.options(section):
- num_options += 1
- val = r_config.get(section, option)
- val_typed = r_config.get_value(section, option)
- assert isinstance(val_typed, (bool, long, float, basestring))
- assert val
- assert "\n" not in option
- assert "\n" not in val
-
- # writing must fail
- self.failUnlessRaises(IOError, r_config.set, section, option, None)
- self.failUnlessRaises(IOError, r_config.remove_option, section, option )
- # END for each option
- self.failUnlessRaises(IOError, r_config.remove_section, section)
- # END for each section
- assert num_sections and num_options
- assert r_config._is_initialized == True
-
-		# get a value which doesn't exist, with default
- default = "my default value"
- assert r_config.get_value("doesnt", "exist", default) == default
-
- # it raises if there is no default though
- self.failUnlessRaises(NoSectionError, r_config.get_value, "doesnt", "exist")
-
-
diff --git a/gitdb/test/test_example.py b/gitdb/test/test_example.py
index c2e7840..8b13789 100644
--- a/gitdb/test/test_example.py
+++ b/gitdb/test/test_example.py
@@ -1,64 +1 @@
-# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
-#
-# This module is part of GitDB and is released under
-# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
-"""Module with examples from the tutorial section of the docs"""
-from lib import *
-from gitdb import IStream
-from gitdb.db.py import PureLooseObjectODB
-from gitdb.util import pool
-
-from cStringIO import StringIO
-from async import IteratorReader
-
-class TestExamples(TestBase):
-
- def test_base(self):
- ldb = PureLooseObjectODB(fixture_path("../../../.git/objects"))
-
- for sha1 in ldb.sha_iter():
- oinfo = ldb.info(sha1)
- ostream = ldb.stream(sha1)
- assert oinfo[:3] == ostream[:3]
-
- assert len(ostream.read()) == ostream.size
- assert ldb.has_object(oinfo.binsha)
- # END for each sha in database
- # assure we close all files
- try:
- del(ostream)
- del(oinfo)
- except UnboundLocalError:
- pass
- # END ignore exception if there are no loose objects
-
- data = "my data"
- istream = IStream("blob", len(data), StringIO(data))
-
- # the object does not yet have a sha
- assert istream.binsha is None
- ldb.store(istream)
- # now the sha is set
- assert len(istream.binsha) == 20
- assert ldb.has_object(istream.binsha)
-
-
- # async operation
- # Create a reader from an iterator
- reader = IteratorReader(ldb.sha_iter())
-
- # get reader for object streams
- info_reader = ldb.stream_async(reader)
-
- # read one
- info = info_reader.read(1)[0]
-
- # read all the rest until depletion
- ostreams = info_reader.read()
-
- # set the pool to use two threads
- pool.set_size(2)
-
- # synchronize the mode of operation
- pool.set_size(0)
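
Note: this file mirrored the tutorial, and its non-async half still describes
the core object-database API. A Python 3 sketch of the lookup flow (the path
is a placeholder):

    from gitdb.db import LooseObjectDB

    ldb = LooseObjectDB("/path/to/.git/objects")    # placeholder
    for sha1 in ldb.sha_iter():
        oinfo = ldb.info(sha1)
        ostream = ldb.stream(sha1)
        assert oinfo[:3] == ostream[:3]             # a stream is info + raw stream
        assert len(ostream.read()) == ostream.size
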
diff --git a/gitdb/test/test_pack.py b/gitdb/test/test_pack.py
index 4a7f1ca..8b13789 100644
--- a/gitdb/test/test_pack.py
+++ b/gitdb/test/test_pack.py
@@ -1,247 +1 @@
-# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
-#
-# This module is part of GitDB and is released under
-# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
-"""Test everything about packs reading and writing"""
-from lib import (
- TestBase,
- with_rw_directory,
- with_packs_rw,
- fixture_path
- )
-from gitdb.stream import DeltaApplyReader
-from gitdb.pack import (
- PackEntity,
- PackIndexFile,
- PackFile
- )
-
-from gitdb.base import (
- OInfo,
- OStream,
- )
-
-from gitdb.fun import delta_types
-from gitdb.exc import UnsupportedOperation
-from gitdb.util import to_bin_sha
-from itertools import izip, chain
-from nose import SkipTest
-
-import os
-import sys
-import tempfile
-
-
-#{ Utilities
-def bin_sha_from_filename(filename):
- return to_bin_sha(os.path.splitext(os.path.basename(filename))[0][5:])
-#} END utilities
-
-class TestPack(TestBase):
-
- packindexfile_v1 = (fixture_path('packs/pack-c0438c19fb16422b6bbcce24387b3264416d485b.idx'), 1, 67)
- packindexfile_v2 = (fixture_path('packs/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.idx'), 2, 30)
- packindexfile_v2_3_ascii = (fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.idx'), 2, 42)
- packfile_v2_1 = (fixture_path('packs/pack-c0438c19fb16422b6bbcce24387b3264416d485b.pack'), 2, packindexfile_v1[2])
- packfile_v2_2 = (fixture_path('packs/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack'), 2, packindexfile_v2[2])
- packfile_v2_3_ascii = (fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.pack'), 2, packindexfile_v2_3_ascii[2])
-
-
- def _assert_index_file(self, index, version, size):
- assert index.packfile_checksum() != index.indexfile_checksum()
- assert len(index.packfile_checksum()) == 20
- assert len(index.indexfile_checksum()) == 20
- assert index.version() == version
- assert index.size() == size
- assert len(index.offsets()) == size
-
- # get all data of all objects
- for oidx in xrange(index.size()):
- sha = index.sha(oidx)
- assert oidx == index.sha_to_index(sha)
-
- entry = index.entry(oidx)
- assert len(entry) == 3
-
- assert entry[0] == index.offset(oidx)
- assert entry[1] == sha
- assert entry[2] == index.crc(oidx)
-
- # verify partial sha
- for l in (4,8,11,17,20):
- assert index.partial_sha_to_index(sha[:l], l*2) == oidx
-
- # END for each object index in indexfile
- self.failUnlessRaises(ValueError, index.partial_sha_to_index, "\0", 2)
-
-
- def _assert_pack_file(self, pack, version, size):
- assert pack.version() == 2
- assert pack.size() == size
- assert len(pack.checksum()) == 20
-
- num_obj = 0
- for obj in pack.stream_iter():
- num_obj += 1
- info = pack.info(obj.pack_offset)
- stream = pack.stream(obj.pack_offset)
-
- assert info.pack_offset == stream.pack_offset
- assert info.type_id == stream.type_id
- assert hasattr(stream, 'read')
-
- # it should be possible to read from both streams
- assert obj.read() == stream.read()
-
- streams = pack.collect_streams(obj.pack_offset)
- assert streams
-
- # read the stream
- try:
- dstream = DeltaApplyReader.new(streams)
- except ValueError:
- # ignore these, old git versions use only ref deltas,
-				# which we haven't resolved ( as we are without an index )
- # Also ignore non-delta streams
- continue
- # END get deltastream
-
- # read all
- data = dstream.read()
- assert len(data) == dstream.size
-
- # test seek
- dstream.seek(0)
- assert dstream.read() == data
-
-
- # read chunks
- # NOTE: the current implementation is safe, it basically transfers
- # all calls to the underlying memory map
-
- # END for each object
- assert num_obj == size
-
-
- def test_pack_index(self):
- # check version 1 and 2
- for indexfile, version, size in (self.packindexfile_v1, self.packindexfile_v2):
- index = PackIndexFile(indexfile)
- self._assert_index_file(index, version, size)
- # END run tests
-
- def test_pack(self):
-		# there is this special version 3, but apparently it's like 2 ...
- for packfile, version, size in (self.packfile_v2_3_ascii, self.packfile_v2_1, self.packfile_v2_2):
- pack = PackFile(packfile)
- self._assert_pack_file(pack, version, size)
- # END for each pack to test
-
- @with_rw_directory
- def test_pack_entity(self, rw_dir):
- pack_objs = list()
- for packinfo, indexinfo in ( (self.packfile_v2_1, self.packindexfile_v1),
- (self.packfile_v2_2, self.packindexfile_v2),
- (self.packfile_v2_3_ascii, self.packindexfile_v2_3_ascii)):
- packfile, version, size = packinfo
- indexfile, version, size = indexinfo
- entity = PackEntity(packfile)
- assert entity.pack().path() == packfile
- assert entity.index().path() == indexfile
- pack_objs.extend(entity.stream_iter())
-
- count = 0
- for info, stream in izip(entity.info_iter(), entity.stream_iter()):
- count += 1
- assert info.binsha == stream.binsha
- assert len(info.binsha) == 20
- assert info.type_id == stream.type_id
- assert info.size == stream.size
-
- # we return fully resolved items, which is implied by the sha centric access
- assert not info.type_id in delta_types
-
- # try all calls
- assert len(entity.collect_streams(info.binsha))
- oinfo = entity.info(info.binsha)
- assert isinstance(oinfo, OInfo)
- assert oinfo.binsha is not None
- ostream = entity.stream(info.binsha)
- assert isinstance(ostream, OStream)
- assert ostream.binsha is not None
-
- # verify the stream
- try:
- assert entity.is_valid_stream(info.binsha, use_crc=True)
- except UnsupportedOperation:
- pass
- # END ignore version issues
- assert entity.is_valid_stream(info.binsha, use_crc=False)
- # END for each info, stream tuple
- assert count == size
-
- # END for each entity
-
- # pack writing - write all packs into one
- # index path can be None
- pack_path = tempfile.mktemp('', "pack", rw_dir)
- index_path = tempfile.mktemp('', 'index', rw_dir)
- iteration = 0
- def rewind_streams():
- for obj in pack_objs:
- obj.stream.seek(0)
- #END utility
- for ppath, ipath, num_obj in zip((pack_path, )*2, (index_path, None), (len(pack_objs), None)):
- pfile = open(ppath, 'wb')
- iwrite = None
- if ipath:
- ifile = open(ipath, 'wb')
- iwrite = ifile.write
- #END handle ip
-
- # make sure we rewind the streams ... we work on the same objects over and over again
- if iteration > 0:
- rewind_streams()
- #END rewind streams
- iteration += 1
-
- pack_sha, index_sha = PackEntity.write_pack(pack_objs, pfile.write, iwrite, object_count=num_obj)
- pfile.close()
- assert os.path.getsize(ppath) > 100
-
- # verify pack
- pf = PackFile(ppath)
- assert pf.size() == len(pack_objs)
- assert pf.version() == PackFile.pack_version_default
- assert pf.checksum() == pack_sha
-
- # verify index
- if ipath is not None:
- ifile.close()
- assert os.path.getsize(ipath) > 100
- idx = PackIndexFile(ipath)
- assert idx.version() == PackIndexFile.index_version_default
- assert idx.packfile_checksum() == pack_sha
- assert idx.indexfile_checksum() == index_sha
- assert idx.size() == len(pack_objs)
- #END verify files exist
- #END for each packpath, indexpath pair
-
-		# verify the packs thoroughly
- rewind_streams()
- entity = PackEntity.create(pack_objs, rw_dir)
- count = 0
- for info in entity.info_iter():
- count += 1
- for use_crc in range(2):
- assert entity.is_valid_stream(info.binsha, use_crc)
- # END for each crc mode
- #END for each info
- assert count == len(pack_objs)
-
-
- def test_pack_64(self):
- # TODO: hex-edit a pack helping us to verify that we can handle 64 byte offsets
- # of course without really needing such a huge pack
- raise SkipTest()
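
Note: pack writing, per the deleted test: PackEntity.write_pack consumes
fully resolved object streams and two write callables, where the index
writer may be None. A sketch (pack_objs is assumed to be a list of object
streams, e.g. collected via entity.stream_iter()):

    from gitdb.pack import PackEntity

    def write_one_pack(pack_objs, pack_path, index_path):
        with open(pack_path, "wb") as pfile, open(index_path, "wb") as ifile:
            pack_sha, index_sha = PackEntity.write_pack(
                pack_objs, pfile.write, ifile.write, object_count=len(pack_objs))
        return pack_sha, index_sha
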
diff --git a/gitdb/test/test_refs.py b/gitdb/test/test_refs.py
index 2c571b7..649542f 100644
--- a/gitdb/test/test_refs.py
+++ b/gitdb/test/test_refs.py
@@ -158,26 +158,25 @@ class TestRefs(TestBase):
cur_head = HEAD(rw_repo)
old_head_commit = cur_head.commit
new_head_commit = cur_head.ref.commit.parents[0]
- if False: #TODO get reset checking back into the game
- cur_head.reset(new_head_commit, index=True) # index only
- assert cur_head.reference.commit == new_head_commit
-
- self.failUnlessRaises(ValueError, cur_head.reset, new_head_commit, index=False, working_tree=True)
- new_head_commit = new_head_commit.parents[0]
- cur_head.reset(new_head_commit, index=True, working_tree=True) # index + wt
- assert cur_head.reference.commit == new_head_commit
-
- # paths - make sure we have something to do
- rw_repo.index.reset(old_head_commit.parents[0])
- cur_head.reset(cur_head, paths = "test")
- cur_head.reset(new_head_commit, paths = "lib")
-			# hard resets with paths don't work, it's all or nothing
- self.failUnlessRaises(GitCommandError, cur_head.reset, new_head_commit, working_tree=True, paths = "lib")
-
- # we can do a mixed reset, and then checkout from the index though
- cur_head.reset(new_head_commit)
- rw_repo.index.checkout(["lib"], force=True)#
- #END ignore block
+
+ cur_head.reset(new_head_commit, index=True) # index only
+ assert cur_head.reference.commit == new_head_commit
+
+ self.failUnlessRaises(ValueError, cur_head.reset, new_head_commit, index=False, working_tree=True)
+ new_head_commit = new_head_commit.parents[0]
+ cur_head.reset(new_head_commit, index=True, working_tree=True) # index + wt
+ assert cur_head.reference.commit == new_head_commit
+
+ # paths - make sure we have something to do
+ rw_repo.index.reset(old_head_commit.parents[0])
+ cur_head.reset(cur_head, paths = "test")
+ cur_head.reset(new_head_commit, paths = "lib")
+		# hard resets with paths don't work, it's all or nothing
+ self.failUnlessRaises(GitCommandError, cur_head.reset, new_head_commit, working_tree=True, paths = "lib")
+
+ # we can do a mixed reset, and then checkout from the index though
+ cur_head.reset(new_head_commit)
+ rw_repo.index.checkout(["lib"], force=True)#
# now that we have a writable repo, change the HEAD reference - it's
# like git-reset --soft
diff --git a/gitdb/test/test_util.py b/gitdb/test/test_util.py
index 0847ab5..53e2569 100644
--- a/gitdb/test/test_util.py
+++ b/gitdb/test/test_util.py
@@ -3,131 +3,8 @@
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Test for object db"""
-import tempfile
-import os
-from lib import TestBase
-from gitdb.util import (
- to_hex_sha,
- to_bin_sha,
- NULL_HEX_SHA,
- LockedFD,
- Actor
- )
class TestUtils(TestBase):
- def test_basics(self):
- assert to_hex_sha(NULL_HEX_SHA) == NULL_HEX_SHA
- assert len(to_bin_sha(NULL_HEX_SHA)) == 20
- assert to_hex_sha(to_bin_sha(NULL_HEX_SHA)) == NULL_HEX_SHA
-
- def _cmp_contents(self, file_path, data):
- # raise if data from file at file_path
- # does not match data string
- fp = open(file_path, "rb")
- try:
- assert fp.read() == data
- finally:
- fp.close()
-
- def test_lockedfd(self):
- my_file = tempfile.mktemp()
- orig_data = "hello"
- new_data = "world"
- my_file_fp = open(my_file, "wb")
- my_file_fp.write(orig_data)
- my_file_fp.close()
-
- try:
- lfd = LockedFD(my_file)
- lockfilepath = lfd._lockfilepath()
-
- # cannot end before it was started
- self.failUnlessRaises(AssertionError, lfd.rollback)
- self.failUnlessRaises(AssertionError, lfd.commit)
-
- # open for writing
- assert not os.path.isfile(lockfilepath)
- wfd = lfd.open(write=True)
- assert lfd._fd is wfd
- assert os.path.isfile(lockfilepath)
-
- # write data and fail
- os.write(wfd, new_data)
- lfd.rollback()
- assert lfd._fd is None
- self._cmp_contents(my_file, orig_data)
- assert not os.path.isfile(lockfilepath)
-
-		# additional call doesn't fail
- lfd.commit()
- lfd.rollback()
-
- # test reading
- lfd = LockedFD(my_file)
- rfd = lfd.open(write=False)
- assert os.read(rfd, len(orig_data)) == orig_data
-
- assert os.path.isfile(lockfilepath)
- # deletion rolls back
- del(lfd)
- assert not os.path.isfile(lockfilepath)
-
-
- # write data - concurrently
- lfd = LockedFD(my_file)
- olfd = LockedFD(my_file)
- assert not os.path.isfile(lockfilepath)
- wfdstream = lfd.open(write=True, stream=True) # this time as stream
- assert os.path.isfile(lockfilepath)
- # another one fails
- self.failUnlessRaises(IOError, olfd.open)
-
- wfdstream.write(new_data)
- lfd.commit()
- assert not os.path.isfile(lockfilepath)
- self._cmp_contents(my_file, new_data)
-
- # could test automatic _end_writing on destruction
- finally:
- os.remove(my_file)
- # END final cleanup
-
- # try non-existing file for reading
- lfd = LockedFD(tempfile.mktemp())
- try:
- lfd.open(write=False)
- except OSError:
- assert not os.path.exists(lfd._lockfilepath())
- else:
- self.fail("expected OSError")
- # END handle exceptions
-
-
-class TestActor(TestBase):
- def test_from_string_should_separate_name_and_email(self):
- a = Actor._from_string("Michael Trier <mtrier@example.com>")
- assert "Michael Trier" == a.name
- assert "mtrier@example.com" == a.email
-
- # base type capabilities
- assert a == a
- assert not ( a != a )
- m = set()
- m.add(a)
- m.add(a)
- assert len(m) == 1
-
- def test_from_string_should_handle_just_name(self):
- a = Actor._from_string("Michael Trier")
- assert "Michael Trier" == a.name
- assert None == a.email
-
- def test_should_display_representation(self):
- a = Actor._from_string("Michael Trier <mtrier@example.com>")
- assert '<git.Actor "Michael Trier <mtrier@example.com>">' == repr(a)
-
- def test_str_should_alias_name(self):
- a = Actor._from_string("Michael Trier <mtrier@example.com>")
- assert a.name == str(a)
+
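
Note: for reference, the LockedFD protocol exercised by the deleted test:
open(write=True) creates <path>.lock and hands back an os-level descriptor,
commit() atomically replaces the target, and rollback() (or destruction of
the LockedFD) discards the lock file. A sketch (the path is a placeholder):

    import os
    from gitdb.util import LockedFD

    lfd = LockedFD("/path/to/config")   # placeholder
    fd = lfd.open(write=True)           # creates /path/to/config.lock
    os.write(fd, b"new contents")
    lfd.commit()                        # moves the lock file over the original
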