Diffstat (limited to 'innobase/log')
 -rw-r--r--  innobase/log/Makefile.am      |   24
 -rw-r--r--  innobase/log/log0log.c        | 2781
 -rw-r--r--  innobase/log/log0recv.c       | 2512
 -rw-r--r--  innobase/log/makefilewin      |   10
 -rw-r--r--  innobase/log/trash/log0trsh.c |  648
5 files changed, 5975 insertions(+), 0 deletions(-)
diff --git a/innobase/log/Makefile.am b/innobase/log/Makefile.am new file mode 100644 index 00000000000..3910a25ab1a --- /dev/null +++ b/innobase/log/Makefile.am @@ -0,0 +1,24 @@ +# Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB +# & Innobase Oy +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +include ../include/Makefile.i + +libs_LIBRARIES = liblog.a + +liblog_a_SOURCES = log0log.c log0recv.c + +EXTRA_PROGRAMS = diff --git a/innobase/log/log0log.c b/innobase/log/log0log.c new file mode 100644 index 00000000000..c6fec44d128 --- /dev/null +++ b/innobase/log/log0log.c @@ -0,0 +1,2781 @@ +/****************************************************** +Database log + +(c) 1995-1997 Innobase Oy + +Created 12/9/1995 Heikki Tuuri +*******************************************************/ + +#include "log0log.h" + +#ifdef UNIV_NONINL +#include "log0log.ic" +#endif + +#include "mem0mem.h" +#include "buf0buf.h" +#include "buf0flu.h" +#include "srv0srv.h" +#include "log0recv.h" +#include "fil0fil.h" +#include "dict0boot.h" +#include "srv0srv.h" +#include "trx0sys.h" +#include "trx0trx.h" + +/* Global log system variable */ +log_t* log_sys = NULL; + +ibool log_do_write = TRUE; +ibool log_debug_writes = FALSE; + +/* Pointer to this variable is used as the i/o-message when we do i/o to an +archive */ +byte log_archive_io; + +/* A margin for free space in the log buffer before a log entry is catenated */ +#define LOG_BUF_WRITE_MARGIN (4 * OS_FILE_LOG_BLOCK_SIZE) + +/* Margins for free space in the log buffer after a log entry is catenated */ +#define LOG_BUF_FLUSH_RATIO 2 +#define LOG_BUF_FLUSH_MARGIN (LOG_BUF_WRITE_MARGIN + 4 * UNIV_PAGE_SIZE) + +/* Margin for the free space in the smallest log group, before a new query +step which modifies the database, is started */ + +#define LOG_CHECKPOINT_FREE_PER_THREAD (4 * UNIV_PAGE_SIZE) +#define LOG_CHECKPOINT_EXTRA_FREE (8 * UNIV_PAGE_SIZE) + +/* This parameter controls asynchronous making of a new checkpoint; the value +should be bigger than LOG_POOL_PREFLUSH_RATIO_SYNC */ + +#define LOG_POOL_CHECKPOINT_RATIO_ASYNC 32 + +/* This parameter controls synchronous preflushing of modified buffer pages */ +#define LOG_POOL_PREFLUSH_RATIO_SYNC 16 + +/* The same ratio for asynchronous preflushing; this value should be less than +the previous */ +#define LOG_POOL_PREFLUSH_RATIO_ASYNC 8 + +/* Extra margin, in addition to one log file, used in archiving */ +#define LOG_ARCHIVE_EXTRA_MARGIN (4 * UNIV_PAGE_SIZE) + +/* This parameter controls asynchronous writing to the archive */ +#define LOG_ARCHIVE_RATIO_ASYNC 16 + +/* Codes used in unlocking flush latches */ +#define LOG_UNLOCK_NONE_FLUSHED_LOCK 1 +#define LOG_UNLOCK_FLUSH_LOCK 2 + +/* States of an archiving operation */ +#define LOG_ARCHIVE_READ 1 +#define LOG_ARCHIVE_WRITE 2 + +/********************************************************** +Calculates 
the file count of an lsn within a log group. */ +static +ulint +log_group_calc_lsn_file_count( +/*==========================*/ + /* out: file count within the log group */ + dulint lsn, /* in: lsn, must be within 4 GB of + group->next_block_lsn */ + log_group_t* group); /* in: log group */ +/********************************************************** +Completes a checkpoint write i/o to a log file. */ +static +void +log_io_complete_checkpoint( +/*=======================*/ + log_group_t* group); /* in: log group */ +/********************************************************** +Completes an archiving i/o. */ +static +void +log_io_complete_archive(void); +/*=========================*/ +/******************************************************************** +Tries to establish a big enough margin of free space in the log groups, such +that a new log entry can be catenated without an immediate need for a +archiving. */ +static +void +log_archive_margin(void); +/*====================*/ + + +/******************************************************************** +Returns the oldest modified block lsn in the pool, or log_sys->lsn if none +exists. */ +static +dulint +log_buf_pool_get_oldest_modification(void) +/*======================================*/ +{ + dulint lsn; + + ut_ad(mutex_own(&(log_sys->mutex))); + + lsn = buf_pool_get_oldest_modification(); + + if (ut_dulint_is_zero(lsn)) { + + lsn = log_sys->lsn; + } + + return(lsn); +} + +/**************************************************************** +Opens the log for log_write_low. The log must be closed with log_close and +released with log_release. */ + +dulint +log_reserve_and_open( +/*=================*/ + /* out: start lsn of the log record */ + ulint len) /* in: length of data to be catenated */ +{ + log_t* log = log_sys; + ulint len_upper_limit; + ulint archived_lsn_age; + ulint count = 0; + ulint dummy; +loop: + mutex_enter(&(log->mutex)); + + /* Calculate an upper limit for the space the string may take in the + log buffer */ + + len_upper_limit = LOG_BUF_WRITE_MARGIN + (5 * len) / 4; + + if (log->buf_free + len_upper_limit > log->buf_size) { + + mutex_exit(&(log->mutex)); + + /* Not enough free space, do a syncronous flush of the log + buffer */ + log_flush_up_to(ut_dulint_max, LOG_WAIT_ALL_GROUPS); + + count++; + + ut_ad(count < 50); + + goto loop; + } + + if (log->archiving_state != LOG_ARCH_OFF) { + + archived_lsn_age = ut_dulint_minus(log->lsn, log->archived_lsn); + + if (archived_lsn_age + len_upper_limit + > log->max_archived_lsn_age) { + + /* Not enough free archived space in log groups: do a + synchronous archive write batch: */ + + mutex_exit(&(log->mutex)); + + ut_ad(len_upper_limit <= log->max_archived_lsn_age); + + log_archive_do(TRUE, &dummy); + + count++; + + ut_ad(count < 50); + + goto loop; + } + } + +#ifdef UNIV_LOG_DEBUG + log->old_buf_free = log->buf_free; + log->old_lsn = log->lsn; +#endif + return(log->lsn); +} + +/**************************************************************** +Writes to the log the string given. It is assumed that the caller holds the +log mutex. 
*/ + +void +log_write_low( +/*==========*/ + byte* str, /* in: string */ + ulint str_len) /* in: string length */ +{ + log_t* log = log_sys; + ulint len; + ulint data_len; + byte* log_block; + + ut_ad(mutex_own(&(log->mutex))); +part_loop: + /* Calculate a part length */ + + data_len = (log->buf_free % OS_FILE_LOG_BLOCK_SIZE) + str_len; + + if (data_len <= OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE) { + + /* The string fits within the current log block */ + + len = str_len; + } else { + data_len = OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE; + + len = OS_FILE_LOG_BLOCK_SIZE + - (log->buf_free % OS_FILE_LOG_BLOCK_SIZE) + - LOG_BLOCK_TRL_SIZE; + } + + ut_memcpy(log->buf + log->buf_free, str, len); + + str_len -= len; + str = str + len; + + log_block = ut_align_down(log->buf + log->buf_free, + OS_FILE_LOG_BLOCK_SIZE); + log_block_set_data_len(log_block, data_len); + + if (data_len == OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE) { + /* This block became full */ + log_block_set_data_len(log_block, OS_FILE_LOG_BLOCK_SIZE); + log_block_set_checkpoint_no(log_block, + log_sys->next_checkpoint_no); + len += LOG_BLOCK_HDR_SIZE + LOG_BLOCK_TRL_SIZE; + + log->lsn = ut_dulint_add(log->lsn, len); + + /* Initialize the next block header and trailer */ + log_block_init(log_block + OS_FILE_LOG_BLOCK_SIZE, log->lsn); + } else { + log->lsn = ut_dulint_add(log->lsn, len); + } + + log->buf_free += len; + + ut_ad(log->buf_free <= log->buf_size); + + if (str_len > 0) { + goto part_loop; + } +} + +/**************************************************************** +Closes the log. */ + +dulint +log_close(void) +/*===========*/ + /* out: lsn */ +{ + byte* log_block; + ulint first_rec_group; + dulint oldest_lsn; + dulint lsn; + log_t* log = log_sys; + + ut_ad(mutex_own(&(log->mutex))); + + lsn = log->lsn; + + log_block = ut_align_down(log->buf + log->buf_free, + OS_FILE_LOG_BLOCK_SIZE); + first_rec_group = log_block_get_first_rec_group(log_block); + + if (first_rec_group == 0) { + /* We initialized a new log block which was not written + full by the current mtr: the next mtr log record group + will start within this block at the offset data_len */ + + log_block_set_first_rec_group(log_block, + log_block_get_data_len(log_block)); + } + + if (log->buf_free > log->max_buf_free) { + + log->check_flush_or_checkpoint = TRUE; + } + + if (ut_dulint_minus(lsn, log->last_checkpoint_lsn) + <= log->max_modified_age_async) { + goto function_exit; + } + + oldest_lsn = buf_pool_get_oldest_modification(); + + if (ut_dulint_is_zero(oldest_lsn) + || (ut_dulint_minus(lsn, oldest_lsn) + > log->max_modified_age_async) + || (ut_dulint_minus(lsn, log->last_checkpoint_lsn) + > log->max_checkpoint_age_async)) { + + log->check_flush_or_checkpoint = TRUE; + } +function_exit: + +#ifdef UNIV_LOG_DEBUG + log_check_log_recs(log->buf + log->old_buf_free, + log->buf_free - log->old_buf_free, log->old_lsn); +#endif + + return(lsn); +} + +/********************************************************** +Pads the current log block full with dummy log records. Used in producing +consistent archived log files. 
*/ +static +void +log_pad_current_log_block(void) +/*===========================*/ +{ + byte b = MLOG_DUMMY_RECORD; + ulint pad_length; + ulint i; + dulint lsn; + + log_reserve_and_open(OS_FILE_LOG_BLOCK_SIZE); + + pad_length = OS_FILE_LOG_BLOCK_SIZE + - (log_sys->buf_free % OS_FILE_LOG_BLOCK_SIZE) + - LOG_BLOCK_TRL_SIZE; + + for (i = 0; i < pad_length; i++) { + log_write_low(&b, 1); + } + + lsn = log_sys->lsn; + + log_close(); + log_release(); + + ut_a((ut_dulint_get_low(lsn) % OS_FILE_LOG_BLOCK_SIZE) + == LOG_BLOCK_HDR_SIZE); +} + +/********************************************************** +Calculates the data capacity of a log group, when the log file headers are not +included. */ + +ulint +log_group_get_capacity( +/*===================*/ + /* out: capacity in bytes */ + log_group_t* group) /* in: log group */ +{ + ut_ad(mutex_own(&(log_sys->mutex))); + + return((group->file_size - LOG_FILE_HDR_SIZE) * group->n_files); +} + +/********************************************************** +Calculates the offset within a log group, when the log file headers are not +included. */ +UNIV_INLINE +ulint +log_group_calc_size_offset( +/*=======================*/ + /* out: size offset (<= offset) */ + ulint offset, /* in: real offset within the log group */ + log_group_t* group) /* in: log group */ +{ + ut_ad(mutex_own(&(log_sys->mutex))); + + return(offset - LOG_FILE_HDR_SIZE * (1 + offset / group->file_size)); +} + +/********************************************************** +Calculates the offset within a log group, when the log file headers are +included. */ +UNIV_INLINE +ulint +log_group_calc_real_offset( +/*=======================*/ + /* out: real offset (>= offset) */ + ulint offset, /* in: size offset within the log group */ + log_group_t* group) /* in: log group */ +{ + ut_ad(mutex_own(&(log_sys->mutex))); + + return(offset + LOG_FILE_HDR_SIZE + * (1 + offset / (group->file_size - LOG_FILE_HDR_SIZE))); +} + +/********************************************************** +Calculates the offset of an lsn within a log group. */ +static +ulint +log_group_calc_lsn_offset( +/*======================*/ + /* out: offset within the log group */ + dulint lsn, /* in: lsn, must be within 4 GB of group->lsn */ + log_group_t* group) /* in: log group */ +{ + dulint gr_lsn; + ulint gr_lsn_size_offset; + ulint difference; + ulint group_size; + ulint offset; + + ut_ad(mutex_own(&(log_sys->mutex))); + + gr_lsn = group->lsn; + + gr_lsn_size_offset = log_group_calc_size_offset(group->lsn_offset, + group); + group_size = log_group_get_capacity(group); + + if (ut_dulint_cmp(lsn, gr_lsn) >= 0) { + + difference = ut_dulint_minus(lsn, gr_lsn); + } else { + difference = ut_dulint_minus(gr_lsn, lsn); + + difference = difference % group_size; + + difference = group_size - difference; + } + + offset = (gr_lsn_size_offset + difference) % group_size; + + return(log_group_calc_real_offset(offset, group)); +} + +/************************************************************ +Sets the field values in group to correspond to a given lsn. For this function +to work, the values must already be correctly initialized to correspond to +some lsn, for instance, a checkpoint lsn. 
*/ + +void +log_group_set_fields( +/*=================*/ + log_group_t* group, /* in: group */ + dulint lsn) /* in: lsn for which the values should be + set */ +{ + group->lsn_offset = log_group_calc_lsn_offset(lsn, group); + group->lsn = lsn; +} + +/********************************************************************* +Calculates the recommended highest values for lsn - last_checkpoint_lsn, +lsn - buf_get_oldest_modification(), and lsn - max_archive_lsn_age. */ +static +ibool +log_calc_max_ages(void) +/*===================*/ + /* out: error value FALSE if the smallest log group is + too small to accommodate the number of OS threads in + the database server */ +{ + log_group_t* group; + ulint n_threads; + ulint margin; + ulint free; + ibool success = TRUE; + ulint smallest_capacity; + ulint archive_margin; + ulint smallest_archive_margin; + + ut_ad(!mutex_own(&(log_sys->mutex))); + + n_threads = srv_get_n_threads(); + + mutex_enter(&(log_sys->mutex)); + + group = UT_LIST_GET_FIRST(log_sys->log_groups); + + ut_ad(group); + + smallest_capacity = ULINT_MAX; + smallest_archive_margin = ULINT_MAX; + + while (group) { + if (log_group_get_capacity(group) < smallest_capacity) { + + smallest_capacity = log_group_get_capacity(group); + } + + archive_margin = log_group_get_capacity(group) + - (group->file_size - LOG_FILE_HDR_SIZE) + - LOG_ARCHIVE_EXTRA_MARGIN; + + if (archive_margin < smallest_archive_margin) { + + smallest_archive_margin = archive_margin; + } + + group = UT_LIST_GET_NEXT(log_groups, group); + } + + /* For each OS thread we must reserve so much free space in the + smallest log group that it can accommodate the log entries produced + by single query steps: running out of free log space is a serious + system error which requires rebooting the database. */ + + free = LOG_CHECKPOINT_FREE_PER_THREAD * n_threads + + LOG_CHECKPOINT_EXTRA_FREE; + if (free >= smallest_capacity / 2) { + success = FALSE; + } else { + margin = smallest_capacity - free; + } + + margin = ut_min(margin, log_sys->adm_checkpoint_interval); + + log_sys->max_modified_age_async = margin + - margin / LOG_POOL_PREFLUSH_RATIO_ASYNC; + log_sys->max_modified_age_sync = margin + - margin / LOG_POOL_PREFLUSH_RATIO_SYNC; + + log_sys->max_checkpoint_age_async = margin - margin + / LOG_POOL_CHECKPOINT_RATIO_ASYNC; + log_sys->max_checkpoint_age = margin; + + log_sys->max_archived_lsn_age = smallest_archive_margin; + + log_sys->max_archived_lsn_age_async = smallest_archive_margin + - smallest_archive_margin / + LOG_ARCHIVE_RATIO_ASYNC; + mutex_exit(&(log_sys->mutex)); + + if (!success) { + printf( + "Error: log file group too small for the number of threads\n"); + } + + return(success); +} + +/********************************************************** +Initializes the log. 
*/ + +void +log_init(void) +/*==========*/ +{ + byte* buf; + + log_sys = mem_alloc(sizeof(log_t)); + + mutex_create(&(log_sys->mutex)); + mutex_set_level(&(log_sys->mutex), SYNC_LOG); + + mutex_enter(&(log_sys->mutex)); + + /* Start the lsn from one log block from zero: this way every + log record has a start lsn != zero, a fact which we will use */ + + log_sys->lsn = LOG_START_LSN; + + ut_a(LOG_BUFFER_SIZE >= 16 * OS_FILE_LOG_BLOCK_SIZE); + ut_a(LOG_BUFFER_SIZE >= 4 * UNIV_PAGE_SIZE); + + buf = ut_malloc(LOG_BUFFER_SIZE + OS_FILE_LOG_BLOCK_SIZE); + log_sys->buf = ut_align(buf, OS_FILE_LOG_BLOCK_SIZE); + + log_sys->buf_size = LOG_BUFFER_SIZE; + log_sys->max_buf_free = log_sys->buf_size / LOG_BUF_FLUSH_RATIO + - LOG_BUF_FLUSH_MARGIN; + log_sys->check_flush_or_checkpoint = TRUE; + UT_LIST_INIT(log_sys->log_groups); + + log_sys->n_log_ios = 0; + + /*----------------------------*/ + + log_sys->buf_next_to_write = 0; + + log_sys->written_to_some_lsn = log_sys->lsn; + log_sys->written_to_all_lsn = log_sys->lsn; + + log_sys->n_pending_writes = 0; + + log_sys->no_flush_event = os_event_create(NULL); + + os_event_set(log_sys->no_flush_event); + + log_sys->one_flushed_event = os_event_create(NULL); + + os_event_set(log_sys->one_flushed_event); + + /*----------------------------*/ + log_sys->adm_checkpoint_interval = ULINT_MAX; + + log_sys->next_checkpoint_no = ut_dulint_zero; + log_sys->last_checkpoint_lsn = log_sys->lsn; + log_sys->n_pending_checkpoint_writes = 0; + + rw_lock_create(&(log_sys->checkpoint_lock)); + rw_lock_set_level(&(log_sys->checkpoint_lock), SYNC_NO_ORDER_CHECK); + + log_sys->checkpoint_buf = ut_align( + mem_alloc(2 * OS_FILE_LOG_BLOCK_SIZE), + OS_FILE_LOG_BLOCK_SIZE); + /*----------------------------*/ + + log_sys->archiving_state = LOG_ARCH_ON; + log_sys->archived_lsn = log_sys->lsn; + + log_sys->n_pending_archive_ios = 0; + + rw_lock_create(&(log_sys->archive_lock)); + rw_lock_set_level(&(log_sys->archive_lock), SYNC_NO_ORDER_CHECK); + + log_sys->archive_buf = ut_align( + ut_malloc(LOG_ARCHIVE_BUF_SIZE + + OS_FILE_LOG_BLOCK_SIZE), + OS_FILE_LOG_BLOCK_SIZE); + log_sys->archive_buf_size = LOG_ARCHIVE_BUF_SIZE; + + log_sys->archiving_on = os_event_create(NULL); + + /*----------------------------*/ + + log_sys->online_backup_state = FALSE; + + /*----------------------------*/ + + log_block_init(log_sys->buf, log_sys->lsn); + log_block_set_first_rec_group(log_sys->buf, LOG_BLOCK_HDR_SIZE); + + log_sys->buf_free = LOG_BLOCK_HDR_SIZE; + log_sys->lsn = ut_dulint_add(LOG_START_LSN, LOG_BLOCK_HDR_SIZE); + + mutex_exit(&(log_sys->mutex)); + +#ifdef UNIV_LOG_DEBUG + recv_sys_create(); + recv_sys_init(); + + recv_sys->parse_start_lsn = log_sys->lsn; + recv_sys->scanned_lsn = log_sys->lsn; + recv_sys->scanned_checkpoint_no = 0; + recv_sys->recovered_lsn = log_sys->lsn; + recv_sys->limit_lsn = ut_dulint_max; +#endif +} + +/********************************************************************** +Inits a log group to the log system. 
*/ + +void +log_group_init( +/*===========*/ + ulint id, /* in: group id */ + ulint n_files, /* in: number of log files */ + ulint file_size, /* in: log file size in bytes */ + ulint space_id, /* in: space id of the file space + which contains the log files of this + group */ + ulint archive_space_id) /* in: space id of the file space + which contains some archived log + files for this group; currently, only + for the first log group this is + used */ +{ + ulint i; + + log_group_t* group; + + group = mem_alloc(sizeof(log_group_t)); + + group->id = id; + group->n_files = n_files; + group->file_size = file_size; + group->space_id = space_id; + group->state = LOG_GROUP_OK; + group->lsn = LOG_START_LSN; + group->lsn_offset = LOG_FILE_HDR_SIZE; + group->n_pending_writes = 0; + + group->file_header_bufs = mem_alloc(sizeof(byte*) * n_files); + group->archive_file_header_bufs = mem_alloc(sizeof(byte*) * n_files); + + for (i = 0; i < n_files; i++) { + *(group->file_header_bufs + i) = ut_align( + mem_alloc(LOG_FILE_HDR_SIZE + OS_FILE_LOG_BLOCK_SIZE), + OS_FILE_LOG_BLOCK_SIZE); + *(group->archive_file_header_bufs + i) = ut_align( + mem_alloc(LOG_FILE_HDR_SIZE + OS_FILE_LOG_BLOCK_SIZE), + OS_FILE_LOG_BLOCK_SIZE); + } + + group->archive_space_id = archive_space_id; + + group->archived_file_no = 0; + group->archived_offset = 0; + + group->checkpoint_buf = ut_align( + mem_alloc(2 * OS_FILE_LOG_BLOCK_SIZE), + OS_FILE_LOG_BLOCK_SIZE); + + UT_LIST_ADD_LAST(log_groups, log_sys->log_groups, group); + + ut_a(log_calc_max_ages()); +} + +/********************************************************************** +Does the unlockings needed in flush i/o completion. */ +UNIV_INLINE +void +log_flush_do_unlocks( +/*=================*/ + ulint code) /* in: any ORed combination of LOG_UNLOCK_FLUSH_LOCK + and LOG_UNLOCK_NONE_FLUSHED_LOCK */ +{ + ut_ad(mutex_own(&(log_sys->mutex))); + + /* NOTE that we must own the log mutex when doing the setting of the + events: this is because transactions will wait for these events to + be set, and at that moment the log flush they were waiting for must + have ended. If the log mutex were not reserved here, the i/o-thread + calling this function might be preempted for a while, and when it + resumed execution, it might be that a new flush had been started, and + this function would erroneously signal the NEW flush as completed. + Thus, the changes in the state of these events are performed + atomically in conjunction with the changes in the state of + log_sys->n_pending_writes etc. */ + + if (code & LOG_UNLOCK_NONE_FLUSHED_LOCK) { + os_event_set(log_sys->one_flushed_event); + } + + if (code & LOG_UNLOCK_FLUSH_LOCK) { + os_event_set(log_sys->no_flush_event); + } +} + +/********************************************************************** +Checks if a flush is completed for a log group and does the completion +routine if yes. 
*/ +UNIV_INLINE +ulint +log_group_check_flush_completion( +/*=============================*/ + /* out: LOG_UNLOCK_NONE_FLUSHED_LOCK or 0 */ + log_group_t* group) /* in: log group */ +{ + ut_ad(mutex_own(&(log_sys->mutex))); + + if (!log_sys->one_flushed && (group->n_pending_writes == 0)) { + + if (log_debug_writes) { + printf("Log flushed first to group %lu\n", group->id); + } + + log_sys->written_to_some_lsn = log_sys->flush_lsn; + log_sys->one_flushed = TRUE; + + return(LOG_UNLOCK_NONE_FLUSHED_LOCK); + } + + if (log_debug_writes && (group->n_pending_writes == 0)) { + + printf("Log flushed to group %lu\n", group->id); + } + + return(0); +} + +/********************************************************** +Checks if a flush is completed and does the completion routine if yes. */ +static +ulint +log_sys_check_flush_completion(void) +/*================================*/ + /* out: LOG_UNLOCK_FLUSH_LOCK or 0 */ +{ + ulint move_start; + ulint move_end; + + ut_ad(mutex_own(&(log_sys->mutex))); + + if (log_sys->n_pending_writes == 0) { + + log_sys->written_to_all_lsn = log_sys->flush_lsn; + log_sys->buf_next_to_write = log_sys->flush_end_offset; + + if (log_sys->flush_end_offset > log_sys->max_buf_free / 2) { + /* Move the log buffer content to the start of the + buffer */ + + move_start = ut_calc_align_down( + log_sys->flush_end_offset, + OS_FILE_LOG_BLOCK_SIZE); + move_end = ut_calc_align(log_sys->buf_free, + OS_FILE_LOG_BLOCK_SIZE); + + ut_memmove(log_sys->buf, log_sys->buf + move_start, + move_end - move_start); + log_sys->buf_free -= move_start; + + log_sys->buf_next_to_write -= move_start; + } + + return(LOG_UNLOCK_FLUSH_LOCK); + } + + return(0); +} + +/********************************************************** +Completes an i/o to a log file. */ + +void +log_io_complete( +/*============*/ + log_group_t* group) /* in: log group or a dummy pointer */ +{ + ulint unlock; + + if ((byte*)group == &log_archive_io) { + /* It was an archive write */ + + log_io_complete_archive(); + + return; + } + + if ((ulint)group & 0x1) { + /* It was a checkpoint write */ + group = (log_group_t*)((ulint)group - 1); + + fil_flush(group->space_id); + + log_io_complete_checkpoint(group); + + return; + } + + fil_flush(group->space_id); + + mutex_enter(&(log_sys->mutex)); + + ut_ad(group->n_pending_writes > 0); + ut_ad(log_sys->n_pending_writes > 0); + + group->n_pending_writes--; + log_sys->n_pending_writes--; + + unlock = log_group_check_flush_completion(group); + unlock = unlock | log_sys_check_flush_completion(); + + log_flush_do_unlocks(unlock); + + mutex_exit(&(log_sys->mutex)); +} + +/********************************************************** +Writes a log file header to a log file space. 
*/ +static +void +log_group_file_header_flush( +/*========================*/ + ulint type, /* in: LOG_FLUSH or LOG_RECOVER */ + log_group_t* group, /* in: log group */ + ulint nth_file, /* in: header to the nth file in the + log file space */ + dulint start_lsn) /* in: log file data starts at this + lsn */ +{ + byte* buf; + ulint dest_offset; + ibool sync; + + ut_ad(mutex_own(&(log_sys->mutex))); + + ut_a(nth_file < group->n_files); + + buf = *(group->file_header_bufs + nth_file); + + mach_write_to_4(buf + LOG_GROUP_ID, group->id); + mach_write_to_8(buf + LOG_FILE_START_LSN, start_lsn); + + dest_offset = nth_file * group->file_size; + + sync = FALSE; + + if (type == LOG_RECOVER) { + + sync = TRUE; + } + + if (log_debug_writes) { + printf( + "Writing log file header to group %lu file %lu\n", group->id, + nth_file); + } + + if (log_do_write) { + if (type == LOG_FLUSH) { + log_sys->n_pending_writes++; + group->n_pending_writes++; + } + + log_sys->n_log_ios++; + + fil_io(OS_FILE_WRITE | OS_FILE_LOG, sync, group->space_id, + dest_offset / UNIV_PAGE_SIZE, + dest_offset % UNIV_PAGE_SIZE, + OS_FILE_LOG_BLOCK_SIZE, + buf, group); + } +} + +/********************************************************** +Writes a buffer to a log file group. */ + +void +log_group_write_buf( +/*================*/ + ulint type, /* in: LOG_FLUSH or LOG_RECOVER */ + log_group_t* group, /* in: log group */ + byte* buf, /* in: buffer */ + ulint len, /* in: buffer len; must be divisible + by OS_FILE_LOG_BLOCK_SIZE */ + dulint start_lsn, /* in: start lsn of the buffer; must + be divisible by + OS_FILE_LOG_BLOCK_SIZE */ + ulint new_data_offset)/* in: start offset of new data in + buf: this parameter is used to decide + if we have to write a new log file + header */ +{ + ulint write_len; + ibool sync; + ibool write_header; + ulint next_offset; + + ut_ad(mutex_own(&(log_sys->mutex))); + ut_ad(len % OS_FILE_LOG_BLOCK_SIZE == 0); + ut_ad(ut_dulint_get_low(start_lsn) % OS_FILE_LOG_BLOCK_SIZE == 0); + + sync = FALSE; + + if (type == LOG_RECOVER) { + + sync = TRUE; + } + + if (new_data_offset == 0) { + write_header = TRUE; + } else { + write_header = FALSE; + } +loop: + if (len == 0) { + + return; + } + + next_offset = log_group_calc_lsn_offset(start_lsn, group); + + if ((next_offset % group->file_size == LOG_FILE_HDR_SIZE) + && write_header) { + /* We start to write a new log file instance in the group */ + + log_group_file_header_flush(type, group, + next_offset / group->file_size, start_lsn); + } + + if ((next_offset % group->file_size) + len > group->file_size) { + + write_len = group->file_size - (next_offset % group->file_size); + } else { + write_len = len; + } + + if (log_debug_writes) { + printf( + "Writing log file segment to group %lu offset %lu len %lu\n", + group->id, next_offset, write_len); + } + + if (log_do_write) { + if (type == LOG_FLUSH) { + log_sys->n_pending_writes++; + group->n_pending_writes++; + } + + log_sys->n_log_ios++; + + fil_io(OS_FILE_WRITE | OS_FILE_LOG, sync, group->space_id, + next_offset / UNIV_PAGE_SIZE, + next_offset % UNIV_PAGE_SIZE, write_len, buf, group); + } + + if (write_len < len) { + start_lsn = ut_dulint_add(start_lsn, write_len); + len -= write_len; + buf += write_len; + + write_header = TRUE; + + goto loop; + } +} + +/********************************************************** +This function is called, e.g., when a transaction wants to commit. It checks +that the log has been flushed to disk up to the last log entry written by the +transaction. 
If there is a flush running, it waits and checks if the flush +flushed enough. If not, starts a new flush. */ + +void +log_flush_up_to( +/*============*/ + dulint lsn, /* in: log sequence number up to which the log should + be flushed, ut_dulint_max if not specified */ + ulint wait) /* in: LOG_NO_WAIT, LOG_WAIT_ONE_GROUP, + or LOG_WAIT_ALL_GROUPS */ +{ + log_group_t* group; + ulint start_offset; + ulint end_offset; + ulint area_start; + ulint area_end; + ulint loop_count; + + if (recv_no_ibuf_operations) { + /* Recovery is running and no operations on the log files are + allowed yet (the variable name .._no_ibuf_.. is misleading) */ + + return; + } + + loop_count = 0; +loop: + loop_count++; + + ut_ad(loop_count < 5); + + if (loop_count > 2) { +/* printf("Log loop count %lu\n", loop_count); */ + } + + mutex_enter(&(log_sys->mutex)); + + if ((ut_dulint_cmp(log_sys->written_to_all_lsn, lsn) >= 0) + || ((ut_dulint_cmp(log_sys->written_to_some_lsn, lsn) >= 0) + && (wait != LOG_WAIT_ALL_GROUPS))) { + + mutex_exit(&(log_sys->mutex)); + + return; + } + + if (log_sys->n_pending_writes > 0) { + /* A flush is running */ + + if (ut_dulint_cmp(log_sys->flush_lsn, lsn) >= 0) { + /* The flush will flush enough: wait for it to + complete */ + + goto do_waits; + } + + mutex_exit(&(log_sys->mutex)); + + /* Wait for the flush to complete and try to start a new + flush */ + + os_event_wait(log_sys->no_flush_event); + + goto loop; + } + + if (log_sys->buf_free == log_sys->buf_next_to_write) { + /* Nothing to flush */ + + mutex_exit(&(log_sys->mutex)); + + return; + } + + if (log_debug_writes) { + printf("Flushing log from %lu %lu up to lsn %lu %lu\n", + ut_dulint_get_high(log_sys->written_to_all_lsn), + ut_dulint_get_low(log_sys->written_to_all_lsn), + ut_dulint_get_high(log_sys->lsn), + ut_dulint_get_low(log_sys->lsn)); + } + + os_event_reset(log_sys->no_flush_event); + os_event_reset(log_sys->one_flushed_event); + + start_offset = log_sys->buf_next_to_write; + end_offset = log_sys->buf_free; + + area_start = ut_calc_align_down(start_offset, OS_FILE_LOG_BLOCK_SIZE); + area_end = ut_calc_align(end_offset, OS_FILE_LOG_BLOCK_SIZE); + + ut_ad(area_end - area_start > 0); + + log_sys->flush_lsn = log_sys->lsn; + log_sys->one_flushed = FALSE; + + log_block_set_flush_bit(log_sys->buf + area_start, TRUE); + log_block_set_checkpoint_no( + log_sys->buf + area_end - OS_FILE_LOG_BLOCK_SIZE, + log_sys->next_checkpoint_no); + + /* Copy the last, incompletely written, log block a log block length + up, so that when the flush operation writes from the log buffer, the + segment to write will not be changed by writers to the log */ + + ut_memcpy(log_sys->buf + area_end, + log_sys->buf + area_end - OS_FILE_LOG_BLOCK_SIZE, + OS_FILE_LOG_BLOCK_SIZE); + + log_sys->buf_free += OS_FILE_LOG_BLOCK_SIZE; + log_sys->flush_end_offset = log_sys->buf_free; + + group = UT_LIST_GET_FIRST(log_sys->log_groups); + + while (group) { + log_group_write_buf(LOG_FLUSH, group, + log_sys->buf + area_start, + area_end - area_start, + ut_dulint_align_down(log_sys->written_to_all_lsn, + OS_FILE_LOG_BLOCK_SIZE), + start_offset - area_start); + + log_group_set_fields(group, log_sys->flush_lsn); + + group = UT_LIST_GET_NEXT(log_groups, group); + } + +do_waits: + mutex_exit(&(log_sys->mutex)); + + if (wait == LOG_WAIT_ONE_GROUP) { + os_event_wait(log_sys->one_flushed_event); + } else if (wait == LOG_WAIT_ALL_GROUPS) { + os_event_wait(log_sys->no_flush_event); + } else { + ut_ad(wait == LOG_NO_WAIT); + } +} + 
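The offset arithmetic used above (log_group_calc_size_offset, log_group_calc_real_offset and log_group_calc_lsn_offset) maps an lsn to a byte position in a circular group of log files, skipping the LOG_FILE_HDR_SIZE header at the start of each file. The same arithmetic can be exercised in isolation; the sketch below mirrors it with plain 64-bit integers in place of dulint, and FILE_HDR_SIZE, FILE_SIZE and N_FILES are invented illustration values, not server defaults.

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#define FILE_HDR_SIZE	2048ULL			/* stand-in for LOG_FILE_HDR_SIZE */
#define FILE_SIZE	(1024ULL * 1024ULL)	/* one log file, header included */
#define N_FILES		3ULL

/* Offset with the per-file headers stripped out (<= real offset). */
static uint64_t calc_size_offset(uint64_t real)
{
	return(real - FILE_HDR_SIZE * (1 + real / FILE_SIZE));
}

/* Inverse mapping: puts the per-file headers back in (>= size offset). */
static uint64_t calc_real_offset(uint64_t size)
{
	return(size + FILE_HDR_SIZE
	       * (1 + size / (FILE_SIZE - FILE_HDR_SIZE)));
}

/* Offset of an lsn, given a known (lsn, offset) fix point of the group;
wraps around the group capacity the way log_group_calc_lsn_offset does. */
static uint64_t calc_lsn_offset(uint64_t lsn, uint64_t gr_lsn,
				uint64_t gr_lsn_offset)
{
	uint64_t capacity = (FILE_SIZE - FILE_HDR_SIZE) * N_FILES;
	uint64_t base = calc_size_offset(gr_lsn_offset);
	uint64_t diff;

	if (lsn >= gr_lsn) {
		diff = (lsn - gr_lsn) % capacity;
	} else {
		diff = capacity - (gr_lsn - lsn) % capacity;
	}

	return(calc_real_offset((base + diff) % capacity));
}

int main(void)
{
	/* Fix point: lsn 8192 sits just past the first file header. */
	uint64_t gr_lsn = 8192, gr_lsn_offset = FILE_HDR_SIZE;

	/* Round trip: stripping and re-adding headers is lossless. */
	assert(calc_size_offset(calc_real_offset(123456)) == 123456);

	printf("lsn %llu -> real offset %llu\n",
	       (unsigned long long)(gr_lsn + 4096),
	       (unsigned long long)calc_lsn_offset(gr_lsn + 4096, gr_lsn,
						   gr_lsn_offset));
	return(0);
}

With these invented sizes, writing 4096 bytes past the fix point lands at real offset 6144, i.e. the 2048-byte header of file 0 plus 4096 bytes of data, and an lsn exactly one group capacity ahead maps back to the same offset, which is the wrap-around the circular group relies on.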
+/******************************************************************** +Tries to establish a big enough margin of free space in the log buffer, such +that a new log entry can be catenated without an immediate need for a flush. */ +static +void +log_flush_margin(void) +/*==================*/ +{ + ibool do_flush = FALSE; + log_t* log = log_sys; + + mutex_enter(&(log->mutex)); + + if (log->buf_free > log->max_buf_free) { + + if (log->n_pending_writes > 0) { + /* A flush is running: hope that it will provide enough + free space */ + } else { + do_flush = TRUE; + } + } + + mutex_exit(&(log->mutex)); + + if (do_flush) { + log_flush_up_to(ut_dulint_max, LOG_NO_WAIT); + } +} + +/******************************************************************** +Advances the smallest lsn for which there are unflushed dirty blocks in the +buffer pool. NOTE: this function may only be called if the calling thread owns +no synchronization objects! */ + +ibool +log_preflush_pool_modified_pages( +/*=============================*/ + /* out: FALSE if there was a flush batch of + the same type running, which means that we + could not start this flush batch */ + dulint new_oldest, /* in: try to advance oldest_modified_lsn + at least to this lsn */ + ibool sync) /* in: TRUE if synchronous operation is + desired */ +{ + ulint n_pages; + + if (recv_recovery_on) { + /* If the recovery is running, we must first apply all + log records to their respective file pages to get the + right modify lsn values to these pages: otherwise, there + might be pages on disk which are not yet recovered to the + current lsn, and even after calling this function, we could + not know how up-to-date the disk version of the database is, + and we could not make a new checkpoint on the basis of the + info on the buffer pool only. */ + + recv_apply_hashed_log_recs(TRUE); + } + + n_pages = buf_flush_batch(BUF_FLUSH_LIST, ULINT_MAX, new_oldest); + + if (sync) { + buf_flush_wait_batch_end(BUF_FLUSH_LIST); + } + + if (n_pages == ULINT_UNDEFINED) { + + return(FALSE); + } + + return(TRUE); +} + +/********************************************************** +Completes a checkpoint. */ +static +void +log_complete_checkpoint(void) +/*=========================*/ +{ + ut_ad(mutex_own(&(log_sys->mutex))); + ut_ad(log_sys->n_pending_checkpoint_writes == 0); + + log_sys->next_checkpoint_no + = ut_dulint_add(log_sys->next_checkpoint_no, 1); + + log_sys->last_checkpoint_lsn = log_sys->next_checkpoint_lsn; + + rw_lock_x_unlock_gen(&(log_sys->checkpoint_lock), LOG_CHECKPOINT); +} + +/********************************************************** +Completes an asynchronous checkpoint info write i/o to a log file. */ +static +void +log_io_complete_checkpoint( +/*=======================*/ + log_group_t* group) /* in: log group */ +{ + mutex_enter(&(log_sys->mutex)); + + ut_ad(log_sys->n_pending_checkpoint_writes > 0); + + log_sys->n_pending_checkpoint_writes--; + + if (log_debug_writes) { + printf("Checkpoint info written to group %lu\n", group->id); + } + + if (log_sys->n_pending_checkpoint_writes == 0) { + log_complete_checkpoint(); + } + + mutex_exit(&(log_sys->mutex)); +} + +/*********************************************************************** +Writes info to a checkpoint about a log group. 
*/ +static +void +log_checkpoint_set_nth_group_info( +/*==============================*/ + byte* buf, /* in: buffer for checkpoint info */ + ulint n, /* in: nth slot */ + ulint file_no,/* in: archived file number */ + ulint offset) /* in: archived file offset */ +{ + ut_ad(n < LOG_MAX_N_GROUPS); + + mach_write_to_4(buf + LOG_CHECKPOINT_GROUP_ARRAY + + 8 * n + LOG_CHECKPOINT_ARCHIVED_FILE_NO, file_no); + mach_write_to_4(buf + LOG_CHECKPOINT_GROUP_ARRAY + + 8 * n + LOG_CHECKPOINT_ARCHIVED_OFFSET, offset); +} + +/*********************************************************************** +Gets info from a checkpoint about a log group. */ + +void +log_checkpoint_get_nth_group_info( +/*==============================*/ + byte* buf, /* in: buffer containing checkpoint info */ + ulint n, /* in: nth slot */ + ulint* file_no,/* out: archived file number */ + ulint* offset) /* out: archived file offset */ +{ + ut_ad(n < LOG_MAX_N_GROUPS); + + *file_no = mach_read_from_4(buf + LOG_CHECKPOINT_GROUP_ARRAY + + 8 * n + LOG_CHECKPOINT_ARCHIVED_FILE_NO); + *offset = mach_read_from_4(buf + LOG_CHECKPOINT_GROUP_ARRAY + + 8 * n + LOG_CHECKPOINT_ARCHIVED_OFFSET); +} + +/********************************************************** +Writes the checkpoint info to a log group header. */ +static +void +log_group_checkpoint( +/*=================*/ + log_group_t* group) /* in: log group */ +{ + log_group_t* group2; + dulint archived_lsn; + dulint next_archived_lsn; + ulint write_offset; + ulint fold; + byte* buf; + ulint i; + + ut_ad(mutex_own(&(log_sys->mutex))); + ut_ad(LOG_CHECKPOINT_SIZE <= OS_FILE_LOG_BLOCK_SIZE); + + buf = group->checkpoint_buf; + + mach_write_to_8(buf + LOG_CHECKPOINT_NO, log_sys->next_checkpoint_no); + mach_write_to_8(buf + LOG_CHECKPOINT_LSN, log_sys->next_checkpoint_lsn); + + mach_write_to_4(buf + LOG_CHECKPOINT_OFFSET, + log_group_calc_lsn_offset( + log_sys->next_checkpoint_lsn, group)); + + mach_write_to_4(buf + LOG_CHECKPOINT_LOG_BUF_SIZE, log_sys->buf_size); + + if (log_sys->archiving_state == LOG_ARCH_OFF) { + archived_lsn = ut_dulint_max; + } else { + archived_lsn = log_sys->archived_lsn; + + if (0 != ut_dulint_cmp(archived_lsn, + log_sys->next_archived_lsn)) { + next_archived_lsn = log_sys->next_archived_lsn; + /* For debugging only */ + } + } + + mach_write_to_8(buf + LOG_CHECKPOINT_ARCHIVED_LSN, archived_lsn); + + for (i = 0; i < LOG_MAX_N_GROUPS; i++) { + log_checkpoint_set_nth_group_info(buf, i, 0, 0); + } + + group2 = UT_LIST_GET_FIRST(log_sys->log_groups); + + while (group2) { + log_checkpoint_set_nth_group_info(buf, group2->id, + group2->archived_file_no, + group2->archived_offset); + + group2 = UT_LIST_GET_NEXT(log_groups, group2); + } + + fold = ut_fold_binary(buf, LOG_CHECKPOINT_CHECKSUM_1); + mach_write_to_4(buf + LOG_CHECKPOINT_CHECKSUM_1, fold); + + fold = ut_fold_binary(buf + LOG_CHECKPOINT_LSN, + LOG_CHECKPOINT_CHECKSUM_2 - LOG_CHECKPOINT_LSN); + mach_write_to_4(buf + LOG_CHECKPOINT_CHECKSUM_2, fold); + + /* We alternate the physical place of the checkpoint info in the first + log file */ + + if (ut_dulint_get_low(log_sys->next_checkpoint_no) % 2 == 0) { + write_offset = LOG_CHECKPOINT_1; + } else { + write_offset = LOG_CHECKPOINT_2; + } + + if (log_do_write) { + if (log_sys->n_pending_checkpoint_writes == 0) { + + rw_lock_x_lock_gen(&(log_sys->checkpoint_lock), + LOG_CHECKPOINT); + } + + log_sys->n_pending_checkpoint_writes++; + + log_sys->n_log_ios++; + + /* We send as the last parameter the group machine address + added with 1, as we want to distinguish between a normal 
log + file write and a checkpoint field write */ + + fil_io(OS_FILE_WRITE | OS_FILE_LOG, FALSE, group->space_id, + write_offset / UNIV_PAGE_SIZE, + write_offset % UNIV_PAGE_SIZE, + OS_FILE_LOG_BLOCK_SIZE, + buf, ((byte*)group + 1)); + + ut_ad(((ulint)group & 0x1) == 0); + } +} + +/********************************************************** +Reads a checkpoint info from a log group header to log_sys->checkpoint_buf. */ + +void +log_group_read_checkpoint_info( +/*===========================*/ + log_group_t* group, /* in: log group */ + ulint field) /* in: LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2 */ +{ + ut_ad(mutex_own(&(log_sys->mutex))); + + log_sys->n_log_ios++; + + fil_io(OS_FILE_READ | OS_FILE_LOG, TRUE, group->space_id, + field / UNIV_PAGE_SIZE, field % UNIV_PAGE_SIZE, + OS_FILE_LOG_BLOCK_SIZE, log_sys->checkpoint_buf, NULL); +} + +/********************************************************** +Writes checkpoint info to groups. */ + +void +log_groups_write_checkpoint_info(void) +/*==================================*/ +{ + log_group_t* group; + + ut_ad(mutex_own(&(log_sys->mutex))); + + group = UT_LIST_GET_FIRST(log_sys->log_groups); + + while (group) { + log_group_checkpoint(group); + + group = UT_LIST_GET_NEXT(log_groups, group); + } +} + +/********************************************************** +Makes a checkpoint. Note that this function does not flush dirty +blocks from the buffer pool: it only checks what is lsn of the oldest +modification in the pool, and writes information about the lsn in +log files. Use log_make_checkpoint_at to flush also the pool. */ + +ibool +log_checkpoint( +/*===========*/ + /* out: TRUE if success, FALSE if a checkpoint + write was already running */ + ibool sync, /* in: TRUE if synchronous operation is + desired */ + ibool write_always) /* in: the function normally checks if the + the new checkpoint would have a greater + lsn than the previous one: if not, then no + physical write is done; by setting this + parameter TRUE, a physical write will always be + made to log files */ +{ + dulint oldest_lsn; + + if (recv_recovery_is_on()) { + recv_apply_hashed_log_recs(TRUE); + } + + fil_flush_file_spaces(FIL_TABLESPACE); + + mutex_enter(&(log_sys->mutex)); + + oldest_lsn = log_buf_pool_get_oldest_modification(); + + mutex_exit(&(log_sys->mutex)); + + /* Because log also contains headers and dummy log records, + if the buffer pool contains no dirty buffers, oldest_lsn + gets the value log_sys->lsn from the previous function, + and we must make sure that the log is flushed up to that + lsn. If there are dirty buffers in the buffer pool, then our + write-ahead-logging algorithm ensures that the log has been flushed + up to oldest_lsn. 
*/ + + log_flush_up_to(oldest_lsn, LOG_WAIT_ALL_GROUPS); + + mutex_enter(&(log_sys->mutex)); + + if (!write_always && ut_dulint_cmp( + log_sys->last_checkpoint_lsn, oldest_lsn) >= 0) { + + mutex_exit(&(log_sys->mutex)); + + return(TRUE); + } + + ut_ad(ut_dulint_cmp(log_sys->written_to_all_lsn, oldest_lsn) >= 0); + + if (log_sys->n_pending_checkpoint_writes > 0) { + /* A checkpoint write is running */ + + mutex_exit(&(log_sys->mutex)); + + if (sync) { + /* Wait for the checkpoint write to complete */ + rw_lock_s_lock(&(log_sys->checkpoint_lock)); + rw_lock_s_unlock(&(log_sys->checkpoint_lock)); + } + + return(FALSE); + } + + log_sys->next_checkpoint_lsn = oldest_lsn; + + if (log_debug_writes) { + printf("Making checkpoint no %lu at lsn %lu %lu\n", + ut_dulint_get_low(log_sys->next_checkpoint_no), + ut_dulint_get_high(oldest_lsn), + ut_dulint_get_low(oldest_lsn)); + } + + log_groups_write_checkpoint_info(); + + mutex_exit(&(log_sys->mutex)); + + if (sync) { + /* Wait for the checkpoint write to complete */ + rw_lock_s_lock(&(log_sys->checkpoint_lock)); + rw_lock_s_unlock(&(log_sys->checkpoint_lock)); + } + + return(TRUE); +} + +/******************************************************************** +Makes a checkpoint at a given lsn or later. */ + +void +log_make_checkpoint_at( +/*===================*/ + dulint lsn, /* in: make a checkpoint at this or a later + lsn, if ut_dulint_max, makes a checkpoint at + the latest lsn */ + ibool write_always) /* in: the function normally checks if the + the new checkpoint would have a greater + lsn than the previous one: if not, then no + physical write is done; by setting this + parameter TRUE, a physical write will always be + made to log files */ +{ + ibool success; + + /* Preflush pages synchronously */ + + success = FALSE; + + while (!success) { + success = log_preflush_pool_modified_pages(lsn, TRUE); + } + + success = FALSE; + + while (!success) { + success = log_checkpoint(TRUE, write_always); + } +} + +/******************************************************************** +Tries to establish a big enough margin of free space in the log groups, such +that a new log entry can be catenated without an immediate need for a +checkpoint. NOTE: this function may only be called if the calling thread +owns no synchronization objects! 
*/ +static +void +log_checkpoint_margin(void) +/*=======================*/ +{ + log_t* log = log_sys; + ulint age; + ulint checkpoint_age; + ulint advance; + dulint oldest_lsn; + dulint new_oldest; + ibool do_preflush; + ibool sync; + ibool checkpoint_sync; + ibool do_checkpoint; + ibool success; +loop: + sync = FALSE; + checkpoint_sync = FALSE; + do_preflush = FALSE; + do_checkpoint = FALSE; + + mutex_enter(&(log->mutex)); + + if (log->check_flush_or_checkpoint == FALSE) { + mutex_exit(&(log->mutex)); + + return; + } + + oldest_lsn = log_buf_pool_get_oldest_modification(); + + age = ut_dulint_minus(log->lsn, oldest_lsn); + + if (age > log->max_modified_age_sync) { + + /* A flush is urgent: we have to do a synchronous preflush */ + + sync = TRUE; + + advance = 2 * (age - log->max_modified_age_sync); + + new_oldest = ut_dulint_add(oldest_lsn, advance); + + do_preflush = TRUE; + + } else if (age > log->max_modified_age_async) { + + /* A flush is not urgent: we do an asynchronous preflush */ + advance = age - log->max_modified_age_async; + + new_oldest = ut_dulint_add(oldest_lsn, advance); + + do_preflush = TRUE; + } + + checkpoint_age = ut_dulint_minus(log->lsn, log->last_checkpoint_lsn); + + if (checkpoint_age > log->max_checkpoint_age) { + /* A checkpoint is urgent: we do it synchronously */ + + checkpoint_sync = TRUE; + + do_checkpoint = TRUE; + + } else if (checkpoint_age > log->max_checkpoint_age_async) { + /* A checkpoint is not urgent: do it asynchronously */ + + do_checkpoint = TRUE; + + log->check_flush_or_checkpoint = FALSE; + } else { + log->check_flush_or_checkpoint = FALSE; + } + + mutex_exit(&(log->mutex)); + + if (do_preflush) { + success = log_preflush_pool_modified_pages(new_oldest, sync); + + /* If the flush succeeded, this thread has done its part + and can proceed. If it did not succeed, there was another + thread doing a flush at the same time. If sync was FALSE, + the flush was not urgent, and we let this thread proceed. + Otherwise, we let it start from the beginning again. */ + + if (sync && !success) { + mutex_enter(&(log->mutex)); + + log->check_flush_or_checkpoint = TRUE; + + mutex_exit(&(log->mutex)); + goto loop; + } + } + + if (do_checkpoint) { + log_checkpoint(checkpoint_sync, FALSE); + + if (checkpoint_sync) { + + goto loop; + } + } +} + +/********************************************************** +Reads a specified log segment to a buffer. 
*/ + +void +log_group_read_log_seg( +/*===================*/ + ulint type, /* in: LOG_ARCHIVE or LOG_RECOVER */ + byte* buf, /* in: buffer where to read */ + log_group_t* group, /* in: log group */ + dulint start_lsn, /* in: read area start */ + dulint end_lsn) /* in: read area end */ +{ + ulint len; + ulint source_offset; + ibool sync; + + ut_ad(mutex_own(&(log_sys->mutex))); + + sync = FALSE; + + if (type == LOG_RECOVER) { + sync = TRUE; + } +loop: + source_offset = log_group_calc_lsn_offset(start_lsn, group); + + len = ut_dulint_minus(end_lsn, start_lsn); + + ut_ad(len != 0); + + if ((source_offset % group->file_size) + len > group->file_size) { + + len = group->file_size - (source_offset % group->file_size); + } + + if (type == LOG_ARCHIVE) { + + log_sys->n_pending_archive_ios++; + } + + log_sys->n_log_ios++; + + fil_io(OS_FILE_READ | OS_FILE_LOG, sync, group->space_id, + source_offset / UNIV_PAGE_SIZE, source_offset % UNIV_PAGE_SIZE, + len, buf, &log_archive_io); + + start_lsn = ut_dulint_add(start_lsn, len); + buf += len; + + if (ut_dulint_cmp(start_lsn, end_lsn) != 0) { + + goto loop; + } +} + +/********************************************************** +Generates an archived log file name. */ + +void +log_archived_file_name_gen( +/*=======================*/ + char* buf, /* in: buffer where to write */ + ulint id, /* in: group id */ + ulint file_no)/* in: file number */ +{ + UT_NOT_USED(id); /* Currently we only archive the first group */ + + sprintf(buf, "%sib_arch_log_%010lu", srv_arch_dir, file_no); +} + +/********************************************************** +Writes a log file header to a log file space. */ +static +void +log_group_archive_file_header_write( +/*================================*/ + log_group_t* group, /* in: log group */ + ulint nth_file, /* in: header to the nth file in the + archive log file space */ + ulint file_no, /* in: archived file number */ + dulint start_lsn) /* in: log file data starts at this + lsn */ +{ + byte* buf; + ulint dest_offset; + + ut_ad(mutex_own(&(log_sys->mutex))); + + ut_a(nth_file < group->n_files); + + buf = *(group->archive_file_header_bufs + nth_file); + + mach_write_to_4(buf + LOG_GROUP_ID, group->id); + mach_write_to_8(buf + LOG_FILE_START_LSN, start_lsn); + mach_write_to_4(buf + LOG_FILE_NO, file_no); + + mach_write_to_4(buf + LOG_FILE_ARCH_COMPLETED, FALSE); + + dest_offset = nth_file * group->file_size; + + log_sys->n_log_ios++; + + fil_io(OS_FILE_WRITE | OS_FILE_LOG, TRUE, group->archive_space_id, + dest_offset / UNIV_PAGE_SIZE, + dest_offset % UNIV_PAGE_SIZE, + 2 * OS_FILE_LOG_BLOCK_SIZE, + buf, &log_archive_io); +} + +/********************************************************** +Writes a log file header to a completed archived log file. 
*/ +static +void +log_group_archive_completed_header_write( +/*=====================================*/ + log_group_t* group, /* in: log group */ + ulint nth_file, /* in: header to the nth file in the + archive log file space */ + dulint end_lsn) /* in: end lsn of the file */ +{ + byte* buf; + ulint dest_offset; + + ut_ad(mutex_own(&(log_sys->mutex))); + ut_a(nth_file < group->n_files); + + buf = *(group->archive_file_header_bufs + nth_file); + + mach_write_to_4(buf + LOG_FILE_ARCH_COMPLETED, TRUE); + mach_write_to_8(buf + LOG_FILE_END_LSN, end_lsn); + + dest_offset = nth_file * group->file_size + LOG_FILE_ARCH_COMPLETED; + + log_sys->n_log_ios++; + + fil_io(OS_FILE_WRITE | OS_FILE_LOG, TRUE, group->archive_space_id, + dest_offset / UNIV_PAGE_SIZE, + dest_offset % UNIV_PAGE_SIZE, + OS_FILE_LOG_BLOCK_SIZE, + buf + LOG_FILE_ARCH_COMPLETED, + &log_archive_io); +} + +/********************************************************** +Does the archive writes for a single log group. */ +static +void +log_group_archive( +/*==============*/ + log_group_t* group) /* in: log group */ +{ + os_file_t file_handle; + dulint start_lsn; + dulint end_lsn; + char name[100]; + byte* buf; + ulint len; + ibool ret; + ulint next_offset; + ulint n_files; + ulint open_mode; + + ut_ad(mutex_own(&(log_sys->mutex))); + + start_lsn = log_sys->archived_lsn; + + ut_ad(ut_dulint_get_low(start_lsn) % OS_FILE_LOG_BLOCK_SIZE == 0); + + end_lsn = log_sys->next_archived_lsn; + + ut_ad(ut_dulint_get_low(end_lsn) % OS_FILE_LOG_BLOCK_SIZE == 0); + + buf = log_sys->archive_buf; + + n_files = 0; + + next_offset = group->archived_offset; +loop: + if ((next_offset % group->file_size == 0) + || (fil_space_get_size(group->archive_space_id) == 0)) { + + /* Add the file to the archive file space; create or open the + file */ + + if (next_offset % group->file_size == 0) { + open_mode = OS_FILE_CREATE; + } else { + open_mode = OS_FILE_OPEN; + } + + log_archived_file_name_gen(name, group->id, + group->archived_file_no + n_files); + fil_reserve_right_to_open(); + + file_handle = os_file_create(name, open_mode, OS_FILE_AIO, + &ret); + if (!ret && (open_mode == OS_FILE_CREATE)) { + file_handle = os_file_create(name, OS_FILE_OPEN, + OS_FILE_AIO, &ret); + } + + ut_a(ret); + + if (log_debug_writes) { + printf("Created archive file %s\n", name); + } + + ret = os_file_close(file_handle); + + ut_a(ret); + + fil_release_right_to_open(); + + /* Add the archive file as a node to the space */ + + fil_node_create(name, group->file_size / UNIV_PAGE_SIZE, + group->archive_space_id); + + if (next_offset % group->file_size == 0) { + log_group_archive_file_header_write(group, n_files, + group->archived_file_no + n_files, + start_lsn); + + next_offset += LOG_FILE_HDR_SIZE; + } + } + + len = ut_dulint_minus(end_lsn, start_lsn); + + if (group->file_size < (next_offset % group->file_size) + len) { + + len = group->file_size - (next_offset % group->file_size); + } + + if (log_debug_writes) { + printf( + "Archiving starting at lsn %lu %lu, len %lu to group %lu\n", + ut_dulint_get_high(start_lsn), + ut_dulint_get_low(start_lsn), + len, group->id); + } + + log_sys->n_pending_archive_ios++; + + log_sys->n_log_ios++; + + fil_io(OS_FILE_WRITE | OS_FILE_LOG, FALSE, group->archive_space_id, + next_offset / UNIV_PAGE_SIZE, next_offset % UNIV_PAGE_SIZE, + ut_calc_align(len, OS_FILE_LOG_BLOCK_SIZE), buf, + &log_archive_io); + + start_lsn = ut_dulint_add(start_lsn, len); + next_offset += len; + buf += len; + + if (next_offset % group->file_size == 0) { + n_files++; + } + + if 
(ut_dulint_cmp(end_lsn, start_lsn) != 0) { + + goto loop; + } + + group->next_archived_file_no = group->archived_file_no + n_files; + group->next_archived_offset = next_offset % group->file_size; + + ut_ad(group->next_archived_offset % OS_FILE_LOG_BLOCK_SIZE == 0); +} + +/********************************************************* +(Writes to the archive of each log group.) Currently, only the first +group is archived. */ +static +void +log_archive_groups(void) +/*====================*/ +{ + log_group_t* group; + + ut_ad(mutex_own(&(log_sys->mutex))); + + group = UT_LIST_GET_FIRST(log_sys->log_groups); + + log_group_archive(group); +} + +/********************************************************* +Completes the archiving write phase for (each log group), currently, +the first log group. */ +static +void +log_archive_write_complete_groups(void) +/*===================================*/ +{ + log_group_t* group; + ulint end_offset; + ulint trunc_files; + ulint n_files; + dulint start_lsn; + dulint end_lsn; + ulint i; + + ut_ad(mutex_own(&(log_sys->mutex))); + + group = UT_LIST_GET_FIRST(log_sys->log_groups); + + group->archived_file_no = group->next_archived_file_no; + group->archived_offset = group->next_archived_offset; + + /* Truncate from the archive file space all but the last + file, or if it has been written full, all files */ + + n_files = (UNIV_PAGE_SIZE + * fil_space_get_size(group->archive_space_id)) + / group->file_size; + ut_ad(n_files > 0); + + end_offset = group->archived_offset; + + if (end_offset % group->file_size == 0) { + + trunc_files = n_files; + } else { + trunc_files = n_files - 1; + } + + if (log_debug_writes && trunc_files) { + printf("Complete file(s) archived to group %lu\n", + group->id); + } + + /* Calculate the archive file space start lsn */ + start_lsn = ut_dulint_subtract(log_sys->next_archived_lsn, + end_offset - LOG_FILE_HDR_SIZE + + trunc_files + * (group->file_size - LOG_FILE_HDR_SIZE)); + end_lsn = start_lsn; + + for (i = 0; i < trunc_files; i++) { + + end_lsn = ut_dulint_add(end_lsn, + group->file_size - LOG_FILE_HDR_SIZE); + + /* Write a notice to the headers of archived log + files that the file write has been completed */ + + log_group_archive_completed_header_write(group, i, end_lsn); + } + + fil_space_truncate_start(group->archive_space_id, + trunc_files * group->file_size); + + if (log_debug_writes) { + printf("Archiving writes completed\n"); + } +} + +/********************************************************** +Completes an archiving i/o. */ +static +void +log_archive_check_completion_low(void) +/*==================================*/ +{ + ut_ad(mutex_own(&(log_sys->mutex))); + + if (log_sys->n_pending_archive_ios == 0 + && log_sys->archiving_phase == LOG_ARCHIVE_READ) { + + if (log_debug_writes) { + printf("Archiving read completed\n"); + } + + /* Archive buffer has now been read in: start archive writes */ + + log_sys->archiving_phase = LOG_ARCHIVE_WRITE; + + log_archive_groups(); + } + + if (log_sys->n_pending_archive_ios == 0 + && log_sys->archiving_phase == LOG_ARCHIVE_WRITE) { + + log_archive_write_complete_groups(); + + log_sys->archived_lsn = log_sys->next_archived_lsn; + + rw_lock_x_unlock_gen(&(log_sys->archive_lock), LOG_ARCHIVE); + } +} + +/********************************************************** +Completes an archiving i/o. 
*/ +static +void +log_io_complete_archive(void) +/*=========================*/ +{ + log_group_t* group; + + mutex_enter(&(log_sys->mutex)); + + group = UT_LIST_GET_FIRST(log_sys->log_groups); + + mutex_exit(&(log_sys->mutex)); + + fil_flush(group->archive_space_id); + + mutex_enter(&(log_sys->mutex)); + + ut_ad(log_sys->n_pending_archive_ios > 0); + + log_sys->n_pending_archive_ios--; + + log_archive_check_completion_low(); + + mutex_exit(&(log_sys->mutex)); +} + +/************************************************************************ +Starts an archiving operation. */ + +ibool +log_archive_do( +/*===========*/ + /* out: TRUE if succeed, FALSE if an archiving + operation was already running */ + ibool sync, /* in: TRUE if synchronous operation is desired */ + ulint* n_bytes)/* out: archive log buffer size, 0 if nothing to + archive */ +{ + ibool calc_new_limit; + dulint start_lsn; + dulint limit_lsn; + + calc_new_limit = TRUE; +loop: + mutex_enter(&(log_sys->mutex)); + + if (log_sys->archiving_state == LOG_ARCH_OFF) { + mutex_exit(&(log_sys->mutex)); + + *n_bytes = 0; + + return(TRUE); + + } else if (log_sys->archiving_state == LOG_ARCH_STOPPED + || log_sys->archiving_state == LOG_ARCH_STOPPING2) { + + mutex_exit(&(log_sys->mutex)); + + os_event_wait(log_sys->archiving_on); + + mutex_enter(&(log_sys->mutex)); + + goto loop; + } + + start_lsn = log_sys->archived_lsn; + + if (calc_new_limit) { + ut_ad(log_sys->archive_buf_size % OS_FILE_LOG_BLOCK_SIZE == 0); + + limit_lsn = ut_dulint_add(start_lsn, + log_sys->archive_buf_size); + + *n_bytes = log_sys->archive_buf_size; + + if (ut_dulint_cmp(limit_lsn, log_sys->lsn) >= 0) { + + limit_lsn = ut_dulint_align_down(log_sys->lsn, + OS_FILE_LOG_BLOCK_SIZE); + } + } + + if (ut_dulint_cmp(log_sys->archived_lsn, limit_lsn) >= 0) { + + mutex_exit(&(log_sys->mutex)); + + *n_bytes = 0; + + return(TRUE); + } + + if (ut_dulint_cmp(log_sys->written_to_all_lsn, limit_lsn) < 0) { + + mutex_exit(&(log_sys->mutex)); + + log_flush_up_to(limit_lsn, LOG_WAIT_ALL_GROUPS); + + calc_new_limit = FALSE; + + goto loop; + } + + if (log_sys->n_pending_archive_ios > 0) { + /* An archiving operation is running */ + + mutex_exit(&(log_sys->mutex)); + + if (sync) { + rw_lock_s_lock(&(log_sys->archive_lock)); + rw_lock_s_unlock(&(log_sys->archive_lock)); + } + + *n_bytes = log_sys->archive_buf_size; + + return(FALSE); + } + + rw_lock_x_lock_gen(&(log_sys->archive_lock), LOG_ARCHIVE); + + log_sys->archiving_phase = LOG_ARCHIVE_READ; + + log_sys->next_archived_lsn = limit_lsn; + + if (log_debug_writes) { + printf("Archiving from lsn %lu %lu to lsn %lu %lu\n", + ut_dulint_get_high(log_sys->archived_lsn), + ut_dulint_get_low(log_sys->archived_lsn), + ut_dulint_get_high(limit_lsn), + ut_dulint_get_low(limit_lsn)); + } + + /* Read the log segment to the archive buffer */ + + log_group_read_log_seg(LOG_ARCHIVE, log_sys->archive_buf, + UT_LIST_GET_FIRST(log_sys->log_groups), + start_lsn, limit_lsn); + + mutex_exit(&(log_sys->mutex)); + + if (sync) { + rw_lock_s_lock(&(log_sys->archive_lock)); + rw_lock_s_unlock(&(log_sys->archive_lock)); + } + + *n_bytes = log_sys->archive_buf_size; + + return(TRUE); +} + +/******************************************************************** +Writes the log contents to the archive at least up to the lsn when this +function was called. 
*/
+static
+void
+log_archive_all(void)
+/*=================*/
+{
+	dulint	present_lsn;
+	ulint	dummy;
+
+	mutex_enter(&(log_sys->mutex));
+
+	if (log_sys->archiving_state == LOG_ARCH_OFF) {
+		mutex_exit(&(log_sys->mutex));
+
+		return;
+	}
+
+	present_lsn = log_sys->lsn;
+
+	mutex_exit(&(log_sys->mutex));
+
+	log_pad_current_log_block();
+
+	for (;;) {
+		mutex_enter(&(log_sys->mutex));
+
+		if (ut_dulint_cmp(present_lsn, log_sys->archived_lsn) <= 0) {
+
+			mutex_exit(&(log_sys->mutex));
+
+			return;
+		}
+
+		mutex_exit(&(log_sys->mutex));
+
+		log_archive_do(TRUE, &dummy);
+	}
+}
+
+/*********************************************************
+Closes the possible open archive log file (for each group; currently
+only the first group is archived) and, if the file was open, increments
+the group file count by 2, if desired. */
+static
+void
+log_archive_close_groups(
+/*=====================*/
+	ibool	increment_file_count)	/* in: TRUE if we want to increment
+					the file count */
+{
+	log_group_t*	group;
+	ulint		trunc_len;
+
+	ut_ad(mutex_own(&(log_sys->mutex)));
+
+	group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+	trunc_len = UNIV_PAGE_SIZE
+			* fil_space_get_size(group->archive_space_id);
+
+	if (trunc_len > 0) {
+		ut_a(trunc_len == group->file_size);
+
+		/* Write a notice to the headers of archived log
+		files that the file write has been completed */
+
+		log_group_archive_completed_header_write(group,
+						0, log_sys->archived_lsn);
+
+		fil_space_truncate_start(group->archive_space_id,
+						trunc_len);
+		if (increment_file_count) {
+			group->archived_offset = 0;
+			group->archived_file_no += 2;
+
+			if (log_debug_writes) {
+				printf(
+		"Incremented arch file no to %lu in log group %lu\n",
+				group->archived_file_no, group->id);
+			}
+		}
+	}
+}
+
+/********************************************************************
+Writes the log contents to the archive up to the lsn when this function was
+called, and stops the archiving. When archiving is started again, the archived
+log file numbers start from 2 higher, so that the archiving will
+not write again to the archived log files which exist when this function
+returns. */
+
+ulint
+log_archive_stop(void)
+/*==================*/
+			/* out: DB_SUCCESS or DB_ERROR */
+{
+	ibool	success;
+
+	mutex_enter(&(log_sys->mutex));
+
+	if (log_sys->archiving_state != LOG_ARCH_ON) {
+
+		mutex_exit(&(log_sys->mutex));
+
+		return(DB_ERROR);
+	}
+
+	log_sys->archiving_state = LOG_ARCH_STOPPING;
+
+	mutex_exit(&(log_sys->mutex));
+
+	log_archive_all();
+
+	mutex_enter(&(log_sys->mutex));
+
+	log_sys->archiving_state = LOG_ARCH_STOPPING2;
+	os_event_reset(log_sys->archiving_on);
+
+	mutex_exit(&(log_sys->mutex));
+
+	/* Wait for a possible archiving operation to end */
+
+	rw_lock_s_lock(&(log_sys->archive_lock));
+	rw_lock_s_unlock(&(log_sys->archive_lock));
+
+	mutex_enter(&(log_sys->mutex));
+
+	/* Close all archived log files, incrementing the file count by 2,
+	if appropriate */
+
+	log_archive_close_groups(TRUE);
+
+	mutex_exit(&(log_sys->mutex));
+
+	/* Make a checkpoint, so that if recovery is needed, the file numbers
+	of new archived log files will start from the right value */
+
+	success = FALSE;
+
+	while (!success) {
+		success = log_checkpoint(TRUE, TRUE);
+	}
+
+	mutex_enter(&(log_sys->mutex));
+
+	log_sys->archiving_state = LOG_ARCH_STOPPED;
+
+	mutex_exit(&(log_sys->mutex));
+
+	return(DB_SUCCESS);
+}
+
+/********************************************************************
+Starts archiving again after it has been stopped.
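+A caller could, for instance, bracket the copying of the archived log
+files with this pair of functions (a minimal sketch, not code from the
+server; error handling omitted):
+
+	if (log_archive_stop() == DB_SUCCESS) {
+		... copy the archived log files elsewhere ...
+
+		log_archive_start();
+	}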
*/
+
+ulint
+log_archive_start(void)
+/*===================*/
+			/* out: DB_SUCCESS or DB_ERROR */
+{
+	mutex_enter(&(log_sys->mutex));
+
+	if (log_sys->archiving_state != LOG_ARCH_STOPPED) {
+
+		mutex_exit(&(log_sys->mutex));
+
+		return(DB_ERROR);
+	}
+
+	log_sys->archiving_state = LOG_ARCH_ON;
+
+	os_event_set(log_sys->archiving_on);
+
+	mutex_exit(&(log_sys->mutex));
+
+	return(DB_SUCCESS);
+}
+
+/********************************************************************
+Stops archiving the log; a gap may then occur in the archived log files. */
+
+ulint
+log_archive_noarchivelog(void)
+/*==========================*/
+			/* out: DB_SUCCESS or DB_ERROR */
+{
+loop:
+	mutex_enter(&(log_sys->mutex));
+
+	if (log_sys->archiving_state == LOG_ARCH_STOPPED
+	    || log_sys->archiving_state == LOG_ARCH_OFF) {
+
+		log_sys->archiving_state = LOG_ARCH_OFF;
+
+		os_event_set(log_sys->archiving_on);
+
+		mutex_exit(&(log_sys->mutex));
+
+		return(DB_SUCCESS);
+	}
+
+	mutex_exit(&(log_sys->mutex));
+
+	log_archive_stop();
+
+	os_thread_sleep(500000);
+
+	goto loop;
+}
+
+/********************************************************************
+Starts archiving the log again after log_archive_noarchivelog; a gap may
+have occurred in the archived log files while archiving was switched off. */
+
+ulint
+log_archive_archivelog(void)
+/*========================*/
+			/* out: DB_SUCCESS or DB_ERROR */
+{
+	mutex_enter(&(log_sys->mutex));
+
+	if (log_sys->archiving_state == LOG_ARCH_OFF) {
+
+		log_sys->archiving_state = LOG_ARCH_ON;
+
+		log_sys->archived_lsn = ut_dulint_align_down(log_sys->lsn,
+						OS_FILE_LOG_BLOCK_SIZE);
+		mutex_exit(&(log_sys->mutex));
+
+		return(DB_SUCCESS);
+	}
+
+	mutex_exit(&(log_sys->mutex));
+
+	return(DB_ERROR);
+}
+
+/********************************************************************
+Tries to establish a big enough margin of free space in the log groups, such
+that a new log entry can be catenated without an immediate need for
+archiving. */
+static
+void
+log_archive_margin(void)
+/*====================*/
+{
+	log_t*	log		= log_sys;
+	ulint	age;
+	ibool	sync;
+	ulint	dummy;
+loop:
+	mutex_enter(&(log->mutex));
+
+	if (log->archiving_state == LOG_ARCH_OFF) {
+		mutex_exit(&(log->mutex));
+
+		return;
+	}
+
+	age = ut_dulint_minus(log->lsn, log->archived_lsn);
+
+	if (age > log->max_archived_lsn_age) {
+
+		/* An archiving is urgent: we have to do synchronous i/o */
+
+		sync = TRUE;
+
+	} else if (age > log->max_archived_lsn_age_async) {
+
+		/* An archiving is not urgent: we do asynchronous i/o */
+
+		sync = FALSE;
+	} else {
+		/* No archiving required yet */
+
+		mutex_exit(&(log->mutex));
+
+		return;
+	}
+
+	mutex_exit(&(log->mutex));
+
+	log_archive_do(sync, &dummy);
+
+	if (sync) {
+		/* Check again that enough was written to the archive */
+
+		goto loop;
+	}
+}
+
+/************************************************************************
+Checks that there is enough free space in the log to start a new query step.
+Flushes the log buffer or makes a new checkpoint if necessary. NOTE: this
+function may only be called if the calling thread owns no synchronization
+objects! */
+
+void
+log_check_margins(void)
+/*===================*/
+{
+loop:
+	log_flush_margin();
+
+	log_checkpoint_margin();
+
+	log_archive_margin();
+
+	mutex_enter(&(log_sys->mutex));
+
+	if (log_sys->check_flush_or_checkpoint) {
+
+		mutex_exit(&(log_sys->mutex));
+
+		goto loop;
+	}
+
+	mutex_exit(&(log_sys->mutex));
+}
+
+/**********************************************************
+Switches the database to the online backup state.
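+A hypothetical backup tool would bracket its copy phase with the
+following pair of calls (a sketch only; switching the state on merely
+records the current lsn in log_sys->online_backup_lsn):
+
+	if (log_switch_backup_state_on() == DB_SUCCESS) {
+		... copy the data files ...
+
+		log_switch_backup_state_off();
+	}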
*/
+
+ulint
+log_switch_backup_state_on(void)
+/*============================*/
+			/* out: DB_SUCCESS or DB_ERROR */
+{
+	dulint	backup_lsn;
+
+	mutex_enter(&(log_sys->mutex));
+
+	if (log_sys->online_backup_state) {
+
+		/* The database is already in that state */
+
+		mutex_exit(&(log_sys->mutex));
+
+		return(DB_ERROR);
+	}
+
+	log_sys->online_backup_state = TRUE;
+
+	backup_lsn = log_sys->lsn;
+
+	log_sys->online_backup_lsn = backup_lsn;
+
+	mutex_exit(&(log_sys->mutex));
+
+	/* log_checkpoint_and_mark_file_spaces(); */
+
+	return(DB_SUCCESS);
+}
+
+/**********************************************************
+Switches the online backup state off. */
+
+ulint
+log_switch_backup_state_off(void)
+/*=============================*/
+			/* out: DB_SUCCESS or DB_ERROR */
+{
+	mutex_enter(&(log_sys->mutex));
+
+	if (!log_sys->online_backup_state) {
+
+		/* The database is already in that state */
+
+		mutex_exit(&(log_sys->mutex));
+
+		return(DB_ERROR);
+	}
+
+	log_sys->online_backup_state = FALSE;
+
+	mutex_exit(&(log_sys->mutex));
+
+	return(DB_SUCCESS);
+}
+
+/********************************************************************
+Makes a checkpoint at the latest lsn and writes it to the first page of
+each data file in the database, so that we know that the file spaces
+contain all modifications up to that lsn. This can only be called at
+database shutdown. This function also writes all the log in the log files
+to the log archive. */
+
+void
+logs_empty_and_mark_files_at_shutdown(void)
+/*=======================================*/
+{
+	dulint	lsn;
+	ulint	arch_log_no;
+
+	fprintf(stderr, "Innobase: Starting shutdown...\n");
+
+	/* Wait until the master thread and all other operations are idle: our
+	algorithm only works if the server is idle at shutdown */
+loop:
+	os_thread_sleep(100000);
+
+	mutex_enter(&kernel_mutex);
+
+	if (trx_n_mysql_transactions > 0
+			|| UT_LIST_GET_LEN(trx_sys->trx_list) > 0) {
+
+		mutex_exit(&kernel_mutex);
+
+		goto loop;
+	}
+
+	if (srv_n_threads_active[SRV_MASTER] != 0) {
+
+		mutex_exit(&kernel_mutex);
+
+		goto loop;
+	}
+
+	mutex_exit(&kernel_mutex);
+
+	mutex_enter(&(log_sys->mutex));
+
+	if (log_sys->n_pending_archive_ios
+			+ log_sys->n_pending_checkpoint_writes
+			+ log_sys->n_pending_writes > 0) {
+
+		mutex_exit(&(log_sys->mutex));
+
+		goto loop;
+	}
+
+	mutex_exit(&(log_sys->mutex));
+
+	if (!buf_pool_check_no_pending_io()) {
+
+		goto loop;
+	}
+
+	log_archive_all();
+
+	log_make_checkpoint_at(ut_dulint_max, TRUE);
+
+	mutex_enter(&(log_sys->mutex));
+
+	lsn = log_sys->lsn;
+
+	if (ut_dulint_cmp(lsn, log_sys->last_checkpoint_lsn) != 0
+	    || (srv_log_archive_on
+		&& ut_dulint_cmp(lsn,
+		ut_dulint_add(log_sys->archived_lsn, LOG_BLOCK_HDR_SIZE)) != 0)) {
+
+		mutex_exit(&(log_sys->mutex));
+
+		goto loop;
+	}
+
+	arch_log_no =
+		UT_LIST_GET_FIRST(log_sys->log_groups)->archived_file_no;
+
+	if (0 == UT_LIST_GET_FIRST(log_sys->log_groups)->archived_offset) {
+
+		arch_log_no--;
+	}
+
+	log_archive_close_groups(TRUE);
+
+	mutex_exit(&(log_sys->mutex));
+
+	fil_flush_file_spaces(FIL_TABLESPACE);
+	fil_flush_file_spaces(FIL_LOG);
+
+	/* The following fil_write_... will bypass the buffer pool: therefore
+	it is essential that the buffer pool has been completely flushed
+	to disk!
*/
+
+	if (!buf_all_freed()) {
+
+		goto loop;
+	}
+
+	fil_write_flushed_lsn_to_data_files(lsn, arch_log_no);
+
+	fil_flush_file_spaces(FIL_TABLESPACE);
+
+	fprintf(stderr, "Innobase: Shutdown completed\n");
+}
+
+/**********************************************************
+Checks by parsing that the catenated log segment for a single mtr is
+consistent. */
+
+ibool
+log_check_log_recs(
+/*===============*/
+	byte*	buf,		/* in: pointer to the start of the log segment
+				in the log_sys->buf log buffer */
+	ulint	len,		/* in: segment length in bytes */
+	dulint	buf_start_lsn)	/* in: buffer start lsn */
+{
+	dulint	contiguous_lsn;
+	dulint	scanned_lsn;
+	byte*	start;
+	byte*	end;
+	byte*	buf1;
+	byte*	scan_buf;
+
+	ut_ad(mutex_own(&(log_sys->mutex)));
+
+	if (len == 0) {
+
+		return(TRUE);
+	}
+
+	start = ut_align_down(buf, OS_FILE_LOG_BLOCK_SIZE);
+	end = ut_align(buf + len, OS_FILE_LOG_BLOCK_SIZE);
+
+	buf1 = mem_alloc((end - start) + OS_FILE_LOG_BLOCK_SIZE);
+	scan_buf = ut_align(buf1, OS_FILE_LOG_BLOCK_SIZE);
+
+	ut_memcpy(scan_buf, start, end - start);
+
+	recv_scan_log_recs(FALSE, scan_buf, end - start,
+				ut_dulint_align_down(buf_start_lsn,
+						OS_FILE_LOG_BLOCK_SIZE),
+				&contiguous_lsn, &scanned_lsn);
+
+	ut_a(ut_dulint_cmp(scanned_lsn, ut_dulint_add(buf_start_lsn, len))
+									== 0);
+	ut_a(ut_dulint_cmp(recv_sys->recovered_lsn, scanned_lsn) == 0);
+
+	mem_free(buf1);
+
+	return(TRUE);
+}
+
+/**********************************************************
+Prints info of the log. */
+
+void
+log_print(void)
+/*===========*/
+{
+	printf("Log sequence number %lu %lu\n",
+			ut_dulint_get_high(log_sys->lsn),
+			ut_dulint_get_low(log_sys->lsn));
+}
+
diff --git a/innobase/log/log0recv.c b/innobase/log/log0recv.c
new file mode 100644
index 00000000000..1d0b556f1b6
--- /dev/null
+++ b/innobase/log/log0recv.c
@@ -0,0 +1,2512 @@
+/******************************************************
+Recovery
+
+(c) 1997 Innobase Oy
+
+Created 9/20/1997 Heikki Tuuri
+*******************************************************/
+
+#include "log0recv.h"
+
+#ifdef UNIV_NONINL
+#include "log0recv.ic"
+#endif
+
+#include "mem0mem.h"
+#include "buf0buf.h"
+#include "buf0flu.h"
+#include "buf0rea.h"
+#include "srv0srv.h"
+#include "mtr0mtr.h"
+#include "mtr0log.h"
+#include "page0page.h"
+#include "page0cur.h"
+#include "btr0btr.h"
+#include "btr0cur.h"
+#include "ibuf0ibuf.h"
+#include "trx0undo.h"
+#include "trx0rec.h"
+#include "trx0roll.h"
+#include "dict0boot.h"
+#include "fil0fil.h"
+
+/* Size of block reads when the log groups are scanned forward to do a
+roll-forward */
+#define RECV_SCAN_SIZE		(4 * UNIV_PAGE_SIZE)
+
+/* Size of the parsing buffer */
+#define RECV_PARSING_BUF_SIZE	LOG_BUFFER_SIZE
+
+/* Log records are stored in the hash table in chunks at most of this size;
+this must be less than UNIV_PAGE_SIZE as it is stored in the buffer pool */
+#define RECV_DATA_BLOCK_SIZE	(MEM_MAX_ALLOC_IN_BUF - sizeof(recv_data_t))
+
+/* Read-ahead area in applying log records to file pages */
+#define RECV_READ_AHEAD_AREA	32
+
+recv_sys_t*	recv_sys = NULL;
+ibool		recv_recovery_on = FALSE;
+ibool		recv_recovery_from_backup_on = FALSE;
+
+/* If the following is TRUE, the buffer pool file pages must be invalidated
+after recovery and no ibuf operations are allowed; this becomes TRUE if
+the log record hash table becomes too full, and log records must be merged
+to file pages already before the recovery is finished: in this case no
+ibuf operations are allowed, as they could
modify the pages read in the +buffer pool before the pages have been recovered to the up-to-date state */ + +/* Recovery is running and no operations on the log files are allowed +yet: the variable name is misleading */ + +ibool recv_no_ibuf_operations = FALSE; + +/************************************************************ +Creates the recovery system. */ + +void +recv_sys_create(void) +/*=================*/ +{ + if (recv_sys != NULL) { + + return; + } + + recv_sys = mem_alloc(sizeof(recv_sys_t)); + + mutex_create(&(recv_sys->mutex)); + mutex_set_level(&(recv_sys->mutex), SYNC_RECV); + + recv_sys->heap = NULL; + recv_sys->addr_hash = NULL; +} + +/************************************************************ +Inits the recovery system for a recovery operation. */ + +void +recv_sys_init(void) +/*===============*/ +{ + if (recv_sys->heap != NULL) { + + return; + } + + mutex_enter(&(recv_sys->mutex)); + + recv_sys->heap = mem_heap_create_in_buffer(256); + + recv_sys->buf = ut_malloc(RECV_PARSING_BUF_SIZE); + recv_sys->len = 0; + recv_sys->recovered_offset = 0; + + recv_sys->addr_hash = hash_create(buf_pool_get_curr_size() / 64); + recv_sys->n_addrs = 0; + + recv_sys->apply_log_recs = FALSE; + recv_sys->apply_batch_on = FALSE; + + recv_sys->last_block_buf_start = mem_alloc(2 * OS_FILE_LOG_BLOCK_SIZE); + + recv_sys->last_block = ut_align(recv_sys->last_block_buf_start, + OS_FILE_LOG_BLOCK_SIZE); + mutex_exit(&(recv_sys->mutex)); +} + +/************************************************************ +Empties the hash table when it has been fully processed. */ +static +void +recv_sys_empty_hash(void) +/*=====================*/ +{ + ut_ad(mutex_own(&(recv_sys->mutex))); + ut_a(recv_sys->n_addrs == 0); + + hash_table_free(recv_sys->addr_hash); + mem_heap_empty(recv_sys->heap); + + recv_sys->addr_hash = hash_create(buf_pool_get_curr_size() / 256); +} + +/************************************************************ +Frees the recovery system. */ + +void +recv_sys_free(void) +/*===============*/ +{ + mutex_enter(&(recv_sys->mutex)); + + hash_table_free(recv_sys->addr_hash); + mem_heap_free(recv_sys->heap); + ut_free(recv_sys->buf); + mem_free(recv_sys->last_block_buf_start); + + recv_sys->addr_hash = NULL; + recv_sys->heap = NULL; + + mutex_exit(&(recv_sys->mutex)); +} + +/************************************************************ +Truncates possible corrupted or extra records from a log group. 
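+The log tail beyond recovered_lsn may contain half-written blocks, or
+records which were deliberately not recovered because they lie beyond
+limit_lsn: such data is overwritten with zero-filled blocks so that a
+later scan of the log cannot mistake it for valid log records.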
*/ +static +void +recv_truncate_group( +/*================*/ + log_group_t* group, /* in: log group */ + dulint recovered_lsn, /* in: recovery succeeded up to this + lsn */ + dulint limit_lsn, /* in: this was the limit for + recovery */ + dulint checkpoint_lsn, /* in: recovery was started from this + checkpoint */ + dulint archived_lsn) /* in: the log has been archived up to + this lsn */ +{ + dulint start_lsn; + dulint end_lsn; + dulint finish_lsn1; + dulint finish_lsn2; + dulint finish_lsn; + ulint len; + ulint i; + + if (ut_dulint_cmp(archived_lsn, ut_dulint_max) == 0) { + /* Checkpoint was taken in the NOARCHIVELOG mode */ + archived_lsn = checkpoint_lsn; + } + + finish_lsn1 = ut_dulint_add(ut_dulint_align_down(archived_lsn, + OS_FILE_LOG_BLOCK_SIZE), + log_group_get_capacity(group)); + + finish_lsn2 = ut_dulint_add(ut_dulint_align_up(recovered_lsn, + OS_FILE_LOG_BLOCK_SIZE), + recv_sys->last_log_buf_size); + + if (ut_dulint_cmp(limit_lsn, ut_dulint_max) != 0) { + /* We do not know how far we should erase log records: erase + as much as possible */ + + finish_lsn = finish_lsn1; + } else { + /* It is enough to erase the length of the log buffer */ + finish_lsn = ut_dulint_get_min(finish_lsn1, finish_lsn2); + } + + ut_a(RECV_SCAN_SIZE <= log_sys->buf_size); + + /* Write the log buffer full of zeros */ + for (i = 0; i < RECV_SCAN_SIZE; i++) { + + *(log_sys->buf + i) = '\0'; + } + + start_lsn = ut_dulint_align_down(recovered_lsn, + OS_FILE_LOG_BLOCK_SIZE); + + if (ut_dulint_cmp(start_lsn, recovered_lsn) != 0) { + /* Copy the last incomplete log block to the log buffer and + edit its data length: */ + + ut_memcpy(log_sys->buf, recv_sys->last_block, + OS_FILE_LOG_BLOCK_SIZE); + log_block_set_data_len(log_sys->buf, + ut_dulint_minus(recovered_lsn, start_lsn)); + } + + if (ut_dulint_cmp(start_lsn, finish_lsn) >= 0) { + + return; + } + + for (;;) { + end_lsn = ut_dulint_add(start_lsn, RECV_SCAN_SIZE); + + if (ut_dulint_cmp(end_lsn, finish_lsn) > 0) { + + end_lsn = finish_lsn; + } + + len = ut_dulint_minus(end_lsn, start_lsn); + + log_group_write_buf(LOG_RECOVER, group, log_sys->buf, len, + start_lsn, 0); + if (ut_dulint_cmp(end_lsn, finish_lsn) >= 0) { + + return; + } + + /* Write the log buffer full of zeros */ + for (i = 0; i < RECV_SCAN_SIZE; i++) { + + *(log_sys->buf + i) = '\0'; + } + + start_lsn = end_lsn; + } +} + +/************************************************************ +Copies the log segment between group->recovered_lsn and recovered_lsn from the +most up-to-date log group to group, so that it contains the latest log data. 
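+(More precisely, the lsn up to which this group already contains valid
+data is group->scanned_lsn, and copying starts from that lsn rounded
+down to a log block boundary.)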
*/
+static
+void
+recv_copy_group(
+/*============*/
+	log_group_t*	up_to_date_group, /* in: the most up-to-date log
+					group */
+	log_group_t*	group,	/* in: copy to this log group */
+	dulint		recovered_lsn)	/* in: recovery succeeded up
+					to this lsn */
+{
+	dulint	start_lsn;
+	dulint	end_lsn;
+	ulint	len;
+
+	if (ut_dulint_cmp(group->scanned_lsn, recovered_lsn) >= 0) {
+
+		return;
+	}
+
+	ut_a(RECV_SCAN_SIZE <= log_sys->buf_size);
+
+	start_lsn = ut_dulint_align_down(group->scanned_lsn,
+						OS_FILE_LOG_BLOCK_SIZE);
+	for (;;) {
+		end_lsn = ut_dulint_add(start_lsn, RECV_SCAN_SIZE);
+
+		if (ut_dulint_cmp(end_lsn, recovered_lsn) > 0) {
+			end_lsn = ut_dulint_align_up(recovered_lsn,
+						OS_FILE_LOG_BLOCK_SIZE);
+		}
+
+		log_group_read_log_seg(LOG_RECOVER, log_sys->buf,
+					up_to_date_group, start_lsn, end_lsn);
+
+		len = ut_dulint_minus(end_lsn, start_lsn);
+
+		log_group_write_buf(LOG_RECOVER, group, log_sys->buf, len,
+							start_lsn, 0);
+
+		if (ut_dulint_cmp(end_lsn, recovered_lsn) >= 0) {
+
+			return;
+		}
+
+		start_lsn = end_lsn;
+	}
+}
+
+/************************************************************
+Copies a log segment from the most up-to-date log group to the other log
+groups, so that they all contain the latest log data. Also writes the info
+about the latest checkpoint to the groups, and inits the fields in the group
+memory structs to up-to-date values. */
+
+void
+recv_synchronize_groups(
+/*====================*/
+	log_group_t*	up_to_date_group)	/* in: the most up-to-date
+						log group */
+{
+	log_group_t*	group;
+	dulint		start_lsn;
+	dulint		end_lsn;
+	dulint		recovered_lsn;
+	dulint		limit_lsn;
+
+	recovered_lsn = recv_sys->recovered_lsn;
+	limit_lsn = recv_sys->limit_lsn;
+
+	/* Read the last recovered log block to the recovery system buffer:
+	the block is always incomplete */
+
+	start_lsn = ut_dulint_align_down(recovered_lsn, OS_FILE_LOG_BLOCK_SIZE);
+	end_lsn = ut_dulint_align_up(recovered_lsn, OS_FILE_LOG_BLOCK_SIZE);
+
+	ut_ad(ut_dulint_cmp(start_lsn, end_lsn) != 0);
+
+	log_group_read_log_seg(LOG_RECOVER, recv_sys->last_block,
+				up_to_date_group, start_lsn, end_lsn);
+
+	group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+	while (group) {
+		if (group != up_to_date_group) {
+
+			/* Copy log data if needed */
+
+			recv_copy_group(up_to_date_group, group,
+							recovered_lsn);
+		}
+
+		/* Update the fields in the group struct to correspond to
+		recovered_lsn */
+
+		log_group_set_fields(group, recovered_lsn);
+
+		group = UT_LIST_GET_NEXT(log_groups, group);
+	}
+
+	/* Copy the checkpoint info to the groups; remember that we have
+	incremented checkpoint_no by one, and the info will not be written
+	over the max checkpoint info, thus making the preservation of max
+	checkpoint info on disk certain */
+
+	log_groups_write_checkpoint_info();
+
+	mutex_exit(&(log_sys->mutex));
+
+	/* Wait for the checkpoint write to complete */
+	rw_lock_s_lock(&(log_sys->checkpoint_lock));
+	rw_lock_s_unlock(&(log_sys->checkpoint_lock));
+
+	mutex_enter(&(log_sys->mutex));
+}
+
+/************************************************************
+Looks for the maximum consistent checkpoint from the log groups.
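+Each group stores its checkpoint info alternately in two header fields,
+LOG_CHECKPOINT_1 and LOG_CHECKPOINT_2, so that at least one of them
+remains intact if the server crashes in the middle of a checkpoint
+write; each field is protected by two checksums. The loop below
+validates the checksums and remembers the consistent field which has
+the biggest checkpoint number.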
*/ +static +ulint +recv_find_max_checkpoint( +/*=====================*/ + /* out: error code or DB_SUCCESS */ + log_group_t** max_group, /* out: max group */ + ulint* max_field) /* out: LOG_CHECKPOINT_1 or + LOG_CHECKPOINT_2 */ +{ + log_group_t* group; + dulint max_no; + dulint checkpoint_no; + ulint field; + ulint fold; + byte* buf; + + group = UT_LIST_GET_FIRST(log_sys->log_groups); + + max_no = ut_dulint_zero; + *max_group = NULL; + + buf = log_sys->checkpoint_buf; + + while (group) { + group->state = LOG_GROUP_CORRUPTED; + + for (field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2; + field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) { + + log_group_read_checkpoint_info(group, field); + + /* Check the consistency of the checkpoint info */ + fold = ut_fold_binary(buf, LOG_CHECKPOINT_CHECKSUM_1); + + if (fold != mach_read_from_4(buf + + LOG_CHECKPOINT_CHECKSUM_1)) { + if (log_debug_writes) { + fprintf(stderr, + "Innobase: Checkpoint in group %lu at %lu invalid\n", + group->id, field); + } + + goto not_consistent; + } + + fold = ut_fold_binary(buf + LOG_CHECKPOINT_LSN, + LOG_CHECKPOINT_CHECKSUM_2 + - LOG_CHECKPOINT_LSN); + if (fold != mach_read_from_4(buf + + LOG_CHECKPOINT_CHECKSUM_2)) { + if (log_debug_writes) { + fprintf(stderr, + "Innobase: Checkpoint in group %lu at %lu invalid\n", + group->id, field); + } + goto not_consistent; + } + + group->state = LOG_GROUP_OK; + + group->lsn = mach_read_from_8(buf + + LOG_CHECKPOINT_LSN); + group->lsn_offset = mach_read_from_4(buf + + LOG_CHECKPOINT_OFFSET); + checkpoint_no = + mach_read_from_8(buf + LOG_CHECKPOINT_NO); + + if (log_debug_writes) { + fprintf(stderr, + "Innobase: Checkpoint number %lu found in group %lu\n", + ut_dulint_get_low(checkpoint_no), group->id); + } + + if (ut_dulint_cmp(checkpoint_no, max_no) >= 0) { + *max_group = group; + *max_field = field; + max_no = checkpoint_no; + } + + not_consistent: + ; + } + + group = UT_LIST_GET_NEXT(log_groups, group); + } + + if (*max_group == NULL) { + + if (log_debug_writes) { + fprintf(stderr, + "Innobase: No valid checkpoint found\n"); + } + + return(DB_ERROR); + } + + return(DB_SUCCESS); +} + +/*********************************************************************** +Tries to parse a single log record body and also applies it to a page if +specified. 
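+The same function thus serves two purposes. During the parse phase it
+is called with page == NULL and mtr == NULL merely to find where the
+record body ends, e.g.
+
+	new_ptr = recv_parse_or_apply_log_rec_body(type, ptr, end_ptr,
+							NULL, NULL);
+
+whereas at apply time it is called with a buffer page and a started
+mini-transaction, and the record must then be complete.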
*/ +static +byte* +recv_parse_or_apply_log_rec_body( +/*=============================*/ + /* out: log record end, NULL if not a complete + record */ + byte type, /* in: type */ + byte* ptr, /* in: pointer to a buffer */ + byte* end_ptr,/* in: pointer to the buffer end */ + page_t* page, /* in: buffer page or NULL; if not NULL, then the log + record is applied to the page, and the log record + should be complete then */ + mtr_t* mtr) /* in: mtr or NULL; should be non-NULL if and only if + page is non-NULL */ +{ + byte* new_ptr; + + if (type <= MLOG_8BYTES) { + new_ptr = mlog_parse_nbytes(type, ptr, end_ptr, page); + + } else if (type == MLOG_REC_INSERT) { + new_ptr = page_cur_parse_insert_rec(FALSE, ptr, end_ptr, page, + mtr); + } else if (type == MLOG_REC_CLUST_DELETE_MARK) { + new_ptr = btr_cur_parse_del_mark_set_clust_rec(ptr, end_ptr, + page); + } else if (type == MLOG_REC_SEC_DELETE_MARK) { + new_ptr = btr_cur_parse_del_mark_set_sec_rec(ptr, end_ptr, + page); + } else if (type == MLOG_REC_UPDATE_IN_PLACE) { + new_ptr = btr_cur_parse_update_in_place(ptr, end_ptr, page); + + } else if ((type == MLOG_LIST_END_DELETE) + || (type == MLOG_LIST_START_DELETE)) { + new_ptr = page_parse_delete_rec_list(type, ptr, end_ptr, page, + mtr); + } else if (type == MLOG_LIST_END_COPY_CREATED) { + new_ptr = page_parse_copy_rec_list_to_created_page(ptr, + end_ptr, page, mtr); + } else if (type == MLOG_PAGE_REORGANIZE) { + new_ptr = btr_parse_page_reorganize(ptr, end_ptr, page, mtr); + + } else if (type == MLOG_PAGE_CREATE) { + new_ptr = page_parse_create(ptr, end_ptr, page, mtr); + + } else if (type == MLOG_UNDO_INSERT) { + new_ptr = trx_undo_parse_add_undo_rec(ptr, end_ptr, page); + + } else if (type == MLOG_UNDO_ERASE_END) { + new_ptr = trx_undo_parse_erase_page_end(ptr, end_ptr, page, + mtr); + } else if (type == MLOG_UNDO_INIT) { + new_ptr = trx_undo_parse_page_init(ptr, end_ptr, page, mtr); + + } else if (type == MLOG_UNDO_HDR_DISCARD) { + new_ptr = trx_undo_parse_discard_latest(ptr, end_ptr, page, + mtr); + } else if ((type == MLOG_UNDO_HDR_CREATE) + || (type == MLOG_UNDO_HDR_REUSE)) { + new_ptr = trx_undo_parse_page_header(type, ptr, end_ptr, page, + mtr); + } else if (type == MLOG_REC_MIN_MARK) { + new_ptr = btr_parse_set_min_rec_mark(ptr, end_ptr, page, mtr); + + } else if (type == MLOG_REC_DELETE) { + new_ptr = page_cur_parse_delete_rec(ptr, end_ptr, page, mtr); + + } else if (type == MLOG_IBUF_BITMAP_INIT) { + new_ptr = ibuf_parse_bitmap_init(ptr, end_ptr, page, mtr); + + } else if (type == MLOG_FULL_PAGE) { + new_ptr = mtr_log_parse_full_page(ptr, end_ptr, page); + + } else if (type == MLOG_INIT_FILE_PAGE) { + new_ptr = fsp_parse_init_file_page(ptr, end_ptr, page); + + } else if (type <= MLOG_WRITE_STRING) { + new_ptr = mlog_parse_string(ptr, end_ptr, page); + } else { + ut_error; + } + + ut_ad(!page || new_ptr); + + return(new_ptr); +} + +/************************************************************************* +Calculates the fold value of a page file address: used in inserting or +searching for a log record in the hash table. */ +UNIV_INLINE +ulint +recv_fold( +/*======*/ + /* out: folded value */ + ulint space, /* in: space */ + ulint page_no)/* in: page number */ +{ + return(ut_fold_ulint_pair(space, page_no)); +} + +/************************************************************************* +Calculates the hash value of a page file address: used in inserting or +searching for a log record in the hash table. 
*/ +UNIV_INLINE +ulint +recv_hash( +/*======*/ + /* out: folded value */ + ulint space, /* in: space */ + ulint page_no)/* in: page number */ +{ + return(hash_calc_hash(recv_fold(space, page_no), recv_sys->addr_hash)); +} + +/************************************************************************* +Gets the hashed file address struct for a page. */ +static +recv_addr_t* +recv_get_fil_addr_struct( +/*=====================*/ + /* out: file address struct, NULL if not found from + the hash table */ + ulint space, /* in: space id */ + ulint page_no)/* in: page number */ +{ + recv_addr_t* recv_addr; + + recv_addr = HASH_GET_FIRST(recv_sys->addr_hash, + recv_hash(space, page_no)); + + while (recv_addr) { + if ((recv_addr->space == space) + && (recv_addr->page_no == page_no)) { + + break; + } + + recv_addr = HASH_GET_NEXT(addr_hash, recv_addr); + } + + return(recv_addr); +} + +/*********************************************************************** +Adds a new log record to the hash table of log records. */ +static +void +recv_add_to_hash_table( +/*===================*/ + byte type, /* in: log record type */ + ulint space, /* in: space id */ + ulint page_no, /* in: page number */ + byte* body, /* in: log record body */ + byte* rec_end, /* in: log record end */ + dulint start_lsn, /* in: start lsn of the mtr */ + dulint end_lsn) /* in: end lsn of the mtr */ +{ + recv_t* recv; + ulint len; + recv_data_t* recv_data; + recv_data_t** prev_field; + recv_addr_t* recv_addr; + + ut_a(space == 0); /* For debugging; TODO: remove this */ + + len = rec_end - body; + + recv = mem_heap_alloc(recv_sys->heap, sizeof(recv_t)); + recv->type = type; + recv->len = rec_end - body; + recv->start_lsn = start_lsn; + recv->end_lsn = end_lsn; + + recv_addr = recv_get_fil_addr_struct(space, page_no); + + if (recv_addr == NULL) { + recv_addr = mem_heap_alloc(recv_sys->heap, + sizeof(recv_addr_t)); + recv_addr->space = space; + recv_addr->page_no = page_no; + recv_addr->state = RECV_NOT_PROCESSED; + + UT_LIST_INIT(recv_addr->rec_list); + + HASH_INSERT(recv_addr_t, addr_hash, recv_sys->addr_hash, + recv_fold(space, page_no), recv_addr); + recv_sys->n_addrs++; + } + + UT_LIST_ADD_LAST(rec_list, recv_addr->rec_list, recv); + + prev_field = &(recv->data); + + /* Store the log record body in chunks of less than UNIV_PAGE_SIZE: + recv_sys->heap grows into the buffer pool, and bigger chunks could not + be allocated */ + + while (rec_end > body) { + + len = rec_end - body; + + if (len > RECV_DATA_BLOCK_SIZE) { + len = RECV_DATA_BLOCK_SIZE; + } + + recv_data = mem_heap_alloc(recv_sys->heap, + sizeof(recv_data_t) + len); + *prev_field = recv_data; + + ut_memcpy(((byte*)recv_data) + sizeof(recv_data_t), body, len); + + prev_field = &(recv_data->next); + + body += len; + } + + *prev_field = NULL; +} + +/************************************************************************* +Copies the log record body from recv to buf. 
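+The body was stored by recv_add_to_hash_table as a linked list of
+recv_data_t chunks of at most RECV_DATA_BLOCK_SIZE bytes each; this
+function walks the list and catenates the chunks back into the
+contiguous buffer buf.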
*/ +static +void +recv_data_copy_to_buf( +/*==================*/ + byte* buf, /* in: buffer of length at least recv->len */ + recv_t* recv) /* in: log record */ +{ + recv_data_t* recv_data; + ulint part_len; + ulint len; + + len = recv->len; + recv_data = recv->data; + + while (len > 0) { + if (len > RECV_DATA_BLOCK_SIZE) { + part_len = RECV_DATA_BLOCK_SIZE; + } else { + part_len = len; + } + + ut_memcpy(buf, ((byte*)recv_data) + sizeof(recv_data_t), + part_len); + buf += part_len; + len -= part_len; + + recv_data = recv_data->next; + } +} + +/**************************************************************************** +Applies the hashed log records to the page, if the page lsn is less than the +lsn of a log record. This can be called when a buffer page has just been +read in, or also for a page already in the buffer pool. */ + +void +recv_recover_page( +/*==============*/ + ibool just_read_in, /* in: TRUE if the i/o-handler calls this for + a freshly read page */ + page_t* page, /* in: buffer page */ + ulint space, /* in: space id */ + ulint page_no) /* in: page number */ +{ + buf_block_t* block; + recv_addr_t* recv_addr; + recv_t* recv; + byte* buf; + dulint start_lsn; + dulint end_lsn; + dulint page_lsn; + dulint page_newest_lsn; + ibool modification_to_page; + ibool success; + mtr_t mtr; + + mutex_enter(&(recv_sys->mutex)); + + if (recv_sys->apply_log_recs == FALSE) { + + /* Log records should not be applied now */ + + mutex_exit(&(recv_sys->mutex)); + + return; + } + + recv_addr = recv_get_fil_addr_struct(space, page_no); + + if ((recv_addr == NULL) + || (recv_addr->state == RECV_BEING_PROCESSED) + || (recv_addr->state == RECV_PROCESSED)) { + + mutex_exit(&(recv_sys->mutex)); + + return; + } + + recv_addr->state = RECV_BEING_PROCESSED; + + mutex_exit(&(recv_sys->mutex)); + + block = buf_block_align(page); + + if (just_read_in) { + /* Move the ownership of the x-latch on the page to this OS + thread, so that we can acquire a second x-latch on it. This + is needed for the operations to the page to pass the debug + checks. 
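+(The x-latch was acquired when the read was posted; after the ownership
+has been moved to this thread, a second x-latch can be acquired on the
+page with buf_page_get_known_nowait below.)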
*/
+
+		rw_lock_x_lock_move_ownership(&(block->lock));
+	}
+
+	mtr_start(&mtr);
+
+	mtr_set_log_mode(&mtr, MTR_LOG_NONE);
+
+	success = buf_page_get_known_nowait(RW_X_LATCH, page, BUF_KEEP_OLD,
+#ifdef UNIV_SYNC_DEBUG
+					__FILE__, __LINE__,
+#endif
+					&mtr);
+	ut_a(success);
+
+	buf_page_dbg_add_level(page, SYNC_NO_ORDER_CHECK);
+
+	/* Read the newest modification lsn from the page */
+	page_lsn = mach_read_from_8(page + FIL_PAGE_LSN);
+
+	/* It may be that the page has been modified in the buffer pool: read
+	the newest modification lsn there */
+
+	page_newest_lsn = buf_frame_get_newest_modification(page);
+
+	if (!ut_dulint_is_zero(page_newest_lsn)) {
+
+		page_lsn = page_newest_lsn;
+	}
+
+	modification_to_page = FALSE;
+
+	recv = UT_LIST_GET_FIRST(recv_addr->rec_list);
+
+	while (recv) {
+		end_lsn = recv->end_lsn;
+
+		if (recv->len > RECV_DATA_BLOCK_SIZE) {
+			/* We have to copy the record body to a separate
+			buffer */
+
+			buf = mem_alloc(recv->len);
+
+			recv_data_copy_to_buf(buf, recv);
+		} else {
+			buf = ((byte*)(recv->data)) + sizeof(recv_data_t);
+		}
+
+		if ((recv->type == MLOG_INIT_FILE_PAGE)
+		    || (recv->type == MLOG_FULL_PAGE)) {
+			/* A new file page may have been taken into use,
+			or we have stored the full contents of the page:
+			in this case it may be that the original log record
+			type was MLOG_INIT_FILE_PAGE, and we replaced it
+			with MLOG_FULL_PAGE, thus we have to apply
+			any record of type MLOG_FULL_PAGE */
+
+			page_lsn = page_newest_lsn;
+
+			mach_write_to_8(page + UNIV_PAGE_SIZE
+					- FIL_PAGE_END_LSN, ut_dulint_zero);
+			mach_write_to_8(page + FIL_PAGE_LSN, ut_dulint_zero);
+		}
+
+		if (ut_dulint_cmp(recv->start_lsn, page_lsn) >= 0) {
+
+			if (!modification_to_page) {
+
+				modification_to_page = TRUE;
+				start_lsn = recv->start_lsn;
+			}
+
+			if (log_debug_writes) {
+				fprintf(stderr,
+"Innobase: Applying log rec type %lu len %lu to space %lu page no %lu\n",
+				recv->type, recv->len, recv_addr->space,
+				recv_addr->page_no);
+			}
+
+			recv_parse_or_apply_log_rec_body(recv->type, buf,
+						buf + recv->len, page, &mtr);
+		}
+
+		if (recv->len > RECV_DATA_BLOCK_SIZE) {
+			mem_free(buf);
+		}
+
+		recv = UT_LIST_GET_NEXT(rec_list, recv);
+	}
+
+	/* If the following assert fails, the file page is incompletely
+	written, and a recovery from a backup is required */
+
+	ut_a(0 == ut_dulint_cmp(mach_read_from_8(page + FIL_PAGE_LSN),
+				mach_read_from_8(page + UNIV_PAGE_SIZE
+						- FIL_PAGE_END_LSN)));
+	mutex_enter(&(recv_sys->mutex));
+
+	recv_addr->state = RECV_PROCESSED;
+
+	ut_a(recv_sys->n_addrs);
+	recv_sys->n_addrs--;
+
+	mutex_exit(&(recv_sys->mutex));
+
+	if (modification_to_page) {
+		buf_flush_recv_note_modification(block, start_lsn, end_lsn);
+	}
+
+	/* Make sure that committing mtr does not change the modification
+	lsn values of page */
+
+	mtr.modifications = FALSE;
+
+	mtr_commit(&mtr);
+}
+
+/***********************************************************************
+Reads in pages which have hashed log records, from an area around a given
+page number.
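+The area is the block of RECV_READ_AHEAD_AREA consecutive page numbers
+containing page_no: e.g. with RECV_READ_AHEAD_AREA == 32, page_no == 37
+gives low_limit == 32, and those of the pages 32..63 which have hashed
+log records but are not yet in the buffer pool are posted for reading.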
*/ +static +ulint +recv_read_in_area( +/*==============*/ + /* out: number of pages found */ + ulint space, /* in: space */ + ulint page_no)/* in: page number */ +{ + recv_addr_t* recv_addr; + ulint page_nos[RECV_READ_AHEAD_AREA]; + ulint low_limit; + ulint n; + + low_limit = page_no - (page_no % RECV_READ_AHEAD_AREA); + + n = 0; + + for (page_no = low_limit; page_no < low_limit + RECV_READ_AHEAD_AREA; + page_no++) { + recv_addr = recv_get_fil_addr_struct(space, page_no); + + if (recv_addr && !buf_page_peek(space, page_no)) { + + mutex_enter(&(recv_sys->mutex)); + + if (recv_addr->state == RECV_NOT_PROCESSED) { + recv_addr->state = RECV_BEING_READ; + + page_nos[n] = page_no; + + n++; + } + + mutex_exit(&(recv_sys->mutex)); + } + } + + buf_read_recv_pages(FALSE, space, page_nos, n); + + /* printf("Recv pages at %lu n %lu\n", page_nos[0], n); */ + + return(n); +} + +/*********************************************************************** +Empties the hash table of stored log records, applying them to appropriate +pages. */ + +void +recv_apply_hashed_log_recs( +/*=======================*/ + ibool allow_ibuf) /* in: if TRUE, also ibuf operations are + allowed during the application; if FALSE, + no ibuf operations are allowed, and after + the application all file pages are flushed to + disk and invalidated in buffer pool: this + alternative means that no new log records + can be generated during the application; + the caller must in this case own the log + mutex */ +{ + recv_addr_t* recv_addr; + page_t* page; + ulint i; + ulint space; + ulint page_no; + ulint n_pages; + ibool has_printed = FALSE; + mtr_t mtr; +loop: + mutex_enter(&(recv_sys->mutex)); + + if (recv_sys->apply_batch_on) { + + mutex_exit(&(recv_sys->mutex)); + + os_thread_sleep(500000); + + goto loop; + } + + if (!allow_ibuf) { + ut_ad(mutex_own(&(log_sys->mutex))); + + recv_no_ibuf_operations = TRUE; + } else { + ut_ad(!mutex_own(&(log_sys->mutex))); + } + + recv_sys->apply_log_recs = TRUE; + recv_sys->apply_batch_on = TRUE; + + for (i = 0; i < hash_get_n_cells(recv_sys->addr_hash); i++) { + + recv_addr = HASH_GET_FIRST(recv_sys->addr_hash, i); + + while (recv_addr) { + space = recv_addr->space; + page_no = recv_addr->page_no; + + if (recv_addr->state == RECV_NOT_PROCESSED) { + if (!has_printed) { + fprintf(stderr, +"Innobase: Starting an apply batch of log records to the database...\n"); + has_printed = TRUE; + } + + mutex_exit(&(recv_sys->mutex)); + + if (buf_page_peek(space, page_no)) { + + mtr_start(&mtr); + + page = buf_page_get(space, page_no, + RW_X_LATCH, &mtr); + + buf_page_dbg_add_level(page, + SYNC_NO_ORDER_CHECK); + recv_recover_page(FALSE, page, space, + page_no); + mtr_commit(&mtr); + } else { + recv_read_in_area(space, page_no); + } + + mutex_enter(&(recv_sys->mutex)); + } + + recv_addr = HASH_GET_NEXT(addr_hash, recv_addr); + } + } + + /* Wait until all the pages have been processed */ + + while (recv_sys->n_addrs != 0) { + + mutex_exit(&(recv_sys->mutex)); + + os_thread_sleep(500000); + + mutex_enter(&(recv_sys->mutex)); + } + + if (!allow_ibuf) { + /* Flush all the file pages to disk and invalidate them in + the buffer pool */ + + mutex_exit(&(recv_sys->mutex)); + mutex_exit(&(log_sys->mutex)); + + n_pages = buf_flush_batch(BUF_FLUSH_LIST, ULINT_MAX, + ut_dulint_max); + ut_a(n_pages != ULINT_UNDEFINED); + + buf_flush_wait_batch_end(BUF_FLUSH_LIST); + + buf_pool_invalidate(); + + mutex_enter(&(log_sys->mutex)); + mutex_enter(&(recv_sys->mutex)); + + recv_no_ibuf_operations = FALSE; + } + + recv_sys->apply_log_recs 
= FALSE; + recv_sys->apply_batch_on = FALSE; + + recv_sys_empty_hash(); + + if (has_printed) { + fprintf(stderr, "Innobase: Apply batch completed\n"); + } + + mutex_exit(&(recv_sys->mutex)); +} + +/*********************************************************************** +In the debug version, updates the replica of a file page, based on a log +record. */ +static +void +recv_update_replicate( +/*==================*/ + byte type, /* in: log record type */ + ulint space, /* in: space id */ + ulint page_no,/* in: page number */ + byte* body, /* in: log record body */ + byte* end_ptr)/* in: log record end */ +{ + page_t* replica; + mtr_t mtr; + byte* ptr; + + mtr_start(&mtr); + + mtr_set_log_mode(&mtr, MTR_LOG_NONE); + + replica = buf_page_get(space + RECV_REPLICA_SPACE_ADD, page_no, + RW_X_LATCH, &mtr); + buf_page_dbg_add_level(replica, SYNC_NO_ORDER_CHECK); + + ptr = recv_parse_or_apply_log_rec_body(type, body, end_ptr, replica, + &mtr); + ut_a(ptr == end_ptr); + + /* Notify the buffer manager that the page has been updated */ + + buf_flush_recv_note_modification(buf_block_align(replica), + log_sys->old_lsn, log_sys->old_lsn); + + /* Make sure that committing mtr does not call log routines, as + we currently own the log mutex */ + + mtr.modifications = FALSE; + + mtr_commit(&mtr); +} + +/*********************************************************************** +Checks that two strings are identical. */ +static +void +recv_check_identical( +/*=================*/ + byte* str1, /* in: first string */ + byte* str2, /* in: second string */ + ulint len) /* in: length of strings */ +{ + ulint i; + + for (i = 0; i < len; i++) { + + if (str1[i] != str2[i]) { + fprintf(stderr, "Strings do not match at offset %lu\n", i); + + ut_print_buf(str1 + i, 16); + fprintf(stderr, "\n"); + ut_print_buf(str2 + i, 16); + + ut_error; + } + } +} + +/*********************************************************************** +In the debug version, checks that the replica of a file page is identical +to the original page. */ +static +void +recv_compare_replicate( +/*===================*/ + ulint space, /* in: space id */ + ulint page_no)/* in: page number */ +{ + page_t* replica; + page_t* page; + mtr_t mtr; + + mtr_start(&mtr); + + mutex_enter(&(buf_pool->mutex)); + + page = buf_page_hash_get(space, page_no)->frame; + + mutex_exit(&(buf_pool->mutex)); + + replica = buf_page_get(space + RECV_REPLICA_SPACE_ADD, page_no, + RW_X_LATCH, &mtr); + buf_page_dbg_add_level(replica, SYNC_NO_ORDER_CHECK); + + recv_check_identical(page + FIL_PAGE_DATA, + replica + FIL_PAGE_DATA, + PAGE_HEADER + PAGE_MAX_TRX_ID - FIL_PAGE_DATA); + + recv_check_identical(page + PAGE_HEADER + PAGE_MAX_TRX_ID + 8, + replica + PAGE_HEADER + PAGE_MAX_TRX_ID + 8, + UNIV_PAGE_SIZE - FIL_PAGE_DATA_END + - PAGE_HEADER - PAGE_MAX_TRX_ID - 8); + mtr_commit(&mtr); +} + +/*********************************************************************** +Checks that a replica of a space is identical to the original space. 
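+In the debug version the replica of space n resides in space
+n + RECV_REPLICA_SPACE_ADD (see recv_update_replicate above), so a
+typical call is of the form
+
+	recv_compare_spaces(n, n + RECV_REPLICA_SPACE_ADD, n_pages);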
*/ + +void +recv_compare_spaces( +/*================*/ + ulint space1, /* in: space id */ + ulint space2, /* in: space id */ + ulint n_pages)/* in: number of pages */ +{ + page_t* replica; + page_t* page; + mtr_t mtr; + page_t* frame; + ulint page_no; + + replica = buf_frame_alloc(); + page = buf_frame_alloc(); + + for (page_no = 0; page_no < n_pages; page_no++) { + + mtr_start(&mtr); + + frame = buf_page_get_gen(space1, page_no, RW_S_LATCH, NULL, + BUF_GET_IF_IN_POOL, +#ifdef UNIV_SYNC_DEBUG + __FILE__, __LINE__, +#endif + &mtr); + if (frame) { + buf_page_dbg_add_level(frame, SYNC_NO_ORDER_CHECK); + ut_memcpy(page, frame, UNIV_PAGE_SIZE); + } else { + /* Read it from file */ + fil_io(OS_FILE_READ, TRUE, space1, page_no, 0, + UNIV_PAGE_SIZE, page, NULL); + } + + frame = buf_page_get_gen(space2, page_no, RW_S_LATCH, NULL, + BUF_GET_IF_IN_POOL, +#ifdef UNIV_SYNC_DEBUG + __FILE__, __LINE__, +#endif + &mtr); + if (frame) { + buf_page_dbg_add_level(frame, SYNC_NO_ORDER_CHECK); + ut_memcpy(replica, frame, UNIV_PAGE_SIZE); + } else { + /* Read it from file */ + fil_io(OS_FILE_READ, TRUE, space2, page_no, 0, + UNIV_PAGE_SIZE, replica, NULL); + } + + recv_check_identical(page + FIL_PAGE_DATA, + replica + FIL_PAGE_DATA, + PAGE_HEADER + PAGE_MAX_TRX_ID - FIL_PAGE_DATA); + + recv_check_identical(page + PAGE_HEADER + PAGE_MAX_TRX_ID + 8, + replica + PAGE_HEADER + PAGE_MAX_TRX_ID + 8, + UNIV_PAGE_SIZE - FIL_PAGE_DATA_END + - PAGE_HEADER - PAGE_MAX_TRX_ID - 8); + + mtr_commit(&mtr); + } + + buf_frame_free(replica); + buf_frame_free(page); +} + +/*********************************************************************** +Checks that a replica of a space is identical to the original space. Disables +ibuf operations and flushes and invalidates the buffer pool pages after the +test. This function can be used to check the recovery before dict or trx +systems are initialized. */ + +void +recv_compare_spaces_low( +/*====================*/ + ulint space1, /* in: space id */ + ulint space2, /* in: space id */ + ulint n_pages)/* in: number of pages */ +{ + mutex_enter(&(log_sys->mutex)); + + recv_apply_hashed_log_recs(FALSE); + + mutex_exit(&(log_sys->mutex)); + + recv_compare_spaces(space1, space2, n_pages); +} + +/*********************************************************************** +Tries to parse a single log record and returns its length. */ +static +ulint +recv_parse_log_rec( +/*===============*/ + /* out: length of the record, or 0 if the record was + not complete */ + byte* ptr, /* in: pointer to a buffer */ + byte* end_ptr,/* in: pointer to the buffer end */ + byte* type, /* out: type */ + ulint* space, /* out: space id */ + ulint* page_no,/* out: page number */ + byte** body) /* out: log record body start */ +{ + byte* new_ptr; + + if (ptr == end_ptr) { + + return(0); + } + + if (*ptr == MLOG_MULTI_REC_END) { + + *type = *ptr; + + return(1); + } + + if (*ptr == MLOG_DUMMY_RECORD) { + *type = *ptr; + + *space = 1000; /* For debugging */ + + return(1); + } + + new_ptr = mlog_parse_initial_log_record(ptr, end_ptr, type, space, + page_no); + if (!new_ptr) { + + return(0); + } + + *body = new_ptr; + + new_ptr = recv_parse_or_apply_log_rec_body(*type, new_ptr, end_ptr, + NULL, NULL); + if (new_ptr == NULL) { + + return(0); + } + + return(new_ptr - ptr); +} + +/*********************************************************** +Calculates the new value for lsn when more data is added to the log. 
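+Each log block of OS_FILE_LOG_BLOCK_SIZE bytes carries LOG_BLOCK_HDR_SIZE
+header bytes and LOG_BLOCK_TRL_SIZE trailer bytes, so the lsn advances by
+an extra header + trailer for every block boundary which the new data
+crosses. For example, assuming 512-byte blocks with a 12-byte header and
+a 4-byte trailer (496 payload bytes per block), adding len == 10 at an
+lsn which is 490 payload bytes into its block crosses one boundary, and
+the lsn advances by 10 + (12 + 4) = 26.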
*/ +static +dulint +recv_calc_lsn_on_data_add( +/*======================*/ + dulint lsn, /* in: old lsn */ + ulint len) /* in: this many bytes of data is added, log block + headers not included */ +{ + ulint frag_len; + ulint lsn_len; + + frag_len = (ut_dulint_get_low(lsn) % OS_FILE_LOG_BLOCK_SIZE) + - LOG_BLOCK_HDR_SIZE; + ut_ad(frag_len < OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE + - LOG_BLOCK_TRL_SIZE); + lsn_len = len + ((len + frag_len) + / (OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE + - LOG_BLOCK_TRL_SIZE)) + * (LOG_BLOCK_HDR_SIZE + LOG_BLOCK_TRL_SIZE); + + return(ut_dulint_add(lsn, lsn_len)); +} + +/*********************************************************** +Checks that the parser recognizes incomplete initial segments of a log +record as incomplete. */ + +void +recv_check_incomplete_log_recs( +/*===========================*/ + byte* ptr, /* in: pointer to a complete log record */ + ulint len) /* in: length of the log record */ +{ + ulint i; + byte type; + ulint space; + ulint page_no; + byte* body; + + for (i = 0; i < len; i++) { + ut_a(0 == recv_parse_log_rec(ptr, ptr + i, &type, &space, + &page_no, &body)); + } +} + +/*********************************************************** +Parses log records from a buffer and stores them to a hash table to wait +merging to file pages. If the hash table becomes too full, applies it +automatically to file pages. */ + +void +recv_parse_log_recs( +/*================*/ + ibool store_to_hash) /* in: TRUE if the records should be stored + to the hash table; this is set to FALSE if just + debug checking is needed */ +{ + byte* ptr; + byte* end_ptr; + ulint single_rec; + ulint len; + ulint total_len; + dulint new_recovered_lsn; + dulint old_lsn; + byte type; + ulint space; + ulint page_no; + byte* body; + ulint n_recs; + + ut_ad(mutex_own(&(log_sys->mutex))); + ut_ad(!ut_dulint_is_zero(recv_sys->parse_start_lsn)); +loop: + ptr = recv_sys->buf + recv_sys->recovered_offset; + + end_ptr = recv_sys->buf + recv_sys->len; + + if (ptr == end_ptr) { + + return; + } + + single_rec = (ulint)*ptr & MLOG_SINGLE_REC_FLAG; + + if (single_rec || *ptr == MLOG_DUMMY_RECORD) { + /* The mtr only modified a single page */ + + old_lsn = recv_sys->recovered_lsn; + + len = recv_parse_log_rec(ptr, end_ptr, &type, &space, + &page_no, &body); + if (len == 0) { + + return; + } + + new_recovered_lsn = recv_calc_lsn_on_data_add(old_lsn, len); + + if (ut_dulint_cmp(new_recovered_lsn, recv_sys->scanned_lsn) + > 0) { + /* The log record filled a log block, and we require + that also the next log block should have been scanned + in */ + + return; + } + + recv_sys->recovered_offset += len; + recv_sys->recovered_lsn = new_recovered_lsn; + + if (log_debug_writes) { + fprintf(stderr, +"Innobase: Parsed a single log rec type %lu len %lu space %lu page no %lu\n", + type, len, space, page_no); + } + + if (type == MLOG_DUMMY_RECORD) { + /* Do nothing */ + + } else if (store_to_hash) { + recv_add_to_hash_table(type, space, page_no, body, + ptr + len, old_lsn, + recv_sys->recovered_lsn); + } else { + /* In debug checking, update a replicate page + according to the log record, and check that it + becomes identical with the original page */ +#ifdef UNIV_LOG_DEBUG + recv_check_incomplete_log_recs(ptr, len); +#endif + recv_update_replicate(type, space, page_no, body, + ptr + len); + recv_compare_replicate(space, page_no); + } + } else { + /* Check that all the records associated with the single mtr + are included within the buffer */ + + total_len = 0; + n_recs = 0; + + for (;;) { + len = 
recv_parse_log_rec(ptr, end_ptr, &type, &space, + &page_no, &body); + if (len == 0) { + + return; + } + + if ((!store_to_hash) && (type != MLOG_MULTI_REC_END)) { + /* In debug checking, update a replicate page + according to the log record */ +#ifdef UNIV_LOG_DEBUG + recv_check_incomplete_log_recs(ptr, len); +#endif + recv_update_replicate(type, space, page_no, + body, ptr + len); + } + + if (log_debug_writes) { + fprintf(stderr, +"Innobase: Parsed a multi log rec type %lu len %lu space %lu page no %lu\n", + type, len, space, page_no); + } + + total_len += len; + n_recs++; + + ptr += len; + + if (type == MLOG_MULTI_REC_END) { + + /* Found the end mark for the records */ + + break; + } + } + + new_recovered_lsn = recv_calc_lsn_on_data_add( + recv_sys->recovered_lsn, total_len); + + if (ut_dulint_cmp(new_recovered_lsn, recv_sys->scanned_lsn) + > 0) { + /* The log record filled a log block, and we require + that also the next log block should have been scanned + in */ + + return; + } + + if (2 * n_recs * (sizeof(recv_t) + sizeof(recv_addr_t)) + + total_len + + mem_heap_get_size(recv_sys->heap) + + RECV_POOL_N_FREE_BLOCKS * UNIV_PAGE_SIZE + > buf_pool_get_curr_size()) { + + /* Hash table of log records will grow too big: + empty it */ + + recv_apply_hashed_log_recs(FALSE); + } + + ut_ad(2 * n_recs * (sizeof(recv_t) + sizeof(recv_addr_t)) + + total_len + + mem_heap_get_size(recv_sys->heap) + + RECV_POOL_N_FREE_BLOCKS * UNIV_PAGE_SIZE + < buf_pool_get_curr_size()); + + /* Add all the records to the hash table */ + + ptr = recv_sys->buf + recv_sys->recovered_offset; + + for (;;) { + old_lsn = recv_sys->recovered_lsn; + len = recv_parse_log_rec(ptr, end_ptr, &type, &space, + &page_no, &body); + ut_a(len != 0); + ut_a(0 == ((ulint)*ptr & MLOG_SINGLE_REC_FLAG)); + + recv_sys->recovered_offset += len; + recv_sys->recovered_lsn = recv_calc_lsn_on_data_add( + old_lsn, len); + if (type == MLOG_MULTI_REC_END) { + + /* Found the end mark for the records */ + + break; + } + + if (store_to_hash) { + recv_add_to_hash_table(type, space, page_no, + body, ptr + len, old_lsn, + new_recovered_lsn); + } else { + /* In debug checking, check that the replicate + page has become identical with the original + page */ + + recv_compare_replicate(space, page_no); + } + + ptr += len; + } + } + + if (store_to_hash && buf_get_free_list_len() + < RECV_POOL_N_FREE_BLOCKS) { + + /* Hash table of log records has grown too big: empty it; + FALSE means no ibuf operations allowed, as we cannot add + new records to the log yet: they would be produced by ibuf + operations */ + + recv_apply_hashed_log_recs(FALSE); + } + + goto loop; +} + +/*********************************************************** +Adds data from a new log block to the parsing buffer of recv_sys if +recv_sys->parse_start_lsn is non-zero. 
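+Only the part of the block payload which is new compared to
+recv_sys->scanned_lsn is appended: for example, if the block holds
+data_len == 300 bytes of which more_len == 100 are new, the 100 bytes
+starting at offset 200 within the block are copied to the parsing
+buffer (the copy range is additionally clamped to exclude the block
+header and trailer).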
*/ +static +ibool +recv_sys_add_to_parsing_buf( +/*========================*/ + /* out: TRUE if more data added */ + byte* log_block, /* in: log block */ + dulint scanned_lsn) /* in: lsn of how far we were able to find + data in this log block */ +{ + ulint more_len; + ulint data_len; + ulint start_offset; + ulint end_offset; + + ut_ad(ut_dulint_cmp(scanned_lsn, recv_sys->scanned_lsn) >= 0); + + if (ut_dulint_is_zero(recv_sys->parse_start_lsn)) { + /* Cannot start parsing yet because no start point for + it found */ + + return(FALSE); + } + + data_len = log_block_get_data_len(log_block); + + if (ut_dulint_cmp(recv_sys->parse_start_lsn, scanned_lsn) >= 0) { + + return(FALSE); + + } else if (ut_dulint_cmp(recv_sys->scanned_lsn, scanned_lsn) >= 0) { + + return(FALSE); + + } else if (ut_dulint_cmp(recv_sys->parse_start_lsn, + recv_sys->scanned_lsn) > 0) { + more_len = ut_dulint_minus(scanned_lsn, + recv_sys->parse_start_lsn); + } else { + more_len = ut_dulint_minus(scanned_lsn, recv_sys->scanned_lsn); + } + + if (more_len == 0) { + + return(FALSE); + } + + ut_ad(data_len >= more_len); + + start_offset = data_len - more_len; + + if (start_offset < LOG_BLOCK_HDR_SIZE) { + start_offset = LOG_BLOCK_HDR_SIZE; + } + + end_offset = data_len; + + if (end_offset > OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE) { + end_offset = OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE; + } + + ut_ad(start_offset <= end_offset); + + if (start_offset < end_offset) { + ut_memcpy(recv_sys->buf + recv_sys->len, + log_block + start_offset, end_offset - start_offset); + + recv_sys->len += end_offset - start_offset; + + ut_ad(recv_sys->len <= RECV_PARSING_BUF_SIZE); + } + + return(TRUE); +} + +/*********************************************************** +Moves the parsing buffer data left to the buffer start. */ +static +void +recv_sys_justify_left_parsing_buf(void) +/*===================================*/ +{ + ut_memmove(recv_sys->buf, recv_sys->buf + recv_sys->recovered_offset, + recv_sys->len - recv_sys->recovered_offset); + + recv_sys->len -= recv_sys->recovered_offset; + + recv_sys->recovered_offset = 0; +} + +/*********************************************************** +Scans log from a buffer and stores new log data to the parsing buffer. Parses +and hashes the log records if new data found. 
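+The caller must pass a start_lsn and a buffer length which are multiples
+of OS_FILE_LOG_BLOCK_SIZE; see log_check_log_recs in log0log.c and
+recv_group_scan_log_recs below for the two call patterns.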
*/ + +ibool +recv_scan_log_recs( +/*===============*/ + /* out: TRUE if limit_lsn has been reached, or + not able to scan any more in this log group */ + ibool store_to_hash, /* in: TRUE if the records should be stored + to the hash table; this is set to FALSE if just + debug checking is needed */ + byte* buf, /* in: buffer containing a log segment or + garbage */ + ulint len, /* in: buffer length */ + dulint start_lsn, /* in: buffer start lsn */ + dulint* contiguous_lsn, /* in/out: it is known that all log groups + contain contiguous log data up to this lsn */ + dulint* group_scanned_lsn)/* out: scanning succeeded up to this lsn */ +{ + byte* log_block; + ulint no; + dulint scanned_lsn; + ibool finished; + ulint data_len; + ibool more_data; + + ut_ad(ut_dulint_get_low(start_lsn) % OS_FILE_LOG_BLOCK_SIZE == 0); + ut_ad(len % OS_FILE_LOG_BLOCK_SIZE == 0); + ut_ad(len > 0); + + finished = FALSE; + + log_block = buf; + scanned_lsn = start_lsn; + more_data = FALSE; + + while (log_block < buf + len && !finished) { + + no = log_block_get_hdr_no(log_block); + + /* fprintf(stderr, "Log block header no %lu\n", no); */ + + if (no != log_block_get_trl_no(log_block) + || no != log_block_convert_lsn_to_no(scanned_lsn)) { + + /* Garbage or an incompletely written log block */ + + finished = TRUE; + + break; + } + + if (log_block_get_flush_bit(log_block)) { + /* This block was a start of a log flush operation: + we know that the previous flush operation must have + been completed for all log groups before this block + can have been flushed to any of the groups. Therefore, + we know that log data is contiguous up to scanned_lsn + in all non-corrupt log groups. */ + + if (ut_dulint_cmp(scanned_lsn, *contiguous_lsn) > 0) { + *contiguous_lsn = scanned_lsn; + } + } + + data_len = log_block_get_data_len(log_block); + + if ((store_to_hash || (data_len == OS_FILE_LOG_BLOCK_SIZE)) + && (ut_dulint_cmp(ut_dulint_add(scanned_lsn, data_len), + recv_sys->scanned_lsn) > 0) + && (recv_sys->scanned_checkpoint_no > 0) + && (log_block_get_checkpoint_no(log_block) + < recv_sys->scanned_checkpoint_no) + && (recv_sys->scanned_checkpoint_no + - log_block_get_checkpoint_no(log_block) + > 0x80000000)) { + + /* Garbage from a log buffer flush which was made + before the most recent database recovery */ + + finished = TRUE; +#ifdef UNIV_LOG_DEBUG + /* This is not really an error, but currently + we stop here in the debug version: */ + + ut_error; +#endif + break; + } + + if (ut_dulint_is_zero(recv_sys->parse_start_lsn) + && (log_block_get_first_rec_group(log_block) > 0)) { + + /* We found a point from which to start the parsing + of log records */ + + recv_sys->parse_start_lsn = + ut_dulint_add(scanned_lsn, + log_block_get_first_rec_group(log_block)); + recv_sys->scanned_lsn = recv_sys->parse_start_lsn; + recv_sys->recovered_lsn = recv_sys->parse_start_lsn; + } + + scanned_lsn = ut_dulint_add(scanned_lsn, data_len); + + if (ut_dulint_cmp(scanned_lsn, recv_sys->scanned_lsn) > 0) { + + /* We were able to find more log data: add it to the + parsing buffer if parse_start_lsn is already non-zero */ + + more_data = recv_sys_add_to_parsing_buf(log_block, + scanned_lsn); + recv_sys->scanned_lsn = scanned_lsn; + recv_sys->scanned_checkpoint_no = + log_block_get_checkpoint_no(log_block); + } + + if (data_len < OS_FILE_LOG_BLOCK_SIZE) { + /* Log data for this group ends here */ + + finished = TRUE; + } else { + log_block += OS_FILE_LOG_BLOCK_SIZE; + } + } + + *group_scanned_lsn = scanned_lsn; + + if (more_data) { + fprintf(stderr, 
+"Innobase: Doing recovery: scanned up to log sequence number %lu %lu\n", + ut_dulint_get_high(*group_scanned_lsn), + ut_dulint_get_low(*group_scanned_lsn)); + + /* Try to parse more log records */ + + recv_parse_log_recs(store_to_hash); + + if (recv_sys->recovered_offset > RECV_PARSING_BUF_SIZE / 4) { + /* Move parsing buffer data to the buffer start */ + + recv_sys_justify_left_parsing_buf(); + } + } + + return(finished); +} + +/*********************************************************** +Scans log from a buffer and stores new log data to the parsing buffer. Parses +and hashes the log records if new data found. */ +static +void +recv_group_scan_log_recs( +/*=====================*/ + log_group_t* group, /* in: log group */ + dulint* contiguous_lsn, /* in/out: it is known that all log groups + contain contiguous log data up to this lsn */ + dulint* group_scanned_lsn)/* out: scanning succeeded up to this lsn */ +{ + ibool finished; + dulint start_lsn; + dulint end_lsn; + + finished = FALSE; + + start_lsn = *contiguous_lsn; + + while (!finished) { + end_lsn = ut_dulint_add(start_lsn, RECV_SCAN_SIZE); + + log_group_read_log_seg(LOG_RECOVER, log_sys->buf, + group, start_lsn, end_lsn); + + finished = recv_scan_log_recs(TRUE, log_sys->buf, + RECV_SCAN_SIZE, start_lsn, + contiguous_lsn, + group_scanned_lsn); + start_lsn = end_lsn; + } + + if (log_debug_writes) { + fprintf(stderr, + "Innobase: Scanned group %lu up to log sequence number %lu %lu\n", + group->id, + ut_dulint_get_high(*group_scanned_lsn), + ut_dulint_get_low(*group_scanned_lsn)); + } +} + +/************************************************************ +Recovers from a checkpoint. When this function returns, the database is able +to start processing of new user transactions, but the function +recv_recovery_from_checkpoint_finish should be called later to complete +the recovery and free the resources used in it. 
*/ + +ulint +recv_recovery_from_checkpoint_start( +/*================================*/ + /* out: error code or DB_SUCCESS */ + ulint type, /* in: LOG_CHECKPOINT or LOG_ARCHIVE */ + dulint limit_lsn, /* in: recover up to this lsn if possible */ + dulint min_flushed_lsn,/* in: min flushed lsn from data files */ + dulint max_flushed_lsn)/* in: max flushed lsn from data files */ +{ + log_group_t* group; + log_group_t* max_cp_group; + log_group_t* up_to_date_group; + ulint max_cp_field; + dulint checkpoint_lsn; + dulint checkpoint_no; + dulint old_scanned_lsn; + dulint group_scanned_lsn; + dulint contiguous_lsn; + dulint archived_lsn; + ulint capacity; + byte* buf; + ulint err; + + ut_ad((type != LOG_CHECKPOINT) + || (ut_dulint_cmp(limit_lsn, ut_dulint_max) == 0)); + + if (type == LOG_CHECKPOINT) { + + recv_sys_create(); + recv_sys_init(); + } + + sync_order_checks_on = TRUE; + + recv_recovery_on = TRUE; + + recv_sys->limit_lsn = limit_lsn; + + mutex_enter(&(log_sys->mutex)); + + /* Look for the latest checkpoint from any of the log groups */ + + err = recv_find_max_checkpoint(&max_cp_group, &max_cp_field); + + if (err != DB_SUCCESS) { + + mutex_exit(&(log_sys->mutex)); + + return(err); + } + + log_group_read_checkpoint_info(max_cp_group, max_cp_field); + + buf = log_sys->checkpoint_buf; + + checkpoint_lsn = mach_read_from_8(buf + LOG_CHECKPOINT_LSN); + checkpoint_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO); + archived_lsn = mach_read_from_8(buf + LOG_CHECKPOINT_ARCHIVED_LSN); + + group = UT_LIST_GET_FIRST(log_sys->log_groups); + + while (group) { + log_checkpoint_get_nth_group_info(buf, group->id, + &(group->archived_file_no), + &(group->archived_offset)); + + group = UT_LIST_GET_NEXT(log_groups, group); + } + + if (type == LOG_CHECKPOINT) { + /* Start reading the log groups from the checkpoint lsn up. The + variable contiguous_lsn contains an lsn up to which the log is + known to be contiguously written to all log groups. 
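+
+ For example (made-up numbers): if the checkpoint lsn pointed 100
+ bytes into a log block, the ut_dulint_align_down() call below
+ would move contiguous_lsn back to the start of that block, so
+ that scanning always begins at an OS_FILE_LOG_BLOCK_SIZE
+ boundary.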
*/ +
+ recv_sys->parse_start_lsn = checkpoint_lsn;
+ recv_sys->scanned_lsn = checkpoint_lsn;
+ recv_sys->scanned_checkpoint_no = 0;
+ recv_sys->recovered_lsn = checkpoint_lsn;
+
+ /* NOTE: we always run a recovery pass at startup, but we
+ print a message about recovery to the user only if something
+ was actually wrong: */
+
+ if (ut_dulint_cmp(checkpoint_lsn, max_flushed_lsn) != 0
+ || ut_dulint_cmp(checkpoint_lsn, min_flushed_lsn) != 0) {
+
+ fprintf(stderr,
+ "Innobase: Database was not shut down normally.\n"
+ "Innobase: Starting recovery from log files...\n");
+ fprintf(stderr,
+ "Innobase: Starting log scan based on checkpoint at\n"
+ "Innobase: log sequence number %lu %lu\n",
+ ut_dulint_get_high(checkpoint_lsn),
+ ut_dulint_get_low(checkpoint_lsn));
+ }
+ }
+
+ contiguous_lsn = ut_dulint_align_down(recv_sys->scanned_lsn,
+ OS_FILE_LOG_BLOCK_SIZE);
+ if (type == LOG_ARCHIVE) {
+ /* Try to recover the remaining part from logs: first from
+ the logs of the archived group */
+
+ group = recv_sys->archive_group;
+ capacity = log_group_get_capacity(group);
+
+ if ((ut_dulint_cmp(recv_sys->scanned_lsn,
+ ut_dulint_add(checkpoint_lsn, capacity)) > 0)
+ || (ut_dulint_cmp(checkpoint_lsn,
+ ut_dulint_add(recv_sys->scanned_lsn, capacity)) > 0)) {
+
+ mutex_exit(&(log_sys->mutex));
+
+ /* The group does not contain enough log: probably
+ an archived log file was missing or corrupt */
+
+ return(DB_ERROR);
+ }
+
+ recv_group_scan_log_recs(group, &contiguous_lsn,
+ &group_scanned_lsn);
+ if (ut_dulint_cmp(recv_sys->scanned_lsn, checkpoint_lsn) < 0) {
+
+ mutex_exit(&(log_sys->mutex));
+
+ /* The group did not contain enough log: an archived
+ log file was missing or invalid, or the log group
+ was corrupt */
+
+ return(DB_ERROR);
+ }
+
+ group->scanned_lsn = group_scanned_lsn;
+ up_to_date_group = group;
+ } else {
+ up_to_date_group = max_cp_group;
+ }
+
+ ut_ad(RECV_SCAN_SIZE <= log_sys->buf_size);
+
+ group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+ if ((type == LOG_ARCHIVE) && (group == recv_sys->archive_group)) {
+ group = UT_LIST_GET_NEXT(log_groups, group);
+ }
+
+ while (group) {
+ old_scanned_lsn = recv_sys->scanned_lsn;
+
+ recv_group_scan_log_recs(group, &contiguous_lsn,
+ &group_scanned_lsn);
+ group->scanned_lsn = group_scanned_lsn;
+
+ if (ut_dulint_cmp(old_scanned_lsn, group_scanned_lsn) < 0) {
+ /* We found a more up-to-date group */
+
+ up_to_date_group = group;
+ }
+
+ if ((type == LOG_ARCHIVE)
+ && (group == recv_sys->archive_group)) {
+ group = UT_LIST_GET_NEXT(log_groups, group);
+ }
+
+ group = UT_LIST_GET_NEXT(log_groups, group);
+ }
+
+ if (ut_dulint_cmp(recv_sys->recovered_lsn, checkpoint_lsn) < 0) {
+
+ mutex_exit(&(log_sys->mutex));
+
+ if (ut_dulint_cmp(recv_sys->recovered_lsn, limit_lsn) >= 0) {
+
+ return(DB_SUCCESS);
+ }
+
+ ut_error;
+
+ return(DB_ERROR);
+ }
+
+ /* Synchronize the uncorrupted log groups to the most up-to-date log
+ group; we also copy checkpoint info to groups */
+
+ log_sys->next_checkpoint_lsn = checkpoint_lsn;
+ log_sys->next_checkpoint_no = ut_dulint_add(checkpoint_no, 1);
+
+ log_sys->archived_lsn = archived_lsn;
+
+ recv_synchronize_groups(up_to_date_group);
+
+ log_sys->lsn = recv_sys->recovered_lsn;
+
+ ut_memcpy(log_sys->buf, recv_sys->last_block, OS_FILE_LOG_BLOCK_SIZE);
+
+ log_sys->buf_free = ut_dulint_get_low(log_sys->lsn)
+ % OS_FILE_LOG_BLOCK_SIZE;
+ log_sys->buf_next_to_write = log_sys->buf_free;
+ log_sys->written_to_some_lsn = log_sys->lsn;
+ log_sys->written_to_all_lsn = log_sys->lsn;
+
+ log_sys->last_checkpoint_lsn = 
checkpoint_lsn; + + log_sys->next_checkpoint_no = ut_dulint_add(checkpoint_no, 1); + + if (ut_dulint_cmp(archived_lsn, ut_dulint_max) == 0) { + + log_sys->archiving_state = LOG_ARCH_OFF; + } + + mutex_enter(&(recv_sys->mutex)); + + recv_sys->apply_log_recs = TRUE; + + mutex_exit(&(recv_sys->mutex)); + + mutex_exit(&(log_sys->mutex)); + + sync_order_checks_on = FALSE; + + /* The database is now ready to start almost normal processing of user + transactions: transaction rollbacks and the application of the log + records in the hash table can be run in background. */ + + return(DB_SUCCESS); +} + +/************************************************************ +Completes recovery from a checkpoint. */ + +void +recv_recovery_from_checkpoint_finish(void) +/*======================================*/ +{ + /* Rollback the uncommitted transactions which have no user session */ + + trx_rollback_all_without_sess(); + + /* Apply the hashed log records to the respective file pages */ + + recv_apply_hashed_log_recs(TRUE); + + if (log_debug_writes) { + fprintf(stderr, + "Innobase: Log records applied to the database\n"); + } + + /* Free the resources of the recovery system */ + + recv_recovery_on = FALSE; +#ifndef UNIV_LOG_DEBUG + recv_sys_free(); +#endif +} + +/********************************************************** +Resets the logs. The contents of log files will be lost! */ + +void +recv_reset_logs( +/*============*/ + dulint lsn, /* in: reset to this lsn rounded up to + be divisible by OS_FILE_LOG_BLOCK_SIZE, + after which we add LOG_BLOCK_HDR_SIZE */ + ulint arch_log_no, /* in: next archived log file number */ + ibool new_logs_created)/* in: TRUE if resetting logs is done + at the log creation; FALSE if it is done + after archive recovery */ +{ + log_group_t* group; + + ut_ad(mutex_own(&(log_sys->mutex))); + + log_sys->lsn = ut_dulint_align_up(lsn, OS_FILE_LOG_BLOCK_SIZE); + + group = UT_LIST_GET_FIRST(log_sys->log_groups); + + while (group) { + group->lsn = log_sys->lsn; + group->lsn_offset = LOG_FILE_HDR_SIZE; + + group->archived_file_no = arch_log_no; + group->archived_offset = 0; + + if (!new_logs_created) { + recv_truncate_group(group, group->lsn, group->lsn, + group->lsn, group->lsn); + } + + group = UT_LIST_GET_NEXT(log_groups, group); + } + + log_sys->buf_next_to_write = 0; + log_sys->written_to_some_lsn = log_sys->lsn; + log_sys->written_to_all_lsn = log_sys->lsn; + + log_sys->next_checkpoint_no = ut_dulint_zero; + log_sys->last_checkpoint_lsn = ut_dulint_zero; + + log_sys->archived_lsn = log_sys->lsn; + + log_block_init(log_sys->buf, log_sys->lsn); + log_block_set_first_rec_group(log_sys->buf, LOG_BLOCK_HDR_SIZE); + + log_sys->buf_free = LOG_BLOCK_HDR_SIZE; + log_sys->lsn = ut_dulint_add(log_sys->lsn, LOG_BLOCK_HDR_SIZE); + + mutex_exit(&(log_sys->mutex)); + + /* Reset the checkpoint fields in logs */ + + log_make_checkpoint_at(ut_dulint_max, TRUE); + log_make_checkpoint_at(ut_dulint_max, TRUE); + + mutex_enter(&(log_sys->mutex)); +} + +/********************************************************** +Reads from the archive of a log group and performs recovery. 
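+
+The archive file header checked below contains, among the fields this
+function actually reads: the log group id, the archived file number, a
+flag telling whether the file was completely written, and the start and
+end lsns of the log data in the file. A mismatch in any of them makes
+the function give up on this file.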
*/ +static +ibool +log_group_recover_from_archive_file( +/*================================*/ + /* out: TRUE if no more complete + consistent archive files */ + log_group_t* group) /* in: log group */ +{ + os_file_t file_handle; + dulint start_lsn; + dulint file_end_lsn; + dulint dummy_lsn; + dulint scanned_lsn; + ulint len; + char name[10000]; + ibool ret; + byte* buf; + ulint read_offset; + ulint file_size; + ulint file_size_high; + int input_char; + +try_open_again: + buf = log_sys->buf; + + /* Add the file to the archive file space; open the file */ + + log_archived_file_name_gen(name, group->id, group->archived_file_no); + + fil_reserve_right_to_open(); + + file_handle = os_file_create(name, OS_FILE_OPEN, OS_FILE_AIO, &ret); + + if (ret == FALSE) { + fil_release_right_to_open(); +ask_again: + fprintf(stderr, + "Innobase: Do you want to copy additional archived log files\n" + "Innobase: to the directory\n"); + fprintf(stderr, + "Innobase: or were these all the files needed in recovery?\n"); + fprintf(stderr, + "Innobase: (Y == copy more files; N == this is all)?"); + + input_char = getchar(); + + if (input_char == (int) 'N') { + + return(TRUE); + } else if (input_char == (int) 'Y') { + + goto try_open_again; + } else { + goto ask_again; + } + } + + ret = os_file_get_size(file_handle, &file_size, &file_size_high); + ut_a(ret); + + ut_a(file_size_high == 0); + + fprintf(stderr, "Innobase: Opened archived log file %s\n", name); + + ret = os_file_close(file_handle); + + if (file_size < LOG_FILE_HDR_SIZE) { + fprintf(stderr, + "Innobase: Archive file header incomplete %s\n", name); + + return(TRUE); + } + + ut_a(ret); + + fil_release_right_to_open(); + + /* Add the archive file as a node to the space */ + + fil_node_create(name, 1 + file_size / UNIV_PAGE_SIZE, + group->archive_space_id); + ut_a(RECV_SCAN_SIZE >= LOG_FILE_HDR_SIZE); + + /* Read the archive file header */ + fil_io(OS_FILE_READ | OS_FILE_LOG, TRUE, group->archive_space_id, 0, 0, + LOG_FILE_HDR_SIZE, buf, NULL); + + /* Check if the archive file header is consistent */ + + if (mach_read_from_4(buf + LOG_GROUP_ID) != group->id + || mach_read_from_4(buf + LOG_FILE_NO) + != group->archived_file_no) { + fprintf(stderr, + "Innobase: Archive file header inconsistent %s\n", name); + + return(TRUE); + } + + if (!mach_read_from_4(buf + LOG_FILE_ARCH_COMPLETED)) { + fprintf(stderr, + "Innobase: Archive file not completely written %s\n", name); + + return(TRUE); + } + + start_lsn = mach_read_from_8(buf + LOG_FILE_START_LSN); + file_end_lsn = mach_read_from_8(buf + LOG_FILE_END_LSN); + + if (ut_dulint_is_zero(recv_sys->scanned_lsn)) { + + if (ut_dulint_cmp(recv_sys->parse_start_lsn, start_lsn) < 0) { + fprintf(stderr, + "Innobase: Archive log file %s starts from too big a lsn\n", + name); + return(TRUE); + } + + recv_sys->scanned_lsn = start_lsn; + } + + if (ut_dulint_cmp(recv_sys->scanned_lsn, start_lsn) != 0) { + + fprintf(stderr, + "Innobase: Archive log file %s starts from a wrong lsn\n", + name); + return(TRUE); + } + + read_offset = LOG_FILE_HDR_SIZE; + + for (;;) { + len = RECV_SCAN_SIZE; + + if (read_offset + len > file_size) { + len = ut_calc_align_down(file_size - read_offset, + OS_FILE_LOG_BLOCK_SIZE); + } + + if (len == 0) { + + break; + } + + if (log_debug_writes) { + fprintf(stderr, +"Innobase: Archive read starting at lsn %lu %lu, len %lu from file %s\n", + ut_dulint_get_high(start_lsn), + ut_dulint_get_low(start_lsn), + len, name); + } + + fil_io(OS_FILE_READ | OS_FILE_LOG, TRUE, + group->archive_space_id, read_offset / 
UNIV_PAGE_SIZE, + read_offset % UNIV_PAGE_SIZE, len, buf, NULL); + + + ret = recv_scan_log_recs(TRUE, buf, len, start_lsn, + &dummy_lsn, &scanned_lsn); + + if (ut_dulint_cmp(scanned_lsn, file_end_lsn) == 0) { + + return(FALSE); + } + + if (ret) { + fprintf(stderr, + "Innobase: Archive log file %s does not scan right\n", + name); + return(TRUE); + } + + read_offset += len; + start_lsn = ut_dulint_add(start_lsn, len); + + ut_ad(ut_dulint_cmp(start_lsn, scanned_lsn) == 0); + } + + return(FALSE); +} + +/************************************************************ +Recovers from archived log files, and also from log files, if they exist. */ + +ulint +recv_recovery_from_archive_start( +/*=============================*/ + /* out: error code or DB_SUCCESS */ + dulint min_flushed_lsn,/* in: min flushed lsn field from the + data files */ + dulint limit_lsn, /* in: recover up to this lsn if possible */ + ulint first_log_no) /* in: number of the first archived log file + to use in the recovery; the file will be + searched from INNOBASE_LOG_ARCH_DIR specified + in server config file */ +{ + log_group_t* group; + ulint group_id; + ulint trunc_len; + ibool ret; + ulint err; + + recv_sys_create(); + recv_sys_init(); + + sync_order_checks_on = TRUE; + + recv_recovery_on = TRUE; + recv_recovery_from_backup_on = TRUE; + + recv_sys->limit_lsn = limit_lsn; + + group_id = 0; + + group = UT_LIST_GET_FIRST(log_sys->log_groups); + + while (group) { + if (group->id == group_id) { + + break; + } + + group = UT_LIST_GET_NEXT(log_groups, group); + } + + if (!group) { + fprintf(stderr, + "Innobase: There is no log group defined with id %lu!\n", + group_id); + return(DB_ERROR); + } + + group->archived_file_no = first_log_no; + + recv_sys->parse_start_lsn = min_flushed_lsn; + + recv_sys->scanned_lsn = ut_dulint_zero; + recv_sys->scanned_checkpoint_no = 0; + recv_sys->recovered_lsn = recv_sys->parse_start_lsn; + + recv_sys->archive_group = group; + + ret = FALSE; + + mutex_enter(&(log_sys->mutex)); + + while (!ret) { + ret = log_group_recover_from_archive_file(group); + + /* Close and truncate a possible processed archive file + from the file space */ + + trunc_len = UNIV_PAGE_SIZE + * fil_space_get_size(group->archive_space_id); + if (trunc_len > 0) { + fil_space_truncate_start(group->archive_space_id, + trunc_len); + } + + group->archived_file_no++; + } + + if (ut_dulint_cmp(recv_sys->recovered_lsn, limit_lsn) < 0) { + + if (ut_dulint_is_zero(recv_sys->scanned_lsn)) { + + recv_sys->scanned_lsn = recv_sys->parse_start_lsn; + } + + mutex_exit(&(log_sys->mutex)); + + err = recv_recovery_from_checkpoint_start(LOG_ARCHIVE, + limit_lsn, + ut_dulint_max, + ut_dulint_max); + if (err != DB_SUCCESS) { + + return(err); + } + + mutex_enter(&(log_sys->mutex)); + } + + if (ut_dulint_cmp(limit_lsn, ut_dulint_max) != 0) { + + recv_apply_hashed_log_recs(FALSE); + + recv_reset_logs(recv_sys->recovered_lsn, 0, FALSE); + } + + mutex_exit(&(log_sys->mutex)); + + sync_order_checks_on = FALSE; + + return(DB_SUCCESS); +} + +/************************************************************ +Completes recovery from archive. 
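+
+Meant to be paired with recv_recovery_from_archive_start (a sketch; the
+first archived log file number 5 below is a made-up example):
+
+ err = recv_recovery_from_archive_start(min_flushed_lsn,
+ ut_dulint_max, 5);
+ if (err == DB_SUCCESS) {
+ recv_recovery_from_archive_finish();
+ }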
*/ +
+void
+recv_recovery_from_archive_finish(void)
+/*===================================*/
+{
+ recv_recovery_from_checkpoint_finish();
+
+ recv_recovery_from_backup_on = FALSE;
+}
diff --git a/innobase/log/makefilewin b/innobase/log/makefilewin
new file mode 100644
index 00000000000..a690af3bb35
--- /dev/null
+++ b/innobase/log/makefilewin
@@ -0,0 +1,10 @@
+include ..\include\makefile.i
+
+log.lib: log0log.obj log0recv.obj
+ lib -out:..\libs\log.lib log0log.obj log0recv.obj
+
+log0log.obj: log0log.c
+ $(CCOM) $(CFL) -c log0log.c
+
+log0recv.obj: log0recv.c
+ $(CCOM) $(CFL) -c log0recv.c
diff --git a/innobase/log/trash/log0trsh.c b/innobase/log/trash/log0trsh.c
new file mode 100644
index 00000000000..7f48118a0d1
--- /dev/null
+++ b/innobase/log/trash/log0trsh.c
@@ -0,0 +1,648 @@
+/******************************************************
+Recovery
+
+(c) 1997 Innobase Oy
+
+Created 9/20/1997 Heikki Tuuri
+*******************************************************/
+
+#include "log0recv.h"
+
+#ifdef UNIV_NONINL
+#include "log0recv.ic"
+#endif
+
+#include "mem0mem.h"
+#include "buf0buf.h"
+#include "buf0flu.h"
+#include "srv0srv.h"
+
+/* Size of block reads when the log groups are scanned forward to do
+roll-forward */
+#define RECV_SCAN_SIZE (4 * UNIV_PAGE_SIZE)
+
+/* Size of block reads when the log groups are scanned backwards to synchronize
+them */
+#define RECV_BACK_SCAN_SIZE (4 * UNIV_PAGE_SIZE)
+
+recv_sys_t* recv_sys = NULL;
+
+#if 0 /* stray fragment: this call is not valid at file scope */
+recv_recover_page(block->frame, block->space, block->offset);
+#endif
+
+/************************************************************
+Creates the recovery system. */
+
+void
+recv_sys_create(void)
+/*=================*/
+{
+ ut_a(recv_sys == NULL);
+
+ recv_sys = mem_alloc(sizeof(recv_sys_t));
+
+ mutex_create(&(recv_sys->mutex));
+
+ recv_sys->hash = NULL;
+ recv_sys->heap = NULL;
+}
+
+/************************************************************
+Inits the recovery system for a recovery operation. */
+
+void
+recv_sys_init(void)
+/*===============*/
+{
+ recv_sys->hash = hash_create(buf_pool_get_curr_size() / 64);
+ recv_sys->heap = mem_heap_create_in_buffer(256);
+}
+
+/************************************************************
+Empties the recovery system. */
+
+void
+recv_sys_empty(void)
+/*================*/
+{
+ mutex_enter(&(recv_sys->mutex));
+
+ hash_free(recv_sys->hash);
+ mem_heap_free(recv_sys->heap);
+
+ recv_sys->hash = NULL;
+ recv_sys->heap = NULL;
+
+ mutex_exit(&(recv_sys->mutex));
+}
+
+/***********************************************************
+For recovery purposes copies the log buffer to a group to synchronize log
+data. */
+static
+void
+recv_log_buf_flush(
+/*===============*/
+ log_group_t* group, /* in: log group */
+ dulint start_lsn, /* in: start lsn of the log data in
+ the log buffer; must be divisible by
+ OS_FILE_LOG_BLOCK_SIZE */
+ dulint end_lsn) /* in: end lsn of the log data in the
+ log buffer; must be divisible by
+ OS_FILE_LOG_BLOCK_SIZE */
+{
+ ulint len;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ len = ut_dulint_minus(end_lsn, start_lsn);
+
+ log_group_write_buf(LOG_RECOVER, group, log_sys->buf, len, start_lsn,
+ 0);
+}
+
+/***********************************************************
+Compares two buffers containing log segments and determines the highest lsn
+where they match, if any. 
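+
+The comparison below walks both buffers one OS_FILE_LOG_BLOCK_SIZE
+block at a time, from the highest block towards the buffer start: the
+first block whose header and trailer numbers and data bytes agree in
+both buffers determines the lsn that is returned.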
*/ +static +dulint +recv_log_bufs_cmp( +/*==============*/ + /* out: if no match found, ut_dulint_zero or + if start_lsn == LOG_START_LSN, returns + LOG_START_LSN; otherwise the highest matching + lsn */ + byte* recv_buf, /* in: buffer containing valid log data */ + byte* buf, /* in: buffer of data from a possibly + incompletely written log group */ + dulint start_lsn, /* in: buffer start lsn, must be divisible + by OS_FILE_LOG_BLOCK_SIZE and must be >= + LOG_START_LSN */ + dulint end_lsn, /* in: buffer end lsn, must be divisible + by OS_FILE_LOG_BLOCK_SIZE */ + dulint recovered_lsn) /* in: recovery succeeded up to this lsn */ +{ + ulint len; + ulint offset; + byte* log_block1; + byte* log_block2; + ulint no; + ulint data_len; + + ut_ad(ut_dulint_cmp(start_lsn, LOG_START_LSN) >= 0); + + if (ut_dulint_cmp(end_lsn, recovered_lsn) > 0) { + end_lsn = ut_dulint_align_up(recovered_lsn, + OS_FILE_LOG_BLOCK_SIZE); + } + + len = ut_dulint_minus(end_lsn, start_lsn); + + if (len == 0) { + + goto no_match; + } + + ut_ad(len % OS_FILE_LOG_BLOCK_SIZE == 0); + + log_block1 = recv_buf + len; + log_block2 = buf + len; + + for (;;) { + log_block1 -= OS_FILE_LOG_BLOCK_SIZE; + log_block2 -= OS_FILE_LOG_BLOCK_SIZE; + + no = log_block_get_hdr_no(log_block1); + ut_a(no == log_block_get_trl_no(log_block1)); + + if ((no == log_block_get_hdr_no(log_block2)) + && (no == log_block_get_trl_no(log_block2))) { + + /* Match found if the block is not corrupted */ + + data_len = log_block_get_data_len(log_block2); + + if (0 == ut_memcmp(log_block1 + LOG_BLOCK_DATA, + log_block2 + LOG_BLOCK_DATA, + data_len - LOG_BLOCK_DATA)) { + + /* Match found */ + + return(ut_dulint_add(start_lsn, + log_block2 - buf + data_len)); + } + } + + if (log_block1 == recv_buf) { + + /* No match found */ + + break; + } + } +no_match: + if (ut_dulint_cmp(start_lsn, LOG_START_LSN) == 0) { + + return(LOG_START_LSN); + } + + return(ut_dulint_zero); +} + +/************************************************************ +Copies a log segment from the most up-to-date log group to the other log +group, so that it contains the latest log data. 
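+
+The copy proceeds backwards from the recovered lsn in chunks of
+RECV_BACK_SCAN_SIZE: each chunk is read from both groups, compared with
+recv_log_bufs_cmp(), and written from the up-to-date group to the other
+group, until a matching lsn shows that the older log data already
+agrees in both groups.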
*/ +static
+void
+recv_copy_group(
+/*============*/
+ log_group_t* up_to_date_group, /* in: the most up-to-date
+ log group */
+ log_group_t* group, /* in: copy to this log group */
+ dulint recovered_lsn) /* in: recovery succeeded up
+ to this lsn */
+{
+ dulint start_lsn;
+ dulint end_lsn;
+ dulint match;
+ byte* buf;
+ byte* buf1;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ if (0 == ut_dulint_cmp(LOG_START_LSN, recovered_lsn)) {
+
+ return;
+ }
+
+ ut_ad(RECV_BACK_SCAN_SIZE <= log_sys->buf_size);
+
+ buf1 = mem_alloc(2 * RECV_BACK_SCAN_SIZE);
+ buf = ut_align(buf1, RECV_BACK_SCAN_SIZE);
+
+ end_lsn = ut_dulint_align_up(recovered_lsn, RECV_BACK_SCAN_SIZE);
+
+ match = ut_dulint_zero;
+
+ for (;;) {
+ if (ut_dulint_cmp(ut_dulint_add(LOG_START_LSN,
+ RECV_BACK_SCAN_SIZE), end_lsn) >= 0) {
+ start_lsn = LOG_START_LSN;
+ } else {
+ start_lsn = ut_dulint_subtract(end_lsn,
+ RECV_BACK_SCAN_SIZE);
+ }
+
+ log_group_read_log_seg(LOG_RECOVER, buf, group, start_lsn,
+ end_lsn);
+ log_group_read_log_seg(LOG_RECOVER, log_sys->buf,
+ up_to_date_group, start_lsn, end_lsn);
+
+ match = recv_log_bufs_cmp(log_sys->buf, buf, start_lsn,
+ end_lsn, recovered_lsn);
+
+ if (ut_dulint_cmp(match, recovered_lsn) != 0) {
+ recv_log_buf_flush(group, start_lsn, end_lsn);
+ }
+
+ if (!ut_dulint_is_zero(match)) {
+
+ mem_free(buf1);
+
+ return;
+ }
+
+ end_lsn = start_lsn;
+ }
+}
+
+/************************************************************
+Copies a log segment from the most up-to-date log group to the other log
+groups, so that they all contain the latest log data. Also writes the info
+about the latest checkpoint to the groups, and inits the fields in the group
+memory structs to up-to-date values. */
+
+void
+recv_synchronize_groups(
+/*====================*/
+ log_group_t* up_to_date_group, /* in: the most up-to-date
+ log group */
+ dulint recovered_lsn, /* in: recovery succeeded up
+ to this lsn */
+ log_group_t* max_checkpoint_group) /* in: the group with the most
+ recent checkpoint info */
+{
+ log_group_t* group;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+ while (group) {
+ if (group != up_to_date_group) {
+
+ /* Copy log data */
+
+ recv_copy_group(up_to_date_group, group,
+ recovered_lsn);
+ }
+
+ if (group != max_checkpoint_group) {
+
+ /* Copy the checkpoint info to the group */
+
+ log_group_checkpoint(group);
+
+ mutex_exit(&(log_sys->mutex));
+
+ /* Wait for the checkpoint write to complete */
+ rw_lock_s_lock(&(log_sys->checkpoint_lock));
+ rw_lock_s_unlock(&(log_sys->checkpoint_lock));
+
+ mutex_enter(&(log_sys->mutex));
+ }
+
+ /* Update the fields in the group struct to correspond to
+ recovered_lsn */
+
+ log_group_set_fields(group, recovered_lsn);
+
+ group = UT_LIST_GET_NEXT(log_groups, group);
+ }
+}
+
+/************************************************************
+Looks for the maximum consistent checkpoint from the log groups. 
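+
+Each group carries two checkpoint fields, LOG_CHECKPOINT_1 and
+LOG_CHECKPOINT_2; a field is trusted only if both of its checksums
+match, and the valid field with the highest checkpoint number over all
+groups wins.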
*/ +static
+ulint
+recv_find_max_checkpoint(
+/*=====================*/
+ /* out: error code or DB_SUCCESS */
+ log_group_t** max_group, /* out: max group */
+ ulint* max_field) /* out: LOG_CHECKPOINT_1 or
+ LOG_CHECKPOINT_2 */
+{
+ log_group_t* group;
+ dulint max_no;
+ dulint cp_no;
+ ulint field;
+ ulint fold;
+ byte* buf;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ /* Look for the latest checkpoint from the log groups */
+ group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+ max_no = ut_dulint_zero;
+ *max_group = NULL;
+
+ buf = log_sys->checkpoint_buf;
+
+ while (group) {
+ group->state = LOG_GROUP_CORRUPTED;
+
+ for (field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
+ field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) {
+
+ log_group_read_checkpoint_info(group, field);
+
+ /* Check the consistency of the checkpoint info */
+ fold = ut_fold_binary(buf, LOG_CHECKPOINT_CHECKSUM_1);
+
+ if (fold != mach_read_from_4(buf
+ + LOG_CHECKPOINT_CHECKSUM_1)) {
+ goto not_consistent;
+ }
+
+ fold = ut_fold_binary(buf + LOG_CHECKPOINT_LSN,
+ LOG_CHECKPOINT_CHECKSUM_2
+ - LOG_CHECKPOINT_LSN);
+ if (fold != mach_read_from_4(buf
+ + LOG_CHECKPOINT_CHECKSUM_2)) {
+ goto not_consistent;
+ }
+
+ group->state = LOG_GROUP_OK;
+
+ group->lsn = mach_read_from_8(buf
+ + LOG_CHECKPOINT_LSN);
+ group->lsn_offset = mach_read_from_4(buf
+ + LOG_CHECKPOINT_OFFSET);
+ group->lsn_file_count = mach_read_from_4(
+ buf + LOG_CHECKPOINT_FILE_COUNT);
+
+ cp_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO);
+
+ if (ut_dulint_cmp(cp_no, max_no) >= 0) {
+ *max_group = group;
+ *max_field = field;
+ max_no = cp_no;
+ }
+
+ not_consistent: ;
+ }
+
+ group = UT_LIST_GET_NEXT(log_groups, group);
+ }
+
+ if (*max_group == NULL) {
+
+ return(DB_ERROR);
+ }
+
+ return(DB_SUCCESS);
+}
+
+/***********************************************************
+Parses log records from a buffer and stores them to a hash table to wait
+merging to file pages. If the hash table becomes too big, it is automatically
+merged to file pages. */
+static
+ibool
+recv_parse_and_hash_log_recs(
+/*=========================*/
+ /* out: TRUE if limit_lsn has been reached */
+ byte* buf, /* in: buffer containing a log segment or
+ garbage */
+ ulint len, /* in: buffer length */
+ dulint start_lsn, /* in: buffer start lsn */
+ dulint limit_lsn, /* in: recover at least to this lsn */
+ dulint* flush_start_lsn,/* in/out: lsn up to which the log is known
+ to be contiguously written in all log
+ groups */
+ dulint* recovered_lsn) /* out: was able to parse up to this lsn */
+{
+ /* This function was left as an empty stub in this discarded
+ draft of the file. */
+
+ return(TRUE);
+}
+
+/************************************************************
+Recovers from a checkpoint. When this function returns, the database is able
+to start processing new user transactions, but the function
+recv_recovery_from_checkpoint_finish should be called later to complete
+the recovery and free the resources used in it. 
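+
+In this draft the caller passes only a limit lsn (a sketch; passing
+ut_dulint_max means "recover as far as possible"):
+
+ err = recv_recovery_from_checkpoint_start(ut_dulint_max);
+ if (err == DB_SUCCESS) {
+ recv_recovery_from_checkpoint_finish();
+ }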
*/ +
+ulint
+recv_recovery_from_checkpoint_start(
+/*================================*/
+ /* out: error code or DB_SUCCESS */
+ dulint limit_lsn) /* in: recover up to this lsn if possible */
+{
+ log_group_t* max_cp_group;
+ log_group_t* up_to_date_group;
+ log_group_t* group;
+ ulint max_cp_field;
+ byte* buf;
+ ulint err;
+ dulint checkpoint_lsn;
+ dulint checkpoint_no;
+ dulint recovered_lsn;
+ dulint old_lsn;
+ dulint end_lsn;
+ dulint start_lsn;
+ ibool finished;
+ dulint flush_start_lsn;
+
+ mutex_enter(&(log_sys->mutex));
+
+ /* Look for the latest checkpoint from any of the log groups */
+
+ err = recv_find_max_checkpoint(&max_cp_group, &max_cp_field);
+
+ if (err != DB_SUCCESS) {
+ mutex_exit(&(log_sys->mutex));
+
+ return(err);
+ }
+
+ log_group_read_checkpoint_info(max_cp_group, max_cp_field);
+
+ buf = log_sys->checkpoint_buf;
+
+ checkpoint_lsn = mach_read_from_8(buf + LOG_CHECKPOINT_LSN);
+ checkpoint_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO);
+
+ if (ut_dulint_cmp(limit_lsn, checkpoint_lsn) < 0) {
+ mutex_exit(&(log_sys->mutex));
+
+ return(DB_ERROR);
+ }
+
+ /* Start reading the log groups from the checkpoint lsn up. The
+ variable flush_start_lsn tells an lsn up to which the log is known
+ to be contiguously written in all log groups. */
+
+ recovered_lsn = checkpoint_lsn;
+ flush_start_lsn = ut_dulint_align_down(checkpoint_lsn,
+ OS_FILE_LOG_BLOCK_SIZE);
+ up_to_date_group = max_cp_group;
+
+ ut_ad(RECV_SCAN_SIZE <= log_sys->buf_size);
+
+ group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+ while (group) {
+ finished = FALSE;
+
+ if (group->state == LOG_GROUP_CORRUPTED) {
+ finished = TRUE;
+ }
+
+ start_lsn = flush_start_lsn;
+
+ while (!finished) {
+ end_lsn = ut_dulint_add(start_lsn, RECV_SCAN_SIZE);
+
+ log_group_read_log_seg(LOG_RECOVER, log_sys->buf,
+ group, start_lsn, end_lsn);
+ old_lsn = recovered_lsn;
+
+ finished = recv_parse_and_hash_log_recs(log_sys->buf,
+ RECV_SCAN_SIZE, start_lsn,
+ limit_lsn, &flush_start_lsn,
+ &recovered_lsn);
+
+ if (ut_dulint_cmp(recovered_lsn, old_lsn) > 0) {
+
+ /* We found a more up-to-date group */
+ up_to_date_group = group;
+ }
+
+ start_lsn = end_lsn;
+ }
+
+ group = UT_LIST_GET_NEXT(log_groups, group);
+ }
+
+ /* Delete possible corrupted or extra log records from all log
+ groups */
+
+ recv_truncate_groups(recovered_lsn);
+
+ /* Synchronize the uncorrupted log groups to the most up-to-date log
+ group; we may also have to copy checkpoint info to groups */
+
+ log_sys->next_checkpoint_lsn = checkpoint_lsn;
+ log_sys->next_checkpoint_no = checkpoint_no;
+
+ recv_synchronize_groups(up_to_date_group, recovered_lsn, max_cp_group);
+
+ log_sys->next_checkpoint_no = ut_dulint_add(checkpoint_no, 1);
+
+ /* The database is now ready to start almost normal processing of user
+ transactions */
+
+ return(DB_SUCCESS);
+}
+
+/************************************************************
+Completes recovery from a checkpoint. */
+
+void
+recv_recovery_from_checkpoint_finish(void)
+/*======================================*/
+{
+ /* Rollback the uncommitted transactions which have no user session */
+
+ trx_rollback_all_without_sess();
+
+ /* Merge the hashed log records */
+
+ recv_merge_hashed_log_recs();
+
+ /* Free the resources of the recovery system */
+
+ recv_sys_empty();
+}
+
+/****************************************************************
+Writes to the log a record about incrementing the row id counter. 
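+
+This is the fast path: when the one-byte record still fits in the
+current log block without filling it completely, the byte is written
+directly at buf_free and the lsn is advanced by one; otherwise the slow
+path below opens the log properly through log_reserve_and_open().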
*/ +UNIV_INLINE
+void
+log_write_row_id_incr_rec(void)
+/*===========================*/
+{
+ log_t* log = log_sys;
+ ulint data_len;
+
+ mutex_enter(&(log->mutex));
+
+ data_len = (log->buf_free % OS_FILE_LOG_BLOCK_SIZE) + 1;
+
+ if (data_len >= OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE) {
+
+ /* The string does not fit within the current log block
+ or the block would become full */
+
+ mutex_exit(&(log->mutex));
+
+ log_write_row_id_incr_rec_slow();
+
+ return;
+ }
+
+ *(log->buf + log->buf_free) = MLOG_INCR_ROW_ID | MLOG_SINGLE_REC_FLAG;
+
+ log_block_set_data_len(ut_align_down(log->buf + log->buf_free,
+ OS_FILE_LOG_BLOCK_SIZE),
+ data_len);
+#ifdef UNIV_LOG_DEBUG
+ log->old_buf_free = log->buf_free;
+ log->old_lsn = log->lsn;
+ log_check_log_recs(log->buf + log->buf_free, 1, log->lsn);
+#endif
+ log->buf_free++;
+
+ ut_ad(log->buf_free <= log->buf_size);
+
+ UT_DULINT_INC(log->lsn);
+
+ mutex_exit(&(log->mutex));
+}
+
+/****************************************************************
+Writes to the log a record about incrementing the row id counter. */
+static
+void
+log_write_row_id_incr_rec_slow(void)
+/*================================*/
+{
+ byte type;
+
+ log_reserve_and_open(1);
+
+ type = MLOG_INCR_ROW_ID | MLOG_SINGLE_REC_FLAG;
+
+ log_write_low(&type, 1);
+
+ log_close();
+
+ log_release();
+}
+
+/**************************************************************************
+Parses and applies a log record MLOG_SET_ROW_ID. */
+
+byte*
+dict_hdr_parse_set_row_id(
+/*======================*/
+ /* out: end of log record or NULL */
+ byte* ptr, /* in: buffer */
+ byte* end_ptr,/* in: buffer end */
+ page_t* page) /* in: page or NULL */
+{
+ dulint dval;
+
+ ptr = mach_dulint_parse_compressed(ptr, end_ptr, &dval);
+
+ if (ptr == NULL) {
+
+ return(NULL);
+ }
+
+ if (!page) {
+
+ return(ptr);
+ }
+
+ mach_write_to_8(page + DICT_HDR + DICT_HDR_ROW_ID, dval);
+
+ return(ptr);
+}