diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2019-01-06 01:14:26 -0500 |
|---|---|---|
| committer | mike bayer <mike_mp@zzzcomputing.com> | 2019-01-06 17:34:50 +0000 |
| commit | 1e1a38e7801f410f244e4bbb44ec795ae152e04e (patch) | |
| tree | 28e725c5c8188bd0cfd133d1e268dbca9b524978 /test/sql/test_resultset.py | |
| parent | 404e69426b05a82d905cbb3ad33adafccddb00dd (diff) | |
| download | sqlalchemy-1e1a38e7801f410f244e4bbb44ec795ae152e04e.tar.gz | |
Run black -l 79 against all source files
This is a straight reformat run using black as is, with no edits
applied at all.
The black run will format code consistently, however in
some cases that are prevalent in SQLAlchemy code it produces
too-long lines. The too-long lines will be resolved in the
following commit that will resolve all remaining flake8 issues
including shadowed builtins, long lines, import order, unused
imports, duplicate imports, and docstring issues.
Change-Id: I7eda77fed3d8e73df84b3651fd6cfcfe858d4dc9
Diffstat (limited to 'test/sql/test_resultset.py')
| -rw-r--r-- | test/sql/test_resultset.py | 1007 |
1 file changed, 527 insertions, 480 deletions
diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py index 8e6279708..c5e1efef6 100644 --- a/test/sql/test_resultset.py +++ b/test/sql/test_resultset.py @@ -1,12 +1,36 @@ -from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, \ - in_, not_in_, is_, ne_, le_ +from sqlalchemy.testing import ( + eq_, + assert_raises_message, + assert_raises, + in_, + not_in_, + is_, + ne_, + le_, +) from sqlalchemy import testing from sqlalchemy.testing import fixtures, engines from sqlalchemy import util from sqlalchemy import ( - exc, sql, func, select, String, Integer, MetaData, ForeignKey, - VARCHAR, INT, CHAR, text, type_coerce, literal_column, - TypeDecorator, table, column, literal) + exc, + sql, + func, + select, + String, + Integer, + MetaData, + ForeignKey, + VARCHAR, + INT, + CHAR, + text, + type_coerce, + literal_column, + TypeDecorator, + table, + column, + literal, +) from sqlalchemy.engine import result as _result from sqlalchemy.testing.schema import Table, Column import operator @@ -23,37 +47,43 @@ class ResultProxyTest(fixtures.TablesTest): @classmethod def define_tables(cls, metadata): Table( - 'users', metadata, + "users", + metadata, Column( - 'user_id', INT, primary_key=True, - test_needs_autoincrement=True), - Column('user_name', VARCHAR(20)), - test_needs_acid=True + "user_id", INT, primary_key=True, test_needs_autoincrement=True + ), + Column("user_name", VARCHAR(20)), + test_needs_acid=True, ) Table( - 'addresses', metadata, + "addresses", + metadata, Column( - 'address_id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('user_id', Integer, ForeignKey('users.user_id')), - Column('address', String(30)), - test_needs_acid=True + "address_id", + Integer, + primary_key=True, + test_needs_autoincrement=True, + ), + Column("user_id", Integer, ForeignKey("users.user_id")), + Column("address", String(30)), + test_needs_acid=True, ) Table( - 'users2', metadata, - Column('user_id', INT, primary_key=True), - 
Column('user_name', VARCHAR(20)), - test_needs_acid=True + "users2", + metadata, + Column("user_id", INT, primary_key=True), + Column("user_name", VARCHAR(20)), + test_needs_acid=True, ) def test_row_iteration(self): users = self.tables.users users.insert().execute( - {'user_id': 7, 'user_name': 'jack'}, - {'user_id': 8, 'user_name': 'ed'}, - {'user_id': 9, 'user_name': 'fred'}, + {"user_id": 7, "user_name": "jack"}, + {"user_id": 8, "user_name": "ed"}, + {"user_id": 9, "user_name": "fred"}, ) r = users.select().execute() rows = [] @@ -65,15 +95,15 @@ class ResultProxyTest(fixtures.TablesTest): users = self.tables.users users.insert().execute( - {'user_id': 7, 'user_name': 'jack'}, - {'user_id': 8, 'user_name': 'ed'}, - {'user_id': 9, 'user_name': 'fred'}, + {"user_id": 7, "user_name": "jack"}, + {"user_id": 8, "user_name": "ed"}, + {"user_id": 9, "user_name": "fred"}, ) r = users.select().execute() rows = [] while True: - row = next(r, 'foo') - if row == 'foo': + row = next(r, "foo") + if row == "foo": break rows.append(row) eq_(len(rows), 3) @@ -83,27 +113,30 @@ class ResultProxyTest(fixtures.TablesTest): users = self.tables.users users.insert().execute( - {'user_id': 7, 'user_name': 'jack'}, - {'user_id': 8, 'user_name': 'ed'}, - {'user_id': 9, 'user_name': 'fred'}, + {"user_id": 7, "user_name": "jack"}, + {"user_id": 8, "user_name": "ed"}, + {"user_id": 9, "user_name": "fred"}, ) - sel = select([users.c.user_id]).where(users.c.user_name == 'jack'). 
\ - as_scalar() + sel = ( + select([users.c.user_id]) + .where(users.c.user_name == "jack") + .as_scalar() + ) for row in select([sel + 1, sel + 3], bind=users.bind).execute(): - eq_(row['anon_1'], 8) - eq_(row['anon_2'], 10) + eq_(row["anon_1"], 8) + eq_(row["anon_2"], 10) def test_row_comparison(self): users = self.tables.users - users.insert().execute(user_id=7, user_name='jack') + users.insert().execute(user_id=7, user_name="jack") rp = users.select().execute().first() eq_(rp, rp) - is_(not(rp != rp), True) + is_(not (rp != rp), True) - equal = (7, 'jack') + equal = (7, "jack") eq_(rp, equal) eq_(equal, rp) @@ -113,15 +146,20 @@ class ResultProxyTest(fixtures.TablesTest): def endless(): while True: yield 1 + ne_(rp, endless()) ne_(endless(), rp) # test that everything compares the same # as it would against a tuple - for compare in [False, 8, endless(), 'xyz', (7, 'jack')]: + for compare in [False, 8, endless(), "xyz", (7, "jack")]: for op in [ - operator.eq, operator.ne, operator.gt, - operator.lt, operator.ge, operator.le + operator.eq, + operator.ne, + operator.gt, + operator.lt, + operator.ge, + operator.le, ]: try: @@ -142,94 +180,94 @@ class ResultProxyTest(fixtures.TablesTest): @testing.provide_metadata def test_column_label_overlap_fallback(self): - content = Table( - 'content', self.metadata, - Column('type', String(30)), - ) - bar = Table( - 'bar', self.metadata, - Column('content_type', String(30)) - ) + content = Table("content", self.metadata, Column("type", String(30))) + bar = Table("bar", self.metadata, Column("content_type", String(30))) self.metadata.create_all(testing.db) testing.db.execute(content.insert().values(type="t1")) row = testing.db.execute(content.select(use_labels=True)).first() in_(content.c.type, row) not_in_(bar.c.content_type, row) - in_(sql.column('content_type'), row) + in_(sql.column("content_type"), row) row = testing.db.execute( - select([content.c.type.label("content_type")])).first() + 
select([content.c.type.label("content_type")]) + ).first() in_(content.c.type, row) not_in_(bar.c.content_type, row) - in_(sql.column('content_type'), row) + in_(sql.column("content_type"), row) - row = testing.db.execute(select([func.now().label("content_type")])). \ - first() + row = testing.db.execute( + select([func.now().label("content_type")]) + ).first() not_in_(content.c.type, row) not_in_(bar.c.content_type, row) - in_(sql.column('content_type'), row) + in_(sql.column("content_type"), row) def test_pickled_rows(self): users = self.tables.users addresses = self.tables.addresses users.insert().execute( - {'user_id': 7, 'user_name': 'jack'}, - {'user_id': 8, 'user_name': 'ed'}, - {'user_id': 9, 'user_name': 'fred'}, + {"user_id": 7, "user_name": "jack"}, + {"user_id": 8, "user_name": "ed"}, + {"user_id": 9, "user_name": "fred"}, ) for pickle in False, True: for use_labels in False, True: - result = users.select(use_labels=use_labels).order_by( - users.c.user_id).execute().fetchall() + result = ( + users.select(use_labels=use_labels) + .order_by(users.c.user_id) + .execute() + .fetchall() + ) if pickle: result = util.pickle.loads(util.pickle.dumps(result)) - eq_( - result, - [(7, "jack"), (8, "ed"), (9, "fred")] - ) + eq_(result, [(7, "jack"), (8, "ed"), (9, "fred")]) if use_labels: - eq_(result[0]['users_user_id'], 7) + eq_(result[0]["users_user_id"], 7) eq_( list(result[0].keys()), - ["users_user_id", "users_user_name"]) + ["users_user_id", "users_user_name"], + ) else: - eq_(result[0]['user_id'], 7) + eq_(result[0]["user_id"], 7) eq_(list(result[0].keys()), ["user_id", "user_name"]) eq_(result[0][0], 7) eq_(result[0][users.c.user_id], 7) - eq_(result[0][users.c.user_name], 'jack') + eq_(result[0][users.c.user_name], "jack") if not pickle or use_labels: assert_raises( exc.NoSuchColumnError, - lambda: result[0][addresses.c.user_id]) + lambda: result[0][addresses.c.user_id], + ) else: # test with a different table. 
name resolution is # causing 'user_id' to match when use_labels wasn't used. eq_(result[0][addresses.c.user_id], 7) assert_raises( - exc.NoSuchColumnError, lambda: result[0]['fake key']) + exc.NoSuchColumnError, lambda: result[0]["fake key"] + ) assert_raises( exc.NoSuchColumnError, - lambda: result[0][addresses.c.address_id]) + lambda: result[0][addresses.c.address_id], + ) def test_column_error_printing(self): result = testing.db.execute(select([1])) row = result.first() class unprintable(object): - def __str__(self): raise ValueError("nope") @@ -242,25 +280,21 @@ class ResultProxyTest(fixtures.TablesTest): (unprintable(), "unprintable element.*"), ]: assert_raises_message( - exc.NoSuchColumnError, - msg % repl, - result._getter, accessor + exc.NoSuchColumnError, msg % repl, result._getter, accessor ) is_(result._getter(accessor, False), None) assert_raises_message( - exc.NoSuchColumnError, - msg % repl, - lambda: row[accessor] + exc.NoSuchColumnError, msg % repl, lambda: row[accessor] ) def test_fetchmany(self): users = self.tables.users - users.insert().execute(user_id=7, user_name='jack') - users.insert().execute(user_id=8, user_name='ed') - users.insert().execute(user_id=9, user_name='fred') + users.insert().execute(user_id=7, user_name="jack") + users.insert().execute(user_id=8, user_name="ed") + users.insert().execute(user_id=9, user_name="fred") r = users.select().execute() rows = [] for row in r.fetchmany(size=2): @@ -271,82 +305,80 @@ class ResultProxyTest(fixtures.TablesTest): users = self.tables.users addresses = self.tables.addresses - users.insert().execute(user_id=1, user_name='john') - users.insert().execute(user_id=2, user_name='jack') + users.insert().execute(user_id=1, user_name="john") + users.insert().execute(user_id=2, user_name="jack") addresses.insert().execute( - address_id=1, user_id=2, address='foo@bar.com') + address_id=1, user_id=2, address="foo@bar.com" + ) - r = text( - "select * from addresses", bind=testing.db).execute().first() + 
r = text("select * from addresses", bind=testing.db).execute().first() eq_(r[0:1], (1,)) - eq_(r[1:], (2, 'foo@bar.com')) + eq_(r[1:], (2, "foo@bar.com")) eq_(r[:-1], (1, 2)) def test_column_accessor_basic_compiled(self): users = self.tables.users users.insert().execute( - dict(user_id=1, user_name='john'), - dict(user_id=2, user_name='jack') + dict(user_id=1, user_name="john"), + dict(user_id=2, user_name="jack"), ) r = users.select(users.c.user_id == 2).execute().first() eq_(r.user_id, 2) - eq_(r['user_id'], 2) + eq_(r["user_id"], 2) eq_(r[users.c.user_id], 2) - eq_(r.user_name, 'jack') - eq_(r['user_name'], 'jack') - eq_(r[users.c.user_name], 'jack') + eq_(r.user_name, "jack") + eq_(r["user_name"], "jack") + eq_(r[users.c.user_name], "jack") def test_column_accessor_basic_text(self): users = self.tables.users users.insert().execute( - dict(user_id=1, user_name='john'), - dict(user_id=2, user_name='jack') + dict(user_id=1, user_name="john"), + dict(user_id=2, user_name="jack"), ) r = testing.db.execute( - text("select * from users where user_id=2")).first() + text("select * from users where user_id=2") + ).first() eq_(r.user_id, 2) - eq_(r['user_id'], 2) + eq_(r["user_id"], 2) eq_(r[users.c.user_id], 2) - eq_(r.user_name, 'jack') - eq_(r['user_name'], 'jack') - eq_(r[users.c.user_name], 'jack') + eq_(r.user_name, "jack") + eq_(r["user_name"], "jack") + eq_(r[users.c.user_name], "jack") def test_column_accessor_textual_select(self): users = self.tables.users users.insert().execute( - dict(user_id=1, user_name='john'), - dict(user_id=2, user_name='jack') + dict(user_id=1, user_name="john"), + dict(user_id=2, user_name="jack"), ) # this will create column() objects inside # the select(), these need to match on name anyway r = testing.db.execute( - select([ - column('user_id'), column('user_name') - ]).select_from(table('users')). 
- where(text('user_id=2')) + select([column("user_id"), column("user_name")]) + .select_from(table("users")) + .where(text("user_id=2")) ).first() eq_(r.user_id, 2) - eq_(r['user_id'], 2) + eq_(r["user_id"], 2) eq_(r[users.c.user_id], 2) - eq_(r.user_name, 'jack') - eq_(r['user_name'], 'jack') - eq_(r[users.c.user_name], 'jack') + eq_(r.user_name, "jack") + eq_(r["user_name"], "jack") + eq_(r[users.c.user_name], "jack") def test_column_accessor_dotted_union(self): users = self.tables.users - users.insert().execute( - dict(user_id=1, user_name='john'), - ) + users.insert().execute(dict(user_id=1, user_name="john")) # test a little sqlite < 3.10.0 weirdness - with the UNION, # cols come back as "users.user_id" in cursor.description @@ -358,106 +390,120 @@ class ResultProxyTest(fixtures.TablesTest): "users.user_name from users" ) ).first() - eq_(r['user_id'], 1) - eq_(r['user_name'], "john") + eq_(r["user_id"], 1) + eq_(r["user_name"], "john") eq_(list(r.keys()), ["user_id", "user_name"]) def test_column_accessor_sqlite_raw(self): users = self.tables.users - users.insert().execute( - dict(user_id=1, user_name='john'), - ) + users.insert().execute(dict(user_id=1, user_name="john")) - r = text( - "select users.user_id, users.user_name " - "from users " - "UNION select users.user_id, " - "users.user_name from users", - bind=testing.db).execution_options(sqlite_raw_colnames=True). 
\ - execute().first() + r = ( + text( + "select users.user_id, users.user_name " + "from users " + "UNION select users.user_id, " + "users.user_name from users", + bind=testing.db, + ) + .execution_options(sqlite_raw_colnames=True) + .execute() + .first() + ) if testing.against("sqlite < 3.10.0"): - not_in_('user_id', r) - not_in_('user_name', r) - eq_(r['users.user_id'], 1) - eq_(r['users.user_name'], "john") + not_in_("user_id", r) + not_in_("user_name", r) + eq_(r["users.user_id"], 1) + eq_(r["users.user_name"], "john") eq_(list(r.keys()), ["users.user_id", "users.user_name"]) else: - not_in_('users.user_id', r) - not_in_('users.user_name', r) - eq_(r['user_id'], 1) - eq_(r['user_name'], "john") + not_in_("users.user_id", r) + not_in_("users.user_name", r) + eq_(r["user_id"], 1) + eq_(r["user_name"], "john") eq_(list(r.keys()), ["user_id", "user_name"]) def test_column_accessor_sqlite_translated(self): users = self.tables.users - users.insert().execute( - dict(user_id=1, user_name='john'), - ) + users.insert().execute(dict(user_id=1, user_name="john")) - r = text( - "select users.user_id, users.user_name " - "from users " - "UNION select users.user_id, " - "users.user_name from users", - bind=testing.db).execute().first() - eq_(r['user_id'], 1) - eq_(r['user_name'], "john") + r = ( + text( + "select users.user_id, users.user_name " + "from users " + "UNION select users.user_id, " + "users.user_name from users", + bind=testing.db, + ) + .execute() + .first() + ) + eq_(r["user_id"], 1) + eq_(r["user_name"], "john") if testing.against("sqlite < 3.10.0"): - eq_(r['users.user_id'], 1) - eq_(r['users.user_name'], "john") + eq_(r["users.user_id"], 1) + eq_(r["users.user_name"], "john") else: - not_in_('users.user_id', r) - not_in_('users.user_name', r) + not_in_("users.user_id", r) + not_in_("users.user_name", r) eq_(list(r.keys()), ["user_id", "user_name"]) def test_column_accessor_labels_w_dots(self): users = self.tables.users - users.insert().execute( - 
dict(user_id=1, user_name='john'), - ) + users.insert().execute(dict(user_id=1, user_name="john")) # test using literal tablename.colname - r = text( - 'select users.user_id AS "users.user_id", ' - 'users.user_name AS "users.user_name" ' - 'from users', bind=testing.db).\ - execution_options(sqlite_raw_colnames=True).execute().first() - eq_(r['users.user_id'], 1) - eq_(r['users.user_name'], "john") + r = ( + text( + 'select users.user_id AS "users.user_id", ' + 'users.user_name AS "users.user_name" ' + "from users", + bind=testing.db, + ) + .execution_options(sqlite_raw_colnames=True) + .execute() + .first() + ) + eq_(r["users.user_id"], 1) + eq_(r["users.user_name"], "john") not_in_("user_name", r) eq_(list(r.keys()), ["users.user_id", "users.user_name"]) def test_column_accessor_unary(self): users = self.tables.users - users.insert().execute( - dict(user_id=1, user_name='john'), - ) + users.insert().execute(dict(user_id=1, user_name="john")) # unary expressions - r = select([users.c.user_name.distinct()]).order_by( - users.c.user_name).execute().first() - eq_(r[users.c.user_name], 'john') - eq_(r.user_name, 'john') + r = ( + select([users.c.user_name.distinct()]) + .order_by(users.c.user_name) + .execute() + .first() + ) + eq_(r[users.c.user_name], "john") + eq_(r.user_name, "john") def test_column_accessor_err(self): r = testing.db.execute(select([1])).first() assert_raises_message( AttributeError, "Could not locate column in row for column 'foo'", - getattr, r, "foo" + getattr, + r, + "foo", ) assert_raises_message( KeyError, "Could not locate column in row for column 'foo'", - lambda: r['foo'] + lambda: r["foo"], ) def test_graceful_fetch_on_non_rows(self): @@ -481,8 +527,8 @@ class ResultProxyTest(fixtures.TablesTest): lambda r: r.first(), lambda r: r.scalar(), lambda r: r.fetchmany(), - lambda r: r._getter('user'), - lambda r: r._has_key('user'), + lambda r: r._getter("user"), + lambda r: r._has_key("user"), ]: trans = conn.begin() result = 
conn.execute(users.insert(), user_id=1) @@ -490,7 +536,8 @@ class ResultProxyTest(fixtures.TablesTest): exc.ResourceClosedError, "This result object does not return rows. " "It has been closed automatically.", - meth, result, + meth, + result, ) trans.rollback() @@ -503,19 +550,17 @@ class ResultProxyTest(fixtures.TablesTest): assert_raises_message( exc.ResourceClosedError, "This result object is closed.", - result.fetchone + result.fetchone, ) def test_connectionless_autoclose_rows_exhausted(self): users = self.tables.users - users.insert().execute( - dict(user_id=1, user_name='john'), - ) + users.insert().execute(dict(user_id=1, user_name="john")) result = testing.db.execute("select * from users") connection = result.connection assert not connection.closed - eq_(result.fetchone(), (1, 'john')) + eq_(result.fetchone(), (1, "john")) assert not connection.closed eq_(result.fetchone(), None) assert connection.closed @@ -523,12 +568,15 @@ class ResultProxyTest(fixtures.TablesTest): @testing.requires.returning def test_connectionless_autoclose_crud_rows_exhausted(self): users = self.tables.users - stmt = users.insert().values(user_id=1, user_name='john').\ - returning(users.c.user_id) + stmt = ( + users.insert() + .values(user_id=1, user_name="john") + .returning(users.c.user_id) + ) result = testing.db.execute(stmt) connection = result.connection assert not connection.closed - eq_(result.fetchone(), (1, )) + eq_(result.fetchone(), (1,)) assert not connection.closed eq_(result.fetchone(), None) assert connection.closed @@ -548,15 +596,17 @@ class ResultProxyTest(fixtures.TablesTest): assert_raises_message( exc.ResourceClosedError, "This result object does not return rows.", - result.fetchone + result.fetchone, ) def test_row_case_sensitive(self): row = testing.db.execute( - select([ - literal_column("1").label("case_insensitive"), - literal_column("2").label("CaseSensitive") - ]) + select( + [ + literal_column("1").label("case_insensitive"), + 
literal_column("2").label("CaseSensitive"), + ] + ) ).first() eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"]) @@ -568,28 +618,25 @@ class ResultProxyTest(fixtures.TablesTest): eq_(row["case_insensitive"], 1) eq_(row["CaseSensitive"], 2) - assert_raises( - KeyError, - lambda: row["Case_insensitive"] - ) - assert_raises( - KeyError, - lambda: row["casesensitive"] - ) + assert_raises(KeyError, lambda: row["Case_insensitive"]) + assert_raises(KeyError, lambda: row["casesensitive"]) def test_row_case_sensitive_unoptimized(self): ins_db = engines.testing_engine(options={"case_sensitive": True}) row = ins_db.execute( - select([ - literal_column("1").label("case_insensitive"), - literal_column("2").label("CaseSensitive"), - text("3 AS screw_up_the_cols") - ]) + select( + [ + literal_column("1").label("case_insensitive"), + literal_column("2").label("CaseSensitive"), + text("3 AS screw_up_the_cols"), + ] + ) ).first() eq_( list(row.keys()), - ["case_insensitive", "CaseSensitive", "screw_up_the_cols"]) + ["case_insensitive", "CaseSensitive", "screw_up_the_cols"], + ) in_("case_insensitive", row._keymap) in_("CaseSensitive", row._keymap) @@ -606,10 +653,12 @@ class ResultProxyTest(fixtures.TablesTest): def test_row_case_insensitive(self): ins_db = engines.testing_engine(options={"case_sensitive": False}) row = ins_db.execute( - select([ - literal_column("1").label("case_insensitive"), - literal_column("2").label("CaseSensitive") - ]) + select( + [ + literal_column("1").label("case_insensitive"), + literal_column("2").label("CaseSensitive"), + ] + ) ).first() eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"]) @@ -626,16 +675,19 @@ class ResultProxyTest(fixtures.TablesTest): def test_row_case_insensitive_unoptimized(self): ins_db = engines.testing_engine(options={"case_sensitive": False}) row = ins_db.execute( - select([ - literal_column("1").label("case_insensitive"), - literal_column("2").label("CaseSensitive"), - text("3 AS screw_up_the_cols") - ]) + 
select( + [ + literal_column("1").label("case_insensitive"), + literal_column("2").label("CaseSensitive"), + text("3 AS screw_up_the_cols"), + ] + ) ).first() eq_( list(row.keys()), - ["case_insensitive", "CaseSensitive", "screw_up_the_cols"]) + ["case_insensitive", "CaseSensitive", "screw_up_the_cols"], + ) in_("case_insensitive", row._keymap) in_("CaseSensitive", row._keymap) @@ -651,24 +703,27 @@ class ResultProxyTest(fixtures.TablesTest): def test_row_as_args(self): users = self.tables.users - users.insert().execute(user_id=1, user_name='john') + users.insert().execute(user_id=1, user_name="john") r = users.select(users.c.user_id == 1).execute().first() users.delete().execute() users.insert().execute(r) - eq_(users.select().execute().fetchall(), [(1, 'john')]) + eq_(users.select().execute().fetchall(), [(1, "john")]) def test_result_as_args(self): users = self.tables.users users2 = self.tables.users2 - users.insert().execute([ - dict(user_id=1, user_name='john'), - dict(user_id=2, user_name='ed')]) + users.insert().execute( + [ + dict(user_id=1, user_name="john"), + dict(user_id=2, user_name="ed"), + ] + ) r = users.select().execute() users2.insert().execute(list(r)) eq_( users2.select().order_by(users2.c.user_id).execute().fetchall(), - [(1, 'john'), (2, 'ed')] + [(1, "john"), (2, "ed")], ) users2.delete().execute() @@ -676,7 +731,7 @@ class ResultProxyTest(fixtures.TablesTest): users2.insert().execute(*list(r)) eq_( users2.select().order_by(users2.c.user_id).execute().fetchall(), - [(1, 'john'), (2, 'ed')] + [(1, "john"), (2, "ed")], ) @testing.requires.duplicate_names_in_cursor_description @@ -684,14 +739,14 @@ class ResultProxyTest(fixtures.TablesTest): users = self.tables.users addresses = self.tables.addresses - users.insert().execute(user_id=1, user_name='john') + users.insert().execute(user_id=1, user_name="john") result = users.outerjoin(addresses).select().execute() r = result.first() assert_raises_message( exc.InvalidRequestError, "Ambiguous column 
name", - lambda: r['user_id'] + lambda: r["user_id"], ) # pure positional targeting; users.c.user_id @@ -702,18 +757,18 @@ class ResultProxyTest(fixtures.TablesTest): # try to trick it - fake_table isn't in the result! # we get the correct error - fake_table = Table('fake', MetaData(), Column('user_id', Integer)) + fake_table = Table("fake", MetaData(), Column("user_id", Integer)) assert_raises_message( exc.InvalidRequestError, "Could not locate column in row for column 'fake.user_id'", - lambda: r[fake_table.c.user_id] + lambda: r[fake_table.c.user_id], ) r = util.pickle.loads(util.pickle.dumps(r)) assert_raises_message( exc.InvalidRequestError, "Ambiguous column name", - lambda: r['user_id'] + lambda: r["user_id"], ) result = users.outerjoin(addresses).select().execute() @@ -723,14 +778,14 @@ class ResultProxyTest(fixtures.TablesTest): assert_raises_message( exc.InvalidRequestError, "Ambiguous column name", - lambda: r['user_id'] + lambda: r["user_id"], ) @testing.requires.duplicate_names_in_cursor_description def test_ambiguous_column_by_col(self): users = self.tables.users - users.insert().execute(user_id=1, user_name='john') + users.insert().execute(user_id=1, user_name="john") ua = users.alias() u2 = users.alias() result = select([users.c.user_id, ua.c.user_id]).execute() @@ -747,22 +802,26 @@ class ResultProxyTest(fixtures.TablesTest): assert_raises_message( exc.InvalidRequestError, "Could not locate column in row", - lambda: row[u2.c.user_id] + lambda: row[u2.c.user_id], ) @testing.requires.duplicate_names_in_cursor_description def test_ambiguous_column_case_sensitive(self): eng = engines.testing_engine(options=dict(case_sensitive=False)) - row = eng.execute(select([ - literal_column('1').label('SOMECOL'), - literal_column('1').label('SOMECOL'), - ])).first() + row = eng.execute( + select( + [ + literal_column("1").label("SOMECOL"), + literal_column("1").label("SOMECOL"), + ] + ) + ).first() assert_raises_message( exc.InvalidRequestError, "Ambiguous column 
name", - lambda: row['somecol'] + lambda: row["somecol"], ) @testing.requires.duplicate_names_in_cursor_description @@ -773,47 +832,45 @@ class ResultProxyTest(fixtures.TablesTest): # ticket 2702. in 0.7 we'd get True, False. # in 0.8, both columns are present so it's True; # but when they're fetched you'll get the ambiguous error. - users.insert().execute(user_id=1, user_name='john') - result = select([users.c.user_id, addresses.c.user_id]).\ - select_from(users.outerjoin(addresses)).execute() + users.insert().execute(user_id=1, user_name="john") + result = ( + select([users.c.user_id, addresses.c.user_id]) + .select_from(users.outerjoin(addresses)) + .execute() + ) row = result.first() eq_( set([users.c.user_id in row, addresses.c.user_id in row]), - set([True]) + set([True]), ) def test_ambiguous_column_by_col_plus_label(self): users = self.tables.users - users.insert().execute(user_id=1, user_name='john') + users.insert().execute(user_id=1, user_name="john") result = select( - [users.c.user_id, - type_coerce(users.c.user_id, Integer).label('foo')]).execute() + [ + users.c.user_id, + type_coerce(users.c.user_id, Integer).label("foo"), + ] + ).execute() row = result.first() - eq_( - row[users.c.user_id], 1 - ) - eq_( - row[1], 1 - ) + eq_(row[users.c.user_id], 1) + eq_(row[1], 1) def test_fetch_partial_result_map(self): users = self.tables.users - users.insert().execute(user_id=7, user_name='ed') + users.insert().execute(user_id=7, user_name="ed") - t = text("select * from users").columns( - user_name=String() - ) - eq_( - testing.db.execute(t).fetchall(), [(7, 'ed')] - ) + t = text("select * from users").columns(user_name=String()) + eq_(testing.db.execute(t).fetchall(), [(7, "ed")]) def test_fetch_unordered_result_map(self): users = self.tables.users - users.insert().execute(user_id=7, user_name='ed') + users.insert().execute(user_id=7, user_name="ed") class Goofy1(TypeDecorator): impl = String @@ -835,166 +892,155 @@ class ResultProxyTest(fixtures.TablesTest): 
t = text( "select user_name as a, user_name as b, " - "user_name as c from users").columns( - a=Goofy1(), b=Goofy2(), c=Goofy3() - ) - eq_( - testing.db.execute(t).fetchall(), [ - ('eda', 'edb', 'edc') - ] - ) + "user_name as c from users" + ).columns(a=Goofy1(), b=Goofy2(), c=Goofy3()) + eq_(testing.db.execute(t).fetchall(), [("eda", "edb", "edc")]) @testing.requires.subqueries def test_column_label_targeting(self): users = self.tables.users - users.insert().execute(user_id=7, user_name='ed') + users.insert().execute(user_id=7, user_name="ed") for s in ( - users.select().alias('foo'), + users.select().alias("foo"), users.select().alias(users.name), ): row = s.select(use_labels=True).execute().first() eq_(row[s.c.user_id], 7) - eq_(row[s.c.user_name], 'ed') + eq_(row[s.c.user_name], "ed") def test_keys(self): users = self.tables.users - users.insert().execute(user_id=1, user_name='foo') + users.insert().execute(user_id=1, user_name="foo") result = users.select().execute() - eq_( - result.keys(), - ['user_id', 'user_name'] - ) + eq_(result.keys(), ["user_id", "user_name"]) row = result.first() - eq_( - row.keys(), - ['user_id', 'user_name'] - ) + eq_(row.keys(), ["user_id", "user_name"]) def test_keys_anon_labels(self): """test [ticket:3483]""" users = self.tables.users - users.insert().execute(user_id=1, user_name='foo') + users.insert().execute(user_id=1, user_name="foo") result = testing.db.execute( - select([ - users.c.user_id, - users.c.user_name.label(None), - func.count(literal_column('1'))]). 
- group_by(users.c.user_id, users.c.user_name) + select( + [ + users.c.user_id, + users.c.user_name.label(None), + func.count(literal_column("1")), + ] + ).group_by(users.c.user_id, users.c.user_name) ) - eq_( - result.keys(), - ['user_id', 'user_name_1', 'count_1'] - ) + eq_(result.keys(), ["user_id", "user_name_1", "count_1"]) row = result.first() - eq_( - row.keys(), - ['user_id', 'user_name_1', 'count_1'] - ) + eq_(row.keys(), ["user_id", "user_name_1", "count_1"]) def test_items(self): users = self.tables.users - users.insert().execute(user_id=1, user_name='foo') + users.insert().execute(user_id=1, user_name="foo") r = users.select().execute().first() eq_( [(x[0].lower(), x[1]) for x in list(r.items())], - [('user_id', 1), ('user_name', 'foo')]) + [("user_id", 1), ("user_name", "foo")], + ) def test_len(self): users = self.tables.users - users.insert().execute(user_id=1, user_name='foo') + users.insert().execute(user_id=1, user_name="foo") r = users.select().execute().first() eq_(len(r), 2) - r = testing.db.execute('select user_name, user_id from users'). 
\ - first() + r = testing.db.execute("select user_name, user_id from users").first() eq_(len(r), 2) - r = testing.db.execute('select user_name from users').first() + r = testing.db.execute("select user_name from users").first() eq_(len(r), 1) def test_sorting_in_python(self): users = self.tables.users users.insert().execute( - dict(user_id=1, user_name='foo'), - dict(user_id=2, user_name='bar'), - dict(user_id=3, user_name='def'), + dict(user_id=1, user_name="foo"), + dict(user_id=2, user_name="bar"), + dict(user_id=3, user_name="def"), ) rows = users.select().order_by(users.c.user_name).execute().fetchall() - eq_(rows, [(2, 'bar'), (3, 'def'), (1, 'foo')]) + eq_(rows, [(2, "bar"), (3, "def"), (1, "foo")]) - eq_(sorted(rows), [(1, 'foo'), (2, 'bar'), (3, 'def')]) + eq_(sorted(rows), [(1, "foo"), (2, "bar"), (3, "def")]) def test_column_order_with_simple_query(self): # should return values in column definition order users = self.tables.users - users.insert().execute(user_id=1, user_name='foo') + users.insert().execute(user_id=1, user_name="foo") r = users.select(users.c.user_id == 1).execute().first() eq_(r[0], 1) - eq_(r[1], 'foo') - eq_([x.lower() for x in list(r.keys())], ['user_id', 'user_name']) - eq_(list(r.values()), [1, 'foo']) + eq_(r[1], "foo") + eq_([x.lower() for x in list(r.keys())], ["user_id", "user_name"]) + eq_(list(r.values()), [1, "foo"]) def test_column_order_with_text_query(self): # should return values in query order users = self.tables.users - users.insert().execute(user_id=1, user_name='foo') - r = testing.db.execute('select user_name, user_id from users'). 
\ - first() - eq_(r[0], 'foo') + users.insert().execute(user_id=1, user_name="foo") + r = testing.db.execute("select user_name, user_id from users").first() + eq_(r[0], "foo") eq_(r[1], 1) - eq_([x.lower() for x in list(r.keys())], ['user_name', 'user_id']) - eq_(list(r.values()), ['foo', 1]) + eq_([x.lower() for x in list(r.keys())], ["user_name", "user_id"]) + eq_(list(r.values()), ["foo", 1]) - @testing.crashes('oracle', 'FIXME: unknown, varify not fails_on()') - @testing.crashes('firebird', 'An identifier must begin with a letter') + @testing.crashes("oracle", "FIXME: unknown, varify not fails_on()") + @testing.crashes("firebird", "An identifier must begin with a letter") @testing.provide_metadata def test_column_accessor_shadow(self): shadowed = Table( - 'test_shadowed', self.metadata, - Column('shadow_id', INT, primary_key=True), - Column('shadow_name', VARCHAR(20)), - Column('parent', VARCHAR(20)), - Column('row', VARCHAR(40)), - Column('_parent', VARCHAR(20)), - Column('_row', VARCHAR(20)), + "test_shadowed", + self.metadata, + Column("shadow_id", INT, primary_key=True), + Column("shadow_name", VARCHAR(20)), + Column("parent", VARCHAR(20)), + Column("row", VARCHAR(40)), + Column("_parent", VARCHAR(20)), + Column("_row", VARCHAR(20)), ) self.metadata.create_all() shadowed.insert().execute( - shadow_id=1, shadow_name='The Shadow', parent='The Light', - row='Without light there is no shadow', - _parent='Hidden parent', _row='Hidden row') + shadow_id=1, + shadow_name="The Shadow", + parent="The Light", + row="Without light there is no shadow", + _parent="Hidden parent", + _row="Hidden row", + ) r = shadowed.select(shadowed.c.shadow_id == 1).execute().first() eq_(r.shadow_id, 1) - eq_(r['shadow_id'], 1) + eq_(r["shadow_id"], 1) eq_(r[shadowed.c.shadow_id], 1) - eq_(r.shadow_name, 'The Shadow') - eq_(r['shadow_name'], 'The Shadow') - eq_(r[shadowed.c.shadow_name], 'The Shadow') + eq_(r.shadow_name, "The Shadow") + eq_(r["shadow_name"], "The Shadow") + 
eq_(r[shadowed.c.shadow_name], "The Shadow") - eq_(r.parent, 'The Light') - eq_(r['parent'], 'The Light') - eq_(r[shadowed.c.parent], 'The Light') + eq_(r.parent, "The Light") + eq_(r["parent"], "The Light") + eq_(r[shadowed.c.parent], "The Light") - eq_(r.row, 'Without light there is no shadow') - eq_(r['row'], 'Without light there is no shadow') - eq_(r[shadowed.c.row], 'Without light there is no shadow') + eq_(r.row, "Without light there is no shadow") + eq_(r["row"], "Without light there is no shadow") + eq_(r[shadowed.c.row], "Without light there is no shadow") - eq_(r['_parent'], 'Hidden parent') - eq_(r['_row'], 'Hidden row') + eq_(r["_parent"], "Hidden parent") + eq_(r["_row"], "Hidden row") def test_nontuple_row(self): """ensure the C version of BaseRowProxy handles @@ -1003,7 +1049,6 @@ class ResultProxyTest(fixtures.TablesTest): from sqlalchemy.engine import RowProxy class MyList(object): - def __init__(self, data): self.internal_list = data @@ -1013,11 +1058,15 @@ class ResultProxyTest(fixtures.TablesTest): def __getitem__(self, i): return list.__getitem__(self.internal_list, i) - proxy = RowProxy(object(), MyList(['value']), [None], { - 'key': (None, None, 0), 0: (None, None, 0)}) - eq_(list(proxy), ['value']) - eq_(proxy[0], 'value') - eq_(proxy['key'], 'value') + proxy = RowProxy( + object(), + MyList(["value"]), + [None], + {"key": (None, None, 0), 0: (None, None, 0)}, + ) + eq_(list(proxy), ["value"]) + eq_(proxy[0], "value") + eq_(proxy["key"], "value") @testing.provide_metadata def test_no_rowcount_on_selects_inserts(self): @@ -1033,28 +1082,26 @@ class ResultProxyTest(fixtures.TablesTest): engine = engines.testing_engine() - t = Table('t1', metadata, - Column('data', String(10)) - ) + t = Table("t1", metadata, Column("data", String(10))) metadata.create_all(engine) with patch.object( - engine.dialect.execution_ctx_cls, "rowcount") as mock_rowcount: + engine.dialect.execution_ctx_cls, "rowcount" + ) as mock_rowcount: mock_rowcount.__get__ = 
Mock() - engine.execute(t.insert(), - {'data': 'd1'}, - {'data': 'd2'}, - {'data': 'd3'}) + engine.execute( + t.insert(), {"data": "d1"}, {"data": "d2"}, {"data": "d3"} + ) eq_(len(mock_rowcount.__get__.mock_calls), 0) eq_( engine.execute(t.select()).fetchall(), - [('d1', ), ('d2', ), ('d3', )] + [("d1",), ("d2",), ("d3",)], ) eq_(len(mock_rowcount.__get__.mock_calls), 0) - engine.execute(t.update(), {'data': 'd4'}) + engine.execute(t.update(), {"data": "d4"}) eq_(len(mock_rowcount.__get__.mock_calls), 1) @@ -1066,58 +1113,66 @@ class ResultProxyTest(fixtures.TablesTest): from sqlalchemy.engine import RowProxy row = RowProxy( - object(), ['value'], [None], - {'key': (None, None, 0), 0: (None, None, 0)}) + object(), + ["value"], + [None], + {"key": (None, None, 0), 0: (None, None, 0)}, + ) assert isinstance(row, collections_abc.Sequence) @testing.provide_metadata def test_rowproxy_getitem_indexes_compiled(self): - values = Table('rp', self.metadata, - Column('key', String(10), primary_key=True), - Column('value', String(10))) + values = Table( + "rp", + self.metadata, + Column("key", String(10), primary_key=True), + Column("value", String(10)), + ) values.create() - testing.db.execute(values.insert(), dict(key='One', value='Uno')) + testing.db.execute(values.insert(), dict(key="One", value="Uno")) row = testing.db.execute(values.select()).first() - eq_(row['key'], 'One') - eq_(row['value'], 'Uno') - eq_(row[0], 'One') - eq_(row[1], 'Uno') - eq_(row[-2], 'One') - eq_(row[-1], 'Uno') - eq_(row[1:0:-1], ('Uno',)) + eq_(row["key"], "One") + eq_(row["value"], "Uno") + eq_(row[0], "One") + eq_(row[1], "Uno") + eq_(row[-2], "One") + eq_(row[-1], "Uno") + eq_(row[1:0:-1], ("Uno",)) @testing.only_on("sqlite") def test_rowproxy_getitem_indexes_raw(self): row = testing.db.execute("select 'One' as key, 'Uno' as value").first() - eq_(row['key'], 'One') - eq_(row['value'], 'Uno') - eq_(row[0], 'One') - eq_(row[1], 'Uno') - eq_(row[-2], 'One') - eq_(row[-1], 'Uno') - 
eq_(row[1:0:-1], ('Uno',)) + eq_(row["key"], "One") + eq_(row["value"], "Uno") + eq_(row[0], "One") + eq_(row[1], "Uno") + eq_(row[-2], "One") + eq_(row[-1], "Uno") + eq_(row[1:0:-1], ("Uno",)) @testing.requires.cextensions def test_row_c_sequence_check(self): import csv metadata = MetaData() - metadata.bind = 'sqlite://' - users = Table('users', metadata, - Column('id', Integer, primary_key=True), - Column('name', String(40)), - ) + metadata.bind = "sqlite://" + users = Table( + "users", + metadata, + Column("id", Integer, primary_key=True), + Column("name", String(40)), + ) users.create() - users.insert().execute(name='Test') + users.insert().execute(name="Test") row = users.select().execute().fetchone() s = util.StringIO() writer = csv.writer(s) # csv performs PySequenceCheck call writer.writerow(row) - assert s.getvalue().strip() == '1,Test' + assert s.getvalue().strip() == "1,Test" @testing.requires.selectone def test_empty_accessors(self): @@ -1129,33 +1184,28 @@ class ResultProxyTest(fixtures.TablesTest): lambda r: r.last_updated_params(), lambda r: r.prefetch_cols(), lambda r: r.postfetch_cols(), - lambda r: r.inserted_primary_key + lambda r: r.inserted_primary_key, ], - "Statement is not a compiled expression construct." + "Statement is not a compiled expression construct.", ), ( select([1]), [ lambda r: r.last_inserted_params(), - lambda r: r.inserted_primary_key + lambda r: r.inserted_primary_key, ], - r"Statement is not an insert\(\) expression construct." + r"Statement is not an insert\(\) expression construct.", ), ( select([1]), - [ - lambda r: r.last_updated_params(), - ], - r"Statement is not an update\(\) expression construct." + [lambda r: r.last_updated_params()], + r"Statement is not an update\(\) expression construct.", ), ( select([1]), - [ - lambda r: r.prefetch_cols(), - lambda r: r.postfetch_cols() - ], + [lambda r: r.prefetch_cols(), lambda r: r.postfetch_cols()], r"Statement is not an insert\(\) " - r"or update\(\) expression construct." 
+ r"or update\(\) expression construct.", ), ] @@ -1164,9 +1214,7 @@ class ResultProxyTest(fixtures.TablesTest): try: for meth in meths: assert_raises_message( - sa_exc.InvalidRequestError, - msg, - meth, r + sa_exc.InvalidRequestError, msg, meth, r ) finally: @@ -1174,28 +1222,31 @@ class ResultProxyTest(fixtures.TablesTest): class KeyTargetingTest(fixtures.TablesTest): - run_inserts = 'once' + run_inserts = "once" run_deletes = None __backend__ = True @classmethod def define_tables(cls, metadata): Table( - 'keyed1', metadata, Column("a", CHAR(2), key="b"), - Column("c", CHAR(2), key="q") + "keyed1", + metadata, + Column("a", CHAR(2), key="b"), + Column("c", CHAR(2), key="q"), ) - Table('keyed2', metadata, Column("a", CHAR(2)), Column("b", CHAR(2))) - Table('keyed3', metadata, Column("a", CHAR(2)), Column("d", CHAR(2))) - Table('keyed4', metadata, Column("b", CHAR(2)), Column("q", CHAR(2))) - Table('content', metadata, Column('t', String(30), key="type")) - Table('bar', metadata, Column('ctype', String(30), key="content_type")) + Table("keyed2", metadata, Column("a", CHAR(2)), Column("b", CHAR(2))) + Table("keyed3", metadata, Column("a", CHAR(2)), Column("d", CHAR(2))) + Table("keyed4", metadata, Column("b", CHAR(2)), Column("q", CHAR(2))) + Table("content", metadata, Column("t", String(30), key="type")) + Table("bar", metadata, Column("ctype", String(30), key="content_type")) if testing.requires.schemas.enabled: Table( - 'wschema', metadata, + "wschema", + metadata, Column("a", CHAR(2), key="b"), Column("c", CHAR(2), key="q"), - schema=testing.config.test_schema + schema=testing.config.test_schema, ) @classmethod @@ -1208,12 +1259,12 @@ class KeyTargetingTest(fixtures.TablesTest): if testing.requires.schemas.enabled: cls.tables[ - '%s.wschema' % testing.config.test_schema].insert().execute( - dict(b="a1", q="c1")) + "%s.wschema" % testing.config.test_schema + ].insert().execute(dict(b="a1", q="c1")) @testing.requires.schemas def test_keyed_accessor_wschema(self): 
- keyed1 = self.tables['%s.wschema' % testing.config.test_schema] + keyed1 = self.tables["%s.wschema" % testing.config.test_schema] row = testing.db.execute(keyed1.select()).first() eq_(row.b, "a1") @@ -1249,9 +1300,7 @@ class KeyTargetingTest(fixtures.TablesTest): eq_(row.b, "b2") # row.a is ambiguous assert_raises_message( - exc.InvalidRequestError, - "Ambig", - getattr, row, "a" + exc.InvalidRequestError, "Ambig", getattr, row, "a" ) def test_keyed_accessor_composite_names_precedent(self): @@ -1274,12 +1323,16 @@ class KeyTargetingTest(fixtures.TablesTest): assert_raises_message( exc.InvalidRequestError, "Ambiguous column name 'a'", - getattr, row, "b" + getattr, + row, + "b", ) assert_raises_message( exc.InvalidRequestError, "Ambiguous column name 'a'", - getattr, row, "a" + getattr, + row, + "a", ) eq_(row.d, "d3") @@ -1287,39 +1340,42 @@ class KeyTargetingTest(fixtures.TablesTest): keyed1 = self.tables.keyed1 keyed2 = self.tables.keyed2 - row = testing.db.execute(select([keyed1, keyed2]).apply_labels()). \ - first() + row = testing.db.execute( + select([keyed1, keyed2]).apply_labels() + ).first() eq_(row.keyed1_b, "a1") eq_(row.keyed1_a, "a1") eq_(row.keyed1_q, "c1") eq_(row.keyed1_c, "c1") eq_(row.keyed2_a, "a2") eq_(row.keyed2_b, "b2") - assert_raises(KeyError, lambda: row['keyed2_c']) - assert_raises(KeyError, lambda: row['keyed2_q']) + assert_raises(KeyError, lambda: row["keyed2_c"]) + assert_raises(KeyError, lambda: row["keyed2_q"]) def test_column_label_overlap_fallback(self): content, bar = self.tables.content, self.tables.bar row = testing.db.execute( - select([content.c.type.label("content_type")])).first() + select([content.c.type.label("content_type")]) + ).first() not_in_(content.c.type, row) not_in_(bar.c.content_type, row) - in_(sql.column('content_type'), row) + in_(sql.column("content_type"), row) - row = testing.db.execute(select([func.now().label("content_type")])). 
\ - first() + row = testing.db.execute( + select([func.now().label("content_type")]) + ).first() not_in_(content.c.type, row) not_in_(bar.c.content_type, row) - in_(sql.column('content_type'), row) + in_(sql.column("content_type"), row) def test_column_label_overlap_fallback_2(self): content, bar = self.tables.content, self.tables.bar row = testing.db.execute(content.select(use_labels=True)).first() in_(content.c.type, row) not_in_(bar.c.content_type, row) - not_in_(sql.column('content_type'), row) + not_in_(sql.column("content_type"), row) def test_columnclause_schema_column_one(self): keyed2 = self.tables.keyed2 @@ -1328,7 +1384,7 @@ class KeyTargetingTest(fixtures.TablesTest): # ColumnClause._compare_name_for_result allows the # columns which the statement is against to be lightweight # cols, which results in a more liberal comparison scheme - a, b = sql.column('a'), sql.column('b') + a, b = sql.column("a"), sql.column("b") stmt = select([a, b]).select_from(table("keyed2")) row = testing.db.execute(stmt).first() @@ -1340,7 +1396,7 @@ class KeyTargetingTest(fixtures.TablesTest): def test_columnclause_schema_column_two(self): keyed2 = self.tables.keyed2 - a, b = sql.column('a'), sql.column('b') + a, b = sql.column("a"), sql.column("b") stmt = select([keyed2.c.a, keyed2.c.b]) row = testing.db.execute(stmt).first() @@ -1354,7 +1410,7 @@ class KeyTargetingTest(fixtures.TablesTest): # this is also addressed by [ticket:2932] - a, b = sql.column('a'), sql.column('b') + a, b = sql.column("a"), sql.column("b") stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR) row = testing.db.execute(stmt).first() @@ -1370,9 +1426,10 @@ class KeyTargetingTest(fixtures.TablesTest): # this is also addressed by [ticket:2932] - a, b = sql.column('keyed2_a'), sql.column('keyed2_b') + a, b = sql.column("keyed2_a"), sql.column("keyed2_b") stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( - a, b) + a, b + ) row = testing.db.execute(stmt).first() in_(keyed2.c.a, 
row) @@ -1388,7 +1445,8 @@ class KeyTargetingTest(fixtures.TablesTest): # this is also addressed by [ticket:2932] stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( - keyed2_a=CHAR, keyed2_b=CHAR) + keyed2_a=CHAR, keyed2_b=CHAR + ) row = testing.db.execute(stmt).first() in_(keyed2.c.a, row) @@ -1398,29 +1456,29 @@ class KeyTargetingTest(fixtures.TablesTest): class PositionalTextTest(fixtures.TablesTest): - run_inserts = 'once' + run_inserts = "once" run_deletes = None __backend__ = True @classmethod def define_tables(cls, metadata): Table( - 'text1', + "text1", metadata, Column("a", CHAR(2)), Column("b", CHAR(2)), Column("c", CHAR(2)), - Column("d", CHAR(2)) + Column("d", CHAR(2)), ) @classmethod def insert_data(cls): - cls.tables.text1.insert().execute([ - dict(a="a1", b="b1", c="c1", d="d1"), - ]) + cls.tables.text1.insert().execute( + [dict(a="a1", b="b1", c="c1", d="d1")] + ) def test_via_column(self): - c1, c2, c3, c4 = column('q'), column('p'), column('r'), column('d') + c1, c2, c3, c4 = column("q"), column("p"), column("r"), column("d") stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4) result = testing.db.execute(stmt) @@ -1435,7 +1493,7 @@ class PositionalTextTest(fixtures.TablesTest): eq_(row["d"], "d1") def test_fewer_cols_than_sql_positional(self): - c1, c2 = column('q'), column('p') + c1, c2 = column("q"), column("p") stmt = text("select a, b, c, d from text1").columns(c1, c2) # no warning as this can be similar for non-positional @@ -1446,7 +1504,7 @@ class PositionalTextTest(fixtures.TablesTest): eq_(row["c"], "c1") def test_fewer_cols_than_sql_non_positional(self): - c1, c2 = column('a'), column('p') + c1, c2 = column("a"), column("p") stmt = text("select a, b, c, d from text1").columns(c2, c1, d=CHAR) # no warning as this can be similar for non-positional @@ -1459,38 +1517,36 @@ class PositionalTextTest(fixtures.TablesTest): # c2 name does not match, doesn't locate assert_raises_message( - 
exc.NoSuchColumnError, - "in row for column 'p'", - lambda: row[c2] + exc.NoSuchColumnError, "in row for column 'p'", lambda: row[c2] ) def test_more_cols_than_sql(self): - c1, c2, c3, c4 = column('q'), column('p'), column('r'), column('d') + c1, c2, c3, c4 = column("q"), column("p"), column("r"), column("d") stmt = text("select a, b from text1").columns(c1, c2, c3, c4) with assertions.expect_warnings( - r"Number of columns in textual SQL \(4\) is " - r"smaller than number of columns requested \(2\)"): + r"Number of columns in textual SQL \(4\) is " + r"smaller than number of columns requested \(2\)" + ): result = testing.db.execute(stmt) row = result.first() eq_(row[c2], "b1") assert_raises_message( - exc.NoSuchColumnError, - "in row for column 'r'", - lambda: row[c3] + exc.NoSuchColumnError, "in row for column 'r'", lambda: row[c3] ) def test_dupe_col_obj(self): - c1, c2, c3 = column('q'), column('p'), column('r') + c1, c2, c3 = column("q"), column("p"), column("r") stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c2) assert_raises_message( exc.InvalidRequestError, "Duplicate column expression requested in " "textual SQL: <.*.ColumnClause.*; p>", - testing.db.execute, stmt + testing.db.execute, + stmt, ) def test_anon_aliased_unique(self): @@ -1523,7 +1579,7 @@ class PositionalTextTest(fixtures.TablesTest): assert_raises_message( exc.NoSuchColumnError, "Could not locate column in row for column 'text1.b'", - lambda: row[text1.c.b] + lambda: row[text1.c.b], ) def test_anon_aliased_overlapping(self): @@ -1558,7 +1614,8 @@ class PositionalTextTest(fixtures.TablesTest): # all cols are named "a". if we are positional, we don't care. 
# this is new logic in 1.1 stmt = text("select a, b as a, c as a, d as a from text1").columns( - c1, c2, c3, c4) + c1, c2, c3, c4 + ) result = testing.db.execute(stmt) row = result.first() @@ -1572,42 +1629,45 @@ class PositionalTextTest(fixtures.TablesTest): assert_raises_message( exc.NoSuchColumnError, "Could not locate column in row for column 'text1.a'", - lambda: row[text1.c.a] + lambda: row[text1.c.a], ) class AlternateResultProxyTest(fixtures.TablesTest): - __requires__ = ('sqlite', ) + __requires__ = ("sqlite",) @classmethod def setup_bind(cls): - cls.engine = engine = engines.testing_engine('sqlite://') + cls.engine = engine = engines.testing_engine("sqlite://") return engine @classmethod def define_tables(cls, metadata): Table( - 'test', metadata, - Column('x', Integer, primary_key=True), - Column('y', String(50, convert_unicode='force')) + "test", + metadata, + Column("x", Integer, primary_key=True), + Column("y", String(50, convert_unicode="force")), ) @classmethod def insert_data(cls): - cls.engine.execute(cls.tables.test.insert(), [ - {'x': i, 'y': "t_%d" % i} for i in range(1, 12) - ]) + cls.engine.execute( + cls.tables.test.insert(), + [{"x": i, "y": "t_%d" % i} for i in range(1, 12)], + ) @contextmanager def _proxy_fixture(self, cls): self.table = self.tables.test class ExcCtx(default.DefaultExecutionContext): - def get_result_proxy(self): return cls(self) + self.patcher = patch.object( - self.engine.dialect, "execution_ctx_cls", ExcCtx) + self.engine.dialect, "execution_ctx_cls", ExcCtx + ) with self.patcher: yield @@ -1664,21 +1724,15 @@ class AlternateResultProxyTest(fixtures.TablesTest): def _assert_result_closed(self, r): assert_raises_message( - sa_exc.ResourceClosedError, - "object is closed", - r.fetchone + sa_exc.ResourceClosedError, "object is closed", r.fetchone ) assert_raises_message( - sa_exc.ResourceClosedError, - "object is closed", - r.fetchmany, 2 + sa_exc.ResourceClosedError, "object is closed", r.fetchmany, 2 ) 
assert_raises_message( - sa_exc.ResourceClosedError, - "object is closed", - r.fetchall + sa_exc.ResourceClosedError, "object is closed", r.fetchall ) def test_basic_plain(self): @@ -1738,35 +1792,28 @@ class AlternateResultProxyTest(fixtures.TablesTest): def test_buffered_row_growth(self): with self._proxy_fixture(_result.BufferedRowResultProxy): with self.engine.connect() as conn: - conn.execute(self.table.insert(), [ - {'x': i, 'y': "t_%d" % i} for i in range(15, 1200) - ]) + conn.execute( + self.table.insert(), + [{"x": i, "y": "t_%d" % i} for i in range(15, 1200)], + ) result = conn.execute(self.table.select()) - checks = { - 0: 5, 1: 10, 9: 20, 135: 250, 274: 500, - 1351: 1000 - } + checks = {0: 5, 1: 10, 9: 20, 135: 250, 274: 500, 1351: 1000} for idx, row in enumerate(result, 0): if idx in checks: eq_(result._bufsize, checks[idx]) - le_( - len(result._BufferedRowResultProxy__rowbuffer), - 1000 - ) + le_(len(result._BufferedRowResultProxy__rowbuffer), 1000) def test_max_row_buffer_option(self): with self._proxy_fixture(_result.BufferedRowResultProxy): with self.engine.connect() as conn: - conn.execute(self.table.insert(), [ - {'x': i, 'y': "t_%d" % i} for i in range(15, 1200) - ]) + conn.execute( + self.table.insert(), + [{"x": i, "y": "t_%d" % i} for i in range(15, 1200)], + ) result = conn.execution_options(max_row_buffer=27).execute( self.table.select() ) for idx, row in enumerate(result, 0): if idx in (16, 70, 150, 250): eq_(result._bufsize, 27) - le_( - len(result._BufferedRowResultProxy__rowbuffer), - 27 - ) + le_(len(result._BufferedRowResultProxy__rowbuffer), 27) |
