summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarko Mäkelä <marko.makela@mariadb.com>2023-04-26 15:05:03 +0300
committerMarko Mäkelä <marko.makela@mariadb.com>2023-04-26 15:05:03 +0300
commit1c6792d19e48381a12cc3b34598949b4159843a3 (patch)
treeed8325995e792a1443464db1b310909f52ef1c25
parentc15c8ef3e357a3e01f775f20027fdac9b227099f (diff)
downloadmariadb-git-bb-10.8-MDEV-29911.tar.gz
MDEV-29911 InnoDB recovery and mariadb-backup --prepare fail to report detailed progressbb-10.8-MDEV-29911
The progress reporting of InnoDB crash recovery was rather intermittent. Nothing was reported during the single-threaded log record parsing, which could consume minutes when parsing a large log. During log application, there only was progress reporting in background threads that would be invoked on data page read completion. The progress reporting here will be detailed like this: InnoDB: Starting crash recovery from checkpoint LSN=503549688 InnoDB: Parsed redo log up to LSN=1990840177; to recover: 124806 pages InnoDB: Parsed redo log up to LSN=2729777071; to recover: 186123 pages InnoDB: Parsed redo log up to LSN=3488599173; to recover: 248397 pages InnoDB: Parsed redo log up to LSN=4177856618; to recover: 306469 pages InnoDB: Multi-batch recovery needed at LSN 4189599815 InnoDB: End of log at LSN=4483551634 InnoDB: To recover: LSN 4189599815/4483551634; 307490 pages InnoDB: To recover: LSN 4189599815/4483551634; 197159 pages InnoDB: To recover: LSN 4189599815/4483551634; 67623 pages InnoDB: Parsed redo log up to LSN=4353924218; to recover: 102083 pages ... InnoDB: log sequence number 4483551634 ... The previous messages "Starting a batch to recover" or "Starting a final batch to recover" will be replaced by "To recover: ... pages" messages. If a batch lasts longer than 15 seconds, then there will be progress reports every 15 seconds, showing the number of remaining pages. For the non-final batch, the "To recover:" message includes two end LSN: that of the batch, and of the recovered log. This is the primary measure of progress. The batch will end once the number of pages to recover reaches 0. If recovery is possible in a single batch, the output will look like this, with a shorter "To recover:" message that counts only the remaining pages: InnoDB: Starting crash recovery from checkpoint LSN=503549688 InnoDB: Parsed redo log up to LSN=1998701027; to recover: 125560 pages InnoDB: Parsed redo log up to LSN=2734136874; to recover: 186446 pages InnoDB: Parsed redo log up to LSN=3499505504; to recover: 249378 pages InnoDB: Parsed redo log up to LSN=4183247844; to recover: 306964 pages InnoDB: End of log at LSN=4483551634 ... InnoDB: To recover: 331797 pages ... InnoDB: log sequence number 4483551634 ... We will also speed up recovery by improving the memory management and implementing multi-threaded recovery of data pages that will not need to be read into the buffer pool ("fake read"). Log application in the "fake read" threads will be protected by an atomic being_recovered field and exclusive buf_page_t::latch. Recovery will reserve for data pages two thirds of the buffer pool, or 256 pages, whichever is smaller. Previously, we could only use at most one third of the buffer pool for buffered log records. This would typically mean that with large buffer pools, recovery unnecessary consisted of multiple batches. If recovery runs out of memory, it will "roll back" or "rewind" the current mini-transaction. The recv_sys.lsn and recv_sys.pages will correspond to the "out of memory LSN", at the end of the previous complete mini-transaction. If recovery runs out of memory while executing the final recovery batch, we can simply invoke recv_sys.apply(false) to make room, and resume parsing. If recovery runs out of memory before the final batch, we will scan the redo log to the end (recv_sys.scanned_lsn) and check for any missing or inconsistent files. If recv_init_crash_recovery_spaces() does not report any potentially missing tablespaces, we can make use of the already stored recv_sys.pages and only rewind to the "out of memory LSN". Else, we must keep parsing and invoking recv_validate_tablespace() until an error has been found or everything has been resolved, and ultimatily rewind to to the checkpoint LSN. recv_sys_t::pages_it: A cached iterator to recv_sys.pages recv_sys_t::parse_mtr(): Remove an ATTRIBUTE_NOINLINE that would prevent tail call optimization in recv_sys_t::parse_pmem(). recv_sys_t::parse(), recv_sys_t::parse_mtr(), recv_sys_t::parse_pmem(): Add template<bool store> parameter. Redo log record parsing (store=false) is better specialized from store=true (with bool if_exists) so that we can avoid some conditional branches in frequently invoked low-level code. recv_sys_t::is_memory_exhausted(): Remove. The special parse() status GOT_OOM will report out-of-memory situation at the low level. recv_sys_t::rewind(), page_recv_t::recs_t::rewind(): Remove all log starting with a specific LSN. recv_scan_log(): Separate some code for only parsing, not storing log. In rewound_lsn, remember the LSN at which last_phase=false recovery ran out of memory. This is where the next call to recv_scan_log() will resume storing the log. This replaces recv_sys.last_stored_lsn. recv_sys_t::parse(): Evaluate the template parameter store in a few more cases, to allow dead code to be eliminated at compile time. recv_sys_t::scanned_lsn: The end of the log found by recv_scan_log(). The special value 1 means that recv_sys has been initialized but no log has been parsed. IORequest::write_complete(), IORequest::read_complete(): Replaces fil_aio_callback(). read_io_callback(), write_io_callback(): Replaces io_callback(). IORequest::fake_read_complete(), fake_io_callback(), os_fake_read(): Process a "fake read" request for concurrent recovery. recv_sys_t::apply_batch(): Choose a number of successive pages for a recovery batch. recv_sys_t::erase(recv_sys_t::map::iterator): Remove log records for a page whose recovery is not in progress. Log application threads will not invoke this; they will only set being_recovered=-1 to indicate that the entry is no longer needed. recv_sys_t::garbage_collect(): Remove all being_recovered=-1 entries. recv_sys_t::wait_for_pool(): Wait for some space to become available in the buffer pool. mlog_init_t::mark_ibuf_exist(): Avoid calls to recv_sys::recover_low() via ibuf_page_exists() and buf_page_get_low(). Such calls would lead to double locking of recv_sys.mutex, which depending on implementation could cause a deadlock. We will use lower-level calls to look up index pages. buf_LRU_block_remove_hashed(): Disable consistency checks for freed ROW_FORMAT=COMPRESSED pages. Their contents could be uninitialized garbage. This fixes an occasional failure of the test innodb.innodb_bulk_create_index_debug.
-rw-r--r--extra/mariabackup/xtrabackup.cc4
-rw-r--r--storage/innobase/buf/buf0dblwr.cc2
-rw-r--r--storage/innobase/buf/buf0flu.cc1
-rw-r--r--storage/innobase/buf/buf0lru.cc9
-rw-r--r--storage/innobase/buf/buf0rea.cc83
-rw-r--r--storage/innobase/fil/fil0fil.cc68
-rw-r--r--storage/innobase/include/buf0buf.h3
-rw-r--r--storage/innobase/include/buf0rea.h11
-rw-r--r--storage/innobase/include/log0recv.h211
-rw-r--r--storage/innobase/include/os0file.h9
-rw-r--r--storage/innobase/log/log0recv.cc1754
-rw-r--r--storage/innobase/os/os0file.cc90
12 files changed, 1325 insertions, 920 deletions
diff --git a/extra/mariabackup/xtrabackup.cc b/extra/mariabackup/xtrabackup.cc
index 6f42a9be05a..180616f37e9 100644
--- a/extra/mariabackup/xtrabackup.cc
+++ b/extra/mariabackup/xtrabackup.cc
@@ -3146,7 +3146,7 @@ static bool xtrabackup_copy_logfile()
if (log_sys.buf[recv_sys.offset] <= 1)
break;
- if (recv_sys.parse_mtr(STORE_NO) == recv_sys_t::OK)
+ if (recv_sys.parse_mtr<false>(false) == recv_sys_t::OK)
{
do
{
@@ -3156,7 +3156,7 @@ static bool xtrabackup_copy_logfile()
sequence_offset));
*seq= 1;
}
- while ((r= recv_sys.parse_mtr(STORE_NO)) == recv_sys_t::OK);
+ while ((r= recv_sys.parse_mtr<false>(false)) == recv_sys_t::OK);
if (ds_write(dst_log_file, log_sys.buf + start_offset,
recv_sys.offset - start_offset))
diff --git a/storage/innobase/buf/buf0dblwr.cc b/storage/innobase/buf/buf0dblwr.cc
index 1260145ed1c..510ad02256d 100644
--- a/storage/innobase/buf/buf0dblwr.cc
+++ b/storage/innobase/buf/buf0dblwr.cc
@@ -372,7 +372,7 @@ void buf_dblwr_t::recover()
const uint32_t space_id= page_get_space_id(page);
const page_id_t page_id(space_id, page_no);
- if (recv_sys.lsn < lsn)
+ if (recv_sys.scanned_lsn < lsn)
{
ib::info() << "Ignoring a doublewrite copy of page " << page_id
<< " with future log sequence number " << lsn;
diff --git a/storage/innobase/buf/buf0flu.cc b/storage/innobase/buf/buf0flu.cc
index 691993c819e..748d722b84e 100644
--- a/storage/innobase/buf/buf0flu.cc
+++ b/storage/innobase/buf/buf0flu.cc
@@ -2477,6 +2477,7 @@ ATTRIBUTE_COLD void buf_flush_page_cleaner_init()
/** Flush the buffer pool on shutdown. */
ATTRIBUTE_COLD void buf_flush_buffer_pool()
{
+ ut_ad(!os_aio_pending_reads());
ut_ad(!buf_page_cleaner_is_active);
ut_ad(!buf_flush_sync_lsn);
diff --git a/storage/innobase/buf/buf0lru.cc b/storage/innobase/buf/buf0lru.cc
index 724aa641f12..8a25e9c5266 100644
--- a/storage/innobase/buf/buf0lru.cc
+++ b/storage/innobase/buf/buf0lru.cc
@@ -1093,7 +1093,11 @@ static bool buf_LRU_block_remove_hashed(buf_page_t *bpage, const page_id_t id,
ut_a(!zip || !bpage->oldest_modification());
ut_ad(bpage->zip_size());
-
+ /* Skip consistency checks if the page was freed.
+ In recovery, we could get a sole FREE_PAGE record
+ and nothing else, for a ROW_FORMAT=COMPRESSED page.
+ Its contents would be garbage. */
+ if (!bpage->is_freed())
switch (fil_page_get_type(page)) {
case FIL_PAGE_TYPE_ALLOCATED:
case FIL_PAGE_INODE:
@@ -1224,6 +1228,7 @@ void buf_pool_t::corrupted_evict(buf_page_t *bpage, uint32_t state)
buf_pool_t::hash_chain &chain= buf_pool.page_hash.cell_get(id.fold());
page_hash_latch &hash_lock= buf_pool.page_hash.lock_get(chain);
+ recv_sys.free_corrupted_page(id);
mysql_mutex_lock(&mutex);
hash_lock.lock();
@@ -1248,8 +1253,6 @@ void buf_pool_t::corrupted_evict(buf_page_t *bpage, uint32_t state)
buf_LRU_block_free_hashed_page(reinterpret_cast<buf_block_t*>(bpage));
mysql_mutex_unlock(&mutex);
-
- recv_sys.free_corrupted_page(id);
}
/** Update buf_pool.LRU_old_ratio.
diff --git a/storage/innobase/buf/buf0rea.cc b/storage/innobase/buf/buf0rea.cc
index 39ecd5de27f..9c051203745 100644
--- a/storage/innobase/buf/buf0rea.cc
+++ b/storage/innobase/buf/buf0rea.cc
@@ -649,60 +649,35 @@ failed:
return count;
}
-/** @return whether a page has been freed */
-inline bool fil_space_t::is_freed(uint32_t page)
+/** Schedule a page for recovery.
+@param space tablespace
+@param page_id page identifier
+@param recs log records
+@param init page initialization, or nullptr if the page needs to be read */
+void buf_read_recover(fil_space_t *space, const page_id_t page_id,
+ page_recv_t &recs, recv_init *init)
{
- std::lock_guard<std::mutex> freed_lock(freed_range_mutex);
- return freed_ranges.contains(page);
-}
-
-/** Issues read requests for pages which recovery wants to read in.
-@param space_id tablespace identifier
-@param page_nos page numbers to read, in ascending order */
-void buf_read_recv_pages(uint32_t space_id, st_::span<uint32_t> page_nos)
-{
- fil_space_t* space = fil_space_t::get(space_id);
-
- if (!space) {
- /* The tablespace is missing or unreadable: do nothing */
- return;
- }
-
- const ulint zip_size = space->zip_size();
-
- for (ulint i = 0; i < page_nos.size(); i++) {
-
- /* Ignore if the page already present in freed ranges. */
- if (space->is_freed(page_nos[i])) {
- continue;
- }
-
- const page_id_t cur_page_id(space_id, page_nos[i]);
-
- ulint limit = 0;
- for (ulint j = 0; j < buf_pool.n_chunks; j++) {
- limit += buf_pool.chunks[j].size / 2;
- }
+ ut_ad(space->id == page_id.space());
+ space->reacquire();
+ const ulint zip_size= space->zip_size();
- if (os_aio_pending_reads() >= limit) {
- os_aio_wait_until_no_pending_reads(false);
- }
-
- space->reacquire();
- switch (buf_read_page_low(space, false, BUF_READ_ANY_PAGE,
- cur_page_id, zip_size, true)) {
- case DB_SUCCESS: case DB_SUCCESS_LOCKED_REC:
- break;
- default:
- sql_print_error("InnoDB: Recovery failed to read page "
- UINT32PF " from %s",
- cur_page_id.page_no(),
- space->chain.start->name);
- }
- }
-
-
- DBUG_PRINT("ib_buf", ("recovery read (%zu pages) for %s",
- page_nos.size(), space->chain.start->name));
- space->release();
+ if (init)
+ {
+ if (buf_page_t *bpage= buf_page_init_for_read(BUF_READ_ANY_PAGE, page_id,
+ zip_size, true))
+ {
+ ut_ad(bpage->in_file());
+ os_fake_read(IORequest{bpage, (buf_tmp_buffer_t*) &recs,
+ UT_LIST_GET_FIRST(space->chain),
+ IORequest::READ_ASYNC}, ptrdiff_t(init));
+ }
+ }
+ else if (dberr_t err= buf_read_page_low(space, false, BUF_READ_ANY_PAGE,
+ page_id, zip_size, true))
+ {
+ if (err != DB_SUCCESS_LOCKED_REC)
+ sql_print_error("InnoDB: Recovery failed to read page "
+ UINT32PF " from %s",
+ page_id.page_no(), space->chain.start->name);
+ }
}
diff --git a/storage/innobase/fil/fil0fil.cc b/storage/innobase/fil/fil0fil.cc
index be313140225..e4b352a05aa 100644
--- a/storage/innobase/fil/fil0fil.cc
+++ b/storage/innobase/fil/fil0fil.cc
@@ -2775,53 +2775,55 @@ func_exit:
#include <tpool.h>
-/** Callback for AIO completion */
-void fil_aio_callback(const IORequest &request)
+void IORequest::write_complete() const
{
ut_ad(fil_validate_skip());
- ut_ad(request.node);
+ ut_ad(node);
+ ut_ad(is_write());
- if (!request.bpage)
+ if (!bpage)
{
ut_ad(!srv_read_only_mode);
- if (request.type == IORequest::DBLWR_BATCH)
- buf_dblwr.flush_buffered_writes_completed(request);
+ if (type == IORequest::DBLWR_BATCH)
+ buf_dblwr.flush_buffered_writes_completed(*this);
else
- ut_ad(request.type == IORequest::WRITE_ASYNC);
-write_completed:
- request.node->complete_write();
- }
- else if (request.is_write())
- {
- buf_page_write_complete(request);
- goto write_completed;
+ ut_ad(type == IORequest::WRITE_ASYNC);
}
else
- {
- ut_ad(request.is_read());
+ buf_page_write_complete(*this);
- /* IMPORTANT: since i/o handling for reads will read also the insert
- buffer in fil_system.sys_space, we have to be very careful not to
- introduce deadlocks. We never close fil_system.sys_space data
- files and never issue asynchronous reads of change buffer pages. */
- const page_id_t id(request.bpage->id());
+ node->complete_write();
+ node->space->release();
+}
- if (dberr_t err= request.bpage->read_complete(*request.node))
- {
- if (recv_recovery_is_on() && !srv_force_recovery)
- {
- mysql_mutex_lock(&recv_sys.mutex);
- recv_sys.set_corrupt_fs();
- mysql_mutex_unlock(&recv_sys.mutex);
- }
+void IORequest::read_complete() const
+{
+ ut_ad(fil_validate_skip());
+ ut_ad(node);
+ ut_ad(is_read());
+ ut_ad(bpage);
+
+ /* IMPORTANT: since i/o handling for reads will read also the insert
+ buffer in fil_system.sys_space, we have to be very careful not to
+ introduce deadlocks. We never close fil_system.sys_space data files
+ and never issue asynchronous reads of change buffer pages. */
+ const page_id_t id(bpage->id());
- if (err != DB_FAIL)
- ib::error() << "Failed to read page " << id.page_no()
- << " from file '" << request.node->name << "': " << err;
+ if (dberr_t err= bpage->read_complete(*node))
+ {
+ if (recv_recovery_is_on() && !srv_force_recovery)
+ {
+ mysql_mutex_lock(&recv_sys.mutex);
+ recv_sys.set_corrupt_fs();
+ mysql_mutex_unlock(&recv_sys.mutex);
}
+
+ if (err != DB_FAIL)
+ ib::error() << "Failed to read page " << id.page_no()
+ << " from file '" << node->name << "': " << err;
}
- request.node->space->release();
+ node->space->release();
}
/** Flush to disk the writes in file spaces of the given type
diff --git a/storage/innobase/include/buf0buf.h b/storage/innobase/include/buf0buf.h
index 4295c3ba342..6d3ec65b1d3 100644
--- a/storage/innobase/include/buf0buf.h
+++ b/storage/innobase/include/buf0buf.h
@@ -75,8 +75,7 @@ struct buf_pool_info_t
ulint flush_list_len; /*!< Length of buf_pool.flush_list */
ulint n_pend_unzip; /*!< buf_pool.n_pend_unzip, pages
pending decompress */
- ulint n_pend_reads; /*!< buf_pool.n_pend_reads, pages
- pending read */
+ ulint n_pend_reads; /*!< os_aio_pending_reads() */
ulint n_pending_flush_lru; /*!< Pages pending flush in LRU */
ulint n_pending_flush_list; /*!< Pages pending flush in FLUSH
LIST */
diff --git a/storage/innobase/include/buf0rea.h b/storage/innobase/include/buf0rea.h
index 4ec8938c689..3dd085dda5c 100644
--- a/storage/innobase/include/buf0rea.h
+++ b/storage/innobase/include/buf0rea.h
@@ -102,10 +102,13 @@ which could result in a deadlock if the OS does not support asynchronous io.
ulint
buf_read_ahead_linear(const page_id_t page_id, ulint zip_size, bool ibuf);
-/** Issue read requests for pages that need to be recovered.
-@param space_id tablespace identifier
-@param page_nos page numbers to read, in ascending order */
-void buf_read_recv_pages(uint32_t space_id, st_::span<uint32_t> page_nos);
+/** Schedule a page for recovery.
+@param space tablespace
+@param page_id page identifier
+@param recs log records
+@param init page initialization, or nullptr if the page needs to be read */
+void buf_read_recover(fil_space_t *space, const page_id_t page_id,
+ page_recv_t &recs, recv_init *init);
/** @name Modes used in read-ahead @{ */
/** read only pages belonging to the insert buffer tree */
diff --git a/storage/innobase/include/log0recv.h b/storage/innobase/include/log0recv.h
index e787d81e8c2..e642b501409 100644
--- a/storage/innobase/include/log0recv.h
+++ b/storage/innobase/include/log0recv.h
@@ -38,9 +38,9 @@ Created 9/20/1997 Heikki Tuuri
#define recv_recovery_is_on() UNIV_UNLIKELY(recv_sys.recovery_on)
ATTRIBUTE_COLD MY_ATTRIBUTE((nonnull, warn_unused_result))
-/** Apply any buffered redo log to a page that was just read from a data file.
-@param[in,out] space tablespace
-@param[in,out] bpage buffer pool page
+/** Apply any buffered redo log to a page.
+@param space tablespace
+@param bpage buffer pool page
@return whether the page was recovered correctly */
bool recv_recover_page(fil_space_t* space, buf_page_t* bpage);
@@ -49,17 +49,6 @@ of first system tablespace page
@return error code or DB_SUCCESS */
dberr_t recv_recovery_from_checkpoint_start();
-/** Whether to store redo log records in recv_sys.pages */
-enum store_t {
- /** Do not store redo log records. */
- STORE_NO,
- /** Store redo log records. */
- STORE_YES,
- /** Store redo log records if the tablespace exists. */
- STORE_IF_EXISTS
-};
-
-
/** Report an operation to create, delete, or rename a file during backup.
@param[in] space_id tablespace identifier
@param[in] type file operation redo log type
@@ -125,21 +114,15 @@ struct recv_dblwr_t
list pages;
};
-/** the recovery state and buffered records for a page */
+/** recv_sys.pages entry; protected by recv_sys.mutex */
struct page_recv_t
{
- /** Recovery state; protected by recv_sys.mutex */
- enum
- {
- /** not yet processed */
- RECV_NOT_PROCESSED,
- /** not processed; the page will be reinitialized */
- RECV_WILL_NOT_READ,
- /** page is being read */
- RECV_BEING_READ,
- /** log records are being applied on the page */
- RECV_BEING_PROCESSED
- } state= RECV_NOT_PROCESSED;
+ /** Recovery status: 0=not in progress, 1=log is being applied,
+ -1=log has been applied and the entry may be erased.
+ Transitions from 1 to -1 are NOT protected by recv_sys.mutex. */
+ Atomic_relaxed<int8_t> being_processed{0};
+ /** Whether reading the page will be skipped */
+ bool skip_read= false;
/** Latest written byte offset when applying the log records.
@see mtr_t::m_last_offset */
uint16_t last_offset= 1;
@@ -162,6 +145,9 @@ struct page_recv_t
head= recs;
tail= recs;
}
+ /** Remove the last records for the page
+ @param start_lsn start of the removed log */
+ ATTRIBUTE_COLD void rewind(lsn_t start_lsn);
/** @return the last log snippet */
const log_rec_t* last() const { return tail; }
@@ -180,8 +166,8 @@ struct page_recv_t
iterator begin() { return head; }
iterator end() { return NULL; }
bool empty() const { ut_ad(!head == !tail); return !head; }
- /** Clear and free the records; @see recv_sys_t::alloc() */
- inline void clear();
+ /** Clear and free the records; @see recv_sys_t::add() */
+ void clear();
} log;
/** Trim old log records for a page.
@@ -190,21 +176,27 @@ struct page_recv_t
inline bool trim(lsn_t start_lsn);
/** Ignore any earlier redo log records for this page. */
inline void will_not_read();
- /** @return whether the log records for the page are being processed */
- bool is_being_processed() const { return state == RECV_BEING_PROCESSED; }
+};
+
+/** A page initialization operation that was parsed from the redo log */
+struct recv_init
+{
+ /** log sequence number of the page initialization */
+ lsn_t lsn;
+ /** Whether btr_page_create() avoided a read of the page.
+ At the end of the last recovery batch, mark_ibuf_exist()
+ will mark pages for which this flag is set. */
+ bool created;
};
/** Recovery system data structure */
struct recv_sys_t
{
- /** mutex protecting apply_log_recs and page_recv_t::state */
- mysql_mutex_t mutex;
+ using init= recv_init;
+
+ /** mutex protecting this as well as some of page_recv_t */
+ alignas(CPU_LEVEL1_DCACHE_LINESIZE) mysql_mutex_t mutex;
private:
- /** condition variable for
- !apply_batch_on || pages.empty() || found_corrupt_log || found_corrupt_fs */
- pthread_cond_t cond;
- /** whether recv_apply_hashed_log_recs() is running */
- bool apply_batch_on;
/** set when finding a corrupt log block or record, or there is a
log parsing buffer overflow */
bool found_corrupt_log;
@@ -226,6 +218,8 @@ public:
size_t offset;
/** log sequence number of the first non-parsed record */
lsn_t lsn;
+ /** log sequence number of the last parsed mini-transaction */
+ lsn_t scanned_lsn;
/** log sequence number at the end of the FILE_CHECKPOINT record, or 0 */
lsn_t file_checkpoint;
/** the time when progress was last reported */
@@ -238,6 +232,9 @@ public:
map pages;
private:
+ /** iterator to pages, used by parse() */
+ map::iterator pages_it;
+
/** Process a record that indicates that a tablespace size is being shrunk.
@param page_id first page that is not in the file
@param lsn log sequence number of the shrink operation */
@@ -257,30 +254,42 @@ public:
/** The contents of the doublewrite buffer */
recv_dblwr_t dblwr;
- /** Last added LSN to pages, before switching to STORE_NO */
- lsn_t last_stored_lsn= 0;
-
inline void read(os_offset_t offset, span<byte> buf);
inline size_t files_size();
void close_files() { files.clear(); files.shrink_to_fit(); }
+ /** Advance pages_it if it matches the iterator */
+ void pages_it_invalidate(const map::iterator &p)
+ {
+ mysql_mutex_assert_owner(&mutex);
+ if (pages_it == p)
+ pages_it++;
+ }
+ /** Invalidate pages_it if it points to the given tablespace */
+ void pages_it_invalidate(uint32_t space_id)
+ {
+ mysql_mutex_assert_owner(&mutex);
+ if (pages_it != pages.end() && pages_it->first.space() == space_id)
+ pages_it= pages.end();
+ }
+
private:
/** Attempt to initialize a page based on redo log records.
- @param page_id page identifier
- @param p iterator pointing to page_id
+ @param p iterator
@param mtr mini-transaction
@param b pre-allocated buffer pool block
+ @param init page initialization
@return the recovered block
@retval nullptr if the page cannot be initialized based on log records
@retval -1 if the page cannot be recovered due to corruption */
- inline buf_block_t *recover_low(const page_id_t page_id, map::iterator &p,
- mtr_t &mtr, buf_block_t *b);
+ inline buf_block_t *recover_low(const map::iterator &p, mtr_t &mtr,
+ buf_block_t *b, init &init);
/** Attempt to initialize a page based on redo log records.
@param page_id page identifier
@return the recovered block
@retval nullptr if the page cannot be initialized based on log records
@retval -1 if the page cannot be recovered due to corruption */
- buf_block_t *recover_low(const page_id_t page_id);
+ ATTRIBUTE_COLD buf_block_t *recover_low(const page_id_t page_id);
/** All found log files (multiple ones are possible if we are upgrading
from before MariaDB Server 10.5.1) */
@@ -289,10 +298,27 @@ private:
/** Base node of the redo block list.
List elements are linked via buf_block_t::unzip_LRU. */
UT_LIST_BASE_NODE_T(buf_block_t) blocks;
+
+ /** Allocate a block from the buffer pool for recv_sys.pages */
+ ATTRIBUTE_COLD buf_block_t *add_block();
+
+ /** Wait for buffer pool to become available.
+ @param pages number of buffer pool pages needed */
+ ATTRIBUTE_COLD void wait_for_pool(size_t pages);
+
+ /** Free log for processed pages. */
+ void garbage_collect();
+
+ /** Apply a recovery batch.
+ @param space_id current tablespace identifier
+ @param space current tablespace
+ @param free_block spare buffer block
+ @param last_batch whether it is possible to write more redo log
+ @return whether the caller must provide a new free_block */
+ bool apply_batch(uint32_t space_id, fil_space_t *&space,
+ buf_block_t *&free_block, bool last_batch);
+
public:
- /** Check whether the number of read redo log blocks exceeds the maximum.
- @return whether the memory is exhausted */
- inline bool is_memory_exhausted();
/** Apply buffered log to persistent data pages.
@param last_batch whether it is possible to write more redo log */
void apply(bool last_batch);
@@ -310,7 +336,7 @@ public:
/** Clean up after create() */
void close();
- bool is_initialised() const { return last_stored_lsn != 0; }
+ bool is_initialised() const { return scanned_lsn != 0; }
/** Find the latest checkpoint.
@return error code or DB_SUCCESS */
@@ -321,60 +347,76 @@ public:
@param start_lsn start LSN of the mini-transaction
@param lsn @see mtr_t::commit_lsn()
@param l redo log snippet
- @param len length of l, in bytes */
- inline void add(map::iterator it, lsn_t start_lsn, lsn_t lsn,
- const byte *l, size_t len);
-
- enum parse_mtr_result { OK, PREMATURE_EOF, GOT_EOF };
+ @param len length of l, in bytes
+ @return whether we ran out of memory */
+ bool add(map::iterator it, lsn_t start_lsn, lsn_t lsn,
+ const byte *l, size_t len);
+
+ /** Parsing result */
+ enum parse_mtr_result {
+ /** a record was successfully parsed */
+ OK,
+ /** the log ended prematurely (need to read more) */
+ PREMATURE_EOF,
+ /** the end of the log was reached */
+ GOT_EOF,
+ /** parse<true>(l, false) ran out of memory */
+ GOT_OOM
+ };
private:
/** Parse and register one log_t::FORMAT_10_8 mini-transaction.
- @param store whether to store the records
- @param l log data source */
+ @tparam store whether to store the records
+ @param l log data source
+ @param if_exists if store: whether to check if the tablespace exists */
+ template<typename source,bool store>
+ inline parse_mtr_result parse(source &l, bool if_exists) noexcept;
+
+ /** Rewind a mini-transaction when parse() runs out of memory.
+ @param l log data source
+ @param begin start of the mini-transaction */
template<typename source>
- inline parse_mtr_result parse(store_t store, source& l) noexcept;
+ ATTRIBUTE_COLD void rewind(source &l, source &begin) noexcept;
+
+ /** Report progress in terms of LSN or pages remaining */
+ ATTRIBUTE_COLD void report_progress() const;
public:
/** Parse and register one log_t::FORMAT_10_8 mini-transaction,
handling log_sys.is_pmem() buffer wrap-around.
- @param store whether to store the records */
- static parse_mtr_result parse_mtr(store_t store) noexcept;
+ @tparam store whether to store the records
+ @param if_exists if store: whether to check if the tablespace exists */
+ template<bool store>
+ static parse_mtr_result parse_mtr(bool if_exists) noexcept;
/** Parse and register one log_t::FORMAT_10_8 mini-transaction,
handling log_sys.is_pmem() buffer wrap-around.
- @param store whether to store the records */
- static parse_mtr_result parse_pmem(store_t store) noexcept
+ @tparam store whether to store the records
+ @param if_exists if store: whether to check if the tablespace exists */
+ template<bool store>
+ static parse_mtr_result parse_pmem(bool if_exists) noexcept
#ifdef HAVE_PMEM
;
#else
- { return parse_mtr(store); }
+ { return parse_mtr<store>(if_exists); }
#endif
+ /** Erase log records for a page. */
+ void erase(map::iterator p);
+
/** Clear a fully processed set of stored redo log records. */
- inline void clear();
+ void clear();
/** Determine whether redo log recovery progress should be reported.
@param time the current time
@return whether progress should be reported
(the last report was at least 15 seconds ago) */
- bool report(time_t time)
- {
- if (time - progress_time < 15)
- return false;
-
- progress_time= time;
- return true;
- }
+ bool report(time_t time);
/** The alloc() memory alignment, in bytes */
static constexpr size_t ALIGNMENT= sizeof(size_t);
- /** Allocate memory for log_rec_t
- @param len allocation size, in bytes
- @return pointer to len bytes of memory (never NULL) */
- inline void *alloc(size_t len);
-
/** Free a redo log snippet.
- @param data buffer returned by alloc() */
+ @param data buffer allocated in add() */
inline void free(const void *data);
/** Remove records for a corrupted page.
@@ -386,8 +428,6 @@ public:
ATTRIBUTE_COLD void set_corrupt_fs();
/** Flag log file corruption during recovery. */
ATTRIBUTE_COLD void set_corrupt_log();
- /** Possibly finish a recovery batch. */
- inline void maybe_finish_batch();
/** @return whether data file corruption was found */
bool is_corrupt_fs() const { return UNIV_UNLIKELY(found_corrupt_fs); }
@@ -405,13 +445,14 @@ public:
}
/** Try to recover a tablespace that was not readable earlier
- @param p iterator, initially pointing to page_id_t{space_id,0};
- the records will be freed and the iterator advanced
+ @param p iterator
@param name tablespace file name
@param free_block spare buffer block
- @return whether recovery failed */
- bool recover_deferred(map::iterator &p, const std::string &name,
- buf_block_t *&free_block);
+ @return recovered tablespace
+ @retval nullptr if recovery failed */
+ fil_space_t *recover_deferred(const map::iterator &p,
+ const std::string &name,
+ buf_block_t *&free_block);
};
/** The recovery system */
diff --git a/storage/innobase/include/os0file.h b/storage/innobase/include/os0file.h
index 13f9d3de3f8..54f7ceeb4c0 100644
--- a/storage/innobase/include/os0file.h
+++ b/storage/innobase/include/os0file.h
@@ -212,6 +212,10 @@ public:
bool is_LRU() const { return (type & (WRITE_LRU ^ WRITE_ASYNC)) != 0; }
bool is_async() const { return (type & (READ_SYNC ^ READ_ASYNC)) != 0; }
+ void write_complete() const;
+ void read_complete() const;
+ void fake_read_complete(os_offset_t offset) const;
+
/** If requested, free storage space associated with a section of the file.
@param off byte offset from the start (SEEK_SET)
@param len size of the hole in bytes
@@ -1040,6 +1044,11 @@ int os_aio_init();
Frees the asynchronous io system. */
void os_aio_free();
+/** Submit a fake read request during crash recovery.
+@param type fake read request
+@param offset additional context */
+void os_fake_read(const IORequest &type, os_offset_t offset);
+
/** Request a read or write.
@param type I/O request
@param buf buffer
diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc
index 37a496725fc..4619786ee8d 100644
--- a/storage/innobase/log/log0recv.cc
+++ b/storage/innobase/log/log0recv.cc
@@ -738,7 +738,7 @@ static struct
{
retry:
log_sys.latch.wr_unlock();
- bool fail= false;
+ fil_space_t *space= fil_system.sys_space;
buf_block_t *free_block= buf_LRU_get_free_block(false);
log_sys.latch.wr_lock(SRW_LOCK_CALL);
mysql_mutex_lock(&recv_sys.mutex);
@@ -755,11 +755,12 @@ retry:
there were no buffered records. Either way, we must create a
dummy tablespace with the latest known name,
for dict_drop_index_tree(). */
+ recv_sys.pages_it_invalidate(space_id);
while (p != recv_sys.pages.end() && p->first.space() == space_id)
{
+ ut_ad(!p->second.being_processed);
recv_sys_t::map::iterator r= p++;
- r->second.log.clear();
- recv_sys.pages.erase(r);
+ recv_sys.erase(r);
}
recv_spaces_t::iterator it{recv_spaces.find(space_id)};
if (it != recv_spaces.end())
@@ -782,11 +783,14 @@ retry:
}
}
else
- fail= recv_sys.recover_deferred(p, d->second.file_name, free_block);
+ space= recv_sys.recover_deferred(p, d->second.file_name, free_block);
processed:
- defers.erase(d++);
- if (fail)
+ auto e= d++;
+ defers.erase(e);
+ if (!space)
break;
+ if (space != fil_system.sys_space)
+ space->release();
if (free_block)
continue;
mysql_mutex_unlock(&recv_sys.mutex);
@@ -797,7 +801,7 @@ processed:
mysql_mutex_unlock(&recv_sys.mutex);
if (free_block)
buf_pool.free_block(free_block);
- return fail;
+ return !space;
}
/** Create tablespace metadata for a data file that was initially
@@ -905,28 +909,191 @@ free_space:
}
deferred_spaces;
+/** Report an operation to create, delete, or rename a file during backup.
+@param[in] space_id tablespace identifier
+@param[in] type redo log type
+@param[in] name file name (not NUL-terminated)
+@param[in] len length of name, in bytes
+@param[in] new_name new file name (NULL if not rename)
+@param[in] new_len length of new_name, in bytes (0 if NULL) */
+void (*log_file_op)(uint32_t space_id, int type,
+ const byte* name, ulint len,
+ const byte* new_name, ulint new_len);
+
+void (*undo_space_trunc)(uint32_t space_id);
+
+void (*first_page_init)(uint32_t space_id);
+
+/** Information about initializing page contents during redo log processing.
+FIXME: Rely on recv_sys.pages! */
+class mlog_init_t
+{
+ using map= std::map<const page_id_t, recv_init,
+ std::less<const page_id_t>,
+ ut_allocator<std::pair<const page_id_t, recv_init>>>;
+ /** Map of page initialization operations.
+ FIXME: Merge this to recv_sys.pages! */
+ map inits;
+
+ /** Iterator to the last add() or will_avoid_read(), for speeding up
+ will_avoid_read(). */
+ map::iterator i;
+public:
+ /** Constructor */
+ mlog_init_t() : i(inits.end()) {}
+
+ /** Record that a page will be initialized by the redo log.
+ @param page_id page identifier
+ @param lsn log sequence number
+ @return whether the state was changed */
+ bool add(const page_id_t page_id, lsn_t lsn)
+ {
+ mysql_mutex_assert_owner(&recv_sys.mutex);
+ const recv_init init = { lsn, false };
+ std::pair<map::iterator, bool> p=
+ inits.insert(map::value_type(page_id, init));
+ ut_ad(!p.first->second.created);
+ if (p.second) return true;
+ if (p.first->second.lsn >= lsn) return false;
+ p.first->second = init;
+ i = p.first;
+ return true;
+ }
+
+ /** Get the last stored lsn of the page id and its respective
+ init/load operation.
+ @param page_id page identifier
+ @return the latest page initialization;
+ not valid after releasing recv_sys.mutex. */
+ recv_init &last(page_id_t page_id)
+ {
+ mysql_mutex_assert_owner(&recv_sys.mutex);
+ return inits.find(page_id)->second;
+ }
+
+ /** Determine if a page will be initialized or freed after a time.
+ @param page_id page identifier
+ @param lsn log sequence number
+ @return whether page_id will be freed or initialized after lsn */
+ bool will_avoid_read(page_id_t page_id, lsn_t lsn)
+ {
+ mysql_mutex_assert_owner(&recv_sys.mutex);
+ if (i != inits.end() && i->first == page_id)
+ return i->second.lsn > lsn;
+ i = inits.lower_bound(page_id);
+ return i != inits.end() && i->first == page_id && i->second.lsn > lsn;
+ }
+
+ /** At the end of each recovery batch, reset the 'created' flags. */
+ void reset()
+ {
+ mysql_mutex_assert_owner(&recv_sys.mutex);
+ ut_ad(recv_no_ibuf_operations);
+ for (map::value_type &i : inits)
+ i.second.created= false;
+ }
+
+ /** During the last recovery batch, mark whether there exist
+ buffered changes for the pages that were initialized
+ by buf_page_create() and still reside in the buffer pool. */
+ void mark_ibuf_exist()
+ {
+ mysql_mutex_assert_owner(&recv_sys.mutex);
+
+ for (const map::value_type &i : inits)
+ if (i.second.created)
+ {
+ auto &chain= buf_pool.page_hash.cell_get(i.first.fold());
+ page_hash_latch &hash_lock= buf_pool.page_hash.lock_get(chain);
+
+ hash_lock.lock_shared();
+ buf_block_t *block= reinterpret_cast<buf_block_t*>
+ (buf_pool.page_hash.get(i.first, chain));
+ bool got_latch= block && block->page.lock.x_lock_try();
+ hash_lock.unlock_shared();
+
+ if (!block)
+ continue;
+
+ uint32_t state;
+
+ if (!got_latch)
+ {
+ mysql_mutex_lock(&buf_pool.mutex);
+ block= reinterpret_cast<buf_block_t*>
+ (buf_pool.page_hash.get(i.first, chain));
+ if (!block)
+ {
+ mysql_mutex_unlock(&buf_pool.mutex);
+ continue;
+ }
+
+ state= block->page.fix();
+ mysql_mutex_unlock(&buf_pool.mutex);
+ if (state < buf_page_t::UNFIXED)
+ {
+ block->page.unfix();
+ continue;
+ }
+ block->page.lock.x_lock();
+ state= block->page.unfix();
+ ut_ad(state < buf_page_t::READ_FIX);
+ if (state >= buf_page_t::UNFIXED && block->page.id() == i.first)
+ goto check_ibuf;
+ }
+ else
+ {
+ state= block->page.state();
+ ut_ad(state >= buf_page_t::FREED);
+ ut_ad(state < buf_page_t::READ_FIX);
+
+ if (state >= buf_page_t::UNFIXED)
+ {
+ check_ibuf:
+ mysql_mutex_unlock(&recv_sys.mutex);
+ if (ibuf_page_exists(block->page.id(), block->zip_size()))
+ block->page.set_ibuf_exist();
+ mysql_mutex_lock(&recv_sys.mutex);
+ }
+ }
+
+ block->page.lock.x_unlock();
+ }
+ }
+
+ /** Clear the data structure */
+ void clear() { inits.clear(); i = inits.end(); }
+};
+
+static mlog_init_t mlog_init;
+
/** Try to recover a tablespace that was not readable earlier
-@param p iterator, initially pointing to page_id_t{space_id,0};
- the records will be freed and the iterator advanced
+@param p iterator to the page
@param name tablespace file name
@param free_block spare buffer block
-@return whether recovery failed */
-bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p,
- const std::string &name,
- buf_block_t *&free_block)
+@return recovered tablespace
+@retval nullptr if recovery failed */
+fil_space_t *recv_sys_t::recover_deferred(const recv_sys_t::map::iterator &p,
+ const std::string &name,
+ buf_block_t *&free_block)
{
mysql_mutex_assert_owner(&mutex);
- const page_id_t first{p->first};
- ut_ad(first.space());
+ ut_ad(p->first.space());
- recv_spaces_t::iterator it{recv_spaces.find(first.space())};
+ recv_spaces_t::iterator it{recv_spaces.find(p->first.space())};
ut_ad(it != recv_spaces.end());
- if (!first.page_no() && p->second.state == page_recv_t::RECV_WILL_NOT_READ)
+ if (!p->first.page_no() && p->second.skip_read)
{
mtr_t mtr;
- buf_block_t *block= recover_low(first, p, mtr, free_block);
+ ut_ad(!p->second.being_processed);
+ p->second.being_processed= 1;
+ init &init= mlog_init.last(p->first);
+ mysql_mutex_unlock(&mutex);
+ buf_block_t *block= recover_low(p, mtr, free_block, init);
+ mysql_mutex_lock(&mutex);
+ p->second.being_processed= -1;
ut_ad(block == free_block || block == reinterpret_cast<buf_block_t*>(-1));
free_block= nullptr;
if (UNIV_UNLIKELY(!block || block == reinterpret_cast<buf_block_t*>(-1)))
@@ -939,10 +1106,7 @@ bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p,
const uint32_t page_no= mach_read_from_4(page + FIL_PAGE_OFFSET);
const uint32_t size= fsp_header_get_field(page, FSP_SIZE);
- ut_ad(it != recv_spaces.end());
-
- if (page_id_t{space_id, page_no} == first && size >= 4 &&
- it != recv_spaces.end() &&
+ if (page_id_t{space_id, page_no} == p->first && size >= 4 &&
fil_space_t::is_valid_flags(flags, space_id) &&
fil_space_t::logical_size(flags) == srv_page_size)
{
@@ -996,10 +1160,10 @@ bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p,
}
size_set:
node->deferred= false;
- space->release();
it->second.space= space;
block->page.lock.x_unlock();
- return false;
+ p->second.being_processed= -1;
+ return space;
}
release_and_fail:
@@ -1007,179 +1171,34 @@ bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p,
}
fail:
- ib::error() << "Cannot apply log to " << first
+ ib::error() << "Cannot apply log to " << p->first
<< " of corrupted file '" << name << "'";
- return true;
+ return nullptr;
}
-/** Report an operation to create, delete, or rename a file during backup.
-@param[in] space_id tablespace identifier
-@param[in] type redo log type
-@param[in] name file name (not NUL-terminated)
-@param[in] len length of name, in bytes
-@param[in] new_name new file name (NULL if not rename)
-@param[in] new_len length of new_name, in bytes (0 if NULL) */
-void (*log_file_op)(uint32_t space_id, int type,
- const byte* name, ulint len,
- const byte* new_name, ulint new_len);
-
-void (*undo_space_trunc)(uint32_t space_id);
-
-void (*first_page_init)(uint32_t space_id);
-
-/** Information about initializing page contents during redo log processing.
-FIXME: Rely on recv_sys.pages! */
-class mlog_init_t
-{
-public:
- /** A page initialization operation that was parsed from
- the redo log */
- struct init {
- /** log sequence number of the page initialization */
- lsn_t lsn;
- /** Whether btr_page_create() avoided a read of the page.
-
- At the end of the last recovery batch, mark_ibuf_exist()
- will mark pages for which this flag is set. */
- bool created;
- };
-
-private:
- typedef std::map<const page_id_t, init,
- std::less<const page_id_t>,
- ut_allocator<std::pair<const page_id_t, init> > >
- map;
- /** Map of page initialization operations.
- FIXME: Merge this to recv_sys.pages! */
- map inits;
-public:
- /** Record that a page will be initialized by the redo log.
- @param[in] page_id page identifier
- @param[in] lsn log sequence number
- @return whether the state was changed */
- bool add(const page_id_t page_id, lsn_t lsn)
- {
- mysql_mutex_assert_owner(&recv_sys.mutex);
- const init init = { lsn, false };
- std::pair<map::iterator, bool> p = inits.insert(
- map::value_type(page_id, init));
- ut_ad(!p.first->second.created);
- if (p.second) return true;
- if (p.first->second.lsn >= init.lsn) return false;
- p.first->second = init;
- return true;
- }
-
- /** Get the last stored lsn of the page id and its respective
- init/load operation.
- @param[in] page_id page id
- @param[in,out] init initialize log or load log
- @return the latest page initialization;
- not valid after releasing recv_sys.mutex. */
- init& last(page_id_t page_id)
- {
- mysql_mutex_assert_owner(&recv_sys.mutex);
- return inits.find(page_id)->second;
- }
-
- /** Determine if a page will be initialized or freed after a time.
- @param page_id page identifier
- @param lsn log sequence number
- @return whether page_id will be freed or initialized after lsn */
- bool will_avoid_read(page_id_t page_id, lsn_t lsn) const
- {
- mysql_mutex_assert_owner(&recv_sys.mutex);
- auto i= inits.find(page_id);
- return i != inits.end() && i->second.lsn > lsn;
- }
-
- /** At the end of each recovery batch, reset the 'created' flags. */
- void reset()
- {
- mysql_mutex_assert_owner(&recv_sys.mutex);
- ut_ad(recv_no_ibuf_operations);
- for (map::value_type& i : inits) {
- i.second.created = false;
- }
- }
-
- /** On the last recovery batch, mark whether there exist
- buffered changes for the pages that were initialized
- by buf_page_create() and still reside in the buffer pool.
- @param[in,out] mtr dummy mini-transaction */
- void mark_ibuf_exist(mtr_t& mtr)
- {
- mysql_mutex_assert_owner(&recv_sys.mutex);
- mtr.start();
-
- for (const map::value_type& i : inits) {
- if (!i.second.created) {
- continue;
- }
- if (buf_block_t* block = buf_page_get_low(
- i.first, 0, RW_X_LATCH, nullptr,
- BUF_GET_IF_IN_POOL,
- &mtr, nullptr, false)) {
- if (UNIV_LIKELY_NULL(block->page.zip.data)) {
- switch (fil_page_get_type(
- block->page.zip.data)) {
- case FIL_PAGE_INDEX:
- case FIL_PAGE_RTREE:
- if (page_zip_decompress(
- &block->page.zip,
- block->page.frame,
- true)) {
- break;
- }
- ib::error() << "corrupted "
- << block->page.id();
- }
- }
- if (recv_no_ibuf_operations) {
- mtr.commit();
- mtr.start();
- continue;
- }
- mysql_mutex_unlock(&recv_sys.mutex);
- if (ibuf_page_exists(block->page.id(),
- block->zip_size())) {
- block->page.set_ibuf_exist();
- }
- mtr.commit();
- mtr.start();
- mysql_mutex_lock(&recv_sys.mutex);
- }
- }
-
- mtr.commit();
- clear();
- }
-
- /** Clear the data structure */
- void clear() { inits.clear(); }
-};
-
-static mlog_init_t mlog_init;
-
/** Process a record that indicates that a tablespace is
being shrunk in size.
@param page_id first page identifier that is not in the file
@param lsn log sequence number of the shrink operation */
inline void recv_sys_t::trim(const page_id_t page_id, lsn_t lsn)
{
- DBUG_ENTER("recv_sys_t::trim");
- DBUG_LOG("ib_log",
- "discarding log beyond end of tablespace "
- << page_id << " before LSN " << lsn);
- mysql_mutex_assert_owner(&mutex);
- for (recv_sys_t::map::iterator p = pages.lower_bound(page_id);
- p != pages.end() && p->first.space() == page_id.space();) {
- recv_sys_t::map::iterator r = p++;
- if (r->second.trim(lsn)) {
- pages.erase(r);
- }
- }
- DBUG_VOID_RETURN;
+ DBUG_ENTER("recv_sys_t::trim");
+ DBUG_LOG("ib_log", "discarding log beyond end of tablespace "
+ << page_id << " before LSN " << lsn);
+ mysql_mutex_assert_owner(&mutex);
+ if (pages_it != pages.end() && pages_it->first.space() == page_id.space())
+ pages_it= pages.end();
+ for (recv_sys_t::map::iterator p = pages.lower_bound(page_id);
+ p != pages.end() && p->first.space() == page_id.space();)
+ {
+ recv_sys_t::map::iterator r = p++;
+ if (r->second.trim(lsn))
+ {
+ ut_ad(!r->second.being_processed);
+ pages.erase(r);
+ }
+ }
+ DBUG_VOID_RETURN;
}
inline void recv_sys_t::read(os_offset_t total_offset, span<byte> buf)
@@ -1202,15 +1221,10 @@ inline size_t recv_sys_t::files_size()
@param[in] space_id the tablespace ID
@param[in] ftype FILE_MODIFY, FILE_DELETE, or FILE_RENAME
@param[in] lsn lsn of the redo log
-@param[in] store whether the redo log has to be stored */
+@param[in] if_exists whether to check if the tablespace exists */
static void fil_name_process(const char *name, ulint len, uint32_t space_id,
- mfile_type_t ftype, lsn_t lsn, store_t store)
+ mfile_type_t ftype, lsn_t lsn, bool if_exists)
{
- if (srv_operation == SRV_OPERATION_BACKUP
- || srv_operation == SRV_OPERATION_BACKUP_NO_DEFER) {
- return;
- }
-
ut_ad(srv_operation <= SRV_OPERATION_EXPORT_RESTORED
|| srv_operation == SRV_OPERATION_RESTORE
|| srv_operation == SRV_OPERATION_RESTORE_EXPORT);
@@ -1321,7 +1335,7 @@ same_space:
case FIL_LOAD_DEFER:
/** Skip the deferred spaces
when lsn is already processed */
- if (store != store_t::STORE_IF_EXISTS) {
+ if (!if_exists) {
deferred_spaces.add(
space_id, fname.name.c_str(), lsn);
}
@@ -1364,9 +1378,8 @@ void recv_sys_t::close()
deferred_spaces.clear();
ut_d(mysql_mutex_unlock(&mutex));
- last_stored_lsn= 0;
+ scanned_lsn= 0;
mysql_mutex_destroy(&mutex);
- pthread_cond_destroy(&cond);
}
recv_spaces.clear();
@@ -1381,34 +1394,34 @@ void recv_sys_t::create()
ut_ad(this == &recv_sys);
ut_ad(!is_initialised());
mysql_mutex_init(recv_sys_mutex_key, &mutex, nullptr);
- pthread_cond_init(&cond, nullptr);
apply_log_recs = false;
- apply_batch_on = false;
len = 0;
offset = 0;
lsn = 0;
+ scanned_lsn = 1;
found_corrupt_log = false;
found_corrupt_fs = false;
file_checkpoint = 0;
progress_time = time(NULL);
+ ut_ad(pages.empty());
+ pages_it = pages.end();
recv_max_page_lsn = 0;
memset(truncated_undo_spaces, 0, sizeof truncated_undo_spaces);
- last_stored_lsn = 1;
UT_LIST_INIT(blocks, &buf_block_t::unzip_LRU);
}
/** Clear a fully processed set of stored redo log records. */
-inline void recv_sys_t::clear()
+void recv_sys_t::clear()
{
mysql_mutex_assert_owner(&mutex);
apply_log_recs= false;
- apply_batch_on= false;
ut_ad(!after_apply || found_corrupt_fs || !UT_LIST_GET_LAST(blocks));
pages.clear();
+ pages_it= pages.end();
for (buf_block_t *block= UT_LIST_GET_LAST(blocks); block; )
{
@@ -1419,8 +1432,6 @@ inline void recv_sys_t::clear()
buf_block_free(block);
block= prev_block;
}
-
- pthread_cond_broadcast(&cond);
}
/** Free most recovery data structures. */
@@ -1432,52 +1443,14 @@ void recv_sys_t::debug_free()
recovery_on= false;
pages.clear();
+ pages_it= pages.end();
mysql_mutex_unlock(&mutex);
}
-inline void *recv_sys_t::alloc(size_t len)
-{
- mysql_mutex_assert_owner(&mutex);
- ut_ad(len);
- ut_ad(len <= srv_page_size);
-
- buf_block_t *block= UT_LIST_GET_FIRST(blocks);
- if (UNIV_UNLIKELY(!block))
- {
-create_block:
- block= buf_block_alloc();
- block->page.access_time= 1U << 16 |
- ut_calc_align<uint16_t>(static_cast<uint16_t>(len), ALIGNMENT);
- static_assert(ut_is_2pow(ALIGNMENT), "ALIGNMENT must be a power of 2");
- UT_LIST_ADD_FIRST(blocks, block);
- MEM_MAKE_ADDRESSABLE(block->page.frame, len);
- MEM_NOACCESS(block->page.frame + len, srv_page_size - len);
- return my_assume_aligned<ALIGNMENT>(block->page.frame);
- }
-
- size_t free_offset= static_cast<uint16_t>(block->page.access_time);
- ut_ad(!ut_2pow_remainder(free_offset, ALIGNMENT));
- if (UNIV_UNLIKELY(!free_offset))
- {
- ut_ad(srv_page_size == 65536);
- goto create_block;
- }
- ut_ad(free_offset <= srv_page_size);
- free_offset+= len;
-
- if (free_offset > srv_page_size)
- goto create_block;
-
- block->page.access_time= ((block->page.access_time >> 16) + 1) << 16 |
- ut_calc_align<uint16_t>(static_cast<uint16_t>(free_offset), ALIGNMENT);
- MEM_MAKE_ADDRESSABLE(block->page.frame + free_offset - len, len);
- return my_assume_aligned<ALIGNMENT>(block->page.frame + free_offset - len);
-}
-
/** Free a redo log snippet.
-@param data buffer returned by alloc() */
+@param data buffer allocated in add() */
inline void recv_sys_t::free(const void *data)
{
ut_ad(!ut_align_offset(data, ALIGNMENT));
@@ -1502,8 +1475,11 @@ inline void recv_sys_t::free(const void *data)
ut_ad(block->page.state() == buf_page_t::MEMORY);
ut_ad(static_cast<uint16_t>(block->page.access_time - 1) <
srv_page_size);
- ut_ad(block->page.access_time >= 1U << 16);
- if (!((block->page.access_time -= 1U << 16) >> 16))
+ unsigned a= block->page.access_time;
+ ut_ad(a >= 1U << 16);
+ a-= 1U << 16;
+ block->page.access_time= a;
+ if (!(a >> 16))
{
UT_LIST_REMOVE(blocks, block);
MEM_MAKE_ADDRESSABLE(block->page.frame, srv_page_size);
@@ -1689,6 +1665,9 @@ dberr_t recv_sys_t::find_checkpoint()
bool wrong_size= false;
byte *buf;
+ ut_ad(pages.empty());
+ pages_it= pages.end();
+
if (files.empty())
{
file_checkpoint= 0;
@@ -1965,7 +1944,31 @@ inline bool page_recv_t::trim(lsn_t start_lsn)
}
-inline void page_recv_t::recs_t::clear()
+void page_recv_t::recs_t::rewind(lsn_t start_lsn)
+{
+ mysql_mutex_assert_owner(&recv_sys.mutex);
+ log_phys_t *trim= static_cast<log_phys_t*>(head);
+ ut_ad(trim);
+ while (log_phys_t *next= static_cast<log_phys_t*>(trim->next))
+ {
+ ut_ad(trim->start_lsn < start_lsn);
+ if (next->start_lsn == start_lsn)
+ break;
+ trim= next;
+ }
+ tail= trim;
+ log_rec_t *l= tail->next;
+ tail->next= nullptr;
+ while (l)
+ {
+ log_rec_t *next= l->next;
+ recv_sys.free(l);
+ l= next;
+ }
+}
+
+
+void page_recv_t::recs_t::clear()
{
mysql_mutex_assert_owner(&recv_sys.mutex);
for (const log_rec_t *l= head; l; )
@@ -1977,33 +1980,99 @@ inline void page_recv_t::recs_t::clear()
head= tail= nullptr;
}
-
/** Ignore any earlier redo log records for this page. */
inline void page_recv_t::will_not_read()
{
- ut_ad(state == RECV_NOT_PROCESSED || state == RECV_WILL_NOT_READ);
- state= RECV_WILL_NOT_READ;
+ ut_ad(!being_processed);
+ skip_read= true;
log.clear();
}
+void recv_sys_t::erase(map::iterator p)
+{
+ ut_ad(p->second.being_processed <= 0);
+ p->second.log.clear();
+ pages.erase(p);
+}
+
+/** Free log for processed pages. */
+void recv_sys_t::garbage_collect()
+{
+ mysql_mutex_assert_owner(&mutex);
+
+ if (pages_it != pages.end() && pages_it->second.being_processed < 0)
+ pages_it= pages.end();
+
+ for (map::iterator p= pages.begin(); p != pages.end(); )
+ {
+ if (p->second.being_processed < 0)
+ {
+ map::iterator r= p++;
+ erase(r);
+ }
+ else
+ p++;
+ }
+}
+
+/** Allocate a block from the buffer pool for recv_sys.pages */
+ATTRIBUTE_COLD buf_block_t *recv_sys_t::add_block()
+{
+ for (bool freed= false;;)
+ {
+ const auto rs= UT_LIST_GET_LEN(blocks) * 2;
+ mysql_mutex_lock(&buf_pool.mutex);
+ const auto bs=
+ UT_LIST_GET_LEN(buf_pool.free) + UT_LIST_GET_LEN(buf_pool.LRU);
+ if (UNIV_LIKELY(bs > BUF_LRU_MIN_LEN || rs < bs))
+ {
+ buf_block_t *block= buf_LRU_get_free_block(true);
+ mysql_mutex_unlock(&buf_pool.mutex);
+ return block;
+ }
+ /* out of memory: redo log occupies more than 1/3 of buf_pool
+ and there are fewer than BUF_LRU_MIN_LEN pages left */
+ mysql_mutex_unlock(&buf_pool.mutex);
+ if (freed)
+ return nullptr;
+ freed= true;
+ garbage_collect();
+ }
+}
+
+/** Wait for buffer pool to become available. */
+ATTRIBUTE_COLD void recv_sys_t::wait_for_pool(size_t pages)
+{
+ mysql_mutex_unlock(&mutex);
+ os_aio_wait_until_no_pending_reads(false);
+ mysql_mutex_lock(&mutex);
+ garbage_collect();
+ mysql_mutex_lock(&buf_pool.mutex);
+ bool need_more= UT_LIST_GET_LEN(buf_pool.free) < pages;
+ mysql_mutex_unlock(&buf_pool.mutex);
+ if (need_more)
+ buf_flush_sync_batch(lsn);
+}
/** Register a redo log snippet for a page.
@param it page iterator
@param start_lsn start LSN of the mini-transaction
@param lsn @see mtr_t::commit_lsn()
@param l redo log snippet
-@param len length of l, in bytes */
-inline void recv_sys_t::add(map::iterator it, lsn_t start_lsn, lsn_t lsn,
- const byte *l, size_t len)
+@param len length of l, in bytes
+@return whether we ran out of memory */
+ATTRIBUTE_NOINLINE
+bool recv_sys_t::add(map::iterator it, lsn_t start_lsn, lsn_t lsn,
+ const byte *l, size_t len)
{
mysql_mutex_assert_owner(&mutex);
- page_id_t page_id = it->first;
page_recv_t &recs= it->second;
+ buf_block_t *block;
switch (*l & 0x70) {
case FREE_PAGE: case INIT_PAGE:
recs.will_not_read();
- mlog_init.add(page_id, start_lsn); /* FIXME: remove this! */
+ mlog_init.add(it->first, start_lsn); /* FIXME: remove this! */
/* fall through */
default:
log_phys_t *tail= static_cast<log_phys_t*>(recs.log.last());
@@ -2012,7 +2081,7 @@ inline void recv_sys_t::add(map::iterator it, lsn_t start_lsn, lsn_t lsn,
if (tail->start_lsn != start_lsn)
break;
ut_ad(tail->lsn == lsn);
- buf_block_t *block= UT_LIST_GET_LAST(blocks);
+ block= UT_LIST_GET_LAST(blocks);
ut_ad(block);
const size_t used= static_cast<uint16_t>(block->page.access_time - 1) + 1;
ut_ad(used >= ALIGNMENT);
@@ -2025,7 +2094,7 @@ append:
MEM_MAKE_ADDRESSABLE(end + 1, len);
/* Append to the preceding record for the page */
tail->append(l, len);
- return;
+ return false;
}
if (end <= &block->page.frame[used - ALIGNMENT] ||
&block->page.frame[used] >= end)
@@ -2039,8 +2108,49 @@ append:
ut_calc_align<uint16_t>(static_cast<uint16_t>(new_used), ALIGNMENT);
goto append;
}
- recs.log.append(new (alloc(log_phys_t::alloc_size(len)))
+
+ const size_t size{log_phys_t::alloc_size(len)};
+ ut_ad(size <= srv_page_size);
+ void *buf;
+ block= UT_LIST_GET_FIRST(blocks);
+ if (UNIV_UNLIKELY(!block))
+ {
+ create_block:
+ block= add_block();
+ if (UNIV_UNLIKELY(!block))
+ return true;
+ block->page.access_time= 1U << 16 |
+ ut_calc_align<uint16_t>(static_cast<uint16_t>(size), ALIGNMENT);
+ static_assert(ut_is_2pow(ALIGNMENT), "ALIGNMENT must be a power of 2");
+ UT_LIST_ADD_FIRST(blocks, block);
+ MEM_MAKE_ADDRESSABLE(block->page.frame, size);
+ MEM_NOACCESS(block->page.frame + size, srv_page_size - size);
+ buf= block->page.frame;
+ }
+ else
+ {
+ size_t free_offset= static_cast<uint16_t>(block->page.access_time);
+ ut_ad(!ut_2pow_remainder(free_offset, ALIGNMENT));
+ if (UNIV_UNLIKELY(!free_offset))
+ {
+ ut_ad(srv_page_size == 65536);
+ goto create_block;
+ }
+ ut_ad(free_offset <= srv_page_size);
+ free_offset+= size;
+
+ if (free_offset > srv_page_size)
+ goto create_block;
+
+ block->page.access_time= ((block->page.access_time >> 16) + 1) << 16 |
+ ut_calc_align<uint16_t>(static_cast<uint16_t>(free_offset), ALIGNMENT);
+ MEM_MAKE_ADDRESSABLE(block->page.frame + free_offset - size, size);
+ buf= block->page.frame + free_offset - size;
+ }
+
+ recs.log.append(new (my_assume_aligned<ALIGNMENT>(buf))
log_phys_t{start_lsn, lsn, l, len});
+ return false;
}
/** Store/remove the freed pages in fil_name_t of recv_spaces.
@@ -2304,13 +2414,84 @@ struct recv_ring : public recv_buf
};
#endif
-/** Parse and register one log_t::FORMAT_10_8 mini-transaction.
-@param store whether to store the records
-@param l log data source */
template<typename source>
-inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l)
+void recv_sys_t::rewind(source &l, source &begin) noexcept
+{
+ ut_ad(srv_operation != SRV_OPERATION_BACKUP);
+ mysql_mutex_assert_owner(&mutex);
+
+ const source end= l;
+ uint32_t rlen;
+ for (l= begin; !(l == end); l+= rlen)
+ {
+ const source recs{l};
+ ++l;
+ const byte b= *recs;
+
+ ut_ad(b > 1);
+ ut_ad(UNIV_LIKELY((b & 0x70) != RESERVED) || srv_force_recovery);
+
+ rlen= b & 0xf;
+ if (!rlen)
+ {
+ const uint32_t lenlen= mlog_decode_varint_length(*l);
+ const uint32_t addlen= mlog_decode_varint(l);
+ ut_ad(addlen != MLOG_DECODE_ERROR);
+ rlen= addlen + 15 - lenlen;
+ l+= lenlen;
+ }
+ ut_ad(!l.is_eof(rlen));
+ if (b & 0x80)
+ continue;
+
+ uint32_t idlen= mlog_decode_varint_length(*l);
+ if (UNIV_UNLIKELY(idlen > 5 || idlen >= rlen))
+ continue;
+ const uint32_t space_id= mlog_decode_varint(l);
+ if (UNIV_UNLIKELY(space_id == MLOG_DECODE_ERROR))
+ continue;
+ l+= idlen;
+ rlen-= idlen;
+ idlen= mlog_decode_varint_length(*l);
+ if (UNIV_UNLIKELY(idlen > 5 || idlen > rlen))
+ continue;
+ const uint32_t page_no= mlog_decode_varint(l);
+ if (UNIV_UNLIKELY(page_no == MLOG_DECODE_ERROR))
+ continue;
+ const page_id_t id{space_id, page_no};
+ if (pages_it == pages.end() || pages_it->first != id)
+ {
+ pages_it= pages.find(id);
+ if (pages_it == pages.end())
+ continue;
+ }
+
+ ut_ad(!pages_it->second.being_processed);
+ const log_phys_t *head=
+ static_cast<log_phys_t*>(*pages_it->second.log.begin());
+ if (!head || head->start_lsn == lsn)
+ {
+ erase(pages_it);
+ pages_it= pages.end();
+ }
+ else
+ pages_it->second.log.rewind(lsn);
+ }
+
+ l= begin;
+ pages_it= pages.end();
+}
+
+/** Parse and register one log_t::FORMAT_10_8 mini-transaction.
+@tparam store whether to store the records
+@param l log data source
+@param if_exists if store: whether to check if the tablespace exists */
+template<typename source,bool store>
+inline
+recv_sys_t::parse_mtr_result recv_sys_t::parse(source &l, bool if_exists)
noexcept
{
+ restart:
#ifndef SUX_LOCK_GENERIC
ut_ad(log_sys.latch.is_write_locked() ||
srv_operation == SRV_OPERATION_BACKUP ||
@@ -2319,12 +2500,15 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l)
mysql_mutex_assert_owner(&mutex);
ut_ad(log_sys.next_checkpoint_lsn);
ut_ad(log_sys.is_latest());
+ ut_ad(store || !if_exists);
+ ut_ad(store ||
+ srv_operation != SRV_OPERATION_BACKUP ||
+ srv_operation != SRV_OPERATION_BACKUP_NO_DEFER);
alignas(8) byte iv[MY_AES_BLOCK_SIZE];
byte *decrypt_buf= static_cast<byte*>(alloca(srv_page_size));
const lsn_t start_lsn{lsn};
- map::iterator cached_pages_it{pages.end()};
/* Check that the entire mini-transaction is included within the buffer */
if (l.is_eof(0))
@@ -2333,7 +2517,7 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l)
if (*l <= 1)
return GOT_EOF; /* We should never write an empty mini-transaction. */
- const source begin{l};
+ source begin{l};
uint32_t rlen;
for (uint32_t total_len= 0; !l.is_eof(); l+= rlen, total_len+= rlen)
{
@@ -2433,7 +2617,6 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l)
sql_print_error("InnoDB: Unknown log record at LSN " LSN_PF, lsn);
corrupted:
found_corrupt_log= true;
- pthread_cond_broadcast(&cond);
return GOT_EOF;
}
@@ -2510,13 +2693,13 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l)
mach_write_to_4(iv + 12, page_no);
got_page_op= !(b & 0x80);
if (!got_page_op);
- else if (srv_operation == SRV_OPERATION_BACKUP)
+ else if (!store && srv_operation == SRV_OPERATION_BACKUP)
{
if (page_no == 0 && first_page_init && (b & 0x10))
first_page_init(space_id);
continue;
}
- else if (file_checkpoint && !is_predefined_tablespace(space_id))
+ else if (store && file_checkpoint && !is_predefined_tablespace(space_id))
{
recv_spaces_t::iterator i= recv_spaces.lower_bound(space_id);
if (i != recv_spaces.end() && i->first == space_id);
@@ -2585,7 +2768,7 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l)
trim({space_id, 0}, lsn);
truncated_undo_spaces[space_id - srv_undo_space_id_start]=
{ lsn, page_no };
- if (undo_space_trunc)
+ if (!store && undo_space_trunc)
undo_space_trunc(space_id);
#endif
last_offset= 1; /* the next record must not be same_page */
@@ -2626,7 +2809,7 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l)
{
if (UNIV_UNLIKELY(rlen + last_offset > srv_page_size))
goto record_corrupted;
- if (UNIV_UNLIKELY(!page_no) && file_checkpoint)
+ if (store && UNIV_UNLIKELY(!page_no) && file_checkpoint)
{
const bool has_size= last_offset <= FSP_HEADER_OFFSET + FSP_SIZE &&
last_offset + rlen >= FSP_HEADER_OFFSET + FSP_SIZE + 4;
@@ -2705,38 +2888,57 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l)
ut_ad(modified.emplace(id).second || (b & 0x70) != INIT_PAGE);
}
#endif
- const bool is_init= (b & 0x70) <= INIT_PAGE;
- switch (store) {
- case STORE_IF_EXISTS:
- if (fil_space_t *space= fil_space_t::get(space_id))
+ if (store)
+ {
+ if (if_exists)
{
- const auto size= space->get_size();
- space->release();
- if (!size)
+ if (fil_space_t *space= fil_space_t::get(space_id))
+ {
+ const auto size= space->get_size();
+ space->release();
+ if (!size)
+ continue;
+ }
+ else if (!deferred_spaces.find(space_id))
continue;
}
- else if (!deferred_spaces.find(space_id))
- continue;
- /* fall through */
- case STORE_YES:
if (!mlog_init.will_avoid_read(id, start_lsn))
{
- if (cached_pages_it == pages.end() ||
- cached_pages_it->first != id)
- cached_pages_it= pages.emplace(id, page_recv_t{}).first;
- add(cached_pages_it, start_lsn, lsn,
- l.get_buf(cl, recs, decrypt_buf), l - recs + rlen);
+ if (pages_it == pages.end() || pages_it->first != id)
+ pages_it= pages.emplace(id, page_recv_t{}).first;
+ if (UNIV_UNLIKELY(add(pages_it, start_lsn, lsn,
+ l.get_buf(cl, recs, decrypt_buf),
+ l - recs + rlen)))
+ {
+ lsn= start_lsn;
+ log_sys.set_recovered_lsn(start_lsn);
+ l+= rlen;
+ offset= begin.ptr - log_sys.buf;
+ rewind(l, begin);
+ if (if_exists)
+ {
+ apply(false);
+ if (is_corrupt_fs())
+ return GOT_EOF;
+ goto restart;
+ }
+ sql_print_information("InnoDB: Multi-batch recovery needed at LSN "
+ LSN_PF, lsn);
+ return GOT_OOM;
+ }
}
- continue;
- case STORE_NO:
- if (!is_init)
- continue;
+ }
+ else if ((b & 0x70) <= INIT_PAGE)
+ {
mlog_init.add(id, start_lsn);
- map::iterator i= pages.find(id);
- if (i == pages.end())
- continue;
- i->second.log.clear();
- pages.erase(i);
+ if (pages_it == pages.end() || pages_it->first != id)
+ {
+ pages_it= pages.find(id);
+ if (pages_it == pages.end())
+ continue;
+ }
+ map::iterator r= pages_it++;
+ erase(r);
}
}
else if (rlen)
@@ -2749,6 +2951,11 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l)
if (rlen < UNIV_PAGE_SIZE_MAX && !l.is_zero(rlen))
continue;
}
+ else if (store)
+ {
+ ut_ad(file_checkpoint);
+ continue;
+ }
else if (const lsn_t c= l.read8())
{
if (UNIV_UNLIKELY(srv_print_verbose_log == 2))
@@ -2830,21 +3037,27 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l)
if (UNIV_UNLIKELY(!recv_needed_recovery && srv_read_only_mode))
continue;
+ if (!store &&
+ (srv_operation == SRV_OPERATION_BACKUP ||
+ srv_operation == SRV_OPERATION_BACKUP_NO_DEFER))
+ {
+ if ((b & 0xf0) < FILE_CHECKPOINT && log_file_op)
+ log_file_op(space_id, b & 0xf0,
+ reinterpret_cast<const byte*>(fn),
+ static_cast<ulint>(fnend - fn),
+ reinterpret_cast<const byte*>(fn2),
+ fn2 ? static_cast<ulint>(fn2end - fn2) : 0);
+ continue;
+ }
+
fil_name_process(fn, fnend - fn, space_id,
(b & 0xf0) == FILE_DELETE ? FILE_DELETE : FILE_MODIFY,
- start_lsn, store);
-
- if ((b & 0xf0) < FILE_CHECKPOINT && log_file_op)
- log_file_op(space_id, b & 0xf0,
- reinterpret_cast<const byte*>(fn),
- static_cast<ulint>(fnend - fn),
- reinterpret_cast<const byte*>(fn2),
- fn2 ? static_cast<ulint>(fn2end - fn2) : 0);
+ start_lsn, if_exists);
if (fn2)
{
fil_name_process(fn2, fn2end - fn2, space_id,
- FILE_RENAME, start_lsn, store);
+ FILE_RENAME, start_lsn, if_exists);
if (file_checkpoint)
{
const size_t len= fn2end - fn2;
@@ -2868,18 +3081,23 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l)
return OK;
}
-ATTRIBUTE_NOINLINE
-recv_sys_t::parse_mtr_result recv_sys_t::parse_mtr(store_t store) noexcept
+template<bool store>
+recv_sys_t::parse_mtr_result recv_sys_t::parse_mtr(bool if_exists) noexcept
{
recv_buf s{&log_sys.buf[recv_sys.offset]};
- return recv_sys.parse(store, s);
+ return recv_sys.parse<recv_buf,store>(s, if_exists);
}
+/** for mariadb-backup; @see xtrabackup_copy_logfile() */
+template
+recv_sys_t::parse_mtr_result recv_sys_t::parse_mtr<false>(bool) noexcept;
+
#ifdef HAVE_PMEM
-recv_sys_t::parse_mtr_result recv_sys_t::parse_pmem(store_t store) noexcept
+template<bool store>
+recv_sys_t::parse_mtr_result recv_sys_t::parse_pmem(bool if_exists) noexcept
{
- recv_sys_t::parse_mtr_result r{parse_mtr(store)};
- if (r != PREMATURE_EOF || !log_sys.is_pmem())
+ recv_sys_t::parse_mtr_result r{parse_mtr<store>(if_exists)};
+ if (UNIV_LIKELY(r != PREMATURE_EOF) || !log_sys.is_pmem())
return r;
ut_ad(recv_sys.len == log_sys.file_size);
ut_ad(recv_sys.offset >= log_sys.START_OFFSET);
@@ -2888,7 +3106,7 @@ recv_sys_t::parse_mtr_result recv_sys_t::parse_pmem(store_t store) noexcept
{recv_sys.offset == recv_sys.len
? &log_sys.buf[log_sys.START_OFFSET]
: &log_sys.buf[recv_sys.offset]};
- return recv_sys.parse(store, s);
+ return recv_sys.parse<recv_ring,store>(s, if_exists);
}
#endif
@@ -2896,23 +3114,22 @@ recv_sys_t::parse_mtr_result recv_sys_t::parse_pmem(store_t store) noexcept
lsn of a log record.
@param[in,out] block buffer pool page
@param[in,out] mtr mini-transaction
-@param[in,out] p recovery address
+@param[in,out] recs log records to apply
@param[in,out] space tablespace, or NULL if not looked up yet
@param[in,out] init page initialization operation, or NULL
@return the recovered page
@retval nullptr on failure */
static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr,
- const recv_sys_t::map::iterator &p,
- fil_space_t *space= nullptr,
- mlog_init_t::init *init= nullptr)
+ page_recv_t &recs,
+ fil_space_t *space,
+ recv_init *init)
{
- mysql_mutex_assert_owner(&recv_sys.mutex);
+ mysql_mutex_assert_not_owner(&recv_sys.mutex);
ut_ad(recv_sys.apply_log_recs);
ut_ad(recv_needed_recovery);
ut_ad(!init || init->created);
ut_ad(!init || init->lsn);
- ut_ad(block->page.id() == p->first);
- ut_ad(!p->second.is_being_processed());
+ ut_ad(recs.being_processed == 1);
ut_ad(!space || space->id == block->page.id().space());
ut_ad(log_sys.is_latest());
@@ -2924,10 +3141,6 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr,
block->page.id().space(),
block->page.id().page_no()));
- p->second.state = page_recv_t::RECV_BEING_PROCESSED;
-
- mysql_mutex_unlock(&recv_sys.mutex);
-
byte *frame = UNIV_LIKELY_NULL(block->page.zip.data)
? block->page.zip.data
: block->page.frame;
@@ -2941,7 +3154,7 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr,
bool skipped_after_init = false;
- for (const log_rec_t* recv : p->second.log) {
+ for (const log_rec_t* recv : recs.log) {
const log_phys_t* l = static_cast<const log_phys_t*>(recv);
ut_ad(l->lsn);
ut_ad(end_lsn <= l->lsn);
@@ -2999,8 +3212,7 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr,
block->page.id().space(),
block->page.id().page_no()));
- log_phys_t::apply_status a= l->apply(*block,
- p->second.last_offset);
+ log_phys_t::apply_status a= l->apply(*block, recs.last_offset);
switch (a) {
case log_phys_t::APPLIED_NO:
@@ -3123,26 +3335,11 @@ set_start_lsn:
mtr.commit();
done:
- time_t now = time(NULL);
-
- mysql_mutex_lock(&recv_sys.mutex);
-
+ /* FIXME: do this in page read, protected with recv_sys.mutex! */
if (recv_max_page_lsn < page_lsn) {
recv_max_page_lsn = page_lsn;
}
- ut_ad(!block || p->second.is_being_processed());
- ut_ad(!block || !recv_sys.pages.empty());
-
- if (recv_sys.report(now)) {
- const size_t n = recv_sys.pages.size();
- sql_print_information("InnoDB: To recover: %zu pages from log",
- n);
- service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
- "To recover: %zu pages"
- " from log", n);
- }
-
return block;
}
@@ -3156,146 +3353,347 @@ ATTRIBUTE_COLD void recv_sys_t::free_corrupted_page(page_id_t page_id)
mysql_mutex_lock(&mutex);
map::iterator p= pages.find(page_id);
- if (p != pages.end())
+ if (p == pages.end())
{
- p->second.log.clear();
- pages.erase(p);
- if (!srv_force_recovery)
- {
- set_corrupt_fs();
- ib::error() << "Unable to apply log to corrupted page " << page_id
- << "; set innodb_force_recovery to ignore";
- }
- else
- ib::warn() << "Discarding log for corrupted page " << page_id;
+ mysql_mutex_unlock(&mutex);
+ return;
}
- if (pages.empty())
- pthread_cond_broadcast(&cond);
+ p->second.being_processed= -1;
+ if (!srv_force_recovery)
+ set_corrupt_fs();
mysql_mutex_unlock(&mutex);
-}
-/** Possibly finish a recovery batch. */
-inline void recv_sys_t::maybe_finish_batch()
-{
- mysql_mutex_assert_owner(&mutex);
- ut_ad(recovery_on);
- if (!apply_batch_on || pages.empty() || is_corrupt_log() || is_corrupt_fs())
- pthread_cond_broadcast(&cond);
+ ib::error_or_warn(!srv_force_recovery)
+ << "Unable to apply log to corrupted page " << page_id;
}
ATTRIBUTE_COLD void recv_sys_t::set_corrupt_log()
{
mysql_mutex_lock(&mutex);
found_corrupt_log= true;
- pthread_cond_broadcast(&cond);
mysql_mutex_unlock(&mutex);
}
ATTRIBUTE_COLD void recv_sys_t::set_corrupt_fs()
{
mysql_mutex_assert_owner(&mutex);
+ if (!srv_force_recovery)
+ sql_print_information("InnoDB: Set innodb_force_recovery=1"
+ " to ignore corrupted pages.");
found_corrupt_fs= true;
- pthread_cond_broadcast(&cond);
}
-/** Apply any buffered redo log to a page that was just read from a data file.
-@param[in,out] space tablespace
-@param[in,out] bpage buffer pool page
+/** Apply any buffered redo log to a page.
+@param space tablespace
+@param bpage buffer pool page
@return whether the page was recovered correctly */
bool recv_recover_page(fil_space_t* space, buf_page_t* bpage)
{
- mtr_t mtr;
- mtr.start();
- mtr.set_log_mode(MTR_LOG_NO_REDO);
-
- ut_ad(bpage->frame);
- /* Move the ownership of the x-latch on the page to
- this OS thread, so that we can acquire a second
- x-latch on it. This is needed for the operations to
- the page to pass the debug checks. */
- bpage->lock.claim_ownership();
- bpage->lock.x_lock_recursive();
- bpage->fix_on_recovery();
- mtr.memo_push(reinterpret_cast<buf_block_t*>(bpage),
- MTR_MEMO_PAGE_X_FIX);
-
- buf_block_t* success = reinterpret_cast<buf_block_t*>(bpage);
+ mtr_t mtr;
+ mtr.start();
+ mtr.set_log_mode(MTR_LOG_NO_REDO);
- mysql_mutex_lock(&recv_sys.mutex);
- if (recv_sys.apply_log_recs) {
- recv_sys_t::map::iterator p = recv_sys.pages.find(bpage->id());
- if (p != recv_sys.pages.end()
- && !p->second.is_being_processed()) {
- success = recv_recover_page(success, mtr, p, space);
- if (UNIV_LIKELY(!!success)) {
- p->second.log.clear();
- recv_sys.pages.erase(p);
- }
- recv_sys.maybe_finish_batch();
- goto func_exit;
- }
- }
+ ut_ad(bpage->frame);
+ /* Move the ownership of the x-latch on the page to this OS thread,
+ so that we can acquire a second x-latch on it. This is needed for
+ the operations to the page to pass the debug checks. */
+ bpage->lock.claim_ownership();
+ bpage->lock.x_lock_recursive();
+ bpage->fix_on_recovery();
+ mtr.memo_push(reinterpret_cast<buf_block_t*>(bpage), MTR_MEMO_PAGE_X_FIX);
- mtr.commit();
+ buf_block_t *success= reinterpret_cast<buf_block_t*>(bpage);
+
+ mysql_mutex_lock(&recv_sys.mutex);
+ if (recv_sys.apply_log_recs)
+ {
+ const page_id_t id{bpage->id()};
+ recv_sys_t::map::iterator p= recv_sys.pages.find(id);
+ if (p == recv_sys.pages.end());
+ else if (p->second.being_processed < 0)
+ {
+ recv_sys.pages_it_invalidate(p);
+ recv_sys.erase(p);
+ }
+ else
+ {
+ p->second.being_processed= 1;
+ recv_sys_t::init *init= nullptr;
+ if (p->second.skip_read)
+ (init= &mlog_init.last(id))->created= true;
+ mysql_mutex_unlock(&recv_sys.mutex);
+ success= recv_recover_page(success, mtr, p->second, space, init);
+ p->second.being_processed= -1;
+ goto func_exit;
+ }
+ }
+
+ mysql_mutex_unlock(&recv_sys.mutex);
+ mtr.commit();
func_exit:
- mysql_mutex_unlock(&recv_sys.mutex);
- ut_ad(mtr.has_committed());
- return success;
+ ut_ad(mtr.has_committed());
+ return success;
}
-/** Read pages for which log needs to be applied.
-@param page_id first page identifier to read
-@param i iterator to recv_sys.pages */
-TRANSACTIONAL_TARGET
-static void recv_read_in_area(page_id_t page_id, recv_sys_t::map::iterator i)
+void IORequest::fake_read_complete(os_offset_t offset) const
{
- uint32_t page_nos[32];
- ut_ad(page_id == i->first);
- page_id.set_page_no(ut_2pow_round(page_id.page_no(), 32U));
- const page_id_t up_limit{page_id + 31};
- uint32_t* p= page_nos;
+ ut_ad(node);
+ ut_ad(is_read());
+ ut_ad(bpage);
+ ut_ad(bpage->frame);
+ ut_ad(recv_recovery_is_on());
+ ut_ad(offset);
+
+ mtr_t mtr;
+ mtr.start();
+ mtr.set_log_mode(MTR_LOG_NO_REDO);
- for (; i != recv_sys.pages.end() && i->first <= up_limit; i++)
+ ut_ad(bpage->frame);
+ /* Move the ownership of the x-latch on the page to this OS thread,
+ so that we can acquire a second x-latch on it. This is needed for
+ the operations to the page to pass the debug checks. */
+ bpage->lock.claim_ownership();
+ bpage->lock.x_lock_recursive();
+ bpage->fix_on_recovery();
+ mtr.memo_push(reinterpret_cast<buf_block_t*>(bpage), MTR_MEMO_PAGE_X_FIX);
+
+ page_recv_t &recs= *reinterpret_cast<page_recv_t*>(slot);
+ ut_ad(recs.being_processed == 1);
+ recv_init &init= *reinterpret_cast<recv_init*>(offset);
+ ut_ad(init.lsn > 1);
+ init.created= true;
+
+ if (recv_recover_page(reinterpret_cast<buf_block_t*>(bpage),
+ mtr, recs, node->space, &init))
{
- if (i->second.state == page_recv_t::RECV_NOT_PROCESSED)
+ ut_ad(bpage->oldest_modification() || bpage->is_freed());
+ bpage->lock.x_unlock(true);
+ }
+ recs.being_processed= -1;
+ ut_ad(mtr.has_committed());
+
+ node->space->release();
+}
+
+/** @return whether a page has been freed */
+inline bool fil_space_t::is_freed(uint32_t page)
+{
+ std::lock_guard<std::mutex> freed_lock(freed_range_mutex);
+ return freed_ranges.contains(page);
+}
+
+bool recv_sys_t::report(time_t time)
+{
+ if (time - progress_time < 15)
+ return false;
+ progress_time= time;
+ return true;
+}
+
+ATTRIBUTE_COLD
+void recv_sys_t::report_progress() const
+{
+ mysql_mutex_assert_owner(&mutex);
+ const size_t n{pages.size()};
+ if (recv_sys.scanned_lsn == recv_sys.lsn)
+ {
+ sql_print_information("InnoDB: To recover: %zu pages", n);
+ service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
+ "To recover: %zu pages", n);
+ }
+ else
+ {
+ sql_print_information("InnoDB: To recover: LSN " LSN_PF
+ "/" LSN_PF "; %zu pages",
+ recv_sys.lsn, recv_sys.scanned_lsn, n);
+ service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
+ "To recover: LSN " LSN_PF
+ "/" LSN_PF "; %zu pages",
+ recv_sys.lsn, recv_sys.scanned_lsn, n);
+ }
+}
+
+/** Apply a recovery batch.
+@param space_id current tablespace identifier
+@param space current tablespace
+@param free_block spare buffer block
+@param last_batch whether it is possible to write more redo log
+@return whether the caller must provide a new free_block */
+bool recv_sys_t::apply_batch(uint32_t space_id, fil_space_t *&space,
+ buf_block_t *&free_block, bool last_batch)
+{
+ mysql_mutex_assert_owner(&mutex);
+ ut_ad(pages_it != pages.end());
+ ut_ad(!pages_it->second.log.empty());
+
+ mysql_mutex_lock(&buf_pool.mutex);
+ size_t n= 0, max_n= std::min<size_t>(BUF_LRU_MIN_LEN,
+ UT_LIST_GET_LEN(buf_pool.LRU) +
+ UT_LIST_GET_LEN(buf_pool.free));
+ mysql_mutex_unlock(&buf_pool.mutex);
+
+ map::iterator begin= pages.end();
+ page_id_t begin_id{~0ULL};
+
+ while (pages_it != pages.end() && n < max_n)
+ {
+ ut_ad(!buf_dblwr.is_inside(pages_it->first));
+ if (!pages_it->second.being_processed)
{
- i->second.state= page_recv_t::RECV_BEING_READ;
- *p++= i->first.page_no();
+ if (space_id != pages_it->first.space())
+ {
+ space_id= pages_it->first.space();
+ if (space)
+ space->release();
+ space= fil_space_t::get(space_id);
+ if (!space)
+ {
+ auto d= deferred_spaces.defers.find(space_id);
+ if (d == deferred_spaces.defers.end() || d->second.deleted)
+ /* For deleted files we preserve the deferred_spaces entry */;
+ else if (!free_block)
+ return true;
+ else
+ {
+ space= recover_deferred(pages_it, d->second.file_name, free_block);
+ deferred_spaces.defers.erase(d);
+ if (!space && !srv_force_recovery)
+ {
+ set_corrupt_fs();
+ return false;
+ }
+ }
+ }
+ }
+ if (!space || space->is_freed(pages_it->first.page_no()))
+ pages_it->second.being_processed= -1;
+ else if (!n++)
+ {
+ begin= pages_it;
+ begin_id= pages_it->first;
+ }
}
+ pages_it++;
}
- if (p != page_nos)
+ if (!last_batch)
+ log_sys.latch.wr_unlock();
+
+ pages_it= begin;
+
+ if (report(time(nullptr)))
+ report_progress();
+
+ if (!n)
+ goto wait;
+
+ mysql_mutex_lock(&buf_pool.mutex);
+
+ if (UNIV_UNLIKELY(UT_LIST_GET_LEN(buf_pool.free) < n))
{
- mysql_mutex_unlock(&recv_sys.mutex);
- buf_read_recv_pages(page_id.space(), {page_nos, p});
- mysql_mutex_lock(&recv_sys.mutex);
+ mysql_mutex_unlock(&buf_pool.mutex);
+ wait:
+ wait_for_pool(n);
+ if (n);
+ else if (!last_batch)
+ goto unlock_relock;
+ else
+ goto get_last;
+ pages_it= pages.lower_bound(begin_id);
+ ut_ad(pages_it != pages.end());
}
+ else
+ mysql_mutex_unlock(&buf_pool.mutex);
+
+ while (pages_it != pages.end())
+ {
+ ut_ad(!buf_dblwr.is_inside(pages_it->first));
+ if (!pages_it->second.being_processed)
+ {
+ const page_id_t id{pages_it->first};
+
+ if (space_id != id.space())
+ {
+ space_id= id.space();
+ if (space)
+ space->release();
+ space= fil_space_t::get(space_id);
+ }
+ if (!space)
+ {
+ const auto it= deferred_spaces.defers.find(space_id);
+ if (it != deferred_spaces.defers.end() && !it->second.deleted)
+ /* The records must be processed after recover_deferred(). */
+ goto next;
+ goto space_not_found;
+ }
+ else if (space->is_freed(id.page_no()))
+ {
+ space_not_found:
+ pages_it->second.being_processed= -1;
+ goto next;
+ }
+ else
+ {
+ page_recv_t &recs= pages_it->second;
+ ut_ad(!recs.log.empty());
+ recs.being_processed= 1;
+ init *init= recs.skip_read ? &mlog_init.last(id) : nullptr;
+ mysql_mutex_unlock(&mutex);
+ buf_read_recover(space, id, recs, init);
+ }
+
+ if (!--n)
+ {
+ if (last_batch)
+ goto relock_last;
+ goto relock;
+ }
+ mysql_mutex_lock(&mutex);
+ pages_it= pages.lower_bound(id);
+ }
+ else
+ next:
+ pages_it++;
+ }
+
+ if (!last_batch)
+ {
+ unlock_relock:
+ mysql_mutex_unlock(&mutex);
+ relock:
+ log_sys.latch.wr_lock(SRW_LOCK_CALL);
+ relock_last:
+ mysql_mutex_lock(&mutex);
+ get_last:
+ pages_it= pages.lower_bound(begin_id);
+ }
+
+ return false;
}
/** Attempt to initialize a page based on redo log records.
-@param page_id page identifier
-@param p iterator pointing to page_id
+@param p iterator
@param mtr mini-transaction
@param b pre-allocated buffer pool block
+@param init page initialization
@return the recovered block
@retval nullptr if the page cannot be initialized based on log records
@retval -1 if the page cannot be recovered due to corruption */
-inline buf_block_t *recv_sys_t::recover_low(const page_id_t page_id,
- map::iterator &p, mtr_t &mtr,
- buf_block_t *b)
+inline buf_block_t *recv_sys_t::recover_low(const map::iterator &p, mtr_t &mtr,
+ buf_block_t *b, init &init)
{
- mysql_mutex_assert_owner(&mutex);
- ut_ad(p->first == page_id);
+ mysql_mutex_assert_not_owner(&mutex);
page_recv_t &recs= p->second;
- ut_ad(recs.state == page_recv_t::RECV_WILL_NOT_READ);
+ ut_ad(recs.skip_read);
+ ut_ad(recs.being_processed == 1);
buf_block_t* block= nullptr;
- mlog_init_t::init &i= mlog_init.last(page_id);
const lsn_t end_lsn= recs.log.last()->lsn;
- if (end_lsn < i.lsn)
- DBUG_LOG("ib_log", "skip log for page " << page_id
- << " LSN " << end_lsn << " < " << i.lsn);
- fil_space_t *space= fil_space_t::get(page_id.space());
+ if (end_lsn < init.lsn)
+ DBUG_LOG("ib_log", "skip log for page " << p->first
+ << " LSN " << end_lsn << " < " << init.lsn);
+ fil_space_t *space= fil_space_t::get(p->first.space());
mtr.start();
mtr.set_log_mode(MTR_LOG_NO_REDO);
@@ -3304,82 +3702,77 @@ inline buf_block_t *recv_sys_t::recover_low(const page_id_t page_id,
if (!space)
{
- if (page_id.page_no() != 0)
+ if (p->first.page_no() != 0)
{
nothing_recoverable:
mtr.commit();
return nullptr;
}
- auto it= recv_spaces.find(page_id.space());
+ auto it= recv_spaces.find(p->first.space());
ut_ad(it != recv_spaces.end());
uint32_t flags= it->second.flags;
zip_size= fil_space_t::zip_size(flags);
- block= buf_page_create_deferred(page_id.space(), zip_size, &mtr, b);
+ block= buf_page_create_deferred(p->first.space(), zip_size, &mtr, b);
ut_ad(block == b);
block->page.lock.x_lock_recursive();
}
else
{
- block= buf_page_create(space, page_id.page_no(), zip_size, &mtr, b);
+ block= buf_page_create(space, p->first.page_no(), zip_size, &mtr, b);
if (UNIV_UNLIKELY(block != b))
{
/* The page happened to exist in the buffer pool, or it
was just being read in. Before the exclusive page latch was acquired by
buf_page_create(), all changes to the page must have been applied. */
- ut_ad(pages.find(page_id) == pages.end());
+ ut_d(mysql_mutex_lock(&mutex));
+ ut_ad(pages.find(p->first) == pages.end());
+ ut_d(mysql_mutex_unlock(&mutex));
space->release();
goto nothing_recoverable;
}
}
- ut_ad(&recs == &pages.find(page_id)->second);
- i.created= true;
- map::iterator r= p++;
- block= recv_recover_page(block, mtr, r, space, &i);
+ ut_d(mysql_mutex_lock(&mutex));
+ ut_ad(&recs == &pages.find(p->first)->second);
+ ut_d(mysql_mutex_unlock(&mutex));
+ init.created= true;
+ block= recv_recover_page(block, mtr, recs, space, &init);
ut_ad(mtr.has_committed());
- if (block)
- {
- recs.log.clear();
- pages.erase(r);
- }
- else
- block= reinterpret_cast<buf_block_t*>(-1);
-
- if (pages.empty())
- pthread_cond_signal(&cond);
-
if (space)
space->release();
- return block;
+ return block ? block : reinterpret_cast<buf_block_t*>(-1);
}
/** Attempt to initialize a page based on redo log records.
@param page_id page identifier
@return recovered block
@retval nullptr if the page cannot be initialized based on log records */
-buf_block_t *recv_sys_t::recover_low(const page_id_t page_id)
+ATTRIBUTE_COLD buf_block_t *recv_sys_t::recover_low(const page_id_t page_id)
{
- buf_block_t *free_block= buf_LRU_get_free_block(false);
- buf_block_t *block= nullptr;
-
mysql_mutex_lock(&mutex);
map::iterator p= pages.find(page_id);
- if (p != pages.end() && p->second.state == page_recv_t::RECV_WILL_NOT_READ)
+ if (p != pages.end() && !p->second.being_processed && p->second.skip_read)
{
+ p->second.being_processed= 1;
+ init &init= mlog_init.last(page_id);
+ mysql_mutex_unlock(&mutex);
+ buf_block_t *free_block= buf_LRU_get_free_block(false);
mtr_t mtr;
- block= recover_low(page_id, p, mtr, free_block);
+ buf_block_t *block= recover_low(p, mtr, free_block, init);
+ p->second.being_processed= -1;
ut_ad(!block || block == reinterpret_cast<buf_block_t*>(-1) ||
block == free_block);
+ if (UNIV_UNLIKELY(!block))
+ buf_pool.free_block(free_block);
+ return block;
}
mysql_mutex_unlock(&mutex);
- if (UNIV_UNLIKELY(!block))
- buf_pool.free_block(free_block);
- return block;
+ return nullptr;
}
inline fil_space_t *fil_system_t::find(const char *path) const
@@ -3427,45 +3820,18 @@ void recv_sys_t::apply(bool last_batch)
mysql_mutex_assert_owner(&mutex);
- timespec abstime;
-
- while (apply_batch_on)
- {
- if (is_corrupt_log())
- return;
- if (last_batch)
- my_cond_wait(&cond, &mutex.m_mutex);
- else
- {
-#ifndef SUX_LOCK_GENERIC
- ut_ad(log_sys.latch.is_write_locked());
-#endif
- log_sys.latch.wr_unlock();
- set_timespec_nsec(abstime, 500000000ULL); /* 0.5s */
- my_cond_timedwait(&cond, &mutex.m_mutex, &abstime);
- mysql_mutex_unlock(&mutex);
- log_sys.latch.wr_lock(SRW_LOCK_CALL);
- mysql_mutex_lock(&mutex);
- }
- }
-
- recv_no_ibuf_operations = !last_batch ||
- srv_operation == SRV_OPERATION_RESTORE ||
- srv_operation == SRV_OPERATION_RESTORE_EXPORT;
-
- mtr_t mtr;
+ garbage_collect();
if (!pages.empty())
{
- const char *msg= last_batch
- ? "Starting final batch to recover"
- : "Starting a batch to recover";
- const size_t n= pages.size();
- sql_print_information("InnoDB: %s %zu pages from redo log.", msg, n);
- sd_notifyf(0, "STATUS=%s %zu pages from redo log", msg, n);
+ recv_no_ibuf_operations = !last_batch ||
+ srv_operation == SRV_OPERATION_RESTORE ||
+ srv_operation == SRV_OPERATION_RESTORE_EXPORT;
+ ut_ad(!last_batch || lsn == scanned_lsn);
+ progress_time= time(nullptr);
+ report_progress();
apply_log_recs= true;
- apply_batch_on= true;
for (auto id= srv_undo_tablespaces_open; id--;)
{
@@ -3491,132 +3857,71 @@ void recv_sys_t::apply(bool last_batch)
fil_system.extend_to_recv_size();
- /* We must release log_sys.latch and recv_sys.mutex before
- invoking buf_LRU_get_free_block(). Allocating a block may initiate
- a redo log write and therefore acquire log_sys.latch. To avoid
- deadlocks, log_sys.latch must not be acquired while holding
- recv_sys.mutex. */
- mysql_mutex_unlock(&mutex);
- if (!last_batch)
- log_sys.latch.wr_unlock();
-
- buf_block_t *free_block= buf_LRU_get_free_block(false);
+ fil_space_t *space= nullptr;
+ uint32_t space_id= ~0;
+ buf_block_t *free_block= nullptr;
- if (!last_batch)
- log_sys.latch.wr_lock(SRW_LOCK_CALL);
- mysql_mutex_lock(&mutex);
-
- for (map::iterator p= pages.begin(); p != pages.end(); )
+ for (pages_it= pages.begin(); pages_it != pages.end();
+ pages_it= pages.begin())
{
- const page_id_t page_id= p->first;
- ut_ad(!p->second.log.empty());
+ if (!free_block)
+ {
+ if (!last_batch)
+ log_sys.latch.wr_unlock();
+ wait_for_pool(1);
+ pages_it= pages.begin();
+ mysql_mutex_unlock(&mutex);
+ /* We must release log_sys.latch and recv_sys.mutex before
+ invoking buf_LRU_get_free_block(). Allocating a block may initiate
+ a redo log write and therefore acquire log_sys.latch. To avoid
+ deadlocks, log_sys.latch must not be acquired while holding
+ recv_sys.mutex. */
+ free_block= buf_LRU_get_free_block(false);
+ if (!last_batch)
+ log_sys.latch.wr_lock(SRW_LOCK_CALL);
+ mysql_mutex_lock(&mutex);
+ pages_it= pages.begin();
+ }
- const uint32_t space_id= page_id.space();
- auto d= deferred_spaces.defers.find(space_id);
- if (d != deferred_spaces.defers.end())
+ while (pages_it != pages.end())
{
- if (d->second.deleted)
+ if (is_corrupt_fs() || is_corrupt_log())
{
- /* For deleted files we must preserve the entry in deferred_spaces */
-erase_for_space:
- while (p != pages.end() && p->first.space() == space_id)
+ if (space)
+ space->release();
+ if (free_block)
{
- map::iterator r= p++;
- r->second.log.clear();
- pages.erase(r);
+ mysql_mutex_unlock(&mutex);
+ mysql_mutex_lock(&buf_pool.mutex);
+ buf_LRU_block_free_non_file_page(free_block);
+ mysql_mutex_unlock(&buf_pool.mutex);
+ mysql_mutex_lock(&mutex);
}
+ return;
}
- else if (recover_deferred(p, d->second.file_name, free_block))
- {
- if (!srv_force_recovery)
- set_corrupt_fs();
- deferred_spaces.defers.erase(d);
- goto erase_for_space;
- }
- else
- deferred_spaces.defers.erase(d);
- if (!free_block)
- goto next_free_block;
- p= pages.lower_bound(page_id);
- continue;
- }
-
- switch (p->second.state) {
- case page_recv_t::RECV_BEING_READ:
- case page_recv_t::RECV_BEING_PROCESSED:
- p++;
- continue;
- case page_recv_t::RECV_WILL_NOT_READ:
- if (UNIV_LIKELY(!!recover_low(page_id, p, mtr, free_block)))
- {
-next_free_block:
- mysql_mutex_unlock(&mutex);
- if (!last_batch)
- log_sys.latch.wr_unlock();
- free_block= buf_LRU_get_free_block(false);
- if (!last_batch)
- log_sys.latch.wr_lock(SRW_LOCK_CALL);
- mysql_mutex_lock(&mutex);
+ if (apply_batch(space_id, space, free_block, last_batch))
break;
- }
- ut_ad(p == pages.end() || p->first > page_id);
- continue;
- case page_recv_t::RECV_NOT_PROCESSED:
- recv_read_in_area(page_id, p);
}
- p= pages.lower_bound(page_id);
- /* Ensure that progress will be made. */
- ut_ad(p == pages.end() || p->first > page_id ||
- p->second.state >= page_recv_t::RECV_BEING_READ);
}
- buf_pool.free_block(free_block);
+ if (space)
+ space->release();
- /* Wait until all the pages have been processed */
- for (;;)
+ if (free_block)
{
- const bool empty= pages.empty();
- if (empty && !os_aio_pending_reads())
- break;
-
- if (!is_corrupt_fs() && !is_corrupt_log())
- {
- if (last_batch)
- {
- if (!empty)
- my_cond_wait(&cond, &mutex.m_mutex);
- else
- {
- mysql_mutex_unlock(&mutex);
- os_aio_wait_until_no_pending_reads(false);
- mysql_mutex_lock(&mutex);
- ut_ad(pages.empty());
- }
- }
- else
- {
-#ifndef SUX_LOCK_GENERIC
- ut_ad(log_sys.latch.is_write_locked());
-#endif
- log_sys.latch.wr_unlock();
- set_timespec_nsec(abstime, 500000000ULL); /* 0.5s */
- my_cond_timedwait(&cond, &mutex.m_mutex, &abstime);
- mysql_mutex_unlock(&mutex);
- log_sys.latch.wr_lock(SRW_LOCK_CALL);
- mysql_mutex_lock(&mutex);
- }
- continue;
- }
- if (is_corrupt_fs() && !srv_force_recovery)
- sql_print_information("InnoDB: Set innodb_force_recovery=1"
- " to ignore corrupted pages.");
- return;
+ mysql_mutex_lock(&buf_pool.mutex);
+ buf_LRU_block_free_non_file_page(free_block);
+ mysql_mutex_unlock(&buf_pool.mutex);
}
}
if (last_batch)
- /* We skipped this in buf_page_create(). */
- mlog_init.mark_ibuf_exist(mtr);
+ {
+ if (!recv_no_ibuf_operations)
+ /* We skipped this in buf_page_create(). */
+ mlog_init.mark_ibuf_exist();
+ mlog_init.clear();
+ }
else
{
mlog_init.reset();
@@ -3625,21 +3930,22 @@ next_free_block:
mysql_mutex_unlock(&mutex);
- if (last_batch && srv_operation != SRV_OPERATION_RESTORE &&
- srv_operation != SRV_OPERATION_RESTORE_EXPORT)
- /* Instead of flushing, last_batch sorts the buf_pool.flush_list
- in ascending order of buf_page_t::oldest_modification. */
- log_sort_flush_list();
- else
- buf_flush_sync_batch(lsn);
-
if (!last_batch)
{
+ buf_flush_sync_batch(lsn);
buf_pool_invalidate();
log_sys.latch.wr_lock(SRW_LOCK_CALL);
}
+ else if (srv_operation == SRV_OPERATION_RESTORE ||
+ srv_operation == SRV_OPERATION_RESTORE_EXPORT)
+ buf_flush_sync_batch(lsn);
+ else
+ /* Instead of flushing, last_batch sorts the buf_pool.flush_list
+ in ascending order of buf_page_t::oldest_modification. */
+ log_sort_flush_list();
+
#ifdef HAVE_PMEM
- else if (log_sys.is_pmem())
+ if (last_batch && log_sys.is_pmem())
mprotect(log_sys.buf, len, PROT_READ | PROT_WRITE);
#endif
@@ -3649,35 +3955,24 @@ next_free_block:
clear();
}
-/** Check whether the number of read redo log blocks exceeds the maximum.
-@return whether the memory is exhausted */
-inline bool recv_sys_t::is_memory_exhausted()
-{
- if (UT_LIST_GET_LEN(blocks) * 3 < buf_pool.get_n_pages())
- return false;
- DBUG_PRINT("ib_log",("Ran out of memory and last stored lsn " LSN_PF
- " last stored offset %zu\n", lsn, offset));
- return true;
-}
-
/** Scan log_t::FORMAT_10_8 log store records to the parsing buffer.
@param last_phase whether changes can be applied to the tablespaces
@return whether rescan is needed (not everything was stored) */
static bool recv_scan_log(bool last_phase)
{
DBUG_ENTER("recv_scan_log");
- DBUG_ASSERT(!last_phase || recv_sys.file_checkpoint);
ut_ad(log_sys.is_latest());
const size_t block_size_1{log_sys.get_block_size() - 1};
mysql_mutex_lock(&recv_sys.mutex);
- recv_sys.clear();
ut_d(recv_sys.after_apply= last_phase);
- ut_ad(!last_phase || recv_sys.file_checkpoint);
+ if (!last_phase)
+ recv_sys.clear();
+ else
+ ut_ad(recv_sys.file_checkpoint);
- store_t store= last_phase
- ? STORE_IF_EXISTS : recv_sys.file_checkpoint ? STORE_YES : STORE_NO;
+ bool store{recv_sys.file_checkpoint != 0};
size_t buf_size= log_sys.buf_size;
#ifdef HAVE_PMEM
if (log_sys.is_pmem())
@@ -3694,6 +3989,7 @@ static bool recv_scan_log(bool last_phase)
recv_sys.len= 0;
}
+ lsn_t rewound_lsn= 0;
for (ut_d(lsn_t source_offset= 0);;)
{
#ifndef SUX_LOCK_GENERIC
@@ -3741,27 +4037,29 @@ static bool recv_scan_log(bool last_phase)
if (UNIV_UNLIKELY(!recv_needed_recovery))
{
- ut_ad(store == (recv_sys.file_checkpoint ? STORE_YES : STORE_NO));
+ ut_ad(!last_phase);
ut_ad(recv_sys.lsn >= log_sys.next_checkpoint_lsn);
- for (;;)
+ if (!store)
{
- const byte& b{log_sys.buf[recv_sys.offset]};
- r= recv_sys.parse_pmem(store);
- if (r == recv_sys_t::OK)
+ ut_ad(!recv_sys.file_checkpoint);
+ for (;;)
{
- if (store == STORE_NO &&
- (b == FILE_CHECKPOINT + 2 + 8 || (b & 0xf0) == FILE_MODIFY))
- continue;
- }
- else if (r == recv_sys_t::PREMATURE_EOF)
- goto read_more;
- else if (store != STORE_NO)
- break;
+ const byte& b{log_sys.buf[recv_sys.offset]};
+ r= recv_sys.parse_pmem<false>(false);
+ switch (r) {
+ case recv_sys_t::PREMATURE_EOF:
+ goto read_more;
+ default:
+ ut_ad(r == recv_sys_t::GOT_EOF);
+ break;
+ case recv_sys_t::OK:
+ if (b == FILE_CHECKPOINT + 2 + 8 || (b & 0xf0) == FILE_MODIFY)
+ continue;
+ }
- if (store == STORE_NO)
- {
const lsn_t end{recv_sys.file_checkpoint};
+ ut_ad(!end || end == recv_sys.lsn);
mysql_mutex_unlock(&recv_sys.mutex);
if (!end)
@@ -3771,45 +4069,73 @@ static bool recv_scan_log(bool last_phase)
") at " LSN_PF, log_sys.next_checkpoint_lsn,
recv_sys.lsn);
}
- else
- ut_ad(end == recv_sys.lsn);
DBUG_RETURN(true);
}
-
- recv_needed_recovery= true;
- if (srv_read_only_mode)
- {
- mysql_mutex_unlock(&recv_sys.mutex);
- DBUG_RETURN(false);
+ }
+ else
+ {
+ ut_ad(recv_sys.file_checkpoint != 0);
+ switch ((r= recv_sys.parse_pmem<true>(false))) {
+ case recv_sys_t::PREMATURE_EOF:
+ goto read_more;
+ case recv_sys_t::GOT_EOF:
+ break;
+ default:
+ ut_ad(r == recv_sys_t::OK);
+ recv_needed_recovery= true;
+ if (srv_read_only_mode)
+ {
+ mysql_mutex_unlock(&recv_sys.mutex);
+ DBUG_RETURN(false);
+ }
+ sql_print_information("InnoDB: Starting crash recovery from"
+ " checkpoint LSN=" LSN_PF,
+ log_sys.next_checkpoint_lsn);
}
- sql_print_information("InnoDB: Starting crash recovery from"
- " checkpoint LSN=" LSN_PF,
- log_sys.next_checkpoint_lsn);
- break;
}
}
- while ((r= recv_sys.parse_pmem(store)) == recv_sys_t::OK)
+ if (!store)
+ skip_the_rest:
+ while ((r= recv_sys.parse_pmem<false>(false)) == recv_sys_t::OK);
+ else
{
- if (store != STORE_NO && recv_sys.is_memory_exhausted())
- {
- ut_ad(last_phase == (store == STORE_IF_EXISTS));
- if (store == STORE_YES)
+ uint16_t count= 0;
+ while ((r= recv_sys.parse_pmem<true>(last_phase)) == recv_sys_t::OK)
+ if (!++count && recv_sys.report(time(nullptr)))
{
- store= STORE_NO;
- recv_sys.last_stored_lsn= recv_sys.lsn;
- }
- else
- {
- ut_ad(store == STORE_IF_EXISTS);
- recv_sys.apply(false);
+ const size_t n= recv_sys.pages.size();
+ sql_print_information("InnoDB: Parsed redo log up to LSN=" LSN_PF
+ "; to recover: %zu pages", recv_sys.lsn, n);
+ service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
+ "Parsed redo log up to LSN=" LSN_PF
+ "; to recover: %zu pages",
+ recv_sys.lsn, n);
}
+ if (r == recv_sys_t::GOT_OOM)
+ {
+ ut_ad(!last_phase);
+ rewound_lsn= recv_sys.lsn;
+ store= false;
+ if (recv_sys.scanned_lsn <= 1)
+ goto skip_the_rest;
+ ut_ad(recv_sys.file_checkpoint);
+ goto func_exit;
}
}
if (r != recv_sys_t::PREMATURE_EOF)
{
ut_ad(r == recv_sys_t::GOT_EOF);
+ got_eof:
+ ut_ad(recv_sys.is_initialised());
+ if (recv_sys.scanned_lsn > 1)
+ {
+ ut_ad(recv_sys.scanned_lsn == recv_sys.lsn);
+ break;
+ }
+ recv_sys.scanned_lsn= recv_sys.lsn;
+ sql_print_information("InnoDB: End of log at LSN=" LSN_PF, recv_sys.lsn);
break;
}
@@ -3822,7 +4148,7 @@ static bool recv_scan_log(bool last_phase)
break;
if (recv_sys.offset < log_sys.get_block_size())
- break;
+ goto got_eof;
if (recv_sys.offset > buf_size / 4 ||
(recv_sys.offset > block_size_1 &&
@@ -3835,21 +4161,21 @@ static bool recv_scan_log(bool last_phase)
}
}
- const bool corrupt= recv_sys.is_corrupt_log() || recv_sys.is_corrupt_fs();
- recv_sys.maybe_finish_batch();
if (last_phase)
+ {
+ ut_ad(!rewound_lsn);
+ ut_ad(recv_sys.lsn >= recv_sys.file_checkpoint);
log_sys.set_recovered_lsn(recv_sys.lsn);
+ }
+ else if (rewound_lsn)
+ {
+ ut_ad(!store);
+ ut_ad(recv_sys.file_checkpoint);
+ recv_sys.lsn= rewound_lsn;
+ }
+func_exit:
mysql_mutex_unlock(&recv_sys.mutex);
-
- if (corrupt)
- DBUG_RETURN(false);
-
- DBUG_PRINT("ib_log",
- ("%s " LSN_PF " completed", last_phase ? "rescan" : "scan",
- recv_sys.lsn));
- ut_ad(!last_phase || recv_sys.lsn >= recv_sys.file_checkpoint);
-
- DBUG_RETURN(store == STORE_NO);
+ DBUG_RETURN(!store);
}
/** Report a missing tablespace for which page-redo log exists.
@@ -3945,8 +4271,8 @@ next:
/* fall through */
case file_name_t::DELETED:
recv_sys_t::map::iterator r = p++;
- r->second.log.clear();
- recv_sys.pages.erase(r);
+ recv_sys.pages_it_invalidate(r);
+ recv_sys.erase(r);
continue;
}
ut_ad(0);
@@ -3970,8 +4296,6 @@ func_exit:
continue;
}
- missing_tablespace = true;
-
if (srv_force_recovery) {
sql_print_warning("InnoDB: Tablespace " UINT32PF
" was not found at %.*s,"
@@ -3991,14 +4315,11 @@ func_exit:
rs.first,
int(rs.second.name.size()),
rs.second.name.data());
+ } else {
+ missing_tablespace = true;
}
}
- if (!rescan || srv_force_recovery > 0) {
- missing_tablespace = false;
- }
-
- err = DB_SUCCESS;
goto func_exit;
}
@@ -4232,35 +4553,41 @@ read_only_recovery:
goto early_exit;
}
- /* If there is any missing tablespace and rescan is needed
- then there is a possiblity that hash table will not contain
- all space ids redo logs. Rescan the remaining unstored
- redo logs for the validation of missing tablespace. */
- ut_ad(rescan || !missing_tablespace);
+ if (missing_tablespace) {
+ ut_ad(rescan);
+ /* If any tablespaces seem to be missing,
+ validate the remaining log records. */
- while (missing_tablespace) {
- recv_sys.lsn = recv_sys.last_stored_lsn;
- DBUG_PRINT("ib_log", ("Rescan of redo log to validate "
- "the missing tablespace. Scan "
- "from last stored LSN " LSN_PF,
- recv_sys.lsn));
- rescan = recv_scan_log(false);
- ut_ad(!recv_sys.is_corrupt_fs());
+ do {
+ rescan = recv_scan_log(false);
+ ut_ad(!recv_sys.is_corrupt_fs());
- missing_tablespace = false;
+ if (recv_sys.is_corrupt_log()) {
+ goto err_exit;
+ }
- if (recv_sys.is_corrupt_log()) {
- goto err_exit;
- }
+ missing_tablespace = false;
- err = recv_validate_tablespace(
- rescan, missing_tablespace);
+ err = recv_validate_tablespace(
+ rescan, missing_tablespace);
- if (err != DB_SUCCESS) {
- goto early_exit;
- }
+ if (err != DB_SUCCESS) {
+ goto early_exit;
+ }
+ } while (missing_tablespace);
rescan = true;
+ /* Because in the loop above we overwrote the
+ initially stored recv_sys.pages, we must
+ restart parsing the log from the very beginning. */
+
+ /* FIXME: Use a separate loop for checking for
+ tablespaces (not individual pages), while retaining
+ the initial recv_sys.pages. */
+ mysql_mutex_lock(&recv_sys.mutex);
+ recv_sys.clear();
+ recv_sys.lsn = log_sys.next_checkpoint_lsn;
+ mysql_mutex_unlock(&recv_sys.mutex);
}
if (srv_operation <= SRV_OPERATION_EXPORT_RESTORED) {
@@ -4271,8 +4598,7 @@ read_only_recovery:
ut_ad(srv_force_recovery <= SRV_FORCE_NO_UNDO_LOG_SCAN);
if (rescan) {
- recv_sys.lsn = log_sys.next_checkpoint_lsn;
- rescan = recv_scan_log(true);
+ recv_scan_log(true);
if ((recv_sys.is_corrupt_log()
&& !srv_force_recovery)
|| recv_sys.is_corrupt_fs()) {
diff --git a/storage/innobase/os/os0file.cc b/storage/innobase/os/os0file.cc
index aafa4361b0b..217cf153b59 100644
--- a/storage/innobase/os/os0file.cc
+++ b/storage/innobase/os/os0file.cc
@@ -3411,15 +3411,12 @@ os_file_get_status(
return(ret);
}
-
-extern void fil_aio_callback(const IORequest &request);
-
-static void io_callback(tpool::aiocb *cb)
+static void io_callback_errorcheck(const tpool::aiocb *cb)
{
- const IORequest &request= *static_cast<const IORequest*>
- (static_cast<const void*>(cb->m_userdata));
if (cb->m_err != DB_SUCCESS)
{
+ const IORequest &request= *static_cast<const IORequest*>
+ (static_cast<const void*>(cb->m_userdata));
ib::fatal() << "IO Error: " << cb->m_err << " during " <<
(request.is_async() ? "async " : "sync ") <<
(request.is_LRU() ? "lru " : "") <<
@@ -3427,19 +3424,36 @@ static void io_callback(tpool::aiocb *cb)
" of " << cb->m_len << " bytes, for file " << cb->m_fh << ", returned " <<
cb->m_ret_len;
}
- /* Return cb back to cache*/
- if (cb->m_opcode == tpool::aio_opcode::AIO_PREAD)
- {
- ut_ad(read_slots->contains(cb));
- fil_aio_callback(request);
- read_slots->release(cb);
- }
- else
- {
- ut_ad(write_slots->contains(cb));
- fil_aio_callback(request);
- write_slots->release(cb);
- }
+}
+
+static void fake_io_callback(void *c)
+{
+ tpool::aiocb *cb= static_cast<tpool::aiocb*>(c);
+ ut_ad(read_slots->contains(cb));
+ static_cast<const IORequest*>(static_cast<const void*>(cb->m_userdata))->
+ fake_read_complete(cb->m_offset);
+ read_slots->release(cb);
+}
+
+static void read_io_callback(void *c)
+{
+ tpool::aiocb *cb= static_cast<tpool::aiocb*>(c);
+ ut_ad(cb->m_opcode == tpool::aio_opcode::AIO_PREAD);
+ io_callback_errorcheck(cb);
+ ut_ad(read_slots->contains(cb));
+ static_cast<const IORequest*>
+ (static_cast<const void*>(cb->m_userdata))->read_complete();
+ read_slots->release(cb);
+}
+
+static void write_io_callback(void *c)
+{
+ tpool::aiocb *cb= static_cast<tpool::aiocb*>(c);
+ ut_ad(cb->m_opcode == tpool::aio_opcode::AIO_PWRITE);
+ ut_ad(write_slots->contains(cb));
+ static_cast<const IORequest*>
+ (static_cast<const void*>(cb->m_userdata))->write_complete();
+ write_slots->release(cb);
}
#ifdef LINUX_NATIVE_AIO
@@ -3684,6 +3698,28 @@ void os_aio_wait_until_no_pending_reads(bool declare)
tpool::tpool_wait_end();
}
+/** Submit a fake read request during crash recovery.
+@param type fake read request
+@param offset additional context */
+void os_fake_read(const IORequest &type, os_offset_t offset)
+{
+ tpool::aiocb *cb= read_slots->acquire();
+
+ cb->m_group= read_slots->get_task_group();
+ cb->m_fh= type.node->handle.m_file;
+ cb->m_buffer= nullptr;
+ cb->m_len= 0;
+ cb->m_offset= offset;
+ cb->m_opcode= tpool::aio_opcode::AIO_PREAD;
+ new (cb->m_userdata) IORequest{type};
+ cb->m_internal_task.m_func= fake_io_callback;
+ cb->m_internal_task.m_arg= cb;
+ cb->m_internal_task.m_group= cb->m_group;
+
+ srv_thread_pool->submit_task(&cb->m_internal_task);
+}
+
+
/** Request a read or write.
@param type I/O request
@param buf buffer
@@ -3729,23 +3765,32 @@ func_exit:
return err;
}
+ io_slots* slots;
+ tpool::callback_func callback;
+ tpool::aio_opcode opcode;
+
if (type.is_read()) {
++os_n_file_reads;
+ slots = read_slots;
+ callback = read_io_callback;
+ opcode = tpool::aio_opcode::AIO_PREAD;
} else {
++os_n_file_writes;
+ slots = write_slots;
+ callback = write_io_callback;
+ opcode = tpool::aio_opcode::AIO_PWRITE;
}
compile_time_assert(sizeof(IORequest) <= tpool::MAX_AIO_USERDATA_LEN);
- io_slots* slots= type.is_read() ? read_slots : write_slots;
tpool::aiocb* cb = slots->acquire();
cb->m_buffer = buf;
- cb->m_callback = (tpool::callback_func)io_callback;
+ cb->m_callback = callback;
cb->m_group = slots->get_task_group();
cb->m_fh = type.node->handle.m_file;
cb->m_len = (int)n;
cb->m_offset = offset;
- cb->m_opcode = type.is_read() ? tpool::aio_opcode::AIO_PREAD : tpool::aio_opcode::AIO_PWRITE;
+ cb->m_opcode = opcode;
new (cb->m_userdata) IORequest{type};
if (srv_thread_pool->submit_io(cb)) {
@@ -3753,6 +3798,7 @@ func_exit:
os_file_handle_error(type.node->name, type.is_read()
? "aio read" : "aio write");
err = DB_IO_ERROR;
+ type.node->space->release();
}
goto func_exit;