diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2010-02-28 23:51:54 +0000 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2010-02-28 23:51:54 +0000 |
| commit | a76927f584dab481383592645a2a471fed37ecf9 (patch) | |
| tree | 1303912df863199b07bdbcf4a5ffd1abf63701d6 /test | |
| parent | c366b7ec4fded0c8baabcdc6e6f15ecd81a4bf00 (diff) | |
| download | sqlalchemy-a76927f584dab481383592645a2a471fed37ecf9.tar.gz | |
- the execution sequence pulls all rowcount/last inserted ID
info from the cursor before commit() is called on the
DBAPI connection in an "autocommit" scenario. This helps
mxodbc with rowcount and is probably a good idea overall.
- cx_oracle wants list(), not tuple(), for empty execute.
- cleaned up plain SQL param handling
Diffstat (limited to 'test')
| -rw-r--r-- | test/engine/test_execute.py | 67 | ||||
| -rw-r--r-- | test/sql/test_rowcount.py | 9 |
2 files changed, 51 insertions, 25 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 4a45fceb3..1752fda0d 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -12,9 +12,13 @@ users, metadata = None, None class ExecuteTest(TestBase): @classmethod def setup_class(cls): - global users, metadata + global users, users_autoinc, metadata metadata = MetaData(testing.db) users = Table('users', metadata, + Column('user_id', INT, primary_key = True, autoincrement=False), + Column('user_name', VARCHAR(20)), + ) + users_autoinc = Table('users_autoinc', metadata, Column('user_id', INT, primary_key = True, test_needs_autoincrement=True), Column('user_name', VARCHAR(20)), ) @@ -28,16 +32,22 @@ class ExecuteTest(TestBase): def teardown_class(cls): metadata.drop_all() - @testing.fails_on_everything_except('firebird', 'maxdb', 'sqlite', 'mysql+pyodbc', '+zxjdbc', 'mysql+oursql') + @testing.fails_on_everything_except('firebird', 'maxdb', 'sqlite', '+pyodbc', '+mxodbc', '+zxjdbc', 'mysql+oursql') def test_raw_qmark(self): for conn in (testing.db, testing.db.connect()): conn.execute("insert into users (user_id, user_name) values (?, ?)", (1,"jack")) conn.execute("insert into users (user_id, user_name) values (?, ?)", [2,"fred"]) - conn.execute("insert into users (user_id, user_name) values (?, ?)", [3,"ed"], [4,"horse"]) - conn.execute("insert into users (user_id, user_name) values (?, ?)", (5,"barney"), (6,"donkey")) + conn.execute("insert into users (user_id, user_name) values (?, ?)", + [3,"ed"], + [4,"horse"]) + conn.execute("insert into users (user_id, user_name) values (?, ?)", + (5,"barney"), (6,"donkey")) conn.execute("insert into users (user_id, user_name) values (?, ?)", 7, 'sally') res = conn.execute("select * from users order by user_id") - assert res.fetchall() == [(1, "jack"), (2, "fred"), (3, "ed"), (4, "horse"), (5, "barney"), (6, "donkey"), (7, 'sally')] + assert res.fetchall() == [(1, "jack"), (2, "fred"), + (3, "ed"), (4, "horse"), + (5, "barney"), (6, "donkey"), + (7, 'sally')] conn.execute("delete from users") @testing.fails_on_everything_except('mysql+mysqldb', 'postgresql') @@ -46,11 +56,15 @@ class ExecuteTest(TestBase): def test_raw_sprintf(self): for conn in (testing.db, testing.db.connect()): conn.execute("insert into users (user_id, user_name) values (%s, %s)", [1,"jack"]) - conn.execute("insert into users (user_id, user_name) values (%s, %s)", [2,"ed"], [3,"horse"]) + conn.execute("insert into users (user_id, user_name) values (%s, %s)", + [2,"ed"], + [3,"horse"]) conn.execute("insert into users (user_id, user_name) values (%s, %s)", 4, 'sally') conn.execute("insert into users (user_id) values (%s)", 5) res = conn.execute("select * from users order by user_id") - assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally'), (5, None)] + assert res.fetchall() == [(1, "jack"), (2, "ed"), + (3, "horse"), (4, 'sally'), + (5, None)] conn.execute("delete from users") # pyformat is supported for mysql, but skipping because a few driver @@ -59,9 +73,12 @@ class ExecuteTest(TestBase): @testing.fails_on_everything_except('postgresql+psycopg2', 'postgresql+pypostgresql') def test_raw_python(self): for conn in (testing.db, testing.db.connect()): - conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", {'id':1, 'name':'jack'}) - conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", {'id':2, 'name':'ed'}, {'id':3, 'name':'horse'}) - conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", id=4, name='sally') + conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", + {'id':1, 'name':'jack'}) + conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", + {'id':2, 'name':'ed'}, {'id':3, 'name':'horse'}) + conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", + id=4, name='sally') res = conn.execute("select * from users order by user_id") assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')] conn.execute("delete from users") @@ -69,9 +86,12 @@ class ExecuteTest(TestBase): @testing.fails_on_everything_except('sqlite', 'oracle+cx_oracle') def test_raw_named(self): for conn in (testing.db, testing.db.connect()): - conn.execute("insert into users (user_id, user_name) values (:id, :name)", {'id':1, 'name':'jack'}) - conn.execute("insert into users (user_id, user_name) values (:id, :name)", {'id':2, 'name':'ed'}, {'id':3, 'name':'horse'}) - conn.execute("insert into users (user_id, user_name) values (:id, :name)", id=4, name='sally') + conn.execute("insert into users (user_id, user_name) values (:id, :name)", + {'id':1, 'name':'jack'}) + conn.execute("insert into users (user_id, user_name) values (:id, :name)", + {'id':2, 'name':'ed'}, {'id':3, 'name':'horse'}) + conn.execute("insert into users (user_id, user_name) values (:id, :name)", + id=4, name='sally') res = conn.execute("select * from users order by user_id") assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')] conn.execute("delete from users") @@ -86,8 +106,8 @@ class ExecuteTest(TestBase): def test_empty_insert(self): """test that execute() interprets [] as a list with no params""" - result = testing.db.execute(users.insert().values(user_name=bindparam('name')), []) - eq_(testing.db.execute(users.select()).fetchall(), [ + result = testing.db.execute(users_autoinc.insert().values(user_name=bindparam('name')), []) + eq_(testing.db.execute(users_autoinc.select()).fetchall(), [ (1, None) ]) @@ -124,17 +144,25 @@ class ProxyConnectionTest(TestBase): for engine in ( engines.testing_engine(options=dict(implicit_returning=False, proxy=MyProxy())), - engines.testing_engine(options=dict(implicit_returning=False, proxy=MyProxy(), strategy='threadlocal')) + engines.testing_engine(options=dict( + implicit_returning=False, + proxy=MyProxy(), + strategy='threadlocal')) ): m = MetaData(engine) - t1 = Table('t1', m, Column('c1', Integer, primary_key=True), Column('c2', String(50), default=func.lower('Foo'), primary_key=True)) + t1 = Table('t1', m, + Column('c1', Integer, primary_key=True), + Column('c2', String(50), default=func.lower('Foo'), primary_key=True) + ) m.create_all() try: t1.insert().execute(c1=5, c2='some data') t1.insert().execute(c1=6) - assert engine.execute("select * from t1").fetchall() == [(5, 'some data'), (6, 'foo')] + eq_(engine.execute("select * from t1").fetchall(), + [(5, 'some data'), (6, 'foo')] + ) finally: m.drop_all() @@ -165,7 +193,8 @@ class ProxyConnectionTest(TestBase): cursor = [ ("CREATE TABLE t1", {}, ()), ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, (5, 'some data')), - ("INSERT INTO t1 (c1, c2)", {'c1': 6, "lower_2":"Foo"}, insert2_params), # bind param name 'lower_2' might be incorrect + # bind param name 'lower_2' might be incorrect + ("INSERT INTO t1 (c1, c2)", {'c1': 6, "lower_2":"Foo"}, insert2_params), ("select * from t1", {}, ()), ("DROP TABLE t1", {}, ()) ] diff --git a/test/sql/test_rowcount.py b/test/sql/test_rowcount.py index 6da25b914..9577b104a 100644 --- a/test/sql/test_rowcount.py +++ b/test/sql/test_rowcount.py @@ -54,22 +54,19 @@ class FoundRowsTest(TestBase, AssertsExecutionResults): department = employees_table.c.department r = employees_table.update(department=='C').execute(department='Z') print "expecting 3, dialect reports %s" % r.rowcount - if testing.db.dialect.supports_sane_rowcount: - assert r.rowcount == 3 + assert r.rowcount == 3 def test_update_rowcount2(self): # WHERE matches 3, 0 rows changed department = employees_table.c.department r = employees_table.update(department=='C').execute(department='C') print "expecting 3, dialect reports %s" % r.rowcount - if testing.db.dialect.supports_sane_rowcount: - assert r.rowcount == 3 + assert r.rowcount == 3 def test_delete_rowcount(self): # WHERE matches 3, 3 rows deleted department = employees_table.c.department r = employees_table.delete(department=='C').execute() print "expecting 3, dialect reports %s" % r.rowcount - if testing.db.dialect.supports_sane_rowcount: - assert r.rowcount == 3 + assert r.rowcount == 3 |
