diff options
Diffstat (limited to 'sql/table.cc')
-rw-r--r-- | sql/table.cc | 2502 |
1 files changed, 2008 insertions, 494 deletions
diff --git a/sql/table.cc b/sql/table.cc index 18a395d69af..a445e2d2816 100644 --- a/sql/table.cc +++ b/sql/table.cc @@ -17,142 +17,609 @@ /* Some general useful functions */ #include "mysql_priv.h" -#include <errno.h> +#include "sql_trigger.h" #include <m_ctype.h> #include "my_md5.h" /* Functions defined in this file */ -static void frm_error(int error,TABLE *form,const char *name, - int errortype, int errarg); +void open_table_error(TABLE_SHARE *share, int error, int db_errno, + myf errortype, int errarg); +static int open_binary_frm(THD *thd, TABLE_SHARE *share, + uchar *head, File file); static void fix_type_pointers(const char ***array, TYPELIB *point_to_type, uint types, char **names); -static uint find_field(TABLE *form,uint start,uint length); +static uint find_field(Field **fields, uchar *record, uint start, uint length); -static byte* get_field_name(Field **buff,uint *length, - my_bool not_used __attribute__((unused))) +/************************************************************************** + Object_creation_ctx implementation. +**************************************************************************/ + +Object_creation_ctx *Object_creation_ctx::set_n_backup(THD *thd) +{ + Object_creation_ctx *backup_ctx= create_backup_ctx(thd); + + change_env(thd); + + return backup_ctx; +} + +void Object_creation_ctx::restore_env(THD *thd, Object_creation_ctx *backup_ctx) +{ + if (!backup_ctx) + return; + + backup_ctx->change_env(thd); + + delete backup_ctx; +} + +/************************************************************************** + Default_object_creation_ctx implementation. +**************************************************************************/ + +Default_object_creation_ctx::Default_object_creation_ctx(THD *thd) + : m_client_cs(thd->variables.character_set_client), + m_connection_cl(thd->variables.collation_connection) +{ } + +Default_object_creation_ctx::Default_object_creation_ctx( + CHARSET_INFO *client_cs, CHARSET_INFO *connection_cl) + : m_client_cs(client_cs), + m_connection_cl(connection_cl) +{ } + +Object_creation_ctx * +Default_object_creation_ctx::create_backup_ctx(THD *thd) +{ + return new Default_object_creation_ctx(thd); +} + +void Default_object_creation_ctx::change_env(THD *thd) const +{ + thd->variables.character_set_client= m_client_cs; + thd->variables.collation_connection= m_connection_cl; + + thd->update_charset(); +} + +/************************************************************************** + View_creation_ctx implementation. +**************************************************************************/ + +View_creation_ctx *View_creation_ctx::create(THD *thd) +{ + View_creation_ctx *ctx= new (thd->mem_root) View_creation_ctx(thd); + + return ctx; +} + +/*************************************************************************/ + +View_creation_ctx * View_creation_ctx::create(THD *thd, + TABLE_LIST *view) +{ + View_creation_ctx *ctx= new (thd->mem_root) View_creation_ctx(thd); + + /* Throw a warning if there is NULL cs name. */ + + if (!view->view_client_cs_name.str || + !view->view_connection_cl_name.str) + { + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, + ER_VIEW_NO_CREATION_CTX, + ER(ER_VIEW_NO_CREATION_CTX), + (const char *) view->db, + (const char *) view->table_name); + + ctx->m_client_cs= system_charset_info; + ctx->m_connection_cl= system_charset_info; + + return ctx; + } + + /* Resolve cs names. Throw a warning if there is unknown cs name. */ + + bool invalid_creation_ctx; + + invalid_creation_ctx= resolve_charset(view->view_client_cs_name.str, + system_charset_info, + &ctx->m_client_cs); + + invalid_creation_ctx= resolve_collation(view->view_connection_cl_name.str, + system_charset_info, + &ctx->m_connection_cl) || + invalid_creation_ctx; + + if (invalid_creation_ctx) + { + sql_print_warning("View '%s'.'%s': there is unknown charset/collation " + "names (client: '%s'; connection: '%s').", + (const char *) view->db, + (const char *) view->table_name, + (const char *) view->view_client_cs_name.str, + (const char *) view->view_connection_cl_name.str); + + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, + ER_VIEW_INVALID_CREATION_CTX, + ER(ER_VIEW_INVALID_CREATION_CTX), + (const char *) view->db, + (const char *) view->table_name); + } + + return ctx; +} + +/*************************************************************************/ + +/* Get column name from column hash */ + +static uchar *get_field_name(Field **buff, size_t *length, + my_bool not_used __attribute__((unused))) { *length= (uint) strlen((*buff)->field_name); - return (byte*) (*buff)->field_name; + return (uchar*) (*buff)->field_name; +} + + +/* + Returns pointer to '.frm' extension of the file name. + + SYNOPSIS + fn_rext() + name file name + + DESCRIPTION + Checks file name part starting with the rightmost '.' character, + and returns it if it is equal to '.frm'. + + TODO + It is a good idea to get rid of this function modifying the code + to garantee that the functions presently calling fn_rext() always + get arguments in the same format: either with '.frm' or without '.frm'. + + RETURN VALUES + Pointer to the '.frm' extension. If there is no extension, + or extension is not '.frm', pointer at the end of file name. +*/ + +char *fn_rext(char *name) +{ + char *res= strrchr(name, '.'); + if (res && !strcmp(res, reg_ext)) + return res; + return name + strlen(name); +} + + +/* + Allocate a setup TABLE_SHARE structure + + SYNOPSIS + alloc_table_share() + TABLE_LIST Take database and table name from there + key Table cache key (db \0 table_name \0...) + key_length Length of key + + RETURN + 0 Error (out of memory) + # Share +*/ + +TABLE_SHARE *alloc_table_share(TABLE_LIST *table_list, char *key, + uint key_length) +{ + MEM_ROOT mem_root; + TABLE_SHARE *share; + char *key_buff, *path_buff; + char path[FN_REFLEN]; + uint path_length; + DBUG_ENTER("alloc_table_share"); + DBUG_PRINT("enter", ("table: '%s'.'%s'", + table_list->db, table_list->table_name)); + + path_length= build_table_filename(path, sizeof(path) - 1, + table_list->db, + table_list->table_name, "", 0); + init_sql_alloc(&mem_root, TABLE_ALLOC_BLOCK_SIZE, 0); + if (multi_alloc_root(&mem_root, + &share, sizeof(*share), + &key_buff, key_length, + &path_buff, path_length + 1, + NULL)) + { + bzero((char*) share, sizeof(*share)); + + share->set_table_cache_key(key_buff, key, key_length); + + share->path.str= path_buff; + share->path.length= path_length; + strmov(share->path.str, path); + share->normalized_path.str= share->path.str; + share->normalized_path.length= path_length; + + share->version= refresh_version; + + /* + This constant is used to mark that no table map version has been + assigned. No arithmetic is done on the value: it will be + overwritten with a value taken from MYSQL_BIN_LOG. + */ + share->table_map_version= ~(ulonglong)0; + + /* + Since alloc_table_share() can be called without any locking (for + example, ha_create_table... functions), we do not assign a table + map id here. Instead we assign a value that is not used + elsewhere, and then assign a table map id inside open_table() + under the protection of the LOCK_open mutex. + */ + share->table_map_id= ~0UL; + share->cached_row_logging_check= -1; + + memcpy((char*) &share->mem_root, (char*) &mem_root, sizeof(mem_root)); + pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST); + pthread_cond_init(&share->cond, NULL); + } + DBUG_RETURN(share); +} + + +/* + Initialize share for temporary tables + + SYNOPSIS + init_tmp_table_share() + share Share to fill + key Table_cache_key, as generated from create_table_def_key. + must start with db name. + key_length Length of key + table_name Table name + path Path to file (possible in lower case) without .frm + + NOTES + This is different from alloc_table_share() because temporary tables + don't have to be shared between threads or put into the table def + cache, so we can do some things notable simpler and faster + + If table is not put in thd->temporary_tables (happens only when + one uses OPEN TEMPORARY) then one can specify 'db' as key and + use key_length= 0 as neither table_cache_key or key_length will be used). +*/ + +void init_tmp_table_share(TABLE_SHARE *share, const char *key, + uint key_length, const char *table_name, + const char *path) +{ + DBUG_ENTER("init_tmp_table_share"); + DBUG_PRINT("enter", ("table: '%s'.'%s'", key, table_name)); + + bzero((char*) share, sizeof(*share)); + init_sql_alloc(&share->mem_root, TABLE_ALLOC_BLOCK_SIZE, 0); + share->tmp_table= INTERNAL_TMP_TABLE; + share->db.str= (char*) key; + share->db.length= strlen(key); + share->table_cache_key.str= (char*) key; + share->table_cache_key.length= key_length; + share->table_name.str= (char*) table_name; + share->table_name.length= strlen(table_name); + share->path.str= (char*) path; + share->normalized_path.str= (char*) path; + share->path.length= share->normalized_path.length= strlen(path); + share->frm_version= FRM_VER_TRUE_VARCHAR; + + /* + Temporary tables are not replicated, but we set up these fields + anyway to be able to catch errors. + */ + share->table_map_version= ~(ulonglong)0; + share->table_map_id= ~0UL; + share->cached_row_logging_check= -1; + + DBUG_VOID_RETURN; } + /* - Open a .frm file + Free table share and memory used by it + + SYNOPSIS + free_table_share() + share Table share + + NOTES + share->mutex must be locked when we come here if it's not a temp table +*/ + +void free_table_share(TABLE_SHARE *share) +{ + MEM_ROOT mem_root; + DBUG_ENTER("free_table_share"); + DBUG_PRINT("enter", ("table: %s.%s", share->db.str, share->table_name.str)); + DBUG_ASSERT(share->ref_count == 0); + + /* + If someone is waiting for this to be deleted, inform it about this. + Don't do a delete until we know that no one is refering to this anymore. + */ + if (share->tmp_table == NO_TMP_TABLE) + { + /* share->mutex is locked in release_table_share() */ + while (share->waiting_on_cond) + { + pthread_cond_broadcast(&share->cond); + pthread_cond_wait(&share->cond, &share->mutex); + } + /* No thread refers to this anymore */ + pthread_mutex_unlock(&share->mutex); + pthread_mutex_destroy(&share->mutex); + pthread_cond_destroy(&share->cond); + } + hash_free(&share->name_hash); + + plugin_unlock(NULL, share->db_plugin); + share->db_plugin= NULL; + + /* We must copy mem_root from share because share is allocated through it */ + memcpy((char*) &mem_root, (char*) &share->mem_root, sizeof(mem_root)); + free_root(&mem_root, MYF(0)); // Free's share + DBUG_VOID_RETURN; +} + + +/** + Return TRUE if a table name matches one of the system table names. + Currently these are: + + help_category, help_keyword, help_relation, help_topic, + proc, event + time_zone, time_zone_leap_second, time_zone_name, time_zone_transition, + time_zone_transition_type + This function trades accuracy for speed, so may return false + positives. Presumably mysql.* database is for internal purposes only + and should not contain user tables. +*/ + +inline bool is_system_table_name(const char *name, uint length) +{ + CHARSET_INFO *ci= system_charset_info; + + return ( + /* mysql.proc table */ + length == 4 && + my_tolower(ci, name[0]) == 'p' && + my_tolower(ci, name[1]) == 'r' && + my_tolower(ci, name[2]) == 'o' && + my_tolower(ci, name[3]) == 'c' || + + length > 4 && + ( + /* one of mysql.help* tables */ + my_tolower(ci, name[0]) == 'h' && + my_tolower(ci, name[1]) == 'e' && + my_tolower(ci, name[2]) == 'l' && + my_tolower(ci, name[3]) == 'p' || + + /* one of mysql.time_zone* tables */ + my_tolower(ci, name[0]) == 't' && + my_tolower(ci, name[1]) == 'i' && + my_tolower(ci, name[2]) == 'm' && + my_tolower(ci, name[3]) == 'e' || + + /* mysql.event table */ + my_tolower(ci, name[0]) == 'e' && + my_tolower(ci, name[1]) == 'v' && + my_tolower(ci, name[2]) == 'e' && + my_tolower(ci, name[3]) == 'n' && + my_tolower(ci, name[4]) == 't' + ) + ); +} + + +/* + Read table definition from a binary / text based .frm file + SYNOPSIS - openfrm() + open_table_def() + thd Thread handler + share Fill this with table definition + db_flags Bit mask of the following flags: OPEN_VIEW - name path to table-file "db/name" - alias alias for table - db_stat open flags (for example HA_OPEN_KEYFILE|HA_OPEN_RNDFILE..) - can be 0 (example in ha_example_table) - prgflag READ_ALL etc.. - ha_open_flags HA_OPEN_ABORT_IF_LOCKED etc.. - outparam result table + NOTES + This function is called when the table definition is not cached in + table_def_cache + The data is returned in 'share', which is alloced by + alloc_table_share().. The code assumes that share is initialized. RETURN VALUES 0 ok - 1 Error (see frm_error) - 2 Error (see frm_error) + 1 Error (see open_table_error) + 2 Error (see open_table_error) 3 Wrong data in .frm file - 4 Error (see frm_error) - 5 Error (see frm_error: charset unavailable) + 4 Error (see open_table_error) + 5 Error (see open_table_error: charset unavailable) 6 Unknown .frm version */ -int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, - uint prgflag, uint ha_open_flags, TABLE *outparam) -{ - reg1 uint i; - reg2 uchar *strpos; - int j,error, errarg= 0; - uint rec_buff_length,n_length,int_length,records,key_parts,keys, - interval_count,interval_parts,read_length,db_create_options; - uint key_info_length, com_length; - ulong pos, record_offset; - char index_file[FN_REFLEN], *names, *keynames, *comment_pos; - uchar head[288],*disk_buff,new_field_pack_flag; - my_string record; - const char **int_array; - bool use_hash, null_field_first; - bool error_reported= FALSE; - File file; - Field **field_ptr,*reg_field; - KEY *keyinfo; - KEY_PART_INFO *key_part; - uchar *null_pos; - uint null_bit_pos, new_frm_ver, field_pack_length; - SQL_CRYPT *crypted=0; +int open_table_def(THD *thd, TABLE_SHARE *share, uint db_flags) +{ + int error, table_type; + bool error_given; + File file; + uchar head[288], *disk_buff; + char path[FN_REFLEN]; MEM_ROOT **root_ptr, *old_root; - TABLE_SHARE *share; - DBUG_ENTER("openfrm"); - DBUG_PRINT("enter",("name: '%s' form: 0x%lx", name, (long) outparam)); + DBUG_ENTER("open_table_def"); + DBUG_PRINT("enter", ("table: '%s'.'%s' path: '%s'", share->db.str, + share->table_name.str, share->normalized_path.str)); error= 1; + error_given= 0; disk_buff= NULL; - root_ptr= my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC); - old_root= *root_ptr; - - bzero((char*) outparam,sizeof(*outparam)); - outparam->in_use= thd; - outparam->s= share= &outparam->share_not_to_be_used; - if ((file=my_open(fn_format(index_file, name, "", reg_ext, - MY_UNPACK_FILENAME), - O_RDONLY | O_SHARE, - MYF(0))) - < 0) - goto err; + strxmov(path, share->normalized_path.str, reg_ext, NullS); + if ((file= my_open(path, O_RDONLY | O_SHARE, MYF(0))) < 0) + { + /* + We don't try to open 5.0 unencoded name, if + - non-encoded name contains '@' signs, + because '@' can be misinterpreted. + It is not clear if '@' is escape character in 5.1, + or a normal character in 5.0. + + - non-encoded db or table name contain "#mysql50#" prefix. + This kind of tables must have been opened only by the + my_open() above. + */ + if (strchr(share->table_name.str, '@') || + !strncmp(share->db.str, MYSQL50_TABLE_NAME_PREFIX, + MYSQL50_TABLE_NAME_PREFIX_LENGTH) || + !strncmp(share->table_name.str, MYSQL50_TABLE_NAME_PREFIX, + MYSQL50_TABLE_NAME_PREFIX_LENGTH)) + goto err_not_open; + + /* Try unencoded 5.0 name */ + uint length; + strxnmov(path, sizeof(path)-1, + mysql_data_home, "/", share->db.str, "/", + share->table_name.str, reg_ext, NullS); + length= unpack_filename(path, path) - reg_ext_length; + /* + The following is a safety test and should never fail + as the old file name should never be longer than the new one. + */ + DBUG_ASSERT(length <= share->normalized_path.length); + /* + If the old and the new names have the same length, + then table name does not have tricky characters, + so no need to check the old file name. + */ + if (length == share->normalized_path.length || + ((file= my_open(path, O_RDONLY | O_SHARE, MYF(0))) < 0)) + goto err_not_open; + + /* Unencoded 5.0 table name found */ + path[length]= '\0'; // Remove .frm extension + strmov(share->normalized_path.str, path); + share->normalized_path.length= length; + } error= 4; - if (my_read(file,(byte*) head,64,MYF(MY_NABP))) + if (my_read(file, head, 64, MYF(MY_NABP))) goto err; - if (memcmp(head, STRING_WITH_LEN("TYPE=")) == 0) + if (head[0] == (uchar) 254 && head[1] == 1) { - // new .frm - my_close(file,MYF(MY_WME)); - - if (db_stat & NO_ERR_ON_NEW_FRM) - DBUG_RETURN(5); - file= -1; - // caller can't process new .frm + if (head[2] == FRM_VER || head[2] == FRM_VER+1 || + (head[2] >= FRM_VER+3 && head[2] <= FRM_VER+4)) + table_type= 1; + else + { + error= 6; // Unkown .frm version + goto err; + } + } + else if (memcmp(head, STRING_WITH_LEN("TYPE=")) == 0) + { + error= 5; + if (memcmp(head+5,"VIEW",4) == 0) + { + share->is_view= 1; + if (db_flags & OPEN_VIEW) + error= 0; + } goto err; } - if (prgflag & OPEN_VIEW_NO_PARSE) + else goto err; - share->blob_ptr_size= sizeof(char*); - outparam->db_stat= db_stat; - init_sql_alloc(&outparam->mem_root, TABLE_ALLOC_BLOCK_SIZE, 0); - *root_ptr= &outparam->mem_root; + /* No handling of text based files yet */ + if (table_type == 1) + { + root_ptr= my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC); + old_root= *root_ptr; + *root_ptr= &share->mem_root; + error= open_binary_frm(thd, share, head, file); + *root_ptr= old_root; + + if (share->db.length == 5 && !(lower_case_table_names ? + my_strcasecmp(system_charset_info, share->db.str, "mysql") : + strcmp(share->db.str, "mysql"))) + { + /* + We can't mark all tables in 'mysql' database as system since we don't + allow to lock such tables for writing with any other tables (even with + other system tables) and some privilege tables need this. + */ + share->system_table= is_system_table_name(share->table_name.str, + share->table_name.length); + if (!share->system_table) + { + share->log_table= check_if_log_table(share->db.length, share->db.str, + share->table_name.length, + share->table_name.str, 0); + } + } + error_given= 1; + } - share->table_name= strdup_root(&outparam->mem_root, - name+dirname_length(name)); - share->path= strdup_root(&outparam->mem_root, name); - outparam->alias= my_strdup(alias, MYF(MY_WME)); - if (!share->table_name || !share->path || !outparam->alias) - goto err; - *fn_ext(share->table_name)='\0'; // Remove extension - *fn_ext(share->path)='\0'; // Remove extension + if (!error) + thd->status_var.opened_shares++; - if (head[0] != (uchar) 254 || head[1] != 1) - goto err; /* purecov: inspected */ - if (head[2] != FRM_VER && head[2] != FRM_VER+1 && - ! (head[2] >= FRM_VER+3 && head[2] <= FRM_VER+4)) +err: + my_close(file, MYF(MY_WME)); + +err_not_open: + if (error && !error_given) { - error= 6; - goto err; /* purecov: inspected */ + share->error= error; + open_table_error(share, error, (share->open_errno= my_errno), 0); } - new_field_pack_flag=head[27]; + + DBUG_RETURN(error); +} + + +/* + Read data from a binary .frm file from MySQL 3.23 - 5.0 into TABLE_SHARE +*/ + +static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, + File file) +{ + int error, errarg= 0; + uint new_frm_ver, field_pack_length, new_field_pack_flag; + uint interval_count, interval_parts, read_length, int_length; + uint db_create_options, keys, key_parts, n_length; + uint key_info_length, com_length, null_bit_pos; + uint extra_rec_buf_length; + uint i,j; + bool use_hash; + char *keynames, *names, *comment_pos; + uchar *record; + uchar *disk_buff, *strpos, *null_flags, *null_pos; + ulong pos, record_offset, *rec_per_key, rec_buff_length; + handler *handler_file= 0; + KEY *keyinfo; + KEY_PART_INFO *key_part; + SQL_CRYPT *crypted=0; + Field **field_ptr, *reg_field; + const char **interval_array; + enum legacy_db_type legacy_db_type; + my_bitmap_map *bitmaps; + DBUG_ENTER("open_binary_frm"); + + new_field_pack_flag= head[27]; new_frm_ver= (head[2] - FRM_VER); field_pack_length= new_frm_ver < 2 ? 11 : 17; + disk_buff= 0; - error=3; + error= 3; if (!(pos=get_form_pos(file,head,(TYPELIB*) 0))) goto err; /* purecov: inspected */ - *fn_ext(index_file)='\0'; // Remove .frm extension share->frm_version= head[2]; /* @@ -164,20 +631,33 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, if (share->frm_version == FRM_VER_TRUE_VARCHAR -1 && head[33] == 5) share->frm_version= FRM_VER_TRUE_VARCHAR; - share->db_type= ha_checktype(thd,(enum db_type) (uint) *(head+3),0,0); - share->db_create_options= db_create_options=uint2korr(head+30); +#ifdef WITH_PARTITION_STORAGE_ENGINE + if (*(head+61) && + !(share->default_part_db_type= + ha_checktype(thd, (enum legacy_db_type) (uint) *(head+61), 1, 0))) + goto err; + DBUG_PRINT("info", ("default_part_db_type = %u", head[61])); +#endif + legacy_db_type= (enum legacy_db_type) (uint) *(head+3); + DBUG_ASSERT(share->db_plugin == NULL); + /* + if the storage engine is dynamic, no point in resolving it by its + dynamically allocated legacy_db_type. We will resolve it later by name. + */ + if (legacy_db_type > DB_TYPE_UNKNOWN && + legacy_db_type < DB_TYPE_FIRST_DYNAMIC) + share->db_plugin= ha_lock_engine(NULL, + ha_checktype(thd, legacy_db_type, 0, 0)); + share->db_create_options= db_create_options= uint2korr(head+30); share->db_options_in_use= share->db_create_options; share->mysql_version= uint4korr(head+51); - null_field_first= 0; + share->null_field_first= 0; if (!head[32]) // New frm file in 3.23 { share->avg_row_length= uint4korr(head+34); share-> row_type= (row_type) head[40]; - share->raid_type= head[41]; - share->raid_chunks= head[42]; - share->raid_chunksize= uint4korr(head+43); share->table_charset= get_charset((uint) head[38],MYF(0)); - null_field_first= 1; + share->null_field_first= 1; } if (!share->table_charset) { @@ -188,7 +668,7 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, sql_print_warning("'%s' had no or invalid character set, " "and default character set is multi-byte, " "so character column sizes may have changed", - name); + share->path.str); } share->table_charset= default_charset_info; } @@ -196,7 +676,7 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, if (db_create_options & HA_OPTION_LONG_BLOB_PTR) share->blob_ptr_size= portable_sizeof_char_ptr; /* Set temporarily a good value for db_low_byte_first */ - share->db_low_byte_first= test(share->db_type != DB_TYPE_ISAM); + share->db_low_byte_first= test(legacy_db_type != DB_TYPE_ISAM); error=4; share->max_rows= uint4korr(head+18); share->min_rows= uint4korr(head+22); @@ -204,7 +684,7 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, /* Read keyinformation */ key_info_length= (uint) uint2korr(head+28); VOID(my_seek(file,(ulong) uint2korr(head+6),MY_SEEK_SET,MYF(0))); - if (read_string(file,(gptr*) &disk_buff,key_info_length)) + if (read_string(file,(uchar**) &disk_buff,key_info_length)) goto err; /* purecov: inspected */ if (disk_buff[0] & 0x80) { @@ -218,33 +698,30 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, } share->keys_for_keyread.init(0); share->keys_in_use.init(keys); - outparam->quick_keys.init(); - outparam->used_keys.init(); - outparam->keys_in_use_for_query.init(); n_length=keys*sizeof(KEY)+key_parts*sizeof(KEY_PART_INFO); - if (!(keyinfo = (KEY*) alloc_root(&outparam->mem_root, - n_length+uint2korr(disk_buff+4)))) + if (!(keyinfo = (KEY*) alloc_root(&share->mem_root, + n_length + uint2korr(disk_buff+4)))) goto err; /* purecov: inspected */ bzero((char*) keyinfo,n_length); - outparam->key_info=keyinfo; + share->key_info= keyinfo; key_part= my_reinterpret_cast(KEY_PART_INFO*) (keyinfo+keys); strpos=disk_buff+6; - ulong *rec_per_key; - if (!(rec_per_key= (ulong*) alloc_root(&outparam->mem_root, + if (!(rec_per_key= (ulong*) alloc_root(&share->mem_root, sizeof(ulong*)*key_parts))) goto err; for (i=0 ; i < keys ; i++, keyinfo++) { - keyinfo->table= outparam; + keyinfo->table= 0; // Updated in open_frm if (new_frm_ver >= 3) { keyinfo->flags= (uint) uint2korr(strpos) ^ HA_NOSAME; keyinfo->key_length= (uint) uint2korr(strpos+2); keyinfo->key_parts= (uint) strpos[4]; keyinfo->algorithm= (enum ha_key_alg) strpos[5]; + keyinfo->block_size= uint2korr(strpos+6); strpos+=8; } else @@ -294,10 +771,8 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, #ifdef HAVE_CRYPTED_FRM else if (*(head+26) == 2) { - *root_ptr= old_root - crypted=get_crypt_for_frm(); - *root_ptr= &outparam->mem_root; - outparam->crypted=1; + crypted= get_crypt_for_frm(); + share->crypted= 1; } #endif @@ -305,94 +780,172 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, ((uint2korr(head+14) == 0xffff ? uint4korr(head+47) : uint2korr(head+14)))); - if ((n_length= uint2korr(head+55))) + if ((n_length= uint4korr(head+55))) { /* Read extra data segment */ - char *buff, *next_chunk, *buff_end; - if (!(next_chunk= buff= my_malloc(n_length, MYF(MY_WME)))) + uchar *buff, *next_chunk, *buff_end; + DBUG_PRINT("info", ("extra segment size is %u bytes", n_length)); + if (!(next_chunk= buff= (uchar*) my_malloc(n_length, MYF(MY_WME)))) goto err; - buff_end= buff + n_length; - if (my_pread(file, (byte*)buff, n_length, record_offset + share->reclength, + if (my_pread(file, buff, n_length, record_offset + share->reclength, MYF(MY_NABP))) { my_free(buff, MYF(0)); goto err; } share->connect_string.length= uint2korr(buff); - if (! (share->connect_string.str= strmake_root(&outparam->mem_root, - next_chunk + 2, share->connect_string.length))) + if (!(share->connect_string.str= strmake_root(&share->mem_root, + (char*) next_chunk + 2, + share->connect_string. + length))) { my_free(buff, MYF(0)); goto err; } next_chunk+= share->connect_string.length + 2; + buff_end= buff + n_length; if (next_chunk + 2 < buff_end) { uint str_db_type_length= uint2korr(next_chunk); - share->db_type= ha_resolve_by_name(next_chunk + 2, str_db_type_length); - DBUG_PRINT("enter", ("Setting dbtype to: %d - %d - '%.*s'\n", - share->db_type, - str_db_type_length, str_db_type_length, - next_chunk + 2)); + LEX_STRING name; + name.str= (char*) next_chunk + 2; + name.length= str_db_type_length; + + plugin_ref tmp_plugin= ha_resolve_by_name(thd, &name); + if (tmp_plugin != NULL && !plugin_equals(tmp_plugin, share->db_plugin)) + { + if (legacy_db_type > DB_TYPE_UNKNOWN && + legacy_db_type < DB_TYPE_FIRST_DYNAMIC && + legacy_db_type != ha_legacy_type( + plugin_data(tmp_plugin, handlerton *))) + { + /* bad file, legacy_db_type did not match the name */ + my_free(buff, MYF(0)); + goto err; + } + /* + tmp_plugin is locked with a local lock. + we unlock the old value of share->db_plugin before + replacing it with a globally locked version of tmp_plugin + */ + plugin_unlock(NULL, share->db_plugin); + share->db_plugin= my_plugin_lock(NULL, &tmp_plugin); + DBUG_PRINT("info", ("setting dbtype to '%.*s' (%d)", + str_db_type_length, next_chunk + 2, + ha_legacy_type(share->db_type()))); + } +#ifdef WITH_PARTITION_STORAGE_ENGINE + else + { + LEX_STRING pname= { C_STRING_WITH_LEN( "partition" ) }; + if (str_db_type_length == pname.length && + !strncmp((char *) next_chunk + 2, pname.str, pname.length)) + { + /* + Use partition handler + tmp_plugin is locked with a local lock. + we unlock the old value of share->db_plugin before + replacing it with a globally locked version of tmp_plugin + */ + plugin_unlock(NULL, share->db_plugin); + share->db_plugin= ha_lock_engine(NULL, partition_hton); + DBUG_PRINT("info", ("setting dbtype to '%.*s' (%d)", + str_db_type_length, next_chunk + 2, + ha_legacy_type(share->db_type()))); + } + } +#endif next_chunk+= str_db_type_length + 2; } + if (next_chunk + 5 < buff_end) + { + uint32 partition_info_len = uint4korr(next_chunk); +#ifdef WITH_PARTITION_STORAGE_ENGINE + if ((share->partition_info_buffer_size= + share->partition_info_len= partition_info_len)) + { + if (!(share->partition_info= (char*) + memdup_root(&share->mem_root, next_chunk + 4, + partition_info_len + 1))) + { + my_free(buff, MYF(0)); + goto err; + } + } +#else + if (partition_info_len) + { + DBUG_PRINT("info", ("WITH_PARTITION_STORAGE_ENGINE is not defined")); + my_free(buff, MYF(0)); + goto err; + } +#endif + next_chunk+= 5 + partition_info_len; + } +#if MYSQL_VERSION_ID < 50200 + if (share->mysql_version >= 50106 && share->mysql_version <= 50109) + { + /* + Partition state array was here in version 5.1.6 to 5.1.9, this code + makes it possible to load a 5.1.6 table in later versions. Can most + likely be removed at some point in time. Will only be used for + upgrades within 5.1 series of versions. Upgrade to 5.2 can only be + done from newer 5.1 versions. + */ + next_chunk+= 4; + } + else if (share->mysql_version >= 50110) +#endif + { + /* New auto_partitioned indicator introduced in 5.1.11 */ +#ifdef WITH_PARTITION_STORAGE_ENGINE + share->auto_partitioned= *next_chunk; +#endif + next_chunk++; + } + keyinfo= share->key_info; + for (i= 0; i < keys; i++, keyinfo++) + { + if (keyinfo->flags & HA_USES_PARSER) + { + LEX_STRING parser_name; + if (next_chunk >= buff_end) + { + DBUG_PRINT("error", + ("fulltext key uses parser that is not defined in .frm")); + my_free(buff, MYF(0)); + goto err; + } + parser_name.str= (char*) next_chunk; + parser_name.length= strlen((char*) next_chunk); + keyinfo->parser= my_plugin_lock_by_name(NULL, &parser_name, + MYSQL_FTPARSER_PLUGIN); + if (! keyinfo->parser) + { + my_error(ER_PLUGIN_IS_NOT_LOADED, MYF(0), parser_name.str); + my_free(buff, MYF(0)); + goto err; + } + } + } my_free(buff, MYF(0)); } - /* Allocate handler */ - if (!(outparam->file= get_new_handler(outparam, &outparam->mem_root, - share->db_type))) - goto err; + share->key_block_size= uint2korr(head+62); error=4; - outparam->reginfo.lock_type= TL_UNLOCK; - outparam->current_lock=F_UNLCK; - if ((db_stat & HA_OPEN_KEYFILE) || (prgflag & DELAYED_OPEN)) - records=2; - else - records=1; - if (prgflag & (READ_ALL+EXTRA_RECORD)) - records++; - /* QQ: TODO, remove the +1 from below */ - rec_buff_length= ALIGN_SIZE(share->reclength + 1 + - outparam->file->extra_rec_buf_length()); + extra_rec_buf_length= uint2korr(head+59); + rec_buff_length= ALIGN_SIZE(share->reclength + 1 + extra_rec_buf_length); share->rec_buff_length= rec_buff_length; - if (!(record= (char *) alloc_root(&outparam->mem_root, - rec_buff_length * records))) + if (!(record= (uchar *) alloc_root(&share->mem_root, + rec_buff_length))) goto err; /* purecov: inspected */ - share->default_values= (byte *) record; - - if (my_pread(file,(byte*) record, (uint) share->reclength, + share->default_values= record; + if (my_pread(file, record, (size_t) share->reclength, record_offset, MYF(MY_NABP))) - goto err; /* purecov: inspected */ + goto err; /* purecov: inspected */ - if (records == 1) - { - /* We are probably in hard repair, and the buffers should not be used */ - outparam->record[0]= outparam->record[1]= share->default_values; - } - else - { - outparam->record[0]= (byte *) record+ rec_buff_length; - if (records > 2) - outparam->record[1]= (byte *) record+ rec_buff_length*2; - else - outparam->record[1]= outparam->record[0]; // Safety - } - -#ifdef HAVE_purify - /* - We need this because when we read var-length rows, we are not updating - bytes after end of varchar - */ - if (records > 1) - { - memcpy(outparam->record[0], share->default_values, rec_buff_length); - if (records > 2) - memcpy(outparam->record[1], share->default_values, rec_buff_length); - } -#endif VOID(my_seek(file,pos,MY_SEEK_SET,MYF(0))); - if (my_read(file,(byte*) head,288,MYF(MY_NABP))) + if (my_read(file, head,288,MYF(MY_NABP))) goto err; #ifdef HAVE_CRYPTED_FRM if (crypted) @@ -412,24 +965,24 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, share->null_fields= uint2korr(head+282); com_length= uint2korr(head+284); share->comment.length= (int) (head[46]); - share->comment.str= strmake_root(&outparam->mem_root, (char*) head+47, + share->comment.str= strmake_root(&share->mem_root, (char*) head+47, share->comment.length); DBUG_PRINT("info",("i_count: %d i_parts: %d index: %d n_length: %d int_length: %d com_length: %d", interval_count,interval_parts, share->keys,n_length,int_length, com_length)); if (!(field_ptr = (Field **) - alloc_root(&outparam->mem_root, + alloc_root(&share->mem_root, (uint) ((share->fields+1)*sizeof(Field*)+ interval_count*sizeof(TYPELIB)+ (share->fields+interval_parts+ - keys+3)*sizeof(my_string)+ + keys+3)*sizeof(char *)+ (n_length+int_length+com_length))))) goto err; /* purecov: inspected */ - outparam->field=field_ptr; + share->field= field_ptr; read_length=(uint) (share->fields * field_pack_length + pos+ (uint) (n_length+int_length+com_length)); - if (read_string(file,(gptr*) &disk_buff,read_length)) + if (read_string(file,(uchar**) &disk_buff,read_length)) goto err; /* purecov: inspected */ #ifdef HAVE_CRYPTED_FRM if (crypted) @@ -442,8 +995,8 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, strpos= disk_buff+pos; share->intervals= (TYPELIB*) (field_ptr+share->fields+1); - int_array= (const char **) (share->intervals+interval_count); - names= (char*) (int_array+share->fields+interval_parts+keys+3); + interval_array= (const char **) (share->intervals+interval_count); + names= (char*) (interval_array+share->fields+interval_parts+keys+3); if (!interval_count) share->intervals= 0; // For better debugging memcpy((char*) names, strpos+(share->fields*field_pack_length), @@ -451,10 +1004,10 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, comment_pos= names+(n_length+int_length); memcpy(comment_pos, disk_buff+read_length-com_length, com_length); - fix_type_pointers(&int_array, &share->fieldnames, 1, &names); + fix_type_pointers(&interval_array, &share->fieldnames, 1, &names); if (share->fieldnames.count != share->fields) goto err; - fix_type_pointers(&int_array, share->intervals, interval_count, + fix_type_pointers(&interval_array, share->intervals, interval_count, &names); { @@ -465,7 +1018,7 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, interval++) { uint count= (uint) (interval->count + 1) * sizeof(uint); - if (!(interval->type_lengths= (uint *) alloc_root(&outparam->mem_root, + if (!(interval->type_lengths= (uint *) alloc_root(&share->mem_root, count))) goto err; for (count= 0; count < interval->count; count++) @@ -478,14 +1031,17 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, } if (keynames) - fix_type_pointers(&int_array, &share->keynames, 1, &keynames); - VOID(my_close(file,MYF(MY_WME))); - file= -1; + fix_type_pointers(&interval_array, &share->keynames, 1, &keynames); - record= (char*) outparam->record[0]-1; /* Fieldstart = 1 */ - if (null_field_first) + /* Allocate handler */ + if (!(handler_file= get_new_handler(share, thd->mem_root, + share->db_type()))) + goto err; + + record= share->default_values-1; /* Fieldstart = 1 */ + if (share->null_field_first) { - outparam->null_flags=null_pos=(uchar*) record+1; + null_flags= null_pos= (uchar*) record+1; null_bit_pos= (db_create_options & HA_OPTION_PACK_RECORD) ? 0 : 1; /* null_bytes below is only correct under the condition that @@ -494,13 +1050,15 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, */ share->null_bytes= (share->null_fields + null_bit_pos + 7) / 8; } +#ifndef WE_WANT_TO_SUPPORT_VERY_OLD_FRM_FILES else { share->null_bytes= (share->null_fields+7)/8; - outparam->null_flags= null_pos= - (uchar*) (record+1+share->reclength-share->null_bytes); + null_flags= null_pos= (uchar*) (record + 1 +share->reclength - + share->null_bytes); null_bit_pos= 0; } +#endif use_hash= share->fields >= MAX_FIELDS_BEFORE_HASH; if (use_hash) @@ -529,7 +1087,7 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, field_type=(enum_field_types) (uint) strpos[13]; /* charset and geometry_type share the same byte in frm */ - if (field_type == FIELD_TYPE_GEOMETRY) + if (field_type == MYSQL_TYPE_GEOMETRY) { #ifdef HAVE_SPATIAL geom_type= (Field::geometry_type) strpos[14]; @@ -604,7 +1162,7 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, } #ifndef TO_BE_DELETED_ON_PRODUCTION - if (field_type == FIELD_TYPE_NEWDECIMAL && !share->mysql_version) + if (field_type == MYSQL_TYPE_NEWDECIMAL && !share->mysql_version) { /* Fix pack length of old decimal values from 5.0.3 -> 5.0.4 @@ -615,16 +1173,23 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, field_length= my_decimal_precision_to_length(field_length, decimals, f_is_dec(pack_flag) == 0); - sql_print_error("Found incompatible DECIMAL field '%s' in %s; Please do \"ALTER TABLE '%s' FORCE\" to fix it!", share->fieldnames.type_names[i], name, share->table_name); + sql_print_error("Found incompatible DECIMAL field '%s' in %s; " + "Please do \"ALTER TABLE '%s' FORCE\" to fix it!", + share->fieldnames.type_names[i], share->table_name.str, + share->table_name.str); push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR, ER_CRASHED_ON_USAGE, - "Found incompatible DECIMAL field '%s' in %s; Please do \"ALTER TABLE '%s' FORCE\" to fix it!", share->fieldnames.type_names[i], name, share->table_name); + "Found incompatible DECIMAL field '%s' in %s; " + "Please do \"ALTER TABLE '%s' FORCE\" to fix it!", + share->fieldnames.type_names[i], + share->table_name.str, + share->table_name.str); share->crashed= 1; // Marker for CHECK TABLE } #endif - *field_ptr=reg_field= - make_field(record+recpos, + *field_ptr= reg_field= + make_field(share, record+recpos, (uint32) field_length, null_pos, null_bit_pos, pack_flag, @@ -635,8 +1200,7 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, (interval_nr ? share->intervals+interval_nr-1 : (TYPELIB*) 0), - share->fieldnames.type_names[i], - outparam); + share->fieldnames.type_names[i]); if (!reg_field) // Not supported field type { error= 4; @@ -645,7 +1209,7 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, reg_field->field_index= i; reg_field->comment=comment; - if (field_type == FIELD_TYPE_BIT && !f_bit_as_char(pack_flag)) + if (field_type == MYSQL_TYPE_BIT && !f_bit_as_char(pack_flag)) { if ((null_bit_pos+= field_length & 7) > 7) { @@ -660,12 +1224,15 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, } if (f_no_default(pack_flag)) reg_field->flags|= NO_DEFAULT_VALUE_FLAG; + if (reg_field->unireg_check == Field::NEXT_NUMBER) - outparam->found_next_number_field= reg_field; - if (outparam->timestamp_field == reg_field) + share->found_next_number_field= field_ptr; + if (share->timestamp_field == reg_field) share->timestamp_field_offset= i; + if (use_hash) - (void) my_hash_insert(&share->name_hash,(byte*) field_ptr); // never fail + (void) my_hash_insert(&share->name_hash, + (uchar*) field_ptr); // never fail } *field_ptr=0; // End marker @@ -674,17 +1241,17 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, { uint primary_key=(uint) (find_type((char*) primary_key_name, &share->keynames, 3) - 1); - uint ha_option=outparam->file->table_flags(); - keyinfo=outparam->key_info; - key_part=keyinfo->key_part; + longlong ha_option= handler_file->ha_table_flags(); + keyinfo= share->key_info; + key_part= keyinfo->key_part; for (uint key=0 ; key < share->keys ; key++,keyinfo++) { - uint usable_parts=0; + uint usable_parts= 0; keyinfo->name=(char*) share->keynames.type_names[key]; /* Fix fulltext keys for old .frm files */ - if (outparam->key_info[key].flags & HA_FULLTEXT) - outparam->key_info[key].algorithm= HA_KEY_ALG_FULLTEXT; + if (share->key_info[key].flags & HA_FULLTEXT) + share->key_info[key].algorithm= HA_KEY_ALG_FULLTEXT; if (primary_key >= MAX_KEY && (keyinfo->flags & HA_NOSAME)) { @@ -697,8 +1264,8 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, { uint fieldnr= key_part[i].fieldnr; if (!fieldnr || - outparam->field[fieldnr-1]->null_ptr || - outparam->field[fieldnr-1]->key_length() != + share->field[fieldnr-1]->null_ptr || + share->field[fieldnr-1]->key_length() != key_part[i].length) { primary_key=MAX_KEY; // Can't be used @@ -709,135 +1276,130 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, for (i=0 ; i < keyinfo->key_parts ; key_part++,i++) { + Field *field; if (new_field_pack_flag <= 1) - key_part->fieldnr=(uint16) find_field(outparam, - (uint) key_part->offset, - (uint) key_part->length); -#ifdef EXTRA_DEBUG - if (key_part->fieldnr > share->fields) - goto err; // sanity check -#endif - if (key_part->fieldnr) - { // Should always be true ! - Field *field=key_part->field=outparam->field[key_part->fieldnr-1]; - key_part->type= field->key_type(); - if (field->null_ptr) - { - key_part->null_offset=(uint) ((byte*) field->null_ptr - - outparam->record[0]); - key_part->null_bit= field->null_bit; - key_part->store_length+=HA_KEY_NULL_LENGTH; - keyinfo->flags|=HA_NULL_PART_KEY; - keyinfo->extra_length+= HA_KEY_NULL_LENGTH; - keyinfo->key_length+= HA_KEY_NULL_LENGTH; - } - if (field->type() == FIELD_TYPE_BLOB || - field->real_type() == MYSQL_TYPE_VARCHAR) - { - if (field->type() == FIELD_TYPE_BLOB) - key_part->key_part_flag|= HA_BLOB_PART; - else - key_part->key_part_flag|= HA_VAR_LENGTH_PART; - keyinfo->extra_length+=HA_KEY_BLOB_LENGTH; - key_part->store_length+=HA_KEY_BLOB_LENGTH; - keyinfo->key_length+= HA_KEY_BLOB_LENGTH; - /* - Mark that there may be many matching values for one key - combination ('a', 'a ', 'a '...) - */ - if (!(field->flags & BINARY_FLAG)) - keyinfo->flags|= HA_END_SPACE_KEY; - } - if (field->type() == MYSQL_TYPE_BIT) - key_part->key_part_flag|= HA_BIT_PART; - - if (i == 0 && key != primary_key) - field->flags |= ((keyinfo->flags & HA_NOSAME) && - (keyinfo->key_parts == 1)) ? - UNIQUE_KEY_FLAG : MULTIPLE_KEY_FLAG; - if (i == 0) - field->key_start.set_bit(key); - if (field->key_length() == key_part->length && - !(field->flags & BLOB_FLAG)) - { - if (outparam->file->index_flags(key, i, 0) & HA_KEYREAD_ONLY) - { - share->keys_for_keyread.set_bit(key); - field->part_of_key.set_bit(key); - } - if (outparam->file->index_flags(key, i, 1) & HA_READ_ORDER) - field->part_of_sortkey.set_bit(key); - } - if (!(key_part->key_part_flag & HA_REVERSE_SORT) && - usable_parts == i) - usable_parts++; // For FILESORT - field->flags|= PART_KEY_FLAG; - if (key == primary_key) - { - field->flags|= PRI_KEY_FLAG; - /* - If this field is part of the primary key and all keys contains - the primary key, then we can use any key to find this column - */ - if (ha_option & HA_PRIMARY_KEY_IN_READ_INDEX) - { - field->part_of_key= share->keys_in_use; - if (field->part_of_sortkey.is_set(key)) - field->part_of_sortkey= share->keys_in_use; - } - } - if (field->key_length() != key_part->length) - { + key_part->fieldnr= (uint16) find_field(share->field, + share->default_values, + (uint) key_part->offset, + (uint) key_part->length); + if (!key_part->fieldnr) + { + error= 4; // Wrong file + goto err; + } + field= key_part->field= share->field[key_part->fieldnr-1]; + key_part->type= field->key_type(); + if (field->null_ptr) + { + key_part->null_offset=(uint) ((uchar*) field->null_ptr - + share->default_values); + key_part->null_bit= field->null_bit; + key_part->store_length+=HA_KEY_NULL_LENGTH; + keyinfo->flags|=HA_NULL_PART_KEY; + keyinfo->extra_length+= HA_KEY_NULL_LENGTH; + keyinfo->key_length+= HA_KEY_NULL_LENGTH; + } + if (field->type() == MYSQL_TYPE_BLOB || + field->real_type() == MYSQL_TYPE_VARCHAR) + { + if (field->type() == MYSQL_TYPE_BLOB) + key_part->key_part_flag|= HA_BLOB_PART; + else + key_part->key_part_flag|= HA_VAR_LENGTH_PART; + keyinfo->extra_length+=HA_KEY_BLOB_LENGTH; + key_part->store_length+=HA_KEY_BLOB_LENGTH; + keyinfo->key_length+= HA_KEY_BLOB_LENGTH; + /* + Mark that there may be many matching values for one key + combination ('a', 'a ', 'a '...) + */ + if (!(field->flags & BINARY_FLAG)) + keyinfo->flags|= HA_END_SPACE_KEY; + } + if (field->type() == MYSQL_TYPE_BIT) + key_part->key_part_flag|= HA_BIT_PART; + + if (i == 0 && key != primary_key) + field->flags |= (((keyinfo->flags & HA_NOSAME) && + (keyinfo->key_parts == 1)) ? + UNIQUE_KEY_FLAG : MULTIPLE_KEY_FLAG); + if (i == 0) + field->key_start.set_bit(key); + if (field->key_length() == key_part->length && + !(field->flags & BLOB_FLAG)) + { + if (handler_file->index_flags(key, i, 0) & HA_KEYREAD_ONLY) + { + share->keys_for_keyread.set_bit(key); + field->part_of_key.set_bit(key); + field->part_of_key_not_clustered.set_bit(key); + } + if (handler_file->index_flags(key, i, 1) & HA_READ_ORDER) + field->part_of_sortkey.set_bit(key); + } + if (!(key_part->key_part_flag & HA_REVERSE_SORT) && + usable_parts == i) + usable_parts++; // For FILESORT + field->flags|= PART_KEY_FLAG; + if (key == primary_key) + { + field->flags|= PRI_KEY_FLAG; + /* + If this field is part of the primary key and all keys contains + the primary key, then we can use any key to find this column + */ + if (ha_option & HA_PRIMARY_KEY_IN_READ_INDEX) + { + field->part_of_key= share->keys_in_use; + if (field->part_of_sortkey.is_set(key)) + field->part_of_sortkey= share->keys_in_use; + } + } + if (field->key_length() != key_part->length) + { #ifndef TO_BE_DELETED_ON_PRODUCTION - if (field->type() == FIELD_TYPE_NEWDECIMAL) - { - /* - Fix a fatal error in decimal key handling that causes crashes - on Innodb. We fix it by reducing the key length so that - InnoDB never gets a too big key when searching. - This allows the end user to do an ALTER TABLE to fix the - error. - */ - keyinfo->key_length-= (key_part->length - field->key_length()); - key_part->store_length-= (uint16)(key_part->length - - field->key_length()); - key_part->length= (uint16)field->key_length(); - sql_print_error("Found wrong key definition in %s; Please do \"ALTER TABLE '%s' FORCE \" to fix it!", name, share->table_name); - push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR, - ER_CRASHED_ON_USAGE, - "Found wrong key definition in %s; Please do \"ALTER TABLE '%s' FORCE\" to fix it!", name, share->table_name); - - share->crashed= 1; // Marker for CHECK TABLE - goto to_be_deleted; - } + if (field->type() == MYSQL_TYPE_NEWDECIMAL) + { + /* + Fix a fatal error in decimal key handling that causes crashes + on Innodb. We fix it by reducing the key length so that + InnoDB never gets a too big key when searching. + This allows the end user to do an ALTER TABLE to fix the + error. + */ + keyinfo->key_length-= (key_part->length - field->key_length()); + key_part->store_length-= (uint16)(key_part->length - + field->key_length()); + key_part->length= (uint16)field->key_length(); + sql_print_error("Found wrong key definition in %s; " + "Please do \"ALTER TABLE '%s' FORCE \" to fix it!", + share->table_name.str, + share->table_name.str); + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + ER_CRASHED_ON_USAGE, + "Found wrong key definition in %s; " + "Please do \"ALTER TABLE '%s' FORCE\" to fix " + "it!", + share->table_name.str, + share->table_name.str); + share->crashed= 1; // Marker for CHECK TABLE + goto to_be_deleted; + } #endif - key_part->key_part_flag|= HA_PART_KEY_SEG; - if (!(field->flags & BLOB_FLAG)) - { // Create a new field - field=key_part->field=field->new_field(&outparam->mem_root, - outparam, - outparam == field->table); - field->field_length=key_part->length; - } - } + key_part->key_part_flag|= HA_PART_KEY_SEG; + } to_be_deleted: - /* - If the field can be NULL, don't optimize away the test - key_part_column = expression from the WHERE clause - as we need to test for NULL = NULL. - */ - if (field->real_maybe_null()) - key_part->key_part_flag|= HA_NULL_PART; - } - else - { // Error: shorten key - keyinfo->key_parts=usable_parts; - keyinfo->flags=0; - } + /* + If the field can be NULL, don't optimize away the test + key_part_column = expression from the WHERE clause + as we need to test for NULL = NULL. + */ + if (field->real_maybe_null()) + key_part->key_part_flag|= HA_NULL_PART; } - keyinfo->usable_key_parts=usable_parts; // Filesort + keyinfo->usable_key_parts= usable_parts; // Filesort set_if_bigger(share->max_key_length,keyinfo->key_length+ keyinfo->key_parts); @@ -858,11 +1420,15 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, If we are using an integer as the primary key then allow the user to refer to it as '_rowid' */ - if (outparam->key_info[primary_key].key_parts == 1) + if (share->key_info[primary_key].key_parts == 1) { - Field *field= outparam->key_info[primary_key].key_part[0].field; + Field *field= share->key_info[primary_key].key_part[0].field; if (field && field->result_type() == INT_RESULT) - outparam->rowid_field=field; + { + /* note that fieldnr here (and rowid_field_offset) starts from 1 */ + share->rowid_field_offset= (share->key_info[primary_key].key_part[0]. + fieldnr); + } } } else @@ -870,27 +1436,31 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, } else share->primary_key= MAX_KEY; - x_free((gptr) disk_buff); + x_free((uchar*) disk_buff); disk_buff=0; if (new_field_pack_flag <= 1) { /* Old file format with default as not null */ uint null_length= (share->null_fields+7)/8; - bfill(share->default_values + (outparam->null_flags - (uchar*) record), + bfill(share->default_values + (null_flags - (uchar*) record), null_length, 255); } - if ((reg_field=outparam->found_next_number_field)) + if (share->found_next_number_field) { + reg_field= *share->found_next_number_field; if ((int) (share->next_number_index= (uint) - find_ref_key(outparam,reg_field, - &share->next_number_key_offset)) < 0) + find_ref_key(share->key_info, share->keys, + share->default_values, reg_field, + &share->next_number_key_offset, + &share->next_number_keypart)) < 0) { - reg_field->unireg_check=Field::NONE; /* purecov: inspected */ - outparam->found_next_number_field=0; + /* Wrong field definition */ + error= 4; + goto err; } else - reg_field->flags|=AUTO_INCREMENT_FLAG; + reg_field->flags |= AUTO_INCREMENT_FLAG; } if (share->blob_fields) @@ -900,10 +1470,10 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, /* Store offsets to blob fields to find them fast */ if (!(share->blob_field= save= - (uint*) alloc_root(&outparam->mem_root, + (uint*) alloc_root(&share->mem_root, (uint) (share->blob_fields* sizeof(uint))))) goto err; - for (k=0, ptr= outparam->field ; *ptr ; ptr++, k++) + for (k=0, ptr= share->field ; *ptr ; ptr++, k++) { if ((*ptr)->flags & BLOB_FLAG) (*save++)= k; @@ -914,18 +1484,295 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, the correct null_bytes can now be set, since bitfields have been taken into account */ - share->null_bytes= (null_pos - (uchar*) outparam->null_flags + + share->null_bytes= (null_pos - (uchar*) null_flags + (null_bit_pos + 7) / 8); share->last_null_bit_pos= null_bit_pos; + share->db_low_byte_first= handler_file->low_byte_first(); + share->column_bitmap_size= bitmap_buffer_size(share->fields); + + if (!(bitmaps= (my_bitmap_map*) alloc_root(&share->mem_root, + share->column_bitmap_size))) + goto err; + bitmap_init(&share->all_set, bitmaps, share->fields, FALSE); + bitmap_set_all(&share->all_set); + + delete handler_file; +#ifndef DBUG_OFF + if (use_hash) + (void) hash_check(&share->name_hash); +#endif + DBUG_RETURN (0); + + err: + share->error= error; + share->open_errno= my_errno; + share->errarg= errarg; + x_free((uchar*) disk_buff); + delete crypted; + delete handler_file; + hash_free(&share->name_hash); + + open_table_error(share, error, share->open_errno, errarg); + DBUG_RETURN(error); +} /* open_binary_frm */ + + +/* + Open a table based on a TABLE_SHARE + + SYNOPSIS + open_table_from_share() + thd Thread handler + share Table definition + alias Alias for table + db_stat open flags (for example HA_OPEN_KEYFILE| + HA_OPEN_RNDFILE..) can be 0 (example in + ha_example_table) + prgflag READ_ALL etc.. + ha_open_flags HA_OPEN_ABORT_IF_LOCKED etc.. + outparam result table + + RETURN VALUES + 0 ok + 1 Error (see open_table_error) + 2 Error (see open_table_error) + 3 Wrong data in .frm file + 4 Error (see open_table_error) + 5 Error (see open_table_error: charset unavailable) + 7 Table definition has changed in engine +*/ + +int open_table_from_share(THD *thd, TABLE_SHARE *share, const char *alias, + uint db_stat, uint prgflag, uint ha_open_flags, + TABLE *outparam, bool is_create_table) +{ + int error; + uint records, i, bitmap_size; + bool error_reported= FALSE; + uchar *record, *bitmaps; + Field **field_ptr; + DBUG_ENTER("open_table_from_share"); + DBUG_PRINT("enter",("name: '%s.%s' form: 0x%lx", share->db.str, + share->table_name.str, (long) outparam)); + + error= 1; + bzero((char*) outparam, sizeof(*outparam)); + outparam->in_use= thd; + outparam->s= share; + outparam->db_stat= db_stat; + outparam->write_row_record= NULL; + + init_sql_alloc(&outparam->mem_root, TABLE_ALLOC_BLOCK_SIZE, 0); + + if (!(outparam->alias= my_strdup(alias, MYF(MY_WME)))) + goto err; + outparam->quick_keys.init(); + outparam->covering_keys.init(); + outparam->keys_in_use_for_query.init(); + + /* Allocate handler */ + if (!(outparam->file= get_new_handler(share, &outparam->mem_root, + share->db_type()))) + goto err; + + error= 4; + outparam->reginfo.lock_type= TL_UNLOCK; + outparam->current_lock= F_UNLCK; + records=0; + if ((db_stat & HA_OPEN_KEYFILE) || (prgflag & DELAYED_OPEN)) + records=1; + if (prgflag & (READ_ALL+EXTRA_RECORD)) + records++; + + if (!(record= (uchar*) alloc_root(&outparam->mem_root, + share->rec_buff_length * records))) + goto err; /* purecov: inspected */ + + if (records == 0) + { + /* We are probably in hard repair, and the buffers should not be used */ + outparam->record[0]= outparam->record[1]= share->default_values; + } + else + { + outparam->record[0]= record; + if (records > 1) + outparam->record[1]= record+ share->rec_buff_length; + else + outparam->record[1]= outparam->record[0]; // Safety + } + +#ifdef HAVE_purify + /* + We need this because when we read var-length rows, we are not updating + bytes after end of varchar + */ + if (records > 1) + { + memcpy(outparam->record[0], share->default_values, share->rec_buff_length); + memcpy(outparam->record[1], share->default_values, share->null_bytes); + if (records > 2) + memcpy(outparam->record[1], share->default_values, + share->rec_buff_length); + } +#endif + + if (!(field_ptr = (Field **) alloc_root(&outparam->mem_root, + (uint) ((share->fields+1)* + sizeof(Field*))))) + goto err; /* purecov: inspected */ + + outparam->field= field_ptr; + + record= (uchar*) outparam->record[0]-1; /* Fieldstart = 1 */ + if (share->null_field_first) + outparam->null_flags= (uchar*) record+1; + else + outparam->null_flags= (uchar*) (record+ 1+ share->reclength - + share->null_bytes); + + /* Setup copy of fields from share, but use the right alias and record */ + for (i=0 ; i < share->fields; i++, field_ptr++) + { + if (!((*field_ptr)= share->field[i]->clone(&outparam->mem_root, outparam))) + goto err; + } + (*field_ptr)= 0; // End marker + + if (share->found_next_number_field) + outparam->found_next_number_field= + outparam->field[(uint) (share->found_next_number_field - share->field)]; + if (share->timestamp_field) + outparam->timestamp_field= (Field_timestamp*) outparam->field[share->timestamp_field_offset]; + + + /* Fix key->name and key_part->field */ + if (share->key_parts) + { + KEY *key_info, *key_info_end; + KEY_PART_INFO *key_part; + uint n_length; + n_length= share->keys*sizeof(KEY) + share->key_parts*sizeof(KEY_PART_INFO); + if (!(key_info= (KEY*) alloc_root(&outparam->mem_root, n_length))) + goto err; + outparam->key_info= key_info; + key_part= (my_reinterpret_cast(KEY_PART_INFO*) (key_info+share->keys)); + + memcpy(key_info, share->key_info, sizeof(*key_info)*share->keys); + memcpy(key_part, share->key_info[0].key_part, (sizeof(*key_part) * + share->key_parts)); + + for (key_info_end= key_info + share->keys ; + key_info < key_info_end ; + key_info++) + { + KEY_PART_INFO *key_part_end; + + key_info->table= outparam; + key_info->key_part= key_part; + + for (key_part_end= key_part+ key_info->key_parts ; + key_part < key_part_end ; + key_part++) + { + Field *field= key_part->field= outparam->field[key_part->fieldnr-1]; + + if (field->key_length() != key_part->length && + !(field->flags & BLOB_FLAG)) + { + /* + We are using only a prefix of the column as a key: + Create a new field for the key part that matches the index + */ + field= key_part->field=field->new_field(&outparam->mem_root, + outparam, 0); + field->field_length= key_part->length; + } + } + } + } + +#ifdef WITH_PARTITION_STORAGE_ENGINE + if (share->partition_info_len) + { + /* + In this execution we must avoid calling thd->change_item_tree since + we might release memory before statement is completed. We do this + by changing to a new statement arena. As part of this arena we also + set the memory root to be the memory root of the table since we + call the parser and fix_fields which both can allocate memory for + item objects. We keep the arena to ensure that we can release the + free_list when closing the table object. + SEE Bug #21658 + */ + + Query_arena *backup_stmt_arena_ptr= thd->stmt_arena; + Query_arena backup_arena; + Query_arena part_func_arena(&outparam->mem_root, Query_arena::INITIALIZED); + thd->set_n_backup_active_arena(&part_func_arena, &backup_arena); + thd->stmt_arena= &part_func_arena; + bool tmp; + bool work_part_info_used; + + tmp= mysql_unpack_partition(thd, share->partition_info, + share->partition_info_len, + share->part_state, + share->part_state_len, + outparam, is_create_table, + share->default_part_db_type, + &work_part_info_used); + outparam->part_info->is_auto_partitioned= share->auto_partitioned; + DBUG_PRINT("info", ("autopartitioned: %u", share->auto_partitioned)); + /* we should perform the fix_partition_func in either local or + caller's arena depending on work_part_info_used value + */ + if (!tmp && !work_part_info_used) + tmp= fix_partition_func(thd, outparam, is_create_table); + thd->stmt_arena= backup_stmt_arena_ptr; + thd->restore_active_arena(&part_func_arena, &backup_arena); + if (!tmp) + { + if (work_part_info_used) + tmp= fix_partition_func(thd, outparam, is_create_table); + outparam->part_info->item_free_list= part_func_arena.free_list; + } + if (tmp) + { + if (is_create_table) + { + /* + During CREATE/ALTER TABLE it is ok to receive errors here. + It is not ok if it happens during the opening of an frm + file as part of a normal query. + */ + error_reported= TRUE; + } + goto err; + } + } +#endif + + /* Allocate bitmaps */ + + bitmap_size= share->column_bitmap_size; + if (!(bitmaps= (uchar*) alloc_root(&outparam->mem_root, bitmap_size*3))) + goto err; + bitmap_init(&outparam->def_read_set, + (my_bitmap_map*) bitmaps, share->fields, FALSE); + bitmap_init(&outparam->def_write_set, + (my_bitmap_map*) (bitmaps+bitmap_size), share->fields, FALSE); + bitmap_init(&outparam->tmp_set, + (my_bitmap_map*) (bitmaps+bitmap_size*2), share->fields, FALSE); + outparam->default_column_bitmaps(); + /* The table struct is now initialized; Open the table */ - error=2; + error= 2; if (db_stat) { int ha_err; - unpack_filename(index_file,index_file); if ((ha_err= (outparam->file-> - ha_open(index_file, + ha_open(outparam, share->normalized_path.str, (db_stat & HA_READ_ONLY ? O_RDONLY : O_RDWR), (db_stat & HA_OPEN_TEMPORARY ? HA_OPEN_TMP_TABLE : ((db_stat & HA_WAIT_IF_LOCKED) || @@ -940,58 +1787,89 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, outparam->file->auto_repair() && !(ha_open_flags & HA_OPEN_FOR_REPAIR)); - if (ha_err == HA_ERR_NO_SUCH_TABLE) + switch (ha_err) { - /* The table did not exists in storage engine, use same error message - as if the .frm file didn't exist */ - error= 1; - my_errno= ENOENT; - } - else - { - outparam->file->print_error(ha_err, MYF(0)); - error_reported= TRUE; + case HA_ERR_NO_SUCH_TABLE: + /* + The table did not exists in storage engine, use same error message + as if the .frm file didn't exist + */ + error= 1; + my_errno= ENOENT; + break; + case EMFILE: + /* + Too many files opened, use same error message as if the .frm + file can't open + */ + DBUG_PRINT("error", ("open file: %s failed, too many files opened (errno: %d)", + share->normalized_path.str, ha_err)); + error= 1; + my_errno= EMFILE; + break; + default: + outparam->file->print_error(ha_err, MYF(0)); + error_reported= TRUE; + if (ha_err == HA_ERR_TABLE_DEF_CHANGED) + error= 7; + break; } goto err; /* purecov: inspected */ } } - share->db_low_byte_first= outparam->file->low_byte_first(); - *root_ptr= old_root; - thd->status_var.opened_tables++; -#ifndef DBUG_OFF - if (use_hash) - (void) hash_check(&share->name_hash); +#if defined(HAVE_purify) && !defined(DBUG_OFF) + bzero((char*) bitmaps, bitmap_size*3); #endif + + thd->status_var.opened_tables++; + DBUG_RETURN (0); err: - x_free((gptr) disk_buff); - if (file > 0) - VOID(my_close(file,MYF(MY_WME))); - - delete crypted; - *root_ptr= old_root; if (! error_reported) - frm_error(error,outparam,name,ME_ERROR+ME_WAITTANG, errarg); + open_table_error(share, error, my_errno, 0); delete outparam->file; - outparam->file=0; // For easier errorchecking +#ifdef WITH_PARTITION_STORAGE_ENGINE + if (outparam->part_info) + free_items(outparam->part_info->item_free_list); +#endif + outparam->file= 0; // For easier error checking outparam->db_stat=0; - hash_free(&share->name_hash); free_root(&outparam->mem_root, MYF(0)); // Safe to call on bzero'd root my_free((char*) outparam->alias, MYF(MY_ALLOW_ZERO_PTR)); DBUG_RETURN (error); -} /* openfrm */ +} + +/* + Free information allocated by openfrm - /* close a .frm file and it's tables */ + SYNOPSIS + closefrm() + table TABLE object to free + free_share Is 1 if we also want to free table_share +*/ -int closefrm(register TABLE *table) +int closefrm(register TABLE *table, bool free_share) { int error=0; + uint idx; + KEY *key_info; DBUG_ENTER("closefrm"); + DBUG_PRINT("enter", ("table: 0x%lx", (long) table)); + if (table->db_stat) error=table->file->close(); + key_info= table->key_info; + for (idx= table->s->keys; idx; idx--, key_info++) + { + if (key_info->flags & HA_USES_PARSER) + { + plugin_unlock(NULL, key_info->parser); + key_info->flags= 0; + } + } my_free((char*) table->alias, MYF(MY_ALLOW_ZERO_PTR)); table->alias= 0; if (table->field) @@ -1002,7 +1880,21 @@ int closefrm(register TABLE *table) } delete table->file; table->file= 0; /* For easier errorchecking */ - hash_free(&table->s->name_hash); +#ifdef WITH_PARTITION_STORAGE_ENGINE + if (table->part_info) + { + free_items(table->part_info->item_free_list); + table->part_info->item_free_list= 0; + table->part_info= 0; + } +#endif + if (free_share) + { + if (table->s->tmp_table == NO_TMP_TABLE) + release_table_share(table->s, RELEASE_NORMAL); + else + free_table_share(table->s); + } free_root(&table->mem_root, MYF(0)); DBUG_RETURN(error); } @@ -1031,7 +1923,7 @@ ulong get_form_pos(File file, uchar *head, TYPELIB *save_names) DBUG_ENTER("get_form_pos"); names=uint2korr(head+8); - a_length=(names+2)*sizeof(my_string); /* Room for two extra */ + a_length=(names+2)*sizeof(char *); /* Room for two extra */ if (!save_names) a_length=0; @@ -1042,12 +1934,12 @@ ulong get_form_pos(File file, uchar *head, TYPELIB *save_names) { length=uint2korr(head+4); VOID(my_seek(file,64L,MY_SEEK_SET,MYF(0))); - if (!(buf= (uchar*) my_malloc((uint) length+a_length+names*4, + if (!(buf= (uchar*) my_malloc((size_t) length+a_length+names*4, MYF(MY_WME))) || - my_read(file,(byte*) buf+a_length,(uint) (length+names*4), + my_read(file, buf+a_length, (size_t) (length+names*4), MYF(MY_NABP))) { /* purecov: inspected */ - x_free((gptr) buf); /* purecov: inspected */ + x_free((uchar*) buf); /* purecov: inspected */ DBUG_RETURN(0L); /* purecov: inspected */ } pos= buf+a_length+length; @@ -1056,7 +1948,7 @@ ulong get_form_pos(File file, uchar *head, TYPELIB *save_names) if (! save_names) { if (names) - my_free((gptr) buf,MYF(0)); + my_free((uchar*) buf,MYF(0)); } else if (!names) bzero((char*) save_names,sizeof(save_names)); @@ -1070,19 +1962,24 @@ ulong get_form_pos(File file, uchar *head, TYPELIB *save_names) } - /* Read string from a file with malloc */ +/* + Read string from a file with malloc + + NOTES: + We add an \0 at end of the read string to make reading of C strings easier +*/ -int read_string(File file, gptr *to, uint length) +int read_string(File file, uchar**to, size_t length) { DBUG_ENTER("read_string"); - x_free((gptr) *to); - if (!(*to= (gptr) my_malloc(length+1,MYF(MY_WME))) || - my_read(file,(byte*) *to,length,MYF(MY_NABP))) + x_free(*to); + if (!(*to= (uchar*) my_malloc(length+1,MYF(MY_WME))) || + my_read(file, *to, length,MYF(MY_NABP))) { - x_free((gptr) *to); /* purecov: inspected */ - *to= 0; /* purecov: inspected */ - DBUG_RETURN(1); /* purecov: inspected */ + x_free(*to); /* purecov: inspected */ + *to= 0; /* purecov: inspected */ + DBUG_RETURN(1); /* purecov: inspected */ } *((char*) *to+length)= '\0'; DBUG_RETURN (0); @@ -1096,7 +1993,7 @@ ulong make_new_entry(File file, uchar *fileinfo, TYPELIB *formnames, { uint i,bufflength,maxlength,n_length,length,names; ulong endpos,newpos; - char buff[IO_SIZE]; + uchar buff[IO_SIZE]; uchar *pos; DBUG_ENTER("make_new_entry"); @@ -1116,17 +2013,17 @@ ulong make_new_entry(File file, uchar *fileinfo, TYPELIB *formnames, while (endpos > maxlength) { VOID(my_seek(file,(ulong) (endpos-bufflength),MY_SEEK_SET,MYF(0))); - if (my_read(file,(byte*) buff,bufflength,MYF(MY_NABP+MY_WME))) + if (my_read(file, buff, bufflength, MYF(MY_NABP+MY_WME))) DBUG_RETURN(0L); VOID(my_seek(file,(ulong) (endpos-bufflength+IO_SIZE),MY_SEEK_SET, MYF(0))); - if ((my_write(file,(byte*) buff,bufflength,MYF(MY_NABP+MY_WME)))) + if ((my_write(file, buff,bufflength,MYF(MY_NABP+MY_WME)))) DBUG_RETURN(0); endpos-=bufflength; bufflength=IO_SIZE; } bzero(buff,IO_SIZE); /* Null new block */ VOID(my_seek(file,(ulong) maxlength,MY_SEEK_SET,MYF(0))); - if (my_write(file,(byte*) buff,bufflength,MYF(MY_NABP+MY_WME))) + if (my_write(file,buff,bufflength,MYF(MY_NABP+MY_WME))) DBUG_RETURN(0L); maxlength+=IO_SIZE; /* Fix old ref */ int2store(fileinfo+6,maxlength); @@ -1141,15 +2038,15 @@ ulong make_new_entry(File file, uchar *fileinfo, TYPELIB *formnames, if (n_length == 1 ) { /* First name */ length++; - VOID(strxmov(buff,"/",newname,"/",NullS)); + VOID(strxmov((char*) buff,"/",newname,"/",NullS)); } else - VOID(strxmov(buff,newname,"/",NullS)); /* purecov: inspected */ + VOID(strxmov((char*) buff,newname,"/",NullS)); /* purecov: inspected */ VOID(my_seek(file,63L+(ulong) n_length,MY_SEEK_SET,MYF(0))); - if (my_write(file,(byte*) buff,(uint) length+1,MYF(MY_NABP+MY_WME)) || - (names && my_write(file,(byte*) (*formnames->type_names+n_length-1), + if (my_write(file, buff, (size_t) length+1,MYF(MY_NABP+MY_WME)) || + (names && my_write(file,(uchar*) (*formnames->type_names+n_length-1), names*4, MYF(MY_NABP+MY_WME))) || - my_write(file,(byte*) fileinfo+10,(uint) 4,MYF(MY_NABP+MY_WME))) + my_write(file, fileinfo+10, 4,MYF(MY_NABP+MY_WME))) DBUG_RETURN(0L); /* purecov: inspected */ int2store(fileinfo+8,names+1); @@ -1161,38 +2058,44 @@ ulong make_new_entry(File file, uchar *fileinfo, TYPELIB *formnames, /* error message when opening a form file */ -static void frm_error(int error, TABLE *form, const char *name, - myf errortype, int errarg) +void open_table_error(TABLE_SHARE *share, int error, int db_errno, int errarg) { int err_no; char buff[FN_REFLEN]; - const char *form_dev="",*datext; - const char *real_name= (char*) name+dirname_length(name); - DBUG_ENTER("frm_error"); + myf errortype= ME_ERROR+ME_WAITTANG; + DBUG_ENTER("open_table_error"); switch (error) { + case 7: case 1: - if (my_errno == ENOENT) + if (db_errno == ENOENT) + my_error(ER_NO_SUCH_TABLE, MYF(0), share->db.str, share->table_name.str); + else { - char *db; - uint length=dirname_part(buff,name); - buff[length-1]=0; - db=buff+dirname_length(buff); - my_error(ER_NO_SUCH_TABLE, MYF(0), db, real_name); + strxmov(buff, share->normalized_path.str, reg_ext, NullS); + my_error((db_errno == EMFILE) ? ER_CANT_OPEN_FILE : ER_FILE_NOT_FOUND, + errortype, buff, db_errno); } - else - my_error((my_errno == EMFILE) ? ER_CANT_OPEN_FILE : ER_FILE_NOT_FOUND, - errortype, - fn_format(buff, name, form_dev, reg_ext, 0), my_errno); break; case 2: { - datext= form->file ? *form->file->bas_ext() : ""; - datext= datext==NullS ? "" : datext; - err_no= (my_errno == ENOENT) ? ER_FILE_NOT_FOUND : (my_errno == EAGAIN) ? + handler *file= 0; + const char *datext= ""; + + if (share->db_type() != NULL) + { + if ((file= get_new_handler(share, current_thd->mem_root, + share->db_type()))) + { + if (!(datext= *file->bas_ext())) + datext= ""; + } + } + err_no= (db_errno == ENOENT) ? ER_FILE_NOT_FOUND : (db_errno == EAGAIN) ? ER_FILE_USED : ER_CANT_OPEN_FILE; - my_error(err_no,errortype, - fn_format(buff,real_name,form_dev,datext,2),my_errno); + strxmov(buff, share->normalized_path.str, datext, NullS); + my_error(err_no,errortype, buff, db_errno); + delete file; break; } case 5: @@ -1206,23 +2109,24 @@ static void frm_error(int error, TABLE *form, const char *name, } my_printf_error(ER_UNKNOWN_COLLATION, "Unknown collation '%s' in table '%-.64s' definition", - MYF(0), csname, real_name); + MYF(0), csname, share->table_name.str); break; } case 6: + strxmov(buff, share->normalized_path.str, reg_ext, NullS); my_printf_error(ER_NOT_FORM_FILE, "Table '%-.64s' was created with a different version " - "of MySQL and cannot be read", - MYF(0), name); + "of MySQL and cannot be read", + MYF(0), buff); break; default: /* Better wrong error than none */ case 4: - my_error(ER_NOT_FORM_FILE, errortype, - fn_format(buff, name, form_dev, reg_ext, 0)); + strxmov(buff, share->normalized_path.str, reg_ext, NullS); + my_error(ER_NOT_FORM_FILE, errortype, buff, 0); break; } DBUG_VOID_RETURN; -} /* frm_error */ +} /* open_table_error */ /* @@ -1302,22 +2206,21 @@ TYPELIB *typelib(MEM_ROOT *mem_root, List<String> &strings) # field number +1 */ -static uint find_field(TABLE *form,uint start,uint length) +static uint find_field(Field **fields, uchar *record, uint start, uint length) { Field **field; - uint i, pos, fields; + uint i, pos; - pos=0; - fields= form->s->fields; - for (field=form->field, i=1 ; i<= fields ; i++,field++) + pos= 0; + for (field= fields, i=1 ; *field ; i++,field++) { - if ((*field)->offset() == start) + if ((*field)->offset(record) == start) { if ((*field)->key_length() == length) return (i); - if (!pos || form->field[pos-1]->pack_length() < + if (!pos || fields[pos-1]->pack_length() < (*field)->pack_length()) - pos=i; + pos= i; } } return (pos); @@ -1407,15 +2310,16 @@ void append_unescaped(String *res, const char *pos, uint length) res->append('\''); } + /* Create a .frm file */ -File create_frm(THD *thd, my_string name, const char *db, +File create_frm(THD *thd, const char *name, const char *db, const char *table, uint reclength, uchar *fileinfo, - HA_CREATE_INFO *create_info, uint keys) + HA_CREATE_INFO *create_info, uint keys) { register File file; ulong length; - char fill[IO_SIZE]; + uchar fill[IO_SIZE]; int create_flags= O_RDWR | O_TRUNC; if (create_info->options & HA_LEX_CREATE_TMP_TABLE) @@ -1427,12 +2331,6 @@ File create_frm(THD *thd, my_string name, const char *db, if (create_info->min_rows > UINT_MAX32) create_info->min_rows= UINT_MAX32; - /* - Ensure that raid_chunks can't be larger than 255, as this would cause - problems with drop database - */ - set_if_smaller(create_info->raid_chunks, 255); - if ((file= my_create(name, CREATE_MODE, create_flags, MYF(0))) >= 0) { uint key_length, tmp_key_length; @@ -1443,7 +2341,8 @@ File create_frm(THD *thd, my_string name, const char *db, fileinfo[1]= 1; fileinfo[2]= FRM_VER+3+ test(create_info->varchar); - fileinfo[3]= (uchar) ha_checktype(thd,create_info->db_type,0,0); + fileinfo[3]= (uchar) ha_legacy_type( + ha_checktype(thd,ha_legacy_type(create_info->db_type),0,0)); fileinfo[4]=1; int2store(fileinfo+6,IO_SIZE); /* Next block starts here */ /* @@ -1476,17 +2375,26 @@ File create_frm(THD *thd, my_string name, const char *db, fileinfo[38]= (create_info->default_table_charset ? create_info->default_table_charset->number : 0); fileinfo[40]= (uchar) create_info->row_type; - fileinfo[41]= (uchar) create_info->raid_type; - fileinfo[42]= (uchar) create_info->raid_chunks; - int4store(fileinfo+43,create_info->raid_chunksize); + /* Next few bytes were for RAID support */ + fileinfo[41]= 0; + fileinfo[42]= 0; + fileinfo[43]= 0; + fileinfo[44]= 0; + fileinfo[45]= 0; + fileinfo[46]= 0; int4store(fileinfo+47, key_length); tmp= MYSQL_VERSION_ID; // Store to avoid warning from int4store int4store(fileinfo+51, tmp); - int2store(fileinfo+55, create_info->extra_size); + int4store(fileinfo+55, create_info->extra_size); + /* + 59-60 is reserved for extra_rec_buf_length, + 61 for default_part_db_type + */ + int2store(fileinfo+62, create_info->key_block_size); bzero(fill,IO_SIZE); for (; length > IO_SIZE ; length-= IO_SIZE) { - if (my_write(file,(byte*) fill,IO_SIZE,MYF(MY_WME | MY_NABP))) + if (my_write(file,fill, IO_SIZE, MYF(MY_WME | MY_NABP))) { VOID(my_close(file,MYF(0))); VOID(my_delete(name,MYF(0))); @@ -1515,9 +2423,6 @@ void update_create_info_from_table(HA_CREATE_INFO *create_info, TABLE *table) create_info->table_options= share->db_create_options; create_info->avg_row_length= share->avg_row_length; create_info->row_type= share->row_type; - create_info->raid_type= share->raid_type; - create_info->raid_chunks= share->raid_chunks; - create_info->raid_chunksize= share->raid_chunksize; create_info->default_table_charset= share->table_charset; create_info->table_charset= 0; @@ -1595,13 +2500,37 @@ char *get_field(MEM_ROOT *mem, Field *field) return to; } +/* + DESCRIPTION + given a buffer with a key value, and a map of keyparts + that are present in this value, returns the length of the value +*/ +uint calculate_key_len(TABLE *table, uint key, const uchar *buf, + key_part_map keypart_map) +{ + /* works only with key prefixes */ + DBUG_ASSERT(((keypart_map + 1) & keypart_map) == 0); + + KEY *key_info= table->s->key_info+key; + KEY_PART_INFO *key_part= key_info->key_part; + KEY_PART_INFO *end_key_part= key_part + key_info->key_parts; + uint length= 0; + + while (key_part < end_key_part && keypart_map) + { + length+= key_part->store_length; + keypart_map >>= 1; + key_part++; + } + return length; +} /* Check if database name is valid SYNPOSIS check_db_name() - name Name of database + org_name Name of database and length NOTES If lower_case_table_names is set then database is converted to lower case @@ -1611,51 +2540,52 @@ char *get_field(MEM_ROOT *mem, Field *field) 1 error */ -bool check_db_name(char *name) +bool check_db_name(LEX_STRING *org_name) { - char *start= name; - /* Used to catch empty names and names with end space */ - bool last_char_is_space= TRUE; + char *name= org_name->str; + uint name_length= org_name->length; + + if (!name_length || name_length > NAME_LEN) + return 1; if (lower_case_table_names && name != any_db) my_casedn_str(files_charset_info, name); - while (*name) - { #if defined(USE_MB) && defined(USE_MB_IDENT) - last_char_is_space= my_isspace(system_charset_info, *name); - if (use_mb(system_charset_info)) + if (use_mb(system_charset_info)) + { + name_length= 0; + bool last_char_is_space= TRUE; + char *end= name + org_name->length; + while (name < end) { - int len=my_ismbchar(system_charset_info, name, - name+system_charset_info->mbmaxlen); - if (len) - { - name += len; - continue; - } + int len; + last_char_is_space= my_isspace(system_charset_info, *name); + len= my_ismbchar(system_charset_info, name, end); + if (!len) + len= 1; + name+= len; + name_length++; } -#else - last_char_is_space= *name==' '; -#endif - if (*name == '/' || *name == '\\' || *name == FN_LIBCHAR || - *name == FN_EXTCHAR) - return 1; - name++; + return (last_char_is_space || name_length > NAME_CHAR_LEN); } - return last_char_is_space || (uint) (name - start) > NAME_LEN; + else +#endif + return ((org_name->str[org_name->length - 1] != ' ') || + (name_length > NAME_CHAR_LEN)); /* purecov: inspected */ } /* Allow anything as a table name, as long as it doesn't contain an - a '/', or a '.' character - or ' ' at the end + ' ' at the end returns 1 on error */ bool check_table_name(const char *name, uint length) { + uint name_length= 0; // name length in symbols const char *end= name+length; if (!length || length > NAME_LEN) return 1; @@ -1676,16 +2606,16 @@ bool check_table_name(const char *name, uint length) if (len) { name += len; + name_length++; continue; } } #endif - if (*name == '/' || *name == '\\' || *name == FN_EXTCHAR) - return 1; name++; + name_length++; } #if defined(USE_MB) && defined(USE_MB_IDENT) - return last_char_is_space; + return (last_char_is_space || name_length > NAME_CHAR_LEN) ; #else return 0; #endif @@ -1694,7 +2624,7 @@ bool check_table_name(const char *name, uint length) bool check_column_name(const char *name) { - const char *start= name; + uint name_length= 0; // name length in symbols bool last_char_is_space= TRUE; while (*name) @@ -1708,6 +2638,7 @@ bool check_column_name(const char *name) if (len) { name += len; + name_length++; continue; } } @@ -1717,11 +2648,153 @@ bool check_column_name(const char *name) if (*name == NAMES_SEP_CHAR) return 1; name++; + name_length++; } /* Error if empty or too long column name */ - return last_char_is_space || (uint) (name - start) > NAME_LEN; + return last_char_is_space || (uint) name_length > NAME_CHAR_LEN; } + +/** + Checks whether a table is intact. Should be done *just* after the table has + been opened. + + @param[in] table The table to check + @param[in] table_f_count Expected number of columns in the table + @param[in] table_def Expected structure of the table (column name + and type) + + @retval FALSE OK + @retval TRUE There was an error. An error message is output + to the error log. We do not push an error + message into the error stack because this + function is currently only called at start up, + and such errors never reach the user. +*/ + +my_bool +table_check_intact(TABLE *table, const uint table_f_count, + const TABLE_FIELD_W_TYPE *table_def) +{ + uint i; + my_bool error= FALSE; + my_bool fields_diff_count; + DBUG_ENTER("table_check_intact"); + DBUG_PRINT("info",("table: %s expected_count: %d", + table->alias, table_f_count)); + + fields_diff_count= (table->s->fields != table_f_count); + if (fields_diff_count) + { + DBUG_PRINT("info", ("Column count has changed, checking the definition")); + + /* previous MySQL version */ + if (MYSQL_VERSION_ID > table->s->mysql_version) + { + sql_print_error(ER(ER_COL_COUNT_DOESNT_MATCH_PLEASE_UPDATE), + table->alias, table_f_count, table->s->fields, + table->s->mysql_version, MYSQL_VERSION_ID); + DBUG_RETURN(TRUE); + } + else if (MYSQL_VERSION_ID == table->s->mysql_version) + { + sql_print_error(ER(ER_COL_COUNT_DOESNT_MATCH_CORRUPTED), table->alias, + table_f_count, table->s->fields); + DBUG_RETURN(TRUE); + } + /* + Something has definitely changed, but we're running an older + version of MySQL with new system tables. + Let's check column definitions. If a column was added at + the end of the table, then we don't care much since such change + is backward compatible. + */ + } + char buffer[STRING_BUFFER_USUAL_SIZE]; + for (i=0 ; i < table_f_count; i++, table_def++) + { + String sql_type(buffer, sizeof(buffer), system_charset_info); + sql_type.length(0); + if (i < table->s->fields) + { + Field *field= table->field[i]; + + if (strncmp(field->field_name, table_def->name.str, + table_def->name.length)) + { + /* + Name changes are not fatal, we use ordinal numbers to access columns. + Still this can be a sign of a tampered table, output an error + to the error log. + */ + sql_print_error("Incorrect definition of table %s.%s: " + "expected column '%s' at position %d, found '%s'.", + table->s->db.str, table->alias, table_def->name.str, i, + field->field_name); + } + field->sql_type(sql_type); + /* + Generally, if column types don't match, then something is + wrong. + + However, we only compare column definitions up to the + length of the original definition, since we consider the + following definitions compatible: + + 1. DATETIME and DATETIM + 2. INT(11) and INT(11 + 3. SET('one', 'two') and SET('one', 'two', 'more') + + For SETs or ENUMs, if the same prefix is there it's OK to + add more elements - they will get higher ordinal numbers and + the new table definition is backward compatible with the + original one. + */ + if (strncmp(sql_type.c_ptr_safe(), table_def->type.str, + table_def->type.length - 1)) + { + sql_print_error("Incorrect definition of table %s.%s: " + "expected column '%s' at position %d to have type " + "%s, found type %s.", table->s->db.str, table->alias, + table_def->name.str, i, table_def->type.str, + sql_type.c_ptr_safe()); + error= TRUE; + } + else if (table_def->cset.str && !field->has_charset()) + { + sql_print_error("Incorrect definition of table %s.%s: " + "expected the type of column '%s' at position %d " + "to have character set '%s' but the type has no " + "character set.", table->s->db.str, table->alias, + table_def->name.str, i, table_def->cset.str); + error= TRUE; + } + else if (table_def->cset.str && + strcmp(field->charset()->csname, table_def->cset.str)) + { + sql_print_error("Incorrect definition of table %s.%s: " + "expected the type of column '%s' at position %d " + "to have character set '%s' but found " + "character set '%s'.", table->s->db.str, table->alias, + table_def->name.str, i, table_def->cset.str, + field->charset()->csname); + error= TRUE; + } + } + else + { + sql_print_error("Incorrect definition of table %s.%s: " + "expected column '%s' at position %d to have type %s " + " but the column is not found.", + table->s->db.str, table->alias, + table_def->name.str, i, table_def->type.str); + error= TRUE; + } + } + DBUG_RETURN(error); +} + + /* Create Item_field for each column in the table. @@ -1922,7 +2995,7 @@ void TABLE_LIST::calc_md5(char *buffer) my_MD5_CTX context; uchar digest[16]; my_MD5Init(&context); - my_MD5Update(&context,(uchar *) query.str, query.length); + my_MD5Update(&context,(uchar *) select_stmt.str, select_stmt.length); my_MD5Final(digest, &context); sprintf((char *) buffer, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", @@ -2006,7 +3079,7 @@ bool TABLE_LIST::setup_underlying(THD *thd) List_iterator_fast<Item> it(select->item_list); uint field_count= 0; - if (check_stack_overrun(thd, STACK_MIN_SIZE, (char *)&field_count)) + if (check_stack_overrun(thd, STACK_MIN_SIZE, (uchar*) &field_count)) { DBUG_RETURN(TRUE); } @@ -2332,7 +3405,7 @@ void TABLE_LIST::cleanup_items() for (Field_translator *transl= field_translation; transl < field_translation_end; transl++) - transl->item->walk(&Item::cleanup_processor, 0); + transl->item->walk(&Item::cleanup_processor, 0, 0); } @@ -2426,7 +3499,7 @@ bool TABLE_LIST::set_insert_values(MEM_ROOT *mem_root) if (table) { if (!table->insert_values && - !(table->insert_values= (byte *)alloc_root(mem_root, + !(table->insert_values= (uchar *)alloc_root(mem_root, table->s->rec_buff_length))) return TRUE; } @@ -2828,9 +3901,9 @@ const char *Natural_join_column::db_name() are inconsistent in this respect. */ DBUG_ASSERT(!strcmp(table_ref->db, - table_ref->table->s->db) || + table_ref->table->s->db.str) || (table_ref->schema_table && - table_ref->table->s->db[0] == 0)); + table_ref->table->s->db.str[0] == 0)); return table_ref->db; } @@ -2914,7 +3987,7 @@ Item *create_view_field(THD *thd, TABLE_LIST *view, Item **field_ref, field= *field_ref; } thd->lex->current_select->no_wrap_view_item= save_wrapper; - if (thd->lex->current_select->no_wrap_view_item) + if (save_wrapper) { DBUG_RETURN(field); } @@ -3029,7 +4102,7 @@ const char *Field_iterator_table_ref::table_name() return natural_join_it.column_ref()->table_name(); DBUG_ASSERT(!strcmp(table_ref->table_name, - table_ref->table->s->table_name)); + table_ref->table->s->table_name.str)); return table_ref->table_name; } @@ -3046,9 +4119,9 @@ const char *Field_iterator_table_ref::db_name() ensure consistency. An exception are I_S schema tables, which are inconsistent in this respect. */ - DBUG_ASSERT(!strcmp(table_ref->db, table_ref->table->s->db) || + DBUG_ASSERT(!strcmp(table_ref->db, table_ref->table->s->db.str) || (table_ref->schema_table && - table_ref->table->s->db[0] == 0)); + table_ref->table->s->db.str[0] == 0)); return table_ref->db; } @@ -3110,7 +4183,7 @@ Field_iterator_table_ref::get_or_create_column_ref(TABLE_LIST *parent_table_ref) TABLE_LIST *add_table_ref= parent_table_ref ? parent_table_ref : table_ref; LINT_INIT(field_count); - + if (field_it == &table_field_it) { /* The field belongs to a stored table. */ @@ -3208,6 +4281,260 @@ Field_iterator_table_ref::get_natural_column_ref() return nj_col; } +/***************************************************************************** + Functions to handle column usage bitmaps (read_set, write_set etc...) +*****************************************************************************/ + +/* Reset all columns bitmaps */ + +void st_table::clear_column_bitmaps() +{ + /* + Reset column read/write usage. It's identical to: + bitmap_clear_all(&table->def_read_set); + bitmap_clear_all(&table->def_write_set); + */ + bzero((char*) def_read_set.bitmap, s->column_bitmap_size*2); + column_bitmaps_set(&def_read_set, &def_write_set); +} + + +/* + Tell handler we are going to call position() and rnd_pos() later. + + NOTES: + This is needed for handlers that uses the primary key to find the + row. In this case we have to extend the read bitmap with the primary + key fields. +*/ + +void st_table::prepare_for_position() +{ + DBUG_ENTER("st_table::prepare_for_position"); + + if ((file->ha_table_flags() & HA_PRIMARY_KEY_IN_READ_INDEX) && + s->primary_key < MAX_KEY) + { + mark_columns_used_by_index_no_reset(s->primary_key, read_set); + /* signal change */ + file->column_bitmaps_signal(); + } + DBUG_VOID_RETURN; +} + + +/* + Mark that only fields from one key is used + + NOTE: + This changes the bitmap to use the tmp bitmap + After this, you can't access any other columns in the table until + bitmaps are reset, for example with st_table::clear_column_bitmaps() + or st_table::restore_column_maps_after_mark_index() +*/ + +void st_table::mark_columns_used_by_index(uint index) +{ + MY_BITMAP *bitmap= &tmp_set; + DBUG_ENTER("st_table::mark_columns_used_by_index"); + + (void) file->extra(HA_EXTRA_KEYREAD); + bitmap_clear_all(bitmap); + mark_columns_used_by_index_no_reset(index, bitmap); + column_bitmaps_set(bitmap, bitmap); + DBUG_VOID_RETURN; +} + + +/* + Restore to use normal column maps after key read + + NOTES + This reverse the change done by mark_columns_used_by_index + + WARNING + For this to work, one must have the normal table maps in place + when calling mark_columns_used_by_index +*/ + +void st_table::restore_column_maps_after_mark_index() +{ + DBUG_ENTER("st_table::restore_column_maps_after_mark_index"); + + key_read= 0; + (void) file->extra(HA_EXTRA_NO_KEYREAD); + default_column_bitmaps(); + file->column_bitmaps_signal(); + DBUG_VOID_RETURN; +} + + +/* + mark columns used by key, but don't reset other fields +*/ + +void st_table::mark_columns_used_by_index_no_reset(uint index, + MY_BITMAP *bitmap) +{ + KEY_PART_INFO *key_part= key_info[index].key_part; + KEY_PART_INFO *key_part_end= (key_part + + key_info[index].key_parts); + for (;key_part != key_part_end; key_part++) + bitmap_set_bit(bitmap, key_part->fieldnr-1); +} + + +/* + Mark auto-increment fields as used fields in both read and write maps + + NOTES + This is needed in insert & update as the auto-increment field is + always set and sometimes read. +*/ + +void st_table::mark_auto_increment_column() +{ + DBUG_ASSERT(found_next_number_field); + /* + We must set bit in read set as update_auto_increment() is using the + store() to check overflow of auto_increment values + */ + bitmap_set_bit(read_set, found_next_number_field->field_index); + bitmap_set_bit(write_set, found_next_number_field->field_index); + if (s->next_number_keypart) + mark_columns_used_by_index_no_reset(s->next_number_index, read_set); + file->column_bitmaps_signal(); +} + + +/* + Mark columns needed for doing an delete of a row + + DESCRIPTON + Some table engines don't have a cursor on the retrieve rows + so they need either to use the primary key or all columns to + be able to delete a row. + + If the engine needs this, the function works as follows: + - If primary key exits, mark the primary key columns to be read. + - If not, mark all columns to be read + + If the engine has HA_REQUIRES_KEY_COLUMNS_FOR_DELETE, we will + mark all key columns as 'to-be-read'. This allows the engine to + loop over the given record to find all keys and doesn't have to + retrieve the row again. +*/ + +void st_table::mark_columns_needed_for_delete() +{ + if (triggers) + triggers->mark_fields_used(TRG_EVENT_DELETE); + if (file->ha_table_flags() & HA_REQUIRES_KEY_COLUMNS_FOR_DELETE) + { + Field **reg_field; + for (reg_field= field ; *reg_field ; reg_field++) + { + if ((*reg_field)->flags & PART_KEY_FLAG) + bitmap_set_bit(read_set, (*reg_field)->field_index); + } + file->column_bitmaps_signal(); + } + if (file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_DELETE) + { + /* + If the handler has no cursor capabilites, we have to read either + the primary key, the hidden primary key or all columns to be + able to do an delete + */ + if (s->primary_key == MAX_KEY) + file->use_hidden_primary_key(); + else + { + mark_columns_used_by_index_no_reset(s->primary_key, read_set); + file->column_bitmaps_signal(); + } + } +} + + +/* + Mark columns needed for doing an update of a row + + DESCRIPTON + Some engines needs to have all columns in an update (to be able to + build a complete row). If this is the case, we mark all not + updated columns to be read. + + If this is no the case, we do like in the delete case and mark + if neeed, either the primary key column or all columns to be read. + (see mark_columns_needed_for_delete() for details) + + If the engine has HA_REQUIRES_KEY_COLUMNS_FOR_DELETE, we will + mark all USED key columns as 'to-be-read'. This allows the engine to + loop over the given record to find all changed keys and doesn't have to + retrieve the row again. +*/ + +void st_table::mark_columns_needed_for_update() +{ + DBUG_ENTER("mark_columns_needed_for_update"); + if (triggers) + triggers->mark_fields_used(TRG_EVENT_UPDATE); + if (file->ha_table_flags() & HA_REQUIRES_KEY_COLUMNS_FOR_DELETE) + { + /* Mark all used key columns for read */ + Field **reg_field; + for (reg_field= field ; *reg_field ; reg_field++) + { + /* Merge keys is all keys that had a column refered to in the query */ + if (merge_keys.is_overlapping((*reg_field)->part_of_key)) + bitmap_set_bit(read_set, (*reg_field)->field_index); + } + file->column_bitmaps_signal(); + } + if (file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_DELETE) + { + /* + If the handler has no cursor capabilites, we have to read either + the primary key, the hidden primary key or all columns to be + able to do an update + */ + if (s->primary_key == MAX_KEY) + file->use_hidden_primary_key(); + else + { + mark_columns_used_by_index_no_reset(s->primary_key, read_set); + file->column_bitmaps_signal(); + } + } + DBUG_VOID_RETURN; +} + + +/* + Mark columns the handler needs for doing an insert + + For now, this is used to mark fields used by the trigger + as changed. +*/ + +void st_table::mark_columns_needed_for_insert() +{ + if (triggers) + { + /* + We don't need to mark columns which are used by ON DELETE and + ON UPDATE triggers, which may be invoked in case of REPLACE or + INSERT ... ON DUPLICATE KEY UPDATE, since before doing actual + row replacement or update write_record() will mark all table + fields as used. + */ + triggers->mark_fields_used(TRG_EVENT_INSERT); + } + if (found_next_number_field) + mark_auto_increment_column(); +} + /* Cleanup this table for re-execution. @@ -3256,6 +4583,193 @@ Item_subselect *TABLE_LIST::containing_subselect() return (select_lex ? select_lex->master_unit()->item : 0); } +/* + Compiles the tagged hints list and fills up the bitmasks. + + SYNOPSIS + process_index_hints() + table the TABLE to operate on. + + DESCRIPTION + The parser collects the index hints for each table in a "tagged list" + (TABLE_LIST::index_hints). Using the information in this tagged list + this function sets the members st_table::keys_in_use_for_query, + st_table::keys_in_use_for_group_by, st_table::keys_in_use_for_order_by, + st_table::force_index and st_table::covering_keys. + + Current implementation of the runtime does not allow mixing FORCE INDEX + and USE INDEX, so this is checked here. Then the FORCE INDEX list + (if non-empty) is appended to the USE INDEX list and a flag is set. + + Multiple hints of the same kind are processed so that each clause + is applied to what is computed in the previous clause. + For example: + USE INDEX (i1) USE INDEX (i2) + is equivalent to + USE INDEX (i1,i2) + and means "consider only i1 and i2". + + Similarly + USE INDEX () USE INDEX (i1) + is equivalent to + USE INDEX (i1) + and means "consider only the index i1" + + It is OK to have the same index several times, e.g. "USE INDEX (i1,i1)" is + not an error. + + Different kind of hints (USE/FORCE/IGNORE) are processed in the following + order: + 1. All indexes in USE (or FORCE) INDEX are added to the mask. + 2. All IGNORE INDEX + + e.g. "USE INDEX i1, IGNORE INDEX i1, USE INDEX i1" will not use i1 at all + as if we had "USE INDEX i1, USE INDEX i1, IGNORE INDEX i1". + + As an optimization if there is a covering index, and we have + IGNORE INDEX FOR GROUP/ORDER, and this index is used for the JOIN part, + then we have to ignore the IGNORE INDEX FROM GROUP/ORDER. + + RETURN VALUE + FALSE no errors found + TRUE found and reported an error. +*/ +bool TABLE_LIST::process_index_hints(TABLE *table) +{ + /* initialize the result variables */ + table->keys_in_use_for_query= table->keys_in_use_for_group_by= + table->keys_in_use_for_order_by= table->s->keys_in_use; + + /* index hint list processing */ + if (index_hints) + { + key_map index_join[INDEX_HINT_FORCE + 1]; + key_map index_order[INDEX_HINT_FORCE + 1]; + key_map index_group[INDEX_HINT_FORCE + 1]; + Index_hint *hint; + int type; + bool have_empty_use_join= FALSE, have_empty_use_order= FALSE, + have_empty_use_group= FALSE; + List_iterator <Index_hint> iter(*index_hints); + + /* initialize temporary variables used to collect hints of each kind */ + for (type= INDEX_HINT_IGNORE; type <= INDEX_HINT_FORCE; type++) + { + index_join[type].clear_all(); + index_order[type].clear_all(); + index_group[type].clear_all(); + } + + /* iterate over the hints list */ + while ((hint= iter++)) + { + uint pos; + + /* process empty USE INDEX () */ + if (hint->type == INDEX_HINT_USE && !hint->key_name.str) + { + if (hint->clause & INDEX_HINT_MASK_JOIN) + { + index_join[hint->type].clear_all(); + have_empty_use_join= TRUE; + } + if (hint->clause & INDEX_HINT_MASK_ORDER) + { + index_order[hint->type].clear_all(); + have_empty_use_order= TRUE; + } + if (hint->clause & INDEX_HINT_MASK_GROUP) + { + index_group[hint->type].clear_all(); + have_empty_use_group= TRUE; + } + continue; + } + + /* + Check if an index with the given name exists and get his offset in + the keys bitmask for the table + */ + if (table->s->keynames.type_names == 0 || + (pos= find_type(&table->s->keynames, hint->key_name.str, + hint->key_name.length, 1)) <= 0) + { + my_error(ER_KEY_DOES_NOT_EXITS, MYF(0), hint->key_name.str, alias); + return 1; + } + + pos--; + + /* add to the appropriate clause mask */ + if (hint->clause & INDEX_HINT_MASK_JOIN) + index_join[hint->type].set_bit (pos); + if (hint->clause & INDEX_HINT_MASK_ORDER) + index_order[hint->type].set_bit (pos); + if (hint->clause & INDEX_HINT_MASK_GROUP) + index_group[hint->type].set_bit (pos); + } + + /* cannot mix USE INDEX and FORCE INDEX */ + if ((!index_join[INDEX_HINT_FORCE].is_clear_all() || + !index_order[INDEX_HINT_FORCE].is_clear_all() || + !index_group[INDEX_HINT_FORCE].is_clear_all()) && + (!index_join[INDEX_HINT_USE].is_clear_all() || have_empty_use_join || + !index_order[INDEX_HINT_USE].is_clear_all() || have_empty_use_order || + !index_group[INDEX_HINT_USE].is_clear_all() || have_empty_use_group)) + { + my_error(ER_WRONG_USAGE, MYF(0), index_hint_type_name[INDEX_HINT_USE], + index_hint_type_name[INDEX_HINT_FORCE]); + return 1; + } + + /* process FORCE INDEX as USE INDEX with a flag */ + if (!index_join[INDEX_HINT_FORCE].is_clear_all() || + !index_order[INDEX_HINT_FORCE].is_clear_all() || + !index_group[INDEX_HINT_FORCE].is_clear_all()) + { + table->force_index= TRUE; + index_join[INDEX_HINT_USE].merge(index_join[INDEX_HINT_FORCE]); + index_order[INDEX_HINT_USE].merge(index_order[INDEX_HINT_FORCE]); + index_group[INDEX_HINT_USE].merge(index_group[INDEX_HINT_FORCE]); + } + + /* apply USE INDEX */ + if (!index_join[INDEX_HINT_USE].is_clear_all() || have_empty_use_join) + table->keys_in_use_for_query.intersect(index_join[INDEX_HINT_USE]); + if (!index_order[INDEX_HINT_USE].is_clear_all() || have_empty_use_order) + table->keys_in_use_for_order_by.intersect (index_order[INDEX_HINT_USE]); + if (!index_group[INDEX_HINT_USE].is_clear_all() || have_empty_use_group) + table->keys_in_use_for_group_by.intersect (index_group[INDEX_HINT_USE]); + + /* apply IGNORE INDEX */ + table->keys_in_use_for_query.subtract (index_join[INDEX_HINT_IGNORE]); + table->keys_in_use_for_order_by.subtract (index_order[INDEX_HINT_IGNORE]); + table->keys_in_use_for_group_by.subtract (index_group[INDEX_HINT_IGNORE]); + } + + /* make sure covering_keys don't include indexes disabled with a hint */ + table->covering_keys.intersect(table->keys_in_use_for_query); + return 0; +} + + +size_t max_row_length(TABLE *table, const uchar *data) +{ + TABLE_SHARE *table_s= table->s; + size_t length= table_s->reclength + 2 * table_s->fields; + uint *const beg= table_s->blob_field; + uint *const end= beg + table_s->blob_fields; + + for (uint *ptr= beg ; ptr != end ; ++ptr) + { + Field_blob* const blob= (Field_blob*) table->field[*ptr]; + length+= blob->get_length((const uchar*) + (data + blob->offset(table->record[0]))) + + HA_KEY_BLOB_LENGTH; + } + return length; +} + /***************************************************************************** ** Instansiate templates *****************************************************************************/ |