diff options
Diffstat (limited to 'test/engine/execute.py')
| -rw-r--r-- | test/engine/execute.py | 90 |
1 files changed, 86 insertions, 4 deletions
diff --git a/test/engine/execute.py b/test/engine/execute.py index 260a05e27..36a6bc317 100644 --- a/test/engine/execute.py +++ b/test/engine/execute.py @@ -1,8 +1,13 @@ import testenv; testenv.configure_for_tests() -from sqlalchemy import * -from sqlalchemy import exceptions -from testlib import * +import re +from sqlalchemy.interfaces import ConnectionProxy +from testlib.sa import MetaData, Table, Column, Integer, String, INT, \ + VARCHAR, func +import testlib.sa as tsa +from testlib import TestBase, testing, engines + +users, metadata = None, None class ExecuteTest(TestBase): def setUpAll(self): global users, metadata @@ -70,8 +75,85 @@ class ExecuteTest(TestBase): try: conn.execute("osdjafioajwoejoasfjdoifjowejfoawejqoijwef") assert False - except exceptions.DBAPIError: + except tsa.exc.DBAPIError: assert True +class ProxyConnectionTest(TestBase): + def test_proxy(self): + + stmts = [] + cursor_stmts = [] + + class MyProxy(ConnectionProxy): + def execute(self, conn, execute, clauseelement, *multiparams, **params): + stmts.append( + (str(clauseelement), params,multiparams) + ) + return execute(clauseelement, *multiparams, **params) + + def cursor_execute(self, execute, cursor, statement, parameters, context, executemany): + cursor_stmts.append( + (statement, parameters, None) + ) + return execute(cursor, statement, parameters, context) + + def assert_stmts(expected, received): + for stmt, params, posn in expected: + if not received: + assert False + while received: + teststmt, testparams, testmultiparams = received.pop(0) + teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ', teststmt).strip() + if teststmt.startswith(stmt) and (testparams==params or testparams==posn): + break + + for engine in ( + engines.testing_engine(options=dict(proxy=MyProxy())), + engines.testing_engine(options=dict(proxy=MyProxy(), strategy='threadlocal')) + ): + m = MetaData(engine) + + t1 = Table('t1', m, Column('c1', Integer, primary_key=True), Column('c2', String(50), default=func.lower('Foo'), primary_key=True)) + + m.create_all() + try: + t1.insert().execute(c1=5, c2='some data') + t1.insert().execute(c1=6) + assert engine.execute("select * from t1").fetchall() == [(5, 'some data'), (6, 'foo')] + finally: + m.drop_all() + + engine.dispose() + + compiled = [ + ("CREATE TABLE t1", {}, None), + ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, None), + ("INSERT INTO t1 (c1, c2)", {'c1': 6}, None), + ("select * from t1", {}, None), + ("DROP TABLE t1", {}, None) + ] + + if engine.dialect.preexecute_pk_sequences: + cursor = [ + ("CREATE TABLE t1", {}, None), + ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, [5, 'some data']), + ("SELECT lower", {'lower_2':'Foo'}, ['Foo']), + ("INSERT INTO t1 (c1, c2)", {'c2': 'foo', 'c1': 6}, [6, 'foo']), + ("select * from t1", {}, None), + ("DROP TABLE t1", {}, None) + ] + else: + cursor = [ + ("CREATE TABLE t1", {}, None), + ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, [5, 'some data']), + ("INSERT INTO t1 (c1, c2)", {'c1': 6, "lower_2":"Foo"}, [6, "Foo"]), # bind param name 'lower_2' might be incorrect + ("select * from t1", {}, None), + ("DROP TABLE t1", {}, None) + ] + + assert_stmts(compiled, stmts) + assert_stmts(cursor, cursor_stmts) + + if __name__ == "__main__": testenv.main() |
