#!/usr/bin/env python # Copyright 2014 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import contextlib import json import logging import math import multiprocessing import os import tempfile import time import unittest from py_trace_event import trace_event from py_trace_event import trace_time from py_trace_event.trace_event_impl import log class TraceEventTests(unittest.TestCase): def setUp(self): tf = tempfile.NamedTemporaryFile(delete=False) self._log_path = tf.name tf.close() def tearDown(self): if os.path.exists(self._log_path): os.remove(self._log_path) @contextlib.contextmanager def _test_trace(self, disable=True): try: trace_event.trace_enable(self._log_path) yield finally: if disable: trace_event.trace_disable() def testNoImpl(self): orig_impl = trace_event.trace_event_impl try: trace_event.trace_event_impl = None self.assertFalse(trace_event.trace_can_enable()) finally: trace_event.trace_event_impl = orig_impl def testImpl(self): self.assertTrue(trace_event.trace_can_enable()) def testIsEnabledFalse(self): self.assertFalse(trace_event.trace_is_enabled()) def testIsEnabledTrue(self): with self._test_trace(): self.assertTrue(trace_event.trace_is_enabled()) def testEnable(self): with self._test_trace(): with open(self._log_path, 'r') as f: log_output = json.loads(f.read() + ']') self.assertEquals(len(log_output), 1) self.assertTrue(trace_event.trace_is_enabled()) log_output = log_output.pop() self.assertEquals(log_output['category'], 'process_argv') self.assertEquals(log_output['name'], 'process_argv') self.assertTrue(log_output['args']['argv']) self.assertEquals(log_output['ph'], 'M') def testDoubleEnable(self): try: with self._test_trace(): with self._test_trace(): pass except log.TraceException: return assert False def testDisable(self): with self._test_trace(disable=False): with open(self._log_path, 'r') as f: self.assertTrue(trace_event.trace_is_enabled()) trace_event.trace_disable() self.assertEquals(len(json.loads(f.read() + ']')), 1) self.assertFalse(trace_event.trace_is_enabled()) def testDoubleDisable(self): with self._test_trace(): pass trace_event.trace_disable() def testFlushChanges(self): with self._test_trace(): with open(self._log_path, 'r') as f: trace_event.clock_sync('1') self.assertEquals(len(json.loads(f.read() + ']')), 1) f.seek(0) trace_event.trace_flush() self.assertEquals(len(json.loads(f.read() + ']')), 2) def testFlushNoChanges(self): with self._test_trace(): with open(self._log_path, 'r') as f: self.assertEquals(len(json.loads(f.read() + ']')),1) f.seek(0) trace_event.trace_flush() self.assertEquals(len(json.loads(f.read() + ']')), 1) def testDoubleFlush(self): with self._test_trace(): with open(self._log_path, 'r') as f: trace_event.clock_sync('1') self.assertEquals(len(json.loads(f.read() + ']')), 1) f.seek(0) trace_event.trace_flush() trace_event.trace_flush() self.assertEquals(len(json.loads(f.read() + ']')), 2) def testTraceBegin(self): with self._test_trace(): with open(self._log_path, 'r') as f: trace_event.trace_begin('test_event', this='that') trace_event.trace_flush() log_output = json.loads(f.read() + ']') self.assertEquals(len(log_output), 2) current_entry = log_output.pop(0) self.assertEquals(current_entry['category'], 'process_argv') self.assertEquals(current_entry['name'], 'process_argv') self.assertTrue( current_entry['args']['argv']) self.assertEquals( current_entry['ph'], 'M') current_entry = log_output.pop(0) self.assertEquals(current_entry['category'], 'python') self.assertEquals(current_entry['name'], 'test_event') self.assertEquals(current_entry['args']['this'], '\'that\'') self.assertEquals(current_entry['ph'], 'B') def testTraceEnd(self): with self._test_trace(): with open(self._log_path, 'r') as f: trace_event.trace_end('test_event') trace_event.trace_flush() log_output = json.loads(f.read() + ']') self.assertEquals(len(log_output), 2) current_entry = log_output.pop(0) self.assertEquals(current_entry['category'], 'process_argv') self.assertEquals(current_entry['name'], 'process_argv') self.assertTrue(current_entry['args']['argv']) self.assertEquals(current_entry['ph'], 'M') current_entry = log_output.pop(0) self.assertEquals(current_entry['category'], 'python') self.assertEquals(current_entry['name'], 'test_event') self.assertEquals(current_entry['args'], {}) self.assertEquals(current_entry['ph'], 'E') def testTrace(self): with self._test_trace(): with trace_event.trace('test_event', this='that'): pass trace_event.trace_flush() with open(self._log_path, 'r') as f: log_output = json.loads(f.read() + ']') self.assertEquals(len(log_output), 3) current_entry = log_output.pop(0) self.assertEquals(current_entry['category'], 'process_argv') self.assertEquals(current_entry['name'], 'process_argv') self.assertTrue(current_entry['args']['argv']) self.assertEquals(current_entry['ph'], 'M') current_entry = log_output.pop(0) self.assertEquals(current_entry['category'], 'python') self.assertEquals(current_entry['name'], 'test_event') self.assertEquals(current_entry['args']['this'], '\'that\'') self.assertEquals(current_entry['ph'], 'B') current_entry = log_output.pop(0) self.assertEquals(current_entry['category'], 'python') self.assertEquals(current_entry['name'], 'test_event') self.assertEquals(current_entry['args'], {}) self.assertEquals(current_entry['ph'], 'E') def testTracedDecorator(self): @trace_event.traced("this") def test_decorator(this="that"): pass with self._test_trace(): test_decorator() trace_event.trace_flush() with open(self._log_path, 'r') as f: log_output = json.loads(f.read() + ']') self.assertEquals(len(log_output), 3) current_entry = log_output.pop(0) self.assertEquals(current_entry['category'], 'process_argv') self.assertEquals(current_entry['name'], 'process_argv') self.assertTrue(current_entry['args']['argv']) self.assertEquals(current_entry['ph'], 'M') current_entry = log_output.pop(0) self.assertEquals(current_entry['category'], 'python') self.assertEquals(current_entry['name'], '__main__.test_decorator') self.assertEquals(current_entry['args']['this'], '\'that\'') self.assertEquals(current_entry['ph'], 'B') current_entry = log_output.pop(0) self.assertEquals(current_entry['category'], 'python') self.assertEquals(current_entry['name'], '__main__.test_decorator') self.assertEquals(current_entry['args'], {}) self.assertEquals(current_entry['ph'], 'E') def testClockSyncWithTs(self): with self._test_trace(): with open(self._log_path, 'r') as f: trace_event.clock_sync('id', issue_ts=trace_time.Now()) trace_event.trace_flush() log_output = json.loads(f.read() + ']') self.assertEquals(len(log_output), 2) current_entry = log_output.pop(0) self.assertEquals(current_entry['category'], 'process_argv') self.assertEquals(current_entry['name'], 'process_argv') self.assertTrue(current_entry['args']['argv']) self.assertEquals(current_entry['ph'], 'M') current_entry = log_output.pop(0) self.assertEquals(current_entry['category'], 'python') self.assertEquals(current_entry['name'], 'clock_sync') self.assertTrue(current_entry['args']['issue_ts']) self.assertEquals(current_entry['ph'], 'c') def testClockSyncWithoutTs(self): with self._test_trace(): with open(self._log_path, 'r') as f: trace_event.clock_sync('id') trace_event.trace_flush() log_output = json.loads(f.read() + ']') self.assertEquals(len(log_output), 2) current_entry = log_output.pop(0) self.assertEquals(current_entry['category'], 'process_argv') self.assertEquals(current_entry['name'], 'process_argv') self.assertTrue(current_entry['args']['argv']) self.assertEquals(current_entry['ph'], 'M') current_entry = log_output.pop(0) self.assertEquals(current_entry['category'], 'python') self.assertEquals(current_entry['name'], 'clock_sync') self.assertFalse(current_entry['args'].get('issue_ts')) self.assertEquals(current_entry['ph'], 'c') def testTime(self): actual_diff = [] def func1(): trace_begin("func1") start = time.time() time.sleep(0.25) end = time.time() actual_diff.append(end-start) # Pass via array because of Python scoping trace_end("func1") with self._test_trace(): start_ts = time.time() trace_event.trace_begin('test') end_ts = time.time() trace_event.trace_end('test') trace_event.trace_flush() with open(self._log_path, 'r') as f: log_output = json.loads(f.read() + ']') self.assertEquals(len(log_output), 3) meta_data = log_output[0] open_data = log_output[1] close_data = log_output[2] self.assertEquals(meta_data['category'], 'process_argv') self.assertEquals(meta_data['name'], 'process_argv') self.assertTrue(meta_data['args']['argv']) self.assertEquals(meta_data['ph'], 'M') self.assertEquals(open_data['category'], 'python') self.assertEquals(open_data['name'], 'test') self.assertEquals(open_data['ph'], 'B') self.assertEquals(close_data['category'], 'python') self.assertEquals(close_data['name'], 'test') self.assertEquals(close_data['ph'], 'E') event_time_diff = close_data['ts'] - open_data['ts'] recorded_time_diff = (end_ts - start_ts) * 1000000 self.assertLess(math.fabs(event_time_diff - recorded_time_diff), 1000) def testNestedCalls(self): with self._test_trace(): trace_event.trace_begin('one') trace_event.trace_begin('two') trace_event.trace_end('two') trace_event.trace_end('one') trace_event.trace_flush() with open(self._log_path, 'r') as f: log_output = json.loads(f.read() + ']') self.assertEquals(len(log_output), 5) meta_data = log_output[0] one_open = log_output[1] two_open = log_output[2] two_close = log_output[3] one_close = log_output[4] self.assertEquals(meta_data['category'], 'process_argv') self.assertEquals(meta_data['name'], 'process_argv') self.assertTrue(meta_data['args']['argv']) self.assertEquals(meta_data['ph'], 'M') self.assertEquals(one_open['category'], 'python') self.assertEquals(one_open['name'], 'one') self.assertEquals(one_open['ph'], 'B') self.assertEquals(one_close['category'], 'python') self.assertEquals(one_close['name'], 'one') self.assertEquals(one_close['ph'], 'E') self.assertEquals(two_open['category'], 'python') self.assertEquals(two_open['name'], 'two') self.assertEquals(two_open['ph'], 'B') self.assertEquals(two_close['category'], 'python') self.assertEquals(two_close['name'], 'two') self.assertEquals(two_close['ph'], 'E') self.assertLessEqual(one_open['ts'], two_open['ts']) self.assertGreaterEqual(one_close['ts'], two_close['ts']) def testInterleavedCalls(self): with self._test_trace(): trace_event.trace_begin('one') trace_event.trace_begin('two') trace_event.trace_end('one') trace_event.trace_end('two') trace_event.trace_flush() with open(self._log_path, 'r') as f: log_output = json.loads(f.read() + ']') self.assertEquals(len(log_output), 5) meta_data = log_output[0] one_open = log_output[1] two_open = log_output[2] two_close = log_output[4] one_close = log_output[3] self.assertEquals(meta_data['category'], 'process_argv') self.assertEquals(meta_data['name'], 'process_argv') self.assertTrue(meta_data['args']['argv']) self.assertEquals(meta_data['ph'], 'M') self.assertEquals(one_open['category'], 'python') self.assertEquals(one_open['name'], 'one') self.assertEquals(one_open['ph'], 'B') self.assertEquals(one_close['category'], 'python') self.assertEquals(one_close['name'], 'one') self.assertEquals(one_close['ph'], 'E') self.assertEquals(two_open['category'], 'python') self.assertEquals(two_open['name'], 'two') self.assertEquals(two_open['ph'], 'B') self.assertEquals(two_close['category'], 'python') self.assertEquals(two_close['name'], 'two') self.assertEquals(two_close['ph'], 'E') self.assertLessEqual(one_open['ts'], two_open['ts']) self.assertLessEqual(one_close['ts'], two_close['ts']) def testMultiprocess(self): def child_function(): with trace_event.trace('child_event'): pass with self._test_trace(): trace_event.trace_begin('parent_event') trace_event.trace_flush() p = multiprocessing.Process(target=child_function) p.start() self.assertTrue(hasattr(p, "_shimmed_by_trace_event")) p.join() trace_event.trace_end('parent_event') trace_event.trace_flush() with open(self._log_path, 'r') as f: log_output = json.loads(f.read() + ']') self.assertEquals(len(log_output), 5) meta_data = log_output[0] parent_open = log_output[1] child_open = log_output[2] child_close = log_output[3] parent_close = log_output[4] self.assertEquals(meta_data['category'], 'process_argv') self.assertEquals(meta_data['name'], 'process_argv') self.assertTrue(meta_data['args']['argv']) self.assertEquals(meta_data['ph'], 'M') self.assertEquals(parent_open['category'], 'python') self.assertEquals(parent_open['name'], 'parent_event') self.assertEquals(parent_open['ph'], 'B') self.assertEquals(child_open['category'], 'python') self.assertEquals(child_open['name'], 'child_event') self.assertEquals(child_open['ph'], 'B') self.assertEquals(child_close['category'], 'python') self.assertEquals(child_close['name'], 'child_event') self.assertEquals(child_close['ph'], 'E') self.assertEquals(parent_close['category'], 'python') self.assertEquals(parent_close['name'], 'parent_event') self.assertEquals(parent_close['ph'], 'E') def testMultiprocessExceptionInChild(self): def bad_child(): trace_event.trace_disable() with self._test_trace(): p = multiprocessing.Pool(1) trace_event.trace_begin('parent') self.assertRaises(Exception, lambda: p.apply(bad_child, ())) p.close() p.terminate() p.join() trace_event.trace_end('parent') trace_event.trace_flush() with open(self._log_path, 'r') as f: log_output = json.loads(f.read() + ']') self.assertEquals(len(log_output), 3) meta_data = log_output[0] parent_open = log_output[1] parent_close = log_output[2] self.assertEquals(parent_open['category'], 'python') self.assertEquals(parent_open['name'], 'parent') self.assertEquals(parent_open['ph'], 'B') self.assertEquals(parent_close['category'], 'python') self.assertEquals(parent_close['name'], 'parent') self.assertEquals(parent_close['ph'], 'E') if __name__ == '__main__': logging.getLogger().setLevel(logging.DEBUG) unittest.main(verbosity=2)