From 1e1a38e7801f410f244e4bbb44ec795ae152e04e Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Sun, 6 Jan 2019 01:14:26 -0500 Subject: Run black -l 79 against all source files This is a straight reformat run using black as is, with no edits applied at all. The black run will format code consistently, however in some cases that are prevalent in SQLAlchemy code it produces too-long lines. The too-long lines will be resolved in the following commit that will resolve all remaining flake8 issues including shadowed builtins, long lines, import order, unused imports, duplicate imports, and docstring issues. Change-Id: I7eda77fed3d8e73df84b3651fd6cfcfe858d4dc9 --- test/sql/test_metadata.py | 3779 ++++++++++++++++++++++++--------------------- 1 file changed, 2019 insertions(+), 1760 deletions(-) (limited to 'test/sql/test_metadata.py') diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py index f030a7e77..9a28b0c7b 100644 --- a/test/sql/test_metadata.py +++ b/test/sql/test_metadata.py @@ -2,12 +2,33 @@ from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import emits_warning import pickle -from sqlalchemy import Integer, String, UniqueConstraint, \ - CheckConstraint, ForeignKey, MetaData, Sequence, \ - ForeignKeyConstraint, PrimaryKeyConstraint, ColumnDefault, Index, event,\ - events, Unicode, types as sqltypes, bindparam, \ - Table, Column, Boolean, Enum, func, text, TypeDecorator, \ - BLANK_SCHEMA, ARRAY +from sqlalchemy import ( + Integer, + String, + UniqueConstraint, + CheckConstraint, + ForeignKey, + MetaData, + Sequence, + ForeignKeyConstraint, + PrimaryKeyConstraint, + ColumnDefault, + Index, + event, + events, + Unicode, + types as sqltypes, + bindparam, + Table, + Column, + Boolean, + Enum, + func, + text, + TypeDecorator, + BLANK_SCHEMA, + ARRAY, +) from sqlalchemy import schema, exc from sqlalchemy.engine import default from sqlalchemy.sql import elements, naming @@ -20,14 +41,14 @@ from contextlib import contextmanager from sqlalchemy import util from sqlalchemy.testing import engines -class MetaDataTest(fixtures.TestBase, ComparesTables): +class MetaDataTest(fixtures.TestBase, ComparesTables): def test_metadata_contains(self): metadata = MetaData() - t1 = Table('t1', metadata, Column('x', Integer)) - t2 = Table('t2', metadata, Column('x', Integer), schema='foo') - t3 = Table('t2', MetaData(), Column('x', Integer)) - t4 = Table('t1', MetaData(), Column('x', Integer), schema='foo') + t1 = Table("t1", metadata, Column("x", Integer)) + t2 = Table("t2", metadata, Column("x", Integer), schema="foo") + t3 = Table("t2", MetaData(), Column("x", Integer)) + t4 = Table("t1", MetaData(), Column("x", Integer), schema="foo") assert "t1" in metadata assert "foo.t2" in metadata @@ -40,50 +61,68 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): def test_uninitialized_column_copy(self): for col in [ - Column('foo', String(), nullable=False), - Column('baz', String(), unique=True), + Column("foo", String(), nullable=False), + Column("baz", String(), unique=True), Column(Integer(), primary_key=True), - Column('bar', Integer(), Sequence('foo_seq'), primary_key=True, - key='bar'), - Column(Integer(), ForeignKey('bat.blah'), doc="this is a col"), - Column('bar', Integer(), ForeignKey('bat.blah'), primary_key=True, - key='bar'), - Column('bar', Integer(), info={'foo': 'bar'}), + Column( + "bar", + Integer(), + Sequence("foo_seq"), + primary_key=True, + key="bar", + ), + Column(Integer(), ForeignKey("bat.blah"), doc="this is a col"), + Column( + "bar", + Integer(), + ForeignKey("bat.blah"), + primary_key=True, + key="bar", + ), + Column("bar", Integer(), info={"foo": "bar"}), ]: c2 = col.copy() - for attr in ('name', 'type', 'nullable', - 'primary_key', 'key', 'unique', 'info', - 'doc'): + for attr in ( + "name", + "type", + "nullable", + "primary_key", + "key", + "unique", + "info", + "doc", + ): eq_(getattr(col, attr), getattr(c2, attr)) eq_(len(col.foreign_keys), len(c2.foreign_keys)) if col.default: - eq_(c2.default.name, 'foo_seq') + eq_(c2.default.name, "foo_seq") for a1, a2 in zip(col.foreign_keys, c2.foreign_keys): assert a1 is not a2 - eq_(a2._colspec, 'bat.blah') + eq_(a2._colspec, "bat.blah") def test_col_subclass_copy(self): class MyColumn(schema.Column): - def __init__(self, *args, **kw): - self.widget = kw.pop('widget', None) + self.widget = kw.pop("widget", None) super(MyColumn, self).__init__(*args, **kw) def copy(self, *arg, **kw): c = super(MyColumn, self).copy(*arg, **kw) c.widget = self.widget return c - c1 = MyColumn('foo', Integer, widget='x') + + c1 = MyColumn("foo", Integer, widget="x") c2 = c1.copy() assert isinstance(c2, MyColumn) - eq_(c2.widget, 'x') + eq_(c2.widget, "x") def test_uninitialized_column_copy_events(self): msgs = [] def write(c, t): msgs.append("attach %s.%s" % (t.name, c.name)) - c1 = Column('foo', String()) + + c1 = Column("foo", String()) m = MetaData() for i in range(3): cx = c1.copy() @@ -91,39 +130,39 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): # that listeners will be re-established from the # natural construction of things. cx._on_table_attach(write) - Table('foo%d' % i, m, cx) - eq_(msgs, ['attach foo0.foo', 'attach foo1.foo', 'attach foo2.foo']) + Table("foo%d" % i, m, cx) + eq_(msgs, ["attach foo0.foo", "attach foo1.foo", "attach foo2.foo"]) def test_schema_collection_add(self): metadata = MetaData() - Table('t1', metadata, Column('x', Integer), schema='foo') - Table('t2', metadata, Column('x', Integer), schema='bar') - Table('t3', metadata, Column('x', Integer)) + Table("t1", metadata, Column("x", Integer), schema="foo") + Table("t2", metadata, Column("x", Integer), schema="bar") + Table("t3", metadata, Column("x", Integer)) - eq_(metadata._schemas, set(['foo', 'bar'])) + eq_(metadata._schemas, set(["foo", "bar"])) eq_(len(metadata.tables), 3) def test_schema_collection_remove(self): metadata = MetaData() - t1 = Table('t1', metadata, Column('x', Integer), schema='foo') - Table('t2', metadata, Column('x', Integer), schema='bar') - t3 = Table('t3', metadata, Column('x', Integer), schema='bar') + t1 = Table("t1", metadata, Column("x", Integer), schema="foo") + Table("t2", metadata, Column("x", Integer), schema="bar") + t3 = Table("t3", metadata, Column("x", Integer), schema="bar") metadata.remove(t3) - eq_(metadata._schemas, set(['foo', 'bar'])) + eq_(metadata._schemas, set(["foo", "bar"])) eq_(len(metadata.tables), 2) metadata.remove(t1) - eq_(metadata._schemas, set(['bar'])) + eq_(metadata._schemas, set(["bar"])) eq_(len(metadata.tables), 1) def test_schema_collection_remove_all(self): metadata = MetaData() - Table('t1', metadata, Column('x', Integer), schema='foo') - Table('t2', metadata, Column('x', Integer), schema='bar') + Table("t1", metadata, Column("x", Integer), schema="foo") + Table("t2", metadata, Column("x", Integer), schema="bar") metadata.clear() eq_(metadata._schemas, set()) @@ -132,46 +171,56 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): def test_metadata_tables_immutable(self): metadata = MetaData() - Table('t1', metadata, Column('x', Integer)) - assert 't1' in metadata.tables + Table("t1", metadata, Column("x", Integer)) + assert "t1" in metadata.tables - assert_raises( - TypeError, - lambda: metadata.tables.pop('t1') - ) + assert_raises(TypeError, lambda: metadata.tables.pop("t1")) @testing.provide_metadata def test_dupe_tables(self): metadata = self.metadata - Table('table1', metadata, - Column('col1', Integer, primary_key=True), - Column('col2', String(20))) + Table( + "table1", + metadata, + Column("col1", Integer, primary_key=True), + Column("col2", String(20)), + ) metadata.create_all() - Table('table1', metadata, autoload=True) + Table("table1", metadata, autoload=True) def go(): - Table('table1', metadata, - Column('col1', Integer, primary_key=True), - Column('col2', String(20))) + Table( + "table1", + metadata, + Column("col1", Integer, primary_key=True), + Column("col2", String(20)), + ) + assert_raises_message( tsa.exc.InvalidRequestError, "Table 'table1' is already defined for this " "MetaData instance. Specify 'extend_existing=True' " "to redefine options and columns on an existing " "Table object.", - go + go, ) def test_fk_copy(self): - c1 = Column('foo', Integer) - c2 = Column('bar', Integer) + c1 = Column("foo", Integer) + c2 = Column("bar", Integer) m = MetaData() - t1 = Table('t', m, c1, c2) - - kw = dict(onupdate="X", - ondelete="Y", use_alter=True, name='f1', - deferrable="Z", initially="Q", link_to_name=True) + t1 = Table("t", m, c1, c2) + + kw = dict( + onupdate="X", + ondelete="Y", + use_alter=True, + name="f1", + deferrable="Z", + initially="Q", + link_to_name=True, + ) fk1 = ForeignKey(c1, **kw) fk2 = ForeignKeyConstraint((c1,), (c2,), **kw) @@ -185,14 +234,18 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): eq_(getattr(fk2c, k), kw[k]) def test_check_constraint_copy(self): - def r(x): return x - c = CheckConstraint("foo bar", - name='name', - initially=True, - deferrable=True, - _create_rule=r) + def r(x): + return x + + c = CheckConstraint( + "foo bar", + name="name", + initially=True, + deferrable=True, + _create_rule=r, + ) c2 = c.copy() - eq_(c2.name, 'name') + eq_(c2.name, "name") eq_(str(c2.sqltext), "foo bar") eq_(c2.initially, True) eq_(c2.deferrable, True) @@ -200,152 +253,157 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): def test_col_replace_w_constraint(self): m = MetaData() - a = Table('a', m, Column('id', Integer, primary_key=True)) + a = Table("a", m, Column("id", Integer, primary_key=True)) - aid = Column('a_id', ForeignKey('a.id')) - b = Table('b', m, aid) + aid = Column("a_id", ForeignKey("a.id")) + b = Table("b", m, aid) b.append_column(aid) assert b.c.a_id.references(a.c.id) eq_(len(b.constraints), 2) def test_fk_construct(self): - c1 = Column('foo', Integer) - c2 = Column('bar', Integer) + c1 = Column("foo", Integer) + c2 = Column("bar", Integer) m = MetaData() - t1 = Table('t', m, c1, c2) - fk1 = ForeignKeyConstraint(('foo', ), ('bar', ), table=t1) + t1 = Table("t", m, c1, c2) + fk1 = ForeignKeyConstraint(("foo",), ("bar",), table=t1) assert fk1 in t1.constraints def test_fk_constraint_col_collection_w_table(self): - c1 = Column('foo', Integer) - c2 = Column('bar', Integer) + c1 = Column("foo", Integer) + c2 = Column("bar", Integer) m = MetaData() - t1 = Table('t', m, c1, c2) - fk1 = ForeignKeyConstraint(('foo', ), ('bar', ), table=t1) + t1 = Table("t", m, c1, c2) + fk1 = ForeignKeyConstraint(("foo",), ("bar",), table=t1) eq_(dict(fk1.columns), {"foo": c1}) def test_fk_constraint_col_collection_no_table(self): - fk1 = ForeignKeyConstraint(('foo', 'bat'), ('bar', 'hoho')) + fk1 = ForeignKeyConstraint(("foo", "bat"), ("bar", "hoho")) eq_(dict(fk1.columns), {}) - eq_(fk1.column_keys, ['foo', 'bat']) - eq_(fk1._col_description, 'foo, bat') + eq_(fk1.column_keys, ["foo", "bat"]) + eq_(fk1._col_description, "foo, bat") eq_(fk1._elements, {"foo": fk1.elements[0], "bat": fk1.elements[1]}) def test_fk_constraint_col_collection_no_table_real_cols(self): - c1 = Column('foo', Integer) - c2 = Column('bar', Integer) - fk1 = ForeignKeyConstraint((c1, ), (c2, )) + c1 = Column("foo", Integer) + c2 = Column("bar", Integer) + fk1 = ForeignKeyConstraint((c1,), (c2,)) eq_(dict(fk1.columns), {}) - eq_(fk1.column_keys, ['foo']) - eq_(fk1._col_description, 'foo') + eq_(fk1.column_keys, ["foo"]) + eq_(fk1._col_description, "foo") eq_(fk1._elements, {"foo": fk1.elements[0]}) def test_fk_constraint_col_collection_added_to_table(self): - c1 = Column('foo', Integer) + c1 = Column("foo", Integer) m = MetaData() - fk1 = ForeignKeyConstraint(('foo', ), ('bar', )) - Table('t', m, c1, fk1) + fk1 = ForeignKeyConstraint(("foo",), ("bar",)) + Table("t", m, c1, fk1) eq_(dict(fk1.columns), {"foo": c1}) eq_(fk1._elements, {"foo": fk1.elements[0]}) def test_fk_constraint_col_collection_via_fk(self): - fk = ForeignKey('bar') - c1 = Column('foo', Integer, fk) + fk = ForeignKey("bar") + c1 = Column("foo", Integer, fk) m = MetaData() - t1 = Table('t', m, c1) + t1 = Table("t", m, c1) fk1 = fk.constraint - eq_(fk1.column_keys, ['foo']) + eq_(fk1.column_keys, ["foo"]) assert fk1 in t1.constraints - eq_(fk1.column_keys, ['foo']) + eq_(fk1.column_keys, ["foo"]) eq_(dict(fk1.columns), {"foo": c1}) eq_(fk1._elements, {"foo": fk}) def test_fk_no_such_parent_col_error(self): meta = MetaData() - a = Table('a', meta, Column('a', Integer)) - Table('b', meta, Column('b', Integer)) + a = Table("a", meta, Column("a", Integer)) + Table("b", meta, Column("b", Integer)) def go(): - a.append_constraint( - ForeignKeyConstraint(['x'], ['b.b']) - ) + a.append_constraint(ForeignKeyConstraint(["x"], ["b.b"])) + assert_raises_message( exc.ArgumentError, "Can't create ForeignKeyConstraint on " "table 'a': no column named 'x' is present.", - go + go, ) def test_fk_given_non_col(self): - not_a_col = bindparam('x') + not_a_col = bindparam("x") assert_raises_message( exc.ArgumentError, "String, Column, or Column-bound argument expected, got Bind", - ForeignKey, not_a_col + ForeignKey, + not_a_col, ) def test_fk_given_non_col_clauseelem(self): class Foo(object): - def __clause_element__(self): - return bindparam('x') + return bindparam("x") + assert_raises_message( exc.ArgumentError, "String, Column, or Column-bound argument expected, got Bind", - ForeignKey, Foo() + ForeignKey, + Foo(), ) def test_fk_given_col_non_table(self): - t = Table('t', MetaData(), Column('x', Integer)) + t = Table("t", MetaData(), Column("x", Integer)) xa = t.alias().c.x assert_raises_message( exc.ArgumentError, "ForeignKey received Column not bound to a Table, got: .*Alias", - ForeignKey, xa + ForeignKey, + xa, ) def test_fk_given_col_non_table_clauseelem(self): - t = Table('t', MetaData(), Column('x', Integer)) + t = Table("t", MetaData(), Column("x", Integer)) class Foo(object): - def __clause_element__(self): return t.alias().c.x assert_raises_message( exc.ArgumentError, "ForeignKey received Column not bound to a Table, got: .*Alias", - ForeignKey, Foo() + ForeignKey, + Foo(), ) def test_fk_no_such_target_col_error_upfront(self): meta = MetaData() - a = Table('a', meta, Column('a', Integer)) - Table('b', meta, Column('b', Integer)) + a = Table("a", meta, Column("a", Integer)) + Table("b", meta, Column("b", Integer)) - a.append_constraint(ForeignKeyConstraint(['a'], ['b.x'])) + a.append_constraint(ForeignKeyConstraint(["a"], ["b.x"])) assert_raises_message( exc.NoReferencedColumnError, "Could not initialize target column for ForeignKey 'b.x' on " "table 'a': table 'b' has no column named 'x'", - getattr, list(a.foreign_keys)[0], "column" + getattr, + list(a.foreign_keys)[0], + "column", ) def test_fk_no_such_target_col_error_delayed(self): meta = MetaData() - a = Table('a', meta, Column('a', Integer)) - a.append_constraint( - ForeignKeyConstraint(['a'], ['b.x'])) + a = Table("a", meta, Column("a", Integer)) + a.append_constraint(ForeignKeyConstraint(["a"], ["b.x"])) - b = Table('b', meta, Column('b', Integer)) + b = Table("b", meta, Column("b", Integer)) assert_raises_message( exc.NoReferencedColumnError, "Could not initialize target column for ForeignKey 'b.x' on " "table 'a': table 'b' has no column named 'x'", - getattr, list(a.foreign_keys)[0], "column" + getattr, + list(a.foreign_keys)[0], + "column", ) def test_fk_mismatched_local_remote_cols(self): @@ -354,177 +412,202 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): exc.ArgumentError, "ForeignKeyConstraint number of constrained columns must " "match the number of referenced columns.", - ForeignKeyConstraint, ['a'], ['b.a', 'b.b'] + ForeignKeyConstraint, + ["a"], + ["b.a", "b.b"], ) assert_raises_message( exc.ArgumentError, "ForeignKeyConstraint number of constrained columns " "must match the number of referenced columns.", - ForeignKeyConstraint, ['a', 'b'], ['b.a'] + ForeignKeyConstraint, + ["a", "b"], + ["b.a"], ) assert_raises_message( exc.ArgumentError, "ForeignKeyConstraint with duplicate source column " "references are not supported.", - ForeignKeyConstraint, ['a', 'a'], ['b.a', 'b.b'] + ForeignKeyConstraint, + ["a", "a"], + ["b.a", "b.b"], ) def test_pickle_metadata_sequence_restated(self): m1 = MetaData() - Table('a', m1, - Column('id', Integer, primary_key=True), - Column('x', Integer, Sequence("x_seq"))) + Table( + "a", + m1, + Column("id", Integer, primary_key=True), + Column("x", Integer, Sequence("x_seq")), + ) m2 = pickle.loads(pickle.dumps(m1)) s2 = Sequence("x_seq") - t2 = Table('a', m2, - Column('id', Integer, primary_key=True), - Column('x', Integer, s2), - extend_existing=True) + t2 = Table( + "a", + m2, + Column("id", Integer, primary_key=True), + Column("x", Integer, s2), + extend_existing=True, + ) - assert m2._sequences['x_seq'] is t2.c.x.default - assert m2._sequences['x_seq'] is s2 + assert m2._sequences["x_seq"] is t2.c.x.default + assert m2._sequences["x_seq"] is s2 def test_sequence_restated_replaced(self): """Test restatement of Sequence replaces.""" m1 = MetaData() s1 = Sequence("x_seq") - t = Table('a', m1, - Column('x', Integer, s1) - ) - assert m1._sequences['x_seq'] is s1 - - s2 = Sequence('x_seq') - Table('a', m1, - Column('x', Integer, s2), - extend_existing=True - ) + t = Table("a", m1, Column("x", Integer, s1)) + assert m1._sequences["x_seq"] is s1 + + s2 = Sequence("x_seq") + Table("a", m1, Column("x", Integer, s2), extend_existing=True) assert t.c.x.default is s2 - assert m1._sequences['x_seq'] is s2 + assert m1._sequences["x_seq"] is s2 def test_sequence_attach_to_table(self): m1 = MetaData() s1 = Sequence("s") - t = Table('a', m1, Column('x', Integer, s1)) + t = Table("a", m1, Column("x", Integer, s1)) assert s1.metadata is m1 def test_sequence_attach_to_existing_table(self): m1 = MetaData() s1 = Sequence("s") - t = Table('a', m1, Column('x', Integer)) + t = Table("a", m1, Column("x", Integer)) t.c.x._init_items(s1) assert s1.metadata is m1 def test_pickle_metadata_sequence_implicit(self): m1 = MetaData() - Table('a', m1, - Column('id', Integer, primary_key=True), - Column('x', Integer, Sequence("x_seq"))) + Table( + "a", + m1, + Column("id", Integer, primary_key=True), + Column("x", Integer, Sequence("x_seq")), + ) m2 = pickle.loads(pickle.dumps(m1)) - t2 = Table('a', m2, extend_existing=True) + t2 = Table("a", m2, extend_existing=True) - eq_(m2._sequences, {'x_seq': t2.c.x.default}) + eq_(m2._sequences, {"x_seq": t2.c.x.default}) def test_pickle_metadata_schema(self): m1 = MetaData() - Table('a', m1, - Column('id', Integer, primary_key=True), - Column('x', Integer, Sequence("x_seq")), - schema='y') + Table( + "a", + m1, + Column("id", Integer, primary_key=True), + Column("x", Integer, Sequence("x_seq")), + schema="y", + ) m2 = pickle.loads(pickle.dumps(m1)) - Table('a', m2, schema='y', - extend_existing=True) + Table("a", m2, schema="y", extend_existing=True) eq_(m2._schemas, m1._schemas) def test_metadata_schema_arg(self): - m1 = MetaData(schema='sch1') - m2 = MetaData(schema='sch1', quote_schema=True) - m3 = MetaData(schema='sch1', quote_schema=False) + m1 = MetaData(schema="sch1") + m2 = MetaData(schema="sch1", quote_schema=True) + m3 = MetaData(schema="sch1", quote_schema=False) m4 = MetaData() - for i, (name, metadata, schema, quote_schema, - exp_schema, exp_quote_schema) in enumerate([ - ('t1', m1, None, None, 'sch1', None), - ('t2', m1, 'sch2', None, 'sch2', None), - ('t3', m1, 'sch2', True, 'sch2', True), - ('t4', m1, 'sch1', None, 'sch1', None), - ('t5', m1, BLANK_SCHEMA, None, None, None), - ('t1', m2, None, None, 'sch1', True), - ('t2', m2, 'sch2', None, 'sch2', None), - ('t3', m2, 'sch2', True, 'sch2', True), - ('t4', m2, 'sch1', None, 'sch1', None), - ('t1', m3, None, None, 'sch1', False), - ('t2', m3, 'sch2', None, 'sch2', None), - ('t3', m3, 'sch2', True, 'sch2', True), - ('t4', m3, 'sch1', None, 'sch1', None), - ('t1', m4, None, None, None, None), - ('t2', m4, 'sch2', None, 'sch2', None), - ('t3', m4, 'sch2', True, 'sch2', True), - ('t4', m4, 'sch1', None, 'sch1', None), - ('t5', m4, BLANK_SCHEMA, None, None, None), - ]): + for ( + i, + ( + name, + metadata, + schema, + quote_schema, + exp_schema, + exp_quote_schema, + ), + ) in enumerate( + [ + ("t1", m1, None, None, "sch1", None), + ("t2", m1, "sch2", None, "sch2", None), + ("t3", m1, "sch2", True, "sch2", True), + ("t4", m1, "sch1", None, "sch1", None), + ("t5", m1, BLANK_SCHEMA, None, None, None), + ("t1", m2, None, None, "sch1", True), + ("t2", m2, "sch2", None, "sch2", None), + ("t3", m2, "sch2", True, "sch2", True), + ("t4", m2, "sch1", None, "sch1", None), + ("t1", m3, None, None, "sch1", False), + ("t2", m3, "sch2", None, "sch2", None), + ("t3", m3, "sch2", True, "sch2", True), + ("t4", m3, "sch1", None, "sch1", None), + ("t1", m4, None, None, None, None), + ("t2", m4, "sch2", None, "sch2", None), + ("t3", m4, "sch2", True, "sch2", True), + ("t4", m4, "sch1", None, "sch1", None), + ("t5", m4, BLANK_SCHEMA, None, None, None), + ] + ): kw = {} if schema is not None: - kw['schema'] = schema + kw["schema"] = schema if quote_schema is not None: - kw['quote_schema'] = quote_schema + kw["quote_schema"] = quote_schema t = Table(name, metadata, **kw) eq_(t.schema, exp_schema, "test %d, table schema" % i) - eq_(t.schema.quote if t.schema is not None else None, + eq_( + t.schema.quote if t.schema is not None else None, exp_quote_schema, - "test %d, table quote_schema" % i) + "test %d, table quote_schema" % i, + ) seq = Sequence(name, metadata=metadata, **kw) eq_(seq.schema, exp_schema, "test %d, seq schema" % i) - eq_(seq.schema.quote if seq.schema is not None else None, + eq_( + seq.schema.quote if seq.schema is not None else None, exp_quote_schema, - "test %d, seq quote_schema" % i) + "test %d, seq quote_schema" % i, + ) def test_manual_dependencies(self): meta = MetaData() - a = Table('a', meta, Column('foo', Integer)) - b = Table('b', meta, Column('foo', Integer)) - c = Table('c', meta, Column('foo', Integer)) - d = Table('d', meta, Column('foo', Integer)) - e = Table('e', meta, Column('foo', Integer)) + a = Table("a", meta, Column("foo", Integer)) + b = Table("b", meta, Column("foo", Integer)) + c = Table("c", meta, Column("foo", Integer)) + d = Table("d", meta, Column("foo", Integer)) + e = Table("e", meta, Column("foo", Integer)) e.add_is_dependent_on(c) a.add_is_dependent_on(b) b.add_is_dependent_on(d) e.add_is_dependent_on(b) c.add_is_dependent_on(a) - eq_( - meta.sorted_tables, - [d, b, a, c, e] - ) + eq_(meta.sorted_tables, [d, b, a, c, e]) def test_deterministic_order(self): meta = MetaData() - a = Table('a', meta, Column('foo', Integer)) - b = Table('b', meta, Column('foo', Integer)) - c = Table('c', meta, Column('foo', Integer)) - d = Table('d', meta, Column('foo', Integer)) - e = Table('e', meta, Column('foo', Integer)) + a = Table("a", meta, Column("foo", Integer)) + b = Table("b", meta, Column("foo", Integer)) + c = Table("c", meta, Column("foo", Integer)) + d = Table("d", meta, Column("foo", Integer)) + e = Table("e", meta, Column("foo", Integer)) e.add_is_dependent_on(c) a.add_is_dependent_on(b) - eq_( - meta.sorted_tables, - [b, c, d, a, e] - ) + eq_(meta.sorted_tables, [b, c, d, a, e]) def test_nonexistent(self): - assert_raises(tsa.exc.NoSuchTableError, Table, - 'fake_table', - MetaData(testing.db), autoload=True) + assert_raises( + tsa.exc.NoSuchTableError, + Table, + "fake_table", + MetaData(testing.db), + autoload=True, + ) def test_assorted_repr(self): t1 = Table("foo", MetaData(), Column("x", Integer)) @@ -532,89 +615,74 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): ck = schema.CheckConstraint("x > y", name="someconstraint") for const, exp in ( - (Sequence("my_seq"), - "Sequence('my_seq')"), - (Sequence("my_seq", start=5), - "Sequence('my_seq', start=5)"), - (Column("foo", Integer), - "Column('foo', Integer(), table=None)"), - (Table("bar", MetaData(), Column("x", String)), + (Sequence("my_seq"), "Sequence('my_seq')"), + (Sequence("my_seq", start=5), "Sequence('my_seq', start=5)"), + (Column("foo", Integer), "Column('foo', Integer(), table=None)"), + ( + Table("bar", MetaData(), Column("x", String)), "Table('bar', MetaData(bind=None), " - "Column('x', String(), table=), schema=None)"), - (schema.DefaultGenerator(for_update=True), - "DefaultGenerator(for_update=True)"), + "Column('x', String(), table=), schema=None)", + ), + ( + schema.DefaultGenerator(for_update=True), + "DefaultGenerator(for_update=True)", + ), (schema.Index("bar", "c"), "Index('bar', 'c')"), (i1, "Index('bar', Column('x', Integer(), table=))"), (schema.FetchedValue(), "FetchedValue()"), - (ck, - "CheckConstraint(" - "%s" - ", name='someconstraint')" % repr(ck.sqltext)), - (ColumnDefault(('foo', 'bar')), "ColumnDefault(('foo', 'bar'))") + ( + ck, + "CheckConstraint(" + "%s" + ", name='someconstraint')" % repr(ck.sqltext), + ), + (ColumnDefault(("foo", "bar")), "ColumnDefault(('foo', 'bar'))"), ): - eq_( - repr(const), - exp - ) + eq_(repr(const), exp) class ToMetaDataTest(fixtures.TestBase, ComparesTables): - @testing.requires.check_constraints def test_copy(self): from sqlalchemy.testing.schema import Table + meta = MetaData() table = Table( - 'mytable', + "mytable", meta, + Column("myid", Integer, Sequence("foo_id_seq"), primary_key=True), + Column("name", String(40), nullable=True), Column( - 'myid', - Integer, - Sequence('foo_id_seq'), - primary_key=True), - Column( - 'name', - String(40), - nullable=True), - Column( - 'foo', + "foo", String(40), nullable=False, - server_default='x', - server_onupdate='q'), + server_default="x", + server_onupdate="q", + ), Column( - 'bar', - String(40), - nullable=False, - default='y', - onupdate='z'), + "bar", String(40), nullable=False, default="y", onupdate="z" + ), Column( - 'description', - String(30), - CheckConstraint("description='hi'")), - UniqueConstraint('name'), - test_needs_fk=True) + "description", String(30), CheckConstraint("description='hi'") + ), + UniqueConstraint("name"), + test_needs_fk=True, + ) table2 = Table( - 'othertable', + "othertable", meta, - Column( - 'id', - Integer, - Sequence('foo_seq'), - primary_key=True), - Column( - 'myid', - Integer, - ForeignKey('mytable.myid'), - ), - test_needs_fk=True) + Column("id", Integer, Sequence("foo_seq"), primary_key=True), + Column("myid", Integer, ForeignKey("mytable.myid")), + test_needs_fk=True, + ) table3 = Table( - 'has_comments', meta, - Column('foo', Integer, comment='some column'), - comment='table comment' + "has_comments", + meta, + Column("foo", Integer, comment="some column"), + comment="table comment", ) def test_to_metadata(): @@ -630,47 +698,61 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): assert meta2.bind is None pickle.loads(pickle.dumps(meta2)) return ( - meta2.tables['mytable'], - meta2.tables['othertable'], meta2.tables['has_comments']) + meta2.tables["mytable"], + meta2.tables["othertable"], + meta2.tables["has_comments"], + ) def test_pickle_via_reflect(): # this is the most common use case, pickling the results of a # database reflection meta2 = MetaData(bind=testing.db) - t1 = Table('mytable', meta2, autoload=True) - Table('othertable', meta2, autoload=True) - Table('has_comments', meta2, autoload=True) + t1 = Table("mytable", meta2, autoload=True) + Table("othertable", meta2, autoload=True) + Table("has_comments", meta2, autoload=True) meta3 = pickle.loads(pickle.dumps(meta2)) assert meta3.bind is None - assert meta3.tables['mytable'] is not t1 + assert meta3.tables["mytable"] is not t1 return ( - meta3.tables['mytable'], meta3.tables['othertable'], - meta3.tables['has_comments'] + meta3.tables["mytable"], + meta3.tables["othertable"], + meta3.tables["has_comments"], ) meta.create_all(testing.db) try: - for test, has_constraints, reflect in \ - (test_to_metadata, True, False), \ - (test_pickle, True, False), \ - (test_pickle_via_reflect, False, True): + for test, has_constraints, reflect in ( + (test_to_metadata, True, False), + (test_pickle, True, False), + (test_pickle_via_reflect, False, True), + ): table_c, table2_c, table3_c = test() self.assert_tables_equal(table, table_c) self.assert_tables_equal(table2, table2_c) assert table is not table_c assert table.primary_key is not table_c.primary_key - assert list(table2_c.c.myid.foreign_keys)[0].column \ + assert ( + list(table2_c.c.myid.foreign_keys)[0].column is table_c.c.myid - assert list(table2_c.c.myid.foreign_keys)[0].column \ + ) + assert ( + list(table2_c.c.myid.foreign_keys)[0].column is not table.c.myid - assert 'x' in str(table_c.c.foo.server_default.arg) + ) + assert "x" in str(table_c.c.foo.server_default.arg) if not reflect: assert isinstance(table_c.c.myid.default, Sequence) - assert str(table_c.c.foo.server_onupdate.arg) == 'q' - assert str(table_c.c.bar.default.arg) == 'y' - assert getattr(table_c.c.bar.onupdate.arg, 'arg', - table_c.c.bar.onupdate.arg) == 'z' + assert str(table_c.c.foo.server_onupdate.arg) == "q" + assert str(table_c.c.bar.default.arg) == "y" + assert ( + getattr( + table_c.c.bar.onupdate.arg, + "arg", + table_c.c.bar.onupdate.arg, + ) + == "z" + ) assert isinstance(table2_c.c.id.default, Sequence) # constraints don't get reflected for any dialect right @@ -701,8 +783,8 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): def test_col_key_fk_parent(self): # test #2643 m1 = MetaData() - a = Table('a', m1, Column('x', Integer)) - b = Table('b', m1, Column('x', Integer, ForeignKey('a.x'), key='y')) + a = Table("a", m1, Column("x", Integer)) + b = Table("b", m1, Column("x", Integer, ForeignKey("a.x"), key="y")) assert b.c.y.references(a.c.x) m2 = MetaData() @@ -713,120 +795,141 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): def test_change_schema(self): meta = MetaData() - table = Table('mytable', meta, - Column('myid', Integer, primary_key=True), - Column('name', String(40), nullable=True), - Column('description', String(30), - CheckConstraint("description='hi'")), - UniqueConstraint('name'), - ) + table = Table( + "mytable", + meta, + Column("myid", Integer, primary_key=True), + Column("name", String(40), nullable=True), + Column( + "description", String(30), CheckConstraint("description='hi'") + ), + UniqueConstraint("name"), + ) - table2 = Table('othertable', meta, - Column('id', Integer, primary_key=True), - Column('myid', Integer, ForeignKey('mytable.myid')), - ) + table2 = Table( + "othertable", + meta, + Column("id", Integer, primary_key=True), + Column("myid", Integer, ForeignKey("mytable.myid")), + ) meta2 = MetaData() - table_c = table.tometadata(meta2, schema='someschema') - table2_c = table2.tometadata(meta2, schema='someschema') + table_c = table.tometadata(meta2, schema="someschema") + table2_c = table2.tometadata(meta2, schema="someschema") - eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid - == table2_c.c.myid)) - eq_(str(table_c.join(table2_c).onclause), - 'someschema.mytable.myid = someschema.othertable.myid') + eq_( + str(table_c.join(table2_c).onclause), + str(table_c.c.myid == table2_c.c.myid), + ) + eq_( + str(table_c.join(table2_c).onclause), + "someschema.mytable.myid = someschema.othertable.myid", + ) def test_retain_table_schema(self): meta = MetaData() - table = Table('mytable', meta, - Column('myid', Integer, primary_key=True), - Column('name', String(40), nullable=True), - Column('description', String(30), - CheckConstraint("description='hi'")), - UniqueConstraint('name'), - schema='myschema', - ) + table = Table( + "mytable", + meta, + Column("myid", Integer, primary_key=True), + Column("name", String(40), nullable=True), + Column( + "description", String(30), CheckConstraint("description='hi'") + ), + UniqueConstraint("name"), + schema="myschema", + ) table2 = Table( - 'othertable', + "othertable", meta, - Column( - 'id', - Integer, - primary_key=True), - Column( - 'myid', - Integer, - ForeignKey('myschema.mytable.myid')), - schema='myschema', + Column("id", Integer, primary_key=True), + Column("myid", Integer, ForeignKey("myschema.mytable.myid")), + schema="myschema", ) meta2 = MetaData() table_c = table.tometadata(meta2) table2_c = table2.tometadata(meta2) - eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid - == table2_c.c.myid)) - eq_(str(table_c.join(table2_c).onclause), - 'myschema.mytable.myid = myschema.othertable.myid') + eq_( + str(table_c.join(table2_c).onclause), + str(table_c.c.myid == table2_c.c.myid), + ) + eq_( + str(table_c.join(table2_c).onclause), + "myschema.mytable.myid = myschema.othertable.myid", + ) def test_change_name_retain_metadata(self): meta = MetaData() - table = Table('mytable', meta, - Column('myid', Integer, primary_key=True), - Column('name', String(40), nullable=True), - Column('description', String(30), - CheckConstraint("description='hi'")), - UniqueConstraint('name'), - schema='myschema', - ) + table = Table( + "mytable", + meta, + Column("myid", Integer, primary_key=True), + Column("name", String(40), nullable=True), + Column( + "description", String(30), CheckConstraint("description='hi'") + ), + UniqueConstraint("name"), + schema="myschema", + ) - table2 = table.tometadata(table.metadata, name='newtable') - table3 = table.tometadata(table.metadata, schema='newschema', - name='newtable') + table2 = table.tometadata(table.metadata, name="newtable") + table3 = table.tometadata( + table.metadata, schema="newschema", name="newtable" + ) assert table.metadata is table2.metadata assert table.metadata is table3.metadata - eq_((table.name, table2.name, table3.name), - ('mytable', 'newtable', 'newtable')) - eq_((table.key, table2.key, table3.key), - ('myschema.mytable', 'myschema.newtable', 'newschema.newtable')) + eq_( + (table.name, table2.name, table3.name), + ("mytable", "newtable", "newtable"), + ) + eq_( + (table.key, table2.key, table3.key), + ("myschema.mytable", "myschema.newtable", "newschema.newtable"), + ) def test_change_name_change_metadata(self): meta = MetaData() meta2 = MetaData() - table = Table('mytable', meta, - Column('myid', Integer, primary_key=True), - Column('name', String(40), nullable=True), - Column('description', String(30), - CheckConstraint("description='hi'")), - UniqueConstraint('name'), - schema='myschema', - ) + table = Table( + "mytable", + meta, + Column("myid", Integer, primary_key=True), + Column("name", String(40), nullable=True), + Column( + "description", String(30), CheckConstraint("description='hi'") + ), + UniqueConstraint("name"), + schema="myschema", + ) - table2 = table.tometadata(meta2, name='newtable') + table2 = table.tometadata(meta2, name="newtable") assert table.metadata is not table2.metadata - eq_((table.name, table2.name), - ('mytable', 'newtable')) - eq_((table.key, table2.key), - ('myschema.mytable', 'myschema.newtable')) + eq_((table.name, table2.name), ("mytable", "newtable")) + eq_((table.key, table2.key), ("myschema.mytable", "myschema.newtable")) def test_change_name_selfref_fk_moves(self): meta = MetaData() - referenced = Table('ref', meta, - Column('id', Integer, primary_key=True), - ) - table = Table('mytable', meta, - Column('id', Integer, primary_key=True), - Column('parent_id', ForeignKey('mytable.id')), - Column('ref_id', ForeignKey('ref.id')) - ) + referenced = Table( + "ref", meta, Column("id", Integer, primary_key=True) + ) + table = Table( + "mytable", + meta, + Column("id", Integer, primary_key=True), + Column("parent_id", ForeignKey("mytable.id")), + Column("ref_id", ForeignKey("ref.id")), + ) - table2 = table.tometadata(table.metadata, name='newtable') + table2 = table.tometadata(table.metadata, name="newtable") assert table.metadata is table2.metadata assert table2.c.ref_id.references(referenced.c.id) assert table2.c.parent_id.references(table2.c.id) @@ -834,18 +937,21 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): def test_change_name_selfref_fk_moves_w_schema(self): meta = MetaData() - referenced = Table('ref', meta, - Column('id', Integer, primary_key=True), - ) - table = Table('mytable', meta, - Column('id', Integer, primary_key=True), - Column('parent_id', ForeignKey('mytable.id')), - Column('ref_id', ForeignKey('ref.id')) - ) + referenced = Table( + "ref", meta, Column("id", Integer, primary_key=True) + ) + table = Table( + "mytable", + meta, + Column("id", Integer, primary_key=True), + Column("parent_id", ForeignKey("mytable.id")), + Column("ref_id", ForeignKey("ref.id")), + ) table2 = table.tometadata( - table.metadata, name='newtable', schema='newschema') - ref2 = referenced.tometadata(table.metadata, schema='newschema') + table.metadata, name="newtable", schema="newschema" + ) + ref2 = referenced.tometadata(table.metadata, schema="newschema") assert table.metadata is table2.metadata assert table2.c.ref_id.references(ref2.c.id) assert table2.c.parent_id.references(table2.c.id) @@ -854,8 +960,9 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): m2 = MetaData() existing_schema = t2.schema if schema: - t2c = t2.tometadata(m2, schema=schema, - referred_schema_fn=referred_schema_fn) + t2c = t2.tometadata( + m2, schema=schema, referred_schema_fn=referred_schema_fn + ) eq_(t2c.schema, schema) else: t2c = t2.tometadata(m2, referred_schema_fn=referred_schema_fn) @@ -865,151 +972,164 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): def test_fk_has_schema_string_retain_schema(self): m = MetaData() - t2 = Table('t2', m, Column('y', Integer, ForeignKey('q.t1.x'))) + t2 = Table("t2", m, Column("y", Integer, ForeignKey("q.t1.x"))) self._assert_fk(t2, None, "q.t1.x") - Table('t1', m, Column('x', Integer), schema='q') + Table("t1", m, Column("x", Integer), schema="q") self._assert_fk(t2, None, "q.t1.x") def test_fk_has_schema_string_new_schema(self): m = MetaData() - t2 = Table('t2', m, Column('y', Integer, ForeignKey('q.t1.x'))) + t2 = Table("t2", m, Column("y", Integer, ForeignKey("q.t1.x"))) self._assert_fk(t2, "z", "q.t1.x") - Table('t1', m, Column('x', Integer), schema='q') + Table("t1", m, Column("x", Integer), schema="q") self._assert_fk(t2, "z", "q.t1.x") def test_fk_has_schema_col_retain_schema(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer), schema='q') - t2 = Table('t2', m, Column('y', Integer, ForeignKey(t1.c.x))) + t1 = Table("t1", m, Column("x", Integer), schema="q") + t2 = Table("t2", m, Column("y", Integer, ForeignKey(t1.c.x))) self._assert_fk(t2, "z", "q.t1.x") def test_fk_has_schema_col_new_schema(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer), schema='q') - t2 = Table('t2', m, Column('y', Integer, ForeignKey(t1.c.x))) + t1 = Table("t1", m, Column("x", Integer), schema="q") + t2 = Table("t2", m, Column("y", Integer, ForeignKey(t1.c.x))) self._assert_fk(t2, "z", "q.t1.x") def test_fk_and_referent_has_same_schema_string_retain_schema(self): m = MetaData() - t2 = Table('t2', m, Column('y', Integer, - ForeignKey('q.t1.x')), schema="q") + t2 = Table( + "t2", m, Column("y", Integer, ForeignKey("q.t1.x")), schema="q" + ) self._assert_fk(t2, None, "q.t1.x") - Table('t1', m, Column('x', Integer), schema='q') + Table("t1", m, Column("x", Integer), schema="q") self._assert_fk(t2, None, "q.t1.x") def test_fk_and_referent_has_same_schema_string_new_schema(self): m = MetaData() - t2 = Table('t2', m, Column('y', Integer, - ForeignKey('q.t1.x')), schema="q") + t2 = Table( + "t2", m, Column("y", Integer, ForeignKey("q.t1.x")), schema="q" + ) self._assert_fk(t2, "z", "z.t1.x") - Table('t1', m, Column('x', Integer), schema='q') + Table("t1", m, Column("x", Integer), schema="q") self._assert_fk(t2, "z", "z.t1.x") def test_fk_and_referent_has_same_schema_col_retain_schema(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer), schema='q') - t2 = Table('t2', m, Column('y', Integer, - ForeignKey(t1.c.x)), schema='q') + t1 = Table("t1", m, Column("x", Integer), schema="q") + t2 = Table( + "t2", m, Column("y", Integer, ForeignKey(t1.c.x)), schema="q" + ) self._assert_fk(t2, None, "q.t1.x") def test_fk_and_referent_has_same_schema_col_new_schema(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer), schema='q') - t2 = Table('t2', m, Column('y', Integer, - ForeignKey(t1.c.x)), schema='q') - self._assert_fk(t2, 'z', "z.t1.x") + t1 = Table("t1", m, Column("x", Integer), schema="q") + t2 = Table( + "t2", m, Column("y", Integer, ForeignKey(t1.c.x)), schema="q" + ) + self._assert_fk(t2, "z", "z.t1.x") def test_fk_and_referent_has_diff_schema_string_retain_schema(self): m = MetaData() - t2 = Table('t2', m, Column('y', Integer, - ForeignKey('p.t1.x')), schema="q") + t2 = Table( + "t2", m, Column("y", Integer, ForeignKey("p.t1.x")), schema="q" + ) self._assert_fk(t2, None, "p.t1.x") - Table('t1', m, Column('x', Integer), schema='p') + Table("t1", m, Column("x", Integer), schema="p") self._assert_fk(t2, None, "p.t1.x") def test_fk_and_referent_has_diff_schema_string_new_schema(self): m = MetaData() - t2 = Table('t2', m, Column('y', Integer, - ForeignKey('p.t1.x')), schema="q") + t2 = Table( + "t2", m, Column("y", Integer, ForeignKey("p.t1.x")), schema="q" + ) self._assert_fk(t2, "z", "p.t1.x") - Table('t1', m, Column('x', Integer), schema='p') + Table("t1", m, Column("x", Integer), schema="p") self._assert_fk(t2, "z", "p.t1.x") def test_fk_and_referent_has_diff_schema_col_retain_schema(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer), schema='p') - t2 = Table('t2', m, Column('y', Integer, - ForeignKey(t1.c.x)), schema='q') + t1 = Table("t1", m, Column("x", Integer), schema="p") + t2 = Table( + "t2", m, Column("y", Integer, ForeignKey(t1.c.x)), schema="q" + ) self._assert_fk(t2, None, "p.t1.x") def test_fk_and_referent_has_diff_schema_col_new_schema(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer), schema='p') - t2 = Table('t2', m, Column('y', Integer, - ForeignKey(t1.c.x)), schema='q') - self._assert_fk(t2, 'z', "p.t1.x") + t1 = Table("t1", m, Column("x", Integer), schema="p") + t2 = Table( + "t2", m, Column("y", Integer, ForeignKey(t1.c.x)), schema="q" + ) + self._assert_fk(t2, "z", "p.t1.x") def test_fk_custom_system(self): m = MetaData() - t2 = Table('t2', m, Column('y', Integer, - ForeignKey('p.t1.x')), schema='q') + t2 = Table( + "t2", m, Column("y", Integer, ForeignKey("p.t1.x")), schema="q" + ) def ref_fn(table, to_schema, constraint, referred_schema): assert table is t2 eq_(to_schema, "z") eq_(referred_schema, "p") return "h" - self._assert_fk(t2, 'z', "h.t1.x", referred_schema_fn=ref_fn) + + self._assert_fk(t2, "z", "h.t1.x", referred_schema_fn=ref_fn) def test_copy_info(self): m = MetaData() - fk = ForeignKey('t2.id') - c = Column('c', Integer, fk) - ck = CheckConstraint('c > 5') - t = Table('t', m, c, ck) - - m.info['minfo'] = True - fk.info['fkinfo'] = True - c.info['cinfo'] = True - ck.info['ckinfo'] = True - t.info['tinfo'] = True - t.primary_key.info['pkinfo'] = True - fkc = [const for const in t.constraints if - isinstance(const, ForeignKeyConstraint)][0] - fkc.info['fkcinfo'] = True + fk = ForeignKey("t2.id") + c = Column("c", Integer, fk) + ck = CheckConstraint("c > 5") + t = Table("t", m, c, ck) + + m.info["minfo"] = True + fk.info["fkinfo"] = True + c.info["cinfo"] = True + ck.info["ckinfo"] = True + t.info["tinfo"] = True + t.primary_key.info["pkinfo"] = True + fkc = [ + const + for const in t.constraints + if isinstance(const, ForeignKeyConstraint) + ][0] + fkc.info["fkcinfo"] = True m2 = MetaData() t2 = t.tometadata(m2) - m.info['minfo'] = False - fk.info['fkinfo'] = False - c.info['cinfo'] = False - ck.info['ckinfo'] = False - t.primary_key.info['pkinfo'] = False - fkc.info['fkcinfo'] = False + m.info["minfo"] = False + fk.info["fkinfo"] = False + c.info["cinfo"] = False + ck.info["ckinfo"] = False + t.primary_key.info["pkinfo"] = False + fkc.info["fkcinfo"] = False eq_(m2.info, {}) eq_(t2.info, {"tinfo": True}) @@ -1017,21 +1137,29 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): eq_(list(t2.c.c.foreign_keys)[0].info, {"fkinfo": True}) eq_(t2.primary_key.info, {"pkinfo": True}) - fkc2 = [const for const in t2.constraints - if isinstance(const, ForeignKeyConstraint)][0] + fkc2 = [ + const + for const in t2.constraints + if isinstance(const, ForeignKeyConstraint) + ][0] eq_(fkc2.info, {"fkcinfo": True}) - ck2 = [const for const in - t2.constraints if isinstance(const, CheckConstraint)][0] + ck2 = [ + const + for const in t2.constraints + if isinstance(const, CheckConstraint) + ][0] eq_(ck2.info, {"ckinfo": True}) def test_dialect_kwargs(self): meta = MetaData() - table = Table('mytable', meta, - Column('myid', Integer, primary_key=True), - mysql_engine='InnoDB', - ) + table = Table( + "mytable", + meta, + Column("myid", Integer, primary_key=True), + mysql_engine="InnoDB", + ) meta2 = MetaData() table_c = table.tometadata(meta2) @@ -1043,72 +1171,82 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): def test_indexes(self): meta = MetaData() - table = Table('mytable', meta, - Column('id', Integer, primary_key=True), - Column('data1', Integer, index=True), - Column('data2', Integer), - Index('text', text('data1 + 1')), - ) - Index('multi', table.c.data1, table.c.data2) - Index('func', func.abs(table.c.data1)) - Index('multi-func', table.c.data1, func.abs(table.c.data2)) + table = Table( + "mytable", + meta, + Column("id", Integer, primary_key=True), + Column("data1", Integer, index=True), + Column("data2", Integer), + Index("text", text("data1 + 1")), + ) + Index("multi", table.c.data1, table.c.data2) + Index("func", func.abs(table.c.data1)) + Index("multi-func", table.c.data1, func.abs(table.c.data2)) meta2 = MetaData() table_c = table.tometadata(meta2) def _get_key(i): - return [i.name, i.unique] + \ - sorted(i.kwargs.items()) + \ - [str(col) for col in i.expressions] + return ( + [i.name, i.unique] + + sorted(i.kwargs.items()) + + [str(col) for col in i.expressions] + ) eq_( sorted([_get_key(i) for i in table.indexes]), - sorted([_get_key(i) for i in table_c.indexes]) + sorted([_get_key(i) for i in table_c.indexes]), ) def test_indexes_with_col_redefine(self): meta = MetaData() - table = Table('mytable', meta, - Column('id', Integer, primary_key=True), - Column('data1', Integer), - Column('data2', Integer), - Index('text', text('data1 + 1')), - ) - Index('multi', table.c.data1, table.c.data2) - Index('func', func.abs(table.c.data1)) - Index('multi-func', table.c.data1, func.abs(table.c.data2)) - - table = Table('mytable', meta, - Column('data1', Integer), - Column('data2', Integer), - extend_existing=True - ) + table = Table( + "mytable", + meta, + Column("id", Integer, primary_key=True), + Column("data1", Integer), + Column("data2", Integer), + Index("text", text("data1 + 1")), + ) + Index("multi", table.c.data1, table.c.data2) + Index("func", func.abs(table.c.data1)) + Index("multi-func", table.c.data1, func.abs(table.c.data2)) + + table = Table( + "mytable", + meta, + Column("data1", Integer), + Column("data2", Integer), + extend_existing=True, + ) meta2 = MetaData() table_c = table.tometadata(meta2) def _get_key(i): - return [i.name, i.unique] + \ - sorted(i.kwargs.items()) + \ - [str(col) for col in i.expressions] + return ( + [i.name, i.unique] + + sorted(i.kwargs.items()) + + [str(col) for col in i.expressions] + ) eq_( sorted([_get_key(i) for i in table.indexes]), - sorted([_get_key(i) for i in table_c.indexes]) + sorted([_get_key(i) for i in table_c.indexes]), ) @emits_warning("Table '.+' already exists within the given MetaData") def test_already_exists(self): meta1 = MetaData() - table1 = Table('mytable', meta1, - Column('myid', Integer, primary_key=True), - ) + table1 = Table( + "mytable", meta1, Column("myid", Integer, primary_key=True) + ) meta2 = MetaData() - table2 = Table('mytable', meta2, - Column('yourid', Integer, primary_key=True), - ) + table2 = Table( + "mytable", meta2, Column("yourid", Integer, primary_key=True) + ) table_c = table1.tometadata(meta2) table_d = table2.tometadata(meta2) @@ -1117,86 +1255,97 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): assert table_c is table_d def test_default_schema_metadata(self): - meta = MetaData(schema='myschema') + meta = MetaData(schema="myschema") table = Table( - 'mytable', + "mytable", meta, + Column("myid", Integer, primary_key=True), + Column("name", String(40), nullable=True), Column( - 'myid', - Integer, - primary_key=True), - Column( - 'name', - String(40), - nullable=True), - Column( - 'description', - String(30), - CheckConstraint("description='hi'")), - UniqueConstraint('name'), + "description", String(30), CheckConstraint("description='hi'") + ), + UniqueConstraint("name"), ) table2 = Table( - 'othertable', meta, Column( - 'id', Integer, primary_key=True), Column( - 'myid', Integer, ForeignKey('myschema.mytable.myid')), ) + "othertable", + meta, + Column("id", Integer, primary_key=True), + Column("myid", Integer, ForeignKey("myschema.mytable.myid")), + ) - meta2 = MetaData(schema='someschema') + meta2 = MetaData(schema="someschema") table_c = table.tometadata(meta2, schema=None) table2_c = table2.tometadata(meta2, schema=None) - eq_(str(table_c.join(table2_c).onclause), - str(table_c.c.myid == table2_c.c.myid)) - eq_(str(table_c.join(table2_c).onclause), - "someschema.mytable.myid = someschema.othertable.myid") + eq_( + str(table_c.join(table2_c).onclause), + str(table_c.c.myid == table2_c.c.myid), + ) + eq_( + str(table_c.join(table2_c).onclause), + "someschema.mytable.myid = someschema.othertable.myid", + ) def test_strip_schema(self): meta = MetaData() - table = Table('mytable', meta, - Column('myid', Integer, primary_key=True), - Column('name', String(40), nullable=True), - Column('description', String(30), - CheckConstraint("description='hi'")), - UniqueConstraint('name'), - ) + table = Table( + "mytable", + meta, + Column("myid", Integer, primary_key=True), + Column("name", String(40), nullable=True), + Column( + "description", String(30), CheckConstraint("description='hi'") + ), + UniqueConstraint("name"), + ) - table2 = Table('othertable', meta, - Column('id', Integer, primary_key=True), - Column('myid', Integer, ForeignKey('mytable.myid')), - ) + table2 = Table( + "othertable", + meta, + Column("id", Integer, primary_key=True), + Column("myid", Integer, ForeignKey("mytable.myid")), + ) meta2 = MetaData() table_c = table.tometadata(meta2, schema=None) table2_c = table2.tometadata(meta2, schema=None) - eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid - == table2_c.c.myid)) - eq_(str(table_c.join(table2_c).onclause), - 'mytable.myid = othertable.myid') + eq_( + str(table_c.join(table2_c).onclause), + str(table_c.c.myid == table2_c.c.myid), + ) + eq_( + str(table_c.join(table2_c).onclause), + "mytable.myid = othertable.myid", + ) def test_unique_true_flag(self): meta = MetaData() - table = Table('mytable', meta, Column('x', Integer, unique=True)) + table = Table("mytable", meta, Column("x", Integer, unique=True)) m2 = MetaData() t2 = table.tometadata(m2) eq_( - len([ - const for const - in t2.constraints - if isinstance(const, UniqueConstraint)]), - 1 + len( + [ + const + for const in t2.constraints + if isinstance(const, UniqueConstraint) + ] + ), + 1, ) def test_index_true_flag(self): meta = MetaData() - table = Table('mytable', meta, Column('x', Integer, index=True)) + table = Table("mytable", meta, Column("x", Integer, index=True)) m2 = MetaData() @@ -1214,168 +1363,161 @@ class InfoTest(fixtures.TestBase): eq_(m1.info, {"foo": "bar"}) def test_foreignkey_constraint_info(self): - fkc = ForeignKeyConstraint(['a'], ['b'], name='bar') + fkc = ForeignKeyConstraint(["a"], ["b"], name="bar") eq_(fkc.info, {}) fkc = ForeignKeyConstraint( - ['a'], ['b'], name='bar', info={"foo": "bar"}) + ["a"], ["b"], name="bar", info={"foo": "bar"} + ) eq_(fkc.info, {"foo": "bar"}) def test_foreignkey_info(self): - fkc = ForeignKey('a') + fkc = ForeignKey("a") eq_(fkc.info, {}) - fkc = ForeignKey('a', info={"foo": "bar"}) + fkc = ForeignKey("a", info={"foo": "bar"}) eq_(fkc.info, {"foo": "bar"}) def test_primarykey_constraint_info(self): - pkc = PrimaryKeyConstraint('a', name='x') + pkc = PrimaryKeyConstraint("a", name="x") eq_(pkc.info, {}) - pkc = PrimaryKeyConstraint('a', name='x', info={'foo': 'bar'}) - eq_(pkc.info, {'foo': 'bar'}) + pkc = PrimaryKeyConstraint("a", name="x", info={"foo": "bar"}) + eq_(pkc.info, {"foo": "bar"}) def test_unique_constraint_info(self): - uc = UniqueConstraint('a', name='x') + uc = UniqueConstraint("a", name="x") eq_(uc.info, {}) - uc = UniqueConstraint('a', name='x', info={'foo': 'bar'}) - eq_(uc.info, {'foo': 'bar'}) + uc = UniqueConstraint("a", name="x", info={"foo": "bar"}) + eq_(uc.info, {"foo": "bar"}) def test_check_constraint_info(self): - cc = CheckConstraint('foo=bar', name='x') + cc = CheckConstraint("foo=bar", name="x") eq_(cc.info, {}) - cc = CheckConstraint('foo=bar', name='x', info={'foo': 'bar'}) - eq_(cc.info, {'foo': 'bar'}) + cc = CheckConstraint("foo=bar", name="x", info={"foo": "bar"}) + eq_(cc.info, {"foo": "bar"}) def test_index_info(self): - ix = Index('x', 'a') + ix = Index("x", "a") eq_(ix.info, {}) - ix = Index('x', 'a', info={'foo': 'bar'}) - eq_(ix.info, {'foo': 'bar'}) + ix = Index("x", "a", info={"foo": "bar"}) + eq_(ix.info, {"foo": "bar"}) def test_column_info(self): - c = Column('x', Integer) + c = Column("x", Integer) eq_(c.info, {}) - c = Column('x', Integer, info={'foo': 'bar'}) - eq_(c.info, {'foo': 'bar'}) + c = Column("x", Integer, info={"foo": "bar"}) + eq_(c.info, {"foo": "bar"}) def test_table_info(self): - t = Table('x', MetaData()) + t = Table("x", MetaData()) eq_(t.info, {}) - t = Table('x', MetaData(), info={'foo': 'bar'}) - eq_(t.info, {'foo': 'bar'}) + t = Table("x", MetaData(), info={"foo": "bar"}) + eq_(t.info, {"foo": "bar"}) class TableTest(fixtures.TestBase, AssertsCompiledSQL): - @testing.requires.temporary_tables - @testing.skip_if('mssql', 'different col format') + @testing.skip_if("mssql", "different col format") def test_prefixes(self): from sqlalchemy import Table - table1 = Table("temporary_table_1", MetaData(), - Column("col1", Integer), - prefixes=["TEMPORARY"]) + + table1 = Table( + "temporary_table_1", + MetaData(), + Column("col1", Integer), + prefixes=["TEMPORARY"], + ) self.assert_compile( schema.CreateTable(table1), - "CREATE TEMPORARY TABLE temporary_table_1 (col1 INTEGER)" + "CREATE TEMPORARY TABLE temporary_table_1 (col1 INTEGER)", ) - table2 = Table("temporary_table_2", MetaData(), - Column("col1", Integer), - prefixes=["VIRTUAL"]) + table2 = Table( + "temporary_table_2", + MetaData(), + Column("col1", Integer), + prefixes=["VIRTUAL"], + ) self.assert_compile( schema.CreateTable(table2), - "CREATE VIRTUAL TABLE temporary_table_2 (col1 INTEGER)" + "CREATE VIRTUAL TABLE temporary_table_2 (col1 INTEGER)", ) def test_table_info(self): metadata = MetaData() - t1 = Table('foo', metadata, info={'x': 'y'}) - t2 = Table('bar', metadata, info={}) - t3 = Table('bat', metadata) - assert t1.info == {'x': 'y'} + t1 = Table("foo", metadata, info={"x": "y"}) + t2 = Table("bar", metadata, info={}) + t3 = Table("bat", metadata) + assert t1.info == {"x": "y"} assert t2.info == {} assert t3.info == {} for t in (t1, t2, t3): - t.info['bar'] = 'zip' - assert t.info['bar'] == 'zip' + t.info["bar"] = "zip" + assert t.info["bar"] == "zip" def test_reset_exported_passes(self): m = MetaData() - t = Table('t', m, Column('foo', Integer)) - eq_( - list(t.c), [t.c.foo] - ) + t = Table("t", m, Column("foo", Integer)) + eq_(list(t.c), [t.c.foo]) t._reset_exported() - eq_( - list(t.c), [t.c.foo] - ) + eq_(list(t.c), [t.c.foo]) def test_foreign_key_constraints_collection(self): metadata = MetaData() - t1 = Table('foo', metadata, Column('a', Integer)) + t1 = Table("foo", metadata, Column("a", Integer)) eq_(t1.foreign_key_constraints, set()) - fk1 = ForeignKey('q.id') - fk2 = ForeignKey('j.id') - fk3 = ForeignKeyConstraint(['b', 'c'], ['r.x', 'r.y']) + fk1 = ForeignKey("q.id") + fk2 = ForeignKey("j.id") + fk3 = ForeignKeyConstraint(["b", "c"], ["r.x", "r.y"]) - t1.append_column(Column('b', Integer, fk1)) - eq_( - t1.foreign_key_constraints, - set([fk1.constraint])) + t1.append_column(Column("b", Integer, fk1)) + eq_(t1.foreign_key_constraints, set([fk1.constraint])) - t1.append_column(Column('c', Integer, fk2)) - eq_( - t1.foreign_key_constraints, - set([fk1.constraint, fk2.constraint])) + t1.append_column(Column("c", Integer, fk2)) + eq_(t1.foreign_key_constraints, set([fk1.constraint, fk2.constraint])) t1.append_constraint(fk3) eq_( t1.foreign_key_constraints, - set([fk1.constraint, fk2.constraint, fk3])) + set([fk1.constraint, fk2.constraint, fk3]), + ) def test_c_immutable(self): m = MetaData() - t1 = Table('t', m, Column('x', Integer), Column('y', Integer)) - assert_raises( - TypeError, - t1.c.extend, [Column('z', Integer)] - ) + t1 = Table("t", m, Column("x", Integer), Column("y", Integer)) + assert_raises(TypeError, t1.c.extend, [Column("z", Integer)]) def assign(): - t1.c['z'] = Column('z', Integer) - assert_raises( - TypeError, - assign - ) + t1.c["z"] = Column("z", Integer) + + assert_raises(TypeError, assign) def assign2(): - t1.c.z = Column('z', Integer) - assert_raises( - TypeError, - assign2 - ) + t1.c.z = Column("z", Integer) + + assert_raises(TypeError, assign2) def test_c_mutate_after_unpickle(self): m = MetaData() - y = Column('y', Integer) - t1 = Table('t', m, Column('x', Integer), y) + y = Column("y", Integer) + t1 = Table("t", m, Column("x", Integer), y) t2 = pickle.loads(pickle.dumps(t1)) - z = Column('z', Integer) - g = Column('g', Integer) + z = Column("z", Integer) + g = Column("g", Integer) t2.append_column(z) is_(t1.c.contains_column(y), True) @@ -1389,39 +1531,39 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL): def test_autoincrement_replace(self): m = MetaData() - t = Table('t', m, - Column('id', Integer, primary_key=True) - ) + t = Table("t", m, Column("id", Integer, primary_key=True)) is_(t._autoincrement_column, t.c.id) - t = Table('t', m, - Column('id', Integer, primary_key=True), - extend_existing=True - ) + t = Table( + "t", + m, + Column("id", Integer, primary_key=True), + extend_existing=True, + ) is_(t._autoincrement_column, t.c.id) def test_pk_args_standalone(self): m = MetaData() - t = Table('t', m, - Column('x', Integer, primary_key=True), - PrimaryKeyConstraint(mssql_clustered=True) - ) - eq_( - list(t.primary_key), [t.c.x] - ) - eq_( - t.primary_key.dialect_kwargs, {"mssql_clustered": True} + t = Table( + "t", + m, + Column("x", Integer, primary_key=True), + PrimaryKeyConstraint(mssql_clustered=True), ) + eq_(list(t.primary_key), [t.c.x]) + eq_(t.primary_key.dialect_kwargs, {"mssql_clustered": True}) def test_pk_cols_sets_flags(self): m = MetaData() - t = Table('t', m, - Column('x', Integer), - Column('y', Integer), - Column('z', Integer), - PrimaryKeyConstraint('x', 'y') - ) + t = Table( + "t", + m, + Column("x", Integer), + Column("y", Integer), + Column("z", Integer), + PrimaryKeyConstraint("x", "y"), + ) eq_(t.c.x.primary_key, True) eq_(t.c.y.primary_key, True) eq_(t.c.z.primary_key, False) @@ -1432,10 +1574,12 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL): exc.SAWarning, "Table 't' specifies columns 'x' as primary_key=True, " "not matching locally specified columns 'q'", - Table, 't', m, - Column('x', Integer, primary_key=True), - Column('q', Integer), - PrimaryKeyConstraint('q') + Table, + "t", + m, + Column("x", Integer, primary_key=True), + Column("q", Integer), + PrimaryKeyConstraint("q"), ) def test_pk_col_mismatch_two(self): @@ -1444,40 +1588,46 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL): exc.SAWarning, "Table 't' specifies columns 'a', 'b', 'c' as primary_key=True, " "not matching locally specified columns 'b', 'c'", - Table, 't', m, - Column('a', Integer, primary_key=True), - Column('b', Integer, primary_key=True), - Column('c', Integer, primary_key=True), - PrimaryKeyConstraint('b', 'c') + Table, + "t", + m, + Column("a", Integer, primary_key=True), + Column("b", Integer, primary_key=True), + Column("c", Integer, primary_key=True), + PrimaryKeyConstraint("b", "c"), ) @testing.emits_warning("Table 't'") def test_pk_col_mismatch_three(self): m = MetaData() - t = Table('t', m, - Column('x', Integer, primary_key=True), - Column('q', Integer), - PrimaryKeyConstraint('q') - ) + t = Table( + "t", + m, + Column("x", Integer, primary_key=True), + Column("q", Integer), + PrimaryKeyConstraint("q"), + ) eq_(list(t.primary_key), [t.c.q]) @testing.emits_warning("Table 't'") def test_pk_col_mismatch_four(self): m = MetaData() - t = Table('t', m, - Column('a', Integer, primary_key=True), - Column('b', Integer, primary_key=True), - Column('c', Integer, primary_key=True), - PrimaryKeyConstraint('b', 'c') - ) + t = Table( + "t", + m, + Column("a", Integer, primary_key=True), + Column("b", Integer, primary_key=True), + Column("c", Integer, primary_key=True), + PrimaryKeyConstraint("b", "c"), + ) eq_(list(t.primary_key), [t.c.b, t.c.c]) def test_pk_always_flips_nullable(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer), PrimaryKeyConstraint('x')) + t1 = Table("t1", m, Column("x", Integer), PrimaryKeyConstraint("x")) - t2 = Table('t2', m, Column('x', Integer, primary_key=True)) + t2 = Table("t2", m, Column("x", Integer, primary_key=True)) eq_(list(t1.primary_key), [t1.c.x]) @@ -1492,67 +1642,58 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL): class PKAutoIncrementTest(fixtures.TestBase): def test_multi_integer_no_autoinc(self): - pk = PrimaryKeyConstraint( - Column('a', Integer), - Column('b', Integer) - ) - t = Table('t', MetaData()) + pk = PrimaryKeyConstraint(Column("a", Integer), Column("b", Integer)) + t = Table("t", MetaData()) t.append_constraint(pk) is_(pk._autoincrement_column, None) def test_multi_integer_multi_autoinc(self): pk = PrimaryKeyConstraint( - Column('a', Integer, autoincrement=True), - Column('b', Integer, autoincrement=True) + Column("a", Integer, autoincrement=True), + Column("b", Integer, autoincrement=True), ) - t = Table('t', MetaData()) + t = Table("t", MetaData()) t.append_constraint(pk) assert_raises_message( exc.ArgumentError, "Only one Column may be marked", - lambda: pk._autoincrement_column + lambda: pk._autoincrement_column, ) def test_single_integer_no_autoinc(self): - pk = PrimaryKeyConstraint( - Column('a', Integer), - ) - t = Table('t', MetaData()) + pk = PrimaryKeyConstraint(Column("a", Integer)) + t = Table("t", MetaData()) t.append_constraint(pk) - is_(pk._autoincrement_column, pk.columns['a']) + is_(pk._autoincrement_column, pk.columns["a"]) def test_single_string_no_autoinc(self): - pk = PrimaryKeyConstraint( - Column('a', String), - ) - t = Table('t', MetaData()) + pk = PrimaryKeyConstraint(Column("a", String)) + t = Table("t", MetaData()) t.append_constraint(pk) is_(pk._autoincrement_column, None) def test_single_string_illegal_autoinc(self): - t = Table('t', MetaData(), Column('a', String, autoincrement=True)) - pk = PrimaryKeyConstraint( - t.c.a - ) + t = Table("t", MetaData(), Column("a", String, autoincrement=True)) + pk = PrimaryKeyConstraint(t.c.a) t.append_constraint(pk) assert_raises_message( exc.ArgumentError, "Column type VARCHAR on column 't.a'", - lambda: pk._autoincrement_column + lambda: pk._autoincrement_column, ) def test_single_integer_default(self): t = Table( - 't', MetaData(), - Column('a', Integer, autoincrement=True, default=lambda: 1)) - pk = PrimaryKeyConstraint( - t.c.a + "t", + MetaData(), + Column("a", Integer, autoincrement=True, default=lambda: 1), ) + pk = PrimaryKeyConstraint(t.c.a) t.append_constraint(pk) is_(pk._autoincrement_column, t.c.a) @@ -1562,47 +1703,45 @@ class PKAutoIncrementTest(fixtures.TestBase): # if the user puts autoincrement=True with a server_default, trust # them on it t = Table( - 't', MetaData(), - Column('a', Integer, - autoincrement=True, server_default=func.magic())) - pk = PrimaryKeyConstraint( - t.c.a + "t", + MetaData(), + Column( + "a", Integer, autoincrement=True, server_default=func.magic() + ), ) + pk = PrimaryKeyConstraint(t.c.a) t.append_constraint(pk) is_(pk._autoincrement_column, t.c.a) def test_implicit_autoinc_but_fks(self): m = MetaData() - Table('t1', m, Column('id', Integer, primary_key=True)) - t2 = Table( - 't2', MetaData(), - Column('a', Integer, ForeignKey('t1.id'))) - pk = PrimaryKeyConstraint( - t2.c.a - ) + Table("t1", m, Column("id", Integer, primary_key=True)) + t2 = Table("t2", MetaData(), Column("a", Integer, ForeignKey("t1.id"))) + pk = PrimaryKeyConstraint(t2.c.a) t2.append_constraint(pk) is_(pk._autoincrement_column, None) def test_explicit_autoinc_but_fks(self): m = MetaData() - Table('t1', m, Column('id', Integer, primary_key=True)) + Table("t1", m, Column("id", Integer, primary_key=True)) t2 = Table( - 't2', MetaData(), - Column('a', Integer, ForeignKey('t1.id'), autoincrement=True)) - pk = PrimaryKeyConstraint( - t2.c.a + "t2", + MetaData(), + Column("a", Integer, ForeignKey("t1.id"), autoincrement=True), ) + pk = PrimaryKeyConstraint(t2.c.a) t2.append_constraint(pk) is_(pk._autoincrement_column, t2.c.a) t3 = Table( - 't3', MetaData(), - Column('a', Integer, - ForeignKey('t1.id'), autoincrement='ignore_fk')) - pk = PrimaryKeyConstraint( - t3.c.a + "t3", + MetaData(), + Column( + "a", Integer, ForeignKey("t1.id"), autoincrement="ignore_fk" + ), ) + pk = PrimaryKeyConstraint(t3.c.a) t3.append_constraint(pk) is_(pk._autoincrement_column, t3.c.a) @@ -1622,12 +1761,14 @@ class SchemaTypeTest(fixtures.TestBase): def _on_table_create(self, target, bind, **kw): super(SchemaTypeTest.TrackEvents, self)._on_table_create( - target, bind, **kw) + target, bind, **kw + ) self.evt_targets += (target,) def _on_metadata_create(self, target, bind, **kw): super(SchemaTypeTest.TrackEvents, self)._on_metadata_create( - target, bind, **kw) + target, bind, **kw + ) self.evt_targets += (target,) # TODO: Enum and Boolean put TypeEngine first. Changing that here @@ -1644,7 +1785,6 @@ class SchemaTypeTest(fixtures.TestBase): pass class MyTypeWImpl(MyType): - def _gen_dialect_impl(self, dialect): return self.adapt(SchemaTypeTest.MyTypeImpl) @@ -1724,13 +1864,13 @@ class SchemaTypeTest(fixtures.TestBase): orig_set_parent_w_dispatch(parent) canary._set_parent_with_dispatch(parent) - with mock.patch.object(evt_target, '_set_parent', _set_parent): + with mock.patch.object(evt_target, "_set_parent", _set_parent): with mock.patch.object( - evt_target, '_set_parent_with_dispatch', - _set_parent_w_dispatch): + evt_target, "_set_parent_with_dispatch", _set_parent_w_dispatch + ): event.listen(evt_target, "before_parent_attach", canary.go) - c = Column('q', typ) + c = Column("q", typ) if double: # no clean way yet to fix this, inner schema type is called @@ -1741,8 +1881,8 @@ class SchemaTypeTest(fixtures.TestBase): mock.call._set_parent(c), mock.call.go(evt_target, c), mock.call._set_parent(c), - mock.call._set_parent_with_dispatch(c) - ] + mock.call._set_parent_with_dispatch(c), + ], ) else: eq_( @@ -1750,39 +1890,39 @@ class SchemaTypeTest(fixtures.TestBase): [ mock.call.go(evt_target, c), mock.call._set_parent(c), - mock.call._set_parent_with_dispatch(c) - ] + mock.call._set_parent_with_dispatch(c), + ], ) def test_independent_schema(self): m = MetaData() type_ = self.MyType(schema="q") - t1 = Table('x', m, Column("y", type_), schema="z") + t1 = Table("x", m, Column("y", type_), schema="z") eq_(t1.c.y.type.schema, "q") def test_inherit_schema(self): m = MetaData() type_ = self.MyType(schema="q", inherit_schema=True) - t1 = Table('x', m, Column("y", type_), schema="z") + t1 = Table("x", m, Column("y", type_), schema="z") eq_(t1.c.y.type.schema, "z") def test_independent_schema_enum(self): m = MetaData() type_ = sqltypes.Enum("a", schema="q") - t1 = Table('x', m, Column("y", type_), schema="z") + t1 = Table("x", m, Column("y", type_), schema="z") eq_(t1.c.y.type.schema, "q") def test_inherit_schema_enum(self): m = MetaData() type_ = sqltypes.Enum("a", "b", "c", schema="q", inherit_schema=True) - t1 = Table('x', m, Column("y", type_), schema="z") + t1 = Table("x", m, Column("y", type_), schema="z") eq_(t1.c.y.type.schema, "z") def test_tometadata_copy_type(self): m1 = MetaData() type_ = self.MyType() - t1 = Table('x', m1, Column("y", type_)) + t1 = Table("x", m1, Column("y", type_)) m2 = MetaData() t2 = t1.tometadata(m2) @@ -1794,14 +1934,13 @@ class SchemaTypeTest(fixtures.TestBase): is_(t2.c.y.type.table, t2) def test_tometadata_copy_decorated(self): - class MyDecorated(TypeDecorator): impl = self.MyType m1 = MetaData() type_ = MyDecorated(schema="z") - t1 = Table('x', m1, Column("y", type_)) + t1 = Table("x", m1, Column("y", type_)) m2 = MetaData() t2 = t1.tometadata(m2) @@ -1811,7 +1950,7 @@ class SchemaTypeTest(fixtures.TestBase): m1 = MetaData() type_ = self.MyType() - t1 = Table('x', m1, Column("y", type_)) + t1 = Table("x", m1, Column("y", type_)) m2 = MetaData() t2 = t1.tometadata(m2, schema="bar") @@ -1822,7 +1961,7 @@ class SchemaTypeTest(fixtures.TestBase): m1 = MetaData() type_ = self.MyType(inherit_schema=True) - t1 = Table('x', m1, Column("y", type_)) + t1 = Table("x", m1, Column("y", type_)) m2 = MetaData() t2 = t1.tometadata(m2, schema="bar") @@ -1834,7 +1973,7 @@ class SchemaTypeTest(fixtures.TestBase): m1 = MetaData() type_ = self.MyType() - t1 = Table('x', m1, Column("y", type_)) + t1 = Table("x", m1, Column("y", type_)) m2 = MetaData() t2 = t1.tometadata(m2) @@ -1851,17 +1990,17 @@ class SchemaTypeTest(fixtures.TestBase): def test_enum_column_copy_transfers_events(self): m = MetaData() - type_ = self.WrapEnum('a', 'b', 'c', name='foo') - y = Column('y', type_) + type_ = self.WrapEnum("a", "b", "c", name="foo") + y = Column("y", type_) y_copy = y.copy() - t1 = Table('x', m, y_copy) + t1 = Table("x", m, y_copy) is_true(y_copy.type._create_events) # for PostgreSQL, this will emit CREATE TYPE m.dispatch.before_create(t1, testing.db) try: - eq_(t1.c.y.type.evt_targets, (t1, )) + eq_(t1.c.y.type.evt_targets, (t1,)) finally: # do the drop so that PostgreSQL emits DROP TYPE m.dispatch.after_drop(t1, testing.db) @@ -1869,25 +2008,30 @@ class SchemaTypeTest(fixtures.TestBase): def test_enum_nonnative_column_copy_transfers_events(self): m = MetaData() - type_ = self.WrapEnum('a', 'b', 'c', name='foo', native_enum=False) - y = Column('y', type_) + type_ = self.WrapEnum("a", "b", "c", name="foo", native_enum=False) + y = Column("y", type_) y_copy = y.copy() - t1 = Table('x', m, y_copy) + t1 = Table("x", m, y_copy) is_true(y_copy.type._create_events) m.dispatch.before_create(t1, testing.db) - eq_(t1.c.y.type.evt_targets, (t1, )) + eq_(t1.c.y.type.evt_targets, (t1,)) def test_enum_nonnative_column_copy_transfers_constraintpref(self): m = MetaData() type_ = self.WrapEnum( - 'a', 'b', 'c', name='foo', - native_enum=False, create_constraint=False) - y = Column('y', type_) + "a", + "b", + "c", + name="foo", + native_enum=False, + create_constraint=False, + ) + y = Column("y", type_) y_copy = y.copy() - Table('x', m, y_copy) + Table("x", m, y_copy) is_false(y_copy.type.create_constraint) @@ -1895,9 +2039,9 @@ class SchemaTypeTest(fixtures.TestBase): m = MetaData() type_ = self.WrapBoolean() - y = Column('y', type_) + y = Column("y", type_) y_copy = y.copy() - t1 = Table('x', m, y_copy) + t1 = Table("x", m, y_copy) is_true(y_copy.type._create_events) @@ -1905,9 +2049,9 @@ class SchemaTypeTest(fixtures.TestBase): m = MetaData() type_ = self.WrapBoolean(create_constraint=False) - y = Column('y', type_) + y = Column("y", type_) y_copy = y.copy() - Table('x', m, y_copy) + Table("x", m, y_copy) is_false(y_copy.type.create_constraint) @@ -1915,7 +2059,7 @@ class SchemaTypeTest(fixtures.TestBase): m1 = MetaData() typ = self.MyType(metadata=m1) m1.dispatch.before_create(m1, testing.db) - eq_(typ.evt_targets, (m1, )) + eq_(typ.evt_targets, (m1,)) dialect_impl = typ.dialect_impl(testing.db.dialect) eq_(dialect_impl.evt_targets, ()) @@ -1924,24 +2068,24 @@ class SchemaTypeTest(fixtures.TestBase): m1 = MetaData() typ = self.MyTypeWImpl(metadata=m1) m1.dispatch.before_create(m1, testing.db) - eq_(typ.evt_targets, (m1, )) + eq_(typ.evt_targets, (m1,)) dialect_impl = typ.dialect_impl(testing.db.dialect) - eq_(dialect_impl.evt_targets, (m1, )) + eq_(dialect_impl.evt_targets, (m1,)) def test_table_dispatch_decorator_schematype(self): m1 = MetaData() typ = self.MyTypeDecAndSchema() - t1 = Table('t1', m1, Column('x', typ)) + t1 = Table("t1", m1, Column("x", typ)) m1.dispatch.before_create(t1, testing.db) - eq_(typ.evt_targets, (t1, )) + eq_(typ.evt_targets, (t1,)) def test_table_dispatch_no_new_impl(self): m1 = MetaData() typ = self.MyType() - t1 = Table('t1', m1, Column('x', typ)) + t1 = Table("t1", m1, Column("x", typ)) m1.dispatch.before_create(t1, testing.db) - eq_(typ.evt_targets, (t1, )) + eq_(typ.evt_targets, (t1,)) dialect_impl = typ.dialect_impl(testing.db.dialect) eq_(dialect_impl.evt_targets, ()) @@ -1949,12 +2093,12 @@ class SchemaTypeTest(fixtures.TestBase): def test_table_dispatch_new_impl(self): m1 = MetaData() typ = self.MyTypeWImpl() - t1 = Table('t1', m1, Column('x', typ)) + t1 = Table("t1", m1, Column("x", typ)) m1.dispatch.before_create(t1, testing.db) - eq_(typ.evt_targets, (t1, )) + eq_(typ.evt_targets, (t1,)) dialect_impl = typ.dialect_impl(testing.db.dialect) - eq_(dialect_impl.evt_targets, (t1, )) + eq_(dialect_impl.evt_targets, (t1,)) def test_create_metadata_bound_no_crash(self): m1 = MetaData() @@ -1965,127 +2109,103 @@ class SchemaTypeTest(fixtures.TestBase): def test_boolean_constraint_type_doesnt_double(self): m1 = MetaData() - t1 = Table('x', m1, Column("flag", Boolean())) + t1 = Table("x", m1, Column("flag", Boolean())) eq_( - len([ - c for c in t1.constraints - if isinstance(c, CheckConstraint)]), - 1 + len([c for c in t1.constraints if isinstance(c, CheckConstraint)]), + 1, ) m2 = MetaData() t2 = t1.tometadata(m2) eq_( - len([ - c for c in t2.constraints - if isinstance(c, CheckConstraint)]), - 1 + len([c for c in t2.constraints if isinstance(c, CheckConstraint)]), + 1, ) def test_enum_constraint_type_doesnt_double(self): m1 = MetaData() - t1 = Table('x', m1, Column("flag", Enum('a', 'b', 'c'))) + t1 = Table("x", m1, Column("flag", Enum("a", "b", "c"))) eq_( - len([ - c for c in t1.constraints - if isinstance(c, CheckConstraint)]), - 1 + len([c for c in t1.constraints if isinstance(c, CheckConstraint)]), + 1, ) m2 = MetaData() t2 = t1.tometadata(m2) eq_( - len([ - c for c in t2.constraints - if isinstance(c, CheckConstraint)]), - 1 + len([c for c in t2.constraints if isinstance(c, CheckConstraint)]), + 1, ) class SchemaTest(fixtures.TestBase, AssertsCompiledSQL): - def test_default_schema_metadata_fk(self): m = MetaData(schema="foo") - t1 = Table('t1', m, Column('x', Integer)) - t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x'))) + t1 = Table("t1", m, Column("x", Integer)) + t2 = Table("t2", m, Column("x", Integer, ForeignKey("t1.x"))) assert t2.c.x.references(t1.c.x) def test_ad_hoc_schema_equiv_fk(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer), schema="foo") + t1 = Table("t1", m, Column("x", Integer), schema="foo") t2 = Table( - 't2', - m, - Column( - 'x', - Integer, - ForeignKey('t1.x')), - schema="foo") + "t2", m, Column("x", Integer, ForeignKey("t1.x")), schema="foo" + ) assert_raises( - exc.NoReferencedTableError, - lambda: t2.c.x.references(t1.c.x) + exc.NoReferencedTableError, lambda: t2.c.x.references(t1.c.x) ) def test_default_schema_metadata_fk_alt_remote(self): m = MetaData(schema="foo") - t1 = Table('t1', m, Column('x', Integer)) - t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x')), - schema="bar") + t1 = Table("t1", m, Column("x", Integer)) + t2 = Table( + "t2", m, Column("x", Integer, ForeignKey("t1.x")), schema="bar" + ) assert t2.c.x.references(t1.c.x) def test_default_schema_metadata_fk_alt_local_raises(self): m = MetaData(schema="foo") - t1 = Table('t1', m, Column('x', Integer), schema="bar") - t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x'))) + t1 = Table("t1", m, Column("x", Integer), schema="bar") + t2 = Table("t2", m, Column("x", Integer, ForeignKey("t1.x"))) assert_raises( - exc.NoReferencedTableError, - lambda: t2.c.x.references(t1.c.x) + exc.NoReferencedTableError, lambda: t2.c.x.references(t1.c.x) ) def test_default_schema_metadata_fk_alt_local(self): m = MetaData(schema="foo") - t1 = Table('t1', m, Column('x', Integer), schema="bar") - t2 = Table('t2', m, Column('x', Integer, ForeignKey('bar.t1.x'))) + t1 = Table("t1", m, Column("x", Integer), schema="bar") + t2 = Table("t2", m, Column("x", Integer, ForeignKey("bar.t1.x"))) assert t2.c.x.references(t1.c.x) def test_create_drop_schema(self): self.assert_compile( - schema.CreateSchema("sa_schema"), - "CREATE SCHEMA sa_schema" + schema.CreateSchema("sa_schema"), "CREATE SCHEMA sa_schema" ) self.assert_compile( - schema.DropSchema("sa_schema"), - "DROP SCHEMA sa_schema" + schema.DropSchema("sa_schema"), "DROP SCHEMA sa_schema" ) self.assert_compile( schema.DropSchema("sa_schema", cascade=True), - "DROP SCHEMA sa_schema CASCADE" + "DROP SCHEMA sa_schema CASCADE", ) def test_iteration(self): metadata = MetaData() table1 = Table( - 'table1', + "table1", metadata, - Column( - 'col1', - Integer, - primary_key=True), - schema='someschema') + Column("col1", Integer, primary_key=True), + schema="someschema", + ) table2 = Table( - 'table2', + "table2", metadata, - Column( - 'col1', - Integer, - primary_key=True), - Column( - 'col2', - Integer, - ForeignKey('someschema.table1.col1')), - schema='someschema') + Column("col1", Integer, primary_key=True), + Column("col2", Integer, ForeignKey("someschema.table1.col1")), + schema="someschema", + ) t1 = str(schema.CreateTable(table1).compile(bind=testing.db)) t2 = str(schema.CreateTable(table2).compile(bind=testing.db)) @@ -2098,16 +2218,18 @@ class SchemaTest(fixtures.TestBase, AssertsCompiledSQL): class UseExistingTest(fixtures.TablesTest): - @classmethod def define_tables(cls, metadata): - Table('users', metadata, - Column('id', Integer, primary_key=True), - Column('name', String(30))) + Table( + "users", + metadata, + Column("id", Integer, primary_key=True), + Column("name", String(30)), + ) def _useexisting_fixture(self): meta2 = MetaData(testing.db) - Table('users', meta2, autoload=True) + Table("users", meta2, autoload=True) return meta2 def _notexisting_fixture(self): @@ -2117,21 +2239,23 @@ class UseExistingTest(fixtures.TablesTest): meta2 = self._useexisting_fixture() def go(): - Table('users', meta2, Column('name', - Unicode), autoload=True) + Table("users", meta2, Column("name", Unicode), autoload=True) + assert_raises_message( exc.InvalidRequestError, - "Table 'users' is already defined for this " - "MetaData instance.", - go + "Table 'users' is already defined for this " "MetaData instance.", + go, ) def test_keep_plus_existing_raises(self): meta2 = self._useexisting_fixture() assert_raises( exc.ArgumentError, - Table, 'users', meta2, keep_existing=True, - extend_existing=True + Table, + "users", + meta2, + keep_existing=True, + extend_existing=True, ) @testing.uses_deprecated() @@ -2139,118 +2263,152 @@ class UseExistingTest(fixtures.TablesTest): meta2 = self._useexisting_fixture() assert_raises( exc.ArgumentError, - Table, 'users', meta2, useexisting=True, - extend_existing=True + Table, + "users", + meta2, + useexisting=True, + extend_existing=True, ) def test_keep_existing_no_dupe_constraints(self): meta2 = self._notexisting_fixture() - users = Table('users', meta2, - Column('id', Integer), - Column('name', Unicode), - UniqueConstraint('name'), - keep_existing=True - ) - assert 'name' in users.c - assert 'id' in users.c + users = Table( + "users", + meta2, + Column("id", Integer), + Column("name", Unicode), + UniqueConstraint("name"), + keep_existing=True, + ) + assert "name" in users.c + assert "id" in users.c eq_(len(users.constraints), 2) - u2 = Table('users', meta2, - Column('id', Integer), - Column('name', Unicode), - UniqueConstraint('name'), - keep_existing=True - ) + u2 = Table( + "users", + meta2, + Column("id", Integer), + Column("name", Unicode), + UniqueConstraint("name"), + keep_existing=True, + ) eq_(len(u2.constraints), 2) def test_extend_existing_dupes_constraints(self): meta2 = self._notexisting_fixture() - users = Table('users', meta2, - Column('id', Integer), - Column('name', Unicode), - UniqueConstraint('name'), - extend_existing=True - ) - assert 'name' in users.c - assert 'id' in users.c + users = Table( + "users", + meta2, + Column("id", Integer), + Column("name", Unicode), + UniqueConstraint("name"), + extend_existing=True, + ) + assert "name" in users.c + assert "id" in users.c eq_(len(users.constraints), 2) - u2 = Table('users', meta2, - Column('id', Integer), - Column('name', Unicode), - UniqueConstraint('name'), - extend_existing=True - ) + u2 = Table( + "users", + meta2, + Column("id", Integer), + Column("name", Unicode), + UniqueConstraint("name"), + extend_existing=True, + ) # constraint got duped eq_(len(u2.constraints), 3) def test_keep_existing_coltype(self): meta2 = self._useexisting_fixture() - users = Table('users', meta2, Column('name', Unicode), - autoload=True, keep_existing=True) + users = Table( + "users", + meta2, + Column("name", Unicode), + autoload=True, + keep_existing=True, + ) assert not isinstance(users.c.name.type, Unicode) def test_keep_existing_quote(self): meta2 = self._useexisting_fixture() - users = Table('users', meta2, quote=True, autoload=True, - keep_existing=True) + users = Table( + "users", meta2, quote=True, autoload=True, keep_existing=True + ) assert not users.name.quote def test_keep_existing_add_column(self): meta2 = self._useexisting_fixture() - users = Table('users', meta2, - Column('foo', Integer), - autoload=True, - keep_existing=True) + users = Table( + "users", + meta2, + Column("foo", Integer), + autoload=True, + keep_existing=True, + ) assert "foo" not in users.c def test_keep_existing_coltype_no_orig(self): meta2 = self._notexisting_fixture() - users = Table('users', meta2, Column('name', Unicode), - autoload=True, keep_existing=True) + users = Table( + "users", + meta2, + Column("name", Unicode), + autoload=True, + keep_existing=True, + ) assert isinstance(users.c.name.type, Unicode) @testing.skip_if( lambda: testing.db.dialect.requires_name_normalize, - "test depends on lowercase as case insensitive") + "test depends on lowercase as case insensitive", + ) def test_keep_existing_quote_no_orig(self): meta2 = self._notexisting_fixture() - users = Table('users', meta2, quote=True, - autoload=True, - keep_existing=True) + users = Table( + "users", meta2, quote=True, autoload=True, keep_existing=True + ) assert users.name.quote def test_keep_existing_add_column_no_orig(self): meta2 = self._notexisting_fixture() - users = Table('users', meta2, - Column('foo', Integer), - autoload=True, - keep_existing=True) + users = Table( + "users", + meta2, + Column("foo", Integer), + autoload=True, + keep_existing=True, + ) assert "foo" in users.c def test_keep_existing_coltype_no_reflection(self): meta2 = self._useexisting_fixture() - users = Table('users', meta2, Column('name', Unicode), - keep_existing=True) + users = Table( + "users", meta2, Column("name", Unicode), keep_existing=True + ) assert not isinstance(users.c.name.type, Unicode) def test_keep_existing_quote_no_reflection(self): meta2 = self._useexisting_fixture() - users = Table('users', meta2, quote=True, - keep_existing=True) + users = Table("users", meta2, quote=True, keep_existing=True) assert not users.name.quote def test_keep_existing_add_column_no_reflection(self): meta2 = self._useexisting_fixture() - users = Table('users', meta2, - Column('foo', Integer), - keep_existing=True) + users = Table( + "users", meta2, Column("foo", Integer), keep_existing=True + ) assert "foo" not in users.c def test_extend_existing_coltype(self): meta2 = self._useexisting_fixture() - users = Table('users', meta2, Column('name', Unicode), - autoload=True, extend_existing=True) + users = Table( + "users", + meta2, + Column("name", Unicode), + autoload=True, + extend_existing=True, + ) assert isinstance(users.c.name.type, Unicode) def test_extend_existing_quote(self): @@ -2258,46 +2416,63 @@ class UseExistingTest(fixtures.TablesTest): assert_raises_message( tsa.exc.ArgumentError, "Can't redefine 'quote' or 'quote_schema' arguments", - Table, 'users', meta2, quote=True, autoload=True, - extend_existing=True + Table, + "users", + meta2, + quote=True, + autoload=True, + extend_existing=True, ) def test_extend_existing_add_column(self): meta2 = self._useexisting_fixture() - users = Table('users', meta2, - Column('foo', Integer), - autoload=True, - extend_existing=True) + users = Table( + "users", + meta2, + Column("foo", Integer), + autoload=True, + extend_existing=True, + ) assert "foo" in users.c def test_extend_existing_coltype_no_orig(self): meta2 = self._notexisting_fixture() - users = Table('users', meta2, Column('name', Unicode), - autoload=True, extend_existing=True) + users = Table( + "users", + meta2, + Column("name", Unicode), + autoload=True, + extend_existing=True, + ) assert isinstance(users.c.name.type, Unicode) @testing.skip_if( lambda: testing.db.dialect.requires_name_normalize, - "test depends on lowercase as case insensitive") + "test depends on lowercase as case insensitive", + ) def test_extend_existing_quote_no_orig(self): meta2 = self._notexisting_fixture() - users = Table('users', meta2, quote=True, - autoload=True, - extend_existing=True) + users = Table( + "users", meta2, quote=True, autoload=True, extend_existing=True + ) assert users.name.quote def test_extend_existing_add_column_no_orig(self): meta2 = self._notexisting_fixture() - users = Table('users', meta2, - Column('foo', Integer), - autoload=True, - extend_existing=True) + users = Table( + "users", + meta2, + Column("foo", Integer), + autoload=True, + extend_existing=True, + ) assert "foo" in users.c def test_extend_existing_coltype_no_reflection(self): meta2 = self._useexisting_fixture() - users = Table('users', meta2, Column('name', Unicode), - extend_existing=True) + users = Table( + "users", meta2, Column("name", Unicode), extend_existing=True + ) assert isinstance(users.c.name.type, Unicode) def test_extend_existing_quote_no_reflection(self): @@ -2305,35 +2480,30 @@ class UseExistingTest(fixtures.TablesTest): assert_raises_message( tsa.exc.ArgumentError, "Can't redefine 'quote' or 'quote_schema' arguments", - Table, 'users', meta2, quote=True, - extend_existing=True + Table, + "users", + meta2, + quote=True, + extend_existing=True, ) def test_extend_existing_add_column_no_reflection(self): meta2 = self._useexisting_fixture() - users = Table('users', meta2, - Column('foo', Integer), - extend_existing=True) + users = Table( + "users", meta2, Column("foo", Integer), extend_existing=True + ) assert "foo" in users.c class ConstraintTest(fixtures.TestBase): - def _single_fixture(self): m = MetaData() - t1 = Table('t1', m, - Column('a', Integer), - Column('b', Integer) - ) + t1 = Table("t1", m, Column("a", Integer), Column("b", Integer)) - t2 = Table('t2', m, - Column('a', Integer, ForeignKey('t1.a')) - ) + t2 = Table("t2", m, Column("a", Integer, ForeignKey("t1.a"))) - t3 = Table('t3', m, - Column('a', Integer) - ) + t3 = Table("t3", m, Column("a", Integer)) return t1, t2, t3 def _assert_index_col_x(self, t, i, columns=True): @@ -2346,106 +2516,108 @@ class ConstraintTest(fixtures.TestBase): def test_separate_decl_columns(self): m = MetaData() - t = Table('t', m, Column('x', Integer)) - i = Index('i', t.c.x) + t = Table("t", m, Column("x", Integer)) + i = Index("i", t.c.x) self._assert_index_col_x(t, i) def test_separate_decl_columns_functional(self): m = MetaData() - t = Table('t', m, Column('x', Integer)) - i = Index('i', func.foo(t.c.x)) + t = Table("t", m, Column("x", Integer)) + i = Index("i", func.foo(t.c.x)) self._assert_index_col_x(t, i) def test_index_no_cols_private_table_arg(self): m = MetaData() - t = Table('t', m, Column('x', Integer)) - i = Index('i', _table=t) + t = Table("t", m, Column("x", Integer)) + i = Index("i", _table=t) is_(i.table, t) eq_(list(i.columns), []) def test_index_w_cols_private_table_arg(self): m = MetaData() - t = Table('t', m, Column('x', Integer)) - i = Index('i', t.c.x, _table=t) + t = Table("t", m, Column("x", Integer)) + i = Index("i", t.c.x, _table=t) is_(i.table, t) eq_(i.columns, [t.c.x]) def test_inline_decl_columns(self): m = MetaData() - c = Column('x', Integer) - i = Index('i', c) - t = Table('t', m, c, i) + c = Column("x", Integer) + i = Index("i", c) + t = Table("t", m, c, i) self._assert_index_col_x(t, i) def test_inline_decl_columns_functional(self): m = MetaData() - c = Column('x', Integer) - i = Index('i', func.foo(c)) - t = Table('t', m, c, i) + c = Column("x", Integer) + i = Index("i", func.foo(c)) + t = Table("t", m, c, i) self._assert_index_col_x(t, i) def test_inline_decl_string(self): m = MetaData() - i = Index('i', "x") - t = Table('t', m, Column('x', Integer), i) + i = Index("i", "x") + t = Table("t", m, Column("x", Integer), i) self._assert_index_col_x(t, i) def test_inline_decl_textonly(self): m = MetaData() - i = Index('i', text("foobar(x)")) - t = Table('t', m, Column('x', Integer), i) + i = Index("i", text("foobar(x)")) + t = Table("t", m, Column("x", Integer), i) self._assert_index_col_x(t, i, columns=False) def test_separate_decl_textonly(self): m = MetaData() - i = Index('i', text("foobar(x)")) - t = Table('t', m, Column('x', Integer)) + i = Index("i", text("foobar(x)")) + t = Table("t", m, Column("x", Integer)) t.append_constraint(i) self._assert_index_col_x(t, i, columns=False) def test_unnamed_column_exception(self): # this can occur in some declarative situations c = Column(Integer) - idx = Index('q', c) + idx = Index("q", c) m = MetaData() - t = Table('t', m, Column('q')) + t = Table("t", m, Column("q")) assert_raises_message( exc.ArgumentError, "Can't add unnamed column to column collection", - t.append_constraint, idx + t.append_constraint, + idx, ) def test_column_associated_w_lowercase_table(self): from sqlalchemy import table - c = Column('x', Integer) - table('foo', c) - idx = Index('q', c) + + c = Column("x", Integer) + table("foo", c) + idx = Index("q", c) is_(idx.table, None) # lower-case-T table doesn't have indexes def test_clauseelement_extraction_one(self): - t = Table('t', MetaData(), Column('x', Integer), Column('y', Integer)) + t = Table("t", MetaData(), Column("x", Integer), Column("y", Integer)) class MyThing(object): def __clause_element__(self): return t.c.x + 5 - idx = Index('foo', MyThing()) + idx = Index("foo", MyThing()) self._assert_index_col_x(t, idx) def test_clauseelement_extraction_two(self): - t = Table('t', MetaData(), Column('x', Integer), Column('y', Integer)) + t = Table("t", MetaData(), Column("x", Integer), Column("y", Integer)) class MyThing(object): def __clause_element__(self): return t.c.x + 5 - idx = Index('bar', MyThing(), t.c.y) + idx = Index("bar", MyThing(), t.c.y) eq_(set(t.indexes), set([idx])) def test_clauseelement_extraction_three(self): - t = Table('t', MetaData(), Column('x', Integer), Column('y', Integer)) + t = Table("t", MetaData(), Column("x", Integer), Column("y", Integer)) expr1 = t.c.x + 5 @@ -2453,7 +2625,7 @@ class ConstraintTest(fixtures.TestBase): def __clause_element__(self): return expr1 - idx = Index('bar', MyThing(), t.c.y) + idx = Index("bar", MyThing(), t.c.y) is_(idx.expressions[0], expr1) is_(idx.expressions[1], t.c.y) @@ -2493,59 +2665,52 @@ class ConstraintTest(fixtures.TestBase): is_(fkc.referred_table, t1) def test_referred_table_accessor_not_available(self): - t1 = Table('t', MetaData(), Column('x', ForeignKey('q.id'))) + t1 = Table("t", MetaData(), Column("x", ForeignKey("q.id"))) fkc = list(t1.foreign_key_constraints)[0] assert_raises_message( exc.InvalidRequestError, "Foreign key associated with column 't.x' could not find " "table 'q' with which to generate a foreign key to target " "column 'id'", - getattr, fkc, "referred_table" + getattr, + fkc, + "referred_table", ) def test_related_column_not_present_atfirst_ok(self): m = MetaData() - base_table = Table("base", m, - Column("id", Integer, primary_key=True) - ) - fk = ForeignKey('base.q') - derived_table = Table("derived", m, - Column("id", None, fk, - primary_key=True), - ) - - base_table.append_column(Column('q', Integer)) + base_table = Table("base", m, Column("id", Integer, primary_key=True)) + fk = ForeignKey("base.q") + derived_table = Table( + "derived", m, Column("id", None, fk, primary_key=True) + ) + + base_table.append_column(Column("q", Integer)) assert fk.column is base_table.c.q assert isinstance(derived_table.c.id.type, Integer) def test_related_column_not_present_atfirst_ok_onname(self): m = MetaData() - base_table = Table("base", m, - Column("id", Integer, primary_key=True) - ) - fk = ForeignKey('base.q', link_to_name=True) - derived_table = Table("derived", m, - Column("id", None, fk, - primary_key=True), - ) - - base_table.append_column(Column('q', Integer, key='zz')) + base_table = Table("base", m, Column("id", Integer, primary_key=True)) + fk = ForeignKey("base.q", link_to_name=True) + derived_table = Table( + "derived", m, Column("id", None, fk, primary_key=True) + ) + + base_table.append_column(Column("q", Integer, key="zz")) assert fk.column is base_table.c.zz assert isinstance(derived_table.c.id.type, Integer) def test_related_column_not_present_atfirst_ok_linktoname_conflict(self): m = MetaData() - base_table = Table("base", m, - Column("id", Integer, primary_key=True) - ) - fk = ForeignKey('base.q', link_to_name=True) - derived_table = Table("derived", m, - Column("id", None, fk, - primary_key=True), - ) - - base_table.append_column(Column('zz', Integer, key='q')) - base_table.append_column(Column('q', Integer, key='zz')) + base_table = Table("base", m, Column("id", Integer, primary_key=True)) + fk = ForeignKey("base.q", link_to_name=True) + derived_table = Table( + "derived", m, Column("id", None, fk, primary_key=True) + ) + + base_table.append_column(Column("zz", Integer, key="q")) + base_table.append_column(Column("q", Integer, key="zz")) assert fk.column is base_table.c.zz assert isinstance(derived_table.c.id.type, Integer) @@ -2557,134 +2722,136 @@ class ConstraintTest(fixtures.TestBase): r"ForeignKeyConstraint on t1\(x, y\) refers to " "multiple remote tables: t2 and t3", Table, - 't1', m, Column('x', Integer), Column('y', Integer), - ForeignKeyConstraint(['x', 'y'], ['t2.x', 't3.y']) + "t1", + m, + Column("x", Integer), + Column("y", Integer), + ForeignKeyConstraint(["x", "y"], ["t2.x", "t3.y"]), ) def test_invalid_composite_fk_check_columns(self): m = MetaData() - t2 = Table('t2', m, Column('x', Integer)) - t3 = Table('t3', m, Column('y', Integer)) + t2 = Table("t2", m, Column("x", Integer)) + t3 = Table("t3", m, Column("y", Integer)) assert_raises_message( exc.ArgumentError, r"ForeignKeyConstraint on t1\(x, y\) refers to " "multiple remote tables: t2 and t3", Table, - 't1', m, Column('x', Integer), Column('y', Integer), - ForeignKeyConstraint(['x', 'y'], [t2.c.x, t3.c.y]) + "t1", + m, + Column("x", Integer), + Column("y", Integer), + ForeignKeyConstraint(["x", "y"], [t2.c.x, t3.c.y]), ) def test_invalid_composite_fk_check_columns_notattached(self): m = MetaData() - x = Column('x', Integer) - y = Column('y', Integer) + x = Column("x", Integer) + y = Column("y", Integer) # no error is raised for this one right now. # which is a minor bug. - Table('t1', m, Column('x', Integer), Column('y', Integer), - ForeignKeyConstraint(['x', 'y'], [x, y]) - ) + Table( + "t1", + m, + Column("x", Integer), + Column("y", Integer), + ForeignKeyConstraint(["x", "y"], [x, y]), + ) - Table('t2', m, x) - Table('t3', m, y) + Table("t2", m, x) + Table("t3", m, y) def test_constraint_copied_to_proxy_ok(self): m = MetaData() - Table('t1', m, Column('id', Integer, primary_key=True)) - t2 = Table('t2', m, Column('id', Integer, ForeignKey('t1.id'), - primary_key=True)) + Table("t1", m, Column("id", Integer, primary_key=True)) + t2 = Table( + "t2", + m, + Column("id", Integer, ForeignKey("t1.id"), primary_key=True), + ) s = tsa.select([t2]) t2fk = list(t2.c.id.foreign_keys)[0] sfk = list(s.c.id.foreign_keys)[0] # the two FKs share the ForeignKeyConstraint - is_( - t2fk.constraint, - sfk.constraint - ) + is_(t2fk.constraint, sfk.constraint) # but the ForeignKeyConstraint isn't # aware of the select's FK - eq_( - t2fk.constraint.elements, - [t2fk] - ) + eq_(t2fk.constraint.elements, [t2fk]) def test_type_propagate_composite_fk_string(self): metadata = MetaData() Table( - 'a', metadata, - Column('key1', Integer, primary_key=True), - Column('key2', String(40), primary_key=True)) - - b = Table('b', metadata, - Column('a_key1', None), - Column('a_key2', None), - Column('id', Integer, primary_key=True), - ForeignKeyConstraint(['a_key1', 'a_key2'], - ['a.key1', 'a.key2']) - ) + "a", + metadata, + Column("key1", Integer, primary_key=True), + Column("key2", String(40), primary_key=True), + ) + + b = Table( + "b", + metadata, + Column("a_key1", None), + Column("a_key2", None), + Column("id", Integer, primary_key=True), + ForeignKeyConstraint(["a_key1", "a_key2"], ["a.key1", "a.key2"]), + ) assert isinstance(b.c.a_key1.type, Integer) assert isinstance(b.c.a_key2.type, String) def test_type_propagate_composite_fk_col(self): metadata = MetaData() - a = Table('a', metadata, - Column('key1', Integer, primary_key=True), - Column('key2', String(40), primary_key=True)) - - b = Table('b', metadata, - Column('a_key1', None), - Column('a_key2', None), - Column('id', Integer, primary_key=True), - ForeignKeyConstraint(['a_key1', 'a_key2'], - [a.c.key1, a.c.key2]) - ) + a = Table( + "a", + metadata, + Column("key1", Integer, primary_key=True), + Column("key2", String(40), primary_key=True), + ) + + b = Table( + "b", + metadata, + Column("a_key1", None), + Column("a_key2", None), + Column("id", Integer, primary_key=True), + ForeignKeyConstraint(["a_key1", "a_key2"], [a.c.key1, a.c.key2]), + ) assert isinstance(b.c.a_key1.type, Integer) assert isinstance(b.c.a_key2.type, String) def test_type_propagate_standalone_fk_string(self): metadata = MetaData() - Table( - 'a', metadata, - Column('key1', Integer, primary_key=True)) + Table("a", metadata, Column("key1", Integer, primary_key=True)) - b = Table('b', metadata, - Column('a_key1', None, ForeignKey("a.key1")), - ) + b = Table("b", metadata, Column("a_key1", None, ForeignKey("a.key1"))) assert isinstance(b.c.a_key1.type, Integer) def test_type_propagate_standalone_fk_col(self): metadata = MetaData() - a = Table('a', metadata, - Column('key1', Integer, primary_key=True)) + a = Table("a", metadata, Column("key1", Integer, primary_key=True)) - b = Table('b', metadata, - Column('a_key1', None, ForeignKey(a.c.key1)), - ) + b = Table("b", metadata, Column("a_key1", None, ForeignKey(a.c.key1))) assert isinstance(b.c.a_key1.type, Integer) def test_type_propagate_chained_string_source_first(self): metadata = MetaData() - Table( - 'a', metadata, - Column('key1', Integer, primary_key=True) - ) + Table("a", metadata, Column("key1", Integer, primary_key=True)) - b = Table('b', metadata, - Column('a_key1', None, ForeignKey("a.key1")), - ) + b = Table("b", metadata, Column("a_key1", None, ForeignKey("a.key1"))) - c = Table('c', metadata, - Column('b_key1', None, ForeignKey("b.a_key1")), - ) + c = Table( + "c", metadata, Column("b_key1", None, ForeignKey("b.a_key1")) + ) assert isinstance(b.c.a_key1.type, Integer) assert isinstance(c.c.b_key1.type, Integer) @@ -2692,17 +2859,13 @@ class ConstraintTest(fixtures.TestBase): def test_type_propagate_chained_string_source_last(self): metadata = MetaData() - b = Table('b', metadata, - Column('a_key1', None, ForeignKey("a.key1")), - ) + b = Table("b", metadata, Column("a_key1", None, ForeignKey("a.key1"))) - c = Table('c', metadata, - Column('b_key1', None, ForeignKey("b.a_key1")), - ) + c = Table( + "c", metadata, Column("b_key1", None, ForeignKey("b.a_key1")) + ) - Table( - 'a', metadata, - Column('key1', Integer, primary_key=True)) + Table("a", metadata, Column("key1", Integer, primary_key=True)) assert isinstance(b.c.a_key1.type, Integer) assert isinstance(c.c.b_key1.type, Integer) @@ -2710,21 +2873,31 @@ class ConstraintTest(fixtures.TestBase): def test_type_propagate_chained_string_source_last_onname(self): metadata = MetaData() - b = Table('b', metadata, - Column( - 'a_key1', None, - ForeignKey("a.key1", link_to_name=True), key="ak1"), - ) + b = Table( + "b", + metadata, + Column( + "a_key1", + None, + ForeignKey("a.key1", link_to_name=True), + key="ak1", + ), + ) - c = Table('c', metadata, - Column( - 'b_key1', None, - ForeignKey("b.a_key1", link_to_name=True), key="bk1"), - ) + c = Table( + "c", + metadata, + Column( + "b_key1", + None, + ForeignKey("b.a_key1", link_to_name=True), + key="bk1", + ), + ) Table( - 'a', metadata, - Column('key1', Integer, primary_key=True, key='ak1')) + "a", metadata, Column("key1", Integer, primary_key=True, key="ak1") + ) assert isinstance(b.c.ak1.type, Integer) assert isinstance(c.c.bk1.type, Integer) @@ -2732,34 +2905,41 @@ class ConstraintTest(fixtures.TestBase): def test_type_propagate_chained_string_source_last_onname_conflict(self): metadata = MetaData() - b = Table('b', metadata, - # b.c.key1 -> a.c.key1 -> String - Column( - 'ak1', None, - ForeignKey("a.key1", link_to_name=False), key="key1"), - # b.c.ak1 -> a.c.ak1 -> Integer - Column( - 'a_key1', None, - ForeignKey("a.key1", link_to_name=True), key="ak1"), - ) - - c = Table('c', metadata, - # c.c.b_key1 -> b.c.ak1 -> Integer - Column( - 'b_key1', None, - ForeignKey("b.ak1", link_to_name=False)), - # c.c.b_ak1 -> b.c.ak1 - Column( - 'b_ak1', None, - ForeignKey("b.ak1", link_to_name=True)), - ) + b = Table( + "b", + metadata, + # b.c.key1 -> a.c.key1 -> String + Column( + "ak1", + None, + ForeignKey("a.key1", link_to_name=False), + key="key1", + ), + # b.c.ak1 -> a.c.ak1 -> Integer + Column( + "a_key1", + None, + ForeignKey("a.key1", link_to_name=True), + key="ak1", + ), + ) + + c = Table( + "c", + metadata, + # c.c.b_key1 -> b.c.ak1 -> Integer + Column("b_key1", None, ForeignKey("b.ak1", link_to_name=False)), + # c.c.b_ak1 -> b.c.ak1 + Column("b_ak1", None, ForeignKey("b.ak1", link_to_name=True)), + ) Table( - 'a', metadata, + "a", + metadata, # a.c.key1 - Column('ak1', String, key="key1"), + Column("ak1", String, key="key1"), # a.c.ak1 - Column('key1', Integer, primary_key=True, key='ak1'), + Column("key1", Integer, primary_key=True, key="ak1"), ) assert isinstance(b.c.key1.type, String) @@ -2770,30 +2950,26 @@ class ConstraintTest(fixtures.TestBase): def test_type_propagate_chained_col_orig_first(self): metadata = MetaData() - a = Table('a', metadata, - Column('key1', Integer, primary_key=True)) + a = Table("a", metadata, Column("key1", Integer, primary_key=True)) - b = Table('b', metadata, - Column('a_key1', None, ForeignKey(a.c.key1)), - ) + b = Table("b", metadata, Column("a_key1", None, ForeignKey(a.c.key1))) - c = Table('c', metadata, - Column('b_key1', None, ForeignKey(b.c.a_key1)), - ) + c = Table( + "c", metadata, Column("b_key1", None, ForeignKey(b.c.a_key1)) + ) assert isinstance(b.c.a_key1.type, Integer) assert isinstance(c.c.b_key1.type, Integer) def test_column_accessor_col(self): - c1 = Column('x', Integer) + c1 = Column("x", Integer) fk = ForeignKey(c1) is_(fk.column, c1) def test_column_accessor_clause_element(self): - c1 = Column('x', Integer) + c1 = Column("x", Integer) class CThing(object): - def __init__(self, c): self.c = c @@ -2809,50 +2985,58 @@ class ConstraintTest(fixtures.TestBase): exc.InvalidRequestError, "this ForeignKey object does not yet have a parent " "Column associated with it.", - getattr, fk, "column" + getattr, + fk, + "column", ) def test_column_accessor_string_no_parent_table(self): fk = ForeignKey("sometable.somecol") - Column('x', fk) + Column("x", fk) assert_raises_message( exc.InvalidRequestError, "this ForeignKey's parent column is not yet " "associated with a Table.", - getattr, fk, "column" + getattr, + fk, + "column", ) def test_column_accessor_string_no_target_table(self): fk = ForeignKey("sometable.somecol") - c1 = Column('x', fk) - Table('t', MetaData(), c1) + c1 = Column("x", fk) + Table("t", MetaData(), c1) assert_raises_message( exc.NoReferencedTableError, "Foreign key associated with column 't.x' could not find " "table 'sometable' with which to generate a " "foreign key to target column 'somecol'", - getattr, fk, "column" + getattr, + fk, + "column", ) def test_column_accessor_string_no_target_column(self): fk = ForeignKey("sometable.somecol") - c1 = Column('x', fk) + c1 = Column("x", fk) m = MetaData() - Table('t', m, c1) - Table("sometable", m, Column('notsomecol', Integer)) + Table("t", m, c1) + Table("sometable", m, Column("notsomecol", Integer)) assert_raises_message( exc.NoReferencedColumnError, "Could not initialize target column for ForeignKey " "'sometable.somecol' on table 't': " "table 'sometable' has no column named 'somecol'", - getattr, fk, "column" + getattr, + fk, + "column", ) def test_remove_table_fk_bookkeeping(self): metadata = MetaData() - fk = ForeignKey('t1.x') - t2 = Table('t2', metadata, Column('y', Integer, fk)) - t3 = Table('t3', metadata, Column('y', Integer, ForeignKey('t1.x'))) + fk = ForeignKey("t1.x") + t2 = Table("t2", metadata, Column("y", Integer, fk)) + t3 = Table("t3", metadata, Column("y", Integer, ForeignKey("t1.x"))) assert t2.key in metadata.tables assert ("t1", "x") in metadata._fk_memos @@ -2869,13 +3053,15 @@ class ConstraintTest(fixtures.TestBase): assert fk not in metadata._fk_memos[("t1", "x")] # make the referenced table - t1 = Table('t1', metadata, Column('x', Integer)) + t1 = Table("t1", metadata, Column("x", Integer)) # t2 tells us exactly what's wrong assert_raises_message( exc.InvalidRequestError, "Table t2 is no longer associated with its parent MetaData", - getattr, fk, "column" + getattr, + fk, + "column", ) # t3 is unaffected @@ -2885,63 +3071,51 @@ class ConstraintTest(fixtures.TestBase): metadata.remove(t2) def test_double_fk_usage_raises(self): - f = ForeignKey('b.id') + f = ForeignKey("b.id") - Column('x', Integer, f) + Column("x", Integer, f) assert_raises(exc.InvalidRequestError, Column, "y", Integer, f) def test_auto_append_constraint(self): m = MetaData() - t = Table('tbl', m, - Column('a', Integer), - Column('b', Integer) - ) + t = Table("tbl", m, Column("a", Integer), Column("b", Integer)) - t2 = Table('t2', m, - Column('a', Integer), - Column('b', Integer) - ) + t2 = Table("t2", m, Column("a", Integer), Column("b", Integer)) for c in ( UniqueConstraint(t.c.a), CheckConstraint(t.c.a > 5), ForeignKeyConstraint([t.c.a], [t2.c.a]), - PrimaryKeyConstraint(t.c.a) + PrimaryKeyConstraint(t.c.a), ): assert c in t.constraints t.append_constraint(c) assert c in t.constraints - c = Index('foo', t.c.a) + c = Index("foo", t.c.a) assert c in t.indexes def test_auto_append_lowercase_table(self): from sqlalchemy import table, column - t = table('t', column('a')) - t2 = table('t2', column('a')) + t = table("t", column("a")) + t2 = table("t2", column("a")) for c in ( UniqueConstraint(t.c.a), CheckConstraint(t.c.a > 5), ForeignKeyConstraint([t.c.a], [t2.c.a]), PrimaryKeyConstraint(t.c.a), - Index('foo', t.c.a) + Index("foo", t.c.a), ): assert True def test_tometadata_ok(self): m = MetaData() - t = Table('tbl', m, - Column('a', Integer), - Column('b', Integer) - ) + t = Table("tbl", m, Column("a", Integer), Column("b", Integer)) - t2 = Table('t2', m, - Column('a', Integer), - Column('b', Integer) - ) + t2 = Table("t2", m, Column("a", Integer), Column("b", Integer)) UniqueConstraint(t.c.a) CheckConstraint(t.c.a > 5) @@ -2959,10 +3133,7 @@ class ConstraintTest(fixtures.TestBase): def test_check_constraint_copy(self): m = MetaData() - t = Table('tbl', m, - Column('a', Integer), - Column('b', Integer) - ) + t = Table("tbl", m, Column("a", Integer), Column("b", Integer)) ck = CheckConstraint(t.c.a > 5) ck2 = ck.copy() assert ck in t.constraints @@ -2971,15 +3142,9 @@ class ConstraintTest(fixtures.TestBase): def test_ambig_check_constraint_auto_append(self): m = MetaData() - t = Table('tbl', m, - Column('a', Integer), - Column('b', Integer) - ) + t = Table("tbl", m, Column("a", Integer), Column("b", Integer)) - t2 = Table('t2', m, - Column('a', Integer), - Column('b', Integer) - ) + t2 = Table("t2", m, Column("a", Integer), Column("b", Integer)) c = CheckConstraint(t.c.a > t2.c.b) assert c not in t.constraints assert c not in t2.constraints @@ -2987,22 +3152,22 @@ class ConstraintTest(fixtures.TestBase): def test_auto_append_ck_on_col_attach_one(self): m = MetaData() - a = Column('a', Integer) - b = Column('b', Integer) + a = Column("a", Integer) + b = Column("b", Integer) ck = CheckConstraint(a > b) - t = Table('tbl', m, a, b) + t = Table("tbl", m, a, b) assert ck in t.constraints def test_auto_append_ck_on_col_attach_two(self): m = MetaData() - a = Column('a', Integer) - b = Column('b', Integer) - c = Column('c', Integer) + a = Column("a", Integer) + b = Column("b", Integer) + c = Column("c", Integer) ck = CheckConstraint(a > b + c) - t = Table('tbl', m, a) + t = Table("tbl", m, a) assert ck not in t.constraints t.append_column(b) @@ -3014,18 +3179,18 @@ class ConstraintTest(fixtures.TestBase): def test_auto_append_ck_on_col_attach_three(self): m = MetaData() - a = Column('a', Integer) - b = Column('b', Integer) - c = Column('c', Integer) + a = Column("a", Integer) + b = Column("b", Integer) + c = Column("c", Integer) ck = CheckConstraint(a > b + c) - t = Table('tbl', m, a) + t = Table("tbl", m, a) assert ck not in t.constraints t.append_column(b) assert ck not in t.constraints - t2 = Table('t2', m) + t2 = Table("t2", m) t2.append_column(c) # two different tables, so CheckConstraint does nothing. @@ -3034,22 +3199,22 @@ class ConstraintTest(fixtures.TestBase): def test_auto_append_uq_on_col_attach_one(self): m = MetaData() - a = Column('a', Integer) - b = Column('b', Integer) + a = Column("a", Integer) + b = Column("b", Integer) uq = UniqueConstraint(a, b) - t = Table('tbl', m, a, b) + t = Table("tbl", m, a, b) assert uq in t.constraints def test_auto_append_uq_on_col_attach_two(self): m = MetaData() - a = Column('a', Integer) - b = Column('b', Integer) - c = Column('c', Integer) + a = Column("a", Integer) + b = Column("b", Integer) + c = Column("c", Integer) uq = UniqueConstraint(a, b, c) - t = Table('tbl', m, a) + t = Table("tbl", m, a) assert uq not in t.constraints t.append_column(b) @@ -3061,24 +3226,25 @@ class ConstraintTest(fixtures.TestBase): def test_auto_append_uq_on_col_attach_three(self): m = MetaData() - a = Column('a', Integer) - b = Column('b', Integer) - c = Column('c', Integer) + a = Column("a", Integer) + b = Column("b", Integer) + c = Column("c", Integer) uq = UniqueConstraint(a, b, c) - t = Table('tbl', m, a) + t = Table("tbl", m, a) assert uq not in t.constraints t.append_column(b) assert uq not in t.constraints - t2 = Table('t2', m) + t2 = Table("t2", m) # two different tables, so UniqueConstraint raises assert_raises_message( exc.ArgumentError, r"Column\(s\) 't2\.c' are not part of table 'tbl'\.", - t2.append_column, c + t2.append_column, + c, ) def test_auto_append_uq_on_col_attach_four(self): @@ -3088,12 +3254,12 @@ class ConstraintTest(fixtures.TestBase): """ m = MetaData() - a = Column('a', Integer) - b = Column('b', Integer) - c = Column('c', Integer) - uq = UniqueConstraint(a, 'b', 'c') + a = Column("a", Integer) + b = Column("b", Integer) + c = Column("c", Integer) + uq = UniqueConstraint(a, "b", "c") - t = Table('tbl', m, a) + t = Table("tbl", m, a) assert uq not in t.constraints t.append_column(b) @@ -3111,7 +3277,7 @@ class ConstraintTest(fixtures.TestBase): eq_( [cn for cn in t.constraints if isinstance(cn, UniqueConstraint)], - [uq] + [uq], ) def test_auto_append_uq_on_col_attach_five(self): @@ -3121,13 +3287,13 @@ class ConstraintTest(fixtures.TestBase): """ m = MetaData() - a = Column('a', Integer) - b = Column('b', Integer) - c = Column('c', Integer) + a = Column("a", Integer) + b = Column("b", Integer) + c = Column("c", Integer) - t = Table('tbl', m, a, c, b) + t = Table("tbl", m, a, c, b) - uq = UniqueConstraint(a, 'b', 'c') + uq = UniqueConstraint(a, "b", "c") assert uq in t.constraints @@ -3137,38 +3303,36 @@ class ConstraintTest(fixtures.TestBase): eq_( [cn for cn in t.constraints if isinstance(cn, UniqueConstraint)], - [uq] + [uq], ) def test_index_asserts_cols_standalone(self): metadata = MetaData() - t1 = Table('t1', metadata, - Column('x', Integer) - ) - t2 = Table('t2', metadata, - Column('y', Integer) - ) + t1 = Table("t1", metadata, Column("x", Integer)) + t2 = Table("t2", metadata, Column("y", Integer)) assert_raises_message( exc.ArgumentError, r"Column\(s\) 't2.y' are not part of table 't1'.", Index, - "bar", t1.c.x, t2.c.y + "bar", + t1.c.x, + t2.c.y, ) def test_index_asserts_cols_inline(self): metadata = MetaData() - t1 = Table('t1', metadata, - Column('x', Integer) - ) + t1 = Table("t1", metadata, Column("x", Integer)) assert_raises_message( exc.ArgumentError, "Index 'bar' is against table 't1', and " "cannot be associated with table 't2'.", - Table, 't2', metadata, - Column('y', Integer), - Index('bar', t1.c.x) + Table, + "t2", + metadata, + Column("y", Integer), + Index("bar", t1.c.x), ) def test_raise_index_nonexistent_name(self): @@ -3176,28 +3340,25 @@ class ConstraintTest(fixtures.TestBase): # the KeyError isn't ideal here, a nicer message # perhaps assert_raises( - KeyError, - Table, 't', m, Column('x', Integer), Index("foo", "q") + KeyError, Table, "t", m, Column("x", Integer), Index("foo", "q") ) def test_raise_not_a_column(self): - assert_raises( - exc.ArgumentError, - Index, "foo", 5 - ) + assert_raises(exc.ArgumentError, Index, "foo", 5) def test_raise_expr_no_column(self): - idx = Index('foo', func.lower(5)) + idx = Index("foo", func.lower(5)) assert_raises_message( exc.CompileError, "Index 'foo' is not associated with any table.", - schema.CreateIndex(idx).compile, dialect=testing.db.dialect + schema.CreateIndex(idx).compile, + dialect=testing.db.dialect, ) assert_raises_message( exc.CompileError, "Index 'foo' is not associated with any table.", - schema.CreateIndex(idx).compile + schema.CreateIndex(idx).compile, ) def test_no_warning_w_no_columns(self): @@ -3206,26 +3367,29 @@ class ConstraintTest(fixtures.TestBase): assert_raises_message( exc.CompileError, "Index 'foo' is not associated with any table.", - schema.CreateIndex(idx).compile, dialect=testing.db.dialect + schema.CreateIndex(idx).compile, + dialect=testing.db.dialect, ) assert_raises_message( exc.CompileError, "Index 'foo' is not associated with any table.", - schema.CreateIndex(idx).compile + schema.CreateIndex(idx).compile, ) def test_raise_clauseelement_not_a_column(self): m = MetaData() - t2 = Table('t2', m, Column('x', Integer)) + t2 = Table("t2", m, Column("x", Integer)) class SomeClass(object): - def __clause_element__(self): return t2 + assert_raises_message( exc.ArgumentError, r"Element Table\('t2', .* is not a string name or column element", - Index, "foo", SomeClass() + Index, + "foo", + SomeClass(), ) @@ -3233,28 +3397,30 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): """Test Column() construction.""" - __dialect__ = 'default' + __dialect__ = "default" def columns(self): - return [Column(Integer), - Column('b', Integer), - Column(Integer), - Column('d', Integer), - Column(Integer, name='e'), - Column(type_=Integer), - Column(Integer()), - Column('h', Integer()), - Column(type_=Integer())] + return [ + Column(Integer), + Column("b", Integer), + Column(Integer), + Column("d", Integer), + Column(Integer, name="e"), + Column(type_=Integer), + Column(Integer()), + Column("h", Integer()), + Column(type_=Integer()), + ] def test_basic(self): c = self.columns() - for i, v in ((0, 'a'), (2, 'c'), (5, 'f'), (6, 'g'), (8, 'i')): + for i, v in ((0, "a"), (2, "c"), (5, "f"), (6, "g"), (8, "i")): c[i].name = v c[i].key = v del i, v - tbl = Table('table', MetaData(), *c) + tbl = Table("table", MetaData(), *c) for i, col in enumerate(tbl.c): assert col.name == c[i].name @@ -3266,35 +3432,47 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): exc.ArgumentError, "Column must be constructed with a non-blank name or assign a " "non-blank .name ", - Table, 't', MetaData(), c) + Table, + "t", + MetaData(), + c, + ) def test_name_blank(self): - c = Column('', Integer) + c = Column("", Integer) assert_raises_message( exc.ArgumentError, "Column must be constructed with a non-blank name or assign a " "non-blank .name ", - Table, 't', MetaData(), c) + Table, + "t", + MetaData(), + c, + ) def test_dupe_column(self): - c = Column('x', Integer) - Table('t', MetaData(), c) + c = Column("x", Integer) + Table("t", MetaData(), c) assert_raises_message( exc.ArgumentError, "Column object 'x' already assigned to Table 't'", - Table, 'q', MetaData(), c) + Table, + "q", + MetaData(), + c, + ) def test_incomplete_key(self): c = Column(Integer) assert c.name is None assert c.key is None - c.name = 'named' - Table('t', MetaData(), c) + c.name = "named" + Table("t", MetaData(), c) - assert c.name == 'named' + assert c.name == "named" assert c.name == c.key def test_unique_index_flags_default_to_none(self): @@ -3302,28 +3480,29 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): eq_(c.unique, None) eq_(c.index, None) - c = Column('c', Integer, index=True) + c = Column("c", Integer, index=True) eq_(c.unique, None) eq_(c.index, True) - t = Table('t', MetaData(), c) + t = Table("t", MetaData(), c) eq_(list(t.indexes)[0].unique, False) c = Column(Integer, unique=True) eq_(c.unique, True) eq_(c.index, None) - c = Column('c', Integer, index=True, unique=True) + c = Column("c", Integer, index=True, unique=True) eq_(c.unique, True) eq_(c.index, True) - t = Table('t', MetaData(), c) + t = Table("t", MetaData(), c) eq_(list(t.indexes)[0].unique, True) def test_bogus(self): - assert_raises(exc.ArgumentError, Column, 'foo', name='bar') - assert_raises(exc.ArgumentError, Column, 'foo', Integer, - type_=Integer()) + assert_raises(exc.ArgumentError, Column, "foo", name="bar") + assert_raises( + exc.ArgumentError, Column, "foo", Integer, type_=Integer() + ) def test_custom_subclass_proxy(self): """test proxy generation of a Column subclass, can be compiled.""" @@ -3333,9 +3512,8 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): from sqlalchemy.sql import select class MyColumn(Column): - def _constructor(self, name, type, **kw): - kw['name'] = name + kw["name"] = name return MyColumn(type, **kw) def __init__(self, type, **kw): @@ -3350,13 +3528,10 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): return s + "-" id = MyColumn(Integer, primary_key=True) - id.name = 'id' + id.name = "id" name = MyColumn(String) - name.name = 'name' - t1 = Table('foo', MetaData(), - id, - name - ) + name.name = "name" + t1 = Table("foo", MetaData(), id, name) # goofy thing eq_(t1.c.name.my_goofy_thing(), "hi") @@ -3380,24 +3555,22 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): from sqlalchemy.sql import select class MyColumn(Column): - def __init__(self, type, **kw): Column.__init__(self, type, **kw) id = MyColumn(Integer, primary_key=True) - id.name = 'id' + id.name = "id" name = MyColumn(String) - name.name = 'name' - t1 = Table('foo', MetaData(), - id, - name - ) + name.name = "name" + t1 = Table("foo", MetaData(), id, name) assert_raises_message( TypeError, "Could not create a copy of this " "object. Ensure the class includes a _constructor()", - getattr, select([t1.select().alias()]), 'c' + getattr, + select([t1.select().alias()]), + "c", ) def test_custom_create(self): @@ -3412,7 +3585,7 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): text = "%s SPECIAL DIRECTIVE %s" % ( column.name, - compiler.type_compiler.process(column.type) + compiler.type_compiler.process(column.type), ) default = compiler.get_column_default_string(column) if default is not None: @@ -3423,23 +3596,23 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): if column.constraints: text += " ".join( - compiler.process(const) - for const in column.constraints) + compiler.process(const) for const in column.constraints + ) return text t = Table( - 'mytable', MetaData(), - Column('x', Integer, info={ - "special": True}, primary_key=True), - Column('y', String(50)), - Column('z', String(20), info={ - "special": True})) + "mytable", + MetaData(), + Column("x", Integer, info={"special": True}, primary_key=True), + Column("y", String(50)), + Column("z", String(20), info={"special": True}), + ) self.assert_compile( schema.CreateTable(t), "CREATE TABLE mytable (x SPECIAL DIRECTIVE INTEGER " "NOT NULL, y VARCHAR(50), " - "z SPECIAL DIRECTIVE VARCHAR(20), PRIMARY KEY (x))" + "z SPECIAL DIRECTIVE VARCHAR(20), PRIMARY KEY (x))", ) deregister(schema.CreateColumn) @@ -3450,127 +3623,126 @@ class ColumnDefaultsTest(fixtures.TestBase): """test assignment of default fixures to columns""" def _fixture(self, *arg, **kw): - return Column('x', Integer, *arg, **kw) + return Column("x", Integer, *arg, **kw) def test_server_default_positional(self): - target = schema.DefaultClause('y') + target = schema.DefaultClause("y") c = self._fixture(target) assert c.server_default is target assert target.column is c def test_onupdate_default_not_server_default_one(self): - target1 = schema.DefaultClause('y') - target2 = schema.DefaultClause('z') + target1 = schema.DefaultClause("y") + target2 = schema.DefaultClause("z") c = self._fixture(server_default=target1, server_onupdate=target2) - eq_(c.server_default.arg, 'y') - eq_(c.server_onupdate.arg, 'z') + eq_(c.server_default.arg, "y") + eq_(c.server_onupdate.arg, "z") def test_onupdate_default_not_server_default_two(self): - target1 = schema.DefaultClause('y', for_update=True) - target2 = schema.DefaultClause('z', for_update=True) + target1 = schema.DefaultClause("y", for_update=True) + target2 = schema.DefaultClause("z", for_update=True) c = self._fixture(server_default=target1, server_onupdate=target2) - eq_(c.server_default.arg, 'y') - eq_(c.server_onupdate.arg, 'z') + eq_(c.server_default.arg, "y") + eq_(c.server_onupdate.arg, "z") def test_onupdate_default_not_server_default_three(self): - target1 = schema.DefaultClause('y', for_update=False) - target2 = schema.DefaultClause('z', for_update=True) + target1 = schema.DefaultClause("y", for_update=False) + target2 = schema.DefaultClause("z", for_update=True) c = self._fixture(target1, target2) - eq_(c.server_default.arg, 'y') - eq_(c.server_onupdate.arg, 'z') + eq_(c.server_default.arg, "y") + eq_(c.server_onupdate.arg, "z") def test_onupdate_default_not_server_default_four(self): - target1 = schema.DefaultClause('y', for_update=False) + target1 = schema.DefaultClause("y", for_update=False) c = self._fixture(server_onupdate=target1) is_(c.server_default, None) - eq_(c.server_onupdate.arg, 'y') + eq_(c.server_onupdate.arg, "y") def test_server_default_keyword_as_schemaitem(self): - target = schema.DefaultClause('y') + target = schema.DefaultClause("y") c = self._fixture(server_default=target) assert c.server_default is target assert target.column is c def test_server_default_keyword_as_clause(self): - target = 'y' + target = "y" c = self._fixture(server_default=target) assert c.server_default.arg == target assert c.server_default.column is c def test_server_default_onupdate_positional(self): - target = schema.DefaultClause('y', for_update=True) + target = schema.DefaultClause("y", for_update=True) c = self._fixture(target) assert c.server_onupdate is target assert target.column is c def test_server_default_onupdate_keyword_as_schemaitem(self): - target = schema.DefaultClause('y', for_update=True) + target = schema.DefaultClause("y", for_update=True) c = self._fixture(server_onupdate=target) assert c.server_onupdate is target assert target.column is c def test_server_default_onupdate_keyword_as_clause(self): - target = 'y' + target = "y" c = self._fixture(server_onupdate=target) assert c.server_onupdate.arg == target assert c.server_onupdate.column is c def test_column_default_positional(self): - target = schema.ColumnDefault('y') + target = schema.ColumnDefault("y") c = self._fixture(target) assert c.default is target assert target.column is c def test_column_default_keyword_as_schemaitem(self): - target = schema.ColumnDefault('y') + target = schema.ColumnDefault("y") c = self._fixture(default=target) assert c.default is target assert target.column is c def test_column_default_keyword_as_clause(self): - target = 'y' + target = "y" c = self._fixture(default=target) assert c.default.arg == target assert c.default.column is c def test_column_default_onupdate_positional(self): - target = schema.ColumnDefault('y', for_update=True) + target = schema.ColumnDefault("y", for_update=True) c = self._fixture(target) assert c.onupdate is target assert target.column is c def test_column_default_onupdate_keyword_as_schemaitem(self): - target = schema.ColumnDefault('y', for_update=True) + target = schema.ColumnDefault("y", for_update=True) c = self._fixture(onupdate=target) assert c.onupdate is target assert target.column is c def test_column_default_onupdate_keyword_as_clause(self): - target = 'y' + target = "y" c = self._fixture(onupdate=target) assert c.onupdate.arg == target assert c.onupdate.column is c class ColumnOptionsTest(fixtures.TestBase): - def test_default_generators(self): - g1, g2 = Sequence('foo_id_seq'), ColumnDefault('f5') + g1, g2 = Sequence("foo_id_seq"), ColumnDefault("f5") assert Column(String, default=g1).default is g1 assert Column(String, onupdate=g1).onupdate is g1 assert Column(String, default=g2).default is g2 assert Column(String, onupdate=g2).onupdate is g2 def _null_type_error(self, col): - t = Table('t', MetaData(), col) + t = Table("t", MetaData(), col) assert_raises_message( exc.CompileError, r"\(in table 't', column 'foo'\): Can't generate DDL for NullType", - schema.CreateTable(t).compile + schema.CreateTable(t).compile, ) def _no_name_error(self, col): @@ -3578,13 +3750,16 @@ class ColumnOptionsTest(fixtures.TestBase): exc.ArgumentError, "Column must be constructed with a non-blank name or " "assign a non-blank .name", - Table, 't', MetaData(), col + Table, + "t", + MetaData(), + col, ) def _no_error(self, col): m = MetaData() - b = Table('bar', m, Column('id', Integer)) - t = Table('t', m, col) + b = Table("bar", m, Column("id", Integer)) + t = Table("t", m, col) schema.CreateTable(t).compile() def test_argument_signatures(self): @@ -3597,71 +3772,82 @@ class ColumnOptionsTest(fixtures.TestBase): self._null_type_error(Column("foo", Sequence("a"))) - self._no_name_error(Column(ForeignKey('bar.id'))) + self._no_name_error(Column(ForeignKey("bar.id"))) - self._no_error(Column("foo", ForeignKey('bar.id'))) + self._no_error(Column("foo", ForeignKey("bar.id"))) - self._no_name_error(Column(ForeignKey('bar.id'), default="foo")) + self._no_name_error(Column(ForeignKey("bar.id"), default="foo")) - self._no_name_error(Column(ForeignKey('bar.id'), Sequence("a"))) - self._no_error(Column("foo", ForeignKey('bar.id'), default="foo")) - self._no_error(Column("foo", ForeignKey('bar.id'), Sequence("a"))) + self._no_name_error(Column(ForeignKey("bar.id"), Sequence("a"))) + self._no_error(Column("foo", ForeignKey("bar.id"), default="foo")) + self._no_error(Column("foo", ForeignKey("bar.id"), Sequence("a"))) def test_column_info(self): - c1 = Column('foo', String, info={'x': 'y'}) - c2 = Column('bar', String, info={}) - c3 = Column('bat', String) - assert c1.info == {'x': 'y'} + c1 = Column("foo", String, info={"x": "y"}) + c2 = Column("bar", String, info={}) + c3 = Column("bat", String) + assert c1.info == {"x": "y"} assert c2.info == {} assert c3.info == {} for c in (c1, c2, c3): - c.info['bar'] = 'zip' - assert c.info['bar'] == 'zip' + c.info["bar"] = "zip" + assert c.info["bar"] == "zip" class CatchAllEventsTest(fixtures.RemovesEvents, fixtures.TestBase): - def test_all_events(self): canary = [] def before_attach(obj, parent): - canary.append("%s->%s" % (obj.__class__.__name__, - parent.__class__.__name__)) + canary.append( + "%s->%s" % (obj.__class__.__name__, parent.__class__.__name__) + ) def after_attach(obj, parent): canary.append("%s->%s" % (obj.__class__.__name__, parent)) self.event_listen( - schema.SchemaItem, - "before_parent_attach", - before_attach) + schema.SchemaItem, "before_parent_attach", before_attach + ) self.event_listen( - schema.SchemaItem, - "after_parent_attach", - after_attach) + schema.SchemaItem, "after_parent_attach", after_attach + ) m = MetaData() - Table('t1', m, - Column('id', Integer, Sequence('foo_id'), primary_key=True), - Column('bar', String, ForeignKey('t2.id')) - ) - Table('t2', m, - Column('id', Integer, primary_key=True), - ) + Table( + "t1", + m, + Column("id", Integer, Sequence("foo_id"), primary_key=True), + Column("bar", String, ForeignKey("t2.id")), + ) + Table("t2", m, Column("id", Integer, primary_key=True)) eq_( canary, - ['Sequence->Column', 'Sequence->id', 'ForeignKey->Column', - 'ForeignKey->bar', 'Table->MetaData', - 'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t1', - 'Column->Table', 'Column->t1', 'Column->Table', - 'Column->t1', 'ForeignKeyConstraint->Table', - 'ForeignKeyConstraint->t1', 'Table->MetaData(bind=None)', - 'Table->MetaData', 'PrimaryKeyConstraint->Table', - 'PrimaryKeyConstraint->t2', 'Column->Table', 'Column->t2', - 'Table->MetaData(bind=None)'] + [ + "Sequence->Column", + "Sequence->id", + "ForeignKey->Column", + "ForeignKey->bar", + "Table->MetaData", + "PrimaryKeyConstraint->Table", + "PrimaryKeyConstraint->t1", + "Column->Table", + "Column->t1", + "Column->Table", + "Column->t1", + "ForeignKeyConstraint->Table", + "ForeignKeyConstraint->t1", + "Table->MetaData(bind=None)", + "Table->MetaData", + "PrimaryKeyConstraint->Table", + "PrimaryKeyConstraint->t2", + "Column->Table", + "Column->t2", + "Table->MetaData(bind=None)", + ], ) def test_events_per_constraint(self): @@ -3669,80 +3855,82 @@ class CatchAllEventsTest(fixtures.RemovesEvents, fixtures.TestBase): def evt(target): def before_attach(obj, parent): - canary.append("%s->%s" % (target.__name__, - parent.__class__.__name__)) + canary.append( + "%s->%s" % (target.__name__, parent.__class__.__name__) + ) def after_attach(obj, parent): - assert hasattr(obj, 'name') # so we can change it + assert hasattr(obj, "name") # so we can change it canary.append("%s->%s" % (target.__name__, parent)) + self.event_listen(target, "before_parent_attach", before_attach) self.event_listen(target, "after_parent_attach", after_attach) for target in [ - schema.ForeignKeyConstraint, schema.PrimaryKeyConstraint, + schema.ForeignKeyConstraint, + schema.PrimaryKeyConstraint, schema.UniqueConstraint, schema.CheckConstraint, - schema.Index + schema.Index, ]: evt(target) m = MetaData() - Table('t1', m, - Column('id', Integer, Sequence('foo_id'), primary_key=True), - Column('bar', String, ForeignKey('t2.id'), index=True), - Column('bat', Integer, unique=True), - ) - Table('t2', m, - Column('id', Integer, primary_key=True), - Column('bar', Integer), - Column('bat', Integer), - CheckConstraint("bar>5"), - UniqueConstraint('bar', 'bat'), - Index(None, 'bar', 'bat') - ) + Table( + "t1", + m, + Column("id", Integer, Sequence("foo_id"), primary_key=True), + Column("bar", String, ForeignKey("t2.id"), index=True), + Column("bat", Integer, unique=True), + ) + Table( + "t2", + m, + Column("id", Integer, primary_key=True), + Column("bar", Integer), + Column("bat", Integer), + CheckConstraint("bar>5"), + UniqueConstraint("bar", "bat"), + Index(None, "bar", "bat"), + ) eq_( canary, [ - 'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t1', - 'Index->Table', 'Index->t1', - 'ForeignKeyConstraint->Table', 'ForeignKeyConstraint->t1', - 'UniqueConstraint->Table', 'UniqueConstraint->t1', - 'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t2', - 'CheckConstraint->Table', 'CheckConstraint->t2', - 'UniqueConstraint->Table', 'UniqueConstraint->t2', - 'Index->Table', 'Index->t2' - ] + "PrimaryKeyConstraint->Table", + "PrimaryKeyConstraint->t1", + "Index->Table", + "Index->t1", + "ForeignKeyConstraint->Table", + "ForeignKeyConstraint->t1", + "UniqueConstraint->Table", + "UniqueConstraint->t1", + "PrimaryKeyConstraint->Table", + "PrimaryKeyConstraint->t2", + "CheckConstraint->Table", + "CheckConstraint->t2", + "UniqueConstraint->Table", + "UniqueConstraint->t2", + "Index->Table", + "Index->t2", + ], ) class DialectKWArgTest(fixtures.TestBase): - @contextmanager def _fixture(self): from sqlalchemy.engine.default import DefaultDialect class ParticipatingDialect(DefaultDialect): construct_arguments = [ - (schema.Index, { - "x": 5, - "y": False, - "z_one": None - }), - (schema.ForeignKeyConstraint, { - "foobar": False - }) + (schema.Index, {"x": 5, "y": False, "z_one": None}), + (schema.ForeignKeyConstraint, {"foobar": False}), ] class ParticipatingDialect2(DefaultDialect): construct_arguments = [ - (schema.Index, { - "x": 9, - "y": True, - "pp": "default" - }), - (schema.Table, { - "*": None - }) + (schema.Index, {"x": 9, "y": True, "pp": "default"}), + (schema.Table, {"*": None}), ] class NonParticipatingDialect(DefaultDialect): @@ -3757,6 +3945,7 @@ class DialectKWArgTest(fixtures.TestBase): return NonParticipatingDialect else: raise exc.NoSuchModuleError("no dialect %r" % dialect_name) + with mock.patch("sqlalchemy.dialects.registry.load", load): yield @@ -3765,32 +3954,21 @@ class DialectKWArgTest(fixtures.TestBase): def test_participating(self): with self._fixture(): - idx = Index('a', 'b', 'c', participating_y=True) + idx = Index("a", "b", "c", participating_y=True) eq_( idx.dialect_options, - {"participating": {"x": 5, "y": True, "z_one": None}} - ) - eq_( - idx.dialect_kwargs, - { - 'participating_y': True, - } + {"participating": {"x": 5, "y": True, "z_one": None}}, ) + eq_(idx.dialect_kwargs, {"participating_y": True}) def test_nonparticipating(self): with self._fixture(): idx = Index( - 'a', - 'b', - 'c', - nonparticipating_y=True, - nonparticipating_q=5) + "a", "b", "c", nonparticipating_y=True, nonparticipating_q=5 + ) eq_( idx.dialect_kwargs, - { - 'nonparticipating_y': True, - 'nonparticipating_q': 5 - } + {"nonparticipating_y": True, "nonparticipating_q": 5}, ) def test_bad_kwarg_raise(self): @@ -3799,7 +3977,11 @@ class DialectKWArgTest(fixtures.TestBase): TypeError, "Additional arguments should be named " "_, got 'foobar'", - Index, 'a', 'b', 'c', foobar=True + Index, + "a", + "b", + "c", + foobar=True, ) def test_unknown_dialect_warning(self): @@ -3808,7 +3990,11 @@ class DialectKWArgTest(fixtures.TestBase): exc.SAWarning, "Can't validate argument 'unknown_y'; can't locate " "any SQLAlchemy dialect named 'unknown'", - Index, 'a', 'b', 'c', unknown_y=True + Index, + "a", + "b", + "c", + unknown_y=True, ) def test_participating_bad_kw(self): @@ -3818,7 +4004,11 @@ class DialectKWArgTest(fixtures.TestBase): "Argument 'participating_q_p_x' is not accepted by dialect " "'participating' on behalf of " "", - Index, 'a', 'b', 'c', participating_q_p_x=8 + Index, + "a", + "b", + "c", + participating_q_p_x=8, ) def test_participating_unknown_schema_item(self): @@ -3830,310 +4020,328 @@ class DialectKWArgTest(fixtures.TestBase): "Argument 'participating_q_p_x' is not accepted by dialect " "'participating' on behalf of " "", - UniqueConstraint, 'a', 'b', participating_q_p_x=8 + UniqueConstraint, + "a", + "b", + participating_q_p_x=8, ) @testing.emits_warning("Can't validate") def test_unknown_dialect_warning_still_populates(self): with self._fixture(): - idx = Index('a', 'b', 'c', unknown_y=True) + idx = Index("a", "b", "c", unknown_y=True) eq_(idx.dialect_kwargs, {"unknown_y": True}) # still populates @testing.emits_warning("Can't validate") def test_unknown_dialect_warning_still_populates_multiple(self): with self._fixture(): - idx = Index('a', 'b', 'c', unknown_y=True, unknown_z=5, - otherunknown_foo='bar', participating_y=8) + idx = Index( + "a", + "b", + "c", + unknown_y=True, + unknown_z=5, + otherunknown_foo="bar", + participating_y=8, + ) eq_( idx.dialect_options, { - "unknown": {'y': True, 'z': 5, '*': None}, - "otherunknown": {'foo': 'bar', '*': None}, - "participating": {'x': 5, 'y': 8, 'z_one': None} - } + "unknown": {"y": True, "z": 5, "*": None}, + "otherunknown": {"foo": "bar", "*": None}, + "participating": {"x": 5, "y": 8, "z_one": None}, + }, ) - eq_(idx.dialect_kwargs, - {'unknown_z': 5, 'participating_y': 8, - 'unknown_y': True, - 'otherunknown_foo': 'bar'} - ) # still populates + eq_( + idx.dialect_kwargs, + { + "unknown_z": 5, + "participating_y": 8, + "unknown_y": True, + "otherunknown_foo": "bar", + }, + ) # still populates def test_runs_safekwarg(self): - with mock.patch("sqlalchemy.util.safe_kwarg", - lambda arg: "goofy_%s" % arg): + with mock.patch( + "sqlalchemy.util.safe_kwarg", lambda arg: "goofy_%s" % arg + ): with self._fixture(): - idx = Index('a', 'b') - idx.kwargs[util.u('participating_x')] = 7 + idx = Index("a", "b") + idx.kwargs[util.u("participating_x")] = 7 - eq_( - list(idx.dialect_kwargs), - ['goofy_participating_x'] - ) + eq_(list(idx.dialect_kwargs), ["goofy_participating_x"]) def test_combined(self): with self._fixture(): - idx = Index('a', 'b', 'c', participating_x=7, - nonparticipating_y=True) + idx = Index( + "a", "b", "c", participating_x=7, nonparticipating_y=True + ) eq_( idx.dialect_options, { - 'participating': {'y': False, 'x': 7, 'z_one': None}, - 'nonparticipating': {'y': True, '*': None} - } + "participating": {"y": False, "x": 7, "z_one": None}, + "nonparticipating": {"y": True, "*": None}, + }, ) eq_( idx.dialect_kwargs, - { - 'participating_x': 7, - 'nonparticipating_y': True, - } + {"participating_x": 7, "nonparticipating_y": True}, ) def test_multiple_participating(self): with self._fixture(): - idx = Index('a', 'b', 'c', - participating_x=7, - participating2_x=15, - participating2_y="lazy" - ) + idx = Index( + "a", + "b", + "c", + participating_x=7, + participating2_x=15, + participating2_y="lazy", + ) eq_( idx.dialect_options, { - "participating": {'x': 7, 'y': False, 'z_one': None}, - "participating2": {'x': 15, 'y': 'lazy', 'pp': 'default'}, - } + "participating": {"x": 7, "y": False, "z_one": None}, + "participating2": {"x": 15, "y": "lazy", "pp": "default"}, + }, ) eq_( idx.dialect_kwargs, { - 'participating_x': 7, - 'participating2_x': 15, - 'participating2_y': 'lazy' - } + "participating_x": 7, + "participating2_x": 15, + "participating2_y": "lazy", + }, ) def test_foreign_key_propagate(self): with self._fixture(): m = MetaData() - fk = ForeignKey('t2.id', participating_foobar=True) - t = Table('t', m, Column('id', Integer, fk)) + fk = ForeignKey("t2.id", participating_foobar=True) + t = Table("t", m, Column("id", Integer, fk)) fkc = [ - c for c in t.constraints if isinstance( - c, - ForeignKeyConstraint)][0] - eq_( - fkc.dialect_kwargs, - {'participating_foobar': True} - ) + c for c in t.constraints if isinstance(c, ForeignKeyConstraint) + ][0] + eq_(fkc.dialect_kwargs, {"participating_foobar": True}) def test_foreign_key_propagate_exceptions_delayed(self): with self._fixture(): m = MetaData() - fk = ForeignKey('t2.id', participating_fake=True) - c1 = Column('id', Integer, fk) + fk = ForeignKey("t2.id", participating_fake=True) + c1 = Column("id", Integer, fk) assert_raises_message( exc.ArgumentError, "Argument 'participating_fake' is not accepted by " "dialect 'participating' on behalf of " "", - Table, 't', m, c1 + Table, + "t", + m, + c1, ) def test_wildcard(self): with self._fixture(): m = MetaData() - t = Table('x', m, Column('x', Integer), - participating2_xyz='foo', - participating2_engine='InnoDB', - ) + t = Table( + "x", + m, + Column("x", Integer), + participating2_xyz="foo", + participating2_engine="InnoDB", + ) eq_( t.dialect_kwargs, { - 'participating2_xyz': 'foo', - 'participating2_engine': 'InnoDB' - } + "participating2_xyz": "foo", + "participating2_engine": "InnoDB", + }, ) def test_uninit_wildcard(self): with self._fixture(): m = MetaData() - t = Table('x', m, Column('x', Integer)) - eq_( - t.dialect_options['participating2'], {'*': None} - ) - eq_( - t.dialect_kwargs, {} - ) + t = Table("x", m, Column("x", Integer)) + eq_(t.dialect_options["participating2"], {"*": None}) + eq_(t.dialect_kwargs, {}) def test_not_contains_wildcard(self): with self._fixture(): m = MetaData() - t = Table('x', m, Column('x', Integer)) - assert 'foobar' not in t.dialect_options['participating2'] + t = Table("x", m, Column("x", Integer)) + assert "foobar" not in t.dialect_options["participating2"] def test_contains_wildcard(self): with self._fixture(): m = MetaData() - t = Table('x', m, Column('x', Integer), participating2_foobar=5) - assert 'foobar' in t.dialect_options['participating2'] + t = Table("x", m, Column("x", Integer), participating2_foobar=5) + assert "foobar" in t.dialect_options["participating2"] def test_update(self): with self._fixture(): - idx = Index('a', 'b', 'c', participating_x=20) - eq_(idx.dialect_kwargs, { - "participating_x": 20, - }) - idx._validate_dialect_kwargs({ - "participating_x": 25, - "participating_z_one": "default"}) - eq_(idx.dialect_options, { - "participating": {"x": 25, "y": False, "z_one": "default"} - }) - eq_(idx.dialect_kwargs, { - "participating_x": 25, - 'participating_z_one': "default" - }) - - idx._validate_dialect_kwargs({ - "participating_x": 25, - "participating_z_one": "default"}) - - eq_(idx.dialect_options, { - "participating": {"x": 25, "y": False, "z_one": "default"} - }) - eq_(idx.dialect_kwargs, { - "participating_x": 25, - 'participating_z_one': "default" - }) - - idx._validate_dialect_kwargs({ - "participating_y": True, - 'participating2_y': "p2y"}) - eq_(idx.dialect_options, { - "participating": {"x": 25, "y": True, "z_one": "default"}, - "participating2": {"y": "p2y", "pp": "default", "x": 9} - }) - eq_(idx.dialect_kwargs, { - "participating_x": 25, - "participating_y": True, - 'participating2_y': "p2y", - "participating_z_one": "default"}) + idx = Index("a", "b", "c", participating_x=20) + eq_(idx.dialect_kwargs, {"participating_x": 20}) + idx._validate_dialect_kwargs( + {"participating_x": 25, "participating_z_one": "default"} + ) + eq_( + idx.dialect_options, + {"participating": {"x": 25, "y": False, "z_one": "default"}}, + ) + eq_( + idx.dialect_kwargs, + {"participating_x": 25, "participating_z_one": "default"}, + ) + + idx._validate_dialect_kwargs( + {"participating_x": 25, "participating_z_one": "default"} + ) + + eq_( + idx.dialect_options, + {"participating": {"x": 25, "y": False, "z_one": "default"}}, + ) + eq_( + idx.dialect_kwargs, + {"participating_x": 25, "participating_z_one": "default"}, + ) + + idx._validate_dialect_kwargs( + {"participating_y": True, "participating2_y": "p2y"} + ) + eq_( + idx.dialect_options, + { + "participating": {"x": 25, "y": True, "z_one": "default"}, + "participating2": {"y": "p2y", "pp": "default", "x": 9}, + }, + ) + eq_( + idx.dialect_kwargs, + { + "participating_x": 25, + "participating_y": True, + "participating2_y": "p2y", + "participating_z_one": "default", + }, + ) def test_key_error_kwargs_no_dialect(self): with self._fixture(): - idx = Index('a', 'b', 'c') - assert_raises( - KeyError, - idx.kwargs.__getitem__, 'foo_bar' - ) + idx = Index("a", "b", "c") + assert_raises(KeyError, idx.kwargs.__getitem__, "foo_bar") def test_key_error_kwargs_no_underscore(self): with self._fixture(): - idx = Index('a', 'b', 'c') - assert_raises( - KeyError, - idx.kwargs.__getitem__, 'foobar' - ) + idx = Index("a", "b", "c") + assert_raises(KeyError, idx.kwargs.__getitem__, "foobar") def test_key_error_kwargs_no_argument(self): with self._fixture(): - idx = Index('a', 'b', 'c') + idx = Index("a", "b", "c") assert_raises( - KeyError, - idx.kwargs.__getitem__, 'participating_asdmfq34098' + KeyError, idx.kwargs.__getitem__, "participating_asdmfq34098" ) assert_raises( KeyError, - idx.kwargs.__getitem__, 'nonparticipating_asdmfq34098' + idx.kwargs.__getitem__, + "nonparticipating_asdmfq34098", ) def test_key_error_dialect_options(self): with self._fixture(): - idx = Index('a', 'b', 'c') + idx = Index("a", "b", "c") assert_raises( KeyError, - idx.dialect_options['participating'].__getitem__, 'asdfaso890' + idx.dialect_options["participating"].__getitem__, + "asdfaso890", ) assert_raises( KeyError, - idx.dialect_options['nonparticipating'].__getitem__, - 'asdfaso890') + idx.dialect_options["nonparticipating"].__getitem__, + "asdfaso890", + ) def test_ad_hoc_participating_via_opt(self): with self._fixture(): - idx = Index('a', 'b', 'c') - idx.dialect_options['participating']['foobar'] = 5 + idx = Index("a", "b", "c") + idx.dialect_options["participating"]["foobar"] = 5 - eq_(idx.dialect_options['participating']['foobar'], 5) - eq_(idx.kwargs['participating_foobar'], 5) + eq_(idx.dialect_options["participating"]["foobar"], 5) + eq_(idx.kwargs["participating_foobar"], 5) def test_ad_hoc_nonparticipating_via_opt(self): with self._fixture(): - idx = Index('a', 'b', 'c') - idx.dialect_options['nonparticipating']['foobar'] = 5 + idx = Index("a", "b", "c") + idx.dialect_options["nonparticipating"]["foobar"] = 5 - eq_(idx.dialect_options['nonparticipating']['foobar'], 5) - eq_(idx.kwargs['nonparticipating_foobar'], 5) + eq_(idx.dialect_options["nonparticipating"]["foobar"], 5) + eq_(idx.kwargs["nonparticipating_foobar"], 5) def test_ad_hoc_participating_via_kwargs(self): with self._fixture(): - idx = Index('a', 'b', 'c') - idx.kwargs['participating_foobar'] = 5 + idx = Index("a", "b", "c") + idx.kwargs["participating_foobar"] = 5 - eq_(idx.dialect_options['participating']['foobar'], 5) - eq_(idx.kwargs['participating_foobar'], 5) + eq_(idx.dialect_options["participating"]["foobar"], 5) + eq_(idx.kwargs["participating_foobar"], 5) def test_ad_hoc_nonparticipating_via_kwargs(self): with self._fixture(): - idx = Index('a', 'b', 'c') - idx.kwargs['nonparticipating_foobar'] = 5 + idx = Index("a", "b", "c") + idx.kwargs["nonparticipating_foobar"] = 5 - eq_(idx.dialect_options['nonparticipating']['foobar'], 5) - eq_(idx.kwargs['nonparticipating_foobar'], 5) + eq_(idx.dialect_options["nonparticipating"]["foobar"], 5) + eq_(idx.kwargs["nonparticipating_foobar"], 5) def test_ad_hoc_via_kwargs_invalid_key(self): with self._fixture(): - idx = Index('a', 'b', 'c') + idx = Index("a", "b", "c") assert_raises_message( exc.ArgumentError, "Keys must be of the form _", - idx.kwargs.__setitem__, "foobar", 5 + idx.kwargs.__setitem__, + "foobar", + 5, ) def test_ad_hoc_via_kwargs_invalid_dialect(self): with self._fixture(): - idx = Index('a', 'b', 'c') + idx = Index("a", "b", "c") assert_raises_message( exc.ArgumentError, "no dialect 'nonexistent'", - idx.kwargs.__setitem__, "nonexistent_foobar", 5 + idx.kwargs.__setitem__, + "nonexistent_foobar", + 5, ) def test_add_new_arguments_participating(self): with self._fixture(): Index.argument_for("participating", "xyzqpr", False) - idx = Index('a', 'b', 'c', participating_xyzqpr=True) + idx = Index("a", "b", "c", participating_xyzqpr=True) - eq_(idx.kwargs['participating_xyzqpr'], True) + eq_(idx.kwargs["participating_xyzqpr"], True) - idx = Index('a', 'b', 'c') - eq_(idx.dialect_options['participating']['xyzqpr'], False) + idx = Index("a", "b", "c") + eq_(idx.dialect_options["participating"]["xyzqpr"], False) def test_add_new_arguments_participating_no_existing(self): with self._fixture(): PrimaryKeyConstraint.argument_for("participating", "xyzqpr", False) - pk = PrimaryKeyConstraint('a', 'b', 'c', participating_xyzqpr=True) + pk = PrimaryKeyConstraint("a", "b", "c", participating_xyzqpr=True) - eq_(pk.kwargs['participating_xyzqpr'], True) + eq_(pk.kwargs["participating_xyzqpr"], True) - pk = PrimaryKeyConstraint('a', 'b', 'c') - eq_(pk.dialect_options['participating']['xyzqpr'], False) + pk = PrimaryKeyConstraint("a", "b", "c") + eq_(pk.dialect_options["participating"]["xyzqpr"], False) def test_add_new_arguments_nonparticipating(self): with self._fixture(): @@ -4141,7 +4349,10 @@ class DialectKWArgTest(fixtures.TestBase): exc.ArgumentError, "Dialect 'nonparticipating' does have keyword-argument " "validation and defaults enabled configured", - Index.argument_for, "nonparticipating", "xyzqpr", False + Index.argument_for, + "nonparticipating", + "xyzqpr", + False, ) def test_add_new_arguments_invalid_dialect(self): @@ -4149,43 +4360,48 @@ class DialectKWArgTest(fixtures.TestBase): assert_raises_message( exc.ArgumentError, "no dialect 'nonexistent'", - Index.argument_for, "nonexistent", "foobar", 5 + Index.argument_for, + "nonexistent", + "foobar", + 5, ) class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = 'default' + __dialect__ = "default" def _fixture(self, naming_convention, table_schema=None): m1 = MetaData(naming_convention=naming_convention) - u1 = Table('user', m1, - Column('id', Integer, primary_key=True), - Column('version', Integer, primary_key=True), - Column('data', String(30)), - Column('Data2', String(30), key="data2"), - Column('Data3', String(30), key="data3"), - schema=table_schema - ) + u1 = Table( + "user", + m1, + Column("id", Integer, primary_key=True), + Column("version", Integer, primary_key=True), + Column("data", String(30)), + Column("Data2", String(30), key="data2"), + Column("Data3", String(30), key="data3"), + schema=table_schema, + ) return u1 def test_uq_name(self): - u1 = self._fixture(naming_convention={ - "uq": "uq_%(table_name)s_%(column_0_name)s" - }) + u1 = self._fixture( + naming_convention={"uq": "uq_%(table_name)s_%(column_0_name)s"} + ) uq = UniqueConstraint(u1.c.data) eq_(uq.name, "uq_user_data") def test_uq_conv_name(self): - u1 = self._fixture(naming_convention={ - "uq": "uq_%(table_name)s_%(column_0_name)s" - }) + u1 = self._fixture( + naming_convention={"uq": "uq_%(table_name)s_%(column_0_name)s"} + ) uq = UniqueConstraint(u1.c.data, name=naming.conv("myname")) self.assert_compile( schema.AddConstraint(uq), 'ALTER TABLE "user" ADD CONSTRAINT myname UNIQUE (data)', - dialect="default" + dialect="default", ) def test_uq_defer_name_no_convention(self): @@ -4194,59 +4410,59 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( schema.AddConstraint(uq), 'ALTER TABLE "user" ADD CONSTRAINT myname UNIQUE (data)', - dialect="default" + dialect="default", ) def test_uq_defer_name_convention(self): - u1 = self._fixture(naming_convention={ - "uq": "uq_%(table_name)s_%(column_0_name)s" - }) + u1 = self._fixture( + naming_convention={"uq": "uq_%(table_name)s_%(column_0_name)s"} + ) uq = UniqueConstraint(u1.c.data, name=naming._defer_name("myname")) self.assert_compile( schema.AddConstraint(uq), 'ALTER TABLE "user" ADD CONSTRAINT uq_user_data UNIQUE (data)', - dialect="default" + dialect="default", ) def test_uq_key(self): - u1 = self._fixture(naming_convention={ - "uq": "uq_%(table_name)s_%(column_0_key)s" - }) + u1 = self._fixture( + naming_convention={"uq": "uq_%(table_name)s_%(column_0_key)s"} + ) uq = UniqueConstraint(u1.c.data, u1.c.data2) eq_(uq.name, "uq_user_data") def test_uq_label(self): - u1 = self._fixture(naming_convention={ - "uq": "uq_%(table_name)s_%(column_0_label)s" - }) + u1 = self._fixture( + naming_convention={"uq": "uq_%(table_name)s_%(column_0_label)s"} + ) uq = UniqueConstraint(u1.c.data, u1.c.data2) eq_(uq.name, "uq_user_user_data") def test_uq_allcols_underscore_name(self): - u1 = self._fixture(naming_convention={ - "uq": "uq_%(table_name)s_%(column_0_N_name)s" - }) + u1 = self._fixture( + naming_convention={"uq": "uq_%(table_name)s_%(column_0_N_name)s"} + ) uq = UniqueConstraint(u1.c.data, u1.c.data2, u1.c.data3) eq_(uq.name, "uq_user_data_Data2_Data3") def test_uq_allcols_merged_name(self): - u1 = self._fixture(naming_convention={ - "uq": "uq_%(table_name)s_%(column_0N_name)s" - }) + u1 = self._fixture( + naming_convention={"uq": "uq_%(table_name)s_%(column_0N_name)s"} + ) uq = UniqueConstraint(u1.c.data, u1.c.data2, u1.c.data3) eq_(uq.name, "uq_user_dataData2Data3") def test_uq_allcols_merged_key(self): - u1 = self._fixture(naming_convention={ - "uq": "uq_%(table_name)s_%(column_0N_key)s" - }) + u1 = self._fixture( + naming_convention={"uq": "uq_%(table_name)s_%(column_0N_key)s"} + ) uq = UniqueConstraint(u1.c.data, u1.c.data2, u1.c.data3) eq_(uq.name, "uq_user_datadata2data3") def test_uq_allcols_truncated_name(self): - u1 = self._fixture(naming_convention={ - "uq": "uq_%(table_name)s_%(column_0N_name)s" - }) + u1 = self._fixture( + naming_convention={"uq": "uq_%(table_name)s_%(column_0N_name)s"} + ) uq = UniqueConstraint(u1.c.data, u1.c.data2, u1.c.data3) dialect = default.DefaultDialect() @@ -4255,7 +4471,7 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL): 'ALTER TABLE "user" ADD ' 'CONSTRAINT "uq_user_dataData2Data3" ' 'UNIQUE (data, "Data2", "Data3")', - dialect=dialect + dialect=dialect, ) dialect.max_identifier_length = 15 @@ -4263,188 +4479,222 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL): schema.AddConstraint(uq), 'ALTER TABLE "user" ADD ' 'CONSTRAINT uq_user_2769 UNIQUE (data, "Data2", "Data3")', - dialect=dialect + dialect=dialect, ) def test_fk_allcols_underscore_name(self): - u1 = self._fixture(naming_convention={ - "fk": "fk_%(table_name)s_%(column_0_N_name)s_" - "%(referred_table_name)s_%(referred_column_0_N_name)s"}) + u1 = self._fixture( + naming_convention={ + "fk": "fk_%(table_name)s_%(column_0_N_name)s_" + "%(referred_table_name)s_%(referred_column_0_N_name)s" + } + ) m1 = u1.metadata - a1 = Table('address', m1, - Column('id', Integer, primary_key=True), - Column('UserData', String(30), key="user_data"), - Column('UserData2', String(30), key="user_data2"), - Column('UserData3', String(30), key="user_data3") - ) - fk = ForeignKeyConstraint(['user_data', 'user_data2', 'user_data3'], - ['user.data', 'user.data2', 'user.data3']) + a1 = Table( + "address", + m1, + Column("id", Integer, primary_key=True), + Column("UserData", String(30), key="user_data"), + Column("UserData2", String(30), key="user_data2"), + Column("UserData3", String(30), key="user_data3"), + ) + fk = ForeignKeyConstraint( + ["user_data", "user_data2", "user_data3"], + ["user.data", "user.data2", "user.data3"], + ) a1.append_constraint(fk) self.assert_compile( schema.AddConstraint(fk), - 'ALTER TABLE address ADD CONSTRAINT ' + "ALTER TABLE address ADD CONSTRAINT " '"fk_address_UserData_UserData2_UserData3_user_data_Data2_Data3" ' 'FOREIGN KEY("UserData", "UserData2", "UserData3") ' 'REFERENCES "user" (data, "Data2", "Data3")', - dialect=default.DefaultDialect() + dialect=default.DefaultDialect(), ) def test_fk_allcols_merged_name(self): - u1 = self._fixture(naming_convention={ - "fk": "fk_%(table_name)s_%(column_0N_name)s_" - "%(referred_table_name)s_%(referred_column_0N_name)s"}) + u1 = self._fixture( + naming_convention={ + "fk": "fk_%(table_name)s_%(column_0N_name)s_" + "%(referred_table_name)s_%(referred_column_0N_name)s" + } + ) m1 = u1.metadata - a1 = Table('address', m1, - Column('id', Integer, primary_key=True), - Column('UserData', String(30), key="user_data"), - Column('UserData2', String(30), key="user_data2"), - Column('UserData3', String(30), key="user_data3") - ) - fk = ForeignKeyConstraint(['user_data', 'user_data2', 'user_data3'], - ['user.data', 'user.data2', 'user.data3']) + a1 = Table( + "address", + m1, + Column("id", Integer, primary_key=True), + Column("UserData", String(30), key="user_data"), + Column("UserData2", String(30), key="user_data2"), + Column("UserData3", String(30), key="user_data3"), + ) + fk = ForeignKeyConstraint( + ["user_data", "user_data2", "user_data3"], + ["user.data", "user.data2", "user.data3"], + ) a1.append_constraint(fk) self.assert_compile( schema.AddConstraint(fk), - 'ALTER TABLE address ADD CONSTRAINT ' + "ALTER TABLE address ADD CONSTRAINT " '"fk_address_UserDataUserData2UserData3_user_dataData2Data3" ' 'FOREIGN KEY("UserData", "UserData2", "UserData3") ' 'REFERENCES "user" (data, "Data2", "Data3")', - dialect=default.DefaultDialect() + dialect=default.DefaultDialect(), ) def test_fk_allcols_truncated_name(self): - u1 = self._fixture(naming_convention={ - "fk": "fk_%(table_name)s_%(column_0N_name)s_" - "%(referred_table_name)s_%(referred_column_0N_name)s"}) + u1 = self._fixture( + naming_convention={ + "fk": "fk_%(table_name)s_%(column_0N_name)s_" + "%(referred_table_name)s_%(referred_column_0N_name)s" + } + ) m1 = u1.metadata - a1 = Table('address', m1, - Column('id', Integer, primary_key=True), - Column('UserData', String(30), key="user_data"), - Column('UserData2', String(30), key="user_data2"), - Column('UserData3', String(30), key="user_data3") - ) - fk = ForeignKeyConstraint(['user_data', 'user_data2', 'user_data3'], - ['user.data', 'user.data2', 'user.data3']) + a1 = Table( + "address", + m1, + Column("id", Integer, primary_key=True), + Column("UserData", String(30), key="user_data"), + Column("UserData2", String(30), key="user_data2"), + Column("UserData3", String(30), key="user_data3"), + ) + fk = ForeignKeyConstraint( + ["user_data", "user_data2", "user_data3"], + ["user.data", "user.data2", "user.data3"], + ) a1.append_constraint(fk) dialect = default.DefaultDialect() dialect.max_identifier_length = 15 self.assert_compile( schema.AddConstraint(fk), - 'ALTER TABLE address ADD CONSTRAINT ' - 'fk_addr_f9ff ' + "ALTER TABLE address ADD CONSTRAINT " + "fk_addr_f9ff " 'FOREIGN KEY("UserData", "UserData2", "UserData3") ' 'REFERENCES "user" (data, "Data2", "Data3")', - dialect=dialect + dialect=dialect, ) def test_ix_allcols_truncation(self): - u1 = self._fixture(naming_convention={ - "ix": "ix_%(table_name)s_%(column_0N_name)s" - }) + u1 = self._fixture( + naming_convention={"ix": "ix_%(table_name)s_%(column_0N_name)s"} + ) ix = Index(None, u1.c.data, u1.c.data2, u1.c.data3) dialect = default.DefaultDialect() dialect.max_identifier_length = 15 self.assert_compile( schema.CreateIndex(ix), - 'CREATE INDEX ix_user_2de9 ON ' - '"user" (data, "Data2", "Data3")', - dialect=dialect + "CREATE INDEX ix_user_2de9 ON " '"user" (data, "Data2", "Data3")', + dialect=dialect, ) def test_ix_name(self): - u1 = self._fixture(naming_convention={ - "ix": "ix_%(table_name)s_%(column_0_name)s" - }) + u1 = self._fixture( + naming_convention={"ix": "ix_%(table_name)s_%(column_0_name)s"} + ) ix = Index(None, u1.c.data) eq_(ix.name, "ix_user_data") def test_ck_name_required(self): - u1 = self._fixture(naming_convention={ - "ck": "ck_%(table_name)s_%(constraint_name)s" - }) - ck = CheckConstraint(u1.c.data == 'x', name='mycheck') + u1 = self._fixture( + naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"} + ) + ck = CheckConstraint(u1.c.data == "x", name="mycheck") eq_(ck.name, "ck_user_mycheck") assert_raises_message( exc.InvalidRequestError, r"Naming convention including %\(constraint_name\)s token " "requires that constraint is explicitly named.", - CheckConstraint, u1.c.data == 'x' + CheckConstraint, + u1.c.data == "x", ) def test_ck_name_deferred_required(self): - u1 = self._fixture(naming_convention={ - "ck": "ck_%(table_name)s_%(constraint_name)s" - }) - ck = CheckConstraint(u1.c.data == 'x', name=elements._defer_name(None)) + u1 = self._fixture( + naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"} + ) + ck = CheckConstraint(u1.c.data == "x", name=elements._defer_name(None)) assert_raises_message( exc.InvalidRequestError, r"Naming convention including %\(constraint_name\)s token " "requires that constraint is explicitly named.", - schema.AddConstraint(ck).compile + schema.AddConstraint(ck).compile, ) def test_column_attached_ck_name(self): - m = MetaData(naming_convention={ - "ck": "ck_%(table_name)s_%(constraint_name)s" - }) - ck = CheckConstraint('x > 5', name='x1') - Table('t', m, Column('x', ck)) + m = MetaData( + naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"} + ) + ck = CheckConstraint("x > 5", name="x1") + Table("t", m, Column("x", ck)) eq_(ck.name, "ck_t_x1") def test_table_attached_ck_name(self): - m = MetaData(naming_convention={ - "ck": "ck_%(table_name)s_%(constraint_name)s" - }) - ck = CheckConstraint('x > 5', name='x1') - Table('t', m, Column('x', Integer), ck) + m = MetaData( + naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"} + ) + ck = CheckConstraint("x > 5", name="x1") + Table("t", m, Column("x", Integer), ck) eq_(ck.name, "ck_t_x1") def test_uq_name_already_conv(self): - m = MetaData(naming_convention={ - "uq": "uq_%(constraint_name)s_%(column_0_name)s" - }) + m = MetaData( + naming_convention={ + "uq": "uq_%(constraint_name)s_%(column_0_name)s" + } + ) - t = Table('mytable', m) - uq = UniqueConstraint(name=naming.conv('my_special_key')) + t = Table("mytable", m) + uq = UniqueConstraint(name=naming.conv("my_special_key")) t.append_constraint(uq) eq_(uq.name, "my_special_key") def test_fk_name_schema(self): - u1 = self._fixture(naming_convention={ - "fk": "fk_%(table_name)s_%(column_0_name)s_" - "%(referred_table_name)s_%(referred_column_0_name)s" - }, table_schema="foo") + u1 = self._fixture( + naming_convention={ + "fk": "fk_%(table_name)s_%(column_0_name)s_" + "%(referred_table_name)s_%(referred_column_0_name)s" + }, + table_schema="foo", + ) m1 = u1.metadata - a1 = Table('address', m1, - Column('id', Integer, primary_key=True), - Column('user_id', Integer), - Column('user_version_id', Integer) - ) - fk = ForeignKeyConstraint(['user_id', 'user_version_id'], - ['foo.user.id', 'foo.user.version']) + a1 = Table( + "address", + m1, + Column("id", Integer, primary_key=True), + Column("user_id", Integer), + Column("user_version_id", Integer), + ) + fk = ForeignKeyConstraint( + ["user_id", "user_version_id"], ["foo.user.id", "foo.user.version"] + ) a1.append_constraint(fk) eq_(fk.name, "fk_address_user_id_user_id") def test_fk_attrs(self): - u1 = self._fixture(naming_convention={ - "fk": "fk_%(table_name)s_%(column_0_name)s_" - "%(referred_table_name)s_%(referred_column_0_name)s" - }) + u1 = self._fixture( + naming_convention={ + "fk": "fk_%(table_name)s_%(column_0_name)s_" + "%(referred_table_name)s_%(referred_column_0_name)s" + } + ) m1 = u1.metadata - a1 = Table('address', m1, - Column('id', Integer, primary_key=True), - Column('user_id', Integer), - Column('user_version_id', Integer) - ) - fk = ForeignKeyConstraint(['user_id', 'user_version_id'], - ['user.id', 'user.version']) + a1 = Table( + "address", + m1, + Column("id", Integer, primary_key=True), + Column("user_id", Integer), + Column("user_version_id", Integer), + ) + fk = ForeignKeyConstraint( + ["user_id", "user_version_id"], ["user.id", "user.version"] + ) a1.append_constraint(fk) eq_(fk.name, "fk_address_user_id_user_id") @@ -4452,32 +4702,38 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL): def key_hash(const, table): return "HASH_%s" % table.name - u1 = self._fixture(naming_convention={ - "fk": "fk_%(table_name)s_%(key_hash)s", - "key_hash": key_hash - }) + u1 = self._fixture( + naming_convention={ + "fk": "fk_%(table_name)s_%(key_hash)s", + "key_hash": key_hash, + } + ) m1 = u1.metadata - a1 = Table('address', m1, - Column('id', Integer, primary_key=True), - Column('user_id', Integer), - Column('user_version_id', Integer) - ) - fk = ForeignKeyConstraint(['user_id', 'user_version_id'], - ['user.id', 'user.version']) + a1 = Table( + "address", + m1, + Column("id", Integer, primary_key=True), + Column("user_id", Integer), + Column("user_version_id", Integer), + ) + fk = ForeignKeyConstraint( + ["user_id", "user_version_id"], ["user.id", "user.version"] + ) a1.append_constraint(fk) eq_(fk.name, "fk_address_HASH_address") def test_schematype_ck_name_boolean(self): - m1 = MetaData(naming_convention={ - "ck": "ck_%(table_name)s_%(constraint_name)s"}) + m1 = MetaData( + naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"} + ) - u1 = Table('user', m1, - Column('x', Boolean(name='foo')) - ) + u1 = Table("user", m1, Column("x", Boolean(name="foo"))) # constraint is not hit eq_( - [c for c in u1.constraints - if isinstance(c, CheckConstraint)][0].name, "foo" + [c for c in u1.constraints if isinstance(c, CheckConstraint)][ + 0 + ].name, + "foo", ) # but is hit at compile time self.assert_compile( @@ -4485,20 +4741,21 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL): 'CREATE TABLE "user" (' "x BOOLEAN, " "CONSTRAINT ck_user_foo CHECK (x IN (0, 1))" - ")" + ")", ) def test_schematype_ck_name_boolean_not_on_name(self): - m1 = MetaData(naming_convention={ - "ck": "ck_%(table_name)s_%(column_0_name)s"}) + m1 = MetaData( + naming_convention={"ck": "ck_%(table_name)s_%(column_0_name)s"} + ) - u1 = Table('user', m1, - Column('x', Boolean()) - ) + u1 = Table("user", m1, Column("x", Boolean())) # constraint is not hit eq_( - [c for c in u1.constraints - if isinstance(c, CheckConstraint)][0].name, "_unnamed_" + [c for c in u1.constraints if isinstance(c, CheckConstraint)][ + 0 + ].name, + "_unnamed_", ) # but is hit at compile time self.assert_compile( @@ -4506,19 +4763,20 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL): 'CREATE TABLE "user" (' "x BOOLEAN, " "CONSTRAINT ck_user_x CHECK (x IN (0, 1))" - ")" + ")", ) def test_schematype_ck_name_enum(self): - m1 = MetaData(naming_convention={ - "ck": "ck_%(table_name)s_%(constraint_name)s"}) + m1 = MetaData( + naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"} + ) - u1 = Table('user', m1, - Column('x', Enum('a', 'b', name='foo')) - ) + u1 = Table("user", m1, Column("x", Enum("a", "b", name="foo"))) eq_( - [c for c in u1.constraints - if isinstance(c, CheckConstraint)][0].name, "foo" + [c for c in u1.constraints if isinstance(c, CheckConstraint)][ + 0 + ].name, + "foo", ) # but is hit at compile time self.assert_compile( @@ -4526,19 +4784,22 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL): 'CREATE TABLE "user" (' "x VARCHAR(1), " "CONSTRAINT ck_user_foo CHECK (x IN ('a', 'b'))" - ")" + ")", ) def test_schematype_ck_name_propagate_conv(self): - m1 = MetaData(naming_convention={ - "ck": "ck_%(table_name)s_%(constraint_name)s"}) + m1 = MetaData( + naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"} + ) - u1 = Table('user', m1, - Column('x', Enum('a', 'b', name=naming.conv('foo'))) - ) + u1 = Table( + "user", m1, Column("x", Enum("a", "b", name=naming.conv("foo"))) + ) eq_( - [c for c in u1.constraints - if isinstance(c, CheckConstraint)][0].name, "foo" + [c for c in u1.constraints if isinstance(c, CheckConstraint)][ + 0 + ].name, + "foo", ) # but is hit at compile time self.assert_compile( @@ -4546,62 +4807,60 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL): 'CREATE TABLE "user" (' "x VARCHAR(1), " "CONSTRAINT foo CHECK (x IN ('a', 'b'))" - ")" + ")", ) def test_schematype_ck_name_boolean_no_name(self): - m1 = MetaData(naming_convention={ - "ck": "ck_%(table_name)s_%(constraint_name)s" - }) - - u1 = Table( - 'user', m1, - Column('x', Boolean()) + m1 = MetaData( + naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"} ) + + u1 = Table("user", m1, Column("x", Boolean())) # constraint gets special _defer_none_name eq_( - [c for c in u1.constraints - if isinstance(c, CheckConstraint)][0].name, "_unnamed_" + [c for c in u1.constraints if isinstance(c, CheckConstraint)][ + 0 + ].name, + "_unnamed_", ) # no issue with native boolean self.assert_compile( schema.CreateTable(u1), - 'CREATE TABLE "user" (' - "x BOOLEAN" - ")", - dialect='postgresql' + 'CREATE TABLE "user" (' "x BOOLEAN" ")", + dialect="postgresql", ) assert_raises_message( exc.InvalidRequestError, r"Naming convention including \%\(constraint_name\)s token " r"requires that constraint is explicitly named.", - schema.CreateTable(u1).compile, dialect=default.DefaultDialect() + schema.CreateTable(u1).compile, + dialect=default.DefaultDialect(), ) def test_schematype_no_ck_name_boolean_no_name(self): m1 = MetaData() # no naming convention - u1 = Table( - 'user', m1, - Column('x', Boolean()) - ) + u1 = Table("user", m1, Column("x", Boolean())) # constraint gets special _defer_none_name eq_( - [c for c in u1.constraints - if isinstance(c, CheckConstraint)][0].name, "_unnamed_" + [c for c in u1.constraints if isinstance(c, CheckConstraint)][ + 0 + ].name, + "_unnamed_", ) self.assert_compile( schema.CreateTable(u1), - 'CREATE TABLE "user" (x BOOLEAN, CHECK (x IN (0, 1)))' + 'CREATE TABLE "user" (x BOOLEAN, CHECK (x IN (0, 1)))', ) def test_ck_constraint_redundant_event(self): - u1 = self._fixture(naming_convention={ - "ck": "ck_%(table_name)s_%(constraint_name)s"}) + u1 = self._fixture( + naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"} + ) - ck1 = CheckConstraint(u1.c.version > 3, name='foo') + ck1 = CheckConstraint(u1.c.version > 3, name="foo") u1.append_constraint(ck1) u1.append_constraint(ck1) u1.append_constraint(ck1) @@ -4615,8 +4874,8 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL): eq_(m2.naming_convention, {"pk": "%(table_name)s_pk"}) - t2a = Table('t2', m, Column('id', Integer, primary_key=True)) - t2b = Table('t2', m2, Column('id', Integer, primary_key=True)) + t2a = Table("t2", m, Column("id", Integer, primary_key=True)) + t2b = Table("t2", m2, Column("id", Integer, primary_key=True)) eq_(t2a.primary_key.name, t2b.primary_key.name) eq_(t2b.primary_key.name, "t2_pk") -- cgit v1.2.1