diff options
Diffstat (limited to 'storage/maria/ma_recovery.c')
-rw-r--r-- | storage/maria/ma_recovery.c | 3159 |
1 files changed, 3159 insertions, 0 deletions
diff --git a/storage/maria/ma_recovery.c b/storage/maria/ma_recovery.c new file mode 100644 index 00000000000..bc594127296 --- /dev/null +++ b/storage/maria/ma_recovery.c @@ -0,0 +1,3159 @@ +/* Copyright (C) 2006, 2007 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +/* + WL#3072 Maria recovery + First version written by Guilhem Bichot on 2006-04-27. +*/ + +/* Here is the implementation of this module */ + +#include "maria_def.h" +#include "ma_recovery.h" +#include "ma_blockrec.h" +#include "ma_checkpoint.h" +#include "trnman.h" +#include "ma_key_recover.h" + +struct st_trn_for_recovery /* used only in the REDO phase */ +{ + LSN group_start_lsn, undo_lsn, first_undo_lsn; + TrID long_trid; +}; +struct st_dirty_page /* used only in the REDO phase */ +{ + uint64 file_and_page_id; + LSN rec_lsn; +}; +struct st_table_for_recovery /* used in the REDO and UNDO phase */ +{ + MARIA_HA *info; + File org_kfile, org_dfile; /**< OS descriptors when Checkpoint saw table */ +}; +/* Variables used by all functions of this module. Ok as single-threaded */ +static struct st_trn_for_recovery *all_active_trans; +static struct st_table_for_recovery *all_tables; +static HASH all_dirty_pages; +static struct st_dirty_page *dirty_pages_pool; +static LSN current_group_end_lsn, + checkpoint_start= LSN_IMPOSSIBLE; +#ifndef DBUG_OFF +/** Current group of REDOs is about this table and only this one */ +static MARIA_HA *current_group_table; +#endif +static TrID max_long_trid= 0; /**< max long trid seen by REDO phase */ +static FILE *tracef; /**< trace file for debugging */ +static my_bool skip_DDLs; /**< if REDO phase should skip DDL records */ +/** @brief to avoid writing a checkpoint if recovery did nothing. */ +static my_bool checkpoint_useful; +static my_bool procent_printed; +static ulonglong now; /**< for tracking execution time of phases */ +uint warnings; /**< count of warnings */ + +#define prototype_redo_exec_hook(R) \ + static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec) + +#define prototype_redo_exec_hook_dummy(R) \ + static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec \ + __attribute__ ((unused))) + +#define prototype_undo_exec_hook(R) \ + static int exec_UNDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec, TRN *trn) + +prototype_redo_exec_hook(LONG_TRANSACTION_ID); +prototype_redo_exec_hook_dummy(CHECKPOINT); +prototype_redo_exec_hook(REDO_CREATE_TABLE); +prototype_redo_exec_hook(REDO_RENAME_TABLE); +prototype_redo_exec_hook(REDO_REPAIR_TABLE); +prototype_redo_exec_hook(REDO_DROP_TABLE); +prototype_redo_exec_hook(FILE_ID); +prototype_redo_exec_hook(INCOMPLETE_LOG); +prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP); +prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD); +prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL); +prototype_redo_exec_hook(REDO_INSERT_ROW_BLOBS); +prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD); +prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL); +prototype_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL); +prototype_redo_exec_hook(REDO_FREE_BLOCKS); +prototype_redo_exec_hook(REDO_DELETE_ALL); +prototype_redo_exec_hook(REDO_INDEX); +prototype_redo_exec_hook(REDO_INDEX_NEW_PAGE); +prototype_redo_exec_hook(REDO_INDEX_FREE_PAGE); +prototype_redo_exec_hook(UNDO_ROW_INSERT); +prototype_redo_exec_hook(UNDO_ROW_DELETE); +prototype_redo_exec_hook(UNDO_ROW_UPDATE); +prototype_redo_exec_hook(UNDO_KEY_INSERT); +prototype_redo_exec_hook(UNDO_KEY_DELETE); +prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT); +prototype_redo_exec_hook(COMMIT); +prototype_redo_exec_hook(CLR_END); +prototype_undo_exec_hook(UNDO_ROW_INSERT); +prototype_undo_exec_hook(UNDO_ROW_DELETE); +prototype_undo_exec_hook(UNDO_ROW_UPDATE); +prototype_undo_exec_hook(UNDO_KEY_INSERT); +prototype_undo_exec_hook(UNDO_KEY_DELETE); +prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT); + +static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply); +static uint end_of_redo_phase(my_bool prepare_for_undo_phase); +static int run_undo_phase(uint uncommitted); +static void display_record_position(const LOG_DESC *log_desc, + const TRANSLOG_HEADER_BUFFER *rec, + uint number); +static int display_and_apply_record(const LOG_DESC *log_desc, + const TRANSLOG_HEADER_BUFFER *rec); +static MARIA_HA *get_MARIA_HA_from_REDO_record(const + TRANSLOG_HEADER_BUFFER *rec); +static MARIA_HA *get_MARIA_HA_from_UNDO_record(const + TRANSLOG_HEADER_BUFFER *rec); +static void prepare_table_for_close(MARIA_HA *info, TRANSLOG_ADDRESS horizon); +static LSN parse_checkpoint_record(LSN lsn); +static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn, + LSN first_undo_lsn); +static int new_table(uint16 sid, const char *name, + File org_kfile, File org_dfile, + LSN lsn_of_file_id); +static int new_page(File fileid, pgcache_page_no_t pageid, LSN rec_lsn, + struct st_dirty_page *dirty_page); +static int close_all_tables(void); +static my_bool close_one_table(const char *name, TRANSLOG_ADDRESS addr); +static void print_redo_phase_progress(TRANSLOG_ADDRESS addr); + +/** @brief global [out] buffer for translog_read_record(); never shrinks */ +static struct +{ + uchar *str; + size_t length; +} log_record_buffer; +static void enlarge_buffer(const TRANSLOG_HEADER_BUFFER *rec) +{ + if (log_record_buffer.length < rec->record_length) + { + log_record_buffer.length= rec->record_length; + log_record_buffer.str= my_realloc(log_record_buffer.str, + rec->record_length, + MYF(MY_WME | MY_ALLOW_ZERO_PTR)); + } +} +/** @brief Tells what kind of progress message was printed to the error log */ +static enum recovery_message_type +{ + REC_MSG_NONE= 0, REC_MSG_REDO, REC_MSG_UNDO, REC_MSG_FLUSH +} recovery_message_printed; +/** @brief Prints to a trace file if it is not NULL */ +void tprint(FILE *trace_file, const char *format, ...) + ATTRIBUTE_FORMAT(printf, 2, 3); +void tprint(FILE *trace_file __attribute__ ((unused)), + const char *format __attribute__ ((unused)), ...) +{ + va_list args; + va_start(args, format); + DBUG_PRINT("info", ("%s", format)); + if (trace_file != NULL) + { + if (procent_printed) + { + procent_printed= 0; + fputc('\n', trace_file ? trace_file : stderr); + } + vfprintf(trace_file, format, args); + } + va_end(args); +} + +void eprint(FILE *trace_file, const char *format, ...) + ATTRIBUTE_FORMAT(printf, 2, 3); + +void eprint(FILE *trace_file __attribute__ ((unused)), + const char *format __attribute__ ((unused)), ...) +{ + va_list args; + va_start(args, format); + DBUG_PRINT("error", ("%s", format)); + if (procent_printed) + { + /* In silent mode, print on another line than the 0% 10% 20% line */ + procent_printed= 0; + fputc('\n', trace_file ? trace_file : stderr); + } + vfprintf(trace_file ? trace_file : stderr, format, args); + va_end(args); +} + + +#define ALERT_USER() DBUG_ASSERT(0) + +static void print_preamble() +{ + ma_message_no_user(ME_JUST_INFO, "starting recovery"); +} + + +/** + @brief Recovers from the last checkpoint. + + Runs the REDO phase using special structures, then sets up the playground + of runtime: recreates transactions inside trnman, open tables with their + two-byte-id mapping; takes a checkpoint and runs the UNDO phase. Closes all + tables. + + @return Operation status + @retval 0 OK + @retval !=0 Error +*/ + +int maria_recover(void) +{ + int res= 1; + FILE *trace_file; + uint warnings_count; + DBUG_ENTER("maria_recover"); + + DBUG_ASSERT(!maria_in_recovery); + maria_in_recovery= TRUE; + +#ifdef EXTRA_DEBUG + trace_file= fopen("maria_recovery.trace", "a+"); +#else + trace_file= NULL; /* no trace file for being fast */ +#endif + tprint(trace_file, "TRACE of the last MARIA recovery from mysqld\n"); + DBUG_ASSERT(maria_pagecache->inited); + res= maria_apply_log(LSN_IMPOSSIBLE, MARIA_LOG_APPLY, trace_file, + TRUE, TRUE, TRUE, &warnings_count); + if (!res) + { + if (warnings_count == 0) + tprint(trace_file, "SUCCESS\n"); + else + { + tprint(trace_file, "DOUBTFUL (%u warnings, check previous output)\n", + warnings_count); + /* + We asked for execution of UNDOs, and skipped DDLs, so shouldn't get + any warnings. + */ + DBUG_ASSERT(0); + } + } + if (trace_file) + fclose(trace_file); + maria_in_recovery= FALSE; + DBUG_RETURN(res); +} + + +/** + @brief Displays and/or applies the log + + @param from_lsn LSN from which log reading/applying should start; + LSN_IMPOSSIBLE means "use last checkpoint" + @param apply how log records should be applied or not + @param trace_file trace file where progress/debug messages will go + @param skip_DDLs_arg Should DDL records (CREATE/RENAME/DROP/REPAIR) + be skipped by the REDO phase or not + @param take_checkpoints Should we take checkpoints or not. + @param[out] warnings_count Count of warnings will be put there + + @todo This trace_file thing is primitive; soon we will make it similar to + ma_check_print_warning() etc, and a successful recovery does not need to + create a trace file. But for debugging now it is useful. + + @return Operation status + @retval 0 OK + @retval !=0 Error +*/ + +int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply, + FILE *trace_file, + my_bool should_run_undo_phase, my_bool skip_DDLs_arg, + my_bool take_checkpoints, uint *warnings_count) +{ + int error= 0; + uint uncommitted_trans; + ulonglong old_now; + DBUG_ENTER("maria_apply_log"); + + DBUG_ASSERT(apply == MARIA_LOG_APPLY || !should_run_undo_phase); + DBUG_ASSERT(!maria_multi_threaded); + warnings= 0; + /* checkpoints can happen only if TRNs have been built */ + DBUG_ASSERT(should_run_undo_phase || !take_checkpoints); + all_active_trans= (struct st_trn_for_recovery *) + my_malloc((SHORT_TRID_MAX + 1) * sizeof(struct st_trn_for_recovery), + MYF(MY_ZEROFILL)); + all_tables= (struct st_table_for_recovery *) + my_malloc((SHARE_ID_MAX + 1) * sizeof(struct st_table_for_recovery), + MYF(MY_ZEROFILL)); + if (!all_active_trans || !all_tables) + goto err; + + if (take_checkpoints && ma_checkpoint_init(0)) + goto err; + + recovery_message_printed= REC_MSG_NONE; + tracef= trace_file; + skip_DDLs= skip_DDLs_arg; + + if (from_lsn == LSN_IMPOSSIBLE) + { + if (last_checkpoint_lsn == LSN_IMPOSSIBLE) + { + from_lsn= translog_first_theoretical_lsn(); + /* + as far as we have not yet any checkpoint then the very first + log file should be present. + */ + if (unlikely((from_lsn == LSN_IMPOSSIBLE) || + (from_lsn == LSN_ERROR))) + goto err; + } + else + { + from_lsn= parse_checkpoint_record(last_checkpoint_lsn); + if (from_lsn == LSN_ERROR) + goto err; + } + } + + now= my_getsystime(); + if (run_redo_phase(from_lsn, apply)) + { + ma_message_no_user(0, "Redo phase failed"); + goto err; + } + + if ((uncommitted_trans= + end_of_redo_phase(should_run_undo_phase)) == (uint)-1) + { + ma_message_no_user(0, "End of redo phase failed"); + goto err; + } + + old_now= now; + now= my_getsystime(); + if (recovery_message_printed == REC_MSG_REDO) + { + float phase_took= (now - old_now)/10000000.0; + /* + Detailed progress info goes to stderr, because ma_message_no_user() + cannot put several messages on one line. + */ + procent_printed= 1; + fprintf(stderr, " (%.1f seconds); ", phase_took); + } + + /** + REDO phase does not fill blocks' rec_lsn, so a checkpoint now would be + wrong: if a future recovery used it, the REDO phase would always + start from the checkpoint and never from before, wrongly skipping REDOs + (tested). Another problem is that the REDO phase uses + PAGECACHE_PLAIN_PAGE, while Checkpoint only collects PAGECACHE_LSN_PAGE. + + @todo fix this. pagecache_write() now can have a rec_lsn argument. And we + could make a function which goes through pages at end of REDO phase and + changes their type. + */ +#ifdef FIX_AND_ENABLE_LATER + if (take_checkpoints && checkpoint_useful) + { + /* + We take a checkpoint as it can save future recovery work if we crash + during the UNDO phase. But we don't flush pages, as UNDOs will change + them again probably. + */ + if (ma_checkpoint_execute(CHECKPOINT_INDIRECT, FALSE)) + goto err; + } +#endif + + if (should_run_undo_phase) + { + if (run_undo_phase(uncommitted_trans)) + { + ma_message_no_user(0, "Undo phase failed"); + goto err; + } + } + else if (uncommitted_trans > 0) + { + tprint(tracef, "***WARNING: %u uncommitted transactions; some tables may" + " be left inconsistent!***\n", uncommitted_trans); + warnings++; + } + + old_now= now; + now= my_getsystime(); + if (recovery_message_printed == REC_MSG_UNDO) + { + float phase_took= (now - old_now)/10000000.0; + procent_printed= 1; + fprintf(stderr, " (%.1f seconds); ", phase_took); + } + + /* + we don't use maria_panic() because it would maria_end(), and Recovery does + not want that (we want to keep some modules initialized for runtime). + */ + if (close_all_tables()) + { + ma_message_no_user(0, "closing of tables failed"); + goto err; + } + + old_now= now; + now= my_getsystime(); + if (recovery_message_printed == REC_MSG_FLUSH) + { + float phase_took= (now - old_now)/10000000.0; + procent_printed= 1; + fprintf(stderr, " (%.1f seconds); ", phase_took); + } + + if (take_checkpoints && checkpoint_useful) + { + /* No dirty pages, all tables are closed, no active transactions, save: */ + if (ma_checkpoint_execute(CHECKPOINT_FULL, FALSE)) + goto err; + } + + goto end; +err: + error= 1; + tprint(tracef, "\nRecovery of tables with transaction logs FAILED\n"); +end: + hash_free(&all_dirty_pages); + bzero(&all_dirty_pages, sizeof(all_dirty_pages)); + my_free(dirty_pages_pool, MYF(MY_ALLOW_ZERO_PTR)); + dirty_pages_pool= NULL; + my_free(all_tables, MYF(MY_ALLOW_ZERO_PTR)); + all_tables= NULL; + my_free(all_active_trans, MYF(MY_ALLOW_ZERO_PTR)); + all_active_trans= NULL; + my_free(log_record_buffer.str, MYF(MY_ALLOW_ZERO_PTR)); + log_record_buffer.str= NULL; + log_record_buffer.length= 0; + ma_checkpoint_end(); + *warnings_count= warnings; + if (recovery_message_printed != REC_MSG_NONE) + { + fprintf(stderr, "\n"); + if (!error) + ma_message_no_user(ME_JUST_INFO, "recovery done"); + } + if (error) + my_message(HA_ERR_INITIALIZATION, + "Maria recovery failed. Please run maria_chk -r on all maria " + "tables and delete all maria_log.######## files", MYF(0)); + procent_printed= 0; + /* we don't cleanly close tables if we hit some error (may corrupt them) */ + DBUG_RETURN(error); +} + + +/* very basic info about the record's header */ +static void display_record_position(const LOG_DESC *log_desc, + const TRANSLOG_HEADER_BUFFER *rec, + uint number) +{ + /* + if number==0, we're going over records which we had already seen and which + form a group, so we indent below the group's end record + */ + tprint(tracef, + "%sRec#%u LSN (%lu,0x%lx) short_trid %u %s(num_type:%u) len %lu\n", + number ? "" : " ", number, LSN_IN_PARTS(rec->lsn), + rec->short_trid, log_desc->name, rec->type, + (ulong)rec->record_length); +} + + +static int display_and_apply_record(const LOG_DESC *log_desc, + const TRANSLOG_HEADER_BUFFER *rec) +{ + int error; + if (log_desc->record_execute_in_redo_phase == NULL) + { + /* die on all not-yet-handled records :) */ + DBUG_ASSERT("one more hook" == "to write"); + return 1; + } + if ((error= (*log_desc->record_execute_in_redo_phase)(rec))) + eprint(tracef, "Got error %d when executing record\n", my_errno); + return error; +} + + +prototype_redo_exec_hook(LONG_TRANSACTION_ID) +{ + uint16 sid= rec->short_trid; + TrID long_trid= all_active_trans[sid].long_trid; + /* + Any incomplete group should be of an old crash which already had a + recovery and thus has logged INCOMPLETE_GROUP which we must have seen. + */ + DBUG_ASSERT(all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE); + if (long_trid != 0) + { + LSN ulsn= all_active_trans[sid].undo_lsn; + /* + If the first record of that transaction is after 'rec', it's probably + because that transaction was found in the checkpoint record, and then + it's ok, we can forget about that transaction (we'll meet it later + again in the REDO phase) and replace it with the one in 'rec'. + */ + if ((ulsn != LSN_IMPOSSIBLE) && + (cmp_translog_addr(ulsn, rec->lsn) < 0)) + { + char llbuf[22]; + llstr(long_trid, llbuf); + eprint(tracef, "Found an old transaction long_trid %s short_trid %u" + " with same short id as this new transaction, and has neither" + " committed nor rollback (undo_lsn: (%lu,0x%lx))\n", + llbuf, sid, LSN_IN_PARTS(ulsn)); + goto err; + } + } + long_trid= uint6korr(rec->header); + new_transaction(sid, long_trid, LSN_IMPOSSIBLE, LSN_IMPOSSIBLE); + goto end; +err: + ALERT_USER(); + return 1; +end: + return 0; +} + + +static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn, + LSN first_undo_lsn) +{ + char llbuf[22]; + all_active_trans[sid].long_trid= long_id; + llstr(long_id, llbuf); + tprint(tracef, "Transaction long_trid %s short_trid %u starts\n", + llbuf, sid); + all_active_trans[sid].undo_lsn= undo_lsn; + all_active_trans[sid].first_undo_lsn= first_undo_lsn; + set_if_bigger(max_long_trid, long_id); +} + + +prototype_redo_exec_hook_dummy(CHECKPOINT) +{ + /* the only checkpoint we care about was found via control file, ignore */ + return 0; +} + + +prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP) +{ + /* abortion was already made */ + return 0; +} + +prototype_redo_exec_hook(INCOMPLETE_LOG) +{ + MARIA_HA *info; + if (skip_DDLs) + { + tprint(tracef, "we skip DDLs\n"); + return 0; + } + if ((info= get_MARIA_HA_from_REDO_record(rec)) == NULL) + { + /* no such table, don't need to warn */ + return 0; + } + /* + Example of what can go wrong when replaying DDLs: + CREATE TABLE t (logged); INSERT INTO t VALUES(1) (logged); + ALTER TABLE t ... which does + CREATE a temporary table #sql... (logged) + INSERT data from t into #sql... (not logged) + RENAME #sql TO t (logged) + Removing tables by hand and replaying the log will leave in the + end an empty table "t": missing records. If after the RENAME an INSERT + into t was done, that row had number 1 in its page, executing the + REDO_INSERT_ROW_HEAD on the recreated empty t will fail (assertion + failure in _ma_apply_redo_insert_row_head_or_tail(): new data page is + created whereas rownr is not 0). + So when the server disables logging for ALTER TABLE or CREATE SELECT, it + logs LOGREC_INCOMPLETE_LOG to warn maria_read_log and then the user. + + Another issue is that replaying of DDLs is not correct enough to work if + there was a crash during a DDL (see comment in execution of + REDO_RENAME_TABLE ). + */ + tprint(tracef, "***WARNING: MySQL server currently logs no records" + " about insertion of data by ALTER TABLE and CREATE SELECT," + " as they are not necessary for recovery;" + " present applying of log records may well not work.***\n"); + warnings++; + return 0; +} + + +prototype_redo_exec_hook(REDO_CREATE_TABLE) +{ + File dfile= -1, kfile= -1; + char *linkname_ptr, filename[FN_REFLEN], *name, *ptr, *data_file_name, + *index_file_name; + uchar *kfile_header; + myf create_flag; + uint flags; + int error= 1, create_mode= O_RDWR | O_TRUNC; + MARIA_HA *info= NULL; + uint kfile_size_before_extension, keystart; + + if (skip_DDLs) + { + tprint(tracef, "we skip DDLs\n"); + return 0; + } + enlarge_buffer(rec); + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + eprint(tracef, "Failed to read record\n"); + goto end; + } + name= (char *)log_record_buffer.str; + /* + TRUNCATE TABLE and REPAIR USE_FRM call maria_create(), so below we can + find a REDO_CREATE_TABLE for a table which we have open, that's why we + need to look for any open instances and close them first. + */ + if (close_one_table(name, rec->lsn)) + { + eprint(tracef, "Table '%s' got error %d on close\n", name, my_errno); + ALERT_USER(); + goto end; + } + /* we try hard to get create_rename_lsn, to avoid mistakes if possible */ + info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR); + if (info) + { + MARIA_SHARE *share= info->s; + /* check that we're not already using it */ + if (share->reopen != 1) + { + eprint(tracef, "Table '%s is already open (reopen=%u)\n", + name, share->reopen); + ALERT_USER(); + goto end; + } + DBUG_ASSERT(share->now_transactional == share->base.born_transactional); + if (!share->base.born_transactional) + { + /* + could be that transactional table was later dropped, and a non-trans + one was renamed to its name, thus create_rename_lsn is 0 and should + not be trusted. + */ + tprint(tracef, "Table '%s' is not transactional, ignoring creation\n", + name); + ALERT_USER(); + error= 0; + goto end; + } + if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0) + { + tprint(tracef, "Table '%s' has create_rename_lsn (%lu,0x%lx) more " + "recent than record, ignoring creation", + name, LSN_IN_PARTS(share->state.create_rename_lsn)); + error= 0; + goto end; + } + if (maria_is_crashed(info)) + { + eprint(tracef, "Table '%s' is crashed, can't recreate it\n", name); + ALERT_USER(); + goto end; + } + maria_close(info); + info= NULL; + } + else /* one or two files absent, or header corrupted... */ + tprint(tracef, "Table '%s' can't be opened, probably does not exist\n", + name); + /* if does not exist, or is older, overwrite it */ + ptr= name + strlen(name) + 1; + if ((flags= ptr[0] ? HA_DONT_TOUCH_DATA : 0)) + tprint(tracef, ", we will only touch index file"); + ptr++; + kfile_size_before_extension= uint2korr(ptr); + ptr+= 2; + keystart= uint2korr(ptr); + ptr+= 2; + kfile_header= (uchar *)ptr; + ptr+= kfile_size_before_extension; + /* set create_rename_lsn (for maria_read_log to be idempotent) */ + lsn_store(kfile_header + sizeof(info->s->state.header) + 2, rec->lsn); + /* we also set is_of_horizon, like maria_create() does */ + lsn_store(kfile_header + sizeof(info->s->state.header) + 2 + LSN_STORE_SIZE, + rec->lsn); + data_file_name= ptr; + ptr+= strlen(data_file_name) + 1; + index_file_name= ptr; + ptr+= strlen(index_file_name) + 1; + /** @todo handle symlinks */ + if (data_file_name[0] || index_file_name[0]) + { + eprint(tracef, "Table '%s' DATA|INDEX DIRECTORY clauses are not handled\n", + name); + goto end; + } + fn_format(filename, name, "", MARIA_NAME_IEXT, + (MY_UNPACK_FILENAME | + (flags & HA_DONT_TOUCH_DATA) ? MY_RETURN_REAL_PATH : 0) | + MY_APPEND_EXT); + linkname_ptr= NULL; + create_flag= MY_DELETE_OLD; + tprint(tracef, "Table '%s' creating as '%s'", name, filename); + if ((kfile= my_create_with_symlink(linkname_ptr, filename, 0, create_mode, + MYF(MY_WME|create_flag))) < 0) + { + eprint(tracef, "Failed to create index file\n"); + goto end; + } + if (my_pwrite(kfile, kfile_header, + kfile_size_before_extension, 0, MYF(MY_NABP|MY_WME)) || + my_chsize(kfile, keystart, 0, MYF(MY_WME))) + { + eprint(tracef, "Failed to write to index file\n"); + goto end; + } + if (!(flags & HA_DONT_TOUCH_DATA)) + { + fn_format(filename,name,"", MARIA_NAME_DEXT, + MY_UNPACK_FILENAME | MY_APPEND_EXT); + linkname_ptr= NULL; + create_flag=MY_DELETE_OLD; + if (((dfile= + my_create_with_symlink(linkname_ptr, filename, 0, create_mode, + MYF(MY_WME | create_flag))) < 0) || + my_close(dfile, MYF(MY_WME))) + { + eprint(tracef, "Failed to create data file\n"); + goto end; + } + /* + we now have an empty data file. To be able to + _ma_initialize_data_file() we need some pieces of the share to be + correctly filled. So we just open the table (fortunately, an empty + data file does not preclude this). + */ + if (((info= maria_open(name, O_RDONLY, 0)) == NULL) || + _ma_initialize_data_file(info->s, info->dfile.file)) + { + eprint(tracef, "Failed to open new table or write to data file\n"); + goto end; + } + } + error= 0; +end: + tprint(tracef, "\n"); + if (kfile >= 0) + error|= my_close(kfile, MYF(MY_WME)); + if (info != NULL) + error|= maria_close(info); + return error; +} + + +prototype_redo_exec_hook(REDO_RENAME_TABLE) +{ + char *old_name, *new_name; + int error= 1; + MARIA_HA *info= NULL; + if (skip_DDLs) + { + tprint(tracef, "we skip DDLs\n"); + return 0; + } + enlarge_buffer(rec); + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + eprint(tracef, "Failed to read record\n"); + goto end; + } + old_name= (char *)log_record_buffer.str; + new_name= old_name + strlen(old_name) + 1; + tprint(tracef, "Table '%s' to rename to '%s'; old-name table ", old_name, + new_name); + /* + Here is why we skip CREATE/DROP/RENAME when doing a recovery from + ha_maria (whereas we do when called from maria_read_log). Consider: + CREATE TABLE t; + RENAME TABLE t to u; + DROP TABLE u; + RENAME TABLE v to u; # crash between index rename and data rename. + And do a Recovery (not removing tables beforehand). + Recovery replays CREATE, then RENAME: the maria_open("t") works, + maria_open("u") does not (no data file) so table "u" is considered + inexistent and so maria_rename() is done which overwrites u's index file, + which is lost. Ok, the data file (v.MAD) is still available, but only a + REPAIR USE_FRM can rebuild the index, which is unsafe and downtime. + So it is preferrable to not execute RENAME, and leave the "mess" of files, + rather than possibly destroy a file. DBA will manually rename files. + A safe recovery method would probably require checking the existence of + the index file and of the data file separately (not via maria_open()), and + maybe also to store a create_rename_lsn in the data file too + For now, all we risk is to leave the mess (half-renamed files) left by the + crash. We however sync files and directories at each file rename. The SQL + layer is anyway not crash-safe for DDLs (except the repartioning-related + ones). + We replay DDLs in maria_read_log to be able to recreate tables from + scratch. It means that "maria_read_log -a" should not be used on a + database which just crashed during a DDL. And also ALTER TABLE does not + log insertions of records into the temporary table, so replaying may + fail (grep for INCOMPLETE_LOG in files). + */ + info= maria_open(old_name, O_RDONLY, HA_OPEN_FOR_REPAIR); + if (info) + { + MARIA_SHARE *share= info->s; + if (!share->base.born_transactional) + { + tprint(tracef, ", is not transactional, ignoring renaming\n"); + ALERT_USER(); + error= 0; + goto end; + } + if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0) + { + tprint(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than" + " record, ignoring renaming", + LSN_IN_PARTS(share->state.create_rename_lsn)); + error= 0; + goto end; + } + if (maria_is_crashed(info)) + { + tprint(tracef, ", is crashed, can't rename it"); + ALERT_USER(); + goto end; + } + if (close_one_table(info->s->open_file_name, rec->lsn) || + maria_close(info)) + goto end; + info= NULL; + tprint(tracef, ", is ok for renaming; new-name table "); + } + else /* one or two files absent, or header corrupted... */ + { + tprint(tracef, ", can't be opened, probably does not exist"); + error= 0; + goto end; + } + /* + We must also check the create_rename_lsn of the 'new_name' table if it + exists: otherwise we may, with our rename which overwrites, destroy + another table. For example: + CREATE TABLE t; + RENAME t to u; + DROP TABLE u; + RENAME v to u; # v is an old table, its creation/insertions not in log + And start executing the log (without removing tables beforehand): creates + t, renames it to u (if not testing create_rename_lsn) thus overwriting + old-named v, drops u, and we are stuck, we have lost data. + */ + info= maria_open(new_name, O_RDONLY, HA_OPEN_FOR_REPAIR); + if (info) + { + MARIA_SHARE *share= info->s; + /* We should not have open instances on this table. */ + if (share->reopen != 1) + { + tprint(tracef, ", is already open (reopen=%u)\n", share->reopen); + ALERT_USER(); + goto end; + } + if (!share->base.born_transactional) + { + tprint(tracef, ", is not transactional, ignoring renaming\n"); + ALERT_USER(); + goto drop; + } + if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0) + { + tprint(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than" + " record, ignoring renaming", + LSN_IN_PARTS(share->state.create_rename_lsn)); + /* + We have to drop the old_name table. Consider: + CREATE TABLE t; + CREATE TABLE v; + RENAME TABLE t to u; + DROP TABLE u; + RENAME TABLE v to u; + and apply the log without removing tables beforehand. t will be + created, v too; in REDO_RENAME u will be more recent, but we still + have to drop t otherwise it stays. + */ + goto drop; + } + if (maria_is_crashed(info)) + { + tprint(tracef, ", is crashed, can't rename it"); + ALERT_USER(); + goto end; + } + if (maria_close(info)) + goto end; + info= NULL; + /* abnormal situation */ + tprint(tracef, ", exists but is older than record, can't rename it"); + goto end; + } + else /* one or two files absent, or header corrupted... */ + tprint(tracef, ", can't be opened, probably does not exist"); + tprint(tracef, ", renaming '%s'", old_name); + if (maria_rename(old_name, new_name)) + { + eprint(tracef, "Failed to rename table\n"); + goto end; + } + info= maria_open(new_name, O_RDONLY, 0); + if (info == NULL) + { + eprint(tracef, "Failed to open renamed table\n"); + goto end; + } + if (_ma_update_create_rename_lsn(info->s, rec->lsn, TRUE)) + goto end; + if (maria_close(info)) + goto end; + info= NULL; + error= 0; + goto end; +drop: + tprint(tracef, ", only dropping '%s'", old_name); + if (maria_delete_table(old_name)) + { + eprint(tracef, "Failed to drop table\n"); + goto end; + } + error= 0; + goto end; +end: + tprint(tracef, "\n"); + if (info != NULL) + error|= maria_close(info); + return error; +} + + +/* + The record may come from REPAIR, ALTER TABLE ENABLE KEYS, OPTIMIZE. +*/ +prototype_redo_exec_hook(REDO_REPAIR_TABLE) +{ + int error= 1; + MARIA_HA *info; + HA_CHECK param; + char *name; + uint quick_repair; + DBUG_ENTER("exec_REDO_LOGREC_REDO_REPAIR_TABLE"); + + if (skip_DDLs) + { + /* + REPAIR is not exactly a DDL, but it manipulates files without logging + insertions into them. + */ + tprint(tracef, "we skip DDLs\n"); + DBUG_RETURN(0); + } + if ((info= get_MARIA_HA_from_REDO_record(rec)) == NULL) + DBUG_RETURN(0); + + /* + Otherwise, the mapping is newer than the table, and our record is newer + than the mapping, so we can repair. + */ + tprint(tracef, " repairing...\n"); + + maria_chk_init(¶m); + param.isam_file_name= name= info->s->open_file_name; + param.testflag= uint4korr(rec->header + FILEID_STORE_SIZE); + param.tmpdir= maria_tmpdir; + DBUG_ASSERT(maria_tmpdir); + + info->s->state.key_map= uint8korr(rec->header + FILEID_STORE_SIZE + 4); + quick_repair= param.testflag & T_QUICK; + + + if (param.testflag & T_REP_PARALLEL) + { + if (maria_repair_parallel(¶m, info, name, quick_repair)) + goto end; + } + else if (param.testflag & T_REP_BY_SORT) + { + if (maria_repair_by_sort(¶m, info, name, quick_repair)) + goto end; + } + else if (maria_repair(¶m, info, name, quick_repair)) + goto end; + + if (_ma_update_create_rename_lsn(info->s, rec->lsn, TRUE)) + goto end; + error= 0; + +end: + DBUG_RETURN(error); +} + + +prototype_redo_exec_hook(REDO_DROP_TABLE) +{ + char *name; + int error= 1; + MARIA_HA *info; + if (skip_DDLs) + { + tprint(tracef, "we skip DDLs\n"); + return 0; + } + enlarge_buffer(rec); + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + eprint(tracef, "Failed to read record\n"); + return 1; + } + name= (char *)log_record_buffer.str; + tprint(tracef, "Table '%s'", name); + info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR); + if (info) + { + MARIA_SHARE *share= info->s; + if (!share->base.born_transactional) + { + tprint(tracef, ", is not transactional, ignoring removal\n"); + ALERT_USER(); + error= 0; + goto end; + } + if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0) + { + tprint(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than" + " record, ignoring removal", + LSN_IN_PARTS(share->state.create_rename_lsn)); + error= 0; + goto end; + } + if (maria_is_crashed(info)) + { + tprint(tracef, ", is crashed, can't drop it"); + ALERT_USER(); + goto end; + } + if (close_one_table(info->s->open_file_name, rec->lsn) || + maria_close(info)) + goto end; + info= NULL; + /* if it is older, or its header is corrupted, drop it */ + tprint(tracef, ", dropping '%s'", name); + if (maria_delete_table(name)) + { + eprint(tracef, "Failed to drop table\n"); + goto end; + } + } + else /* one or two files absent, or header corrupted... */ + tprint(tracef,", can't be opened, probably does not exist"); + error= 0; +end: + tprint(tracef, "\n"); + if (info != NULL) + error|= maria_close(info); + return error; +} + + +prototype_redo_exec_hook(FILE_ID) +{ + uint16 sid; + int error= 1; + const char *name; + MARIA_HA *info; + DBUG_ENTER("exec_REDO_LOGREC_FILE_ID"); + + if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0) + { + /* + If that mapping was still true at checkpoint time, it was found in + checkpoint record, no need to recreate it. If that mapping had ended at + checkpoint time (table was closed or repaired), a flush and force + happened and so mapping is not needed. + */ + tprint(tracef, "ignoring because before checkpoint\n"); + DBUG_RETURN(0); + } + + enlarge_buffer(rec); + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + eprint(tracef, "Failed to read record\n"); + goto end; + } + sid= fileid_korr(log_record_buffer.str); + info= all_tables[sid].info; + if (info != NULL) + { + tprint(tracef, " Closing table '%s'\n", info->s->open_file_name); + prepare_table_for_close(info, rec->lsn); + if (maria_close(info)) + { + eprint(tracef, "Failed to close table\n"); + goto end; + } + all_tables[sid].info= NULL; + } + name= (char *)log_record_buffer.str + FILEID_STORE_SIZE; + if (new_table(sid, name, -1, -1, rec->lsn)) + goto end; + error= 0; +end: + DBUG_RETURN(error); +} + + +static int new_table(uint16 sid, const char *name, + File org_kfile, File org_dfile, + LSN lsn_of_file_id) +{ + /* + -1 (skip table): close table and return 0; + 1 (error): close table and return 1; + 0 (success): leave table open and return 0. + */ + int error= 1; + MARIA_HA *info; + MARIA_SHARE *share; + my_off_t dfile_len, kfile_len; + + checkpoint_useful= TRUE; + if ((name == NULL) || (name[0] == 0)) + { + /* + we didn't use DBUG_ASSERT() because such record corruption could + silently pass in the "info == NULL" test below. + */ + tprint(tracef, ", record is corrupted"); + info= NULL; + goto end; + } + tprint(tracef, "Table '%s', id %u", name, sid); + info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR); + if (info == NULL) + { + tprint(tracef, ", is absent (must have been dropped later?)" + " or its header is so corrupted that we cannot open it;" + " we skip it"); + error= 0; + goto end; + } + if (maria_is_crashed(info)) + { + /** @todo what should we do? how to continue recovery? */ + tprint(tracef, "Table is crashed, can't apply log records to it\n"); + goto end; + } + share= info->s; + /* check that we're not already using it */ + if (share->reopen != 1) + { + tprint(tracef, ", is already open (reopen=%u)\n", share->reopen); + /* + It could be that we have in the log + FILE_ID(t1,10) ... (t1 was flushed) ... FILE_ID(t1,12); + */ + if (close_one_table(share->open_file_name, lsn_of_file_id)) + goto end; + } + DBUG_ASSERT(share->now_transactional == share->base.born_transactional); + if (!share->base.born_transactional) + { + tprint(tracef, ", is not transactional\n"); + ALERT_USER(); + error= -1; + goto end; + } + if (cmp_translog_addr(lsn_of_file_id, share->state.create_rename_lsn) <= 0) + { + tprint(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than" + " LOGREC_FILE_ID's LSN (%lu,0x%lx), ignoring open request", + LSN_IN_PARTS(share->state.create_rename_lsn), + LSN_IN_PARTS(lsn_of_file_id)); + error= -1; + goto end; + } + /* don't log any records for this work */ + _ma_tmp_disable_logging_for_table(info, FALSE); + /* _ma_unpin_all_pages() reads info->trn: */ + info->trn= &dummy_transaction_object; + /* execution of some REDO records relies on data_file_length */ + dfile_len= my_seek(info->dfile.file, 0, SEEK_END, MYF(MY_WME)); + kfile_len= my_seek(info->s->kfile.file, 0, SEEK_END, MYF(MY_WME)); + if ((dfile_len == MY_FILEPOS_ERROR) || + (kfile_len == MY_FILEPOS_ERROR)) + { + tprint(tracef, ", length unknown\n"); + goto end; + } + if (share->state.state.data_file_length != dfile_len) + { + tprint(tracef, ", has wrong state.data_file_length (fixing it)"); + share->state.state.data_file_length= dfile_len; + } + if (share->state.state.key_file_length != kfile_len) + { + tprint(tracef, ", has wrong state.key_file_length (fixing it)"); + share->state.state.key_file_length= kfile_len; + } + if ((dfile_len % share->block_size) || (kfile_len % share->block_size)) + { + tprint(tracef, ", has too short last page\n"); + /* Recovery will fix this, no error */ + ALERT_USER(); + } + /* + This LSN serves in this situation; assume log is: + FILE_ID(6->"t2") REDO_INSERT(6) FILE_ID(6->"t1") CHECKPOINT(6->"t1") + then crash, checkpoint record is parsed and opens "t1" with id 6; assume + REDO phase starts from the REDO_INSERT above: it will wrongly try to + update a page of "t1". With this LSN below, REDO_INSERT can realize the + mapping is newer than itself, and not execute. + Same example is possible with UNDO_INSERT (update of the state). + */ + info->s->lsn_of_file_id= lsn_of_file_id; + all_tables[sid].info= info; + all_tables[sid].org_kfile= org_kfile; + all_tables[sid].org_dfile= org_dfile; + /* + We don't set info->s->id, it would be useless (no logging in REDO phase); + if you change that, know that some records in REDO phase call + _ma_update_create_rename_lsn() which resets info->s->id. + */ + tprint(tracef, ", opened"); + error= 0; +end: + tprint(tracef, "\n"); + if (error) + { + if (info != NULL) + maria_close(info); + if (error == -1) + error= 0; + } + return error; +} + + +prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD) +{ + int error= 1; + uchar *buff= NULL; + MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); + if (info == NULL) + { + /* + Table was skipped at open time (because later dropped/renamed, not + transactional, or create_rename_lsn newer than LOGREC_FILE_ID); it is + not an error. + */ + return 0; + } + /* + If REDO's LSN is > page's LSN (read from disk), we are going to modify the + page and change its LSN. The normal runtime code stores the UNDO's LSN + into the page. Here storing the REDO's LSN (rec->lsn) would work + (we are not writing to the log here, so don't have to "flush up to UNDO's + LSN"). But in a test scenario where we do updates at runtime, then remove + tables, apply the log and check that this results in the same table as at + runtime, putting the same LSN as runtime had done will decrease + differences. So we use the UNDO's LSN which is current_group_end_lsn. + */ + enlarge_buffer(rec); + if (log_record_buffer.str == NULL) + { + eprint(tracef, "Failed to read allocate buffer for record\n"); + goto end; + } + if (translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + eprint(tracef, "Failed to read record\n"); + goto end; + } + buff= log_record_buffer.str; + if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn, + HEAD_PAGE, + buff + FILEID_STORE_SIZE, + buff + + FILEID_STORE_SIZE + + PAGE_STORE_SIZE + + DIRPOS_STORE_SIZE, + rec->record_length - + (FILEID_STORE_SIZE + + PAGE_STORE_SIZE + + DIRPOS_STORE_SIZE))) + goto end; + error= 0; +end: + return error; +} + + +prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL) +{ + int error= 1; + uchar *buff; + MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); + if (info == NULL) + return 0; + enlarge_buffer(rec); + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + eprint(tracef, "Failed to read record\n"); + goto end; + } + buff= log_record_buffer.str; + if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn, + TAIL_PAGE, + buff + FILEID_STORE_SIZE, + buff + + FILEID_STORE_SIZE + + PAGE_STORE_SIZE + + DIRPOS_STORE_SIZE, + rec->record_length - + (FILEID_STORE_SIZE + + PAGE_STORE_SIZE + + DIRPOS_STORE_SIZE))) + goto end; + error= 0; + +end: + return error; +} + + +prototype_redo_exec_hook(REDO_INSERT_ROW_BLOBS) +{ + int error= 1; + uchar *buff; + MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); + if (info == NULL) + return 0; + enlarge_buffer(rec); + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + eprint(tracef, "Failed to read record\n"); + goto end; + } + buff= log_record_buffer.str; + if (_ma_apply_redo_insert_row_blobs(info, current_group_end_lsn, + buff + FILEID_STORE_SIZE)) + goto end; + error= 0; + +end: + return error; +} + + +prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD) +{ + int error= 1; + MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); + if (info == NULL) + return 0; + if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn, + HEAD_PAGE, + rec->header + FILEID_STORE_SIZE)) + goto end; + error= 0; +end: + return error; +} + + +prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL) +{ + int error= 1; + MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); + if (info == NULL) + return 0; + if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn, + TAIL_PAGE, + rec->header + FILEID_STORE_SIZE)) + goto end; + error= 0; +end: + return error; +} + + +prototype_redo_exec_hook(REDO_FREE_BLOCKS) +{ + int error= 1; + uchar *buff; + MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); + if (info == NULL) + return 0; + enlarge_buffer(rec); + + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + eprint(tracef, "Failed to read record\n"); + goto end; + } + + buff= log_record_buffer.str; + if (_ma_apply_redo_free_blocks(info, current_group_end_lsn, + buff + FILEID_STORE_SIZE)) + goto end; + error= 0; +end: + return error; +} + + +prototype_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL) +{ + int error= 1; + MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); + if (info == NULL) + return 0; + + if (_ma_apply_redo_free_head_or_tail(info, current_group_end_lsn, + rec->header + FILEID_STORE_SIZE)) + goto end; + error= 0; +end: + return error; +} + + +prototype_redo_exec_hook(REDO_DELETE_ALL) +{ + int error= 1; + MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); + if (info == NULL) + return 0; + tprint(tracef, " deleting all %lu rows\n", + (ulong)info->s->state.state.records); + if (maria_delete_all_rows(info)) + goto end; + error= 0; +end: + return error; +} + + +prototype_redo_exec_hook(REDO_INDEX) +{ + int error= 1; + MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); + if (info == NULL) + return 0; + enlarge_buffer(rec); + + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + eprint(tracef, "Failed to read record\n"); + goto end; + } + + if (_ma_apply_redo_index(info, current_group_end_lsn, + log_record_buffer.str + FILEID_STORE_SIZE, + rec->record_length - FILEID_STORE_SIZE)) + goto end; + error= 0; +end: + return error; +} + +prototype_redo_exec_hook(REDO_INDEX_NEW_PAGE) +{ + int error= 1; + MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); + if (info == NULL) + return 0; + enlarge_buffer(rec); + + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + eprint(tracef, "Failed to read record\n"); + goto end; + } + + if (_ma_apply_redo_index_new_page(info, current_group_end_lsn, + log_record_buffer.str + FILEID_STORE_SIZE, + rec->record_length - FILEID_STORE_SIZE)) + goto end; + error= 0; +end: + return error; +} + + +prototype_redo_exec_hook(REDO_INDEX_FREE_PAGE) +{ + int error= 1; + MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); + if (info == NULL) + return 0; + + if (_ma_apply_redo_index_free_page(info, current_group_end_lsn, + rec->header + FILEID_STORE_SIZE)) + goto end; + error= 0; +end: + return error; +} + + +#define set_undo_lsn_for_active_trans(TRID, LSN) do { \ + all_active_trans[TRID].undo_lsn= LSN; \ + if (all_active_trans[TRID].first_undo_lsn == LSN_IMPOSSIBLE) \ + all_active_trans[TRID].first_undo_lsn= LSN; } while (0) + +prototype_redo_exec_hook(UNDO_ROW_INSERT) +{ + MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); + MARIA_SHARE *share; + + if (info == NULL) + return 0; + share= info->s; + set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); + if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) + { + tprint(tracef, " state has LSN (%lu,0x%lx) older than record, updating" + " rows' count\n", LSN_IN_PARTS(share->state.is_of_horizon)); + share->state.state.records++; + if (share->calc_checksum) + { + uchar buff[HA_CHECKSUM_STORE_SIZE]; + if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE + + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, + HA_CHECKSUM_STORE_SIZE, buff, NULL) != + HA_CHECKSUM_STORE_SIZE) + { + eprint(tracef, "Failed to read record\n"); + return 1; + } + share->state.state.checksum+= ha_checksum_korr(buff); + } + info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED; + } + tprint(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records); + /* Unpin all pages, stamp them with UNDO's LSN */ + _ma_unpin_all_pages(info, rec->lsn); + return 0; +} + + +prototype_redo_exec_hook(UNDO_ROW_DELETE) +{ + MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); + MARIA_SHARE *share; + + if (info == NULL) + return 0; + share= info->s; + set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); + if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) + { + tprint(tracef, " state older than record\n"); + share->state.state.records--; + if (share->calc_checksum) + { + uchar buff[HA_CHECKSUM_STORE_SIZE]; + if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE + + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, + HA_CHECKSUM_STORE_SIZE, buff, NULL) != + HA_CHECKSUM_STORE_SIZE) + { + eprint(tracef, "Failed to read record\n"); + return 1; + } + share->state.state.checksum+= ha_checksum_korr(buff); + } + share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED | + STATE_NOT_OPTIMIZED_ROWS; + } + tprint(tracef, " rows' count %lu\n", (ulong)share->state.state.records); + _ma_unpin_all_pages(info, rec->lsn); + return 0; +} + + +prototype_redo_exec_hook(UNDO_ROW_UPDATE) +{ + MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); + MARIA_SHARE *share; + if (info == NULL) + return 0; + share= info->s; + set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); + if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) + { + if (share->calc_checksum) + { + uchar buff[HA_CHECKSUM_STORE_SIZE]; + if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE + + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, + HA_CHECKSUM_STORE_SIZE, buff, NULL) != + HA_CHECKSUM_STORE_SIZE) + { + eprint(tracef, "Failed to read record\n"); + return 1; + } + share->state.state.checksum+= ha_checksum_korr(buff); + } + share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED; + } + _ma_unpin_all_pages(info, rec->lsn); + return 0; +} + + +prototype_redo_exec_hook(UNDO_KEY_INSERT) +{ + MARIA_HA *info; + MARIA_SHARE *share; + if (!(info= get_MARIA_HA_from_UNDO_record(rec))) + return 0; + share= info->s; + set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); + if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) + { + const uchar *ptr= rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE; + uint keynr= key_nr_korr(ptr); + if (share->base.auto_key == (keynr + 1)) /* it's auto-increment */ + { + const HA_KEYSEG *keyseg= info->s->keyinfo[keynr].seg; + ulonglong value; + char llbuf[22]; + uchar *to; + tprint(tracef, " state older than record\n"); + /* we read the record to find the auto_increment value */ + enlarge_buffer(rec); + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + eprint(tracef, "Failed to read record\n"); + return 1; + } + to= log_record_buffer.str + LSN_STORE_SIZE + FILEID_STORE_SIZE + + KEY_NR_STORE_SIZE; + if (keyseg->flag & HA_SWAP_KEY) + { + /* We put key from log record to "data record" packing format... */ + uchar reversed[HA_MAX_KEY_BUFF]; + uchar *key_ptr= to; + uchar *key_end= key_ptr + keyseg->length; + to= reversed + keyseg->length; + do + { + *--to= *key_ptr++; + } while (key_ptr != key_end); + /* ... so that we can read it with: */ + } + value= ma_retrieve_auto_increment(to, keyseg->type); + set_if_bigger(share->state.auto_increment, value); + llstr(share->state.auto_increment, llbuf); + tprint(tracef, " auto-inc %s\n", llbuf); + } + } + _ma_unpin_all_pages(info, rec->lsn); + return 0; +} + + +prototype_redo_exec_hook(UNDO_KEY_DELETE) +{ + MARIA_HA *info; + if (!(info= get_MARIA_HA_from_UNDO_record(rec))) + return 0; + set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); + _ma_unpin_all_pages(info, rec->lsn); + return 0; +} + + +prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT) +{ + MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); + MARIA_SHARE *share; + if (info == NULL) + return 0; + share= info->s; + set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); + if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) + { + uint key_nr; + my_off_t page; + key_nr= key_nr_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE); + page= page_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE + + KEY_NR_STORE_SIZE); + share->state.key_root[key_nr]= (page == IMPOSSIBLE_PAGE_NO ? + HA_OFFSET_ERROR : + page * share->block_size); + } + _ma_unpin_all_pages(info, rec->lsn); + return 0; +} + + +prototype_redo_exec_hook(COMMIT) +{ + uint16 sid= rec->short_trid; + TrID long_trid= all_active_trans[sid].long_trid; + char llbuf[22]; + if (long_trid == 0) + { + tprint(tracef, "We don't know about transaction with short_trid %u;" + "it probably committed long ago, forget it\n", sid); + bzero(&all_active_trans[sid], sizeof(all_active_trans[sid])); + return 0; + } + llstr(long_trid, llbuf); + tprint(tracef, "Transaction long_trid %s short_trid %u committed\n", + llbuf, sid); + bzero(&all_active_trans[sid], sizeof(all_active_trans[sid])); +#ifdef MARIA_VERSIONING + /* + if real recovery: + transaction was committed, move it to some separate list for later + purging (but don't purge now! purging may have been started before, we + may find REDO_PURGE records soon). + */ +#endif + return 0; +} + + +/* + Set position for next active record that will have key inserted +*/ + +static void set_lastpos(MARIA_HA *info, uchar *pos) +{ + ulonglong page; + uint dir_entry; + + /* If we have checksum, it's before rowid */ + if (info->s->calc_checksum) + pos+= HA_CHECKSUM_STORE_SIZE; + page= page_korr(pos); + dir_entry= dirpos_korr(pos + PAGE_STORE_SIZE); + info->cur_row.lastpos= ma_recordpos(page, dir_entry); +} + + +prototype_redo_exec_hook(CLR_END) +{ + MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); + MARIA_SHARE *share; + LSN previous_undo_lsn; + enum translog_record_type undone_record_type; + const LOG_DESC *log_desc; + my_bool row_entry= 0; + uchar *logpos; + DBUG_ENTER("exec_REDO_LOGREC_CLR_END"); + + if (info == NULL) + DBUG_RETURN(0); + share= info->s; + previous_undo_lsn= lsn_korr(rec->header); + undone_record_type= + clr_type_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE); + log_desc= &log_record_type_descriptor[undone_record_type]; + + set_undo_lsn_for_active_trans(rec->short_trid, previous_undo_lsn); + tprint(tracef, " CLR_END was about %s, undo_lsn now LSN (%lu,0x%lx)\n", + log_desc->name, LSN_IN_PARTS(previous_undo_lsn)); + + enlarge_buffer(rec); + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + eprint(tracef, "Failed to read record\n"); + return 1; + } + logpos= (log_record_buffer.str + LSN_STORE_SIZE + FILEID_STORE_SIZE + + CLR_TYPE_STORE_SIZE); + + if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) + { + tprint(tracef, " state older than record\n"); + switch (undone_record_type) { + case LOGREC_UNDO_ROW_DELETE: + row_entry= 1; + share->state.state.records++; + set_lastpos(info, logpos); + break; + case LOGREC_UNDO_ROW_INSERT: + share->state.state.records--; + share->state.changed|= STATE_NOT_OPTIMIZED_ROWS; + row_entry= 1; + break; + case LOGREC_UNDO_ROW_UPDATE: + row_entry= 1; + set_lastpos(info, logpos); + break; + case LOGREC_UNDO_KEY_INSERT: + case LOGREC_UNDO_KEY_DELETE: + break; + case LOGREC_UNDO_KEY_INSERT_WITH_ROOT: + case LOGREC_UNDO_KEY_DELETE_WITH_ROOT: + { + uint key_nr; + my_off_t page; + key_nr= key_nr_korr(logpos); + page= page_korr(logpos + KEY_NR_STORE_SIZE); + share->state.key_root[key_nr]= (page == IMPOSSIBLE_PAGE_NO ? + HA_OFFSET_ERROR : + page * share->block_size); + break; + } + default: + DBUG_ASSERT(0); + } + if (row_entry && share->calc_checksum) + share->state.state.checksum+= ha_checksum_korr(logpos); + share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED; + } + else + { + /* We must set lastpos for upcoming undo delete keys */ + switch (undone_record_type) { + case LOGREC_UNDO_ROW_DELETE: + case LOGREC_UNDO_ROW_UPDATE: + set_lastpos(info, logpos); + break; + default: + break; + } + } + if (row_entry) + tprint(tracef, " rows' count %lu\n", (ulong)share->state.state.records); + _ma_unpin_all_pages(info, rec->lsn); + DBUG_RETURN(0); +} + + +prototype_undo_exec_hook(UNDO_ROW_INSERT) +{ + my_bool error; + MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); + LSN previous_undo_lsn= lsn_korr(rec->header); + MARIA_SHARE *share; + const uchar *record_ptr; + + if (info == NULL) + { + /* + Unlike for REDOs, if the table was skipped it is abnormal; we have a + transaction to rollback which used this table, as it is not rolled back + it was supposed to hold this table and so the table should still be + there. + */ + return 1; + } + share= info->s; + share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED | + STATE_NOT_OPTIMIZED_ROWS; + + record_ptr= rec->header; + if (share->calc_checksum) + { + /* + We need to read more of the record to put the checksum into the record + buffer used by _ma_apply_undo_row_insert(). + If the table has no live checksum, rec->header will be enough. + */ + enlarge_buffer(rec); + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + eprint(tracef, "Failed to read record\n"); + return 1; + } + record_ptr= log_record_buffer.str; + } + + info->trn= trn; + error= _ma_apply_undo_row_insert(info, previous_undo_lsn, + record_ptr + LSN_STORE_SIZE + + FILEID_STORE_SIZE); + info->trn= 0; + /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */ + tprint(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records); + tprint(tracef, " undo_lsn now LSN (%lu,0x%lx)\n", + LSN_IN_PARTS(previous_undo_lsn)); + return error; +} + + +prototype_undo_exec_hook(UNDO_ROW_DELETE) +{ + my_bool error; + MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); + LSN previous_undo_lsn= lsn_korr(rec->header); + MARIA_SHARE *share; + + if (info == NULL) + return 1; + + share= info->s; + share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED; + enlarge_buffer(rec); + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + eprint(tracef, "Failed to read record\n"); + return 1; + } + + info->trn= trn; + /* + For now we skip the page and directory entry. This is to be used + later when we mark rows as deleted. + */ + error= _ma_apply_undo_row_delete(info, previous_undo_lsn, + log_record_buffer.str + LSN_STORE_SIZE + + FILEID_STORE_SIZE + PAGE_STORE_SIZE + + DIRPOS_STORE_SIZE, + rec->record_length - + (LSN_STORE_SIZE + FILEID_STORE_SIZE + + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE)); + info->trn= 0; + tprint(tracef, " rows' count %lu\n undo_lsn now LSN (%lu,0x%lx)\n", + (ulong)share->state.state.records, LSN_IN_PARTS(previous_undo_lsn)); + return error; +} + + +prototype_undo_exec_hook(UNDO_ROW_UPDATE) +{ + my_bool error; + MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); + LSN previous_undo_lsn= lsn_korr(rec->header); + MARIA_SHARE *share; + + if (info == NULL) + return 1; + + share= info->s; + share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED; + + enlarge_buffer(rec); + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + eprint(tracef, "Failed to read record\n"); + return 1; + } + + info->trn= trn; + error= _ma_apply_undo_row_update(info, previous_undo_lsn, + log_record_buffer.str + LSN_STORE_SIZE + + FILEID_STORE_SIZE, + rec->record_length - + (LSN_STORE_SIZE + FILEID_STORE_SIZE)); + info->trn= 0; + tprint(tracef, " undo_lsn now LSN (%lu,0x%lx)\n", + LSN_IN_PARTS(previous_undo_lsn)); + return error; +} + + +prototype_undo_exec_hook(UNDO_KEY_INSERT) +{ + my_bool error; + MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); + LSN previous_undo_lsn= lsn_korr(rec->header); + MARIA_SHARE *share; + + if (info == NULL) + { + /* + Unlike for REDOs, if the table was skipped it is abnormal; we have a + transaction to rollback which used this table, as it is not rolled back + it was supposed to hold this table and so the table should still be + there. + */ + return 1; + } + + share= info->s; + share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED; + + enlarge_buffer(rec); + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + eprint(tracef, "Failed to read record\n"); + return 1; + } + + info->trn= trn; + error= _ma_apply_undo_key_insert(info, previous_undo_lsn, + log_record_buffer.str + LSN_STORE_SIZE + + FILEID_STORE_SIZE, + rec->record_length - LSN_STORE_SIZE - + FILEID_STORE_SIZE); + info->trn= 0; + /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */ + tprint(tracef, " undo_lsn now LSN (%lu,0x%lx)\n", + LSN_IN_PARTS(previous_undo_lsn)); + return error; +} + + +prototype_undo_exec_hook(UNDO_KEY_DELETE) +{ + my_bool error; + MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); + LSN previous_undo_lsn= lsn_korr(rec->header); + MARIA_SHARE *share; + + if (info == NULL) + { + /* + Unlike for REDOs, if the table was skipped it is abnormal; we have a + transaction to rollback which used this table, as it is not rolled back + it was supposed to hold this table and so the table should still be + there. + */ + return 1; + } + + share= info->s; + share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED; + + enlarge_buffer(rec); + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + eprint(tracef, "Failed to read record\n"); + return 1; + } + + info->trn= trn; + error= _ma_apply_undo_key_delete(info, previous_undo_lsn, + log_record_buffer.str + LSN_STORE_SIZE + + FILEID_STORE_SIZE, + rec->record_length - LSN_STORE_SIZE - + FILEID_STORE_SIZE); + info->trn= 0; + /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */ + tprint(tracef, " undo_lsn now LSN (%lu,0x%lx)\n", + LSN_IN_PARTS(previous_undo_lsn)); + return error; +} + + +prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT) +{ + my_bool error; + MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); + LSN previous_undo_lsn= lsn_korr(rec->header); + MARIA_SHARE *share; + + if (info == NULL) + { + /* + Unlike for REDOs, if the table was skipped it is abnormal; we have a + transaction to rollback which used this table, as it is not rolled back + it was supposed to hold this table and so the table should still be + there. + */ + return 1; + } + + share= info->s; + share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED; + + enlarge_buffer(rec); + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + eprint(tracef, "Failed to read record\n"); + return 1; + } + + info->trn= trn; + error= _ma_apply_undo_key_delete(info, previous_undo_lsn, + log_record_buffer.str + LSN_STORE_SIZE + + FILEID_STORE_SIZE + PAGE_STORE_SIZE, + rec->record_length - LSN_STORE_SIZE - + FILEID_STORE_SIZE - PAGE_STORE_SIZE); + info->trn= 0; + /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */ + tprint(tracef, " undo_lsn now LSN (%lu,0x%lx)\n", + LSN_IN_PARTS(previous_undo_lsn)); + return error; +} + + + +static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply) +{ + TRANSLOG_HEADER_BUFFER rec; + struct st_translog_scanner_data scanner; + int len; + uint i; + + /* install hooks for execution */ +#define install_redo_exec_hook(R) \ + log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \ + exec_REDO_LOGREC_ ## R; +#define install_undo_exec_hook(R) \ + log_record_type_descriptor[LOGREC_ ## R].record_execute_in_undo_phase= \ + exec_UNDO_LOGREC_ ## R; + install_redo_exec_hook(LONG_TRANSACTION_ID); + install_redo_exec_hook(CHECKPOINT); + install_redo_exec_hook(REDO_CREATE_TABLE); + install_redo_exec_hook(REDO_RENAME_TABLE); + install_redo_exec_hook(REDO_REPAIR_TABLE); + install_redo_exec_hook(REDO_DROP_TABLE); + install_redo_exec_hook(FILE_ID); + install_redo_exec_hook(INCOMPLETE_LOG); + install_redo_exec_hook(INCOMPLETE_GROUP); + install_redo_exec_hook(REDO_INSERT_ROW_HEAD); + install_redo_exec_hook(REDO_INSERT_ROW_TAIL); + install_redo_exec_hook(REDO_INSERT_ROW_BLOBS); + install_redo_exec_hook(REDO_PURGE_ROW_HEAD); + install_redo_exec_hook(REDO_PURGE_ROW_TAIL); + install_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL); + install_redo_exec_hook(REDO_FREE_BLOCKS); + install_redo_exec_hook(REDO_DELETE_ALL); + install_redo_exec_hook(REDO_INDEX); + install_redo_exec_hook(REDO_INDEX_NEW_PAGE); + install_redo_exec_hook(REDO_INDEX_FREE_PAGE); + install_redo_exec_hook(UNDO_ROW_INSERT); + install_redo_exec_hook(UNDO_ROW_DELETE); + install_redo_exec_hook(UNDO_ROW_UPDATE); + install_redo_exec_hook(UNDO_KEY_INSERT); + install_redo_exec_hook(UNDO_KEY_DELETE); + install_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT); + install_redo_exec_hook(COMMIT); + install_redo_exec_hook(CLR_END); + install_undo_exec_hook(UNDO_ROW_INSERT); + install_undo_exec_hook(UNDO_ROW_DELETE); + install_undo_exec_hook(UNDO_ROW_UPDATE); + install_undo_exec_hook(UNDO_KEY_INSERT); + install_undo_exec_hook(UNDO_KEY_DELETE); + install_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT); + + current_group_end_lsn= LSN_IMPOSSIBLE; +#ifndef DBUG_OFF + current_group_table= NULL; +#endif + + if (unlikely(lsn == LSN_IMPOSSIBLE || lsn == translog_get_horizon())) + { + tprint(tracef, "checkpoint address refers to the log end log or " + "log is empty, nothing to do.\n"); + return 0; + } + + len= translog_read_record_header(lsn, &rec); + + if (len == RECHEADER_READ_ERROR) + { + eprint(tracef, "Failed to read header of the first record.\n"); + return 1; + } + if (translog_scanner_init(lsn, 1, &scanner, 1)) + { + tprint(tracef, "Scanner init failed\n"); + return 1; + } + for (i= 1;;i++) + { + uint16 sid= rec.short_trid; + const LOG_DESC *log_desc= &log_record_type_descriptor[rec.type]; + display_record_position(log_desc, &rec, i); + /* + A complete group is a set of log records with an "end mark" record + (e.g. a set of REDOs for an operation, terminated by an UNDO for this + operation); if there is no "end mark" record the group is incomplete and + won't be executed. + */ + if ((log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) || + (log_desc->record_in_group == LOGREC_LAST_IN_GROUP)) + { + if (all_active_trans[sid].group_start_lsn != LSN_IMPOSSIBLE) + { + if (log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) + { + /* + Can happen if the transaction got a table write error, then + unlocked tables thus wrote a COMMIT record. Or can be an + INCOMPLETE_GROUP record written by a previous recovery. + */ + tprint(tracef, "\nDiscarding incomplete group before this record\n"); + all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE; + } + else + { + struct st_translog_scanner_data scanner2; + TRANSLOG_HEADER_BUFFER rec2; + /* + There is a complete group for this transaction, containing more + than this event. + */ + tprint(tracef, " ends a group:\n"); + len= + translog_read_record_header(all_active_trans[sid].group_start_lsn, + &rec2); + if (len < 0) /* EOF or error */ + { + tprint(tracef, "Cannot find record where it should be\n"); + goto err; + } + if (translog_scanner_init(rec2.lsn, 1, &scanner2, 1)) + { + tprint(tracef, "Scanner2 init failed\n"); + goto err; + } + current_group_end_lsn= rec.lsn; + do + { + if (rec2.short_trid == sid) /* it's in our group */ + { + const LOG_DESC *log_desc2= &log_record_type_descriptor[rec2.type]; + display_record_position(log_desc2, &rec2, 0); + if (apply == MARIA_LOG_CHECK) + { + translog_size_t read_len; + enlarge_buffer(&rec2); + read_len= + translog_read_record(rec2.lsn, 0, rec2.record_length, + log_record_buffer.str, NULL); + if (read_len != rec2.record_length) + { + tprint(tracef, "Cannot read record's body: read %u of" + " %u bytes\n", read_len, rec2.record_length); + goto err; + } + } + if (apply == MARIA_LOG_APPLY && + display_and_apply_record(log_desc2, &rec2)) + { + translog_destroy_scanner(&scanner2); + goto err; + } + } + len= translog_read_next_record_header(&scanner2, &rec2); + if (len < 0) /* EOF or error */ + { + tprint(tracef, "Cannot find record where it should be\n"); + goto err; + } + } + while (rec2.lsn < rec.lsn); + translog_free_record_header(&rec2); + /* group finished */ + all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE; + current_group_end_lsn= LSN_IMPOSSIBLE; /* for debugging */ + display_record_position(log_desc, &rec, 0); + translog_destroy_scanner(&scanner2); + } + } + if (apply == MARIA_LOG_APPLY && + display_and_apply_record(log_desc, &rec)) + goto err; +#ifndef DBUG_OFF + current_group_table= NULL; +#endif + } + else /* record does not end group */ + { + /* just record the fact, can't know if can execute yet */ + if (all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE) + { + /* group not yet started */ + all_active_trans[sid].group_start_lsn= rec.lsn; + } + } + len= translog_read_next_record_header(&scanner, &rec); + if (len < 0) + { + switch (len) + { + case RECHEADER_READ_EOF: + tprint(tracef, "EOF on the log\n"); + break; + case RECHEADER_READ_ERROR: + tprint(tracef, "Error reading log\n"); + goto err; + } + break; + } + } + translog_destroy_scanner(&scanner); + translog_free_record_header(&rec); + if (recovery_message_printed == REC_MSG_REDO) + { + fprintf(stderr, " 100%%"); + procent_printed= 1; + } + return 0; + +err: + translog_destroy_scanner(&scanner); + return 1; +} + + +/** + @brief Informs about any aborted groups or uncommitted transactions, + prepares for the UNDO phase if needed. + + @note Observe that it may init trnman. +*/ +static uint end_of_redo_phase(my_bool prepare_for_undo_phase) +{ + uint sid, uncommitted= 0; + char llbuf[22]; + LSN addr; + + hash_free(&all_dirty_pages); + /* + hash_free() can be called multiple times probably, but be safe if that + changes + */ + bzero(&all_dirty_pages, sizeof(all_dirty_pages)); + my_free(dirty_pages_pool, MYF(MY_ALLOW_ZERO_PTR)); + dirty_pages_pool= NULL; + + llstr(max_long_trid, llbuf); + tprint(tracef, "Maximum transaction long id seen: %s\n", llbuf); + if (prepare_for_undo_phase && trnman_init(max_long_trid)) + return -1; + + for (sid= 0; sid <= SHORT_TRID_MAX; sid++) + { + TrID long_trid= all_active_trans[sid].long_trid; + LSN gslsn= all_active_trans[sid].group_start_lsn; + TRN *trn; + if (gslsn != LSN_IMPOSSIBLE) + { + tprint(tracef, "Group at LSN (%lu,0x%lx) short_trid %u incomplete\n", + LSN_IN_PARTS(gslsn), sid); + all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE; + } + if (all_active_trans[sid].undo_lsn != LSN_IMPOSSIBLE) + { + llstr(long_trid, llbuf); + tprint(tracef, "Transaction long_trid %s short_trid %u uncommitted\n", + llbuf, sid); + /* dummy_transaction_object serves only for DDLs */ + DBUG_ASSERT(long_trid != 0); + if (prepare_for_undo_phase) + { + if ((trn= trnman_recreate_trn_from_recovery(sid, long_trid)) == NULL) + return -1; + trn->undo_lsn= all_active_trans[sid].undo_lsn; + trn->first_undo_lsn= all_active_trans[sid].first_undo_lsn | + TRANSACTION_LOGGED_LONG_ID; /* because trn is known in log */ + if (gslsn != LSN_IMPOSSIBLE) + { + /* + UNDO phase will log some records. So, a future recovery may see: + REDO(from incomplete group) - REDO(from rollback) - CLR_END + and thus execute the first REDO (finding it in "a complete + group"). To prevent that: + */ + LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS]; + LSN lsn; + if (translog_write_record(&lsn, LOGREC_INCOMPLETE_GROUP, + trn, NULL, 0, + TRANSLOG_INTERNAL_PARTS, log_array, + NULL, NULL)) + return -1; + } + } + uncommitted++; + } +#ifdef MARIA_VERSIONING + /* + If real recovery: if transaction was committed, move it to some separate + list for soon purging. + */ +#endif + } + + my_free(all_active_trans, MYF(MY_ALLOW_ZERO_PTR)); + all_active_trans= NULL; + + /* + The UNDO phase uses some normal run-time code of ROLLBACK: generates log + records, etc; prepare tables for that + */ + addr= translog_get_horizon(); + for (sid= 0; sid <= SHARE_ID_MAX; sid++) + { + MARIA_HA *info= all_tables[sid].info; + if (info != NULL) + { + prepare_table_for_close(info, addr); + /* + But we don't close it; we leave it available for the UNDO phase; + it's likely that the UNDO phase will need it. + */ + if (prepare_for_undo_phase) + translog_assign_id_to_share_from_recovery(info->s, sid); + } + } + return uncommitted; +} + + +static int run_undo_phase(uint uncommitted) +{ + DBUG_ENTER("run_undo_phase"); + + if (uncommitted > 0) + { + checkpoint_useful= TRUE; + if (tracef != stdout) + { + if (recovery_message_printed == REC_MSG_NONE) + print_preamble(); + fprintf(stderr, "transactions to roll back:"); + recovery_message_printed= REC_MSG_UNDO; + } + tprint(tracef, "%u transactions will be rolled back\n", uncommitted); + for( ; ; ) + { + char llbuf[22]; + TRN *trn; + if (recovery_message_printed == REC_MSG_UNDO) + fprintf(stderr, " %u", uncommitted); + if ((uncommitted--) == 0) + break; + trn= trnman_get_any_trn(); + DBUG_ASSERT(trn != NULL); + llstr(trn->trid, llbuf); + tprint(tracef, "Rolling back transaction of long id %s\n", llbuf); + + /* Execute all undo entries */ + while (trn->undo_lsn) + { + TRANSLOG_HEADER_BUFFER rec; + LOG_DESC *log_desc; + if (translog_read_record_header(trn->undo_lsn, &rec) == + RECHEADER_READ_ERROR) + DBUG_RETURN(1); + log_desc= &log_record_type_descriptor[rec.type]; + display_record_position(log_desc, &rec, 0); + if (log_desc->record_execute_in_undo_phase(&rec, trn)) + { + tprint(tracef, "Got error %d when executing undo\n", my_errno); + DBUG_RETURN(1); + } + } + + if (trnman_rollback_trn(trn)) + DBUG_RETURN(1); + /* We could want to span a few threads (4?) instead of 1 */ + /* In the future, we want to have this phase *online* */ + } + } + DBUG_RETURN(0); +} + + +/** + @brief re-enables transactionality, updates is_of_horizon + + @param info table + @param horizon address to set is_of_horizon +*/ + +static void prepare_table_for_close(MARIA_HA *info, TRANSLOG_ADDRESS horizon) +{ + MARIA_SHARE *share= info->s; + /* + In a fully-forward REDO phase (no checkpoint record), + state is now at least as new as the LSN of the current record. It may be + newer, in case we are seeing a LOGREC_FILE_ID which tells us to close a + table, but that table was later modified further in the log. + But if we parsed a checkpoint record, it may be this way in the log: + FILE_ID(6->t2)... FILE_ID(6->t1)... CHECKPOINT(6->t1) + Checkpoint parsing opened t1 with id 6; first FILE_ID above is going to + make t1 close; the first condition below is however false (when checkpoint + was taken it increased is_of_horizon) and so it works. For safety we + add the second condition. + */ + if (cmp_translog_addr(share->state.is_of_horizon, horizon) < 0 && + cmp_translog_addr(share->lsn_of_file_id, horizon) < 0) + { + share->state.is_of_horizon= horizon; + _ma_state_info_write_sub(share->kfile.file, &share->state, 1); + } + _ma_reenable_logging_for_table(share); + info->trn= NULL; /* safety */ +} + + +static MARIA_HA *get_MARIA_HA_from_REDO_record(const + TRANSLOG_HEADER_BUFFER *rec) +{ + uint16 sid; + pgcache_page_no_t page; + MARIA_HA *info; + char llbuf[22]; + my_bool index_page_redo_entry= 0; + + print_redo_phase_progress(rec->lsn); + sid= fileid_korr(rec->header); + page= page_korr(rec->header + FILEID_STORE_SIZE); + switch (rec->type) { + /* not all REDO records have a page: */ + case LOGREC_REDO_INDEX_NEW_PAGE: + case LOGREC_REDO_INDEX: + case LOGREC_REDO_INDEX_FREE_PAGE: + index_page_redo_entry= 1; + /* Fall trough*/ + case LOGREC_REDO_INSERT_ROW_HEAD: + case LOGREC_REDO_INSERT_ROW_TAIL: + case LOGREC_REDO_PURGE_ROW_HEAD: + case LOGREC_REDO_PURGE_ROW_TAIL: + llstr(page, llbuf); + tprint(tracef, " For page %s of table of short id %u", llbuf, sid); + break; + /* other types could print their info here too */ + default: + break; + } + info= all_tables[sid].info; +#ifndef DBUG_OFF + DBUG_ASSERT(current_group_table == NULL || current_group_table == info); + current_group_table= info; +#endif + if (info == NULL) + { + tprint(tracef, ", table skipped, so skipping record\n"); + return NULL; + } + tprint(tracef, ", '%s'", info->s->open_file_name); + if (cmp_translog_addr(rec->lsn, info->s->lsn_of_file_id) <= 0) + { + /* + This can happen only if processing a record before the checkpoint + record. + id->name mapping is newer than REDO record: for sure the table subject + of the REDO has been flushed and forced (id re-assignment implies this); + REDO can be ignored (and must be, as we don't know what this subject + table was). + */ + DBUG_ASSERT(cmp_translog_addr(rec->lsn, checkpoint_start) < 0); + tprint(tracef, ", table's LOGREC_FILE_ID has LSN (%lu,0x%lx) more recent" + " than record, skipping record", + LSN_IN_PARTS(info->s->lsn_of_file_id)); + return NULL; + } + /* detect if an open instance of a dropped table (internal bug) */ + DBUG_ASSERT(info->s->last_version != 0); + if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0) + { + uint64 file_and_page_id= + (((uint64) (index_page_redo_entry ? all_tables[sid].org_kfile : + all_tables[sid].org_dfile)) << 32) | page; + struct st_dirty_page *dirty_page= (struct st_dirty_page *) + hash_search(&all_dirty_pages, + (uchar *)&file_and_page_id, sizeof(file_and_page_id)); + if ((dirty_page == NULL) || + cmp_translog_addr(rec->lsn, dirty_page->rec_lsn) < 0) + { + tprint(tracef, ", ignoring because of dirty_pages list\n"); + return NULL; + } + } + + /* + So we are going to read the page, and if its LSN is older than the + record's we will modify the page + */ + tprint(tracef, ", applying record\n"); + _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE); /* to flush state on close */ + return info; +} + + +static MARIA_HA *get_MARIA_HA_from_UNDO_record(const + TRANSLOG_HEADER_BUFFER *rec) +{ + uint16 sid; + MARIA_HA *info; + + sid= fileid_korr(rec->header + LSN_STORE_SIZE); + tprint(tracef, " For table of short id %u", sid); + info= all_tables[sid].info; +#ifndef DBUG_OFF + DBUG_ASSERT(current_group_table == NULL || current_group_table == info); + current_group_table= info; +#endif + if (info == NULL) + { + tprint(tracef, ", table skipped, so skipping record\n"); + return NULL; + } + tprint(tracef, ", '%s'", info->s->open_file_name); + if (cmp_translog_addr(rec->lsn, info->s->lsn_of_file_id) <= 0) + { + tprint(tracef, ", table's LOGREC_FILE_ID has LSN (%lu,0x%lx) more recent" + " than record, skipping record", + LSN_IN_PARTS(info->s->lsn_of_file_id)); + return NULL; + } + DBUG_ASSERT(info->s->last_version != 0); + _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE); /* to flush state on close */ + tprint(tracef, ", applying record\n"); + return info; +} + + +/** + @brief Parses checkpoint record. + + Builds from it the dirty_pages list (a hash), opens tables and maps them to + their 2-byte IDs, recreates transactions (not real TRNs though). + + @return LSN from where in the log the REDO phase should start + @retval LSN_ERROR error + @retval other ok +*/ + +static LSN parse_checkpoint_record(LSN lsn) +{ + ulong i, nb_dirty_pages; + TRANSLOG_HEADER_BUFFER rec; + TRANSLOG_ADDRESS start_address; + int len; + uint nb_active_transactions, nb_committed_transactions, nb_tables; + uchar *ptr; + LSN minimum_rec_lsn_of_active_transactions, minimum_rec_lsn_of_dirty_pages; + struct st_dirty_page *next_dirty_page_in_pool; + + tprint(tracef, "Loading data from checkpoint record at LSN (%lu,0x%lx)\n", + LSN_IN_PARTS(lsn)); + if ((len= translog_read_record_header(lsn, &rec)) == RECHEADER_READ_ERROR) + { + tprint(tracef, "Cannot find checkpoint record where it should be\n"); + return LSN_ERROR; + } + + enlarge_buffer(&rec); + if (log_record_buffer.str == NULL || + translog_read_record(rec.lsn, 0, rec.record_length, + log_record_buffer.str, NULL) != + rec.record_length) + { + eprint(tracef, "Failed to read record\n"); + return LSN_ERROR; + } + + ptr= log_record_buffer.str; + start_address= lsn_korr(ptr); + ptr+= LSN_STORE_SIZE; + + /* transactions */ + nb_active_transactions= uint2korr(ptr); + ptr+= 2; + tprint(tracef, "%u active transactions\n", nb_active_transactions); + minimum_rec_lsn_of_active_transactions= lsn_korr(ptr); + ptr+= LSN_STORE_SIZE; + max_long_trid= transid_korr(ptr); + ptr+= TRANSID_SIZE; + + /* + how much brain juice and discussions there was to come to writing this + line + */ + set_if_smaller(start_address, minimum_rec_lsn_of_active_transactions); + + for (i= 0; i < nb_active_transactions; i++) + { + uint16 sid= uint2korr(ptr); + TrID long_id; + LSN undo_lsn, first_undo_lsn; + ptr+= 2; + long_id= uint6korr(ptr); + ptr+= 6; + DBUG_ASSERT(sid > 0 && long_id > 0); + undo_lsn= lsn_korr(ptr); + ptr+= LSN_STORE_SIZE; + first_undo_lsn= lsn_korr(ptr); + ptr+= LSN_STORE_SIZE; + new_transaction(sid, long_id, undo_lsn, first_undo_lsn); + } + nb_committed_transactions= uint4korr(ptr); + ptr+= 4; + tprint(tracef, "%lu committed transactions\n", + (ulong)nb_committed_transactions); + /* no purging => committed transactions are not important */ + ptr+= (6 + LSN_STORE_SIZE) * nb_committed_transactions; + + /* tables */ + nb_tables= uint4korr(ptr); + ptr+= 4; + tprint(tracef, "%u open tables\n", nb_tables); + for (i= 0; i< nb_tables; i++) + { + char name[FN_REFLEN]; + File kfile, dfile; + LSN first_log_write_lsn; + uint name_len; + uint16 sid= uint2korr(ptr); + ptr+= 2; + DBUG_ASSERT(sid > 0); + kfile= uint4korr(ptr); + ptr+= 4; + dfile= uint4korr(ptr); + ptr+= 4; + first_log_write_lsn= lsn_korr(ptr); + ptr+= LSN_STORE_SIZE; + name_len= strlen((char *)ptr) + 1; + strmake(name, (char *)ptr, sizeof(name)-1); + ptr+= name_len; + if (new_table(sid, name, kfile, dfile, first_log_write_lsn)) + return LSN_ERROR; + } + + /* dirty pages */ + nb_dirty_pages= uint8korr(ptr); + ptr+= 8; + tprint(tracef, "%lu dirty pages\n", nb_dirty_pages); + if (hash_init(&all_dirty_pages, &my_charset_bin, nb_dirty_pages, + offsetof(struct st_dirty_page, file_and_page_id), + sizeof(((struct st_dirty_page *)NULL)->file_and_page_id), + NULL, NULL, 0)) + return LSN_ERROR; + dirty_pages_pool= + (struct st_dirty_page *)my_malloc(nb_dirty_pages * + sizeof(struct st_dirty_page), + MYF(MY_WME)); + if (unlikely(dirty_pages_pool == NULL)) + return LSN_ERROR; + next_dirty_page_in_pool= dirty_pages_pool; + minimum_rec_lsn_of_dirty_pages= LSN_MAX; + for (i= 0; i < nb_dirty_pages ; i++) + { + pgcache_page_no_t pageid; + LSN rec_lsn; + File fileid= uint4korr(ptr); + ptr+= 4; + pageid= uint4korr(ptr); + ptr+= 4; + rec_lsn= lsn_korr(ptr); + ptr+= LSN_STORE_SIZE; + if (new_page(fileid, pageid, rec_lsn, next_dirty_page_in_pool++)) + return LSN_ERROR; + set_if_smaller(minimum_rec_lsn_of_dirty_pages, rec_lsn); + } + /* after that, there will be no insert/delete into the hash */ + /* + sanity check on record (did we screw up with all those "ptr+=", did the + checkpoint write code and checkpoint read code go out of sync?). + */ + if (ptr != (log_record_buffer.str + log_record_buffer.length)) + { + eprint(tracef, "checkpoint record corrupted\n"); + return LSN_ERROR; + } + set_if_smaller(start_address, minimum_rec_lsn_of_dirty_pages); + + /* + Find LSN higher or equal to this TRANSLOG_ADDRESS, suitable for + translog_read_record() functions + */ + checkpoint_start= translog_next_LSN(start_address, LSN_IMPOSSIBLE); + if (checkpoint_start == LSN_IMPOSSIBLE) + { + /* + There must be a problem, as our checkpoint record exists and is >= the + address which is stored in its first bytes, which is >= start_address. + */ + return LSN_ERROR; + } + return checkpoint_start; +} + +static int new_page(File fileid, pgcache_page_no_t pageid, LSN rec_lsn, + struct st_dirty_page *dirty_page) +{ + /* serves as hash key */ + dirty_page->file_and_page_id= (((uint64)fileid) << 32) | pageid; + dirty_page->rec_lsn= rec_lsn; + return my_hash_insert(&all_dirty_pages, (uchar *)dirty_page); +} + + +static int close_all_tables(void) +{ + int error= 0; + uint count= 0; + LIST *list_element, *next_open; + MARIA_HA *info; + TRANSLOG_ADDRESS addr; + DBUG_ENTER("close_all_tables"); + + pthread_mutex_lock(&THR_LOCK_maria); + if (maria_open_list == NULL) + goto end; + tprint(tracef, "Closing all tables\n"); + if (tracef != stdout) + { + if (recovery_message_printed == REC_MSG_NONE) + print_preamble(); + for (count= 0, list_element= maria_open_list ; + list_element ; count++, (list_element= list_element->next)) + fprintf(stderr, "tables to flush:"); + recovery_message_printed= REC_MSG_FLUSH; + } + /* + Since the end of end_of_redo_phase(), we may have written new records + (if UNDO phase ran) and thus the state is newer than at + end_of_redo_phase(), we need to bump is_of_horizon again. + */ + addr= translog_get_horizon(); + for (list_element= maria_open_list ; ; list_element= next_open) + { + if (recovery_message_printed == REC_MSG_FLUSH) + fprintf(stderr, " %u", count--); + if (list_element == NULL) + break; + next_open= list_element->next; + info= (MARIA_HA*)list_element->data; + pthread_mutex_unlock(&THR_LOCK_maria); /* ok, UNDO phase not online yet */ + /* + Tables which we see here are exactly those which were open at time of + crash. They might have open_count>0 as Checkpoint maybe flushed their + state while they were used. As Recovery corrected them, don't alarm the + user, don't ask for a table check: + */ + info->s->state.open_count= 0; + prepare_table_for_close(info, addr); + error|= maria_close(info); + pthread_mutex_lock(&THR_LOCK_maria); + } +end: + pthread_mutex_unlock(&THR_LOCK_maria); + DBUG_RETURN(error); +} + + +/** + @brief Close all table instances with a certain name which are present in + all_tables. + + @param name Name of table + @param addr Log address passed to prepare_table_for_close() +*/ + +static my_bool close_one_table(const char *name, TRANSLOG_ADDRESS addr) +{ + my_bool res= 0; + /* There are no other threads using the tables, so we don't need any locks */ + struct st_table_for_recovery *internal_table, *end; + for (internal_table= all_tables, end= internal_table + SHARE_ID_MAX + 1; + internal_table < end ; + internal_table++) + { + MARIA_HA *info= internal_table->info; + if ((info != NULL) && !strcmp(info->s->open_file_name, name)) + { + prepare_table_for_close(info, addr); + if (maria_close(info)) + res= 1; + internal_table->info= NULL; + } + } + return res; +} + + +/** + Temporarily disables logging for this table. + + If that makes the log incomplete, writes a LOGREC_INCOMPLETE_LOG to the log + to warn log readers. + + @param info table + @param log_incomplete if that disabling makes the log incomplete + + @note for example in the REDO phase we disable logging but that does not + make the log incomplete. +*/ +void _ma_tmp_disable_logging_for_table(MARIA_HA *info, + my_bool log_incomplete) +{ + MARIA_SHARE *share= info->s; + if (log_incomplete) + { + uchar log_data[FILEID_STORE_SIZE]; + LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; + LSN lsn; + log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; + log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); + translog_write_record(&lsn, LOGREC_INCOMPLETE_LOG, + info->trn, info, sizeof(log_data), + TRANSLOG_INTERNAL_PARTS + 1, log_array, + log_data, NULL); + } + /* if we disabled before writing the record, record wouldn't reach log */ + share->now_transactional= FALSE; + share->page_type= PAGECACHE_PLAIN_PAGE; +} + +static void print_redo_phase_progress(TRANSLOG_ADDRESS addr) +{ + static int end_logno= FILENO_IMPOSSIBLE, end_offset, percentage_printed= 0; + static ulonglong initial_remainder= -1; + int cur_logno, cur_offset; + ulonglong local_remainder; + int percentage_done; + + if (tracef == stdout) + return; + if (recovery_message_printed == REC_MSG_NONE) + { + print_preamble(); + fprintf(stderr, "recovered pages: 0%%"); + procent_printed= 1; + recovery_message_printed= REC_MSG_REDO; + } + if (end_logno == FILENO_IMPOSSIBLE) + { + LSN end_addr= translog_get_horizon(); + end_logno= LSN_FILE_NO(end_addr); + end_offset= LSN_OFFSET(end_addr); + } + cur_logno= LSN_FILE_NO(addr); + cur_offset= LSN_OFFSET(addr); + local_remainder= (cur_logno == end_logno) ? (end_offset - cur_offset) : + (((longlong)log_file_size) - cur_offset + + max(end_logno - cur_logno - 1, 0) * ((longlong)log_file_size) + + end_offset); + if (initial_remainder == (ulonglong)(-1)) + initial_remainder= local_remainder; + percentage_done= ((initial_remainder - local_remainder) * ULL(100) / + initial_remainder); + if ((percentage_done - percentage_printed) >= 10) + { + percentage_printed= percentage_done; + fprintf(stderr, " %d%%", percentage_done); + procent_printed= 1; + } +} + +#ifdef MARIA_EXTERNAL_LOCKING +#error Marias Checkpoint and Recovery are really not ready for it +#endif + +/* +Recovery of the state : how it works +===================================== + +Here we ignore Checkpoints for a start. + +The state (MARIA_HA::MARIA_SHARE::MARIA_STATE_INFO) is updated in +memory frequently (at least at every row write/update/delete) but goes +to disk at few moments: maria_close() when closing the last open +instance, and a few rare places like CHECK/REPAIR/ALTER +(non-transactional tables also do it at maria_lock_database() but we +needn't cover them here). + +In case of crash, state on disk is likely to be older than what it was +in memory, the REDO phase needs to recreate the state as it was in +memory at the time of crash. When we say Recovery here we will always +mean "REDO phase". + +For example MARIA_STATUS_INFO::records (count of records). It is updated at +the end of every row write/update/delete/delete_all. When Recovery sees the +sign of such row operation (UNDO or REDO), it may need to update the records' +count if that count does not reflect that operation (is older). How to know +the age of the state compared to the log record: every time the state +goes to disk at runtime, its member "is_of_horizon" is updated to the +current end-of-log horizon. So Recovery just needs to compare is_of_horizon +and the record's LSN to know if it should modify "records". + +Other operations like ALTER TABLE DISABLE KEYS update the state but +don't write log records, thus the REDO phase cannot repeat their +effect on the state in case of crash. But we make them sync the state +as soon as they have finished. This reduces the window for a problem. + +It looks like only one thread at a time updates the state in memory or +on disk. We assume that the upper level (normally MySQL) has protection +against issuing HA_EXTRA_(FORCE_REOPEN|PREPARE_FOR_RENAME) so that these +are not issued while there are any running transactions on the given table. +If this is not done, we may write a corrupted state to disk. + +With checkpoints +================ + +Checkpoint module needs to read the state in memory and write it to +disk. This may happen while some other thread is modifying the state +in memory or on disk. Checkpoint thus may be reading changing data, it +needs a mutex to not have it corrupted, and concurrent modifiers of +the state need that mutex too for the same reason. +"records" is modified for every row write/update/delete, we don't want +to add a mutex lock/unlock there. So we re-use the mutex lock/unlock +which is already present in these moments, namely the log's mutex which is +taken when UNDO_ROW_INSERT|UPDATE|DELETE is written: we update "records" in +under-log-mutex hooks when writing these records (thus "records" is +not updated at the end of maria_write/update/delete() anymore). +Thus Checkpoint takes the log's lock and can read "records" from +memory an write it to disk and release log's lock. +We however want to avoid having the disk write under the log's +lock. So it has to be under another mutex, natural choice is +intern_lock (as Checkpoint needs it anyway to read MARIA_SHARE::kfile, +and as maria_close() takes it too). All state writes to disk are +changed to be protected with intern_lock. +So Checkpoint takes intern_lock, log's lock, reads "records" from +memory, releases log's lock, updates is_of_horizon and writes "records" to +disk, release intern_lock. +In practice, not only "records" needs to be written but the full +state. So, Checkpoint reads the full state from memory. Some other +thread may at this moment be modifying in memory some pieces of the +state which are not protected by the lock's log (see ma_extra.c +HA_EXTRA_NO_KEYS), and Checkpoint would be reading a corrupted state +from memory; to guard against that we extend the intern_lock-zone to +changes done to the state in memory by HA_EXTRA_NO_KEYS et al, and +also any change made in memory to create_rename_lsn/state_is_of_horizon. +Last, we don't want in Checkpoint to do + log lock; read state from memory; release log lock; +for each table, it may hold the log's lock too much in total. +So, we instead do + log lock; read N states from memory; release log lock; +Thus, the sequence above happens outside of any intern_lock. +But this re-introduces the problem that some other thread may be changing the +state in memory and on disk under intern_lock, without log's lock, like +HA_EXTRA_NO_KEYS, while we read the N states. However, when Checkpoint later +comes to handling the table under intern_lock, which is serialized with +HA_EXTRA_NO_KEYS, it can see that is_of_horizon is higher then when the state +was read from memory under log's lock, and thus can decide to not flush the +obsolete state it has, knowing that the other thread flushed a more recent +state already. If on the other hand is_of_horizon is not higher, the read +state is current and can be flushed. So we have a per-table sequence: + lock intern_lock; test if is_of_horizon is higher than when we read the state + under log's lock; if no then flush the read state to disk. +*/ + +/* some comments and pseudo-code which we keep for later */ +#if 0 + /* + MikaelR suggests: support checkpoints during REDO phase too: do checkpoint + after a certain amount of log records have been executed. This helps + against repeated crashes. Those checkpoints could not be user-requested + (as engine is not communicating during the REDO phase), so they would be + automatic: this changes the original assumption that we don't write to the + log while in the REDO phase, but why not. How often should we checkpoint? + */ + + /* + We want to have two steps: + engine->recover_with_max_memory(); + next_engine->recover_with_max_memory(); + engine->init_with_normal_memory(); + next_engine->init_with_normal_memory(); + So: in recover_with_max_memory() allocate a giant page cache, do REDO + phase, then all page cache is flushed and emptied and freed (only retain + small structures like TM): take full checkpoint, which is useful if + next engine crashes in its recovery the next second. + Destroy all shares (maria_close()), then at init_with_normal_memory() we + do this: + */ + + /**** UNDO PHASE *****/ + + /* + Launch one or more threads to do the background rollback. Don't wait for + them to complete their rollback (background rollback; for debugging, we + can have an option which waits). Set a counter (total_of_rollback_threads) + to the number of threads to lauch. + + Note that InnoDB's rollback-in-background works as long as InnoDB is the + last engine to recover, otherwise MySQL will refuse new connections until + the last engine has recovered so it's not "background" from the user's + point of view. InnoDB is near top of sys_table_types so all others + (e.g. BDB) recover after it... So it's really "online rollback" only if + InnoDB is the only engine. + */ + + /* wake up delete/update handler */ + /* tell the TM that it can now accept new transactions */ + + /* + mark that checkpoint requests are now allowed. + */ +#endif |