diff options
Diffstat (limited to 'sql/ha_federated.cc')
-rw-r--r-- | sql/ha_federated.cc | 1611 |
1 files changed, 1170 insertions, 441 deletions
diff --git a/sql/ha_federated.cc b/sql/ha_federated.cc index 1cec6faea04..63454bf88ed 100644 --- a/sql/ha_federated.cc +++ b/sql/ha_federated.cc @@ -54,6 +54,7 @@ ***IMPORTANT*** + This is a first release, conceptual release Only 'mysql://' is supported at this release. @@ -352,7 +353,8 @@ #ifdef HAVE_FEDERATED_DB #include "ha_federated.h" -#define MAX_REMOTE_SIZE IO_SIZE + +#include "m_string.h" /* Variables for federated share methods */ static HASH federated_open_tables; // Hash used to track open // tables @@ -413,13 +415,14 @@ bool federated_db_end() return FALSE; } - /* Check (in create) whether the tables exists, and that it can be connected to SYNOPSIS check_foreign_data_source() share pointer to FEDERATED share + table_create_flag tells us that ::create is the caller, + therefore, return CANT_CREATE_FEDERATED_TABLE DESCRIPTION This method first checks that the connection information that parse url @@ -427,23 +430,23 @@ bool federated_db_end() table, and if so, does the foreign table exist. */ -static int check_foreign_data_source(FEDERATED_SHARE *share) +static int check_foreign_data_source( + FEDERATED_SHARE *share, + bool table_create_flag) { - char escaped_table_base_name[IO_SIZE]; - MYSQL *mysql; - MYSQL_RES *result=0; + char escaped_table_name[NAME_LEN*2]; + char query_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + char error_buffer[FEDERATED_QUERY_BUFFER_SIZE]; uint error_code; - char query_buffer[IO_SIZE]; - char error_buffer[IO_SIZE]; String query(query_buffer, sizeof(query_buffer), &my_charset_bin); + MYSQL *mysql; DBUG_ENTER("ha_federated::check_foreign_data_source"); + /* Zero the length, otherwise the string will have misc chars */ query.length(0); /* error out if we can't alloc memory for mysql_init(NULL) (per Georg) */ - if (! (mysql= mysql_init(NULL))) - { + if (!(mysql= mysql_init(NULL))) DBUG_RETURN(HA_ERR_OUT_OF_MEM); - } /* check if we can connect */ if (!mysql_real_connect(mysql, share->hostname, @@ -453,11 +456,18 @@ static int check_foreign_data_source(FEDERATED_SHARE *share) share->port, share->socket, 0)) { + /* + we want the correct error message, but it to return + ER_CANT_CREATE_FEDERATED_TABLE if called by ::create + */ + error_code= table_create_flag? + ER_CANT_CREATE_FEDERATED_TABLE : ER_CONNECT_TO_FOREIGN_DATA_SOURCE; + my_sprintf(error_buffer, - (error_buffer, - "unable to connect to database '%s' on host '%s as user '%s' !", - share->database, share->hostname, share->username)); - error_code= ER_CONNECT_TO_MASTER; + (error_buffer, " database %s username %s hostname %s", + share->database, share->username, share->hostname)); + + my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), error_buffer); goto error; } else @@ -468,48 +478,42 @@ static int check_foreign_data_source(FEDERATED_SHARE *share) with transactions */ mysql->reconnect= 1; - /* + /* Note: I am not using INORMATION_SCHEMA because this needs to work with < 5.0 if we can connect, then make sure the table exists + + the query will be: SELECT * FROM `tablename` WHERE 1=0 */ - query.append("SHOW TABLES LIKE '"); - escape_string_for_mysql(&my_charset_bin, (char *)escaped_table_base_name, - sizeof(escaped_table_base_name), - share->table_base_name, - share->table_base_name_length); - query.append(escaped_table_base_name); - query.append("'"); - - error_code= ER_QUERY_ON_MASTER; + query.append(FEDERATED_SELECT); + query.append(FEDERATED_STAR); + query.append(FEDERATED_FROM); + query.append(FEDERATED_BTICK); + escape_string_for_mysql(&my_charset_bin, (char *)escaped_table_name, + sizeof(escaped_table_name), + share->table_name, + share->table_name_length); + query.append(escaped_table_name); + query.append(FEDERATED_BTICK); + query.append(FEDERATED_WHERE); + query.append(FEDERATED_FALSE); + + DBUG_PRINT("info", ("check_foreign_data_source query %s", query.c_ptr_quick())); if (mysql_real_query(mysql, query.ptr(), query.length())) - goto error; - - result= mysql_store_result(mysql); - if (! result) - goto error; - - /* if ! mysql_num_rows, the table doesn't exist, send error */ - if (! mysql_num_rows(result)) { - my_sprintf(error_buffer, - (error_buffer, "foreign table '%s' does not exist!", - share->table_base_name)); + error_code= table_create_flag ? + ER_CANT_CREATE_FEDERATED_TABLE : ER_FOREIGN_DATA_SOURCE_DOESNT_EXIST; + my_sprintf(error_buffer, (error_buffer, ": %d : %s", + mysql_errno(mysql), mysql_error(mysql))); + + my_error(error_code, MYF(0), error_buffer); goto error; } - mysql_free_result(result); - result= 0; - mysql_close(mysql); - } - DBUG_RETURN(0); + error_code=0; error: - if (result) - mysql_free_result(result); mysql_close(mysql); - my_error(error_code, MYF(0), error_buffer); DBUG_RETURN(error_code); - } @@ -545,22 +549,27 @@ error: 'password' and 'port' are both optional. RETURN VALUE - 0 success - 1 failure, wrong string format + 0 success + error_num particular error code */ static int parse_url(FEDERATED_SHARE *share, TABLE *table, uint table_create_flag) { - uint error_num= (table_create_flag ? ER_CANT_CREATE_TABLE : - ER_CONNECT_TO_MASTER); + uint error_num= (table_create_flag ? + ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE : + ER_FOREIGN_DATA_STRING_INVALID); DBUG_ENTER("ha_federated::parse_url"); share->port= 0; - share->socket= 0; share->scheme= my_strdup(table->s->comment, MYF(0)); + DBUG_PRINT("info",("parse_url alloced share->scheme %lx", share->scheme)); + /* + remove addition of null terminator and store length + for each string in share + */ if ((share->username= strstr(share->scheme, "://"))) { share->scheme[share->username - share->scheme]= '\0'; @@ -613,20 +622,20 @@ static int parse_url(FEDERATED_SHARE *share, TABLE *table, share->port= atoi(share->sport); } - if ((share->table_base_name= strchr(share->database, '/'))) + if ((share->table_name= strchr(share->database, '/'))) { - share->database[share->table_base_name - share->database]= '\0'; - share->table_base_name++; + share->database[share->table_name - share->database]= '\0'; + share->table_name++; } else goto error; - share->table_base_name_length= strlen(share->table_base_name); + share->table_name_length= strlen(share->table_name); } else goto error; /* make sure there's not an extra / */ - if ((strchr(share->table_base_name, '/'))) + if ((strchr(share->table_name, '/'))) goto error; if (share->hostname[0] == '\0') @@ -645,7 +654,7 @@ static int parse_url(FEDERATED_SHARE *share, TABLE *table, hostname %s port %d database %s tablename %s\n", share->scheme, share->username, share->password, share->hostname, share->port, share->database, - share->table_base_name)); + share->table_name)); } else goto error; @@ -656,13 +665,51 @@ static int parse_url(FEDERATED_SHARE *share, TABLE *table, DBUG_RETURN(0); error: - my_error(error_num, MYF(0), - "connection string is not in the correct format",0); - DBUG_RETURN(1); + if (share->scheme) + { + DBUG_PRINT("info", + ("error: parse_url. Returning error code %d \ + freeing share->scheme %lx", error_num, share->scheme)); + my_free((gptr) share->scheme, MYF(0)); + share->scheme= 0; + } + my_error(error_num, MYF(0), table->s->comment); + DBUG_RETURN(error_num); } +/* Federated storage engine handlerton */ + +static handlerton federated_hton= { + "FEDERATED", + 0, /* slot */ + 0, /* savepoint size. */ + 0, /* close_connection */ + 0, /* savepoint */ + 0, /* rollback to savepoint */ + 0, /* release savepoint */ + 0, /* commit */ + 0, /* rollback */ + 0, /* prepare */ + 0, /* recover */ + 0, /* commit_by_xid */ + 0, /* rollback_by_xid */ + HTON_NO_FLAGS +}; + + +/***************************************************************************** +** FEDERATED tables +*****************************************************************************/ + +ha_federated::ha_federated(TABLE *table_arg) + :handler(&federated_hton, table_arg), + mysql(0), stored_result(0), scan_flag(0), + ref_length(sizeof(MYSQL_ROW_OFFSET)), current_position(0) +{} + + /* Convert MySQL result set row to handler internal format @@ -684,24 +731,115 @@ error: uint ha_federated::convert_row_to_internal_format(byte *record, MYSQL_ROW row) { - ulong *lengths; uint num_fields; - uint x= 0; + ulong *lengths; + Field **field; DBUG_ENTER("ha_federated::convert_row_to_internal_format"); - num_fields= mysql_num_fields(result); - lengths= mysql_fetch_lengths(result); + num_fields= mysql_num_fields(stored_result); + lengths= mysql_fetch_lengths(stored_result); memset(record, 0, table->s->null_bytes); - for (Field **field= table->field; *field; field++, x++) + for (field= table->field; *field; field++) { + /* + index variable to move us through the row at the + same iterative step as the field + */ + int x= field - table->field; + my_ptrdiff_t old_ptr; + old_ptr= (my_ptrdiff_t) (record - table->record[0]); + (*field)->move_field(old_ptr); if (!row[x]) (*field)->set_null(); else + { + (*field)->set_notnull(); (*field)->store(row[x], lengths[x], &my_charset_bin); + } + (*field)->move_field(-old_ptr); + } + + DBUG_DUMP("record", record, table->s->reclength); + DBUG_RETURN(0); +} + +static bool emit_key_part_name(String *to, KEY_PART_INFO *part) +{ + DBUG_ENTER("emit_key_part_name"); + if (to->append(FEDERATED_BTICK) || + to->append(part->field->field_name) || + to->append(FEDERATED_BTICK)) + DBUG_RETURN(1); // Out of memory + DBUG_RETURN(0); +} + +static bool emit_key_part_element(String *to, KEY_PART_INFO *part, + bool needs_quotes, bool is_like, + const byte *ptr, uint len) +{ + Field *field= part->field; + DBUG_ENTER("emit_key_part_element"); + + if (needs_quotes && to->append(FEDERATED_SQUOTE)) + DBUG_RETURN(1); + + if (part->type == HA_KEYTYPE_BIT) + { + char buff[STRING_BUFFER_USUAL_SIZE], *buf= buff; + + *buf++= '0'; + *buf++= 'x'; + for (; len; ptr++,len--) + { + uint tmp= (uint)(uchar) *ptr; + *buf++= _dig_vec_upper[tmp >> 4]; + *buf++= _dig_vec_upper[tmp & 15]; + } + if (to->append(buff, (uint)(buf - buff))) + DBUG_RETURN(1); } + else if (part->key_part_flag & HA_BLOB_PART) + { + String blob; + uint blob_length= uint2korr(ptr); + blob.set_quick((char*) ptr+HA_KEY_BLOB_LENGTH, + blob_length, &my_charset_bin); + if (append_escaped(to, &blob)) + DBUG_RETURN(1); + } + else if (part->key_part_flag & HA_VAR_LENGTH_PART) + { + String varchar; + uint var_length= uint2korr(ptr); + varchar.set_quick((char*) ptr+HA_KEY_BLOB_LENGTH, + var_length, &my_charset_bin); + if (append_escaped(to, &varchar)) + DBUG_RETURN(1); + } + else + { + char strbuff[MAX_FIELD_WIDTH]; + String str(strbuff, sizeof(strbuff), part->field->charset()), *res; + + res= field->val_str(&str, (char *)ptr); + + if (field->result_type() == STRING_RESULT) + { + if (append_escaped(to, res)) + DBUG_RETURN(1); + } + else if (to->append(res->ptr(), res->length())) + DBUG_RETURN(1); + } + + if (is_like && to->append(FEDERATED_PERCENT)) + DBUG_RETURN(1); + + if (needs_quotes && to->append(FEDERATED_SQUOTE)) + DBUG_RETURN(1); DBUG_RETURN(0); } @@ -716,6 +854,8 @@ uint ha_federated::convert_row_to_internal_format(byte *record, MYSQL_ROW row) key_info KEY struct pointer key byte pointer containing key key_length length of key + range_type 0 - no range, 1 - min range, 2 - max range + (see enum range_operation) DESCRIPTION Using iteration through all the keys via a KEY_PART_INFO pointer, @@ -726,112 +866,402 @@ uint ha_federated::convert_row_to_internal_format(byte *record, MYSQL_ROW row) 0 After all keys have been accounted for to create the WHERE clause 1 No keys found - */ + Range flags Table per Timour: + + ----------------- + - start_key: + * ">" -> HA_READ_AFTER_KEY + * ">=" -> HA_READ_KEY_OR_NEXT + * "=" -> HA_READ_KEY_EXACT + + - end_key: + * "<" -> HA_READ_BEFORE_KEY + * "<=" -> HA_READ_AFTER_KEY + + records_in_range: + ----------------- + - start_key: + * ">" -> HA_READ_AFTER_KEY + * ">=" -> HA_READ_KEY_EXACT + * "=" -> HA_READ_KEY_EXACT + + - end_key: + * "<" -> HA_READ_BEFORE_KEY + * "<=" -> HA_READ_AFTER_KEY + * "=" -> HA_READ_AFTER_KEY -bool ha_federated::create_where_from_key(String *to, KEY *key_info, - const byte *key, uint key_length) +0 HA_READ_KEY_EXACT, Find first record else error +1 HA_READ_KEY_OR_NEXT, Record or next record +2 HA_READ_KEY_OR_PREV, Record or previous +3 HA_READ_AFTER_KEY, Find next rec. after key-record +4 HA_READ_BEFORE_KEY, Find next rec. before key-record +5 HA_READ_PREFIX, Key which as same prefix +6 HA_READ_PREFIX_LAST, Last key with the same prefix +7 HA_READ_PREFIX_LAST_OR_PREV, Last or prev key with the same prefix + +Flags that I've found: + +id, primary key, varchar + +id = 'ccccc' +records_in_range: start_key 0 end_key 3 +read_range_first: start_key 0 end_key NULL + +id > 'ccccc' +records_in_range: start_key 3 end_key NULL +read_range_first: start_key 3 end_key NULL + +id < 'ccccc' +records_in_range: start_key NULL end_key 4 +read_range_first: start_key NULL end_key 4 + +id <= 'ccccc' +records_in_range: start_key NULL end_key 3 +read_range_first: start_key NULL end_key 3 + +id >= 'ccccc' +records_in_range: start_key 0 end_key NULL +read_range_first: start_key 1 end_key NULL + +id like 'cc%cc' +records_in_range: start_key 0 end_key 3 +read_range_first: start_key 1 end_key 3 + +id > 'aaaaa' and id < 'ccccc' +records_in_range: start_key 3 end_key 4 +read_range_first: start_key 3 end_key 4 + +id >= 'aaaaa' and id < 'ccccc'; +records_in_range: start_key 0 end_key 4 +read_range_first: start_key 1 end_key 4 + +id >= 'aaaaa' and id <= 'ccccc'; +records_in_range: start_key 0 end_key 3 +read_range_first: start_key 1 end_key 3 + +id > 'aaaaa' and id <= 'ccccc'; +records_in_range: start_key 3 end_key 3 +read_range_first: start_key 3 end_key 3 + +numeric keys: + +id = 4 +index_read_idx: start_key 0 end_key NULL + +id > 4 +records_in_range: start_key 3 end_key NULL +read_range_first: start_key 3 end_key NULL + +id >= 4 +records_in_range: start_key 0 end_key NULL +read_range_first: start_key 1 end_key NULL + +id < 4 +records_in_range: start_key NULL end_key 4 +read_range_first: start_key NULL end_key 4 + +id <= 4 +records_in_range: start_key NULL end_key 3 +read_range_first: start_key NULL end_key 3 + +id like 4 +full table scan, select * from + +id > 2 and id < 8 +records_in_range: start_key 3 end_key 4 +read_range_first: start_key 3 end_key 4 + +id >= 2 and id < 8 +records_in_range: start_key 0 end_key 4 +read_range_first: start_key 1 end_key 4 + +id >= 2 and id <= 8 +records_in_range: start_key 0 end_key 3 +read_range_first: start_key 1 end_key 3 + +id > 2 and id <= 8 +records_in_range: start_key 3 end_key 3 +read_range_first: start_key 3 end_key 3 + +multi keys (id int, name varchar, other varchar) + +id = 1; +records_in_range: start_key 0 end_key 3 +read_range_first: start_key 0 end_key NULL + +id > 4; +id > 2 and name = '333'; remote: id > 2 +id > 2 and name > '333'; remote: id > 2 +id > 2 and name > '333' and other < 'ddd'; remote: id > 2 no results +id > 2 and name >= '333' and other < 'ddd'; remote: id > 2 1 result +id >= 4 and name = 'eric was here' and other > 'eeee'; +records_in_range: start_key 3 end_key NULL +read_range_first: start_key 3 end_key NULL + +id >= 4; +id >= 2 and name = '333' and other < 'ddd'; +remote: `id` >= 2 AND `name` >= '333'; +records_in_range: start_key 0 end_key NULL +read_range_first: start_key 1 end_key NULL + +id < 4; +id < 3 and name = '222' and other <= 'ccc'; remote: id < 3 +records_in_range: start_key NULL end_key 4 +read_range_first: start_key NULL end_key 4 + +id <= 4; +records_in_range: start_key NULL end_key 3 +read_range_first: start_key NULL end_key 3 + +id like 4; +full table scan + +id > 2 and id < 4; +records_in_range: start_key 3 end_key 4 +read_range_first: start_key 3 end_key 4 + +id >= 2 and id < 4; +records_in_range: start_key 0 end_key 4 +read_range_first: start_key 1 end_key 4 + +id >= 2 and id <= 4; +records_in_range: start_key 0 end_key 3 +read_range_first: start_key 1 end_key 3 + +id > 2 and id <= 4; +id = 6 and name = 'eric was here' and other > 'eeee'; +remote: (`id` > 6 AND `name` > 'eric was here' AND `other` > 'eeee') +AND (`id` <= 6) AND ( AND `name` <= 'eric was here') +no results +records_in_range: start_key 3 end_key 3 +read_range_first: start_key 3 end_key 3 + +Summary: + +* If the start key flag is 0 the max key flag shouldn't even be set, + and if it is, the query produced would be invalid. +* Multipart keys, even if containing some or all numeric columns, + are treated the same as non-numeric keys + + If the query is " = " (quotes or not): + - records in range start key flag HA_READ_KEY_EXACT, + end key flag HA_READ_AFTER_KEY (incorrect) + - any other: start key flag HA_READ_KEY_OR_NEXT, + end key flag HA_READ_AFTER_KEY (correct) + +* 'like' queries (of key) + - Numeric, full table scan + - Non-numeric + records_in_range: start_key 0 end_key 3 + other : start_key 1 end_key 3 + +* If the key flag is HA_READ_AFTER_KEY: + if start_key, append > + if end_key, append <= + +* If create_where_key was called by records_in_range: + + - if the key is numeric: + start key flag is 0 when end key is NULL, end key flag is 3 or 4 + - if create_where_key was called by any other function: + start key flag is 1 when end key is NULL, end key flag is 3 or 4 + - if the key is non-numeric, or multipart + When the query is an exact match, the start key flag is 0, + end key flag is 3 for what should be a no-range condition where + you should have 0 and max key NULL, which it is if called by + read_range_first + +Conclusion: + +1. Need logic to determin if a key is min or max when the flag is +HA_READ_AFTER_KEY, and handle appending correct operator accordingly + +2. Need a boolean flag to pass to create_where_from_key, used in the +switch statement. Add 1 to the flag if: + - start key flag is HA_READ_KEY_EXACT and the end key is NULL + +*/ + +bool ha_federated::create_where_from_key(String *to, + KEY *key_info, + const key_range *start_key, + const key_range *end_key, + bool records_in_range) { - uint second_loop= 0; - KEY_PART_INFO *key_part; - bool needs_quotes; - String tmp; + bool both_not_null= + (start_key != NULL && end_key != NULL) ? TRUE : FALSE; + const byte *ptr; + uint remainder, length; + char tmpbuff[FEDERATED_QUERY_BUFFER_SIZE]; + String tmp(tmpbuff, sizeof(tmpbuff), system_charset_info); + const key_range *ranges[2]= { start_key, end_key }; DBUG_ENTER("ha_federated::create_where_from_key"); - for (key_part= key_info->key_part; (int) key_length > 0; key_part++) - { - Field *field= key_part->field; - needs_quotes= field->needs_quotes(); - uint length= key_part->length; + tmp.length(0); + if (start_key == NULL && end_key == NULL) + DBUG_RETURN(1); - if (second_loop++ && to->append(" AND ", 5)) - DBUG_RETURN(1); - if (to->append('`') || to->append(field->field_name) || to->append("` ", 2)) - DBUG_RETURN(1); // Out of memory + for (int i= 0; i <= 1; i++) + { + bool needs_quotes; + uint loop_counter= 0; + KEY_PART_INFO *key_part; + if (ranges[i] == NULL) + continue; + const byte *key= ranges[i]->key; + uint key_length= ranges[i]->length; - if (key_part->null_bit) + if (both_not_null) { - if (*key++) - { - if (to->append("IS NULL", 7)) - DBUG_RETURN(1); - - DBUG_PRINT("info", - ("NULL type %s", to->c_ptr_quick())); - key_length-= key_part->store_length; - key+= key_part->store_length - 1; - continue; - } - key_length--; + if (i > 0) + tmp.append(FEDERATED_CONJUNCTION); + else + tmp.append(FEDERATED_OPENPAREN); } - if (to->append("= ")) - DBUG_RETURN(1); - if (needs_quotes && to->append("'")) - DBUG_RETURN(1); - if (key_part->type == HA_KEYTYPE_BIT) - { - /* This is can be treated as a hex string */ - Field_bit *field= (Field_bit *) (key_part->field); - char buff[64 + 2], *ptr; - byte *end= (byte*)(key)+length; - - buff[0]= '0'; - buff[1]= 'x'; - for (ptr= buff + 2; key < end; key++) - { - uint tmp= (uint)(uchar) *key; - *ptr++= _dig_vec_upper[tmp >> 4]; - *ptr++= _dig_vec_upper[tmp & 15]; - } - if (to->append(buff, (uint)(ptr - buff))) - DBUG_RETURN(1); - key_length-= length; - continue; - } - if (key_part->key_part_flag & HA_BLOB_PART) + for (key_part= key_info->key_part, + remainder= key_info->key_parts, + length= ranges[i]->length, + ptr= ranges[i]->key; ; + remainder--, + key_part++) { - uint blob_length= uint2korr(key); - key+= HA_KEY_BLOB_LENGTH; - key_length-= HA_KEY_BLOB_LENGTH; + Field *field= key_part->field; + uint store_length= key_part->store_length; + uint part_length= min(store_length, length); + needs_quotes= field->needs_quotes(); + DBUG_DUMP("key, start of loop", (char *) ptr, length); - tmp.set_quick((char*) key, blob_length, &my_charset_bin); - if (append_escaped(to, &tmp)) - DBUG_RETURN(1); + if (key_part->null_bit) + { + if (*ptr++) + { + if (emit_key_part_name(&tmp, key_part) || + tmp.append(FEDERATED_ISNULL)) + DBUG_RETURN(1); + continue; + } + } - length= key_part->length; - } - else if (key_part->key_part_flag & HA_VAR_LENGTH_PART) - { - length= uint2korr(key); - key+= HA_KEY_BLOB_LENGTH; - tmp.set_quick((char*) key, length, &my_charset_bin); - if (append_escaped(to, &tmp)) + if (tmp.append(FEDERATED_OPENPAREN)) DBUG_RETURN(1); - } - else - { - char buff[MAX_FIELD_WIDTH]; - String str(buff, sizeof(buff), field->charset()), *res; - res= field->val_str(&str, (char*) (key)); - if (field->result_type() == STRING_RESULT) - { - if (append_escaped(to, res)) + switch(ranges[i]->flag) { + case(HA_READ_KEY_EXACT): + if (store_length >= length || + !needs_quotes || + key_part->type == HA_KEYTYPE_BIT || + field->result_type() != STRING_RESULT) + { + if (emit_key_part_name(&tmp, key_part)) + DBUG_RETURN(1); + + if (records_in_range) + { + if (tmp.append(FEDERATED_GE)) + DBUG_RETURN(1); + } + else + { + if (tmp.append(FEDERATED_EQ)) + DBUG_RETURN(1); + } + + if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr, + part_length)) + DBUG_RETURN(1); + } + else + /* LIKE */ + { + if (emit_key_part_name(&tmp, key_part) || + tmp.append(FEDERATED_LIKE) || + emit_key_part_element(&tmp, key_part, needs_quotes, 1, ptr, + part_length)) + DBUG_RETURN(1); + } + break; + case(HA_READ_AFTER_KEY): + if (store_length >= length) /* end key */ + { + if (emit_key_part_name(&tmp, key_part)) + DBUG_RETURN(1); + + if (i > 0) /* end key */ + { + if (tmp.append(FEDERATED_LE)) + DBUG_RETURN(1); + } + else /* start key */ + { + if (tmp.append(FEDERATED_GT)) + DBUG_RETURN(1); + } + + if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr, + part_length)) + { + DBUG_RETURN(1); + } + break; + } + case(HA_READ_KEY_OR_NEXT): + if (emit_key_part_name(&tmp, key_part) || + tmp.append(FEDERATED_GE) || + emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr, + part_length)) + DBUG_RETURN(1); + break; + case(HA_READ_BEFORE_KEY): + if (store_length >= length) + { + if (emit_key_part_name(&tmp, key_part) || + tmp.append(FEDERATED_LT) || + emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr, + part_length)) + DBUG_RETURN(1); + break; + } + case(HA_READ_KEY_OR_PREV): + if (emit_key_part_name(&tmp, key_part) || + tmp.append(FEDERATED_LE) || + emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr, + part_length)) DBUG_RETURN(1); - res= field->val_str(&str, (char*) (key)); + break; + default: + DBUG_PRINT("info",("cannot handle flag %d", ranges[i]->flag)); + DBUG_RETURN(1); } - else if (to->append(res->ptr(), res->length())) + if (tmp.append(FEDERATED_CLOSEPAREN)) DBUG_RETURN(1); + +next_loop: + if (store_length >= length) + break; + DBUG_PRINT("info", ("remainder %d", remainder)); + DBUG_ASSERT(remainder > 1); + length-= store_length; + ptr+= store_length; + if (tmp.append(FEDERATED_AND)) + DBUG_RETURN(1); + + DBUG_PRINT("info", + ("create_where_from_key WHERE clause: %s", + tmp.c_ptr_quick())); } - if (needs_quotes && to->append("'")) - DBUG_RETURN(1); - DBUG_PRINT("info", - ("final value for 'to' %s", to->c_ptr_quick())); - key+= length; - key_length-= length; - DBUG_RETURN(0); } - DBUG_RETURN(1); + if (both_not_null) + if (tmp.append(FEDERATED_CLOSEPAREN)) + DBUG_RETURN(1); + + if (to->append(FEDERATED_WHERE)) + DBUG_RETURN(1); + + if (to->append(tmp)) + DBUG_RETURN(1); + + DBUG_RETURN(0); } /* @@ -842,40 +1272,47 @@ bool ha_federated::create_where_from_key(String *to, KEY *key_info, static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table) { - FEDERATED_SHARE *share; - char query_buffer[IO_SIZE]; + char *select_query, *tmp_table_name; + char query_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + uint tmp_table_name_length; + Field **field; String query(query_buffer, sizeof(query_buffer), &my_charset_bin); + FEDERATED_SHARE *share; + /* + In order to use this string, we must first zero it's length, + or it will contain garbage + */ query.length(0); - uint table_name_length, table_base_name_length; - char *tmp_table_name, *table_base_name, *select_query; - - /* share->table_name has the file location - we want the table's name! */ - table_base_name= (char*) table->s->table_name; - DBUG_PRINT("info", ("table_name %s", table_base_name)); - /* - So why does this exist? There is no way currently to init a storage engine. - Innodb and BDB both have modifications to the server to allow them to - do this. Since you will not want to do this, this is probably the next - best method. - */ pthread_mutex_lock(&federated_mutex); - table_name_length= (uint) strlen(table_name); - table_base_name_length= (uint) strlen(table_base_name); + tmp_table_name= (char *)table->s->table_name; + tmp_table_name_length= (uint) strlen(tmp_table_name); if (!(share= (FEDERATED_SHARE *) hash_search(&federated_open_tables, (byte*) table_name, - table_name_length))) + strlen(table_name)))) { query.set_charset(system_charset_info); - query.append("SELECT * FROM `"); + query.append(FEDERATED_SELECT); + for (field= table->field; *field; field++) + { + query.append(FEDERATED_BTICK); + query.append((*field)->field_name); + query.append(FEDERATED_BTICK); + query.append(FEDERATED_COMMA); + } + query.length(query.length()- strlen(FEDERATED_COMMA)); + query.append(FEDERATED_FROM); + query.append(FEDERATED_BTICK); + if (!(share= (FEDERATED_SHARE *) - my_multi_malloc(MYF(MY_WME | MY_ZEROFILL), + my_multi_malloc(MYF(MY_WME), &share, sizeof(*share), - &tmp_table_name, table_name_length + 1, - &select_query, query.length() + - strlen(table->s->comment) + 1, NullS))) + &tmp_table_name, tmp_table_name_length+ 1, + &select_query, + query.length()+strlen(table->s->comment)+1, + NullS))) { pthread_mutex_unlock(&federated_mutex); return NULL; @@ -884,16 +1321,15 @@ static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table) if (parse_url(share, table, 0)) goto error; - query.append(share->table_base_name); - query.append("`"); - share->use_count= 0; - share->table_name_length= table_name_length; - share->table_name= tmp_table_name; + query.append(share->table_name, share->table_name_length); + query.append(FEDERATED_BTICK); share->select_query= select_query; - strmov(share->table_name, table_name); strmov(share->select_query, query.ptr()); + share->use_count= 0; + share->table_name_length= strlen(share->table_name); DBUG_PRINT("info", ("share->select_query %s", share->select_query)); + if (my_hash_insert(&federated_open_tables, (byte*) share)) goto error; thr_lock_init(&share->lock); @@ -907,9 +1343,10 @@ static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table) error: pthread_mutex_unlock(&federated_mutex); if (share->scheme) + { my_free((gptr) share->scheme, MYF(0)); - my_free((gptr) share, MYF(0)); - + share->scheme= 0; + } return NULL; } @@ -922,12 +1359,16 @@ error: static int free_share(FEDERATED_SHARE *share) { + DBUG_ENTER("free_share"); pthread_mutex_lock(&federated_mutex); if (!--share->use_count) { if (share->scheme) + { my_free((gptr) share->scheme, MYF(0)); + share->scheme= 0; + } hash_delete(&federated_open_tables, (byte*) share); thr_lock_delete(&share->lock); @@ -936,23 +1377,37 @@ static int free_share(FEDERATED_SHARE *share) } pthread_mutex_unlock(&federated_mutex); - return 0; + DBUG_RETURN(0); } +ha_rows ha_federated::records_in_range(uint inx, key_range *start_key, + key_range *end_key) +{ + /* + + We really want indexes to be used as often as possible, therefore + we just need to hard-code the return value to a very low number to + force the issue + +*/ + DBUG_ENTER("ha_federated::records_in_range"); + DBUG_RETURN(FEDERATED_RECORDS_IN_RANGE); +} /* If frm_error() is called then we will use this to to find out what file extentions exist for the storage engine. This is also used by the default rename_table and delete_table method in handler.cc. */ -static const char *ha_federated_exts[] = { - NullS -}; const char **ha_federated::bas_ext() const { - return ha_federated_exts; + static const char *ext[]= + { + NullS + }; + return ext; } @@ -969,6 +1424,7 @@ const char **ha_federated::bas_ext() const int ha_federated::open(const char *name, int mode, uint test_if_locked) { + int rc; DBUG_ENTER("ha_federated::open"); if (!(share= get_share(name, table))) @@ -977,11 +1433,6 @@ int ha_federated::open(const char *name, int mode, uint test_if_locked) /* Connect to foreign database mysql_real_connect() */ mysql= mysql_init(0); - DBUG_PRINT("info", ("hostname %s", share->hostname)); - DBUG_PRINT("info", ("username %s", share->username)); - DBUG_PRINT("info", ("password %s", share->password)); - DBUG_PRINT("info", ("database %s", share->database)); - DBUG_PRINT("info", ("port %d", share->port)); if (!mysql_real_connect(mysql, share->hostname, share->username, @@ -990,8 +1441,13 @@ int ha_federated::open(const char *name, int mode, uint test_if_locked) share->port, share->socket, 0)) { - my_error(ER_CONNECT_TO_MASTER, MYF(0), mysql_error(mysql)); - DBUG_RETURN(ER_CONNECT_TO_MASTER); + int error_code; + char error_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + error_code= ER_CONNECT_TO_FOREIGN_DATA_SOURCE; + my_sprintf(error_buffer, (error_buffer, ": %d : %s", + mysql_errno(mysql), mysql_error(mysql))); + my_error(error_code, MYF(0), error_buffer); + DBUG_RETURN(error_code); } /* Since we do not support transactions at this version, we can let the client @@ -1016,19 +1472,21 @@ int ha_federated::open(const char *name, int mode, uint test_if_locked) int ha_federated::close(void) { + int retval; DBUG_ENTER("ha_federated::close"); /* free the result set */ - if (result) + if (stored_result) { DBUG_PRINT("info", - ("mysql_free_result result at address %lx", result)); - mysql_free_result(result); - result= 0; + ("mysql_free_result result at address %lx", stored_result)); + mysql_free_result(stored_result); + stored_result= 0; } /* Disconnect from mysql */ mysql_close(mysql); - DBUG_RETURN(free_share(share)); + retval= free_share(share); + DBUG_RETURN(retval); } @@ -1085,61 +1543,71 @@ inline uint field_in_record_is_null(TABLE *table, int ha_federated::write_row(byte *buf) { - uint x, num_fields; + bool has_fields= FALSE; + char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + char values_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE]; Field **field; - char insert_buffer[IO_SIZE]; - char values_buffer[IO_SIZE], insert_field_value_buffer[IO_SIZE]; - /* The main insert query string */ String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin); - insert_string.length(0); /* The string containing the values to be added to the insert */ String values_string(values_buffer, sizeof(values_buffer), &my_charset_bin); - values_string.length(0); /* The actual value of the field, to be added to the values_string */ String insert_field_value_string(insert_field_value_buffer, sizeof(insert_field_value_buffer), &my_charset_bin); + values_string.length(0); + insert_string.length(0); insert_field_value_string.length(0); DBUG_ENTER("ha_federated::write_row"); - DBUG_PRINT("info", ("table charset name %s csname %s", - table->s->table_charset->name, table->s->table_charset->csname)); + DBUG_PRINT("info", + ("table charset name %s csname %s", + table->s->table_charset->name, + table->s->table_charset->csname)); statistic_increment(table->in_use->status_var.ha_write_count, &LOCK_status); if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT) table->timestamp_field->set_time(); - /* start off our string */ - insert_string.append("INSERT INTO `"); - insert_string.append(share->table_base_name); - insert_string.append("`"); - /* start both our field and field values strings */ - insert_string.append(" ("); - values_string.append(" VALUES ("); + /* + start both our field and field values strings + */ + insert_string.append(FEDERATED_INSERT); + insert_string.append(FEDERATED_BTICK); + insert_string.append(share->table_name, share->table_name_length); + insert_string.append(FEDERATED_BTICK); + insert_string.append(FEDERATED_OPENPAREN); + + values_string.append(FEDERATED_VALUES); + values_string.append(FEDERATED_OPENPAREN); + /* loop through the field pointer array, add any fields to both the values list and the fields list that is part of the write set + + You might ask "Why an index variable (has_fields) ?" My answer is that + we need to count how many fields we actually need */ - for (num_fields= 0, field= table->field; *field; field++) + for (field= table->field; *field; field++) { /* if there is a query id and if it's equal to the current query id */ if (ha_get_bit_in_write_set((*field)->fieldnr)) { - num_fields++; + /* + There are some fields. This will be used later to determine + whether to chop off commas and parens. + */ + has_fields= TRUE; if ((*field)->is_null()) - { - DBUG_PRINT("info", ("column %d field is_null", num_fields-1)); - insert_field_value_string.append("NULL"); - } + insert_field_value_string.append(FEDERATED_NULL); else { - DBUG_PRINT("info", ("column %d field is not null", num_fields-1)); (*field)->val_str(&insert_field_value_string); /* quote these fields if they require it */ - (*field)->quote_data(&insert_field_value_string); + (*field)->quote_data(&insert_field_value_string); } /* append the field name */ insert_string.append((*field)->field_name); @@ -1149,34 +1617,35 @@ int ha_federated::write_row(byte *buf) insert_field_value_string.length(0); /* append commas between both fields and fieldnames */ - insert_string.append(','); - values_string.append(','); - + /* + unfortunately, we can't use the logic + if *(fields + 1) to make the following + appends conditional because we may not append + if the next field doesn't match the condition: + (((*field)->query_id && (*field)->query_id == current_query_id) + */ + insert_string.append(FEDERATED_COMMA); + values_string.append(FEDERATED_COMMA); } } /* - chop of the trailing comma, or if there were no fields, a '(' - So, "INSERT INTO foo (" becomes "INSERT INTO foo " - or, with fields, "INSERT INTO foo (field1, field2," becomes - "INSERT INTO foo (field1, field2" + remove trailing comma */ - insert_string.chop(); - + insert_string.length(insert_string.length() - strlen(FEDERATED_COMMA)); /* if there were no fields, we don't want to add a closing paren AND, we don't want to chop off the last char '(' insert will be "INSERT INTO t1 VALUES ();" */ - DBUG_PRINT("info", ("x %d num fields %d", x, num_fields)); - if (num_fields > 0) + if (has_fields) { /* chops off leading commas */ - values_string.chop(); - insert_string.append(')'); + values_string.length(values_string.length() - strlen(FEDERATED_COMMA)); + insert_string.append(FEDERATED_CLOSEPAREN); } /* we always want to append this, even if there aren't any fields */ - values_string.append(')'); + values_string.append(FEDERATED_CLOSEPAREN); /* add the values */ insert_string.append(values_string); @@ -1185,8 +1654,69 @@ int ha_federated::write_row(byte *buf) if (mysql_real_query(mysql, insert_string.ptr(), insert_string.length())) { - my_error(ER_QUERY_ON_MASTER, MYF(0), mysql_error(mysql)); - DBUG_RETURN(ER_QUERY_ON_MASTER); + int error_code; + char error_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + error_code= ER_QUERY_ON_FOREIGN_DATA_SOURCE; + my_sprintf(error_buffer, (error_buffer, ": %d : %s", + mysql_errno(mysql), mysql_error(mysql))); + my_error(error_code, MYF(0), error_buffer); + DBUG_RETURN(error_code); + } + + DBUG_RETURN(0); +} + + +int ha_federated::optimize(THD* thd, HA_CHECK_OPT* check_opt) +{ + char query_buffer[STRING_BUFFER_USUAL_SIZE]; + String query(query_buffer, sizeof(query_buffer), &my_charset_bin); + + DBUG_ENTER("ha_federated::optimize"); + + query.length(0); + + query.set_charset(system_charset_info); + query.append(FEDERATED_OPTIMIZE); + query.append(FEDERATED_BTICK); + query.append(share->table_name, share->table_name_length); + query.append(FEDERATED_BTICK); + + if (mysql_real_query(mysql, query.ptr(), query.length())) + { + my_error(-1, MYF(0), mysql_error(mysql)); + DBUG_RETURN(-1); + } + + DBUG_RETURN(0); +} + + +int ha_federated::repair(THD* thd, HA_CHECK_OPT* check_opt) +{ + char query_buffer[STRING_BUFFER_USUAL_SIZE]; + String query(query_buffer, sizeof(query_buffer), &my_charset_bin); + + DBUG_ENTER("ha_federated::repair"); + + query.length(0); + + query.set_charset(system_charset_info); + query.append(FEDERATED_REPAIR); + query.append(FEDERATED_BTICK); + query.append(share->table_name, share->table_name_length); + query.append(FEDERATED_BTICK); + if (check_opt->flags & T_QUICK) + query.append(FEDERATED_QUICK); + if (check_opt->flags & T_EXTEND) + query.append(FEDERATED_EXTENDED); + if (check_opt->sql_flags & TT_USEFRM) + query.append(FEDERATED_USE_FRM); + + if (mysql_real_query(mysql, query.ptr(), query.length())) + { + my_error(-1, MYF(0), mysql_error(mysql)); + DBUG_RETURN(-1); } DBUG_RETURN(0); @@ -1212,39 +1742,57 @@ int ha_federated::write_row(byte *buf) int ha_federated::update_row(const byte *old_data, byte *new_data) { - uint x= 0; - uint has_a_primary_key= 0; - uint primary_key_field_num; - char old_field_value_buffer[IO_SIZE], new_field_value_buffer[IO_SIZE]; - char update_buffer[IO_SIZE], where_buffer[IO_SIZE]; + /* + This used to control how the query was built. If there was a primary key, + the query would be built such that there was a where clause with only + that column as the condition. This is flawed, because if we have a multi-part + primary key, it would only use the first part! We don't need to do this anyway, + because read_range_first will retrieve the correct record, which is what is used + to build the WHERE clause. We can however use this to append a LIMIT to the end + if there is NOT a primary key. Why do this? Because we only are updating one + record, and LIMIT enforces this. + */ + bool has_a_primary_key= (table->s->primary_key == 0 ? TRUE : FALSE); + /* + buffers for following strings + */ + char error_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + char old_field_value_buffer[STRING_BUFFER_USUAL_SIZE]; + char new_field_value_buffer[STRING_BUFFER_USUAL_SIZE]; + char update_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + char where_buffer[FEDERATED_QUERY_BUFFER_SIZE]; /* stores the value to be replaced of the field were are updating */ - String old_field_value(old_field_value_buffer, sizeof(old_field_value_buffer), + String old_field_value(old_field_value_buffer, + sizeof(old_field_value_buffer), &my_charset_bin); /* stores the new value of the field */ - String new_field_value(new_field_value_buffer, sizeof(new_field_value_buffer), + String new_field_value(new_field_value_buffer, + sizeof(new_field_value_buffer), &my_charset_bin); /* stores the update query */ - String update_string(update_buffer, sizeof(update_buffer), &my_charset_bin); + String update_string(update_buffer, + sizeof(update_buffer), + &my_charset_bin); /* stores the WHERE clause */ - String where_string(where_buffer, sizeof(where_buffer), &my_charset_bin); + String where_string(where_buffer, + sizeof(where_buffer), + &my_charset_bin); DBUG_ENTER("ha_federated::update_row"); + /* + set string lengths to 0 to avoid misc chars in string + */ old_field_value.length(0); new_field_value.length(0); update_string.length(0); where_string.length(0); - has_a_primary_key= (table->s->primary_key == 0 ? 1 : 0); - primary_key_field_num= has_a_primary_key ? - table->key_info[table->s->primary_key].key_part->fieldnr - 1 : -1; - if (has_a_primary_key) - DBUG_PRINT("info", ("has a primary key")); - - update_string.append("UPDATE `"); - update_string.append(share->table_base_name); - update_string.append("`"); - update_string.append(" SET "); + update_string.append(FEDERATED_UPDATE); + update_string.append(FEDERATED_BTICK); + update_string.append(share->table_name); + update_string.append(FEDERATED_BTICK); + update_string.append(FEDERATED_SET); /* In this loop, we want to match column names to values being inserted @@ -1256,104 +1804,73 @@ int ha_federated::update_row(const byte *old_data, byte *new_data) field=oldvalue */ - for (Field **field= table->field; *field; field++, x++) + for (Field **field= table->field; *field; field++) { - /* - In all of these tests for 'has_a_primary_key', what I'm trying to - accomplish is to only use the primary key in the WHERE clause if the - table has a primary key, as opposed to a table without a primary key - in which case we have to use all the fields to create a WHERE clause - using the old/current values, as well as adding a LIMIT statement - */ - if (has_a_primary_key) - { - if (x == primary_key_field_num) - where_string.append((*field)->field_name); - } - else - where_string.append((*field)->field_name); - + where_string.append((*field)->field_name); update_string.append((*field)->field_name); - update_string.append('='); + update_string.append(FEDERATED_EQ); if ((*field)->is_null()) - { - DBUG_PRINT("info", ("column %d is NULL", x )); - new_field_value.append("NULL"); - } + new_field_value.append(FEDERATED_NULL); else { /* otherwise = */ (*field)->val_str(&new_field_value); (*field)->quote_data(&new_field_value); - if (has_a_primary_key) - { - if (x == primary_key_field_num) - where_string.append("="); - } - else if (!field_in_record_is_null(table, *field, (char*) old_data)) - where_string.append("="); + if (!field_in_record_is_null(table, *field, (char*) old_data)) + where_string.append(FEDERATED_EQ); } - if (has_a_primary_key) - { - if (x == primary_key_field_num) - { - (*field)->val_str(&old_field_value, - (char*) (old_data + (*field)->offset())); - (*field)->quote_data(&old_field_value); - where_string.append(old_field_value); - } - } + if (field_in_record_is_null(table, *field, (char*) old_data)) + where_string.append(FEDERATED_ISNULL); else { - if (field_in_record_is_null(table, *field, (char*) old_data)) - where_string.append(" IS NULL "); - else - { - uint o_len; - (*field)->val_str(&old_field_value, - (char*) (old_data + (*field)->offset())); - o_len= (*field)->pack_length(); - DBUG_PRINT("info", ("o_len %lu", o_len)); - (*field)->quote_data(&old_field_value); - where_string.append(old_field_value); - } + uint o_len; + (*field)->val_str(&old_field_value, + (char*) (old_data + (*field)->offset())); + o_len= (*field)->pack_length(); + (*field)->quote_data(&old_field_value); + where_string.append(old_field_value); } - DBUG_PRINT("info", - ("column %d new value %s old value %s", - x, new_field_value.c_ptr_quick(), old_field_value.c_ptr_quick() )); + update_string.append(new_field_value); new_field_value.length(0); - if (x + 1 < table->s->fields) + /* + Only append conjunctions if we have another field in which + to iterate + */ + if (*(field + 1)) { - update_string.append(", "); - if (!has_a_primary_key) - where_string.append(" AND "); + update_string.append(FEDERATED_COMMA); + where_string.append(FEDERATED_AND); } old_field_value.length(0); } - update_string.append(" WHERE "); + update_string.append(FEDERATED_WHERE); update_string.append(where_string); - if (! has_a_primary_key) - update_string.append(" LIMIT 1"); + /* + If this table has not a primary key, then we could possibly + update multiple rows. We want to make sure to only update one! + */ + if (!has_a_primary_key) + update_string.append(FEDERATED_LIMIT1); - DBUG_PRINT("info", ("Final update query: %s", - update_string.c_ptr_quick())); if (mysql_real_query(mysql, update_string.ptr(), update_string.length())) { - my_error(ER_QUERY_ON_MASTER, MYF(0), mysql_error(mysql)); - DBUG_RETURN(ER_QUERY_ON_MASTER); + int error_code= ER_QUERY_ON_FOREIGN_DATA_SOURCE; + char error_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + my_sprintf(error_buffer, (error_buffer, ": %d : %s", + mysql_errno(mysql), mysql_error(mysql))); + my_error(error_code, MYF(0), error_buffer); + DBUG_RETURN(error_code); } - - DBUG_RETURN(0); } /* - This will delete a row. 'buf' will contain a copy of the row to be deleted. + This will delete a row. 'buf' will contain a copy of the row to be =deleted. The server will call this right after the current row has been called (from either a previous rnd_nexT() or index call). If you keep a pointer to the last row or can access a primary key it will @@ -1369,49 +1886,60 @@ int ha_federated::update_row(const byte *old_data, byte *new_data) int ha_federated::delete_row(const byte *buf) { - char delete_buffer[IO_SIZE]; - char data_buffer[IO_SIZE]; + char delete_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + char data_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + String delete_string(delete_buffer, sizeof(delete_buffer), &my_charset_bin); String data_string(data_buffer, sizeof(data_buffer), &my_charset_bin); + delete_string.length(0); + data_string.length(0); + DBUG_ENTER("ha_federated::delete_row"); - delete_string.length(0); - delete_string.append("DELETE FROM `"); - delete_string.append(share->table_base_name); - delete_string.append("`"); - delete_string.append(" WHERE "); + delete_string.append(FEDERATED_DELETE); + delete_string.append(FEDERATED_FROM); + delete_string.append(FEDERATED_BTICK); + delete_string.append(share->table_name); + delete_string.append(FEDERATED_BTICK); + delete_string.append(FEDERATED_WHERE); for (Field **field= table->field; *field; field++) { - Field *cur_field= *field; - data_string.length(0); - delete_string.append(cur_field->field_name); + delete_string.append((*field)->field_name); - if (cur_field->is_null_in_record((const uchar*) buf)) + if ((*field)->is_null()) { - delete_string.append(" IS "); - data_string.append("NULL"); + delete_string.append(FEDERATED_IS); + data_string.append(FEDERATED_NULL); } else { - delete_string.append("="); - cur_field->val_str(&data_string, (char*) buf+ cur_field->offset()); - cur_field->quote_data(&data_string); + delete_string.append(FEDERATED_EQ); + (*field)->val_str(&data_string); + (*field)->quote_data(&data_string); } delete_string.append(data_string); - delete_string.append(" AND "); + data_string.length(0); + + if (*(field + 1)) + delete_string.append(FEDERATED_AND); } - delete_string.length(delete_string.length()-5); // Remove AND - delete_string.append(" LIMIT 1"); + delete_string.append(FEDERATED_LIMIT1); DBUG_PRINT("info", ("Delete sql: %s", delete_string.c_ptr_quick())); if (mysql_real_query(mysql, delete_string.ptr(), delete_string.length())) { - my_error(ER_QUERY_ON_MASTER, MYF(0), mysql_error(mysql)); - DBUG_RETURN(ER_QUERY_ON_MASTER); + int error_code= ER_QUERY_ON_FOREIGN_DATA_SOURCE; + char error_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + my_error(error_code, MYF(0), error_buffer); + DBUG_RETURN(error_code); } + deleted+= mysql->affected_rows; + DBUG_PRINT("info", + ("rows deleted %d rows deleted for all time %d", + int(mysql->affected_rows), deleted)); DBUG_RETURN(0); } @@ -1425,12 +1953,12 @@ int ha_federated::delete_row(const byte *buf) */ int ha_federated::index_read(byte *buf, const byte *key, - uint key_len __attribute__ ((unused)), - enum ha_rkey_function find_flag - __attribute__ ((unused))) + uint key_len, enum ha_rkey_function find_flag) { + int retval; DBUG_ENTER("ha_federated::index_read"); - DBUG_RETURN(index_read_idx(buf, active_index, key, key_len, find_flag)); + retval= index_read_idx(buf, active_index, key, key_len, find_flag); + DBUG_RETURN(retval); } @@ -1444,17 +1972,23 @@ int ha_federated::index_read(byte *buf, const byte *key, */ int ha_federated::index_read_idx(byte *buf, uint index, const byte *key, - uint key_len __attribute__ ((unused)), - enum ha_rkey_function find_flag - __attribute__ ((unused))) + uint key_len, enum ha_rkey_function find_flag) { - char index_value[IO_SIZE]; - String index_string(index_value, sizeof(index_value), &my_charset_bin); - index_string.length(0); - uint keylen; + int retval; + char error_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + char index_value[STRING_BUFFER_USUAL_SIZE]; + char key_value[STRING_BUFFER_USUAL_SIZE]; + char test_value[STRING_BUFFER_USUAL_SIZE]; + char sql_query_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + String index_string(index_value, + sizeof(index_value), + &my_charset_bin); + String sql_query(sql_query_buffer, + sizeof(sql_query_buffer), + &my_charset_bin); + key_range range; - char sql_query_buffer[IO_SIZE]; - String sql_query(sql_query_buffer, sizeof(sql_query_buffer), &my_charset_bin); + index_string.length(0); sql_query.length(0); DBUG_ENTER("ha_federated::index_read_idx"); @@ -1463,10 +1997,14 @@ int ha_federated::index_read_idx(byte *buf, uint index, const byte *key, &LOCK_status); sql_query.append(share->select_query); - sql_query.append(" WHERE "); - keylen= strlen((char*) (key)); - create_where_from_key(&index_string, &table->key_info[index], key, keylen); + range.key= key; + range.length= key_len; + range.flag= find_flag; + create_where_from_key(&index_string, + &table->key_info[index], + &range, + NULL, 0); sql_query.append(index_string); DBUG_PRINT("info", @@ -1478,42 +2016,50 @@ int ha_federated::index_read_idx(byte *buf, uint index, const byte *key, ("current position %d sql_query %s", current_position, sql_query.c_ptr_quick())); - if (result) + if (stored_result) { - mysql_free_result(result); - result= 0; + mysql_free_result(stored_result); + stored_result= 0; } if (mysql_real_query(mysql, sql_query.ptr(), sql_query.length())) { - my_error(ER_QUERY_ON_MASTER, MYF(0), mysql_error(mysql)); - DBUG_RETURN(ER_QUERY_ON_MASTER); + my_sprintf(error_buffer, (error_buffer, ": %d : %s", + mysql_errno(mysql), mysql_error(mysql))); + retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE; + goto error; } - result= mysql_store_result(mysql); + stored_result= mysql_store_result(mysql); - if (!result) + if (!stored_result) { - table->status= STATUS_NOT_FOUND; - DBUG_RETURN(HA_ERR_END_OF_FILE); + retval= HA_ERR_END_OF_FILE; + goto error; } + /* + This basically says that the record in table->record[0] is legal, + and that it is ok to use this record, for whatever reason, such + as with a join (without it, joins will not work) + */ + table->status= 0; + + retval= rnd_next(buf); + DBUG_RETURN(retval); - if (mysql_errno(mysql)) +error: + if (stored_result) { - table->status= STATUS_NOT_FOUND; - DBUG_RETURN(mysql_errno(mysql)); + mysql_free_result(stored_result); + stored_result= 0; } - /* - This basically says that the record in table->record[0] is legal, and that it is - ok to use this record, for whatever reason, such as with a join (without it, joins - will not work) - */ - table->status=0; - - DBUG_RETURN(rnd_next(buf)); + table->status= STATUS_NOT_FOUND; + my_error(retval, MYF(0), error_buffer); + DBUG_RETURN(retval); } /* Initialized at each key walk (called multiple times unlike rnd_init()) */ int ha_federated::index_init(uint keynr, bool sorted) { + int error; DBUG_ENTER("ha_federated::index_init"); DBUG_PRINT("info", ("table: '%s' key: %d", table->s->table_name, keynr)); @@ -1521,14 +2067,90 @@ int ha_federated::index_init(uint keynr, bool sorted) DBUG_RETURN(0); } +/* + + int read_range_first(const key_range *start_key, + const key_range *end_key, + bool eq_range, bool sorted); +*/ +int ha_federated::read_range_first(const key_range *start_key, + const key_range *end_key, + bool eq_range, bool sorted) +{ + char sql_query_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + int retval; + String sql_query(sql_query_buffer, + sizeof(sql_query_buffer), + &my_charset_bin); + + DBUG_ENTER("ha_federated::read_range_first"); + if (start_key == NULL && end_key == NULL) + DBUG_RETURN(0); + + sql_query.length(0); + sql_query.append(share->select_query); + create_where_from_key(&sql_query, + &table->key_info[active_index], + start_key, end_key, 0); + + if (mysql_real_query(mysql, sql_query.ptr(), sql_query.length())) + { + retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE; + goto error; + } + sql_query.length(0); + + if (stored_result) + { + DBUG_PRINT("info", + ("mysql_free_result address %lx", stored_result)); + mysql_free_result(stored_result); + stored_result= 0; + } + stored_result= mysql_store_result(mysql); + + if (!stored_result) + { + retval= HA_ERR_END_OF_FILE; + goto error; + } + + /* This was successful, please let it be known! */ + table->status= 0; + + retval= rnd_next(table->record[0]); + DBUG_RETURN(retval); + +error: + table->status= STATUS_NOT_FOUND; + if (stored_result) + { + DBUG_PRINT("info", ("mysql_free_result address %lx", stored_result)); + mysql_free_result(stored_result); + stored_result= 0; + } + DBUG_RETURN(retval); +} + +int ha_federated::read_range_next() +{ + int retval; + DBUG_ENTER("ha_federated::read_range_next"); + retval= rnd_next(table->record[0]); + DBUG_RETURN(retval); +} + + /* Used to read forward through the index. */ int ha_federated::index_next(byte *buf) { + int retval; DBUG_ENTER("ha_federated::index_next"); - DBUG_RETURN(rnd_next(buf)); + statistic_increment(table->in_use->status_var.ha_read_next_count, + &LOCK_status); + retval= rnd_next(buf); + DBUG_RETURN(retval); } - - /* rnd_init() is called when the system wants the storage engine to do a table scan. @@ -1544,12 +2166,15 @@ int ha_federated::index_next(byte *buf) int ha_federated::rnd_init(bool scan) { - DBUG_ENTER("ha_federated::rnd_init"); + int num_fields, rows; + int retval; + char error_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + DBUG_ENTER("ha_federated::rnd_init"); /* - This 'scan' flag is incredibly important for this handler to work - properly, especially with updates containing WHERE clauses using - indexed columns. + The use of the 'scan' flag is incredibly important for this handler + to work properly, especially with updates containing WHERE clauses + using indexed columns. When the initial query contains a WHERE clause of the query using an indexed column, it's index_read_idx that selects the exact record from @@ -1584,40 +2209,48 @@ int ha_federated::rnd_init(bool scan) if (scan) { DBUG_PRINT("info", ("share->select_query %s", share->select_query)); - if (result) + if (stored_result) { DBUG_PRINT("info", - ("mysql_free_result address %lx", result)); - mysql_free_result(result); - result= 0; + ("mysql_free_result address %lx", stored_result)); + mysql_free_result(stored_result); + stored_result= 0; } - if (mysql_real_query - (mysql, share->select_query, strlen(share->select_query))) - { - my_error(ER_QUERY_ON_MASTER, MYF(0), mysql_error(mysql)); - DBUG_RETURN(ER_QUERY_ON_MASTER); - } - result= mysql_store_result(mysql); + if (mysql_real_query(mysql, + share->select_query, + strlen(share->select_query))) + goto error; - if (mysql_errno(mysql)) - DBUG_RETURN(mysql_errno(mysql)); + stored_result= mysql_store_result(mysql); + if (!stored_result) + goto error; } DBUG_RETURN(0); + +error: + retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE; + my_sprintf(error_buffer, (error_buffer, ": %d : %s", + mysql_errno(mysql), mysql_error(mysql))); + my_error(retval, MYF(0), error_buffer); + DBUG_PRINT("info", + ("return error code %d", retval)); + DBUG_RETURN(retval); } int ha_federated::rnd_end() { + int retval; DBUG_ENTER("ha_federated::rnd_end"); - if (result) + + if (stored_result) { - DBUG_PRINT("info", ("mysql_free_result address %lx", result)); - mysql_free_result(result); - result= 0; + DBUG_PRINT("info", ("mysql_free_result address %lx", stored_result)); + mysql_free_result(stored_result); + stored_result= 0; } - - mysql_free_result(result); - DBUG_RETURN(index_end()); + retval= index_end(); + DBUG_RETURN(retval); } int ha_federated::index_end(void) @@ -1639,10 +2272,11 @@ int ha_federated::index_end(void) int ha_federated::rnd_next(byte *buf) { + int retval; MYSQL_ROW row; DBUG_ENTER("ha_federated::rnd_next"); - if (result == 0) + if (stored_result == 0) { /* Return value of rnd_init is not always checked (see records.cc), @@ -1653,12 +2287,13 @@ int ha_federated::rnd_next(byte *buf) } /* Fetch a row, insert it back in a row format. */ - current_position= result->data_cursor; + current_position= stored_result->data_cursor; DBUG_PRINT("info", ("current position %d", current_position)); - if (!(row= mysql_fetch_row(result))) + if (!(row= mysql_fetch_row(stored_result))) DBUG_RETURN(HA_ERR_END_OF_FILE); - DBUG_RETURN(convert_row_to_internal_format(buf, row)); + retval= convert_row_to_internal_format(buf, row); + DBUG_RETURN(retval); } @@ -1705,13 +2340,15 @@ int ha_federated::rnd_pos(byte *buf, byte *pos) */ if (scan_flag) { + int retval; statistic_increment(table->in_use->status_var.ha_read_rnd_count, &LOCK_status); memcpy_fixed(¤t_position, pos, sizeof(MYSQL_ROW_OFFSET)); // pos /* is not aligned */ - result->current_row= 0; - result->data_cursor= current_position; - DBUG_RETURN(rnd_next(buf)); + stored_result->current_row= 0; + stored_result->data_cursor= current_position; + retval= rnd_next(buf); + DBUG_RETURN(retval); } DBUG_RETURN(0); } @@ -1760,12 +2397,91 @@ int ha_federated::rnd_pos(byte *buf, byte *pos) sql_update.cc */ -/* FIX: later version provide better information to the optimizer */ void ha_federated::info(uint flag) { + char error_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + char status_buf[FEDERATED_QUERY_BUFFER_SIZE]; + char escaped_table_name[FEDERATED_QUERY_BUFFER_SIZE]; + int error; + uint error_code; + MYSQL_RES *result= 0; + MYSQL_ROW row; + String status_query_string(status_buf, sizeof(status_buf), &my_charset_bin); + DBUG_ENTER("ha_federated::info"); - records= 10000; // fix later + + error_code= ER_QUERY_ON_FOREIGN_DATA_SOURCE; + /* we want not to show table status if not needed to do so */ + if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST)) + { + status_query_string.length(0); + status_query_string.append(FEDERATED_INFO); + status_query_string.append(FEDERATED_SQUOTE); + + escape_string_for_mysql(&my_charset_bin, (char *)escaped_table_name, + sizeof(escaped_table_name), + share->table_name, + share->table_name_length); + status_query_string.append(escaped_table_name); + status_query_string.append(FEDERATED_SQUOTE); + + if (mysql_real_query(mysql, status_query_string.ptr(), + status_query_string.length())) + goto error; + + status_query_string.length(0); + + result= mysql_store_result(mysql); + if (!result) + goto error; + + if (!mysql_num_rows(result)) + goto error; + + if (!(row= mysql_fetch_row(result))) + goto error; + + if (flag & HA_STATUS_VARIABLE | HA_STATUS_CONST) + { + /* + deleted is set in ha_federated::info + */ + /* + need to figure out what this means as far as federated is concerned, + since we don't have a "file" + + data_file_length = ? + index_file_length = ? + delete_length = ? + */ + if (row[4] != NULL) + records= (ha_rows) my_strtoll10(row[4], (char**) 0, &error); + if (row[5] != NULL) + mean_rec_length= (ha_rows) my_strtoll10(row[5], (char**) 0, &error); + if (row[12] != NULL) + update_time= (ha_rows) my_strtoll10(row[12], (char**) 0, &error); + if (row[13] != NULL) + check_time= (ha_rows) my_strtoll10(row[13], (char**) 0, &error); + } + if (flag & HA_STATUS_CONST) + { + TABLE_SHARE *share= table->s; + block_size= 4096; + } + } + + if (result) + mysql_free_result(result); + + DBUG_VOID_RETURN; + +error: + if (result) + mysql_free_result(result); + my_sprintf(error_buffer, (error_buffer, ": %d : %s", + mysql_errno(mysql), mysql_error(mysql))); + my_error(error_code, MYF(0), error_buffer); DBUG_VOID_RETURN; } @@ -1786,22 +2502,30 @@ int ha_federated::delete_all_rows() { DBUG_ENTER("ha_federated::delete_all_rows"); - char query_buffer[IO_SIZE]; + char query_buffer[FEDERATED_QUERY_BUFFER_SIZE]; String query(query_buffer, sizeof(query_buffer), &my_charset_bin); query.length(0); query.set_charset(system_charset_info); - query.append("TRUNCATE `"); - query.append(share->table_base_name); - query.append("`"); + query.append(FEDERATED_TRUNCATE); + query.append(FEDERATED_BTICK); + query.append(share->table_name); + query.append(FEDERATED_BTICK); + /* + TRUNCATE won't return anything in mysql_affected_rows + */ + deleted+= records; if (mysql_real_query(mysql, query.ptr(), query.length())) { - my_error(ER_QUERY_ON_MASTER, MYF(0), mysql_error(mysql)); - DBUG_RETURN(ER_QUERY_ON_MASTER); + int error_code= ER_QUERY_ON_FOREIGN_DATA_SOURCE; + char error_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + my_sprintf(error_buffer, (error_buffer, ": %d : %s", + mysql_errno(mysql), mysql_error(mysql))); + my_error(error_code, MYF(0), error_buffer); + DBUG_RETURN(error_code); } - - DBUG_RETURN(HA_ERR_WRONG_COMMAND); + DBUG_RETURN(0); } @@ -1849,7 +2573,7 @@ THR_LOCK_DATA **ha_federated::store_lock(THD *thd, */ if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && - lock_type <= TL_WRITE) && !thd->in_lock_tables && !thd->tablespace_op) + lock_type <= TL_WRITE) && !thd->in_lock_tables) lock_type= TL_WRITE_ALLOW_WRITE; /* @@ -1879,28 +2603,33 @@ THR_LOCK_DATA **ha_federated::store_lock(THD *thd, int ha_federated::create(const char *name, TABLE *table_arg, HA_CREATE_INFO *create_info) { - int connection_error=0; - FEDERATED_SHARE tmp; + int retval= 0; + /* + only a temporary share, to test the url + */ + FEDERATED_SHARE tmp_share; DBUG_ENTER("ha_federated::create"); - if (parse_url(&tmp, table_arg, 1)) - { - my_error(ER_CANT_CREATE_TABLE, MYF(0), name, 1); + if ((retval= parse_url(&tmp_share, table_arg, 1))) goto error; - } - if ((connection_error= check_foreign_data_source(&tmp))) - { - my_error(connection_error, MYF(0), name, 1); + + if ((retval= check_foreign_data_source(&tmp_share, 1))) goto error; + + if (tmp_share.scheme) + { + my_free((gptr) tmp_share.scheme, MYF(0)); + tmp_share.scheme= 0; } - - my_free((gptr) tmp.scheme, MYF(0)); - DBUG_RETURN(0); - + DBUG_RETURN(retval); + error: - DBUG_PRINT("info", ("errors, returning %d", ER_CANT_CREATE_TABLE)); - my_free((gptr) tmp.scheme, MYF(0)); - DBUG_RETURN(ER_CANT_CREATE_TABLE); + if (tmp_share.scheme) + { + my_free((gptr) tmp_share.scheme, MYF(0)); + tmp_share.scheme= 0; + } + DBUG_RETURN(retval); } #endif /* HAVE_FEDERATED_DB */ |