summaryrefslogtreecommitdiff
path: root/git/test/test_pack.py
diff options
context:
space:
mode:
authorCraig Northway <craig.northway@gmail.com>2014-07-25 11:26:48 +1000
committerCraig Northway <craig.northway@gmail.com>2014-07-25 19:19:22 +1000
commita23d0d8617ba3119069e610fc7b0850a17322726 (patch)
treed8bc5213dd7e7c7f0befdf65afecb13d5435f873 /git/test/test_pack.py
parent75194159abce545bfa38c3172efb42da9b0017dc (diff)
downloadgitpython-a23d0d8617ba3119069e610fc7b0850a17322726.tar.gz
Autopep8 fixes with maximum line length 120
Diffstat (limited to 'git/test/test_pack.py')
-rw-r--r--git/test/test_pack.py134
1 files changed, 66 insertions, 68 deletions
diff --git a/git/test/test_pack.py b/git/test/test_pack.py
index 1c308689..665a0226 100644
--- a/git/test/test_pack.py
+++ b/git/test/test_pack.py
@@ -4,23 +4,23 @@
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Test everything about packs reading and writing"""
from lib import (
- TestBase,
- with_rw_directory,
- with_packs_rw,
- fixture_path
- )
+ TestBase,
+ with_rw_directory,
+ with_packs_rw,
+ fixture_path
+)
from git.stream import DeltaApplyReader
from git.pack import (
- PackEntity,
- PackIndexFile,
- PackFile
- )
+ PackEntity,
+ PackIndexFile,
+ PackFile
+)
from git.base import (
- OInfo,
- OStream,
- )
+ OInfo,
+ OStream,
+)
from git.fun import delta_types
from git.exc import UnsupportedOperation
@@ -38,16 +38,17 @@ def bin_sha_from_filename(filename):
return to_bin_sha(os.path.splitext(os.path.basename(filename))[0][5:])
#} END utilities
+
class TestPack(TestBase):
-
+
packindexfile_v1 = (fixture_path('packs/pack-c0438c19fb16422b6bbcce24387b3264416d485b.idx'), 1, 67)
packindexfile_v2 = (fixture_path('packs/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.idx'), 2, 30)
packindexfile_v2_3_ascii = (fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.idx'), 2, 42)
packfile_v2_1 = (fixture_path('packs/pack-c0438c19fb16422b6bbcce24387b3264416d485b.pack'), 2, packindexfile_v1[2])
packfile_v2_2 = (fixture_path('packs/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack'), 2, packindexfile_v2[2])
- packfile_v2_3_ascii = (fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.pack'), 2, packindexfile_v2_3_ascii[2])
-
-
+ packfile_v2_3_ascii = (
+ fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.pack'), 2, packindexfile_v2_3_ascii[2])
+
def _assert_index_file(self, index, version, size):
assert index.packfile_checksum() != index.indexfile_checksum()
assert len(index.packfile_checksum()) == 20
@@ -55,102 +56,99 @@ class TestPack(TestBase):
assert index.version() == version
assert index.size() == size
assert len(index.offsets()) == size
-
+
# get all data of all objects
for oidx in xrange(index.size()):
sha = index.sha(oidx)
assert oidx == index.sha_to_index(sha)
-
+
entry = index.entry(oidx)
assert len(entry) == 3
-
+
assert entry[0] == index.offset(oidx)
assert entry[1] == sha
assert entry[2] == index.crc(oidx)
-
+
# verify partial sha
- for l in (4,8,11,17,20):
- assert index.partial_sha_to_index(sha[:l], l*2) == oidx
-
+ for l in (4, 8, 11, 17, 20):
+ assert index.partial_sha_to_index(sha[:l], l * 2) == oidx
+
# END for each object index in indexfile
self.failUnlessRaises(ValueError, index.partial_sha_to_index, "\0", 2)
-
-
+
def _assert_pack_file(self, pack, version, size):
assert pack.version() == 2
assert pack.size() == size
assert len(pack.checksum()) == 20
-
+
num_obj = 0
for obj in pack.stream_iter():
num_obj += 1
info = pack.info(obj.pack_offset)
stream = pack.stream(obj.pack_offset)
-
+
assert info.pack_offset == stream.pack_offset
assert info.type_id == stream.type_id
assert hasattr(stream, 'read')
-
+
# it should be possible to read from both streams
assert obj.read() == stream.read()
-
+
streams = pack.collect_streams(obj.pack_offset)
assert streams
-
+
# read the stream
try:
dstream = DeltaApplyReader.new(streams)
except ValueError:
- # ignore these, old git versions use only ref deltas,
+ # ignore these, old git versions use only ref deltas,
# which we havent resolved ( as we are without an index )
# Also ignore non-delta streams
continue
# END get deltastream
-
+
# read all
data = dstream.read()
assert len(data) == dstream.size
-
+
# test seek
dstream.seek(0)
assert dstream.read() == data
-
-
+
# read chunks
# NOTE: the current implementation is safe, it basically transfers
# all calls to the underlying memory map
-
+
# END for each object
assert num_obj == size
-
-
+
def test_pack_index(self):
# check version 1 and 2
- for indexfile, version, size in (self.packindexfile_v1, self.packindexfile_v2):
+ for indexfile, version, size in (self.packindexfile_v1, self.packindexfile_v2):
index = PackIndexFile(indexfile)
self._assert_index_file(index, version, size)
# END run tests
-
+
def test_pack(self):
- # there is this special version 3, but apparently its like 2 ...
+ # there is this special version 3, but apparently its like 2 ...
for packfile, version, size in (self.packfile_v2_3_ascii, self.packfile_v2_1, self.packfile_v2_2):
pack = PackFile(packfile)
self._assert_pack_file(pack, version, size)
# END for each pack to test
-
+
@with_rw_directory
def test_pack_entity(self, rw_dir):
pack_objs = list()
- for packinfo, indexinfo in ( (self.packfile_v2_1, self.packindexfile_v1),
- (self.packfile_v2_2, self.packindexfile_v2),
- (self.packfile_v2_3_ascii, self.packindexfile_v2_3_ascii)):
+ for packinfo, indexinfo in ((self.packfile_v2_1, self.packindexfile_v1),
+ (self.packfile_v2_2, self.packindexfile_v2),
+ (self.packfile_v2_3_ascii, self.packindexfile_v2_3_ascii)):
packfile, version, size = packinfo
indexfile, version, size = indexinfo
entity = PackEntity(packfile)
assert entity.pack().path() == packfile
assert entity.index().path() == indexfile
pack_objs.extend(entity.stream_iter())
-
+
count = 0
for info, stream in izip(entity.info_iter(), entity.stream_iter()):
count += 1
@@ -158,10 +156,10 @@ class TestPack(TestBase):
assert len(info.binsha) == 20
assert info.type_id == stream.type_id
assert info.size == stream.size
-
+
# we return fully resolved items, which is implied by the sha centric access
assert not info.type_id in delta_types
-
+
# try all calls
assert len(entity.collect_streams(info.binsha))
oinfo = entity.info(info.binsha)
@@ -170,7 +168,7 @@ class TestPack(TestBase):
ostream = entity.stream(info.binsha)
assert isinstance(ostream, OStream)
assert ostream.binsha is not None
-
+
# verify the stream
try:
assert entity.is_valid_stream(info.binsha, use_crc=True)
@@ -180,42 +178,43 @@ class TestPack(TestBase):
assert entity.is_valid_stream(info.binsha, use_crc=False)
# END for each info, stream tuple
assert count == size
-
+
# END for each entity
-
+
# pack writing - write all packs into one
# index path can be None
pack_path = tempfile.mktemp('', "pack", rw_dir)
index_path = tempfile.mktemp('', 'index', rw_dir)
iteration = 0
+
def rewind_streams():
- for obj in pack_objs:
+ for obj in pack_objs:
obj.stream.seek(0)
- #END utility
- for ppath, ipath, num_obj in zip((pack_path, )*2, (index_path, None), (len(pack_objs), None)):
+ # END utility
+ for ppath, ipath, num_obj in zip((pack_path, ) * 2, (index_path, None), (len(pack_objs), None)):
pfile = open(ppath, 'wb')
iwrite = None
if ipath:
ifile = open(ipath, 'wb')
iwrite = ifile.write
- #END handle ip
-
+ # END handle ip
+
# make sure we rewind the streams ... we work on the same objects over and over again
- if iteration > 0:
+ if iteration > 0:
rewind_streams()
- #END rewind streams
+ # END rewind streams
iteration += 1
-
+
pack_sha, index_sha = PackEntity.write_pack(pack_objs, pfile.write, iwrite, object_count=num_obj)
pfile.close()
assert os.path.getsize(ppath) > 100
-
+
# verify pack
pf = PackFile(ppath)
assert pf.size() == len(pack_objs)
assert pf.version() == PackFile.pack_version_default
assert pf.checksum() == pack_sha
-
+
# verify index
if ipath is not None:
ifile.close()
@@ -225,9 +224,9 @@ class TestPack(TestBase):
assert idx.packfile_checksum() == pack_sha
assert idx.indexfile_checksum() == index_sha
assert idx.size() == len(pack_objs)
- #END verify files exist
- #END for each packpath, indexpath pair
-
+ # END verify files exist
+ # END for each packpath, indexpath pair
+
# verify the packs throughly
rewind_streams()
entity = PackEntity.create(pack_objs, rw_dir)
@@ -237,11 +236,10 @@ class TestPack(TestBase):
for use_crc in range(2):
assert entity.is_valid_stream(info.binsha, use_crc)
# END for each crc mode
- #END for each info
+ # END for each info
assert count == len(pack_objs)
-
-
+
def test_pack_64(self):
# TODO: hex-edit a pack helping us to verify that we can handle 64 byte offsets
- # of course without really needing such a huge pack
+ # of course without really needing such a huge pack
raise SkipTest()