diff options
author | Stephen Rauch <stephen.rauch+github@gmail.com> | 2015-12-09 14:10:15 -0800 |
---|---|---|
committer | Stephen Rauch <stephen.rauch+github@gmail.com> | 2015-12-09 14:10:15 -0800 |
commit | b42ce1487e53ede01f4d37b3c71a629b554caeb6 (patch) | |
tree | ca9c72c0748c4485fde010b82311c90262c91b20 | |
parent | f4a1129e79e0cd938da3e7737b190f7f4db3ed63 (diff) | |
download | sqlalchemy-pr/221.tar.gz |
Add support for MySQL JSONpr/221
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/mysql/__init__.py | 2 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/mysql/base.py | 27 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/mysql/json.py | 325 | ||||
-rw-r--r-- | test/dialect/mysql/test_types.py | 394 |
5 files changed, 741 insertions, 8 deletions
diff --git a/.gitignore b/.gitignore index 81fd2d9ed..5acbb972f 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,4 @@ sqlnet.log /mapping_setup.py /test.py /.cache/ +/.idea/ diff --git a/lib/sqlalchemy/dialects/mysql/__init__.py b/lib/sqlalchemy/dialects/mysql/__init__.py index c1f78bd1d..55a4ebca7 100644 --- a/lib/sqlalchemy/dialects/mysql/__init__.py +++ b/lib/sqlalchemy/dialects/mysql/__init__.py @@ -21,6 +21,8 @@ from .base import \ TINYBLOB, TINYINT, TINYTEXT,\ VARBINARY, VARCHAR, YEAR, dialect +from .json import JSON + __all__ = ( 'BIGINT', 'BINARY', 'BIT', 'BLOB', 'BOOLEAN', 'CHAR', 'DATE', 'DATETIME', 'DECIMAL', 'DOUBLE', 'ENUM', 'DECIMAL', 'FLOAT', 'INTEGER', 'INTEGER', diff --git a/lib/sqlalchemy/dialects/mysql/base.py b/lib/sqlalchemy/dialects/mysql/base.py index 988746403..9ed13eb91 100644 --- a/lib/sqlalchemy/dialects/mysql/base.py +++ b/lib/sqlalchemy/dialects/mysql/base.py @@ -534,6 +534,13 @@ output:: ``explicit_defaults_for_timestamp``. Prior to this version, it will not render "NOT NULL" for a TIMESTAMP column that is ``nullable=False``. +JSON Types +---------- + +The MySQL dialect supports the JSON datatype as of version 5.7: + +* :class:`.mysql.JSON` + """ import datetime @@ -602,6 +609,8 @@ RESERVED_WORDS = set( 'get', 'io_after_gtids', 'io_before_gtids', 'master_bind', 'one_shot', 'partition', 'sql_after_gtids', 'sql_before_gtids', # 5.6 + 'json', # 5.7 + ]) AUTOCOMMIT_RE = re.compile( @@ -2225,6 +2234,12 @@ class MySQLTypeCompiler(compiler.GenericTypeCompiler): else: return self._extend_numeric(type_, "SMALLINT") + def visit_JSON(self, type_, **kw): + if not self.dialect._supports_json: + util.warn("Current MySQL version does not support JSON.") + return "TEXT" + return "JSON" + def visit_BIT(self, type_, **kw): if type_.length is not None: return "BIT(%s)" % type_.length @@ -2433,10 +2448,13 @@ class MySQLDialect(default.DefaultDialect): }) ] - def __init__(self, isolation_level=None, **kwargs): - kwargs.pop('use_ansiquotes', None) # legacy + def __init__(self, isolation_level=None, json_serializer=None, + json_deserializer=None, **kwargs): + kwargs.pop('use_ansiquotes', None) # legacy default.DefaultDialect.__init__(self, **kwargs) self.isolation_level = isolation_level + self._json_deserializer = json_deserializer + self._json_serializer = json_serializer def on_connect(self): if self.isolation_level is not None: @@ -2607,6 +2625,11 @@ class MySQLDialect(default.DefaultDialect): return self.server_version_info is None or \ self.server_version_info >= (4, 0, 2) + @property + def _supports_json(self): + return self.server_version_info is None or \ + self.server_version_info >= (5, 7, 9) + @reflection.cache def get_schema_names(self, connection, **kw): rp = connection.execute("SHOW schemas") diff --git a/lib/sqlalchemy/dialects/mysql/json.py b/lib/sqlalchemy/dialects/mysql/json.py new file mode 100644 index 000000000..1eab58eea --- /dev/null +++ b/lib/sqlalchemy/dialects/mysql/json.py @@ -0,0 +1,325 @@ +# postgresql/json.py +# Copyright (C) 2005-2015 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: http://www.opensource.org/licenses/mit-license.php +from __future__ import absolute_import + +import collections +import json + +from .base import ischema_names +from ... import types as sqltypes +from ...sql import elements +from ...sql import expression +from ... import util +from operator import getitem + + +__all__ = ('JSON',) + + +class JSON(sqltypes.Indexable, sqltypes.TypeEngine): + + """Represent the Mysql JSON type. + + The :class:`.JSON` type stores arbitrary JSON format data, e.g.:: + + data_table = Table('data_table', metadata, + Column('id', Integer, primary_key=True), + Column('data', JSON) + ) + + with engine.connect() as conn: + conn.execute( + data_table.insert(), + data = {"key1": "value1", "key2": "value2"} + ) + + :class:`.JSON` provides several operations: + + * Index operations:: + + data_table.c.data['some key'] + + * Index operations with CAST + (equivalent to + ``CAST(JSON_EXPORT(data_table.data, $."some key" AS <type>)``):: + + data_table.c.data['some key'].cast(Integer) + + * Path index operations:: + + data_table.c.data[('key_1', 'key_2', ..., 'key_n')] + + * Path attribute operations:: + + data_table.c.data.json.key_1.key_2 + + (equivalent to: data_table.c.data.json['key_1']['key_2']) + + * Wildcards:: + + data_table.c.data['*'], rendered as [*], array index wildcard + data_table.c.data['%'], rendered as .*, object key wildcard + + Index operations return an expression object whose type defaults to + :class:`.JSON` by default, so that further JSON-oriented instructions + may be called upon the result type. + + The :class:`.JSON` type, when used with the SQLAlchemy ORM, does not + detect in-place mutations to the structure. In order to detect these, the + :mod:`sqlalchemy.ext.mutable` extension must be used. This extension will + allow "in-place" changes to the datastructure to produce events which + will be detected by the unit of work. See the example at :class:`.HSTORE` + for a simple example involving a dictionary. + + When working with NULL values, the :class:`.JSON` type recommends the + use of two specific constants in order to differentiate between a column + that evaluates to SQL NULL, e.g. no value, vs. the JSON-encoded string + of ``"null"``. To insert or select against a value that is SQL NULL, + use the constant :func:`.null`:: + + conn.execute(table.insert(), json_value=null()) + + To insert or select against a value that is JSON ``"null"``, use the + constant :attr:`.JSON.NULL`:: + + conn.execute(table.insert(), json_value=JSON.NULL) + + The :class:`.JSON` type supports a flag + :paramref:`.JSON.none_as_null` which when set to True will result + in the Python constant ``None`` evaluating to the value of SQL + NULL, and when set to False results in the Python constant + ``None`` evaluating to the value of JSON ``"null"``. The Python + value ``None`` may be used in conjunction with either + :attr:`.JSON.NULL` and :func:`.null` in order to indicate NULL + values, but care must be taken as to the value of the + :paramref:`.JSON.none_as_null` in these cases. + + Custom serializers and deserializers are specified at the dialect level, + that is using :func:`.create_engine`. The reason for this is that when + using psycopg2, the DBAPI only allows serializers at the per-cursor + or per-connection level. E.g.:: + + engine = create_engine("postgresql://scott:tiger@localhost/test", + json_serializer=my_serialize_fn, + json_deserializer=my_deserialize_fn + ) + + When using the psycopg2 dialect, the json_deserializer is registered + against the database using ``psycopg2.extras.register_default_json``. + + .. versionadded:: 1.1 + + """ + + __visit_name__ = 'JSON' + + hashable = False + + NULL = util.symbol('JSON_NULL') + """Describe the json value of NULL. + + This value is used to force the JSON value of ``"null"`` to be + used as the value. A value of Python ``None`` will be recognized + either as SQL NULL or JSON ``"null"``, based on the setting + of the :paramref:`.JSON.none_as_null` flag; the :attr:`.JSON.NULL` + constant can be used to always resolve to JSON ``"null"`` regardless + of this setting. This is in contrast to the :func:`.sql.null` construct, + which always resolves to SQL NULL. E.g.:: + + from sqlalchemy import null + from sqlalchemy.dialects.postgresql import JSON + + obj1 = MyObject(json_value=null()) # *always* insert SQL NULL + obj2 = MyObject(json_value=JSON.NULL) # *always* insert JSON "null" + + session.add_all([obj1, obj2]) + session.commit() + + .. versionadded:: 1.1 + + """ + + def __init__(self, none_as_null=False): + """Construct a :class:`.JSON` type. + + :param none_as_null: if True, persist the value ``None`` as a + SQL NULL value, not the JSON encoding of ``null``. Note that + when this flag is False, the :func:`.null` construct can still + be used to persist a NULL value:: + + from sqlalchemy import null + conn.execute(table.insert(), data=null()) + + .. seealso:: + + :attr:`.JSON.NULL` + + .. versionadded:: 1.1.0 + + """ + self.none_as_null = none_as_null + + @property + def should_evaluate_none(self): + return not self.none_as_null + + class Comparator( + sqltypes.Indexable.Comparator, sqltypes.Concatenable.Comparator): + """Define comparison operations for :class:`.JSON`.""" + + def _setup_getitem(self, index): + return getitem, JSON._massage_index(index), self.type + + def operate(self, op, *other, **kwargs): + if op.__name__ == 'getitem': + return JSON.MySqlJsonExtract(self.expr, *other) + else: + if len(other) > 0 and isinstance( + other[0], (list, tuple, dict)): + # convert list, tuple or dict json doc. + other = list(other) + other[0] = JSON.MySqlJsonDocument(other[0], op) + + return super(JSON.Comparator, self).operate( + op, *other, **kwargs) + + @property + def json(self): + """Property which allows attribute access to document fields + eg: + data_table.c.data.json.k1 + + is equivalent to: + data_table.c.data['k1'] + """ + return JSON.MySqlJsonExtract(self.expr, "", attribute_access=True) + + comparator_factory = Comparator + + class MySqlJsonExtract(expression.Function, dict): + """Represents a Json Extract Function which can be invoked via getitem. + """ + + def __init__(self, doc, path, attribute_access=False): + self.json_doc = doc + self.json_path = path + self.attribute_access = attribute_access + expression.Function.__init__( + self, "JSON_EXTRACT", doc, '$' + path, type_=JSON) + + # these two lines, along with the json property on the + # comparator above, support the __getattr__ method below + if self.attribute_access: + local_dict = {} + dict.__init__(self, local_dict) + + def __getitem__(self, path): + """getitem on an existing json_extract just adds to the path""" + new_path = self.json_path + JSON._massage_index(path) + return JSON.MySqlJsonExtract( + self.json_doc, new_path, attribute_access=self.attribute_access) + + def __getattr__(self, item): + """Maps attributes to json doc path. Only called if this class + does not have an attribute with this name. This allows attribute + based access to sub-documents, but it means that hasattr() + will now be broken when acting on this class. + + The test for "item[0] != '_' works around this for the known + cases of hasattr() usage in the framework. So attributes names + not starting with '_' will be considered to be part of the + json document. + """ + if self.attribute_access or item[0] != '_': + return self.__getitem__(item) + raise AttributeError(item) + + class MySqlJsonDocument(expression.Function): + """Represents a literal Json Document""" + + def __init__(self, doc, op=None): + if isinstance(doc, (list, tuple)): + name = "json_array" + empty = '[]' + else: + name = "json_object" + empty = '{}' + + self.obj = doc + bound_doc = expression.bindparam( + name, value=doc, type_=JSON, unique=True, + _compared_to_operator=op, _compared_to_type=JSON) + + # could not find a way to represent a json literal document, + # so resorted to JSON_MERGE() function which takes as a + # parameter a json document. + expression.Function.__init__( + self, "JSON_MERGE", empty, bound_doc, type_=JSON) + + @staticmethod + def _massage_index(index): + """ + Integers are assumed to be array lookups and are formatted as: [x] + Strings are assumed to be dict lookups and are formatted as: ."x" + Wildcard ['*'] from python is formatted as an array wildcard [*] + Wildcard ['%'] from python is formatted as an object wildcard .* + """ + + if isinstance(index, int): + index = '[%d]' % index + elif not isinstance(index, util.string_types): + assert isinstance(index, collections.Sequence) + tokens = ["%s" % JSON._massage_index(elem) for elem in index] + index = "".join(tokens) + elif index == '%': + index = '.*' + elif index == '*': + index = '[*]' + elif index[:2] != '**': + index = '."%s"' % index + return index + + def bind_processor(self, dialect): + json_serializer = dialect._json_serializer or json.dumps + if util.py2k: + encoding = dialect.encoding + else: + encoding = None + + def process(value): + if value is self.NULL: + value = None + elif isinstance(value, elements.Null) or ( + value is None and self.none_as_null + ): + return None + if encoding: + encoded = json_serializer(value).encode(encoding) + else: + encoded = json_serializer(value) + + return encoded + + return process + + def result_processor(self, dialect, coltype): + json_deserializer = dialect._json_deserializer or json.loads + if util.py2k: + encoding = dialect.encoding + else: + encoding = None + + def process(value): + if value is None: + return None + if encoding: + value = value.decode(encoding) + return json_deserializer(value) + return process + + +ischema_names['json'] = JSON diff --git a/test/dialect/mysql/test_types.py b/test/dialect/mysql/test_types.py index 7c279ffbf..47264a5e9 100644 --- a/test/dialect/mysql/test_types.py +++ b/test/dialect/mysql/test_types.py @@ -1,26 +1,30 @@ # coding: utf-8 -from sqlalchemy.testing import eq_, assert_raises, assert_raises_message +from sqlalchemy.testing import eq_, is_, assert_raises, assert_raises_message from sqlalchemy import * from sqlalchemy import sql, exc, schema from sqlalchemy.util import u from sqlalchemy import util from sqlalchemy.dialects.mysql import base as mysql -from sqlalchemy.testing import fixtures, AssertsCompiledSQL, AssertsExecutionResults +from sqlalchemy.dialects.mysql import JSON +from sqlalchemy.testing import AssertsCompiledSQL, AssertsExecutionResults +from sqlalchemy.testing import engines, fixtures from sqlalchemy import testing +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import Session import datetime import decimal class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): - "Test MySQL column types" + """Test MySQL column types""" __dialect__ = mysql.dialect() __only_on__ = 'mysql' __backend__ = True def test_numeric(self): - "Exercise type specification and options for numeric types." + """Exercise type specification and options for numeric types.""" columns = [ # column type, args, kwargs, expected ddl @@ -530,7 +534,6 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): ] ) - def test_datetime_generic(self): self.assert_compile( mysql.DATETIME(), @@ -543,7 +546,6 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): "DATETIME(4)" ) - def test_time_generic(self): """"Exercise TIME.""" @@ -1021,3 +1023,383 @@ def colspec(c): return testing.db.dialect.ddl_compiler( testing.db.dialect, None).get_column_specification(c) + +class JSONTest(AssertsCompiledSQL, fixtures.TestBase): + __only_on__ = ('mysql >= 5.7.9',) + __dialect__ = mysql.dialect() + + def setup(self): + metadata = MetaData() + self.test_table = Table('test_table', metadata, + Column('id', Integer, primary_key=True), + Column('test_column', JSON), + ) + self.jsoncol = self.test_table.c.test_column + self.ids = self.test_table.c.id + + def _test_where(self, whereclause, expected): + stmt = select([self.test_table]).where(whereclause) + self.assert_compile( + stmt, + "SELECT test_table.id, test_table.test_column FROM test_table " + "WHERE %s" % expected + ) + + def _test_cols(self, colclause, expected, from_=True): + stmt = select([colclause]) + self.assert_compile( + stmt, + ( + "SELECT %s" + + (" FROM test_table" if from_ else "") + ) % expected + ) + + def test_bind_serialize_default(self): + dialect = mysql.dialect() + proc = self.test_table.c.test_column.type._cached_bind_processor( + dialect) + eq_( + proc({"A": [1, 2, 3, True, False]}), + '{"A": [1, 2, 3, true, false]}' + ) + + def test_bind_serialize_none(self): + dialect = mysql.dialect() + proc = self.test_table.c.test_column.type._cached_bind_processor( + dialect) + eq_( + proc(None), + 'null' + ) + + def test_bind_serialize_none_as_null(self): + dialect = mysql.dialect() + proc = JSON(none_as_null=True)._cached_bind_processor( + dialect) + eq_( + proc(None), + None + ) + eq_( + proc(null()), + None + ) + + def test_bind_serialize_null(self): + dialect = mysql.dialect() + proc = self.test_table.c.test_column.type._cached_bind_processor( + dialect) + eq_( + proc(null()), + None + ) + + def test_result_deserialize_default(self): + dialect = mysql.dialect() + proc = self.test_table.c.test_column.type._cached_result_processor( + dialect, None) + eq_( + proc('{"A": [1, 2, 3, true, false]}'), + {"A": [1, 2, 3, True, False]} + ) + + def test_result_deserialize_null(self): + dialect = mysql.dialect() + proc = self.test_table.c.test_column.type._cached_result_processor( + dialect, None) + eq_( + proc('null'), + None + ) + + def test_result_deserialize_None(self): + dialect = mysql.dialect() + proc = self.test_table.c.test_column.type._cached_result_processor( + dialect, None) + eq_( + proc(None), + None + ) + + # This test is a bit misleading -- in real life you will need to cast to + # do anything + def test_where_getitem(self): + self._test_where( + self.jsoncol['bar'][1] == None, + "JSON_EXTRACT(test_table.test_column, %s) IS NULL" + ) + + def test_where_path(self): + self._test_where( + self.jsoncol[("foo", 1)] == None, + "JSON_EXTRACT(test_table.test_column, %s) IS NULL" + ) + + def test_path_typing(self): + col = column('x', JSON()) + is_( + col['q'].type._type_affinity, JSON + ) + is_( + col[('q', )].type._type_affinity, JSON + ) + is_( + col['q']['p'].type._type_affinity, JSON + ) + is_( + col[('q', 'p')].type._type_affinity, JSON + ) + + def test_where_getitem_json_cast(self): + self._test_where( + self.jsoncol['bar'].cast(Integer) == 5, + "CAST(JSON_EXTRACT(test_table.test_column, %s) AS SIGNED INTEGER) " + "= %s" + ) + + def test_cols_get(self): + self._test_cols( + self.jsoncol['foo'], + "JSON_EXTRACT(test_table.test_column, %s) AS `JSON_EXTRACT_1`", + True + ) + +class JSONRoundTripTest(fixtures.TablesTest): + __only_on__ = ('mysql >= 5.7.9',) + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table('data_table', metadata, + Column('id', Integer, primary_key=True), + Column('name', String(30), nullable=False), + Column('data', JSON), + Column('nulldata', JSON(none_as_null=True)) + ) + + def _fixture_data(self, engine): + data_table = self.tables.data_table + engine.execute( + data_table.insert(), + {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}}, + {'name': 'r2', 'data': {"k1": "r2v1", "k2": "r2v2"}}, + {'name': 'r3', 'data': {"k1": "r3v1", "k2": "r3v2"}}, + {'name': 'r4', 'data': {"k1": "r4v1", "k2": "r4v2"}}, + {'name': 'r5', 'data': {"k1": "r5v1", "k2": "r5v2", "k3": 5}}, + {'name': 'r6', 'data': {"k1": {"r6v1": {'subr': [1, 2, 3]}}}}, + {'name': 'r7', 'data': [{"r7v1": 1}, {"r7v2": 2}, {"r7v3": 3}]}, + ) + + def _assert_data(self, compare, engine, column='data'): + col = self.tables.data_table.c[column] + + data = engine.execute( + select([col]). + order_by(self.tables.data_table.c.name) + ).fetchall() + eq_([d for d, in data], compare) + + def _assert_column_is_NULL(self, column='data'): + col = self.tables.data_table.c[column] + + data = testing.db.execute( + select([col]). + where(col.is_(null())) + ).fetchall() + eq_([d for d, in data], [None]) + + def _assert_column_is_JSON_NULL(self, column='data'): + col = self.tables.data_table.c[column] + + data = testing.db.execute( + select([col]). + where(cast(col, String) == "null") + ).fetchall() + eq_([d for d, in data], [None]) + + def test_insert(self): + testing.db.execute( + self.tables.data_table.insert(), + {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}} + ) + self._assert_data([{"k1": "r1v1", "k2": "r1v2"}], testing.db) + + def test_insert_nulls(self): + testing.db.execute( + self.tables.data_table.insert(), + {'name': 'r1', 'data': null()} + ) + self._assert_data([None], testing.db) + + def test_insert_none_as_null(self): + testing.db.execute( + self.tables.data_table.insert(), + {'name': 'r1', 'nulldata': None} + ) + self._assert_column_is_NULL(column='nulldata') + + def test_insert_nulljson_into_none_as_null(self): + testing.db.execute( + self.tables.data_table.insert(), + {'name': 'r1', 'nulldata': JSON.NULL} + ) + self._assert_column_is_JSON_NULL(column='nulldata') + + def test_reflect(self): + insp = inspect(testing.db) + cols = insp.get_columns('data_table') + assert isinstance(cols[2]['type'], JSON) + + def test_custom_serialize_deserialize(self): + import json + + def loads(value): + value = json.loads(value) + value['x'] += '_loads' + return value + + def dumps(value): + value = dict(value) + value['x'] = 'dumps_y' + return json.dumps(value) + + engine = engines.testing_engine(options=dict( + json_serializer=dumps, + json_deserializer=loads + )) + + engine.execute( + self.tables.data_table.insert(), + {'name': 'r1', 'data': {"key": "value", "x": "q"}} + ) + self._assert_data([{"key": "value", "x": "dumps_y_loads"}], engine) + + def test_attribute_query(self): + self._fixture_data(testing.db) + data_table = self.tables.data_table + result = testing.db.execute( + select([data_table.c.name]).where( + data_table.c.data.json.k1['r6v1'].subr == [1, 2, 3] + ) + ) + eq_(result.scalar(), 'r6') + + def test_path_query_list(self): + self._fixture_data(testing.db) + data_table = self.tables.data_table + result = testing.db.execute( + select([data_table.c.name]).where( + data_table.c.data[('k1', 'r6v1', 'subr')] == [1, 2, 3] + ) + ) + eq_(result.scalar(), 'r6') + + def test_path_query_dict(self): + self._fixture_data(testing.db) + data_table = self.tables.data_table + result = testing.db.execute( + select([data_table.c.name]).where( + data_table.c.data[('k1', 'r6v1')] == {'subr': [1, 2, 3]} + ) + ) + eq_(result.scalar(), 'r6') + + def test_multi_index_query(self): + self._fixture_data(testing.db) + data_table = self.tables.data_table + result = testing.db.execute( + select([data_table.c.name]).where( + data_table.c.data['k1']['r6v1']['subr'] == [1, 2, 3] + ) + ) + eq_(result.scalar(), 'r6') + + def test_wildcard_query(self): + self._fixture_data(testing.db) + data_table = self.tables.data_table + data = ( + (data_table.c.data['*']['%'], 'r7', [1, 2, 3]), + (data_table.c.data['**.subr']['*'], 'r6', [1, 2, 3]), + (data_table.c.data['**']['subr']['*'], 'r6', [1, 2, 3]), + ) + for datum in data: + result = testing.db.execute( + select([datum[0]]).where(data_table.c.name == datum[1] + ) + ) + eq_(result.scalar(), datum[2]) + + def test_query_returned_as_int(self): + self._fixture_data(testing.db) + data_table = self.tables.data_table + result = testing.db.execute( + select([data_table.c.data['k3'].cast(Integer)]).where( + data_table.c.name == 'r5') + ).first() + assert isinstance(result[0], long) + + def test_criterion(self): + self._fixture_data(testing.db) + data_table = self.tables.data_table + result = testing.db.execute( + select([data_table.c.data]).where( + data_table.c.data['k1'] == 'r3v1' + ) + ).first() + eq_(result, ({'k1': 'r3v1', 'k2': 'r3v2'},)) + + def test_fixed_round_trip(self): + doc = { + "key": "value", + "key2": {"k1": "v1", "k2": "v2"} + } + s = select([JSON.MySqlJsonDocument(doc)]) + eq_( + testing.db.scalar(s), + doc + ) + + def test_unicode_round_trip(self): + doc = { + "key": util.u('réveillé'), + "data": {"k1": util.u('drôle')} + } + s = select([JSON.MySqlJsonDocument(doc)]) + eq_( + testing.db.scalar(s), + doc + ) + + def test_eval_none_flag_orm(self): + Base = declarative_base() + + class Data(Base): + __table__ = self.tables.data_table + + s = Session(testing.db) + + d1 = Data(name='d1', data=None, nulldata=None) + s.add(d1) + s.commit() + + s.bulk_insert_mappings( + Data, [{"name": "d2", "data": None, "nulldata": None}] + ) + eq_( + s.query( + cast(self.tables.data_table.c.data, String), + cast(self.tables.data_table.c.nulldata, String) + ).filter(self.tables.data_table.c.name == 'd1').first(), + ("null", None) + ) + eq_( + s.query( + cast(self.tables.data_table.c.data, String), + cast(self.tables.data_table.c.nulldata, String) + ).filter(self.tables.data_table.c.name == 'd2').first(), + ("null", None) + ) + +# ::TODO:: postgres jsonb tests have tests for contains/contained/etc... +#
\ No newline at end of file |