summaryrefslogtreecommitdiff
path: root/test/testlib
diff options
context:
space:
mode:
authorJason Kirtland <jek@discorporate.us>2008-04-03 17:08:08 +0000
committerJason Kirtland <jek@discorporate.us>2008-04-03 17:08:08 +0000
commitd78f39d0057bbc648a9af31d7bd3ead2895ee178 (patch)
tree62877dffe613060d8963438f29480693dee9e9bd /test/testlib
parentca1ad4cbb9af8a45da550ba07c476f8cac17cd7a (diff)
downloadsqlalchemy-d78f39d0057bbc648a9af31d7bd3ead2895ee178.tar.gz
- Experimental: prefer cProfile over hotspot for 2.5+
- The latest skirmish in the battle against zoomark and sanity: 3rd party code is factored out in the function call count canary tests
Diffstat (limited to 'test/testlib')
-rw-r--r--test/testlib/compat.py16
-rw-r--r--test/testlib/engines.py93
-rw-r--r--test/testlib/profiling.py91
3 files changed, 166 insertions, 34 deletions
diff --git a/test/testlib/compat.py b/test/testlib/compat.py
index 4f2006afd..ba12b78ac 100644
--- a/test/testlib/compat.py
+++ b/test/testlib/compat.py
@@ -1,6 +1,6 @@
import itertools, new, sys, warnings
-__all__ = 'set', 'frozenset', 'sorted', '_function_named'
+__all__ = 'set', 'frozenset', 'sorted', '_function_named', 'deque'
try:
set = set
@@ -68,6 +68,20 @@ except NameError:
l.sort()
return l
+try:
+ from collections import deque
+except ImportError:
+ class deque(list):
+ def appendleft(self, x):
+ self.insert(0, x)
+ def popleft(self):
+ return self.pop(0)
+ def extendleft(self, iterable):
+ items = list(iterable)
+ items.reverse()
+ for x in items:
+ self.insert(0, x)
+
def _function_named(fn, newname):
try:
fn.__name__ = newname
diff --git a/test/testlib/engines.py b/test/testlib/engines.py
index 8cb321597..f5694df57 100644
--- a/test/testlib/engines.py
+++ b/test/testlib/engines.py
@@ -1,4 +1,4 @@
-import sys, weakref
+import sys, types, weakref
from testlib import config
from testlib.compat import *
@@ -131,3 +131,94 @@ def utf8_engine(url=None, options=None):
url = str(url)
return testing_engine(url, options)
+
+
+class ReplayableSession(object):
+ """A simple record/playback tool.
+
+ This is *not* a mock testing class. It only records a session for later
+ playback and makes no assertions on call consistency whatsoever. It's
+ unlikely to be suitable for anything other than DB-API recording.
+
+ """
+
+ Callable = object()
+ NoAttribute = object()
+ Natives = set([getattr(types, t)
+ for t in dir(types) if not t.startswith('_')]). \
+ difference([getattr(types, t)
+ for t in ('FunctionType', 'BuiltinFunctionType',
+ 'MethodType', 'BuiltinMethodType',
+ 'LambdaType', 'UnboundMethodType',)])
+ def __init__(self):
+ self.buffer = deque()
+
+ def recorder(self, base):
+ return self.Recorder(self.buffer, base)
+
+ def player(self):
+ return self.Player(self.buffer)
+
+ class Recorder(object):
+ def __init__(self, buffer, subject):
+ self._buffer = buffer
+ self._subject = subject
+
+ def __call__(self, *args, **kw):
+ subject, buffer = [object.__getattribute__(self, x)
+ for x in ('_subject', '_buffer')]
+
+ result = subject(*args, **kw)
+ if type(result) not in ReplayableSession.Natives:
+ buffer.append(ReplayableSession.Callable)
+ return type(self)(buffer, result)
+ else:
+ buffer.append(result)
+ return result
+
+ def __getattribute__(self, key):
+ try:
+ return object.__getattribute__(self, key)
+ except AttributeError:
+ pass
+
+ subject, buffer = [object.__getattribute__(self, x)
+ for x in ('_subject', '_buffer')]
+ try:
+ result = type(subject).__getattribute__(subject, key)
+ except AttributeError:
+ buffer.append(ReplayableSession.NoAttribute)
+ raise
+ else:
+ if type(result) not in ReplayableSession.Natives:
+ buffer.append(ReplayableSession.Callable)
+ return type(self)(buffer, result)
+ else:
+ buffer.append(result)
+ return result
+
+ class Player(object):
+ def __init__(self, buffer):
+ self._buffer = buffer
+
+ def __call__(self, *args, **kw):
+ buffer = object.__getattribute__(self, '_buffer')
+ result = buffer.popleft()
+ if result is ReplayableSession.Callable:
+ return self
+ else:
+ return result
+
+ def __getattribute__(self, key):
+ try:
+ return object.__getattribute__(self, key)
+ except AttributeError:
+ pass
+ buffer = object.__getattribute__(self, '_buffer')
+ result = buffer.popleft()
+ if result is ReplayableSession.Callable:
+ return self
+ elif result is ReplayableSession.NoAttribute:
+ raise AttributeError(key)
+ else:
+ return result
diff --git a/test/testlib/profiling.py b/test/testlib/profiling.py
index 54a96db47..edaeabdad 100644
--- a/test/testlib/profiling.py
+++ b/test/testlib/profiling.py
@@ -12,6 +12,7 @@ profile_config = { 'targets': set(),
'report': True,
'sort': ('time', 'calls'),
'limit': None }
+profiler = None
def profiled(target=None, **target_opts):
"""Optional function profiling.
@@ -42,19 +43,12 @@ def profiled(target=None, **target_opts):
not target_opts.get('always', None)):
return fn(*args, **kw)
- prof = hotshot.Profile(filename)
- began = time.time()
- prof.start()
- try:
- result = fn(*args, **kw)
- finally:
- prof.stop()
- ended = time.time()
- prof.close()
+ elapsed, load_stats, result = _profile(
+ filename, fn, *args, **kw)
if not testlib.config.options.quiet:
print "Profiled target '%s', wall time: %.2f seconds" % (
- target, ended - began)
+ target, elapsed)
report = target_opts.get('report', profile_config['report'])
if report and testlib.config.options.verbose:
@@ -63,20 +57,13 @@ def profiled(target=None, **target_opts):
print "Profile report for target '%s' (%s)" % (
target, filename)
- stats = hotshot.stats.load(filename)
+ stats = load_stats()
stats.sort_stats(*sort_)
if limit:
stats.print_stats(limit)
else:
stats.print_stats()
- assert_range = target_opts.get('call_range')
- if assert_range:
- if isinstance(assert_range, dict):
- assert_range = assert_range.get(testlib.config.db, 'default')
- stats = hotshot.stats.load(filename)
- assert stats.total_calls >= assert_range[0] and stats.total_calls <= assert_range[1], stats.total_calls
-
os.unlink(filename)
return result
return _function_named(profiled, fn.__name__)
@@ -118,23 +105,21 @@ def function_call_count(count=None, versions={}, variance=0.05):
if count is None:
return lambda fn: fn
- import hotshot, hotshot.stats
-
def decorator(fn):
def counted(*args, **kw):
try:
filename = "%s.prof" % fn.__name__
- prof = hotshot.Profile(filename)
- prof.start()
- try:
- result = fn(*args, **kw)
- finally:
- prof.stop()
- prof.close()
+ elapsed, stat_loader, result = _profile(
+ filename, fn, *args, **kw)
- stats = hotshot.stats.load(filename)
+ stats = stat_loader()
calls = stats.total_calls
+
+ if testlib.config.options.verbose:
+ stats.sort_stats('calls', 'cumulative')
+ stats.print_stats()
+
deviance = int(count * variance)
if (calls < (count - deviance) or
calls > (count + deviance)):
@@ -143,10 +128,6 @@ def function_call_count(count=None, versions={}, variance=0.05):
"of expected %s. (Python version %s)" % (
calls, (variance * 100), count, py_version))
- if testlib.config.options.verbose:
- stats.sort_stats('calls', 'cumulative')
- stats.print_stats()
-
return result
finally:
if os.path.exists(filename):
@@ -179,3 +160,49 @@ def conditional_call_count(discriminator, categories):
return rewrapped(*args, **kw)
return _function_named(at_runtime, fn.__name__)
return decorator
+
+
+def _profile(filename, fn, *args, **kw):
+ global profiler
+ if not profiler:
+ try:
+ import cProfile
+ profiler = 'cProfile'
+ except ImportError:
+ profiler = 'hotshot'
+
+ if profiler == 'cProfile':
+ return _profile_cProfile(filename, fn, *args, **kw)
+ else:
+ return _profile_hotshot(filename, fn, *args, **kw)
+
+def _profile_cProfile(filename, fn, *args, **kw):
+ import cProfile, gc, pstats, time
+
+ load_stats = lambda: pstats.Stats(filename)
+ gc.collect()
+
+ began = time.time()
+ cProfile.runctx('result = fn(*args, **kw)', globals(), locals(),
+ filename=filename)
+ ended = time.time()
+
+ return ended - began, load_stats, locals()['result']
+
+def _profile_hotshot(filename, fn, *args, **kw):
+ import gc, hotshot, hotshot.stats, time
+ load_stats = lambda: hotshot.stats.load(filename)
+
+ gc.collect()
+ prof = hotshot.Profile(filename)
+ began = time.time()
+ prof.start()
+ try:
+ result = fn(*args, **kw)
+ finally:
+ prof.stop()
+ ended = time.time()
+ prof.close()
+
+ return ended - began, load_stats, result
+