summaryrefslogtreecommitdiff
path: root/test/engine/execute.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/execute.py')
-rw-r--r--test/engine/execute.py90
1 files changed, 86 insertions, 4 deletions
diff --git a/test/engine/execute.py b/test/engine/execute.py
index 260a05e27..36a6bc317 100644
--- a/test/engine/execute.py
+++ b/test/engine/execute.py
@@ -1,8 +1,13 @@
import testenv; testenv.configure_for_tests()
-from sqlalchemy import *
-from sqlalchemy import exceptions
-from testlib import *
+import re
+from sqlalchemy.interfaces import ConnectionProxy
+from testlib.sa import MetaData, Table, Column, Integer, String, INT, \
+ VARCHAR, func
+import testlib.sa as tsa
+from testlib import TestBase, testing, engines
+
+users, metadata = None, None
class ExecuteTest(TestBase):
def setUpAll(self):
global users, metadata
@@ -70,8 +75,85 @@ class ExecuteTest(TestBase):
try:
conn.execute("osdjafioajwoejoasfjdoifjowejfoawejqoijwef")
assert False
- except exceptions.DBAPIError:
+ except tsa.exc.DBAPIError:
assert True
+class ProxyConnectionTest(TestBase):
+ def test_proxy(self):
+
+ stmts = []
+ cursor_stmts = []
+
+ class MyProxy(ConnectionProxy):
+ def execute(self, conn, execute, clauseelement, *multiparams, **params):
+ stmts.append(
+ (str(clauseelement), params,multiparams)
+ )
+ return execute(clauseelement, *multiparams, **params)
+
+ def cursor_execute(self, execute, cursor, statement, parameters, context, executemany):
+ cursor_stmts.append(
+ (statement, parameters, None)
+ )
+ return execute(cursor, statement, parameters, context)
+
+ def assert_stmts(expected, received):
+ for stmt, params, posn in expected:
+ if not received:
+ assert False
+ while received:
+ teststmt, testparams, testmultiparams = received.pop(0)
+ teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ', teststmt).strip()
+ if teststmt.startswith(stmt) and (testparams==params or testparams==posn):
+ break
+
+ for engine in (
+ engines.testing_engine(options=dict(proxy=MyProxy())),
+ engines.testing_engine(options=dict(proxy=MyProxy(), strategy='threadlocal'))
+ ):
+ m = MetaData(engine)
+
+ t1 = Table('t1', m, Column('c1', Integer, primary_key=True), Column('c2', String(50), default=func.lower('Foo'), primary_key=True))
+
+ m.create_all()
+ try:
+ t1.insert().execute(c1=5, c2='some data')
+ t1.insert().execute(c1=6)
+ assert engine.execute("select * from t1").fetchall() == [(5, 'some data'), (6, 'foo')]
+ finally:
+ m.drop_all()
+
+ engine.dispose()
+
+ compiled = [
+ ("CREATE TABLE t1", {}, None),
+ ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, None),
+ ("INSERT INTO t1 (c1, c2)", {'c1': 6}, None),
+ ("select * from t1", {}, None),
+ ("DROP TABLE t1", {}, None)
+ ]
+
+ if engine.dialect.preexecute_pk_sequences:
+ cursor = [
+ ("CREATE TABLE t1", {}, None),
+ ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, [5, 'some data']),
+ ("SELECT lower", {'lower_2':'Foo'}, ['Foo']),
+ ("INSERT INTO t1 (c1, c2)", {'c2': 'foo', 'c1': 6}, [6, 'foo']),
+ ("select * from t1", {}, None),
+ ("DROP TABLE t1", {}, None)
+ ]
+ else:
+ cursor = [
+ ("CREATE TABLE t1", {}, None),
+ ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, [5, 'some data']),
+ ("INSERT INTO t1 (c1, c2)", {'c1': 6, "lower_2":"Foo"}, [6, "Foo"]), # bind param name 'lower_2' might be incorrect
+ ("select * from t1", {}, None),
+ ("DROP TABLE t1", {}, None)
+ ]
+
+ assert_stmts(compiled, stmts)
+ assert_stmts(cursor, cursor_stmts)
+
+
if __name__ == "__main__":
testenv.main()