| author | Sebastian Thiel <byronimo@gmail.com> | 2014-11-14 12:45:19 +0100 |
|---|---|---|
| committer | Sebastian Thiel <byronimo@gmail.com> | 2014-11-14 12:45:19 +0100 |
| commit | 2f2fe4eea8ba4f47e63a7392a1f27f74f5ee925d | |
| tree | 176a493d114fab7cc6e930bf318b2339db386cf5 /gitdb/test/performance/test_stream.py | |
| parent | 81707c606b88e971cc359e3e9f3abeeea2204860 | |
| parent | 0dcec5a27b341ce58e5ab169f91aa25b2cafec0c | |
| download | gitdb-0.6.0.tar.gz | |
Merge branch 'py2n3' (tag: 0.6.0)
* python 3 compatibility
* all tests work in py2.6, 2.7, 3.3, 3.4
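
The substance of the port is the single-source print idiom: each `print >> sys.stderr, ...` statement in the test below becomes a `print(..., file=sys.stderr)` call, which `from __future__ import print_function` makes valid on Python 2.6+ as well. A minimal sketch of the pattern (the `log` helper is illustrative only, not part of the commit):

```python
# Single-source print idiom used throughout this merge.
# On Python 2.6/2.7 the __future__ import replaces the print
# statement with the py3 print() function; on 3.x it is a no-op.
from __future__ import print_function

import sys

def log(fmt, *args):
    # Hypothetical helper for illustration: writes progress output
    # to stderr identically under py2.6/2.7 and py3.3/3.4.
    print(fmt % args, file=sys.stderr)

log("Creating %s data ...", 'random ')
log("Done (in %f s)", 0.25)
```
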
Diffstat (limited to 'gitdb/test/performance/test_stream.py')
| -rw-r--r-- | gitdb/test/performance/test_stream.py | 126 |
1 file changed, 19 insertions(+), 107 deletions(-)
diff --git a/gitdb/test/performance/test_stream.py b/gitdb/test/performance/test_stream.py
index 010003d..84c9dea 100644
--- a/gitdb/test/performance/test_stream.py
+++ b/gitdb/test/performance/test_stream.py
@@ -3,35 +3,25 @@
 # This module is part of GitDB and is released under
 # the New BSD License: http://www.opensource.org/licenses/bsd-license.php
 """Performance data streaming performance"""
-from lib import TestBigRepoR
-from gitdb.db import *
-from gitdb.base import *
-from gitdb.stream import *
-from gitdb.util import (
-    pool,
-    bin_to_hex
-    )
-from gitdb.typ import str_blob_type
-from gitdb.fun import chunk_size
+from __future__ import print_function
+
+from gitdb.test.performance.lib import TestBigRepoR
+from gitdb.db import LooseObjectDB
+from gitdb import IStream
 
-from async import (
-    IteratorReader,
-    ChannelThreadTask,
-    )
+from gitdb.util import bin_to_hex
+from gitdb.fun import chunk_size
 
-from cStringIO import StringIO
 from time import time
 import os
 import sys
-import stat
-import subprocess
 
-from lib import (
-    TestBigRepoR,
+from gitdb.test.lib import (
     make_memory_file,
-    with_rw_directory
-    )
+    with_rw_directory,
+    skip_on_travis_ci
+)
 
 
 #{ Utilities
@@ -47,22 +37,14 @@ def read_chunked_stream(stream):
     return stream
 
 
-class TestStreamReader(ChannelThreadTask):
-    """Expects input streams and reads them in chunks. It will read one at a time,
-    requireing a queue chunk of size 1"""
-    def __init__(self, *args):
-        super(TestStreamReader, self).__init__(*args)
-        self.fun = read_chunked_stream
-        self.max_chunksize = 1
-
-
 #} END utilities
 
 class TestObjDBPerformance(TestBigRepoR):
 
     large_data_size_bytes = 1000*1000*50    # some MiB should do it
     moderate_data_size_bytes = 1000*1000*1  # just 1 MiB
-
+
+    @skip_on_travis_ci
     @with_rw_directory
     def test_large_data_streaming(self, path):
         ldb = LooseObjectDB(path)
@@ -71,11 +53,11 @@ class TestObjDBPerformance(TestBigRepoR):
         # serial mode
         for randomize in range(2):
             desc = (randomize and 'random ') or ''
-            print >> sys.stderr, "Creating %s data ..." % desc
+            print("Creating %s data ..." % desc, file=sys.stderr)
             st = time()
             size, stream = make_memory_file(self.large_data_size_bytes, randomize)
             elapsed = time() - st
-            print >> sys.stderr, "Done (in %f s)" % elapsed
+            print("Done (in %f s)" % elapsed, file=sys.stderr)
             string_ios.append(stream)
 
             # writing - due to the compression it will seem faster than it is
@@ -88,7 +70,7 @@ class TestObjDBPerformance(TestBigRepoR):
 
             size_kib = size / 1000
-            print >> sys.stderr, "Added %i KiB (filesize = %i KiB) of %s data to loose odb in %f s ( %f Write KiB / s)" % (size_kib, fsize_kib, desc, elapsed_add, size_kib / elapsed_add)
+            print("Added %i KiB (filesize = %i KiB) of %s data to loose odb in %f s ( %f Write KiB / s)" % (size_kib, fsize_kib, desc, elapsed_add, size_kib / elapsed_add), file=sys.stderr)
 
             # reading all at once
             st = time()
@@ -98,7 +80,7 @@ class TestObjDBPerformance(TestBigRepoR):
             stream.seek(0)
             assert shadata == stream.getvalue()
-            print >> sys.stderr, "Read %i KiB of %s data at once from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, elapsed_readall, size_kib / elapsed_readall)
+            print("Read %i KiB of %s data at once from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, elapsed_readall, size_kib / elapsed_readall), file=sys.stderr)
 
             # reading in chunks of 1 MiB
@@ -115,81 +97,11 @@ class TestObjDBPerformance(TestBigRepoR):
             elapsed_readchunks = time() - st
 
             stream.seek(0)
-            assert ''.join(chunks) == stream.getvalue()
+            assert b''.join(chunks) == stream.getvalue()
 
             cs_kib = cs / 1000
-            print >> sys.stderr, "Read %i KiB of %s data in %i KiB chunks from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, cs_kib, elapsed_readchunks, size_kib / elapsed_readchunks)
+            print("Read %i KiB of %s data in %i KiB chunks from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, cs_kib, elapsed_readchunks, size_kib / elapsed_readchunks), file=sys.stderr)
 
             # del db file so we keep something to do
             os.remove(db_file)
         # END for each randomization factor
-
-
-        # multi-threaded mode
-        # want two, should be supported by most of todays cpus
-        pool.set_size(2)
-        total_kib = 0
-        nsios = len(string_ios)
-        for stream in string_ios:
-            stream.seek(0)
-            total_kib += len(stream.getvalue()) / 1000
-        # END rewind
-
-        def istream_iter():
-            for stream in string_ios:
-                stream.seek(0)
-                yield IStream(str_blob_type, len(stream.getvalue()), stream)
-            # END for each stream
-        # END util
-
-        # write multiple objects at once, involving concurrent compression
-        reader = IteratorReader(istream_iter())
-        istream_reader = ldb.store_async(reader)
-        istream_reader.task().max_chunksize = 1
-
-        st = time()
-        istreams = istream_reader.read(nsios)
-        assert len(istreams) == nsios
-        elapsed = time() - st
-
-        print >> sys.stderr, "Threads(%i): Compressed %i KiB of data in loose odb in %f s ( %f Write KiB / s)" % (pool.size(), total_kib, elapsed, total_kib / elapsed)
-
-        # decompress multiple at once, by reading them
-        # chunk size is not important as the stream will not really be decompressed
-        # until its read
-        istream_reader = IteratorReader(iter([ i.binsha for i in istreams ]))
-        ostream_reader = ldb.stream_async(istream_reader)
-
-        chunk_task = TestStreamReader(ostream_reader, "chunker", None)
-        output_reader = pool.add_task(chunk_task)
-        output_reader.task().max_chunksize = 1
-
-        st = time()
-        assert len(output_reader.read(nsios)) == nsios
-        elapsed = time() - st
-
-        print >> sys.stderr, "Threads(%i): Decompressed %i KiB of data in loose odb in %f s ( %f Read KiB / s)" % (pool.size(), total_kib, elapsed, total_kib / elapsed)
-
-        # store the files, and read them back. For the reading, we use a task
-        # as well which is chunked into one item per task. Reading all will
-        # very quickly result in two threads handling two bytestreams of
-        # chained compression/decompression streams
-        reader = IteratorReader(istream_iter())
-        istream_reader = ldb.store_async(reader)
-        istream_reader.task().max_chunksize = 1
-
-        istream_to_sha = lambda items: [ i.binsha for i in items ]
-        istream_reader.set_post_cb(istream_to_sha)
-
-        ostream_reader = ldb.stream_async(istream_reader)
-
-        chunk_task = TestStreamReader(ostream_reader, "chunker", None)
-        output_reader = pool.add_task(chunk_task)
-        output_reader.max_chunksize = 1
-
-        st = time()
-        assert len(output_reader.read(nsios)) == nsios
-        elapsed = time() - st
-
-        print >> sys.stderr, "Threads(%i): Compressed and decompressed and read %i KiB of data in loose odb in %f s ( %f Combined KiB / s)" % (pool.size(), total_kib, elapsed, total_kib / elapsed)
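
The deletions above remove the entire multi-threaded half of the test: `store_async` and `stream_async` sat on top of the external `async` package (pool, `IteratorReader`, `ChannelThreadTask`), which was never ported to Python 3, so only the serial round-trip survives. For orientation, here is a rough sketch of that surviving round-trip outside the test harness, assuming gitdb 0.6's public API as used in the diff (`LooseObjectDB`, `IStream`, `str_blob_type`); the temp directory and chunk size are illustrative stand-ins for what `@with_rw_directory` and the test constants provide:

```python
from io import BytesIO
from tempfile import mkdtemp

from gitdb.db import LooseObjectDB
from gitdb import IStream
from gitdb.typ import str_blob_type  # the blob type tag, as in the removed imports

# Assumption: a throwaway directory stands in for the path that
# @with_rw_directory hands to the test.
ldb = LooseObjectDB(mkdtemp())

data = b"x" * (1000 * 1000)  # 1 MB payload, mirroring moderate_data_size_bytes
istream = IStream(str_blob_type, len(data), BytesIO(data))

# store() compresses the stream into the loose odb and fills in binsha.
ldb.store(istream)

# Read it back in chunks, as the surviving half of the test does.
ostream = ldb.stream(istream.binsha)
chunks = []
while True:
    chunk = ostream.read(1000 * 1000)  # 1 MB chunks, as in the test
    if not chunk:
        break
    chunks.append(chunk)
assert b"".join(chunks) == data
```

The `b"".join(chunks)` assertion mirrors the one-character fix in the diff: decompressed odb streams yield bytes on both interpreter families, so the chunks must be joined as bytes, not text.
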
