summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
authormike bayer <mike_mp@zzzcomputing.com>2020-06-01 20:03:28 +0000
committerGerrit Code Review <gerrit@bbpush.zzzcomputing.com>2020-06-01 20:03:28 +0000
commit03c4166fa852d20b1bc1f4693e70a13725a3ba2e (patch)
tree4d2fe6fef7e7999ee6a0ae4017907cca9c4a294c /test/sql
parent1287853bf59a3e50b0a518d4410ff60305058131 (diff)
parent4ecd352a9fbb9dbac7b428fe0f098f665c1f0cb1 (diff)
downloadsqlalchemy-03c4166fa852d20b1bc1f4693e70a13725a3ba2e.tar.gz
Merge "Improve rendering of core statements w/ ORM elements"
Diffstat (limited to 'test/sql')
-rw-r--r--test/sql/test_compare.py50
-rw-r--r--test/sql/test_compiler.py24
-rw-r--r--test/sql/test_selectable.py60
-rw-r--r--test/sql/test_utils.py41
4 files changed, 175 insertions, 0 deletions
diff --git a/test/sql/test_compare.py b/test/sql/test_compare.py
index d3d21cb0e..2d84ab676 100644
--- a/test/sql/test_compare.py
+++ b/test/sql/test_compare.py
@@ -661,6 +661,56 @@ class CoreFixtures(object):
fixtures.append(_statements_w_context_options_fixtures)
+ def _statements_w_anonymous_col_names():
+ def one():
+ c = column("q")
+
+ l = c.label(None)
+
+ # new case as of Id810f485c5f7ed971529489b84694e02a3356d6d
+ subq = select([l]).subquery()
+
+ # this creates a ColumnClause as a proxy to the Label() that has
+ # an anoymous name, so the column has one too.
+ anon_col = subq.c[0]
+
+ # then when BindParameter is created, it checks the label
+ # and doesn't double up on the anonymous name which is uncachable
+ return anon_col > 5
+
+ def two():
+ c = column("p")
+
+ l = c.label(None)
+
+ # new case as of Id810f485c5f7ed971529489b84694e02a3356d6d
+ subq = select([l]).subquery()
+
+ # this creates a ColumnClause as a proxy to the Label() that has
+ # an anoymous name, so the column has one too.
+ anon_col = subq.c[0]
+
+ # then when BindParameter is created, it checks the label
+ # and doesn't double up on the anonymous name which is uncachable
+ return anon_col > 5
+
+ def three():
+
+ l1, l2 = table_a.c.a.label(None), table_a.c.b.label(None)
+
+ stmt = select([table_a.c.a, table_a.c.b, l1, l2])
+
+ subq = stmt.subquery()
+ return select([subq]).where(subq.c[2] == 10)
+
+ return (
+ one(),
+ two(),
+ three(),
+ )
+
+ fixtures.append(_statements_w_anonymous_col_names)
+
class CacheKeyFixture(object):
def _run_cache_key_fixture(self, fixture, compare_values):
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py
index df52a62c0..9881d1247 100644
--- a/test/sql/test_compiler.py
+++ b/test/sql/test_compiler.py
@@ -71,6 +71,7 @@ from sqlalchemy.engine import default
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql import column
from sqlalchemy.sql import compiler
+from sqlalchemy.sql import elements
from sqlalchemy.sql import label
from sqlalchemy.sql import operators
from sqlalchemy.sql import table
@@ -3294,6 +3295,29 @@ class BindParameterTest(AssertsCompiledSQL, fixtures.TestBase):
checkparams={"3foo_1": "foo", "4_foo_1": "bar"},
)
+ def test_bind_given_anon_name_dont_double(self):
+ c = column("id")
+ l = c.label(None)
+
+ # new case as of Id810f485c5f7ed971529489b84694e02a3356d6d
+ subq = select([l]).subquery()
+
+ # this creates a ColumnClause as a proxy to the Label() that has
+ # an anoymous name, so the column has one too.
+ anon_col = subq.c[0]
+ assert isinstance(anon_col.name, elements._anonymous_label)
+
+ # then when BindParameter is created, it checks the label
+ # and doesn't double up on the anonymous name which is uncachable
+ expr = anon_col > 5
+
+ self.assert_compile(
+ expr, "anon_1.id_1 > :param_1", checkparams={"param_1": 5}
+ )
+
+ # see also test_compare.py -> _statements_w_anonymous_col_names
+ # fixture for cache key
+
def test_bind_as_col(self):
t = table("foo", column("id"))
diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py
index e509c9f95..d53ee3385 100644
--- a/test/sql/test_selectable.py
+++ b/test/sql/test_selectable.py
@@ -147,6 +147,66 @@ class SelectableTest(
assert s1.corresponding_column(scalar_select) is s1.c.foo
assert s2.corresponding_column(scalar_select) is s2.c.foo
+ def test_labels_name_w_separate_key(self):
+ label = select([table1.c.col1]).label("foo")
+ label.key = "bar"
+
+ s1 = select([label])
+ assert s1.corresponding_column(label) is s1.selected_columns.bar
+
+ # renders as foo
+ self.assert_compile(
+ s1, "SELECT (SELECT table1.col1 FROM table1) AS foo"
+ )
+
+ def test_labels_anon_w_separate_key(self):
+ label = select([table1.c.col1]).label(None)
+ label.key = "bar"
+
+ s1 = select([label])
+
+ # .bar is there
+ assert s1.corresponding_column(label) is s1.selected_columns.bar
+
+ # renders as anon_1
+ self.assert_compile(
+ s1, "SELECT (SELECT table1.col1 FROM table1) AS anon_1"
+ )
+
+ def test_labels_anon_w_separate_key_subquery(self):
+ label = select([table1.c.col1]).label(None)
+ label.key = label._key_label = "bar"
+
+ s1 = select([label])
+
+ subq = s1.subquery()
+
+ s2 = select([subq]).where(subq.c.bar > 5)
+ self.assert_compile(
+ s2,
+ "SELECT anon_2.anon_1 FROM (SELECT (SELECT table1.col1 "
+ "FROM table1) AS anon_1) AS anon_2 "
+ "WHERE anon_2.anon_1 > :param_1",
+ checkparams={"param_1": 5},
+ )
+
+ def test_labels_anon_generate_binds_subquery(self):
+ label = select([table1.c.col1]).label(None)
+ label.key = label._key_label = "bar"
+
+ s1 = select([label])
+
+ subq = s1.subquery()
+
+ s2 = select([subq]).where(subq.c[0] > 5)
+ self.assert_compile(
+ s2,
+ "SELECT anon_2.anon_1 FROM (SELECT (SELECT table1.col1 "
+ "FROM table1) AS anon_1) AS anon_2 "
+ "WHERE anon_2.anon_1 > :param_1",
+ checkparams={"param_1": 5},
+ )
+
def test_select_label_grouped_still_corresponds(self):
label = select([table1.c.col1]).label("foo")
label2 = label.self_group()
diff --git a/test/sql/test_utils.py b/test/sql/test_utils.py
index 4e713dd28..d68a74475 100644
--- a/test/sql/test_utils.py
+++ b/test/sql/test_utils.py
@@ -4,8 +4,11 @@ from sqlalchemy import MetaData
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import Table
+from sqlalchemy.sql import base as sql_base
from sqlalchemy.sql import util as sql_util
from sqlalchemy.sql.elements import ColumnElement
+from sqlalchemy.testing import assert_raises
+from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
@@ -48,3 +51,41 @@ class MiscTest(fixtures.TestBase):
set(sql_util.find_tables(subset_select, include_aliases=True)),
{common, calias, subset_select},
)
+
+ def test_options_merge(self):
+ class opt1(sql_base.CacheableOptions):
+ _cache_key_traversal = []
+
+ class opt2(sql_base.CacheableOptions):
+ _cache_key_traversal = []
+
+ foo = "bar"
+
+ class opt3(sql_base.CacheableOptions):
+ _cache_key_traversal = []
+
+ foo = "bar"
+ bat = "hi"
+
+ o2 = opt2.safe_merge(opt1)
+ eq_(o2.__dict__, {})
+ eq_(o2.foo, "bar")
+
+ assert_raises_message(
+ TypeError,
+ r"other element .*opt2.* is not empty, is not of type .*opt1.*, "
+ r"and contains attributes not covered here .*'foo'.*",
+ opt1.safe_merge,
+ opt2,
+ )
+
+ o2 = opt2 + {"foo": "bat"}
+ o3 = opt2.safe_merge(o2)
+
+ eq_(o3.foo, "bat")
+
+ o4 = opt3.safe_merge(o2)
+ eq_(o4.foo, "bat")
+ eq_(o4.bat, "hi")
+
+ assert_raises(TypeError, opt2.safe_merge, o4)