summaryrefslogtreecommitdiff
path: root/test/sql/test_metadata.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/sql/test_metadata.py')
-rw-r--r--test/sql/test_metadata.py3779
1 files changed, 2019 insertions, 1760 deletions
diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py
index f030a7e77..9a28b0c7b 100644
--- a/test/sql/test_metadata.py
+++ b/test/sql/test_metadata.py
@@ -2,12 +2,33 @@ from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import emits_warning
import pickle
-from sqlalchemy import Integer, String, UniqueConstraint, \
- CheckConstraint, ForeignKey, MetaData, Sequence, \
- ForeignKeyConstraint, PrimaryKeyConstraint, ColumnDefault, Index, event,\
- events, Unicode, types as sqltypes, bindparam, \
- Table, Column, Boolean, Enum, func, text, TypeDecorator, \
- BLANK_SCHEMA, ARRAY
+from sqlalchemy import (
+ Integer,
+ String,
+ UniqueConstraint,
+ CheckConstraint,
+ ForeignKey,
+ MetaData,
+ Sequence,
+ ForeignKeyConstraint,
+ PrimaryKeyConstraint,
+ ColumnDefault,
+ Index,
+ event,
+ events,
+ Unicode,
+ types as sqltypes,
+ bindparam,
+ Table,
+ Column,
+ Boolean,
+ Enum,
+ func,
+ text,
+ TypeDecorator,
+ BLANK_SCHEMA,
+ ARRAY,
+)
from sqlalchemy import schema, exc
from sqlalchemy.engine import default
from sqlalchemy.sql import elements, naming
@@ -20,14 +41,14 @@ from contextlib import contextmanager
from sqlalchemy import util
from sqlalchemy.testing import engines
-class MetaDataTest(fixtures.TestBase, ComparesTables):
+class MetaDataTest(fixtures.TestBase, ComparesTables):
def test_metadata_contains(self):
metadata = MetaData()
- t1 = Table('t1', metadata, Column('x', Integer))
- t2 = Table('t2', metadata, Column('x', Integer), schema='foo')
- t3 = Table('t2', MetaData(), Column('x', Integer))
- t4 = Table('t1', MetaData(), Column('x', Integer), schema='foo')
+ t1 = Table("t1", metadata, Column("x", Integer))
+ t2 = Table("t2", metadata, Column("x", Integer), schema="foo")
+ t3 = Table("t2", MetaData(), Column("x", Integer))
+ t4 = Table("t1", MetaData(), Column("x", Integer), schema="foo")
assert "t1" in metadata
assert "foo.t2" in metadata
@@ -40,50 +61,68 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
def test_uninitialized_column_copy(self):
for col in [
- Column('foo', String(), nullable=False),
- Column('baz', String(), unique=True),
+ Column("foo", String(), nullable=False),
+ Column("baz", String(), unique=True),
Column(Integer(), primary_key=True),
- Column('bar', Integer(), Sequence('foo_seq'), primary_key=True,
- key='bar'),
- Column(Integer(), ForeignKey('bat.blah'), doc="this is a col"),
- Column('bar', Integer(), ForeignKey('bat.blah'), primary_key=True,
- key='bar'),
- Column('bar', Integer(), info={'foo': 'bar'}),
+ Column(
+ "bar",
+ Integer(),
+ Sequence("foo_seq"),
+ primary_key=True,
+ key="bar",
+ ),
+ Column(Integer(), ForeignKey("bat.blah"), doc="this is a col"),
+ Column(
+ "bar",
+ Integer(),
+ ForeignKey("bat.blah"),
+ primary_key=True,
+ key="bar",
+ ),
+ Column("bar", Integer(), info={"foo": "bar"}),
]:
c2 = col.copy()
- for attr in ('name', 'type', 'nullable',
- 'primary_key', 'key', 'unique', 'info',
- 'doc'):
+ for attr in (
+ "name",
+ "type",
+ "nullable",
+ "primary_key",
+ "key",
+ "unique",
+ "info",
+ "doc",
+ ):
eq_(getattr(col, attr), getattr(c2, attr))
eq_(len(col.foreign_keys), len(c2.foreign_keys))
if col.default:
- eq_(c2.default.name, 'foo_seq')
+ eq_(c2.default.name, "foo_seq")
for a1, a2 in zip(col.foreign_keys, c2.foreign_keys):
assert a1 is not a2
- eq_(a2._colspec, 'bat.blah')
+ eq_(a2._colspec, "bat.blah")
def test_col_subclass_copy(self):
class MyColumn(schema.Column):
-
def __init__(self, *args, **kw):
- self.widget = kw.pop('widget', None)
+ self.widget = kw.pop("widget", None)
super(MyColumn, self).__init__(*args, **kw)
def copy(self, *arg, **kw):
c = super(MyColumn, self).copy(*arg, **kw)
c.widget = self.widget
return c
- c1 = MyColumn('foo', Integer, widget='x')
+
+ c1 = MyColumn("foo", Integer, widget="x")
c2 = c1.copy()
assert isinstance(c2, MyColumn)
- eq_(c2.widget, 'x')
+ eq_(c2.widget, "x")
def test_uninitialized_column_copy_events(self):
msgs = []
def write(c, t):
msgs.append("attach %s.%s" % (t.name, c.name))
- c1 = Column('foo', String())
+
+ c1 = Column("foo", String())
m = MetaData()
for i in range(3):
cx = c1.copy()
@@ -91,39 +130,39 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
# that listeners will be re-established from the
# natural construction of things.
cx._on_table_attach(write)
- Table('foo%d' % i, m, cx)
- eq_(msgs, ['attach foo0.foo', 'attach foo1.foo', 'attach foo2.foo'])
+ Table("foo%d" % i, m, cx)
+ eq_(msgs, ["attach foo0.foo", "attach foo1.foo", "attach foo2.foo"])
def test_schema_collection_add(self):
metadata = MetaData()
- Table('t1', metadata, Column('x', Integer), schema='foo')
- Table('t2', metadata, Column('x', Integer), schema='bar')
- Table('t3', metadata, Column('x', Integer))
+ Table("t1", metadata, Column("x", Integer), schema="foo")
+ Table("t2", metadata, Column("x", Integer), schema="bar")
+ Table("t3", metadata, Column("x", Integer))
- eq_(metadata._schemas, set(['foo', 'bar']))
+ eq_(metadata._schemas, set(["foo", "bar"]))
eq_(len(metadata.tables), 3)
def test_schema_collection_remove(self):
metadata = MetaData()
- t1 = Table('t1', metadata, Column('x', Integer), schema='foo')
- Table('t2', metadata, Column('x', Integer), schema='bar')
- t3 = Table('t3', metadata, Column('x', Integer), schema='bar')
+ t1 = Table("t1", metadata, Column("x", Integer), schema="foo")
+ Table("t2", metadata, Column("x", Integer), schema="bar")
+ t3 = Table("t3", metadata, Column("x", Integer), schema="bar")
metadata.remove(t3)
- eq_(metadata._schemas, set(['foo', 'bar']))
+ eq_(metadata._schemas, set(["foo", "bar"]))
eq_(len(metadata.tables), 2)
metadata.remove(t1)
- eq_(metadata._schemas, set(['bar']))
+ eq_(metadata._schemas, set(["bar"]))
eq_(len(metadata.tables), 1)
def test_schema_collection_remove_all(self):
metadata = MetaData()
- Table('t1', metadata, Column('x', Integer), schema='foo')
- Table('t2', metadata, Column('x', Integer), schema='bar')
+ Table("t1", metadata, Column("x", Integer), schema="foo")
+ Table("t2", metadata, Column("x", Integer), schema="bar")
metadata.clear()
eq_(metadata._schemas, set())
@@ -132,46 +171,56 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
def test_metadata_tables_immutable(self):
metadata = MetaData()
- Table('t1', metadata, Column('x', Integer))
- assert 't1' in metadata.tables
+ Table("t1", metadata, Column("x", Integer))
+ assert "t1" in metadata.tables
- assert_raises(
- TypeError,
- lambda: metadata.tables.pop('t1')
- )
+ assert_raises(TypeError, lambda: metadata.tables.pop("t1"))
@testing.provide_metadata
def test_dupe_tables(self):
metadata = self.metadata
- Table('table1', metadata,
- Column('col1', Integer, primary_key=True),
- Column('col2', String(20)))
+ Table(
+ "table1",
+ metadata,
+ Column("col1", Integer, primary_key=True),
+ Column("col2", String(20)),
+ )
metadata.create_all()
- Table('table1', metadata, autoload=True)
+ Table("table1", metadata, autoload=True)
def go():
- Table('table1', metadata,
- Column('col1', Integer, primary_key=True),
- Column('col2', String(20)))
+ Table(
+ "table1",
+ metadata,
+ Column("col1", Integer, primary_key=True),
+ Column("col2", String(20)),
+ )
+
assert_raises_message(
tsa.exc.InvalidRequestError,
"Table 'table1' is already defined for this "
"MetaData instance. Specify 'extend_existing=True' "
"to redefine options and columns on an existing "
"Table object.",
- go
+ go,
)
def test_fk_copy(self):
- c1 = Column('foo', Integer)
- c2 = Column('bar', Integer)
+ c1 = Column("foo", Integer)
+ c2 = Column("bar", Integer)
m = MetaData()
- t1 = Table('t', m, c1, c2)
-
- kw = dict(onupdate="X",
- ondelete="Y", use_alter=True, name='f1',
- deferrable="Z", initially="Q", link_to_name=True)
+ t1 = Table("t", m, c1, c2)
+
+ kw = dict(
+ onupdate="X",
+ ondelete="Y",
+ use_alter=True,
+ name="f1",
+ deferrable="Z",
+ initially="Q",
+ link_to_name=True,
+ )
fk1 = ForeignKey(c1, **kw)
fk2 = ForeignKeyConstraint((c1,), (c2,), **kw)
@@ -185,14 +234,18 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
eq_(getattr(fk2c, k), kw[k])
def test_check_constraint_copy(self):
- def r(x): return x
- c = CheckConstraint("foo bar",
- name='name',
- initially=True,
- deferrable=True,
- _create_rule=r)
+ def r(x):
+ return x
+
+ c = CheckConstraint(
+ "foo bar",
+ name="name",
+ initially=True,
+ deferrable=True,
+ _create_rule=r,
+ )
c2 = c.copy()
- eq_(c2.name, 'name')
+ eq_(c2.name, "name")
eq_(str(c2.sqltext), "foo bar")
eq_(c2.initially, True)
eq_(c2.deferrable, True)
@@ -200,152 +253,157 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
def test_col_replace_w_constraint(self):
m = MetaData()
- a = Table('a', m, Column('id', Integer, primary_key=True))
+ a = Table("a", m, Column("id", Integer, primary_key=True))
- aid = Column('a_id', ForeignKey('a.id'))
- b = Table('b', m, aid)
+ aid = Column("a_id", ForeignKey("a.id"))
+ b = Table("b", m, aid)
b.append_column(aid)
assert b.c.a_id.references(a.c.id)
eq_(len(b.constraints), 2)
def test_fk_construct(self):
- c1 = Column('foo', Integer)
- c2 = Column('bar', Integer)
+ c1 = Column("foo", Integer)
+ c2 = Column("bar", Integer)
m = MetaData()
- t1 = Table('t', m, c1, c2)
- fk1 = ForeignKeyConstraint(('foo', ), ('bar', ), table=t1)
+ t1 = Table("t", m, c1, c2)
+ fk1 = ForeignKeyConstraint(("foo",), ("bar",), table=t1)
assert fk1 in t1.constraints
def test_fk_constraint_col_collection_w_table(self):
- c1 = Column('foo', Integer)
- c2 = Column('bar', Integer)
+ c1 = Column("foo", Integer)
+ c2 = Column("bar", Integer)
m = MetaData()
- t1 = Table('t', m, c1, c2)
- fk1 = ForeignKeyConstraint(('foo', ), ('bar', ), table=t1)
+ t1 = Table("t", m, c1, c2)
+ fk1 = ForeignKeyConstraint(("foo",), ("bar",), table=t1)
eq_(dict(fk1.columns), {"foo": c1})
def test_fk_constraint_col_collection_no_table(self):
- fk1 = ForeignKeyConstraint(('foo', 'bat'), ('bar', 'hoho'))
+ fk1 = ForeignKeyConstraint(("foo", "bat"), ("bar", "hoho"))
eq_(dict(fk1.columns), {})
- eq_(fk1.column_keys, ['foo', 'bat'])
- eq_(fk1._col_description, 'foo, bat')
+ eq_(fk1.column_keys, ["foo", "bat"])
+ eq_(fk1._col_description, "foo, bat")
eq_(fk1._elements, {"foo": fk1.elements[0], "bat": fk1.elements[1]})
def test_fk_constraint_col_collection_no_table_real_cols(self):
- c1 = Column('foo', Integer)
- c2 = Column('bar', Integer)
- fk1 = ForeignKeyConstraint((c1, ), (c2, ))
+ c1 = Column("foo", Integer)
+ c2 = Column("bar", Integer)
+ fk1 = ForeignKeyConstraint((c1,), (c2,))
eq_(dict(fk1.columns), {})
- eq_(fk1.column_keys, ['foo'])
- eq_(fk1._col_description, 'foo')
+ eq_(fk1.column_keys, ["foo"])
+ eq_(fk1._col_description, "foo")
eq_(fk1._elements, {"foo": fk1.elements[0]})
def test_fk_constraint_col_collection_added_to_table(self):
- c1 = Column('foo', Integer)
+ c1 = Column("foo", Integer)
m = MetaData()
- fk1 = ForeignKeyConstraint(('foo', ), ('bar', ))
- Table('t', m, c1, fk1)
+ fk1 = ForeignKeyConstraint(("foo",), ("bar",))
+ Table("t", m, c1, fk1)
eq_(dict(fk1.columns), {"foo": c1})
eq_(fk1._elements, {"foo": fk1.elements[0]})
def test_fk_constraint_col_collection_via_fk(self):
- fk = ForeignKey('bar')
- c1 = Column('foo', Integer, fk)
+ fk = ForeignKey("bar")
+ c1 = Column("foo", Integer, fk)
m = MetaData()
- t1 = Table('t', m, c1)
+ t1 = Table("t", m, c1)
fk1 = fk.constraint
- eq_(fk1.column_keys, ['foo'])
+ eq_(fk1.column_keys, ["foo"])
assert fk1 in t1.constraints
- eq_(fk1.column_keys, ['foo'])
+ eq_(fk1.column_keys, ["foo"])
eq_(dict(fk1.columns), {"foo": c1})
eq_(fk1._elements, {"foo": fk})
def test_fk_no_such_parent_col_error(self):
meta = MetaData()
- a = Table('a', meta, Column('a', Integer))
- Table('b', meta, Column('b', Integer))
+ a = Table("a", meta, Column("a", Integer))
+ Table("b", meta, Column("b", Integer))
def go():
- a.append_constraint(
- ForeignKeyConstraint(['x'], ['b.b'])
- )
+ a.append_constraint(ForeignKeyConstraint(["x"], ["b.b"]))
+
assert_raises_message(
exc.ArgumentError,
"Can't create ForeignKeyConstraint on "
"table 'a': no column named 'x' is present.",
- go
+ go,
)
def test_fk_given_non_col(self):
- not_a_col = bindparam('x')
+ not_a_col = bindparam("x")
assert_raises_message(
exc.ArgumentError,
"String, Column, or Column-bound argument expected, got Bind",
- ForeignKey, not_a_col
+ ForeignKey,
+ not_a_col,
)
def test_fk_given_non_col_clauseelem(self):
class Foo(object):
-
def __clause_element__(self):
- return bindparam('x')
+ return bindparam("x")
+
assert_raises_message(
exc.ArgumentError,
"String, Column, or Column-bound argument expected, got Bind",
- ForeignKey, Foo()
+ ForeignKey,
+ Foo(),
)
def test_fk_given_col_non_table(self):
- t = Table('t', MetaData(), Column('x', Integer))
+ t = Table("t", MetaData(), Column("x", Integer))
xa = t.alias().c.x
assert_raises_message(
exc.ArgumentError,
"ForeignKey received Column not bound to a Table, got: .*Alias",
- ForeignKey, xa
+ ForeignKey,
+ xa,
)
def test_fk_given_col_non_table_clauseelem(self):
- t = Table('t', MetaData(), Column('x', Integer))
+ t = Table("t", MetaData(), Column("x", Integer))
class Foo(object):
-
def __clause_element__(self):
return t.alias().c.x
assert_raises_message(
exc.ArgumentError,
"ForeignKey received Column not bound to a Table, got: .*Alias",
- ForeignKey, Foo()
+ ForeignKey,
+ Foo(),
)
def test_fk_no_such_target_col_error_upfront(self):
meta = MetaData()
- a = Table('a', meta, Column('a', Integer))
- Table('b', meta, Column('b', Integer))
+ a = Table("a", meta, Column("a", Integer))
+ Table("b", meta, Column("b", Integer))
- a.append_constraint(ForeignKeyConstraint(['a'], ['b.x']))
+ a.append_constraint(ForeignKeyConstraint(["a"], ["b.x"]))
assert_raises_message(
exc.NoReferencedColumnError,
"Could not initialize target column for ForeignKey 'b.x' on "
"table 'a': table 'b' has no column named 'x'",
- getattr, list(a.foreign_keys)[0], "column"
+ getattr,
+ list(a.foreign_keys)[0],
+ "column",
)
def test_fk_no_such_target_col_error_delayed(self):
meta = MetaData()
- a = Table('a', meta, Column('a', Integer))
- a.append_constraint(
- ForeignKeyConstraint(['a'], ['b.x']))
+ a = Table("a", meta, Column("a", Integer))
+ a.append_constraint(ForeignKeyConstraint(["a"], ["b.x"]))
- b = Table('b', meta, Column('b', Integer))
+ b = Table("b", meta, Column("b", Integer))
assert_raises_message(
exc.NoReferencedColumnError,
"Could not initialize target column for ForeignKey 'b.x' on "
"table 'a': table 'b' has no column named 'x'",
- getattr, list(a.foreign_keys)[0], "column"
+ getattr,
+ list(a.foreign_keys)[0],
+ "column",
)
def test_fk_mismatched_local_remote_cols(self):
@@ -354,177 +412,202 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
exc.ArgumentError,
"ForeignKeyConstraint number of constrained columns must "
"match the number of referenced columns.",
- ForeignKeyConstraint, ['a'], ['b.a', 'b.b']
+ ForeignKeyConstraint,
+ ["a"],
+ ["b.a", "b.b"],
)
assert_raises_message(
exc.ArgumentError,
"ForeignKeyConstraint number of constrained columns "
"must match the number of referenced columns.",
- ForeignKeyConstraint, ['a', 'b'], ['b.a']
+ ForeignKeyConstraint,
+ ["a", "b"],
+ ["b.a"],
)
assert_raises_message(
exc.ArgumentError,
"ForeignKeyConstraint with duplicate source column "
"references are not supported.",
- ForeignKeyConstraint, ['a', 'a'], ['b.a', 'b.b']
+ ForeignKeyConstraint,
+ ["a", "a"],
+ ["b.a", "b.b"],
)
def test_pickle_metadata_sequence_restated(self):
m1 = MetaData()
- Table('a', m1,
- Column('id', Integer, primary_key=True),
- Column('x', Integer, Sequence("x_seq")))
+ Table(
+ "a",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer, Sequence("x_seq")),
+ )
m2 = pickle.loads(pickle.dumps(m1))
s2 = Sequence("x_seq")
- t2 = Table('a', m2,
- Column('id', Integer, primary_key=True),
- Column('x', Integer, s2),
- extend_existing=True)
+ t2 = Table(
+ "a",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer, s2),
+ extend_existing=True,
+ )
- assert m2._sequences['x_seq'] is t2.c.x.default
- assert m2._sequences['x_seq'] is s2
+ assert m2._sequences["x_seq"] is t2.c.x.default
+ assert m2._sequences["x_seq"] is s2
def test_sequence_restated_replaced(self):
"""Test restatement of Sequence replaces."""
m1 = MetaData()
s1 = Sequence("x_seq")
- t = Table('a', m1,
- Column('x', Integer, s1)
- )
- assert m1._sequences['x_seq'] is s1
-
- s2 = Sequence('x_seq')
- Table('a', m1,
- Column('x', Integer, s2),
- extend_existing=True
- )
+ t = Table("a", m1, Column("x", Integer, s1))
+ assert m1._sequences["x_seq"] is s1
+
+ s2 = Sequence("x_seq")
+ Table("a", m1, Column("x", Integer, s2), extend_existing=True)
assert t.c.x.default is s2
- assert m1._sequences['x_seq'] is s2
+ assert m1._sequences["x_seq"] is s2
def test_sequence_attach_to_table(self):
m1 = MetaData()
s1 = Sequence("s")
- t = Table('a', m1, Column('x', Integer, s1))
+ t = Table("a", m1, Column("x", Integer, s1))
assert s1.metadata is m1
def test_sequence_attach_to_existing_table(self):
m1 = MetaData()
s1 = Sequence("s")
- t = Table('a', m1, Column('x', Integer))
+ t = Table("a", m1, Column("x", Integer))
t.c.x._init_items(s1)
assert s1.metadata is m1
def test_pickle_metadata_sequence_implicit(self):
m1 = MetaData()
- Table('a', m1,
- Column('id', Integer, primary_key=True),
- Column('x', Integer, Sequence("x_seq")))
+ Table(
+ "a",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer, Sequence("x_seq")),
+ )
m2 = pickle.loads(pickle.dumps(m1))
- t2 = Table('a', m2, extend_existing=True)
+ t2 = Table("a", m2, extend_existing=True)
- eq_(m2._sequences, {'x_seq': t2.c.x.default})
+ eq_(m2._sequences, {"x_seq": t2.c.x.default})
def test_pickle_metadata_schema(self):
m1 = MetaData()
- Table('a', m1,
- Column('id', Integer, primary_key=True),
- Column('x', Integer, Sequence("x_seq")),
- schema='y')
+ Table(
+ "a",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer, Sequence("x_seq")),
+ schema="y",
+ )
m2 = pickle.loads(pickle.dumps(m1))
- Table('a', m2, schema='y',
- extend_existing=True)
+ Table("a", m2, schema="y", extend_existing=True)
eq_(m2._schemas, m1._schemas)
def test_metadata_schema_arg(self):
- m1 = MetaData(schema='sch1')
- m2 = MetaData(schema='sch1', quote_schema=True)
- m3 = MetaData(schema='sch1', quote_schema=False)
+ m1 = MetaData(schema="sch1")
+ m2 = MetaData(schema="sch1", quote_schema=True)
+ m3 = MetaData(schema="sch1", quote_schema=False)
m4 = MetaData()
- for i, (name, metadata, schema, quote_schema,
- exp_schema, exp_quote_schema) in enumerate([
- ('t1', m1, None, None, 'sch1', None),
- ('t2', m1, 'sch2', None, 'sch2', None),
- ('t3', m1, 'sch2', True, 'sch2', True),
- ('t4', m1, 'sch1', None, 'sch1', None),
- ('t5', m1, BLANK_SCHEMA, None, None, None),
- ('t1', m2, None, None, 'sch1', True),
- ('t2', m2, 'sch2', None, 'sch2', None),
- ('t3', m2, 'sch2', True, 'sch2', True),
- ('t4', m2, 'sch1', None, 'sch1', None),
- ('t1', m3, None, None, 'sch1', False),
- ('t2', m3, 'sch2', None, 'sch2', None),
- ('t3', m3, 'sch2', True, 'sch2', True),
- ('t4', m3, 'sch1', None, 'sch1', None),
- ('t1', m4, None, None, None, None),
- ('t2', m4, 'sch2', None, 'sch2', None),
- ('t3', m4, 'sch2', True, 'sch2', True),
- ('t4', m4, 'sch1', None, 'sch1', None),
- ('t5', m4, BLANK_SCHEMA, None, None, None),
- ]):
+ for (
+ i,
+ (
+ name,
+ metadata,
+ schema,
+ quote_schema,
+ exp_schema,
+ exp_quote_schema,
+ ),
+ ) in enumerate(
+ [
+ ("t1", m1, None, None, "sch1", None),
+ ("t2", m1, "sch2", None, "sch2", None),
+ ("t3", m1, "sch2", True, "sch2", True),
+ ("t4", m1, "sch1", None, "sch1", None),
+ ("t5", m1, BLANK_SCHEMA, None, None, None),
+ ("t1", m2, None, None, "sch1", True),
+ ("t2", m2, "sch2", None, "sch2", None),
+ ("t3", m2, "sch2", True, "sch2", True),
+ ("t4", m2, "sch1", None, "sch1", None),
+ ("t1", m3, None, None, "sch1", False),
+ ("t2", m3, "sch2", None, "sch2", None),
+ ("t3", m3, "sch2", True, "sch2", True),
+ ("t4", m3, "sch1", None, "sch1", None),
+ ("t1", m4, None, None, None, None),
+ ("t2", m4, "sch2", None, "sch2", None),
+ ("t3", m4, "sch2", True, "sch2", True),
+ ("t4", m4, "sch1", None, "sch1", None),
+ ("t5", m4, BLANK_SCHEMA, None, None, None),
+ ]
+ ):
kw = {}
if schema is not None:
- kw['schema'] = schema
+ kw["schema"] = schema
if quote_schema is not None:
- kw['quote_schema'] = quote_schema
+ kw["quote_schema"] = quote_schema
t = Table(name, metadata, **kw)
eq_(t.schema, exp_schema, "test %d, table schema" % i)
- eq_(t.schema.quote if t.schema is not None else None,
+ eq_(
+ t.schema.quote if t.schema is not None else None,
exp_quote_schema,
- "test %d, table quote_schema" % i)
+ "test %d, table quote_schema" % i,
+ )
seq = Sequence(name, metadata=metadata, **kw)
eq_(seq.schema, exp_schema, "test %d, seq schema" % i)
- eq_(seq.schema.quote if seq.schema is not None else None,
+ eq_(
+ seq.schema.quote if seq.schema is not None else None,
exp_quote_schema,
- "test %d, seq quote_schema" % i)
+ "test %d, seq quote_schema" % i,
+ )
def test_manual_dependencies(self):
meta = MetaData()
- a = Table('a', meta, Column('foo', Integer))
- b = Table('b', meta, Column('foo', Integer))
- c = Table('c', meta, Column('foo', Integer))
- d = Table('d', meta, Column('foo', Integer))
- e = Table('e', meta, Column('foo', Integer))
+ a = Table("a", meta, Column("foo", Integer))
+ b = Table("b", meta, Column("foo", Integer))
+ c = Table("c", meta, Column("foo", Integer))
+ d = Table("d", meta, Column("foo", Integer))
+ e = Table("e", meta, Column("foo", Integer))
e.add_is_dependent_on(c)
a.add_is_dependent_on(b)
b.add_is_dependent_on(d)
e.add_is_dependent_on(b)
c.add_is_dependent_on(a)
- eq_(
- meta.sorted_tables,
- [d, b, a, c, e]
- )
+ eq_(meta.sorted_tables, [d, b, a, c, e])
def test_deterministic_order(self):
meta = MetaData()
- a = Table('a', meta, Column('foo', Integer))
- b = Table('b', meta, Column('foo', Integer))
- c = Table('c', meta, Column('foo', Integer))
- d = Table('d', meta, Column('foo', Integer))
- e = Table('e', meta, Column('foo', Integer))
+ a = Table("a", meta, Column("foo", Integer))
+ b = Table("b", meta, Column("foo", Integer))
+ c = Table("c", meta, Column("foo", Integer))
+ d = Table("d", meta, Column("foo", Integer))
+ e = Table("e", meta, Column("foo", Integer))
e.add_is_dependent_on(c)
a.add_is_dependent_on(b)
- eq_(
- meta.sorted_tables,
- [b, c, d, a, e]
- )
+ eq_(meta.sorted_tables, [b, c, d, a, e])
def test_nonexistent(self):
- assert_raises(tsa.exc.NoSuchTableError, Table,
- 'fake_table',
- MetaData(testing.db), autoload=True)
+ assert_raises(
+ tsa.exc.NoSuchTableError,
+ Table,
+ "fake_table",
+ MetaData(testing.db),
+ autoload=True,
+ )
def test_assorted_repr(self):
t1 = Table("foo", MetaData(), Column("x", Integer))
@@ -532,89 +615,74 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
ck = schema.CheckConstraint("x > y", name="someconstraint")
for const, exp in (
- (Sequence("my_seq"),
- "Sequence('my_seq')"),
- (Sequence("my_seq", start=5),
- "Sequence('my_seq', start=5)"),
- (Column("foo", Integer),
- "Column('foo', Integer(), table=None)"),
- (Table("bar", MetaData(), Column("x", String)),
+ (Sequence("my_seq"), "Sequence('my_seq')"),
+ (Sequence("my_seq", start=5), "Sequence('my_seq', start=5)"),
+ (Column("foo", Integer), "Column('foo', Integer(), table=None)"),
+ (
+ Table("bar", MetaData(), Column("x", String)),
"Table('bar', MetaData(bind=None), "
- "Column('x', String(), table=<bar>), schema=None)"),
- (schema.DefaultGenerator(for_update=True),
- "DefaultGenerator(for_update=True)"),
+ "Column('x', String(), table=<bar>), schema=None)",
+ ),
+ (
+ schema.DefaultGenerator(for_update=True),
+ "DefaultGenerator(for_update=True)",
+ ),
(schema.Index("bar", "c"), "Index('bar', 'c')"),
(i1, "Index('bar', Column('x', Integer(), table=<foo>))"),
(schema.FetchedValue(), "FetchedValue()"),
- (ck,
- "CheckConstraint("
- "%s"
- ", name='someconstraint')" % repr(ck.sqltext)),
- (ColumnDefault(('foo', 'bar')), "ColumnDefault(('foo', 'bar'))")
+ (
+ ck,
+ "CheckConstraint("
+ "%s"
+ ", name='someconstraint')" % repr(ck.sqltext),
+ ),
+ (ColumnDefault(("foo", "bar")), "ColumnDefault(('foo', 'bar'))"),
):
- eq_(
- repr(const),
- exp
- )
+ eq_(repr(const), exp)
class ToMetaDataTest(fixtures.TestBase, ComparesTables):
-
@testing.requires.check_constraints
def test_copy(self):
from sqlalchemy.testing.schema import Table
+
meta = MetaData()
table = Table(
- 'mytable',
+ "mytable",
meta,
+ Column("myid", Integer, Sequence("foo_id_seq"), primary_key=True),
+ Column("name", String(40), nullable=True),
Column(
- 'myid',
- Integer,
- Sequence('foo_id_seq'),
- primary_key=True),
- Column(
- 'name',
- String(40),
- nullable=True),
- Column(
- 'foo',
+ "foo",
String(40),
nullable=False,
- server_default='x',
- server_onupdate='q'),
+ server_default="x",
+ server_onupdate="q",
+ ),
Column(
- 'bar',
- String(40),
- nullable=False,
- default='y',
- onupdate='z'),
+ "bar", String(40), nullable=False, default="y", onupdate="z"
+ ),
Column(
- 'description',
- String(30),
- CheckConstraint("description='hi'")),
- UniqueConstraint('name'),
- test_needs_fk=True)
+ "description", String(30), CheckConstraint("description='hi'")
+ ),
+ UniqueConstraint("name"),
+ test_needs_fk=True,
+ )
table2 = Table(
- 'othertable',
+ "othertable",
meta,
- Column(
- 'id',
- Integer,
- Sequence('foo_seq'),
- primary_key=True),
- Column(
- 'myid',
- Integer,
- ForeignKey('mytable.myid'),
- ),
- test_needs_fk=True)
+ Column("id", Integer, Sequence("foo_seq"), primary_key=True),
+ Column("myid", Integer, ForeignKey("mytable.myid")),
+ test_needs_fk=True,
+ )
table3 = Table(
- 'has_comments', meta,
- Column('foo', Integer, comment='some column'),
- comment='table comment'
+ "has_comments",
+ meta,
+ Column("foo", Integer, comment="some column"),
+ comment="table comment",
)
def test_to_metadata():
@@ -630,47 +698,61 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables):
assert meta2.bind is None
pickle.loads(pickle.dumps(meta2))
return (
- meta2.tables['mytable'],
- meta2.tables['othertable'], meta2.tables['has_comments'])
+ meta2.tables["mytable"],
+ meta2.tables["othertable"],
+ meta2.tables["has_comments"],
+ )
def test_pickle_via_reflect():
# this is the most common use case, pickling the results of a
# database reflection
meta2 = MetaData(bind=testing.db)
- t1 = Table('mytable', meta2, autoload=True)
- Table('othertable', meta2, autoload=True)
- Table('has_comments', meta2, autoload=True)
+ t1 = Table("mytable", meta2, autoload=True)
+ Table("othertable", meta2, autoload=True)
+ Table("has_comments", meta2, autoload=True)
meta3 = pickle.loads(pickle.dumps(meta2))
assert meta3.bind is None
- assert meta3.tables['mytable'] is not t1
+ assert meta3.tables["mytable"] is not t1
return (
- meta3.tables['mytable'], meta3.tables['othertable'],
- meta3.tables['has_comments']
+ meta3.tables["mytable"],
+ meta3.tables["othertable"],
+ meta3.tables["has_comments"],
)
meta.create_all(testing.db)
try:
- for test, has_constraints, reflect in \
- (test_to_metadata, True, False), \
- (test_pickle, True, False), \
- (test_pickle_via_reflect, False, True):
+ for test, has_constraints, reflect in (
+ (test_to_metadata, True, False),
+ (test_pickle, True, False),
+ (test_pickle_via_reflect, False, True),
+ ):
table_c, table2_c, table3_c = test()
self.assert_tables_equal(table, table_c)
self.assert_tables_equal(table2, table2_c)
assert table is not table_c
assert table.primary_key is not table_c.primary_key
- assert list(table2_c.c.myid.foreign_keys)[0].column \
+ assert (
+ list(table2_c.c.myid.foreign_keys)[0].column
is table_c.c.myid
- assert list(table2_c.c.myid.foreign_keys)[0].column \
+ )
+ assert (
+ list(table2_c.c.myid.foreign_keys)[0].column
is not table.c.myid
- assert 'x' in str(table_c.c.foo.server_default.arg)
+ )
+ assert "x" in str(table_c.c.foo.server_default.arg)
if not reflect:
assert isinstance(table_c.c.myid.default, Sequence)
- assert str(table_c.c.foo.server_onupdate.arg) == 'q'
- assert str(table_c.c.bar.default.arg) == 'y'
- assert getattr(table_c.c.bar.onupdate.arg, 'arg',
- table_c.c.bar.onupdate.arg) == 'z'
+ assert str(table_c.c.foo.server_onupdate.arg) == "q"
+ assert str(table_c.c.bar.default.arg) == "y"
+ assert (
+ getattr(
+ table_c.c.bar.onupdate.arg,
+ "arg",
+ table_c.c.bar.onupdate.arg,
+ )
+ == "z"
+ )
assert isinstance(table2_c.c.id.default, Sequence)
# constraints don't get reflected for any dialect right
@@ -701,8 +783,8 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables):
def test_col_key_fk_parent(self):
# test #2643
m1 = MetaData()
- a = Table('a', m1, Column('x', Integer))
- b = Table('b', m1, Column('x', Integer, ForeignKey('a.x'), key='y'))
+ a = Table("a", m1, Column("x", Integer))
+ b = Table("b", m1, Column("x", Integer, ForeignKey("a.x"), key="y"))
assert b.c.y.references(a.c.x)
m2 = MetaData()
@@ -713,120 +795,141 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables):
def test_change_schema(self):
meta = MetaData()
- table = Table('mytable', meta,
- Column('myid', Integer, primary_key=True),
- Column('name', String(40), nullable=True),
- Column('description', String(30),
- CheckConstraint("description='hi'")),
- UniqueConstraint('name'),
- )
+ table = Table(
+ "mytable",
+ meta,
+ Column("myid", Integer, primary_key=True),
+ Column("name", String(40), nullable=True),
+ Column(
+ "description", String(30), CheckConstraint("description='hi'")
+ ),
+ UniqueConstraint("name"),
+ )
- table2 = Table('othertable', meta,
- Column('id', Integer, primary_key=True),
- Column('myid', Integer, ForeignKey('mytable.myid')),
- )
+ table2 = Table(
+ "othertable",
+ meta,
+ Column("id", Integer, primary_key=True),
+ Column("myid", Integer, ForeignKey("mytable.myid")),
+ )
meta2 = MetaData()
- table_c = table.tometadata(meta2, schema='someschema')
- table2_c = table2.tometadata(meta2, schema='someschema')
+ table_c = table.tometadata(meta2, schema="someschema")
+ table2_c = table2.tometadata(meta2, schema="someschema")
- eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid
- == table2_c.c.myid))
- eq_(str(table_c.join(table2_c).onclause),
- 'someschema.mytable.myid = someschema.othertable.myid')
+ eq_(
+ str(table_c.join(table2_c).onclause),
+ str(table_c.c.myid == table2_c.c.myid),
+ )
+ eq_(
+ str(table_c.join(table2_c).onclause),
+ "someschema.mytable.myid = someschema.othertable.myid",
+ )
def test_retain_table_schema(self):
meta = MetaData()
- table = Table('mytable', meta,
- Column('myid', Integer, primary_key=True),
- Column('name', String(40), nullable=True),
- Column('description', String(30),
- CheckConstraint("description='hi'")),
- UniqueConstraint('name'),
- schema='myschema',
- )
+ table = Table(
+ "mytable",
+ meta,
+ Column("myid", Integer, primary_key=True),
+ Column("name", String(40), nullable=True),
+ Column(
+ "description", String(30), CheckConstraint("description='hi'")
+ ),
+ UniqueConstraint("name"),
+ schema="myschema",
+ )
table2 = Table(
- 'othertable',
+ "othertable",
meta,
- Column(
- 'id',
- Integer,
- primary_key=True),
- Column(
- 'myid',
- Integer,
- ForeignKey('myschema.mytable.myid')),
- schema='myschema',
+ Column("id", Integer, primary_key=True),
+ Column("myid", Integer, ForeignKey("myschema.mytable.myid")),
+ schema="myschema",
)
meta2 = MetaData()
table_c = table.tometadata(meta2)
table2_c = table2.tometadata(meta2)
- eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid
- == table2_c.c.myid))
- eq_(str(table_c.join(table2_c).onclause),
- 'myschema.mytable.myid = myschema.othertable.myid')
+ eq_(
+ str(table_c.join(table2_c).onclause),
+ str(table_c.c.myid == table2_c.c.myid),
+ )
+ eq_(
+ str(table_c.join(table2_c).onclause),
+ "myschema.mytable.myid = myschema.othertable.myid",
+ )
def test_change_name_retain_metadata(self):
meta = MetaData()
- table = Table('mytable', meta,
- Column('myid', Integer, primary_key=True),
- Column('name', String(40), nullable=True),
- Column('description', String(30),
- CheckConstraint("description='hi'")),
- UniqueConstraint('name'),
- schema='myschema',
- )
+ table = Table(
+ "mytable",
+ meta,
+ Column("myid", Integer, primary_key=True),
+ Column("name", String(40), nullable=True),
+ Column(
+ "description", String(30), CheckConstraint("description='hi'")
+ ),
+ UniqueConstraint("name"),
+ schema="myschema",
+ )
- table2 = table.tometadata(table.metadata, name='newtable')
- table3 = table.tometadata(table.metadata, schema='newschema',
- name='newtable')
+ table2 = table.tometadata(table.metadata, name="newtable")
+ table3 = table.tometadata(
+ table.metadata, schema="newschema", name="newtable"
+ )
assert table.metadata is table2.metadata
assert table.metadata is table3.metadata
- eq_((table.name, table2.name, table3.name),
- ('mytable', 'newtable', 'newtable'))
- eq_((table.key, table2.key, table3.key),
- ('myschema.mytable', 'myschema.newtable', 'newschema.newtable'))
+ eq_(
+ (table.name, table2.name, table3.name),
+ ("mytable", "newtable", "newtable"),
+ )
+ eq_(
+ (table.key, table2.key, table3.key),
+ ("myschema.mytable", "myschema.newtable", "newschema.newtable"),
+ )
def test_change_name_change_metadata(self):
meta = MetaData()
meta2 = MetaData()
- table = Table('mytable', meta,
- Column('myid', Integer, primary_key=True),
- Column('name', String(40), nullable=True),
- Column('description', String(30),
- CheckConstraint("description='hi'")),
- UniqueConstraint('name'),
- schema='myschema',
- )
+ table = Table(
+ "mytable",
+ meta,
+ Column("myid", Integer, primary_key=True),
+ Column("name", String(40), nullable=True),
+ Column(
+ "description", String(30), CheckConstraint("description='hi'")
+ ),
+ UniqueConstraint("name"),
+ schema="myschema",
+ )
- table2 = table.tometadata(meta2, name='newtable')
+ table2 = table.tometadata(meta2, name="newtable")
assert table.metadata is not table2.metadata
- eq_((table.name, table2.name),
- ('mytable', 'newtable'))
- eq_((table.key, table2.key),
- ('myschema.mytable', 'myschema.newtable'))
+ eq_((table.name, table2.name), ("mytable", "newtable"))
+ eq_((table.key, table2.key), ("myschema.mytable", "myschema.newtable"))
def test_change_name_selfref_fk_moves(self):
meta = MetaData()
- referenced = Table('ref', meta,
- Column('id', Integer, primary_key=True),
- )
- table = Table('mytable', meta,
- Column('id', Integer, primary_key=True),
- Column('parent_id', ForeignKey('mytable.id')),
- Column('ref_id', ForeignKey('ref.id'))
- )
+ referenced = Table(
+ "ref", meta, Column("id", Integer, primary_key=True)
+ )
+ table = Table(
+ "mytable",
+ meta,
+ Column("id", Integer, primary_key=True),
+ Column("parent_id", ForeignKey("mytable.id")),
+ Column("ref_id", ForeignKey("ref.id")),
+ )
- table2 = table.tometadata(table.metadata, name='newtable')
+ table2 = table.tometadata(table.metadata, name="newtable")
assert table.metadata is table2.metadata
assert table2.c.ref_id.references(referenced.c.id)
assert table2.c.parent_id.references(table2.c.id)
@@ -834,18 +937,21 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables):
def test_change_name_selfref_fk_moves_w_schema(self):
meta = MetaData()
- referenced = Table('ref', meta,
- Column('id', Integer, primary_key=True),
- )
- table = Table('mytable', meta,
- Column('id', Integer, primary_key=True),
- Column('parent_id', ForeignKey('mytable.id')),
- Column('ref_id', ForeignKey('ref.id'))
- )
+ referenced = Table(
+ "ref", meta, Column("id", Integer, primary_key=True)
+ )
+ table = Table(
+ "mytable",
+ meta,
+ Column("id", Integer, primary_key=True),
+ Column("parent_id", ForeignKey("mytable.id")),
+ Column("ref_id", ForeignKey("ref.id")),
+ )
table2 = table.tometadata(
- table.metadata, name='newtable', schema='newschema')
- ref2 = referenced.tometadata(table.metadata, schema='newschema')
+ table.metadata, name="newtable", schema="newschema"
+ )
+ ref2 = referenced.tometadata(table.metadata, schema="newschema")
assert table.metadata is table2.metadata
assert table2.c.ref_id.references(ref2.c.id)
assert table2.c.parent_id.references(table2.c.id)
@@ -854,8 +960,9 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables):
m2 = MetaData()
existing_schema = t2.schema
if schema:
- t2c = t2.tometadata(m2, schema=schema,
- referred_schema_fn=referred_schema_fn)
+ t2c = t2.tometadata(
+ m2, schema=schema, referred_schema_fn=referred_schema_fn
+ )
eq_(t2c.schema, schema)
else:
t2c = t2.tometadata(m2, referred_schema_fn=referred_schema_fn)
@@ -865,151 +972,164 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables):
def test_fk_has_schema_string_retain_schema(self):
m = MetaData()
- t2 = Table('t2', m, Column('y', Integer, ForeignKey('q.t1.x')))
+ t2 = Table("t2", m, Column("y", Integer, ForeignKey("q.t1.x")))
self._assert_fk(t2, None, "q.t1.x")
- Table('t1', m, Column('x', Integer), schema='q')
+ Table("t1", m, Column("x", Integer), schema="q")
self._assert_fk(t2, None, "q.t1.x")
def test_fk_has_schema_string_new_schema(self):
m = MetaData()
- t2 = Table('t2', m, Column('y', Integer, ForeignKey('q.t1.x')))
+ t2 = Table("t2", m, Column("y", Integer, ForeignKey("q.t1.x")))
self._assert_fk(t2, "z", "q.t1.x")
- Table('t1', m, Column('x', Integer), schema='q')
+ Table("t1", m, Column("x", Integer), schema="q")
self._assert_fk(t2, "z", "q.t1.x")
def test_fk_has_schema_col_retain_schema(self):
m = MetaData()
- t1 = Table('t1', m, Column('x', Integer), schema='q')
- t2 = Table('t2', m, Column('y', Integer, ForeignKey(t1.c.x)))
+ t1 = Table("t1", m, Column("x", Integer), schema="q")
+ t2 = Table("t2", m, Column("y", Integer, ForeignKey(t1.c.x)))
self._assert_fk(t2, "z", "q.t1.x")
def test_fk_has_schema_col_new_schema(self):
m = MetaData()
- t1 = Table('t1', m, Column('x', Integer), schema='q')
- t2 = Table('t2', m, Column('y', Integer, ForeignKey(t1.c.x)))
+ t1 = Table("t1", m, Column("x", Integer), schema="q")
+ t2 = Table("t2", m, Column("y", Integer, ForeignKey(t1.c.x)))
self._assert_fk(t2, "z", "q.t1.x")
def test_fk_and_referent_has_same_schema_string_retain_schema(self):
m = MetaData()
- t2 = Table('t2', m, Column('y', Integer,
- ForeignKey('q.t1.x')), schema="q")
+ t2 = Table(
+ "t2", m, Column("y", Integer, ForeignKey("q.t1.x")), schema="q"
+ )
self._assert_fk(t2, None, "q.t1.x")
- Table('t1', m, Column('x', Integer), schema='q')
+ Table("t1", m, Column("x", Integer), schema="q")
self._assert_fk(t2, None, "q.t1.x")
def test_fk_and_referent_has_same_schema_string_new_schema(self):
m = MetaData()
- t2 = Table('t2', m, Column('y', Integer,
- ForeignKey('q.t1.x')), schema="q")
+ t2 = Table(
+ "t2", m, Column("y", Integer, ForeignKey("q.t1.x")), schema="q"
+ )
self._assert_fk(t2, "z", "z.t1.x")
- Table('t1', m, Column('x', Integer), schema='q')
+ Table("t1", m, Column("x", Integer), schema="q")
self._assert_fk(t2, "z", "z.t1.x")
def test_fk_and_referent_has_same_schema_col_retain_schema(self):
m = MetaData()
- t1 = Table('t1', m, Column('x', Integer), schema='q')
- t2 = Table('t2', m, Column('y', Integer,
- ForeignKey(t1.c.x)), schema='q')
+ t1 = Table("t1", m, Column("x", Integer), schema="q")
+ t2 = Table(
+ "t2", m, Column("y", Integer, ForeignKey(t1.c.x)), schema="q"
+ )
self._assert_fk(t2, None, "q.t1.x")
def test_fk_and_referent_has_same_schema_col_new_schema(self):
m = MetaData()
- t1 = Table('t1', m, Column('x', Integer), schema='q')
- t2 = Table('t2', m, Column('y', Integer,
- ForeignKey(t1.c.x)), schema='q')
- self._assert_fk(t2, 'z', "z.t1.x")
+ t1 = Table("t1", m, Column("x", Integer), schema="q")
+ t2 = Table(
+ "t2", m, Column("y", Integer, ForeignKey(t1.c.x)), schema="q"
+ )
+ self._assert_fk(t2, "z", "z.t1.x")
def test_fk_and_referent_has_diff_schema_string_retain_schema(self):
m = MetaData()
- t2 = Table('t2', m, Column('y', Integer,
- ForeignKey('p.t1.x')), schema="q")
+ t2 = Table(
+ "t2", m, Column("y", Integer, ForeignKey("p.t1.x")), schema="q"
+ )
self._assert_fk(t2, None, "p.t1.x")
- Table('t1', m, Column('x', Integer), schema='p')
+ Table("t1", m, Column("x", Integer), schema="p")
self._assert_fk(t2, None, "p.t1.x")
def test_fk_and_referent_has_diff_schema_string_new_schema(self):
m = MetaData()
- t2 = Table('t2', m, Column('y', Integer,
- ForeignKey('p.t1.x')), schema="q")
+ t2 = Table(
+ "t2", m, Column("y", Integer, ForeignKey("p.t1.x")), schema="q"
+ )
self._assert_fk(t2, "z", "p.t1.x")
- Table('t1', m, Column('x', Integer), schema='p')
+ Table("t1", m, Column("x", Integer), schema="p")
self._assert_fk(t2, "z", "p.t1.x")
def test_fk_and_referent_has_diff_schema_col_retain_schema(self):
m = MetaData()
- t1 = Table('t1', m, Column('x', Integer), schema='p')
- t2 = Table('t2', m, Column('y', Integer,
- ForeignKey(t1.c.x)), schema='q')
+ t1 = Table("t1", m, Column("x", Integer), schema="p")
+ t2 = Table(
+ "t2", m, Column("y", Integer, ForeignKey(t1.c.x)), schema="q"
+ )
self._assert_fk(t2, None, "p.t1.x")
def test_fk_and_referent_has_diff_schema_col_new_schema(self):
m = MetaData()
- t1 = Table('t1', m, Column('x', Integer), schema='p')
- t2 = Table('t2', m, Column('y', Integer,
- ForeignKey(t1.c.x)), schema='q')
- self._assert_fk(t2, 'z', "p.t1.x")
+ t1 = Table("t1", m, Column("x", Integer), schema="p")
+ t2 = Table(
+ "t2", m, Column("y", Integer, ForeignKey(t1.c.x)), schema="q"
+ )
+ self._assert_fk(t2, "z", "p.t1.x")
def test_fk_custom_system(self):
m = MetaData()
- t2 = Table('t2', m, Column('y', Integer,
- ForeignKey('p.t1.x')), schema='q')
+ t2 = Table(
+ "t2", m, Column("y", Integer, ForeignKey("p.t1.x")), schema="q"
+ )
def ref_fn(table, to_schema, constraint, referred_schema):
assert table is t2
eq_(to_schema, "z")
eq_(referred_schema, "p")
return "h"
- self._assert_fk(t2, 'z', "h.t1.x", referred_schema_fn=ref_fn)
+
+ self._assert_fk(t2, "z", "h.t1.x", referred_schema_fn=ref_fn)
def test_copy_info(self):
m = MetaData()
- fk = ForeignKey('t2.id')
- c = Column('c', Integer, fk)
- ck = CheckConstraint('c > 5')
- t = Table('t', m, c, ck)
-
- m.info['minfo'] = True
- fk.info['fkinfo'] = True
- c.info['cinfo'] = True
- ck.info['ckinfo'] = True
- t.info['tinfo'] = True
- t.primary_key.info['pkinfo'] = True
- fkc = [const for const in t.constraints if
- isinstance(const, ForeignKeyConstraint)][0]
- fkc.info['fkcinfo'] = True
+ fk = ForeignKey("t2.id")
+ c = Column("c", Integer, fk)
+ ck = CheckConstraint("c > 5")
+ t = Table("t", m, c, ck)
+
+ m.info["minfo"] = True
+ fk.info["fkinfo"] = True
+ c.info["cinfo"] = True
+ ck.info["ckinfo"] = True
+ t.info["tinfo"] = True
+ t.primary_key.info["pkinfo"] = True
+ fkc = [
+ const
+ for const in t.constraints
+ if isinstance(const, ForeignKeyConstraint)
+ ][0]
+ fkc.info["fkcinfo"] = True
m2 = MetaData()
t2 = t.tometadata(m2)
- m.info['minfo'] = False
- fk.info['fkinfo'] = False
- c.info['cinfo'] = False
- ck.info['ckinfo'] = False
- t.primary_key.info['pkinfo'] = False
- fkc.info['fkcinfo'] = False
+ m.info["minfo"] = False
+ fk.info["fkinfo"] = False
+ c.info["cinfo"] = False
+ ck.info["ckinfo"] = False
+ t.primary_key.info["pkinfo"] = False
+ fkc.info["fkcinfo"] = False
eq_(m2.info, {})
eq_(t2.info, {"tinfo": True})
@@ -1017,21 +1137,29 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables):
eq_(list(t2.c.c.foreign_keys)[0].info, {"fkinfo": True})
eq_(t2.primary_key.info, {"pkinfo": True})
- fkc2 = [const for const in t2.constraints
- if isinstance(const, ForeignKeyConstraint)][0]
+ fkc2 = [
+ const
+ for const in t2.constraints
+ if isinstance(const, ForeignKeyConstraint)
+ ][0]
eq_(fkc2.info, {"fkcinfo": True})
- ck2 = [const for const in
- t2.constraints if isinstance(const, CheckConstraint)][0]
+ ck2 = [
+ const
+ for const in t2.constraints
+ if isinstance(const, CheckConstraint)
+ ][0]
eq_(ck2.info, {"ckinfo": True})
def test_dialect_kwargs(self):
meta = MetaData()
- table = Table('mytable', meta,
- Column('myid', Integer, primary_key=True),
- mysql_engine='InnoDB',
- )
+ table = Table(
+ "mytable",
+ meta,
+ Column("myid", Integer, primary_key=True),
+ mysql_engine="InnoDB",
+ )
meta2 = MetaData()
table_c = table.tometadata(meta2)
@@ -1043,72 +1171,82 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables):
def test_indexes(self):
meta = MetaData()
- table = Table('mytable', meta,
- Column('id', Integer, primary_key=True),
- Column('data1', Integer, index=True),
- Column('data2', Integer),
- Index('text', text('data1 + 1')),
- )
- Index('multi', table.c.data1, table.c.data2)
- Index('func', func.abs(table.c.data1))
- Index('multi-func', table.c.data1, func.abs(table.c.data2))
+ table = Table(
+ "mytable",
+ meta,
+ Column("id", Integer, primary_key=True),
+ Column("data1", Integer, index=True),
+ Column("data2", Integer),
+ Index("text", text("data1 + 1")),
+ )
+ Index("multi", table.c.data1, table.c.data2)
+ Index("func", func.abs(table.c.data1))
+ Index("multi-func", table.c.data1, func.abs(table.c.data2))
meta2 = MetaData()
table_c = table.tometadata(meta2)
def _get_key(i):
- return [i.name, i.unique] + \
- sorted(i.kwargs.items()) + \
- [str(col) for col in i.expressions]
+ return (
+ [i.name, i.unique]
+ + sorted(i.kwargs.items())
+ + [str(col) for col in i.expressions]
+ )
eq_(
sorted([_get_key(i) for i in table.indexes]),
- sorted([_get_key(i) for i in table_c.indexes])
+ sorted([_get_key(i) for i in table_c.indexes]),
)
def test_indexes_with_col_redefine(self):
meta = MetaData()
- table = Table('mytable', meta,
- Column('id', Integer, primary_key=True),
- Column('data1', Integer),
- Column('data2', Integer),
- Index('text', text('data1 + 1')),
- )
- Index('multi', table.c.data1, table.c.data2)
- Index('func', func.abs(table.c.data1))
- Index('multi-func', table.c.data1, func.abs(table.c.data2))
-
- table = Table('mytable', meta,
- Column('data1', Integer),
- Column('data2', Integer),
- extend_existing=True
- )
+ table = Table(
+ "mytable",
+ meta,
+ Column("id", Integer, primary_key=True),
+ Column("data1", Integer),
+ Column("data2", Integer),
+ Index("text", text("data1 + 1")),
+ )
+ Index("multi", table.c.data1, table.c.data2)
+ Index("func", func.abs(table.c.data1))
+ Index("multi-func", table.c.data1, func.abs(table.c.data2))
+
+ table = Table(
+ "mytable",
+ meta,
+ Column("data1", Integer),
+ Column("data2", Integer),
+ extend_existing=True,
+ )
meta2 = MetaData()
table_c = table.tometadata(meta2)
def _get_key(i):
- return [i.name, i.unique] + \
- sorted(i.kwargs.items()) + \
- [str(col) for col in i.expressions]
+ return (
+ [i.name, i.unique]
+ + sorted(i.kwargs.items())
+ + [str(col) for col in i.expressions]
+ )
eq_(
sorted([_get_key(i) for i in table.indexes]),
- sorted([_get_key(i) for i in table_c.indexes])
+ sorted([_get_key(i) for i in table_c.indexes]),
)
@emits_warning("Table '.+' already exists within the given MetaData")
def test_already_exists(self):
meta1 = MetaData()
- table1 = Table('mytable', meta1,
- Column('myid', Integer, primary_key=True),
- )
+ table1 = Table(
+ "mytable", meta1, Column("myid", Integer, primary_key=True)
+ )
meta2 = MetaData()
- table2 = Table('mytable', meta2,
- Column('yourid', Integer, primary_key=True),
- )
+ table2 = Table(
+ "mytable", meta2, Column("yourid", Integer, primary_key=True)
+ )
table_c = table1.tometadata(meta2)
table_d = table2.tometadata(meta2)
@@ -1117,86 +1255,97 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables):
assert table_c is table_d
def test_default_schema_metadata(self):
- meta = MetaData(schema='myschema')
+ meta = MetaData(schema="myschema")
table = Table(
- 'mytable',
+ "mytable",
meta,
+ Column("myid", Integer, primary_key=True),
+ Column("name", String(40), nullable=True),
Column(
- 'myid',
- Integer,
- primary_key=True),
- Column(
- 'name',
- String(40),
- nullable=True),
- Column(
- 'description',
- String(30),
- CheckConstraint("description='hi'")),
- UniqueConstraint('name'),
+ "description", String(30), CheckConstraint("description='hi'")
+ ),
+ UniqueConstraint("name"),
)
table2 = Table(
- 'othertable', meta, Column(
- 'id', Integer, primary_key=True), Column(
- 'myid', Integer, ForeignKey('myschema.mytable.myid')), )
+ "othertable",
+ meta,
+ Column("id", Integer, primary_key=True),
+ Column("myid", Integer, ForeignKey("myschema.mytable.myid")),
+ )
- meta2 = MetaData(schema='someschema')
+ meta2 = MetaData(schema="someschema")
table_c = table.tometadata(meta2, schema=None)
table2_c = table2.tometadata(meta2, schema=None)
- eq_(str(table_c.join(table2_c).onclause),
- str(table_c.c.myid == table2_c.c.myid))
- eq_(str(table_c.join(table2_c).onclause),
- "someschema.mytable.myid = someschema.othertable.myid")
+ eq_(
+ str(table_c.join(table2_c).onclause),
+ str(table_c.c.myid == table2_c.c.myid),
+ )
+ eq_(
+ str(table_c.join(table2_c).onclause),
+ "someschema.mytable.myid = someschema.othertable.myid",
+ )
def test_strip_schema(self):
meta = MetaData()
- table = Table('mytable', meta,
- Column('myid', Integer, primary_key=True),
- Column('name', String(40), nullable=True),
- Column('description', String(30),
- CheckConstraint("description='hi'")),
- UniqueConstraint('name'),
- )
+ table = Table(
+ "mytable",
+ meta,
+ Column("myid", Integer, primary_key=True),
+ Column("name", String(40), nullable=True),
+ Column(
+ "description", String(30), CheckConstraint("description='hi'")
+ ),
+ UniqueConstraint("name"),
+ )
- table2 = Table('othertable', meta,
- Column('id', Integer, primary_key=True),
- Column('myid', Integer, ForeignKey('mytable.myid')),
- )
+ table2 = Table(
+ "othertable",
+ meta,
+ Column("id", Integer, primary_key=True),
+ Column("myid", Integer, ForeignKey("mytable.myid")),
+ )
meta2 = MetaData()
table_c = table.tometadata(meta2, schema=None)
table2_c = table2.tometadata(meta2, schema=None)
- eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid
- == table2_c.c.myid))
- eq_(str(table_c.join(table2_c).onclause),
- 'mytable.myid = othertable.myid')
+ eq_(
+ str(table_c.join(table2_c).onclause),
+ str(table_c.c.myid == table2_c.c.myid),
+ )
+ eq_(
+ str(table_c.join(table2_c).onclause),
+ "mytable.myid = othertable.myid",
+ )
def test_unique_true_flag(self):
meta = MetaData()
- table = Table('mytable', meta, Column('x', Integer, unique=True))
+ table = Table("mytable", meta, Column("x", Integer, unique=True))
m2 = MetaData()
t2 = table.tometadata(m2)
eq_(
- len([
- const for const
- in t2.constraints
- if isinstance(const, UniqueConstraint)]),
- 1
+ len(
+ [
+ const
+ for const in t2.constraints
+ if isinstance(const, UniqueConstraint)
+ ]
+ ),
+ 1,
)
def test_index_true_flag(self):
meta = MetaData()
- table = Table('mytable', meta, Column('x', Integer, index=True))
+ table = Table("mytable", meta, Column("x", Integer, index=True))
m2 = MetaData()
@@ -1214,168 +1363,161 @@ class InfoTest(fixtures.TestBase):
eq_(m1.info, {"foo": "bar"})
def test_foreignkey_constraint_info(self):
- fkc = ForeignKeyConstraint(['a'], ['b'], name='bar')
+ fkc = ForeignKeyConstraint(["a"], ["b"], name="bar")
eq_(fkc.info, {})
fkc = ForeignKeyConstraint(
- ['a'], ['b'], name='bar', info={"foo": "bar"})
+ ["a"], ["b"], name="bar", info={"foo": "bar"}
+ )
eq_(fkc.info, {"foo": "bar"})
def test_foreignkey_info(self):
- fkc = ForeignKey('a')
+ fkc = ForeignKey("a")
eq_(fkc.info, {})
- fkc = ForeignKey('a', info={"foo": "bar"})
+ fkc = ForeignKey("a", info={"foo": "bar"})
eq_(fkc.info, {"foo": "bar"})
def test_primarykey_constraint_info(self):
- pkc = PrimaryKeyConstraint('a', name='x')
+ pkc = PrimaryKeyConstraint("a", name="x")
eq_(pkc.info, {})
- pkc = PrimaryKeyConstraint('a', name='x', info={'foo': 'bar'})
- eq_(pkc.info, {'foo': 'bar'})
+ pkc = PrimaryKeyConstraint("a", name="x", info={"foo": "bar"})
+ eq_(pkc.info, {"foo": "bar"})
def test_unique_constraint_info(self):
- uc = UniqueConstraint('a', name='x')
+ uc = UniqueConstraint("a", name="x")
eq_(uc.info, {})
- uc = UniqueConstraint('a', name='x', info={'foo': 'bar'})
- eq_(uc.info, {'foo': 'bar'})
+ uc = UniqueConstraint("a", name="x", info={"foo": "bar"})
+ eq_(uc.info, {"foo": "bar"})
def test_check_constraint_info(self):
- cc = CheckConstraint('foo=bar', name='x')
+ cc = CheckConstraint("foo=bar", name="x")
eq_(cc.info, {})
- cc = CheckConstraint('foo=bar', name='x', info={'foo': 'bar'})
- eq_(cc.info, {'foo': 'bar'})
+ cc = CheckConstraint("foo=bar", name="x", info={"foo": "bar"})
+ eq_(cc.info, {"foo": "bar"})
def test_index_info(self):
- ix = Index('x', 'a')
+ ix = Index("x", "a")
eq_(ix.info, {})
- ix = Index('x', 'a', info={'foo': 'bar'})
- eq_(ix.info, {'foo': 'bar'})
+ ix = Index("x", "a", info={"foo": "bar"})
+ eq_(ix.info, {"foo": "bar"})
def test_column_info(self):
- c = Column('x', Integer)
+ c = Column("x", Integer)
eq_(c.info, {})
- c = Column('x', Integer, info={'foo': 'bar'})
- eq_(c.info, {'foo': 'bar'})
+ c = Column("x", Integer, info={"foo": "bar"})
+ eq_(c.info, {"foo": "bar"})
def test_table_info(self):
- t = Table('x', MetaData())
+ t = Table("x", MetaData())
eq_(t.info, {})
- t = Table('x', MetaData(), info={'foo': 'bar'})
- eq_(t.info, {'foo': 'bar'})
+ t = Table("x", MetaData(), info={"foo": "bar"})
+ eq_(t.info, {"foo": "bar"})
class TableTest(fixtures.TestBase, AssertsCompiledSQL):
-
@testing.requires.temporary_tables
- @testing.skip_if('mssql', 'different col format')
+ @testing.skip_if("mssql", "different col format")
def test_prefixes(self):
from sqlalchemy import Table
- table1 = Table("temporary_table_1", MetaData(),
- Column("col1", Integer),
- prefixes=["TEMPORARY"])
+
+ table1 = Table(
+ "temporary_table_1",
+ MetaData(),
+ Column("col1", Integer),
+ prefixes=["TEMPORARY"],
+ )
self.assert_compile(
schema.CreateTable(table1),
- "CREATE TEMPORARY TABLE temporary_table_1 (col1 INTEGER)"
+ "CREATE TEMPORARY TABLE temporary_table_1 (col1 INTEGER)",
)
- table2 = Table("temporary_table_2", MetaData(),
- Column("col1", Integer),
- prefixes=["VIRTUAL"])
+ table2 = Table(
+ "temporary_table_2",
+ MetaData(),
+ Column("col1", Integer),
+ prefixes=["VIRTUAL"],
+ )
self.assert_compile(
schema.CreateTable(table2),
- "CREATE VIRTUAL TABLE temporary_table_2 (col1 INTEGER)"
+ "CREATE VIRTUAL TABLE temporary_table_2 (col1 INTEGER)",
)
def test_table_info(self):
metadata = MetaData()
- t1 = Table('foo', metadata, info={'x': 'y'})
- t2 = Table('bar', metadata, info={})
- t3 = Table('bat', metadata)
- assert t1.info == {'x': 'y'}
+ t1 = Table("foo", metadata, info={"x": "y"})
+ t2 = Table("bar", metadata, info={})
+ t3 = Table("bat", metadata)
+ assert t1.info == {"x": "y"}
assert t2.info == {}
assert t3.info == {}
for t in (t1, t2, t3):
- t.info['bar'] = 'zip'
- assert t.info['bar'] == 'zip'
+ t.info["bar"] = "zip"
+ assert t.info["bar"] == "zip"
def test_reset_exported_passes(self):
m = MetaData()
- t = Table('t', m, Column('foo', Integer))
- eq_(
- list(t.c), [t.c.foo]
- )
+ t = Table("t", m, Column("foo", Integer))
+ eq_(list(t.c), [t.c.foo])
t._reset_exported()
- eq_(
- list(t.c), [t.c.foo]
- )
+ eq_(list(t.c), [t.c.foo])
def test_foreign_key_constraints_collection(self):
metadata = MetaData()
- t1 = Table('foo', metadata, Column('a', Integer))
+ t1 = Table("foo", metadata, Column("a", Integer))
eq_(t1.foreign_key_constraints, set())
- fk1 = ForeignKey('q.id')
- fk2 = ForeignKey('j.id')
- fk3 = ForeignKeyConstraint(['b', 'c'], ['r.x', 'r.y'])
+ fk1 = ForeignKey("q.id")
+ fk2 = ForeignKey("j.id")
+ fk3 = ForeignKeyConstraint(["b", "c"], ["r.x", "r.y"])
- t1.append_column(Column('b', Integer, fk1))
- eq_(
- t1.foreign_key_constraints,
- set([fk1.constraint]))
+ t1.append_column(Column("b", Integer, fk1))
+ eq_(t1.foreign_key_constraints, set([fk1.constraint]))
- t1.append_column(Column('c', Integer, fk2))
- eq_(
- t1.foreign_key_constraints,
- set([fk1.constraint, fk2.constraint]))
+ t1.append_column(Column("c", Integer, fk2))
+ eq_(t1.foreign_key_constraints, set([fk1.constraint, fk2.constraint]))
t1.append_constraint(fk3)
eq_(
t1.foreign_key_constraints,
- set([fk1.constraint, fk2.constraint, fk3]))
+ set([fk1.constraint, fk2.constraint, fk3]),
+ )
def test_c_immutable(self):
m = MetaData()
- t1 = Table('t', m, Column('x', Integer), Column('y', Integer))
- assert_raises(
- TypeError,
- t1.c.extend, [Column('z', Integer)]
- )
+ t1 = Table("t", m, Column("x", Integer), Column("y", Integer))
+ assert_raises(TypeError, t1.c.extend, [Column("z", Integer)])
def assign():
- t1.c['z'] = Column('z', Integer)
- assert_raises(
- TypeError,
- assign
- )
+ t1.c["z"] = Column("z", Integer)
+
+ assert_raises(TypeError, assign)
def assign2():
- t1.c.z = Column('z', Integer)
- assert_raises(
- TypeError,
- assign2
- )
+ t1.c.z = Column("z", Integer)
+
+ assert_raises(TypeError, assign2)
def test_c_mutate_after_unpickle(self):
m = MetaData()
- y = Column('y', Integer)
- t1 = Table('t', m, Column('x', Integer), y)
+ y = Column("y", Integer)
+ t1 = Table("t", m, Column("x", Integer), y)
t2 = pickle.loads(pickle.dumps(t1))
- z = Column('z', Integer)
- g = Column('g', Integer)
+ z = Column("z", Integer)
+ g = Column("g", Integer)
t2.append_column(z)
is_(t1.c.contains_column(y), True)
@@ -1389,39 +1531,39 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL):
def test_autoincrement_replace(self):
m = MetaData()
- t = Table('t', m,
- Column('id', Integer, primary_key=True)
- )
+ t = Table("t", m, Column("id", Integer, primary_key=True))
is_(t._autoincrement_column, t.c.id)
- t = Table('t', m,
- Column('id', Integer, primary_key=True),
- extend_existing=True
- )
+ t = Table(
+ "t",
+ m,
+ Column("id", Integer, primary_key=True),
+ extend_existing=True,
+ )
is_(t._autoincrement_column, t.c.id)
def test_pk_args_standalone(self):
m = MetaData()
- t = Table('t', m,
- Column('x', Integer, primary_key=True),
- PrimaryKeyConstraint(mssql_clustered=True)
- )
- eq_(
- list(t.primary_key), [t.c.x]
- )
- eq_(
- t.primary_key.dialect_kwargs, {"mssql_clustered": True}
+ t = Table(
+ "t",
+ m,
+ Column("x", Integer, primary_key=True),
+ PrimaryKeyConstraint(mssql_clustered=True),
)
+ eq_(list(t.primary_key), [t.c.x])
+ eq_(t.primary_key.dialect_kwargs, {"mssql_clustered": True})
def test_pk_cols_sets_flags(self):
m = MetaData()
- t = Table('t', m,
- Column('x', Integer),
- Column('y', Integer),
- Column('z', Integer),
- PrimaryKeyConstraint('x', 'y')
- )
+ t = Table(
+ "t",
+ m,
+ Column("x", Integer),
+ Column("y", Integer),
+ Column("z", Integer),
+ PrimaryKeyConstraint("x", "y"),
+ )
eq_(t.c.x.primary_key, True)
eq_(t.c.y.primary_key, True)
eq_(t.c.z.primary_key, False)
@@ -1432,10 +1574,12 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL):
exc.SAWarning,
"Table 't' specifies columns 'x' as primary_key=True, "
"not matching locally specified columns 'q'",
- Table, 't', m,
- Column('x', Integer, primary_key=True),
- Column('q', Integer),
- PrimaryKeyConstraint('q')
+ Table,
+ "t",
+ m,
+ Column("x", Integer, primary_key=True),
+ Column("q", Integer),
+ PrimaryKeyConstraint("q"),
)
def test_pk_col_mismatch_two(self):
@@ -1444,40 +1588,46 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL):
exc.SAWarning,
"Table 't' specifies columns 'a', 'b', 'c' as primary_key=True, "
"not matching locally specified columns 'b', 'c'",
- Table, 't', m,
- Column('a', Integer, primary_key=True),
- Column('b', Integer, primary_key=True),
- Column('c', Integer, primary_key=True),
- PrimaryKeyConstraint('b', 'c')
+ Table,
+ "t",
+ m,
+ Column("a", Integer, primary_key=True),
+ Column("b", Integer, primary_key=True),
+ Column("c", Integer, primary_key=True),
+ PrimaryKeyConstraint("b", "c"),
)
@testing.emits_warning("Table 't'")
def test_pk_col_mismatch_three(self):
m = MetaData()
- t = Table('t', m,
- Column('x', Integer, primary_key=True),
- Column('q', Integer),
- PrimaryKeyConstraint('q')
- )
+ t = Table(
+ "t",
+ m,
+ Column("x", Integer, primary_key=True),
+ Column("q", Integer),
+ PrimaryKeyConstraint("q"),
+ )
eq_(list(t.primary_key), [t.c.q])
@testing.emits_warning("Table 't'")
def test_pk_col_mismatch_four(self):
m = MetaData()
- t = Table('t', m,
- Column('a', Integer, primary_key=True),
- Column('b', Integer, primary_key=True),
- Column('c', Integer, primary_key=True),
- PrimaryKeyConstraint('b', 'c')
- )
+ t = Table(
+ "t",
+ m,
+ Column("a", Integer, primary_key=True),
+ Column("b", Integer, primary_key=True),
+ Column("c", Integer, primary_key=True),
+ PrimaryKeyConstraint("b", "c"),
+ )
eq_(list(t.primary_key), [t.c.b, t.c.c])
def test_pk_always_flips_nullable(self):
m = MetaData()
- t1 = Table('t1', m, Column('x', Integer), PrimaryKeyConstraint('x'))
+ t1 = Table("t1", m, Column("x", Integer), PrimaryKeyConstraint("x"))
- t2 = Table('t2', m, Column('x', Integer, primary_key=True))
+ t2 = Table("t2", m, Column("x", Integer, primary_key=True))
eq_(list(t1.primary_key), [t1.c.x])
@@ -1492,67 +1642,58 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL):
class PKAutoIncrementTest(fixtures.TestBase):
def test_multi_integer_no_autoinc(self):
- pk = PrimaryKeyConstraint(
- Column('a', Integer),
- Column('b', Integer)
- )
- t = Table('t', MetaData())
+ pk = PrimaryKeyConstraint(Column("a", Integer), Column("b", Integer))
+ t = Table("t", MetaData())
t.append_constraint(pk)
is_(pk._autoincrement_column, None)
def test_multi_integer_multi_autoinc(self):
pk = PrimaryKeyConstraint(
- Column('a', Integer, autoincrement=True),
- Column('b', Integer, autoincrement=True)
+ Column("a", Integer, autoincrement=True),
+ Column("b", Integer, autoincrement=True),
)
- t = Table('t', MetaData())
+ t = Table("t", MetaData())
t.append_constraint(pk)
assert_raises_message(
exc.ArgumentError,
"Only one Column may be marked",
- lambda: pk._autoincrement_column
+ lambda: pk._autoincrement_column,
)
def test_single_integer_no_autoinc(self):
- pk = PrimaryKeyConstraint(
- Column('a', Integer),
- )
- t = Table('t', MetaData())
+ pk = PrimaryKeyConstraint(Column("a", Integer))
+ t = Table("t", MetaData())
t.append_constraint(pk)
- is_(pk._autoincrement_column, pk.columns['a'])
+ is_(pk._autoincrement_column, pk.columns["a"])
def test_single_string_no_autoinc(self):
- pk = PrimaryKeyConstraint(
- Column('a', String),
- )
- t = Table('t', MetaData())
+ pk = PrimaryKeyConstraint(Column("a", String))
+ t = Table("t", MetaData())
t.append_constraint(pk)
is_(pk._autoincrement_column, None)
def test_single_string_illegal_autoinc(self):
- t = Table('t', MetaData(), Column('a', String, autoincrement=True))
- pk = PrimaryKeyConstraint(
- t.c.a
- )
+ t = Table("t", MetaData(), Column("a", String, autoincrement=True))
+ pk = PrimaryKeyConstraint(t.c.a)
t.append_constraint(pk)
assert_raises_message(
exc.ArgumentError,
"Column type VARCHAR on column 't.a'",
- lambda: pk._autoincrement_column
+ lambda: pk._autoincrement_column,
)
def test_single_integer_default(self):
t = Table(
- 't', MetaData(),
- Column('a', Integer, autoincrement=True, default=lambda: 1))
- pk = PrimaryKeyConstraint(
- t.c.a
+ "t",
+ MetaData(),
+ Column("a", Integer, autoincrement=True, default=lambda: 1),
)
+ pk = PrimaryKeyConstraint(t.c.a)
t.append_constraint(pk)
is_(pk._autoincrement_column, t.c.a)
@@ -1562,47 +1703,45 @@ class PKAutoIncrementTest(fixtures.TestBase):
# if the user puts autoincrement=True with a server_default, trust
# them on it
t = Table(
- 't', MetaData(),
- Column('a', Integer,
- autoincrement=True, server_default=func.magic()))
- pk = PrimaryKeyConstraint(
- t.c.a
+ "t",
+ MetaData(),
+ Column(
+ "a", Integer, autoincrement=True, server_default=func.magic()
+ ),
)
+ pk = PrimaryKeyConstraint(t.c.a)
t.append_constraint(pk)
is_(pk._autoincrement_column, t.c.a)
def test_implicit_autoinc_but_fks(self):
m = MetaData()
- Table('t1', m, Column('id', Integer, primary_key=True))
- t2 = Table(
- 't2', MetaData(),
- Column('a', Integer, ForeignKey('t1.id')))
- pk = PrimaryKeyConstraint(
- t2.c.a
- )
+ Table("t1", m, Column("id", Integer, primary_key=True))
+ t2 = Table("t2", MetaData(), Column("a", Integer, ForeignKey("t1.id")))
+ pk = PrimaryKeyConstraint(t2.c.a)
t2.append_constraint(pk)
is_(pk._autoincrement_column, None)
def test_explicit_autoinc_but_fks(self):
m = MetaData()
- Table('t1', m, Column('id', Integer, primary_key=True))
+ Table("t1", m, Column("id", Integer, primary_key=True))
t2 = Table(
- 't2', MetaData(),
- Column('a', Integer, ForeignKey('t1.id'), autoincrement=True))
- pk = PrimaryKeyConstraint(
- t2.c.a
+ "t2",
+ MetaData(),
+ Column("a", Integer, ForeignKey("t1.id"), autoincrement=True),
)
+ pk = PrimaryKeyConstraint(t2.c.a)
t2.append_constraint(pk)
is_(pk._autoincrement_column, t2.c.a)
t3 = Table(
- 't3', MetaData(),
- Column('a', Integer,
- ForeignKey('t1.id'), autoincrement='ignore_fk'))
- pk = PrimaryKeyConstraint(
- t3.c.a
+ "t3",
+ MetaData(),
+ Column(
+ "a", Integer, ForeignKey("t1.id"), autoincrement="ignore_fk"
+ ),
)
+ pk = PrimaryKeyConstraint(t3.c.a)
t3.append_constraint(pk)
is_(pk._autoincrement_column, t3.c.a)
@@ -1622,12 +1761,14 @@ class SchemaTypeTest(fixtures.TestBase):
def _on_table_create(self, target, bind, **kw):
super(SchemaTypeTest.TrackEvents, self)._on_table_create(
- target, bind, **kw)
+ target, bind, **kw
+ )
self.evt_targets += (target,)
def _on_metadata_create(self, target, bind, **kw):
super(SchemaTypeTest.TrackEvents, self)._on_metadata_create(
- target, bind, **kw)
+ target, bind, **kw
+ )
self.evt_targets += (target,)
# TODO: Enum and Boolean put TypeEngine first. Changing that here
@@ -1644,7 +1785,6 @@ class SchemaTypeTest(fixtures.TestBase):
pass
class MyTypeWImpl(MyType):
-
def _gen_dialect_impl(self, dialect):
return self.adapt(SchemaTypeTest.MyTypeImpl)
@@ -1724,13 +1864,13 @@ class SchemaTypeTest(fixtures.TestBase):
orig_set_parent_w_dispatch(parent)
canary._set_parent_with_dispatch(parent)
- with mock.patch.object(evt_target, '_set_parent', _set_parent):
+ with mock.patch.object(evt_target, "_set_parent", _set_parent):
with mock.patch.object(
- evt_target, '_set_parent_with_dispatch',
- _set_parent_w_dispatch):
+ evt_target, "_set_parent_with_dispatch", _set_parent_w_dispatch
+ ):
event.listen(evt_target, "before_parent_attach", canary.go)
- c = Column('q', typ)
+ c = Column("q", typ)
if double:
# no clean way yet to fix this, inner schema type is called
@@ -1741,8 +1881,8 @@ class SchemaTypeTest(fixtures.TestBase):
mock.call._set_parent(c),
mock.call.go(evt_target, c),
mock.call._set_parent(c),
- mock.call._set_parent_with_dispatch(c)
- ]
+ mock.call._set_parent_with_dispatch(c),
+ ],
)
else:
eq_(
@@ -1750,39 +1890,39 @@ class SchemaTypeTest(fixtures.TestBase):
[
mock.call.go(evt_target, c),
mock.call._set_parent(c),
- mock.call._set_parent_with_dispatch(c)
- ]
+ mock.call._set_parent_with_dispatch(c),
+ ],
)
def test_independent_schema(self):
m = MetaData()
type_ = self.MyType(schema="q")
- t1 = Table('x', m, Column("y", type_), schema="z")
+ t1 = Table("x", m, Column("y", type_), schema="z")
eq_(t1.c.y.type.schema, "q")
def test_inherit_schema(self):
m = MetaData()
type_ = self.MyType(schema="q", inherit_schema=True)
- t1 = Table('x', m, Column("y", type_), schema="z")
+ t1 = Table("x", m, Column("y", type_), schema="z")
eq_(t1.c.y.type.schema, "z")
def test_independent_schema_enum(self):
m = MetaData()
type_ = sqltypes.Enum("a", schema="q")
- t1 = Table('x', m, Column("y", type_), schema="z")
+ t1 = Table("x", m, Column("y", type_), schema="z")
eq_(t1.c.y.type.schema, "q")
def test_inherit_schema_enum(self):
m = MetaData()
type_ = sqltypes.Enum("a", "b", "c", schema="q", inherit_schema=True)
- t1 = Table('x', m, Column("y", type_), schema="z")
+ t1 = Table("x", m, Column("y", type_), schema="z")
eq_(t1.c.y.type.schema, "z")
def test_tometadata_copy_type(self):
m1 = MetaData()
type_ = self.MyType()
- t1 = Table('x', m1, Column("y", type_))
+ t1 = Table("x", m1, Column("y", type_))
m2 = MetaData()
t2 = t1.tometadata(m2)
@@ -1794,14 +1934,13 @@ class SchemaTypeTest(fixtures.TestBase):
is_(t2.c.y.type.table, t2)
def test_tometadata_copy_decorated(self):
-
class MyDecorated(TypeDecorator):
impl = self.MyType
m1 = MetaData()
type_ = MyDecorated(schema="z")
- t1 = Table('x', m1, Column("y", type_))
+ t1 = Table("x", m1, Column("y", type_))
m2 = MetaData()
t2 = t1.tometadata(m2)
@@ -1811,7 +1950,7 @@ class SchemaTypeTest(fixtures.TestBase):
m1 = MetaData()
type_ = self.MyType()
- t1 = Table('x', m1, Column("y", type_))
+ t1 = Table("x", m1, Column("y", type_))
m2 = MetaData()
t2 = t1.tometadata(m2, schema="bar")
@@ -1822,7 +1961,7 @@ class SchemaTypeTest(fixtures.TestBase):
m1 = MetaData()
type_ = self.MyType(inherit_schema=True)
- t1 = Table('x', m1, Column("y", type_))
+ t1 = Table("x", m1, Column("y", type_))
m2 = MetaData()
t2 = t1.tometadata(m2, schema="bar")
@@ -1834,7 +1973,7 @@ class SchemaTypeTest(fixtures.TestBase):
m1 = MetaData()
type_ = self.MyType()
- t1 = Table('x', m1, Column("y", type_))
+ t1 = Table("x", m1, Column("y", type_))
m2 = MetaData()
t2 = t1.tometadata(m2)
@@ -1851,17 +1990,17 @@ class SchemaTypeTest(fixtures.TestBase):
def test_enum_column_copy_transfers_events(self):
m = MetaData()
- type_ = self.WrapEnum('a', 'b', 'c', name='foo')
- y = Column('y', type_)
+ type_ = self.WrapEnum("a", "b", "c", name="foo")
+ y = Column("y", type_)
y_copy = y.copy()
- t1 = Table('x', m, y_copy)
+ t1 = Table("x", m, y_copy)
is_true(y_copy.type._create_events)
# for PostgreSQL, this will emit CREATE TYPE
m.dispatch.before_create(t1, testing.db)
try:
- eq_(t1.c.y.type.evt_targets, (t1, ))
+ eq_(t1.c.y.type.evt_targets, (t1,))
finally:
# do the drop so that PostgreSQL emits DROP TYPE
m.dispatch.after_drop(t1, testing.db)
@@ -1869,25 +2008,30 @@ class SchemaTypeTest(fixtures.TestBase):
def test_enum_nonnative_column_copy_transfers_events(self):
m = MetaData()
- type_ = self.WrapEnum('a', 'b', 'c', name='foo', native_enum=False)
- y = Column('y', type_)
+ type_ = self.WrapEnum("a", "b", "c", name="foo", native_enum=False)
+ y = Column("y", type_)
y_copy = y.copy()
- t1 = Table('x', m, y_copy)
+ t1 = Table("x", m, y_copy)
is_true(y_copy.type._create_events)
m.dispatch.before_create(t1, testing.db)
- eq_(t1.c.y.type.evt_targets, (t1, ))
+ eq_(t1.c.y.type.evt_targets, (t1,))
def test_enum_nonnative_column_copy_transfers_constraintpref(self):
m = MetaData()
type_ = self.WrapEnum(
- 'a', 'b', 'c', name='foo',
- native_enum=False, create_constraint=False)
- y = Column('y', type_)
+ "a",
+ "b",
+ "c",
+ name="foo",
+ native_enum=False,
+ create_constraint=False,
+ )
+ y = Column("y", type_)
y_copy = y.copy()
- Table('x', m, y_copy)
+ Table("x", m, y_copy)
is_false(y_copy.type.create_constraint)
@@ -1895,9 +2039,9 @@ class SchemaTypeTest(fixtures.TestBase):
m = MetaData()
type_ = self.WrapBoolean()
- y = Column('y', type_)
+ y = Column("y", type_)
y_copy = y.copy()
- t1 = Table('x', m, y_copy)
+ t1 = Table("x", m, y_copy)
is_true(y_copy.type._create_events)
@@ -1905,9 +2049,9 @@ class SchemaTypeTest(fixtures.TestBase):
m = MetaData()
type_ = self.WrapBoolean(create_constraint=False)
- y = Column('y', type_)
+ y = Column("y", type_)
y_copy = y.copy()
- Table('x', m, y_copy)
+ Table("x", m, y_copy)
is_false(y_copy.type.create_constraint)
@@ -1915,7 +2059,7 @@ class SchemaTypeTest(fixtures.TestBase):
m1 = MetaData()
typ = self.MyType(metadata=m1)
m1.dispatch.before_create(m1, testing.db)
- eq_(typ.evt_targets, (m1, ))
+ eq_(typ.evt_targets, (m1,))
dialect_impl = typ.dialect_impl(testing.db.dialect)
eq_(dialect_impl.evt_targets, ())
@@ -1924,24 +2068,24 @@ class SchemaTypeTest(fixtures.TestBase):
m1 = MetaData()
typ = self.MyTypeWImpl(metadata=m1)
m1.dispatch.before_create(m1, testing.db)
- eq_(typ.evt_targets, (m1, ))
+ eq_(typ.evt_targets, (m1,))
dialect_impl = typ.dialect_impl(testing.db.dialect)
- eq_(dialect_impl.evt_targets, (m1, ))
+ eq_(dialect_impl.evt_targets, (m1,))
def test_table_dispatch_decorator_schematype(self):
m1 = MetaData()
typ = self.MyTypeDecAndSchema()
- t1 = Table('t1', m1, Column('x', typ))
+ t1 = Table("t1", m1, Column("x", typ))
m1.dispatch.before_create(t1, testing.db)
- eq_(typ.evt_targets, (t1, ))
+ eq_(typ.evt_targets, (t1,))
def test_table_dispatch_no_new_impl(self):
m1 = MetaData()
typ = self.MyType()
- t1 = Table('t1', m1, Column('x', typ))
+ t1 = Table("t1", m1, Column("x", typ))
m1.dispatch.before_create(t1, testing.db)
- eq_(typ.evt_targets, (t1, ))
+ eq_(typ.evt_targets, (t1,))
dialect_impl = typ.dialect_impl(testing.db.dialect)
eq_(dialect_impl.evt_targets, ())
@@ -1949,12 +2093,12 @@ class SchemaTypeTest(fixtures.TestBase):
def test_table_dispatch_new_impl(self):
m1 = MetaData()
typ = self.MyTypeWImpl()
- t1 = Table('t1', m1, Column('x', typ))
+ t1 = Table("t1", m1, Column("x", typ))
m1.dispatch.before_create(t1, testing.db)
- eq_(typ.evt_targets, (t1, ))
+ eq_(typ.evt_targets, (t1,))
dialect_impl = typ.dialect_impl(testing.db.dialect)
- eq_(dialect_impl.evt_targets, (t1, ))
+ eq_(dialect_impl.evt_targets, (t1,))
def test_create_metadata_bound_no_crash(self):
m1 = MetaData()
@@ -1965,127 +2109,103 @@ class SchemaTypeTest(fixtures.TestBase):
def test_boolean_constraint_type_doesnt_double(self):
m1 = MetaData()
- t1 = Table('x', m1, Column("flag", Boolean()))
+ t1 = Table("x", m1, Column("flag", Boolean()))
eq_(
- len([
- c for c in t1.constraints
- if isinstance(c, CheckConstraint)]),
- 1
+ len([c for c in t1.constraints if isinstance(c, CheckConstraint)]),
+ 1,
)
m2 = MetaData()
t2 = t1.tometadata(m2)
eq_(
- len([
- c for c in t2.constraints
- if isinstance(c, CheckConstraint)]),
- 1
+ len([c for c in t2.constraints if isinstance(c, CheckConstraint)]),
+ 1,
)
def test_enum_constraint_type_doesnt_double(self):
m1 = MetaData()
- t1 = Table('x', m1, Column("flag", Enum('a', 'b', 'c')))
+ t1 = Table("x", m1, Column("flag", Enum("a", "b", "c")))
eq_(
- len([
- c for c in t1.constraints
- if isinstance(c, CheckConstraint)]),
- 1
+ len([c for c in t1.constraints if isinstance(c, CheckConstraint)]),
+ 1,
)
m2 = MetaData()
t2 = t1.tometadata(m2)
eq_(
- len([
- c for c in t2.constraints
- if isinstance(c, CheckConstraint)]),
- 1
+ len([c for c in t2.constraints if isinstance(c, CheckConstraint)]),
+ 1,
)
class SchemaTest(fixtures.TestBase, AssertsCompiledSQL):
-
def test_default_schema_metadata_fk(self):
m = MetaData(schema="foo")
- t1 = Table('t1', m, Column('x', Integer))
- t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x')))
+ t1 = Table("t1", m, Column("x", Integer))
+ t2 = Table("t2", m, Column("x", Integer, ForeignKey("t1.x")))
assert t2.c.x.references(t1.c.x)
def test_ad_hoc_schema_equiv_fk(self):
m = MetaData()
- t1 = Table('t1', m, Column('x', Integer), schema="foo")
+ t1 = Table("t1", m, Column("x", Integer), schema="foo")
t2 = Table(
- 't2',
- m,
- Column(
- 'x',
- Integer,
- ForeignKey('t1.x')),
- schema="foo")
+ "t2", m, Column("x", Integer, ForeignKey("t1.x")), schema="foo"
+ )
assert_raises(
- exc.NoReferencedTableError,
- lambda: t2.c.x.references(t1.c.x)
+ exc.NoReferencedTableError, lambda: t2.c.x.references(t1.c.x)
)
def test_default_schema_metadata_fk_alt_remote(self):
m = MetaData(schema="foo")
- t1 = Table('t1', m, Column('x', Integer))
- t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x')),
- schema="bar")
+ t1 = Table("t1", m, Column("x", Integer))
+ t2 = Table(
+ "t2", m, Column("x", Integer, ForeignKey("t1.x")), schema="bar"
+ )
assert t2.c.x.references(t1.c.x)
def test_default_schema_metadata_fk_alt_local_raises(self):
m = MetaData(schema="foo")
- t1 = Table('t1', m, Column('x', Integer), schema="bar")
- t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x')))
+ t1 = Table("t1", m, Column("x", Integer), schema="bar")
+ t2 = Table("t2", m, Column("x", Integer, ForeignKey("t1.x")))
assert_raises(
- exc.NoReferencedTableError,
- lambda: t2.c.x.references(t1.c.x)
+ exc.NoReferencedTableError, lambda: t2.c.x.references(t1.c.x)
)
def test_default_schema_metadata_fk_alt_local(self):
m = MetaData(schema="foo")
- t1 = Table('t1', m, Column('x', Integer), schema="bar")
- t2 = Table('t2', m, Column('x', Integer, ForeignKey('bar.t1.x')))
+ t1 = Table("t1", m, Column("x", Integer), schema="bar")
+ t2 = Table("t2", m, Column("x", Integer, ForeignKey("bar.t1.x")))
assert t2.c.x.references(t1.c.x)
def test_create_drop_schema(self):
self.assert_compile(
- schema.CreateSchema("sa_schema"),
- "CREATE SCHEMA sa_schema"
+ schema.CreateSchema("sa_schema"), "CREATE SCHEMA sa_schema"
)
self.assert_compile(
- schema.DropSchema("sa_schema"),
- "DROP SCHEMA sa_schema"
+ schema.DropSchema("sa_schema"), "DROP SCHEMA sa_schema"
)
self.assert_compile(
schema.DropSchema("sa_schema", cascade=True),
- "DROP SCHEMA sa_schema CASCADE"
+ "DROP SCHEMA sa_schema CASCADE",
)
def test_iteration(self):
metadata = MetaData()
table1 = Table(
- 'table1',
+ "table1",
metadata,
- Column(
- 'col1',
- Integer,
- primary_key=True),
- schema='someschema')
+ Column("col1", Integer, primary_key=True),
+ schema="someschema",
+ )
table2 = Table(
- 'table2',
+ "table2",
metadata,
- Column(
- 'col1',
- Integer,
- primary_key=True),
- Column(
- 'col2',
- Integer,
- ForeignKey('someschema.table1.col1')),
- schema='someschema')
+ Column("col1", Integer, primary_key=True),
+ Column("col2", Integer, ForeignKey("someschema.table1.col1")),
+ schema="someschema",
+ )
t1 = str(schema.CreateTable(table1).compile(bind=testing.db))
t2 = str(schema.CreateTable(table2).compile(bind=testing.db))
@@ -2098,16 +2218,18 @@ class SchemaTest(fixtures.TestBase, AssertsCompiledSQL):
class UseExistingTest(fixtures.TablesTest):
-
@classmethod
def define_tables(cls, metadata):
- Table('users', metadata,
- Column('id', Integer, primary_key=True),
- Column('name', String(30)))
+ Table(
+ "users",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("name", String(30)),
+ )
def _useexisting_fixture(self):
meta2 = MetaData(testing.db)
- Table('users', meta2, autoload=True)
+ Table("users", meta2, autoload=True)
return meta2
def _notexisting_fixture(self):
@@ -2117,21 +2239,23 @@ class UseExistingTest(fixtures.TablesTest):
meta2 = self._useexisting_fixture()
def go():
- Table('users', meta2, Column('name',
- Unicode), autoload=True)
+ Table("users", meta2, Column("name", Unicode), autoload=True)
+
assert_raises_message(
exc.InvalidRequestError,
- "Table 'users' is already defined for this "
- "MetaData instance.",
- go
+ "Table 'users' is already defined for this " "MetaData instance.",
+ go,
)
def test_keep_plus_existing_raises(self):
meta2 = self._useexisting_fixture()
assert_raises(
exc.ArgumentError,
- Table, 'users', meta2, keep_existing=True,
- extend_existing=True
+ Table,
+ "users",
+ meta2,
+ keep_existing=True,
+ extend_existing=True,
)
@testing.uses_deprecated()
@@ -2139,118 +2263,152 @@ class UseExistingTest(fixtures.TablesTest):
meta2 = self._useexisting_fixture()
assert_raises(
exc.ArgumentError,
- Table, 'users', meta2, useexisting=True,
- extend_existing=True
+ Table,
+ "users",
+ meta2,
+ useexisting=True,
+ extend_existing=True,
)
def test_keep_existing_no_dupe_constraints(self):
meta2 = self._notexisting_fixture()
- users = Table('users', meta2,
- Column('id', Integer),
- Column('name', Unicode),
- UniqueConstraint('name'),
- keep_existing=True
- )
- assert 'name' in users.c
- assert 'id' in users.c
+ users = Table(
+ "users",
+ meta2,
+ Column("id", Integer),
+ Column("name", Unicode),
+ UniqueConstraint("name"),
+ keep_existing=True,
+ )
+ assert "name" in users.c
+ assert "id" in users.c
eq_(len(users.constraints), 2)
- u2 = Table('users', meta2,
- Column('id', Integer),
- Column('name', Unicode),
- UniqueConstraint('name'),
- keep_existing=True
- )
+ u2 = Table(
+ "users",
+ meta2,
+ Column("id", Integer),
+ Column("name", Unicode),
+ UniqueConstraint("name"),
+ keep_existing=True,
+ )
eq_(len(u2.constraints), 2)
def test_extend_existing_dupes_constraints(self):
meta2 = self._notexisting_fixture()
- users = Table('users', meta2,
- Column('id', Integer),
- Column('name', Unicode),
- UniqueConstraint('name'),
- extend_existing=True
- )
- assert 'name' in users.c
- assert 'id' in users.c
+ users = Table(
+ "users",
+ meta2,
+ Column("id", Integer),
+ Column("name", Unicode),
+ UniqueConstraint("name"),
+ extend_existing=True,
+ )
+ assert "name" in users.c
+ assert "id" in users.c
eq_(len(users.constraints), 2)
- u2 = Table('users', meta2,
- Column('id', Integer),
- Column('name', Unicode),
- UniqueConstraint('name'),
- extend_existing=True
- )
+ u2 = Table(
+ "users",
+ meta2,
+ Column("id", Integer),
+ Column("name", Unicode),
+ UniqueConstraint("name"),
+ extend_existing=True,
+ )
# constraint got duped
eq_(len(u2.constraints), 3)
def test_keep_existing_coltype(self):
meta2 = self._useexisting_fixture()
- users = Table('users', meta2, Column('name', Unicode),
- autoload=True, keep_existing=True)
+ users = Table(
+ "users",
+ meta2,
+ Column("name", Unicode),
+ autoload=True,
+ keep_existing=True,
+ )
assert not isinstance(users.c.name.type, Unicode)
def test_keep_existing_quote(self):
meta2 = self._useexisting_fixture()
- users = Table('users', meta2, quote=True, autoload=True,
- keep_existing=True)
+ users = Table(
+ "users", meta2, quote=True, autoload=True, keep_existing=True
+ )
assert not users.name.quote
def test_keep_existing_add_column(self):
meta2 = self._useexisting_fixture()
- users = Table('users', meta2,
- Column('foo', Integer),
- autoload=True,
- keep_existing=True)
+ users = Table(
+ "users",
+ meta2,
+ Column("foo", Integer),
+ autoload=True,
+ keep_existing=True,
+ )
assert "foo" not in users.c
def test_keep_existing_coltype_no_orig(self):
meta2 = self._notexisting_fixture()
- users = Table('users', meta2, Column('name', Unicode),
- autoload=True, keep_existing=True)
+ users = Table(
+ "users",
+ meta2,
+ Column("name", Unicode),
+ autoload=True,
+ keep_existing=True,
+ )
assert isinstance(users.c.name.type, Unicode)
@testing.skip_if(
lambda: testing.db.dialect.requires_name_normalize,
- "test depends on lowercase as case insensitive")
+ "test depends on lowercase as case insensitive",
+ )
def test_keep_existing_quote_no_orig(self):
meta2 = self._notexisting_fixture()
- users = Table('users', meta2, quote=True,
- autoload=True,
- keep_existing=True)
+ users = Table(
+ "users", meta2, quote=True, autoload=True, keep_existing=True
+ )
assert users.name.quote
def test_keep_existing_add_column_no_orig(self):
meta2 = self._notexisting_fixture()
- users = Table('users', meta2,
- Column('foo', Integer),
- autoload=True,
- keep_existing=True)
+ users = Table(
+ "users",
+ meta2,
+ Column("foo", Integer),
+ autoload=True,
+ keep_existing=True,
+ )
assert "foo" in users.c
def test_keep_existing_coltype_no_reflection(self):
meta2 = self._useexisting_fixture()
- users = Table('users', meta2, Column('name', Unicode),
- keep_existing=True)
+ users = Table(
+ "users", meta2, Column("name", Unicode), keep_existing=True
+ )
assert not isinstance(users.c.name.type, Unicode)
def test_keep_existing_quote_no_reflection(self):
meta2 = self._useexisting_fixture()
- users = Table('users', meta2, quote=True,
- keep_existing=True)
+ users = Table("users", meta2, quote=True, keep_existing=True)
assert not users.name.quote
def test_keep_existing_add_column_no_reflection(self):
meta2 = self._useexisting_fixture()
- users = Table('users', meta2,
- Column('foo', Integer),
- keep_existing=True)
+ users = Table(
+ "users", meta2, Column("foo", Integer), keep_existing=True
+ )
assert "foo" not in users.c
def test_extend_existing_coltype(self):
meta2 = self._useexisting_fixture()
- users = Table('users', meta2, Column('name', Unicode),
- autoload=True, extend_existing=True)
+ users = Table(
+ "users",
+ meta2,
+ Column("name", Unicode),
+ autoload=True,
+ extend_existing=True,
+ )
assert isinstance(users.c.name.type, Unicode)
def test_extend_existing_quote(self):
@@ -2258,46 +2416,63 @@ class UseExistingTest(fixtures.TablesTest):
assert_raises_message(
tsa.exc.ArgumentError,
"Can't redefine 'quote' or 'quote_schema' arguments",
- Table, 'users', meta2, quote=True, autoload=True,
- extend_existing=True
+ Table,
+ "users",
+ meta2,
+ quote=True,
+ autoload=True,
+ extend_existing=True,
)
def test_extend_existing_add_column(self):
meta2 = self._useexisting_fixture()
- users = Table('users', meta2,
- Column('foo', Integer),
- autoload=True,
- extend_existing=True)
+ users = Table(
+ "users",
+ meta2,
+ Column("foo", Integer),
+ autoload=True,
+ extend_existing=True,
+ )
assert "foo" in users.c
def test_extend_existing_coltype_no_orig(self):
meta2 = self._notexisting_fixture()
- users = Table('users', meta2, Column('name', Unicode),
- autoload=True, extend_existing=True)
+ users = Table(
+ "users",
+ meta2,
+ Column("name", Unicode),
+ autoload=True,
+ extend_existing=True,
+ )
assert isinstance(users.c.name.type, Unicode)
@testing.skip_if(
lambda: testing.db.dialect.requires_name_normalize,
- "test depends on lowercase as case insensitive")
+ "test depends on lowercase as case insensitive",
+ )
def test_extend_existing_quote_no_orig(self):
meta2 = self._notexisting_fixture()
- users = Table('users', meta2, quote=True,
- autoload=True,
- extend_existing=True)
+ users = Table(
+ "users", meta2, quote=True, autoload=True, extend_existing=True
+ )
assert users.name.quote
def test_extend_existing_add_column_no_orig(self):
meta2 = self._notexisting_fixture()
- users = Table('users', meta2,
- Column('foo', Integer),
- autoload=True,
- extend_existing=True)
+ users = Table(
+ "users",
+ meta2,
+ Column("foo", Integer),
+ autoload=True,
+ extend_existing=True,
+ )
assert "foo" in users.c
def test_extend_existing_coltype_no_reflection(self):
meta2 = self._useexisting_fixture()
- users = Table('users', meta2, Column('name', Unicode),
- extend_existing=True)
+ users = Table(
+ "users", meta2, Column("name", Unicode), extend_existing=True
+ )
assert isinstance(users.c.name.type, Unicode)
def test_extend_existing_quote_no_reflection(self):
@@ -2305,35 +2480,30 @@ class UseExistingTest(fixtures.TablesTest):
assert_raises_message(
tsa.exc.ArgumentError,
"Can't redefine 'quote' or 'quote_schema' arguments",
- Table, 'users', meta2, quote=True,
- extend_existing=True
+ Table,
+ "users",
+ meta2,
+ quote=True,
+ extend_existing=True,
)
def test_extend_existing_add_column_no_reflection(self):
meta2 = self._useexisting_fixture()
- users = Table('users', meta2,
- Column('foo', Integer),
- extend_existing=True)
+ users = Table(
+ "users", meta2, Column("foo", Integer), extend_existing=True
+ )
assert "foo" in users.c
class ConstraintTest(fixtures.TestBase):
-
def _single_fixture(self):
m = MetaData()
- t1 = Table('t1', m,
- Column('a', Integer),
- Column('b', Integer)
- )
+ t1 = Table("t1", m, Column("a", Integer), Column("b", Integer))
- t2 = Table('t2', m,
- Column('a', Integer, ForeignKey('t1.a'))
- )
+ t2 = Table("t2", m, Column("a", Integer, ForeignKey("t1.a")))
- t3 = Table('t3', m,
- Column('a', Integer)
- )
+ t3 = Table("t3", m, Column("a", Integer))
return t1, t2, t3
def _assert_index_col_x(self, t, i, columns=True):
@@ -2346,106 +2516,108 @@ class ConstraintTest(fixtures.TestBase):
def test_separate_decl_columns(self):
m = MetaData()
- t = Table('t', m, Column('x', Integer))
- i = Index('i', t.c.x)
+ t = Table("t", m, Column("x", Integer))
+ i = Index("i", t.c.x)
self._assert_index_col_x(t, i)
def test_separate_decl_columns_functional(self):
m = MetaData()
- t = Table('t', m, Column('x', Integer))
- i = Index('i', func.foo(t.c.x))
+ t = Table("t", m, Column("x", Integer))
+ i = Index("i", func.foo(t.c.x))
self._assert_index_col_x(t, i)
def test_index_no_cols_private_table_arg(self):
m = MetaData()
- t = Table('t', m, Column('x', Integer))
- i = Index('i', _table=t)
+ t = Table("t", m, Column("x", Integer))
+ i = Index("i", _table=t)
is_(i.table, t)
eq_(list(i.columns), [])
def test_index_w_cols_private_table_arg(self):
m = MetaData()
- t = Table('t', m, Column('x', Integer))
- i = Index('i', t.c.x, _table=t)
+ t = Table("t", m, Column("x", Integer))
+ i = Index("i", t.c.x, _table=t)
is_(i.table, t)
eq_(i.columns, [t.c.x])
def test_inline_decl_columns(self):
m = MetaData()
- c = Column('x', Integer)
- i = Index('i', c)
- t = Table('t', m, c, i)
+ c = Column("x", Integer)
+ i = Index("i", c)
+ t = Table("t", m, c, i)
self._assert_index_col_x(t, i)
def test_inline_decl_columns_functional(self):
m = MetaData()
- c = Column('x', Integer)
- i = Index('i', func.foo(c))
- t = Table('t', m, c, i)
+ c = Column("x", Integer)
+ i = Index("i", func.foo(c))
+ t = Table("t", m, c, i)
self._assert_index_col_x(t, i)
def test_inline_decl_string(self):
m = MetaData()
- i = Index('i', "x")
- t = Table('t', m, Column('x', Integer), i)
+ i = Index("i", "x")
+ t = Table("t", m, Column("x", Integer), i)
self._assert_index_col_x(t, i)
def test_inline_decl_textonly(self):
m = MetaData()
- i = Index('i', text("foobar(x)"))
- t = Table('t', m, Column('x', Integer), i)
+ i = Index("i", text("foobar(x)"))
+ t = Table("t", m, Column("x", Integer), i)
self._assert_index_col_x(t, i, columns=False)
def test_separate_decl_textonly(self):
m = MetaData()
- i = Index('i', text("foobar(x)"))
- t = Table('t', m, Column('x', Integer))
+ i = Index("i", text("foobar(x)"))
+ t = Table("t", m, Column("x", Integer))
t.append_constraint(i)
self._assert_index_col_x(t, i, columns=False)
def test_unnamed_column_exception(self):
# this can occur in some declarative situations
c = Column(Integer)
- idx = Index('q', c)
+ idx = Index("q", c)
m = MetaData()
- t = Table('t', m, Column('q'))
+ t = Table("t", m, Column("q"))
assert_raises_message(
exc.ArgumentError,
"Can't add unnamed column to column collection",
- t.append_constraint, idx
+ t.append_constraint,
+ idx,
)
def test_column_associated_w_lowercase_table(self):
from sqlalchemy import table
- c = Column('x', Integer)
- table('foo', c)
- idx = Index('q', c)
+
+ c = Column("x", Integer)
+ table("foo", c)
+ idx = Index("q", c)
is_(idx.table, None) # lower-case-T table doesn't have indexes
def test_clauseelement_extraction_one(self):
- t = Table('t', MetaData(), Column('x', Integer), Column('y', Integer))
+ t = Table("t", MetaData(), Column("x", Integer), Column("y", Integer))
class MyThing(object):
def __clause_element__(self):
return t.c.x + 5
- idx = Index('foo', MyThing())
+ idx = Index("foo", MyThing())
self._assert_index_col_x(t, idx)
def test_clauseelement_extraction_two(self):
- t = Table('t', MetaData(), Column('x', Integer), Column('y', Integer))
+ t = Table("t", MetaData(), Column("x", Integer), Column("y", Integer))
class MyThing(object):
def __clause_element__(self):
return t.c.x + 5
- idx = Index('bar', MyThing(), t.c.y)
+ idx = Index("bar", MyThing(), t.c.y)
eq_(set(t.indexes), set([idx]))
def test_clauseelement_extraction_three(self):
- t = Table('t', MetaData(), Column('x', Integer), Column('y', Integer))
+ t = Table("t", MetaData(), Column("x", Integer), Column("y", Integer))
expr1 = t.c.x + 5
@@ -2453,7 +2625,7 @@ class ConstraintTest(fixtures.TestBase):
def __clause_element__(self):
return expr1
- idx = Index('bar', MyThing(), t.c.y)
+ idx = Index("bar", MyThing(), t.c.y)
is_(idx.expressions[0], expr1)
is_(idx.expressions[1], t.c.y)
@@ -2493,59 +2665,52 @@ class ConstraintTest(fixtures.TestBase):
is_(fkc.referred_table, t1)
def test_referred_table_accessor_not_available(self):
- t1 = Table('t', MetaData(), Column('x', ForeignKey('q.id')))
+ t1 = Table("t", MetaData(), Column("x", ForeignKey("q.id")))
fkc = list(t1.foreign_key_constraints)[0]
assert_raises_message(
exc.InvalidRequestError,
"Foreign key associated with column 't.x' could not find "
"table 'q' with which to generate a foreign key to target "
"column 'id'",
- getattr, fkc, "referred_table"
+ getattr,
+ fkc,
+ "referred_table",
)
def test_related_column_not_present_atfirst_ok(self):
m = MetaData()
- base_table = Table("base", m,
- Column("id", Integer, primary_key=True)
- )
- fk = ForeignKey('base.q')
- derived_table = Table("derived", m,
- Column("id", None, fk,
- primary_key=True),
- )
-
- base_table.append_column(Column('q', Integer))
+ base_table = Table("base", m, Column("id", Integer, primary_key=True))
+ fk = ForeignKey("base.q")
+ derived_table = Table(
+ "derived", m, Column("id", None, fk, primary_key=True)
+ )
+
+ base_table.append_column(Column("q", Integer))
assert fk.column is base_table.c.q
assert isinstance(derived_table.c.id.type, Integer)
def test_related_column_not_present_atfirst_ok_onname(self):
m = MetaData()
- base_table = Table("base", m,
- Column("id", Integer, primary_key=True)
- )
- fk = ForeignKey('base.q', link_to_name=True)
- derived_table = Table("derived", m,
- Column("id", None, fk,
- primary_key=True),
- )
-
- base_table.append_column(Column('q', Integer, key='zz'))
+ base_table = Table("base", m, Column("id", Integer, primary_key=True))
+ fk = ForeignKey("base.q", link_to_name=True)
+ derived_table = Table(
+ "derived", m, Column("id", None, fk, primary_key=True)
+ )
+
+ base_table.append_column(Column("q", Integer, key="zz"))
assert fk.column is base_table.c.zz
assert isinstance(derived_table.c.id.type, Integer)
def test_related_column_not_present_atfirst_ok_linktoname_conflict(self):
m = MetaData()
- base_table = Table("base", m,
- Column("id", Integer, primary_key=True)
- )
- fk = ForeignKey('base.q', link_to_name=True)
- derived_table = Table("derived", m,
- Column("id", None, fk,
- primary_key=True),
- )
-
- base_table.append_column(Column('zz', Integer, key='q'))
- base_table.append_column(Column('q', Integer, key='zz'))
+ base_table = Table("base", m, Column("id", Integer, primary_key=True))
+ fk = ForeignKey("base.q", link_to_name=True)
+ derived_table = Table(
+ "derived", m, Column("id", None, fk, primary_key=True)
+ )
+
+ base_table.append_column(Column("zz", Integer, key="q"))
+ base_table.append_column(Column("q", Integer, key="zz"))
assert fk.column is base_table.c.zz
assert isinstance(derived_table.c.id.type, Integer)
@@ -2557,134 +2722,136 @@ class ConstraintTest(fixtures.TestBase):
r"ForeignKeyConstraint on t1\(x, y\) refers to "
"multiple remote tables: t2 and t3",
Table,
- 't1', m, Column('x', Integer), Column('y', Integer),
- ForeignKeyConstraint(['x', 'y'], ['t2.x', 't3.y'])
+ "t1",
+ m,
+ Column("x", Integer),
+ Column("y", Integer),
+ ForeignKeyConstraint(["x", "y"], ["t2.x", "t3.y"]),
)
def test_invalid_composite_fk_check_columns(self):
m = MetaData()
- t2 = Table('t2', m, Column('x', Integer))
- t3 = Table('t3', m, Column('y', Integer))
+ t2 = Table("t2", m, Column("x", Integer))
+ t3 = Table("t3", m, Column("y", Integer))
assert_raises_message(
exc.ArgumentError,
r"ForeignKeyConstraint on t1\(x, y\) refers to "
"multiple remote tables: t2 and t3",
Table,
- 't1', m, Column('x', Integer), Column('y', Integer),
- ForeignKeyConstraint(['x', 'y'], [t2.c.x, t3.c.y])
+ "t1",
+ m,
+ Column("x", Integer),
+ Column("y", Integer),
+ ForeignKeyConstraint(["x", "y"], [t2.c.x, t3.c.y]),
)
def test_invalid_composite_fk_check_columns_notattached(self):
m = MetaData()
- x = Column('x', Integer)
- y = Column('y', Integer)
+ x = Column("x", Integer)
+ y = Column("y", Integer)
# no error is raised for this one right now.
# which is a minor bug.
- Table('t1', m, Column('x', Integer), Column('y', Integer),
- ForeignKeyConstraint(['x', 'y'], [x, y])
- )
+ Table(
+ "t1",
+ m,
+ Column("x", Integer),
+ Column("y", Integer),
+ ForeignKeyConstraint(["x", "y"], [x, y]),
+ )
- Table('t2', m, x)
- Table('t3', m, y)
+ Table("t2", m, x)
+ Table("t3", m, y)
def test_constraint_copied_to_proxy_ok(self):
m = MetaData()
- Table('t1', m, Column('id', Integer, primary_key=True))
- t2 = Table('t2', m, Column('id', Integer, ForeignKey('t1.id'),
- primary_key=True))
+ Table("t1", m, Column("id", Integer, primary_key=True))
+ t2 = Table(
+ "t2",
+ m,
+ Column("id", Integer, ForeignKey("t1.id"), primary_key=True),
+ )
s = tsa.select([t2])
t2fk = list(t2.c.id.foreign_keys)[0]
sfk = list(s.c.id.foreign_keys)[0]
# the two FKs share the ForeignKeyConstraint
- is_(
- t2fk.constraint,
- sfk.constraint
- )
+ is_(t2fk.constraint, sfk.constraint)
# but the ForeignKeyConstraint isn't
# aware of the select's FK
- eq_(
- t2fk.constraint.elements,
- [t2fk]
- )
+ eq_(t2fk.constraint.elements, [t2fk])
def test_type_propagate_composite_fk_string(self):
metadata = MetaData()
Table(
- 'a', metadata,
- Column('key1', Integer, primary_key=True),
- Column('key2', String(40), primary_key=True))
-
- b = Table('b', metadata,
- Column('a_key1', None),
- Column('a_key2', None),
- Column('id', Integer, primary_key=True),
- ForeignKeyConstraint(['a_key1', 'a_key2'],
- ['a.key1', 'a.key2'])
- )
+ "a",
+ metadata,
+ Column("key1", Integer, primary_key=True),
+ Column("key2", String(40), primary_key=True),
+ )
+
+ b = Table(
+ "b",
+ metadata,
+ Column("a_key1", None),
+ Column("a_key2", None),
+ Column("id", Integer, primary_key=True),
+ ForeignKeyConstraint(["a_key1", "a_key2"], ["a.key1", "a.key2"]),
+ )
assert isinstance(b.c.a_key1.type, Integer)
assert isinstance(b.c.a_key2.type, String)
def test_type_propagate_composite_fk_col(self):
metadata = MetaData()
- a = Table('a', metadata,
- Column('key1', Integer, primary_key=True),
- Column('key2', String(40), primary_key=True))
-
- b = Table('b', metadata,
- Column('a_key1', None),
- Column('a_key2', None),
- Column('id', Integer, primary_key=True),
- ForeignKeyConstraint(['a_key1', 'a_key2'],
- [a.c.key1, a.c.key2])
- )
+ a = Table(
+ "a",
+ metadata,
+ Column("key1", Integer, primary_key=True),
+ Column("key2", String(40), primary_key=True),
+ )
+
+ b = Table(
+ "b",
+ metadata,
+ Column("a_key1", None),
+ Column("a_key2", None),
+ Column("id", Integer, primary_key=True),
+ ForeignKeyConstraint(["a_key1", "a_key2"], [a.c.key1, a.c.key2]),
+ )
assert isinstance(b.c.a_key1.type, Integer)
assert isinstance(b.c.a_key2.type, String)
def test_type_propagate_standalone_fk_string(self):
metadata = MetaData()
- Table(
- 'a', metadata,
- Column('key1', Integer, primary_key=True))
+ Table("a", metadata, Column("key1", Integer, primary_key=True))
- b = Table('b', metadata,
- Column('a_key1', None, ForeignKey("a.key1")),
- )
+ b = Table("b", metadata, Column("a_key1", None, ForeignKey("a.key1")))
assert isinstance(b.c.a_key1.type, Integer)
def test_type_propagate_standalone_fk_col(self):
metadata = MetaData()
- a = Table('a', metadata,
- Column('key1', Integer, primary_key=True))
+ a = Table("a", metadata, Column("key1", Integer, primary_key=True))
- b = Table('b', metadata,
- Column('a_key1', None, ForeignKey(a.c.key1)),
- )
+ b = Table("b", metadata, Column("a_key1", None, ForeignKey(a.c.key1)))
assert isinstance(b.c.a_key1.type, Integer)
def test_type_propagate_chained_string_source_first(self):
metadata = MetaData()
- Table(
- 'a', metadata,
- Column('key1', Integer, primary_key=True)
- )
+ Table("a", metadata, Column("key1", Integer, primary_key=True))
- b = Table('b', metadata,
- Column('a_key1', None, ForeignKey("a.key1")),
- )
+ b = Table("b", metadata, Column("a_key1", None, ForeignKey("a.key1")))
- c = Table('c', metadata,
- Column('b_key1', None, ForeignKey("b.a_key1")),
- )
+ c = Table(
+ "c", metadata, Column("b_key1", None, ForeignKey("b.a_key1"))
+ )
assert isinstance(b.c.a_key1.type, Integer)
assert isinstance(c.c.b_key1.type, Integer)
@@ -2692,17 +2859,13 @@ class ConstraintTest(fixtures.TestBase):
def test_type_propagate_chained_string_source_last(self):
metadata = MetaData()
- b = Table('b', metadata,
- Column('a_key1', None, ForeignKey("a.key1")),
- )
+ b = Table("b", metadata, Column("a_key1", None, ForeignKey("a.key1")))
- c = Table('c', metadata,
- Column('b_key1', None, ForeignKey("b.a_key1")),
- )
+ c = Table(
+ "c", metadata, Column("b_key1", None, ForeignKey("b.a_key1"))
+ )
- Table(
- 'a', metadata,
- Column('key1', Integer, primary_key=True))
+ Table("a", metadata, Column("key1", Integer, primary_key=True))
assert isinstance(b.c.a_key1.type, Integer)
assert isinstance(c.c.b_key1.type, Integer)
@@ -2710,21 +2873,31 @@ class ConstraintTest(fixtures.TestBase):
def test_type_propagate_chained_string_source_last_onname(self):
metadata = MetaData()
- b = Table('b', metadata,
- Column(
- 'a_key1', None,
- ForeignKey("a.key1", link_to_name=True), key="ak1"),
- )
+ b = Table(
+ "b",
+ metadata,
+ Column(
+ "a_key1",
+ None,
+ ForeignKey("a.key1", link_to_name=True),
+ key="ak1",
+ ),
+ )
- c = Table('c', metadata,
- Column(
- 'b_key1', None,
- ForeignKey("b.a_key1", link_to_name=True), key="bk1"),
- )
+ c = Table(
+ "c",
+ metadata,
+ Column(
+ "b_key1",
+ None,
+ ForeignKey("b.a_key1", link_to_name=True),
+ key="bk1",
+ ),
+ )
Table(
- 'a', metadata,
- Column('key1', Integer, primary_key=True, key='ak1'))
+ "a", metadata, Column("key1", Integer, primary_key=True, key="ak1")
+ )
assert isinstance(b.c.ak1.type, Integer)
assert isinstance(c.c.bk1.type, Integer)
@@ -2732,34 +2905,41 @@ class ConstraintTest(fixtures.TestBase):
def test_type_propagate_chained_string_source_last_onname_conflict(self):
metadata = MetaData()
- b = Table('b', metadata,
- # b.c.key1 -> a.c.key1 -> String
- Column(
- 'ak1', None,
- ForeignKey("a.key1", link_to_name=False), key="key1"),
- # b.c.ak1 -> a.c.ak1 -> Integer
- Column(
- 'a_key1', None,
- ForeignKey("a.key1", link_to_name=True), key="ak1"),
- )
-
- c = Table('c', metadata,
- # c.c.b_key1 -> b.c.ak1 -> Integer
- Column(
- 'b_key1', None,
- ForeignKey("b.ak1", link_to_name=False)),
- # c.c.b_ak1 -> b.c.ak1
- Column(
- 'b_ak1', None,
- ForeignKey("b.ak1", link_to_name=True)),
- )
+ b = Table(
+ "b",
+ metadata,
+ # b.c.key1 -> a.c.key1 -> String
+ Column(
+ "ak1",
+ None,
+ ForeignKey("a.key1", link_to_name=False),
+ key="key1",
+ ),
+ # b.c.ak1 -> a.c.ak1 -> Integer
+ Column(
+ "a_key1",
+ None,
+ ForeignKey("a.key1", link_to_name=True),
+ key="ak1",
+ ),
+ )
+
+ c = Table(
+ "c",
+ metadata,
+ # c.c.b_key1 -> b.c.ak1 -> Integer
+ Column("b_key1", None, ForeignKey("b.ak1", link_to_name=False)),
+ # c.c.b_ak1 -> b.c.ak1
+ Column("b_ak1", None, ForeignKey("b.ak1", link_to_name=True)),
+ )
Table(
- 'a', metadata,
+ "a",
+ metadata,
# a.c.key1
- Column('ak1', String, key="key1"),
+ Column("ak1", String, key="key1"),
# a.c.ak1
- Column('key1', Integer, primary_key=True, key='ak1'),
+ Column("key1", Integer, primary_key=True, key="ak1"),
)
assert isinstance(b.c.key1.type, String)
@@ -2770,30 +2950,26 @@ class ConstraintTest(fixtures.TestBase):
def test_type_propagate_chained_col_orig_first(self):
metadata = MetaData()
- a = Table('a', metadata,
- Column('key1', Integer, primary_key=True))
+ a = Table("a", metadata, Column("key1", Integer, primary_key=True))
- b = Table('b', metadata,
- Column('a_key1', None, ForeignKey(a.c.key1)),
- )
+ b = Table("b", metadata, Column("a_key1", None, ForeignKey(a.c.key1)))
- c = Table('c', metadata,
- Column('b_key1', None, ForeignKey(b.c.a_key1)),
- )
+ c = Table(
+ "c", metadata, Column("b_key1", None, ForeignKey(b.c.a_key1))
+ )
assert isinstance(b.c.a_key1.type, Integer)
assert isinstance(c.c.b_key1.type, Integer)
def test_column_accessor_col(self):
- c1 = Column('x', Integer)
+ c1 = Column("x", Integer)
fk = ForeignKey(c1)
is_(fk.column, c1)
def test_column_accessor_clause_element(self):
- c1 = Column('x', Integer)
+ c1 = Column("x", Integer)
class CThing(object):
-
def __init__(self, c):
self.c = c
@@ -2809,50 +2985,58 @@ class ConstraintTest(fixtures.TestBase):
exc.InvalidRequestError,
"this ForeignKey object does not yet have a parent "
"Column associated with it.",
- getattr, fk, "column"
+ getattr,
+ fk,
+ "column",
)
def test_column_accessor_string_no_parent_table(self):
fk = ForeignKey("sometable.somecol")
- Column('x', fk)
+ Column("x", fk)
assert_raises_message(
exc.InvalidRequestError,
"this ForeignKey's parent column is not yet "
"associated with a Table.",
- getattr, fk, "column"
+ getattr,
+ fk,
+ "column",
)
def test_column_accessor_string_no_target_table(self):
fk = ForeignKey("sometable.somecol")
- c1 = Column('x', fk)
- Table('t', MetaData(), c1)
+ c1 = Column("x", fk)
+ Table("t", MetaData(), c1)
assert_raises_message(
exc.NoReferencedTableError,
"Foreign key associated with column 't.x' could not find "
"table 'sometable' with which to generate a "
"foreign key to target column 'somecol'",
- getattr, fk, "column"
+ getattr,
+ fk,
+ "column",
)
def test_column_accessor_string_no_target_column(self):
fk = ForeignKey("sometable.somecol")
- c1 = Column('x', fk)
+ c1 = Column("x", fk)
m = MetaData()
- Table('t', m, c1)
- Table("sometable", m, Column('notsomecol', Integer))
+ Table("t", m, c1)
+ Table("sometable", m, Column("notsomecol", Integer))
assert_raises_message(
exc.NoReferencedColumnError,
"Could not initialize target column for ForeignKey "
"'sometable.somecol' on table 't': "
"table 'sometable' has no column named 'somecol'",
- getattr, fk, "column"
+ getattr,
+ fk,
+ "column",
)
def test_remove_table_fk_bookkeeping(self):
metadata = MetaData()
- fk = ForeignKey('t1.x')
- t2 = Table('t2', metadata, Column('y', Integer, fk))
- t3 = Table('t3', metadata, Column('y', Integer, ForeignKey('t1.x')))
+ fk = ForeignKey("t1.x")
+ t2 = Table("t2", metadata, Column("y", Integer, fk))
+ t3 = Table("t3", metadata, Column("y", Integer, ForeignKey("t1.x")))
assert t2.key in metadata.tables
assert ("t1", "x") in metadata._fk_memos
@@ -2869,13 +3053,15 @@ class ConstraintTest(fixtures.TestBase):
assert fk not in metadata._fk_memos[("t1", "x")]
# make the referenced table
- t1 = Table('t1', metadata, Column('x', Integer))
+ t1 = Table("t1", metadata, Column("x", Integer))
# t2 tells us exactly what's wrong
assert_raises_message(
exc.InvalidRequestError,
"Table t2 is no longer associated with its parent MetaData",
- getattr, fk, "column"
+ getattr,
+ fk,
+ "column",
)
# t3 is unaffected
@@ -2885,63 +3071,51 @@ class ConstraintTest(fixtures.TestBase):
metadata.remove(t2)
def test_double_fk_usage_raises(self):
- f = ForeignKey('b.id')
+ f = ForeignKey("b.id")
- Column('x', Integer, f)
+ Column("x", Integer, f)
assert_raises(exc.InvalidRequestError, Column, "y", Integer, f)
def test_auto_append_constraint(self):
m = MetaData()
- t = Table('tbl', m,
- Column('a', Integer),
- Column('b', Integer)
- )
+ t = Table("tbl", m, Column("a", Integer), Column("b", Integer))
- t2 = Table('t2', m,
- Column('a', Integer),
- Column('b', Integer)
- )
+ t2 = Table("t2", m, Column("a", Integer), Column("b", Integer))
for c in (
UniqueConstraint(t.c.a),
CheckConstraint(t.c.a > 5),
ForeignKeyConstraint([t.c.a], [t2.c.a]),
- PrimaryKeyConstraint(t.c.a)
+ PrimaryKeyConstraint(t.c.a),
):
assert c in t.constraints
t.append_constraint(c)
assert c in t.constraints
- c = Index('foo', t.c.a)
+ c = Index("foo", t.c.a)
assert c in t.indexes
def test_auto_append_lowercase_table(self):
from sqlalchemy import table, column
- t = table('t', column('a'))
- t2 = table('t2', column('a'))
+ t = table("t", column("a"))
+ t2 = table("t2", column("a"))
for c in (
UniqueConstraint(t.c.a),
CheckConstraint(t.c.a > 5),
ForeignKeyConstraint([t.c.a], [t2.c.a]),
PrimaryKeyConstraint(t.c.a),
- Index('foo', t.c.a)
+ Index("foo", t.c.a),
):
assert True
def test_tometadata_ok(self):
m = MetaData()
- t = Table('tbl', m,
- Column('a', Integer),
- Column('b', Integer)
- )
+ t = Table("tbl", m, Column("a", Integer), Column("b", Integer))
- t2 = Table('t2', m,
- Column('a', Integer),
- Column('b', Integer)
- )
+ t2 = Table("t2", m, Column("a", Integer), Column("b", Integer))
UniqueConstraint(t.c.a)
CheckConstraint(t.c.a > 5)
@@ -2959,10 +3133,7 @@ class ConstraintTest(fixtures.TestBase):
def test_check_constraint_copy(self):
m = MetaData()
- t = Table('tbl', m,
- Column('a', Integer),
- Column('b', Integer)
- )
+ t = Table("tbl", m, Column("a", Integer), Column("b", Integer))
ck = CheckConstraint(t.c.a > 5)
ck2 = ck.copy()
assert ck in t.constraints
@@ -2971,15 +3142,9 @@ class ConstraintTest(fixtures.TestBase):
def test_ambig_check_constraint_auto_append(self):
m = MetaData()
- t = Table('tbl', m,
- Column('a', Integer),
- Column('b', Integer)
- )
+ t = Table("tbl", m, Column("a", Integer), Column("b", Integer))
- t2 = Table('t2', m,
- Column('a', Integer),
- Column('b', Integer)
- )
+ t2 = Table("t2", m, Column("a", Integer), Column("b", Integer))
c = CheckConstraint(t.c.a > t2.c.b)
assert c not in t.constraints
assert c not in t2.constraints
@@ -2987,22 +3152,22 @@ class ConstraintTest(fixtures.TestBase):
def test_auto_append_ck_on_col_attach_one(self):
m = MetaData()
- a = Column('a', Integer)
- b = Column('b', Integer)
+ a = Column("a", Integer)
+ b = Column("b", Integer)
ck = CheckConstraint(a > b)
- t = Table('tbl', m, a, b)
+ t = Table("tbl", m, a, b)
assert ck in t.constraints
def test_auto_append_ck_on_col_attach_two(self):
m = MetaData()
- a = Column('a', Integer)
- b = Column('b', Integer)
- c = Column('c', Integer)
+ a = Column("a", Integer)
+ b = Column("b", Integer)
+ c = Column("c", Integer)
ck = CheckConstraint(a > b + c)
- t = Table('tbl', m, a)
+ t = Table("tbl", m, a)
assert ck not in t.constraints
t.append_column(b)
@@ -3014,18 +3179,18 @@ class ConstraintTest(fixtures.TestBase):
def test_auto_append_ck_on_col_attach_three(self):
m = MetaData()
- a = Column('a', Integer)
- b = Column('b', Integer)
- c = Column('c', Integer)
+ a = Column("a", Integer)
+ b = Column("b", Integer)
+ c = Column("c", Integer)
ck = CheckConstraint(a > b + c)
- t = Table('tbl', m, a)
+ t = Table("tbl", m, a)
assert ck not in t.constraints
t.append_column(b)
assert ck not in t.constraints
- t2 = Table('t2', m)
+ t2 = Table("t2", m)
t2.append_column(c)
# two different tables, so CheckConstraint does nothing.
@@ -3034,22 +3199,22 @@ class ConstraintTest(fixtures.TestBase):
def test_auto_append_uq_on_col_attach_one(self):
m = MetaData()
- a = Column('a', Integer)
- b = Column('b', Integer)
+ a = Column("a", Integer)
+ b = Column("b", Integer)
uq = UniqueConstraint(a, b)
- t = Table('tbl', m, a, b)
+ t = Table("tbl", m, a, b)
assert uq in t.constraints
def test_auto_append_uq_on_col_attach_two(self):
m = MetaData()
- a = Column('a', Integer)
- b = Column('b', Integer)
- c = Column('c', Integer)
+ a = Column("a", Integer)
+ b = Column("b", Integer)
+ c = Column("c", Integer)
uq = UniqueConstraint(a, b, c)
- t = Table('tbl', m, a)
+ t = Table("tbl", m, a)
assert uq not in t.constraints
t.append_column(b)
@@ -3061,24 +3226,25 @@ class ConstraintTest(fixtures.TestBase):
def test_auto_append_uq_on_col_attach_three(self):
m = MetaData()
- a = Column('a', Integer)
- b = Column('b', Integer)
- c = Column('c', Integer)
+ a = Column("a", Integer)
+ b = Column("b", Integer)
+ c = Column("c", Integer)
uq = UniqueConstraint(a, b, c)
- t = Table('tbl', m, a)
+ t = Table("tbl", m, a)
assert uq not in t.constraints
t.append_column(b)
assert uq not in t.constraints
- t2 = Table('t2', m)
+ t2 = Table("t2", m)
# two different tables, so UniqueConstraint raises
assert_raises_message(
exc.ArgumentError,
r"Column\(s\) 't2\.c' are not part of table 'tbl'\.",
- t2.append_column, c
+ t2.append_column,
+ c,
)
def test_auto_append_uq_on_col_attach_four(self):
@@ -3088,12 +3254,12 @@ class ConstraintTest(fixtures.TestBase):
"""
m = MetaData()
- a = Column('a', Integer)
- b = Column('b', Integer)
- c = Column('c', Integer)
- uq = UniqueConstraint(a, 'b', 'c')
+ a = Column("a", Integer)
+ b = Column("b", Integer)
+ c = Column("c", Integer)
+ uq = UniqueConstraint(a, "b", "c")
- t = Table('tbl', m, a)
+ t = Table("tbl", m, a)
assert uq not in t.constraints
t.append_column(b)
@@ -3111,7 +3277,7 @@ class ConstraintTest(fixtures.TestBase):
eq_(
[cn for cn in t.constraints if isinstance(cn, UniqueConstraint)],
- [uq]
+ [uq],
)
def test_auto_append_uq_on_col_attach_five(self):
@@ -3121,13 +3287,13 @@ class ConstraintTest(fixtures.TestBase):
"""
m = MetaData()
- a = Column('a', Integer)
- b = Column('b', Integer)
- c = Column('c', Integer)
+ a = Column("a", Integer)
+ b = Column("b", Integer)
+ c = Column("c", Integer)
- t = Table('tbl', m, a, c, b)
+ t = Table("tbl", m, a, c, b)
- uq = UniqueConstraint(a, 'b', 'c')
+ uq = UniqueConstraint(a, "b", "c")
assert uq in t.constraints
@@ -3137,38 +3303,36 @@ class ConstraintTest(fixtures.TestBase):
eq_(
[cn for cn in t.constraints if isinstance(cn, UniqueConstraint)],
- [uq]
+ [uq],
)
def test_index_asserts_cols_standalone(self):
metadata = MetaData()
- t1 = Table('t1', metadata,
- Column('x', Integer)
- )
- t2 = Table('t2', metadata,
- Column('y', Integer)
- )
+ t1 = Table("t1", metadata, Column("x", Integer))
+ t2 = Table("t2", metadata, Column("y", Integer))
assert_raises_message(
exc.ArgumentError,
r"Column\(s\) 't2.y' are not part of table 't1'.",
Index,
- "bar", t1.c.x, t2.c.y
+ "bar",
+ t1.c.x,
+ t2.c.y,
)
def test_index_asserts_cols_inline(self):
metadata = MetaData()
- t1 = Table('t1', metadata,
- Column('x', Integer)
- )
+ t1 = Table("t1", metadata, Column("x", Integer))
assert_raises_message(
exc.ArgumentError,
"Index 'bar' is against table 't1', and "
"cannot be associated with table 't2'.",
- Table, 't2', metadata,
- Column('y', Integer),
- Index('bar', t1.c.x)
+ Table,
+ "t2",
+ metadata,
+ Column("y", Integer),
+ Index("bar", t1.c.x),
)
def test_raise_index_nonexistent_name(self):
@@ -3176,28 +3340,25 @@ class ConstraintTest(fixtures.TestBase):
# the KeyError isn't ideal here, a nicer message
# perhaps
assert_raises(
- KeyError,
- Table, 't', m, Column('x', Integer), Index("foo", "q")
+ KeyError, Table, "t", m, Column("x", Integer), Index("foo", "q")
)
def test_raise_not_a_column(self):
- assert_raises(
- exc.ArgumentError,
- Index, "foo", 5
- )
+ assert_raises(exc.ArgumentError, Index, "foo", 5)
def test_raise_expr_no_column(self):
- idx = Index('foo', func.lower(5))
+ idx = Index("foo", func.lower(5))
assert_raises_message(
exc.CompileError,
"Index 'foo' is not associated with any table.",
- schema.CreateIndex(idx).compile, dialect=testing.db.dialect
+ schema.CreateIndex(idx).compile,
+ dialect=testing.db.dialect,
)
assert_raises_message(
exc.CompileError,
"Index 'foo' is not associated with any table.",
- schema.CreateIndex(idx).compile
+ schema.CreateIndex(idx).compile,
)
def test_no_warning_w_no_columns(self):
@@ -3206,26 +3367,29 @@ class ConstraintTest(fixtures.TestBase):
assert_raises_message(
exc.CompileError,
"Index 'foo' is not associated with any table.",
- schema.CreateIndex(idx).compile, dialect=testing.db.dialect
+ schema.CreateIndex(idx).compile,
+ dialect=testing.db.dialect,
)
assert_raises_message(
exc.CompileError,
"Index 'foo' is not associated with any table.",
- schema.CreateIndex(idx).compile
+ schema.CreateIndex(idx).compile,
)
def test_raise_clauseelement_not_a_column(self):
m = MetaData()
- t2 = Table('t2', m, Column('x', Integer))
+ t2 = Table("t2", m, Column("x", Integer))
class SomeClass(object):
-
def __clause_element__(self):
return t2
+
assert_raises_message(
exc.ArgumentError,
r"Element Table\('t2', .* is not a string name or column element",
- Index, "foo", SomeClass()
+ Index,
+ "foo",
+ SomeClass(),
)
@@ -3233,28 +3397,30 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase):
"""Test Column() construction."""
- __dialect__ = 'default'
+ __dialect__ = "default"
def columns(self):
- return [Column(Integer),
- Column('b', Integer),
- Column(Integer),
- Column('d', Integer),
- Column(Integer, name='e'),
- Column(type_=Integer),
- Column(Integer()),
- Column('h', Integer()),
- Column(type_=Integer())]
+ return [
+ Column(Integer),
+ Column("b", Integer),
+ Column(Integer),
+ Column("d", Integer),
+ Column(Integer, name="e"),
+ Column(type_=Integer),
+ Column(Integer()),
+ Column("h", Integer()),
+ Column(type_=Integer()),
+ ]
def test_basic(self):
c = self.columns()
- for i, v in ((0, 'a'), (2, 'c'), (5, 'f'), (6, 'g'), (8, 'i')):
+ for i, v in ((0, "a"), (2, "c"), (5, "f"), (6, "g"), (8, "i")):
c[i].name = v
c[i].key = v
del i, v
- tbl = Table('table', MetaData(), *c)
+ tbl = Table("table", MetaData(), *c)
for i, col in enumerate(tbl.c):
assert col.name == c[i].name
@@ -3266,35 +3432,47 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase):
exc.ArgumentError,
"Column must be constructed with a non-blank name or assign a "
"non-blank .name ",
- Table, 't', MetaData(), c)
+ Table,
+ "t",
+ MetaData(),
+ c,
+ )
def test_name_blank(self):
- c = Column('', Integer)
+ c = Column("", Integer)
assert_raises_message(
exc.ArgumentError,
"Column must be constructed with a non-blank name or assign a "
"non-blank .name ",
- Table, 't', MetaData(), c)
+ Table,
+ "t",
+ MetaData(),
+ c,
+ )
def test_dupe_column(self):
- c = Column('x', Integer)
- Table('t', MetaData(), c)
+ c = Column("x", Integer)
+ Table("t", MetaData(), c)
assert_raises_message(
exc.ArgumentError,
"Column object 'x' already assigned to Table 't'",
- Table, 'q', MetaData(), c)
+ Table,
+ "q",
+ MetaData(),
+ c,
+ )
def test_incomplete_key(self):
c = Column(Integer)
assert c.name is None
assert c.key is None
- c.name = 'named'
- Table('t', MetaData(), c)
+ c.name = "named"
+ Table("t", MetaData(), c)
- assert c.name == 'named'
+ assert c.name == "named"
assert c.name == c.key
def test_unique_index_flags_default_to_none(self):
@@ -3302,28 +3480,29 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase):
eq_(c.unique, None)
eq_(c.index, None)
- c = Column('c', Integer, index=True)
+ c = Column("c", Integer, index=True)
eq_(c.unique, None)
eq_(c.index, True)
- t = Table('t', MetaData(), c)
+ t = Table("t", MetaData(), c)
eq_(list(t.indexes)[0].unique, False)
c = Column(Integer, unique=True)
eq_(c.unique, True)
eq_(c.index, None)
- c = Column('c', Integer, index=True, unique=True)
+ c = Column("c", Integer, index=True, unique=True)
eq_(c.unique, True)
eq_(c.index, True)
- t = Table('t', MetaData(), c)
+ t = Table("t", MetaData(), c)
eq_(list(t.indexes)[0].unique, True)
def test_bogus(self):
- assert_raises(exc.ArgumentError, Column, 'foo', name='bar')
- assert_raises(exc.ArgumentError, Column, 'foo', Integer,
- type_=Integer())
+ assert_raises(exc.ArgumentError, Column, "foo", name="bar")
+ assert_raises(
+ exc.ArgumentError, Column, "foo", Integer, type_=Integer()
+ )
def test_custom_subclass_proxy(self):
"""test proxy generation of a Column subclass, can be compiled."""
@@ -3333,9 +3512,8 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase):
from sqlalchemy.sql import select
class MyColumn(Column):
-
def _constructor(self, name, type, **kw):
- kw['name'] = name
+ kw["name"] = name
return MyColumn(type, **kw)
def __init__(self, type, **kw):
@@ -3350,13 +3528,10 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase):
return s + "-"
id = MyColumn(Integer, primary_key=True)
- id.name = 'id'
+ id.name = "id"
name = MyColumn(String)
- name.name = 'name'
- t1 = Table('foo', MetaData(),
- id,
- name
- )
+ name.name = "name"
+ t1 = Table("foo", MetaData(), id, name)
# goofy thing
eq_(t1.c.name.my_goofy_thing(), "hi")
@@ -3380,24 +3555,22 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase):
from sqlalchemy.sql import select
class MyColumn(Column):
-
def __init__(self, type, **kw):
Column.__init__(self, type, **kw)
id = MyColumn(Integer, primary_key=True)
- id.name = 'id'
+ id.name = "id"
name = MyColumn(String)
- name.name = 'name'
- t1 = Table('foo', MetaData(),
- id,
- name
- )
+ name.name = "name"
+ t1 = Table("foo", MetaData(), id, name)
assert_raises_message(
TypeError,
"Could not create a copy of this <class "
"'test.sql.test_metadata..*MyColumn'> "
"object. Ensure the class includes a _constructor()",
- getattr, select([t1.select().alias()]), 'c'
+ getattr,
+ select([t1.select().alias()]),
+ "c",
)
def test_custom_create(self):
@@ -3412,7 +3585,7 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase):
text = "%s SPECIAL DIRECTIVE %s" % (
column.name,
- compiler.type_compiler.process(column.type)
+ compiler.type_compiler.process(column.type),
)
default = compiler.get_column_default_string(column)
if default is not None:
@@ -3423,23 +3596,23 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase):
if column.constraints:
text += " ".join(
- compiler.process(const)
- for const in column.constraints)
+ compiler.process(const) for const in column.constraints
+ )
return text
t = Table(
- 'mytable', MetaData(),
- Column('x', Integer, info={
- "special": True}, primary_key=True),
- Column('y', String(50)),
- Column('z', String(20), info={
- "special": True}))
+ "mytable",
+ MetaData(),
+ Column("x", Integer, info={"special": True}, primary_key=True),
+ Column("y", String(50)),
+ Column("z", String(20), info={"special": True}),
+ )
self.assert_compile(
schema.CreateTable(t),
"CREATE TABLE mytable (x SPECIAL DIRECTIVE INTEGER "
"NOT NULL, y VARCHAR(50), "
- "z SPECIAL DIRECTIVE VARCHAR(20), PRIMARY KEY (x))"
+ "z SPECIAL DIRECTIVE VARCHAR(20), PRIMARY KEY (x))",
)
deregister(schema.CreateColumn)
@@ -3450,127 +3623,126 @@ class ColumnDefaultsTest(fixtures.TestBase):
"""test assignment of default fixures to columns"""
def _fixture(self, *arg, **kw):
- return Column('x', Integer, *arg, **kw)
+ return Column("x", Integer, *arg, **kw)
def test_server_default_positional(self):
- target = schema.DefaultClause('y')
+ target = schema.DefaultClause("y")
c = self._fixture(target)
assert c.server_default is target
assert target.column is c
def test_onupdate_default_not_server_default_one(self):
- target1 = schema.DefaultClause('y')
- target2 = schema.DefaultClause('z')
+ target1 = schema.DefaultClause("y")
+ target2 = schema.DefaultClause("z")
c = self._fixture(server_default=target1, server_onupdate=target2)
- eq_(c.server_default.arg, 'y')
- eq_(c.server_onupdate.arg, 'z')
+ eq_(c.server_default.arg, "y")
+ eq_(c.server_onupdate.arg, "z")
def test_onupdate_default_not_server_default_two(self):
- target1 = schema.DefaultClause('y', for_update=True)
- target2 = schema.DefaultClause('z', for_update=True)
+ target1 = schema.DefaultClause("y", for_update=True)
+ target2 = schema.DefaultClause("z", for_update=True)
c = self._fixture(server_default=target1, server_onupdate=target2)
- eq_(c.server_default.arg, 'y')
- eq_(c.server_onupdate.arg, 'z')
+ eq_(c.server_default.arg, "y")
+ eq_(c.server_onupdate.arg, "z")
def test_onupdate_default_not_server_default_three(self):
- target1 = schema.DefaultClause('y', for_update=False)
- target2 = schema.DefaultClause('z', for_update=True)
+ target1 = schema.DefaultClause("y", for_update=False)
+ target2 = schema.DefaultClause("z", for_update=True)
c = self._fixture(target1, target2)
- eq_(c.server_default.arg, 'y')
- eq_(c.server_onupdate.arg, 'z')
+ eq_(c.server_default.arg, "y")
+ eq_(c.server_onupdate.arg, "z")
def test_onupdate_default_not_server_default_four(self):
- target1 = schema.DefaultClause('y', for_update=False)
+ target1 = schema.DefaultClause("y", for_update=False)
c = self._fixture(server_onupdate=target1)
is_(c.server_default, None)
- eq_(c.server_onupdate.arg, 'y')
+ eq_(c.server_onupdate.arg, "y")
def test_server_default_keyword_as_schemaitem(self):
- target = schema.DefaultClause('y')
+ target = schema.DefaultClause("y")
c = self._fixture(server_default=target)
assert c.server_default is target
assert target.column is c
def test_server_default_keyword_as_clause(self):
- target = 'y'
+ target = "y"
c = self._fixture(server_default=target)
assert c.server_default.arg == target
assert c.server_default.column is c
def test_server_default_onupdate_positional(self):
- target = schema.DefaultClause('y', for_update=True)
+ target = schema.DefaultClause("y", for_update=True)
c = self._fixture(target)
assert c.server_onupdate is target
assert target.column is c
def test_server_default_onupdate_keyword_as_schemaitem(self):
- target = schema.DefaultClause('y', for_update=True)
+ target = schema.DefaultClause("y", for_update=True)
c = self._fixture(server_onupdate=target)
assert c.server_onupdate is target
assert target.column is c
def test_server_default_onupdate_keyword_as_clause(self):
- target = 'y'
+ target = "y"
c = self._fixture(server_onupdate=target)
assert c.server_onupdate.arg == target
assert c.server_onupdate.column is c
def test_column_default_positional(self):
- target = schema.ColumnDefault('y')
+ target = schema.ColumnDefault("y")
c = self._fixture(target)
assert c.default is target
assert target.column is c
def test_column_default_keyword_as_schemaitem(self):
- target = schema.ColumnDefault('y')
+ target = schema.ColumnDefault("y")
c = self._fixture(default=target)
assert c.default is target
assert target.column is c
def test_column_default_keyword_as_clause(self):
- target = 'y'
+ target = "y"
c = self._fixture(default=target)
assert c.default.arg == target
assert c.default.column is c
def test_column_default_onupdate_positional(self):
- target = schema.ColumnDefault('y', for_update=True)
+ target = schema.ColumnDefault("y", for_update=True)
c = self._fixture(target)
assert c.onupdate is target
assert target.column is c
def test_column_default_onupdate_keyword_as_schemaitem(self):
- target = schema.ColumnDefault('y', for_update=True)
+ target = schema.ColumnDefault("y", for_update=True)
c = self._fixture(onupdate=target)
assert c.onupdate is target
assert target.column is c
def test_column_default_onupdate_keyword_as_clause(self):
- target = 'y'
+ target = "y"
c = self._fixture(onupdate=target)
assert c.onupdate.arg == target
assert c.onupdate.column is c
class ColumnOptionsTest(fixtures.TestBase):
-
def test_default_generators(self):
- g1, g2 = Sequence('foo_id_seq'), ColumnDefault('f5')
+ g1, g2 = Sequence("foo_id_seq"), ColumnDefault("f5")
assert Column(String, default=g1).default is g1
assert Column(String, onupdate=g1).onupdate is g1
assert Column(String, default=g2).default is g2
assert Column(String, onupdate=g2).onupdate is g2
def _null_type_error(self, col):
- t = Table('t', MetaData(), col)
+ t = Table("t", MetaData(), col)
assert_raises_message(
exc.CompileError,
r"\(in table 't', column 'foo'\): Can't generate DDL for NullType",
- schema.CreateTable(t).compile
+ schema.CreateTable(t).compile,
)
def _no_name_error(self, col):
@@ -3578,13 +3750,16 @@ class ColumnOptionsTest(fixtures.TestBase):
exc.ArgumentError,
"Column must be constructed with a non-blank name or "
"assign a non-blank .name",
- Table, 't', MetaData(), col
+ Table,
+ "t",
+ MetaData(),
+ col,
)
def _no_error(self, col):
m = MetaData()
- b = Table('bar', m, Column('id', Integer))
- t = Table('t', m, col)
+ b = Table("bar", m, Column("id", Integer))
+ t = Table("t", m, col)
schema.CreateTable(t).compile()
def test_argument_signatures(self):
@@ -3597,71 +3772,82 @@ class ColumnOptionsTest(fixtures.TestBase):
self._null_type_error(Column("foo", Sequence("a")))
- self._no_name_error(Column(ForeignKey('bar.id')))
+ self._no_name_error(Column(ForeignKey("bar.id")))
- self._no_error(Column("foo", ForeignKey('bar.id')))
+ self._no_error(Column("foo", ForeignKey("bar.id")))
- self._no_name_error(Column(ForeignKey('bar.id'), default="foo"))
+ self._no_name_error(Column(ForeignKey("bar.id"), default="foo"))
- self._no_name_error(Column(ForeignKey('bar.id'), Sequence("a")))
- self._no_error(Column("foo", ForeignKey('bar.id'), default="foo"))
- self._no_error(Column("foo", ForeignKey('bar.id'), Sequence("a")))
+ self._no_name_error(Column(ForeignKey("bar.id"), Sequence("a")))
+ self._no_error(Column("foo", ForeignKey("bar.id"), default="foo"))
+ self._no_error(Column("foo", ForeignKey("bar.id"), Sequence("a")))
def test_column_info(self):
- c1 = Column('foo', String, info={'x': 'y'})
- c2 = Column('bar', String, info={})
- c3 = Column('bat', String)
- assert c1.info == {'x': 'y'}
+ c1 = Column("foo", String, info={"x": "y"})
+ c2 = Column("bar", String, info={})
+ c3 = Column("bat", String)
+ assert c1.info == {"x": "y"}
assert c2.info == {}
assert c3.info == {}
for c in (c1, c2, c3):
- c.info['bar'] = 'zip'
- assert c.info['bar'] == 'zip'
+ c.info["bar"] = "zip"
+ assert c.info["bar"] == "zip"
class CatchAllEventsTest(fixtures.RemovesEvents, fixtures.TestBase):
-
def test_all_events(self):
canary = []
def before_attach(obj, parent):
- canary.append("%s->%s" % (obj.__class__.__name__,
- parent.__class__.__name__))
+ canary.append(
+ "%s->%s" % (obj.__class__.__name__, parent.__class__.__name__)
+ )
def after_attach(obj, parent):
canary.append("%s->%s" % (obj.__class__.__name__, parent))
self.event_listen(
- schema.SchemaItem,
- "before_parent_attach",
- before_attach)
+ schema.SchemaItem, "before_parent_attach", before_attach
+ )
self.event_listen(
- schema.SchemaItem,
- "after_parent_attach",
- after_attach)
+ schema.SchemaItem, "after_parent_attach", after_attach
+ )
m = MetaData()
- Table('t1', m,
- Column('id', Integer, Sequence('foo_id'), primary_key=True),
- Column('bar', String, ForeignKey('t2.id'))
- )
- Table('t2', m,
- Column('id', Integer, primary_key=True),
- )
+ Table(
+ "t1",
+ m,
+ Column("id", Integer, Sequence("foo_id"), primary_key=True),
+ Column("bar", String, ForeignKey("t2.id")),
+ )
+ Table("t2", m, Column("id", Integer, primary_key=True))
eq_(
canary,
- ['Sequence->Column', 'Sequence->id', 'ForeignKey->Column',
- 'ForeignKey->bar', 'Table->MetaData',
- 'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t1',
- 'Column->Table', 'Column->t1', 'Column->Table',
- 'Column->t1', 'ForeignKeyConstraint->Table',
- 'ForeignKeyConstraint->t1', 'Table->MetaData(bind=None)',
- 'Table->MetaData', 'PrimaryKeyConstraint->Table',
- 'PrimaryKeyConstraint->t2', 'Column->Table', 'Column->t2',
- 'Table->MetaData(bind=None)']
+ [
+ "Sequence->Column",
+ "Sequence->id",
+ "ForeignKey->Column",
+ "ForeignKey->bar",
+ "Table->MetaData",
+ "PrimaryKeyConstraint->Table",
+ "PrimaryKeyConstraint->t1",
+ "Column->Table",
+ "Column->t1",
+ "Column->Table",
+ "Column->t1",
+ "ForeignKeyConstraint->Table",
+ "ForeignKeyConstraint->t1",
+ "Table->MetaData(bind=None)",
+ "Table->MetaData",
+ "PrimaryKeyConstraint->Table",
+ "PrimaryKeyConstraint->t2",
+ "Column->Table",
+ "Column->t2",
+ "Table->MetaData(bind=None)",
+ ],
)
def test_events_per_constraint(self):
@@ -3669,80 +3855,82 @@ class CatchAllEventsTest(fixtures.RemovesEvents, fixtures.TestBase):
def evt(target):
def before_attach(obj, parent):
- canary.append("%s->%s" % (target.__name__,
- parent.__class__.__name__))
+ canary.append(
+ "%s->%s" % (target.__name__, parent.__class__.__name__)
+ )
def after_attach(obj, parent):
- assert hasattr(obj, 'name') # so we can change it
+ assert hasattr(obj, "name") # so we can change it
canary.append("%s->%s" % (target.__name__, parent))
+
self.event_listen(target, "before_parent_attach", before_attach)
self.event_listen(target, "after_parent_attach", after_attach)
for target in [
- schema.ForeignKeyConstraint, schema.PrimaryKeyConstraint,
+ schema.ForeignKeyConstraint,
+ schema.PrimaryKeyConstraint,
schema.UniqueConstraint,
schema.CheckConstraint,
- schema.Index
+ schema.Index,
]:
evt(target)
m = MetaData()
- Table('t1', m,
- Column('id', Integer, Sequence('foo_id'), primary_key=True),
- Column('bar', String, ForeignKey('t2.id'), index=True),
- Column('bat', Integer, unique=True),
- )
- Table('t2', m,
- Column('id', Integer, primary_key=True),
- Column('bar', Integer),
- Column('bat', Integer),
- CheckConstraint("bar>5"),
- UniqueConstraint('bar', 'bat'),
- Index(None, 'bar', 'bat')
- )
+ Table(
+ "t1",
+ m,
+ Column("id", Integer, Sequence("foo_id"), primary_key=True),
+ Column("bar", String, ForeignKey("t2.id"), index=True),
+ Column("bat", Integer, unique=True),
+ )
+ Table(
+ "t2",
+ m,
+ Column("id", Integer, primary_key=True),
+ Column("bar", Integer),
+ Column("bat", Integer),
+ CheckConstraint("bar>5"),
+ UniqueConstraint("bar", "bat"),
+ Index(None, "bar", "bat"),
+ )
eq_(
canary,
[
- 'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t1',
- 'Index->Table', 'Index->t1',
- 'ForeignKeyConstraint->Table', 'ForeignKeyConstraint->t1',
- 'UniqueConstraint->Table', 'UniqueConstraint->t1',
- 'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t2',
- 'CheckConstraint->Table', 'CheckConstraint->t2',
- 'UniqueConstraint->Table', 'UniqueConstraint->t2',
- 'Index->Table', 'Index->t2'
- ]
+ "PrimaryKeyConstraint->Table",
+ "PrimaryKeyConstraint->t1",
+ "Index->Table",
+ "Index->t1",
+ "ForeignKeyConstraint->Table",
+ "ForeignKeyConstraint->t1",
+ "UniqueConstraint->Table",
+ "UniqueConstraint->t1",
+ "PrimaryKeyConstraint->Table",
+ "PrimaryKeyConstraint->t2",
+ "CheckConstraint->Table",
+ "CheckConstraint->t2",
+ "UniqueConstraint->Table",
+ "UniqueConstraint->t2",
+ "Index->Table",
+ "Index->t2",
+ ],
)
class DialectKWArgTest(fixtures.TestBase):
-
@contextmanager
def _fixture(self):
from sqlalchemy.engine.default import DefaultDialect
class ParticipatingDialect(DefaultDialect):
construct_arguments = [
- (schema.Index, {
- "x": 5,
- "y": False,
- "z_one": None
- }),
- (schema.ForeignKeyConstraint, {
- "foobar": False
- })
+ (schema.Index, {"x": 5, "y": False, "z_one": None}),
+ (schema.ForeignKeyConstraint, {"foobar": False}),
]
class ParticipatingDialect2(DefaultDialect):
construct_arguments = [
- (schema.Index, {
- "x": 9,
- "y": True,
- "pp": "default"
- }),
- (schema.Table, {
- "*": None
- })
+ (schema.Index, {"x": 9, "y": True, "pp": "default"}),
+ (schema.Table, {"*": None}),
]
class NonParticipatingDialect(DefaultDialect):
@@ -3757,6 +3945,7 @@ class DialectKWArgTest(fixtures.TestBase):
return NonParticipatingDialect
else:
raise exc.NoSuchModuleError("no dialect %r" % dialect_name)
+
with mock.patch("sqlalchemy.dialects.registry.load", load):
yield
@@ -3765,32 +3954,21 @@ class DialectKWArgTest(fixtures.TestBase):
def test_participating(self):
with self._fixture():
- idx = Index('a', 'b', 'c', participating_y=True)
+ idx = Index("a", "b", "c", participating_y=True)
eq_(
idx.dialect_options,
- {"participating": {"x": 5, "y": True, "z_one": None}}
- )
- eq_(
- idx.dialect_kwargs,
- {
- 'participating_y': True,
- }
+ {"participating": {"x": 5, "y": True, "z_one": None}},
)
+ eq_(idx.dialect_kwargs, {"participating_y": True})
def test_nonparticipating(self):
with self._fixture():
idx = Index(
- 'a',
- 'b',
- 'c',
- nonparticipating_y=True,
- nonparticipating_q=5)
+ "a", "b", "c", nonparticipating_y=True, nonparticipating_q=5
+ )
eq_(
idx.dialect_kwargs,
- {
- 'nonparticipating_y': True,
- 'nonparticipating_q': 5
- }
+ {"nonparticipating_y": True, "nonparticipating_q": 5},
)
def test_bad_kwarg_raise(self):
@@ -3799,7 +3977,11 @@ class DialectKWArgTest(fixtures.TestBase):
TypeError,
"Additional arguments should be named "
"<dialectname>_<argument>, got 'foobar'",
- Index, 'a', 'b', 'c', foobar=True
+ Index,
+ "a",
+ "b",
+ "c",
+ foobar=True,
)
def test_unknown_dialect_warning(self):
@@ -3808,7 +3990,11 @@ class DialectKWArgTest(fixtures.TestBase):
exc.SAWarning,
"Can't validate argument 'unknown_y'; can't locate "
"any SQLAlchemy dialect named 'unknown'",
- Index, 'a', 'b', 'c', unknown_y=True
+ Index,
+ "a",
+ "b",
+ "c",
+ unknown_y=True,
)
def test_participating_bad_kw(self):
@@ -3818,7 +4004,11 @@ class DialectKWArgTest(fixtures.TestBase):
"Argument 'participating_q_p_x' is not accepted by dialect "
"'participating' on behalf of "
"<class 'sqlalchemy.sql.schema.Index'>",
- Index, 'a', 'b', 'c', participating_q_p_x=8
+ Index,
+ "a",
+ "b",
+ "c",
+ participating_q_p_x=8,
)
def test_participating_unknown_schema_item(self):
@@ -3830,310 +4020,328 @@ class DialectKWArgTest(fixtures.TestBase):
"Argument 'participating_q_p_x' is not accepted by dialect "
"'participating' on behalf of "
"<class 'sqlalchemy.sql.schema.UniqueConstraint'>",
- UniqueConstraint, 'a', 'b', participating_q_p_x=8
+ UniqueConstraint,
+ "a",
+ "b",
+ participating_q_p_x=8,
)
@testing.emits_warning("Can't validate")
def test_unknown_dialect_warning_still_populates(self):
with self._fixture():
- idx = Index('a', 'b', 'c', unknown_y=True)
+ idx = Index("a", "b", "c", unknown_y=True)
eq_(idx.dialect_kwargs, {"unknown_y": True}) # still populates
@testing.emits_warning("Can't validate")
def test_unknown_dialect_warning_still_populates_multiple(self):
with self._fixture():
- idx = Index('a', 'b', 'c', unknown_y=True, unknown_z=5,
- otherunknown_foo='bar', participating_y=8)
+ idx = Index(
+ "a",
+ "b",
+ "c",
+ unknown_y=True,
+ unknown_z=5,
+ otherunknown_foo="bar",
+ participating_y=8,
+ )
eq_(
idx.dialect_options,
{
- "unknown": {'y': True, 'z': 5, '*': None},
- "otherunknown": {'foo': 'bar', '*': None},
- "participating": {'x': 5, 'y': 8, 'z_one': None}
- }
+ "unknown": {"y": True, "z": 5, "*": None},
+ "otherunknown": {"foo": "bar", "*": None},
+ "participating": {"x": 5, "y": 8, "z_one": None},
+ },
)
- eq_(idx.dialect_kwargs,
- {'unknown_z': 5, 'participating_y': 8,
- 'unknown_y': True,
- 'otherunknown_foo': 'bar'}
- ) # still populates
+ eq_(
+ idx.dialect_kwargs,
+ {
+ "unknown_z": 5,
+ "participating_y": 8,
+ "unknown_y": True,
+ "otherunknown_foo": "bar",
+ },
+ ) # still populates
def test_runs_safekwarg(self):
- with mock.patch("sqlalchemy.util.safe_kwarg",
- lambda arg: "goofy_%s" % arg):
+ with mock.patch(
+ "sqlalchemy.util.safe_kwarg", lambda arg: "goofy_%s" % arg
+ ):
with self._fixture():
- idx = Index('a', 'b')
- idx.kwargs[util.u('participating_x')] = 7
+ idx = Index("a", "b")
+ idx.kwargs[util.u("participating_x")] = 7
- eq_(
- list(idx.dialect_kwargs),
- ['goofy_participating_x']
- )
+ eq_(list(idx.dialect_kwargs), ["goofy_participating_x"])
def test_combined(self):
with self._fixture():
- idx = Index('a', 'b', 'c', participating_x=7,
- nonparticipating_y=True)
+ idx = Index(
+ "a", "b", "c", participating_x=7, nonparticipating_y=True
+ )
eq_(
idx.dialect_options,
{
- 'participating': {'y': False, 'x': 7, 'z_one': None},
- 'nonparticipating': {'y': True, '*': None}
- }
+ "participating": {"y": False, "x": 7, "z_one": None},
+ "nonparticipating": {"y": True, "*": None},
+ },
)
eq_(
idx.dialect_kwargs,
- {
- 'participating_x': 7,
- 'nonparticipating_y': True,
- }
+ {"participating_x": 7, "nonparticipating_y": True},
)
def test_multiple_participating(self):
with self._fixture():
- idx = Index('a', 'b', 'c',
- participating_x=7,
- participating2_x=15,
- participating2_y="lazy"
- )
+ idx = Index(
+ "a",
+ "b",
+ "c",
+ participating_x=7,
+ participating2_x=15,
+ participating2_y="lazy",
+ )
eq_(
idx.dialect_options,
{
- "participating": {'x': 7, 'y': False, 'z_one': None},
- "participating2": {'x': 15, 'y': 'lazy', 'pp': 'default'},
- }
+ "participating": {"x": 7, "y": False, "z_one": None},
+ "participating2": {"x": 15, "y": "lazy", "pp": "default"},
+ },
)
eq_(
idx.dialect_kwargs,
{
- 'participating_x': 7,
- 'participating2_x': 15,
- 'participating2_y': 'lazy'
- }
+ "participating_x": 7,
+ "participating2_x": 15,
+ "participating2_y": "lazy",
+ },
)
def test_foreign_key_propagate(self):
with self._fixture():
m = MetaData()
- fk = ForeignKey('t2.id', participating_foobar=True)
- t = Table('t', m, Column('id', Integer, fk))
+ fk = ForeignKey("t2.id", participating_foobar=True)
+ t = Table("t", m, Column("id", Integer, fk))
fkc = [
- c for c in t.constraints if isinstance(
- c,
- ForeignKeyConstraint)][0]
- eq_(
- fkc.dialect_kwargs,
- {'participating_foobar': True}
- )
+ c for c in t.constraints if isinstance(c, ForeignKeyConstraint)
+ ][0]
+ eq_(fkc.dialect_kwargs, {"participating_foobar": True})
def test_foreign_key_propagate_exceptions_delayed(self):
with self._fixture():
m = MetaData()
- fk = ForeignKey('t2.id', participating_fake=True)
- c1 = Column('id', Integer, fk)
+ fk = ForeignKey("t2.id", participating_fake=True)
+ c1 = Column("id", Integer, fk)
assert_raises_message(
exc.ArgumentError,
"Argument 'participating_fake' is not accepted by "
"dialect 'participating' on behalf of "
"<class 'sqlalchemy.sql.schema.ForeignKeyConstraint'>",
- Table, 't', m, c1
+ Table,
+ "t",
+ m,
+ c1,
)
def test_wildcard(self):
with self._fixture():
m = MetaData()
- t = Table('x', m, Column('x', Integer),
- participating2_xyz='foo',
- participating2_engine='InnoDB',
- )
+ t = Table(
+ "x",
+ m,
+ Column("x", Integer),
+ participating2_xyz="foo",
+ participating2_engine="InnoDB",
+ )
eq_(
t.dialect_kwargs,
{
- 'participating2_xyz': 'foo',
- 'participating2_engine': 'InnoDB'
- }
+ "participating2_xyz": "foo",
+ "participating2_engine": "InnoDB",
+ },
)
def test_uninit_wildcard(self):
with self._fixture():
m = MetaData()
- t = Table('x', m, Column('x', Integer))
- eq_(
- t.dialect_options['participating2'], {'*': None}
- )
- eq_(
- t.dialect_kwargs, {}
- )
+ t = Table("x", m, Column("x", Integer))
+ eq_(t.dialect_options["participating2"], {"*": None})
+ eq_(t.dialect_kwargs, {})
def test_not_contains_wildcard(self):
with self._fixture():
m = MetaData()
- t = Table('x', m, Column('x', Integer))
- assert 'foobar' not in t.dialect_options['participating2']
+ t = Table("x", m, Column("x", Integer))
+ assert "foobar" not in t.dialect_options["participating2"]
def test_contains_wildcard(self):
with self._fixture():
m = MetaData()
- t = Table('x', m, Column('x', Integer), participating2_foobar=5)
- assert 'foobar' in t.dialect_options['participating2']
+ t = Table("x", m, Column("x", Integer), participating2_foobar=5)
+ assert "foobar" in t.dialect_options["participating2"]
def test_update(self):
with self._fixture():
- idx = Index('a', 'b', 'c', participating_x=20)
- eq_(idx.dialect_kwargs, {
- "participating_x": 20,
- })
- idx._validate_dialect_kwargs({
- "participating_x": 25,
- "participating_z_one": "default"})
- eq_(idx.dialect_options, {
- "participating": {"x": 25, "y": False, "z_one": "default"}
- })
- eq_(idx.dialect_kwargs, {
- "participating_x": 25,
- 'participating_z_one': "default"
- })
-
- idx._validate_dialect_kwargs({
- "participating_x": 25,
- "participating_z_one": "default"})
-
- eq_(idx.dialect_options, {
- "participating": {"x": 25, "y": False, "z_one": "default"}
- })
- eq_(idx.dialect_kwargs, {
- "participating_x": 25,
- 'participating_z_one': "default"
- })
-
- idx._validate_dialect_kwargs({
- "participating_y": True,
- 'participating2_y': "p2y"})
- eq_(idx.dialect_options, {
- "participating": {"x": 25, "y": True, "z_one": "default"},
- "participating2": {"y": "p2y", "pp": "default", "x": 9}
- })
- eq_(idx.dialect_kwargs, {
- "participating_x": 25,
- "participating_y": True,
- 'participating2_y': "p2y",
- "participating_z_one": "default"})
+ idx = Index("a", "b", "c", participating_x=20)
+ eq_(idx.dialect_kwargs, {"participating_x": 20})
+ idx._validate_dialect_kwargs(
+ {"participating_x": 25, "participating_z_one": "default"}
+ )
+ eq_(
+ idx.dialect_options,
+ {"participating": {"x": 25, "y": False, "z_one": "default"}},
+ )
+ eq_(
+ idx.dialect_kwargs,
+ {"participating_x": 25, "participating_z_one": "default"},
+ )
+
+ idx._validate_dialect_kwargs(
+ {"participating_x": 25, "participating_z_one": "default"}
+ )
+
+ eq_(
+ idx.dialect_options,
+ {"participating": {"x": 25, "y": False, "z_one": "default"}},
+ )
+ eq_(
+ idx.dialect_kwargs,
+ {"participating_x": 25, "participating_z_one": "default"},
+ )
+
+ idx._validate_dialect_kwargs(
+ {"participating_y": True, "participating2_y": "p2y"}
+ )
+ eq_(
+ idx.dialect_options,
+ {
+ "participating": {"x": 25, "y": True, "z_one": "default"},
+ "participating2": {"y": "p2y", "pp": "default", "x": 9},
+ },
+ )
+ eq_(
+ idx.dialect_kwargs,
+ {
+ "participating_x": 25,
+ "participating_y": True,
+ "participating2_y": "p2y",
+ "participating_z_one": "default",
+ },
+ )
def test_key_error_kwargs_no_dialect(self):
with self._fixture():
- idx = Index('a', 'b', 'c')
- assert_raises(
- KeyError,
- idx.kwargs.__getitem__, 'foo_bar'
- )
+ idx = Index("a", "b", "c")
+ assert_raises(KeyError, idx.kwargs.__getitem__, "foo_bar")
def test_key_error_kwargs_no_underscore(self):
with self._fixture():
- idx = Index('a', 'b', 'c')
- assert_raises(
- KeyError,
- idx.kwargs.__getitem__, 'foobar'
- )
+ idx = Index("a", "b", "c")
+ assert_raises(KeyError, idx.kwargs.__getitem__, "foobar")
def test_key_error_kwargs_no_argument(self):
with self._fixture():
- idx = Index('a', 'b', 'c')
+ idx = Index("a", "b", "c")
assert_raises(
- KeyError,
- idx.kwargs.__getitem__, 'participating_asdmfq34098'
+ KeyError, idx.kwargs.__getitem__, "participating_asdmfq34098"
)
assert_raises(
KeyError,
- idx.kwargs.__getitem__, 'nonparticipating_asdmfq34098'
+ idx.kwargs.__getitem__,
+ "nonparticipating_asdmfq34098",
)
def test_key_error_dialect_options(self):
with self._fixture():
- idx = Index('a', 'b', 'c')
+ idx = Index("a", "b", "c")
assert_raises(
KeyError,
- idx.dialect_options['participating'].__getitem__, 'asdfaso890'
+ idx.dialect_options["participating"].__getitem__,
+ "asdfaso890",
)
assert_raises(
KeyError,
- idx.dialect_options['nonparticipating'].__getitem__,
- 'asdfaso890')
+ idx.dialect_options["nonparticipating"].__getitem__,
+ "asdfaso890",
+ )
def test_ad_hoc_participating_via_opt(self):
with self._fixture():
- idx = Index('a', 'b', 'c')
- idx.dialect_options['participating']['foobar'] = 5
+ idx = Index("a", "b", "c")
+ idx.dialect_options["participating"]["foobar"] = 5
- eq_(idx.dialect_options['participating']['foobar'], 5)
- eq_(idx.kwargs['participating_foobar'], 5)
+ eq_(idx.dialect_options["participating"]["foobar"], 5)
+ eq_(idx.kwargs["participating_foobar"], 5)
def test_ad_hoc_nonparticipating_via_opt(self):
with self._fixture():
- idx = Index('a', 'b', 'c')
- idx.dialect_options['nonparticipating']['foobar'] = 5
+ idx = Index("a", "b", "c")
+ idx.dialect_options["nonparticipating"]["foobar"] = 5
- eq_(idx.dialect_options['nonparticipating']['foobar'], 5)
- eq_(idx.kwargs['nonparticipating_foobar'], 5)
+ eq_(idx.dialect_options["nonparticipating"]["foobar"], 5)
+ eq_(idx.kwargs["nonparticipating_foobar"], 5)
def test_ad_hoc_participating_via_kwargs(self):
with self._fixture():
- idx = Index('a', 'b', 'c')
- idx.kwargs['participating_foobar'] = 5
+ idx = Index("a", "b", "c")
+ idx.kwargs["participating_foobar"] = 5
- eq_(idx.dialect_options['participating']['foobar'], 5)
- eq_(idx.kwargs['participating_foobar'], 5)
+ eq_(idx.dialect_options["participating"]["foobar"], 5)
+ eq_(idx.kwargs["participating_foobar"], 5)
def test_ad_hoc_nonparticipating_via_kwargs(self):
with self._fixture():
- idx = Index('a', 'b', 'c')
- idx.kwargs['nonparticipating_foobar'] = 5
+ idx = Index("a", "b", "c")
+ idx.kwargs["nonparticipating_foobar"] = 5
- eq_(idx.dialect_options['nonparticipating']['foobar'], 5)
- eq_(idx.kwargs['nonparticipating_foobar'], 5)
+ eq_(idx.dialect_options["nonparticipating"]["foobar"], 5)
+ eq_(idx.kwargs["nonparticipating_foobar"], 5)
def test_ad_hoc_via_kwargs_invalid_key(self):
with self._fixture():
- idx = Index('a', 'b', 'c')
+ idx = Index("a", "b", "c")
assert_raises_message(
exc.ArgumentError,
"Keys must be of the form <dialectname>_<argname>",
- idx.kwargs.__setitem__, "foobar", 5
+ idx.kwargs.__setitem__,
+ "foobar",
+ 5,
)
def test_ad_hoc_via_kwargs_invalid_dialect(self):
with self._fixture():
- idx = Index('a', 'b', 'c')
+ idx = Index("a", "b", "c")
assert_raises_message(
exc.ArgumentError,
"no dialect 'nonexistent'",
- idx.kwargs.__setitem__, "nonexistent_foobar", 5
+ idx.kwargs.__setitem__,
+ "nonexistent_foobar",
+ 5,
)
def test_add_new_arguments_participating(self):
with self._fixture():
Index.argument_for("participating", "xyzqpr", False)
- idx = Index('a', 'b', 'c', participating_xyzqpr=True)
+ idx = Index("a", "b", "c", participating_xyzqpr=True)
- eq_(idx.kwargs['participating_xyzqpr'], True)
+ eq_(idx.kwargs["participating_xyzqpr"], True)
- idx = Index('a', 'b', 'c')
- eq_(idx.dialect_options['participating']['xyzqpr'], False)
+ idx = Index("a", "b", "c")
+ eq_(idx.dialect_options["participating"]["xyzqpr"], False)
def test_add_new_arguments_participating_no_existing(self):
with self._fixture():
PrimaryKeyConstraint.argument_for("participating", "xyzqpr", False)
- pk = PrimaryKeyConstraint('a', 'b', 'c', participating_xyzqpr=True)
+ pk = PrimaryKeyConstraint("a", "b", "c", participating_xyzqpr=True)
- eq_(pk.kwargs['participating_xyzqpr'], True)
+ eq_(pk.kwargs["participating_xyzqpr"], True)
- pk = PrimaryKeyConstraint('a', 'b', 'c')
- eq_(pk.dialect_options['participating']['xyzqpr'], False)
+ pk = PrimaryKeyConstraint("a", "b", "c")
+ eq_(pk.dialect_options["participating"]["xyzqpr"], False)
def test_add_new_arguments_nonparticipating(self):
with self._fixture():
@@ -4141,7 +4349,10 @@ class DialectKWArgTest(fixtures.TestBase):
exc.ArgumentError,
"Dialect 'nonparticipating' does have keyword-argument "
"validation and defaults enabled configured",
- Index.argument_for, "nonparticipating", "xyzqpr", False
+ Index.argument_for,
+ "nonparticipating",
+ "xyzqpr",
+ False,
)
def test_add_new_arguments_invalid_dialect(self):
@@ -4149,43 +4360,48 @@ class DialectKWArgTest(fixtures.TestBase):
assert_raises_message(
exc.ArgumentError,
"no dialect 'nonexistent'",
- Index.argument_for, "nonexistent", "foobar", 5
+ Index.argument_for,
+ "nonexistent",
+ "foobar",
+ 5,
)
class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL):
- __dialect__ = 'default'
+ __dialect__ = "default"
def _fixture(self, naming_convention, table_schema=None):
m1 = MetaData(naming_convention=naming_convention)
- u1 = Table('user', m1,
- Column('id', Integer, primary_key=True),
- Column('version', Integer, primary_key=True),
- Column('data', String(30)),
- Column('Data2', String(30), key="data2"),
- Column('Data3', String(30), key="data3"),
- schema=table_schema
- )
+ u1 = Table(
+ "user",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("version", Integer, primary_key=True),
+ Column("data", String(30)),
+ Column("Data2", String(30), key="data2"),
+ Column("Data3", String(30), key="data3"),
+ schema=table_schema,
+ )
return u1
def test_uq_name(self):
- u1 = self._fixture(naming_convention={
- "uq": "uq_%(table_name)s_%(column_0_name)s"
- })
+ u1 = self._fixture(
+ naming_convention={"uq": "uq_%(table_name)s_%(column_0_name)s"}
+ )
uq = UniqueConstraint(u1.c.data)
eq_(uq.name, "uq_user_data")
def test_uq_conv_name(self):
- u1 = self._fixture(naming_convention={
- "uq": "uq_%(table_name)s_%(column_0_name)s"
- })
+ u1 = self._fixture(
+ naming_convention={"uq": "uq_%(table_name)s_%(column_0_name)s"}
+ )
uq = UniqueConstraint(u1.c.data, name=naming.conv("myname"))
self.assert_compile(
schema.AddConstraint(uq),
'ALTER TABLE "user" ADD CONSTRAINT myname UNIQUE (data)',
- dialect="default"
+ dialect="default",
)
def test_uq_defer_name_no_convention(self):
@@ -4194,59 +4410,59 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(
schema.AddConstraint(uq),
'ALTER TABLE "user" ADD CONSTRAINT myname UNIQUE (data)',
- dialect="default"
+ dialect="default",
)
def test_uq_defer_name_convention(self):
- u1 = self._fixture(naming_convention={
- "uq": "uq_%(table_name)s_%(column_0_name)s"
- })
+ u1 = self._fixture(
+ naming_convention={"uq": "uq_%(table_name)s_%(column_0_name)s"}
+ )
uq = UniqueConstraint(u1.c.data, name=naming._defer_name("myname"))
self.assert_compile(
schema.AddConstraint(uq),
'ALTER TABLE "user" ADD CONSTRAINT uq_user_data UNIQUE (data)',
- dialect="default"
+ dialect="default",
)
def test_uq_key(self):
- u1 = self._fixture(naming_convention={
- "uq": "uq_%(table_name)s_%(column_0_key)s"
- })
+ u1 = self._fixture(
+ naming_convention={"uq": "uq_%(table_name)s_%(column_0_key)s"}
+ )
uq = UniqueConstraint(u1.c.data, u1.c.data2)
eq_(uq.name, "uq_user_data")
def test_uq_label(self):
- u1 = self._fixture(naming_convention={
- "uq": "uq_%(table_name)s_%(column_0_label)s"
- })
+ u1 = self._fixture(
+ naming_convention={"uq": "uq_%(table_name)s_%(column_0_label)s"}
+ )
uq = UniqueConstraint(u1.c.data, u1.c.data2)
eq_(uq.name, "uq_user_user_data")
def test_uq_allcols_underscore_name(self):
- u1 = self._fixture(naming_convention={
- "uq": "uq_%(table_name)s_%(column_0_N_name)s"
- })
+ u1 = self._fixture(
+ naming_convention={"uq": "uq_%(table_name)s_%(column_0_N_name)s"}
+ )
uq = UniqueConstraint(u1.c.data, u1.c.data2, u1.c.data3)
eq_(uq.name, "uq_user_data_Data2_Data3")
def test_uq_allcols_merged_name(self):
- u1 = self._fixture(naming_convention={
- "uq": "uq_%(table_name)s_%(column_0N_name)s"
- })
+ u1 = self._fixture(
+ naming_convention={"uq": "uq_%(table_name)s_%(column_0N_name)s"}
+ )
uq = UniqueConstraint(u1.c.data, u1.c.data2, u1.c.data3)
eq_(uq.name, "uq_user_dataData2Data3")
def test_uq_allcols_merged_key(self):
- u1 = self._fixture(naming_convention={
- "uq": "uq_%(table_name)s_%(column_0N_key)s"
- })
+ u1 = self._fixture(
+ naming_convention={"uq": "uq_%(table_name)s_%(column_0N_key)s"}
+ )
uq = UniqueConstraint(u1.c.data, u1.c.data2, u1.c.data3)
eq_(uq.name, "uq_user_datadata2data3")
def test_uq_allcols_truncated_name(self):
- u1 = self._fixture(naming_convention={
- "uq": "uq_%(table_name)s_%(column_0N_name)s"
- })
+ u1 = self._fixture(
+ naming_convention={"uq": "uq_%(table_name)s_%(column_0N_name)s"}
+ )
uq = UniqueConstraint(u1.c.data, u1.c.data2, u1.c.data3)
dialect = default.DefaultDialect()
@@ -4255,7 +4471,7 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL):
'ALTER TABLE "user" ADD '
'CONSTRAINT "uq_user_dataData2Data3" '
'UNIQUE (data, "Data2", "Data3")',
- dialect=dialect
+ dialect=dialect,
)
dialect.max_identifier_length = 15
@@ -4263,188 +4479,222 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL):
schema.AddConstraint(uq),
'ALTER TABLE "user" ADD '
'CONSTRAINT uq_user_2769 UNIQUE (data, "Data2", "Data3")',
- dialect=dialect
+ dialect=dialect,
)
def test_fk_allcols_underscore_name(self):
- u1 = self._fixture(naming_convention={
- "fk": "fk_%(table_name)s_%(column_0_N_name)s_"
- "%(referred_table_name)s_%(referred_column_0_N_name)s"})
+ u1 = self._fixture(
+ naming_convention={
+ "fk": "fk_%(table_name)s_%(column_0_N_name)s_"
+ "%(referred_table_name)s_%(referred_column_0_N_name)s"
+ }
+ )
m1 = u1.metadata
- a1 = Table('address', m1,
- Column('id', Integer, primary_key=True),
- Column('UserData', String(30), key="user_data"),
- Column('UserData2', String(30), key="user_data2"),
- Column('UserData3', String(30), key="user_data3")
- )
- fk = ForeignKeyConstraint(['user_data', 'user_data2', 'user_data3'],
- ['user.data', 'user.data2', 'user.data3'])
+ a1 = Table(
+ "address",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("UserData", String(30), key="user_data"),
+ Column("UserData2", String(30), key="user_data2"),
+ Column("UserData3", String(30), key="user_data3"),
+ )
+ fk = ForeignKeyConstraint(
+ ["user_data", "user_data2", "user_data3"],
+ ["user.data", "user.data2", "user.data3"],
+ )
a1.append_constraint(fk)
self.assert_compile(
schema.AddConstraint(fk),
- 'ALTER TABLE address ADD CONSTRAINT '
+ "ALTER TABLE address ADD CONSTRAINT "
'"fk_address_UserData_UserData2_UserData3_user_data_Data2_Data3" '
'FOREIGN KEY("UserData", "UserData2", "UserData3") '
'REFERENCES "user" (data, "Data2", "Data3")',
- dialect=default.DefaultDialect()
+ dialect=default.DefaultDialect(),
)
def test_fk_allcols_merged_name(self):
- u1 = self._fixture(naming_convention={
- "fk": "fk_%(table_name)s_%(column_0N_name)s_"
- "%(referred_table_name)s_%(referred_column_0N_name)s"})
+ u1 = self._fixture(
+ naming_convention={
+ "fk": "fk_%(table_name)s_%(column_0N_name)s_"
+ "%(referred_table_name)s_%(referred_column_0N_name)s"
+ }
+ )
m1 = u1.metadata
- a1 = Table('address', m1,
- Column('id', Integer, primary_key=True),
- Column('UserData', String(30), key="user_data"),
- Column('UserData2', String(30), key="user_data2"),
- Column('UserData3', String(30), key="user_data3")
- )
- fk = ForeignKeyConstraint(['user_data', 'user_data2', 'user_data3'],
- ['user.data', 'user.data2', 'user.data3'])
+ a1 = Table(
+ "address",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("UserData", String(30), key="user_data"),
+ Column("UserData2", String(30), key="user_data2"),
+ Column("UserData3", String(30), key="user_data3"),
+ )
+ fk = ForeignKeyConstraint(
+ ["user_data", "user_data2", "user_data3"],
+ ["user.data", "user.data2", "user.data3"],
+ )
a1.append_constraint(fk)
self.assert_compile(
schema.AddConstraint(fk),
- 'ALTER TABLE address ADD CONSTRAINT '
+ "ALTER TABLE address ADD CONSTRAINT "
'"fk_address_UserDataUserData2UserData3_user_dataData2Data3" '
'FOREIGN KEY("UserData", "UserData2", "UserData3") '
'REFERENCES "user" (data, "Data2", "Data3")',
- dialect=default.DefaultDialect()
+ dialect=default.DefaultDialect(),
)
def test_fk_allcols_truncated_name(self):
- u1 = self._fixture(naming_convention={
- "fk": "fk_%(table_name)s_%(column_0N_name)s_"
- "%(referred_table_name)s_%(referred_column_0N_name)s"})
+ u1 = self._fixture(
+ naming_convention={
+ "fk": "fk_%(table_name)s_%(column_0N_name)s_"
+ "%(referred_table_name)s_%(referred_column_0N_name)s"
+ }
+ )
m1 = u1.metadata
- a1 = Table('address', m1,
- Column('id', Integer, primary_key=True),
- Column('UserData', String(30), key="user_data"),
- Column('UserData2', String(30), key="user_data2"),
- Column('UserData3', String(30), key="user_data3")
- )
- fk = ForeignKeyConstraint(['user_data', 'user_data2', 'user_data3'],
- ['user.data', 'user.data2', 'user.data3'])
+ a1 = Table(
+ "address",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("UserData", String(30), key="user_data"),
+ Column("UserData2", String(30), key="user_data2"),
+ Column("UserData3", String(30), key="user_data3"),
+ )
+ fk = ForeignKeyConstraint(
+ ["user_data", "user_data2", "user_data3"],
+ ["user.data", "user.data2", "user.data3"],
+ )
a1.append_constraint(fk)
dialect = default.DefaultDialect()
dialect.max_identifier_length = 15
self.assert_compile(
schema.AddConstraint(fk),
- 'ALTER TABLE address ADD CONSTRAINT '
- 'fk_addr_f9ff '
+ "ALTER TABLE address ADD CONSTRAINT "
+ "fk_addr_f9ff "
'FOREIGN KEY("UserData", "UserData2", "UserData3") '
'REFERENCES "user" (data, "Data2", "Data3")',
- dialect=dialect
+ dialect=dialect,
)
def test_ix_allcols_truncation(self):
- u1 = self._fixture(naming_convention={
- "ix": "ix_%(table_name)s_%(column_0N_name)s"
- })
+ u1 = self._fixture(
+ naming_convention={"ix": "ix_%(table_name)s_%(column_0N_name)s"}
+ )
ix = Index(None, u1.c.data, u1.c.data2, u1.c.data3)
dialect = default.DefaultDialect()
dialect.max_identifier_length = 15
self.assert_compile(
schema.CreateIndex(ix),
- 'CREATE INDEX ix_user_2de9 ON '
- '"user" (data, "Data2", "Data3")',
- dialect=dialect
+ "CREATE INDEX ix_user_2de9 ON " '"user" (data, "Data2", "Data3")',
+ dialect=dialect,
)
def test_ix_name(self):
- u1 = self._fixture(naming_convention={
- "ix": "ix_%(table_name)s_%(column_0_name)s"
- })
+ u1 = self._fixture(
+ naming_convention={"ix": "ix_%(table_name)s_%(column_0_name)s"}
+ )
ix = Index(None, u1.c.data)
eq_(ix.name, "ix_user_data")
def test_ck_name_required(self):
- u1 = self._fixture(naming_convention={
- "ck": "ck_%(table_name)s_%(constraint_name)s"
- })
- ck = CheckConstraint(u1.c.data == 'x', name='mycheck')
+ u1 = self._fixture(
+ naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
+ )
+ ck = CheckConstraint(u1.c.data == "x", name="mycheck")
eq_(ck.name, "ck_user_mycheck")
assert_raises_message(
exc.InvalidRequestError,
r"Naming convention including %\(constraint_name\)s token "
"requires that constraint is explicitly named.",
- CheckConstraint, u1.c.data == 'x'
+ CheckConstraint,
+ u1.c.data == "x",
)
def test_ck_name_deferred_required(self):
- u1 = self._fixture(naming_convention={
- "ck": "ck_%(table_name)s_%(constraint_name)s"
- })
- ck = CheckConstraint(u1.c.data == 'x', name=elements._defer_name(None))
+ u1 = self._fixture(
+ naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
+ )
+ ck = CheckConstraint(u1.c.data == "x", name=elements._defer_name(None))
assert_raises_message(
exc.InvalidRequestError,
r"Naming convention including %\(constraint_name\)s token "
"requires that constraint is explicitly named.",
- schema.AddConstraint(ck).compile
+ schema.AddConstraint(ck).compile,
)
def test_column_attached_ck_name(self):
- m = MetaData(naming_convention={
- "ck": "ck_%(table_name)s_%(constraint_name)s"
- })
- ck = CheckConstraint('x > 5', name='x1')
- Table('t', m, Column('x', ck))
+ m = MetaData(
+ naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
+ )
+ ck = CheckConstraint("x > 5", name="x1")
+ Table("t", m, Column("x", ck))
eq_(ck.name, "ck_t_x1")
def test_table_attached_ck_name(self):
- m = MetaData(naming_convention={
- "ck": "ck_%(table_name)s_%(constraint_name)s"
- })
- ck = CheckConstraint('x > 5', name='x1')
- Table('t', m, Column('x', Integer), ck)
+ m = MetaData(
+ naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
+ )
+ ck = CheckConstraint("x > 5", name="x1")
+ Table("t", m, Column("x", Integer), ck)
eq_(ck.name, "ck_t_x1")
def test_uq_name_already_conv(self):
- m = MetaData(naming_convention={
- "uq": "uq_%(constraint_name)s_%(column_0_name)s"
- })
+ m = MetaData(
+ naming_convention={
+ "uq": "uq_%(constraint_name)s_%(column_0_name)s"
+ }
+ )
- t = Table('mytable', m)
- uq = UniqueConstraint(name=naming.conv('my_special_key'))
+ t = Table("mytable", m)
+ uq = UniqueConstraint(name=naming.conv("my_special_key"))
t.append_constraint(uq)
eq_(uq.name, "my_special_key")
def test_fk_name_schema(self):
- u1 = self._fixture(naming_convention={
- "fk": "fk_%(table_name)s_%(column_0_name)s_"
- "%(referred_table_name)s_%(referred_column_0_name)s"
- }, table_schema="foo")
+ u1 = self._fixture(
+ naming_convention={
+ "fk": "fk_%(table_name)s_%(column_0_name)s_"
+ "%(referred_table_name)s_%(referred_column_0_name)s"
+ },
+ table_schema="foo",
+ )
m1 = u1.metadata
- a1 = Table('address', m1,
- Column('id', Integer, primary_key=True),
- Column('user_id', Integer),
- Column('user_version_id', Integer)
- )
- fk = ForeignKeyConstraint(['user_id', 'user_version_id'],
- ['foo.user.id', 'foo.user.version'])
+ a1 = Table(
+ "address",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("user_id", Integer),
+ Column("user_version_id", Integer),
+ )
+ fk = ForeignKeyConstraint(
+ ["user_id", "user_version_id"], ["foo.user.id", "foo.user.version"]
+ )
a1.append_constraint(fk)
eq_(fk.name, "fk_address_user_id_user_id")
def test_fk_attrs(self):
- u1 = self._fixture(naming_convention={
- "fk": "fk_%(table_name)s_%(column_0_name)s_"
- "%(referred_table_name)s_%(referred_column_0_name)s"
- })
+ u1 = self._fixture(
+ naming_convention={
+ "fk": "fk_%(table_name)s_%(column_0_name)s_"
+ "%(referred_table_name)s_%(referred_column_0_name)s"
+ }
+ )
m1 = u1.metadata
- a1 = Table('address', m1,
- Column('id', Integer, primary_key=True),
- Column('user_id', Integer),
- Column('user_version_id', Integer)
- )
- fk = ForeignKeyConstraint(['user_id', 'user_version_id'],
- ['user.id', 'user.version'])
+ a1 = Table(
+ "address",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("user_id", Integer),
+ Column("user_version_id", Integer),
+ )
+ fk = ForeignKeyConstraint(
+ ["user_id", "user_version_id"], ["user.id", "user.version"]
+ )
a1.append_constraint(fk)
eq_(fk.name, "fk_address_user_id_user_id")
@@ -4452,32 +4702,38 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL):
def key_hash(const, table):
return "HASH_%s" % table.name
- u1 = self._fixture(naming_convention={
- "fk": "fk_%(table_name)s_%(key_hash)s",
- "key_hash": key_hash
- })
+ u1 = self._fixture(
+ naming_convention={
+ "fk": "fk_%(table_name)s_%(key_hash)s",
+ "key_hash": key_hash,
+ }
+ )
m1 = u1.metadata
- a1 = Table('address', m1,
- Column('id', Integer, primary_key=True),
- Column('user_id', Integer),
- Column('user_version_id', Integer)
- )
- fk = ForeignKeyConstraint(['user_id', 'user_version_id'],
- ['user.id', 'user.version'])
+ a1 = Table(
+ "address",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("user_id", Integer),
+ Column("user_version_id", Integer),
+ )
+ fk = ForeignKeyConstraint(
+ ["user_id", "user_version_id"], ["user.id", "user.version"]
+ )
a1.append_constraint(fk)
eq_(fk.name, "fk_address_HASH_address")
def test_schematype_ck_name_boolean(self):
- m1 = MetaData(naming_convention={
- "ck": "ck_%(table_name)s_%(constraint_name)s"})
+ m1 = MetaData(
+ naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
+ )
- u1 = Table('user', m1,
- Column('x', Boolean(name='foo'))
- )
+ u1 = Table("user", m1, Column("x", Boolean(name="foo")))
# constraint is not hit
eq_(
- [c for c in u1.constraints
- if isinstance(c, CheckConstraint)][0].name, "foo"
+ [c for c in u1.constraints if isinstance(c, CheckConstraint)][
+ 0
+ ].name,
+ "foo",
)
# but is hit at compile time
self.assert_compile(
@@ -4485,20 +4741,21 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL):
'CREATE TABLE "user" ('
"x BOOLEAN, "
"CONSTRAINT ck_user_foo CHECK (x IN (0, 1))"
- ")"
+ ")",
)
def test_schematype_ck_name_boolean_not_on_name(self):
- m1 = MetaData(naming_convention={
- "ck": "ck_%(table_name)s_%(column_0_name)s"})
+ m1 = MetaData(
+ naming_convention={"ck": "ck_%(table_name)s_%(column_0_name)s"}
+ )
- u1 = Table('user', m1,
- Column('x', Boolean())
- )
+ u1 = Table("user", m1, Column("x", Boolean()))
# constraint is not hit
eq_(
- [c for c in u1.constraints
- if isinstance(c, CheckConstraint)][0].name, "_unnamed_"
+ [c for c in u1.constraints if isinstance(c, CheckConstraint)][
+ 0
+ ].name,
+ "_unnamed_",
)
# but is hit at compile time
self.assert_compile(
@@ -4506,19 +4763,20 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL):
'CREATE TABLE "user" ('
"x BOOLEAN, "
"CONSTRAINT ck_user_x CHECK (x IN (0, 1))"
- ")"
+ ")",
)
def test_schematype_ck_name_enum(self):
- m1 = MetaData(naming_convention={
- "ck": "ck_%(table_name)s_%(constraint_name)s"})
+ m1 = MetaData(
+ naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
+ )
- u1 = Table('user', m1,
- Column('x', Enum('a', 'b', name='foo'))
- )
+ u1 = Table("user", m1, Column("x", Enum("a", "b", name="foo")))
eq_(
- [c for c in u1.constraints
- if isinstance(c, CheckConstraint)][0].name, "foo"
+ [c for c in u1.constraints if isinstance(c, CheckConstraint)][
+ 0
+ ].name,
+ "foo",
)
# but is hit at compile time
self.assert_compile(
@@ -4526,19 +4784,22 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL):
'CREATE TABLE "user" ('
"x VARCHAR(1), "
"CONSTRAINT ck_user_foo CHECK (x IN ('a', 'b'))"
- ")"
+ ")",
)
def test_schematype_ck_name_propagate_conv(self):
- m1 = MetaData(naming_convention={
- "ck": "ck_%(table_name)s_%(constraint_name)s"})
+ m1 = MetaData(
+ naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
+ )
- u1 = Table('user', m1,
- Column('x', Enum('a', 'b', name=naming.conv('foo')))
- )
+ u1 = Table(
+ "user", m1, Column("x", Enum("a", "b", name=naming.conv("foo")))
+ )
eq_(
- [c for c in u1.constraints
- if isinstance(c, CheckConstraint)][0].name, "foo"
+ [c for c in u1.constraints if isinstance(c, CheckConstraint)][
+ 0
+ ].name,
+ "foo",
)
# but is hit at compile time
self.assert_compile(
@@ -4546,62 +4807,60 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL):
'CREATE TABLE "user" ('
"x VARCHAR(1), "
"CONSTRAINT foo CHECK (x IN ('a', 'b'))"
- ")"
+ ")",
)
def test_schematype_ck_name_boolean_no_name(self):
- m1 = MetaData(naming_convention={
- "ck": "ck_%(table_name)s_%(constraint_name)s"
- })
-
- u1 = Table(
- 'user', m1,
- Column('x', Boolean())
+ m1 = MetaData(
+ naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
)
+
+ u1 = Table("user", m1, Column("x", Boolean()))
# constraint gets special _defer_none_name
eq_(
- [c for c in u1.constraints
- if isinstance(c, CheckConstraint)][0].name, "_unnamed_"
+ [c for c in u1.constraints if isinstance(c, CheckConstraint)][
+ 0
+ ].name,
+ "_unnamed_",
)
# no issue with native boolean
self.assert_compile(
schema.CreateTable(u1),
- 'CREATE TABLE "user" ('
- "x BOOLEAN"
- ")",
- dialect='postgresql'
+ 'CREATE TABLE "user" (' "x BOOLEAN" ")",
+ dialect="postgresql",
)
assert_raises_message(
exc.InvalidRequestError,
r"Naming convention including \%\(constraint_name\)s token "
r"requires that constraint is explicitly named.",
- schema.CreateTable(u1).compile, dialect=default.DefaultDialect()
+ schema.CreateTable(u1).compile,
+ dialect=default.DefaultDialect(),
)
def test_schematype_no_ck_name_boolean_no_name(self):
m1 = MetaData() # no naming convention
- u1 = Table(
- 'user', m1,
- Column('x', Boolean())
- )
+ u1 = Table("user", m1, Column("x", Boolean()))
# constraint gets special _defer_none_name
eq_(
- [c for c in u1.constraints
- if isinstance(c, CheckConstraint)][0].name, "_unnamed_"
+ [c for c in u1.constraints if isinstance(c, CheckConstraint)][
+ 0
+ ].name,
+ "_unnamed_",
)
self.assert_compile(
schema.CreateTable(u1),
- 'CREATE TABLE "user" (x BOOLEAN, CHECK (x IN (0, 1)))'
+ 'CREATE TABLE "user" (x BOOLEAN, CHECK (x IN (0, 1)))',
)
def test_ck_constraint_redundant_event(self):
- u1 = self._fixture(naming_convention={
- "ck": "ck_%(table_name)s_%(constraint_name)s"})
+ u1 = self._fixture(
+ naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
+ )
- ck1 = CheckConstraint(u1.c.version > 3, name='foo')
+ ck1 = CheckConstraint(u1.c.version > 3, name="foo")
u1.append_constraint(ck1)
u1.append_constraint(ck1)
u1.append_constraint(ck1)
@@ -4615,8 +4874,8 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL):
eq_(m2.naming_convention, {"pk": "%(table_name)s_pk"})
- t2a = Table('t2', m, Column('id', Integer, primary_key=True))
- t2b = Table('t2', m2, Column('id', Integer, primary_key=True))
+ t2a = Table("t2", m, Column("id", Integer, primary_key=True))
+ t2b = Table("t2", m2, Column("id", Integer, primary_key=True))
eq_(t2a.primary_key.name, t2b.primary_key.name)
eq_(t2b.primary_key.name, "t2_pk")