Diffstat (limited to 'innobase/log')
-rw-r--r--	innobase/log/Makefile.am	24
-rw-r--r--	innobase/log/log0log.c	2781
-rw-r--r--	innobase/log/log0recv.c	2512
-rw-r--r--	innobase/log/makefilewin	10
-rw-r--r--	innobase/log/trash/log0trsh.c	648
5 files changed, 5975 insertions, 0 deletions
diff --git a/innobase/log/Makefile.am b/innobase/log/Makefile.am
new file mode 100644
index 00000000000..3910a25ab1a
--- /dev/null
+++ b/innobase/log/Makefile.am
@@ -0,0 +1,24 @@
+# Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
+# & Innobase Oy
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+include ../include/Makefile.i
+
+libs_LIBRARIES = liblog.a
+
+liblog_a_SOURCES = log0log.c log0recv.c
+
+EXTRA_PROGRAMS =
diff --git a/innobase/log/log0log.c b/innobase/log/log0log.c
new file mode 100644
index 00000000000..c6fec44d128
--- /dev/null
+++ b/innobase/log/log0log.c
@@ -0,0 +1,2781 @@
+/******************************************************
+Database log
+
+(c) 1995-1997 Innobase Oy
+
+Created 12/9/1995 Heikki Tuuri
+*******************************************************/
+
+#include "log0log.h"
+
+#ifdef UNIV_NONINL
+#include "log0log.ic"
+#endif
+
+#include "mem0mem.h"
+#include "buf0buf.h"
+#include "buf0flu.h"
+#include "srv0srv.h"
+#include "log0recv.h"
+#include "fil0fil.h"
+#include "dict0boot.h"
+#include "srv0srv.h"
+#include "trx0sys.h"
+#include "trx0trx.h"
+
+/* Global log system variable */
+log_t* log_sys = NULL;
+
+ibool log_do_write = TRUE;
+ibool log_debug_writes = FALSE;
+
+/* Pointer to this variable is used as the i/o-message when we do i/o to an
+archive */
+byte log_archive_io;
+
+/* A margin for free space in the log buffer before a log entry is catenated */
+#define LOG_BUF_WRITE_MARGIN (4 * OS_FILE_LOG_BLOCK_SIZE)
+
+/* Margins for free space in the log buffer after a log entry is catenated */
+#define LOG_BUF_FLUSH_RATIO 2
+#define LOG_BUF_FLUSH_MARGIN (LOG_BUF_WRITE_MARGIN + 4 * UNIV_PAGE_SIZE)
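+
+/* For instance, assuming the usual OS_FILE_LOG_BLOCK_SIZE of 512 bytes and
+a UNIV_PAGE_SIZE of 16 kB (both are build-time assumptions, not fixed in
+this file), LOG_BUF_WRITE_MARGIN is 2048 bytes and LOG_BUF_FLUSH_MARGIN is
+2048 + 4 * 16384 = 67584 bytes. */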
+
+/* Margin for the free space in the smallest log group before a new query
+step which modifies the database is started */
+
+#define LOG_CHECKPOINT_FREE_PER_THREAD (4 * UNIV_PAGE_SIZE)
+#define LOG_CHECKPOINT_EXTRA_FREE (8 * UNIV_PAGE_SIZE)
+
+/* This parameter controls asynchronous making of a new checkpoint; the value
+should be bigger than LOG_POOL_PREFLUSH_RATIO_SYNC */
+
+#define LOG_POOL_CHECKPOINT_RATIO_ASYNC 32
+
+/* This parameter controls synchronous preflushing of modified buffer pages */
+#define LOG_POOL_PREFLUSH_RATIO_SYNC 16
+
+/* The same ratio for asynchronous preflushing; this value should be less than
+the previous */
+#define LOG_POOL_PREFLUSH_RATIO_ASYNC 8
+
+/* Extra margin, in addition to one log file, used in archiving */
+#define LOG_ARCHIVE_EXTRA_MARGIN (4 * UNIV_PAGE_SIZE)
+
+/* This parameter controls asynchronous writing to the archive */
+#define LOG_ARCHIVE_RATIO_ASYNC 16
+
+/* Codes used in unlocking flush latches */
+#define LOG_UNLOCK_NONE_FLUSHED_LOCK 1
+#define LOG_UNLOCK_FLUSH_LOCK 2
+
+/* States of an archiving operation */
+#define LOG_ARCHIVE_READ 1
+#define LOG_ARCHIVE_WRITE 2
+
+/**********************************************************
+Calculates the file count of an lsn within a log group. */
+static
+ulint
+log_group_calc_lsn_file_count(
+/*==========================*/
+ /* out: file count within the log group */
+ dulint lsn, /* in: lsn, must be within 4 GB of
+ group->next_block_lsn */
+ log_group_t* group); /* in: log group */
+/**********************************************************
+Completes a checkpoint write i/o to a log file. */
+static
+void
+log_io_complete_checkpoint(
+/*=======================*/
+ log_group_t* group); /* in: log group */
+/**********************************************************
+Completes an archiving i/o. */
+static
+void
+log_io_complete_archive(void);
+/*=========================*/
+/********************************************************************
+Tries to establish a big enough margin of free space in the log groups, such
+that a new log entry can be catenated without an immediate need for
+archiving. */
+static
+void
+log_archive_margin(void);
+/*====================*/
+
+
+/********************************************************************
+Returns the oldest modified block lsn in the pool, or log_sys->lsn if none
+exists. */
+static
+dulint
+log_buf_pool_get_oldest_modification(void)
+/*======================================*/
+{
+ dulint lsn;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ lsn = buf_pool_get_oldest_modification();
+
+ if (ut_dulint_is_zero(lsn)) {
+
+ lsn = log_sys->lsn;
+ }
+
+ return(lsn);
+}
+
+/****************************************************************
+Opens the log for log_write_low. The log must be closed with log_close and
+released with log_release. */
+
+dulint
+log_reserve_and_open(
+/*=================*/
+ /* out: start lsn of the log record */
+ ulint len) /* in: length of data to be catenated */
+{
+ log_t* log = log_sys;
+ ulint len_upper_limit;
+ ulint archived_lsn_age;
+ ulint count = 0;
+ ulint dummy;
+loop:
+ mutex_enter(&(log->mutex));
+
+ /* Calculate an upper limit for the space the string may take in the
+ log buffer */
+
+ len_upper_limit = LOG_BUF_WRITE_MARGIN + (5 * len) / 4;
+
+ if (log->buf_free + len_upper_limit > log->buf_size) {
+
+ mutex_exit(&(log->mutex));
+
+		/* Not enough free space, do a synchronous flush of the log
+		buffer */
+ log_flush_up_to(ut_dulint_max, LOG_WAIT_ALL_GROUPS);
+
+ count++;
+
+ ut_ad(count < 50);
+
+ goto loop;
+ }
+
+ if (log->archiving_state != LOG_ARCH_OFF) {
+
+ archived_lsn_age = ut_dulint_minus(log->lsn, log->archived_lsn);
+
+ if (archived_lsn_age + len_upper_limit
+ > log->max_archived_lsn_age) {
+
+ /* Not enough free archived space in log groups: do a
+ synchronous archive write batch: */
+
+ mutex_exit(&(log->mutex));
+
+ ut_ad(len_upper_limit <= log->max_archived_lsn_age);
+
+ log_archive_do(TRUE, &dummy);
+
+ count++;
+
+ ut_ad(count < 50);
+
+ goto loop;
+ }
+ }
+
+#ifdef UNIV_LOG_DEBUG
+ log->old_buf_free = log->buf_free;
+ log->old_lsn = log->lsn;
+#endif
+ return(log->lsn);
+}
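+
+/* A minimal sketch of the write sequence this function expects (the real
+caller is the mini-transaction commit code in mtr0mtr.c):
+
+	start_lsn = log_reserve_and_open(len);
+	log_write_low(str, len);
+	end_lsn = log_close();
+	log_release();
+*/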
+
+/****************************************************************
+Writes to the log the string given. It is assumed that the caller holds the
+log mutex. */
+
+void
+log_write_low(
+/*==========*/
+ byte* str, /* in: string */
+ ulint str_len) /* in: string length */
+{
+ log_t* log = log_sys;
+ ulint len;
+ ulint data_len;
+ byte* log_block;
+
+ ut_ad(mutex_own(&(log->mutex)));
+part_loop:
+ /* Calculate a part length */
+
+ data_len = (log->buf_free % OS_FILE_LOG_BLOCK_SIZE) + str_len;
+
+ if (data_len <= OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE) {
+
+ /* The string fits within the current log block */
+
+ len = str_len;
+ } else {
+ data_len = OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE;
+
+ len = OS_FILE_LOG_BLOCK_SIZE
+ - (log->buf_free % OS_FILE_LOG_BLOCK_SIZE)
+ - LOG_BLOCK_TRL_SIZE;
+ }
+
+ ut_memcpy(log->buf + log->buf_free, str, len);
+
+ str_len -= len;
+ str = str + len;
+
+ log_block = ut_align_down(log->buf + log->buf_free,
+ OS_FILE_LOG_BLOCK_SIZE);
+ log_block_set_data_len(log_block, data_len);
+
+ if (data_len == OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE) {
+ /* This block became full */
+ log_block_set_data_len(log_block, OS_FILE_LOG_BLOCK_SIZE);
+ log_block_set_checkpoint_no(log_block,
+ log_sys->next_checkpoint_no);
+ len += LOG_BLOCK_HDR_SIZE + LOG_BLOCK_TRL_SIZE;
+
+ log->lsn = ut_dulint_add(log->lsn, len);
+
+ /* Initialize the next block header and trailer */
+ log_block_init(log_block + OS_FILE_LOG_BLOCK_SIZE, log->lsn);
+ } else {
+ log->lsn = ut_dulint_add(log->lsn, len);
+ }
+
+ log->buf_free += len;
+
+ ut_ad(log->buf_free <= log->buf_size);
+
+ if (str_len > 0) {
+ goto part_loop;
+ }
+}
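+
+/* Example of the splitting logic above, assuming the usual constants
+OS_FILE_LOG_BLOCK_SIZE = 512, LOG_BLOCK_HDR_SIZE = 12 and
+LOG_BLOCK_TRL_SIZE = 4: if buf_free points to offset 12 of a block (a
+fresh block) and str_len = 1000, then data_len = 1012 exceeds 508, so only
+len = 512 - 12 - 4 = 496 bytes fit; the block is marked full, the lsn
+advances by 496 + 16 = 512 (payload plus header and trailer space), and
+the remaining 504 bytes are written in the next iterations of part_loop. */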
+
+/****************************************************************
+Closes the log. */
+
+dulint
+log_close(void)
+/*===========*/
+ /* out: lsn */
+{
+ byte* log_block;
+ ulint first_rec_group;
+ dulint oldest_lsn;
+ dulint lsn;
+ log_t* log = log_sys;
+
+ ut_ad(mutex_own(&(log->mutex)));
+
+ lsn = log->lsn;
+
+ log_block = ut_align_down(log->buf + log->buf_free,
+ OS_FILE_LOG_BLOCK_SIZE);
+ first_rec_group = log_block_get_first_rec_group(log_block);
+
+ if (first_rec_group == 0) {
+ /* We initialized a new log block which was not written
+ full by the current mtr: the next mtr log record group
+ will start within this block at the offset data_len */
+
+ log_block_set_first_rec_group(log_block,
+ log_block_get_data_len(log_block));
+ }
+
+ if (log->buf_free > log->max_buf_free) {
+
+ log->check_flush_or_checkpoint = TRUE;
+ }
+
+ if (ut_dulint_minus(lsn, log->last_checkpoint_lsn)
+ <= log->max_modified_age_async) {
+ goto function_exit;
+ }
+
+ oldest_lsn = buf_pool_get_oldest_modification();
+
+ if (ut_dulint_is_zero(oldest_lsn)
+ || (ut_dulint_minus(lsn, oldest_lsn)
+ > log->max_modified_age_async)
+ || (ut_dulint_minus(lsn, log->last_checkpoint_lsn)
+ > log->max_checkpoint_age_async)) {
+
+ log->check_flush_or_checkpoint = TRUE;
+ }
+function_exit:
+
+#ifdef UNIV_LOG_DEBUG
+ log_check_log_recs(log->buf + log->old_buf_free,
+ log->buf_free - log->old_buf_free, log->old_lsn);
+#endif
+
+ return(lsn);
+}
+
+/**********************************************************
+Pads the current log block full with dummy log records. Used in producing
+consistent archived log files. */
+static
+void
+log_pad_current_log_block(void)
+/*===========================*/
+{
+ byte b = MLOG_DUMMY_RECORD;
+ ulint pad_length;
+ ulint i;
+ dulint lsn;
+
+ log_reserve_and_open(OS_FILE_LOG_BLOCK_SIZE);
+
+ pad_length = OS_FILE_LOG_BLOCK_SIZE
+ - (log_sys->buf_free % OS_FILE_LOG_BLOCK_SIZE)
+ - LOG_BLOCK_TRL_SIZE;
+
+ for (i = 0; i < pad_length; i++) {
+ log_write_low(&b, 1);
+ }
+
+ lsn = log_sys->lsn;
+
+ log_close();
+ log_release();
+
+ ut_a((ut_dulint_get_low(lsn) % OS_FILE_LOG_BLOCK_SIZE)
+ == LOG_BLOCK_HDR_SIZE);
+}
+
+/**********************************************************
+Calculates the data capacity of a log group, when the log file headers are not
+included. */
+
+ulint
+log_group_get_capacity(
+/*===================*/
+ /* out: capacity in bytes */
+ log_group_t* group) /* in: log group */
+{
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ return((group->file_size - LOG_FILE_HDR_SIZE) * group->n_files);
+}
+
+/**********************************************************
+Calculates the offset within a log group, when the log file headers are not
+included. */
+UNIV_INLINE
+ulint
+log_group_calc_size_offset(
+/*=======================*/
+ /* out: size offset (<= offset) */
+ ulint offset, /* in: real offset within the log group */
+ log_group_t* group) /* in: log group */
+{
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ return(offset - LOG_FILE_HDR_SIZE * (1 + offset / group->file_size));
+}
+
+/**********************************************************
+Calculates the offset within a log group, when the log file headers are
+included. */
+UNIV_INLINE
+ulint
+log_group_calc_real_offset(
+/*=======================*/
+ /* out: real offset (>= offset) */
+ ulint offset, /* in: size offset within the log group */
+ log_group_t* group) /* in: log group */
+{
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ return(offset + LOG_FILE_HDR_SIZE
+ * (1 + offset / (group->file_size - LOG_FILE_HDR_SIZE)));
+}
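+
+/* These two functions are inverses of each other. For example, assuming
+LOG_FILE_HDR_SIZE = 2048 (4 log blocks) and a file_size of 1 MB, the real
+offset 2048, i.e. the first data byte of the first file, maps to size
+offset 0, and the real offset file_size + LOG_FILE_HDR_SIZE, the first
+data byte of the second file, maps to size offset
+file_size - LOG_FILE_HDR_SIZE. */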
+
+/**********************************************************
+Calculates the offset of an lsn within a log group. */
+static
+ulint
+log_group_calc_lsn_offset(
+/*======================*/
+ /* out: offset within the log group */
+ dulint lsn, /* in: lsn, must be within 4 GB of group->lsn */
+ log_group_t* group) /* in: log group */
+{
+ dulint gr_lsn;
+ ulint gr_lsn_size_offset;
+ ulint difference;
+ ulint group_size;
+ ulint offset;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ gr_lsn = group->lsn;
+
+ gr_lsn_size_offset = log_group_calc_size_offset(group->lsn_offset,
+ group);
+ group_size = log_group_get_capacity(group);
+
+ if (ut_dulint_cmp(lsn, gr_lsn) >= 0) {
+
+ difference = ut_dulint_minus(lsn, gr_lsn);
+ } else {
+ difference = ut_dulint_minus(gr_lsn, lsn);
+
+ difference = difference % group_size;
+
+ difference = group_size - difference;
+ }
+
+ offset = (gr_lsn_size_offset + difference) % group_size;
+
+ return(log_group_calc_real_offset(offset, group));
+}
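+
+/* The modular arithmetic above handles wrap-around within the circular
+log group. A sketch with made-up numbers: if group->lsn corresponds to
+size offset 300 in a group whose data capacity is 1000 bytes, then an lsn
+500 bytes behind group->lsn gives difference = 1000 - (500 % 1000) = 500,
+and the size offset becomes (300 + 500) % 1000 = 800, which is finally
+converted back to a real offset including the file headers. */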
+
+/************************************************************
+Sets the field values in group to correspond to a given lsn. For this function
+to work, the values must already be correctly initialized to correspond to
+some lsn, for instance, a checkpoint lsn. */
+
+void
+log_group_set_fields(
+/*=================*/
+ log_group_t* group, /* in: group */
+ dulint lsn) /* in: lsn for which the values should be
+ set */
+{
+ group->lsn_offset = log_group_calc_lsn_offset(lsn, group);
+ group->lsn = lsn;
+}
+
+/*********************************************************************
+Calculates the recommended highest values for lsn - last_checkpoint_lsn,
+lsn - buf_get_oldest_modification(), and lsn - max_archive_lsn_age. */
+static
+ibool
+log_calc_max_ages(void)
+/*===================*/
+ /* out: error value FALSE if the smallest log group is
+ too small to accommodate the number of OS threads in
+ the database server */
+{
+ log_group_t* group;
+ ulint n_threads;
+ ulint margin;
+ ulint free;
+ ibool success = TRUE;
+ ulint smallest_capacity;
+ ulint archive_margin;
+ ulint smallest_archive_margin;
+
+ ut_ad(!mutex_own(&(log_sys->mutex)));
+
+ n_threads = srv_get_n_threads();
+
+ mutex_enter(&(log_sys->mutex));
+
+ group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+ ut_ad(group);
+
+ smallest_capacity = ULINT_MAX;
+ smallest_archive_margin = ULINT_MAX;
+
+ while (group) {
+ if (log_group_get_capacity(group) < smallest_capacity) {
+
+ smallest_capacity = log_group_get_capacity(group);
+ }
+
+ archive_margin = log_group_get_capacity(group)
+ - (group->file_size - LOG_FILE_HDR_SIZE)
+ - LOG_ARCHIVE_EXTRA_MARGIN;
+
+ if (archive_margin < smallest_archive_margin) {
+
+ smallest_archive_margin = archive_margin;
+ }
+
+ group = UT_LIST_GET_NEXT(log_groups, group);
+ }
+
+ /* For each OS thread we must reserve so much free space in the
+ smallest log group that it can accommodate the log entries produced
+ by single query steps: running out of free log space is a serious
+ system error which requires rebooting the database. */
+
+ free = LOG_CHECKPOINT_FREE_PER_THREAD * n_threads
+ + LOG_CHECKPOINT_EXTRA_FREE;
+	if (free >= smallest_capacity / 2) {
+		success = FALSE;
+
+		/* Give margin a defined value also in the error case, so
+		that the calculations below do not read an uninitialized
+		variable; the value is not used for anything meaningful,
+		as the caller asserts on success */
+		margin = smallest_capacity / 2;
+	} else {
+		margin = smallest_capacity - free;
+	}
+
+ margin = ut_min(margin, log_sys->adm_checkpoint_interval);
+
+ log_sys->max_modified_age_async = margin
+ - margin / LOG_POOL_PREFLUSH_RATIO_ASYNC;
+ log_sys->max_modified_age_sync = margin
+ - margin / LOG_POOL_PREFLUSH_RATIO_SYNC;
+
+ log_sys->max_checkpoint_age_async = margin - margin
+ / LOG_POOL_CHECKPOINT_RATIO_ASYNC;
+ log_sys->max_checkpoint_age = margin;
+
+ log_sys->max_archived_lsn_age = smallest_archive_margin;
+
+ log_sys->max_archived_lsn_age_async = smallest_archive_margin
+ - smallest_archive_margin /
+ LOG_ARCHIVE_RATIO_ASYNC;
+ mutex_exit(&(log_sys->mutex));
+
+ if (!success) {
+ printf(
+ "Error: log file group too small for the number of threads\n");
+ }
+
+ return(success);
+}
+
+/**********************************************************
+Initializes the log. */
+
+void
+log_init(void)
+/*==========*/
+{
+ byte* buf;
+
+ log_sys = mem_alloc(sizeof(log_t));
+
+ mutex_create(&(log_sys->mutex));
+ mutex_set_level(&(log_sys->mutex), SYNC_LOG);
+
+ mutex_enter(&(log_sys->mutex));
+
+	/* Start the lsn one log block from zero: this way every
+ log record has a start lsn != zero, a fact which we will use */
+
+ log_sys->lsn = LOG_START_LSN;
+
+ ut_a(LOG_BUFFER_SIZE >= 16 * OS_FILE_LOG_BLOCK_SIZE);
+ ut_a(LOG_BUFFER_SIZE >= 4 * UNIV_PAGE_SIZE);
+
+ buf = ut_malloc(LOG_BUFFER_SIZE + OS_FILE_LOG_BLOCK_SIZE);
+ log_sys->buf = ut_align(buf, OS_FILE_LOG_BLOCK_SIZE);
+
+ log_sys->buf_size = LOG_BUFFER_SIZE;
+ log_sys->max_buf_free = log_sys->buf_size / LOG_BUF_FLUSH_RATIO
+ - LOG_BUF_FLUSH_MARGIN;
+ log_sys->check_flush_or_checkpoint = TRUE;
+ UT_LIST_INIT(log_sys->log_groups);
+
+ log_sys->n_log_ios = 0;
+
+ /*----------------------------*/
+
+ log_sys->buf_next_to_write = 0;
+
+ log_sys->written_to_some_lsn = log_sys->lsn;
+ log_sys->written_to_all_lsn = log_sys->lsn;
+
+ log_sys->n_pending_writes = 0;
+
+ log_sys->no_flush_event = os_event_create(NULL);
+
+ os_event_set(log_sys->no_flush_event);
+
+ log_sys->one_flushed_event = os_event_create(NULL);
+
+ os_event_set(log_sys->one_flushed_event);
+
+ /*----------------------------*/
+ log_sys->adm_checkpoint_interval = ULINT_MAX;
+
+ log_sys->next_checkpoint_no = ut_dulint_zero;
+ log_sys->last_checkpoint_lsn = log_sys->lsn;
+ log_sys->n_pending_checkpoint_writes = 0;
+
+ rw_lock_create(&(log_sys->checkpoint_lock));
+ rw_lock_set_level(&(log_sys->checkpoint_lock), SYNC_NO_ORDER_CHECK);
+
+ log_sys->checkpoint_buf = ut_align(
+ mem_alloc(2 * OS_FILE_LOG_BLOCK_SIZE),
+ OS_FILE_LOG_BLOCK_SIZE);
+ /*----------------------------*/
+
+ log_sys->archiving_state = LOG_ARCH_ON;
+ log_sys->archived_lsn = log_sys->lsn;
+
+ log_sys->n_pending_archive_ios = 0;
+
+ rw_lock_create(&(log_sys->archive_lock));
+ rw_lock_set_level(&(log_sys->archive_lock), SYNC_NO_ORDER_CHECK);
+
+ log_sys->archive_buf = ut_align(
+ ut_malloc(LOG_ARCHIVE_BUF_SIZE
+ + OS_FILE_LOG_BLOCK_SIZE),
+ OS_FILE_LOG_BLOCK_SIZE);
+ log_sys->archive_buf_size = LOG_ARCHIVE_BUF_SIZE;
+
+ log_sys->archiving_on = os_event_create(NULL);
+
+ /*----------------------------*/
+
+ log_sys->online_backup_state = FALSE;
+
+ /*----------------------------*/
+
+ log_block_init(log_sys->buf, log_sys->lsn);
+ log_block_set_first_rec_group(log_sys->buf, LOG_BLOCK_HDR_SIZE);
+
+ log_sys->buf_free = LOG_BLOCK_HDR_SIZE;
+ log_sys->lsn = ut_dulint_add(LOG_START_LSN, LOG_BLOCK_HDR_SIZE);
+
+ mutex_exit(&(log_sys->mutex));
+
+#ifdef UNIV_LOG_DEBUG
+ recv_sys_create();
+ recv_sys_init();
+
+ recv_sys->parse_start_lsn = log_sys->lsn;
+ recv_sys->scanned_lsn = log_sys->lsn;
+ recv_sys->scanned_checkpoint_no = 0;
+ recv_sys->recovered_lsn = log_sys->lsn;
+ recv_sys->limit_lsn = ut_dulint_max;
+#endif
+}
+
+/**********************************************************************
+Initializes a log group and adds it to the log system. */
+
+void
+log_group_init(
+/*===========*/
+ ulint id, /* in: group id */
+ ulint n_files, /* in: number of log files */
+ ulint file_size, /* in: log file size in bytes */
+ ulint space_id, /* in: space id of the file space
+ which contains the log files of this
+ group */
+ ulint archive_space_id) /* in: space id of the file space
+ which contains some archived log
+ files for this group; currently, only
+ for the first log group this is
+ used */
+{
+ ulint i;
+
+ log_group_t* group;
+
+ group = mem_alloc(sizeof(log_group_t));
+
+ group->id = id;
+ group->n_files = n_files;
+ group->file_size = file_size;
+ group->space_id = space_id;
+ group->state = LOG_GROUP_OK;
+ group->lsn = LOG_START_LSN;
+ group->lsn_offset = LOG_FILE_HDR_SIZE;
+ group->n_pending_writes = 0;
+
+ group->file_header_bufs = mem_alloc(sizeof(byte*) * n_files);
+ group->archive_file_header_bufs = mem_alloc(sizeof(byte*) * n_files);
+
+ for (i = 0; i < n_files; i++) {
+ *(group->file_header_bufs + i) = ut_align(
+ mem_alloc(LOG_FILE_HDR_SIZE + OS_FILE_LOG_BLOCK_SIZE),
+ OS_FILE_LOG_BLOCK_SIZE);
+ *(group->archive_file_header_bufs + i) = ut_align(
+ mem_alloc(LOG_FILE_HDR_SIZE + OS_FILE_LOG_BLOCK_SIZE),
+ OS_FILE_LOG_BLOCK_SIZE);
+ }
+
+ group->archive_space_id = archive_space_id;
+
+ group->archived_file_no = 0;
+ group->archived_offset = 0;
+
+ group->checkpoint_buf = ut_align(
+ mem_alloc(2 * OS_FILE_LOG_BLOCK_SIZE),
+ OS_FILE_LOG_BLOCK_SIZE);
+
+ UT_LIST_ADD_LAST(log_groups, log_sys->log_groups, group);
+
+ ut_a(log_calc_max_ages());
+}
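+
+/* A sketch of how the startup code might create the log system (the actual
+calls are made in srv0start.c; the sizes and space ids here are made up):
+
+	log_init();
+	log_group_init(0, 2, 5 * 1024 * 1024, space_id, arch_space_id);
+*/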
+
+/**********************************************************************
+Does the unlockings needed in flush i/o completion. */
+UNIV_INLINE
+void
+log_flush_do_unlocks(
+/*=================*/
+ ulint code) /* in: any ORed combination of LOG_UNLOCK_FLUSH_LOCK
+ and LOG_UNLOCK_NONE_FLUSHED_LOCK */
+{
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ /* NOTE that we must own the log mutex when doing the setting of the
+ events: this is because transactions will wait for these events to
+ be set, and at that moment the log flush they were waiting for must
+ have ended. If the log mutex were not reserved here, the i/o-thread
+ calling this function might be preempted for a while, and when it
+ resumed execution, it might be that a new flush had been started, and
+ this function would erroneously signal the NEW flush as completed.
+ Thus, the changes in the state of these events are performed
+ atomically in conjunction with the changes in the state of
+ log_sys->n_pending_writes etc. */
+
+ if (code & LOG_UNLOCK_NONE_FLUSHED_LOCK) {
+ os_event_set(log_sys->one_flushed_event);
+ }
+
+ if (code & LOG_UNLOCK_FLUSH_LOCK) {
+ os_event_set(log_sys->no_flush_event);
+ }
+}
+
+/**********************************************************************
+Checks if a flush is completed for a log group and does the completion
+routine if yes. */
+UNIV_INLINE
+ulint
+log_group_check_flush_completion(
+/*=============================*/
+ /* out: LOG_UNLOCK_NONE_FLUSHED_LOCK or 0 */
+ log_group_t* group) /* in: log group */
+{
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ if (!log_sys->one_flushed && (group->n_pending_writes == 0)) {
+
+ if (log_debug_writes) {
+ printf("Log flushed first to group %lu\n", group->id);
+ }
+
+ log_sys->written_to_some_lsn = log_sys->flush_lsn;
+ log_sys->one_flushed = TRUE;
+
+ return(LOG_UNLOCK_NONE_FLUSHED_LOCK);
+ }
+
+ if (log_debug_writes && (group->n_pending_writes == 0)) {
+
+ printf("Log flushed to group %lu\n", group->id);
+ }
+
+ return(0);
+}
+
+/**********************************************************
+Checks if a flush is completed and does the completion routine if yes. */
+static
+ulint
+log_sys_check_flush_completion(void)
+/*================================*/
+ /* out: LOG_UNLOCK_FLUSH_LOCK or 0 */
+{
+ ulint move_start;
+ ulint move_end;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ if (log_sys->n_pending_writes == 0) {
+
+ log_sys->written_to_all_lsn = log_sys->flush_lsn;
+ log_sys->buf_next_to_write = log_sys->flush_end_offset;
+
+ if (log_sys->flush_end_offset > log_sys->max_buf_free / 2) {
+ /* Move the log buffer content to the start of the
+ buffer */
+
+ move_start = ut_calc_align_down(
+ log_sys->flush_end_offset,
+ OS_FILE_LOG_BLOCK_SIZE);
+ move_end = ut_calc_align(log_sys->buf_free,
+ OS_FILE_LOG_BLOCK_SIZE);
+
+ ut_memmove(log_sys->buf, log_sys->buf + move_start,
+ move_end - move_start);
+ log_sys->buf_free -= move_start;
+
+ log_sys->buf_next_to_write -= move_start;
+ }
+
+ return(LOG_UNLOCK_FLUSH_LOCK);
+ }
+
+ return(0);
+}
+
+/**********************************************************
+Completes an i/o to a log file. */
+
+void
+log_io_complete(
+/*============*/
+ log_group_t* group) /* in: log group or a dummy pointer */
+{
+ ulint unlock;
+
+ if ((byte*)group == &log_archive_io) {
+ /* It was an archive write */
+
+ log_io_complete_archive();
+
+ return;
+ }
+
+ if ((ulint)group & 0x1) {
+ /* It was a checkpoint write */
+ group = (log_group_t*)((ulint)group - 1);
+
+ fil_flush(group->space_id);
+
+ log_io_complete_checkpoint(group);
+
+ return;
+ }
+
+ fil_flush(group->space_id);
+
+ mutex_enter(&(log_sys->mutex));
+
+ ut_ad(group->n_pending_writes > 0);
+ ut_ad(log_sys->n_pending_writes > 0);
+
+ group->n_pending_writes--;
+ log_sys->n_pending_writes--;
+
+ unlock = log_group_check_flush_completion(group);
+ unlock = unlock | log_sys_check_flush_completion();
+
+ log_flush_do_unlocks(unlock);
+
+ mutex_exit(&(log_sys->mutex));
+}
+
+/**********************************************************
+Writes a log file header to a log file space. */
+static
+void
+log_group_file_header_flush(
+/*========================*/
+ ulint type, /* in: LOG_FLUSH or LOG_RECOVER */
+ log_group_t* group, /* in: log group */
+ ulint nth_file, /* in: header to the nth file in the
+ log file space */
+ dulint start_lsn) /* in: log file data starts at this
+ lsn */
+{
+ byte* buf;
+ ulint dest_offset;
+ ibool sync;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ ut_a(nth_file < group->n_files);
+
+ buf = *(group->file_header_bufs + nth_file);
+
+ mach_write_to_4(buf + LOG_GROUP_ID, group->id);
+ mach_write_to_8(buf + LOG_FILE_START_LSN, start_lsn);
+
+ dest_offset = nth_file * group->file_size;
+
+ sync = FALSE;
+
+ if (type == LOG_RECOVER) {
+
+ sync = TRUE;
+ }
+
+ if (log_debug_writes) {
+ printf(
+ "Writing log file header to group %lu file %lu\n", group->id,
+ nth_file);
+ }
+
+ if (log_do_write) {
+ if (type == LOG_FLUSH) {
+ log_sys->n_pending_writes++;
+ group->n_pending_writes++;
+ }
+
+ log_sys->n_log_ios++;
+
+ fil_io(OS_FILE_WRITE | OS_FILE_LOG, sync, group->space_id,
+ dest_offset / UNIV_PAGE_SIZE,
+ dest_offset % UNIV_PAGE_SIZE,
+ OS_FILE_LOG_BLOCK_SIZE,
+ buf, group);
+ }
+}
+
+/**********************************************************
+Writes a buffer to a log file group. */
+
+void
+log_group_write_buf(
+/*================*/
+ ulint type, /* in: LOG_FLUSH or LOG_RECOVER */
+ log_group_t* group, /* in: log group */
+ byte* buf, /* in: buffer */
+ ulint len, /* in: buffer len; must be divisible
+ by OS_FILE_LOG_BLOCK_SIZE */
+ dulint start_lsn, /* in: start lsn of the buffer; must
+ be divisible by
+ OS_FILE_LOG_BLOCK_SIZE */
+ ulint new_data_offset)/* in: start offset of new data in
+ buf: this parameter is used to decide
+ if we have to write a new log file
+ header */
+{
+ ulint write_len;
+ ibool sync;
+ ibool write_header;
+ ulint next_offset;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+ ut_ad(len % OS_FILE_LOG_BLOCK_SIZE == 0);
+ ut_ad(ut_dulint_get_low(start_lsn) % OS_FILE_LOG_BLOCK_SIZE == 0);
+
+ sync = FALSE;
+
+ if (type == LOG_RECOVER) {
+
+ sync = TRUE;
+ }
+
+ if (new_data_offset == 0) {
+ write_header = TRUE;
+ } else {
+ write_header = FALSE;
+ }
+loop:
+ if (len == 0) {
+
+ return;
+ }
+
+ next_offset = log_group_calc_lsn_offset(start_lsn, group);
+
+ if ((next_offset % group->file_size == LOG_FILE_HDR_SIZE)
+ && write_header) {
+ /* We start to write a new log file instance in the group */
+
+ log_group_file_header_flush(type, group,
+ next_offset / group->file_size, start_lsn);
+ }
+
+ if ((next_offset % group->file_size) + len > group->file_size) {
+
+ write_len = group->file_size - (next_offset % group->file_size);
+ } else {
+ write_len = len;
+ }
+
+ if (log_debug_writes) {
+ printf(
+ "Writing log file segment to group %lu offset %lu len %lu\n",
+ group->id, next_offset, write_len);
+ }
+
+ if (log_do_write) {
+ if (type == LOG_FLUSH) {
+ log_sys->n_pending_writes++;
+ group->n_pending_writes++;
+ }
+
+ log_sys->n_log_ios++;
+
+ fil_io(OS_FILE_WRITE | OS_FILE_LOG, sync, group->space_id,
+ next_offset / UNIV_PAGE_SIZE,
+ next_offset % UNIV_PAGE_SIZE, write_len, buf, group);
+ }
+
+ if (write_len < len) {
+ start_lsn = ut_dulint_add(start_lsn, write_len);
+ len -= write_len;
+ buf += write_len;
+
+ write_header = TRUE;
+
+ goto loop;
+ }
+}
+
+/**********************************************************
+This function is called, e.g., when a transaction wants to commit. It checks
+that the log has been flushed to disk up to the last log entry written by the
+transaction. If there is a flush running, it waits and checks if the flush
+flushed enough. If not, it starts a new flush. */
+
+void
+log_flush_up_to(
+/*============*/
+ dulint lsn, /* in: log sequence number up to which the log should
+ be flushed, ut_dulint_max if not specified */
+ ulint wait) /* in: LOG_NO_WAIT, LOG_WAIT_ONE_GROUP,
+ or LOG_WAIT_ALL_GROUPS */
+{
+ log_group_t* group;
+ ulint start_offset;
+ ulint end_offset;
+ ulint area_start;
+ ulint area_end;
+ ulint loop_count;
+
+ if (recv_no_ibuf_operations) {
+ /* Recovery is running and no operations on the log files are
+ allowed yet (the variable name .._no_ibuf_.. is misleading) */
+
+ return;
+ }
+
+ loop_count = 0;
+loop:
+ loop_count++;
+
+ ut_ad(loop_count < 5);
+
+ if (loop_count > 2) {
+/* printf("Log loop count %lu\n", loop_count); */
+ }
+
+ mutex_enter(&(log_sys->mutex));
+
+ if ((ut_dulint_cmp(log_sys->written_to_all_lsn, lsn) >= 0)
+ || ((ut_dulint_cmp(log_sys->written_to_some_lsn, lsn) >= 0)
+ && (wait != LOG_WAIT_ALL_GROUPS))) {
+
+ mutex_exit(&(log_sys->mutex));
+
+ return;
+ }
+
+ if (log_sys->n_pending_writes > 0) {
+ /* A flush is running */
+
+ if (ut_dulint_cmp(log_sys->flush_lsn, lsn) >= 0) {
+ /* The flush will flush enough: wait for it to
+ complete */
+
+ goto do_waits;
+ }
+
+ mutex_exit(&(log_sys->mutex));
+
+ /* Wait for the flush to complete and try to start a new
+ flush */
+
+ os_event_wait(log_sys->no_flush_event);
+
+ goto loop;
+ }
+
+ if (log_sys->buf_free == log_sys->buf_next_to_write) {
+ /* Nothing to flush */
+
+ mutex_exit(&(log_sys->mutex));
+
+ return;
+ }
+
+ if (log_debug_writes) {
+ printf("Flushing log from %lu %lu up to lsn %lu %lu\n",
+ ut_dulint_get_high(log_sys->written_to_all_lsn),
+ ut_dulint_get_low(log_sys->written_to_all_lsn),
+ ut_dulint_get_high(log_sys->lsn),
+ ut_dulint_get_low(log_sys->lsn));
+ }
+
+ os_event_reset(log_sys->no_flush_event);
+ os_event_reset(log_sys->one_flushed_event);
+
+ start_offset = log_sys->buf_next_to_write;
+ end_offset = log_sys->buf_free;
+
+ area_start = ut_calc_align_down(start_offset, OS_FILE_LOG_BLOCK_SIZE);
+ area_end = ut_calc_align(end_offset, OS_FILE_LOG_BLOCK_SIZE);
+
+ ut_ad(area_end - area_start > 0);
+
+ log_sys->flush_lsn = log_sys->lsn;
+ log_sys->one_flushed = FALSE;
+
+ log_block_set_flush_bit(log_sys->buf + area_start, TRUE);
+ log_block_set_checkpoint_no(
+ log_sys->buf + area_end - OS_FILE_LOG_BLOCK_SIZE,
+ log_sys->next_checkpoint_no);
+
+ /* Copy the last, incompletely written, log block a log block length
+ up, so that when the flush operation writes from the log buffer, the
+ segment to write will not be changed by writers to the log */
+
+ ut_memcpy(log_sys->buf + area_end,
+ log_sys->buf + area_end - OS_FILE_LOG_BLOCK_SIZE,
+ OS_FILE_LOG_BLOCK_SIZE);
+
+ log_sys->buf_free += OS_FILE_LOG_BLOCK_SIZE;
+ log_sys->flush_end_offset = log_sys->buf_free;
+
+ group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+ while (group) {
+ log_group_write_buf(LOG_FLUSH, group,
+ log_sys->buf + area_start,
+ area_end - area_start,
+ ut_dulint_align_down(log_sys->written_to_all_lsn,
+ OS_FILE_LOG_BLOCK_SIZE),
+ start_offset - area_start);
+
+ log_group_set_fields(group, log_sys->flush_lsn);
+
+ group = UT_LIST_GET_NEXT(log_groups, group);
+ }
+
+do_waits:
+ mutex_exit(&(log_sys->mutex));
+
+ if (wait == LOG_WAIT_ONE_GROUP) {
+ os_event_wait(log_sys->one_flushed_event);
+ } else if (wait == LOG_WAIT_ALL_GROUPS) {
+ os_event_wait(log_sys->no_flush_event);
+ } else {
+ ut_ad(wait == LOG_NO_WAIT);
+ }
+}
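+
+/* For example, a committing transaction which has written its commit
+record at lsn would ensure durability with (sketch; the actual call is
+made from the transaction commit code in trx0trx.c):
+
+	log_flush_up_to(lsn, LOG_WAIT_ONE_GROUP);
+*/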
+
+/********************************************************************
+Tries to establish a big enough margin of free space in the log buffer, such
+that a new log entry can be catenated without an immediate need for a flush. */
+static
+void
+log_flush_margin(void)
+/*==================*/
+{
+ ibool do_flush = FALSE;
+ log_t* log = log_sys;
+
+ mutex_enter(&(log->mutex));
+
+ if (log->buf_free > log->max_buf_free) {
+
+ if (log->n_pending_writes > 0) {
+ /* A flush is running: hope that it will provide enough
+ free space */
+ } else {
+ do_flush = TRUE;
+ }
+ }
+
+ mutex_exit(&(log->mutex));
+
+ if (do_flush) {
+ log_flush_up_to(ut_dulint_max, LOG_NO_WAIT);
+ }
+}
+
+/********************************************************************
+Advances the smallest lsn for which there are unflushed dirty blocks in the
+buffer pool. NOTE: this function may only be called if the calling thread owns
+no synchronization objects! */
+
+ibool
+log_preflush_pool_modified_pages(
+/*=============================*/
+ /* out: FALSE if there was a flush batch of
+ the same type running, which means that we
+ could not start this flush batch */
+ dulint new_oldest, /* in: try to advance oldest_modified_lsn
+ at least to this lsn */
+ ibool sync) /* in: TRUE if synchronous operation is
+ desired */
+{
+ ulint n_pages;
+
+ if (recv_recovery_on) {
+ /* If the recovery is running, we must first apply all
+ log records to their respective file pages to get the
+ right modify lsn values to these pages: otherwise, there
+ might be pages on disk which are not yet recovered to the
+ current lsn, and even after calling this function, we could
+ not know how up-to-date the disk version of the database is,
+ and we could not make a new checkpoint on the basis of the
+ info on the buffer pool only. */
+
+ recv_apply_hashed_log_recs(TRUE);
+ }
+
+ n_pages = buf_flush_batch(BUF_FLUSH_LIST, ULINT_MAX, new_oldest);
+
+ if (sync) {
+ buf_flush_wait_batch_end(BUF_FLUSH_LIST);
+ }
+
+ if (n_pages == ULINT_UNDEFINED) {
+
+ return(FALSE);
+ }
+
+ return(TRUE);
+}
+
+/**********************************************************
+Completes a checkpoint. */
+static
+void
+log_complete_checkpoint(void)
+/*=========================*/
+{
+ ut_ad(mutex_own(&(log_sys->mutex)));
+ ut_ad(log_sys->n_pending_checkpoint_writes == 0);
+
+ log_sys->next_checkpoint_no
+ = ut_dulint_add(log_sys->next_checkpoint_no, 1);
+
+ log_sys->last_checkpoint_lsn = log_sys->next_checkpoint_lsn;
+
+ rw_lock_x_unlock_gen(&(log_sys->checkpoint_lock), LOG_CHECKPOINT);
+}
+
+/**********************************************************
+Completes an asynchronous checkpoint info write i/o to a log file. */
+static
+void
+log_io_complete_checkpoint(
+/*=======================*/
+ log_group_t* group) /* in: log group */
+{
+ mutex_enter(&(log_sys->mutex));
+
+ ut_ad(log_sys->n_pending_checkpoint_writes > 0);
+
+ log_sys->n_pending_checkpoint_writes--;
+
+ if (log_debug_writes) {
+ printf("Checkpoint info written to group %lu\n", group->id);
+ }
+
+ if (log_sys->n_pending_checkpoint_writes == 0) {
+ log_complete_checkpoint();
+ }
+
+ mutex_exit(&(log_sys->mutex));
+}
+
+/***********************************************************************
+Writes info to a checkpoint about a log group. */
+static
+void
+log_checkpoint_set_nth_group_info(
+/*==============================*/
+ byte* buf, /* in: buffer for checkpoint info */
+ ulint n, /* in: nth slot */
+ ulint file_no,/* in: archived file number */
+ ulint offset) /* in: archived file offset */
+{
+ ut_ad(n < LOG_MAX_N_GROUPS);
+
+ mach_write_to_4(buf + LOG_CHECKPOINT_GROUP_ARRAY
+ + 8 * n + LOG_CHECKPOINT_ARCHIVED_FILE_NO, file_no);
+ mach_write_to_4(buf + LOG_CHECKPOINT_GROUP_ARRAY
+ + 8 * n + LOG_CHECKPOINT_ARCHIVED_OFFSET, offset);
+}
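+
+/* Each group thus owns an 8-byte slot in the checkpoint group array:
+bytes [LOG_CHECKPOINT_GROUP_ARRAY + 8 * n, LOG_CHECKPOINT_GROUP_ARRAY
++ 8 * n + 8) hold the 4-byte archived file number followed by the 4-byte
+archived file offset. */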
+
+/***********************************************************************
+Gets info from a checkpoint about a log group. */
+
+void
+log_checkpoint_get_nth_group_info(
+/*==============================*/
+ byte* buf, /* in: buffer containing checkpoint info */
+ ulint n, /* in: nth slot */
+ ulint* file_no,/* out: archived file number */
+ ulint* offset) /* out: archived file offset */
+{
+ ut_ad(n < LOG_MAX_N_GROUPS);
+
+ *file_no = mach_read_from_4(buf + LOG_CHECKPOINT_GROUP_ARRAY
+ + 8 * n + LOG_CHECKPOINT_ARCHIVED_FILE_NO);
+ *offset = mach_read_from_4(buf + LOG_CHECKPOINT_GROUP_ARRAY
+ + 8 * n + LOG_CHECKPOINT_ARCHIVED_OFFSET);
+}
+
+/**********************************************************
+Writes the checkpoint info to a log group header. */
+static
+void
+log_group_checkpoint(
+/*=================*/
+ log_group_t* group) /* in: log group */
+{
+ log_group_t* group2;
+ dulint archived_lsn;
+ dulint next_archived_lsn;
+ ulint write_offset;
+ ulint fold;
+ byte* buf;
+ ulint i;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+ ut_ad(LOG_CHECKPOINT_SIZE <= OS_FILE_LOG_BLOCK_SIZE);
+
+ buf = group->checkpoint_buf;
+
+ mach_write_to_8(buf + LOG_CHECKPOINT_NO, log_sys->next_checkpoint_no);
+ mach_write_to_8(buf + LOG_CHECKPOINT_LSN, log_sys->next_checkpoint_lsn);
+
+ mach_write_to_4(buf + LOG_CHECKPOINT_OFFSET,
+ log_group_calc_lsn_offset(
+ log_sys->next_checkpoint_lsn, group));
+
+ mach_write_to_4(buf + LOG_CHECKPOINT_LOG_BUF_SIZE, log_sys->buf_size);
+
+ if (log_sys->archiving_state == LOG_ARCH_OFF) {
+ archived_lsn = ut_dulint_max;
+ } else {
+ archived_lsn = log_sys->archived_lsn;
+
+ if (0 != ut_dulint_cmp(archived_lsn,
+ log_sys->next_archived_lsn)) {
+ next_archived_lsn = log_sys->next_archived_lsn;
+ /* For debugging only */
+ }
+ }
+
+ mach_write_to_8(buf + LOG_CHECKPOINT_ARCHIVED_LSN, archived_lsn);
+
+ for (i = 0; i < LOG_MAX_N_GROUPS; i++) {
+ log_checkpoint_set_nth_group_info(buf, i, 0, 0);
+ }
+
+ group2 = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+ while (group2) {
+ log_checkpoint_set_nth_group_info(buf, group2->id,
+ group2->archived_file_no,
+ group2->archived_offset);
+
+ group2 = UT_LIST_GET_NEXT(log_groups, group2);
+ }
+
+ fold = ut_fold_binary(buf, LOG_CHECKPOINT_CHECKSUM_1);
+ mach_write_to_4(buf + LOG_CHECKPOINT_CHECKSUM_1, fold);
+
+ fold = ut_fold_binary(buf + LOG_CHECKPOINT_LSN,
+ LOG_CHECKPOINT_CHECKSUM_2 - LOG_CHECKPOINT_LSN);
+ mach_write_to_4(buf + LOG_CHECKPOINT_CHECKSUM_2, fold);
+
+	/* We alternate the physical place of the checkpoint info in the first
+	log file: thus, if a crash occurs in the middle of a checkpoint write,
+	the previously completed checkpoint field is still intact, and
+	recovery can fall back on it */
+
+ if (ut_dulint_get_low(log_sys->next_checkpoint_no) % 2 == 0) {
+ write_offset = LOG_CHECKPOINT_1;
+ } else {
+ write_offset = LOG_CHECKPOINT_2;
+ }
+
+ if (log_do_write) {
+ if (log_sys->n_pending_checkpoint_writes == 0) {
+
+ rw_lock_x_lock_gen(&(log_sys->checkpoint_lock),
+ LOG_CHECKPOINT);
+ }
+
+ log_sys->n_pending_checkpoint_writes++;
+
+ log_sys->n_log_ios++;
+
+ /* We send as the last parameter the group machine address
+ added with 1, as we want to distinguish between a normal log
+ file write and a checkpoint field write */
+
+ fil_io(OS_FILE_WRITE | OS_FILE_LOG, FALSE, group->space_id,
+ write_offset / UNIV_PAGE_SIZE,
+ write_offset % UNIV_PAGE_SIZE,
+ OS_FILE_LOG_BLOCK_SIZE,
+ buf, ((byte*)group + 1));
+
+ ut_ad(((ulint)group & 0x1) == 0);
+ }
+}
+
+/**********************************************************
+Reads a checkpoint info from a log group header to log_sys->checkpoint_buf. */
+
+void
+log_group_read_checkpoint_info(
+/*===========================*/
+ log_group_t* group, /* in: log group */
+ ulint field) /* in: LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2 */
+{
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ log_sys->n_log_ios++;
+
+ fil_io(OS_FILE_READ | OS_FILE_LOG, TRUE, group->space_id,
+ field / UNIV_PAGE_SIZE, field % UNIV_PAGE_SIZE,
+ OS_FILE_LOG_BLOCK_SIZE, log_sys->checkpoint_buf, NULL);
+}
+
+/**********************************************************
+Writes checkpoint info to groups. */
+
+void
+log_groups_write_checkpoint_info(void)
+/*==================================*/
+{
+ log_group_t* group;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+ while (group) {
+ log_group_checkpoint(group);
+
+ group = UT_LIST_GET_NEXT(log_groups, group);
+ }
+}
+
+/**********************************************************
+Makes a checkpoint. Note that this function does not flush dirty
+blocks from the buffer pool: it only checks what the lsn of the oldest
+modification in the pool is, and writes information about that lsn to the
+log files. Use log_make_checkpoint_at to also flush the pool. */
+
+ibool
+log_checkpoint(
+/*===========*/
+ /* out: TRUE if success, FALSE if a checkpoint
+ write was already running */
+ ibool sync, /* in: TRUE if synchronous operation is
+ desired */
+ ibool write_always) /* in: the function normally checks if the
+			new checkpoint would have a greater
+ lsn than the previous one: if not, then no
+ physical write is done; by setting this
+ parameter TRUE, a physical write will always be
+ made to log files */
+{
+ dulint oldest_lsn;
+
+ if (recv_recovery_is_on()) {
+ recv_apply_hashed_log_recs(TRUE);
+ }
+
+ fil_flush_file_spaces(FIL_TABLESPACE);
+
+ mutex_enter(&(log_sys->mutex));
+
+ oldest_lsn = log_buf_pool_get_oldest_modification();
+
+ mutex_exit(&(log_sys->mutex));
+
+	/* Because the log also contains headers and dummy log records,
+ if the buffer pool contains no dirty buffers, oldest_lsn
+ gets the value log_sys->lsn from the previous function,
+ and we must make sure that the log is flushed up to that
+ lsn. If there are dirty buffers in the buffer pool, then our
+ write-ahead-logging algorithm ensures that the log has been flushed
+ up to oldest_lsn. */
+
+ log_flush_up_to(oldest_lsn, LOG_WAIT_ALL_GROUPS);
+
+ mutex_enter(&(log_sys->mutex));
+
+ if (!write_always && ut_dulint_cmp(
+ log_sys->last_checkpoint_lsn, oldest_lsn) >= 0) {
+
+ mutex_exit(&(log_sys->mutex));
+
+ return(TRUE);
+ }
+
+ ut_ad(ut_dulint_cmp(log_sys->written_to_all_lsn, oldest_lsn) >= 0);
+
+ if (log_sys->n_pending_checkpoint_writes > 0) {
+ /* A checkpoint write is running */
+
+ mutex_exit(&(log_sys->mutex));
+
+ if (sync) {
+ /* Wait for the checkpoint write to complete */
+ rw_lock_s_lock(&(log_sys->checkpoint_lock));
+ rw_lock_s_unlock(&(log_sys->checkpoint_lock));
+ }
+
+ return(FALSE);
+ }
+
+ log_sys->next_checkpoint_lsn = oldest_lsn;
+
+ if (log_debug_writes) {
+ printf("Making checkpoint no %lu at lsn %lu %lu\n",
+ ut_dulint_get_low(log_sys->next_checkpoint_no),
+ ut_dulint_get_high(oldest_lsn),
+ ut_dulint_get_low(oldest_lsn));
+ }
+
+ log_groups_write_checkpoint_info();
+
+ mutex_exit(&(log_sys->mutex));
+
+ if (sync) {
+ /* Wait for the checkpoint write to complete */
+ rw_lock_s_lock(&(log_sys->checkpoint_lock));
+ rw_lock_s_unlock(&(log_sys->checkpoint_lock));
+ }
+
+ return(TRUE);
+}
+
+/********************************************************************
+Makes a checkpoint at a given lsn or later. */
+
+void
+log_make_checkpoint_at(
+/*===================*/
+ dulint lsn, /* in: make a checkpoint at this or a later
+ lsn, if ut_dulint_max, makes a checkpoint at
+ the latest lsn */
+ ibool write_always) /* in: the function normally checks if the
+			new checkpoint would have a greater
+ lsn than the previous one: if not, then no
+ physical write is done; by setting this
+ parameter TRUE, a physical write will always be
+ made to log files */
+{
+ ibool success;
+
+ /* Preflush pages synchronously */
+
+ success = FALSE;
+
+ while (!success) {
+ success = log_preflush_pool_modified_pages(lsn, TRUE);
+ }
+
+ success = FALSE;
+
+ while (!success) {
+ success = log_checkpoint(TRUE, write_always);
+ }
+}
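+
+/* For instance, a clean shutdown can force a final physical checkpoint at
+the latest lsn with (sketch; the actual call is made from the server
+shutdown code):
+
+	log_make_checkpoint_at(ut_dulint_max, TRUE);
+*/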
+
+/********************************************************************
+Tries to establish a big enough margin of free space in the log groups, such
+that a new log entry can be catenated without an immediate need for a
+checkpoint. NOTE: this function may only be called if the calling thread
+owns no synchronization objects! */
+static
+void
+log_checkpoint_margin(void)
+/*=======================*/
+{
+ log_t* log = log_sys;
+ ulint age;
+ ulint checkpoint_age;
+ ulint advance;
+ dulint oldest_lsn;
+ dulint new_oldest;
+ ibool do_preflush;
+ ibool sync;
+ ibool checkpoint_sync;
+ ibool do_checkpoint;
+ ibool success;
+loop:
+ sync = FALSE;
+ checkpoint_sync = FALSE;
+ do_preflush = FALSE;
+ do_checkpoint = FALSE;
+
+ mutex_enter(&(log->mutex));
+
+ if (log->check_flush_or_checkpoint == FALSE) {
+ mutex_exit(&(log->mutex));
+
+ return;
+ }
+
+ oldest_lsn = log_buf_pool_get_oldest_modification();
+
+ age = ut_dulint_minus(log->lsn, oldest_lsn);
+
+ if (age > log->max_modified_age_sync) {
+
+ /* A flush is urgent: we have to do a synchronous preflush */
+
+ sync = TRUE;
+
+ advance = 2 * (age - log->max_modified_age_sync);
+
+ new_oldest = ut_dulint_add(oldest_lsn, advance);
+
+ do_preflush = TRUE;
+
+ } else if (age > log->max_modified_age_async) {
+
+ /* A flush is not urgent: we do an asynchronous preflush */
+ advance = age - log->max_modified_age_async;
+
+ new_oldest = ut_dulint_add(oldest_lsn, advance);
+
+ do_preflush = TRUE;
+ }
+
+ checkpoint_age = ut_dulint_minus(log->lsn, log->last_checkpoint_lsn);
+
+ if (checkpoint_age > log->max_checkpoint_age) {
+ /* A checkpoint is urgent: we do it synchronously */
+
+ checkpoint_sync = TRUE;
+
+ do_checkpoint = TRUE;
+
+ } else if (checkpoint_age > log->max_checkpoint_age_async) {
+ /* A checkpoint is not urgent: do it asynchronously */
+
+ do_checkpoint = TRUE;
+
+ log->check_flush_or_checkpoint = FALSE;
+ } else {
+ log->check_flush_or_checkpoint = FALSE;
+ }
+
+ mutex_exit(&(log->mutex));
+
+ if (do_preflush) {
+ success = log_preflush_pool_modified_pages(new_oldest, sync);
+
+ /* If the flush succeeded, this thread has done its part
+ and can proceed. If it did not succeed, there was another
+ thread doing a flush at the same time. If sync was FALSE,
+ the flush was not urgent, and we let this thread proceed.
+ Otherwise, we let it start from the beginning again. */
+
+ if (sync && !success) {
+ mutex_enter(&(log->mutex));
+
+ log->check_flush_or_checkpoint = TRUE;
+
+ mutex_exit(&(log->mutex));
+ goto loop;
+ }
+ }
+
+ if (do_checkpoint) {
+ log_checkpoint(checkpoint_sync, FALSE);
+
+ if (checkpoint_sync) {
+
+ goto loop;
+ }
+ }
+}
+
+/**********************************************************
+Reads a specified log segment to a buffer. */
+
+void
+log_group_read_log_seg(
+/*===================*/
+ ulint type, /* in: LOG_ARCHIVE or LOG_RECOVER */
+ byte* buf, /* in: buffer where to read */
+ log_group_t* group, /* in: log group */
+ dulint start_lsn, /* in: read area start */
+ dulint end_lsn) /* in: read area end */
+{
+ ulint len;
+ ulint source_offset;
+ ibool sync;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ sync = FALSE;
+
+ if (type == LOG_RECOVER) {
+ sync = TRUE;
+ }
+loop:
+ source_offset = log_group_calc_lsn_offset(start_lsn, group);
+
+ len = ut_dulint_minus(end_lsn, start_lsn);
+
+ ut_ad(len != 0);
+
+ if ((source_offset % group->file_size) + len > group->file_size) {
+
+ len = group->file_size - (source_offset % group->file_size);
+ }
+
+ if (type == LOG_ARCHIVE) {
+
+ log_sys->n_pending_archive_ios++;
+ }
+
+ log_sys->n_log_ios++;
+
+ fil_io(OS_FILE_READ | OS_FILE_LOG, sync, group->space_id,
+ source_offset / UNIV_PAGE_SIZE, source_offset % UNIV_PAGE_SIZE,
+ len, buf, &log_archive_io);
+
+ start_lsn = ut_dulint_add(start_lsn, len);
+ buf += len;
+
+ if (ut_dulint_cmp(start_lsn, end_lsn) != 0) {
+
+ goto loop;
+ }
+}
+
+/**********************************************************
+Generates an archived log file name. */
+
+void
+log_archived_file_name_gen(
+/*=======================*/
+ char* buf, /* in: buffer where to write */
+ ulint id, /* in: group id */
+ ulint file_no)/* in: file number */
+{
+ UT_NOT_USED(id); /* Currently we only archive the first group */
+
+ sprintf(buf, "%sib_arch_log_%010lu", srv_arch_dir, file_no);
+}
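+
+/* For example, with srv_arch_dir set to "/usr/local/mysql/data/" and
+file_no = 123, this produces the name
+"/usr/local/mysql/data/ib_arch_log_0000000123". */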
+
+/**********************************************************
+Writes a log file header to a log file space. */
+static
+void
+log_group_archive_file_header_write(
+/*================================*/
+ log_group_t* group, /* in: log group */
+ ulint nth_file, /* in: header to the nth file in the
+ archive log file space */
+ ulint file_no, /* in: archived file number */
+ dulint start_lsn) /* in: log file data starts at this
+ lsn */
+{
+ byte* buf;
+ ulint dest_offset;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ ut_a(nth_file < group->n_files);
+
+ buf = *(group->archive_file_header_bufs + nth_file);
+
+ mach_write_to_4(buf + LOG_GROUP_ID, group->id);
+ mach_write_to_8(buf + LOG_FILE_START_LSN, start_lsn);
+ mach_write_to_4(buf + LOG_FILE_NO, file_no);
+
+ mach_write_to_4(buf + LOG_FILE_ARCH_COMPLETED, FALSE);
+
+ dest_offset = nth_file * group->file_size;
+
+ log_sys->n_log_ios++;
+
+ fil_io(OS_FILE_WRITE | OS_FILE_LOG, TRUE, group->archive_space_id,
+ dest_offset / UNIV_PAGE_SIZE,
+ dest_offset % UNIV_PAGE_SIZE,
+ 2 * OS_FILE_LOG_BLOCK_SIZE,
+ buf, &log_archive_io);
+}
+
+/**********************************************************
+Writes a log file header to a completed archived log file. */
+static
+void
+log_group_archive_completed_header_write(
+/*=====================================*/
+ log_group_t* group, /* in: log group */
+ ulint nth_file, /* in: header to the nth file in the
+ archive log file space */
+ dulint end_lsn) /* in: end lsn of the file */
+{
+ byte* buf;
+ ulint dest_offset;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+ ut_a(nth_file < group->n_files);
+
+ buf = *(group->archive_file_header_bufs + nth_file);
+
+ mach_write_to_4(buf + LOG_FILE_ARCH_COMPLETED, TRUE);
+ mach_write_to_8(buf + LOG_FILE_END_LSN, end_lsn);
+
+ dest_offset = nth_file * group->file_size + LOG_FILE_ARCH_COMPLETED;
+
+ log_sys->n_log_ios++;
+
+ fil_io(OS_FILE_WRITE | OS_FILE_LOG, TRUE, group->archive_space_id,
+ dest_offset / UNIV_PAGE_SIZE,
+ dest_offset % UNIV_PAGE_SIZE,
+ OS_FILE_LOG_BLOCK_SIZE,
+ buf + LOG_FILE_ARCH_COMPLETED,
+ &log_archive_io);
+}
+
+/**********************************************************
+Does the archive writes for a single log group. */
+static
+void
+log_group_archive(
+/*==============*/
+ log_group_t* group) /* in: log group */
+{
+ os_file_t file_handle;
+ dulint start_lsn;
+ dulint end_lsn;
+ char name[100];
+ byte* buf;
+ ulint len;
+ ibool ret;
+ ulint next_offset;
+ ulint n_files;
+ ulint open_mode;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ start_lsn = log_sys->archived_lsn;
+
+ ut_ad(ut_dulint_get_low(start_lsn) % OS_FILE_LOG_BLOCK_SIZE == 0);
+
+ end_lsn = log_sys->next_archived_lsn;
+
+ ut_ad(ut_dulint_get_low(end_lsn) % OS_FILE_LOG_BLOCK_SIZE == 0);
+
+ buf = log_sys->archive_buf;
+
+ n_files = 0;
+
+ next_offset = group->archived_offset;
+loop:
+ if ((next_offset % group->file_size == 0)
+ || (fil_space_get_size(group->archive_space_id) == 0)) {
+
+ /* Add the file to the archive file space; create or open the
+ file */
+
+ if (next_offset % group->file_size == 0) {
+ open_mode = OS_FILE_CREATE;
+ } else {
+ open_mode = OS_FILE_OPEN;
+ }
+
+ log_archived_file_name_gen(name, group->id,
+ group->archived_file_no + n_files);
+ fil_reserve_right_to_open();
+
+ file_handle = os_file_create(name, open_mode, OS_FILE_AIO,
+ &ret);
+ if (!ret && (open_mode == OS_FILE_CREATE)) {
+ file_handle = os_file_create(name, OS_FILE_OPEN,
+ OS_FILE_AIO, &ret);
+ }
+
+ ut_a(ret);
+
+ if (log_debug_writes) {
+ printf("Created archive file %s\n", name);
+ }
+
+ ret = os_file_close(file_handle);
+
+ ut_a(ret);
+
+ fil_release_right_to_open();
+
+ /* Add the archive file as a node to the space */
+
+ fil_node_create(name, group->file_size / UNIV_PAGE_SIZE,
+ group->archive_space_id);
+
+ if (next_offset % group->file_size == 0) {
+ log_group_archive_file_header_write(group, n_files,
+ group->archived_file_no + n_files,
+ start_lsn);
+
+ next_offset += LOG_FILE_HDR_SIZE;
+ }
+ }
+
+ len = ut_dulint_minus(end_lsn, start_lsn);
+
+ if (group->file_size < (next_offset % group->file_size) + len) {
+
+ len = group->file_size - (next_offset % group->file_size);
+ }
+
+ if (log_debug_writes) {
+ printf(
+ "Archiving starting at lsn %lu %lu, len %lu to group %lu\n",
+ ut_dulint_get_high(start_lsn),
+ ut_dulint_get_low(start_lsn),
+ len, group->id);
+ }
+
+ log_sys->n_pending_archive_ios++;
+
+ log_sys->n_log_ios++;
+
+ fil_io(OS_FILE_WRITE | OS_FILE_LOG, FALSE, group->archive_space_id,
+ next_offset / UNIV_PAGE_SIZE, next_offset % UNIV_PAGE_SIZE,
+ ut_calc_align(len, OS_FILE_LOG_BLOCK_SIZE), buf,
+ &log_archive_io);
+
+ start_lsn = ut_dulint_add(start_lsn, len);
+ next_offset += len;
+ buf += len;
+
+ if (next_offset % group->file_size == 0) {
+ n_files++;
+ }
+
+ if (ut_dulint_cmp(end_lsn, start_lsn) != 0) {
+
+ goto loop;
+ }
+
+ group->next_archived_file_no = group->archived_file_no + n_files;
+ group->next_archived_offset = next_offset % group->file_size;
+
+ ut_ad(group->next_archived_offset % OS_FILE_LOG_BLOCK_SIZE == 0);
+}
+
+/*********************************************************
+Writes to the archive of each log group. Currently, however, only the
+first group is ever archived. */
+static
+void
+log_archive_groups(void)
+/*====================*/
+{
+ log_group_t* group;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+ log_group_archive(group);
+}
+
+/*********************************************************
+Completes the archiving write phase for each log group; currently,
+only the first log group is used. */
+static
+void
+log_archive_write_complete_groups(void)
+/*===================================*/
+{
+ log_group_t* group;
+ ulint end_offset;
+ ulint trunc_files;
+ ulint n_files;
+ dulint start_lsn;
+ dulint end_lsn;
+ ulint i;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+ group->archived_file_no = group->next_archived_file_no;
+ group->archived_offset = group->next_archived_offset;
+
+ /* Truncate from the archive file space all but the last
+ file, or if it has been written full, all files */
+
+ n_files = (UNIV_PAGE_SIZE
+ * fil_space_get_size(group->archive_space_id))
+ / group->file_size;
+ ut_ad(n_files > 0);
+
+ end_offset = group->archived_offset;
+
+ if (end_offset % group->file_size == 0) {
+
+ trunc_files = n_files;
+ } else {
+ trunc_files = n_files - 1;
+ }
+
+ if (log_debug_writes && trunc_files) {
+ printf("Complete file(s) archived to group %lu\n",
+ group->id);
+ }
+
+ /* Calculate the archive file space start lsn */
+ start_lsn = ut_dulint_subtract(log_sys->next_archived_lsn,
+ end_offset - LOG_FILE_HDR_SIZE
+ + trunc_files
+ * (group->file_size - LOG_FILE_HDR_SIZE));
+ end_lsn = start_lsn;
+
+ for (i = 0; i < trunc_files; i++) {
+
+ end_lsn = ut_dulint_add(end_lsn,
+ group->file_size - LOG_FILE_HDR_SIZE);
+
+ /* Write a notice to the headers of archived log
+ files that the file write has been completed */
+
+ log_group_archive_completed_header_write(group, i, end_lsn);
+ }
+
+ fil_space_truncate_start(group->archive_space_id,
+ trunc_files * group->file_size);
+
+ if (log_debug_writes) {
+ printf("Archiving writes completed\n");
+ }
+}
+
+/**********************************************************
+Checks whether an archiving operation has completed, and if so, performs
+the completion routine. */
+static
+void
+log_archive_check_completion_low(void)
+/*==================================*/
+{
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ if (log_sys->n_pending_archive_ios == 0
+ && log_sys->archiving_phase == LOG_ARCHIVE_READ) {
+
+ if (log_debug_writes) {
+ printf("Archiving read completed\n");
+ }
+
+ /* Archive buffer has now been read in: start archive writes */
+
+ log_sys->archiving_phase = LOG_ARCHIVE_WRITE;
+
+ log_archive_groups();
+ }
+
+ if (log_sys->n_pending_archive_ios == 0
+ && log_sys->archiving_phase == LOG_ARCHIVE_WRITE) {
+
+ log_archive_write_complete_groups();
+
+ log_sys->archived_lsn = log_sys->next_archived_lsn;
+
+ rw_lock_x_unlock_gen(&(log_sys->archive_lock), LOG_ARCHIVE);
+ }
+}
+
+/**********************************************************
+Completes an archiving i/o. */
+static
+void
+log_io_complete_archive(void)
+/*=========================*/
+{
+ log_group_t* group;
+
+ mutex_enter(&(log_sys->mutex));
+
+ group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+ mutex_exit(&(log_sys->mutex));
+
+ fil_flush(group->archive_space_id);
+
+ mutex_enter(&(log_sys->mutex));
+
+ ut_ad(log_sys->n_pending_archive_ios > 0);
+
+ log_sys->n_pending_archive_ios--;
+
+ log_archive_check_completion_low();
+
+ mutex_exit(&(log_sys->mutex));
+}
+
+/************************************************************************
+Starts an archiving operation. */
+
+ibool
+log_archive_do(
+/*===========*/
+ /* out: TRUE if succeed, FALSE if an archiving
+ operation was already running */
+ ibool sync, /* in: TRUE if synchronous operation is desired */
+ ulint* n_bytes)/* out: archive log buffer size, 0 if nothing to
+ archive */
+{
+ ibool calc_new_limit;
+ dulint start_lsn;
+ dulint limit_lsn;
+
+ calc_new_limit = TRUE;
+loop:
+ mutex_enter(&(log_sys->mutex));
+
+ if (log_sys->archiving_state == LOG_ARCH_OFF) {
+ mutex_exit(&(log_sys->mutex));
+
+ *n_bytes = 0;
+
+ return(TRUE);
+
+	} else if (log_sys->archiving_state == LOG_ARCH_STOPPED
+		|| log_sys->archiving_state == LOG_ARCH_STOPPING2) {
+
+		mutex_exit(&(log_sys->mutex));
+
+		os_event_wait(log_sys->archiving_on);
+
+		/* The loop label below re-acquires the log mutex: entering
+		it here as well would deadlock on the non-recursive mutex */
+
+		goto loop;
+	}
+
+ start_lsn = log_sys->archived_lsn;
+
+ if (calc_new_limit) {
+ ut_ad(log_sys->archive_buf_size % OS_FILE_LOG_BLOCK_SIZE == 0);
+
+ limit_lsn = ut_dulint_add(start_lsn,
+ log_sys->archive_buf_size);
+
+ *n_bytes = log_sys->archive_buf_size;
+
+ if (ut_dulint_cmp(limit_lsn, log_sys->lsn) >= 0) {
+
+ limit_lsn = ut_dulint_align_down(log_sys->lsn,
+ OS_FILE_LOG_BLOCK_SIZE);
+ }
+ }
+
+ if (ut_dulint_cmp(log_sys->archived_lsn, limit_lsn) >= 0) {
+
+ mutex_exit(&(log_sys->mutex));
+
+ *n_bytes = 0;
+
+ return(TRUE);
+ }
+
+ if (ut_dulint_cmp(log_sys->written_to_all_lsn, limit_lsn) < 0) {
+
+ mutex_exit(&(log_sys->mutex));
+
+ log_flush_up_to(limit_lsn, LOG_WAIT_ALL_GROUPS);
+
+ calc_new_limit = FALSE;
+
+ goto loop;
+ }
+
+ if (log_sys->n_pending_archive_ios > 0) {
+ /* An archiving operation is running */
+
+ mutex_exit(&(log_sys->mutex));
+
+ if (sync) {
+ rw_lock_s_lock(&(log_sys->archive_lock));
+ rw_lock_s_unlock(&(log_sys->archive_lock));
+ }
+
+ *n_bytes = log_sys->archive_buf_size;
+
+ return(FALSE);
+ }
+
+ rw_lock_x_lock_gen(&(log_sys->archive_lock), LOG_ARCHIVE);
+
+ log_sys->archiving_phase = LOG_ARCHIVE_READ;
+
+ log_sys->next_archived_lsn = limit_lsn;
+
+ if (log_debug_writes) {
+ printf("Archiving from lsn %lu %lu to lsn %lu %lu\n",
+ ut_dulint_get_high(log_sys->archived_lsn),
+ ut_dulint_get_low(log_sys->archived_lsn),
+ ut_dulint_get_high(limit_lsn),
+ ut_dulint_get_low(limit_lsn));
+ }
+
+ /* Read the log segment to the archive buffer */
+
+ log_group_read_log_seg(LOG_ARCHIVE, log_sys->archive_buf,
+ UT_LIST_GET_FIRST(log_sys->log_groups),
+ start_lsn, limit_lsn);
+
+ mutex_exit(&(log_sys->mutex));
+
+ if (sync) {
+ rw_lock_s_lock(&(log_sys->archive_lock));
+ rw_lock_s_unlock(&(log_sys->archive_lock));
+ }
+
+ *n_bytes = log_sys->archive_buf_size;
+
+ return(TRUE);
+}
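+
+/* Usage sketch (editorial illustration, not part of the original source):
+log_archive_all() below is the in-tree caller; a one-shot synchronous pass
+could look like this, with n_bytes a scratch output variable:
+
+	ulint	n_bytes;
+
+	while (!log_archive_do(TRUE, &n_bytes)) {
+		... a concurrent archiving operation was running; the
+		synchronous wait has already released, so just retry ...
+	}
+*/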
+
+/********************************************************************
+Writes the log contents to the archive at least up to the lsn when this
+function was called. */
+static
+void
+log_archive_all(void)
+/*=================*/
+{
+ dulint present_lsn;
+ ulint dummy;
+
+ mutex_enter(&(log_sys->mutex));
+
+ if (log_sys->archiving_state == LOG_ARCH_OFF) {
+ mutex_exit(&(log_sys->mutex));
+
+ return;
+ }
+
+ present_lsn = log_sys->lsn;
+
+ mutex_exit(&(log_sys->mutex));
+
+ log_pad_current_log_block();
+
+ for (;;) {
+ mutex_enter(&(log_sys->mutex));
+
+ if (ut_dulint_cmp(present_lsn, log_sys->archived_lsn) <= 0) {
+
+ mutex_exit(&(log_sys->mutex));
+
+ return;
+ }
+
+ mutex_exit(&(log_sys->mutex));
+
+ log_archive_do(TRUE, &dummy);
+ }
+}
+
+/*********************************************************
+Closes the possible open archive log file of the first log group, and if it
+was open, increments the group file count by 2, if desired. */
+static
+void
+log_archive_close_groups(
+/*=====================*/
+ ibool increment_file_count) /* in: TRUE if we want to increment
+ the file count */
+{
+ log_group_t* group;
+ ulint trunc_len;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+ trunc_len = UNIV_PAGE_SIZE
+ * fil_space_get_size(group->archive_space_id);
+
+ if (trunc_len > 0) {
+ ut_a(trunc_len == group->file_size);
+
+ /* Write a notice to the headers of archived log
+ files that the file write has been completed */
+
+ log_group_archive_completed_header_write(group,
+ 0, log_sys->archived_lsn);
+
+ fil_space_truncate_start(group->archive_space_id,
+ trunc_len);
+		if (increment_file_count) {
+			group->archived_offset = 0;
+			group->archived_file_no += 2;
+
+			if (log_debug_writes) {
+				printf(
+			"Incrementing arch file no to %lu in log group %lu\n",
+				group->archived_file_no, group->id);
+			}
+		}
+ }
+}
+
+/********************************************************************
+Writes the log contents to the archive up to the lsn when this function was
+called, and stops the archiving. When archiving is started again, the
+archived log file numbers start from a value 2 higher, so that archiving will
+not write again to the archived log files which exist when this function
+returns. */
+
+ulint
+log_archive_stop(void)
+/*==================*/
+ /* out: DB_SUCCESS or DB_ERROR */
+{
+ ibool success;
+
+ mutex_enter(&(log_sys->mutex));
+
+ if (log_sys->archiving_state != LOG_ARCH_ON) {
+
+ mutex_exit(&(log_sys->mutex));
+
+ return(DB_ERROR);
+ }
+
+ log_sys->archiving_state = LOG_ARCH_STOPPING;
+
+ mutex_exit(&(log_sys->mutex));
+
+ log_archive_all();
+
+ mutex_enter(&(log_sys->mutex));
+
+ log_sys->archiving_state = LOG_ARCH_STOPPING2;
+ os_event_reset(log_sys->archiving_on);
+
+ mutex_exit(&(log_sys->mutex));
+
+ /* Wait for a possible archiving operation to end */
+
+ rw_lock_s_lock(&(log_sys->archive_lock));
+ rw_lock_s_unlock(&(log_sys->archive_lock));
+
+ mutex_enter(&(log_sys->mutex));
+
+ /* Close all archived log files, incrementing the file count by 2,
+ if appropriate */
+
+ log_archive_close_groups(TRUE);
+
+ mutex_exit(&(log_sys->mutex));
+
+ /* Make a checkpoint, so that if recovery is needed, the file numbers
+ of new archived log files will start from the right value */
+
+ success = FALSE;
+
+ while (!success) {
+ success = log_checkpoint(TRUE, TRUE);
+ }
+
+ mutex_enter(&(log_sys->mutex));
+
+ log_sys->archiving_state = LOG_ARCH_STOPPED;
+
+ mutex_exit(&(log_sys->mutex));
+
+ return(DB_SUCCESS);
+}
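+
+/* Usage sketch (editorial illustration, not part of the original source):
+a backup script could rotate the archived files with the stop/start pair;
+the copy step is hypothetical:
+
+	if (log_archive_stop() == DB_SUCCESS) {
+		... copy the closed archived log files elsewhere ...
+		log_archive_start();
+	}
+
+Because the file numbers are incremented by 2 at the stop, later archived
+files never overwrite the copied ones. */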
+
+/********************************************************************
+Starts archiving again after it has been stopped. */
+
+ulint
+log_archive_start(void)
+/*===================*/
+ /* out: DB_SUCCESS or DB_ERROR */
+{
+ mutex_enter(&(log_sys->mutex));
+
+ if (log_sys->archiving_state != LOG_ARCH_STOPPED) {
+
+ mutex_exit(&(log_sys->mutex));
+
+ return(DB_ERROR);
+ }
+
+ log_sys->archiving_state = LOG_ARCH_ON;
+
+ os_event_set(log_sys->archiving_on);
+
+ mutex_exit(&(log_sys->mutex));
+
+ return(DB_SUCCESS);
+}
+
+/********************************************************************
+Switches archiving off; a gap may then occur in the sequence of archived
+log files. */
+
+ulint
+log_archive_noarchivelog(void)
+/*==========================*/
+ /* out: DB_SUCCESS or DB_ERROR */
+{
+loop:
+ mutex_enter(&(log_sys->mutex));
+
+ if (log_sys->archiving_state == LOG_ARCH_STOPPED
+ || log_sys->archiving_state == LOG_ARCH_OFF) {
+
+ log_sys->archiving_state = LOG_ARCH_OFF;
+
+ os_event_set(log_sys->archiving_on);
+
+ mutex_exit(&(log_sys->mutex));
+
+ return(DB_SUCCESS);
+ }
+
+ mutex_exit(&(log_sys->mutex));
+
+ log_archive_stop();
+
+ os_thread_sleep(500000);
+
+ goto loop;
+}
+
+/********************************************************************
+Switches archiving on again after it has been off; note that a gap may have
+occurred in the archived log files in the meantime. */
+
+ulint
+log_archive_archivelog(void)
+/*========================*/
+ /* out: DB_SUCCESS or DB_ERROR */
+{
+ mutex_enter(&(log_sys->mutex));
+
+ if (log_sys->archiving_state == LOG_ARCH_OFF) {
+
+ log_sys->archiving_state = LOG_ARCH_ON;
+
+ log_sys->archived_lsn = ut_dulint_align_down(log_sys->lsn,
+ OS_FILE_LOG_BLOCK_SIZE);
+ mutex_exit(&(log_sys->mutex));
+
+ return(DB_SUCCESS);
+ }
+
+ mutex_exit(&(log_sys->mutex));
+
+ return(DB_ERROR);
+}
+
+/********************************************************************
+Tries to establish a big enough margin of free space in the log groups, such
+that a new log entry can be catenated without an immediate need for
+archiving. */
+static
+void
+log_archive_margin(void)
+/*====================*/
+{
+ log_t* log = log_sys;
+ ulint age;
+ ibool sync;
+ ulint dummy;
+loop:
+ mutex_enter(&(log->mutex));
+
+ if (log->archiving_state == LOG_ARCH_OFF) {
+ mutex_exit(&(log->mutex));
+
+ return;
+ }
+
+ age = ut_dulint_minus(log->lsn, log->archived_lsn);
+
+ if (age > log->max_archived_lsn_age) {
+
+		/* Archiving is urgent: we have to do synchronous i/o */
+
+ sync = TRUE;
+
+ } else if (age > log->max_archived_lsn_age_async) {
+
+		/* Archiving is not urgent: we do asynchronous i/o */
+
+ sync = FALSE;
+ } else {
+ /* No archiving required yet */
+
+ mutex_exit(&(log->mutex));
+
+ return;
+ }
+
+ mutex_exit(&(log->mutex));
+
+ log_archive_do(sync, &dummy);
+
+	if (sync) {
+ /* Check again that enough was written to the archive */
+
+ goto loop;
+ }
+}
+
+/************************************************************************
+Checks that there is enough free space in the log to start a new query step.
+Flushes the log buffer or makes a new checkpoint if necessary. NOTE: this
+function may only be called if the calling thread owns no synchronization
+objects! */
+
+void
+log_check_margins(void)
+/*===================*/
+{
+loop:
+ log_flush_margin();
+
+ log_checkpoint_margin();
+
+ log_archive_margin();
+
+ mutex_enter(&(log_sys->mutex));
+
+ if (log_sys->check_flush_or_checkpoint) {
+
+ mutex_exit(&(log_sys->mutex));
+
+ goto loop;
+ }
+
+ mutex_exit(&(log_sys->mutex));
+}
+
+/**********************************************************
+Switches the database to the online backup state. */
+
+ulint
+log_switch_backup_state_on(void)
+/*============================*/
+ /* out: DB_SUCCESS or DB_ERROR */
+{
+ dulint backup_lsn;
+
+ mutex_enter(&(log_sys->mutex));
+
+ if (log_sys->online_backup_state) {
+
+ /* The database is already in that state */
+
+ mutex_exit(&(log_sys->mutex));
+
+ return(DB_ERROR);
+ }
+
+ log_sys->online_backup_state = TRUE;
+
+ backup_lsn = log_sys->lsn;
+
+ log_sys->online_backup_lsn = backup_lsn;
+
+ mutex_exit(&(log_sys->mutex));
+
+ /* log_checkpoint_and_mark_file_spaces(); */
+
+ return(DB_SUCCESS);
+}
+
+/**********************************************************
+Switches the online backup state off. */
+
+ulint
+log_switch_backup_state_off(void)
+/*=============================*/
+ /* out: DB_SUCCESS or DB_ERROR */
+{
+ mutex_enter(&(log_sys->mutex));
+
+ if (!log_sys->online_backup_state) {
+
+ /* The database is already in that state */
+
+ mutex_exit(&(log_sys->mutex));
+
+ return(DB_ERROR);
+ }
+
+ log_sys->online_backup_state = FALSE;
+
+ mutex_exit(&(log_sys->mutex));
+
+ return(DB_SUCCESS);
+}
+
+/********************************************************************
+Makes a checkpoint at the latest lsn and writes it to the first page of each
+data file in the database, so that we know that the file spaces contain
+all modifications up to that lsn. This can only be called at database
+shutdown. This function also archives all of the log in the log files. */
+
+void
+logs_empty_and_mark_files_at_shutdown(void)
+/*=======================================*/
+{
+ dulint lsn;
+ ulint arch_log_no;
+
+ fprintf(stderr, "Innobase: Starting shutdown...\n");
+
+ /* Wait until the master thread and all other operations are idle: our
+ algorithm only works if the server is idle at shutdown */
+loop:
+ os_thread_sleep(100000);
+
+ mutex_enter(&kernel_mutex);
+
+ if (trx_n_mysql_transactions > 0
+ || UT_LIST_GET_LEN(trx_sys->trx_list) > 0) {
+
+ mutex_exit(&kernel_mutex);
+
+ goto loop;
+ }
+
+ if (srv_n_threads_active[SRV_MASTER] != 0) {
+
+ mutex_exit(&kernel_mutex);
+
+ goto loop;
+ }
+
+ mutex_exit(&kernel_mutex);
+
+ mutex_enter(&(log_sys->mutex));
+
+ if (log_sys->n_pending_archive_ios
+ + log_sys->n_pending_checkpoint_writes
+ + log_sys->n_pending_writes > 0) {
+
+ mutex_exit(&(log_sys->mutex));
+
+ goto loop;
+ }
+
+ mutex_exit(&(log_sys->mutex));
+
+ if (!buf_pool_check_no_pending_io()) {
+
+ goto loop;
+ }
+
+ log_archive_all();
+
+ log_make_checkpoint_at(ut_dulint_max, TRUE);
+
+ mutex_enter(&(log_sys->mutex));
+
+ lsn = log_sys->lsn;
+
+ if (ut_dulint_cmp(lsn, log_sys->last_checkpoint_lsn) != 0
+ || (srv_log_archive_on
+ && ut_dulint_cmp(lsn,
+ ut_dulint_add(log_sys->archived_lsn, LOG_BLOCK_HDR_SIZE)) != 0)) {
+
+ mutex_exit(&(log_sys->mutex));
+
+ goto loop;
+ }
+
+ arch_log_no =
+ UT_LIST_GET_FIRST(log_sys->log_groups)->archived_file_no;
+
+ if (0 == UT_LIST_GET_FIRST(log_sys->log_groups)->archived_offset) {
+
+ arch_log_no--;
+ }
+
+ log_archive_close_groups(TRUE);
+
+ mutex_exit(&(log_sys->mutex));
+
+ fil_flush_file_spaces(FIL_TABLESPACE);
+ fil_flush_file_spaces(FIL_LOG);
+
+	/* The following fil_write_... will bypass the buffer pool: therefore
+	it is essential that the buffer pool has been completely flushed
+	to disk! */
+
+ if (!buf_all_freed()) {
+
+ goto loop;
+ }
+
+ fil_write_flushed_lsn_to_data_files(lsn, arch_log_no);
+
+ fil_flush_file_spaces(FIL_TABLESPACE);
+
+ fprintf(stderr, "Innobase: Shutdown completed\n");
+}
+
+/**********************************************************
+Checks by parsing that the catenated log segment for a single mtr is
+consistent. */
+
+ibool
+log_check_log_recs(
+/*===============*/
+ byte* buf, /* in: pointer to the start of the log segment
+ in the log_sys->buf log buffer */
+ ulint len, /* in: segment length in bytes */
+ dulint buf_start_lsn) /* in: buffer start lsn */
+{
+ dulint contiguous_lsn;
+ dulint scanned_lsn;
+ byte* start;
+ byte* end;
+ byte* buf1;
+ byte* scan_buf;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ if (len == 0) {
+
+ return(TRUE);
+ }
+
+ start = ut_align_down(buf, OS_FILE_LOG_BLOCK_SIZE);
+ end = ut_align(buf + len, OS_FILE_LOG_BLOCK_SIZE);
+
+ buf1 = mem_alloc((end - start) + OS_FILE_LOG_BLOCK_SIZE);
+ scan_buf = ut_align(buf1, OS_FILE_LOG_BLOCK_SIZE);
+
+ ut_memcpy(scan_buf, start, end - start);
+
+ recv_scan_log_recs(FALSE, scan_buf, end - start,
+ ut_dulint_align_down(buf_start_lsn,
+ OS_FILE_LOG_BLOCK_SIZE),
+ &contiguous_lsn, &scanned_lsn);
+
+ ut_a(ut_dulint_cmp(scanned_lsn, ut_dulint_add(buf_start_lsn, len))
+ == 0);
+ ut_a(ut_dulint_cmp(recv_sys->recovered_lsn, scanned_lsn) == 0);
+
+ mem_free(buf1);
+
+ return(TRUE);
+}
+
+/**********************************************************
+Prints info of the log. */
+
+void
+log_print(void)
+/*===========*/
+{
+ printf("Log sequence number %lu %lu\n",
+ ut_dulint_get_high(log_sys->lsn),
+ ut_dulint_get_low(log_sys->lsn));
+}
+
diff --git a/innobase/log/log0recv.c b/innobase/log/log0recv.c
new file mode 100644
index 00000000000..1d0b556f1b6
--- /dev/null
+++ b/innobase/log/log0recv.c
@@ -0,0 +1,2512 @@
+/******************************************************
+Recovery
+
+(c) 1997 Innobase Oy
+
+Created 9/20/1997 Heikki Tuuri
+*******************************************************/
+
+#include "log0recv.h"
+
+#ifdef UNIV_NONINL
+#include "log0recv.ic"
+#endif
+
+#include "mem0mem.h"
+#include "buf0buf.h"
+#include "buf0flu.h"
+#include "buf0rea.h"
+#include "srv0srv.h"
+#include "mtr0mtr.h"
+#include "mtr0log.h"
+#include "page0page.h"
+#include "page0cur.h"
+#include "btr0btr.h"
+#include "btr0cur.h"
+#include "ibuf0ibuf.h"
+#include "trx0undo.h"
+#include "trx0rec.h"
+#include "trx0roll.h"
+#include "dict0boot.h"
+#include "fil0fil.h"
+
+/* Size of block reads when the log groups are scanned forward to do a
+roll-forward */
+#define RECV_SCAN_SIZE (4 * UNIV_PAGE_SIZE)
+
+/* Size of the parsing buffer */
+#define RECV_PARSING_BUF_SIZE LOG_BUFFER_SIZE
+
+/* Log records are stored in the hash table in chunks at most of this size;
+this must be less than UNIV_PAGE_SIZE as it is stored in the buffer pool */
+#define RECV_DATA_BLOCK_SIZE (MEM_MAX_ALLOC_IN_BUF - sizeof(recv_data_t))
+
+/* Read-ahead area in applying log records to file pages */
+#define RECV_READ_AHEAD_AREA 32
+
+recv_sys_t* recv_sys = NULL;
+ibool recv_recovery_on = FALSE;
+ibool recv_recovery_from_backup_on = FALSE;
+
+/* If the following is TRUE, the buffer pool file pages must be invalidated
+after recovery and no ibuf operations are allowed; this becomes TRUE if
+the log record hash table becomes too full, and log records must be merged
+to file pages already before the recovery is finished: in this case no
+ibuf operations are allowed, as they could modify the pages read into the
+buffer pool before the pages have been recovered to the up-to-date state.
+The variable name is misleading: while it is TRUE, recovery is also still
+running, and no operations on the log files are allowed yet. */
+
+ibool	recv_no_ibuf_operations = FALSE;
+
+/************************************************************
+Creates the recovery system. */
+
+void
+recv_sys_create(void)
+/*=================*/
+{
+ if (recv_sys != NULL) {
+
+ return;
+ }
+
+ recv_sys = mem_alloc(sizeof(recv_sys_t));
+
+ mutex_create(&(recv_sys->mutex));
+ mutex_set_level(&(recv_sys->mutex), SYNC_RECV);
+
+ recv_sys->heap = NULL;
+ recv_sys->addr_hash = NULL;
+}
+
+/************************************************************
+Inits the recovery system for a recovery operation. */
+
+void
+recv_sys_init(void)
+/*===============*/
+{
+ if (recv_sys->heap != NULL) {
+
+ return;
+ }
+
+ mutex_enter(&(recv_sys->mutex));
+
+ recv_sys->heap = mem_heap_create_in_buffer(256);
+
+ recv_sys->buf = ut_malloc(RECV_PARSING_BUF_SIZE);
+ recv_sys->len = 0;
+ recv_sys->recovered_offset = 0;
+
+ recv_sys->addr_hash = hash_create(buf_pool_get_curr_size() / 64);
+ recv_sys->n_addrs = 0;
+
+ recv_sys->apply_log_recs = FALSE;
+ recv_sys->apply_batch_on = FALSE;
+
+ recv_sys->last_block_buf_start = mem_alloc(2 * OS_FILE_LOG_BLOCK_SIZE);
+
+ recv_sys->last_block = ut_align(recv_sys->last_block_buf_start,
+ OS_FILE_LOG_BLOCK_SIZE);
+ mutex_exit(&(recv_sys->mutex));
+}
+
+/************************************************************
+Empties the hash table when it has been fully processed. */
+static
+void
+recv_sys_empty_hash(void)
+/*=====================*/
+{
+ ut_ad(mutex_own(&(recv_sys->mutex)));
+ ut_a(recv_sys->n_addrs == 0);
+
+ hash_table_free(recv_sys->addr_hash);
+ mem_heap_empty(recv_sys->heap);
+
+ recv_sys->addr_hash = hash_create(buf_pool_get_curr_size() / 256);
+}
+
+/************************************************************
+Frees the recovery system. */
+
+void
+recv_sys_free(void)
+/*===============*/
+{
+ mutex_enter(&(recv_sys->mutex));
+
+ hash_table_free(recv_sys->addr_hash);
+ mem_heap_free(recv_sys->heap);
+ ut_free(recv_sys->buf);
+ mem_free(recv_sys->last_block_buf_start);
+
+ recv_sys->addr_hash = NULL;
+ recv_sys->heap = NULL;
+
+ mutex_exit(&(recv_sys->mutex));
+}
+
+/************************************************************
+Truncates possibly corrupted or extra records from a log group. */
+static
+void
+recv_truncate_group(
+/*================*/
+ log_group_t* group, /* in: log group */
+ dulint recovered_lsn, /* in: recovery succeeded up to this
+ lsn */
+ dulint limit_lsn, /* in: this was the limit for
+ recovery */
+ dulint checkpoint_lsn, /* in: recovery was started from this
+ checkpoint */
+ dulint archived_lsn) /* in: the log has been archived up to
+ this lsn */
+{
+ dulint start_lsn;
+ dulint end_lsn;
+ dulint finish_lsn1;
+ dulint finish_lsn2;
+ dulint finish_lsn;
+ ulint len;
+ ulint i;
+
+ if (ut_dulint_cmp(archived_lsn, ut_dulint_max) == 0) {
+ /* Checkpoint was taken in the NOARCHIVELOG mode */
+ archived_lsn = checkpoint_lsn;
+ }
+
+ finish_lsn1 = ut_dulint_add(ut_dulint_align_down(archived_lsn,
+ OS_FILE_LOG_BLOCK_SIZE),
+ log_group_get_capacity(group));
+
+ finish_lsn2 = ut_dulint_add(ut_dulint_align_up(recovered_lsn,
+ OS_FILE_LOG_BLOCK_SIZE),
+ recv_sys->last_log_buf_size);
+
+ if (ut_dulint_cmp(limit_lsn, ut_dulint_max) != 0) {
+ /* We do not know how far we should erase log records: erase
+ as much as possible */
+
+ finish_lsn = finish_lsn1;
+ } else {
+ /* It is enough to erase the length of the log buffer */
+ finish_lsn = ut_dulint_get_min(finish_lsn1, finish_lsn2);
+ }
+
+ ut_a(RECV_SCAN_SIZE <= log_sys->buf_size);
+
+	/* Fill the log buffer with zeros */
+ for (i = 0; i < RECV_SCAN_SIZE; i++) {
+
+ *(log_sys->buf + i) = '\0';
+ }
+
+ start_lsn = ut_dulint_align_down(recovered_lsn,
+ OS_FILE_LOG_BLOCK_SIZE);
+
+ if (ut_dulint_cmp(start_lsn, recovered_lsn) != 0) {
+ /* Copy the last incomplete log block to the log buffer and
+ edit its data length: */
+
+ ut_memcpy(log_sys->buf, recv_sys->last_block,
+ OS_FILE_LOG_BLOCK_SIZE);
+ log_block_set_data_len(log_sys->buf,
+ ut_dulint_minus(recovered_lsn, start_lsn));
+ }
+
+ if (ut_dulint_cmp(start_lsn, finish_lsn) >= 0) {
+
+ return;
+ }
+
+ for (;;) {
+ end_lsn = ut_dulint_add(start_lsn, RECV_SCAN_SIZE);
+
+ if (ut_dulint_cmp(end_lsn, finish_lsn) > 0) {
+
+ end_lsn = finish_lsn;
+ }
+
+ len = ut_dulint_minus(end_lsn, start_lsn);
+
+ log_group_write_buf(LOG_RECOVER, group, log_sys->buf, len,
+ start_lsn, 0);
+ if (ut_dulint_cmp(end_lsn, finish_lsn) >= 0) {
+
+ return;
+ }
+
+		/* Fill the log buffer with zeros */
+ for (i = 0; i < RECV_SCAN_SIZE; i++) {
+
+ *(log_sys->buf + i) = '\0';
+ }
+
+ start_lsn = end_lsn;
+ }
+}
+
+/************************************************************
+Copies the log segment between group->scanned_lsn and recovered_lsn from the
+most up-to-date log group to group, so that it contains the latest log data. */
+static
+void
+recv_copy_group(
+/*============*/
+ log_group_t* up_to_date_group, /* in: the most up-to-date log
+ group */
+ log_group_t* group, /* in: copy to this log group */
+ dulint recovered_lsn) /* in: recovery succeeded up
+ to this lsn */
+{
+ dulint start_lsn;
+ dulint end_lsn;
+ ulint len;
+
+ if (ut_dulint_cmp(group->scanned_lsn, recovered_lsn) >= 0) {
+
+ return;
+ }
+
+ ut_a(RECV_SCAN_SIZE <= log_sys->buf_size);
+
+ start_lsn = ut_dulint_align_down(group->scanned_lsn,
+ OS_FILE_LOG_BLOCK_SIZE);
+ for (;;) {
+ end_lsn = ut_dulint_add(start_lsn, RECV_SCAN_SIZE);
+
+ if (ut_dulint_cmp(end_lsn, recovered_lsn) > 0) {
+ end_lsn = ut_dulint_align_up(recovered_lsn,
+ OS_FILE_LOG_BLOCK_SIZE);
+ }
+
+ log_group_read_log_seg(LOG_RECOVER, log_sys->buf,
+ up_to_date_group, start_lsn, end_lsn);
+
+ len = ut_dulint_minus(end_lsn, start_lsn);
+
+ log_group_write_buf(LOG_RECOVER, group, log_sys->buf, len,
+ start_lsn, 0);
+
+ if (ut_dulint_cmp(end_lsn, recovered_lsn) >= 0) {
+
+ return;
+ }
+
+ start_lsn = end_lsn;
+ }
+}
+
+/************************************************************
+Copies a log segment from the most up-to-date log group to the other log
+groups, so that they all contain the latest log data. Also writes the info
+about the latest checkpoint to the groups, and inits the fields in the group
+memory structs to up-to-date values. */
+
+void
+recv_synchronize_groups(
+/*====================*/
+ log_group_t* up_to_date_group) /* in: the most up-to-date
+ log group */
+{
+ log_group_t* group;
+ dulint start_lsn;
+ dulint end_lsn;
+ dulint recovered_lsn;
+ dulint limit_lsn;
+
+ recovered_lsn = recv_sys->recovered_lsn;
+ limit_lsn = recv_sys->limit_lsn;
+
+ /* Read the last recovered log block to the recovery system buffer:
+ the block is always incomplete */
+
+ start_lsn = ut_dulint_align_down(recovered_lsn, OS_FILE_LOG_BLOCK_SIZE);
+ end_lsn = ut_dulint_align_up(recovered_lsn, OS_FILE_LOG_BLOCK_SIZE);
+
+ ut_ad(ut_dulint_cmp(start_lsn, end_lsn) != 0);
+
+ log_group_read_log_seg(LOG_RECOVER, recv_sys->last_block,
+ up_to_date_group, start_lsn, end_lsn);
+
+ group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+ while (group) {
+ if (group != up_to_date_group) {
+
+ /* Copy log data if needed */
+
+			recv_copy_group(up_to_date_group, group,
+								recovered_lsn);
+ }
+
+ /* Update the fields in the group struct to correspond to
+ recovered_lsn */
+
+ log_group_set_fields(group, recovered_lsn);
+
+ group = UT_LIST_GET_NEXT(log_groups, group);
+ }
+
+ /* Copy the checkpoint info to the groups; remember that we have
+ incremented checkpoint_no by one, and the info will not be written
+ over the max checkpoint info, thus making the preservation of max
+ checkpoint info on disk certain */
+
+ log_groups_write_checkpoint_info();
+
+ mutex_exit(&(log_sys->mutex));
+
+ /* Wait for the checkpoint write to complete */
+ rw_lock_s_lock(&(log_sys->checkpoint_lock));
+ rw_lock_s_unlock(&(log_sys->checkpoint_lock));
+
+ mutex_enter(&(log_sys->mutex));
+}
+
+/************************************************************
+Looks for the maximum consistent checkpoint from the log groups. */
+static
+ulint
+recv_find_max_checkpoint(
+/*=====================*/
+ /* out: error code or DB_SUCCESS */
+ log_group_t** max_group, /* out: max group */
+ ulint* max_field) /* out: LOG_CHECKPOINT_1 or
+ LOG_CHECKPOINT_2 */
+{
+ log_group_t* group;
+ dulint max_no;
+ dulint checkpoint_no;
+ ulint field;
+ ulint fold;
+ byte* buf;
+
+ group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+ max_no = ut_dulint_zero;
+ *max_group = NULL;
+
+ buf = log_sys->checkpoint_buf;
+
+ while (group) {
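+		/* Assume the group is corrupted until a consistent
+		checkpoint is found for it below */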
+ group->state = LOG_GROUP_CORRUPTED;
+
+ for (field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
+ field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) {
+
+ log_group_read_checkpoint_info(group, field);
+
+ /* Check the consistency of the checkpoint info */
+ fold = ut_fold_binary(buf, LOG_CHECKPOINT_CHECKSUM_1);
+
+ if (fold != mach_read_from_4(buf
+ + LOG_CHECKPOINT_CHECKSUM_1)) {
+ if (log_debug_writes) {
+ fprintf(stderr,
+ "Innobase: Checkpoint in group %lu at %lu invalid\n",
+ group->id, field);
+ }
+
+ goto not_consistent;
+ }
+
+ fold = ut_fold_binary(buf + LOG_CHECKPOINT_LSN,
+ LOG_CHECKPOINT_CHECKSUM_2
+ - LOG_CHECKPOINT_LSN);
+ if (fold != mach_read_from_4(buf
+ + LOG_CHECKPOINT_CHECKSUM_2)) {
+ if (log_debug_writes) {
+ fprintf(stderr,
+ "Innobase: Checkpoint in group %lu at %lu invalid\n",
+ group->id, field);
+ }
+ goto not_consistent;
+ }
+
+ group->state = LOG_GROUP_OK;
+
+ group->lsn = mach_read_from_8(buf
+ + LOG_CHECKPOINT_LSN);
+ group->lsn_offset = mach_read_from_4(buf
+ + LOG_CHECKPOINT_OFFSET);
+ checkpoint_no =
+ mach_read_from_8(buf + LOG_CHECKPOINT_NO);
+
+ if (log_debug_writes) {
+ fprintf(stderr,
+ "Innobase: Checkpoint number %lu found in group %lu\n",
+ ut_dulint_get_low(checkpoint_no), group->id);
+ }
+
+ if (ut_dulint_cmp(checkpoint_no, max_no) >= 0) {
+ *max_group = group;
+ *max_field = field;
+ max_no = checkpoint_no;
+ }
+
+ not_consistent:
+ ;
+ }
+
+ group = UT_LIST_GET_NEXT(log_groups, group);
+ }
+
+ if (*max_group == NULL) {
+
+ if (log_debug_writes) {
+ fprintf(stderr,
+ "Innobase: No valid checkpoint found\n");
+ }
+
+ return(DB_ERROR);
+ }
+
+ return(DB_SUCCESS);
+}
+
+/***********************************************************************
+Tries to parse a single log record body and also applies it to a page if
+specified. */
+static
+byte*
+recv_parse_or_apply_log_rec_body(
+/*=============================*/
+ /* out: log record end, NULL if not a complete
+ record */
+ byte type, /* in: type */
+ byte* ptr, /* in: pointer to a buffer */
+ byte* end_ptr,/* in: pointer to the buffer end */
+ page_t* page, /* in: buffer page or NULL; if not NULL, then the log
+ record is applied to the page, and the log record
+ should be complete then */
+ mtr_t* mtr) /* in: mtr or NULL; should be non-NULL if and only if
+ page is non-NULL */
+{
+ byte* new_ptr;
+
+ if (type <= MLOG_8BYTES) {
+ new_ptr = mlog_parse_nbytes(type, ptr, end_ptr, page);
+
+ } else if (type == MLOG_REC_INSERT) {
+ new_ptr = page_cur_parse_insert_rec(FALSE, ptr, end_ptr, page,
+ mtr);
+ } else if (type == MLOG_REC_CLUST_DELETE_MARK) {
+ new_ptr = btr_cur_parse_del_mark_set_clust_rec(ptr, end_ptr,
+ page);
+ } else if (type == MLOG_REC_SEC_DELETE_MARK) {
+ new_ptr = btr_cur_parse_del_mark_set_sec_rec(ptr, end_ptr,
+ page);
+ } else if (type == MLOG_REC_UPDATE_IN_PLACE) {
+ new_ptr = btr_cur_parse_update_in_place(ptr, end_ptr, page);
+
+ } else if ((type == MLOG_LIST_END_DELETE)
+ || (type == MLOG_LIST_START_DELETE)) {
+ new_ptr = page_parse_delete_rec_list(type, ptr, end_ptr, page,
+ mtr);
+ } else if (type == MLOG_LIST_END_COPY_CREATED) {
+ new_ptr = page_parse_copy_rec_list_to_created_page(ptr,
+ end_ptr, page, mtr);
+ } else if (type == MLOG_PAGE_REORGANIZE) {
+ new_ptr = btr_parse_page_reorganize(ptr, end_ptr, page, mtr);
+
+ } else if (type == MLOG_PAGE_CREATE) {
+ new_ptr = page_parse_create(ptr, end_ptr, page, mtr);
+
+ } else if (type == MLOG_UNDO_INSERT) {
+ new_ptr = trx_undo_parse_add_undo_rec(ptr, end_ptr, page);
+
+ } else if (type == MLOG_UNDO_ERASE_END) {
+ new_ptr = trx_undo_parse_erase_page_end(ptr, end_ptr, page,
+ mtr);
+ } else if (type == MLOG_UNDO_INIT) {
+ new_ptr = trx_undo_parse_page_init(ptr, end_ptr, page, mtr);
+
+ } else if (type == MLOG_UNDO_HDR_DISCARD) {
+ new_ptr = trx_undo_parse_discard_latest(ptr, end_ptr, page,
+ mtr);
+ } else if ((type == MLOG_UNDO_HDR_CREATE)
+ || (type == MLOG_UNDO_HDR_REUSE)) {
+ new_ptr = trx_undo_parse_page_header(type, ptr, end_ptr, page,
+ mtr);
+ } else if (type == MLOG_REC_MIN_MARK) {
+ new_ptr = btr_parse_set_min_rec_mark(ptr, end_ptr, page, mtr);
+
+ } else if (type == MLOG_REC_DELETE) {
+ new_ptr = page_cur_parse_delete_rec(ptr, end_ptr, page, mtr);
+
+ } else if (type == MLOG_IBUF_BITMAP_INIT) {
+ new_ptr = ibuf_parse_bitmap_init(ptr, end_ptr, page, mtr);
+
+ } else if (type == MLOG_FULL_PAGE) {
+ new_ptr = mtr_log_parse_full_page(ptr, end_ptr, page);
+
+ } else if (type == MLOG_INIT_FILE_PAGE) {
+ new_ptr = fsp_parse_init_file_page(ptr, end_ptr, page);
+
+ } else if (type <= MLOG_WRITE_STRING) {
+ new_ptr = mlog_parse_string(ptr, end_ptr, page);
+ } else {
+ ut_error;
+ }
+
+ ut_ad(!page || new_ptr);
+
+ return(new_ptr);
+}
+
+/*************************************************************************
+Calculates the fold value of a page file address: used in inserting or
+searching for a log record in the hash table. */
+UNIV_INLINE
+ulint
+recv_fold(
+/*======*/
+ /* out: folded value */
+ ulint space, /* in: space */
+ ulint page_no)/* in: page number */
+{
+ return(ut_fold_ulint_pair(space, page_no));
+}
+
+/*************************************************************************
+Calculates the hash value of a page file address: used in inserting or
+searching for a log record in the hash table. */
+UNIV_INLINE
+ulint
+recv_hash(
+/*======*/
+ /* out: folded value */
+ ulint space, /* in: space */
+ ulint page_no)/* in: page number */
+{
+ return(hash_calc_hash(recv_fold(space, page_no), recv_sys->addr_hash));
+}
+
+/*************************************************************************
+Gets the hashed file address struct for a page. */
+static
+recv_addr_t*
+recv_get_fil_addr_struct(
+/*=====================*/
+ /* out: file address struct, NULL if not found from
+ the hash table */
+ ulint space, /* in: space id */
+ ulint page_no)/* in: page number */
+{
+ recv_addr_t* recv_addr;
+
+ recv_addr = HASH_GET_FIRST(recv_sys->addr_hash,
+ recv_hash(space, page_no));
+
+ while (recv_addr) {
+ if ((recv_addr->space == space)
+ && (recv_addr->page_no == page_no)) {
+
+ break;
+ }
+
+ recv_addr = HASH_GET_NEXT(addr_hash, recv_addr);
+ }
+
+ return(recv_addr);
+}
+
+/***********************************************************************
+Adds a new log record to the hash table of log records. */
+static
+void
+recv_add_to_hash_table(
+/*===================*/
+ byte type, /* in: log record type */
+ ulint space, /* in: space id */
+ ulint page_no, /* in: page number */
+ byte* body, /* in: log record body */
+ byte* rec_end, /* in: log record end */
+ dulint start_lsn, /* in: start lsn of the mtr */
+ dulint end_lsn) /* in: end lsn of the mtr */
+{
+ recv_t* recv;
+ ulint len;
+ recv_data_t* recv_data;
+ recv_data_t** prev_field;
+ recv_addr_t* recv_addr;
+
+ ut_a(space == 0); /* For debugging; TODO: remove this */
+
+ recv = mem_heap_alloc(recv_sys->heap, sizeof(recv_t));
+ recv->type = type;
+ recv->len = rec_end - body;
+ recv->start_lsn = start_lsn;
+ recv->end_lsn = end_lsn;
+
+ recv_addr = recv_get_fil_addr_struct(space, page_no);
+
+ if (recv_addr == NULL) {
+ recv_addr = mem_heap_alloc(recv_sys->heap,
+ sizeof(recv_addr_t));
+ recv_addr->space = space;
+ recv_addr->page_no = page_no;
+ recv_addr->state = RECV_NOT_PROCESSED;
+
+ UT_LIST_INIT(recv_addr->rec_list);
+
+ HASH_INSERT(recv_addr_t, addr_hash, recv_sys->addr_hash,
+ recv_fold(space, page_no), recv_addr);
+ recv_sys->n_addrs++;
+ }
+
+ UT_LIST_ADD_LAST(rec_list, recv_addr->rec_list, recv);
+
+ prev_field = &(recv->data);
+
+ /* Store the log record body in chunks of less than UNIV_PAGE_SIZE:
+ recv_sys->heap grows into the buffer pool, and bigger chunks could not
+ be allocated */
+
+ while (rec_end > body) {
+
+ len = rec_end - body;
+
+ if (len > RECV_DATA_BLOCK_SIZE) {
+ len = RECV_DATA_BLOCK_SIZE;
+ }
+
+ recv_data = mem_heap_alloc(recv_sys->heap,
+ sizeof(recv_data_t) + len);
+ *prev_field = recv_data;
+
+ ut_memcpy(((byte*)recv_data) + sizeof(recv_data_t), body, len);
+
+ prev_field = &(recv_data->next);
+
+ body += len;
+ }
+
+ *prev_field = NULL;
+}
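+
+/* Illustration (editorial note): a record body of, say,
+2 * RECV_DATA_BLOCK_SIZE + 100 bytes is stored above as a chain of three
+recv_data_t nodes: two full chunks and a 100-byte tail. The data bytes of
+each node live directly after the struct in the same heap allocation, and
+recv_data_copy_to_buf() below reassembles the chain into a flat buffer. */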
+
+/*************************************************************************
+Copies the log record body from recv to buf. */
+static
+void
+recv_data_copy_to_buf(
+/*==================*/
+ byte* buf, /* in: buffer of length at least recv->len */
+ recv_t* recv) /* in: log record */
+{
+ recv_data_t* recv_data;
+ ulint part_len;
+ ulint len;
+
+ len = recv->len;
+ recv_data = recv->data;
+
+ while (len > 0) {
+ if (len > RECV_DATA_BLOCK_SIZE) {
+ part_len = RECV_DATA_BLOCK_SIZE;
+ } else {
+ part_len = len;
+ }
+
+ ut_memcpy(buf, ((byte*)recv_data) + sizeof(recv_data_t),
+ part_len);
+ buf += part_len;
+ len -= part_len;
+
+ recv_data = recv_data->next;
+ }
+}
+
+/****************************************************************************
+Applies the hashed log records to the page, if the page lsn is less than the
+lsn of a log record. This can be called when a buffer page has just been
+read in, or also for a page already in the buffer pool. */
+
+void
+recv_recover_page(
+/*==============*/
+ ibool just_read_in, /* in: TRUE if the i/o-handler calls this for
+ a freshly read page */
+ page_t* page, /* in: buffer page */
+ ulint space, /* in: space id */
+ ulint page_no) /* in: page number */
+{
+ buf_block_t* block;
+ recv_addr_t* recv_addr;
+ recv_t* recv;
+ byte* buf;
+ dulint start_lsn;
+ dulint end_lsn;
+ dulint page_lsn;
+ dulint page_newest_lsn;
+ ibool modification_to_page;
+ ibool success;
+ mtr_t mtr;
+
+ mutex_enter(&(recv_sys->mutex));
+
+ if (recv_sys->apply_log_recs == FALSE) {
+
+ /* Log records should not be applied now */
+
+ mutex_exit(&(recv_sys->mutex));
+
+ return;
+ }
+
+ recv_addr = recv_get_fil_addr_struct(space, page_no);
+
+ if ((recv_addr == NULL)
+ || (recv_addr->state == RECV_BEING_PROCESSED)
+ || (recv_addr->state == RECV_PROCESSED)) {
+
+ mutex_exit(&(recv_sys->mutex));
+
+ return;
+ }
+
+ recv_addr->state = RECV_BEING_PROCESSED;
+
+ mutex_exit(&(recv_sys->mutex));
+
+ block = buf_block_align(page);
+
+ if (just_read_in) {
+ /* Move the ownership of the x-latch on the page to this OS
+ thread, so that we can acquire a second x-latch on it. This
+ is needed for the operations to the page to pass the debug
+ checks. */
+
+ rw_lock_x_lock_move_ownership(&(block->lock));
+ }
+
+ mtr_start(&mtr);
+
+ mtr_set_log_mode(&mtr, MTR_LOG_NONE);
+
+ success = buf_page_get_known_nowait(RW_X_LATCH, page, BUF_KEEP_OLD,
+#ifdef UNIV_SYNC_DEBUG
+ __FILE__, __LINE__,
+#endif
+ &mtr);
+ ut_a(success);
+
+ buf_page_dbg_add_level(page, SYNC_NO_ORDER_CHECK);
+
+ /* Read the newest modification lsn from the page */
+ page_lsn = mach_read_from_8(page + FIL_PAGE_LSN);
+
+ /* It may be that the page has been modified in the buffer pool: read
+ the newest modification lsn there */
+
+ page_newest_lsn = buf_frame_get_newest_modification(page);
+
+ if (!ut_dulint_is_zero(page_newest_lsn)) {
+
+ page_lsn = page_newest_lsn;
+ }
+
+ modification_to_page = FALSE;
+
+ recv = UT_LIST_GET_FIRST(recv_addr->rec_list);
+
+ while (recv) {
+ end_lsn = recv->end_lsn;
+
+ if (recv->len > RECV_DATA_BLOCK_SIZE) {
+ /* We have to copy the record body to a separate
+ buffer */
+
+ buf = mem_alloc(recv->len);
+
+ recv_data_copy_to_buf(buf, recv);
+ } else {
+ buf = ((byte*)(recv->data)) + sizeof(recv_data_t);
+ }
+
+ if ((recv->type == MLOG_INIT_FILE_PAGE)
+ || (recv->type == MLOG_FULL_PAGE)) {
+			/* A new file page may have been taken into use,
+			or we have stored the full contents of the page:
+			in this case it may be that the original log record
+			type was MLOG_INIT_FILE_PAGE, and we replaced it
+			with MLOG_FULL_PAGE, thus we have to apply
+			any record of type MLOG_FULL_PAGE */
+
+ page_lsn = page_newest_lsn;
+
+ mach_write_to_8(page + UNIV_PAGE_SIZE
+ - FIL_PAGE_END_LSN, ut_dulint_zero);
+ mach_write_to_8(page + FIL_PAGE_LSN, ut_dulint_zero);
+ }
+
+ if (ut_dulint_cmp(recv->start_lsn, page_lsn) >= 0) {
+
+ if (!modification_to_page) {
+
+ modification_to_page = TRUE;
+ start_lsn = recv->start_lsn;
+ }
+
+ if (log_debug_writes) {
+ fprintf(stderr,
+ "Innobase: Applying log rec type %lu len %lu to space %lu page no %lu\n",
+ recv->type, recv->len, recv_addr->space,
+ recv_addr->page_no);
+ }
+
+ recv_parse_or_apply_log_rec_body(recv->type, buf,
+ buf + recv->len, page, &mtr);
+ }
+
+ if (recv->len > RECV_DATA_BLOCK_SIZE) {
+ mem_free(buf);
+ }
+
+ recv = UT_LIST_GET_NEXT(rec_list, recv);
+ }
+
+ /* If the following assert fails, the file page is incompletely
+ written, and a recovery from a backup is required */
+
+ ut_a(0 == ut_dulint_cmp(mach_read_from_8(page + FIL_PAGE_LSN),
+ mach_read_from_8(page + UNIV_PAGE_SIZE
+ - FIL_PAGE_END_LSN)));
+ mutex_enter(&(recv_sys->mutex));
+
+ recv_addr->state = RECV_PROCESSED;
+
+ ut_a(recv_sys->n_addrs);
+ recv_sys->n_addrs--;
+
+ mutex_exit(&(recv_sys->mutex));
+
+ if (modification_to_page) {
+ buf_flush_recv_note_modification(block, start_lsn, end_lsn);
+ }
+
+ /* Make sure that committing mtr does not change the modification
+ lsn values of page */
+
+ mtr.modifications = FALSE;
+
+ mtr_commit(&mtr);
+}
+
+/***********************************************************************
+Reads in pages which have hashed log records, from an area around a given
+page number. */
+static
+ulint
+recv_read_in_area(
+/*==============*/
+ /* out: number of pages found */
+ ulint space, /* in: space */
+ ulint page_no)/* in: page number */
+{
+ recv_addr_t* recv_addr;
+ ulint page_nos[RECV_READ_AHEAD_AREA];
+ ulint low_limit;
+ ulint n;
+
+ low_limit = page_no - (page_no % RECV_READ_AHEAD_AREA);
+
+ n = 0;
+
+ for (page_no = low_limit; page_no < low_limit + RECV_READ_AHEAD_AREA;
+ page_no++) {
+ recv_addr = recv_get_fil_addr_struct(space, page_no);
+
+ if (recv_addr && !buf_page_peek(space, page_no)) {
+
+ mutex_enter(&(recv_sys->mutex));
+
+ if (recv_addr->state == RECV_NOT_PROCESSED) {
+ recv_addr->state = RECV_BEING_READ;
+
+ page_nos[n] = page_no;
+
+ n++;
+ }
+
+ mutex_exit(&(recv_sys->mutex));
+ }
+ }
+
+ buf_read_recv_pages(FALSE, space, page_nos, n);
+
+ /* printf("Recv pages at %lu n %lu\n", page_nos[0], n); */
+
+ return(n);
+}
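+
+/* Example (editorial illustration): with RECV_READ_AHEAD_AREA = 32, a call
+for page_no = 37 sets low_limit = 32 and considers pages [32, 64): every
+page in that window which has hashed log records but is not yet in the
+buffer pool is queued into page_nos[] and read in a single batch. */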
+
+/***********************************************************************
+Empties the hash table of stored log records, applying them to appropriate
+pages. */
+
+void
+recv_apply_hashed_log_recs(
+/*=======================*/
+ ibool allow_ibuf) /* in: if TRUE, also ibuf operations are
+ allowed during the application; if FALSE,
+ no ibuf operations are allowed, and after
+ the application all file pages are flushed to
+ disk and invalidated in buffer pool: this
+ alternative means that no new log records
+ can be generated during the application;
+ the caller must in this case own the log
+ mutex */
+{
+ recv_addr_t* recv_addr;
+ page_t* page;
+ ulint i;
+ ulint space;
+ ulint page_no;
+ ulint n_pages;
+ ibool has_printed = FALSE;
+ mtr_t mtr;
+loop:
+ mutex_enter(&(recv_sys->mutex));
+
+ if (recv_sys->apply_batch_on) {
+
+ mutex_exit(&(recv_sys->mutex));
+
+ os_thread_sleep(500000);
+
+ goto loop;
+ }
+
+ if (!allow_ibuf) {
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ recv_no_ibuf_operations = TRUE;
+ } else {
+ ut_ad(!mutex_own(&(log_sys->mutex)));
+ }
+
+ recv_sys->apply_log_recs = TRUE;
+ recv_sys->apply_batch_on = TRUE;
+
+ for (i = 0; i < hash_get_n_cells(recv_sys->addr_hash); i++) {
+
+ recv_addr = HASH_GET_FIRST(recv_sys->addr_hash, i);
+
+ while (recv_addr) {
+ space = recv_addr->space;
+ page_no = recv_addr->page_no;
+
+ if (recv_addr->state == RECV_NOT_PROCESSED) {
+ if (!has_printed) {
+ fprintf(stderr,
+"Innobase: Starting an apply batch of log records to the database...\n");
+ has_printed = TRUE;
+ }
+
+ mutex_exit(&(recv_sys->mutex));
+
+ if (buf_page_peek(space, page_no)) {
+
+ mtr_start(&mtr);
+
+ page = buf_page_get(space, page_no,
+ RW_X_LATCH, &mtr);
+
+ buf_page_dbg_add_level(page,
+ SYNC_NO_ORDER_CHECK);
+ recv_recover_page(FALSE, page, space,
+ page_no);
+ mtr_commit(&mtr);
+ } else {
+ recv_read_in_area(space, page_no);
+ }
+
+ mutex_enter(&(recv_sys->mutex));
+ }
+
+ recv_addr = HASH_GET_NEXT(addr_hash, recv_addr);
+ }
+ }
+
+ /* Wait until all the pages have been processed */
+
+ while (recv_sys->n_addrs != 0) {
+
+ mutex_exit(&(recv_sys->mutex));
+
+ os_thread_sleep(500000);
+
+ mutex_enter(&(recv_sys->mutex));
+ }
+
+ if (!allow_ibuf) {
+ /* Flush all the file pages to disk and invalidate them in
+ the buffer pool */
+
+ mutex_exit(&(recv_sys->mutex));
+ mutex_exit(&(log_sys->mutex));
+
+ n_pages = buf_flush_batch(BUF_FLUSH_LIST, ULINT_MAX,
+ ut_dulint_max);
+ ut_a(n_pages != ULINT_UNDEFINED);
+
+ buf_flush_wait_batch_end(BUF_FLUSH_LIST);
+
+ buf_pool_invalidate();
+
+ mutex_enter(&(log_sys->mutex));
+ mutex_enter(&(recv_sys->mutex));
+
+ recv_no_ibuf_operations = FALSE;
+ }
+
+ recv_sys->apply_log_recs = FALSE;
+ recv_sys->apply_batch_on = FALSE;
+
+ recv_sys_empty_hash();
+
+ if (has_printed) {
+ fprintf(stderr, "Innobase: Apply batch completed\n");
+ }
+
+ mutex_exit(&(recv_sys->mutex));
+}
+
+/***********************************************************************
+In the debug version, updates the replica of a file page, based on a log
+record. */
+static
+void
+recv_update_replicate(
+/*==================*/
+ byte type, /* in: log record type */
+ ulint space, /* in: space id */
+ ulint page_no,/* in: page number */
+ byte* body, /* in: log record body */
+ byte* end_ptr)/* in: log record end */
+{
+ page_t* replica;
+ mtr_t mtr;
+ byte* ptr;
+
+ mtr_start(&mtr);
+
+ mtr_set_log_mode(&mtr, MTR_LOG_NONE);
+
+ replica = buf_page_get(space + RECV_REPLICA_SPACE_ADD, page_no,
+ RW_X_LATCH, &mtr);
+ buf_page_dbg_add_level(replica, SYNC_NO_ORDER_CHECK);
+
+ ptr = recv_parse_or_apply_log_rec_body(type, body, end_ptr, replica,
+ &mtr);
+ ut_a(ptr == end_ptr);
+
+ /* Notify the buffer manager that the page has been updated */
+
+ buf_flush_recv_note_modification(buf_block_align(replica),
+ log_sys->old_lsn, log_sys->old_lsn);
+
+ /* Make sure that committing mtr does not call log routines, as
+ we currently own the log mutex */
+
+ mtr.modifications = FALSE;
+
+ mtr_commit(&mtr);
+}
+
+/***********************************************************************
+Checks that two strings are identical. */
+static
+void
+recv_check_identical(
+/*=================*/
+ byte* str1, /* in: first string */
+ byte* str2, /* in: second string */
+ ulint len) /* in: length of strings */
+{
+ ulint i;
+
+ for (i = 0; i < len; i++) {
+
+ if (str1[i] != str2[i]) {
+ fprintf(stderr, "Strings do not match at offset %lu\n", i);
+
+ ut_print_buf(str1 + i, 16);
+ fprintf(stderr, "\n");
+ ut_print_buf(str2 + i, 16);
+
+ ut_error;
+ }
+ }
+}
+
+/***********************************************************************
+In the debug version, checks that the replica of a file page is identical
+to the original page. */
+static
+void
+recv_compare_replicate(
+/*===================*/
+ ulint space, /* in: space id */
+ ulint page_no)/* in: page number */
+{
+ page_t* replica;
+ page_t* page;
+ mtr_t mtr;
+
+ mtr_start(&mtr);
+
+ mutex_enter(&(buf_pool->mutex));
+
+ page = buf_page_hash_get(space, page_no)->frame;
+
+ mutex_exit(&(buf_pool->mutex));
+
+ replica = buf_page_get(space + RECV_REPLICA_SPACE_ADD, page_no,
+ RW_X_LATCH, &mtr);
+ buf_page_dbg_add_level(replica, SYNC_NO_ORDER_CHECK);
+
+ recv_check_identical(page + FIL_PAGE_DATA,
+ replica + FIL_PAGE_DATA,
+ PAGE_HEADER + PAGE_MAX_TRX_ID - FIL_PAGE_DATA);
+
+ recv_check_identical(page + PAGE_HEADER + PAGE_MAX_TRX_ID + 8,
+ replica + PAGE_HEADER + PAGE_MAX_TRX_ID + 8,
+ UNIV_PAGE_SIZE - FIL_PAGE_DATA_END
+ - PAGE_HEADER - PAGE_MAX_TRX_ID - 8);
+ mtr_commit(&mtr);
+}
+
+/***********************************************************************
+Checks that a replica of a space is identical to the original space. */
+
+void
+recv_compare_spaces(
+/*================*/
+ ulint space1, /* in: space id */
+ ulint space2, /* in: space id */
+ ulint n_pages)/* in: number of pages */
+{
+ page_t* replica;
+ page_t* page;
+ mtr_t mtr;
+ page_t* frame;
+ ulint page_no;
+
+ replica = buf_frame_alloc();
+ page = buf_frame_alloc();
+
+ for (page_no = 0; page_no < n_pages; page_no++) {
+
+ mtr_start(&mtr);
+
+ frame = buf_page_get_gen(space1, page_no, RW_S_LATCH, NULL,
+ BUF_GET_IF_IN_POOL,
+#ifdef UNIV_SYNC_DEBUG
+ __FILE__, __LINE__,
+#endif
+ &mtr);
+ if (frame) {
+ buf_page_dbg_add_level(frame, SYNC_NO_ORDER_CHECK);
+ ut_memcpy(page, frame, UNIV_PAGE_SIZE);
+ } else {
+ /* Read it from file */
+ fil_io(OS_FILE_READ, TRUE, space1, page_no, 0,
+ UNIV_PAGE_SIZE, page, NULL);
+ }
+
+ frame = buf_page_get_gen(space2, page_no, RW_S_LATCH, NULL,
+ BUF_GET_IF_IN_POOL,
+#ifdef UNIV_SYNC_DEBUG
+ __FILE__, __LINE__,
+#endif
+ &mtr);
+ if (frame) {
+ buf_page_dbg_add_level(frame, SYNC_NO_ORDER_CHECK);
+ ut_memcpy(replica, frame, UNIV_PAGE_SIZE);
+ } else {
+ /* Read it from file */
+ fil_io(OS_FILE_READ, TRUE, space2, page_no, 0,
+ UNIV_PAGE_SIZE, replica, NULL);
+ }
+
+ recv_check_identical(page + FIL_PAGE_DATA,
+ replica + FIL_PAGE_DATA,
+ PAGE_HEADER + PAGE_MAX_TRX_ID - FIL_PAGE_DATA);
+
+ recv_check_identical(page + PAGE_HEADER + PAGE_MAX_TRX_ID + 8,
+ replica + PAGE_HEADER + PAGE_MAX_TRX_ID + 8,
+ UNIV_PAGE_SIZE - FIL_PAGE_DATA_END
+ - PAGE_HEADER - PAGE_MAX_TRX_ID - 8);
+
+ mtr_commit(&mtr);
+ }
+
+ buf_frame_free(replica);
+ buf_frame_free(page);
+}
+
+/***********************************************************************
+Checks that a replica of a space is identical to the original space. Disables
+ibuf operations and flushes and invalidates the buffer pool pages after the
+test. This function can be used to check the recovery before dict or trx
+systems are initialized. */
+
+void
+recv_compare_spaces_low(
+/*====================*/
+ ulint space1, /* in: space id */
+ ulint space2, /* in: space id */
+ ulint n_pages)/* in: number of pages */
+{
+ mutex_enter(&(log_sys->mutex));
+
+ recv_apply_hashed_log_recs(FALSE);
+
+ mutex_exit(&(log_sys->mutex));
+
+ recv_compare_spaces(space1, space2, n_pages);
+}
+
+/***********************************************************************
+Tries to parse a single log record and returns its length. */
+static
+ulint
+recv_parse_log_rec(
+/*===============*/
+ /* out: length of the record, or 0 if the record was
+ not complete */
+ byte* ptr, /* in: pointer to a buffer */
+ byte* end_ptr,/* in: pointer to the buffer end */
+ byte* type, /* out: type */
+ ulint* space, /* out: space id */
+ ulint* page_no,/* out: page number */
+ byte** body) /* out: log record body start */
+{
+ byte* new_ptr;
+
+ if (ptr == end_ptr) {
+
+ return(0);
+ }
+
+ if (*ptr == MLOG_MULTI_REC_END) {
+
+ *type = *ptr;
+
+ return(1);
+ }
+
+ if (*ptr == MLOG_DUMMY_RECORD) {
+ *type = *ptr;
+
+ *space = 1000; /* For debugging */
+
+ return(1);
+ }
+
+ new_ptr = mlog_parse_initial_log_record(ptr, end_ptr, type, space,
+ page_no);
+ if (!new_ptr) {
+
+ return(0);
+ }
+
+ *body = new_ptr;
+
+ new_ptr = recv_parse_or_apply_log_rec_body(*type, new_ptr, end_ptr,
+ NULL, NULL);
+ if (new_ptr == NULL) {
+
+ return(0);
+ }
+
+ return(new_ptr - ptr);
+}
+
+/***********************************************************
+Calculates the new value for lsn when more data is added to the log. */
+static
+dulint
+recv_calc_lsn_on_data_add(
+/*======================*/
+ dulint lsn, /* in: old lsn */
+ ulint len) /* in: this many bytes of data is added, log block
+ headers not included */
+{
+ ulint frag_len;
+ ulint lsn_len;
+
+ frag_len = (ut_dulint_get_low(lsn) % OS_FILE_LOG_BLOCK_SIZE)
+ - LOG_BLOCK_HDR_SIZE;
+ ut_ad(frag_len < OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE
+ - LOG_BLOCK_TRL_SIZE);
+ lsn_len = len + ((len + frag_len)
+ / (OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE
+ - LOG_BLOCK_TRL_SIZE))
+ * (LOG_BLOCK_HDR_SIZE + LOG_BLOCK_TRL_SIZE);
+
+ return(ut_dulint_add(lsn, lsn_len));
+}
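+
+/* Worked example (editorial illustration, assuming the usual 512-byte log
+blocks with a 12-byte header and a 4-byte trailer, i.e. 496 data bytes per
+block): if the old lsn points 100 data bytes into a block (frag_len = 100)
+and len = 1000 bytes are added, the data crosses
+(1000 + 100) / 496 = 2 block boundaries (integer division), so the lsn
+advances by 1000 + 2 * (12 + 4) = 1032. */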
+
+/***********************************************************
+Checks that the parser recognizes incomplete initial segments of a log
+record as incomplete. */
+
+void
+recv_check_incomplete_log_recs(
+/*===========================*/
+ byte* ptr, /* in: pointer to a complete log record */
+ ulint len) /* in: length of the log record */
+{
+ ulint i;
+ byte type;
+ ulint space;
+ ulint page_no;
+ byte* body;
+
+ for (i = 0; i < len; i++) {
+ ut_a(0 == recv_parse_log_rec(ptr, ptr + i, &type, &space,
+ &page_no, &body));
+ }
+}
+
+/***********************************************************
+Parses log records from a buffer and stores them to a hash table to await
+merging to file pages. If the hash table becomes too full, it is applied
+automatically to file pages. */
+
+void
+recv_parse_log_recs(
+/*================*/
+ ibool store_to_hash) /* in: TRUE if the records should be stored
+ to the hash table; this is set to FALSE if just
+ debug checking is needed */
+{
+ byte* ptr;
+ byte* end_ptr;
+ ulint single_rec;
+ ulint len;
+ ulint total_len;
+ dulint new_recovered_lsn;
+ dulint old_lsn;
+ byte type;
+ ulint space;
+ ulint page_no;
+ byte* body;
+ ulint n_recs;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+ ut_ad(!ut_dulint_is_zero(recv_sys->parse_start_lsn));
+loop:
+ ptr = recv_sys->buf + recv_sys->recovered_offset;
+
+ end_ptr = recv_sys->buf + recv_sys->len;
+
+ if (ptr == end_ptr) {
+
+ return;
+ }
+
+ single_rec = (ulint)*ptr & MLOG_SINGLE_REC_FLAG;
+
+ if (single_rec || *ptr == MLOG_DUMMY_RECORD) {
+ /* The mtr only modified a single page */
+
+ old_lsn = recv_sys->recovered_lsn;
+
+ len = recv_parse_log_rec(ptr, end_ptr, &type, &space,
+ &page_no, &body);
+ if (len == 0) {
+
+ return;
+ }
+
+ new_recovered_lsn = recv_calc_lsn_on_data_add(old_lsn, len);
+
+ if (ut_dulint_cmp(new_recovered_lsn, recv_sys->scanned_lsn)
+ > 0) {
+			/* The log record filled a log block, and we require
+			that the next log block too has already been scanned
+			in */
+
+ return;
+ }
+
+ recv_sys->recovered_offset += len;
+ recv_sys->recovered_lsn = new_recovered_lsn;
+
+ if (log_debug_writes) {
+ fprintf(stderr,
+"Innobase: Parsed a single log rec type %lu len %lu space %lu page no %lu\n",
+ type, len, space, page_no);
+ }
+
+ if (type == MLOG_DUMMY_RECORD) {
+ /* Do nothing */
+
+ } else if (store_to_hash) {
+ recv_add_to_hash_table(type, space, page_no, body,
+ ptr + len, old_lsn,
+ recv_sys->recovered_lsn);
+ } else {
+ /* In debug checking, update a replicate page
+ according to the log record, and check that it
+ becomes identical with the original page */
+#ifdef UNIV_LOG_DEBUG
+ recv_check_incomplete_log_recs(ptr, len);
+#endif
+ recv_update_replicate(type, space, page_no, body,
+ ptr + len);
+ recv_compare_replicate(space, page_no);
+ }
+ } else {
+ /* Check that all the records associated with the single mtr
+ are included within the buffer */
+
+ total_len = 0;
+ n_recs = 0;
+
+ for (;;) {
+ len = recv_parse_log_rec(ptr, end_ptr, &type, &space,
+ &page_no, &body);
+ if (len == 0) {
+
+ return;
+ }
+
+ if ((!store_to_hash) && (type != MLOG_MULTI_REC_END)) {
+ /* In debug checking, update a replicate page
+ according to the log record */
+#ifdef UNIV_LOG_DEBUG
+ recv_check_incomplete_log_recs(ptr, len);
+#endif
+ recv_update_replicate(type, space, page_no,
+ body, ptr + len);
+ }
+
+ if (log_debug_writes) {
+ fprintf(stderr,
+"Innobase: Parsed a multi log rec type %lu len %lu space %lu page no %lu\n",
+ type, len, space, page_no);
+ }
+
+ total_len += len;
+ n_recs++;
+
+ ptr += len;
+
+ if (type == MLOG_MULTI_REC_END) {
+
+ /* Found the end mark for the records */
+
+ break;
+ }
+ }
+
+ new_recovered_lsn = recv_calc_lsn_on_data_add(
+ recv_sys->recovered_lsn, total_len);
+
+ if (ut_dulint_cmp(new_recovered_lsn, recv_sys->scanned_lsn)
+ > 0) {
+			/* The log record filled a log block, and we require
+			that the next log block too has already been scanned
+			in */
+
+ return;
+ }
+
+ if (2 * n_recs * (sizeof(recv_t) + sizeof(recv_addr_t))
+ + total_len
+ + mem_heap_get_size(recv_sys->heap)
+ + RECV_POOL_N_FREE_BLOCKS * UNIV_PAGE_SIZE
+ > buf_pool_get_curr_size()) {
+
+ /* Hash table of log records will grow too big:
+ empty it */
+
+ recv_apply_hashed_log_recs(FALSE);
+ }
+
+ ut_ad(2 * n_recs * (sizeof(recv_t) + sizeof(recv_addr_t))
+ + total_len
+ + mem_heap_get_size(recv_sys->heap)
+ + RECV_POOL_N_FREE_BLOCKS * UNIV_PAGE_SIZE
+ < buf_pool_get_curr_size());
+
+ /* Add all the records to the hash table */
+
+ ptr = recv_sys->buf + recv_sys->recovered_offset;
+
+ for (;;) {
+ old_lsn = recv_sys->recovered_lsn;
+ len = recv_parse_log_rec(ptr, end_ptr, &type, &space,
+ &page_no, &body);
+ ut_a(len != 0);
+ ut_a(0 == ((ulint)*ptr & MLOG_SINGLE_REC_FLAG));
+
+ recv_sys->recovered_offset += len;
+ recv_sys->recovered_lsn = recv_calc_lsn_on_data_add(
+ old_lsn, len);
+ if (type == MLOG_MULTI_REC_END) {
+
+ /* Found the end mark for the records */
+
+ break;
+ }
+
+ if (store_to_hash) {
+ recv_add_to_hash_table(type, space, page_no,
+ body, ptr + len, old_lsn,
+ new_recovered_lsn);
+ } else {
+ /* In debug checking, check that the replicate
+ page has become identical with the original
+ page */
+
+ recv_compare_replicate(space, page_no);
+ }
+
+ ptr += len;
+ }
+ }
+
+ if (store_to_hash && buf_get_free_list_len()
+ < RECV_POOL_N_FREE_BLOCKS) {
+
+ /* Hash table of log records has grown too big: empty it;
+ FALSE means no ibuf operations allowed, as we cannot add
+ new records to the log yet: they would be produced by ibuf
+ operations */
+
+ recv_apply_hashed_log_recs(FALSE);
+ }
+
+ goto loop;
+}
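+
+/* Note (editorial): the dispatch above hinges on MLOG_SINGLE_REC_FLAG in
+the first type byte: an mtr which modified a single page wrote one record
+with the flag set, while a multi-page mtr wrote a chain of flagless records
+terminated by MLOG_MULTI_REC_END; that is why the chain is first length-
+checked in full, and only then parsed again and inserted into the hash
+table. */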
+
+/***********************************************************
+Adds data from a new log block to the parsing buffer of recv_sys if
+recv_sys->parse_start_lsn is non-zero. */
+static
+ibool
+recv_sys_add_to_parsing_buf(
+/*========================*/
+ /* out: TRUE if more data added */
+ byte* log_block, /* in: log block */
+ dulint scanned_lsn) /* in: lsn of how far we were able to find
+ data in this log block */
+{
+ ulint more_len;
+ ulint data_len;
+ ulint start_offset;
+ ulint end_offset;
+
+ ut_ad(ut_dulint_cmp(scanned_lsn, recv_sys->scanned_lsn) >= 0);
+
+ if (ut_dulint_is_zero(recv_sys->parse_start_lsn)) {
+		/* Cannot start parsing yet because no start point for
+		it has been found */
+
+ return(FALSE);
+ }
+
+ data_len = log_block_get_data_len(log_block);
+
+ if (ut_dulint_cmp(recv_sys->parse_start_lsn, scanned_lsn) >= 0) {
+
+ return(FALSE);
+
+ } else if (ut_dulint_cmp(recv_sys->scanned_lsn, scanned_lsn) >= 0) {
+
+ return(FALSE);
+
+ } else if (ut_dulint_cmp(recv_sys->parse_start_lsn,
+ recv_sys->scanned_lsn) > 0) {
+ more_len = ut_dulint_minus(scanned_lsn,
+ recv_sys->parse_start_lsn);
+ } else {
+ more_len = ut_dulint_minus(scanned_lsn, recv_sys->scanned_lsn);
+ }
+
+ if (more_len == 0) {
+
+ return(FALSE);
+ }
+
+ ut_ad(data_len >= more_len);
+
+ start_offset = data_len - more_len;
+
+ if (start_offset < LOG_BLOCK_HDR_SIZE) {
+ start_offset = LOG_BLOCK_HDR_SIZE;
+ }
+
+ end_offset = data_len;
+
+ if (end_offset > OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE) {
+ end_offset = OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE;
+ }
+
+ ut_ad(start_offset <= end_offset);
+
+ if (start_offset < end_offset) {
+ ut_memcpy(recv_sys->buf + recv_sys->len,
+ log_block + start_offset, end_offset - start_offset);
+
+ recv_sys->len += end_offset - start_offset;
+
+ ut_ad(recv_sys->len <= RECV_PARSING_BUF_SIZE);
+ }
+
+ return(TRUE);
+}
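+
+/* Worked example (editorial illustration): suppose 200 bytes of a block,
+counted from the block start, were already scanned and the block now
+carries data_len = 300. Then more_len = 100, start_offset = 300 - 100 =
+200, end_offset = 300, and bytes [200, 300) of the block are appended to
+the parsing buffer. */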
+
+/***********************************************************
+Moves the parsing buffer data left to the buffer start. */
+static
+void
+recv_sys_justify_left_parsing_buf(void)
+/*===================================*/
+{
+ ut_memmove(recv_sys->buf, recv_sys->buf + recv_sys->recovered_offset,
+ recv_sys->len - recv_sys->recovered_offset);
+
+ recv_sys->len -= recv_sys->recovered_offset;
+
+ recv_sys->recovered_offset = 0;
+}
+
+/***********************************************************
+Scans log from a buffer and stores new log data to the parsing buffer. Parses
+and hashes the log records if new data is found. */
+
+ibool
+recv_scan_log_recs(
+/*===============*/
+ /* out: TRUE if limit_lsn has been reached, or
+ not able to scan any more in this log group */
+ ibool store_to_hash, /* in: TRUE if the records should be stored
+ to the hash table; this is set to FALSE if just
+ debug checking is needed */
+ byte* buf, /* in: buffer containing a log segment or
+ garbage */
+ ulint len, /* in: buffer length */
+ dulint start_lsn, /* in: buffer start lsn */
+ dulint* contiguous_lsn, /* in/out: it is known that all log groups
+ contain contiguous log data up to this lsn */
+ dulint* group_scanned_lsn)/* out: scanning succeeded up to this lsn */
+{
+ byte* log_block;
+ ulint no;
+ dulint scanned_lsn;
+ ibool finished;
+ ulint data_len;
+ ibool more_data;
+
+ ut_ad(ut_dulint_get_low(start_lsn) % OS_FILE_LOG_BLOCK_SIZE == 0);
+ ut_ad(len % OS_FILE_LOG_BLOCK_SIZE == 0);
+ ut_ad(len > 0);
+
+ finished = FALSE;
+
+ log_block = buf;
+ scanned_lsn = start_lsn;
+ more_data = FALSE;
+
+ while (log_block < buf + len && !finished) {
+
+ no = log_block_get_hdr_no(log_block);
+
+ /* fprintf(stderr, "Log block header no %lu\n", no); */
+
+ if (no != log_block_get_trl_no(log_block)
+ || no != log_block_convert_lsn_to_no(scanned_lsn)) {
+
+ /* Garbage or an incompletely written log block */
+
+ finished = TRUE;
+
+ break;
+ }
+
+ if (log_block_get_flush_bit(log_block)) {
+ /* This block was a start of a log flush operation:
+ we know that the previous flush operation must have
+ been completed for all log groups before this block
+ can have been flushed to any of the groups. Therefore,
+ we know that log data is contiguous up to scanned_lsn
+ in all non-corrupt log groups. */
+
+ if (ut_dulint_cmp(scanned_lsn, *contiguous_lsn) > 0) {
+ *contiguous_lsn = scanned_lsn;
+ }
+ }
+
+ data_len = log_block_get_data_len(log_block);
+
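+		/* The checkpoint number in a block header is a 32-bit
+		value which may wrap around: a block numbered more than
+		0x80000000 below the last scanned number cannot belong
+		to the current log sequence */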
+ if ((store_to_hash || (data_len == OS_FILE_LOG_BLOCK_SIZE))
+ && (ut_dulint_cmp(ut_dulint_add(scanned_lsn, data_len),
+ recv_sys->scanned_lsn) > 0)
+ && (recv_sys->scanned_checkpoint_no > 0)
+ && (log_block_get_checkpoint_no(log_block)
+ < recv_sys->scanned_checkpoint_no)
+ && (recv_sys->scanned_checkpoint_no
+ - log_block_get_checkpoint_no(log_block)
+ > 0x80000000)) {
+
+ /* Garbage from a log buffer flush which was made
+ before the most recent database recovery */
+
+ finished = TRUE;
+#ifdef UNIV_LOG_DEBUG
+ /* This is not really an error, but currently
+ we stop here in the debug version: */
+
+ ut_error;
+#endif
+ break;
+ }
+
+ if (ut_dulint_is_zero(recv_sys->parse_start_lsn)
+ && (log_block_get_first_rec_group(log_block) > 0)) {
+
+ /* We found a point from which to start the parsing
+ of log records */
+
+ recv_sys->parse_start_lsn =
+ ut_dulint_add(scanned_lsn,
+ log_block_get_first_rec_group(log_block));
+ recv_sys->scanned_lsn = recv_sys->parse_start_lsn;
+ recv_sys->recovered_lsn = recv_sys->parse_start_lsn;
+ }
+
+ scanned_lsn = ut_dulint_add(scanned_lsn, data_len);
+
+ if (ut_dulint_cmp(scanned_lsn, recv_sys->scanned_lsn) > 0) {
+
+ /* We were able to find more log data: add it to the
+ parsing buffer if parse_start_lsn is already non-zero */
+
+ more_data = recv_sys_add_to_parsing_buf(log_block,
+ scanned_lsn);
+ recv_sys->scanned_lsn = scanned_lsn;
+ recv_sys->scanned_checkpoint_no =
+ log_block_get_checkpoint_no(log_block);
+ }
+
+ if (data_len < OS_FILE_LOG_BLOCK_SIZE) {
+ /* Log data for this group ends here */
+
+ finished = TRUE;
+ } else {
+ log_block += OS_FILE_LOG_BLOCK_SIZE;
+ }
+ }
+
+ *group_scanned_lsn = scanned_lsn;
+
+ if (more_data) {
+ fprintf(stderr,
+"Innobase: Doing recovery: scanned up to log sequence number %lu %lu\n",
+ ut_dulint_get_high(*group_scanned_lsn),
+ ut_dulint_get_low(*group_scanned_lsn));
+
+ /* Try to parse more log records */
+
+ recv_parse_log_recs(store_to_hash);
+
+ if (recv_sys->recovered_offset > RECV_PARSING_BUF_SIZE / 4) {
+ /* Move parsing buffer data to the buffer start */
+
+ recv_sys_justify_left_parsing_buf();
+ }
+ }
+
+ return(finished);
+}
+
+/***********************************************************
+Scans the log of a log group and stores new log data to the parsing buffer.
+Parses and hashes the log records if new data is found. */
+static
+void
+recv_group_scan_log_recs(
+/*=====================*/
+ log_group_t* group, /* in: log group */
+ dulint* contiguous_lsn, /* in/out: it is known that all log groups
+ contain contiguous log data up to this lsn */
+ dulint* group_scanned_lsn)/* out: scanning succeeded up to this lsn */
+{
+ ibool finished;
+ dulint start_lsn;
+ dulint end_lsn;
+
+ finished = FALSE;
+
+ start_lsn = *contiguous_lsn;
+
+ while (!finished) {
+ end_lsn = ut_dulint_add(start_lsn, RECV_SCAN_SIZE);
+
+ log_group_read_log_seg(LOG_RECOVER, log_sys->buf,
+ group, start_lsn, end_lsn);
+
+ finished = recv_scan_log_recs(TRUE, log_sys->buf,
+ RECV_SCAN_SIZE, start_lsn,
+ contiguous_lsn,
+ group_scanned_lsn);
+ start_lsn = end_lsn;
+ }
+
+ if (log_debug_writes) {
+ fprintf(stderr,
+ "Innobase: Scanned group %lu up to log sequence number %lu %lu\n",
+ group->id,
+ ut_dulint_get_high(*group_scanned_lsn),
+ ut_dulint_get_low(*group_scanned_lsn));
+ }
+}
+
+/************************************************************
+Recovers from a checkpoint. When this function returns, the database is able
+to start processing new user transactions, but the function
+recv_recovery_from_checkpoint_finish should be called later to complete
+the recovery and free the resources used in it. */
+
+ulint
+recv_recovery_from_checkpoint_start(
+/*================================*/
+ /* out: error code or DB_SUCCESS */
+ ulint type, /* in: LOG_CHECKPOINT or LOG_ARCHIVE */
+ dulint limit_lsn, /* in: recover up to this lsn if possible */
+ dulint min_flushed_lsn,/* in: min flushed lsn from data files */
+ dulint max_flushed_lsn)/* in: max flushed lsn from data files */
+{
+ log_group_t* group;
+ log_group_t* max_cp_group;
+ log_group_t* up_to_date_group;
+ ulint max_cp_field;
+ dulint checkpoint_lsn;
+ dulint checkpoint_no;
+ dulint old_scanned_lsn;
+ dulint group_scanned_lsn;
+ dulint contiguous_lsn;
+ dulint archived_lsn;
+ ulint capacity;
+ byte* buf;
+ ulint err;
+
+ ut_ad((type != LOG_CHECKPOINT)
+ || (ut_dulint_cmp(limit_lsn, ut_dulint_max) == 0));
+
+ if (type == LOG_CHECKPOINT) {
+
+ recv_sys_create();
+ recv_sys_init();
+ }
+
+ sync_order_checks_on = TRUE;
+
+ recv_recovery_on = TRUE;
+
+ recv_sys->limit_lsn = limit_lsn;
+
+ mutex_enter(&(log_sys->mutex));
+
+ /* Look for the latest checkpoint from any of the log groups */
+
+ err = recv_find_max_checkpoint(&max_cp_group, &max_cp_field);
+
+ if (err != DB_SUCCESS) {
+
+ mutex_exit(&(log_sys->mutex));
+
+ return(err);
+ }
+
+ log_group_read_checkpoint_info(max_cp_group, max_cp_field);
+
+ buf = log_sys->checkpoint_buf;
+
+ checkpoint_lsn = mach_read_from_8(buf + LOG_CHECKPOINT_LSN);
+ checkpoint_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO);
+ archived_lsn = mach_read_from_8(buf + LOG_CHECKPOINT_ARCHIVED_LSN);
+
+ group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+ while (group) {
+ log_checkpoint_get_nth_group_info(buf, group->id,
+ &(group->archived_file_no),
+ &(group->archived_offset));
+
+ group = UT_LIST_GET_NEXT(log_groups, group);
+ }
+
+ if (type == LOG_CHECKPOINT) {
+ /* Start reading the log groups from the checkpoint lsn up. The
+ variable contiguous_lsn contains an lsn up to which the log is
+ known to be contiguously written to all log groups. */
+
+ recv_sys->parse_start_lsn = checkpoint_lsn;
+ recv_sys->scanned_lsn = checkpoint_lsn;
+ recv_sys->scanned_checkpoint_no = 0;
+ recv_sys->recovered_lsn = checkpoint_lsn;
+
+		/* NOTE: we always do recovery at startup, but we print
+		a message about it to the user only if something looks
+		wrong: */
+
+ if (ut_dulint_cmp(checkpoint_lsn, max_flushed_lsn) != 0
+ || ut_dulint_cmp(checkpoint_lsn, min_flushed_lsn) != 0) {
+
+ fprintf(stderr,
+ "Innobase: Database was not shut down normally.\n"
+ "Innobase: Starting recovery from log files...\n");
+ fprintf(stderr,
+ "Innobase: Starting log scan based on checkpoint at\n"
+ "Innobase: log sequence number %lu %lu\n",
+ ut_dulint_get_high(checkpoint_lsn),
+ ut_dulint_get_low(checkpoint_lsn));
+ }
+ }
+
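+	/* The scan below always reads complete log blocks, so the
+	starting point must be aligned down to a block boundary */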
+ contiguous_lsn = ut_dulint_align_down(recv_sys->scanned_lsn,
+ OS_FILE_LOG_BLOCK_SIZE);
+ if (type == LOG_ARCHIVE) {
+ /* Try to recover the remaining part from logs: first from
+ the logs of the archived group */
+
+ group = recv_sys->archive_group;
+ capacity = log_group_get_capacity(group);
+
+ if ((ut_dulint_cmp(recv_sys->scanned_lsn,
+ ut_dulint_add(checkpoint_lsn, capacity)) > 0)
+ || (ut_dulint_cmp(checkpoint_lsn,
+ ut_dulint_add(recv_sys->scanned_lsn, capacity)) > 0)) {
+
+ mutex_exit(&(log_sys->mutex));
+
+ /* The group does not contain enough log: probably
+ an archived log file was missing or corrupt */
+
+ return(DB_ERROR);
+ }
+
+ recv_group_scan_log_recs(group, &contiguous_lsn,
+ &group_scanned_lsn);
+ if (ut_dulint_cmp(recv_sys->scanned_lsn, checkpoint_lsn) < 0) {
+
+ mutex_exit(&(log_sys->mutex));
+
+ /* The group did not contain enough log: an archived
+ log file was missing or invalid, or the log group
+ was corrupt */
+
+ return(DB_ERROR);
+ }
+
+ group->scanned_lsn = group_scanned_lsn;
+ up_to_date_group = group;
+ } else {
+ up_to_date_group = max_cp_group;
+ }
+
+ ut_ad(RECV_SCAN_SIZE <= log_sys->buf_size);
+
+ group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+ if ((type == LOG_ARCHIVE) && (group == recv_sys->archive_group)) {
+ group = UT_LIST_GET_NEXT(log_groups, group);
+ }
+
+ while (group) {
+ old_scanned_lsn = recv_sys->scanned_lsn;
+
+ recv_group_scan_log_recs(group, &contiguous_lsn,
+ &group_scanned_lsn);
+ group->scanned_lsn = group_scanned_lsn;
+
+ if (ut_dulint_cmp(old_scanned_lsn, group_scanned_lsn) < 0) {
+ /* We found a more up-to-date group */
+
+ up_to_date_group = group;
+ }
+
+		group = UT_LIST_GET_NEXT(log_groups, group);
+
+		if ((type == LOG_ARCHIVE)
+		    && (group == recv_sys->archive_group)) {
+
+			/* Skip the archive group: it was scanned above */
+
+			group = UT_LIST_GET_NEXT(log_groups, group);
+		}
+ }
+
+ if (ut_dulint_cmp(recv_sys->recovered_lsn, checkpoint_lsn) < 0) {
+
+ mutex_exit(&(log_sys->mutex));
+
+ if (ut_dulint_cmp(recv_sys->recovered_lsn, limit_lsn) >= 0) {
+
+ return(DB_SUCCESS);
+ }
+
+ ut_error;
+
+ return(DB_ERROR);
+ }
+
+ /* Synchronize the uncorrupted log groups to the most up-to-date log
+ group; we also copy checkpoint info to groups */
+
+ log_sys->next_checkpoint_lsn = checkpoint_lsn;
+ log_sys->next_checkpoint_no = ut_dulint_add(checkpoint_no, 1);
+
+ log_sys->archived_lsn = archived_lsn;
+
+ recv_synchronize_groups(up_to_date_group);
+
+ log_sys->lsn = recv_sys->recovered_lsn;
+
+ ut_memcpy(log_sys->buf, recv_sys->last_block, OS_FILE_LOG_BLOCK_SIZE);
+
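+	/* The recovered lsn may point into the middle of a log block:
+	set buf_free to that offset within the block copied above, so
+	that new log records are appended right after the recovered
+	data */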
+ log_sys->buf_free = ut_dulint_get_low(log_sys->lsn)
+ % OS_FILE_LOG_BLOCK_SIZE;
+ log_sys->buf_next_to_write = log_sys->buf_free;
+ log_sys->written_to_some_lsn = log_sys->lsn;
+ log_sys->written_to_all_lsn = log_sys->lsn;
+
+ log_sys->last_checkpoint_lsn = checkpoint_lsn;
+
+ log_sys->next_checkpoint_no = ut_dulint_add(checkpoint_no, 1);
+
+ if (ut_dulint_cmp(archived_lsn, ut_dulint_max) == 0) {
+
+ log_sys->archiving_state = LOG_ARCH_OFF;
+ }
+
+ mutex_enter(&(recv_sys->mutex));
+
+ recv_sys->apply_log_recs = TRUE;
+
+ mutex_exit(&(recv_sys->mutex));
+
+ mutex_exit(&(log_sys->mutex));
+
+ sync_order_checks_on = FALSE;
+
+ /* The database is now ready to start almost normal processing of user
+ transactions: transaction rollbacks and the application of the log
+ records in the hash table can be run in background. */
+
+ return(DB_SUCCESS);
+}
+
+/************************************************************
+Completes recovery from a checkpoint. */
+
+void
+recv_recovery_from_checkpoint_finish(void)
+/*======================================*/
+{
+ /* Rollback the uncommitted transactions which have no user session */
+
+ trx_rollback_all_without_sess();
+
+ /* Apply the hashed log records to the respective file pages */
+
+ recv_apply_hashed_log_recs(TRUE);
+
+ if (log_debug_writes) {
+ fprintf(stderr,
+ "Innobase: Log records applied to the database\n");
+ }
+
+ /* Free the resources of the recovery system */
+
+ recv_recovery_on = FALSE;
+#ifndef UNIV_LOG_DEBUG
+ recv_sys_free();
+#endif
+}
+
+/**********************************************************
+Resets the logs. The contents of log files will be lost! */
+
+void
+recv_reset_logs(
+/*============*/
+ dulint lsn, /* in: reset to this lsn rounded up to
+ be divisible by OS_FILE_LOG_BLOCK_SIZE,
+ after which we add LOG_BLOCK_HDR_SIZE */
+ ulint arch_log_no, /* in: next archived log file number */
+ ibool new_logs_created)/* in: TRUE if resetting logs is done
+ at the log creation; FALSE if it is done
+ after archive recovery */
+{
+ log_group_t* group;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ log_sys->lsn = ut_dulint_align_up(lsn, OS_FILE_LOG_BLOCK_SIZE);
+
+ group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+ while (group) {
+ group->lsn = log_sys->lsn;
+ group->lsn_offset = LOG_FILE_HDR_SIZE;
+
+ group->archived_file_no = arch_log_no;
+ group->archived_offset = 0;
+
+ if (!new_logs_created) {
+ recv_truncate_group(group, group->lsn, group->lsn,
+ group->lsn, group->lsn);
+ }
+
+ group = UT_LIST_GET_NEXT(log_groups, group);
+ }
+
+ log_sys->buf_next_to_write = 0;
+ log_sys->written_to_some_lsn = log_sys->lsn;
+ log_sys->written_to_all_lsn = log_sys->lsn;
+
+ log_sys->next_checkpoint_no = ut_dulint_zero;
+ log_sys->last_checkpoint_lsn = ut_dulint_zero;
+
+ log_sys->archived_lsn = log_sys->lsn;
+
+ log_block_init(log_sys->buf, log_sys->lsn);
+ log_block_set_first_rec_group(log_sys->buf, LOG_BLOCK_HDR_SIZE);
+
+ log_sys->buf_free = LOG_BLOCK_HDR_SIZE;
+ log_sys->lsn = ut_dulint_add(log_sys->lsn, LOG_BLOCK_HDR_SIZE);
+
+ mutex_exit(&(log_sys->mutex));
+
+ /* Reset the checkpoint fields in logs */
+
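+	/* The checkpoint is made twice so that both checkpoint header
+	fields of the groups get valid info: successive checkpoints
+	alternate between the two fields */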
+ log_make_checkpoint_at(ut_dulint_max, TRUE);
+ log_make_checkpoint_at(ut_dulint_max, TRUE);
+
+ mutex_enter(&(log_sys->mutex));
+}
+
+/**********************************************************
+Reads from the archive of a log group and performs recovery. */
+static
+ibool
+log_group_recover_from_archive_file(
+/*================================*/
+ /* out: TRUE if no more complete
+ consistent archive files */
+ log_group_t* group) /* in: log group */
+{
+ os_file_t file_handle;
+ dulint start_lsn;
+ dulint file_end_lsn;
+ dulint dummy_lsn;
+ dulint scanned_lsn;
+ ulint len;
+ char name[10000];
+ ibool ret;
+ byte* buf;
+ ulint read_offset;
+ ulint file_size;
+ ulint file_size_high;
+ int input_char;
+
+try_open_again:
+ buf = log_sys->buf;
+
+ /* Add the file to the archive file space; open the file */
+
+ log_archived_file_name_gen(name, group->id, group->archived_file_no);
+
+ fil_reserve_right_to_open();
+
+ file_handle = os_file_create(name, OS_FILE_OPEN, OS_FILE_AIO, &ret);
+
+ if (ret == FALSE) {
+ fil_release_right_to_open();
+ask_again:
+ fprintf(stderr,
+ "Innobase: Do you want to copy additional archived log files\n"
+ "Innobase: to the directory\n");
+ fprintf(stderr,
+ "Innobase: or were these all the files needed in recovery?\n");
+ fprintf(stderr,
+ "Innobase: (Y == copy more files; N == this is all)?");
+
+ input_char = getchar();
+
+ if (input_char == (int) 'N') {
+
+ return(TRUE);
+ } else if (input_char == (int) 'Y') {
+
+ goto try_open_again;
+ } else {
+ goto ask_again;
+ }
+ }
+
+ ret = os_file_get_size(file_handle, &file_size, &file_size_high);
+ ut_a(ret);
+
+ ut_a(file_size_high == 0);
+
+ fprintf(stderr, "Innobase: Opened archived log file %s\n", name);
+
+ ret = os_file_close(file_handle);
+
+	if (file_size < LOG_FILE_HDR_SIZE) {
+		fprintf(stderr,
+		"Innobase: Archive file header incomplete %s\n", name);
+
+		fil_release_right_to_open();
+
+		return(TRUE);
+	}
+
+ ut_a(ret);
+
+ fil_release_right_to_open();
+
+ /* Add the archive file as a node to the space */
+
+ fil_node_create(name, 1 + file_size / UNIV_PAGE_SIZE,
+ group->archive_space_id);
+ ut_a(RECV_SCAN_SIZE >= LOG_FILE_HDR_SIZE);
+
+ /* Read the archive file header */
+ fil_io(OS_FILE_READ | OS_FILE_LOG, TRUE, group->archive_space_id, 0, 0,
+ LOG_FILE_HDR_SIZE, buf, NULL);
+
+ /* Check if the archive file header is consistent */
+
+ if (mach_read_from_4(buf + LOG_GROUP_ID) != group->id
+ || mach_read_from_4(buf + LOG_FILE_NO)
+ != group->archived_file_no) {
+ fprintf(stderr,
+ "Innobase: Archive file header inconsistent %s\n", name);
+
+ return(TRUE);
+ }
+
+ if (!mach_read_from_4(buf + LOG_FILE_ARCH_COMPLETED)) {
+ fprintf(stderr,
+ "Innobase: Archive file not completely written %s\n", name);
+
+ return(TRUE);
+ }
+
+ start_lsn = mach_read_from_8(buf + LOG_FILE_START_LSN);
+ file_end_lsn = mach_read_from_8(buf + LOG_FILE_END_LSN);
+
+ if (ut_dulint_is_zero(recv_sys->scanned_lsn)) {
+
+ if (ut_dulint_cmp(recv_sys->parse_start_lsn, start_lsn) < 0) {
+ fprintf(stderr,
+ "Innobase: Archive log file %s starts from too big a lsn\n",
+ name);
+ return(TRUE);
+ }
+
+ recv_sys->scanned_lsn = start_lsn;
+ }
+
+ if (ut_dulint_cmp(recv_sys->scanned_lsn, start_lsn) != 0) {
+
+ fprintf(stderr,
+ "Innobase: Archive log file %s starts from a wrong lsn\n",
+ name);
+ return(TRUE);
+ }
+
+ read_offset = LOG_FILE_HDR_SIZE;
+
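+	/* Read the body of the archive file in RECV_SCAN_SIZE chunks
+	and feed each chunk to the log record scanner */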
+ for (;;) {
+ len = RECV_SCAN_SIZE;
+
+ if (read_offset + len > file_size) {
+ len = ut_calc_align_down(file_size - read_offset,
+ OS_FILE_LOG_BLOCK_SIZE);
+ }
+
+ if (len == 0) {
+
+ break;
+ }
+
+ if (log_debug_writes) {
+ fprintf(stderr,
+"Innobase: Archive read starting at lsn %lu %lu, len %lu from file %s\n",
+ ut_dulint_get_high(start_lsn),
+ ut_dulint_get_low(start_lsn),
+ len, name);
+ }
+
+ fil_io(OS_FILE_READ | OS_FILE_LOG, TRUE,
+ group->archive_space_id, read_offset / UNIV_PAGE_SIZE,
+ read_offset % UNIV_PAGE_SIZE, len, buf, NULL);
+
+
+ ret = recv_scan_log_recs(TRUE, buf, len, start_lsn,
+ &dummy_lsn, &scanned_lsn);
+
+ if (ut_dulint_cmp(scanned_lsn, file_end_lsn) == 0) {
+
+ return(FALSE);
+ }
+
+ if (ret) {
+ fprintf(stderr,
+ "Innobase: Archive log file %s does not scan right\n",
+ name);
+ return(TRUE);
+ }
+
+ read_offset += len;
+ start_lsn = ut_dulint_add(start_lsn, len);
+
+ ut_ad(ut_dulint_cmp(start_lsn, scanned_lsn) == 0);
+ }
+
+ return(FALSE);
+}
+
+/************************************************************
+Recovers from archived log files, and also from log files, if they exist. */
+
+ulint
+recv_recovery_from_archive_start(
+/*=============================*/
+ /* out: error code or DB_SUCCESS */
+ dulint min_flushed_lsn,/* in: min flushed lsn field from the
+ data files */
+ dulint limit_lsn, /* in: recover up to this lsn if possible */
+ ulint first_log_no) /* in: number of the first archived log file
+ to use in the recovery; the file will be
+ searched from INNOBASE_LOG_ARCH_DIR specified
+ in server config file */
+{
+ log_group_t* group;
+ ulint group_id;
+ ulint trunc_len;
+ ibool ret;
+ ulint err;
+
+ recv_sys_create();
+ recv_sys_init();
+
+ sync_order_checks_on = TRUE;
+
+ recv_recovery_on = TRUE;
+ recv_recovery_from_backup_on = TRUE;
+
+ recv_sys->limit_lsn = limit_lsn;
+
+ group_id = 0;
+
+ group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+ while (group) {
+ if (group->id == group_id) {
+
+ break;
+ }
+
+ group = UT_LIST_GET_NEXT(log_groups, group);
+ }
+
+ if (!group) {
+ fprintf(stderr,
+ "Innobase: There is no log group defined with id %lu!\n",
+ group_id);
+ return(DB_ERROR);
+ }
+
+ group->archived_file_no = first_log_no;
+
+ recv_sys->parse_start_lsn = min_flushed_lsn;
+
+ recv_sys->scanned_lsn = ut_dulint_zero;
+ recv_sys->scanned_checkpoint_no = 0;
+ recv_sys->recovered_lsn = recv_sys->parse_start_lsn;
+
+ recv_sys->archive_group = group;
+
+ ret = FALSE;
+
+ mutex_enter(&(log_sys->mutex));
+
+ while (!ret) {
+ ret = log_group_recover_from_archive_file(group);
+
+ /* Close and truncate a possible processed archive file
+ from the file space */
+
+ trunc_len = UNIV_PAGE_SIZE
+ * fil_space_get_size(group->archive_space_id);
+ if (trunc_len > 0) {
+ fil_space_truncate_start(group->archive_space_id,
+ trunc_len);
+ }
+
+ group->archived_file_no++;
+ }
+
+ if (ut_dulint_cmp(recv_sys->recovered_lsn, limit_lsn) < 0) {
+
+ if (ut_dulint_is_zero(recv_sys->scanned_lsn)) {
+
+ recv_sys->scanned_lsn = recv_sys->parse_start_lsn;
+ }
+
+ mutex_exit(&(log_sys->mutex));
+
+ err = recv_recovery_from_checkpoint_start(LOG_ARCHIVE,
+ limit_lsn,
+ ut_dulint_max,
+ ut_dulint_max);
+ if (err != DB_SUCCESS) {
+
+ return(err);
+ }
+
+ mutex_enter(&(log_sys->mutex));
+ }
+
+ if (ut_dulint_cmp(limit_lsn, ut_dulint_max) != 0) {
+
+ recv_apply_hashed_log_recs(FALSE);
+
+ recv_reset_logs(recv_sys->recovered_lsn, 0, FALSE);
+ }
+
+ mutex_exit(&(log_sys->mutex));
+
+ sync_order_checks_on = FALSE;
+
+ return(DB_SUCCESS);
+}
+
+/************************************************************
+Completes recovery from archive. */
+
+void
+recv_recovery_from_archive_finish(void)
+/*===================================*/
+{
+ recv_recovery_from_checkpoint_finish();
+
+ recv_recovery_from_backup_on = FALSE;
+}
diff --git a/innobase/log/makefilewin b/innobase/log/makefilewin
new file mode 100644
index 00000000000..a690af3bb35
--- /dev/null
+++ b/innobase/log/makefilewin
@@ -0,0 +1,10 @@
+include ..\include\makefile.i
+
+log.lib: log0log.obj log0recv.obj
+ lib -out:..\libs\log.lib log0log.obj log0recv.obj
+
+log0log.obj: log0log.c
+ $(CCOM) $(CFL) -c log0log.c
+
+log0recv.obj: log0recv.c
+ $(CCOM) $(CFL) -c log0recv.c
diff --git a/innobase/log/trash/log0trsh.c b/innobase/log/trash/log0trsh.c
new file mode 100644
index 00000000000..7f48118a0d1
--- /dev/null
+++ b/innobase/log/trash/log0trsh.c
@@ -0,0 +1,648 @@
+/******************************************************
+Recovery
+
+(c) 1997 Innobase Oy
+
+Created 9/20/1997 Heikki Tuuri
+*******************************************************/
+
+#include "log0recv.h"
+
+#ifdef UNIV_NONINL
+#include "log0recv.ic"
+#endif
+
+#include "mem0mem.h"
+#include "buf0buf.h"
+#include "buf0flu.h"
+#include "srv0srv.h"
+
+/* Size of block reads when the log groups are scanned forward to do
+roll-forward */
+#define RECV_SCAN_SIZE (4 * UNIV_PAGE_SIZE)
+
+/* Size of block reads when the log groups are scanned backwards to synchronize
+them */
+#define RECV_BACK_SCAN_SIZE (4 * UNIV_PAGE_SIZE)
+
+recv_sys_t* recv_sys = NULL;
+
+/************************************************************
+Creates the recovery system. */
+
+void
+recv_sys_create(void)
+/*=================*/
+{
+ ut_a(recv_sys == NULL);
+
+	recv_sys = mem_alloc(sizeof(recv_sys_t));
+
+ mutex_create(&(recv_sys->mutex));
+
+ recv_sys->hash = NULL;
+ recv_sys->heap = NULL;
+}
+
+/************************************************************
+Inits the recovery system for a recovery operation. */
+
+void
+recv_sys_init(void)
+/*===============*/
+{
+ recv_sys->hash = hash_create(buf_pool_get_curr_size() / 64);
+ recv_sys->heap = mem_heap_create_in_buffer(256);
+}
+
+/************************************************************
+Empties the recovery system. */
+
+void
+recv_sys_empty(void)
+/*================*/
+{
+ mutex_enter(&(recv_sys->mutex));
+
+ hash_free(recv_sys->hash);
+ mem_heap_free(recv_sys->heap);
+
+ recv_sys->hash = NULL;
+ recv_sys->heap = NULL;
+
+ mutex_exit(&(recv_sys->mutex));
+}
+
+/***********************************************************
+For recovery purposes, copies the log buffer to a log group to synchronize
+log data. */
+static
+void
+recv_log_buf_flush(
+/*===============*/
+ log_group_t* group, /* in: log group */
+ dulint start_lsn, /* in: start lsn of the log data in
+ the log buffer; must be divisible by
+ OS_FILE_LOG_BLOCK_SIZE */
+ dulint end_lsn) /* in: end lsn of the log data in the
+ log buffer; must be divisible by
+ OS_FILE_LOG_BLOCK_SIZE */
+{
+ ulint len;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ len = ut_dulint_minus(end_lsn, start_lsn);
+
+ log_group_write_buf(LOG_RECOVER, group, log_sys->buf, len, start_lsn,
+ 0);
+}
+
+/***********************************************************
+Compares two buffers containing log segments and determines the highest lsn
+where they match, if any. */
+static
+dulint
+recv_log_bufs_cmp(
+/*==============*/
+ /* out: if no match found, ut_dulint_zero or
+ if start_lsn == LOG_START_LSN, returns
+ LOG_START_LSN; otherwise the highest matching
+ lsn */
+ byte* recv_buf, /* in: buffer containing valid log data */
+ byte* buf, /* in: buffer of data from a possibly
+ incompletely written log group */
+ dulint start_lsn, /* in: buffer start lsn, must be divisible
+ by OS_FILE_LOG_BLOCK_SIZE and must be >=
+ LOG_START_LSN */
+ dulint end_lsn, /* in: buffer end lsn, must be divisible
+ by OS_FILE_LOG_BLOCK_SIZE */
+ dulint recovered_lsn) /* in: recovery succeeded up to this lsn */
+{
+ ulint len;
+ ulint offset;
+ byte* log_block1;
+ byte* log_block2;
+ ulint no;
+ ulint data_len;
+
+ ut_ad(ut_dulint_cmp(start_lsn, LOG_START_LSN) >= 0);
+
+ if (ut_dulint_cmp(end_lsn, recovered_lsn) > 0) {
+ end_lsn = ut_dulint_align_up(recovered_lsn,
+ OS_FILE_LOG_BLOCK_SIZE);
+ }
+
+ len = ut_dulint_minus(end_lsn, start_lsn);
+
+ if (len == 0) {
+
+ goto no_match;
+ }
+
+ ut_ad(len % OS_FILE_LOG_BLOCK_SIZE == 0);
+
+ log_block1 = recv_buf + len;
+ log_block2 = buf + len;
+
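+	/* Walk the blocks from the buffer end backwards, looking for
+	the highest block whose header and trailer numbers and payload
+	match in both buffers */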
+ for (;;) {
+ log_block1 -= OS_FILE_LOG_BLOCK_SIZE;
+ log_block2 -= OS_FILE_LOG_BLOCK_SIZE;
+
+ no = log_block_get_hdr_no(log_block1);
+ ut_a(no == log_block_get_trl_no(log_block1));
+
+ if ((no == log_block_get_hdr_no(log_block2))
+ && (no == log_block_get_trl_no(log_block2))) {
+
+ /* Match found if the block is not corrupted */
+
+ data_len = log_block_get_data_len(log_block2);
+
+ if (0 == ut_memcmp(log_block1 + LOG_BLOCK_DATA,
+ log_block2 + LOG_BLOCK_DATA,
+ data_len - LOG_BLOCK_DATA)) {
+
+ /* Match found */
+
+ return(ut_dulint_add(start_lsn,
+ log_block2 - buf + data_len));
+ }
+ }
+
+ if (log_block1 == recv_buf) {
+
+ /* No match found */
+
+ break;
+ }
+ }
+no_match:
+ if (ut_dulint_cmp(start_lsn, LOG_START_LSN) == 0) {
+
+ return(LOG_START_LSN);
+ }
+
+ return(ut_dulint_zero);
+}
+
+/************************************************************
+Copies a log segment from the most up-to-date log group to the other log
+group, so that it contains the latest log data. */
+static
+void
+recv_copy_group(
+/*============*/
+ log_group_t* up_to_date_group, /* in: the most up-to-date
+ log group */
+ log_group_t* group, /* in: copy to this log group */
+	dulint		recovered_lsn)	/* in: recovery succeeded up
+ to this lsn */
+{
+ dulint start_lsn;
+ dulint end_lsn;
+ dulint match;
+ byte* buf;
+ byte* buf1;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ if (0 == ut_dulint_cmp(LOG_START_LSN, recovered_lsn)) {
+
+ return;
+ }
+
+ ut_ad(RECV_BACK_SCAN_SIZE <= log_sys->buf_size);
+
+ buf1 = mem_alloc(2 * RECV_BACK_SCAN_SIZE);
+	buf = ut_align(buf1, RECV_BACK_SCAN_SIZE);
+
+ end_lsn = ut_dulint_align_up(recovered_lsn, RECV_BACK_SCAN_SIZE);
+
+ match = ut_dulint_zero;
+
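+	/* Work backwards from the recovered lsn in RECV_BACK_SCAN_SIZE
+	chunks: copy each segment from the up-to-date group unless it
+	already matches up to the recovered lsn, and stop at the first
+	segment where a match is found */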
+ for (;;) {
+ if (ut_dulint_cmp(ut_dulint_add(LOG_START_LSN,
+ RECV_BACK_SCAN_SIZE), end_lsn) >= 0) {
+ start_lsn = LOG_START_LSN;
+ } else {
+ start_lsn = ut_dulint_subtract(end_lsn,
+ RECV_BACK_SCAN_SIZE);
+ }
+
+ log_group_read_log_seg(LOG_RECOVER, buf, group, start_lsn,
+ end_lsn);
+ log_group_read_log_seg(LOG_RECOVER, log_sys->buf,
+ up_to_date_group, start_lsn, end_lsn);
+
+ match = recv_log_bufs_cmp(log_sys->buf, buf, start_lsn,
+ end_lsn, recovered_lsn);
+
+ if (ut_dulint_cmp(match, recovered_lsn) != 0) {
+ recv_log_buf_flush(group, start_lsn, end_lsn);
+ }
+
+		if (!ut_dulint_is_zero(match)) {
+
+ mem_free(buf1);
+
+ return;
+ }
+
+ end_lsn = start_lsn;
+ }
+}
+
+/************************************************************
+Copies a log segment from the most up-to-date log group to the other log
+groups, so that they all contain the latest log data. Also writes the info
+about the latest checkpoint to the groups, and inits the fields in the group
+memory structs to up-to-date values. */
+
+void
+recv_synchronize_groups(
+/*====================*/
+ log_group_t* up_to_date_group, /* in: the most up-to-date
+ log group */
+	dulint		recovered_lsn,	/* in: recovery succeeded up
+ to this lsn */
+ log_group_t* max_checkpoint_group) /* in: the group with the most
+ recent checkpoint info */
+{
+ log_group_t* group;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+ while (group) {
+ if (group != up_to_date_group) {
+
+ /* Copy log data */
+
+			recv_copy_group(up_to_date_group, group,
+							recovered_lsn);
+ }
+
+ if (group != max_checkpoint_group) {
+
+ /* Copy the checkpoint info to the group */
+
+ log_group_checkpoint(group);
+
+ mutex_exit(&(log_sys->mutex));
+
+ /* Wait for the checkpoint write to complete */
+ rw_lock_s_lock(&(log_sys->checkpoint_lock));
+ rw_lock_s_unlock(&(log_sys->checkpoint_lock));
+
+ mutex_enter(&(log_sys->mutex));
+ }
+
+ /* Update the fields in the group struct to correspond to
+ recovered_lsn */
+
+ log_group_set_fields(group, recovered_lsn);
+
+ group = UT_LIST_GET_NEXT(log_groups, group);
+ }
+}
+
+/************************************************************
+Looks for the maximum consistent checkpoint from the log groups. */
+static
+ulint
+recv_find_max_checkpoint(
+/*=====================*/
+ /* out: error code or DB_SUCCESS */
+ log_group_t** max_group, /* out: max group */
+ ulint* max_field) /* out: LOG_CHECKPOINT_1 or
+ LOG_CHECKPOINT_2 */
+{
+ log_group_t* group;
+ dulint max_no;
+ dulint cp_no;
+ ulint field;
+ ulint fold;
+ byte* buf;
+
+ ut_ad(mutex_own(&(log_sys->mutex)));
+
+ /* Look for the latest checkpoint from the log groups */
+ group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+	max_no = ut_dulint_zero;
+ *max_group = NULL;
+
+ buf = log_sys->checkpoint_buf;
+
+ while (group) {
+ group->state = LOG_GROUP_CORRUPTED;
+
+ for (field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
+ field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) {
+
+ log_group_read_checkpoint_info(group, field);
+
+ /* Check the consistency of the checkpoint info */
+ fold = ut_fold_binary(buf, LOG_CHECKPOINT_CHECKSUM_1);
+
+ if (fold != mach_read_from_4(buf
+ + LOG_CHECKPOINT_CHECKSUM_1)) {
+ goto not_consistent;
+ }
+
+ fold = ut_fold_binary(buf + LOG_CHECKPOINT_LSN,
+ LOG_CHECKPOINT_CHECKSUM_2
+ - LOG_CHECKPOINT_LSN);
+ if (fold != mach_read_from_4(buf
+ + LOG_CHECKPOINT_CHECKSUM_2)) {
+ goto not_consistent;
+ }
+
+ group->state = LOG_GROUP_OK;
+
+ group->lsn = mach_read_from_8(buf
+ + LOG_CHECKPOINT_LSN);
+ group->lsn_offset = mach_read_from_4(buf
+ + LOG_CHECKPOINT_OFFSET);
+ group->lsn_file_count = mach_read_from_4(
+ buf + LOG_CHECKPOINT_FILE_COUNT);
+
+ cp_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO);
+
+ if (ut_dulint_cmp(cp_no, max_no) >= 0) {
+ *max_group = group;
+ *max_field = field;
+ max_no = cp_no;
+ }
+
+		not_consistent:
+			;
+ }
+
+ group = UT_LIST_GET_NEXT(log_groups, group);
+ }
+
+ if (*max_group == NULL) {
+
+ return(DB_ERROR);
+ }
+
+ return(DB_SUCCESS);
+}
+
+/***********************************************************
+Parses log records from a buffer and stores them in a hash table to wait for
+merging to file pages. If the hash table becomes too big, it is automatically
+merged to file pages. */
+static
+ibool
+recv_parse_and_hash_log_recs(
+/*=========================*/
+ /* out: TRUE if limit_lsn has been reached */
+ byte* buf, /* in: buffer containing a log segment or
+ garbage */
+ ulint len, /* in: buffer length */
+ dulint start_lsn, /* in: buffer start lsn */
+	dulint	limit_lsn,	/* in: recover at least to this lsn */
+	dulint*	flush_start_lsn,/* in/out: lsn up to which the log is
+				known to be contiguously written in all
+				log groups */
+	dulint*	recovered_lsn)	/* out: was able to parse up to this lsn */
+{
+	/* The body was never written in this discarded version; report
+	that the limit lsn has not been reached */
+
+	return(FALSE);
+}
+
+/************************************************************
+Recovers from a checkpoint. When this function returns, the database is able
+to start processing new user transactions, but the function
+recv_recovery_from_checkpoint_finish should be called later to complete
+the recovery and free the resources used in it. */
+
+ulint
+recv_recovery_from_checkpoint_start(
+/*================================*/
+ /* out: error code or DB_SUCCESS */
+ dulint limit_lsn) /* in: recover up to this lsn if possible */
+{
+	log_group_t*	group;
+	log_group_t*	max_cp_group;
+ log_group_t* up_to_date_group;
+ ulint max_cp_field;
+ byte* buf;
+ ulint err;
+ dulint checkpoint_lsn;
+ dulint checkpoint_no;
+ dulint recovered_lsn;
+ dulint old_lsn;
+ dulint end_lsn;
+ dulint start_lsn;
+	ibool		finished;
+ dulint flush_start_lsn;
+
+ mutex_enter(&(log_sys->mutex));
+
+ /* Look for the latest checkpoint from any of the log groups */
+
+ err = recv_find_max_checkpoint(&max_cp_group, &max_cp_field);
+
+ if (err != DB_SUCCESS) {
+ mutex_exit(&(log_sys->mutex));
+
+ return(err);
+ }
+
+ log_group_read_checkpoint_info(max_cp_group, max_cp_field);
+
+ buf = log_sys->checkpoint_buf;
+
+ checkpoint_lsn = mach_read_from_8(buf + LOG_CHECKPOINT_LSN);
+ checkpoint_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO);
+
+ if (ut_dulint_cmp(limit_lsn, checkpoint_lsn) < 0) {
+ mutex_exit(&(log_sys->mutex));
+
+ return(DB_ERROR);
+ }
+
+	/* Start reading the log groups from the checkpoint lsn up. The
+	variable flush_start_lsn contains an lsn up to which the log is
+	known to be contiguously written in all log groups. */
+
+ recovered_lsn = checkpoint_lsn;
+ flush_start_lsn = ut_dulint_align_down(checkpoint_lsn,
+ OS_FILE_LOG_BLOCK_SIZE);
+ up_to_date_group = max_cp_group;
+
+ ut_ad(RECV_SCAN_SIZE <= log_sys->buf_size);
+
+ group = UT_LIST_GET_FIRST(log_sys->log_groups);
+
+ while (group) {
+ finished = FALSE;
+
+ if (group->state == LOG_GROUP_CORRUPTED) {
+ finished = TRUE;
+ }
+
+ start_lsn = flush_start_lsn;
+
+ while (!finished) {
+ end_lsn = ut_dulint_add(start_lsn, RECV_SCAN_SIZE);
+
+ log_group_read_log_seg(LOG_RECOVER, log_sys->buf,
+ group, start_lsn, end_lsn);
+ old_lsn = recovered_lsn;
+
+ finished = recv_parse_and_hash_log_recs(log_sys->buf,
+ RECV_SCAN_SIZE, start_lsn,
+ limit_lsn, &flush_start_lsn,
+ &recovered_lsn);
+
+ if (ut_dulint_cmp(recovered_lsn, old_lsn) > 0) {
+
+ /* We found a more up-to-date group */
+ up_to_date_group = group;
+ }
+
+ start_lsn = end_lsn;
+ }
+
+ group = UT_LIST_GET_NEXT(log_groups, group);
+ }
+
+ /* Delete possible corrupted or extra log records from all log
+ groups */
+
+ recv_truncate_groups(recovered_lsn);
+
+ /* Synchronize the uncorrupted log groups to the most up-to-date log
+ group; we may also have to copy checkpoint info to groups */
+
+ log_sys->next_checkpoint_lsn = checkpoint_lsn;
+ log_sys->next_checkpoint_no = checkpoint_no;
+
+	recv_synchronize_groups(up_to_date_group, recovered_lsn,
+							max_cp_group);
+
+ log_sys->next_checkpoint_no = ut_dulint_add(checkpoint_no, 1);
+
+ /* The database is now ready to start almost normal processing of user
+ transactions */
+
+ return(DB_SUCCESS);
+}
+
+/************************************************************
+Completes recovery from a checkpoint. */
+
+void
+recv_recovery_from_checkpoint_finish(void)
+/*======================================*/
+{
+ /* Rollback the uncommitted transactions which have no user session */
+
+ trx_rollback_all_without_sess();
+
+ /* Merge the hashed log records */
+
+ recv_merge_hashed_log_recs();
+
+ /* Free the resources of the recovery system */
+
+ recv_sys_empty();
+}
+
+/* Forward declaration: the slow-path variant is defined below */
+static
+void
+log_write_row_id_incr_rec_slow(void);
+
+/****************************************************************
+Writes to the log a record about incrementing the row id counter. */
+UNIV_INLINE
+void
+log_write_row_id_incr_rec(void)
+/*===========================*/
+{
+ log_t* log = log_sys;
+ ulint data_len;
+
+ mutex_enter(&(log->mutex));
+
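+	/* The record is a single type byte; data_len is the length of
+	the data in the current log block after appending that byte */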
+ data_len = (log->buf_free % OS_FILE_LOG_BLOCK_SIZE) + 1;
+
+ if (data_len >= OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE) {
+
+		/* The string does not fit within the current log block
+		or the block would become full */
+
+ mutex_exit(&(log->mutex));
+
+ log_write_row_id_incr_rec_slow();
+
+ return;
+ }
+
+ *(log->buf + log->buf_free) = MLOG_INCR_ROW_ID | MLOG_SINGLE_REC_FLAG;
+
+ log_block_set_data_len(ut_align_down(log->buf + log->buf_free,
+ OS_FILE_LOG_BLOCK_SIZE),
+ data_len);
+#ifdef UNIV_LOG_DEBUG
+ log->old_buf_free = log->buf_free;
+ log->old_lsn = log->lsn;
+ log_check_log_recs(log->buf + log->buf_free, 1, log->lsn);
+#endif
+ log->buf_free++;
+
+ ut_ad(log->buf_free <= log->buf_size);
+
+ UT_DULINT_INC(log->lsn);
+
+ mutex_exit(&(log->mutex));
+}
+
+/****************************************************************
+Writes to the log a record about incrementing the row id counter. */
+static
+void
+log_write_row_id_incr_rec_slow(void)
+/*================================*/
+{
+ byte type;
+
+ log_reserve_and_open(1);
+
+ type = MLOG_INCR_ROW_ID | MLOG_SINGLE_REC_FLAG;
+
+ log_write_low(&type, 1);
+
+ log_close();
+
+ log_release();
+}
+
+/**************************************************************************
+Parses and applies a log record MLOG_SET_ROW_ID. */
+
+byte*
+dict_hdr_parse_set_row_id(
+/*======================*/
+ /* out: end of log record or NULL */
+ byte* ptr, /* in: buffer */
+ byte* end_ptr,/* in: buffer end */
+ page_t* page) /* in: page or NULL */
+{
+ dulint dval;
+
+ ptr = mach_dulint_parse_compressed(ptr, end_ptr, &dval);
+
+ if (ptr == NULL) {
+
+ return(NULL);
+ }
+
+ if (!page) {
+
+ return(ptr);
+ }
+
+ mach_write_to_8(page + DICT_HDR + DICT_HDR_ROW_ID, dval);
+
+ return(ptr);
+}
+