diff options
Diffstat (limited to 'storage/federatedx')
-rw-r--r-- | storage/federatedx/federatedx_io.cc | 1 | ||||
-rw-r--r-- | storage/federatedx/federatedx_io_mysql.cc | 10 | ||||
-rw-r--r-- | storage/federatedx/federatedx_io_null.cc | 1 | ||||
-rw-r--r-- | storage/federatedx/federatedx_txn.cc | 1 | ||||
-rw-r--r-- | storage/federatedx/ha_federatedx.cc | 199 | ||||
-rw-r--r-- | storage/federatedx/ha_federatedx.h | 7 |
6 files changed, 128 insertions, 91 deletions
diff --git a/storage/federatedx/federatedx_io.cc b/storage/federatedx/federatedx_io.cc index 6c968cd7907..1e0348e3bf8 100644 --- a/storage/federatedx/federatedx_io.cc +++ b/storage/federatedx/federatedx_io.cc @@ -30,7 +30,6 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. /*#define MYSQL_SERVER 1*/ #include <my_global.h> #include "sql_priv.h" -#include <mysql/plugin.h> #include "ha_federatedx.h" diff --git a/storage/federatedx/federatedx_io_mysql.cc b/storage/federatedx/federatedx_io_mysql.cc index ef3b1388200..496a42fb628 100644 --- a/storage/federatedx/federatedx_io_mysql.cc +++ b/storage/federatedx/federatedx_io_mysql.cc @@ -30,11 +30,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #define MYSQL_SERVER 1 #include <my_global.h> #include "sql_priv.h" -#include <mysql/plugin.h> #include "ha_federatedx.h" #include "m_string.h" +#include "mysqld_error.h" #include "sql_servers.h" #ifdef USE_PRAGMA_IMPLEMENTATION @@ -138,7 +138,7 @@ federatedx_io_mysql::federatedx_io_mysql(FEDERATEDX_SERVER *aserver) bzero(&mysql, sizeof(MYSQL)); bzero(&savepoints, sizeof(DYNAMIC_ARRAY)); - my_init_dynamic_array(&savepoints, sizeof(SAVEPT), 16, 16); + my_init_dynamic_array(&savepoints, sizeof(SAVEPT), 16, 16, MYF(0)); DBUG_VOID_RETURN; } @@ -424,8 +424,10 @@ int federatedx_io_mysql::actual_query(const char *buffer, uint length) if (!mysql.net.vio) { + my_bool my_true= 1; + if (!(mysql_init(&mysql))) - DBUG_RETURN(-1); + DBUG_RETURN(-1); /* BUG# 17044 Federated Storage Engine is not UTF8 clean @@ -434,6 +436,8 @@ int federatedx_io_mysql::actual_query(const char *buffer, uint length) */ /* this sets the csname like 'set names utf8' */ mysql_options(&mysql, MYSQL_SET_CHARSET_NAME, get_charsetname()); + mysql_options(&mysql, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, + (char*) &my_true); if (!mysql_real_connect(&mysql, get_hostname(), diff --git a/storage/federatedx/federatedx_io_null.cc b/storage/federatedx/federatedx_io_null.cc index 2b84d03808e..aa35d4bdecc 100644 --- a/storage/federatedx/federatedx_io_null.cc +++ b/storage/federatedx/federatedx_io_null.cc @@ -30,7 +30,6 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. /*#define MYSQL_SERVER 1*/ #include <my_global.h> #include "sql_priv.h" -#include <mysql/plugin.h> #include "ha_federatedx.h" diff --git a/storage/federatedx/federatedx_txn.cc b/storage/federatedx/federatedx_txn.cc index d74ece32c61..232ac335dfc 100644 --- a/storage/federatedx/federatedx_txn.cc +++ b/storage/federatedx/federatedx_txn.cc @@ -33,7 +33,6 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #define MYSQL_SERVER 1 #include <my_global.h> #include "sql_priv.h" -#include <mysql/plugin.h> #include "ha_federatedx.h" diff --git a/storage/federatedx/ha_federatedx.cc b/storage/federatedx/ha_federatedx.cc index bafae614fab..0a24fe9c910 100644 --- a/storage/federatedx/ha_federatedx.cc +++ b/storage/federatedx/ha_federatedx.cc @@ -427,6 +427,7 @@ int federatedx_db_init(void *p) federatedx_hton->savepoint_release= ha_federatedx::savepoint_release; federatedx_hton->commit= ha_federatedx::commit; federatedx_hton->rollback= ha_federatedx::rollback; + federatedx_hton->discover_table_structure= ha_federatedx::discover_assisted; federatedx_hton->create= federatedx_create_handler; federatedx_hton->flags= HTON_ALTER_NOT_SUPPORTED; @@ -517,15 +518,16 @@ err: } -static int parse_url_error(FEDERATEDX_SHARE *share, TABLE *table, int error_num) +static int parse_url_error(FEDERATEDX_SHARE *share, TABLE_SHARE *table_s, + int error_num) { char buf[FEDERATEDX_QUERY_BUFFER_SIZE]; int buf_len; DBUG_ENTER("ha_federatedx parse_url_error"); - buf_len= min(table->s->connect_string.length, - FEDERATEDX_QUERY_BUFFER_SIZE-1); - strmake(buf, table->s->connect_string.str, buf_len); + buf_len= MY_MIN(table_s->connect_string.length, + FEDERATEDX_QUERY_BUFFER_SIZE-1); + strmake(buf, table_s->connect_string.str, buf_len); my_error(error_num, MYF(0), buf, 14); DBUG_RETURN(error_num); } @@ -564,17 +566,17 @@ int get_connection(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share) at the address of the share. */ share->server_name_length= server->server_name_length; - share->server_name= server->server_name; - share->username= server->username; - share->password= server->password; - share->database= server->db; + share->server_name= const_cast<char*>(server->server_name); + share->username= const_cast<char*>(server->username); + share->password= const_cast<char*>(server->password); + share->database= const_cast<char*>(server->db); share->port= server->port > MIN_PORT && server->port < 65536 ? (ushort) server->port : MYSQL_PORT; - share->hostname= server->host; - if (!(share->socket= server->socket) && + share->hostname= const_cast<char*>(server->host); + if (!(share->socket= const_cast<char*>(server->socket)) && !strcmp(share->hostname, my_localhost)) share->socket= (char *) MYSQL_UNIX_ADDR; - share->scheme= server->scheme; + share->scheme= const_cast<char*>(server->scheme); DBUG_PRINT("info", ("share->username: %s", share->username)); DBUG_PRINT("info", ("share->password: %s", share->password)); @@ -645,8 +647,8 @@ error: */ -static int parse_url(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share, TABLE *table, - uint table_create_flag) +static int parse_url(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share, + TABLE_SHARE *table_s, uint table_create_flag) { uint error_num= (table_create_flag ? ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE : @@ -656,11 +658,11 @@ static int parse_url(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share, TABLE *table, share->port= 0; share->socket= 0; DBUG_PRINT("info", ("share at %lx", (long unsigned int) share)); - DBUG_PRINT("info", ("Length: %u", (uint) table->s->connect_string.length)); - DBUG_PRINT("info", ("String: '%.*s'", (int) table->s->connect_string.length, - table->s->connect_string.str)); - share->connection_string= strmake_root(mem_root, table->s->connect_string.str, - table->s->connect_string.length); + DBUG_PRINT("info", ("Length: %u", (uint) table_s->connect_string.length)); + DBUG_PRINT("info", ("String: '%.*s'", (int) table_s->connect_string.length, + table_s->connect_string.str)); + share->connection_string= strmake_root(mem_root, table_s->connect_string.str, + table_s->connect_string.length); DBUG_PRINT("info",("parse_url alloced share->connection_string %lx", (long unsigned int) share->connection_string)); @@ -713,9 +715,9 @@ static int parse_url(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share, TABLE *table, Connection specifies everything but, resort to expecting remote and foreign table names to match */ - share->table_name= strmake_root(mem_root, table->s->table_name.str, + share->table_name= strmake_root(mem_root, table_s->table_name.str, (share->table_name_length= - table->s->table_name.length)); + table_s->table_name.length)); DBUG_PRINT("info", ("internal format, default table_name " "share->connection_string: %s share->table_name: %s", @@ -729,7 +731,7 @@ static int parse_url(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share, TABLE *table, { share->parsed= TRUE; // Add a null for later termination of table name - share->connection_string[table->s->connect_string.length]= 0; + share->connection_string[table_s->connect_string.length]= 0; share->scheme= share->connection_string; DBUG_PRINT("info",("parse_url alloced share->scheme: %lx", (ulong) share->scheme)); @@ -816,7 +818,7 @@ static int parse_url(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share, TABLE *table, DBUG_RETURN(0); error: - DBUG_RETURN(parse_url_error(share, table, error_num)); + DBUG_RETURN(parse_url_error(share, table_s, error_num)); } /***************************************************************************** @@ -924,7 +926,7 @@ static bool emit_key_part_element(String *to, KEY_PART_INFO *part, uint blob_length= uint2korr(ptr); blob.set_quick((char*) ptr+HA_KEY_BLOB_LENGTH, blob_length, &my_charset_bin); - if (append_escaped(to, &blob)) + if (to->append_for_single_quote(&blob)) DBUG_RETURN(1); } else if (part->key_part_flag & HA_VAR_LENGTH_PART) @@ -933,7 +935,7 @@ static bool emit_key_part_element(String *to, KEY_PART_INFO *part, uint var_length= uint2korr(ptr); varchar.set_quick((char*) ptr+HA_KEY_BLOB_LENGTH, var_length, &my_charset_bin); - if (append_escaped(to, &varchar)) + if (to->append_for_single_quote(&varchar)) DBUG_RETURN(1); } else @@ -945,7 +947,7 @@ static bool emit_key_part_element(String *to, KEY_PART_INFO *part, if (field->result_type() == STRING_RESULT) { - if (append_escaped(to, res)) + if (to->append_for_single_quote(res)) DBUG_RETURN(1); } else if (to->append(res->ptr(), res->length())) @@ -1236,16 +1238,16 @@ bool ha_federatedx::create_where_from_key(String *to, tmp.append(STRING_WITH_LEN(" (")); } - for (key_part= key_info->key_part, - remainder= key_info->key_parts, - length= ranges[i]->length, - ptr= ranges[i]->key; ; + for (key_part= key_info->key_part, + remainder= key_info->user_defined_key_parts, + length= ranges[i]->length, + ptr= ranges[i]->key; ; remainder--, - key_part++) + key_part++) { Field *field= key_part->field; uint store_length= key_part->store_length; - uint part_length= min(store_length, length); + uint part_length= MY_MIN(store_length, length); needs_quotes= field->str_needs_quotes(); DBUG_DUMP("key, start of loop", ptr, length); @@ -1317,7 +1319,7 @@ bool ha_federatedx::create_where_from_key(String *to, break; } DBUG_PRINT("info", ("federatedx HA_READ_AFTER_KEY %d", i)); - if (store_length >= length) /* end key */ + if (store_length >= length || i > 0) /* end key */ { if (emit_key_part_name(&tmp, key_part)) goto err; @@ -1387,7 +1389,7 @@ prepare_for_next_key_part: ptr was incremented by 1. Since store_length still counts null-byte, we need to subtract 1 from store_length. */ - ptr+= store_length - test(key_part->null_bit); + ptr+= store_length - MY_TEST(key_part->null_bit); if (tmp.append(STRING_WITH_LEN(" AND "))) goto err; @@ -1520,7 +1522,7 @@ static FEDERATEDX_SERVER *get_server(FEDERATEDX_SHARE *share, TABLE *table) mysql_mutex_assert_owner(&federatedx_mutex); - init_alloc_root(&mem_root, 4096, 4096); + init_alloc_root(&mem_root, 4096, 4096, MYF(0)); fill_server(&mem_root, &tmp_server, share, table ? table->s->table_charset : 0); @@ -1578,13 +1580,13 @@ static FEDERATEDX_SHARE *get_share(const char *table_name, TABLE *table) query.length(0); bzero(&tmp_share, sizeof(tmp_share)); - init_alloc_root(&mem_root, 256, 0); + init_alloc_root(&mem_root, 256, 0, MYF(0)); mysql_mutex_lock(&federatedx_mutex); tmp_share.share_key= table_name; tmp_share.share_key_length= strlen(table_name); - if (parse_url(&mem_root, &tmp_share, table, 0)) + if (parse_url(&mem_root, &tmp_share, table->s, 0)) goto error; /* TODO: change tmp_share.scheme to LEX_STRING object */ @@ -1718,22 +1720,6 @@ ha_rows ha_federatedx::records_in_range(uint inx, key_range *start_key, DBUG_ENTER("ha_federatedx::records_in_range"); DBUG_RETURN(FEDERATEDX_RECORDS_IN_RANGE); } -/* - If frm_error() is called then we will use this to to find out - what file extentions exist for the storage engine. This is - also used by the default rename_table and delete_table method - in handler.cc. -*/ - -const char **ha_federatedx::bas_ext() const -{ - static const char *ext[]= - { - NullS - }; - return ext; -} - federatedx_txn *ha_federatedx::get_txn(THD *thd, bool no_create) { @@ -1790,7 +1776,7 @@ int ha_federatedx::open(const char *name, int mode, uint test_if_locked) DBUG_PRINT("info", ("ref_length: %u", ref_length)); - my_init_dynamic_array(&results, sizeof(FEDERATEDX_IO_RESULT*), 4, 4); + my_init_dynamic_array(&results, sizeof(FEDERATEDX_IO_RESULT*), 4, 4, MYF(0)); reset(); @@ -1804,8 +1790,8 @@ public: public: bool handle_condition(THD *thd, uint sql_errno, const char* sqlstate, - MYSQL_ERROR::enum_warning_level level, - const char* msg, MYSQL_ERROR ** cond_hdl) + Sql_condition::enum_warning_level level, + const char* msg, Sql_condition ** cond_hdl) { return sql_errno >= ER_ABORTING_CONNECTION && sql_errno <= ER_NET_WRITE_INTERRUPTED; @@ -1999,8 +1985,6 @@ int ha_federatedx::write_row(uchar *buf) values_string.length(0); insert_field_value_string.length(0); - if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT) - table->timestamp_field->set_time(); /* start both our field and field values strings @@ -2134,7 +2118,7 @@ int ha_federatedx::write_row(uchar *buf) @details Initializes memory structures required for bulk insert. */ -void ha_federatedx::start_bulk_insert(ha_rows rows) +void ha_federatedx::start_bulk_insert(ha_rows rows, uint flags) { uint page_size; DBUG_ENTER("ha_federatedx::start_bulk_insert"); @@ -2175,7 +2159,7 @@ void ha_federatedx::start_bulk_insert(ha_rows rows) @return Operation status @retval 0 No error - @retval != 0 Error occured at remote server. Also sets my_errno. + @retval != 0 Error occurred at remote server. Also sets my_errno. */ int ha_federatedx::end_bulk_insert() @@ -2310,7 +2294,7 @@ int ha_federatedx::update_row(const uchar *old_data, uchar *new_data) this? Because we only are updating one record, and LIMIT enforces this. */ - bool has_a_primary_key= test(table->s->primary_key != MAX_KEY); + bool has_a_primary_key= MY_TEST(table->s->primary_key != MAX_KEY); /* buffers for following strings @@ -2578,9 +2562,7 @@ int ha_federatedx::index_read_idx(uchar *buf, uint index, const uchar *key, RESULT 0 ok In this case *result will contain the result set - table->status == 0 # error In this case *result will contain 0 - table->status == STATUS_NOT_FOUND */ int ha_federatedx::index_read_idx_with_result_set(uchar *buf, uint index, @@ -2637,11 +2619,9 @@ int ha_federatedx::index_read_idx_with_result_set(uchar *buf, uint index, insert_dynamic(&results, (uchar*) result); *result= 0; - table->status= STATUS_NOT_FOUND; DBUG_RETURN(retval); error: - table->status= STATUS_NOT_FOUND; my_error(retval, MYF(0), error_buffer); DBUG_RETURN(retval); } @@ -2722,7 +2702,6 @@ int ha_federatedx::read_range_first(const key_range *start_key, DBUG_RETURN(retval); error: - table->status= STATUS_NOT_FOUND; DBUG_RETURN(retval); } @@ -2927,8 +2906,6 @@ int ha_federatedx::read_next(uchar *buf, FEDERATEDX_IO_RESULT *result) FEDERATEDX_IO_ROW *row; DBUG_ENTER("ha_federatedx::read_next"); - table->status= STATUS_NOT_FOUND; // For easier return - if ((retval= txn->acquire(share, TRUE, &io))) DBUG_RETURN(retval); @@ -3013,7 +2990,6 @@ int ha_federatedx::rnd_pos(uchar *buf, uchar *pos) DBUG_RETURN(retval); error: - table->status= STATUS_NOT_FOUND; DBUG_RETURN(retval); } @@ -3116,7 +3092,7 @@ error: else if (remote_error_number != -1 /* error already reported */) { error_code= remote_error_number; - my_error(error_code, MYF(0), ER(error_code)); + my_error(error_code, MYF(0), ER_THD(thd, error_code)); } fail: tmp_txn->release(&tmp_io); @@ -3377,7 +3353,7 @@ int ha_federatedx::create(const char *name, TABLE *table_arg, federatedx_io *tmp_io= NULL; DBUG_ENTER("ha_federatedx::create"); - if ((retval= parse_url(thd->mem_root, &tmp_share, table_arg, 1))) + if ((retval= parse_url(thd->mem_root, &tmp_share, table_arg->s, 1))) goto error; /* loopback socket connections hang due to LOCK_open mutex */ @@ -3408,14 +3384,6 @@ int ha_federatedx::create(const char *name, TABLE *table_arg, { FEDERATEDX_SERVER server; - /* - Bug#25679 - Ensure that we do not hold the LOCK_open mutex while attempting - to establish FederatedX connection to guard against a trivial - Denial of Service scenerio. - */ - mysql_mutex_assert_not_owner(&LOCK_open); - fill_server(thd->mem_root, &server, &tmp_share, create_info->table_charset); #ifndef DBUG_OFF @@ -3599,6 +3567,75 @@ int ha_federatedx::rollback(handlerton *hton, MYSQL_THD thd, bool all) DBUG_RETURN(return_val); } + +/* + Federated supports assisted discovery, like + CREATE TABLE t1 CONNECTION="mysql://joe:pass@192.168.1.111/federated/t1"; + but not a fully automatic discovery where a table magically appear + on any use (like, on SELECT * from t1). +*/ +int ha_federatedx::discover_assisted(handlerton *hton, THD* thd, + TABLE_SHARE *table_s, HA_CREATE_INFO *info) +{ + int error= HA_ERR_NO_CONNECTION; + FEDERATEDX_SHARE tmp_share; + CHARSET_INFO *cs= system_charset_info; + MYSQL mysql; + char buf[1024]; + String query(buf, sizeof(buf), cs); + MYSQL_RES *res; + MYSQL_ROW rdata; + ulong *rlen; + my_bool my_true= 1; + + if (parse_url(thd->mem_root, &tmp_share, table_s, 1)) + return HA_WRONG_CREATE_OPTION; + + mysql_init(&mysql); + mysql_options(&mysql, MYSQL_SET_CHARSET_NAME, cs->csname); + mysql_options(&mysql, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, + (char*) &my_true); + + if (!mysql_real_connect(&mysql, tmp_share.hostname, tmp_share.username, + tmp_share.password, tmp_share.database, + tmp_share.port, tmp_share.socket, 0)) + goto err1; + + if (mysql_real_query(&mysql, STRING_WITH_LEN("SET SQL_MODE=NO_TABLE_OPTIONS"))) + goto err1; + + query.copy(STRING_WITH_LEN("SHOW CREATE TABLE "), cs); + append_ident(&query, tmp_share.table_name, + tmp_share.table_name_length, ident_quote_char); + + if (mysql_real_query(&mysql, query.ptr(), query.length())) + goto err1; + + if (!((res= mysql_store_result(&mysql)))) + goto err1; + + if (!(rdata= mysql_fetch_row(res)) || !((rlen= mysql_fetch_lengths(res)))) + goto err2; + + query.copy(rdata[1], rlen[1], cs); + query.append(STRING_WITH_LEN(" CONNECTION='"), cs); + query.append_for_single_quote(table_s->connect_string.str, + table_s->connect_string.length); + query.append('\''); + + error= table_s->init_from_sql_statement_string(thd, true, + query.ptr(), query.length()); + +err2: + mysql_free_result(res); +err1: + if (error) + my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), mysql_error(&mysql)); + mysql_close(&mysql); + return error; +} + + struct st_mysql_storage_engine federatedx_storage_engine= { MYSQL_HANDLERTON_INTERFACE_VERSION }; @@ -3612,10 +3649,10 @@ maria_declare_plugin(federatedx) PLUGIN_LICENSE_GPL, federatedx_db_init, /* Plugin Init */ federatedx_done, /* Plugin Deinit */ - 0x0200 /* 2.0 */, + 0x0201 /* 2.1 */, NULL, /* status variables */ NULL, /* system variables */ - "2.0", /* string version */ - MariaDB_PLUGIN_MATURITY_BETA /* maturity */ + "2.1", /* string version */ + MariaDB_PLUGIN_MATURITY_STABLE /* maturity */ } maria_declare_plugin_end; diff --git a/storage/federatedx/ha_federatedx.h b/storage/federatedx/ha_federatedx.h index 1c64892418e..2c2c6eef26b 100644 --- a/storage/federatedx/ha_federatedx.h +++ b/storage/federatedx/ha_federatedx.h @@ -297,6 +297,8 @@ private: static int savepoint_release(handlerton *hton, MYSQL_THD thd, void *sv); static int commit(handlerton *hton, MYSQL_THD thd, bool all); static int rollback(handlerton *hton, MYSQL_THD thd, bool all); + static int discover_assisted(handlerton *, THD*, TABLE_SHARE *, + HA_CREATE_INFO *); bool append_stmt_insert(String *query); @@ -311,15 +313,12 @@ private: public: ha_federatedx(handlerton *hton, TABLE_SHARE *table_arg); ~ha_federatedx() {} - /* The name that will be used for display purposes */ - const char *table_type() const { return "FEDERATED"; } /* The name of the index type that will be used for display don't implement this method unless you really have indexes */ // perhaps get index type const char *index_type(uint inx) { return "REMOTE"; } - const char **bas_ext() const; /* This is a list of flags that says what the storage engine implements. The current table flags are documented in @@ -392,7 +391,7 @@ public: int open(const char *name, int mode, uint test_if_locked); // required int close(void); // required - void start_bulk_insert(ha_rows rows); + void start_bulk_insert(ha_rows rows, uint flags); int end_bulk_insert(); int write_row(uchar *buf); int update_row(const uchar *old_data, uchar *new_data); |