diff options
Diffstat (limited to 'storage/maria')
30 files changed, 1038 insertions, 297 deletions
diff --git a/storage/maria/ft_maria.c b/storage/maria/ft_maria.c index 1b082f904d0..b1b24592593 100644 --- a/storage/maria/ft_maria.c +++ b/storage/maria/ft_maria.c @@ -22,8 +22,8 @@ #include "ma_ftdefs.h" FT_INFO *maria_ft_init_search(uint flags, void *info, uint keynr, - uchar *query, uint query_len, CHARSET_INFO *cs, - uchar *record) + uchar *query, size_t query_len, + CHARSET_INFO *cs, uchar *record) { FT_INFO *res; if (flags & FT_BOOL) diff --git a/storage/maria/ha_maria.cc b/storage/maria/ha_maria.cc index 187cc3465bf..7c34a5f7595 100644 --- a/storage/maria/ha_maria.cc +++ b/storage/maria/ha_maria.cc @@ -102,22 +102,40 @@ TYPELIB maria_translog_purge_type_typelib= array_elements(maria_translog_purge_type_names) - 1, "", maria_translog_purge_type_names, NULL }; + +/* transactional log directory sync */ const char *maria_sync_log_dir_names[]= { "NEVER", "NEWFILE", "ALWAYS", NullS }; - TYPELIB maria_sync_log_dir_typelib= { array_elements(maria_sync_log_dir_names) - 1, "", maria_sync_log_dir_names, NULL }; +/* transactional log group commit */ +const char *maria_group_commit_names[]= +{ + "none", "hard", "soft", NullS +}; +TYPELIB maria_group_commit_typelib= +{ + array_elements(maria_group_commit_names) - 1, "", + maria_group_commit_names, NULL +}; + /** Interval between background checkpoints in seconds */ static ulong checkpoint_interval; static void update_checkpoint_interval(MYSQL_THD thd, struct st_mysql_sys_var *var, void *var_ptr, const void *save); +static void update_maria_group_commit(MYSQL_THD thd, + struct st_mysql_sys_var *var, + void *var_ptr, const void *save); +static void update_maria_group_commit_interval(MYSQL_THD thd, + struct st_mysql_sys_var *var, + void *var_ptr, const void *save); /** After that many consecutive recovery failures, remove logs */ static ulong force_start_after_recovery_failures; static void update_log_file_size(MYSQL_THD thd, @@ -164,6 +182,24 @@ static MYSQL_SYSVAR_ULONG(log_file_size, log_file_size, NULL, update_log_file_size, TRANSLOG_FILE_SIZE, TRANSLOG_MIN_FILE_SIZE, 0xffffffffL, TRANSLOG_PAGE_SIZE); +static MYSQL_SYSVAR_ENUM(group_commit, maria_group_commit, + PLUGIN_VAR_RQCMDARG, + "Specifies maria group commit mode. " + "Possible values are \"none\" (no group commit), " + "\"hard\" (with waiting to actual commit), " + "\"soft\" (no wait for commit (DANGEROUS!!!))", + NULL, update_maria_group_commit, + TRANSLOG_GCOMMIT_NONE, &maria_group_commit_typelib); + +static MYSQL_SYSVAR_ULONG(group_commit_interval, maria_group_commit_interval, + PLUGIN_VAR_RQCMDARG, + "Interval between commite in microseconds (1/1000000c)." + " 0 stands for no waiting" + " for other threads to come and do a commit in \"hard\" mode and no" + " sync()/commit at all in \"soft\" mode. Option has only an effect" + " if maria_group_commit is used", + NULL, update_maria_group_commit_interval, 0, 0, UINT_MAX, 1); + static MYSQL_SYSVAR_ENUM(log_purge_type, log_purge_type, PLUGIN_VAR_RQCMDARG, "Specifies how maria transactional log will be purged. " @@ -665,10 +701,9 @@ int maria_check_definition(MARIA_KEYDEF *t1_keyinfo, extern "C" { -volatile int *_ma_killed_ptr(HA_CHECK *param) +int _ma_killed_ptr(HA_CHECK *param) { - /* In theory Unsafe conversion, but should be ok for now */ - return (int*) &(((THD *) (param->thd))->killed); + return thd_killed((THD*)param->thd); } @@ -1928,8 +1963,7 @@ end: bool ha_maria::check_and_repair(THD *thd) { int error, crashed; - char *old_query; - uint old_query_length; + LEX_STRING old_query; HA_CHECK_OPT check_opt; DBUG_ENTER("ha_maria::check_and_repair"); @@ -1957,11 +1991,9 @@ bool ha_maria::check_and_repair(THD *thd) if (!file->state->del && (maria_recover_options & HA_RECOVER_QUICK)) check_opt.flags |= T_QUICK; - old_query= thd->query; - old_query_length= thd->query_length; + old_query= thd->query_string; pthread_mutex_lock(&LOCK_thread_count); - thd->query= table->s->table_name.str; - thd->query_length= table->s->table_name.length; + thd->query_string= table->s->table_name; pthread_mutex_unlock(&LOCK_thread_count); if (!(crashed= maria_is_crashed(file))) @@ -1981,8 +2013,7 @@ bool ha_maria::check_and_repair(THD *thd) error= 1; } pthread_mutex_lock(&LOCK_thread_count); - thd->query= old_query; - thd->query_length= old_query_length; + thd->query_string= old_query; pthread_mutex_unlock(&LOCK_thread_count); DBUG_RETURN(error); } @@ -2294,9 +2325,12 @@ int ha_maria::extra(enum ha_extra_function operation) extern_lock(F_UNLOCK) (which resets file->trn) followed by maria_close() without calling commit/rollback in between. If file->trn is not set we can't remove file->share from the transaction list in the extra() call. + + table->in_use is not set in the case this is a done as part of closefrm() + as part of drop table. */ - if (!file->trn && + if (file->s->now_transactional && !file->trn && table->in_use && (operation == HA_EXTRA_PREPARE_FOR_DROP || operation == HA_EXTRA_PREPARE_FOR_RENAME)) { @@ -2330,7 +2364,7 @@ int ha_maria::delete_all_rows() { THD *thd= current_thd; (void) translog_log_debug_info(file->trn, LOGREC_DEBUG_INFO_QUERY, - (uchar*) thd->query, thd->query_length); + (uchar*) thd->query(), thd->query_length()); if (file->s->now_transactional && ((table->in_use->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) || table->in_use->locked_tables)) @@ -2349,7 +2383,7 @@ int ha_maria::delete_table(const char *name) { THD *thd= current_thd; (void) translog_log_debug_info(0, LOGREC_DEBUG_INFO_QUERY, - (uchar*) thd->query, thd->query_length); + (uchar*) thd->query(), thd->query_length()); return maria_delete_table(name); } @@ -2430,7 +2464,8 @@ int ha_maria::external_lock(THD *thd, int lock_type) trnman_set_flags(trn, trnman_get_flags(trn) | TRN_STATE_INFO_LOGGED | TRN_STATE_TABLES_CAN_CHANGE); (void) translog_log_debug_info(trn, LOGREC_DEBUG_INFO_QUERY, - (uchar*) thd->query, thd->query_length); + (uchar*) thd->query(), + thd->query_length()); } #endif } @@ -2526,7 +2561,8 @@ int ha_maria::start_stmt(THD *thd, thr_lock_type lock_type) { trnman_set_flags(trn, trnman_get_flags(trn) | TRN_STATE_INFO_LOGGED); (void) translog_log_debug_info(trn, LOGREC_DEBUG_INFO_QUERY, - (uchar*) thd->query, thd->query_length); + (uchar*) thd->query(), + thd->query_length()); } #endif } @@ -2807,7 +2843,7 @@ int ha_maria::create(const char *name, register TABLE *table_arg, create_flags|= HA_CREATE_PAGE_CHECKSUM; (void) translog_log_debug_info(0, LOGREC_DEBUG_INFO_QUERY, - (uchar*) thd->query, thd->query_length); + (uchar*) thd->query(), thd->query_length()); /* TODO: Check that the following fn_format is really needed */ error= @@ -2827,7 +2863,7 @@ int ha_maria::rename_table(const char *from, const char *to) { THD *thd= current_thd; (void) translog_log_debug_info(0, LOGREC_DEBUG_INFO_QUERY, - (uchar*) thd->query, thd->query_length); + (uchar*) thd->query(), thd->query_length()); return maria_rename(from, to); } @@ -3316,11 +3352,13 @@ static struct st_mysql_sys_var* system_variables[]= { MYSQL_SYSVAR(block_size), MYSQL_SYSVAR(checkpoint_interval), MYSQL_SYSVAR(force_start_after_recovery_failures), - MYSQL_SYSVAR(page_checksum), + MYSQL_SYSVAR(group_commit), + MYSQL_SYSVAR(group_commit_interval), MYSQL_SYSVAR(log_dir_path), MYSQL_SYSVAR(log_file_size), MYSQL_SYSVAR(log_purge_type), MYSQL_SYSVAR(max_sort_file_size), + MYSQL_SYSVAR(page_checksum), MYSQL_SYSVAR(pagecache_age_threshold), MYSQL_SYSVAR(pagecache_buffer_size), MYSQL_SYSVAR(pagecache_division_limit), @@ -3347,6 +3385,92 @@ static void update_checkpoint_interval(MYSQL_THD thd, } /** + @brief Updates group commit mode +*/ + +static void update_maria_group_commit(MYSQL_THD thd, + struct st_mysql_sys_var *var, + void *var_ptr, const void *save) +{ + ulong value= (ulong)*((long *)var_ptr); + DBUG_ENTER("update_maria_group_commit"); + DBUG_PRINT("enter", ("old value: %lu new value %lu rate %lu", + value, (ulong)(*(long *)save), + maria_group_commit_interval)); + /* old value */ + switch (value) { + case TRANSLOG_GCOMMIT_NONE: + break; + case TRANSLOG_GCOMMIT_HARD: + translog_hard_group_commit(FALSE); + break; + case TRANSLOG_GCOMMIT_SOFT: + translog_soft_sync(FALSE); + if (maria_group_commit_interval) + translog_soft_sync_end(); + break; + default: + DBUG_ASSERT(0); /* impossible */ + } + value= *(ulong *)var_ptr= (ulong)(*(long *)save); + translog_sync(); + /* new value */ + switch (value) { + case TRANSLOG_GCOMMIT_NONE: + break; + case TRANSLOG_GCOMMIT_HARD: + translog_hard_group_commit(TRUE); + break; + case TRANSLOG_GCOMMIT_SOFT: + translog_soft_sync(TRUE); + /* variable change made under global lock so we can just read it */ + if (maria_group_commit_interval) + translog_soft_sync_start(); + break; + default: + DBUG_ASSERT(0); /* impossible */ + } + DBUG_VOID_RETURN; +} + +/** + @brief Updates group commit interval +*/ + +static void update_maria_group_commit_interval(MYSQL_THD thd, + struct st_mysql_sys_var *var, + void *var_ptr, const void *save) +{ + ulong new_value= (ulong)*((long *)save); + ulong *value_ptr= (ulong*) var_ptr; + DBUG_ENTER("update_maria_group_commit_interval"); + DBUG_PRINT("enter", ("old value: %lu new value %lu group commit %lu", + *value_ptr, new_value, maria_group_commit)); + + /* variable change made under global lock so we can just read it */ + switch (maria_group_commit) { + case TRANSLOG_GCOMMIT_NONE: + *value_ptr= new_value; + translog_set_group_commit_interval(new_value); + break; + case TRANSLOG_GCOMMIT_HARD: + *value_ptr= new_value; + translog_set_group_commit_interval(new_value); + break; + case TRANSLOG_GCOMMIT_SOFT: + if (*value_ptr) + translog_soft_sync_end(); + translog_set_group_commit_interval(new_value); + if ((*value_ptr= new_value)) + translog_soft_sync_start(); + break; + default: + DBUG_ASSERT(0); /* impossible */ + } + DBUG_VOID_RETURN; +} + +/** @brief Updates the transaction log file limit. */ @@ -3368,6 +3492,7 @@ static SHOW_VAR status_variables[]= { {"Maria_pagecache_reads", (char*) &maria_pagecache_var.global_cache_read, SHOW_LONGLONG}, {"Maria_pagecache_write_requests", (char*) &maria_pagecache_var.global_cache_w_requests, SHOW_LONGLONG}, {"Maria_pagecache_writes", (char*) &maria_pagecache_var.global_cache_write, SHOW_LONGLONG}, + {"Maria_transaction_log_syncs", (char*) &translog_syncs, SHOW_LONGLONG}, {NullS, NullS, SHOW_LONG} }; @@ -3437,7 +3562,7 @@ mysql_declare_plugin(maria) MYSQL_STORAGE_ENGINE_PLUGIN, &maria_storage_engine, "MARIA", - "MySQL AB", + "Monty Program Ab", "Crash-safe tables with MyISAM heritage", PLUGIN_LICENSE_GPL, ha_maria_init, /* Plugin Init */ diff --git a/storage/maria/lockman.c b/storage/maria/lockman.c index e7f3c81b0fd..d6d4dcd44e6 100644 --- a/storage/maria/lockman.c +++ b/storage/maria/lockman.c @@ -360,7 +360,7 @@ retry: else { if (my_atomic_casptr((void **)cursor->prev, - (void **)&cursor->curr, cursor->next)) + (void **)(char*) &cursor->curr, cursor->next)) _lf_alloc_free(pins, cursor->curr); else { @@ -421,7 +421,8 @@ static int lockinsert(LOCK * volatile *head, LOCK *node, LF_PINS *pins, node->link= (intptr)cursor.curr; DBUG_ASSERT(node->link != (intptr)node); DBUG_ASSERT(cursor.prev != &node->link); - if (!my_atomic_casptr((void **)cursor.prev, (void **)&cursor.curr, node)) + if (!my_atomic_casptr((void **)cursor.prev, + (void **)(char*) &cursor.curr, node)) { res= REPEAT_ONCE_MORE; node->flags&= ~ACTIVE; @@ -498,11 +499,11 @@ static int lockdelete(LOCK * volatile *head, LOCK *node, LF_PINS *pins) then we can delete. Good news is - this is only required when rolling back a savepoint. */ - if (my_atomic_casptr((void **)&(cursor.curr->link), - (void **)&cursor.next, 1+(char *)cursor.next)) + if (my_atomic_casptr((void **)(char*)&(cursor.curr->link), + (void **)(char*)&cursor.next, 1+(char *)cursor.next)) { if (my_atomic_casptr((void **)cursor.prev, - (void **)&cursor.curr, cursor.next)) + (void **)(char*)&cursor.curr, cursor.next)) _lf_alloc_free(pins, cursor.curr); else lockfind(head, node, &cursor, pins); @@ -573,7 +574,7 @@ static void initialize_bucket(LOCKMAN *lm, LOCK * volatile *node, my_free((void *)dummy, MYF(0)); dummy= cur; } - my_atomic_casptr((void **)node, (void **)&tmp, dummy); + my_atomic_casptr((void **)node, (void **)(char*) &tmp, dummy); } static inline uint calc_hash(uint64 resource) diff --git a/storage/maria/ma_blockrec.c b/storage/maria/ma_blockrec.c index 694c8b7b5ff..89701913c9a 100644 --- a/storage/maria/ma_blockrec.c +++ b/storage/maria/ma_blockrec.c @@ -430,8 +430,9 @@ my_bool _ma_once_end_block_record(MARIA_SHARE *share) if (share->bitmap.file.file >= 0) { if (flush_pagecache_blocks(share->pagecache, &share->bitmap.file, - share->temporary ? FLUSH_IGNORE_CHANGED : - FLUSH_RELEASE)) + ((share->temporary || share->deleting) ? + FLUSH_IGNORE_CHANGED : + FLUSH_RELEASE))) res= 1; /* File must be synced as it is going out of the maria_open_list and so @@ -1688,7 +1689,8 @@ static my_bool get_head_or_tail_page(MARIA_HA *info, if (!page_link.changed) goto crashed; - DBUG_ASSERT((res->buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == page_type); + DBUG_ASSERT((uint) (res->buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == + page_type); if (!(dir= find_free_position(page_type == HEAD_PAGE ? info : 0, res->buff, block_size, &res->rownr, &res->length, &res->empty_space))) @@ -6094,7 +6096,7 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn, DBUG_RETURN(0); } - if (((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) != page_type)) + if (((uint) (buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) != page_type)) { /* This is a page that has been freed before and now should be @@ -6241,7 +6243,7 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn, Note that in case the page is not anymore a head or tail page a future redo will fix the bitmap. */ - if ((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == page_type) + if ((uint) (buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == page_type) { empty_space= uint2korr(buff+EMPTY_SPACE_OFFSET); if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE, diff --git a/storage/maria/ma_check.c b/storage/maria/ma_check.c index e33849bef04..4f93bf812a3 100644 --- a/storage/maria/ma_check.c +++ b/storage/maria/ma_check.c @@ -215,7 +215,7 @@ int maria_chk_del(HA_CHECK *param, register MARIA_HA *info, empty=0; for (i= share->state.state.del ; i > 0L && next_link != HA_OFFSET_ERROR ; i--) { - if (*_ma_killed_ptr(param)) + if (_ma_killed_ptr(param)) DBUG_RETURN(1); if (test_flag & T_VERBOSE) printf(" %9s",llstr(next_link,buff)); @@ -310,7 +310,7 @@ static int check_k_link(HA_CHECK *param, register MARIA_HA *info, records= (ha_rows) (share->state.state.key_file_length / block_size); while (next_link != HA_OFFSET_ERROR && records > 0) { - if (*_ma_killed_ptr(param)) + if (_ma_killed_ptr(param)) DBUG_RETURN(1); if (param->testflag & T_VERBOSE) printf("%16s",llstr(next_link,llbuff)); @@ -876,10 +876,10 @@ static int chk_index(HA_CHECK *param, MARIA_HA *info, MARIA_KEYDEF *keyinfo, tmp_key.data= tmp_key_buff; for ( ;; ) { - if (*_ma_killed_ptr(param)) - goto err; if (nod_flag) { + if (_ma_killed_ptr(param)) + goto err; next_page= _ma_kpos(nod_flag,keypos); if (chk_index_down(param,info,keyinfo,next_page, temp_buff,keys,key_checksum,level+1)) @@ -1180,7 +1180,7 @@ static int check_static_record(HA_CHECK *param, MARIA_HA *info, int extend, pos= 0; while (pos < share->state.state.data_file_length) { - if (*_ma_killed_ptr(param)) + if (_ma_killed_ptr(param)) return -1; if (my_b_read(¶m->read_cache, record, share->base.pack_reclength)) @@ -1230,7 +1230,7 @@ static int check_dynamic_record(HA_CHECK *param, MARIA_HA *info, int extend, { my_bool got_error= 0; int flag; - if (*_ma_killed_ptr(param)) + if (_ma_killed_ptr(param)) DBUG_RETURN(-1); flag= block_info.second_read=0; @@ -1451,7 +1451,7 @@ static int check_compressed_record(HA_CHECK *param, MARIA_HA *info, int extend, pos= share->pack.header_length; /* Skip header */ while (pos < share->state.state.data_file_length) { - if (*_ma_killed_ptr(param)) + if (_ma_killed_ptr(param)) DBUG_RETURN(-1); if (_ma_read_cache(¶m->read_cache, block_info.header, pos, @@ -1815,7 +1815,7 @@ static int check_block_record(HA_CHECK *param, MARIA_HA *info, int extend, LINT_INIT(row_count); LINT_INIT(empty_space); - if (*_ma_killed_ptr(param)) + if (_ma_killed_ptr(param)) { _ma_scan_end_block_record(info); return -1; @@ -4631,7 +4631,7 @@ static int sort_get_next_record(MARIA_SORT_PARAM *sort_param) char llbuff[22],llbuff2[22]; DBUG_ENTER("sort_get_next_record"); - if (*_ma_killed_ptr(param)) + if (_ma_killed_ptr(param)) DBUG_RETURN(1); switch (sort_info->org_data_file_type) { @@ -6018,7 +6018,7 @@ int maria_update_state_info(HA_CHECK *param, MARIA_HA *info,uint update) { if (update & UPDATE_TIME) { - share->state.check_time= (long) time((time_t*) 0); + share->state.check_time= time((time_t*) 0); if (!share->state.create_time) share->state.create_time= share->state.check_time; } diff --git a/storage/maria/ma_check_standalone.h b/storage/maria/ma_check_standalone.h index 3874d722d6c..9b30c96089f 100644 --- a/storage/maria/ma_check_standalone.h +++ b/storage/maria/ma_check_standalone.h @@ -30,11 +30,9 @@ Check if check/repair operation was killed by a signal */ -static int not_killed= 0; - -volatile int *_ma_killed_ptr(HA_CHECK *param __attribute__((unused))) +int _ma_killed_ptr(HA_CHECK *param __attribute__((unused))) { - return ¬_killed; /* always NULL */ + return 0; } /* print warnings and errors */ diff --git a/storage/maria/ma_close.c b/storage/maria/ma_close.c index 235b37f7030..df525d45d14 100644 --- a/storage/maria/ma_close.c +++ b/storage/maria/ma_close.c @@ -79,7 +79,7 @@ int maria_close(register MARIA_HA *info) if ((*share->once_end)(share)) error= my_errno; if (flush_pagecache_blocks(share->pagecache, &share->kfile, - (share->temporary ? + ((share->temporary || share->deleting) ? FLUSH_IGNORE_CHANGED : FLUSH_RELEASE))) error= my_errno; @@ -177,6 +177,7 @@ int maria_close(register MARIA_HA *info) { (void) pthread_mutex_destroy(&share->intern_lock); (void) pthread_mutex_destroy(&share->close_lock); + (void) pthread_cond_destroy(&share->key_del_cond); my_free((uchar *)share, MYF(0)); /* If share cannot be freed, it's because checkpoint has previously diff --git a/storage/maria/ma_create.c b/storage/maria/ma_create.c index 05b11929d0a..6886dc8f291 100644 --- a/storage/maria/ma_create.c +++ b/storage/maria/ma_create.c @@ -772,7 +772,7 @@ int maria_create(const char *name, enum data_file_type datafile_type, share.base.min_block_length= share.base.pack_reclength; if (! (flags & HA_DONT_TOUCH_DATA)) - share.state.create_time= (long) time((time_t*) 0); + share.state.create_time= time((time_t*) 0); pthread_mutex_lock(&THR_LOCK_maria); diff --git a/storage/maria/ma_delete.c b/storage/maria/ma_delete.c index 5b0a2ac8884..0e9e5caafbf 100644 --- a/storage/maria/ma_delete.c +++ b/storage/maria/ma_delete.c @@ -169,6 +169,8 @@ my_bool _ma_ck_delete(MARIA_HA *info, MARIA_KEY *key) MARIA_KEY org_key; DBUG_ENTER("_ma_ck_delete"); + LINT_INIT_STRUCT(org_key); + save_key_data= key->data; if (share->now_transactional) { diff --git a/storage/maria/ma_extra.c b/storage/maria/ma_extra.c index 4499cca2885..7a30b613ea5 100644 --- a/storage/maria/ma_extra.c +++ b/storage/maria/ma_extra.c @@ -305,6 +305,12 @@ int maria_extra(MARIA_HA *info, enum ha_extra_function function, pthread_mutex_unlock(&THR_LOCK_maria); break; case HA_EXTRA_PREPARE_FOR_DROP: + /* Signals about intent to delete this table */ + share->deleting= TRUE; + share->global_changed= FALSE; /* force writing changed flag */ + /* To force repair if reopened */ + _ma_mark_file_changed(info); + /* Fall trough */ case HA_EXTRA_PREPARE_FOR_RENAME: { my_bool do_flush= test(function != HA_EXTRA_PREPARE_FOR_DROP); diff --git a/storage/maria/ma_ft_boolean_search.c b/storage/maria/ma_ft_boolean_search.c index 763b8827a6c..d302892ce05 100644 --- a/storage/maria/ma_ft_boolean_search.c +++ b/storage/maria/ma_ft_boolean_search.c @@ -180,7 +180,7 @@ typedef struct st_my_ftb_param static int ftb_query_add_word(MYSQL_FTPARSER_PARAM *param, - char *word, int word_len, + const uchar *word, mysql_ft_size_t word_len, MYSQL_FTPARSER_BOOLEAN_INFO *info) { MY_FTB_PARAM *ftb_param= param->mysql_ftparam; @@ -282,24 +282,24 @@ static int ftb_query_add_word(MYSQL_FTPARSER_PARAM *param, static int ftb_parse_query_internal(MYSQL_FTPARSER_PARAM *param, - char *query, int len) + const uchar *query, mysql_ft_size_t len) { MY_FTB_PARAM *ftb_param= param->mysql_ftparam; MYSQL_FTPARSER_BOOLEAN_INFO info; CHARSET_INFO *cs= ftb_param->ftb->charset; - uchar **start= (uchar**) &query; - uchar *end= (uchar*) query + len; + const uchar **start= &query; + const uchar *end= query + len; FT_WORD w; info.prev= ' '; info.quot= 0; while (maria_ft_get_word(cs, start, end, &w, &info)) - param->mysql_add_word(param, (char *) w.pos, w.len, &info); + param->mysql_add_word(param, w.pos, w.len, &info); return(0); } -static int _ftb_parse_query(FTB *ftb, uchar *query, uint len, +static int _ftb_parse_query(FTB *ftb, uchar *query, size_t len, struct st_mysql_ftparser *parser) { MYSQL_FTPARSER_PARAM *param; @@ -321,7 +321,7 @@ static int _ftb_parse_query(FTB *ftb, uchar *query, uint len, param->mysql_add_word= ftb_query_add_word; param->mysql_ftparam= (void *)&ftb_param; param->cs= ftb->charset; - param->doc= (char*) query; + param->doc= query; param->length= len; param->flags= 0; param->mode= MYSQL_FTPARSER_FULL_BOOLEAN_INFO; @@ -539,8 +539,8 @@ static void _ftb_init_index_search(FT_INFO *ftb) FT_INFO * maria_ft_init_boolean_search(MARIA_HA *info, uint keynr, - uchar *query, - uint query_len, CHARSET_INFO *cs) + uchar *query, size_t query_len, + CHARSET_INFO *cs) { FTB *ftb; FTB_EXPR *ftbe; @@ -592,7 +592,7 @@ FT_INFO * maria_ft_init_boolean_search(MARIA_HA *info, uint keynr, sizeof(FTB_WORD *)*ftb->queue.elements); memcpy(ftb->list, ftb->queue.root+1, sizeof(FTB_WORD *)*ftb->queue.elements); my_qsort2(ftb->list, ftb->queue.elements, sizeof(FTB_WORD *), - (qsort2_cmp)FTB_WORD_cmp_list, ftb->charset); + (qsort2_cmp)FTB_WORD_cmp_list, (void*) ftb->charset); if (ftb->queue.elements<2) ftb->with_scan &= ~FTB_FLAG_TRUNC; ftb->state=READY; return ftb; @@ -615,8 +615,9 @@ typedef struct st_my_ftb_phrase_param static int ftb_phrase_add_word(MYSQL_FTPARSER_PARAM *param, - char *word, int word_len, - MYSQL_FTPARSER_BOOLEAN_INFO *boolean_info __attribute__((unused))) + const uchar *word, mysql_ft_size_t word_len, + MYSQL_FTPARSER_BOOLEAN_INFO + *boolean_info __attribute__((unused))) { MY_FTB_PHRASE_PARAM *phrase_param= param->mysql_ftparam; FT_WORD *w= (FT_WORD *)phrase_param->document->data; @@ -647,15 +648,16 @@ static int ftb_phrase_add_word(MYSQL_FTPARSER_PARAM *param, static int ftb_check_phrase_internal(MYSQL_FTPARSER_PARAM *param, - char *document, int len) + const uchar *document, + mysql_ft_size_t len) { FT_WORD word; MY_FTB_PHRASE_PARAM *phrase_param= param->mysql_ftparam; - const uchar *docend= (uchar*) document + len; - while (maria_ft_simple_get_word(phrase_param->cs, (uchar**) &document, + const uchar *docend= document + len; + while (maria_ft_simple_get_word(phrase_param->cs, &document, docend, &word, FALSE)) { - param->mysql_add_word(param, (char*) word.pos, word.len, 0); + param->mysql_add_word(param, word.pos, word.len, 0); if (phrase_param->match) break; } @@ -678,8 +680,8 @@ static int ftb_check_phrase_internal(MYSQL_FTPARSER_PARAM *param, -1 is returned if error occurs. */ -static int _ftb_check_phrase(FTB *ftb, const uchar *document, uint len, - FTB_EXPR *ftbe, struct st_mysql_ftparser *parser) +static int _ftb_check_phrase(FTB *ftb, const uchar *document, size_t len, + FTB_EXPR *ftbe, struct st_mysql_ftparser *parser) { MY_FTB_PHRASE_PARAM ftb_param; MYSQL_FTPARSER_PARAM *param; @@ -699,7 +701,7 @@ static int _ftb_check_phrase(FTB *ftb, const uchar *document, uint len, param->mysql_add_word= ftb_phrase_add_word; param->mysql_ftparam= (void *)&ftb_param; param->cs= ftb->charset; - param->doc= (char *) document; + param->doc= document; param->length= len; param->flags= 0; param->mode= MYSQL_FTPARSER_WITH_STOPWORDS; @@ -872,8 +874,9 @@ typedef struct st_my_ftb_find_param static int ftb_find_relevance_add_word(MYSQL_FTPARSER_PARAM *param, - char *word, int len, - MYSQL_FTPARSER_BOOLEAN_INFO *boolean_info __attribute__((unused))) + const uchar *word, mysql_ft_size_t len, + MYSQL_FTPARSER_BOOLEAN_INFO + *boolean_info __attribute__((unused))) { MY_FTB_FIND_PARAM *ftb_param= param->mysql_ftparam; FT_INFO *ftb= ftb_param->ftb; @@ -933,15 +936,14 @@ static int ftb_find_relevance_add_word(MYSQL_FTPARSER_PARAM *param, static int ftb_find_relevance_parse(MYSQL_FTPARSER_PARAM *param, - char *doc, int len) + const uchar *doc, mysql_ft_size_t len) { MY_FTB_FIND_PARAM *ftb_param= param->mysql_ftparam; FT_INFO *ftb= ftb_param->ftb; - uchar *end= (uchar*) doc + len; + const uchar *end= doc + len; FT_WORD w; - while (maria_ft_simple_get_word(ftb->charset, (uchar**) &doc, - end, &w, TRUE)) - param->mysql_add_word(param, (char *) w.pos, w.len, 0); + while (maria_ft_simple_get_word(ftb->charset, &doc, end, &w, TRUE)) + param->mysql_add_word(param, w.pos, w.len, 0); return(0); } @@ -998,7 +1000,7 @@ float maria_ft_boolean_find_relevance(FT_INFO *ftb, uchar *record, uint length) { if (!ftsi.pos) continue; - param->doc= (char *)ftsi.pos; + param->doc= ftsi.pos; param->length= ftsi.len; if (unlikely(parser->parse(param))) return 0; diff --git a/storage/maria/ma_ft_nlq_search.c b/storage/maria/ma_ft_nlq_search.c index 1a85c50174e..927f34f8b72 100644 --- a/storage/maria/ma_ft_nlq_search.c +++ b/storage/maria/ma_ft_nlq_search.c @@ -51,6 +51,7 @@ typedef struct st_ft_superdoc double tmp_weight; } FT_SUPERDOC; + static int FT_SUPERDOC_cmp(void* cmp_arg __attribute__((unused)), FT_SUPERDOC *p1, FT_SUPERDOC *p2) { @@ -63,7 +64,8 @@ static int FT_SUPERDOC_cmp(void* cmp_arg __attribute__((unused)), static int walk_and_match(FT_WORD *word, uint32 count, ALL_IN_ONE *aio) { - int subkeys, r; + FT_WEIGTH subkeys; + int r; uint doc_cnt; FT_SUPERDOC sdoc, *sptr; TREE_ELEMENT *selem; @@ -90,9 +92,9 @@ static int walk_and_match(FT_WORD *word, uint32 count, ALL_IN_ONE *aio) /* Skip rows inserted by current inserted */ for (r= _ma_search(info, &key, SEARCH_FIND, key_root) ; !r && - (subkeys=ft_sintXkorr(info->last_key.data + - info->last_key.data_length + - info->last_key.ref_length - extra)) > 0 && + (subkeys.i= ft_sintXkorr(info->last_key.data + + info->last_key.data_length + + info->last_key.ref_length - extra)) > 0 && info->cur_row.lastpos >= info->state->data_file_length ; r= _ma_search_next(info, &info->last_key, SEARCH_BIGGER, key_root)) ; @@ -111,7 +113,7 @@ static int walk_and_match(FT_WORD *word, uint32 count, ALL_IN_ONE *aio) key.data+1, key.data_length-1, 0, 0)) break; - if (subkeys<0) + if (subkeys.i < 0) { if (doc_cnt) DBUG_RETURN(1); /* index is corrupted */ @@ -127,7 +129,8 @@ static int walk_and_match(FT_WORD *word, uint32 count, ALL_IN_ONE *aio) goto do_skip; } #if HA_FT_WTYPE == HA_KEYTYPE_FLOAT - tmp_weight=*(float*)&subkeys; + /* The weight we read was actually a float */ + tmp_weight= subkeys.f; #else #error #endif @@ -162,9 +165,9 @@ static int walk_and_match(FT_WORD *word, uint32 count, ALL_IN_ONE *aio) else r= _ma_search(info, &info->last_key, SEARCH_BIGGER, key_root); do_skip: - while ((subkeys=ft_sintXkorr(info->last_key.data + - info->last_key.data_length + - info->last_key.ref_length - extra)) > 0 && + while ((subkeys.i= ft_sintXkorr(info->last_key.data + + info->last_key.data_length + + info->last_key.ref_length - extra)) > 0 && !r && info->cur_row.lastpos >= info->state->data_file_length) r= _ma_search_next(info, &info->last_key, SEARCH_BIGGER, key_root); @@ -205,7 +208,7 @@ static int FT_DOC_cmp(void *unused __attribute__((unused)), FT_INFO *maria_ft_init_nlq_search(MARIA_HA *info, uint keynr, uchar *query, - uint query_len, uint flags, uchar *record) + size_t query_len, uint flags, uchar *record) { TREE wtree; ALL_IN_ONE aio; diff --git a/storage/maria/ma_ft_parser.c b/storage/maria/ma_ft_parser.c index bdfbbb936ce..b35c2227ca2 100644 --- a/storage/maria/ma_ft_parser.c +++ b/storage/maria/ma_ft_parser.c @@ -109,10 +109,11 @@ my_bool maria_ft_boolean_check_syntax_string(const uchar *str) 3 - right bracket 4 - stopword found */ -uchar maria_ft_get_word(CHARSET_INFO *cs, uchar **start, uchar *end, +uchar maria_ft_get_word(CHARSET_INFO *cs, const uchar **start, + const uchar *end, FT_WORD *word, MYSQL_FTPARSER_BOOLEAN_INFO *param) { - uchar *doc=*start; + const uchar *doc= *start; int ctype; uint mwc, length; int mbl; @@ -203,11 +204,11 @@ ret: return param->type; } -uchar maria_ft_simple_get_word(CHARSET_INFO *cs, uchar **start, +uchar maria_ft_simple_get_word(CHARSET_INFO *cs, const uchar **start, const uchar *end, FT_WORD *word, my_bool skip_stopwords) { - uchar *doc= *start; + const uchar *doc= *start; uint mwc, length; int ctype, mbl; DBUG_ENTER("maria_ft_simple_get_word"); @@ -253,14 +254,16 @@ void maria_ft_parse_init(TREE *wtree, CHARSET_INFO *cs) { DBUG_ENTER("maria_ft_parse_init"); if (!is_tree_inited(wtree)) - init_tree(wtree,0,0,sizeof(FT_WORD),(qsort_cmp2)&FT_WORD_cmp,0,NULL, cs); + init_tree(wtree,0,0,sizeof(FT_WORD),(qsort_cmp2)&FT_WORD_cmp,0, NULL, + (void*) cs); DBUG_VOID_RETURN; } static int maria_ft_add_word(MYSQL_FTPARSER_PARAM *param, - char *word, int word_len, - MYSQL_FTPARSER_BOOLEAN_INFO *boolean_info __attribute__((unused))) + const uchar *word, mysql_ft_size_t word_len, + MYSQL_FTPARSER_BOOLEAN_INFO *boolean_info + __attribute__((unused))) { TREE *wtree; FT_WORD w; @@ -276,7 +279,7 @@ static int maria_ft_add_word(MYSQL_FTPARSER_PARAM *param, w.pos= ptr; } else - w.pos= (uchar *) word; + w.pos= word; w.len= word_len; if (!tree_insert(wtree, &w, 0, wtree->custom_arg)) { @@ -288,24 +291,25 @@ static int maria_ft_add_word(MYSQL_FTPARSER_PARAM *param, static int maria_ft_parse_internal(MYSQL_FTPARSER_PARAM *param, - char *doc_arg, int doc_len) + const uchar *doc_arg, + mysql_ft_size_t doc_len) { - uchar *doc= (uchar*) doc_arg; - uchar *end= doc + doc_len; + const uchar *doc= doc_arg; + const uchar *end= doc + doc_len; MY_FT_PARSER_PARAM *ft_param=param->mysql_ftparam; TREE *wtree= ft_param->wtree; FT_WORD w; DBUG_ENTER("maria_ft_parse_internal"); while (maria_ft_simple_get_word(wtree->custom_arg, &doc, end, &w, TRUE)) - if (param->mysql_add_word(param, (char *) w.pos, w.len, 0)) + if (param->mysql_add_word(param, w.pos, w.len, 0)) DBUG_RETURN(1); DBUG_RETURN(0); } -int maria_ft_parse(TREE *wtree, uchar *doc, int doclen, - struct st_mysql_ftparser *parser, +int maria_ft_parse(TREE *wtree, uchar *doc, size_t doclen, + struct st_mysql_ftparser *parser, MYSQL_FTPARSER_PARAM *param, MEM_ROOT *mem_root) { MY_FT_PARSER_PARAM my_param; @@ -318,7 +322,7 @@ int maria_ft_parse(TREE *wtree, uchar *doc, int doclen, param->mysql_add_word= maria_ft_add_word; param->mysql_ftparam= &my_param; param->cs= wtree->custom_arg; - param->doc= (char *) doc; + param->doc= doc; param->length= doclen; param->mode= MYSQL_FTPARSER_SIMPLE_MODE; DBUG_RETURN(parser->parse(param)); @@ -378,8 +382,8 @@ MYSQL_FTPARSER_PARAM *maria_ftparser_call_initializer(MARIA_HA *info, mysql_add_word != 0 - parser is initialized, or no initialization needed. */ info->ftparser_param[ftparser_nr].mysql_add_word= - (int (*)(struct st_mysql_ftparser_param *, char *, int, - MYSQL_FTPARSER_BOOLEAN_INFO *)) 1; + (int (*)(struct st_mysql_ftparser_param *, const uchar *, + mysql_ft_size_t, MYSQL_FTPARSER_BOOLEAN_INFO *)) 1; if (parser->init && parser->init(&info->ftparser_param[ftparser_nr])) return 0; } diff --git a/storage/maria/ma_ftdefs.h b/storage/maria/ma_ftdefs.h index 7e83d774aed..4ce4e9e22ba 100644 --- a/storage/maria/ma_ftdefs.h +++ b/storage/maria/ma_ftdefs.h @@ -96,7 +96,7 @@ #define FTB_RQUOT (ft_boolean_syntax[11]) typedef struct st_maria_ft_word { - uchar * pos; + const uchar * pos; uint len; double weight; } FT_WORD; @@ -106,9 +106,9 @@ int is_stopword(char *word, uint len); MARIA_KEY *_ma_ft_make_key(MARIA_HA *, MARIA_KEY *, uint , uchar *, FT_WORD *, my_off_t); -uchar maria_ft_get_word(CHARSET_INFO *, uchar **, uchar *, FT_WORD *, - MYSQL_FTPARSER_BOOLEAN_INFO *); -uchar maria_ft_simple_get_word(CHARSET_INFO *, uchar **, const uchar *, +uchar maria_ft_get_word(CHARSET_INFO *, const uchar **, const uchar *, + FT_WORD *, MYSQL_FTPARSER_BOOLEAN_INFO *); +uchar maria_ft_simple_get_word(CHARSET_INFO *, const uchar **, const uchar *, FT_WORD *, my_bool); typedef struct _st_maria_ft_seg_iterator { @@ -122,15 +122,17 @@ void _ma_ft_segiterator_dummy_init(const uchar *, uint, FT_SEG_ITERATOR *); uint _ma_ft_segiterator(FT_SEG_ITERATOR *); void maria_ft_parse_init(TREE *, CHARSET_INFO *); -int maria_ft_parse(TREE *, uchar *, int, struct st_mysql_ftparser *parser, +int maria_ft_parse(TREE *, uchar *, size_t, struct st_mysql_ftparser *parser, MYSQL_FTPARSER_PARAM *, MEM_ROOT *); FT_WORD * maria_ft_linearize(TREE *, MEM_ROOT *); FT_WORD * _ma_ft_parserecord(MARIA_HA *, uint, const uchar *, MEM_ROOT *); uint _ma_ft_parse(TREE *, MARIA_HA *, uint, const uchar *, MYSQL_FTPARSER_PARAM *, MEM_ROOT *); -FT_INFO *maria_ft_init_nlq_search(MARIA_HA *, uint, uchar *, uint, uint, uchar *); -FT_INFO *maria_ft_init_boolean_search(MARIA_HA *, uint, uchar *, uint, CHARSET_INFO *); +FT_INFO *maria_ft_init_nlq_search(MARIA_HA *, uint, uchar *, size_t, uint, + uchar *); +FT_INFO *maria_ft_init_boolean_search(MARIA_HA *, uint, uchar *, size_t, + CHARSET_INFO *); extern const struct _ft_vft _ma_ft_vft_nlq; int maria_ft_nlq_read_next(FT_INFO *, char *); diff --git a/storage/maria/ma_init.c b/storage/maria/ma_init.c index 1f2eddd7e30..552b0767bec 100644 --- a/storage/maria/ma_init.c +++ b/storage/maria/ma_init.c @@ -82,6 +82,11 @@ void maria_end(void) maria_inited= maria_multi_threaded= FALSE; ft_free_stopwords(); ma_checkpoint_end(); + if (translog_status == TRANSLOG_OK) + { + translog_soft_sync_end(); + translog_sync(); + } if ((trid= trnman_get_max_trid()) > max_trid_in_control_file) { /* diff --git a/storage/maria/ma_key_recover.h b/storage/maria/ma_key_recover.h index 4b4198f3008..b580433c99a 100644 --- a/storage/maria/ma_key_recover.h +++ b/storage/maria/ma_key_recover.h @@ -63,7 +63,6 @@ extern my_bool write_hook_for_undo_key_insert(enum translog_record_type type, extern my_bool write_hook_for_undo_key_delete(enum translog_record_type type, TRN *trn, MARIA_HA *tbl_info, LSN *lsn, void *hook_arg); -void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn); my_bool _ma_log_prefix(MARIA_PAGE *page, uint changed_length, int move_length); my_bool _ma_log_suffix(MARIA_PAGE *page, uint org_length, diff --git a/storage/maria/ma_locking.c b/storage/maria/ma_locking.c index e1986a7dfaa..b355d7bc792 100644 --- a/storage/maria/ma_locking.c +++ b/storage/maria/ma_locking.c @@ -387,6 +387,9 @@ int _ma_test_if_changed(register MARIA_HA *info) open_count is not maintained on disk for temporary tables. */ +#define _MA_ALREADY_MARKED_FILE_CHANGED \ + ((share->state.changed & STATE_CHANGED) && share->global_changed) + int _ma_mark_file_changed(MARIA_HA *info) { uchar buff[3]; @@ -394,8 +397,6 @@ int _ma_mark_file_changed(MARIA_HA *info) int error= 1; DBUG_ENTER("_ma_mark_file_changed"); -#define _MA_ALREADY_MARKED_FILE_CHANGED \ - ((share->state.changed & STATE_CHANGED) && share->global_changed) if (_MA_ALREADY_MARKED_FILE_CHANGED) DBUG_RETURN(0); pthread_mutex_lock(&share->intern_lock); /* recheck under mutex */ diff --git a/storage/maria/ma_loghandler.c b/storage/maria/ma_loghandler.c index c5efab506f1..476b091594e 100644 --- a/storage/maria/ma_loghandler.c +++ b/storage/maria/ma_loghandler.c @@ -18,6 +18,7 @@ #include "ma_blockrec.h" /* for some constants and in-write hooks */ #include "ma_key_recover.h" /* For some in-write hooks */ #include "ma_checkpoint.h" +#include "ma_servicethread.h" /* On Windows, neither my_open() nor my_sync() work for directories. @@ -47,6 +48,15 @@ #include <m_ctype.h> #endif +/** @brief protects checkpoint_in_progress */ +static pthread_mutex_t LOCK_soft_sync; +/** @brief for killing the background checkpoint thread */ +static pthread_cond_t COND_soft_sync; +/** @brief control structure for checkpoint background thread */ +static MA_SERVICE_THREAD_CONTROL soft_sync_control= + {THREAD_DEAD, FALSE, &LOCK_soft_sync, &COND_soft_sync}; + + /* transaction log file descriptor */ typedef struct st_translog_file { @@ -124,10 +134,24 @@ struct st_translog_buffer /* Previous buffer offset to detect it flush finish */ TRANSLOG_ADDRESS prev_buffer_offset; /* + If the buffer was forced to close it save value of its horizon + otherwise LSN_IMPOSSIBLE + */ + TRANSLOG_ADDRESS pre_force_close_horizon; + /* How much is written (or will be written when copy_to_buffer_in_progress become 0) to this buffer */ translog_size_t size; + /* + When moving from one log buffer to another, we write the last of the + previous buffer to file and then move to start using the new log + buffer. In the case of a part filed last page, this page is not moved + to the start of the new buffer but instead we set the 'skip_data' + variable to tell us how much data at the beginning of the buffer is not + relevant. + */ + uint skipped_data; /* File handler for this buffer */ TRANSLOG_FILE *file; /* Threads which are waiting for buffer filling/freeing */ @@ -304,6 +328,7 @@ struct st_translog_descriptor */ pthread_mutex_t log_flush_lock; pthread_cond_t log_flush_cond; + pthread_cond_t new_goal_cond; /* Protects changing of headers of finished files (max_lsn) */ pthread_mutex_t file_header_lock; @@ -344,13 +369,39 @@ static struct st_translog_descriptor log_descriptor; ulong log_purge_type= TRANSLOG_PURGE_IMMIDIATE; ulong log_file_size= TRANSLOG_FILE_SIZE; +/* sync() of log files directory mode */ ulong sync_log_dir= TRANSLOG_SYNC_DIR_NEWFILE; +ulong maria_group_commit= TRANSLOG_GCOMMIT_NONE; +ulong maria_group_commit_interval= 0; /* Marker for end of log */ static uchar end_of_log= 0; #define END_OF_LOG &end_of_log +/** + Switch for "soft" sync (no real sync() but periodical sync by service + thread) +*/ +static volatile my_bool soft_sync= FALSE; +/** + Switch for "hard" group commit mode +*/ +static volatile my_bool hard_group_commit= FALSE; +/** + File numbers interval which have to be sync() +*/ +static uint32 soft_sync_min= 0; +static uint32 soft_sync_max= 0; +static uint32 soft_need_sync= 1; +/** + stores interval in microseconds +*/ +static uint32 group_commit_wait= 0; enum enum_translog_status translog_status= TRANSLOG_UNINITED; +ulonglong translog_syncs= 0; /* Number of sync()s */ + +/* time of last flush */ +static ulonglong flush_start= 0; /* chunk types */ #define TRANSLOG_CHUNK_LSN 0x00 /* 0 chunk refer as LSN (head or tail */ @@ -980,12 +1031,17 @@ static TRANSLOG_FILE *get_logfile_by_number(uint32 file_no) static TRANSLOG_FILE *get_current_logfile() { TRANSLOG_FILE *file; + DBUG_ENTER("get_current_logfile"); rw_rdlock(&log_descriptor.open_files_lock); + DBUG_PRINT("info", ("max_file: %lu min_file: %lu open_files: %lu", + (ulong) log_descriptor.max_file, + (ulong) log_descriptor.min_file, + (ulong) log_descriptor.open_files.elements)); DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 == log_descriptor.open_files.elements); file= *dynamic_element(&log_descriptor.open_files, 0, TRANSLOG_FILE **); rw_unlock(&log_descriptor.open_files_lock); - return (file); + DBUG_RETURN(file); } uchar NEAR maria_trans_file_magic[]= @@ -1069,6 +1125,7 @@ static my_bool translog_write_file_header() static my_bool translog_max_lsn_to_header(File file, LSN lsn) { uchar lsn_buff[LSN_STORE_SIZE]; + my_bool rc; DBUG_ENTER("translog_max_lsn_to_header"); DBUG_PRINT("enter", ("File descriptor: %ld " "lsn: (%lu,0x%lx)", @@ -1077,11 +1134,17 @@ static my_bool translog_max_lsn_to_header(File file, LSN lsn) lsn_store(lsn_buff, lsn); - DBUG_RETURN(my_pwrite(file, lsn_buff, - LSN_STORE_SIZE, - (LOG_HEADER_DATA_SIZE - LSN_STORE_SIZE), - log_write_flags) != 0 || - my_sync(file, MYF(MY_WME)) != 0); + rc= (my_pwrite(file, lsn_buff, + LSN_STORE_SIZE, + (LOG_HEADER_DATA_SIZE - LSN_STORE_SIZE), + log_write_flags) != 0 || + my_sync(file, MYF(MY_WME)) != 0); + /* + We should not increase counter in case of error above, but it is so + unlikely that we can ignore this case + */ + translog_syncs++; + DBUG_RETURN(rc); } @@ -1394,6 +1457,7 @@ LSN translog_get_file_max_lsn_stored(uint32 file) { LOGHANDLER_FILE_INFO info; + LINT_INIT_STRUCT(info); File fd= open_logfile_by_number_no_cache(file); if ((fd < 0) || (translog_read_file_header(&info, fd) | my_close(fd, MYF(MY_WME)))) @@ -1423,7 +1487,9 @@ LSN translog_get_file_max_lsn_stored(uint32 file) static my_bool translog_buffer_init(struct st_translog_buffer *buffer, int num) { DBUG_ENTER("translog_buffer_init"); - buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE; + buffer->pre_force_close_horizon= + buffer->prev_last_lsn= buffer->last_lsn= + LSN_IMPOSSIBLE; DBUG_PRINT("info", ("last_lsn and prev_last_lsn set to 0 buffer: 0x%lx", (ulong) buffer)); @@ -1435,6 +1501,7 @@ static my_bool translog_buffer_init(struct st_translog_buffer *buffer, int num) memset(buffer->buffer, TRANSLOG_FILLER, TRANSLOG_WRITE_BUFFER); /* Buffer size */ buffer->size= 0; + buffer->skipped_data= 0; /* cond of thread which is waiting for buffer filling */ if (pthread_cond_init(&buffer->waiting_filling_buffer, 0)) DBUG_RETURN(1); @@ -1489,7 +1556,10 @@ static my_bool translog_close_log_file(TRANSLOG_FILE *file) TODO: sync only we have changed the log */ if (!file->is_sync) + { rc= my_sync(file->handler.file, MYF(MY_WME)); + translog_syncs++; + } rc|= my_close(file->handler.file, MYF(MY_WME)); my_free(file, MYF(0)); return test(rc); @@ -2044,7 +2114,8 @@ static void translog_start_buffer(struct st_translog_buffer *buffer, (ulong) LSN_OFFSET(log_descriptor.horizon), (ulong) LSN_OFFSET(log_descriptor.horizon))); DBUG_ASSERT(buffer_no == buffer->buffer_no); - buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE; + buffer->pre_force_close_horizon= + buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE; DBUG_PRINT("info", ("last_lsn and prev_last_lsn set to 0 buffer: 0x%lx", (ulong) buffer)); buffer->offset= log_descriptor.horizon; @@ -2052,6 +2123,7 @@ static void translog_start_buffer(struct st_translog_buffer *buffer, buffer->file= get_current_logfile(); buffer->overlay= 0; buffer->size= 0; + buffer->skipped_data= 0; translog_cursor_init(cursor, buffer, buffer_no); DBUG_PRINT("info", ("file: #%ld (%d) init cursor #%u: 0x%lx " "chaser: %d Size: %lu (%lu)", @@ -2523,6 +2595,7 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) TRANSLOG_ADDRESS offset= buffer->offset; TRANSLOG_FILE *file= buffer->file; uint8 ver= buffer->ver; + uint skipped_data; DBUG_ENTER("translog_buffer_flush"); DBUG_PRINT("enter", ("Buffer: #%u 0x%lx file: %d offset: (%lu,0x%lx) size: %lu", @@ -2557,6 +2630,8 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) disk */ file= buffer->file; + skipped_data= buffer->skipped_data; + DBUG_ASSERT(skipped_data < TRANSLOG_PAGE_SIZE); for (i= 0, pg= LSN_OFFSET(buffer->offset) / TRANSLOG_PAGE_SIZE; i < buffer->size; i+= TRANSLOG_PAGE_SIZE, pg++) @@ -2573,13 +2648,16 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) DBUG_ASSERT(i + TRANSLOG_PAGE_SIZE <= buffer->size); if (translog_status != TRANSLOG_OK && translog_status != TRANSLOG_SHUTDOWN) DBUG_RETURN(1); - if (pagecache_inject(log_descriptor.pagecache, + if (pagecache_write_part(log_descriptor.pagecache, &file->handler, pg, 3, buffer->buffer + i, PAGECACHE_PLAIN_PAGE, PAGECACHE_LOCK_LEFT_UNLOCKED, - PAGECACHE_PIN_LEFT_UNPINNED, 0, - LSN_IMPOSSIBLE)) + PAGECACHE_PIN_LEFT_UNPINNED, + PAGECACHE_WRITE_DONE, 0, + LSN_IMPOSSIBLE, + skipped_data, + TRANSLOG_PAGE_SIZE - skipped_data)) { DBUG_PRINT("error", ("Can't write page (%lu,0x%lx) to pagecache, error: %d", @@ -2589,10 +2667,12 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) translog_stop_writing(); DBUG_RETURN(1); } + skipped_data= 0; } file->is_sync= 0; - if (my_pwrite(file->handler.file, buffer->buffer, - buffer->size, LSN_OFFSET(buffer->offset), + if (my_pwrite(file->handler.file, buffer->buffer + buffer->skipped_data, + buffer->size - buffer->skipped_data, + LSN_OFFSET(buffer->offset) + buffer->skipped_data, log_write_flags)) { DBUG_PRINT("error", ("Can't write buffer (%lu,0x%lx) size %lu " @@ -2823,8 +2903,8 @@ static my_bool translog_page_validator(uchar *page, data->was_recovered= 0; - if (uint3korr(page) != page_no || - uint3korr(page + 3) != data->number) + if ((pgcache_page_no_t) uint3korr(page) != page_no || + (uint32) uint3korr(page + 3) != data->number) { DBUG_PRINT("error", ("Page (%lu,0x%lx): " "page address written in the page is incorrect: " @@ -2985,6 +3065,7 @@ restart: uchar *from, *table= NULL; int is_last_unfinished_page; uint last_protected_sector= 0; + uint skipped_data= curr_buffer->skipped_data; TRANSLOG_FILE file_copy; uint8 ver= curr_buffer->ver; translog_wait_for_writers(curr_buffer); @@ -2997,7 +3078,38 @@ restart: } DBUG_ASSERT(LSN_FILE_NO(addr) == LSN_FILE_NO(curr_buffer->offset)); from= curr_buffer->buffer + (addr - curr_buffer->offset); - memcpy(buffer, from, TRANSLOG_PAGE_SIZE); + if (skipped_data && addr == curr_buffer->offset) + { + /* + We read page part of which is not present in buffer, + so we should read absent part from file (page cache actually) + */ + file= get_logfile_by_number(file_no); + DBUG_ASSERT(file != NULL); + /* + it's ok to not lock the page because: + - The log handler has it's own page cache. + - There is only one thread that can access the log + cache at a time + */ + if (!(buffer= pagecache_read(log_descriptor.pagecache, + &file->handler, + LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE, + 3, buffer, + PAGECACHE_PLAIN_PAGE, + PAGECACHE_LOCK_LEFT_UNLOCKED, + NULL))) + DBUG_RETURN(NULL); + } + else + skipped_data= 0; /* Read after skipped in buffer data */ + /* + Now we have correct data in buffer up to 'skipped_data'. The + following memcpy() will move the data from the internal buffer + that was not yet on disk. + */ + memcpy(buffer + skipped_data, from + skipped_data, + TRANSLOG_PAGE_SIZE - skipped_data); /* We can use copy then in translog_page_validator() because it do not put it permanently somewhere. @@ -3291,6 +3403,7 @@ static my_bool translog_truncate_log(TRANSLOG_ADDRESS addr) uint32 next_page_offset, page_rest; uint32 i; File fd; + int rc; TRANSLOG_VALIDATOR_DATA data; char path[FN_REFLEN]; uchar page_buff[TRANSLOG_PAGE_SIZE]; @@ -3316,14 +3429,19 @@ static my_bool translog_truncate_log(TRANSLOG_ADDRESS addr) TRANSLOG_PAGE_SIZE); page_rest= next_page_offset - LSN_OFFSET(addr); memset(page_buff, TRANSLOG_FILLER, page_rest); - if ((fd= open_logfile_by_number_no_cache(LSN_FILE_NO(addr))) < 0 || - ((my_chsize(fd, next_page_offset, TRANSLOG_FILLER, MYF(MY_WME)) || - (page_rest && my_pwrite(fd, page_buff, page_rest, LSN_OFFSET(addr), - log_write_flags)) || - my_sync(fd, MYF(MY_WME))) | - my_close(fd, MYF(MY_WME))) || - (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS && - sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD)))) + rc= ((fd= open_logfile_by_number_no_cache(LSN_FILE_NO(addr))) < 0 || + ((my_chsize(fd, next_page_offset, TRANSLOG_FILLER, MYF(MY_WME)) || + (page_rest && my_pwrite(fd, page_buff, page_rest, LSN_OFFSET(addr), + log_write_flags)) || + my_sync(fd, MYF(MY_WME))))); + translog_syncs++; + rc|= (fd > 0 && my_close(fd, MYF(MY_WME))); + if (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS) + { + rc|= sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD)); + translog_syncs++; + } + if (rc) DBUG_RETURN(1); /* fix the horizon */ @@ -3483,7 +3601,10 @@ my_bool translog_init_with_table(const char *directory, my_bool version_changed= 0; DBUG_ENTER("translog_init_with_table"); + translog_syncs= 0; + flush_start= 0; id_to_share= NULL; + log_descriptor.directory_fd= -1; log_descriptor.is_everything_flushed= 1; log_descriptor.flush_in_progress= 0; @@ -3511,6 +3632,7 @@ my_bool translog_init_with_table(const char *directory, pthread_mutex_init(&log_descriptor.dirty_buffer_mask_lock, MY_MUTEX_INIT_FAST) || pthread_cond_init(&log_descriptor.log_flush_cond, 0) || + pthread_cond_init(&log_descriptor.new_goal_cond, 0) || my_rwlock_init(&log_descriptor.open_files_lock, NULL) || my_init_dynamic_array(&log_descriptor.open_files, @@ -3912,7 +4034,6 @@ my_bool translog_init_with_table(const char *directory, log_descriptor.flushed= log_descriptor.horizon; log_descriptor.in_buffers_only= log_descriptor.bc.buffer->offset; log_descriptor.max_lsn= LSN_IMPOSSIBLE; /* set to 0 */ - log_descriptor.previous_flush_horizon= log_descriptor.horizon; /* Now 'flushed' is set to 'horizon' value, but 'horizon' is (potentially) address of the next LSN and we want indicate that all LSNs that are @@ -3995,6 +4116,10 @@ my_bool translog_init_with_table(const char *directory, It is beginning of the log => there is no LSNs in the log => There is no harm in leaving it "as-is". */ + log_descriptor.previous_flush_horizon= log_descriptor.horizon; + DBUG_PRINT("info", ("previous_flush_horizon: (%lu,0x%lx)", + LSN_IN_PARTS(log_descriptor. + previous_flush_horizon))); DBUG_RETURN(0); } file_no--; @@ -4070,6 +4195,9 @@ my_bool translog_init_with_table(const char *directory, translog_free_record_header(&rec); } } + log_descriptor.previous_flush_horizon= log_descriptor.horizon; + DBUG_PRINT("info", ("previous_flush_horizon: (%lu,0x%lx)", + LSN_IN_PARTS(log_descriptor.previous_flush_horizon))); DBUG_RETURN(0); err: ma_message_no_user(0, "log initialization failed"); @@ -4157,6 +4285,7 @@ void translog_destroy() pthread_mutex_destroy(&log_descriptor.log_flush_lock); pthread_mutex_destroy(&log_descriptor.dirty_buffer_mask_lock); pthread_cond_destroy(&log_descriptor.log_flush_cond); + pthread_cond_destroy(&log_descriptor.new_goal_cond); rwlock_destroy(&log_descriptor.open_files_lock); delete_dynamic(&log_descriptor.open_files); delete_dynamic(&log_descriptor.unfinished_files); @@ -6885,11 +7014,11 @@ int translog_read_record_header_from_buffer(uchar *page, { translog_size_t res; DBUG_ENTER("translog_read_record_header_from_buffer"); + DBUG_PRINT("info", ("page byte: 0x%x offset: %u", + (uint) page[page_offset], (uint) page_offset)); DBUG_ASSERT(translog_is_LSN_chunk(page[page_offset])); DBUG_ASSERT(translog_status == TRANSLOG_OK || translog_status == TRANSLOG_READONLY); - DBUG_PRINT("info", ("page byte: 0x%x offset: %u", - (uint) page[page_offset], (uint) page_offset)); buff->type= (page[page_offset] & TRANSLOG_REC_TYPE); buff->short_trid= uint2korr(page + page_offset + 1); DBUG_PRINT("info", ("Type %u, Short TrID %u, LSN (%lu,0x%lx)", @@ -7356,27 +7485,27 @@ static void translog_force_current_buffer_to_finish() "Buffer addr: (%lu,0x%lx) " "Page addr: (%lu,0x%lx) " "size: %lu (%lu) Pg: %u left: %u in progress %u", - (uint) log_descriptor.bc.buffer_no, - (ulong) log_descriptor.bc.buffer, - LSN_IN_PARTS(log_descriptor.bc.buffer->offset), + (uint) old_buffer_no, + (ulong) old_buffer, + LSN_IN_PARTS(old_buffer->offset), (ulong) LSN_FILE_NO(log_descriptor.horizon), (ulong) (LSN_OFFSET(log_descriptor.horizon) - log_descriptor.bc.current_page_fill), - (ulong) log_descriptor.bc.buffer->size, + (ulong) old_buffer->size, (ulong) (log_descriptor.bc.ptr -log_descriptor.bc. buffer->buffer), (uint) log_descriptor.bc.current_page_fill, (uint) left, - (uint) log_descriptor.bc.buffer-> + (uint) old_buffer-> copy_to_buffer_in_progress)); translog_lock_assert_owner(); LINT_INIT(current_page_fill); - new_buff_beginning= log_descriptor.bc.buffer->offset; - new_buff_beginning+= log_descriptor.bc.buffer->size; /* increase offset */ + new_buff_beginning= old_buffer->offset; + new_buff_beginning+= old_buffer->size; /* increase offset */ DBUG_ASSERT(log_descriptor.bc.ptr !=NULL); DBUG_ASSERT(LSN_FILE_NO(log_descriptor.horizon) == - LSN_FILE_NO(log_descriptor.bc.buffer->offset)); + LSN_FILE_NO(old_buffer->offset)); translog_check_cursor(&log_descriptor.bc); DBUG_ASSERT(left < TRANSLOG_PAGE_SIZE); if (left) @@ -7387,18 +7516,20 @@ static void translog_force_current_buffer_to_finish() */ DBUG_PRINT("info", ("left: %u", (uint) left)); + old_buffer->pre_force_close_horizon= + old_buffer->offset + old_buffer->size; /* decrease offset */ new_buff_beginning-= log_descriptor.bc.current_page_fill; current_page_fill= log_descriptor.bc.current_page_fill; memset(log_descriptor.bc.ptr, TRANSLOG_FILLER, left); - log_descriptor.bc.buffer->size+= left; + old_buffer->size+= left; DBUG_PRINT("info", ("Finish Page buffer #%u: 0x%lx " "Size: %lu", - (uint) log_descriptor.bc.buffer->buffer_no, - (ulong) log_descriptor.bc.buffer, - (ulong) log_descriptor.bc.buffer->size)); - DBUG_ASSERT(log_descriptor.bc.buffer->buffer_no == + (uint) old_buffer->buffer_no, + (ulong) old_buffer, + (ulong) old_buffer->size)); + DBUG_ASSERT(old_buffer->buffer_no == log_descriptor.bc.buffer_no); } else @@ -7509,11 +7640,21 @@ static void translog_force_current_buffer_to_finish() if (left) { - /* - TODO: do not copy beginning of the page if we have no CRC or sector - checks on - */ - memcpy(new_buffer->buffer, data, current_page_fill); + if (log_descriptor.flags & + (TRANSLOG_PAGE_CRC | TRANSLOG_SECTOR_PROTECTION)) + memcpy(new_buffer->buffer, data, current_page_fill); + else + { + /* + This page header does not change if we add more data to the page so + we can not copy it and will not overwrite later + */ + new_buffer->skipped_data= current_page_fill; +#ifndef DBUG_OFF + memset(new_buffer->buffer, 0xa5, current_page_fill); +#endif + DBUG_ASSERT(new_buffer->skipped_data < TRANSLOG_PAGE_SIZE); + } } old_buffer->next_buffer_offset= new_buffer->offset; translog_buffer_lock(new_buffer); @@ -7561,6 +7702,7 @@ void translog_flush_set_new_goal_and_wait(TRANSLOG_ADDRESS lsn) { log_descriptor.next_pass_max_lsn= lsn; log_descriptor.max_lsn_requester= pthread_self(); + pthread_cond_broadcast(&log_descriptor.new_goal_cond); } while (flush_no == log_descriptor.flush_no) { @@ -7572,67 +7714,79 @@ void translog_flush_set_new_goal_and_wait(TRANSLOG_ADDRESS lsn) /** - @brief Flush the log up to given LSN (included) + @brief sync() range of files (inclusive) and directory (by request) - @param lsn log record serial number up to which (inclusive) - the log has to be flushed + @param min min internal file number to flush + @param max max internal file number to flush + @param sync_dir need sync directory - @return Operation status + return Operation status @retval 0 OK @retval 1 Error - */ -my_bool translog_flush(TRANSLOG_ADDRESS lsn) +static my_bool translog_sync_files(uint32 min, uint32 max, + my_bool sync_dir) { - LSN sent_to_disk= LSN_IMPOSSIBLE; - TRANSLOG_ADDRESS flush_horizon; - uint fn, i; - dirty_buffer_mask_t dirty_buffer_mask; - uint8 last_buffer_no, start_buffer_no; + uint fn; my_bool rc= 0; - DBUG_ENTER("translog_flush"); - DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn))); - DBUG_ASSERT(translog_status == TRANSLOG_OK || - translog_status == TRANSLOG_READONLY); - LINT_INIT(sent_to_disk); + ulonglong flush_interval; + DBUG_ENTER("translog_sync_files"); + DBUG_PRINT("info", ("min: %lu max: %lu sync dir: %d", + (ulong) min, (ulong) max, (int) sync_dir)); + DBUG_ASSERT(min <= max); - pthread_mutex_lock(&log_descriptor.log_flush_lock); - DBUG_PRINT("info", ("Everything is flushed up to (%lu,0x%lx)", - LSN_IN_PARTS(log_descriptor.flushed))); - if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0) + flush_interval= group_commit_wait; + if (flush_interval) + flush_start= my_micro_time(); + for (fn= min; fn <= max; fn++) { - pthread_mutex_unlock(&log_descriptor.log_flush_lock); - DBUG_RETURN(0); - } - if (log_descriptor.flush_in_progress) - { - translog_flush_set_new_goal_and_wait(lsn); - if (!pthread_equal(log_descriptor.max_lsn_requester, pthread_self())) + TRANSLOG_FILE *file= get_logfile_by_number(fn); + DBUG_ASSERT(file != NULL); + if (!file->is_sync) { - /* fix lsn if it was horizon */ - if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->last_lsn) > 0) - lsn= BUFFER_MAX_LSN(log_descriptor.bc.buffer); - translog_flush_wait_for_end(lsn); - pthread_mutex_unlock(&log_descriptor.log_flush_lock); - DBUG_RETURN(0); + if (my_sync(file->handler.file, MYF(MY_WME))) + { + rc= 1; + translog_stop_writing(); + DBUG_RETURN(rc); + } + translog_syncs++; + file->is_sync= 1; } - log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE; } - log_descriptor.flush_in_progress= 1; - flush_horizon= log_descriptor.previous_flush_horizon; - DBUG_PRINT("info", ("flush_in_progress is set")); - pthread_mutex_unlock(&log_descriptor.log_flush_lock); - translog_lock(); - if (log_descriptor.is_everything_flushed) + if (sync_dir) { - DBUG_PRINT("info", ("everything is flushed")); - rc= (translog_status == TRANSLOG_READONLY); - translog_unlock(); - goto out; + if (!(rc= sync_dir(log_descriptor.directory_fd, + MYF(MY_WME | MY_IGNORE_BADFD)))) + translog_syncs++; } + DBUG_RETURN(rc); +} + + +/* + @brief Flushes buffers with LSNs in them less or equal address <lsn> + + @param lsn address up to which all LSNs should be flushed, + can be reset to real last LSN address + @parem sent_to_disk returns 'sent to disk' position + @param flush_horizon returns horizon of the flush + + @note About terminology see comment to translog_flush(). +*/ + +void translog_flush_buffers(TRANSLOG_ADDRESS *lsn, + TRANSLOG_ADDRESS *sent_to_disk, + TRANSLOG_ADDRESS *flush_horizon) +{ + dirty_buffer_mask_t dirty_buffer_mask; + uint i; + uint8 last_buffer_no, start_buffer_no; + DBUG_ENTER("translog_flush_buffers"); + /* We will recheck information when will lock buffers one by one so we can use unprotected read here (this is just for @@ -7656,15 +7810,15 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn) /* if LSN up to which we have to flush bigger then maximum LSN of previous buffer and at least one LSN was saved in the current buffer (last_lsn != - LSN_IMPOSSIBLE) then we better finish the current buffer. + LSN_IMPOSSIBLE) then we have to close the current buffer. */ - if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->prev_last_lsn) > 0 && + if (cmp_translog_addr(*lsn, log_descriptor.bc.buffer->prev_last_lsn) > 0 && log_descriptor.bc.buffer->last_lsn != LSN_IMPOSSIBLE) { struct st_translog_buffer *buffer= log_descriptor.bc.buffer; - lsn= log_descriptor.bc.buffer->last_lsn; /* fix lsn if it was horizon */ + *lsn= log_descriptor.bc.buffer->last_lsn; /* fix lsn if it was horizon */ DBUG_PRINT("info", ("LSN to flush fixed to last lsn: (%lu,0x%lx)", - LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn))); + LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn))); last_buffer_no= log_descriptor.bc.buffer_no; log_descriptor.is_everything_flushed= 1; translog_force_current_buffer_to_finish(); @@ -7676,8 +7830,10 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn) TRANSLOG_BUFFERS_NO); translog_unlock(); } - sent_to_disk= translog_get_sent_to_disk(); - if (cmp_translog_addr(lsn, sent_to_disk) > 0) + + /* flush buffers */ + *sent_to_disk= translog_get_sent_to_disk(); + if (cmp_translog_addr(*lsn, *sent_to_disk) > 0) { DBUG_PRINT("info", ("Start buffer #: %u last buffer #: %u", @@ -7697,53 +7853,238 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn) LSN_IN_PARTS(buffer->last_lsn), (buffer->file ? "dirty" : "closed"))); - if (buffer->prev_last_lsn <= lsn && + if (buffer->prev_last_lsn <= *lsn && buffer->file != NULL) { - DBUG_ASSERT(flush_horizon <= buffer->offset + buffer->size); - flush_horizon= buffer->offset + buffer->size; + DBUG_ASSERT(*flush_horizon <= buffer->offset + buffer->size); + *flush_horizon= (buffer->pre_force_close_horizon != LSN_IMPOSSIBLE ? + buffer->pre_force_close_horizon : + buffer->offset + buffer->size); + /* pre_force_close_horizon is reset during new buffer start */ + DBUG_PRINT("info", ("flush_horizon: (%lu,0x%lx)", + LSN_IN_PARTS(*flush_horizon))); + DBUG_ASSERT(*flush_horizon <= log_descriptor.horizon); + translog_buffer_flush(buffer); } translog_buffer_unlock(buffer); i= (i + 1) % TRANSLOG_BUFFERS_NO; } while (i != last_buffer_no); - sent_to_disk= translog_get_sent_to_disk(); + *sent_to_disk= translog_get_sent_to_disk(); + } + + DBUG_VOID_RETURN; +} + +/** + @brief Flush the log up to given LSN (included) + + @param lsn log record serial number up to which (inclusive) + the log has to be flushed + + @return Operation status + @retval 0 OK + @retval 1 Error + + @note + + - Non group commit logic: Commits made in passes. Thread which started + flush first is performing actual flush, other threads sets new goal (LSN) + of the next pass (if it is maximum) and waits for the pass end or just + wait for the pass end. + + - If hard group commit enabled and rate set to zero: + The first thread sends all changed buffers to disk. This is repeated + as long as there are new LSNs added. The process can not loop + forever because we have limited number of threads and they will wait + for the data to be synced. + Pseudo code: + + do + send changed buffers to disk + while new_goal + sync + + - If hard group commit switched ON and less than rate microseconds has + passed from last sync, then after buffers have been sent to disk + wait until rate microseconds has passed since last sync, do sync and return. + This ensures that if we call sync infrequently we don't do any waits. + + - If soft group commit enabled everything works as with 'non group commit' + but the thread doesn't do any real sync(). If rate is not zero the + sync() will be performed by a service thread with the given rate + when needed (new LSN appears). + + @note Terminology: + 'sent to disk' means written to disk but not sync()ed, + 'flushed' mean sent to disk and synced(). +*/ + +my_bool translog_flush(TRANSLOG_ADDRESS lsn) +{ + struct timespec abstime; + ulonglong flush_interval; + ulonglong time_spent; + LSN sent_to_disk= LSN_IMPOSSIBLE; + TRANSLOG_ADDRESS flush_horizon; + my_bool rc= 0; + my_bool hgroup_commit_at_start; + DBUG_ENTER("translog_flush"); + DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn))); + DBUG_ASSERT(translog_status == TRANSLOG_OK || + translog_status == TRANSLOG_READONLY); + LINT_INIT(sent_to_disk); + + pthread_mutex_lock(&log_descriptor.log_flush_lock); + DBUG_PRINT("info", ("Everything is flushed up to (%lu,0x%lx)", + LSN_IN_PARTS(log_descriptor.flushed))); + if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0) + { + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + DBUG_RETURN(0); } + if (log_descriptor.flush_in_progress) + { + translog_lock(); + /* fix lsn if it was horizon */ + if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->last_lsn) > 0) + lsn= BUFFER_MAX_LSN(log_descriptor.bc.buffer); + translog_unlock(); + translog_flush_set_new_goal_and_wait(lsn); + if (!pthread_equal(log_descriptor.max_lsn_requester, pthread_self())) + { + /* + translog_flush_wait_for_end() release log_flush_lock while is + waiting then acquire it again + */ + translog_flush_wait_for_end(lsn); + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + DBUG_RETURN(0); + } + log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE; + } + log_descriptor.flush_in_progress= 1; + flush_horizon= log_descriptor.previous_flush_horizon; + DBUG_PRINT("info", ("flush_in_progress is set, flush_horizon: (%lu,0x%lx)", + LSN_IN_PARTS(flush_horizon))); + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + + hgroup_commit_at_start= hard_group_commit; + if (hgroup_commit_at_start) + flush_interval= group_commit_wait; - /* sync files from previous flush till current one */ - for (fn= LSN_FILE_NO(log_descriptor.flushed); fn <= LSN_FILE_NO(lsn); fn++) + translog_lock(); + if (log_descriptor.is_everything_flushed) { - TRANSLOG_FILE *file= get_logfile_by_number(fn); - DBUG_ASSERT(file != NULL); - if (!file->is_sync) + DBUG_PRINT("info", ("everything is flushed")); + translog_unlock(); + pthread_mutex_lock(&log_descriptor.log_flush_lock); + goto out; + } + + for (;;) + { + /* Following function flushes buffers and makes translog_unlock() */ + translog_flush_buffers(&lsn, &sent_to_disk, &flush_horizon); + + if (!hgroup_commit_at_start) + break; /* flush pass is ended */ + +retest: + /* + We do not check time here because pthread_mutex_lock rarely takes + a lot of time so we can sacrifice a bit precision to performance + (taking into account that my_micro_time() might be expensive call). + */ + if (flush_interval == 0) + break; /* flush pass is ended */ + + pthread_mutex_lock(&log_descriptor.log_flush_lock); + if (log_descriptor.next_pass_max_lsn == LSN_IMPOSSIBLE) { - if (my_sync(file->handler.file, MYF(MY_WME))) + if (flush_interval == 0 || + (time_spent= (my_micro_time() - flush_start)) >= flush_interval) { - rc= 1; - translog_stop_writing(); - sent_to_disk= LSN_IMPOSSIBLE; - goto out; + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + break; } - file->is_sync= 1; + DBUG_PRINT("info", ("flush waits: %llu interval: %llu spent: %llu", + flush_interval - time_spent, + flush_interval, time_spent)); + /* wait time or next goal */ + set_timespec_nsec(abstime, flush_interval - time_spent); + pthread_cond_timedwait(&log_descriptor.new_goal_cond, + &log_descriptor.log_flush_lock, + &abstime); + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + DBUG_PRINT("info", ("retest conditions")); + goto retest; } + + /* take next goal */ + lsn= log_descriptor.next_pass_max_lsn; + log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE; + /* prevent other thread from continue */ + log_descriptor.max_lsn_requester= pthread_self(); + DBUG_PRINT("info", ("flush took next goal: (%lu,0x%lx)", + LSN_IN_PARTS(lsn))); + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + + /* next flush pass */ + DBUG_PRINT("info", ("next flush pass")); + translog_lock(); } - if (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS && - (LSN_FILE_NO(log_descriptor.previous_flush_horizon) != - LSN_FILE_NO(flush_horizon) || - ((LSN_OFFSET(log_descriptor.previous_flush_horizon) - 1) / - TRANSLOG_PAGE_SIZE) != - ((LSN_OFFSET(flush_horizon) - 1) / TRANSLOG_PAGE_SIZE))) - rc|= sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD)); + /* + sync() files from previous flush till current one + */ + if (!soft_sync || hgroup_commit_at_start) + { + if ((rc= + translog_sync_files(LSN_FILE_NO(log_descriptor.flushed), + LSN_FILE_NO(lsn), + sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS && + (LSN_FILE_NO(log_descriptor. + previous_flush_horizon) != + LSN_FILE_NO(flush_horizon) || + (LSN_OFFSET(log_descriptor. + previous_flush_horizon) / + TRANSLOG_PAGE_SIZE) != + (LSN_OFFSET(flush_horizon) / + TRANSLOG_PAGE_SIZE))))) + { + sent_to_disk= LSN_IMPOSSIBLE; + pthread_mutex_lock(&log_descriptor.log_flush_lock); + goto out; + } + /* keep values for soft sync() and forced sync() actual */ + { + uint32 fileno= LSN_FILE_NO(lsn); + my_atomic_rwlock_wrlock(&soft_sync_rwl); + my_atomic_store32(&soft_sync_min, fileno); + my_atomic_store32(&soft_sync_max, fileno); + my_atomic_rwlock_wrunlock(&soft_sync_rwl); + } + } + else + { + my_atomic_rwlock_wrlock(&soft_sync_rwl); + my_atomic_store32(&soft_sync_max, LSN_FILE_NO(lsn)); + my_atomic_store32(&soft_need_sync, 1); + my_atomic_rwlock_wrunlock(&soft_sync_rwl); + } + + DBUG_ASSERT(flush_horizon <= log_descriptor.horizon); + + pthread_mutex_lock(&log_descriptor.log_flush_lock); log_descriptor.previous_flush_horizon= flush_horizon; out: - pthread_mutex_lock(&log_descriptor.log_flush_lock); if (sent_to_disk != LSN_IMPOSSIBLE) log_descriptor.flushed= sent_to_disk; log_descriptor.flush_in_progress= 0; log_descriptor.flush_no++; DBUG_PRINT("info", ("flush_in_progress is dropped")); - pthread_mutex_unlock(&log_descriptor.log_flush_lock);\ + pthread_mutex_unlock(&log_descriptor.log_flush_lock); pthread_cond_broadcast(&log_descriptor.log_flush_cond); DBUG_RETURN(rc); } @@ -8113,6 +8454,8 @@ LSN translog_first_theoretical_lsn() my_bool translog_purge(TRANSLOG_ADDRESS low) { uint32 last_need_file= LSN_FILE_NO(low); + uint32 min_unsync; + int soft; TRANSLOG_ADDRESS horizon= translog_get_horizon(); int rc= 0; DBUG_ENTER("translog_purge"); @@ -8120,12 +8463,26 @@ my_bool translog_purge(TRANSLOG_ADDRESS low) DBUG_ASSERT(translog_status == TRANSLOG_OK || translog_status == TRANSLOG_READONLY); + soft= soft_sync; + my_atomic_rwlock_wrlock(&soft_sync_rwl); + min_unsync= my_atomic_load32(&soft_sync_min); + my_atomic_rwlock_wrunlock(&soft_sync_rwl); + DBUG_PRINT("info", ("min_unsync: %lu", (ulong) min_unsync)); + if (soft && min_unsync < last_need_file) + { + last_need_file= min_unsync; + DBUG_PRINT("info", ("last_need_file set to %lu", (ulong)last_need_file)); + } + pthread_mutex_lock(&log_descriptor.purger_lock); + DBUG_PRINT("info", ("last_lsn_checked file: %lu:", + (ulong) log_descriptor.last_lsn_checked)); if (LSN_FILE_NO(log_descriptor.last_lsn_checked) < last_need_file) { uint32 i; uint32 min_file= translog_first_file(horizon, 1); DBUG_ASSERT(min_file != 0); /* log is already started */ + DBUG_PRINT("info", ("min_file: %lu:",(ulong) min_file)); for(i= min_file; i < last_need_file && rc == 0; i++) { LSN lsn= translog_get_file_max_lsn_stored(i); @@ -8356,6 +8713,159 @@ my_bool translog_log_debug_info(TRN *trn __attribute__((unused)), } + +/** + Sets soft sync mode + + @param mode TRUE if we need switch soft sync on else off +*/ + +void translog_soft_sync(my_bool mode) +{ + soft_sync= mode; +} + + +/** + Sets hard group commit + + @param mode TRUE if we need switch hard group commit on else off +*/ + +void translog_hard_group_commit(my_bool mode) +{ + hard_group_commit= mode; +} + + +/** + @brief forced log sync (used when we are switching modes) +*/ + +void translog_sync() +{ + uint32 max= get_current_logfile()->number; + uint32 min; + DBUG_ENTER("ma_translog_sync"); + + my_atomic_rwlock_rdlock(&soft_sync_rwl); + min= my_atomic_load32(&soft_sync_min); + my_atomic_rwlock_rdunlock(&soft_sync_rwl); + if (!min) + min= max; + + translog_sync_files(min, max, sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS); + + DBUG_VOID_RETURN; +} + + +/** + @brief set rate for group commit + + @param interval interval to set. + + @note We use this function with additional variable because have to + restart service thread with new value which we can't make inside changing + variable routine (update_maria_group_commit_interval) +*/ + +void translog_set_group_commit_interval(uint32 interval) +{ + DBUG_ENTER("translog_set_group_commit_interval"); + group_commit_wait= interval; + DBUG_PRINT("info", ("wait: %llu", + (ulonglong)group_commit_wait)); + DBUG_VOID_RETURN; +} + + +/** + @brief syncing service thread +*/ + +static pthread_handler_t +ma_soft_sync_background( void *arg __attribute__((unused))) +{ + + my_thread_init(); + { + DBUG_ENTER("ma_soft_sync_background"); + for(;;) + { + ulonglong prev_loop= my_micro_time(); + ulonglong time, sleep; + uint32 min, max, sync_request; + my_atomic_rwlock_rdlock(&soft_sync_rwl); + min= my_atomic_load32(&soft_sync_min); + max= my_atomic_load32(&soft_sync_max); + sync_request= my_atomic_load32(&soft_need_sync); + my_atomic_store32(&soft_sync_min, max); + my_atomic_store32(&soft_need_sync, 0); + my_atomic_rwlock_rdunlock(&soft_sync_rwl); + + sleep= group_commit_wait; + if (sync_request) + translog_sync_files(min, max, FALSE); + time= my_micro_time() - prev_loop; + if (time > sleep) + sleep= 0; + else + sleep-= time; + if (my_service_thread_sleep(&soft_sync_control, sleep)) + break; + } + my_service_thread_signal_end(&soft_sync_control); + my_thread_end(); + DBUG_RETURN(0); + } +} + + +/** + @brief Starts syncing thread +*/ + +int translog_soft_sync_start(void) +{ + pthread_t th; + int res= 0; + uint32 min, max; + DBUG_ENTER("translog_soft_sync_start"); + + /* check and init variables */ + my_atomic_rwlock_rdlock(&soft_sync_rwl); + min= my_atomic_load32(&soft_sync_min); + max= my_atomic_load32(&soft_sync_max); + if (!max) + my_atomic_store32(&soft_sync_max, (max= get_current_logfile()->number)); + if (!min) + my_atomic_store32(&soft_sync_min, max); + my_atomic_store32(&soft_need_sync, 1); + my_atomic_rwlock_rdunlock(&soft_sync_rwl); + + if (!(res= ma_service_thread_control_init(&soft_sync_control))) + if (!(res= pthread_create(&th, NULL, ma_soft_sync_background, NULL))) + soft_sync_control.status= THREAD_RUNNING; + DBUG_RETURN(res); +} + + +/** + @brief Stops syncing thread +*/ + +void translog_soft_sync_end(void) +{ + DBUG_ENTER("translog_soft_sync_end"); + if (soft_sync_control.inited) + { + ma_service_thread_control_end(&soft_sync_control); + } + DBUG_VOID_RETURN; +} + + #ifdef MARIA_DUMP_LOG #include <my_getopt.h> extern void translog_example_table_init(); diff --git a/storage/maria/ma_loghandler.h b/storage/maria/ma_loghandler.h index dba6283e303..224d93fb24b 100644 --- a/storage/maria/ma_loghandler.h +++ b/storage/maria/ma_loghandler.h @@ -342,6 +342,14 @@ enum enum_translog_status TRANSLOG_SHUTDOWN /* going to shutdown the loghandler */ }; extern enum enum_translog_status translog_status; +extern ulonglong translog_syncs; /* Number of sync()s */ + +void translog_soft_sync(my_bool mode); +void translog_hard_group_commit(my_bool mode); +int translog_soft_sync_start(void); +void translog_soft_sync_end(void); +void translog_sync(); +void translog_set_group_commit_interval(uint32 interval); /* all the rest added because of recovery; should we make @@ -441,6 +449,14 @@ extern LOG_DESC log_record_type_descriptor[LOGREC_NUMBER_OF_TYPES]; typedef enum { + TRANSLOG_GCOMMIT_NONE, + TRANSLOG_GCOMMIT_HARD, + TRANSLOG_GCOMMIT_SOFT +} enum_maria_group_commit; +extern ulong maria_group_commit; +extern ulong maria_group_commit_interval; +typedef enum +{ TRANSLOG_PURGE_IMMIDIATE, TRANSLOG_PURGE_EXTERNAL, TRANSLOG_PURGE_ONDEMAND diff --git a/storage/maria/ma_page.c b/storage/maria/ma_page.c index 54361b8ee3b..acbee2a6f07 100644 --- a/storage/maria/ma_page.c +++ b/storage/maria/ma_page.c @@ -64,6 +64,15 @@ void _ma_page_setup(MARIA_PAGE *page, MARIA_HA *info, share->base.key_reflength : 0); } +#ifdef IDENTICAL_PAGES_AFTER_RECOVERY +void page_cleanup(MARIA_SHARE *share, MARIA_PAGE *page) +{ + uint length= page->size; + DBUG_ASSERT(length <= block_size - KEYPAGE_CHECKSUM_SIZE); + bzero(page->buff + length, share->block_size - length); +} +#endif + /** Fetch a key-page in memory @@ -102,8 +111,10 @@ my_bool _ma_fetch_keypage(MARIA_PAGE *page, MARIA_HA *info, if (lock != PAGECACHE_LOCK_LEFT_UNLOCKED) { - DBUG_ASSERT(lock == PAGECACHE_LOCK_WRITE); - page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK; + DBUG_ASSERT(lock == PAGECACHE_LOCK_WRITE || PAGECACHE_LOCK_READ); + page_link.unlock= (lock == PAGECACHE_LOCK_WRITE ? + PAGECACHE_LOCK_WRITE_UNLOCK : + PAGECACHE_LOCK_READ_UNLOCK); page_link.changed= 0; push_dynamic(&info->pinned_pages, (void*) &page_link); page->link_offset= info->pinned_pages.elements-1; @@ -209,14 +220,7 @@ my_bool _ma_write_keypage(MARIA_PAGE *page, enum pagecache_page_lock lock, } #endif -#ifdef IDENTICAL_PAGES_AFTER_RECOVERY - { - uint length= page->size; - DBUG_ASSERT(length <= block_size - KEYPAGE_CHECKSUM_SIZE); - bzero(buff + length, block_size - length); - } -#endif - + page_cleanup(share, page); res= pagecache_write(share->pagecache, &share->kfile, (pgcache_page_no_t) (page->pos / block_size), diff --git a/storage/maria/ma_recovery.c b/storage/maria/ma_recovery.c index 65ad767d8ef..7b3065b0208 100644 --- a/storage/maria/ma_recovery.c +++ b/storage/maria/ma_recovery.c @@ -312,11 +312,14 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply, now= my_getsystime(); in_redo_phase= TRUE; + trnman_init(max_trid_in_control_file); if (run_redo_phase(from_lsn, apply)) { ma_message_no_user(0, "Redo phase failed"); + trnman_destroy(); goto err; } + trnman_destroy(); if ((uncommitted_trans= end_of_redo_phase(should_run_undo_phase)) == (uint)-1) diff --git a/storage/maria/ma_rkey.c b/storage/maria/ma_rkey.c index 32a0e141b1d..24b275d0ba6 100644 --- a/storage/maria/ma_rkey.c +++ b/storage/maria/ma_rkey.c @@ -83,6 +83,9 @@ int maria_rkey(MARIA_HA *info, uchar *buf, int inx, const uchar *key_data, rw_rdlock(&keyinfo->root_lock); nextflag= maria_read_vec[search_flag] | key.flag; + if (search_flag != HA_READ_KEY_EXACT || + ((keyinfo->flag & (HA_NOSAME | HA_NULL_PART)) != HA_NOSAME)) + nextflag|= SEARCH_SAVE_BUFF; switch (keyinfo->key_alg) { #ifdef HAVE_RTREE_KEYS diff --git a/storage/maria/ma_search.c b/storage/maria/ma_search.c index a50f3c49a26..d48749b1629 100644 --- a/storage/maria/ma_search.c +++ b/storage/maria/ma_search.c @@ -18,6 +18,10 @@ #include "ma_fulltext.h" #include "m_ctype.h" +static int _ma_search_no_save(register MARIA_HA *info, MARIA_KEY *key, + uint32 nextflag, register my_off_t pos, + MARIA_PINNED_PAGE **res_page_link, + uchar **res_page_buff); static my_bool _ma_get_prev_key(MARIA_KEY *key, MARIA_PAGE *ma_page, uchar *keypos); @@ -57,7 +61,51 @@ int _ma_check_index(MARIA_HA *info, int inx) */ int _ma_search(register MARIA_HA *info, MARIA_KEY *key, uint32 nextflag, - register my_off_t pos) + my_off_t pos) +{ + int error; + MARIA_PINNED_PAGE *page_link; + uchar *page_buff; + + info->page_changed= 1; /* If page not saved */ + if (!(error= _ma_search_no_save(info, key, nextflag, pos, &page_link, + &page_buff))) + { + if (nextflag & SEARCH_SAVE_BUFF) + { + bmove512(info->keyread_buff, page_buff, info->s->block_size); + + /* Save position for a possible read next / previous */ + info->int_keypos= info->keyread_buff + (ulonglong) info->int_keypos; + info->int_maxpos= info->keyread_buff + (ulonglong) info->int_maxpos; + info->int_keytree_version= key->keyinfo->version; + info->last_search_keypage= info->last_keypage; + info->page_changed= 0; + info->keyread_buff_used= 0; + } + } + _ma_unpin_all_pages(info, LSN_IMPOSSIBLE); + return (error); +} + +/** + @breif Search after row by a key + + ret_page_link Will contain pointer to page where we found key + + @note + Position to row is stored in info->lastpos + + @return + @retval 0 ok (key found) + @retval -1 Not found + @retval 1 If one should continue search on higher level +*/ + +static int _ma_search_no_save(register MARIA_HA *info, MARIA_KEY *key, + uint32 nextflag, register my_off_t pos, + MARIA_PINNED_PAGE **res_page_link, + uchar **res_page_buff) { my_bool last_key_not_used; int error,flag; @@ -66,6 +114,7 @@ int _ma_search(register MARIA_HA *info, MARIA_KEY *key, uint32 nextflag, uchar lastkey[MARIA_MAX_KEY_BUFF]; MARIA_KEYDEF *keyinfo= key->keyinfo; MARIA_PAGE page; + MARIA_PINNED_PAGE *page_link; DBUG_ENTER("_ma_search"); DBUG_PRINT("enter",("pos: %lu nextflag: %u lastpos: %lu", (ulong) pos, nextflag, (ulong) info->cur_row.lastpos)); @@ -81,10 +130,11 @@ int _ma_search(register MARIA_HA *info, MARIA_KEY *key, uint32 nextflag, } if (_ma_fetch_keypage(&page, info, keyinfo, pos, - PAGECACHE_LOCK_LEFT_UNLOCKED, - DFLT_INIT_HITS, info->keyread_buff, - test(!(nextflag & SEARCH_SAVE_BUFF)))) + PAGECACHE_LOCK_READ, DFLT_INIT_HITS, 0, 0)) goto err; + page_link= dynamic_element(&info->pinned_pages, + info->pinned_pages.elements-1, + MARIA_PINNED_PAGE*); DBUG_DUMP("page", page.buff, page.size); flag= (*keyinfo->bin_search)(key, &page, nextflag, &keypos, lastkey, @@ -98,8 +148,9 @@ int _ma_search(register MARIA_HA *info, MARIA_KEY *key, uint32 nextflag, if (flag) { - if ((error= _ma_search(info, key, nextflag, - _ma_kpos(nod_flag,keypos))) <= 0) + if ((error= _ma_search_no_save(info, key, nextflag, + _ma_kpos(nod_flag,keypos), + res_page_link, res_page_buff)) <= 0) DBUG_RETURN(error); if (flag >0) @@ -118,26 +169,15 @@ int _ma_search(register MARIA_HA *info, MARIA_KEY *key, uint32 nextflag, ((keyinfo->flag & (HA_NOSAME | HA_NULL_PART)) != HA_NOSAME || (key->flag & SEARCH_PART_KEY) || info->s->base.born_transactional)) { - if ((error= _ma_search(info, key, (nextflag | SEARCH_FIND) & - ~(SEARCH_BIGGER | SEARCH_SMALLER | SEARCH_LAST), - _ma_kpos(nod_flag,keypos))) >= 0 || + if ((error= _ma_search_no_save(info, key, (nextflag | SEARCH_FIND) & + ~(SEARCH_BIGGER | SEARCH_SMALLER | + SEARCH_LAST), + _ma_kpos(nod_flag,keypos), + res_page_link, res_page_buff)) >= 0 || my_errno != HA_ERR_KEY_NOT_FOUND) DBUG_RETURN(error); - info->last_keypage= HA_OFFSET_ERROR; /* Buffer not in mem */ } } - if (pos != info->last_keypage) - { - uchar *old_buff= page.buff; - if (_ma_fetch_keypage(&page, info, keyinfo, pos, - PAGECACHE_LOCK_LEFT_UNLOCKED,DFLT_INIT_HITS, - info->keyread_buff, - test(!(nextflag & SEARCH_SAVE_BUFF)))) - goto err; - /* Restore position if page buffer moved */ - keypos= page.buff + (keypos - old_buff); - maxpos= page.buff + (maxpos - old_buff); - } info->last_key.keyinfo= keyinfo; if ((nextflag & (SEARCH_SMALLER | SEARCH_LAST)) && flag != 0) @@ -172,16 +212,15 @@ int _ma_search(register MARIA_HA *info, MARIA_KEY *key, uint32 nextflag, } info->cur_row.lastpos= _ma_row_pos_from_key(&info->last_key); info->cur_row.trid= _ma_trid_from_key(&info->last_key); - /* Save position for a possible read next / previous */ - info->int_keypos= info->keyread_buff + (keypos - page.buff); - info->int_maxpos= info->keyread_buff + (maxpos - page.buff); - info->int_nod_flag=nod_flag; - info->int_keytree_version=keyinfo->version; - info->last_search_keypage=info->last_keypage; - info->page_changed=0; - /* Set marker that buffer was used (Marker for mi_search_next()) */ - info->keyread_buff_used= (info->keyread_buff != page.buff); + /* Store offset to key */ + info->int_keypos= (uchar*) (keypos - page.buff); + info->int_maxpos= (uchar*) (maxpos - page.buff); + info->int_nod_flag= nod_flag; + info->last_keypage= pos; + *res_page_link= page_link; + *res_page_buff= page.buff; + DBUG_PRINT("exit",("found key at %lu",(ulong) info->cur_row.lastpos)); DBUG_RETURN(0); @@ -190,7 +229,7 @@ err: info->cur_row.lastpos= HA_OFFSET_ERROR; info->page_changed=1; DBUG_RETURN (-1); -} /* _ma_search */ +} /* @@ -389,7 +428,7 @@ int _ma_prefix_search(const MARIA_KEY *key, const MARIA_PAGE *ma_page, uint length_pack; MARIA_KEYDEF *keyinfo= key->keyinfo; MARIA_SHARE *share= keyinfo->share; - uchar *sort_order= keyinfo->seg->charset->sort_order; + const uchar *sort_order= keyinfo->seg->charset->sort_order; DBUG_ENTER("_ma_prefix_search"); LINT_INIT(length); @@ -1883,7 +1922,7 @@ _ma_calc_var_pack_key_length(const MARIA_KEY *int_key, uint nod_flag, uint key_length,ref_length,org_key_length=0, length_pack,new_key_length,diff_flag,pack_marker; const uchar *key, *start, *end, *key_end; - uchar *sort_order; + const uchar *sort_order; my_bool same_length; MARIA_KEYDEF *keyinfo= int_key->keyinfo; diff --git a/storage/maria/ma_sort.c b/storage/maria/ma_sort.c index fa2cbab995a..387563ebaac 100644 --- a/storage/maria/ma_sort.c +++ b/storage/maria/ma_sort.c @@ -920,7 +920,6 @@ merge_buffers(MARIA_SORT_PARAM *info, uint keys, IO_CACHE *from_file, uchar *strpos; BUFFPEK *buffpek,**refpek; QUEUE queue; - volatile int *killed= _ma_killed_ptr(info->sort_info->param); DBUG_ENTER("merge_buffers"); count=error=0; @@ -953,10 +952,6 @@ merge_buffers(MARIA_SORT_PARAM *info, uint keys, IO_CACHE *from_file, { for (;;) { - if (*killed) - { - error=1; goto err; - } buffpek=(BUFFPEK*) queue_top(&queue); if (to_file) { @@ -976,6 +971,12 @@ merge_buffers(MARIA_SORT_PARAM *info, uint keys, IO_CACHE *from_file, buffpek->key+=sort_length; if (! --buffpek->mem_count) { + /* It's enough to check for killedptr before a slow operation */ + if (_ma_killed_ptr(info->sort_info->param)) + { + error=1; + goto err; + } if (!(error=(int) info->read_to_buffer(from_file,buffpek,sort_length))) { uchar *base= buffpek->base; diff --git a/storage/maria/ma_state.c b/storage/maria/ma_state.c index 0b7fba9f55a..d7ddc73e2b4 100644 --- a/storage/maria/ma_state.c +++ b/storage/maria/ma_state.c @@ -528,7 +528,7 @@ void _ma_remove_table_from_trnman(MARIA_SHARE *share, TRN *trn) safe_mutex_assert_owner(&share->intern_lock); - for (prev= (MARIA_USED_TABLES**) &trn->used_tables, tables= *prev; + for (prev= (MARIA_USED_TABLES**) (char*) &trn->used_tables, tables= *prev; tables; tables= *prev) { diff --git a/storage/maria/ma_test3.c b/storage/maria/ma_test3.c index 8dd631380a0..040d6fa78c2 100644 --- a/storage/maria/ma_test3.c +++ b/storage/maria/ma_test3.c @@ -180,7 +180,7 @@ void start_test(int id) if (pagecacheing && rnd(2) == 0) init_pagecache(maria_pagecache, 65536L, 0, 0, MARIA_KEY_BLOCK_LENGTH, MY_WME); - printf("Process %d, pid: %d\n",id,getpid()); fflush(stdout); + printf("Process %d, pid: %ld\n",id,(long) getpid()); fflush(stdout); for (error=i=0 ; i < tests && !error; i++) { @@ -362,7 +362,7 @@ int test_write(MARIA_HA *file,int id,int lock_type) maria_extra(file,HA_EXTRA_WRITE_CACHE,0); } - sprintf((char*) record.id,"%7d",getpid()); + sprintf((char*) record.id,"%7ld", (long) getpid()); strnmov((char*) record.text,"Testing...", sizeof(record.text)); tries=(uint) rnd(100)+10; diff --git a/storage/maria/ma_write.c b/storage/maria/ma_write.c index 20304618229..3b9ca46899f 100644 --- a/storage/maria/ma_write.c +++ b/storage/maria/ma_write.c @@ -587,6 +587,12 @@ my_bool _ma_enlarge_root(MARIA_HA *info, MARIA_KEY *key, my_off_t *root) /* Search after a position for a key and store it there + TODO: + Change this to use pagecache directly instead of creating a copy + of the page. To do this, we must however change write-key-on-page + algorithm to not overwrite the buffer but instead store any overflow + key in a separate buffer. + @return @retval -1 error @retval 0 ok diff --git a/storage/maria/maria_def.h b/storage/maria/maria_def.h index 80a0beeef0b..8b379408521 100644 --- a/storage/maria/maria_def.h +++ b/storage/maria/maria_def.h @@ -390,6 +390,7 @@ typedef struct st_maria_share my_bool now_transactional; my_bool have_versioning; my_bool key_del_used; /* != 0 if key_del is locked */ + my_bool deleting; /* we are going to delete this table */ #ifdef THREAD THR_LOCK lock; void (*lock_restore_status)(void *); @@ -982,6 +983,11 @@ extern ulonglong transid_get_packed(MARIA_SHARE *share, const uchar *from); #define page_store_info(share, page) \ _ma_store_keypage_flag((share), (page)->buff, (page)->flag); \ _ma_store_page_used((share), (page)->buff, (page)->size); +#ifdef IDENTICAL_PAGES_AFTER_RECOVERY +void page_cleanup(MARIA_SHARE *share, MARIA_PAGE *page) +#else +#define page_cleanup(A,B) while (0) +#endif extern MARIA_KEY *_ma_make_key(MARIA_HA *info, MARIA_KEY *int_key, uint keynr, uchar *key, const uchar *record, @@ -1164,7 +1170,7 @@ int _ma_flush_table_files(MARIA_HA *info, uint flush_data_or_index, Functions needed by _ma_check (are overridden in MySQL/ha_maria.cc). See ma_check_standalone.h . */ -volatile int *_ma_killed_ptr(HA_CHECK *param); +int _ma_killed_ptr(HA_CHECK *param); void _ma_check_print_error _VARARGS((HA_CHECK *param, const char *fmt, ...)) ATTRIBUTE_FORMAT(printf, 2, 3); void _ma_check_print_warning _VARARGS((HA_CHECK *param, const char *fmt, ...)) @@ -1200,7 +1206,7 @@ void _ma_tmp_disable_logging_for_table(MARIA_HA *info, my_bool log_incomplete); my_bool _ma_reenable_logging_for_table(MARIA_HA *info, my_bool flush_pages); my_bool write_log_record_for_bulk_insert(MARIA_HA *info); - +void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn); #define MARIA_NO_CRC_NORMAL_PAGE 0xffffffff #define MARIA_NO_CRC_BITMAP_PAGE 0xfffffffe diff --git a/storage/maria/maria_ftdump.c b/storage/maria/maria_ftdump.c index 5e3b47b956e..8b545e6e9af 100644 --- a/storage/maria/maria_ftdump.c +++ b/storage/maria/maria_ftdump.c @@ -53,7 +53,7 @@ static struct my_option my_long_options[] = int main(int argc,char *argv[]) { - int error=0, subkeys; + int error=0; uint keylen, keylen2=0, inx, doc_cnt=0; float weight= 1.0; double gws, min_gws=0, avg_gws=0; @@ -112,11 +112,12 @@ int main(int argc,char *argv[]) while (!(error=maria_rnext(info,NULL,inx))) { + FT_WEIGTH subkeys; keylen=*(info->lastkey_buff); - subkeys=ft_sintXkorr(info->lastkey_buff + keylen + 1); - if (subkeys >= 0) - weight=*(float*)&subkeys; + subkeys.i= ft_sintXkorr(info->lastkey_buff + keylen + 1); + if (subkeys.i >= 0) + weight= subkeys.f; #ifdef HAVE_SNPRINTF snprintf(buf,MAX_LEN,"%.*s",(int) keylen,info->lastkey_buff+1); @@ -153,14 +154,15 @@ int main(int argc,char *argv[]) keylen2=keylen; doc_cnt=0; } - doc_cnt+= (subkeys >= 0 ? 1 : -subkeys); + doc_cnt+= (subkeys.i >= 0 ? 1 : -subkeys.i); } if (dump) { - if (subkeys>=0) + if (subkeys.i >= 0) printf("%9lx %20.7f %s\n", (long) info->cur_row.lastpos,weight,buf); else - printf("%9lx => %17d %s\n",(long) info->cur_row.lastpos,-subkeys,buf); + printf("%9lx => %17d %s\n",(long) info->cur_row.lastpos,-subkeys.i, + buf); } if (verbose && (total%HOW_OFTEN_TO_WRITE)==0) printf("%10ld\r",total); diff --git a/storage/maria/trnman.c b/storage/maria/trnman.c index 43fac68806f..ceb8ad2ae2d 100644 --- a/storage/maria/trnman.c +++ b/storage/maria/trnman.c @@ -300,8 +300,8 @@ TRN *trnman_new_trn(WT_THD *wt) (ABA isn't possible, we're behind a mutex */ my_atomic_rwlock_wrlock(&LOCK_pool); - while (tmp.trn && !my_atomic_casptr((void **)&pool, &tmp.v, - (void *)tmp.trn->next)) + while (tmp.trn && !my_atomic_casptr((void **)(char*) &pool, &tmp.v, + (void *)tmp.trn->next)) /* no-op */; my_atomic_rwlock_wrunlock(&LOCK_pool); @@ -545,7 +545,7 @@ static void trnman_free_trn(TRN *trn) down after the loop at -O2 */ *(TRN * volatile *)&(trn->next)= tmp.trn; - } while (!my_atomic_casptr((void **)&pool, &tmp.v, trn)); + } while (!my_atomic_casptr((void **)(char*)&pool, &tmp.v, trn)); my_atomic_rwlock_wrunlock(&LOCK_pool); } |