summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/dialect/mysql/test_types.py394
1 files changed, 388 insertions, 6 deletions
diff --git a/test/dialect/mysql/test_types.py b/test/dialect/mysql/test_types.py
index 7c279ffbf..47264a5e9 100644
--- a/test/dialect/mysql/test_types.py
+++ b/test/dialect/mysql/test_types.py
@@ -1,26 +1,30 @@
# coding: utf-8
-from sqlalchemy.testing import eq_, assert_raises, assert_raises_message
+from sqlalchemy.testing import eq_, is_, assert_raises, assert_raises_message
from sqlalchemy import *
from sqlalchemy import sql, exc, schema
from sqlalchemy.util import u
from sqlalchemy import util
from sqlalchemy.dialects.mysql import base as mysql
-from sqlalchemy.testing import fixtures, AssertsCompiledSQL, AssertsExecutionResults
+from sqlalchemy.dialects.mysql import JSON
+from sqlalchemy.testing import AssertsCompiledSQL, AssertsExecutionResults
+from sqlalchemy.testing import engines, fixtures
from sqlalchemy import testing
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import Session
import datetime
import decimal
class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
- "Test MySQL column types"
+ """Test MySQL column types"""
__dialect__ = mysql.dialect()
__only_on__ = 'mysql'
__backend__ = True
def test_numeric(self):
- "Exercise type specification and options for numeric types."
+ """Exercise type specification and options for numeric types."""
columns = [
# column type, args, kwargs, expected ddl
@@ -530,7 +534,6 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
]
)
-
def test_datetime_generic(self):
self.assert_compile(
mysql.DATETIME(),
@@ -543,7 +546,6 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
"DATETIME(4)"
)
-
def test_time_generic(self):
""""Exercise TIME."""
@@ -1021,3 +1023,383 @@ def colspec(c):
return testing.db.dialect.ddl_compiler(
testing.db.dialect, None).get_column_specification(c)
+
+class JSONTest(AssertsCompiledSQL, fixtures.TestBase):
+ __only_on__ = ('mysql >= 5.7.9',)
+ __dialect__ = mysql.dialect()
+
+ def setup(self):
+ metadata = MetaData()
+ self.test_table = Table('test_table', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('test_column', JSON),
+ )
+ self.jsoncol = self.test_table.c.test_column
+ self.ids = self.test_table.c.id
+
+ def _test_where(self, whereclause, expected):
+ stmt = select([self.test_table]).where(whereclause)
+ self.assert_compile(
+ stmt,
+ "SELECT test_table.id, test_table.test_column FROM test_table "
+ "WHERE %s" % expected
+ )
+
+ def _test_cols(self, colclause, expected, from_=True):
+ stmt = select([colclause])
+ self.assert_compile(
+ stmt,
+ (
+ "SELECT %s" +
+ (" FROM test_table" if from_ else "")
+ ) % expected
+ )
+
+ def test_bind_serialize_default(self):
+ dialect = mysql.dialect()
+ proc = self.test_table.c.test_column.type._cached_bind_processor(
+ dialect)
+ eq_(
+ proc({"A": [1, 2, 3, True, False]}),
+ '{"A": [1, 2, 3, true, false]}'
+ )
+
+ def test_bind_serialize_none(self):
+ dialect = mysql.dialect()
+ proc = self.test_table.c.test_column.type._cached_bind_processor(
+ dialect)
+ eq_(
+ proc(None),
+ 'null'
+ )
+
+ def test_bind_serialize_none_as_null(self):
+ dialect = mysql.dialect()
+ proc = JSON(none_as_null=True)._cached_bind_processor(
+ dialect)
+ eq_(
+ proc(None),
+ None
+ )
+ eq_(
+ proc(null()),
+ None
+ )
+
+ def test_bind_serialize_null(self):
+ dialect = mysql.dialect()
+ proc = self.test_table.c.test_column.type._cached_bind_processor(
+ dialect)
+ eq_(
+ proc(null()),
+ None
+ )
+
+ def test_result_deserialize_default(self):
+ dialect = mysql.dialect()
+ proc = self.test_table.c.test_column.type._cached_result_processor(
+ dialect, None)
+ eq_(
+ proc('{"A": [1, 2, 3, true, false]}'),
+ {"A": [1, 2, 3, True, False]}
+ )
+
+ def test_result_deserialize_null(self):
+ dialect = mysql.dialect()
+ proc = self.test_table.c.test_column.type._cached_result_processor(
+ dialect, None)
+ eq_(
+ proc('null'),
+ None
+ )
+
+ def test_result_deserialize_None(self):
+ dialect = mysql.dialect()
+ proc = self.test_table.c.test_column.type._cached_result_processor(
+ dialect, None)
+ eq_(
+ proc(None),
+ None
+ )
+
+ # This test is a bit misleading -- in real life you will need to cast to
+ # do anything
+ def test_where_getitem(self):
+ self._test_where(
+ self.jsoncol['bar'][1] == None,
+ "JSON_EXTRACT(test_table.test_column, %s) IS NULL"
+ )
+
+ def test_where_path(self):
+ self._test_where(
+ self.jsoncol[("foo", 1)] == None,
+ "JSON_EXTRACT(test_table.test_column, %s) IS NULL"
+ )
+
+ def test_path_typing(self):
+ col = column('x', JSON())
+ is_(
+ col['q'].type._type_affinity, JSON
+ )
+ is_(
+ col[('q', )].type._type_affinity, JSON
+ )
+ is_(
+ col['q']['p'].type._type_affinity, JSON
+ )
+ is_(
+ col[('q', 'p')].type._type_affinity, JSON
+ )
+
+ def test_where_getitem_json_cast(self):
+ self._test_where(
+ self.jsoncol['bar'].cast(Integer) == 5,
+ "CAST(JSON_EXTRACT(test_table.test_column, %s) AS SIGNED INTEGER) "
+ "= %s"
+ )
+
+ def test_cols_get(self):
+ self._test_cols(
+ self.jsoncol['foo'],
+ "JSON_EXTRACT(test_table.test_column, %s) AS `JSON_EXTRACT_1`",
+ True
+ )
+
+class JSONRoundTripTest(fixtures.TablesTest):
+ __only_on__ = ('mysql >= 5.7.9',)
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('data_table', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(30), nullable=False),
+ Column('data', JSON),
+ Column('nulldata', JSON(none_as_null=True))
+ )
+
+ def _fixture_data(self, engine):
+ data_table = self.tables.data_table
+ engine.execute(
+ data_table.insert(),
+ {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}},
+ {'name': 'r2', 'data': {"k1": "r2v1", "k2": "r2v2"}},
+ {'name': 'r3', 'data': {"k1": "r3v1", "k2": "r3v2"}},
+ {'name': 'r4', 'data': {"k1": "r4v1", "k2": "r4v2"}},
+ {'name': 'r5', 'data': {"k1": "r5v1", "k2": "r5v2", "k3": 5}},
+ {'name': 'r6', 'data': {"k1": {"r6v1": {'subr': [1, 2, 3]}}}},
+ {'name': 'r7', 'data': [{"r7v1": 1}, {"r7v2": 2}, {"r7v3": 3}]},
+ )
+
+ def _assert_data(self, compare, engine, column='data'):
+ col = self.tables.data_table.c[column]
+
+ data = engine.execute(
+ select([col]).
+ order_by(self.tables.data_table.c.name)
+ ).fetchall()
+ eq_([d for d, in data], compare)
+
+ def _assert_column_is_NULL(self, column='data'):
+ col = self.tables.data_table.c[column]
+
+ data = testing.db.execute(
+ select([col]).
+ where(col.is_(null()))
+ ).fetchall()
+ eq_([d for d, in data], [None])
+
+ def _assert_column_is_JSON_NULL(self, column='data'):
+ col = self.tables.data_table.c[column]
+
+ data = testing.db.execute(
+ select([col]).
+ where(cast(col, String) == "null")
+ ).fetchall()
+ eq_([d for d, in data], [None])
+
+ def test_insert(self):
+ testing.db.execute(
+ self.tables.data_table.insert(),
+ {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}}
+ )
+ self._assert_data([{"k1": "r1v1", "k2": "r1v2"}], testing.db)
+
+ def test_insert_nulls(self):
+ testing.db.execute(
+ self.tables.data_table.insert(),
+ {'name': 'r1', 'data': null()}
+ )
+ self._assert_data([None], testing.db)
+
+ def test_insert_none_as_null(self):
+ testing.db.execute(
+ self.tables.data_table.insert(),
+ {'name': 'r1', 'nulldata': None}
+ )
+ self._assert_column_is_NULL(column='nulldata')
+
+ def test_insert_nulljson_into_none_as_null(self):
+ testing.db.execute(
+ self.tables.data_table.insert(),
+ {'name': 'r1', 'nulldata': JSON.NULL}
+ )
+ self._assert_column_is_JSON_NULL(column='nulldata')
+
+ def test_reflect(self):
+ insp = inspect(testing.db)
+ cols = insp.get_columns('data_table')
+ assert isinstance(cols[2]['type'], JSON)
+
+ def test_custom_serialize_deserialize(self):
+ import json
+
+ def loads(value):
+ value = json.loads(value)
+ value['x'] += '_loads'
+ return value
+
+ def dumps(value):
+ value = dict(value)
+ value['x'] = 'dumps_y'
+ return json.dumps(value)
+
+ engine = engines.testing_engine(options=dict(
+ json_serializer=dumps,
+ json_deserializer=loads
+ ))
+
+ engine.execute(
+ self.tables.data_table.insert(),
+ {'name': 'r1', 'data': {"key": "value", "x": "q"}}
+ )
+ self._assert_data([{"key": "value", "x": "dumps_y_loads"}], engine)
+
+ def test_attribute_query(self):
+ self._fixture_data(testing.db)
+ data_table = self.tables.data_table
+ result = testing.db.execute(
+ select([data_table.c.name]).where(
+ data_table.c.data.json.k1['r6v1'].subr == [1, 2, 3]
+ )
+ )
+ eq_(result.scalar(), 'r6')
+
+ def test_path_query_list(self):
+ self._fixture_data(testing.db)
+ data_table = self.tables.data_table
+ result = testing.db.execute(
+ select([data_table.c.name]).where(
+ data_table.c.data[('k1', 'r6v1', 'subr')] == [1, 2, 3]
+ )
+ )
+ eq_(result.scalar(), 'r6')
+
+ def test_path_query_dict(self):
+ self._fixture_data(testing.db)
+ data_table = self.tables.data_table
+ result = testing.db.execute(
+ select([data_table.c.name]).where(
+ data_table.c.data[('k1', 'r6v1')] == {'subr': [1, 2, 3]}
+ )
+ )
+ eq_(result.scalar(), 'r6')
+
+ def test_multi_index_query(self):
+ self._fixture_data(testing.db)
+ data_table = self.tables.data_table
+ result = testing.db.execute(
+ select([data_table.c.name]).where(
+ data_table.c.data['k1']['r6v1']['subr'] == [1, 2, 3]
+ )
+ )
+ eq_(result.scalar(), 'r6')
+
+ def test_wildcard_query(self):
+ self._fixture_data(testing.db)
+ data_table = self.tables.data_table
+ data = (
+ (data_table.c.data['*']['%'], 'r7', [1, 2, 3]),
+ (data_table.c.data['**.subr']['*'], 'r6', [1, 2, 3]),
+ (data_table.c.data['**']['subr']['*'], 'r6', [1, 2, 3]),
+ )
+ for datum in data:
+ result = testing.db.execute(
+ select([datum[0]]).where(data_table.c.name == datum[1]
+ )
+ )
+ eq_(result.scalar(), datum[2])
+
+ def test_query_returned_as_int(self):
+ self._fixture_data(testing.db)
+ data_table = self.tables.data_table
+ result = testing.db.execute(
+ select([data_table.c.data['k3'].cast(Integer)]).where(
+ data_table.c.name == 'r5')
+ ).first()
+ assert isinstance(result[0], long)
+
+ def test_criterion(self):
+ self._fixture_data(testing.db)
+ data_table = self.tables.data_table
+ result = testing.db.execute(
+ select([data_table.c.data]).where(
+ data_table.c.data['k1'] == 'r3v1'
+ )
+ ).first()
+ eq_(result, ({'k1': 'r3v1', 'k2': 'r3v2'},))
+
+ def test_fixed_round_trip(self):
+ doc = {
+ "key": "value",
+ "key2": {"k1": "v1", "k2": "v2"}
+ }
+ s = select([JSON.MySqlJsonDocument(doc)])
+ eq_(
+ testing.db.scalar(s),
+ doc
+ )
+
+ def test_unicode_round_trip(self):
+ doc = {
+ "key": util.u('réveillé'),
+ "data": {"k1": util.u('drôle')}
+ }
+ s = select([JSON.MySqlJsonDocument(doc)])
+ eq_(
+ testing.db.scalar(s),
+ doc
+ )
+
+ def test_eval_none_flag_orm(self):
+ Base = declarative_base()
+
+ class Data(Base):
+ __table__ = self.tables.data_table
+
+ s = Session(testing.db)
+
+ d1 = Data(name='d1', data=None, nulldata=None)
+ s.add(d1)
+ s.commit()
+
+ s.bulk_insert_mappings(
+ Data, [{"name": "d2", "data": None, "nulldata": None}]
+ )
+ eq_(
+ s.query(
+ cast(self.tables.data_table.c.data, String),
+ cast(self.tables.data_table.c.nulldata, String)
+ ).filter(self.tables.data_table.c.name == 'd1').first(),
+ ("null", None)
+ )
+ eq_(
+ s.query(
+ cast(self.tables.data_table.c.data, String),
+ cast(self.tables.data_table.c.nulldata, String)
+ ).filter(self.tables.data_table.c.name == 'd2').first(),
+ ("null", None)
+ )
+
+# ::TODO:: postgres jsonb tests have tests for contains/contained/etc...
+# \ No newline at end of file