diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2009-08-06 21:11:27 +0000 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2009-08-06 21:11:27 +0000 |
| commit | 8fc5005dfe3eb66a46470ad8a8c7b95fc4d6bdca (patch) | |
| tree | ae9e27d12c9fbf8297bb90469509e1cb6a206242 /test/dialect | |
| parent | 7638aa7f242c6ea3d743aa9100e32be2052546a6 (diff) | |
| download | sqlalchemy-8fc5005dfe3eb66a46470ad8a8c7b95fc4d6bdca.tar.gz | |
merge 0.6 series to trunk.
Diffstat (limited to 'test/dialect')
| -rw-r--r-- | test/dialect/test_firebird.py | 123 | ||||
| -rw-r--r-- | test/dialect/test_informix.py | 3 | ||||
| -rw-r--r-- | test/dialect/test_maxdb.py | 2 | ||||
| -rw-r--r-- | test/dialect/test_mssql.py | 532 | ||||
| -rw-r--r-- | test/dialect/test_mysql.py | 219 | ||||
| -rw-r--r-- | test/dialect/test_oracle.py | 53 | ||||
| -rw-r--r-- | test/dialect/test_postgresql.py (renamed from test/dialect/test_postgres.py) | 553 | ||||
| -rw-r--r-- | test/dialect/test_sqlite.py | 117 |
8 files changed, 906 insertions, 696 deletions
diff --git a/test/dialect/test_firebird.py b/test/dialect/test_firebird.py index fa608c9a1..2dc6af91b 100644 --- a/test/dialect/test_firebird.py +++ b/test/dialect/test_firebird.py @@ -50,6 +50,7 @@ class DomainReflectionTest(TestBase, AssertsExecutionResults): con.execute('DROP GENERATOR gen_testtable_id') def test_table_is_reflected(self): + from sqlalchemy.types import Integer, Text, Binary, String, Date, Time, DateTime metadata = MetaData(testing.db) table = Table('testtable', metadata, autoload=True) eq_(set(table.columns.keys()), @@ -57,17 +58,17 @@ class DomainReflectionTest(TestBase, AssertsExecutionResults): "Columns of reflected table didn't equal expected columns") eq_(table.c.question.primary_key, True) eq_(table.c.question.sequence.name, 'gen_testtable_id') - eq_(table.c.question.type.__class__, firebird.FBInteger) + assert isinstance(table.c.question.type, Integer) eq_(table.c.question.server_default.arg.text, "42") - eq_(table.c.answer.type.__class__, firebird.FBString) + assert isinstance(table.c.answer.type, String) eq_(table.c.answer.server_default.arg.text, "'no answer'") - eq_(table.c.remark.type.__class__, firebird.FBText) + assert isinstance(table.c.remark.type, Text) eq_(table.c.remark.server_default.arg.text, "''") - eq_(table.c.photo.type.__class__, firebird.FBBinary) + assert isinstance(table.c.photo.type, Binary) # The following assume a Dialect 3 database - eq_(table.c.d.type.__class__, firebird.FBDate) - eq_(table.c.t.type.__class__, firebird.FBTime) - eq_(table.c.dt.type.__class__, firebird.FBDateTime) + assert isinstance(table.c.d.type, Date) + assert isinstance(table.c.t.type, Time) + assert isinstance(table.c.dt.type, DateTime) class CompileTest(TestBase, AssertsCompiledSQL): @@ -76,7 +77,13 @@ class CompileTest(TestBase, AssertsCompiledSQL): def test_alias(self): t = table('sometable', column('col1'), column('col2')) s = select([t.alias()]) - self.assert_compile(s, "SELECT sometable_1.col1, sometable_1.col2 FROM sometable sometable_1") + self.assert_compile(s, "SELECT sometable_1.col1, sometable_1.col2 FROM sometable AS sometable_1") + + dialect = firebird.FBDialect() + dialect._version_two = False + self.assert_compile(s, "SELECT sometable_1.col1, sometable_1.col2 FROM sometable sometable_1", + dialect = dialect + ) def test_function(self): self.assert_compile(func.foo(1, 2), "foo(:foo_1, :foo_2)") @@ -98,15 +105,15 @@ class CompileTest(TestBase, AssertsCompiledSQL): column('description', String(128)), ) - u = update(table1, values=dict(name='foo'), firebird_returning=[table1.c.myid, table1.c.name]) + u = update(table1, values=dict(name='foo')).returning(table1.c.myid, table1.c.name) self.assert_compile(u, "UPDATE mytable SET name=:name RETURNING mytable.myid, mytable.name") - u = update(table1, values=dict(name='foo'), firebird_returning=[table1]) + u = update(table1, values=dict(name='foo')).returning(table1) self.assert_compile(u, "UPDATE mytable SET name=:name "\ "RETURNING mytable.myid, mytable.name, mytable.description") - u = update(table1, values=dict(name='foo'), firebird_returning=[func.length(table1.c.name)]) - self.assert_compile(u, "UPDATE mytable SET name=:name RETURNING char_length(mytable.name)") + u = update(table1, values=dict(name='foo')).returning(func.length(table1.c.name)) + self.assert_compile(u, "UPDATE mytable SET name=:name RETURNING char_length(mytable.name) AS length_1") def test_insert_returning(self): table1 = table('mytable', @@ -115,90 +122,20 @@ class CompileTest(TestBase, AssertsCompiledSQL): column('description', String(128)), ) - i = insert(table1, values=dict(name='foo'), firebird_returning=[table1.c.myid, table1.c.name]) + i = insert(table1, values=dict(name='foo')).returning(table1.c.myid, table1.c.name) self.assert_compile(i, "INSERT INTO mytable (name) VALUES (:name) RETURNING mytable.myid, mytable.name") - i = insert(table1, values=dict(name='foo'), firebird_returning=[table1]) + i = insert(table1, values=dict(name='foo')).returning(table1) self.assert_compile(i, "INSERT INTO mytable (name) VALUES (:name) "\ "RETURNING mytable.myid, mytable.name, mytable.description") - i = insert(table1, values=dict(name='foo'), firebird_returning=[func.length(table1.c.name)]) - self.assert_compile(i, "INSERT INTO mytable (name) VALUES (:name) RETURNING char_length(mytable.name)") + i = insert(table1, values=dict(name='foo')).returning(func.length(table1.c.name)) + self.assert_compile(i, "INSERT INTO mytable (name) VALUES (:name) RETURNING char_length(mytable.name) AS length_1") -class ReturningTest(TestBase, AssertsExecutionResults): - __only_on__ = 'firebird' - - @testing.exclude('firebird', '<', (2, 1), '2.1+ feature') - def test_update_returning(self): - meta = MetaData(testing.db) - table = Table('tables', meta, - Column('id', Integer, Sequence('gen_tables_id'), primary_key=True), - Column('persons', Integer), - Column('full', Boolean) - ) - table.create() - try: - table.insert().execute([{'persons': 5, 'full': False}, {'persons': 3, 'full': False}]) - - result = table.update(table.c.persons > 4, dict(full=True), firebird_returning=[table.c.id]).execute() - eq_(result.fetchall(), [(1,)]) - - result2 = select([table.c.id, table.c.full]).order_by(table.c.id).execute() - eq_(result2.fetchall(), [(1,True),(2,False)]) - finally: - table.drop() - - @testing.exclude('firebird', '<', (2, 0), '2.0+ feature') - def test_insert_returning(self): - meta = MetaData(testing.db) - table = Table('tables', meta, - Column('id', Integer, Sequence('gen_tables_id'), primary_key=True), - Column('persons', Integer), - Column('full', Boolean) - ) - table.create() - try: - result = table.insert(firebird_returning=[table.c.id]).execute({'persons': 1, 'full': False}) - - eq_(result.fetchall(), [(1,)]) - - # Multiple inserts only return the last row - result2 = table.insert(firebird_returning=[table]).execute( - [{'persons': 2, 'full': False}, {'persons': 3, 'full': True}]) - - eq_(result2.fetchall(), [(3,3,True)]) - - result3 = table.insert(firebird_returning=[table.c.id]).execute({'persons': 4, 'full': False}) - eq_([dict(row) for row in result3], [{'ID':4}]) - - result4 = testing.db.execute('insert into tables (id, persons, "full") values (5, 10, 1) returning persons') - eq_([dict(row) for row in result4], [{'PERSONS': 10}]) - finally: - table.drop() - - @testing.exclude('firebird', '<', (2, 1), '2.1+ feature') - def test_delete_returning(self): - meta = MetaData(testing.db) - table = Table('tables', meta, - Column('id', Integer, Sequence('gen_tables_id'), primary_key=True), - Column('persons', Integer), - Column('full', Boolean) - ) - table.create() - try: - table.insert().execute([{'persons': 5, 'full': False}, {'persons': 3, 'full': False}]) - - result = table.delete(table.c.persons > 4, firebird_returning=[table.c.id]).execute() - eq_(result.fetchall(), [(1,)]) - - result2 = select([table.c.id, table.c.full]).order_by(table.c.id).execute() - eq_(result2.fetchall(), [(2,False),]) - finally: - table.drop() -class MiscFBTests(TestBase): +class MiscTest(TestBase): __only_on__ = 'firebird' def test_strlen(self): @@ -217,12 +154,20 @@ class MiscFBTests(TestBase): try: t.insert(values=dict(name='dante')).execute() t.insert(values=dict(name='alighieri')).execute() - select([func.count(t.c.id)],func.length(t.c.name)==5).execute().fetchone()[0] == 1 + select([func.count(t.c.id)],func.length(t.c.name)==5).execute().first()[0] == 1 finally: meta.drop_all() def test_server_version_info(self): - version = testing.db.dialect.server_version_info(testing.db.connect()) + version = testing.db.dialect.server_version_info assert len(version) == 3, "Got strange version info: %s" % repr(version) + def test_percents_in_text(self): + for expr, result in ( + (text("select '%' from rdb$database"), '%'), + (text("select '%%' from rdb$database"), '%%'), + (text("select '%%%' from rdb$database"), '%%%'), + (text("select 'hello % world' from rdb$database"), "hello % world") + ): + eq_(testing.db.scalar(expr), result) diff --git a/test/dialect/test_informix.py b/test/dialect/test_informix.py index 86a4e751d..e647990d3 100644 --- a/test/dialect/test_informix.py +++ b/test/dialect/test_informix.py @@ -4,7 +4,8 @@ from sqlalchemy.test import * class CompileTest(TestBase, AssertsCompiledSQL): - __dialect__ = informix.InfoDialect() + __only_on__ = 'informix' + __dialect__ = informix.InformixDialect() def test_statements(self): meta =MetaData() diff --git a/test/dialect/test_maxdb.py b/test/dialect/test_maxdb.py index 033a05533..c69a81120 100644 --- a/test/dialect/test_maxdb.py +++ b/test/dialect/test_maxdb.py @@ -185,7 +185,7 @@ class DBAPITest(TestBase, AssertsExecutionResults): vals = [] for i in xrange(3): cr.execute('SELECT busto.NEXTVAL FROM DUAL') - vals.append(cr.fetchone()[0]) + vals.append(cr.first()[0]) # should be 1,2,3, but no... self.assert_(vals != [1,2,3]) diff --git a/test/dialect/test_mssql.py b/test/dialect/test_mssql.py index dd86ce0de..423310db6 100644 --- a/test/dialect/test_mssql.py +++ b/test/dialect/test_mssql.py @@ -2,17 +2,18 @@ from sqlalchemy.test.testing import eq_ import datetime, os, re from sqlalchemy import * -from sqlalchemy import types, exc +from sqlalchemy import types, exc, schema from sqlalchemy.orm import * from sqlalchemy.sql import table, column from sqlalchemy.databases import mssql -import sqlalchemy.engine.url as url +from sqlalchemy.dialects.mssql import pyodbc +from sqlalchemy.engine import url from sqlalchemy.test import * from sqlalchemy.test.testing import eq_ class CompileTest(TestBase, AssertsCompiledSQL): - __dialect__ = mssql.MSSQLDialect() + __dialect__ = mssql.dialect() def test_insert(self): t = table('sometable', column('somecolumn')) @@ -157,6 +158,45 @@ class CompileTest(TestBase, AssertsCompiledSQL): select([extract(field, t.c.col1)]), 'SELECT DATEPART("%s", t.col1) AS anon_1 FROM t' % field) + def test_update_returning(self): + table1 = table('mytable', + column('myid', Integer), + column('name', String(128)), + column('description', String(128)), + ) + + u = update(table1, values=dict(name='foo')).returning(table1.c.myid, table1.c.name) + self.assert_compile(u, "UPDATE mytable SET name=:name OUTPUT inserted.myid, inserted.name") + + u = update(table1, values=dict(name='foo')).returning(table1) + self.assert_compile(u, "UPDATE mytable SET name=:name OUTPUT inserted.myid, " + "inserted.name, inserted.description") + + u = update(table1, values=dict(name='foo')).returning(table1).where(table1.c.name=='bar') + self.assert_compile(u, "UPDATE mytable SET name=:name OUTPUT inserted.myid, " + "inserted.name, inserted.description WHERE mytable.name = :name_1") + + u = update(table1, values=dict(name='foo')).returning(func.length(table1.c.name)) + self.assert_compile(u, "UPDATE mytable SET name=:name OUTPUT LEN(inserted.name) AS length_1") + + def test_insert_returning(self): + table1 = table('mytable', + column('myid', Integer), + column('name', String(128)), + column('description', String(128)), + ) + + i = insert(table1, values=dict(name='foo')).returning(table1.c.myid, table1.c.name) + self.assert_compile(i, "INSERT INTO mytable (name) OUTPUT inserted.myid, inserted.name VALUES (:name)") + + i = insert(table1, values=dict(name='foo')).returning(table1) + self.assert_compile(i, "INSERT INTO mytable (name) OUTPUT inserted.myid, " + "inserted.name, inserted.description VALUES (:name)") + + i = insert(table1, values=dict(name='foo')).returning(func.length(table1.c.name)) + self.assert_compile(i, "INSERT INTO mytable (name) OUTPUT LEN(inserted.name) AS length_1 VALUES (:name)") + + class IdentityInsertTest(TestBase, AssertsCompiledSQL): __only_on__ = 'mssql' @@ -189,9 +229,9 @@ class IdentityInsertTest(TestBase, AssertsCompiledSQL): eq_([(9, 'Python')], list(cats)) result = cattable.insert().values(description='PHP').execute() - eq_([10], result.last_inserted_ids()) + eq_([10], result.inserted_primary_key) lastcat = cattable.select().order_by(desc(cattable.c.id)).execute() - eq_((10, 'PHP'), lastcat.fetchone()) + eq_((10, 'PHP'), lastcat.first()) def test_executemany(self): cattable.insert().execute([ @@ -213,10 +253,51 @@ class IdentityInsertTest(TestBase, AssertsCompiledSQL): eq_([(91, 'Smalltalk'), (90, 'PHP')], list(lastcats)) -class ReflectionTest(TestBase): +class ReflectionTest(TestBase, ComparesTables): __only_on__ = 'mssql' - def testidentity(self): + def test_basic_reflection(self): + meta = MetaData(testing.db) + + users = Table('engine_users', meta, + Column('user_id', types.INT, primary_key=True), + Column('user_name', types.VARCHAR(20), nullable=False), + Column('test1', types.CHAR(5), nullable=False), + Column('test2', types.Float(5), nullable=False), + Column('test3', types.Text), + Column('test4', types.Numeric, nullable = False), + Column('test5', types.DateTime), + Column('parent_user_id', types.Integer, + ForeignKey('engine_users.user_id')), + Column('test6', types.DateTime, nullable=False), + Column('test7', types.Text), + Column('test8', types.Binary), + Column('test_passivedefault2', types.Integer, server_default='5'), + Column('test9', types.Binary(100)), + Column('test_numeric', types.Numeric()), + test_needs_fk=True, + ) + + addresses = Table('engine_email_addresses', meta, + Column('address_id', types.Integer, primary_key = True), + Column('remote_user_id', types.Integer, ForeignKey(users.c.user_id)), + Column('email_address', types.String(20)), + test_needs_fk=True, + ) + meta.create_all() + + try: + meta2 = MetaData() + reflected_users = Table('engine_users', meta2, autoload=True, + autoload_with=testing.db) + reflected_addresses = Table('engine_email_addresses', meta2, + autoload=True, autoload_with=testing.db) + self.assert_tables_equal(users, reflected_users) + self.assert_tables_equal(addresses, reflected_addresses) + finally: + meta.drop_all() + + def test_identity(self): meta = MetaData(testing.db) table = Table( 'identity_test', meta, @@ -240,7 +321,7 @@ class QueryUnicodeTest(TestBase): meta = MetaData(testing.db) t1 = Table('unitest_table', meta, Column('id', Integer, primary_key=True), - Column('descr', mssql.MSText(200, convert_unicode=True))) + Column('descr', mssql.MSText(convert_unicode=True))) meta.create_all() con = testing.db.connect() @@ -248,7 +329,7 @@ class QueryUnicodeTest(TestBase): con.execute(u"insert into unitest_table values ('bien mangé')".encode('UTF-8')) try: - r = t1.select().execute().fetchone() + r = t1.select().execute().first() assert isinstance(r[1], unicode), '%s is %s instead of unicode, working on %s' % ( r[1], type(r[1]), meta.bind) @@ -262,7 +343,9 @@ class QueryTest(TestBase): meta = MetaData(testing.db) t1 = Table('t1', meta, Column('id', Integer, Sequence('fred', 100, 1), primary_key=True), - Column('descr', String(200))) + Column('descr', String(200)), + implicit_returning = False + ) t2 = Table('t2', meta, Column('id', Integer, Sequence('fred', 200, 1), primary_key=True), Column('descr', String(200))) @@ -274,9 +357,9 @@ class QueryTest(TestBase): try: tr = con.begin() r = con.execute(t2.insert(), descr='hello') - self.assert_(r.last_inserted_ids() == [200]) + self.assert_(r.inserted_primary_key == [200]) r = con.execute(t1.insert(), descr='hello') - self.assert_(r.last_inserted_ids() == [100]) + self.assert_(r.inserted_primary_key == [100]) finally: tr.commit() @@ -295,6 +378,19 @@ class QueryTest(TestBase): tbl.drop() con.execute('drop schema paj') + def test_returning_no_autoinc(self): + meta = MetaData(testing.db) + + table = Table('t1', meta, Column('id', Integer, primary_key=True), Column('data', String(50))) + table.create() + try: + result = table.insert().values(id=1, data=func.lower("SomeString")).returning(table.c.id, table.c.data).execute() + eq_(result.fetchall(), [(1, 'somestring',)]) + finally: + # this will hang if the "SET IDENTITY_INSERT t1 OFF" occurs before the + # result is fetched + table.drop() + def test_delete_schema(self): meta = MetaData(testing.db) con = testing.db.connect() @@ -371,36 +467,26 @@ class SchemaTest(TestBase): ) self.column = t.c.test_column + dialect = mssql.dialect() + self.ddl_compiler = dialect.ddl_compiler(dialect, schema.CreateTable(t)) + + def _column_spec(self): + return self.ddl_compiler.get_column_specification(self.column) + def test_that_mssql_default_nullability_emits_null(self): - schemagenerator = \ - mssql.MSSQLDialect().schemagenerator(mssql.MSSQLDialect(), None) - column_specification = \ - schemagenerator.get_column_specification(self.column) - eq_("test_column VARCHAR NULL", column_specification) + eq_("test_column VARCHAR NULL", self._column_spec()) def test_that_mssql_none_nullability_does_not_emit_nullability(self): - schemagenerator = \ - mssql.MSSQLDialect().schemagenerator(mssql.MSSQLDialect(), None) self.column.nullable = None - column_specification = \ - schemagenerator.get_column_specification(self.column) - eq_("test_column VARCHAR", column_specification) + eq_("test_column VARCHAR", self._column_spec()) def test_that_mssql_specified_nullable_emits_null(self): - schemagenerator = \ - mssql.MSSQLDialect().schemagenerator(mssql.MSSQLDialect(), None) self.column.nullable = True - column_specification = \ - schemagenerator.get_column_specification(self.column) - eq_("test_column VARCHAR NULL", column_specification) + eq_("test_column VARCHAR NULL", self._column_spec()) def test_that_mssql_specified_not_nullable_emits_not_null(self): - schemagenerator = \ - mssql.MSSQLDialect().schemagenerator(mssql.MSSQLDialect(), None) self.column.nullable = False - column_specification = \ - schemagenerator.get_column_specification(self.column) - eq_("test_column VARCHAR NOT NULL", column_specification) + eq_("test_column VARCHAR NOT NULL", self._column_spec()) def full_text_search_missing(): @@ -515,79 +601,73 @@ class MatchTest(TestBase, AssertsCompiledSQL): class ParseConnectTest(TestBase, AssertsCompiledSQL): __only_on__ = 'mssql' + @classmethod + def setup_class(cls): + global dialect + dialect = pyodbc.MSDialect_pyodbc() + def test_pyodbc_connect_dsn_trusted(self): u = url.make_url('mssql://mydsn') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['dsn=mydsn;TrustedConnection=Yes'], {}], connection) def test_pyodbc_connect_old_style_dsn_trusted(self): u = url.make_url('mssql:///?dsn=mydsn') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['dsn=mydsn;TrustedConnection=Yes'], {}], connection) def test_pyodbc_connect_dsn_non_trusted(self): u = url.make_url('mssql://username:password@mydsn') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['dsn=mydsn;UID=username;PWD=password'], {}], connection) def test_pyodbc_connect_dsn_extra(self): u = url.make_url('mssql://username:password@mydsn/?LANGUAGE=us_english&foo=bar') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['dsn=mydsn;UID=username;PWD=password;LANGUAGE=us_english;foo=bar'], {}], connection) def test_pyodbc_connect(self): u = url.make_url('mssql://username:password@hostspec/database') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UID=username;PWD=password'], {}], connection) def test_pyodbc_connect_comma_port(self): u = url.make_url('mssql://username:password@hostspec:12345/database') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['DRIVER={SQL Server};Server=hostspec,12345;Database=database;UID=username;PWD=password'], {}], connection) def test_pyodbc_connect_config_port(self): u = url.make_url('mssql://username:password@hostspec/database?port=12345') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UID=username;PWD=password;port=12345'], {}], connection) def test_pyodbc_extra_connect(self): u = url.make_url('mssql://username:password@hostspec/database?LANGUAGE=us_english&foo=bar') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UID=username;PWD=password;foo=bar;LANGUAGE=us_english'], {}], connection) def test_pyodbc_odbc_connect(self): u = url.make_url('mssql:///?odbc_connect=DRIVER%3D%7BSQL+Server%7D%3BServer%3Dhostspec%3BDatabase%3Ddatabase%3BUID%3Dusername%3BPWD%3Dpassword') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UID=username;PWD=password'], {}], connection) def test_pyodbc_odbc_connect_with_dsn(self): u = url.make_url('mssql:///?odbc_connect=dsn%3Dmydsn%3BDatabase%3Ddatabase%3BUID%3Dusername%3BPWD%3Dpassword') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['dsn=mydsn;Database=database;UID=username;PWD=password'], {}], connection) def test_pyodbc_odbc_connect_ignores_other_values(self): u = url.make_url('mssql://userdiff:passdiff@localhost/dbdiff?odbc_connect=DRIVER%3D%7BSQL+Server%7D%3BServer%3Dhostspec%3BDatabase%3Ddatabase%3BUID%3Dusername%3BPWD%3Dpassword') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UID=username;PWD=password'], {}], connection) -class TypesTest(TestBase): +class TypesTest(TestBase, AssertsExecutionResults, ComparesTables): __only_on__ = 'mssql' @classmethod def setup_class(cls): - global numeric_table, metadata + global metadata metadata = MetaData(testing.db) def teardown(self): @@ -601,26 +681,22 @@ class TypesTest(TestBase): ) metadata.create_all() - try: - test_items = [decimal.Decimal(d) for d in '1500000.00000000000000000000', - '-1500000.00000000000000000000', '1500000', - '0.0000000000000000002', '0.2', '-0.0000000000000000002', '-2E-2', - '156666.458923543', '-156666.458923543', '1', '-1', '-1234', '1234', - '2E-12', '4E8', '3E-6', '3E-7', '4.1', '1E-1', '1E-2', '1E-3', - '1E-4', '1E-5', '1E-6', '1E-7', '1E-1', '1E-8', '0.2732E2', '-0.2432E2', '4.35656E2', - '-02452E-2', '45125E-2', - '1234.58965E-2', '1.521E+15', '-1E-25', '1E-25', '1254E-25', '-1203E-25', - '0', '-0.00', '-0', '4585E12', '000000000000000000012', '000000000000.32E12', - '00000000000000.1E+12', '000000000000.2E-32'] + test_items = [decimal.Decimal(d) for d in '1500000.00000000000000000000', + '-1500000.00000000000000000000', '1500000', + '0.0000000000000000002', '0.2', '-0.0000000000000000002', '-2E-2', + '156666.458923543', '-156666.458923543', '1', '-1', '-1234', '1234', + '2E-12', '4E8', '3E-6', '3E-7', '4.1', '1E-1', '1E-2', '1E-3', + '1E-4', '1E-5', '1E-6', '1E-7', '1E-1', '1E-8', '0.2732E2', '-0.2432E2', '4.35656E2', + '-02452E-2', '45125E-2', + '1234.58965E-2', '1.521E+15', '-1E-25', '1E-25', '1254E-25', '-1203E-25', + '0', '-0.00', '-0', '4585E12', '000000000000000000012', '000000000000.32E12', + '00000000000000.1E+12', '000000000000.2E-32'] - for value in test_items: - numeric_table.insert().execute(numericcol=value) + for value in test_items: + numeric_table.insert().execute(numericcol=value) - for value in select([numeric_table.c.numericcol]).execute(): - assert value[0] in test_items, "%s not in test_items" % value[0] - - except Exception, e: - raise e + for value in select([numeric_table.c.numericcol]).execute(): + assert value[0] in test_items, "%s not in test_items" % value[0] def test_float(self): float_table = Table('float_table', metadata, @@ -643,11 +719,6 @@ class TypesTest(TestBase): raise e -class TypesTest2(TestBase, AssertsExecutionResults): - "Test Microsoft SQL Server column types" - - __only_on__ = 'mssql' - def test_money(self): "Exercise type specification for money types." @@ -659,13 +730,14 @@ class TypesTest2(TestBase, AssertsExecutionResults): 'SMALLMONEY'), ] - table_args = ['test_mssql_money', MetaData(testing.db)] + table_args = ['test_mssql_money', metadata] for index, spec in enumerate(columns): type_, args, kw, res = spec table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None)) money_table = Table(*table_args) - gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None) + dialect = mssql.dialect() + gen = dialect.ddl_compiler(dialect, schema.CreateTable(money_table)) for col in money_table.c: index = int(col.name[1:]) @@ -688,15 +760,27 @@ class TypesTest2(TestBase, AssertsExecutionResults): (mssql.MSDateTime, [], {}, 'DATETIME', []), + (types.DATE, [], {}, + 'DATE', ['>=', (10,)]), + (types.Date, [], {}, + 'DATE', ['>=', (10,)]), + (types.Date, [], {}, + 'DATETIME', ['<', (10,)], mssql.MSDateTime), (mssql.MSDate, [], {}, 'DATE', ['>=', (10,)]), (mssql.MSDate, [], {}, 'DATETIME', ['<', (10,)], mssql.MSDateTime), + (types.TIME, [], {}, + 'TIME', ['>=', (10,)]), + (types.Time, [], {}, + 'TIME', ['>=', (10,)]), (mssql.MSTime, [], {}, 'TIME', ['>=', (10,)]), (mssql.MSTime, [1], {}, 'TIME(1)', ['>=', (10,)]), + (types.Time, [], {}, + 'DATETIME', ['<', (10,)], mssql.MSDateTime), (mssql.MSTime, [], {}, 'DATETIME', ['<', (10,)], mssql.MSDateTime), @@ -715,14 +799,14 @@ class TypesTest2(TestBase, AssertsExecutionResults): ] - table_args = ['test_mssql_dates', MetaData(testing.db)] + table_args = ['test_mssql_dates', metadata] for index, spec in enumerate(columns): type_, args, kw, res, requires = spec[0:5] if (requires and testing._is_excluded('mssql', *requires)) or not requires: table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None)) dates_table = Table(*table_args) - gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None) + gen = testing.db.dialect.ddl_compiler(testing.db.dialect, schema.CreateTable(dates_table)) for col in dates_table.c: index = int(col.name[1:]) @@ -730,49 +814,37 @@ class TypesTest2(TestBase, AssertsExecutionResults): "%s %s" % (col.name, columns[index][3])) self.assert_(repr(col)) - try: - dates_table.create(checkfirst=True) - assert True - except: - raise + dates_table.create(checkfirst=True) reflected_dates = Table('test_mssql_dates', MetaData(testing.db), autoload=True) for col in reflected_dates.c: - index = int(col.name[1:]) - testing.eq_(testing.db.dialect.type_descriptor(col.type).__class__, - len(columns[index]) > 5 and columns[index][5] or columns[index][0]) - dates_table.drop() - - def test_dates2(self): - meta = MetaData(testing.db) - t = Table('test_dates', meta, - Column('id', Integer, - Sequence('datetest_id_seq', optional=True), - primary_key=True), - Column('adate', Date), - Column('atime', Time), - Column('adatetime', DateTime)) - t.create(checkfirst=True) - try: - d1 = datetime.date(2007, 10, 30) - t1 = datetime.time(11, 2, 32) - d2 = datetime.datetime(2007, 10, 30, 11, 2, 32) - t.insert().execute(adate=d1, adatetime=d2, atime=t1) - t.insert().execute(adate=d2, adatetime=d2, atime=d2) + self.assert_types_base(col, dates_table.c[col.key]) - x = t.select().execute().fetchall()[0] - self.assert_(x.adate.__class__ == datetime.date) - self.assert_(x.atime.__class__ == datetime.time) - self.assert_(x.adatetime.__class__ == datetime.datetime) + def test_date_roundtrip(self): + t = Table('test_dates', metadata, + Column('id', Integer, + Sequence('datetest_id_seq', optional=True), + primary_key=True), + Column('adate', Date), + Column('atime', Time), + Column('adatetime', DateTime)) + metadata.create_all() + d1 = datetime.date(2007, 10, 30) + t1 = datetime.time(11, 2, 32) + d2 = datetime.datetime(2007, 10, 30, 11, 2, 32) + t.insert().execute(adate=d1, adatetime=d2, atime=t1) + t.insert().execute(adate=d2, adatetime=d2, atime=d2) - t.delete().execute() + x = t.select().execute().fetchall()[0] + self.assert_(x.adate.__class__ == datetime.date) + self.assert_(x.atime.__class__ == datetime.time) + self.assert_(x.adatetime.__class__ == datetime.datetime) - t.insert().execute(adate=d1, adatetime=d2, atime=t1) + t.delete().execute() - eq_(select([t.c.adate, t.c.atime, t.c.adatetime], t.c.adate==d1).execute().fetchall(), [(d1, t1, d2)]) + t.insert().execute(adate=d1, adatetime=d2, atime=t1) - finally: - t.drop(checkfirst=True) + eq_(select([t.c.adate, t.c.atime, t.c.adatetime], t.c.adate==d1).execute().fetchall(), [(d1, t1, d2)]) def test_binary(self): "Exercise type specification for binary types." @@ -781,6 +853,9 @@ class TypesTest2(TestBase, AssertsExecutionResults): # column type, args, kwargs, expected ddl (mssql.MSBinary, [], {}, 'BINARY'), + (types.Binary, [10], {}, + 'BINARY(10)'), + (mssql.MSBinary, [10], {}, 'BINARY(10)'), @@ -798,13 +873,14 @@ class TypesTest2(TestBase, AssertsExecutionResults): 'BINARY(10)') ] - table_args = ['test_mssql_binary', MetaData(testing.db)] + table_args = ['test_mssql_binary', metadata] for index, spec in enumerate(columns): type_, args, kw, res = spec table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None)) binary_table = Table(*table_args) - gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None) + dialect = mssql.dialect() + gen = dialect.ddl_compiler(dialect, schema.CreateTable(binary_table)) for col in binary_table.c: index = int(col.name[1:]) @@ -812,22 +888,15 @@ class TypesTest2(TestBase, AssertsExecutionResults): "%s %s" % (col.name, columns[index][3])) self.assert_(repr(col)) - try: - binary_table.create(checkfirst=True) - assert True - except: - raise + metadata.create_all() reflected_binary = Table('test_mssql_binary', MetaData(testing.db), autoload=True) for col in reflected_binary.c: - # don't test the MSGenericBinary since it's a special case and - # reflected it will map to a MSImage or MSBinary depending - if not testing.db.dialect.type_descriptor(binary_table.c[col.name].type).__class__ == mssql.MSGenericBinary: - testing.eq_(testing.db.dialect.type_descriptor(col.type).__class__, - testing.db.dialect.type_descriptor(binary_table.c[col.name].type).__class__) + c1 =testing.db.dialect.type_descriptor(col.type).__class__ + c2 =testing.db.dialect.type_descriptor(binary_table.c[col.name].type).__class__ + assert issubclass(c1, c2), "%r is not a subclass of %r" % (c1, c2) if binary_table.c[col.name].type.length: testing.eq_(col.type.length, binary_table.c[col.name].type.length) - binary_table.drop() def test_boolean(self): "Exercise type specification for boolean type." @@ -838,13 +907,14 @@ class TypesTest2(TestBase, AssertsExecutionResults): 'BIT'), ] - table_args = ['test_mssql_boolean', MetaData(testing.db)] + table_args = ['test_mssql_boolean', metadata] for index, spec in enumerate(columns): type_, args, kw, res = spec table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None)) boolean_table = Table(*table_args) - gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None) + dialect = mssql.dialect() + gen = dialect.ddl_compiler(dialect, schema.CreateTable(boolean_table)) for col in boolean_table.c: index = int(col.name[1:]) @@ -852,12 +922,7 @@ class TypesTest2(TestBase, AssertsExecutionResults): "%s %s" % (col.name, columns[index][3])) self.assert_(repr(col)) - try: - boolean_table.create(checkfirst=True) - assert True - except: - raise - boolean_table.drop() + metadata.create_all() def test_numeric(self): "Exercise type specification and options for numeric types." @@ -865,40 +930,39 @@ class TypesTest2(TestBase, AssertsExecutionResults): columns = [ # column type, args, kwargs, expected ddl (mssql.MSNumeric, [], {}, - 'NUMERIC(10, 2)'), + 'NUMERIC'), (mssql.MSNumeric, [None], {}, 'NUMERIC'), - (mssql.MSNumeric, [12], {}, - 'NUMERIC(12, 2)'), (mssql.MSNumeric, [12, 4], {}, 'NUMERIC(12, 4)'), - (mssql.MSFloat, [], {}, - 'FLOAT(10)'), - (mssql.MSFloat, [None], {}, + (types.Float, [], {}, + 'FLOAT'), + (types.Float, [None], {}, 'FLOAT'), - (mssql.MSFloat, [12], {}, + (types.Float, [12], {}, 'FLOAT(12)'), (mssql.MSReal, [], {}, 'REAL'), - (mssql.MSInteger, [], {}, + (types.Integer, [], {}, 'INTEGER'), - (mssql.MSBigInteger, [], {}, + (types.BigInteger, [], {}, 'BIGINT'), (mssql.MSTinyInteger, [], {}, 'TINYINT'), - (mssql.MSSmallInteger, [], {}, + (types.SmallInteger, [], {}, 'SMALLINT'), ] - table_args = ['test_mssql_numeric', MetaData(testing.db)] + table_args = ['test_mssql_numeric', metadata] for index, spec in enumerate(columns): type_, args, kw, res = spec table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None)) numeric_table = Table(*table_args) - gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None) + dialect = mssql.dialect() + gen = dialect.ddl_compiler(dialect, schema.CreateTable(numeric_table)) for col in numeric_table.c: index = int(col.name[1:]) @@ -906,20 +970,11 @@ class TypesTest2(TestBase, AssertsExecutionResults): "%s %s" % (col.name, columns[index][3])) self.assert_(repr(col)) - try: - numeric_table.create(checkfirst=True) - assert True - except: - raise - numeric_table.drop() + metadata.create_all() def test_char(self): """Exercise COLLATE-ish options on string types.""" - # modify the text_as_varchar setting since we are not testing that behavior here - text_as_varchar = testing.db.dialect.text_as_varchar - testing.db.dialect.text_as_varchar = False - columns = [ (mssql.MSChar, [], {}, 'CHAR'), @@ -960,13 +1015,14 @@ class TypesTest2(TestBase, AssertsExecutionResults): 'NTEXT COLLATE Latin1_General_CI_AS'), ] - table_args = ['test_mssql_charset', MetaData(testing.db)] + table_args = ['test_mssql_charset', metadata] for index, spec in enumerate(columns): type_, args, kw, res = spec table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None)) charset_table = Table(*table_args) - gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None) + dialect = mssql.dialect() + gen = dialect.ddl_compiler(dialect, schema.CreateTable(charset_table)) for col in charset_table.c: index = int(col.name[1:]) @@ -974,110 +1030,91 @@ class TypesTest2(TestBase, AssertsExecutionResults): "%s %s" % (col.name, columns[index][3])) self.assert_(repr(col)) - try: - charset_table.create(checkfirst=True) - assert True - except: - raise - charset_table.drop() - - testing.db.dialect.text_as_varchar = text_as_varchar + metadata.create_all() def test_timestamp(self): """Exercise TIMESTAMP column.""" - meta = MetaData(testing.db) - - try: - columns = [ - (TIMESTAMP, - 'TIMESTAMP'), - (mssql.MSTimeStamp, - 'TIMESTAMP'), - ] - for idx, (spec, expected) in enumerate(columns): - t = Table('mssql_ts%s' % idx, meta, - Column('id', Integer, primary_key=True), - Column('t', spec, nullable=None)) - testing.eq_(colspec(t.c.t), "t %s" % expected) - self.assert_(repr(t.c.t)) - try: - t.create(checkfirst=True) - assert True - except: - raise - t.drop() - finally: - meta.drop_all() + dialect = mssql.dialect() + spec, expected = (TIMESTAMP,'TIMESTAMP') + t = Table('mssql_ts', metadata, + Column('id', Integer, primary_key=True), + Column('t', spec, nullable=None)) + gen = dialect.ddl_compiler(dialect, schema.CreateTable(t)) + testing.eq_(gen.get_column_specification(t.c.t), "t %s" % expected) + self.assert_(repr(t.c.t)) + t.create(checkfirst=True) + def test_autoincrement(self): - meta = MetaData(testing.db) - try: - Table('ai_1', meta, - Column('int_y', Integer, primary_key=True), - Column('int_n', Integer, DefaultClause('0'), - primary_key=True)) - Table('ai_2', meta, - Column('int_y', Integer, primary_key=True), - Column('int_n', Integer, DefaultClause('0'), - primary_key=True)) - Table('ai_3', meta, - Column('int_n', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False), - Column('int_y', Integer, primary_key=True)) - Table('ai_4', meta, - Column('int_n', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False), - Column('int_n2', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False)) - Table('ai_5', meta, - Column('int_y', Integer, primary_key=True), - Column('int_n', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False)) - Table('ai_6', meta, - Column('o1', String(1), DefaultClause('x'), - primary_key=True), - Column('int_y', Integer, primary_key=True)) - Table('ai_7', meta, - Column('o1', String(1), DefaultClause('x'), - primary_key=True), - Column('o2', String(1), DefaultClause('x'), - primary_key=True), - Column('int_y', Integer, primary_key=True)) - Table('ai_8', meta, - Column('o1', String(1), DefaultClause('x'), - primary_key=True), - Column('o2', String(1), DefaultClause('x'), - primary_key=True)) - meta.create_all() - - table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4', - 'ai_5', 'ai_6', 'ai_7', 'ai_8'] - mr = MetaData(testing.db) - mr.reflect(only=table_names) - - for tbl in [mr.tables[name] for name in table_names]: - for c in tbl.c: - if c.name.startswith('int_y'): - assert c.autoincrement - elif c.name.startswith('int_n'): - assert not c.autoincrement - tbl.insert().execute() + Table('ai_1', metadata, + Column('int_y', Integer, primary_key=True), + Column('int_n', Integer, DefaultClause('0'), + primary_key=True)) + Table('ai_2', metadata, + Column('int_y', Integer, primary_key=True), + Column('int_n', Integer, DefaultClause('0'), + primary_key=True)) + Table('ai_3', metadata, + Column('int_n', Integer, DefaultClause('0'), + primary_key=True, autoincrement=False), + Column('int_y', Integer, primary_key=True)) + Table('ai_4', metadata, + Column('int_n', Integer, DefaultClause('0'), + primary_key=True, autoincrement=False), + Column('int_n2', Integer, DefaultClause('0'), + primary_key=True, autoincrement=False)) + Table('ai_5', metadata, + Column('int_y', Integer, primary_key=True), + Column('int_n', Integer, DefaultClause('0'), + primary_key=True, autoincrement=False)) + Table('ai_6', metadata, + Column('o1', String(1), DefaultClause('x'), + primary_key=True), + Column('int_y', Integer, primary_key=True)) + Table('ai_7', metadata, + Column('o1', String(1), DefaultClause('x'), + primary_key=True), + Column('o2', String(1), DefaultClause('x'), + primary_key=True), + Column('int_y', Integer, primary_key=True)) + Table('ai_8', metadata, + Column('o1', String(1), DefaultClause('x'), + primary_key=True), + Column('o2', String(1), DefaultClause('x'), + primary_key=True)) + metadata.create_all() + + table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4', + 'ai_5', 'ai_6', 'ai_7', 'ai_8'] + mr = MetaData(testing.db) + + for name in table_names: + tbl = Table(name, mr, autoload=True) + for c in tbl.c: + if c.name.startswith('int_y'): + assert c.autoincrement + elif c.name.startswith('int_n'): + assert not c.autoincrement + + for counter, engine in enumerate([ + engines.testing_engine(options={'implicit_returning':False}), + engines.testing_engine(options={'implicit_returning':True}), + ] + ): + engine.execute(tbl.insert()) if 'int_y' in tbl.c: - assert select([tbl.c.int_y]).scalar() == 1 - assert list(tbl.select().execute().fetchone()).count(1) == 1 + assert engine.scalar(select([tbl.c.int_y])) == counter + 1 + assert list(engine.execute(tbl.select()).first()).count(counter + 1) == 1 else: - assert 1 not in list(tbl.select().execute().fetchone()) - finally: - meta.drop_all() - -def colspec(c): - return testing.db.dialect.schemagenerator(testing.db.dialect, - testing.db, None, None).get_column_specification(c) - + assert 1 not in list(engine.execute(tbl.select()).first()) + engine.execute(tbl.delete()) class BinaryTest(TestBase, AssertsExecutionResults): """Test the Binary and VarBinary types""" + + __only_on__ = 'mssql' + @classmethod def setup_class(cls): global binary_table, MyPickleType @@ -1125,6 +1162,11 @@ class BinaryTest(TestBase, AssertsExecutionResults): stream2 =self.load_stream('binary_data_two.dat') binary_table.insert().execute(primary_id=1, misc='binary_data_one.dat', data=stream1, data_image=stream1, data_slice=stream1[0:100], pickled=testobj1, mypickle=testobj3) binary_table.insert().execute(primary_id=2, misc='binary_data_two.dat', data=stream2, data_image=stream2, data_slice=stream2[0:99], pickled=testobj2) + + # TODO: pyodbc does not seem to accept "None" for a VARBINARY column (data=None). + # error: [Microsoft][ODBC SQL Server Driver][SQL Server]Implicit conversion from + # data type varchar to varbinary is not allowed. Use the CONVERT function to run this query. (257) + #binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data=None, data_image=None, data_slice=stream2[0:99], pickled=None) binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data_image=None, data_slice=stream2[0:99], pickled=None) for stmt in ( diff --git a/test/dialect/test_mysql.py b/test/dialect/test_mysql.py index 8adb2d71c..405264152 100644 --- a/test/dialect/test_mysql.py +++ b/test/dialect/test_mysql.py @@ -1,8 +1,12 @@ from sqlalchemy.test.testing import eq_ + +# Py2K import sets +# end Py2K + from sqlalchemy import * from sqlalchemy import sql, exc -from sqlalchemy.databases import mysql +from sqlalchemy.dialects.mysql import base as mysql from sqlalchemy.test.testing import eq_ from sqlalchemy.test import * @@ -56,11 +60,11 @@ class TypesTest(TestBase, AssertsExecutionResults): # column type, args, kwargs, expected ddl # e.g. Column(Integer(10, unsigned=True)) == 'INTEGER(10) UNSIGNED' (mysql.MSNumeric, [], {}, - 'NUMERIC(10, 2)'), + 'NUMERIC'), (mysql.MSNumeric, [None], {}, 'NUMERIC'), (mysql.MSNumeric, [12], {}, - 'NUMERIC(12, 2)'), + 'NUMERIC(12)'), (mysql.MSNumeric, [12, 4], {'unsigned':True}, 'NUMERIC(12, 4) UNSIGNED'), (mysql.MSNumeric, [12, 4], {'zerofill':True}, @@ -69,11 +73,11 @@ class TypesTest(TestBase, AssertsExecutionResults): 'NUMERIC(12, 4) UNSIGNED ZEROFILL'), (mysql.MSDecimal, [], {}, - 'DECIMAL(10, 2)'), + 'DECIMAL'), (mysql.MSDecimal, [None], {}, 'DECIMAL'), (mysql.MSDecimal, [12], {}, - 'DECIMAL(12, 2)'), + 'DECIMAL(12)'), (mysql.MSDecimal, [12, None], {}, 'DECIMAL(12)'), (mysql.MSDecimal, [12, 4], {'unsigned':True}, @@ -178,11 +182,11 @@ class TypesTest(TestBase, AssertsExecutionResults): table_args.append(Column('c%s' % index, type_(*args, **kw))) numeric_table = Table(*table_args) - gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None) + gen = testing.db.dialect.ddl_compiler(testing.db.dialect, numeric_table) for col in numeric_table.c: index = int(col.name[1:]) - self.assert_eq(gen.get_column_specification(col), + eq_(gen.get_column_specification(col), "%s %s" % (col.name, columns[index][3])) self.assert_(repr(col)) @@ -262,11 +266,11 @@ class TypesTest(TestBase, AssertsExecutionResults): table_args.append(Column('c%s' % index, type_(*args, **kw))) charset_table = Table(*table_args) - gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None) + gen = testing.db.dialect.ddl_compiler(testing.db.dialect, charset_table) for col in charset_table.c: index = int(col.name[1:]) - self.assert_eq(gen.get_column_specification(col), + eq_(gen.get_column_specification(col), "%s %s" % (col.name, columns[index][3])) self.assert_(repr(col)) @@ -292,14 +296,14 @@ class TypesTest(TestBase, AssertsExecutionResults): Column('b7', mysql.MSBit(63)), Column('b8', mysql.MSBit(64))) - self.assert_eq(colspec(bit_table.c.b1), 'b1 BIT') - self.assert_eq(colspec(bit_table.c.b2), 'b2 BIT') - self.assert_eq(colspec(bit_table.c.b3), 'b3 BIT NOT NULL') - self.assert_eq(colspec(bit_table.c.b4), 'b4 BIT(1)') - self.assert_eq(colspec(bit_table.c.b5), 'b5 BIT(8)') - self.assert_eq(colspec(bit_table.c.b6), 'b6 BIT(32)') - self.assert_eq(colspec(bit_table.c.b7), 'b7 BIT(63)') - self.assert_eq(colspec(bit_table.c.b8), 'b8 BIT(64)') + eq_(colspec(bit_table.c.b1), 'b1 BIT') + eq_(colspec(bit_table.c.b2), 'b2 BIT') + eq_(colspec(bit_table.c.b3), 'b3 BIT NOT NULL') + eq_(colspec(bit_table.c.b4), 'b4 BIT(1)') + eq_(colspec(bit_table.c.b5), 'b5 BIT(8)') + eq_(colspec(bit_table.c.b6), 'b6 BIT(32)') + eq_(colspec(bit_table.c.b7), 'b7 BIT(63)') + eq_(colspec(bit_table.c.b8), 'b8 BIT(64)') for col in bit_table.c: self.assert_(repr(col)) @@ -314,7 +318,7 @@ class TypesTest(TestBase, AssertsExecutionResults): def roundtrip(store, expected=None): expected = expected or store table.insert(store).execute() - row = list(table.select().execute())[0] + row = table.select().execute().first() try: self.assert_(list(row) == expected) except: @@ -322,7 +326,7 @@ class TypesTest(TestBase, AssertsExecutionResults): print "Expected %s" % expected print "Found %s" % list(row) raise - table.delete().execute() + table.delete().execute().close() roundtrip([0] * 8) roundtrip([None, None, 0, None, None, None, None, None]) @@ -350,10 +354,10 @@ class TypesTest(TestBase, AssertsExecutionResults): Column('b3', mysql.MSTinyInteger(1)), Column('b4', mysql.MSTinyInteger)) - self.assert_eq(colspec(bool_table.c.b1), 'b1 BOOL') - self.assert_eq(colspec(bool_table.c.b2), 'b2 BOOL') - self.assert_eq(colspec(bool_table.c.b3), 'b3 TINYINT(1)') - self.assert_eq(colspec(bool_table.c.b4), 'b4 TINYINT') + eq_(colspec(bool_table.c.b1), 'b1 BOOL') + eq_(colspec(bool_table.c.b2), 'b2 BOOL') + eq_(colspec(bool_table.c.b3), 'b3 TINYINT(1)') + eq_(colspec(bool_table.c.b4), 'b4 TINYINT') for col in bool_table.c: self.assert_(repr(col)) @@ -364,7 +368,7 @@ class TypesTest(TestBase, AssertsExecutionResults): def roundtrip(store, expected=None): expected = expected or store table.insert(store).execute() - row = list(table.select().execute())[0] + row = table.select().execute().first() try: self.assert_(list(row) == expected) for i, val in enumerate(expected): @@ -375,7 +379,7 @@ class TypesTest(TestBase, AssertsExecutionResults): print "Expected %s" % expected print "Found %s" % list(row) raise - table.delete().execute() + table.delete().execute().close() roundtrip([None, None, None, None]) @@ -387,7 +391,7 @@ class TypesTest(TestBase, AssertsExecutionResults): meta2 = MetaData(testing.db) # replace with reflected table = Table('mysql_bool', meta2, autoload=True) - self.assert_eq(colspec(table.c.b3), 'b3 BOOL') + eq_(colspec(table.c.b3), 'b3 BOOL') roundtrip([None, None, None, None]) roundtrip([True, True, 1, 1], [True, True, True, 1]) @@ -430,7 +434,7 @@ class TypesTest(TestBase, AssertsExecutionResults): t = Table('mysql_ts%s' % idx, meta, Column('id', Integer, primary_key=True), Column('t', *spec)) - self.assert_eq(colspec(t.c.t), "t %s" % expected) + eq_(colspec(t.c.t), "t %s" % expected) self.assert_(repr(t.c.t)) t.create() r = Table('mysql_ts%s' % idx, MetaData(testing.db), @@ -460,12 +464,12 @@ class TypesTest(TestBase, AssertsExecutionResults): for table in year_table, reflected: table.insert(['1950', '50', None, 50, 1950]).execute() - row = list(table.select().execute())[0] - self.assert_eq(list(row), [1950, 2050, None, 50, 1950]) + row = table.select().execute().first() + eq_(list(row), [1950, 2050, None, 50, 1950]) table.delete().execute() self.assert_(colspec(table.c.y1).startswith('y1 YEAR')) - self.assert_eq(colspec(table.c.y4), 'y4 YEAR(2)') - self.assert_eq(colspec(table.c.y5), 'y5 YEAR(4)') + eq_(colspec(table.c.y4), 'y4 YEAR(2)') + eq_(colspec(table.c.y5), 'y5 YEAR(4)') finally: meta.drop_all() @@ -479,9 +483,9 @@ class TypesTest(TestBase, AssertsExecutionResults): Column('s2', mysql.MSSet("'a'")), Column('s3', mysql.MSSet("'5'", "'7'", "'9'"))) - self.assert_eq(colspec(set_table.c.s1), "s1 SET('dq','sq')") - self.assert_eq(colspec(set_table.c.s2), "s2 SET('a')") - self.assert_eq(colspec(set_table.c.s3), "s3 SET('5','7','9')") + eq_(colspec(set_table.c.s1), "s1 SET('dq','sq')") + eq_(colspec(set_table.c.s2), "s2 SET('a')") + eq_(colspec(set_table.c.s3), "s3 SET('5','7','9')") for col in set_table.c: self.assert_(repr(col)) @@ -494,7 +498,7 @@ class TypesTest(TestBase, AssertsExecutionResults): def roundtrip(store, expected=None): expected = expected or store table.insert(store).execute() - row = list(table.select().execute())[0] + row = table.select().execute().first() try: self.assert_(list(row) == expected) except: @@ -518,12 +522,12 @@ class TypesTest(TestBase, AssertsExecutionResults): {'s3':set(['5', '7'])}, {'s3':set(['5', '7', '9'])}, {'s3':set(['7', '9'])}) - rows = list(select( + rows = select( [set_table.c.s3], - set_table.c.s3.in_([set(['5']), set(['5', '7'])])).execute()) + set_table.c.s3.in_([set(['5']), set(['5', '7']), set(['7', '5'])]) + ).execute().fetchall() found = set([frozenset(row[0]) for row in rows]) - eq_(found, - set([frozenset(['5']), frozenset(['5', '7'])])) + eq_(found, set([frozenset(['5']), frozenset(['5', '7'])])) finally: meta.drop_all() @@ -542,17 +546,17 @@ class TypesTest(TestBase, AssertsExecutionResults): Column('e6', mysql.MSEnum("'a'", "b")), ) - self.assert_eq(colspec(enum_table.c.e1), + eq_(colspec(enum_table.c.e1), "e1 ENUM('a','b')") - self.assert_eq(colspec(enum_table.c.e2), + eq_(colspec(enum_table.c.e2), "e2 ENUM('a','b') NOT NULL") - self.assert_eq(colspec(enum_table.c.e3), + eq_(colspec(enum_table.c.e3), "e3 ENUM('a','b')") - self.assert_eq(colspec(enum_table.c.e4), + eq_(colspec(enum_table.c.e4), "e4 ENUM('a','b') NOT NULL") - self.assert_eq(colspec(enum_table.c.e5), + eq_(colspec(enum_table.c.e5), "e5 ENUM('a','b')") - self.assert_eq(colspec(enum_table.c.e6), + eq_(colspec(enum_table.c.e6), "e6 ENUM('''a''','b')") enum_table.drop(checkfirst=True) enum_table.create() @@ -585,8 +589,9 @@ class TypesTest(TestBase, AssertsExecutionResults): # This is known to fail with MySQLDB 1.2.2 beta versions # which return these as sets.Set(['a']), sets.Set(['b']) # (even on Pythons with __builtin__.set) - if testing.db.dialect.dbapi.version_info < (1, 2, 2, 'beta', 3) and \ - testing.db.dialect.dbapi.version_info >= (1, 2, 2): + if (not testing.against('+zxjdbc') and + testing.db.dialect.dbapi.version_info < (1, 2, 2, 'beta', 3) and + testing.db.dialect.dbapi.version_info >= (1, 2, 2)): # these mysqldb seem to always uses 'sets', even on later pythons import sets def convert(value): @@ -602,7 +607,7 @@ class TypesTest(TestBase, AssertsExecutionResults): e.append(tuple([convert(c) for c in row])) expected = e - self.assert_eq(res, expected) + eq_(res, expected) enum_table.drop() @testing.exclude('mysql', '<', (4,), "3.23 can't handle an ENUM of ''") @@ -637,25 +642,52 @@ class TypesTest(TestBase, AssertsExecutionResults): finally: enum_table.drop() + + +class ReflectionTest(TestBase, AssertsExecutionResults): + + __only_on__ = 'mysql' + def test_default_reflection(self): """Test reflection of column defaults.""" def_table = Table('mysql_def', MetaData(testing.db), Column('c1', String(10), DefaultClause('')), Column('c2', String(10), DefaultClause('0')), - Column('c3', String(10), DefaultClause('abc'))) + Column('c3', String(10), DefaultClause('abc')), + Column('c4', TIMESTAMP, DefaultClause('2009-04-05 12:00:00')), + Column('c5', TIMESTAMP, ), + + ) + def_table.create() try: - def_table.create() reflected = Table('mysql_def', MetaData(testing.db), - autoload=True) - for t in def_table, reflected: - assert t.c.c1.server_default.arg == '' - assert t.c.c2.server_default.arg == '0' - assert t.c.c3.server_default.arg == 'abc' + autoload=True) finally: def_table.drop() + + assert def_table.c.c1.server_default.arg == '' + assert def_table.c.c2.server_default.arg == '0' + assert def_table.c.c3.server_default.arg == 'abc' + assert def_table.c.c4.server_default.arg == '2009-04-05 12:00:00' + + assert str(reflected.c.c1.server_default.arg) == "''" + assert str(reflected.c.c2.server_default.arg) == "'0'" + assert str(reflected.c.c3.server_default.arg) == "'abc'" + assert str(reflected.c.c4.server_default.arg) == "'2009-04-05 12:00:00'" + + reflected.create() + try: + reflected2 = Table('mysql_def', MetaData(testing.db), autoload=True) + finally: + reflected.drop() + assert str(reflected2.c.c1.server_default.arg) == "''" + assert str(reflected2.c.c2.server_default.arg) == "'0'" + assert str(reflected2.c.c3.server_default.arg) == "'abc'" + assert str(reflected2.c.c4.server_default.arg) == "'2009-04-05 12:00:00'" + def test_reflection_on_include_columns(self): """Test reflection of include_columns to be sure they respect case.""" @@ -700,8 +732,8 @@ class TypesTest(TestBase, AssertsExecutionResults): ( mysql.MSSmallInteger(4), mysql.MSSmallInteger(4), ), ( mysql.MSMediumInteger(), mysql.MSMediumInteger(), ), ( mysql.MSMediumInteger(8), mysql.MSMediumInteger(8), ), - ( Binary(3), mysql.MSBlob(3), ), - ( Binary(), mysql.MSBlob() ), + ( Binary(3), mysql.TINYBLOB(), ), + ( Binary(), mysql.BLOB() ), ( mysql.MSBinary(3), mysql.MSBinary(3), ), ( mysql.MSVarBinary(3),), ( mysql.MSVarBinary(), mysql.MSBlob()), @@ -734,14 +766,15 @@ class TypesTest(TestBase, AssertsExecutionResults): # in a view, e.g. char -> varchar, tinyblob -> mediumblob # # Not sure exactly which point version has the fix. - if db.dialect.server_version_info(db.connect()) < (5, 0, 11): + if db.dialect.server_version_info < (5, 0, 11): tables = rt, else: tables = rt, rv for table in tables: for i, reflected in enumerate(table.c): - assert isinstance(reflected.type, type(expected[i])) + assert isinstance(reflected.type, type(expected[i])), \ + "element %d: %r not instance of %r" % (i, reflected.type, type(expected[i])) finally: db.execute('DROP VIEW mysql_types_v') finally: @@ -802,17 +835,12 @@ class TypesTest(TestBase, AssertsExecutionResults): tbl.insert().execute() if 'int_y' in tbl.c: assert select([tbl.c.int_y]).scalar() == 1 - assert list(tbl.select().execute().fetchone()).count(1) == 1 + assert list(tbl.select().execute().first()).count(1) == 1 else: - assert 1 not in list(tbl.select().execute().fetchone()) + assert 1 not in list(tbl.select().execute().first()) finally: meta.drop_all() - def assert_eq(self, got, wanted): - if got != wanted: - print "Expected %s" % wanted - print "Found %s" % got - eq_(got, wanted) class SQLTest(TestBase, AssertsCompiledSQL): @@ -909,11 +937,11 @@ class SQLTest(TestBase, AssertsCompiledSQL): (m.MSBit, "t.col"), # this is kind of sucky. thank you default arguments! - (NUMERIC, "CAST(t.col AS DECIMAL(10, 2))"), - (DECIMAL, "CAST(t.col AS DECIMAL(10, 2))"), - (Numeric, "CAST(t.col AS DECIMAL(10, 2))"), - (m.MSNumeric, "CAST(t.col AS DECIMAL(10, 2))"), - (m.MSDecimal, "CAST(t.col AS DECIMAL(10, 2))"), + (NUMERIC, "CAST(t.col AS DECIMAL)"), + (DECIMAL, "CAST(t.col AS DECIMAL)"), + (Numeric, "CAST(t.col AS DECIMAL)"), + (m.MSNumeric, "CAST(t.col AS DECIMAL)"), + (m.MSDecimal, "CAST(t.col AS DECIMAL)"), (FLOAT, "t.col"), (Float, "t.col"), @@ -928,8 +956,8 @@ class SQLTest(TestBase, AssertsCompiledSQL): (DateTime, "CAST(t.col AS DATETIME)"), (Date, "CAST(t.col AS DATE)"), (Time, "CAST(t.col AS TIME)"), - (m.MSDateTime, "CAST(t.col AS DATETIME)"), - (m.MSDate, "CAST(t.col AS DATE)"), + (DateTime, "CAST(t.col AS DATETIME)"), + (Date, "CAST(t.col AS DATE)"), (m.MSTime, "CAST(t.col AS TIME)"), (m.MSTimeStamp, "CAST(t.col AS DATETIME)"), (m.MSYear, "t.col"), @@ -998,12 +1026,11 @@ class SQLTest(TestBase, AssertsCompiledSQL): class RawReflectionTest(TestBase): def setup(self): - self.dialect = mysql.dialect() - self.reflector = mysql.MySQLSchemaReflector( - self.dialect.identifier_preparer) + dialect = mysql.dialect() + self.parser = mysql.MySQLTableDefinitionParser(dialect, dialect.identifier_preparer) def test_key_reflection(self): - regex = self.reflector._re_key + regex = self.parser._re_key assert regex.match(' PRIMARY KEY (`id`),') assert regex.match(' PRIMARY KEY USING BTREE (`id`),') @@ -1023,37 +1050,11 @@ class ExecutionTest(TestBase): cx = engine.connect() meta = MetaData() - - assert ('mysql', 'charset') not in cx.info - assert ('mysql', 'force_charset') not in cx.info - - cx.execute(text("SELECT 1")).fetchall() - assert ('mysql', 'charset') not in cx.info - - meta.reflect(cx) - assert ('mysql', 'charset') in cx.info - - cx.execute(text("SET @squiznart=123")) - assert ('mysql', 'charset') in cx.info - - # the charset invalidation is very conservative - cx.execute(text("SET TIMESTAMP = DEFAULT")) - assert ('mysql', 'charset') not in cx.info - - cx.info[('mysql', 'force_charset')] = 'latin1' - - assert engine.dialect._detect_charset(cx) == 'latin1' - assert cx.info[('mysql', 'charset')] == 'latin1' - - del cx.info[('mysql', 'force_charset')] - del cx.info[('mysql', 'charset')] + charset = engine.dialect._detect_charset(cx) meta.reflect(cx) - assert ('mysql', 'charset') in cx.info - - # String execution doesn't go through the detector. - cx.execute("SET TIMESTAMP = DEFAULT") - assert ('mysql', 'charset') in cx.info + eq_(cx.dialect._connection_charset, charset) + cx.close() class MatchTest(TestBase, AssertsCompiledSQL): @@ -1102,9 +1103,10 @@ class MatchTest(TestBase, AssertsCompiledSQL): metadata.drop_all() def test_expression(self): + format = testing.db.dialect.paramstyle == 'format' and '%s' or '?' self.assert_compile( matchtable.c.title.match('somstr'), - "MATCH (matchtable.title) AGAINST (%s IN BOOLEAN MODE)") + "MATCH (matchtable.title) AGAINST (%s IN BOOLEAN MODE)" % format) def test_simple_match(self): results = (matchtable.select(). @@ -1162,6 +1164,5 @@ class MatchTest(TestBase, AssertsCompiledSQL): def colspec(c): - return testing.db.dialect.schemagenerator(testing.db.dialect, - testing.db, None, None).get_column_specification(c) + return testing.db.dialect.ddl_compiler(testing.db.dialect, c.table).get_column_specification(c) diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py index d9d64806e..53e0f9ec2 100644 --- a/test/dialect/test_oracle.py +++ b/test/dialect/test_oracle.py @@ -2,12 +2,14 @@ from sqlalchemy.test.testing import eq_ from sqlalchemy import * +from sqlalchemy import types as sqltypes from sqlalchemy.sql import table, column -from sqlalchemy.databases import oracle from sqlalchemy.test import * from sqlalchemy.test.testing import eq_ from sqlalchemy.test.engines import testing_engine +from sqlalchemy.dialects.oracle import cx_oracle, base as oracle from sqlalchemy.engine import default +from sqlalchemy.util import jython import os @@ -43,10 +45,10 @@ class CompileTest(TestBase, AssertsCompiledSQL): meta = MetaData() parent = Table('parent', meta, Column('id', Integer, primary_key=True), Column('name', String(50)), - owner='ed') + schema='ed') child = Table('child', meta, Column('id', Integer, primary_key=True), Column('parent_id', Integer, ForeignKey('ed.parent.id')), - owner = 'ed') + schema = 'ed') self.assert_compile(parent.join(child), "ed.parent JOIN ed.child ON ed.parent.id = ed.child.parent_id") @@ -342,6 +344,25 @@ class TypesTest(TestBase, AssertsCompiledSQL): b = bindparam("foo", u"hello world!") assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING' + def test_type_adapt(self): + dialect = cx_oracle.dialect() + + for start, test in [ + (DateTime(), cx_oracle._OracleDateTime), + (TIMESTAMP(), cx_oracle._OracleTimestamp), + (oracle.OracleRaw(), cx_oracle._OracleRaw), + (String(), String), + (VARCHAR(), VARCHAR), + (String(50), String), + (Unicode(), Unicode), + (Text(), cx_oracle._OracleText), + (UnicodeText(), cx_oracle._OracleUnicodeText), + (NCHAR(), NCHAR), + (oracle.RAW(50), cx_oracle._OracleRaw), + ]: + assert isinstance(start.dialect_impl(dialect), test), "wanted %r got %r" % (test, start.dialect_impl(dialect)) + + def test_reflect_raw(self): types_table = Table( 'all_types', MetaData(testing.db), @@ -354,16 +375,16 @@ class TypesTest(TestBase, AssertsCompiledSQL): def test_reflect_nvarchar(self): metadata = MetaData(testing.db) t = Table('t', metadata, - Column('data', oracle.OracleNVarchar(255)) + Column('data', sqltypes.NVARCHAR(255)) ) metadata.create_all() try: m2 = MetaData(testing.db) t2 = Table('t', m2, autoload=True) - assert isinstance(t2.c.data.type, oracle.OracleNVarchar) + assert isinstance(t2.c.data.type, sqltypes.NVARCHAR) data = u'm’a réveillé.' t2.insert().execute(data=data) - eq_(t2.select().execute().fetchone()['data'], data) + eq_(t2.select().execute().first()['data'], data) finally: metadata.drop_all() @@ -391,7 +412,7 @@ class TypesTest(TestBase, AssertsCompiledSQL): t.create(engine) try: engine.execute(t.insert(), id=1, data='this is text', bindata='this is binary') - row = engine.execute(t.select()).fetchone() + row = engine.execute(t.select()).first() eq_(row['data'].read(), 'this is text') eq_(row['bindata'].read(), 'this is binary') finally: @@ -408,7 +429,6 @@ class BufferedColumnTest(TestBase, AssertsCompiledSQL): Column('data', Binary) ) meta.create_all() - stream = os.path.join(os.path.dirname(__file__), "..", 'binary_data_one.dat') stream = file(stream).read(12000) @@ -420,17 +440,18 @@ class BufferedColumnTest(TestBase, AssertsCompiledSQL): meta.drop_all() def test_fetch(self): - eq_( - binary_table.select().execute().fetchall() , - [(i, stream) for i in range(1, 11)], - ) + result = binary_table.select().execute().fetchall() + if jython: + result = [(i, value.tostring()) for i, value in result] + eq_(result, [(i, stream) for i in range(1, 11)]) + @testing.fails_on('+zxjdbc', 'FIXME: zxjdbc should support this') def test_fetch_single_arraysize(self): eng = testing_engine(options={'arraysize':1}) - eq_( - eng.execute(binary_table.select()).fetchall(), - [(i, stream) for i in range(1, 11)], - ) + result = eng.execute(binary_table.select()).fetchall(), + if jython: + result = [(i, value.tostring()) for i, value in result] + eq_(result, [(i, stream) for i in range(1, 11)]) class SequenceTest(TestBase, AssertsCompiledSQL): def test_basic(self): diff --git a/test/dialect/test_postgres.py b/test/dialect/test_postgresql.py index 8ca714bad..e1c351a93 100644 --- a/test/dialect/test_postgres.py +++ b/test/dialect/test_postgresql.py @@ -1,18 +1,19 @@ from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message +from sqlalchemy.test import engines import datetime from sqlalchemy import * from sqlalchemy.orm import * -from sqlalchemy import exc -from sqlalchemy.databases import postgres +from sqlalchemy import exc, schema +from sqlalchemy.dialects.postgresql import base as postgresql from sqlalchemy.engine.strategies import MockEngineStrategy from sqlalchemy.test import * from sqlalchemy.sql import table, column - +from sqlalchemy.test.testing import eq_ class SequenceTest(TestBase, AssertsCompiledSQL): def test_basic(self): seq = Sequence("my_seq_no_schema") - dialect = postgres.PGDialect() + dialect = postgresql.PGDialect() assert dialect.identifier_preparer.format_sequence(seq) == "my_seq_no_schema" seq = Sequence("my_seq", schema="some_schema") @@ -22,43 +23,77 @@ class SequenceTest(TestBase, AssertsCompiledSQL): assert dialect.identifier_preparer.format_sequence(seq) == '"Some_Schema"."My_Seq"' class CompileTest(TestBase, AssertsCompiledSQL): - __dialect__ = postgres.dialect() + __dialect__ = postgresql.dialect() def test_update_returning(self): - dialect = postgres.dialect() + dialect = postgresql.dialect() table1 = table('mytable', column('myid', Integer), column('name', String(128)), column('description', String(128)), ) - u = update(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name]) + u = update(table1, values=dict(name='foo')).returning(table1.c.myid, table1.c.name) self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING mytable.myid, mytable.name", dialect=dialect) - u = update(table1, values=dict(name='foo'), postgres_returning=[table1]) + u = update(table1, values=dict(name='foo')).returning(table1) self.assert_compile(u, "UPDATE mytable SET name=%(name)s "\ "RETURNING mytable.myid, mytable.name, mytable.description", dialect=dialect) - u = update(table1, values=dict(name='foo'), postgres_returning=[func.length(table1.c.name)]) - self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING length(mytable.name)", dialect=dialect) + u = update(table1, values=dict(name='foo')).returning(func.length(table1.c.name)) + self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING length(mytable.name) AS length_1", dialect=dialect) + def test_insert_returning(self): - dialect = postgres.dialect() + dialect = postgresql.dialect() table1 = table('mytable', column('myid', Integer), column('name', String(128)), column('description', String(128)), ) - i = insert(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name]) + i = insert(table1, values=dict(name='foo')).returning(table1.c.myid, table1.c.name) self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING mytable.myid, mytable.name", dialect=dialect) - i = insert(table1, values=dict(name='foo'), postgres_returning=[table1]) + i = insert(table1, values=dict(name='foo')).returning(table1) self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) "\ "RETURNING mytable.myid, mytable.name, mytable.description", dialect=dialect) - i = insert(table1, values=dict(name='foo'), postgres_returning=[func.length(table1.c.name)]) - self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING length(mytable.name)", dialect=dialect) + i = insert(table1, values=dict(name='foo')).returning(func.length(table1.c.name)) + self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING length(mytable.name) AS length_1", dialect=dialect) + + @testing.uses_deprecated(r".*argument is deprecated. Please use statement.returning.*") + def test_old_returning_names(self): + dialect = postgresql.dialect() + table1 = table('mytable', + column('myid', Integer), + column('name', String(128)), + column('description', String(128)), + ) + + u = update(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name]) + self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING mytable.myid, mytable.name", dialect=dialect) + + u = update(table1, values=dict(name='foo'), postgresql_returning=[table1.c.myid, table1.c.name]) + self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING mytable.myid, mytable.name", dialect=dialect) + + i = insert(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name]) + self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING mytable.myid, mytable.name", dialect=dialect) + + def test_create_partial_index(self): + tbl = Table('testtbl', MetaData(), Column('data',Integer)) + idx = Index('test_idx1', tbl.c.data, postgresql_where=and_(tbl.c.data > 5, tbl.c.data < 10)) + + self.assert_compile(schema.CreateIndex(idx), + "CREATE INDEX test_idx1 ON testtbl (data) WHERE testtbl.data > 5 AND testtbl.data < 10", dialect=postgresql.dialect()) + + @testing.uses_deprecated(r".*'postgres_where' argument has been renamed.*") + def test_old_create_partial_index(self): + tbl = Table('testtbl', MetaData(), Column('data',Integer)) + idx = Index('test_idx1', tbl.c.data, postgres_where=and_(tbl.c.data > 5, tbl.c.data < 10)) + + self.assert_compile(schema.CreateIndex(idx), + "CREATE INDEX test_idx1 ON testtbl (data) WHERE testtbl.data > 5 AND testtbl.data < 10", dialect=postgresql.dialect()) def test_extract(self): t = table('t', column('col1')) @@ -70,72 +105,20 @@ class CompileTest(TestBase, AssertsCompiledSQL): "FROM t" % field) -class ReturningTest(TestBase, AssertsExecutionResults): - __only_on__ = 'postgres' - - @testing.exclude('postgres', '<', (8, 2), '8.3+ feature') - def test_update_returning(self): - meta = MetaData(testing.db) - table = Table('tables', meta, - Column('id', Integer, primary_key=True), - Column('persons', Integer), - Column('full', Boolean) - ) - table.create() - try: - table.insert().execute([{'persons': 5, 'full': False}, {'persons': 3, 'full': False}]) - - result = table.update(table.c.persons > 4, dict(full=True), postgres_returning=[table.c.id]).execute() - eq_(result.fetchall(), [(1,)]) - - result2 = select([table.c.id, table.c.full]).order_by(table.c.id).execute() - eq_(result2.fetchall(), [(1,True),(2,False)]) - finally: - table.drop() - - @testing.exclude('postgres', '<', (8, 2), '8.3+ feature') - def test_insert_returning(self): - meta = MetaData(testing.db) - table = Table('tables', meta, - Column('id', Integer, primary_key=True), - Column('persons', Integer), - Column('full', Boolean) - ) - table.create() - try: - result = table.insert(postgres_returning=[table.c.id]).execute({'persons': 1, 'full': False}) - - eq_(result.fetchall(), [(1,)]) - - @testing.fails_on('postgres', 'Known limitation of psycopg2') - def test_executemany(): - # return value is documented as failing with psycopg2/executemany - result2 = table.insert(postgres_returning=[table]).execute( - [{'persons': 2, 'full': False}, {'persons': 3, 'full': True}]) - eq_(result2.fetchall(), [(2, 2, False), (3,3,True)]) - - test_executemany() - - result3 = table.insert(postgres_returning=[(table.c.id*2).label('double_id')]).execute({'persons': 4, 'full': False}) - eq_([dict(row) for row in result3], [{'double_id':8}]) - - result4 = testing.db.execute('insert into tables (id, persons, "full") values (5, 10, true) returning persons') - eq_([dict(row) for row in result4], [{'persons': 10}]) - finally: - table.drop() - - class InsertTest(TestBase, AssertsExecutionResults): - __only_on__ = 'postgres' + __only_on__ = 'postgresql' @classmethod def setup_class(cls): global metadata + cls.engine= testing.db metadata = MetaData(testing.db) def teardown(self): metadata.drop_all() metadata.tables.clear() + if self.engine is not testing.db: + self.engine.dispose() def test_compiled_insert(self): table = Table('testtable', metadata, @@ -144,7 +127,7 @@ class InsertTest(TestBase, AssertsExecutionResults): metadata.create_all() - ins = table.insert(values={'data':bindparam('x')}).compile() + ins = table.insert(inline=True, values={'data':bindparam('x')}).compile() ins.execute({'x':"five"}, {'x':"seven"}) assert table.select().execute().fetchall() == [(1, 'five'), (2, 'seven')] @@ -155,6 +138,13 @@ class InsertTest(TestBase, AssertsExecutionResults): metadata.create_all() self._assert_data_with_sequence(table, "my_seq") + def test_sequence_returning_insert(self): + table = Table('testtable', metadata, + Column('id', Integer, Sequence('my_seq'), primary_key=True), + Column('data', String(30))) + metadata.create_all() + self._assert_data_with_sequence_returning(table, "my_seq") + def test_opt_sequence_insert(self): table = Table('testtable', metadata, Column('id', Integer, Sequence('my_seq', optional=True), primary_key=True), @@ -162,6 +152,13 @@ class InsertTest(TestBase, AssertsExecutionResults): metadata.create_all() self._assert_data_autoincrement(table) + def test_opt_sequence_returning_insert(self): + table = Table('testtable', metadata, + Column('id', Integer, Sequence('my_seq', optional=True), primary_key=True), + Column('data', String(30))) + metadata.create_all() + self._assert_data_autoincrement_returning(table) + def test_autoincrement_insert(self): table = Table('testtable', metadata, Column('id', Integer, primary_key=True), @@ -169,6 +166,13 @@ class InsertTest(TestBase, AssertsExecutionResults): metadata.create_all() self._assert_data_autoincrement(table) + def test_autoincrement_returning_insert(self): + table = Table('testtable', metadata, + Column('id', Integer, primary_key=True), + Column('data', String(30))) + metadata.create_all() + self._assert_data_autoincrement_returning(table) + def test_noautoincrement_insert(self): table = Table('testtable', metadata, Column('id', Integer, primary_key=True, autoincrement=False), @@ -177,14 +181,17 @@ class InsertTest(TestBase, AssertsExecutionResults): self._assert_data_noautoincrement(table) def _assert_data_autoincrement(self, table): + self.engine = engines.testing_engine(options={'implicit_returning':False}) + metadata.bind = self.engine + def go(): # execute with explicit id r = table.insert().execute({'id':30, 'data':'d1'}) - assert r.last_inserted_ids() == [30] + assert r.inserted_primary_key == [30] # execute with prefetch id r = table.insert().execute({'data':'d2'}) - assert r.last_inserted_ids() == [1] + assert r.inserted_primary_key == [1] # executemany with explicit ids table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}) @@ -201,7 +208,7 @@ class InsertTest(TestBase, AssertsExecutionResults): # note that the test framework doesnt capture the "preexecute" of a seqeuence # or default. we just see it in the bind params. - self.assert_sql(testing.db, go, [], with_sequences=[ + self.assert_sql(self.engine, go, [], with_sequences=[ ( "INSERT INTO testtable (id, data) VALUES (:id, :data)", {'id':30, 'data':'d1'} @@ -242,19 +249,19 @@ class InsertTest(TestBase, AssertsExecutionResults): # test the same series of events using a reflected # version of the table - m2 = MetaData(testing.db) + m2 = MetaData(self.engine) table = Table(table.name, m2, autoload=True) def go(): table.insert().execute({'id':30, 'data':'d1'}) r = table.insert().execute({'data':'d2'}) - assert r.last_inserted_ids() == [5] + assert r.inserted_primary_key == [5] table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}) table.insert().execute({'data':'d5'}, {'data':'d6'}) table.insert(inline=True).execute({'id':33, 'data':'d7'}) table.insert(inline=True).execute({'data':'d8'}) - self.assert_sql(testing.db, go, [], with_sequences=[ + self.assert_sql(self.engine, go, [], with_sequences=[ ( "INSERT INTO testtable (id, data) VALUES (:id, :data)", {'id':30, 'data':'d1'} @@ -293,7 +300,127 @@ class InsertTest(TestBase, AssertsExecutionResults): ] table.delete().execute() + def _assert_data_autoincrement_returning(self, table): + self.engine = engines.testing_engine(options={'implicit_returning':True}) + metadata.bind = self.engine + + def go(): + # execute with explicit id + r = table.insert().execute({'id':30, 'data':'d1'}) + assert r.inserted_primary_key == [30] + + # execute with prefetch id + r = table.insert().execute({'data':'d2'}) + assert r.inserted_primary_key == [1] + + # executemany with explicit ids + table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}) + + # executemany, uses SERIAL + table.insert().execute({'data':'d5'}, {'data':'d6'}) + + # single execute, explicit id, inline + table.insert(inline=True).execute({'id':33, 'data':'d7'}) + + # single execute, inline, uses SERIAL + table.insert(inline=True).execute({'data':'d8'}) + + self.assert_sql(self.engine, go, [], with_sequences=[ + ( + "INSERT INTO testtable (id, data) VALUES (:id, :data)", + {'id':30, 'data':'d1'} + ), + ( + "INSERT INTO testtable (data) VALUES (:data) RETURNING testtable.id", + {'data': 'd2'} + ), + ( + "INSERT INTO testtable (id, data) VALUES (:id, :data)", + [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}] + ), + ( + "INSERT INTO testtable (data) VALUES (:data)", + [{'data':'d5'}, {'data':'d6'}] + ), + ( + "INSERT INTO testtable (id, data) VALUES (:id, :data)", + [{'id':33, 'data':'d7'}] + ), + ( + "INSERT INTO testtable (data) VALUES (:data)", + [{'data':'d8'}] + ), + ]) + + assert table.select().execute().fetchall() == [ + (30, 'd1'), + (1, 'd2'), + (31, 'd3'), + (32, 'd4'), + (2, 'd5'), + (3, 'd6'), + (33, 'd7'), + (4, 'd8'), + ] + table.delete().execute() + + # test the same series of events using a reflected + # version of the table + m2 = MetaData(self.engine) + table = Table(table.name, m2, autoload=True) + + def go(): + table.insert().execute({'id':30, 'data':'d1'}) + r = table.insert().execute({'data':'d2'}) + assert r.inserted_primary_key == [5] + table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}) + table.insert().execute({'data':'d5'}, {'data':'d6'}) + table.insert(inline=True).execute({'id':33, 'data':'d7'}) + table.insert(inline=True).execute({'data':'d8'}) + + self.assert_sql(self.engine, go, [], with_sequences=[ + ( + "INSERT INTO testtable (id, data) VALUES (:id, :data)", + {'id':30, 'data':'d1'} + ), + ( + "INSERT INTO testtable (data) VALUES (:data) RETURNING testtable.id", + {'data':'d2'} + ), + ( + "INSERT INTO testtable (id, data) VALUES (:id, :data)", + [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}] + ), + ( + "INSERT INTO testtable (data) VALUES (:data)", + [{'data':'d5'}, {'data':'d6'}] + ), + ( + "INSERT INTO testtable (id, data) VALUES (:id, :data)", + [{'id':33, 'data':'d7'}] + ), + ( + "INSERT INTO testtable (data) VALUES (:data)", + [{'data':'d8'}] + ), + ]) + + assert table.select().execute().fetchall() == [ + (30, 'd1'), + (5, 'd2'), + (31, 'd3'), + (32, 'd4'), + (6, 'd5'), + (7, 'd6'), + (33, 'd7'), + (8, 'd8'), + ] + table.delete().execute() + def _assert_data_with_sequence(self, table, seqname): + self.engine = engines.testing_engine(options={'implicit_returning':False}) + metadata.bind = self.engine + def go(): table.insert().execute({'id':30, 'data':'d1'}) table.insert().execute({'data':'d2'}) @@ -302,7 +429,7 @@ class InsertTest(TestBase, AssertsExecutionResults): table.insert(inline=True).execute({'id':33, 'data':'d7'}) table.insert(inline=True).execute({'data':'d8'}) - self.assert_sql(testing.db, go, [], with_sequences=[ + self.assert_sql(self.engine, go, [], with_sequences=[ ( "INSERT INTO testtable (id, data) VALUES (:id, :data)", {'id':30, 'data':'d1'} @@ -343,18 +470,76 @@ class InsertTest(TestBase, AssertsExecutionResults): # cant test reflection here since the Sequence must be # explicitly specified + def _assert_data_with_sequence_returning(self, table, seqname): + self.engine = engines.testing_engine(options={'implicit_returning':True}) + metadata.bind = self.engine + + def go(): + table.insert().execute({'id':30, 'data':'d1'}) + table.insert().execute({'data':'d2'}) + table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}) + table.insert().execute({'data':'d5'}, {'data':'d6'}) + table.insert(inline=True).execute({'id':33, 'data':'d7'}) + table.insert(inline=True).execute({'data':'d8'}) + + self.assert_sql(self.engine, go, [], with_sequences=[ + ( + "INSERT INTO testtable (id, data) VALUES (:id, :data)", + {'id':30, 'data':'d1'} + ), + ( + "INSERT INTO testtable (id, data) VALUES (nextval('my_seq'), :data) RETURNING testtable.id", + {'data':'d2'} + ), + ( + "INSERT INTO testtable (id, data) VALUES (:id, :data)", + [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}] + ), + ( + "INSERT INTO testtable (id, data) VALUES (nextval('%s'), :data)" % seqname, + [{'data':'d5'}, {'data':'d6'}] + ), + ( + "INSERT INTO testtable (id, data) VALUES (:id, :data)", + [{'id':33, 'data':'d7'}] + ), + ( + "INSERT INTO testtable (id, data) VALUES (nextval('%s'), :data)" % seqname, + [{'data':'d8'}] + ), + ]) + + assert table.select().execute().fetchall() == [ + (30, 'd1'), + (1, 'd2'), + (31, 'd3'), + (32, 'd4'), + (2, 'd5'), + (3, 'd6'), + (33, 'd7'), + (4, 'd8'), + ] + + # cant test reflection here since the Sequence must be + # explicitly specified + def _assert_data_noautoincrement(self, table): + self.engine = engines.testing_engine(options={'implicit_returning':False}) + metadata.bind = self.engine + table.insert().execute({'id':30, 'data':'d1'}) - try: - table.insert().execute({'data':'d2'}) - assert False - except exc.IntegrityError, e: - assert "violates not-null constraint" in str(e) - try: - table.insert().execute({'data':'d2'}, {'data':'d3'}) - assert False - except exc.IntegrityError, e: - assert "violates not-null constraint" in str(e) + + if self.engine.driver == 'pg8000': + exception_cls = exc.ProgrammingError + else: + exception_cls = exc.IntegrityError + + assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'}) + assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'}, {'data':'d3'}) + + assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'}) + + assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'}, {'data':'d3'}) table.insert().execute({'id':31, 'data':'d2'}, {'id':32, 'data':'d3'}) table.insert(inline=True).execute({'id':33, 'data':'d4'}) @@ -369,19 +554,12 @@ class InsertTest(TestBase, AssertsExecutionResults): # test the same series of events using a reflected # version of the table - m2 = MetaData(testing.db) + m2 = MetaData(self.engine) table = Table(table.name, m2, autoload=True) table.insert().execute({'id':30, 'data':'d1'}) - try: - table.insert().execute({'data':'d2'}) - assert False - except exc.IntegrityError, e: - assert "violates not-null constraint" in str(e) - try: - table.insert().execute({'data':'d2'}, {'data':'d3'}) - assert False - except exc.IntegrityError, e: - assert "violates not-null constraint" in str(e) + + assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'}) + assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'}, {'data':'d3'}) table.insert().execute({'id':31, 'data':'d2'}, {'id':32, 'data':'d3'}) table.insert(inline=True).execute({'id':33, 'data':'d4'}) @@ -396,36 +574,36 @@ class InsertTest(TestBase, AssertsExecutionResults): class DomainReflectionTest(TestBase, AssertsExecutionResults): "Test PostgreSQL domains" - __only_on__ = 'postgres' + __only_on__ = 'postgresql' @classmethod def setup_class(cls): con = testing.db.connect() for ddl in ('CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42', - 'CREATE DOMAIN alt_schema.testdomain INTEGER DEFAULT 0'): + 'CREATE DOMAIN test_schema.testdomain INTEGER DEFAULT 0'): try: con.execute(ddl) except exc.SQLError, e: if not "already exists" in str(e): raise e con.execute('CREATE TABLE testtable (question integer, answer testdomain)') - con.execute('CREATE TABLE alt_schema.testtable(question integer, answer alt_schema.testdomain, anything integer)') - con.execute('CREATE TABLE crosschema (question integer, answer alt_schema.testdomain)') + con.execute('CREATE TABLE test_schema.testtable(question integer, answer test_schema.testdomain, anything integer)') + con.execute('CREATE TABLE crosschema (question integer, answer test_schema.testdomain)') @classmethod def teardown_class(cls): con = testing.db.connect() con.execute('DROP TABLE testtable') - con.execute('DROP TABLE alt_schema.testtable') + con.execute('DROP TABLE test_schema.testtable') con.execute('DROP TABLE crosschema') con.execute('DROP DOMAIN testdomain') - con.execute('DROP DOMAIN alt_schema.testdomain') + con.execute('DROP DOMAIN test_schema.testdomain') def test_table_is_reflected(self): metadata = MetaData(testing.db) table = Table('testtable', metadata, autoload=True) eq_(set(table.columns.keys()), set(['question', 'answer']), "Columns of reflected table didn't equal expected columns") - eq_(table.c.answer.type.__class__, postgres.PGInteger) + assert isinstance(table.c.answer.type, Integer) def test_domain_is_reflected(self): metadata = MetaData(testing.db) @@ -433,15 +611,15 @@ class DomainReflectionTest(TestBase, AssertsExecutionResults): eq_(str(table.columns.answer.server_default.arg), '42', "Reflected default value didn't equal expected value") assert not table.columns.answer.nullable, "Expected reflected column to not be nullable." - def test_table_is_reflected_alt_schema(self): + def test_table_is_reflected_test_schema(self): metadata = MetaData(testing.db) - table = Table('testtable', metadata, autoload=True, schema='alt_schema') + table = Table('testtable', metadata, autoload=True, schema='test_schema') eq_(set(table.columns.keys()), set(['question', 'answer', 'anything']), "Columns of reflected table didn't equal expected columns") - eq_(table.c.anything.type.__class__, postgres.PGInteger) + assert isinstance(table.c.anything.type, Integer) def test_schema_domain_is_reflected(self): metadata = MetaData(testing.db) - table = Table('testtable', metadata, autoload=True, schema='alt_schema') + table = Table('testtable', metadata, autoload=True, schema='test_schema') eq_(str(table.columns.answer.server_default.arg), '0', "Reflected default value didn't equal expected value") assert table.columns.answer.nullable, "Expected reflected column to be nullable." @@ -452,10 +630,10 @@ class DomainReflectionTest(TestBase, AssertsExecutionResults): assert table.columns.answer.nullable, "Expected reflected column to be nullable." def test_unknown_types(self): - from sqlalchemy.databases import postgres + from sqlalchemy.databases import postgresql - ischema_names = postgres.ischema_names - postgres.ischema_names = {} + ischema_names = postgresql.PGDialect.ischema_names + postgresql.PGDialect.ischema_names = {} try: m2 = MetaData(testing.db) assert_raises(exc.SAWarning, Table, "testtable", m2, autoload=True) @@ -467,11 +645,11 @@ class DomainReflectionTest(TestBase, AssertsExecutionResults): assert t3.c.answer.type.__class__ == sa.types.NullType finally: - postgres.ischema_names = ischema_names + postgresql.PGDialect.ischema_names = ischema_names -class MiscTest(TestBase, AssertsExecutionResults): - __only_on__ = 'postgres' +class MiscTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL): + __only_on__ = 'postgresql' def test_date_reflection(self): m1 = MetaData(testing.db) @@ -536,26 +714,26 @@ class MiscTest(TestBase, AssertsExecutionResults): 'FROM mytable') def test_schema_reflection(self): - """note: this test requires that the 'alt_schema' schema be separate and accessible by the test user""" + """note: this test requires that the 'test_schema' schema be separate and accessible by the test user""" meta1 = MetaData(testing.db) users = Table('users', meta1, Column('user_id', Integer, primary_key = True), Column('user_name', String(30), nullable = False), - schema="alt_schema" + schema="test_schema" ) addresses = Table('email_addresses', meta1, Column('address_id', Integer, primary_key = True), Column('remote_user_id', Integer, ForeignKey(users.c.user_id)), Column('email_address', String(20)), - schema="alt_schema" + schema="test_schema" ) meta1.create_all() try: meta2 = MetaData(testing.db) - addresses = Table('email_addresses', meta2, autoload=True, schema="alt_schema") - users = Table('users', meta2, mustexist=True, schema="alt_schema") + addresses = Table('email_addresses', meta2, autoload=True, schema="test_schema") + users = Table('users', meta2, mustexist=True, schema="test_schema") print users print addresses @@ -574,12 +752,12 @@ class MiscTest(TestBase, AssertsExecutionResults): referer = Table("referer", meta1, Column("id", Integer, primary_key=True), Column("ref", Integer, ForeignKey('subject.id')), - schema="alt_schema") + schema="test_schema") meta1.create_all() try: meta2 = MetaData(testing.db) subject = Table("subject", meta2, autoload=True) - referer = Table("referer", meta2, schema="alt_schema", autoload=True) + referer = Table("referer", meta2, schema="test_schema", autoload=True) print str(subject.join(referer).onclause) self.assert_((subject.c.id==referer.c.ref).compare(subject.join(referer).onclause)) finally: @@ -589,19 +767,19 @@ class MiscTest(TestBase, AssertsExecutionResults): meta1 = MetaData(testing.db) subject = Table("subject", meta1, Column("id", Integer, primary_key=True), - schema='alt_schema_2' + schema='test_schema_2' ) referer = Table("referer", meta1, Column("id", Integer, primary_key=True), - Column("ref", Integer, ForeignKey('alt_schema_2.subject.id')), - schema="alt_schema") + Column("ref", Integer, ForeignKey('test_schema_2.subject.id')), + schema="test_schema") meta1.create_all() try: meta2 = MetaData(testing.db) - subject = Table("subject", meta2, autoload=True, schema="alt_schema_2") - referer = Table("referer", meta2, schema="alt_schema", autoload=True) + subject = Table("subject", meta2, autoload=True, schema="test_schema_2") + referer = Table("referer", meta2, schema="test_schema", autoload=True) print str(subject.join(referer).onclause) self.assert_((subject.c.id==referer.c.ref).compare(subject.join(referer).onclause)) finally: @@ -611,7 +789,7 @@ class MiscTest(TestBase, AssertsExecutionResults): meta = MetaData(testing.db) users = Table('users', meta, Column('id', Integer, primary_key=True), - Column('name', String(50)), schema='alt_schema') + Column('name', String(50)), schema='test_schema') users.create() try: users.insert().execute(id=1, name='name1') @@ -646,15 +824,15 @@ class MiscTest(TestBase, AssertsExecutionResults): user_name VARCHAR NOT NULL, user_password VARCHAR NOT NULL ); - """, None) + """) t = Table("speedy_users", meta, autoload=True) r = t.insert().execute(user_name='user', user_password='lala') - assert r.last_inserted_ids() == [1] + assert r.inserted_primary_key == [1] l = t.select().execute().fetchall() assert l == [(1, 'user', 'lala')] finally: - testing.db.execute("drop table speedy_users", None) + testing.db.execute("drop table speedy_users") @testing.emits_warning() def test_index_reflection(self): @@ -676,10 +854,10 @@ class MiscTest(TestBase, AssertsExecutionResults): testing.db.execute(""" create index idx1 on party ((id || name)) - """, None) + """) testing.db.execute(""" create unique index idx2 on party (id) where name = 'test' - """, None) + """) testing.db.execute(""" create index idx3 on party using btree @@ -713,35 +891,42 @@ class MiscTest(TestBase, AssertsExecutionResults): warnings.warn = capture_warnings._orig_showwarning m1.drop_all() - def test_create_partial_index(self): - tbl = Table('testtbl', MetaData(), Column('data',Integer)) - idx = Index('test_idx1', tbl.c.data, postgres_where=and_(tbl.c.data > 5, tbl.c.data < 10)) - - executed_sql = [] - mock_strategy = MockEngineStrategy() - mock_conn = mock_strategy.create('postgres://', executed_sql.append) + def test_set_isolation_level(self): + """Test setting the isolation level with create_engine""" + eng = create_engine(testing.db.url) + eq_( + eng.execute("show transaction isolation level").scalar(), + 'read committed') + eng = create_engine(testing.db.url, isolation_level="SERIALIZABLE") + eq_( + eng.execute("show transaction isolation level").scalar(), + 'serializable') + eng = create_engine(testing.db.url, isolation_level="FOO") - idx.create(mock_conn) + if testing.db.driver == 'zxjdbc': + exception_cls = eng.dialect.dbapi.Error + else: + exception_cls = eng.dialect.dbapi.ProgrammingError + assert_raises(exception_cls, eng.execute, "show transaction isolation level") - assert executed_sql == ['CREATE INDEX test_idx1 ON testtbl (data) WHERE testtbl.data > 5 AND testtbl.data < 10'] class TimezoneTest(TestBase, AssertsExecutionResults): """Test timezone-aware datetimes. - psycopg will return a datetime with a tzinfo attached to it, if postgres + psycopg will return a datetime with a tzinfo attached to it, if postgresql returns it. python then will not let you compare a datetime with a tzinfo to a datetime that doesnt have one. this test illustrates two ways to have datetime types with and without timezone info. """ - __only_on__ = 'postgres' + __only_on__ = 'postgresql' @classmethod def setup_class(cls): global tztable, notztable, metadata metadata = MetaData(testing.db) - # current_timestamp() in postgres is assumed to return TIMESTAMP WITH TIMEZONE + # current_timestamp() in postgresql is assumed to return TIMESTAMP WITH TIMEZONE tztable = Table('tztable', metadata, Column("id", Integer, primary_key=True), Column("date", DateTime(timezone=True), onupdate=func.current_timestamp()), @@ -762,17 +947,17 @@ class TimezoneTest(TestBase, AssertsExecutionResults): somedate = testing.db.connect().scalar(func.current_timestamp().select()) tztable.insert().execute(id=1, name='row1', date=somedate) c = tztable.update(tztable.c.id==1).execute(name='newname') - print tztable.select(tztable.c.id==1).execute().fetchone() + print tztable.select(tztable.c.id==1).execute().first() def test_without_timezone(self): # get a date without a tzinfo somedate = datetime.datetime(2005, 10,20, 11, 52, 00) notztable.insert().execute(id=1, name='row1', date=somedate) c = notztable.update(notztable.c.id==1).execute(name='newname') - print notztable.select(tztable.c.id==1).execute().fetchone() + print notztable.select(tztable.c.id==1).execute().first() class ArrayTest(TestBase, AssertsExecutionResults): - __only_on__ = 'postgres' + __only_on__ = 'postgresql' @classmethod def setup_class(cls): @@ -781,10 +966,14 @@ class ArrayTest(TestBase, AssertsExecutionResults): arrtable = Table('arrtable', metadata, Column('id', Integer, primary_key=True), - Column('intarr', postgres.PGArray(Integer)), - Column('strarr', postgres.PGArray(String(convert_unicode=True)), nullable=False) + Column('intarr', postgresql.PGArray(Integer)), + Column('strarr', postgresql.PGArray(String(convert_unicode=True)), nullable=False) ) metadata.create_all() + + def teardown(self): + arrtable.delete().execute() + @classmethod def teardown_class(cls): metadata.drop_all() @@ -792,34 +981,38 @@ class ArrayTest(TestBase, AssertsExecutionResults): def test_reflect_array_column(self): metadata2 = MetaData(testing.db) tbl = Table('arrtable', metadata2, autoload=True) - assert isinstance(tbl.c.intarr.type, postgres.PGArray) - assert isinstance(tbl.c.strarr.type, postgres.PGArray) + assert isinstance(tbl.c.intarr.type, postgresql.PGArray) + assert isinstance(tbl.c.strarr.type, postgresql.PGArray) assert isinstance(tbl.c.intarr.type.item_type, Integer) assert isinstance(tbl.c.strarr.type.item_type, String) + @testing.fails_on('postgresql+zxjdbc', 'zxjdbc has no support for PG arrays') def test_insert_array(self): arrtable.insert().execute(intarr=[1,2,3], strarr=['abc', 'def']) results = arrtable.select().execute().fetchall() eq_(len(results), 1) eq_(results[0]['intarr'], [1,2,3]) eq_(results[0]['strarr'], ['abc','def']) - arrtable.delete().execute() + @testing.fails_on('postgresql+pg8000', 'pg8000 has poor support for PG arrays') + @testing.fails_on('postgresql+zxjdbc', 'zxjdbc has no support for PG arrays') def test_array_where(self): arrtable.insert().execute(intarr=[1,2,3], strarr=['abc', 'def']) arrtable.insert().execute(intarr=[4,5,6], strarr='ABC') results = arrtable.select().where(arrtable.c.intarr == [1,2,3]).execute().fetchall() eq_(len(results), 1) eq_(results[0]['intarr'], [1,2,3]) - arrtable.delete().execute() + @testing.fails_on('postgresql+pg8000', 'pg8000 has poor support for PG arrays') + @testing.fails_on('postgresql+zxjdbc', 'zxjdbc has no support for PG arrays') def test_array_concat(self): arrtable.insert().execute(intarr=[1,2,3], strarr=['abc', 'def']) results = select([arrtable.c.intarr + [4,5,6]]).execute().fetchall() eq_(len(results), 1) eq_(results[0][0], [1,2,3,4,5,6]) - arrtable.delete().execute() + @testing.fails_on('postgresql+pg8000', 'pg8000 has poor support for PG arrays') + @testing.fails_on('postgresql+zxjdbc', 'zxjdbc has no support for PG arrays') def test_array_subtype_resultprocessor(self): arrtable.insert().execute(intarr=[4,5,6], strarr=[[u'm\xe4\xe4'], [u'm\xf6\xf6']]) arrtable.insert().execute(intarr=[1,2,3], strarr=[u'm\xe4\xe4', u'm\xf6\xf6']) @@ -827,13 +1020,14 @@ class ArrayTest(TestBase, AssertsExecutionResults): eq_(len(results), 2) eq_(results[0]['strarr'], [u'm\xe4\xe4', u'm\xf6\xf6']) eq_(results[1]['strarr'], [[u'm\xe4\xe4'], [u'm\xf6\xf6']]) - arrtable.delete().execute() + @testing.fails_on('postgresql+pg8000', 'pg8000 has poor support for PG arrays') + @testing.fails_on('postgresql+zxjdbc', 'zxjdbc has no support for PG arrays') def test_array_mutability(self): class Foo(object): pass footable = Table('foo', metadata, Column('id', Integer, primary_key=True), - Column('intarr', postgres.PGArray(Integer), nullable=True) + Column('intarr', postgresql.PGArray(Integer), nullable=True) ) mapper(Foo, footable) metadata.create_all() @@ -870,19 +1064,19 @@ class ArrayTest(TestBase, AssertsExecutionResults): sess.add(foo) sess.flush() -class TimeStampTest(TestBase, AssertsExecutionResults): - __only_on__ = 'postgres' - - @testing.uses_deprecated() +class TimestampTest(TestBase, AssertsExecutionResults): + __only_on__ = 'postgresql' + def test_timestamp(self): engine = testing.db connection = engine.connect() - s = select([func.TIMESTAMP("12/25/07").label("ts")]) - result = connection.execute(s).fetchone() + + s = select(["timestamp '2007-12-25'"]) + result = connection.execute(s).first() eq_(result[0], datetime.datetime(2007, 12, 25, 0, 0)) class ServerSideCursorsTest(TestBase, AssertsExecutionResults): - __only_on__ = 'postgres' + __only_on__ = 'postgresql+psycopg2' @classmethod def setup_class(cls): @@ -927,8 +1121,8 @@ class ServerSideCursorsTest(TestBase, AssertsExecutionResults): class SpecialTypesTest(TestBase, ComparesTables): """test DDL and reflection of PG-specific types """ - __only_on__ = 'postgres' - __excluded_on__ = (('postgres', '<', (8, 3, 0)),) + __only_on__ = 'postgresql' + __excluded_on__ = (('postgresql', '<', (8, 3, 0)),) @classmethod def setup_class(cls): @@ -936,11 +1130,11 @@ class SpecialTypesTest(TestBase, ComparesTables): metadata = MetaData(testing.db) table = Table('sometable', metadata, - Column('id', postgres.PGUuid, primary_key=True), - Column('flag', postgres.PGBit), - Column('addr', postgres.PGInet), - Column('addr2', postgres.PGMacAddr), - Column('addr3', postgres.PGCidr) + Column('id', postgresql.PGUuid, primary_key=True), + Column('flag', postgresql.PGBit), + Column('addr', postgresql.PGInet), + Column('addr2', postgresql.PGMacAddr), + Column('addr3', postgresql.PGCidr) ) metadata.create_all() @@ -957,8 +1151,8 @@ class SpecialTypesTest(TestBase, ComparesTables): class MatchTest(TestBase, AssertsCompiledSQL): - __only_on__ = 'postgres' - __excluded_on__ = (('postgres', '<', (8, 3, 0)),) + __only_on__ = 'postgresql' + __excluded_on__ = (('postgresql', '<', (8, 3, 0)),) @classmethod def setup_class(cls): @@ -992,9 +1186,16 @@ class MatchTest(TestBase, AssertsCompiledSQL): def teardown_class(cls): metadata.drop_all() - def test_expression(self): + @testing.fails_on('postgresql+pg8000', 'uses positional') + @testing.fails_on('postgresql+zxjdbc', 'uses qmark') + def test_expression_pyformat(self): self.assert_compile(matchtable.c.title.match('somstr'), "matchtable.title @@ to_tsquery(%(title_1)s)") + @testing.fails_on('postgresql+psycopg2', 'uses pyformat') + @testing.fails_on('postgresql+zxjdbc', 'uses qmark') + def test_expression_positional(self): + self.assert_compile(matchtable.c.title.match('somstr'), "matchtable.title @@ to_tsquery(%s)") + def test_simple_match(self): results = matchtable.select().where(matchtable.c.title.match('python')).order_by(matchtable.c.id).execute().fetchall() eq_([2, 5], [r.id for r in results]) diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py index eb4581e20..448ee947c 100644 --- a/test/dialect/test_sqlite.py +++ b/test/dialect/test_sqlite.py @@ -4,7 +4,7 @@ from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message import datetime from sqlalchemy import * from sqlalchemy import exc, sql -from sqlalchemy.databases import sqlite +from sqlalchemy.dialects.sqlite import base as sqlite, pysqlite as pysqlite_dialect from sqlalchemy.test import * @@ -19,7 +19,7 @@ class TestTypes(TestBase, AssertsExecutionResults): meta = MetaData(testing.db) t = Table('bool_table', meta, Column('id', Integer, primary_key=True), - Column('boo', sqlite.SLBoolean)) + Column('boo', Boolean)) try: meta.create_all() @@ -39,7 +39,7 @@ class TestTypes(TestBase, AssertsExecutionResults): def test_time_microseconds(self): dt = datetime.datetime(2008, 6, 27, 12, 0, 0, 125) # 125 usec eq_(str(dt), '2008-06-27 12:00:00.000125') - sldt = sqlite.SLDateTime() + sldt = sqlite._SLDateTime() bp = sldt.bind_processor(None) eq_(bp(dt), '2008-06-27 12:00:00.000125') @@ -69,59 +69,44 @@ class TestTypes(TestBase, AssertsExecutionResults): bindproc = t.dialect_impl(dialect).bind_processor(dialect) assert not bindproc or isinstance(bindproc(u"some string"), unicode) - @testing.uses_deprecated('Using String type with no length') def test_type_reflection(self): # (ask_for, roundtripped_as_if_different) - specs = [( String(), sqlite.SLString(), ), - ( String(1), sqlite.SLString(1), ), - ( String(3), sqlite.SLString(3), ), - ( Text(), sqlite.SLText(), ), - ( Unicode(), sqlite.SLString(), ), - ( Unicode(1), sqlite.SLString(1), ), - ( Unicode(3), sqlite.SLString(3), ), - ( UnicodeText(), sqlite.SLText(), ), - ( CLOB, sqlite.SLText(), ), - ( sqlite.SLChar(1), ), - ( CHAR(3), sqlite.SLChar(3), ), - ( NCHAR(2), sqlite.SLChar(2), ), - ( SmallInteger(), sqlite.SLSmallInteger(), ), - ( sqlite.SLSmallInteger(), ), - ( Binary(3), sqlite.SLBinary(), ), - ( Binary(), sqlite.SLBinary() ), - ( sqlite.SLBinary(3), sqlite.SLBinary(), ), - ( NUMERIC, sqlite.SLNumeric(), ), - ( NUMERIC(10,2), sqlite.SLNumeric(10,2), ), - ( Numeric, sqlite.SLNumeric(), ), - ( Numeric(10, 2), sqlite.SLNumeric(10, 2), ), - ( DECIMAL, sqlite.SLNumeric(), ), - ( DECIMAL(10, 2), sqlite.SLNumeric(10, 2), ), - ( Float, sqlite.SLFloat(), ), - ( sqlite.SLNumeric(), ), - ( INT, sqlite.SLInteger(), ), - ( Integer, sqlite.SLInteger(), ), - ( sqlite.SLInteger(), ), - ( TIMESTAMP, sqlite.SLDateTime(), ), - ( DATETIME, sqlite.SLDateTime(), ), - ( DateTime, sqlite.SLDateTime(), ), - ( sqlite.SLDateTime(), ), - ( DATE, sqlite.SLDate(), ), - ( Date, sqlite.SLDate(), ), - ( sqlite.SLDate(), ), - ( TIME, sqlite.SLTime(), ), - ( Time, sqlite.SLTime(), ), - ( sqlite.SLTime(), ), - ( BOOLEAN, sqlite.SLBoolean(), ), - ( Boolean, sqlite.SLBoolean(), ), - ( sqlite.SLBoolean(), ), + specs = [( String(), String(), ), + ( String(1), String(1), ), + ( String(3), String(3), ), + ( Text(), Text(), ), + ( Unicode(), String(), ), + ( Unicode(1), String(1), ), + ( Unicode(3), String(3), ), + ( UnicodeText(), Text(), ), + ( CHAR(1), ), + ( CHAR(3), CHAR(3), ), + ( NUMERIC, NUMERIC(), ), + ( NUMERIC(10,2), NUMERIC(10,2), ), + ( Numeric, NUMERIC(), ), + ( Numeric(10, 2), NUMERIC(10, 2), ), + ( DECIMAL, DECIMAL(), ), + ( DECIMAL(10, 2), DECIMAL(10, 2), ), + ( Float, Float(), ), + ( NUMERIC(), ), + ( TIMESTAMP, TIMESTAMP(), ), + ( DATETIME, DATETIME(), ), + ( DateTime, DateTime(), ), + ( DateTime(), ), + ( DATE, DATE(), ), + ( Date, Date(), ), + ( TIME, TIME(), ), + ( Time, Time(), ), + ( BOOLEAN, BOOLEAN(), ), + ( Boolean, Boolean(), ), ] columns = [Column('c%i' % (i + 1), t[0]) for i, t in enumerate(specs)] db = testing.db m = MetaData(db) t_table = Table('types', m, *columns) + m.create_all() try: - m.create_all() - m2 = MetaData(db) rt = Table('types', m2, autoload=True) try: @@ -131,7 +116,7 @@ class TestTypes(TestBase, AssertsExecutionResults): expected = [len(c) > 1 and c[1] or c[0] for c in specs] for table in rt, rv: for i, reflected in enumerate(table.c): - assert isinstance(reflected.type, type(expected[i])), type(expected[i]) + assert isinstance(reflected.type, type(expected[i])), "%d: %r" % (i, type(expected[i])) finally: db.execute('DROP VIEW types_v') finally: @@ -163,7 +148,7 @@ class TestDefaults(TestBase, AssertsExecutionResults): rt = Table('t_defaults', m2, autoload=True) expected = [c[1] for c in specs] for i, reflected in enumerate(rt.c): - eq_(reflected.server_default.arg.text, expected[i]) + eq_(str(reflected.server_default.arg), expected[i]) finally: m.drop_all() @@ -173,7 +158,7 @@ class TestDefaults(TestBase, AssertsExecutionResults): db = testing.db m = MetaData(db) - expected = ["'my_default'", '0'] + expected = ["my_default", '0'] table = """CREATE TABLE r_defaults ( data VARCHAR(40) DEFAULT 'my_default', val INTEGER NOT NULL DEFAULT 0 @@ -184,7 +169,7 @@ class TestDefaults(TestBase, AssertsExecutionResults): rt = Table('r_defaults', m, autoload=True) for i, reflected in enumerate(rt.c): - eq_(reflected.server_default.arg.text, expected[i]) + eq_(str(reflected.server_default.arg), expected[i]) finally: db.execute("DROP TABLE r_defaults") @@ -247,24 +232,24 @@ class DialectTest(TestBase, AssertsExecutionResults): def test_attached_as_schema(self): cx = testing.db.connect() try: - cx.execute('ATTACH DATABASE ":memory:" AS alt_schema') + cx.execute('ATTACH DATABASE ":memory:" AS test_schema') dialect = cx.dialect - assert dialect.table_names(cx, 'alt_schema') == [] + assert dialect.table_names(cx, 'test_schema') == [] meta = MetaData(cx) Table('created', meta, Column('id', Integer), - schema='alt_schema') + schema='test_schema') alt_master = Table('sqlite_master', meta, autoload=True, - schema='alt_schema') + schema='test_schema') meta.create_all(cx) - eq_(dialect.table_names(cx, 'alt_schema'), + eq_(dialect.table_names(cx, 'test_schema'), ['created']) assert len(alt_master.c) > 0 meta.clear() reflected = Table('created', meta, autoload=True, - schema='alt_schema') + schema='test_schema') assert len(reflected.c) == 1 cx.execute(reflected.insert(), dict(id=1)) @@ -282,9 +267,9 @@ class DialectTest(TestBase, AssertsExecutionResults): # note that sqlite_master is cleared, above meta.drop_all() - assert dialect.table_names(cx, 'alt_schema') == [] + assert dialect.table_names(cx, 'test_schema') == [] finally: - cx.execute('DETACH DATABASE alt_schema') + cx.execute('DETACH DATABASE test_schema') @testing.exclude('sqlite', '<', (2, 6), 'no database support') def test_temp_table_reflection(self): @@ -305,6 +290,20 @@ class DialectTest(TestBase, AssertsExecutionResults): pass raise + def test_set_isolation_level(self): + """Test setting the read uncommitted/serializable levels""" + eng = create_engine(testing.db.url) + eq_(eng.execute("PRAGMA read_uncommitted").scalar(), 0) + + eng = create_engine(testing.db.url, isolation_level="READ UNCOMMITTED") + eq_(eng.execute("PRAGMA read_uncommitted").scalar(), 1) + + eng = create_engine(testing.db.url, isolation_level="SERIALIZABLE") + eq_(eng.execute("PRAGMA read_uncommitted").scalar(), 0) + + assert_raises(exc.ArgumentError, create_engine, testing.db.url, + isolation_level="FOO") + class SQLTest(TestBase, AssertsCompiledSQL): """Tests SQLite-dialect specific compilation.""" |
