summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarko Mäkelä <marko.makela@mariadb.com>2020-03-06 15:21:57 +0200
committerMarko Mäkelä <marko.makela@mariadb.com>2020-03-06 15:21:57 +0200
commit0939ff30faf3ba22277a7c64a7c0a62b55bad7d4 (patch)
tree16c58e7793eadbb79ce7646ebd6df520de28c907
parent35f0e686d834390e19d0c7fc3b0279bab9994ba7 (diff)
downloadmariadb-git-10.5-marko.tar.gz
WIP clean up log upgrade10.5-marko
FIXME: innodb.log_corruption fails with result diff, and fails to delete ib_logfile1. But it no longer crashes!
-rw-r--r--extra/mariabackup/xtrabackup.cc62
-rw-r--r--storage/innobase/include/log0crypt.h20
-rw-r--r--storage/innobase/include/log0log.h74
-rw-r--r--storage/innobase/include/log0log.ic64
-rw-r--r--storage/innobase/include/log0recv.h23
-rw-r--r--storage/innobase/log/log0crypt.cc182
-rw-r--r--storage/innobase/log/log0log.cc2
-rw-r--r--storage/innobase/log/log0recv.cc598
-rw-r--r--storage/innobase/srv/srv0start.cc53
9 files changed, 432 insertions, 646 deletions
diff --git a/extra/mariabackup/xtrabackup.cc b/extra/mariabackup/xtrabackup.cc
index 866c44f1e39..518c2873a87 100644
--- a/extra/mariabackup/xtrabackup.cc
+++ b/extra/mariabackup/xtrabackup.cc
@@ -2663,17 +2663,14 @@ static lsn_t xtrabackup_copy_log(lsn_t start_lsn, lsn_t end_lsn, bool last)
recv_sys_justify_left_parsing_buf();
- log_sys.log.scanned_lsn = scanned_lsn;
+ recv_sys.scanned_lsn = scanned_lsn;
end_lsn = last
? ut_uint64_align_up(scanned_lsn, OS_FILE_LOG_BLOCK_SIZE)
: scanned_lsn & ~lsn_t(OS_FILE_LOG_BLOCK_SIZE - 1);
if (ulint write_size = ulint(end_lsn - start_lsn)) {
- if (srv_encrypt_log) {
- log_crypt(log_sys.buf, start_lsn, write_size);
- }
-
+ ut_ad(!srv_encrypt_log); // FIXME
if (ds_write(dst_log_file, log_sys.buf, write_size)) {
msg("Error: write to logfile failed");
return(0);
@@ -2733,7 +2730,7 @@ static bool xtrabackup_copy_logfile(bool last = false)
}
} while (start_lsn == end_lsn);
- ut_ad(start_lsn == log_sys.log.scanned_lsn);
+ ut_ad(start_lsn == recv_sys.scanned_lsn);
msg(">> log scanned up to (" LSN_PF ")", start_lsn);
@@ -3853,7 +3850,7 @@ static void stop_backup_threads()
static bool xtrabackup_backup_low()
{
ut_ad(!metadata_to_lsn);
-
+#if 0 // FIXME
/* read the latest checkpoint lsn */
{
ulint max_cp_field;
@@ -3877,7 +3874,7 @@ static bool xtrabackup_backup_low()
}
log_mutex_exit();
}
-
+#endif
stop_backup_threads();
if (metadata_to_lsn && xtrabackup_copy_logfile(true)) {
@@ -4046,8 +4043,6 @@ fail:
}
{
- /* definition from recv_recovery_from_checkpoint_start() */
- ulint max_cp_field;
/* start back ground thread to copy newer log */
os_thread_id_t log_copying_thread_id;
@@ -4056,9 +4051,12 @@ fail:
/* Look for the latest checkpoint from any of the log groups */
log_mutex_enter();
-
+#if 0
reread_log_header:
dberr_t err = recv_find_max_checkpoint(&max_cp_field);
+#else
+ dberr_t err = DB_FAIL; // FIXME
+#endif
if (err != DB_SUCCESS) {
msg("Error: cannot read redo log header");
@@ -4072,19 +4070,19 @@ reread_log_header:
goto fail;
}
- byte* buf = log_sys.buf;
checkpoint_lsn_start = log_sys.log.get_lsn();
checkpoint_no_start = log_sys.next_checkpoint_no;
+#if 0 // FIXME
+ byte* buf = log_sys.buf;
log_sys.log.main_read(max_cp_field, {buf, OS_FILE_LOG_BLOCK_SIZE});
-
if (checkpoint_no_start != mach_read_from_8(buf + LOG_CHECKPOINT_NO)
|| checkpoint_lsn_start
!= mach_read_from_8(buf + LOG_CHECKPOINT_LSN)
|| log_sys.log.get_lsn_offset()
!= mach_read_from_8(buf + LOG_CHECKPOINT_OFFSET))
goto reread_log_header;
-
+#endif
log_mutex_exit();
xtrabackup_init_datasinks();
@@ -4112,36 +4110,18 @@ reread_log_header:
}
/* label it */
- alignas(OS_FILE_LOG_BLOCK_SIZE) byte log_hdr_buf[LOG_MAIN_FILE_SIZE];
- memset(log_hdr_buf, 0, sizeof log_hdr_buf);
+ alignas(OS_FILE_LOG_BLOCK_SIZE) byte log_hdr[512];
+ memset(log_hdr, 0, sizeof log_hdr);
- byte *log_hdr_field = log_hdr_buf;
- mach_write_to_4(log_header::FORMAT + log_hdr_field,
+ mach_write_to_4(log_header::FORMAT + log_hdr,
log_sys.log.format);
- mach_write_to_4(log_header::KEY_VERSION + log_hdr_field,
+ mach_write_to_4(log_header::KEY_VERSION + log_hdr,
log_sys.log.key_version);
- strcpy(reinterpret_cast<char*>(log_header::CREATOR + log_hdr_field),
- "Backup " MYSQL_SERVER_VERSION);
- log_block_set_checksum(log_hdr_field,
- log_block_calc_checksum_crc32(log_hdr_field));
-
- /* copied from log_group_checkpoint() */
- log_hdr_field +=
- (log_sys.next_checkpoint_no & 1) ? LOG_CHECKPOINT_2 : LOG_CHECKPOINT_1;
- /* The least significant bits of LOG_CHECKPOINT_OFFSET must be
- stored correctly in the copy of the LOG_FILE_NAME. The most significant
- bits, which identify the start offset of the log block in the file,
- we did choose freely, as LOG_FILE_HDR_SIZE. */
- ut_ad(!((log_sys.log.get_lsn() ^ checkpoint_lsn_start)
- & (OS_FILE_LOG_BLOCK_SIZE - 1)));
- /* Adjust the checkpoint page. */
- memcpy(log_hdr_field, log_sys.buf, OS_FILE_LOG_BLOCK_SIZE);
- mach_write_to_8(log_hdr_field + LOG_CHECKPOINT_OFFSET,
- (checkpoint_lsn_start & (OS_FILE_LOG_BLOCK_SIZE - 1)));
- log_block_set_checksum(log_hdr_field,
- log_block_calc_checksum_crc32(log_hdr_field));
-
- if (ds_write(dst_log_main_file, log_hdr_buf, sizeof(log_hdr_buf))) {
+ strcpy(reinterpret_cast<char*>(log_header::CREATOR + log_hdr),
+ "Backup " MYSQL_SERVER_VERSION);
+ mach_write_to_4(&log_hdr[512 - 4], ut_crc32(log_hdr, 512 -4));
+
+ if (ds_write(dst_log_main_file, log_hdr, sizeof(log_hdr))) {
msg("error: write to main log file failed");
goto fail;
}
diff --git a/storage/innobase/include/log0crypt.h b/storage/innobase/include/log0crypt.h
index c1fe84d8aad..7f77ecc206e 100644
--- a/storage/innobase/include/log0crypt.h
+++ b/storage/innobase/include/log0crypt.h
@@ -46,30 +46,24 @@ uint32_t log_crypt_key_version();
/** Read the MariaDB 10.1 checkpoint crypto (version, msg and iv) info.
@param[in] buf checkpoint buffer
@return whether the operation was successful */
-UNIV_INTERN
-bool
-log_crypt_101_read_checkpoint(const byte* buf);
+ATTRIBUTE_COLD bool log_crypt_101_read_checkpoint(const byte* buf);
/** Decrypt a MariaDB 10.1 redo log block.
@param[in,out] buf log block
@param[in] start_lsn server start LSN
@return whether the decryption was successful */
-bool log_crypt_101_read_block(byte* buf, lsn_t start_lsn);
+ATTRIBUTE_COLD bool log_crypt_101_read_block(byte* buf, lsn_t start_lsn);
/** Read the checkpoint crypto (version, msg and iv) info.
@param[in] buf checkpoint buffer
@return whether the operation was successful */
-UNIV_INTERN
-bool
-log_crypt_read_checkpoint_buf(const byte* buf);
+ATTRIBUTE_COLD bool log_crypt_read_checkpoint_buf(const byte* buf);
-/** Encrypt or decrypt log blocks.
-@param[in,out] buf log blocks to encrypt or decrypt
+/** Decrypt a log block when upgrading from MariaDB 10.2.5 to 10.5.1.
+@param[in,out] buf 512-byte log block to decrypt
@param[in] lsn log sequence number of the start of the buffer
-@param[in] size size of the buffer, in bytes
-@param[in] decrypt whether to decrypt, instead of encrypting
-@return whether the operation succeeded (encrypt always does) */
-bool log_crypt(byte* buf, lsn_t lsn, ulint size, bool decrypt= false);
+@return whether the operation succeeded */
+ATTRIBUTE_COLD bool log_decrypt_10_4(byte* buf, lsn_t lsn);
/** Encrypt or decrypt a temporary file block.
@param[in] src block to encrypt or decrypt
diff --git a/storage/innobase/include/log0log.h b/storage/innobase/include/log0log.h
index a3a04d8762a..19b4bac786d 100644
--- a/storage/innobase/include/log0log.h
+++ b/storage/innobase/include/log0log.h
@@ -159,27 +159,6 @@ objects! */
void
log_check_margins(void);
-/** Calculate the CRC-32C checksum of a log block.
-@param[in] block log block
-@return checksum */
-inline ulint log_block_calc_checksum_crc32(const byte* block);
-
-/************************************************************//**
-Gets a log block checksum field value.
-@return checksum */
-UNIV_INLINE
-ulint
-log_block_get_checksum(
-/*===================*/
- const byte* log_block); /*!< in: log block */
-/************************************************************//**
-Sets a log block checksum field value. */
-UNIV_INLINE
-void
-log_block_set_checksum(
-/*===================*/
- byte* log_block, /*!< in/out: log block */
- ulint checksum); /*!< in: checksum */
/******************************************************//**
Prints info of the log. */
void
@@ -192,34 +171,6 @@ void
log_refresh_stats(void);
/*===================*/
-#define LOG_BLOCK_KEY 4 /* encryption key version
- before LOG_BLOCK_CHECKSUM;
- in log_t::FORMAT_ENC_10_4 only */
-#define LOG_BLOCK_CHECKSUM 4 /* CRC-32C of the ib_logfile0
- header, or pre-10.5.2 log block
- contents */
-
-/** Offsets inside the checkpoint pages (redo log format version 1) @{ */
-/** Checkpoint number */
-#define LOG_CHECKPOINT_NO 0
-/** Log sequence number up to which all changes have been flushed */
-#define LOG_CHECKPOINT_LSN 8
-/** Byte offset of the log record corresponding to LOG_CHECKPOINT_LSN */
-#define LOG_CHECKPOINT_OFFSET 16
-/** srv_log_buffer_size at the time of the checkpoint (not used) */
-#define LOG_CHECKPOINT_LOG_BUF_SIZE 24
-/** MariaDB 10.2.5 encrypted redo log encryption key version (32 bits)*/
-#define LOG_CHECKPOINT_CRYPT_KEY 32
-/** MariaDB 10.2.5 encrypted redo log random nonce (32 bits) */
-#define LOG_CHECKPOINT_CRYPT_NONCE 36
-/** MariaDB 10.2.5 encrypted redo log random message (MY_AES_BLOCK_SIZE) */
-#define LOG_CHECKPOINT_CRYPT_MESSAGE 40
-/** start LSN of the MLOG_CHECKPOINT mini-transaction corresponding
-to this checkpoint, or 0 if the information has not been written */
-#define LOG_CHECKPOINT_END_LSN OS_FILE_LOG_BLOCK_SIZE - 16
-
-/* @} */
-
/** Offsets of a log file header */
namespace log_header
{
@@ -262,8 +213,6 @@ namespace log_header
#define LOG_CHECKPOINT_2 (3 * OS_FILE_LOG_BLOCK_SIZE)
/* second checkpoint field in the log
header */
-/** size of LOG_FILE_NAME (header + checkpoints */
-constexpr size_t LOG_MAIN_FILE_SIZE= 4 * OS_FILE_LOG_BLOCK_SIZE;
typedef ib_mutex_t LogSysMutex;
typedef ib_mutex_t FlushOrderMutex;
@@ -391,6 +340,10 @@ private:
This must hold if lsn - last_checkpoint_lsn > max_checkpoint_age. */
std::atomic<bool> check_flush_or_checkpoint_;
public:
+#if 0
+ /** The sequence bit of the next record to write */
+ bool sequence_bit;
+#endif
MY_ALIGNED(CACHE_LINE_SIZE)
LogSysMutex mutex; /*!< mutex protecting the log */
@@ -442,14 +395,11 @@ public:
log_file_t fd;
public:
- /** used only in recovery: recovery scan succeeded up to this
- lsn in this log group */
- lsn_t scanned_lsn;
-
/** opens log files which must be closed prior this call */
void open_files(std::string path);
/** renames log file */
dberr_t main_rename(std::string path) { return fd.rename(path); }
+ os_offset_t main_file_size() const { return fd_offset; }
/** reads from main log files */
void main_read(os_offset_t offset, span<byte> buf);
/** writes buffer to log file
@@ -489,7 +439,11 @@ public:
@param[in] lsn log sequence number
@return offset within the log */
inline lsn_t calc_lsn_offset(lsn_t lsn) const;
- lsn_t calc_lsn_offset_old(lsn_t lsn) const;
+ /** Calculate the offset of a log sequence number
+ in an old redo log file (during upgrade check).
+ @param[in] lsn log sequence number
+ @return byte offset within the log */
+ inline lsn_t calc_lsn_offset_old(lsn_t lsn) const;
/** Set the field values to correspond to a given lsn. */
void set_fields(lsn_t lsn)
@@ -614,14 +568,6 @@ public:
void set_check_flush_or_checkpoint(bool flag= true)
{ check_flush_or_checkpoint_.store(flag, std::memory_order_relaxed); }
- /** @return the log block trailer offset */
- unsigned trailer_offset() const
- {
- return log.format == FORMAT_ENC_10_4
- ? OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_CHECKSUM - LOG_BLOCK_KEY
- : OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_CHECKSUM;
- }
-
size_t get_pending_flushes() const
{
return pending_flushes.load(std::memory_order_relaxed);
diff --git a/storage/innobase/include/log0log.ic b/storage/innobase/include/log0log.ic
index b3bc10620f0..f3a3d2aa1d9 100644
--- a/storage/innobase/include/log0log.ic
+++ b/storage/innobase/include/log0log.ic
@@ -28,70 +28,6 @@ Created 12/9/1995 Heikki Tuuri
#include "srv0mon.h"
#include "ut0crc32.h"
-/** Calculate the checksum for a log block using the pre-5.7.9 algorithm.
-@param[in] block log block
-@return checksum */
-UNIV_INLINE
-ulint
-log_block_calc_checksum_format_0(
- const byte* block)
-{
- ulint sum;
- ulint sh;
- ulint i;
-
- sum = 1;
- sh = 0;
-
- for (i = 0; i < OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_CHECKSUM; i++) {
- ulint b = (ulint) block[i];
- sum &= 0x7FFFFFFFUL;
- sum += b;
- sum += b << sh;
- sh++;
- if (sh > 24) {
- sh = 0;
- }
- }
-
- return(sum);
-}
-
-/** Calculate the CRC-32C checksum of a log block.
-@param[in] block log block
-@return checksum */
-inline ulint log_block_calc_checksum_crc32(const byte* block)
-{
- return ut_crc32(block, OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_CHECKSUM);
-}
-
-/************************************************************//**
-Gets a log block checksum field value.
-@return checksum */
-UNIV_INLINE
-ulint
-log_block_get_checksum(
-/*===================*/
- const byte* log_block) /*!< in: log block */
-{
- return(mach_read_from_4(log_block + OS_FILE_LOG_BLOCK_SIZE
- - LOG_BLOCK_CHECKSUM));
-}
-
-/************************************************************//**
-Sets a log block checksum field value. */
-UNIV_INLINE
-void
-log_block_set_checksum(
-/*===================*/
- byte* log_block, /*!< in/out: log block */
- ulint checksum) /*!< in: checksum */
-{
- mach_write_to_4(log_block + OS_FILE_LOG_BLOCK_SIZE
- - LOG_BLOCK_CHECKSUM,
- checksum);
-}
-
/***********************************************************************//**
Checks if there is need for a log buffer flush or a new checkpoint, and does
this if yes. Any database operation should call this when it has modified
diff --git a/storage/innobase/include/log0recv.h b/storage/innobase/include/log0recv.h
index 91b0cd7cc1f..371f492c564 100644
--- a/storage/innobase/include/log0recv.h
+++ b/storage/innobase/include/log0recv.h
@@ -149,7 +149,7 @@ struct recv_dblwr_t {
/** the recovery state and buffered records for a page */
struct page_recv_t
{
- /** Recovery state */
+ /** Recovery state; protected by recv_sys.mutex */
enum
{
/** not yet processed */
@@ -216,10 +216,14 @@ struct page_recv_t
};
/** Recovery system data structure */
-struct recv_sys_t{
- ib_mutex_t mutex; /*!< mutex protecting the fields apply_log_recs,
- n_addrs, and the state field in each recv_addr
- struct */
+struct recv_sys_t
+{
+ /** mutex protecting apply_log_recs and page_recv_t::state */
+ ib_mutex_t mutex;
+ /** whether recv_recover_page(), invoked from buf_page_io_complete(),
+ should apply log records*/
+ bool apply_log_recs;
+
ib_mutex_t writer_mutex;/*!< mutex coordinating
flushing between recv_writer_thread and
the recovery thread. */
@@ -230,9 +234,6 @@ struct recv_sys_t{
buf_flush_t flush_type;/*!< type of the flush request.
BUF_FLUSH_LRU: flush end of LRU, keeping free blocks.
BUF_FLUSH_LIST: flush all of blocks. */
- /** whether recv_recover_page(), invoked from buf_page_io_complete(),
- should apply log records*/
- bool apply_log_recs;
/** whether recv_apply_hashed_log_recs() is running */
bool apply_batch_on;
byte* buf; /*!< buffer for parsing log records */
@@ -246,9 +247,6 @@ struct recv_sys_t{
lsn_t scanned_lsn;
/*!< the log data has been scanned up to this
lsn */
- ulint scanned_checkpoint_no;
- /*!< the log data has been scanned up to this
- checkpoint number (lowest 4 bytes) */
ulint recovered_offset;
/*!< start offset of non-parsed log records in
buf */
@@ -266,6 +264,9 @@ struct recv_sys_t{
/** the time when progress was last reported */
time_t progress_time;
+ /** The sequence bit of the next record to parse */
+ bool sequence_bit;
+
using map = std::map<const page_id_t, page_recv_t,
std::less<const page_id_t>,
ut_allocator<std::pair<const page_id_t, page_recv_t>>>;
diff --git a/storage/innobase/log/log0crypt.cc b/storage/innobase/log/log0crypt.cc
index f3d66d3a221..044b56c0f0c 100644
--- a/storage/innobase/log/log0crypt.cc
+++ b/storage/innobase/log/log0crypt.cc
@@ -34,22 +34,15 @@ MDEV-11782: Rewritten for MariaDB 10.2 by Marko Mäkelä, MariaDB Corporation.
/** innodb_encrypt_log: whether to encrypt the redo log */
my_bool srv_encrypt_log;
-struct aes_block_t {
- byte bytes[MY_AES_BLOCK_SIZE];
-};
-
struct crypt_info_t {
ulint checkpoint_no; /*!< checkpoint no; 32 bits */
uint key_version; /*!< mysqld key version */
/** random string for encrypting the key */
- aes_block_t crypt_msg;
+ alignas(8) byte crypt_msg[MY_AES_BLOCK_SIZE];
/** the secret key */
- aes_block_t crypt_key;
+ alignas(8) byte crypt_key[MY_AES_BLOCK_SIZE];
/** a random string for the per-block initialization vector */
- union {
- uint32_t word;
- byte bytes[4];
- } crypt_nonce;
+ alignas(4) byte crypt_nonce[4];
};
/** The crypt info */
@@ -88,7 +81,7 @@ static bool init_crypt_key(crypt_info_t* info, bool upgrade = false)
byte mysqld_key[MY_AES_MAX_KEY_LENGTH];
uint keylen = sizeof mysqld_key;
- compile_time_assert(16 == sizeof info->crypt_key.bytes);
+ compile_time_assert(16 == sizeof info->crypt_key);
compile_time_assert(16 == MY_AES_BLOCK_SIZE);
if (uint rc = encryption_key_get(log_t::KEY_ID,
@@ -111,8 +104,8 @@ static bool init_crypt_key(crypt_info_t* info, bool upgrade = false)
uint dst_len;
int err= my_aes_crypt(MY_AES_ECB,
ENCRYPTION_FLAG_NOPAD | ENCRYPTION_FLAG_ENCRYPT,
- info->crypt_msg.bytes, MY_AES_BLOCK_SIZE,
- info->crypt_key.bytes, &dst_len,
+ info->crypt_msg, MY_AES_BLOCK_SIZE,
+ info->crypt_key, &dst_len,
mysqld_key, keylen, NULL, 0);
if (err != MY_AES_OK || dst_len != MY_AES_BLOCK_SIZE) {
@@ -124,86 +117,49 @@ static bool init_crypt_key(crypt_info_t* info, bool upgrade = false)
return true;
}
-/** Encrypt or decrypt log blocks.
-@param[in,out] buf log blocks to encrypt or decrypt
-@param[in] lsn log sequence number of the start of the buffer
-@param[in] size size of the buffer, in bytes
-@param[in] decrypt whether to decrypt, instead of encrypting
-@return whether the operation succeeded (encrypt always does) */
-bool log_crypt(byte* buf, lsn_t lsn, ulint size, bool decrypt)
+/** Decrypt a log block when upgrading from MariaDB 10.2.5 to 10.5.1.
+@param[in,out] buf 512-byte log block to decrypt
+@param[in] lsn log sequence number of the start of the buffer
+@return whether the operation succeeded */
+ATTRIBUTE_COLD bool log_decrypt_10_4(byte* buf, lsn_t lsn)
{
- ut_ad(size % OS_FILE_LOG_BLOCK_SIZE == 0);
- buf = my_assume_aligned<OS_FILE_LOG_BLOCK_SIZE>(buf);
- ut_a(info.key_version);
-
- uint32_t aes_ctr_iv[MY_AES_BLOCK_SIZE / sizeof(uint32_t)];
- compile_time_assert(sizeof(uint32_t) == 4);
-
-#define LOG_CRYPT_HDR_SIZE 4
- lsn &= ~lsn_t(OS_FILE_LOG_BLOCK_SIZE - 1);
-
- for (const byte* const end = buf + size; buf != end;
- buf += OS_FILE_LOG_BLOCK_SIZE, lsn += OS_FILE_LOG_BLOCK_SIZE) {
- uint32_t dst[(OS_FILE_LOG_BLOCK_SIZE - LOG_CRYPT_HDR_SIZE
- - LOG_BLOCK_CHECKSUM)
- / sizeof(uint32_t)];
-
- /* The log block number is not encrypted. */
- *aes_ctr_iv =
-#ifdef WORDS_BIGENDIAN
- 0x7FFFFFFFU
-#else
- 0x7FU
-#endif
- & (*dst = *reinterpret_cast<const uint32_t*>(buf));
- aes_ctr_iv[1] = info.crypt_nonce.word;
- mach_write_to_8(reinterpret_cast<byte*>(aes_ctr_iv + 2), lsn);
- const uint dst_size
- = log_sys.log.format == log_t::FORMAT_ENC_10_4
- ? sizeof dst - LOG_BLOCK_KEY
- : sizeof dst;
- if (!decrypt) {
- ut_ad(log_sys.is_physical());
- } else if (UNIV_UNLIKELY(log_sys.log.format
- == log_t::FORMAT_ENC_10_4)) {
- const uint key_version = info.key_version;
- info.key_version = mach_read_from_4(
- OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_KEY
- - LOG_BLOCK_CHECKSUM + buf);
- if (key_version != info.key_version
- && !init_crypt_key(&info)) {
- return false;
- }
-#ifndef DBUG_OFF
- if (key_version != info.key_version) {
- DBUG_PRINT("ib_log", ("key_version: %x -> %x",
- key_version,
- info.key_version));
- }
-#endif /* !DBUG_OFF */
- }
-
- ut_ad(LOG_CRYPT_HDR_SIZE + dst_size
- == log_sys.trailer_offset());
-
- uint dst_len;
- int rc = encryption_crypt(
- buf + LOG_CRYPT_HDR_SIZE, dst_size,
- reinterpret_cast<byte*>(dst), &dst_len,
- const_cast<byte*>(info.crypt_key.bytes),
- MY_AES_BLOCK_SIZE,
- reinterpret_cast<byte*>(aes_ctr_iv), sizeof aes_ctr_iv,
- decrypt
- ? ENCRYPTION_FLAG_DECRYPT | ENCRYPTION_FLAG_NOPAD
- : ENCRYPTION_FLAG_ENCRYPT | ENCRYPTION_FLAG_NOPAD,
- log_t::KEY_ID,
- info.key_version);
- ut_a(rc == MY_AES_OK);
- ut_a(dst_len == dst_size);
- memcpy(buf + LOG_CRYPT_HDR_SIZE, dst, dst_size);
- }
-
- return true;
+ buf= my_assume_aligned<512>(buf);
+ ut_ad(info.key_version);
+
+ alignas(8) byte aes_ctr_iv[MY_AES_BLOCK_SIZE];
+ constexpr uint LOG_CRYPT_HDR_SIZE= 4;
+ alignas(4) byte dst[512 - LOG_CRYPT_HDR_SIZE - 4];
+
+ /* The log block number is not encrypted. */
+ memcpy_aligned<4>(dst, buf, 4);
+ memcpy_aligned<4>(aes_ctr_iv, buf, 4);
+ *aes_ctr_iv&= 0x7f;
+ memcpy_aligned<4>(aes_ctr_iv + 4, info.crypt_nonce, 4);
+ mach_write_to_8(my_assume_aligned<8>(aes_ctr_iv + 8), lsn);
+ uint dst_size= sizeof dst;
+
+ if (log_sys.log.format == log_t::FORMAT_ENC_10_4)
+ {
+ dst_size-= 4;
+ const uint key_version= info.key_version;
+ info.key_version = mach_read_from_4(512 - 4 - 4 + buf);
+ if (key_version != info.key_version && !init_crypt_key(&info))
+ return false;
+ }
+
+ uint dst_len;
+ int rc= encryption_crypt(buf + LOG_CRYPT_HDR_SIZE, dst_size,
+ reinterpret_cast<byte*>(dst), &dst_len,
+ const_cast<byte*>(info.crypt_key),
+ MY_AES_BLOCK_SIZE,
+ reinterpret_cast<byte*>(aes_ctr_iv),
+ sizeof aes_ctr_iv,
+ ENCRYPTION_FLAG_DECRYPT | ENCRYPTION_FLAG_NOPAD,
+ log_t::KEY_ID, info.key_version);
+ ut_a(rc == MY_AES_OK);
+ ut_a(dst_len == dst_size);
+ memcpy(buf + LOG_CRYPT_HDR_SIZE, dst, dst_size);
+ return true;
}
/** Initialize the redo log encryption key and random parameters
@@ -218,9 +174,9 @@ bool log_crypt_init()
if (info.key_version == ENCRYPTION_KEY_VERSION_INVALID)
ib::error() << "log_crypt_init(): cannot get key version";
else if (my_random_bytes(tmp_iv, MY_AES_BLOCK_SIZE) != MY_AES_OK ||
- my_random_bytes(info.crypt_msg.bytes, sizeof info.crypt_msg) !=
+ my_random_bytes(info.crypt_msg, sizeof info.crypt_msg) !=
MY_AES_OK ||
- my_random_bytes(info.crypt_nonce.bytes, sizeof info.crypt_nonce) !=
+ my_random_bytes(info.crypt_nonce, sizeof info.crypt_nonce) !=
MY_AES_OK)
ib::error() << "log_crypt_init(): my_random_bytes() failed";
else if (init_crypt_key(&info))
@@ -241,9 +197,7 @@ uint32_t log_crypt_key_version()
/** Read the MariaDB 10.1 checkpoint crypto (version, msg and iv) info.
@param[in] buf checkpoint buffer
@return whether the operation was successful */
-UNIV_INTERN
-bool
-log_crypt_101_read_checkpoint(const byte* buf)
+ATTRIBUTE_COLD bool log_crypt_101_read_checkpoint(const byte* buf)
{
buf += 20 + 32 * 9;
@@ -265,9 +219,8 @@ log_crypt_101_read_checkpoint(const byte* buf)
infos_used++;
info.checkpoint_no = checkpoint_no;
info.key_version = mach_read_from_4(buf + 4);
- memcpy(info.crypt_msg.bytes, buf + 8, MY_AES_BLOCK_SIZE);
- memcpy(info.crypt_nonce.bytes, buf + 24,
- sizeof info.crypt_nonce);
+ memcpy(info.crypt_msg, buf + 8, MY_AES_BLOCK_SIZE);
+ memcpy(info.crypt_nonce, buf + 24, sizeof info.crypt_nonce);
if (!init_crypt_key(&info, true)) {
return false;
@@ -283,10 +236,8 @@ next_slot:
@param[in,out] buf log block
@param[in] start_lsn server start LSN
@return whether the decryption was successful */
-bool log_crypt_101_read_block(byte* buf, lsn_t start_lsn)
+ATTRIBUTE_COLD bool log_crypt_101_read_block(byte* buf, lsn_t start_lsn)
{
- ut_ad(log_block_calc_checksum_format_0(buf)
- != log_block_get_checksum(buf));
const uint32_t checkpoint_no = mach_read_from_4(buf + 8);
const crypt_info_t* info = infos;
for (const crypt_info_t* const end = info + infos_used; info < end;
@@ -315,7 +266,7 @@ found:
/* The log block header is not encrypted. */
memcpy(dst, buf, 12);
- memcpy(aes_ctr_iv, info->crypt_nonce.bytes, 3);
+ memcpy(aes_ctr_iv, info->crypt_nonce, 3);
mach_write_to_8(aes_ctr_iv + 3,
log_block_get_start_lsn(start_lsn, log_block_no));
memcpy(aes_ctr_iv + 11, buf, 4);
@@ -324,7 +275,7 @@ found:
int rc = encryption_crypt(buf + 12, src_len,
dst + 12, &dst_len,
- const_cast<byte*>(info->crypt_key.bytes),
+ const_cast<byte*>(info->crypt_key),
MY_AES_BLOCK_SIZE,
aes_ctr_iv, MY_AES_BLOCK_SIZE,
ENCRYPTION_FLAG_DECRYPT
@@ -340,12 +291,19 @@ found:
return true;
}
+/** Checkpoint number */
+constexpr uint LOG_CHECKPOINT_NO= 0;
+/** MariaDB 10.2.5 encrypted redo log encryption key version (32 bits)*/
+constexpr uint LOG_CHECKPOINT_CRYPT_KEY= 32;
+/** MariaDB 10.2.5 encrypted redo log random nonce (32 bits) */
+constexpr uint LOG_CHECKPOINT_CRYPT_NONCE= 36;
+/** MariaDB 10.2.5 encrypted redo log random message (MY_AES_BLOCK_SIZE) */
+constexpr uint LOG_CHECKPOINT_CRYPT_MESSAGE= 40;
+
/** Read the checkpoint crypto (version, msg and iv) info.
@param[in] buf checkpoint buffer
@return whether the operation was successful */
-UNIV_INTERN
-bool
-log_crypt_read_checkpoint_buf(const byte* buf)
+ATTRIBUTE_COLD bool log_crypt_read_checkpoint_buf(const byte* buf)
{
info.checkpoint_no = mach_read_from_4(buf + (LOG_CHECKPOINT_NO + 4));
info.key_version = mach_read_from_4(buf + LOG_CHECKPOINT_CRYPT_KEY);
@@ -353,15 +311,15 @@ log_crypt_read_checkpoint_buf(const byte* buf)
#if MY_AES_BLOCK_SIZE != 16
# error "MY_AES_BLOCK_SIZE != 16; redo log checkpoint format affected"
#endif
- compile_time_assert(16 == sizeof info.crypt_msg.bytes);
+ compile_time_assert(16 == sizeof info.crypt_msg);
compile_time_assert(16 == MY_AES_BLOCK_SIZE);
compile_time_assert(LOG_CHECKPOINT_CRYPT_MESSAGE
- LOG_CHECKPOINT_CRYPT_NONCE
== sizeof info.crypt_nonce);
- memcpy(info.crypt_msg.bytes, buf + LOG_CHECKPOINT_CRYPT_MESSAGE,
+ memcpy(info.crypt_msg, buf + LOG_CHECKPOINT_CRYPT_MESSAGE,
MY_AES_BLOCK_SIZE);
- memcpy(info.crypt_nonce.bytes, buf + LOG_CHECKPOINT_CRYPT_NONCE,
+ memcpy(info.crypt_nonce, buf + LOG_CHECKPOINT_CRYPT_NONCE,
sizeof info.crypt_nonce);
return init_crypt_key(&info);
@@ -390,7 +348,7 @@ log_tmp_block_encrypt(
int rc = encryption_crypt(
src, uint(size), dst, &dst_len,
- const_cast<byte*>(info.crypt_key.bytes), MY_AES_BLOCK_SIZE,
+ const_cast<byte*>(info.crypt_key), MY_AES_BLOCK_SIZE,
reinterpret_cast<byte*>(iv), uint(sizeof iv),
encrypt
? ENCRYPTION_FLAG_ENCRYPT|ENCRYPTION_FLAG_NOPAD
diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc
index dd702c69530..34c82759f95 100644
--- a/storage/innobase/log/log0log.cc
+++ b/storage/innobase/log/log0log.cc
@@ -1503,7 +1503,7 @@ std::vector<std::string> get_existing_log_files_paths() {
dberr_t create_data_file(os_offset_t size)
{
- ut_ad(size > LOG_MAIN_FILE_SIZE);
+ ut_ad(size >= 512);
const auto path= get_log_file_path(LOG_DATA_FILE_NAME);
os_file_delete_if_exists(innodb_log_file_key, path.c_str(), nullptr);
diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc
index f66b8b4aa39..76f840d314a 100644
--- a/storage/innobase/log/log0recv.cc
+++ b/storage/innobase/log/log0recv.cc
@@ -1022,7 +1022,6 @@ void recv_sys_t::create()
len = 0;
parse_start_lsn = 0;
scanned_lsn = 0;
- scanned_checkpoint_no = 0;
recovered_offset = 0;
recovered_lsn = 0;
found_corrupt_log = false;
@@ -1218,8 +1217,8 @@ fail:
break;
}
- ulint crc = log_block_calc_checksum_crc32(buf);
- ulint cksum = log_block_get_checksum(buf);
+ uint32_t crc = ut_crc32(buf, 512 - 4);
+ uint32_t cksum = mach_read_from_4(&buf[512 - 4]);
DBUG_EXECUTE_IF("log_intermittent_checksum_mismatch", {
static int block_counter;
@@ -1237,13 +1236,6 @@ fail:
goto fail;
}
- if ((is_physical()
- ? is_encrypted_physical()
- : is_encrypted_old())
- && !log_crypt(buf, *start_lsn,
- OS_FILE_LOG_BLOCK_SIZE, true)) {
- goto fail;
- }
#if 0// FIXME
ulint dl = log_block_get_data_len(buf);
if (dl < LOG_BLOCK_HDR_SIZE
@@ -1270,18 +1262,6 @@ fail:
}
-/** Check the consistency of a log header block.
-@param[in] log header block
-@return true if ok */
-static
-bool
-recv_check_log_header_checksum(
- const byte* buf)
-{
- return(log_block_get_checksum(buf)
- == log_block_calc_checksum_crc32(buf));
-}
-
static bool redo_file_sizes_are_correct()
{
auto paths= get_existing_log_files_paths();
@@ -1304,15 +1284,32 @@ static bool redo_file_sizes_are_correct()
return false;
}
+/** Calculate the checksum for a log block using the pre-10.2.2 algorithm.
+@param[in]	block	512-byte log block; the last 4 bytes (where the
+			stored checksum lives) are not covered
+@return the checksum, which the caller compares with the 32-bit value
+read from the last 4 bytes of the block */
+inline uint32_t log_block_calc_checksum_format_0(const byte *block)
+{
+  uint32_t sum= 1;
+
+  /* The shift count must cycle through 0..24 (25 distinct values),
+  exactly as in the implementation that wrote the block. Resetting it
+  one step earlier (at 24) would compute a different value for every
+  block and make each valid old redo log look corrupted. */
+  for (ulint i= 0, sh= 0; i < 512 - 4; i++)
+  {
+    ulint b= ulint{block[i]};
+    /* Only the low 31 bits are carried over between iterations. */
+    sum&= 0x7FFFFFFFUL;
+    sum+= b;
+    sum+= b << sh++;
+    if (sh > 24)
+      sh= 0;
+  }
+
+  return sum;
+}
+
/** Determine if a redo log from before MariaDB 10.2.2 is clean.
@return error code
@retval DB_SUCCESS if the redo log is clean
@retval DB_CORRUPTION if the redo log is corrupted
@retval DB_ERROR if the redo log is not empty */
-static dberr_t recv_log_recover_pre_10_2()
+ATTRIBUTE_COLD static dberr_t recv_log_recover_pre_10_2()
{
uint64_t max_no= 0;
- uint64_t checkpoint_no;
byte *buf= log_sys.buf;
ut_ad(log_sys.log.format == 0);
@@ -1348,17 +1345,17 @@ static dberr_t recv_log_recover_pre_10_2()
continue;
}
- checkpoint_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO);
-
if (!log_crypt_101_read_checkpoint(buf))
{
ib::error() << "Decrypting checkpoint failed";
continue;
}
+ const uint64_t checkpoint_no= mach_read_from_8(buf);
+
DBUG_PRINT("ib_log", ("checkpoint " UINT64PF " at " LSN_PF " found",
checkpoint_no,
- mach_read_from_8(buf + LOG_CHECKPOINT_LSN)));
+ mach_read_from_8(buf + CHECKPOINT_LSN)));
if (checkpoint_no >= max_no)
{
@@ -1390,7 +1387,8 @@ static dberr_t recv_log_recover_pre_10_2()
recv_sys.read(source_offset & ~511, {buf, 512});
- if (log_block_calc_checksum_format_0(buf) != log_block_get_checksum(buf) &&
+ if (log_block_calc_checksum_format_0(buf) !=
+ mach_read_from_4(&buf[512 - 4]) &&
!log_crypt_101_read_block(buf, lsn))
{
ib::error() << NO_UPGRADE_RECOVERY_MSG << ", and it appears corrupted.";
@@ -1401,8 +1399,7 @@ static dberr_t recv_log_recover_pre_10_2()
{
/* Mark the redo log for upgrading. */
srv_log_file_size= 0;
- recv_sys.parse_start_lsn= recv_sys.recovered_lsn= recv_sys.scanned_lsn=
- lsn;
+ recv_sys.parse_start_lsn= recv_sys.scanned_lsn= lsn;
log_sys.last_checkpoint_lsn= log_sys.next_checkpoint_lsn=
log_sys.write_lsn= log_sys.current_flush_lsn= lsn;
log_sys.next_checkpoint_no= 0;
@@ -1418,11 +1415,13 @@ static dberr_t recv_log_recover_pre_10_2()
return DB_ERROR;
}
-/** Same as cals_lsn_offset() except that it supports multiple files */
-lsn_t log_t::file::calc_lsn_offset_old(lsn_t lsn) const
+/** Calculate the offset of a log sequence number
+in an old redo log file (during upgrade check).
+@param[in] lsn log sequence number
+@return byte offset within the log */
+inline lsn_t log_t::file::calc_lsn_offset_old(lsn_t lsn) const
{
constexpr size_t LOG_FILE_HDR_SIZE= 2048;
- ut_ad(log_sys.mutex.is_owned() || log_write_lock_own());
const lsn_t size= (file_size - LOG_FILE_HDR_SIZE) * recv_sys.files_size();
lsn_t l= lsn - this->lsn;
if (longlong(l) < 0)
@@ -1440,198 +1439,237 @@ lsn_t log_t::file::calc_lsn_offset_old(lsn_t lsn) const
@retval DB_SUCCESS if the redo log is clean
@retval DB_CORRUPTION if the redo log is corrupted
@retval DB_ERROR if the redo log is not empty */
-static dberr_t recv_log_recover_10_4()
+ATTRIBUTE_COLD static dberr_t recv_log_recover_10_4() /* invoked by recv_check() for the FORMAT_10_2, FORMAT_10_3 and FORMAT_10_4 tags (optionally encrypted) */
{
- const lsn_t lsn = log_sys.log.get_lsn();
- const lsn_t source_offset = log_sys.log.calc_lsn_offset_old(lsn);
- byte* buf = log_sys.buf;
+ uint64_t max_no= 0; /* largest checkpoint number seen so far */
+ byte *buf= log_sys.buf; /* scratch buffer for every 512-byte read below */
+ lsn_t lsn= 0; /* LSN of the newest valid checkpoint; stays 0 if none is found */
- if (!redo_file_sizes_are_correct()) {
- return DB_CORRUPTION;
- }
+ /** Byte position of the checkpoint LSN field within a checkpoint block */
+ constexpr uint CHECKPOINT_LSN= 8;
+ /** Byte position of the field that holds the log file offset corresponding to CHECKPOINT_LSN */
+ constexpr uint CHECKPOINT_OFFSET= 16;
+ /** Byte position of the start LSN of the MLOG_CHECKPOINT mini-transaction corresponding
+ to this checkpoint; the stored value is 0 if the information has not been written */
+ constexpr uint CHECKPOINT_END_LSN= 512 - 16;
- log_sys.log.main_read(source_offset & ~(OS_FILE_LOG_BLOCK_SIZE - 1),
- {buf, OS_FILE_LOG_BLOCK_SIZE});
-
- ulint crc = log_block_calc_checksum_crc32(buf);
- ulint cksum = log_block_get_checksum(buf);
-
- if (crc != cksum) {
- ib::error() << "Invalid log block checksum."
- << " block: "
- << (mach_read_from_4(buf) & 0x7FFFFFFF)
- << " checkpoint no: "
- << mach_read_from_4(buf + 8)
- << " expected: " << crc
- << " found: " << cksum;
- return DB_CORRUPTION;
- }
+ for (ulint field= LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2; /* visits exactly the two checkpoint blocks */
+ field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1)
+ {
+ log_sys.log.main_read(field, {buf, OS_FILE_LOG_BLOCK_SIZE});
- if (log_sys.log.is_encrypted_old()
- && !log_crypt(buf, lsn & (OS_FILE_LOG_BLOCK_SIZE - 1),
- OS_FILE_LOG_BLOCK_SIZE, true)) {
- return DB_ERROR;
- }
+ const uint32_t crc32= ut_crc32(buf, 512 - 4); /* checksum computed over the first 508 bytes */
+ const uint32_t cksum= mach_read_from_4(&buf[512 - 4]); /* checksum stored in the last 4 bytes */
- /* On a clean shutdown, the redo log will be logically empty
- after the checkpoint lsn. */
+ if (crc32 != cksum)
+ {
+ DBUG_PRINT("ib_log",
+ ("invalid checkpoint, at " ULINTPF
+ ", checksum %x expected %x", field, cksum, crc32));
+ continue; /* ignore this block; the other one may still be valid */
+ }
- if (mach_read_from_2(buf + 4/* LOG_BLOCK_HDR_DATA_LEN */)
- != (source_offset & (OS_FILE_LOG_BLOCK_SIZE - 1))) {
- return DB_ERROR;
- }
+ if (log_sys.is_encrypted_old() && !log_crypt_read_checkpoint_buf(buf))
+ {
+ ib::error() << "Reading checkpoint encryption info failed.";
+ continue;
+ }
- /* Mark the redo log for upgrading. */
- srv_log_file_size = 0;
- recv_sys.parse_start_lsn = recv_sys.recovered_lsn
- = recv_sys.scanned_lsn = lsn;
- log_sys.last_checkpoint_lsn = log_sys.next_checkpoint_lsn
- = log_sys.write_lsn = log_sys.current_flush_lsn = lsn;
- log_sys.next_checkpoint_no = 0;
- return DB_SUCCESS;
-}
+ const lsn_t checkpoint_lsn= mach_read_from_8(buf + CHECKPOINT_LSN);
+ const lsn_t end_lsn= mach_read_from_8(buf + CHECKPOINT_END_LSN);
+ if (end_lsn && end_lsn < checkpoint_lsn) /* a nonzero end LSN must not precede the checkpoint LSN */
+ continue;
-/** Find the latest checkpoint in the log header.
-@param[out] max_field LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2
-@return error code or DB_SUCCESS */
-dberr_t
-recv_find_max_checkpoint(ulint* max_field)
-{
- ib_uint64_t max_no;
- ib_uint64_t checkpoint_no;
- ulint field;
- byte* buf;
+ uint64_t checkpoint_no= mach_read_from_8(buf); /* the checkpoint number is the first 8 bytes of the block */
- max_no = 0;
- *max_field = 0;
+ DBUG_PRINT("ib_log", ("checkpoint " UINT64PF " at " LSN_PF " found",
+ checkpoint_no, checkpoint_lsn));
- ut_ad(!(log_sys.log.file_size & 511));
+ if (checkpoint_no >= max_no) /* >= means that on a tie the block read later wins */
+ {
+ max_no= checkpoint_no;
+ lsn= checkpoint_lsn;
+ log_sys.log.set_lsn(lsn);
+ log_sys.log.set_lsn_offset(mach_read_from_8(buf + CHECKPOINT_OFFSET));
+ log_sys.next_checkpoint_no= checkpoint_no;
+ }
+ }
- buf = log_sys.buf;
+ if (!lsn) /* neither checkpoint block was accepted */
+ {
+ /* Before 10.2.2, we could get here during database initialization
+ if we created a LOG_FILE_NAME file that was filled with zeroes,
+ and were killed. After 10.2.2, we would reject such a file already
+ earlier, when checking the file header. */
+ ib::error() << "No valid checkpoint found (corrupted redo log).";
+ return DB_ERROR;
+ }
- log_sys.log.main_read(0, {buf, OS_FILE_LOG_BLOCK_SIZE});
- /* Check the header page checksum. There was no
- checksum in the first redo log format (version 0). */
- log_sys.log.format = mach_read_from_4(buf + log_header::FORMAT);
- if (log_sys.is_physical()) {
- log_sys.log.key_version = mach_read_from_4(
- buf + log_header::KEY_VERSION);
- } else {
- log_sys.log.key_version = 0;
- }
+ log_sys.set_lsn(lsn);
+ log_sys.set_flushed_lsn(lsn);
+ const lsn_t source_offset= log_sys.log.calc_lsn_offset_old(lsn); /* byte offset of the checkpoint LSN in the old-format log */
- if (log_sys.log.format != log_t::FORMAT_3_23
- && !recv_check_log_header_checksum(buf)) {
- ib::error() << "Invalid redo log header checksum.";
- return(DB_CORRUPTION);
- }
+ if (!redo_file_sizes_are_correct())
+ return DB_CORRUPTION;
- char creator[log_header::CREATOR_END - log_header::CREATOR + 1];
-
- memcpy(creator, buf + log_header::CREATOR, sizeof creator);
- /* Ensure that the string is NUL-terminated. */
- creator[log_header::CREATOR_END - log_header::CREATOR] = 0;
-
- switch (log_sys.log.format) {
- case log_t::FORMAT_3_23:
- return recv_log_recover_pre_10_2();
- case log_t::FORMAT_10_2:
- case log_t::FORMAT_10_2 | log_t::FORMAT_ENCRYPTED:
- case log_t::FORMAT_10_3:
- case log_t::FORMAT_10_3 | log_t::FORMAT_ENCRYPTED:
- case log_t::FORMAT_10_4:
- case log_t::FORMAT_10_4 | log_t::FORMAT_ENCRYPTED:
- break;
- case log_t::FORMAT_10_5:
- if (auto size = mach_read_from_8(buf + log_header::SIZE)) {
- size &= ~(1ULL << 47);
- if (size == log_sys.log.file_size) {
- goto current;
- }
+ log_sys.log.main_read(source_offset & ~511, {buf, 512}); /* read the 512-byte block that contains source_offset */
- ib::error() << "Inconsistent redo log size: "
- << size << "!=" << log_sys.log.file_size;
- }
- /* fall through */
- default:
- ib::error() << "Unsupported redo log format."
- " The redo log was created with " << creator << ".";
- return(DB_ERROR);
- }
+ const uint32_t crc= ut_crc32(buf, 512 - 4); /* same layout as above: CRC of 508 bytes, stored in the last 4 */
+ const uint32_t cksum= mach_read_from_4(&buf[512 - 4]);
- for (field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
- field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) {
- log_sys.log.main_read(field, {buf, OS_FILE_LOG_BLOCK_SIZE});
+ if (crc != cksum)
+ {
+ ib::error() << "Invalid log block checksum. block: "
+ << (mach_read_from_4(buf) & 0x7FFFFFFF)
+ << " checkpoint no: "
+ << mach_read_from_4(buf + 8)
+ << " expected: " << crc << " found: " << cksum;
+ return DB_CORRUPTION;
+ }
- const ulint crc32 = log_block_calc_checksum_crc32(buf);
- const ulint cksum = log_block_get_checksum(buf);
+ if (log_sys.is_encrypted_old() && !log_decrypt_10_4(buf, lsn & ~511)) /* the checksum was verified before decrypting */
+ return DB_ERROR;
- if (crc32 != cksum) {
- DBUG_PRINT("ib_log",
- ("invalid checkpoint,"
- " at " ULINTPF
- ", checksum " ULINTPFx
- " expected " ULINTPFx,
- field, cksum, crc32));
- continue;
- }
+ /* On a clean shutdown, the redo log will be logically empty
+ after the checkpoint lsn. */
- if ((log_sys.is_physical()
- ? log_sys.is_encrypted_physical()
- : log_sys.is_encrypted_old())
- && !log_crypt_read_checkpoint_buf(buf)) {
- ib::error() << "Reading checkpoint"
- " encryption info failed.";
- continue;
- }
+ if (mach_read_from_2(buf + 4) != (source_offset & 511)) /* buf + 4 was annotated LOG_BLOCK_HDR_DATA_LEN in the removed code */
+ return DB_ERROR;
- checkpoint_no = mach_read_from_8(
- buf + LOG_CHECKPOINT_NO);
-
- DBUG_PRINT("ib_log",
- ("checkpoint " UINT64PF " at " LSN_PF " found",
- checkpoint_no, mach_read_from_8(
- buf + LOG_CHECKPOINT_LSN)));
-
- if (checkpoint_no >= max_no) {
- *max_field = field;
- max_no = checkpoint_no;
- log_sys.log.set_lsn(mach_read_from_8(
- buf + LOG_CHECKPOINT_LSN));
- log_sys.log.set_lsn_offset(mach_read_from_8(
- buf + LOG_CHECKPOINT_OFFSET));
- log_sys.next_checkpoint_no = checkpoint_no;
- }
- }
+ /* Mark the redo log for upgrading. */
+ srv_log_file_size= 0;
+ recv_sys.parse_start_lsn= recv_sys.scanned_lsn= lsn;
+ log_sys.last_checkpoint_lsn= log_sys.next_checkpoint_lsn=
+ log_sys.write_lsn= log_sys.current_flush_lsn= lsn;
+ log_sys.next_checkpoint_no= 0; /* overrides the checkpoint number stored by the loop above */
+ return DB_SUCCESS;
+}
- if (*max_field == 0) {
- /* Before 10.2.2, we could get here during database
- initialization if we created an LOG_FILE_NAME file that
- was filled with zeroes, and were killed. After
- 10.2.2, we would reject such a file already earlier,
- when checking the file header. */
- ib::error() << "No valid checkpoint found"
- " (corrupted redo log)."
- " You can try --innodb-force-recovery=6"
- " as a last resort.";
- return(DB_ERROR);
- }
+/** Determine if the redo log is clean.
+Reads the header block of the main log file and dispatches on its format
+tag. For the current (FORMAT_10_5) format, it additionally validates the
+checkpoint record stored in the last 19 bytes of the main file and then
+inspects the record at the checkpointed offset of the data file: if a
+valid record is found there, the log is not empty.
+@return error code
+@retval DB_SUCCESS if the redo log is clean
+@retval DB_CORRUPTION if the redo log is corrupted
+@retval DB_ERROR if the redo log is not empty and cannot be upgraded
+@retval DB_FAIL if crash recovery is needed */
+static dberr_t recv_check()
+{
+ byte *buf= log_sys.buf;
- if (dberr_t err = recv_log_recover_10_4()) {
- ib::error()
- << "Upgrade after a crash is not supported."
- " The redo log was created with " << creator
- << (err == DB_ERROR
- ? "." : ", and it appears corrupted.");
- return err;
- }
+ ut_ad(!(log_sys.log.file_size & 511));
+ log_sys.log.main_read(0, {buf, OS_FILE_LOG_BLOCK_SIZE});
+ /* Check the header page checksum. There was no checksum in the
+ first redo log format (version 0). */
+ log_sys.log.format= mach_read_from_4(buf + log_header::FORMAT);
+ log_sys.log.key_version= 0;
- return(DB_SUCCESS);
-current:
- /* TODO: Seek to the end of the file, read & validate the
- last checkpoint_size bytes. If it is valid and points to
- the end of the log, fine. Else, start crash recovery. */
- return DB_SUCCESS;
+ if (log_sys.log.format != log_t::FORMAT_3_23 &&
+ ut_crc32(buf, 512 - 4) != mach_read_from_4(&buf[512 - 4]))
+ {
+ ib::error() << "Invalid redo log header checksum.";
+ return DB_CORRUPTION;
+ }
+
+ char creator[log_header::CREATOR_END - log_header::CREATOR + 1];
+ memcpy(creator, buf + log_header::CREATOR, sizeof creator);
+ /* Ensure that the string is NUL-terminated. */
+ creator[log_header::CREATOR_END - log_header::CREATOR] = 0;
+
+ switch (log_sys.log.format) {
+ dberr_t err;
+ case log_t::FORMAT_3_23:
+ return recv_log_recover_pre_10_2();
+ case log_t::FORMAT_10_2:
+ case log_t::FORMAT_10_2 | log_t::FORMAT_ENCRYPTED:
+ case log_t::FORMAT_10_3:
+ case log_t::FORMAT_10_3 | log_t::FORMAT_ENCRYPTED:
+ case log_t::FORMAT_10_4:
+ case log_t::FORMAT_10_4 | log_t::FORMAT_ENCRYPTED:
+ err= recv_log_recover_10_4();
+ if (err != DB_SUCCESS)
+ ib::error() << "Upgrade after a crash is not supported."
+ " The redo log was created with " << creator
+ << (err == DB_ERROR
+ ? "." : ", and it appears corrupted.");
+ return err;
+ case log_t::FORMAT_10_5:
+ log_sys.log.key_version= mach_read_from_4(buf + log_header::KEY_VERSION);
+
+ if (auto size= mach_read_from_8(buf + log_header::SIZE))
+ {
+ size &= ~(1ULL << 47);
+ if (size == log_sys.log.file_size)
+ break;
+ ib::error() << "Inconsistent redo log size: "
+ << size << "!=" << log_sys.log.file_size;
+ }
+ /* A zero or mismatching size is reported as an unsupported format. */
+ /* fall through */
+ default:
+ ib::error() << "Unsupported redo log format."
+ " The redo log was created with " << creator << ".";
+ return DB_ERROR;
+ }
+
+ /* TODO: Seek to the end of the file, read & validate the
+ last checkpoint_size bytes. If it is valid and points to
+ the end of the log, fine. Else, start crash recovery. */
+ /* The main file must hold the 512-byte header plus the 19-byte
+ checkpoint record at its very end: 1 type/length byte, 8 bytes of LSN,
+ 6 bytes of data file offset, and a 4-byte CRC-32 of those 15 bytes. */
+ if (log_sys.log.main_file_size() < 512 + 19)
+ return DB_CORRUPTION;
+
+ log_sys.log.main_read(log_sys.log.main_file_size() - 19, {buf, 19});
+ if (buf[0] != (FILE_CHECKPOINT | (8 + 6)))
+ return DB_FAIL;
+ if (mach_read_from_4(buf + 19 - 4) != ut_crc32(buf, 15))
+ return DB_FAIL;
+
+ log_sys.last_checkpoint_lsn= log_sys.next_checkpoint_lsn=
+ recv_sys.parse_start_lsn= recv_sys.scanned_lsn=
+ log_sys.write_lsn= log_sys.current_flush_lsn=
+ mach_read_from_8(buf + 1);
+ log_sys.set_lsn(log_sys.last_checkpoint_lsn);
+ log_sys.set_flushed_lsn(log_sys.last_checkpoint_lsn);
+
+ /* Bit 47 of the stored offset carries the sequence bit. */
+ os_offset_t data_file_offset= mach_read_from_6(buf + 1 + 8);
+ recv_sys.sequence_bit= !!(data_file_offset & (1ULL << 47));
+ data_file_offset&= ~(1ULL << 47);
+
+ if (data_file_offset >= log_sys.log.file_size)
+ /* The checkpoint record passed its checksum but points outside the
+ data file. NOTE(review): the original author questioned whether a
+ non-corrupted checkpoint can carry such an offset. */
+ return DB_FAIL;
+
+ /* Build the alignment mask in os_offset_t so that it cannot truncate
+ a 64-bit offset, whatever the type of OS_FILE_LOG_BLOCK_SIZE is. */
+ const os_offset_t block_mask= os_offset_t(OS_FILE_LOG_BLOCK_SIZE) - 1;
+ os_offset_t first_block_offset= data_file_offset & ~block_mask;
+ log_sys.log.data_read(first_block_offset, {buf, OS_FILE_LOG_BLOCK_SIZE});
+
+ os_offset_t offset_in_block= data_file_offset & block_mask;
+ byte *record= buf + offset_in_block;
+ /* NOTE(review): the variable-length header is decoded from the first
+ block only; a header that straddles the block boundary would be
+ decoded from bytes that have not been read yet. TODO confirm. */
+ auto decoded_header= mlog_decode_varint(record);
+ auto header_size= mlog_decode_varint_length(record[0]);
+
+ /* Bit 0 is the sequence bit, bit 1 the skip bit, the rest the size. */
+ os_offset_t size= decoded_header >> 2;
+
+ /* The record occupies header_size + size bytes and ends with a 4-byte
+ checksum, so it must be at least 4 bytes long and must fit in the file
+ including its header. */
+ if (size < 4 || (decoded_header & 1) != recv_sys.sequence_bit ||
+ data_file_offset + header_size + size > log_sys.log.file_size)
+ return DB_SUCCESS; /* Garbage at the end of the log */
+
+ if (decoded_header & 2) /* skip_bit is set: we must read more */
+ return DB_FAIL;
+
+ /* Number of whole blocks that cover the record, counted from
+ first_block_offset and rounded up; this is always at least 1. */
+ const os_offset_t record_end= data_file_offset + header_size + size;
+ const os_offset_t n_blocks=
+ (record_end - first_block_offset + block_mask) /
+ OS_FILE_LOG_BLOCK_SIZE;
+
+ /* NOTE(review): nothing here checks that n_blocks blocks fit in
+ log_sys.buf; verify against the buffer size. */
+ if (os_offset_t further_blocks= n_blocks - 1)
+ log_sys.log.data_read(first_block_offset + OS_FILE_LOG_BLOCK_SIZE,
+ {buf + OS_FILE_LOG_BLOCK_SIZE,
+ further_blocks * OS_FILE_LOG_BLOCK_SIZE});
+
+ /* Clear the sequence bit before calculating the checksum. */
+ record[header_size - 1] &= ~1;
+ /* The checksum covers everything before the trailing 4 checksum bytes,
+ like the checkpoint record check above. */
+ if (mach_read_from_4(record + header_size + size - 4) !=
+ ut_crc32(record, header_size + size - 4))
+ return DB_SUCCESS; /* Garbage at the end of the log */
+
+ return DB_FAIL; /* a valid record follows the checkpoint */
}
/** Trim old log records for a page.
@@ -2205,7 +2243,6 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
ut_ad(l->lsn);
ut_ad(end_lsn <= l->lsn);
end_lsn = l->lsn;
- ut_ad(end_lsn <= log_sys.log.scanned_lsn);
ut_ad(l->start_lsn);
ut_ad(recv_start_lsn <= l->start_lsn);
@@ -2711,7 +2748,7 @@ bool recv_sys_add_to_parsing_buf(const byte* log_block, lsn_t scanned_lsn)
start_offset = data_len - more_len;
- end_offset = std::min<ulint>(data_len, log_sys.trailer_offset());
+ end_offset = data_len; // trailer_offset
ut_ad(start_offset <= end_offset);
@@ -2738,6 +2775,7 @@ void recv_sys_justify_left_parsing_buf()
recv_sys.recovered_offset = 0;
}
+#if 0//FIXME
/** Scan redo log from a buffer and stores new log data to the parsing buffer.
Parse and hash the log records if new data found.
Apply log records automatically when the hash table becomes full.
@@ -2934,31 +2972,26 @@ func_exit:
mutex_exit(&recv_sys.mutex);
return(finished);
}
+#endif
-/** Scans log from a buffer and stores new log data to the parsing buffer.
-Parses and hashes the log records if new data found.
-@param[in] checkpoint_lsn latest checkpoint log sequence number
+/** Parse and store log.
@return the last parsed LSN */
-static lsn_t recv_group_scan_log_recs(lsn_t checkpoint_lsn)
+static lsn_t recv_scan()
{
- DBUG_ENTER("recv_group_scan_log_recs");
+ DBUG_ENTER("recv_scan");
mutex_enter(&recv_sys.mutex);
recv_sys.len = 0;
recv_sys.recovered_offset = 0;
recv_sys.clear();
- recv_sys.parse_start_lsn =
- recv_sys.scanned_lsn =
- recv_sys.recovered_lsn = checkpoint_lsn;
- recv_sys.scanned_checkpoint_no = 0;
ut_ad(recv_max_page_lsn == 0);
mutex_exit(&recv_sys.mutex);
- lsn_t start_lsn;
- lsn_t end_lsn;
+ lsn_t end_lsn= 0;
store_t store = STORE_IF_EXISTS;
- log_sys.log.scanned_lsn = end_lsn =
+#if 0//FIXME
+ end_lsn =
ut_uint64_align_down(checkpoint_lsn, OS_FILE_LOG_BLOCK_SIZE);
do {
@@ -2969,14 +3002,13 @@ static lsn_t recv_group_scan_log_recs(lsn_t checkpoint_lsn)
} while (end_lsn != start_lsn
&& !recv_scan_log_recs(&store, log_sys.buf, checkpoint_lsn,
start_lsn, end_lsn,
- &log_sys.log.scanned_lsn));
-
+ &recv_sys.scanned_lsn));
+#endif
if (recv_sys.found_corrupt_log || recv_sys.found_corrupt_fs) {
DBUG_RETURN(false);
}
- DBUG_PRINT("ib_log", ("scan " LSN_PF " completed",
- log_sys.log.scanned_lsn));
+ DBUG_PRINT("ib_log", ("scan " LSN_PF " completed", end_lsn));
DBUG_RETURN(store == STORE_NO);
}
@@ -3159,12 +3191,6 @@ of first system tablespace page
dberr_t
recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
{
- ulint max_cp_field;
- lsn_t checkpoint_lsn;
- ib_uint64_t checkpoint_no;
- byte* buf;
- dberr_t err = DB_SUCCESS;
-
ut_ad(srv_operation == SRV_OPERATION_NORMAL
|| srv_operation == SRV_OPERATION_RESTORE
|| srv_operation == SRV_OPERATION_RESTORE_EXPORT);
@@ -3184,49 +3210,21 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
return(DB_SUCCESS);
}
- recv_recovery_on = true;
-
- log_mutex_enter();
-
- err = recv_find_max_checkpoint(&max_cp_field);
-
- if (err != DB_SUCCESS) {
- recv_sys.recovered_lsn = log_sys.get_lsn();
-err_exit:
- log_mutex_exit();
- return(err);
- }
-
- buf = log_sys.buf;
- log_sys.log.main_read(max_cp_field, {buf, OS_FILE_LOG_BLOCK_SIZE});
-
- checkpoint_lsn = mach_read_from_8(buf + LOG_CHECKPOINT_LSN);
- checkpoint_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO);
-
- /* Start reading the log from the checkpoint lsn. */
-
ut_ad(RECV_SCAN_SIZE <= srv_log_buffer_size);
-
ut_ad(recv_sys.pages.empty());
-
- switch (log_sys.log.format) {
- case 0:
- log_mutex_exit();
- return DB_SUCCESS;
+ recv_recovery_on = true;
+ log_mutex_enter();
+ dberr_t err = recv_check();
+ recv_sys.recovered_lsn= log_sys.get_lsn();
+ log_mutex_exit();
+ switch (err) {
default:
- if (const lsn_t end_lsn = mach_read_from_8(
- buf + LOG_CHECKPOINT_END_LSN)) {
- if (end_lsn < checkpoint_lsn) {
- recv_sys.found_corrupt_log = true;
- err = DB_ERROR;
- goto err_exit;
- }
- }
- log_sys.set_lsn(recv_sys.recovered_lsn);
- ut_ad(recv_sys.recovered_lsn == checkpoint_lsn);
- goto completed;
- case log_t::FORMAT_10_5:
- lsn_t end_lsn = recv_group_scan_log_recs(checkpoint_lsn);
+ return err;
+ case DB_SUCCESS:
+ break;
+ case DB_FAIL:
+ ib::error() << "FIXME: crash recovery does not work yet";
+ lsn_t end_lsn= recv_scan();
#if 1// FIXME
/* The first scan should not have stored or applied any
records. */
@@ -3236,7 +3234,9 @@ err_exit:
if (srv_read_only_mode && recv_needed_recovery) {
err = DB_READ_ONLY;
- goto err_exit;
+err_exit:
+ log_mutex_exit();
+ return err;
}
if (recv_sys.found_corrupt_log && !srv_force_recovery) {
@@ -3244,50 +3244,11 @@ err_exit:
err = DB_ERROR;
goto err_exit;
}
- }
-
- /* NOTE: we always do a 'recovery' at startup, but only if
- there is something wrong we will print a message to the
- user about recovery: */
-
- if (flush_lsn == checkpoint_lsn) {
- /* The redo log is logically empty. */
- } else if (checkpoint_lsn != flush_lsn) {
- ut_ad(!srv_log_file_created);
-
- if (checkpoint_lsn < flush_lsn) {
- ib::warn()
- << "Are you sure you are using the right "
- << LOG_FILE_NAME
- << " to start up the database? Log sequence "
- "number in the "
- << LOG_FILE_NAME << " is " << checkpoint_lsn
- << ", less than the log sequence number in "
- "the first system tablespace file header, "
- << flush_lsn << ".";
- }
-
- if (!recv_needed_recovery) {
- ib::info()
- << "The log sequence number " << flush_lsn
- << " in the system tablespace does not match"
- " the log sequence number "
- << checkpoint_lsn << " in the "
- << LOG_FILE_NAME << "!";
-
- if (srv_read_only_mode) {
- ib::error() << "innodb_read_only"
- " prevents crash recovery";
- log_mutex_exit();
- return(DB_READ_ONLY);
- }
- recv_needed_recovery = true;
- }
+ log_sys.set_lsn(end_lsn);
+ break;
}
- log_sys.set_lsn(recv_sys.recovered_lsn);
-
#if 0// MDEV-14425 TODO
if (recv_needed_recovery) {
bool missing_tablespace = false;
@@ -3296,7 +3257,6 @@ err_exit:
rescan, missing_tablespace);
if (err != DB_SUCCESS) {
- log_mutex_exit();
return(err);
}
@@ -3326,7 +3286,6 @@ err_exit:
rescan, missing_tablespace);
if (err != DB_SUCCESS) {
- log_mutex_exit();
return err;
}
@@ -3337,21 +3296,16 @@ err_exit:
}
#endif
- if (log_sys.log.scanned_lsn < checkpoint_lsn
- || log_sys.log.scanned_lsn < recv_max_page_lsn) {
-
+ if (recv_sys.scanned_lsn < recv_max_page_lsn) {
ib::error() << "We scanned the log up to "
- << log_sys.log.scanned_lsn
- << ". A checkpoint was at " << checkpoint_lsn << " and"
- " the maximum LSN on a database page was "
+ << recv_sys.scanned_lsn
+ << ". The maximum LSN on a database page was "
<< recv_max_page_lsn << ". It is possible that the"
" database is now corrupt!";
}
-completed:
+#if 0// FIXME
if (recv_sys.recovered_lsn < checkpoint_lsn) {
- log_mutex_exit();
-
ib::error() << "Recovered only to lsn:"
<< recv_sys.recovered_lsn << " checkpoint_lsn: " << checkpoint_lsn;
@@ -3378,15 +3332,11 @@ completed:
log_sys.last_checkpoint_lsn = checkpoint_lsn;
log_sys.next_checkpoint_no = ++checkpoint_no;
+#endif
mutex_enter(&recv_sys.mutex);
-
recv_sys.apply_log_recs = true;
-
mutex_exit(&recv_sys.mutex);
-
- log_mutex_exit();
-
recv_lsn_checks_on = true;
/* The database is now ready to start almost normal processing of user
diff --git a/storage/innobase/srv/srv0start.cc b/storage/innobase/srv/srv0start.cc
index d2e52f894b1..c8edeaaba1c 100644
--- a/storage/innobase/srv/srv0start.cc
+++ b/storage/innobase/srv/srv0start.cc
@@ -324,7 +324,7 @@ static dberr_t create_log_file(lsn_t lsn, std::string& logfile0)
sizeof log_header::CREATOR_CURRENT);
static_assert(log_header::CREATOR_END - log_header::CREATOR ==
sizeof log_header::CREATOR_CURRENT, "compatibility");
- log_block_set_checksum(buf, log_block_calc_checksum_crc32(buf));
+ mach_write_to_4(&buf[512 - 4], ut_crc32(buf, 512 - 4));
buf+= 512;
/* Write FILE_ID records for any non-predefined tablespaces. */
@@ -1109,17 +1109,17 @@ static lsn_t srv_prepare_to_delete_redo_log_file(bool old_exists)
DBUG_RETURN(flushed_lsn);
}
-/** Tries to locate LOG_FILE_NAME and check it's size, etc
+/** Tries to locate log files and check their size, etc
@param[out] log_file_found returns true here if correct file was found
-@return dberr_t with DB_SUCCESS or some error */
-static dberr_t find_and_check_log_file(bool &log_file_found)
+@return DB_SUCCESS or some error */
+static dberr_t find_and_check_log(bool &log_file_found)
{
log_file_found= false;
auto logfile0= get_log_file_path();
os_file_stat_t stat_info;
- const dberr_t err= os_file_get_status(logfile0.c_str(), &stat_info, false,
- srv_read_only_mode);
+ dberr_t err= os_file_get_status(logfile0.c_str(), &stat_info, false,
+ srv_read_only_mode);
auto is_operation_restore= []() -> bool {
return srv_operation == SRV_OPERATION_RESTORE ||
@@ -1140,24 +1140,45 @@ static dberr_t find_and_check_log_file(bool &log_file_found)
if (!srv_file_check_mode(logfile0.c_str()))
return DB_ERROR;
- const os_offset_t size= stat_info.size;
- ut_a(size != (os_offset_t) -1);
+ ut_a(stat_info.size != (os_offset_t) -1);
- if (size < OS_FILE_LOG_BLOCK_SIZE)
+ if (stat_info.size == 0 && is_operation_restore())
{
- ib::error() << "Log file " << logfile0 << " size " << size
+ /* Tolerate an empty "ib_logfile0" from a previous run of
+ mariabackup --prepare. */
+ return DB_NOT_FOUND;
+ }
+
+ if (stat_info.size < 512)
+ {
+ ib::error() << "Log file " << logfile0 << " size " << stat_info.size
<< " is too small";
return DB_ERROR;
}
- if (size == 0 && is_operation_restore())
+ auto logdata= get_log_file_path(LOG_DATA_FILE_NAME);
+
+ err= os_file_get_status(logdata.c_str(), &stat_info, false,
+ srv_read_only_mode);
+ if (err == DB_NOT_FOUND)
{
- /* Tolerate an empty LOG_FILE_NAME from a previous run of
- mariabackup --prepare. */
- return DB_NOT_FOUND;
+ if (is_operation_restore())
+ return DB_NOT_FOUND;
+
+ return DB_SUCCESS;
+ }
+
+ ut_a(stat_info.size != (os_offset_t) -1);
+
+ if (!stat_info.size || (stat_info.size & 511))
+ {
+ ib::error() << "Log file " << logdata << " size " << stat_info.size
+ << " is incorrect";
+ return DB_ERROR;
}
- srv_log_file_size= size;
+ srv_log_file_size= stat_info.size;
+ log_sys.log.file_size= stat_info.size;
log_file_found= true;
return DB_SUCCESS;
@@ -1443,7 +1464,7 @@ dberr_t srv_start(bool create_new_db)
srv_log_file_size = 0;
bool log_file_found;
- if (dberr_t err = find_and_check_log_file(log_file_found)) {
+ if (dberr_t err = find_and_check_log(log_file_found)) {
if (err == DB_NOT_FOUND) {
return DB_SUCCESS;
}