summaryrefslogtreecommitdiff
path: root/test/dialect
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2009-08-06 21:11:27 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2009-08-06 21:11:27 +0000
commit8fc5005dfe3eb66a46470ad8a8c7b95fc4d6bdca (patch)
treeae9e27d12c9fbf8297bb90469509e1cb6a206242 /test/dialect
parent7638aa7f242c6ea3d743aa9100e32be2052546a6 (diff)
downloadsqlalchemy-8fc5005dfe3eb66a46470ad8a8c7b95fc4d6bdca.tar.gz
merge 0.6 series to trunk.
Diffstat (limited to 'test/dialect')
-rw-r--r--test/dialect/test_firebird.py123
-rw-r--r--test/dialect/test_informix.py3
-rw-r--r--test/dialect/test_maxdb.py2
-rw-r--r--test/dialect/test_mssql.py532
-rw-r--r--test/dialect/test_mysql.py219
-rw-r--r--test/dialect/test_oracle.py53
-rw-r--r--test/dialect/test_postgresql.py (renamed from test/dialect/test_postgres.py)553
-rw-r--r--test/dialect/test_sqlite.py117
8 files changed, 906 insertions, 696 deletions
diff --git a/test/dialect/test_firebird.py b/test/dialect/test_firebird.py
index fa608c9a1..2dc6af91b 100644
--- a/test/dialect/test_firebird.py
+++ b/test/dialect/test_firebird.py
@@ -50,6 +50,7 @@ class DomainReflectionTest(TestBase, AssertsExecutionResults):
con.execute('DROP GENERATOR gen_testtable_id')
def test_table_is_reflected(self):
+ from sqlalchemy.types import Integer, Text, Binary, String, Date, Time, DateTime
metadata = MetaData(testing.db)
table = Table('testtable', metadata, autoload=True)
eq_(set(table.columns.keys()),
@@ -57,17 +58,17 @@ class DomainReflectionTest(TestBase, AssertsExecutionResults):
"Columns of reflected table didn't equal expected columns")
eq_(table.c.question.primary_key, True)
eq_(table.c.question.sequence.name, 'gen_testtable_id')
- eq_(table.c.question.type.__class__, firebird.FBInteger)
+ assert isinstance(table.c.question.type, Integer)
eq_(table.c.question.server_default.arg.text, "42")
- eq_(table.c.answer.type.__class__, firebird.FBString)
+ assert isinstance(table.c.answer.type, String)
eq_(table.c.answer.server_default.arg.text, "'no answer'")
- eq_(table.c.remark.type.__class__, firebird.FBText)
+ assert isinstance(table.c.remark.type, Text)
eq_(table.c.remark.server_default.arg.text, "''")
- eq_(table.c.photo.type.__class__, firebird.FBBinary)
+ assert isinstance(table.c.photo.type, Binary)
# The following assume a Dialect 3 database
- eq_(table.c.d.type.__class__, firebird.FBDate)
- eq_(table.c.t.type.__class__, firebird.FBTime)
- eq_(table.c.dt.type.__class__, firebird.FBDateTime)
+ assert isinstance(table.c.d.type, Date)
+ assert isinstance(table.c.t.type, Time)
+ assert isinstance(table.c.dt.type, DateTime)
class CompileTest(TestBase, AssertsCompiledSQL):
@@ -76,7 +77,13 @@ class CompileTest(TestBase, AssertsCompiledSQL):
def test_alias(self):
t = table('sometable', column('col1'), column('col2'))
s = select([t.alias()])
- self.assert_compile(s, "SELECT sometable_1.col1, sometable_1.col2 FROM sometable sometable_1")
+ self.assert_compile(s, "SELECT sometable_1.col1, sometable_1.col2 FROM sometable AS sometable_1")
+
+ dialect = firebird.FBDialect()
+ dialect._version_two = False
+ self.assert_compile(s, "SELECT sometable_1.col1, sometable_1.col2 FROM sometable sometable_1",
+ dialect = dialect
+ )
def test_function(self):
self.assert_compile(func.foo(1, 2), "foo(:foo_1, :foo_2)")
@@ -98,15 +105,15 @@ class CompileTest(TestBase, AssertsCompiledSQL):
column('description', String(128)),
)
- u = update(table1, values=dict(name='foo'), firebird_returning=[table1.c.myid, table1.c.name])
+ u = update(table1, values=dict(name='foo')).returning(table1.c.myid, table1.c.name)
self.assert_compile(u, "UPDATE mytable SET name=:name RETURNING mytable.myid, mytable.name")
- u = update(table1, values=dict(name='foo'), firebird_returning=[table1])
+ u = update(table1, values=dict(name='foo')).returning(table1)
self.assert_compile(u, "UPDATE mytable SET name=:name "\
"RETURNING mytable.myid, mytable.name, mytable.description")
- u = update(table1, values=dict(name='foo'), firebird_returning=[func.length(table1.c.name)])
- self.assert_compile(u, "UPDATE mytable SET name=:name RETURNING char_length(mytable.name)")
+ u = update(table1, values=dict(name='foo')).returning(func.length(table1.c.name))
+ self.assert_compile(u, "UPDATE mytable SET name=:name RETURNING char_length(mytable.name) AS length_1")
def test_insert_returning(self):
table1 = table('mytable',
@@ -115,90 +122,20 @@ class CompileTest(TestBase, AssertsCompiledSQL):
column('description', String(128)),
)
- i = insert(table1, values=dict(name='foo'), firebird_returning=[table1.c.myid, table1.c.name])
+ i = insert(table1, values=dict(name='foo')).returning(table1.c.myid, table1.c.name)
self.assert_compile(i, "INSERT INTO mytable (name) VALUES (:name) RETURNING mytable.myid, mytable.name")
- i = insert(table1, values=dict(name='foo'), firebird_returning=[table1])
+ i = insert(table1, values=dict(name='foo')).returning(table1)
self.assert_compile(i, "INSERT INTO mytable (name) VALUES (:name) "\
"RETURNING mytable.myid, mytable.name, mytable.description")
- i = insert(table1, values=dict(name='foo'), firebird_returning=[func.length(table1.c.name)])
- self.assert_compile(i, "INSERT INTO mytable (name) VALUES (:name) RETURNING char_length(mytable.name)")
+ i = insert(table1, values=dict(name='foo')).returning(func.length(table1.c.name))
+ self.assert_compile(i, "INSERT INTO mytable (name) VALUES (:name) RETURNING char_length(mytable.name) AS length_1")
-class ReturningTest(TestBase, AssertsExecutionResults):
- __only_on__ = 'firebird'
-
- @testing.exclude('firebird', '<', (2, 1), '2.1+ feature')
- def test_update_returning(self):
- meta = MetaData(testing.db)
- table = Table('tables', meta,
- Column('id', Integer, Sequence('gen_tables_id'), primary_key=True),
- Column('persons', Integer),
- Column('full', Boolean)
- )
- table.create()
- try:
- table.insert().execute([{'persons': 5, 'full': False}, {'persons': 3, 'full': False}])
-
- result = table.update(table.c.persons > 4, dict(full=True), firebird_returning=[table.c.id]).execute()
- eq_(result.fetchall(), [(1,)])
-
- result2 = select([table.c.id, table.c.full]).order_by(table.c.id).execute()
- eq_(result2.fetchall(), [(1,True),(2,False)])
- finally:
- table.drop()
-
- @testing.exclude('firebird', '<', (2, 0), '2.0+ feature')
- def test_insert_returning(self):
- meta = MetaData(testing.db)
- table = Table('tables', meta,
- Column('id', Integer, Sequence('gen_tables_id'), primary_key=True),
- Column('persons', Integer),
- Column('full', Boolean)
- )
- table.create()
- try:
- result = table.insert(firebird_returning=[table.c.id]).execute({'persons': 1, 'full': False})
-
- eq_(result.fetchall(), [(1,)])
-
- # Multiple inserts only return the last row
- result2 = table.insert(firebird_returning=[table]).execute(
- [{'persons': 2, 'full': False}, {'persons': 3, 'full': True}])
-
- eq_(result2.fetchall(), [(3,3,True)])
-
- result3 = table.insert(firebird_returning=[table.c.id]).execute({'persons': 4, 'full': False})
- eq_([dict(row) for row in result3], [{'ID':4}])
-
- result4 = testing.db.execute('insert into tables (id, persons, "full") values (5, 10, 1) returning persons')
- eq_([dict(row) for row in result4], [{'PERSONS': 10}])
- finally:
- table.drop()
-
- @testing.exclude('firebird', '<', (2, 1), '2.1+ feature')
- def test_delete_returning(self):
- meta = MetaData(testing.db)
- table = Table('tables', meta,
- Column('id', Integer, Sequence('gen_tables_id'), primary_key=True),
- Column('persons', Integer),
- Column('full', Boolean)
- )
- table.create()
- try:
- table.insert().execute([{'persons': 5, 'full': False}, {'persons': 3, 'full': False}])
-
- result = table.delete(table.c.persons > 4, firebird_returning=[table.c.id]).execute()
- eq_(result.fetchall(), [(1,)])
-
- result2 = select([table.c.id, table.c.full]).order_by(table.c.id).execute()
- eq_(result2.fetchall(), [(2,False),])
- finally:
- table.drop()
-class MiscFBTests(TestBase):
+class MiscTest(TestBase):
__only_on__ = 'firebird'
def test_strlen(self):
@@ -217,12 +154,20 @@ class MiscFBTests(TestBase):
try:
t.insert(values=dict(name='dante')).execute()
t.insert(values=dict(name='alighieri')).execute()
- select([func.count(t.c.id)],func.length(t.c.name)==5).execute().fetchone()[0] == 1
+ select([func.count(t.c.id)],func.length(t.c.name)==5).execute().first()[0] == 1
finally:
meta.drop_all()
def test_server_version_info(self):
- version = testing.db.dialect.server_version_info(testing.db.connect())
+ version = testing.db.dialect.server_version_info
assert len(version) == 3, "Got strange version info: %s" % repr(version)
+ def test_percents_in_text(self):
+ for expr, result in (
+ (text("select '%' from rdb$database"), '%'),
+ (text("select '%%' from rdb$database"), '%%'),
+ (text("select '%%%' from rdb$database"), '%%%'),
+ (text("select 'hello % world' from rdb$database"), "hello % world")
+ ):
+ eq_(testing.db.scalar(expr), result)
diff --git a/test/dialect/test_informix.py b/test/dialect/test_informix.py
index 86a4e751d..e647990d3 100644
--- a/test/dialect/test_informix.py
+++ b/test/dialect/test_informix.py
@@ -4,7 +4,8 @@ from sqlalchemy.test import *
class CompileTest(TestBase, AssertsCompiledSQL):
- __dialect__ = informix.InfoDialect()
+ __only_on__ = 'informix'
+ __dialect__ = informix.InformixDialect()
def test_statements(self):
meta =MetaData()
diff --git a/test/dialect/test_maxdb.py b/test/dialect/test_maxdb.py
index 033a05533..c69a81120 100644
--- a/test/dialect/test_maxdb.py
+++ b/test/dialect/test_maxdb.py
@@ -185,7 +185,7 @@ class DBAPITest(TestBase, AssertsExecutionResults):
vals = []
for i in xrange(3):
cr.execute('SELECT busto.NEXTVAL FROM DUAL')
- vals.append(cr.fetchone()[0])
+ vals.append(cr.first()[0])
# should be 1,2,3, but no...
self.assert_(vals != [1,2,3])
diff --git a/test/dialect/test_mssql.py b/test/dialect/test_mssql.py
index dd86ce0de..423310db6 100644
--- a/test/dialect/test_mssql.py
+++ b/test/dialect/test_mssql.py
@@ -2,17 +2,18 @@
from sqlalchemy.test.testing import eq_
import datetime, os, re
from sqlalchemy import *
-from sqlalchemy import types, exc
+from sqlalchemy import types, exc, schema
from sqlalchemy.orm import *
from sqlalchemy.sql import table, column
from sqlalchemy.databases import mssql
-import sqlalchemy.engine.url as url
+from sqlalchemy.dialects.mssql import pyodbc
+from sqlalchemy.engine import url
from sqlalchemy.test import *
from sqlalchemy.test.testing import eq_
class CompileTest(TestBase, AssertsCompiledSQL):
- __dialect__ = mssql.MSSQLDialect()
+ __dialect__ = mssql.dialect()
def test_insert(self):
t = table('sometable', column('somecolumn'))
@@ -157,6 +158,45 @@ class CompileTest(TestBase, AssertsCompiledSQL):
select([extract(field, t.c.col1)]),
'SELECT DATEPART("%s", t.col1) AS anon_1 FROM t' % field)
+ def test_update_returning(self):
+ table1 = table('mytable',
+ column('myid', Integer),
+ column('name', String(128)),
+ column('description', String(128)),
+ )
+
+ u = update(table1, values=dict(name='foo')).returning(table1.c.myid, table1.c.name)
+ self.assert_compile(u, "UPDATE mytable SET name=:name OUTPUT inserted.myid, inserted.name")
+
+ u = update(table1, values=dict(name='foo')).returning(table1)
+ self.assert_compile(u, "UPDATE mytable SET name=:name OUTPUT inserted.myid, "
+ "inserted.name, inserted.description")
+
+ u = update(table1, values=dict(name='foo')).returning(table1).where(table1.c.name=='bar')
+ self.assert_compile(u, "UPDATE mytable SET name=:name OUTPUT inserted.myid, "
+ "inserted.name, inserted.description WHERE mytable.name = :name_1")
+
+ u = update(table1, values=dict(name='foo')).returning(func.length(table1.c.name))
+ self.assert_compile(u, "UPDATE mytable SET name=:name OUTPUT LEN(inserted.name) AS length_1")
+
+ def test_insert_returning(self):
+ table1 = table('mytable',
+ column('myid', Integer),
+ column('name', String(128)),
+ column('description', String(128)),
+ )
+
+ i = insert(table1, values=dict(name='foo')).returning(table1.c.myid, table1.c.name)
+ self.assert_compile(i, "INSERT INTO mytable (name) OUTPUT inserted.myid, inserted.name VALUES (:name)")
+
+ i = insert(table1, values=dict(name='foo')).returning(table1)
+ self.assert_compile(i, "INSERT INTO mytable (name) OUTPUT inserted.myid, "
+ "inserted.name, inserted.description VALUES (:name)")
+
+ i = insert(table1, values=dict(name='foo')).returning(func.length(table1.c.name))
+ self.assert_compile(i, "INSERT INTO mytable (name) OUTPUT LEN(inserted.name) AS length_1 VALUES (:name)")
+
+
class IdentityInsertTest(TestBase, AssertsCompiledSQL):
__only_on__ = 'mssql'
@@ -189,9 +229,9 @@ class IdentityInsertTest(TestBase, AssertsCompiledSQL):
eq_([(9, 'Python')], list(cats))
result = cattable.insert().values(description='PHP').execute()
- eq_([10], result.last_inserted_ids())
+ eq_([10], result.inserted_primary_key)
lastcat = cattable.select().order_by(desc(cattable.c.id)).execute()
- eq_((10, 'PHP'), lastcat.fetchone())
+ eq_((10, 'PHP'), lastcat.first())
def test_executemany(self):
cattable.insert().execute([
@@ -213,10 +253,51 @@ class IdentityInsertTest(TestBase, AssertsCompiledSQL):
eq_([(91, 'Smalltalk'), (90, 'PHP')], list(lastcats))
-class ReflectionTest(TestBase):
+class ReflectionTest(TestBase, ComparesTables):
__only_on__ = 'mssql'
- def testidentity(self):
+ def test_basic_reflection(self):
+ meta = MetaData(testing.db)
+
+ users = Table('engine_users', meta,
+ Column('user_id', types.INT, primary_key=True),
+ Column('user_name', types.VARCHAR(20), nullable=False),
+ Column('test1', types.CHAR(5), nullable=False),
+ Column('test2', types.Float(5), nullable=False),
+ Column('test3', types.Text),
+ Column('test4', types.Numeric, nullable = False),
+ Column('test5', types.DateTime),
+ Column('parent_user_id', types.Integer,
+ ForeignKey('engine_users.user_id')),
+ Column('test6', types.DateTime, nullable=False),
+ Column('test7', types.Text),
+ Column('test8', types.Binary),
+ Column('test_passivedefault2', types.Integer, server_default='5'),
+ Column('test9', types.Binary(100)),
+ Column('test_numeric', types.Numeric()),
+ test_needs_fk=True,
+ )
+
+ addresses = Table('engine_email_addresses', meta,
+ Column('address_id', types.Integer, primary_key = True),
+ Column('remote_user_id', types.Integer, ForeignKey(users.c.user_id)),
+ Column('email_address', types.String(20)),
+ test_needs_fk=True,
+ )
+ meta.create_all()
+
+ try:
+ meta2 = MetaData()
+ reflected_users = Table('engine_users', meta2, autoload=True,
+ autoload_with=testing.db)
+ reflected_addresses = Table('engine_email_addresses', meta2,
+ autoload=True, autoload_with=testing.db)
+ self.assert_tables_equal(users, reflected_users)
+ self.assert_tables_equal(addresses, reflected_addresses)
+ finally:
+ meta.drop_all()
+
+ def test_identity(self):
meta = MetaData(testing.db)
table = Table(
'identity_test', meta,
@@ -240,7 +321,7 @@ class QueryUnicodeTest(TestBase):
meta = MetaData(testing.db)
t1 = Table('unitest_table', meta,
Column('id', Integer, primary_key=True),
- Column('descr', mssql.MSText(200, convert_unicode=True)))
+ Column('descr', mssql.MSText(convert_unicode=True)))
meta.create_all()
con = testing.db.connect()
@@ -248,7 +329,7 @@ class QueryUnicodeTest(TestBase):
con.execute(u"insert into unitest_table values ('bien mangé')".encode('UTF-8'))
try:
- r = t1.select().execute().fetchone()
+ r = t1.select().execute().first()
assert isinstance(r[1], unicode), '%s is %s instead of unicode, working on %s' % (
r[1], type(r[1]), meta.bind)
@@ -262,7 +343,9 @@ class QueryTest(TestBase):
meta = MetaData(testing.db)
t1 = Table('t1', meta,
Column('id', Integer, Sequence('fred', 100, 1), primary_key=True),
- Column('descr', String(200)))
+ Column('descr', String(200)),
+ implicit_returning = False
+ )
t2 = Table('t2', meta,
Column('id', Integer, Sequence('fred', 200, 1), primary_key=True),
Column('descr', String(200)))
@@ -274,9 +357,9 @@ class QueryTest(TestBase):
try:
tr = con.begin()
r = con.execute(t2.insert(), descr='hello')
- self.assert_(r.last_inserted_ids() == [200])
+ self.assert_(r.inserted_primary_key == [200])
r = con.execute(t1.insert(), descr='hello')
- self.assert_(r.last_inserted_ids() == [100])
+ self.assert_(r.inserted_primary_key == [100])
finally:
tr.commit()
@@ -295,6 +378,19 @@ class QueryTest(TestBase):
tbl.drop()
con.execute('drop schema paj')
+ def test_returning_no_autoinc(self):
+ meta = MetaData(testing.db)
+
+ table = Table('t1', meta, Column('id', Integer, primary_key=True), Column('data', String(50)))
+ table.create()
+ try:
+ result = table.insert().values(id=1, data=func.lower("SomeString")).returning(table.c.id, table.c.data).execute()
+ eq_(result.fetchall(), [(1, 'somestring',)])
+ finally:
+ # this will hang if the "SET IDENTITY_INSERT t1 OFF" occurs before the
+ # result is fetched
+ table.drop()
+
def test_delete_schema(self):
meta = MetaData(testing.db)
con = testing.db.connect()
@@ -371,36 +467,26 @@ class SchemaTest(TestBase):
)
self.column = t.c.test_column
+ dialect = mssql.dialect()
+ self.ddl_compiler = dialect.ddl_compiler(dialect, schema.CreateTable(t))
+
+ def _column_spec(self):
+ return self.ddl_compiler.get_column_specification(self.column)
+
def test_that_mssql_default_nullability_emits_null(self):
- schemagenerator = \
- mssql.MSSQLDialect().schemagenerator(mssql.MSSQLDialect(), None)
- column_specification = \
- schemagenerator.get_column_specification(self.column)
- eq_("test_column VARCHAR NULL", column_specification)
+ eq_("test_column VARCHAR NULL", self._column_spec())
def test_that_mssql_none_nullability_does_not_emit_nullability(self):
- schemagenerator = \
- mssql.MSSQLDialect().schemagenerator(mssql.MSSQLDialect(), None)
self.column.nullable = None
- column_specification = \
- schemagenerator.get_column_specification(self.column)
- eq_("test_column VARCHAR", column_specification)
+ eq_("test_column VARCHAR", self._column_spec())
def test_that_mssql_specified_nullable_emits_null(self):
- schemagenerator = \
- mssql.MSSQLDialect().schemagenerator(mssql.MSSQLDialect(), None)
self.column.nullable = True
- column_specification = \
- schemagenerator.get_column_specification(self.column)
- eq_("test_column VARCHAR NULL", column_specification)
+ eq_("test_column VARCHAR NULL", self._column_spec())
def test_that_mssql_specified_not_nullable_emits_not_null(self):
- schemagenerator = \
- mssql.MSSQLDialect().schemagenerator(mssql.MSSQLDialect(), None)
self.column.nullable = False
- column_specification = \
- schemagenerator.get_column_specification(self.column)
- eq_("test_column VARCHAR NOT NULL", column_specification)
+ eq_("test_column VARCHAR NOT NULL", self._column_spec())
def full_text_search_missing():
@@ -515,79 +601,73 @@ class MatchTest(TestBase, AssertsCompiledSQL):
class ParseConnectTest(TestBase, AssertsCompiledSQL):
__only_on__ = 'mssql'
+ @classmethod
+ def setup_class(cls):
+ global dialect
+ dialect = pyodbc.MSDialect_pyodbc()
+
def test_pyodbc_connect_dsn_trusted(self):
u = url.make_url('mssql://mydsn')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['dsn=mydsn;TrustedConnection=Yes'], {}], connection)
def test_pyodbc_connect_old_style_dsn_trusted(self):
u = url.make_url('mssql:///?dsn=mydsn')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['dsn=mydsn;TrustedConnection=Yes'], {}], connection)
def test_pyodbc_connect_dsn_non_trusted(self):
u = url.make_url('mssql://username:password@mydsn')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['dsn=mydsn;UID=username;PWD=password'], {}], connection)
def test_pyodbc_connect_dsn_extra(self):
u = url.make_url('mssql://username:password@mydsn/?LANGUAGE=us_english&foo=bar')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['dsn=mydsn;UID=username;PWD=password;LANGUAGE=us_english;foo=bar'], {}], connection)
def test_pyodbc_connect(self):
u = url.make_url('mssql://username:password@hostspec/database')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UID=username;PWD=password'], {}], connection)
def test_pyodbc_connect_comma_port(self):
u = url.make_url('mssql://username:password@hostspec:12345/database')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['DRIVER={SQL Server};Server=hostspec,12345;Database=database;UID=username;PWD=password'], {}], connection)
def test_pyodbc_connect_config_port(self):
u = url.make_url('mssql://username:password@hostspec/database?port=12345')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UID=username;PWD=password;port=12345'], {}], connection)
def test_pyodbc_extra_connect(self):
u = url.make_url('mssql://username:password@hostspec/database?LANGUAGE=us_english&foo=bar')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UID=username;PWD=password;foo=bar;LANGUAGE=us_english'], {}], connection)
def test_pyodbc_odbc_connect(self):
u = url.make_url('mssql:///?odbc_connect=DRIVER%3D%7BSQL+Server%7D%3BServer%3Dhostspec%3BDatabase%3Ddatabase%3BUID%3Dusername%3BPWD%3Dpassword')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UID=username;PWD=password'], {}], connection)
def test_pyodbc_odbc_connect_with_dsn(self):
u = url.make_url('mssql:///?odbc_connect=dsn%3Dmydsn%3BDatabase%3Ddatabase%3BUID%3Dusername%3BPWD%3Dpassword')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['dsn=mydsn;Database=database;UID=username;PWD=password'], {}], connection)
def test_pyodbc_odbc_connect_ignores_other_values(self):
u = url.make_url('mssql://userdiff:passdiff@localhost/dbdiff?odbc_connect=DRIVER%3D%7BSQL+Server%7D%3BServer%3Dhostspec%3BDatabase%3Ddatabase%3BUID%3Dusername%3BPWD%3Dpassword')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UID=username;PWD=password'], {}], connection)
-class TypesTest(TestBase):
+class TypesTest(TestBase, AssertsExecutionResults, ComparesTables):
__only_on__ = 'mssql'
@classmethod
def setup_class(cls):
- global numeric_table, metadata
+ global metadata
metadata = MetaData(testing.db)
def teardown(self):
@@ -601,26 +681,22 @@ class TypesTest(TestBase):
)
metadata.create_all()
- try:
- test_items = [decimal.Decimal(d) for d in '1500000.00000000000000000000',
- '-1500000.00000000000000000000', '1500000',
- '0.0000000000000000002', '0.2', '-0.0000000000000000002', '-2E-2',
- '156666.458923543', '-156666.458923543', '1', '-1', '-1234', '1234',
- '2E-12', '4E8', '3E-6', '3E-7', '4.1', '1E-1', '1E-2', '1E-3',
- '1E-4', '1E-5', '1E-6', '1E-7', '1E-1', '1E-8', '0.2732E2', '-0.2432E2', '4.35656E2',
- '-02452E-2', '45125E-2',
- '1234.58965E-2', '1.521E+15', '-1E-25', '1E-25', '1254E-25', '-1203E-25',
- '0', '-0.00', '-0', '4585E12', '000000000000000000012', '000000000000.32E12',
- '00000000000000.1E+12', '000000000000.2E-32']
+ test_items = [decimal.Decimal(d) for d in '1500000.00000000000000000000',
+ '-1500000.00000000000000000000', '1500000',
+ '0.0000000000000000002', '0.2', '-0.0000000000000000002', '-2E-2',
+ '156666.458923543', '-156666.458923543', '1', '-1', '-1234', '1234',
+ '2E-12', '4E8', '3E-6', '3E-7', '4.1', '1E-1', '1E-2', '1E-3',
+ '1E-4', '1E-5', '1E-6', '1E-7', '1E-1', '1E-8', '0.2732E2', '-0.2432E2', '4.35656E2',
+ '-02452E-2', '45125E-2',
+ '1234.58965E-2', '1.521E+15', '-1E-25', '1E-25', '1254E-25', '-1203E-25',
+ '0', '-0.00', '-0', '4585E12', '000000000000000000012', '000000000000.32E12',
+ '00000000000000.1E+12', '000000000000.2E-32']
- for value in test_items:
- numeric_table.insert().execute(numericcol=value)
+ for value in test_items:
+ numeric_table.insert().execute(numericcol=value)
- for value in select([numeric_table.c.numericcol]).execute():
- assert value[0] in test_items, "%s not in test_items" % value[0]
-
- except Exception, e:
- raise e
+ for value in select([numeric_table.c.numericcol]).execute():
+ assert value[0] in test_items, "%s not in test_items" % value[0]
def test_float(self):
float_table = Table('float_table', metadata,
@@ -643,11 +719,6 @@ class TypesTest(TestBase):
raise e
-class TypesTest2(TestBase, AssertsExecutionResults):
- "Test Microsoft SQL Server column types"
-
- __only_on__ = 'mssql'
-
def test_money(self):
"Exercise type specification for money types."
@@ -659,13 +730,14 @@ class TypesTest2(TestBase, AssertsExecutionResults):
'SMALLMONEY'),
]
- table_args = ['test_mssql_money', MetaData(testing.db)]
+ table_args = ['test_mssql_money', metadata]
for index, spec in enumerate(columns):
type_, args, kw, res = spec
table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
money_table = Table(*table_args)
- gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+ dialect = mssql.dialect()
+ gen = dialect.ddl_compiler(dialect, schema.CreateTable(money_table))
for col in money_table.c:
index = int(col.name[1:])
@@ -688,15 +760,27 @@ class TypesTest2(TestBase, AssertsExecutionResults):
(mssql.MSDateTime, [], {},
'DATETIME', []),
+ (types.DATE, [], {},
+ 'DATE', ['>=', (10,)]),
+ (types.Date, [], {},
+ 'DATE', ['>=', (10,)]),
+ (types.Date, [], {},
+ 'DATETIME', ['<', (10,)], mssql.MSDateTime),
(mssql.MSDate, [], {},
'DATE', ['>=', (10,)]),
(mssql.MSDate, [], {},
'DATETIME', ['<', (10,)], mssql.MSDateTime),
+ (types.TIME, [], {},
+ 'TIME', ['>=', (10,)]),
+ (types.Time, [], {},
+ 'TIME', ['>=', (10,)]),
(mssql.MSTime, [], {},
'TIME', ['>=', (10,)]),
(mssql.MSTime, [1], {},
'TIME(1)', ['>=', (10,)]),
+ (types.Time, [], {},
+ 'DATETIME', ['<', (10,)], mssql.MSDateTime),
(mssql.MSTime, [], {},
'DATETIME', ['<', (10,)], mssql.MSDateTime),
@@ -715,14 +799,14 @@ class TypesTest2(TestBase, AssertsExecutionResults):
]
- table_args = ['test_mssql_dates', MetaData(testing.db)]
+ table_args = ['test_mssql_dates', metadata]
for index, spec in enumerate(columns):
type_, args, kw, res, requires = spec[0:5]
if (requires and testing._is_excluded('mssql', *requires)) or not requires:
table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
dates_table = Table(*table_args)
- gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+ gen = testing.db.dialect.ddl_compiler(testing.db.dialect, schema.CreateTable(dates_table))
for col in dates_table.c:
index = int(col.name[1:])
@@ -730,49 +814,37 @@ class TypesTest2(TestBase, AssertsExecutionResults):
"%s %s" % (col.name, columns[index][3]))
self.assert_(repr(col))
- try:
- dates_table.create(checkfirst=True)
- assert True
- except:
- raise
+ dates_table.create(checkfirst=True)
reflected_dates = Table('test_mssql_dates', MetaData(testing.db), autoload=True)
for col in reflected_dates.c:
- index = int(col.name[1:])
- testing.eq_(testing.db.dialect.type_descriptor(col.type).__class__,
- len(columns[index]) > 5 and columns[index][5] or columns[index][0])
- dates_table.drop()
-
- def test_dates2(self):
- meta = MetaData(testing.db)
- t = Table('test_dates', meta,
- Column('id', Integer,
- Sequence('datetest_id_seq', optional=True),
- primary_key=True),
- Column('adate', Date),
- Column('atime', Time),
- Column('adatetime', DateTime))
- t.create(checkfirst=True)
- try:
- d1 = datetime.date(2007, 10, 30)
- t1 = datetime.time(11, 2, 32)
- d2 = datetime.datetime(2007, 10, 30, 11, 2, 32)
- t.insert().execute(adate=d1, adatetime=d2, atime=t1)
- t.insert().execute(adate=d2, adatetime=d2, atime=d2)
+ self.assert_types_base(col, dates_table.c[col.key])
- x = t.select().execute().fetchall()[0]
- self.assert_(x.adate.__class__ == datetime.date)
- self.assert_(x.atime.__class__ == datetime.time)
- self.assert_(x.adatetime.__class__ == datetime.datetime)
+ def test_date_roundtrip(self):
+ t = Table('test_dates', metadata,
+ Column('id', Integer,
+ Sequence('datetest_id_seq', optional=True),
+ primary_key=True),
+ Column('adate', Date),
+ Column('atime', Time),
+ Column('adatetime', DateTime))
+ metadata.create_all()
+ d1 = datetime.date(2007, 10, 30)
+ t1 = datetime.time(11, 2, 32)
+ d2 = datetime.datetime(2007, 10, 30, 11, 2, 32)
+ t.insert().execute(adate=d1, adatetime=d2, atime=t1)
+ t.insert().execute(adate=d2, adatetime=d2, atime=d2)
- t.delete().execute()
+ x = t.select().execute().fetchall()[0]
+ self.assert_(x.adate.__class__ == datetime.date)
+ self.assert_(x.atime.__class__ == datetime.time)
+ self.assert_(x.adatetime.__class__ == datetime.datetime)
- t.insert().execute(adate=d1, adatetime=d2, atime=t1)
+ t.delete().execute()
- eq_(select([t.c.adate, t.c.atime, t.c.adatetime], t.c.adate==d1).execute().fetchall(), [(d1, t1, d2)])
+ t.insert().execute(adate=d1, adatetime=d2, atime=t1)
- finally:
- t.drop(checkfirst=True)
+ eq_(select([t.c.adate, t.c.atime, t.c.adatetime], t.c.adate==d1).execute().fetchall(), [(d1, t1, d2)])
def test_binary(self):
"Exercise type specification for binary types."
@@ -781,6 +853,9 @@ class TypesTest2(TestBase, AssertsExecutionResults):
# column type, args, kwargs, expected ddl
(mssql.MSBinary, [], {},
'BINARY'),
+ (types.Binary, [10], {},
+ 'BINARY(10)'),
+
(mssql.MSBinary, [10], {},
'BINARY(10)'),
@@ -798,13 +873,14 @@ class TypesTest2(TestBase, AssertsExecutionResults):
'BINARY(10)')
]
- table_args = ['test_mssql_binary', MetaData(testing.db)]
+ table_args = ['test_mssql_binary', metadata]
for index, spec in enumerate(columns):
type_, args, kw, res = spec
table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
binary_table = Table(*table_args)
- gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+ dialect = mssql.dialect()
+ gen = dialect.ddl_compiler(dialect, schema.CreateTable(binary_table))
for col in binary_table.c:
index = int(col.name[1:])
@@ -812,22 +888,15 @@ class TypesTest2(TestBase, AssertsExecutionResults):
"%s %s" % (col.name, columns[index][3]))
self.assert_(repr(col))
- try:
- binary_table.create(checkfirst=True)
- assert True
- except:
- raise
+ metadata.create_all()
reflected_binary = Table('test_mssql_binary', MetaData(testing.db), autoload=True)
for col in reflected_binary.c:
- # don't test the MSGenericBinary since it's a special case and
- # reflected it will map to a MSImage or MSBinary depending
- if not testing.db.dialect.type_descriptor(binary_table.c[col.name].type).__class__ == mssql.MSGenericBinary:
- testing.eq_(testing.db.dialect.type_descriptor(col.type).__class__,
- testing.db.dialect.type_descriptor(binary_table.c[col.name].type).__class__)
+ c1 =testing.db.dialect.type_descriptor(col.type).__class__
+ c2 =testing.db.dialect.type_descriptor(binary_table.c[col.name].type).__class__
+ assert issubclass(c1, c2), "%r is not a subclass of %r" % (c1, c2)
if binary_table.c[col.name].type.length:
testing.eq_(col.type.length, binary_table.c[col.name].type.length)
- binary_table.drop()
def test_boolean(self):
"Exercise type specification for boolean type."
@@ -838,13 +907,14 @@ class TypesTest2(TestBase, AssertsExecutionResults):
'BIT'),
]
- table_args = ['test_mssql_boolean', MetaData(testing.db)]
+ table_args = ['test_mssql_boolean', metadata]
for index, spec in enumerate(columns):
type_, args, kw, res = spec
table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
boolean_table = Table(*table_args)
- gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+ dialect = mssql.dialect()
+ gen = dialect.ddl_compiler(dialect, schema.CreateTable(boolean_table))
for col in boolean_table.c:
index = int(col.name[1:])
@@ -852,12 +922,7 @@ class TypesTest2(TestBase, AssertsExecutionResults):
"%s %s" % (col.name, columns[index][3]))
self.assert_(repr(col))
- try:
- boolean_table.create(checkfirst=True)
- assert True
- except:
- raise
- boolean_table.drop()
+ metadata.create_all()
def test_numeric(self):
"Exercise type specification and options for numeric types."
@@ -865,40 +930,39 @@ class TypesTest2(TestBase, AssertsExecutionResults):
columns = [
# column type, args, kwargs, expected ddl
(mssql.MSNumeric, [], {},
- 'NUMERIC(10, 2)'),
+ 'NUMERIC'),
(mssql.MSNumeric, [None], {},
'NUMERIC'),
- (mssql.MSNumeric, [12], {},
- 'NUMERIC(12, 2)'),
(mssql.MSNumeric, [12, 4], {},
'NUMERIC(12, 4)'),
- (mssql.MSFloat, [], {},
- 'FLOAT(10)'),
- (mssql.MSFloat, [None], {},
+ (types.Float, [], {},
+ 'FLOAT'),
+ (types.Float, [None], {},
'FLOAT'),
- (mssql.MSFloat, [12], {},
+ (types.Float, [12], {},
'FLOAT(12)'),
(mssql.MSReal, [], {},
'REAL'),
- (mssql.MSInteger, [], {},
+ (types.Integer, [], {},
'INTEGER'),
- (mssql.MSBigInteger, [], {},
+ (types.BigInteger, [], {},
'BIGINT'),
(mssql.MSTinyInteger, [], {},
'TINYINT'),
- (mssql.MSSmallInteger, [], {},
+ (types.SmallInteger, [], {},
'SMALLINT'),
]
- table_args = ['test_mssql_numeric', MetaData(testing.db)]
+ table_args = ['test_mssql_numeric', metadata]
for index, spec in enumerate(columns):
type_, args, kw, res = spec
table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
numeric_table = Table(*table_args)
- gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+ dialect = mssql.dialect()
+ gen = dialect.ddl_compiler(dialect, schema.CreateTable(numeric_table))
for col in numeric_table.c:
index = int(col.name[1:])
@@ -906,20 +970,11 @@ class TypesTest2(TestBase, AssertsExecutionResults):
"%s %s" % (col.name, columns[index][3]))
self.assert_(repr(col))
- try:
- numeric_table.create(checkfirst=True)
- assert True
- except:
- raise
- numeric_table.drop()
+ metadata.create_all()
def test_char(self):
"""Exercise COLLATE-ish options on string types."""
- # modify the text_as_varchar setting since we are not testing that behavior here
- text_as_varchar = testing.db.dialect.text_as_varchar
- testing.db.dialect.text_as_varchar = False
-
columns = [
(mssql.MSChar, [], {},
'CHAR'),
@@ -960,13 +1015,14 @@ class TypesTest2(TestBase, AssertsExecutionResults):
'NTEXT COLLATE Latin1_General_CI_AS'),
]
- table_args = ['test_mssql_charset', MetaData(testing.db)]
+ table_args = ['test_mssql_charset', metadata]
for index, spec in enumerate(columns):
type_, args, kw, res = spec
table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
charset_table = Table(*table_args)
- gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+ dialect = mssql.dialect()
+ gen = dialect.ddl_compiler(dialect, schema.CreateTable(charset_table))
for col in charset_table.c:
index = int(col.name[1:])
@@ -974,110 +1030,91 @@ class TypesTest2(TestBase, AssertsExecutionResults):
"%s %s" % (col.name, columns[index][3]))
self.assert_(repr(col))
- try:
- charset_table.create(checkfirst=True)
- assert True
- except:
- raise
- charset_table.drop()
-
- testing.db.dialect.text_as_varchar = text_as_varchar
+ metadata.create_all()
def test_timestamp(self):
"""Exercise TIMESTAMP column."""
- meta = MetaData(testing.db)
-
- try:
- columns = [
- (TIMESTAMP,
- 'TIMESTAMP'),
- (mssql.MSTimeStamp,
- 'TIMESTAMP'),
- ]
- for idx, (spec, expected) in enumerate(columns):
- t = Table('mssql_ts%s' % idx, meta,
- Column('id', Integer, primary_key=True),
- Column('t', spec, nullable=None))
- testing.eq_(colspec(t.c.t), "t %s" % expected)
- self.assert_(repr(t.c.t))
- try:
- t.create(checkfirst=True)
- assert True
- except:
- raise
- t.drop()
- finally:
- meta.drop_all()
+ dialect = mssql.dialect()
+ spec, expected = (TIMESTAMP,'TIMESTAMP')
+ t = Table('mssql_ts', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('t', spec, nullable=None))
+ gen = dialect.ddl_compiler(dialect, schema.CreateTable(t))
+ testing.eq_(gen.get_column_specification(t.c.t), "t %s" % expected)
+ self.assert_(repr(t.c.t))
+ t.create(checkfirst=True)
+
def test_autoincrement(self):
- meta = MetaData(testing.db)
- try:
- Table('ai_1', meta,
- Column('int_y', Integer, primary_key=True),
- Column('int_n', Integer, DefaultClause('0'),
- primary_key=True))
- Table('ai_2', meta,
- Column('int_y', Integer, primary_key=True),
- Column('int_n', Integer, DefaultClause('0'),
- primary_key=True))
- Table('ai_3', meta,
- Column('int_n', Integer, DefaultClause('0'),
- primary_key=True, autoincrement=False),
- Column('int_y', Integer, primary_key=True))
- Table('ai_4', meta,
- Column('int_n', Integer, DefaultClause('0'),
- primary_key=True, autoincrement=False),
- Column('int_n2', Integer, DefaultClause('0'),
- primary_key=True, autoincrement=False))
- Table('ai_5', meta,
- Column('int_y', Integer, primary_key=True),
- Column('int_n', Integer, DefaultClause('0'),
- primary_key=True, autoincrement=False))
- Table('ai_6', meta,
- Column('o1', String(1), DefaultClause('x'),
- primary_key=True),
- Column('int_y', Integer, primary_key=True))
- Table('ai_7', meta,
- Column('o1', String(1), DefaultClause('x'),
- primary_key=True),
- Column('o2', String(1), DefaultClause('x'),
- primary_key=True),
- Column('int_y', Integer, primary_key=True))
- Table('ai_8', meta,
- Column('o1', String(1), DefaultClause('x'),
- primary_key=True),
- Column('o2', String(1), DefaultClause('x'),
- primary_key=True))
- meta.create_all()
-
- table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4',
- 'ai_5', 'ai_6', 'ai_7', 'ai_8']
- mr = MetaData(testing.db)
- mr.reflect(only=table_names)
-
- for tbl in [mr.tables[name] for name in table_names]:
- for c in tbl.c:
- if c.name.startswith('int_y'):
- assert c.autoincrement
- elif c.name.startswith('int_n'):
- assert not c.autoincrement
- tbl.insert().execute()
+ Table('ai_1', metadata,
+ Column('int_y', Integer, primary_key=True),
+ Column('int_n', Integer, DefaultClause('0'),
+ primary_key=True))
+ Table('ai_2', metadata,
+ Column('int_y', Integer, primary_key=True),
+ Column('int_n', Integer, DefaultClause('0'),
+ primary_key=True))
+ Table('ai_3', metadata,
+ Column('int_n', Integer, DefaultClause('0'),
+ primary_key=True, autoincrement=False),
+ Column('int_y', Integer, primary_key=True))
+ Table('ai_4', metadata,
+ Column('int_n', Integer, DefaultClause('0'),
+ primary_key=True, autoincrement=False),
+ Column('int_n2', Integer, DefaultClause('0'),
+ primary_key=True, autoincrement=False))
+ Table('ai_5', metadata,
+ Column('int_y', Integer, primary_key=True),
+ Column('int_n', Integer, DefaultClause('0'),
+ primary_key=True, autoincrement=False))
+ Table('ai_6', metadata,
+ Column('o1', String(1), DefaultClause('x'),
+ primary_key=True),
+ Column('int_y', Integer, primary_key=True))
+ Table('ai_7', metadata,
+ Column('o1', String(1), DefaultClause('x'),
+ primary_key=True),
+ Column('o2', String(1), DefaultClause('x'),
+ primary_key=True),
+ Column('int_y', Integer, primary_key=True))
+ Table('ai_8', metadata,
+ Column('o1', String(1), DefaultClause('x'),
+ primary_key=True),
+ Column('o2', String(1), DefaultClause('x'),
+ primary_key=True))
+ metadata.create_all()
+
+ table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4',
+ 'ai_5', 'ai_6', 'ai_7', 'ai_8']
+ mr = MetaData(testing.db)
+
+ for name in table_names:
+ tbl = Table(name, mr, autoload=True)
+ for c in tbl.c:
+ if c.name.startswith('int_y'):
+ assert c.autoincrement
+ elif c.name.startswith('int_n'):
+ assert not c.autoincrement
+
+ for counter, engine in enumerate([
+ engines.testing_engine(options={'implicit_returning':False}),
+ engines.testing_engine(options={'implicit_returning':True}),
+ ]
+ ):
+ engine.execute(tbl.insert())
if 'int_y' in tbl.c:
- assert select([tbl.c.int_y]).scalar() == 1
- assert list(tbl.select().execute().fetchone()).count(1) == 1
+ assert engine.scalar(select([tbl.c.int_y])) == counter + 1
+ assert list(engine.execute(tbl.select()).first()).count(counter + 1) == 1
else:
- assert 1 not in list(tbl.select().execute().fetchone())
- finally:
- meta.drop_all()
-
-def colspec(c):
- return testing.db.dialect.schemagenerator(testing.db.dialect,
- testing.db, None, None).get_column_specification(c)
-
+ assert 1 not in list(engine.execute(tbl.select()).first())
+ engine.execute(tbl.delete())
class BinaryTest(TestBase, AssertsExecutionResults):
"""Test the Binary and VarBinary types"""
+
+ __only_on__ = 'mssql'
+
@classmethod
def setup_class(cls):
global binary_table, MyPickleType
@@ -1125,6 +1162,11 @@ class BinaryTest(TestBase, AssertsExecutionResults):
stream2 =self.load_stream('binary_data_two.dat')
binary_table.insert().execute(primary_id=1, misc='binary_data_one.dat', data=stream1, data_image=stream1, data_slice=stream1[0:100], pickled=testobj1, mypickle=testobj3)
binary_table.insert().execute(primary_id=2, misc='binary_data_two.dat', data=stream2, data_image=stream2, data_slice=stream2[0:99], pickled=testobj2)
+
+ # TODO: pyodbc does not seem to accept "None" for a VARBINARY column (data=None).
+ # error: [Microsoft][ODBC SQL Server Driver][SQL Server]Implicit conversion from
+ # data type varchar to varbinary is not allowed. Use the CONVERT function to run this query. (257)
+ #binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data=None, data_image=None, data_slice=stream2[0:99], pickled=None)
binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data_image=None, data_slice=stream2[0:99], pickled=None)
for stmt in (
diff --git a/test/dialect/test_mysql.py b/test/dialect/test_mysql.py
index 8adb2d71c..405264152 100644
--- a/test/dialect/test_mysql.py
+++ b/test/dialect/test_mysql.py
@@ -1,8 +1,12 @@
from sqlalchemy.test.testing import eq_
+
+# Py2K
import sets
+# end Py2K
+
from sqlalchemy import *
from sqlalchemy import sql, exc
-from sqlalchemy.databases import mysql
+from sqlalchemy.dialects.mysql import base as mysql
from sqlalchemy.test.testing import eq_
from sqlalchemy.test import *
@@ -56,11 +60,11 @@ class TypesTest(TestBase, AssertsExecutionResults):
# column type, args, kwargs, expected ddl
# e.g. Column(Integer(10, unsigned=True)) == 'INTEGER(10) UNSIGNED'
(mysql.MSNumeric, [], {},
- 'NUMERIC(10, 2)'),
+ 'NUMERIC'),
(mysql.MSNumeric, [None], {},
'NUMERIC'),
(mysql.MSNumeric, [12], {},
- 'NUMERIC(12, 2)'),
+ 'NUMERIC(12)'),
(mysql.MSNumeric, [12, 4], {'unsigned':True},
'NUMERIC(12, 4) UNSIGNED'),
(mysql.MSNumeric, [12, 4], {'zerofill':True},
@@ -69,11 +73,11 @@ class TypesTest(TestBase, AssertsExecutionResults):
'NUMERIC(12, 4) UNSIGNED ZEROFILL'),
(mysql.MSDecimal, [], {},
- 'DECIMAL(10, 2)'),
+ 'DECIMAL'),
(mysql.MSDecimal, [None], {},
'DECIMAL'),
(mysql.MSDecimal, [12], {},
- 'DECIMAL(12, 2)'),
+ 'DECIMAL(12)'),
(mysql.MSDecimal, [12, None], {},
'DECIMAL(12)'),
(mysql.MSDecimal, [12, 4], {'unsigned':True},
@@ -178,11 +182,11 @@ class TypesTest(TestBase, AssertsExecutionResults):
table_args.append(Column('c%s' % index, type_(*args, **kw)))
numeric_table = Table(*table_args)
- gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+ gen = testing.db.dialect.ddl_compiler(testing.db.dialect, numeric_table)
for col in numeric_table.c:
index = int(col.name[1:])
- self.assert_eq(gen.get_column_specification(col),
+ eq_(gen.get_column_specification(col),
"%s %s" % (col.name, columns[index][3]))
self.assert_(repr(col))
@@ -262,11 +266,11 @@ class TypesTest(TestBase, AssertsExecutionResults):
table_args.append(Column('c%s' % index, type_(*args, **kw)))
charset_table = Table(*table_args)
- gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+ gen = testing.db.dialect.ddl_compiler(testing.db.dialect, charset_table)
for col in charset_table.c:
index = int(col.name[1:])
- self.assert_eq(gen.get_column_specification(col),
+ eq_(gen.get_column_specification(col),
"%s %s" % (col.name, columns[index][3]))
self.assert_(repr(col))
@@ -292,14 +296,14 @@ class TypesTest(TestBase, AssertsExecutionResults):
Column('b7', mysql.MSBit(63)),
Column('b8', mysql.MSBit(64)))
- self.assert_eq(colspec(bit_table.c.b1), 'b1 BIT')
- self.assert_eq(colspec(bit_table.c.b2), 'b2 BIT')
- self.assert_eq(colspec(bit_table.c.b3), 'b3 BIT NOT NULL')
- self.assert_eq(colspec(bit_table.c.b4), 'b4 BIT(1)')
- self.assert_eq(colspec(bit_table.c.b5), 'b5 BIT(8)')
- self.assert_eq(colspec(bit_table.c.b6), 'b6 BIT(32)')
- self.assert_eq(colspec(bit_table.c.b7), 'b7 BIT(63)')
- self.assert_eq(colspec(bit_table.c.b8), 'b8 BIT(64)')
+ eq_(colspec(bit_table.c.b1), 'b1 BIT')
+ eq_(colspec(bit_table.c.b2), 'b2 BIT')
+ eq_(colspec(bit_table.c.b3), 'b3 BIT NOT NULL')
+ eq_(colspec(bit_table.c.b4), 'b4 BIT(1)')
+ eq_(colspec(bit_table.c.b5), 'b5 BIT(8)')
+ eq_(colspec(bit_table.c.b6), 'b6 BIT(32)')
+ eq_(colspec(bit_table.c.b7), 'b7 BIT(63)')
+ eq_(colspec(bit_table.c.b8), 'b8 BIT(64)')
for col in bit_table.c:
self.assert_(repr(col))
@@ -314,7 +318,7 @@ class TypesTest(TestBase, AssertsExecutionResults):
def roundtrip(store, expected=None):
expected = expected or store
table.insert(store).execute()
- row = list(table.select().execute())[0]
+ row = table.select().execute().first()
try:
self.assert_(list(row) == expected)
except:
@@ -322,7 +326,7 @@ class TypesTest(TestBase, AssertsExecutionResults):
print "Expected %s" % expected
print "Found %s" % list(row)
raise
- table.delete().execute()
+ table.delete().execute().close()
roundtrip([0] * 8)
roundtrip([None, None, 0, None, None, None, None, None])
@@ -350,10 +354,10 @@ class TypesTest(TestBase, AssertsExecutionResults):
Column('b3', mysql.MSTinyInteger(1)),
Column('b4', mysql.MSTinyInteger))
- self.assert_eq(colspec(bool_table.c.b1), 'b1 BOOL')
- self.assert_eq(colspec(bool_table.c.b2), 'b2 BOOL')
- self.assert_eq(colspec(bool_table.c.b3), 'b3 TINYINT(1)')
- self.assert_eq(colspec(bool_table.c.b4), 'b4 TINYINT')
+ eq_(colspec(bool_table.c.b1), 'b1 BOOL')
+ eq_(colspec(bool_table.c.b2), 'b2 BOOL')
+ eq_(colspec(bool_table.c.b3), 'b3 TINYINT(1)')
+ eq_(colspec(bool_table.c.b4), 'b4 TINYINT')
for col in bool_table.c:
self.assert_(repr(col))
@@ -364,7 +368,7 @@ class TypesTest(TestBase, AssertsExecutionResults):
def roundtrip(store, expected=None):
expected = expected or store
table.insert(store).execute()
- row = list(table.select().execute())[0]
+ row = table.select().execute().first()
try:
self.assert_(list(row) == expected)
for i, val in enumerate(expected):
@@ -375,7 +379,7 @@ class TypesTest(TestBase, AssertsExecutionResults):
print "Expected %s" % expected
print "Found %s" % list(row)
raise
- table.delete().execute()
+ table.delete().execute().close()
roundtrip([None, None, None, None])
@@ -387,7 +391,7 @@ class TypesTest(TestBase, AssertsExecutionResults):
meta2 = MetaData(testing.db)
# replace with reflected
table = Table('mysql_bool', meta2, autoload=True)
- self.assert_eq(colspec(table.c.b3), 'b3 BOOL')
+ eq_(colspec(table.c.b3), 'b3 BOOL')
roundtrip([None, None, None, None])
roundtrip([True, True, 1, 1], [True, True, True, 1])
@@ -430,7 +434,7 @@ class TypesTest(TestBase, AssertsExecutionResults):
t = Table('mysql_ts%s' % idx, meta,
Column('id', Integer, primary_key=True),
Column('t', *spec))
- self.assert_eq(colspec(t.c.t), "t %s" % expected)
+ eq_(colspec(t.c.t), "t %s" % expected)
self.assert_(repr(t.c.t))
t.create()
r = Table('mysql_ts%s' % idx, MetaData(testing.db),
@@ -460,12 +464,12 @@ class TypesTest(TestBase, AssertsExecutionResults):
for table in year_table, reflected:
table.insert(['1950', '50', None, 50, 1950]).execute()
- row = list(table.select().execute())[0]
- self.assert_eq(list(row), [1950, 2050, None, 50, 1950])
+ row = table.select().execute().first()
+ eq_(list(row), [1950, 2050, None, 50, 1950])
table.delete().execute()
self.assert_(colspec(table.c.y1).startswith('y1 YEAR'))
- self.assert_eq(colspec(table.c.y4), 'y4 YEAR(2)')
- self.assert_eq(colspec(table.c.y5), 'y5 YEAR(4)')
+ eq_(colspec(table.c.y4), 'y4 YEAR(2)')
+ eq_(colspec(table.c.y5), 'y5 YEAR(4)')
finally:
meta.drop_all()
@@ -479,9 +483,9 @@ class TypesTest(TestBase, AssertsExecutionResults):
Column('s2', mysql.MSSet("'a'")),
Column('s3', mysql.MSSet("'5'", "'7'", "'9'")))
- self.assert_eq(colspec(set_table.c.s1), "s1 SET('dq','sq')")
- self.assert_eq(colspec(set_table.c.s2), "s2 SET('a')")
- self.assert_eq(colspec(set_table.c.s3), "s3 SET('5','7','9')")
+ eq_(colspec(set_table.c.s1), "s1 SET('dq','sq')")
+ eq_(colspec(set_table.c.s2), "s2 SET('a')")
+ eq_(colspec(set_table.c.s3), "s3 SET('5','7','9')")
for col in set_table.c:
self.assert_(repr(col))
@@ -494,7 +498,7 @@ class TypesTest(TestBase, AssertsExecutionResults):
def roundtrip(store, expected=None):
expected = expected or store
table.insert(store).execute()
- row = list(table.select().execute())[0]
+ row = table.select().execute().first()
try:
self.assert_(list(row) == expected)
except:
@@ -518,12 +522,12 @@ class TypesTest(TestBase, AssertsExecutionResults):
{'s3':set(['5', '7'])},
{'s3':set(['5', '7', '9'])},
{'s3':set(['7', '9'])})
- rows = list(select(
+ rows = select(
[set_table.c.s3],
- set_table.c.s3.in_([set(['5']), set(['5', '7'])])).execute())
+ set_table.c.s3.in_([set(['5']), set(['5', '7']), set(['7', '5'])])
+ ).execute().fetchall()
found = set([frozenset(row[0]) for row in rows])
- eq_(found,
- set([frozenset(['5']), frozenset(['5', '7'])]))
+ eq_(found, set([frozenset(['5']), frozenset(['5', '7'])]))
finally:
meta.drop_all()
@@ -542,17 +546,17 @@ class TypesTest(TestBase, AssertsExecutionResults):
Column('e6', mysql.MSEnum("'a'", "b")),
)
- self.assert_eq(colspec(enum_table.c.e1),
+ eq_(colspec(enum_table.c.e1),
"e1 ENUM('a','b')")
- self.assert_eq(colspec(enum_table.c.e2),
+ eq_(colspec(enum_table.c.e2),
"e2 ENUM('a','b') NOT NULL")
- self.assert_eq(colspec(enum_table.c.e3),
+ eq_(colspec(enum_table.c.e3),
"e3 ENUM('a','b')")
- self.assert_eq(colspec(enum_table.c.e4),
+ eq_(colspec(enum_table.c.e4),
"e4 ENUM('a','b') NOT NULL")
- self.assert_eq(colspec(enum_table.c.e5),
+ eq_(colspec(enum_table.c.e5),
"e5 ENUM('a','b')")
- self.assert_eq(colspec(enum_table.c.e6),
+ eq_(colspec(enum_table.c.e6),
"e6 ENUM('''a''','b')")
enum_table.drop(checkfirst=True)
enum_table.create()
@@ -585,8 +589,9 @@ class TypesTest(TestBase, AssertsExecutionResults):
# This is known to fail with MySQLDB 1.2.2 beta versions
# which return these as sets.Set(['a']), sets.Set(['b'])
# (even on Pythons with __builtin__.set)
- if testing.db.dialect.dbapi.version_info < (1, 2, 2, 'beta', 3) and \
- testing.db.dialect.dbapi.version_info >= (1, 2, 2):
+ if (not testing.against('+zxjdbc') and
+ testing.db.dialect.dbapi.version_info < (1, 2, 2, 'beta', 3) and
+ testing.db.dialect.dbapi.version_info >= (1, 2, 2)):
# these mysqldb seem to always uses 'sets', even on later pythons
import sets
def convert(value):
@@ -602,7 +607,7 @@ class TypesTest(TestBase, AssertsExecutionResults):
e.append(tuple([convert(c) for c in row]))
expected = e
- self.assert_eq(res, expected)
+ eq_(res, expected)
enum_table.drop()
@testing.exclude('mysql', '<', (4,), "3.23 can't handle an ENUM of ''")
@@ -637,25 +642,52 @@ class TypesTest(TestBase, AssertsExecutionResults):
finally:
enum_table.drop()
+
+
+class ReflectionTest(TestBase, AssertsExecutionResults):
+
+ __only_on__ = 'mysql'
+
def test_default_reflection(self):
"""Test reflection of column defaults."""
def_table = Table('mysql_def', MetaData(testing.db),
Column('c1', String(10), DefaultClause('')),
Column('c2', String(10), DefaultClause('0')),
- Column('c3', String(10), DefaultClause('abc')))
+ Column('c3', String(10), DefaultClause('abc')),
+ Column('c4', TIMESTAMP, DefaultClause('2009-04-05 12:00:00')),
+ Column('c5', TIMESTAMP, ),
+
+ )
+ def_table.create()
try:
- def_table.create()
reflected = Table('mysql_def', MetaData(testing.db),
- autoload=True)
- for t in def_table, reflected:
- assert t.c.c1.server_default.arg == ''
- assert t.c.c2.server_default.arg == '0'
- assert t.c.c3.server_default.arg == 'abc'
+ autoload=True)
finally:
def_table.drop()
+
+ assert def_table.c.c1.server_default.arg == ''
+ assert def_table.c.c2.server_default.arg == '0'
+ assert def_table.c.c3.server_default.arg == 'abc'
+ assert def_table.c.c4.server_default.arg == '2009-04-05 12:00:00'
+
+ assert str(reflected.c.c1.server_default.arg) == "''"
+ assert str(reflected.c.c2.server_default.arg) == "'0'"
+ assert str(reflected.c.c3.server_default.arg) == "'abc'"
+ assert str(reflected.c.c4.server_default.arg) == "'2009-04-05 12:00:00'"
+
+ reflected.create()
+ try:
+ reflected2 = Table('mysql_def', MetaData(testing.db), autoload=True)
+ finally:
+ reflected.drop()
+ assert str(reflected2.c.c1.server_default.arg) == "''"
+ assert str(reflected2.c.c2.server_default.arg) == "'0'"
+ assert str(reflected2.c.c3.server_default.arg) == "'abc'"
+ assert str(reflected2.c.c4.server_default.arg) == "'2009-04-05 12:00:00'"
+
def test_reflection_on_include_columns(self):
"""Test reflection of include_columns to be sure they respect case."""
@@ -700,8 +732,8 @@ class TypesTest(TestBase, AssertsExecutionResults):
( mysql.MSSmallInteger(4), mysql.MSSmallInteger(4), ),
( mysql.MSMediumInteger(), mysql.MSMediumInteger(), ),
( mysql.MSMediumInteger(8), mysql.MSMediumInteger(8), ),
- ( Binary(3), mysql.MSBlob(3), ),
- ( Binary(), mysql.MSBlob() ),
+ ( Binary(3), mysql.TINYBLOB(), ),
+ ( Binary(), mysql.BLOB() ),
( mysql.MSBinary(3), mysql.MSBinary(3), ),
( mysql.MSVarBinary(3),),
( mysql.MSVarBinary(), mysql.MSBlob()),
@@ -734,14 +766,15 @@ class TypesTest(TestBase, AssertsExecutionResults):
# in a view, e.g. char -> varchar, tinyblob -> mediumblob
#
# Not sure exactly which point version has the fix.
- if db.dialect.server_version_info(db.connect()) < (5, 0, 11):
+ if db.dialect.server_version_info < (5, 0, 11):
tables = rt,
else:
tables = rt, rv
for table in tables:
for i, reflected in enumerate(table.c):
- assert isinstance(reflected.type, type(expected[i]))
+ assert isinstance(reflected.type, type(expected[i])), \
+ "element %d: %r not instance of %r" % (i, reflected.type, type(expected[i]))
finally:
db.execute('DROP VIEW mysql_types_v')
finally:
@@ -802,17 +835,12 @@ class TypesTest(TestBase, AssertsExecutionResults):
tbl.insert().execute()
if 'int_y' in tbl.c:
assert select([tbl.c.int_y]).scalar() == 1
- assert list(tbl.select().execute().fetchone()).count(1) == 1
+ assert list(tbl.select().execute().first()).count(1) == 1
else:
- assert 1 not in list(tbl.select().execute().fetchone())
+ assert 1 not in list(tbl.select().execute().first())
finally:
meta.drop_all()
- def assert_eq(self, got, wanted):
- if got != wanted:
- print "Expected %s" % wanted
- print "Found %s" % got
- eq_(got, wanted)
class SQLTest(TestBase, AssertsCompiledSQL):
@@ -909,11 +937,11 @@ class SQLTest(TestBase, AssertsCompiledSQL):
(m.MSBit, "t.col"),
# this is kind of sucky. thank you default arguments!
- (NUMERIC, "CAST(t.col AS DECIMAL(10, 2))"),
- (DECIMAL, "CAST(t.col AS DECIMAL(10, 2))"),
- (Numeric, "CAST(t.col AS DECIMAL(10, 2))"),
- (m.MSNumeric, "CAST(t.col AS DECIMAL(10, 2))"),
- (m.MSDecimal, "CAST(t.col AS DECIMAL(10, 2))"),
+ (NUMERIC, "CAST(t.col AS DECIMAL)"),
+ (DECIMAL, "CAST(t.col AS DECIMAL)"),
+ (Numeric, "CAST(t.col AS DECIMAL)"),
+ (m.MSNumeric, "CAST(t.col AS DECIMAL)"),
+ (m.MSDecimal, "CAST(t.col AS DECIMAL)"),
(FLOAT, "t.col"),
(Float, "t.col"),
@@ -928,8 +956,8 @@ class SQLTest(TestBase, AssertsCompiledSQL):
(DateTime, "CAST(t.col AS DATETIME)"),
(Date, "CAST(t.col AS DATE)"),
(Time, "CAST(t.col AS TIME)"),
- (m.MSDateTime, "CAST(t.col AS DATETIME)"),
- (m.MSDate, "CAST(t.col AS DATE)"),
+ (DateTime, "CAST(t.col AS DATETIME)"),
+ (Date, "CAST(t.col AS DATE)"),
(m.MSTime, "CAST(t.col AS TIME)"),
(m.MSTimeStamp, "CAST(t.col AS DATETIME)"),
(m.MSYear, "t.col"),
@@ -998,12 +1026,11 @@ class SQLTest(TestBase, AssertsCompiledSQL):
class RawReflectionTest(TestBase):
def setup(self):
- self.dialect = mysql.dialect()
- self.reflector = mysql.MySQLSchemaReflector(
- self.dialect.identifier_preparer)
+ dialect = mysql.dialect()
+ self.parser = mysql.MySQLTableDefinitionParser(dialect, dialect.identifier_preparer)
def test_key_reflection(self):
- regex = self.reflector._re_key
+ regex = self.parser._re_key
assert regex.match(' PRIMARY KEY (`id`),')
assert regex.match(' PRIMARY KEY USING BTREE (`id`),')
@@ -1023,37 +1050,11 @@ class ExecutionTest(TestBase):
cx = engine.connect()
meta = MetaData()
-
- assert ('mysql', 'charset') not in cx.info
- assert ('mysql', 'force_charset') not in cx.info
-
- cx.execute(text("SELECT 1")).fetchall()
- assert ('mysql', 'charset') not in cx.info
-
- meta.reflect(cx)
- assert ('mysql', 'charset') in cx.info
-
- cx.execute(text("SET @squiznart=123"))
- assert ('mysql', 'charset') in cx.info
-
- # the charset invalidation is very conservative
- cx.execute(text("SET TIMESTAMP = DEFAULT"))
- assert ('mysql', 'charset') not in cx.info
-
- cx.info[('mysql', 'force_charset')] = 'latin1'
-
- assert engine.dialect._detect_charset(cx) == 'latin1'
- assert cx.info[('mysql', 'charset')] == 'latin1'
-
- del cx.info[('mysql', 'force_charset')]
- del cx.info[('mysql', 'charset')]
+ charset = engine.dialect._detect_charset(cx)
meta.reflect(cx)
- assert ('mysql', 'charset') in cx.info
-
- # String execution doesn't go through the detector.
- cx.execute("SET TIMESTAMP = DEFAULT")
- assert ('mysql', 'charset') in cx.info
+ eq_(cx.dialect._connection_charset, charset)
+ cx.close()
class MatchTest(TestBase, AssertsCompiledSQL):
@@ -1102,9 +1103,10 @@ class MatchTest(TestBase, AssertsCompiledSQL):
metadata.drop_all()
def test_expression(self):
+ format = testing.db.dialect.paramstyle == 'format' and '%s' or '?'
self.assert_compile(
matchtable.c.title.match('somstr'),
- "MATCH (matchtable.title) AGAINST (%s IN BOOLEAN MODE)")
+ "MATCH (matchtable.title) AGAINST (%s IN BOOLEAN MODE)" % format)
def test_simple_match(self):
results = (matchtable.select().
@@ -1162,6 +1164,5 @@ class MatchTest(TestBase, AssertsCompiledSQL):
def colspec(c):
- return testing.db.dialect.schemagenerator(testing.db.dialect,
- testing.db, None, None).get_column_specification(c)
+ return testing.db.dialect.ddl_compiler(testing.db.dialect, c.table).get_column_specification(c)
diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py
index d9d64806e..53e0f9ec2 100644
--- a/test/dialect/test_oracle.py
+++ b/test/dialect/test_oracle.py
@@ -2,12 +2,14 @@
from sqlalchemy.test.testing import eq_
from sqlalchemy import *
+from sqlalchemy import types as sqltypes
from sqlalchemy.sql import table, column
-from sqlalchemy.databases import oracle
from sqlalchemy.test import *
from sqlalchemy.test.testing import eq_
from sqlalchemy.test.engines import testing_engine
+from sqlalchemy.dialects.oracle import cx_oracle, base as oracle
from sqlalchemy.engine import default
+from sqlalchemy.util import jython
import os
@@ -43,10 +45,10 @@ class CompileTest(TestBase, AssertsCompiledSQL):
meta = MetaData()
parent = Table('parent', meta, Column('id', Integer, primary_key=True),
Column('name', String(50)),
- owner='ed')
+ schema='ed')
child = Table('child', meta, Column('id', Integer, primary_key=True),
Column('parent_id', Integer, ForeignKey('ed.parent.id')),
- owner = 'ed')
+ schema = 'ed')
self.assert_compile(parent.join(child), "ed.parent JOIN ed.child ON ed.parent.id = ed.child.parent_id")
@@ -342,6 +344,25 @@ class TypesTest(TestBase, AssertsCompiledSQL):
b = bindparam("foo", u"hello world!")
assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING'
+ def test_type_adapt(self):
+ dialect = cx_oracle.dialect()
+
+ for start, test in [
+ (DateTime(), cx_oracle._OracleDateTime),
+ (TIMESTAMP(), cx_oracle._OracleTimestamp),
+ (oracle.OracleRaw(), cx_oracle._OracleRaw),
+ (String(), String),
+ (VARCHAR(), VARCHAR),
+ (String(50), String),
+ (Unicode(), Unicode),
+ (Text(), cx_oracle._OracleText),
+ (UnicodeText(), cx_oracle._OracleUnicodeText),
+ (NCHAR(), NCHAR),
+ (oracle.RAW(50), cx_oracle._OracleRaw),
+ ]:
+ assert isinstance(start.dialect_impl(dialect), test), "wanted %r got %r" % (test, start.dialect_impl(dialect))
+
+
def test_reflect_raw(self):
types_table = Table(
'all_types', MetaData(testing.db),
@@ -354,16 +375,16 @@ class TypesTest(TestBase, AssertsCompiledSQL):
def test_reflect_nvarchar(self):
metadata = MetaData(testing.db)
t = Table('t', metadata,
- Column('data', oracle.OracleNVarchar(255))
+ Column('data', sqltypes.NVARCHAR(255))
)
metadata.create_all()
try:
m2 = MetaData(testing.db)
t2 = Table('t', m2, autoload=True)
- assert isinstance(t2.c.data.type, oracle.OracleNVarchar)
+ assert isinstance(t2.c.data.type, sqltypes.NVARCHAR)
data = u'm’a réveillé.'
t2.insert().execute(data=data)
- eq_(t2.select().execute().fetchone()['data'], data)
+ eq_(t2.select().execute().first()['data'], data)
finally:
metadata.drop_all()
@@ -391,7 +412,7 @@ class TypesTest(TestBase, AssertsCompiledSQL):
t.create(engine)
try:
engine.execute(t.insert(), id=1, data='this is text', bindata='this is binary')
- row = engine.execute(t.select()).fetchone()
+ row = engine.execute(t.select()).first()
eq_(row['data'].read(), 'this is text')
eq_(row['bindata'].read(), 'this is binary')
finally:
@@ -408,7 +429,6 @@ class BufferedColumnTest(TestBase, AssertsCompiledSQL):
Column('data', Binary)
)
meta.create_all()
-
stream = os.path.join(os.path.dirname(__file__), "..", 'binary_data_one.dat')
stream = file(stream).read(12000)
@@ -420,17 +440,18 @@ class BufferedColumnTest(TestBase, AssertsCompiledSQL):
meta.drop_all()
def test_fetch(self):
- eq_(
- binary_table.select().execute().fetchall() ,
- [(i, stream) for i in range(1, 11)],
- )
+ result = binary_table.select().execute().fetchall()
+ if jython:
+ result = [(i, value.tostring()) for i, value in result]
+ eq_(result, [(i, stream) for i in range(1, 11)])
+ @testing.fails_on('+zxjdbc', 'FIXME: zxjdbc should support this')
def test_fetch_single_arraysize(self):
eng = testing_engine(options={'arraysize':1})
- eq_(
- eng.execute(binary_table.select()).fetchall(),
- [(i, stream) for i in range(1, 11)],
- )
+ result = eng.execute(binary_table.select()).fetchall()
+ if jython:
+ result = [(i, value.tostring()) for i, value in result]
+ eq_(result, [(i, stream) for i in range(1, 11)])
class SequenceTest(TestBase, AssertsCompiledSQL):
def test_basic(self):
diff --git a/test/dialect/test_postgres.py b/test/dialect/test_postgresql.py
index 8ca714bad..e1c351a93 100644
--- a/test/dialect/test_postgres.py
+++ b/test/dialect/test_postgresql.py
@@ -1,18 +1,19 @@
from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message
+from sqlalchemy.test import engines
import datetime
from sqlalchemy import *
from sqlalchemy.orm import *
-from sqlalchemy import exc
-from sqlalchemy.databases import postgres
+from sqlalchemy import exc, schema
+from sqlalchemy.dialects.postgresql import base as postgresql
from sqlalchemy.engine.strategies import MockEngineStrategy
from sqlalchemy.test import *
from sqlalchemy.sql import table, column
-
+from sqlalchemy.test.testing import eq_
class SequenceTest(TestBase, AssertsCompiledSQL):
def test_basic(self):
seq = Sequence("my_seq_no_schema")
- dialect = postgres.PGDialect()
+ dialect = postgresql.PGDialect()
assert dialect.identifier_preparer.format_sequence(seq) == "my_seq_no_schema"
seq = Sequence("my_seq", schema="some_schema")
@@ -22,43 +23,77 @@ class SequenceTest(TestBase, AssertsCompiledSQL):
assert dialect.identifier_preparer.format_sequence(seq) == '"Some_Schema"."My_Seq"'
class CompileTest(TestBase, AssertsCompiledSQL):
- __dialect__ = postgres.dialect()
+ __dialect__ = postgresql.dialect()
def test_update_returning(self):
- dialect = postgres.dialect()
+ dialect = postgresql.dialect()
table1 = table('mytable',
column('myid', Integer),
column('name', String(128)),
column('description', String(128)),
)
- u = update(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name])
+ u = update(table1, values=dict(name='foo')).returning(table1.c.myid, table1.c.name)
self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING mytable.myid, mytable.name", dialect=dialect)
- u = update(table1, values=dict(name='foo'), postgres_returning=[table1])
+ u = update(table1, values=dict(name='foo')).returning(table1)
self.assert_compile(u, "UPDATE mytable SET name=%(name)s "\
"RETURNING mytable.myid, mytable.name, mytable.description", dialect=dialect)
- u = update(table1, values=dict(name='foo'), postgres_returning=[func.length(table1.c.name)])
- self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING length(mytable.name)", dialect=dialect)
+ u = update(table1, values=dict(name='foo')).returning(func.length(table1.c.name))
+ self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING length(mytable.name) AS length_1", dialect=dialect)
+
def test_insert_returning(self):
- dialect = postgres.dialect()
+ dialect = postgresql.dialect()
table1 = table('mytable',
column('myid', Integer),
column('name', String(128)),
column('description', String(128)),
)
- i = insert(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name])
+ i = insert(table1, values=dict(name='foo')).returning(table1.c.myid, table1.c.name)
self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING mytable.myid, mytable.name", dialect=dialect)
- i = insert(table1, values=dict(name='foo'), postgres_returning=[table1])
+ i = insert(table1, values=dict(name='foo')).returning(table1)
self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) "\
"RETURNING mytable.myid, mytable.name, mytable.description", dialect=dialect)
- i = insert(table1, values=dict(name='foo'), postgres_returning=[func.length(table1.c.name)])
- self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING length(mytable.name)", dialect=dialect)
+ i = insert(table1, values=dict(name='foo')).returning(func.length(table1.c.name))
+ self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING length(mytable.name) AS length_1", dialect=dialect)
+
+ @testing.uses_deprecated(r".*argument is deprecated. Please use statement.returning.*")
+ def test_old_returning_names(self):
+ dialect = postgresql.dialect()
+ table1 = table('mytable',
+ column('myid', Integer),
+ column('name', String(128)),
+ column('description', String(128)),
+ )
+
+ u = update(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name])
+ self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING mytable.myid, mytable.name", dialect=dialect)
+
+ u = update(table1, values=dict(name='foo'), postgresql_returning=[table1.c.myid, table1.c.name])
+ self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING mytable.myid, mytable.name", dialect=dialect)
+
+ i = insert(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name])
+ self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING mytable.myid, mytable.name", dialect=dialect)
+
+ def test_create_partial_index(self):
+ tbl = Table('testtbl', MetaData(), Column('data',Integer))
+ idx = Index('test_idx1', tbl.c.data, postgresql_where=and_(tbl.c.data > 5, tbl.c.data < 10))
+
+ self.assert_compile(schema.CreateIndex(idx),
+ "CREATE INDEX test_idx1 ON testtbl (data) WHERE testtbl.data > 5 AND testtbl.data < 10", dialect=postgresql.dialect())
+
+ @testing.uses_deprecated(r".*'postgres_where' argument has been renamed.*")
+ def test_old_create_partial_index(self):
+ tbl = Table('testtbl', MetaData(), Column('data',Integer))
+ idx = Index('test_idx1', tbl.c.data, postgres_where=and_(tbl.c.data > 5, tbl.c.data < 10))
+
+ self.assert_compile(schema.CreateIndex(idx),
+ "CREATE INDEX test_idx1 ON testtbl (data) WHERE testtbl.data > 5 AND testtbl.data < 10", dialect=postgresql.dialect())
def test_extract(self):
t = table('t', column('col1'))
@@ -70,72 +105,20 @@ class CompileTest(TestBase, AssertsCompiledSQL):
"FROM t" % field)
-class ReturningTest(TestBase, AssertsExecutionResults):
- __only_on__ = 'postgres'
-
- @testing.exclude('postgres', '<', (8, 2), '8.3+ feature')
- def test_update_returning(self):
- meta = MetaData(testing.db)
- table = Table('tables', meta,
- Column('id', Integer, primary_key=True),
- Column('persons', Integer),
- Column('full', Boolean)
- )
- table.create()
- try:
- table.insert().execute([{'persons': 5, 'full': False}, {'persons': 3, 'full': False}])
-
- result = table.update(table.c.persons > 4, dict(full=True), postgres_returning=[table.c.id]).execute()
- eq_(result.fetchall(), [(1,)])
-
- result2 = select([table.c.id, table.c.full]).order_by(table.c.id).execute()
- eq_(result2.fetchall(), [(1,True),(2,False)])
- finally:
- table.drop()
-
- @testing.exclude('postgres', '<', (8, 2), '8.3+ feature')
- def test_insert_returning(self):
- meta = MetaData(testing.db)
- table = Table('tables', meta,
- Column('id', Integer, primary_key=True),
- Column('persons', Integer),
- Column('full', Boolean)
- )
- table.create()
- try:
- result = table.insert(postgres_returning=[table.c.id]).execute({'persons': 1, 'full': False})
-
- eq_(result.fetchall(), [(1,)])
-
- @testing.fails_on('postgres', 'Known limitation of psycopg2')
- def test_executemany():
- # return value is documented as failing with psycopg2/executemany
- result2 = table.insert(postgres_returning=[table]).execute(
- [{'persons': 2, 'full': False}, {'persons': 3, 'full': True}])
- eq_(result2.fetchall(), [(2, 2, False), (3,3,True)])
-
- test_executemany()
-
- result3 = table.insert(postgres_returning=[(table.c.id*2).label('double_id')]).execute({'persons': 4, 'full': False})
- eq_([dict(row) for row in result3], [{'double_id':8}])
-
- result4 = testing.db.execute('insert into tables (id, persons, "full") values (5, 10, true) returning persons')
- eq_([dict(row) for row in result4], [{'persons': 10}])
- finally:
- table.drop()
-
-
class InsertTest(TestBase, AssertsExecutionResults):
- __only_on__ = 'postgres'
+ __only_on__ = 'postgresql'
@classmethod
def setup_class(cls):
global metadata
+ cls.engine= testing.db
metadata = MetaData(testing.db)
def teardown(self):
metadata.drop_all()
metadata.tables.clear()
+ if self.engine is not testing.db:
+ self.engine.dispose()
def test_compiled_insert(self):
table = Table('testtable', metadata,
@@ -144,7 +127,7 @@ class InsertTest(TestBase, AssertsExecutionResults):
metadata.create_all()
- ins = table.insert(values={'data':bindparam('x')}).compile()
+ ins = table.insert(inline=True, values={'data':bindparam('x')}).compile()
ins.execute({'x':"five"}, {'x':"seven"})
assert table.select().execute().fetchall() == [(1, 'five'), (2, 'seven')]
@@ -155,6 +138,13 @@ class InsertTest(TestBase, AssertsExecutionResults):
metadata.create_all()
self._assert_data_with_sequence(table, "my_seq")
+ def test_sequence_returning_insert(self):
+ table = Table('testtable', metadata,
+ Column('id', Integer, Sequence('my_seq'), primary_key=True),
+ Column('data', String(30)))
+ metadata.create_all()
+ self._assert_data_with_sequence_returning(table, "my_seq")
+
def test_opt_sequence_insert(self):
table = Table('testtable', metadata,
Column('id', Integer, Sequence('my_seq', optional=True), primary_key=True),
@@ -162,6 +152,13 @@ class InsertTest(TestBase, AssertsExecutionResults):
metadata.create_all()
self._assert_data_autoincrement(table)
+ def test_opt_sequence_returning_insert(self):
+ table = Table('testtable', metadata,
+ Column('id', Integer, Sequence('my_seq', optional=True), primary_key=True),
+ Column('data', String(30)))
+ metadata.create_all()
+ self._assert_data_autoincrement_returning(table)
+
def test_autoincrement_insert(self):
table = Table('testtable', metadata,
Column('id', Integer, primary_key=True),
@@ -169,6 +166,13 @@ class InsertTest(TestBase, AssertsExecutionResults):
metadata.create_all()
self._assert_data_autoincrement(table)
+ def test_autoincrement_returning_insert(self):
+ table = Table('testtable', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(30)))
+ metadata.create_all()
+ self._assert_data_autoincrement_returning(table)
+
def test_noautoincrement_insert(self):
table = Table('testtable', metadata,
Column('id', Integer, primary_key=True, autoincrement=False),
@@ -177,14 +181,17 @@ class InsertTest(TestBase, AssertsExecutionResults):
self._assert_data_noautoincrement(table)
def _assert_data_autoincrement(self, table):
+ self.engine = engines.testing_engine(options={'implicit_returning':False})
+ metadata.bind = self.engine
+
def go():
# execute with explicit id
r = table.insert().execute({'id':30, 'data':'d1'})
- assert r.last_inserted_ids() == [30]
+ assert r.inserted_primary_key == [30]
# execute with prefetch id
r = table.insert().execute({'data':'d2'})
- assert r.last_inserted_ids() == [1]
+ assert r.inserted_primary_key == [1]
# executemany with explicit ids
table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
@@ -201,7 +208,7 @@ class InsertTest(TestBase, AssertsExecutionResults):
# note that the test framework doesnt capture the "preexecute" of a seqeuence
# or default. we just see it in the bind params.
- self.assert_sql(testing.db, go, [], with_sequences=[
+ self.assert_sql(self.engine, go, [], with_sequences=[
(
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
{'id':30, 'data':'d1'}
@@ -242,19 +249,19 @@ class InsertTest(TestBase, AssertsExecutionResults):
# test the same series of events using a reflected
# version of the table
- m2 = MetaData(testing.db)
+ m2 = MetaData(self.engine)
table = Table(table.name, m2, autoload=True)
def go():
table.insert().execute({'id':30, 'data':'d1'})
r = table.insert().execute({'data':'d2'})
- assert r.last_inserted_ids() == [5]
+ assert r.inserted_primary_key == [5]
table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
table.insert().execute({'data':'d5'}, {'data':'d6'})
table.insert(inline=True).execute({'id':33, 'data':'d7'})
table.insert(inline=True).execute({'data':'d8'})
- self.assert_sql(testing.db, go, [], with_sequences=[
+ self.assert_sql(self.engine, go, [], with_sequences=[
(
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
{'id':30, 'data':'d1'}
@@ -293,7 +300,127 @@ class InsertTest(TestBase, AssertsExecutionResults):
]
table.delete().execute()
+ def _assert_data_autoincrement_returning(self, table):
+ self.engine = engines.testing_engine(options={'implicit_returning':True})
+ metadata.bind = self.engine
+
+ def go():
+ # execute with explicit id
+ r = table.insert().execute({'id':30, 'data':'d1'})
+ assert r.inserted_primary_key == [30]
+
+ # execute with prefetch id
+ r = table.insert().execute({'data':'d2'})
+ assert r.inserted_primary_key == [1]
+
+ # executemany with explicit ids
+ table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
+
+ # executemany, uses SERIAL
+ table.insert().execute({'data':'d5'}, {'data':'d6'})
+
+ # single execute, explicit id, inline
+ table.insert(inline=True).execute({'id':33, 'data':'d7'})
+
+ # single execute, inline, uses SERIAL
+ table.insert(inline=True).execute({'data':'d8'})
+
+ self.assert_sql(self.engine, go, [], with_sequences=[
+ (
+ "INSERT INTO testtable (id, data) VALUES (:id, :data)",
+ {'id':30, 'data':'d1'}
+ ),
+ (
+ "INSERT INTO testtable (data) VALUES (:data) RETURNING testtable.id",
+ {'data': 'd2'}
+ ),
+ (
+ "INSERT INTO testtable (id, data) VALUES (:id, :data)",
+ [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
+ ),
+ (
+ "INSERT INTO testtable (data) VALUES (:data)",
+ [{'data':'d5'}, {'data':'d6'}]
+ ),
+ (
+ "INSERT INTO testtable (id, data) VALUES (:id, :data)",
+ [{'id':33, 'data':'d7'}]
+ ),
+ (
+ "INSERT INTO testtable (data) VALUES (:data)",
+ [{'data':'d8'}]
+ ),
+ ])
+
+ assert table.select().execute().fetchall() == [
+ (30, 'd1'),
+ (1, 'd2'),
+ (31, 'd3'),
+ (32, 'd4'),
+ (2, 'd5'),
+ (3, 'd6'),
+ (33, 'd7'),
+ (4, 'd8'),
+ ]
+ table.delete().execute()
+
+ # test the same series of events using a reflected
+ # version of the table
+ m2 = MetaData(self.engine)
+ table = Table(table.name, m2, autoload=True)
+
+ def go():
+ table.insert().execute({'id':30, 'data':'d1'})
+ r = table.insert().execute({'data':'d2'})
+ assert r.inserted_primary_key == [5]
+ table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
+ table.insert().execute({'data':'d5'}, {'data':'d6'})
+ table.insert(inline=True).execute({'id':33, 'data':'d7'})
+ table.insert(inline=True).execute({'data':'d8'})
+
+ self.assert_sql(self.engine, go, [], with_sequences=[
+ (
+ "INSERT INTO testtable (id, data) VALUES (:id, :data)",
+ {'id':30, 'data':'d1'}
+ ),
+ (
+ "INSERT INTO testtable (data) VALUES (:data) RETURNING testtable.id",
+ {'data':'d2'}
+ ),
+ (
+ "INSERT INTO testtable (id, data) VALUES (:id, :data)",
+ [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
+ ),
+ (
+ "INSERT INTO testtable (data) VALUES (:data)",
+ [{'data':'d5'}, {'data':'d6'}]
+ ),
+ (
+ "INSERT INTO testtable (id, data) VALUES (:id, :data)",
+ [{'id':33, 'data':'d7'}]
+ ),
+ (
+ "INSERT INTO testtable (data) VALUES (:data)",
+ [{'data':'d8'}]
+ ),
+ ])
+
+ assert table.select().execute().fetchall() == [
+ (30, 'd1'),
+ (5, 'd2'),
+ (31, 'd3'),
+ (32, 'd4'),
+ (6, 'd5'),
+ (7, 'd6'),
+ (33, 'd7'),
+ (8, 'd8'),
+ ]
+ table.delete().execute()
+
def _assert_data_with_sequence(self, table, seqname):
+ self.engine = engines.testing_engine(options={'implicit_returning':False})
+ metadata.bind = self.engine
+
def go():
table.insert().execute({'id':30, 'data':'d1'})
table.insert().execute({'data':'d2'})
@@ -302,7 +429,7 @@ class InsertTest(TestBase, AssertsExecutionResults):
table.insert(inline=True).execute({'id':33, 'data':'d7'})
table.insert(inline=True).execute({'data':'d8'})
- self.assert_sql(testing.db, go, [], with_sequences=[
+ self.assert_sql(self.engine, go, [], with_sequences=[
(
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
{'id':30, 'data':'d1'}
@@ -343,18 +470,76 @@ class InsertTest(TestBase, AssertsExecutionResults):
# cant test reflection here since the Sequence must be
# explicitly specified
+ def _assert_data_with_sequence_returning(self, table, seqname):
+ self.engine = engines.testing_engine(options={'implicit_returning':True})
+ metadata.bind = self.engine
+
+ def go():
+ table.insert().execute({'id':30, 'data':'d1'})
+ table.insert().execute({'data':'d2'})
+ table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
+ table.insert().execute({'data':'d5'}, {'data':'d6'})
+ table.insert(inline=True).execute({'id':33, 'data':'d7'})
+ table.insert(inline=True).execute({'data':'d8'})
+
+ self.assert_sql(self.engine, go, [], with_sequences=[
+ (
+ "INSERT INTO testtable (id, data) VALUES (:id, :data)",
+ {'id':30, 'data':'d1'}
+ ),
+ (
+ "INSERT INTO testtable (id, data) VALUES (nextval('my_seq'), :data) RETURNING testtable.id",
+ {'data':'d2'}
+ ),
+ (
+ "INSERT INTO testtable (id, data) VALUES (:id, :data)",
+ [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
+ ),
+ (
+ "INSERT INTO testtable (id, data) VALUES (nextval('%s'), :data)" % seqname,
+ [{'data':'d5'}, {'data':'d6'}]
+ ),
+ (
+ "INSERT INTO testtable (id, data) VALUES (:id, :data)",
+ [{'id':33, 'data':'d7'}]
+ ),
+ (
+ "INSERT INTO testtable (id, data) VALUES (nextval('%s'), :data)" % seqname,
+ [{'data':'d8'}]
+ ),
+ ])
+
+ assert table.select().execute().fetchall() == [
+ (30, 'd1'),
+ (1, 'd2'),
+ (31, 'd3'),
+ (32, 'd4'),
+ (2, 'd5'),
+ (3, 'd6'),
+ (33, 'd7'),
+ (4, 'd8'),
+ ]
+
+ # cant test reflection here since the Sequence must be
+ # explicitly specified
+
def _assert_data_noautoincrement(self, table):
+ self.engine = engines.testing_engine(options={'implicit_returning':False})
+ metadata.bind = self.engine
+
table.insert().execute({'id':30, 'data':'d1'})
- try:
- table.insert().execute({'data':'d2'})
- assert False
- except exc.IntegrityError, e:
- assert "violates not-null constraint" in str(e)
- try:
- table.insert().execute({'data':'d2'}, {'data':'d3'})
- assert False
- except exc.IntegrityError, e:
- assert "violates not-null constraint" in str(e)
+
+ if self.engine.driver == 'pg8000':
+ exception_cls = exc.ProgrammingError
+ else:
+ exception_cls = exc.IntegrityError
+
+ assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'})
+ assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'}, {'data':'d3'})
+
+ assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'})
+
+ assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'}, {'data':'d3'})
table.insert().execute({'id':31, 'data':'d2'}, {'id':32, 'data':'d3'})
table.insert(inline=True).execute({'id':33, 'data':'d4'})
@@ -369,19 +554,12 @@ class InsertTest(TestBase, AssertsExecutionResults):
# test the same series of events using a reflected
# version of the table
- m2 = MetaData(testing.db)
+ m2 = MetaData(self.engine)
table = Table(table.name, m2, autoload=True)
table.insert().execute({'id':30, 'data':'d1'})
- try:
- table.insert().execute({'data':'d2'})
- assert False
- except exc.IntegrityError, e:
- assert "violates not-null constraint" in str(e)
- try:
- table.insert().execute({'data':'d2'}, {'data':'d3'})
- assert False
- except exc.IntegrityError, e:
- assert "violates not-null constraint" in str(e)
+
+ assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'})
+ assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'}, {'data':'d3'})
table.insert().execute({'id':31, 'data':'d2'}, {'id':32, 'data':'d3'})
table.insert(inline=True).execute({'id':33, 'data':'d4'})
@@ -396,36 +574,36 @@ class InsertTest(TestBase, AssertsExecutionResults):
class DomainReflectionTest(TestBase, AssertsExecutionResults):
"Test PostgreSQL domains"
- __only_on__ = 'postgres'
+ __only_on__ = 'postgresql'
@classmethod
def setup_class(cls):
con = testing.db.connect()
for ddl in ('CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42',
- 'CREATE DOMAIN alt_schema.testdomain INTEGER DEFAULT 0'):
+ 'CREATE DOMAIN test_schema.testdomain INTEGER DEFAULT 0'):
try:
con.execute(ddl)
except exc.SQLError, e:
if not "already exists" in str(e):
raise e
con.execute('CREATE TABLE testtable (question integer, answer testdomain)')
- con.execute('CREATE TABLE alt_schema.testtable(question integer, answer alt_schema.testdomain, anything integer)')
- con.execute('CREATE TABLE crosschema (question integer, answer alt_schema.testdomain)')
+ con.execute('CREATE TABLE test_schema.testtable(question integer, answer test_schema.testdomain, anything integer)')
+ con.execute('CREATE TABLE crosschema (question integer, answer test_schema.testdomain)')
@classmethod
def teardown_class(cls):
con = testing.db.connect()
con.execute('DROP TABLE testtable')
- con.execute('DROP TABLE alt_schema.testtable')
+ con.execute('DROP TABLE test_schema.testtable')
con.execute('DROP TABLE crosschema')
con.execute('DROP DOMAIN testdomain')
- con.execute('DROP DOMAIN alt_schema.testdomain')
+ con.execute('DROP DOMAIN test_schema.testdomain')
def test_table_is_reflected(self):
metadata = MetaData(testing.db)
table = Table('testtable', metadata, autoload=True)
eq_(set(table.columns.keys()), set(['question', 'answer']), "Columns of reflected table didn't equal expected columns")
- eq_(table.c.answer.type.__class__, postgres.PGInteger)
+ assert isinstance(table.c.answer.type, Integer)
def test_domain_is_reflected(self):
metadata = MetaData(testing.db)
@@ -433,15 +611,15 @@ class DomainReflectionTest(TestBase, AssertsExecutionResults):
eq_(str(table.columns.answer.server_default.arg), '42', "Reflected default value didn't equal expected value")
assert not table.columns.answer.nullable, "Expected reflected column to not be nullable."
- def test_table_is_reflected_alt_schema(self):
+ def test_table_is_reflected_test_schema(self):
metadata = MetaData(testing.db)
- table = Table('testtable', metadata, autoload=True, schema='alt_schema')
+ table = Table('testtable', metadata, autoload=True, schema='test_schema')
eq_(set(table.columns.keys()), set(['question', 'answer', 'anything']), "Columns of reflected table didn't equal expected columns")
- eq_(table.c.anything.type.__class__, postgres.PGInteger)
+ assert isinstance(table.c.anything.type, Integer)
def test_schema_domain_is_reflected(self):
metadata = MetaData(testing.db)
- table = Table('testtable', metadata, autoload=True, schema='alt_schema')
+ table = Table('testtable', metadata, autoload=True, schema='test_schema')
eq_(str(table.columns.answer.server_default.arg), '0', "Reflected default value didn't equal expected value")
assert table.columns.answer.nullable, "Expected reflected column to be nullable."
@@ -452,10 +630,10 @@ class DomainReflectionTest(TestBase, AssertsExecutionResults):
assert table.columns.answer.nullable, "Expected reflected column to be nullable."
def test_unknown_types(self):
- from sqlalchemy.databases import postgres
+ from sqlalchemy.databases import postgresql
- ischema_names = postgres.ischema_names
- postgres.ischema_names = {}
+ ischema_names = postgresql.PGDialect.ischema_names
+ postgresql.PGDialect.ischema_names = {}
try:
m2 = MetaData(testing.db)
assert_raises(exc.SAWarning, Table, "testtable", m2, autoload=True)
@@ -467,11 +645,11 @@ class DomainReflectionTest(TestBase, AssertsExecutionResults):
assert t3.c.answer.type.__class__ == sa.types.NullType
finally:
- postgres.ischema_names = ischema_names
+ postgresql.PGDialect.ischema_names = ischema_names
-class MiscTest(TestBase, AssertsExecutionResults):
- __only_on__ = 'postgres'
+class MiscTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
+ __only_on__ = 'postgresql'
def test_date_reflection(self):
m1 = MetaData(testing.db)
@@ -536,26 +714,26 @@ class MiscTest(TestBase, AssertsExecutionResults):
'FROM mytable')
def test_schema_reflection(self):
- """note: this test requires that the 'alt_schema' schema be separate and accessible by the test user"""
+ """note: this test requires that the 'test_schema' schema be separate and accessible by the test user"""
meta1 = MetaData(testing.db)
users = Table('users', meta1,
Column('user_id', Integer, primary_key = True),
Column('user_name', String(30), nullable = False),
- schema="alt_schema"
+ schema="test_schema"
)
addresses = Table('email_addresses', meta1,
Column('address_id', Integer, primary_key = True),
Column('remote_user_id', Integer, ForeignKey(users.c.user_id)),
Column('email_address', String(20)),
- schema="alt_schema"
+ schema="test_schema"
)
meta1.create_all()
try:
meta2 = MetaData(testing.db)
- addresses = Table('email_addresses', meta2, autoload=True, schema="alt_schema")
- users = Table('users', meta2, mustexist=True, schema="alt_schema")
+ addresses = Table('email_addresses', meta2, autoload=True, schema="test_schema")
+ users = Table('users', meta2, mustexist=True, schema="test_schema")
print users
print addresses
@@ -574,12 +752,12 @@ class MiscTest(TestBase, AssertsExecutionResults):
referer = Table("referer", meta1,
Column("id", Integer, primary_key=True),
Column("ref", Integer, ForeignKey('subject.id')),
- schema="alt_schema")
+ schema="test_schema")
meta1.create_all()
try:
meta2 = MetaData(testing.db)
subject = Table("subject", meta2, autoload=True)
- referer = Table("referer", meta2, schema="alt_schema", autoload=True)
+ referer = Table("referer", meta2, schema="test_schema", autoload=True)
print str(subject.join(referer).onclause)
self.assert_((subject.c.id==referer.c.ref).compare(subject.join(referer).onclause))
finally:
@@ -589,19 +767,19 @@ class MiscTest(TestBase, AssertsExecutionResults):
meta1 = MetaData(testing.db)
subject = Table("subject", meta1,
Column("id", Integer, primary_key=True),
- schema='alt_schema_2'
+ schema='test_schema_2'
)
referer = Table("referer", meta1,
Column("id", Integer, primary_key=True),
- Column("ref", Integer, ForeignKey('alt_schema_2.subject.id')),
- schema="alt_schema")
+ Column("ref", Integer, ForeignKey('test_schema_2.subject.id')),
+ schema="test_schema")
meta1.create_all()
try:
meta2 = MetaData(testing.db)
- subject = Table("subject", meta2, autoload=True, schema="alt_schema_2")
- referer = Table("referer", meta2, schema="alt_schema", autoload=True)
+ subject = Table("subject", meta2, autoload=True, schema="test_schema_2")
+ referer = Table("referer", meta2, schema="test_schema", autoload=True)
print str(subject.join(referer).onclause)
self.assert_((subject.c.id==referer.c.ref).compare(subject.join(referer).onclause))
finally:
@@ -611,7 +789,7 @@ class MiscTest(TestBase, AssertsExecutionResults):
meta = MetaData(testing.db)
users = Table('users', meta,
Column('id', Integer, primary_key=True),
- Column('name', String(50)), schema='alt_schema')
+ Column('name', String(50)), schema='test_schema')
users.create()
try:
users.insert().execute(id=1, name='name1')
@@ -646,15 +824,15 @@ class MiscTest(TestBase, AssertsExecutionResults):
user_name VARCHAR NOT NULL,
user_password VARCHAR NOT NULL
);
- """, None)
+ """)
t = Table("speedy_users", meta, autoload=True)
r = t.insert().execute(user_name='user', user_password='lala')
- assert r.last_inserted_ids() == [1]
+ assert r.inserted_primary_key == [1]
l = t.select().execute().fetchall()
assert l == [(1, 'user', 'lala')]
finally:
- testing.db.execute("drop table speedy_users", None)
+ testing.db.execute("drop table speedy_users")
@testing.emits_warning()
def test_index_reflection(self):
@@ -676,10 +854,10 @@ class MiscTest(TestBase, AssertsExecutionResults):
testing.db.execute("""
create index idx1 on party ((id || name))
- """, None)
+ """)
testing.db.execute("""
create unique index idx2 on party (id) where name = 'test'
- """, None)
+ """)
testing.db.execute("""
create index idx3 on party using btree
@@ -713,35 +891,42 @@ class MiscTest(TestBase, AssertsExecutionResults):
warnings.warn = capture_warnings._orig_showwarning
m1.drop_all()
- def test_create_partial_index(self):
- tbl = Table('testtbl', MetaData(), Column('data',Integer))
- idx = Index('test_idx1', tbl.c.data, postgres_where=and_(tbl.c.data > 5, tbl.c.data < 10))
-
- executed_sql = []
- mock_strategy = MockEngineStrategy()
- mock_conn = mock_strategy.create('postgres://', executed_sql.append)
+ def test_set_isolation_level(self):
+ """Test setting the isolation level with create_engine"""
+ eng = create_engine(testing.db.url)
+ eq_(
+ eng.execute("show transaction isolation level").scalar(),
+ 'read committed')
+ eng = create_engine(testing.db.url, isolation_level="SERIALIZABLE")
+ eq_(
+ eng.execute("show transaction isolation level").scalar(),
+ 'serializable')
+ eng = create_engine(testing.db.url, isolation_level="FOO")
- idx.create(mock_conn)
+ if testing.db.driver == 'zxjdbc':
+ exception_cls = eng.dialect.dbapi.Error
+ else:
+ exception_cls = eng.dialect.dbapi.ProgrammingError
+ assert_raises(exception_cls, eng.execute, "show transaction isolation level")
- assert executed_sql == ['CREATE INDEX test_idx1 ON testtbl (data) WHERE testtbl.data > 5 AND testtbl.data < 10']
class TimezoneTest(TestBase, AssertsExecutionResults):
"""Test timezone-aware datetimes.
- psycopg will return a datetime with a tzinfo attached to it, if postgres
+ psycopg will return a datetime with a tzinfo attached to it, if postgresql
returns it. python then will not let you compare a datetime with a tzinfo
to a datetime that doesnt have one. this test illustrates two ways to
have datetime types with and without timezone info.
"""
- __only_on__ = 'postgres'
+ __only_on__ = 'postgresql'
@classmethod
def setup_class(cls):
global tztable, notztable, metadata
metadata = MetaData(testing.db)
- # current_timestamp() in postgres is assumed to return TIMESTAMP WITH TIMEZONE
+ # current_timestamp() in postgresql is assumed to return TIMESTAMP WITH TIMEZONE
tztable = Table('tztable', metadata,
Column("id", Integer, primary_key=True),
Column("date", DateTime(timezone=True), onupdate=func.current_timestamp()),
@@ -762,17 +947,17 @@ class TimezoneTest(TestBase, AssertsExecutionResults):
somedate = testing.db.connect().scalar(func.current_timestamp().select())
tztable.insert().execute(id=1, name='row1', date=somedate)
c = tztable.update(tztable.c.id==1).execute(name='newname')
- print tztable.select(tztable.c.id==1).execute().fetchone()
+ print tztable.select(tztable.c.id==1).execute().first()
def test_without_timezone(self):
# get a date without a tzinfo
somedate = datetime.datetime(2005, 10,20, 11, 52, 00)
notztable.insert().execute(id=1, name='row1', date=somedate)
c = notztable.update(notztable.c.id==1).execute(name='newname')
- print notztable.select(tztable.c.id==1).execute().fetchone()
+ print notztable.select(tztable.c.id==1).execute().first()
class ArrayTest(TestBase, AssertsExecutionResults):
- __only_on__ = 'postgres'
+ __only_on__ = 'postgresql'
@classmethod
def setup_class(cls):
@@ -781,10 +966,14 @@ class ArrayTest(TestBase, AssertsExecutionResults):
arrtable = Table('arrtable', metadata,
Column('id', Integer, primary_key=True),
- Column('intarr', postgres.PGArray(Integer)),
- Column('strarr', postgres.PGArray(String(convert_unicode=True)), nullable=False)
+ Column('intarr', postgresql.PGArray(Integer)),
+ Column('strarr', postgresql.PGArray(String(convert_unicode=True)), nullable=False)
)
metadata.create_all()
+
+ def teardown(self):
+ arrtable.delete().execute()
+
@classmethod
def teardown_class(cls):
metadata.drop_all()
@@ -792,34 +981,38 @@ class ArrayTest(TestBase, AssertsExecutionResults):
def test_reflect_array_column(self):
metadata2 = MetaData(testing.db)
tbl = Table('arrtable', metadata2, autoload=True)
- assert isinstance(tbl.c.intarr.type, postgres.PGArray)
- assert isinstance(tbl.c.strarr.type, postgres.PGArray)
+ assert isinstance(tbl.c.intarr.type, postgresql.PGArray)
+ assert isinstance(tbl.c.strarr.type, postgresql.PGArray)
assert isinstance(tbl.c.intarr.type.item_type, Integer)
assert isinstance(tbl.c.strarr.type.item_type, String)
+ @testing.fails_on('postgresql+zxjdbc', 'zxjdbc has no support for PG arrays')
def test_insert_array(self):
arrtable.insert().execute(intarr=[1,2,3], strarr=['abc', 'def'])
results = arrtable.select().execute().fetchall()
eq_(len(results), 1)
eq_(results[0]['intarr'], [1,2,3])
eq_(results[0]['strarr'], ['abc','def'])
- arrtable.delete().execute()
+ @testing.fails_on('postgresql+pg8000', 'pg8000 has poor support for PG arrays')
+ @testing.fails_on('postgresql+zxjdbc', 'zxjdbc has no support for PG arrays')
def test_array_where(self):
arrtable.insert().execute(intarr=[1,2,3], strarr=['abc', 'def'])
arrtable.insert().execute(intarr=[4,5,6], strarr='ABC')
results = arrtable.select().where(arrtable.c.intarr == [1,2,3]).execute().fetchall()
eq_(len(results), 1)
eq_(results[0]['intarr'], [1,2,3])
- arrtable.delete().execute()
+ @testing.fails_on('postgresql+pg8000', 'pg8000 has poor support for PG arrays')
+ @testing.fails_on('postgresql+zxjdbc', 'zxjdbc has no support for PG arrays')
def test_array_concat(self):
arrtable.insert().execute(intarr=[1,2,3], strarr=['abc', 'def'])
results = select([arrtable.c.intarr + [4,5,6]]).execute().fetchall()
eq_(len(results), 1)
eq_(results[0][0], [1,2,3,4,5,6])
- arrtable.delete().execute()
+ @testing.fails_on('postgresql+pg8000', 'pg8000 has poor support for PG arrays')
+ @testing.fails_on('postgresql+zxjdbc', 'zxjdbc has no support for PG arrays')
def test_array_subtype_resultprocessor(self):
arrtable.insert().execute(intarr=[4,5,6], strarr=[[u'm\xe4\xe4'], [u'm\xf6\xf6']])
arrtable.insert().execute(intarr=[1,2,3], strarr=[u'm\xe4\xe4', u'm\xf6\xf6'])
@@ -827,13 +1020,14 @@ class ArrayTest(TestBase, AssertsExecutionResults):
eq_(len(results), 2)
eq_(results[0]['strarr'], [u'm\xe4\xe4', u'm\xf6\xf6'])
eq_(results[1]['strarr'], [[u'm\xe4\xe4'], [u'm\xf6\xf6']])
- arrtable.delete().execute()
+ @testing.fails_on('postgresql+pg8000', 'pg8000 has poor support for PG arrays')
+ @testing.fails_on('postgresql+zxjdbc', 'zxjdbc has no support for PG arrays')
def test_array_mutability(self):
class Foo(object): pass
footable = Table('foo', metadata,
Column('id', Integer, primary_key=True),
- Column('intarr', postgres.PGArray(Integer), nullable=True)
+ Column('intarr', postgresql.PGArray(Integer), nullable=True)
)
mapper(Foo, footable)
metadata.create_all()
@@ -870,19 +1064,19 @@ class ArrayTest(TestBase, AssertsExecutionResults):
sess.add(foo)
sess.flush()
-class TimeStampTest(TestBase, AssertsExecutionResults):
- __only_on__ = 'postgres'
-
- @testing.uses_deprecated()
+class TimestampTest(TestBase, AssertsExecutionResults):
+ __only_on__ = 'postgresql'
+
def test_timestamp(self):
engine = testing.db
connection = engine.connect()
- s = select([func.TIMESTAMP("12/25/07").label("ts")])
- result = connection.execute(s).fetchone()
+
+ s = select(["timestamp '2007-12-25'"])
+ result = connection.execute(s).first()
eq_(result[0], datetime.datetime(2007, 12, 25, 0, 0))
class ServerSideCursorsTest(TestBase, AssertsExecutionResults):
- __only_on__ = 'postgres'
+ __only_on__ = 'postgresql+psycopg2'
@classmethod
def setup_class(cls):
@@ -927,8 +1121,8 @@ class ServerSideCursorsTest(TestBase, AssertsExecutionResults):
class SpecialTypesTest(TestBase, ComparesTables):
"""test DDL and reflection of PG-specific types """
- __only_on__ = 'postgres'
- __excluded_on__ = (('postgres', '<', (8, 3, 0)),)
+ __only_on__ = 'postgresql'
+ __excluded_on__ = (('postgresql', '<', (8, 3, 0)),)
@classmethod
def setup_class(cls):
@@ -936,11 +1130,11 @@ class SpecialTypesTest(TestBase, ComparesTables):
metadata = MetaData(testing.db)
table = Table('sometable', metadata,
- Column('id', postgres.PGUuid, primary_key=True),
- Column('flag', postgres.PGBit),
- Column('addr', postgres.PGInet),
- Column('addr2', postgres.PGMacAddr),
- Column('addr3', postgres.PGCidr)
+ Column('id', postgresql.PGUuid, primary_key=True),
+ Column('flag', postgresql.PGBit),
+ Column('addr', postgresql.PGInet),
+ Column('addr2', postgresql.PGMacAddr),
+ Column('addr3', postgresql.PGCidr)
)
metadata.create_all()
@@ -957,8 +1151,8 @@ class SpecialTypesTest(TestBase, ComparesTables):
class MatchTest(TestBase, AssertsCompiledSQL):
- __only_on__ = 'postgres'
- __excluded_on__ = (('postgres', '<', (8, 3, 0)),)
+ __only_on__ = 'postgresql'
+ __excluded_on__ = (('postgresql', '<', (8, 3, 0)),)
@classmethod
def setup_class(cls):
@@ -992,9 +1186,16 @@ class MatchTest(TestBase, AssertsCompiledSQL):
def teardown_class(cls):
metadata.drop_all()
- def test_expression(self):
+ @testing.fails_on('postgresql+pg8000', 'uses positional')
+ @testing.fails_on('postgresql+zxjdbc', 'uses qmark')
+ def test_expression_pyformat(self):
self.assert_compile(matchtable.c.title.match('somstr'), "matchtable.title @@ to_tsquery(%(title_1)s)")
+ @testing.fails_on('postgresql+psycopg2', 'uses pyformat')
+ @testing.fails_on('postgresql+zxjdbc', 'uses qmark')
+ def test_expression_positional(self):
+ self.assert_compile(matchtable.c.title.match('somstr'), "matchtable.title @@ to_tsquery(%s)")
+
def test_simple_match(self):
results = matchtable.select().where(matchtable.c.title.match('python')).order_by(matchtable.c.id).execute().fetchall()
eq_([2, 5], [r.id for r in results])
diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py
index eb4581e20..448ee947c 100644
--- a/test/dialect/test_sqlite.py
+++ b/test/dialect/test_sqlite.py
@@ -4,7 +4,7 @@ from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message
import datetime
from sqlalchemy import *
from sqlalchemy import exc, sql
-from sqlalchemy.databases import sqlite
+from sqlalchemy.dialects.sqlite import base as sqlite, pysqlite as pysqlite_dialect
from sqlalchemy.test import *
@@ -19,7 +19,7 @@ class TestTypes(TestBase, AssertsExecutionResults):
meta = MetaData(testing.db)
t = Table('bool_table', meta,
Column('id', Integer, primary_key=True),
- Column('boo', sqlite.SLBoolean))
+ Column('boo', Boolean))
try:
meta.create_all()
@@ -39,7 +39,7 @@ class TestTypes(TestBase, AssertsExecutionResults):
def test_time_microseconds(self):
dt = datetime.datetime(2008, 6, 27, 12, 0, 0, 125) # 125 usec
eq_(str(dt), '2008-06-27 12:00:00.000125')
- sldt = sqlite.SLDateTime()
+ sldt = sqlite._SLDateTime()
bp = sldt.bind_processor(None)
eq_(bp(dt), '2008-06-27 12:00:00.000125')
@@ -69,59 +69,44 @@ class TestTypes(TestBase, AssertsExecutionResults):
bindproc = t.dialect_impl(dialect).bind_processor(dialect)
assert not bindproc or isinstance(bindproc(u"some string"), unicode)
- @testing.uses_deprecated('Using String type with no length')
def test_type_reflection(self):
# (ask_for, roundtripped_as_if_different)
- specs = [( String(), sqlite.SLString(), ),
- ( String(1), sqlite.SLString(1), ),
- ( String(3), sqlite.SLString(3), ),
- ( Text(), sqlite.SLText(), ),
- ( Unicode(), sqlite.SLString(), ),
- ( Unicode(1), sqlite.SLString(1), ),
- ( Unicode(3), sqlite.SLString(3), ),
- ( UnicodeText(), sqlite.SLText(), ),
- ( CLOB, sqlite.SLText(), ),
- ( sqlite.SLChar(1), ),
- ( CHAR(3), sqlite.SLChar(3), ),
- ( NCHAR(2), sqlite.SLChar(2), ),
- ( SmallInteger(), sqlite.SLSmallInteger(), ),
- ( sqlite.SLSmallInteger(), ),
- ( Binary(3), sqlite.SLBinary(), ),
- ( Binary(), sqlite.SLBinary() ),
- ( sqlite.SLBinary(3), sqlite.SLBinary(), ),
- ( NUMERIC, sqlite.SLNumeric(), ),
- ( NUMERIC(10,2), sqlite.SLNumeric(10,2), ),
- ( Numeric, sqlite.SLNumeric(), ),
- ( Numeric(10, 2), sqlite.SLNumeric(10, 2), ),
- ( DECIMAL, sqlite.SLNumeric(), ),
- ( DECIMAL(10, 2), sqlite.SLNumeric(10, 2), ),
- ( Float, sqlite.SLFloat(), ),
- ( sqlite.SLNumeric(), ),
- ( INT, sqlite.SLInteger(), ),
- ( Integer, sqlite.SLInteger(), ),
- ( sqlite.SLInteger(), ),
- ( TIMESTAMP, sqlite.SLDateTime(), ),
- ( DATETIME, sqlite.SLDateTime(), ),
- ( DateTime, sqlite.SLDateTime(), ),
- ( sqlite.SLDateTime(), ),
- ( DATE, sqlite.SLDate(), ),
- ( Date, sqlite.SLDate(), ),
- ( sqlite.SLDate(), ),
- ( TIME, sqlite.SLTime(), ),
- ( Time, sqlite.SLTime(), ),
- ( sqlite.SLTime(), ),
- ( BOOLEAN, sqlite.SLBoolean(), ),
- ( Boolean, sqlite.SLBoolean(), ),
- ( sqlite.SLBoolean(), ),
+ specs = [( String(), String(), ),
+ ( String(1), String(1), ),
+ ( String(3), String(3), ),
+ ( Text(), Text(), ),
+ ( Unicode(), String(), ),
+ ( Unicode(1), String(1), ),
+ ( Unicode(3), String(3), ),
+ ( UnicodeText(), Text(), ),
+ ( CHAR(1), ),
+ ( CHAR(3), CHAR(3), ),
+ ( NUMERIC, NUMERIC(), ),
+ ( NUMERIC(10,2), NUMERIC(10,2), ),
+ ( Numeric, NUMERIC(), ),
+ ( Numeric(10, 2), NUMERIC(10, 2), ),
+ ( DECIMAL, DECIMAL(), ),
+ ( DECIMAL(10, 2), DECIMAL(10, 2), ),
+ ( Float, Float(), ),
+ ( NUMERIC(), ),
+ ( TIMESTAMP, TIMESTAMP(), ),
+ ( DATETIME, DATETIME(), ),
+ ( DateTime, DateTime(), ),
+ ( DateTime(), ),
+ ( DATE, DATE(), ),
+ ( Date, Date(), ),
+ ( TIME, TIME(), ),
+ ( Time, Time(), ),
+ ( BOOLEAN, BOOLEAN(), ),
+ ( Boolean, Boolean(), ),
]
columns = [Column('c%i' % (i + 1), t[0]) for i, t in enumerate(specs)]
db = testing.db
m = MetaData(db)
t_table = Table('types', m, *columns)
+ m.create_all()
try:
- m.create_all()
-
m2 = MetaData(db)
rt = Table('types', m2, autoload=True)
try:
@@ -131,7 +116,7 @@ class TestTypes(TestBase, AssertsExecutionResults):
expected = [len(c) > 1 and c[1] or c[0] for c in specs]
for table in rt, rv:
for i, reflected in enumerate(table.c):
- assert isinstance(reflected.type, type(expected[i])), type(expected[i])
+ assert isinstance(reflected.type, type(expected[i])), "%d: %r" % (i, type(expected[i]))
finally:
db.execute('DROP VIEW types_v')
finally:
@@ -163,7 +148,7 @@ class TestDefaults(TestBase, AssertsExecutionResults):
rt = Table('t_defaults', m2, autoload=True)
expected = [c[1] for c in specs]
for i, reflected in enumerate(rt.c):
- eq_(reflected.server_default.arg.text, expected[i])
+ eq_(str(reflected.server_default.arg), expected[i])
finally:
m.drop_all()
@@ -173,7 +158,7 @@ class TestDefaults(TestBase, AssertsExecutionResults):
db = testing.db
m = MetaData(db)
- expected = ["'my_default'", '0']
+ expected = ["my_default", '0']
table = """CREATE TABLE r_defaults (
data VARCHAR(40) DEFAULT 'my_default',
val INTEGER NOT NULL DEFAULT 0
@@ -184,7 +169,7 @@ class TestDefaults(TestBase, AssertsExecutionResults):
rt = Table('r_defaults', m, autoload=True)
for i, reflected in enumerate(rt.c):
- eq_(reflected.server_default.arg.text, expected[i])
+ eq_(str(reflected.server_default.arg), expected[i])
finally:
db.execute("DROP TABLE r_defaults")
@@ -247,24 +232,24 @@ class DialectTest(TestBase, AssertsExecutionResults):
def test_attached_as_schema(self):
cx = testing.db.connect()
try:
- cx.execute('ATTACH DATABASE ":memory:" AS alt_schema')
+ cx.execute('ATTACH DATABASE ":memory:" AS test_schema')
dialect = cx.dialect
- assert dialect.table_names(cx, 'alt_schema') == []
+ assert dialect.table_names(cx, 'test_schema') == []
meta = MetaData(cx)
Table('created', meta, Column('id', Integer),
- schema='alt_schema')
+ schema='test_schema')
alt_master = Table('sqlite_master', meta, autoload=True,
- schema='alt_schema')
+ schema='test_schema')
meta.create_all(cx)
- eq_(dialect.table_names(cx, 'alt_schema'),
+ eq_(dialect.table_names(cx, 'test_schema'),
['created'])
assert len(alt_master.c) > 0
meta.clear()
reflected = Table('created', meta, autoload=True,
- schema='alt_schema')
+ schema='test_schema')
assert len(reflected.c) == 1
cx.execute(reflected.insert(), dict(id=1))
@@ -282,9 +267,9 @@ class DialectTest(TestBase, AssertsExecutionResults):
# note that sqlite_master is cleared, above
meta.drop_all()
- assert dialect.table_names(cx, 'alt_schema') == []
+ assert dialect.table_names(cx, 'test_schema') == []
finally:
- cx.execute('DETACH DATABASE alt_schema')
+ cx.execute('DETACH DATABASE test_schema')
@testing.exclude('sqlite', '<', (2, 6), 'no database support')
def test_temp_table_reflection(self):
@@ -305,6 +290,20 @@ class DialectTest(TestBase, AssertsExecutionResults):
pass
raise
+ def test_set_isolation_level(self):
+ """Test setting the read uncommitted/serializable levels"""
+ eng = create_engine(testing.db.url)
+ eq_(eng.execute("PRAGMA read_uncommitted").scalar(), 0)
+
+ eng = create_engine(testing.db.url, isolation_level="READ UNCOMMITTED")
+ eq_(eng.execute("PRAGMA read_uncommitted").scalar(), 1)
+
+ eng = create_engine(testing.db.url, isolation_level="SERIALIZABLE")
+ eq_(eng.execute("PRAGMA read_uncommitted").scalar(), 0)
+
+ assert_raises(exc.ArgumentError, create_engine, testing.db.url,
+ isolation_level="FOO")
+
class SQLTest(TestBase, AssertsCompiledSQL):
"""Tests SQLite-dialect specific compilation."""