author		Sebastian Thiel <byronimo@gmail.com>	2011-03-31 18:01:05 +0200
committer	Sebastian Thiel <byronimo@gmail.com>	2011-03-31 18:01:05 +0200
commit		e83210d99aaac5768827c448909fa04d63776e64 (patch)
tree		3c074e0526b5db8510899463f3eb390a18ea42b6 /gitdb
parent		810d1e38315c6e886c1daef93670840b213ee78a (diff)
download	gitdb-e83210d99aaac5768827c448909fa04d63776e64.tar.gz
initial version of pack writing, which seems to work, but still needs some more testing and verification
Diffstat (limited to 'gitdb')
-rw-r--r--	gitdb/exc.py	3
-rw-r--r--	gitdb/fun.py	20
-rw-r--r--	gitdb/pack.py	196
-rw-r--r--	gitdb/stream.py	18
-rw-r--r--	gitdb/test/test_pack.py	51
5 files changed, 266 insertions(+), 22 deletions(-)
diff --git a/gitdb/exc.py b/gitdb/exc.py
index 96fa874..e087047 100644
--- a/gitdb/exc.py
+++ b/gitdb/exc.py
@@ -17,6 +17,9 @@ class BadObject(ODBError):
def __str__(self):
return "BadObject: %s" % to_hex_sha(self.args[0])
+
+class ParseError(ODBError):
+ """Thrown if the parsing of a file failed due to an invalid format"""
class AmbiguousObjectName(ODBError):
"""Thrown if a possibly shortened name does not uniquely represent a single object
diff --git a/gitdb/fun.py b/gitdb/fun.py
index 34978cd..5bbe8ef 100644
--- a/gitdb/fun.py
+++ b/gitdb/fun.py
@@ -48,7 +48,7 @@ chunk_size = 1000*mmap.PAGESIZE
__all__ = ('is_loose_object', 'loose_object_header_info', 'msb_size', 'pack_object_header_info',
'write_object', 'loose_object_header', 'stream_copy', 'apply_delta_data',
- 'is_equal_canonical_sha', 'connect_deltas', 'DeltaChunkList')
+ 'is_equal_canonical_sha', 'connect_deltas', 'DeltaChunkList', 'create_pack_object_header')
#{ Structures
@@ -412,6 +412,24 @@ def pack_object_header_info(data):
s += 7
# END character loop
return (type_id, size, i)
+
+def create_pack_object_header(obj_type, obj_size):
+ """:return: string defining the pack header comprised of the object type
+ and its incompressed size in bytes
+ :parmam obj_type: pack type_id of the object
+ :param obj_size: uncompressed size in bytes of the following object stream"""
+ c = 0 # 1 byte
+ hdr = str() # output string
+
+ c = (obj_type << 4) | (obj_size & 0xf)
+ obj_size >>= 4
+ while obj_size:
+ hdr += chr(c | 0x80)
+ c = obj_size & 0x7f
+ obj_size >>= 7
+ #END until size is consumed
+ hdr += chr(c)
+ return hdr
def msb_size(data, offset=0):
"""
diff --git a/gitdb/pack.py b/gitdb/pack.py
index 335fe3c..6c32949 100644
--- a/gitdb/pack.py
+++ b/gitdb/pack.py
@@ -5,7 +5,8 @@
"""Contains PackIndexFile and PackFile implementations"""
from gitdb.exc import (
BadObject,
- UnsupportedOperation
+ UnsupportedOperation,
+ ParseError
)
from util import (
zlib,
@@ -15,6 +16,7 @@ from util import (
)
from fun import (
+ create_pack_object_header,
pack_object_header_info,
is_equal_canonical_sha,
type_id_to_type_map,
@@ -47,6 +49,7 @@ from stream import (
DeltaApplyReader,
Sha1Writer,
NullStream,
+ FlexibleSha1Writer
)
from struct import (
@@ -54,6 +57,8 @@ from struct import (
unpack,
)
+from binascii import crc32
+
from itertools import izip
import array
import os
@@ -119,10 +124,113 @@ def pack_object_at(data, offset, as_stream):
return abs_data_offset, ODeltaPackInfo(offset, type_id, uncomp_size, delta_info)
# END handle info
# END handle stream
-
+
+def write_stream_to_pack(read, write, zstream, want_crc=False):
+ """Copy a stream as read from the given read function, compress it with the
+ given zlib compressor, and write the result using the given write function,
+ counting the number of bytes read and written.
+ :param want_crc: if True, a crc32 will be generated over the compressed data.
+ :return: tuple(number of bytes read, number of bytes written, crc32). The crc
+ will be 0 if want_crc was False"""
+ br = 0 # bytes read
+ bw = 0 # bytes written
+ crc = 0
+
+ while True:
+ chunk = read(chunk_size)
+ br += len(chunk)
+ compressed = zstream.compress(chunk)
+ bw += len(compressed)
+ write(compressed) # we cannot assume the write function returns the number of bytes written
+
+ if want_crc:
+ crc = crc32(compressed, crc)
+ #END handle crc
+
+ if len(chunk) != chunk_size:
+ break
+ #END copy loop
+
+ compressed = zstream.flush()
+ bw += len(compressed)
+ write(compressed)
+ if want_crc:
+ crc = crc32(compressed, crc)
+ #END handle crc
+
+ return (br, bw, crc)
+
+
#} END utilities
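
The helper can be exercised in isolation. A small sketch, not part of the commit, assuming write_stream_to_pack is importable from gitdb.pack:

    from gitdb.pack import write_stream_to_pack
    from cStringIO import StringIO
    import zlib

    src = StringIO('x' * 5000)
    dst = StringIO()
    zstream = zlib.compressobj(zlib.Z_BEST_SPEED)
    br, bw, crc = write_stream_to_pack(src.read, dst.write, zstream, want_crc=True)
    assert br == 5000
    assert bw == len(dst.getvalue())
    # the written bytes form a complete zlib stream
    assert zlib.decompress(dst.getvalue()) == 'x' * 5000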
+class IndexWriter(object):
+ """Utility to cache index information, allowing all of it to be written
+ to the given stream later in one go
+ :note: currently only writes v2 indices"""
+ __slots__ = '_objs'
+
+ def __init__(self):
+ self._objs = list()
+
+ def append(self, binsha, crc, offset):
+ """Append one piece of object information"""
+ self._objs.append((binsha, crc, offset))
+
+ def write(self, pack_binsha, write):
+ """Write the index file using the given write method
+ :param pack_binsha: sha over the whole pack that we index"""
+ # sort for sha1 hash
+ self._objs.sort(key=lambda o: o[0])
+
+ sha_writer = FlexibleSha1Writer(write)
+ sha_write = sha_writer.write
+ sha_write(PackIndexFile.index_v2_signature)
+ sha_write(pack(">L", PackIndexFile.index_version_default))
+
+ # fanout
+ tmplist = list((0,)*256) # used as fanout table first, reused for 64 bit offsets later
+ for t in self._objs:
+ tmplist[ord(t[0][0])] += 1
+ #END prepare fanout
+
+ for i in xrange(255):
+ v = tmplist[i]
+ sha_write(pack('>L', v))
+ tmplist[i+1] += v
+ #END write each fanout entry
+ sha_write(pack('>L', tmplist[255]))
+
+ # sha1 ordered
+ # save write calls: join all shas into one string, pushing the loop into C
+ sha_write(''.join(t[0] for t in self._objs))
+
+ # crc32
+ for t in self._objs:
+ sha_write(pack('>L', t[1]&0xffffffff))
+ #END for each crc
+
+ tmplist = list()
+ # offset 32
+ for t in self._objs:
+ ofs = t[2]
+ if ofs > 0x7fffffff:
+ tmplist.append(ofs)
+ ofs = 0x80000000 + len(tmplist)-1
+ #END handle 64 bit offsets
+ sha_write(pack('>L', ofs&0xffffffff))
+ #END for each offset
+
+ # offset 64
+ for ofs in tmplist:
+ sha_write(pack(">Q", ofs))
+ #END for each offset
+
+ # trailer
+ assert(len(pack_binsha) == 20)
+ sha_write(pack_binsha)
+ write(sha_writer.sha(as_hex=False))
+
+
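
The v2 layout produced by IndexWriter.write is: signature and version, a 256-entry fanout table of cumulative object counts keyed on the first sha byte, the sha1-sorted 20-byte binary shas, one crc32 per object, 32-bit offsets (MSB set means the value indexes the following 64-bit offset table), and a trailer of the pack sha followed by the sha over the index itself. A quick sketch of the writer in isolation, not part of the commit, with made-up shas and offsets:

    from gitdb.pack import IndexWriter, PackIndexFile
    from cStringIO import StringIO

    index = IndexWriter()
    index.append('\x01' * 20, 0xdeadbeef, 12)   # binsha, crc32, offset into pack
    out = StringIO()
    index.write('\xee' * 20, out.write)         # sha of the (hypothetical) pack
    assert out.getvalue()[:4] == PackIndexFile.index_v2_signature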
class PackIndexFile(LazyMixin):
"""A pack index provides offsets into the corresponding pack, allowing to find
@@ -135,6 +243,8 @@ class PackIndexFile(LazyMixin):
# used in v2 indices
_sha_list_offset = 8 + 1024
+ index_v2_signature = '\377tOc'
+ index_version_default = 2
def __init__(self, indexpath):
super(PackIndexFile, self).__init__()
@@ -155,7 +265,7 @@ class PackIndexFile(LazyMixin):
# to access the fanout table or related properties
# CHECK VERSION
- self._version = (self._data[:4] == '\377tOc' and 2) or 1
+ self._version = (self._data[:4] == self.index_v2_signature and 2) or 1
if self._version == 2:
version_id = unpack_from(">L", self._data, 4)[0]
assert version_id == self._version, "Unsupported index version: %i" % version_id
@@ -383,6 +493,8 @@ class PackFile(LazyMixin):
case"""
__slots__ = ('_packpath', '_data', '_size', '_version')
+ pack_signature = 0x5041434b # 'PACK'
+ pack_version_default = 2
# offset into our data at which the first object starts
first_object_offset = 3*4 # header bytes
@@ -396,15 +508,19 @@ class PackFile(LazyMixin):
self._data = file_contents_ro_filepath(self._packpath)
# read the header information
- type_id, self._version, self._size = unpack_from(">4sLL", self._data, 0)
+ type_id, self._version, self._size = unpack_from(">LLL", self._data, 0)
# TODO: figure out whether we should better keep the lock, or maybe
# add a .keep file instead ?
else: # must be '_size' or '_version'
# read header info - we do that just with a file stream
- type_id, self._version, self._size = unpack(">4sLL", open(self._packpath).read(12))
+ type_id, self._version, self._size = unpack(">LLL", open(self._packpath).read(12))
# END handle header
+ if type_id != self.pack_signature:
+ raise ParseError("Invalid pack signature: %i" % type_id)
+ #END assert type id
+
def _iter_objects(self, start_offset, as_stream=True):
"""Handle the actual iteration of objects within this pack"""
data = self._data
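
Since the header is now unpacked with '>LLL', the new pack_signature constant must equal the big-endian integer value of the four ASCII bytes 'PACK'. A stdlib one-liner verifies this:

    from struct import unpack
    assert unpack('>L', 'PACK')[0] == 0x5041434b   # 'P'=0x50, 'A'=0x41, 'C'=0x43, 'K'=0x4b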
@@ -759,7 +875,8 @@ class PackEntity(LazyMixin):
@classmethod
- def create(cls, object_iter, pack_write, index_write=None):
+ def write_pack(cls, object_iter, pack_write, index_write=None,
+ object_count = None, zlib_compression = zlib.Z_BEST_SPEED):
"""
Create a new pack by putting all objects obtained by the object_iterator
into a pack which is written using the pack_write method.
@@ -769,9 +886,74 @@ class PackEntity(LazyMixin):
:param pack_write: function to receive strings to write into the pack stream
:param index_write: if not None, the function writes the index file corresponding
to the pack.
+ :param object_count: if you can provide the number of objects in your iteration,
+ this would be the place to put it. Otherwise we have to pre-iterate and store
+ all items in a list to obtain the count, which uses more memory than necessary.
+ :param zlib_compression: the zlib compression level to use
+ :return: binary sha over all the contents of the pack
:note: The destination of the write functions is up to the user. It could
- be a socket, or a file for instance"""
+ be a socket, or a file for instance
+ :note: writes only undeltified objects"""
+ objs = object_iter
+ if not object_count:
+ if not isinstance(object_iter, (tuple, list)):
+ objs = list(object_iter)
+ #END handle list type
+ object_count = len(objs)
+ #END handle object
+
+ pack_writer = FlexibleSha1Writer(pack_write)
+ pwrite = pack_writer.write
+ ofs = 0 # current offset into the pack file
+ index = None
+ wants_index = index_write is not None
+
+ # write header
+ pwrite(pack('>LLL', PackFile.pack_signature, PackFile.pack_version_default, object_count))
+ ofs += 12
+
+ if wants_index:
+ index = IndexWriter()
+ #END handle index header
+
+ actual_count = 0
+ for obj in objs:
+ actual_count += 1
+
+ # object header
+ hdr = create_pack_object_header(obj.type_id, obj.size)
+ pwrite(hdr)
+
+ # data stream
+ zstream = zlib.compressobj(zlib_compression)
+ ostream = obj.stream
+ br, bw, crc = write_stream_to_pack(ostream.read, pwrite, zstream, want_crc = wants_index)
+ assert(br == obj.size)
+ if wants_index:
+ index.append(obj.binsha, crc, ofs)
+ #END handle index
+
+ ofs += len(hdr) + bw
+ if actual_count == object_count:
+ break
+ #END abort once we are done
+ #END for each object
+
+ if actual_count != object_count:
+ raise ValueError("Expected to write %i objects into pack, but received only %i from iterators" % (object_count, actual_count))
+ #END count assertion
+
+ # write footer
+ binsha = pack_writer.sha(as_hex = False)
+ assert len(binsha) == 20
+ pack_write(binsha)
+ ofs += len(binsha) # just for completeness ;)
+
+ if wants_index:
+ index.write(binsha, index_write)
+ #END handle index
+ return binsha
#} END interface
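
A minimal usage sketch for the renamed classmethod, not part of the commit; the source pack and target paths are hypothetical:

    from gitdb.pack import PackEntity

    entity = PackEntity('/path/to/existing.pack')   # hypothetical source pack
    objs = list(entity.stream_iter())
    pack_file = open('/tmp/new.pack', 'wb')
    index_file = open('/tmp/new.idx', 'wb')
    binsha = PackEntity.write_pack(objs, pack_file.write, index_file.write,
                                   object_count=len(objs))
    pack_file.close()
    index_file.close()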
diff --git a/gitdb/stream.py b/gitdb/stream.py
index 6c3b8d3..8010a05 100644
--- a/gitdb/stream.py
+++ b/gitdb/stream.py
@@ -33,7 +33,9 @@ try:
except ImportError:
pass
-__all__ = ('DecompressMemMapReader', 'FDCompressedSha1Writer', 'DeltaApplyReader')
+__all__ = ( 'DecompressMemMapReader', 'FDCompressedSha1Writer', 'DeltaApplyReader',
+			'Sha1Writer', 'FlexibleSha1Writer', 'ZippedStoreShaWriter',
+			'FDStream', 'NullStream')
#{ RO Streams
@@ -557,6 +559,20 @@ class Sha1Writer(object):
#} END interface
+class FlexibleSha1Writer(Sha1Writer):
+ """Writer producing a sha1 while passing on the written bytes to the given
+ write function"""
+ __slots__ = 'writer'
+
+ def __init__(self, writer):
+ Sha1Writer.__init__(self)
+ self.writer = writer
+
+ def write(self, data):
+ Sha1Writer.write(self, data)
+ self.writer(data)
+
+
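
FlexibleSha1Writer simply tees: each chunk updates the running sha1 and is then forwarded to the wrapped write callable. A tiny sketch, not part of the commit, assuming Sha1Writer.sha(as_hex=True) yields the hex digest:

    import hashlib
    from cStringIO import StringIO
    from gitdb.stream import FlexibleSha1Writer

    buf = StringIO()
    writer = FlexibleSha1Writer(buf.write)
    writer.write('hello')
    assert buf.getvalue() == 'hello'
    assert writer.sha(as_hex=True) == hashlib.sha1('hello').hexdigest()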
class ZippedStoreShaWriter(Sha1Writer):
"""Remembers everything someone writes to it and generates a sha"""
__slots__ = ('buf', 'zip')
diff --git a/gitdb/test/test_pack.py b/gitdb/test/test_pack.py
index 8e98808..c4d8df1 100644
--- a/gitdb/test/test_pack.py
+++ b/gitdb/test/test_pack.py
@@ -140,7 +140,7 @@ class TestPack(TestBase):
@with_rw_directory
def test_pack_entity(self, rw_dir):
- pack_iterators = list();
+ pack_objs = list()
for packinfo, indexinfo in ( (self.packfile_v2_1, self.packindexfile_v1),
(self.packfile_v2_2, self.packindexfile_v2),
(self.packfile_v2_3_ascii, self.packindexfile_v2_3_ascii)):
@@ -149,7 +149,7 @@ class TestPack(TestBase):
entity = PackEntity(packfile)
assert entity.pack().path() == packfile
assert entity.index().path() == indexfile
- pack_iterators.append(entity.stream_iter())
+ pack_objs.extend(entity.stream_iter())
count = 0
for info, stream in izip(entity.info_iter(), entity.stream_iter()):
@@ -182,24 +182,49 @@ class TestPack(TestBase):
assert count == size
# END for each entity
-
+
# pack writing - write all packs into one
# index path can be None
pack_path = tempfile.mktemp('', "pack", rw_dir)
index_path = tempfile.mktemp('', 'index', rw_dir)
- for pp, ip in ((pack_path, )*2, (index_path, None)):
- pfile = open(pp, 'wb')
- ifile = None
- if ip:
- ifile = open(ip, 'wb')
+ iteration = 0
+ for ppath, ipath, num_obj in zip((pack_path, )*2, (index_path, None), (len(pack_objs), None)):
+ pfile = open(ppath, 'wb')
+ ifile = None
+ iwrite = None
+ if ipath:
+ ifile = open(ipath, 'wb')
+ iwrite = ifile.write
#END handle ip
- PackEntity.create(chain(*pack_iterators), pfile, ifile)
- assert os.path.getsize(pp) > 100
- if ip is not None:
- assert os.path.getsize(ip) > 100
+ # make sure we rewind the streams ... we work on the same objects over and over again
+ if iteration > 0:
+ for obj in pack_objs:
+ obj.stream.seek(0)
+ #END rewind streams
+ iteration += 1
+
+ binsha = PackEntity.write_pack(pack_objs, pfile.write, iwrite, object_count=num_obj)
+ pfile.close()
+ assert os.path.getsize(ppath) > 100
+
+ # verify pack
+ pf = PackFile(ppath)
+ assert pf.size() == len(pack_objs)
+ assert pf.version() == PackFile.pack_version_default
+ assert pf.checksum() == binsha
+
+ # verify index
+ if ipath is not None:
+ assert os.path.getsize(ipath) > 100
+
#END verify files exist
- #END for each packpath, indexpath pair
+
+ if ifile:
+ ifile.close()
+ #END handle index
+ #END for each packpath, indexpath pair
+
+ #
def test_pack_64(self):