diff options
author | Thirunarayanan Balathandayuthapani <thiru@mariadb.com> | 2020-07-29 22:32:41 +0530 |
---|---|---|
committer | Thirunarayanan Balathandayuthapani <thiru@mariadb.com> | 2020-07-30 16:28:46 +0530 |
commit | 9cc886846a11686881c133173b20c796c579bb62 (patch) | |
tree | 635228bddf42647c113b89d12cb61df8fbc227e8 | |
parent | a154228377ea205322ff48b1f4cebbca588cc72d (diff) | |
download | mariadb-git-9cc886846a11686881c133173b20c796c579bb62.tar.gz |
MDEV-11799 InnoDB can abort if the doublewrite buffer contains a bad and a good copy
- Addressed marko's review comments.
-rw-r--r-- | storage/innobase/buf/buf0dblwr.cc | 38 | ||||
-rw-r--r-- | storage/innobase/include/log0recv.h | 14 | ||||
-rw-r--r-- | storage/innobase/log/log0recv.cc | 77 |
3 files changed, 68 insertions, 61 deletions
diff --git a/storage/innobase/buf/buf0dblwr.cc b/storage/innobase/buf/buf0dblwr.cc index 02b37f5a9d6..475e31ce305 100644 --- a/storage/innobase/buf/buf0dblwr.cc +++ b/storage/innobase/buf/buf0dblwr.cc @@ -534,24 +534,38 @@ buf_dblwr_process() { ulint page_no_dblwr = 0; byte* read_buf; - byte* unaligned_read_buf; recv_dblwr_t& recv_dblwr = recv_sys->dblwr; if (!buf_dblwr) { return; } - unaligned_read_buf = static_cast<byte*>( - ut_malloc_nokey(2 * UNIV_PAGE_SIZE)); - read_buf = static_cast<byte*>( - ut_align(unaligned_read_buf, UNIV_PAGE_SIZE)); + aligned_malloc(3 * srv_page_size, srv_page_size)); + + byte* const buf = read_buf + srv_page_size; for (recv_dblwr_t::list::iterator i = recv_dblwr.pages.begin(); i != recv_dblwr.pages.end(); ++i, ++page_no_dblwr) { byte* page = *i; ulint space_id = page_get_space_id(page); + + /* Ignore the dblwr pages for the following: + 1) find_page could have set lsn to zero for oldest page. + 2) dblwr page shouldn't be lesser than checkpoint lsn or + greater than last scanned lsn. */ + ulint lsn= mach_read_from_8(page + FIL_PAGE_LSN); + if (!lsn || recv_sys->parse_start_lsn > lsn) { + continue; + } + + if (recv_sys->scanned_lsn < lsn) { + ib::warn() << "Doublewrite page have future lsn " + << lsn; + continue; + } + fil_space_t* space = fil_space_acquire_for_io(space_id); if (space == NULL) { @@ -619,8 +633,8 @@ next_process: ignore this page (there should be redo log records to initialize it). */ } else if (recv_dblwr.validate_page( - space, page_no, read_buf)) { - goto next_process; + space, page_no, read_buf, false, buf)) { + goto next_process; } else { /* We intentionally skip this message for is_all_zero pages. */ @@ -629,12 +643,14 @@ next_process: << " from the doublewrite buffer."; } - page = (byte*) recv_dblwr.find_page(space_id, page_no, - true); + page = recv_dblwr.find_page(space_id, page_no, space, + buf); if (!page) { goto next_process; } + + fil_space_release_for_io(space); /* Write the good page from the doublewrite buffer to the intended position. */ @@ -642,7 +658,7 @@ next_process: fil_io(write_request, true, page_id, page_size, 0, page_size.physical(), - const_cast<byte*>(page), NULL); + const_cast<byte*>(page), NULL); ib::info() << "Recovered page " << page_id << " from the doublewrite buffer."; @@ -651,7 +667,7 @@ next_process: recv_dblwr.pages.clear(); fil_flush_file_spaces(FIL_TYPE_TABLESPACE); - ut_free(unaligned_read_buf); + aligned_free(read_buf); } /****************************************************************//** diff --git a/storage/innobase/include/log0recv.h b/storage/innobase/include/log0recv.h index 667b408f37d..e154d853b33 100644 --- a/storage/innobase/include/log0recv.h +++ b/storage/innobase/include/log0recv.h @@ -207,18 +207,24 @@ struct recv_dblwr_t { @param[in] page page to be validated @param[in] dblwr_page variable to indicate doublewrite buffer page + @param[in] tmp_buf temporary buffer to decrypt and + decompress the page @return true if success */ bool validate_page(fil_space_t *space, ulint page_no, - const byte* page, bool dblwr_page=false); + const byte *page, bool dblwr_page=false, + byte *tmp_buf=nullptr); /** Find a doublewrite copy of a page. @param[in] space_id tablespace identifier @param[in] page_no page number - @param[in] validate validate the page + @param[in] space tablespace object to do + dblwr page validation + @param[in] tmp_buf temporary buffer to decrypt and + decompress the page @return page frame @retval NULL if no page was found */ - const byte* find_page(ulint space_id, ulint page_no, - bool validate=false); + byte* find_page(ulint space_id, ulint page_no, + fil_space_t *space=nullptr, byte *tmp_buf=nullptr); typedef std::list<byte*, ut_allocator<byte*> > list; diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc index a075fa01066..524f359e8d0 100644 --- a/storage/innobase/log/log0recv.cc +++ b/storage/innobase/log/log0recv.cc @@ -4089,16 +4089,17 @@ recv_recovery_rollback_active(void) } bool recv_dblwr_t::validate_page(fil_space_t* space, ulint page_no, - const byte* page, bool dblwr_page) + const byte* page, bool dblwr_page, + byte* tmp_buf) { - byte tmp_frame[UNIV_PAGE_SIZE_MAX]; - byte tmp_page[UNIV_PAGE_SIZE_MAX]; - ulint page_type= mach_read_from_2(page + FIL_PAGE_TYPE); + ut_ad(tmp_buf); + byte *tmp_frame = tmp_buf; + byte *tmp_page = tmp_buf + srv_page_size; + const uint16_t page_type= mach_read_from_2(page + FIL_PAGE_TYPE); const page_size_t page_size(space->flags); const ulint phy_size= page_size.physical(); - const bool expect_encrypted= - (space->crypt_data && - space->crypt_data->type != CRYPT_SCHEME_UNENCRYPTED); + const bool expect_encrypted= space->crypt_data && + space->crypt_data->type != CRYPT_SCHEME_UNENCRYPTED; if (dblwr_page) { @@ -4111,46 +4112,32 @@ bool recv_dblwr_t::validate_page(fil_space_t* space, ulint page_no, << "due to invalid flags " << ib::hex(flags); return false; } - - /* dblwr page shouldn't be lesser than checkpoint lsn or - greater than last scanned lsn */ - ulint lsn= mach_read_from_8(page + FIL_PAGE_LSN); - if (recv_sys->parse_start_lsn > lsn) - return false; - if (recv_sys->scanned_lsn < lsn) - { - ib::warn() << "Doublewrite page have future lsn " << lsn; - return false; - } } - if (page_no && expect_encrypted - && mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION)) + if (page_no && expect_encrypted && + mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION)) { if (!fil_space_verify_crypt_checksum(page, page_size)) return false; if (page_type != FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) return true; + if (page_size.is_compressed()) + return false; + memcpy(tmp_page, page, phy_size); if (!fil_space_decrypt(space, tmp_frame, tmp_page)) return false; - - if (page_type != FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) - return !buf_page_is_corrupted(true, tmp_page, page_size, space); } - if (page_type == FIL_PAGE_PAGE_COMPRESSED) - memcpy(tmp_page, page, phy_size); - - if (page_type == FIL_PAGE_PAGE_COMPRESSED - || page_type == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) + switch(page_type) { - ulint decomp= fil_page_decompress(tmp_frame, tmp_page); - if (!decomp - || (decomp != srv_page_size && page_size.is_compressed()) - || buf_page_is_corrupted(true, tmp_page, page_size, space)) - return false; - return true; + case FIL_PAGE_PAGE_COMPRESSED: + memcpy(tmp_page, page, phy_size); + /* fall through */ + case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED: + return !page_size.is_compressed() && + fil_page_decompress(tmp_frame, tmp_page) == srv_page_size && + !buf_page_is_corrupted(true, tmp_page, page_size, space); } return !buf_page_is_corrupted(true, page, page_size, space); @@ -4162,30 +4149,28 @@ bool recv_dblwr_t::validate_page(fil_space_t* space, ulint page_no, @param[in] validate validate the doublewrite buffer page @return page frame @retval NULL if no page was found */ -const byte* +byte* recv_dblwr_t::find_page(ulint space_id, ulint page_no, - bool validate) + fil_space_t *space, byte *tmp_buf) { - const byte *result= NULL; + byte *result= NULL; lsn_t max_lsn= 0; for (list::const_iterator i = pages.begin(); i != pages.end(); ++i) { - const byte *page= *i; + byte *page= *i; if (page_get_page_no(page) != page_no || page_get_space_id(page) != space_id) continue; const lsn_t lsn= mach_read_from_8(page + FIL_PAGE_LSN); - if (lsn <= max_lsn) - continue; - if (validate) + if (lsn <= max_lsn || + (space && !validate_page(space, page_no, page, true, tmp_buf))) { - fil_space_t *space= fil_space_acquire_for_io(space_id); - bool is_valid= validate_page(space, page_no, page, true); - fil_space_release_for_io(space); - if (!is_valid) - continue; + /* Reset the PAGE_LSN to 0 for the older dblwr pages */ + memset(page + FIL_PAGE_LSN, 0, 8); + continue; } + max_lsn= lsn; result= page; } |