diff options
author | Marko Mäkelä <marko.makela@mariadb.com> | 2020-03-06 15:21:57 +0200 |
---|---|---|
committer | Marko Mäkelä <marko.makela@mariadb.com> | 2020-03-06 15:21:57 +0200 |
commit | 0939ff30faf3ba22277a7c64a7c0a62b55bad7d4 (patch) | |
tree | 16c58e7793eadbb79ce7646ebd6df520de28c907 | |
parent | 35f0e686d834390e19d0c7fc3b0279bab9994ba7 (diff) | |
download | mariadb-git-10.5-marko.tar.gz |
WIP clean up log upgrade (branch: 10.5-marko)
FIXME: innodb.log_corruption fails with result diff,
and fails to delete ib_logfile1. But it no longer crashes!
-rw-r--r-- | extra/mariabackup/xtrabackup.cc | 62 | ||||
-rw-r--r-- | storage/innobase/include/log0crypt.h | 20 | ||||
-rw-r--r-- | storage/innobase/include/log0log.h | 74 | ||||
-rw-r--r-- | storage/innobase/include/log0log.ic | 64 | ||||
-rw-r--r-- | storage/innobase/include/log0recv.h | 23 | ||||
-rw-r--r-- | storage/innobase/log/log0crypt.cc | 182 | ||||
-rw-r--r-- | storage/innobase/log/log0log.cc | 2 | ||||
-rw-r--r-- | storage/innobase/log/log0recv.cc | 598 | ||||
-rw-r--r-- | storage/innobase/srv/srv0start.cc | 53 |
9 files changed, 432 insertions, 646 deletions
diff --git a/extra/mariabackup/xtrabackup.cc b/extra/mariabackup/xtrabackup.cc index 866c44f1e39..518c2873a87 100644 --- a/extra/mariabackup/xtrabackup.cc +++ b/extra/mariabackup/xtrabackup.cc @@ -2663,17 +2663,14 @@ static lsn_t xtrabackup_copy_log(lsn_t start_lsn, lsn_t end_lsn, bool last) recv_sys_justify_left_parsing_buf(); - log_sys.log.scanned_lsn = scanned_lsn; + recv_sys.scanned_lsn = scanned_lsn; end_lsn = last ? ut_uint64_align_up(scanned_lsn, OS_FILE_LOG_BLOCK_SIZE) : scanned_lsn & ~lsn_t(OS_FILE_LOG_BLOCK_SIZE - 1); if (ulint write_size = ulint(end_lsn - start_lsn)) { - if (srv_encrypt_log) { - log_crypt(log_sys.buf, start_lsn, write_size); - } - + ut_ad(!srv_encrypt_log); // FIXME if (ds_write(dst_log_file, log_sys.buf, write_size)) { msg("Error: write to logfile failed"); return(0); @@ -2733,7 +2730,7 @@ static bool xtrabackup_copy_logfile(bool last = false) } } while (start_lsn == end_lsn); - ut_ad(start_lsn == log_sys.log.scanned_lsn); + ut_ad(start_lsn == recv_sys.scanned_lsn); msg(">> log scanned up to (" LSN_PF ")", start_lsn); @@ -3853,7 +3850,7 @@ static void stop_backup_threads() static bool xtrabackup_backup_low() { ut_ad(!metadata_to_lsn); - +#if 0 // FIXME /* read the latest checkpoint lsn */ { ulint max_cp_field; @@ -3877,7 +3874,7 @@ static bool xtrabackup_backup_low() } log_mutex_exit(); } - +#endif stop_backup_threads(); if (metadata_to_lsn && xtrabackup_copy_logfile(true)) { @@ -4046,8 +4043,6 @@ fail: } { - /* definition from recv_recovery_from_checkpoint_start() */ - ulint max_cp_field; /* start back ground thread to copy newer log */ os_thread_id_t log_copying_thread_id; @@ -4056,9 +4051,12 @@ fail: /* Look for the latest checkpoint from any of the log groups */ log_mutex_enter(); - +#if 0 reread_log_header: dberr_t err = recv_find_max_checkpoint(&max_cp_field); +#else + dberr_t err = DB_FAIL; // FIXME +#endif if (err != DB_SUCCESS) { msg("Error: cannot read redo log header"); @@ -4072,19 +4070,19 @@ reread_log_header: goto fail; } 
- byte* buf = log_sys.buf; checkpoint_lsn_start = log_sys.log.get_lsn(); checkpoint_no_start = log_sys.next_checkpoint_no; +#if 0 // FIXME + byte* buf = log_sys.buf; log_sys.log.main_read(max_cp_field, {buf, OS_FILE_LOG_BLOCK_SIZE}); - if (checkpoint_no_start != mach_read_from_8(buf + LOG_CHECKPOINT_NO) || checkpoint_lsn_start != mach_read_from_8(buf + LOG_CHECKPOINT_LSN) || log_sys.log.get_lsn_offset() != mach_read_from_8(buf + LOG_CHECKPOINT_OFFSET)) goto reread_log_header; - +#endif log_mutex_exit(); xtrabackup_init_datasinks(); @@ -4112,36 +4110,18 @@ reread_log_header: } /* label it */ - alignas(OS_FILE_LOG_BLOCK_SIZE) byte log_hdr_buf[LOG_MAIN_FILE_SIZE]; - memset(log_hdr_buf, 0, sizeof log_hdr_buf); + alignas(OS_FILE_LOG_BLOCK_SIZE) byte log_hdr[512]; + memset(log_hdr, 0, sizeof log_hdr); - byte *log_hdr_field = log_hdr_buf; - mach_write_to_4(log_header::FORMAT + log_hdr_field, + mach_write_to_4(log_header::FORMAT + log_hdr, log_sys.log.format); - mach_write_to_4(log_header::KEY_VERSION + log_hdr_field, + mach_write_to_4(log_header::KEY_VERSION + log_hdr, log_sys.log.key_version); - strcpy(reinterpret_cast<char*>(log_header::CREATOR + log_hdr_field), - "Backup " MYSQL_SERVER_VERSION); - log_block_set_checksum(log_hdr_field, - log_block_calc_checksum_crc32(log_hdr_field)); - - /* copied from log_group_checkpoint() */ - log_hdr_field += - (log_sys.next_checkpoint_no & 1) ? LOG_CHECKPOINT_2 : LOG_CHECKPOINT_1; - /* The least significant bits of LOG_CHECKPOINT_OFFSET must be - stored correctly in the copy of the LOG_FILE_NAME. The most significant - bits, which identify the start offset of the log block in the file, - we did choose freely, as LOG_FILE_HDR_SIZE. */ - ut_ad(!((log_sys.log.get_lsn() ^ checkpoint_lsn_start) - & (OS_FILE_LOG_BLOCK_SIZE - 1))); - /* Adjust the checkpoint page. 
*/ - memcpy(log_hdr_field, log_sys.buf, OS_FILE_LOG_BLOCK_SIZE); - mach_write_to_8(log_hdr_field + LOG_CHECKPOINT_OFFSET, - (checkpoint_lsn_start & (OS_FILE_LOG_BLOCK_SIZE - 1))); - log_block_set_checksum(log_hdr_field, - log_block_calc_checksum_crc32(log_hdr_field)); - - if (ds_write(dst_log_main_file, log_hdr_buf, sizeof(log_hdr_buf))) { + strcpy(reinterpret_cast<char*>(log_header::CREATOR + log_hdr), + "Backup " MYSQL_SERVER_VERSION); + mach_write_to_4(&log_hdr[512 - 4], ut_crc32(log_hdr, 512 -4)); + + if (ds_write(dst_log_main_file, log_hdr, sizeof(log_hdr))) { msg("error: write to main log file failed"); goto fail; } diff --git a/storage/innobase/include/log0crypt.h b/storage/innobase/include/log0crypt.h index c1fe84d8aad..7f77ecc206e 100644 --- a/storage/innobase/include/log0crypt.h +++ b/storage/innobase/include/log0crypt.h @@ -46,30 +46,24 @@ uint32_t log_crypt_key_version(); /** Read the MariaDB 10.1 checkpoint crypto (version, msg and iv) info. @param[in] buf checkpoint buffer @return whether the operation was successful */ -UNIV_INTERN -bool -log_crypt_101_read_checkpoint(const byte* buf); +ATTRIBUTE_COLD bool log_crypt_101_read_checkpoint(const byte* buf); /** Decrypt a MariaDB 10.1 redo log block. @param[in,out] buf log block @param[in] start_lsn server start LSN @return whether the decryption was successful */ -bool log_crypt_101_read_block(byte* buf, lsn_t start_lsn); +ATTRIBUTE_COLD bool log_crypt_101_read_block(byte* buf, lsn_t start_lsn); /** Read the checkpoint crypto (version, msg and iv) info. @param[in] buf checkpoint buffer @return whether the operation was successful */ -UNIV_INTERN -bool -log_crypt_read_checkpoint_buf(const byte* buf); +ATTRIBUTE_COLD bool log_crypt_read_checkpoint_buf(const byte* buf); -/** Encrypt or decrypt log blocks. -@param[in,out] buf log blocks to encrypt or decrypt +/** Decrypt a log block when upgrading from MariaDB 10.2.5 to 10.5.1. 
+@param[in,out] buf 512-byte log block to decrypt @param[in] lsn log sequence number of the start of the buffer -@param[in] size size of the buffer, in bytes -@param[in] decrypt whether to decrypt, instead of encrypting -@return whether the operation succeeded (encrypt always does) */ -bool log_crypt(byte* buf, lsn_t lsn, ulint size, bool decrypt= false); +@return whether the operation succeeded */ +ATTRIBUTE_COLD bool log_decrypt_10_4(byte* buf, lsn_t lsn); /** Encrypt or decrypt a temporary file block. @param[in] src block to encrypt or decrypt diff --git a/storage/innobase/include/log0log.h b/storage/innobase/include/log0log.h index a3a04d8762a..19b4bac786d 100644 --- a/storage/innobase/include/log0log.h +++ b/storage/innobase/include/log0log.h @@ -159,27 +159,6 @@ objects! */ void log_check_margins(void); -/** Calculate the CRC-32C checksum of a log block. -@param[in] block log block -@return checksum */ -inline ulint log_block_calc_checksum_crc32(const byte* block); - -/************************************************************//** -Gets a log block checksum field value. -@return checksum */ -UNIV_INLINE -ulint -log_block_get_checksum( -/*===================*/ - const byte* log_block); /*!< in: log block */ -/************************************************************//** -Sets a log block checksum field value. */ -UNIV_INLINE -void -log_block_set_checksum( -/*===================*/ - byte* log_block, /*!< in/out: log block */ - ulint checksum); /*!< in: checksum */ /******************************************************//** Prints info of the log. 
*/ void @@ -192,34 +171,6 @@ void log_refresh_stats(void); /*===================*/ -#define LOG_BLOCK_KEY 4 /* encryption key version - before LOG_BLOCK_CHECKSUM; - in log_t::FORMAT_ENC_10_4 only */ -#define LOG_BLOCK_CHECKSUM 4 /* CRC-32C of the ib_logfile0 - header, or pre-10.5.2 log block - contents */ - -/** Offsets inside the checkpoint pages (redo log format version 1) @{ */ -/** Checkpoint number */ -#define LOG_CHECKPOINT_NO 0 -/** Log sequence number up to which all changes have been flushed */ -#define LOG_CHECKPOINT_LSN 8 -/** Byte offset of the log record corresponding to LOG_CHECKPOINT_LSN */ -#define LOG_CHECKPOINT_OFFSET 16 -/** srv_log_buffer_size at the time of the checkpoint (not used) */ -#define LOG_CHECKPOINT_LOG_BUF_SIZE 24 -/** MariaDB 10.2.5 encrypted redo log encryption key version (32 bits)*/ -#define LOG_CHECKPOINT_CRYPT_KEY 32 -/** MariaDB 10.2.5 encrypted redo log random nonce (32 bits) */ -#define LOG_CHECKPOINT_CRYPT_NONCE 36 -/** MariaDB 10.2.5 encrypted redo log random message (MY_AES_BLOCK_SIZE) */ -#define LOG_CHECKPOINT_CRYPT_MESSAGE 40 -/** start LSN of the MLOG_CHECKPOINT mini-transaction corresponding -to this checkpoint, or 0 if the information has not been written */ -#define LOG_CHECKPOINT_END_LSN OS_FILE_LOG_BLOCK_SIZE - 16 - -/* @} */ - /** Offsets of a log file header */ namespace log_header { @@ -262,8 +213,6 @@ namespace log_header #define LOG_CHECKPOINT_2 (3 * OS_FILE_LOG_BLOCK_SIZE) /* second checkpoint field in the log header */ -/** size of LOG_FILE_NAME (header + checkpoints */ -constexpr size_t LOG_MAIN_FILE_SIZE= 4 * OS_FILE_LOG_BLOCK_SIZE; typedef ib_mutex_t LogSysMutex; typedef ib_mutex_t FlushOrderMutex; @@ -391,6 +340,10 @@ private: This must hold if lsn - last_checkpoint_lsn > max_checkpoint_age. 
*/ std::atomic<bool> check_flush_or_checkpoint_; public: +#if 0 + /** The sequence bit of the next record to write */ + bool sequence_bit; +#endif MY_ALIGNED(CACHE_LINE_SIZE) LogSysMutex mutex; /*!< mutex protecting the log */ @@ -442,14 +395,11 @@ public: log_file_t fd; public: - /** used only in recovery: recovery scan succeeded up to this - lsn in this log group */ - lsn_t scanned_lsn; - /** opens log files which must be closed prior this call */ void open_files(std::string path); /** renames log file */ dberr_t main_rename(std::string path) { return fd.rename(path); } + os_offset_t main_file_size() const { return fd_offset; } /** reads from main log files */ void main_read(os_offset_t offset, span<byte> buf); /** writes buffer to log file @@ -489,7 +439,11 @@ public: @param[in] lsn log sequence number @return offset within the log */ inline lsn_t calc_lsn_offset(lsn_t lsn) const; - lsn_t calc_lsn_offset_old(lsn_t lsn) const; + /** Calculate the offset of a log sequence number + in an old redo log file (during upgrade check). + @param[in] lsn log sequence number + @return byte offset within the log */ + inline lsn_t calc_lsn_offset_old(lsn_t lsn) const; /** Set the field values to correspond to a given lsn. */ void set_fields(lsn_t lsn) @@ -614,14 +568,6 @@ public: void set_check_flush_or_checkpoint(bool flag= true) { check_flush_or_checkpoint_.store(flag, std::memory_order_relaxed); } - /** @return the log block trailer offset */ - unsigned trailer_offset() const - { - return log.format == FORMAT_ENC_10_4 - ? 
OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_CHECKSUM - LOG_BLOCK_KEY - : OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_CHECKSUM; - } - size_t get_pending_flushes() const { return pending_flushes.load(std::memory_order_relaxed); diff --git a/storage/innobase/include/log0log.ic b/storage/innobase/include/log0log.ic index b3bc10620f0..f3a3d2aa1d9 100644 --- a/storage/innobase/include/log0log.ic +++ b/storage/innobase/include/log0log.ic @@ -28,70 +28,6 @@ Created 12/9/1995 Heikki Tuuri #include "srv0mon.h" #include "ut0crc32.h" -/** Calculate the checksum for a log block using the pre-5.7.9 algorithm. -@param[in] block log block -@return checksum */ -UNIV_INLINE -ulint -log_block_calc_checksum_format_0( - const byte* block) -{ - ulint sum; - ulint sh; - ulint i; - - sum = 1; - sh = 0; - - for (i = 0; i < OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_CHECKSUM; i++) { - ulint b = (ulint) block[i]; - sum &= 0x7FFFFFFFUL; - sum += b; - sum += b << sh; - sh++; - if (sh > 24) { - sh = 0; - } - } - - return(sum); -} - -/** Calculate the CRC-32C checksum of a log block. -@param[in] block log block -@return checksum */ -inline ulint log_block_calc_checksum_crc32(const byte* block) -{ - return ut_crc32(block, OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_CHECKSUM); -} - -/************************************************************//** -Gets a log block checksum field value. -@return checksum */ -UNIV_INLINE -ulint -log_block_get_checksum( -/*===================*/ - const byte* log_block) /*!< in: log block */ -{ - return(mach_read_from_4(log_block + OS_FILE_LOG_BLOCK_SIZE - - LOG_BLOCK_CHECKSUM)); -} - -/************************************************************//** -Sets a log block checksum field value. 
*/ -UNIV_INLINE -void -log_block_set_checksum( -/*===================*/ - byte* log_block, /*!< in/out: log block */ - ulint checksum) /*!< in: checksum */ -{ - mach_write_to_4(log_block + OS_FILE_LOG_BLOCK_SIZE - - LOG_BLOCK_CHECKSUM, - checksum); -} - /***********************************************************************//** Checks if there is need for a log buffer flush or a new checkpoint, and does this if yes. Any database operation should call this when it has modified diff --git a/storage/innobase/include/log0recv.h b/storage/innobase/include/log0recv.h index 91b0cd7cc1f..371f492c564 100644 --- a/storage/innobase/include/log0recv.h +++ b/storage/innobase/include/log0recv.h @@ -149,7 +149,7 @@ struct recv_dblwr_t { /** the recovery state and buffered records for a page */ struct page_recv_t { - /** Recovery state */ + /** Recovery state; protected by recv_sys.mutex */ enum { /** not yet processed */ @@ -216,10 +216,14 @@ struct page_recv_t }; /** Recovery system data structure */ -struct recv_sys_t{ - ib_mutex_t mutex; /*!< mutex protecting the fields apply_log_recs, - n_addrs, and the state field in each recv_addr - struct */ +struct recv_sys_t +{ + /** mutex protecting apply_log_recs and page_recv_t::state */ + ib_mutex_t mutex; + /** whether recv_recover_page(), invoked from buf_page_io_complete(), + should apply log records*/ + bool apply_log_recs; + ib_mutex_t writer_mutex;/*!< mutex coordinating flushing between recv_writer_thread and the recovery thread. */ @@ -230,9 +234,6 @@ struct recv_sys_t{ buf_flush_t flush_type;/*!< type of the flush request. BUF_FLUSH_LRU: flush end of LRU, keeping free blocks. BUF_FLUSH_LIST: flush all of blocks. 
*/ - /** whether recv_recover_page(), invoked from buf_page_io_complete(), - should apply log records*/ - bool apply_log_recs; /** whether recv_apply_hashed_log_recs() is running */ bool apply_batch_on; byte* buf; /*!< buffer for parsing log records */ @@ -246,9 +247,6 @@ struct recv_sys_t{ lsn_t scanned_lsn; /*!< the log data has been scanned up to this lsn */ - ulint scanned_checkpoint_no; - /*!< the log data has been scanned up to this - checkpoint number (lowest 4 bytes) */ ulint recovered_offset; /*!< start offset of non-parsed log records in buf */ @@ -266,6 +264,9 @@ struct recv_sys_t{ /** the time when progress was last reported */ time_t progress_time; + /** The sequence bit of the next record to parse */ + bool sequence_bit; + using map = std::map<const page_id_t, page_recv_t, std::less<const page_id_t>, ut_allocator<std::pair<const page_id_t, page_recv_t>>>; diff --git a/storage/innobase/log/log0crypt.cc b/storage/innobase/log/log0crypt.cc index f3d66d3a221..044b56c0f0c 100644 --- a/storage/innobase/log/log0crypt.cc +++ b/storage/innobase/log/log0crypt.cc @@ -34,22 +34,15 @@ MDEV-11782: Rewritten for MariaDB 10.2 by Marko Mäkelä, MariaDB Corporation. 
/** innodb_encrypt_log: whether to encrypt the redo log */ my_bool srv_encrypt_log; -struct aes_block_t { - byte bytes[MY_AES_BLOCK_SIZE]; -}; - struct crypt_info_t { ulint checkpoint_no; /*!< checkpoint no; 32 bits */ uint key_version; /*!< mysqld key version */ /** random string for encrypting the key */ - aes_block_t crypt_msg; + alignas(8) byte crypt_msg[MY_AES_BLOCK_SIZE]; /** the secret key */ - aes_block_t crypt_key; + alignas(8) byte crypt_key[MY_AES_BLOCK_SIZE]; /** a random string for the per-block initialization vector */ - union { - uint32_t word; - byte bytes[4]; - } crypt_nonce; + alignas(4) byte crypt_nonce[4]; }; /** The crypt info */ @@ -88,7 +81,7 @@ static bool init_crypt_key(crypt_info_t* info, bool upgrade = false) byte mysqld_key[MY_AES_MAX_KEY_LENGTH]; uint keylen = sizeof mysqld_key; - compile_time_assert(16 == sizeof info->crypt_key.bytes); + compile_time_assert(16 == sizeof info->crypt_key); compile_time_assert(16 == MY_AES_BLOCK_SIZE); if (uint rc = encryption_key_get(log_t::KEY_ID, @@ -111,8 +104,8 @@ static bool init_crypt_key(crypt_info_t* info, bool upgrade = false) uint dst_len; int err= my_aes_crypt(MY_AES_ECB, ENCRYPTION_FLAG_NOPAD | ENCRYPTION_FLAG_ENCRYPT, - info->crypt_msg.bytes, MY_AES_BLOCK_SIZE, - info->crypt_key.bytes, &dst_len, + info->crypt_msg, MY_AES_BLOCK_SIZE, + info->crypt_key, &dst_len, mysqld_key, keylen, NULL, 0); if (err != MY_AES_OK || dst_len != MY_AES_BLOCK_SIZE) { @@ -124,86 +117,49 @@ static bool init_crypt_key(crypt_info_t* info, bool upgrade = false) return true; } -/** Encrypt or decrypt log blocks. 
-@param[in,out] buf log blocks to encrypt or decrypt -@param[in] lsn log sequence number of the start of the buffer -@param[in] size size of the buffer, in bytes -@param[in] decrypt whether to decrypt, instead of encrypting -@return whether the operation succeeded (encrypt always does) */ -bool log_crypt(byte* buf, lsn_t lsn, ulint size, bool decrypt) +/** Decrypt a log block when upgrading from MariaDB 10.2.5 to 10.5.1. +@param[in,out] buf 512-byte log block to decrypt +@param[in] lsn log sequence number of the start of the buffer +@return whether the operation succeeded */ +ATTRIBUTE_COLD bool log_decrypt_10_4(byte* buf, lsn_t lsn) { - ut_ad(size % OS_FILE_LOG_BLOCK_SIZE == 0); - buf = my_assume_aligned<OS_FILE_LOG_BLOCK_SIZE>(buf); - ut_a(info.key_version); - - uint32_t aes_ctr_iv[MY_AES_BLOCK_SIZE / sizeof(uint32_t)]; - compile_time_assert(sizeof(uint32_t) == 4); - -#define LOG_CRYPT_HDR_SIZE 4 - lsn &= ~lsn_t(OS_FILE_LOG_BLOCK_SIZE - 1); - - for (const byte* const end = buf + size; buf != end; - buf += OS_FILE_LOG_BLOCK_SIZE, lsn += OS_FILE_LOG_BLOCK_SIZE) { - uint32_t dst[(OS_FILE_LOG_BLOCK_SIZE - LOG_CRYPT_HDR_SIZE - - LOG_BLOCK_CHECKSUM) - / sizeof(uint32_t)]; - - /* The log block number is not encrypted. */ - *aes_ctr_iv = -#ifdef WORDS_BIGENDIAN - 0x7FFFFFFFU -#else - 0x7FU -#endif - & (*dst = *reinterpret_cast<const uint32_t*>(buf)); - aes_ctr_iv[1] = info.crypt_nonce.word; - mach_write_to_8(reinterpret_cast<byte*>(aes_ctr_iv + 2), lsn); - const uint dst_size - = log_sys.log.format == log_t::FORMAT_ENC_10_4 - ? 
sizeof dst - LOG_BLOCK_KEY - : sizeof dst; - if (!decrypt) { - ut_ad(log_sys.is_physical()); - } else if (UNIV_UNLIKELY(log_sys.log.format - == log_t::FORMAT_ENC_10_4)) { - const uint key_version = info.key_version; - info.key_version = mach_read_from_4( - OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_KEY - - LOG_BLOCK_CHECKSUM + buf); - if (key_version != info.key_version - && !init_crypt_key(&info)) { - return false; - } -#ifndef DBUG_OFF - if (key_version != info.key_version) { - DBUG_PRINT("ib_log", ("key_version: %x -> %x", - key_version, - info.key_version)); - } -#endif /* !DBUG_OFF */ - } - - ut_ad(LOG_CRYPT_HDR_SIZE + dst_size - == log_sys.trailer_offset()); - - uint dst_len; - int rc = encryption_crypt( - buf + LOG_CRYPT_HDR_SIZE, dst_size, - reinterpret_cast<byte*>(dst), &dst_len, - const_cast<byte*>(info.crypt_key.bytes), - MY_AES_BLOCK_SIZE, - reinterpret_cast<byte*>(aes_ctr_iv), sizeof aes_ctr_iv, - decrypt - ? ENCRYPTION_FLAG_DECRYPT | ENCRYPTION_FLAG_NOPAD - : ENCRYPTION_FLAG_ENCRYPT | ENCRYPTION_FLAG_NOPAD, - log_t::KEY_ID, - info.key_version); - ut_a(rc == MY_AES_OK); - ut_a(dst_len == dst_size); - memcpy(buf + LOG_CRYPT_HDR_SIZE, dst, dst_size); - } - - return true; + buf= my_assume_aligned<512>(buf); + ut_ad(info.key_version); + + alignas(8) byte aes_ctr_iv[MY_AES_BLOCK_SIZE]; + constexpr uint LOG_CRYPT_HDR_SIZE= 4; + alignas(4) byte dst[512 - LOG_CRYPT_HDR_SIZE - 4]; + + /* The log block number is not encrypted. 
*/ + memcpy_aligned<4>(dst, buf, 4); + memcpy_aligned<4>(aes_ctr_iv, buf, 4); + *aes_ctr_iv&= 0x7f; + memcpy_aligned<4>(aes_ctr_iv + 4, info.crypt_nonce, 4); + mach_write_to_8(my_assume_aligned<8>(aes_ctr_iv + 8), lsn); + uint dst_size= sizeof dst; + + if (log_sys.log.format == log_t::FORMAT_ENC_10_4) + { + dst_size-= 4; + const uint key_version= info.key_version; + info.key_version = mach_read_from_4(512 - 4 - 4 + buf); + if (key_version != info.key_version && !init_crypt_key(&info)) + return false; + } + + uint dst_len; + int rc= encryption_crypt(buf + LOG_CRYPT_HDR_SIZE, dst_size, + reinterpret_cast<byte*>(dst), &dst_len, + const_cast<byte*>(info.crypt_key), + MY_AES_BLOCK_SIZE, + reinterpret_cast<byte*>(aes_ctr_iv), + sizeof aes_ctr_iv, + ENCRYPTION_FLAG_DECRYPT | ENCRYPTION_FLAG_NOPAD, + log_t::KEY_ID, info.key_version); + ut_a(rc == MY_AES_OK); + ut_a(dst_len == dst_size); + memcpy(buf + LOG_CRYPT_HDR_SIZE, dst, dst_size); + return true; } /** Initialize the redo log encryption key and random parameters @@ -218,9 +174,9 @@ bool log_crypt_init() if (info.key_version == ENCRYPTION_KEY_VERSION_INVALID) ib::error() << "log_crypt_init(): cannot get key version"; else if (my_random_bytes(tmp_iv, MY_AES_BLOCK_SIZE) != MY_AES_OK || - my_random_bytes(info.crypt_msg.bytes, sizeof info.crypt_msg) != + my_random_bytes(info.crypt_msg, sizeof info.crypt_msg) != MY_AES_OK || - my_random_bytes(info.crypt_nonce.bytes, sizeof info.crypt_nonce) != + my_random_bytes(info.crypt_nonce, sizeof info.crypt_nonce) != MY_AES_OK) ib::error() << "log_crypt_init(): my_random_bytes() failed"; else if (init_crypt_key(&info)) @@ -241,9 +197,7 @@ uint32_t log_crypt_key_version() /** Read the MariaDB 10.1 checkpoint crypto (version, msg and iv) info. 
@param[in] buf checkpoint buffer @return whether the operation was successful */ -UNIV_INTERN -bool -log_crypt_101_read_checkpoint(const byte* buf) +ATTRIBUTE_COLD bool log_crypt_101_read_checkpoint(const byte* buf) { buf += 20 + 32 * 9; @@ -265,9 +219,8 @@ log_crypt_101_read_checkpoint(const byte* buf) infos_used++; info.checkpoint_no = checkpoint_no; info.key_version = mach_read_from_4(buf + 4); - memcpy(info.crypt_msg.bytes, buf + 8, MY_AES_BLOCK_SIZE); - memcpy(info.crypt_nonce.bytes, buf + 24, - sizeof info.crypt_nonce); + memcpy(info.crypt_msg, buf + 8, MY_AES_BLOCK_SIZE); + memcpy(info.crypt_nonce, buf + 24, sizeof info.crypt_nonce); if (!init_crypt_key(&info, true)) { return false; @@ -283,10 +236,8 @@ next_slot: @param[in,out] buf log block @param[in] start_lsn server start LSN @return whether the decryption was successful */ -bool log_crypt_101_read_block(byte* buf, lsn_t start_lsn) +ATTRIBUTE_COLD bool log_crypt_101_read_block(byte* buf, lsn_t start_lsn) { - ut_ad(log_block_calc_checksum_format_0(buf) - != log_block_get_checksum(buf)); const uint32_t checkpoint_no = mach_read_from_4(buf + 8); const crypt_info_t* info = infos; for (const crypt_info_t* const end = info + infos_used; info < end; @@ -315,7 +266,7 @@ found: /* The log block header is not encrypted. 
*/ memcpy(dst, buf, 12); - memcpy(aes_ctr_iv, info->crypt_nonce.bytes, 3); + memcpy(aes_ctr_iv, info->crypt_nonce, 3); mach_write_to_8(aes_ctr_iv + 3, log_block_get_start_lsn(start_lsn, log_block_no)); memcpy(aes_ctr_iv + 11, buf, 4); @@ -324,7 +275,7 @@ found: int rc = encryption_crypt(buf + 12, src_len, dst + 12, &dst_len, - const_cast<byte*>(info->crypt_key.bytes), + const_cast<byte*>(info->crypt_key), MY_AES_BLOCK_SIZE, aes_ctr_iv, MY_AES_BLOCK_SIZE, ENCRYPTION_FLAG_DECRYPT @@ -340,12 +291,19 @@ found: return true; } +/** Checkpoint number */ +constexpr uint LOG_CHECKPOINT_NO= 0; +/** MariaDB 10.2.5 encrypted redo log encryption key version (32 bits)*/ +constexpr uint LOG_CHECKPOINT_CRYPT_KEY= 32; +/** MariaDB 10.2.5 encrypted redo log random nonce (32 bits) */ +constexpr uint LOG_CHECKPOINT_CRYPT_NONCE= 36; +/** MariaDB 10.2.5 encrypted redo log random message (MY_AES_BLOCK_SIZE) */ +constexpr uint LOG_CHECKPOINT_CRYPT_MESSAGE= 40; + /** Read the checkpoint crypto (version, msg and iv) info. 
@param[in] buf checkpoint buffer @return whether the operation was successful */ -UNIV_INTERN -bool -log_crypt_read_checkpoint_buf(const byte* buf) +ATTRIBUTE_COLD bool log_crypt_read_checkpoint_buf(const byte* buf) { info.checkpoint_no = mach_read_from_4(buf + (LOG_CHECKPOINT_NO + 4)); info.key_version = mach_read_from_4(buf + LOG_CHECKPOINT_CRYPT_KEY); @@ -353,15 +311,15 @@ log_crypt_read_checkpoint_buf(const byte* buf) #if MY_AES_BLOCK_SIZE != 16 # error "MY_AES_BLOCK_SIZE != 16; redo log checkpoint format affected" #endif - compile_time_assert(16 == sizeof info.crypt_msg.bytes); + compile_time_assert(16 == sizeof info.crypt_msg); compile_time_assert(16 == MY_AES_BLOCK_SIZE); compile_time_assert(LOG_CHECKPOINT_CRYPT_MESSAGE - LOG_CHECKPOINT_CRYPT_NONCE == sizeof info.crypt_nonce); - memcpy(info.crypt_msg.bytes, buf + LOG_CHECKPOINT_CRYPT_MESSAGE, + memcpy(info.crypt_msg, buf + LOG_CHECKPOINT_CRYPT_MESSAGE, MY_AES_BLOCK_SIZE); - memcpy(info.crypt_nonce.bytes, buf + LOG_CHECKPOINT_CRYPT_NONCE, + memcpy(info.crypt_nonce, buf + LOG_CHECKPOINT_CRYPT_NONCE, sizeof info.crypt_nonce); return init_crypt_key(&info); @@ -390,7 +348,7 @@ log_tmp_block_encrypt( int rc = encryption_crypt( src, uint(size), dst, &dst_len, - const_cast<byte*>(info.crypt_key.bytes), MY_AES_BLOCK_SIZE, + const_cast<byte*>(info.crypt_key), MY_AES_BLOCK_SIZE, reinterpret_cast<byte*>(iv), uint(sizeof iv), encrypt ? 
ENCRYPTION_FLAG_ENCRYPT|ENCRYPTION_FLAG_NOPAD diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc index dd702c69530..34c82759f95 100644 --- a/storage/innobase/log/log0log.cc +++ b/storage/innobase/log/log0log.cc @@ -1503,7 +1503,7 @@ std::vector<std::string> get_existing_log_files_paths() { dberr_t create_data_file(os_offset_t size) { - ut_ad(size > LOG_MAIN_FILE_SIZE); + ut_ad(size >= 512); const auto path= get_log_file_path(LOG_DATA_FILE_NAME); os_file_delete_if_exists(innodb_log_file_key, path.c_str(), nullptr); diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc index f66b8b4aa39..76f840d314a 100644 --- a/storage/innobase/log/log0recv.cc +++ b/storage/innobase/log/log0recv.cc @@ -1022,7 +1022,6 @@ void recv_sys_t::create() len = 0; parse_start_lsn = 0; scanned_lsn = 0; - scanned_checkpoint_no = 0; recovered_offset = 0; recovered_lsn = 0; found_corrupt_log = false; @@ -1218,8 +1217,8 @@ fail: break; } - ulint crc = log_block_calc_checksum_crc32(buf); - ulint cksum = log_block_get_checksum(buf); + uint32_t crc = ut_crc32(buf, 512 - 4); + uint32_t cksum = mach_read_from_4(&buf[512 - 4]); DBUG_EXECUTE_IF("log_intermittent_checksum_mismatch", { static int block_counter; @@ -1237,13 +1236,6 @@ fail: goto fail; } - if ((is_physical() - ? is_encrypted_physical() - : is_encrypted_old()) - && !log_crypt(buf, *start_lsn, - OS_FILE_LOG_BLOCK_SIZE, true)) { - goto fail; - } #if 0// FIXME ulint dl = log_block_get_data_len(buf); if (dl < LOG_BLOCK_HDR_SIZE @@ -1270,18 +1262,6 @@ fail: } -/** Check the consistency of a log header block. 
-@param[in] log header block -@return true if ok */ -static -bool -recv_check_log_header_checksum( - const byte* buf) -{ - return(log_block_get_checksum(buf) - == log_block_calc_checksum_crc32(buf)); -} - static bool redo_file_sizes_are_correct() { auto paths= get_existing_log_files_paths(); @@ -1304,15 +1284,32 @@ static bool redo_file_sizes_are_correct() return false; } +/** Calculate the checksum for a log block using the pre-10.2.2 algorithm. */ +inline uint32_t log_block_calc_checksum_format_0(const byte *block) +{ + uint32_t sum= 1; + + for (ulint i= 0, sh= 0; i < 512 - 4; i++) + { + ulint b= ulint{block[i]}; + sum&= 0x7FFFFFFFUL; + sum+= b; + sum+= b << sh++; + if (sh == 24) + sh= 0; + } + + return sum; +} + /** Determine if a redo log from before MariaDB 10.2.2 is clean. @return error code @retval DB_SUCCESS if the redo log is clean @retval DB_CORRUPTION if the redo log is corrupted @retval DB_ERROR if the redo log is not empty */ -static dberr_t recv_log_recover_pre_10_2() +ATTRIBUTE_COLD static dberr_t recv_log_recover_pre_10_2() { uint64_t max_no= 0; - uint64_t checkpoint_no; byte *buf= log_sys.buf; ut_ad(log_sys.log.format == 0); @@ -1348,17 +1345,17 @@ static dberr_t recv_log_recover_pre_10_2() continue; } - checkpoint_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO); - if (!log_crypt_101_read_checkpoint(buf)) { ib::error() << "Decrypting checkpoint failed"; continue; } + const uint64_t checkpoint_no= mach_read_from_8(buf); + DBUG_PRINT("ib_log", ("checkpoint " UINT64PF " at " LSN_PF " found", checkpoint_no, - mach_read_from_8(buf + LOG_CHECKPOINT_LSN))); + mach_read_from_8(buf + CHECKPOINT_LSN))); if (checkpoint_no >= max_no) { @@ -1390,7 +1387,8 @@ static dberr_t recv_log_recover_pre_10_2() recv_sys.read(source_offset & ~511, {buf, 512}); - if (log_block_calc_checksum_format_0(buf) != log_block_get_checksum(buf) && + if (log_block_calc_checksum_format_0(buf) != + mach_read_from_4(&buf[512 - 4]) && !log_crypt_101_read_block(buf, lsn)) { ib::error() << 
NO_UPGRADE_RECOVERY_MSG << ", and it appears corrupted."; @@ -1401,8 +1399,7 @@ static dberr_t recv_log_recover_pre_10_2() { /* Mark the redo log for upgrading. */ srv_log_file_size= 0; - recv_sys.parse_start_lsn= recv_sys.recovered_lsn= recv_sys.scanned_lsn= - lsn; + recv_sys.parse_start_lsn= recv_sys.scanned_lsn= lsn; log_sys.last_checkpoint_lsn= log_sys.next_checkpoint_lsn= log_sys.write_lsn= log_sys.current_flush_lsn= lsn; log_sys.next_checkpoint_no= 0; @@ -1418,11 +1415,13 @@ static dberr_t recv_log_recover_pre_10_2() return DB_ERROR; } -/** Same as cals_lsn_offset() except that it supports multiple files */ -lsn_t log_t::file::calc_lsn_offset_old(lsn_t lsn) const +/** Calculate the offset of a log sequence number +in an old redo log file (during upgrade check). +@param[in] lsn log sequence number +@return byte offset within the log */ +inline lsn_t log_t::file::calc_lsn_offset_old(lsn_t lsn) const { constexpr size_t LOG_FILE_HDR_SIZE= 2048; - ut_ad(log_sys.mutex.is_owned() || log_write_lock_own()); const lsn_t size= (file_size - LOG_FILE_HDR_SIZE) * recv_sys.files_size(); lsn_t l= lsn - this->lsn; if (longlong(l) < 0) @@ -1440,198 +1439,237 @@ lsn_t log_t::file::calc_lsn_offset_old(lsn_t lsn) const @retval DB_SUCCESS if the redo log is clean @retval DB_CORRUPTION if the redo log is corrupted @retval DB_ERROR if the redo log is not empty */ -static dberr_t recv_log_recover_10_4() +ATTRIBUTE_COLD static dberr_t recv_log_recover_10_4() { - const lsn_t lsn = log_sys.log.get_lsn(); - const lsn_t source_offset = log_sys.log.calc_lsn_offset_old(lsn); - byte* buf = log_sys.buf; + uint64_t max_no= 0; + byte *buf= log_sys.buf; + lsn_t lsn= 0; - if (!redo_file_sizes_are_correct()) { - return DB_CORRUPTION; - } + /** the checkpoint LSN field */ + constexpr uint CHECKPOINT_LSN= 8; + /** Byte offset of the log record corresponding to LOG_CHECKPOINT_LSN */ + constexpr uint CHECKPOINT_OFFSET= 16; + /** start LSN of the MLOG_CHECKPOINT mini-transaction corresponding + to this 
checkpoint, or 0 if the information has not been written */ + constexpr uint CHECKPOINT_END_LSN= 512 - 16; - log_sys.log.main_read(source_offset & ~(OS_FILE_LOG_BLOCK_SIZE - 1), - {buf, OS_FILE_LOG_BLOCK_SIZE}); - - ulint crc = log_block_calc_checksum_crc32(buf); - ulint cksum = log_block_get_checksum(buf); - - if (crc != cksum) { - ib::error() << "Invalid log block checksum." - << " block: " - << (mach_read_from_4(buf) & 0x7FFFFFFF) - << " checkpoint no: " - << mach_read_from_4(buf + 8) - << " expected: " << crc - << " found: " << cksum; - return DB_CORRUPTION; - } + for (ulint field= LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2; + field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) + { + log_sys.log.main_read(field, {buf, OS_FILE_LOG_BLOCK_SIZE}); - if (log_sys.log.is_encrypted_old() - && !log_crypt(buf, lsn & (OS_FILE_LOG_BLOCK_SIZE - 1), - OS_FILE_LOG_BLOCK_SIZE, true)) { - return DB_ERROR; - } + const uint32_t crc32= ut_crc32(buf, 512 - 4); + const uint32_t cksum= mach_read_from_4(&buf[512 - 4]); - /* On a clean shutdown, the redo log will be logically empty - after the checkpoint lsn. */ + if (crc32 != cksum) + { + DBUG_PRINT("ib_log", + ("invalid checkpoint, at " ULINTPF + ", checksum %x expected %x", field, cksum, crc32)); + continue; + } - if (mach_read_from_2(buf + 4/* LOG_BLOCK_HDR_DATA_LEN */) - != (source_offset & (OS_FILE_LOG_BLOCK_SIZE - 1))) { - return DB_ERROR; - } + if (log_sys.is_encrypted_old() && !log_crypt_read_checkpoint_buf(buf)) + { + ib::error() << "Reading checkpoint encryption info failed."; + continue; + } - /* Mark the redo log for upgrading. 
*/ - srv_log_file_size = 0; - recv_sys.parse_start_lsn = recv_sys.recovered_lsn - = recv_sys.scanned_lsn = lsn; - log_sys.last_checkpoint_lsn = log_sys.next_checkpoint_lsn - = log_sys.write_lsn = log_sys.current_flush_lsn = lsn; - log_sys.next_checkpoint_no = 0; - return DB_SUCCESS; -} + const lsn_t checkpoint_lsn= mach_read_from_8(buf + CHECKPOINT_LSN); + const lsn_t end_lsn= mach_read_from_8(buf + CHECKPOINT_END_LSN); + if (end_lsn && end_lsn < checkpoint_lsn) + continue; -/** Find the latest checkpoint in the log header. -@param[out] max_field LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2 -@return error code or DB_SUCCESS */ -dberr_t -recv_find_max_checkpoint(ulint* max_field) -{ - ib_uint64_t max_no; - ib_uint64_t checkpoint_no; - ulint field; - byte* buf; + uint64_t checkpoint_no= mach_read_from_8(buf); - max_no = 0; - *max_field = 0; + DBUG_PRINT("ib_log", ("checkpoint " UINT64PF " at " LSN_PF " found", + checkpoint_no, checkpoint_lsn)); - ut_ad(!(log_sys.log.file_size & 511)); + if (checkpoint_no >= max_no) + { + max_no= checkpoint_no; + lsn= checkpoint_lsn; + log_sys.log.set_lsn(lsn); + log_sys.log.set_lsn_offset(mach_read_from_8(buf + CHECKPOINT_OFFSET)); + log_sys.next_checkpoint_no= checkpoint_no; + } + } - buf = log_sys.buf; + if (!lsn) + { + /* Before 10.2.2, we could get here during database initialization + if we created an LOG_FILE_NAME file that was filled with zeroes, + and were killed. After 10.2.2, we would reject such a file already + earlier, when checking the file header. */ + ib::error() << "No valid checkpoint found (corrupted redo log)."; + return DB_ERROR; + } - log_sys.log.main_read(0, {buf, OS_FILE_LOG_BLOCK_SIZE}); - /* Check the header page checksum. There was no - checksum in the first redo log format (version 0). 
*/ - log_sys.log.format = mach_read_from_4(buf + log_header::FORMAT); - if (log_sys.is_physical()) { - log_sys.log.key_version = mach_read_from_4( - buf + log_header::KEY_VERSION); - } else { - log_sys.log.key_version = 0; - } + log_sys.set_lsn(lsn); + log_sys.set_flushed_lsn(lsn); + const lsn_t source_offset= log_sys.log.calc_lsn_offset_old(lsn); - if (log_sys.log.format != log_t::FORMAT_3_23 - && !recv_check_log_header_checksum(buf)) { - ib::error() << "Invalid redo log header checksum."; - return(DB_CORRUPTION); - } + if (!redo_file_sizes_are_correct()) + return DB_CORRUPTION; - char creator[log_header::CREATOR_END - log_header::CREATOR + 1]; - - memcpy(creator, buf + log_header::CREATOR, sizeof creator); - /* Ensure that the string is NUL-terminated. */ - creator[log_header::CREATOR_END - log_header::CREATOR] = 0; - - switch (log_sys.log.format) { - case log_t::FORMAT_3_23: - return recv_log_recover_pre_10_2(); - case log_t::FORMAT_10_2: - case log_t::FORMAT_10_2 | log_t::FORMAT_ENCRYPTED: - case log_t::FORMAT_10_3: - case log_t::FORMAT_10_3 | log_t::FORMAT_ENCRYPTED: - case log_t::FORMAT_10_4: - case log_t::FORMAT_10_4 | log_t::FORMAT_ENCRYPTED: - break; - case log_t::FORMAT_10_5: - if (auto size = mach_read_from_8(buf + log_header::SIZE)) { - size &= ~(1ULL << 47); - if (size == log_sys.log.file_size) { - goto current; - } + log_sys.log.main_read(source_offset & ~511, {buf, 512}); - ib::error() << "Inconsistent redo log size: " - << size << "!=" << log_sys.log.file_size; - } - /* fall through */ - default: - ib::error() << "Unsupported redo log format." 
- " The redo log was created with " << creator << "."; - return(DB_ERROR); - } + const uint32_t crc= ut_crc32(buf, 512 - 4); + const uint32_t cksum= mach_read_from_4(&buf[512 - 4]); - for (field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2; - field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) { - log_sys.log.main_read(field, {buf, OS_FILE_LOG_BLOCK_SIZE}); + if (crc != cksum) + { + ib::error() << "Invalid log block checksum. block: " + << (mach_read_from_4(buf) & 0x7FFFFFFF) + << " checkpoint no: " + << mach_read_from_4(buf + 8) + << " expected: " << crc << " found: " << cksum; + return DB_CORRUPTION; + } - const ulint crc32 = log_block_calc_checksum_crc32(buf); - const ulint cksum = log_block_get_checksum(buf); + if (log_sys.is_encrypted_old() && !log_decrypt_10_4(buf, lsn & ~511)) + return DB_ERROR; - if (crc32 != cksum) { - DBUG_PRINT("ib_log", - ("invalid checkpoint," - " at " ULINTPF - ", checksum " ULINTPFx - " expected " ULINTPFx, - field, cksum, crc32)); - continue; - } + /* On a clean shutdown, the redo log will be logically empty + after the checkpoint lsn. */ - if ((log_sys.is_physical() - ? log_sys.is_encrypted_physical() - : log_sys.is_encrypted_old()) - && !log_crypt_read_checkpoint_buf(buf)) { - ib::error() << "Reading checkpoint" - " encryption info failed."; - continue; - } + if (mach_read_from_2(buf + 4) != (source_offset & 511)) + return DB_ERROR; - checkpoint_no = mach_read_from_8( - buf + LOG_CHECKPOINT_NO); - - DBUG_PRINT("ib_log", - ("checkpoint " UINT64PF " at " LSN_PF " found", - checkpoint_no, mach_read_from_8( - buf + LOG_CHECKPOINT_LSN))); - - if (checkpoint_no >= max_no) { - *max_field = field; - max_no = checkpoint_no; - log_sys.log.set_lsn(mach_read_from_8( - buf + LOG_CHECKPOINT_LSN)); - log_sys.log.set_lsn_offset(mach_read_from_8( - buf + LOG_CHECKPOINT_OFFSET)); - log_sys.next_checkpoint_no = checkpoint_no; - } - } + /* Mark the redo log for upgrading. 
*/ + srv_log_file_size= 0; + recv_sys.parse_start_lsn= recv_sys.scanned_lsn= lsn; + log_sys.last_checkpoint_lsn= log_sys.next_checkpoint_lsn= + log_sys.write_lsn= log_sys.current_flush_lsn= lsn; + log_sys.next_checkpoint_no= 0; + return DB_SUCCESS; +} - if (*max_field == 0) { - /* Before 10.2.2, we could get here during database - initialization if we created an LOG_FILE_NAME file that - was filled with zeroes, and were killed. After - 10.2.2, we would reject such a file already earlier, - when checking the file header. */ - ib::error() << "No valid checkpoint found" - " (corrupted redo log)." - " You can try --innodb-force-recovery=6" - " as a last resort."; - return(DB_ERROR); - } +/** Determine if the redo log is clean. +@return error code +@retval DB_SUCCESS if the redo log is clean +@retval DB_CORRUPTION if the redo log is corrupted +@retval DB_ERROR if the redo log is not empty and cannot be upgraded +@retval DB_FAIL if crash recovery is needed */ +static dberr_t recv_check() +{ + byte *buf= log_sys.buf; - if (dberr_t err = recv_log_recover_10_4()) { - ib::error() - << "Upgrade after a crash is not supported." - " The redo log was created with " << creator - << (err == DB_ERROR - ? "." : ", and it appears corrupted."); - return err; - } + ut_ad(!(log_sys.log.file_size & 511)); + log_sys.log.main_read(0, {buf, OS_FILE_LOG_BLOCK_SIZE}); + /* Check the header page checksum. There was no checksum in the + first redo log format (version 0). */ + log_sys.log.format= mach_read_from_4(buf + log_header::FORMAT); + log_sys.log.key_version= 0; - return(DB_SUCCESS); -current: - /* TODO: Seek to the end of the file, read & validate the - last checkpoint_size bytes. If it is valid and points to - the end of the log, fine. Else, start crash recovery. 
*/ - return DB_SUCCESS; + if (log_sys.log.format != log_t::FORMAT_3_23 && + ut_crc32(buf, 512 - 4) != mach_read_from_4(&buf[512 - 4])) + { + ib::error() << "Invalid redo log header checksum."; + return DB_CORRUPTION; + } + + char creator[log_header::CREATOR_END - log_header::CREATOR + 1]; + memcpy(creator, buf + log_header::CREATOR, sizeof creator); + /* Ensure that the string is NUL-terminated. */ + creator[log_header::CREATOR_END - log_header::CREATOR] = 0; + + switch (log_sys.log.format) { + dberr_t err; + case log_t::FORMAT_3_23: + return recv_log_recover_pre_10_2(); + case log_t::FORMAT_10_2: + case log_t::FORMAT_10_2 | log_t::FORMAT_ENCRYPTED: + case log_t::FORMAT_10_3: + case log_t::FORMAT_10_3 | log_t::FORMAT_ENCRYPTED: + case log_t::FORMAT_10_4: + case log_t::FORMAT_10_4 | log_t::FORMAT_ENCRYPTED: + err= recv_log_recover_10_4(); + if (err != DB_SUCCESS) + ib::error() << "Upgrade after a crash is not supported." + " The redo log was created with " << creator + << (err == DB_ERROR + ? "." : ", and it appears corrupted."); + return err; + case log_t::FORMAT_10_5: + log_sys.log.key_version= mach_read_from_4(buf + log_header::KEY_VERSION); + + if (auto size= mach_read_from_8(buf + log_header::SIZE)) + { + size &= ~(1ULL << 47); + if (size == log_sys.log.file_size) + break; + ib::error() << "Inconsistent redo log size: " + << size << "!=" << log_sys.log.file_size; + } + /* fall through */ + default: + ib::error() << "Unsupported redo log format." + " The redo log was created with " << creator << "."; + return DB_ERROR; + } + + /* TODO: Seek to the end of the file, read & validate the + last checkpoint_size bytes. If it is valid and points to + the end of the log, fine. Else, start crash recovery. 
*/ + if (log_sys.log.main_file_size() < 512 + 19) + return DB_CORRUPTION; + + log_sys.log.main_read(log_sys.log.main_file_size() - 19, {buf, 19}); + if (buf[0] != (FILE_CHECKPOINT | (8 + 6))) + return DB_FAIL; + if (mach_read_from_4(buf + 19 - 4) != ut_crc32(buf, 15)) + return DB_FAIL; + + log_sys.last_checkpoint_lsn= log_sys.next_checkpoint_lsn= + recv_sys.parse_start_lsn= recv_sys.scanned_lsn= + log_sys.write_lsn= log_sys.current_flush_lsn= + mach_read_from_8(buf + 1); + log_sys.set_lsn(log_sys.last_checkpoint_lsn); + log_sys.set_flushed_lsn(log_sys.last_checkpoint_lsn); + + os_offset_t data_file_offset= mach_read_from_6(buf + 1 + 8); + recv_sys.sequence_bit= !!(data_file_offset & (1ULL << 47)); + data_file_offset&= ~(1ULL << 47); + + if (data_file_offset >= log_sys.log.file_size) + // not corrupted checkpoint with incorrect file offset? + return DB_FAIL; + + os_offset_t first_block_offset= data_file_offset & + ~(OS_FILE_LOG_BLOCK_SIZE - 1); + log_sys.log.data_read(first_block_offset, {buf, OS_FILE_LOG_BLOCK_SIZE}); + + os_offset_t offset_in_block= data_file_offset & (OS_FILE_LOG_BLOCK_SIZE - 1); + byte *record= buf + offset_in_block; + auto decoded_header= mlog_decode_varint(record); + auto header_size= mlog_decode_varint_length(record[0]); + + os_offset_t size= decoded_header >> 2; + + if (!size || (decoded_header & 1) != recv_sys.sequence_bit || + data_file_offset + size > log_sys.log.file_size) + return DB_SUCCESS; /* Garbage at the end of the log */ + + if (decoded_header & 2) /* skip_bit is set: we must read more */ + return DB_FAIL; + + os_offset_t n_blocks= ((data_file_offset + header_size + size) % + OS_FILE_LOG_BLOCK_SIZE - first_block_offset) / + OS_FILE_LOG_BLOCK_SIZE; + + if (os_offset_t further_blocks= n_blocks - 1) + log_sys.log.data_read(first_block_offset + OS_FILE_LOG_BLOCK_SIZE, + {buf + OS_FILE_LOG_BLOCK_SIZE, + further_blocks * OS_FILE_LOG_BLOCK_SIZE}); + + /* Clear the sequence bit before calculating the checksum. 
*/ + record[header_size - 1] &= ~1; + if (mach_read_from_4(record + header_size + size - 4) != + ut_crc32(record, header_size + size)) + return DB_SUCCESS; /* Garbage at the end of the log */ + + return DB_FAIL; } /** Trim old log records for a page. @@ -2205,7 +2243,6 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr, ut_ad(l->lsn); ut_ad(end_lsn <= l->lsn); end_lsn = l->lsn; - ut_ad(end_lsn <= log_sys.log.scanned_lsn); ut_ad(l->start_lsn); ut_ad(recv_start_lsn <= l->start_lsn); @@ -2711,7 +2748,7 @@ bool recv_sys_add_to_parsing_buf(const byte* log_block, lsn_t scanned_lsn) start_offset = data_len - more_len; - end_offset = std::min<ulint>(data_len, log_sys.trailer_offset()); + end_offset = data_len; // trailer_offset ut_ad(start_offset <= end_offset); @@ -2738,6 +2775,7 @@ void recv_sys_justify_left_parsing_buf() recv_sys.recovered_offset = 0; } +#if 0//FIXME /** Scan redo log from a buffer and stores new log data to the parsing buffer. Parse and hash the log records if new data found. Apply log records automatically when the hash table becomes full. @@ -2934,31 +2972,26 @@ func_exit: mutex_exit(&recv_sys.mutex); return(finished); } +#endif -/** Scans log from a buffer and stores new log data to the parsing buffer. -Parses and hashes the log records if new data found. -@param[in] checkpoint_lsn latest checkpoint log sequence number +/** Parse and store log. 
@return the last parsed LSN */ -static lsn_t recv_group_scan_log_recs(lsn_t checkpoint_lsn) +static lsn_t recv_scan() { - DBUG_ENTER("recv_group_scan_log_recs"); + DBUG_ENTER("recv_scan"); mutex_enter(&recv_sys.mutex); recv_sys.len = 0; recv_sys.recovered_offset = 0; recv_sys.clear(); - recv_sys.parse_start_lsn = - recv_sys.scanned_lsn = - recv_sys.recovered_lsn = checkpoint_lsn; - recv_sys.scanned_checkpoint_no = 0; ut_ad(recv_max_page_lsn == 0); mutex_exit(&recv_sys.mutex); - lsn_t start_lsn; - lsn_t end_lsn; + lsn_t end_lsn= 0; store_t store = STORE_IF_EXISTS; - log_sys.log.scanned_lsn = end_lsn = +#if 0//FIXME + end_lsn = ut_uint64_align_down(checkpoint_lsn, OS_FILE_LOG_BLOCK_SIZE); do { @@ -2969,14 +3002,13 @@ static lsn_t recv_group_scan_log_recs(lsn_t checkpoint_lsn) } while (end_lsn != start_lsn && !recv_scan_log_recs(&store, log_sys.buf, checkpoint_lsn, start_lsn, end_lsn, - &log_sys.log.scanned_lsn)); - + &recv_sys.scanned_lsn)); +#endif if (recv_sys.found_corrupt_log || recv_sys.found_corrupt_fs) { DBUG_RETURN(false); } - DBUG_PRINT("ib_log", ("scan " LSN_PF " completed", - log_sys.log.scanned_lsn)); + DBUG_PRINT("ib_log", ("scan " LSN_PF " completed", end_lsn)); DBUG_RETURN(store == STORE_NO); } @@ -3159,12 +3191,6 @@ of first system tablespace page dberr_t recv_recovery_from_checkpoint_start(lsn_t flush_lsn) { - ulint max_cp_field; - lsn_t checkpoint_lsn; - ib_uint64_t checkpoint_no; - byte* buf; - dberr_t err = DB_SUCCESS; - ut_ad(srv_operation == SRV_OPERATION_NORMAL || srv_operation == SRV_OPERATION_RESTORE || srv_operation == SRV_OPERATION_RESTORE_EXPORT); @@ -3184,49 +3210,21 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn) return(DB_SUCCESS); } - recv_recovery_on = true; - - log_mutex_enter(); - - err = recv_find_max_checkpoint(&max_cp_field); - - if (err != DB_SUCCESS) { - recv_sys.recovered_lsn = log_sys.get_lsn(); -err_exit: - log_mutex_exit(); - return(err); - } - - buf = log_sys.buf; - log_sys.log.main_read(max_cp_field, {buf, 
OS_FILE_LOG_BLOCK_SIZE}); - - checkpoint_lsn = mach_read_from_8(buf + LOG_CHECKPOINT_LSN); - checkpoint_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO); - - /* Start reading the log from the checkpoint lsn. */ - ut_ad(RECV_SCAN_SIZE <= srv_log_buffer_size); - ut_ad(recv_sys.pages.empty()); - - switch (log_sys.log.format) { - case 0: - log_mutex_exit(); - return DB_SUCCESS; + recv_recovery_on = true; + log_mutex_enter(); + dberr_t err = recv_check(); + recv_sys.recovered_lsn= log_sys.get_lsn(); + log_mutex_exit(); + switch (err) { default: - if (const lsn_t end_lsn = mach_read_from_8( - buf + LOG_CHECKPOINT_END_LSN)) { - if (end_lsn < checkpoint_lsn) { - recv_sys.found_corrupt_log = true; - err = DB_ERROR; - goto err_exit; - } - } - log_sys.set_lsn(recv_sys.recovered_lsn); - ut_ad(recv_sys.recovered_lsn == checkpoint_lsn); - goto completed; - case log_t::FORMAT_10_5: - lsn_t end_lsn = recv_group_scan_log_recs(checkpoint_lsn); + return err; + case DB_SUCCESS: + break; + case DB_FAIL: + ib::error() << "FIXME: crash recovery does not work yet"; + lsn_t end_lsn= recv_scan(); #if 1// FIXME /* The first scan should not have stored or applied any records. */ @@ -3236,7 +3234,9 @@ err_exit: if (srv_read_only_mode && recv_needed_recovery) { err = DB_READ_ONLY; - goto err_exit; +err_exit: + log_mutex_exit(); + return err; } if (recv_sys.found_corrupt_log && !srv_force_recovery) { @@ -3244,50 +3244,11 @@ err_exit: err = DB_ERROR; goto err_exit; } - } - - /* NOTE: we always do a 'recovery' at startup, but only if - there is something wrong we will print a message to the - user about recovery: */ - - if (flush_lsn == checkpoint_lsn) { - /* The redo log is logically empty. */ - } else if (checkpoint_lsn != flush_lsn) { - ut_ad(!srv_log_file_created); - - if (checkpoint_lsn < flush_lsn) { - ib::warn() - << "Are you sure you are using the right " - << LOG_FILE_NAME - << " to start up the database? 
Log sequence " - "number in the " - << LOG_FILE_NAME << " is " << checkpoint_lsn - << ", less than the log sequence number in " - "the first system tablespace file header, " - << flush_lsn << "."; - } - - if (!recv_needed_recovery) { - ib::info() - << "The log sequence number " << flush_lsn - << " in the system tablespace does not match" - " the log sequence number " - << checkpoint_lsn << " in the " - << LOG_FILE_NAME << "!"; - - if (srv_read_only_mode) { - ib::error() << "innodb_read_only" - " prevents crash recovery"; - log_mutex_exit(); - return(DB_READ_ONLY); - } - recv_needed_recovery = true; - } + log_sys.set_lsn(end_lsn); + break; } - log_sys.set_lsn(recv_sys.recovered_lsn); - #if 0// MDEV-14425 TODO if (recv_needed_recovery) { bool missing_tablespace = false; @@ -3296,7 +3257,6 @@ err_exit: rescan, missing_tablespace); if (err != DB_SUCCESS) { - log_mutex_exit(); return(err); } @@ -3326,7 +3286,6 @@ err_exit: rescan, missing_tablespace); if (err != DB_SUCCESS) { - log_mutex_exit(); return err; } @@ -3337,21 +3296,16 @@ err_exit: } #endif - if (log_sys.log.scanned_lsn < checkpoint_lsn - || log_sys.log.scanned_lsn < recv_max_page_lsn) { - + if (recv_sys.scanned_lsn < recv_max_page_lsn) { ib::error() << "We scanned the log up to " - << log_sys.log.scanned_lsn - << ". A checkpoint was at " << checkpoint_lsn << " and" - " the maximum LSN on a database page was " + << recv_sys.scanned_lsn + << ". The maximum LSN on a database page was " << recv_max_page_lsn << ". 
It is possible that the" " database is now corrupt!"; } -completed: +#if 0// FIXME if (recv_sys.recovered_lsn < checkpoint_lsn) { - log_mutex_exit(); - ib::error() << "Recovered only to lsn:" << recv_sys.recovered_lsn << " checkpoint_lsn: " << checkpoint_lsn; @@ -3378,15 +3332,11 @@ completed: log_sys.last_checkpoint_lsn = checkpoint_lsn; log_sys.next_checkpoint_no = ++checkpoint_no; +#endif mutex_enter(&recv_sys.mutex); - recv_sys.apply_log_recs = true; - mutex_exit(&recv_sys.mutex); - - log_mutex_exit(); - recv_lsn_checks_on = true; /* The database is now ready to start almost normal processing of user diff --git a/storage/innobase/srv/srv0start.cc b/storage/innobase/srv/srv0start.cc index d2e52f894b1..c8edeaaba1c 100644 --- a/storage/innobase/srv/srv0start.cc +++ b/storage/innobase/srv/srv0start.cc @@ -324,7 +324,7 @@ static dberr_t create_log_file(lsn_t lsn, std::string& logfile0) sizeof log_header::CREATOR_CURRENT); static_assert(log_header::CREATOR_END - log_header::CREATOR == sizeof log_header::CREATOR_CURRENT, "compatibility"); - log_block_set_checksum(buf, log_block_calc_checksum_crc32(buf)); + mach_write_to_4(&buf[512 - 4], ut_crc32(buf, 512 - 4)); buf+= 512; /* Write FILE_ID records for any non-predefined tablespaces. 
*/ @@ -1109,17 +1109,17 @@ static lsn_t srv_prepare_to_delete_redo_log_file(bool old_exists) DBUG_RETURN(flushed_lsn); } -/** Tries to locate LOG_FILE_NAME and check it's size, etc +/** Tries to locate log files and check their size, etc @param[out] log_file_found returns true here if correct file was found -@return dberr_t with DB_SUCCESS or some error */ -static dberr_t find_and_check_log_file(bool &log_file_found) +@return DB_SUCCESS or some error */ +static dberr_t find_and_check_log(bool &log_file_found) { log_file_found= false; auto logfile0= get_log_file_path(); os_file_stat_t stat_info; - const dberr_t err= os_file_get_status(logfile0.c_str(), &stat_info, false, - srv_read_only_mode); + dberr_t err= os_file_get_status(logfile0.c_str(), &stat_info, false, + srv_read_only_mode); auto is_operation_restore= []() -> bool { return srv_operation == SRV_OPERATION_RESTORE || @@ -1140,24 +1140,45 @@ static dberr_t find_and_check_log_file(bool &log_file_found) if (!srv_file_check_mode(logfile0.c_str())) return DB_ERROR; - const os_offset_t size= stat_info.size; - ut_a(size != (os_offset_t) -1); + ut_a(stat_info.size != (os_offset_t) -1); - if (size < OS_FILE_LOG_BLOCK_SIZE) + if (stat_info.size == 0 && is_operation_restore()) { - ib::error() << "Log file " << logfile0 << " size " << size + /* Tolerate an empty "ib_logfile0" from a previous run of + mariabackup --prepare. */ + return DB_NOT_FOUND; + } + + if (stat_info.size < 512) + { + ib::error() << "Log file " << logfile0 << " size " << stat_info.size << " is too small"; return DB_ERROR; } - if (size == 0 && is_operation_restore()) + auto logdata= get_log_file_path(LOG_DATA_FILE_NAME); + + err= os_file_get_status(logdata.c_str(), &stat_info, false, + srv_read_only_mode); + if (err == DB_NOT_FOUND) { - /* Tolerate an empty LOG_FILE_NAME from a previous run of - mariabackup --prepare. 
*/ - return DB_NOT_FOUND; + if (is_operation_restore()) + return DB_NOT_FOUND; + + return DB_SUCCESS; + } + + ut_a(stat_info.size != (os_offset_t) -1); + + if (!stat_info.size || (stat_info.size & 511)) + { + ib::error() << "Log file " << logdata << " size " << stat_info.size + << " is incorrect"; + return DB_ERROR; } - srv_log_file_size= size; + srv_log_file_size= stat_info.size; + log_sys.log.file_size= stat_info.size; log_file_found= true; return DB_SUCCESS; @@ -1443,7 +1464,7 @@ dberr_t srv_start(bool create_new_db) srv_log_file_size = 0; bool log_file_found; - if (dberr_t err = find_and_check_log_file(log_file_found)) { + if (dberr_t err = find_and_check_log(log_file_found)) { if (err == DB_NOT_FOUND) { return DB_SUCCESS; } |