summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThirunarayanan Balathandayuthapani <thiru@mariadb.com>2019-12-20 12:33:29 +0530
committerThirunarayanan Balathandayuthapani <thiru@mariadb.com>2019-12-20 12:33:29 +0530
commita2e792932db488d764420e7adfa9d745a202e463 (patch)
treefc3988b2329348e2c6028f6846aff83882923377
parent305081a7354f4ef17d2e16ca16f747ee754fee69 (diff)
downloadmariadb-git-bb-10.5-MDEV-19176.tar.gz
MDEV-21351 Replace recv_sys.heap with list of buf_block_t*bb-10.5-MDEV-19176
- Replace recv_sys.heap with list of buf_block_t for storing redo logs during recovery.
-rw-r--r--extra/mariabackup/xtrabackup.cc2
-rw-r--r--storage/innobase/buf/buf0rea.cc3
-rw-r--r--storage/innobase/include/log0recv.h64
-rw-r--r--storage/innobase/log/log0recv.cc324
4 files changed, 264 insertions, 129 deletions
diff --git a/extra/mariabackup/xtrabackup.cc b/extra/mariabackup/xtrabackup.cc
index bb066daaba6..382c868f474 100644
--- a/extra/mariabackup/xtrabackup.cc
+++ b/extra/mariabackup/xtrabackup.cc
@@ -2676,7 +2676,7 @@ static lsn_t xtrabackup_copy_log(lsn_t start_lsn, lsn_t end_lsn, bool last)
}
}
- if (more_data && recv_parse_log_recs(0, STORE_NO, false)) {
+ if (more_data && recv_parse_log_recs(0, false)) {
msg("Error: copying the log failed");
diff --git a/storage/innobase/buf/buf0rea.cc b/storage/innobase/buf/buf0rea.cc
index e6d36259eac..dc2530721f6 100644
--- a/storage/innobase/buf/buf0rea.cc
+++ b/storage/innobase/buf/buf0rea.cc
@@ -766,7 +766,8 @@ buf_read_recv_pages(
ulint count = 0;
buf_pool = buf_pool_get(cur_page_id);
- while (buf_pool->n_pend_reads >= recv_n_pool_free_frames / 2) {
+ while (buf_pool->n_pend_reads >=
+ (recv_sys.num_avail_blocks * srv_page_size) / 2) {
os_thread_sleep(10000);
diff --git a/storage/innobase/include/log0recv.h b/storage/innobase/include/log0recv.h
index 6e65b7458c5..170dab4b173 100644
--- a/storage/innobase/include/log0recv.h
+++ b/storage/innobase/include/log0recv.h
@@ -105,11 +105,10 @@ bool recv_sys_add_to_parsing_buf(const byte* log_block, lsn_t scanned_lsn);
/** Parse log records from a buffer and optionally store them in recv_sys.pages
to wait merging to file pages.
@param[in] checkpoint_lsn the LSN of the latest checkpoint
-@param[in] store whether to store page operations
@param[in] apply whether to apply the records
@return whether MLOG_CHECKPOINT record was seen the first time,
or corruption was noticed */
-bool recv_parse_log_recs(lsn_t checkpoint_lsn, store_t store, bool apply);
+bool recv_parse_log_recs(lsn_t checkpoint_lsn, bool apply);
/** Moves the parsing buffer data left to the buffer start. */
void recv_sys_justify_left_parsing_buf();
@@ -279,8 +278,6 @@ struct recv_sys_t{
record, or 0 if none was parsed */
/** the time when progress was last reported */
time_t progress_time;
- mem_heap_t* heap; /*!< memory heap of log records and file
- addresses*/
using map = std::map<const page_id_t, page_recv_t,
std::less<const page_id_t>,
@@ -309,6 +306,19 @@ struct recv_sys_t{
/** Last added LSN to pages. */
lsn_t last_stored_lsn;
+ /** Last stored offset to pages */
+ ulint last_stored_offset;
+
+ /** Number of buf_block_t* blocks to store the redo log */
+ ulint num_avail_blocks;
+
+ /** Base node of the redo block list */
+ UT_LIST_BASE_NODE_T(buf_block_t) redo_list;
+
+ /** Value to indicate whether to store the redo log
+ in buf_block_t */
+ store_t store_value;
+
/** Initialize the redo log recovery subsystem. */
void create();
@@ -347,6 +357,46 @@ struct recv_sys_t{
progress_time = time;
return true;
}
+
+ /** Check whether overflow of pages happened
+ @param[in] len len of the data to be added */
+ bool check_overflow(uint32_t len);
+
+ /** Get the memory block for storing recv_t and redo log data
+ @param[in] len len of the data to be stored
+ @param[in] store_data whether the redo log data to be stored
+ @param[in] new_block whether to create new block
+ @return block of the data to be stored */
+ byte* get_mem_block(uint32_t len, bool store_data=false,
+ bool new_block=false);
+
+ /** Get the free len of the latest block
+ @return free length*/
+ uint32_t get_free_len();
+
+ /** Get the block for the given page by traversing the
+ redo list in recv_sys
+ @param[in] page page of the redo log data
+ @return block of the data */
+ buf_block_t* get_block(byte* page);
+
+ /** Set the store value with given parameter
+ @param[in] store data to be stored */
+ void set_store_value(store_t store) { store_value= store; }
+
+ /** Get the store value from store_value */
+ store_t get_store_value() { return store_value; }
+
+ /** Assign the store value depends on last phase and
+ mlog_checkpoint_lsn. */
+ void assign_store_value(bool last_phase)
+ {
+ store_value= (mlog_checkpoint_lsn == 0)
+ ? STORE_NO: (last_phase ? STORE_IF_EXISTS : STORE_YES);
+ }
+
+ /** Remove the free blocks from redo list. */
+ void remove_free_blocks();
};
/** The recovery system */
@@ -387,10 +437,4 @@ times! */
roll-forward */
#define RECV_SCAN_SIZE (4U << srv_page_size_shift)
-/** This many frames must be left free in the buffer pool when we scan
-the log and store the scanned log records in the buffer pool: we will
-use these free frames to read in pages when we start applying the
-log records to the database. */
-extern ulint recv_n_pool_free_frames;
-
#endif
diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc
index afce73f5524..a619b472aa2 100644
--- a/storage/innobase/log/log0recv.cc
+++ b/storage/innobase/log/log0recv.cc
@@ -100,14 +100,6 @@ static ulint recv_previous_parsed_rec_offset;
/** The 'multi' flag of the previous parsed redo log record */
static ulint recv_previous_parsed_rec_is_multi;
-/** This many frames must be left free in the buffer pool when we scan
-the log and store the scanned log records in the buffer pool: we will
-use these free frames to read in pages when we start applying the
-log records to the database.
-This is the default value. If the actual size of the buffer pool is
-larger than 10 MB we'll set this value to 512. */
-ulint recv_n_pool_free_frames;
-
/** The maximum lsn we see for a page during the recovery process. If this
is bigger than the lsn we are able to scan up to, that is an indication that
the recovery failed and the database may be corrupt. */
@@ -690,11 +682,6 @@ void recv_sys_t::close()
dblwr.pages.clear();
pages.clear();
- if (heap) {
- mem_heap_free(heap);
- heap = NULL;
- }
-
if (flush_start) {
os_event_destroy(flush_start);
}
@@ -717,6 +704,29 @@ void recv_sys_t::close()
mlog_init.clear();
}
+bool recv_sys_t::check_overflow(uint32_t len)
+{
+ if (!UT_LIST_GET_LEN(redo_list))
+ return false;
+
+ buf_block_t* block= UT_LIST_GET_FIRST(redo_list);
+
+ if (block->modify_clock + len + sizeof(recv_t) + sizeof(recv_t::data_t)
+ <= srv_page_size)
+ return false;
+
+ /* Added OS_FILE_LOG_BLOCK_SIZE to compensate sizeof(recv_t)
+ and sizeof (recv_t::data_t). */
+ len += OS_FILE_LOG_BLOCK_SIZE;
+
+ uint32_t n_pages= (len - (srv_page_size - block->modify_clock))
+ / srv_page_size;
+ if (num_avail_blocks >= (UT_LIST_GET_LEN(redo_list) + n_pages))
+ return false;
+
+ return true;
+}
+
/************************************************************
Reset the state of the recovery system variables. */
void
@@ -730,7 +740,6 @@ recv_sys_var_init(void)
recv_previous_parsed_rec_type = MLOG_SINGLE_REC_FLAG;
recv_previous_parsed_rec_offset = 0;
recv_previous_parsed_rec_is_multi = 0;
- recv_n_pool_free_frames = 384;
recv_max_page_lsn = 0;
}
@@ -802,8 +811,6 @@ void recv_sys_t::create()
mutex_create(LATCH_ID_RECV_SYS, &mutex);
mutex_create(LATCH_ID_RECV_WRITER, &writer_mutex);
- heap = mem_heap_create_typed(256, MEM_HEAP_FOR_RECV_SYS);
-
if (!srv_read_only_mode) {
flush_start = os_event_create(0);
flush_end = os_event_create(0);
@@ -814,12 +821,7 @@ void recv_sys_t::create()
apply_batch_on = false;
ulint size = buf_pool_get_curr_size();
- /* Set appropriate value of recv_n_pool_free_frames. */
- if (size >= 10 << 20) {
- /* Buffer pool of size greater than 10 MB. */
- recv_n_pool_free_frames = 512;
- }
-
+ num_avail_blocks = size / (3 * srv_page_size);
buf = static_cast<byte*>(ut_malloc_dontdump(RECV_PARSING_BUF_SIZE));
buf_size = RECV_PARSING_BUF_SIZE;
len = 0;
@@ -837,6 +839,8 @@ void recv_sys_t::create()
memset(truncated_undo_spaces, 0, sizeof truncated_undo_spaces);
last_stored_lsn = 0;
+ UT_LIST_INIT(redo_list, &buf_block_t::unzip_LRU);
+ store_value= STORE_NO;
}
/** Clear a fully processed set of stored redo log records. */
@@ -844,7 +848,15 @@ inline void recv_sys_t::clear()
{
ut_ad(mutex_own(&mutex));
pages.clear();
- mem_heap_empty(heap);
+
+ buf_block_t* prev_block= nullptr;
+ for (buf_block_t* block = UT_LIST_GET_LAST(redo_list);
+ block; block = prev_block) {
+ prev_block = UT_LIST_GET_PREV(unzip_LRU, block);
+ ut_ad(buf_block_get_state(block) == BUF_BLOCK_MEMORY);
+ UT_LIST_REMOVE(redo_list, block);
+ buf_block_free(block);
+ }
}
/** Free most recovery data structures. */
@@ -855,11 +867,9 @@ void recv_sys_t::debug_free()
mutex_enter(&mutex);
pages.clear();
- mem_heap_free(heap);
ut_free_dodump(buf, buf_size);
buf = NULL;
- heap = NULL;
/* wake page cleaner up to progress */
if (!srv_read_only_mode) {
@@ -1737,6 +1747,70 @@ parse_log:
return(ptr);
}
+uint32_t recv_sys_t::get_free_len()
+{
+ if (!UT_LIST_GET_LEN(redo_list))
+ return 0;
+
+ return srv_page_size - UT_LIST_GET_FIRST(redo_list)->modify_clock;
+}
+
+byte* recv_sys_t::get_mem_block(uint32_t len, bool store_data,
+ bool new_block)
+{
+ buf_block_t* block= UT_LIST_GET_FIRST(redo_list);
+ uint32_t free_offset= (block == NULL) ? 0:block->modify_clock;
+
+ if (new_block)
+ goto create_block;
+
+ if (free_offset + len <= srv_page_size)
+ {
+ if (store_data)
+ block->page.fix();
+ block->modify_clock += len;
+ } else {
+create_block:
+ buf_block_t* new_block= buf_block_alloc(nullptr);
+ new_block->modify_clock= 0;
+ buf_block_t* free_block= new_block;
+ UT_LIST_ADD_FIRST(redo_list, new_block);
+
+ if (store_data)
+ new_block->page.fix();
+
+ if (len < srv_page_size)
+ new_block->modify_clock += len;
+ else
+ {
+ new_block->modify_clock = srv_page_size;
+ new_block = buf_block_alloc(nullptr);
+ UT_LIST_ADD_FIRST(redo_list, new_block);
+ }
+
+ return free_block->frame;
+ }
+
+ return block->frame + free_offset;
+}
+
+void recv_sys_t::remove_free_blocks()
+{
+ buf_block_t* prev_block= NULL;
+ for (buf_block_t* block= UT_LIST_GET_LAST(redo_list);
+ block != NULL; )
+ {
+ prev_block= UT_LIST_GET_PREV(unzip_LRU, block);
+ if (0 == static_cast<uint32_t>(block->page.buf_fix_count))
+ {
+ UT_LIST_REMOVE(redo_list, block);
+ buf_block_free(block);
+ }
+
+ block= prev_block;
+ }
+}
+
/** Store a redo log record for applying.
@param type record type
@param page_id page identifier
@@ -1748,6 +1822,9 @@ inline void recv_sys_t::add(mlog_id_t type, const page_id_t page_id,
const byte* body, const byte* rec_end, lsn_t lsn,
lsn_t end_lsn)
{
+ if (get_store_value() == STORE_NO)
+ return;
+
ut_ad(type != MLOG_FILE_DELETE);
ut_ad(type != MLOG_FILE_CREATE2);
ut_ad(type != MLOG_FILE_RENAME2);
@@ -1775,16 +1852,36 @@ inline void recv_sys_t::add(mlog_id_t type, const page_id_t page_id,
/* Store the log record body in limited-size chunks, because the
heap grows into the buffer pool. */
uint32_t len= uint32_t(rec_end - body);
- const uint32_t chunk_limit= static_cast<uint32_t>(RECV_DATA_BLOCK_SIZE);
- recv_t *recv= new (mem_heap_alloc(heap, sizeof(recv_t)))
- recv_t(len, type, lsn, end_lsn);
+ if (check_overflow(len))
+ {
+ DBUG_PRINT("ib_log", ("Ran out of memory and last stored lsn "
+ LSN_PF " last stored offset " ULINTPF "\n",
+ last_stored_lsn, last_stored_offset));
+ set_store_value(STORE_NO);
+ return;
+ }
+
+ last_stored_lsn= recovered_lsn;
+ last_stored_offset= recovered_offset;
+ uint32_t free_limit= get_free_len();
+ bool new_block= false;
+
+ /* Free space of the block is not sufficient. So create a
+ new block for storing redo logs */
+ if (free_limit < sizeof(recv_t) + sizeof(recv_t::data) + 1)
+ new_block= true;
+
+ recv_t* recv= new (get_mem_block(sizeof(recv_t), false, new_block))
+ recv_t(len, type, lsn, end_lsn);
recs.log.append(recv);
- for (recv_t::data_t *prev= NULL;;) {
- const uint32_t l= std::min(len, chunk_limit);
- recv_t::data_t *d= new (mem_heap_alloc(heap, sizeof(recv_t::data_t) + l))
- recv_t::data_t(body, l);
+ for (recv_t::data_t *prev= NULL;;)
+ {
+ uint32_t data_free_limit= get_free_len() - sizeof(recv_t::data);
+ const uint32_t l= std::min(len, data_free_limit);
+ recv_t::data_t *d= new (get_mem_block(
+ sizeof(recv_t::data) + l, true)) recv_t::data_t(body, l);
if (prev)
prev->append(d);
else
@@ -1824,26 +1921,44 @@ inline void page_recv_t::will_not_read()
log.clear();
}
+buf_block_t* recv_sys_t::get_block(page_t* page)
+{
+ for (buf_block_t* block= UT_LIST_GET_LAST(redo_list);
+ block != NULL; block= UT_LIST_GET_PREV(unzip_LRU, block))
+ {
+ if (block->frame == page)
+ return block;
+ }
+ return NULL;
+}
-/*********************************************************************//**
-Copies the log record body from recv to buf. */
+/** Copies the log record body from recv to buf.
+@param[in] buf buffer of length at least recv->len
+@param[in] recv log record */
static ATTRIBUTE_COLD
void
recv_data_copy_to_buf(
-/*==================*/
- byte* buf, /*!< in: buffer of length at least recv->len */
- const recv_t& recv) /*!< in: log record */
+ byte* buf,
+ const recv_t& recv)
{
const recv_t::data_t* recv_data = recv.data;
+ page_t* initial_page = page_align(recv_data);
ulint len = recv.len;
- const ulint chunk_limit = static_cast<ulint>(RECV_DATA_BLOCK_SIZE);
-
do {
+ ulint offset = page_offset(recv_data + 1);
+ buf_block_t* block = recv_sys.get_block(
+ page_align(recv_data));
+ const ulint chunk_limit = (srv_page_size - offset);
+
const ulint l = std::min(len, chunk_limit);
+
memcpy(buf, reinterpret_cast<const byte*>(recv_data + 1), l);
recv_data = recv_data->next;
buf += l;
len -= l;
+
+ if (initial_page != block->frame)
+ block->page.unfix();
} while (len);
}
@@ -1886,7 +2001,6 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
lsn_t start_lsn = 0, end_lsn = 0;
ut_d(lsn_t recv_start_lsn = 0);
const lsn_t init_lsn = init ? init->lsn : 0;
- const ulint chunk_limit = static_cast<ulint>(RECV_DATA_BLOCK_SIZE);
for (const log_rec_t* l : p->second.log) {
const recv_t* recv = static_cast<const recv_t*>(l);
@@ -1930,10 +2044,13 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
<< " len " << recv->len
<< " page " << block->page.id);
- byte* buf;
+ ulint data_offset= page_offset(recv->data+ 1);
+ byte* buf= NULL;
const byte* recs;
+ buf_block_t* first_block = recv_sys.get_block(
+ page_align(recv->data + 1));
- if (UNIV_UNLIKELY(recv->len > chunk_limit)) {
+ if (srv_page_size - data_offset < recv->len) {
/* We have to copy the record body to
a separate buffer */
recs = buf = static_cast<byte*>
@@ -1961,6 +2078,7 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
end_lsn);
}
+ first_block->page.unfix();
ut_free(buf);
}
}
@@ -2099,6 +2217,7 @@ void recv_apply_hashed_log_recs(bool last_batch)
while (recv_sys.apply_batch_on) {
bool abort = recv_sys.found_corrupt_log;
+ recv_sys.remove_free_blocks();
mutex_exit(&recv_sys.mutex);
if (abort) {
@@ -2512,11 +2631,10 @@ recv_mlog_index_load(ulint space_id, ulint page_no, lsn_t lsn)
/** Parse log records from a buffer and optionally store them to a
hash table to wait merging to file pages.
@param[in] checkpoint_lsn the LSN of the latest checkpoint
-@param[in] store whether to store page operations
@param[in] apply whether to apply the records
@return whether MLOG_CHECKPOINT record was seen the first time,
or corruption was noticed */
-bool recv_parse_log_recs(lsn_t checkpoint_lsn, store_t store, bool apply)
+bool recv_parse_log_recs(lsn_t checkpoint_lsn, bool apply)
{
bool single_rec;
ulint len;
@@ -2629,20 +2747,14 @@ loop:
}
break;
default:
- switch (store) {
- case STORE_NO:
+ if (recv_sys.get_store_value() == STORE_IF_EXISTS
+ && !fil_space_get_size(space)) {
break;
- case STORE_IF_EXISTS:
- if (!fil_space_get_size(space)) {
- break;
- }
- /* fall through */
- case STORE_YES:
- recv_sys.add(
- type, page_id_t(space, page_no), body,
- ptr + len, old_lsn,
- recv_sys.recovered_lsn);
}
+
+ recv_sys.add(type, page_id_t(space, page_no), body,
+ ptr + len, old_lsn,
+ recv_sys.recovered_lsn);
/* fall through */
case MLOG_INDEX_LOAD:
if (type == MLOG_INDEX_LOAD) {
@@ -2805,22 +2917,14 @@ corrupted_log:
recv_parse_or_apply_log_rec_body(). */
break;
default:
- switch (store) {
- case STORE_NO:
- break;
- case STORE_IF_EXISTS:
- if (!fil_space_get_size(space)) {
- break;
- }
- /* fall through */
- case STORE_YES:
- recv_sys.add(
- type,
- page_id_t(space, page_no),
- body, ptr + len,
- old_lsn,
- new_recovered_lsn);
- }
+ if (recv_sys.get_store_value()
+ == STORE_IF_EXISTS
+ && !fil_space_get_size(space))
+ break;
+
+ recv_sys.add(type, page_id_t(space, page_no),
+ body, ptr + len, old_lsn,
+ new_recovered_lsn);
}
ptr += len;
@@ -2911,26 +3015,20 @@ void recv_sys_justify_left_parsing_buf()
/** Scan redo log from a buffer and stores new log data to the parsing buffer.
Parse and hash the log records if new data found.
Apply log records automatically when the hash table becomes full.
+@param[in] log_block log segment
+@param[in] checkpoint_lsn latest checkpoint LSN
+@param[in] start_lsn buffer start LSN
+@param[in] end_lsn buffer end LSN
+@param[in,out] contiguous_lsn it is known that all log groups contain
+ contiguous log data up to this lsn
+@param[in] last_phase last phase of redo log apply
+@param[out] group_scanned_lsn scanning succeeded up to this lsn
@return true if not able to scan any more in this log group */
-static
-bool
-recv_scan_log_recs(
-/*===============*/
- ulint available_memory,/*!< in: we let the hash table of recs
- to grow to this size, at the maximum */
- store_t* store_to_hash, /*!< in,out: whether the records should be
- stored to the hash table; this is reset
- if just debug checking is needed, or
- when the available_memory runs out */
- const byte* log_block, /*!< in: log segment */
- lsn_t checkpoint_lsn, /*!< in: latest checkpoint LSN */
- lsn_t start_lsn, /*!< in: buffer start LSN */
- lsn_t end_lsn, /*!< in: buffer end LSN */
- lsn_t* contiguous_lsn, /*!< in/out: it is known that all log
- groups contain contiguous log data up
- to this lsn */
- lsn_t* group_scanned_lsn)/*!< out: scanning succeeded up to
- this lsn */
+static bool
+recv_scan_log_recs(const byte* log_block, lsn_t checkpoint_lsn,
+ lsn_t start_lsn, lsn_t end_lsn,
+ lsn_t* contiguous_lsn, bool last_phase,
+ lsn_t* group_scanned_lsn)
{
lsn_t scanned_lsn = start_lsn;
bool finished = false;
@@ -3072,8 +3170,7 @@ recv_scan_log_recs(
if (more_data && !recv_sys.found_corrupt_log) {
/* Try to parse more log records */
- if (recv_parse_log_recs(checkpoint_lsn,
- *store_to_hash, apply)) {
+ if (recv_parse_log_recs(checkpoint_lsn, apply)) {
ut_ad(recv_sys.found_corrupt_log
|| recv_sys.found_corrupt_fs
|| recv_sys.mlog_checkpoint_lsn
@@ -3082,19 +3179,10 @@ recv_scan_log_recs(
goto func_exit;
}
- if (*store_to_hash != STORE_NO
- && mem_heap_get_size(recv_sys.heap) > available_memory) {
-
- DBUG_PRINT("ib_log", ("Ran out of memory and last "
- "stored lsn " LSN_PF,
- recv_sys.recovered_lsn));
-
- recv_sys.last_stored_lsn = recv_sys.recovered_lsn;
- *store_to_hash = STORE_NO;
- }
-
if (recv_sys.recovered_offset > recv_parsing_buf_size / 4) {
/* Move parsing buffer data to the buffer start */
+ if (last_phase && recv_sys.get_store_value() == STORE_NO)
+ recv_sys.recovered_offset= recv_sys.last_stored_offset;
recv_sys_justify_left_parsing_buf();
}
@@ -3139,21 +3227,24 @@ recv_group_scan_log_recs(
ut_ad(last_phase || !recv_writer_thread_active);
mutex_exit(&recv_sys.mutex);
+ recv_sys.assign_store_value(last_phase);
lsn_t start_lsn;
lsn_t end_lsn;
- store_t store_to_hash = recv_sys.mlog_checkpoint_lsn == 0
- ? STORE_NO : (last_phase ? STORE_IF_EXISTS : STORE_YES);
- ulint available_mem = srv_page_size
- * (buf_pool_get_n_pages()
- - (recv_n_pool_free_frames * srv_buf_pool_instances));
-
log_sys.log.scanned_lsn = end_lsn = *contiguous_lsn =
ut_uint64_align_down(*contiguous_lsn, OS_FILE_LOG_BLOCK_SIZE);
do {
- if (last_phase && store_to_hash == STORE_NO) {
- store_to_hash = STORE_IF_EXISTS;
+ if (last_phase && recv_sys.get_store_value() == STORE_NO) {
+ recv_sys.set_store_value(STORE_IF_EXISTS);
+
recv_apply_hashed_log_recs(false);
+ /* Ran out of memory during last phase.
+ So re-read the redo log records from last stored
+ offset in recv_sys.buf */
+ if (recv_sys.recovered_offset)
+ recv_sys.recovered_offset= recv_sys.last_stored_offset;
+
+ recv_sys.recovered_lsn= recv_sys.last_stored_lsn;
}
start_lsn = ut_uint64_align_down(end_lsn,
@@ -3162,10 +3253,9 @@ recv_group_scan_log_recs(
log_sys.log.read_log_seg(&end_lsn, start_lsn + RECV_SCAN_SIZE);
} while (end_lsn != start_lsn
&& !recv_scan_log_recs(
- available_mem, &store_to_hash, log_sys.buf,
- checkpoint_lsn,
- start_lsn, end_lsn,
- contiguous_lsn, &log_sys.log.scanned_lsn));
+ log_sys.buf, checkpoint_lsn,
+ start_lsn, end_lsn, contiguous_lsn,
+ last_phase, &log_sys.log.scanned_lsn));
if (recv_sys.found_corrupt_log || recv_sys.found_corrupt_fs) {
DBUG_RETURN(false);
@@ -3175,7 +3265,7 @@ recv_group_scan_log_recs(
last_phase ? "rescan" : "scan",
log_sys.log.scanned_lsn));
- DBUG_RETURN(store_to_hash == STORE_NO);
+ DBUG_RETURN(recv_sys.get_store_value() == STORE_NO);
}
/** Report a missing tablespace for which page-redo log exists.