diff options
Diffstat (limited to 'lib/sqlalchemy')
| -rw-r--r-- | lib/sqlalchemy/dialects/oracle/base.py | 151 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/plugin/noseplugin.py | 10 |
2 files changed, 101 insertions, 60 deletions
diff --git a/lib/sqlalchemy/dialects/oracle/base.py b/lib/sqlalchemy/dialects/oracle/base.py index baea4815b..428c0b0ee 100644 --- a/lib/sqlalchemy/dialects/oracle/base.py +++ b/lib/sqlalchemy/dialects/oracle/base.py @@ -831,19 +831,29 @@ class OracleDialect(default.DefaultDialect): if resolve_synonyms: actual_name, owner, dblink, synonym = self._resolve_synonym( - connection, - desired_owner=self.denormalize_name(schema), - desired_synonym=self.denormalize_name(table_name) - ) + connection, + desired_owner=self.denormalize_name(schema), + desired_synonym=self.denormalize_name(table_name) + ) else: actual_name, owner, dblink, synonym = None, None, None, None if not actual_name: actual_name = self.denormalize_name(table_name) - if not dblink: - dblink = '' - if not owner: + + if dblink: + # using user_db_links here since all_db_links appears + # to have more restricted permissions. + # http://docs.oracle.com/cd/B28359_01/server.111/b28310/ds_admin005.htm + # will need to hear from more users if we are doing + # the right thing here. See [ticket:2619] + owner = connection.scalar( + sql.text("SELECT username FROM user_db_links " + "WHERE db_link=:link"), link=dblink) + dblink = "@" + dblink + elif not owner: owner = self.denormalize_name(schema or self.default_schema_name) - return (actual_name, owner, dblink, synonym) + + return (actual_name, owner, dblink or '', synonym) @reflection.cache def get_schema_names(self, connection, **kw): @@ -899,12 +909,18 @@ class OracleDialect(default.DefaultDialect): else: char_length_col = 'data_length' - c = connection.execute(sql.text( - "SELECT column_name, data_type, %(char_length_col)s, data_precision, data_scale, " - "nullable, data_default FROM ALL_TAB_COLUMNS%(dblink)s " - "WHERE table_name = :table_name AND owner = :owner " - "ORDER BY column_id" % {'dblink': dblink, 'char_length_col': char_length_col}), - table_name=table_name, owner=schema) + params = {"table_name": table_name} + text = "SELECT column_name, data_type, %(char_length_col)s, "\ + "data_precision, data_scale, "\ + "nullable, data_default FROM ALL_TAB_COLUMNS%(dblink)s "\ + "WHERE table_name = :table_name" + if schema is not None: + params['owner'] = schema + text += " AND owner = :owner " + text += " ORDER BY column_id" + text = text % {'dblink': dblink, 'char_length_col': char_length_col} + + c = connection.execute(sql.text(text), **params) for row in c: (colname, orig_colname, coltype, length, precision, scale, nullable, default) = \ @@ -948,20 +964,28 @@ class OracleDialect(default.DefaultDialect): resolve_synonyms, dblink, info_cache=info_cache) indexes = [] - q = sql.text(""" - SELECT a.index_name, a.column_name, b.uniqueness - FROM ALL_IND_COLUMNS%(dblink)s a, - ALL_INDEXES%(dblink)s b - WHERE - a.index_name = b.index_name - AND a.table_owner = b.table_owner - AND a.table_name = b.table_name - - AND a.table_name = :table_name - AND a.table_owner = :schema - ORDER BY a.index_name, a.column_position""" % {'dblink': dblink}) - rp = connection.execute(q, table_name=self.denormalize_name(table_name), - schema=self.denormalize_name(schema)) + + params = {'table_name': table_name} + text = \ + "SELECT a.index_name, a.column_name, b.uniqueness "\ + "\nFROM ALL_IND_COLUMNS%(dblink)s a, "\ + "\nALL_INDEXES%(dblink)s b "\ + "\nWHERE "\ + "\na.index_name = b.index_name "\ + "\nAND a.table_owner = b.table_owner "\ + "\nAND a.table_name = b.table_name "\ + "\nAND a.table_name = :table_name " + + if schema is not None: + params['schema'] = schema + text += "AND a.table_owner = :schema " + + text += "ORDER BY a.index_name, a.column_position" + + text = text % {'dblink': dblink} + + q = sql.text(text) + rp = connection.execute(q, **params) indexes = [] last_index_name = None pk_constraint = self.get_pk_constraint( @@ -1003,29 +1027,38 @@ class OracleDialect(default.DefaultDialect): def _get_constraint_data(self, connection, table_name, schema=None, dblink='', **kw): - rp = connection.execute( - sql.text("""SELECT - ac.constraint_name, - ac.constraint_type, - loc.column_name AS local_column, - rem.table_name AS remote_table, - rem.column_name AS remote_column, - rem.owner AS remote_owner, - loc.position as loc_pos, - rem.position as rem_pos - FROM all_constraints%(dblink)s ac, - all_cons_columns%(dblink)s loc, - all_cons_columns%(dblink)s rem - WHERE ac.table_name = :table_name - AND ac.constraint_type IN ('R','P') - AND ac.owner = :owner - AND ac.owner = loc.owner - AND ac.constraint_name = loc.constraint_name - AND ac.r_owner = rem.owner(+) - AND ac.r_constraint_name = rem.constraint_name(+) - AND (rem.position IS NULL or loc.position=rem.position) - ORDER BY ac.constraint_name, loc.position""" % {'dblink': dblink}), - table_name=table_name, owner=schema) + params = {'table_name': table_name} + + text = \ + "SELECT"\ + "\nac.constraint_name,"\ + "\nac.constraint_type,"\ + "\nloc.column_name AS local_column,"\ + "\nrem.table_name AS remote_table,"\ + "\nrem.column_name AS remote_column,"\ + "\nrem.owner AS remote_owner,"\ + "\nloc.position as loc_pos,"\ + "\nrem.position as rem_pos"\ + "\nFROM all_constraints%(dblink)s ac,"\ + "\nall_cons_columns%(dblink)s loc,"\ + "\nall_cons_columns%(dblink)s rem"\ + "\nWHERE ac.table_name = :table_name"\ + "\nAND ac.constraint_type IN ('R','P')" + + if schema is not None: + params['owner'] = schema + text += "\nAND ac.owner = :owner" + + text += \ + "\nAND ac.owner = loc.owner"\ + "\nAND ac.constraint_name = loc.constraint_name"\ + "\nAND ac.r_owner = rem.owner(+)"\ + "\nAND ac.r_constraint_name = rem.constraint_name(+)"\ + "\nAND (rem.position IS NULL or loc.position=rem.position)"\ + "\nORDER BY ac.constraint_name, loc.position" + + text = text % {'dblink': dblink} + rp = connection.execute(sql.text(text), **params) constraint_data = rp.fetchall() return constraint_data @@ -1138,13 +1171,15 @@ class OracleDialect(default.DefaultDialect): self._prepare_reflection_args(connection, view_name, schema, resolve_synonyms, dblink, info_cache=info_cache) - s = sql.text(""" - SELECT text FROM all_views - WHERE owner = :schema - AND view_name = :view_name - """) - rp = connection.execute(s, - view_name=view_name, schema=schema).scalar() + + params = {'view_name': view_name} + text = "SELECT text FROM all_views WHERE view_name=:view_name" + + if schema is not None: + text += " AND owner = :schema" + params['schema'] = schema + + rp = connection.execute(sql.text(text), **params).scalar() if rp: return rp.decode(self.encoding) else: diff --git a/lib/sqlalchemy/testing/plugin/noseplugin.py b/lib/sqlalchemy/testing/plugin/noseplugin.py index c104c4614..81e724bbe 100644 --- a/lib/sqlalchemy/testing/plugin/noseplugin.py +++ b/lib/sqlalchemy/testing/plugin/noseplugin.py @@ -243,6 +243,7 @@ def _requirements(options, file_config): def _post_setup_options(opt, file_config): from sqlalchemy.testing import config config.options = options + config.file_config = file_config @post @@ -366,9 +367,14 @@ class NoseSQLAlchemy(Plugin): if not check.enabled: raise SkipTest( - "'%s' unsupported on DB implementation '%s'" % ( - cls.__name__, config.db.name) + check.reason if check.reason + else + ( + "'%s' unsupported on DB implementation '%s'" % ( + cls.__name__, config.db.name + ) ) + ) if cls.__unsupported_on__: spec = exclusions.db_spec(*cls.__unsupported_on__) |
