author     Stephen Rauch <stephen.rauch+github@gmail.com>  2015-12-09 14:10:15 -0800
committer  Stephen Rauch <stephen.rauch+github@gmail.com>  2015-12-09 14:10:15 -0800
commit     b42ce1487e53ede01f4d37b3c71a629b554caeb6 (patch)
tree       ca9c72c0748c4485fde010b82311c90262c91b20
parent     f4a1129e79e0cd938da3e7737b190f7f4db3ed63 (diff)
download   sqlalchemy-pr/221.tar.gz

Add support for MySQL JSON (pr/221)
-rw-r--r--  .gitignore                                  |   1
-rw-r--r--  lib/sqlalchemy/dialects/mysql/__init__.py   |   2
-rw-r--r--  lib/sqlalchemy/dialects/mysql/base.py       |  27
-rw-r--r--  lib/sqlalchemy/dialects/mysql/json.py       | 325
-rw-r--r--  test/dialect/mysql/test_types.py            | 394
5 files changed, 741 insertions(+), 8 deletions(-)
diff --git a/.gitignore b/.gitignore
index 81fd2d9ed..5acbb972f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,3 +20,4 @@ sqlnet.log
/mapping_setup.py
/test.py
/.cache/
+/.idea/
diff --git a/lib/sqlalchemy/dialects/mysql/__init__.py b/lib/sqlalchemy/dialects/mysql/__init__.py
index c1f78bd1d..55a4ebca7 100644
--- a/lib/sqlalchemy/dialects/mysql/__init__.py
+++ b/lib/sqlalchemy/dialects/mysql/__init__.py
@@ -21,6 +21,8 @@ from .base import \
TINYBLOB, TINYINT, TINYTEXT,\
VARBINARY, VARCHAR, YEAR, dialect
+from .json import JSON
+
__all__ = (
'BIGINT', 'BINARY', 'BIT', 'BLOB', 'BOOLEAN', 'CHAR', 'DATE', 'DATETIME',
'DECIMAL', 'DOUBLE', 'ENUM', 'DECIMAL', 'FLOAT', 'INTEGER', 'INTEGER',
diff --git a/lib/sqlalchemy/dialects/mysql/base.py b/lib/sqlalchemy/dialects/mysql/base.py
index 988746403..9ed13eb91 100644
--- a/lib/sqlalchemy/dialects/mysql/base.py
+++ b/lib/sqlalchemy/dialects/mysql/base.py
@@ -534,6 +534,13 @@ output::
``explicit_defaults_for_timestamp``. Prior to this version, it will
not render "NOT NULL" for a TIMESTAMP column that is ``nullable=False``.
+JSON Types
+----------
+
+The MySQL dialect supports the JSON datatype as of version 5.7:
+
+* :class:`.mysql.JSON`
+
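+As an illustrative sketch (the table and column names here are
+placeholders), a JSON column is declared like any other type::
+
+    from sqlalchemy.dialects.mysql import JSON
+
+    data = Table('data', metadata,
+                 Column('id', Integer, primary_key=True),
+                 Column('doc', JSON)
+                 )
+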
"""
import datetime
@@ -602,6 +609,8 @@ RESERVED_WORDS = set(
'get', 'io_after_gtids', 'io_before_gtids', 'master_bind', 'one_shot',
'partition', 'sql_after_gtids', 'sql_before_gtids', # 5.6
+ 'json', # 5.7
+
])
AUTOCOMMIT_RE = re.compile(
@@ -2225,6 +2234,12 @@ class MySQLTypeCompiler(compiler.GenericTypeCompiler):
else:
return self._extend_numeric(type_, "SMALLINT")
+ def visit_JSON(self, type_, **kw):
+ if not self.dialect._supports_json:
+ util.warn("Current MySQL version does not support JSON.")
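+            # fall back to TEXT so that emitted DDL still succeeds on
+            # servers older than 5.7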
+ return "TEXT"
+ return "JSON"
+
def visit_BIT(self, type_, **kw):
if type_.length is not None:
return "BIT(%s)" % type_.length
@@ -2433,10 +2448,13 @@ class MySQLDialect(default.DefaultDialect):
})
]
- def __init__(self, isolation_level=None, **kwargs):
- kwargs.pop('use_ansiquotes', None) # legacy
+ def __init__(self, isolation_level=None, json_serializer=None,
+ json_deserializer=None, **kwargs):
+ kwargs.pop('use_ansiquotes', None) # legacy
default.DefaultDialect.__init__(self, **kwargs)
self.isolation_level = isolation_level
+ self._json_deserializer = json_deserializer
+ self._json_serializer = json_serializer
def on_connect(self):
if self.isolation_level is not None:
@@ -2607,6 +2625,11 @@ class MySQLDialect(default.DefaultDialect):
return self.server_version_info is None or \
self.server_version_info >= (4, 0, 2)
+ @property
+ def _supports_json(self):
+ return self.server_version_info is None or \
+ self.server_version_info >= (5, 7, 9)
+
@reflection.cache
def get_schema_names(self, connection, **kw):
rp = connection.execute("SHOW schemas")
diff --git a/lib/sqlalchemy/dialects/mysql/json.py b/lib/sqlalchemy/dialects/mysql/json.py
new file mode 100644
index 000000000..1eab58eea
--- /dev/null
+++ b/lib/sqlalchemy/dialects/mysql/json.py
@@ -0,0 +1,325 @@
+# mysql/json.py
+# Copyright (C) 2005-2015 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: http://www.opensource.org/licenses/mit-license.php
+from __future__ import absolute_import
+
+import collections
+import json
+
+from .base import ischema_names
+from ... import types as sqltypes
+from ...sql import elements
+from ...sql import expression
+from ... import util
+from operator import getitem
+
+
+__all__ = ('JSON',)
+
+
+class JSON(sqltypes.Indexable, sqltypes.TypeEngine):
+
+    """Represent the MySQL JSON type.
+
+ The :class:`.JSON` type stores arbitrary JSON format data, e.g.::
+
+ data_table = Table('data_table', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', JSON)
+ )
+
+ with engine.connect() as conn:
+ conn.execute(
+ data_table.insert(),
+            data={"key1": "value1", "key2": "value2"}
+ )
+
+ :class:`.JSON` provides several operations:
+
+ * Index operations::
+
+ data_table.c.data['some key']
+
+ * Index operations with CAST
+ (equivalent to
+      ``CAST(JSON_EXTRACT(data_table.data, '$."some key"') AS <type>)``)::
+
+ data_table.c.data['some key'].cast(Integer)
+
+ * Path index operations::
+
+ data_table.c.data[('key_1', 'key_2', ..., 'key_n')]
+
+ * Path attribute operations::
+
+ data_table.c.data.json.key_1.key_2
+
+ (equivalent to: data_table.c.data.json['key_1']['key_2'])
+
+ * Wildcards::
+
+ data_table.c.data['*'], rendered as [*], array index wildcard
+ data_table.c.data['%'], rendered as .*, object key wildcard
+
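+    An index operation such as ``data_table.c.data['some key']`` renders
+    as a ``JSON_EXTRACT`` call with the JSON path passed as a bound
+    parameter, e.g. ``JSON_EXTRACT(data_table.data, %s)`` (parameter
+    marker style varies by DBAPI).
+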
+    Index operations return an expression object whose type defaults to
+    :class:`.JSON`, so that further JSON-oriented operations may be
+    called upon the result type.
+
+    The :class:`.JSON` type, when used with the SQLAlchemy ORM, does not
+    detect in-place mutations to the structure. In order to detect these,
+    the :mod:`sqlalchemy.ext.mutable` extension must be used, which allows
+    "in-place" changes to the data structure to produce events that will
+    be detected by the unit of work. See :class:`.HSTORE` for a simple
+    example involving a dictionary.
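+
+    As an illustrative sketch, the generic mutable-dictionary recipe from
+    that extension applies here as well::
+
+        from sqlalchemy.ext.mutable import MutableDict
+
+        Column('data', MutableDict.as_mutable(JSON))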
+
+ When working with NULL values, the :class:`.JSON` type recommends the
+ use of two specific constants in order to differentiate between a column
+ that evaluates to SQL NULL, e.g. no value, vs. the JSON-encoded string
+ of ``"null"``. To insert or select against a value that is SQL NULL,
+ use the constant :func:`.null`::
+
+ conn.execute(table.insert(), json_value=null())
+
+ To insert or select against a value that is JSON ``"null"``, use the
+ constant :attr:`.JSON.NULL`::
+
+ conn.execute(table.insert(), json_value=JSON.NULL)
+
+    The :class:`.JSON` type supports a flag
+    :paramref:`.JSON.none_as_null` which when set to True will result
+    in the Python constant ``None`` evaluating to the value of SQL
+    NULL, and when set to False results in the Python constant
+    ``None`` evaluating to the value of JSON ``"null"``. The Python
+    value ``None`` may be used in conjunction with either
+    :attr:`.JSON.NULL` or :func:`.null` in order to indicate NULL
+    values, but care must be taken as to the value of the
+    :paramref:`.JSON.none_as_null` flag in these cases.
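+
+    For example, a sketch of the flag's effect (assuming a ``conn`` and a
+    ``table`` with a JSON ``data`` column)::
+
+        # with the default JSON(none_as_null=False):
+        conn.execute(table.insert(), data=None)   # persists JSON "null"
+
+        # with JSON(none_as_null=True):
+        conn.execute(table.insert(), data=None)   # persists SQL NULL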
+
+    Custom serializers and deserializers are specified at the dialect
+    level, that is using :func:`.create_engine`, so that they take effect
+    for all :class:`.JSON` columns handled by that engine. E.g.::
+
+        engine = create_engine("mysql://scott:tiger@localhost/test",
+                                json_serializer=my_serialize_fn,
+                                json_deserializer=my_deserialize_fn
+                                )
+
+ .. versionadded:: 1.1
+
+ """
+
+ __visit_name__ = 'JSON'
+
+ hashable = False
+
+ NULL = util.symbol('JSON_NULL')
+    """Describe the JSON value of NULL.
+
+ This value is used to force the JSON value of ``"null"`` to be
+ used as the value. A value of Python ``None`` will be recognized
+ either as SQL NULL or JSON ``"null"``, based on the setting
+ of the :paramref:`.JSON.none_as_null` flag; the :attr:`.JSON.NULL`
+ constant can be used to always resolve to JSON ``"null"`` regardless
+ of this setting. This is in contrast to the :func:`.sql.null` construct,
+ which always resolves to SQL NULL. E.g.::
+
+ from sqlalchemy import null
+        from sqlalchemy.dialects.mysql import JSON
+
+ obj1 = MyObject(json_value=null()) # *always* insert SQL NULL
+ obj2 = MyObject(json_value=JSON.NULL) # *always* insert JSON "null"
+
+ session.add_all([obj1, obj2])
+ session.commit()
+
+ .. versionadded:: 1.1
+
+ """
+
+ def __init__(self, none_as_null=False):
+ """Construct a :class:`.JSON` type.
+
+ :param none_as_null: if True, persist the value ``None`` as a
+ SQL NULL value, not the JSON encoding of ``null``. Note that
+ when this flag is False, the :func:`.null` construct can still
+ be used to persist a NULL value::
+
+ from sqlalchemy import null
+ conn.execute(table.insert(), data=null())
+
+ .. seealso::
+
+ :attr:`.JSON.NULL`
+
+        .. versionadded:: 1.1
+
+ """
+ self.none_as_null = none_as_null
+
+ @property
+ def should_evaluate_none(self):
+ return not self.none_as_null
+
+ class Comparator(
+ sqltypes.Indexable.Comparator, sqltypes.Concatenable.Comparator):
+ """Define comparison operations for :class:`.JSON`."""
+
+ def _setup_getitem(self, index):
+ return getitem, JSON._massage_index(index), self.type
+
+ def operate(self, op, *other, **kwargs):
+ if op.__name__ == 'getitem':
+ return JSON.MySqlJsonExtract(self.expr, *other)
+ else:
+ if len(other) > 0 and isinstance(
+ other[0], (list, tuple, dict)):
+ # convert list, tuple or dict json doc.
+ other = list(other)
+ other[0] = JSON.MySqlJsonDocument(other[0], op)
+
+ return super(JSON.Comparator, self).operate(
+ op, *other, **kwargs)
+
+ @property
+ def json(self):
+            """Property which allows attribute access to document fields,
+            e.g.::
+
+                data_table.c.data.json.k1
+
+            is equivalent to::
+
+                data_table.c.data['k1']
+            """
+ return JSON.MySqlJsonExtract(self.expr, "", attribute_access=True)
+
+ comparator_factory = Comparator
+
+ class MySqlJsonExtract(expression.Function, dict):
+        """Represents a JSON_EXTRACT function which can be invoked via
+        getitem.
+        """
+
+ def __init__(self, doc, path, attribute_access=False):
+ self.json_doc = doc
+ self.json_path = path
+ self.attribute_access = attribute_access
+ expression.Function.__init__(
+ self, "JSON_EXTRACT", doc, '$' + path, type_=JSON)
+
+ # these two lines, along with the json property on the
+ # comparator above, support the __getattr__ method below
+ if self.attribute_access:
+ local_dict = {}
+ dict.__init__(self, local_dict)
+
+ def __getitem__(self, path):
+ """getitem on an existing json_extract just adds to the path"""
+ new_path = self.json_path + JSON._massage_index(path)
+ return JSON.MySqlJsonExtract(
+ self.json_doc, new_path, attribute_access=self.attribute_access)
+
+ def __getattr__(self, item):
+ """Maps attributes to json doc path. Only called if this class
+ does not have an attribute with this name. This allows attribute
+ based access to sub-documents, but it means that hasattr()
+ will now be broken when acting on this class.
+
+        The test for ``item[0] != '_'`` works around this for the known
+        cases of hasattr() usage in the framework, so attribute names
+        not starting with '_' will be considered to be part of the
+        json document.
+ """
+ if self.attribute_access or item[0] != '_':
+ return self.__getitem__(item)
+ raise AttributeError(item)
+
+ class MySqlJsonDocument(expression.Function):
+        """Represents a literal JSON document"""
+
+ def __init__(self, doc, op=None):
+ if isinstance(doc, (list, tuple)):
+ name = "json_array"
+ empty = '[]'
+ else:
+ name = "json_object"
+ empty = '{}'
+
+ self.obj = doc
+ bound_doc = expression.bindparam(
+ name, value=doc, type_=JSON, unique=True,
+ _compared_to_operator=op, _compared_to_type=JSON)
+
+        # could not find a way to represent a literal JSON document
+        # directly, so resort to the JSON_MERGE() function, which
+        # accepts a JSON document as a parameter.
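+        # e.g. MySqlJsonDocument({"k": "v"}) renders roughly as
+        # JSON_MERGE(%s, %s), with the empty document and the serialized
+        # document both bound as parameters (markers vary by DBAPI).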
+ expression.Function.__init__(
+ self, "JSON_MERGE", empty, bound_doc, type_=JSON)
+
+ @staticmethod
+ def _massage_index(index):
+ """
+ Integers are assumed to be array lookups and are formatted as: [x]
+ Strings are assumed to be dict lookups and are formatted as: ."x"
+ Wildcard ['*'] from python is formatted as an array wildcard [*]
+ Wildcard ['%'] from python is formatted as an object wildcard .*
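+
+        Strings beginning with '**' are passed through unchanged, so raw
+        path fragments such as '**.subr' may be given directly.
+
+        Illustrative mappings under these rules:
+            0             -> '[0]'
+            'key'         -> '."key"'
+            ('a', 1, 'b') -> '."a"[1]."b"'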
+ """
+
+ if isinstance(index, int):
+ index = '[%d]' % index
+ elif not isinstance(index, util.string_types):
+ assert isinstance(index, collections.Sequence)
+ tokens = ["%s" % JSON._massage_index(elem) for elem in index]
+ index = "".join(tokens)
+ elif index == '%':
+ index = '.*'
+ elif index == '*':
+ index = '[*]'
+ elif index[:2] != '**':
+ index = '."%s"' % index
+ return index
+
+ def bind_processor(self, dialect):
+ json_serializer = dialect._json_serializer or json.dumps
+ if util.py2k:
+ encoding = dialect.encoding
+ else:
+ encoding = None
+
+ def process(value):
+ if value is self.NULL:
+ value = None
+ elif isinstance(value, elements.Null) or (
+ value is None and self.none_as_null
+ ):
+ return None
+ if encoding:
+ encoded = json_serializer(value).encode(encoding)
+ else:
+ encoded = json_serializer(value)
+
+ return encoded
+
+ return process
+
+ def result_processor(self, dialect, coltype):
+ json_deserializer = dialect._json_deserializer or json.loads
+ if util.py2k:
+ encoding = dialect.encoding
+ else:
+ encoding = None
+
+ def process(value):
+ if value is None:
+ return None
+ if encoding:
+ value = value.decode(encoding)
+ return json_deserializer(value)
+ return process
+
+
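+# register with reflection, so that reflected columns of MySQL type
+# "json" come back as this JSON type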
+ischema_names['json'] = JSON
diff --git a/test/dialect/mysql/test_types.py b/test/dialect/mysql/test_types.py
index 7c279ffbf..47264a5e9 100644
--- a/test/dialect/mysql/test_types.py
+++ b/test/dialect/mysql/test_types.py
@@ -1,26 +1,30 @@
# coding: utf-8
-from sqlalchemy.testing import eq_, assert_raises, assert_raises_message
+from sqlalchemy.testing import eq_, is_, assert_raises, assert_raises_message
from sqlalchemy import *
from sqlalchemy import sql, exc, schema
from sqlalchemy.util import u
from sqlalchemy import util
from sqlalchemy.dialects.mysql import base as mysql
-from sqlalchemy.testing import fixtures, AssertsCompiledSQL, AssertsExecutionResults
+from sqlalchemy.dialects.mysql import JSON
+from sqlalchemy.testing import AssertsCompiledSQL, AssertsExecutionResults
+from sqlalchemy.testing import engines, fixtures
from sqlalchemy import testing
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import Session
import datetime
import decimal
class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
- "Test MySQL column types"
+ """Test MySQL column types"""
__dialect__ = mysql.dialect()
__only_on__ = 'mysql'
__backend__ = True
def test_numeric(self):
- "Exercise type specification and options for numeric types."
+ """Exercise type specification and options for numeric types."""
columns = [
# column type, args, kwargs, expected ddl
@@ -530,7 +534,6 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
]
)
-
def test_datetime_generic(self):
self.assert_compile(
mysql.DATETIME(),
@@ -543,7 +546,6 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
"DATETIME(4)"
)
-
def test_time_generic(self):
""""Exercise TIME."""
@@ -1021,3 +1023,383 @@ def colspec(c):
return testing.db.dialect.ddl_compiler(
testing.db.dialect, None).get_column_specification(c)
+
+class JSONTest(AssertsCompiledSQL, fixtures.TestBase):
+ __only_on__ = ('mysql >= 5.7.9',)
+ __dialect__ = mysql.dialect()
+
+ def setup(self):
+ metadata = MetaData()
+ self.test_table = Table('test_table', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('test_column', JSON),
+ )
+ self.jsoncol = self.test_table.c.test_column
+ self.ids = self.test_table.c.id
+
+ def _test_where(self, whereclause, expected):
+ stmt = select([self.test_table]).where(whereclause)
+ self.assert_compile(
+ stmt,
+ "SELECT test_table.id, test_table.test_column FROM test_table "
+ "WHERE %s" % expected
+ )
+
+ def _test_cols(self, colclause, expected, from_=True):
+ stmt = select([colclause])
+ self.assert_compile(
+ stmt,
+ (
+ "SELECT %s" +
+ (" FROM test_table" if from_ else "")
+ ) % expected
+ )
+
+ def test_bind_serialize_default(self):
+ dialect = mysql.dialect()
+ proc = self.test_table.c.test_column.type._cached_bind_processor(
+ dialect)
+ eq_(
+ proc({"A": [1, 2, 3, True, False]}),
+ '{"A": [1, 2, 3, true, false]}'
+ )
+
+ def test_bind_serialize_none(self):
+ dialect = mysql.dialect()
+ proc = self.test_table.c.test_column.type._cached_bind_processor(
+ dialect)
+ eq_(
+ proc(None),
+ 'null'
+ )
+
+ def test_bind_serialize_none_as_null(self):
+ dialect = mysql.dialect()
+ proc = JSON(none_as_null=True)._cached_bind_processor(
+ dialect)
+ eq_(
+ proc(None),
+ None
+ )
+ eq_(
+ proc(null()),
+ None
+ )
+
+ def test_bind_serialize_null(self):
+ dialect = mysql.dialect()
+ proc = self.test_table.c.test_column.type._cached_bind_processor(
+ dialect)
+ eq_(
+ proc(null()),
+ None
+ )
+
+ def test_result_deserialize_default(self):
+ dialect = mysql.dialect()
+ proc = self.test_table.c.test_column.type._cached_result_processor(
+ dialect, None)
+ eq_(
+ proc('{"A": [1, 2, 3, true, false]}'),
+ {"A": [1, 2, 3, True, False]}
+ )
+
+ def test_result_deserialize_null(self):
+ dialect = mysql.dialect()
+ proc = self.test_table.c.test_column.type._cached_result_processor(
+ dialect, None)
+ eq_(
+ proc('null'),
+ None
+ )
+
+ def test_result_deserialize_None(self):
+ dialect = mysql.dialect()
+ proc = self.test_table.c.test_column.type._cached_result_processor(
+ dialect, None)
+ eq_(
+ proc(None),
+ None
+ )
+
+    # This test is a bit misleading -- in real life you will need to cast
+    # in order to do anything useful with the result
+ def test_where_getitem(self):
+ self._test_where(
+ self.jsoncol['bar'][1] == None,
+ "JSON_EXTRACT(test_table.test_column, %s) IS NULL"
+ )
+
+ def test_where_path(self):
+ self._test_where(
+ self.jsoncol[("foo", 1)] == None,
+ "JSON_EXTRACT(test_table.test_column, %s) IS NULL"
+ )
+
+ def test_path_typing(self):
+ col = column('x', JSON())
+ is_(
+ col['q'].type._type_affinity, JSON
+ )
+ is_(
+ col[('q', )].type._type_affinity, JSON
+ )
+ is_(
+ col['q']['p'].type._type_affinity, JSON
+ )
+ is_(
+ col[('q', 'p')].type._type_affinity, JSON
+ )
+
+ def test_where_getitem_json_cast(self):
+ self._test_where(
+ self.jsoncol['bar'].cast(Integer) == 5,
+ "CAST(JSON_EXTRACT(test_table.test_column, %s) AS SIGNED INTEGER) "
+ "= %s"
+ )
+
+ def test_cols_get(self):
+ self._test_cols(
+ self.jsoncol['foo'],
+ "JSON_EXTRACT(test_table.test_column, %s) AS `JSON_EXTRACT_1`",
+ True
+ )
+
+class JSONRoundTripTest(fixtures.TablesTest):
+ __only_on__ = ('mysql >= 5.7.9',)
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('data_table', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(30), nullable=False),
+ Column('data', JSON),
+ Column('nulldata', JSON(none_as_null=True))
+ )
+
+ def _fixture_data(self, engine):
+ data_table = self.tables.data_table
+ engine.execute(
+ data_table.insert(),
+ {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}},
+ {'name': 'r2', 'data': {"k1": "r2v1", "k2": "r2v2"}},
+ {'name': 'r3', 'data': {"k1": "r3v1", "k2": "r3v2"}},
+ {'name': 'r4', 'data': {"k1": "r4v1", "k2": "r4v2"}},
+ {'name': 'r5', 'data': {"k1": "r5v1", "k2": "r5v2", "k3": 5}},
+ {'name': 'r6', 'data': {"k1": {"r6v1": {'subr': [1, 2, 3]}}}},
+ {'name': 'r7', 'data': [{"r7v1": 1}, {"r7v2": 2}, {"r7v3": 3}]},
+ )
+
+ def _assert_data(self, compare, engine, column='data'):
+ col = self.tables.data_table.c[column]
+
+ data = engine.execute(
+ select([col]).
+ order_by(self.tables.data_table.c.name)
+ ).fetchall()
+ eq_([d for d, in data], compare)
+
+ def _assert_column_is_NULL(self, column='data'):
+ col = self.tables.data_table.c[column]
+
+ data = testing.db.execute(
+ select([col]).
+ where(col.is_(null()))
+ ).fetchall()
+ eq_([d for d, in data], [None])
+
+ def _assert_column_is_JSON_NULL(self, column='data'):
+ col = self.tables.data_table.c[column]
+
+ data = testing.db.execute(
+ select([col]).
+ where(cast(col, String) == "null")
+ ).fetchall()
+ eq_([d for d, in data], [None])
+
+ def test_insert(self):
+ testing.db.execute(
+ self.tables.data_table.insert(),
+ {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}}
+ )
+ self._assert_data([{"k1": "r1v1", "k2": "r1v2"}], testing.db)
+
+ def test_insert_nulls(self):
+ testing.db.execute(
+ self.tables.data_table.insert(),
+ {'name': 'r1', 'data': null()}
+ )
+ self._assert_data([None], testing.db)
+
+ def test_insert_none_as_null(self):
+ testing.db.execute(
+ self.tables.data_table.insert(),
+ {'name': 'r1', 'nulldata': None}
+ )
+ self._assert_column_is_NULL(column='nulldata')
+
+ def test_insert_nulljson_into_none_as_null(self):
+ testing.db.execute(
+ self.tables.data_table.insert(),
+ {'name': 'r1', 'nulldata': JSON.NULL}
+ )
+ self._assert_column_is_JSON_NULL(column='nulldata')
+
+ def test_reflect(self):
+ insp = inspect(testing.db)
+ cols = insp.get_columns('data_table')
+ assert isinstance(cols[2]['type'], JSON)
+
+ def test_custom_serialize_deserialize(self):
+ import json
+
+ def loads(value):
+ value = json.loads(value)
+ value['x'] += '_loads'
+ return value
+
+ def dumps(value):
+ value = dict(value)
+ value['x'] = 'dumps_y'
+ return json.dumps(value)
+
+ engine = engines.testing_engine(options=dict(
+ json_serializer=dumps,
+ json_deserializer=loads
+ ))
+
+ engine.execute(
+ self.tables.data_table.insert(),
+ {'name': 'r1', 'data': {"key": "value", "x": "q"}}
+ )
+ self._assert_data([{"key": "value", "x": "dumps_y_loads"}], engine)
+
+ def test_attribute_query(self):
+ self._fixture_data(testing.db)
+ data_table = self.tables.data_table
+ result = testing.db.execute(
+ select([data_table.c.name]).where(
+ data_table.c.data.json.k1['r6v1'].subr == [1, 2, 3]
+ )
+ )
+ eq_(result.scalar(), 'r6')
+
+ def test_path_query_list(self):
+ self._fixture_data(testing.db)
+ data_table = self.tables.data_table
+ result = testing.db.execute(
+ select([data_table.c.name]).where(
+ data_table.c.data[('k1', 'r6v1', 'subr')] == [1, 2, 3]
+ )
+ )
+ eq_(result.scalar(), 'r6')
+
+ def test_path_query_dict(self):
+ self._fixture_data(testing.db)
+ data_table = self.tables.data_table
+ result = testing.db.execute(
+ select([data_table.c.name]).where(
+ data_table.c.data[('k1', 'r6v1')] == {'subr': [1, 2, 3]}
+ )
+ )
+ eq_(result.scalar(), 'r6')
+
+ def test_multi_index_query(self):
+ self._fixture_data(testing.db)
+ data_table = self.tables.data_table
+ result = testing.db.execute(
+ select([data_table.c.name]).where(
+ data_table.c.data['k1']['r6v1']['subr'] == [1, 2, 3]
+ )
+ )
+ eq_(result.scalar(), 'r6')
+
+ def test_wildcard_query(self):
+ self._fixture_data(testing.db)
+ data_table = self.tables.data_table
+ data = (
+ (data_table.c.data['*']['%'], 'r7', [1, 2, 3]),
+ (data_table.c.data['**.subr']['*'], 'r6', [1, 2, 3]),
+ (data_table.c.data['**']['subr']['*'], 'r6', [1, 2, 3]),
+ )
+ for datum in data:
+ result = testing.db.execute(
+                select([datum[0]]).where(data_table.c.name == datum[1])
+            )
+ eq_(result.scalar(), datum[2])
+
+ def test_query_returned_as_int(self):
+ self._fixture_data(testing.db)
+ data_table = self.tables.data_table
+ result = testing.db.execute(
+ select([data_table.c.data['k3'].cast(Integer)]).where(
+ data_table.c.name == 'r5')
+ ).first()
+        assert isinstance(result[0], util.int_types)
+
+ def test_criterion(self):
+ self._fixture_data(testing.db)
+ data_table = self.tables.data_table
+ result = testing.db.execute(
+ select([data_table.c.data]).where(
+ data_table.c.data['k1'] == 'r3v1'
+ )
+ ).first()
+ eq_(result, ({'k1': 'r3v1', 'k2': 'r3v2'},))
+
+ def test_fixed_round_trip(self):
+ doc = {
+ "key": "value",
+ "key2": {"k1": "v1", "k2": "v2"}
+ }
+ s = select([JSON.MySqlJsonDocument(doc)])
+ eq_(
+ testing.db.scalar(s),
+ doc
+ )
+
+ def test_unicode_round_trip(self):
+ doc = {
+ "key": util.u('réveillé'),
+ "data": {"k1": util.u('drôle')}
+ }
+ s = select([JSON.MySqlJsonDocument(doc)])
+ eq_(
+ testing.db.scalar(s),
+ doc
+ )
+
+ def test_eval_none_flag_orm(self):
+ Base = declarative_base()
+
+ class Data(Base):
+ __table__ = self.tables.data_table
+
+ s = Session(testing.db)
+
+ d1 = Data(name='d1', data=None, nulldata=None)
+ s.add(d1)
+ s.commit()
+
+ s.bulk_insert_mappings(
+ Data, [{"name": "d2", "data": None, "nulldata": None}]
+ )
+ eq_(
+ s.query(
+ cast(self.tables.data_table.c.data, String),
+ cast(self.tables.data_table.c.nulldata, String)
+ ).filter(self.tables.data_table.c.name == 'd1').first(),
+ ("null", None)
+ )
+ eq_(
+ s.query(
+ cast(self.tables.data_table.c.data, String),
+ cast(self.tables.data_table.c.nulldata, String)
+ ).filter(self.tables.data_table.c.name == 'd2').first(),
+ ("null", None)
+ )
+
+# ::TODO:: postgres jsonb tests have tests for contains/contained/etc...
\ No newline at end of file