# Copyright (C) 2010, 2011 Sebastian Thiel ([email protected]) and contributors
#
# This module is part of GitDB and is released under
# the New BSD License: https://opensource.org/license/bsd-3-clause/
"""Test everything about packs reading and writing"""
from gitdb.test.lib import (
    TestBase,
    with_rw_directory,
    fixture_path
)

from gitdb.stream import DeltaApplyReader

from gitdb.pack import (
    PackEntity,
    PackIndexFile,
    PackFile
)

from gitdb.base import (
    OInfo,
    OStream,
)

from gitdb.fun import delta_types
from gitdb.exc import UnsupportedOperation
from gitdb.util import to_bin_sha

import pytest

import os
import tempfile

#{ Utilities
def bin_sha_from_filename(filename):
    return to_bin_sha(os.path.splitext(os.path.basename(filename))[0][5:])
#} END utilities
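
# The fixture packs below follow git's on-disk naming convention,
# 'pack-<40 hex chars>.pack' with a matching '.idx'. bin_sha_from_filename
# relies on this: it drops the 'pack-' prefix and the extension, then decodes
# the remaining hex digest into its 20-byte binary form.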

class TestPack(TestBase):

    packindexfile_v1 = (fixture_path('packs/pack-c0438c19fb16422b6bbcce24387b3264416d485b.idx'), 1, 67)
    packindexfile_v2 = (fixture_path('packs/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.idx'), 2, 30)
    packindexfile_v2_3_ascii = (fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.idx'), 2, 42)
    packfile_v2_1 = (fixture_path('packs/pack-c0438c19fb16422b6bbcce24387b3264416d485b.pack'), 2, packindexfile_v1[2])
    packfile_v2_2 = (fixture_path('packs/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack'), 2, packindexfile_v2[2])
    packfile_v2_3_ascii = (
        fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.pack'), 2, packindexfile_v2_3_ascii[2])
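    # Each fixture tuple above is (fixture path, expected version, expected object count).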

    def _assert_index_file(self, index, version, size):
        assert index.packfile_checksum() != index.indexfile_checksum()
        assert len(index.packfile_checksum()) == 20
        assert len(index.indexfile_checksum()) == 20
        assert index.version() == version
        assert index.size() == size
        assert len(index.offsets()) == size

        # get all data of all objects
        for oidx in range(index.size()):
            sha = index.sha(oidx)
            assert oidx == index.sha_to_index(sha)

            entry = index.entry(oidx)
            assert len(entry) == 3

            assert entry[0] == index.offset(oidx)
            assert entry[1] == sha
            assert entry[2] == index.crc(oidx)

            # verify partial sha - the second argument is the length of the
            # canonical (hexadecimal) representation, hence twice the byte count
            for l in (4, 8, 11, 17, 20):
                assert index.partial_sha_to_index(sha[:l], l * 2) == oidx
        # END for each object index in indexfile
        self.assertRaises(ValueError, index.partial_sha_to_index, b"\0", 2)

    def _assert_pack_file(self, pack, version, size):
        assert pack.version() == version
        assert pack.size() == size
        assert len(pack.checksum()) == 20

        num_obj = 0
        for obj in pack.stream_iter():
            num_obj += 1
            info = pack.info(obj.pack_offset)
            stream = pack.stream(obj.pack_offset)

            assert info.pack_offset == stream.pack_offset
            assert info.type_id == stream.type_id
            assert hasattr(stream, 'read')

            # it should be possible to read from both streams
            assert obj.read() == stream.read()

            streams = pack.collect_streams(obj.pack_offset)
            assert streams

            # read the stream
            try:
                dstream = DeltaApplyReader.new(streams)
            except ValueError:
                # ignore these, old git versions use only ref deltas,
                # which we haven't resolved (as we are without an index).
                # Also ignore non-delta streams
                continue
            # END get deltastream

            # read all
            data = dstream.read()
            assert len(data) == dstream.size

            # test seek
            dstream.seek(0)
            assert dstream.read() == data

            # read chunks
            # NOTE: the current implementation is safe, it basically transfers
            # all calls to the underlying memory map
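            # A chunked read of the same delta stream, as a sketch - this assumes
            # DeltaApplyReader honours the usual file-like read(size) contract
            # (returning b'' once exhausted), as the seek/read calls above imply:
            dstream.seek(0)
            chunks = []
            while True:
                chunk = dstream.read(4096)
                if not chunk:
                    break
                chunks.append(chunk)
            assert b''.join(chunks) == data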
        # END for each object
        assert num_obj == size

    def test_pack_index(self):
        # check version 1 and 2
        for indexfile, version, size in (self.packindexfile_v1, self.packindexfile_v2):
            index = PackIndexFile(indexfile)
            self._assert_index_file(index, version, size)
        # END run tests

    def test_pack(self):
        # there is this special version 3, but apparently it's just like version 2 ...
        for packfile, version, size in (self.packfile_v2_3_ascii, self.packfile_v2_1, self.packfile_v2_2):
            pack = PackFile(packfile)
            self._assert_pack_file(pack, version, size)
        # END for each pack to test

    @with_rw_directory
    def test_pack_entity(self, rw_dir):
        pack_objs = list()
        for packinfo, indexinfo in ((self.packfile_v2_1, self.packindexfile_v1),
                                    (self.packfile_v2_2, self.packindexfile_v2),
                                    (self.packfile_v2_3_ascii, self.packindexfile_v2_3_ascii)):
            packfile, version, size = packinfo
            indexfile, version, size = indexinfo
            entity = PackEntity(packfile)
            assert entity.pack().path() == packfile
            assert entity.index().path() == indexfile
            pack_objs.extend(entity.stream_iter())

            count = 0
            for info, stream in zip(entity.info_iter(), entity.stream_iter()):
                count += 1
                assert info.binsha == stream.binsha
                assert len(info.binsha) == 20
                assert info.type_id == stream.type_id
                assert info.size == stream.size

                # we return fully resolved items, which is implied by the sha-centric access
                assert info.type_id not in delta_types

                # try all calls
                assert len(entity.collect_streams(info.binsha))
                oinfo = entity.info(info.binsha)
                assert isinstance(oinfo, OInfo)
                assert oinfo.binsha is not None
                ostream = entity.stream(info.binsha)
                assert isinstance(ostream, OStream)
                assert ostream.binsha is not None

                # verify the stream
                try:
                    assert entity.is_valid_stream(info.binsha, use_crc=True)
                except UnsupportedOperation:
                    pass
                # END ignore version issues
                assert entity.is_valid_stream(info.binsha, use_crc=False)
            # END for each info, stream tuple
            assert count == size
        # END for each entity

        # pack writing - write all packs into one
        # index path can be None
        pack_path1 = tempfile.mktemp('', "pack1", rw_dir)
        pack_path2 = tempfile.mktemp('', "pack2", rw_dir)
        index_path = tempfile.mktemp('', 'index', rw_dir)
        iteration = 0

        def rewind_streams():
            for obj in pack_objs:
                obj.stream.seek(0)
        # END utility
        for ppath, ipath, num_obj in zip((pack_path1, pack_path2),
                                         (index_path, None),
                                         (len(pack_objs), None)):
            iwrite = None
            if ipath:
                ifile = open(ipath, 'wb')
                iwrite = ifile.write
            # END handle ipath

            # make sure we rewind the streams ... we work on the same objects over and over again
            if iteration > 0:
                rewind_streams()
            # END rewind streams
            iteration += 1

            with open(ppath, 'wb') as pfile:
                pack_sha, index_sha = PackEntity.write_pack(pack_objs, pfile.write, iwrite, object_count=num_obj)
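            # both returned digests are 20-byte binary shas: pack_sha is compared
            # against the written pack's checksum below, index_sha against the
            # index's; on the second iteration, which writes no index, index_sha
            # simply goes unused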
            assert os.path.getsize(ppath) > 100

            # verify pack
            pf = PackFile(ppath)
            assert pf.size() == len(pack_objs)
            assert pf.version() == PackFile.pack_version_default
            assert pf.checksum() == pack_sha
            pf.close()

            # verify index
            if ipath is not None:
                ifile.close()
                assert os.path.getsize(ipath) > 100
                idx = PackIndexFile(ipath)
                assert idx.version() == PackIndexFile.index_version_default
                assert idx.packfile_checksum() == pack_sha
                assert idx.indexfile_checksum() == index_sha
                assert idx.size() == len(pack_objs)
                idx.close()
            # END verify files exist
        # END for each packpath, indexpath pair

        # verify the packs thoroughly
        rewind_streams()
        entity = PackEntity.create(pack_objs, rw_dir)
        count = 0
        for info in entity.info_iter():
            count += 1
            for use_crc in range(2):
                assert entity.is_valid_stream(info.binsha, use_crc)
            # END for each crc mode
        # END for each info
        assert count == len(pack_objs)
        entity.close()

    def test_pack_64(self):
        # TODO: hex-edit a pack to help us verify that we can handle 64 bit offsets,
        # of course without really needing such a huge pack
        pytest.skip('not implemented')