summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergei Petrunia <psergey@askmonty.org>2020-03-10 11:23:43 +0300
committerSergei Petrunia <psergey@askmonty.org>2020-03-10 11:23:43 +0300
commit181da13191dd6c6bdcb57e8220b46bed1c1492d4 (patch)
treea8175512bcf3d14edbfd9b8c5637afff31d56d2b
parente0e5d8c5942a1eb0b0ae05b6296286193073e571 (diff)
parentcc5f54819da673cce2f56fa7b592512d0897c825 (diff)
downloadmariadb-git-bb-10.5-xpand.tar.gz
Merge XPand Storage Engine (rebased)bb-10.5-xpand
-rw-r--r--mysql-test/suite/xpand/basics.result107
-rw-r--r--mysql-test/suite/xpand/basics.test100
-rw-r--r--mysql-test/suite/xpand/my.cnf4
-rw-r--r--mysql-test/suite/xpand/pushdown_conditions.result47
-rw-r--r--mysql-test/suite/xpand/pushdown_conditions.test34
-rw-r--r--mysql-test/suite/xpand/suite.opt4
-rw-r--r--mysql-test/suite/xpand/update.result51
-rw-r--r--mysql-test/suite/xpand/update.test44
-rw-r--r--mysql-test/suite/xpand/upsert.result72
-rw-r--r--mysql-test/suite/xpand/upsert.test49
-rw-r--r--storage/federatedx/federatedx_pushdown.cc10
-rw-r--r--storage/xpand/CMakeLists.txt6
-rw-r--r--storage/xpand/ha_xpand.cc1588
-rw-r--r--storage/xpand/ha_xpand.h147
-rw-r--r--storage/xpand/ha_xpand_pushdown.cc484
-rw-r--r--storage/xpand/ha_xpand_pushdown.h84
-rw-r--r--storage/xpand/xpand_connection.cc1357
-rw-r--r--storage/xpand/xpand_connection.h146
18 files changed, 4334 insertions, 0 deletions
diff --git a/mysql-test/suite/xpand/basics.result b/mysql-test/suite/xpand/basics.result
new file mode 100644
index 00000000000..e21ce2c574e
--- /dev/null
+++ b/mysql-test/suite/xpand/basics.result
@@ -0,0 +1,107 @@
+CREATE DATABASE xpd;
+USE xpd;
+DROP TABLE IF EXISTS cx1, t1, t2;
+CREATE TABLE cx1(i BIGINT)ENGINE=xpand;
+CREATE TABLE cx1(i BIGINT)ENGINE=xpand;
+ERROR 42S01: Table 'cx1' already exists
+INSERT INTO cx1 VALUES (42);
+SELECT * FROM cx1;
+i
+42
+DROP TABLE cx1;
+SHOW CREATE TABLE cx1;
+ERROR 42S02: Table 'xpd.cx1' doesn't exist
+DROP TABLE IF EXISTS intandtext;
+Warnings:
+Note 1051 Unknown table 'xpd.intandtext'
+CREATE TABLE intandtext(i bigint, t text)ENGINE=xpand;
+INSERT INTO intandtext VALUES(10, 'someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq');
+SELECT i,t FROM intandtext;
+i t
+10 someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq
+EXPLAIN SELECT i,t FROM intandtext;
+id select_type table type possible_keys key key_len ref rows Extra
+1 PUSHED SELECT NULL NULL NULL NULL NULL NULL NULL NULL
+SET @@optimizer_switch='derived_merge=OFF';
+SET xpand_select_handler=OFF;
+SELECT i,t FROM (SELECT i,t FROM intandtext) t;
+i t
+10 someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq
+EXPLAIN SELECT i,t FROM (SELECT i,t FROM intandtext) t;
+id select_type table type possible_keys key key_len ref rows Extra
+1 PRIMARY <derived2> ALL NULL NULL NULL NULL 10000
+2 PUSHED DERIVED NULL NULL NULL NULL NULL NULL NULL NULL
+SET xpand_derived_handler=OFF;
+SELECT i,t FROM intandtext;
+i t
+10 someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq
+SELECT i,t FROM (SELECT i,t FROM intandtext) t;
+i t
+10 someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq
+EXPLAIN SELECT i,t FROM (SELECT i,t FROM intandtext) t;
+id select_type table type possible_keys key key_len ref rows Extra
+1 PRIMARY <derived2> ALL NULL NULL NULL NULL 10000
+2 DERIVED intandtext ALL NULL NULL NULL NULL 10000
+DROP TABLE intandtext;
+set
+optimizer_switch=default,
+xpand_derived_handler= default,
+xpand_select_handler=default;
+#
+# CLX-77: INSERT ... SELECT returns rows to the client instead of inserting
+#
+drop table if exists t1,t2;
+create table t1 (a int) engine=xpand;
+insert into t1 values (1);
+select a into @var from t1;
+select @var;
+@var
+1
+# This must not emit output to the client:
+select a into outfile 'tmpfile1' from t1;
+create table t2 (a int) engine=myisam;
+insert into t2 select * from t1;
+select * from t2;
+a
+1
+drop table t1,t2;
+#
+# CLX-55: Prepared statement support:
+# Implement "Direct Update" by printing the statement
+#
+create table t1 (a int primary key, b int) engine=xpand;
+insert into t1 values (1,1),(2,2),(3,3);
+prepare s from 'update t1 set b=b+? where a=?';
+execute s using 10000, 2;
+select * from t1;
+a b
+1 1
+2 10002
+3 3
+drop table t1;
+#
+# CLX-80: ALTER TABLE t ENGINE=CLUSTRIX fails with an error
+#
+create table t1 (a int) engine=myisam;
+insert into t1 values (1),(2),(3);
+alter table t1 engine=xpand;
+select * from t1;
+a
+1
+2
+3
+show create table t1;
+Table Create Table
+t1 CREATE TABLE `t1` (
+ `a` int(11) DEFAULT NULL
+) ENGINE=XPAND DEFAULT CHARSET=utf8
+# Try a RENAME TABLE too since the patch touches the code
+alter table t1 rename t2;
+show create table t2;
+Table Create Table
+t2 CREATE TABLE `t2` (
+ `a` int(11) DEFAULT NULL
+) ENGINE=XPAND DEFAULT CHARSET=utf8
+drop table t2;
+USE test;
+DROP DATABASE xpd;
diff --git a/mysql-test/suite/xpand/basics.test b/mysql-test/suite/xpand/basics.test
new file mode 100644
index 00000000000..41f9a457da9
--- /dev/null
+++ b/mysql-test/suite/xpand/basics.test
@@ -0,0 +1,100 @@
+CREATE DATABASE xpd;
+USE xpd;
+
+--disable_warnings
+DROP TABLE IF EXISTS cx1, t1, t2;
+--enable_warnings
+
+CREATE TABLE cx1(i BIGINT)ENGINE=xpand;
+--error ER_TABLE_EXISTS_ERROR
+CREATE TABLE cx1(i BIGINT)ENGINE=xpand;
+
+INSERT INTO cx1 VALUES (42);
+
+SELECT * FROM cx1;
+
+DROP TABLE cx1;
+
+--error ER_NO_SUCH_TABLE
+SHOW CREATE TABLE cx1;
+
+DROP TABLE IF EXISTS intandtext;
+CREATE TABLE intandtext(i bigint, t text)ENGINE=xpand;
+INSERT INTO intandtext VALUES(10, 'someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq');
+
+SELECT i,t FROM intandtext;
+EXPLAIN SELECT i,t FROM intandtext;
+
+SET @@optimizer_switch='derived_merge=OFF';
+SET xpand_select_handler=OFF;
+SELECT i,t FROM (SELECT i,t FROM intandtext) t;
+EXPLAIN SELECT i,t FROM (SELECT i,t FROM intandtext) t;
+
+SET xpand_derived_handler=OFF;
+SELECT i,t FROM intandtext;
+SELECT i,t FROM (SELECT i,t FROM intandtext) t;
+EXPLAIN SELECT i,t FROM (SELECT i,t FROM intandtext) t;
+
+DROP TABLE intandtext;
+
+set
+ optimizer_switch=default,
+ xpand_derived_handler= default,
+ xpand_select_handler=default;
+
+--echo #
+--echo # CLX-77: INSERT ... SELECT returns rows to the client instead of inserting
+--echo #
+--disable_warnings
+drop table if exists t1,t2;
+--enable_warnings
+
+create table t1 (a int) engine=xpand;
+insert into t1 values (1);
+
+select a into @var from t1;
+select @var;
+
+--echo # This must not emit output to the client:
+select a into outfile 'tmpfile1' from t1;
+let $file=`select concat(@@datadir,'/xpd/tmpfile1')`;
+
+--remove_file $file
+
+create table t2 (a int) engine=myisam;
+insert into t2 select * from t1;
+select * from t2;
+
+drop table t1,t2;
+
+--echo #
+--echo # CLX-55: Prepared statement support:
+--echo # Implement "Direct Update" by printing the statement
+--echo #
+create table t1 (a int primary key, b int) engine=xpand;
+insert into t1 values (1,1),(2,2),(3,3);
+
+prepare s from 'update t1 set b=b+? where a=?';
+execute s using 10000, 2;
+--sorted_result
+select * from t1;
+drop table t1;
+
+--echo #
+--echo # CLX-80: ALTER TABLE t ENGINE=CLUSTRIX fails with an error
+--echo #
+create table t1 (a int) engine=myisam;
+insert into t1 values (1),(2),(3);
+alter table t1 engine=xpand;
+--sorted_result
+select * from t1;
+show create table t1;
+
+--echo # Try a RENAME TABLE too since the patch touches the code
+alter table t1 rename t2;
+show create table t2;
+
+drop table t2;
+
+USE test;
+DROP DATABASE xpd;
diff --git a/mysql-test/suite/xpand/my.cnf b/mysql-test/suite/xpand/my.cnf
new file mode 100644
index 00000000000..8105041b85c
--- /dev/null
+++ b/mysql-test/suite/xpand/my.cnf
@@ -0,0 +1,4 @@
+!include include/default_my.cnf
+
+[mysqld.1]
+socket= /tmp/mysqld42.sock
diff --git a/mysql-test/suite/xpand/pushdown_conditions.result b/mysql-test/suite/xpand/pushdown_conditions.result
new file mode 100644
index 00000000000..3120000e56a
--- /dev/null
+++ b/mysql-test/suite/xpand/pushdown_conditions.result
@@ -0,0 +1,47 @@
+CREATE DATABASE xpd;
+USE xpd;
+DROP TABLE IF EXISTS cx1;
+CREATE TABLE cx1(i BIGINT, i2 BIGINT, t TEXT)ENGINE=xpand;
+INSERT INTO cx1 VALUES (41, 43, 'some1'), (42, 42, 'some2'), (43, 41, 'some3');
+SELECT * FROM cx1 ORDER BY i;
+i i2 t
+41 43 some1
+42 42 some2
+43 41 some3
+SET xpand_select_handler=OFF;
+SELECT * FROM cx1 WHERE i>41 AND i2>41;
+i i2 t
+42 42 some2
+EXPLAIN SELECT * FROM cx1 WHERE i>41 AND i2>41;
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE cx1 ALL NULL NULL NULL NULL 10000 Using where with pushed condition
+SELECT * FROM cx1 WHERE i>41 AND i2>41 AND t='some2';
+i i2 t
+42 42 some2
+EXPLAIN SELECT * FROM cx1 WHERE i>41 AND i2>41 AND t='some2';
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE cx1 ALL NULL NULL NULL NULL 10000 Using where with pushed condition
+SELECT * FROM cx1 WHERE i>i2+1;
+i i2 t
+43 41 some3
+EXPLAIN SELECT * FROM cx1 WHERE i>i2+1;
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE cx1 ALL NULL NULL NULL NULL 10000 Using where with pushed condition
+SET @@optimizer_switch='derived_merge=OFF';
+SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i;
+i i2 t
+43 41 some3
+EXPLAIN SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i;
+id select_type table type possible_keys key key_len ref rows Extra
+1 PRIMARY <derived2> ALL NULL NULL NULL NULL 10000 Using filesort
+2 PUSHED DERIVED NULL NULL NULL NULL NULL NULL NULL NULL
+SET xpand_derived_handler=OFF;
+SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i;
+i i2 t
+43 41 some3
+EXPLAIN SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i;
+id select_type table type possible_keys key key_len ref rows Extra
+1 PRIMARY <derived2> ALL NULL NULL NULL NULL 10000 Using filesort
+2 DERIVED cx1 ALL NULL NULL NULL NULL 10000 Using where with pushed condition
+USE test;
+DROP DATABASE xpd;
diff --git a/mysql-test/suite/xpand/pushdown_conditions.test b/mysql-test/suite/xpand/pushdown_conditions.test
new file mode 100644
index 00000000000..9d8eb1f3bf5
--- /dev/null
+++ b/mysql-test/suite/xpand/pushdown_conditions.test
@@ -0,0 +1,34 @@
+CREATE DATABASE xpd;
+USE xpd;
+
+--disable_warnings
+DROP TABLE IF EXISTS cx1;
+--enable_warnings
+CREATE TABLE cx1(i BIGINT, i2 BIGINT, t TEXT)ENGINE=xpand;
+
+INSERT INTO cx1 VALUES (41, 43, 'some1'), (42, 42, 'some2'), (43, 41, 'some3');
+
+SELECT * FROM cx1 ORDER BY i;
+SET xpand_select_handler=OFF;
+
+SELECT * FROM cx1 WHERE i>41 AND i2>41;
+EXPLAIN SELECT * FROM cx1 WHERE i>41 AND i2>41;
+SELECT * FROM cx1 WHERE i>41 AND i2>41 AND t='some2';
+EXPLAIN SELECT * FROM cx1 WHERE i>41 AND i2>41 AND t='some2';
+SELECT * FROM cx1 WHERE i>i2+1;
+EXPLAIN SELECT * FROM cx1 WHERE i>i2+1;
+
+# The plugin doesn't use pushdown conditions for DH as of 10.5.1
+# but it is worth to test memory leaks.
+SET @@optimizer_switch='derived_merge=OFF';
+SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i;
+EXPLAIN SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i;
+SET xpand_derived_handler=OFF;
+SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i;
+EXPLAIN SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i;
+
+# SELECT * FROM (SELECT i FROM cx1 WHERE i=42)a1,(SELECT i FROM cx1 WHERE i =42)a2 WHERE a1.i=a2.i;
+# EXPLAIN SELECT * FROM (SELECT i FROM cx1 WHERE i=42)a1,(SELECT i FROM cx1 WHERE i =42)a2 WHERE a1.i=a2.i;
+
+USE test;
+DROP DATABASE xpd;
diff --git a/mysql-test/suite/xpand/suite.opt b/mysql-test/suite/xpand/suite.opt
new file mode 100644
index 00000000000..1e15cb158ae
--- /dev/null
+++ b/mysql-test/suite/xpand/suite.opt
@@ -0,0 +1,4 @@
+--plugin-load=xpand=ha_xpand.so
+--core-file
+--xpand_port=3306
+--plugin-maturity=unknown
diff --git a/mysql-test/suite/xpand/update.result b/mysql-test/suite/xpand/update.result
new file mode 100644
index 00000000000..9fa63f834f1
--- /dev/null
+++ b/mysql-test/suite/xpand/update.result
@@ -0,0 +1,51 @@
+CREATE DATABASE IF NOT EXISTS `db1`;
+connect con1,localhost,root,,test;
+connection con1;
+USE `db1`;
+DROP TABLE IF EXISTS `t1`;
+CREATE TABLE `t1`(i BIGINT, t TEXT)ENGINE=xpand;
+show create table t1;
+Table Create Table
+t1 CREATE TABLE `t1` (
+ `i` bigint(20) DEFAULT NULL,
+ `t` text DEFAULT NULL
+) ENGINE=XPAND DEFAULT CHARSET=utf8
+set character_set_client=utf8;
+set collation_connection=utf8_bin;
+set character_set_results=utf8;
+INSERT INTO `t1` (i, t) VALUES (42, 'один');
+INSERT INTO `t1` (i, t) VALUES (42, 'ноль');
+SELECT * FROM `t1` ORDER BY `i` DESC, `t` DESC;
+i t
+42 один
+42 ноль
+# examine the data on the backend:
+# This should show that the SELECT has been pushed to the backend:
+explain
+select i, hex(t) from t1;
+id select_type table type possible_keys key key_len ref rows Extra
+1 PUSHED SELECT NULL NULL NULL NULL NULL NULL NULL NULL
+select i, hex(t) from t1;
+i hex(t)
+42 D0BDD0BED0BBD18C
+42 D0BED0B4D0B8D0BD
+# The above should match:
+select hex('один') as row1, hex('ноль') as row2;
+row1 row2
+D0BED0B4D0B8D0BD D0BDD0BED0BBD18C
+UPDATE `t1` SET i=i+1,t='два' WHERE t='один';
+SELECT * FROM `t1` ORDER BY `i` DESC, `t` DESC;
+i t
+43 два
+42 ноль
+USE test;
+UPDATE `db1`.`t1` SET i=i+1,t='три' WHERE t='два';
+SELECT * FROM `db1`.`t1` ORDER BY `i` DESC, `t` DESC;
+i t
+44 три
+42 ноль
+disconnect con1;
+connection default;
+DROP TABLE `db1`.`t1`;
+USE test;
+DROP DATABASE `db1`;
diff --git a/mysql-test/suite/xpand/update.test b/mysql-test/suite/xpand/update.test
new file mode 100644
index 00000000000..eb920fb4324
--- /dev/null
+++ b/mysql-test/suite/xpand/update.test
@@ -0,0 +1,44 @@
+CREATE DATABASE IF NOT EXISTS `db1`;
+
+# Do the test in another connection so that we don't have to clean up
+connect (con1,localhost,root,,test);
+connection con1;
+
+USE `db1`;
+--disable_warnings
+DROP TABLE IF EXISTS `t1`;
+--enable_warnings
+
+CREATE TABLE `t1`(i BIGINT, t TEXT)ENGINE=xpand;
+show create table t1;
+
+set character_set_client=utf8;
+set collation_connection=utf8_bin;
+set character_set_results=utf8;
+INSERT INTO `t1` (i, t) VALUES (42, 'один');
+INSERT INTO `t1` (i, t) VALUES (42, 'ноль');
+SELECT * FROM `t1` ORDER BY `i` DESC, `t` DESC;
+
+--echo # examine the data on the backend:
+--echo # This should show that the SELECT has been pushed to the backend:
+explain
+select i, hex(t) from t1;
+--sorted_result
+select i, hex(t) from t1;
+--echo # The above should match:
+select hex('один') as row1, hex('ноль') as row2;
+
+UPDATE `t1` SET i=i+1,t='два' WHERE t='один';
+SELECT * FROM `t1` ORDER BY `i` DESC, `t` DESC;
+
+USE test;
+UPDATE `db1`.`t1` SET i=i+1,t='три' WHERE t='два';
+SELECT * FROM `db1`.`t1` ORDER BY `i` DESC, `t` DESC;
+
+disconnect con1;
+connection default;
+
+DROP TABLE `db1`.`t1`;
+
+USE test;
+DROP DATABASE `db1`;
diff --git a/mysql-test/suite/xpand/upsert.result b/mysql-test/suite/xpand/upsert.result
new file mode 100644
index 00000000000..9cbad652dd4
--- /dev/null
+++ b/mysql-test/suite/xpand/upsert.result
@@ -0,0 +1,72 @@
+CREATE DATABASE IF NOT EXISTS `db1`;
+USE `db1`;
+DROP TABLE IF EXISTS `ins_duplicate`;
+Warnings:
+Note 1051 Unknown table 'db1.ins_duplicate'
+CREATE TABLE `ins_duplicate`(`id` INT PRIMARY KEY, `animal` VARCHAR(30)) ENGINE=xpand;
+INSERT INTO `ins_duplicate` VALUES (1,'Aardvark'), (2,'Cheetah'), (3,'Zebra');
+SELECT * FROM `ins_duplicate` ORDER BY `id`;
+id animal
+1 Aardvark
+2 Cheetah
+3 Zebra
+INSERT INTO ins_duplicate VALUES (1,'Antelope');
+ERROR 23000: Can't write; duplicate key in table 'ins_duplicate'
+INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Banana';
+SELECT * FROM `ins_duplicate` ORDER BY `id`;
+id animal
+1 Banana
+2 Cheetah
+3 Zebra
+INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah');
+ERROR 23000: Can't write; duplicate key in table 'ins_duplicate'
+INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid';
+SELECT * FROM `ins_duplicate` ORDER BY `id`;
+id animal
+1 hybrid
+2 hybrid
+3 Zebra
+BEGIN;
+SELECT * FROM `ins_duplicate` ORDER BY `id`;
+id animal
+1 hybrid
+2 hybrid
+3 Zebra
+INSERT INTO ins_duplicate VALUES (1,'Antelope');
+ERROR 23000: Can't write; duplicate key in table 'ins_duplicate'
+INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable';
+SELECT * FROM `ins_duplicate` ORDER BY `id`;
+id animal
+1 Vegetable
+2 hybrid
+3 Zebra
+INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah');
+ERROR 23000: Can't write; duplicate key in table 'ins_duplicate'
+INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid2';
+COMMIT;
+BEGIN;
+SELECT * FROM `ins_duplicate` ORDER BY `id`;
+id animal
+1 hybrid2
+2 hybrid2
+3 Zebra
+INSERT INTO ins_duplicate VALUES (1,'Antelope');
+ERROR 23000: Can't write; duplicate key in table 'ins_duplicate'
+INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable';
+SELECT * FROM `ins_duplicate` ORDER BY `id`;
+id animal
+1 Vegetable
+2 hybrid2
+3 Zebra
+INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah');
+ERROR 23000: Can't write; duplicate key in table 'ins_duplicate'
+INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid3';
+ROLLBACK;
+SELECT * FROM `ins_duplicate` ORDER BY `id`;
+id animal
+1 hybrid2
+2 hybrid2
+3 Zebra
+DROP TABLE `db1`.`ins_duplicate`;
+USE test;
+DROP DATABASE `db1`;
diff --git a/mysql-test/suite/xpand/upsert.test b/mysql-test/suite/xpand/upsert.test
new file mode 100644
index 00000000000..ab46b1a01b8
--- /dev/null
+++ b/mysql-test/suite/xpand/upsert.test
@@ -0,0 +1,49 @@
+CREATE DATABASE IF NOT EXISTS `db1`;
+USE `db1`;
+DROP TABLE IF EXISTS `ins_duplicate`;
+CREATE TABLE `ins_duplicate`(`id` INT PRIMARY KEY, `animal` VARCHAR(30)) ENGINE=xpand;
+INSERT INTO `ins_duplicate` VALUES (1,'Aardvark'), (2,'Cheetah'), (3,'Zebra');
+SELECT * FROM `ins_duplicate` ORDER BY `id`;
+
+--error ER_DUP_KEY
+INSERT INTO ins_duplicate VALUES (1,'Antelope');
+INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Banana';
+SELECT * FROM `ins_duplicate` ORDER BY `id`;
+
+--error ER_DUP_KEY
+INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah');
+INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid';
+SELECT * FROM `ins_duplicate` ORDER BY `id`;
+
+BEGIN;
+SELECT * FROM `ins_duplicate` ORDER BY `id`;
+
+--error ER_DUP_KEY
+INSERT INTO ins_duplicate VALUES (1,'Antelope');
+INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable';
+SELECT * FROM `ins_duplicate` ORDER BY `id`;
+
+--error ER_DUP_KEY
+INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah');
+INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid2';
+COMMIT;
+
+BEGIN;
+SELECT * FROM `ins_duplicate` ORDER BY `id`;
+
+--error ER_DUP_KEY
+INSERT INTO ins_duplicate VALUES (1,'Antelope');
+INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable';
+SELECT * FROM `ins_duplicate` ORDER BY `id`;
+
+--error ER_DUP_KEY
+INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah');
+INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid3';
+ROLLBACK;
+
+SELECT * FROM `ins_duplicate` ORDER BY `id`;
+
+DROP TABLE `db1`.`ins_duplicate`;
+
+USE test;
+DROP DATABASE `db1`;
diff --git a/storage/federatedx/federatedx_pushdown.cc b/storage/federatedx/federatedx_pushdown.cc
index 2701436ccf5..baaf8245aea 100644
--- a/storage/federatedx/federatedx_pushdown.cc
+++ b/storage/federatedx/federatedx_pushdown.cc
@@ -181,6 +181,16 @@ create_federatedx_select_handler(THD* thd, SELECT_LEX *sel)
else if (ht != tbl->table->file->partition_ht())
return 0;
}
+
+ /*
+ Currently, ha_federatedx_select_handler::init_scan just takes the
+ thd->query and sends it to the backend.
+ This obviously won't work if the SELECT uses an "INTO @var" or
+ "INTO OUTFILE". It is also unlikely to work if the select has some
+ other kind of side effect.
+ */
+ if (sel->uncacheable & UNCACHEABLE_SIDEEFFECT)
+ return NULL;
/*
Currently, ha_federatedx_select_handler::init_scan just takes the
diff --git a/storage/xpand/CMakeLists.txt b/storage/xpand/CMakeLists.txt
new file mode 100644
index 00000000000..7b4727c09da
--- /dev/null
+++ b/storage/xpand/CMakeLists.txt
@@ -0,0 +1,6 @@
+#*****************************************************************************
+# Copyright (c) 2019, 2020, MariaDB Corporation.
+#****************************************************************************/
+
+SET(XPAND_SOURCES ha_xpand.cc xpand_connection.cc ha_xpand_pushdown.cc)
+MYSQL_ADD_PLUGIN(xpand ${XPAND_SOURCES} STORAGE_ENGINE COMPONENT xpand-engine)
diff --git a/storage/xpand/ha_xpand.cc b/storage/xpand/ha_xpand.cc
new file mode 100644
index 00000000000..04369c35fe8
--- /dev/null
+++ b/storage/xpand/ha_xpand.cc
@@ -0,0 +1,1588 @@
+/*****************************************************************************
+Copyright (c) 2019, 2020, MariaDB Corporation.
+*****************************************************************************/
+
+/** @file ha_xpand.cc */
+
+#include "ha_xpand.h"
+#include "ha_xpand_pushdown.h"
+#include "key.h"
+#include <strfunc.h> /* strconvert */
+#include "my_pthread.h"
+
+handlerton *xpand_hton = NULL;
+
+int xpand_connect_timeout;
+static MYSQL_SYSVAR_INT
+(
+ connect_timeout,
+ xpand_connect_timeout,
+ PLUGIN_VAR_OPCMDARG,
+ "Timeout for connecting to Xpand",
+ NULL, NULL, -1, -1, 2147483647, 0
+);
+
+int xpand_read_timeout;
+static MYSQL_SYSVAR_INT
+(
+ read_timeout,
+ xpand_read_timeout,
+ PLUGIN_VAR_OPCMDARG,
+ "Timeout for receiving data from Xpand",
+ NULL, NULL, -1, -1, 2147483647, 0
+);
+
+int xpand_write_timeout;
+static MYSQL_SYSVAR_INT
+(
+ write_timeout,
+ xpand_write_timeout,
+ PLUGIN_VAR_OPCMDARG,
+ "Timeout for sending data to Xpand",
+ NULL, NULL, -1, -1, 2147483647, 0
+);
+
+//state for load balancing
+int xpand_hosts_cur; //protected by my_atomic's
+ulong xpand_balance_algorithm;
+const char* balance_algorithm_names[]=
+{
+ "first", "round_robin", NullS
+};
+
+TYPELIB balance_algorithms=
+{
+ array_elements(balance_algorithm_names) - 1, "",
+ balance_algorithm_names, NULL
+};
+
+static void update_balance_algorithm(MYSQL_THD thd, struct st_mysql_sys_var *var,
+ void *var_ptr, const void *save)
+{
+ *static_cast<ulong *>(var_ptr) = *static_cast<const ulong *>(save);
+ my_atomic_store32(&xpand_hosts_cur, 0);
+}
+
+static MYSQL_SYSVAR_ENUM
+(
+ balance_algorithm,
+ xpand_balance_algorithm,
+ PLUGIN_VAR_OPCMDARG,
+ "Method for managing load balancing of Clustrix nodes, can take values FIRST or ROUND_ROBIN",
+ NULL, update_balance_algorithm, XPAND_BALANCE_ROUND_ROBIN, &balance_algorithms
+);
+
+//current list of clx hosts
+#ifdef HAVE_PSI_INTERFACE
+static PSI_rwlock_key key_xpand_hosts;
+#endif
+mysql_rwlock_t xpand_hosts_lock;
+xpand_host_list *xpand_hosts;
+
+static int check_hosts(MYSQL_THD thd, struct st_mysql_sys_var *var,
+ void *save, struct st_mysql_value *value)
+{
+ DBUG_ENTER("check_hosts");
+ char b;
+ int len = 0;
+ const char *val = value->val_str(value, &b, &len);
+ if (!val)
+ DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+
+ xpand_host_list list;
+ memset(&list, 0, sizeof(list));
+
+ int error_code = 0;
+ if ((error_code = list.fill(val)))
+ DBUG_RETURN(error_code);
+ list.empty();
+
+ *static_cast<const char **>(save) = val;
+ DBUG_RETURN(0);
+}
+
+static void update_hosts(MYSQL_THD thd, struct st_mysql_sys_var *var,
+ void *var_ptr, const void *save)
+{
+ DBUG_ENTER("update_hosts");
+ const char *from_save = *static_cast<const char * const *>(save);
+
+ mysql_rwlock_wrlock(&xpand_hosts_lock);
+
+ xpand_host_list *list = static_cast<xpand_host_list*>(
+ my_malloc(sizeof(xpand_host_list), MYF(MY_WME | MY_ZEROFILL)));
+ int error_code = list->fill(from_save);
+ if (error_code) {
+ my_free(list);
+ sql_print_error("Unhandled error %d setting xpand hostlist", error_code);
+ DBUG_VOID_RETURN;
+ }
+
+ xpand_hosts->empty();
+ my_free(xpand_hosts);
+ xpand_hosts = list;
+
+ char **display_var = static_cast<char**>(var_ptr);
+ my_free(*display_var);
+ *display_var = my_strdup(from_save, MYF(MY_WME));
+
+ mysql_rwlock_unlock(&xpand_hosts_lock);
+ DBUG_VOID_RETURN;
+}
+
+static char *xpand_hosts_str;
+static MYSQL_SYSVAR_STR
+(
+ hosts,
+ xpand_hosts_str,
+ PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
+ "List of xpand hostnames seperated by commas, semicolons or spaces",
+ check_hosts, update_hosts, "localhost"
+);
+
+char *xpand_username;
+static MYSQL_SYSVAR_STR
+(
+ username,
+ xpand_username,
+ PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
+ "Xpand user name",
+ NULL, NULL, "root"
+);
+
+char *xpand_password;
+static MYSQL_SYSVAR_STR
+(
+ password,
+ xpand_password,
+ PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
+ "Xpand password",
+ NULL, NULL, ""
+);
+
+uint xpand_port;
+static MYSQL_SYSVAR_UINT
+(
+ port,
+ xpand_port,
+ PLUGIN_VAR_RQCMDARG,
+ "Xpand port",
+ NULL, NULL, MYSQL_PORT_DEFAULT, MYSQL_PORT_DEFAULT, 65535, 0
+);
+
+char *xpand_socket;
+static MYSQL_SYSVAR_STR
+(
+ socket,
+ xpand_socket,
+ PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
+ "Xpand socket",
+ NULL, NULL, ""
+);
+
+static MYSQL_THDVAR_UINT
+(
+ row_buffer,
+ PLUGIN_VAR_RQCMDARG,
+ "Xpand rowstore row buffer size",
+ NULL, NULL, 20, 1, 65535, 0
+);
+
+// Per thread select handler knob
+static MYSQL_THDVAR_BOOL(
+ select_handler,
+ PLUGIN_VAR_NOCMDARG,
+ "",
+ NULL,
+ NULL,
+ 1
+);
+
+// Per thread derived handler knob
+static MYSQL_THDVAR_BOOL(
+ derived_handler,
+ PLUGIN_VAR_NOCMDARG,
+ "",
+ NULL,
+ NULL,
+ 1
+);
+
+static MYSQL_THDVAR_BOOL(
+ enable_direct_update,
+ PLUGIN_VAR_NOCMDARG,
+ "",
+ NULL,
+ NULL,
+ 1
+);
+
+bool select_handler_setting(THD* thd)
+{
+ return ( thd == NULL ) ? false : THDVAR(thd, select_handler);
+}
+
+bool derived_handler_setting(THD* thd)
+{
+ return ( thd == NULL ) ? false : THDVAR(thd, derived_handler);
+}
+
+uint row_buffer_setting(THD* thd)
+{
+ return THDVAR(thd, row_buffer);
+}
+
+
+/*
+ Get an Xpand_share object for this object. If it doesn't yet exist, create
+ it.
+*/
+
+Xpand_share *ha_xpand::get_share()
+{
+ Xpand_share *tmp_share;
+
+ DBUG_ENTER("ha_xpand::get_share()");
+
+ lock_shared_ha_data();
+ if (!(tmp_share= static_cast<Xpand_share*>(get_ha_share_ptr())))
+ {
+ tmp_share= new Xpand_share;
+ if (!tmp_share)
+ goto err;
+
+ set_ha_share_ptr(static_cast<Handler_share*>(tmp_share));
+ }
+err:
+ unlock_shared_ha_data();
+ DBUG_RETURN(tmp_share);
+}
+
+
+/****************************************************************************
+** Utility functions
+****************************************************************************/
+// This is a wastefull aproach but better then fixed sized buffer.
+size_t estimate_row_size(TABLE *table)
+{
+ size_t row_size = 0;
+ size_t null_byte_count = (bitmap_bits_set(table->write_set) + 7) / 8;
+ row_size += null_byte_count;
+ Field **p_field= table->field, *field;
+ for ( ; (field= *p_field) ; p_field++) {
+ row_size += field->max_data_length();
+ }
+ return row_size;
+}
+
+
+/*
+ Try to decode a string from filename encoding, if that fails, return the
+ original string.
+
+ @detail
+ This is used to get table (or database) name from file (or directory)
+ name. Names of regular tables/databases are encoded using
+ my_charset_filename encoding.
+ Names of temporary tables are not encoded, and they start with '#sql'
+ which is not a valid character sequence in my_charset_filename encoding.
+ Our way to talkle this is to
+ 1. Try to convert the name back
+ 2. If that failed, assume it's a temporary object name and just use the
+ name.
+*/
+
+static void decode_object_or_tmp_name(const char *from, uint size,
+ std::string *out)
+{
+ uint errors, new_size;
+ out->resize(size+1); // assume the decoded string is not longer
+ new_size= strconvert(&my_charset_filename, from, size,
+ system_charset_info, (char*)out->c_str(), size+1,
+ &errors);
+ if (errors)
+ out->assign(from, size);
+ else
+ out->resize(new_size);
+}
+
+/*
+ Take a "./db_name/table_name" and extract db_name and table_name from it
+
+ @return
+ 0 OK
+ other Error code
+*/
+static int normalize_tablename(const char *db_table,
+ std::string *norm_db, std::string *norm_table)
+{
+ std::string tablename(db_table);
+ if (tablename.size() < 2 || tablename[0] != '.' ||
+ (tablename[1] != FN_LIBCHAR && tablename[1] != FN_LIBCHAR2)) {
+ DBUG_ASSERT(0); // We were not passed table name?
+ return HA_ERR_INTERNAL_ERROR;
+ }
+
+ size_t pos = tablename.find_first_of(FN_LIBCHAR, 2);
+ if (pos == std::string::npos) {
+ pos = tablename.find_first_of(FN_LIBCHAR2, 2);
+ }
+
+ if (pos == std::string::npos) {
+ DBUG_ASSERT(0); // We were not passed table name?
+ return HA_ERR_INTERNAL_ERROR;
+ }
+
+ decode_object_or_tmp_name(tablename.c_str() + 2, pos - 2, norm_db);
+ decode_object_or_tmp_name(tablename.c_str() + pos + 1,
+ tablename.size() - (pos + 1), norm_table);
+ return 0;
+}
+
+
+xpand_connection *get_trx(THD *thd, int *error_code)
+{
+ *error_code = 0;
+ xpand_connection *trx;
+ if (!(trx = (xpand_connection *)thd_get_ha_data(thd, xpand_hton)))
+ {
+ if (!(trx = new xpand_connection())) {
+ *error_code = HA_ERR_OUT_OF_MEM;
+ return NULL;
+ }
+
+ *error_code = trx->connect();
+ if (*error_code) {
+ delete trx;
+ return NULL;
+ }
+
+ thd_set_ha_data(thd, xpand_hton, trx);
+ }
+
+ return trx;
+}
+/****************************************************************************
+** Class ha_xpand
+****************************************************************************/
+
+ha_xpand::ha_xpand(handlerton *hton, TABLE_SHARE *table_arg)
+ : handler(hton, table_arg)
+{
+ DBUG_ENTER("ha_xpand::ha_xpand");
+ rgi = NULL;
+ scan_cur = NULL;
+ xpand_table_oid = 0;
+ upsert_flag = 0;
+ DBUG_VOID_RETURN;
+}
+
+ha_xpand::~ha_xpand()
+{
+ if (rgi)
+ remove_current_table_from_rpl_table_list(rgi);
+}
+
+
+int ha_xpand::create(const char *name, TABLE *form, HA_CREATE_INFO *info)
+{
+ int error_code;
+ THD *thd = ha_thd();
+ xpand_connection *trx = get_trx(thd, &error_code);
+ if (!trx)
+ return error_code;
+
+ enum tmp_table_type saved_tmp_table_type = form->s->tmp_table;
+ Table_specification_st *create_info = &thd->lex->create_info;
+ const bool is_tmp_table = info->options & HA_LEX_CREATE_TMP_TABLE;
+ String create_table_stmt;
+
+ /* Create a copy of the CREATE TABLE statement */
+ if (!is_tmp_table)
+ form->s->tmp_table = NO_TMP_TABLE;
+ const char *old_dbstr = thd->db.str;
+ thd->db.str = NULL;
+ ulong old = create_info->used_fields;
+ create_info->used_fields &= ~HA_CREATE_USED_ENGINE;
+
+ std::string norm_db, norm_table;
+ if ((error_code= normalize_tablename(name, &norm_db, &norm_table)))
+ return error_code;
+
+ TABLE_LIST table_list;
+ memset(&table_list, 0, sizeof(table_list));
+ table_list.table = form;
+ error_code = show_create_table_ex(thd, &table_list,
+ norm_db.c_str(), norm_table.c_str(),
+ &create_table_stmt, create_info, WITH_DB_NAME);
+ if (!is_tmp_table)
+ form->s->tmp_table = saved_tmp_table_type;
+ create_info->used_fields = old;
+ thd->db.str = old_dbstr;
+ if (error_code)
+ return error_code;
+
+ // To syncronize the schemas of MDB FE and XPD BE.
+ if (form->s && form->s->db.length) {
+ String createdb_stmt;
+ createdb_stmt.append("CREATE DATABASE IF NOT EXISTS `");
+ createdb_stmt.append(form->s->db.str, form->s->db.length);
+ createdb_stmt.append("`");
+ trx->run_query(createdb_stmt);
+ }
+
+ return trx->run_query(create_table_stmt);
+}
+
+int ha_xpand::delete_table(const char *path)
+{
+ int error_code;
+ THD *thd = ha_thd();
+ xpand_connection *trx = get_trx(thd, &error_code);
+ if (!trx)
+ return error_code;
+
+ std::string decoded_dbname;
+ std::string decoded_tbname;
+ if ((error_code= normalize_tablename(path, &decoded_dbname,
+ &decoded_tbname)))
+ return error_code;
+
+ String delete_cmd;
+ delete_cmd.append("DROP TABLE `");
+ delete_cmd.append(decoded_dbname.c_str());
+ delete_cmd.append("`.`");
+ delete_cmd.append(decoded_tbname.c_str());
+ delete_cmd.append("`");
+
+ return trx->run_query(delete_cmd);
+}
+
+int ha_xpand::rename_table(const char* from, const char* to)
+{
+ int error_code;
+ THD *thd = ha_thd();
+ xpand_connection *trx = get_trx(thd, &error_code);
+ if (!trx)
+ return error_code;
+
+ std::string decoded_from_dbname;
+ std::string decoded_from_tbname;
+ if ((error_code= normalize_tablename(from, &decoded_from_dbname,
+ &decoded_from_tbname)))
+ return error_code;
+
+ std::string decoded_to_dbname;
+ std::string decoded_to_tbname;
+ if ((error_code= normalize_tablename(to, &decoded_to_dbname,
+ &decoded_to_tbname)))
+ return error_code;
+
+ String rename_cmd;
+ rename_cmd.append("RENAME TABLE `");
+ rename_cmd.append(decoded_from_dbname.c_str());
+ rename_cmd.append("`.`");
+ rename_cmd.append(decoded_from_tbname.c_str());
+ rename_cmd.append("` TO `");
+ rename_cmd.append(decoded_to_dbname.c_str());
+ rename_cmd.append("`.`");
+ rename_cmd.append(decoded_to_tbname.c_str());
+ rename_cmd.append("`;");
+
+ return trx->run_query(rename_cmd);
+}
+
+static void
+xpand_mark_table_for_discovery(TABLE *table)
+{
+ table->m_needs_reopen = true;
+ Xpand_share *xs;
+ if ((xs= static_cast<Xpand_share*>(table->s->ha_share)))
+ xs->rediscover_table = true;
+}
+
+void
+xpand_mark_tables_for_discovery(LEX *lex)
+{
+ for (TABLE_LIST *tbl= lex->query_tables; tbl; tbl= tbl->next_global)
+ if (tbl->table && tbl->table->file->ht == xpand_hton)
+ xpand_mark_table_for_discovery(tbl->table);
+}
+
+ulonglong *
+xpand_extract_table_oids(THD *thd, LEX *lex)
+{
+ int cnt = 1;
+ for (TABLE_LIST *tbl = lex->query_tables; tbl; tbl= tbl->next_global)
+ if (tbl->table && tbl->table->file->ht == xpand_hton)
+ cnt++;
+
+ ulonglong *oids = (ulonglong*)thd_alloc(thd, cnt * sizeof(ulonglong));
+ ulonglong *ptr = oids;
+ for (TABLE_LIST *tbl = lex->query_tables; tbl; tbl= tbl->next_global)
+ {
+ if (tbl->table && tbl->table->file->ht == xpand_hton)
+ {
+ ha_xpand *hndlr = static_cast<ha_xpand *>(tbl->table->file);
+ *ptr++ = hndlr->get_table_oid();
+ }
+ }
+
+ *ptr = 0;
+ return oids;
+}
+
+int ha_xpand::open(const char *name, int mode, uint test_if_locked)
+{
+ THD *thd= ha_thd();
+ DBUG_ENTER("ha_xpand::open");
+
+ Xpand_share *share;
+ if (!(share = get_share()))
+ DBUG_RETURN(1);
+
+ int error_code;
+ xpand_connection *trx = get_trx(thd, &error_code);
+ if (!trx)
+ DBUG_RETURN(error_code);
+
+ if (share->rediscover_table)
+ DBUG_RETURN(HA_ERR_TABLE_DEF_CHANGED);
+
+ if (!share->xpand_table_oid) {
+ // We may end up with two threads executing this piece concurrently but
+ // it's ok
+ std::string norm_table;
+ std::string norm_db;
+ if ((error_code= normalize_tablename(name, &norm_db, &norm_table)))
+ DBUG_RETURN(error_code);
+
+ ulonglong oid = 0;
+ error_code= trx->get_table_oid(norm_db.c_str(), strlen(norm_db.c_str()),
+ norm_table.c_str(),
+ strlen(norm_table.c_str()), &oid,
+ table_share);
+ if (error_code)
+ DBUG_RETURN(error_code);
+
+ share->xpand_table_oid = oid;
+ }
+
+ xpand_table_oid = share->xpand_table_oid;
+
+ // Surrogate key marker
+ has_hidden_key = table->s->primary_key == MAX_KEY;
+ if (has_hidden_key) {
+ ref_length = 8;
+ } else {
+ KEY* key_info = table->key_info + table->s->primary_key;
+ ref_length = key_info->key_length;
+ }
+
+ DBUG_PRINT("open finished",
+ ("oid: %llu, ref_length: %u", xpand_table_oid, ref_length));
+ DBUG_RETURN(0);
+}
+
+int ha_xpand::close(void)
+{
+ return 0;
+}
+
+int ha_xpand::reset()
+{
+ upsert_flag &= ~XPAND_BULK_UPSERT;
+ upsert_flag &= ~XPAND_HAS_UPSERT;
+ upsert_flag &= ~XPAND_UPSERT_SENT;
+ xpd_lock_type = XPAND_NO_LOCKS;
+ pushdown_cond_list.empty();
+ return 0;
+}
+
+int ha_xpand::extra(enum ha_extra_function operation)
+{
+ DBUG_ENTER("ha_xpand::extra");
+ if (operation == HA_EXTRA_INSERT_WITH_UPDATE)
+ upsert_flag |= XPAND_HAS_UPSERT;
+ DBUG_RETURN(0);
+}
+
+/*@brief UPSERT State Machine*/
+/*************************************************************
+ * DESCRIPTION:
+ * Fasttrack for UPSERT sends queries down to a XPD backend.
+ * UPSERT could be of two kinds: singular and bulk. The plugin
+ * re-/sets XPAND_BULK_UPSERT in end|start_bulk_insert
+ * methods. XPAND_UPSERT_SENT is used to avoid multiple
+ * execution at XPD backend.
+ * Generic XPAND_HAS_UPSERT is set for bulk UPSERT only b/c
+ * MDB calls write_row only once.
+ ************************************************************/
+int ha_xpand::write_row(const uchar *buf)
+{
+ int error_code = 0;
+ THD *thd = ha_thd();
+ xpand_connection *trx = get_trx(thd, &error_code);
+ if (!trx)
+ return error_code;
+
+ if (upsert_flag & XPAND_HAS_UPSERT) {
+ if (!(upsert_flag & XPAND_UPSERT_SENT)) {
+ ha_rows update_rows;
+ String update_stmt;
+ update_stmt.append(thd->query_string.str());
+
+ if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
+ trx->auto_commit_next();
+
+ ulonglong *oids = xpand_extract_table_oids(thd, thd->lex);
+ error_code= trx->update_query(update_stmt, table->s->db, oids,
+ &update_rows);
+ if (upsert_flag & XPAND_BULK_UPSERT)
+ upsert_flag |= XPAND_UPSERT_SENT;
+ else
+ upsert_flag &= ~XPAND_HAS_UPSERT;
+ }
+
+ if (error_code == HA_ERR_TABLE_DEF_CHANGED)
+ xpand_mark_tables_for_discovery(thd->lex);
+ return error_code;
+ }
+
+ /* Convert the row format to binlog (packed) format */
+ uchar *packed_new_row = (uchar*) my_alloca(estimate_row_size(table));
+ size_t packed_size = pack_row(table, table->write_set, packed_new_row, buf);
+
+ /* XXX: Xpand may needs to return HA_ERR_AUTOINC_ERANGE if we hit that
+ error. */
+ ulonglong last_insert_id = 0;
+ if ((error_code = trx->write_row(xpand_table_oid, packed_new_row, packed_size,
+ &last_insert_id)))
+ goto err;
+
+ if (table->next_number_field)
+ insert_id_for_cur_row = last_insert_id;
+
+err:
+ if (error_code == HA_ERR_TABLE_DEF_CHANGED)
+ xpand_mark_table_for_discovery(table);
+
+ if (packed_size)
+ my_afree(packed_new_row);
+
+ return error_code;
+}
+
+int ha_xpand::update_row(const uchar *old_data, const uchar *new_data)
+{
+ DBUG_ENTER("ha_xpand::update_row");
+ int error_code;
+ THD *thd = ha_thd();
+ xpand_connection *trx = get_trx(thd, &error_code);
+ if (!trx)
+ DBUG_RETURN(error_code);
+
+ size_t row_size = estimate_row_size(table);
+ size_t packed_key_len;
+ uchar *packed_key = (uchar*) my_alloca(row_size);
+ build_key_packed_row(table->s->primary_key, old_data,
+ packed_key, &packed_key_len);
+
+ uchar *packed_new_row = (uchar*) my_alloca(row_size);
+ size_t packed_new_size = pack_row(table, table->write_set, packed_new_row,
+ new_data);
+
+ /* Send the packed rows to Xpand */
+ error_code = trx->key_update(xpand_table_oid, packed_key, packed_key_len,
+ table->write_set,
+ packed_new_row, packed_new_size);
+
+ if(packed_key)
+ my_afree(packed_key);
+
+ if(packed_new_row)
+ my_afree(packed_new_row);
+
+ if (error_code == HA_ERR_TABLE_DEF_CHANGED)
+ xpand_mark_table_for_discovery(table);
+
+ DBUG_RETURN(error_code);
+}
+
+int ha_xpand::direct_update_rows_init(List<Item> *update_fields)
+{
+ DBUG_ENTER("ha_xpand::direct_update_rows_init");
+ THD *thd= ha_thd();
+ if (!THDVAR(thd, enable_direct_update))
+ DBUG_RETURN(HA_ERR_WRONG_COMMAND);
+ DBUG_RETURN(0);
+}
+
+int ha_xpand::direct_update_rows(ha_rows *update_rows, ha_rows *found_rows)
+{
+ DBUG_ENTER("ha_xpand::direct_update_rows");
+ int error_code= 0;
+ THD *thd= ha_thd();
+ xpand_connection *trx= get_trx(thd, &error_code);
+ if (!trx)
+ return error_code;
+
+ String update_stmt;
+ // Do the same as create_xpand_select_handler does:
+ thd->lex->print(&update_stmt, QT_ORDINARY);
+
+ if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
+ trx->auto_commit_next();
+
+ ulonglong *oids = xpand_extract_table_oids(thd, thd->lex);
+ error_code = trx->update_query(update_stmt, table->s->db, oids, update_rows);
+ *found_rows = *update_rows;
+
+ if (error_code == HA_ERR_TABLE_DEF_CHANGED)
+ xpand_mark_tables_for_discovery(thd->lex);
+ DBUG_RETURN(error_code);
+}
+
+void ha_xpand::start_bulk_insert(ha_rows rows, uint flags)
+{
+ DBUG_ENTER("ha_xpand::start_bulk_insert");
+ int error_code= 0;
+ THD *thd= ha_thd();
+ xpand_connection *trx= get_trx(thd, &error_code);
+ if (!trx) {
+ // TBD log this
+ DBUG_VOID_RETURN;
+ }
+
+ upsert_flag |= XPAND_BULK_UPSERT;
+
+ DBUG_VOID_RETURN;
+}
+
+int ha_xpand::end_bulk_insert()
+{
+ DBUG_ENTER("ha_xpand::end_bulk_insert");
+ upsert_flag &= ~XPAND_BULK_UPSERT;
+ upsert_flag &= ~XPAND_HAS_UPSERT;
+ upsert_flag &= ~XPAND_UPSERT_SENT;
+ DBUG_RETURN(0);
+}
+
+int ha_xpand::delete_row(const uchar *buf)
+{
+ int error_code;
+ THD *thd = ha_thd();
+ xpand_connection *trx = get_trx(thd, &error_code);
+ if (!trx)
+ return error_code;
+
+ // The estimate should consider only key fields widths.
+ size_t packed_key_len;
+ uchar *packed_key = (uchar*) my_alloca(estimate_row_size(table));
+ build_key_packed_row(table->s->primary_key, buf, packed_key, &packed_key_len);
+
+ error_code = trx->key_delete(xpand_table_oid, packed_key, packed_key_len);
+
+ if (error_code == HA_ERR_TABLE_DEF_CHANGED)
+ xpand_mark_table_for_discovery(table);
+
+ if (packed_key)
+ my_afree(packed_key);
+
+ return error_code;
+}
+
+ha_xpand::Table_flags ha_xpand::table_flags(void) const
+{
+ Table_flags flags = HA_PARTIAL_COLUMN_READ |
+ HA_REC_NOT_IN_SEQ |
+ HA_FAST_KEY_READ |
+ HA_NULL_IN_KEY |
+ HA_CAN_INDEX_BLOBS |
+ HA_AUTO_PART_KEY |
+ HA_CAN_SQL_HANDLER |
+ HA_BINLOG_STMT_CAPABLE |
+ HA_CAN_TABLE_CONDITION_PUSHDOWN |
+ HA_CAN_DIRECT_UPDATE_AND_DELETE;
+
+ return flags;
+}
+
+ulong ha_xpand::index_flags(uint idx, uint part, bool all_parts) const
+{
+ ulong flags = HA_READ_NEXT |
+ HA_READ_PREV |
+ HA_READ_ORDER |
+ HA_READ_RANGE;
+
+ return flags;
+}
+
+ha_rows ha_xpand::records()
+{
+ return 10000;
+}
+
+ha_rows ha_xpand::records_in_range(uint inx, key_range *min_key,
+ key_range *max_key)
+{
+ return 2;
+}
+
+int ha_xpand::info(uint flag)
+{
+ //THD *thd = ha_thd();
+ if (flag & HA_STATUS_TIME)
+ {
+ /* Retrieve the time of the most recent update to the table */
+ // stats.update_time =
+ }
+
+ if (flag & HA_STATUS_AUTO)
+ {
+ /* Retrieve the latest auto_increment value */
+ stats.auto_increment_value = next_insert_id;
+ }
+
+ if (flag & HA_STATUS_VARIABLE)
+ {
+ /* Retrieve variable info, such as row counts and file lengths */
+ stats.records = records();
+ stats.deleted = 0;
+ // stats.data_file_length =
+ // stats.index_file_length =
+ // stats.delete_length =
+ stats.check_time = 0;
+ // stats.mrr_length_per_rec =
+
+ if (stats.records == 0)
+ stats.mean_rec_length = 0;
+ else
+ stats.mean_rec_length = (ulong) (stats.data_file_length / stats.records);
+ }
+
+ if (flag & HA_STATUS_CONST)
+ {
+ /*
+ Retrieve constant info, such as file names, max file lengths,
+ create time, block size
+ */
+ // stats.max_data_file_length =
+ // stats.create_time =
+ // stats.block_size =
+ }
+
+ return 0;
+}
+
+int ha_xpand::index_init(uint idx, bool sorted)
+{
+ int error_code = 0;
+ THD *thd = ha_thd();
+ xpand_connection *trx = get_trx(thd, &error_code);
+ if (!trx)
+ return error_code;
+
+ active_index = idx;
+ add_current_table_to_rpl_table_list(&rgi, thd, table);
+ scan_cur = NULL;
+
+ /* Return all columns until there is a better understanding of
+ requirements. */
+ if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false))
+ return ER_OUTOFMEMORY;
+ bitmap_set_all(&scan_fields);
+ sorted_scan = sorted;
+
+ return 0;
+}
+
+int ha_xpand::index_read(uchar * buf, const uchar * key, uint key_len,
+ enum ha_rkey_function find_flag)
+{
+ DBUG_ENTER("ha_xpand::index_read");
+ int error_code = 0;
+ THD *thd = ha_thd();
+ xpand_connection *trx = get_trx(thd, &error_code);
+ if (!trx)
+ DBUG_RETURN(error_code);
+
+ key_restore(buf, key, &table->key_info[active_index], key_len);
+ // The estimate should consider only key fields widths.
+ size_t packed_key_len;
+ uchar *packed_key = (uchar*) my_alloca(estimate_row_size(table));
+ build_key_packed_row(active_index, buf, packed_key, &packed_key_len);
+
+ bool exact = false;
+ xpand_connection::scan_type st;
+ switch (find_flag) {
+ case HA_READ_KEY_EXACT:
+ exact = true;
+ break;
+ case HA_READ_KEY_OR_NEXT:
+ st = xpand_connection::READ_KEY_OR_NEXT;
+ break;
+ case HA_READ_KEY_OR_PREV:
+ st = xpand_connection::READ_KEY_OR_PREV;
+ break;
+ case HA_READ_AFTER_KEY:
+ st = xpand_connection::READ_AFTER_KEY;
+ break;
+ case HA_READ_BEFORE_KEY:
+ st = xpand_connection::READ_BEFORE_KEY;
+ break;
+ case HA_READ_PREFIX:
+ case HA_READ_PREFIX_LAST:
+ case HA_READ_PREFIX_LAST_OR_PREV:
+ case HA_READ_MBR_CONTAIN:
+ case HA_READ_MBR_INTERSECT:
+ case HA_READ_MBR_WITHIN:
+ case HA_READ_MBR_DISJOINT:
+ case HA_READ_MBR_EQUAL:
+ DBUG_RETURN(ER_NOT_SUPPORTED_YET);
+ }
+
+ uchar *rowdata = NULL;
+ if (exact) {
+ is_scan = false;
+ ulonglong rowdata_length;
+ error_code = trx->key_read(xpand_table_oid, 0, xpd_lock_type,
+ table->read_set, packed_key, packed_key_len,
+ &rowdata, &rowdata_length);
+ if (!error_code)
+ error_code = unpack_row_to_buf(rgi, table, buf, rowdata, table->read_set,
+ rowdata + rowdata_length);
+ } else {
+ is_scan = true;
+ error_code = trx->scan_from_key(xpand_table_oid, active_index,
+ xpd_lock_type, st, -1, sorted_scan,
+ &scan_fields, packed_key, packed_key_len,
+ THDVAR(thd, row_buffer), &scan_cur);
+ if (!error_code)
+ error_code = rnd_next(buf);
+ }
+
+ if (rowdata)
+ my_free(rowdata);
+
+ if (packed_key)
+ my_afree(packed_key);
+
+ if (error_code == HA_ERR_TABLE_DEF_CHANGED)
+ xpand_mark_table_for_discovery(table);
+
+ DBUG_RETURN(error_code);
+}
+
+int ha_xpand::index_first(uchar *buf)
+{
+ DBUG_ENTER("ha_xpand::index_first");
+ int error_code = 0;
+ THD *thd = ha_thd();
+ xpand_connection *trx = get_trx(thd, &error_code);
+ if (!trx)
+ DBUG_RETURN(error_code);
+
+ error_code = trx->scan_from_key(xpand_table_oid, active_index, xpd_lock_type,
+ xpand_connection::READ_FROM_START, -1,
+ sorted_scan, &scan_fields, NULL, 0,
+ THDVAR(thd, row_buffer), &scan_cur);
+
+ if (error_code == HA_ERR_TABLE_DEF_CHANGED)
+ xpand_mark_table_for_discovery(table);
+
+ if (error_code)
+ DBUG_RETURN(error_code);
+
+ DBUG_RETURN(rnd_next(buf));
+}
+
+int ha_xpand::index_last(uchar *buf)
+{
+ DBUG_ENTER("ha_xpand::index_last");
+ int error_code = 0;
+ THD *thd = ha_thd();
+ xpand_connection *trx = get_trx(thd, &error_code);
+ if (!trx)
+ DBUG_RETURN(error_code);
+
+ error_code = trx->scan_from_key(xpand_table_oid, active_index, xpd_lock_type,
+ xpand_connection::READ_FROM_LAST, -1,
+ sorted_scan, &scan_fields, NULL, 0,
+ THDVAR(thd, row_buffer), &scan_cur);
+
+ if (error_code == HA_ERR_TABLE_DEF_CHANGED)
+ xpand_mark_table_for_discovery(table);
+
+ if (error_code)
+ DBUG_RETURN(error_code);
+
+ DBUG_RETURN(rnd_next(buf));
+}
+
+int ha_xpand::index_next(uchar *buf)
+{
+ DBUG_ENTER("index_next");
+ DBUG_RETURN(rnd_next(buf));
+}
+
+#if 0
+int ha_xpand::index_next_same(uchar *buf, const uchar *key, uint keylen)
+{
+ DBUG_ENTER("index_next_same");
+ DBUG_RETURN(rnd_next(buf));
+}
+#endif
+
+int ha_xpand::index_prev(uchar *buf)
+{
+ DBUG_ENTER("index_prev");
+ DBUG_RETURN(rnd_next(buf));
+}
+
+int ha_xpand::index_end()
+{
+ DBUG_ENTER("index_prev");
+ if (scan_cur)
+ DBUG_RETURN(rnd_end());
+ else
+ {
+ my_bitmap_free(&scan_fields);
+ DBUG_RETURN(0);
+ }
+}
+
+int ha_xpand::rnd_init(bool scan)
+{
+ DBUG_ENTER("ha_xpand::rnd_init");
+ int error_code = 0;
+ THD *thd = ha_thd();
+ if (thd->lex->sql_command == SQLCOM_UPDATE)
+ DBUG_RETURN(error_code);
+ xpand_connection *trx = get_trx(thd, &error_code);
+ if (!trx)
+ DBUG_RETURN(error_code);
+
+ add_current_table_to_rpl_table_list(&rgi, thd, table);
+ is_scan = scan;
+ scan_cur = NULL;
+
+ if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false))
+ DBUG_RETURN(ER_OUTOFMEMORY);
+
+#if 0
+ if (table->s->keys)
+ table->mark_columns_used_by_index(table->s->primary_key, &scan_fields);
+ else
+ bitmap_clear_all(&scan_fields);
+
+ bitmap_union(&scan_fields, table->read_set);
+#else
+ /* Why is read_set not setup correctly? */
+ bitmap_set_all(&scan_fields);
+#endif
+
+ String* pushdown_cond_sql = nullptr;
+ if (pushdown_cond_list.elements) {
+ pushdown_cond_sql = new String();
+ while (pushdown_cond_list.elements > 0) {
+ COND* cond = pushdown_cond_list.pop();
+ String sql_predicate;
+ cond->print_for_table_def(&sql_predicate);
+ pushdown_cond_sql->append(sql_predicate);
+ if ( pushdown_cond_list.elements > 0)
+ pushdown_cond_sql->append(" AND ");
+ }
+ }
+
+ error_code = trx->scan_table(xpand_table_oid, xpd_lock_type, &scan_fields,
+ THDVAR(thd, row_buffer), &scan_cur,
+ pushdown_cond_sql);
+
+ if (pushdown_cond_sql != nullptr)
+ delete pushdown_cond_sql;
+
+ if (error_code == HA_ERR_TABLE_DEF_CHANGED)
+ xpand_mark_table_for_discovery(table);
+
+ if (error_code)
+ DBUG_RETURN(error_code);
+
+ DBUG_RETURN(0);
+}
+
+int ha_xpand::rnd_next(uchar *buf)
+{
+ int error_code = 0;
+ THD *thd = ha_thd();
+ xpand_connection *trx = get_trx(thd, &error_code);
+ if (!trx)
+ return error_code;
+
+ assert(is_scan);
+ assert(scan_cur);
+
+ uchar *rowdata;
+ ulong rowdata_length;
+ if ((error_code = trx->scan_next(scan_cur, &rowdata, &rowdata_length)))
+ return error_code;
+
+ if (has_hidden_key) {
+ last_hidden_key = *(ulonglong *)rowdata;
+ rowdata += 8;
+ rowdata_length -= 8;
+ }
+
+ error_code = unpack_row_to_buf(rgi, table, buf, rowdata, &scan_fields,
+ rowdata + rowdata_length);
+
+ if (error_code)
+ return error_code;
+
+ return 0;
+}
+
+int ha_xpand::rnd_pos(uchar * buf, uchar *pos)
+{
+ DBUG_ENTER("xpd_rnd_pos");
+ DBUG_DUMP("pos", pos, ref_length);
+
+ int error_code = 0;
+ THD *thd = ha_thd();
+ xpand_connection *trx = get_trx(thd, &error_code);
+ if (!trx)
+ DBUG_RETURN(error_code);
+
+ /* WDD: We need a way to convert key buffers directy to rbr buffers. */
+
+ if (has_hidden_key) {
+ memcpy(&last_hidden_key, pos, sizeof(ulonglong));
+ } else {
+ uint keyno = table->s->primary_key;
+ uint len = calculate_key_len(table, keyno, pos,
+ table->const_key_parts[keyno]);
+ key_restore(buf, pos, &table->key_info[keyno], len);
+ }
+
+ // The estimate should consider only key fields widths.
+ uchar *packed_key = (uchar*) my_alloca(estimate_row_size(table));
+ size_t packed_key_len;
+ build_key_packed_row(table->s->primary_key, buf, packed_key, &packed_key_len);
+
+ uchar *rowdata = NULL;
+ ulonglong rowdata_length;
+ if ((error_code = trx->key_read(xpand_table_oid, 0, xpd_lock_type,
+ table->read_set, packed_key, packed_key_len,
+ &rowdata, &rowdata_length)))
+ goto err;
+
+ if ((error_code = unpack_row_to_buf(rgi, table, buf, rowdata, table->read_set,
+ rowdata + rowdata_length)))
+ goto err;
+
+err:
+ if (rowdata)
+ my_free(rowdata);
+
+ if (packed_key)
+ my_afree(packed_key);
+
+ if (error_code == HA_ERR_TABLE_DEF_CHANGED)
+ xpand_mark_table_for_discovery(table);
+
+ DBUG_RETURN(error_code);
+}
+
+int ha_xpand::rnd_end()
+{
+ DBUG_ENTER("ha_xpand::rnd_end");
+ int error_code = 0;
+ THD *thd = ha_thd();
+ if (thd->lex->sql_command == SQLCOM_UPDATE)
+ DBUG_RETURN(error_code);
+
+ xpand_connection *trx = get_trx(thd, &error_code);
+ if (!trx)
+ DBUG_RETURN(error_code);
+
+ my_bitmap_free(&scan_fields);
+ if (scan_cur && (error_code = trx->scan_end(scan_cur)))
+ DBUG_RETURN(error_code);
+ scan_cur = NULL;
+
+ DBUG_RETURN(0);
+}
+
+void ha_xpand::position(const uchar *record)
+{
+ DBUG_ENTER("xpd_position");
+ if (has_hidden_key) {
+ memcpy(ref, &last_hidden_key, sizeof(ulonglong));
+ } else {
+ KEY* key_info = table->key_info + table->s->primary_key;
+ key_copy(ref, record, key_info, key_info->key_length);
+ }
+ DBUG_DUMP("key", ref, ref_length);
+ DBUG_VOID_RETURN;
+}
+
+uint ha_xpand::lock_count(void) const
+{
+ /* Hopefully, we don't need to use thread locks */
+ return 0;
+}
+
+THR_LOCK_DATA **ha_xpand::store_lock(THD *thd, THR_LOCK_DATA **to,
+ enum thr_lock_type lock_type)
+{
+ /* Hopefully, we don't need to use thread locks */
+ return to;
+}
+
+int ha_xpand::external_lock(THD *thd, int lock_type)
+{
+ DBUG_ENTER("ha_xpand::external_lock()");
+ int error_code;
+ xpand_connection *trx = get_trx(thd, &error_code);
+ if (error_code)
+ DBUG_RETURN(error_code);
+
+ if (lock_type == F_WRLCK)
+ xpd_lock_type = XPAND_EXCLUSIVE;
+ else if (lock_type == F_RDLCK)
+ xpd_lock_type = XPAND_SHARED;
+ else if (lock_type == F_UNLCK)
+ xpd_lock_type = XPAND_NO_LOCKS;
+
+ if (lock_type != F_UNLCK) {
+ if (!trx->has_open_transaction()) {
+ error_code = trx->begin_transaction_next();
+ if (error_code)
+ DBUG_RETURN(error_code);
+ }
+
+ trans_register_ha(thd, FALSE, xpand_hton);
+ if (thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
+ trans_register_ha(thd, TRUE, xpand_hton);
+ }
+
+ DBUG_RETURN(error_code);
+}
+
+/****************************************************************************
+ Engine Condition Pushdown
+****************************************************************************/
+
+const COND *ha_xpand::cond_push(const COND *cond)
+{
+ THD *thd= ha_thd();
+ if (!thd->lex->describe) {
+ pushdown_cond_list.push_front(const_cast<COND*>(cond));
+ }
+
+ return NULL;
+}
+
+void ha_xpand::cond_pop()
+{
+ pushdown_cond_list.pop();
+}
+
+int ha_xpand::info_push(uint info_type, void *info)
+{
+ return 0;
+}
+
+ulonglong ha_xpand::get_table_oid()
+{
+ return xpand_table_oid;
+}
+
+/****************************************************************************
+** Row encoding functions
+****************************************************************************/
+
+void add_current_table_to_rpl_table_list(rpl_group_info **_rgi, THD *thd,
+ TABLE *table)
+{
+ if (*_rgi)
+ return;
+
+ Relay_log_info *rli = new Relay_log_info(FALSE);
+ rli->sql_driver_thd = thd;
+
+ rpl_group_info *rgi = new rpl_group_info(rli);
+ *_rgi = rgi;
+ rgi->thd = thd;
+ rgi->tables_to_lock_count = 0;
+ rgi->tables_to_lock = NULL;
+ if (rgi->tables_to_lock_count)
+ return;
+
+ rgi->tables_to_lock = (RPL_TABLE_LIST *)my_malloc(sizeof(RPL_TABLE_LIST),
+ MYF(MY_WME));
+ rgi->tables_to_lock->init_one_table(&table->s->db, &table->s->table_name, 0,
+ TL_READ);
+ rgi->tables_to_lock->table = table;
+ rgi->tables_to_lock->table_id = table->tablenr;
+ rgi->tables_to_lock->m_conv_table = NULL;
+ rgi->tables_to_lock->master_had_triggers = FALSE;
+ rgi->tables_to_lock->m_tabledef_valid = TRUE;
+ // We need one byte per column to save a column's binlog type.
+ uchar *col_type = (uchar*) my_alloca(table->s->fields);
+ for (uint i = 0 ; i < table->s->fields ; ++i)
+ col_type[i] = table->field[i]->binlog_type();
+
+ table_def *tabledef = &rgi->tables_to_lock->m_tabledef;
+ new (tabledef) table_def(col_type, table->s->fields, NULL, 0, NULL, 0);
+ rgi->tables_to_lock_count++;
+ if (col_type)
+ my_afree(col_type);
+}
+
+void remove_current_table_from_rpl_table_list(rpl_group_info *rgi)
+{
+ if (!rgi->tables_to_lock)
+ return;
+
+ rgi->tables_to_lock->m_tabledef.table_def::~table_def();
+ rgi->tables_to_lock->m_tabledef_valid = FALSE;
+ my_free(rgi->tables_to_lock);
+ rgi->tables_to_lock_count--;
+ rgi->tables_to_lock = NULL;
+ delete rgi->rli;
+ delete rgi;
+}
+
+void ha_xpand::build_key_packed_row(uint index, const uchar *buf,
+ uchar *packed_key, size_t *packed_key_len)
+{
+ if (index == table->s->primary_key && has_hidden_key) {
+ memcpy(packed_key, &last_hidden_key, sizeof(ulonglong));
+ *packed_key_len = sizeof(ulonglong);
+ } else {
+ // make a row from the table
+ table->mark_columns_used_by_index(index, &table->tmp_set);
+ *packed_key_len = pack_row(table, &table->tmp_set, packed_key, buf);
+ }
+}
+
+int unpack_row_to_buf(rpl_group_info *rgi, TABLE *table, uchar *data,
+ uchar const *const row_data, MY_BITMAP const *cols,
+ uchar const *const row_end)
+{
+ /* Since unpack_row can only write to record[0], if 'data' does not point to
+ table->record[0], we must back it up and then restore it afterwards. */
+ uchar const *current_row_end;
+ ulong master_reclength;
+ uchar *backup_row = NULL;
+ if (data != table->record[0]) {
+ /* See Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
+ and the definitions of store_record and restore_record. */
+ backup_row = (uchar*) my_alloca(table->s->reclength);
+ memcpy(backup_row, table->record[0], table->s->reclength);
+ restore_record(table, record[data == table->record[1] ? 1 : 2]);
+ }
+
+ int error_code = unpack_row(rgi, table, table->s->fields, row_data, cols,
+ &current_row_end, &master_reclength, row_end);
+
+ if (backup_row) {
+ store_record(table, record[data == table->record[1] ? 1 : 2]);
+ memcpy(table->record[0], backup_row, table->s->reclength);
+ my_afree(backup_row);
+ }
+
+ return error_code;
+}
+
+/****************************************************************************
+** Plugin Functions
+****************************************************************************/
+
+static int xpand_commit(handlerton *hton, THD *thd, bool all)
+{
+ xpand_connection* trx = (xpand_connection *) thd_get_ha_data(thd, hton);
+ assert(trx);
+
+ int error_code = 0;
+ if (trx->has_open_transaction()) {
+ if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
+ error_code = trx->commit_transaction();
+ else
+ error_code = trx->new_statement_next();
+ }
+
+ return error_code;
+}
+
+static int xpand_rollback(handlerton *hton, THD *thd, bool all)
+{
+ xpand_connection* trx = (xpand_connection *) thd_get_ha_data(thd, hton);
+ assert(trx);
+
+ int error_code = 0;
+ if (trx->has_open_transaction()) {
+ if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
+ error_code = trx->rollback_transaction();
+ else
+ error_code = trx->rollback_statement_next();
+ }
+
+ return error_code;
+}
+
+static handler* xpand_create_handler(handlerton *hton, TABLE_SHARE *table,
+ MEM_ROOT *mem_root)
+{
+ return new (mem_root) ha_xpand(hton, table);
+}
+
+static int xpand_close_connection(handlerton* hton, THD* thd)
+{
+ xpand_connection* trx = (xpand_connection *) thd_get_ha_data(thd, hton);
+ if (!trx)
+ return 0; /* Transaction is not started */
+
+ int error_code = xpand_rollback(xpand_hton, thd, TRUE);
+
+ delete trx;
+
+ return error_code;
+}
+
+static int xpand_panic(handlerton *hton, ha_panic_function type)
+{
+ return 0;
+}
+
+static bool xpand_show_status(handlerton *hton, THD *thd,
+ stat_print_fn *stat_print,
+ enum ha_stat_type stat_type)
+{
+ return FALSE;
+}
+
+static int xpand_discover_table_names(handlerton *hton, LEX_CSTRING *db,
+ MY_DIR *dir,
+ handlerton::discovered_list *result)
+{
+ DBUG_ENTER("xpand_discover_table_names");
+ xpand_connection *xpand_net = new xpand_connection();
+ int error_code = xpand_net->connect();
+ if (error_code) {
+ if (error_code == HA_ERR_NO_CONNECTION)
+ error_code = 0;
+ goto err;
+ }
+
+ error_code = xpand_net->populate_table_list(db, result);
+
+err:
+ delete xpand_net;
+ DBUG_RETURN(error_code);
+}
+
+int xpand_discover_table(handlerton *hton, THD *thd, TABLE_SHARE *share)
+{
+ DBUG_ENTER("xpand_discover_table");
+ xpand_connection *xpand_net = new xpand_connection();
+ int error_code = xpand_net->connect();
+ if (error_code) {
+ if (error_code == HA_ERR_NO_CONNECTION)
+ error_code = HA_ERR_NO_SUCH_TABLE;
+ goto err;
+ }
+
+ error_code = xpand_net->discover_table_details(&share->db, &share->table_name,
+ thd, share);
+
+err:
+ delete xpand_net;
+ DBUG_RETURN(error_code);
+}
+
+static int xpand_init(void *p)
+{
+ DBUG_ENTER("xpand_init");
+
+ xpand_hton = (handlerton *) p;
+ xpand_hton->flags = HTON_NO_FLAGS;
+ xpand_hton->panic = xpand_panic;
+ xpand_hton->close_connection = xpand_close_connection;
+ xpand_hton->commit = xpand_commit;
+ xpand_hton->rollback = xpand_rollback;
+ xpand_hton->create = xpand_create_handler;
+ xpand_hton->show_status = xpand_show_status;
+ xpand_hton->discover_table_names = xpand_discover_table_names;
+ xpand_hton->discover_table = xpand_discover_table;
+ xpand_hton->create_select = create_xpand_select_handler;
+ xpand_hton->create_derived = create_xpand_derived_handler;
+
+ mysql_rwlock_init(key_xpand_hosts, &xpand_hosts_lock);
+ mysql_rwlock_wrlock(&xpand_hosts_lock);
+ xpand_hosts = static_cast<xpand_host_list*>(
+ my_malloc(sizeof(xpand_host_list), MYF(MY_WME | MY_ZEROFILL)));
+ int error_code = xpand_hosts->fill(xpand_hosts_str);
+ if (error_code) {
+ my_free(xpand_hosts);
+ xpand_hosts = NULL;
+ }
+ mysql_rwlock_unlock(&xpand_hosts_lock);
+ DBUG_RETURN(error_code);
+}
+
+static int xpand_deinit(void *p)
+{
+ DBUG_ENTER("xpand_deinit");
+ mysql_rwlock_wrlock(&xpand_hosts_lock);
+ xpand_hosts->empty();
+ my_free(xpand_hosts);
+ xpand_hosts = NULL;
+ mysql_rwlock_destroy(&xpand_hosts_lock);
+ DBUG_RETURN(0);
+}
+
+struct st_mysql_show_var xpand_status_vars[] =
+{
+ {NullS, NullS, SHOW_LONG}
+};
+
+static struct st_mysql_sys_var* xpand_system_variables[] =
+{
+ MYSQL_SYSVAR(connect_timeout),
+ MYSQL_SYSVAR(read_timeout),
+ MYSQL_SYSVAR(write_timeout),
+ MYSQL_SYSVAR(balance_algorithm),
+ MYSQL_SYSVAR(hosts),
+ MYSQL_SYSVAR(username),
+ MYSQL_SYSVAR(password),
+ MYSQL_SYSVAR(port),
+ MYSQL_SYSVAR(socket),
+ MYSQL_SYSVAR(row_buffer),
+ MYSQL_SYSVAR(select_handler),
+ MYSQL_SYSVAR(derived_handler),
+ MYSQL_SYSVAR(enable_direct_update),
+ NULL
+};
+
+static struct st_mysql_storage_engine xpand_storage_engine =
+ {MYSQL_HANDLERTON_INTERFACE_VERSION};
+
+maria_declare_plugin(xpand)
+{
+ MYSQL_STORAGE_ENGINE_PLUGIN, /* Plugin Type */
+ &xpand_storage_engine, /* Plugin Descriptor */
+ "XPAND", /* Plugin Name */
+ "MariaDB", /* Plugin Author */
+ "Xpand storage engine", /* Plugin Description */
+ PLUGIN_LICENSE_GPL, /* Plugin Licence */
+ xpand_init, /* Plugin Entry Point */
+ xpand_deinit, /* Plugin Deinitializer */
+ 0x0001, /* Hex Version Number (0.1) */
+ NULL /* xpand_status_vars */, /* Status Variables */
+ xpand_system_variables, /* System Variables */
+ "0.1", /* String Version */
+ MariaDB_PLUGIN_MATURITY_EXPERIMENTAL /* Maturity Level */
+}
+maria_declare_plugin_end;
diff --git a/storage/xpand/ha_xpand.h b/storage/xpand/ha_xpand.h
new file mode 100644
index 00000000000..808e516edd3
--- /dev/null
+++ b/storage/xpand/ha_xpand.h
@@ -0,0 +1,147 @@
+/*****************************************************************************
+Copyright (c) 2019, 2020, MariaDB Corporation.
+*****************************************************************************/
+
+#ifndef _ha_xpand_h
+#define _ha_xpand_h
+
+#ifdef USE_PRAGMA_INTERFACE
+#pragma interface /* gcc class implementation */
+#endif
+
+#define MYSQL_SERVER 1
+#include "xpand_connection.h"
+#include "my_bitmap.h"
+#include "table.h"
+#include "rpl_rli.h"
+#include "handler.h"
+#include "sql_class.h"
+#include "sql_show.h"
+#include "mysql.h"
+#include "../../sql/rpl_record.h"
+
+size_t estimate_row_size(TABLE *table);
+xpand_connection *get_trx(THD *thd, int *error_code);
+bool get_enable_sh(THD* thd);
+void add_current_table_to_rpl_table_list(rpl_group_info **_rgi, THD *thd,
+ TABLE *table);
+void remove_current_table_from_rpl_table_list(rpl_group_info *rgi);
+int unpack_row_to_buf(rpl_group_info *rgi, TABLE *table, uchar *data,
+ uchar const *const row_data, MY_BITMAP const *cols,
+ uchar const *const row_end);
+void xpand_mark_tables_for_discovery(LEX *lex);
+ulonglong *xpand_extract_table_oids(THD *thd, LEX *lex);
+
+
+class Xpand_share : public Handler_share {
+public:
+ Xpand_share(): xpand_table_oid(0), rediscover_table(false) {}
+
+ std::atomic<ulonglong> xpand_table_oid;
+ std::atomic<bool> rediscover_table;
+};
+
+class ha_xpand : public handler
+{
+private:
+ // TODO: do we need this here or one in share would be sufficient?
+ ulonglong xpand_table_oid;
+ rpl_group_info *rgi;
+
+ Field *auto_inc_field;
+ ulonglong auto_inc_value;
+
+ bool has_hidden_key;
+ ulonglong last_hidden_key;
+ xpand_connection_cursor *scan_cur;
+ bool is_scan;
+ MY_BITMAP scan_fields;
+ bool sorted_scan;
+ xpand_lock_mode_t xpd_lock_type;
+
+ uint last_dup_errkey;
+
+ typedef enum xpand_upsert_flags {
+ XPAND_HAS_UPSERT= 1,
+ XPAND_BULK_UPSERT= 2,
+ XPAND_UPSERT_SENT= 4
+ } xpd_upsert_flags_t;
+ int upsert_flag;
+
+ List<COND> pushdown_cond_list;
+
+ Xpand_share *get_share(); ///< Get the share
+
+public:
+ ha_xpand(handlerton *hton, TABLE_SHARE *table_arg);
+ ~ha_xpand();
+ int create(const char *name, TABLE *form, HA_CREATE_INFO *info) override;
+ int delete_table(const char *name) override;
+ int rename_table(const char* from, const char* to) override;
+ int open(const char *name, int mode, uint test_if_locked) override;
+ int close(void) override;
+ int reset() override;
+ int extra(enum ha_extra_function operation) override;
+ int write_row(const uchar *buf) override;
+ // start_bulk_update exec_bulk_update
+ int update_row(const uchar *old_data, const uchar *new_data) override;
+ // start_bulk_delete exec_bulk_delete
+ int delete_row(const uchar *buf) override;
+ int direct_update_rows_init(List<Item> *update_fields) override;
+ int direct_update_rows(ha_rows *update_rows, ha_rows *found_rows) override;
+ void start_bulk_insert(ha_rows rows, uint flags = 0) override;
+ int end_bulk_insert() override;
+
+ Table_flags table_flags(void) const override;
+ ulong index_flags(uint idx, uint part, bool all_parts) const override;
+ uint max_supported_keys() const override { return MAX_KEY; }
+
+ ha_rows records() override;
+ ha_rows records_in_range(uint inx, key_range *min_key,
+ key_range *max_key) override;
+
+ int info(uint flag) override; // see my_base.h for full description
+
+ // multi_read_range
+ // read_range
+ int index_init(uint idx, bool sorted) override;
+ int index_read(uchar * buf, const uchar * key, uint key_len,
+ enum ha_rkey_function find_flag) override;
+ int index_first(uchar *buf) override;
+ int index_prev(uchar *buf) override;
+ int index_last(uchar *buf) override;
+ int index_next(uchar *buf) override;
+ //int index_next_same(uchar *buf, const uchar *key, uint keylen) override;
+ int index_end() override;
+
+ int rnd_init(bool scan) override;
+ int rnd_next(uchar *buf) override;
+ int rnd_pos(uchar * buf, uchar *pos) override;
+ int rnd_end() override;
+
+ void position(const uchar *record) override;
+ uint lock_count(void) const override;
+ THR_LOCK_DATA **store_lock(THD *thd,
+ THR_LOCK_DATA **to,
+ enum thr_lock_type lock_type) override;
+ int external_lock(THD *thd, int lock_type) override;
+
+ uint8 table_cache_type() override
+ {
+ return(HA_CACHE_TBL_NOCACHE);
+ }
+
+ const COND *cond_push(const COND *cond) override;
+ void cond_pop() override;
+ int info_push(uint info_type, void *info) override;
+
+ ulonglong get_table_oid();
+private:
+ void build_key_packed_row(uint index, const uchar *buf,
+ uchar *packed_key, size_t *packed_key_len);
+};
+
+bool select_handler_setting(THD* thd);
+bool derived_handler_setting(THD* thd);
+uint row_buffer_setting(THD* thd);
+#endif // _ha_xpand_h
diff --git a/storage/xpand/ha_xpand_pushdown.cc b/storage/xpand/ha_xpand_pushdown.cc
new file mode 100644
index 00000000000..e51bc162236
--- /dev/null
+++ b/storage/xpand/ha_xpand_pushdown.cc
@@ -0,0 +1,484 @@
+/*****************************************************************************
+Copyright (c) 2019, 2020, MariaDB Corporation.
+*****************************************************************************/
+
+#include "ha_xpand.h"
+#include "ha_xpand_pushdown.h"
+
+extern handlerton *xpand_hton;
+extern uint xpand_row_buffer;
+
+/*@brief Fills up array data types, metadata and nullability*/
+/************************************************************
+ * DESCRIPTION:
+ * Fills up three arrays with: field binlog data types, field
+ * metadata and nullability bitmask as in Table_map_log_event
+ * ctor. Internally creates a temporary table as does
+ * Pushdown_select. DH uses the actual temp table w/o
+ * b/c create_DH is called later compared to create_SH.
+ * More details in server/sql/log_event_server.cc
+ * PARAMETERS:
+ * thd - THD*
+ * table__ - TABLE* temp table for the results
+ * sl - SELECT_LEX*
+ * fieldtype - uchar*
+ * field_metadata - uchar*
+ * null_bits - uchar*
+ * num_null_bytes - null bit size
+ * fields_count - a number of fields
+ * RETURN:
+ * metadata_size int or -1 in case of error
+ ************************************************************/
+int get_field_types(THD *thd, TABLE *table__, SELECT_LEX *sl, uchar *fieldtype,
+ uchar *field_metadata, uchar *null_bits,
+ const int num_null_bytes, const uint fields_count)
+{
+ int field_metadata_size = 0;
+ int metadata_index = 0;
+ TABLE *tmp_table= table__;
+
+ if (!tmp_table) {
+ // Construct a tmp table with fields to find out result DTs.
+ // This should be reconsidered if it worths the effort.
+ List<Item> types;
+ TMP_TABLE_PARAM tmp_table_param;
+ sl->master_unit()->join_union_item_types(thd, types, 1);
+ tmp_table_param.init();
+ tmp_table_param.field_count= types.elements;
+
+ tmp_table = create_tmp_table(thd, &tmp_table_param, types, (ORDER *) 0,
+ false, 0, TMP_TABLE_ALL_COLUMNS, 1,
+ &empty_clex_str, true, false);
+ if (!tmp_table) {
+ field_metadata_size = -1;
+ goto err;
+ }
+ }
+
+ for (unsigned int i = 0 ; i < fields_count; ++i) {
+ fieldtype[i]= tmp_table->field[i]->binlog_type();
+ }
+
+ bzero(field_metadata, (fields_count * 2));
+ for (unsigned int i= 0 ; i < fields_count ; i++)
+ {
+ Binlog_type_info bti= tmp_table->field[i]->binlog_type_info();
+ uchar *ptr = reinterpret_cast<uchar*>(&bti.m_metadata);
+ memcpy(&field_metadata[metadata_index], ptr, bti.m_metadata_size);
+ metadata_index+= bti.m_metadata_size;
+ }
+
+ if (metadata_index < 251)
+ field_metadata_size += metadata_index + 1;
+ else
+ field_metadata_size += metadata_index + 3;
+
+ bzero(null_bits, num_null_bytes);
+ for (unsigned int i= 0 ; i < fields_count ; ++i) {
+ if (tmp_table->field[i]->maybe_null()) {
+ null_bits[(i / 8)]+= 1 << (i % 8);
+ }
+ }
+
+ if (!table__)
+ free_tmp_table(thd, tmp_table);
+err:
+ return field_metadata_size;
+}
+
+/*@brief create_xpand_select_handler- Creates handler*/
+/************************************************************
+ * DESCRIPTION:
+ * Creates a select handler
+ * More details in server/sql/select_handler.h
+ * PARAMETERS:
+ * thd - THD pointer.
+ * sel - SELECT_LEX* that describes the query.
+ * RETURN:
+ * select_handler if possible
+ * NULL otherwise
+ ************************************************************/
+select_handler*
+create_xpand_select_handler(THD* thd, SELECT_LEX* select_lex)
+{
+ ulonglong *oids = NULL;
+ ha_xpand_select_handler *sh = NULL;
+ if (!select_handler_setting(thd)) {
+ return sh;
+ }
+
+ // TODO Return early for EXPLAIN before we run the actual scan.
+ // We can send compile request when we separate compilation
+ // and execution.
+ xpand_connection_cursor *scan = NULL;
+ if (thd->lex->describe) {
+ sh = new ha_xpand_select_handler(thd, select_lex, scan);
+ return sh;
+ }
+
+ // Multi-update runs an implicit query to collect constraints.
+ // SH couldn't be used for this.
+ if (thd->lex->sql_command == SQLCOM_UPDATE_MULTI) {
+ return sh;
+ }
+
+ String query;
+ // Print the query into a string provided
+ select_lex->print(thd, &query, QT_ORDINARY);
+ int error_code = 0;
+ int field_metadata_size = 0;
+ xpand_connection *trx = NULL;
+
+ // We presume this number is equal to types.elements in get_field_types
+ uint items_number = select_lex->get_item_list()->elements;
+ uint num_null_bytes = (items_number + 7) / 8;
+ uchar *fieldtype = NULL;
+ uchar *null_bits = NULL;
+ uchar *field_metadata = NULL;
+ uchar *meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME), &fieldtype, items_number,
+ &null_bits, num_null_bytes, &field_metadata, (items_number * 2), NULL);
+
+ if (!meta_memory) {
+ // The only way to say something here is to raise warning
+ // b/c we will fallback to other access methods: derived handler or rowstore.
+ goto err;
+ }
+
+ if((field_metadata_size =
+ get_field_types(thd, NULL, select_lex, fieldtype, field_metadata,
+ null_bits, num_null_bytes, items_number)) < 0) {
+ goto err;
+ }
+
+ trx = get_trx(thd, &error_code);
+ if (!trx)
+ goto err;
+
+ if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
+ trx->auto_commit_next();
+
+ oids = xpand_extract_table_oids(thd, select_lex->parent_lex);
+ if ((error_code = trx->scan_query(query, fieldtype, items_number, null_bits,
+ num_null_bytes, field_metadata,
+ field_metadata_size,
+ row_buffer_setting(thd), oids, &scan))) {
+ goto err;
+ }
+
+ sh = new ha_xpand_select_handler(thd, select_lex, scan);
+
+err:
+ if (meta_memory)
+ my_free(meta_memory);
+
+ if (error_code == HA_ERR_TABLE_DEF_CHANGED)
+ xpand_mark_tables_for_discovery(select_lex->parent_lex);
+
+ return sh;
+}
+
+/***********************************************************
+ * DESCRIPTION:
+ * select_handler constructor
+ * PARAMETERS:
+ * thd - THD pointer.
+ * select_lex - sematic tree for the query.
+ **********************************************************/
+ha_xpand_select_handler::ha_xpand_select_handler(
+ THD *thd,
+ SELECT_LEX* select_lex,
+ xpand_connection_cursor *scan_)
+ : select_handler(thd, xpand_hton)
+{
+ thd__ = thd;
+ scan = scan_;
+ select = select_lex;
+ rgi = NULL;
+}
+
+/***********************************************************
+ * DESCRIPTION:
+ * select_handler constructor
+ * This frees dynamic memory allocated for bitmap
+ * and disables replication to SH temp table.
+ **********************************************************/
+ha_xpand_select_handler::~ha_xpand_select_handler()
+{
+ int error_code;
+ xpand_connection *trx = get_trx(thd, &error_code);
+ if (!trx) {
+ // TBD Log this
+ }
+ if (trx && scan)
+ trx->scan_end(scan);
+
+ // If the ::init_scan has been executed
+ if (table__)
+ my_bitmap_free(&scan_fields);
+
+ if (rgi)
+ remove_current_table_from_rpl_table_list(rgi);
+}
+
+/*@brief Initiate the query for select_handler */
+/***********************************************************
+ * DESCRIPTION:
+ * Initializes dynamic structures and sets SH temp table
+ * as RBR replication destination to unpack rows.
+ * * PARAMETERS:
+ * RETURN:
+ * rc as int
+ * ********************************************************/
+int ha_xpand_select_handler::init_scan()
+{
+ // Save this into the base handler class attribute
+ table__ = table;
+ // need this bitmap future in next_row()
+ if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false))
+ return ER_OUTOFMEMORY;
+ bitmap_set_all(&scan_fields);
+
+ add_current_table_to_rpl_table_list(&rgi, thd__, table__);
+
+ return 0;
+}
+
+/*@brief Fetch next row for select_handler */
+/***********************************************************
+ * DESCRIPTION:
+ * Fetch next row for select_handler.
+ * PARAMETERS:
+ * RETURN:
+ * rc as int
+ * ********************************************************/
+int ha_xpand_select_handler::next_row()
+{
+ int error_code = 0;
+ xpand_connection *trx = get_trx(thd, &error_code);
+ if (!trx)
+ return error_code;
+
+ assert(scan);
+
+ uchar *rowdata;
+ ulong rowdata_length;
+ if ((error_code = trx->scan_next(scan, &rowdata, &rowdata_length)))
+ return error_code;
+
+ uchar const *current_row_end;
+ ulong master_reclength;
+
+ error_code = unpack_row(rgi, table, table->s->fields, rowdata,
+ &scan_fields, &current_row_end,
+ &master_reclength, rowdata + rowdata_length);
+
+ if (error_code)
+ return error_code;
+
+ return 0;
+}
+
+/*@brief Finishes the scan and clean it up */
+/***********************************************************
+ * DESCRIPTION:
+ * Finishes the scan for select handler
+ * PARAMETERS:
+ * RETURN:
+ * rc as int
+ ***********************************************************/
+int ha_xpand_select_handler::end_scan()
+{
+ return 0;
+}
+
+/*@brief create_xpand_derived_handler- Creates handler*/
+/************************************************************
+ * DESCRIPTION:
+ * Creates a derived handler
+ * More details in server/sql/derived_handler.h
+ * PARAMETERS:
+ * thd - THD pointer.
+ * derived - TABLE_LIST* that describes the tables involved
+ * RETURN:
+ * derived_handler if possible
+ * NULL otherwise
+ ************************************************************/
+derived_handler*
+create_xpand_derived_handler(THD* thd, TABLE_LIST *derived)
+{
+ ha_xpand_derived_handler *dh = NULL;
+ if (!derived_handler_setting(thd)) {
+ return dh;
+ }
+
+ SELECT_LEX_UNIT *unit= derived->derived;
+ SELECT_LEX *select_lex = unit->first_select();
+ String query;
+
+ dh = new ha_xpand_derived_handler(thd, select_lex, NULL);
+
+ return dh;
+}
+
+/***********************************************************
+ * DESCRIPTION:
+ * derived_handler constructor
+ * PARAMETERS:
+ * thd - THD pointer.
+ * select_lex - sematic tree for the query.
+ **********************************************************/
+ha_xpand_derived_handler::ha_xpand_derived_handler(
+ THD *thd,
+ SELECT_LEX* select_lex,
+ xpand_connection_cursor *scan_)
+ : derived_handler(thd, xpand_hton)
+{
+ thd__ = thd;
+ scan = scan_;
+ select = select_lex;
+ rgi = NULL;
+}
+
+/***********************************************************
+ * DESCRIPTION:
+ * derived_handler constructor
+ * This frees dynamic memory allocated for bitmap
+ * and disables replication to SH temp table.
+ **********************************************************/
+ha_xpand_derived_handler::~ha_xpand_derived_handler()
+{
+ int error_code;
+ xpand_connection *trx = get_trx(thd, &error_code);
+ if (!trx) {
+ // TBD Log this.
+ }
+ if (trx && scan)
+ trx->scan_end(scan);
+
+ // If the ::init_scan has been executed
+ if (table__)
+ my_bitmap_free(&scan_fields);
+
+ if (rgi)
+ remove_current_table_from_rpl_table_list(rgi);
+}
+
+/*@brief Initiate the query for derived_handler */
+/***********************************************************
+ * DESCRIPTION:
+ * Initializes dynamic structures and sets SH temp table
+ * as RBR replication destination to unpack rows.
+ * * PARAMETERS:
+ * RETURN:
+ * rc as int
+ * ********************************************************/
+int ha_xpand_derived_handler::init_scan()
+{
+ String query;
+ // Print the query into a string provided
+ select->print(thd__, &query, QT_ORDINARY);
+ int error_code = 0;
+ int field_metadata_size = 0;
+ xpand_connection *trx = NULL;
+ ulonglong *oids = NULL;
+
+ // We presume this number is equal to types.elements in get_field_types
+ uint items_number= select->get_item_list()->elements;
+ uint num_null_bytes = (items_number + 7) / 8;
+ uchar *fieldtype = NULL;
+ uchar *null_bits = NULL;
+ uchar *field_metadata = NULL;
+ uchar *meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME), &fieldtype, items_number,
+ &null_bits, num_null_bytes, &field_metadata, (items_number * 2), NULL);
+
+ if (!meta_memory) {
+ // The only way to say something here is to raise warning
+ // b/c we will fallback to other access methods: derived handler or rowstore.
+ goto err;
+ }
+
+ if((field_metadata_size=
+ get_field_types(thd__, table, select, fieldtype, field_metadata,
+ null_bits, num_null_bytes, items_number)) < 0) {
+ goto err;
+ }
+
+ trx = get_trx(thd__, &error_code);
+ if (!trx)
+ goto err;
+
+ oids = xpand_extract_table_oids(thd__, select->parent_lex);
+ if ((error_code = trx->scan_query(query, fieldtype, items_number, null_bits,
+ num_null_bytes, field_metadata,
+ field_metadata_size,
+ row_buffer_setting(thd), oids, &scan))) {
+ goto err;
+ }
+
+ // Save this into the base handler class attribute
+ table__ = table;
+
+ // need this bitmap future in next_row()
+ if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false))
+ return ER_OUTOFMEMORY;
+ bitmap_set_all(&scan_fields);
+
+ add_current_table_to_rpl_table_list(&rgi, thd__, table__);
+
+err:
+ if (meta_memory)
+ my_free(meta_memory);
+
+ if (error_code == HA_ERR_TABLE_DEF_CHANGED)
+ xpand_mark_tables_for_discovery(select->parent_lex);
+
+ return error_code;
+}
+
+/*@brief Fetch next row for derived_handler */
+/***********************************************************
+ * DESCRIPTION:
+ * Fetch next row for derived_handler.
+ * PARAMETERS:
+ * RETURN:
+ * rc as int
+ * ********************************************************/
+int ha_xpand_derived_handler::next_row()
+{
+ int error_code = 0;
+ xpand_connection *trx = get_trx(thd, &error_code);
+ if (!trx)
+ return error_code;
+
+ assert(scan);
+
+ uchar *rowdata;
+ ulong rowdata_length;
+ if ((error_code = trx->scan_next(scan, &rowdata, &rowdata_length)))
+ return error_code;
+
+ uchar const *current_row_end;
+ ulong master_reclength;
+
+ error_code = unpack_row(rgi, table, table->s->fields, rowdata,
+ &scan_fields, &current_row_end,
+ &master_reclength, rowdata + rowdata_length);
+
+ if (error_code)
+ return error_code;
+
+ return 0;
+}
+
+/*@brief Finishes the scan and clean it up */
+/***********************************************************
+ * DESCRIPTION:
+ * Finishes the scan for derived handler
+ * PARAMETERS:
+ * RETURN:
+ * rc as int
+ ***********************************************************/
+int ha_xpand_derived_handler::end_scan()
+{
+ return 0;
+}
diff --git a/storage/xpand/ha_xpand_pushdown.h b/storage/xpand/ha_xpand_pushdown.h
new file mode 100644
index 00000000000..16284b8e076
--- /dev/null
+++ b/storage/xpand/ha_xpand_pushdown.h
@@ -0,0 +1,84 @@
+/*****************************************************************************
+Copyright (c) 2019, 2020, MariaDB Corporation.
+*****************************************************************************/
+#ifndef _ha_xpand_pushdown_h
+#define _ha_xpand_pushdown_h
+
+#include "select_handler.h"
+#include "derived_handler.h"
+#include "sql_select.h"
+
+/*@brief base_handler class*/
+/***********************************************************
+ * DESCRIPTION:
+ * To be described
+ ************************************************************/
+class ha_xpand_base_handler
+{
+ // To simulate abstract class
+protected:
+ ha_xpand_base_handler(): thd__(0),table__(0) {}
+ ~ha_xpand_base_handler() {}
+
+ // Copies of pushdown handlers attributes
+ // to use them in shared methods.
+ THD *thd__;
+ TABLE *table__;
+ // The bitmap used to sent
+ MY_BITMAP scan_fields;
+ // Structures to unpack RBR rows from XPD BE
+ rpl_group_info *rgi;
+ // XPD BE scan operation reference
+ xpand_connection_cursor *scan;
+};
+
+/*@brief select_handler class*/
+/***********************************************************
+ * DESCRIPTION:
+ * select_handler API methods. Could be used by the server
+ * tp pushdown the whole query described by SELECT_LEX.
+ * More details in server/sql/select_handler.h
+ * sel semantic tree for the query in SELECT_LEX.
+ ************************************************************/
+class ha_xpand_select_handler:
+ private ha_xpand_base_handler,
+ public select_handler
+{
+public:
+ ha_xpand_select_handler(THD* thd_arg, SELECT_LEX* sel,
+ xpand_connection_cursor *scan);
+ ~ha_xpand_select_handler();
+
+ int init_scan() override;
+ int next_row() override;
+ int end_scan() override;
+ void print_error(int, unsigned long) override {}
+};
+
+/*@brief derived_handler class*/
+/***********************************************************
+ * DESCRIPTION:
+ * derived_handler API methods. Could be used by the server
+ * tp pushdown the whole query described by SELECT_LEX.
+ * More details in server/sql/derived_handler.h
+ * sel semantic tree for the query in SELECT_LEX.
+ ************************************************************/
+class ha_xpand_derived_handler:
+ private ha_xpand_base_handler,
+ public derived_handler
+{
+public:
+ ha_xpand_derived_handler(THD* thd_arg, SELECT_LEX* sel,
+ xpand_connection_cursor *scan);
+ ~ha_xpand_derived_handler();
+
+ int init_scan() override;
+ int next_row() override;
+ int end_scan() override;
+ void print_error(int, unsigned long) override {}
+};
+
+select_handler *create_xpand_select_handler(THD* thd, SELECT_LEX* select_lex);
+derived_handler *create_xpand_derived_handler(THD* thd, TABLE_LIST *derived);
+
+#endif
diff --git a/storage/xpand/xpand_connection.cc b/storage/xpand/xpand_connection.cc
new file mode 100644
index 00000000000..e6b5059322b
--- /dev/null
+++ b/storage/xpand/xpand_connection.cc
@@ -0,0 +1,1357 @@
+/*****************************************************************************
+Copyright (c) 2019, 2020, MariaDB Corporation.
+*****************************************************************************/
+
+/** @file xpand_connection.cc */
+
+#include "xpand_connection.h"
+#include "ha_xpand.h"
+#include <string>
+#include "handler.h"
+#include "table.h"
+#include "sql_class.h"
+#include "my_pthread.h"
+#include "tztime.h"
+#include "errmsg.h"
+
+#ifdef _WIN32
+#include <stdlib.h>
+#define htobe64 _byteswap_uint64
+#define be64toh _byteswap_uint64
+#define htobe32 _byteswap_ulong
+#define be32toh _byteswap_ulong
+#define htobe16 _byteswap_ushort
+#define be16toh _byteswap_ushort
+#endif
+
+#if defined(__APPLE__)
+#include <libkern/OSByteOrder.h>
+#define htobe64(x) OSSwapHostToBigInt64(x)
+#define be64toh(x) OSSwapBigToHostInt64(x)
+#define htobe32(x) OSSwapHostToBigInt32(x)
+#define be32toh(x) OSSwapBigToHostInt32(x)
+#define htobe16(x) OSSwapHostToBigInt16(x)
+#define be16toh(x) OSSwapBigToHostInt16(x)
+#endif
+
+
+extern int xpand_connect_timeout;
+extern int xpand_read_timeout;
+extern int xpand_write_timeout;
+extern char *xpand_username;
+extern char *xpand_password;
+extern uint xpand_port;
+extern char *xpand_socket;
+
+/*
+ This class implements the commands that can be sent to the cluster by the
+ Xpand engine. All of these commands return a status to the caller, but some
+ commands also create open invocations on the cluster, which must be closed by
+ sending additional commands.
+
+ Transactions on the cluster are started using flags attached to commands, and
+ transactions are committed or rolled back using separate commands.
+
+ Methods ending with _next affect the transaction state after the next command
+ is sent to the cluster. Other transaction commands are sent to the cluster
+ immediately, and the state is changed before they return.
+
+ _____________________ _______________________
+ | | | | | |
+ V | | V | |
+ NONE --> REQUESTED --> STARTED --> NEW_STMT |
+ | |
+ `----> ROLLBACK_STMT ---`
+
+ The commit and rollback commands will change any other state to NONE. This
+ includes the REQUESTED state, for which nothing will be sent to the cluster.
+ The rollback statement command can likewise change the state from NEW_STMT to
+ STARTED without sending anything to the cluster.
+
+ In addition, the XPAND_TRANS_AUTOCOMMIT flag will cause the transactions
+ for commands that complete without leaving open invocations on the cluster to
+ be committed if successful or rolled back if there was an error. If
+ auto-commit is enabled, only one open invocation may be in progress at a
+ time.
+*/
+
+enum xpand_trans_state {
+ XPAND_TRANS_STARTED = 0,
+ XPAND_TRANS_REQUESTED = 1,
+ XPAND_TRANS_NEW_STMT = 2,
+ XPAND_TRANS_ROLLBACK_STMT = 4,
+ XPAND_TRANS_NONE = 32,
+};
+const int XPAND_TRANS_STARTS_STMT = (XPAND_TRANS_NEW_STMT |
+ XPAND_TRANS_REQUESTED |
+ XPAND_TRANS_ROLLBACK_STMT);
+
+enum xpand_trans_post_flags {
+ XPAND_TRANS_AUTOCOMMIT = 8,
+ XPAND_TRANS_NO_POST_FLAGS = 0,
+};
+
+enum xpand_commands {
+ XPAND_WRITE_ROW = 1,
+ XPAND_SCAN_TABLE,
+ XPAND_SCAN_NEXT,
+ XPAND_SCAN_STOP,
+ XPAND_KEY_READ,
+ XPAND_KEY_DELETE,
+ XPAND_SCAN_QUERY,
+ XPAND_KEY_UPDATE,
+ XPAND_SCAN_FROM_KEY,
+ XPAND_UPDATE_QUERY,
+ XPAND_COMMIT,
+ XPAND_ROLLBACK,
+ XPAND_SCAN_TABLE_COND,
+};
+
+/****************************************************************************
+** Class xpand_connection
+****************************************************************************/
+xpand_connection::xpand_connection()
+ : command_buffer(NULL), command_buffer_length(0), command_length(0),
+ trans_state(XPAND_TRANS_NONE), trans_flags(XPAND_TRANS_NO_POST_FLAGS)
+{
+ DBUG_ENTER("xpand_connection::xpand_connection");
+ memset(&xpand_net, 0, sizeof(MYSQL));
+ DBUG_VOID_RETURN;
+}
+
+xpand_connection::~xpand_connection()
+{
+ DBUG_ENTER("xpand_connection::~xpand_connection");
+ if (is_connected())
+ disconnect(TRUE);
+
+ if (command_buffer)
+ my_free(command_buffer);
+ DBUG_VOID_RETURN;
+}
+
+void xpand_connection::disconnect(bool is_destructor)
+{
+ DBUG_ENTER("xpand_connection::disconnect");
+ if (is_destructor)
+ {
+ /*
+ Connection object destruction occurs after the destruction of
+ the thread used by the network has begun, so usage of that
+ thread object now is not reliable
+ */
+ xpand_net.net.thd = NULL;
+ }
+ mysql_close(&xpand_net);
+ DBUG_VOID_RETURN;
+}
+
+extern int xpand_hosts_cur;
+extern ulong xpand_balance_algorithm;
+
+extern mysql_rwlock_t xpand_hosts_lock;
+extern xpand_host_list *xpand_hosts;
+
+int xpand_connection::connect()
+{
+ DBUG_ENTER("xpand_connection::connect");
+ int start = 0;
+ if (xpand_balance_algorithm == XPAND_BALANCE_ROUND_ROBIN)
+ start = my_atomic_add32(&xpand_hosts_cur, 1);
+
+ mysql_rwlock_rdlock(&xpand_hosts_lock);
+
+ //search for available host
+ int error_code = HA_ERR_NO_CONNECTION;
+ for (int i = 0; i < xpand_hosts->hosts_len; i++) {
+ char *host = xpand_hosts->hosts[(start + i) % xpand_hosts->hosts_len];
+ error_code = connect_direct(host);
+ if (!error_code)
+ break;
+ }
+ mysql_rwlock_unlock(&xpand_hosts_lock);
+ DBUG_RETURN(error_code);
+}
+
+
+int xpand_connection::connect_direct(char *host)
+{
+ DBUG_ENTER("xpand_connection::connect_direct");
+ my_bool my_true = true;
+ DBUG_PRINT("host", ("%s", host));
+
+ if (!mysql_init(&xpand_net))
+ DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+
+ uint protocol_tcp = MYSQL_PROTOCOL_TCP;
+ mysql_options(&xpand_net, MYSQL_OPT_PROTOCOL, &protocol_tcp);
+ mysql_options(&xpand_net, MYSQL_OPT_READ_TIMEOUT,
+ &xpand_read_timeout);
+ mysql_options(&xpand_net, MYSQL_OPT_WRITE_TIMEOUT,
+ &xpand_write_timeout);
+ mysql_options(&xpand_net, MYSQL_OPT_CONNECT_TIMEOUT,
+ &xpand_connect_timeout);
+ mysql_options(&xpand_net, MYSQL_OPT_USE_REMOTE_CONNECTION,
+ NULL);
+ mysql_options(&xpand_net, MYSQL_SET_CHARSET_NAME, "utf8mb4");
+ mysql_options(&xpand_net, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY,
+ (char *) &my_true);
+ mysql_options(&xpand_net, MYSQL_INIT_COMMAND,"SET autocommit=0");
+
+#ifdef XPAND_CONNECTION_SSL
+ if (opt_ssl_ca_length | conn->tgt_ssl_capath_length |
+ conn->tgt_ssl_cert_length | conn->tgt_ssl_key_length)
+ {
+ mysql_ssl_set(&xpand_net, conn->tgt_ssl_key, conn->tgt_ssl_cert,
+ conn->tgt_ssl_ca, conn->tgt_ssl_capath, conn->tgt_ssl_cipher);
+ if (conn->tgt_ssl_vsc)
+ {
+ my_bool verify_flg = TRUE;
+ mysql_options(&xpand_net, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, &verify_flg);
+ }
+ }
+#endif
+
+ int error_code = 0;
+ if (!mysql_real_connect(&xpand_net, host, xpand_username, xpand_password,
+ NULL, xpand_port, xpand_socket,
+ CLIENT_MULTI_STATEMENTS))
+ {
+ sql_print_error("Error connecting to xpand: %s", mysql_error(&xpand_net));
+ disconnect();
+ error_code = HA_ERR_NO_CONNECTION;
+ }
+
+ DBUG_RETURN(error_code);
+}
+
+int xpand_connection::add_status_vars()
+{
+ DBUG_ENTER("xpand_connection::add_status_vars");
+
+ if (!(trans_state & XPAND_TRANS_STARTS_STMT))
+ DBUG_RETURN(add_command_operand_uchar(0));
+
+ int error_code = 0;
+ system_variables vars = current_thd->variables;
+ if ((error_code = add_command_operand_uchar(1)))
+ DBUG_RETURN(error_code);
+ //sql mode
+ if ((error_code = add_command_operand_ulonglong(vars.sql_mode)))
+ DBUG_RETURN(error_code);
+ //auto increment state
+ if ((error_code = add_command_operand_ushort(vars.auto_increment_increment)))
+ DBUG_RETURN(error_code);
+ if ((error_code = add_command_operand_ushort(vars.auto_increment_offset)))
+ DBUG_RETURN(error_code);
+ //character sets and collations
+ if ((error_code = add_command_operand_ushort(vars.character_set_results->number)))
+ DBUG_RETURN(error_code);
+ if ((error_code = add_command_operand_ushort(vars.character_set_client->number)))
+ DBUG_RETURN(error_code);
+ if ((error_code = add_command_operand_ushort(vars.collation_connection->number)))
+ DBUG_RETURN(error_code);
+ if ((error_code = add_command_operand_ushort(vars.collation_server->number)))
+ DBUG_RETURN(error_code);
+ //timezone and time names
+ String tzone;
+ vars.time_zone->get_name()->print(&tzone, system_charset_info);
+ if ((error_code = add_command_operand_str((const uchar*)tzone.ptr(),tzone.length())))
+ DBUG_RETURN(error_code);
+ if ((error_code = add_command_operand_ushort(vars.lc_time_names->number)))
+ DBUG_RETURN(error_code);
+ //transaction isolation
+ if ((error_code = add_command_operand_uchar(vars.tx_isolation)))
+ DBUG_RETURN(error_code);
+ DBUG_RETURN(0);
+}
+
+int xpand_connection::begin_command(uchar command)
+{
+ if (trans_state == XPAND_TRANS_NONE)
+ return HA_ERR_INTERNAL_ERROR;
+
+ command_length = 0;
+ int error_code = 0;
+ if ((error_code = add_command_operand_uchar(command)))
+ return error_code;
+
+ if ((error_code = add_command_operand_uchar(trans_state | trans_flags)))
+ return error_code;
+
+ if ((error_code = add_status_vars()))
+ return error_code;
+
+ return error_code;
+}
+
+int xpand_connection::send_command()
+{
+ /*
+ Please note:
+ * The transaction state is set before the command is sent because rolling
+ back a nonexistent transaction is better than leaving a tranaction open
+ on the cluster.
+ * The state may have alreadly been STARTED.
+ * Commit and rollback commands update the transaction state after calling
+ this function.
+ * If auto-commit is enabled, the state may also updated after the
+ response has been processed. We do not clear the auto-commit flag here
+ because it needs to be sent with each command until the transaction is
+ committed or rolled back.
+ */
+ trans_state = XPAND_TRANS_STARTED;
+
+ if (simple_command(&xpand_net,
+ (enum_server_command)XPAND_SERVER_REQUEST,
+ command_buffer, command_length, TRUE))
+ return mysql_errno(&xpand_net);
+ return 0;
+}
+
+int xpand_connection::read_query_response()
+{
+ int error_code = 0;
+ if (xpand_net.methods->read_query_result(&xpand_net))
+ error_code = mysql_errno(&xpand_net);
+ auto_commit_closed();
+ return error_code;
+}
+
+bool xpand_connection::has_open_transaction()
+{
+ return trans_state != XPAND_TRANS_NONE;
+}
+
+int xpand_connection::commit_transaction()
+{
+ DBUG_ENTER("xpand_connection::commit_transaction");
+ if (trans_state == XPAND_TRANS_NONE)
+ DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+
+ if (trans_state == XPAND_TRANS_REQUESTED) {
+ trans_state = XPAND_TRANS_NONE;
+ trans_flags = XPAND_TRANS_NO_POST_FLAGS;
+ DBUG_RETURN(0);
+ }
+
+ int error_code;
+ if ((error_code = begin_command(XPAND_COMMIT)))
+ DBUG_RETURN(error_code);
+
+ if ((error_code = send_command()))
+ DBUG_RETURN(error_code);
+
+ if ((error_code = read_query_response()))
+ DBUG_RETURN(error_code);
+
+ trans_state = XPAND_TRANS_NONE;
+ trans_flags = XPAND_TRANS_NO_POST_FLAGS;
+ DBUG_RETURN(error_code);
+}
+
+int xpand_connection::rollback_transaction()
+{
+ DBUG_ENTER("xpand_connection::rollback_transaction");
+ if (trans_state == XPAND_TRANS_NONE ||
+ trans_state == XPAND_TRANS_REQUESTED) {
+ trans_state = XPAND_TRANS_NONE;
+ DBUG_RETURN(0);
+ }
+
+ int error_code;
+ if ((error_code = begin_command(XPAND_ROLLBACK)))
+ DBUG_RETURN(error_code);
+
+ if ((error_code = send_command()))
+ DBUG_RETURN(error_code);
+
+ if ((error_code = read_query_response()))
+ DBUG_RETURN(error_code);
+
+ trans_state = XPAND_TRANS_NONE;
+ trans_flags = XPAND_TRANS_NO_POST_FLAGS;
+ DBUG_RETURN(error_code);
+}
+
+int xpand_connection::begin_transaction_next()
+{
+ DBUG_ENTER("xpand_connection::begin_transaction_next");
+ if (trans_state != XPAND_TRANS_NONE ||
+ trans_flags != XPAND_TRANS_NO_POST_FLAGS)
+ DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+
+ trans_state = XPAND_TRANS_REQUESTED;
+ DBUG_RETURN(0);
+}
+
+int xpand_connection::new_statement_next()
+{
+ DBUG_ENTER("xpand_connection::new_statement_next");
+ if (trans_state != XPAND_TRANS_STARTED ||
+ trans_flags != XPAND_TRANS_NO_POST_FLAGS)
+ DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+
+ trans_state = XPAND_TRANS_NEW_STMT;
+ DBUG_RETURN(0);
+}
+
+int xpand_connection::rollback_statement_next()
+{
+ DBUG_ENTER("xpand_connection::rollback_statement_next");
+ if (trans_state != XPAND_TRANS_STARTED ||
+ trans_flags != XPAND_TRANS_NO_POST_FLAGS)
+ DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+
+ trans_state = XPAND_TRANS_ROLLBACK_STMT;
+ DBUG_RETURN(0);
+}
+
+void xpand_connection::auto_commit_next()
+{
+ trans_flags |= XPAND_TRANS_AUTOCOMMIT;
+}
+
+void xpand_connection::auto_commit_closed()
+{
+ if (trans_flags & XPAND_TRANS_AUTOCOMMIT) {
+ trans_flags &= ~XPAND_TRANS_AUTOCOMMIT;
+ trans_state = XPAND_TRANS_NONE;
+ }
+}
+
+int xpand_connection::run_query(String &stmt)
+{
+ int error_code = mysql_real_query(&xpand_net, stmt.ptr(), stmt.length());
+ if (error_code)
+ return mysql_errno(&xpand_net);
+ return error_code;
+}
+
+int xpand_connection::write_row(ulonglong xpand_table_oid, uchar *packed_row,
+ size_t packed_size, ulonglong *last_insert_id)
+{
+ int error_code;
+ command_length = 0;
+
+ // row based commands should not be called with auto commit.
+ if (trans_flags & XPAND_TRANS_AUTOCOMMIT)
+ return HA_ERR_INTERNAL_ERROR;
+
+ if ((error_code = begin_command(XPAND_WRITE_ROW)))
+ return error_code;
+
+ if ((error_code = add_command_operand_ulonglong(xpand_table_oid)))
+ return error_code;
+
+ if ((error_code = add_command_operand_str(packed_row, packed_size)))
+ return error_code;
+
+ if ((error_code = send_command()))
+ return error_code;
+
+ if ((error_code = read_query_response())) {
+ if (error_code == ER_DUP_ENTRY)
+ return HA_ERR_FOUND_DUPP_KEY;
+ return error_code;
+ }
+
+ *last_insert_id = xpand_net.insert_id;
+ return error_code;
+}
+
+int xpand_connection::key_update(ulonglong xpand_table_oid, uchar *packed_key,
+ size_t packed_key_length,
+ MY_BITMAP *update_set, uchar *packed_new_data,
+ size_t packed_new_length)
+{
+ int error_code;
+ command_length = 0;
+
+ // row based commands should not be called with auto commit.
+ if (trans_flags & XPAND_TRANS_AUTOCOMMIT)
+ return HA_ERR_INTERNAL_ERROR;
+
+ if ((error_code = begin_command(XPAND_KEY_UPDATE)))
+ return error_code;
+
+ if ((error_code = add_command_operand_ulonglong(xpand_table_oid)))
+ return error_code;
+
+ if ((error_code = add_command_operand_str(packed_key, packed_key_length)))
+ return error_code;
+
+ if ((error_code = add_command_operand_bitmap(update_set)))
+ return error_code;
+
+ if ((error_code = add_command_operand_str(packed_new_data,
+ packed_new_length)))
+ return error_code;
+
+ if ((error_code = send_command()))
+ return error_code;
+
+ if ((error_code = read_query_response()))
+ return error_code;
+
+ return error_code;
+}
+
+int xpand_connection::key_delete(ulonglong xpand_table_oid,
+ uchar *packed_key, size_t packed_key_length)
+{
+ int error_code;
+ command_length = 0;
+
+ // row based commands should not be called with auto commit.
+ if (trans_flags & XPAND_TRANS_AUTOCOMMIT)
+ return HA_ERR_INTERNAL_ERROR;
+
+ if ((error_code = begin_command(XPAND_KEY_DELETE)))
+ return error_code;
+
+ if ((error_code = add_command_operand_ulonglong(xpand_table_oid)))
+ return error_code;
+
+ if ((error_code = add_command_operand_str(packed_key, packed_key_length)))
+ return error_code;
+
+ if ((error_code = send_command()))
+ return error_code;
+
+ if ((error_code = read_query_response()))
+ return error_code;
+
+ return error_code;
+}
+
+int xpand_connection::key_read(ulonglong xpand_table_oid, uint index,
+ xpand_lock_mode_t lock_mode, MY_BITMAP *read_set,
+ uchar *packed_key, ulong packed_key_length,
+ uchar **rowdata, ulonglong *rowdata_length)
+{
+ int error_code;
+ command_length = 0;
+
+ // row based commands should not be called with auto commit.
+ if (trans_flags & XPAND_TRANS_AUTOCOMMIT)
+ return HA_ERR_INTERNAL_ERROR;
+
+ if ((error_code = begin_command(XPAND_KEY_READ)))
+ return error_code;
+
+ if ((error_code = add_command_operand_ulonglong(xpand_table_oid)))
+ return error_code;
+
+ if ((error_code = add_command_operand_uint(index)))
+ return error_code;
+
+ if ((error_code = add_command_operand_uchar((uchar)lock_mode)))
+ return error_code;
+
+ if ((error_code = add_command_operand_bitmap(read_set)))
+ return error_code;
+
+ if ((error_code = add_command_operand_str(packed_key, packed_key_length)))
+ return error_code;
+
+ if ((error_code = send_command()))
+ return error_code;
+
+ ulong packet_length = cli_safe_read(&xpand_net);
+ if (packet_length == packet_error)
+ return mysql_errno(&xpand_net);
+
+ uchar *data = xpand_net.net.read_pos;
+ *rowdata_length = safe_net_field_length_ll(&data, packet_length);
+ *rowdata = (uchar *)my_malloc(*rowdata_length, MYF(MY_WME));
+ memcpy(*rowdata, data, *rowdata_length);
+
+ packet_length = cli_safe_read(&xpand_net);
+ if (packet_length == packet_error) {
+ my_free(*rowdata);
+ *rowdata = NULL;
+ *rowdata_length = 0;
+ return mysql_errno(&xpand_net);
+ }
+
+ return 0;
+}
+
+class xpand_connection_cursor {
+ struct rowdata {
+ ulong length;
+ uchar *data;
+ };
+
+ ulong current_row;
+ ulong last_row;
+ struct rowdata *rows;
+ uchar *outstanding_row; // to be freed on next request.
+ MYSQL *xpand_net;
+
+public:
+ ulong buffer_size;
+ ulonglong scan_refid;
+ bool eof_reached;
+
+private:
+ int cache_row(uchar *rowdata, ulong rowdata_length)
+ {
+ DBUG_ENTER("xpand_connection_cursor::cache_row");
+ rows[last_row].length = rowdata_length;
+ rows[last_row].data = (uchar *)my_malloc(rowdata_length, MYF(MY_WME));
+ if (!rows[last_row].data)
+ DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+ memcpy(rows[last_row].data, rowdata, rowdata_length);
+ last_row++;
+ DBUG_RETURN(0);
+ }
+
+ int load_rows_impl(bool *stmt_completed)
+ {
+ DBUG_ENTER("xpand_connection_cursor::load_rows_impl");
+ int error_code = 0;
+ ulong packet_length = cli_safe_read(xpand_net);
+ if (packet_length == packet_error) {
+ error_code = mysql_errno(xpand_net);
+ *stmt_completed = TRUE;
+ if (error_code == HA_ERR_END_OF_FILE) {
+ // We have read all rows for query.
+ eof_reached = TRUE;
+ DBUG_RETURN(0);
+ }
+ DBUG_RETURN(error_code);
+ }
+
+ uchar *rowdata = xpand_net->net.read_pos;
+ ulong rowdata_length = (ulong) safe_net_field_length_ll(&rowdata, packet_length);
+ if (!rowdata_length) {
+ // We have read all rows in this batch.
+ DBUG_RETURN(0);
+ }
+
+ if ((error_code = cache_row(rowdata, rowdata_length)))
+ DBUG_RETURN(error_code);
+
+ DBUG_RETURN(load_rows_impl(stmt_completed));
+ }
+
+public:
+ xpand_connection_cursor(MYSQL *xpand_net_, ulong bufsize)
+ {
+ DBUG_ENTER("xpand_connection_cursor::xpand_connection_cursor");
+ xpand_net = xpand_net_;
+ eof_reached = FALSE;
+ current_row = 0;
+ last_row = 0;
+ outstanding_row = NULL;
+ buffer_size = bufsize;
+ rows = NULL;
+ DBUG_VOID_RETURN;
+ }
+
+ ~xpand_connection_cursor()
+ {
+ DBUG_ENTER("xpand_connection_cursor::~xpand_connection_cursor");
+ if (outstanding_row)
+ my_free(outstanding_row);
+ if (rows) {
+ while (current_row < last_row)
+ my_free(rows[current_row++].data);
+ my_free(rows);
+ }
+ DBUG_VOID_RETURN;
+ }
+
+ int load_rows(bool *stmt_completed)
+ {
+ DBUG_ENTER("xpand_connection_cursor::load_rows");
+ current_row = 0;
+ last_row = 0;
+ DBUG_RETURN(load_rows_impl(stmt_completed));
+ }
+
+ int initialize(bool *stmt_completed)
+ {
+ DBUG_ENTER("xpand_connection_cursor::initialize");
+ ulong packet_length = cli_safe_read(xpand_net);
+ if (packet_length == packet_error) {
+ *stmt_completed = TRUE;
+ int error_code = mysql_errno(xpand_net);
+ my_printf_error(error_code, "Xpand error: %s", MYF(0),
+ mysql_error(xpand_net));
+ DBUG_RETURN(error_code);
+ }
+
+ unsigned char *pos = xpand_net->net.read_pos;
+ scan_refid = safe_net_field_length_ll(&pos, packet_length);
+
+ rows = (struct rowdata *)my_malloc(buffer_size * sizeof(struct rowdata),
+ MYF(MY_WME));
+ if (!rows)
+ DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+
+ DBUG_RETURN(load_rows(stmt_completed));
+ }
+
+ uchar *retrieve_row(ulong *rowdata_length)
+ {
+ DBUG_ENTER("xpand_connection_cursor::retrieve_row");
+ if (outstanding_row) {
+ my_free(outstanding_row);
+ outstanding_row = NULL;
+ }
+ if (current_row == last_row)
+ DBUG_RETURN(NULL);
+ *rowdata_length = rows[current_row].length;
+ outstanding_row = rows[current_row].data;
+ current_row++;
+ DBUG_RETURN(outstanding_row);
+ }
+};
+
+int xpand_connection::allocate_cursor(MYSQL *xpand_net, ulong buffer_size,
+ xpand_connection_cursor **scan)
+{
+ DBUG_ENTER("xpand_connection::allocate_cursor");
+ *scan = new xpand_connection_cursor(xpand_net, buffer_size);
+ if (!*scan)
+ DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+
+ bool stmt_completed = FALSE;
+ int error_code = (*scan)->initialize(&stmt_completed);
+ if (error_code) {
+ delete *scan;
+ *scan = NULL;
+ }
+
+ if (stmt_completed)
+ auto_commit_closed();
+
+ DBUG_RETURN(error_code);
+}
+
+int xpand_connection::scan_table(ulonglong xpand_table_oid,
+ xpand_lock_mode_t lock_mode,
+ MY_BITMAP *read_set, ushort row_req,
+ xpand_connection_cursor **scan,
+ String* pushdown_cond_sql)
+{
+ int error_code;
+ command_length = 0;
+
+ // row based commands should not be called with auto commit.
+ if (trans_flags & XPAND_TRANS_AUTOCOMMIT)
+ return HA_ERR_INTERNAL_ERROR;
+
+ if (pushdown_cond_sql != nullptr) {
+ if ((error_code= begin_command(XPAND_SCAN_TABLE_COND)))
+ return error_code;
+ } else {
+ if ((error_code= begin_command(XPAND_SCAN_TABLE)))
+ return error_code;
+ }
+
+ if ((error_code = add_command_operand_ushort(row_req)))
+ return error_code;
+
+ if ((error_code = add_command_operand_ulonglong(xpand_table_oid)))
+ return error_code;
+
+ if ((error_code = add_command_operand_uchar((uchar)lock_mode)))
+ return error_code;
+
+ if ((error_code = add_command_operand_bitmap(read_set)))
+ return error_code;
+
+ if (pushdown_cond_sql != nullptr) {
+ if ((error_code= add_command_operand_str(
+ reinterpret_cast<const uchar*>(pushdown_cond_sql->ptr()),
+ pushdown_cond_sql->length()))) {
+ return error_code;
+ }
+ }
+
+ if ((error_code = send_command()))
+ return error_code;
+
+ return allocate_cursor(&xpand_net, row_req, scan);
+}
+
+/**
+ * @brief
+ * Sends a command to initiate query scan.
+ * @details
+ * Sends a command over mysql protocol connection to initiate an
+ * arbitrary query using a query text.
+ * Uses field types, field metadata and nullability to explicitly
+ * cast result to expected data type. Exploits RBR TABLE_MAP_EVENT
+ * format + sends SQL text.
+ * @args
+ * stmt& Query text to send
+ * fieldtype* array of byte wide field types of result projection
+ * null_bits* fields nullability bitmap of result projection
+ * field_metadata* Field metadata of result projection
+ * scan_refid id used to reference this scan later
+ * Used in pushdowns to initiate query scan.
+ **/
+int xpand_connection::scan_query(String &stmt, uchar *fieldtype, uint fields,
+ uchar *null_bits, uint null_bits_size,
+ uchar *field_metadata,
+ uint field_metadata_size, ushort row_req,
+ ulonglong *oids,
+ xpand_connection_cursor **scan)
+{
+ int error_code;
+ command_length = 0;
+
+ if ((error_code = begin_command(XPAND_SCAN_QUERY)))
+ return error_code;
+
+ do {
+ if ((error_code = add_command_operand_ulonglong(*oids)))
+ return error_code;
+ }
+ while (*oids++);
+
+ if ((error_code = add_command_operand_ushort(row_req)))
+ return error_code;
+
+ if ((error_code = add_command_operand_str((uchar*)stmt.ptr(), stmt.length())))
+ return error_code;
+
+ if ((error_code = add_command_operand_str(fieldtype, fields)))
+ return error_code;
+
+ if ((error_code = add_command_operand_str(field_metadata,
+ field_metadata_size)))
+ return error_code;
+
+ // This variable length string calls for an additional store w/o lcb lenth prefix.
+ if ((error_code = add_command_operand_vlstr(null_bits, null_bits_size)))
+ return error_code;
+
+ if ((error_code = send_command()))
+ return error_code;
+
+ return allocate_cursor(&xpand_net, row_req, scan);
+}
+
+/**
+ * @brief
+ * Sends a command to initiate UPDATE.
+ * @details
+ * Sends a command over mysql protocol connection to initiate an
+ * UPDATE query using a query text.
+ * @args
+ * stmt& Query text to send
+ * dbname current working database
+ * dbname &current database name
+ **/
+int xpand_connection::update_query(String &stmt, LEX_CSTRING &dbname,
+ ulonglong *oids, ulonglong *affected_rows)
+{
+ int error_code;
+ command_length = 0;
+
+ if ((error_code = begin_command(XPAND_UPDATE_QUERY)))
+ return error_code;
+
+ do {
+ if ((error_code = add_command_operand_ulonglong(*oids)))
+ return error_code;
+ }
+ while (*oids++);
+
+ if ((error_code = add_command_operand_str((uchar*)dbname.str, dbname.length)))
+ return error_code;
+
+ if ((error_code = add_command_operand_str((uchar*)stmt.ptr(), stmt.length())))
+ return error_code;
+
+ if ((error_code = send_command()))
+ return error_code;
+
+ error_code = read_query_response();
+ if (!error_code)
+ *affected_rows = xpand_net.affected_rows;
+
+ return error_code;
+}
+
+int xpand_connection::scan_from_key(ulonglong xpand_table_oid, uint index,
+ xpand_lock_mode_t lock_mode,
+ enum scan_type scan_dir,
+ int no_key_cols, bool sorted_scan,
+ MY_BITMAP *read_set, uchar *packed_key,
+ ulong packed_key_length, ushort row_req,
+ xpand_connection_cursor **scan)
+{
+ int error_code;
+ command_length = 0;
+
+ // row based commands should not be called with auto commit.
+ if (trans_flags & XPAND_TRANS_AUTOCOMMIT)
+ return HA_ERR_INTERNAL_ERROR;
+
+ if ((error_code = begin_command(XPAND_SCAN_FROM_KEY)))
+ return error_code;
+
+ if ((error_code = add_command_operand_ushort(row_req)))
+ return error_code;
+
+ if ((error_code = add_command_operand_ulonglong(xpand_table_oid)))
+ return error_code;
+
+ if ((error_code = add_command_operand_uint(index)))
+ return error_code;
+
+ if ((error_code = add_command_operand_uchar((uchar)lock_mode)))
+ return error_code;
+
+ if ((error_code = add_command_operand_uchar(scan_dir)))
+ return error_code;
+
+ if ((error_code = add_command_operand_uint(no_key_cols)))
+ return error_code;
+
+ if ((error_code = add_command_operand_uchar(sorted_scan)))
+ return error_code;
+
+ if ((error_code = add_command_operand_str(packed_key, packed_key_length)))
+ return error_code;
+
+ if ((error_code = add_command_operand_bitmap(read_set)))
+ return error_code;
+
+ if ((error_code = send_command()))
+ return error_code;
+
+ return allocate_cursor(&xpand_net, row_req, scan);
+}
+
+int xpand_connection::scan_next(xpand_connection_cursor *scan,
+ uchar **rowdata, ulong *rowdata_length)
+{
+ *rowdata = scan->retrieve_row(rowdata_length);
+ if (*rowdata)
+ return 0;
+
+ if (scan->eof_reached)
+ return HA_ERR_END_OF_FILE;
+
+ int error_code;
+ command_length = 0;
+
+ if ((error_code = begin_command(XPAND_SCAN_NEXT)))
+ return error_code;
+
+ // This should not happen as @@xpand_row_buffer has this limit.
+ if (scan->buffer_size > 65535)
+ return HA_ERR_INTERNAL_ERROR;
+
+ if ((error_code = add_command_operand_ushort((ushort)scan->buffer_size)))
+ return error_code;
+
+ if ((error_code = add_command_operand_lcb(scan->scan_refid)))
+ return error_code;
+
+ if ((error_code = send_command()))
+ return error_code;
+
+ bool stmt_completed = FALSE;
+ error_code = scan->load_rows(&stmt_completed);
+ if (stmt_completed)
+ auto_commit_closed();
+ if (error_code)
+ return error_code;
+
+ *rowdata = scan->retrieve_row(rowdata_length);
+ if (!*rowdata)
+ return HA_ERR_END_OF_FILE;
+
+ return 0;
+}
+
+int xpand_connection::scan_end(xpand_connection_cursor *scan)
+{
+ int error_code;
+ command_length = 0;
+ ulonglong scan_refid = scan->scan_refid;
+ bool eof_reached = scan->eof_reached;
+ delete scan;
+
+ if (eof_reached)
+ return 0;
+
+ if ((error_code = begin_command(XPAND_SCAN_STOP)))
+ return error_code;
+
+ if ((error_code = add_command_operand_lcb(scan_refid)))
+ return error_code;
+
+ if ((error_code = send_command()))
+ return error_code;
+
+ return read_query_response();
+}
+
+int xpand_connection::populate_table_list(LEX_CSTRING *db,
+ handlerton::discovered_list *result)
+{
+ int error_code = 0;
+ String stmt;
+ stmt.append("SHOW FULL TABLES FROM ");
+ stmt.append(db);
+ stmt.append(" WHERE table_type = 'BASE TABLE'");
+
+ if (mysql_real_query(&xpand_net, stmt.c_ptr(), stmt.length())) {
+ int error_code = mysql_errno(&xpand_net);
+ if (error_code == ER_BAD_DB_ERROR)
+ return 0;
+ else
+ return error_code;
+ }
+
+ MYSQL_RES *results = mysql_store_result(&xpand_net);
+ if (mysql_num_fields(results) != 2) {
+ error_code = HA_ERR_CORRUPT_EVENT;
+ goto error;
+ }
+
+ MYSQL_ROW row;
+ while((row = mysql_fetch_row(results)))
+ result->add_table(row[0], strlen(row[0]));
+
+error:
+ mysql_free_result(results);
+ return error_code;
+}
+
+
+/*
+ Given a table name, find its OID in the Clustrix, and save it in TABLE_SHARE
+
+ @param db Database name
+ @param name Table name
+ @param oid OUT Return the OID here
+ @param share INOUT If not NULL and the share has ha_share pointer, also
+ update Xpand_share::xpand_table_oid.
+
+ @return
+ 0 - OK
+ error code if an error occurred
+*/
+
+int xpand_connection::get_table_oid(const char *db, size_t db_len,
+ const char *name, size_t name_len,
+ ulonglong *oid, TABLE_SHARE *share)
+{
+ MYSQL_ROW row;
+ int error_code = 0;
+ MYSQL_RES *results_oid = NULL;
+ String get_oid;
+ DBUG_ENTER("xpand_connection::get_table_oid");
+
+ /* get oid */
+ get_oid.append("select r.table "
+ "from system.databases d "
+ " inner join ""system.relations r on d.db = r.db "
+ "where d.name = '");
+ get_oid.append(db, db_len);
+ get_oid.append("' and r.name = '");
+ get_oid.append(name, name_len);
+ get_oid.append("'");
+
+ if (mysql_real_query(&xpand_net, get_oid.c_ptr(), get_oid.length())) {
+ if ((error_code = mysql_errno(&xpand_net))) {
+ DBUG_PRINT("mysql_real_query returns ", ("%d", error_code));
+ error_code = HA_ERR_NO_SUCH_TABLE;
+ goto error;
+ }
+ }
+
+ results_oid = mysql_store_result(&xpand_net);
+ DBUG_PRINT("oid results",
+ ("rows: %llu, fields: %u", mysql_num_rows(results_oid),
+ mysql_num_fields(results_oid)));
+
+ if (mysql_num_rows(results_oid) != 1) {
+ error_code = HA_ERR_NO_SUCH_TABLE;
+ goto error;
+ }
+
+ if ((row = mysql_fetch_row(results_oid))) {
+ DBUG_PRINT("row", ("%s", row[0]));
+ *oid = strtoull((const char *)row[0], NULL, 10);
+ } else {
+ error_code = HA_ERR_NO_SUCH_TABLE;
+ goto error;
+ }
+
+error:
+ if (results_oid)
+ mysql_free_result(results_oid);
+
+ DBUG_RETURN(error_code);
+}
+
+
+/*
+ Given a table name, fetch table definition from Clustrix and fill the TABLE_SHARE
+ object with details about field, indexes, etc.
+*/
+int xpand_connection::discover_table_details(LEX_CSTRING *db, LEX_CSTRING *name,
+ THD *thd, TABLE_SHARE *share)
+{
+ DBUG_ENTER("xpand_connection::discover_table_details");
+ int error_code = 0;
+ MYSQL_RES *results_create = NULL;
+ MYSQL_ROW row;
+ String show;
+ ulonglong oid = 0;
+ Xpand_share *cs;
+
+ if ((error_code = xpand_connection::get_table_oid(db->str, db->length,
+ name->str, name->length,
+ &oid, share)))
+ goto error;
+
+ if (!share->ha_share)
+ share->ha_share= new Xpand_share;
+ cs= static_cast<Xpand_share*>(share->ha_share);
+ cs->xpand_table_oid = oid;
+
+ /* get show create statement */
+ show.append("show simple create table ");
+ show.append(db);
+ show.append(".");
+ show.append("`");
+ show.append(name);
+ show.append("`");
+ if (mysql_real_query(&xpand_net, show.c_ptr(), show.length())) {
+ if ((error_code = mysql_errno(&xpand_net))) {
+ DBUG_PRINT("mysql_real_query returns ", ("%d", error_code));
+ error_code = HA_ERR_NO_SUCH_TABLE;
+ goto error;
+ }
+ }
+
+ results_create = mysql_store_result(&xpand_net);
+ DBUG_PRINT("show table results",
+ ("rows: %llu, fields: %u", mysql_num_rows(results_create),
+ mysql_num_fields(results_create)));
+
+ if (mysql_num_rows(results_create) != 1) {
+ error_code = HA_ERR_NO_SUCH_TABLE;
+ goto error;
+ }
+
+ if (mysql_num_fields(results_create) != 2) {
+ error_code = HA_ERR_CORRUPT_EVENT;
+ goto error;
+ }
+
+ while((row = mysql_fetch_row(results_create))) {
+ DBUG_PRINT("row", ("%s - %s", row[0], row[1]));
+ error_code = share->init_from_sql_statement_string(thd, false, row[1],
+ strlen(row[1]));
+ }
+
+ cs->rediscover_table = false;
+error:
+ if (results_create)
+ mysql_free_result(results_create);
+ DBUG_RETURN(error_code);
+}
+
+#define COMMAND_BUFFER_SIZE_INCREMENT 1024
+#define COMMAND_BUFFER_SIZE_INCREMENT_BITS 10
+int xpand_connection::expand_command_buffer(size_t add_length)
+{
+ size_t expanded_length;
+
+ if (command_buffer_length >= command_length + add_length)
+ return 0;
+
+ expanded_length = command_buffer_length +
+ ((add_length >> COMMAND_BUFFER_SIZE_INCREMENT_BITS)
+ << COMMAND_BUFFER_SIZE_INCREMENT_BITS) +
+ COMMAND_BUFFER_SIZE_INCREMENT;
+
+ if (!command_buffer_length)
+ command_buffer = (uchar *) my_malloc(expanded_length, MYF(MY_WME));
+ else
+ command_buffer = (uchar *) my_realloc(command_buffer, expanded_length,
+ MYF(MY_WME));
+ if (!command_buffer)
+ return HA_ERR_OUT_OF_MEM;
+
+ command_buffer_length = expanded_length;
+
+ return 0;
+}
+
+int xpand_connection::add_command_operand_uchar(uchar value)
+{
+ int error_code = expand_command_buffer(sizeof(value));
+ if (error_code)
+ return error_code;
+
+ memcpy(command_buffer + command_length, &value, sizeof(value));
+ command_length += sizeof(value);
+
+ return 0;
+}
+
+int xpand_connection::add_command_operand_ushort(ushort value)
+{
+ ushort be_value = htobe16(value);
+ int error_code = expand_command_buffer(sizeof(be_value));
+ if (error_code)
+ return error_code;
+
+ memcpy(command_buffer + command_length, &be_value, sizeof(be_value));
+ command_length += sizeof(be_value);
+ return 0;
+}
+
+int xpand_connection::add_command_operand_uint(uint value)
+{
+ uint be_value = htobe32(value);
+ int error_code = expand_command_buffer(sizeof(be_value));
+ if (error_code)
+ return error_code;
+
+ memcpy(command_buffer + command_length, &be_value, sizeof(be_value));
+ command_length += sizeof(be_value);
+ return 0;
+}
+
+int xpand_connection::add_command_operand_ulonglong(ulonglong value)
+{
+ ulonglong be_value = htobe64(value);
+ int error_code = expand_command_buffer(sizeof(be_value));
+ if (error_code)
+ return error_code;
+
+ memcpy(command_buffer + command_length, &be_value, sizeof(be_value));
+ command_length += sizeof(be_value);
+ return 0;
+}
+
+int xpand_connection::add_command_operand_lcb(ulonglong value)
+{
+ int len = net_length_size(value);
+ int error_code = expand_command_buffer(len);
+ if (error_code)
+ return error_code;
+
+ net_store_length(command_buffer + command_length, value);
+ command_length += len;
+ return 0;
+}
+
+int xpand_connection::add_command_operand_str(const uchar *str,
+ size_t str_length)
+{
+ int error_code = add_command_operand_lcb(str_length);
+ if (error_code)
+ return error_code;
+
+ if (!str_length)
+ return 0;
+
+ error_code = expand_command_buffer(str_length);
+ if (error_code)
+ return error_code;
+
+ memcpy(command_buffer + command_length, str, str_length);
+ command_length += str_length;
+ return 0;
+}
+
+/**
+ * @brief
+ * Puts variable length string into the buffer.
+ * @details
+ * Puts into the buffer variable length string the size
+ * of which is send by other means. For details see
+ * MDB Client/Server Protocol.
+ * @args
+ * str - string to send
+ * str_length - size
+ **/
+int xpand_connection::add_command_operand_vlstr(const uchar *str,
+ size_t str_length)
+{
+ int error_code = expand_command_buffer(str_length);
+ if (error_code)
+ return error_code;
+
+ memcpy(command_buffer + command_length, str, str_length);
+ command_length += str_length;
+ return 0;
+}
+
+int xpand_connection::add_command_operand_lex_string(LEX_CSTRING str)
+{
+ return add_command_operand_str((const uchar *)str.str, str.length);
+}
+
+int xpand_connection::add_command_operand_bitmap(MY_BITMAP *bitmap)
+{
+ int error_code = add_command_operand_lcb(bitmap->n_bits);
+ if (error_code)
+ return error_code;
+
+ int no_bytes = no_bytes_in_map(bitmap);
+ error_code = expand_command_buffer(no_bytes);
+ if (error_code)
+ return error_code;
+
+ memcpy(command_buffer + command_length, bitmap->bitmap, no_bytes);
+ command_length += no_bytes;
+ return 0;
+}
+
+/****************************************************************************
+** Class xpand_host_list
+****************************************************************************/
+
+int xpand_host_list::fill(const char *hosts)
+{
+ strtok_buf = my_strdup(hosts, MYF(MY_WME));
+ if (!strtok_buf) {
+ return HA_ERR_OUT_OF_MEM;
+ }
+
+ const char *sep = ",; ";
+ //parse into array
+ int i = 0;
+ char *cursor = NULL;
+ char *token = NULL;
+ for (token = strtok_r(strtok_buf, sep, &cursor);
+ token && i < max_host_count;
+ token = strtok_r(NULL, sep, &cursor)) {
+ this->hosts[i] = token;
+ i++;
+ }
+
+ //host count out of range
+ if (i == 0 || token) {
+ my_free(strtok_buf);
+ return ER_BAD_HOST_ERROR;
+ }
+ hosts_len = i;
+
+ return 0;
+}
+
+void xpand_host_list::empty()
+{
+ my_free(strtok_buf);
+ strtok_buf = NULL;
+ hosts_len = 0;
+}
diff --git a/storage/xpand/xpand_connection.h b/storage/xpand/xpand_connection.h
new file mode 100644
index 00000000000..beeaf8a95f4
--- /dev/null
+++ b/storage/xpand/xpand_connection.h
@@ -0,0 +1,146 @@
+/*****************************************************************************
+Copyright (c) 2019, 2020, MariaDB Corporation.
+*****************************************************************************/
+
+#ifndef _xpand_connection_h
+#define _xpand_connection_h
+
+#ifdef USE_PRAGMA_INTERFACE
+#pragma interface /* gcc class implementation */
+#endif
+
+#define MYSQL_SERVER 1
+#include "my_global.h"
+#include "m_string.h"
+#include "mysql.h"
+#include "sql_common.h"
+#include "my_base.h"
+#include "mysqld_error.h"
+#include "my_bitmap.h"
+#include "handler.h"
+
+#define XPAND_SERVER_REQUEST 30
+
+enum xpand_lock_mode_t {
+ XPAND_NO_LOCKS,
+ XPAND_SHARED,
+ XPAND_EXCLUSIVE,
+};
+
+enum xpand_balance_algorithm_enum {
+ XPAND_BALANCE_FIRST,
+ XPAND_BALANCE_ROUND_ROBIN
+};
+
+class xpand_connection_cursor;
+class xpand_connection
+{
+private:
+ MYSQL xpand_net;
+ uchar *command_buffer;
+ size_t command_buffer_length;
+ size_t command_length;
+
+ int trans_state;
+ int trans_flags;
+ int allocate_cursor(MYSQL *xpand_net, ulong buffer_size,
+ xpand_connection_cursor **scan);
+public:
+ xpand_connection();
+ ~xpand_connection();
+
+ inline bool is_connected()
+ {
+ return xpand_net.net.vio;
+ }
+ int connect();
+ int connect_direct(char *host);
+ void disconnect(bool is_destructor = FALSE);
+
+ bool has_open_transaction();
+ int commit_transaction();
+ int rollback_transaction();
+ int begin_transaction_next();
+ int new_statement_next();
+ int rollback_statement_next(); // also starts new statement
+ void auto_commit_next();
+ void auto_commit_closed();
+
+ int run_query(String &stmt);
+ int write_row(ulonglong xpand_table_oid, uchar *packed_row,
+ size_t packed_size, ulonglong *last_insert_id);
+ int key_update(ulonglong xpand_table_oid,
+ uchar *packed_key, size_t packed_key_length,
+ MY_BITMAP *update_set,
+ uchar *packed_new_data, size_t packed_new_length);
+ int key_delete(ulonglong xpand_table_oid,
+ uchar *packed_key, size_t packed_key_length);
+ int key_read(ulonglong xpand_table_oid, uint index,
+ xpand_lock_mode_t lock_mode, MY_BITMAP *read_set,
+ uchar *packed_key, ulong packed_key_length, uchar **rowdata,
+ ulonglong *rowdata_length);
+ enum sort_order {SORT_NONE = 0, SORT_ASC = 1, SORT_DESC = 2};
+ enum scan_type {
+ READ_KEY_OR_NEXT, /* rows with key and greater */
+ READ_KEY_OR_PREV, /* rows with key and less. */
+ READ_AFTER_KEY, /* rows with keys greater than key */
+ READ_BEFORE_KEY, /* rows with keys less than key */
+ READ_FROM_START, /* rows with forwards from first key. */
+ READ_FROM_LAST, /* rows with backwards from last key. */
+ };
+ int scan_table(ulonglong xpand_table_oid,
+ xpand_lock_mode_t lock_mode,
+ MY_BITMAP *read_set, ushort row_req,
+ xpand_connection_cursor **scan, String* pushdown_cond_sql);
+ int scan_query(String &stmt, uchar *fieldtype, uint fields, uchar *null_bits,
+ uint null_bits_size, uchar *field_metadata,
+ uint field_metadata_size, ushort row_req, ulonglong *oids,
+ xpand_connection_cursor **scan);
+ int update_query(String &stmt, LEX_CSTRING &dbname, ulonglong *oids,
+ ulonglong *affected_rows);
+ int scan_from_key(ulonglong xpand_table_oid, uint index,
+ xpand_lock_mode_t lock_mode,
+ enum scan_type scan_dir, int no_key_cols, bool sorted_scan,
+ MY_BITMAP *read_set, uchar *packed_key,
+ ulong packed_key_length, ushort row_req,
+ xpand_connection_cursor **scan);
+ int scan_next(xpand_connection_cursor *scan, uchar **rowdata,
+ ulong *rowdata_length);
+ int scan_end(xpand_connection_cursor *scan);
+
+ int populate_table_list(LEX_CSTRING *db, handlerton::discovered_list *result);
+ int get_table_oid(const char *db, size_t db_len, const char *name,
+ size_t name_len, ulonglong *oid, TABLE_SHARE *share);
+ int discover_table_details(LEX_CSTRING *db, LEX_CSTRING *name, THD *thd,
+ TABLE_SHARE *share);
+
+private:
+ int expand_command_buffer(size_t add_length);
+ int add_command_operand_uchar(uchar value);
+ int add_command_operand_ushort(ushort value);
+ int add_command_operand_uint(uint value);
+ int add_command_operand_ulonglong(ulonglong value);
+ int add_command_operand_lcb(ulonglong value);
+ int add_command_operand_str(const uchar *str, size_t length);
+ int add_command_operand_vlstr(const uchar *str, size_t length);
+ int add_command_operand_lex_string(LEX_CSTRING str);
+ int add_command_operand_bitmap(MY_BITMAP *bitmap);
+ int add_status_vars();
+ int begin_command(uchar command);
+ int send_command();
+ int read_query_response();
+};
+
+static const int max_host_count = 128;
+class xpand_host_list {
+private:
+ char *strtok_buf;
+public:
+ int hosts_len;
+ char *hosts[max_host_count];
+
+ int fill(const char *hosts);
+ void empty();
+};
+
+#endif // _xpand_connection_h