From 2c594da2148bf15bcb8e10fc9616bbacc70b61a3 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Mon, 14 Aug 2017 12:04:26 -0400 Subject: Isolate memory tests in forks Swing the biggest hammer, run multiprocessing.Process() for each memusage test individually so that they are fully isolated from the parent process and any side effects of pytest-xdist Also add --nomemory as a shortcut to exclude_tags=memory-intensive and add this to the setup.py test runner as the memory tests should not be running for quick runs Change-Id: I3c16c781e21b33deb939a64e77a6e0e41fb86922 --- test/aaa_profiling/test_memusage.py | 111 +++++++++++++++++++++++++++--------- 1 file changed, 83 insertions(+), 28 deletions(-) (limited to 'test') diff --git a/test/aaa_profiling/test_memusage.py b/test/aaa_profiling/test_memusage.py index dfb4a4955..3181cfe61 100644 --- a/test/aaa_profiling/test_memusage.py +++ b/test/aaa_profiling/test_memusage.py @@ -21,6 +21,7 @@ from sqlalchemy import util import weakref import itertools +import multiprocessing class A(fixtures.ComparableEntity): pass @@ -52,7 +53,12 @@ def profile_memory(maxtimes=250, else: return gc.get_objects() - def profile(*args): + def profile(queue, func_args): + # give testing.db a brand new pool and don't + # touch the existing pool, since closing a socket + # in the subprocess can affect the parent + testing.db.pool = testing.db.pool.recreate() + gc_collect() samples = [] max_ = 0 @@ -63,29 +69,27 @@ def profile_memory(maxtimes=250, if until_maxtimes >= maxtimes // 5: break for x in range(5): - func(*args) + func(*func_args) gc_collect() samples.append( get_num_objects() if get_num_objects is not None else len(get_objects_skipping_sqlite_issue()) ) - # note: this prints lots of text, and when using pytest-xdist, - # actually interferes with memory itself from just sending - # the stdout between processes :). - # need to figure out a "condiional print" that doesn't send - # any stdout when we have pytest-xdist happening - # print("sample gc sizes:", samples) - if assert_no_sessions: assert len(_sessions) == 0 + # queue.put(('samples', samples)) + latest_max = max(samples[-5:]) if latest_max > max_: - print( - "Max grew from %s to %s, max has " - "grown for %s samples" % ( - max_, latest_max, max_grew_for + queue.put( + ( + 'status', + "Max grew from %s to %s, max has " + "grown for %s samples" % ( + max_, latest_max, max_grew_for + ) ) ) max_ = latest_max @@ -93,22 +97,57 @@ def profile_memory(maxtimes=250, until_maxtimes += 1 continue else: - print("Max remained at %s, %s more attempts left" % - (max_, max_grew_for)) + queue.put( + ( + 'status', + "Max remained at %s, %s more attempts left" % + (max_, max_grew_for) + ) + ) max_grew_for -= 1 if max_grew_for == 0: success = True break if not success: - assert False, \ - "Ran for a total of %d times, memory kept growing: %r" % ( - maxtimes, - samples + queue.put( + ( + 'result', + False, + "Ran for a total of %d times, memory kept " + "growing: %r" % ( + maxtimes, + samples + ) ) + ) + + else: + queue.put( + ('result', True, 'success') + ) + + def run_in_process(*func_args): + queue = multiprocessing.Queue() + proc = multiprocessing.Process( + target=profile, args=(queue, func_args)) + proc.start() + while True: + row = queue.get() + typ = row[0] + if typ == 'samples': + print("sample gc sizes:", row[1]) + elif typ == 'status': + print(row[1]) + elif typ == 'result': + break + else: + assert False, "can't parse row" + proc.join() + assert row[1], row[2] + + return run_in_process - assert success - return profile return decorate @@ -190,6 +229,19 @@ class MemUsageTest(EnsureZeroed): assert not eng.dialect._type_memos + @testing.fails() + def test_fixture_failure(self): + class Foo(object): + pass + stuff = [] + + @profile_memory(maxtimes=20) + def go(): + stuff.extend( + Foo() for i in range(100) + ) + go() + class MemUsageWBackendTest(EnsureZeroed): @@ -416,19 +468,22 @@ class MemUsageWBackendTest(EnsureZeroed): target_strings = session.connection().\ dialect.identifier_preparer._strings - with session.transaction: - @profile_memory( - assert_no_sessions=False, - get_num_objects=lambda: len(target_strings)) - def go(): + session.close() + + @profile_memory( + assert_no_sessions=False, + get_num_objects=lambda: len(target_strings) + ) + def go(): + session = Session(testing.db) + with session.transaction: sc = SomeClass() session.add(sc) - with session.begin_nested(): session.query(SomeClass).first() - go() + go() @testing.crashes('mysql+cymysql', 'blocking') def test_unicode_warnings(self): -- cgit v1.2.1