| author | Sebastian Thiel <byronimo@gmail.com> | 2011-03-31 18:01:05 +0200 |
|---|---|---|
| committer | Sebastian Thiel <byronimo@gmail.com> | 2011-03-31 18:01:05 +0200 |
| commit | e83210d99aaac5768827c448909fa04d63776e64 (patch) | |
| tree | 3c074e0526b5db8510899463f3eb390a18ea42b6 /gitdb | |
| parent | 810d1e38315c6e886c1daef93670840b213ee78a (diff) | |
| download | gitdb-e83210d99aaac5768827c448909fa04d63776e64.tar.gz | |
initial version of pack writing, which seems to work, but still needs some more testing and verification
Diffstat (limited to 'gitdb')
| -rw-r--r-- | gitdb/exc.py | 3 |
| -rw-r--r-- | gitdb/fun.py | 20 |
| -rw-r--r-- | gitdb/pack.py | 196 |
| -rw-r--r-- | gitdb/stream.py | 18 |
| -rw-r--r-- | gitdb/test/test_pack.py | 51 |
5 files changed, 266 insertions, 22 deletions
diff --git a/gitdb/exc.py b/gitdb/exc.py
index 96fa874..e087047 100644
--- a/gitdb/exc.py
+++ b/gitdb/exc.py
@@ -17,6 +17,9 @@ class BadObject(ODBError):
     def __str__(self):
         return "BadObject: %s" % to_hex_sha(self.args[0])
 
+
+class ParseError(ODBError):
+    """Thrown if the parsing of a file failed due to an invalid format"""
 
 class AmbiguousObjectName(ODBError):
     """Thrown if a possibly shortened name does not uniquely represent a single object
diff --git a/gitdb/fun.py b/gitdb/fun.py
index 34978cd..5bbe8ef 100644
--- a/gitdb/fun.py
+++ b/gitdb/fun.py
@@ -48,7 +48,7 @@ chunk_size = 1000*mmap.PAGESIZE
 __all__ = ('is_loose_object', 'loose_object_header_info', 'msb_size', 'pack_object_header_info',
             'write_object', 'loose_object_header', 'stream_copy', 'apply_delta_data',
-            'is_equal_canonical_sha', 'connect_deltas', 'DeltaChunkList')
+            'is_equal_canonical_sha', 'connect_deltas', 'DeltaChunkList', 'create_pack_object_header')
 
 #{ Structures
@@ -412,6 +412,24 @@ def pack_object_header_info(data):
         s += 7
     # END character loop
     return (type_id, size, i)
 
+
+def create_pack_object_header(obj_type, obj_size):
+    """:return: string defining the pack header comprised of the object type
+    and its uncompressed size in bytes
+    :param obj_type: pack type_id of the object
+    :param obj_size: uncompressed size in bytes of the following object stream"""
+    c = 0           # 1 byte
+    hdr = str()     # output string
+
+    c = (obj_type << 4) | (obj_size & 0xf)
+    obj_size >>= 4
+    while obj_size:
+        hdr += chr(c | 0x80)
+        c = obj_size & 0x7f
+        obj_size >>= 7
+    #END until size is consumed
+    hdr += chr(c)
+    return hdr
 
 def msb_size(data, offset=0):
     """
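The header produced by create_pack_object_header is git's variable-length encoding: the first byte carries the 3-bit type and the low 4 bits of the size, and a set high bit means another byte follows, each carrying 7 more size bits. A quick round-trip sketch, not part of the commit, using only the two functions shown above:

    from gitdb.fun import create_pack_object_header, pack_object_header_info

    type_id = 3         # pack type_id of a blob
    size = 300          # uncompressed object size
    hdr = create_pack_object_header(type_id, size)

    # 300 = 0b100101100: the low 4 bits (0xc) share the first byte with the
    # type, the remaining bits (18) go into a single continuation byte
    assert [ord(c) for c in hdr] == [0x80 | (type_id << 4) | 0xc, 18]

    # pack_object_header_info parses the same format back out of raw pack data,
    # returning the type, the size, and the number of header bytes consumed
    assert pack_object_header_info(hdr) == (type_id, size, len(hdr))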
diff --git a/gitdb/pack.py b/gitdb/pack.py
index 335fe3c..6c32949 100644
--- a/gitdb/pack.py
+++ b/gitdb/pack.py
@@ -5,7 +5,8 @@
 """Contains PackIndexFile and PackFile implementations"""
 from gitdb.exc import (
     BadObject,
-    UnsupportedOperation
+    UnsupportedOperation,
+    ParseError
 )
 from util import (
     zlib,
@@ -15,6 +16,7 @@ from util import (
 )
 from fun import (
+    create_pack_object_header,
     pack_object_header_info,
     is_equal_canonical_sha,
     type_id_to_type_map,
@@ -47,6 +49,7 @@ from stream import (
     DeltaApplyReader,
     Sha1Writer,
     NullStream,
+    FlexibleSha1Writer
 )
 
 from struct import (
     pack,
     unpack,
 )
 
+from binascii import crc32
+
 from itertools import izip
 import array
 import os
@@ -119,10 +124,113 @@ def pack_object_at(data, offset, as_stream):
         return abs_data_offset, ODeltaPackInfo(offset, type_id, uncomp_size, delta_info)
     # END handle info
 # END handle stream
-
+
+def write_stream_to_pack(read, write, zstream, want_crc=False):
+    """Copy a stream as read from the read function, zip it, and write the result.
+    Count the number of bytes read and written and return them
+    :param want_crc: if True, the crc will be generated over the compressed data
+    :return: tuple(no bytes read, no bytes written, crc32) crc might be 0 if want_crc
+        was false"""
+    br = 0      # bytes read
+    bw = 0      # bytes written
+    crc = 0
+
+    while True:
+        chunk = read(chunk_size)
+        br += len(chunk)
+        compressed = zstream.compress(chunk)
+        bw += len(compressed)
+        write(compressed)       # cannot assume a return value
+
+        if want_crc:
+            crc = crc32(compressed, crc)
+        #END handle crc
+
+        if len(chunk) != chunk_size:
+            break
+    #END copy loop
+
+    compressed = zstream.flush()
+    bw += len(compressed)
+    write(compressed)
+    if want_crc:
+        crc = crc32(compressed, crc)
+    #END handle crc
+
+    return (br, bw, crc)
+
 
 #} END utilities
 
+class IndexWriter(object):
+    """Utility to cache index information, allowing to write all information later
+    in one go to the given stream
+    :note: currently only writes v2 indices"""
+    __slots__ = '_objs'
+
+    def __init__(self):
+        self._objs = list()
+
+    def append(self, binsha, crc, offset):
+        """Append one piece of object information"""
+        self._objs.append((binsha, crc, offset))
+
+    def write(self, pack_binsha, write):
+        """Write the index file using the given write method
+        :param pack_binsha: sha over the whole pack that we index"""
+        # sort by sha1 hash
+        self._objs.sort(key=lambda o: o[0])
+
+        sha_writer = FlexibleSha1Writer(write)
+        sha_write = sha_writer.write
+        sha_write(PackIndexFile.index_v2_signature)
+        sha_write(pack(">L", PackIndexFile.index_version_default))
+
+        # fanout
+        tmplist = list((0,)*256)    # fanout, or list with 64 bit offsets later on
+        for t in self._objs:
+            tmplist[ord(t[0][0])] += 1
+        #END prepare fanout
+        for i in xrange(255):
+            v = tmplist[i]
+            sha_write(pack('>L', v))
+            tmplist[i+1] += v       # make the counts cumulative
+        #END write each fanout entry
+        sha_write(pack('>L', tmplist[255]))
+
+        # sha1 ordered
+        # join all shas first to save individual write calls
+        sha_write(''.join(t[0] for t in self._objs))
+
+        # crc32
+        for t in self._objs:
+            sha_write(pack('>L', t[1]&0xffffffff))
+        #END for each crc
+
+        tmplist = list()
+        # offset 32
+        for t in self._objs:
+            ofs = t[2]
+            if ofs > 0x7fffffff:
+                tmplist.append(ofs)
+                ofs = 0x80000000 + len(tmplist)-1
+            #END handle 64 bit offsets
+            sha_write(pack('>L', ofs&0xffffffff))
+        #END for each offset
+
+        # offset 64
+        for ofs in tmplist:
+            sha_write(pack(">Q", ofs))
+        #END for each offset
+
+        # trailer
+        assert(len(pack_binsha) == 20)
+        sha_write(pack_binsha)
+        write(sha_writer.sha(as_hex=False))
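The fanout table that IndexWriter emits is 256 cumulative counts: entry N is the number of objects whose first sha byte is less than or equal to N, which lets a reader narrow its binary search to one slice of the sha list. A minimal standalone sketch of that computation, with hypothetical first bytes that are not taken from the commit:

    # fanout sketch: count objects per first sha byte, then accumulate
    first_bytes = [0x03, 0x03, 0x7f, 0xe2]  # first byte of each sorted binsha

    fanout = [0] * 256
    for b in first_bytes:
        fanout[b] += 1                      # bucket counts per first byte
    for i in range(255):
        fanout[i + 1] += fanout[i]          # turn counts into cumulative counts

    assert fanout[0x02] == 0                # nothing sorts below 0x03
    assert fanout[0x03] == 2
    assert fanout[0xff] == len(first_bytes) # last entry equals the object count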
@@ -135,6 +243,8 @@ class PackIndexFile(LazyMixin):
 
     # used in v2 indices
     _sha_list_offset = 8 + 1024
+    index_v2_signature = '\377tOc'
+    index_version_default = 2
 
     def __init__(self, indexpath):
         super(PackIndexFile, self).__init__()
@@ -155,7 +265,7 @@ class PackIndexFile(LazyMixin):
         # to access the fanout table or related properties
 
         # CHECK VERSION
-        self._version = (self._data[:4] == '\377tOc' and 2) or 1
+        self._version = (self._data[:4] == self.index_v2_signature and 2) or 1
         if self._version == 2:
             version_id = unpack_from(">L", self._data, 4)[0]
             assert version_id == self._version, "Unsupported index version: %i" % version_id
@@ -383,6 +493,8 @@ class PackFile(LazyMixin):
     case"""
 
     __slots__ = ('_packpath', '_data', '_size', '_version')
+    pack_signature = 0x5041434b     # 'PACK'
+    pack_version_default = 2
     # offset into our data at which the first object starts
     first_object_offset = 3*4       # header bytes
@@ -396,15 +508,19 @@ class PackFile(LazyMixin):
             self._data = file_contents_ro_filepath(self._packpath)
 
             # read the header information
-            type_id, self._version, self._size = unpack_from(">4sLL", self._data, 0)
+            type_id, self._version, self._size = unpack_from(">LLL", self._data, 0)
 
             # TODO: figure out whether we should better keep the lock, or maybe
             # add a .keep file instead ?
         else: # must be '_size' or '_version'
             # read header info - we do that just with a file stream
-            type_id, self._version, self._size = unpack(">4sLL", open(self._packpath).read(12))
+            type_id, self._version, self._size = unpack(">LLL", open(self._packpath).read(12))
         # END handle header
+
+        if type_id != self.pack_signature:
+            raise ParseError("Invalid pack signature: %i" % type_id)
+        #END assert type id
 
     def _iter_objects(self, start_offset, as_stream=True):
         """Handle the actual iteration of objects within this pack"""
         data = self._data
@@ -759,7 +875,8 @@ class PackEntity(LazyMixin):
 
     @classmethod
-    def create(cls, object_iter, pack_write, index_write=None):
+    def write_pack(cls, object_iter, pack_write, index_write=None,
+                   object_count=None, zlib_compression=zlib.Z_BEST_SPEED):
         """
         Create a new pack by putting all objects obtained by the object_iterator
         into a pack which is written using the pack_write method.
@@ -769,9 +886,74 @@ class PackEntity(LazyMixin):
         :param pack_write: function to receive strings to write into the pack stream
         :param index_write: if not None, the function writes the index file corresponding
             to the pack.
+        :param object_count: if you can provide the number of objects in your iteration,
+            this would be the place to put it. Otherwise we have to pre-iterate and store
+            all items into a list to get the number, which uses more memory than necessary.
+        :param zlib_compression: the zlib compression level to use
+        :return: binary sha over all the contents of the pack
         :note: The destination of the write functions is up to the user. It could
-            be a socket, or a file for instance"""
+            be a socket, or a file for instance
+        :note: writes only undeltified objects"""
+        objs = object_iter
+        if not object_count:
+            if not isinstance(object_iter, (tuple, list)):
+                objs = list(object_iter)
+            #END handle list type
+            object_count = len(objs)
+        #END handle object count
+
+        pack_writer = FlexibleSha1Writer(pack_write)
+        pwrite = pack_writer.write
+        ofs = 0                     # current offset into the pack file
+        index = None
+        wants_index = index_write is not None
+
+        # write header
+        pwrite(pack('>LLL', PackFile.pack_signature, PackFile.pack_version_default, object_count))
+        ofs += 12
+
+        if wants_index:
+            index = IndexWriter()
+        #END handle index header
+
+        actual_count = 0
+        for obj in objs:
+            actual_count += 1
+
+            # object header
+            hdr = create_pack_object_header(obj.type_id, obj.size)
+            pwrite(hdr)
+
+            # data stream
+            zstream = zlib.compressobj(zlib_compression)
+            ostream = obj.stream
+            br, bw, crc = write_stream_to_pack(ostream.read, pwrite, zstream, want_crc=wants_index)
+            assert(br == obj.size)
+            if wants_index:
+                index.append(obj.binsha, crc, ofs)
+            #END handle index
+
+            ofs += len(hdr) + bw
+            if actual_count == object_count:
+                break
+            #END abort once we are done
+        #END for each object
+
+        if actual_count != object_count:
+            raise ValueError("Expected to write %i objects into pack, but received only %i from iterators" % (object_count, actual_count))
+        #END count assertion
+
+        # write footer
+        binsha = pack_writer.sha(as_hex=False)
+        assert len(binsha) == 20
+        pack_write(binsha)
+        ofs += len(binsha)          # just for completeness ;)
+
+        if wants_index:
+            index.write(binsha, index_write)
+        #END handle index
+        return binsha
 
 #} END interface
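A hedged usage sketch for the renamed classmethod. The file names are made up, and it assumes the matching index file sits next to the existing pack; the object streams come from an entity's stream_iter(), as in the updated test further below:

    from gitdb.pack import PackEntity

    entity = PackEntity("existing.pack")    # assumes existing.idx alongside
    objs = list(entity.stream_iter())       # undeltified object streams

    pack_file = open("new.pack", "wb")
    index_file = open("new.idx", "wb")
    binsha = PackEntity.write_pack(objs, pack_file.write, index_file.write,
                                   object_count=len(objs))
    pack_file.close()
    index_file.close()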
diff --git a/gitdb/stream.py b/gitdb/stream.py
index 6c3b8d3..8010a05 100644
--- a/gitdb/stream.py
+++ b/gitdb/stream.py
@@ -33,7 +33,9 @@ try:
 except ImportError:
     pass
 
-__all__ = ('DecompressMemMapReader', 'FDCompressedSha1Writer', 'DeltaApplyReader')
+__all__ = ('DecompressMemMapReader', 'FDCompressedSha1Writer', 'DeltaApplyReader',
+           'Sha1Writer', 'FlexibleSha1Writer', 'ZippedStoreShaWriter',
+           'FDStream', 'NullStream')
 
 #{ RO Streams
@@ -557,6 +559,20 @@ class Sha1Writer(object):
 
     #} END interface
 
+
+class FlexibleSha1Writer(Sha1Writer):
+    """Writer producing a sha1 while passing on the written bytes to the given
+    write function"""
+    __slots__ = 'writer'
+
+    def __init__(self, writer):
+        Sha1Writer.__init__(self)
+        self.writer = writer
+
+    def write(self, data):
+        Sha1Writer.write(self, data)
+        self.writer(data)
+
 
 class ZippedStoreShaWriter(Sha1Writer):
     """Remembers everything someone writes to it and generates a sha"""
     __slots__ = ('buf', 'zip')
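FlexibleSha1Writer simply tees: every chunk is hashed by the Sha1Writer base class and forwarded to the callable it was constructed with. A small illustrative sketch, not part of the commit:

    from cStringIO import StringIO
    from gitdb.stream import FlexibleSha1Writer

    buf = StringIO()
    writer = FlexibleSha1Writer(buf.write)      # tee into an in-memory buffer
    writer.write("hello ")
    writer.write("pack")

    assert buf.getvalue() == "hello pack"
    assert len(writer.sha(as_hex=False)) == 20  # binary sha over all written bytes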
diff --git a/gitdb/test/test_pack.py b/gitdb/test/test_pack.py
index 8e98808..c4d8df1 100644
--- a/gitdb/test/test_pack.py
+++ b/gitdb/test/test_pack.py
@@ -140,7 +140,7 @@ class TestPack(TestBase):
 
     @with_rw_directory
     def test_pack_entity(self, rw_dir):
-        pack_iterators = list();
+        pack_objs = list()
         for packinfo, indexinfo in ((self.packfile_v2_1, self.packindexfile_v1),
                                     (self.packfile_v2_2, self.packindexfile_v2),
                                     (self.packfile_v2_3_ascii, self.packindexfile_v2_3_ascii)):
@@ -149,7 +149,7 @@ class TestPack(TestBase):
             entity = PackEntity(packfile)
             assert entity.pack().path() == packfile
             assert entity.index().path() == indexfile
-            pack_iterators.append(entity.stream_iter())
+            pack_objs.extend(entity.stream_iter())
 
             count = 0
             for info, stream in izip(entity.info_iter(), entity.stream_iter()):
@@ -182,24 +182,49 @@ class TestPack(TestBase):
             assert count == size
         # END for each entity
-
+
         # pack writing - write all packs into one
         # index path can be None
         pack_path = tempfile.mktemp('', "pack", rw_dir)
         index_path = tempfile.mktemp('', 'index', rw_dir)
-        for pp, ip in ((pack_path, )*2, (index_path, None)):
-            pfile = open(pp, 'wb')
-            ifile = None
-            if ip:
-                ifile = open(ip, 'wb')
+        iteration = 0
+        for ppath, ipath, num_obj in zip((pack_path, )*2, (index_path, None), (len(pack_objs), None)):
+            pfile = open(ppath, 'wb')
+            ifile = None
+            iwrite = None
+            if ipath:
+                ifile = open(ipath, 'wb')
+                iwrite = ifile.write
             #END handle ip
-            PackEntity.create(chain(*pack_iterators), pfile, ifile)
-            assert os.path.getsize(pp) > 100
-            if ip is not None:
-                assert os.path.getsize(ip) > 100
+
+            # make sure we rewind the streams ... we work on the same objects over and over again
+            if iteration > 0:
+                for obj in pack_objs:
+                    obj.stream.seek(0)
+                #END rewind streams
+            iteration += 1
+
+            binsha = PackEntity.write_pack(pack_objs, pfile.write, iwrite, object_count=num_obj)
+            pfile.close()
+            assert os.path.getsize(ppath) > 100
+
+            # verify pack
+            pf = PackFile(ppath)
+            assert pf.size() == len(pack_objs)
+            assert pf.version() == PackFile.pack_version_default
+            assert pf.checksum() == binsha
+
+            # verify index
+            if ipath is not None:
+                assert os.path.getsize(ipath) > 100
+            #END verify files exist
+
+            if ifile:
+                ifile.close()
+            #END handle index
-        #END for each packpath, indexpath pair
+        #END for each packpath, indexpath pair
+
+        # def test_pack_64(self):
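The commit message flags that verification is still pending; the core copy loop can at least be smoke-tested in isolation. A hedged round-trip sketch for the new write_stream_to_pack helper, with a made-up payload and in-memory buffers:

    import zlib
    from binascii import crc32
    from cStringIO import StringIO
    from gitdb.pack import write_stream_to_pack

    data = "x" * 10000                      # arbitrary payload
    out = StringIO()
    zstream = zlib.compressobj(zlib.Z_BEST_SPEED)
    br, bw, crc = write_stream_to_pack(StringIO(data).read, out.write, zstream,
                                       want_crc=True)

    assert br == len(data) and bw == len(out.getvalue())
    assert zlib.decompress(out.getvalue()) == data
    assert crc == crc32(out.getvalue())     # crc is taken over the compressed bytes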
