diff options
Diffstat (limited to 'test')
| -rw-r--r-- | test/dialect/mysql/test_types.py | 394 |
1 files changed, 388 insertions, 6 deletions
diff --git a/test/dialect/mysql/test_types.py b/test/dialect/mysql/test_types.py index 7c279ffbf..47264a5e9 100644 --- a/test/dialect/mysql/test_types.py +++ b/test/dialect/mysql/test_types.py @@ -1,26 +1,30 @@ # coding: utf-8 -from sqlalchemy.testing import eq_, assert_raises, assert_raises_message +from sqlalchemy.testing import eq_, is_, assert_raises, assert_raises_message from sqlalchemy import * from sqlalchemy import sql, exc, schema from sqlalchemy.util import u from sqlalchemy import util from sqlalchemy.dialects.mysql import base as mysql -from sqlalchemy.testing import fixtures, AssertsCompiledSQL, AssertsExecutionResults +from sqlalchemy.dialects.mysql import JSON +from sqlalchemy.testing import AssertsCompiledSQL, AssertsExecutionResults +from sqlalchemy.testing import engines, fixtures from sqlalchemy import testing +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import Session import datetime import decimal class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): - "Test MySQL column types" + """Test MySQL column types""" __dialect__ = mysql.dialect() __only_on__ = 'mysql' __backend__ = True def test_numeric(self): - "Exercise type specification and options for numeric types." + """Exercise type specification and options for numeric types.""" columns = [ # column type, args, kwargs, expected ddl @@ -530,7 +534,6 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): ] ) - def test_datetime_generic(self): self.assert_compile( mysql.DATETIME(), @@ -543,7 +546,6 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): "DATETIME(4)" ) - def test_time_generic(self): """"Exercise TIME.""" @@ -1021,3 +1023,383 @@ def colspec(c): return testing.db.dialect.ddl_compiler( testing.db.dialect, None).get_column_specification(c) + +class JSONTest(AssertsCompiledSQL, fixtures.TestBase): + __only_on__ = ('mysql >= 5.7.9',) + __dialect__ = mysql.dialect() + + def setup(self): + metadata = MetaData() + self.test_table = Table('test_table', metadata, + Column('id', Integer, primary_key=True), + Column('test_column', JSON), + ) + self.jsoncol = self.test_table.c.test_column + self.ids = self.test_table.c.id + + def _test_where(self, whereclause, expected): + stmt = select([self.test_table]).where(whereclause) + self.assert_compile( + stmt, + "SELECT test_table.id, test_table.test_column FROM test_table " + "WHERE %s" % expected + ) + + def _test_cols(self, colclause, expected, from_=True): + stmt = select([colclause]) + self.assert_compile( + stmt, + ( + "SELECT %s" + + (" FROM test_table" if from_ else "") + ) % expected + ) + + def test_bind_serialize_default(self): + dialect = mysql.dialect() + proc = self.test_table.c.test_column.type._cached_bind_processor( + dialect) + eq_( + proc({"A": [1, 2, 3, True, False]}), + '{"A": [1, 2, 3, true, false]}' + ) + + def test_bind_serialize_none(self): + dialect = mysql.dialect() + proc = self.test_table.c.test_column.type._cached_bind_processor( + dialect) + eq_( + proc(None), + 'null' + ) + + def test_bind_serialize_none_as_null(self): + dialect = mysql.dialect() + proc = JSON(none_as_null=True)._cached_bind_processor( + dialect) + eq_( + proc(None), + None + ) + eq_( + proc(null()), + None + ) + + def test_bind_serialize_null(self): + dialect = mysql.dialect() + proc = self.test_table.c.test_column.type._cached_bind_processor( + dialect) + eq_( + proc(null()), + None + ) + + def test_result_deserialize_default(self): + dialect = mysql.dialect() + proc = self.test_table.c.test_column.type._cached_result_processor( + dialect, None) + eq_( + proc('{"A": [1, 2, 3, true, false]}'), + {"A": [1, 2, 3, True, False]} + ) + + def test_result_deserialize_null(self): + dialect = mysql.dialect() + proc = self.test_table.c.test_column.type._cached_result_processor( + dialect, None) + eq_( + proc('null'), + None + ) + + def test_result_deserialize_None(self): + dialect = mysql.dialect() + proc = self.test_table.c.test_column.type._cached_result_processor( + dialect, None) + eq_( + proc(None), + None + ) + + # This test is a bit misleading -- in real life you will need to cast to + # do anything + def test_where_getitem(self): + self._test_where( + self.jsoncol['bar'][1] == None, + "JSON_EXTRACT(test_table.test_column, %s) IS NULL" + ) + + def test_where_path(self): + self._test_where( + self.jsoncol[("foo", 1)] == None, + "JSON_EXTRACT(test_table.test_column, %s) IS NULL" + ) + + def test_path_typing(self): + col = column('x', JSON()) + is_( + col['q'].type._type_affinity, JSON + ) + is_( + col[('q', )].type._type_affinity, JSON + ) + is_( + col['q']['p'].type._type_affinity, JSON + ) + is_( + col[('q', 'p')].type._type_affinity, JSON + ) + + def test_where_getitem_json_cast(self): + self._test_where( + self.jsoncol['bar'].cast(Integer) == 5, + "CAST(JSON_EXTRACT(test_table.test_column, %s) AS SIGNED INTEGER) " + "= %s" + ) + + def test_cols_get(self): + self._test_cols( + self.jsoncol['foo'], + "JSON_EXTRACT(test_table.test_column, %s) AS `JSON_EXTRACT_1`", + True + ) + +class JSONRoundTripTest(fixtures.TablesTest): + __only_on__ = ('mysql >= 5.7.9',) + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table('data_table', metadata, + Column('id', Integer, primary_key=True), + Column('name', String(30), nullable=False), + Column('data', JSON), + Column('nulldata', JSON(none_as_null=True)) + ) + + def _fixture_data(self, engine): + data_table = self.tables.data_table + engine.execute( + data_table.insert(), + {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}}, + {'name': 'r2', 'data': {"k1": "r2v1", "k2": "r2v2"}}, + {'name': 'r3', 'data': {"k1": "r3v1", "k2": "r3v2"}}, + {'name': 'r4', 'data': {"k1": "r4v1", "k2": "r4v2"}}, + {'name': 'r5', 'data': {"k1": "r5v1", "k2": "r5v2", "k3": 5}}, + {'name': 'r6', 'data': {"k1": {"r6v1": {'subr': [1, 2, 3]}}}}, + {'name': 'r7', 'data': [{"r7v1": 1}, {"r7v2": 2}, {"r7v3": 3}]}, + ) + + def _assert_data(self, compare, engine, column='data'): + col = self.tables.data_table.c[column] + + data = engine.execute( + select([col]). + order_by(self.tables.data_table.c.name) + ).fetchall() + eq_([d for d, in data], compare) + + def _assert_column_is_NULL(self, column='data'): + col = self.tables.data_table.c[column] + + data = testing.db.execute( + select([col]). + where(col.is_(null())) + ).fetchall() + eq_([d for d, in data], [None]) + + def _assert_column_is_JSON_NULL(self, column='data'): + col = self.tables.data_table.c[column] + + data = testing.db.execute( + select([col]). + where(cast(col, String) == "null") + ).fetchall() + eq_([d for d, in data], [None]) + + def test_insert(self): + testing.db.execute( + self.tables.data_table.insert(), + {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}} + ) + self._assert_data([{"k1": "r1v1", "k2": "r1v2"}], testing.db) + + def test_insert_nulls(self): + testing.db.execute( + self.tables.data_table.insert(), + {'name': 'r1', 'data': null()} + ) + self._assert_data([None], testing.db) + + def test_insert_none_as_null(self): + testing.db.execute( + self.tables.data_table.insert(), + {'name': 'r1', 'nulldata': None} + ) + self._assert_column_is_NULL(column='nulldata') + + def test_insert_nulljson_into_none_as_null(self): + testing.db.execute( + self.tables.data_table.insert(), + {'name': 'r1', 'nulldata': JSON.NULL} + ) + self._assert_column_is_JSON_NULL(column='nulldata') + + def test_reflect(self): + insp = inspect(testing.db) + cols = insp.get_columns('data_table') + assert isinstance(cols[2]['type'], JSON) + + def test_custom_serialize_deserialize(self): + import json + + def loads(value): + value = json.loads(value) + value['x'] += '_loads' + return value + + def dumps(value): + value = dict(value) + value['x'] = 'dumps_y' + return json.dumps(value) + + engine = engines.testing_engine(options=dict( + json_serializer=dumps, + json_deserializer=loads + )) + + engine.execute( + self.tables.data_table.insert(), + {'name': 'r1', 'data': {"key": "value", "x": "q"}} + ) + self._assert_data([{"key": "value", "x": "dumps_y_loads"}], engine) + + def test_attribute_query(self): + self._fixture_data(testing.db) + data_table = self.tables.data_table + result = testing.db.execute( + select([data_table.c.name]).where( + data_table.c.data.json.k1['r6v1'].subr == [1, 2, 3] + ) + ) + eq_(result.scalar(), 'r6') + + def test_path_query_list(self): + self._fixture_data(testing.db) + data_table = self.tables.data_table + result = testing.db.execute( + select([data_table.c.name]).where( + data_table.c.data[('k1', 'r6v1', 'subr')] == [1, 2, 3] + ) + ) + eq_(result.scalar(), 'r6') + + def test_path_query_dict(self): + self._fixture_data(testing.db) + data_table = self.tables.data_table + result = testing.db.execute( + select([data_table.c.name]).where( + data_table.c.data[('k1', 'r6v1')] == {'subr': [1, 2, 3]} + ) + ) + eq_(result.scalar(), 'r6') + + def test_multi_index_query(self): + self._fixture_data(testing.db) + data_table = self.tables.data_table + result = testing.db.execute( + select([data_table.c.name]).where( + data_table.c.data['k1']['r6v1']['subr'] == [1, 2, 3] + ) + ) + eq_(result.scalar(), 'r6') + + def test_wildcard_query(self): + self._fixture_data(testing.db) + data_table = self.tables.data_table + data = ( + (data_table.c.data['*']['%'], 'r7', [1, 2, 3]), + (data_table.c.data['**.subr']['*'], 'r6', [1, 2, 3]), + (data_table.c.data['**']['subr']['*'], 'r6', [1, 2, 3]), + ) + for datum in data: + result = testing.db.execute( + select([datum[0]]).where(data_table.c.name == datum[1] + ) + ) + eq_(result.scalar(), datum[2]) + + def test_query_returned_as_int(self): + self._fixture_data(testing.db) + data_table = self.tables.data_table + result = testing.db.execute( + select([data_table.c.data['k3'].cast(Integer)]).where( + data_table.c.name == 'r5') + ).first() + assert isinstance(result[0], long) + + def test_criterion(self): + self._fixture_data(testing.db) + data_table = self.tables.data_table + result = testing.db.execute( + select([data_table.c.data]).where( + data_table.c.data['k1'] == 'r3v1' + ) + ).first() + eq_(result, ({'k1': 'r3v1', 'k2': 'r3v2'},)) + + def test_fixed_round_trip(self): + doc = { + "key": "value", + "key2": {"k1": "v1", "k2": "v2"} + } + s = select([JSON.MySqlJsonDocument(doc)]) + eq_( + testing.db.scalar(s), + doc + ) + + def test_unicode_round_trip(self): + doc = { + "key": util.u('réveillé'), + "data": {"k1": util.u('drôle')} + } + s = select([JSON.MySqlJsonDocument(doc)]) + eq_( + testing.db.scalar(s), + doc + ) + + def test_eval_none_flag_orm(self): + Base = declarative_base() + + class Data(Base): + __table__ = self.tables.data_table + + s = Session(testing.db) + + d1 = Data(name='d1', data=None, nulldata=None) + s.add(d1) + s.commit() + + s.bulk_insert_mappings( + Data, [{"name": "d2", "data": None, "nulldata": None}] + ) + eq_( + s.query( + cast(self.tables.data_table.c.data, String), + cast(self.tables.data_table.c.nulldata, String) + ).filter(self.tables.data_table.c.name == 'd1').first(), + ("null", None) + ) + eq_( + s.query( + cast(self.tables.data_table.c.data, String), + cast(self.tables.data_table.c.nulldata, String) + ).filter(self.tables.data_table.c.name == 'd2').first(), + ("null", None) + ) + +# ::TODO:: postgres jsonb tests have tests for contains/contained/etc... +#
\ No newline at end of file |
