diff options
Diffstat (limited to 'storage/innobase/page/page0cur.cc')
-rw-r--r-- | storage/innobase/page/page0cur.cc | 285 |
1 files changed, 195 insertions, 90 deletions
diff --git a/storage/innobase/page/page0cur.cc b/storage/innobase/page/page0cur.cc index f295f7e764b..cc94d6cfa3e 100644 --- a/storage/innobase/page/page0cur.cc +++ b/storage/innobase/page/page0cur.cc @@ -89,7 +89,10 @@ page_cur_try_search_shortcut( goto exit_func; } - next_rec = page_rec_get_next_const(rec); + if (!(next_rec = page_rec_get_next_const(rec))) { + goto exit_func; + } + if (!page_rec_is_supremum(next_rec)) { offsets = rec_get_offsets(next_rec, index, offsets, index->n_core_fields, @@ -180,7 +183,10 @@ page_cur_try_search_shortcut_bytes( goto exit_func; } - next_rec = page_rec_get_next_const(rec); + if (!(next_rec = page_rec_get_next_const(rec))) { + goto exit_func; + } + if (!page_rec_is_supremum(next_rec)) { offsets = rec_get_offsets(next_rec, index, offsets, index->n_core_fields, @@ -268,7 +274,7 @@ page_cur_rec_field_extends( /****************************************************************//** Searches the right position for a page cursor. */ -void +bool page_cur_search_with_match( /*=======================*/ const buf_block_t* block, /*!< in: buffer block */ @@ -290,7 +296,6 @@ page_cur_search_with_match( ulint low; ulint mid; const page_t* page; - const page_dir_slot_t* slot; const rec_t* up_rec; const rec_t* low_rec; const rec_t* mid_rec; @@ -336,7 +341,7 @@ page_cur_search_with_match( && page_cur_try_search_shortcut( block, index, tuple, iup_matched_fields, ilow_matched_fields, cursor)) { - return; + return false; } # ifdef PAGE_CUR_DBG if (mode == PAGE_CUR_DBG) { @@ -353,10 +358,9 @@ page_cur_search_with_match( if (mode == PAGE_CUR_RTREE_INSERT && n_core) { mode = PAGE_CUR_LE; } else { - rtr_cur_search_with_match( + return rtr_cur_search_with_match( block, (dict_index_t*)index, tuple, mode, cursor, rtr_info); - return; } } @@ -387,9 +391,11 @@ page_cur_search_with_match( while (up - low > 1) { mid = (low + up) / 2; - slot = page_dir_get_nth_slot(page, mid); - mid_rec = page_dir_slot_get_rec(slot); - + const page_dir_slot_t* slot = page_dir_get_nth_slot(page, mid); + if (UNIV_UNLIKELY(!(mid_rec + = page_dir_slot_get_rec_validate(slot)))) { + goto corrupted; + } cur_matched_fields = std::min(low_matched_fields, up_matched_fields); @@ -432,18 +438,30 @@ up_slot_match: } } - slot = page_dir_get_nth_slot(page, low); - low_rec = page_dir_slot_get_rec(slot); - slot = page_dir_get_nth_slot(page, up); - up_rec = page_dir_slot_get_rec(slot); + low_rec = page_dir_slot_get_rec_validate( + page_dir_get_nth_slot(page, low)); + up_rec = page_dir_slot_get_rec_validate( + page_dir_get_nth_slot(page, up)); + if (UNIV_UNLIKELY(!low_rec || !up_rec)) { +corrupted: + if (UNIV_LIKELY_NULL(heap)) { + mem_heap_free(heap); + } + return true; + } /* Perform linear search until the upper and lower records come to distance 1 of each other. */ - while (page_rec_get_next_const(low_rec) != up_rec) { - - mid_rec = page_rec_get_next_const(low_rec); - + for (;;) { + if (const rec_t* next = page_rec_get_next_const(low_rec)) { + if (next == up_rec) { + break; + } + mid_rec = next; + } else { + goto corrupted; + } cur_matched_fields = std::min(low_matched_fields, up_matched_fields); @@ -513,6 +531,8 @@ up_rec_match: if (UNIV_LIKELY_NULL(heap)) { mem_heap_free(heap); } + + return false; } #ifdef BTR_CUR_HASH_ADAPT @@ -530,7 +550,7 @@ lower limit record @param[in,out] ilow_matched_bytes already matched bytes in the first partially matched field in the lower limit record @param[out] cursor page cursor */ -void +bool page_cur_search_with_match_bytes( const buf_block_t* block, const dict_index_t* index, @@ -544,9 +564,7 @@ page_cur_search_with_match_bytes( { ulint up; ulint low; - ulint mid; const page_t* page; - const page_dir_slot_t* slot; const rec_t* up_rec; const rec_t* low_rec; const rec_t* mid_rec; @@ -595,7 +613,7 @@ page_cur_search_with_match_bytes( iup_matched_fields, iup_matched_bytes, ilow_matched_fields, ilow_matched_bytes, cursor)) { - return; + return false; } # ifdef PAGE_CUR_DBG if (mode == PAGE_CUR_DBG) { @@ -633,9 +651,12 @@ page_cur_search_with_match_bytes( const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0; while (up - low > 1) { - mid = (low + up) / 2; - slot = page_dir_get_nth_slot(page, mid); - mid_rec = page_dir_slot_get_rec(slot); + const ulint mid = (low + up) / 2; + mid_rec = page_dir_slot_get_rec_validate( + page_dir_get_nth_slot(page, mid)); + if (UNIV_UNLIKELY(!mid_rec)) { + goto corrupted; + } ut_pair_min(&cur_matched_fields, &cur_matched_bytes, low_matched_fields, low_matched_bytes, @@ -682,18 +703,30 @@ up_slot_match: } } - slot = page_dir_get_nth_slot(page, low); - low_rec = page_dir_slot_get_rec(slot); - slot = page_dir_get_nth_slot(page, up); - up_rec = page_dir_slot_get_rec(slot); + low_rec = page_dir_slot_get_rec_validate( + page_dir_get_nth_slot(page, low)); + up_rec = page_dir_slot_get_rec_validate( + page_dir_get_nth_slot(page, up)); + if (UNIV_UNLIKELY(!low_rec || !up_rec)) { +corrupted: + if (UNIV_LIKELY_NULL(heap)) { + mem_heap_free(heap); + } + return true; + } /* Perform linear search until the upper and lower records come to distance 1 of each other. */ - while (page_rec_get_next_const(low_rec) != up_rec) { - - mid_rec = page_rec_get_next_const(low_rec); - + for (;;) { + if (const rec_t* next = page_rec_get_next_const(low_rec)) { + if (next == up_rec) { + break; + } + mid_rec = next; + } else { + goto corrupted; + } ut_pair_min(&cur_matched_fields, &cur_matched_bytes, low_matched_fields, low_matched_bytes, up_matched_fields, up_matched_bytes); @@ -762,6 +795,7 @@ up_rec_match: if (UNIV_LIKELY_NULL(heap)) { mem_heap_free(heap); } + return false; } #endif /* BTR_CUR_HASH_ADAPT */ @@ -774,17 +808,12 @@ page_cur_open_on_rnd_user_rec( buf_block_t* block, /*!< in: page */ page_cur_t* cursor) /*!< out: page cursor */ { - const ulint n_recs = page_get_n_recs(block->page.frame); - - page_cur_set_before_first(block, cursor); - - if (UNIV_UNLIKELY(n_recs == 0)) { - - return; - } - - cursor->rec = page_rec_get_nth(block->page.frame, - ut_rnd_interval(n_recs) + 1); + cursor->block= block; + if (const ulint n_recs= page_get_n_recs(block->page.frame)) + if ((cursor->rec= page_rec_get_nth(block->page.frame, + ut_rnd_interval(n_recs) + 1))) + return; + cursor->rec= page_get_infimum_rec(block->page.frame); } /** @@ -803,7 +832,7 @@ static void page_rec_set_n_owned(rec_t *rec, ulint n_owned, bool comp) Split a directory slot which owns too many records. @param[in,out] block index page @param[in,out] slot the slot that needs to be split */ -static void page_dir_split_slot(const buf_block_t &block, +static bool page_dir_split_slot(const buf_block_t &block, page_dir_slot_t *slot) { ut_ad(slot <= &block.page.frame[srv_page_size - PAGE_EMPTY_DIR_START]); @@ -816,10 +845,17 @@ static void page_dir_split_slot(const buf_block_t &block, PAGE_DIR_SLOT_MIN_N_OWNED, "compatibility"); /* Find a record approximately in the middle. */ - const rec_t *rec= page_dir_slot_get_rec(slot + PAGE_DIR_SLOT_SIZE); + const rec_t *rec= page_dir_slot_get_rec_validate(slot + PAGE_DIR_SLOT_SIZE); for (ulint i= n_owned / 2; i--; ) + { + if (UNIV_UNLIKELY(!rec)) + return true; rec= page_rec_get_next_const(rec); + } + + if (UNIV_UNLIKELY(!rec)) + return true; /* Add a directory slot immediately below this one. */ constexpr uint16_t n_slots_f= PAGE_N_DIR_SLOTS + PAGE_HEADER; @@ -829,7 +865,10 @@ static void page_dir_split_slot(const buf_block_t &block, page_dir_slot_t *last_slot= static_cast<page_dir_slot_t*> (block.page.frame + srv_page_size - (PAGE_DIR + PAGE_DIR_SLOT_SIZE) - n_slots * PAGE_DIR_SLOT_SIZE); - ut_ad(slot >= last_slot); + + if (UNIV_UNLIKELY(slot < last_slot)) + return true; + memmove_aligned<2>(last_slot, last_slot + PAGE_DIR_SLOT_SIZE, slot - last_slot); @@ -842,6 +881,7 @@ static void page_dir_split_slot(const buf_block_t &block, page_rec_set_n_owned(page_dir_slot_get_rec(slot), half_owned, comp); page_rec_set_n_owned(page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE), n_owned - half_owned, comp); + return false; } /** @@ -867,6 +907,10 @@ static void page_zip_dir_split_slot(buf_block_t *block, ulint s, mtr_t* mtr) const rec_t *rec= page_dir_slot_get_rec(slot + PAGE_DIR_SLOT_SIZE); + /* We do not try to prevent crash on corruption here. + For ROW_FORMAT=COMPRESSED pages, the next-record links should + be validated in page_zip_decompress(). Corruption should only + be possible here if the buffer pool was corrupted later. */ for (ulint i= n_owned / 2; i--; ) rec= page_rec_get_next_const(rec); @@ -952,8 +996,12 @@ static void page_zip_dir_balance_slot(buf_block_t *block, ulint s, mtr_t *mtr) /* Transfer one record to the underfilled slot */ page_rec_set_n_owned<true>(block, slot_rec, 0, true, mtr); - rec_t* new_rec = rec_get_next_ptr(slot_rec, TRUE); - page_rec_set_n_owned<true>(block, new_rec, + const rec_t* new_rec = page_rec_get_next_low(slot_rec, TRUE); + /* We do not try to prevent crash on corruption here. + For ROW_FORMAT=COMPRESSED pages, the next-record links should + be validated in page_zip_decompress(). Corruption should only + be possible here if the buffer pool was corrupted later. */ + page_rec_set_n_owned<true>(block, const_cast<rec_t*>(new_rec), PAGE_DIR_SLOT_MIN_N_OWNED, true, mtr); mach_write_to_2(slot, page_offset(new_rec)); @@ -1014,18 +1062,27 @@ static void page_dir_balance_slot(const buf_block_t &block, ulint s) } /* Transfer one record to the underfilled slot */ - rec_t* new_rec; + const rec_t* new_rec; if (comp) { + if (UNIV_UNLIKELY(!(new_rec = + page_rec_get_next_low(slot_rec, true)))) { + ut_ad("corrupted page" == 0); + return; + } page_rec_set_n_owned(slot_rec, 0, true); - new_rec = rec_get_next_ptr(slot_rec, TRUE); - page_rec_set_n_owned(new_rec, PAGE_DIR_SLOT_MIN_N_OWNED, true); + page_rec_set_n_owned(const_cast<rec_t*>(new_rec), + PAGE_DIR_SLOT_MIN_N_OWNED, true); page_rec_set_n_owned(up_rec, up_n_owned - 1, true); } else { + if (UNIV_UNLIKELY(!(new_rec = + page_rec_get_next_low(slot_rec, false)))) { + ut_ad("corrupted page" == 0); + return; + } page_rec_set_n_owned(slot_rec, 0, false); - new_rec = rec_get_next_ptr(slot_rec, FALSE); - page_rec_set_n_owned(new_rec, PAGE_DIR_SLOT_MIN_N_OWNED, - false); + page_rec_set_n_owned(const_cast<rec_t*>(new_rec), + PAGE_DIR_SLOT_MIN_N_OWNED, false); page_rec_set_n_owned(up_rec, up_n_owned - 1, false); } @@ -1282,6 +1339,20 @@ inline void mtr_t::page_insert(const buf_block_t &block, bool reuse, m_last_offset= FIL_PAGE_TYPE; } +/** Report page directory corruption. +@param block index page +@param index index tree +*/ +ATTRIBUTE_COLD +static void page_cur_directory_corrupted(const buf_block_t &block, + const dict_index_t &index) +{ + ib::error() << "Directory of " << block.page.id() + << " of index " << index.name + << " in table " << index.table->name + << " is corrupted"; +} + /***********************************************************//** Inserts a record next to page cursor on an uncompressed page. @return pointer to record @@ -1624,9 +1695,14 @@ copied: { const ulint owner= page_dir_find_owner_slot(next_rec); if (UNIV_UNLIKELY(owner == ULINT_UNDEFINED)) + { + page_cur_directory_corrupted(*block, *index); + return nullptr; + } + + if (page_dir_split_slot(*block, page_dir_get_nth_slot(block->page.frame, + owner))) return nullptr; - page_dir_split_slot(*block, - page_dir_get_nth_slot(block->page.frame, owner)); } rec_offs_make_valid(insert_buf + extra_size, index, @@ -1694,20 +1770,20 @@ static inline void page_zip_dir_add_slot(buf_block_t *block, /***********************************************************//** Inserts a record next to page cursor on a compressed and uncompressed -page. Returns pointer to inserted record if succeed, i.e., -enough space available, NULL otherwise. -The cursor stays at the same position. +page. IMPORTANT: The caller will have to update IBUF_BITMAP_FREE if this is a compressed leaf page in a secondary index. This has to be done either within the same mini-transaction, or by invoking ibuf_reset_free_bits() before mtr_commit(). -@return pointer to record if succeed, NULL otherwise */ +@return pointer to inserted record +@return nullptr on failure */ rec_t* page_cur_insert_rec_zip( /*====================*/ - page_cur_t* cursor, /*!< in/out: page cursor */ + page_cur_t* cursor, /*!< in/out: page cursor, + logical position unchanged */ dict_index_t* index, /*!< in: record descriptor */ const rec_t* rec, /*!< in: pointer to a physical record */ rec_offs* offsets,/*!< in/out: rec_get_offsets(rec, index) */ @@ -1787,6 +1863,9 @@ page_cur_insert_rec_zip( { ulint pos= page_rec_get_n_recs_before(cursor->rec); + if (UNIV_UNLIKELY(pos == ULINT_UNDEFINED)) + return nullptr; + switch (page_zip_reorganize(cursor->block, index, level, mtr, true)) { case DB_FAIL: ut_ad(cursor->rec == cursor_rec); @@ -1797,10 +1876,13 @@ page_cur_insert_rec_zip( return nullptr; } - if (pos) - cursor->rec= page_rec_get_nth(page, pos); - else - ut_ad(cursor->rec == page_get_infimum_rec(page)); + if (!pos) + ut_ad(cursor->rec == page + PAGE_NEW_INFIMUM); + else if (!(cursor->rec= page_rec_get_nth(page, pos))) + { + cursor->rec= page + PAGE_NEW_SUPREMUM; + return nullptr; + } ut_ad(!page_header_get_ptr(page, PAGE_FREE)); @@ -1817,16 +1899,21 @@ page_cur_insert_rec_zip( if (insert_rec) { ulint pos= page_rec_get_n_recs_before(insert_rec); - ut_ad(pos > 0); + if (UNIV_UNLIKELY(!pos || pos == ULINT_UNDEFINED)) + return nullptr; /* We are writing entire page images to the log. Reduce the redo log volume by reorganizing the page at the same time. */ switch (page_zip_reorganize(cursor->block, index, level, mtr)) { case DB_SUCCESS: /* The page was reorganized: Seek to pos. */ - cursor->rec= pos > 1 - ? page_rec_get_nth(page, pos - 1) - : page + PAGE_NEW_INFIMUM; + if (pos <= 1) + cursor->rec= page + PAGE_NEW_INFIMUM; + else if (!(cursor->rec= page_rec_get_nth(page, pos - 1))) + { + cursor->rec= page + PAGE_NEW_INFIMUM; + return nullptr; + } insert_rec= page + rec_get_next_offs(cursor->rec, 1); rec_offs_make_valid(insert_rec, index, page_is_leaf(page), offsets); break; @@ -1892,19 +1979,25 @@ too_small: byte *const free_rec_ptr= page + free_rec; heap_no= rec_get_heap_no_new(free_rec_ptr); - int16_t next_rec= mach_read_from_2(free_rec_ptr - REC_NEXT); + int16_t next_free= mach_read_from_2(free_rec_ptr - REC_NEXT); /* With innodb_page_size=64k, int16_t would be unsafe to use here, but that cannot be used with ROW_FORMAT=COMPRESSED. */ static_assert(UNIV_ZIP_SIZE_SHIFT_MAX == 14, "compatibility"); - if (next_rec) + if (next_free) { - next_rec= static_cast<int16_t>(next_rec + free_rec); - ut_ad(int{PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES} <= next_rec); - ut_ad(static_cast<uint16_t>(next_rec) < srv_page_size); + next_free= static_cast<int16_t>(next_free + free_rec); + if (UNIV_UNLIKELY(int{PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES} > + next_free || + uint16_t(next_free) >= srv_page_size)) + { + if (UNIV_LIKELY_NULL(heap)) + mem_heap_free(heap); + return nullptr; + } } byte *hdr= my_assume_aligned<4>(&page_zip->data[page_free_f]); - mach_write_to_2(hdr, static_cast<uint16_t>(next_rec)); + mach_write_to_2(hdr, static_cast<uint16_t>(next_free)); const byte *const garbage= my_assume_aligned<2>(page_free + 2); ut_ad(mach_read_from_2(garbage) >= rec_size); mach_write_to_2(my_assume_aligned<2>(hdr + 2), @@ -1960,18 +2053,20 @@ use_heap: page_zip_dir_add_slot(cursor->block, index, mtr); } + /* next record after current before the insertion */ + const rec_t *next_rec = page_rec_get_next_low(cursor->rec, TRUE); + if (UNIV_UNLIKELY(!next_rec || + rec_get_status(next_rec) == REC_STATUS_INFIMUM || + rec_get_status(cursor->rec) > REC_STATUS_INFIMUM)) + return nullptr; + /* 3. Create the record */ byte *insert_rec= rec_copy(insert_buf, rec, offsets); rec_offs_make_valid(insert_rec, index, page_is_leaf(page), offsets); /* 4. Insert the record in the linked list of records */ ut_ad(cursor->rec != insert_rec); - - /* next record after current before the insertion */ - const rec_t* next_rec = page_rec_get_next_low(cursor->rec, TRUE); - ut_ad(rec_get_status(cursor->rec) <= REC_STATUS_INFIMUM); ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM); - ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM); mach_write_to_2(insert_rec - REC_NEXT, static_cast<uint16_t> (next_rec - insert_rec)); @@ -2038,8 +2133,9 @@ inc_dir: /* 7. It remains to update the owner record. */ ulint n_owned; - while (!(n_owned = rec_get_n_owned_new(next_rec))) - next_rec= page_rec_get_next_low(next_rec, true); + while (!(n_owned= rec_get_n_owned_new(next_rec))) + if (!(next_rec= page_rec_get_next_low(next_rec, true))) + return nullptr; rec_set_bit_field_1(const_cast<rec_t*>(next_rec), n_owned + 1, REC_NEW_N_OWNED, REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); @@ -2053,7 +2149,10 @@ inc_dir: { const ulint owner= page_dir_find_owner_slot(next_rec); if (UNIV_UNLIKELY(owner == ULINT_UNDEFINED)) + { + page_cur_directory_corrupted(*cursor->block, *index); return nullptr; + } page_zip_dir_split_slot(cursor->block, owner, mtr); } @@ -2186,8 +2285,8 @@ page_cur_delete_rec( However, this could also be a call in btr_cur_pessimistic_update() to delete the only record in the page and to insert another one. */ - page_cur_move_to_next(cursor); - ut_ad(page_cur_is_after_last(cursor)); + ut_ad(page_rec_is_supremum(page_rec_get_next(cursor->rec))); + page_cur_set_after_last(block, cursor); page_create_empty(page_cur_get_block(cursor), const_cast<dict_index_t*>(index), mtr); return; @@ -2198,6 +2297,7 @@ page_cur_delete_rec( if (UNIV_UNLIKELY(!cur_slot_no || cur_slot_no == ULINT_UNDEFINED)) { /* Avoid crashing due to a corrupted page. */ + page_cur_directory_corrupted(*block, *index); return; } @@ -2221,11 +2321,16 @@ page_cur_delete_rec( while (current_rec != rec) { prev_rec = rec; - rec = page_rec_get_next(rec); + if (!(rec = page_rec_get_next(rec))) { + /* Avoid crashing due to a corrupted page. */ + return; + } } - page_cur_move_to_next(cursor); - next_rec = cursor->rec; + if (!(next_rec = page_cur_move_to_next(cursor))) { + /* Avoid crashing due to a corrupted page. */ + return; + } /* Remove the record from the linked list of records */ /* If the deleted record is pointed to by a dir slot, update the @@ -2547,7 +2652,7 @@ inc_dir: mach_write_to_2(page_n_recs, mach_read_from_2(page_n_recs) + 1); if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) - page_dir_split_slot(block, owner_slot); + return page_dir_split_slot(block, owner_slot); ut_ad(page_simple_validate_old(page)); return false; } @@ -2772,7 +2877,7 @@ inc_dir: mach_write_to_2(page_n_recs, mach_read_from_2(page_n_recs) + 1); if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) - page_dir_split_slot(block, owner_slot); + return page_dir_split_slot(block, owner_slot); ut_ad(page_simple_validate_new(page)); return false; } |