diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2016-01-08 22:11:09 -0500 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2016-01-08 22:12:25 -0500 |
| commit | 89facbed8855d1443dbe37919ff0645aea640ed0 (patch) | |
| tree | 33e7ab15470a5f3a76b748418e6be0c62aa1eaba /test/engine | |
| parent | 777e25694f1567ff61655d86a91be6264186c13e (diff) | |
| download | sqlalchemy-89facbed8855d1443dbe37919ff0645aea640ed0.tar.gz | |
- Multi-tenancy schema translation for :class:`.Table` objects is added.
This supports the use case of an application that uses the same set of
:class:`.Table` objects in many schemas, such as schema-per-user.
A new execution option
:paramref:`.Connection.execution_options.schema_translate_map` is
added. fixes #2685
- latest tox doesn't like the {posargs} in the profile rerunner
Diffstat (limited to 'test/engine')
| -rw-r--r-- | test/engine/test_execute.py | 186 | ||||
| -rw-r--r-- | test/engine/test_reflection.py | 21 |
2 files changed, 202 insertions, 5 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index fbb1878dc..5ea5d3515 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -21,6 +21,8 @@ from sqlalchemy.testing import fixtures from sqlalchemy.testing.mock import Mock, call, patch from contextlib import contextmanager from sqlalchemy.util import nested +from sqlalchemy.testing.assertsql import CompiledSQL + users, metadata, users_autoinc = None, None, None @@ -805,6 +807,40 @@ class CompiledCacheTest(fixtures.TestBase): eq_(compile_mock.call_count, 1) eq_(len(cache), 1) + @testing.requires.schemas + @testing.provide_metadata + def test_schema_translate_in_key(self): + Table( + 'x', self.metadata, Column('q', Integer)) + Table( + 'x', self.metadata, Column('q', Integer), + schema=config.test_schema) + self.metadata.create_all() + + m = MetaData() + t1 = Table('x', m, Column('q', Integer)) + ins = t1.insert() + stmt = select([t1.c.q]) + + cache = {} + with config.db.connect().execution_options( + compiled_cache=cache, + ) as conn: + conn.execute(ins, {"q": 1}) + eq_(conn.scalar(stmt), 1) + + with config.db.connect().execution_options( + compiled_cache=cache, + schema_translate_map={None: config.test_schema} + ) as conn: + conn.execute(ins, {"q": 2}) + eq_(conn.scalar(stmt), 2) + + with config.db.connect().execution_options( + compiled_cache=cache, + ) as conn: + eq_(conn.scalar(stmt), 1) + class MockStrategyTest(fixtures.TestBase): @@ -989,6 +1025,156 @@ class ResultProxyTest(fixtures.TestBase): finally: r.close() +class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults): + __requires__ = 'schemas', + __backend__ = True + + def test_create_table(self): + map_ = { + None: config.test_schema, + "foo": config.test_schema, "bar": None} + + metadata = MetaData() + t1 = Table('t1', metadata, Column('x', Integer)) + t2 = Table('t2', metadata, Column('x', Integer), schema="foo") + t3 = Table('t3', metadata, Column('x', Integer), schema="bar") + + with self.sql_execution_asserter(config.db) as asserter: + with config.db.connect().execution_options( + schema_translate_map=map_) as conn: + + t1.create(conn) + t2.create(conn) + t3.create(conn) + + t3.drop(conn) + t2.drop(conn) + t1.drop(conn) + + asserter.assert_( + CompiledSQL("CREATE TABLE %s.t1 (x INTEGER)" % config.test_schema), + CompiledSQL("CREATE TABLE %s.t2 (x INTEGER)" % config.test_schema), + CompiledSQL("CREATE TABLE t3 (x INTEGER)"), + CompiledSQL("DROP TABLE t3"), + CompiledSQL("DROP TABLE %s.t2" % config.test_schema), + CompiledSQL("DROP TABLE %s.t1" % config.test_schema) + ) + + def _fixture(self): + metadata = self.metadata + Table( + 't1', metadata, Column('x', Integer), + schema=config.test_schema) + Table( + 't2', metadata, Column('x', Integer), + schema=config.test_schema) + Table('t3', metadata, Column('x', Integer), schema=None) + metadata.create_all() + + def test_ddl_hastable(self): + + map_ = { + None: config.test_schema, + "foo": config.test_schema, "bar": None} + + metadata = MetaData() + Table('t1', metadata, Column('x', Integer)) + Table('t2', metadata, Column('x', Integer), schema="foo") + Table('t3', metadata, Column('x', Integer), schema="bar") + + with config.db.connect().execution_options( + schema_translate_map=map_) as conn: + metadata.create_all(conn) + + assert config.db.has_table('t1', schema=config.test_schema) + assert config.db.has_table('t2', schema=config.test_schema) + assert config.db.has_table('t3', schema=None) + + with config.db.connect().execution_options( + schema_translate_map=map_) as conn: + metadata.drop_all(conn) + + assert not config.db.has_table('t1', schema=config.test_schema) + assert not config.db.has_table('t2', schema=config.test_schema) + assert not config.db.has_table('t3', schema=None) + + @testing.provide_metadata + def test_crud(self): + self._fixture() + + map_ = { + None: config.test_schema, + "foo": config.test_schema, "bar": None} + + metadata = MetaData() + t1 = Table('t1', metadata, Column('x', Integer)) + t2 = Table('t2', metadata, Column('x', Integer), schema="foo") + t3 = Table('t3', metadata, Column('x', Integer), schema="bar") + + with self.sql_execution_asserter(config.db) as asserter: + with config.db.connect().execution_options( + schema_translate_map=map_) as conn: + + conn.execute(t1.insert(), {'x': 1}) + conn.execute(t2.insert(), {'x': 1}) + conn.execute(t3.insert(), {'x': 1}) + + conn.execute(t1.update().values(x=1).where(t1.c.x == 1)) + conn.execute(t2.update().values(x=2).where(t2.c.x == 1)) + conn.execute(t3.update().values(x=3).where(t3.c.x == 1)) + + eq_(conn.scalar(select([t1.c.x])), 1) + eq_(conn.scalar(select([t2.c.x])), 2) + eq_(conn.scalar(select([t3.c.x])), 3) + + conn.execute(t1.delete()) + conn.execute(t2.delete()) + conn.execute(t3.delete()) + + asserter.assert_( + CompiledSQL( + "INSERT INTO %s.t1 (x) VALUES (:x)" % config.test_schema), + CompiledSQL( + "INSERT INTO %s.t2 (x) VALUES (:x)" % config.test_schema), + CompiledSQL( + "INSERT INTO t3 (x) VALUES (:x)"), + CompiledSQL( + "UPDATE %s.t1 SET x=:x WHERE %s.t1.x = :x_1" % ( + config.test_schema, config.test_schema)), + CompiledSQL( + "UPDATE %s.t2 SET x=:x WHERE %s.t2.x = :x_1" % ( + config.test_schema, config.test_schema)), + CompiledSQL("UPDATE t3 SET x=:x WHERE t3.x = :x_1"), + CompiledSQL("SELECT %s.t1.x FROM %s.t1" % ( + config.test_schema, config.test_schema)), + CompiledSQL("SELECT %s.t2.x FROM %s.t2" % ( + config.test_schema, config.test_schema)), + CompiledSQL("SELECT t3.x FROM t3"), + CompiledSQL("DELETE FROM %s.t1" % config.test_schema), + CompiledSQL("DELETE FROM %s.t2" % config.test_schema), + CompiledSQL("DELETE FROM t3") + ) + + @testing.provide_metadata + def test_via_engine(self): + self._fixture() + + map_ = { + None: config.test_schema, + "foo": config.test_schema, "bar": None} + + metadata = MetaData() + t2 = Table('t2', metadata, Column('x', Integer), schema="foo") + + with self.sql_execution_asserter(config.db) as asserter: + eng = config.db.execution_options(schema_translate_map=map_) + conn = eng.connect() + conn.execute(select([t2.c.x])) + asserter.assert_( + CompiledSQL("SELECT %s.t2.x FROM %s.t2" % ( + config.test_schema, config.test_schema)), + ) + class ExecutionOptionsTest(fixtures.TestBase): diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py index b7bf87d63..f9799fda0 100644 --- a/test/engine/test_reflection.py +++ b/test/engine/test_reflection.py @@ -1,16 +1,15 @@ -import operator - import unicodedata import sqlalchemy as sa -from sqlalchemy import schema, events, event, inspect +from sqlalchemy import schema, inspect from sqlalchemy import MetaData, Integer, String -from sqlalchemy.testing import (ComparesTables, engines, AssertsCompiledSQL, +from sqlalchemy.testing import ( + ComparesTables, engines, AssertsCompiledSQL, fixtures, skip) from sqlalchemy.testing.schema import Table, Column from sqlalchemy.testing import eq_, assert_raises, assert_raises_message from sqlalchemy import testing from sqlalchemy.util import ue - +from sqlalchemy.testing import config metadata, users = None, None @@ -1345,6 +1344,18 @@ class SchemaTest(fixtures.TestBase): metadata.drop_all() @testing.requires.schemas + @testing.provide_metadata + def test_schema_translation(self): + Table('foob', self.metadata, Column('q', Integer), schema=config.test_schema) + self.metadata.create_all() + + m = MetaData() + map_ = {"foob": config.test_schema} + with config.db.connect().execution_options(schema_translate_map=map_) as conn: + t = Table('foob', m, schema="foob", autoload_with=conn) + eq_(t.schema, "foob") + eq_(t.c.keys(), ['q']) + @testing.requires.schemas @testing.fails_on('sybase', 'FIXME: unknown') def test_explicit_default_schema_metadata(self): engine = testing.db |
