summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
Diffstat (limited to 'test/sql')
-rw-r--r--test/sql/test_compiler.py8
-rw-r--r--test/sql/test_defaults.py2
-rw-r--r--test/sql/test_deprecations.py425
-rw-r--r--test/sql/test_generative.py4
-rw-r--r--test/sql/test_metadata.py12
-rw-r--r--test/sql/test_resultset.py2
-rw-r--r--test/sql/test_selectable.py33
-rw-r--r--test/sql/test_text.py12
-rw-r--r--test/sql/test_types.py52
9 files changed, 445 insertions, 105 deletions
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py
index f152d0ed9..3418eac73 100644
--- a/test/sql/test_compiler.py
+++ b/test/sql/test_compiler.py
@@ -1529,14 +1529,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE",
)
- assert_raises_message(
- exc.ArgumentError,
- "Unknown for_update argument: 'unknown_mode'",
- table1.select,
- table1.c.myid == 7,
- for_update="unknown_mode",
- )
-
def test_alias(self):
# test the alias for a table1. column names stay the same,
# table name "changes" to "foo".
diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py
index 5ede46ede..4ce8cc32f 100644
--- a/test/sql/test_defaults.py
+++ b/test/sql/test_defaults.py
@@ -363,7 +363,7 @@ class DefaultTest(fixtures.TestBase):
@testing.fails_on("firebird", "Data type unknown")
def test_standalone(self):
- c = testing.db.engine.contextual_connect()
+ c = testing.db.engine.connect()
x = c.execute(t.c.col1.default)
y = t.c.col2.default.execute()
z = c.execute(t.c.col3.default)
diff --git a/test/sql/test_deprecations.py b/test/sql/test_deprecations.py
new file mode 100644
index 000000000..2e4042a1b
--- /dev/null
+++ b/test/sql/test_deprecations.py
@@ -0,0 +1,425 @@
+#! coding: utf-8
+
+from sqlalchemy import bindparam
+from sqlalchemy import Column
+from sqlalchemy import column
+from sqlalchemy import create_engine
+from sqlalchemy import exc
+from sqlalchemy import ForeignKey
+from sqlalchemy import Integer
+from sqlalchemy import MetaData
+from sqlalchemy import select
+from sqlalchemy import String
+from sqlalchemy import Table
+from sqlalchemy import table
+from sqlalchemy import testing
+from sqlalchemy import text
+from sqlalchemy import util
+from sqlalchemy.engine import default
+from sqlalchemy.schema import DDL
+from sqlalchemy.sql import util as sql_util
+from sqlalchemy.testing import assert_raises
+from sqlalchemy.testing import assert_raises_message
+from sqlalchemy.testing import AssertsCompiledSQL
+from sqlalchemy.testing import engines
+from sqlalchemy.testing import eq_
+from sqlalchemy.testing import fixtures
+from sqlalchemy.testing import mock
+
+
+class DeprecationWarningsTest(fixtures.TestBase):
+ def test_ident_preparer_force(self):
+ preparer = testing.db.dialect.identifier_preparer
+ preparer.quote("hi")
+ with testing.expect_deprecated(
+ "The IdentifierPreparer.quote.force parameter is deprecated"
+ ):
+ preparer.quote("hi", True)
+
+ with testing.expect_deprecated(
+ "The IdentifierPreparer.quote.force parameter is deprecated"
+ ):
+ preparer.quote("hi", False)
+
+ preparer.quote_schema("hi")
+ with testing.expect_deprecated(
+ "The IdentifierPreparer.quote_schema.force parameter is deprecated"
+ ):
+ preparer.quote_schema("hi", True)
+
+ with testing.expect_deprecated(
+ "The IdentifierPreparer.quote_schema.force parameter is deprecated"
+ ):
+ preparer.quote_schema("hi", True)
+
+ def test_string_convert_unicode(self):
+ with testing.expect_deprecated(
+ "The String.convert_unicode parameter is deprecated and "
+ "will be removed in a future release."
+ ):
+ String(convert_unicode=True)
+
+ def test_string_convert_unicode_force(self):
+ with testing.expect_deprecated(
+ "The String.convert_unicode parameter is deprecated and "
+ "will be removed in a future release."
+ ):
+ String(convert_unicode="force")
+
+ def test_engine_convert_unicode(self):
+ with testing.expect_deprecated(
+ "The create_engine.convert_unicode parameter and "
+ "corresponding dialect-level"
+ ):
+ create_engine("mysql://", convert_unicode=True, module=mock.Mock())
+
+ def test_join_condition_ignore_nonexistent_tables(self):
+ m = MetaData()
+ t1 = Table("t1", m, Column("id", Integer))
+ t2 = Table(
+ "t2", m, Column("id", Integer), Column("t1id", ForeignKey("t1.id"))
+ )
+ with testing.expect_deprecated(
+ "The join_condition.ignore_nonexistent_tables "
+ "parameter is deprecated"
+ ):
+ join_cond = sql_util.join_condition(
+ t1, t2, ignore_nonexistent_tables=True
+ )
+
+ t1t2 = t1.join(t2)
+
+ assert t1t2.onclause.compare(join_cond)
+
+ def test_select_autocommit(self):
+ with testing.expect_deprecated(
+ "The select.autocommit parameter is deprecated and "
+ "will be removed in a future release."
+ ):
+ stmt = select([column("x")], autocommit=True)
+
+ def test_select_for_update(self):
+ with testing.expect_deprecated(
+ "The select.for_update parameter is deprecated and "
+ "will be removed in a future release."
+ ):
+ stmt = select([column("x")], for_update=True)
+
+ @testing.provide_metadata
+ def test_table_useexisting(self):
+ meta = self.metadata
+
+ Table("t", meta, Column("x", Integer))
+ meta.create_all()
+
+ with testing.expect_deprecated(
+ "The Table.useexisting parameter is deprecated and "
+ "will be removed in a future release."
+ ):
+ Table("t", meta, useexisting=True, autoload_with=testing.db)
+
+ with testing.expect_deprecated(
+ "The Table.useexisting parameter is deprecated and "
+ "will be removed in a future release."
+ ):
+ assert_raises_message(
+ exc.ArgumentError,
+ "useexisting is synonymous with extend_existing.",
+ Table,
+ "t",
+ meta,
+ useexisting=True,
+ extend_existing=True,
+ autoload_with=testing.db,
+ )
+
+
+class DDLListenerDeprecationsTest(fixtures.TestBase):
+ def setup(self):
+ self.bind = self.engine = engines.mock_engine()
+ self.metadata = MetaData(self.bind)
+ self.table = Table("t", self.metadata, Column("id", Integer))
+ self.users = Table(
+ "users",
+ self.metadata,
+ Column("user_id", Integer, primary_key=True),
+ Column("user_name", String(40)),
+ )
+
+ def test_append_listener(self):
+ metadata, table, bind = self.metadata, self.table, self.bind
+
+ def fn(*a):
+ return None
+
+ with testing.expect_deprecated(".* is deprecated .*"):
+ table.append_ddl_listener("before-create", fn)
+ with testing.expect_deprecated(".* is deprecated .*"):
+ assert_raises(
+ exc.InvalidRequestError, table.append_ddl_listener, "blah", fn
+ )
+
+ with testing.expect_deprecated(".* is deprecated .*"):
+ metadata.append_ddl_listener("before-create", fn)
+ with testing.expect_deprecated(".* is deprecated .*"):
+ assert_raises(
+ exc.InvalidRequestError,
+ metadata.append_ddl_listener,
+ "blah",
+ fn,
+ )
+
+ def test_deprecated_append_ddl_listener_table(self):
+ metadata, users, engine = self.metadata, self.users, self.engine
+ canary = []
+ with testing.expect_deprecated(".* is deprecated .*"):
+ users.append_ddl_listener(
+ "before-create", lambda e, t, b: canary.append("mxyzptlk")
+ )
+ with testing.expect_deprecated(".* is deprecated .*"):
+ users.append_ddl_listener(
+ "after-create", lambda e, t, b: canary.append("klptzyxm")
+ )
+ with testing.expect_deprecated(".* is deprecated .*"):
+ users.append_ddl_listener(
+ "before-drop", lambda e, t, b: canary.append("xyzzy")
+ )
+ with testing.expect_deprecated(".* is deprecated .*"):
+ users.append_ddl_listener(
+ "after-drop", lambda e, t, b: canary.append("fnord")
+ )
+
+ metadata.create_all()
+ assert "mxyzptlk" in canary
+ assert "klptzyxm" in canary
+ assert "xyzzy" not in canary
+ assert "fnord" not in canary
+ del engine.mock[:]
+ canary[:] = []
+ metadata.drop_all()
+ assert "mxyzptlk" not in canary
+ assert "klptzyxm" not in canary
+ assert "xyzzy" in canary
+ assert "fnord" in canary
+
+ def test_deprecated_append_ddl_listener_metadata(self):
+ metadata, users, engine = self.metadata, self.users, self.engine
+ canary = []
+ with testing.expect_deprecated(".* is deprecated .*"):
+ metadata.append_ddl_listener(
+ "before-create",
+ lambda e, t, b, tables=None: canary.append("mxyzptlk"),
+ )
+ with testing.expect_deprecated(".* is deprecated .*"):
+ metadata.append_ddl_listener(
+ "after-create",
+ lambda e, t, b, tables=None: canary.append("klptzyxm"),
+ )
+ with testing.expect_deprecated(".* is deprecated .*"):
+ metadata.append_ddl_listener(
+ "before-drop",
+ lambda e, t, b, tables=None: canary.append("xyzzy"),
+ )
+ with testing.expect_deprecated(".* is deprecated .*"):
+ metadata.append_ddl_listener(
+ "after-drop",
+ lambda e, t, b, tables=None: canary.append("fnord"),
+ )
+
+ metadata.create_all()
+ assert "mxyzptlk" in canary
+ assert "klptzyxm" in canary
+ assert "xyzzy" not in canary
+ assert "fnord" not in canary
+ del engine.mock[:]
+ canary[:] = []
+ metadata.drop_all()
+ assert "mxyzptlk" not in canary
+ assert "klptzyxm" not in canary
+ assert "xyzzy" in canary
+ assert "fnord" in canary
+
+ def test_filter_deprecated(self):
+ cx = self.engine
+
+ tbl = Table("t", MetaData(), Column("id", Integer))
+ target = cx.name
+
+ assert DDL("")._should_execute_deprecated("x", tbl, cx)
+ with testing.expect_deprecated(".* is deprecated .*"):
+ assert DDL("", on=target)._should_execute_deprecated("x", tbl, cx)
+ with testing.expect_deprecated(".* is deprecated .*"):
+ assert not DDL("", on="bogus")._should_execute_deprecated(
+ "x", tbl, cx
+ )
+ with testing.expect_deprecated(".* is deprecated .*"):
+ assert DDL(
+ "", on=lambda d, x, y, z: True
+ )._should_execute_deprecated("x", tbl, cx)
+ with testing.expect_deprecated(".* is deprecated .*"):
+ assert DDL(
+ "", on=lambda d, x, y, z: z.engine.name != "bogus"
+ )._should_execute_deprecated("x", tbl, cx)
+
+
+class ConvertUnicodeDeprecationTest(fixtures.TestBase):
+
+ __backend__ = True
+
+ data = util.u(
+ "Alors vous imaginez ma surprise, au lever du jour, quand "
+ "une drôle de petite voix m’a réveillé. "
+ "Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
+ )
+
+ def test_unicode_warnings_dialectlevel(self):
+
+ unicodedata = self.data
+
+ with testing.expect_deprecated(
+ "The create_engine.convert_unicode parameter and "
+ "corresponding dialect-level"
+ ):
+ dialect = default.DefaultDialect(convert_unicode=True)
+ dialect.supports_unicode_binds = False
+
+ s = String()
+ uni = s.dialect_impl(dialect).bind_processor(dialect)
+
+ uni(util.b("x"))
+ assert isinstance(uni(unicodedata), util.binary_type)
+
+ eq_(uni(unicodedata), unicodedata.encode("utf-8"))
+
+ def test_ignoring_unicode_error(self):
+ """checks String(unicode_error='ignore') is passed to
+ underlying codec."""
+
+ unicodedata = self.data
+
+ with testing.expect_deprecated(
+ "The String.convert_unicode parameter is deprecated and "
+ "will be removed in a future release.",
+ "The String.unicode_errors parameter is deprecated and "
+ "will be removed in a future release.",
+ ):
+ type_ = String(
+ 248, convert_unicode="force", unicode_error="ignore"
+ )
+ dialect = default.DefaultDialect(encoding="ascii")
+ proc = type_.result_processor(dialect, 10)
+
+ utfdata = unicodedata.encode("utf8")
+ eq_(proc(utfdata), unicodedata.encode("ascii", "ignore").decode())
+
+
+class ForUpdateTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = "default"
+
+ def _assert_legacy(self, leg, read=False, nowait=False):
+ t = table("t", column("c"))
+
+ with testing.expect_deprecated(
+ "The select.for_update parameter is deprecated and "
+ "will be removed in a future release."
+ ):
+ s1 = select([t], for_update=leg)
+
+ if leg is False:
+ assert s1._for_update_arg is None
+ assert s1.for_update is None
+ else:
+ eq_(s1._for_update_arg.read, read)
+ eq_(s1._for_update_arg.nowait, nowait)
+ eq_(s1.for_update, leg)
+
+ def test_false_legacy(self):
+ self._assert_legacy(False)
+
+ def test_plain_true_legacy(self):
+ self._assert_legacy(True)
+
+ def test_read_legacy(self):
+ self._assert_legacy("read", read=True)
+
+ def test_nowait_legacy(self):
+ self._assert_legacy("nowait", nowait=True)
+
+ def test_read_nowait_legacy(self):
+ self._assert_legacy("read_nowait", read=True, nowait=True)
+
+ def test_unknown_mode(self):
+ t = table("t", column("c"))
+
+ with testing.expect_deprecated(
+ "The select.for_update parameter is deprecated and "
+ "will be removed in a future release."
+ ):
+ assert_raises_message(
+ exc.ArgumentError,
+ "Unknown for_update argument: 'unknown_mode'",
+ t.select,
+ t.c.c == 7,
+ for_update="unknown_mode",
+ )
+
+ def test_legacy_setter(self):
+ t = table("t", column("c"))
+ s = select([t])
+ s.for_update = "nowait"
+ eq_(s._for_update_arg.nowait, True)
+
+
+class TextTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = "default"
+
+ def test_legacy_bindparam(self):
+ with testing.expect_deprecated(
+ "The text.bindparams parameter is deprecated"
+ ):
+ t = text(
+ "select * from foo where lala=:bar and hoho=:whee",
+ bindparams=[bindparam("bar", 4), bindparam("whee", 7)],
+ )
+
+ self.assert_compile(
+ t,
+ "select * from foo where lala=:bar and hoho=:whee",
+ checkparams={"bar": 4, "whee": 7},
+ )
+
+ def test_legacy_typemap(self):
+ table1 = table(
+ "mytable",
+ column("myid", Integer),
+ column("name", String),
+ column("description", String),
+ )
+ with testing.expect_deprecated(
+ "The text.typemap parameter is deprecated"
+ ):
+ t = text(
+ "select id, name from user",
+ typemap=dict(id=Integer, name=String),
+ )
+
+ stmt = select([table1.c.myid]).select_from(
+ table1.join(t, table1.c.myid == t.c.id)
+ )
+ compiled = stmt.compile()
+ eq_(
+ compiled._create_result_map(),
+ {
+ "myid": (
+ "myid",
+ (table1.c.myid, "myid", "myid"),
+ table1.c.myid.type,
+ )
+ },
+ )
+
+ def test_autocommit(self):
+ with testing.expect_deprecated(
+ "The text.autocommit parameter is deprecated"
+ ):
+ t = text("select id, name from user", autocommit=True)
diff --git a/test/sql/test_generative.py b/test/sql/test_generative.py
index 1e6221cdc..f030f1d9c 100644
--- a/test/sql/test_generative.py
+++ b/test/sql/test_generative.py
@@ -500,8 +500,8 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
)
def test_text(self):
- clause = text(
- "select * from table where foo=:bar", bindparams=[bindparam("bar")]
+ clause = text("select * from table where foo=:bar").bindparams(
+ bindparam("bar")
)
c1 = str(clause)
diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py
index a6d4b2d1a..3d60fb60e 100644
--- a/test/sql/test_metadata.py
+++ b/test/sql/test_metadata.py
@@ -2262,18 +2262,6 @@ class UseExistingTest(fixtures.TablesTest):
extend_existing=True,
)
- @testing.uses_deprecated()
- def test_existing_plus_useexisting_raises(self):
- meta2 = self._useexisting_fixture()
- assert_raises(
- exc.ArgumentError,
- Table,
- "users",
- meta2,
- useexisting=True,
- extend_existing=True,
- )
-
def test_keep_existing_no_dupe_constraints(self):
meta2 = self._notexisting_fixture()
users = Table(
diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py
index 3bd61b1f8..5987c7746 100644
--- a/test/sql/test_resultset.py
+++ b/test/sql/test_resultset.py
@@ -1647,7 +1647,7 @@ class AlternateResultProxyTest(fixtures.TablesTest):
"test",
metadata,
Column("x", Integer, primary_key=True),
- Column("y", String(50, convert_unicode="force")),
+ Column("y", String(50)),
)
@classmethod
diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py
index 5456dfb4f..04c0e6102 100644
--- a/test/sql/test_selectable.py
+++ b/test/sql/test_selectable.py
@@ -2544,39 +2544,6 @@ class ResultMapTest(fixtures.TestBase):
class ForUpdateTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = "default"
- def _assert_legacy(self, leg, read=False, nowait=False):
- t = table("t", column("c"))
- s1 = select([t], for_update=leg)
-
- if leg is False:
- assert s1._for_update_arg is None
- assert s1.for_update is None
- else:
- eq_(s1._for_update_arg.read, read)
- eq_(s1._for_update_arg.nowait, nowait)
- eq_(s1.for_update, leg)
-
- def test_false_legacy(self):
- self._assert_legacy(False)
-
- def test_plain_true_legacy(self):
- self._assert_legacy(True)
-
- def test_read_legacy(self):
- self._assert_legacy("read", read=True)
-
- def test_nowait_legacy(self):
- self._assert_legacy("nowait", nowait=True)
-
- def test_read_nowait_legacy(self):
- self._assert_legacy("read_nowait", read=True, nowait=True)
-
- def test_legacy_setter(self):
- t = table("t", column("c"))
- s = select([t])
- s.for_update = "nowait"
- eq_(s._for_update_arg.nowait, True)
-
def test_basic_clone(self):
t = table("t", column("c"))
s = select([t]).with_for_update(read=True, of=t.c.c)
diff --git a/test/sql/test_text.py b/test/sql/test_text.py
index 6b419f599..48302058d 100644
--- a/test/sql/test_text.py
+++ b/test/sql/test_text.py
@@ -198,18 +198,6 @@ class SelectCompositionTest(fixtures.TestBase, AssertsCompiledSQL):
class BindParamTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = "default"
- def test_legacy(self):
- t = text(
- "select * from foo where lala=:bar and hoho=:whee",
- bindparams=[bindparam("bar", 4), bindparam("whee", 7)],
- )
-
- self.assert_compile(
- t,
- "select * from foo where lala=:bar and hoho=:whee",
- checkparams={"bar": 4, "whee": 7},
- )
-
def test_positional(self):
t = text("select * from foo where lala=:bar and hoho=:whee")
t = t.bindparams(bindparam("bar", 4), bindparam("whee", 7))
diff --git a/test/sql/test_types.py b/test/sql/test_types.py
index c54fe1e54..4bd182c3e 100644
--- a/test/sql/test_types.py
+++ b/test/sql/test_types.py
@@ -175,7 +175,7 @@ class AdaptTest(fixtures.TestBase):
% (type_, expected)
)
- @testing.uses_deprecated()
+ @testing.uses_deprecated(".*Binary.*")
def test_adapt_method(self):
"""ensure all types have a working adapt() method,
which creates a distinct copy.
@@ -191,6 +191,8 @@ class AdaptTest(fixtures.TestBase):
def adaptions():
for typ in self._all_types():
+ # up adapt from LowerCase to UPPERCASE,
+ # as well as to all non-sqltypes
up_adaptions = [typ] + typ.__subclasses__()
yield False, typ, up_adaptions
for subcl in typ.__subclasses__():
@@ -258,7 +260,6 @@ class AdaptTest(fixtures.TestBase):
eq_(types.DateTime().python_type, datetime.datetime)
eq_(types.String().python_type, str)
eq_(types.Unicode().python_type, util.text_type)
- eq_(types.String(convert_unicode=True).python_type, util.text_type)
eq_(types.Enum("one", "two", "three").python_type, str)
assert_raises(
@@ -283,10 +284,15 @@ class AdaptTest(fixtures.TestBase):
This essentially is testing the behavior of util.constructor_copy().
"""
- t1 = String(length=50, convert_unicode=False)
- t2 = t1.adapt(Text, convert_unicode=True)
+ t1 = String(length=50)
+ t2 = t1.adapt(Text)
eq_(t2.length, 50)
- eq_(t2.convert_unicode, True)
+
+ def test_convert_unicode_text_type(self):
+ with testing.expect_deprecated(
+ "The String.convert_unicode parameter is deprecated"
+ ):
+ eq_(types.String(convert_unicode=True).python_type, util.text_type)
class TypeAffinityTest(fixtures.TestBase):
@@ -1245,34 +1251,6 @@ class UnicodeTest(fixtures.TestBase):
):
eq_(uni(5), 5)
- def test_unicode_warnings_dialectlevel(self):
-
- unicodedata = self.data
-
- dialect = default.DefaultDialect(convert_unicode=True)
- dialect.supports_unicode_binds = False
-
- s = String()
- uni = s.dialect_impl(dialect).bind_processor(dialect)
-
- uni(util.b("x"))
- assert isinstance(uni(unicodedata), util.binary_type)
-
- eq_(uni(unicodedata), unicodedata.encode("utf-8"))
-
- def test_ignoring_unicode_error(self):
- """checks String(unicode_error='ignore') is passed to
- underlying codec."""
-
- unicodedata = self.data
-
- type_ = String(248, convert_unicode="force", unicode_error="ignore")
- dialect = default.DefaultDialect(encoding="ascii")
- proc = type_.result_processor(dialect, 10)
-
- utfdata = unicodedata.encode("utf8")
- eq_(proc(utfdata), unicodedata.encode("ascii", "ignore").decode())
-
class EnumTest(AssertsCompiledSQL, fixtures.TablesTest):
__backend__ = True
@@ -1857,6 +1835,7 @@ class EnumTest(AssertsCompiledSQL, fixtures.TablesTest):
# depending on backend.
assert "('x'," in e.print_sql()
+ @testing.uses_deprecated(".*convert_unicode")
def test_repr(self):
e = Enum(
"x",
@@ -1958,13 +1937,14 @@ class BinaryTest(fixtures.TestBase, AssertsExecutionResults):
binary_table.select(order_by=binary_table.c.primary_id),
text(
"select * from binary_table order by binary_table.primary_id",
- typemap={
+ bind=testing.db,
+ ).columns(
+ **{
"pickled": PickleType,
"mypickle": MyPickleType,
"data": LargeBinary,
"data_slice": LargeBinary,
- },
- bind=testing.db,
+ }
),
):
result = stmt.execute().fetchall()