diff options
author | unknown <guilhem@gbichot4.local> | 2007-12-31 12:52:45 +0100 |
---|---|---|
committer | unknown <guilhem@gbichot4.local> | 2007-12-31 12:52:45 +0100 |
commit | 4902e80471e998e26bf5ea3b5f57cf6d79c2471b (patch) | |
tree | 684b76112ab0e633870f05b86d62746af20a0019 /storage/maria/ma_recovery.c | |
parent | 75a8907ee0d0bf83c396bd0bd6e3bbe3ff523a00 (diff) | |
parent | 18bc7b695af8b553e6b8151ce7ce2e2d8755881d (diff) | |
download | mariadb-git-4902e80471e998e26bf5ea3b5f57cf6d79c2471b.tar.gz |
merge
Diffstat (limited to 'storage/maria/ma_recovery.c')
-rw-r--r-- | storage/maria/ma_recovery.c | 163 |
1 files changed, 115 insertions, 48 deletions
diff --git a/storage/maria/ma_recovery.c b/storage/maria/ma_recovery.c index 4444f73b49f..2ac708246e2 100644 --- a/storage/maria/ma_recovery.c +++ b/storage/maria/ma_recovery.c @@ -40,15 +40,18 @@ struct st_dirty_page /* used only in the REDO phase */ struct st_table_for_recovery /* used in the REDO and UNDO phase */ { MARIA_HA *info; - File org_kfile, org_dfile; /**< OS descriptors when Checkpoint saw table */ }; /* Variables used by all functions of this module. Ok as single-threaded */ static struct st_trn_for_recovery *all_active_trans; static struct st_table_for_recovery *all_tables; static HASH all_dirty_pages; static struct st_dirty_page *dirty_pages_pool; -static LSN current_group_end_lsn, - checkpoint_start= LSN_IMPOSSIBLE; +static LSN current_group_end_lsn; +/* + LSN after which dirty pages list does not apply. Can be slightly before + when ma_checkpoint_execute() started. +*/ +static LSN checkpoint_start= LSN_IMPOSSIBLE; #ifndef DBUG_OFF /** Current group of REDOs is about this table and only this one */ static MARIA_HA *current_group_table; @@ -58,6 +61,7 @@ static FILE *tracef; /**< trace file for debugging */ static my_bool skip_DDLs; /**< if REDO phase should skip DDL records */ /** @brief to avoid writing a checkpoint if recovery did nothing. */ static my_bool checkpoint_useful; +/** @todo looks like duplicate of recovery_message_printed */ static my_bool procent_printed; static ulonglong now; /**< for tracking execution time of phases */ static int (*save_error_handler_hook)(uint, const char *,myf); @@ -124,10 +128,8 @@ static void prepare_table_for_close(MARIA_HA *info, TRANSLOG_ADDRESS horizon); static LSN parse_checkpoint_record(LSN lsn); static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn, LSN first_undo_lsn); -static int new_table(uint16 sid, const char *name, - File org_kfile, File org_dfile, - LSN lsn_of_file_id); -static int new_page(File fileid, pgcache_page_no_t pageid, LSN rec_lsn, +static int new_table(uint16 sid, const char *name, LSN lsn_of_file_id); +static int new_page(uint32 fileid, pgcache_page_no_t pageid, LSN rec_lsn, struct st_dirty_page *dirty_page); static int close_all_tables(void); static my_bool close_one_table(const char *name, TRANSLOG_ADDRESS addr); @@ -136,6 +138,10 @@ static void print_redo_phase_progress(TRANSLOG_ADDRESS addr); /** @brief global [out] buffer for translog_read_record(); never shrinks */ static struct { + /* + uchar* is more adapted (less casts) than char*, thus we don't use + LEX_STRING. + */ uchar *str; size_t length; } log_record_buffer; @@ -1158,7 +1164,7 @@ prototype_redo_exec_hook(FILE_ID) all_tables[sid].info= NULL; } name= (char *)log_record_buffer.str + FILEID_STORE_SIZE; - if (new_table(sid, name, -1, -1, rec->lsn)) + if (new_table(sid, name, rec->lsn)) goto end; error= 0; end: @@ -1166,9 +1172,7 @@ end: } -static int new_table(uint16 sid, const char *name, - File org_kfile, File org_dfile, - LSN lsn_of_file_id) +static int new_table(uint16 sid, const char *name, LSN lsn_of_file_id) { /* -1 (skip table): close table and return 0; @@ -1201,12 +1205,6 @@ static int new_table(uint16 sid, const char *name, error= 0; goto end; } - if (maria_is_crashed(info)) - { - /** @todo what should we do? how to continue recovery? */ - tprint(tracef, "Table is crashed, can't apply log records to it\n"); - goto end; - } share= info->s; /* check that we're not already using it */ if (share->reopen != 1) @@ -1235,6 +1233,16 @@ static int new_table(uint16 sid, const char *name, LSN_IN_PARTS(lsn_of_file_id)); error= -1; goto end; + /* + Note that we tested that before testing corruption; a recent corrupted + table is not a blocker for the present log record. + */ + } + if (maria_is_crashed(info)) + { + /** @todo what should we do? how to continue recovery? */ + tprint(tracef, "Table is crashed, can't apply log records to it\n"); + goto end; } /* don't log any records for this work */ _ma_tmp_disable_logging_for_table(info, FALSE); @@ -1276,8 +1284,6 @@ static int new_table(uint16 sid, const char *name, */ info->s->lsn_of_file_id= lsn_of_file_id; all_tables[sid].info= info; - all_tables[sid].org_kfile= org_kfile; - all_tables[sid].org_dfile= org_dfile; /* We don't set info->s->id, it would be useless (no logging in REDO phase); if you change that, know that some records in REDO phase call @@ -1588,10 +1594,17 @@ prototype_redo_exec_hook(UNDO_ROW_INSERT) MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); MARIA_SHARE *share; + set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); if (info == NULL) + { + /* + Note that we set undo_lsn anyway. So that if the transaction is later + rolled back, this UNDO is tried for execution and we get an error (as it + would then be abnormal that info==NULL). + */ return 0; + } share= info->s; - set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) { tprint(tracef, " state has LSN (%lu,0x%lx) older than record, updating" @@ -1625,10 +1638,10 @@ prototype_redo_exec_hook(UNDO_ROW_DELETE) MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); MARIA_SHARE *share; + set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); if (info == NULL) return 0; share= info->s; - set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) { tprint(tracef, " state older than record\n"); @@ -1661,10 +1674,11 @@ prototype_redo_exec_hook(UNDO_ROW_UPDATE) { MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); MARIA_SHARE *share; + + set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); if (info == NULL) return 0; share= info->s; - set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) { if (share->calc_checksum) @@ -1692,10 +1706,11 @@ prototype_redo_exec_hook(UNDO_KEY_INSERT) { MARIA_HA *info; MARIA_SHARE *share; + + set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); if (!(info= get_MARIA_HA_from_UNDO_record(rec))) return 0; share= info->s; - set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) { const uchar *ptr= rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE; @@ -1746,9 +1761,10 @@ prototype_redo_exec_hook(UNDO_KEY_INSERT) prototype_redo_exec_hook(UNDO_KEY_DELETE) { MARIA_HA *info; + + set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); if (!(info= get_MARIA_HA_from_UNDO_record(rec))) return 0; - set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); _ma_unpin_all_pages(info, rec->lsn); return 0; } @@ -1758,10 +1774,11 @@ prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT) { MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); MARIA_SHARE *share; + + set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); if (info == NULL) return 0; share= info->s; - set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) { uint key_nr; @@ -1816,15 +1833,15 @@ prototype_redo_exec_hook(CLR_END) uchar *logpos; DBUG_ENTER("exec_REDO_LOGREC_CLR_END"); - if (info == NULL) - DBUG_RETURN(0); - share= info->s; previous_undo_lsn= lsn_korr(rec->header); undone_record_type= clr_type_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE); log_desc= &log_record_type_descriptor[undone_record_type]; set_undo_lsn_for_active_trans(rec->short_trid, previous_undo_lsn); + if (info == NULL) + DBUG_RETURN(0); + share= info->s; tprint(tracef, " CLR_END was about %s, undo_lsn now LSN (%lu,0x%lx)\n", log_desc->name, LSN_IN_PARTS(previous_undo_lsn)); @@ -2558,7 +2575,7 @@ static void prepare_table_for_close(MARIA_HA *info, TRANSLOG_ADDRESS horizon) share->state.is_of_horizon= horizon; _ma_state_info_write_sub(share->kfile.file, &share->state, 1); } - _ma_reenable_logging_for_table(share); + _ma_reenable_logging_for_table(info); info->trn= NULL; /* safety */ } @@ -2624,12 +2641,19 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const DBUG_ASSERT(info->s->last_version != 0); if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0) { + /* + 64-bit key is formed like this: + Most significant byte: 0 + Next byte: 0 if data page, 1 if index page + Next 2 bytes: table's short id + Next 4 bytes: page number + */ uint64 file_and_page_id= - (((uint64) (index_page_redo_entry ? all_tables[sid].org_kfile : - all_tables[sid].org_dfile)) << 32) | page; + (((uint64)((index_page_redo_entry << 16) | sid)) << 32) | page; struct st_dirty_page *dirty_page= (struct st_dirty_page *) hash_search(&all_dirty_pages, (uchar *)&file_and_page_id, sizeof(file_and_page_id)); + DBUG_PRINT("info", ("in dirty pages list: %d", dirty_page != NULL)); if ((dirty_page == NULL) || cmp_translog_addr(rec->lsn, dirty_page->rec_lsn) < 0) { @@ -2736,7 +2760,8 @@ static LSN parse_checkpoint_record(LSN lsn) /* how much brain juice and discussions there was to come to writing this - line + line. It may make start_address slightly decrease (only by the time it + takes to write one or a few rows, roughly). */ set_if_smaller(start_address, minimum_rec_lsn_of_active_transactions); @@ -2769,22 +2794,17 @@ static LSN parse_checkpoint_record(LSN lsn) for (i= 0; i< nb_tables; i++) { char name[FN_REFLEN]; - File kfile, dfile; LSN first_log_write_lsn; uint name_len; uint16 sid= uint2korr(ptr); ptr+= 2; DBUG_ASSERT(sid > 0); - kfile= uint4korr(ptr); - ptr+= 4; - dfile= uint4korr(ptr); - ptr+= 4; first_log_write_lsn= lsn_korr(ptr); ptr+= LSN_STORE_SIZE; name_len= strlen((char *)ptr) + 1; strmake(name, (char *)ptr, sizeof(name)-1); ptr+= name_len; - if (new_table(sid, name, kfile, dfile, first_log_write_lsn)) + if (new_table(sid, name, first_log_write_lsn)) return LSN_ERROR; } @@ -2807,15 +2827,18 @@ static LSN parse_checkpoint_record(LSN lsn) minimum_rec_lsn_of_dirty_pages= LSN_MAX; for (i= 0; i < nb_dirty_pages ; i++) { - pgcache_page_no_t pageid; + pgcache_page_no_t page_id; LSN rec_lsn; - File fileid= uint4korr(ptr); - ptr+= 4; - pageid= uint4korr(ptr); + uint16 table_id= uint2korr(ptr); + ptr+= 2; + uint32 is_index= ptr[0]; + ptr++; + page_id= uint4korr(ptr); ptr+= 4; rec_lsn= lsn_korr(ptr); ptr+= LSN_STORE_SIZE; - if (new_page(fileid, pageid, rec_lsn, next_dirty_page_in_pool++)) + if (new_page((is_index << 16) | table_id, + page_id, rec_lsn, next_dirty_page_in_pool++)) return LSN_ERROR; set_if_smaller(minimum_rec_lsn_of_dirty_pages, rec_lsn); } @@ -2829,11 +2852,11 @@ static LSN parse_checkpoint_record(LSN lsn) eprint(tracef, "checkpoint record corrupted\n"); return LSN_ERROR; } - set_if_smaller(start_address, minimum_rec_lsn_of_dirty_pages); /* + start_address is now from where the dirty pages list can be ignored. Find LSN higher or equal to this TRANSLOG_ADDRESS, suitable for - translog_read_record() functions + translog_read_record() functions. */ checkpoint_start= translog_next_LSN(start_address, LSN_IMPOSSIBLE); if (checkpoint_start == LSN_IMPOSSIBLE) @@ -2844,10 +2867,16 @@ static LSN parse_checkpoint_record(LSN lsn) */ return LSN_ERROR; } - return checkpoint_start; + /* now, where the REDO phase should start reading log: */ + set_if_smaller(start_address, minimum_rec_lsn_of_dirty_pages); + DBUG_PRINT("info", + ("checkpoint_start: (%lu,0x%lx) start_address: (%lu,0x%lx)", + LSN_IN_PARTS(checkpoint_start), LSN_IN_PARTS(start_address))); + return start_address; } -static int new_page(File fileid, pgcache_page_no_t pageid, LSN rec_lsn, + +static int new_page(uint32 fileid, pgcache_page_no_t pageid, LSN rec_lsn, struct st_dirty_page *dirty_page) { /* serves as hash key */ @@ -2953,6 +2982,7 @@ static my_bool close_one_table(const char *name, TRANSLOG_ADDRESS addr) @note for example in the REDO phase we disable logging but that does not make the log incomplete. */ + void _ma_tmp_disable_logging_for_table(MARIA_HA *info, my_bool log_incomplete) { @@ -2965,15 +2995,52 @@ void _ma_tmp_disable_logging_for_table(MARIA_HA *info, log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); translog_write_record(&lsn, LOGREC_INCOMPLETE_LOG, - info->trn, info, sizeof(log_data), + &dummy_transaction_object, info, sizeof(log_data), TRANSLOG_INTERNAL_PARTS + 1, log_array, log_data, NULL); } /* if we disabled before writing the record, record wouldn't reach log */ share->now_transactional= FALSE; + /* + Some code in ma_blockrec.c assumes a trn. + info->trn in some cases can be not NULL and not dummy_transaction_object + when arriving here, but overwriting it does not leak as it is still + remembered in THD_TRN. + */ + info->trn= &dummy_transaction_object; share->page_type= PAGECACHE_PLAIN_PAGE; + /* Functions below will pick up now_transactional and change callbacks */ + set_data_pagecache_callbacks(&info->dfile, share); + set_index_pagecache_callbacks(&share->kfile, share); + _ma_bitmap_set_pagecache_callbacks(&share->bitmap.file, share); +} + + +/** + Re-enables logging for a table which had it temporarily disabled. + + @param info table +*/ + +void _ma_reenable_logging_for_table(MARIA_HA *info) +{ + MARIA_SHARE *share= info->s; + if ((share->now_transactional= share->base.born_transactional)) + { + /* + The change below does NOT affect pages already in the page cache, so you + should have flushed them out already, or write a pagecache function to + change their type. + */ + share->page_type= PAGECACHE_LSN_PAGE; + info->trn= NULL; /* safety */ + } + set_data_pagecache_callbacks(&info->dfile, share); + set_index_pagecache_callbacks(&share->kfile, share); + _ma_bitmap_set_pagecache_callbacks(&share->bitmap.file, share); } + static void print_redo_phase_progress(TRANSLOG_ADDRESS addr) { static int end_logno= FILENO_IMPOSSIBLE, end_offset, percentage_printed= 0; |