summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2015-12-28 19:00:50 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2015-12-28 19:00:50 -0500
commit050bb692360bb975d6668920fcf42355e036da7c (patch)
tree7bb714c5405195418ea8245e65e098651d37a11a
parentc7d6c667b53d96a65e0dedcb83c098e03d4c7453 (diff)
downloadsqlalchemy-050bb692360bb975d6668920fcf42355e036da7c.tar.gz
- start to figure out some kind of common tests for JSON types, reference #3619
-rw-r--r--lib/sqlalchemy/dialects/postgresql/json.py4
-rw-r--r--lib/sqlalchemy/testing/requirements.py6
-rw-r--r--lib/sqlalchemy/testing/suite/test_types.py249
-rw-r--r--test/requirements.py4
4 files changed, 260 insertions, 3 deletions
diff --git a/lib/sqlalchemy/dialects/postgresql/json.py b/lib/sqlalchemy/dialects/postgresql/json.py
index 8a50270f5..4ea26be62 100644
--- a/lib/sqlalchemy/dialects/postgresql/json.py
+++ b/lib/sqlalchemy/dialects/postgresql/json.py
@@ -245,7 +245,9 @@ class JSON(sqltypes.Indexable, sqltypes.TypeEngine):
against, self.expr.right, result_type=self.type.astext_type)
def _setup_getitem(self, index):
- if not isinstance(index, util.string_types):
+ if isinstance(index, int):
+ operator = INDEX
+ elif not isinstance(index, util.string_types):
assert isinstance(index, collections.Sequence)
tokens = [util.text_type(elem) for elem in index]
index = "{%s}" % (", ".join(tokens))
diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py
index 15bfad831..87c776e8c 100644
--- a/lib/sqlalchemy/testing/requirements.py
+++ b/lib/sqlalchemy/testing/requirements.py
@@ -487,6 +487,12 @@ class SuiteRequirements(Requirements):
return exclusions.open()
@property
+ def json_type(self):
+ """target platform implements a native JSON type."""
+
+ return exclusions.closed()
+
+ @property
def precision_numerics_general(self):
"""target backend has general support for moderately high-precision
numerics."""
diff --git a/lib/sqlalchemy/testing/suite/test_types.py b/lib/sqlalchemy/testing/suite/test_types.py
index 230aeb1e9..1c565122a 100644
--- a/lib/sqlalchemy/testing/suite/test_types.py
+++ b/lib/sqlalchemy/testing/suite/test_types.py
@@ -5,7 +5,7 @@ from ..assertions import eq_
from ..config import requirements
from sqlalchemy import Integer, Unicode, UnicodeText, select
from sqlalchemy import Date, DateTime, Time, MetaData, String, \
- Text, Numeric, Float, literal, Boolean
+ Text, Numeric, Float, literal, Boolean, cast, null
from ..schema import Table, Column
from ... import testing
import decimal
@@ -585,8 +585,253 @@ class BooleanTest(_LiteralRoundTripFixture, fixtures.TablesTest):
(None, None)
)
+from sqlalchemy.dialects.postgresql import JSON
-__all__ = ('UnicodeVarcharTest', 'UnicodeTextTest',
+
+class JSONTest(_LiteralRoundTripFixture, fixtures.TablesTest):
+ __requires__ = 'json_type',
+ __backend__ = True
+
+ datatype = JSON
+
+ data1 = {
+ "key1": "value1",
+ "key2": "value2"
+ }
+
+ data2 = {
+ "Key 'One'": "value1",
+ "key two": "value2",
+ "key three": "value ' three '"
+ }
+
+ data3 = {
+ "key1": [1, 2, 3],
+ "key2": ["one", "two", "three"],
+ "key3": [{"four": "five"}, {"six": "seven"}]
+ }
+
+ data4 = ["one", "two", "three"]
+
+ data5 = {
+ "nested": {
+ "elem1": [
+ {"a": "b", "c": "d"},
+ {"e": "f", "g": "h"}
+ ],
+ "elem2": {
+ "elem3": {"elem4": "elem5"}
+ }
+ }
+ }
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('data_table', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(30), nullable=False),
+ Column('data', cls.datatype),
+ Column('nulldata', cls.datatype(none_as_null=True))
+ )
+
+ def test_round_trip_data1(self):
+ self._test_round_trip(self.data1)
+
+ def _test_round_trip(self, data_element):
+ data_table = self.tables.data_table
+
+ config.db.execute(
+ data_table.insert(),
+ {'name': 'row1', 'data': data_element}
+ )
+
+ row = config.db.execute(
+ select([
+ data_table.c.data,
+ ])
+ ).first()
+
+ compare = self.compare or self.data
+ eq_(row,
+ (compare, ))
+ assert isinstance(row[0], type(compare))
+
+ def test_round_trip_none_as_sql_null(self):
+ col = self.tables.data_table.c['nulldata']
+
+ with config.db.connect() as conn:
+ conn.execute(
+ self.tables.data_table.insert(),
+ {"name": "r1", "data": None}
+ )
+
+ eq_(
+ conn.scalar(
+ select([self.tables.data_table.c.name]).
+ where(col.is_(null()))
+ ),
+ "r1"
+ )
+
+ eq_(
+ conn.scalar(
+ select([col])
+ ),
+ None
+ )
+
+ def test_round_trip_json_null_as_json_null(self):
+ col = self.tables.data_table.c['data']
+
+ with config.db.connect() as conn:
+ conn.execute(
+ self.tables.data_table.insert(),
+ {"name": "r1", "data": JSON.NULL}
+ )
+
+ eq_(
+ conn.scalar(
+ select([self.tables.data_table.c.name]).
+ where(cast(col, String) == 'null')
+ ),
+ "r1"
+ )
+
+ eq_(
+ conn.scalar(
+ select([col])
+ ),
+ None
+ )
+
+ def test_round_trip_none_as_json_null(self):
+ col = self.tables.data_table.c['data']
+
+ with config.db.connect() as conn:
+ conn.execute(
+ self.tables.data_table.insert(),
+ {"name": "r1", "data": None}
+ )
+
+ eq_(
+ conn.scalar(
+ select([self.tables.data_table.c.name]).
+ where(cast(col, String) == 'null')
+ ),
+ "r1"
+ )
+
+ eq_(
+ conn.scalar(
+ select([col])
+ ),
+ None
+ )
+
+ def _criteria_fixture(self):
+ config.db.execute(
+ self.tables.data_table.insert(),
+ [{"name": "r1", "data": self.data1},
+ {"name": "r2", "data": self.data2},
+ {"name": "r3", "data": self.data3},
+ {"name": "r4", "data": self.data4},
+ {"name": "r5", "data": self.data5}]
+ )
+
+ def _test_index_criteria(self, crit, expected):
+ self._criteria_fixture()
+ with config.db.connect() as conn:
+ import pdb
+ pdb.set_trace()
+ eq_(
+ conn.scalar(
+ select([self.tables.data_table.c.name]).
+ where(crit)
+ ),
+ expected
+ )
+
+ def test_crit_spaces_in_key(self):
+ col = self.tables.data_table.c['data']
+ self._test_index_criteria(
+ cast(col["key two"], String) == "value2",
+ "r2"
+ )
+
+ def test_crit_simple_int(self):
+ col = self.tables.data_table.c['data']
+ self._test_index_criteria(
+ cast(col[1], String) == "two",
+ "r2"
+ )
+
+ def test_crit_mixed_path(self):
+ col = self.tables.data_table.c['data']
+ self._test_index_criteria(
+ cast(col[("key3", 2, "six")], String) == "seven",
+ "r5"
+ )
+
+ def test_crit_string_path(self):
+ col = self.tables.data_table.c['data']
+ self._test_index_criteria(
+ cast(col[("nested", "elem2", "elem4")], String) == "elem5",
+ "r5"
+ )
+
+ def test_unicode_round_trip(self, engine):
+ s = select([
+ cast(
+ {
+ util.u('réveillé'): util.u('réveillé'),
+ "data": {"k1": util.u('drôle')}
+ },
+ self.datatype
+ )
+ ])
+ eq_(
+ config.db.scalar(s),
+ {
+ util.u('réveillé'): util.u('réveillé'),
+ "data": {"k1": util.u('drôle')}
+ },
+ )
+
+ def test_eval_none_flag_orm(self):
+ from sqlalchemy.ext.declarative import declarative_base
+ from sqlalchemy.orm import Session
+
+ Base = declarative_base()
+
+ class Data(Base):
+ __table__ = self.tables.data_table
+
+ s = Session(testing.db)
+
+ d1 = Data(name='d1', data=None, nulldata=None)
+ s.add(d1)
+ s.commit()
+
+ s.bulk_insert_mappings(
+ Data, [{"name": "d2", "data": None, "nulldata": None}]
+ )
+ eq_(
+ s.query(
+ cast(self.tables.data_table.c.data, String),
+ cast(self.tables.data_table.c.nulldata, String)
+ ).filter(self.tables.data_table.c.name == 'd1').first(),
+ ("null", None)
+ )
+ eq_(
+ s.query(
+ cast(self.tables.data_table.c.data, String),
+ cast(self.tables.data_table.c.nulldata, String)
+ ).filter(self.tables.data_table.c.name == 'd2').first(),
+ ("null", None)
+ )
+
+
+__all__ = ('UnicodeVarcharTest', 'UnicodeTextTest', 'JSONTest',
'DateTest', 'DateTimeTest', 'TextTest',
'NumericTest', 'IntegerTest',
'DateTimeHistoricTest', 'DateTimeCoercedToDateTimeTest',
diff --git a/test/requirements.py b/test/requirements.py
index ff93a9c3d..3aa435cee 100644
--- a/test/requirements.py
+++ b/test/requirements.py
@@ -531,6 +531,10 @@ class DefaultRequirements(SuiteRequirements):
'sybase')
@property
+ def json_type(self):
+ return only_on(["mysql >= 5.7", "postgresql >= 9.3"])
+
+ @property
def datetime_literals(self):
"""target dialect supports rendering of a date, time, or datetime as a
literal string, e.g. via the TypeEngine.literal_processor() method.