diff options
author | Sergei Petrunia <psergey@askmonty.org> | 2020-03-10 11:23:43 +0300 |
---|---|---|
committer | Sergei Petrunia <psergey@askmonty.org> | 2020-03-10 11:23:43 +0300 |
commit | 181da13191dd6c6bdcb57e8220b46bed1c1492d4 (patch) | |
tree | a8175512bcf3d14edbfd9b8c5637afff31d56d2b | |
parent | e0e5d8c5942a1eb0b0ae05b6296286193073e571 (diff) | |
parent | cc5f54819da673cce2f56fa7b592512d0897c825 (diff) | |
download | mariadb-git-bb-10.5-xpand.tar.gz |
Merge XPand Storage Engine (rebased)bb-10.5-xpand
-rw-r--r-- | mysql-test/suite/xpand/basics.result | 107 | ||||
-rw-r--r-- | mysql-test/suite/xpand/basics.test | 100 | ||||
-rw-r--r-- | mysql-test/suite/xpand/my.cnf | 4 | ||||
-rw-r--r-- | mysql-test/suite/xpand/pushdown_conditions.result | 47 | ||||
-rw-r--r-- | mysql-test/suite/xpand/pushdown_conditions.test | 34 | ||||
-rw-r--r-- | mysql-test/suite/xpand/suite.opt | 4 | ||||
-rw-r--r-- | mysql-test/suite/xpand/update.result | 51 | ||||
-rw-r--r-- | mysql-test/suite/xpand/update.test | 44 | ||||
-rw-r--r-- | mysql-test/suite/xpand/upsert.result | 72 | ||||
-rw-r--r-- | mysql-test/suite/xpand/upsert.test | 49 | ||||
-rw-r--r-- | storage/federatedx/federatedx_pushdown.cc | 10 | ||||
-rw-r--r-- | storage/xpand/CMakeLists.txt | 6 | ||||
-rw-r--r-- | storage/xpand/ha_xpand.cc | 1588 | ||||
-rw-r--r-- | storage/xpand/ha_xpand.h | 147 | ||||
-rw-r--r-- | storage/xpand/ha_xpand_pushdown.cc | 484 | ||||
-rw-r--r-- | storage/xpand/ha_xpand_pushdown.h | 84 | ||||
-rw-r--r-- | storage/xpand/xpand_connection.cc | 1357 | ||||
-rw-r--r-- | storage/xpand/xpand_connection.h | 146 |
18 files changed, 4334 insertions, 0 deletions
diff --git a/mysql-test/suite/xpand/basics.result b/mysql-test/suite/xpand/basics.result new file mode 100644 index 00000000000..e21ce2c574e --- /dev/null +++ b/mysql-test/suite/xpand/basics.result @@ -0,0 +1,107 @@ +CREATE DATABASE xpd; +USE xpd; +DROP TABLE IF EXISTS cx1, t1, t2; +CREATE TABLE cx1(i BIGINT)ENGINE=xpand; +CREATE TABLE cx1(i BIGINT)ENGINE=xpand; +ERROR 42S01: Table 'cx1' already exists +INSERT INTO cx1 VALUES (42); +SELECT * FROM cx1; +i +42 +DROP TABLE cx1; +SHOW CREATE TABLE cx1; +ERROR 42S02: Table 'xpd.cx1' doesn't exist +DROP TABLE IF EXISTS intandtext; +Warnings: +Note 1051 Unknown table 'xpd.intandtext' +CREATE TABLE intandtext(i bigint, t text)ENGINE=xpand; +INSERT INTO intandtext VALUES(10, 'someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq'); +SELECT i,t FROM intandtext; +i t +10 someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq +EXPLAIN SELECT i,t FROM intandtext; +id select_type table type possible_keys key key_len ref rows Extra +1 PUSHED SELECT NULL NULL NULL NULL NULL NULL NULL NULL +SET @@optimizer_switch='derived_merge=OFF'; +SET xpand_select_handler=OFF; +SELECT i,t FROM (SELECT i,t FROM intandtext) t; +i t +10 someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq +EXPLAIN SELECT i,t FROM (SELECT i,t FROM intandtext) t; +id select_type table type possible_keys key key_len ref rows Extra +1 PRIMARY <derived2> ALL NULL NULL NULL NULL 10000 +2 PUSHED DERIVED NULL NULL NULL NULL NULL NULL NULL NULL +SET xpand_derived_handler=OFF; +SELECT i,t FROM intandtext; +i t +10 someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq +SELECT i,t FROM (SELECT i,t FROM intandtext) t; +i t +10 someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq +EXPLAIN SELECT i,t FROM (SELECT i,t FROM intandtext) t; +id select_type table type possible_keys key key_len ref rows Extra +1 PRIMARY <derived2> ALL NULL NULL NULL NULL 10000 +2 DERIVED intandtext ALL NULL NULL NULL NULL 10000 +DROP TABLE intandtext; +set +optimizer_switch=default, +xpand_derived_handler= default, +xpand_select_handler=default; +# +# CLX-77: INSERT ... SELECT returns rows to the client instead of inserting +# +drop table if exists t1,t2; +create table t1 (a int) engine=xpand; +insert into t1 values (1); +select a into @var from t1; +select @var; +@var +1 +# This must not emit output to the client: +select a into outfile 'tmpfile1' from t1; +create table t2 (a int) engine=myisam; +insert into t2 select * from t1; +select * from t2; +a +1 +drop table t1,t2; +# +# CLX-55: Prepared statement support: +# Implement "Direct Update" by printing the statement +# +create table t1 (a int primary key, b int) engine=xpand; +insert into t1 values (1,1),(2,2),(3,3); +prepare s from 'update t1 set b=b+? where a=?'; +execute s using 10000, 2; +select * from t1; +a b +1 1 +2 10002 +3 3 +drop table t1; +# +# CLX-80: ALTER TABLE t ENGINE=CLUSTRIX fails with an error +# +create table t1 (a int) engine=myisam; +insert into t1 values (1),(2),(3); +alter table t1 engine=xpand; +select * from t1; +a +1 +2 +3 +show create table t1; +Table Create Table +t1 CREATE TABLE `t1` ( + `a` int(11) DEFAULT NULL +) ENGINE=XPAND DEFAULT CHARSET=utf8 +# Try a RENAME TABLE too since the patch touches the code +alter table t1 rename t2; +show create table t2; +Table Create Table +t2 CREATE TABLE `t2` ( + `a` int(11) DEFAULT NULL +) ENGINE=XPAND DEFAULT CHARSET=utf8 +drop table t2; +USE test; +DROP DATABASE xpd; diff --git a/mysql-test/suite/xpand/basics.test b/mysql-test/suite/xpand/basics.test new file mode 100644 index 00000000000..41f9a457da9 --- /dev/null +++ b/mysql-test/suite/xpand/basics.test @@ -0,0 +1,100 @@ +CREATE DATABASE xpd; +USE xpd; + +--disable_warnings +DROP TABLE IF EXISTS cx1, t1, t2; +--enable_warnings + +CREATE TABLE cx1(i BIGINT)ENGINE=xpand; +--error ER_TABLE_EXISTS_ERROR +CREATE TABLE cx1(i BIGINT)ENGINE=xpand; + +INSERT INTO cx1 VALUES (42); + +SELECT * FROM cx1; + +DROP TABLE cx1; + +--error ER_NO_SUCH_TABLE +SHOW CREATE TABLE cx1; + +DROP TABLE IF EXISTS intandtext; +CREATE TABLE intandtext(i bigint, t text)ENGINE=xpand; +INSERT INTO intandtext VALUES(10, 'someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq'); + +SELECT i,t FROM intandtext; +EXPLAIN SELECT i,t FROM intandtext; + +SET @@optimizer_switch='derived_merge=OFF'; +SET xpand_select_handler=OFF; +SELECT i,t FROM (SELECT i,t FROM intandtext) t; +EXPLAIN SELECT i,t FROM (SELECT i,t FROM intandtext) t; + +SET xpand_derived_handler=OFF; +SELECT i,t FROM intandtext; +SELECT i,t FROM (SELECT i,t FROM intandtext) t; +EXPLAIN SELECT i,t FROM (SELECT i,t FROM intandtext) t; + +DROP TABLE intandtext; + +set + optimizer_switch=default, + xpand_derived_handler= default, + xpand_select_handler=default; + +--echo # +--echo # CLX-77: INSERT ... SELECT returns rows to the client instead of inserting +--echo # +--disable_warnings +drop table if exists t1,t2; +--enable_warnings + +create table t1 (a int) engine=xpand; +insert into t1 values (1); + +select a into @var from t1; +select @var; + +--echo # This must not emit output to the client: +select a into outfile 'tmpfile1' from t1; +let $file=`select concat(@@datadir,'/xpd/tmpfile1')`; + +--remove_file $file + +create table t2 (a int) engine=myisam; +insert into t2 select * from t1; +select * from t2; + +drop table t1,t2; + +--echo # +--echo # CLX-55: Prepared statement support: +--echo # Implement "Direct Update" by printing the statement +--echo # +create table t1 (a int primary key, b int) engine=xpand; +insert into t1 values (1,1),(2,2),(3,3); + +prepare s from 'update t1 set b=b+? where a=?'; +execute s using 10000, 2; +--sorted_result +select * from t1; +drop table t1; + +--echo # +--echo # CLX-80: ALTER TABLE t ENGINE=CLUSTRIX fails with an error +--echo # +create table t1 (a int) engine=myisam; +insert into t1 values (1),(2),(3); +alter table t1 engine=xpand; +--sorted_result +select * from t1; +show create table t1; + +--echo # Try a RENAME TABLE too since the patch touches the code +alter table t1 rename t2; +show create table t2; + +drop table t2; + +USE test; +DROP DATABASE xpd; diff --git a/mysql-test/suite/xpand/my.cnf b/mysql-test/suite/xpand/my.cnf new file mode 100644 index 00000000000..8105041b85c --- /dev/null +++ b/mysql-test/suite/xpand/my.cnf @@ -0,0 +1,4 @@ +!include include/default_my.cnf + +[mysqld.1] +socket= /tmp/mysqld42.sock diff --git a/mysql-test/suite/xpand/pushdown_conditions.result b/mysql-test/suite/xpand/pushdown_conditions.result new file mode 100644 index 00000000000..3120000e56a --- /dev/null +++ b/mysql-test/suite/xpand/pushdown_conditions.result @@ -0,0 +1,47 @@ +CREATE DATABASE xpd; +USE xpd; +DROP TABLE IF EXISTS cx1; +CREATE TABLE cx1(i BIGINT, i2 BIGINT, t TEXT)ENGINE=xpand; +INSERT INTO cx1 VALUES (41, 43, 'some1'), (42, 42, 'some2'), (43, 41, 'some3'); +SELECT * FROM cx1 ORDER BY i; +i i2 t +41 43 some1 +42 42 some2 +43 41 some3 +SET xpand_select_handler=OFF; +SELECT * FROM cx1 WHERE i>41 AND i2>41; +i i2 t +42 42 some2 +EXPLAIN SELECT * FROM cx1 WHERE i>41 AND i2>41; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE cx1 ALL NULL NULL NULL NULL 10000 Using where with pushed condition +SELECT * FROM cx1 WHERE i>41 AND i2>41 AND t='some2'; +i i2 t +42 42 some2 +EXPLAIN SELECT * FROM cx1 WHERE i>41 AND i2>41 AND t='some2'; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE cx1 ALL NULL NULL NULL NULL 10000 Using where with pushed condition +SELECT * FROM cx1 WHERE i>i2+1; +i i2 t +43 41 some3 +EXPLAIN SELECT * FROM cx1 WHERE i>i2+1; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE cx1 ALL NULL NULL NULL NULL 10000 Using where with pushed condition +SET @@optimizer_switch='derived_merge=OFF'; +SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i; +i i2 t +43 41 some3 +EXPLAIN SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i; +id select_type table type possible_keys key key_len ref rows Extra +1 PRIMARY <derived2> ALL NULL NULL NULL NULL 10000 Using filesort +2 PUSHED DERIVED NULL NULL NULL NULL NULL NULL NULL NULL +SET xpand_derived_handler=OFF; +SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i; +i i2 t +43 41 some3 +EXPLAIN SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i; +id select_type table type possible_keys key key_len ref rows Extra +1 PRIMARY <derived2> ALL NULL NULL NULL NULL 10000 Using filesort +2 DERIVED cx1 ALL NULL NULL NULL NULL 10000 Using where with pushed condition +USE test; +DROP DATABASE xpd; diff --git a/mysql-test/suite/xpand/pushdown_conditions.test b/mysql-test/suite/xpand/pushdown_conditions.test new file mode 100644 index 00000000000..9d8eb1f3bf5 --- /dev/null +++ b/mysql-test/suite/xpand/pushdown_conditions.test @@ -0,0 +1,34 @@ +CREATE DATABASE xpd; +USE xpd; + +--disable_warnings +DROP TABLE IF EXISTS cx1; +--enable_warnings +CREATE TABLE cx1(i BIGINT, i2 BIGINT, t TEXT)ENGINE=xpand; + +INSERT INTO cx1 VALUES (41, 43, 'some1'), (42, 42, 'some2'), (43, 41, 'some3'); + +SELECT * FROM cx1 ORDER BY i; +SET xpand_select_handler=OFF; + +SELECT * FROM cx1 WHERE i>41 AND i2>41; +EXPLAIN SELECT * FROM cx1 WHERE i>41 AND i2>41; +SELECT * FROM cx1 WHERE i>41 AND i2>41 AND t='some2'; +EXPLAIN SELECT * FROM cx1 WHERE i>41 AND i2>41 AND t='some2'; +SELECT * FROM cx1 WHERE i>i2+1; +EXPLAIN SELECT * FROM cx1 WHERE i>i2+1; + +# The plugin doesn't use pushdown conditions for DH as of 10.5.1 +# but it is worth to test memory leaks. +SET @@optimizer_switch='derived_merge=OFF'; +SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i; +EXPLAIN SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i; +SET xpand_derived_handler=OFF; +SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i; +EXPLAIN SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i; + +# SELECT * FROM (SELECT i FROM cx1 WHERE i=42)a1,(SELECT i FROM cx1 WHERE i =42)a2 WHERE a1.i=a2.i; +# EXPLAIN SELECT * FROM (SELECT i FROM cx1 WHERE i=42)a1,(SELECT i FROM cx1 WHERE i =42)a2 WHERE a1.i=a2.i; + +USE test; +DROP DATABASE xpd; diff --git a/mysql-test/suite/xpand/suite.opt b/mysql-test/suite/xpand/suite.opt new file mode 100644 index 00000000000..1e15cb158ae --- /dev/null +++ b/mysql-test/suite/xpand/suite.opt @@ -0,0 +1,4 @@ +--plugin-load=xpand=ha_xpand.so +--core-file +--xpand_port=3306 +--plugin-maturity=unknown diff --git a/mysql-test/suite/xpand/update.result b/mysql-test/suite/xpand/update.result new file mode 100644 index 00000000000..9fa63f834f1 --- /dev/null +++ b/mysql-test/suite/xpand/update.result @@ -0,0 +1,51 @@ +CREATE DATABASE IF NOT EXISTS `db1`; +connect con1,localhost,root,,test; +connection con1; +USE `db1`; +DROP TABLE IF EXISTS `t1`; +CREATE TABLE `t1`(i BIGINT, t TEXT)ENGINE=xpand; +show create table t1; +Table Create Table +t1 CREATE TABLE `t1` ( + `i` bigint(20) DEFAULT NULL, + `t` text DEFAULT NULL +) ENGINE=XPAND DEFAULT CHARSET=utf8 +set character_set_client=utf8; +set collation_connection=utf8_bin; +set character_set_results=utf8; +INSERT INTO `t1` (i, t) VALUES (42, 'один'); +INSERT INTO `t1` (i, t) VALUES (42, 'ноль'); +SELECT * FROM `t1` ORDER BY `i` DESC, `t` DESC; +i t +42 один +42 ноль +# examine the data on the backend: +# This should show that the SELECT has been pushed to the backend: +explain +select i, hex(t) from t1; +id select_type table type possible_keys key key_len ref rows Extra +1 PUSHED SELECT NULL NULL NULL NULL NULL NULL NULL NULL +select i, hex(t) from t1; +i hex(t) +42 D0BDD0BED0BBD18C +42 D0BED0B4D0B8D0BD +# The above should match: +select hex('один') as row1, hex('ноль') as row2; +row1 row2 +D0BED0B4D0B8D0BD D0BDD0BED0BBD18C +UPDATE `t1` SET i=i+1,t='два' WHERE t='один'; +SELECT * FROM `t1` ORDER BY `i` DESC, `t` DESC; +i t +43 два +42 ноль +USE test; +UPDATE `db1`.`t1` SET i=i+1,t='три' WHERE t='два'; +SELECT * FROM `db1`.`t1` ORDER BY `i` DESC, `t` DESC; +i t +44 три +42 ноль +disconnect con1; +connection default; +DROP TABLE `db1`.`t1`; +USE test; +DROP DATABASE `db1`; diff --git a/mysql-test/suite/xpand/update.test b/mysql-test/suite/xpand/update.test new file mode 100644 index 00000000000..eb920fb4324 --- /dev/null +++ b/mysql-test/suite/xpand/update.test @@ -0,0 +1,44 @@ +CREATE DATABASE IF NOT EXISTS `db1`; + +# Do the test in another connection so that we don't have to clean up +connect (con1,localhost,root,,test); +connection con1; + +USE `db1`; +--disable_warnings +DROP TABLE IF EXISTS `t1`; +--enable_warnings + +CREATE TABLE `t1`(i BIGINT, t TEXT)ENGINE=xpand; +show create table t1; + +set character_set_client=utf8; +set collation_connection=utf8_bin; +set character_set_results=utf8; +INSERT INTO `t1` (i, t) VALUES (42, 'один'); +INSERT INTO `t1` (i, t) VALUES (42, 'ноль'); +SELECT * FROM `t1` ORDER BY `i` DESC, `t` DESC; + +--echo # examine the data on the backend: +--echo # This should show that the SELECT has been pushed to the backend: +explain +select i, hex(t) from t1; +--sorted_result +select i, hex(t) from t1; +--echo # The above should match: +select hex('один') as row1, hex('ноль') as row2; + +UPDATE `t1` SET i=i+1,t='два' WHERE t='один'; +SELECT * FROM `t1` ORDER BY `i` DESC, `t` DESC; + +USE test; +UPDATE `db1`.`t1` SET i=i+1,t='три' WHERE t='два'; +SELECT * FROM `db1`.`t1` ORDER BY `i` DESC, `t` DESC; + +disconnect con1; +connection default; + +DROP TABLE `db1`.`t1`; + +USE test; +DROP DATABASE `db1`; diff --git a/mysql-test/suite/xpand/upsert.result b/mysql-test/suite/xpand/upsert.result new file mode 100644 index 00000000000..9cbad652dd4 --- /dev/null +++ b/mysql-test/suite/xpand/upsert.result @@ -0,0 +1,72 @@ +CREATE DATABASE IF NOT EXISTS `db1`; +USE `db1`; +DROP TABLE IF EXISTS `ins_duplicate`; +Warnings: +Note 1051 Unknown table 'db1.ins_duplicate' +CREATE TABLE `ins_duplicate`(`id` INT PRIMARY KEY, `animal` VARCHAR(30)) ENGINE=xpand; +INSERT INTO `ins_duplicate` VALUES (1,'Aardvark'), (2,'Cheetah'), (3,'Zebra'); +SELECT * FROM `ins_duplicate` ORDER BY `id`; +id animal +1 Aardvark +2 Cheetah +3 Zebra +INSERT INTO ins_duplicate VALUES (1,'Antelope'); +ERROR 23000: Can't write; duplicate key in table 'ins_duplicate' +INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Banana'; +SELECT * FROM `ins_duplicate` ORDER BY `id`; +id animal +1 Banana +2 Cheetah +3 Zebra +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah'); +ERROR 23000: Can't write; duplicate key in table 'ins_duplicate' +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid'; +SELECT * FROM `ins_duplicate` ORDER BY `id`; +id animal +1 hybrid +2 hybrid +3 Zebra +BEGIN; +SELECT * FROM `ins_duplicate` ORDER BY `id`; +id animal +1 hybrid +2 hybrid +3 Zebra +INSERT INTO ins_duplicate VALUES (1,'Antelope'); +ERROR 23000: Can't write; duplicate key in table 'ins_duplicate' +INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable'; +SELECT * FROM `ins_duplicate` ORDER BY `id`; +id animal +1 Vegetable +2 hybrid +3 Zebra +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah'); +ERROR 23000: Can't write; duplicate key in table 'ins_duplicate' +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid2'; +COMMIT; +BEGIN; +SELECT * FROM `ins_duplicate` ORDER BY `id`; +id animal +1 hybrid2 +2 hybrid2 +3 Zebra +INSERT INTO ins_duplicate VALUES (1,'Antelope'); +ERROR 23000: Can't write; duplicate key in table 'ins_duplicate' +INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable'; +SELECT * FROM `ins_duplicate` ORDER BY `id`; +id animal +1 Vegetable +2 hybrid2 +3 Zebra +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah'); +ERROR 23000: Can't write; duplicate key in table 'ins_duplicate' +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid3'; +ROLLBACK; +SELECT * FROM `ins_duplicate` ORDER BY `id`; +id animal +1 hybrid2 +2 hybrid2 +3 Zebra +DROP TABLE `db1`.`ins_duplicate`; +USE test; +DROP DATABASE `db1`; diff --git a/mysql-test/suite/xpand/upsert.test b/mysql-test/suite/xpand/upsert.test new file mode 100644 index 00000000000..ab46b1a01b8 --- /dev/null +++ b/mysql-test/suite/xpand/upsert.test @@ -0,0 +1,49 @@ +CREATE DATABASE IF NOT EXISTS `db1`; +USE `db1`; +DROP TABLE IF EXISTS `ins_duplicate`; +CREATE TABLE `ins_duplicate`(`id` INT PRIMARY KEY, `animal` VARCHAR(30)) ENGINE=xpand; +INSERT INTO `ins_duplicate` VALUES (1,'Aardvark'), (2,'Cheetah'), (3,'Zebra'); +SELECT * FROM `ins_duplicate` ORDER BY `id`; + +--error ER_DUP_KEY +INSERT INTO ins_duplicate VALUES (1,'Antelope'); +INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Banana'; +SELECT * FROM `ins_duplicate` ORDER BY `id`; + +--error ER_DUP_KEY +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah'); +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid'; +SELECT * FROM `ins_duplicate` ORDER BY `id`; + +BEGIN; +SELECT * FROM `ins_duplicate` ORDER BY `id`; + +--error ER_DUP_KEY +INSERT INTO ins_duplicate VALUES (1,'Antelope'); +INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable'; +SELECT * FROM `ins_duplicate` ORDER BY `id`; + +--error ER_DUP_KEY +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah'); +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid2'; +COMMIT; + +BEGIN; +SELECT * FROM `ins_duplicate` ORDER BY `id`; + +--error ER_DUP_KEY +INSERT INTO ins_duplicate VALUES (1,'Antelope'); +INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable'; +SELECT * FROM `ins_duplicate` ORDER BY `id`; + +--error ER_DUP_KEY +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah'); +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid3'; +ROLLBACK; + +SELECT * FROM `ins_duplicate` ORDER BY `id`; + +DROP TABLE `db1`.`ins_duplicate`; + +USE test; +DROP DATABASE `db1`; diff --git a/storage/federatedx/federatedx_pushdown.cc b/storage/federatedx/federatedx_pushdown.cc index 2701436ccf5..baaf8245aea 100644 --- a/storage/federatedx/federatedx_pushdown.cc +++ b/storage/federatedx/federatedx_pushdown.cc @@ -181,6 +181,16 @@ create_federatedx_select_handler(THD* thd, SELECT_LEX *sel) else if (ht != tbl->table->file->partition_ht()) return 0; } + + /* + Currently, ha_federatedx_select_handler::init_scan just takes the + thd->query and sends it to the backend. + This obviously won't work if the SELECT uses an "INTO @var" or + "INTO OUTFILE". It is also unlikely to work if the select has some + other kind of side effect. + */ + if (sel->uncacheable & UNCACHEABLE_SIDEEFFECT) + return NULL; /* Currently, ha_federatedx_select_handler::init_scan just takes the diff --git a/storage/xpand/CMakeLists.txt b/storage/xpand/CMakeLists.txt new file mode 100644 index 00000000000..7b4727c09da --- /dev/null +++ b/storage/xpand/CMakeLists.txt @@ -0,0 +1,6 @@ +#***************************************************************************** +# Copyright (c) 2019, 2020, MariaDB Corporation. +#****************************************************************************/ + +SET(XPAND_SOURCES ha_xpand.cc xpand_connection.cc ha_xpand_pushdown.cc) +MYSQL_ADD_PLUGIN(xpand ${XPAND_SOURCES} STORAGE_ENGINE COMPONENT xpand-engine) diff --git a/storage/xpand/ha_xpand.cc b/storage/xpand/ha_xpand.cc new file mode 100644 index 00000000000..04369c35fe8 --- /dev/null +++ b/storage/xpand/ha_xpand.cc @@ -0,0 +1,1588 @@ +/***************************************************************************** +Copyright (c) 2019, 2020, MariaDB Corporation. +*****************************************************************************/ + +/** @file ha_xpand.cc */ + +#include "ha_xpand.h" +#include "ha_xpand_pushdown.h" +#include "key.h" +#include <strfunc.h> /* strconvert */ +#include "my_pthread.h" + +handlerton *xpand_hton = NULL; + +int xpand_connect_timeout; +static MYSQL_SYSVAR_INT +( + connect_timeout, + xpand_connect_timeout, + PLUGIN_VAR_OPCMDARG, + "Timeout for connecting to Xpand", + NULL, NULL, -1, -1, 2147483647, 0 +); + +int xpand_read_timeout; +static MYSQL_SYSVAR_INT +( + read_timeout, + xpand_read_timeout, + PLUGIN_VAR_OPCMDARG, + "Timeout for receiving data from Xpand", + NULL, NULL, -1, -1, 2147483647, 0 +); + +int xpand_write_timeout; +static MYSQL_SYSVAR_INT +( + write_timeout, + xpand_write_timeout, + PLUGIN_VAR_OPCMDARG, + "Timeout for sending data to Xpand", + NULL, NULL, -1, -1, 2147483647, 0 +); + +//state for load balancing +int xpand_hosts_cur; //protected by my_atomic's +ulong xpand_balance_algorithm; +const char* balance_algorithm_names[]= +{ + "first", "round_robin", NullS +}; + +TYPELIB balance_algorithms= +{ + array_elements(balance_algorithm_names) - 1, "", + balance_algorithm_names, NULL +}; + +static void update_balance_algorithm(MYSQL_THD thd, struct st_mysql_sys_var *var, + void *var_ptr, const void *save) +{ + *static_cast<ulong *>(var_ptr) = *static_cast<const ulong *>(save); + my_atomic_store32(&xpand_hosts_cur, 0); +} + +static MYSQL_SYSVAR_ENUM +( + balance_algorithm, + xpand_balance_algorithm, + PLUGIN_VAR_OPCMDARG, + "Method for managing load balancing of Clustrix nodes, can take values FIRST or ROUND_ROBIN", + NULL, update_balance_algorithm, XPAND_BALANCE_ROUND_ROBIN, &balance_algorithms +); + +//current list of clx hosts +#ifdef HAVE_PSI_INTERFACE +static PSI_rwlock_key key_xpand_hosts; +#endif +mysql_rwlock_t xpand_hosts_lock; +xpand_host_list *xpand_hosts; + +static int check_hosts(MYSQL_THD thd, struct st_mysql_sys_var *var, + void *save, struct st_mysql_value *value) +{ + DBUG_ENTER("check_hosts"); + char b; + int len = 0; + const char *val = value->val_str(value, &b, &len); + if (!val) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + + xpand_host_list list; + memset(&list, 0, sizeof(list)); + + int error_code = 0; + if ((error_code = list.fill(val))) + DBUG_RETURN(error_code); + list.empty(); + + *static_cast<const char **>(save) = val; + DBUG_RETURN(0); +} + +static void update_hosts(MYSQL_THD thd, struct st_mysql_sys_var *var, + void *var_ptr, const void *save) +{ + DBUG_ENTER("update_hosts"); + const char *from_save = *static_cast<const char * const *>(save); + + mysql_rwlock_wrlock(&xpand_hosts_lock); + + xpand_host_list *list = static_cast<xpand_host_list*>( + my_malloc(sizeof(xpand_host_list), MYF(MY_WME | MY_ZEROFILL))); + int error_code = list->fill(from_save); + if (error_code) { + my_free(list); + sql_print_error("Unhandled error %d setting xpand hostlist", error_code); + DBUG_VOID_RETURN; + } + + xpand_hosts->empty(); + my_free(xpand_hosts); + xpand_hosts = list; + + char **display_var = static_cast<char**>(var_ptr); + my_free(*display_var); + *display_var = my_strdup(from_save, MYF(MY_WME)); + + mysql_rwlock_unlock(&xpand_hosts_lock); + DBUG_VOID_RETURN; +} + +static char *xpand_hosts_str; +static MYSQL_SYSVAR_STR +( + hosts, + xpand_hosts_str, + PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, + "List of xpand hostnames seperated by commas, semicolons or spaces", + check_hosts, update_hosts, "localhost" +); + +char *xpand_username; +static MYSQL_SYSVAR_STR +( + username, + xpand_username, + PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, + "Xpand user name", + NULL, NULL, "root" +); + +char *xpand_password; +static MYSQL_SYSVAR_STR +( + password, + xpand_password, + PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, + "Xpand password", + NULL, NULL, "" +); + +uint xpand_port; +static MYSQL_SYSVAR_UINT +( + port, + xpand_port, + PLUGIN_VAR_RQCMDARG, + "Xpand port", + NULL, NULL, MYSQL_PORT_DEFAULT, MYSQL_PORT_DEFAULT, 65535, 0 +); + +char *xpand_socket; +static MYSQL_SYSVAR_STR +( + socket, + xpand_socket, + PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, + "Xpand socket", + NULL, NULL, "" +); + +static MYSQL_THDVAR_UINT +( + row_buffer, + PLUGIN_VAR_RQCMDARG, + "Xpand rowstore row buffer size", + NULL, NULL, 20, 1, 65535, 0 +); + +// Per thread select handler knob +static MYSQL_THDVAR_BOOL( + select_handler, + PLUGIN_VAR_NOCMDARG, + "", + NULL, + NULL, + 1 +); + +// Per thread derived handler knob +static MYSQL_THDVAR_BOOL( + derived_handler, + PLUGIN_VAR_NOCMDARG, + "", + NULL, + NULL, + 1 +); + +static MYSQL_THDVAR_BOOL( + enable_direct_update, + PLUGIN_VAR_NOCMDARG, + "", + NULL, + NULL, + 1 +); + +bool select_handler_setting(THD* thd) +{ + return ( thd == NULL ) ? false : THDVAR(thd, select_handler); +} + +bool derived_handler_setting(THD* thd) +{ + return ( thd == NULL ) ? false : THDVAR(thd, derived_handler); +} + +uint row_buffer_setting(THD* thd) +{ + return THDVAR(thd, row_buffer); +} + + +/* + Get an Xpand_share object for this object. If it doesn't yet exist, create + it. +*/ + +Xpand_share *ha_xpand::get_share() +{ + Xpand_share *tmp_share; + + DBUG_ENTER("ha_xpand::get_share()"); + + lock_shared_ha_data(); + if (!(tmp_share= static_cast<Xpand_share*>(get_ha_share_ptr()))) + { + tmp_share= new Xpand_share; + if (!tmp_share) + goto err; + + set_ha_share_ptr(static_cast<Handler_share*>(tmp_share)); + } +err: + unlock_shared_ha_data(); + DBUG_RETURN(tmp_share); +} + + +/**************************************************************************** +** Utility functions +****************************************************************************/ +// This is a wastefull aproach but better then fixed sized buffer. +size_t estimate_row_size(TABLE *table) +{ + size_t row_size = 0; + size_t null_byte_count = (bitmap_bits_set(table->write_set) + 7) / 8; + row_size += null_byte_count; + Field **p_field= table->field, *field; + for ( ; (field= *p_field) ; p_field++) { + row_size += field->max_data_length(); + } + return row_size; +} + + +/* + Try to decode a string from filename encoding, if that fails, return the + original string. + + @detail + This is used to get table (or database) name from file (or directory) + name. Names of regular tables/databases are encoded using + my_charset_filename encoding. + Names of temporary tables are not encoded, and they start with '#sql' + which is not a valid character sequence in my_charset_filename encoding. + Our way to talkle this is to + 1. Try to convert the name back + 2. If that failed, assume it's a temporary object name and just use the + name. +*/ + +static void decode_object_or_tmp_name(const char *from, uint size, + std::string *out) +{ + uint errors, new_size; + out->resize(size+1); // assume the decoded string is not longer + new_size= strconvert(&my_charset_filename, from, size, + system_charset_info, (char*)out->c_str(), size+1, + &errors); + if (errors) + out->assign(from, size); + else + out->resize(new_size); +} + +/* + Take a "./db_name/table_name" and extract db_name and table_name from it + + @return + 0 OK + other Error code +*/ +static int normalize_tablename(const char *db_table, + std::string *norm_db, std::string *norm_table) +{ + std::string tablename(db_table); + if (tablename.size() < 2 || tablename[0] != '.' || + (tablename[1] != FN_LIBCHAR && tablename[1] != FN_LIBCHAR2)) { + DBUG_ASSERT(0); // We were not passed table name? + return HA_ERR_INTERNAL_ERROR; + } + + size_t pos = tablename.find_first_of(FN_LIBCHAR, 2); + if (pos == std::string::npos) { + pos = tablename.find_first_of(FN_LIBCHAR2, 2); + } + + if (pos == std::string::npos) { + DBUG_ASSERT(0); // We were not passed table name? + return HA_ERR_INTERNAL_ERROR; + } + + decode_object_or_tmp_name(tablename.c_str() + 2, pos - 2, norm_db); + decode_object_or_tmp_name(tablename.c_str() + pos + 1, + tablename.size() - (pos + 1), norm_table); + return 0; +} + + +xpand_connection *get_trx(THD *thd, int *error_code) +{ + *error_code = 0; + xpand_connection *trx; + if (!(trx = (xpand_connection *)thd_get_ha_data(thd, xpand_hton))) + { + if (!(trx = new xpand_connection())) { + *error_code = HA_ERR_OUT_OF_MEM; + return NULL; + } + + *error_code = trx->connect(); + if (*error_code) { + delete trx; + return NULL; + } + + thd_set_ha_data(thd, xpand_hton, trx); + } + + return trx; +} +/**************************************************************************** +** Class ha_xpand +****************************************************************************/ + +ha_xpand::ha_xpand(handlerton *hton, TABLE_SHARE *table_arg) + : handler(hton, table_arg) +{ + DBUG_ENTER("ha_xpand::ha_xpand"); + rgi = NULL; + scan_cur = NULL; + xpand_table_oid = 0; + upsert_flag = 0; + DBUG_VOID_RETURN; +} + +ha_xpand::~ha_xpand() +{ + if (rgi) + remove_current_table_from_rpl_table_list(rgi); +} + + +int ha_xpand::create(const char *name, TABLE *form, HA_CREATE_INFO *info) +{ + int error_code; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + return error_code; + + enum tmp_table_type saved_tmp_table_type = form->s->tmp_table; + Table_specification_st *create_info = &thd->lex->create_info; + const bool is_tmp_table = info->options & HA_LEX_CREATE_TMP_TABLE; + String create_table_stmt; + + /* Create a copy of the CREATE TABLE statement */ + if (!is_tmp_table) + form->s->tmp_table = NO_TMP_TABLE; + const char *old_dbstr = thd->db.str; + thd->db.str = NULL; + ulong old = create_info->used_fields; + create_info->used_fields &= ~HA_CREATE_USED_ENGINE; + + std::string norm_db, norm_table; + if ((error_code= normalize_tablename(name, &norm_db, &norm_table))) + return error_code; + + TABLE_LIST table_list; + memset(&table_list, 0, sizeof(table_list)); + table_list.table = form; + error_code = show_create_table_ex(thd, &table_list, + norm_db.c_str(), norm_table.c_str(), + &create_table_stmt, create_info, WITH_DB_NAME); + if (!is_tmp_table) + form->s->tmp_table = saved_tmp_table_type; + create_info->used_fields = old; + thd->db.str = old_dbstr; + if (error_code) + return error_code; + + // To syncronize the schemas of MDB FE and XPD BE. + if (form->s && form->s->db.length) { + String createdb_stmt; + createdb_stmt.append("CREATE DATABASE IF NOT EXISTS `"); + createdb_stmt.append(form->s->db.str, form->s->db.length); + createdb_stmt.append("`"); + trx->run_query(createdb_stmt); + } + + return trx->run_query(create_table_stmt); +} + +int ha_xpand::delete_table(const char *path) +{ + int error_code; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + return error_code; + + std::string decoded_dbname; + std::string decoded_tbname; + if ((error_code= normalize_tablename(path, &decoded_dbname, + &decoded_tbname))) + return error_code; + + String delete_cmd; + delete_cmd.append("DROP TABLE `"); + delete_cmd.append(decoded_dbname.c_str()); + delete_cmd.append("`.`"); + delete_cmd.append(decoded_tbname.c_str()); + delete_cmd.append("`"); + + return trx->run_query(delete_cmd); +} + +int ha_xpand::rename_table(const char* from, const char* to) +{ + int error_code; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + return error_code; + + std::string decoded_from_dbname; + std::string decoded_from_tbname; + if ((error_code= normalize_tablename(from, &decoded_from_dbname, + &decoded_from_tbname))) + return error_code; + + std::string decoded_to_dbname; + std::string decoded_to_tbname; + if ((error_code= normalize_tablename(to, &decoded_to_dbname, + &decoded_to_tbname))) + return error_code; + + String rename_cmd; + rename_cmd.append("RENAME TABLE `"); + rename_cmd.append(decoded_from_dbname.c_str()); + rename_cmd.append("`.`"); + rename_cmd.append(decoded_from_tbname.c_str()); + rename_cmd.append("` TO `"); + rename_cmd.append(decoded_to_dbname.c_str()); + rename_cmd.append("`.`"); + rename_cmd.append(decoded_to_tbname.c_str()); + rename_cmd.append("`;"); + + return trx->run_query(rename_cmd); +} + +static void +xpand_mark_table_for_discovery(TABLE *table) +{ + table->m_needs_reopen = true; + Xpand_share *xs; + if ((xs= static_cast<Xpand_share*>(table->s->ha_share))) + xs->rediscover_table = true; +} + +void +xpand_mark_tables_for_discovery(LEX *lex) +{ + for (TABLE_LIST *tbl= lex->query_tables; tbl; tbl= tbl->next_global) + if (tbl->table && tbl->table->file->ht == xpand_hton) + xpand_mark_table_for_discovery(tbl->table); +} + +ulonglong * +xpand_extract_table_oids(THD *thd, LEX *lex) +{ + int cnt = 1; + for (TABLE_LIST *tbl = lex->query_tables; tbl; tbl= tbl->next_global) + if (tbl->table && tbl->table->file->ht == xpand_hton) + cnt++; + + ulonglong *oids = (ulonglong*)thd_alloc(thd, cnt * sizeof(ulonglong)); + ulonglong *ptr = oids; + for (TABLE_LIST *tbl = lex->query_tables; tbl; tbl= tbl->next_global) + { + if (tbl->table && tbl->table->file->ht == xpand_hton) + { + ha_xpand *hndlr = static_cast<ha_xpand *>(tbl->table->file); + *ptr++ = hndlr->get_table_oid(); + } + } + + *ptr = 0; + return oids; +} + +int ha_xpand::open(const char *name, int mode, uint test_if_locked) +{ + THD *thd= ha_thd(); + DBUG_ENTER("ha_xpand::open"); + + Xpand_share *share; + if (!(share = get_share())) + DBUG_RETURN(1); + + int error_code; + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + DBUG_RETURN(error_code); + + if (share->rediscover_table) + DBUG_RETURN(HA_ERR_TABLE_DEF_CHANGED); + + if (!share->xpand_table_oid) { + // We may end up with two threads executing this piece concurrently but + // it's ok + std::string norm_table; + std::string norm_db; + if ((error_code= normalize_tablename(name, &norm_db, &norm_table))) + DBUG_RETURN(error_code); + + ulonglong oid = 0; + error_code= trx->get_table_oid(norm_db.c_str(), strlen(norm_db.c_str()), + norm_table.c_str(), + strlen(norm_table.c_str()), &oid, + table_share); + if (error_code) + DBUG_RETURN(error_code); + + share->xpand_table_oid = oid; + } + + xpand_table_oid = share->xpand_table_oid; + + // Surrogate key marker + has_hidden_key = table->s->primary_key == MAX_KEY; + if (has_hidden_key) { + ref_length = 8; + } else { + KEY* key_info = table->key_info + table->s->primary_key; + ref_length = key_info->key_length; + } + + DBUG_PRINT("open finished", + ("oid: %llu, ref_length: %u", xpand_table_oid, ref_length)); + DBUG_RETURN(0); +} + +int ha_xpand::close(void) +{ + return 0; +} + +int ha_xpand::reset() +{ + upsert_flag &= ~XPAND_BULK_UPSERT; + upsert_flag &= ~XPAND_HAS_UPSERT; + upsert_flag &= ~XPAND_UPSERT_SENT; + xpd_lock_type = XPAND_NO_LOCKS; + pushdown_cond_list.empty(); + return 0; +} + +int ha_xpand::extra(enum ha_extra_function operation) +{ + DBUG_ENTER("ha_xpand::extra"); + if (operation == HA_EXTRA_INSERT_WITH_UPDATE) + upsert_flag |= XPAND_HAS_UPSERT; + DBUG_RETURN(0); +} + +/*@brief UPSERT State Machine*/ +/************************************************************* + * DESCRIPTION: + * Fasttrack for UPSERT sends queries down to a XPD backend. + * UPSERT could be of two kinds: singular and bulk. The plugin + * re-/sets XPAND_BULK_UPSERT in end|start_bulk_insert + * methods. XPAND_UPSERT_SENT is used to avoid multiple + * execution at XPD backend. + * Generic XPAND_HAS_UPSERT is set for bulk UPSERT only b/c + * MDB calls write_row only once. + ************************************************************/ +int ha_xpand::write_row(const uchar *buf) +{ + int error_code = 0; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + return error_code; + + if (upsert_flag & XPAND_HAS_UPSERT) { + if (!(upsert_flag & XPAND_UPSERT_SENT)) { + ha_rows update_rows; + String update_stmt; + update_stmt.append(thd->query_string.str()); + + if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + trx->auto_commit_next(); + + ulonglong *oids = xpand_extract_table_oids(thd, thd->lex); + error_code= trx->update_query(update_stmt, table->s->db, oids, + &update_rows); + if (upsert_flag & XPAND_BULK_UPSERT) + upsert_flag |= XPAND_UPSERT_SENT; + else + upsert_flag &= ~XPAND_HAS_UPSERT; + } + + if (error_code == HA_ERR_TABLE_DEF_CHANGED) + xpand_mark_tables_for_discovery(thd->lex); + return error_code; + } + + /* Convert the row format to binlog (packed) format */ + uchar *packed_new_row = (uchar*) my_alloca(estimate_row_size(table)); + size_t packed_size = pack_row(table, table->write_set, packed_new_row, buf); + + /* XXX: Xpand may needs to return HA_ERR_AUTOINC_ERANGE if we hit that + error. */ + ulonglong last_insert_id = 0; + if ((error_code = trx->write_row(xpand_table_oid, packed_new_row, packed_size, + &last_insert_id))) + goto err; + + if (table->next_number_field) + insert_id_for_cur_row = last_insert_id; + +err: + if (error_code == HA_ERR_TABLE_DEF_CHANGED) + xpand_mark_table_for_discovery(table); + + if (packed_size) + my_afree(packed_new_row); + + return error_code; +} + +int ha_xpand::update_row(const uchar *old_data, const uchar *new_data) +{ + DBUG_ENTER("ha_xpand::update_row"); + int error_code; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + DBUG_RETURN(error_code); + + size_t row_size = estimate_row_size(table); + size_t packed_key_len; + uchar *packed_key = (uchar*) my_alloca(row_size); + build_key_packed_row(table->s->primary_key, old_data, + packed_key, &packed_key_len); + + uchar *packed_new_row = (uchar*) my_alloca(row_size); + size_t packed_new_size = pack_row(table, table->write_set, packed_new_row, + new_data); + + /* Send the packed rows to Xpand */ + error_code = trx->key_update(xpand_table_oid, packed_key, packed_key_len, + table->write_set, + packed_new_row, packed_new_size); + + if(packed_key) + my_afree(packed_key); + + if(packed_new_row) + my_afree(packed_new_row); + + if (error_code == HA_ERR_TABLE_DEF_CHANGED) + xpand_mark_table_for_discovery(table); + + DBUG_RETURN(error_code); +} + +int ha_xpand::direct_update_rows_init(List<Item> *update_fields) +{ + DBUG_ENTER("ha_xpand::direct_update_rows_init"); + THD *thd= ha_thd(); + if (!THDVAR(thd, enable_direct_update)) + DBUG_RETURN(HA_ERR_WRONG_COMMAND); + DBUG_RETURN(0); +} + +int ha_xpand::direct_update_rows(ha_rows *update_rows, ha_rows *found_rows) +{ + DBUG_ENTER("ha_xpand::direct_update_rows"); + int error_code= 0; + THD *thd= ha_thd(); + xpand_connection *trx= get_trx(thd, &error_code); + if (!trx) + return error_code; + + String update_stmt; + // Do the same as create_xpand_select_handler does: + thd->lex->print(&update_stmt, QT_ORDINARY); + + if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + trx->auto_commit_next(); + + ulonglong *oids = xpand_extract_table_oids(thd, thd->lex); + error_code = trx->update_query(update_stmt, table->s->db, oids, update_rows); + *found_rows = *update_rows; + + if (error_code == HA_ERR_TABLE_DEF_CHANGED) + xpand_mark_tables_for_discovery(thd->lex); + DBUG_RETURN(error_code); +} + +void ha_xpand::start_bulk_insert(ha_rows rows, uint flags) +{ + DBUG_ENTER("ha_xpand::start_bulk_insert"); + int error_code= 0; + THD *thd= ha_thd(); + xpand_connection *trx= get_trx(thd, &error_code); + if (!trx) { + // TBD log this + DBUG_VOID_RETURN; + } + + upsert_flag |= XPAND_BULK_UPSERT; + + DBUG_VOID_RETURN; +} + +int ha_xpand::end_bulk_insert() +{ + DBUG_ENTER("ha_xpand::end_bulk_insert"); + upsert_flag &= ~XPAND_BULK_UPSERT; + upsert_flag &= ~XPAND_HAS_UPSERT; + upsert_flag &= ~XPAND_UPSERT_SENT; + DBUG_RETURN(0); +} + +int ha_xpand::delete_row(const uchar *buf) +{ + int error_code; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + return error_code; + + // The estimate should consider only key fields widths. + size_t packed_key_len; + uchar *packed_key = (uchar*) my_alloca(estimate_row_size(table)); + build_key_packed_row(table->s->primary_key, buf, packed_key, &packed_key_len); + + error_code = trx->key_delete(xpand_table_oid, packed_key, packed_key_len); + + if (error_code == HA_ERR_TABLE_DEF_CHANGED) + xpand_mark_table_for_discovery(table); + + if (packed_key) + my_afree(packed_key); + + return error_code; +} + +ha_xpand::Table_flags ha_xpand::table_flags(void) const +{ + Table_flags flags = HA_PARTIAL_COLUMN_READ | + HA_REC_NOT_IN_SEQ | + HA_FAST_KEY_READ | + HA_NULL_IN_KEY | + HA_CAN_INDEX_BLOBS | + HA_AUTO_PART_KEY | + HA_CAN_SQL_HANDLER | + HA_BINLOG_STMT_CAPABLE | + HA_CAN_TABLE_CONDITION_PUSHDOWN | + HA_CAN_DIRECT_UPDATE_AND_DELETE; + + return flags; +} + +ulong ha_xpand::index_flags(uint idx, uint part, bool all_parts) const +{ + ulong flags = HA_READ_NEXT | + HA_READ_PREV | + HA_READ_ORDER | + HA_READ_RANGE; + + return flags; +} + +ha_rows ha_xpand::records() +{ + return 10000; +} + +ha_rows ha_xpand::records_in_range(uint inx, key_range *min_key, + key_range *max_key) +{ + return 2; +} + +int ha_xpand::info(uint flag) +{ + //THD *thd = ha_thd(); + if (flag & HA_STATUS_TIME) + { + /* Retrieve the time of the most recent update to the table */ + // stats.update_time = + } + + if (flag & HA_STATUS_AUTO) + { + /* Retrieve the latest auto_increment value */ + stats.auto_increment_value = next_insert_id; + } + + if (flag & HA_STATUS_VARIABLE) + { + /* Retrieve variable info, such as row counts and file lengths */ + stats.records = records(); + stats.deleted = 0; + // stats.data_file_length = + // stats.index_file_length = + // stats.delete_length = + stats.check_time = 0; + // stats.mrr_length_per_rec = + + if (stats.records == 0) + stats.mean_rec_length = 0; + else + stats.mean_rec_length = (ulong) (stats.data_file_length / stats.records); + } + + if (flag & HA_STATUS_CONST) + { + /* + Retrieve constant info, such as file names, max file lengths, + create time, block size + */ + // stats.max_data_file_length = + // stats.create_time = + // stats.block_size = + } + + return 0; +} + +int ha_xpand::index_init(uint idx, bool sorted) +{ + int error_code = 0; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + return error_code; + + active_index = idx; + add_current_table_to_rpl_table_list(&rgi, thd, table); + scan_cur = NULL; + + /* Return all columns until there is a better understanding of + requirements. */ + if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false)) + return ER_OUTOFMEMORY; + bitmap_set_all(&scan_fields); + sorted_scan = sorted; + + return 0; +} + +int ha_xpand::index_read(uchar * buf, const uchar * key, uint key_len, + enum ha_rkey_function find_flag) +{ + DBUG_ENTER("ha_xpand::index_read"); + int error_code = 0; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + DBUG_RETURN(error_code); + + key_restore(buf, key, &table->key_info[active_index], key_len); + // The estimate should consider only key fields widths. + size_t packed_key_len; + uchar *packed_key = (uchar*) my_alloca(estimate_row_size(table)); + build_key_packed_row(active_index, buf, packed_key, &packed_key_len); + + bool exact = false; + xpand_connection::scan_type st; + switch (find_flag) { + case HA_READ_KEY_EXACT: + exact = true; + break; + case HA_READ_KEY_OR_NEXT: + st = xpand_connection::READ_KEY_OR_NEXT; + break; + case HA_READ_KEY_OR_PREV: + st = xpand_connection::READ_KEY_OR_PREV; + break; + case HA_READ_AFTER_KEY: + st = xpand_connection::READ_AFTER_KEY; + break; + case HA_READ_BEFORE_KEY: + st = xpand_connection::READ_BEFORE_KEY; + break; + case HA_READ_PREFIX: + case HA_READ_PREFIX_LAST: + case HA_READ_PREFIX_LAST_OR_PREV: + case HA_READ_MBR_CONTAIN: + case HA_READ_MBR_INTERSECT: + case HA_READ_MBR_WITHIN: + case HA_READ_MBR_DISJOINT: + case HA_READ_MBR_EQUAL: + DBUG_RETURN(ER_NOT_SUPPORTED_YET); + } + + uchar *rowdata = NULL; + if (exact) { + is_scan = false; + ulonglong rowdata_length; + error_code = trx->key_read(xpand_table_oid, 0, xpd_lock_type, + table->read_set, packed_key, packed_key_len, + &rowdata, &rowdata_length); + if (!error_code) + error_code = unpack_row_to_buf(rgi, table, buf, rowdata, table->read_set, + rowdata + rowdata_length); + } else { + is_scan = true; + error_code = trx->scan_from_key(xpand_table_oid, active_index, + xpd_lock_type, st, -1, sorted_scan, + &scan_fields, packed_key, packed_key_len, + THDVAR(thd, row_buffer), &scan_cur); + if (!error_code) + error_code = rnd_next(buf); + } + + if (rowdata) + my_free(rowdata); + + if (packed_key) + my_afree(packed_key); + + if (error_code == HA_ERR_TABLE_DEF_CHANGED) + xpand_mark_table_for_discovery(table); + + DBUG_RETURN(error_code); +} + +int ha_xpand::index_first(uchar *buf) +{ + DBUG_ENTER("ha_xpand::index_first"); + int error_code = 0; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + DBUG_RETURN(error_code); + + error_code = trx->scan_from_key(xpand_table_oid, active_index, xpd_lock_type, + xpand_connection::READ_FROM_START, -1, + sorted_scan, &scan_fields, NULL, 0, + THDVAR(thd, row_buffer), &scan_cur); + + if (error_code == HA_ERR_TABLE_DEF_CHANGED) + xpand_mark_table_for_discovery(table); + + if (error_code) + DBUG_RETURN(error_code); + + DBUG_RETURN(rnd_next(buf)); +} + +int ha_xpand::index_last(uchar *buf) +{ + DBUG_ENTER("ha_xpand::index_last"); + int error_code = 0; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + DBUG_RETURN(error_code); + + error_code = trx->scan_from_key(xpand_table_oid, active_index, xpd_lock_type, + xpand_connection::READ_FROM_LAST, -1, + sorted_scan, &scan_fields, NULL, 0, + THDVAR(thd, row_buffer), &scan_cur); + + if (error_code == HA_ERR_TABLE_DEF_CHANGED) + xpand_mark_table_for_discovery(table); + + if (error_code) + DBUG_RETURN(error_code); + + DBUG_RETURN(rnd_next(buf)); +} + +int ha_xpand::index_next(uchar *buf) +{ + DBUG_ENTER("index_next"); + DBUG_RETURN(rnd_next(buf)); +} + +#if 0 +int ha_xpand::index_next_same(uchar *buf, const uchar *key, uint keylen) +{ + DBUG_ENTER("index_next_same"); + DBUG_RETURN(rnd_next(buf)); +} +#endif + +int ha_xpand::index_prev(uchar *buf) +{ + DBUG_ENTER("index_prev"); + DBUG_RETURN(rnd_next(buf)); +} + +int ha_xpand::index_end() +{ + DBUG_ENTER("index_prev"); + if (scan_cur) + DBUG_RETURN(rnd_end()); + else + { + my_bitmap_free(&scan_fields); + DBUG_RETURN(0); + } +} + +int ha_xpand::rnd_init(bool scan) +{ + DBUG_ENTER("ha_xpand::rnd_init"); + int error_code = 0; + THD *thd = ha_thd(); + if (thd->lex->sql_command == SQLCOM_UPDATE) + DBUG_RETURN(error_code); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + DBUG_RETURN(error_code); + + add_current_table_to_rpl_table_list(&rgi, thd, table); + is_scan = scan; + scan_cur = NULL; + + if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false)) + DBUG_RETURN(ER_OUTOFMEMORY); + +#if 0 + if (table->s->keys) + table->mark_columns_used_by_index(table->s->primary_key, &scan_fields); + else + bitmap_clear_all(&scan_fields); + + bitmap_union(&scan_fields, table->read_set); +#else + /* Why is read_set not setup correctly? */ + bitmap_set_all(&scan_fields); +#endif + + String* pushdown_cond_sql = nullptr; + if (pushdown_cond_list.elements) { + pushdown_cond_sql = new String(); + while (pushdown_cond_list.elements > 0) { + COND* cond = pushdown_cond_list.pop(); + String sql_predicate; + cond->print_for_table_def(&sql_predicate); + pushdown_cond_sql->append(sql_predicate); + if ( pushdown_cond_list.elements > 0) + pushdown_cond_sql->append(" AND "); + } + } + + error_code = trx->scan_table(xpand_table_oid, xpd_lock_type, &scan_fields, + THDVAR(thd, row_buffer), &scan_cur, + pushdown_cond_sql); + + if (pushdown_cond_sql != nullptr) + delete pushdown_cond_sql; + + if (error_code == HA_ERR_TABLE_DEF_CHANGED) + xpand_mark_table_for_discovery(table); + + if (error_code) + DBUG_RETURN(error_code); + + DBUG_RETURN(0); +} + +int ha_xpand::rnd_next(uchar *buf) +{ + int error_code = 0; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + return error_code; + + assert(is_scan); + assert(scan_cur); + + uchar *rowdata; + ulong rowdata_length; + if ((error_code = trx->scan_next(scan_cur, &rowdata, &rowdata_length))) + return error_code; + + if (has_hidden_key) { + last_hidden_key = *(ulonglong *)rowdata; + rowdata += 8; + rowdata_length -= 8; + } + + error_code = unpack_row_to_buf(rgi, table, buf, rowdata, &scan_fields, + rowdata + rowdata_length); + + if (error_code) + return error_code; + + return 0; +} + +int ha_xpand::rnd_pos(uchar * buf, uchar *pos) +{ + DBUG_ENTER("xpd_rnd_pos"); + DBUG_DUMP("pos", pos, ref_length); + + int error_code = 0; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + DBUG_RETURN(error_code); + + /* WDD: We need a way to convert key buffers directy to rbr buffers. */ + + if (has_hidden_key) { + memcpy(&last_hidden_key, pos, sizeof(ulonglong)); + } else { + uint keyno = table->s->primary_key; + uint len = calculate_key_len(table, keyno, pos, + table->const_key_parts[keyno]); + key_restore(buf, pos, &table->key_info[keyno], len); + } + + // The estimate should consider only key fields widths. + uchar *packed_key = (uchar*) my_alloca(estimate_row_size(table)); + size_t packed_key_len; + build_key_packed_row(table->s->primary_key, buf, packed_key, &packed_key_len); + + uchar *rowdata = NULL; + ulonglong rowdata_length; + if ((error_code = trx->key_read(xpand_table_oid, 0, xpd_lock_type, + table->read_set, packed_key, packed_key_len, + &rowdata, &rowdata_length))) + goto err; + + if ((error_code = unpack_row_to_buf(rgi, table, buf, rowdata, table->read_set, + rowdata + rowdata_length))) + goto err; + +err: + if (rowdata) + my_free(rowdata); + + if (packed_key) + my_afree(packed_key); + + if (error_code == HA_ERR_TABLE_DEF_CHANGED) + xpand_mark_table_for_discovery(table); + + DBUG_RETURN(error_code); +} + +int ha_xpand::rnd_end() +{ + DBUG_ENTER("ha_xpand::rnd_end"); + int error_code = 0; + THD *thd = ha_thd(); + if (thd->lex->sql_command == SQLCOM_UPDATE) + DBUG_RETURN(error_code); + + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + DBUG_RETURN(error_code); + + my_bitmap_free(&scan_fields); + if (scan_cur && (error_code = trx->scan_end(scan_cur))) + DBUG_RETURN(error_code); + scan_cur = NULL; + + DBUG_RETURN(0); +} + +void ha_xpand::position(const uchar *record) +{ + DBUG_ENTER("xpd_position"); + if (has_hidden_key) { + memcpy(ref, &last_hidden_key, sizeof(ulonglong)); + } else { + KEY* key_info = table->key_info + table->s->primary_key; + key_copy(ref, record, key_info, key_info->key_length); + } + DBUG_DUMP("key", ref, ref_length); + DBUG_VOID_RETURN; +} + +uint ha_xpand::lock_count(void) const +{ + /* Hopefully, we don't need to use thread locks */ + return 0; +} + +THR_LOCK_DATA **ha_xpand::store_lock(THD *thd, THR_LOCK_DATA **to, + enum thr_lock_type lock_type) +{ + /* Hopefully, we don't need to use thread locks */ + return to; +} + +int ha_xpand::external_lock(THD *thd, int lock_type) +{ + DBUG_ENTER("ha_xpand::external_lock()"); + int error_code; + xpand_connection *trx = get_trx(thd, &error_code); + if (error_code) + DBUG_RETURN(error_code); + + if (lock_type == F_WRLCK) + xpd_lock_type = XPAND_EXCLUSIVE; + else if (lock_type == F_RDLCK) + xpd_lock_type = XPAND_SHARED; + else if (lock_type == F_UNLCK) + xpd_lock_type = XPAND_NO_LOCKS; + + if (lock_type != F_UNLCK) { + if (!trx->has_open_transaction()) { + error_code = trx->begin_transaction_next(); + if (error_code) + DBUG_RETURN(error_code); + } + + trans_register_ha(thd, FALSE, xpand_hton); + if (thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + trans_register_ha(thd, TRUE, xpand_hton); + } + + DBUG_RETURN(error_code); +} + +/**************************************************************************** + Engine Condition Pushdown +****************************************************************************/ + +const COND *ha_xpand::cond_push(const COND *cond) +{ + THD *thd= ha_thd(); + if (!thd->lex->describe) { + pushdown_cond_list.push_front(const_cast<COND*>(cond)); + } + + return NULL; +} + +void ha_xpand::cond_pop() +{ + pushdown_cond_list.pop(); +} + +int ha_xpand::info_push(uint info_type, void *info) +{ + return 0; +} + +ulonglong ha_xpand::get_table_oid() +{ + return xpand_table_oid; +} + +/**************************************************************************** +** Row encoding functions +****************************************************************************/ + +void add_current_table_to_rpl_table_list(rpl_group_info **_rgi, THD *thd, + TABLE *table) +{ + if (*_rgi) + return; + + Relay_log_info *rli = new Relay_log_info(FALSE); + rli->sql_driver_thd = thd; + + rpl_group_info *rgi = new rpl_group_info(rli); + *_rgi = rgi; + rgi->thd = thd; + rgi->tables_to_lock_count = 0; + rgi->tables_to_lock = NULL; + if (rgi->tables_to_lock_count) + return; + + rgi->tables_to_lock = (RPL_TABLE_LIST *)my_malloc(sizeof(RPL_TABLE_LIST), + MYF(MY_WME)); + rgi->tables_to_lock->init_one_table(&table->s->db, &table->s->table_name, 0, + TL_READ); + rgi->tables_to_lock->table = table; + rgi->tables_to_lock->table_id = table->tablenr; + rgi->tables_to_lock->m_conv_table = NULL; + rgi->tables_to_lock->master_had_triggers = FALSE; + rgi->tables_to_lock->m_tabledef_valid = TRUE; + // We need one byte per column to save a column's binlog type. + uchar *col_type = (uchar*) my_alloca(table->s->fields); + for (uint i = 0 ; i < table->s->fields ; ++i) + col_type[i] = table->field[i]->binlog_type(); + + table_def *tabledef = &rgi->tables_to_lock->m_tabledef; + new (tabledef) table_def(col_type, table->s->fields, NULL, 0, NULL, 0); + rgi->tables_to_lock_count++; + if (col_type) + my_afree(col_type); +} + +void remove_current_table_from_rpl_table_list(rpl_group_info *rgi) +{ + if (!rgi->tables_to_lock) + return; + + rgi->tables_to_lock->m_tabledef.table_def::~table_def(); + rgi->tables_to_lock->m_tabledef_valid = FALSE; + my_free(rgi->tables_to_lock); + rgi->tables_to_lock_count--; + rgi->tables_to_lock = NULL; + delete rgi->rli; + delete rgi; +} + +void ha_xpand::build_key_packed_row(uint index, const uchar *buf, + uchar *packed_key, size_t *packed_key_len) +{ + if (index == table->s->primary_key && has_hidden_key) { + memcpy(packed_key, &last_hidden_key, sizeof(ulonglong)); + *packed_key_len = sizeof(ulonglong); + } else { + // make a row from the table + table->mark_columns_used_by_index(index, &table->tmp_set); + *packed_key_len = pack_row(table, &table->tmp_set, packed_key, buf); + } +} + +int unpack_row_to_buf(rpl_group_info *rgi, TABLE *table, uchar *data, + uchar const *const row_data, MY_BITMAP const *cols, + uchar const *const row_end) +{ + /* Since unpack_row can only write to record[0], if 'data' does not point to + table->record[0], we must back it up and then restore it afterwards. */ + uchar const *current_row_end; + ulong master_reclength; + uchar *backup_row = NULL; + if (data != table->record[0]) { + /* See Update_rows_log_event::do_exec_row(rpl_group_info *rgi) + and the definitions of store_record and restore_record. */ + backup_row = (uchar*) my_alloca(table->s->reclength); + memcpy(backup_row, table->record[0], table->s->reclength); + restore_record(table, record[data == table->record[1] ? 1 : 2]); + } + + int error_code = unpack_row(rgi, table, table->s->fields, row_data, cols, + ¤t_row_end, &master_reclength, row_end); + + if (backup_row) { + store_record(table, record[data == table->record[1] ? 1 : 2]); + memcpy(table->record[0], backup_row, table->s->reclength); + my_afree(backup_row); + } + + return error_code; +} + +/**************************************************************************** +** Plugin Functions +****************************************************************************/ + +static int xpand_commit(handlerton *hton, THD *thd, bool all) +{ + xpand_connection* trx = (xpand_connection *) thd_get_ha_data(thd, hton); + assert(trx); + + int error_code = 0; + if (trx->has_open_transaction()) { + if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + error_code = trx->commit_transaction(); + else + error_code = trx->new_statement_next(); + } + + return error_code; +} + +static int xpand_rollback(handlerton *hton, THD *thd, bool all) +{ + xpand_connection* trx = (xpand_connection *) thd_get_ha_data(thd, hton); + assert(trx); + + int error_code = 0; + if (trx->has_open_transaction()) { + if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + error_code = trx->rollback_transaction(); + else + error_code = trx->rollback_statement_next(); + } + + return error_code; +} + +static handler* xpand_create_handler(handlerton *hton, TABLE_SHARE *table, + MEM_ROOT *mem_root) +{ + return new (mem_root) ha_xpand(hton, table); +} + +static int xpand_close_connection(handlerton* hton, THD* thd) +{ + xpand_connection* trx = (xpand_connection *) thd_get_ha_data(thd, hton); + if (!trx) + return 0; /* Transaction is not started */ + + int error_code = xpand_rollback(xpand_hton, thd, TRUE); + + delete trx; + + return error_code; +} + +static int xpand_panic(handlerton *hton, ha_panic_function type) +{ + return 0; +} + +static bool xpand_show_status(handlerton *hton, THD *thd, + stat_print_fn *stat_print, + enum ha_stat_type stat_type) +{ + return FALSE; +} + +static int xpand_discover_table_names(handlerton *hton, LEX_CSTRING *db, + MY_DIR *dir, + handlerton::discovered_list *result) +{ + DBUG_ENTER("xpand_discover_table_names"); + xpand_connection *xpand_net = new xpand_connection(); + int error_code = xpand_net->connect(); + if (error_code) { + if (error_code == HA_ERR_NO_CONNECTION) + error_code = 0; + goto err; + } + + error_code = xpand_net->populate_table_list(db, result); + +err: + delete xpand_net; + DBUG_RETURN(error_code); +} + +int xpand_discover_table(handlerton *hton, THD *thd, TABLE_SHARE *share) +{ + DBUG_ENTER("xpand_discover_table"); + xpand_connection *xpand_net = new xpand_connection(); + int error_code = xpand_net->connect(); + if (error_code) { + if (error_code == HA_ERR_NO_CONNECTION) + error_code = HA_ERR_NO_SUCH_TABLE; + goto err; + } + + error_code = xpand_net->discover_table_details(&share->db, &share->table_name, + thd, share); + +err: + delete xpand_net; + DBUG_RETURN(error_code); +} + +static int xpand_init(void *p) +{ + DBUG_ENTER("xpand_init"); + + xpand_hton = (handlerton *) p; + xpand_hton->flags = HTON_NO_FLAGS; + xpand_hton->panic = xpand_panic; + xpand_hton->close_connection = xpand_close_connection; + xpand_hton->commit = xpand_commit; + xpand_hton->rollback = xpand_rollback; + xpand_hton->create = xpand_create_handler; + xpand_hton->show_status = xpand_show_status; + xpand_hton->discover_table_names = xpand_discover_table_names; + xpand_hton->discover_table = xpand_discover_table; + xpand_hton->create_select = create_xpand_select_handler; + xpand_hton->create_derived = create_xpand_derived_handler; + + mysql_rwlock_init(key_xpand_hosts, &xpand_hosts_lock); + mysql_rwlock_wrlock(&xpand_hosts_lock); + xpand_hosts = static_cast<xpand_host_list*>( + my_malloc(sizeof(xpand_host_list), MYF(MY_WME | MY_ZEROFILL))); + int error_code = xpand_hosts->fill(xpand_hosts_str); + if (error_code) { + my_free(xpand_hosts); + xpand_hosts = NULL; + } + mysql_rwlock_unlock(&xpand_hosts_lock); + DBUG_RETURN(error_code); +} + +static int xpand_deinit(void *p) +{ + DBUG_ENTER("xpand_deinit"); + mysql_rwlock_wrlock(&xpand_hosts_lock); + xpand_hosts->empty(); + my_free(xpand_hosts); + xpand_hosts = NULL; + mysql_rwlock_destroy(&xpand_hosts_lock); + DBUG_RETURN(0); +} + +struct st_mysql_show_var xpand_status_vars[] = +{ + {NullS, NullS, SHOW_LONG} +}; + +static struct st_mysql_sys_var* xpand_system_variables[] = +{ + MYSQL_SYSVAR(connect_timeout), + MYSQL_SYSVAR(read_timeout), + MYSQL_SYSVAR(write_timeout), + MYSQL_SYSVAR(balance_algorithm), + MYSQL_SYSVAR(hosts), + MYSQL_SYSVAR(username), + MYSQL_SYSVAR(password), + MYSQL_SYSVAR(port), + MYSQL_SYSVAR(socket), + MYSQL_SYSVAR(row_buffer), + MYSQL_SYSVAR(select_handler), + MYSQL_SYSVAR(derived_handler), + MYSQL_SYSVAR(enable_direct_update), + NULL +}; + +static struct st_mysql_storage_engine xpand_storage_engine = + {MYSQL_HANDLERTON_INTERFACE_VERSION}; + +maria_declare_plugin(xpand) +{ + MYSQL_STORAGE_ENGINE_PLUGIN, /* Plugin Type */ + &xpand_storage_engine, /* Plugin Descriptor */ + "XPAND", /* Plugin Name */ + "MariaDB", /* Plugin Author */ + "Xpand storage engine", /* Plugin Description */ + PLUGIN_LICENSE_GPL, /* Plugin Licence */ + xpand_init, /* Plugin Entry Point */ + xpand_deinit, /* Plugin Deinitializer */ + 0x0001, /* Hex Version Number (0.1) */ + NULL /* xpand_status_vars */, /* Status Variables */ + xpand_system_variables, /* System Variables */ + "0.1", /* String Version */ + MariaDB_PLUGIN_MATURITY_EXPERIMENTAL /* Maturity Level */ +} +maria_declare_plugin_end; diff --git a/storage/xpand/ha_xpand.h b/storage/xpand/ha_xpand.h new file mode 100644 index 00000000000..808e516edd3 --- /dev/null +++ b/storage/xpand/ha_xpand.h @@ -0,0 +1,147 @@ +/***************************************************************************** +Copyright (c) 2019, 2020, MariaDB Corporation. +*****************************************************************************/ + +#ifndef _ha_xpand_h +#define _ha_xpand_h + +#ifdef USE_PRAGMA_INTERFACE +#pragma interface /* gcc class implementation */ +#endif + +#define MYSQL_SERVER 1 +#include "xpand_connection.h" +#include "my_bitmap.h" +#include "table.h" +#include "rpl_rli.h" +#include "handler.h" +#include "sql_class.h" +#include "sql_show.h" +#include "mysql.h" +#include "../../sql/rpl_record.h" + +size_t estimate_row_size(TABLE *table); +xpand_connection *get_trx(THD *thd, int *error_code); +bool get_enable_sh(THD* thd); +void add_current_table_to_rpl_table_list(rpl_group_info **_rgi, THD *thd, + TABLE *table); +void remove_current_table_from_rpl_table_list(rpl_group_info *rgi); +int unpack_row_to_buf(rpl_group_info *rgi, TABLE *table, uchar *data, + uchar const *const row_data, MY_BITMAP const *cols, + uchar const *const row_end); +void xpand_mark_tables_for_discovery(LEX *lex); +ulonglong *xpand_extract_table_oids(THD *thd, LEX *lex); + + +class Xpand_share : public Handler_share { +public: + Xpand_share(): xpand_table_oid(0), rediscover_table(false) {} + + std::atomic<ulonglong> xpand_table_oid; + std::atomic<bool> rediscover_table; +}; + +class ha_xpand : public handler +{ +private: + // TODO: do we need this here or one in share would be sufficient? + ulonglong xpand_table_oid; + rpl_group_info *rgi; + + Field *auto_inc_field; + ulonglong auto_inc_value; + + bool has_hidden_key; + ulonglong last_hidden_key; + xpand_connection_cursor *scan_cur; + bool is_scan; + MY_BITMAP scan_fields; + bool sorted_scan; + xpand_lock_mode_t xpd_lock_type; + + uint last_dup_errkey; + + typedef enum xpand_upsert_flags { + XPAND_HAS_UPSERT= 1, + XPAND_BULK_UPSERT= 2, + XPAND_UPSERT_SENT= 4 + } xpd_upsert_flags_t; + int upsert_flag; + + List<COND> pushdown_cond_list; + + Xpand_share *get_share(); ///< Get the share + +public: + ha_xpand(handlerton *hton, TABLE_SHARE *table_arg); + ~ha_xpand(); + int create(const char *name, TABLE *form, HA_CREATE_INFO *info) override; + int delete_table(const char *name) override; + int rename_table(const char* from, const char* to) override; + int open(const char *name, int mode, uint test_if_locked) override; + int close(void) override; + int reset() override; + int extra(enum ha_extra_function operation) override; + int write_row(const uchar *buf) override; + // start_bulk_update exec_bulk_update + int update_row(const uchar *old_data, const uchar *new_data) override; + // start_bulk_delete exec_bulk_delete + int delete_row(const uchar *buf) override; + int direct_update_rows_init(List<Item> *update_fields) override; + int direct_update_rows(ha_rows *update_rows, ha_rows *found_rows) override; + void start_bulk_insert(ha_rows rows, uint flags = 0) override; + int end_bulk_insert() override; + + Table_flags table_flags(void) const override; + ulong index_flags(uint idx, uint part, bool all_parts) const override; + uint max_supported_keys() const override { return MAX_KEY; } + + ha_rows records() override; + ha_rows records_in_range(uint inx, key_range *min_key, + key_range *max_key) override; + + int info(uint flag) override; // see my_base.h for full description + + // multi_read_range + // read_range + int index_init(uint idx, bool sorted) override; + int index_read(uchar * buf, const uchar * key, uint key_len, + enum ha_rkey_function find_flag) override; + int index_first(uchar *buf) override; + int index_prev(uchar *buf) override; + int index_last(uchar *buf) override; + int index_next(uchar *buf) override; + //int index_next_same(uchar *buf, const uchar *key, uint keylen) override; + int index_end() override; + + int rnd_init(bool scan) override; + int rnd_next(uchar *buf) override; + int rnd_pos(uchar * buf, uchar *pos) override; + int rnd_end() override; + + void position(const uchar *record) override; + uint lock_count(void) const override; + THR_LOCK_DATA **store_lock(THD *thd, + THR_LOCK_DATA **to, + enum thr_lock_type lock_type) override; + int external_lock(THD *thd, int lock_type) override; + + uint8 table_cache_type() override + { + return(HA_CACHE_TBL_NOCACHE); + } + + const COND *cond_push(const COND *cond) override; + void cond_pop() override; + int info_push(uint info_type, void *info) override; + + ulonglong get_table_oid(); +private: + void build_key_packed_row(uint index, const uchar *buf, + uchar *packed_key, size_t *packed_key_len); +}; + +bool select_handler_setting(THD* thd); +bool derived_handler_setting(THD* thd); +uint row_buffer_setting(THD* thd); +#endif // _ha_xpand_h diff --git a/storage/xpand/ha_xpand_pushdown.cc b/storage/xpand/ha_xpand_pushdown.cc new file mode 100644 index 00000000000..e51bc162236 --- /dev/null +++ b/storage/xpand/ha_xpand_pushdown.cc @@ -0,0 +1,484 @@ +/***************************************************************************** +Copyright (c) 2019, 2020, MariaDB Corporation. +*****************************************************************************/ + +#include "ha_xpand.h" +#include "ha_xpand_pushdown.h" + +extern handlerton *xpand_hton; +extern uint xpand_row_buffer; + +/*@brief Fills up array data types, metadata and nullability*/ +/************************************************************ + * DESCRIPTION: + * Fills up three arrays with: field binlog data types, field + * metadata and nullability bitmask as in Table_map_log_event + * ctor. Internally creates a temporary table as does + * Pushdown_select. DH uses the actual temp table w/o + * b/c create_DH is called later compared to create_SH. + * More details in server/sql/log_event_server.cc + * PARAMETERS: + * thd - THD* + * table__ - TABLE* temp table for the results + * sl - SELECT_LEX* + * fieldtype - uchar* + * field_metadata - uchar* + * null_bits - uchar* + * num_null_bytes - null bit size + * fields_count - a number of fields + * RETURN: + * metadata_size int or -1 in case of error + ************************************************************/ +int get_field_types(THD *thd, TABLE *table__, SELECT_LEX *sl, uchar *fieldtype, + uchar *field_metadata, uchar *null_bits, + const int num_null_bytes, const uint fields_count) +{ + int field_metadata_size = 0; + int metadata_index = 0; + TABLE *tmp_table= table__; + + if (!tmp_table) { + // Construct a tmp table with fields to find out result DTs. + // This should be reconsidered if it worths the effort. + List<Item> types; + TMP_TABLE_PARAM tmp_table_param; + sl->master_unit()->join_union_item_types(thd, types, 1); + tmp_table_param.init(); + tmp_table_param.field_count= types.elements; + + tmp_table = create_tmp_table(thd, &tmp_table_param, types, (ORDER *) 0, + false, 0, TMP_TABLE_ALL_COLUMNS, 1, + &empty_clex_str, true, false); + if (!tmp_table) { + field_metadata_size = -1; + goto err; + } + } + + for (unsigned int i = 0 ; i < fields_count; ++i) { + fieldtype[i]= tmp_table->field[i]->binlog_type(); + } + + bzero(field_metadata, (fields_count * 2)); + for (unsigned int i= 0 ; i < fields_count ; i++) + { + Binlog_type_info bti= tmp_table->field[i]->binlog_type_info(); + uchar *ptr = reinterpret_cast<uchar*>(&bti.m_metadata); + memcpy(&field_metadata[metadata_index], ptr, bti.m_metadata_size); + metadata_index+= bti.m_metadata_size; + } + + if (metadata_index < 251) + field_metadata_size += metadata_index + 1; + else + field_metadata_size += metadata_index + 3; + + bzero(null_bits, num_null_bytes); + for (unsigned int i= 0 ; i < fields_count ; ++i) { + if (tmp_table->field[i]->maybe_null()) { + null_bits[(i / 8)]+= 1 << (i % 8); + } + } + + if (!table__) + free_tmp_table(thd, tmp_table); +err: + return field_metadata_size; +} + +/*@brief create_xpand_select_handler- Creates handler*/ +/************************************************************ + * DESCRIPTION: + * Creates a select handler + * More details in server/sql/select_handler.h + * PARAMETERS: + * thd - THD pointer. + * sel - SELECT_LEX* that describes the query. + * RETURN: + * select_handler if possible + * NULL otherwise + ************************************************************/ +select_handler* +create_xpand_select_handler(THD* thd, SELECT_LEX* select_lex) +{ + ulonglong *oids = NULL; + ha_xpand_select_handler *sh = NULL; + if (!select_handler_setting(thd)) { + return sh; + } + + // TODO Return early for EXPLAIN before we run the actual scan. + // We can send compile request when we separate compilation + // and execution. + xpand_connection_cursor *scan = NULL; + if (thd->lex->describe) { + sh = new ha_xpand_select_handler(thd, select_lex, scan); + return sh; + } + + // Multi-update runs an implicit query to collect constraints. + // SH couldn't be used for this. + if (thd->lex->sql_command == SQLCOM_UPDATE_MULTI) { + return sh; + } + + String query; + // Print the query into a string provided + select_lex->print(thd, &query, QT_ORDINARY); + int error_code = 0; + int field_metadata_size = 0; + xpand_connection *trx = NULL; + + // We presume this number is equal to types.elements in get_field_types + uint items_number = select_lex->get_item_list()->elements; + uint num_null_bytes = (items_number + 7) / 8; + uchar *fieldtype = NULL; + uchar *null_bits = NULL; + uchar *field_metadata = NULL; + uchar *meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME), &fieldtype, items_number, + &null_bits, num_null_bytes, &field_metadata, (items_number * 2), NULL); + + if (!meta_memory) { + // The only way to say something here is to raise warning + // b/c we will fallback to other access methods: derived handler or rowstore. + goto err; + } + + if((field_metadata_size = + get_field_types(thd, NULL, select_lex, fieldtype, field_metadata, + null_bits, num_null_bytes, items_number)) < 0) { + goto err; + } + + trx = get_trx(thd, &error_code); + if (!trx) + goto err; + + if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + trx->auto_commit_next(); + + oids = xpand_extract_table_oids(thd, select_lex->parent_lex); + if ((error_code = trx->scan_query(query, fieldtype, items_number, null_bits, + num_null_bytes, field_metadata, + field_metadata_size, + row_buffer_setting(thd), oids, &scan))) { + goto err; + } + + sh = new ha_xpand_select_handler(thd, select_lex, scan); + +err: + if (meta_memory) + my_free(meta_memory); + + if (error_code == HA_ERR_TABLE_DEF_CHANGED) + xpand_mark_tables_for_discovery(select_lex->parent_lex); + + return sh; +} + +/*********************************************************** + * DESCRIPTION: + * select_handler constructor + * PARAMETERS: + * thd - THD pointer. + * select_lex - sematic tree for the query. + **********************************************************/ +ha_xpand_select_handler::ha_xpand_select_handler( + THD *thd, + SELECT_LEX* select_lex, + xpand_connection_cursor *scan_) + : select_handler(thd, xpand_hton) +{ + thd__ = thd; + scan = scan_; + select = select_lex; + rgi = NULL; +} + +/*********************************************************** + * DESCRIPTION: + * select_handler constructor + * This frees dynamic memory allocated for bitmap + * and disables replication to SH temp table. + **********************************************************/ +ha_xpand_select_handler::~ha_xpand_select_handler() +{ + int error_code; + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) { + // TBD Log this + } + if (trx && scan) + trx->scan_end(scan); + + // If the ::init_scan has been executed + if (table__) + my_bitmap_free(&scan_fields); + + if (rgi) + remove_current_table_from_rpl_table_list(rgi); +} + +/*@brief Initiate the query for select_handler */ +/*********************************************************** + * DESCRIPTION: + * Initializes dynamic structures and sets SH temp table + * as RBR replication destination to unpack rows. + * * PARAMETERS: + * RETURN: + * rc as int + * ********************************************************/ +int ha_xpand_select_handler::init_scan() +{ + // Save this into the base handler class attribute + table__ = table; + // need this bitmap future in next_row() + if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false)) + return ER_OUTOFMEMORY; + bitmap_set_all(&scan_fields); + + add_current_table_to_rpl_table_list(&rgi, thd__, table__); + + return 0; +} + +/*@brief Fetch next row for select_handler */ +/*********************************************************** + * DESCRIPTION: + * Fetch next row for select_handler. + * PARAMETERS: + * RETURN: + * rc as int + * ********************************************************/ +int ha_xpand_select_handler::next_row() +{ + int error_code = 0; + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + return error_code; + + assert(scan); + + uchar *rowdata; + ulong rowdata_length; + if ((error_code = trx->scan_next(scan, &rowdata, &rowdata_length))) + return error_code; + + uchar const *current_row_end; + ulong master_reclength; + + error_code = unpack_row(rgi, table, table->s->fields, rowdata, + &scan_fields, ¤t_row_end, + &master_reclength, rowdata + rowdata_length); + + if (error_code) + return error_code; + + return 0; +} + +/*@brief Finishes the scan and clean it up */ +/*********************************************************** + * DESCRIPTION: + * Finishes the scan for select handler + * PARAMETERS: + * RETURN: + * rc as int + ***********************************************************/ +int ha_xpand_select_handler::end_scan() +{ + return 0; +} + +/*@brief create_xpand_derived_handler- Creates handler*/ +/************************************************************ + * DESCRIPTION: + * Creates a derived handler + * More details in server/sql/derived_handler.h + * PARAMETERS: + * thd - THD pointer. + * derived - TABLE_LIST* that describes the tables involved + * RETURN: + * derived_handler if possible + * NULL otherwise + ************************************************************/ +derived_handler* +create_xpand_derived_handler(THD* thd, TABLE_LIST *derived) +{ + ha_xpand_derived_handler *dh = NULL; + if (!derived_handler_setting(thd)) { + return dh; + } + + SELECT_LEX_UNIT *unit= derived->derived; + SELECT_LEX *select_lex = unit->first_select(); + String query; + + dh = new ha_xpand_derived_handler(thd, select_lex, NULL); + + return dh; +} + +/*********************************************************** + * DESCRIPTION: + * derived_handler constructor + * PARAMETERS: + * thd - THD pointer. + * select_lex - sematic tree for the query. + **********************************************************/ +ha_xpand_derived_handler::ha_xpand_derived_handler( + THD *thd, + SELECT_LEX* select_lex, + xpand_connection_cursor *scan_) + : derived_handler(thd, xpand_hton) +{ + thd__ = thd; + scan = scan_; + select = select_lex; + rgi = NULL; +} + +/*********************************************************** + * DESCRIPTION: + * derived_handler constructor + * This frees dynamic memory allocated for bitmap + * and disables replication to SH temp table. + **********************************************************/ +ha_xpand_derived_handler::~ha_xpand_derived_handler() +{ + int error_code; + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) { + // TBD Log this. + } + if (trx && scan) + trx->scan_end(scan); + + // If the ::init_scan has been executed + if (table__) + my_bitmap_free(&scan_fields); + + if (rgi) + remove_current_table_from_rpl_table_list(rgi); +} + +/*@brief Initiate the query for derived_handler */ +/*********************************************************** + * DESCRIPTION: + * Initializes dynamic structures and sets SH temp table + * as RBR replication destination to unpack rows. + * * PARAMETERS: + * RETURN: + * rc as int + * ********************************************************/ +int ha_xpand_derived_handler::init_scan() +{ + String query; + // Print the query into a string provided + select->print(thd__, &query, QT_ORDINARY); + int error_code = 0; + int field_metadata_size = 0; + xpand_connection *trx = NULL; + ulonglong *oids = NULL; + + // We presume this number is equal to types.elements in get_field_types + uint items_number= select->get_item_list()->elements; + uint num_null_bytes = (items_number + 7) / 8; + uchar *fieldtype = NULL; + uchar *null_bits = NULL; + uchar *field_metadata = NULL; + uchar *meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME), &fieldtype, items_number, + &null_bits, num_null_bytes, &field_metadata, (items_number * 2), NULL); + + if (!meta_memory) { + // The only way to say something here is to raise warning + // b/c we will fallback to other access methods: derived handler or rowstore. + goto err; + } + + if((field_metadata_size= + get_field_types(thd__, table, select, fieldtype, field_metadata, + null_bits, num_null_bytes, items_number)) < 0) { + goto err; + } + + trx = get_trx(thd__, &error_code); + if (!trx) + goto err; + + oids = xpand_extract_table_oids(thd__, select->parent_lex); + if ((error_code = trx->scan_query(query, fieldtype, items_number, null_bits, + num_null_bytes, field_metadata, + field_metadata_size, + row_buffer_setting(thd), oids, &scan))) { + goto err; + } + + // Save this into the base handler class attribute + table__ = table; + + // need this bitmap future in next_row() + if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false)) + return ER_OUTOFMEMORY; + bitmap_set_all(&scan_fields); + + add_current_table_to_rpl_table_list(&rgi, thd__, table__); + +err: + if (meta_memory) + my_free(meta_memory); + + if (error_code == HA_ERR_TABLE_DEF_CHANGED) + xpand_mark_tables_for_discovery(select->parent_lex); + + return error_code; +} + +/*@brief Fetch next row for derived_handler */ +/*********************************************************** + * DESCRIPTION: + * Fetch next row for derived_handler. + * PARAMETERS: + * RETURN: + * rc as int + * ********************************************************/ +int ha_xpand_derived_handler::next_row() +{ + int error_code = 0; + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + return error_code; + + assert(scan); + + uchar *rowdata; + ulong rowdata_length; + if ((error_code = trx->scan_next(scan, &rowdata, &rowdata_length))) + return error_code; + + uchar const *current_row_end; + ulong master_reclength; + + error_code = unpack_row(rgi, table, table->s->fields, rowdata, + &scan_fields, ¤t_row_end, + &master_reclength, rowdata + rowdata_length); + + if (error_code) + return error_code; + + return 0; +} + +/*@brief Finishes the scan and clean it up */ +/*********************************************************** + * DESCRIPTION: + * Finishes the scan for derived handler + * PARAMETERS: + * RETURN: + * rc as int + ***********************************************************/ +int ha_xpand_derived_handler::end_scan() +{ + return 0; +} diff --git a/storage/xpand/ha_xpand_pushdown.h b/storage/xpand/ha_xpand_pushdown.h new file mode 100644 index 00000000000..16284b8e076 --- /dev/null +++ b/storage/xpand/ha_xpand_pushdown.h @@ -0,0 +1,84 @@ +/***************************************************************************** +Copyright (c) 2019, 2020, MariaDB Corporation. +*****************************************************************************/ +#ifndef _ha_xpand_pushdown_h +#define _ha_xpand_pushdown_h + +#include "select_handler.h" +#include "derived_handler.h" +#include "sql_select.h" + +/*@brief base_handler class*/ +/*********************************************************** + * DESCRIPTION: + * To be described + ************************************************************/ +class ha_xpand_base_handler +{ + // To simulate abstract class +protected: + ha_xpand_base_handler(): thd__(0),table__(0) {} + ~ha_xpand_base_handler() {} + + // Copies of pushdown handlers attributes + // to use them in shared methods. + THD *thd__; + TABLE *table__; + // The bitmap used to sent + MY_BITMAP scan_fields; + // Structures to unpack RBR rows from XPD BE + rpl_group_info *rgi; + // XPD BE scan operation reference + xpand_connection_cursor *scan; +}; + +/*@brief select_handler class*/ +/*********************************************************** + * DESCRIPTION: + * select_handler API methods. Could be used by the server + * tp pushdown the whole query described by SELECT_LEX. + * More details in server/sql/select_handler.h + * sel semantic tree for the query in SELECT_LEX. + ************************************************************/ +class ha_xpand_select_handler: + private ha_xpand_base_handler, + public select_handler +{ +public: + ha_xpand_select_handler(THD* thd_arg, SELECT_LEX* sel, + xpand_connection_cursor *scan); + ~ha_xpand_select_handler(); + + int init_scan() override; + int next_row() override; + int end_scan() override; + void print_error(int, unsigned long) override {} +}; + +/*@brief derived_handler class*/ +/*********************************************************** + * DESCRIPTION: + * derived_handler API methods. Could be used by the server + * tp pushdown the whole query described by SELECT_LEX. + * More details in server/sql/derived_handler.h + * sel semantic tree for the query in SELECT_LEX. + ************************************************************/ +class ha_xpand_derived_handler: + private ha_xpand_base_handler, + public derived_handler +{ +public: + ha_xpand_derived_handler(THD* thd_arg, SELECT_LEX* sel, + xpand_connection_cursor *scan); + ~ha_xpand_derived_handler(); + + int init_scan() override; + int next_row() override; + int end_scan() override; + void print_error(int, unsigned long) override {} +}; + +select_handler *create_xpand_select_handler(THD* thd, SELECT_LEX* select_lex); +derived_handler *create_xpand_derived_handler(THD* thd, TABLE_LIST *derived); + +#endif diff --git a/storage/xpand/xpand_connection.cc b/storage/xpand/xpand_connection.cc new file mode 100644 index 00000000000..e6b5059322b --- /dev/null +++ b/storage/xpand/xpand_connection.cc @@ -0,0 +1,1357 @@ +/***************************************************************************** +Copyright (c) 2019, 2020, MariaDB Corporation. +*****************************************************************************/ + +/** @file xpand_connection.cc */ + +#include "xpand_connection.h" +#include "ha_xpand.h" +#include <string> +#include "handler.h" +#include "table.h" +#include "sql_class.h" +#include "my_pthread.h" +#include "tztime.h" +#include "errmsg.h" + +#ifdef _WIN32 +#include <stdlib.h> +#define htobe64 _byteswap_uint64 +#define be64toh _byteswap_uint64 +#define htobe32 _byteswap_ulong +#define be32toh _byteswap_ulong +#define htobe16 _byteswap_ushort +#define be16toh _byteswap_ushort +#endif + +#if defined(__APPLE__) +#include <libkern/OSByteOrder.h> +#define htobe64(x) OSSwapHostToBigInt64(x) +#define be64toh(x) OSSwapBigToHostInt64(x) +#define htobe32(x) OSSwapHostToBigInt32(x) +#define be32toh(x) OSSwapBigToHostInt32(x) +#define htobe16(x) OSSwapHostToBigInt16(x) +#define be16toh(x) OSSwapBigToHostInt16(x) +#endif + + +extern int xpand_connect_timeout; +extern int xpand_read_timeout; +extern int xpand_write_timeout; +extern char *xpand_username; +extern char *xpand_password; +extern uint xpand_port; +extern char *xpand_socket; + +/* + This class implements the commands that can be sent to the cluster by the + Xpand engine. All of these commands return a status to the caller, but some + commands also create open invocations on the cluster, which must be closed by + sending additional commands. + + Transactions on the cluster are started using flags attached to commands, and + transactions are committed or rolled back using separate commands. + + Methods ending with _next affect the transaction state after the next command + is sent to the cluster. Other transaction commands are sent to the cluster + immediately, and the state is changed before they return. + + _____________________ _______________________ + | | | | | | + V | | V | | + NONE --> REQUESTED --> STARTED --> NEW_STMT | + | | + `----> ROLLBACK_STMT ---` + + The commit and rollback commands will change any other state to NONE. This + includes the REQUESTED state, for which nothing will be sent to the cluster. + The rollback statement command can likewise change the state from NEW_STMT to + STARTED without sending anything to the cluster. + + In addition, the XPAND_TRANS_AUTOCOMMIT flag will cause the transactions + for commands that complete without leaving open invocations on the cluster to + be committed if successful or rolled back if there was an error. If + auto-commit is enabled, only one open invocation may be in progress at a + time. +*/ + +enum xpand_trans_state { + XPAND_TRANS_STARTED = 0, + XPAND_TRANS_REQUESTED = 1, + XPAND_TRANS_NEW_STMT = 2, + XPAND_TRANS_ROLLBACK_STMT = 4, + XPAND_TRANS_NONE = 32, +}; +const int XPAND_TRANS_STARTS_STMT = (XPAND_TRANS_NEW_STMT | + XPAND_TRANS_REQUESTED | + XPAND_TRANS_ROLLBACK_STMT); + +enum xpand_trans_post_flags { + XPAND_TRANS_AUTOCOMMIT = 8, + XPAND_TRANS_NO_POST_FLAGS = 0, +}; + +enum xpand_commands { + XPAND_WRITE_ROW = 1, + XPAND_SCAN_TABLE, + XPAND_SCAN_NEXT, + XPAND_SCAN_STOP, + XPAND_KEY_READ, + XPAND_KEY_DELETE, + XPAND_SCAN_QUERY, + XPAND_KEY_UPDATE, + XPAND_SCAN_FROM_KEY, + XPAND_UPDATE_QUERY, + XPAND_COMMIT, + XPAND_ROLLBACK, + XPAND_SCAN_TABLE_COND, +}; + +/**************************************************************************** +** Class xpand_connection +****************************************************************************/ +xpand_connection::xpand_connection() + : command_buffer(NULL), command_buffer_length(0), command_length(0), + trans_state(XPAND_TRANS_NONE), trans_flags(XPAND_TRANS_NO_POST_FLAGS) +{ + DBUG_ENTER("xpand_connection::xpand_connection"); + memset(&xpand_net, 0, sizeof(MYSQL)); + DBUG_VOID_RETURN; +} + +xpand_connection::~xpand_connection() +{ + DBUG_ENTER("xpand_connection::~xpand_connection"); + if (is_connected()) + disconnect(TRUE); + + if (command_buffer) + my_free(command_buffer); + DBUG_VOID_RETURN; +} + +void xpand_connection::disconnect(bool is_destructor) +{ + DBUG_ENTER("xpand_connection::disconnect"); + if (is_destructor) + { + /* + Connection object destruction occurs after the destruction of + the thread used by the network has begun, so usage of that + thread object now is not reliable + */ + xpand_net.net.thd = NULL; + } + mysql_close(&xpand_net); + DBUG_VOID_RETURN; +} + +extern int xpand_hosts_cur; +extern ulong xpand_balance_algorithm; + +extern mysql_rwlock_t xpand_hosts_lock; +extern xpand_host_list *xpand_hosts; + +int xpand_connection::connect() +{ + DBUG_ENTER("xpand_connection::connect"); + int start = 0; + if (xpand_balance_algorithm == XPAND_BALANCE_ROUND_ROBIN) + start = my_atomic_add32(&xpand_hosts_cur, 1); + + mysql_rwlock_rdlock(&xpand_hosts_lock); + + //search for available host + int error_code = HA_ERR_NO_CONNECTION; + for (int i = 0; i < xpand_hosts->hosts_len; i++) { + char *host = xpand_hosts->hosts[(start + i) % xpand_hosts->hosts_len]; + error_code = connect_direct(host); + if (!error_code) + break; + } + mysql_rwlock_unlock(&xpand_hosts_lock); + DBUG_RETURN(error_code); +} + + +int xpand_connection::connect_direct(char *host) +{ + DBUG_ENTER("xpand_connection::connect_direct"); + my_bool my_true = true; + DBUG_PRINT("host", ("%s", host)); + + if (!mysql_init(&xpand_net)) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + + uint protocol_tcp = MYSQL_PROTOCOL_TCP; + mysql_options(&xpand_net, MYSQL_OPT_PROTOCOL, &protocol_tcp); + mysql_options(&xpand_net, MYSQL_OPT_READ_TIMEOUT, + &xpand_read_timeout); + mysql_options(&xpand_net, MYSQL_OPT_WRITE_TIMEOUT, + &xpand_write_timeout); + mysql_options(&xpand_net, MYSQL_OPT_CONNECT_TIMEOUT, + &xpand_connect_timeout); + mysql_options(&xpand_net, MYSQL_OPT_USE_REMOTE_CONNECTION, + NULL); + mysql_options(&xpand_net, MYSQL_SET_CHARSET_NAME, "utf8mb4"); + mysql_options(&xpand_net, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, + (char *) &my_true); + mysql_options(&xpand_net, MYSQL_INIT_COMMAND,"SET autocommit=0"); + +#ifdef XPAND_CONNECTION_SSL + if (opt_ssl_ca_length | conn->tgt_ssl_capath_length | + conn->tgt_ssl_cert_length | conn->tgt_ssl_key_length) + { + mysql_ssl_set(&xpand_net, conn->tgt_ssl_key, conn->tgt_ssl_cert, + conn->tgt_ssl_ca, conn->tgt_ssl_capath, conn->tgt_ssl_cipher); + if (conn->tgt_ssl_vsc) + { + my_bool verify_flg = TRUE; + mysql_options(&xpand_net, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, &verify_flg); + } + } +#endif + + int error_code = 0; + if (!mysql_real_connect(&xpand_net, host, xpand_username, xpand_password, + NULL, xpand_port, xpand_socket, + CLIENT_MULTI_STATEMENTS)) + { + sql_print_error("Error connecting to xpand: %s", mysql_error(&xpand_net)); + disconnect(); + error_code = HA_ERR_NO_CONNECTION; + } + + DBUG_RETURN(error_code); +} + +int xpand_connection::add_status_vars() +{ + DBUG_ENTER("xpand_connection::add_status_vars"); + + if (!(trans_state & XPAND_TRANS_STARTS_STMT)) + DBUG_RETURN(add_command_operand_uchar(0)); + + int error_code = 0; + system_variables vars = current_thd->variables; + if ((error_code = add_command_operand_uchar(1))) + DBUG_RETURN(error_code); + //sql mode + if ((error_code = add_command_operand_ulonglong(vars.sql_mode))) + DBUG_RETURN(error_code); + //auto increment state + if ((error_code = add_command_operand_ushort(vars.auto_increment_increment))) + DBUG_RETURN(error_code); + if ((error_code = add_command_operand_ushort(vars.auto_increment_offset))) + DBUG_RETURN(error_code); + //character sets and collations + if ((error_code = add_command_operand_ushort(vars.character_set_results->number))) + DBUG_RETURN(error_code); + if ((error_code = add_command_operand_ushort(vars.character_set_client->number))) + DBUG_RETURN(error_code); + if ((error_code = add_command_operand_ushort(vars.collation_connection->number))) + DBUG_RETURN(error_code); + if ((error_code = add_command_operand_ushort(vars.collation_server->number))) + DBUG_RETURN(error_code); + //timezone and time names + String tzone; + vars.time_zone->get_name()->print(&tzone, system_charset_info); + if ((error_code = add_command_operand_str((const uchar*)tzone.ptr(),tzone.length()))) + DBUG_RETURN(error_code); + if ((error_code = add_command_operand_ushort(vars.lc_time_names->number))) + DBUG_RETURN(error_code); + //transaction isolation + if ((error_code = add_command_operand_uchar(vars.tx_isolation))) + DBUG_RETURN(error_code); + DBUG_RETURN(0); +} + +int xpand_connection::begin_command(uchar command) +{ + if (trans_state == XPAND_TRANS_NONE) + return HA_ERR_INTERNAL_ERROR; + + command_length = 0; + int error_code = 0; + if ((error_code = add_command_operand_uchar(command))) + return error_code; + + if ((error_code = add_command_operand_uchar(trans_state | trans_flags))) + return error_code; + + if ((error_code = add_status_vars())) + return error_code; + + return error_code; +} + +int xpand_connection::send_command() +{ + /* + Please note: + * The transaction state is set before the command is sent because rolling + back a nonexistent transaction is better than leaving a tranaction open + on the cluster. + * The state may have alreadly been STARTED. + * Commit and rollback commands update the transaction state after calling + this function. + * If auto-commit is enabled, the state may also updated after the + response has been processed. We do not clear the auto-commit flag here + because it needs to be sent with each command until the transaction is + committed or rolled back. + */ + trans_state = XPAND_TRANS_STARTED; + + if (simple_command(&xpand_net, + (enum_server_command)XPAND_SERVER_REQUEST, + command_buffer, command_length, TRUE)) + return mysql_errno(&xpand_net); + return 0; +} + +int xpand_connection::read_query_response() +{ + int error_code = 0; + if (xpand_net.methods->read_query_result(&xpand_net)) + error_code = mysql_errno(&xpand_net); + auto_commit_closed(); + return error_code; +} + +bool xpand_connection::has_open_transaction() +{ + return trans_state != XPAND_TRANS_NONE; +} + +int xpand_connection::commit_transaction() +{ + DBUG_ENTER("xpand_connection::commit_transaction"); + if (trans_state == XPAND_TRANS_NONE) + DBUG_RETURN(HA_ERR_INTERNAL_ERROR); + + if (trans_state == XPAND_TRANS_REQUESTED) { + trans_state = XPAND_TRANS_NONE; + trans_flags = XPAND_TRANS_NO_POST_FLAGS; + DBUG_RETURN(0); + } + + int error_code; + if ((error_code = begin_command(XPAND_COMMIT))) + DBUG_RETURN(error_code); + + if ((error_code = send_command())) + DBUG_RETURN(error_code); + + if ((error_code = read_query_response())) + DBUG_RETURN(error_code); + + trans_state = XPAND_TRANS_NONE; + trans_flags = XPAND_TRANS_NO_POST_FLAGS; + DBUG_RETURN(error_code); +} + +int xpand_connection::rollback_transaction() +{ + DBUG_ENTER("xpand_connection::rollback_transaction"); + if (trans_state == XPAND_TRANS_NONE || + trans_state == XPAND_TRANS_REQUESTED) { + trans_state = XPAND_TRANS_NONE; + DBUG_RETURN(0); + } + + int error_code; + if ((error_code = begin_command(XPAND_ROLLBACK))) + DBUG_RETURN(error_code); + + if ((error_code = send_command())) + DBUG_RETURN(error_code); + + if ((error_code = read_query_response())) + DBUG_RETURN(error_code); + + trans_state = XPAND_TRANS_NONE; + trans_flags = XPAND_TRANS_NO_POST_FLAGS; + DBUG_RETURN(error_code); +} + +int xpand_connection::begin_transaction_next() +{ + DBUG_ENTER("xpand_connection::begin_transaction_next"); + if (trans_state != XPAND_TRANS_NONE || + trans_flags != XPAND_TRANS_NO_POST_FLAGS) + DBUG_RETURN(HA_ERR_INTERNAL_ERROR); + + trans_state = XPAND_TRANS_REQUESTED; + DBUG_RETURN(0); +} + +int xpand_connection::new_statement_next() +{ + DBUG_ENTER("xpand_connection::new_statement_next"); + if (trans_state != XPAND_TRANS_STARTED || + trans_flags != XPAND_TRANS_NO_POST_FLAGS) + DBUG_RETURN(HA_ERR_INTERNAL_ERROR); + + trans_state = XPAND_TRANS_NEW_STMT; + DBUG_RETURN(0); +} + +int xpand_connection::rollback_statement_next() +{ + DBUG_ENTER("xpand_connection::rollback_statement_next"); + if (trans_state != XPAND_TRANS_STARTED || + trans_flags != XPAND_TRANS_NO_POST_FLAGS) + DBUG_RETURN(HA_ERR_INTERNAL_ERROR); + + trans_state = XPAND_TRANS_ROLLBACK_STMT; + DBUG_RETURN(0); +} + +void xpand_connection::auto_commit_next() +{ + trans_flags |= XPAND_TRANS_AUTOCOMMIT; +} + +void xpand_connection::auto_commit_closed() +{ + if (trans_flags & XPAND_TRANS_AUTOCOMMIT) { + trans_flags &= ~XPAND_TRANS_AUTOCOMMIT; + trans_state = XPAND_TRANS_NONE; + } +} + +int xpand_connection::run_query(String &stmt) +{ + int error_code = mysql_real_query(&xpand_net, stmt.ptr(), stmt.length()); + if (error_code) + return mysql_errno(&xpand_net); + return error_code; +} + +int xpand_connection::write_row(ulonglong xpand_table_oid, uchar *packed_row, + size_t packed_size, ulonglong *last_insert_id) +{ + int error_code; + command_length = 0; + + // row based commands should not be called with auto commit. + if (trans_flags & XPAND_TRANS_AUTOCOMMIT) + return HA_ERR_INTERNAL_ERROR; + + if ((error_code = begin_command(XPAND_WRITE_ROW))) + return error_code; + + if ((error_code = add_command_operand_ulonglong(xpand_table_oid))) + return error_code; + + if ((error_code = add_command_operand_str(packed_row, packed_size))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + if ((error_code = read_query_response())) { + if (error_code == ER_DUP_ENTRY) + return HA_ERR_FOUND_DUPP_KEY; + return error_code; + } + + *last_insert_id = xpand_net.insert_id; + return error_code; +} + +int xpand_connection::key_update(ulonglong xpand_table_oid, uchar *packed_key, + size_t packed_key_length, + MY_BITMAP *update_set, uchar *packed_new_data, + size_t packed_new_length) +{ + int error_code; + command_length = 0; + + // row based commands should not be called with auto commit. + if (trans_flags & XPAND_TRANS_AUTOCOMMIT) + return HA_ERR_INTERNAL_ERROR; + + if ((error_code = begin_command(XPAND_KEY_UPDATE))) + return error_code; + + if ((error_code = add_command_operand_ulonglong(xpand_table_oid))) + return error_code; + + if ((error_code = add_command_operand_str(packed_key, packed_key_length))) + return error_code; + + if ((error_code = add_command_operand_bitmap(update_set))) + return error_code; + + if ((error_code = add_command_operand_str(packed_new_data, + packed_new_length))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + if ((error_code = read_query_response())) + return error_code; + + return error_code; +} + +int xpand_connection::key_delete(ulonglong xpand_table_oid, + uchar *packed_key, size_t packed_key_length) +{ + int error_code; + command_length = 0; + + // row based commands should not be called with auto commit. + if (trans_flags & XPAND_TRANS_AUTOCOMMIT) + return HA_ERR_INTERNAL_ERROR; + + if ((error_code = begin_command(XPAND_KEY_DELETE))) + return error_code; + + if ((error_code = add_command_operand_ulonglong(xpand_table_oid))) + return error_code; + + if ((error_code = add_command_operand_str(packed_key, packed_key_length))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + if ((error_code = read_query_response())) + return error_code; + + return error_code; +} + +int xpand_connection::key_read(ulonglong xpand_table_oid, uint index, + xpand_lock_mode_t lock_mode, MY_BITMAP *read_set, + uchar *packed_key, ulong packed_key_length, + uchar **rowdata, ulonglong *rowdata_length) +{ + int error_code; + command_length = 0; + + // row based commands should not be called with auto commit. + if (trans_flags & XPAND_TRANS_AUTOCOMMIT) + return HA_ERR_INTERNAL_ERROR; + + if ((error_code = begin_command(XPAND_KEY_READ))) + return error_code; + + if ((error_code = add_command_operand_ulonglong(xpand_table_oid))) + return error_code; + + if ((error_code = add_command_operand_uint(index))) + return error_code; + + if ((error_code = add_command_operand_uchar((uchar)lock_mode))) + return error_code; + + if ((error_code = add_command_operand_bitmap(read_set))) + return error_code; + + if ((error_code = add_command_operand_str(packed_key, packed_key_length))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + ulong packet_length = cli_safe_read(&xpand_net); + if (packet_length == packet_error) + return mysql_errno(&xpand_net); + + uchar *data = xpand_net.net.read_pos; + *rowdata_length = safe_net_field_length_ll(&data, packet_length); + *rowdata = (uchar *)my_malloc(*rowdata_length, MYF(MY_WME)); + memcpy(*rowdata, data, *rowdata_length); + + packet_length = cli_safe_read(&xpand_net); + if (packet_length == packet_error) { + my_free(*rowdata); + *rowdata = NULL; + *rowdata_length = 0; + return mysql_errno(&xpand_net); + } + + return 0; +} + +class xpand_connection_cursor { + struct rowdata { + ulong length; + uchar *data; + }; + + ulong current_row; + ulong last_row; + struct rowdata *rows; + uchar *outstanding_row; // to be freed on next request. + MYSQL *xpand_net; + +public: + ulong buffer_size; + ulonglong scan_refid; + bool eof_reached; + +private: + int cache_row(uchar *rowdata, ulong rowdata_length) + { + DBUG_ENTER("xpand_connection_cursor::cache_row"); + rows[last_row].length = rowdata_length; + rows[last_row].data = (uchar *)my_malloc(rowdata_length, MYF(MY_WME)); + if (!rows[last_row].data) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + memcpy(rows[last_row].data, rowdata, rowdata_length); + last_row++; + DBUG_RETURN(0); + } + + int load_rows_impl(bool *stmt_completed) + { + DBUG_ENTER("xpand_connection_cursor::load_rows_impl"); + int error_code = 0; + ulong packet_length = cli_safe_read(xpand_net); + if (packet_length == packet_error) { + error_code = mysql_errno(xpand_net); + *stmt_completed = TRUE; + if (error_code == HA_ERR_END_OF_FILE) { + // We have read all rows for query. + eof_reached = TRUE; + DBUG_RETURN(0); + } + DBUG_RETURN(error_code); + } + + uchar *rowdata = xpand_net->net.read_pos; + ulong rowdata_length = (ulong) safe_net_field_length_ll(&rowdata, packet_length); + if (!rowdata_length) { + // We have read all rows in this batch. + DBUG_RETURN(0); + } + + if ((error_code = cache_row(rowdata, rowdata_length))) + DBUG_RETURN(error_code); + + DBUG_RETURN(load_rows_impl(stmt_completed)); + } + +public: + xpand_connection_cursor(MYSQL *xpand_net_, ulong bufsize) + { + DBUG_ENTER("xpand_connection_cursor::xpand_connection_cursor"); + xpand_net = xpand_net_; + eof_reached = FALSE; + current_row = 0; + last_row = 0; + outstanding_row = NULL; + buffer_size = bufsize; + rows = NULL; + DBUG_VOID_RETURN; + } + + ~xpand_connection_cursor() + { + DBUG_ENTER("xpand_connection_cursor::~xpand_connection_cursor"); + if (outstanding_row) + my_free(outstanding_row); + if (rows) { + while (current_row < last_row) + my_free(rows[current_row++].data); + my_free(rows); + } + DBUG_VOID_RETURN; + } + + int load_rows(bool *stmt_completed) + { + DBUG_ENTER("xpand_connection_cursor::load_rows"); + current_row = 0; + last_row = 0; + DBUG_RETURN(load_rows_impl(stmt_completed)); + } + + int initialize(bool *stmt_completed) + { + DBUG_ENTER("xpand_connection_cursor::initialize"); + ulong packet_length = cli_safe_read(xpand_net); + if (packet_length == packet_error) { + *stmt_completed = TRUE; + int error_code = mysql_errno(xpand_net); + my_printf_error(error_code, "Xpand error: %s", MYF(0), + mysql_error(xpand_net)); + DBUG_RETURN(error_code); + } + + unsigned char *pos = xpand_net->net.read_pos; + scan_refid = safe_net_field_length_ll(&pos, packet_length); + + rows = (struct rowdata *)my_malloc(buffer_size * sizeof(struct rowdata), + MYF(MY_WME)); + if (!rows) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + + DBUG_RETURN(load_rows(stmt_completed)); + } + + uchar *retrieve_row(ulong *rowdata_length) + { + DBUG_ENTER("xpand_connection_cursor::retrieve_row"); + if (outstanding_row) { + my_free(outstanding_row); + outstanding_row = NULL; + } + if (current_row == last_row) + DBUG_RETURN(NULL); + *rowdata_length = rows[current_row].length; + outstanding_row = rows[current_row].data; + current_row++; + DBUG_RETURN(outstanding_row); + } +}; + +int xpand_connection::allocate_cursor(MYSQL *xpand_net, ulong buffer_size, + xpand_connection_cursor **scan) +{ + DBUG_ENTER("xpand_connection::allocate_cursor"); + *scan = new xpand_connection_cursor(xpand_net, buffer_size); + if (!*scan) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + + bool stmt_completed = FALSE; + int error_code = (*scan)->initialize(&stmt_completed); + if (error_code) { + delete *scan; + *scan = NULL; + } + + if (stmt_completed) + auto_commit_closed(); + + DBUG_RETURN(error_code); +} + +int xpand_connection::scan_table(ulonglong xpand_table_oid, + xpand_lock_mode_t lock_mode, + MY_BITMAP *read_set, ushort row_req, + xpand_connection_cursor **scan, + String* pushdown_cond_sql) +{ + int error_code; + command_length = 0; + + // row based commands should not be called with auto commit. + if (trans_flags & XPAND_TRANS_AUTOCOMMIT) + return HA_ERR_INTERNAL_ERROR; + + if (pushdown_cond_sql != nullptr) { + if ((error_code= begin_command(XPAND_SCAN_TABLE_COND))) + return error_code; + } else { + if ((error_code= begin_command(XPAND_SCAN_TABLE))) + return error_code; + } + + if ((error_code = add_command_operand_ushort(row_req))) + return error_code; + + if ((error_code = add_command_operand_ulonglong(xpand_table_oid))) + return error_code; + + if ((error_code = add_command_operand_uchar((uchar)lock_mode))) + return error_code; + + if ((error_code = add_command_operand_bitmap(read_set))) + return error_code; + + if (pushdown_cond_sql != nullptr) { + if ((error_code= add_command_operand_str( + reinterpret_cast<const uchar*>(pushdown_cond_sql->ptr()), + pushdown_cond_sql->length()))) { + return error_code; + } + } + + if ((error_code = send_command())) + return error_code; + + return allocate_cursor(&xpand_net, row_req, scan); +} + +/** + * @brief + * Sends a command to initiate query scan. + * @details + * Sends a command over mysql protocol connection to initiate an + * arbitrary query using a query text. + * Uses field types, field metadata and nullability to explicitly + * cast result to expected data type. Exploits RBR TABLE_MAP_EVENT + * format + sends SQL text. + * @args + * stmt& Query text to send + * fieldtype* array of byte wide field types of result projection + * null_bits* fields nullability bitmap of result projection + * field_metadata* Field metadata of result projection + * scan_refid id used to reference this scan later + * Used in pushdowns to initiate query scan. + **/ +int xpand_connection::scan_query(String &stmt, uchar *fieldtype, uint fields, + uchar *null_bits, uint null_bits_size, + uchar *field_metadata, + uint field_metadata_size, ushort row_req, + ulonglong *oids, + xpand_connection_cursor **scan) +{ + int error_code; + command_length = 0; + + if ((error_code = begin_command(XPAND_SCAN_QUERY))) + return error_code; + + do { + if ((error_code = add_command_operand_ulonglong(*oids))) + return error_code; + } + while (*oids++); + + if ((error_code = add_command_operand_ushort(row_req))) + return error_code; + + if ((error_code = add_command_operand_str((uchar*)stmt.ptr(), stmt.length()))) + return error_code; + + if ((error_code = add_command_operand_str(fieldtype, fields))) + return error_code; + + if ((error_code = add_command_operand_str(field_metadata, + field_metadata_size))) + return error_code; + + // This variable length string calls for an additional store w/o lcb lenth prefix. + if ((error_code = add_command_operand_vlstr(null_bits, null_bits_size))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + return allocate_cursor(&xpand_net, row_req, scan); +} + +/** + * @brief + * Sends a command to initiate UPDATE. + * @details + * Sends a command over mysql protocol connection to initiate an + * UPDATE query using a query text. + * @args + * stmt& Query text to send + * dbname current working database + * dbname ¤t database name + **/ +int xpand_connection::update_query(String &stmt, LEX_CSTRING &dbname, + ulonglong *oids, ulonglong *affected_rows) +{ + int error_code; + command_length = 0; + + if ((error_code = begin_command(XPAND_UPDATE_QUERY))) + return error_code; + + do { + if ((error_code = add_command_operand_ulonglong(*oids))) + return error_code; + } + while (*oids++); + + if ((error_code = add_command_operand_str((uchar*)dbname.str, dbname.length))) + return error_code; + + if ((error_code = add_command_operand_str((uchar*)stmt.ptr(), stmt.length()))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + error_code = read_query_response(); + if (!error_code) + *affected_rows = xpand_net.affected_rows; + + return error_code; +} + +int xpand_connection::scan_from_key(ulonglong xpand_table_oid, uint index, + xpand_lock_mode_t lock_mode, + enum scan_type scan_dir, + int no_key_cols, bool sorted_scan, + MY_BITMAP *read_set, uchar *packed_key, + ulong packed_key_length, ushort row_req, + xpand_connection_cursor **scan) +{ + int error_code; + command_length = 0; + + // row based commands should not be called with auto commit. + if (trans_flags & XPAND_TRANS_AUTOCOMMIT) + return HA_ERR_INTERNAL_ERROR; + + if ((error_code = begin_command(XPAND_SCAN_FROM_KEY))) + return error_code; + + if ((error_code = add_command_operand_ushort(row_req))) + return error_code; + + if ((error_code = add_command_operand_ulonglong(xpand_table_oid))) + return error_code; + + if ((error_code = add_command_operand_uint(index))) + return error_code; + + if ((error_code = add_command_operand_uchar((uchar)lock_mode))) + return error_code; + + if ((error_code = add_command_operand_uchar(scan_dir))) + return error_code; + + if ((error_code = add_command_operand_uint(no_key_cols))) + return error_code; + + if ((error_code = add_command_operand_uchar(sorted_scan))) + return error_code; + + if ((error_code = add_command_operand_str(packed_key, packed_key_length))) + return error_code; + + if ((error_code = add_command_operand_bitmap(read_set))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + return allocate_cursor(&xpand_net, row_req, scan); +} + +int xpand_connection::scan_next(xpand_connection_cursor *scan, + uchar **rowdata, ulong *rowdata_length) +{ + *rowdata = scan->retrieve_row(rowdata_length); + if (*rowdata) + return 0; + + if (scan->eof_reached) + return HA_ERR_END_OF_FILE; + + int error_code; + command_length = 0; + + if ((error_code = begin_command(XPAND_SCAN_NEXT))) + return error_code; + + // This should not happen as @@xpand_row_buffer has this limit. + if (scan->buffer_size > 65535) + return HA_ERR_INTERNAL_ERROR; + + if ((error_code = add_command_operand_ushort((ushort)scan->buffer_size))) + return error_code; + + if ((error_code = add_command_operand_lcb(scan->scan_refid))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + bool stmt_completed = FALSE; + error_code = scan->load_rows(&stmt_completed); + if (stmt_completed) + auto_commit_closed(); + if (error_code) + return error_code; + + *rowdata = scan->retrieve_row(rowdata_length); + if (!*rowdata) + return HA_ERR_END_OF_FILE; + + return 0; +} + +int xpand_connection::scan_end(xpand_connection_cursor *scan) +{ + int error_code; + command_length = 0; + ulonglong scan_refid = scan->scan_refid; + bool eof_reached = scan->eof_reached; + delete scan; + + if (eof_reached) + return 0; + + if ((error_code = begin_command(XPAND_SCAN_STOP))) + return error_code; + + if ((error_code = add_command_operand_lcb(scan_refid))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + return read_query_response(); +} + +int xpand_connection::populate_table_list(LEX_CSTRING *db, + handlerton::discovered_list *result) +{ + int error_code = 0; + String stmt; + stmt.append("SHOW FULL TABLES FROM "); + stmt.append(db); + stmt.append(" WHERE table_type = 'BASE TABLE'"); + + if (mysql_real_query(&xpand_net, stmt.c_ptr(), stmt.length())) { + int error_code = mysql_errno(&xpand_net); + if (error_code == ER_BAD_DB_ERROR) + return 0; + else + return error_code; + } + + MYSQL_RES *results = mysql_store_result(&xpand_net); + if (mysql_num_fields(results) != 2) { + error_code = HA_ERR_CORRUPT_EVENT; + goto error; + } + + MYSQL_ROW row; + while((row = mysql_fetch_row(results))) + result->add_table(row[0], strlen(row[0])); + +error: + mysql_free_result(results); + return error_code; +} + + +/* + Given a table name, find its OID in the Clustrix, and save it in TABLE_SHARE + + @param db Database name + @param name Table name + @param oid OUT Return the OID here + @param share INOUT If not NULL and the share has ha_share pointer, also + update Xpand_share::xpand_table_oid. + + @return + 0 - OK + error code if an error occurred +*/ + +int xpand_connection::get_table_oid(const char *db, size_t db_len, + const char *name, size_t name_len, + ulonglong *oid, TABLE_SHARE *share) +{ + MYSQL_ROW row; + int error_code = 0; + MYSQL_RES *results_oid = NULL; + String get_oid; + DBUG_ENTER("xpand_connection::get_table_oid"); + + /* get oid */ + get_oid.append("select r.table " + "from system.databases d " + " inner join ""system.relations r on d.db = r.db " + "where d.name = '"); + get_oid.append(db, db_len); + get_oid.append("' and r.name = '"); + get_oid.append(name, name_len); + get_oid.append("'"); + + if (mysql_real_query(&xpand_net, get_oid.c_ptr(), get_oid.length())) { + if ((error_code = mysql_errno(&xpand_net))) { + DBUG_PRINT("mysql_real_query returns ", ("%d", error_code)); + error_code = HA_ERR_NO_SUCH_TABLE; + goto error; + } + } + + results_oid = mysql_store_result(&xpand_net); + DBUG_PRINT("oid results", + ("rows: %llu, fields: %u", mysql_num_rows(results_oid), + mysql_num_fields(results_oid))); + + if (mysql_num_rows(results_oid) != 1) { + error_code = HA_ERR_NO_SUCH_TABLE; + goto error; + } + + if ((row = mysql_fetch_row(results_oid))) { + DBUG_PRINT("row", ("%s", row[0])); + *oid = strtoull((const char *)row[0], NULL, 10); + } else { + error_code = HA_ERR_NO_SUCH_TABLE; + goto error; + } + +error: + if (results_oid) + mysql_free_result(results_oid); + + DBUG_RETURN(error_code); +} + + +/* + Given a table name, fetch table definition from Clustrix and fill the TABLE_SHARE + object with details about field, indexes, etc. +*/ +int xpand_connection::discover_table_details(LEX_CSTRING *db, LEX_CSTRING *name, + THD *thd, TABLE_SHARE *share) +{ + DBUG_ENTER("xpand_connection::discover_table_details"); + int error_code = 0; + MYSQL_RES *results_create = NULL; + MYSQL_ROW row; + String show; + ulonglong oid = 0; + Xpand_share *cs; + + if ((error_code = xpand_connection::get_table_oid(db->str, db->length, + name->str, name->length, + &oid, share))) + goto error; + + if (!share->ha_share) + share->ha_share= new Xpand_share; + cs= static_cast<Xpand_share*>(share->ha_share); + cs->xpand_table_oid = oid; + + /* get show create statement */ + show.append("show simple create table "); + show.append(db); + show.append("."); + show.append("`"); + show.append(name); + show.append("`"); + if (mysql_real_query(&xpand_net, show.c_ptr(), show.length())) { + if ((error_code = mysql_errno(&xpand_net))) { + DBUG_PRINT("mysql_real_query returns ", ("%d", error_code)); + error_code = HA_ERR_NO_SUCH_TABLE; + goto error; + } + } + + results_create = mysql_store_result(&xpand_net); + DBUG_PRINT("show table results", + ("rows: %llu, fields: %u", mysql_num_rows(results_create), + mysql_num_fields(results_create))); + + if (mysql_num_rows(results_create) != 1) { + error_code = HA_ERR_NO_SUCH_TABLE; + goto error; + } + + if (mysql_num_fields(results_create) != 2) { + error_code = HA_ERR_CORRUPT_EVENT; + goto error; + } + + while((row = mysql_fetch_row(results_create))) { + DBUG_PRINT("row", ("%s - %s", row[0], row[1])); + error_code = share->init_from_sql_statement_string(thd, false, row[1], + strlen(row[1])); + } + + cs->rediscover_table = false; +error: + if (results_create) + mysql_free_result(results_create); + DBUG_RETURN(error_code); +} + +#define COMMAND_BUFFER_SIZE_INCREMENT 1024 +#define COMMAND_BUFFER_SIZE_INCREMENT_BITS 10 +int xpand_connection::expand_command_buffer(size_t add_length) +{ + size_t expanded_length; + + if (command_buffer_length >= command_length + add_length) + return 0; + + expanded_length = command_buffer_length + + ((add_length >> COMMAND_BUFFER_SIZE_INCREMENT_BITS) + << COMMAND_BUFFER_SIZE_INCREMENT_BITS) + + COMMAND_BUFFER_SIZE_INCREMENT; + + if (!command_buffer_length) + command_buffer = (uchar *) my_malloc(expanded_length, MYF(MY_WME)); + else + command_buffer = (uchar *) my_realloc(command_buffer, expanded_length, + MYF(MY_WME)); + if (!command_buffer) + return HA_ERR_OUT_OF_MEM; + + command_buffer_length = expanded_length; + + return 0; +} + +int xpand_connection::add_command_operand_uchar(uchar value) +{ + int error_code = expand_command_buffer(sizeof(value)); + if (error_code) + return error_code; + + memcpy(command_buffer + command_length, &value, sizeof(value)); + command_length += sizeof(value); + + return 0; +} + +int xpand_connection::add_command_operand_ushort(ushort value) +{ + ushort be_value = htobe16(value); + int error_code = expand_command_buffer(sizeof(be_value)); + if (error_code) + return error_code; + + memcpy(command_buffer + command_length, &be_value, sizeof(be_value)); + command_length += sizeof(be_value); + return 0; +} + +int xpand_connection::add_command_operand_uint(uint value) +{ + uint be_value = htobe32(value); + int error_code = expand_command_buffer(sizeof(be_value)); + if (error_code) + return error_code; + + memcpy(command_buffer + command_length, &be_value, sizeof(be_value)); + command_length += sizeof(be_value); + return 0; +} + +int xpand_connection::add_command_operand_ulonglong(ulonglong value) +{ + ulonglong be_value = htobe64(value); + int error_code = expand_command_buffer(sizeof(be_value)); + if (error_code) + return error_code; + + memcpy(command_buffer + command_length, &be_value, sizeof(be_value)); + command_length += sizeof(be_value); + return 0; +} + +int xpand_connection::add_command_operand_lcb(ulonglong value) +{ + int len = net_length_size(value); + int error_code = expand_command_buffer(len); + if (error_code) + return error_code; + + net_store_length(command_buffer + command_length, value); + command_length += len; + return 0; +} + +int xpand_connection::add_command_operand_str(const uchar *str, + size_t str_length) +{ + int error_code = add_command_operand_lcb(str_length); + if (error_code) + return error_code; + + if (!str_length) + return 0; + + error_code = expand_command_buffer(str_length); + if (error_code) + return error_code; + + memcpy(command_buffer + command_length, str, str_length); + command_length += str_length; + return 0; +} + +/** + * @brief + * Puts variable length string into the buffer. + * @details + * Puts into the buffer variable length string the size + * of which is send by other means. For details see + * MDB Client/Server Protocol. + * @args + * str - string to send + * str_length - size + **/ +int xpand_connection::add_command_operand_vlstr(const uchar *str, + size_t str_length) +{ + int error_code = expand_command_buffer(str_length); + if (error_code) + return error_code; + + memcpy(command_buffer + command_length, str, str_length); + command_length += str_length; + return 0; +} + +int xpand_connection::add_command_operand_lex_string(LEX_CSTRING str) +{ + return add_command_operand_str((const uchar *)str.str, str.length); +} + +int xpand_connection::add_command_operand_bitmap(MY_BITMAP *bitmap) +{ + int error_code = add_command_operand_lcb(bitmap->n_bits); + if (error_code) + return error_code; + + int no_bytes = no_bytes_in_map(bitmap); + error_code = expand_command_buffer(no_bytes); + if (error_code) + return error_code; + + memcpy(command_buffer + command_length, bitmap->bitmap, no_bytes); + command_length += no_bytes; + return 0; +} + +/**************************************************************************** +** Class xpand_host_list +****************************************************************************/ + +int xpand_host_list::fill(const char *hosts) +{ + strtok_buf = my_strdup(hosts, MYF(MY_WME)); + if (!strtok_buf) { + return HA_ERR_OUT_OF_MEM; + } + + const char *sep = ",; "; + //parse into array + int i = 0; + char *cursor = NULL; + char *token = NULL; + for (token = strtok_r(strtok_buf, sep, &cursor); + token && i < max_host_count; + token = strtok_r(NULL, sep, &cursor)) { + this->hosts[i] = token; + i++; + } + + //host count out of range + if (i == 0 || token) { + my_free(strtok_buf); + return ER_BAD_HOST_ERROR; + } + hosts_len = i; + + return 0; +} + +void xpand_host_list::empty() +{ + my_free(strtok_buf); + strtok_buf = NULL; + hosts_len = 0; +} diff --git a/storage/xpand/xpand_connection.h b/storage/xpand/xpand_connection.h new file mode 100644 index 00000000000..beeaf8a95f4 --- /dev/null +++ b/storage/xpand/xpand_connection.h @@ -0,0 +1,146 @@ +/***************************************************************************** +Copyright (c) 2019, 2020, MariaDB Corporation. +*****************************************************************************/ + +#ifndef _xpand_connection_h +#define _xpand_connection_h + +#ifdef USE_PRAGMA_INTERFACE +#pragma interface /* gcc class implementation */ +#endif + +#define MYSQL_SERVER 1 +#include "my_global.h" +#include "m_string.h" +#include "mysql.h" +#include "sql_common.h" +#include "my_base.h" +#include "mysqld_error.h" +#include "my_bitmap.h" +#include "handler.h" + +#define XPAND_SERVER_REQUEST 30 + +enum xpand_lock_mode_t { + XPAND_NO_LOCKS, + XPAND_SHARED, + XPAND_EXCLUSIVE, +}; + +enum xpand_balance_algorithm_enum { + XPAND_BALANCE_FIRST, + XPAND_BALANCE_ROUND_ROBIN +}; + +class xpand_connection_cursor; +class xpand_connection +{ +private: + MYSQL xpand_net; + uchar *command_buffer; + size_t command_buffer_length; + size_t command_length; + + int trans_state; + int trans_flags; + int allocate_cursor(MYSQL *xpand_net, ulong buffer_size, + xpand_connection_cursor **scan); +public: + xpand_connection(); + ~xpand_connection(); + + inline bool is_connected() + { + return xpand_net.net.vio; + } + int connect(); + int connect_direct(char *host); + void disconnect(bool is_destructor = FALSE); + + bool has_open_transaction(); + int commit_transaction(); + int rollback_transaction(); + int begin_transaction_next(); + int new_statement_next(); + int rollback_statement_next(); // also starts new statement + void auto_commit_next(); + void auto_commit_closed(); + + int run_query(String &stmt); + int write_row(ulonglong xpand_table_oid, uchar *packed_row, + size_t packed_size, ulonglong *last_insert_id); + int key_update(ulonglong xpand_table_oid, + uchar *packed_key, size_t packed_key_length, + MY_BITMAP *update_set, + uchar *packed_new_data, size_t packed_new_length); + int key_delete(ulonglong xpand_table_oid, + uchar *packed_key, size_t packed_key_length); + int key_read(ulonglong xpand_table_oid, uint index, + xpand_lock_mode_t lock_mode, MY_BITMAP *read_set, + uchar *packed_key, ulong packed_key_length, uchar **rowdata, + ulonglong *rowdata_length); + enum sort_order {SORT_NONE = 0, SORT_ASC = 1, SORT_DESC = 2}; + enum scan_type { + READ_KEY_OR_NEXT, /* rows with key and greater */ + READ_KEY_OR_PREV, /* rows with key and less. */ + READ_AFTER_KEY, /* rows with keys greater than key */ + READ_BEFORE_KEY, /* rows with keys less than key */ + READ_FROM_START, /* rows with forwards from first key. */ + READ_FROM_LAST, /* rows with backwards from last key. */ + }; + int scan_table(ulonglong xpand_table_oid, + xpand_lock_mode_t lock_mode, + MY_BITMAP *read_set, ushort row_req, + xpand_connection_cursor **scan, String* pushdown_cond_sql); + int scan_query(String &stmt, uchar *fieldtype, uint fields, uchar *null_bits, + uint null_bits_size, uchar *field_metadata, + uint field_metadata_size, ushort row_req, ulonglong *oids, + xpand_connection_cursor **scan); + int update_query(String &stmt, LEX_CSTRING &dbname, ulonglong *oids, + ulonglong *affected_rows); + int scan_from_key(ulonglong xpand_table_oid, uint index, + xpand_lock_mode_t lock_mode, + enum scan_type scan_dir, int no_key_cols, bool sorted_scan, + MY_BITMAP *read_set, uchar *packed_key, + ulong packed_key_length, ushort row_req, + xpand_connection_cursor **scan); + int scan_next(xpand_connection_cursor *scan, uchar **rowdata, + ulong *rowdata_length); + int scan_end(xpand_connection_cursor *scan); + + int populate_table_list(LEX_CSTRING *db, handlerton::discovered_list *result); + int get_table_oid(const char *db, size_t db_len, const char *name, + size_t name_len, ulonglong *oid, TABLE_SHARE *share); + int discover_table_details(LEX_CSTRING *db, LEX_CSTRING *name, THD *thd, + TABLE_SHARE *share); + +private: + int expand_command_buffer(size_t add_length); + int add_command_operand_uchar(uchar value); + int add_command_operand_ushort(ushort value); + int add_command_operand_uint(uint value); + int add_command_operand_ulonglong(ulonglong value); + int add_command_operand_lcb(ulonglong value); + int add_command_operand_str(const uchar *str, size_t length); + int add_command_operand_vlstr(const uchar *str, size_t length); + int add_command_operand_lex_string(LEX_CSTRING str); + int add_command_operand_bitmap(MY_BITMAP *bitmap); + int add_status_vars(); + int begin_command(uchar command); + int send_command(); + int read_query_response(); +}; + +static const int max_host_count = 128; +class xpand_host_list { +private: + char *strtok_buf; +public: + int hosts_len; + char *hosts[max_host_count]; + + int fill(const char *hosts); + void empty(); +}; + +#endif // _xpand_connection_h |