summary refs log tree commit diff
diff options
context:
space:
mode:
author Marko Mäkelä <marko.makela@mariadb.com> 2023-05-11 14:29:56 +0300
committer Marko Mäkelä <marko.makela@mariadb.com> 2023-05-11 14:29:56 +0300
commit e9a6dc7ae59dec89bdbb1d60bcbdc76bc3d583c8 (patch)
tree 589c27a25b5d1be526469bc9fbea0070a5740d40
parent 717e3b3cfdb167e8b930323397dc6e852ef94f17 (diff)
parent 1c6792d19e48381a12cc3b34598949b4159843a3 (diff)
download mariadb-git-bb-10.9-MDEV-29911.tar.gz
-rw-r--r-- extra/mariabackup/xtrabackup.cc 4
-rw-r--r-- storage/innobase/buf/buf0dblwr.cc 2
-rw-r--r-- storage/innobase/buf/buf0flu.cc 1
-rw-r--r-- storage/innobase/buf/buf0lru.cc 9
-rw-r--r-- storage/innobase/buf/buf0rea.cc 83
-rw-r--r-- storage/innobase/fil/fil0fil.cc 68
-rw-r--r-- storage/innobase/include/buf0buf.h 3
-rw-r--r-- storage/innobase/include/buf0rea.h 11
-rw-r--r-- storage/innobase/include/log0recv.h 211
-rw-r--r-- storage/innobase/include/os0file.h 9
-rw-r--r-- storage/innobase/log/log0recv.cc 1754
-rw-r--r-- storage/innobase/os/os0file.cc 90
12 files changed, 1325 insertions, 920 deletions
diff --git a/extra/mariabackup/xtrabackup.cc b/extra/mariabackup/xtrabackup.cc
index 6f42a9be05a..180616f37e9 100644
--- a/extra/mariabackup/xtrabackup.cc
+++ b/extra/mariabackup/xtrabackup.cc
@@ -3146,7 +3146,7 @@ static bool xtrabackup_copy_logfile()
if (log_sys.buf[recv_sys.offset] <= 1)
break;
- if (recv_sys.parse_mtr(STORE_NO) == recv_sys_t::OK)
+ if (recv_sys.parse_mtr<false>(false) == recv_sys_t::OK)
{
do
{
@@ -3156,7 +3156,7 @@ static bool xtrabackup_copy_logfile()
sequence_offset));
*seq= 1;
}
- while ((r= recv_sys.parse_mtr(STORE_NO)) == recv_sys_t::OK);
+ while ((r= recv_sys.parse_mtr<false>(false)) == recv_sys_t::OK);
if (ds_write(dst_log_file, log_sys.buf + start_offset,
recv_sys.offset - start_offset))
diff --git a/storage/innobase/buf/buf0dblwr.cc b/storage/innobase/buf/buf0dblwr.cc
index 1260145ed1c..510ad02256d 100644
--- a/storage/innobase/buf/buf0dblwr.cc
+++ b/storage/innobase/buf/buf0dblwr.cc
@@ -372,7 +372,7 @@ void buf_dblwr_t::recover()
const uint32_t space_id= page_get_space_id(page);
const page_id_t page_id(space_id, page_no);
- if (recv_sys.lsn < lsn)
+ if (recv_sys.scanned_lsn < lsn)
{
ib::info() << "Ignoring a doublewrite copy of page " << page_id
<< " with future log sequence number " << lsn;
diff --git a/storage/innobase/buf/buf0flu.cc b/storage/innobase/buf/buf0flu.cc
index acfef5daad2..d7622bcd0c3 100644
--- a/storage/innobase/buf/buf0flu.cc
+++ b/storage/innobase/buf/buf0flu.cc
@@ -2583,6 +2583,7 @@ ATTRIBUTE_COLD void buf_flush_page_cleaner_init()
/** Flush the buffer pool on shutdown. */
ATTRIBUTE_COLD void buf_flush_buffer_pool()
{
+ ut_ad(!os_aio_pending_reads());
ut_ad(!buf_page_cleaner_is_active);
ut_ad(!buf_flush_sync_lsn);
diff --git a/storage/innobase/buf/buf0lru.cc b/storage/innobase/buf/buf0lru.cc
index 724aa641f12..8a25e9c5266 100644
--- a/storage/innobase/buf/buf0lru.cc
+++ b/storage/innobase/buf/buf0lru.cc
@@ -1093,7 +1093,11 @@ static bool buf_LRU_block_remove_hashed(buf_page_t *bpage, const page_id_t id,
ut_a(!zip || !bpage->oldest_modification());
ut_ad(bpage->zip_size());
-
+ /* Skip consistency checks if the page was freed.
+ In recovery, we could get a sole FREE_PAGE record
+ and nothing else, for a ROW_FORMAT=COMPRESSED page.
+ Its contents would be garbage. */
+ if (!bpage->is_freed())
switch (fil_page_get_type(page)) {
case FIL_PAGE_TYPE_ALLOCATED:
case FIL_PAGE_INODE:
@@ -1224,6 +1228,7 @@ void buf_pool_t::corrupted_evict(buf_page_t *bpage, uint32_t state)
buf_pool_t::hash_chain &chain= buf_pool.page_hash.cell_get(id.fold());
page_hash_latch &hash_lock= buf_pool.page_hash.lock_get(chain);
+ recv_sys.free_corrupted_page(id);
mysql_mutex_lock(&mutex);
hash_lock.lock();
@@ -1248,8 +1253,6 @@ void buf_pool_t::corrupted_evict(buf_page_t *bpage, uint32_t state)
buf_LRU_block_free_hashed_page(reinterpret_cast<buf_block_t*>(bpage));
mysql_mutex_unlock(&mutex);
-
- recv_sys.free_corrupted_page(id);
}
/** Update buf_pool.LRU_old_ratio.
diff --git a/storage/innobase/buf/buf0rea.cc b/storage/innobase/buf/buf0rea.cc
index 3d7b1f29fac..f2eac1b0f95 100644
--- a/storage/innobase/buf/buf0rea.cc
+++ b/storage/innobase/buf/buf0rea.cc
@@ -658,60 +658,35 @@ failed:
return count;
}
-/** @return whether a page has been freed */
-inline bool fil_space_t::is_freed(uint32_t page)
+/** Schedule a page for recovery.
+@param space tablespace
+@param page_id page identifier
+@param recs log records
+@param init page initialization, or nullptr if the page needs to be read */
+void buf_read_recover(fil_space_t *space, const page_id_t page_id,
+ page_recv_t &recs, recv_init *init)
{
- std::lock_guard<std::mutex> freed_lock(freed_range_mutex);
- return freed_ranges.contains(page);
-}
-
-/** Issues read requests for pages which recovery wants to read in.
-@param space_id tablespace identifier
-@param page_nos page numbers to read, in ascending order */
-void buf_read_recv_pages(uint32_t space_id, st_::span<uint32_t> page_nos)
-{
- fil_space_t* space = fil_space_t::get(space_id);
-
- if (!space) {
- /* The tablespace is missing or unreadable: do nothing */
- return;
- }
-
- const ulint zip_size = space->zip_size();
-
- for (ulint i = 0; i < page_nos.size(); i++) {
-
- /* Ignore if the page already present in freed ranges. */
- if (space->is_freed(page_nos[i])) {
- continue;
- }
-
- const page_id_t cur_page_id(space_id, page_nos[i]);
-
- ulint limit = 0;
- for (ulint j = 0; j < buf_pool.n_chunks; j++) {
- limit += buf_pool.chunks[j].size / 2;
- }
+ ut_ad(space->id == page_id.space());
+ space->reacquire();
+ const ulint zip_size= space->zip_size();
- if (os_aio_pending_reads() >= limit) {
- os_aio_wait_until_no_pending_reads(false);
- }
-
- space->reacquire();
- switch (buf_read_page_low(space, false, BUF_READ_ANY_PAGE,
- cur_page_id, zip_size, true)) {
- case DB_SUCCESS: case DB_SUCCESS_LOCKED_REC:
- break;
- default:
- sql_print_error("InnoDB: Recovery failed to read page "
- UINT32PF " from %s",
- cur_page_id.page_no(),
- space->chain.start->name);
- }
- }
-
-
- DBUG_PRINT("ib_buf", ("recovery read (%zu pages) for %s",
- page_nos.size(), space->chain.start->name));
- space->release();
+ if (init)
+ {
+ if (buf_page_t *bpage= buf_page_init_for_read(BUF_READ_ANY_PAGE, page_id,
+ zip_size, true))
+ {
+ ut_ad(bpage->in_file());
+ os_fake_read(IORequest{bpage, (buf_tmp_buffer_t*) &recs,
+ UT_LIST_GET_FIRST(space->chain),
+ IORequest::READ_ASYNC}, ptrdiff_t(init));
+ }
+ }
+ else if (dberr_t err= buf_read_page_low(space, false, BUF_READ_ANY_PAGE,
+ page_id, zip_size, true))
+ {
+ if (err != DB_SUCCESS_LOCKED_REC)
+ sql_print_error("InnoDB: Recovery failed to read page "
+ UINT32PF " from %s",
+ page_id.page_no(), space->chain.start->name);
+ }
}
diff --git a/storage/innobase/fil/fil0fil.cc b/storage/innobase/fil/fil0fil.cc
index be313140225..e4b352a05aa 100644
--- a/storage/innobase/fil/fil0fil.cc
+++ b/storage/innobase/fil/fil0fil.cc
@@ -2775,53 +2775,55 @@ func_exit:
#include <tpool.h>
-/** Callback for AIO completion */
-void fil_aio_callback(const IORequest &request)
+void IORequest::write_complete() const
{
ut_ad(fil_validate_skip());
- ut_ad(request.node);
+ ut_ad(node);
+ ut_ad(is_write());
- if (!request.bpage)
+ if (!bpage)
{
ut_ad(!srv_read_only_mode);
- if (request.type == IORequest::DBLWR_BATCH)
- buf_dblwr.flush_buffered_writes_completed(request);
+ if (type == IORequest::DBLWR_BATCH)
+ buf_dblwr.flush_buffered_writes_completed(*this);
else
- ut_ad(request.type == IORequest::WRITE_ASYNC);
-write_completed:
- request.node->complete_write();
- }
- else if (request.is_write())
- {
- buf_page_write_complete(request);
- goto write_completed;
+ ut_ad(type == IORequest::WRITE_ASYNC);
}
else
- {
- ut_ad(request.is_read());
+ buf_page_write_complete(*this);
- /* IMPORTANT: since i/o handling for reads will read also the insert
- buffer in fil_system.sys_space, we have to be very careful not to
- introduce deadlocks. We never close fil_system.sys_space data
- files and never issue asynchronous reads of change buffer pages. */
- const page_id_t id(request.bpage->id());
+ node->complete_write();
+ node->space->release();
+}
- if (dberr_t err= request.bpage->read_complete(*request.node))
- {
- if (recv_recovery_is_on() && !srv_force_recovery)
- {
- mysql_mutex_lock(&recv_sys.mutex);
- recv_sys.set_corrupt_fs();
- mysql_mutex_unlock(&recv_sys.mutex);
- }
+void IORequest::read_complete() const
+{
+ ut_ad(fil_validate_skip());
+ ut_ad(node);
+ ut_ad(is_read());
+ ut_ad(bpage);
+
+ /* IMPORTANT: since i/o handling for reads will read also the insert
+ buffer in fil_system.sys_space, we have to be very careful not to
+ introduce deadlocks. We never close fil_system.sys_space data files
+ and never issue asynchronous reads of change buffer pages. */
+ const page_id_t id(bpage->id());
- if (err != DB_FAIL)
- ib::error() << "Failed to read page " << id.page_no()
- << " from file '" << request.node->name << "': " << err;
+ if (dberr_t err= bpage->read_complete(*node))
+ {
+ if (recv_recovery_is_on() && !srv_force_recovery)
+ {
+ mysql_mutex_lock(&recv_sys.mutex);
+ recv_sys.set_corrupt_fs();
+ mysql_mutex_unlock(&recv_sys.mutex);
}
+
+ if (err != DB_FAIL)
+ ib::error() << "Failed to read page " << id.page_no()
+ << " from file '" << node->name << "': " << err;
}
- request.node->space->release();
+ node->space->release();
}
/** Flush to disk the writes in file spaces of the given type
diff --git a/storage/innobase/include/buf0buf.h b/storage/innobase/include/buf0buf.h
index 4295c3ba342..6d3ec65b1d3 100644
--- a/storage/innobase/include/buf0buf.h
+++ b/storage/innobase/include/buf0buf.h
@@ -75,8 +75,7 @@ struct buf_pool_info_t
ulint flush_list_len; /*!< Length of buf_pool.flush_list */
ulint n_pend_unzip; /*!< buf_pool.n_pend_unzip, pages
pending decompress */
- ulint n_pend_reads; /*!< buf_pool.n_pend_reads, pages
- pending read */
+ ulint n_pend_reads; /*!< os_aio_pending_reads() */
ulint n_pending_flush_lru; /*!< Pages pending flush in LRU */
ulint n_pending_flush_list; /*!< Pages pending flush in FLUSH
LIST */
diff --git a/storage/innobase/include/buf0rea.h b/storage/innobase/include/buf0rea.h
index 4ec8938c689..3dd085dda5c 100644
--- a/storage/innobase/include/buf0rea.h
+++ b/storage/innobase/include/buf0rea.h
@@ -102,10 +102,13 @@ which could result in a deadlock if the OS does not support asynchronous io.
ulint
buf_read_ahead_linear(const page_id_t page_id, ulint zip_size, bool ibuf);
-/** Issue read requests for pages that need to be recovered.
-@param space_id tablespace identifier
-@param page_nos page numbers to read, in ascending order */
-void buf_read_recv_pages(uint32_t space_id, st_::span<uint32_t> page_nos);
+/** Schedule a page for recovery.
+@param space tablespace
+@param page_id page identifier
+@param recs log records
+@param init page initialization, or nullptr if the page needs to be read */
+void buf_read_recover(fil_space_t *space, const page_id_t page_id,
+ page_recv_t &recs, recv_init *init);
/** @name Modes used in read-ahead @{ */
/** read only pages belonging to the insert buffer tree */
diff --git a/storage/innobase/include/log0recv.h b/storage/innobase/include/log0recv.h
index e787d81e8c2..e642b501409 100644
--- a/storage/innobase/include/log0recv.h
+++ b/storage/innobase/include/log0recv.h
@@ -38,9 +38,9 @@ Created 9/20/1997 Heikki Tuuri
#define recv_recovery_is_on() UNIV_UNLIKELY(recv_sys.recovery_on)
ATTRIBUTE_COLD MY_ATTRIBUTE((nonnull, warn_unused_result))
-/** Apply any buffered redo log to a page that was just read from a data file.
-@param[in,out] space tablespace
-@param[in,out] bpage buffer pool page
+/** Apply any buffered redo log to a page.
+@param space tablespace
+@param bpage buffer pool page
@return whether the page was recovered correctly */
bool recv_recover_page(fil_space_t* space, buf_page_t* bpage);
@@ -49,17 +49,6 @@ of first system tablespace page
@return error code or DB_SUCCESS */
dberr_t recv_recovery_from_checkpoint_start();
-/** Whether to store redo log records in recv_sys.pages */
-enum store_t {
- /** Do not store redo log records. */
- STORE_NO,
- /** Store redo log records. */
- STORE_YES,
- /** Store redo log records if the tablespace exists. */
- STORE_IF_EXISTS
-};
-
-
/** Report an operation to create, delete, or rename a file during backup.
@param[in] space_id tablespace identifier
@param[in] type file operation redo log type
@@ -125,21 +114,15 @@ struct recv_dblwr_t
list pages;
};
-/** the recovery state and buffered records for a page */
+/** recv_sys.pages entry; protected by recv_sys.mutex */
struct page_recv_t
{
- /** Recovery state; protected by recv_sys.mutex */
- enum
- {
- /** not yet processed */
- RECV_NOT_PROCESSED,
- /** not processed; the page will be reinitialized */
- RECV_WILL_NOT_READ,
- /** page is being read */
- RECV_BEING_READ,
- /** log records are being applied on the page */
- RECV_BEING_PROCESSED
- } state= RECV_NOT_PROCESSED;
+ /** Recovery status: 0=not in progress, 1=log is being applied,
+ -1=log has been applied and the entry may be erased.
+ Transitions from 1 to -1 are NOT protected by recv_sys.mutex. */
+ Atomic_relaxed<int8_t> being_processed{0};
+ /** Whether reading the page will be skipped */
+ bool skip_read= false;
/** Latest written byte offset when applying the log records.
@see mtr_t::m_last_offset */
uint16_t last_offset= 1;
@@ -162,6 +145,9 @@ struct page_recv_t
head= recs;
tail= recs;
}
+ /** Remove the last records for the page
+ @param start_lsn start of the removed log */
+ ATTRIBUTE_COLD void rewind(lsn_t start_lsn);
/** @return the last log snippet */
const log_rec_t* last() const { return tail; }
@@ -180,8 +166,8 @@ struct page_recv_t
iterator begin() { return head; }
iterator end() { return NULL; }
bool empty() const { ut_ad(!head == !tail); return !head; }
- /** Clear and free the records; @see recv_sys_t::alloc() */
- inline void clear();
+ /** Clear and free the records; @see recv_sys_t::add() */
+ void clear();
} log;
/** Trim old log records for a page.
@@ -190,21 +176,27 @@ struct page_recv_t
inline bool trim(lsn_t start_lsn);
/** Ignore any earlier redo log records for this page. */
inline void will_not_read();
- /** @return whether the log records for the page are being processed */
- bool is_being_processed() const { return state == RECV_BEING_PROCESSED; }
+};
+
+/** A page initialization operation that was parsed from the redo log */
+struct recv_init
+{
+ /** log sequence number of the page initialization */
+ lsn_t lsn;
+ /** Whether btr_page_create() avoided a read of the page.
+ At the end of the last recovery batch, mark_ibuf_exist()
+ will mark pages for which this flag is set. */
+ bool created;
};
/** Recovery system data structure */
struct recv_sys_t
{
- /** mutex protecting apply_log_recs and page_recv_t::state */
- mysql_mutex_t mutex;
+ using init= recv_init;
+
+ /** mutex protecting this as well as some of page_recv_t */
+ alignas(CPU_LEVEL1_DCACHE_LINESIZE) mysql_mutex_t mutex;
private:
- /** condition variable for
- !apply_batch_on || pages.empty() || found_corrupt_log || found_corrupt_fs */
- pthread_cond_t cond;
- /** whether recv_apply_hashed_log_recs() is running */
- bool apply_batch_on;
/** set when finding a corrupt log block or record, or there is a
log parsing buffer overflow */
bool found_corrupt_log;
@@ -226,6 +218,8 @@ public:
size_t offset;
/** log sequence number of the first non-parsed record */
lsn_t lsn;
+ /** log sequence number of the last parsed mini-transaction */
+ lsn_t scanned_lsn;
/** log sequence number at the end of the FILE_CHECKPOINT record, or 0 */
lsn_t file_checkpoint;
/** the time when progress was last reported */
@@ -238,6 +232,9 @@ public:
map pages;
private:
+ /** iterator to pages, used by parse() */
+ map::iterator pages_it;
+
/** Process a record that indicates that a tablespace size is being shrunk.
@param page_id first page that is not in the file
@param lsn log sequence number of the shrink operation */
@@ -257,30 +254,42 @@ public:
/** The contents of the doublewrite buffer */
recv_dblwr_t dblwr;
- /** Last added LSN to pages, before switching to STORE_NO */
- lsn_t last_stored_lsn= 0;
-
inline void read(os_offset_t offset, span<byte> buf);
inline size_t files_size();
void close_files() { files.clear(); files.shrink_to_fit(); }
+ /** Advance pages_it if it matches the iterator */
+ void pages_it_invalidate(const map::iterator &p)
+ {
+ mysql_mutex_assert_owner(&mutex);
+ if (pages_it == p)
+ pages_it++;
+ }
+ /** Invalidate pages_it if it points to the given tablespace */
+ void pages_it_invalidate(uint32_t space_id)
+ {
+ mysql_mutex_assert_owner(&mutex);
+ if (pages_it != pages.end() && pages_it->first.space() == space_id)
+ pages_it= pages.end();
+ }
+
private:
/** Attempt to initialize a page based on redo log records.
- @param page_id page identifier
- @param p iterator pointing to page_id
+ @param p iterator
@param mtr mini-transaction
@param b pre-allocated buffer pool block
+ @param init page initialization
@return the recovered block
@retval nullptr if the page cannot be initialized based on log records
@retval -1 if the page cannot be recovered due to corruption */
- inline buf_block_t *recover_low(const page_id_t page_id, map::iterator &p,
- mtr_t &mtr, buf_block_t *b);
+ inline buf_block_t *recover_low(const map::iterator &p, mtr_t &mtr,
+ buf_block_t *b, init &init);
/** Attempt to initialize a page based on redo log records.
@param page_id page identifier
@return the recovered block
@retval nullptr if the page cannot be initialized based on log records
@retval -1 if the page cannot be recovered due to corruption */
- buf_block_t *recover_low(const page_id_t page_id);
+ ATTRIBUTE_COLD buf_block_t *recover_low(const page_id_t page_id);
/** All found log files (multiple ones are possible if we are upgrading
from before MariaDB Server 10.5.1) */
@@ -289,10 +298,27 @@ private:
/** Base node of the redo block list.
List elements are linked via buf_block_t::unzip_LRU. */
UT_LIST_BASE_NODE_T(buf_block_t) blocks;
+
+ /** Allocate a block from the buffer pool for recv_sys.pages */
+ ATTRIBUTE_COLD buf_block_t *add_block();
+
+ /** Wait for buffer pool to become available.
+ @param pages number of buffer pool pages needed */
+ ATTRIBUTE_COLD void wait_for_pool(size_t pages);
+
+ /** Free log for processed pages. */
+ void garbage_collect();
+
+ /** Apply a recovery batch.
+ @param space_id current tablespace identifier
+ @param space current tablespace
+ @param free_block spare buffer block
+ @param last_batch whether it is possible to write more redo log
+ @return whether the caller must provide a new free_block */
+ bool apply_batch(uint32_t space_id, fil_space_t *&space,
+ buf_block_t *&free_block, bool last_batch);
+
public:
- /** Check whether the number of read redo log blocks exceeds the maximum.
- @return whether the memory is exhausted */
- inline bool is_memory_exhausted();
/** Apply buffered log to persistent data pages.
@param last_batch whether it is possible to write more redo log */
void apply(bool last_batch);
@@ -310,7 +336,7 @@ public:
/** Clean up after create() */
void close();
- bool is_initialised() const { return last_stored_lsn != 0; }
+ bool is_initialised() const { return scanned_lsn != 0; }
/** Find the latest checkpoint.
@return error code or DB_SUCCESS */
@@ -321,60 +347,76 @@ public:
@param start_lsn start LSN of the mini-transaction
@param lsn @see mtr_t::commit_lsn()
@param l redo log snippet
- @param len length of l, in bytes */
- inline void add(map::iterator it, lsn_t start_lsn, lsn_t lsn,
- const byte *l, size_t len);
-
- enum parse_mtr_result { OK, PREMATURE_EOF, GOT_EOF };
+ @param len length of l, in bytes
+ @return whether we ran out of memory */
+ bool add(map::iterator it, lsn_t start_lsn, lsn_t lsn,
+ const byte *l, size_t len);
+
+ /** Parsing result */
+ enum parse_mtr_result {
+ /** a record was successfully parsed */
+ OK,
+ /** the log ended prematurely (need to read more) */
+ PREMATURE_EOF,
+ /** the end of the log was reached */
+ GOT_EOF,
+ /** parse<true>(l, false) ran out of memory */
+ GOT_OOM
+ };
private:
/** Parse and register one log_t::FORMAT_10_8 mini-transaction.
- @param store whether to store the records
- @param l log data source */
+ @tparam store whether to store the records
+ @param l log data source
+ @param if_exists if store: whether to check if the tablespace exists */
+ template<typename source,bool store>
+ inline parse_mtr_result parse(source &l, bool if_exists) noexcept;
+
+ /** Rewind a mini-transaction when parse() runs out of memory.
+ @param l log data source
+ @param begin start of the mini-transaction */
template<typename source>
- inline parse_mtr_result parse(store_t store, source& l) noexcept;
+ ATTRIBUTE_COLD void rewind(source &l, source &begin) noexcept;
+
+ /** Report progress in terms of LSN or pages remaining */
+ ATTRIBUTE_COLD void report_progress() const;
public:
/** Parse and register one log_t::FORMAT_10_8 mini-transaction,
handling log_sys.is_pmem() buffer wrap-around.
- @param store whether to store the records */
- static parse_mtr_result parse_mtr(store_t store) noexcept;
+ @tparam store whether to store the records
+ @param if_exists if store: whether to check if the tablespace exists */
+ template<bool store>
+ static parse_mtr_result parse_mtr(bool if_exists) noexcept;
/** Parse and register one log_t::FORMAT_10_8 mini-transaction,
handling log_sys.is_pmem() buffer wrap-around.
- @param store whether to store the records */
- static parse_mtr_result parse_pmem(store_t store) noexcept
+ @tparam store whether to store the records
+ @param if_exists if store: whether to check if the tablespace exists */
+ template<bool store>
+ static parse_mtr_result parse_pmem(bool if_exists) noexcept
#ifdef HAVE_PMEM
;
#else
- { return parse_mtr(store); }
+ { return parse_mtr<store>(if_exists); }
#endif
+ /** Erase log records for a page. */
+ void erase(map::iterator p);
+
/** Clear a fully processed set of stored redo log records. */
- inline void clear();
+ void clear();
/** Determine whether redo log recovery progress should be reported.
@param time the current time
@return whether progress should be reported
(the last report was at least 15 seconds ago) */
- bool report(time_t time)
- {
- if (time - progress_time < 15)
- return false;
-
- progress_time= time;
- return true;
- }
+ bool report(time_t time);
/** The alloc() memory alignment, in bytes */
static constexpr size_t ALIGNMENT= sizeof(size_t);
- /** Allocate memory for log_rec_t
- @param len allocation size, in bytes
- @return pointer to len bytes of memory (never NULL) */
- inline void *alloc(size_t len);
-
/** Free a redo log snippet.
- @param data buffer returned by alloc() */
+ @param data buffer allocated in add() */
inline void free(const void *data);
/** Remove records for a corrupted page.
@@ -386,8 +428,6 @@ public:
ATTRIBUTE_COLD void set_corrupt_fs();
/** Flag log file corruption during recovery. */
ATTRIBUTE_COLD void set_corrupt_log();
- /** Possibly finish a recovery batch. */
- inline void maybe_finish_batch();
/** @return whether data file corruption was found */
bool is_corrupt_fs() const { return UNIV_UNLIKELY(found_corrupt_fs); }
@@ -405,13 +445,14 @@ public:
}
/** Try to recover a tablespace that was not readable earlier
- @param p iterator, initially pointing to page_id_t{space_id,0};
- the records will be freed and the iterator advanced
+ @param p iterator
@param name tablespace file name
@param free_block spare buffer block
- @return whether recovery failed */
- bool recover_deferred(map::iterator &p, const std::string &name,
- buf_block_t *&free_block);
+ @return recovered tablespace
+ @retval nullptr if recovery failed */
+ fil_space_t *recover_deferred(const map::iterator &p,
+ const std::string &name,
+ buf_block_t *&free_block);
};
/** The recovery system */
diff --git a/storage/innobase/include/os0file.h b/storage/innobase/include/os0file.h
index 13f9d3de3f8..54f7ceeb4c0 100644
--- a/storage/innobase/include/os0file.h
+++ b/storage/innobase/include/os0file.h
@@ -212,6 +212,10 @@ public:
bool is_LRU() const { return (type & (WRITE_LRU ^ WRITE_ASYNC)) != 0; }
bool is_async() const { return (type & (READ_SYNC ^ READ_ASYNC)) != 0; }
+ void write_complete() const;
+ void read_complete() const;
+ void fake_read_complete(os_offset_t offset) const;
+
/** If requested, free storage space associated with a section of the file.
@param off byte offset from the start (SEEK_SET)
@param len size of the hole in bytes
@@ -1040,6 +1044,11 @@ int os_aio_init();
Frees the asynchronous io system. */
void os_aio_free();
+/** Submit a fake read request during crash recovery.
+@param type fake read request
+@param offset additional context */
+void os_fake_read(const IORequest &type, os_offset_t offset);
+
/** Request a read or write.
@param type I/O request
@param buf buffer
diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc
index 37a496725fc..4619786ee8d 100644
--- a/storage/innobase/log/log0recv.cc
+++ b/storage/innobase/log/log0recv.cc
@@ -738,7 +738,7 @@ static struct
{
retry:
log_sys.latch.wr_unlock();
- bool fail= false;
+ fil_space_t *space= fil_system.sys_space;
buf_block_t *free_block= buf_LRU_get_free_block(false);
log_sys.latch.wr_lock(SRW_LOCK_CALL);
mysql_mutex_lock(&recv_sys.mutex);
@@ -755,11 +755,12 @@ retry:
there were no buffered records. Either way, we must create a
dummy tablespace with the latest known name,
for dict_drop_index_tree(). */
+ recv_sys.pages_it_invalidate(space_id);
while (p != recv_sys.pages.end() && p->first.space() == space_id)
{
+ ut_ad(!p->second.being_processed);
recv_sys_t::map::iterator r= p++;
- r->second.log.clear();
- recv_sys.pages.erase(r);
+ recv_sys.erase(r);
}
recv_spaces_t::iterator it{recv_spaces.find(space_id)};
if (it != recv_spaces.end())
@@ -782,11 +783,14 @@ retry:
}
}
else
- fail= recv_sys.recover_deferred(p, d->second.file_name, free_block);
+ space= recv_sys.recover_deferred(p, d->second.file_name, free_block);
processed:
- defers.erase(d++);
- if (fail)
+ auto e= d++;
+ defers.erase(e);
+ if (!space)
break;
+ if (space != fil_system.sys_space)
+ space->release();
if (free_block)
continue;
mysql_mutex_unlock(&recv_sys.mutex);
@@ -797,7 +801,7 @@ processed:
mysql_mutex_unlock(&recv_sys.mutex);
if (free_block)
buf_pool.free_block(free_block);
- return fail;
+ return !space;
}
/** Create tablespace metadata for a data file that was initially
@@ -905,28 +909,191 @@ free_space:
}
deferred_spaces;
+/** Report an operation to create, delete, or rename a file during backup.
+@param[in] space_id tablespace identifier
+@param[in] type redo log type
+@param[in] name file name (not NUL-terminated)
+@param[in] len length of name, in bytes
+@param[in] new_name new file name (NULL if not rename)
+@param[in] new_len length of new_name, in bytes (0 if NULL) */
+void (*log_file_op)(uint32_t space_id, int type,
+ const byte* name, ulint len,
+ const byte* new_name, ulint new_len);
+
+void (*undo_space_trunc)(uint32_t space_id);
+
+void (*first_page_init)(uint32_t space_id);
+
+/** Information about initializing page contents during redo log processing.
+FIXME: Rely on recv_sys.pages! */
+class mlog_init_t
+{
+ using map= std::map<const page_id_t, recv_init,
+ std::less<const page_id_t>,
+ ut_allocator<std::pair<const page_id_t, recv_init>>>;
+ /** Map of page initialization operations.
+ FIXME: Merge this to recv_sys.pages! */
+ map inits;
+
+ /** Iterator to the last add() or will_avoid_read(), for speeding up
+ will_avoid_read(). */
+ map::iterator i;
+public:
+ /** Constructor */
+ mlog_init_t() : i(inits.end()) {}
+
+ /** Record that a page will be initialized by the redo log.
+ @param page_id page identifier
+ @param lsn log sequence number
+ @return whether the state was changed */
+ bool add(const page_id_t page_id, lsn_t lsn)
+ {
+ mysql_mutex_assert_owner(&recv_sys.mutex);
+ const recv_init init = { lsn, false };
+ std::pair<map::iterator, bool> p=
+ inits.insert(map::value_type(page_id, init));
+ ut_ad(!p.first->second.created);
+ if (p.second) return true;
+ if (p.first->second.lsn >= lsn) return false;
+ p.first->second = init;
+ i = p.first;
+ return true;
+ }
+
+ /** Get the last stored lsn of the page id and its respective
+ init/load operation.
+ @param page_id page identifier
+ @return the latest page initialization;
+ not valid after releasing recv_sys.mutex. */
+ recv_init &last(page_id_t page_id)
+ {
+ mysql_mutex_assert_owner(&recv_sys.mutex);
+ return inits.find(page_id)->second;
+ }
+
+ /** Determine if a page will be initialized or freed after a time.
+ @param page_id page identifier
+ @param lsn log sequence number
+ @return whether page_id will be freed or initialized after lsn */
+ bool will_avoid_read(page_id_t page_id, lsn_t lsn)
+ {
+ mysql_mutex_assert_owner(&recv_sys.mutex);
+ if (i != inits.end() && i->first == page_id)
+ return i->second.lsn > lsn;
+ i = inits.lower_bound(page_id);
+ return i != inits.end() && i->first == page_id && i->second.lsn > lsn;
+ }
+
+ /** At the end of each recovery batch, reset the 'created' flags. */
+ void reset()
+ {
+ mysql_mutex_assert_owner(&recv_sys.mutex);
+ ut_ad(recv_no_ibuf_operations);
+ for (map::value_type &i : inits)
+ i.second.created= false;
+ }
+
+ /** During the last recovery batch, mark whether there exist
+ buffered changes for the pages that were initialized
+ by buf_page_create() and still reside in the buffer pool. */
+ void mark_ibuf_exist()
+ {
+ mysql_mutex_assert_owner(&recv_sys.mutex);
+
+ for (const map::value_type &i : inits)
+ if (i.second.created)
+ {
+ auto &chain= buf_pool.page_hash.cell_get(i.first.fold());
+ page_hash_latch &hash_lock= buf_pool.page_hash.lock_get(chain);
+
+ hash_lock.lock_shared();
+ buf_block_t *block= reinterpret_cast<buf_block_t*>
+ (buf_pool.page_hash.get(i.first, chain));
+ bool got_latch= block && block->page.lock.x_lock_try();
+ hash_lock.unlock_shared();
+
+ if (!block)
+ continue;
+
+ uint32_t state;
+
+ if (!got_latch)
+ {
+ mysql_mutex_lock(&buf_pool.mutex);
+ block= reinterpret_cast<buf_block_t*>
+ (buf_pool.page_hash.get(i.first, chain));
+ if (!block)
+ {
+ mysql_mutex_unlock(&buf_pool.mutex);
+ continue;
+ }
+
+ state= block->page.fix();
+ mysql_mutex_unlock(&buf_pool.mutex);
+ if (state < buf_page_t::UNFIXED)
+ {
+ block->page.unfix();
+ continue;
+ }
+ block->page.lock.x_lock();
+ state= block->page.unfix();
+ ut_ad(state < buf_page_t::READ_FIX);
+ if (state >= buf_page_t::UNFIXED && block->page.id() == i.first)
+ goto check_ibuf;
+ }
+ else
+ {
+ state= block->page.state();
+ ut_ad(state >= buf_page_t::FREED);
+ ut_ad(state < buf_page_t::READ_FIX);
+
+ if (state >= buf_page_t::UNFIXED)
+ {
+ check_ibuf:
+ mysql_mutex_unlock(&recv_sys.mutex);
+ if (ibuf_page_exists(block->page.id(), block->zip_size()))
+ block->page.set_ibuf_exist();
+ mysql_mutex_lock(&recv_sys.mutex);
+ }
+ }
+
+ block->page.lock.x_unlock();
+ }
+ }
+
+ /** Clear the data structure */
+ void clear() { inits.clear(); i = inits.end(); }
+};
+
+static mlog_init_t mlog_init;
+
/** Try to recover a tablespace that was not readable earlier
-@param p iterator, initially pointing to page_id_t{space_id,0};
- the records will be freed and the iterator advanced
+@param p iterator to the page
@param name tablespace file name
@param free_block spare buffer block
-@return whether recovery failed */
-bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p,
- const std::string &name,
- buf_block_t *&free_block)
+@return recovered tablespace
+@retval nullptr if recovery failed */
+fil_space_t *recv_sys_t::recover_deferred(const recv_sys_t::map::iterator &p,
+ const std::string &name,
+ buf_block_t *&free_block)
{
mysql_mutex_assert_owner(&mutex);
- const page_id_t first{p->first};
- ut_ad(first.space());
+ ut_ad(p->first.space());
- recv_spaces_t::iterator it{recv_spaces.find(first.space())};
+ recv_spaces_t::iterator it{recv_spaces.find(p->first.space())};
ut_ad(it != recv_spaces.end());
- if (!first.page_no() && p->second.state == page_recv_t::RECV_WILL_NOT_READ)
+ if (!p->first.page_no() && p->second.skip_read)
{
mtr_t mtr;
- buf_block_t *block= recover_low(first, p, mtr, free_block);
+ ut_ad(!p->second.being_processed);
+ p->second.being_processed= 1;
+ init &init= mlog_init.last(p->first);
+ mysql_mutex_unlock(&mutex);
+ buf_block_t *block= recover_low(p, mtr, free_block, init);
+ mysql_mutex_lock(&mutex);
+ p->second.being_processed= -1;
ut_ad(block == free_block || block == reinterpret_cast<buf_block_t*>(-1));
free_block= nullptr;
if (UNIV_UNLIKELY(!block || block == reinterpret_cast<buf_block_t*>(-1)))
@@ -939,10 +1106,7 @@ bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p,
const uint32_t page_no= mach_read_from_4(page + FIL_PAGE_OFFSET);
const uint32_t size= fsp_header_get_field(page, FSP_SIZE);
- ut_ad(it != recv_spaces.end());
-
- if (page_id_t{space_id, page_no} == first && size >= 4 &&
- it != recv_spaces.end() &&
+ if (page_id_t{space_id, page_no} == p->first && size >= 4 &&
fil_space_t::is_valid_flags(flags, space_id) &&
fil_space_t::logical_size(flags) == srv_page_size)
{
@@ -996,10 +1160,10 @@ bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p,
}
size_set:
node->deferred= false;
- space->release();
it->second.space= space;
block->page.lock.x_unlock();
- return false;
+ p->second.being_processed= -1;
+ return space;
}
release_and_fail:
@@ -1007,179 +1171,34 @@ bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p,
}
fail:
- ib::error() << "Cannot apply log to " << first
+ ib::error() << "Cannot apply log to " << p->first
<< " of corrupted file '" << name << "'";
- return true;
+ return nullptr;
}
-/** Report an operation to create, delete, or rename a file during backup.
-@param[in] space_id tablespace identifier
-@param[in] type redo log type
-@param[in] name file name (not NUL-terminated)
-@param[in] len length of name, in bytes
-@param[in] new_name new file name (NULL if not rename)
-@param[in] new_len length of new_name, in bytes (0 if NULL) */
-void (*log_file_op)(uint32_t space_id, int type,
- const byte* name, ulint len,
- const byte* new_name, ulint new_len);
-
-void (*undo_space_trunc)(uint32_t space_id);
-
-void (*first_page_init)(uint32_t space_id);
-
-/** Information about initializing page contents during redo log processing.
-FIXME: Rely on recv_sys.pages! */
-class mlog_init_t
-{
-public:
- /** A page initialization operation that was parsed from
- the redo log */
- struct init {
- /** log sequence number of the page initialization */
- lsn_t lsn;
- /** Whether btr_page_create() avoided a read of the page.
-
- At the end of the last recovery batch, mark_ibuf_exist()
- will mark pages for which this flag is set. */
- bool created;
- };
-
-private:
- typedef std::map<const page_id_t, init,
- std::less<const page_id_t>,
- ut_allocator<std::pair<const page_id_t, init> > >
- map;
- /** Map of page initialization operations.
- FIXME: Merge this to recv_sys.pages! */
- map inits;
-public:
- /** Record that a page will be initialized by the redo log.
- @param[in] page_id page identifier
- @param[in] lsn log sequence number
- @return whether the state was changed */
- bool add(const page_id_t page_id, lsn_t lsn)
- {
- mysql_mutex_assert_owner(&recv_sys.mutex);
- const init init = { lsn, false };
- std::pair<map::iterator, bool> p = inits.insert(
- map::value_type(page_id, init));
- ut_ad(!p.first->second.created);
- if (p.second) return true;
- if (p.first->second.lsn >= init.lsn) return false;
- p.first->second = init;
- return true;
- }
-
- /** Get the last stored lsn of the page id and its respective
- init/load operation.
- @param[in] page_id page id
- @param[in,out] init initialize log or load log
- @return the latest page initialization;
- not valid after releasing recv_sys.mutex. */
- init& last(page_id_t page_id)
- {
- mysql_mutex_assert_owner(&recv_sys.mutex);
- return inits.find(page_id)->second;
- }
-
- /** Determine if a page will be initialized or freed after a time.
- @param page_id page identifier
- @param lsn log sequence number
- @return whether page_id will be freed or initialized after lsn */
- bool will_avoid_read(page_id_t page_id, lsn_t lsn) const
- {
- mysql_mutex_assert_owner(&recv_sys.mutex);
- auto i= inits.find(page_id);
- return i != inits.end() && i->second.lsn > lsn;
- }
-
- /** At the end of each recovery batch, reset the 'created' flags. */
- void reset()
- {
- mysql_mutex_assert_owner(&recv_sys.mutex);
- ut_ad(recv_no_ibuf_operations);
- for (map::value_type& i : inits) {
- i.second.created = false;
- }
- }
-
- /** On the last recovery batch, mark whether there exist
- buffered changes for the pages that were initialized
- by buf_page_create() and still reside in the buffer pool.
- @param[in,out] mtr dummy mini-transaction */
- void mark_ibuf_exist(mtr_t& mtr)
- {
- mysql_mutex_assert_owner(&recv_sys.mutex);
- mtr.start();
-
- for (const map::value_type& i : inits) {
- if (!i.second.created) {
- continue;
- }
- if (buf_block_t* block = buf_page_get_low(
- i.first, 0, RW_X_LATCH, nullptr,
- BUF_GET_IF_IN_POOL,
- &mtr, nullptr, false)) {
- if (UNIV_LIKELY_NULL(block->page.zip.data)) {
- switch (fil_page_get_type(
- block->page.zip.data)) {
- case FIL_PAGE_INDEX:
- case FIL_PAGE_RTREE:
- if (page_zip_decompress(
- &block->page.zip,
- block->page.frame,
- true)) {
- break;
- }
- ib::error() << "corrupted "
- << block->page.id();
- }
- }
- if (recv_no_ibuf_operations) {
- mtr.commit();
- mtr.start();
- continue;
- }
- mysql_mutex_unlock(&recv_sys.mutex);
- if (ibuf_page_exists(block->page.id(),
- block->zip_size())) {
- block->page.set_ibuf_exist();
- }
- mtr.commit();
- mtr.start();
- mysql_mutex_lock(&recv_sys.mutex);
- }
- }
-
- mtr.commit();
- clear();
- }
-
- /** Clear the data structure */
- void clear() { inits.clear(); }
-};
-
-static mlog_init_t mlog_init;
-
/** Process a record that indicates that a tablespace is
being shrunk in size.
@param page_id first page identifier that is not in the file
@param lsn log sequence number of the shrink operation */
inline void recv_sys_t::trim(const page_id_t page_id, lsn_t lsn)
{
- DBUG_ENTER("recv_sys_t::trim");
- DBUG_LOG("ib_log",
- "discarding log beyond end of tablespace "
- << page_id << " before LSN " << lsn);
- mysql_mutex_assert_owner(&mutex);
- for (recv_sys_t::map::iterator p = pages.lower_bound(page_id);
- p != pages.end() && p->first.space() == page_id.space();) {
- recv_sys_t::map::iterator r = p++;
- if (r->second.trim(lsn)) {
- pages.erase(r);
- }
- }
- DBUG_VOID_RETURN;
+ DBUG_ENTER("recv_sys_t::trim");
+ DBUG_LOG("ib_log", "discarding log beyond end of tablespace "
+ << page_id << " before LSN " << lsn);
+ mysql_mutex_assert_owner(&mutex);
+ /* The cached iterator may refer to an entry of this tablespace that
+ the loop below erases; forget it so that it cannot be used afterwards. */
+ if (pages_it != pages.end() && pages_it->first.space() == page_id.space())
+ pages_it= pages.end();
+ /* Visit every entry of the tablespace starting from page_id. The
+ iterator is advanced before a possible erase, so that it stays valid. */
+ for (recv_sys_t::map::iterator p = pages.lower_bound(page_id);
+ p != pages.end() && p->first.space() == page_id.space();)
+ {
+ recv_sys_t::map::iterator r = p++;
+ if (r->second.trim(lsn))
+ {
+ ut_ad(!r->second.being_processed);
+ pages.erase(r);
+ }
+ }
+ DBUG_VOID_RETURN;
}
inline void recv_sys_t::read(os_offset_t total_offset, span<byte> buf)
@@ -1202,15 +1221,10 @@ inline size_t recv_sys_t::files_size()
@param[in] space_id the tablespace ID
@param[in] ftype FILE_MODIFY, FILE_DELETE, or FILE_RENAME
@param[in] lsn lsn of the redo log
-@param[in] store whether the redo log has to be stored */
+@param[in] if_exists whether to check if the tablespace exists */
static void fil_name_process(const char *name, ulint len, uint32_t space_id,
- mfile_type_t ftype, lsn_t lsn, store_t store)
+ mfile_type_t ftype, lsn_t lsn, bool if_exists)
{
- if (srv_operation == SRV_OPERATION_BACKUP
- || srv_operation == SRV_OPERATION_BACKUP_NO_DEFER) {
- return;
- }
-
ut_ad(srv_operation <= SRV_OPERATION_EXPORT_RESTORED
|| srv_operation == SRV_OPERATION_RESTORE
|| srv_operation == SRV_OPERATION_RESTORE_EXPORT);
@@ -1321,7 +1335,7 @@ same_space:
case FIL_LOAD_DEFER:
/** Skip the deferred spaces
when lsn is already processed */
- if (store != store_t::STORE_IF_EXISTS) {
+ if (!if_exists) {
deferred_spaces.add(
space_id, fname.name.c_str(), lsn);
}
@@ -1364,9 +1378,8 @@ void recv_sys_t::close()
deferred_spaces.clear();
ut_d(mysql_mutex_unlock(&mutex));
- last_stored_lsn= 0;
+ scanned_lsn= 0;
mysql_mutex_destroy(&mutex);
- pthread_cond_destroy(&cond);
}
recv_spaces.clear();
@@ -1381,34 +1394,34 @@ void recv_sys_t::create()
ut_ad(this == &recv_sys);
ut_ad(!is_initialised());
mysql_mutex_init(recv_sys_mutex_key, &mutex, nullptr);
- pthread_cond_init(&cond, nullptr);
apply_log_recs = false;
- apply_batch_on = false;
len = 0;
offset = 0;
lsn = 0;
+ scanned_lsn = 1;
found_corrupt_log = false;
found_corrupt_fs = false;
file_checkpoint = 0;
progress_time = time(NULL);
+ ut_ad(pages.empty());
+ pages_it = pages.end();
recv_max_page_lsn = 0;
memset(truncated_undo_spaces, 0, sizeof truncated_undo_spaces);
- last_stored_lsn = 1;
UT_LIST_INIT(blocks, &buf_block_t::unzip_LRU);
}
/** Clear a fully processed set of stored redo log records. */
-inline void recv_sys_t::clear()
+void recv_sys_t::clear()
{
mysql_mutex_assert_owner(&mutex);
apply_log_recs= false;
- apply_batch_on= false;
ut_ad(!after_apply || found_corrupt_fs || !UT_LIST_GET_LAST(blocks));
pages.clear();
+ pages_it= pages.end();
for (buf_block_t *block= UT_LIST_GET_LAST(blocks); block; )
{
@@ -1419,8 +1432,6 @@ inline void recv_sys_t::clear()
buf_block_free(block);
block= prev_block;
}
-
- pthread_cond_broadcast(&cond);
}
/** Free most recovery data structures. */
@@ -1432,52 +1443,14 @@ void recv_sys_t::debug_free()
recovery_on= false;
pages.clear();
+ pages_it= pages.end();
mysql_mutex_unlock(&mutex);
}
-inline void *recv_sys_t::alloc(size_t len)
-{
- mysql_mutex_assert_owner(&mutex);
- ut_ad(len);
- ut_ad(len <= srv_page_size);
-
- buf_block_t *block= UT_LIST_GET_FIRST(blocks);
- if (UNIV_UNLIKELY(!block))
- {
-create_block:
- block= buf_block_alloc();
- block->page.access_time= 1U << 16 |
- ut_calc_align<uint16_t>(static_cast<uint16_t>(len), ALIGNMENT);
- static_assert(ut_is_2pow(ALIGNMENT), "ALIGNMENT must be a power of 2");
- UT_LIST_ADD_FIRST(blocks, block);
- MEM_MAKE_ADDRESSABLE(block->page.frame, len);
- MEM_NOACCESS(block->page.frame + len, srv_page_size - len);
- return my_assume_aligned<ALIGNMENT>(block->page.frame);
- }
-
- size_t free_offset= static_cast<uint16_t>(block->page.access_time);
- ut_ad(!ut_2pow_remainder(free_offset, ALIGNMENT));
- if (UNIV_UNLIKELY(!free_offset))
- {
- ut_ad(srv_page_size == 65536);
- goto create_block;
- }
- ut_ad(free_offset <= srv_page_size);
- free_offset+= len;
-
- if (free_offset > srv_page_size)
- goto create_block;
-
- block->page.access_time= ((block->page.access_time >> 16) + 1) << 16 |
- ut_calc_align<uint16_t>(static_cast<uint16_t>(free_offset), ALIGNMENT);
- MEM_MAKE_ADDRESSABLE(block->page.frame + free_offset - len, len);
- return my_assume_aligned<ALIGNMENT>(block->page.frame + free_offset - len);
-}
-
/** Free a redo log snippet.
-@param data buffer returned by alloc() */
+@param data buffer allocated in add() */
inline void recv_sys_t::free(const void *data)
{
ut_ad(!ut_align_offset(data, ALIGNMENT));
@@ -1502,8 +1475,11 @@ inline void recv_sys_t::free(const void *data)
ut_ad(block->page.state() == buf_page_t::MEMORY);
ut_ad(static_cast<uint16_t>(block->page.access_time - 1) <
srv_page_size);
- ut_ad(block->page.access_time >= 1U << 16);
- if (!((block->page.access_time -= 1U << 16) >> 16))
+ unsigned a= block->page.access_time;
+ ut_ad(a >= 1U << 16);
+ a-= 1U << 16;
+ block->page.access_time= a;
+ if (!(a >> 16))
{
UT_LIST_REMOVE(blocks, block);
MEM_MAKE_ADDRESSABLE(block->page.frame, srv_page_size);
@@ -1689,6 +1665,9 @@ dberr_t recv_sys_t::find_checkpoint()
bool wrong_size= false;
byte *buf;
+ ut_ad(pages.empty());
+ pages_it= pages.end();
+
if (files.empty())
{
file_checkpoint= 0;
@@ -1965,7 +1944,31 @@ inline bool page_recv_t::trim(lsn_t start_lsn)
}
-inline void page_recv_t::recs_t::clear()
+/** Remove the log records of a mini-transaction from the end of the list.
+The first record of the list is always retained, so the caller must
+handle the case where the whole list belongs to the mini-transaction.
+@param start_lsn  start LSN of the mini-transaction to remove */
+void page_recv_t::recs_t::rewind(lsn_t start_lsn)
+{
+  mysql_mutex_assert_owner(&recv_sys.mutex);
+  log_phys_t *keep= static_cast<log_phys_t*>(head);
+  ut_ad(keep);
+  /* Find the last record that precedes the mini-transaction: stop at the
+  end of the list, or when the successor starts at start_lsn. */
+  for (;;)
+  {
+    log_phys_t *const succ= static_cast<log_phys_t*>(keep->next);
+    if (!succ)
+      break;
+    ut_ad(keep->start_lsn < start_lsn);
+    if (succ->start_lsn == start_lsn)
+      break;
+    keep= succ;
+  }
+  /* Cut the list after the last retained record. */
+  tail= keep;
+  log_rec_t *doomed= tail->next;
+  tail->next= nullptr;
+  /* Release everything that was cut off. */
+  for (; doomed; )
+  {
+    log_rec_t *const after= doomed->next;
+    recv_sys.free(doomed);
+    doomed= after;
+  }
+}
+
+
+void page_recv_t::recs_t::clear()
{
mysql_mutex_assert_owner(&recv_sys.mutex);
for (const log_rec_t *l= head; l; )
@@ -1977,33 +1980,99 @@ inline void page_recv_t::recs_t::clear()
head= tail= nullptr;
}
-
/** Ignore any earlier redo log records for this page. */
inline void page_recv_t::will_not_read()
{
- ut_ad(state == RECV_NOT_PROCESSED || state == RECV_WILL_NOT_READ);
- state= RECV_WILL_NOT_READ;
+ /* The records may only be discarded while being_processed is 0. */
+ ut_ad(!being_processed);
+ /* skip_read takes over the role of the former RECV_WILL_NOT_READ state. */
+ skip_read= true;
log.clear();
}
+/** Free the buffered log records of a page and remove its entry
+from recv_sys.pages.
+@param p  iterator to the entry to remove; being_processed must not be
+          positive (asserted in debug builds) */
+void recv_sys_t::erase(map::iterator p)
+{
+ ut_ad(p->second.being_processed <= 0);
+ p->second.log.clear();
+ pages.erase(p);
+}
+
+/** Free the log of pages that have been processed already, that is,
+remove every entry of recv_sys.pages whose being_processed is negative.
+The caller must hold recv_sys.mutex. */
+void recv_sys_t::garbage_collect()
+{
+  mysql_mutex_assert_owner(&mutex);
+
+  /* Do not leave the cached iterator pointing to an entry that is
+  about to be erased. */
+  if (pages_it != pages.end() && pages_it->second.being_processed < 0)
+    pages_it= pages.end();
+
+  map::iterator p= pages.begin();
+  while (p != pages.end())
+  {
+    /* Step forward first, so that erasing the current entry is safe. */
+    map::iterator cur= p++;
+    if (cur->second.being_processed < 0)
+      erase(cur);
+  }
+}
+
+/** Allocate a block from the buffer pool for recv_sys.pages.
+If memory is short, garbage_collect() is invoked once before giving up.
+@return a buffer block for storing log records
+@retval nullptr if memory was still short after garbage_collect() */
+ATTRIBUTE_COLD buf_block_t *recv_sys_t::add_block()
+{
+  for (unsigned attempt= 0;; attempt++)
+  {
+    /* twice the number of blocks that are holding log records */
+    const auto rs= UT_LIST_GET_LEN(blocks) * 2;
+    mysql_mutex_lock(&buf_pool.mutex);
+    const auto bs=
+      UT_LIST_GET_LEN(buf_pool.free) + UT_LIST_GET_LEN(buf_pool.LRU);
+    if (UNIV_UNLIKELY(bs <= BUF_LRU_MIN_LEN && rs >= bs))
+    {
+      /* out of memory: redo log occupies more than 1/3 of buf_pool
+      and there are fewer than BUF_LRU_MIN_LEN pages left */
+      mysql_mutex_unlock(&buf_pool.mutex);
+      if (attempt)
+        return nullptr;
+      /* First failure: discard the log of processed pages and retry. */
+      garbage_collect();
+      continue;
+    }
+    buf_block_t *block= buf_LRU_get_free_block(true);
+    mysql_mutex_unlock(&buf_pool.mutex);
+    return block;
+  }
+}
+
+/** Wait for buffer pool to become available.
+recv_sys.mutex is released while waiting for pending reads and is
+reacquired before returning.
+@param pages  number of blocks wanted in buf_pool.free
+              (NOTE: this parameter shadows the member recv_sys_t::pages
+              inside this function) */
+ATTRIBUTE_COLD void recv_sys_t::wait_for_pool(size_t pages)
+{
+ mysql_mutex_unlock(&mutex);
+ os_aio_wait_until_no_pending_reads(false);
+ mysql_mutex_lock(&mutex);
+ /* Free the log of the pages that have been processed. */
+ garbage_collect();
+ mysql_mutex_lock(&buf_pool.mutex);
+ bool need_more= UT_LIST_GET_LEN(buf_pool.free) < pages;
+ mysql_mutex_unlock(&buf_pool.mutex);
+ /* Still too few free blocks: presumably the flush batch up to
+ recv_sys.lsn lets more blocks be freed (TODO confirm in buf0flu.cc) */
+ if (need_more)
+ buf_flush_sync_batch(lsn);
+}
/** Register a redo log snippet for a page.
@param it page iterator
@param start_lsn start LSN of the mini-transaction
@param lsn @see mtr_t::commit_lsn()
@param l redo log snippet
-@param len length of l, in bytes */
-inline void recv_sys_t::add(map::iterator it, lsn_t start_lsn, lsn_t lsn,
- const byte *l, size_t len)
+@param len length of l, in bytes
+@return whether we ran out of memory */
+ATTRIBUTE_NOINLINE
+bool recv_sys_t::add(map::iterator it, lsn_t start_lsn, lsn_t lsn,
+ const byte *l, size_t len)
{
mysql_mutex_assert_owner(&mutex);
- page_id_t page_id = it->first;
page_recv_t &recs= it->second;
+ buf_block_t *block;
switch (*l & 0x70) {
case FREE_PAGE: case INIT_PAGE:
recs.will_not_read();
- mlog_init.add(page_id, start_lsn); /* FIXME: remove this! */
+ mlog_init.add(it->first, start_lsn); /* FIXME: remove this! */
/* fall through */
default:
log_phys_t *tail= static_cast<log_phys_t*>(recs.log.last());
@@ -2012,7 +2081,7 @@ inline void recv_sys_t::add(map::iterator it, lsn_t start_lsn, lsn_t lsn,
if (tail->start_lsn != start_lsn)
break;
ut_ad(tail->lsn == lsn);
- buf_block_t *block= UT_LIST_GET_LAST(blocks);
+ block= UT_LIST_GET_LAST(blocks);
ut_ad(block);
const size_t used= static_cast<uint16_t>(block->page.access_time - 1) + 1;
ut_ad(used >= ALIGNMENT);
@@ -2025,7 +2094,7 @@ append:
MEM_MAKE_ADDRESSABLE(end + 1, len);
/* Append to the preceding record for the page */
tail->append(l, len);
- return;
+ return false;
}
if (end <= &block->page.frame[used - ALIGNMENT] ||
&block->page.frame[used] >= end)
@@ -2039,8 +2108,49 @@ append:
ut_calc_align<uint16_t>(static_cast<uint16_t>(new_used), ALIGNMENT);
goto append;
}
- recs.log.append(new (alloc(log_phys_t::alloc_size(len)))
+
+ const size_t size{log_phys_t::alloc_size(len)};
+ ut_ad(size <= srv_page_size);
+ void *buf;
+ block= UT_LIST_GET_FIRST(blocks);
+ if (UNIV_UNLIKELY(!block))
+ {
+ create_block:
+ block= add_block();
+ if (UNIV_UNLIKELY(!block))
+ return true;
+ block->page.access_time= 1U << 16 |
+ ut_calc_align<uint16_t>(static_cast<uint16_t>(size), ALIGNMENT);
+ static_assert(ut_is_2pow(ALIGNMENT), "ALIGNMENT must be a power of 2");
+ UT_LIST_ADD_FIRST(blocks, block);
+ MEM_MAKE_ADDRESSABLE(block->page.frame, size);
+ MEM_NOACCESS(block->page.frame + size, srv_page_size - size);
+ buf= block->page.frame;
+ }
+ else
+ {
+ size_t free_offset= static_cast<uint16_t>(block->page.access_time);
+ ut_ad(!ut_2pow_remainder(free_offset, ALIGNMENT));
+ if (UNIV_UNLIKELY(!free_offset))
+ {
+ ut_ad(srv_page_size == 65536);
+ goto create_block;
+ }
+ ut_ad(free_offset <= srv_page_size);
+ free_offset+= size;
+
+ if (free_offset > srv_page_size)
+ goto create_block;
+
+ block->page.access_time= ((block->page.access_time >> 16) + 1) << 16 |
+ ut_calc_align<uint16_t>(static_cast<uint16_t>(free_offset), ALIGNMENT);
+ MEM_MAKE_ADDRESSABLE(block->page.frame + free_offset - size, size);
+ buf= block->page.frame + free_offset - size;
+ }
+
+ recs.log.append(new (my_assume_aligned<ALIGNMENT>(buf))
log_phys_t{start_lsn, lsn, l, len});
+ return false;
}
/** Store/remove the freed pages in fil_name_t of recv_spaces.
@@ -2304,13 +2414,84 @@ struct recv_ring : public recv_buf
};
#endif
-/** Parse and register one log_t::FORMAT_10_8 mini-transaction.
-@param store whether to store the records
-@param l log data source */
template<typename source>
+void recv_sys_t::rewind(source &l, source &begin) noexcept
+{
+ /* Undo the effect of a partially parsed mini-transaction on
+ recv_sys.pages. parse() invokes this after add() reported that it ran
+ out of memory, having first reset recv_sys.lsn to the start LSN of the
+ mini-transaction. The records between begin and l are decoded again,
+ and for each page that they refer to, the log records that start at
+ recv_sys.lsn are removed. On return, l is reset to begin and the
+ cached iterator pages_it is invalidated.
+ @param l      in: end of the parsed records; out: same as begin
+ @param begin  start of the mini-transaction */
+ ut_ad(srv_operation != SRV_OPERATION_BACKUP);
+ mysql_mutex_assert_owner(&mutex);
+
+ const source end= l;
+ uint32_t rlen;
+ for (l= begin; !(l == end); l+= rlen)
+ {
+ const source recs{l};
+ ++l;
+ const byte b= *recs;
+
+ ut_ad(b > 1);
+ ut_ad(UNIV_LIKELY((b & 0x70) != RESERVED) || srv_force_recovery);
+
+ /* The low 4 bits of the first byte hold the record length; if they
+ are 0, a variable-length encoded length follows. */
+ rlen= b & 0xf;
+ if (!rlen)
+ {
+ const uint32_t lenlen= mlog_decode_varint_length(*l);
+ const uint32_t addlen= mlog_decode_varint(l);
+ ut_ad(addlen != MLOG_DECODE_ERROR);
+ rlen= addlen + 15 - lenlen;
+ l+= lenlen;
+ }
+ ut_ad(!l.is_eof(rlen));
+ /* Skip records that have the 0x80 flag set; parse() only sets
+ got_page_op when this flag is clear. */
+ if (b & 0x80)
+ continue;
+
+ /* Decode the tablespace identifier and the page number. A record
+ that cannot be decoded is skipped; the loop increment l+= rlen still
+ moves to the next record, because rlen is reduced whenever l is
+ advanced here. */
+ uint32_t idlen= mlog_decode_varint_length(*l);
+ if (UNIV_UNLIKELY(idlen > 5 || idlen >= rlen))
+ continue;
+ const uint32_t space_id= mlog_decode_varint(l);
+ if (UNIV_UNLIKELY(space_id == MLOG_DECODE_ERROR))
+ continue;
+ l+= idlen;
+ rlen-= idlen;
+ idlen= mlog_decode_varint_length(*l);
+ if (UNIV_UNLIKELY(idlen > 5 || idlen > rlen))
+ continue;
+ const uint32_t page_no= mlog_decode_varint(l);
+ if (UNIV_UNLIKELY(page_no == MLOG_DECODE_ERROR))
+ continue;
+ const page_id_t id{space_id, page_no};
+ /* Look up the page, reusing the cached iterator when it matches.
+ Nothing needs to be removed if no log was stored for the page. */
+ if (pages_it == pages.end() || pages_it->first != id)
+ {
+ pages_it= pages.find(id);
+ if (pages_it == pages.end())
+ continue;
+ }
+
+ ut_ad(!pages_it->second.being_processed);
+ const log_phys_t *head=
+ static_cast<log_phys_t*>(*pages_it->second.log.begin());
+ /* If the page has no records, or its first record already belongs
+ to this mini-transaction, remove the whole entry; otherwise remove
+ only the records that start at recv_sys.lsn. */
+ if (!head || head->start_lsn == lsn)
+ {
+ erase(pages_it);
+ pages_it= pages.end();
+ }
+ else
+ pages_it->second.log.rewind(lsn);
+ }
+
+ l= begin;
+ pages_it= pages.end();
+}
+
+/** Parse and register one log_t::FORMAT_10_8 mini-transaction.
+@tparam store whether to store the records
+@param l log data source
+@param if_exists if store: whether to check if the tablespace exists */
+template<typename source,bool store>
+inline
+recv_sys_t::parse_mtr_result recv_sys_t::parse(source &l, bool if_exists)
noexcept
{
+ restart:
#ifndef SUX_LOCK_GENERIC
ut_ad(log_sys.latch.is_write_locked() ||
srv_operation == SRV_OPERATION_BACKUP ||
@@ -2319,12 +2500,15 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l)
mysql_mutex_assert_owner(&mutex);
ut_ad(log_sys.next_checkpoint_lsn);
ut_ad(log_sys.is_latest());
+ ut_ad(store || !if_exists);
+ ut_ad(store ||
+ srv_operation != SRV_OPERATION_BACKUP ||
+ srv_operation != SRV_OPERATION_BACKUP_NO_DEFER);
alignas(8) byte iv[MY_AES_BLOCK_SIZE];
byte *decrypt_buf= static_cast<byte*>(alloca(srv_page_size));
const lsn_t start_lsn{lsn};
- map::iterator cached_pages_it{pages.end()};
/* Check that the entire mini-transaction is included within the buffer */
if (l.is_eof(0))
@@ -2333,7 +2517,7 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l)
if (*l <= 1)
return GOT_EOF; /* We should never write an empty mini-transaction. */
- const source begin{l};
+ source begin{l};
uint32_t rlen;
for (uint32_t total_len= 0; !l.is_eof(); l+= rlen, total_len+= rlen)
{
@@ -2433,7 +2617,6 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l)
sql_print_error("InnoDB: Unknown log record at LSN " LSN_PF, lsn);
corrupted:
found_corrupt_log= true;
- pthread_cond_broadcast(&cond);
return GOT_EOF;
}
@@ -2510,13 +2693,13 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l)
mach_write_to_4(iv + 12, page_no);
got_page_op= !(b & 0x80);
if (!got_page_op);
- else if (srv_operation == SRV_OPERATION_BACKUP)
+ else if (!store && srv_operation == SRV_OPERATION_BACKUP)
{
if (page_no == 0 && first_page_init && (b & 0x10))
first_page_init(space_id);
continue;
}
- else if (file_checkpoint && !is_predefined_tablespace(space_id))
+ else if (store && file_checkpoint && !is_predefined_tablespace(space_id))
{
recv_spaces_t::iterator i= recv_spaces.lower_bound(space_id);
if (i != recv_spaces.end() && i->first == space_id);
@@ -2585,7 +2768,7 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l)
trim({space_id, 0}, lsn);
truncated_undo_spaces[space_id - srv_undo_space_id_start]=
{ lsn, page_no };
- if (undo_space_trunc)
+ if (!store && undo_space_trunc)
undo_space_trunc(space_id);
#endif
last_offset= 1; /* the next record must not be same_page */
@@ -2626,7 +2809,7 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l)
{
if (UNIV_UNLIKELY(rlen + last_offset > srv_page_size))
goto record_corrupted;
- if (UNIV_UNLIKELY(!page_no) && file_checkpoint)
+ if (store && UNIV_UNLIKELY(!page_no) && file_checkpoint)
{
const bool has_size= last_offset <= FSP_HEADER_OFFSET + FSP_SIZE &&
last_offset + rlen >= FSP_HEADER_OFFSET + FSP_SIZE + 4;
@@ -2705,38 +2888,57 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l)
ut_ad(modified.emplace(id).second || (b & 0x70) != INIT_PAGE);
}
#endif
- const bool is_init= (b & 0x70) <= INIT_PAGE;
- switch (store) {
- case STORE_IF_EXISTS:
- if (fil_space_t *space= fil_space_t::get(space_id))
+ if (store)
+ {
+ if (if_exists)
{
- const auto size= space->get_size();
- space->release();
- if (!size)
+ if (fil_space_t *space= fil_space_t::get(space_id))
+ {
+ const auto size= space->get_size();
+ space->release();
+ if (!size)
+ continue;
+ }
+ else if (!deferred_spaces.find(space_id))
continue;
}
- else if (!deferred_spaces.find(space_id))
- continue;
- /* fall through */
- case STORE_YES:
if (!mlog_init.will_avoid_read(id, start_lsn))
{
- if (cached_pages_it == pages.end() ||
- cached_pages_it->first != id)
- cached_pages_it= pages.emplace(id, page_recv_t{}).first;
- add(cached_pages_it, start_lsn, lsn,
- l.get_buf(cl, recs, decrypt_buf), l - recs + rlen);
+ if (pages_it == pages.end() || pages_it->first != id)
+ pages_it= pages.emplace(id, page_recv_t{}).first;
+ if (UNIV_UNLIKELY(add(pages_it, start_lsn, lsn,
+ l.get_buf(cl, recs, decrypt_buf),
+ l - recs + rlen)))
+ {
+ lsn= start_lsn;
+ log_sys.set_recovered_lsn(start_lsn);
+ l+= rlen;
+ offset= begin.ptr - log_sys.buf;
+ rewind(l, begin);
+ if (if_exists)
+ {
+ apply(false);
+ if (is_corrupt_fs())
+ return GOT_EOF;
+ goto restart;
+ }
+ sql_print_information("InnoDB: Multi-batch recovery needed at LSN "
+ LSN_PF, lsn);
+ return GOT_OOM;
+ }
}
- continue;
- case STORE_NO:
- if (!is_init)
- continue;
+ }
+ else if ((b & 0x70) <= INIT_PAGE)
+ {
mlog_init.add(id, start_lsn);
- map::iterator i= pages.find(id);
- if (i == pages.end())
- continue;
- i->second.log.clear();
- pages.erase(i);
+ if (pages_it == pages.end() || pages_it->first != id)
+ {
+ pages_it= pages.find(id);
+ if (pages_it == pages.end())
+ continue;
+ }
+ map::iterator r= pages_it++;
+ erase(r);
}
}
else if (rlen)
@@ -2749,6 +2951,11 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l)
if (rlen < UNIV_PAGE_SIZE_MAX && !l.is_zero(rlen))
continue;
}
+ else if (store)
+ {
+ ut_ad(file_checkpoint);
+ continue;
+ }
else if (const lsn_t c= l.read8())
{
if (UNIV_UNLIKELY(srv_print_verbose_log == 2))
@@ -2830,21 +3037,27 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l)
if (UNIV_UNLIKELY(!recv_needed_recovery && srv_read_only_mode))
continue;
+ if (!store &&
+ (srv_operation == SRV_OPERATION_BACKUP ||
+ srv_operation == SRV_OPERATION_BACKUP_NO_DEFER))
+ {
+ if ((b & 0xf0) < FILE_CHECKPOINT && log_file_op)
+ log_file_op(space_id, b & 0xf0,
+ reinterpret_cast<const byte*>(fn),
+ static_cast<ulint>(fnend - fn),
+ reinterpret_cast<const byte*>(fn2),
+ fn2 ? static_cast<ulint>(fn2end - fn2) : 0);
+ continue;
+ }
+
fil_name_process(fn, fnend - fn, space_id,
(b & 0xf0) == FILE_DELETE ? FILE_DELETE : FILE_MODIFY,
- start_lsn, store);
-
- if ((b & 0xf0) < FILE_CHECKPOINT && log_file_op)
- log_file_op(space_id, b & 0xf0,
- reinterpret_cast<const byte*>(fn),
- static_cast<ulint>(fnend - fn),
- reinterpret_cast<const byte*>(fn2),
- fn2 ? static_cast<ulint>(fn2end - fn2) : 0);
+ start_lsn, if_exists);
if (fn2)
{
fil_name_process(fn2, fn2end - fn2, space_id,
- FILE_RENAME, start_lsn, store);
+ FILE_RENAME, start_lsn, if_exists);
if (file_checkpoint)
{
const size_t len= fn2end - fn2;
@@ -2868,18 +3081,23 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l)
return OK;
}
-ATTRIBUTE_NOINLINE
-recv_sys_t::parse_mtr_result recv_sys_t::parse_mtr(store_t store) noexcept
+/** Parse one mini-transaction from log_sys.buf, starting at
+recv_sys.offset, by invoking parse() on a recv_buf source.
+@tparam store     whether to store the records
+@param if_exists  if store: whether to check if the tablespace exists
+@return the result of parse() */
+template<bool store>
+recv_sys_t::parse_mtr_result recv_sys_t::parse_mtr(bool if_exists) noexcept
{
recv_buf s{&log_sys.buf[recv_sys.offset]};
- return recv_sys.parse(store, s);
+ return recv_sys.parse<recv_buf,store>(s, if_exists);
}
+/** for mariadb-backup; @see xtrabackup_copy_logfile() */
+template
+recv_sys_t::parse_mtr_result recv_sys_t::parse_mtr<false>(bool) noexcept;
+
#ifdef HAVE_PMEM
-recv_sys_t::parse_mtr_result recv_sys_t::parse_pmem(store_t store) noexcept
+template<bool store>
+recv_sys_t::parse_mtr_result recv_sys_t::parse_pmem(bool if_exists) noexcept
{
- recv_sys_t::parse_mtr_result r{parse_mtr(store)};
- if (r != PREMATURE_EOF || !log_sys.is_pmem())
+ recv_sys_t::parse_mtr_result r{parse_mtr<store>(if_exists)};
+ if (UNIV_LIKELY(r != PREMATURE_EOF) || !log_sys.is_pmem())
return r;
ut_ad(recv_sys.len == log_sys.file_size);
ut_ad(recv_sys.offset >= log_sys.START_OFFSET);
@@ -2888,7 +3106,7 @@ recv_sys_t::parse_mtr_result recv_sys_t::parse_pmem(store_t store) noexcept
{recv_sys.offset == recv_sys.len
? &log_sys.buf[log_sys.START_OFFSET]
: &log_sys.buf[recv_sys.offset]};
- return recv_sys.parse(store, s);
+ return recv_sys.parse<recv_ring,store>(s, if_exists);
}
#endif
@@ -2896,23 +3114,22 @@ recv_sys_t::parse_mtr_result recv_sys_t::parse_pmem(store_t store) noexcept
lsn of a log record.
@param[in,out] block buffer pool page
@param[in,out] mtr mini-transaction
-@param[in,out] p recovery address
+@param[in,out] recs log records to apply
@param[in,out] space tablespace, or NULL if not looked up yet
@param[in,out] init page initialization operation, or NULL
@return the recovered page
@retval nullptr on failure */
static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr,
- const recv_sys_t::map::iterator &p,
- fil_space_t *space= nullptr,
- mlog_init_t::init *init= nullptr)
+ page_recv_t &recs,
+ fil_space_t *space,
+ recv_init *init)
{
- mysql_mutex_assert_owner(&recv_sys.mutex);
+ mysql_mutex_assert_not_owner(&recv_sys.mutex);
ut_ad(recv_sys.apply_log_recs);
ut_ad(recv_needed_recovery);
ut_ad(!init || init->created);
ut_ad(!init || init->lsn);
- ut_ad(block->page.id() == p->first);
- ut_ad(!p->second.is_being_processed());
+ ut_ad(recs.being_processed == 1);
ut_ad(!space || space->id == block->page.id().space());
ut_ad(log_sys.is_latest());
@@ -2924,10 +3141,6 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr,
block->page.id().space(),
block->page.id().page_no()));
- p->second.state = page_recv_t::RECV_BEING_PROCESSED;
-
- mysql_mutex_unlock(&recv_sys.mutex);
-
byte *frame = UNIV_LIKELY_NULL(block->page.zip.data)
? block->page.zip.data
: block->page.frame;
@@ -2941,7 +3154,7 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr,
bool skipped_after_init = false;
- for (const log_rec_t* recv : p->second.log) {
+ for (const log_rec_t* recv : recs.log) {
const log_phys_t* l = static_cast<const log_phys_t*>(recv);
ut_ad(l->lsn);
ut_ad(end_lsn <= l->lsn);
@@ -2999,8 +3212,7 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr,
block->page.id().space(),
block->page.id().page_no()));
- log_phys_t::apply_status a= l->apply(*block,
- p->second.last_offset);
+ log_phys_t::apply_status a= l->apply(*block, recs.last_offset);
switch (a) {
case log_phys_t::APPLIED_NO:
@@ -3123,26 +3335,11 @@ set_start_lsn:
mtr.commit();
done:
- time_t now = time(NULL);
-
- mysql_mutex_lock(&recv_sys.mutex);
-
+ /* FIXME: do this in page read, protected with recv_sys.mutex! */
if (recv_max_page_lsn < page_lsn) {
recv_max_page_lsn = page_lsn;
}
- ut_ad(!block || p->second.is_being_processed());
- ut_ad(!block || !recv_sys.pages.empty());
-
- if (recv_sys.report(now)) {
- const size_t n = recv_sys.pages.size();
- sql_print_information("InnoDB: To recover: %zu pages from log",
- n);
- service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
- "To recover: %zu pages"
- " from log", n);
- }
-
return block;
}
@@ -3156,146 +3353,347 @@ ATTRIBUTE_COLD void recv_sys_t::free_corrupted_page(page_id_t page_id)
mysql_mutex_lock(&mutex);
map::iterator p= pages.find(page_id);
- if (p != pages.end())
+ if (p == pages.end())
{
- p->second.log.clear();
- pages.erase(p);
- if (!srv_force_recovery)
- {
- set_corrupt_fs();
- ib::error() << "Unable to apply log to corrupted page " << page_id
- << "; set innodb_force_recovery to ignore";
- }
- else
- ib::warn() << "Discarding log for corrupted page " << page_id;
+ mysql_mutex_unlock(&mutex);
+ return;
}
- if (pages.empty())
- pthread_cond_broadcast(&cond);
+ p->second.being_processed= -1;
+ if (!srv_force_recovery)
+ set_corrupt_fs();
mysql_mutex_unlock(&mutex);
-}
-/** Possibly finish a recovery batch. */
-inline void recv_sys_t::maybe_finish_batch()
-{
- mysql_mutex_assert_owner(&mutex);
- ut_ad(recovery_on);
- if (!apply_batch_on || pages.empty() || is_corrupt_log() || is_corrupt_fs())
- pthread_cond_broadcast(&cond);
+ ib::error_or_warn(!srv_force_recovery)
+ << "Unable to apply log to corrupted page " << page_id;
}
ATTRIBUTE_COLD void recv_sys_t::set_corrupt_log()
{
+ /* Flag the redo log as corrupted. This function acquires and releases
+ recv_sys.mutex by itself. */
mysql_mutex_lock(&mutex);
found_corrupt_log= true;
- pthread_cond_broadcast(&cond);
mysql_mutex_unlock(&mutex);
}
ATTRIBUTE_COLD void recv_sys_t::set_corrupt_fs()
{
mysql_mutex_assert_owner(&mutex);
+ /* Flag the data files as corrupted; the caller must hold
+ recv_sys.mutex. Unless innodb_force_recovery is already set, tell
+ the user how corrupted pages can be ignored. */
+ if (!srv_force_recovery)
+ sql_print_information("InnoDB: Set innodb_force_recovery=1"
+ " to ignore corrupted pages.");
found_corrupt_fs= true;
- pthread_cond_broadcast(&cond);
}
-/** Apply any buffered redo log to a page that was just read from a data file.
-@param[in,out] space tablespace
-@param[in,out] bpage buffer pool page
+/** Apply any buffered redo log to a page.
+@param space tablespace
+@param bpage buffer pool page
@return whether the page was recovered correctly */
bool recv_recover_page(fil_space_t* space, buf_page_t* bpage)
{
- mtr_t mtr;
- mtr.start();
- mtr.set_log_mode(MTR_LOG_NO_REDO);
-
- ut_ad(bpage->frame);
- /* Move the ownership of the x-latch on the page to
- this OS thread, so that we can acquire a second
- x-latch on it. This is needed for the operations to
- the page to pass the debug checks. */
- bpage->lock.claim_ownership();
- bpage->lock.x_lock_recursive();
- bpage->fix_on_recovery();
- mtr.memo_push(reinterpret_cast<buf_block_t*>(bpage),
- MTR_MEMO_PAGE_X_FIX);
-
- buf_block_t* success = reinterpret_cast<buf_block_t*>(bpage);
+ mtr_t mtr;
+ mtr.start();
+ mtr.set_log_mode(MTR_LOG_NO_REDO);
- mysql_mutex_lock(&recv_sys.mutex);
- if (recv_sys.apply_log_recs) {
- recv_sys_t::map::iterator p = recv_sys.pages.find(bpage->id());
- if (p != recv_sys.pages.end()
- && !p->second.is_being_processed()) {
- success = recv_recover_page(success, mtr, p, space);
- if (UNIV_LIKELY(!!success)) {
- p->second.log.clear();
- recv_sys.pages.erase(p);
- }
- recv_sys.maybe_finish_batch();
- goto func_exit;
- }
- }
+ ut_ad(bpage->frame);
+ /* Move the ownership of the x-latch on the page to this OS thread,
+ so that we can acquire a second x-latch on it. This is needed for
+ the operations to the page to pass the debug checks. */
+ bpage->lock.claim_ownership();
+ bpage->lock.x_lock_recursive();
+ bpage->fix_on_recovery();
+ mtr.memo_push(reinterpret_cast<buf_block_t*>(bpage), MTR_MEMO_PAGE_X_FIX);
- mtr.commit();
+ buf_block_t *success= reinterpret_cast<buf_block_t*>(bpage);
+
+ mysql_mutex_lock(&recv_sys.mutex);
+ if (recv_sys.apply_log_recs)
+ {
+ const page_id_t id{bpage->id()};
+ recv_sys_t::map::iterator p= recv_sys.pages.find(id);
+ if (p == recv_sys.pages.end());
+ else if (p->second.being_processed < 0)
+ {
+ recv_sys.pages_it_invalidate(p);
+ recv_sys.erase(p);
+ }
+ else
+ {
+ p->second.being_processed= 1;
+ recv_sys_t::init *init= nullptr;
+ if (p->second.skip_read)
+ (init= &mlog_init.last(id))->created= true;
+ mysql_mutex_unlock(&recv_sys.mutex);
+ success= recv_recover_page(success, mtr, p->second, space, init);
+ p->second.being_processed= -1;
+ goto func_exit;
+ }
+ }
+
+ mysql_mutex_unlock(&recv_sys.mutex);
+ mtr.commit();
func_exit:
- mysql_mutex_unlock(&recv_sys.mutex);
- ut_ad(mtr.has_committed());
- return success;
+ ut_ad(mtr.has_committed());
+ return success;
}
-/** Read pages for which log needs to be applied.
-@param page_id first page identifier to read
-@param i iterator to recv_sys.pages */
-TRANSACTIONAL_TARGET
-static void recv_read_in_area(page_id_t page_id, recv_sys_t::map::iterator i)
+void IORequest::fake_read_complete(os_offset_t offset) const
{
- uint32_t page_nos[32];
- ut_ad(page_id == i->first);
- page_id.set_page_no(ut_2pow_round(page_id.page_no(), 32U));
- const page_id_t up_limit{page_id + 31};
- uint32_t* p= page_nos;
+ ut_ad(node);
+ ut_ad(is_read());
+ ut_ad(bpage);
+ ut_ad(bpage->frame);
+ ut_ad(recv_recovery_is_on());
+ ut_ad(offset);
+
+ mtr_t mtr;
+ mtr.start();
+ mtr.set_log_mode(MTR_LOG_NO_REDO);
- for (; i != recv_sys.pages.end() && i->first <= up_limit; i++)
+ ut_ad(bpage->frame);
+ /* Move the ownership of the x-latch on the page to this OS thread,
+ so that we can acquire a second x-latch on it. This is needed for
+ the operations to the page to pass the debug checks. */
+ bpage->lock.claim_ownership();
+ bpage->lock.x_lock_recursive();
+ bpage->fix_on_recovery();
+ mtr.memo_push(reinterpret_cast<buf_block_t*>(bpage), MTR_MEMO_PAGE_X_FIX);
+
+ page_recv_t &recs= *reinterpret_cast<page_recv_t*>(slot);
+ ut_ad(recs.being_processed == 1);
+ recv_init &init= *reinterpret_cast<recv_init*>(offset);
+ ut_ad(init.lsn > 1);
+ init.created= true;
+
+ if (recv_recover_page(reinterpret_cast<buf_block_t*>(bpage),
+ mtr, recs, node->space, &init))
{
- if (i->second.state == page_recv_t::RECV_NOT_PROCESSED)
+ ut_ad(bpage->oldest_modification() || bpage->is_freed());
+ bpage->lock.x_unlock(true);
+ }
+ recs.being_processed= -1;
+ ut_ad(mtr.has_committed());
+
+ node->space->release();
+}
+
+/** @return whether a page has been freed */
+inline bool fil_space_t::is_freed(uint32_t page)
+{
+ std::lock_guard<std::mutex> freed_lock(freed_range_mutex);
+ return freed_ranges.contains(page);
+}
+
+bool recv_sys_t::report(time_t time)
+{
+ if (time - progress_time < 15)
+ return false;
+ progress_time= time;
+ return true;
+}
+
+ATTRIBUTE_COLD
+void recv_sys_t::report_progress() const
+{
+ mysql_mutex_assert_owner(&mutex);
+ const size_t n{pages.size()};
+ if (recv_sys.scanned_lsn == recv_sys.lsn)
+ {
+ sql_print_information("InnoDB: To recover: %zu pages", n);
+ service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
+ "To recover: %zu pages", n);
+ }
+ else
+ {
+ sql_print_information("InnoDB: To recover: LSN " LSN_PF
+ "/" LSN_PF "; %zu pages",
+ recv_sys.lsn, recv_sys.scanned_lsn, n);
+ service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
+ "To recover: LSN " LSN_PF
+ "/" LSN_PF "; %zu pages",
+ recv_sys.lsn, recv_sys.scanned_lsn, n);
+ }
+}
+
+/** Apply a recovery batch.
+@param space_id current tablespace identifier
+@param space current tablespace
+@param free_block spare buffer block
+@param last_batch whether it is possible to write more redo log
+@return whether the caller must provide a new free_block */
+bool recv_sys_t::apply_batch(uint32_t space_id, fil_space_t *&space,
+ buf_block_t *&free_block, bool last_batch)
+{
+ mysql_mutex_assert_owner(&mutex);
+ ut_ad(pages_it != pages.end());
+ ut_ad(!pages_it->second.log.empty());
+
+ mysql_mutex_lock(&buf_pool.mutex);
+ size_t n= 0, max_n= std::min<size_t>(BUF_LRU_MIN_LEN,
+ UT_LIST_GET_LEN(buf_pool.LRU) +
+ UT_LIST_GET_LEN(buf_pool.free));
+ mysql_mutex_unlock(&buf_pool.mutex);
+
+ map::iterator begin= pages.end();
+ page_id_t begin_id{~0ULL};
+
+ while (pages_it != pages.end() && n < max_n)
+ {
+ ut_ad(!buf_dblwr.is_inside(pages_it->first));
+ if (!pages_it->second.being_processed)
{
- i->second.state= page_recv_t::RECV_BEING_READ;
- *p++= i->first.page_no();
+ if (space_id != pages_it->first.space())
+ {
+ space_id= pages_it->first.space();
+ if (space)
+ space->release();
+ space= fil_space_t::get(space_id);
+ if (!space)
+ {
+ auto d= deferred_spaces.defers.find(space_id);
+ if (d == deferred_spaces.defers.end() || d->second.deleted)
+ /* For deleted files we preserve the deferred_spaces entry */;
+ else if (!free_block)
+ return true;
+ else
+ {
+ space= recover_deferred(pages_it, d->second.file_name, free_block);
+ deferred_spaces.defers.erase(d);
+ if (!space && !srv_force_recovery)
+ {
+ set_corrupt_fs();
+ return false;
+ }
+ }
+ }
+ }
+ if (!space || space->is_freed(pages_it->first.page_no()))
+ pages_it->second.being_processed= -1;
+ else if (!n++)
+ {
+ begin= pages_it;
+ begin_id= pages_it->first;
+ }
}
+ pages_it++;
}
- if (p != page_nos)
+ if (!last_batch)
+ log_sys.latch.wr_unlock();
+
+ pages_it= begin;
+
+ if (report(time(nullptr)))
+ report_progress();
+
+ if (!n)
+ goto wait;
+
+ mysql_mutex_lock(&buf_pool.mutex);
+
+ if (UNIV_UNLIKELY(UT_LIST_GET_LEN(buf_pool.free) < n))
{
- mysql_mutex_unlock(&recv_sys.mutex);
- buf_read_recv_pages(page_id.space(), {page_nos, p});
- mysql_mutex_lock(&recv_sys.mutex);
+ mysql_mutex_unlock(&buf_pool.mutex);
+ wait:
+ wait_for_pool(n);
+ if (n);
+ else if (!last_batch)
+ goto unlock_relock;
+ else
+ goto get_last;
+ pages_it= pages.lower_bound(begin_id);
+ ut_ad(pages_it != pages.end());
}
+ else
+ mysql_mutex_unlock(&buf_pool.mutex);
+
+ while (pages_it != pages.end())
+ {
+ ut_ad(!buf_dblwr.is_inside(pages_it->first));
+ if (!pages_it->second.being_processed)
+ {
+ const page_id_t id{pages_it->first};
+
+ if (space_id != id.space())
+ {
+ space_id= id.space();
+ if (space)
+ space->release();
+ space= fil_space_t::get(space_id);
+ }
+ if (!space)
+ {
+ const auto it= deferred_spaces.defers.find(space_id);
+ if (it != deferred_spaces.defers.end() && !it->second.deleted)
+ /* The records must be processed after recover_deferred(). */
+ goto next;
+ goto space_not_found;
+ }
+ else if (space->is_freed(id.page_no()))
+ {
+ space_not_found:
+ pages_it->second.being_processed= -1;
+ goto next;
+ }
+ else
+ {
+ page_recv_t &recs= pages_it->second;
+ ut_ad(!recs.log.empty());
+ recs.being_processed= 1;
+ init *init= recs.skip_read ? &mlog_init.last(id) : nullptr;
+ mysql_mutex_unlock(&mutex);
+ buf_read_recover(space, id, recs, init);
+ }
+
+ if (!--n)
+ {
+ if (last_batch)
+ goto relock_last;
+ goto relock;
+ }
+ mysql_mutex_lock(&mutex);
+ pages_it= pages.lower_bound(id);
+ }
+ else
+ next:
+ pages_it++;
+ }
+
+ if (!last_batch)
+ {
+ unlock_relock:
+ mysql_mutex_unlock(&mutex);
+ relock:
+ log_sys.latch.wr_lock(SRW_LOCK_CALL);
+ relock_last:
+ mysql_mutex_lock(&mutex);
+ get_last:
+ pages_it= pages.lower_bound(begin_id);
+ }
+
+ return false;
}
/** Attempt to initialize a page based on redo log records.
-@param page_id page identifier
-@param p iterator pointing to page_id
+@param p iterator
@param mtr mini-transaction
@param b pre-allocated buffer pool block
+@param init page initialization
@return the recovered block
@retval nullptr if the page cannot be initialized based on log records
@retval -1 if the page cannot be recovered due to corruption */
-inline buf_block_t *recv_sys_t::recover_low(const page_id_t page_id,
- map::iterator &p, mtr_t &mtr,
- buf_block_t *b)
+inline buf_block_t *recv_sys_t::recover_low(const map::iterator &p, mtr_t &mtr,
+ buf_block_t *b, init &init)
{
- mysql_mutex_assert_owner(&mutex);
- ut_ad(p->first == page_id);
+ mysql_mutex_assert_not_owner(&mutex);
page_recv_t &recs= p->second;
- ut_ad(recs.state == page_recv_t::RECV_WILL_NOT_READ);
+ ut_ad(recs.skip_read);
+ ut_ad(recs.being_processed == 1);
buf_block_t* block= nullptr;
- mlog_init_t::init &i= mlog_init.last(page_id);
const lsn_t end_lsn= recs.log.last()->lsn;
- if (end_lsn < i.lsn)
- DBUG_LOG("ib_log", "skip log for page " << page_id
- << " LSN " << end_lsn << " < " << i.lsn);
- fil_space_t *space= fil_space_t::get(page_id.space());
+ if (end_lsn < init.lsn)
+ DBUG_LOG("ib_log", "skip log for page " << p->first
+ << " LSN " << end_lsn << " < " << init.lsn);
+ fil_space_t *space= fil_space_t::get(p->first.space());
mtr.start();
mtr.set_log_mode(MTR_LOG_NO_REDO);
@@ -3304,82 +3702,77 @@ inline buf_block_t *recv_sys_t::recover_low(const page_id_t page_id,
if (!space)
{
- if (page_id.page_no() != 0)
+ if (p->first.page_no() != 0)
{
nothing_recoverable:
mtr.commit();
return nullptr;
}
- auto it= recv_spaces.find(page_id.space());
+ auto it= recv_spaces.find(p->first.space());
ut_ad(it != recv_spaces.end());
uint32_t flags= it->second.flags;
zip_size= fil_space_t::zip_size(flags);
- block= buf_page_create_deferred(page_id.space(), zip_size, &mtr, b);
+ block= buf_page_create_deferred(p->first.space(), zip_size, &mtr, b);
ut_ad(block == b);
block->page.lock.x_lock_recursive();
}
else
{
- block= buf_page_create(space, page_id.page_no(), zip_size, &mtr, b);
+ block= buf_page_create(space, p->first.page_no(), zip_size, &mtr, b);
if (UNIV_UNLIKELY(block != b))
{
/* The page happened to exist in the buffer pool, or it
was just being read in. Before the exclusive page latch was acquired by
buf_page_create(), all changes to the page must have been applied. */
- ut_ad(pages.find(page_id) == pages.end());
+ ut_d(mysql_mutex_lock(&mutex));
+ ut_ad(pages.find(p->first) == pages.end());
+ ut_d(mysql_mutex_unlock(&mutex));
space->release();
goto nothing_recoverable;
}
}
- ut_ad(&recs == &pages.find(page_id)->second);
- i.created= true;
- map::iterator r= p++;
- block= recv_recover_page(block, mtr, r, space, &i);
+ ut_d(mysql_mutex_lock(&mutex));
+ ut_ad(&recs == &pages.find(p->first)->second);
+ ut_d(mysql_mutex_unlock(&mutex));
+ init.created= true;
+ block= recv_recover_page(block, mtr, recs, space, &init);
ut_ad(mtr.has_committed());
- if (block)
- {
- recs.log.clear();
- pages.erase(r);
- }
- else
- block= reinterpret_cast<buf_block_t*>(-1);
-
- if (pages.empty())
- pthread_cond_signal(&cond);
-
if (space)
space->release();
- return block;
+ return block ? block : reinterpret_cast<buf_block_t*>(-1);
}
/** Attempt to initialize a page based on redo log records.
@param page_id page identifier
@return recovered block
@retval nullptr if the page cannot be initialized based on log records */
-buf_block_t *recv_sys_t::recover_low(const page_id_t page_id)
+ATTRIBUTE_COLD buf_block_t *recv_sys_t::recover_low(const page_id_t page_id)
{
- buf_block_t *free_block= buf_LRU_get_free_block(false);
- buf_block_t *block= nullptr;
-
mysql_mutex_lock(&mutex);
map::iterator p= pages.find(page_id);
- if (p != pages.end() && p->second.state == page_recv_t::RECV_WILL_NOT_READ)
+ if (p != pages.end() && !p->second.being_processed && p->second.skip_read)
{
+ p->second.being_processed= 1;
+ init &init= mlog_init.last(page_id);
+ mysql_mutex_unlock(&mutex);
+ buf_block_t *free_block= buf_LRU_get_free_block(false);
mtr_t mtr;
- block= recover_low(page_id, p, mtr, free_block);
+ buf_block_t *block= recover_low(p, mtr, free_block, init);
+ p->second.being_processed= -1;
ut_ad(!block || block == reinterpret_cast<buf_block_t*>(-1) ||
block == free_block);
+ if (UNIV_UNLIKELY(!block))
+ buf_pool.free_block(free_block);
+ return block;
}
mysql_mutex_unlock(&mutex);
- if (UNIV_UNLIKELY(!block))
- buf_pool.free_block(free_block);
- return block;
+ return nullptr;
}
inline fil_space_t *fil_system_t::find(const char *path) const
@@ -3427,45 +3820,18 @@ void recv_sys_t::apply(bool last_batch)
mysql_mutex_assert_owner(&mutex);
- timespec abstime;
-
- while (apply_batch_on)
- {
- if (is_corrupt_log())
- return;
- if (last_batch)
- my_cond_wait(&cond, &mutex.m_mutex);
- else
- {
-#ifndef SUX_LOCK_GENERIC
- ut_ad(log_sys.latch.is_write_locked());
-#endif
- log_sys.latch.wr_unlock();
- set_timespec_nsec(abstime, 500000000ULL); /* 0.5s */
- my_cond_timedwait(&cond, &mutex.m_mutex, &abstime);
- mysql_mutex_unlock(&mutex);
- log_sys.latch.wr_lock(SRW_LOCK_CALL);
- mysql_mutex_lock(&mutex);
- }
- }
-
- recv_no_ibuf_operations = !last_batch ||
- srv_operation == SRV_OPERATION_RESTORE ||
- srv_operation == SRV_OPERATION_RESTORE_EXPORT;
-
- mtr_t mtr;
+ garbage_collect();
if (!pages.empty())
{
- const char *msg= last_batch
- ? "Starting final batch to recover"
- : "Starting a batch to recover";
- const size_t n= pages.size();
- sql_print_information("InnoDB: %s %zu pages from redo log.", msg, n);
- sd_notifyf(0, "STATUS=%s %zu pages from redo log", msg, n);
+ recv_no_ibuf_operations = !last_batch ||
+ srv_operation == SRV_OPERATION_RESTORE ||
+ srv_operation == SRV_OPERATION_RESTORE_EXPORT;
+ ut_ad(!last_batch || lsn == scanned_lsn);
+ progress_time= time(nullptr);
+ report_progress();
apply_log_recs= true;
- apply_batch_on= true;
for (auto id= srv_undo_tablespaces_open; id--;)
{
@@ -3491,132 +3857,71 @@ void recv_sys_t::apply(bool last_batch)
fil_system.extend_to_recv_size();
- /* We must release log_sys.latch and recv_sys.mutex before
- invoking buf_LRU_get_free_block(). Allocating a block may initiate
- a redo log write and therefore acquire log_sys.latch. To avoid
- deadlocks, log_sys.latch must not be acquired while holding
- recv_sys.mutex. */
- mysql_mutex_unlock(&mutex);
- if (!last_batch)
- log_sys.latch.wr_unlock();
-
- buf_block_t *free_block= buf_LRU_get_free_block(false);
+ fil_space_t *space= nullptr;
+ uint32_t space_id= ~0;
+ buf_block_t *free_block= nullptr;
- if (!last_batch)
- log_sys.latch.wr_lock(SRW_LOCK_CALL);
- mysql_mutex_lock(&mutex);
-
- for (map::iterator p= pages.begin(); p != pages.end(); )
+ for (pages_it= pages.begin(); pages_it != pages.end();
+ pages_it= pages.begin())
{
- const page_id_t page_id= p->first;
- ut_ad(!p->second.log.empty());
+ if (!free_block)
+ {
+ if (!last_batch)
+ log_sys.latch.wr_unlock();
+ wait_for_pool(1);
+ pages_it= pages.begin();
+ mysql_mutex_unlock(&mutex);
+ /* We must release log_sys.latch and recv_sys.mutex before
+ invoking buf_LRU_get_free_block(). Allocating a block may initiate
+ a redo log write and therefore acquire log_sys.latch. To avoid
+ deadlocks, log_sys.latch must not be acquired while holding
+ recv_sys.mutex. */
+ free_block= buf_LRU_get_free_block(false);
+ if (!last_batch)
+ log_sys.latch.wr_lock(SRW_LOCK_CALL);
+ mysql_mutex_lock(&mutex);
+ pages_it= pages.begin();
+ }
- const uint32_t space_id= page_id.space();
- auto d= deferred_spaces.defers.find(space_id);
- if (d != deferred_spaces.defers.end())
+ while (pages_it != pages.end())
{
- if (d->second.deleted)
+ if (is_corrupt_fs() || is_corrupt_log())
{
- /* For deleted files we must preserve the entry in deferred_spaces */
-erase_for_space:
- while (p != pages.end() && p->first.space() == space_id)
+ if (space)
+ space->release();
+ if (free_block)
{
- map::iterator r= p++;
- r->second.log.clear();
- pages.erase(r);
+ mysql_mutex_unlock(&mutex);
+ mysql_mutex_lock(&buf_pool.mutex);
+ buf_LRU_block_free_non_file_page(free_block);
+ mysql_mutex_unlock(&buf_pool.mutex);
+ mysql_mutex_lock(&mutex);
}
+ return;
}
- else if (recover_deferred(p, d->second.file_name, free_block))
- {
- if (!srv_force_recovery)
- set_corrupt_fs();
- deferred_spaces.defers.erase(d);
- goto erase_for_space;
- }
- else
- deferred_spaces.defers.erase(d);
- if (!free_block)
- goto next_free_block;
- p= pages.lower_bound(page_id);
- continue;
- }
-
- switch (p->second.state) {
- case page_recv_t::RECV_BEING_READ:
- case page_recv_t::RECV_BEING_PROCESSED:
- p++;
- continue;
- case page_recv_t::RECV_WILL_NOT_READ:
- if (UNIV_LIKELY(!!recover_low(page_id, p, mtr, free_block)))
- {
-next_free_block:
- mysql_mutex_unlock(&mutex);
- if (!last_batch)
- log_sys.latch.wr_unlock();
- free_block= buf_LRU_get_free_block(false);
- if (!last_batch)
- log_sys.latch.wr_lock(SRW_LOCK_CALL);
- mysql_mutex_lock(&mutex);
+ if (apply_batch(space_id, space, free_block, last_batch))
break;
- }
- ut_ad(p == pages.end() || p->first > page_id);
- continue;
- case page_recv_t::RECV_NOT_PROCESSED:
- recv_read_in_area(page_id, p);
}
- p= pages.lower_bound(page_id);
- /* Ensure that progress will be made. */
- ut_ad(p == pages.end() || p->first > page_id ||
- p->second.state >= page_recv_t::RECV_BEING_READ);
}
- buf_pool.free_block(free_block);
+ if (space)
+ space->release();
- /* Wait until all the pages have been processed */
- for (;;)
+ if (free_block)
{
- const bool empty= pages.empty();
- if (empty && !os_aio_pending_reads())
- break;
-
- if (!is_corrupt_fs() && !is_corrupt_log())
- {
- if (last_batch)
- {
- if (!empty)
- my_cond_wait(&cond, &mutex.m_mutex);
- else
- {
- mysql_mutex_unlock(&mutex);
- os_aio_wait_until_no_pending_reads(false);
- mysql_mutex_lock(&mutex);
- ut_ad(pages.empty());
- }
- }
- else
- {
-#ifndef SUX_LOCK_GENERIC
- ut_ad(log_sys.latch.is_write_locked());
-#endif
- log_sys.latch.wr_unlock();
- set_timespec_nsec(abstime, 500000000ULL); /* 0.5s */
- my_cond_timedwait(&cond, &mutex.m_mutex, &abstime);
- mysql_mutex_unlock(&mutex);
- log_sys.latch.wr_lock(SRW_LOCK_CALL);
- mysql_mutex_lock(&mutex);
- }
- continue;
- }
- if (is_corrupt_fs() && !srv_force_recovery)
- sql_print_information("InnoDB: Set innodb_force_recovery=1"
- " to ignore corrupted pages.");
- return;
+ mysql_mutex_lock(&buf_pool.mutex);
+ buf_LRU_block_free_non_file_page(free_block);
+ mysql_mutex_unlock(&buf_pool.mutex);
}
}
if (last_batch)
- /* We skipped this in buf_page_create(). */
- mlog_init.mark_ibuf_exist(mtr);
+ {
+ if (!recv_no_ibuf_operations)
+ /* We skipped this in buf_page_create(). */
+ mlog_init.mark_ibuf_exist();
+ mlog_init.clear();
+ }
else
{
mlog_init.reset();
@@ -3625,21 +3930,22 @@ next_free_block:
mysql_mutex_unlock(&mutex);
- if (last_batch && srv_operation != SRV_OPERATION_RESTORE &&
- srv_operation != SRV_OPERATION_RESTORE_EXPORT)
- /* Instead of flushing, last_batch sorts the buf_pool.flush_list
- in ascending order of buf_page_t::oldest_modification. */
- log_sort_flush_list();
- else
- buf_flush_sync_batch(lsn);
-
if (!last_batch)
{
+ buf_flush_sync_batch(lsn);
buf_pool_invalidate();
log_sys.latch.wr_lock(SRW_LOCK_CALL);
}
+ else if (srv_operation == SRV_OPERATION_RESTORE ||
+ srv_operation == SRV_OPERATION_RESTORE_EXPORT)
+ buf_flush_sync_batch(lsn);
+ else
+ /* Instead of flushing, last_batch sorts the buf_pool.flush_list
+ in ascending order of buf_page_t::oldest_modification. */
+ log_sort_flush_list();
+
#ifdef HAVE_PMEM
- else if (log_sys.is_pmem())
+ if (last_batch && log_sys.is_pmem())
mprotect(log_sys.buf, len, PROT_READ | PROT_WRITE);
#endif
@@ -3649,35 +3955,24 @@ next_free_block:
clear();
}
-/** Check whether the number of read redo log blocks exceeds the maximum.
-@return whether the memory is exhausted */
-inline bool recv_sys_t::is_memory_exhausted()
-{
- if (UT_LIST_GET_LEN(blocks) * 3 < buf_pool.get_n_pages())
- return false;
- DBUG_PRINT("ib_log",("Ran out of memory and last stored lsn " LSN_PF
- " last stored offset %zu\n", lsn, offset));
- return true;
-}
-
/** Scan log_t::FORMAT_10_8 log store records to the parsing buffer.
@param last_phase whether changes can be applied to the tablespaces
@return whether rescan is needed (not everything was stored) */
static bool recv_scan_log(bool last_phase)
{
DBUG_ENTER("recv_scan_log");
- DBUG_ASSERT(!last_phase || recv_sys.file_checkpoint);
ut_ad(log_sys.is_latest());
const size_t block_size_1{log_sys.get_block_size() - 1};
mysql_mutex_lock(&recv_sys.mutex);
- recv_sys.clear();
ut_d(recv_sys.after_apply= last_phase);
- ut_ad(!last_phase || recv_sys.file_checkpoint);
+ if (!last_phase)
+ recv_sys.clear();
+ else
+ ut_ad(recv_sys.file_checkpoint);
- store_t store= last_phase
- ? STORE_IF_EXISTS : recv_sys.file_checkpoint ? STORE_YES : STORE_NO;
+ bool store{recv_sys.file_checkpoint != 0};
size_t buf_size= log_sys.buf_size;
#ifdef HAVE_PMEM
if (log_sys.is_pmem())
@@ -3694,6 +3989,7 @@ static bool recv_scan_log(bool last_phase)
recv_sys.len= 0;
}
+ lsn_t rewound_lsn= 0;
for (ut_d(lsn_t source_offset= 0);;)
{
#ifndef SUX_LOCK_GENERIC
@@ -3741,27 +4037,29 @@ static bool recv_scan_log(bool last_phase)
if (UNIV_UNLIKELY(!recv_needed_recovery))
{
- ut_ad(store == (recv_sys.file_checkpoint ? STORE_YES : STORE_NO));
+ ut_ad(!last_phase);
ut_ad(recv_sys.lsn >= log_sys.next_checkpoint_lsn);
- for (;;)
+ if (!store)
{
- const byte& b{log_sys.buf[recv_sys.offset]};
- r= recv_sys.parse_pmem(store);
- if (r == recv_sys_t::OK)
+ ut_ad(!recv_sys.file_checkpoint);
+ for (;;)
{
- if (store == STORE_NO &&
- (b == FILE_CHECKPOINT + 2 + 8 || (b & 0xf0) == FILE_MODIFY))
- continue;
- }
- else if (r == recv_sys_t::PREMATURE_EOF)
- goto read_more;
- else if (store != STORE_NO)
- break;
+ const byte& b{log_sys.buf[recv_sys.offset]};
+ r= recv_sys.parse_pmem<false>(false);
+ switch (r) {
+ case recv_sys_t::PREMATURE_EOF:
+ goto read_more;
+ default:
+ ut_ad(r == recv_sys_t::GOT_EOF);
+ break;
+ case recv_sys_t::OK:
+ if (b == FILE_CHECKPOINT + 2 + 8 || (b & 0xf0) == FILE_MODIFY)
+ continue;
+ }
- if (store == STORE_NO)
- {
const lsn_t end{recv_sys.file_checkpoint};
+ ut_ad(!end || end == recv_sys.lsn);
mysql_mutex_unlock(&recv_sys.mutex);
if (!end)
@@ -3771,45 +4069,73 @@ static bool recv_scan_log(bool last_phase)
") at " LSN_PF, log_sys.next_checkpoint_lsn,
recv_sys.lsn);
}
- else
- ut_ad(end == recv_sys.lsn);
DBUG_RETURN(true);
}
-
- recv_needed_recovery= true;
- if (srv_read_only_mode)
- {
- mysql_mutex_unlock(&recv_sys.mutex);
- DBUG_RETURN(false);
+ }
+ else
+ {
+ ut_ad(recv_sys.file_checkpoint != 0);
+ switch ((r= recv_sys.parse_pmem<true>(false))) {
+ case recv_sys_t::PREMATURE_EOF:
+ goto read_more;
+ case recv_sys_t::GOT_EOF:
+ break;
+ default:
+ ut_ad(r == recv_sys_t::OK);
+ recv_needed_recovery= true;
+ if (srv_read_only_mode)
+ {
+ mysql_mutex_unlock(&recv_sys.mutex);
+ DBUG_RETURN(false);
+ }
+ sql_print_information("InnoDB: Starting crash recovery from"
+ " checkpoint LSN=" LSN_PF,
+ log_sys.next_checkpoint_lsn);
}
- sql_print_information("InnoDB: Starting crash recovery from"
- " checkpoint LSN=" LSN_PF,
- log_sys.next_checkpoint_lsn);
- break;
}
}
- while ((r= recv_sys.parse_pmem(store)) == recv_sys_t::OK)
+ if (!store)
+ skip_the_rest:
+ while ((r= recv_sys.parse_pmem<false>(false)) == recv_sys_t::OK);
+ else
{
- if (store != STORE_NO && recv_sys.is_memory_exhausted())
- {
- ut_ad(last_phase == (store == STORE_IF_EXISTS));
- if (store == STORE_YES)
+ uint16_t count= 0;
+ while ((r= recv_sys.parse_pmem<true>(last_phase)) == recv_sys_t::OK)
+ if (!++count && recv_sys.report(time(nullptr)))
{
- store= STORE_NO;
- recv_sys.last_stored_lsn= recv_sys.lsn;
- }
- else
- {
- ut_ad(store == STORE_IF_EXISTS);
- recv_sys.apply(false);
+ const size_t n= recv_sys.pages.size();
+ sql_print_information("InnoDB: Parsed redo log up to LSN=" LSN_PF
+ "; to recover: %zu pages", recv_sys.lsn, n);
+ service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
+ "Parsed redo log up to LSN=" LSN_PF
+ "; to recover: %zu pages",
+ recv_sys.lsn, n);
}
+ if (r == recv_sys_t::GOT_OOM)
+ {
+ ut_ad(!last_phase);
+ rewound_lsn= recv_sys.lsn;
+ store= false;
+ if (recv_sys.scanned_lsn <= 1)
+ goto skip_the_rest;
+ ut_ad(recv_sys.file_checkpoint);
+ goto func_exit;
}
}
if (r != recv_sys_t::PREMATURE_EOF)
{
ut_ad(r == recv_sys_t::GOT_EOF);
+ got_eof:
+ ut_ad(recv_sys.is_initialised());
+ if (recv_sys.scanned_lsn > 1)
+ {
+ ut_ad(recv_sys.scanned_lsn == recv_sys.lsn);
+ break;
+ }
+ recv_sys.scanned_lsn= recv_sys.lsn;
+ sql_print_information("InnoDB: End of log at LSN=" LSN_PF, recv_sys.lsn);
break;
}
@@ -3822,7 +4148,7 @@ static bool recv_scan_log(bool last_phase)
break;
if (recv_sys.offset < log_sys.get_block_size())
- break;
+ goto got_eof;
if (recv_sys.offset > buf_size / 4 ||
(recv_sys.offset > block_size_1 &&
@@ -3835,21 +4161,21 @@ static bool recv_scan_log(bool last_phase)
}
}
- const bool corrupt= recv_sys.is_corrupt_log() || recv_sys.is_corrupt_fs();
- recv_sys.maybe_finish_batch();
if (last_phase)
+ {
+ ut_ad(!rewound_lsn);
+ ut_ad(recv_sys.lsn >= recv_sys.file_checkpoint);
log_sys.set_recovered_lsn(recv_sys.lsn);
+ }
+ else if (rewound_lsn)
+ {
+ ut_ad(!store);
+ ut_ad(recv_sys.file_checkpoint);
+ recv_sys.lsn= rewound_lsn;
+ }
+func_exit:
mysql_mutex_unlock(&recv_sys.mutex);
-
- if (corrupt)
- DBUG_RETURN(false);
-
- DBUG_PRINT("ib_log",
- ("%s " LSN_PF " completed", last_phase ? "rescan" : "scan",
- recv_sys.lsn));
- ut_ad(!last_phase || recv_sys.lsn >= recv_sys.file_checkpoint);
-
- DBUG_RETURN(store == STORE_NO);
+ DBUG_RETURN(!store);
}
/** Report a missing tablespace for which page-redo log exists.
@@ -3945,8 +4271,8 @@ next:
/* fall through */
case file_name_t::DELETED:
recv_sys_t::map::iterator r = p++;
- r->second.log.clear();
- recv_sys.pages.erase(r);
+ recv_sys.pages_it_invalidate(r);
+ recv_sys.erase(r);
continue;
}
ut_ad(0);
@@ -3970,8 +4296,6 @@ func_exit:
continue;
}
- missing_tablespace = true;
-
if (srv_force_recovery) {
sql_print_warning("InnoDB: Tablespace " UINT32PF
" was not found at %.*s,"
@@ -3991,14 +4315,11 @@ func_exit:
rs.first,
int(rs.second.name.size()),
rs.second.name.data());
+ } else {
+ missing_tablespace = true;
}
}
- if (!rescan || srv_force_recovery > 0) {
- missing_tablespace = false;
- }
-
- err = DB_SUCCESS;
goto func_exit;
}
@@ -4232,35 +4553,41 @@ read_only_recovery:
goto early_exit;
}
- /* If there is any missing tablespace and rescan is needed
- then there is a possiblity that hash table will not contain
- all space ids redo logs. Rescan the remaining unstored
- redo logs for the validation of missing tablespace. */
- ut_ad(rescan || !missing_tablespace);
+ if (missing_tablespace) {
+ ut_ad(rescan);
+ /* If any tablespaces seem to be missing,
+ validate the remaining log records. */
- while (missing_tablespace) {
- recv_sys.lsn = recv_sys.last_stored_lsn;
- DBUG_PRINT("ib_log", ("Rescan of redo log to validate "
- "the missing tablespace. Scan "
- "from last stored LSN " LSN_PF,
- recv_sys.lsn));
- rescan = recv_scan_log(false);
- ut_ad(!recv_sys.is_corrupt_fs());
+ do {
+ rescan = recv_scan_log(false);
+ ut_ad(!recv_sys.is_corrupt_fs());
- missing_tablespace = false;
+ if (recv_sys.is_corrupt_log()) {
+ goto err_exit;
+ }
- if (recv_sys.is_corrupt_log()) {
- goto err_exit;
- }
+ missing_tablespace = false;
- err = recv_validate_tablespace(
- rescan, missing_tablespace);
+ err = recv_validate_tablespace(
+ rescan, missing_tablespace);
- if (err != DB_SUCCESS) {
- goto early_exit;
- }
+ if (err != DB_SUCCESS) {
+ goto early_exit;
+ }
+ } while (missing_tablespace);
rescan = true;
+ /* Because in the loop above we overwrote the
+ initially stored recv_sys.pages, we must
+ restart parsing the log from the very beginning. */
+
+ /* FIXME: Use a separate loop for checking for
+ tablespaces (not individual pages), while retaining
+ the initial recv_sys.pages. */
+ mysql_mutex_lock(&recv_sys.mutex);
+ recv_sys.clear();
+ recv_sys.lsn = log_sys.next_checkpoint_lsn;
+ mysql_mutex_unlock(&recv_sys.mutex);
}
if (srv_operation <= SRV_OPERATION_EXPORT_RESTORED) {
@@ -4271,8 +4598,7 @@ read_only_recovery:
ut_ad(srv_force_recovery <= SRV_FORCE_NO_UNDO_LOG_SCAN);
if (rescan) {
- recv_sys.lsn = log_sys.next_checkpoint_lsn;
- rescan = recv_scan_log(true);
+ recv_scan_log(true);
if ((recv_sys.is_corrupt_log()
&& !srv_force_recovery)
|| recv_sys.is_corrupt_fs()) {
diff --git a/storage/innobase/os/os0file.cc b/storage/innobase/os/os0file.cc
index aafa4361b0b..217cf153b59 100644
--- a/storage/innobase/os/os0file.cc
+++ b/storage/innobase/os/os0file.cc
@@ -3411,15 +3411,12 @@ os_file_get_status(
return(ret);
}
-
-extern void fil_aio_callback(const IORequest &request);
-
-static void io_callback(tpool::aiocb *cb)
+static void io_callback_errorcheck(const tpool::aiocb *cb)
{
- const IORequest &request= *static_cast<const IORequest*>
- (static_cast<const void*>(cb->m_userdata));
if (cb->m_err != DB_SUCCESS)
{
+ const IORequest &request= *static_cast<const IORequest*>
+ (static_cast<const void*>(cb->m_userdata));
ib::fatal() << "IO Error: " << cb->m_err << " during " <<
(request.is_async() ? "async " : "sync ") <<
(request.is_LRU() ? "lru " : "") <<
@@ -3427,19 +3424,36 @@ static void io_callback(tpool::aiocb *cb)
" of " << cb->m_len << " bytes, for file " << cb->m_fh << ", returned " <<
cb->m_ret_len;
}
- /* Return cb back to cache*/
- if (cb->m_opcode == tpool::aio_opcode::AIO_PREAD)
- {
- ut_ad(read_slots->contains(cb));
- fil_aio_callback(request);
- read_slots->release(cb);
- }
- else
- {
- ut_ad(write_slots->contains(cb));
- fil_aio_callback(request);
- write_slots->release(cb);
- }
+}
+
+static void fake_io_callback(void *c)
+{
+ tpool::aiocb *cb= static_cast<tpool::aiocb*>(c);
+ ut_ad(read_slots->contains(cb));
+ static_cast<const IORequest*>(static_cast<const void*>(cb->m_userdata))->
+ fake_read_complete(cb->m_offset);
+ read_slots->release(cb);
+}
+
+static void read_io_callback(void *c)
+{
+ tpool::aiocb *cb= static_cast<tpool::aiocb*>(c);
+ ut_ad(cb->m_opcode == tpool::aio_opcode::AIO_PREAD);
+ io_callback_errorcheck(cb);
+ ut_ad(read_slots->contains(cb));
+ static_cast<const IORequest*>
+ (static_cast<const void*>(cb->m_userdata))->read_complete();
+ read_slots->release(cb);
+}
+
+static void write_io_callback(void *c)
+{
+ tpool::aiocb *cb= static_cast<tpool::aiocb*>(c);
+ ut_ad(cb->m_opcode == tpool::aio_opcode::AIO_PWRITE);
+ ut_ad(write_slots->contains(cb));
+ static_cast<const IORequest*>
+ (static_cast<const void*>(cb->m_userdata))->write_complete();
+ write_slots->release(cb);
}
#ifdef LINUX_NATIVE_AIO
@@ -3684,6 +3698,28 @@ void os_aio_wait_until_no_pending_reads(bool declare)
tpool::tpool_wait_end();
}
+/** Submit a fake read request during crash recovery.
+@param type fake read request
+@param offset additional context */
+void os_fake_read(const IORequest &type, os_offset_t offset)
+{
+ tpool::aiocb *cb= read_slots->acquire();
+
+ cb->m_group= read_slots->get_task_group();
+ cb->m_fh= type.node->handle.m_file;
+ cb->m_buffer= nullptr;
+ cb->m_len= 0;
+ cb->m_offset= offset;
+ cb->m_opcode= tpool::aio_opcode::AIO_PREAD;
+ new (cb->m_userdata) IORequest{type};
+ cb->m_internal_task.m_func= fake_io_callback;
+ cb->m_internal_task.m_arg= cb;
+ cb->m_internal_task.m_group= cb->m_group;
+
+ srv_thread_pool->submit_task(&cb->m_internal_task);
+}
+
+
/** Request a read or write.
@param type I/O request
@param buf buffer
@@ -3729,23 +3765,32 @@ func_exit:
return err;
}
+ io_slots* slots;
+ tpool::callback_func callback;
+ tpool::aio_opcode opcode;
+
if (type.is_read()) {
++os_n_file_reads;
+ slots = read_slots;
+ callback = read_io_callback;
+ opcode = tpool::aio_opcode::AIO_PREAD;
} else {
++os_n_file_writes;
+ slots = write_slots;
+ callback = write_io_callback;
+ opcode = tpool::aio_opcode::AIO_PWRITE;
}
compile_time_assert(sizeof(IORequest) <= tpool::MAX_AIO_USERDATA_LEN);
- io_slots* slots= type.is_read() ? read_slots : write_slots;
tpool::aiocb* cb = slots->acquire();
cb->m_buffer = buf;
- cb->m_callback = (tpool::callback_func)io_callback;
+ cb->m_callback = callback;
cb->m_group = slots->get_task_group();
cb->m_fh = type.node->handle.m_file;
cb->m_len = (int)n;
cb->m_offset = offset;
- cb->m_opcode = type.is_read() ? tpool::aio_opcode::AIO_PREAD : tpool::aio_opcode::AIO_PWRITE;
+ cb->m_opcode = opcode;
new (cb->m_userdata) IORequest{type};
if (srv_thread_pool->submit_io(cb)) {
@@ -3753,6 +3798,7 @@ func_exit:
os_file_handle_error(type.node->name, type.is_read()
? "aio read" : "aio write");
err = DB_IO_ERROR;
+ type.node->space->release();
}
goto func_exit;