diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2009-06-10 21:18:24 +0000 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2009-06-10 21:18:24 +0000 |
| commit | 45cec095b4904ba71425d2fe18c143982dd08f43 (patch) | |
| tree | af5e540fdcbf1cb2a3337157d69d4b40be010fa8 /test/aaa_profiling/test_memusage.py | |
| parent | 698a3c1ac665e7cd2ef8d5ad3ebf51b7fe6661f4 (diff) | |
| download | sqlalchemy-45cec095b4904ba71425d2fe18c143982dd08f43.tar.gz | |
- unit tests have been migrated from unittest to nose.
See README.unittests for information on how to run
the tests. [ticket:970]
Diffstat (limited to 'test/aaa_profiling/test_memusage.py')
| -rw-r--r-- | test/aaa_profiling/test_memusage.py | 402 |
1 files changed, 402 insertions, 0 deletions
diff --git a/test/aaa_profiling/test_memusage.py b/test/aaa_profiling/test_memusage.py new file mode 100644 index 000000000..70a3cf8cd --- /dev/null +++ b/test/aaa_profiling/test_memusage.py @@ -0,0 +1,402 @@ +from sqlalchemy.test.testing import eq_ +import gc +from sqlalchemy.orm import mapper, relation, create_session, clear_mappers, sessionmaker +from sqlalchemy.orm.mapper import _mapper_registry +from sqlalchemy.orm.session import _sessions +import operator +from sqlalchemy.test import testing +from sqlalchemy import MetaData, Integer, String, ForeignKey, PickleType +from sqlalchemy.test.schema import Table +from sqlalchemy.test.schema import Column +import sqlalchemy as sa +from sqlalchemy.sql import column +from test.orm import _base + + +class A(_base.ComparableEntity): + pass +class B(_base.ComparableEntity): + pass + +def profile_memory(func): + # run the test 50 times. if length of gc.get_objects() + # keeps growing, assert false + def profile(*args): + gc.collect() + samples = [0 for x in range(0, 50)] + for x in range(0, 50): + func(*args) + gc.collect() + samples[x] = len(gc.get_objects()) + print "sample gc sizes:", samples + + assert len(_sessions) == 0 + + for x in samples[-4:]: + if x != samples[-5]: + flatline = False + break + else: + flatline = True + + if not flatline and samples[-1] > samples[0]: # object count is bigger than when it started + for x in samples[1:-2]: + if x > samples[-1]: # see if a spike bigger than the endpoint exists + break + else: + assert False, repr(samples) + " " + repr(flatline) + + return profile + +def assert_no_mappers(): + clear_mappers() + gc.collect() + assert len(_mapper_registry) == 0 + +class EnsureZeroed(_base.ORMTest): + def setup(self): + _sessions.clear() + _mapper_registry.clear() + +class MemUsageTest(EnsureZeroed): + + # ensure a pure growing test trips the assertion + @testing.fails_if(lambda:True) + def test_fixture(self): + class Foo(object): + pass + + x = [] + @profile_memory + def go(): + x[-1:] = [Foo(), Foo(), Foo(), Foo(), Foo(), Foo()] + go() + + def test_session(self): + metadata = MetaData(testing.db) + + table1 = Table("mytable", metadata, + Column('col1', Integer, primary_key=True), + Column('col2', String(30))) + + table2 = Table("mytable2", metadata, + Column('col1', Integer, primary_key=True), + Column('col2', String(30)), + Column('col3', Integer, ForeignKey("mytable.col1"))) + + metadata.create_all() + + m1 = mapper(A, table1, properties={ + "bs":relation(B, cascade="all, delete", order_by=table2.c.col1)}, + order_by=table1.c.col1) + m2 = mapper(B, table2) + + m3 = mapper(A, table1, non_primary=True) + + @profile_memory + def go(): + sess = create_session() + a1 = A(col2="a1") + a2 = A(col2="a2") + a3 = A(col2="a3") + a1.bs.append(B(col2="b1")) + a1.bs.append(B(col2="b2")) + a3.bs.append(B(col2="b3")) + for x in [a1,a2,a3]: + sess.add(x) + sess.flush() + sess.expunge_all() + + alist = sess.query(A).all() + eq_( + [ + A(col2="a1", bs=[B(col2="b1"), B(col2="b2")]), + A(col2="a2", bs=[]), + A(col2="a3", bs=[B(col2="b3")]) + ], + alist) + + for a in alist: + sess.delete(a) + sess.flush() + go() + + metadata.drop_all() + del m1, m2, m3 + assert_no_mappers() + + def test_mapper_reset(self): + metadata = MetaData(testing.db) + + table1 = Table("mytable", metadata, + Column('col1', Integer, primary_key=True), + Column('col2', String(30))) + + table2 = Table("mytable2", metadata, + Column('col1', Integer, primary_key=True), + Column('col2', String(30)), + Column('col3', Integer, ForeignKey("mytable.col1"))) + + @profile_memory + def go(): + m1 = mapper(A, table1, properties={ + "bs":relation(B, order_by=table2.c.col1) + }) + m2 = mapper(B, table2) + + m3 = mapper(A, table1, non_primary=True) + + sess = create_session() + a1 = A(col2="a1") + a2 = A(col2="a2") + a3 = A(col2="a3") + a1.bs.append(B(col2="b1")) + a1.bs.append(B(col2="b2")) + a3.bs.append(B(col2="b3")) + for x in [a1,a2,a3]: + sess.add(x) + sess.flush() + sess.expunge_all() + + alist = sess.query(A).order_by(A.col1).all() + eq_( + [ + A(col2="a1", bs=[B(col2="b1"), B(col2="b2")]), + A(col2="a2", bs=[]), + A(col2="a3", bs=[B(col2="b3")]) + ], + alist) + + for a in alist: + sess.delete(a) + sess.flush() + sess.close() + clear_mappers() + + metadata.create_all() + try: + go() + finally: + metadata.drop_all() + assert_no_mappers() + + def test_with_inheritance(self): + metadata = MetaData(testing.db) + + table1 = Table("mytable", metadata, + Column('col1', Integer, primary_key=True), + Column('col2', String(30)) + ) + + table2 = Table("mytable2", metadata, + Column('col1', Integer, ForeignKey('mytable.col1'), + primary_key=True), + Column('col3', String(30)), + ) + + @profile_memory + def go(): + class A(_base.ComparableEntity): + pass + class B(A): + pass + + mapper(A, table1, + polymorphic_on=table1.c.col2, + polymorphic_identity='a') + mapper(B, table2, + inherits=A, + polymorphic_identity='b') + + sess = create_session() + a1 = A() + a2 = A() + b1 = B(col3='b1') + b2 = B(col3='b2') + for x in [a1,a2,b1, b2]: + sess.add(x) + sess.flush() + sess.expunge_all() + + alist = sess.query(A).order_by(A.col1).all() + eq_( + [ + A(), A(), B(col3='b1'), B(col3='b2') + ], + alist) + + for a in alist: + sess.delete(a) + sess.flush() + + # dont need to clear_mappers() + del B + del A + + metadata.create_all() + try: + go() + finally: + metadata.drop_all() + assert_no_mappers() + + def test_with_manytomany(self): + metadata = MetaData(testing.db) + + table1 = Table("mytable", metadata, + Column('col1', Integer, primary_key=True), + Column('col2', String(30)) + ) + + table2 = Table("mytable2", metadata, + Column('col1', Integer, primary_key=True), + Column('col2', String(30)), + ) + + table3 = Table('t1tot2', metadata, + Column('t1', Integer, ForeignKey('mytable.col1')), + Column('t2', Integer, ForeignKey('mytable2.col1')), + ) + + @profile_memory + def go(): + class A(_base.ComparableEntity): + pass + class B(_base.ComparableEntity): + pass + + mapper(A, table1, properties={ + 'bs':relation(B, secondary=table3, backref='as', order_by=table3.c.t1) + }) + mapper(B, table2) + + sess = create_session() + a1 = A(col2='a1') + a2 = A(col2='a2') + b1 = B(col2='b1') + b2 = B(col2='b2') + a1.bs.append(b1) + a2.bs.append(b2) + for x in [a1,a2]: + sess.add(x) + sess.flush() + sess.expunge_all() + + alist = sess.query(A).order_by(A.col1).all() + eq_( + [ + A(bs=[B(col2='b1')]), A(bs=[B(col2='b2')]) + ], + alist) + + for a in alist: + sess.delete(a) + sess.flush() + + # dont need to clear_mappers() + del B + del A + + metadata.create_all() + try: + go() + finally: + metadata.drop_all() + assert_no_mappers() + + def test_join_cache(self): + metadata = MetaData(testing.db) + + table1 = Table("table1", metadata, + Column('id', Integer, primary_key=True), + Column('data', String(30)) + ) + + table2 = Table("table2", metadata, + Column('id', Integer, primary_key=True), + Column('data', String(30)), + Column('t1id', Integer, ForeignKey('table1.id')) + ) + + class Foo(object): + pass + + class Bar(object): + pass + + mapper(Foo, table1, properties={ + 'bars':relation(mapper(Bar, table2)) + }) + metadata.create_all() + + session = sessionmaker() + + @profile_memory + def go(): + s = table2.select() + sess = session() + sess.query(Foo).join((s, Foo.bars)).all() + sess.rollback() + try: + go() + finally: + metadata.drop_all() + + + def test_mutable_identity(self): + metadata = MetaData(testing.db) + + table1 = Table("mytable", metadata, + Column('col1', Integer, primary_key=True), + Column('col2', PickleType(comparator=operator.eq)) + ) + + class Foo(object): + def __init__(self, col2): + self.col2 = col2 + + mapper(Foo, table1) + metadata.create_all() + + session = sessionmaker()() + + def go(): + obj = [ + Foo({'a':1}), + Foo({'b':1}), + Foo({'c':1}), + Foo({'d':1}), + Foo({'e':1}), + Foo({'f':1}), + Foo({'g':1}), + Foo({'h':1}), + Foo({'i':1}), + Foo({'j':1}), + Foo({'k':1}), + Foo({'l':1}), + ] + + session.add_all(obj) + session.commit() + + testing.eq_(len(session.identity_map._mutable_attrs), 12) + testing.eq_(len(session.identity_map), 12) + obj = None + gc.collect() + testing.eq_(len(session.identity_map._mutable_attrs), 0) + testing.eq_(len(session.identity_map), 0) + + try: + go() + finally: + metadata.drop_all() + + def test_type_compile(self): + from sqlalchemy.databases.sqlite import SQLiteDialect + cast = sa.cast(column('x'), sa.Integer) + @profile_memory + def go(): + dialect = SQLiteDialect() + cast.compile(dialect=dialect) + go() + |
