from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, \ in_, not_in_, is_, ne_ from sqlalchemy import testing from sqlalchemy.testing import fixtures, engines from sqlalchemy import util from sqlalchemy import ( exc, sql, func, select, String, Integer, MetaData, ForeignKey, VARCHAR, INT, CHAR, text, type_coerce, literal_column, TypeDecorator, table, column) from sqlalchemy.engine import result as _result from sqlalchemy.testing.schema import Table, Column import operator from sqlalchemy.testing import assertions class ResultProxyTest(fixtures.TablesTest): __backend__ = True @classmethod def define_tables(cls, metadata): Table( 'users', metadata, Column( 'user_id', INT, primary_key=True, test_needs_autoincrement=True), Column('user_name', VARCHAR(20)), test_needs_acid=True ) Table( 'addresses', metadata, Column( 'address_id', Integer, primary_key=True, test_needs_autoincrement=True), Column('user_id', Integer, ForeignKey('users.user_id')), Column('address', String(30)), test_needs_acid=True ) Table( 'users2', metadata, Column('user_id', INT, primary_key=True), Column('user_name', VARCHAR(20)), test_needs_acid=True ) def test_row_iteration(self): users = self.tables.users users.insert().execute( {'user_id': 7, 'user_name': 'jack'}, {'user_id': 8, 'user_name': 'ed'}, {'user_id': 9, 'user_name': 'fred'}, ) r = users.select().execute() l = [] for row in r: l.append(row) eq_(len(l), 3) @testing.requires.subqueries def test_anonymous_rows(self): users = self.tables.users users.insert().execute( {'user_id': 7, 'user_name': 'jack'}, {'user_id': 8, 'user_name': 'ed'}, {'user_id': 9, 'user_name': 'fred'}, ) sel = select([users.c.user_id]).where(users.c.user_name == 'jack'). \ as_scalar() for row in select([sel + 1, sel + 3], bind=users.bind).execute(): eq_(row['anon_1'], 8) eq_(row['anon_2'], 10) def test_row_comparison(self): users = self.tables.users users.insert().execute(user_id=7, user_name='jack') rp = users.select().execute().first() eq_(rp, rp) is_(not(rp != rp), True) equal = (7, 'jack') eq_(rp, equal) eq_(equal, rp) is_((not (rp != equal)), True) is_(not (equal != equal), True) def endless(): while True: yield 1 ne_(rp, endless()) ne_(endless(), rp) # test that everything compares the same # as it would against a tuple for compare in [False, 8, endless(), 'xyz', (7, 'jack')]: for op in [ operator.eq, operator.ne, operator.gt, operator.lt, operator.ge, operator.le ]: try: control = op(equal, compare) except TypeError: # Py3K raises TypeError for some invalid comparisons assert_raises(TypeError, op, rp, compare) else: eq_(control, op(rp, compare)) try: control = op(compare, equal) except TypeError: # Py3K raises TypeError for some invalid comparisons assert_raises(TypeError, op, compare, rp) else: eq_(control, op(compare, rp)) @testing.provide_metadata def test_column_label_overlap_fallback(self): content = Table( 'content', self.metadata, Column('type', String(30)), ) bar = Table( 'bar', self.metadata, Column('content_type', String(30)) ) self.metadata.create_all(testing.db) testing.db.execute(content.insert().values(type="t1")) row = testing.db.execute(content.select(use_labels=True)).first() in_(content.c.type, row) not_in_(bar.c.content_type, row) in_(sql.column('content_type'), row) row = testing.db.execute( select([content.c.type.label("content_type")])).first() in_(content.c.type, row) not_in_(bar.c.content_type, row) in_(sql.column('content_type'), row) row = testing.db.execute(select([func.now().label("content_type")])). \ first() not_in_(content.c.type, row) not_in_(bar.c.content_type, row) in_(sql.column('content_type'), row) def test_pickled_rows(self): users = self.tables.users addresses = self.tables.addresses users.insert().execute( {'user_id': 7, 'user_name': 'jack'}, {'user_id': 8, 'user_name': 'ed'}, {'user_id': 9, 'user_name': 'fred'}, ) for pickle in False, True: for use_labels in False, True: result = users.select(use_labels=use_labels).order_by( users.c.user_id).execute().fetchall() if pickle: result = util.pickle.loads(util.pickle.dumps(result)) eq_( result, [(7, "jack"), (8, "ed"), (9, "fred")] ) if use_labels: eq_(result[0]['users_user_id'], 7) eq_( list(result[0].keys()), ["users_user_id", "users_user_name"]) else: eq_(result[0]['user_id'], 7) eq_(list(result[0].keys()), ["user_id", "user_name"]) eq_(result[0][0], 7) eq_(result[0][users.c.user_id], 7) eq_(result[0][users.c.user_name], 'jack') if not pickle or use_labels: assert_raises( exc.NoSuchColumnError, lambda: result[0][addresses.c.user_id]) else: # test with a different table. name resolution is # causing 'user_id' to match when use_labels wasn't used. eq_(result[0][addresses.c.user_id], 7) assert_raises( exc.NoSuchColumnError, lambda: result[0]['fake key']) assert_raises( exc.NoSuchColumnError, lambda: result[0][addresses.c.address_id]) def test_column_error_printing(self): row = testing.db.execute(select([1])).first() class unprintable(object): def __str__(self): raise ValueError("nope") msg = r"Could not locate column in row for column '%s'" for accessor, repl in [ ("x", "x"), (Column("q", Integer), "q"), (Column("q", Integer) + 12, r"q \+ :q_1"), (unprintable(), "unprintable element.*"), ]: assert_raises_message( exc.NoSuchColumnError, msg % repl, lambda: row[accessor] ) def test_fetchmany(self): users = self.tables.users users.insert().execute(user_id=7, user_name='jack') users.insert().execute(user_id=8, user_name='ed') users.insert().execute(user_id=9, user_name='fred') r = users.select().execute() l = [] for row in r.fetchmany(size=2): l.append(row) eq_(len(l), 2) def test_column_slices(self): users = self.tables.users addresses = self.tables.addresses users.insert().execute(user_id=1, user_name='john') users.insert().execute(user_id=2, user_name='jack') addresses.insert().execute( address_id=1, user_id=2, address='foo@bar.com') r = text( "select * from addresses", bind=testing.db).execute().first() eq_(r[0:1], (1,)) eq_(r[1:], (2, 'foo@bar.com')) eq_(r[:-1], (1, 2)) def test_column_accessor_basic_compiled(self): users = self.tables.users users.insert().execute( dict(user_id=1, user_name='john'), dict(user_id=2, user_name='jack') ) r = users.select(users.c.user_id == 2).execute().first() eq_(r.user_id, 2) eq_(r['user_id'], 2) eq_(r[users.c.user_id], 2) eq_(r.user_name, 'jack') eq_(r['user_name'], 'jack') eq_(r[users.c.user_name], 'jack') def test_column_accessor_basic_text(self): users = self.tables.users users.insert().execute( dict(user_id=1, user_name='john'), dict(user_id=2, user_name='jack') ) r = testing.db.execute( text("select * from users where user_id=2")).first() eq_(r.user_id, 2) eq_(r['user_id'], 2) eq_(r[users.c.user_id], 2) eq_(r.user_name, 'jack') eq_(r['user_name'], 'jack') eq_(r[users.c.user_name], 'jack') def test_column_accessor_textual_select(self): users = self.tables.users users.insert().execute( dict(user_id=1, user_name='john'), dict(user_id=2, user_name='jack') ) # this will create column() objects inside # the select(), these need to match on name anyway r = testing.db.execute( select([ column('user_id'), column('user_name') ]).select_from(table('users')). where(text('user_id=2')) ).first() eq_(r.user_id, 2) eq_(r['user_id'], 2) eq_(r[users.c.user_id], 2) eq_(r.user_name, 'jack') eq_(r['user_name'], 'jack') eq_(r[users.c.user_name], 'jack') def test_column_accessor_dotted_union(self): users = self.tables.users users.insert().execute( dict(user_id=1, user_name='john'), ) # test a little sqlite < 3.10.0 weirdness - with the UNION, # cols come back as "users.user_id" in cursor.description r = testing.db.execute( text( "select users.user_id, users.user_name " "from users " "UNION select users.user_id, " "users.user_name from users" ) ).first() eq_(r['user_id'], 1) eq_(r['user_name'], "john") eq_(list(r.keys()), ["user_id", "user_name"]) def test_column_accessor_sqlite_raw(self): users = self.tables.users users.insert().execute( dict(user_id=1, user_name='john'), ) r = text( "select users.user_id, users.user_name " "from users " "UNION select users.user_id, " "users.user_name from users", bind=testing.db).execution_options(sqlite_raw_colnames=True). \ execute().first() if testing.against("sqlite < 3.10.0"): not_in_('user_id', r) not_in_('user_name', r) eq_(r['users.user_id'], 1) eq_(r['users.user_name'], "john") eq_(list(r.keys()), ["users.user_id", "users.user_name"]) else: not_in_('users.user_id', r) not_in_('users.user_name', r) eq_(r['user_id'], 1) eq_(r['user_name'], "john") eq_(list(r.keys()), ["user_id", "user_name"]) def test_column_accessor_sqlite_translated(self): users = self.tables.users users.insert().execute( dict(user_id=1, user_name='john'), ) r = text( "select users.user_id, users.user_name " "from users " "UNION select users.user_id, " "users.user_name from users", bind=testing.db).execute().first() eq_(r['user_id'], 1) eq_(r['user_name'], "john") if testing.against("sqlite < 3.10.0"): eq_(r['users.user_id'], 1) eq_(r['users.user_name'], "john") else: not_in_('users.user_id', r) not_in_('users.user_name', r) eq_(list(r.keys()), ["user_id", "user_name"]) def test_column_accessor_labels_w_dots(self): users = self.tables.users users.insert().execute( dict(user_id=1, user_name='john'), ) # test using literal tablename.colname r = text( 'select users.user_id AS "users.user_id", ' 'users.user_name AS "users.user_name" ' 'from users', bind=testing.db).\ execution_options(sqlite_raw_colnames=True).execute().first() eq_(r['users.user_id'], 1) eq_(r['users.user_name'], "john") not_in_("user_name", r) eq_(list(r.keys()), ["users.user_id", "users.user_name"]) def test_column_accessor_unary(self): users = self.tables.users users.insert().execute( dict(user_id=1, user_name='john'), ) # unary experssions r = select([users.c.user_name.distinct()]).order_by( users.c.user_name).execute().first() eq_(r[users.c.user_name], 'john') eq_(r.user_name, 'john') def test_column_accessor_err(self): r = testing.db.execute(select([1])).first() assert_raises_message( AttributeError, "Could not locate column in row for column 'foo'", getattr, r, "foo" ) assert_raises_message( KeyError, "Could not locate column in row for column 'foo'", lambda: r['foo'] ) def test_graceful_fetch_on_non_rows(self): """test that calling fetchone() etc. on a result that doesn't return rows fails gracefully. """ # these proxies don't work with no cursor.description present. # so they don't apply to this test at the moment. # result.FullyBufferedResultProxy, # result.BufferedRowResultProxy, # result.BufferedColumnResultProxy users = self.tables.users conn = testing.db.connect() for meth in [ lambda r: r.fetchone(), lambda r: r.fetchall(), lambda r: r.first(), lambda r: r.scalar(), lambda r: r.fetchmany(), lambda r: r._getter('user'), lambda r: r._has_key('user'), ]: trans = conn.begin() result = conn.execute(users.insert(), user_id=1) assert_raises_message( exc.ResourceClosedError, "This result object does not return rows. " "It has been closed automatically.", meth, result, ) trans.rollback() def test_fetchone_til_end(self): result = testing.db.execute("select * from users") eq_(result.fetchone(), None) eq_(result.fetchone(), None) eq_(result.fetchone(), None) result.close() assert_raises_message( exc.ResourceClosedError, "This result object is closed.", result.fetchone ) def test_row_case_sensitive(self): row = testing.db.execute( select([ literal_column("1").label("case_insensitive"), literal_column("2").label("CaseSensitive") ]) ).first() eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"]) in_("case_insensitive", row._keymap) in_("CaseSensitive", row._keymap) not_in_("casesensitive", row._keymap) eq_(row["case_insensitive"], 1) eq_(row["CaseSensitive"], 2) assert_raises( KeyError, lambda: row["Case_insensitive"] ) assert_raises( KeyError, lambda: row["casesensitive"] ) def test_row_case_sensitive_unoptimized(self): ins_db = engines.testing_engine(options={"case_sensitive": True}) row = ins_db.execute( select([ literal_column("1").label("case_insensitive"), literal_column("2").label("CaseSensitive"), text("3 AS screw_up_the_cols") ]) ).first() eq_( list(row.keys()), ["case_insensitive", "CaseSensitive", "screw_up_the_cols"]) in_("case_insensitive", row._keymap) in_("CaseSensitive", row._keymap) not_in_("casesensitive", row._keymap) eq_(row["case_insensitive"], 1) eq_(row["CaseSensitive"], 2) eq_(row["screw_up_the_cols"], 3) assert_raises(KeyError, lambda: row["Case_insensitive"]) assert_raises(KeyError, lambda: row["casesensitive"]) assert_raises(KeyError, lambda: row["screw_UP_the_cols"]) def test_row_case_insensitive(self): ins_db = engines.testing_engine(options={"case_sensitive": False}) row = ins_db.execute( select([ literal_column("1").label("case_insensitive"), literal_column("2").label("CaseSensitive") ]) ).first() eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"]) in_("case_insensitive", row._keymap) in_("CaseSensitive", row._keymap) in_("casesensitive", row._keymap) eq_(row["case_insensitive"], 1) eq_(row["CaseSensitive"], 2) eq_(row["Case_insensitive"], 1) eq_(row["casesensitive"], 2) def test_row_case_insensitive_unoptimized(self): ins_db = engines.testing_engine(options={"case_sensitive": False}) row = ins_db.execute( select([ literal_column("1").label("case_insensitive"), literal_column("2").label("CaseSensitive"), text("3 AS screw_up_the_cols") ]) ).first() eq_( list(row.keys()), ["case_insensitive", "CaseSensitive", "screw_up_the_cols"]) in_("case_insensitive", row._keymap) in_("CaseSensitive", row._keymap) in_("casesensitive", row._keymap) eq_(row["case_insensitive"], 1) eq_(row["CaseSensitive"], 2) eq_(row["screw_up_the_cols"], 3) eq_(row["Case_insensitive"], 1) eq_(row["casesensitive"], 2) eq_(row["screw_UP_the_cols"], 3) def test_row_as_args(self): users = self.tables.users users.insert().execute(user_id=1, user_name='john') r = users.select(users.c.user_id == 1).execute().first() users.delete().execute() users.insert().execute(r) eq_(users.select().execute().fetchall(), [(1, 'john')]) def test_result_as_args(self): users = self.tables.users users2 = self.tables.users2 users.insert().execute([ dict(user_id=1, user_name='john'), dict(user_id=2, user_name='ed')]) r = users.select().execute() users2.insert().execute(list(r)) eq_( users2.select().order_by(users2.c.user_id).execute().fetchall(), [(1, 'john'), (2, 'ed')] ) users2.delete().execute() r = users.select().execute() users2.insert().execute(*list(r)) eq_( users2.select().order_by(users2.c.user_id).execute().fetchall(), [(1, 'john'), (2, 'ed')] ) @testing.requires.duplicate_names_in_cursor_description def test_ambiguous_column(self): users = self.tables.users addresses = self.tables.addresses users.insert().execute(user_id=1, user_name='john') result = users.outerjoin(addresses).select().execute() r = result.first() assert_raises_message( exc.InvalidRequestError, "Ambiguous column name", lambda: r['user_id'] ) # pure positional targeting; users.c.user_id # and addresses.c.user_id are known! # works as of 1.1 issue #3501 eq_(r[users.c.user_id], 1) eq_(r[addresses.c.user_id], None) # try to trick it - fake_table isn't in the result! # we get the correct error fake_table = Table('fake', MetaData(), Column('user_id', Integer)) assert_raises_message( exc.InvalidRequestError, "Could not locate column in row for column 'fake.user_id'", lambda: r[fake_table.c.user_id] ) r = util.pickle.loads(util.pickle.dumps(r)) assert_raises_message( exc.InvalidRequestError, "Ambiguous column name", lambda: r['user_id'] ) result = users.outerjoin(addresses).select().execute() result = _result.BufferedColumnResultProxy(result.context) r = result.first() assert isinstance(r, _result.BufferedColumnRow) assert_raises_message( exc.InvalidRequestError, "Ambiguous column name", lambda: r['user_id'] ) @testing.requires.duplicate_names_in_cursor_description def test_ambiguous_column_by_col(self): users = self.tables.users users.insert().execute(user_id=1, user_name='john') ua = users.alias() u2 = users.alias() result = select([users.c.user_id, ua.c.user_id]).execute() row = result.first() # as of 1.1 issue #3501, we use pure positional # targeting for the column objects here eq_(row[users.c.user_id], 1) eq_(row[ua.c.user_id], 1) # this now works as of 1.1 issue #3501; # previously this was stuck on "ambiguous column name" assert_raises_message( exc.InvalidRequestError, "Could not locate column in row", lambda: row[u2.c.user_id] ) @testing.requires.duplicate_names_in_cursor_description def test_ambiguous_column_contains(self): users = self.tables.users addresses = self.tables.addresses # ticket 2702. in 0.7 we'd get True, False. # in 0.8, both columns are present so it's True; # but when they're fetched you'll get the ambiguous error. users.insert().execute(user_id=1, user_name='john') result = select([users.c.user_id, addresses.c.user_id]).\ select_from(users.outerjoin(addresses)).execute() row = result.first() eq_( set([users.c.user_id in row, addresses.c.user_id in row]), set([True]) ) def test_ambiguous_column_by_col_plus_label(self): users = self.tables.users users.insert().execute(user_id=1, user_name='john') result = select( [users.c.user_id, type_coerce(users.c.user_id, Integer).label('foo')]).execute() row = result.first() eq_( row[users.c.user_id], 1 ) eq_( row[1], 1 ) def test_fetch_partial_result_map(self): users = self.tables.users users.insert().execute(user_id=7, user_name='ed') t = text("select * from users").columns( user_name=String() ) eq_( testing.db.execute(t).fetchall(), [(7, 'ed')] ) def test_fetch_unordered_result_map(self): users = self.tables.users users.insert().execute(user_id=7, user_name='ed') class Goofy1(TypeDecorator): impl = String def process_result_value(self, value, dialect): return value + "a" class Goofy2(TypeDecorator): impl = String def process_result_value(self, value, dialect): return value + "b" class Goofy3(TypeDecorator): impl = String def process_result_value(self, value, dialect): return value + "c" t = text( "select user_name as a, user_name as b, " "user_name as c from users").columns( a=Goofy1(), b=Goofy2(), c=Goofy3() ) eq_( testing.db.execute(t).fetchall(), [ ('eda', 'edb', 'edc') ] ) @testing.requires.subqueries def test_column_label_targeting(self): users = self.tables.users users.insert().execute(user_id=7, user_name='ed') for s in ( users.select().alias('foo'), users.select().alias(users.name), ): row = s.select(use_labels=True).execute().first() eq_(row[s.c.user_id], 7) eq_(row[s.c.user_name], 'ed') def test_keys(self): users = self.tables.users users.insert().execute(user_id=1, user_name='foo') result = users.select().execute() eq_( result.keys(), ['user_id', 'user_name'] ) row = result.first() eq_( row.keys(), ['user_id', 'user_name'] ) def test_keys_anon_labels(self): """test [ticket:3483]""" users = self.tables.users users.insert().execute(user_id=1, user_name='foo') result = testing.db.execute( select([ users.c.user_id, users.c.user_name.label(None), func.count(literal_column('1'))]). group_by(users.c.user_id, users.c.user_name) ) eq_( result.keys(), ['user_id', 'user_name_1', 'count_1'] ) row = result.first() eq_( row.keys(), ['user_id', 'user_name_1', 'count_1'] ) def test_items(self): users = self.tables.users users.insert().execute(user_id=1, user_name='foo') r = users.select().execute().first() eq_( [(x[0].lower(), x[1]) for x in list(r.items())], [('user_id', 1), ('user_name', 'foo')]) def test_len(self): users = self.tables.users users.insert().execute(user_id=1, user_name='foo') r = users.select().execute().first() eq_(len(r), 2) r = testing.db.execute('select user_name, user_id from users'). \ first() eq_(len(r), 2) r = testing.db.execute('select user_name from users').first() eq_(len(r), 1) def test_sorting_in_python(self): users = self.tables.users users.insert().execute( dict(user_id=1, user_name='foo'), dict(user_id=2, user_name='bar'), dict(user_id=3, user_name='def'), ) rows = users.select().order_by(users.c.user_name).execute().fetchall() eq_(rows, [(2, 'bar'), (3, 'def'), (1, 'foo')]) eq_(sorted(rows), [(1, 'foo'), (2, 'bar'), (3, 'def')]) def test_column_order_with_simple_query(self): # should return values in column definition order users = self.tables.users users.insert().execute(user_id=1, user_name='foo') r = users.select(users.c.user_id == 1).execute().first() eq_(r[0], 1) eq_(r[1], 'foo') eq_([x.lower() for x in list(r.keys())], ['user_id', 'user_name']) eq_(list(r.values()), [1, 'foo']) def test_column_order_with_text_query(self): # should return values in query order users = self.tables.users users.insert().execute(user_id=1, user_name='foo') r = testing.db.execute('select user_name, user_id from users'). \ first() eq_(r[0], 'foo') eq_(r[1], 1) eq_([x.lower() for x in list(r.keys())], ['user_name', 'user_id']) eq_(list(r.values()), ['foo', 1]) @testing.crashes('oracle', 'FIXME: unknown, varify not fails_on()') @testing.crashes('firebird', 'An identifier must begin with a letter') @testing.provide_metadata def test_column_accessor_shadow(self): shadowed = Table( 'test_shadowed', self.metadata, Column('shadow_id', INT, primary_key=True), Column('shadow_name', VARCHAR(20)), Column('parent', VARCHAR(20)), Column('row', VARCHAR(40)), Column('_parent', VARCHAR(20)), Column('_row', VARCHAR(20)), ) self.metadata.create_all() shadowed.insert().execute( shadow_id=1, shadow_name='The Shadow', parent='The Light', row='Without light there is no shadow', _parent='Hidden parent', _row='Hidden row') r = shadowed.select(shadowed.c.shadow_id == 1).execute().first() eq_(r.shadow_id, 1) eq_(r['shadow_id'], 1) eq_(r[shadowed.c.shadow_id], 1) eq_(r.shadow_name, 'The Shadow') eq_(r['shadow_name'], 'The Shadow') eq_(r[shadowed.c.shadow_name], 'The Shadow') eq_(r.parent, 'The Light') eq_(r['parent'], 'The Light') eq_(r[shadowed.c.parent], 'The Light') eq_(r.row, 'Without light there is no shadow') eq_(r['row'], 'Without light there is no shadow') eq_(r[shadowed.c.row], 'Without light there is no shadow') eq_(r['_parent'], 'Hidden parent') eq_(r['_row'], 'Hidden row') class KeyTargetingTest(fixtures.TablesTest): run_inserts = 'once' run_deletes = None __backend__ = True @classmethod def define_tables(cls, metadata): Table( 'keyed1', metadata, Column("a", CHAR(2), key="b"), Column("c", CHAR(2), key="q") ) Table('keyed2', metadata, Column("a", CHAR(2)), Column("b", CHAR(2))) Table('keyed3', metadata, Column("a", CHAR(2)), Column("d", CHAR(2))) Table('keyed4', metadata, Column("b", CHAR(2)), Column("q", CHAR(2))) Table('content', metadata, Column('t', String(30), key="type")) Table('bar', metadata, Column('ctype', String(30), key="content_type")) if testing.requires.schemas.enabled: Table( 'wschema', metadata, Column("a", CHAR(2), key="b"), Column("c", CHAR(2), key="q"), schema=testing.config.test_schema ) @classmethod def insert_data(cls): cls.tables.keyed1.insert().execute(dict(b="a1", q="c1")) cls.tables.keyed2.insert().execute(dict(a="a2", b="b2")) cls.tables.keyed3.insert().execute(dict(a="a3", d="d3")) cls.tables.keyed4.insert().execute(dict(b="b4", q="q4")) cls.tables.content.insert().execute(type="t1") if testing.requires.schemas.enabled: cls.tables[ '%s.wschema' % testing.config.test_schema].insert().execute( dict(b="a1", q="c1")) @testing.requires.schemas def test_keyed_accessor_wschema(self): keyed1 = self.tables['%s.wschema' % testing.config.test_schema] row = testing.db.execute(keyed1.select()).first() eq_(row.b, "a1") eq_(row.q, "c1") eq_(row.a, "a1") eq_(row.c, "c1") def test_keyed_accessor_single(self): keyed1 = self.tables.keyed1 row = testing.db.execute(keyed1.select()).first() eq_(row.b, "a1") eq_(row.q, "c1") eq_(row.a, "a1") eq_(row.c, "c1") def test_keyed_accessor_single_labeled(self): keyed1 = self.tables.keyed1 row = testing.db.execute(keyed1.select().apply_labels()).first() eq_(row.keyed1_b, "a1") eq_(row.keyed1_q, "c1") eq_(row.keyed1_a, "a1") eq_(row.keyed1_c, "c1") @testing.requires.duplicate_names_in_cursor_description def test_keyed_accessor_composite_conflict_2(self): keyed1 = self.tables.keyed1 keyed2 = self.tables.keyed2 row = testing.db.execute(select([keyed1, keyed2])).first() # row.b is unambiguous eq_(row.b, "b2") # row.a is ambiguous assert_raises_message( exc.InvalidRequestError, "Ambig", getattr, row, "a" ) def test_keyed_accessor_composite_names_precedent(self): keyed1 = self.tables.keyed1 keyed4 = self.tables.keyed4 row = testing.db.execute(select([keyed1, keyed4])).first() eq_(row.b, "b4") eq_(row.q, "q4") eq_(row.a, "a1") eq_(row.c, "c1") @testing.requires.duplicate_names_in_cursor_description def test_keyed_accessor_composite_keys_precedent(self): keyed1 = self.tables.keyed1 keyed3 = self.tables.keyed3 row = testing.db.execute(select([keyed1, keyed3])).first() eq_(row.q, "c1") assert_raises_message( exc.InvalidRequestError, "Ambiguous column name 'a'", getattr, row, "b" ) assert_raises_message( exc.InvalidRequestError, "Ambiguous column name 'a'", getattr, row, "a" ) eq_(row.d, "d3") def test_keyed_accessor_composite_labeled(self): keyed1 = self.tables.keyed1 keyed2 = self.tables.keyed2 row = testing.db.execute(select([keyed1, keyed2]).apply_labels()). \ first() eq_(row.keyed1_b, "a1") eq_(row.keyed1_a, "a1") eq_(row.keyed1_q, "c1") eq_(row.keyed1_c, "c1") eq_(row.keyed2_a, "a2") eq_(row.keyed2_b, "b2") assert_raises(KeyError, lambda: row['keyed2_c']) assert_raises(KeyError, lambda: row['keyed2_q']) def test_column_label_overlap_fallback(self): content, bar = self.tables.content, self.tables.bar row = testing.db.execute( select([content.c.type.label("content_type")])).first() not_in_(content.c.type, row) not_in_(bar.c.content_type, row) in_(sql.column('content_type'), row) row = testing.db.execute(select([func.now().label("content_type")])). \ first() not_in_(content.c.type, row) not_in_(bar.c.content_type, row) in_(sql.column('content_type'), row) def test_column_label_overlap_fallback_2(self): content, bar = self.tables.content, self.tables.bar row = testing.db.execute(content.select(use_labels=True)).first() in_(content.c.type, row) not_in_(bar.c.content_type, row) not_in_(sql.column('content_type'), row) def test_columnclause_schema_column_one(self): keyed2 = self.tables.keyed2 # this is addressed by [ticket:2932] # ColumnClause._compare_name_for_result allows the # columns which the statement is against to be lightweight # cols, which results in a more liberal comparison scheme a, b = sql.column('a'), sql.column('b') stmt = select([a, b]).select_from(table("keyed2")) row = testing.db.execute(stmt).first() in_(keyed2.c.a, row) in_(keyed2.c.b, row) in_(a, row) in_(b, row) def test_columnclause_schema_column_two(self): keyed2 = self.tables.keyed2 a, b = sql.column('a'), sql.column('b') stmt = select([keyed2.c.a, keyed2.c.b]) row = testing.db.execute(stmt).first() in_(keyed2.c.a, row) in_(keyed2.c.b, row) in_(a, row) in_(b, row) def test_columnclause_schema_column_three(self): keyed2 = self.tables.keyed2 # this is also addressed by [ticket:2932] a, b = sql.column('a'), sql.column('b') stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR) row = testing.db.execute(stmt).first() in_(keyed2.c.a, row) in_(keyed2.c.b, row) in_(a, row) in_(b, row) in_(stmt.c.a, row) in_(stmt.c.b, row) def test_columnclause_schema_column_four(self): keyed2 = self.tables.keyed2 # this is also addressed by [ticket:2932] a, b = sql.column('keyed2_a'), sql.column('keyed2_b') stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( a, b) row = testing.db.execute(stmt).first() in_(keyed2.c.a, row) in_(keyed2.c.b, row) in_(a, row) in_(b, row) in_(stmt.c.keyed2_a, row) in_(stmt.c.keyed2_b, row) def test_columnclause_schema_column_five(self): keyed2 = self.tables.keyed2 # this is also addressed by [ticket:2932] stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( keyed2_a=CHAR, keyed2_b=CHAR) row = testing.db.execute(stmt).first() in_(keyed2.c.a, row) in_(keyed2.c.b, row) in_(stmt.c.keyed2_a, row) in_(stmt.c.keyed2_b, row) class PositionalTextTest(fixtures.TablesTest): run_inserts = 'once' run_deletes = None __backend__ = True @classmethod def define_tables(cls, metadata): Table( 'text1', metadata, Column("a", CHAR(2)), Column("b", CHAR(2)), Column("c", CHAR(2)), Column("d", CHAR(2)) ) @classmethod def insert_data(cls): cls.tables.text1.insert().execute([ dict(a="a1", b="b1", c="c1", d="d1"), ]) def test_via_column(self): c1, c2, c3, c4 = column('q'), column('p'), column('r'), column('d') stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4) result = testing.db.execute(stmt) row = result.first() eq_(row[c2], "b1") eq_(row[c4], "d1") eq_(row[1], "b1") eq_(row["b"], "b1") eq_(row.keys(), ["a", "b", "c", "d"]) eq_(row["r"], "c1") eq_(row["d"], "d1") def test_fewer_cols_than_sql_positional(self): c1, c2 = column('q'), column('p') stmt = text("select a, b, c, d from text1").columns(c1, c2) # no warning as this can be similar for non-positional result = testing.db.execute(stmt) row = result.first() eq_(row[c1], "a1") eq_(row["c"], "c1") def test_fewer_cols_than_sql_non_positional(self): c1, c2 = column('a'), column('p') stmt = text("select a, b, c, d from text1").columns(c2, c1, d=CHAR) # no warning as this can be similar for non-positional result = testing.db.execute(stmt) row = result.first() # c1 name matches, locates eq_(row[c1], "a1") eq_(row["c"], "c1") # c2 name does not match, doesn't locate assert_raises_message( exc.NoSuchColumnError, "in row for column 'p'", lambda: row[c2] ) def test_more_cols_than_sql(self): c1, c2, c3, c4 = column('q'), column('p'), column('r'), column('d') stmt = text("select a, b from text1").columns(c1, c2, c3, c4) with assertions.expect_warnings( r"Number of columns in textual SQL \(4\) is " "smaller than number of columns requested \(2\)"): result = testing.db.execute(stmt) row = result.first() eq_(row[c2], "b1") assert_raises_message( exc.NoSuchColumnError, "in row for column 'r'", lambda: row[c3] ) def test_dupe_col_obj(self): c1, c2, c3 = column('q'), column('p'), column('r') stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c2) assert_raises_message( exc.InvalidRequestError, "Duplicate column expression requested in " "textual SQL: <.*.ColumnClause.*; p>", testing.db.execute, stmt ) def test_anon_aliased_unique(self): text1 = self.tables.text1 c1 = text1.c.a.label(None) c2 = text1.alias().c.c c3 = text1.alias().c.b c4 = text1.alias().c.d.label(None) stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4) result = testing.db.execute(stmt) row = result.first() eq_(row[c1], "a1") eq_(row[c2], "b1") eq_(row[c3], "c1") eq_(row[c4], "d1") # key fallback rules still match this to a column # unambiguously based on its name eq_(row[text1.c.a], "a1") # key fallback rules still match this to a column # unambiguously based on its name eq_(row[text1.c.d], "d1") # text1.c.b goes nowhere....because we hit key fallback # but the text1.c.b doesn't derive from text1.c.c assert_raises_message( exc.NoSuchColumnError, "Could not locate column in row for column 'text1.b'", lambda: row[text1.c.b] ) def test_anon_aliased_overlapping(self): text1 = self.tables.text1 c1 = text1.c.a.label(None) c2 = text1.alias().c.a c3 = text1.alias().c.a.label(None) c4 = text1.c.a.label(None) stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4) result = testing.db.execute(stmt) row = result.first() eq_(row[c1], "a1") eq_(row[c2], "b1") eq_(row[c3], "c1") eq_(row[c4], "d1") # key fallback rules still match this to a column # unambiguously based on its name eq_(row[text1.c.a], "a1") def test_anon_aliased_name_conflict(self): text1 = self.tables.text1 c1 = text1.c.a.label("a") c2 = text1.alias().c.a c3 = text1.alias().c.a.label("a") c4 = text1.c.a.label("a") # all cols are named "a". if we are positional, we don't care. # this is new logic in 1.1 stmt = text("select a, b as a, c as a, d as a from text1").columns( c1, c2, c3, c4) result = testing.db.execute(stmt) row = result.first() eq_(row[c1], "a1") eq_(row[c2], "b1") eq_(row[c3], "c1") eq_(row[c4], "d1") # fails, because we hit key fallback and find conflicts # in columns that are presnet assert_raises_message( exc.NoSuchColumnError, "Could not locate column in row for column 'text1.a'", lambda: row[text1.c.a] )