summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/testing/suite
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2019-01-06 01:14:26 -0500
committermike bayer <mike_mp@zzzcomputing.com>2019-01-06 17:34:50 +0000
commit1e1a38e7801f410f244e4bbb44ec795ae152e04e (patch)
tree28e725c5c8188bd0cfd133d1e268dbca9b524978 /lib/sqlalchemy/testing/suite
parent404e69426b05a82d905cbb3ad33adafccddb00dd (diff)
downloadsqlalchemy-1e1a38e7801f410f244e4bbb44ec795ae152e04e.tar.gz
Run black -l 79 against all source files
This is a straight reformat run using black as is, with no edits applied at all. The black run will format code consistently, however in some cases that are prevalent in SQLAlchemy code it produces too-long lines. The too-long lines will be resolved in the following commit that will resolve all remaining flake8 issues including shadowed builtins, long lines, import order, unused imports, duplicate imports, and docstring issues. Change-Id: I7eda77fed3d8e73df84b3651fd6cfcfe858d4dc9
Diffstat (limited to 'lib/sqlalchemy/testing/suite')
-rw-r--r--lib/sqlalchemy/testing/suite/__init__.py1
-rw-r--r--lib/sqlalchemy/testing/suite/test_cte.py132
-rw-r--r--lib/sqlalchemy/testing/suite/test_ddl.py50
-rw-r--r--lib/sqlalchemy/testing/suite/test_dialect.py54
-rw-r--r--lib/sqlalchemy/testing/suite/test_insert.py221
-rw-r--r--lib/sqlalchemy/testing/suite/test_reflection.py762
-rw-r--r--lib/sqlalchemy/testing/suite/test_results.py242
-rw-r--r--lib/sqlalchemy/testing/suite/test_select.py483
-rw-r--r--lib/sqlalchemy/testing/suite/test_sequence.py136
-rw-r--r--lib/sqlalchemy/testing/suite/test_types.py702
-rw-r--r--lib/sqlalchemy/testing/suite/test_update_delete.py37
11 files changed, 1403 insertions, 1417 deletions
diff --git a/lib/sqlalchemy/testing/suite/__init__.py b/lib/sqlalchemy/testing/suite/__init__.py
index 748d9722d..a4e142c5a 100644
--- a/lib/sqlalchemy/testing/suite/__init__.py
+++ b/lib/sqlalchemy/testing/suite/__init__.py
@@ -1,4 +1,3 @@
-
from sqlalchemy.testing.suite.test_cte import *
from sqlalchemy.testing.suite.test_dialect import *
from sqlalchemy.testing.suite.test_ddl import *
diff --git a/lib/sqlalchemy/testing/suite/test_cte.py b/lib/sqlalchemy/testing/suite/test_cte.py
index cc72278e6..d2f35933b 100644
--- a/lib/sqlalchemy/testing/suite/test_cte.py
+++ b/lib/sqlalchemy/testing/suite/test_cte.py
@@ -10,22 +10,28 @@ from ..schema import Table, Column
class CTETest(fixtures.TablesTest):
__backend__ = True
- __requires__ = 'ctes',
+ __requires__ = ("ctes",)
- run_inserts = 'each'
- run_deletes = 'each'
+ run_inserts = "each"
+ run_deletes = "each"
@classmethod
def define_tables(cls, metadata):
- Table("some_table", metadata,
- Column('id', Integer, primary_key=True),
- Column('data', String(50)),
- Column("parent_id", ForeignKey("some_table.id")))
+ Table(
+ "some_table",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("data", String(50)),
+ Column("parent_id", ForeignKey("some_table.id")),
+ )
- Table("some_other_table", metadata,
- Column('id', Integer, primary_key=True),
- Column('data', String(50)),
- Column("parent_id", Integer))
+ Table(
+ "some_other_table",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("data", String(50)),
+ Column("parent_id", Integer),
+ )
@classmethod
def insert_data(cls):
@@ -36,28 +42,33 @@ class CTETest(fixtures.TablesTest):
{"id": 2, "data": "d2", "parent_id": 1},
{"id": 3, "data": "d3", "parent_id": 1},
{"id": 4, "data": "d4", "parent_id": 3},
- {"id": 5, "data": "d5", "parent_id": 3}
- ]
+ {"id": 5, "data": "d5", "parent_id": 3},
+ ],
)
def test_select_nonrecursive_round_trip(self):
some_table = self.tables.some_table
with config.db.connect() as conn:
- cte = select([some_table]).where(
- some_table.c.data.in_(["d2", "d3", "d4"])).cte("some_cte")
+ cte = (
+ select([some_table])
+ .where(some_table.c.data.in_(["d2", "d3", "d4"]))
+ .cte("some_cte")
+ )
result = conn.execute(
select([cte.c.data]).where(cte.c.data.in_(["d4", "d5"]))
)
- eq_(result.fetchall(), [("d4", )])
+ eq_(result.fetchall(), [("d4",)])
def test_select_recursive_round_trip(self):
some_table = self.tables.some_table
with config.db.connect() as conn:
- cte = select([some_table]).where(
- some_table.c.data.in_(["d2", "d3", "d4"])).cte(
- "some_cte", recursive=True)
+ cte = (
+ select([some_table])
+ .where(some_table.c.data.in_(["d2", "d3", "d4"]))
+ .cte("some_cte", recursive=True)
+ )
cte_alias = cte.alias("c1")
st1 = some_table.alias()
@@ -67,12 +78,13 @@ class CTETest(fixtures.TablesTest):
select([st1]).where(st1.c.id == cte_alias.c.parent_id)
)
result = conn.execute(
- select([cte.c.data]).where(
- cte.c.data != "d2").order_by(cte.c.data.desc())
+ select([cte.c.data])
+ .where(cte.c.data != "d2")
+ .order_by(cte.c.data.desc())
)
eq_(
result.fetchall(),
- [('d4',), ('d3',), ('d3',), ('d1',), ('d1',), ('d1',)]
+ [("d4",), ("d3",), ("d3",), ("d1",), ("d1",), ("d1",)],
)
def test_insert_from_select_round_trip(self):
@@ -80,20 +92,21 @@ class CTETest(fixtures.TablesTest):
some_other_table = self.tables.some_other_table
with config.db.connect() as conn:
- cte = select([some_table]).where(
- some_table.c.data.in_(["d2", "d3", "d4"])
- ).cte("some_cte")
+ cte = (
+ select([some_table])
+ .where(some_table.c.data.in_(["d2", "d3", "d4"]))
+ .cte("some_cte")
+ )
conn.execute(
some_other_table.insert().from_select(
- ["id", "data", "parent_id"],
- select([cte])
+ ["id", "data", "parent_id"], select([cte])
)
)
eq_(
conn.execute(
select([some_other_table]).order_by(some_other_table.c.id)
).fetchall(),
- [(2, "d2", 1), (3, "d3", 1), (4, "d4", 3)]
+ [(2, "d2", 1), (3, "d3", 1), (4, "d4", 3)],
)
@testing.requires.ctes_with_update_delete
@@ -105,27 +118,31 @@ class CTETest(fixtures.TablesTest):
with config.db.connect() as conn:
conn.execute(
some_other_table.insert().from_select(
- ['id', 'data', 'parent_id'],
- select([some_table])
+ ["id", "data", "parent_id"], select([some_table])
)
)
- cte = select([some_table]).where(
- some_table.c.data.in_(["d2", "d3", "d4"])
- ).cte("some_cte")
+ cte = (
+ select([some_table])
+ .where(some_table.c.data.in_(["d2", "d3", "d4"]))
+ .cte("some_cte")
+ )
conn.execute(
- some_other_table.update().values(parent_id=5).where(
- some_other_table.c.data == cte.c.data
- )
+ some_other_table.update()
+ .values(parent_id=5)
+ .where(some_other_table.c.data == cte.c.data)
)
eq_(
conn.execute(
select([some_other_table]).order_by(some_other_table.c.id)
).fetchall(),
[
- (1, "d1", None), (2, "d2", 5),
- (3, "d3", 5), (4, "d4", 5), (5, "d5", 3)
- ]
+ (1, "d1", None),
+ (2, "d2", 5),
+ (3, "d3", 5),
+ (4, "d4", 5),
+ (5, "d5", 3),
+ ],
)
@testing.requires.ctes_with_update_delete
@@ -137,14 +154,15 @@ class CTETest(fixtures.TablesTest):
with config.db.connect() as conn:
conn.execute(
some_other_table.insert().from_select(
- ['id', 'data', 'parent_id'],
- select([some_table])
+ ["id", "data", "parent_id"], select([some_table])
)
)
- cte = select([some_table]).where(
- some_table.c.data.in_(["d2", "d3", "d4"])
- ).cte("some_cte")
+ cte = (
+ select([some_table])
+ .where(some_table.c.data.in_(["d2", "d3", "d4"]))
+ .cte("some_cte")
+ )
conn.execute(
some_other_table.delete().where(
some_other_table.c.data == cte.c.data
@@ -154,9 +172,7 @@ class CTETest(fixtures.TablesTest):
conn.execute(
select([some_other_table]).order_by(some_other_table.c.id)
).fetchall(),
- [
- (1, "d1", None), (5, "d5", 3)
- ]
+ [(1, "d1", None), (5, "d5", 3)],
)
@testing.requires.ctes_with_update_delete
@@ -168,26 +184,26 @@ class CTETest(fixtures.TablesTest):
with config.db.connect() as conn:
conn.execute(
some_other_table.insert().from_select(
- ['id', 'data', 'parent_id'],
- select([some_table])
+ ["id", "data", "parent_id"], select([some_table])
)
)
- cte = select([some_table]).where(
- some_table.c.data.in_(["d2", "d3", "d4"])
- ).cte("some_cte")
+ cte = (
+ select([some_table])
+ .where(some_table.c.data.in_(["d2", "d3", "d4"]))
+ .cte("some_cte")
+ )
conn.execute(
some_other_table.delete().where(
- some_other_table.c.data ==
- select([cte.c.data]).where(
- cte.c.id == some_other_table.c.id)
+ some_other_table.c.data
+ == select([cte.c.data]).where(
+ cte.c.id == some_other_table.c.id
+ )
)
)
eq_(
conn.execute(
select([some_other_table]).order_by(some_other_table.c.id)
).fetchall(),
- [
- (1, "d1", None), (5, "d5", 3)
- ]
+ [(1, "d1", None), (5, "d5", 3)],
)
diff --git a/lib/sqlalchemy/testing/suite/test_ddl.py b/lib/sqlalchemy/testing/suite/test_ddl.py
index 1d8010c8a..7c44388d4 100644
--- a/lib/sqlalchemy/testing/suite/test_ddl.py
+++ b/lib/sqlalchemy/testing/suite/test_ddl.py
@@ -1,5 +1,3 @@
-
-
from .. import fixtures, config, util
from ..config import requirements
from ..assertions import eq_
@@ -11,55 +9,47 @@ class TableDDLTest(fixtures.TestBase):
__backend__ = True
def _simple_fixture(self):
- return Table('test_table', self.metadata,
- Column('id', Integer, primary_key=True,
- autoincrement=False),
- Column('data', String(50))
- )
+ return Table(
+ "test_table",
+ self.metadata,
+ Column("id", Integer, primary_key=True, autoincrement=False),
+ Column("data", String(50)),
+ )
def _underscore_fixture(self):
- return Table('_test_table', self.metadata,
- Column('id', Integer, primary_key=True,
- autoincrement=False),
- Column('_data', String(50))
- )
+ return Table(
+ "_test_table",
+ self.metadata,
+ Column("id", Integer, primary_key=True, autoincrement=False),
+ Column("_data", String(50)),
+ )
def _simple_roundtrip(self, table):
with config.db.begin() as conn:
- conn.execute(table.insert().values((1, 'some data')))
+ conn.execute(table.insert().values((1, "some data")))
result = conn.execute(table.select())
- eq_(
- result.first(),
- (1, 'some data')
- )
+ eq_(result.first(), (1, "some data"))
@requirements.create_table
@util.provide_metadata
def test_create_table(self):
table = self._simple_fixture()
- table.create(
- config.db, checkfirst=False
- )
+ table.create(config.db, checkfirst=False)
self._simple_roundtrip(table)
@requirements.drop_table
@util.provide_metadata
def test_drop_table(self):
table = self._simple_fixture()
- table.create(
- config.db, checkfirst=False
- )
- table.drop(
- config.db, checkfirst=False
- )
+ table.create(config.db, checkfirst=False)
+ table.drop(config.db, checkfirst=False)
@requirements.create_table
@util.provide_metadata
def test_underscore_names(self):
table = self._underscore_fixture()
- table.create(
- config.db, checkfirst=False
- )
+ table.create(config.db, checkfirst=False)
self._simple_roundtrip(table)
-__all__ = ('TableDDLTest', )
+
+__all__ = ("TableDDLTest",)
diff --git a/lib/sqlalchemy/testing/suite/test_dialect.py b/lib/sqlalchemy/testing/suite/test_dialect.py
index 2c5dd0e36..5e589f3b8 100644
--- a/lib/sqlalchemy/testing/suite/test_dialect.py
+++ b/lib/sqlalchemy/testing/suite/test_dialect.py
@@ -15,16 +15,19 @@ class ExceptionTest(fixtures.TablesTest):
specific exceptions from real round trips, we need to be conservative.
"""
- run_deletes = 'each'
+
+ run_deletes = "each"
__backend__ = True
@classmethod
def define_tables(cls, metadata):
- Table('manual_pk', metadata,
- Column('id', Integer, primary_key=True, autoincrement=False),
- Column('data', String(50))
- )
+ Table(
+ "manual_pk",
+ metadata,
+ Column("id", Integer, primary_key=True, autoincrement=False),
+ Column("data", String(50)),
+ )
@requirements.duplicate_key_raises_integrity_error
def test_integrity_error(self):
@@ -33,15 +36,14 @@ class ExceptionTest(fixtures.TablesTest):
trans = conn.begin()
conn.execute(
- self.tables.manual_pk.insert(),
- {'id': 1, 'data': 'd1'}
+ self.tables.manual_pk.insert(), {"id": 1, "data": "d1"}
)
assert_raises(
exc.IntegrityError,
conn.execute,
self.tables.manual_pk.insert(),
- {'id': 1, 'data': 'd1'}
+ {"id": 1, "data": "d1"},
)
trans.rollback()
@@ -49,38 +51,39 @@ class ExceptionTest(fixtures.TablesTest):
class AutocommitTest(fixtures.TablesTest):
- run_deletes = 'each'
+ run_deletes = "each"
- __requires__ = 'autocommit',
+ __requires__ = ("autocommit",)
__backend__ = True
@classmethod
def define_tables(cls, metadata):
- Table('some_table', metadata,
- Column('id', Integer, primary_key=True, autoincrement=False),
- Column('data', String(50)),
- test_needs_acid=True
- )
+ Table(
+ "some_table",
+ metadata,
+ Column("id", Integer, primary_key=True, autoincrement=False),
+ Column("data", String(50)),
+ test_needs_acid=True,
+ )
def _test_conn_autocommits(self, conn, autocommit):
trans = conn.begin()
conn.execute(
- self.tables.some_table.insert(),
- {"id": 1, "data": "some data"}
+ self.tables.some_table.insert(), {"id": 1, "data": "some data"}
)
trans.rollback()
eq_(
conn.scalar(select([self.tables.some_table.c.id])),
- 1 if autocommit else None
+ 1 if autocommit else None,
)
conn.execute(self.tables.some_table.delete())
def test_autocommit_on(self):
conn = config.db.connect()
- c2 = conn.execution_options(isolation_level='AUTOCOMMIT')
+ c2 = conn.execution_options(isolation_level="AUTOCOMMIT")
self._test_conn_autocommits(c2, True)
conn.invalidate()
self._test_conn_autocommits(conn, False)
@@ -98,7 +101,7 @@ class EscapingTest(fixtures.TestBase):
"""
m = self.metadata
- t = Table('t', m, Column('data', String(50)))
+ t = Table("t", m, Column("data", String(50)))
t.create(config.db)
with config.db.begin() as conn:
conn.execute(t.insert(), dict(data="some % value"))
@@ -107,14 +110,17 @@ class EscapingTest(fixtures.TestBase):
eq_(
conn.scalar(
select([t.c.data]).where(
- t.c.data == literal_column("'some % value'"))
+ t.c.data == literal_column("'some % value'")
+ )
),
- "some % value"
+ "some % value",
)
eq_(
conn.scalar(
select([t.c.data]).where(
- t.c.data == literal_column("'some %% other value'"))
- ), "some %% other value"
+ t.c.data == literal_column("'some %% other value'")
+ )
+ ),
+ "some %% other value",
)
diff --git a/lib/sqlalchemy/testing/suite/test_insert.py b/lib/sqlalchemy/testing/suite/test_insert.py
index c0b6b18eb..6257451eb 100644
--- a/lib/sqlalchemy/testing/suite/test_insert.py
+++ b/lib/sqlalchemy/testing/suite/test_insert.py
@@ -10,53 +10,48 @@ from ..schema import Table, Column
class LastrowidTest(fixtures.TablesTest):
- run_deletes = 'each'
+ run_deletes = "each"
__backend__ = True
- __requires__ = 'implements_get_lastrowid', 'autoincrement_insert'
+ __requires__ = "implements_get_lastrowid", "autoincrement_insert"
__engine_options__ = {"implicit_returning": False}
@classmethod
def define_tables(cls, metadata):
- Table('autoinc_pk', metadata,
- Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('data', String(50))
- )
+ Table(
+ "autoinc_pk",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("data", String(50)),
+ )
- Table('manual_pk', metadata,
- Column('id', Integer, primary_key=True, autoincrement=False),
- Column('data', String(50))
- )
+ Table(
+ "manual_pk",
+ metadata,
+ Column("id", Integer, primary_key=True, autoincrement=False),
+ Column("data", String(50)),
+ )
def _assert_round_trip(self, table, conn):
row = conn.execute(table.select()).first()
- eq_(
- row,
- (config.db.dialect.default_sequence_base, "some data")
- )
+ eq_(row, (config.db.dialect.default_sequence_base, "some data"))
def test_autoincrement_on_insert(self):
- config.db.execute(
- self.tables.autoinc_pk.insert(),
- data="some data"
- )
+ config.db.execute(self.tables.autoinc_pk.insert(), data="some data")
self._assert_round_trip(self.tables.autoinc_pk, config.db)
def test_last_inserted_id(self):
r = config.db.execute(
- self.tables.autoinc_pk.insert(),
- data="some data"
+ self.tables.autoinc_pk.insert(), data="some data"
)
pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
- eq_(
- r.inserted_primary_key,
- [pk]
- )
+ eq_(r.inserted_primary_key, [pk])
# failed on pypy1.9 but seems to be OK on pypy 2.1
# @exclusions.fails_if(lambda: util.pypy,
@@ -65,50 +60,57 @@ class LastrowidTest(fixtures.TablesTest):
@requirements.dbapi_lastrowid
def test_native_lastrowid_autoinc(self):
r = config.db.execute(
- self.tables.autoinc_pk.insert(),
- data="some data"
+ self.tables.autoinc_pk.insert(), data="some data"
)
lastrowid = r.lastrowid
pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
- eq_(
- lastrowid, pk
- )
+ eq_(lastrowid, pk)
class InsertBehaviorTest(fixtures.TablesTest):
- run_deletes = 'each'
+ run_deletes = "each"
__backend__ = True
@classmethod
def define_tables(cls, metadata):
- Table('autoinc_pk', metadata,
- Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('data', String(50))
- )
- Table('manual_pk', metadata,
- Column('id', Integer, primary_key=True, autoincrement=False),
- Column('data', String(50))
- )
- Table('includes_defaults', metadata,
- Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('data', String(50)),
- Column('x', Integer, default=5),
- Column('y', Integer,
- default=literal_column("2", type_=Integer) + literal(2)))
+ Table(
+ "autoinc_pk",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("data", String(50)),
+ )
+ Table(
+ "manual_pk",
+ metadata,
+ Column("id", Integer, primary_key=True, autoincrement=False),
+ Column("data", String(50)),
+ )
+ Table(
+ "includes_defaults",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("data", String(50)),
+ Column("x", Integer, default=5),
+ Column(
+ "y",
+ Integer,
+ default=literal_column("2", type_=Integer) + literal(2),
+ ),
+ )
def test_autoclose_on_insert(self):
if requirements.returning.enabled:
engine = engines.testing_engine(
- options={'implicit_returning': False})
+ options={"implicit_returning": False}
+ )
else:
engine = config.db
- r = engine.execute(
- self.tables.autoinc_pk.insert(),
- data="some data"
- )
+ r = engine.execute(self.tables.autoinc_pk.insert(), data="some data")
assert r._soft_closed
assert not r.closed
assert r.is_insert
@@ -117,8 +119,7 @@ class InsertBehaviorTest(fixtures.TablesTest):
@requirements.returning
def test_autoclose_on_insert_implicit_returning(self):
r = config.db.execute(
- self.tables.autoinc_pk.insert(),
- data="some data"
+ self.tables.autoinc_pk.insert(), data="some data"
)
assert r._soft_closed
assert not r.closed
@@ -127,15 +128,14 @@ class InsertBehaviorTest(fixtures.TablesTest):
@requirements.empty_inserts
def test_empty_insert(self):
- r = config.db.execute(
- self.tables.autoinc_pk.insert(),
- )
+ r = config.db.execute(self.tables.autoinc_pk.insert())
assert r._soft_closed
assert not r.closed
r = config.db.execute(
- self.tables.autoinc_pk.select().
- where(self.tables.autoinc_pk.c.id != None)
+ self.tables.autoinc_pk.select().where(
+ self.tables.autoinc_pk.c.id != None
+ )
)
assert len(r.fetchall())
@@ -150,15 +150,15 @@ class InsertBehaviorTest(fixtures.TablesTest):
dict(id=1, data="data1"),
dict(id=2, data="data2"),
dict(id=3, data="data3"),
- ]
+ ],
)
result = config.db.execute(
- dest_table.insert().
- from_select(
+ dest_table.insert().from_select(
("data",),
- select([src_table.c.data]).
- where(src_table.c.data.in_(["data2", "data3"]))
+ select([src_table.c.data]).where(
+ src_table.c.data.in_(["data2", "data3"])
+ ),
)
)
@@ -167,7 +167,7 @@ class InsertBehaviorTest(fixtures.TablesTest):
result = config.db.execute(
select([dest_table.c.data]).order_by(dest_table.c.data)
)
- eq_(result.fetchall(), [("data2", ), ("data3", )])
+ eq_(result.fetchall(), [("data2",), ("data3",)])
@requirements.insert_from_select
def test_insert_from_select_autoinc_no_rows(self):
@@ -175,11 +175,11 @@ class InsertBehaviorTest(fixtures.TablesTest):
dest_table = self.tables.autoinc_pk
result = config.db.execute(
- dest_table.insert().
- from_select(
+ dest_table.insert().from_select(
("data",),
- select([src_table.c.data]).
- where(src_table.c.data.in_(["data2", "data3"]))
+ select([src_table.c.data]).where(
+ src_table.c.data.in_(["data2", "data3"])
+ ),
)
)
eq_(result.inserted_primary_key, [None])
@@ -199,23 +199,23 @@ class InsertBehaviorTest(fixtures.TablesTest):
dict(id=1, data="data1"),
dict(id=2, data="data2"),
dict(id=3, data="data3"),
- ]
+ ],
)
config.db.execute(
- table.insert(inline=True).
- from_select(("id", "data",),
- select([table.c.id + 5, table.c.data]).
- where(table.c.data.in_(["data2", "data3"]))
- ),
+ table.insert(inline=True).from_select(
+ ("id", "data"),
+ select([table.c.id + 5, table.c.data]).where(
+ table.c.data.in_(["data2", "data3"])
+ ),
+ )
)
eq_(
config.db.execute(
select([table.c.data]).order_by(table.c.data)
).fetchall(),
- [("data1", ), ("data2", ), ("data2", ),
- ("data3", ), ("data3", )]
+ [("data1",), ("data2",), ("data2",), ("data3",), ("data3",)],
)
@requirements.insert_from_select
@@ -227,56 +227,60 @@ class InsertBehaviorTest(fixtures.TablesTest):
dict(id=1, data="data1"),
dict(id=2, data="data2"),
dict(id=3, data="data3"),
- ]
+ ],
)
config.db.execute(
- table.insert(inline=True).
- from_select(("id", "data",),
- select([table.c.id + 5, table.c.data]).
- where(table.c.data.in_(["data2", "data3"]))
- ),
+ table.insert(inline=True).from_select(
+ ("id", "data"),
+ select([table.c.id + 5, table.c.data]).where(
+ table.c.data.in_(["data2", "data3"])
+ ),
+ )
)
eq_(
config.db.execute(
select([table]).order_by(table.c.data, table.c.id)
).fetchall(),
- [(1, 'data1', 5, 4), (2, 'data2', 5, 4),
- (7, 'data2', 5, 4), (3, 'data3', 5, 4), (8, 'data3', 5, 4)]
+ [
+ (1, "data1", 5, 4),
+ (2, "data2", 5, 4),
+ (7, "data2", 5, 4),
+ (3, "data3", 5, 4),
+ (8, "data3", 5, 4),
+ ],
)
class ReturningTest(fixtures.TablesTest):
- run_create_tables = 'each'
- __requires__ = 'returning', 'autoincrement_insert'
+ run_create_tables = "each"
+ __requires__ = "returning", "autoincrement_insert"
__backend__ = True
__engine_options__ = {"implicit_returning": True}
def _assert_round_trip(self, table, conn):
row = conn.execute(table.select()).first()
- eq_(
- row,
- (config.db.dialect.default_sequence_base, "some data")
- )
+ eq_(row, (config.db.dialect.default_sequence_base, "some data"))
@classmethod
def define_tables(cls, metadata):
- Table('autoinc_pk', metadata,
- Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('data', String(50))
- )
+ Table(
+ "autoinc_pk",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("data", String(50)),
+ )
@requirements.fetch_rows_post_commit
def test_explicit_returning_pk_autocommit(self):
engine = config.db
table = self.tables.autoinc_pk
r = engine.execute(
- table.insert().returning(
- table.c.id),
- data="some data"
+ table.insert().returning(table.c.id), data="some data"
)
pk = r.first()[0]
fetched_pk = config.db.scalar(select([table.c.id]))
@@ -287,9 +291,7 @@ class ReturningTest(fixtures.TablesTest):
table = self.tables.autoinc_pk
with engine.begin() as conn:
r = conn.execute(
- table.insert().returning(
- table.c.id),
- data="some data"
+ table.insert().returning(table.c.id), data="some data"
)
pk = r.first()[0]
fetched_pk = config.db.scalar(select([table.c.id]))
@@ -297,23 +299,16 @@ class ReturningTest(fixtures.TablesTest):
def test_autoincrement_on_insert_implcit_returning(self):
- config.db.execute(
- self.tables.autoinc_pk.insert(),
- data="some data"
- )
+ config.db.execute(self.tables.autoinc_pk.insert(), data="some data")
self._assert_round_trip(self.tables.autoinc_pk, config.db)
def test_last_inserted_id_implicit_returning(self):
r = config.db.execute(
- self.tables.autoinc_pk.insert(),
- data="some data"
+ self.tables.autoinc_pk.insert(), data="some data"
)
pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
- eq_(
- r.inserted_primary_key,
- [pk]
- )
+ eq_(r.inserted_primary_key, [pk])
-__all__ = ('LastrowidTest', 'InsertBehaviorTest', 'ReturningTest')
+__all__ = ("LastrowidTest", "InsertBehaviorTest", "ReturningTest")
diff --git a/lib/sqlalchemy/testing/suite/test_reflection.py b/lib/sqlalchemy/testing/suite/test_reflection.py
index 00a5aac01..bfed5f1ab 100644
--- a/lib/sqlalchemy/testing/suite/test_reflection.py
+++ b/lib/sqlalchemy/testing/suite/test_reflection.py
@@ -1,5 +1,3 @@
-
-
import sqlalchemy as sa
from sqlalchemy import exc as sa_exc
from sqlalchemy import types as sql_types
@@ -26,10 +24,12 @@ class HasTableTest(fixtures.TablesTest):
@classmethod
def define_tables(cls, metadata):
- Table('test_table', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', String(50))
- )
+ Table(
+ "test_table",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("data", String(50)),
+ )
def test_has_table(self):
with config.db.begin() as conn:
@@ -46,8 +46,10 @@ class ComponentReflectionTest(fixtures.TablesTest):
def setup_bind(cls):
if config.requirements.independent_connections.enabled:
from sqlalchemy import pool
+
return engines.testing_engine(
- options=dict(poolclass=pool.StaticPool))
+ options=dict(poolclass=pool.StaticPool)
+ )
else:
return config.db
@@ -65,86 +67,109 @@ class ComponentReflectionTest(fixtures.TablesTest):
schema_prefix = ""
if testing.requires.self_referential_foreign_keys.enabled:
- users = Table('users', metadata,
- Column('user_id', sa.INT, primary_key=True),
- Column('test1', sa.CHAR(5), nullable=False),
- Column('test2', sa.Float(5), nullable=False),
- Column('parent_user_id', sa.Integer,
- sa.ForeignKey('%susers.user_id' %
- schema_prefix,
- name='user_id_fk')),
- schema=schema,
- test_needs_fk=True,
- )
+ users = Table(
+ "users",
+ metadata,
+ Column("user_id", sa.INT, primary_key=True),
+ Column("test1", sa.CHAR(5), nullable=False),
+ Column("test2", sa.Float(5), nullable=False),
+ Column(
+ "parent_user_id",
+ sa.Integer,
+ sa.ForeignKey(
+ "%susers.user_id" % schema_prefix, name="user_id_fk"
+ ),
+ ),
+ schema=schema,
+ test_needs_fk=True,
+ )
else:
- users = Table('users', metadata,
- Column('user_id', sa.INT, primary_key=True),
- Column('test1', sa.CHAR(5), nullable=False),
- Column('test2', sa.Float(5), nullable=False),
- schema=schema,
- test_needs_fk=True,
- )
-
- Table("dingalings", metadata,
- Column('dingaling_id', sa.Integer, primary_key=True),
- Column('address_id', sa.Integer,
- sa.ForeignKey('%semail_addresses.address_id' %
- schema_prefix)),
- Column('data', sa.String(30)),
- schema=schema,
- test_needs_fk=True,
- )
- Table('email_addresses', metadata,
- Column('address_id', sa.Integer),
- Column('remote_user_id', sa.Integer,
- sa.ForeignKey(users.c.user_id)),
- Column('email_address', sa.String(20)),
- sa.PrimaryKeyConstraint('address_id', name='email_ad_pk'),
- schema=schema,
- test_needs_fk=True,
- )
- Table('comment_test', metadata,
- Column('id', sa.Integer, primary_key=True, comment='id comment'),
- Column('data', sa.String(20), comment='data % comment'),
- Column(
- 'd2', sa.String(20),
- comment=r"""Comment types type speedily ' " \ '' Fun!"""),
- schema=schema,
- comment=r"""the test % ' " \ table comment""")
+ users = Table(
+ "users",
+ metadata,
+ Column("user_id", sa.INT, primary_key=True),
+ Column("test1", sa.CHAR(5), nullable=False),
+ Column("test2", sa.Float(5), nullable=False),
+ schema=schema,
+ test_needs_fk=True,
+ )
+
+ Table(
+ "dingalings",
+ metadata,
+ Column("dingaling_id", sa.Integer, primary_key=True),
+ Column(
+ "address_id",
+ sa.Integer,
+ sa.ForeignKey("%semail_addresses.address_id" % schema_prefix),
+ ),
+ Column("data", sa.String(30)),
+ schema=schema,
+ test_needs_fk=True,
+ )
+ Table(
+ "email_addresses",
+ metadata,
+ Column("address_id", sa.Integer),
+ Column(
+ "remote_user_id", sa.Integer, sa.ForeignKey(users.c.user_id)
+ ),
+ Column("email_address", sa.String(20)),
+ sa.PrimaryKeyConstraint("address_id", name="email_ad_pk"),
+ schema=schema,
+ test_needs_fk=True,
+ )
+ Table(
+ "comment_test",
+ metadata,
+ Column("id", sa.Integer, primary_key=True, comment="id comment"),
+ Column("data", sa.String(20), comment="data % comment"),
+ Column(
+ "d2",
+ sa.String(20),
+ comment=r"""Comment types type speedily ' " \ '' Fun!""",
+ ),
+ schema=schema,
+ comment=r"""the test % ' " \ table comment""",
+ )
if testing.requires.cross_schema_fk_reflection.enabled:
if schema is None:
Table(
- 'local_table', metadata,
- Column('id', sa.Integer, primary_key=True),
- Column('data', sa.String(20)),
+ "local_table",
+ metadata,
+ Column("id", sa.Integer, primary_key=True),
+ Column("data", sa.String(20)),
Column(
- 'remote_id',
+ "remote_id",
ForeignKey(
- '%s.remote_table_2.id' %
- testing.config.test_schema)
+ "%s.remote_table_2.id" % testing.config.test_schema
+ ),
),
test_needs_fk=True,
- schema=config.db.dialect.default_schema_name
+ schema=config.db.dialect.default_schema_name,
)
else:
Table(
- 'remote_table', metadata,
- Column('id', sa.Integer, primary_key=True),
+ "remote_table",
+ metadata,
+ Column("id", sa.Integer, primary_key=True),
Column(
- 'local_id',
+ "local_id",
ForeignKey(
- '%s.local_table.id' %
- config.db.dialect.default_schema_name)
+ "%s.local_table.id"
+ % config.db.dialect.default_schema_name
+ ),
),
- Column('data', sa.String(20)),
+ Column("data", sa.String(20)),
schema=schema,
test_needs_fk=True,
)
Table(
- 'remote_table_2', metadata,
- Column('id', sa.Integer, primary_key=True),
- Column('data', sa.String(20)),
+ "remote_table_2",
+ metadata,
+ Column("id", sa.Integer, primary_key=True),
+ Column("data", sa.String(20)),
schema=schema,
test_needs_fk=True,
)
@@ -155,19 +180,21 @@ class ComponentReflectionTest(fixtures.TablesTest):
if not schema:
# test_needs_fk is at the moment to force MySQL InnoDB
noncol_idx_test_nopk = Table(
- 'noncol_idx_test_nopk', metadata,
- Column('q', sa.String(5)),
+ "noncol_idx_test_nopk",
+ metadata,
+ Column("q", sa.String(5)),
test_needs_fk=True,
)
noncol_idx_test_pk = Table(
- 'noncol_idx_test_pk', metadata,
- Column('id', sa.Integer, primary_key=True),
- Column('q', sa.String(5)),
+ "noncol_idx_test_pk",
+ metadata,
+ Column("id", sa.Integer, primary_key=True),
+ Column("q", sa.String(5)),
test_needs_fk=True,
)
- Index('noncol_idx_nopk', noncol_idx_test_nopk.c.q.desc())
- Index('noncol_idx_pk', noncol_idx_test_pk.c.q.desc())
+ Index("noncol_idx_nopk", noncol_idx_test_nopk.c.q.desc())
+ Index("noncol_idx_pk", noncol_idx_test_pk.c.q.desc())
if testing.requires.view_column_reflection.enabled:
cls.define_views(metadata, schema)
@@ -180,34 +207,35 @@ class ComponentReflectionTest(fixtures.TablesTest):
# temp table fixture
if testing.against("oracle"):
kw = {
- 'prefixes': ["GLOBAL TEMPORARY"],
- 'oracle_on_commit': 'PRESERVE ROWS'
+ "prefixes": ["GLOBAL TEMPORARY"],
+ "oracle_on_commit": "PRESERVE ROWS",
}
else:
- kw = {
- 'prefixes': ["TEMPORARY"],
- }
+ kw = {"prefixes": ["TEMPORARY"]}
user_tmp = Table(
- "user_tmp", metadata,
+ "user_tmp",
+ metadata,
Column("id", sa.INT, primary_key=True),
- Column('name', sa.VARCHAR(50)),
- Column('foo', sa.INT),
- sa.UniqueConstraint('name', name='user_tmp_uq'),
+ Column("name", sa.VARCHAR(50)),
+ Column("foo", sa.INT),
+ sa.UniqueConstraint("name", name="user_tmp_uq"),
sa.Index("user_tmp_ix", "foo"),
**kw
)
- if testing.requires.view_reflection.enabled and \
- testing.requires.temporary_views.enabled:
- event.listen(
- user_tmp, "after_create",
- DDL("create temporary view user_tmp_v as "
- "select * from user_tmp")
- )
+ if (
+ testing.requires.view_reflection.enabled
+ and testing.requires.temporary_views.enabled
+ ):
event.listen(
- user_tmp, "before_drop",
- DDL("drop view user_tmp_v")
+ user_tmp,
+ "after_create",
+ DDL(
+ "create temporary view user_tmp_v as "
+ "select * from user_tmp"
+ ),
)
+ event.listen(user_tmp, "before_drop", DDL("drop view user_tmp_v"))
@classmethod
def define_index(cls, metadata, users):
@@ -216,23 +244,19 @@ class ComponentReflectionTest(fixtures.TablesTest):
@classmethod
def define_views(cls, metadata, schema):
- for table_name in ('users', 'email_addresses'):
+ for table_name in ("users", "email_addresses"):
fullname = table_name
if schema:
fullname = "%s.%s" % (schema, table_name)
- view_name = fullname + '_v'
+ view_name = fullname + "_v"
query = "CREATE VIEW %s AS SELECT * FROM %s" % (
- view_name, fullname)
-
- event.listen(
- metadata,
- "after_create",
- DDL(query)
+ view_name,
+ fullname,
)
+
+ event.listen(metadata, "after_create", DDL(query))
event.listen(
- metadata,
- "before_drop",
- DDL("DROP VIEW %s" % view_name)
+ metadata, "before_drop", DDL("DROP VIEW %s" % view_name)
)
@testing.requires.schema_reflection
@@ -244,9 +268,9 @@ class ComponentReflectionTest(fixtures.TablesTest):
@testing.requires.schema_reflection
def test_dialect_initialize(self):
engine = engines.testing_engine()
- assert not hasattr(engine.dialect, 'default_schema_name')
+ assert not hasattr(engine.dialect, "default_schema_name")
inspect(engine)
- assert hasattr(engine.dialect, 'default_schema_name')
+ assert hasattr(engine.dialect, "default_schema_name")
@testing.requires.schema_reflection
def test_get_default_schema_name(self):
@@ -254,40 +278,49 @@ class ComponentReflectionTest(fixtures.TablesTest):
eq_(insp.default_schema_name, testing.db.dialect.default_schema_name)
@testing.provide_metadata
- def _test_get_table_names(self, schema=None, table_type='table',
- order_by=None):
+ def _test_get_table_names(
+ self, schema=None, table_type="table", order_by=None
+ ):
_ignore_tables = [
- 'comment_test', 'noncol_idx_test_pk', 'noncol_idx_test_nopk',
- 'local_table', 'remote_table', 'remote_table_2'
+ "comment_test",
+ "noncol_idx_test_pk",
+ "noncol_idx_test_nopk",
+ "local_table",
+ "remote_table",
+ "remote_table_2",
]
meta = self.metadata
- users, addresses, dingalings = self.tables.users, \
- self.tables.email_addresses, self.tables.dingalings
+ users, addresses, dingalings = (
+ self.tables.users,
+ self.tables.email_addresses,
+ self.tables.dingalings,
+ )
insp = inspect(meta.bind)
- if table_type == 'view':
+ if table_type == "view":
table_names = insp.get_view_names(schema)
table_names.sort()
- answer = ['email_addresses_v', 'users_v']
+ answer = ["email_addresses_v", "users_v"]
eq_(sorted(table_names), answer)
else:
table_names = [
- t for t in insp.get_table_names(
- schema,
- order_by=order_by) if t not in _ignore_tables]
+ t
+ for t in insp.get_table_names(schema, order_by=order_by)
+ if t not in _ignore_tables
+ ]
- if order_by == 'foreign_key':
- answer = ['users', 'email_addresses', 'dingalings']
+ if order_by == "foreign_key":
+ answer = ["users", "email_addresses", "dingalings"]
eq_(table_names, answer)
else:
- answer = ['dingalings', 'email_addresses', 'users']
+ answer = ["dingalings", "email_addresses", "users"]
eq_(sorted(table_names), answer)
@testing.requires.temp_table_names
def test_get_temp_table_names(self):
insp = inspect(self.bind)
temp_table_names = insp.get_temp_table_names()
- eq_(sorted(temp_table_names), ['user_tmp'])
+ eq_(sorted(temp_table_names), ["user_tmp"])
@testing.requires.view_reflection
@testing.requires.temp_table_names
@@ -295,7 +328,7 @@ class ComponentReflectionTest(fixtures.TablesTest):
def test_get_temp_view_names(self):
insp = inspect(self.bind)
temp_table_names = insp.get_temp_view_names()
- eq_(sorted(temp_table_names), ['user_tmp_v'])
+ eq_(sorted(temp_table_names), ["user_tmp_v"])
@testing.requires.table_reflection
def test_get_table_names(self):
@@ -304,7 +337,7 @@ class ComponentReflectionTest(fixtures.TablesTest):
@testing.requires.table_reflection
@testing.requires.foreign_key_constraint_reflection
def test_get_table_names_fks(self):
- self._test_get_table_names(order_by='foreign_key')
+ self._test_get_table_names(order_by="foreign_key")
@testing.requires.comment_reflection
def test_get_comments(self):
@@ -320,26 +353,24 @@ class ComponentReflectionTest(fixtures.TablesTest):
eq_(
insp.get_table_comment("comment_test", schema=schema),
- {"text": r"""the test % ' " \ table comment"""}
+ {"text": r"""the test % ' " \ table comment"""},
)
- eq_(
- insp.get_table_comment("users", schema=schema),
- {"text": None}
- )
+ eq_(insp.get_table_comment("users", schema=schema), {"text": None})
eq_(
[
- {"name": rec['name'], "comment": rec['comment']}
- for rec in
- insp.get_columns("comment_test", schema=schema)
+ {"name": rec["name"], "comment": rec["comment"]}
+ for rec in insp.get_columns("comment_test", schema=schema)
],
[
- {'comment': 'id comment', 'name': 'id'},
- {'comment': 'data % comment', 'name': 'data'},
- {'comment': r"""Comment types type speedily ' " \ '' Fun!""",
- 'name': 'd2'}
- ]
+ {"comment": "id comment", "name": "id"},
+ {"comment": "data % comment", "name": "data"},
+ {
+ "comment": r"""Comment types type speedily ' " \ '' Fun!""",
+ "name": "d2",
+ },
+ ],
)
@testing.requires.table_reflection
@@ -349,30 +380,33 @@ class ComponentReflectionTest(fixtures.TablesTest):
@testing.requires.view_column_reflection
def test_get_view_names(self):
- self._test_get_table_names(table_type='view')
+ self._test_get_table_names(table_type="view")
@testing.requires.view_column_reflection
@testing.requires.schemas
def test_get_view_names_with_schema(self):
self._test_get_table_names(
- testing.config.test_schema, table_type='view')
+ testing.config.test_schema, table_type="view"
+ )
@testing.requires.table_reflection
@testing.requires.view_column_reflection
def test_get_tables_and_views(self):
self._test_get_table_names()
- self._test_get_table_names(table_type='view')
+ self._test_get_table_names(table_type="view")
- def _test_get_columns(self, schema=None, table_type='table'):
+ def _test_get_columns(self, schema=None, table_type="table"):
meta = MetaData(testing.db)
- users, addresses, dingalings = self.tables.users, \
- self.tables.email_addresses, self.tables.dingalings
- table_names = ['users', 'email_addresses']
- if table_type == 'view':
- table_names = ['users_v', 'email_addresses_v']
+ users, addresses, dingalings = (
+ self.tables.users,
+ self.tables.email_addresses,
+ self.tables.dingalings,
+ )
+ table_names = ["users", "email_addresses"]
+ if table_type == "view":
+ table_names = ["users_v", "email_addresses_v"]
insp = inspect(meta.bind)
- for table_name, table in zip(table_names, (users,
- addresses)):
+ for table_name, table in zip(table_names, (users, addresses)):
schema_name = schema
cols = insp.get_columns(table_name, schema=schema_name)
self.assert_(len(cols) > 0, len(cols))
@@ -380,36 +414,46 @@ class ComponentReflectionTest(fixtures.TablesTest):
# should be in order
for i, col in enumerate(table.columns):
- eq_(col.name, cols[i]['name'])
- ctype = cols[i]['type'].__class__
+ eq_(col.name, cols[i]["name"])
+ ctype = cols[i]["type"].__class__
ctype_def = col.type
if isinstance(ctype_def, sa.types.TypeEngine):
ctype_def = ctype_def.__class__
# Oracle returns Date for DateTime.
- if testing.against('oracle') and ctype_def \
- in (sql_types.Date, sql_types.DateTime):
+ if testing.against("oracle") and ctype_def in (
+ sql_types.Date,
+ sql_types.DateTime,
+ ):
ctype_def = sql_types.Date
# assert that the desired type and return type share
# a base within one of the generic types.
- self.assert_(len(set(ctype.__mro__).
- intersection(ctype_def.__mro__).
- intersection([
- sql_types.Integer,
- sql_types.Numeric,
- sql_types.DateTime,
- sql_types.Date,
- sql_types.Time,
- sql_types.String,
- sql_types._Binary,
- ])) > 0, '%s(%s), %s(%s)' %
- (col.name, col.type, cols[i]['name'], ctype))
+ self.assert_(
+ len(
+ set(ctype.__mro__)
+ .intersection(ctype_def.__mro__)
+ .intersection(
+ [
+ sql_types.Integer,
+ sql_types.Numeric,
+ sql_types.DateTime,
+ sql_types.Date,
+ sql_types.Time,
+ sql_types.String,
+ sql_types._Binary,
+ ]
+ )
+ )
+ > 0,
+ "%s(%s), %s(%s)"
+ % (col.name, col.type, cols[i]["name"], ctype),
+ )
if not col.primary_key:
- assert cols[i]['default'] is None
+ assert cols[i]["default"] is None
@testing.requires.table_reflection
def test_get_columns(self):
@@ -417,24 +461,20 @@ class ComponentReflectionTest(fixtures.TablesTest):
@testing.provide_metadata
def _type_round_trip(self, *types):
- t = Table('t', self.metadata,
- *[
- Column('t%d' % i, type_)
- for i, type_ in enumerate(types)
- ]
- )
+ t = Table(
+ "t",
+ self.metadata,
+ *[Column("t%d" % i, type_) for i, type_ in enumerate(types)]
+ )
t.create()
return [
- c['type'] for c in
- inspect(self.metadata.bind).get_columns('t')
+ c["type"] for c in inspect(self.metadata.bind).get_columns("t")
]
@testing.requires.table_reflection
def test_numeric_reflection(self):
- for typ in self._type_round_trip(
- sql_types.Numeric(18, 5),
- ):
+ for typ in self._type_round_trip(sql_types.Numeric(18, 5)):
assert isinstance(typ, sql_types.Numeric)
eq_(typ.precision, 18)
eq_(typ.scale, 5)
@@ -448,16 +488,19 @@ class ComponentReflectionTest(fixtures.TablesTest):
@testing.requires.table_reflection
@testing.provide_metadata
def test_nullable_reflection(self):
- t = Table('t', self.metadata,
- Column('a', Integer, nullable=True),
- Column('b', Integer, nullable=False))
+ t = Table(
+ "t",
+ self.metadata,
+ Column("a", Integer, nullable=True),
+ Column("b", Integer, nullable=False),
+ )
t.create()
eq_(
dict(
- (col['name'], col['nullable'])
- for col in inspect(self.metadata.bind).get_columns('t')
+ (col["name"], col["nullable"])
+ for col in inspect(self.metadata.bind).get_columns("t")
),
- {"a": True, "b": False}
+ {"a": True, "b": False},
)
@testing.requires.table_reflection
@@ -470,32 +513,30 @@ class ComponentReflectionTest(fixtures.TablesTest):
meta = MetaData(self.bind)
user_tmp = self.tables.user_tmp
insp = inspect(meta.bind)
- cols = insp.get_columns('user_tmp')
+ cols = insp.get_columns("user_tmp")
self.assert_(len(cols) > 0, len(cols))
for i, col in enumerate(user_tmp.columns):
- eq_(col.name, cols[i]['name'])
+ eq_(col.name, cols[i]["name"])
@testing.requires.temp_table_reflection
@testing.requires.view_column_reflection
@testing.requires.temporary_views
def test_get_temp_view_columns(self):
insp = inspect(self.bind)
- cols = insp.get_columns('user_tmp_v')
- eq_(
- [col['name'] for col in cols],
- ['id', 'name', 'foo']
- )
+ cols = insp.get_columns("user_tmp_v")
+ eq_([col["name"] for col in cols], ["id", "name", "foo"])
@testing.requires.view_column_reflection
def test_get_view_columns(self):
- self._test_get_columns(table_type='view')
+ self._test_get_columns(table_type="view")
@testing.requires.view_column_reflection
@testing.requires.schemas
def test_get_view_columns_with_schema(self):
self._test_get_columns(
- schema=testing.config.test_schema, table_type='view')
+ schema=testing.config.test_schema, table_type="view"
+ )
@testing.provide_metadata
def _test_get_pk_constraint(self, schema=None):
@@ -504,15 +545,15 @@ class ComponentReflectionTest(fixtures.TablesTest):
insp = inspect(meta.bind)
users_cons = insp.get_pk_constraint(users.name, schema=schema)
- users_pkeys = users_cons['constrained_columns']
- eq_(users_pkeys, ['user_id'])
+ users_pkeys = users_cons["constrained_columns"]
+ eq_(users_pkeys, ["user_id"])
addr_cons = insp.get_pk_constraint(addresses.name, schema=schema)
- addr_pkeys = addr_cons['constrained_columns']
- eq_(addr_pkeys, ['address_id'])
+ addr_pkeys = addr_cons["constrained_columns"]
+ eq_(addr_pkeys, ["address_id"])
with testing.requires.reflects_pk_names.fail_if():
- eq_(addr_cons['name'], 'email_ad_pk')
+ eq_(addr_cons["name"], "email_ad_pk")
@testing.requires.primary_key_constraint_reflection
def test_get_pk_constraint(self):
@@ -534,44 +575,46 @@ class ComponentReflectionTest(fixtures.TablesTest):
sa_exc.SADeprecationWarning,
"Call to deprecated method get_primary_keys."
" Use get_pk_constraint instead.",
- insp.get_primary_keys, users.name
+ insp.get_primary_keys,
+ users.name,
)
@testing.provide_metadata
def _test_get_foreign_keys(self, schema=None):
meta = self.metadata
- users, addresses, dingalings = self.tables.users, \
- self.tables.email_addresses, self.tables.dingalings
+ users, addresses, dingalings = (
+ self.tables.users,
+ self.tables.email_addresses,
+ self.tables.dingalings,
+ )
insp = inspect(meta.bind)
expected_schema = schema
# users
if testing.requires.self_referential_foreign_keys.enabled:
- users_fkeys = insp.get_foreign_keys(users.name,
- schema=schema)
+ users_fkeys = insp.get_foreign_keys(users.name, schema=schema)
fkey1 = users_fkeys[0]
with testing.requires.named_constraints.fail_if():
- eq_(fkey1['name'], "user_id_fk")
+ eq_(fkey1["name"], "user_id_fk")
- eq_(fkey1['referred_schema'], expected_schema)
- eq_(fkey1['referred_table'], users.name)
- eq_(fkey1['referred_columns'], ['user_id', ])
+ eq_(fkey1["referred_schema"], expected_schema)
+ eq_(fkey1["referred_table"], users.name)
+ eq_(fkey1["referred_columns"], ["user_id"])
if testing.requires.self_referential_foreign_keys.enabled:
- eq_(fkey1['constrained_columns'], ['parent_user_id'])
+ eq_(fkey1["constrained_columns"], ["parent_user_id"])
# addresses
- addr_fkeys = insp.get_foreign_keys(addresses.name,
- schema=schema)
+ addr_fkeys = insp.get_foreign_keys(addresses.name, schema=schema)
fkey1 = addr_fkeys[0]
with testing.requires.implicitly_named_constraints.fail_if():
- self.assert_(fkey1['name'] is not None)
+ self.assert_(fkey1["name"] is not None)
- eq_(fkey1['referred_schema'], expected_schema)
- eq_(fkey1['referred_table'], users.name)
- eq_(fkey1['referred_columns'], ['user_id', ])
- eq_(fkey1['constrained_columns'], ['remote_user_id'])
+ eq_(fkey1["referred_schema"], expected_schema)
+ eq_(fkey1["referred_table"], users.name)
+ eq_(fkey1["referred_columns"], ["user_id"])
+ eq_(fkey1["constrained_columns"], ["remote_user_id"])
@testing.requires.foreign_key_constraint_reflection
def test_get_foreign_keys(self):
@@ -586,9 +629,9 @@ class ComponentReflectionTest(fixtures.TablesTest):
@testing.requires.schemas
def test_get_inter_schema_foreign_keys(self):
local_table, remote_table, remote_table_2 = self.tables(
- '%s.local_table' % testing.db.dialect.default_schema_name,
- '%s.remote_table' % testing.config.test_schema,
- '%s.remote_table_2' % testing.config.test_schema
+ "%s.local_table" % testing.db.dialect.default_schema_name,
+ "%s.remote_table" % testing.config.test_schema,
+ "%s.remote_table_2" % testing.config.test_schema,
)
insp = inspect(config.db)
@@ -597,25 +640,25 @@ class ComponentReflectionTest(fixtures.TablesTest):
eq_(len(local_fkeys), 1)
fkey1 = local_fkeys[0]
- eq_(fkey1['referred_schema'], testing.config.test_schema)
- eq_(fkey1['referred_table'], remote_table_2.name)
- eq_(fkey1['referred_columns'], ['id', ])
- eq_(fkey1['constrained_columns'], ['remote_id'])
+ eq_(fkey1["referred_schema"], testing.config.test_schema)
+ eq_(fkey1["referred_table"], remote_table_2.name)
+ eq_(fkey1["referred_columns"], ["id"])
+ eq_(fkey1["constrained_columns"], ["remote_id"])
remote_fkeys = insp.get_foreign_keys(
- remote_table.name, schema=testing.config.test_schema)
+ remote_table.name, schema=testing.config.test_schema
+ )
eq_(len(remote_fkeys), 1)
fkey2 = remote_fkeys[0]
- assert fkey2['referred_schema'] in (
+ assert fkey2["referred_schema"] in (
None,
- testing.db.dialect.default_schema_name
+ testing.db.dialect.default_schema_name,
)
- eq_(fkey2['referred_table'], local_table.name)
- eq_(fkey2['referred_columns'], ['id', ])
- eq_(fkey2['constrained_columns'], ['local_id'])
-
+ eq_(fkey2["referred_table"], local_table.name)
+ eq_(fkey2["referred_columns"], ["id"])
+ eq_(fkey2["constrained_columns"], ["local_id"])
@testing.requires.foreign_key_constraint_option_reflection_ondelete
def test_get_foreign_key_options_ondelete(self):
@@ -630,26 +673,32 @@ class ComponentReflectionTest(fixtures.TablesTest):
meta = self.metadata
Table(
- 'x', meta,
- Column('id', Integer, primary_key=True),
- test_needs_fk=True
- )
-
- Table('table', meta,
- Column('id', Integer, primary_key=True),
- Column('x_id', Integer, sa.ForeignKey('x.id', name='xid')),
- Column('test', String(10)),
- test_needs_fk=True)
-
- Table('user', meta,
- Column('id', Integer, primary_key=True),
- Column('name', String(50), nullable=False),
- Column('tid', Integer),
- sa.ForeignKeyConstraint(
- ['tid'], ['table.id'],
- name='myfk',
- **options),
- test_needs_fk=True)
+ "x",
+ meta,
+ Column("id", Integer, primary_key=True),
+ test_needs_fk=True,
+ )
+
+ Table(
+ "table",
+ meta,
+ Column("id", Integer, primary_key=True),
+ Column("x_id", Integer, sa.ForeignKey("x.id", name="xid")),
+ Column("test", String(10)),
+ test_needs_fk=True,
+ )
+
+ Table(
+ "user",
+ meta,
+ Column("id", Integer, primary_key=True),
+ Column("name", String(50), nullable=False),
+ Column("tid", Integer),
+ sa.ForeignKeyConstraint(
+ ["tid"], ["table.id"], name="myfk", **options
+ ),
+ test_needs_fk=True,
+ )
meta.create_all()
@@ -657,49 +706,44 @@ class ComponentReflectionTest(fixtures.TablesTest):
# test 'options' is always present for a backend
# that can reflect these, since alembic looks for this
- opts = insp.get_foreign_keys('table')[0]['options']
+ opts = insp.get_foreign_keys("table")[0]["options"]
- eq_(
- dict(
- (k, opts[k])
- for k in opts if opts[k]
- ),
- {}
- )
+ eq_(dict((k, opts[k]) for k in opts if opts[k]), {})
- opts = insp.get_foreign_keys('user')[0]['options']
- eq_(
- dict(
- (k, opts[k])
- for k in opts if opts[k]
- ),
- options
- )
+ opts = insp.get_foreign_keys("user")[0]["options"]
+ eq_(dict((k, opts[k]) for k in opts if opts[k]), options)
def _assert_insp_indexes(self, indexes, expected_indexes):
- index_names = [d['name'] for d in indexes]
+ index_names = [d["name"] for d in indexes]
for e_index in expected_indexes:
- assert e_index['name'] in index_names
- index = indexes[index_names.index(e_index['name'])]
+ assert e_index["name"] in index_names
+ index = indexes[index_names.index(e_index["name"])]
for key in e_index:
eq_(e_index[key], index[key])
@testing.provide_metadata
def _test_get_indexes(self, schema=None):
meta = self.metadata
- users, addresses, dingalings = self.tables.users, \
- self.tables.email_addresses, self.tables.dingalings
+ users, addresses, dingalings = (
+ self.tables.users,
+ self.tables.email_addresses,
+ self.tables.dingalings,
+ )
# The database may decide to create indexes for foreign keys, etc.
# so there may be more indexes than expected.
insp = inspect(meta.bind)
- indexes = insp.get_indexes('users', schema=schema)
+ indexes = insp.get_indexes("users", schema=schema)
expected_indexes = [
- {'unique': False,
- 'column_names': ['test1', 'test2'],
- 'name': 'users_t_idx'},
- {'unique': False,
- 'column_names': ['user_id', 'test2', 'test1'],
- 'name': 'users_all_idx'}
+ {
+ "unique": False,
+ "column_names": ["test1", "test2"],
+ "name": "users_t_idx",
+ },
+ {
+ "unique": False,
+ "column_names": ["user_id", "test2", "test1"],
+ "name": "users_all_idx",
+ },
]
self._assert_insp_indexes(indexes, expected_indexes)
@@ -721,10 +765,7 @@ class ComponentReflectionTest(fixtures.TablesTest):
# reflecting an index that has "x DESC" in it as the column.
# the DB may or may not give us "x", but make sure we get the index
# back, it has a name, it's connected to the table.
- expected_indexes = [
- {'unique': False,
- 'name': ixname}
- ]
+ expected_indexes = [{"unique": False, "name": ixname}]
self._assert_insp_indexes(indexes, expected_indexes)
t = Table(tname, meta, autoload_with=meta.bind)
@@ -748,24 +789,30 @@ class ComponentReflectionTest(fixtures.TablesTest):
@testing.requires.unique_constraint_reflection
def test_get_temp_table_unique_constraints(self):
insp = inspect(self.bind)
- reflected = insp.get_unique_constraints('user_tmp')
+ reflected = insp.get_unique_constraints("user_tmp")
for refl in reflected:
# Different dialects handle duplicate index and constraints
# differently, so ignore this flag
- refl.pop('duplicates_index', None)
- eq_(reflected, [{'column_names': ['name'], 'name': 'user_tmp_uq'}])
+ refl.pop("duplicates_index", None)
+ eq_(reflected, [{"column_names": ["name"], "name": "user_tmp_uq"}])
@testing.requires.temp_table_reflection
def test_get_temp_table_indexes(self):
insp = inspect(self.bind)
- indexes = insp.get_indexes('user_tmp')
+ indexes = insp.get_indexes("user_tmp")
for ind in indexes:
- ind.pop('dialect_options', None)
+ ind.pop("dialect_options", None)
eq_(
# TODO: we need to add better filtering for indexes/uq constraints
# that are doubled up
- [idx for idx in indexes if idx['name'] == 'user_tmp_ix'],
- [{'unique': False, 'column_names': ['foo'], 'name': 'user_tmp_ix'}]
+ [idx for idx in indexes if idx["name"] == "user_tmp_ix"],
+ [
+ {
+ "unique": False,
+ "column_names": ["foo"],
+ "name": "user_tmp_ix",
+ }
+ ],
)
@testing.requires.unique_constraint_reflection
@@ -783,36 +830,37 @@ class ComponentReflectionTest(fixtures.TablesTest):
# CREATE TABLE?
uniques = sorted(
[
- {'name': 'unique_a', 'column_names': ['a']},
- {'name': 'unique_a_b_c', 'column_names': ['a', 'b', 'c']},
- {'name': 'unique_c_a_b', 'column_names': ['c', 'a', 'b']},
- {'name': 'unique_asc_key', 'column_names': ['asc', 'key']},
- {'name': 'i.have.dots', 'column_names': ['b']},
- {'name': 'i have spaces', 'column_names': ['c']},
+ {"name": "unique_a", "column_names": ["a"]},
+ {"name": "unique_a_b_c", "column_names": ["a", "b", "c"]},
+ {"name": "unique_c_a_b", "column_names": ["c", "a", "b"]},
+ {"name": "unique_asc_key", "column_names": ["asc", "key"]},
+ {"name": "i.have.dots", "column_names": ["b"]},
+ {"name": "i have spaces", "column_names": ["c"]},
],
- key=operator.itemgetter('name')
+ key=operator.itemgetter("name"),
)
orig_meta = self.metadata
table = Table(
- 'testtbl', orig_meta,
- Column('a', sa.String(20)),
- Column('b', sa.String(30)),
- Column('c', sa.Integer),
+ "testtbl",
+ orig_meta,
+ Column("a", sa.String(20)),
+ Column("b", sa.String(30)),
+ Column("c", sa.Integer),
# reserved identifiers
- Column('asc', sa.String(30)),
- Column('key', sa.String(30)),
- schema=schema
+ Column("asc", sa.String(30)),
+ Column("key", sa.String(30)),
+ schema=schema,
)
for uc in uniques:
table.append_constraint(
- sa.UniqueConstraint(*uc['column_names'], name=uc['name'])
+ sa.UniqueConstraint(*uc["column_names"], name=uc["name"])
)
orig_meta.create_all()
inspector = inspect(orig_meta.bind)
reflected = sorted(
- inspector.get_unique_constraints('testtbl', schema=schema),
- key=operator.itemgetter('name')
+ inspector.get_unique_constraints("testtbl", schema=schema),
+ key=operator.itemgetter("name"),
)
names_that_duplicate_index = set()
@@ -820,25 +868,31 @@ class ComponentReflectionTest(fixtures.TablesTest):
for orig, refl in zip(uniques, reflected):
# Different dialects handle duplicate index and constraints
# differently, so ignore this flag
- dupe = refl.pop('duplicates_index', None)
+ dupe = refl.pop("duplicates_index", None)
if dupe:
names_that_duplicate_index.add(dupe)
eq_(orig, refl)
reflected_metadata = MetaData()
reflected = Table(
- 'testtbl', reflected_metadata, autoload_with=orig_meta.bind,
- schema=schema)
+ "testtbl",
+ reflected_metadata,
+ autoload_with=orig_meta.bind,
+ schema=schema,
+ )
# test "deduplicates for index" logic. MySQL and Oracle
# "unique constraints" are actually unique indexes (with possible
# exception of a unique that is a dupe of another one in the case
# of Oracle). make sure # they aren't duplicated.
idx_names = set([idx.name for idx in reflected.indexes])
- uq_names = set([
- uq.name for uq in reflected.constraints
- if isinstance(uq, sa.UniqueConstraint)]).difference(
- ['unique_c_a_b'])
+ uq_names = set(
+ [
+ uq.name
+ for uq in reflected.constraints
+ if isinstance(uq, sa.UniqueConstraint)
+ ]
+ ).difference(["unique_c_a_b"])
assert not idx_names.intersection(uq_names)
if names_that_duplicate_index:
@@ -858,47 +912,52 @@ class ComponentReflectionTest(fixtures.TablesTest):
def _test_get_check_constraints(self, schema=None):
orig_meta = self.metadata
Table(
- 'sa_cc', orig_meta,
- Column('a', Integer()),
- sa.CheckConstraint('a > 1 AND a < 5', name='cc1'),
- sa.CheckConstraint('a = 1 OR (a > 2 AND a < 5)', name='cc2'),
- schema=schema
+ "sa_cc",
+ orig_meta,
+ Column("a", Integer()),
+ sa.CheckConstraint("a > 1 AND a < 5", name="cc1"),
+ sa.CheckConstraint("a = 1 OR (a > 2 AND a < 5)", name="cc2"),
+ schema=schema,
)
orig_meta.create_all()
inspector = inspect(orig_meta.bind)
reflected = sorted(
- inspector.get_check_constraints('sa_cc', schema=schema),
- key=operator.itemgetter('name')
+ inspector.get_check_constraints("sa_cc", schema=schema),
+ key=operator.itemgetter("name"),
)
# trying to minimize effect of quoting, parenthesis, etc.
# may need to add more to this as new dialects get CHECK
# constraint reflection support
def normalize(sqltext):
- return " ".join(re.findall(r"and|\d|=|a|or|<|>", sqltext.lower(), re.I))
+ return " ".join(
+ re.findall(r"and|\d|=|a|or|<|>", sqltext.lower(), re.I)
+ )
reflected = [
- {"name": item["name"],
- "sqltext": normalize(item["sqltext"])}
+ {"name": item["name"], "sqltext": normalize(item["sqltext"])}
for item in reflected
]
eq_(
reflected,
[
- {'name': 'cc1', 'sqltext': 'a > 1 and a < 5'},
- {'name': 'cc2', 'sqltext': 'a = 1 or a > 2 and a < 5'}
- ]
+ {"name": "cc1", "sqltext": "a > 1 and a < 5"},
+ {"name": "cc2", "sqltext": "a = 1 or a > 2 and a < 5"},
+ ],
)
@testing.provide_metadata
def _test_get_view_definition(self, schema=None):
meta = self.metadata
- users, addresses, dingalings = self.tables.users, \
- self.tables.email_addresses, self.tables.dingalings
- view_name1 = 'users_v'
- view_name2 = 'email_addresses_v'
+ users, addresses, dingalings = (
+ self.tables.users,
+ self.tables.email_addresses,
+ self.tables.dingalings,
+ )
+ view_name1 = "users_v"
+ view_name2 = "email_addresses_v"
insp = inspect(meta.bind)
v1 = insp.get_view_definition(view_name1, schema=schema)
self.assert_(v1)
@@ -918,18 +977,21 @@ class ComponentReflectionTest(fixtures.TablesTest):
@testing.provide_metadata
def _test_get_table_oid(self, table_name, schema=None):
meta = self.metadata
- users, addresses, dingalings = self.tables.users, \
- self.tables.email_addresses, self.tables.dingalings
+ users, addresses, dingalings = (
+ self.tables.users,
+ self.tables.email_addresses,
+ self.tables.dingalings,
+ )
insp = inspect(meta.bind)
oid = insp.get_table_oid(table_name, schema)
self.assert_(isinstance(oid, int))
def test_get_table_oid(self):
- self._test_get_table_oid('users')
+ self._test_get_table_oid("users")
@testing.requires.schemas
def test_get_table_oid_with_schema(self):
- self._test_get_table_oid('users', schema=testing.config.test_schema)
+ self._test_get_table_oid("users", schema=testing.config.test_schema)
@testing.requires.table_reflection
@testing.provide_metadata
@@ -950,49 +1012,53 @@ class ComponentReflectionTest(fixtures.TablesTest):
insp = inspect(meta.bind)
for tname, cname in [
- ('users', 'user_id'),
- ('email_addresses', 'address_id'),
- ('dingalings', 'dingaling_id'),
+ ("users", "user_id"),
+ ("email_addresses", "address_id"),
+ ("dingalings", "dingaling_id"),
]:
cols = insp.get_columns(tname)
- id_ = {c['name']: c for c in cols}[cname]
- assert id_.get('autoincrement', True)
+ id_ = {c["name"]: c for c in cols}[cname]
+ assert id_.get("autoincrement", True)
class NormalizedNameTest(fixtures.TablesTest):
- __requires__ = 'denormalized_names',
+ __requires__ = ("denormalized_names",)
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table(
- quoted_name('t1', quote=True), metadata,
- Column('id', Integer, primary_key=True),
+ quoted_name("t1", quote=True),
+ metadata,
+ Column("id", Integer, primary_key=True),
)
Table(
- quoted_name('t2', quote=True), metadata,
- Column('id', Integer, primary_key=True),
- Column('t1id', ForeignKey('t1.id'))
+ quoted_name("t2", quote=True),
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("t1id", ForeignKey("t1.id")),
)
def test_reflect_lowercase_forced_tables(self):
m2 = MetaData(testing.db)
- t2_ref = Table(quoted_name('t2', quote=True), m2, autoload=True)
- t1_ref = m2.tables['t1']
+ t2_ref = Table(quoted_name("t2", quote=True), m2, autoload=True)
+ t1_ref = m2.tables["t1"]
assert t2_ref.c.t1id.references(t1_ref.c.id)
m3 = MetaData(testing.db)
- m3.reflect(only=lambda name, m: name.lower() in ('t1', 't2'))
- assert m3.tables['t2'].c.t1id.references(m3.tables['t1'].c.id)
+ m3.reflect(only=lambda name, m: name.lower() in ("t1", "t2"))
+ assert m3.tables["t2"].c.t1id.references(m3.tables["t1"].c.id)
def test_get_table_names(self):
tablenames = [
- t for t in inspect(testing.db).get_table_names()
- if t.lower() in ("t1", "t2")]
+ t
+ for t in inspect(testing.db).get_table_names()
+ if t.lower() in ("t1", "t2")
+ ]
eq_(tablenames[0].upper(), tablenames[0].lower())
eq_(tablenames[1].upper(), tablenames[1].lower())
-__all__ = ('ComponentReflectionTest', 'HasTableTest', 'NormalizedNameTest')
+__all__ = ("ComponentReflectionTest", "HasTableTest", "NormalizedNameTest")
diff --git a/lib/sqlalchemy/testing/suite/test_results.py b/lib/sqlalchemy/testing/suite/test_results.py
index f464d47eb..247f05cf5 100644
--- a/lib/sqlalchemy/testing/suite/test_results.py
+++ b/lib/sqlalchemy/testing/suite/test_results.py
@@ -15,14 +15,18 @@ class RowFetchTest(fixtures.TablesTest):
@classmethod
def define_tables(cls, metadata):
- Table('plain_pk', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', String(50))
- )
- Table('has_dates', metadata,
- Column('id', Integer, primary_key=True),
- Column('today', DateTime)
- )
+ Table(
+ "plain_pk",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("data", String(50)),
+ )
+ Table(
+ "has_dates",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("today", DateTime),
+ )
@classmethod
def insert_data(cls):
@@ -32,65 +36,51 @@ class RowFetchTest(fixtures.TablesTest):
{"id": 1, "data": "d1"},
{"id": 2, "data": "d2"},
{"id": 3, "data": "d3"},
- ]
+ ],
)
config.db.execute(
cls.tables.has_dates.insert(),
- [
- {"id": 1, "today": datetime.datetime(2006, 5, 12, 12, 0, 0)}
- ]
+ [{"id": 1, "today": datetime.datetime(2006, 5, 12, 12, 0, 0)}],
)
def test_via_string(self):
row = config.db.execute(
- self.tables.plain_pk.select().
- order_by(self.tables.plain_pk.c.id)
+ self.tables.plain_pk.select().order_by(self.tables.plain_pk.c.id)
).first()
- eq_(
- row['id'], 1
- )
- eq_(
- row['data'], "d1"
- )
+ eq_(row["id"], 1)
+ eq_(row["data"], "d1")
def test_via_int(self):
row = config.db.execute(
- self.tables.plain_pk.select().
- order_by(self.tables.plain_pk.c.id)
+ self.tables.plain_pk.select().order_by(self.tables.plain_pk.c.id)
).first()
- eq_(
- row[0], 1
- )
- eq_(
- row[1], "d1"
- )
+ eq_(row[0], 1)
+ eq_(row[1], "d1")
def test_via_col_object(self):
row = config.db.execute(
- self.tables.plain_pk.select().
- order_by(self.tables.plain_pk.c.id)
+ self.tables.plain_pk.select().order_by(self.tables.plain_pk.c.id)
).first()
- eq_(
- row[self.tables.plain_pk.c.id], 1
- )
- eq_(
- row[self.tables.plain_pk.c.data], "d1"
- )
+ eq_(row[self.tables.plain_pk.c.id], 1)
+ eq_(row[self.tables.plain_pk.c.data], "d1")
@requirements.duplicate_names_in_cursor_description
def test_row_with_dupe_names(self):
result = config.db.execute(
- select([self.tables.plain_pk.c.data,
- self.tables.plain_pk.c.data.label('data')]).
- order_by(self.tables.plain_pk.c.id)
+ select(
+ [
+ self.tables.plain_pk.c.data,
+ self.tables.plain_pk.c.data.label("data"),
+ ]
+ ).order_by(self.tables.plain_pk.c.id)
)
row = result.first()
- eq_(result.keys(), ['data', 'data'])
- eq_(row, ('d1', 'd1'))
+ eq_(result.keys(), ["data", "data"])
+ eq_(row, ("d1", "d1"))
def test_row_w_scalar_select(self):
"""test that a scalar select as a column is returned as such
@@ -101,11 +91,11 @@ class RowFetchTest(fixtures.TablesTest):
"""
datetable = self.tables.has_dates
- s = select([datetable.alias('x').c.today]).as_scalar()
- s2 = select([datetable.c.id, s.label('somelabel')])
+ s = select([datetable.alias("x").c.today]).as_scalar()
+ s2 = select([datetable.c.id, s.label("somelabel")])
row = config.db.execute(s2).first()
- eq_(row['somelabel'], datetime.datetime(2006, 5, 12, 12, 0, 0))
+ eq_(row["somelabel"], datetime.datetime(2006, 5, 12, 12, 0, 0))
class PercentSchemaNamesTest(fixtures.TablesTest):
@@ -117,29 +107,31 @@ class PercentSchemaNamesTest(fixtures.TablesTest):
"""
- __requires__ = ('percent_schema_names', )
+ __requires__ = ("percent_schema_names",)
__backend__ = True
@classmethod
def define_tables(cls, metadata):
- cls.tables.percent_table = Table('percent%table', metadata,
- Column("percent%", Integer),
- Column(
- "spaces % more spaces", Integer),
- )
+ cls.tables.percent_table = Table(
+ "percent%table",
+ metadata,
+ Column("percent%", Integer),
+ Column("spaces % more spaces", Integer),
+ )
cls.tables.lightweight_percent_table = sql.table(
- 'percent%table', sql.column("percent%"),
- sql.column("spaces % more spaces")
+ "percent%table",
+ sql.column("percent%"),
+ sql.column("spaces % more spaces"),
)
def test_single_roundtrip(self):
percent_table = self.tables.percent_table
for params in [
- {'percent%': 5, 'spaces % more spaces': 12},
- {'percent%': 7, 'spaces % more spaces': 11},
- {'percent%': 9, 'spaces % more spaces': 10},
- {'percent%': 11, 'spaces % more spaces': 9}
+ {"percent%": 5, "spaces % more spaces": 12},
+ {"percent%": 7, "spaces % more spaces": 11},
+ {"percent%": 9, "spaces % more spaces": 10},
+ {"percent%": 11, "spaces % more spaces": 9},
]:
config.db.execute(percent_table.insert(), params)
self._assert_table()
@@ -147,14 +139,15 @@ class PercentSchemaNamesTest(fixtures.TablesTest):
def test_executemany_roundtrip(self):
percent_table = self.tables.percent_table
config.db.execute(
- percent_table.insert(),
- {'percent%': 5, 'spaces % more spaces': 12}
+ percent_table.insert(), {"percent%": 5, "spaces % more spaces": 12}
)
config.db.execute(
percent_table.insert(),
- [{'percent%': 7, 'spaces % more spaces': 11},
- {'percent%': 9, 'spaces % more spaces': 10},
- {'percent%': 11, 'spaces % more spaces': 9}]
+ [
+ {"percent%": 7, "spaces % more spaces": 11},
+ {"percent%": 9, "spaces % more spaces": 10},
+ {"percent%": 11, "spaces % more spaces": 9},
+ ],
)
self._assert_table()
@@ -163,85 +156,81 @@ class PercentSchemaNamesTest(fixtures.TablesTest):
lightweight_percent_table = self.tables.lightweight_percent_table
for table in (
- percent_table,
- percent_table.alias(),
- lightweight_percent_table,
- lightweight_percent_table.alias()):
+ percent_table,
+ percent_table.alias(),
+ lightweight_percent_table,
+ lightweight_percent_table.alias(),
+ ):
eq_(
list(
config.db.execute(
- table.select().order_by(table.c['percent%'])
+ table.select().order_by(table.c["percent%"])
)
),
- [
- (5, 12),
- (7, 11),
- (9, 10),
- (11, 9)
- ]
+ [(5, 12), (7, 11), (9, 10), (11, 9)],
)
eq_(
list(
config.db.execute(
- table.select().
- where(table.c['spaces % more spaces'].in_([9, 10])).
- order_by(table.c['percent%']),
+ table.select()
+ .where(table.c["spaces % more spaces"].in_([9, 10]))
+ .order_by(table.c["percent%"])
)
),
- [
- (9, 10),
- (11, 9)
- ]
+ [(9, 10), (11, 9)],
)
- row = config.db.execute(table.select().
- order_by(table.c['percent%'])).first()
- eq_(row['percent%'], 5)
- eq_(row['spaces % more spaces'], 12)
+ row = config.db.execute(
+ table.select().order_by(table.c["percent%"])
+ ).first()
+ eq_(row["percent%"], 5)
+ eq_(row["spaces % more spaces"], 12)
- eq_(row[table.c['percent%']], 5)
- eq_(row[table.c['spaces % more spaces']], 12)
+ eq_(row[table.c["percent%"]], 5)
+ eq_(row[table.c["spaces % more spaces"]], 12)
config.db.execute(
percent_table.update().values(
- {percent_table.c['spaces % more spaces']: 15}
+ {percent_table.c["spaces % more spaces"]: 15}
)
)
eq_(
list(
config.db.execute(
- percent_table.
- select().
- order_by(percent_table.c['percent%'])
+ percent_table.select().order_by(
+ percent_table.c["percent%"]
+ )
)
),
- [(5, 15), (7, 15), (9, 15), (11, 15)]
+ [(5, 15), (7, 15), (9, 15), (11, 15)],
)
-class ServerSideCursorsTest(fixtures.TestBase, testing.AssertsExecutionResults):
+class ServerSideCursorsTest(
+ fixtures.TestBase, testing.AssertsExecutionResults
+):
- __requires__ = ('server_side_cursors', )
+ __requires__ = ("server_side_cursors",)
__backend__ = True
def _is_server_side(self, cursor):
if self.engine.dialect.driver == "psycopg2":
return cursor.name
- elif self.engine.dialect.driver == 'pymysql':
- sscursor = __import__('pymysql.cursors').cursors.SSCursor
+ elif self.engine.dialect.driver == "pymysql":
+ sscursor = __import__("pymysql.cursors").cursors.SSCursor
return isinstance(cursor, sscursor)
elif self.engine.dialect.driver == "mysqldb":
- sscursor = __import__('MySQLdb.cursors').cursors.SSCursor
+ sscursor = __import__("MySQLdb.cursors").cursors.SSCursor
return isinstance(cursor, sscursor)
else:
return False
def _fixture(self, server_side_cursors):
self.engine = engines.testing_engine(
- options={'server_side_cursors': server_side_cursors}
+ options={"server_side_cursors": server_side_cursors}
)
return self.engine
@@ -251,12 +240,12 @@ class ServerSideCursorsTest(fixtures.TestBase, testing.AssertsExecutionResults):
def test_global_string(self):
engine = self._fixture(True)
- result = engine.execute('select 1')
+ result = engine.execute("select 1")
assert self._is_server_side(result.cursor)
def test_global_text(self):
engine = self._fixture(True)
- result = engine.execute(text('select 1'))
+ result = engine.execute(text("select 1"))
assert self._is_server_side(result.cursor)
def test_global_expr(self):
@@ -266,7 +255,7 @@ class ServerSideCursorsTest(fixtures.TestBase, testing.AssertsExecutionResults):
def test_global_off_explicit(self):
engine = self._fixture(False)
- result = engine.execute(text('select 1'))
+ result = engine.execute(text("select 1"))
# It should be off globally ...
@@ -286,10 +275,11 @@ class ServerSideCursorsTest(fixtures.TestBase, testing.AssertsExecutionResults):
engine = self._fixture(False)
# and this one
- result = \
- engine.connect().execution_options(stream_results=True).\
- execute('select 1'
- )
+ result = (
+ engine.connect()
+ .execution_options(stream_results=True)
+ .execute("select 1")
+ )
assert self._is_server_side(result.cursor)
def test_stmt_enabled_conn_option_disabled(self):
@@ -298,9 +288,9 @@ class ServerSideCursorsTest(fixtures.TestBase, testing.AssertsExecutionResults):
s = select([1]).execution_options(stream_results=True)
# not this one
- result = \
- engine.connect().execution_options(stream_results=False).\
- execute(s)
+ result = (
+ engine.connect().execution_options(stream_results=False).execute(s)
+ )
assert not self._is_server_side(result.cursor)
def test_stmt_option_disabled(self):
@@ -329,18 +319,18 @@ class ServerSideCursorsTest(fixtures.TestBase, testing.AssertsExecutionResults):
def test_for_update_string(self):
engine = self._fixture(True)
- result = engine.execute('SELECT 1 FOR UPDATE')
+ result = engine.execute("SELECT 1 FOR UPDATE")
assert self._is_server_side(result.cursor)
def test_text_no_ss(self):
engine = self._fixture(False)
- s = text('select 42')
+ s = text("select 42")
result = engine.execute(s)
assert not self._is_server_side(result.cursor)
def test_text_ss_option(self):
engine = self._fixture(False)
- s = text('select 42').execution_options(stream_results=True)
+ s = text("select 42").execution_options(stream_results=True)
result = engine.execute(s)
assert self._is_server_side(result.cursor)
@@ -349,19 +339,25 @@ class ServerSideCursorsTest(fixtures.TestBase, testing.AssertsExecutionResults):
md = self.metadata
engine = self._fixture(True)
- test_table = Table('test_table', md,
- Column('id', Integer, primary_key=True),
- Column('data', String(50)))
+ test_table = Table(
+ "test_table",
+ md,
+ Column("id", Integer, primary_key=True),
+ Column("data", String(50)),
+ )
test_table.create(checkfirst=True)
- test_table.insert().execute(data='data1')
- test_table.insert().execute(data='data2')
- eq_(test_table.select().order_by(test_table.c.id).execute().fetchall(),
- [(1, 'data1'), (2, 'data2')])
- test_table.update().where(
- test_table.c.id == 2).values(
- data=test_table.c.data +
- ' updated').execute()
- eq_(test_table.select().order_by(test_table.c.id).execute().fetchall(),
- [(1, 'data1'), (2, 'data2 updated')])
+ test_table.insert().execute(data="data1")
+ test_table.insert().execute(data="data2")
+ eq_(
+ test_table.select().order_by(test_table.c.id).execute().fetchall(),
+ [(1, "data1"), (2, "data2")],
+ )
+ test_table.update().where(test_table.c.id == 2).values(
+ data=test_table.c.data + " updated"
+ ).execute()
+ eq_(
+ test_table.select().order_by(test_table.c.id).execute().fetchall(),
+ [(1, "data1"), (2, "data2 updated")],
+ )
test_table.delete().execute()
- eq_(select([func.count('*')]).select_from(test_table).scalar(), 0)
+ eq_(select([func.count("*")]).select_from(test_table).scalar(), 0)
diff --git a/lib/sqlalchemy/testing/suite/test_select.py b/lib/sqlalchemy/testing/suite/test_select.py
index 73ce02492..032b68eb6 100644
--- a/lib/sqlalchemy/testing/suite/test_select.py
+++ b/lib/sqlalchemy/testing/suite/test_select.py
@@ -16,10 +16,12 @@ class CollateTest(fixtures.TablesTest):
@classmethod
def define_tables(cls, metadata):
- Table("some_table", metadata,
- Column('id', Integer, primary_key=True),
- Column('data', String(100))
- )
+ Table(
+ "some_table",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("data", String(100)),
+ )
@classmethod
def insert_data(cls):
@@ -28,26 +30,21 @@ class CollateTest(fixtures.TablesTest):
[
{"id": 1, "data": "collate data1"},
{"id": 2, "data": "collate data2"},
- ]
+ ],
)
def _assert_result(self, select, result):
- eq_(
- config.db.execute(select).fetchall(),
- result
- )
+ eq_(config.db.execute(select).fetchall(), result)
@testing.requires.order_by_collation
def test_collate_order_by(self):
collation = testing.requires.get_order_by_collation(testing.config)
self._assert_result(
- select([self.tables.some_table]).
- order_by(self.tables.some_table.c.data.collate(collation).asc()),
- [
- (1, "collate data1"),
- (2, "collate data2"),
- ]
+ select([self.tables.some_table]).order_by(
+ self.tables.some_table.c.data.collate(collation).asc()
+ ),
+ [(1, "collate data1"), (2, "collate data2")],
)
@@ -59,17 +56,20 @@ class OrderByLabelTest(fixtures.TablesTest):
setting.
"""
+
__backend__ = True
@classmethod
def define_tables(cls, metadata):
- Table("some_table", metadata,
- Column('id', Integer, primary_key=True),
- Column('x', Integer),
- Column('y', Integer),
- Column('q', String(50)),
- Column('p', String(50))
- )
+ Table(
+ "some_table",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer),
+ Column("y", Integer),
+ Column("q", String(50)),
+ Column("p", String(50)),
+ )
@classmethod
def insert_data(cls):
@@ -79,65 +79,55 @@ class OrderByLabelTest(fixtures.TablesTest):
{"id": 1, "x": 1, "y": 2, "q": "q1", "p": "p3"},
{"id": 2, "x": 2, "y": 3, "q": "q2", "p": "p2"},
{"id": 3, "x": 3, "y": 4, "q": "q3", "p": "p1"},
- ]
+ ],
)
def _assert_result(self, select, result):
- eq_(
- config.db.execute(select).fetchall(),
- result
- )
+ eq_(config.db.execute(select).fetchall(), result)
def test_plain(self):
table = self.tables.some_table
- lx = table.c.x.label('lx')
- self._assert_result(
- select([lx]).order_by(lx),
- [(1, ), (2, ), (3, )]
- )
+ lx = table.c.x.label("lx")
+ self._assert_result(select([lx]).order_by(lx), [(1,), (2,), (3,)])
def test_composed_int(self):
table = self.tables.some_table
- lx = (table.c.x + table.c.y).label('lx')
- self._assert_result(
- select([lx]).order_by(lx),
- [(3, ), (5, ), (7, )]
- )
+ lx = (table.c.x + table.c.y).label("lx")
+ self._assert_result(select([lx]).order_by(lx), [(3,), (5,), (7,)])
def test_composed_multiple(self):
table = self.tables.some_table
- lx = (table.c.x + table.c.y).label('lx')
- ly = (func.lower(table.c.q) + table.c.p).label('ly')
+ lx = (table.c.x + table.c.y).label("lx")
+ ly = (func.lower(table.c.q) + table.c.p).label("ly")
self._assert_result(
select([lx, ly]).order_by(lx, ly.desc()),
- [(3, util.u('q1p3')), (5, util.u('q2p2')), (7, util.u('q3p1'))]
+ [(3, util.u("q1p3")), (5, util.u("q2p2")), (7, util.u("q3p1"))],
)
def test_plain_desc(self):
table = self.tables.some_table
- lx = table.c.x.label('lx')
+ lx = table.c.x.label("lx")
self._assert_result(
- select([lx]).order_by(lx.desc()),
- [(3, ), (2, ), (1, )]
+ select([lx]).order_by(lx.desc()), [(3,), (2,), (1,)]
)
def test_composed_int_desc(self):
table = self.tables.some_table
- lx = (table.c.x + table.c.y).label('lx')
+ lx = (table.c.x + table.c.y).label("lx")
self._assert_result(
- select([lx]).order_by(lx.desc()),
- [(7, ), (5, ), (3, )]
+ select([lx]).order_by(lx.desc()), [(7,), (5,), (3,)]
)
@testing.requires.group_by_complex_expression
def test_group_by_composed(self):
table = self.tables.some_table
- expr = (table.c.x + table.c.y).label('lx')
- stmt = select([func.count(table.c.id), expr]).group_by(expr).order_by(expr)
- self._assert_result(
- stmt,
- [(1, 3), (1, 5), (1, 7)]
+ expr = (table.c.x + table.c.y).label("lx")
+ stmt = (
+ select([func.count(table.c.id), expr])
+ .group_by(expr)
+ .order_by(expr)
)
+ self._assert_result(stmt, [(1, 3), (1, 5), (1, 7)])
class LimitOffsetTest(fixtures.TablesTest):
@@ -145,10 +135,13 @@ class LimitOffsetTest(fixtures.TablesTest):
@classmethod
def define_tables(cls, metadata):
- Table("some_table", metadata,
- Column('id', Integer, primary_key=True),
- Column('x', Integer),
- Column('y', Integer))
+ Table(
+ "some_table",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer),
+ Column("y", Integer),
+ )
@classmethod
def insert_data(cls):
@@ -159,20 +152,17 @@ class LimitOffsetTest(fixtures.TablesTest):
{"id": 2, "x": 2, "y": 3},
{"id": 3, "x": 3, "y": 4},
{"id": 4, "x": 4, "y": 5},
- ]
+ ],
)
def _assert_result(self, select, result, params=()):
- eq_(
- config.db.execute(select, params).fetchall(),
- result
- )
+ eq_(config.db.execute(select, params).fetchall(), result)
def test_simple_limit(self):
table = self.tables.some_table
self._assert_result(
select([table]).order_by(table.c.id).limit(2),
- [(1, 1, 2), (2, 2, 3)]
+ [(1, 1, 2), (2, 2, 3)],
)
@testing.requires.offset
@@ -180,7 +170,7 @@ class LimitOffsetTest(fixtures.TablesTest):
table = self.tables.some_table
self._assert_result(
select([table]).order_by(table.c.id).offset(2),
- [(3, 3, 4), (4, 4, 5)]
+ [(3, 3, 4), (4, 4, 5)],
)
@testing.requires.offset
@@ -188,7 +178,7 @@ class LimitOffsetTest(fixtures.TablesTest):
table = self.tables.some_table
self._assert_result(
select([table]).order_by(table.c.id).limit(2).offset(1),
- [(2, 2, 3), (3, 3, 4)]
+ [(2, 2, 3), (3, 3, 4)],
)
@testing.requires.offset
@@ -198,41 +188,40 @@ class LimitOffsetTest(fixtures.TablesTest):
table = self.tables.some_table
stmt = select([table]).order_by(table.c.id).limit(2).offset(1)
sql = stmt.compile(
- dialect=config.db.dialect,
- compile_kwargs={"literal_binds": True})
+ dialect=config.db.dialect, compile_kwargs={"literal_binds": True}
+ )
sql = str(sql)
- self._assert_result(
- sql,
- [(2, 2, 3), (3, 3, 4)]
- )
+ self._assert_result(sql, [(2, 2, 3), (3, 3, 4)])
@testing.requires.bound_limit_offset
def test_bound_limit(self):
table = self.tables.some_table
self._assert_result(
- select([table]).order_by(table.c.id).limit(bindparam('l')),
+ select([table]).order_by(table.c.id).limit(bindparam("l")),
[(1, 1, 2), (2, 2, 3)],
- params={"l": 2}
+ params={"l": 2},
)
@testing.requires.bound_limit_offset
def test_bound_offset(self):
table = self.tables.some_table
self._assert_result(
- select([table]).order_by(table.c.id).offset(bindparam('o')),
+ select([table]).order_by(table.c.id).offset(bindparam("o")),
[(3, 3, 4), (4, 4, 5)],
- params={"o": 2}
+ params={"o": 2},
)
@testing.requires.bound_limit_offset
def test_bound_limit_offset(self):
table = self.tables.some_table
self._assert_result(
- select([table]).order_by(table.c.id).
- limit(bindparam("l")).offset(bindparam("o")),
+ select([table])
+ .order_by(table.c.id)
+ .limit(bindparam("l"))
+ .offset(bindparam("o")),
[(2, 2, 3), (3, 3, 4)],
- params={"l": 2, "o": 1}
+ params={"l": 2, "o": 1},
)
@@ -241,10 +230,13 @@ class CompoundSelectTest(fixtures.TablesTest):
@classmethod
def define_tables(cls, metadata):
- Table("some_table", metadata,
- Column('id', Integer, primary_key=True),
- Column('x', Integer),
- Column('y', Integer))
+ Table(
+ "some_table",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer),
+ Column("y", Integer),
+ )
@classmethod
def insert_data(cls):
@@ -255,14 +247,11 @@ class CompoundSelectTest(fixtures.TablesTest):
{"id": 2, "x": 2, "y": 3},
{"id": 3, "x": 3, "y": 4},
{"id": 4, "x": 4, "y": 5},
- ]
+ ],
)
def _assert_result(self, select, result, params=()):
- eq_(
- config.db.execute(select, params).fetchall(),
- result
- )
+ eq_(config.db.execute(select, params).fetchall(), result)
def test_plain_union(self):
table = self.tables.some_table
@@ -270,10 +259,7 @@ class CompoundSelectTest(fixtures.TablesTest):
s2 = select([table]).where(table.c.id == 3)
u1 = union(s1, s2)
- self._assert_result(
- u1.order_by(u1.c.id),
- [(2, 2, 3), (3, 3, 4)]
- )
+ self._assert_result(u1.order_by(u1.c.id), [(2, 2, 3), (3, 3, 4)])
def test_select_from_plain_union(self):
table = self.tables.some_table
@@ -281,80 +267,88 @@ class CompoundSelectTest(fixtures.TablesTest):
s2 = select([table]).where(table.c.id == 3)
u1 = union(s1, s2).alias().select()
- self._assert_result(
- u1.order_by(u1.c.id),
- [(2, 2, 3), (3, 3, 4)]
- )
+ self._assert_result(u1.order_by(u1.c.id), [(2, 2, 3), (3, 3, 4)])
@testing.requires.order_by_col_from_union
@testing.requires.parens_in_union_contained_select_w_limit_offset
def test_limit_offset_selectable_in_unions(self):
table = self.tables.some_table
- s1 = select([table]).where(table.c.id == 2).\
- limit(1).order_by(table.c.id)
- s2 = select([table]).where(table.c.id == 3).\
- limit(1).order_by(table.c.id)
+ s1 = (
+ select([table])
+ .where(table.c.id == 2)
+ .limit(1)
+ .order_by(table.c.id)
+ )
+ s2 = (
+ select([table])
+ .where(table.c.id == 3)
+ .limit(1)
+ .order_by(table.c.id)
+ )
u1 = union(s1, s2).limit(2)
- self._assert_result(
- u1.order_by(u1.c.id),
- [(2, 2, 3), (3, 3, 4)]
- )
+ self._assert_result(u1.order_by(u1.c.id), [(2, 2, 3), (3, 3, 4)])
@testing.requires.parens_in_union_contained_select_wo_limit_offset
def test_order_by_selectable_in_unions(self):
table = self.tables.some_table
- s1 = select([table]).where(table.c.id == 2).\
- order_by(table.c.id)
- s2 = select([table]).where(table.c.id == 3).\
- order_by(table.c.id)
+ s1 = select([table]).where(table.c.id == 2).order_by(table.c.id)
+ s2 = select([table]).where(table.c.id == 3).order_by(table.c.id)
u1 = union(s1, s2).limit(2)
- self._assert_result(
- u1.order_by(u1.c.id),
- [(2, 2, 3), (3, 3, 4)]
- )
+ self._assert_result(u1.order_by(u1.c.id), [(2, 2, 3), (3, 3, 4)])
def test_distinct_selectable_in_unions(self):
table = self.tables.some_table
- s1 = select([table]).where(table.c.id == 2).\
- distinct()
- s2 = select([table]).where(table.c.id == 3).\
- distinct()
+ s1 = select([table]).where(table.c.id == 2).distinct()
+ s2 = select([table]).where(table.c.id == 3).distinct()
u1 = union(s1, s2).limit(2)
- self._assert_result(
- u1.order_by(u1.c.id),
- [(2, 2, 3), (3, 3, 4)]
- )
+ self._assert_result(u1.order_by(u1.c.id), [(2, 2, 3), (3, 3, 4)])
@testing.requires.parens_in_union_contained_select_w_limit_offset
def test_limit_offset_in_unions_from_alias(self):
table = self.tables.some_table
- s1 = select([table]).where(table.c.id == 2).\
- limit(1).order_by(table.c.id)
- s2 = select([table]).where(table.c.id == 3).\
- limit(1).order_by(table.c.id)
+ s1 = (
+ select([table])
+ .where(table.c.id == 2)
+ .limit(1)
+ .order_by(table.c.id)
+ )
+ s2 = (
+ select([table])
+ .where(table.c.id == 3)
+ .limit(1)
+ .order_by(table.c.id)
+ )
# this necessarily has double parens
u1 = union(s1, s2).alias()
self._assert_result(
- u1.select().limit(2).order_by(u1.c.id),
- [(2, 2, 3), (3, 3, 4)]
+ u1.select().limit(2).order_by(u1.c.id), [(2, 2, 3), (3, 3, 4)]
)
def test_limit_offset_aliased_selectable_in_unions(self):
table = self.tables.some_table
- s1 = select([table]).where(table.c.id == 2).\
- limit(1).order_by(table.c.id).alias().select()
- s2 = select([table]).where(table.c.id == 3).\
- limit(1).order_by(table.c.id).alias().select()
+ s1 = (
+ select([table])
+ .where(table.c.id == 2)
+ .limit(1)
+ .order_by(table.c.id)
+ .alias()
+ .select()
+ )
+ s2 = (
+ select([table])
+ .where(table.c.id == 3)
+ .limit(1)
+ .order_by(table.c.id)
+ .alias()
+ .select()
+ )
u1 = union(s1, s2).limit(2)
- self._assert_result(
- u1.order_by(u1.c.id),
- [(2, 2, 3), (3, 3, 4)]
- )
+ self._assert_result(u1.order_by(u1.c.id), [(2, 2, 3), (3, 3, 4)])
class ExpandingBoundInTest(fixtures.TablesTest):
@@ -362,11 +356,14 @@ class ExpandingBoundInTest(fixtures.TablesTest):
@classmethod
def define_tables(cls, metadata):
- Table("some_table", metadata,
- Column('id', Integer, primary_key=True),
- Column('x', Integer),
- Column('y', Integer),
- Column('z', String(50)))
+ Table(
+ "some_table",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer),
+ Column("y", Integer),
+ Column("z", String(50)),
+ )
@classmethod
def insert_data(cls):
@@ -377,178 +374,184 @@ class ExpandingBoundInTest(fixtures.TablesTest):
{"id": 2, "x": 2, "y": 3, "z": "z2"},
{"id": 3, "x": 3, "y": 4, "z": "z3"},
{"id": 4, "x": 4, "y": 5, "z": "z4"},
- ]
+ ],
)
def _assert_result(self, select, result, params=()):
- eq_(
- config.db.execute(select, params).fetchall(),
- result
- )
+ eq_(config.db.execute(select, params).fetchall(), result)
def test_multiple_empty_sets(self):
# test that any anonymous aliasing used by the dialect
# is fine with duplicates
table = self.tables.some_table
- stmt = select([table.c.id]).where(
- table.c.x.in_(bindparam('q', expanding=True))).where(
- table.c.y.in_(bindparam('p', expanding=True))
- ).order_by(table.c.id)
-
- self._assert_result(
- stmt,
- [],
- params={"q": [], "p": []},
+ stmt = (
+ select([table.c.id])
+ .where(table.c.x.in_(bindparam("q", expanding=True)))
+ .where(table.c.y.in_(bindparam("p", expanding=True)))
+ .order_by(table.c.id)
)
+ self._assert_result(stmt, [], params={"q": [], "p": []})
+
@testing.requires.tuple_in
def test_empty_heterogeneous_tuples(self):
table = self.tables.some_table
- stmt = select([table.c.id]).where(
- tuple_(table.c.x, table.c.z).in_(
- bindparam('q', expanding=True))).order_by(table.c.id)
-
- self._assert_result(
- stmt,
- [],
- params={"q": []},
+ stmt = (
+ select([table.c.id])
+ .where(
+ tuple_(table.c.x, table.c.z).in_(
+ bindparam("q", expanding=True)
+ )
+ )
+ .order_by(table.c.id)
)
+ self._assert_result(stmt, [], params={"q": []})
+
@testing.requires.tuple_in
def test_empty_homogeneous_tuples(self):
table = self.tables.some_table
- stmt = select([table.c.id]).where(
- tuple_(table.c.x, table.c.y).in_(
- bindparam('q', expanding=True))).order_by(table.c.id)
-
- self._assert_result(
- stmt,
- [],
- params={"q": []},
+ stmt = (
+ select([table.c.id])
+ .where(
+ tuple_(table.c.x, table.c.y).in_(
+ bindparam("q", expanding=True)
+ )
+ )
+ .order_by(table.c.id)
)
+ self._assert_result(stmt, [], params={"q": []})
+
def test_bound_in_scalar(self):
table = self.tables.some_table
- stmt = select([table.c.id]).where(
- table.c.x.in_(bindparam('q', expanding=True))).order_by(table.c.id)
-
- self._assert_result(
- stmt,
- [(2, ), (3, ), (4, )],
- params={"q": [2, 3, 4]},
+ stmt = (
+ select([table.c.id])
+ .where(table.c.x.in_(bindparam("q", expanding=True)))
+ .order_by(table.c.id)
)
+ self._assert_result(stmt, [(2,), (3,), (4,)], params={"q": [2, 3, 4]})
+
@testing.requires.tuple_in
def test_bound_in_two_tuple(self):
table = self.tables.some_table
- stmt = select([table.c.id]).where(
- tuple_(table.c.x, table.c.y).in_(
- bindparam('q', expanding=True))).order_by(table.c.id)
+ stmt = (
+ select([table.c.id])
+ .where(
+ tuple_(table.c.x, table.c.y).in_(
+ bindparam("q", expanding=True)
+ )
+ )
+ .order_by(table.c.id)
+ )
self._assert_result(
- stmt,
- [(2, ), (3, ), (4, )],
- params={"q": [(2, 3), (3, 4), (4, 5)]},
+ stmt, [(2,), (3,), (4,)], params={"q": [(2, 3), (3, 4), (4, 5)]}
)
@testing.requires.tuple_in
def test_bound_in_heterogeneous_two_tuple(self):
table = self.tables.some_table
- stmt = select([table.c.id]).where(
- tuple_(table.c.x, table.c.z).in_(
- bindparam('q', expanding=True))).order_by(table.c.id)
+ stmt = (
+ select([table.c.id])
+ .where(
+ tuple_(table.c.x, table.c.z).in_(
+ bindparam("q", expanding=True)
+ )
+ )
+ .order_by(table.c.id)
+ )
self._assert_result(
stmt,
- [(2, ), (3, ), (4, )],
+ [(2,), (3,), (4,)],
params={"q": [(2, "z2"), (3, "z3"), (4, "z4")]},
)
def test_empty_set_against_integer(self):
table = self.tables.some_table
- stmt = select([table.c.id]).where(
- table.c.x.in_(bindparam('q', expanding=True))).order_by(table.c.id)
-
- self._assert_result(
- stmt,
- [],
- params={"q": []},
+ stmt = (
+ select([table.c.id])
+ .where(table.c.x.in_(bindparam("q", expanding=True)))
+ .order_by(table.c.id)
)
+ self._assert_result(stmt, [], params={"q": []})
+
def test_empty_set_against_integer_negation(self):
table = self.tables.some_table
- stmt = select([table.c.id]).where(
- table.c.x.notin_(bindparam('q', expanding=True))
- ).order_by(table.c.id)
-
- self._assert_result(
- stmt,
- [(1, ), (2, ), (3, ), (4, )],
- params={"q": []},
+ stmt = (
+ select([table.c.id])
+ .where(table.c.x.notin_(bindparam("q", expanding=True)))
+ .order_by(table.c.id)
)
+ self._assert_result(stmt, [(1,), (2,), (3,), (4,)], params={"q": []})
+
def test_empty_set_against_string(self):
table = self.tables.some_table
- stmt = select([table.c.id]).where(
- table.c.z.in_(bindparam('q', expanding=True))).order_by(table.c.id)
-
- self._assert_result(
- stmt,
- [],
- params={"q": []},
+ stmt = (
+ select([table.c.id])
+ .where(table.c.z.in_(bindparam("q", expanding=True)))
+ .order_by(table.c.id)
)
+ self._assert_result(stmt, [], params={"q": []})
+
def test_empty_set_against_string_negation(self):
table = self.tables.some_table
- stmt = select([table.c.id]).where(
- table.c.z.notin_(bindparam('q', expanding=True))
- ).order_by(table.c.id)
-
- self._assert_result(
- stmt,
- [(1, ), (2, ), (3, ), (4, )],
- params={"q": []},
+ stmt = (
+ select([table.c.id])
+ .where(table.c.z.notin_(bindparam("q", expanding=True)))
+ .order_by(table.c.id)
)
+ self._assert_result(stmt, [(1,), (2,), (3,), (4,)], params={"q": []})
+
def test_null_in_empty_set_is_false(self):
- stmt = select([
- case(
- [
- (
- null().in_(bindparam('foo', value=(), expanding=True)),
- true()
- )
- ],
- else_=false()
- )
- ])
- in_(
- config.db.execute(stmt).fetchone()[0],
- (False, 0)
+ stmt = select(
+ [
+ case(
+ [
+ (
+ null().in_(
+ bindparam("foo", value=(), expanding=True)
+ ),
+ true(),
+ )
+ ],
+ else_=false(),
+ )
+ ]
)
+ in_(config.db.execute(stmt).fetchone()[0], (False, 0))
class LikeFunctionsTest(fixtures.TablesTest):
__backend__ = True
- run_inserts = 'once'
+ run_inserts = "once"
run_deletes = None
@classmethod
def define_tables(cls, metadata):
- Table("some_table", metadata,
- Column('id', Integer, primary_key=True),
- Column('data', String(50)))
+ Table(
+ "some_table",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("data", String(50)),
+ )
@classmethod
def insert_data(cls):
@@ -565,7 +568,7 @@ class LikeFunctionsTest(fixtures.TablesTest):
{"id": 8, "data": "ab9cdefg"},
{"id": 9, "data": "abcde#fg"},
{"id": 10, "data": "abcd9fg"},
- ]
+ ],
)
def _test(self, expr, expected):
@@ -573,8 +576,10 @@ class LikeFunctionsTest(fixtures.TablesTest):
with config.db.connect() as conn:
rows = {
- value for value, in
- conn.execute(select([some_table.c.id]).where(expr))
+ value
+ for value, in conn.execute(
+ select([some_table.c.id]).where(expr)
+ )
}
eq_(rows, expected)
@@ -591,7 +596,8 @@ class LikeFunctionsTest(fixtures.TablesTest):
col = self.tables.some_table.c.data
self._test(
col.startswith(literal_column("'ab%c'")),
- {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
+ {1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
+ )
def test_startswith_escape(self):
col = self.tables.some_table.c.data
@@ -608,8 +614,9 @@ class LikeFunctionsTest(fixtures.TablesTest):
def test_endswith_sqlexpr(self):
col = self.tables.some_table.c.data
- self._test(col.endswith(literal_column("'e%fg'")),
- {1, 2, 3, 4, 5, 6, 7, 8, 9})
+ self._test(
+ col.endswith(literal_column("'e%fg'")), {1, 2, 3, 4, 5, 6, 7, 8, 9}
+ )
def test_endswith_autoescape(self):
col = self.tables.some_table.c.data
@@ -640,5 +647,3 @@ class LikeFunctionsTest(fixtures.TablesTest):
col = self.tables.some_table.c.data
self._test(col.contains("b%cd", autoescape=True, escape="#"), {3})
self._test(col.contains("b#cd", autoescape=True, escape="#"), {7})
-
-
diff --git a/lib/sqlalchemy/testing/suite/test_sequence.py b/lib/sqlalchemy/testing/suite/test_sequence.py
index f1c00de6b..15a850fe9 100644
--- a/lib/sqlalchemy/testing/suite/test_sequence.py
+++ b/lib/sqlalchemy/testing/suite/test_sequence.py
@@ -9,140 +9,144 @@ from ..schema import Table, Column
class SequenceTest(fixtures.TablesTest):
- __requires__ = ('sequences',)
+ __requires__ = ("sequences",)
__backend__ = True
- run_create_tables = 'each'
+ run_create_tables = "each"
@classmethod
def define_tables(cls, metadata):
- Table('seq_pk', metadata,
- Column('id', Integer, Sequence('tab_id_seq'), primary_key=True),
- Column('data', String(50))
- )
+ Table(
+ "seq_pk",
+ metadata,
+ Column("id", Integer, Sequence("tab_id_seq"), primary_key=True),
+ Column("data", String(50)),
+ )
- Table('seq_opt_pk', metadata,
- Column('id', Integer, Sequence('tab_id_seq', optional=True),
- primary_key=True),
- Column('data', String(50))
- )
+ Table(
+ "seq_opt_pk",
+ metadata,
+ Column(
+ "id",
+ Integer,
+ Sequence("tab_id_seq", optional=True),
+ primary_key=True,
+ ),
+ Column("data", String(50)),
+ )
def test_insert_roundtrip(self):
- config.db.execute(
- self.tables.seq_pk.insert(),
- data="some data"
- )
+ config.db.execute(self.tables.seq_pk.insert(), data="some data")
self._assert_round_trip(self.tables.seq_pk, config.db)
def test_insert_lastrowid(self):
- r = config.db.execute(
- self.tables.seq_pk.insert(),
- data="some data"
- )
- eq_(
- r.inserted_primary_key,
- [1]
- )
+ r = config.db.execute(self.tables.seq_pk.insert(), data="some data")
+ eq_(r.inserted_primary_key, [1])
def test_nextval_direct(self):
- r = config.db.execute(
- self.tables.seq_pk.c.id.default
- )
- eq_(
- r, 1
- )
+ r = config.db.execute(self.tables.seq_pk.c.id.default)
+ eq_(r, 1)
@requirements.sequences_optional
def test_optional_seq(self):
r = config.db.execute(
- self.tables.seq_opt_pk.insert(),
- data="some data"
- )
- eq_(
- r.inserted_primary_key,
- [1]
+ self.tables.seq_opt_pk.insert(), data="some data"
)
+ eq_(r.inserted_primary_key, [1])
def _assert_round_trip(self, table, conn):
row = conn.execute(table.select()).first()
- eq_(
- row,
- (1, "some data")
- )
+ eq_(row, (1, "some data"))
class SequenceCompilerTest(testing.AssertsCompiledSQL, fixtures.TestBase):
- __requires__ = ('sequences',)
+ __requires__ = ("sequences",)
__backend__ = True
def test_literal_binds_inline_compile(self):
table = Table(
- 'x', MetaData(),
- Column('y', Integer, Sequence('y_seq')),
- Column('q', Integer))
+ "x",
+ MetaData(),
+ Column("y", Integer, Sequence("y_seq")),
+ Column("q", Integer),
+ )
stmt = table.insert().values(q=5)
seq_nextval = testing.db.dialect.statement_compiler(
- statement=None, dialect=testing.db.dialect).visit_sequence(
- Sequence("y_seq"))
+ statement=None, dialect=testing.db.dialect
+ ).visit_sequence(Sequence("y_seq"))
self.assert_compile(
stmt,
- "INSERT INTO x (y, q) VALUES (%s, 5)" % (seq_nextval, ),
+ "INSERT INTO x (y, q) VALUES (%s, 5)" % (seq_nextval,),
literal_binds=True,
- dialect=testing.db.dialect)
+ dialect=testing.db.dialect,
+ )
class HasSequenceTest(fixtures.TestBase):
- __requires__ = 'sequences',
+ __requires__ = ("sequences",)
__backend__ = True
def test_has_sequence(self):
- s1 = Sequence('user_id_seq')
+ s1 = Sequence("user_id_seq")
testing.db.execute(schema.CreateSequence(s1))
try:
- eq_(testing.db.dialect.has_sequence(testing.db,
- 'user_id_seq'), True)
+ eq_(
+ testing.db.dialect.has_sequence(testing.db, "user_id_seq"),
+ True,
+ )
finally:
testing.db.execute(schema.DropSequence(s1))
@testing.requires.schemas
def test_has_sequence_schema(self):
- s1 = Sequence('user_id_seq', schema=config.test_schema)
+ s1 = Sequence("user_id_seq", schema=config.test_schema)
testing.db.execute(schema.CreateSequence(s1))
try:
- eq_(testing.db.dialect.has_sequence(
- testing.db, 'user_id_seq', schema=config.test_schema), True)
+ eq_(
+ testing.db.dialect.has_sequence(
+ testing.db, "user_id_seq", schema=config.test_schema
+ ),
+ True,
+ )
finally:
testing.db.execute(schema.DropSequence(s1))
def test_has_sequence_neg(self):
- eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
- False)
+ eq_(testing.db.dialect.has_sequence(testing.db, "user_id_seq"), False)
@testing.requires.schemas
def test_has_sequence_schemas_neg(self):
- eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
- schema=config.test_schema),
- False)
+ eq_(
+ testing.db.dialect.has_sequence(
+ testing.db, "user_id_seq", schema=config.test_schema
+ ),
+ False,
+ )
@testing.requires.schemas
def test_has_sequence_default_not_in_remote(self):
- s1 = Sequence('user_id_seq')
+ s1 = Sequence("user_id_seq")
testing.db.execute(schema.CreateSequence(s1))
try:
- eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
- schema=config.test_schema),
- False)
+ eq_(
+ testing.db.dialect.has_sequence(
+ testing.db, "user_id_seq", schema=config.test_schema
+ ),
+ False,
+ )
finally:
testing.db.execute(schema.DropSequence(s1))
@testing.requires.schemas
def test_has_sequence_remote_not_in_default(self):
- s1 = Sequence('user_id_seq', schema=config.test_schema)
+ s1 = Sequence("user_id_seq", schema=config.test_schema)
testing.db.execute(schema.CreateSequence(s1))
try:
- eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
- False)
+ eq_(
+ testing.db.dialect.has_sequence(testing.db, "user_id_seq"),
+ False,
+ )
finally:
testing.db.execute(schema.DropSequence(s1))
diff --git a/lib/sqlalchemy/testing/suite/test_types.py b/lib/sqlalchemy/testing/suite/test_types.py
index 27c7bb115..6dfb80915 100644
--- a/lib/sqlalchemy/testing/suite/test_types.py
+++ b/lib/sqlalchemy/testing/suite/test_types.py
@@ -4,9 +4,24 @@ from .. import fixtures, config
from ..assertions import eq_
from ..config import requirements
from sqlalchemy import Integer, Unicode, UnicodeText, select, TIMESTAMP
-from sqlalchemy import Date, DateTime, Time, MetaData, String, \
- Text, Numeric, Float, literal, Boolean, cast, null, JSON, and_, \
- type_coerce, BigInteger
+from sqlalchemy import (
+ Date,
+ DateTime,
+ Time,
+ MetaData,
+ String,
+ Text,
+ Numeric,
+ Float,
+ literal,
+ Boolean,
+ cast,
+ null,
+ JSON,
+ and_,
+ type_coerce,
+ BigInteger,
+)
from ..schema import Table, Column
from ... import testing
import decimal
@@ -24,13 +39,17 @@ class _LiteralRoundTripFixture(object):
# into a typed column. we can then SELECT it back as its
# official type; ideally we'd be able to use CAST here
# but MySQL in particular can't CAST fully
- t = Table('t', self.metadata, Column('x', type_))
+ t = Table("t", self.metadata, Column("x", type_))
t.create()
for value in input_:
- ins = t.insert().values(x=literal(value)).compile(
- dialect=testing.db.dialect,
- compile_kwargs=dict(literal_binds=True)
+ ins = (
+ t.insert()
+ .values(x=literal(value))
+ .compile(
+ dialect=testing.db.dialect,
+ compile_kwargs=dict(literal_binds=True),
+ )
)
testing.db.execute(ins)
@@ -42,40 +61,33 @@ class _LiteralRoundTripFixture(object):
class _UnicodeFixture(_LiteralRoundTripFixture):
- __requires__ = 'unicode_data',
+ __requires__ = ("unicode_data",)
- data = u("Alors vous imaginez ma surprise, au lever du jour, "
- "quand une drôle de petite voix m’a réveillé. Elle "
- "disait: « S’il vous plaît… dessine-moi un mouton! »")
+ data = u(
+ "Alors vous imaginez ma surprise, au lever du jour, "
+ "quand une drôle de petite voix m’a réveillé. Elle "
+ "disait: « S’il vous plaît… dessine-moi un mouton! »"
+ )
@classmethod
def define_tables(cls, metadata):
- Table('unicode_table', metadata,
- Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('unicode_data', cls.datatype),
- )
+ Table(
+ "unicode_table",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("unicode_data", cls.datatype),
+ )
def test_round_trip(self):
unicode_table = self.tables.unicode_table
- config.db.execute(
- unicode_table.insert(),
- {
- 'unicode_data': self.data,
- }
- )
+ config.db.execute(unicode_table.insert(), {"unicode_data": self.data})
- row = config.db.execute(
- select([
- unicode_table.c.unicode_data,
- ])
- ).first()
+ row = config.db.execute(select([unicode_table.c.unicode_data])).first()
- eq_(
- row,
- (self.data, )
- )
+ eq_(row, (self.data,))
assert isinstance(row[0], util.text_type)
def test_round_trip_executemany(self):
@@ -83,44 +95,29 @@ class _UnicodeFixture(_LiteralRoundTripFixture):
config.db.execute(
unicode_table.insert(),
- [
- {
- 'unicode_data': self.data,
- }
- for i in range(3)
- ]
+ [{"unicode_data": self.data} for i in range(3)],
)
rows = config.db.execute(
- select([
- unicode_table.c.unicode_data,
- ])
+ select([unicode_table.c.unicode_data])
).fetchall()
- eq_(
- rows,
- [(self.data, ) for i in range(3)]
- )
+ eq_(rows, [(self.data,) for i in range(3)])
for row in rows:
assert isinstance(row[0], util.text_type)
def _test_empty_strings(self):
unicode_table = self.tables.unicode_table
- config.db.execute(
- unicode_table.insert(),
- {"unicode_data": u('')}
- )
- row = config.db.execute(
- select([unicode_table.c.unicode_data])
- ).first()
- eq_(row, (u(''),))
+ config.db.execute(unicode_table.insert(), {"unicode_data": u("")})
+ row = config.db.execute(select([unicode_table.c.unicode_data])).first()
+ eq_(row, (u(""),))
def test_literal(self):
self._literal_round_trip(self.datatype, [self.data], [self.data])
class UnicodeVarcharTest(_UnicodeFixture, fixtures.TablesTest):
- __requires__ = 'unicode_data',
+ __requires__ = ("unicode_data",)
__backend__ = True
datatype = Unicode(255)
@@ -131,7 +128,7 @@ class UnicodeVarcharTest(_UnicodeFixture, fixtures.TablesTest):
class UnicodeTextTest(_UnicodeFixture, fixtures.TablesTest):
- __requires__ = 'unicode_data', 'text_type'
+ __requires__ = "unicode_data", "text_type"
__backend__ = True
datatype = UnicodeText()
@@ -142,54 +139,47 @@ class UnicodeTextTest(_UnicodeFixture, fixtures.TablesTest):
class TextTest(_LiteralRoundTripFixture, fixtures.TablesTest):
- __requires__ = 'text_type',
+ __requires__ = ("text_type",)
__backend__ = True
@classmethod
def define_tables(cls, metadata):
- Table('text_table', metadata,
- Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('text_data', Text),
- )
+ Table(
+ "text_table",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("text_data", Text),
+ )
def test_text_roundtrip(self):
text_table = self.tables.text_table
- config.db.execute(
- text_table.insert(),
- {"text_data": 'some text'}
- )
- row = config.db.execute(
- select([text_table.c.text_data])
- ).first()
- eq_(row, ('some text',))
+ config.db.execute(text_table.insert(), {"text_data": "some text"})
+ row = config.db.execute(select([text_table.c.text_data])).first()
+ eq_(row, ("some text",))
def test_text_empty_strings(self):
text_table = self.tables.text_table
- config.db.execute(
- text_table.insert(),
- {"text_data": ''}
- )
- row = config.db.execute(
- select([text_table.c.text_data])
- ).first()
- eq_(row, ('',))
+ config.db.execute(text_table.insert(), {"text_data": ""})
+ row = config.db.execute(select([text_table.c.text_data])).first()
+ eq_(row, ("",))
def test_literal(self):
self._literal_round_trip(Text, ["some text"], ["some text"])
def test_literal_quoting(self):
- data = '''some 'text' hey "hi there" that's text'''
+ data = """some 'text' hey "hi there" that's text"""
self._literal_round_trip(Text, [data], [data])
def test_literal_backslashes(self):
- data = r'backslash one \ backslash two \\ end'
+ data = r"backslash one \ backslash two \\ end"
self._literal_round_trip(Text, [data], [data])
def test_literal_percentsigns(self):
- data = r'percent % signs %% percent'
+ data = r"percent % signs %% percent"
self._literal_round_trip(Text, [data], [data])
@@ -199,9 +189,7 @@ class StringTest(_LiteralRoundTripFixture, fixtures.TestBase):
@requirements.unbounded_varchar
def test_nolength_string(self):
metadata = MetaData()
- foo = Table('foo', metadata,
- Column('one', String)
- )
+ foo = Table("foo", metadata, Column("one", String))
foo.create(config.db)
foo.drop(config.db)
@@ -210,11 +198,11 @@ class StringTest(_LiteralRoundTripFixture, fixtures.TestBase):
self._literal_round_trip(String(40), ["some text"], ["some text"])
def test_literal_quoting(self):
- data = '''some 'text' hey "hi there" that's text'''
+ data = """some 'text' hey "hi there" that's text"""
self._literal_round_trip(String(40), [data], [data])
def test_literal_backslashes(self):
- data = r'backslash one \ backslash two \\ end'
+ data = r"backslash one \ backslash two \\ end"
self._literal_round_trip(String(40), [data], [data])
@@ -223,44 +211,32 @@ class _DateFixture(_LiteralRoundTripFixture):
@classmethod
def define_tables(cls, metadata):
- Table('date_table', metadata,
- Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('date_data', cls.datatype),
- )
+ Table(
+ "date_table",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("date_data", cls.datatype),
+ )
def test_round_trip(self):
date_table = self.tables.date_table
- config.db.execute(
- date_table.insert(),
- {'date_data': self.data}
- )
+ config.db.execute(date_table.insert(), {"date_data": self.data})
- row = config.db.execute(
- select([
- date_table.c.date_data,
- ])
- ).first()
+ row = config.db.execute(select([date_table.c.date_data])).first()
compare = self.compare or self.data
- eq_(row,
- (compare, ))
+ eq_(row, (compare,))
assert isinstance(row[0], type(compare))
def test_null(self):
date_table = self.tables.date_table
- config.db.execute(
- date_table.insert(),
- {'date_data': None}
- )
+ config.db.execute(date_table.insert(), {"date_data": None})
- row = config.db.execute(
- select([
- date_table.c.date_data,
- ])
- ).first()
+ row = config.db.execute(select([date_table.c.date_data])).first()
eq_(row, (None,))
@testing.requires.datetime_literals
@@ -270,48 +246,49 @@ class _DateFixture(_LiteralRoundTripFixture):
class DateTimeTest(_DateFixture, fixtures.TablesTest):
- __requires__ = 'datetime',
+ __requires__ = ("datetime",)
__backend__ = True
datatype = DateTime
data = datetime.datetime(2012, 10, 15, 12, 57, 18)
class DateTimeMicrosecondsTest(_DateFixture, fixtures.TablesTest):
- __requires__ = 'datetime_microseconds',
+ __requires__ = ("datetime_microseconds",)
__backend__ = True
datatype = DateTime
data = datetime.datetime(2012, 10, 15, 12, 57, 18, 396)
+
class TimestampMicrosecondsTest(_DateFixture, fixtures.TablesTest):
- __requires__ = 'timestamp_microseconds',
+ __requires__ = ("timestamp_microseconds",)
__backend__ = True
datatype = TIMESTAMP
data = datetime.datetime(2012, 10, 15, 12, 57, 18, 396)
class TimeTest(_DateFixture, fixtures.TablesTest):
- __requires__ = 'time',
+ __requires__ = ("time",)
__backend__ = True
datatype = Time
data = datetime.time(12, 57, 18)
class TimeMicrosecondsTest(_DateFixture, fixtures.TablesTest):
- __requires__ = 'time_microseconds',
+ __requires__ = ("time_microseconds",)
__backend__ = True
datatype = Time
data = datetime.time(12, 57, 18, 396)
class DateTest(_DateFixture, fixtures.TablesTest):
- __requires__ = 'date',
+ __requires__ = ("date",)
__backend__ = True
datatype = Date
data = datetime.date(2012, 10, 15)
class DateTimeCoercedToDateTimeTest(_DateFixture, fixtures.TablesTest):
- __requires__ = 'date', 'date_coerces_from_datetime'
+ __requires__ = "date", "date_coerces_from_datetime"
__backend__ = True
datatype = Date
data = datetime.datetime(2012, 10, 15, 12, 57, 18)
@@ -319,14 +296,14 @@ class DateTimeCoercedToDateTimeTest(_DateFixture, fixtures.TablesTest):
class DateTimeHistoricTest(_DateFixture, fixtures.TablesTest):
- __requires__ = 'datetime_historic',
+ __requires__ = ("datetime_historic",)
__backend__ = True
datatype = DateTime
data = datetime.datetime(1850, 11, 10, 11, 52, 35)
class DateHistoricTest(_DateFixture, fixtures.TablesTest):
- __requires__ = 'date_historic',
+ __requires__ = ("date_historic",)
__backend__ = True
datatype = Date
data = datetime.date(1727, 4, 1)
@@ -345,26 +322,21 @@ class IntegerTest(_LiteralRoundTripFixture, fixtures.TestBase):
def _round_trip(self, datatype, data):
metadata = self.metadata
int_table = Table(
- 'integer_table', metadata,
- Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('integer_data', datatype),
+ "integer_table",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("integer_data", datatype),
)
metadata.create_all(config.db)
- config.db.execute(
- int_table.insert(),
- {'integer_data': data}
- )
+ config.db.execute(int_table.insert(), {"integer_data": data})
- row = config.db.execute(
- select([
- int_table.c.integer_data,
- ])
- ).first()
+ row = config.db.execute(select([int_table.c.integer_data])).first()
- eq_(row, (data, ))
+ eq_(row, (data,))
if util.py3k:
assert isinstance(row[0], int)
@@ -377,12 +349,11 @@ class NumericTest(_LiteralRoundTripFixture, fixtures.TestBase):
@testing.emits_warning(r".*does \*not\* support Decimal objects natively")
@testing.provide_metadata
- def _do_test(self, type_, input_, output,
- filter_=None, check_scale=False):
+ def _do_test(self, type_, input_, output, filter_=None, check_scale=False):
metadata = self.metadata
- t = Table('t', metadata, Column('x', type_))
+ t = Table("t", metadata, Column("x", type_))
t.create()
- t.insert().execute([{'x': x} for x in input_])
+ t.insert().execute([{"x": x} for x in input_])
result = {row[0] for row in t.select().execute()}
output = set(output)
@@ -391,10 +362,7 @@ class NumericTest(_LiteralRoundTripFixture, fixtures.TestBase):
output = set(filter_(x) for x in output)
eq_(result, output)
if check_scale:
- eq_(
- [str(x) for x in result],
- [str(x) for x in output],
- )
+ eq_([str(x) for x in result], [str(x) for x in output])
@testing.emits_warning(r".*does \*not\* support Decimal objects natively")
def test_render_literal_numeric(self):
@@ -416,8 +384,8 @@ class NumericTest(_LiteralRoundTripFixture, fixtures.TestBase):
self._literal_round_trip(
Float(4),
[15.7563, decimal.Decimal("15.7563")],
- [15.7563, ],
- filter_=lambda n: n is not None and round(n, 5) or None
+ [15.7563],
+ filter_=lambda n: n is not None and round(n, 5) or None,
)
@testing.requires.precision_generic_float_type
@@ -425,8 +393,8 @@ class NumericTest(_LiteralRoundTripFixture, fixtures.TestBase):
self._do_test(
Float(None, decimal_return_scale=7, asdecimal=True),
[15.7563827, decimal.Decimal("15.7563827")],
- [decimal.Decimal("15.7563827"), ],
- check_scale=True
+ [decimal.Decimal("15.7563827")],
+ check_scale=True,
)
def test_numeric_as_decimal(self):
@@ -445,18 +413,12 @@ class NumericTest(_LiteralRoundTripFixture, fixtures.TestBase):
@testing.requires.fetch_null_from_numeric
def test_numeric_null_as_decimal(self):
- self._do_test(
- Numeric(precision=8, scale=4),
- [None],
- [None],
- )
+ self._do_test(Numeric(precision=8, scale=4), [None], [None])
@testing.requires.fetch_null_from_numeric
def test_numeric_null_as_float(self):
self._do_test(
- Numeric(precision=8, scale=4, asdecimal=False),
- [None],
- [None],
+ Numeric(precision=8, scale=4, asdecimal=False), [None], [None]
)
@testing.requires.floats_to_four_decimals
@@ -472,15 +434,13 @@ class NumericTest(_LiteralRoundTripFixture, fixtures.TestBase):
Float(precision=8),
[15.7563, decimal.Decimal("15.7563")],
[15.7563],
- filter_=lambda n: n is not None and round(n, 5) or None
+ filter_=lambda n: n is not None and round(n, 5) or None,
)
def test_float_coerce_round_trip(self):
expr = 15.7563
- val = testing.db.scalar(
- select([literal(expr)])
- )
+ val = testing.db.scalar(select([literal(expr)]))
eq_(val, expr)
# this does not work in MySQL, see #4036, however we choose not
@@ -491,34 +451,28 @@ class NumericTest(_LiteralRoundTripFixture, fixtures.TestBase):
def test_decimal_coerce_round_trip(self):
expr = decimal.Decimal("15.7563")
- val = testing.db.scalar(
- select([literal(expr)])
- )
+ val = testing.db.scalar(select([literal(expr)]))
eq_(val, expr)
@testing.emits_warning(r".*does \*not\* support Decimal objects natively")
def test_decimal_coerce_round_trip_w_cast(self):
expr = decimal.Decimal("15.7563")
- val = testing.db.scalar(
- select([cast(expr, Numeric(10, 4))])
- )
+ val = testing.db.scalar(select([cast(expr, Numeric(10, 4))]))
eq_(val, expr)
@testing.requires.precision_numerics_general
def test_precision_decimal(self):
- numbers = set([
- decimal.Decimal("54.234246451650"),
- decimal.Decimal("0.004354"),
- decimal.Decimal("900.0"),
- ])
-
- self._do_test(
- Numeric(precision=18, scale=12),
- numbers,
- numbers,
+ numbers = set(
+ [
+ decimal.Decimal("54.234246451650"),
+ decimal.Decimal("0.004354"),
+ decimal.Decimal("900.0"),
+ ]
)
+ self._do_test(Numeric(precision=18, scale=12), numbers, numbers)
+
@testing.requires.precision_numerics_enotation_large
def test_enotation_decimal(self):
"""test exceedingly small decimals.
@@ -528,25 +482,23 @@ class NumericTest(_LiteralRoundTripFixture, fixtures.TestBase):
"""
- numbers = set([
- decimal.Decimal('1E-2'),
- decimal.Decimal('1E-3'),
- decimal.Decimal('1E-4'),
- decimal.Decimal('1E-5'),
- decimal.Decimal('1E-6'),
- decimal.Decimal('1E-7'),
- decimal.Decimal('1E-8'),
- decimal.Decimal("0.01000005940696"),
- decimal.Decimal("0.00000005940696"),
- decimal.Decimal("0.00000000000696"),
- decimal.Decimal("0.70000000000696"),
- decimal.Decimal("696E-12"),
- ])
- self._do_test(
- Numeric(precision=18, scale=14),
- numbers,
- numbers
+ numbers = set(
+ [
+ decimal.Decimal("1E-2"),
+ decimal.Decimal("1E-3"),
+ decimal.Decimal("1E-4"),
+ decimal.Decimal("1E-5"),
+ decimal.Decimal("1E-6"),
+ decimal.Decimal("1E-7"),
+ decimal.Decimal("1E-8"),
+ decimal.Decimal("0.01000005940696"),
+ decimal.Decimal("0.00000005940696"),
+ decimal.Decimal("0.00000000000696"),
+ decimal.Decimal("0.70000000000696"),
+ decimal.Decimal("696E-12"),
+ ]
)
+ self._do_test(Numeric(precision=18, scale=14), numbers, numbers)
@testing.requires.precision_numerics_enotation_large
def test_enotation_decimal_large(self):
@@ -554,41 +506,32 @@ class NumericTest(_LiteralRoundTripFixture, fixtures.TestBase):
"""
- numbers = set([
- decimal.Decimal('4E+8'),
- decimal.Decimal("5748E+15"),
- decimal.Decimal('1.521E+15'),
- decimal.Decimal('00000000000000.1E+12'),
- ])
- self._do_test(
- Numeric(precision=25, scale=2),
- numbers,
- numbers
+ numbers = set(
+ [
+ decimal.Decimal("4E+8"),
+ decimal.Decimal("5748E+15"),
+ decimal.Decimal("1.521E+15"),
+ decimal.Decimal("00000000000000.1E+12"),
+ ]
)
+ self._do_test(Numeric(precision=25, scale=2), numbers, numbers)
@testing.requires.precision_numerics_many_significant_digits
def test_many_significant_digits(self):
- numbers = set([
- decimal.Decimal("31943874831932418390.01"),
- decimal.Decimal("319438950232418390.273596"),
- decimal.Decimal("87673.594069654243"),
- ])
- self._do_test(
- Numeric(precision=38, scale=12),
- numbers,
- numbers
+ numbers = set(
+ [
+ decimal.Decimal("31943874831932418390.01"),
+ decimal.Decimal("319438950232418390.273596"),
+ decimal.Decimal("87673.594069654243"),
+ ]
)
+ self._do_test(Numeric(precision=38, scale=12), numbers, numbers)
@testing.requires.precision_numerics_retains_significant_digits
def test_numeric_no_decimal(self):
- numbers = set([
- decimal.Decimal("1.000")
- ])
+ numbers = set([decimal.Decimal("1.000")])
self._do_test(
- Numeric(precision=5, scale=3),
- numbers,
- numbers,
- check_scale=True
+ Numeric(precision=5, scale=3), numbers, numbers, check_scale=True
)
@@ -597,42 +540,32 @@ class BooleanTest(_LiteralRoundTripFixture, fixtures.TablesTest):
@classmethod
def define_tables(cls, metadata):
- Table('boolean_table', metadata,
- Column('id', Integer, primary_key=True, autoincrement=False),
- Column('value', Boolean),
- Column('unconstrained_value', Boolean(create_constraint=False)),
- )
+ Table(
+ "boolean_table",
+ metadata,
+ Column("id", Integer, primary_key=True, autoincrement=False),
+ Column("value", Boolean),
+ Column("unconstrained_value", Boolean(create_constraint=False)),
+ )
def test_render_literal_bool(self):
- self._literal_round_trip(
- Boolean(),
- [True, False],
- [True, False]
- )
+ self._literal_round_trip(Boolean(), [True, False], [True, False])
def test_round_trip(self):
boolean_table = self.tables.boolean_table
config.db.execute(
boolean_table.insert(),
- {
- 'id': 1,
- 'value': True,
- 'unconstrained_value': False
- }
+ {"id": 1, "value": True, "unconstrained_value": False},
)
row = config.db.execute(
- select([
- boolean_table.c.value,
- boolean_table.c.unconstrained_value
- ])
+ select(
+ [boolean_table.c.value, boolean_table.c.unconstrained_value]
+ )
).first()
- eq_(
- row,
- (True, False)
- )
+ eq_(row, (True, False))
assert isinstance(row[0], bool)
def test_null(self):
@@ -640,24 +573,16 @@ class BooleanTest(_LiteralRoundTripFixture, fixtures.TablesTest):
config.db.execute(
boolean_table.insert(),
- {
- 'id': 1,
- 'value': None,
- 'unconstrained_value': None
- }
+ {"id": 1, "value": None, "unconstrained_value": None},
)
row = config.db.execute(
- select([
- boolean_table.c.value,
- boolean_table.c.unconstrained_value
- ])
+ select(
+ [boolean_table.c.value, boolean_table.c.unconstrained_value]
+ )
).first()
- eq_(
- row,
- (None, None)
- )
+ eq_(row, (None, None))
def test_whereclause(self):
# testing "WHERE <column>" renders a compatible expression
@@ -667,92 +592,82 @@ class BooleanTest(_LiteralRoundTripFixture, fixtures.TablesTest):
conn.execute(
boolean_table.insert(),
[
- {'id': 1, 'value': True, 'unconstrained_value': True},
- {'id': 2, 'value': False, 'unconstrained_value': False}
- ]
+ {"id": 1, "value": True, "unconstrained_value": True},
+ {"id": 2, "value": False, "unconstrained_value": False},
+ ],
)
eq_(
conn.scalar(
select([boolean_table.c.id]).where(boolean_table.c.value)
),
- 1
+ 1,
)
eq_(
conn.scalar(
select([boolean_table.c.id]).where(
- boolean_table.c.unconstrained_value)
+ boolean_table.c.unconstrained_value
+ )
),
- 1
+ 1,
)
eq_(
conn.scalar(
select([boolean_table.c.id]).where(~boolean_table.c.value)
),
- 2
+ 2,
)
eq_(
conn.scalar(
select([boolean_table.c.id]).where(
- ~boolean_table.c.unconstrained_value)
+ ~boolean_table.c.unconstrained_value
+ )
),
- 2
+ 2,
)
-
-
class JSONTest(_LiteralRoundTripFixture, fixtures.TablesTest):
- __requires__ = 'json_type',
+ __requires__ = ("json_type",)
__backend__ = True
datatype = JSON
- data1 = {
- "key1": "value1",
- "key2": "value2"
- }
+ data1 = {"key1": "value1", "key2": "value2"}
data2 = {
"Key 'One'": "value1",
"key two": "value2",
- "key three": "value ' three '"
+ "key three": "value ' three '",
}
data3 = {
"key1": [1, 2, 3],
"key2": ["one", "two", "three"],
- "key3": [{"four": "five"}, {"six": "seven"}]
+ "key3": [{"four": "five"}, {"six": "seven"}],
}
data4 = ["one", "two", "three"]
data5 = {
"nested": {
- "elem1": [
- {"a": "b", "c": "d"},
- {"e": "f", "g": "h"}
- ],
- "elem2": {
- "elem3": {"elem4": "elem5"}
- }
+ "elem1": [{"a": "b", "c": "d"}, {"e": "f", "g": "h"}],
+ "elem2": {"elem3": {"elem4": "elem5"}},
}
}
- data6 = {
- "a": 5,
- "b": "some value",
- "c": {"foo": "bar"}
- }
+ data6 = {"a": 5, "b": "some value", "c": {"foo": "bar"}}
@classmethod
def define_tables(cls, metadata):
- Table('data_table', metadata,
- Column('id', Integer, primary_key=True),
- Column('name', String(30), nullable=False),
- Column('data', cls.datatype),
- Column('nulldata', cls.datatype(none_as_null=True))
- )
+ Table(
+ "data_table",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("name", String(30), nullable=False),
+ Column("data", cls.datatype),
+ Column("nulldata", cls.datatype(none_as_null=True)),
+ )
def test_round_trip_data1(self):
self._test_round_trip(self.data1)
@@ -761,99 +676,82 @@ class JSONTest(_LiteralRoundTripFixture, fixtures.TablesTest):
data_table = self.tables.data_table
config.db.execute(
- data_table.insert(),
- {'name': 'row1', 'data': data_element}
+ data_table.insert(), {"name": "row1", "data": data_element}
)
- row = config.db.execute(
- select([
- data_table.c.data,
- ])
- ).first()
+ row = config.db.execute(select([data_table.c.data])).first()
- eq_(row, (data_element, ))
+ eq_(row, (data_element,))
def test_round_trip_none_as_sql_null(self):
- col = self.tables.data_table.c['nulldata']
+ col = self.tables.data_table.c["nulldata"]
with config.db.connect() as conn:
conn.execute(
- self.tables.data_table.insert(),
- {"name": "r1", "data": None}
+ self.tables.data_table.insert(), {"name": "r1", "data": None}
)
eq_(
conn.scalar(
- select([self.tables.data_table.c.name]).
- where(col.is_(null()))
+ select([self.tables.data_table.c.name]).where(
+ col.is_(null())
+ )
),
- "r1"
+ "r1",
)
- eq_(
- conn.scalar(
- select([col])
- ),
- None
- )
+ eq_(conn.scalar(select([col])), None)
def test_round_trip_json_null_as_json_null(self):
- col = self.tables.data_table.c['data']
+ col = self.tables.data_table.c["data"]
with config.db.connect() as conn:
conn.execute(
self.tables.data_table.insert(),
- {"name": "r1", "data": JSON.NULL}
+ {"name": "r1", "data": JSON.NULL},
)
eq_(
conn.scalar(
- select([self.tables.data_table.c.name]).
- where(cast(col, String) == 'null')
+ select([self.tables.data_table.c.name]).where(
+ cast(col, String) == "null"
+ )
),
- "r1"
+ "r1",
)
- eq_(
- conn.scalar(
- select([col])
- ),
- None
- )
+ eq_(conn.scalar(select([col])), None)
def test_round_trip_none_as_json_null(self):
- col = self.tables.data_table.c['data']
+ col = self.tables.data_table.c["data"]
with config.db.connect() as conn:
conn.execute(
- self.tables.data_table.insert(),
- {"name": "r1", "data": None}
+ self.tables.data_table.insert(), {"name": "r1", "data": None}
)
eq_(
conn.scalar(
- select([self.tables.data_table.c.name]).
- where(cast(col, String) == 'null')
+ select([self.tables.data_table.c.name]).where(
+ cast(col, String) == "null"
+ )
),
- "r1"
+ "r1",
)
- eq_(
- conn.scalar(
- select([col])
- ),
- None
- )
+ eq_(conn.scalar(select([col])), None)
def _criteria_fixture(self):
config.db.execute(
self.tables.data_table.insert(),
- [{"name": "r1", "data": self.data1},
- {"name": "r2", "data": self.data2},
- {"name": "r3", "data": self.data3},
- {"name": "r4", "data": self.data4},
- {"name": "r5", "data": self.data5},
- {"name": "r6", "data": self.data6}]
+ [
+ {"name": "r1", "data": self.data1},
+ {"name": "r2", "data": self.data2},
+ {"name": "r3", "data": self.data3},
+ {"name": "r4", "data": self.data4},
+ {"name": "r5", "data": self.data5},
+ {"name": "r6", "data": self.data6},
+ ],
)
def _test_index_criteria(self, crit, expected, test_literal=True):
@@ -861,20 +759,20 @@ class JSONTest(_LiteralRoundTripFixture, fixtures.TablesTest):
with config.db.connect() as conn:
stmt = select([self.tables.data_table.c.name]).where(crit)
- eq_(
- conn.scalar(stmt),
- expected
- )
+ eq_(conn.scalar(stmt), expected)
if test_literal:
- literal_sql = str(stmt.compile(
- config.db, compile_kwargs={"literal_binds": True}))
+ literal_sql = str(
+ stmt.compile(
+ config.db, compile_kwargs={"literal_binds": True}
+ )
+ )
eq_(conn.scalar(literal_sql), expected)
def test_crit_spaces_in_key(self):
name = self.tables.data_table.c.name
- col = self.tables.data_table.c['data']
+ col = self.tables.data_table.c["data"]
# limit the rows here to avoid PG error
# "cannot extract field from a non-object", which is
@@ -882,76 +780,74 @@ class JSONTest(_LiteralRoundTripFixture, fixtures.TablesTest):
self._test_index_criteria(
and_(
name.in_(["r1", "r2", "r3"]),
- cast(col["key two"], String) == '"value2"'
+ cast(col["key two"], String) == '"value2"',
),
- "r2"
+ "r2",
)
@config.requirements.json_array_indexes
def test_crit_simple_int(self):
name = self.tables.data_table.c.name
- col = self.tables.data_table.c['data']
+ col = self.tables.data_table.c["data"]
# limit the rows here to avoid PG error
# "cannot extract array element from a non-array", which is
# fixed in 9.4 but may exist in 9.3
self._test_index_criteria(
- and_(name == 'r4', cast(col[1], String) == '"two"'),
- "r4"
+ and_(name == "r4", cast(col[1], String) == '"two"'), "r4"
)
def test_crit_mixed_path(self):
- col = self.tables.data_table.c['data']
+ col = self.tables.data_table.c["data"]
self._test_index_criteria(
- cast(col[("key3", 1, "six")], String) == '"seven"',
- "r3"
+ cast(col[("key3", 1, "six")], String) == '"seven"', "r3"
)
def test_crit_string_path(self):
- col = self.tables.data_table.c['data']
+ col = self.tables.data_table.c["data"]
self._test_index_criteria(
cast(col[("nested", "elem2", "elem3", "elem4")], String)
== '"elem5"',
- "r5"
+ "r5",
)
def test_crit_against_string_basic(self):
name = self.tables.data_table.c.name
- col = self.tables.data_table.c['data']
+ col = self.tables.data_table.c["data"]
self._test_index_criteria(
- and_(name == 'r6', cast(col["b"], String) == '"some value"'),
- "r6"
+ and_(name == "r6", cast(col["b"], String) == '"some value"'), "r6"
)
def test_crit_against_string_coerce_type(self):
name = self.tables.data_table.c.name
- col = self.tables.data_table.c['data']
+ col = self.tables.data_table.c["data"]
self._test_index_criteria(
- and_(name == 'r6',
- cast(col["b"], String) == type_coerce("some value", JSON)),
+ and_(
+ name == "r6",
+ cast(col["b"], String) == type_coerce("some value", JSON),
+ ),
"r6",
- test_literal=False
+ test_literal=False,
)
def test_crit_against_int_basic(self):
name = self.tables.data_table.c.name
- col = self.tables.data_table.c['data']
+ col = self.tables.data_table.c["data"]
self._test_index_criteria(
- and_(name == 'r6', cast(col["a"], String) == '5'),
- "r6"
+ and_(name == "r6", cast(col["a"], String) == "5"), "r6"
)
def test_crit_against_int_coerce_type(self):
name = self.tables.data_table.c.name
- col = self.tables.data_table.c['data']
+ col = self.tables.data_table.c["data"]
self._test_index_criteria(
- and_(name == 'r6', cast(col["a"], String) == type_coerce(5, JSON)),
+ and_(name == "r6", cast(col["a"], String) == type_coerce(5, JSON)),
"r6",
- test_literal=False
+ test_literal=False,
)
def test_unicode_round_trip(self):
@@ -961,17 +857,17 @@ class JSONTest(_LiteralRoundTripFixture, fixtures.TablesTest):
{
"name": "r1",
"data": {
- util.u('réveillé'): util.u('réveillé'),
- "data": {"k1": util.u('drôle')}
- }
- }
+ util.u("réveillé"): util.u("réveillé"),
+ "data": {"k1": util.u("drôle")},
+ },
+ },
)
eq_(
conn.scalar(select([self.tables.data_table.c.data])),
{
- util.u('réveillé'): util.u('réveillé'),
- "data": {"k1": util.u('drôle')}
+ util.u("réveillé"): util.u("réveillé"),
+ "data": {"k1": util.u("drôle")},
},
)
@@ -986,7 +882,7 @@ class JSONTest(_LiteralRoundTripFixture, fixtures.TablesTest):
s = Session(testing.db)
- d1 = Data(name='d1', data=None, nulldata=None)
+ d1 = Data(name="d1", data=None, nulldata=None)
s.add(d1)
s.commit()
@@ -995,24 +891,46 @@ class JSONTest(_LiteralRoundTripFixture, fixtures.TablesTest):
)
eq_(
s.query(
- cast(self.tables.data_table.c.data, String(convert_unicode="force")),
- cast(self.tables.data_table.c.nulldata, String)
- ).filter(self.tables.data_table.c.name == 'd1').first(),
- ("null", None)
+ cast(
+ self.tables.data_table.c.data,
+ String(convert_unicode="force"),
+ ),
+ cast(self.tables.data_table.c.nulldata, String),
+ )
+ .filter(self.tables.data_table.c.name == "d1")
+ .first(),
+ ("null", None),
)
eq_(
s.query(
- cast(self.tables.data_table.c.data, String(convert_unicode="force")),
- cast(self.tables.data_table.c.nulldata, String)
- ).filter(self.tables.data_table.c.name == 'd2').first(),
- ("null", None)
- )
-
-
-__all__ = ('UnicodeVarcharTest', 'UnicodeTextTest', 'JSONTest',
- 'DateTest', 'DateTimeTest', 'TextTest',
- 'NumericTest', 'IntegerTest',
- 'DateTimeHistoricTest', 'DateTimeCoercedToDateTimeTest',
- 'TimeMicrosecondsTest', 'TimestampMicrosecondsTest', 'TimeTest',
- 'DateTimeMicrosecondsTest',
- 'DateHistoricTest', 'StringTest', 'BooleanTest')
+ cast(
+ self.tables.data_table.c.data,
+ String(convert_unicode="force"),
+ ),
+ cast(self.tables.data_table.c.nulldata, String),
+ )
+ .filter(self.tables.data_table.c.name == "d2")
+ .first(),
+ ("null", None),
+ )
+
+
+__all__ = (
+ "UnicodeVarcharTest",
+ "UnicodeTextTest",
+ "JSONTest",
+ "DateTest",
+ "DateTimeTest",
+ "TextTest",
+ "NumericTest",
+ "IntegerTest",
+ "DateTimeHistoricTest",
+ "DateTimeCoercedToDateTimeTest",
+ "TimeMicrosecondsTest",
+ "TimestampMicrosecondsTest",
+ "TimeTest",
+ "DateTimeMicrosecondsTest",
+ "DateHistoricTest",
+ "StringTest",
+ "BooleanTest",
+)
diff --git a/lib/sqlalchemy/testing/suite/test_update_delete.py b/lib/sqlalchemy/testing/suite/test_update_delete.py
index e4c61e74a..b232c3a78 100644
--- a/lib/sqlalchemy/testing/suite/test_update_delete.py
+++ b/lib/sqlalchemy/testing/suite/test_update_delete.py
@@ -6,15 +6,17 @@ from ..schema import Table, Column
class SimpleUpdateDeleteTest(fixtures.TablesTest):
- run_deletes = 'each'
+ run_deletes = "each"
__backend__ = True
@classmethod
def define_tables(cls, metadata):
- Table('plain_pk', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', String(50))
- )
+ Table(
+ "plain_pk",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("data", String(50)),
+ )
@classmethod
def insert_data(cls):
@@ -24,40 +26,29 @@ class SimpleUpdateDeleteTest(fixtures.TablesTest):
{"id": 1, "data": "d1"},
{"id": 2, "data": "d2"},
{"id": 3, "data": "d3"},
- ]
+ ],
)
def test_update(self):
t = self.tables.plain_pk
- r = config.db.execute(
- t.update().where(t.c.id == 2),
- data="d2_new"
- )
+ r = config.db.execute(t.update().where(t.c.id == 2), data="d2_new")
assert not r.is_insert
assert not r.returns_rows
eq_(
config.db.execute(t.select().order_by(t.c.id)).fetchall(),
- [
- (1, "d1"),
- (2, "d2_new"),
- (3, "d3")
- ]
+ [(1, "d1"), (2, "d2_new"), (3, "d3")],
)
def test_delete(self):
t = self.tables.plain_pk
- r = config.db.execute(
- t.delete().where(t.c.id == 2)
- )
+ r = config.db.execute(t.delete().where(t.c.id == 2))
assert not r.is_insert
assert not r.returns_rows
eq_(
config.db.execute(t.select().order_by(t.c.id)).fetchall(),
- [
- (1, "d1"),
- (3, "d3")
- ]
+ [(1, "d1"), (3, "d3")],
)
-__all__ = ('SimpleUpdateDeleteTest', )
+
+__all__ = ("SimpleUpdateDeleteTest",)