Diffstat (limited to 'git/test')
-rw-r--r--  git/test/lib/__init__.py              |    4
-rw-r--r--  git/test/lib/asserts.py               |   18
-rw-r--r--  git/test/lib/helper.py                |  113
-rw-r--r--  git/test/performance/lib.py           |   28
-rw-r--r--  git/test/performance/test_commit.py   |   39
-rw-r--r--  git/test/performance/test_odb.py      |   18
-rw-r--r--  git/test/performance/test_streams.py  |   56
-rw-r--r--  git/test/performance/test_utils.py    |   64
-rw-r--r--  git/test/test_actor.py                |    8
-rw-r--r--  git/test/test_base.py                 |   49
-rw-r--r--  git/test/test_blob.py                 |   10
-rw-r--r--  git/test/test_commit.py               |  117
-rw-r--r--  git/test/test_config.py               |   41
-rw-r--r--  git/test/test_db.py                   |    9
-rw-r--r--  git/test/test_diff.py                 |   45
-rw-r--r--  git/test/test_fun.py                  |   98
-rw-r--r--  git/test/test_git.py                  |   25
-rw-r--r--  git/test/test_index.py                |  300
-rw-r--r--  git/test/test_reflog.py               |   46
-rw-r--r--  git/test/test_refs.py                 |  227
-rw-r--r--  git/test/test_remote.py               |  243
-rw-r--r--  git/test/test_repo.py                 |  267
-rw-r--r--  git/test/test_stats.py                |    9
-rw-r--r--  git/test/test_submodule.py            |  234
-rw-r--r--  git/test/test_tree.py                 |   69
-rw-r--r--  git/test/test_util.py                 |   64
26 files changed, 1098 insertions(+), 1103 deletions(-)
diff --git a/git/test/lib/__init__.py b/git/test/lib/__init__.py
index 77512794..e13e227d 100644
--- a/git/test/lib/__init__.py
+++ b/git/test/lib/__init__.py
@@ -9,5 +9,5 @@ from mock import *
from asserts import *
from helper import *
-__all__ = [ name for name, obj in locals().items()
- if not (name.startswith('_') or inspect.ismodule(obj)) ]
+__all__ = [name for name, obj in locals().items()
+ if not (name.startswith('_') or inspect.ismodule(obj))]
diff --git a/git/test/lib/asserts.py b/git/test/lib/asserts.py
index fa754b92..351901dc 100644
--- a/git/test/lib/asserts.py
+++ b/git/test/lib/asserts.py
@@ -10,41 +10,49 @@ from nose import tools
from nose.tools import *
import stat
-__all__ = ['assert_instance_of', 'assert_not_instance_of',
+__all__ = ['assert_instance_of', 'assert_not_instance_of',
'assert_none', 'assert_not_none',
'assert_match', 'assert_not_match', 'assert_mode_644',
'assert_mode_755'] + tools.__all__
+
def assert_instance_of(expected, actual, msg=None):
"""Verify that object is an instance of expected """
assert isinstance(actual, expected), msg
+
def assert_not_instance_of(expected, actual, msg=None):
"""Verify that object is not an instance of expected """
assert not isinstance(actual, expected, msg)
-
+
+
def assert_none(actual, msg=None):
"""verify that item is None"""
assert actual is None, msg
+
def assert_not_none(actual, msg=None):
"""verify that item is None"""
assert actual is not None, msg
+
def assert_match(pattern, string, msg=None):
"""verify that the pattern matches the string"""
assert_not_none(re.search(pattern, string), msg)
+
def assert_not_match(pattern, string, msg=None):
"""verify that the pattern does not match the string"""
assert_none(re.search(pattern, string), msg)
-
+
+
def assert_mode_644(mode):
"""Verify given mode is 644"""
- assert (mode & stat.S_IROTH) and (mode & stat.S_IRGRP)
+ assert (mode & stat.S_IROTH) and (mode & stat.S_IRGRP)
assert (mode & stat.S_IWUSR) and (mode & stat.S_IRUSR) and not (mode & stat.S_IXUSR)
+
def assert_mode_755(mode):
"""Verify given mode is 755"""
assert (mode & stat.S_IROTH) and (mode & stat.S_IRGRP) and (mode & stat.S_IXOTH) and (mode & stat.S_IXGRP)
- assert (mode & stat.S_IWUSR) and (mode & stat.S_IRUSR) and (mode & stat.S_IXUSR)
\ No newline at end of file
+ assert (mode & stat.S_IWUSR) and (mode & stat.S_IRUSR) and (mode & stat.S_IXUSR)
diff --git a/git/test/lib/helper.py b/git/test/lib/helper.py
index a76f1a15..913cf3b6 100644
--- a/git/test/lib/helper.py
+++ b/git/test/lib/helper.py
@@ -21,36 +21,42 @@ __all__ = (
#{ Routines
+
def fixture_path(name):
test_dir = os.path.dirname(os.path.dirname(__file__))
return os.path.join(test_dir, "fixtures", name)
+
def fixture(name):
return open(fixture_path(name), 'rb').read()
+
def absolute_project_path():
return os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
#} END routines
-
-#{ Adapters
-
+
+#{ Adapters
+
+
class StringProcessAdapter(object):
+
"""Allows to use strings as Process object as returned by SubProcess.Popen.
Its tailored to work with the test system only"""
-
+
def __init__(self, input_string):
self.stdout = cStringIO.StringIO(input_string)
self.stderr = cStringIO.StringIO()
-
+
def wait(self):
return 0
-
+
poll = wait
-
+
#} END adapters
-#{ Decorators
+#{ Decorators
+
def _mktemp(*args):
"""Wrapper around default tempfile.mktemp to fix an osx issue"""
@@ -59,28 +65,31 @@ def _mktemp(*args):
tdir = '/private' + tdir
return tdir
+
def _rmtree_onerror(osremove, fullpath, exec_info):
"""
- Handle the case on windows that read-only files cannot be deleted by
+ Handle the case on windows that read-only files cannot be deleted by
os.remove by setting it to mode 777, then retry deletion.
"""
if os.name != 'nt' or osremove is not os.remove:
raise
-
+
os.chmod(fullpath, 0777)
os.remove(fullpath)
+
def with_rw_repo(working_tree_ref, bare=False):
"""
- Same as with_bare_repo, but clones the rorepo as non-bare repository, checking
+ Same as with_bare_repo, but clones the rorepo as non-bare repository, checking
out the working tree at the given working_tree_ref.
-
+
This repository type is more costly due to the working copy checkout.
-
- To make working with relative paths easier, the cwd will be set to the working
+
+ To make working with relative paths easier, the cwd will be set to the working
dir of the repository.
"""
assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout"
+
def argument_passer(func):
def repo_creator(self):
prefix = 'non_'
@@ -89,12 +98,12 @@ def with_rw_repo(working_tree_ref, bare=False):
#END handle prefix
repo_dir = _mktemp("%sbare_%s" % (prefix, func.__name__))
rw_repo = self.rorepo.clone(repo_dir, shared=True, bare=bare, n=True)
-
+
rw_repo.head.commit = rw_repo.commit(working_tree_ref)
if not bare:
rw_repo.head.reference.checkout()
# END handle checkout
-
+
prev_cwd = os.getcwd()
os.chdir(rw_repo.working_dir)
try:
@@ -116,44 +125,46 @@ def with_rw_repo(working_tree_ref, bare=False):
return repo_creator
# END argument passer
return argument_passer
-
+
+
def with_rw_and_rw_remote_repo(working_tree_ref):
"""
Same as with_rw_repo, but also provides a writable remote repository from which the
- rw_repo has been forked as well as a handle for a git-daemon that may be started to
+ rw_repo has been forked as well as a handle for a git-daemon that may be started to
run the remote_repo.
- The remote repository was cloned as bare repository from the rorepo, wheras
+ The remote repository was cloned as bare repository from the rorepo, wheras
the rw repo has a working tree and was cloned from the remote repository.
-
- remote_repo has two remotes: origin and daemon_origin. One uses a local url,
- the other uses a server url. The daemon setup must be done on system level
- and should be an inetd service that serves tempdir.gettempdir() and all
+
+ remote_repo has two remotes: origin and daemon_origin. One uses a local url,
+ the other uses a server url. The daemon setup must be done on system level
+ and should be an inetd service that serves tempdir.gettempdir() and all
directories in it.
-
+
The following scetch demonstrates this::
rorepo ---<bare clone>---> rw_remote_repo ---<clone>---> rw_repo
-
+
The test case needs to support the following signature::
def case(self, rw_repo, rw_remote_repo)
-
+
This setup allows you to test push and pull scenarios and hooks nicely.
-
+
See working dir info in with_rw_repo
"""
assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout"
+
def argument_passer(func):
def remote_repo_creator(self):
remote_repo_dir = _mktemp("remote_repo_%s" % func.__name__)
repo_dir = _mktemp("remote_clone_non_bare_repo")
-
+
rw_remote_repo = self.rorepo.clone(remote_repo_dir, shared=True, bare=True)
rw_repo = rw_remote_repo.clone(repo_dir, shared=True, bare=False, n=True) # recursive alternates info ?
rw_repo.head.commit = working_tree_ref
rw_repo.head.reference.checkout()
-
+
# prepare for git-daemon
rw_remote_repo.daemon_export = True
-
+
# this thing is just annoying !
crw = rw_remote_repo.config_writer()
section = "daemon"
@@ -164,28 +175,28 @@ def with_rw_and_rw_remote_repo(working_tree_ref):
crw.set(section, "receivepack", True)
# release lock
del(crw)
-
- # initialize the remote - first do it as local remote and pull, then
+
+ # initialize the remote - first do it as local remote and pull, then
# we change the url to point to the daemon. The daemon should be started
# by the user, not by us
d_remote = Remote.create(rw_repo, "daemon_origin", remote_repo_dir)
d_remote.fetch()
remote_repo_url = "git://localhost%s" % remote_repo_dir
-
+
d_remote.config_writer.set('url', remote_repo_url)
-
+
# try to list remotes to diagnoes whether the server is up
try:
rw_repo.git.ls_remote(d_remote)
- except GitCommandError,e:
+ except GitCommandError, e:
print str(e)
if os.name == 'nt':
- raise AssertionError('git-daemon needs to run this test, but windows does not have one. Otherwise, run: git-daemon "%s"' % os.path.dirname(_mktemp()))
+ raise AssertionError('git-daemon needs to run this test, but windows does not have one. Otherwise, run: git-daemon "%s"' % os.path.dirname(_mktemp()))
else:
raise AssertionError('Please start a git-daemon to run this test, execute: git-daemon "%s"' % os.path.dirname(_mktemp()))
# END make assertion
#END catch ls remote error
-
+
# adjust working dir
prev_cwd = os.getcwd()
os.chdir(rw_repo.working_dir)
@@ -203,37 +214,39 @@ def with_rw_and_rw_remote_repo(working_tree_ref):
return remote_repo_creator
# END remote repo creator
# END argument parsser
-
+
return argument_passer
-
+
#} END decorators
-
+
+
class TestBase(TestCase):
+
"""
Base Class providing default functionality to all tests such as:
-
+
- Utility functions provided by the TestCase base of the unittest method such as::
self.fail("todo")
self.failUnlessRaises(...)
-
- - Class level repository which is considered read-only as it is shared among
+
+ - Class level repository which is considered read-only as it is shared among
all test cases in your type.
- Access it using::
+ Access it using::
self.rorepo # 'ro' stands for read-only
-
- The rorepo is in fact your current project's git repo. If you refer to specific
- shas for your objects, be sure you choose some that are part of the immutable portion
+
+ The rorepo is in fact your current project's git repo. If you refer to specific
+ shas for your objects, be sure you choose some that are part of the immutable portion
of the project history ( to assure tests don't fail for others ).
"""
-
+
@classmethod
def setUpClass(cls):
"""
- Dynamically add a read-only repository to our actual type. This way
+ Dynamically add a read-only repository to our actual type. This way
each test type has its own repository
"""
cls.rorepo = Repo(GIT_REPO)
-
+
def _make_file(self, rela_path, data, repo=None):
"""
Create a file at the given path relative to our repository, filled
diff --git a/git/test/performance/lib.py b/git/test/performance/lib.py
index d8313dac..00d41b76 100644
--- a/git/test/performance/lib.py
+++ b/git/test/performance/lib.py
@@ -30,26 +30,27 @@ def resolve_or_fail(env_var):
#} END utilities
-#{ Base Classes
+#{ Base Classes
class TestBigRepoR(TestBase):
- """TestCase providing access to readonly 'big' repositories using the following
+
+ """TestCase providing access to readonly 'big' repositories using the following
member variables:
-
+
* gitrorepo
-
+
* Read-Only git repository - actually the repo of git itself
-
+
* puregitrorepo
-
+
* As gitrepo, but uses pure python implementation
"""
-
+
#{ Invariants
head_sha_2k = '235d521da60e4699e5bd59ac658b5b48bd76ddca'
head_sha_50 = '32347c375250fd470973a5d76185cac718955fd5'
- #} END invariants
-
+ #} END invariants
+
@classmethod
def setUp(cls):
super(TestBigRepoR, cls).setUp()
@@ -59,10 +60,11 @@ class TestBigRepoR(TestBase):
class TestBigRepoRW(TestBigRepoR):
+
"""As above, but provides a big repository that we can write to.
-
+
Provides ``self.gitrwrepo`` and ``self.puregitrwrepo``"""
-
+
@classmethod
def setUp(cls):
super(TestBigRepoRW, cls).setUp()
@@ -70,9 +72,9 @@ class TestBigRepoRW(TestBigRepoR):
os.mkdir(dirname)
cls.gitrwrepo = cls.gitrorepo.clone(dirname, shared=True, bare=True, odbt=GitCmdObjectDB)
cls.puregitrwrepo = Repo(dirname, odbt=GitDB)
-
+
@classmethod
def tearDownAll(cls):
shutil.rmtree(cls.gitrwrepo.working_dir)
-
+
#} END base classes
diff --git a/git/test/performance/test_commit.py b/git/test/performance/test_commit.py
index 1bdfcfa2..009b3d82 100644
--- a/git/test/performance/test_commit.py
+++ b/git/test/performance/test_commit.py
@@ -12,6 +12,7 @@ from cStringIO import StringIO
from time import time
import sys
+
class TestPerformance(TestBigRepoRW):
# ref with about 100 commits in its history
@@ -26,15 +27,15 @@ class TestPerformance(TestBigRepoRW):
c.committer_tz_offset
c.message
c.parents
-
+
def test_iteration(self):
no = 0
nc = 0
-
- # find the first commit containing the given path - always do a full
- # iteration ( restricted to the path in question ), but in fact it should
+
+ # find the first commit containing the given path - always do a full
+ # iteration ( restricted to the path in question ), but in fact it should
# return quite a lot of commits, we just take one and hence abort the operation
-
+
st = time()
for c in self.rorepo.iter_commits(self.ref_100):
nc += 1
@@ -45,8 +46,8 @@ class TestPerformance(TestBigRepoRW):
# END for each object
# END for each commit
elapsed_time = time() - st
- print >> sys.stderr, "Traversed %i Trees and a total of %i unchached objects in %s [s] ( %f objs/s )" % (nc, no, elapsed_time, no/elapsed_time)
-
+ print >> sys.stderr, "Traversed %i Trees and a total of %i unchached objects in %s [s] ( %f objs/s )" % (nc, no, elapsed_time, no / elapsed_time)
+
def test_commit_traversal(self):
# bound to cat-file parsing performance
nc = 0
@@ -56,8 +57,8 @@ class TestPerformance(TestBigRepoRW):
self._query_commit_info(c)
# END for each traversed commit
elapsed_time = time() - st
- print >> sys.stderr, "Traversed %i Commits in %s [s] ( %f commits/s )" % (nc, elapsed_time, nc/elapsed_time)
-
+ print >> sys.stderr, "Traversed %i Commits in %s [s] ( %f commits/s )" % (nc, elapsed_time, nc / elapsed_time)
+
def test_commit_iteration(self):
# bound to stream parsing performance
nc = 0
@@ -67,33 +68,33 @@ class TestPerformance(TestBigRepoRW):
self._query_commit_info(c)
# END for each traversed commit
elapsed_time = time() - st
- print >> sys.stderr, "Iterated %i Commits in %s [s] ( %f commits/s )" % (nc, elapsed_time, nc/elapsed_time)
-
+ print >> sys.stderr, "Iterated %i Commits in %s [s] ( %f commits/s )" % (nc, elapsed_time, nc / elapsed_time)
+
def test_commit_serialization(self):
assert_commit_serialization(self.gitrwrepo, self.head_sha_2k, True)
-
+
rwrepo = self.gitrwrepo
make_object = rwrepo.odb.store
# direct serialization - deserialization can be tested afterwards
# serialization is probably limited on IO
hc = rwrepo.commit(self.head_sha_2k)
-
+
commits = list()
nc = 5000
st = time()
for i in xrange(nc):
- cm = Commit( rwrepo, Commit.NULL_BIN_SHA, hc.tree,
- hc.author, hc.authored_date, hc.author_tz_offset,
- hc.committer, hc.committed_date, hc.committer_tz_offset,
+ cm = Commit(rwrepo, Commit.NULL_BIN_SHA, hc.tree,
+ hc.author, hc.authored_date, hc.author_tz_offset,
+ hc.committer, hc.committed_date, hc.committer_tz_offset,
str(i), parents=hc.parents, encoding=hc.encoding)
-
+
stream = StringIO()
cm._serialize(stream)
slen = stream.tell()
stream.seek(0)
-
+
cm.binsha = make_object(IStream(Commit.type, slen, stream)).binsha
# END commit creation
elapsed = time() - st
-
+
print >> sys.stderr, "Serialized %i commits to loose objects in %f s ( %f commits / s )" % (nc, elapsed, nc / elapsed)
diff --git a/git/test/performance/test_odb.py b/git/test/performance/test_odb.py
index ccc13eb4..5ddbbd53 100644
--- a/git/test/performance/test_odb.py
+++ b/git/test/performance/test_odb.py
@@ -10,9 +10,9 @@ from lib import (
class TestObjDBPerformance(TestBigRepoR):
-
+
def test_random_access(self):
- results = [ ["Iterate Commits"], ["Iterate Blobs"], ["Retrieve Blob Data"] ]
+ results = [["Iterate Commits"], ["Iterate Blobs"], ["Retrieve Blob Data"]]
for repo in (self.gitrorepo, self.puregitrorepo):
# GET COMMITS
st = time()
@@ -20,10 +20,10 @@ class TestObjDBPerformance(TestBigRepoR):
commits = list(root_commit.traverse())
nc = len(commits)
elapsed = time() - st
-
+
print >> sys.stderr, "%s: Retrieved %i commits from ObjectStore in %g s ( %f commits / s )" % (type(repo.odb), nc, elapsed, nc / elapsed)
results[0].append(elapsed)
-
+
# GET TREES
# walk all trees of all commits
st = time()
@@ -41,10 +41,10 @@ class TestObjDBPerformance(TestBigRepoR):
blobs_per_commit.append(blobs)
# END for each commit
elapsed = time() - st
-
+
print >> sys.stderr, "%s: Retrieved %i objects from %i commits in %g s ( %f objects / s )" % (type(repo.odb), nt, len(commits), elapsed, nt / elapsed)
results[1].append(elapsed)
-
+
# GET BLOBS
st = time()
nb = 0
@@ -59,11 +59,11 @@ class TestObjDBPerformance(TestBigRepoR):
break
# END for each bloblist
elapsed = time() - st
-
- print >> sys.stderr, "%s: Retrieved %i blob (%i KiB) and their data in %g s ( %f blobs / s, %f KiB / s )" % (type(repo.odb), nb, data_bytes/1000, elapsed, nb / elapsed, (data_bytes / 1000) / elapsed)
+
+ print >> sys.stderr, "%s: Retrieved %i blob (%i KiB) and their data in %g s ( %f blobs / s, %f KiB / s )" % (type(repo.odb), nb, data_bytes / 1000, elapsed, nb / elapsed, (data_bytes / 1000) / elapsed)
results[2].append(elapsed)
# END for each repo type
-
+
# final results
for test_name, a, b in results:
print >> sys.stderr, "%s: %f s vs %f s, pure is %f times slower" % (test_name, a, b, b / a)
diff --git a/git/test/performance/test_streams.py b/git/test/performance/test_streams.py
index 93e88841..e42867a3 100644
--- a/git/test/performance/test_streams.py
+++ b/git/test/performance/test_streams.py
@@ -18,16 +18,16 @@ from lib import (
class TestObjDBPerformance(TestBigRepoR):
-
- large_data_size_bytes = 1000*1000*10 # some MiB should do it
- moderate_data_size_bytes = 1000*1000*1 # just 1 MiB
-
+
+ large_data_size_bytes = 1000 * 1000 * 10 # some MiB should do it
+ moderate_data_size_bytes = 1000 * 1000 * 1 # just 1 MiB
+
@with_rw_repo('HEAD', bare=True)
def test_large_data_streaming(self, rwrepo):
# TODO: This part overlaps with the same file in gitdb.test.performance.test_stream
# It should be shared if possible
ldb = LooseObjectDB(os.path.join(rwrepo.git_dir, 'objects'))
-
+
for randomize in range(2):
desc = (randomize and 'random ') or ''
print >> sys.stderr, "Creating %s data ..." % desc
@@ -35,32 +35,30 @@ class TestObjDBPerformance(TestBigRepoR):
size, stream = make_memory_file(self.large_data_size_bytes, randomize)
elapsed = time() - st
print >> sys.stderr, "Done (in %f s)" % elapsed
-
- # writing - due to the compression it will seem faster than it is
+
+ # writing - due to the compression it will seem faster than it is
st = time()
binsha = ldb.store(IStream('blob', size, stream)).binsha
elapsed_add = time() - st
assert ldb.has_object(binsha)
db_file = ldb.readable_db_object_path(bin_to_hex(binsha))
fsize_kib = os.path.getsize(db_file) / 1000
-
-
+
size_kib = size / 1000
print >> sys.stderr, "Added %i KiB (filesize = %i KiB) of %s data to loose odb in %f s ( %f Write KiB / s)" % (size_kib, fsize_kib, desc, elapsed_add, size_kib / elapsed_add)
-
+
# reading all at once
st = time()
ostream = ldb.stream(binsha)
shadata = ostream.read()
elapsed_readall = time() - st
-
+
stream.seek(0)
assert shadata == stream.getvalue()
print >> sys.stderr, "Read %i KiB of %s data at once from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, elapsed_readall, size_kib / elapsed_readall)
-
-
+
# reading in chunks of 1 MiB
- cs = 512*1000
+ cs = 512 * 1000
chunks = list()
st = time()
ostream = ldb.stream(binsha)
@@ -71,21 +69,21 @@ class TestObjDBPerformance(TestBigRepoR):
break
# END read in chunks
elapsed_readchunks = time() - st
-
+
stream.seek(0)
assert ''.join(chunks) == stream.getvalue()
-
+
cs_kib = cs / 1000
print >> sys.stderr, "Read %i KiB of %s data in %i KiB chunks from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, cs_kib, elapsed_readchunks, size_kib / elapsed_readchunks)
-
+
# del db file so git has something to do
os.remove(db_file)
-
- # VS. CGIT
+
+ # VS. CGIT
##########
# CGIT ! Can using the cgit programs be faster ?
proc = rwrepo.git.hash_object('-w', '--stdin', as_process=True, istream=subprocess.PIPE)
-
+
# write file - pump everything in at once to be a fast as possible
data = stream.getvalue() # cache it
st = time()
@@ -96,25 +94,23 @@ class TestObjDBPerformance(TestBigRepoR):
gelapsed_add = time() - st
del(data)
assert gitsha == bin_to_hex(binsha) # we do it the same way, right ?
-
+
# as its the same sha, we reuse our path
fsize_kib = os.path.getsize(db_file) / 1000
print >> sys.stderr, "Added %i KiB (filesize = %i KiB) of %s data to using git-hash-object in %f s ( %f Write KiB / s)" % (size_kib, fsize_kib, desc, gelapsed_add, size_kib / gelapsed_add)
-
- # compare ...
+
+ # compare ...
print >> sys.stderr, "Git-Python is %f %% faster than git when adding big %s files" % (100.0 - (elapsed_add / gelapsed_add) * 100, desc)
-
-
+
# read all
st = time()
s, t, size, data = rwrepo.git.get_object_data(gitsha)
gelapsed_readall = time() - st
print >> sys.stderr, "Read %i KiB of %s data at once using git-cat-file in %f s ( %f Read KiB / s)" % (size_kib, desc, gelapsed_readall, size_kib / gelapsed_readall)
- # compare
+ # compare
print >> sys.stderr, "Git-Python is %f %% faster than git when reading big %sfiles" % (100.0 - (elapsed_readall / gelapsed_readall) * 100, desc)
-
-
+
# read chunks
st = time()
s, t, size, stream = rwrepo.git.stream_object_data(gitsha)
@@ -125,7 +121,7 @@ class TestObjDBPerformance(TestBigRepoR):
# END read stream
gelapsed_readchunks = time() - st
print >> sys.stderr, "Read %i KiB of %s data in %i KiB chunks from git-cat-file in %f s ( %f Read KiB / s)" % (size_kib, desc, cs_kib, gelapsed_readchunks, size_kib / gelapsed_readchunks)
-
- # compare
+
+ # compare
print >> sys.stderr, "Git-Python is %f %% faster than git when reading big %s files in chunks" % (100.0 - (elapsed_readchunks / gelapsed_readchunks) * 100, desc)
# END for each randomization factor
diff --git a/git/test/performance/test_utils.py b/git/test/performance/test_utils.py
index 8637af48..c8d397fb 100644
--- a/git/test/performance/test_utils.py
+++ b/git/test/performance/test_utils.py
@@ -9,29 +9,33 @@ from lib import (
class TestUtilPerformance(TestBigRepoR):
-
+
def test_access(self):
# compare dict vs. slot access
class Slotty(object):
__slots__ = "attr"
+
def __init__(self):
self.attr = 1
-
+
class Dicty(object):
+
def __init__(self):
self.attr = 1
-
+
class BigSlotty(object):
__slots__ = ('attr', ) + tuple('abcdefghijk')
+
def __init__(self):
for attr in self.__slots__:
setattr(self, attr, 1)
-
+
class BigDicty(object):
+
def __init__(self):
for attr in BigSlotty.__slots__:
setattr(self, attr, 1)
-
+
ni = 1000000
for cls in (Slotty, Dicty, BigSlotty, BigDicty):
cli = cls()
@@ -42,7 +46,7 @@ class TestUtilPerformance(TestBigRepoR):
elapsed = time() - st
print >> sys.stderr, "Accessed %s.attr %i times in %s s ( %f acc / s)" % (cls.__name__, ni, elapsed, ni / elapsed)
# END for each class type
-
+
# check num of sequence-acceses
for cls in (list, tuple):
x = 10
@@ -56,12 +60,12 @@ class TestUtilPerformance(TestBigRepoR):
elapsed = time() - st
na = ni * 3
print >> sys.stderr, "Accessed %s[x] %i times in %s s ( %f acc / s)" % (cls.__name__, na, elapsed, na / elapsed)
- # END for each sequence
-
+ # END for each sequence
+
def test_instantiation(self):
ni = 100000
max_num_items = 4
- for mni in range(max_num_items+1):
+ for mni in range(max_num_items + 1):
for cls in (tuple, list):
st = time()
for i in xrange(ni):
@@ -70,11 +74,11 @@ class TestUtilPerformance(TestBigRepoR):
elif mni == 1:
cls((1,))
elif mni == 2:
- cls((1,2))
+ cls((1, 2))
elif mni == 3:
- cls((1,2,3))
+ cls((1, 2, 3))
elif mni == 4:
- cls((1,2,3,4))
+ cls((1, 2, 3, 4))
else:
cls(x for x in xrange(mni))
# END handle empty cls
@@ -83,27 +87,27 @@ class TestUtilPerformance(TestBigRepoR):
print >> sys.stderr, "Created %i %ss of size %i in %f s ( %f inst / s)" % (ni, cls.__name__, mni, elapsed, ni / elapsed)
# END for each type
# END for each item count
-
+
# tuple and tuple direct
st = time()
for i in xrange(ni):
- t = (1,2,3,4)
+ t = (1, 2, 3, 4)
# END for each item
elapsed = time() - st
print >> sys.stderr, "Created %i tuples (1,2,3,4) in %f s ( %f tuples / s)" % (ni, elapsed, ni / elapsed)
-
+
st = time()
for i in xrange(ni):
- t = tuple((1,2,3,4))
+ t = tuple((1, 2, 3, 4))
# END for each item
elapsed = time() - st
print >> sys.stderr, "Created %i tuples tuple((1,2,3,4)) in %f s ( %f tuples / s)" % (ni, elapsed, ni / elapsed)
-
+
def test_unpacking_vs_indexing(self):
ni = 1000000
- list_items = [1,2,3,4]
- tuple_items = (1,2,3,4)
-
+ list_items = [1, 2, 3, 4]
+ tuple_items = (1, 2, 3, 4)
+
for sequence in (list_items, tuple_items):
st = time()
for i in xrange(ni):
@@ -111,14 +115,14 @@ class TestUtilPerformance(TestBigRepoR):
# END for eac iteration
elapsed = time() - st
print >> sys.stderr, "Unpacked %i %ss of size %i in %f s ( %f acc / s)" % (ni, type(sequence).__name__, len(sequence), elapsed, ni / elapsed)
-
+
st = time()
for i in xrange(ni):
one, two, three, four = sequence[0], sequence[1], sequence[2], sequence[3]
# END for eac iteration
elapsed = time() - st
print >> sys.stderr, "Unpacked %i %ss of size %i individually in %f s ( %f acc / s)" % (ni, type(sequence).__name__, len(sequence), elapsed, ni / elapsed)
-
+
st = time()
for i in xrange(ni):
one, two = sequence[0], sequence[1]
@@ -126,15 +130,15 @@ class TestUtilPerformance(TestBigRepoR):
elapsed = time() - st
print >> sys.stderr, "Unpacked %i %ss of size %i individually (2 of 4) in %f s ( %f acc / s)" % (ni, type(sequence).__name__, len(sequence), elapsed, ni / elapsed)
# END for each sequence
-
+
def test_large_list_vs_iteration(self):
# what costs more: alloc/realloc of lists, or the cpu strain of iterators ?
def slow_iter(ni):
for i in xrange(ni):
yield i
# END slow iter - be closer to the real world
-
- # alloc doesn't play a role here it seems
+
+ # alloc doesn't play a role here it seems
for ni in (500, 1000, 10000, 20000, 40000):
st = time()
for i in list(xrange(ni)):
@@ -142,7 +146,7 @@ class TestUtilPerformance(TestBigRepoR):
# END for each item
elapsed = time() - st
print >> sys.stderr, "Iterated %i items from list in %f s ( %f acc / s)" % (ni, elapsed, ni / elapsed)
-
+
st = time()
for i in slow_iter(ni):
i
@@ -150,14 +154,14 @@ class TestUtilPerformance(TestBigRepoR):
elapsed = time() - st
print >> sys.stderr, "Iterated %i items from iterator in %f s ( %f acc / s)" % (ni, elapsed, ni / elapsed)
# END for each number of iterations
-
+
def test_type_vs_inst_class(self):
class NewType(object):
pass
-
+
# lets see which way is faster
inst = NewType()
-
+
ni = 1000000
st = time()
for i in xrange(ni):
@@ -165,7 +169,7 @@ class TestUtilPerformance(TestBigRepoR):
# END for each item
elapsed = time() - st
print >> sys.stderr, "Created %i items using inst.__class__ in %f s ( %f items / s)" % (ni, elapsed, ni / elapsed)
-
+
st = time()
for i in xrange(ni):
type(inst)()
diff --git a/git/test/test_actor.py b/git/test/test_actor.py
index b8e5ba3b..5ccf1d2e 100644
--- a/git/test/test_actor.py
+++ b/git/test/test_actor.py
@@ -8,15 +8,17 @@ import os
from git.test.lib import *
from git import *
+
class TestActor(object):
+
def test_from_string_should_separate_name_and_email(self):
a = Actor._from_string("Michael Trier <mtrier@example.com>")
assert_equal("Michael Trier", a.name)
assert_equal("mtrier@example.com", a.email)
-
+
# base type capabilities
assert a == a
- assert not ( a != a )
+ assert not (a != a)
m = set()
m.add(a)
m.add(a)
@@ -33,4 +35,4 @@ class TestActor(object):
def test_str_should_alias_name(self):
a = Actor._from_string("Michael Trier <mtrier@example.com>")
- assert_equal(a.name, str(a))
\ No newline at end of file
+ assert_equal(a.name, str(a))
diff --git a/git/test/test_base.py b/git/test/test_base.py
index 5edc9c52..81e785ab 100644
--- a/git/test/test_base.py
+++ b/git/test/test_base.py
@@ -15,18 +15,19 @@ from git.objects.util import get_object_type_by_name
from gitdb.util import hex_to_bin
import tempfile
+
class TestBase(TestBase):
-
- type_tuples = ( ("blob", "8741fc1d09d61f02ffd8cded15ff603eff1ec070", "blob.py"),
+
+ type_tuples = (("blob", "8741fc1d09d61f02ffd8cded15ff603eff1ec070", "blob.py"),
("tree", "3a6a5e3eeed3723c09f1ef0399f81ed6b8d82e79", "directory"),
("commit", "4251bd59fb8e11e40c40548cba38180a9536118c", None),
- ("tag", "e56a60e8e9cd333cfba0140a77cd12b0d9398f10", None) )
-
- def test_base_object(self):
+ ("tag", "e56a60e8e9cd333cfba0140a77cd12b0d9398f10", None))
+
+ def test_base_object(self):
# test interface of base object classes
types = (Blob, Tree, Commit, TagObject)
assert len(types) == len(self.type_tuples)
-
+
s = set()
num_objs = 0
num_index_objs = 0
@@ -34,9 +35,9 @@ class TestBase(TestBase):
binsha = hex_to_bin(hexsha)
item = None
if path is None:
- item = obj_type(self.rorepo,binsha)
+ item = obj_type(self.rorepo, binsha)
else:
- item = obj_type(self.rorepo,binsha, 0, path)
+ item = obj_type(self.rorepo, binsha, 0, path)
# END handle index objects
num_objs += 1
assert item.hexsha == hexsha
@@ -47,54 +48,54 @@ class TestBase(TestBase):
assert str(item) == item.hexsha
assert repr(item)
s.add(item)
-
+
if isinstance(item, base.IndexObject):
num_index_objs += 1
- if hasattr(item,'path'): # never runs here
+ if hasattr(item, 'path'): # never runs here
assert not item.path.startswith("/") # must be relative
assert isinstance(item.mode, int)
# END index object check
-
+
# read from stream
data_stream = item.data_stream
data = data_stream.read()
assert data
-
+
tmpfile = os.tmpfile()
assert item == item.stream_data(tmpfile)
tmpfile.seek(0)
assert tmpfile.read() == data
# END stream to file directly
# END for each object type to create
-
+
# each has a unique sha
assert len(s) == num_objs
- assert len(s|s) == num_objs
+ assert len(s | s) == num_objs
assert num_index_objs == 2
-
+
def test_get_object_type_by_name(self):
for tname in base.Object.TYPES:
assert base.Object in get_object_type_by_name(tname).mro()
- # END for each known type
-
- assert_raises( ValueError, get_object_type_by_name, "doesntexist" )
+ # END for each known type
+
+ assert_raises(ValueError, get_object_type_by_name, "doesntexist")
def test_object_resolution(self):
# objects must be resolved to shas so they compare equal
assert self.rorepo.head.reference.object == self.rorepo.active_branch.object
-
+
@with_rw_repo('HEAD', bare=True)
def test_with_bare_rw_repo(self, bare_rw_repo):
assert bare_rw_repo.config_reader("repository").getboolean("core", "bare")
- assert os.path.isfile(os.path.join(bare_rw_repo.git_dir,'HEAD'))
-
+ assert os.path.isfile(os.path.join(bare_rw_repo.git_dir, 'HEAD'))
+
@with_rw_repo('0.1.6')
def test_with_rw_repo(self, rw_repo):
assert not rw_repo.config_reader("repository").getboolean("core", "bare")
- assert os.path.isdir(os.path.join(rw_repo.working_tree_dir,'lib'))
-
+ assert os.path.isdir(os.path.join(rw_repo.working_tree_dir, 'lib'))
+
@with_rw_and_rw_remote_repo('0.1.6')
def test_with_rw_remote_and_rw_repo(self, rw_repo, rw_remote_repo):
assert not rw_repo.config_reader("repository").getboolean("core", "bare")
assert rw_remote_repo.config_reader("repository").getboolean("core", "bare")
- assert os.path.isdir(os.path.join(rw_repo.working_tree_dir,'lib'))
+ assert os.path.isdir(os.path.join(rw_repo.working_tree_dir, 'lib'))
diff --git a/git/test/test_blob.py b/git/test/test_blob.py
index 6fc0287f..ddd2893f 100644
--- a/git/test/test_blob.py
+++ b/git/test/test_blob.py
@@ -8,16 +8,16 @@ from git.test.lib import *
from git import *
from gitdb.util import hex_to_bin
+
class TestBlob(TestBase):
-
+
def test_mime_type_should_return_mime_type_for_known_types(self):
blob = Blob(self.rorepo, **{'binsha': Blob.NULL_BIN_SHA, 'path': 'foo.png'})
assert_equal("image/png", blob.mime_type)
-
+
def test_mime_type_should_return_text_plain_for_unknown_types(self):
- blob = Blob(self.rorepo, **{'binsha': Blob.NULL_BIN_SHA,'path': 'something'})
+ blob = Blob(self.rorepo, **{'binsha': Blob.NULL_BIN_SHA, 'path': 'something'})
assert_equal("text/plain", blob.mime_type)
-
+
def test_nodict(self):
self.failUnlessRaises(AttributeError, setattr, self.rorepo.tree()['AUTHORS'], 'someattr', 2)
-
diff --git a/git/test/test_commit.py b/git/test/test_commit.py
index f536470f..6cd892f0 100644
--- a/git/test/test_commit.py
+++ b/git/test/test_commit.py
@@ -17,54 +17,54 @@ import re
def assert_commit_serialization(rwrepo, commit_id, print_performance_info=False):
- """traverse all commits in the history of commit identified by commit_id and check
+ """traverse all commits in the history of commit identified by commit_id and check
if the serialization works.
:param print_performance_info: if True, we will show how fast we are"""
ns = 0 # num serializations
nds = 0 # num deserializations
-
+
st = time.time()
for cm in rwrepo.commit(commit_id).traverse():
nds += 1
-
- # assert that we deserialize commits correctly, hence we get the same
+
+ # assert that we deserialize commits correctly, hence we get the same
# sha on serialization
stream = StringIO()
cm._serialize(stream)
ns += 1
streamlen = stream.tell()
stream.seek(0)
-
+
istream = rwrepo.odb.store(IStream(Commit.type, streamlen, stream))
assert istream.hexsha == cm.hexsha
-
+
nc = Commit(rwrepo, Commit.NULL_BIN_SHA, cm.tree,
- cm.author, cm.authored_date, cm.author_tz_offset,
- cm.committer, cm.committed_date, cm.committer_tz_offset,
+ cm.author, cm.authored_date, cm.author_tz_offset,
+ cm.committer, cm.committed_date, cm.committer_tz_offset,
cm.message, cm.parents, cm.encoding)
-
+
assert nc.parents == cm.parents
stream = StringIO()
nc._serialize(stream)
ns += 1
streamlen = stream.tell()
stream.seek(0)
-
+
# reuse istream
istream.size = streamlen
istream.stream = stream
istream.binsha = None
nc.binsha = rwrepo.odb.store(istream).binsha
-
+
# if it worked, we have exactly the same contents !
assert nc.hexsha == cm.hexsha
# END check commits
elapsed = time.time() - st
-
+
if print_performance_info:
- print >> sys.stderr, "Serialized %i and deserialized %i commits in %f s ( (%f, %f) commits / s" % (ns, nds, elapsed, ns/elapsed, nds/elapsed)
+ print >> sys.stderr, "Serialized %i and deserialized %i commits in %f s ( (%f, %f) commits / s" % (ns, nds, elapsed, ns / elapsed, nds / elapsed)
# END handle performance info
-
+
class TestCommit(TestBase):
@@ -73,7 +73,7 @@ class TestCommit(TestBase):
commit = self.rorepo.commit('2454ae89983a4496a445ce347d7a41c0bb0ea7ae')
# commits have no dict
self.failUnlessRaises(AttributeError, setattr, commit, 'someattr', 1)
- commit.author # bake
+ commit.author # bake
assert_equal("Sebastian Thiel", commit.author.name)
assert_equal("byronimo@gmail.com", commit.author.email)
@@ -82,26 +82,25 @@ class TestCommit(TestBase):
assert isinstance(commit.author_tz_offset, int) and isinstance(commit.committer_tz_offset, int)
assert commit.message == "Added missing information to docstrings of commit and stats module\n"
-
def test_stats(self):
commit = self.rorepo.commit('33ebe7acec14b25c5f84f35a664803fcab2f7781')
stats = commit.stats
-
+
def check_entries(d):
assert isinstance(d, dict)
for key in ("insertions", "deletions", "lines"):
assert key in d
- # END assertion helper
- assert stats.files
+ # END assertion helper
+ assert stats.files
assert stats.total
-
- check_entries(stats.total)
+
+ check_entries(stats.total)
assert "files" in stats.total
-
+
for filepath, d in stats.files.items():
check_entries(d)
# END for each stated file
-
+
# assure data is parsed properly
michael = Actor._from_string("Michael Trier <mtrier@gmail.com>")
assert commit.author == michael
@@ -111,7 +110,7 @@ class TestCommit(TestBase):
assert commit.author_tz_offset == 14400, commit.author_tz_offset
assert commit.committer_tz_offset == 14400, commit.committer_tz_offset
assert commit.message == "initial project\n"
-
+
def test_unicode_actor(self):
# assure we can parse unicode actors correctly
name = "Üäöß ÄußÉ".decode("utf-8")
@@ -119,7 +118,7 @@ class TestCommit(TestBase):
special = Actor._from_string(u"%s <something@this.com>" % name)
assert special.name == name
assert isinstance(special.name, unicode)
-
+
def test_traversal(self):
start = self.rorepo.commit("a4d06724202afccd2b5c54f81bcf2bf26dea7fff")
first = self.rorepo.commit("33ebe7acec14b25c5f84f35a664803fcab2f7781")
@@ -127,65 +126,65 @@ class TestCommit(TestBase):
p1 = start.parents[1]
p00 = p0.parents[0]
p10 = p1.parents[0]
-
+
# basic branch first, depth first
dfirst = start.traverse(branch_first=False)
bfirst = start.traverse(branch_first=True)
assert dfirst.next() == p0
assert dfirst.next() == p00
-
+
assert bfirst.next() == p0
assert bfirst.next() == p1
assert bfirst.next() == p00
assert bfirst.next() == p10
-
+
# at some point, both iterations should stop
assert list(bfirst)[-1] == first
stoptraverse = self.rorepo.commit("254d04aa3180eb8b8daf7b7ff25f010cd69b4e7d").traverse(as_edge=True)
l = list(stoptraverse)
assert len(l[0]) == 2
-
+
# ignore self
assert start.traverse(ignore_self=False).next() == start
-
- # depth
+
+ # depth
assert len(list(start.traverse(ignore_self=False, depth=0))) == 1
-
+
# prune
- assert start.traverse(branch_first=1, prune=lambda i,d: i==p0).next() == p1
-
+ assert start.traverse(branch_first=1, prune=lambda i, d: i == p0).next() == p1
+
# predicate
- assert start.traverse(branch_first=1, predicate=lambda i,d: i==p1).next() == p1
-
+ assert start.traverse(branch_first=1, predicate=lambda i, d: i == p1).next() == p1
+
# traversal should stop when the beginning is reached
self.failUnlessRaises(StopIteration, first.traverse().next)
-
- # parents of the first commit should be empty ( as the only parent has a null
+
+ # parents of the first commit should be empty ( as the only parent has a null
# sha )
assert len(first.parents) == 0
-
+
def test_iteration(self):
# we can iterate commits
all_commits = Commit.list_items(self.rorepo, self.rorepo.head)
assert all_commits
assert all_commits == list(self.rorepo.iter_commits())
-
+
# this includes merge commits
mcomit = self.rorepo.commit('d884adc80c80300b4cc05321494713904ef1df2d')
assert mcomit in all_commits
-
+
# we can limit the result to paths
ltd_commits = list(self.rorepo.iter_commits(paths='CHANGES'))
assert ltd_commits and len(ltd_commits) < len(all_commits)
-
+
# show commits of multiple paths, resulting in a union of commits
less_ltd_commits = list(Commit.iter_items(self.rorepo, 'master', paths=('CHANGES', 'AUTHORS')))
assert len(ltd_commits) < len(less_ltd_commits)
-
+
def test_iter_items(self):
# pretty not allowed
self.failUnlessRaises(ValueError, Commit.iter_items, self.rorepo, 'master', pretty="raw")
-
+
def test_rev_list_bisect_all(self):
"""
'git rev-list --bisect-all' returns additional information
@@ -206,8 +205,8 @@ class TestCommit(TestBase):
assert_equal(sha1, commit.hexsha)
def test_count(self):
- assert self.rorepo.tag('refs/tags/0.1.5').commit.count( ) == 143
-
+ assert self.rorepo.tag('refs/tags/0.1.5').commit.count() == 143
+
def test_list(self):
assert isinstance(Commit.list_items(self.rorepo, '0.1.5', max_count=5)[hex_to_bin('5117c9c8a4d3af19a9958677e45cda9269de1541')], Commit)
@@ -222,10 +221,10 @@ class TestCommit(TestBase):
def test_equality(self):
commit1 = Commit(self.rorepo, Commit.NULL_BIN_SHA)
commit2 = Commit(self.rorepo, Commit.NULL_BIN_SHA)
- commit3 = Commit(self.rorepo, "\1"*20)
+ commit3 = Commit(self.rorepo, "\1" * 20)
assert_equal(commit1, commit2)
assert_not_equal(commit2, commit3)
-
+
def test_iter_parents(self):
# should return all but ourselves, even if skip is defined
c = self.rorepo.commit('0.1.5')
@@ -234,40 +233,40 @@ class TestCommit(TestBase):
first_parent = piter.next()
assert first_parent != c
assert first_parent == c.parents[0]
- # END for each
-
+ # END for each
+
def test_base(self):
name_rev = self.rorepo.head.commit.name_rev
assert isinstance(name_rev, basestring)
-
+
@with_rw_repo('HEAD', bare=True)
def test_serialization(self, rwrepo):
# create all commits of our repo
assert_commit_serialization(rwrepo, '0.1.6')
-
+
def test_serialization_unicode_support(self):
assert Commit.default_encoding.lower() == 'utf-8'
-
+
# create a commit with unicode in the message, and the author's name
# Verify its serialization and deserialization
cmt = self.rorepo.commit('0.1.6')
assert isinstance(cmt.message, unicode) # it automatically decodes it as such
- assert isinstance(cmt.author.name, unicode) # same here
-
+ assert isinstance(cmt.author.name, unicode) # same here
+
cmt.message = "üäêèß".decode("utf-8")
assert len(cmt.message) == 5
-
+
cmt.author.name = "äüß".decode("utf-8")
assert len(cmt.author.name) == 3
-
+
cstream = StringIO()
cmt._serialize(cstream)
cstream.seek(0)
assert len(cstream.getvalue())
-
+
ncmt = Commit(self.rorepo, cmt.binsha)
ncmt._deserialize(cstream)
-
+
assert cmt.author.name == ncmt.author.name
assert cmt.message == ncmt.message
# actually, it can't be printed in a shell as repr wants to have ascii only
diff --git a/git/test/test_config.py b/git/test/test_config.py
index b397b193..b6888023 100644
--- a/git/test/test_config.py
+++ b/git/test/test_config.py
@@ -10,36 +10,37 @@ import StringIO
from copy import copy
from ConfigParser import NoSectionError
+
class TestBase(TestCase):
-
+
def _to_memcache(self, file_path):
fp = open(file_path, "r")
sio = StringIO.StringIO(fp.read())
sio.name = file_path
return sio
-
+
def _parsers_equal_or_raise(self, lhs, rhs):
pass
-
+
def test_read_write(self):
# writer must create the exact same file as the one read before
for filename in ("git_config", "git_config_global"):
file_obj = self._to_memcache(fixture_path(filename))
file_obj_orig = copy(file_obj)
- w_config = GitConfigParser(file_obj, read_only = False)
+ w_config = GitConfigParser(file_obj, read_only=False)
w_config.read() # enforce reading
assert w_config._sections
w_config.write() # enforce writing
-
+
# we stripped lines when reading, so the results differ
assert file_obj.getvalue() != file_obj_orig.getvalue()
-
+
# creating an additional config writer must fail due to exclusive access
- self.failUnlessRaises(IOError, GitConfigParser, file_obj, read_only = False)
-
+ self.failUnlessRaises(IOError, GitConfigParser, file_obj, read_only=False)
+
# should still have a lock and be able to make changes
assert w_config._lock._has_lock()
-
+
# changes should be written right away
sname = "my_section"
oname = "mykey"
@@ -47,15 +48,15 @@ class TestBase(TestCase):
w_config.add_section(sname)
assert w_config.has_section(sname)
w_config.set(sname, oname, val)
- assert w_config.has_option(sname,oname)
+ assert w_config.has_option(sname, oname)
assert w_config.get(sname, oname) == val
-
+
sname_new = "new_section"
oname_new = "new_key"
ival = 10
w_config.set_value(sname_new, oname_new, ival)
assert w_config.get_value(sname_new, oname_new) == ival
-
+
file_obj.seek(0)
r_config = GitConfigParser(file_obj, read_only=True)
#print file_obj.getvalue()
@@ -63,7 +64,7 @@ class TestBase(TestCase):
assert r_config.has_option(sname, oname)
assert r_config.get(sname, oname) == val
# END for each filename
-
+
def test_base(self):
path_repo = fixture_path("git_config")
path_global = fixture_path("git_config_global")
@@ -71,7 +72,7 @@ class TestBase(TestCase):
assert r_config.read_only
num_sections = 0
num_options = 0
-
+
# test reader methods
assert r_config._is_initialized == False
for section in r_config.sections():
@@ -84,21 +85,19 @@ class TestBase(TestCase):
assert val
assert "\n" not in option
assert "\n" not in val
-
+
# writing must fail
self.failUnlessRaises(IOError, r_config.set, section, option, None)
- self.failUnlessRaises(IOError, r_config.remove_option, section, option )
+ self.failUnlessRaises(IOError, r_config.remove_option, section, option)
# END for each option
self.failUnlessRaises(IOError, r_config.remove_section, section)
- # END for each section
+ # END for each section
assert num_sections and num_options
assert r_config._is_initialized == True
-
+
# get value which doesnt exist, with default
default = "my default value"
assert r_config.get_value("doesnt", "exist", default) == default
-
+
# it raises if there is no default though
self.failUnlessRaises(NoSectionError, r_config.get_value, "doesnt", "exist")
-
-
diff --git a/git/test/test_db.py b/git/test/test_db.py
index dc8190a7..b53c4209 100644
--- a/git/test/test_db.py
+++ b/git/test/test_db.py
@@ -9,17 +9,18 @@ from gitdb.util import bin_to_hex
from git.exc import BadObject
import os
+
class TestDB(TestBase):
-
+
def test_base(self):
gdb = GitCmdObjectDB(os.path.join(self.rorepo.git_dir, 'objects'), self.rorepo.git)
-
+
# partial to complete - works with everything
hexsha = bin_to_hex(gdb.partial_to_complete_sha_hex("0.1.6"))
assert len(hexsha) == 40
-
+
assert bin_to_hex(gdb.partial_to_complete_sha_hex(hexsha[:20])) == hexsha
-
+
# fails with BadObject
for invalid_rev in ("0000", "bad/ref", "super bad"):
self.failUnlessRaises(BadObject, gdb.partial_to_complete_sha_hex, invalid_rev)
diff --git a/git/test/test_diff.py b/git/test/test_diff.py
index 80652c8e..151a3d14 100644
--- a/git/test/test_diff.py
+++ b/git/test/test_diff.py
@@ -7,8 +7,9 @@
from git.test.lib import *
from git import *
+
class TestDiff(TestBase):
-
+
def _assert_diff_format(self, diffs):
# verify that the format of the diff is sane
for diff in diffs:
@@ -16,19 +17,19 @@ class TestDiff(TestBase):
assert isinstance(diff.a_mode, int)
if diff.b_mode:
assert isinstance(diff.b_mode, int)
-
+
if diff.a_blob:
assert not diff.a_blob.path.endswith('\n')
if diff.b_blob:
assert not diff.b_blob.path.endswith('\n')
# END for each diff
return diffs
-
+
def test_list_from_string_new_mode(self):
output = StringProcessAdapter(fixture('diff_new_mode'))
diffs = Diff._index_from_patch_format(self.rorepo, output.stdout)
self._assert_diff_format(diffs)
-
+
assert_equal(1, len(diffs))
assert_equal(10, len(diffs[0].diff.splitlines()))
@@ -36,7 +37,7 @@ class TestDiff(TestBase):
output = StringProcessAdapter(fixture('diff_rename'))
diffs = Diff._index_from_patch_format(self.rorepo, output.stdout)
self._assert_diff_format(diffs)
-
+
assert_equal(1, len(diffs))
diff = diffs[0]
@@ -47,10 +48,10 @@ class TestDiff(TestBase):
def test_diff_patch_format(self):
# test all of the 'old' format diffs for completness - it should at least
# be able to deal with it
- fixtures = ("diff_2", "diff_2f", "diff_f", "diff_i", "diff_mode_only",
- "diff_new_mode", "diff_numstat", "diff_p", "diff_rename",
- "diff_tree_numstat_root" )
-
+ fixtures = ("diff_2", "diff_2f", "diff_f", "diff_i", "diff_mode_only",
+ "diff_new_mode", "diff_numstat", "diff_p", "diff_rename",
+ "diff_tree_numstat_root")
+
for fixture_name in fixtures:
diff_proc = StringProcessAdapter(fixture(fixture_name))
diffs = Diff._index_from_patch_format(self.rorepo, diff_proc.stdout)
@@ -61,24 +62,24 @@ class TestDiff(TestBase):
assertion_map = dict()
for i, commit in enumerate(self.rorepo.iter_commits('0.1.6', max_count=2)):
diff_item = commit
- if i%2 == 0:
+ if i % 2 == 0:
diff_item = commit.tree
# END use tree every second item
-
+
for other in (None, commit.Index, commit.parents[0]):
for paths in (None, "CHANGES", ("CHANGES", "lib")):
for create_patch in range(2):
diff_index = diff_item.diff(other, paths, create_patch)
assert isinstance(diff_index, DiffIndex)
-
+
if diff_index:
self._assert_diff_format(diff_index)
for ct in DiffIndex.change_type:
- key = 'ct_%s'%ct
+ key = 'ct_%s' % ct
assertion_map.setdefault(key, 0)
- assertion_map[key] = assertion_map[key]+len(list(diff_index.iter_change_type(ct)))
+ assertion_map[key] = assertion_map[key] + len(list(diff_index.iter_change_type(ct)))
# END for each changetype
-
+
# check entries
diff_set = set()
diff_set.add(diff_index[0])
@@ -86,23 +87,21 @@ class TestDiff(TestBase):
assert len(diff_set) == 1
assert diff_index[0] == diff_index[0]
assert not (diff_index[0] != diff_index[0])
- # END diff index checking
+ # END diff index checking
# END for each patch option
# END for each path option
# END for each other side
# END for each commit
-
- # assert we could always find at least one instance of the members we
+
+ # assert we could always find at least one instance of the members we
# can iterate in the diff index - if not this indicates its not working correctly
# or our test does not span the whole range of possibilities
- for key,value in assertion_map.items():
+ for key, value in assertion_map.items():
assert value, "Did not find diff for %s" % key
- # END for each iteration type
-
+ # END for each iteration type
+
# test path not existing in the index - should be ignored
c = self.rorepo.head.commit
cp = c.parents[0]
diff_index = c.diff(cp, ["does/not/exist"])
assert len(diff_index) == 0
-
-
diff --git a/git/test/test_fun.py b/git/test/test_fun.py
index 096cd368..4672901c 100644
--- a/git/test/test_fun.py
+++ b/git/test/test_fun.py
@@ -1,7 +1,7 @@
from git.test.lib import *
from git.objects.fun import (
traverse_tree_recursive,
- traverse_trees_recursive,
+ traverse_trees_recursive,
tree_to_stream,
tree_entries_from_data
)
@@ -15,7 +15,7 @@ from gitdb.base import IStream
from gitdb.typ import str_tree_type
from stat import (
- S_IFDIR,
+ S_IFDIR,
S_IFREG,
S_IFLNK
)
@@ -23,8 +23,9 @@ from stat import (
from git.index import IndexFile
from cStringIO import StringIO
+
class TestFun(TestBase):
-
+
def _assert_index_entries(self, entries, trees):
index = IndexFile.from_tree(self.rorepo, *[self.rorepo.tree(bin_to_hex(t)) for t in trees])
assert entries
@@ -32,22 +33,22 @@ class TestFun(TestBase):
for entry in entries:
assert (entry.path, entry.stage) in index.entries
# END assert entry matches fully
-
+
def test_aggressive_tree_merge(self):
# head tree with additions, removals and modification compared to its predecessor
odb = self.rorepo.odb
- HC = self.rorepo.commit("6c1faef799095f3990e9970bc2cb10aa0221cf9c")
+ HC = self.rorepo.commit("6c1faef799095f3990e9970bc2cb10aa0221cf9c")
H = HC.tree
B = HC.parents[0].tree
-
+
# entries from single tree
trees = [H.binsha]
self._assert_index_entries(aggressive_tree_merge(odb, trees), trees)
-
+
# from multiple trees
trees = [B.binsha, H.binsha]
self._assert_index_entries(aggressive_tree_merge(odb, trees), trees)
-
+
# three way, no conflict
tree = self.rorepo.tree
B = tree("35a09c0534e89b2d43ec4101a5fb54576b577905")
@@ -55,16 +56,16 @@ class TestFun(TestBase):
M = tree("1f2b19de3301e76ab3a6187a49c9c93ff78bafbd")
trees = [B.binsha, H.binsha, M.binsha]
self._assert_index_entries(aggressive_tree_merge(odb, trees), trees)
-
+
# three-way, conflict in at least one file, both modified
B = tree("a7a4388eeaa4b6b94192dce67257a34c4a6cbd26")
H = tree("f9cec00938d9059882bb8eabdaf2f775943e00e5")
M = tree("44a601a068f4f543f73fd9c49e264c931b1e1652")
trees = [B.binsha, H.binsha, M.binsha]
self._assert_index_entries(aggressive_tree_merge(odb, trees), trees)
-
+
# too many trees
- self.failUnlessRaises(ValueError, aggressive_tree_merge, odb, trees*2)
+ self.failUnlessRaises(ValueError, aggressive_tree_merge, odb, trees * 2)
def mktree(self, odb, entries):
"""create a tree from the given tree entries and safe it to the database"""
@@ -73,122 +74,123 @@ class TestFun(TestBase):
sio.seek(0)
istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio))
return istream.binsha
-
+
@with_rw_repo('0.1.6')
def test_three_way_merge(self, rwrepo):
def mkfile(name, sha, executable=0):
- return (sha, S_IFREG | 0644 | executable*0111, name)
+ return (sha, S_IFREG | 0644 | executable * 0111, name)
+
def mkcommit(name, sha):
return (sha, S_IFDIR | S_IFLNK, name)
+
def assert_entries(entries, num_entries, has_conflict=False):
assert len(entries) == num_entries
assert has_conflict == (len([e for e in entries if e.stage != 0]) > 0)
mktree = self.mktree
-
- shaa = "\1"*20
- shab = "\2"*20
- shac = "\3"*20
-
+
+ shaa = "\1" * 20
+ shab = "\2" * 20
+ shac = "\3" * 20
+
odb = rwrepo.odb
-
+
# base tree
bfn = 'basefile'
fbase = mkfile(bfn, shaa)
tb = mktree(odb, [fbase])
-
+
# non-conflicting new files, same data
fa = mkfile('1', shab)
th = mktree(odb, [fbase, fa])
fb = mkfile('2', shac)
tm = mktree(odb, [fbase, fb])
-
+
# two new files, same base file
trees = [tb, th, tm]
assert_entries(aggressive_tree_merge(odb, trees), 3)
-
+
# both delete same file, add own one
fa = mkfile('1', shab)
th = mktree(odb, [fa])
fb = mkfile('2', shac)
tm = mktree(odb, [fb])
-
+
# two new files
trees = [tb, th, tm]
assert_entries(aggressive_tree_merge(odb, trees), 2)
-
+
# same file added in both, differently
fa = mkfile('1', shab)
th = mktree(odb, [fa])
fb = mkfile('1', shac)
tm = mktree(odb, [fb])
-
+
# expect conflict
trees = [tb, th, tm]
assert_entries(aggressive_tree_merge(odb, trees), 2, True)
-
+
# same file added, different mode
fa = mkfile('1', shab)
th = mktree(odb, [fa])
fb = mkcommit('1', shab)
tm = mktree(odb, [fb])
-
+
# expect conflict
trees = [tb, th, tm]
assert_entries(aggressive_tree_merge(odb, trees), 2, True)
-
+
# same file added in both
fa = mkfile('1', shab)
th = mktree(odb, [fa])
fb = mkfile('1', shab)
tm = mktree(odb, [fb])
-
+
# expect conflict
trees = [tb, th, tm]
assert_entries(aggressive_tree_merge(odb, trees), 1)
-
+
# modify same base file, differently
fa = mkfile(bfn, shab)
th = mktree(odb, [fa])
fb = mkfile(bfn, shac)
tm = mktree(odb, [fb])
-
+
# conflict, 3 versions on 3 stages
trees = [tb, th, tm]
assert_entries(aggressive_tree_merge(odb, trees), 3, True)
-
-
+
# change mode on same base file, by making one a commit, the other executable
# no content change ( this is totally unlikely to happen in the real world )
fa = mkcommit(bfn, shaa)
th = mktree(odb, [fa])
fb = mkfile(bfn, shaa, executable=1)
tm = mktree(odb, [fb])
-
+
# conflict, 3 versions on 3 stages, because of different mode
trees = [tb, th, tm]
assert_entries(aggressive_tree_merge(odb, trees), 3, True)
-
+
for is_them in range(2):
# only we/they change contents
fa = mkfile(bfn, shab)
th = mktree(odb, [fa])
-
+
trees = [tb, th, tb]
if is_them:
trees = [tb, tb, th]
entries = aggressive_tree_merge(odb, trees)
assert len(entries) == 1 and entries[0].binsha == shab
-
+
# only we/they change the mode
fa = mkcommit(bfn, shaa)
th = mktree(odb, [fa])
-
+
trees = [tb, th, tb]
if is_them:
trees = [tb, tb, th]
entries = aggressive_tree_merge(odb, trees)
assert len(entries) == 1 and entries[0].binsha == shaa and entries[0].mode == fa[1]
-
+
# one side deletes, the other changes = conflict
fa = mkfile(bfn, shab)
th = mktree(odb, [fa])
@@ -199,16 +201,16 @@ class TestFun(TestBase):
# as one is deleted, there are only 2 entries
assert_entries(aggressive_tree_merge(odb, trees), 2, True)
# END handle ours, theirs
-
+
def _assert_tree_entries(self, entries, num_trees):
for entry in entries:
assert len(entry) == num_trees
paths = set(e[2] for e in entry if e)
-
+
# only one path per set of entries
assert len(paths) == 1
# END verify entry
-
+
def test_tree_traversal(self):
# low level tree tarversal
odb = self.rorepo.odb
@@ -216,29 +218,29 @@ class TestFun(TestBase):
M = self.rorepo.tree('e14e3f143e7260de9581aee27e5a9b2645db72de') # merge tree
B = self.rorepo.tree('f606937a7a21237c866efafcad33675e6539c103') # base tree
B_old = self.rorepo.tree('1f66cfbbce58b4b552b041707a12d437cc5f400a') # old base tree
-
+
# two very different trees
entries = traverse_trees_recursive(odb, [B_old.binsha, H.binsha], '')
self._assert_tree_entries(entries, 2)
-
+
oentries = traverse_trees_recursive(odb, [H.binsha, B_old.binsha], '')
assert len(oentries) == len(entries)
self._assert_tree_entries(oentries, 2)
-
+
# single tree
is_no_tree = lambda i, d: i.type != 'tree'
entries = traverse_trees_recursive(odb, [B.binsha], '')
assert len(entries) == len(list(B.traverse(predicate=is_no_tree)))
self._assert_tree_entries(entries, 1)
-
+
# two trees
entries = traverse_trees_recursive(odb, [B.binsha, H.binsha], '')
self._assert_tree_entries(entries, 2)
-
+
# tree trees
entries = traverse_trees_recursive(odb, [B.binsha, H.binsha, M.binsha], '')
self._assert_tree_entries(entries, 3)
-
+
def test_tree_traversal_single(self):
max_count = 50
count = 0
diff --git a/git/test/test_git.py b/git/test/test_git.py
index 7132aa83..49c256ca 100644
--- a/git/test/test_git.py
+++ b/git/test/test_git.py
@@ -5,15 +5,16 @@
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
import os
-from git.test.lib import ( TestBase,
- patch,
+from git.test.lib import (TestBase,
+ patch,
raises,
assert_equal,
assert_true,
assert_match,
- fixture_path )
-from git import ( Git,
- GitCommandError )
+ fixture_path)
+from git import (Git,
+ GitCommandError)
+
class TestGit(TestBase):
@@ -41,7 +42,6 @@ class TestGit(TestBase):
def test_it_raises_errors(self):
self.git.this_does_not_exist()
-
def test_it_transforms_kwargs_into_git_command_arguments(self):
assert_equal(["-s"], self.git.transform_kwargs(**{'s': True}))
assert_equal(["-s5"], self.git.transform_kwargs(**{'s': 5}))
@@ -52,7 +52,7 @@ class TestGit(TestBase):
assert_equal(["-s", "-t"], self.git.transform_kwargs(**{'s': True, 't': True}))
def test_it_executes_git_to_shell_and_returns_result(self):
- assert_match('^git version [\d\.]{2}.*$', self.git.execute(["git","version"]))
+ assert_match('^git version [\d\.]{2}.*$', self.git.execute(["git", "version"]))
def test_it_accepts_stdin(self):
filename = fixture_path("cat_file_blob")
@@ -71,13 +71,13 @@ class TestGit(TestBase):
# read header only
import subprocess as sp
hexsha = "b2339455342180c7cc1e9bba3e9f181f7baa5167"
- g = self.git.cat_file(batch_check=True, istream=sp.PIPE,as_process=True)
+ g = self.git.cat_file(batch_check=True, istream=sp.PIPE, as_process=True)
g.stdin.write("b2339455342180c7cc1e9bba3e9f181f7baa5167\n")
g.stdin.flush()
obj_info = g.stdout.readline()
# read header + data
- g = self.git.cat_file(batch=True, istream=sp.PIPE,as_process=True)
+ g = self.git.cat_file(batch=True, istream=sp.PIPE, as_process=True)
g.stdin.write("b2339455342180c7cc1e9bba3e9f181f7baa5167\n")
g.stdin.flush()
obj_info_two = g.stdout.readline()
@@ -87,15 +87,14 @@ class TestGit(TestBase):
size = int(obj_info.split()[2])
data = g.stdout.read(size)
g.stdout.read(1)
-
+
# now we should be able to read a new object
g.stdin.write("b2339455342180c7cc1e9bba3e9f181f7baa5167\n")
g.stdin.flush()
assert g.stdout.readline() == obj_info
-
# same can be achieved using the respective command functions
- hexsha, typename, size = self.git.get_object_header(hexsha)
+ hexsha, typename, size = self.git.get_object_header(hexsha)
hexsha, typename_two, size_two, data = self.git.get_object_data(hexsha)
assert typename == typename_two and size == size_two
@@ -123,7 +122,7 @@ class TestGit(TestBase):
self.assertEquals(git_version, git_command_version)
def test_single_char_git_options_are_passed_to_git(self):
- input_value='TestValue'
+ input_value = 'TestValue'
output_value = self.git(c='user.name={}'.format(input_value)).config('--get', 'user.name')
self.assertEquals(input_value, output_value)
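
The test_git.py hunk above keeps a single git cat-file --batch-check process open and feeds it hexshas over stdin. A minimal sketch of that pattern outside the test, assuming a repository in the current directory (the queried object is simply HEAD's commit):

    import subprocess as sp
    from git import Repo

    repo = Repo(".")
    hexsha = repo.head.commit.hexsha

    # long-running cat-file process, queried once per object
    proc = repo.git.cat_file(batch_check=True, istream=sp.PIPE, as_process=True)
    proc.stdin.write(hexsha + "\n")
    proc.stdin.flush()
    header = proc.stdout.readline()       # "<sha> <type> <size>"

    # the same header through the convenience wrapper used at the end of the test
    sha, typename, size = repo.git.get_object_header(hexsha)
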
diff --git a/git/test/test_index.py b/git/test/test_index.py
index d532a3b4..f1f718cd 100644
--- a/git/test/test_index.py
+++ b/git/test/test_index.py
@@ -14,12 +14,13 @@ import glob
import shutil
from stat import *
+
class TestIndex(TestBase):
-
+
def __init__(self, *args):
super(TestIndex, self).__init__(*args)
self._reset_progress()
-
+
def _assert_fprogress(self, entries):
assert len(entries) == len(self._fprogress_map)
for path, call_count in self._fprogress_map.iteritems():
@@ -35,48 +36,48 @@ class TestIndex(TestBase):
if curval == 1:
assert done
self._fprogress_map[path] = curval + 1
-
+
def _fprogress_add(self, path, done, item):
- """Called as progress func - we keep track of the proper
+ """Called as progress func - we keep track of the proper
call order"""
assert item is not None
self._fprogress(path, done, item)
-
+
def _reset_progress(self):
# maps paths to the count of calls
self._fprogress_map = dict()
-
+
def _assert_entries(self, entries):
for entry in entries:
assert isinstance(entry, BaseIndexEntry)
assert not os.path.isabs(entry.path)
assert not "\\" in entry.path
# END for each entry
-
+
def test_index_file_base(self):
# read from file
index = IndexFile(self.rorepo, fixture_path("index"))
assert index.entries
assert index.version > 0
-
+
# test entry
last_val = None
entry = index.entries.itervalues().next()
- for attr in ("path","ctime","mtime","dev","inode","mode","uid",
- "gid","size","binsha", "hexsha", "stage"):
+ for attr in ("path", "ctime", "mtime", "dev", "inode", "mode", "uid",
+ "gid", "size", "binsha", "hexsha", "stage"):
val = getattr(entry, attr)
# END for each method
-
+
# test update
entries = index.entries
assert isinstance(index.update(), IndexFile)
assert entries is not index.entries
-
+
# test stage
index_merge = IndexFile(self.rorepo, fixture_path("index_merge"))
assert len(index_merge.entries) == 106
- assert len(list(e for e in index_merge.entries.itervalues() if e.stage != 0 ))
-
+ assert len(list(e for e in index_merge.entries.itervalues() if e.stage != 0))
+
# write the data - it must match the original
tmpfile = tempfile.mktemp()
index_merge.write(tmpfile)
@@ -84,82 +85,80 @@ class TestIndex(TestBase):
assert fp.read() == fixture("index_merge")
fp.close()
os.remove(tmpfile)
-
+
def _cmp_tree_index(self, tree, index):
# fail unless both objects contain the same paths and blobs
if isinstance(tree, str):
tree = self.rorepo.commit(tree).tree
-
+
num_blobs = 0
blist = list()
- for blob in tree.traverse(predicate = lambda e,d: e.type == "blob", branch_first=False):
- assert (blob.path,0) in index.entries
+ for blob in tree.traverse(predicate=lambda e, d: e.type == "blob", branch_first=False):
+ assert (blob.path, 0) in index.entries
blist.append(blob)
# END for each blob in tree
if len(blist) != len(index.entries):
iset = set(k[0] for k in index.entries.keys())
bset = set(b.path for b in blist)
- raise AssertionError( "CMP Failed: Missing entries in index: %s, missing in tree: %s" % (bset-iset, iset-bset) )
+ raise AssertionError("CMP Failed: Missing entries in index: %s, missing in tree: %s" % (bset - iset, iset - bset))
# END assertion message
-
+
@with_rw_repo('0.1.6')
def test_index_file_from_tree(self, rw_repo):
common_ancestor_sha = "5117c9c8a4d3af19a9958677e45cda9269de1541"
cur_sha = "4b43ca7ff72d5f535134241e7c797ddc9c7a3573"
other_sha = "39f85c4358b7346fee22169da9cad93901ea9eb9"
-
- # simple index from tree
+
+ # simple index from tree
base_index = IndexFile.from_tree(rw_repo, common_ancestor_sha)
assert base_index.entries
self._cmp_tree_index(common_ancestor_sha, base_index)
-
+
# merge two trees - its like a fast-forward
two_way_index = IndexFile.from_tree(rw_repo, common_ancestor_sha, cur_sha)
assert two_way_index.entries
self._cmp_tree_index(cur_sha, two_way_index)
-
+
# merge three trees - here we have a merge conflict
three_way_index = IndexFile.from_tree(rw_repo, common_ancestor_sha, cur_sha, other_sha)
assert len(list(e for e in three_way_index.entries.values() if e.stage != 0))
-
-
+
# ITERATE BLOBS
merge_required = lambda t: t[0] != 0
merge_blobs = list(three_way_index.iter_blobs(merge_required))
assert merge_blobs
- assert merge_blobs[0][0] in (1,2,3)
+ assert merge_blobs[0][0] in (1, 2, 3)
assert isinstance(merge_blobs[0][1], Blob)
-
+
# test BlobFilter
prefix = 'lib/git'
for stage, blob in base_index.iter_blobs(BlobFilter([prefix])):
- assert blob.path.startswith(prefix)
-
-
+ assert blob.path.startswith(prefix)
+
# writing a tree should fail with an unmerged index
self.failUnlessRaises(UnmergedEntriesError, three_way_index.write_tree)
-
+
# removed unmerged entries
unmerged_blob_map = three_way_index.unmerged_blobs()
assert unmerged_blob_map
-
+
# pick the first blob at the first stage we find and use it as resolved version
- three_way_index.resolve_blobs( l[0][1] for l in unmerged_blob_map.itervalues() )
+ three_way_index.resolve_blobs(l[0][1] for l in unmerged_blob_map.itervalues())
tree = three_way_index.write_tree()
assert isinstance(tree, Tree)
num_blobs = 0
- for blob in tree.traverse(predicate=lambda item,d: item.type == "blob"):
- assert (blob.path,0) in three_way_index.entries
+ for blob in tree.traverse(predicate=lambda item, d: item.type == "blob"):
+ assert (blob.path, 0) in three_way_index.entries
num_blobs += 1
# END for each blob
assert num_blobs == len(three_way_index.entries)
-
+
@with_rw_repo('0.1.6')
def test_index_merge_tree(self, rw_repo):
- # A bit out of place, but we need a different repo for this:
+ # A bit out of place, but we need a different repo for this:
assert self.rorepo != rw_repo and not (self.rorepo == rw_repo)
assert len(set((self.rorepo, self.rorepo, rw_repo, rw_repo))) == 2
-
+
# SINGLE TREE MERGE
# current index is at the (virtual) cur_commit
next_commit = "4c39f9da792792d4e73fc3a5effde66576ae128c"
@@ -169,107 +168,106 @@ class TestIndex(TestBase):
rw_repo.index.merge_tree(next_commit)
# only one change should be recorded
assert manifest_entry.binsha != rw_repo.index.entries[manifest_key].binsha
-
+
rw_repo.index.reset(rw_repo.head)
assert rw_repo.index.entries[manifest_key].binsha == manifest_entry.binsha
-
+
# FAKE MERGE
#############
- # Add a change with a NULL sha that should conflict with next_commit. We
- # pretend there was a change, but we do not even bother adding a proper
+ # Add a change with a NULL sha that should conflict with next_commit. We
+ # pretend there was a change, but we do not even bother adding a proper
# sha for it ( which makes things faster of course )
- manifest_fake_entry = BaseIndexEntry((manifest_entry[0], "\0"*20, 0, manifest_entry[3]))
+ manifest_fake_entry = BaseIndexEntry((manifest_entry[0], "\0" * 20, 0, manifest_entry[3]))
# try write flag
self._assert_entries(rw_repo.index.add([manifest_fake_entry], write=False))
- # add actually resolves the null-hex-sha for us as a feature, but we can
+ # add actually resolves the null-hex-sha for us as a feature, but we can
# edit the index manually
assert rw_repo.index.entries[manifest_key].binsha != Object.NULL_BIN_SHA
- # must operate on the same index for this ! Its a bit problematic as
+ # must operate on the same index for this ! Its a bit problematic as
# it might confuse people
- index = rw_repo.index
+ index = rw_repo.index
index.entries[manifest_key] = IndexEntry.from_base(manifest_fake_entry)
index.write()
assert rw_repo.index.entries[manifest_key].hexsha == Diff.NULL_HEX_SHA
-
+
# write an unchanged index ( just for the fun of it )
rw_repo.index.write()
-
- # a three way merge would result in a conflict and fails as the command will
- # not overwrite any entries in our index and hence leave them unmerged. This is
+
+ # a three way merge would result in a conflict and fails as the command will
+ # not overwrite any entries in our index and hence leave them unmerged. This is
# mainly a protection feature as the current index is not yet in a tree
self.failUnlessRaises(GitCommandError, index.merge_tree, next_commit, base=parent_commit)
-
- # the only way to get the merged entries is to safe the current index away into a tree,
+
+ # the only way to get the merged entries is to safe the current index away into a tree,
# which is like a temporary commit for us. This fails as well, as the NULL sha does not
# have a corresponding object
# NOTE: missing_ok is not a kwarg anymore, missing_ok is always true
# self.failUnlessRaises(GitCommandError, index.write_tree)
-
+
# if missing objects are okay, this would work though ( they are always okay now )
tree = index.write_tree()
-
+
# now make a proper three way merge with unmerged entries
unmerged_tree = IndexFile.from_tree(rw_repo, parent_commit, tree, next_commit)
unmerged_blobs = unmerged_tree.unmerged_blobs()
assert len(unmerged_blobs) == 1 and unmerged_blobs.keys()[0] == manifest_key[0]
-
-
+
@with_rw_repo('0.1.6')
def test_index_file_diffing(self, rw_repo):
# default Index instance points to our index
index = IndexFile(rw_repo)
assert index.path is not None
assert len(index.entries)
-
+
# write the file back
index.write()
-
+
# could sha it, or check stats
-
+
# test diff
- # resetting the head will leave the index in a different state, and the
+ # resetting the head will leave the index in a different state, and the
# diff will yield a few changes
cur_head_commit = rw_repo.head.reference.commit
ref = rw_repo.head.reset('HEAD~6', index=True, working_tree=False)
-
+
# diff against same index is 0
diff = index.diff()
assert len(diff) == 0
-
+
# against HEAD as string, must be the same as it matches index
diff = index.diff('HEAD')
assert len(diff) == 0
-
+
# against previous head, there must be a difference
diff = index.diff(cur_head_commit)
assert len(diff)
-
+
# we reverse the result
adiff = index.diff(str(cur_head_commit), R=True)
odiff = index.diff(cur_head_commit, R=False) # now its not reversed anymore
assert adiff != odiff
assert odiff == diff # both unreversed diffs against HEAD
-
+
# against working copy - its still at cur_commit
wdiff = index.diff(None)
assert wdiff != adiff
assert wdiff != odiff
-
+
# against something unusual
self.failUnlessRaises(ValueError, index.diff, int)
-
+
# adjust the index to match an old revision
cur_branch = rw_repo.active_branch
cur_commit = cur_branch.commit
rev_head_parent = 'HEAD~1'
assert index.reset(rev_head_parent) is index
-
+
assert cur_branch == rw_repo.active_branch
assert cur_commit == rw_repo.head.commit
-
+
# there must be differences towards the working tree which is in the 'future'
assert index.diff(None)
-
+
# reset the working copy as well to current head,to pull 'back' as well
new_data = "will be reverted"
file_path = os.path.join(rw_repo.working_tree_dir, "CHANGES")
@@ -280,12 +278,12 @@ class TestIndex(TestBase):
assert not index.diff(None)
assert cur_branch == rw_repo.active_branch
assert cur_commit == rw_repo.head.commit
- fp = open(file_path,'rb')
+ fp = open(file_path, 'rb')
try:
assert fp.read() != new_data
finally:
fp.close()
-
+
# test full checkout
test_file = os.path.join(rw_repo.working_tree_dir, "CHANGES")
open(test_file, 'ab').write("some data")
@@ -293,24 +291,24 @@ class TestIndex(TestBase):
assert 'CHANGES' in list(rval)
self._assert_fprogress([None])
assert os.path.isfile(test_file)
-
+
os.remove(test_file)
rval = index.checkout(None, force=False, fprogress=self._fprogress)
assert 'CHANGES' in list(rval)
self._assert_fprogress([None])
assert os.path.isfile(test_file)
-
+
# individual file
os.remove(test_file)
rval = index.checkout(test_file, fprogress=self._fprogress)
assert list(rval)[0] == 'CHANGES'
self._assert_fprogress([test_file])
assert os.path.exists(test_file)
-
+
# checking out non-existing file throws
self.failUnlessRaises(CheckoutError, index.checkout, "doesnt_exist_ever.txt.that")
self.failUnlessRaises(CheckoutError, index.checkout, paths=["doesnt/exist"])
-
+
# checkout file with modifications
append_data = "hello"
fp = open(test_file, "ab")
@@ -325,16 +323,16 @@ class TestIndex(TestBase):
assert open(test_file).read().endswith(append_data)
else:
raise AssertionError("Exception CheckoutError not thrown")
-
+
# if we force it it should work
index.checkout(test_file, force=True)
assert not open(test_file).read().endswith(append_data)
-
+
# checkout directory
shutil.rmtree(os.path.join(rw_repo.working_tree_dir, "lib"))
rval = index.checkout('lib')
assert len(list(rval)) > 1
-
+
def _count_existing(self, repo, files):
"""
Returns count of files that actually exist in the repository directory.
@@ -346,24 +344,24 @@ class TestIndex(TestBase):
# END for each deleted file
return existing
# END num existing helper
-
+
@with_rw_repo('0.1.6')
def test_index_mutation(self, rw_repo):
index = rw_repo.index
num_entries = len(index.entries)
cur_head = rw_repo.head
-
+
uname = "Some Developer"
umail = "sd@company.com"
rw_repo.config_writer().set_value("user", "name", uname)
- rw_repo.config_writer().set_value("user", "email", umail)
-
- # remove all of the files, provide a wild mix of paths, BaseIndexEntries,
+ rw_repo.config_writer().set_value("user", "email", umail)
+
+ # remove all of the files, provide a wild mix of paths, BaseIndexEntries,
# IndexEntries
def mixed_iterator():
count = 0
for entry in index.entries.itervalues():
- type_id = count % 4
+ type_id = count % 4
if type_id == 0: # path
yield entry.path
elif type_id == 1: # blob
@@ -375,39 +373,39 @@ class TestIndex(TestBase):
else:
raise AssertionError("Invalid Type")
count += 1
- # END for each entry
+ # END for each entry
# END mixed iterator
deleted_files = index.remove(mixed_iterator(), working_tree=False)
assert deleted_files
assert self._count_existing(rw_repo, deleted_files) == len(deleted_files)
assert len(index.entries) == 0
-
+
# reset the index to undo our changes
index.reset()
assert len(index.entries) == num_entries
-
+
# remove with working copy
deleted_files = index.remove(mixed_iterator(), working_tree=True)
assert deleted_files
assert self._count_existing(rw_repo, deleted_files) == 0
-
+
# reset everything
index.reset(working_tree=True)
assert self._count_existing(rw_repo, deleted_files) == len(deleted_files)
-
+
# invalid type
self.failUnlessRaises(TypeError, index.remove, [1])
-
+
# absolute path
- deleted_files = index.remove([os.path.join(rw_repo.working_tree_dir,"lib")], r=True)
+ deleted_files = index.remove([os.path.join(rw_repo.working_tree_dir, "lib")], r=True)
assert len(deleted_files) > 1
self.failUnlessRaises(ValueError, index.remove, ["/doesnt/exists"])
-
+
# TEST COMMITTING
# commit changed index
cur_commit = cur_head.commit
commit_message = "commit default head"
-
+
new_commit = index.commit(commit_message, head=False)
assert cur_commit != new_commit
assert new_commit.author.name == uname
@@ -418,66 +416,66 @@ class TestIndex(TestBase):
assert new_commit.parents[0] == cur_commit
assert len(new_commit.parents) == 1
assert cur_head.commit == cur_commit
-
+
# same index, no parents
commit_message = "index without parents"
commit_no_parents = index.commit(commit_message, parent_commits=list(), head=True)
assert commit_no_parents.message == commit_message
assert len(commit_no_parents.parents) == 0
assert cur_head.commit == commit_no_parents
-
+
# same index, multiple parents
commit_message = "Index with multiple parents\n commit with another line"
- commit_multi_parent = index.commit(commit_message,parent_commits=(commit_no_parents, new_commit))
+ commit_multi_parent = index.commit(commit_message, parent_commits=(commit_no_parents, new_commit))
assert commit_multi_parent.message == commit_message
assert len(commit_multi_parent.parents) == 2
assert commit_multi_parent.parents[0] == commit_no_parents
assert commit_multi_parent.parents[1] == new_commit
assert cur_head.commit == commit_multi_parent
-
+
# re-add all files in lib
# get the lib folder back on disk, but get an index without it
index.reset(new_commit.parents[0], working_tree=True).reset(new_commit, working_tree=False)
lib_file_path = os.path.join("lib", "git", "__init__.py")
assert (lib_file_path, 0) not in index.entries
assert os.path.isfile(os.path.join(rw_repo.working_tree_dir, lib_file_path))
-
+
# directory
entries = index.add(['lib'], fprogress=self._fprogress_add)
self._assert_entries(entries)
self._assert_fprogress(entries)
- assert len(entries)>1
-
- # glob
+ assert len(entries) > 1
+
+ # glob
entries = index.reset(new_commit).add([os.path.join('lib', 'git', '*.py')], fprogress=self._fprogress_add)
self._assert_entries(entries)
self._assert_fprogress(entries)
assert len(entries) == 14
-
- # same file
- entries = index.reset(new_commit).add([os.path.abspath(os.path.join('lib', 'git', 'head.py'))]*2, fprogress=self._fprogress_add)
+
+ # same file
+ entries = index.reset(new_commit).add([os.path.abspath(os.path.join('lib', 'git', 'head.py'))] * 2, fprogress=self._fprogress_add)
self._assert_entries(entries)
assert entries[0].mode & 0644 == 0644
# would fail, test is too primitive to handle this case
# self._assert_fprogress(entries)
self._reset_progress()
assert len(entries) == 2
-
+
# missing path
self.failUnlessRaises(OSError, index.reset(new_commit).add, ['doesnt/exist/must/raise'])
-
+
# blob from older revision overrides current index revision
old_blob = new_commit.parents[0].tree.blobs[0]
entries = index.reset(new_commit).add([old_blob], fprogress=self._fprogress_add)
self._assert_entries(entries)
self._assert_fprogress(entries)
- assert index.entries[(old_blob.path,0)].hexsha == old_blob.hexsha and len(entries) == 1
-
+ assert index.entries[(old_blob.path, 0)].hexsha == old_blob.hexsha and len(entries) == 1
+
# mode 0 not allowed
null_hex_sha = Diff.NULL_HEX_SHA
null_bin_sha = "\0" * 20
- self.failUnlessRaises(ValueError, index.reset(new_commit).add, [BaseIndexEntry((0, null_bin_sha,0,"doesntmatter"))])
-
+ self.failUnlessRaises(ValueError, index.reset(new_commit).add, [BaseIndexEntry((0, null_bin_sha, 0, "doesntmatter"))])
+
# add new file
new_file_relapath = "my_new_file"
new_file_path = self._make_file(new_file_relapath, "hello world", rw_repo)
@@ -485,7 +483,7 @@ class TestIndex(TestBase):
self._assert_entries(entries)
self._assert_fprogress(entries)
assert len(entries) == 1 and entries[0].hexsha != null_hex_sha
-
+
# add symlink
if sys.platform != "win32":
basename = "my_real_symlink"
@@ -497,11 +495,11 @@ class TestIndex(TestBase):
self._assert_fprogress(entries)
assert len(entries) == 1 and S_ISLNK(entries[0].mode)
assert S_ISLNK(index.entries[index.entry_key("my_real_symlink", 0)].mode)
-
+
# we expect only the target to be written
assert index.repo.odb.stream(entries[0].binsha).read() == target
- # END real symlink test
-
+ # END real symlink test
+
# add fake symlink and assure it checks out as a symlink
fake_symlink_relapath = "my_fake_symlink"
link_target = "/etc/that"
@@ -512,83 +510,83 @@ class TestIndex(TestBase):
self._assert_fprogress(entries)
assert entries[0].hexsha != null_hex_sha
assert len(entries) == 1 and S_ISLNK(entries[0].mode)
-
+
# assure this also works with an alternate method
full_index_entry = IndexEntry.from_base(BaseIndexEntry((0120000, entries[0].binsha, 0, entries[0].path)))
entry_key = index.entry_key(full_index_entry)
index.reset(new_commit)
-
+
assert entry_key not in index.entries
index.entries[entry_key] = full_index_entry
index.write()
index.update() # force reread of entries
new_entry = index.entries[entry_key]
assert S_ISLNK(new_entry.mode)
-
+
# a tree created from this should contain the symlink
tree = index.write_tree()
assert fake_symlink_relapath in tree
index.write() # flush our changes for the checkout
-
+
# checkout the fakelink, should be a link then
assert not S_ISLNK(os.stat(fake_symlink_path)[ST_MODE])
os.remove(fake_symlink_path)
index.checkout(fake_symlink_path)
-
+
# on windows we will never get symlinks
if os.name == 'nt':
- # simlinks should contain the link as text ( which is what a
+ # simlinks should contain the link as text ( which is what a
# symlink actually is )
- open(fake_symlink_path,'rb').read() == link_target
+ open(fake_symlink_path, 'rb').read() == link_target
else:
assert S_ISLNK(os.lstat(fake_symlink_path)[ST_MODE])
-
+
# TEST RENAMING
def assert_mv_rval(rval):
for source, dest in rval:
assert not os.path.exists(source) and os.path.exists(dest)
# END for each renamed item
# END move assertion utility
-
+
self.failUnlessRaises(ValueError, index.move, ['just_one_path'])
# file onto existing file
files = ['AUTHORS', 'LICENSE']
self.failUnlessRaises(GitCommandError, index.move, files)
-
- # again, with force
+
+ # again, with force
assert_mv_rval(index.move(files, f=True))
-
+
# files into directory - dry run
paths = ['LICENSE', 'VERSION', 'doc']
rval = index.move(paths, dry_run=True)
assert len(rval) == 2
assert os.path.exists(paths[0])
-
+
# again, no dry run
rval = index.move(paths)
assert_mv_rval(rval)
-
+
# dir into dir
rval = index.move(['doc', 'test'])
assert_mv_rval(rval)
-
-
+
# TEST PATH REWRITING
######################
count = [0]
+
def rewriter(entry):
rval = str(count[0])
count[0] += 1
return rval
# END rewriter
-
+
def make_paths():
# two existing ones, one new one
yield 'CHANGES'
yield 'ez_setup.py'
yield index.entries[index.entry_key('README', 0)]
yield index.entries[index.entry_key('.gitignore', 0)]
-
+
for fid in range(3):
fname = 'newfile%i' % fid
open(fname, 'wb').write("abcd")
@@ -597,11 +595,10 @@ class TestIndex(TestBase):
# END path producer
paths = list(make_paths())
self._assert_entries(index.add(paths, path_rewriter=rewriter))
-
+
for filenum in range(len(paths)):
assert index.entry_key(str(filenum), 0) in index.entries
-
-
+
# TEST RESET ON PATHS
######################
arela = "aa"
@@ -613,34 +610,33 @@ class TestIndex(TestBase):
keys = (akey, bkey)
absfiles = (afile, bfile)
files = (arela, brela)
-
+
for fkey in keys:
assert not fkey in index.entries
-
+
index.add(files, write=True)
nc = index.commit("2 files committed", head=False)
-
+
for fkey in keys:
assert fkey in index.entries
-
+
# just the index
index.reset(paths=(arela, afile))
assert not akey in index.entries
assert bkey in index.entries
-
+
# now with working tree - files on disk as well as entries must be recreated
rw_repo.head.commit = nc
for absfile in absfiles:
os.remove(absfile)
-
+
index.reset(working_tree=True, paths=files)
-
- for fkey in keys:
+
+ for fkey in keys:
assert fkey in index.entries
for absfile in absfiles:
assert os.path.isfile(absfile)
-
-
+
@with_rw_repo('HEAD')
def test_compare_write_tree(self, rw_repo):
# write all trees and compare them
@@ -654,16 +650,14 @@ class TestIndex(TestBase):
index = rw_repo.index.reset(commit)
orig_tree = commit.tree
assert index.write_tree() == orig_tree
- # END for each commit
-
+ # END for each commit
+
def test_index_new(self):
B = self.rorepo.tree("6d9b1f4f9fa8c9f030e3207e7deacc5d5f8bba4e")
H = self.rorepo.tree("25dca42bac17d511b7e2ebdd9d1d679e7626db5f")
M = self.rorepo.tree("e746f96bcc29238b79118123028ca170adc4ff0f")
-
- for args in ((B,), (B,H), (B,H,M)):
+
+ for args in ((B,), (B, H), (B, H, M)):
index = IndexFile.new(self.rorepo, *args)
assert isinstance(index, IndexFile)
# END for each arg tuple
-
-
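
Much of the test_index.py diff revolves around building an index from several trees and resolving the resulting conflicts. A condensed sketch of that flow, with placeholder revisions standing in for the fixed SHAs the test uses:

    from git import Repo, IndexFile

    repo = Repo(".")                                  # repository containing the revisions below
    base, ours, theirs = "HEAD~2", "HEAD~1", "HEAD"   # placeholders, not the test's SHAs

    # three-way merge held purely in memory; conflicts appear as stages 1/2/3
    index = IndexFile.from_tree(repo, base, ours, theirs)
    unmerged = index.unmerged_blobs()                 # {path: [(stage, Blob), ...]}

    # naive resolution: keep the first version offered for each conflicted path
    index.resolve_blobs(blobs[0][1] for blobs in unmerged.itervalues())
    tree = index.write_tree()                         # Tree of the merged state
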
diff --git a/git/test/test_reflog.py b/git/test/test_reflog.py
index fca9e1cd..fec50095 100644
--- a/git/test/test_reflog.py
+++ b/git/test/test_reflog.py
@@ -7,6 +7,7 @@ import tempfile
import shutil
import os
+
class TestRefLog(TestBase):
def test_reflogentry(self):
@@ -14,51 +15,51 @@ class TestRefLog(TestBase):
hexsha = 'F' * 40
actor = Actor('name', 'email')
msg = "message"
-
+
self.failUnlessRaises(ValueError, RefLogEntry.new, nullhexsha, hexsha, 'noactor', 0, 0, "")
e = RefLogEntry.new(nullhexsha, hexsha, actor, 0, 1, msg)
-
+
assert e.oldhexsha == nullhexsha
assert e.newhexsha == hexsha
assert e.actor == actor
assert e.time[0] == 0
assert e.time[1] == 1
assert e.message == msg
-
+
# check representation (roughly)
assert repr(e).startswith(nullhexsha)
-
+
def test_base(self):
rlp_head = fixture_path('reflog_HEAD')
rlp_master = fixture_path('reflog_master')
tdir = tempfile.mktemp(suffix="test_reflogs")
os.mkdir(tdir)
-
- rlp_master_ro = RefLog.path(self.rorepo.head)
+
+ rlp_master_ro = RefLog.path(self.rorepo.head)
assert os.path.isfile(rlp_master_ro)
-
+
# simple read
reflog = RefLog.from_file(rlp_master_ro)
assert reflog._path is not None
assert isinstance(reflog, RefLog)
assert len(reflog)
-
+
# iter_entries works with path and with stream
assert len(list(RefLog.iter_entries(open(rlp_master))))
assert len(list(RefLog.iter_entries(rlp_master)))
-
+
# raise on invalid reflog
# TODO: Try multiple corrupted ones !
pp = 'reflog_invalid_'
for suffix in ('oldsha', 'newsha', 'email', 'date', 'sep'):
- self.failUnlessRaises(ValueError, RefLog.from_file, fixture_path(pp+suffix))
+ self.failUnlessRaises(ValueError, RefLog.from_file, fixture_path(pp + suffix))
#END for each invalid file
-
+
# cannot write an uninitialized reflog
self.failUnlessRaises(ValueError, RefLog().write)
-
+
# test serialize and deserialize - results must match exactly
- binsha = chr(255)*20
+ binsha = chr(255) * 20
msg = "my reflog message"
cr = self.rorepo.config_reader()
for rlp in (rlp_head, rlp_master):
@@ -66,35 +67,34 @@ class TestRefLog(TestBase):
tfile = os.path.join(tdir, os.path.basename(rlp))
reflog.to_file(tfile)
assert reflog.write() is reflog
-
+
# parsed result must match ...
treflog = RefLog.from_file(tfile)
assert treflog == reflog
-
+
# ... as well as each bytes of the written stream
assert open(tfile).read() == open(rlp).read()
-
+
# append an entry
entry = RefLog.append_entry(cr, tfile, IndexObject.NULL_BIN_SHA, binsha, msg)
assert entry.oldhexsha == IndexObject.NULL_HEX_SHA
- assert entry.newhexsha == 'f'*40
+ assert entry.newhexsha == 'f' * 40
assert entry.message == msg
assert RefLog.from_file(tfile)[-1] == entry
-
+
# index entry
# raises on invalid index
self.failUnlessRaises(IndexError, RefLog.entry_at, rlp, 10000)
-
+
# indices can be positive ...
assert isinstance(RefLog.entry_at(rlp, 0), RefLogEntry)
RefLog.entry_at(rlp, 23)
-
+
# ... and negative
for idx in (-1, -24):
RefLog.entry_at(rlp, idx)
#END for each index to read
- # END for each reflog
-
-
+ # END for each reflog
+
# finally remove our temporary data
shutil.rmtree(tdir)
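
The reflog test reads and rewrites reflog files directly. For reference, a small sketch of the read-only side, assuming reflogs are enabled for the repository and that RefLog lives at git.refs.log as in this code base:

    from git import Repo
    from git.refs.log import RefLog

    repo = Repo(".")
    logpath = RefLog.path(repo.head)      # physical path of HEAD's reflog file

    reflog = RefLog.from_file(logpath)    # full parse into RefLogEntry objects
    last = RefLog.entry_at(logpath, -1)   # cheap access to a single entry
    assert last.newhexsha == reflog[-1].newhexsha
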
diff --git a/git/test/test_refs.py b/git/test/test_refs.py
index cf08d7ec..ee9d8074 100644
--- a/git/test/test_refs.py
+++ b/git/test/test_refs.py
@@ -13,39 +13,40 @@ from git.objects.tag import TagObject
from itertools import chain
import os
+
class TestRefs(TestBase):
def test_from_path(self):
# should be able to create any reference directly
- for ref_type in ( Reference, Head, TagReference, RemoteReference ):
+ for ref_type in (Reference, Head, TagReference, RemoteReference):
for name in ('rela_name', 'path/rela_name'):
full_path = ref_type.to_full_path(name)
instance = ref_type.from_path(self.rorepo, full_path)
assert isinstance(instance, ref_type)
- # END for each name
+ # END for each name
# END for each type
-
+
# invalid path
self.failUnlessRaises(ValueError, TagReference, self.rorepo, "refs/invalid/tag")
# works without path check
TagReference(self.rorepo, "refs/invalid/tag", check_path=False)
-
+
def test_tag_base(self):
tag_object_refs = list()
for tag in self.rorepo.tags:
assert "refs/tags" in tag.path
assert tag.name
- assert isinstance( tag.commit, Commit )
+ assert isinstance(tag.commit, Commit)
if tag.tag is not None:
- tag_object_refs.append( tag )
+ tag_object_refs.append(tag)
tagobj = tag.tag
# have no dict
self.failUnlessRaises(AttributeError, setattr, tagobj, 'someattr', 1)
- assert isinstance( tagobj, TagObject )
+ assert isinstance(tagobj, TagObject)
assert tagobj.tag == tag.name
- assert isinstance( tagobj.tagger, Actor )
- assert isinstance( tagobj.tagged_date, int )
- assert isinstance( tagobj.tagger_tz_offset, int )
+ assert isinstance(tagobj.tagger, Actor)
+ assert isinstance(tagobj.tagged_date, int)
+ assert isinstance(tagobj.tagger_tz_offset, int)
assert tagobj.message
assert tag.object == tagobj
# can't assign the object
@@ -55,16 +56,13 @@ class TestRefs(TestBase):
assert tag_object_refs
assert isinstance(self.rorepo.tags['0.1.5'], TagReference)
-
def test_tags_author(self):
tag = self.rorepo.tags[0]
tagobj = tag.tag
- assert isinstance( tagobj.tagger, Actor )
+ assert isinstance(tagobj.tagger, Actor)
tagger_name = tagobj.tagger.name
assert tagger_name == 'Michael Trier'
-
-
def test_tags(self):
# tag refs can point to tag objects or to commits
s = set()
@@ -79,8 +77,8 @@ class TestRefs(TestBase):
s.add(ref)
# END for each ref
assert len(s) == ref_count
- assert len(s|s) == ref_count
-
+ assert len(s | s) == ref_count
+
@with_rw_repo('HEAD', bare=False)
def test_heads(self, rwrepo):
for head in rwrepo.heads:
@@ -91,7 +89,7 @@ class TestRefs(TestBase):
cur_object = head.object
assert prev_object == cur_object # represent the same git object
assert prev_object is not cur_object # but are different instances
-
+
writer = head.config_writer()
tv = "testopt"
writer.set_value(tv, 1)
@@ -99,7 +97,7 @@ class TestRefs(TestBase):
del(writer)
assert head.config_reader().get_value(tv) == 1
head.config_writer().remove_option(tv)
-
+
# after the clone, we might still have a tracking branch setup
head.set_tracking_branch(None)
assert head.tracking_branch() is None
@@ -109,7 +107,7 @@ class TestRefs(TestBase):
head.set_tracking_branch(None)
assert head.tracking_branch() is None
# END for each head
-
+
# verify REFLOG gets altered
head = rwrepo.head
cur_head = head.ref
@@ -123,76 +121,73 @@ class TestRefs(TestBase):
assert len(thlog) == hlog_len + 1
assert thlog[-1].oldhexsha == cur_commit.hexsha
assert thlog[-1].newhexsha == pcommit.hexsha
-
+
# the ref didn't change though
assert len(cur_head.log()) == blog_len
-
+
# head changes once again, cur_head doesn't change
head.set_reference(cur_head, 'reattach head')
- assert len(head.log()) == hlog_len+2
+ assert len(head.log()) == hlog_len + 2
assert len(cur_head.log()) == blog_len
-
+
# adjusting the head-ref also adjust the head, so both reflogs are
# altered
cur_head.set_commit(pcommit, 'changing commit')
- assert len(cur_head.log()) == blog_len+1
- assert len(head.log()) == hlog_len+3
-
-
+ assert len(cur_head.log()) == blog_len + 1
+ assert len(head.log()) == hlog_len + 3
+
# with automatic dereferencing
assert head.set_commit(cur_commit, 'change commit once again') is head
- assert len(head.log()) == hlog_len+4
- assert len(cur_head.log()) == blog_len+2
-
+ assert len(head.log()) == hlog_len + 4
+ assert len(cur_head.log()) == blog_len + 2
+
# a new branch has just a single entry
other_head = Head.create(rwrepo, 'mynewhead', pcommit, logmsg='new head created')
log = other_head.log()
assert len(log) == 1
assert log[0].oldhexsha == pcommit.NULL_HEX_SHA
assert log[0].newhexsha == pcommit.hexsha
-
-
+
def test_refs(self):
types_found = set()
for ref in self.rorepo.refs:
types_found.add(type(ref))
- assert len(types_found) >= 3
-
+ assert len(types_found) >= 3
+
def test_is_valid(self):
assert Reference(self.rorepo, 'refs/doesnt/exist').is_valid() == False
assert self.rorepo.head.is_valid()
assert self.rorepo.head.reference.is_valid()
assert SymbolicReference(self.rorepo, 'hellothere').is_valid() == False
-
+
def test_orig_head(self):
assert type(self.rorepo.head.orig_head()) == SymbolicReference
-
+
@with_rw_repo('0.1.6')
def test_head_reset(self, rw_repo):
cur_head = rw_repo.head
old_head_commit = cur_head.commit
new_head_commit = cur_head.ref.commit.parents[0]
- cur_head.reset(new_head_commit, index=True) # index only
+ cur_head.reset(new_head_commit, index=True) # index only
assert cur_head.reference.commit == new_head_commit
-
+
self.failUnlessRaises(ValueError, cur_head.reset, new_head_commit, index=False, working_tree=True)
new_head_commit = new_head_commit.parents[0]
cur_head.reset(new_head_commit, index=True, working_tree=True) # index + wt
assert cur_head.reference.commit == new_head_commit
-
+
# paths - make sure we have something to do
rw_repo.index.reset(old_head_commit.parents[0])
- cur_head.reset(cur_head, paths = "test")
- cur_head.reset(new_head_commit, paths = "lib")
+ cur_head.reset(cur_head, paths="test")
+ cur_head.reset(new_head_commit, paths="lib")
# hard resets with paths don't work, its all or nothing
- self.failUnlessRaises(GitCommandError, cur_head.reset, new_head_commit, working_tree=True, paths = "lib")
-
+ self.failUnlessRaises(GitCommandError, cur_head.reset, new_head_commit, working_tree=True, paths="lib")
+
# we can do a mixed reset, and then checkout from the index though
cur_head.reset(new_head_commit)
- rw_repo.index.checkout(["lib"], force=True)#
-
-
- # now that we have a write write repo, change the HEAD reference - its
+ rw_repo.index.checkout(["lib"], force=True)
+
+ # now that we have a write write repo, change the HEAD reference - its
# like git-reset --soft
heads = rw_repo.heads
assert heads
@@ -203,7 +198,7 @@ class TestRefs(TestBase):
assert cur_head.commit == head.commit
assert not cur_head.is_detached
# END for each head
-
+
# detach
active_head = heads[0]
curhead_commit = active_head.commit
@@ -211,50 +206,50 @@ class TestRefs(TestBase):
assert cur_head.commit == curhead_commit
assert cur_head.is_detached
self.failUnlessRaises(TypeError, getattr, cur_head, "reference")
-
+
# tags are references, hence we can point to them
some_tag = rw_repo.tags[0]
cur_head.reference = some_tag
assert not cur_head.is_detached
assert cur_head.commit == some_tag.commit
- assert isinstance(cur_head.reference, TagReference)
-
+ assert isinstance(cur_head.reference, TagReference)
+
# put HEAD back to a real head, otherwise everything else fails
cur_head.reference = active_head
-
+
# type check
self.failUnlessRaises(ValueError, setattr, cur_head, "reference", "that")
-
- # head handling
+
+ # head handling
commit = 'HEAD'
prev_head_commit = cur_head.commit
for count, new_name in enumerate(("my_new_head", "feature/feature1")):
- actual_commit = commit+"^"*count
+ actual_commit = commit + "^" * count
new_head = Head.create(rw_repo, new_name, actual_commit)
assert new_head.is_detached
assert cur_head.commit == prev_head_commit
assert isinstance(new_head, Head)
# already exists, but has the same value, so its fine
Head.create(rw_repo, new_name, new_head.commit)
-
+
# its not fine with a different value
self.failUnlessRaises(OSError, Head.create, rw_repo, new_name, new_head.commit.parents[0])
-
+
# force it
new_head = Head.create(rw_repo, new_name, actual_commit, force=True)
old_path = new_head.path
old_name = new_head.name
-
+
assert new_head.rename("hello").name == "hello"
assert new_head.rename("hello/world").name == "hello/world"
assert new_head.rename(old_name).name == old_name and new_head.path == old_path
-
+
# rename with force
tmp_head = Head.create(rw_repo, "tmphead")
self.failUnlessRaises(GitCommandError, tmp_head.rename, new_head)
tmp_head.rename(new_head, force=True)
assert tmp_head == new_head and tmp_head.object == new_head.object
-
+
logfile = RefLog.path(tmp_head)
assert os.path.isfile(logfile)
Head.delete(rw_repo, tmp_head)
@@ -265,17 +260,17 @@ class TestRefs(TestBase):
# force on deletion testing would be missing here, code looks okay though ;)
# END for each new head name
self.failUnlessRaises(TypeError, RemoteReference.create, rw_repo, "some_name")
-
+
# tag ref
tag_name = "1.0.2"
light_tag = TagReference.create(rw_repo, tag_name)
self.failUnlessRaises(GitCommandError, TagReference.create, rw_repo, tag_name)
- light_tag = TagReference.create(rw_repo, tag_name, "HEAD~1", force = True)
+ light_tag = TagReference.create(rw_repo, tag_name, "HEAD~1", force=True)
assert isinstance(light_tag, TagReference)
assert light_tag.name == tag_name
assert light_tag.commit == cur_head.commit.parents[0]
assert light_tag.tag is None
-
+
# tag with tag object
other_tag_name = "releases/1.0.2RC"
msg = "my mighty tag\nsecond line"
@@ -284,18 +279,18 @@ class TestRefs(TestBase):
assert obj_tag.name == other_tag_name
assert obj_tag.commit == cur_head.commit
assert obj_tag.tag is not None
-
+
TagReference.delete(rw_repo, light_tag, obj_tag)
tags = rw_repo.tags
assert light_tag not in tags and obj_tag not in tags
-
+
# remote deletion
remote_refs_so_far = 0
- remotes = rw_repo.remotes
+ remotes = rw_repo.remotes
assert remotes
for remote in remotes:
refs = remote.refs
-
+
# If a HEAD exists, it must be deleted first. Otherwise it might
# end up pointing to an invalid ref if the ref was deleted before.
remote_head_name = "HEAD"
@@ -303,30 +298,30 @@ class TestRefs(TestBase):
RemoteReference.delete(rw_repo, refs[remote_head_name])
del(refs[remote_head_name])
#END handle HEAD deletion
-
+
RemoteReference.delete(rw_repo, *refs)
remote_refs_so_far += len(refs)
for ref in refs:
assert ref.remote_name == remote.name
# END for each ref to delete
assert remote_refs_so_far
-
+
for remote in remotes:
# remotes without references throw
self.failUnlessRaises(AssertionError, getattr, remote, 'refs')
# END for each remote
-
+
# change where the active head points to
if cur_head.is_detached:
cur_head.reference = rw_repo.heads[0]
-
+
head = cur_head.reference
old_commit = head.commit
head.commit = old_commit.parents[0]
assert head.commit == old_commit.parents[0]
assert head.commit == cur_head.commit
head.commit = old_commit
-
+
# setting a non-commit as commit fails, but succeeds as object
head_tree = head.commit.tree
self.failUnlessRaises(ValueError, setattr, head, 'commit', head_tree)
@@ -335,8 +330,8 @@ class TestRefs(TestBase):
head.object = head_tree
assert head.object == head_tree
# cannot query tree as commit
- self.failUnlessRaises(TypeError, getattr, head, 'commit')
-
+ self.failUnlessRaises(TypeError, getattr, head, 'commit')
+
# set the commit directly using the head. This would never detach the head
assert not cur_head.is_detached
head.object = old_commit
@@ -346,58 +341,58 @@ class TestRefs(TestBase):
assert cur_head.is_detached
cur_head.commit = parent_commit
assert cur_head.is_detached and cur_head.commit == parent_commit
-
+
cur_head.reference = head
assert not cur_head.is_detached
cur_head.commit = parent_commit
assert not cur_head.is_detached
assert head.commit == parent_commit
-
+
# test checkout
active_branch = rw_repo.active_branch
for head in rw_repo.heads:
checked_out_head = head.checkout()
assert checked_out_head == head
# END for each head to checkout
-
+
# checkout with branch creation
new_head = active_branch.checkout(b="new_head")
assert active_branch != rw_repo.active_branch
assert new_head == rw_repo.active_branch
-
+
# checkout with force as we have a changed a file
# clear file
- open(new_head.commit.tree.blobs[-1].abspath,'w').close()
+ open(new_head.commit.tree.blobs[-1].abspath, 'w').close()
assert len(new_head.commit.diff(None))
-
+
# create a new branch that is likely to touch the file we changed
- far_away_head = rw_repo.create_head("far_head",'HEAD~100')
+ far_away_head = rw_repo.create_head("far_head", 'HEAD~100')
self.failUnlessRaises(GitCommandError, far_away_head.checkout)
assert active_branch == active_branch.checkout(force=True)
assert rw_repo.head.reference != far_away_head
-
+
# test reference creation
partial_ref = 'sub/ref'
full_ref = 'refs/%s' % partial_ref
ref = Reference.create(rw_repo, partial_ref)
assert ref.path == full_ref
assert ref.object == rw_repo.head.commit
-
+
self.failUnlessRaises(OSError, Reference.create, rw_repo, full_ref, 'HEAD~20')
# it works if it is at the same spot though and points to the same reference
assert Reference.create(rw_repo, full_ref, 'HEAD').path == full_ref
Reference.delete(rw_repo, full_ref)
-
+
# recreate the reference using a full_ref
ref = Reference.create(rw_repo, full_ref)
assert ref.path == full_ref
assert ref.object == rw_repo.head.commit
-
+
# recreate using force
ref = Reference.create(rw_repo, partial_ref, 'HEAD~1', force=True)
assert ref.path == full_ref
assert ref.object == rw_repo.head.commit.parents[0]
-
+
# rename it
orig_obj = ref.object
for name in ('refs/absname', 'rela_name', 'feature/rela_name'):
@@ -407,10 +402,10 @@ class TestRefs(TestBase):
assert ref_new_name.object == orig_obj
assert ref_new_name == ref
# END for each name type
-
+
# References that don't exist trigger an error if we want to access them
self.failUnlessRaises(ValueError, getattr, Reference(rw_repo, "refs/doesntexist"), 'commit')
-
+
# exists, fail unless we force
ex_ref_path = far_away_head.path
self.failUnlessRaises(OSError, ref.rename, ex_ref_path)
@@ -418,35 +413,35 @@ class TestRefs(TestBase):
far_away_head.commit = ref.commit
ref.rename(ex_ref_path)
assert ref.path == ex_ref_path and ref.object == orig_obj
- assert ref.rename(ref.path).path == ex_ref_path # rename to same name
-
+ assert ref.rename(ref.path).path == ex_ref_path # rename to same name
+
# create symbolic refs
symref_path = "symrefs/sym"
symref = SymbolicReference.create(rw_repo, symref_path, cur_head.reference)
assert symref.path == symref_path
assert symref.reference == cur_head.reference
-
+
self.failUnlessRaises(OSError, SymbolicReference.create, rw_repo, symref_path, cur_head.reference.commit)
- # it works if the new ref points to the same reference
+ # it works if the new ref points to the same reference
SymbolicReference.create(rw_repo, symref.path, symref.reference).path == symref.path
SymbolicReference.delete(rw_repo, symref)
# would raise if the symref hadn't been deleted
symref = SymbolicReference.create(rw_repo, symref_path, cur_head.reference)
-
+
# test symbolic references which are not at default locations like HEAD
# or FETCH_HEAD - they may also be at spots in refs of course
symbol_ref_path = "refs/symbol_ref"
symref = SymbolicReference(rw_repo, symbol_ref_path)
assert symref.path == symbol_ref_path
symbol_ref_abspath = os.path.join(rw_repo.git_dir, symref.path)
-
+
# set it
symref.reference = new_head
assert symref.reference == new_head
assert os.path.isfile(symbol_ref_abspath)
assert symref.commit == new_head.commit
-
- for name in ('absname','folder/rela_name'):
+
+ for name in ('absname', 'folder/rela_name'):
symref_new_name = symref.rename(name)
assert isinstance(symref_new_name, SymbolicReference)
assert name in symref_new_name.path
@@ -454,10 +449,10 @@ class TestRefs(TestBase):
assert symref_new_name == symref
assert not symref.is_detached
# END for each ref
-
+
# create a new non-head ref just to be sure we handle it even if packed
Reference.create(rw_repo, full_ref)
-
+
# test ref listing - assure we have packed refs
rw_repo.git.pack_refs(all=True, prune=True)
heads = rw_repo.heads
@@ -465,14 +460,14 @@ class TestRefs(TestBase):
assert new_head in heads
assert active_branch in heads
assert rw_repo.tags
-
+
# we should be able to iterate all symbolic refs as well - in that case
# we should expect only symbolic references to be returned
for symref in SymbolicReference.iter_items(rw_repo):
assert not symref.is_detached
-
+
# when iterating references, we can get references and symrefs
- # when deleting all refs, I'd expect them to be gone ! Even from
+ # when deleting all refs, I'd expect them to be gone ! Even from
# the packed ones
# For this to work, we must not be on any branch
rw_repo.head.reference = rw_repo.head.commit
@@ -484,62 +479,60 @@ class TestRefs(TestBase):
# END delete ref
# END for each ref to iterate and to delete
assert deleted_refs
-
+
for ref in Reference.iter_items(rw_repo):
if ref.is_detached:
assert ref not in deleted_refs
# END for each ref
-
- # reattach head - head will not be returned if it is not a symbolic
+
+ # reattach head - head will not be returned if it is not a symbolic
# ref
rw_repo.head.reference = Head.create(rw_repo, "master")
-
+
# At least the head should still exist
assert os.path.isfile(os.path.join(rw_repo.git_dir, 'HEAD'))
refs = list(SymbolicReference.iter_items(rw_repo))
assert len(refs) == 1
-
-
+
# test creation of new refs from scratch
for path in ("basename", "dir/somename", "dir2/subdir/basename"):
- # REFERENCES
+ # REFERENCES
############
fpath = Reference.to_full_path(path)
ref_fp = Reference.from_path(rw_repo, fpath)
assert not ref_fp.is_valid()
ref = Reference(rw_repo, fpath)
assert ref == ref_fp
-
+
# can be created by assigning a commit
ref.commit = rw_repo.head.commit
assert ref.is_valid()
-
+
# if the assignment raises, the ref doesn't exist
Reference.delete(ref.repo, ref.path)
assert not ref.is_valid()
self.failUnlessRaises(ValueError, setattr, ref, 'commit', "nonsense")
assert not ref.is_valid()
-
+
# I am sure I had my reason to make it a class method at first, but
# now it doesn't make so much sense anymore, want an instance method as well
# See http://byronimo.lighthouseapp.com/projects/51787-gitpython/tickets/27
Reference.delete(ref.repo, ref.path)
assert not ref.is_valid()
-
+
ref.object = rw_repo.head.commit
assert ref.is_valid()
-
+
Reference.delete(ref.repo, ref.path)
assert not ref.is_valid()
self.failUnlessRaises(ValueError, setattr, ref, 'object', "nonsense")
assert not ref.is_valid()
-
+
# END for each path
-
+
def test_dereference_recursive(self):
# for now, just test the HEAD
assert SymbolicReference.dereference_recursive(self.rorepo, 'HEAD')
-
+
def test_reflog(self):
assert isinstance(self.rorepo.heads.master.log(), RefLog)
-
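
test_refs.py walks through head creation, checkout, renaming and deletion. A compact sketch of the same life cycle; the repository, the branch names and the presence of a master branch are assumptions, and the repository must be writable:

    from git import Repo, Head

    repo = Repo(".")                                      # a repository you can write to
    head = Head.create(repo, "feature/example", "HEAD")   # new branch at HEAD
    head.checkout()                                       # like: git checkout feature/example
    assert repo.active_branch == head

    repo.heads.master.checkout()                          # move off the branch before deleting it
    Head.delete(repo, head)
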
diff --git a/git/test/test_remote.py b/git/test/test_remote.py
index b1248096..a5a73ce1 100644
--- a/git/test/test_remote.py
+++ b/git/test/test_remote.py
@@ -12,17 +12,19 @@ import shutil
import os
import random
-# assure we have repeatable results
+# assure we have repeatable results
random.seed(0)
+
class TestRemoteProgress(RemoteProgress):
- __slots__ = ( "_seen_lines", "_stages_per_op", '_num_progress_messages' )
+ __slots__ = ("_seen_lines", "_stages_per_op", '_num_progress_messages')
+
def __init__(self):
super(TestRemoteProgress, self).__init__()
self._seen_lines = list()
self._stages_per_op = dict()
self._num_progress_messages = 0
-
+
def _parse_progress_line(self, line):
# we may remove the line later if it is dropped
# Keep it for debugging
@@ -30,37 +32,36 @@ class TestRemoteProgress(RemoteProgress):
rval = super(TestRemoteProgress, self)._parse_progress_line(line)
assert len(line) > 1, "line %r too short" % line
return rval
-
+
def line_dropped(self, line):
try:
self._seen_lines.remove(line)
except ValueError:
pass
-
+
def update(self, op_code, cur_count, max_count=None, message=''):
# check each stage only comes once
op_id = op_code & self.OP_MASK
assert op_id in (self.COUNTING, self.COMPRESSING, self.WRITING)
-
+
self._stages_per_op.setdefault(op_id, 0)
- self._stages_per_op[ op_id ] = self._stages_per_op[ op_id ] | (op_code & self.STAGE_MASK)
-
- if op_code & (self.WRITING|self.END) == (self.WRITING|self.END):
+ self._stages_per_op[op_id] = self._stages_per_op[op_id] | (op_code & self.STAGE_MASK)
+
+ if op_code & (self.WRITING | self.END) == (self.WRITING | self.END):
assert message
# END check we get message
-
+
self._num_progress_messages += 1
-
-
+
def make_assertion(self):
# we don't always receive messages
if not self._seen_lines:
return
-
+
# sometimes objects are not compressed which is okay
- assert len(self._seen_ops) in (2,3)
+ assert len(self._seen_ops) in (2, 3)
assert self._stages_per_op
-
+
# must have seen all stages
for op, stages in self._stages_per_op.items():
assert stages & self.STAGE_MASK == self.STAGE_MASK
@@ -68,15 +69,14 @@ class TestRemoteProgress(RemoteProgress):
def assert_received_message(self):
assert self._num_progress_messages
-
+
class TestRemote(TestBase):
-
+
def _print_fetchhead(self, repo):
fp = open(os.path.join(repo.git_dir, "FETCH_HEAD"))
fp.close()
-
-
+
def _do_test_fetch_result(self, results, remote):
# self._print_fetchhead(remote.repo)
assert len(results) > 0 and isinstance(results[0], FetchInfo)
@@ -84,15 +84,15 @@ class TestRemote(TestBase):
assert isinstance(info.note, basestring)
if isinstance(info.ref, Reference):
assert info.flags != 0
- # END reference type flags handling
+ # END reference type flags handling
assert isinstance(info.ref, (SymbolicReference, Reference))
- if info.flags & (info.FORCED_UPDATE|info.FAST_FORWARD):
+ if info.flags & (info.FORCED_UPDATE | info.FAST_FORWARD):
assert isinstance(info.old_commit, Commit)
else:
assert info.old_commit is None
- # END forced update checking
+ # END forced update checking
# END for each info
-
+
def _do_test_push_result(self, results, remote):
assert len(results) > 0 and isinstance(results[0], PushInfo)
for info in results:
@@ -108,32 +108,31 @@ class TestRemote(TestBase):
assert has_one
else:
# there must be a remote commit
- if info.flags & info.DELETED == 0:
+ if info.flags & info.DELETED == 0:
assert isinstance(info.local_ref, Reference)
else:
assert info.local_ref is None
assert type(info.remote_ref) in (TagReference, RemoteReference)
# END error checking
- # END for each info
-
-
+ # END for each info
+
def _do_test_fetch_info(self, repo):
self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "nonsense", '')
self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "? [up to date] 0.1.7RC -> origin/0.1.7RC", '')
-
+
def _commit_random_file(self, repo):
# Create a file with a random name and random data and commit it to repo.
# Return the committed absolute file path
index = repo.index
- new_file = self._make_file(os.path.basename(tempfile.mktemp()),str(random.random()), repo)
+ new_file = self._make_file(os.path.basename(tempfile.mktemp()), str(random.random()), repo)
index.add([new_file])
index.commit("Committing %s" % new_file)
return new_file
-
- def _do_test_fetch(self,remote, rw_repo, remote_repo):
+
+ def _do_test_fetch(self, remote, rw_repo, remote_repo):
# specialized fetch testing to de-clutter the main test
self._do_test_fetch_info(rw_repo)
-
+
def fetch_and_test(remote, **kwargs):
progress = TestRemoteProgress()
kwargs['progress'] = progress
@@ -142,60 +141,60 @@ class TestRemote(TestBase):
self._do_test_fetch_result(res, remote)
return res
# END fetch and check
-
+
def get_info(res, remote, name):
- return res["%s/%s"%(remote,name)]
-
+ return res["%s/%s" % (remote, name)]
+
# put remote head to master as it is guaranteed to exist
remote_repo.head.reference = remote_repo.heads.master
-
+
res = fetch_and_test(remote)
# all uptodate
for info in res:
assert info.flags & info.HEAD_UPTODATE
-
+
# rewind remote head to trigger rejection
# index must be false as remote is a bare repo
rhead = remote_repo.head
remote_commit = rhead.commit
rhead.reset("HEAD~2", index=False)
res = fetch_and_test(remote)
- mkey = "%s/%s"%(remote,'master')
+ mkey = "%s/%s" % (remote, 'master')
master_info = res[mkey]
assert master_info.flags & FetchInfo.FORCED_UPDATE and master_info.note is not None
-
+
# normal fast forward - set head back to previous one
rhead.commit = remote_commit
res = fetch_and_test(remote)
assert res[mkey].flags & FetchInfo.FAST_FORWARD
-
+
# new remote branch
new_remote_branch = Head.create(remote_repo, "new_branch")
res = fetch_and_test(remote)
new_branch_info = get_info(res, remote, new_remote_branch)
assert new_branch_info.flags & FetchInfo.NEW_HEAD
-
+
# remote branch rename ( causes creation of a new one locally )
new_remote_branch.rename("other_branch_name")
res = fetch_and_test(remote)
other_branch_info = get_info(res, remote, new_remote_branch)
assert other_branch_info.ref.commit == new_branch_info.ref.commit
-
+
# remove new branch
Head.delete(new_remote_branch.repo, new_remote_branch)
res = fetch_and_test(remote)
# deleted remote will not be fetched
self.failUnlessRaises(IndexError, get_info, res, remote, new_remote_branch)
-
+
# prune stale tracking branches
stale_refs = remote.stale_refs
assert len(stale_refs) == 2 and isinstance(stale_refs[0], RemoteReference)
RemoteReference.delete(rw_repo, *stale_refs)
-
+
# test single branch fetch with refspec including target remote
- res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master"%remote)
+ res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master" % remote)
assert len(res) == 1 and get_info(res, remote, 'master')
-
+
# ... with refspec and no target
res = fetch_and_test(remote, refspec='master')
assert len(res) == 1
@@ -203,27 +202,27 @@ class TestRemote(TestBase):
# ... multiple refspecs
res = fetch_and_test(remote, refspec=['master', 'fred'])
assert len(res) == 1
-
+
# add new tag reference
rtag = TagReference.create(remote_repo, "1.0-RV_hello.there")
res = fetch_and_test(remote, tags=True)
tinfo = res[str(rtag)]
assert isinstance(tinfo.ref, TagReference) and tinfo.ref.commit == rtag.commit
assert tinfo.flags & tinfo.NEW_TAG
-
+
# adjust tag commit
Reference.set_object(rtag, rhead.commit.parents[0].parents[0])
res = fetch_and_test(remote, tags=True)
tinfo = res[str(rtag)]
assert tinfo.commit == rtag.commit
assert tinfo.flags & tinfo.TAG_UPDATE
-
+
# delete remote tag - local one will stay
TagReference.delete(remote_repo, rtag)
res = fetch_and_test(remote, tags=True)
self.failUnlessRaises(IndexError, get_info, res, remote, str(rtag))
-
- # provoke to receive actual objects to see what kind of output we have to
+
+ # provoke to receive actual objects to see what kind of output we have to
# expect. For that we need a remote transport protocol
# Create a new UN-shared repo and fetch into it after we pushed a change
# to the shared repo
@@ -231,31 +230,31 @@ class TestRemote(TestBase):
# must clone with a local path for the repo implementation not to freak out
# as it wants local paths only ( which I can understand )
other_repo = remote_repo.clone(other_repo_dir, shared=False)
- remote_repo_url = "git://localhost%s"%remote_repo.git_dir
-
+ remote_repo_url = "git://localhost%s" % remote_repo.git_dir
+
# put origin to git-url
- other_origin = other_repo.remotes.origin
+ other_origin = other_repo.remotes.origin
other_origin.config_writer.set("url", remote_repo_url)
# it automatically creates alternates as remote_repo is shared as well.
# It will use the transport though and ignore alternates when fetching
# assert not other_repo.alternates # this would fail
-
+
# assure we are in the right state
rw_repo.head.reset(remote.refs.master, working_tree=True)
try:
self._commit_random_file(rw_repo)
remote.push(rw_repo.head.reference)
-
- # here I would expect to see remote-information about packing
- # objects and so on. Unfortunately, this does not happen
+
+ # here I would expect to see remote-information about packing
+ # objects and so on. Unfortunately, this does not happen
# if we are redirecting the output - git explicitly checks for this
# and only provides progress information to ttys
res = fetch_and_test(other_origin)
finally:
shutil.rmtree(other_repo_dir)
# END test and cleanup
-
- def _assert_push_and_pull(self,remote, rw_repo, remote_repo):
+
+ def _assert_push_and_pull(self, remote, rw_repo, remote_repo):
# push our changes
lhead = rw_repo.head
lindex = rw_repo.index
@@ -263,16 +262,16 @@ class TestRemote(TestBase):
try:
lhead.reference = rw_repo.heads.master
except AttributeError:
- # if the author is on a non-master branch, the clones might not have
+ # if the author is on a non-master branch, the clones might not have
# a local master yet. We simply create it
lhead.reference = rw_repo.create_head('master')
- # END master handling
+ # END master handling
lhead.reset(remote.refs.master, working_tree=True)
-
+
# push without spec should fail ( without further configuration )
# well, works nicely
# self.failUnlessRaises(GitCommandError, remote.push)
-
+
# simple file push
self._commit_random_file(rw_repo)
progress = TestRemoteProgress()
@@ -280,25 +279,25 @@ class TestRemote(TestBase):
assert isinstance(res, IterableList)
self._do_test_push_result(res, remote)
progress.make_assertion()
-
+
# rejected - undo last commit
lhead.reset("HEAD~1")
res = remote.push(lhead.reference)
- assert res[0].flags & PushInfo.ERROR
+ assert res[0].flags & PushInfo.ERROR
assert res[0].flags & PushInfo.REJECTED
self._do_test_push_result(res, remote)
-
+
# force rejected pull
res = remote.push('+%s' % lhead.reference)
- assert res[0].flags & PushInfo.ERROR == 0
+ assert res[0].flags & PushInfo.ERROR == 0
assert res[0].flags & PushInfo.FORCED_UPDATE
self._do_test_push_result(res, remote)
-
+
# invalid refspec
res = remote.push("hellothere")
assert len(res) == 0
-
- # push new tags
+
+ # push new tags
progress = TestRemoteProgress()
to_be_updated = "my_tag.1.0RV"
new_tag = TagReference.create(rw_repo, to_be_updated)
@@ -307,26 +306,26 @@ class TestRemote(TestBase):
assert res[-1].flags & PushInfo.NEW_TAG
progress.make_assertion()
self._do_test_push_result(res, remote)
-
+
# update push new tags
# Rejection is default
new_tag = TagReference.create(rw_repo, to_be_updated, ref='HEAD~1', force=True)
res = remote.push(tags=True)
self._do_test_push_result(res, remote)
assert res[-1].flags & PushInfo.REJECTED and res[-1].flags & PushInfo.ERROR
-
+
# push force this tag
res = remote.push("+%s" % new_tag.path)
assert res[-1].flags & PushInfo.ERROR == 0 and res[-1].flags & PushInfo.FORCED_UPDATE
-
+
# delete tag - have to do it using refspec
res = remote.push(":%s" % new_tag.path)
self._do_test_push_result(res, remote)
assert res[0].flags & PushInfo.DELETED
- # Currently progress is not properly transferred, especially not using
+ # Currently progress is not properly transferred, especially not using
# the git daemon
# progress.assert_received_message()
-
+
# push new branch
new_head = Head.create(rw_repo, "my_new_branch")
progress = TestRemoteProgress()
@@ -334,45 +333,45 @@ class TestRemote(TestBase):
assert res[0].flags & PushInfo.NEW_HEAD
progress.make_assertion()
self._do_test_push_result(res, remote)
-
+
# delete new branch on the remote end and locally
res = remote.push(":%s" % new_head.path)
self._do_test_push_result(res, remote)
Head.delete(rw_repo, new_head)
assert res[-1].flags & PushInfo.DELETED
-
+
# --all
res = remote.push(all=True)
self._do_test_push_result(res, remote)
-
+
remote.pull('master')
-
- # cleanup - delete created tags and branches as we are in an innerloop on
+
+ # cleanup - delete created tags and branches as we are in an innerloop on
# the same repository
TagReference.delete(rw_repo, new_tag, other_tag)
remote.push(":%s" % other_tag.path)
-
+
@with_rw_and_rw_remote_repo('0.1.6')
def test_base(self, rw_repo, remote_repo):
num_remotes = 0
remote_set = set()
ran_fetch_test = False
-
+
for remote in rw_repo.remotes:
num_remotes += 1
assert remote == remote
assert str(remote) != repr(remote)
remote_set.add(remote)
remote_set.add(remote) # should already exist
-
- # REFS
+
+ # REFS
refs = remote.refs
assert refs
for ref in refs:
assert ref.remote_name == remote.name
assert ref.remote_head
# END for each ref
-
+
# OPTIONS
# cannot use 'fetch' key anymore as it is now a method
for opt in ("url", ):
@@ -380,10 +379,10 @@ class TestRemote(TestBase):
reader = remote.config_reader
assert reader.get(opt) == val
assert reader.get_value(opt, None) == val
-
+
# unable to write with a reader
self.failUnlessRaises(IOError, reader.set, opt, "test")
-
+
# change value
writer = remote.config_writer
new_val = "myval"
@@ -393,9 +392,9 @@ class TestRemote(TestBase):
assert writer.get(opt) == val
del(writer)
assert getattr(remote, opt) == val
- # END for each default option key
-
- # RENAME
+ # END for each default option key
+
+ # RENAME
other_name = "totally_other_name"
prev_name = remote.name
assert remote.rename(other_name) == remote
@@ -404,98 +403,96 @@ class TestRemote(TestBase):
for time in range(2):
assert remote.rename(prev_name).name == prev_name
# END for each rename ( back to prev_name )
-
+
# PUSH/PULL TESTING
self._assert_push_and_pull(remote, rw_repo, remote_repo)
-
+
# FETCH TESTING
- # Only for remotes - local cases are the same or less complicated
+ # Only for remotes - local cases are the same or less complicated
# as additional progress information will never be emitted
if remote.name == "daemon_origin":
self._do_test_fetch(remote, rw_repo, remote_repo)
ran_fetch_test = True
- # END fetch test
-
+ # END fetch test
+
remote.update()
# END for each remote
-
+
assert ran_fetch_test
assert num_remotes
assert num_remotes == len(remote_set)
-
+
origin = rw_repo.remote('origin')
assert origin == rw_repo.remotes.origin
-
+
@with_rw_repo('HEAD', bare=True)
def test_creation_and_removal(self, bare_rw_repo):
new_name = "test_new_one"
arg_list = (new_name, "git@server:hello.git")
- remote = Remote.create(bare_rw_repo, *arg_list )
+ remote = Remote.create(bare_rw_repo, *arg_list)
assert remote.name == "test_new_one"
assert remote in bare_rw_repo.remotes
-
+
# create same one again
self.failUnlessRaises(GitCommandError, Remote.create, bare_rw_repo, *arg_list)
-
+
Remote.remove(bare_rw_repo, new_name)
-
+
for remote in bare_rw_repo.remotes:
if remote.name == new_name:
raise AssertionError("Remote removal failed")
# END if deleted remote matches existing remote's name
# END for each remote
-
+
def test_fetch_info(self):
# assure we can handle remote-tracking branches
fetch_info_line_fmt = "c437ee5deb8d00cf02f03720693e4c802e99f390 not-for-merge %s '0.3' of git://github.com/gitpython-developers/GitPython"
remote_info_line_fmt = "* [new branch] nomatter -> %s"
fi = FetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "local/master",
+ remote_info_line_fmt % "local/master",
fetch_info_line_fmt % 'remote-tracking branch')
assert fi.ref.is_valid()
assert fi.ref.commit
-
+
# handles non-default refspecs: One can specify a different path in refs/remotes
# or a special path just in refs/something for instance
-
+
fi = FetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "subdir/tagname",
+ remote_info_line_fmt % "subdir/tagname",
fetch_info_line_fmt % 'tag')
-
+
assert isinstance(fi.ref, TagReference)
assert fi.ref.path.startswith('refs/tags')
-
+
        # it could be in a remote directory though
fi = FetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "remotename/tags/tagname",
+ remote_info_line_fmt % "remotename/tags/tagname",
fetch_info_line_fmt % 'tag')
-
+
assert isinstance(fi.ref, TagReference)
assert fi.ref.path.startswith('refs/remotes/')
-
+
# it can also be anywhere !
tag_path = "refs/something/remotename/tags/tagname"
fi = FetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % tag_path,
+ remote_info_line_fmt % tag_path,
fetch_info_line_fmt % 'tag')
-
+
assert isinstance(fi.ref, TagReference)
assert fi.ref.path == tag_path
-
+
# branches default to refs/remotes
fi = FetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "remotename/branch",
+ remote_info_line_fmt % "remotename/branch",
fetch_info_line_fmt % 'branch')
-
+
assert isinstance(fi.ref, RemoteReference)
assert fi.ref.remote_name == 'remotename'
-
+
        # but you can force it anywhere, in which case we only get a plain Reference
fi = FetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "refs/something/branch",
+ remote_info_line_fmt % "refs/something/branch",
fetch_info_line_fmt % 'branch')
-
+
assert type(fi.ref) is Reference
assert fi.ref.path == "refs/something/branch"
-
-
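
For orientation, a minimal sketch of the Remote fetch/push API that the tests above exercise - illustrative only, not part of the patch, and the clone path is hypothetical:

    import git

    repo = git.Repo('/tmp/clone')              # hypothetical local clone
    origin = repo.remotes.origin

    # fetch() yields one FetchInfo per affected ref; flags describe what happened
    for info in origin.fetch():
        print info.ref, info.commit, bool(info.flags & info.NEW_HEAD)

    # push() yields PushInfo objects; ERROR/REJECTED flags signal failed updates
    for result in origin.push('master'):
        assert not (result.flags & result.ERROR)
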
diff --git a/git/test/test_repo.py b/git/test/test_repo.py
index 9770d97c..d6568d0b 100644
--- a/git/test/test_repo.py
+++ b/git/test/test_repo.py
@@ -24,7 +24,7 @@ from cStringIO import StringIO
class TestRepo(TestBase):
-
+
@raises(InvalidGitRepositoryError)
def test_new_should_raise_on_invalid_repo_location(self):
Repo(tempfile.gettempdir())
@@ -52,21 +52,21 @@ class TestRepo(TestBase):
def test_heads_should_populate_head_data(self):
for head in self.rorepo.heads:
assert head.name
- assert isinstance(head.commit,Commit)
- # END for each head
-
+ assert isinstance(head.commit, Commit)
+ # END for each head
+
assert isinstance(self.rorepo.heads.master, Head)
assert isinstance(self.rorepo.heads['master'], Head)
-
+
def test_tree_from_revision(self):
tree = self.rorepo.tree('0.1.6')
- assert len(tree.hexsha) == 40
+ assert len(tree.hexsha) == 40
assert tree.type == "tree"
assert self.rorepo.tree(tree) == tree
-
+
# try from invalid revision that does not exist
self.failUnlessRaises(BadObject, self.rorepo.tree, 'hello world')
-
+
def test_commit_from_revision(self):
commit = self.rorepo.commit('0.1.4')
assert commit.type == 'commit'
@@ -76,7 +76,7 @@ class TestRepo(TestBase):
mc = 10
commits = list(self.rorepo.iter_commits('0.1.6', max_count=mc))
assert len(commits) == mc
-
+
c = commits[0]
assert_equal('9a4b1d4d11eee3c5362a4152216376e634bd14cf', c.hexsha)
assert_equal(["c76852d0bff115720af3f27acdb084c59361e5f6"], [p.hexsha for p in c.parents])
@@ -84,11 +84,11 @@ class TestRepo(TestBase):
assert_equal("Michael Trier", c.author.name)
assert_equal("mtrier@gmail.com", c.author.email)
assert_equal(1232829715, c.authored_date)
- assert_equal(5*3600, c.author_tz_offset)
+ assert_equal(5 * 3600, c.author_tz_offset)
assert_equal("Michael Trier", c.committer.name)
assert_equal("mtrier@gmail.com", c.committer.email)
assert_equal(1232829715, c.committed_date)
- assert_equal(5*3600, c.committer_tz_offset)
+ assert_equal(5 * 3600, c.committer_tz_offset)
assert_equal("Bumped version 0.1.6\n", c.message)
c = commits[1]
@@ -103,34 +103,32 @@ class TestRepo(TestBase):
# END for each tree
assert num_trees == mc
-
def _assert_empty_repo(self, repo):
- # test all kinds of things with an empty, freshly initialized repo.
+ # test all kinds of things with an empty, freshly initialized repo.
# It should throw good errors
-
+
# entries should be empty
assert len(repo.index.entries) == 0
-
+
# head is accessible
assert repo.head
assert repo.head.ref
assert not repo.head.is_valid()
-
+
# we can change the head to some other ref
head_ref = Head.from_path(repo, Head.to_full_path('some_head'))
assert not head_ref.is_valid()
repo.head.ref = head_ref
-
+
# is_dirty can handle all kwargs
for args in ((1, 0, 0), (0, 1, 0), (0, 0, 1)):
assert not repo.is_dirty(*args)
- # END for each arg
-
+ # END for each arg
+
# we can add a file to the index ( if we are not bare )
if not repo.bare:
pass
# END test repos with working tree
-
def test_init(self):
prev_cwd = os.getcwd()
@@ -145,15 +143,14 @@ class TestRepo(TestBase):
assert isinstance(r, Repo)
assert r.bare == True
assert os.path.isdir(r.git_dir)
-
+
self._assert_empty_repo(r)
-
+
# test clone
clone_path = path + "_clone"
rc = r.clone(clone_path)
self._assert_empty_repo(rc)
-
-
+
try:
shutil.rmtree(clone_path)
except OSError:
@@ -161,11 +158,11 @@ class TestRepo(TestBase):
# of the parent directory
pass
# END exception handling
-
+
# try again, this time with the absolute version
rc = Repo.clone_from(r.git_dir, clone_path)
self._assert_empty_repo(rc)
-
+
shutil.rmtree(git_dir_abs)
try:
shutil.rmtree(clone_path)
@@ -174,14 +171,14 @@ class TestRepo(TestBase):
# of the parent directory
pass
# END exception handling
-
+
# END for each path
-
+
os.makedirs(git_dir_rela)
os.chdir(git_dir_rela)
r = Repo.init(bare=False)
            assert r.bare == False
-
+
self._assert_empty_repo(r)
finally:
try:
@@ -190,23 +187,23 @@ class TestRepo(TestBase):
pass
os.chdir(prev_cwd)
# END restore previous state
-
+
def test_bare_property(self):
self.rorepo.bare
def test_daemon_export(self):
orig_val = self.rorepo.daemon_export
self.rorepo.daemon_export = not orig_val
- assert self.rorepo.daemon_export == ( not orig_val )
+ assert self.rorepo.daemon_export == (not orig_val)
self.rorepo.daemon_export = orig_val
assert self.rorepo.daemon_export == orig_val
-
+
def test_alternates(self):
cur_alternates = self.rorepo.alternates
# empty alternates
self.rorepo.alternates = []
assert self.rorepo.alternates == []
- alts = [ "other/location", "this/location" ]
+ alts = ["other/location", "this/location"]
self.rorepo.alternates = alts
assert alts == self.rorepo.alternates
self.rorepo.alternates = cur_alternates
@@ -219,13 +216,13 @@ class TestRepo(TestBase):
orig_value = self.rorepo._bare
self.rorepo._bare = True
assert_false(self.rorepo.is_dirty())
- self.rorepo._bare = orig_value
+ self.rorepo._bare = orig_value
def test_is_dirty(self):
self.rorepo._bare = False
- for index in (0,1):
- for working_tree in (0,1):
- for untracked_files in (0,1):
+ for index in (0, 1):
+ for working_tree in (0, 1):
+ for untracked_files in (0, 1):
assert self.rorepo.is_dirty(index, working_tree, untracked_files) in (True, False)
# END untracked files
# END working tree
@@ -241,27 +238,27 @@ class TestRepo(TestBase):
def test_index(self):
index = self.rorepo.index
assert isinstance(index, IndexFile)
-
+
def test_tag(self):
assert self.rorepo.tag('refs/tags/0.1.5').commit
-
+
def test_archive(self):
tmpfile = os.tmpfile()
self.rorepo.archive(tmpfile, '0.1.5')
assert tmpfile.tell()
-
+
@patch.object(Git, '_call_process')
def test_should_display_blame_information(self, git):
git.return_value = fixture('blame')
- b = self.rorepo.blame( 'master', 'lib/git.py')
+ b = self.rorepo.blame('master', 'lib/git.py')
assert_equal(13, len(b))
- assert_equal( 2, len(b[0]) )
+ assert_equal(2, len(b[0]))
# assert_equal(25, reduce(lambda acc, x: acc + len(x[-1]), b))
assert_equal(hash(b[0][0]), hash(b[9][0]))
c = b[0][0]
assert_true(git.called)
assert_equal(git.call_args, (('blame', 'master', '--', 'lib/git.py'), {'p': True}))
-
+
assert_equal('634396b2f541a9f2d58b00be1a07f0c358b999b3', c.hexsha)
assert_equal('Tom Preston-Werner', c.author.name)
assert_equal('tom@mojombo.com', c.author.email)
@@ -270,13 +267,13 @@ class TestRepo(TestBase):
assert_equal('tom@mojombo.com', c.committer.email)
assert_equal(1191997100, c.committed_date)
assert_equal('initial grit setup', c.message)
-
+
# test the 'lines per commit' entries
tlist = b[0][1]
- assert_true( tlist )
- assert_true( isinstance( tlist[0], basestring ) )
- assert_true( len( tlist ) < sum( len(t) for t in tlist ) ) # test for single-char bug
-
+ assert_true(tlist)
+ assert_true(isinstance(tlist[0], basestring))
+ assert_true(len(tlist) < sum(len(t) for t in tlist)) # test for single-char bug
+
def test_blame_real(self):
c = 0
for item in self.rorepo.head.commit.tree.traverse(
@@ -285,20 +282,20 @@ class TestRepo(TestBase):
b = self.rorepo.blame(self.rorepo.head, item.path)
#END for each item to traverse
assert c
-
+
def test_untracked_files(self):
base = self.rorepo.working_tree_dir
- files = ( join_path_native(base, "__test_myfile"),
- join_path_native(base, "__test_other_file") )
+ files = (join_path_native(base, "__test_myfile"),
+ join_path_native(base, "__test_other_file"))
num_recently_untracked = 0
try:
for fpath in files:
- fd = open(fpath,"wb")
+ fd = open(fpath, "wb")
fd.close()
# END for each filename
untracked_files = self.rorepo.untracked_files
num_recently_untracked = len(untracked_files)
-
+
# assure we have all names - they are relative to the git-dir
num_test_untracked = 0
for utfile in untracked_files:
@@ -308,80 +305,81 @@ class TestRepo(TestBase):
for fpath in files:
if os.path.isfile(fpath):
os.remove(fpath)
- # END handle files
-
+ # END handle files
+
assert len(self.rorepo.untracked_files) == (num_recently_untracked - len(files))
-
+
def test_config_reader(self):
- reader = self.rorepo.config_reader() # all config files
+ reader = self.rorepo.config_reader() # all config files
assert reader.read_only
reader = self.rorepo.config_reader("repository") # single config file
assert reader.read_only
-
+
def test_config_writer(self):
for config_level in self.rorepo.config_level:
try:
writer = self.rorepo.config_writer(config_level)
assert not writer.read_only
except IOError:
- # its okay not to get a writer for some configuration files if we
+ # its okay not to get a writer for some configuration files if we
# have no permissions
- pass
- # END for each config level
-
+ pass
+ # END for each config level
+
def test_creation_deletion(self):
- # just a very quick test to assure it generally works. There are
+ # just a very quick test to assure it generally works. There are
# specialized cases in the test_refs module
head = self.rorepo.create_head("new_head", "HEAD~1")
self.rorepo.delete_head(head)
-
+
tag = self.rorepo.create_tag("new_tag", "HEAD~2")
self.rorepo.delete_tag(tag)
self.rorepo.config_writer()
remote = self.rorepo.create_remote("new_remote", "git@server:repo.git")
self.rorepo.delete_remote(remote)
-
+
def test_comparison_and_hash(self):
# this is only a preliminary test, more testing done in test_index
assert self.rorepo == self.rorepo and not (self.rorepo != self.rorepo)
assert len(set((self.rorepo, self.rorepo))) == 1
-
+
def test_git_cmd(self):
# test CatFileContentStream, just to be very sure we have no fencepost errors
# last \n is the terminating newline that it expects
l1 = "0123456789\n"
l2 = "abcdefghijklmnopqrstxy\n"
- l3 = "z\n"
+ l3 = "z\n"
d = "%s%s%s\n" % (l1, l2, l3)
-
+
l1p = l1[:5]
-
+
# full size
# size is without terminating newline
def mkfull():
- return Git.CatFileContentStream(len(d)-1, StringIO(d))
-
+ return Git.CatFileContentStream(len(d) - 1, StringIO(d))
+
ts = 5
+
def mktiny():
return Git.CatFileContentStream(ts, StringIO(d))
-
+
# readlines no limit
s = mkfull()
lines = s.readlines()
assert len(lines) == 3 and lines[-1].endswith('\n')
assert s._stream.tell() == len(d) # must have scrubbed to the end
-
+
        # readlines line limit
s = mkfull()
lines = s.readlines(5)
assert len(lines) == 1
-
+
# readlines on tiny sections
s = mktiny()
lines = s.readlines()
assert len(lines) == 1 and lines[0] == l1p
- assert s._stream.tell() == ts+1
-
+ assert s._stream.tell() == ts + 1
+
# readline no limit
s = mkfull()
assert s.readline() == l1
@@ -389,52 +387,51 @@ class TestRepo(TestBase):
assert s.readline() == l3
assert s.readline() == ''
assert s._stream.tell() == len(d)
-
+
# readline limit
s = mkfull()
assert s.readline(5) == l1p
assert s.readline() == l1[5:]
-
+
# readline on tiny section
s = mktiny()
assert s.readline() == l1p
assert s.readline() == ''
- assert s._stream.tell() == ts+1
-
+ assert s._stream.tell() == ts + 1
+
# read no limit
s = mkfull()
assert s.read() == d[:-1]
assert s.read() == ''
assert s._stream.tell() == len(d)
-
+
# read limit
s = mkfull()
assert s.read(5) == l1p
assert s.read(6) == l1[5:]
assert s._stream.tell() == 5 + 6 # its not yet done
-
+
# read tiny
s = mktiny()
assert s.read(2) == l1[:2]
assert s._stream.tell() == 2
assert s.read() == l1[2:ts]
- assert s._stream.tell() == ts+1
-
+ assert s._stream.tell() == ts + 1
+
def _assert_rev_parse_types(self, name, rev_obj):
rev_parse = self.rorepo.rev_parse
-
+
if rev_obj.type == 'tag':
rev_obj = rev_obj.object
-
+
# tree and blob type
obj = rev_parse(name + '^{tree}')
assert obj == rev_obj.tree
-
+
obj = rev_parse(name + ':CHANGES')
assert obj.type == 'blob' and obj.path == 'CHANGES'
assert rev_obj.tree['CHANGES'] == obj
-
-
+
def _assert_rev_parse(self, name):
"""tries multiple different rev-parse syntaxes with the given name
:return: parsed object"""
@@ -444,62 +441,62 @@ class TestRepo(TestBase):
obj = orig_obj.object
else:
obj = orig_obj
- # END deref tags by default
-
+ # END deref tags by default
+
# try history
rev = name + "~"
obj2 = rev_parse(rev)
assert obj2 == obj.parents[0]
self._assert_rev_parse_types(rev, obj2)
-
+
# history with number
ni = 11
history = [obj.parents[0]]
for pn in range(ni):
history.append(history[-1].parents[0])
# END get given amount of commits
-
+
for pn in range(11):
- rev = name + "~%i" % (pn+1)
+ rev = name + "~%i" % (pn + 1)
obj2 = rev_parse(rev)
assert obj2 == history[pn]
self._assert_rev_parse_types(rev, obj2)
# END history check
-
+
# parent ( default )
rev = name + "^"
obj2 = rev_parse(rev)
assert obj2 == obj.parents[0]
self._assert_rev_parse_types(rev, obj2)
-
+
# parent with number
for pn, parent in enumerate(obj.parents):
- rev = name + "^%i" % (pn+1)
+ rev = name + "^%i" % (pn + 1)
assert rev_parse(rev) == parent
self._assert_rev_parse_types(rev, parent)
# END for each parent
-
+
return orig_obj
-
+
@with_rw_repo('HEAD', bare=False)
def test_rw_rev_parse(self, rwrepo):
# verify it does not confuse branches with hexsha ids
ahead = rwrepo.create_head('aaaaaaaa')
assert(rwrepo.rev_parse(str(ahead)) == ahead.commit)
-
+
def test_rev_parse(self):
rev_parse = self.rorepo.rev_parse
-
+
# try special case: This one failed at some point, make sure its fixed
assert rev_parse("33ebe").hexsha == "33ebe7acec14b25c5f84f35a664803fcab2f7781"
-
+
# start from reference
num_resolved = 0
-
+
for ref in Reference.iter_items(self.rorepo):
path_tokens = ref.path.split("/")
for pt in range(len(path_tokens)):
- path_section = '/'.join(path_tokens[-(pt+1):])
+ path_section = '/'.join(path_tokens[-(pt + 1):])
try:
obj = self._assert_rev_parse(path_section)
assert obj.type == ref.object.type
@@ -512,115 +509,109 @@ class TestRepo(TestBase):
# END for each token
# END for each reference
assert num_resolved
-
+
# it works with tags !
tag = self._assert_rev_parse('0.1.4')
assert tag.type == 'tag'
-
+
# try full sha directly ( including type conversion )
assert tag.object == rev_parse(tag.object.hexsha)
self._assert_rev_parse_types(tag.object.hexsha, tag.object)
-
-
+
# multiple tree types result in the same tree: HEAD^{tree}^{tree}:CHANGES
rev = '0.1.4^{tree}^{tree}'
assert rev_parse(rev) == tag.object.tree
- assert rev_parse(rev+':CHANGES') == tag.object.tree['CHANGES']
-
-
+ assert rev_parse(rev + ':CHANGES') == tag.object.tree['CHANGES']
+
# try to get parents from first revision - it should fail as no such revision
# exists
first_rev = "33ebe7acec14b25c5f84f35a664803fcab2f7781"
commit = rev_parse(first_rev)
assert len(commit.parents) == 0
assert commit.hexsha == first_rev
- self.failUnlessRaises(BadObject, rev_parse, first_rev+"~")
- self.failUnlessRaises(BadObject, rev_parse, first_rev+"^")
-
+ self.failUnlessRaises(BadObject, rev_parse, first_rev + "~")
+ self.failUnlessRaises(BadObject, rev_parse, first_rev + "^")
+
# short SHA1
commit2 = rev_parse(first_rev[:20])
assert commit2 == commit
commit2 = rev_parse(first_rev[:5])
assert commit2 == commit
-
-
+
# todo: dereference tag into a blob 0.1.7^{blob} - quite a special one
# needs a tag which points to a blob
-
-
+
# ref^0 returns commit being pointed to, same with ref~0, and ^{}
tag = rev_parse('0.1.4')
for token in (('~0', '^0', '^{}')):
assert tag.object == rev_parse('0.1.4%s' % token)
# END handle multiple tokens
-
+
# try partial parsing
max_items = 40
for i, binsha in enumerate(self.rorepo.odb.sha_iter()):
- assert rev_parse(bin_to_hex(binsha)[:8-(i%2)]).binsha == binsha
+ assert rev_parse(bin_to_hex(binsha)[:8 - (i % 2)]).binsha == binsha
if i > max_items:
# this is rather slow currently, as rev_parse returns an object
# which requires accessing packs, it has some additional overhead
break
# END for each binsha in repo
-
+
# missing closing brace commit^{tree
self.failUnlessRaises(ValueError, rev_parse, '0.1.4^{tree')
-
+
# missing starting brace
self.failUnlessRaises(ValueError, rev_parse, '0.1.4^tree}')
-
+
# REVLOG
#######
head = self.rorepo.head
-
+
# need to specify a ref when using the @ syntax
self.failUnlessRaises(BadObject, rev_parse, "%s@{0}" % head.commit.hexsha)
-
+
# uses HEAD.ref by default
assert rev_parse('@{0}') == head.commit
if not head.is_detached:
refspec = '%s@{0}' % head.ref.name
assert rev_parse(refspec) == head.ref.commit
# all additional specs work as well
- assert rev_parse(refspec+"^{tree}") == head.commit.tree
- assert rev_parse(refspec+":CHANGES").type == 'blob'
+ assert rev_parse(refspec + "^{tree}") == head.commit.tree
+ assert rev_parse(refspec + ":CHANGES").type == 'blob'
#END operate on non-detached head
-
+
# the last position
assert rev_parse('@{1}') != head.commit
-
+
# position doesn't exist
self.failUnlessRaises(IndexError, rev_parse, '@{10000}')
-
+
# currently, nothing more is supported
self.failUnlessRaises(NotImplementedError, rev_parse, "@{1 week ago}")
-
+
def test_repo_odbtype(self):
target_type = GitDB
if sys.version_info[1] < 5:
target_type = GitCmdObjectDB
assert isinstance(self.rorepo.odb, target_type)
-
+
def test_submodules(self):
assert len(self.rorepo.submodules) == 1 # non-recursive
assert len(list(self.rorepo.iter_submodules())) >= 2
-
+
assert isinstance(self.rorepo.submodule("gitdb"), Submodule)
self.failUnlessRaises(ValueError, self.rorepo.submodule, "doesn't exist")
-
+
@with_rw_repo('HEAD', bare=False)
def test_submodule_update(self, rwrepo):
# fails in bare mode
rwrepo._bare = True
self.failUnlessRaises(InvalidGitRepositoryError, rwrepo.submodule_update)
rwrepo._bare = False
-
+
# test create submodule
sm = rwrepo.submodules[0]
sm = rwrepo.create_submodule("my_new_sub", "some_path", join_path_native(self.rorepo.working_tree_dir, sm.path))
assert isinstance(sm, Submodule)
-
+
# note: the rest of this functionality is tested in test_submodule
-
-
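
As a companion to the rev-parse tests above, a small hedged example of revision lookup with Repo.rev_parse - the working-copy path and the CHANGES file are assumptions:

    import git

    repo = git.Repo('/path/to/GitPython')      # hypothetical working copy
    parent = repo.rev_parse('HEAD~1')          # first parent of the current commit
    tree = repo.rev_parse('HEAD^{tree}')       # Tree of the current commit
    blob = repo.rev_parse('HEAD:CHANGES')      # Blob, assuming a CHANGES file exists at HEAD
    assert tree.type == 'tree' and blob.type == 'blob'
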
diff --git a/git/test/test_stats.py b/git/test/test_stats.py
index 2bdb0a89..d827c680 100644
--- a/git/test/test_stats.py
+++ b/git/test/test_stats.py
@@ -7,19 +7,20 @@
from git.test.lib import *
from git import *
+
class TestStats(TestBase):
-
+
def test__list_from_string(self):
output = fixture('diff_numstat')
stats = Stats._list_from_string(self.rorepo, output)
-
+
assert_equal(2, stats.total['files'])
assert_equal(52, stats.total['lines'])
assert_equal(29, stats.total['insertions'])
assert_equal(23, stats.total['deletions'])
-
+
assert_equal(29, stats.files["a.txt"]['insertions'])
assert_equal(18, stats.files["a.txt"]['deletions'])
-
+
assert_equal(0, stats.files["b.txt"]['insertions'])
assert_equal(5, stats.files["b.txt"]['deletions'])
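
The Stats object parsed above mirrors git diff --numstat; a brief illustrative use through Commit.stats, with the repository path being an assumption:

    import git

    repo = git.Repo('.')                       # hypothetical repository
    stats = repo.head.commit.stats             # Stats of the head commit against its parent
    print stats.total['files'], stats.total['insertions'], stats.total['deletions']
    for path, counts in stats.files.items():
        print path, counts['lines']
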
diff --git a/git/test/test_submodule.py b/git/test/test_submodule.py
index 37bf9f13..0ecb5c1f 100644
--- a/git/test/test_submodule.py
+++ b/git/test/test_submodule.py
@@ -25,35 +25,36 @@ if sys.platform == 'win32':
class TestRootProgress(RootUpdateProgress):
+
"""Just prints messages, for now without checking the correctness of the states"""
-
+
def update(self, op, index, max_count, message=''):
print message
-
+
prog = TestRootProgress()
+
class TestSubmodule(TestBase):
k_subm_current = "468cad66ff1f80ddaeee4123c24e4d53a032c00d"
k_subm_changed = "394ed7006ee5dc8bddfd132b64001d5dfc0ffdd3"
k_no_subm_tag = "0.1.6"
-
def _do_base_tests(self, rwrepo):
"""Perform all tests in the given repository, it may be bare or nonbare"""
# manual instantiation
- smm = Submodule(rwrepo, "\0"*20)
+ smm = Submodule(rwrepo, "\0" * 20)
# name needs to be set in advance
- self.failUnlessRaises(AttributeError, getattr, smm, 'name')
-
+ self.failUnlessRaises(AttributeError, getattr, smm, 'name')
+
# iterate - 1 submodule
sms = Submodule.list_items(rwrepo, self.k_subm_current)
assert len(sms) == 1
sm = sms[0]
-
+
# at a different time, there is None
assert len(Submodule.list_items(rwrepo, self.k_no_subm_tag)) == 0
-
+
assert sm.path == 'git/ext/gitdb'
assert sm.path != sm.name # in our case, we have ids there, which don't equal the path
assert sm.url == 'git://github.com/gitpython-developers/gitdb.git'
@@ -64,26 +65,26 @@ class TestSubmodule(TestBase):
assert sm.size == 0
# the module is not checked-out yet
self.failUnlessRaises(InvalidGitRepositoryError, sm.module)
-
+
# which is why we can't get the branch either - it points into the module() repository
self.failUnlessRaises(InvalidGitRepositoryError, getattr, sm, 'branch')
-
+
# branch_path works, as its just a string
assert isinstance(sm.branch_path, basestring)
-
+
# some commits earlier we still have a submodule, but its at a different commit
smold = Submodule.iter_items(rwrepo, self.k_subm_changed).next()
assert smold.binsha != sm.binsha
assert smold != sm # the name changed
-
+
# force it to reread its information
del(smold._url)
        assert smold.url == sm.url
-
+
# test config_reader/writer methods
sm.config_reader()
- new_smclone_path = None # keep custom paths for later
- new_csmclone_path = None #
+ new_smclone_path = None # keep custom paths for later
+ new_csmclone_path = None #
if rwrepo.bare:
self.failUnlessRaises(InvalidGitRepositoryError, sm.config_writer)
else:
@@ -96,25 +97,25 @@ class TestSubmodule(TestBase):
assert sm.url == new_smclone_path
# END handle bare repo
smold.config_reader()
-
+
# cannot get a writer on historical submodules
if not rwrepo.bare:
self.failUnlessRaises(ValueError, smold.config_writer)
# END handle bare repo
-
+
# make the old into a new - this doesn't work as the name changed
prev_parent_commit = smold.parent_commit
self.failUnlessRaises(ValueError, smold.set_parent_commit, self.k_subm_current)
# the sha is properly updated
- smold.set_parent_commit(self.k_subm_changed+"~1")
+ smold.set_parent_commit(self.k_subm_changed + "~1")
assert smold.binsha != sm.binsha
-
- # raises if the sm didn't exist in new parent - it keeps its
+
+ # raises if the sm didn't exist in new parent - it keeps its
# parent_commit unchanged
self.failUnlessRaises(ValueError, smold.set_parent_commit, self.k_no_subm_tag)
-
+
# TEST TODO: if a path in the gitmodules file, but not in the index, it raises
-
+
# TEST UPDATE
##############
# module retrieval is not always possible
@@ -126,107 +127,105 @@ class TestSubmodule(TestBase):
# its not checked out in our case
self.failUnlessRaises(InvalidGitRepositoryError, sm.module)
assert not sm.module_exists()
-
+
# currently there is only one submodule
assert len(list(rwrepo.iter_submodules())) == 1
- assert sm.binsha != "\0"*20
-
+ assert sm.binsha != "\0" * 20
+
# TEST ADD
###########
# preliminary tests
# adding existing returns exactly the existing
sma = Submodule.add(rwrepo, sm.name, sm.path)
assert sma.path == sm.path
-
+
# no url and no module at path fails
self.failUnlessRaises(ValueError, Submodule.add, rwrepo, "newsubm", "pathtorepo", url=None)
-
+
# CONTINUE UPDATE
#################
-
+
# lets update it - its a recursive one too
newdir = os.path.join(sm.abspath, 'dir')
os.makedirs(newdir)
-
+
# update fails if the path already exists non-empty
self.failUnlessRaises(OSError, sm.update)
os.rmdir(newdir)
-
+
# dry-run does nothing
sm.update(dry_run=True, progress=prog)
assert not sm.module_exists()
-
+
assert sm.update() is sm
sm_repopath = sm.path # cache for later
assert sm.module_exists()
assert isinstance(sm.module(), git.Repo)
assert sm.module().working_tree_dir == sm.abspath
-
+
# INTERLEAVE ADD TEST
#####################
# url must match the one in the existing repository ( if submodule name suggests a new one )
# or we raise
self.failUnlessRaises(ValueError, Submodule.add, rwrepo, "newsubm", sm.path, "git://someurl/repo.git")
-
-
+
# CONTINUE UPDATE
#################
# we should have setup a tracking branch, which is also active
assert sm.module().head.ref.tracking_branch() is not None
-
+
# delete the whole directory and re-initialize
shutil.rmtree(sm.abspath)
assert len(sm.children()) == 0
# dry-run does nothing
sm.update(dry_run=True, recursive=False, progress=prog)
assert len(sm.children()) == 0
-
+
sm.update(recursive=False)
assert len(list(rwrepo.iter_submodules())) == 2
assert len(sm.children()) == 1 # its not checked out yet
csm = sm.children()[0]
assert not csm.module_exists()
csm_repopath = csm.path
-
+
# adjust the path of the submodules module to point to the local destination
new_csmclone_path = to_native_path_linux(join_path_native(self.rorepo.working_tree_dir, sm.path, csm.path))
csm.config_writer().set_value('url', new_csmclone_path)
assert csm.url == new_csmclone_path
-
+
# dry-run does nothing
assert not csm.module_exists()
sm.update(recursive=True, dry_run=True, progress=prog)
assert not csm.module_exists()
-
+
# update recursively again
sm.update(recursive=True)
assert csm.module_exists()
-
+
# tracking branch once again
        assert csm.module().head.ref.tracking_branch() is not None
-
+
# this flushed in a sub-submodule
assert len(list(rwrepo.iter_submodules())) == 2
-
-
+
# reset both heads to the previous version, verify that to_latest_revision works
smods = (sm.module(), csm.module())
for repo in smods:
repo.head.reset('HEAD~2', working_tree=1)
# END for each repo to reset
-
- # dry run does nothing
+
+ # dry run does nothing
sm.update(recursive=True, dry_run=True, progress=prog)
for repo in smods:
assert repo.head.commit != repo.head.ref.tracking_branch().commit
# END for each repo to check
-
+
sm.update(recursive=True, to_latest_revision=True)
for repo in smods:
assert repo.head.commit == repo.head.ref.tracking_branch().commit
# END for each repo to check
del(smods)
-
+
# if the head is detached, it still works ( but warns )
smref = sm.module().head.ref
sm.module().head.ref = 'HEAD~1'
@@ -234,15 +233,15 @@ class TestSubmodule(TestBase):
csm_tracking_branch = csm.module().head.ref.tracking_branch()
csm.module().head.ref.set_tracking_branch(None)
sm.update(recursive=True, to_latest_revision=True)
-
+
# to_latest_revision changes the child submodule's commit, it needs an
# update now
csm.set_parent_commit(csm.repo.head.commit)
-
+
# undo the changes
sm.module().head.ref = smref
csm.module().head.ref.set_tracking_branch(csm_tracking_branch)
-
+
        # REMOVAL OF REPOSITORY
########################
# must delete something
@@ -260,21 +259,21 @@ class TestSubmodule(TestBase):
# still, we have the file modified
self.failUnlessRaises(InvalidGitRepositoryError, sm.remove, dry_run=True)
sm.module().index.reset(working_tree=True)
-
+
# enforce the submodule to be checked out at the right spot as well.
csm.update()
-
+
# this would work
assert sm.remove(dry_run=True) is sm
assert sm.module_exists()
sm.remove(force=True, dry_run=True)
assert sm.module_exists()
-
+
# but ... we have untracked files in the child submodule
fn = join_path_native(csm.module().working_tree_dir, "newfile")
open(fn, 'w').write("hi")
self.failUnlessRaises(InvalidGitRepositoryError, sm.remove)
-
+
# forcibly delete the child repository
prev_count = len(sm.children())
assert csm.remove(force=True) is csm
@@ -284,62 +283,62 @@ class TestSubmodule(TestBase):
# now we have a changed index, as configuration was altered.
# fix this
sm.module().index.reset(working_tree=True)
-
+
# now delete only the module of the main submodule
assert sm.module_exists()
sm.remove(configuration=False)
assert sm.exists()
assert not sm.module_exists()
assert sm.config_reader().get_value('url')
-
+
# delete the rest
sm.remove()
assert not sm.exists()
assert not sm.module_exists()
-
+
assert len(rwrepo.submodules) == 0
-
+
# ADD NEW SUBMODULE
###################
# add a simple remote repo - trailing slashes are no problem
smid = "newsub"
osmid = "othersub"
- nsm = Submodule.add(rwrepo, smid, sm_repopath, new_smclone_path+"/", None, no_checkout=True)
+ nsm = Submodule.add(rwrepo, smid, sm_repopath, new_smclone_path + "/", None, no_checkout=True)
assert nsm.name == smid
assert nsm.module_exists()
assert nsm.exists()
# its not checked out
assert not os.path.isfile(join_path_native(nsm.module().working_tree_dir, Submodule.k_modules_file))
assert len(rwrepo.submodules) == 1
-
+
# add another submodule, but into the root, not as submodule
osm = Submodule.add(rwrepo, osmid, csm_repopath, new_csmclone_path, Submodule.k_head_default)
assert osm != nsm
assert osm.module_exists()
assert osm.exists()
assert os.path.isfile(join_path_native(osm.module().working_tree_dir, 'setup.py'))
-
+
assert len(rwrepo.submodules) == 2
-
+
# commit the changes, just to finalize the operation
rwrepo.index.commit("my submod commit")
assert len(rwrepo.submodules) == 2
-
- # needs update as the head changed, it thinks its in the history
+
+ # needs update as the head changed, it thinks its in the history
# of the repo otherwise
nsm.set_parent_commit(rwrepo.head.commit)
osm.set_parent_commit(rwrepo.head.commit)
-
+
# MOVE MODULE
#############
        # invalid input
self.failUnlessRaises(ValueError, nsm.move, 'doesntmatter', module=False, configuration=False)
-
+
# renaming to the same path does nothing
assert nsm.move(sm.path) is nsm
-
+
# rename a module
- nmp = join_path_native("new", "module", "dir") + "/" # new module path
+ nmp = join_path_native("new", "module", "dir") + "/" # new module path
pmp = nsm.path
abspmp = nsm.abspath
assert nsm.move(nmp) is nsm
@@ -347,49 +346,49 @@ class TestSubmodule(TestBase):
nmpl = to_native_path_linux(nmp)
assert nsm.path == nmpl
assert rwrepo.submodules[0].path == nmpl
-
+
mpath = 'newsubmodule'
absmpath = join_path_native(rwrepo.working_tree_dir, mpath)
open(absmpath, 'w').write('')
self.failUnlessRaises(ValueError, nsm.move, mpath)
os.remove(absmpath)
-
+
# now it works, as we just move it back
nsm.move(pmp)
assert nsm.path == pmp
assert rwrepo.submodules[0].path == pmp
-
+
# TODO lowprio: test remaining exceptions ... for now its okay, the code looks right
-
+
# REMOVE 'EM ALL
################
# if a submodule's repo has no remotes, it can't be added without an explicit url
osmod = osm.module()
-
+
osm.remove(module=False)
for remote in osmod.remotes:
remote.remove(osmod, remote.name)
assert not osm.exists()
- self.failUnlessRaises(ValueError, Submodule.add, rwrepo, osmid, csm_repopath, url=None)
+ self.failUnlessRaises(ValueError, Submodule.add, rwrepo, osmid, csm_repopath, url=None)
# END handle bare mode
-
+
# Error if there is no submodule file here
self.failUnlessRaises(IOError, Submodule._config_parser, rwrepo, rwrepo.commit(self.k_no_subm_tag), True)
-
+
@with_rw_repo(k_subm_current)
def test_base_rw(self, rwrepo):
self._do_base_tests(rwrepo)
-
+
@with_rw_repo(k_subm_current, bare=True)
def test_base_bare(self, rwrepo):
self._do_base_tests(rwrepo)
-
+
@with_rw_repo(k_subm_current, bare=False)
def test_root_module(self, rwrepo):
# Can query everything without problems
rm = RootModule(self.rorepo)
assert rm.module() is self.rorepo
-
+
# try attributes
rm.binsha
rm.mode
@@ -398,24 +397,24 @@ class TestSubmodule(TestBase):
assert rm.parent_commit == self.rorepo.head.commit
rm.url
rm.branch
-
+
assert len(rm.list_items(rm.module())) == 1
rm.config_reader()
rm.config_writer()
-
+
# deep traversal gitdb / async
rsmsp = [sm.path for sm in rm.traverse()]
assert len(rsmsp) >= 2 # gitdb and async [and smmap], async being a child of gitdb
-
+
# cannot set the parent commit as root module's path didn't exist
self.failUnlessRaises(ValueError, rm.set_parent_commit, 'HEAD')
-
+
# TEST UPDATE
#############
# setup commit which remove existing, add new and modify existing submodules
rm = RootModule(rwrepo)
assert len(rm.children()) == 1
-
+
# modify path without modifying the index entry
# ( which is what the move method would do properly )
#==================================================
@@ -424,37 +423,37 @@ class TestSubmodule(TestBase):
fp = join_path_native(pp, sm.path)
prep = sm.path
assert not sm.module_exists() # was never updated after rwrepo's clone
-
- # assure we clone from a local source
+
+ # assure we clone from a local source
sm.config_writer().set_value('url', to_native_path_linux(join_path_native(self.rorepo.working_tree_dir, sm.path)))
-
+
# dry-run does nothing
sm.update(recursive=False, dry_run=True, progress=prog)
assert not sm.module_exists()
-
+
sm.update(recursive=False)
assert sm.module_exists()
sm.config_writer().set_value('path', fp) # change path to something with prefix AFTER url change
-
+
        # update fails, as list_items cannot work in this situation - it cannot
# find the entry at the changed path
self.failUnlessRaises(InvalidGitRepositoryError, rm.update, recursive=False)
-
+
        # move it properly - doesn't work, as its path currently points to an indexentry
# which doesn't exist ( move it to some path, it doesn't matter here )
self.failUnlessRaises(InvalidGitRepositoryError, sm.move, pp)
# reset the path(cache) to where it was, now it works
sm.path = prep
sm.move(fp, module=False) # leave it at the old location
-
+
assert not sm.module_exists()
- cpathchange = rwrepo.index.commit("changed sm path") # finally we can commit
-
+ cpathchange = rwrepo.index.commit("changed sm path") # finally we can commit
+
# update puts the module into place
rm.update(recursive=False, progress=prog)
sm.set_parent_commit(cpathchange)
assert sm.module_exists()
-
+
# add submodule
#================
nsmn = "newsubmodule"
@@ -468,17 +467,14 @@ class TestSubmodule(TestBase):
# repo and a new submodule comes into life
nsm.remove(configuration=False, module=True)
assert not nsm.module_exists() and nsm.exists()
-
-
+
# dry-run does nothing
rm.update(recursive=False, dry_run=True, progress=prog)
-
+
# otherwise it will work
rm.update(recursive=False, progress=prog)
assert nsm.module_exists()
-
-
-
+
# remove submodule - the previous one
#====================================
sm.set_parent_commit(csmadded)
@@ -486,45 +482,44 @@ class TestSubmodule(TestBase):
assert not sm.remove(module=False).exists()
assert os.path.isdir(smp) # module still exists
csmremoved = rwrepo.index.commit("Removed submodule")
-
+
# an update will remove the module
# not in dry_run
rm.update(recursive=False, dry_run=True)
assert os.path.isdir(smp)
-
+
rm.update(recursive=False)
assert not os.path.isdir(smp)
-
-
- # change url
+
+ # change url
#=============
- # to the first repository, this way we have a fast checkout, and a completely different
+ # to the first repository, this way we have a fast checkout, and a completely different
# repository at the different url
nsm.set_parent_commit(csmremoved)
nsmurl = to_native_path_linux(join_path_native(self.rorepo.working_tree_dir, rsmsp[0]))
nsm.config_writer().set_value('url', nsmurl)
csmpathchange = rwrepo.index.commit("changed url")
nsm.set_parent_commit(csmpathchange)
-
+
prev_commit = nsm.module().head.commit
# dry-run does nothing
rm.update(recursive=False, dry_run=True, progress=prog)
assert nsm.module().remotes.origin.url != nsmurl
-
+
rm.update(recursive=False, progress=prog)
assert nsm.module().remotes.origin.url == nsmurl
# head changed, as the remote url and its commit changed
assert prev_commit != nsm.module().head.commit
-
+
# add the submodule's changed commit to the index, which is what the
# user would do
# beforehand, update our instance's binsha with the new one
nsm.binsha = nsm.module().head.commit.binsha
rwrepo.index.add([nsm])
-
+
# change branch
#=================
- # we only have one branch, so we switch to a virtual one, and back
+ # we only have one branch, so we switch to a virtual one, and back
# to the current one to trigger the difference
cur_branch = nsm.branch
nsmm = nsm.module()
@@ -534,34 +529,33 @@ class TestSubmodule(TestBase):
csmbranchchange = rwrepo.index.commit("changed branch to %s" % branch)
nsm.set_parent_commit(csmbranchchange)
# END for each branch to change
-
+
# Lets remove our tracking branch to simulate some changes
nsmmh = nsmm.head
assert nsmmh.ref.tracking_branch() is None # never set it up until now
assert not nsmmh.is_detached
-
+
#dry run does nothing
rm.update(recursive=False, dry_run=True, progress=prog)
assert nsmmh.ref.tracking_branch() is None
-
+
# the real thing does
rm.update(recursive=False, progress=prog)
-
+
assert nsmmh.ref.tracking_branch() is not None
assert not nsmmh.is_detached
-
+
# recursive update
# =================
# finally we recursively update a module, just to run the code at least once
# remove the module so that it has more work
- assert len(nsm.children()) >= 1 # could include smmap
+ assert len(nsm.children()) >= 1 # could include smmap
assert nsm.exists() and nsm.module_exists() and len(nsm.children()) >= 1
# assure we pull locally only
- nsmc = nsm.children()[0]
+ nsmc = nsm.children()[0]
nsmc.config_writer().set_value('url', async_url)
rm.update(recursive=True, progress=prog, dry_run=True) # just to run the code
rm.update(recursive=True, progress=prog)
-
+
# gitdb: has either 1 or 2 submodules depending on the version
assert len(nsm.children()) >= 1 and nsmc.module_exists()
-
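
A compact, hedged sketch of the Submodule workflow covered by the tests above; the superproject path is hypothetical:

    import git

    repo = git.Repo('/path/to/superproject')   # hypothetical superproject
    for sm in repo.submodules:                 # read from .gitmodules plus the index
        if not sm.module_exists():
            sm.update(recursive=True)          # clone and check out the submodule
        print sm.name, sm.path, sm.url, sm.hexsha

    # RootModule(repo).update(...) applies the same logic to the whole tree and
    # honours url/path/branch changes recorded in the superproject's commits.
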
diff --git a/git/test/test_tree.py b/git/test/test_tree.py
index 16d5be59..0f1fb7c3 100644
--- a/git/test/test_tree.py
+++ b/git/test/test_tree.py
@@ -13,8 +13,9 @@ from git.objects.fun import (
)
from cStringIO import StringIO
+
class TestTree(TestBase):
-
+
def test_serializable(self):
# tree at the given commit contains a submodule as well
roottree = self.rorepo.tree('6c1faef799095f3990e9970bc2cb10aa0221cf9c')
@@ -25,75 +26,74 @@ class TestTree(TestBase):
tree = item
# trees have no dict
self.failUnlessRaises(AttributeError, setattr, tree, 'someattr', 1)
-
+
orig_data = tree.data_stream.read()
orig_cache = tree._cache
-
+
stream = StringIO()
tree._serialize(stream)
assert stream.getvalue() == orig_data
-
+
stream.seek(0)
testtree = Tree(self.rorepo, Tree.NULL_BIN_SHA, 0, '')
testtree._deserialize(stream)
assert testtree._cache == orig_cache
-
-
+
# TEST CACHE MUTATOR
mod = testtree.cache
self.failUnlessRaises(ValueError, mod.add, "invalid sha", 0, "name")
self.failUnlessRaises(ValueError, mod.add, Tree.NULL_HEX_SHA, 0, "invalid mode")
self.failUnlessRaises(ValueError, mod.add, Tree.NULL_HEX_SHA, tree.mode, "invalid/name")
-
+
# add new item
name = "fake_dir"
mod.add(testtree.NULL_HEX_SHA, tree.mode, name)
assert name in testtree
-
+
# its available in the tree immediately
assert isinstance(testtree[name], Tree)
-
+
            # adding it again will not cause multiple of them to be present
cur_count = len(testtree)
mod.add(testtree.NULL_HEX_SHA, tree.mode, name)
assert len(testtree) == cur_count
-
+
# fails with a different sha - name exists
- hexsha = "1"*40
+ hexsha = "1" * 40
self.failUnlessRaises(ValueError, mod.add, hexsha, tree.mode, name)
-
+
# force it - replace existing one
mod.add(hexsha, tree.mode, name, force=True)
assert testtree[name].hexsha == hexsha
assert len(testtree) == cur_count
-
+
# unchecked addition always works, even with invalid items
invalid_name = "hi/there"
mod.add_unchecked(hexsha, 0, invalid_name)
assert len(testtree) == cur_count + 1
-
+
del(mod[invalid_name])
assert len(testtree) == cur_count
# del again, its fine
del(mod[invalid_name])
-
+
# have added one item, we are done
mod.set_done()
mod.set_done() # multiple times are okay
-
+
# serialize, its different now
stream = StringIO()
testtree._serialize(stream)
stream.seek(0)
assert stream.getvalue() != orig_data
-
+
# replaces cache, but we make sure of it
del(testtree._cache)
testtree._deserialize(stream)
assert name in testtree
assert invalid_name not in testtree
# END for each item in tree
-
+
def test_traverse(self):
root = self.rorepo.tree('0.1.6')
num_recursive = 0
@@ -101,34 +101,34 @@ class TestTree(TestBase):
for obj in root.traverse():
if "/" in obj.path:
num_recursive += 1
-
+
assert isinstance(obj, (Blob, Tree))
all_items.append(obj)
# END for each object
assert all_items == root.list_traverse()
-
+
# limit recursion level to 0 - should be same as default iteration
assert all_items
assert 'CHANGES' in root
assert len(list(root)) == len(list(root.traverse(depth=1)))
-
+
# only choose trees
- trees_only = lambda i,d: i.type == "tree"
- trees = list(root.traverse(predicate = trees_only))
- assert len(trees) == len(list( i for i in root.traverse() if trees_only(i,0) ))
-
+ trees_only = lambda i, d: i.type == "tree"
+ trees = list(root.traverse(predicate=trees_only))
+ assert len(trees) == len(list(i for i in root.traverse() if trees_only(i, 0)))
+
# test prune
- lib_folder = lambda t,d: t.path == "lib"
- pruned_trees = list(root.traverse(predicate = trees_only,prune = lib_folder))
+ lib_folder = lambda t, d: t.path == "lib"
+ pruned_trees = list(root.traverse(predicate=trees_only, prune=lib_folder))
assert len(pruned_trees) < len(trees)
-
+
# trees and blobs
- assert len(set(trees)|set(root.trees)) == len(trees)
- assert len(set(b for b in root if isinstance(b, Blob)) | set(root.blobs)) == len( root.blobs )
+ assert len(set(trees) | set(root.trees)) == len(trees)
+ assert len(set(b for b in root if isinstance(b, Blob)) | set(root.blobs)) == len(root.blobs)
subitem = trees[0][0]
assert "/" in subitem.path
assert subitem.name == os.path.basename(subitem.path)
-
+
# assure that at some point the traversed paths have a slash in them
found_slash = False
for item in root.traverse():
@@ -136,9 +136,8 @@ class TestTree(TestBase):
if '/' in item.path:
found_slash = True
# END check for slash
-
- # slashes in paths are supported as well
- assert root[item.path] == item == root/item.path
+
+ # slashes in paths are supported as well
+ assert root[item.path] == item == root / item.path
# END for each item
assert found_slash
-
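
For context on the traversal predicates tested above, a minimal Tree usage sketch; the repository path and file name are assumptions:

    import git

    repo = git.Repo('.')                       # hypothetical repository
    root = repo.head.commit.tree
    changes = root / 'CHANGES'                 # same as root['CHANGES'], assuming the file exists

    # traverse() walks recursively; predicate filters what gets yielded,
    # prune stops descending into matching subtrees
    for blob in root.traverse(predicate=lambda item, depth: item.type == 'blob'):
        print blob.path, blob.size
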
diff --git a/git/test/test_util.py b/git/test/test_util.py
index ea761217..63842d19 100644
--- a/git/test/test_util.py
+++ b/git/test/test_util.py
@@ -17,59 +17,60 @@ import time
class TestIterableMember(object):
+
"""A member of an iterable list"""
__slots__ = ("name", "prefix_name")
-
+
def __init__(self, name):
self.name = name
self.prefix_name = name
-
+
class TestUtils(TestBase):
+
def setup(self):
self.testdict = {
- "string": "42",
- "int": 42,
- "array": [ 42 ],
+ "string": "42",
+ "int": 42,
+ "array": [42],
}
def test_it_should_dashify(self):
assert_equal('this-is-my-argument', dashify('this_is_my_argument'))
assert_equal('foo', dashify('foo'))
-
-
+
def test_lock_file(self):
my_file = tempfile.mktemp()
lock_file = LockFile(my_file)
assert not lock_file._has_lock()
# release lock we don't have - fine
lock_file._release_lock()
-
+
# get lock
lock_file._obtain_lock_or_raise()
assert lock_file._has_lock()
-
+
# concurrent access
other_lock_file = LockFile(my_file)
assert not other_lock_file._has_lock()
self.failUnlessRaises(IOError, other_lock_file._obtain_lock_or_raise)
-
+
lock_file._release_lock()
assert not lock_file._has_lock()
-
+
other_lock_file._obtain_lock_or_raise()
self.failUnlessRaises(IOError, lock_file._obtain_lock_or_raise)
-
+
# auto-release on destruction
del(other_lock_file)
lock_file._obtain_lock_or_raise()
lock_file._release_lock()
-
+
def test_blocking_lock_file(self):
my_file = tempfile.mktemp()
lock_file = BlockingLockFile(my_file)
lock_file._obtain_lock()
-
+
# next one waits for the lock
start = time.time()
wait_time = 0.1
@@ -77,10 +78,10 @@ class TestUtils(TestBase):
self.failUnlessRaises(IOError, wait_lock._obtain_lock)
elapsed = time.time() - start
assert elapsed <= wait_time + 0.02 # some extra time it may cost
-
+
def test_user_id(self):
assert '@' in get_user_id()
-
+
def test_parse_date(self):
# test all supported formats
def assert_rval(rval, veri_time, offset=0):
@@ -88,13 +89,13 @@ class TestUtils(TestBase):
assert isinstance(rval[0], int) and isinstance(rval[1], int)
assert rval[0] == veri_time
assert rval[1] == offset
-
+
# now that we are here, test our conversion functions as well
utctz = altz_to_utctz_str(offset)
assert isinstance(utctz, basestring)
assert utctz_to_altz(verify_utctz(utctz)) == offset
# END assert rval utility
-
+
rfc = ("Thu, 07 Apr 2005 22:13:11 +0000", 0)
iso = ("2005-04-07T22:13:11 -0200", 7200)
iso2 = ("2005-04-07 22:13:11 +0400", -14400)
@@ -105,52 +106,52 @@ class TestUtils(TestBase):
for date, offset in (rfc, iso, iso2, iso3, alt, alt2):
assert_rval(parse_date(date), veri_time, offset)
# END for each date type
-
+
# and failure
self.failUnlessRaises(ValueError, parse_date, 'invalid format')
self.failUnlessRaises(ValueError, parse_date, '123456789 -02000')
self.failUnlessRaises(ValueError, parse_date, ' 123456789 -0200')
-
+
def test_actor(self):
for cr in (None, self.rorepo.config_reader()):
assert isinstance(Actor.committer(cr), Actor)
assert isinstance(Actor.author(cr), Actor)
#END assure config reader is handled
-
+
def test_iterable_list(self):
for args in (('name',), ('name', 'prefix_')):
l = IterableList('name')
-
+
m1 = TestIterableMember('one')
m2 = TestIterableMember('two')
-
+
l.extend((m1, m2))
-
+
assert len(l) == 2
-
+
# contains works with name and identity
assert m1.name in l
assert m2.name in l
assert m2 in l
assert m2 in l
assert 'invalid' not in l
-
+
# with string index
assert l[m1.name] is m1
assert l[m2.name] is m2
-
+
# with int index
assert l[0] is m1
assert l[1] is m2
-
+
# with getattr
assert l.one is m1
assert l.two is m2
-
+
# test exceptions
self.failUnlessRaises(AttributeError, getattr, l, 'something')
self.failUnlessRaises(IndexError, l.__getitem__, 'something')
-
+
# delete by name and index
self.failUnlessRaises(IndexError, l.__delitem__, 'something')
del(l[m2.name])
@@ -159,8 +160,7 @@ class TestUtils(TestBase):
del(l[0])
assert m1.name not in l
assert len(l) == 0
-
+
self.failUnlessRaises(IndexError, l.__delitem__, 0)
self.failUnlessRaises(IndexError, l.__delitem__, 'something')
#END for each possible mode
-
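
Finally, a small hedged example of the LockFile utility exercised by TestUtils; the lock path is hypothetical:

    from git.util import LockFile, IterableList

    lock = LockFile('/tmp/some.file')          # the lock itself lives at '/tmp/some.file.lock'
    lock._obtain_lock_or_raise()               # a second LockFile on the same path now raises IOError
    assert lock._has_lock()
    lock._release_lock()

    remotes = IterableList('name')             # items become addressable by index, by name, or as attributes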