summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2010-02-28 23:51:54 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2010-02-28 23:51:54 +0000
commita76927f584dab481383592645a2a471fed37ecf9 (patch)
tree1303912df863199b07bdbcf4a5ffd1abf63701d6 /test
parentc366b7ec4fded0c8baabcdc6e6f15ecd81a4bf00 (diff)
downloadsqlalchemy-a76927f584dab481383592645a2a471fed37ecf9.tar.gz
- the execution sequence pulls all rowcount/last inserted ID
info from the cursor before commit() is called on the DBAPI connection in an "autocommit" scenario. This helps mxodbc with rowcount and is probably a good idea overall. - cx_oracle wants list(), not tuple(), for empty execute. - cleaned up plain SQL param handling
Diffstat (limited to 'test')
-rw-r--r--test/engine/test_execute.py67
-rw-r--r--test/sql/test_rowcount.py9
2 files changed, 51 insertions, 25 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 4a45fceb3..1752fda0d 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -12,9 +12,13 @@ users, metadata = None, None
class ExecuteTest(TestBase):
@classmethod
def setup_class(cls):
- global users, metadata
+ global users, users_autoinc, metadata
metadata = MetaData(testing.db)
users = Table('users', metadata,
+ Column('user_id', INT, primary_key = True, autoincrement=False),
+ Column('user_name', VARCHAR(20)),
+ )
+ users_autoinc = Table('users_autoinc', metadata,
Column('user_id', INT, primary_key = True, test_needs_autoincrement=True),
Column('user_name', VARCHAR(20)),
)
@@ -28,16 +32,22 @@ class ExecuteTest(TestBase):
def teardown_class(cls):
metadata.drop_all()
- @testing.fails_on_everything_except('firebird', 'maxdb', 'sqlite', 'mysql+pyodbc', '+zxjdbc', 'mysql+oursql')
+ @testing.fails_on_everything_except('firebird', 'maxdb', 'sqlite', '+pyodbc', '+mxodbc', '+zxjdbc', 'mysql+oursql')
def test_raw_qmark(self):
for conn in (testing.db, testing.db.connect()):
conn.execute("insert into users (user_id, user_name) values (?, ?)", (1,"jack"))
conn.execute("insert into users (user_id, user_name) values (?, ?)", [2,"fred"])
- conn.execute("insert into users (user_id, user_name) values (?, ?)", [3,"ed"], [4,"horse"])
- conn.execute("insert into users (user_id, user_name) values (?, ?)", (5,"barney"), (6,"donkey"))
+ conn.execute("insert into users (user_id, user_name) values (?, ?)",
+ [3,"ed"],
+ [4,"horse"])
+ conn.execute("insert into users (user_id, user_name) values (?, ?)",
+ (5,"barney"), (6,"donkey"))
conn.execute("insert into users (user_id, user_name) values (?, ?)", 7, 'sally')
res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [(1, "jack"), (2, "fred"), (3, "ed"), (4, "horse"), (5, "barney"), (6, "donkey"), (7, 'sally')]
+ assert res.fetchall() == [(1, "jack"), (2, "fred"),
+ (3, "ed"), (4, "horse"),
+ (5, "barney"), (6, "donkey"),
+ (7, 'sally')]
conn.execute("delete from users")
@testing.fails_on_everything_except('mysql+mysqldb', 'postgresql')
@@ -46,11 +56,15 @@ class ExecuteTest(TestBase):
def test_raw_sprintf(self):
for conn in (testing.db, testing.db.connect()):
conn.execute("insert into users (user_id, user_name) values (%s, %s)", [1,"jack"])
- conn.execute("insert into users (user_id, user_name) values (%s, %s)", [2,"ed"], [3,"horse"])
+ conn.execute("insert into users (user_id, user_name) values (%s, %s)",
+ [2,"ed"],
+ [3,"horse"])
conn.execute("insert into users (user_id, user_name) values (%s, %s)", 4, 'sally')
conn.execute("insert into users (user_id) values (%s)", 5)
res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally'), (5, None)]
+ assert res.fetchall() == [(1, "jack"), (2, "ed"),
+ (3, "horse"), (4, 'sally'),
+ (5, None)]
conn.execute("delete from users")
# pyformat is supported for mysql, but skipping because a few driver
@@ -59,9 +73,12 @@ class ExecuteTest(TestBase):
@testing.fails_on_everything_except('postgresql+psycopg2', 'postgresql+pypostgresql')
def test_raw_python(self):
for conn in (testing.db, testing.db.connect()):
- conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", {'id':1, 'name':'jack'})
- conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", {'id':2, 'name':'ed'}, {'id':3, 'name':'horse'})
- conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", id=4, name='sally')
+ conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)",
+ {'id':1, 'name':'jack'})
+ conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)",
+ {'id':2, 'name':'ed'}, {'id':3, 'name':'horse'})
+ conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)",
+ id=4, name='sally')
res = conn.execute("select * from users order by user_id")
assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')]
conn.execute("delete from users")
@@ -69,9 +86,12 @@ class ExecuteTest(TestBase):
@testing.fails_on_everything_except('sqlite', 'oracle+cx_oracle')
def test_raw_named(self):
for conn in (testing.db, testing.db.connect()):
- conn.execute("insert into users (user_id, user_name) values (:id, :name)", {'id':1, 'name':'jack'})
- conn.execute("insert into users (user_id, user_name) values (:id, :name)", {'id':2, 'name':'ed'}, {'id':3, 'name':'horse'})
- conn.execute("insert into users (user_id, user_name) values (:id, :name)", id=4, name='sally')
+ conn.execute("insert into users (user_id, user_name) values (:id, :name)",
+ {'id':1, 'name':'jack'})
+ conn.execute("insert into users (user_id, user_name) values (:id, :name)",
+ {'id':2, 'name':'ed'}, {'id':3, 'name':'horse'})
+ conn.execute("insert into users (user_id, user_name) values (:id, :name)",
+ id=4, name='sally')
res = conn.execute("select * from users order by user_id")
assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')]
conn.execute("delete from users")
@@ -86,8 +106,8 @@ class ExecuteTest(TestBase):
def test_empty_insert(self):
"""test that execute() interprets [] as a list with no params"""
- result = testing.db.execute(users.insert().values(user_name=bindparam('name')), [])
- eq_(testing.db.execute(users.select()).fetchall(), [
+ result = testing.db.execute(users_autoinc.insert().values(user_name=bindparam('name')), [])
+ eq_(testing.db.execute(users_autoinc.select()).fetchall(), [
(1, None)
])
@@ -124,17 +144,25 @@ class ProxyConnectionTest(TestBase):
for engine in (
engines.testing_engine(options=dict(implicit_returning=False, proxy=MyProxy())),
- engines.testing_engine(options=dict(implicit_returning=False, proxy=MyProxy(), strategy='threadlocal'))
+ engines.testing_engine(options=dict(
+ implicit_returning=False,
+ proxy=MyProxy(),
+ strategy='threadlocal'))
):
m = MetaData(engine)
- t1 = Table('t1', m, Column('c1', Integer, primary_key=True), Column('c2', String(50), default=func.lower('Foo'), primary_key=True))
+ t1 = Table('t1', m,
+ Column('c1', Integer, primary_key=True),
+ Column('c2', String(50), default=func.lower('Foo'), primary_key=True)
+ )
m.create_all()
try:
t1.insert().execute(c1=5, c2='some data')
t1.insert().execute(c1=6)
- assert engine.execute("select * from t1").fetchall() == [(5, 'some data'), (6, 'foo')]
+ eq_(engine.execute("select * from t1").fetchall(),
+ [(5, 'some data'), (6, 'foo')]
+ )
finally:
m.drop_all()
@@ -165,7 +193,8 @@ class ProxyConnectionTest(TestBase):
cursor = [
("CREATE TABLE t1", {}, ()),
("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, (5, 'some data')),
- ("INSERT INTO t1 (c1, c2)", {'c1': 6, "lower_2":"Foo"}, insert2_params), # bind param name 'lower_2' might be incorrect
+ # bind param name 'lower_2' might be incorrect
+ ("INSERT INTO t1 (c1, c2)", {'c1': 6, "lower_2":"Foo"}, insert2_params),
("select * from t1", {}, ()),
("DROP TABLE t1", {}, ())
]
diff --git a/test/sql/test_rowcount.py b/test/sql/test_rowcount.py
index 6da25b914..9577b104a 100644
--- a/test/sql/test_rowcount.py
+++ b/test/sql/test_rowcount.py
@@ -54,22 +54,19 @@ class FoundRowsTest(TestBase, AssertsExecutionResults):
department = employees_table.c.department
r = employees_table.update(department=='C').execute(department='Z')
print "expecting 3, dialect reports %s" % r.rowcount
- if testing.db.dialect.supports_sane_rowcount:
- assert r.rowcount == 3
+ assert r.rowcount == 3
def test_update_rowcount2(self):
# WHERE matches 3, 0 rows changed
department = employees_table.c.department
r = employees_table.update(department=='C').execute(department='C')
print "expecting 3, dialect reports %s" % r.rowcount
- if testing.db.dialect.supports_sane_rowcount:
- assert r.rowcount == 3
+ assert r.rowcount == 3
def test_delete_rowcount(self):
# WHERE matches 3, 3 rows deleted
department = employees_table.c.department
r = employees_table.delete(department=='C').execute()
print "expecting 3, dialect reports %s" % r.rowcount
- if testing.db.dialect.supports_sane_rowcount:
- assert r.rowcount == 3
+ assert r.rowcount == 3