diff options
Diffstat (limited to 'test/sql')
-rw-r--r-- | test/sql/test_compiler.py | 2 | ||||
-rw-r--r-- | test/sql/test_constraints.py | 219 | ||||
-rw-r--r-- | test/sql/test_cte.py | 30 | ||||
-rw-r--r-- | test/sql/test_ddlemit.py | 67 | ||||
-rw-r--r-- | test/sql/test_defaults.py | 25 | ||||
-rw-r--r-- | test/sql/test_insert.py | 136 | ||||
-rw-r--r-- | test/sql/test_join_rewriting.py | 1 | ||||
-rw-r--r-- | test/sql/test_metadata.py | 43 | ||||
-rw-r--r-- | test/sql/test_operators.py | 67 | ||||
-rw-r--r-- | test/sql/test_types.py | 71 |
10 files changed, 587 insertions, 74 deletions
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py index 9e99a947b..428fc8986 100644 --- a/test/sql/test_compiler.py +++ b/test/sql/test_compiler.py @@ -2440,7 +2440,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): """SELECT /*+ "QuotedName" idx1 */ "QuotedName".col1 """ """FROM "QuotedName" WHERE "QuotedName".col1 > :col1_1"""), (s7, oracle_d, - """SELECT /*+ SomeName idx1 */ "SomeName".col1 FROM """ + """SELECT /*+ "SomeName" idx1 */ "SomeName".col1 FROM """ """"QuotedName" "SomeName" WHERE "SomeName".col1 > :col1_1"""), ]: self.assert_compile( diff --git a/test/sql/test_constraints.py b/test/sql/test_constraints.py index c0b5806ac..2603f67a3 100644 --- a/test/sql/test_constraints.py +++ b/test/sql/test_constraints.py @@ -9,7 +9,7 @@ from sqlalchemy import testing from sqlalchemy.engine import default from sqlalchemy.testing import engines from sqlalchemy.testing import eq_ -from sqlalchemy.testing.assertsql import AllOf, RegexSQL, ExactSQL, CompiledSQL +from sqlalchemy.testing.assertsql import AllOf, RegexSQL, CompiledSQL from sqlalchemy.sql import table, column @@ -58,8 +58,77 @@ class ConstraintGenTest(fixtures.TestBase, AssertsExecutionResults): ) ) + @testing.force_drop_names('a', 'b') + def test_fk_cant_drop_cycled_unnamed(self): + metadata = MetaData() + + Table("a", metadata, + Column('id', Integer, primary_key=True), + Column('bid', Integer), + ForeignKeyConstraint(["bid"], ["b.id"]) + ) + Table( + "b", metadata, + Column('id', Integer, primary_key=True), + Column("aid", Integer), + ForeignKeyConstraint(["aid"], ["a.id"])) + metadata.create_all(testing.db) + if testing.db.dialect.supports_alter: + assert_raises_message( + exc.CircularDependencyError, + "Can't sort tables for DROP; an unresolvable foreign key " + "dependency exists between tables: a, b. Please ensure " + "that the ForeignKey and ForeignKeyConstraint objects " + "involved in the cycle have names so that they can be " + "dropped using DROP CONSTRAINT.", + metadata.drop_all, testing.db + ) + else: + + with self.sql_execution_asserter() as asserter: + metadata.drop_all(testing.db, checkfirst=False) + + asserter.assert_( + AllOf( + CompiledSQL("DROP TABLE a"), + CompiledSQL("DROP TABLE b") + ) + ) + + @testing.provide_metadata + def test_fk_table_auto_alter_constraint_create(self): + metadata = self.metadata + + Table("a", metadata, + Column('id', Integer, primary_key=True), + Column('bid', Integer), + ForeignKeyConstraint(["bid"], ["b.id"]) + ) + Table( + "b", metadata, + Column('id', Integer, primary_key=True), + Column("aid", Integer), + ForeignKeyConstraint(["aid"], ["a.id"], name="bfk")) + self._assert_cyclic_constraint(metadata, auto=True) + + @testing.provide_metadata + def test_fk_column_auto_alter_constraint_create(self): + metadata = self.metadata + + Table("a", metadata, + Column('id', Integer, primary_key=True), + Column('bid', Integer, ForeignKey("b.id")), + ) + Table("b", metadata, + Column('id', Integer, primary_key=True), + Column("aid", Integer, + ForeignKey("a.id", name="bfk") + ), + ) + self._assert_cyclic_constraint(metadata, auto=True) + @testing.provide_metadata - def test_cyclic_fk_table_constraint_create(self): + def test_fk_table_use_alter_constraint_create(self): metadata = self.metadata Table("a", metadata, @@ -75,7 +144,7 @@ class ConstraintGenTest(fixtures.TestBase, AssertsExecutionResults): self._assert_cyclic_constraint(metadata) @testing.provide_metadata - def test_cyclic_fk_column_constraint_create(self): + def test_fk_column_use_alter_constraint_create(self): metadata = self.metadata Table("a", metadata, @@ -90,45 +159,104 @@ class ConstraintGenTest(fixtures.TestBase, AssertsExecutionResults): ) self._assert_cyclic_constraint(metadata) - def _assert_cyclic_constraint(self, metadata): - assertions = [ - CompiledSQL('CREATE TABLE b (' + def _assert_cyclic_constraint(self, metadata, auto=False): + table_assertions = [] + if auto: + if testing.db.dialect.supports_alter: + table_assertions.append( + CompiledSQL('CREATE TABLE b (' + 'id INTEGER NOT NULL, ' + 'aid INTEGER, ' + 'PRIMARY KEY (id)' + ')' + ) + ) + else: + table_assertions.append( + CompiledSQL( + 'CREATE TABLE b (' 'id INTEGER NOT NULL, ' 'aid INTEGER, ' + 'PRIMARY KEY (id), ' + 'CONSTRAINT bfk FOREIGN KEY(aid) REFERENCES a (id)' + ')' + ) + ) + + if testing.db.dialect.supports_alter: + table_assertions.append( + CompiledSQL( + 'CREATE TABLE a (' + 'id INTEGER NOT NULL, ' + 'bid INTEGER, ' 'PRIMARY KEY (id)' ')' - ), - CompiledSQL('CREATE TABLE a (' + ) + ) + else: + table_assertions.append( + CompiledSQL( + 'CREATE TABLE a (' 'id INTEGER NOT NULL, ' 'bid INTEGER, ' 'PRIMARY KEY (id), ' 'FOREIGN KEY(bid) REFERENCES b (id)' ')' - ), - ] + ) + ) + else: + table_assertions.append( + CompiledSQL('CREATE TABLE b (' + 'id INTEGER NOT NULL, ' + 'aid INTEGER, ' + 'PRIMARY KEY (id)' + ')' + ) + ) + table_assertions.append( + CompiledSQL( + 'CREATE TABLE a (' + 'id INTEGER NOT NULL, ' + 'bid INTEGER, ' + 'PRIMARY KEY (id), ' + 'FOREIGN KEY(bid) REFERENCES b (id)' + ')' + ) + ) + + assertions = [AllOf(*table_assertions)] if testing.db.dialect.supports_alter: - assertions.append( + fk_assertions = [] + fk_assertions.append( CompiledSQL('ALTER TABLE b ADD CONSTRAINT bfk ' 'FOREIGN KEY(aid) REFERENCES a (id)') ) - self.assert_sql_execution( - testing.db, - lambda: metadata.create_all(checkfirst=False), - *assertions - ) + if auto: + fk_assertions.append( + CompiledSQL('ALTER TABLE a ADD ' + 'FOREIGN KEY(bid) REFERENCES b (id)') + ) + assertions.append(AllOf(*fk_assertions)) + + with self.sql_execution_asserter() as asserter: + metadata.create_all(checkfirst=False) + asserter.assert_(*assertions) - assertions = [] if testing.db.dialect.supports_alter: - assertions.append(CompiledSQL('ALTER TABLE b DROP CONSTRAINT bfk')) - assertions.extend([ - CompiledSQL("DROP TABLE a"), - CompiledSQL("DROP TABLE b"), - ]) - self.assert_sql_execution( - testing.db, - lambda: metadata.drop_all(checkfirst=False), - *assertions - ) + assertions = [ + CompiledSQL('ALTER TABLE b DROP CONSTRAINT bfk'), + CompiledSQL("DROP TABLE a"), + CompiledSQL("DROP TABLE b") + ] + else: + assertions = [AllOf( + CompiledSQL("DROP TABLE a"), + CompiledSQL("DROP TABLE b") + )] + + with self.sql_execution_asserter() as asserter: + metadata.drop_all(checkfirst=False), + asserter.assert_(*assertions) @testing.requires.check_constraints @testing.provide_metadata @@ -289,13 +417,13 @@ class ConstraintGenTest(fixtures.TestBase, AssertsExecutionResults): lambda: events.create(testing.db), RegexSQL("^CREATE TABLE events"), AllOf( - ExactSQL('CREATE UNIQUE INDEX ix_events_name ON events ' + CompiledSQL('CREATE UNIQUE INDEX ix_events_name ON events ' '(name)'), - ExactSQL('CREATE INDEX ix_events_location ON events ' + CompiledSQL('CREATE INDEX ix_events_location ON events ' '(location)'), - ExactSQL('CREATE UNIQUE INDEX sport_announcer ON events ' + CompiledSQL('CREATE UNIQUE INDEX sport_announcer ON events ' '(sport, announcer)'), - ExactSQL('CREATE INDEX idx_winners ON events (winner)') + CompiledSQL('CREATE INDEX idx_winners ON events (winner)'), ) ) @@ -313,7 +441,7 @@ class ConstraintGenTest(fixtures.TestBase, AssertsExecutionResults): lambda: t.create(testing.db), CompiledSQL('CREATE TABLE sometable (id INTEGER NOT NULL, ' 'data VARCHAR(50), PRIMARY KEY (id))'), - ExactSQL('CREATE INDEX myindex ON sometable (data DESC)') + CompiledSQL('CREATE INDEX myindex ON sometable (data DESC)') ) @@ -542,6 +670,33 @@ class ConstraintCompilationTest(fixtures.TestBase, AssertsCompiledSQL): "REFERENCES tbl (a) MATCH SIMPLE" ) + def test_create_table_omit_fks(self): + fkcs = [ + ForeignKeyConstraint(['a'], ['remote.id'], name='foo'), + ForeignKeyConstraint(['b'], ['remote.id'], name='bar'), + ForeignKeyConstraint(['c'], ['remote.id'], name='bat'), + ] + m = MetaData() + t = Table( + 't', m, + Column('a', Integer), + Column('b', Integer), + Column('c', Integer), + *fkcs + ) + Table('remote', m, Column('id', Integer, primary_key=True)) + + self.assert_compile( + schema.CreateTable(t, include_foreign_key_constraints=[]), + "CREATE TABLE t (a INTEGER, b INTEGER, c INTEGER)" + ) + self.assert_compile( + schema.CreateTable(t, include_foreign_key_constraints=fkcs[0:2]), + "CREATE TABLE t (a INTEGER, b INTEGER, c INTEGER, " + "CONSTRAINT foo FOREIGN KEY(a) REFERENCES remote (id), " + "CONSTRAINT bar FOREIGN KEY(b) REFERENCES remote (id))" + ) + def test_deferrable_unique(self): factory = lambda **kw: UniqueConstraint('b', **kw) self._test_deferrable(factory) diff --git a/test/sql/test_cte.py b/test/sql/test_cte.py index b907fe649..c7906dcb7 100644 --- a/test/sql/test_cte.py +++ b/test/sql/test_cte.py @@ -462,3 +462,33 @@ class CTETest(fixtures.TestBase, AssertsCompiledSQL): 'FROM "order" JOIN regional_sales AS anon_1 ' 'ON anon_1."order" = "order"."order"' ) + + def test_suffixes(self): + orders = table('order', column('order')) + s = select([orders.c.order]).cte("regional_sales") + s = s.suffix_with("pg suffix", dialect='postgresql') + s = s.suffix_with('oracle suffix', dialect='oracle') + stmt = select([orders]).where(orders.c.order > s.c.order) + + self.assert_compile( + stmt, + 'WITH regional_sales AS (SELECT "order"."order" AS "order" ' + 'FROM "order") SELECT "order"."order" FROM "order", ' + 'regional_sales WHERE "order"."order" > regional_sales."order"' + ) + + self.assert_compile( + stmt, + 'WITH regional_sales AS (SELECT "order"."order" AS "order" ' + 'FROM "order") oracle suffix SELECT "order"."order" FROM "order", ' + 'regional_sales WHERE "order"."order" > regional_sales."order"', + dialect='oracle' + ) + + self.assert_compile( + stmt, + 'WITH regional_sales AS (SELECT "order"."order" AS "order" ' + 'FROM "order") pg suffix SELECT "order"."order" FROM "order", ' + 'regional_sales WHERE "order"."order" > regional_sales."order"', + dialect='postgresql' + )
\ No newline at end of file diff --git a/test/sql/test_ddlemit.py b/test/sql/test_ddlemit.py index 825f8228b..e191beed3 100644 --- a/test/sql/test_ddlemit.py +++ b/test/sql/test_ddlemit.py @@ -1,6 +1,6 @@ from sqlalchemy.testing import fixtures from sqlalchemy.sql.ddl import SchemaGenerator, SchemaDropper -from sqlalchemy import MetaData, Table, Column, Integer, Sequence +from sqlalchemy import MetaData, Table, Column, Integer, Sequence, ForeignKey from sqlalchemy import schema from sqlalchemy.testing.mock import Mock @@ -42,6 +42,31 @@ class EmitDDLTest(fixtures.TestBase): for i in range(1, 6) ) + def _use_alter_fixture_one(self): + m = MetaData() + + t1 = Table( + 't1', m, Column('id', Integer, primary_key=True), + Column('t2id', Integer, ForeignKey('t2.id')) + ) + t2 = Table( + 't2', m, Column('id', Integer, primary_key=True), + Column('t1id', Integer, ForeignKey('t1.id')) + ) + return m, t1, t2 + + def _fk_fixture_one(self): + m = MetaData() + + t1 = Table( + 't1', m, Column('id', Integer, primary_key=True), + Column('t2id', Integer, ForeignKey('t2.id')) + ) + t2 = Table( + 't2', m, Column('id', Integer, primary_key=True), + ) + return m, t1, t2 + def _table_seq_fixture(self): m = MetaData() @@ -172,6 +197,32 @@ class EmitDDLTest(fixtures.TestBase): self._assert_drop_tables([t1, t2, t3, t4, t5], generator, m) + def test_create_metadata_auto_alter_fk(self): + m, t1, t2 = self._use_alter_fixture_one() + generator = self._mock_create_fixture( + False, [t1, t2] + ) + self._assert_create_w_alter( + [t1, t2] + + list(t1.foreign_key_constraints) + + list(t2.foreign_key_constraints), + generator, + m + ) + + def test_create_metadata_inline_fk(self): + m, t1, t2 = self._fk_fixture_one() + generator = self._mock_create_fixture( + False, [t1, t2] + ) + self._assert_create_w_alter( + [t1, t2] + + list(t1.foreign_key_constraints) + + list(t2.foreign_key_constraints), + generator, + m + ) + def _assert_create_tables(self, elements, generator, argument): self._assert_ddl(schema.CreateTable, elements, generator, argument) @@ -188,6 +239,16 @@ class EmitDDLTest(fixtures.TestBase): (schema.DropTable, schema.DropSequence), elements, generator, argument) + def _assert_create_w_alter(self, elements, generator, argument): + self._assert_ddl( + (schema.CreateTable, schema.CreateSequence, schema.AddConstraint), + elements, generator, argument) + + def _assert_drop_w_alter(self, elements, generator, argument): + self._assert_ddl( + (schema.DropTable, schema.DropSequence, schema.DropConstraint), + elements, generator, argument) + def _assert_ddl(self, ddl_cls, elements, generator, argument): generator.traverse_single(argument) for call_ in generator.connection.execute.mock_calls: @@ -196,4 +257,8 @@ class EmitDDLTest(fixtures.TestBase): assert c.element in elements, "element %r was not expected"\ % c.element elements.remove(c.element) + if getattr(c, 'include_foreign_key_constraints', None) is not None: + elements[:] = [ + e for e in elements + if e not in set(c.include_foreign_key_constraints)] assert not elements, "elements remain in list: %r" % elements diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py index 10e557b76..48505dd8c 100644 --- a/test/sql/test_defaults.py +++ b/test/sql/test_defaults.py @@ -336,13 +336,7 @@ class DefaultTest(fixtures.TestBase): [(54, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, None, 'hi')]) - @testing.fails_on('firebird', 'Data type unknown') def test_insertmany(self): - # MySQL-Python 1.2.2 breaks functions in execute_many :( - if (testing.against('mysql+mysqldb') and - testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)): - return - t.insert().execute({}, {}, {}) ctexec = currenttime.scalar() @@ -356,6 +350,22 @@ class DefaultTest(fixtures.TestBase): (53, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py', 'hi')]) + @testing.requires.multivalues_inserts + def test_insert_multivalues(self): + + t.insert().values([{}, {}, {}]).execute() + + ctexec = currenttime.scalar() + l = t.select().execute() + today = datetime.date.today() + eq_(l.fetchall(), + [(51, 'imthedefault', f, ts, ts, ctexec, True, False, + 12, today, 'py', 'hi'), + (52, 'imthedefault', f, ts, ts, ctexec, True, False, + 12, today, 'py', 'hi'), + (53, 'imthedefault', f, ts, ts, ctexec, True, False, + 12, today, 'py', 'hi')]) + def test_no_embed_in_sql(self): """Using a DefaultGenerator, Sequence, DefaultClause in the columns, where clause of a select, or in the values @@ -368,7 +378,8 @@ class DefaultTest(fixtures.TestBase): ): assert_raises_message( sa.exc.ArgumentError, - "SQL expression object or string expected.", + "SQL expression object or string expected, got object of type " + "<.* 'list'> instead", t.select, [const] ) assert_raises_message( diff --git a/test/sql/test_insert.py b/test/sql/test_insert.py index bd4eaa3e2..8a41d4be7 100644 --- a/test/sql/test_insert.py +++ b/test/sql/test_insert.py @@ -1,12 +1,12 @@ #! coding:utf-8 from sqlalchemy import Column, Integer, MetaData, String, Table,\ - bindparam, exc, func, insert, select, column + bindparam, exc, func, insert, select, column, text from sqlalchemy.dialects import mysql, postgresql from sqlalchemy.engine import default from sqlalchemy.testing import AssertsCompiledSQL,\ assert_raises_message, fixtures - +from sqlalchemy.sql import crud class _InsertTestBase(object): @@ -19,6 +19,12 @@ class _InsertTestBase(object): Table('myothertable', metadata, Column('otherid', Integer, primary_key=True), Column('othername', String(30))) + Table('table_w_defaults', metadata, + Column('id', Integer, primary_key=True), + Column('x', Integer, default=10), + Column('y', Integer, server_default=text('5')), + Column('z', Integer, default=lambda: 10) + ) class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL): @@ -565,6 +571,36 @@ class MultirowTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL): checkpositional=checkpositional, dialect=dialect) + def test_positional_w_defaults(self): + table1 = self.tables.table_w_defaults + + values = [ + {'id': 1}, + {'id': 2}, + {'id': 3} + ] + + checkpositional = (1, None, None, 2, None, None, 3, None, None) + + dialect = default.DefaultDialect() + dialect.supports_multivalues_insert = True + dialect.paramstyle = 'format' + dialect.positional = True + + self.assert_compile( + table1.insert().values(values), + "INSERT INTO table_w_defaults (id, x, z) VALUES " + "(%s, %s, %s), (%s, %s, %s), (%s, %s, %s)", + checkpositional=checkpositional, + check_prefetch=[ + table1.c.x, table1.c.z, + crud._multiparam_column(table1.c.x, 0), + crud._multiparam_column(table1.c.z, 0), + crud._multiparam_column(table1.c.x, 1), + crud._multiparam_column(table1.c.z, 1) + ], + dialect=dialect) + def test_inline_default(self): metadata = MetaData() table = Table('sometable', metadata, @@ -597,6 +633,74 @@ class MultirowTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL): checkparams=checkparams, dialect=postgresql.dialect()) + def test_python_scalar_default(self): + metadata = MetaData() + table = Table('sometable', metadata, + Column('id', Integer, primary_key=True), + Column('data', String), + Column('foo', Integer, default=10)) + + values = [ + {'id': 1, 'data': 'data1'}, + {'id': 2, 'data': 'data2', 'foo': 15}, + {'id': 3, 'data': 'data3'}, + ] + + checkparams = { + 'id_0': 1, + 'id_1': 2, + 'id_2': 3, + 'data_0': 'data1', + 'data_1': 'data2', + 'data_2': 'data3', + 'foo': None, # evaluated later + 'foo_1': 15, + 'foo_2': None # evaluated later + } + + self.assert_compile( + table.insert().values(values), + 'INSERT INTO sometable (id, data, foo) VALUES ' + '(%(id_0)s, %(data_0)s, %(foo)s), ' + '(%(id_1)s, %(data_1)s, %(foo_1)s), ' + '(%(id_2)s, %(data_2)s, %(foo_2)s)', + checkparams=checkparams, + dialect=postgresql.dialect()) + + def test_python_fn_default(self): + metadata = MetaData() + table = Table('sometable', metadata, + Column('id', Integer, primary_key=True), + Column('data', String), + Column('foo', Integer, default=lambda: 10)) + + values = [ + {'id': 1, 'data': 'data1'}, + {'id': 2, 'data': 'data2', 'foo': 15}, + {'id': 3, 'data': 'data3'}, + ] + + checkparams = { + 'id_0': 1, + 'id_1': 2, + 'id_2': 3, + 'data_0': 'data1', + 'data_1': 'data2', + 'data_2': 'data3', + 'foo': None, # evaluated later + 'foo_1': 15, + 'foo_2': None, # evaluated later + } + + self.assert_compile( + table.insert().values(values), + "INSERT INTO sometable (id, data, foo) VALUES " + "(%(id_0)s, %(data_0)s, %(foo)s), " + "(%(id_1)s, %(data_1)s, %(foo_1)s), " + "(%(id_2)s, %(data_2)s, %(foo_2)s)", + checkparams=checkparams, + dialect=postgresql.dialect()) + def test_sql_functions(self): metadata = MetaData() table = Table('sometable', metadata, @@ -684,24 +788,10 @@ class MultirowTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL): {'id': 3, 'data': 'data3', 'foo': 'otherfoo'}, ] - checkparams = { - 'id_0': 1, - 'id_1': 2, - 'id_2': 3, - 'data_0': 'data1', - 'data_1': 'data2', - 'data_2': 'data3', - 'foo_0': 'plainfoo', - 'foo_2': 'otherfoo', - } - - # note the effect here is that the first set of params - # takes effect for the rest of them, when one is absent - self.assert_compile( - table.insert().values(values), - 'INSERT INTO sometable (id, data, foo) VALUES ' - '(%(id_0)s, %(data_0)s, %(foo_0)s), ' - '(%(id_1)s, %(data_1)s, %(foo_0)s), ' - '(%(id_2)s, %(data_2)s, %(foo_2)s)', - checkparams=checkparams, - dialect=postgresql.dialect()) + assert_raises_message( + exc.CompileError, + "INSERT value for column sometable.foo is explicitly rendered " + "as a boundparameter in the VALUES clause; a Python-side value or " + "SQL expression is required", + table.insert().values(values).compile + ) diff --git a/test/sql/test_join_rewriting.py b/test/sql/test_join_rewriting.py index ced65d7f1..f99dfda4e 100644 --- a/test/sql/test_join_rewriting.py +++ b/test/sql/test_join_rewriting.py @@ -650,6 +650,7 @@ class JoinExecTest(_JoinRewriteTestBase, fixtures.TestBase): def _test(self, selectable, assert_): result = testing.db.execute(selectable) + result.close() for col in selectable.inner_columns: assert col in result._metadata._keymap diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py index 0aa5d7305..cc7d0eb4f 100644 --- a/test/sql/test_metadata.py +++ b/test/sql/test_metadata.py @@ -1160,9 +1160,10 @@ class InfoTest(fixtures.TestBase): t = Table('x', MetaData(), info={'foo': 'bar'}) eq_(t.info, {'foo': 'bar'}) + class TableTest(fixtures.TestBase, AssertsCompiledSQL): - @testing.requires.temporary_table + @testing.requires.temporary_tables @testing.skip_if('mssql', 'different col format') def test_prefixes(self): from sqlalchemy import Table @@ -1195,6 +1196,30 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL): t.info['bar'] = 'zip' assert t.info['bar'] == 'zip' + def test_foreign_key_constraints_collection(self): + metadata = MetaData() + t1 = Table('foo', metadata, Column('a', Integer)) + eq_(t1.foreign_key_constraints, set()) + + fk1 = ForeignKey('q.id') + fk2 = ForeignKey('j.id') + fk3 = ForeignKeyConstraint(['b', 'c'], ['r.x', 'r.y']) + + t1.append_column(Column('b', Integer, fk1)) + eq_( + t1.foreign_key_constraints, + set([fk1.constraint])) + + t1.append_column(Column('c', Integer, fk2)) + eq_( + t1.foreign_key_constraints, + set([fk1.constraint, fk2.constraint])) + + t1.append_constraint(fk3) + eq_( + t1.foreign_key_constraints, + set([fk1.constraint, fk2.constraint, fk3])) + def test_c_immutable(self): m = MetaData() t1 = Table('t', m, Column('x', Integer), Column('y', Integer)) @@ -1946,6 +1971,22 @@ class ConstraintTest(fixtures.TestBase): assert s1.c.a.references(t1.c.a) assert not s1.c.a.references(t1.c.b) + def test_referred_table_accessor(self): + t1, t2, t3 = self._single_fixture() + fkc = list(t2.foreign_key_constraints)[0] + is_(fkc.referred_table, t1) + + def test_referred_table_accessor_not_available(self): + t1 = Table('t', MetaData(), Column('x', ForeignKey('q.id'))) + fkc = list(t1.foreign_key_constraints)[0] + assert_raises_message( + exc.InvalidRequestError, + "Foreign key associated with column 't.x' could not find " + "table 'q' with which to generate a foreign key to target " + "column 'id'", + getattr, fkc, "referred_table" + ) + def test_related_column_not_present_atfirst_ok(self): m = MetaData() base_table = Table("base", m, diff --git a/test/sql/test_operators.py b/test/sql/test_operators.py index e8ad88511..0985020d1 100644 --- a/test/sql/test_operators.py +++ b/test/sql/test_operators.py @@ -12,7 +12,8 @@ from sqlalchemy import exc from sqlalchemy.engine import default from sqlalchemy.sql.elements import _literal_as_text from sqlalchemy.schema import Column, Table, MetaData -from sqlalchemy.types import TypeEngine, TypeDecorator, UserDefinedType, Boolean +from sqlalchemy.types import TypeEngine, TypeDecorator, UserDefinedType, \ + Boolean, NullType, MatchType from sqlalchemy.dialects import mysql, firebird, postgresql, oracle, \ sqlite, mssql from sqlalchemy import util @@ -360,7 +361,7 @@ class CustomComparatorTest(_CustomComparatorTests, fixtures.TestBase): class comparator_factory(TypeEngine.Comparator): def __init__(self, expr): - self.expr = expr + super(MyInteger.comparator_factory, self).__init__(expr) def __add__(self, other): return self.expr.op("goofy")(other) @@ -381,7 +382,7 @@ class TypeDecoratorComparatorTest(_CustomComparatorTests, fixtures.TestBase): class comparator_factory(TypeDecorator.Comparator): def __init__(self, expr): - self.expr = expr + super(MyInteger.comparator_factory, self).__init__(expr) def __add__(self, other): return self.expr.op("goofy")(other) @@ -392,6 +393,31 @@ class TypeDecoratorComparatorTest(_CustomComparatorTests, fixtures.TestBase): return MyInteger +class TypeDecoratorTypeDecoratorComparatorTest( + _CustomComparatorTests, fixtures.TestBase): + + def _add_override_factory(self): + + class MyIntegerOne(TypeDecorator): + impl = Integer + + class comparator_factory(TypeDecorator.Comparator): + + def __init__(self, expr): + super(MyIntegerOne.comparator_factory, self).__init__(expr) + + def __add__(self, other): + return self.expr.op("goofy")(other) + + def __and__(self, other): + return self.expr.op("goofy_and")(other) + + class MyIntegerTwo(TypeDecorator): + impl = MyIntegerOne + + return MyIntegerTwo + + class TypeDecoratorWVariantComparatorTest( _CustomComparatorTests, fixtures.TestBase): @@ -403,7 +429,9 @@ class TypeDecoratorWVariantComparatorTest( class comparator_factory(TypeEngine.Comparator): def __init__(self, expr): - self.expr = expr + super( + SomeOtherInteger.comparator_factory, + self).__init__(expr) def __add__(self, other): return self.expr.op("not goofy")(other) @@ -417,7 +445,7 @@ class TypeDecoratorWVariantComparatorTest( class comparator_factory(TypeDecorator.Comparator): def __init__(self, expr): - self.expr = expr + super(MyInteger.comparator_factory, self).__init__(expr) def __add__(self, other): return self.expr.op("goofy")(other) @@ -438,7 +466,7 @@ class CustomEmbeddedinTypeDecoratorTest( class comparator_factory(TypeEngine.Comparator): def __init__(self, expr): - self.expr = expr + super(MyInteger.comparator_factory, self).__init__(expr) def __add__(self, other): return self.expr.op("goofy")(other) @@ -460,7 +488,7 @@ class NewOperatorTest(_CustomComparatorTests, fixtures.TestBase): class comparator_factory(TypeEngine.Comparator): def __init__(self, expr): - self.expr = expr + super(MyInteger.comparator_factory, self).__init__(expr) def foob(self, other): return self.expr.op("foob")(other) @@ -1619,6 +1647,31 @@ class MatchTest(fixtures.TestBase, testing.AssertsCompiledSQL): "CONTAINS (mytable.myid, :myid_1)", dialect=oracle.dialect()) + def test_match_is_now_matchtype(self): + expr = self.table1.c.myid.match('somstr') + assert expr.type._type_affinity is MatchType()._type_affinity + assert isinstance(expr.type, MatchType) + + def test_boolean_inversion_postgresql(self): + self.assert_compile( + ~self.table1.c.myid.match('somstr'), + "NOT mytable.myid @@ to_tsquery(%(myid_1)s)", + dialect=postgresql.dialect()) + + def test_boolean_inversion_mysql(self): + # because mysql doesnt have native boolean + self.assert_compile( + ~self.table1.c.myid.match('somstr'), + "NOT MATCH (mytable.myid) AGAINST (%s IN BOOLEAN MODE)", + dialect=mysql.dialect()) + + def test_boolean_inversion_mssql(self): + # because mssql doesnt have native boolean + self.assert_compile( + ~self.table1.c.myid.match('somstr'), + "NOT CONTAINS (mytable.myid, :myid_1)", + dialect=mssql.dialect()) + class ComposedLikeOperatorsTest(fixtures.TestBase, testing.AssertsCompiledSQL): __dialect__ = 'default' diff --git a/test/sql/test_types.py b/test/sql/test_types.py index 26dc6c842..5e1542853 100644 --- a/test/sql/test_types.py +++ b/test/sql/test_types.py @@ -10,6 +10,8 @@ from sqlalchemy import ( type_coerce, VARCHAR, Time, DateTime, BigInteger, SmallInteger, BOOLEAN, BLOB, NCHAR, NVARCHAR, CLOB, TIME, DATE, DATETIME, TIMESTAMP, SMALLINT, INTEGER, DECIMAL, NUMERIC, FLOAT, REAL) +from sqlalchemy.sql import ddl + from sqlalchemy import exc, types, util, dialects for name in dialects.__all__: __import__("sqlalchemy.dialects.%s" % name) @@ -309,6 +311,24 @@ class UserDefinedTest(fixtures.TablesTest, AssertsCompiledSQL): literal_binds=True ) + def test_kw_colspec(self): + class MyType(types.UserDefinedType): + def get_col_spec(self, **kw): + return "FOOB %s" % kw['type_expression'].name + + class MyOtherType(types.UserDefinedType): + def get_col_spec(self): + return "BAR" + + self.assert_compile( + ddl.CreateColumn(Column('bar', MyType)), + "bar FOOB bar" + ) + self.assert_compile( + ddl.CreateColumn(Column('bar', MyOtherType)), + "bar BAR" + ) + def test_typedecorator_literal_render_fallback_bound(self): # fall back to process_bind_param for literal # value rendering. @@ -932,6 +952,7 @@ class UnicodeTest(fixtures.TestBase): expected = (testing.db.name, testing.db.driver) in \ ( ('postgresql', 'psycopg2'), + ('postgresql', 'psycopg2cffi'), ('postgresql', 'pypostgresql'), ('postgresql', 'pg8000'), ('postgresql', 'zxjdbc'), @@ -1157,8 +1178,11 @@ class EnumTest(AssertsCompiledSQL, fixtures.TestBase): def test_repr(self): e = Enum( "x", "y", name="somename", convert_unicode=True, quote=True, - inherit_schema=True) - eq_(repr(e), "Enum('x', 'y', name='somename', inherit_schema=True)") + inherit_schema=True, native_enum=False) + eq_( + repr(e), + "Enum('x', 'y', name='somename', " + "inherit_schema=True, native_enum=False)") binary_table = MyPickleType = metadata = None @@ -1639,6 +1663,49 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): def test_decimal_scale(self): self.assert_compile(types.DECIMAL(2, 4), 'DECIMAL(2, 4)') + def test_kwarg_legacy_typecompiler(self): + from sqlalchemy.sql import compiler + + class SomeTypeCompiler(compiler.GenericTypeCompiler): + # transparently decorated w/ kw decorator + def visit_VARCHAR(self, type_): + return "MYVARCHAR" + + # not affected + def visit_INTEGER(self, type_, **kw): + return "MYINTEGER %s" % kw['type_expression'].name + + dialect = default.DefaultDialect() + dialect.type_compiler = SomeTypeCompiler(dialect) + self.assert_compile( + ddl.CreateColumn(Column('bar', VARCHAR(50))), + "bar MYVARCHAR", + dialect=dialect + ) + self.assert_compile( + ddl.CreateColumn(Column('bar', INTEGER)), + "bar MYINTEGER bar", + dialect=dialect + ) + + +class TestKWArgPassThru(AssertsCompiledSQL, fixtures.TestBase): + __backend__ = True + + def test_user_defined(self): + """test that dialects pass the column through on DDL.""" + + class MyType(types.UserDefinedType): + def get_col_spec(self, **kw): + return "FOOB %s" % kw['type_expression'].name + + m = MetaData() + t = Table('t', m, Column('bar', MyType)) + self.assert_compile( + ddl.CreateColumn(t.c.bar), + "bar FOOB bar" + ) + class NumericRawSQLTest(fixtures.TestBase): |