summaryrefslogtreecommitdiff
path: root/test/dialect/postgresql
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2019-10-21 17:32:04 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2019-10-22 18:27:48 -0400
commit240d9a60ccdb540543a72d9ff30a6f50d33acc5d (patch)
treeff665f4c9a1e7dac8311732bf8ecc1a370a388fc /test/dialect/postgresql
parentd76cb7213557c24609a1a75d8c391aea0179562a (diff)
downloadsqlalchemy-240d9a60ccdb540543a72d9ff30a6f50d33acc5d.tar.gz
Refactor dialect tests for combinations
Dialect tests tend to have a lot of lists of types, SQL constructs etc, convert as many of these to @combinations as possible. This is exposing that we don't have per-combination exclusion rules set up which is making things a little bit cumbersome. Also set up a fixture that does metadata + DDL. Change-Id: Ief820e48c9202982b0b1e181b87862490cd7b0c3
Diffstat (limited to 'test/dialect/postgresql')
-rw-r--r--test/dialect/postgresql/test_types.py585
1 files changed, 234 insertions, 351 deletions
diff --git a/test/dialect/postgresql/test_types.py b/test/dialect/postgresql/test_types.py
index 557b91622..1eb8677bf 100644
--- a/test/dialect/postgresql/test_types.py
+++ b/test/dialect/postgresql/test_types.py
@@ -1,6 +1,7 @@
# coding: utf-8
import datetime
import decimal
+import uuid
import sqlalchemy as sa
from sqlalchemy import any_
@@ -32,6 +33,7 @@ from sqlalchemy import Unicode
from sqlalchemy import util
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import array
+from sqlalchemy.dialects.postgresql import base
from sqlalchemy.dialects.postgresql import DATERANGE
from sqlalchemy.dialects.postgresql import HSTORE
from sqlalchemy.dialects.postgresql import hstore
@@ -885,37 +887,35 @@ class TimezoneTest(fixtures.TestBase):
assert row[0] >= somedate
-class TimePrecisionTest(fixtures.TestBase, AssertsCompiledSQL):
+class TimePrecisionCompileTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = postgresql.dialect()
+
+ @testing.combinations(
+ (postgresql.TIME(), "TIME WITHOUT TIME ZONE"),
+ (postgresql.TIME(precision=5), "TIME(5) WITHOUT TIME ZONE"),
+ (
+ postgresql.TIME(timezone=True, precision=5),
+ "TIME(5) WITH TIME ZONE",
+ ),
+ (postgresql.TIMESTAMP(), "TIMESTAMP WITHOUT TIME ZONE"),
+ (postgresql.TIMESTAMP(precision=5), "TIMESTAMP(5) WITHOUT TIME ZONE"),
+ (
+ postgresql.TIMESTAMP(timezone=True, precision=5),
+ "TIMESTAMP(5) WITH TIME ZONE",
+ ),
+ (postgresql.TIME(precision=0), "TIME(0) WITHOUT TIME ZONE"),
+ (postgresql.TIMESTAMP(precision=0), "TIMESTAMP(0) WITHOUT TIME ZONE"),
+ )
+ def test_compile(self, type_, expected):
+ self.assert_compile(type_, expected)
+
+
+class TimePrecisionTest(fixtures.TestBase):
__dialect__ = postgresql.dialect()
__prefer__ = "postgresql"
__backend__ = True
- def test_compile(self):
- for type_, expected in [
- (postgresql.TIME(), "TIME WITHOUT TIME ZONE"),
- (postgresql.TIME(precision=5), "TIME(5) WITHOUT TIME ZONE"),
- (
- postgresql.TIME(timezone=True, precision=5),
- "TIME(5) WITH TIME ZONE",
- ),
- (postgresql.TIMESTAMP(), "TIMESTAMP WITHOUT TIME ZONE"),
- (
- postgresql.TIMESTAMP(precision=5),
- "TIMESTAMP(5) WITHOUT TIME ZONE",
- ),
- (
- postgresql.TIMESTAMP(timezone=True, precision=5),
- "TIMESTAMP(5) WITH TIME ZONE",
- ),
- (postgresql.TIME(precision=0), "TIME(0) WITHOUT TIME ZONE"),
- (
- postgresql.TIMESTAMP(precision=0),
- "TIMESTAMP(0) WITHOUT TIME ZONE",
- ),
- ]:
- self.assert_compile(type_, expected)
-
@testing.only_on("postgresql", "DB specific feature")
@testing.provide_metadata
def test_reflection(self):
@@ -1608,7 +1608,8 @@ class PGArrayRoundTripTest(
):
ARRAY = postgresql.ARRAY
- def _test_undim_array_contains_typed_exec(self, struct):
+ @testing.combinations((set,), (list,), (lambda elem: (x for x in elem),))
+ def test_undim_array_contains_typed_exec(self, struct):
arrtable = self.tables.arrtable
self._fixture_456(arrtable)
eq_(
@@ -1620,18 +1621,8 @@ class PGArrayRoundTripTest(
[4, 5, 6],
)
- def test_undim_array_contains_set_exec(self):
- self._test_undim_array_contains_typed_exec(set)
-
- def test_undim_array_contains_list_exec(self):
- self._test_undim_array_contains_typed_exec(list)
-
- def test_undim_array_contains_generator_exec(self):
- self._test_undim_array_contains_typed_exec(
- lambda elem: (x for x in elem)
- )
-
- def _test_dim_array_contains_typed_exec(self, struct):
+ @testing.combinations((set,), (list,), (lambda elem: (x for x in elem),))
+ def test_dim_array_contains_typed_exec(self, struct):
dim_arrtable = self.tables.dim_arrtable
self._fixture_456(dim_arrtable)
eq_(
@@ -1643,17 +1634,6 @@ class PGArrayRoundTripTest(
[4, 5, 6],
)
- def test_dim_array_contains_set_exec(self):
- self._test_dim_array_contains_typed_exec(set)
-
- def test_dim_array_contains_list_exec(self):
- self._test_dim_array_contains_typed_exec(list)
-
- def test_dim_array_contains_generator_exec(self):
- self._test_dim_array_contains_typed_exec(
- lambda elem: (x for x in elem)
- )
-
def test_array_contained_by_exec(self):
arrtable = self.tables.arrtable
with testing.db.connect() as conn:
@@ -1697,7 +1677,28 @@ class HashableFlagORMTest(fixtures.TestBase):
__only_on__ = "postgresql"
- def _test(self, type_, data):
+ @testing.combinations(
+ (
+ "ARRAY",
+ postgresql.ARRAY(Text()),
+ [["a", "b", "c"], ["d", "e", "f"]],
+ ),
+ (
+ "JSON",
+ postgresql.JSON(),
+ [
+ {"a": "1", "b": "2", "c": "3"},
+ {
+ "d": "4",
+ "e": {"e1": "5", "e2": "6"},
+ "f": {"f1": [9, 10, 11]},
+ },
+ ],
+ ),
+ id_="iaa",
+ )
+ @testing.provide_metadata
+ def test_hashable_flag(self, type_, data):
Base = declarative_base(metadata=self.metadata)
class A(Base):
@@ -1718,38 +1719,16 @@ class HashableFlagORMTest(fixtures.TestBase):
list(enumerate(data, 1)),
)
- @testing.provide_metadata
- def test_array(self):
- self._test(
- postgresql.ARRAY(Text()), [["a", "b", "c"], ["d", "e", "f"]]
- )
-
@testing.requires.hstore
- @testing.provide_metadata
def test_hstore(self):
- self._test(
+ self.test_hashable_flag(
postgresql.HSTORE(),
[{"a": "1", "b": "2", "c": "3"}, {"d": "4", "e": "5", "f": "6"}],
)
- @testing.provide_metadata
- def test_json(self):
- self._test(
- postgresql.JSON(),
- [
- {"a": "1", "b": "2", "c": "3"},
- {
- "d": "4",
- "e": {"e1": "5", "e2": "6"},
- "f": {"f1": [9, 10, 11]},
- },
- ],
- )
-
@testing.requires.postgresql_jsonb
- @testing.provide_metadata
def test_jsonb(self):
- self._test(
+ self.test_hashable_flag(
postgresql.JSONB(),
[
{"a": "1", "b": "2", "c": "3"},
@@ -1795,17 +1774,28 @@ class TimestampTest(fixtures.TestBase, AssertsExecutionResults):
assert isinstance(expr.type, postgresql.INTERVAL)
-class SpecialTypesTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
+class SpecialTypesCompileTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = "postgresql"
+
+ @testing.combinations(
+ (postgresql.BIT(), "BIT(1)"),
+ (postgresql.BIT(5), "BIT(5)"),
+ (postgresql.BIT(varying=True), "BIT VARYING"),
+ (postgresql.BIT(5, varying=True), "BIT VARYING(5)"),
+ )
+ def test_bit_compile(self, type_, expected):
+ self.assert_compile(type_, expected)
+
+
+class SpecialTypesTest(fixtures.TablesTest, ComparesTables):
"""test DDL and reflection of PG-specific types """
__only_on__ = ("postgresql >= 8.3.0",)
__backend__ = True
- @classmethod
- def setup_class(cls):
- global metadata, table
- metadata = MetaData(testing.db)
+ @testing.metadata_fixture()
+ def special_types_table(self, metadata):
# create these types so that we can issue
# special SQL92 INTERVAL syntax
@@ -1835,36 +1825,22 @@ class SpecialTypesTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
Column("tsvector_document", postgresql.TSVECTOR),
)
- metadata.create_all()
+ return table
+ def test_reflection(self, special_types_table):
# cheat so that the "strict type check"
# works
- table.c.year_interval.type = postgresql.INTERVAL()
- table.c.month_interval.type = postgresql.INTERVAL()
-
- @classmethod
- def teardown_class(cls):
- metadata.drop_all()
+ special_types_table.c.year_interval.type = postgresql.INTERVAL()
+ special_types_table.c.month_interval.type = postgresql.INTERVAL()
- def test_reflection(self):
m = MetaData(testing.db)
t = Table("sometable", m, autoload=True)
- self.assert_tables_equal(table, t, strict_types=True)
+ self.assert_tables_equal(special_types_table, t, strict_types=True)
assert t.c.plain_interval.type.precision is None
assert t.c.precision_interval.type.precision == 3
assert t.c.bitstring.type.length == 4
- def test_bit_compile(self):
- pairs = [
- (postgresql.BIT(), "BIT(1)"),
- (postgresql.BIT(5), "BIT(5)"),
- (postgresql.BIT(varying=True), "BIT VARYING"),
- (postgresql.BIT(5, varying=True), "BIT VARYING(5)"),
- ]
- for type_, expected in pairs:
- self.assert_compile(type_, expected)
-
@testing.provide_metadata
def test_tsvector_round_trip(self):
t = Table("t1", self.metadata, Column("data", postgresql.TSVECTOR))
@@ -1910,86 +1886,51 @@ class UUIDTest(fixtures.TestBase):
__only_on__ = "postgresql >= 8.3"
__backend__ = True
- @testing.fails_on(
- "postgresql+zxjdbc",
- 'column "data" is of type uuid but expression '
- "is of type character varying",
- )
- def test_uuid_string(self):
- import uuid
-
- self._test_round_trip(
- Table(
- "utable",
- MetaData(),
- Column("data", postgresql.UUID(as_uuid=False)),
- ),
+ @testing.combinations(
+ (
+ "not_as_uuid",
+ postgresql.UUID(as_uuid=False),
str(uuid.uuid4()),
str(uuid.uuid4()),
- )
-
- @testing.fails_on(
- "postgresql+zxjdbc",
- 'column "data" is of type uuid but expression is '
- "of type character varying",
+ ),
+ ("as_uuid", postgresql.UUID(as_uuid=True), uuid.uuid4(), uuid.uuid4()),
+ id_="iaaa",
)
- def test_uuid_uuid(self):
- import uuid
-
- self._test_round_trip(
- Table(
- "utable",
- MetaData(),
- Column("data", postgresql.UUID(as_uuid=True)),
- ),
- uuid.uuid4(),
- uuid.uuid4(),
- )
+ def test_round_trip(self, datatype, value1, value2):
- @testing.fails_on(
- "postgresql+zxjdbc",
- 'column "data" is of type uuid[] but '
- "expression is of type character varying",
- )
- @testing.fails_on("postgresql+pg8000", "No support for UUID with ARRAY")
- def test_uuid_array(self):
- import uuid
-
- self._test_round_trip(
- Table(
- "utable",
- MetaData(),
- Column(
- "data", postgresql.ARRAY(postgresql.UUID(as_uuid=True))
- ),
- ),
+ utable = Table("utable", MetaData(), Column("data", datatype))
+
+ with testing.db.connect() as conn:
+ conn.begin()
+ utable.create(conn)
+ conn.execute(utable.insert(), {"data": value1})
+ conn.execute(utable.insert(), {"data": value2})
+ r = conn.execute(
+ select([utable.c.data]).where(utable.c.data != value1)
+ )
+ eq_(r.fetchone()[0], value2)
+ eq_(r.fetchone(), None)
+
+ @testing.combinations(
+ (
+ "as_uuid",
+ postgresql.ARRAY(postgresql.UUID(as_uuid=True)),
[uuid.uuid4(), uuid.uuid4()],
[uuid.uuid4(), uuid.uuid4()],
- )
-
- @testing.fails_on(
- "postgresql+zxjdbc",
- 'column "data" is of type uuid[] but '
- "expression is of type character varying",
- )
- @testing.fails_on("postgresql+pg8000", "No support for UUID with ARRAY")
- def test_uuid_string_array(self):
- import uuid
-
- self._test_round_trip(
- Table(
- "utable",
- MetaData(),
- Column(
- "data", postgresql.ARRAY(postgresql.UUID(as_uuid=False))
- ),
- ),
+ ),
+ (
+ "not_as_uuid",
+ postgresql.ARRAY(postgresql.UUID(as_uuid=False)),
[str(uuid.uuid4()), str(uuid.uuid4())],
[str(uuid.uuid4()), str(uuid.uuid4())],
- )
+ ),
+ id_="iaaa",
+ )
+ @testing.fails_on("postgresql+pg8000", "No support for UUID with ARRAY")
+ def test_uuid_array(self, datatype, value1, value2):
+ self.test_round_trip(datatype, value1, value2)
def test_no_uuid_available(self):
- from sqlalchemy.dialects.postgresql import base
uuid_type = base._python_UUID
base._python_UUID = None
@@ -1998,26 +1939,6 @@ class UUIDTest(fixtures.TestBase):
finally:
base._python_UUID = uuid_type
- def setup(self):
- self.conn = testing.db.connect()
- self.conn.begin()
-
- def teardown(self):
- self.conn.close()
-
- def _test_round_trip(self, utable, value1, value2, exp_value2=None):
- utable.create(self.conn)
- self.conn.execute(utable.insert(), {"data": value1})
- self.conn.execute(utable.insert(), {"data": value2})
- r = self.conn.execute(
- select([utable.c.data]).where(utable.c.data != value1)
- )
- if exp_value2:
- eq_(r.fetchone()[0], exp_value2)
- else:
- eq_(r.fetchone()[0], value2)
- eq_(r.fetchone(), None)
-
class HStoreTest(AssertsCompiledSQL, fixtures.TestBase):
__dialect__ = "postgresql"
@@ -2040,13 +1961,6 @@ class HStoreTest(AssertsCompiledSQL, fixtures.TestBase):
"WHERE %s" % expected,
)
- def _test_cols(self, colclause, expected, from_=True):
- stmt = select([colclause])
- self.assert_compile(
- stmt,
- ("SELECT %s" + (" FROM test_table" if from_ else "")) % expected,
- )
-
def test_bind_serialize_default(self):
dialect = postgresql.dialect()
@@ -2184,60 +2098,48 @@ class HStoreTest(AssertsCompiledSQL, fixtures.TestBase):
"(test_table.hash -> %(hash_1)s) IS NULL",
)
- def test_cols_get(self):
- self._test_cols(
- self.hashcol["foo"],
+ @testing.combinations(
+ (
+ lambda self: self.hashcol["foo"],
"test_table.hash -> %(hash_1)s AS anon_1",
True,
- )
-
- def test_cols_delete_single_key(self):
- self._test_cols(
- self.hashcol.delete("foo"),
+ ),
+ (
+ lambda self: self.hashcol.delete("foo"),
"delete(test_table.hash, %(delete_2)s) AS delete_1",
True,
- )
-
- def test_cols_delete_array_of_keys(self):
- self._test_cols(
- self.hashcol.delete(postgresql.array(["foo", "bar"])),
+ ),
+ (
+ lambda self: self.hashcol.delete(postgresql.array(["foo", "bar"])),
(
"delete(test_table.hash, ARRAY[%(param_1)s, %(param_2)s]) "
"AS delete_1"
),
True,
- )
-
- def test_cols_delete_matching_pairs(self):
- self._test_cols(
- self.hashcol.delete(hstore("1", "2")),
+ ),
+ (
+ lambda self: self.hashcol.delete(hstore("1", "2")),
(
"delete(test_table.hash, hstore(%(hstore_1)s, %(hstore_2)s)) "
"AS delete_1"
),
True,
- )
-
- def test_cols_slice(self):
- self._test_cols(
- self.hashcol.slice(postgresql.array(["1", "2"])),
+ ),
+ (
+ lambda self: self.hashcol.slice(postgresql.array(["1", "2"])),
(
"slice(test_table.hash, ARRAY[%(param_1)s, %(param_2)s]) "
"AS slice_1"
),
True,
- )
-
- def test_cols_hstore_pair_text(self):
- self._test_cols(
- hstore("foo", "3")["foo"],
+ ),
+ (
+ lambda self: hstore("foo", "3")["foo"],
"hstore(%(hstore_1)s, %(hstore_2)s) -> %(hstore_3)s AS anon_1",
False,
- )
-
- def test_cols_hstore_pair_array(self):
- self._test_cols(
- hstore(
+ ),
+ (
+ lambda self: hstore(
postgresql.array(["1", "2"]), postgresql.array(["3", None])
)["1"],
(
@@ -2245,72 +2147,68 @@ class HStoreTest(AssertsCompiledSQL, fixtures.TestBase):
"ARRAY[%(param_3)s, NULL]) -> %(hstore_1)s AS anon_1"
),
False,
- )
-
- def test_cols_hstore_single_array(self):
- self._test_cols(
- hstore(postgresql.array(["1", "2", "3", None]))["3"],
+ ),
+ (
+ lambda self: hstore(postgresql.array(["1", "2", "3", None]))["3"],
(
"hstore(ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, NULL]) "
"-> %(hstore_1)s AS anon_1"
),
False,
- )
-
- def test_cols_concat(self):
- self._test_cols(
- self.hashcol.concat(hstore(cast(self.test_table.c.id, Text), "3")),
+ ),
+ (
+ lambda self: self.hashcol.concat(
+ hstore(cast(self.test_table.c.id, Text), "3")
+ ),
(
"test_table.hash || hstore(CAST(test_table.id AS TEXT), "
"%(hstore_1)s) AS anon_1"
),
True,
- )
-
- def test_cols_concat_op(self):
- self._test_cols(
- hstore("foo", "bar") + self.hashcol,
+ ),
+ (
+ lambda self: hstore("foo", "bar") + self.hashcol,
"hstore(%(hstore_1)s, %(hstore_2)s) || test_table.hash AS anon_1",
True,
- )
-
- def test_cols_concat_get(self):
- self._test_cols(
- (self.hashcol + self.hashcol)["foo"],
+ ),
+ (
+ lambda self: (self.hashcol + self.hashcol)["foo"],
"(test_table.hash || test_table.hash) -> %(param_1)s AS anon_1",
- )
-
- def test_cols_against_is(self):
- self._test_cols(
- self.hashcol["foo"] != None, # noqa
+ True,
+ ),
+ (
+ lambda self: self.hashcol["foo"] != None, # noqa
"(test_table.hash -> %(hash_1)s) IS NOT NULL AS anon_1",
- )
-
- def test_cols_keys(self):
- self._test_cols(
+ True,
+ ),
+ (
# hide from 2to3
- getattr(self.hashcol, "keys")(),
+ lambda self: getattr(self.hashcol, "keys")(),
"akeys(test_table.hash) AS akeys_1",
True,
- )
-
- def test_cols_vals(self):
- self._test_cols(
- self.hashcol.vals(), "avals(test_table.hash) AS avals_1", True
- )
-
- def test_cols_array(self):
- self._test_cols(
- self.hashcol.array(),
+ ),
+ (
+ lambda self: self.hashcol.vals(),
+ "avals(test_table.hash) AS avals_1",
+ True,
+ ),
+ (
+ lambda self: self.hashcol.array(),
"hstore_to_array(test_table.hash) AS hstore_to_array_1",
True,
- )
-
- def test_cols_matrix(self):
- self._test_cols(
- self.hashcol.matrix(),
+ ),
+ (
+ lambda self: self.hashcol.matrix(),
"hstore_to_matrix(test_table.hash) AS hstore_to_matrix_1",
True,
+ ),
+ )
+ def test_cols(self, colclause_fn, expected, from_):
+ colclause = colclause_fn(self)
+ stmt = select([colclause])
+ self.assert_compile(
+ stmt,
+ ("SELECT %s" + (" FROM test_table" if from_ else "")) % expected,
)
@@ -2850,7 +2748,36 @@ class JSONTest(AssertsCompiledSQL, fixtures.TestBase):
)
self.jsoncol = self.test_table.c.test_column
- def _test_where(self, whereclause, expected):
+ @testing.combinations(
+ (
+ lambda self: self.jsoncol["bar"] == None, # noqa
+ "(test_table.test_column -> %(test_column_1)s) IS NULL",
+ ),
+ (
+ lambda self: self.jsoncol[("foo", 1)] == None, # noqa
+ "(test_table.test_column #> %(test_column_1)s) IS NULL",
+ ),
+ (
+ lambda self: self.jsoncol["bar"].astext == None, # noqa
+ "(test_table.test_column ->> %(test_column_1)s) IS NULL",
+ ),
+ (
+ lambda self: self.jsoncol["bar"].astext.cast(Integer) == 5,
+ "CAST((test_table.test_column ->> %(test_column_1)s) AS INTEGER) "
+ "= %(param_1)s",
+ ),
+ (
+ lambda self: self.jsoncol["bar"].cast(Integer) == 5,
+ "CAST((test_table.test_column -> %(test_column_1)s) AS INTEGER) "
+ "= %(param_1)s",
+ ),
+ (
+ lambda self: self.jsoncol[("foo", 1)].astext == None, # noqa
+ "(test_table.test_column #>> %(test_column_1)s) IS NULL",
+ ),
+ )
+ def test_where(self, whereclause_fn, expected):
+ whereclause = whereclause_fn(self)
stmt = select([self.test_table]).where(whereclause)
self.assert_compile(
stmt,
@@ -2858,27 +2785,6 @@ class JSONTest(AssertsCompiledSQL, fixtures.TestBase):
"WHERE %s" % expected,
)
- def _test_cols(self, colclause, expected, from_=True):
- stmt = select([colclause])
- self.assert_compile(
- stmt,
- ("SELECT %s" + (" FROM test_table" if from_ else "")) % expected,
- )
-
- # This test is a bit misleading -- in real life you will need to cast to
- # do anything
- def test_where_getitem(self):
- self._test_where(
- self.jsoncol["bar"] == None, # noqa
- "(test_table.test_column -> %(test_column_1)s) IS NULL",
- )
-
- def test_where_path(self):
- self._test_where(
- self.jsoncol[("foo", 1)] == None, # noqa
- "(test_table.test_column #> %(test_column_1)s) IS NULL",
- )
-
def test_path_typing(self):
col = column("x", JSON())
is_(col["q"].type._type_affinity, types.JSON)
@@ -2898,38 +2804,20 @@ class JSONTest(AssertsCompiledSQL, fixtures.TestBase):
is_(col["q"]["p"].astext.type.__class__, MyType)
- def test_where_getitem_as_text(self):
- self._test_where(
- self.jsoncol["bar"].astext == None, # noqa
- "(test_table.test_column ->> %(test_column_1)s) IS NULL",
- )
-
- def test_where_getitem_astext_cast(self):
- self._test_where(
- self.jsoncol["bar"].astext.cast(Integer) == 5,
- "CAST((test_table.test_column ->> %(test_column_1)s) AS INTEGER) "
- "= %(param_1)s",
- )
-
- def test_where_getitem_json_cast(self):
- self._test_where(
- self.jsoncol["bar"].cast(Integer) == 5,
- "CAST((test_table.test_column -> %(test_column_1)s) AS INTEGER) "
- "= %(param_1)s",
- )
-
- def test_where_path_as_text(self):
- self._test_where(
- self.jsoncol[("foo", 1)].astext == None, # noqa
- "(test_table.test_column #>> %(test_column_1)s) IS NULL",
- )
-
- def test_cols_get(self):
- self._test_cols(
- self.jsoncol["foo"],
+ @testing.combinations(
+ (
+ lambda self: self.jsoncol["foo"],
"test_table.test_column -> %(test_column_1)s AS anon_1",
True,
)
+ )
+ def test_cols(self, colclause_fn, expected, from_):
+ colclause = colclause_fn(self)
+ stmt = select([colclause])
+ self.assert_compile(
+ stmt,
+ ("SELECT %s" + (" FROM test_table" if from_ else "")) % expected,
+ )
class JSONRoundTripTest(fixtures.TablesTest):
@@ -3292,40 +3180,35 @@ class JSONBTest(JSONTest):
)
self.jsoncol = self.test_table.c.test_column
- # Note - add fixture data for arrays []
-
- def test_where_has_key(self):
- self._test_where(
+ @testing.combinations(
+ (
# hide from 2to3
- getattr(self.jsoncol, "has_key")("data"),
+ lambda self: getattr(self.jsoncol, "has_key")("data"),
"test_table.test_column ? %(test_column_1)s",
- )
-
- def test_where_has_all(self):
- self._test_where(
- self.jsoncol.has_all(
+ ),
+ (
+ lambda self: self.jsoncol.has_all(
{"name": "r1", "data": {"k1": "r1v1", "k2": "r1v2"}}
),
"test_table.test_column ?& %(test_column_1)s",
- )
-
- def test_where_has_any(self):
- self._test_where(
- self.jsoncol.has_any(postgresql.array(["name", "data"])),
+ ),
+ (
+ lambda self: self.jsoncol.has_any(
+ postgresql.array(["name", "data"])
+ ),
"test_table.test_column ?| ARRAY[%(param_1)s, %(param_2)s]",
- )
-
- def test_where_contains(self):
- self._test_where(
- self.jsoncol.contains({"k1": "r1v1"}),
+ ),
+ (
+ lambda self: self.jsoncol.contains({"k1": "r1v1"}),
"test_table.test_column @> %(test_column_1)s",
- )
-
- def test_where_contained_by(self):
- self._test_where(
- self.jsoncol.contained_by({"foo": "1", "bar": None}),
+ ),
+ (
+ lambda self: self.jsoncol.contained_by({"foo": "1", "bar": None}),
"test_table.test_column <@ %(test_column_1)s",
- )
+ ),
+ )
+ def test_where(self, whereclause_fn, expected):
+ super(JSONBTest, self).test_where(whereclause_fn, expected)
class JSONBRoundTripTest(JSONRoundTripTest):