import os import pprint import unittest import tempfile import _hotshot import gc from test import test_support # Silence Py3k warning hotshot = test_support.import_module('hotshot', deprecated=True) from hotshot.log import ENTER, EXIT, LINE from hotshot import stats def shortfilename(fn): # We use a really shortened filename since an exact match is made, # and the source may be either a Python source file or a # pre-compiled bytecode file. if fn: return os.path.splitext(os.path.basename(fn))[0] else: return fn class UnlinkingLogReader(hotshot.log.LogReader): """Extend the LogReader so the log file is unlinked when we're done with it.""" def __init__(self, logfn): self.__logfn = logfn hotshot.log.LogReader.__init__(self, logfn) def next(self, index=None): try: return hotshot.log.LogReader.next(self) except StopIteration: self.close() os.unlink(self.__logfn) raise class HotShotTestCase(unittest.TestCase): def new_profiler(self, lineevents=0, linetimings=1): self.logfn = test_support.TESTFN return hotshot.Profile(self.logfn, lineevents, linetimings) def get_logreader(self): return UnlinkingLogReader(self.logfn) def get_events_wotime(self): L = [] for event in self.get_logreader(): what, (filename, lineno, funcname), tdelta = event L.append((what, (shortfilename(filename), lineno, funcname))) return L def check_events(self, expected): events = self.get_events_wotime() if events != expected: self.fail( "events did not match expectation; got:\n%s\nexpected:\n%s" % (pprint.pformat(events), pprint.pformat(expected))) def run_test(self, callable, events, profiler=None): if profiler is None: profiler = self.new_profiler() self.assertTrue(not profiler._prof.closed) profiler.runcall(callable) self.assertTrue(not profiler._prof.closed) profiler.close() self.assertTrue(profiler._prof.closed) self.check_events(events) def test_addinfo(self): def f(p): p.addinfo("test-key", "test-value") profiler = self.new_profiler() profiler.runcall(f, profiler) profiler.close() log = self.get_logreader() info = log._info list(log) self.assertTrue(info["test-key"] == ["test-value"]) def test_line_numbers(self): def f(): y = 2 x = 1 def g(): f() f_lineno = f.func_code.co_firstlineno g_lineno = g.func_code.co_firstlineno events = [(ENTER, ("test_hotshot", g_lineno, "g")), (LINE, ("test_hotshot", g_lineno+1, "g")), (ENTER, ("test_hotshot", f_lineno, "f")), (LINE, ("test_hotshot", f_lineno+1, "f")), (LINE, ("test_hotshot", f_lineno+2, "f")), (EXIT, ("test_hotshot", f_lineno, "f")), (EXIT, ("test_hotshot", g_lineno, "g")), ] self.run_test(g, events, self.new_profiler(lineevents=1)) def test_start_stop(self): # Make sure we don't return NULL in the start() and stop() # methods when there isn't an error. Bug in 2.2 noted by # Anthony Baxter. profiler = self.new_profiler() profiler.start() profiler.stop() profiler.close() os.unlink(self.logfn) def test_bad_sys_path(self): import sys import os orig_path = sys.path coverage = hotshot._hotshot.coverage try: # verify we require a list for sys.path sys.path = 'abc' self.assertRaises(RuntimeError, coverage, test_support.TESTFN) # verify that we require sys.path exists del sys.path self.assertRaises(RuntimeError, coverage, test_support.TESTFN) finally: sys.path = orig_path if os.path.exists(test_support.TESTFN): os.remove(test_support.TESTFN) def test_logreader_eof_error(self): emptyfile = tempfile.NamedTemporaryFile() try: self.assertRaises((IOError, EOFError), _hotshot.logreader, emptyfile.name) finally: emptyfile.close() gc.collect() def test_load_stats(self): def start(prof): prof.start() # Make sure stats can be loaded when start and stop of profiler # are not executed in the same stack frame. profiler = self.new_profiler() start(profiler) profiler.stop() profiler.close() stats.load(self.logfn) os.unlink(self.logfn) def test_large_info(self): p = self.new_profiler() self.assertRaises(ValueError, p.addinfo, "A", "A" * 0xfceb) def test_main(): test_support.run_unittest(HotShotTestCase) if __name__ == "__main__": test_main()